标签:
在storm环境部署完毕,并正确启动之后,现在就可以真正进入storm开发了,按照惯例,以wordcount作为开始。
这个例子很简单,核心组件包括:一个spout,两个bolt,一个Topology。
spout从一个路径读取文件,然后readLine,向bolt发射,一个文件处理完毕后,重命名,以不再重复处理。
第一个bolt将从spout接收到的字符串按空格split,产生word,发射给下一个bolt。
第二个bolt接收到word后,统计、计数,放到HashMap<string, integer="">容器中。
1,定义一个spout,作用是源源不断滴向bolt发射字符串。
点击(此处)折叠或打开
2,定义一个bolt,作用是接收spout发过来的字符串,并分割成word,发射给下一个bolt。
点击(此处)折叠或打开
3,定义一个bolt,接收word,并统计。
点击(此处)折叠或打开
注意WordCounter类的prepare方法,里面定义了一个Thread,持续监控容器的变化(word个数增加或者新增word)。
4,定义一个Topology,提交作业。
点击(此处)折叠或打开
5,代码完成后,导出jar(导出时不要指定Main class),然后上传至storm集群,通过命令./storm jar com.x.x.WordCountTopo /data/tianzhen/input 2来提交作业。
Topo启动,spout、bolt执行过程:
Thread监控的统计结果:
源文件处理之后被重命名为*.bak。
和Hadoop不同,在任务执行完之后,Topo不会停止,spout会一直监控数据源,不停地往bolt发射数据。
所以现在如果源数据发生变化,应该能够立马体现出来。我往path下再放一个文本文件,结果:
可见,结果立刻更新了,storm的实时性就体现在这里
标签:
原文地址:http://www.cnblogs.com/yiguangchao9999/p/5554398.html