Tried out Storm's StreamID (named streams) feature on a whim. Leaving a note here.
==Sample Data==
{ "Address": "小桥镇小桥中学对面", "CityCode": "511300", "CountyCode": "511322", "EnterpriseCode": "YUNDA", "MailNo": "667748320345", "Mobile": "183****5451", "Name": "王***", "ProvCode": "510000", "Weight": "39" }
==Topology Structure==
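Spout1 emits each simulated record on two named streams. CountBolt1 subscribes to Stream1 and counts records per courier company; CountBolt2 and CountBolt3 both subscribe to Stream2 and count per province and per city. TopBolt subscribes to the output streams of all three counting bolts. The wiring, as set up in TestTopology below:

Spout1 --Stream1--> Count1 (CountBolt1) --Stream3--> Top (TopBolt)
Spout1 --Stream2--> Count2 (CountBolt2) --Stream4--> Top (TopBolt)
Spout1 --Stream2--> Count3 (CountBolt3) --Stream4--> Top (TopBolt)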
==Source Code==
<Spout1>
package test;

import com.alibaba.fastjson.JSONObject;
import common.constants.Constants;
import common.simulate.DataRandom;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.Map;

public class Spout1 extends BaseRichSpout {

    private SpoutOutputCollector _collector = null;
    private DataRandom _dataRandom = null;
    private int _timeInterval = 1000;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Declare two named streams; downstream bolts subscribe by stream ID.
        declarer.declareStream("Stream1", new Fields("json"));
        declarer.declareStream("Stream2", new Fields("json"));
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        _dataRandom = DataRandom.getInstance();
        if (conf.containsKey(Constants.SpoutInterval)) {
            _timeInterval = Integer.valueOf((String) conf.get(Constants.SpoutInterval));
        }
    }

    @Override
    public void nextTuple() {
        try {
            Thread.sleep(_timeInterval);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        JSONObject jsonObject = _dataRandom.getRandomExpressData();
        System.out.print("[---Spout1---]jsonObject=" + jsonObject + "\n");
        // Emit the same record on both named streams.
        _collector.emit("Stream1", new Values(jsonObject.toJSONString()));
        _collector.emit("Stream2", new Values(jsonObject.toJSONString()));
    }
}
<CountBolt1>
package test;

import com.alibaba.fastjson.JSONObject;
import common.constants.Constants;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;

public class CountBolt1 extends BaseRichBolt {

    private OutputCollector _collector = null;
    private int taskId = 0;
    private Map<String, Integer> _map = new HashMap<>();

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("Stream3", new Fields("company", "count"));
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
        taskId = context.getThisTaskId();
    }

    @Override
    public void execute(Tuple input) {
        String str = input.getStringByField("json");
        JSONObject jsonObject = JSONObject.parseObject(str);
        String company = jsonObject.getString(Constants.EnterpriseCode);
        int count = 0;
        if (_map.containsKey(company)) {
            count = _map.get(company);
        }
        count++;
        _map.put(company, count);
        // Count per courier company and emit the running total on "Stream3".
        _collector.emit("Stream3", new Values(company, count));
        System.out.print("[---CountBolt1---]" + "taskId=" + taskId
                + ", company=" + company + ", count=" + count + "\n");
    }
}
<CountBolt2>
package test;

import com.alibaba.fastjson.JSONObject;
import common.constants.Constants;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class CountBolt2 extends BaseRichBolt {

    private OutputCollector _collector = null;
    private int _taskId = 0;
    private Map<String, Integer> _map = new HashMap<>();

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        _collector = outputCollector;
        _taskId = topologyContext.getThisTaskId();
    }

    @Override
    public void execute(Tuple tuple) {
        String str = tuple.getStringByField("json");
        JSONObject jsonObject = JSONObject.parseObject(str);
        String prov = jsonObject.getString(Constants.ProvCode);
        int count = 0;
        if (_map.containsKey(prov)) {
            count = _map.get(prov);
        }
        count++;
        _map.put(prov, count);
        _collector.emit("Stream4", new Values(prov, count, UUID.randomUUID()));
        System.out.print("[---CountBolt2---]" + "taskId=" + _taskId
                + ", prov=" + prov + ", count=" + count + "\n");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream("Stream4", new Fields("prov", "count", "random"));
    }
}
<CountBolt3>
package test;

import com.alibaba.fastjson.JSONObject;
import common.constants.Constants;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;

public class CountBolt3 extends BaseRichBolt {

    private OutputCollector _collector = null;
    private int _taskId = 0;
    private Map<String, Integer> _map = new HashMap<>();

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        _collector = outputCollector;
        _taskId = topologyContext.getThisTaskId();
    }

    @Override
    public void execute(Tuple tuple) {
        String str = tuple.getStringByField("json");
        JSONObject jsonObject = JSONObject.parseObject(str);
        String city = jsonObject.getString(Constants.CityCode);
        int count = 0;
        if (_map.containsKey(city)) {
            count = _map.get(city);
        }
        count++;
        _map.put(city, count);
        _collector.emit("Stream4", new Values(city, count));
        System.out.print("[---CountBolt3---]" + "taskId=" + _taskId
                + ", city=" + city + ", count=" + count + "\n");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // Reuses the stream ID "Stream4" that CountBolt2 also uses, but with different fields.
        // Stream IDs are scoped to the emitting component, so the two declarations do not clash.
        outputFieldsDeclarer.declareStream("Stream4", new Fields("city", "count"));
    }
}
<TopBolt>
package test;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import java.util.List;
import java.util.Map;

public class TopBolt extends BaseRichBolt {

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
    }

    @Override
    public void execute(Tuple tuple) {
        // The source stream ID tells which declared stream the tuple arrived on ("Stream3" or "Stream4").
        System.out.print("[---TopBolt---]StreamID=" + tuple.getSourceStreamId() + "\n");
        List<Object> values = tuple.getValues();
        for (Object value : values) {
            System.out.print("[---TopBolt---]value=" + value + "\n");
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }
}
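TopBolt above only prints the stream ID and the raw values. Below is a minimal sketch (a hypothetical variant named RoutingTopBolt, not part of the topology that produced the log) of how a bolt could branch on the stream ID, and additionally on the source component when two upstream bolts reuse the same stream ID, as Count2 and Count3 both do with "Stream4":
<RoutingTopBolt>
package test;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import java.util.Map;

// Hypothetical variant of TopBolt: routes tuples by stream ID and source component.
public class RoutingTopBolt extends BaseRichBolt {

    @Override
    public void prepare(Map map, TopologyContext context, OutputCollector collector) {
    }

    @Override
    public void execute(Tuple tuple) {
        String streamId = tuple.getSourceStreamId();    // e.g. "Stream3" or "Stream4"
        String component = tuple.getSourceComponent();  // e.g. "Count1", "Count2", "Count3"

        if ("Stream3".equals(streamId)) {
            // Only CountBolt1 emits on Stream3, so the stream ID alone is enough.
            System.out.print("[---RoutingTopBolt---]company=" + tuple.getStringByField("company")
                    + ", count=" + tuple.getIntegerByField("count") + "\n");
        } else if ("Stream4".equals(streamId)) {
            // Both CountBolt2 and CountBolt3 reuse "Stream4", so check the source component
            // to know whether the first field is "prov" or "city".
            if ("Count2".equals(component)) {
                System.out.print("[---RoutingTopBolt---]prov=" + tuple.getStringByField("prov")
                        + ", count=" + tuple.getIntegerByField("count") + "\n");
            } else {
                System.out.print("[---RoutingTopBolt---]city=" + tuple.getStringByField("city")
                        + ", count=" + tuple.getIntegerByField("count") + "\n");
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}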
<TestTopology>
package test;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class TestTopology {

    // args[0]: "true" submits to a Storm cluster, anything else runs in a LocalCluster.
    // args[1]: spout emit interval in milliseconds, passed to Spout1 via the topology config.
    public static void main(String[] args)
            throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("Spout1", new Spout1());
        // Each counting bolt subscribes to a specific named stream of Spout1.
        builder.setBolt("Count1", new CountBolt1()).shuffleGrouping("Spout1", "Stream1");
        builder.setBolt("Count2", new CountBolt2()).shuffleGrouping("Spout1", "Stream2");
        builder.setBolt("Count3", new CountBolt3()).shuffleGrouping("Spout1", "Stream2");
        // TopBolt subscribes to the output streams of all three counting bolts.
        builder.setBolt("Top", new TopBolt())
                .fieldsGrouping("Count1", "Stream3", new Fields("company"))
                .fieldsGrouping("Count2", "Stream4", new Fields("prov"))
                .fieldsGrouping("Count3", "Stream4", new Fields("city"));

        Config config = new Config();
        config.setNumWorkers(1);
        config.put(common.constants.Constants.SpoutInterval, args[1]);

        if (Boolean.valueOf(args[0])) {
            StormSubmitter.submitTopology("TestTopology1", config, builder.createTopology());
        } else {
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("TestTopology1", config, builder.createTopology());
        }
    }
}
==Result Log==
[---Spout1---]jsonObject={"CityCode":"511300","CountyCode":"511322","Address":"小桥镇小桥中学对面","MailNo":"667748320345","ProvCode":"510000","Mobile":"183****5451","EnterpriseCode":"YUNDA","Weight":"39","Name":"王***"}
[---CountBolt1---]taskId=1, company=YUNDA, count=1
[---CountBolt3---]taskId=3, city=511300, count=1
[---CountBolt2---]taskId=2, prov=510000, count=1
[---TopBolt---]StreamID=Stream4
[---TopBolt---]value=510000
[---TopBolt---]value=1
[---TopBolt---]value=99bd1cdb-d5c1-4ac8-b1a1-a4cfffb5a616
[---TopBolt---]StreamID=Stream4
[---TopBolt---]value=511300
[---TopBolt---]value=1
[---TopBolt---]StreamID=Stream3
[---TopBolt---]value=YUNDA
[---TopBolt---]value=1