标签:
功能:将多个数据源的数据汇集到一个处理单元进行集中分类处理;
入口类TestMain
1 | public class TestMain { |
2 | |
3 | public static void main(String[] args) { |
4 | TopologyBuilder builder = new TopologyBuilder(); |
5 | builder.setSpout("random1", new RandomWordSpout1(), 1); |
6 | builder.setSpout("random2", new RandomWordSpout2(), 1); |
7 | builder.setSpout("random3", new RandomWordSpout3(), 1); |
8 | builder.setBolt("", new TransferBolt(), 1) |
9 | .localOrShuffleGrouping("random1", "stream1") |
10 | .localOrShuffleGrouping("random2", "stream2") |
11 | .localOrShuffleGrouping("random3", "stream3"); |
12 | |
13 | Config conf = new Config(); |
14 | conf.setDebug(false); |
15 | conf.setNumWorkers(1); |
16 | LocalCluster cluster = new LocalCluster(); |
17 | cluster.submitTopology("test-1", conf, builder.createTopology()); |
18 | } |
19 | } |
数据源类RandomWordSpout1 输出字段为name
1 | public class RandomWordSpout1 extends BaseRichSpout { |
2 | |
3 | private static final long serialVersionUID = -4287209449750623371L; |
4 | |
5 | private SpoutOutputCollector collector; |
6 | |
7 | @Override |
8 | public void open(@SuppressWarnings("rawtypes") Map conf, |
9 | TopologyContext context, SpoutOutputCollector collector) { |
10 | this.collector = collector; |
11 | } |
12 | |
13 | @Override |
14 | public void declareOutputFields(OutputFieldsDeclarer declarer) { |
15 | declarer.declareStream("stream1", new Fields("name")); |
16 | } |
17 | |
18 | @Override |
19 | public void nextTuple() { |
20 | collector.emit("stream1", new Values("RandomWordSpout1")); |
21 | } |
22 | |
23 | } |
数据源类RandomWordSpout2 输出字段为content
1 | public class RandomWordSpout2 extends BaseRichSpout { |
2 | |
3 | private SpoutOutputCollector collector; |
4 | |
5 | @Override |
6 | public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { |
7 | this.collector = collector; |
8 | } |
9 | |
10 | @Override |
11 | public void declareOutputFields(OutputFieldsDeclarer declarer) { |
12 | declarer.declareStream("stream2", new Fields("content")); |
13 | } |
14 | |
15 | @Override |
16 | public void nextTuple() { |
17 | collector.emit("stream2",new Values("RandomWordSpout2")); |
18 | } |
19 | |
20 | } |
数据源类RandomWordSpout3输出key、value两个字段
1 | public class RandomWordSpout3 extends BaseRichSpout { |
2 | |
3 | private SpoutOutputCollector collector; |
4 | |
5 | @Override |
6 | public void open(@SuppressWarnings("rawtypes") Map conf, |
7 | TopologyContext context, SpoutOutputCollector collector) { |
8 | this.collector = collector; |
9 | } |
10 | |
11 | @Override |
12 | public void declareOutputFields(OutputFieldsDeclarer declarer) { |
13 | declarer.declareStream("stream3", new Fields("key", "value")); |
14 | } |
15 | |
16 | @Override |
17 | public void nextTuple() { |
18 | collector.emit("stream3", new Values("chenx","happyday")); |
19 | |
20 | } |
21 | |
22 | } |
聚流处理类TransferBolt,输出从各流获取到的数据
1 | public class TransferBolt extends BaseBasicBolt { |
2 | |
3 | private static final long serialVersionUID = 4223708336037089125L; |
4 | private Map<String, Fields> _fieldMap = null; |
5 | |
6 | @Override |
7 | public void prepare(@SuppressWarnings("rawtypes") Map stormConf, |
8 | TopologyContext context) { |
9 | _fieldMap = new HashMap<String, Fields>(); |
10 | Set<GlobalStreamId> sourceSet = context.getThisSources().keySet(); |
11 | for (GlobalStreamId source : sourceSet) { |
12 | Fields fields = context.getComponentOutputFields( |
13 | source.get_componentId(), source.get_streamId()); |
14 | _fieldMap.put(source.get_componentId() + source.get_streamId(), |
15 | fields); |
16 | } |
17 | |
18 | } |
19 | |
20 | @Override |
21 | public void declareOutputFields(OutputFieldsDeclarer declarer) { |
22 | } |
23 | |
24 | @Override |
25 | public void execute(Tuple input, BasicOutputCollector collector) { |
26 | String key = input.getSourceComponent() + input.getSourceStreamId(); |
27 | Fields fields = _fieldMap.get(key); |
28 | int size = fields.size(); |
29 | String content = ""; |
30 | for (int i = 0; i < size; i++) { |
31 | content += input.getStringByField(fields.get(i)); |
32 | } |
33 | System.out.println("SourceComponent:" + input.getSourceComponent() |
34 | + ",SourceStreamId:" + input.getSourceStreamId() + ",content:" |
35 | + content); |
36 | } |
37 | |
38 | } |
标签:
原文地址:http://www.cnblogs.com/jianyuan/p/4830839.html