码迷,mamicode.com
首页 > 其他好文 > 详细

storm使用范例

时间:2016-05-12 16:43:43      阅读:150      评论:0      收藏:0      [点我收藏+]

标签:

此案例实现从数组中随机读取字符串发送到bolt,bolt将字符串变成大写发送到下一个bolt,bolt将字符串加上时间戳然后写到文件中

public class RandomWordSpout extends BaseRichSpout {
	/**
	 * 数据源
	 */
	private static final long serialVersionUID = 1L;
	private SpoutOutputCollector collector;
	String[] str = new String[]{"cloud","web","android","ios","java","bigdata","linux"};
	
	/*
	 * 初始化方法,在spout组件实例化时调用一次
	 */
	@Override
	public void open(Map map, TopologyContext tc, SpoutOutputCollector collector) {
		this.collector = collector;
	}
	
	//不断地往下一个组件发送tuple消息
	//这里面是该spout组件的核心逻辑
	@Override
	public void nextTuple() {
		//可以从kafka消息队列中拿到数据,简便起见,我们从str数组中随机挑选一个发送出去
		Random r = new Random();
		int index = r.nextInt(str.length);
		String job = str[index];
		//将信息封装成tuple,发送消息给下一个组件
		collector.emit(new Values(job));
		//每发送一个消息,休眠500ms
		Utils.sleep(500);
	}

	

	//声明本spout组件发送出去的tuple中的数据的字段名
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("orignname"));
	}

}

public class HandlerBolt extends BaseBasicBolt {

	private static final long serialVersionUID = 1L;

	//业务处理逻辑
	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {
		String job = tuple.getString(0);//获取nextTuple()方法emit()过来的数据
		String job_upper = job.toUpperCase();
		
		collector.emit(new Values(job_upper));//处理完毕后向下一级发送
	}

	//声明该bolt组件要发出去的tuple的字段
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declare) {
		declare.declare(new Fields("job_upper"));
	}

}

public class MoreBolt extends BaseBasicBolt {
	
	private static final long serialVersionUID = 1L;
	FileWriter fileWriter = null;
	
	//在bolt组件运行过程中只会被调用一次
	@Override
	public void prepare(Map stormConf, TopologyContext context) {
		try {
			fileWriter = new FileWriter("/home/hadoop/storm/" + UUID.randomUUID());
		} catch (IOException e) {
			
			e.printStackTrace();
		}
		
	}

	//该bolt组件的核心处理逻辑
	//每收到一个tuple消息,就会被调用一次
	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {
		Date date = new Date();
		SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
		String d = format.format(date);
		String job_upper = tuple.getString(0);
		String jbo_date = job_upper + d;
		try {
			fileWriter.write(jbo_date);
			fileWriter.write("\n");
			fileWriter.flush();
		} catch (IOException e) {
			
			e.printStackTrace();
		}
	}

	//本bolt已经不需要发送tuple消息到下一个组件,所以不需要再声明tuple的字段
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declare) {
		
	}

}

public class TopoMain {

	public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
		TopologyBuilder builder = new TopologyBuilder();
		//将spout设置到topology中
		//parallelism_hint:4 表示用4个excutor来执行这个组件
		//setNumTasks(8),设置该组件执行时并发的task数量,也就意味着1个excutor会执行2个task
		builder.setSpout("randomspout", new RandomWordSpout(),4).setNumTasks(8);
		//将bolt设置到topology中,并且指定他接收randomspout组件的消息
		builder.setBolt("upperjob", new HandlerBolt(),4).shuffleGrouping("randomspout");
		//将bolt设置到topology中,并且指定他接收upperjob组件的消息
		builder.setBolt("datejob", new MoreBolt(), 4).shuffleGrouping("upperjob");
		//用builder来创建topology
		StormTopology topology = builder.createTopology();
		
		//配置一些topology在集群中运行的参数
		Config conf = new Config();
		conf.setNumWorkers(4);
		conf.setDebug(true);
		conf.setNumAckers(0);
		
		//conf.set("hbase.zookeeper.quorum", "lp5,lp6,lp7");
		
		//提交
		StormSubmitter.submitTopology("demo3", conf, topology);
	}

}


storm使用范例

标签:

原文地址:http://blog.csdn.net/llllpppp4444/article/details/51360279

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!