码迷,mamicode.com
首页 > 其他好文 > 详细

Storm的HelloWorld实例

时间:2016-04-12 00:26:40      阅读:1544      评论:0      收藏:0      [点我收藏+]

标签:

====实例需求分析

数据源会源源不断的产生海量的英文语句。

我们需要实时的获取到单词的词频,或者是TopN,来观察词频是如何变化的。

设想这是不同商品的用户行为操作数据,我们是不是就可以实时观测到用户关注商品的热点呢?

 

====与Hadoop的对比

技术分享

 

====Storm编程模型

详细请参考后续文章中的部分。这里只进行简单介绍。

(1)消息源Spout组件

是消息源的接入组件。通常我们有两个方法来实现这个组件:

①、继承BaseRichSpout类:

相对来说比较简单的方法,并且需要重写的方法也比较少。

所以,这是一种比较简单的方式,但是能够满足基本的业务需求。

②、实现IRichSpout接口:

这种方式需要实现较多的已经定义好的方法,并且部分方法与BaseRichSpout是重叠的。

这种方式能够满足比较复杂的业务数据的接入的需求。

 

====WorldCount方案以及拓扑设计

技术分享

为了不把问题复杂化,我们在内存中生存数据源。

【消息源(RandomSentenceSpout)】

在Spout中随记发送内置的英文语句作为消息源。

【数据标准化(WordNormalizerBolt)】

然后使用一个Bolt进行归一化(语句切分),句子切分成单词发射出去。

【词频统计(WordCountBolt)】

使用一个Bolt接受订阅切分的单词Tuple,进行单词统计,并且选择使用按字段分组的策略,词频实时排序,把TopN实时发射出去。

【工具类(PrintBolt)】

最后使用一哥Bolt将结果打印到Log中。

 

====WordCount实例代码

可以从下面的Git上取得,版权所属:极客学院。

https://github.com/blogchong/storm-example

 

 

 

 

====相关类图

(参考原帖:http://blog.csdn.net/xeseo/article/details/17750379 

 

为了自己理解整理思路,我又将原帖中的记录进行了一些整理加工。

 

【IComponent接口】

Spout和Bolt都是其Component。所以,Storm定义了一个名叫IComponent的总接口。

技术分享

 

IComponent的继承关系如下图所示:

技术分享

绿色部分是我们最常用、比较简单的部分。红色部分是与事务相关。

BaseComponent 是Storm提供的“偷懒”的类。为什么这么说呢,它及其子类,都或多或少实现了其接口定义的部分方法。

这样我们在用的时候,可以直接继承该类,而不是自己每次都写所有的方法。

但值得一提的是,BaseXXX这种定义的类,它所实现的方法,都是空的,直接返回null。

 

【Spout】

类图如下图所示:

技术分享

 

接口如下图所示:

技术分享

 

各个接口说明:

①、open方法:

是初始化动作。允许你在该spout初始化时做一些动作,传入了上下文,方便取上下文的一些数据。 

②、close方法

在该spout关闭前执行,但是并不能得到保证其一定被执行。

spout是作为task运行在worker内,在cluster模式下,supervisor会直接kill -9 woker的进程,这样它就无法执行了。

而在本地模式下,只要不是kill -9, 如果是发送停止命令,是可以保证close的执行的。 

③、activatedeactivate方法 

一个spout可以被暂时激活和关闭,这两个方法分别在对应的时刻被调用。 

④、nextTuple方法:

负责消息的接入,执行数据发射。是Spout中的最重要方法。

⑤、ack(Object)方法:

传入的Object其实是一个id,唯一表示一个tuple。该方法是这个id所对应的tuple被成功处理后执行。 

⑥、fail(Object)方法:

同ack,只不过是tuple处理失败时执行。 

 

我们的RandomSpout由于继承了BaseRichSpout,

所以不用实现close、activate、deactivate、ack、fail和getComponentConfiguration方法,只关心最基本核心的部分。 

 

结论:

通常情况下(Shell和事务型的除外),实现一个Spout,可以直接实现接口IRichSpout,如果不想写多余的代码,可以直接继承BaseRichSpout。 

 

【Bolt】

类图如下图所示:

技术分享

 

这里可以看到一个奇怪的问题: 为什么IBasicBolt并没有继承IBolt? 我们带着问题往下看。 

IBolt定义了三个方法: 

技术分享

①、prepare方法:

IBolt继承了java.io.Serializable,我们在nimbus上提交了topology以后,创建出来的bolt会序列化后发送到具体执行的worker上去。

worker在执行该Bolt时,会先调用prepare方法传入当前执行的上下文。

②、execute方法:

接受一个tuple进行处理,并用prepare方法传入的OutputCollector的ack方法(表示成功)或fail(表示失败)来反馈处理结果。

③、cleanup方法:

同ISpout的close方法,在关闭前调用。同样不保证其一定执行。

 

红色部分(execute方法)是Bolt实现时一定要注意的地方。

而Storm提供了IBasicBolt接口,其目的就是实现该接口的Bolt不用在代码中提供反馈结果了,Storm内部会自动反馈成功。

如果你确实要反馈失败,可以抛出FailedException。

 
我们来再写一个Bolt继承BaseRichBolt替代ExclaimBasicBolt。代码如下:
技术分享
修改topology
技术分享
运行下,结果一致。
 
结论:
通常情况下,实现一个Bolt,可以实现IRichBolt接口或继承BaseRichBolt,
如果不想自己处理结果反馈,可以实现IBasicBolt接口或继承BaseBasicBolt,它实际上相当于自动做掉了prepare方法和collector.emit.ack(inputTuple);

 

--END--

Storm的HelloWorld实例

标签:

原文地址:http://www.cnblogs.com/quchunhui/p/5380260.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!