
Basic Usage of the Storm API


○ Basic Storm API concepts

Storm refers to a user's job as a topology (job = Topology).
A topology is a directed acyclic graph (DAG) of nodes and edges. Nodes come in two kinds: data-source nodes (Spouts) and ordinary computation nodes (Bolts). An edge between nodes is called a stream (Stream), and each record within a stream is called a tuple (Tuple).

The figure below shows a Storm topology:
[Figure: a topology of Spouts (drawn as faucets) and Bolts]
Each faucet represents a Spout, which sends tuples to the Bolts downstream of it; a Bolt processes each tuple and may send new tuples to Bolts further downstream. In a final Bolt you can perform operations that write data to external storage, for example writing it into a database.
On the edges between these Spouts and Bolts, several grouping strategies can be configured:

  1. random dispatch (shuffle grouping)
  2. dispatch according to a rule, for example by the value of a field (fields grouping); see the sketch after this list
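As a minimal sketch of the two grouping styles, here is how a grouping is chosen when a bolt is wired to its upstream component. The classes LineSpout, SplitBolt and CountBolt are hypothetical stand-ins, not part of the example later in this article:

import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class GroupingSketch {
  public static void main(String[] args) {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("lines", new LineSpout(), 2);

    // Shuffle grouping: tuples from "lines" are dispatched to the
    // splitter tasks at random, which simply balances load.
    builder.setBolt("splitter", new SplitBolt(), 4).shuffleGrouping("lines");

    // Fields grouping: tuples with the same value of the "word" field
    // always go to the same counter task, so per-word state stays consistent.
    builder.setBolt("counter", new CountBolt(), 4)
           .fieldsGrouping("splitter", new Fields("word"));
  }
}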

[Figure: the word-count topology: Spout → SplitBolt → CountBolt → DATA-SINK]
The Spout simply reads data; SplitBolt splits each line of the file into words and hands its output to the Storm framework; the framework delivers each resulting tuple to CountBolt, and CountBolt writes the data to the DATA-SINK. In essence this is no different from how MapReduce processes data, but Storm performs streaming computation: data arrives continuously, and an output is produced every time a record arrives.

○ Basic usage of the Storm API

Public methods of TopologyBuilder
The process of creating and submitting a topology:

  1. Create a TopologyBuilder object with the new keyword
  2. Call the setSpout method to set up the Spouts
  3. Call the setBolt method to set up the Bolts
  4. Call the createTopology method to obtain a StormTopology object and pass it to the submitTopology method as an input parameter
    WordCountTopology.java
package storm.starter;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.ShellBolt;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import storm.starter.spout.RandomSentenceSpout;

import java.util.HashMap;
import java.util.Map;

public class WordCountTopology {
  // SplitSentence derives from ShellBolt, which interacts with a
  // user-specified script over standard input/output so that the actual
  // computation logic can run in an external (here, Python) process.
  public static class SplitSentence extends ShellBolt implements IRichBolt {

    public SplitSentence() {
      super("python", "splitsentence.py");
      /* The workflow: ShellBolt launches the command given here
         ("python splitsentence.py"), serializes every tuple the bolt
         receives and writes it to the script's standard input; the
         script serializes whatever it wants to emit and prints it to
         standard output, where ShellBolt captures it. */
    }

    @Override
    // Declare the output fields so they can be handed downstream.
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
      return null;
    }
  }

  // WordCount derives from BaseBasicBolt.
  public static class WordCount extends BaseBasicBolt {
    Map<String, Integer> counts = new HashMap<String, Integer>();

    @Override
    // Implement the execute method.
    public void execute(Tuple tuple, BasicOutputCollector collector) {
      // getString(0) fetches the first field of the tuple.
      String word = tuple.getString(0);
      // Track in memory how many times this word has appeared.
      Integer count = counts.get(word);
      if (count == null)
        count = 0;
      count++;
      counts.put(word, count);
      collector.emit(new Values(word, count)); // emit the word and its running count
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word", "count"));
    }
  }

  public static void main(String[] args) throws Exception {

    // Construct a TopologyBuilder object.
    TopologyBuilder builder = new TopologyBuilder();

    // Add a RandomSentenceSpout with id "spout" and parallelism 5.
    builder.setSpout("spout", new RandomSentenceSpout(), 5);

    // Add a SplitSentence bolt with id "split" and parallelism 8. Its
    // upstream is "spout" and its grouping is shuffleGrouping: we do not
    // care how tuples are distributed, random dispatch is fine.
    builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");

    // Add a WordCount bolt with id "count" and parallelism 12. Its
    // upstream is "split" and its grouping is fieldsGrouping on the
    // "word" field: as described earlier, tuples with the same "word"
    // are sent to the same bolt task, which keeps the counts correct.
    builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

    Config conf = new Config();
    conf.setDebug(true);

    if (args != null && args.length > 0) {
      conf.setNumWorkers(3);

      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
    }
    else {
      conf.setMaxTaskParallelism(3);

      // Create a LocalCluster object.
      LocalCluster cluster = new LocalCluster();

      // Submit the topology to it.
      cluster.submitTopology("word-count", conf, builder.createTopology());

      // Sleep for 10 s, i.e. let the topology run for 10 s.
      Thread.sleep(10000);

      // Shut the topology down.
      cluster.shutdown();
    }
  }
}

In general, a streaming service is long-running and never shut down; here, however, the topology is only a local test, so it runs for 10 seconds and is then shut down.
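As a usage note: when such a topology is packaged into a jar, the cluster branch of main() is normally triggered through Storm's command-line client, e.g. storm jar storm-starter.jar storm.starter.WordCountTopology word-count (the jar name here is illustrative). Because a topology-name argument is present, the code takes the StormSubmitter path instead of LocalCluster.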

Writing a spout
Writing a spout is simple: derive from BaseRichSpout and implement three interfaces: declare which fields are output, perform initialization, and implement the logic that reads data and emits it downstream on each call.
RandomSentenceSpout.java

package storm.starter.spout;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

import java.util.Map;
import java.util.Random;

// RandomSentenceSpout derives from BaseRichSpout and implements a few methods.
public class RandomSentenceSpout extends BaseRichSpout {
  SpoutOutputCollector _collector;
  Random _rand;


  @Override
  // The framework calls open when the spout is initialized. Because this
  // example generates its output in memory, nothing special happens here:
  // we just store the collector and create a Random object.
  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    _collector = collector;
    _rand = new Random();
  }

  @Override
  // The framework keeps calling nextTuple continuously; in it the user
  // reads data and sends it downstream with _collector.emit.
  public void nextTuple() {
    Utils.sleep(100); // sleep for 100 ms
    String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
        "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
    // Pick a random sentence and send it downstream.
    String sentence = sentences[_rand.nextInt(sentences.length)];
    _collector.emit(new Values(sentence));
  }

  @Override
  public void ack(Object id) {
  }

  @Override
  public void fail(Object id) {
  }

  @Override
  // Tell the framework which fields this spout outputs; here a single
  // field named "word" is declared.
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
  }

}
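For contrast with the purely in-memory RandomSentenceSpout, here is a minimal hypothetical sketch of a spout whose open method does real initialization work, emitting one tuple per line of a local text file. The class name FileLineSpout and the path /tmp/input.txt are made up for illustration:

package storm.starter.spout;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Map;

// Hypothetical spout: emits one tuple per line of a local file.
public class FileLineSpout extends BaseRichSpout {
  SpoutOutputCollector _collector;
  BufferedReader _reader;

  @Override
  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    _collector = collector;
    try {
      // Real initialization work belongs in open: opening files,
      // connecting to queues or databases, and so on.
      _reader = new BufferedReader(new FileReader("/tmp/input.txt")); // illustrative path
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  @Override
  public void nextTuple() {
    try {
      String line = _reader.readLine();
      if (line != null) {
        _collector.emit(new Values(line)); // one tuple per line
      }
      // When the file is exhausted, nextTuple simply emits nothing;
      // the framework keeps calling it anyway.
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("line"));
  }
}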

Where should the script be placed? As a rule, it goes in the multilang/resources directory of your project.
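A typical (illustrative) layout inside the project would be:

multilang/resources/storm.py
multilang/resources/splitsentence.py

The build typically copies everything under multilang/resources into the jar's resources directory, where ShellBolt expects to find the script.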

splitsentence.py

import storm  # load the storm.py helper module

class SplitSentenceBolt(storm.BasicBolt):
    def process(self, tup):  # implement a process method that splits each line of data
        words = tup.values[0].split(" ")
        for word in words:
            storm.emit([word])  # emit each word downstream

SplitSentenceBolt().run()
# That is all it takes to implement word splitting.

storm.py
storm.py's functionality is simple: it reads input and JSON-deserializes it, and it serializes its output and prints it to standard output.
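Concretely, every message on the pipe is a single JSON object followed by a line containing only end (this is what sendMsgToParent below produces). For example, when splitsentence.py emits a word, the bolt's standard output carries roughly the following (the anchor id and the word are illustrative):

{"command": "emit", "anchors": ["6955588064817826057"], "tuple": ["moon"]}
end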

import sys
import os
import traceback
from collections import deque

try:
    import simplejson as json
except ImportError:
    import json

json_encode = lambda x: json.dumps(x)
json_decode = lambda x: json.loads(x)

# reads lines until the "end" sentinel and reconstructs newlines appropriately
def readMsg():
    msg = ""
    while True:
        line = sys.stdin.readline()[0:-1]
        if line == "end":
            break
        msg = msg + line + "\n"
    return json_decode(msg[0:-1])

MODE = None
ANCHOR_TUPLE = None

# queue up commands we read while trying to read taskids
pending_commands = deque()

def readTaskIds():
    if pending_taskids:
        return pending_taskids.popleft()
    else:
        msg = readMsg()
        while type(msg) is not list:
            pending_commands.append(msg)
            msg = readMsg()
        return msg

# queue up taskids we read while trying to read commands/tuples
pending_taskids = deque()

def readCommand():
    if pending_commands:
        return pending_commands.popleft()
    else:
        msg = readMsg()
        while type(msg) is list:
            pending_taskids.append(msg)
            msg = readMsg()
        return msg

def readTuple():
    cmd = readCommand()
    return Tuple(cmd["id"], cmd["comp"], cmd["stream"], cmd["task"], cmd["tuple"])

# write one JSON message followed by the "end" sentinel to standard output
def sendMsgToParent(msg):
    print json_encode(msg)
    print "end"
    sys.stdout.flush()

def sync():
    sendMsgToParent({'command': 'sync'})

def sendpid(heartbeatdir):
    pid = os.getpid()
    sendMsgToParent({'pid': pid})
    open(heartbeatdir + "/" + str(pid), "w").close()

def emit(*args, **kwargs):
    __emit(*args, **kwargs)
    return readTaskIds()

def emitDirect(task, *args, **kwargs):
    kwargs["directTask"] = task
    __emit(*args, **kwargs)

def __emit(*args, **kwargs):
    global MODE
    if MODE == Bolt:
        emitBolt(*args, **kwargs)
    elif MODE == Spout:
        emitSpout(*args, **kwargs)

def emitBolt(tup, stream=None, anchors = [], directTask=None):
    global ANCHOR_TUPLE
    if ANCHOR_TUPLE is not None:
        anchors = [ANCHOR_TUPLE]
    m = {"command": "emit"}
    if stream is not None:
        m["stream"] = stream
    m["anchors"] = map(lambda a: a.id, anchors)
    if directTask is not None:
        m["task"] = directTask
    m["tuple"] = tup
    sendMsgToParent(m)

def emitSpout(tup, stream=None, id=None, directTask=None):
    m = {"command": "emit"}
    if id is not None:
        m["id"] = id
    if stream is not None:
        m["stream"] = stream
    if directTask is not None:
        m["task"] = directTask
    m["tuple"] = tup
    sendMsgToParent(m)

def ack(tup):
    sendMsgToParent({"command": "ack", "id": tup.id})

def fail(tup):
    sendMsgToParent({"command": "fail", "id": tup.id})

def log(msg):
    sendMsgToParent({"command": "log", "msg": msg})

# read the initial handshake: pid directory, component config and context
def initComponent():
    setupInfo = readMsg()
    sendpid(setupInfo['pidDir'])
    return [setupInfo['conf'], setupInfo['context']]

class Tuple:
    def __init__(self, id, component, stream, task, values):
        self.id = id
        self.component = component
        self.stream = stream
        self.task = task
        self.values = values

    def __repr__(self):
        return '<%s%s>' % (
            self.__class__.__name__,
            ''.join(' %s=%r' % (k, self.__dict__[k]) for k in sorted(self.__dict__.keys())))

class Bolt:
    def initialize(self, stormconf, context):
        pass

    def process(self, tuple):
        pass

    def run(self):
        global MODE
        MODE = Bolt
        conf, context = initComponent()
        self.initialize(conf, context)
        try:
            while True:
                tup = readTuple()
                self.process(tup)
        except Exception, e:
            log(traceback.format_exc(e))

class BasicBolt:
    def initialize(self, stormconf, context):
        pass

    def process(self, tuple):
        pass

    def run(self):
        global MODE
        MODE = Bolt
        global ANCHOR_TUPLE
        conf, context = initComponent()
        self.initialize(conf, context)
        try:
            while True:
                tup = readTuple()
                ANCHOR_TUPLE = tup
                self.process(tup)
                ack(tup)
        except Exception, e:
            log(traceback.format_exc(e))

class Spout:
    def initialize(self, conf, context):
        pass

    def ack(self, id):
        pass

    def fail(self, id):
        pass

    def nextTuple(self):
        pass

    def run(self):
        global MODE
        MODE = Spout
        conf, context = initComponent()
        self.initialize(conf, context)
        try:
            while True:
                msg = readCommand()
                if msg["command"] == "next":
                    self.nextTuple()
                if msg["command"] == "ack":
                    self.ack(msg["id"])
                if msg["command"] == "fail":
                    self.fail(msg["id"])
                sync()
        except Exception, e:
            log(traceback.format_exc(e))


Original article: http://www.cnblogs.com/XBlack/p/5041399.html
