Resources:
-
Tuple:
streams are composed of tuples:Tuple是一个interface,对应的实现类 TupleImpl。
-
OutputFieldsDeclarer:
used to declare streams and their schemas
-
Serialization:
Information about Storm‘s dynamic typing of tuples and declaring custom serializations
Ps:Storm中的tuple是接口,没有具体实现,但原话这么解释的:
Storm needs to know how to serialize all the values in a tuple. By default, Storm
* knows how to serialize the primitive types, strings, and byte arrays.
3、Spouts
在Topology中,每个Spout都是一个Streams源,通常情况下,Spouts会从外部源读取Tuple,并输入这些Tuple到Topology中。
Spouts既是可靠的又是不可靠的,因为,可靠的spout会在发送Tuple失败的情况下,重复发送;相反,不可靠的spout会忘记它发送过的Tuple,无论是否成功。
Spout代码过程:
Spouts的主要方法之一是:nextTuple() 发送tuple,nextTuple可以发送一个新的Tuple到Topology,或者当没有新的Tuple被发送的时候,就简单的返回。对于任何spout的实现,nextTuple都不能阻塞,因为Storm调用的所有spout都是基于同一个线程!
其次是 ack 和 fail 方法,它们都会被调用,当Storm发现一个tuple被从spout发射后,要么成功地完成的通过topology,要么错误的完成。ack 和 fail 方法只有在可靠的spouts下才能被调用。spout可靠性,请搜本页下面内容,或移至代码。
Resources:
Ps:nextTuple()方法中会发送Tuple,至于那种对象能发送,请看上述。
Qu:1、在代码中如何让声明的留和发送tuple联系起来,因为声明流的名称并不是tuple对象名?
2、是Storm中Spout的nextTuple对应一个线程,还是多个Spout对应一个线程?
answer:在集群中,应该是每个node的JVM中启动一个线程跑spout
4、Bolts
在Topologies中所有的处理都会在bolts中被执行,它能够过滤tuple、函数操作、合并(连接join、聚合aggregation)、数据库读写等。Bolt可以做复杂的流传输,需要多步骤、多bolt的连接。
Bolt也可以发射出一个或多个流,它需要使用
OutputFieldsDeclarer 类的
declareStream 方法
声明多个流,并且需要指定这个流去使用
OutputCollectorl类的
emit方法去
发射。
当你声明一个bolt的输入流时,你需要订阅一个指定的其他组件的流。每一个流的订阅都是一个个添加。
InputDeclarer类可以声明一个流在默认的流id上。 declarer.shuffleGrouping("1") 说明在组件“1”上订阅了这个默认流,等价于
declarer.shuffleGrouping("1",
DEFAULT_STREAM_ID)。
Bolts的主要方法是
execute
方法,它会吸收作为输入的一个新Tuple。Bolts使用
OutputCollector
对象发射新的Tuples。Bolts必须对每一个tuple调用
OutputCollector
的
ack
方法,以便于Storm知道什么时候元组们被处理完成(可以最终确定它的安全对于包装这个初始化spout
tuples)。
共同处理一个输入元组的情况下,发射0或多个元组们基于元组,然后包装输入元组,Storm提供一个IBasicBolt接口的自动包装。
Resources:
Ps:bolt发送或接收的数据流都可以多对多的进行。
5、Stream groupings 流分组
定义一个拓扑部分是指定了每个bolt门闩的流都应该作为输入被接收。一个流分组定义为:在门闩的任务之中如何区分流。
在Storm中有8种流分组方式,通过实现CustomStreamGroupingj接口,你可以实现一种风格流分组方式:
Storm 定义了八种内置数据流分组的方式:
1、Shuffle grouping(随机分组):这种方式会随机分发 tuple 给bolt 的各个 task,每个bolt 实例接收到的相同数量的 tuple 。
2、Fields grouping(按字段分组):根据指定字段的值进行分组。比如说,一个数据流根据“ word”字段进行分组,所有具有相同“ word ”字段值的 tuple 会路由到同一个 bolt 的 task 中。
3、All grouping(全复制分组):将所有的 tuple 复制后分发给所有 bolt task。每个订阅数据流的 task 都会接收到 tuple 的拷贝。
4、Globle grouping(全局分组):这种分组方式将所有的 tuples 路由到唯一一个 task 上。Storm 按照最小的 task ID 来选取接收数据的 task 。注意,当使用全局分组方式时,设置 bolt 的 task 并发度是没有意义的(spout并发有意义),因为所有 tuple 都转发到同一个 task 上了。使用全局分组的时候需要注意,因为所有的 tuple 都转发到一个 JVM 实例上,可能会引起
Storm 集群中某个 JVM 或者服务器出现性能瓶颈或崩溃。
5、None grouping(不分组):在功能上和随机分组相同,是为将来预留的。
6、Direct grouping(指向型分组):数据源会调用 emitDirect() 方法来判断一个 tuple 应该由哪个 Storm 组件来接收。只能在声明了是指向型的数据流上使用。
7、Local or shuffle grouping (本地或随机分组):和随机分组类似,但是,会将 tuple 分发给同一个 worker 内的bolt task (如果 worker 内有接收数据的 bolt task )。其他情况下,采用随机分组的方式。取决于topology 的并发度,本地或随机分组可以减少网络传输,从而提高 topology 性能。
8、Partial Key grouping: The stream is partitioned by the fields specified in the grouping, like the
Fields grouping, but are load balanced between two downstream bolts, which provides better utilization of resources when the incoming data is skewed. This
paper provides a good explanation of how it works and the advantages it provides.
Resources:
-
TopologyBuilder:
use this class to define topologies
-
InputDeclarer:
this object is returned whenever
setBolt
is called onTopologyBuilder
and
is used for declaring a bolt‘s input streams and how those streams should be grouped
6、Reliability
Storm保证每一个喷口元组都将会在拓扑中完整的被处理。处理过程:它会追踪这个元组树被每一个喷口元组所触发,并且确定元组树已经成功完成。每个拓扑都有一个“信息超时”与之相关联。假如Storm未能检测到一个喷口元组已经超时完成(看不懂),它将舍弃并重新执行这个元组。
为了改善Storm的可靠性能力,你可以告诉Storm什么时候需要在元组树种创建一个新的边界,告诉Storm无论在什么时候都可以完成处理一个独立的元组。Bolt们都使用了
OutputCollector对象去发射元组们。锚定的完成于这个emit方法,你可以声明一个元组使用了ack方法而被完成。
以上详细的解释了可靠消息处理。
7、Tasks
每个喷口spout或者门闩bolt都有许多任务在集群中执行。每一个任务对应一个执行线程,流分组定义了如何从一个任务集到另外一个任务集发送元组。你可以使用
TopologyBuilder 类的setSpout和setBolt方法,为每一个喷口或门闩是设置并行。
Ps:可以理解task是并行处理。
8、Workers
拓扑执行要通过一个或多个worker进程。每一个worker进程都是一个物理的JVM和这个拓扑中执行了一个所有这个任务的子集。
例子:如果拓扑的联合并发数为300,分配了50个worker,因此每一个worker将会执行6个task(task将作为worker的线程)。Storm将会均匀的分配任务到所有worker上。
Worker结构:
Topology的并发机制:
storm的Worker、Executor、Task默认配置都是1
1、增加worker(本地模式无效,只有一个JVM)
Config
对象的setNumWorkers()
方法:
Config config = new Config();
config.setNumWorkers(2):
2、配置executor 和 task
默认都为1,setXXX指定一个Worker中有几个线程,而后面的setNumXXX指定总共需要执行的tasks数量,因此,一个Thread--Executor中需要跑tasks/threads个任务。
topologyBuilder.setSpout(SENTENCE_SPOUT_ID, spout, 2);
// StormBaseSpout -> StormBaseBolt
topologyBuilder.setBolt(SPLIT_BOLT_ID, bolt).setNumTasks(2).shuffleGrouping(SENTENCE_SPOUT_ID);
// StormBaseBolt -> StormBaseBoltSecond
topologyBuilder.setBolt(COUNT_BOLT_ID, boltSecond, 4).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
// StormBaseBoltSecond -> StormBaseBoltThird
topologyBuilder.setBolt(REPORT_BOLT_ID, boltThird).globalGrouping(COUNT_BOLT_ID);
storm的处理保障机制:
1、spout的可靠性
spout会记录它所发射出去的tuple,当下游任意一个bolt处理失败时spout能够重新发射该tuple。在spout的nextTuple()发送一个tuple时,为实现可靠消息处理需要给每个spout发出的tuple带上唯一ID,并将该ID作为参数传递给SpoutOutputCollector的emit()方法:collector.emit(new Values("value1","value2"), tupleID);
实际上Values extends ArrayList<Object>
保障过程中,每个bolt每收到一个tuple,都要向上游应答或报错,在tuple树上的所有bolt都确认应答,spout才会隐式调用ack()方法表明这条消息(一条完整的流)已经处理完毕,将会对编号ID的消息应答确认;处理报错、超时则会调用fail()方法。
2、bolt的可靠性
bolt的可靠消息处理机制包含两个步骤:
a、当发射衍生的tuple,需要锚定读入的tuple
b、当处理消息时,需要应答或报错
可以通过OutputCollector中emit()的一个重载函数锚定或tuple:collector.emit(tuple, new Values(word)); 并且需要调用一次this.collector.ack(tuple)应答。