Storm topology 结构
Storm VS MapReduce
Nimbus和Supervisor之间的所有协调工作都是通过一个Zookeeper集群来完成,并且Nimbus进程和supervisor都是快速失败(fail-fast)和无状态的,所有的状态要么在Zookeeper里面,
要么在本地磁盘上。这也就意味着可以用kill -9来杀死Nimbus和Supervisor进程, 然后再重启它们,它们可以继续工作, 就好像什么都没有发生过似的,这个设计使得storm不可思议的稳定。
Topologies -- 作业拓扑
为了在Storm上面做实时计算, 要去建立一些topologies。一个topology就是一个计算节点所组成的图。Topology里面的每个处理节点都包含处理逻辑,
而节点之间的连接则表示数据流动的方向。
运行一个topology是很简单的。首先,把你所有的代码以及所依赖的jar打进一个jar包。然后运行类似下面的这个命令:
strom jar all-your-code.jar backtype.storm.MyTopology arg1 arg2
这个命令会运行主类: backtype.strom.MyTopology, 参数是arg1, arg2。这个类的main函数定义这个topology并且把它提交给Nimbus。storm
jar负责连接到nimbus并且上传jar文件。
因为topology的定义其实就是一个Thrift结构并且Nimbus就是一个Thrift服务,可以用任何语言创建并且提交topology。上面的方法是用JVM-based语言提交的最简单的方法, 看一下文章: 在生产集群上运行topology去看看怎么启动以及停止topologies。
spout和bolt所组成一个网络会被打包成topology, topology是storm里面最高一级的抽象(类似
Job)。
注:Thrift 是一个软件框架,用来进行可扩展且跨语言的服务的开发。它结合了功能强大的软件堆栈和代码生成引擎,以构建在
C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, and OCaml 这些编程语言间无缝结合的、高效的服务。
Stream -- 数据流
Stream是storm里面的关键抽象,一个stream是一个没有边界的tuple序列。storm提供一些原语来分布式地、可靠地把一个stream传输进一个新的stream。比如: 你可以把一个tweets流传输到热门话题的流。
Storm提供的最基本的处理stream的原语是spout和bolt。可以实现Spout和Bolt对应的接口以处理你应用的逻辑。
spout -- 流的源头
比如一个spout可能从Kestrel队列里面读取消息并且把这些消息发射成一个流。又比如一个spout可以调用twitter的一个api并且把返回的tweets发射成一个流。
通常Spout会从外部数据源(队列、数据库等)读取数据,然后封装成Tuple形式,之后发送到Stream中。Spout是一个主动的角色,在接口内部有个nextTuple函数,Storm框架会不停的调用该函数。
注: Kestrel是一个scala写的twitter开源的消息中间件,特点是高性能、小巧(2K行代码)、持久存储(记录日志到journal)并且可靠(支持可靠获取)。Kestrel的前身是Ruby写的Starling项目,后来twitter的开发人员尝试用scala重新实现。
Bolt -- 处理逻辑
负责处理输入的Stream,并产生新的输出Stream。Bolt可以执行过滤、函数操作、Join、操作数据库等任何操作。Bolt是一个被动的角色,其接口中有一个execute(Tuple
input)方法,在接收到消息之后会调用此函数,用户可以在此方法中执行自己的处理逻辑。bolt可以接收任意多个输入stream, 作一些处理, 有些bolt可能还会发射一些新的stream。一些复杂的流转换, 比如从一些tweet里面计算出热门话题,
需要多个步骤, 从而也就需要多个bolt。 Bolt可以做任何事情: 运行函数、过滤tuple、 聚合、 合并以及访问数据库等等。
Spout 与 Bolt
注:1.topology里面的每一个节点都是并行运行的。 在topology里面,
可以指定每个节点的并行度, storm则会在集群里面分配多个线程来同时计算。
2.一个topology会一直运行直到显式停止它。storm自动重新分配一些运行失败的任务, 并且storm保证你不会有数据丢失, 即使在一些机器意外停机并且消息被丢掉的情况下。
运行中的Topology主要由以下三个组件组成的:Worker
processes、 Executors threads以及Tasks
它们的数量关系如下图所示:
Spout或者Bolt的Task个数一旦指定之后就不能改变了,而Executor的数量可以根据情况来进行动态的调整。默认情况下#
executor = #tasks即一个Executor中运行着一个Task。
三 数据模型
storm使用tuple来作为它的数据模型。每个tuple是一堆值,每个值有一个名字,并且每个值可以是任何类型, 一个tuple可以看作一个没有方法的java对象。总体来看,storm支持所有的基本类型、字符串以及字节数组作为tuple的值类型。也可以使用自己定义的类型来作为值类型,
只要实现对应的序列化器(serializer)。
一个Tuple代表数据流中的一个基本的处理单元,例如一条cookie日志,它可以包含多个Field,每个Field表示一个属性。
一个没有边界的、源源不断的、连续的Tuple序列就组成了Stream。Tuple本来应该是一个Key-Value的Map,由于各个组件间传递的tuple的字段名称已经事先定义好了,所以Tuple只需要按序填入各个Value,所以就是一个Value
List。
四 流分组策略(Stream grouping)
流分组策略告诉topology如何在两个组件之间发送tuple。
要记住, spouts和bolts以很多task的形式在topology里面同步执行。如果从task的粒度来看一个运行的topology, 它应该是这样的:
从task角度来看topology
当Bolt A的一个task要发送一个tuple给Bolt B, 它应该发送给Bolt B的哪个task呢?
stream grouping 专门回答这种问题的。在我们深入研究不同的stream grouping之前, 让我们看一下storm-starter里面的另外一个topology。WordCountTopology读取一些句子,
输出句子里面每个单词出现的次数.
|
TopologyBuilder
builder =newTopologyBuilder(); |
|
builder.setSpout( 1 ,new
RandomSentenceSpout(), 5 ); |
|
builder.setBolt( 2 ,new
SplitSentence(), 8 ) |
|
builder.setBolt( 3 ,new
WordCount(), 12 ) |
|
.fieldsGrouping( 2 ,newFields( "word" )); |
SplitSentence对于句子里面的每个单词发射一个新的tuple, WordCount在内存里面维护一个单词->次数的mapping, WordCount每收到一个单词, 它就更新内存里面的统计状态。
- 最简单的grouping是shuffle
grouping, 它随机发给任何一个task。上面例子里面RandomSentenceSpout和SplitSentence之间用的就是shuffle grouping, shuffle grouping对各个task的tuple分配的比较均匀。
-
一种更有趣的grouping是fields grouping, SplitSentence和WordCount之间使用的就是fields grouping, 这种grouping机制保证相同field值的tuple会去同一个task, 这对于WordCount来说非常关键,如果同一个单词不去同一个task,
那么统计出来的单词次数就不对了。
注:fields grouping是stream合并,是stream聚合以及很多其它场景的基础。在背后呢, fields grouping使用的一致性哈希来分配tuple。
Storm支持的组分配策略如下:
ShuffleGrouping:随机选择一个Task来发送。
FiledGrouping:根据Tuple中Fields来做一致性hash,相同hash值的Tuple被发送到相同的Task。
AllGrouping:广播发送,将每一个Tuple发送到所有的Task。
GlobalGrouping:所有的Tuple会被发送到某个Bolt中的id最小的那个Task。
NoneGrouping:不关心Tuple发送给哪个Task来处理,等价于ShuffleGrouping。
DirectGrouping:直接将Tuple发送到指定的Task来处理。
Storm的组分配策略的概念类似于MapReduce的Partition机制,通过使用一些分组策略原语来为Tuple设置路由。
五 小结
Storm这种高可拓展性,能处理高频数据和大规模数据的实时流计算解决方案将被应用于实时搜索,高频交易和社交网络上,其应用场景将会越来越广泛。