标签:
此案例实现的流程:spout 从数组中随机读取一个字符串并发送给第一个 bolt;该 bolt 将字符串转换为大写后发送给下一个 bolt;第二个 bolt 在字符串末尾追加当前日期(格式为 yyyy-MM-dd,并非精确到时刻的时间戳),然后逐行写入文件。
public class RandomWordSpout extends BaseRichSpout { /** * 数据源 */ private static final long serialVersionUID = 1L; private SpoutOutputCollector collector; String[] str = new String[]{"cloud","web","android","ios","java","bigdata","linux"}; /* * 初始化方法,在spout组件实例化时调用一次 */ @Override public void open(Map map, TopologyContext tc, SpoutOutputCollector collector) { this.collector = collector; } //不断地往下一个组件发送tuple消息 //这里面是该spout组件的核心逻辑 @Override public void nextTuple() { //可以从kafka消息队列中拿到数据,简便起见,我们从str数组中随机挑选一个发送出去 Random r = new Random(); int index = r.nextInt(str.length); String job = str[index]; //将信息封装成tuple,发送消息给下一个组件 collector.emit(new Values(job)); //每发送一个消息,休眠500ms Utils.sleep(500); } //声明本spout组件发送出去的tuple中的数据的字段名 @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("orignname")); } }
public class HandlerBolt extends BaseBasicBolt { private static final long serialVersionUID = 1L; //业务处理逻辑 @Override public void execute(Tuple tuple, BasicOutputCollector collector) { String job = tuple.getString(0);//获取nextTuple()方法emit()过来的数据 String job_upper = job.toUpperCase(); collector.emit(new Values(job_upper));//处理完毕后向下一级发送 } //声明该bolt组件要发出去的tuple的字段 @Override public void declareOutputFields(OutputFieldsDeclarer declare) { declare.declare(new Fields("job_upper")); } }
public class MoreBolt extends BaseBasicBolt { private static final long serialVersionUID = 1L; FileWriter fileWriter = null; //在bolt组件运行过程中只会被调用一次 @Override public void prepare(Map stormConf, TopologyContext context) { try { fileWriter = new FileWriter("/home/hadoop/storm/" + UUID.randomUUID()); } catch (IOException e) { e.printStackTrace(); } } //该bolt组件的核心处理逻辑 //每收到一个tuple消息,就会被调用一次 @Override public void execute(Tuple tuple, BasicOutputCollector collector) { Date date = new Date(); SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd"); String d = format.format(date); String job_upper = tuple.getString(0); String jbo_date = job_upper + d; try { fileWriter.write(jbo_date); fileWriter.write("\n"); fileWriter.flush(); } catch (IOException e) { e.printStackTrace(); } } //本bolt已经不需要发送tuple消息到下一个组件,所以不需要再声明tuple的字段 @Override public void declareOutputFields(OutputFieldsDeclarer declare) { } }
public class TopoMain { public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException { TopologyBuilder builder = new TopologyBuilder(); //将spout设置到topology中 //parallelism_hint:4 表示用4个excutor来执行这个组件 //setNumTasks(8),设置该组件执行时并发的task数量,也就意味着1个excutor会执行2个task builder.setSpout("randomspout", new RandomWordSpout(),4).setNumTasks(8); //将bolt设置到topology中,并且指定他接收randomspout组件的消息 builder.setBolt("upperjob", new HandlerBolt(),4).shuffleGrouping("randomspout"); //将bolt设置到topology中,并且指定他接收upperjob组件的消息 builder.setBolt("datejob", new MoreBolt(), 4).shuffleGrouping("upperjob"); //用builder来创建topology StormTopology topology = builder.createTopology(); //配置一些topology在集群中运行的参数 Config conf = new Config(); conf.setNumWorkers(4); conf.setDebug(true); conf.setNumAckers(0); //conf.set("hbase.zookeeper.quorum", "lp5,lp6,lp7"); //提交 StormSubmitter.submitTopology("demo3", conf, topology); } }
标签:
原文地址:http://blog.csdn.net/llllpppp4444/article/details/51360279