码迷,mamicode.com
首页 > 其他好文 > 详细

实时计算框架之二:Storm之入门实例

时间:2015-08-19 17:54:44      阅读:196      评论:0      收藏:0      [点我收藏+]

标签:

2 Storm的基本组成部分

经过前一部分,我们可以搭建起Storm的执行环境,并可以通过浏览器打开对应的管理页面。如果已经成功的到达这一步,那么恭喜你,Storm的框架已经搭建成功,接下来就是Storm具体该如何应用了。首先,先来介绍一下Storm的核心模块,我们基本需要依赖这几个模块来对应进行开发。

 

2.1 拓扑 - Topology

我们需要向Storm中提交一个实时运行的应用程序,由Storm来执行这个应用程序。那么,这个应用程序称作一个拓扑(Topology)。

为什么叫做拓扑呢?拓扑在计算机网络中,是将计算机和通信设备抽象为一个点,将传输介质抽象为一条线,由点和线组成的几何图形就是计算机网路的拓扑结构。我们提交的一个应用程序,是执行在Storm的集群上的,这个应用程序在运行的状态如下图。

技术分享

关于Spout和Bolt会在下面讲到,从图上直观来看,我们执行的应用程序就是一个拓扑。


2.2 喷口 - Spout

Spout是整个Topology的数据流来源,通常来说,Spout会从外部数据源中读取数据,然后转换为Topology内部的数据格式,再发送给Bolt进行计算处理。

Spout主要是有一个nextTuple函数,Topology会不断调用此函数,所以相关数据获取工作写在这个函数之内即可。

 

2.3 螺栓 – Bolt

在Topology中,所有的处理都在Bolt中完成,Bolt是Stream处理的节点。Bolt从Topology中获取数据,并进行处理。

Bolt主要有execute函数,在接收到数据后,会调用此函数,对接收的数据进行相关处理。

 

2.4 流 – Stream

Stream即一个无界的元组序列,一个接一个的序列,就构成了流。Spout和Bolt的处理数据即是流。

 

2.5 流分组 - Stream grouping

流分组定义了如何在Bolt的任务之间进行分发。就是说某个数据应该交由哪个固定的Worker来进行处理,这个在后面的实例中有个简单的例子,很容易理解。

技术分享


3 Storm实例

了解完上面几个部分,可能会有点不是很懂,现在接合一个具体的例子,来详细说明这几部分是如何接合及应用的。


3.1 需求描述

国外某地区,需要针对当地居民的名字做一个统计,即统计每个名字使用的次数。例如说,当地居民(假设为10人)有以下名字出现:

nathan

mike

jackson

jackson

mike

mike

golda

bertels

golda

bertels

那么,可以统计出以下结果信息:

nathan               1

mike                   3

jackson              2

golda                  2

bertels               2

另外,为了看到计算的结果,在处理每个名字时,为每个名字添加”!!!”,并进行打印输出。例如说nathan的打印结果为nathan!!!。

现在假设当前有N个人进行统计,名字假设还是只有这五种,那么在如何使用Storm来进行计算统计并打印结果呢?

 

3.2 Stream实现

由于现在作为数据进行传输的只有名字,所以当前Stream使用字符串既可。

 

3.3 Spout实现

根据上面需求的描述,Spout主要的任务是在名字数组String[] names = new String[]{"nathan", "mike","jackson", "golda", "bertels"};中随机活取N个名字,并发送到Bolt进行统计计算,并添加”!!!”后打印出来。所以具体实现如下。

[java] view plaincopy技术分享技术分享

  1. package storm.spout;  

  2.   

  3. import java.util.Map;  

  4. import java.util.Random;  

  5.   

  6. import backtype.storm.spout.SpoutOutputCollector;  

  7. import backtype.storm.task.TopologyContext;  

  8. import backtype.storm.topology.OutputFieldsDeclarer;  

  9. import backtype.storm.topology.base.BaseRichSpout;  

  10. import backtype.storm.tuple.Fields;  

  11. import backtype.storm.tuple.Values;  

  12. import backtype.storm.utils.Utils;  

  13.   

  14. public class NamesSpout extends BaseRichSpout {  

  15.     SpoutOutputCollector m_collector;  

  16.   

  17.     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {  

  18.         m_collector = collector;  

  19.     }  

  20.   

  21.     public void nextTuple() {  

  22.         final String[] names = new String[]{"nathan""mike""jackson""golda""bertels"};  

  23.         final Random rand = new Random();  

  24.         final String name = names[rand.nextInt(names.length)];  

  25.           

  26.         Utils.sleep(10);  

  27.         m_collector.emit(new Values(name));  

  28.     }  

  29.   

  30.     public void declareOutputFields(OutputFieldsDeclarer declarer) {  

  31.         declarer.declare(new Fields("name"));  

  32.     }  

  33.   

  34. }  

首先,自定义的Spout需要继承Storm的相关Spout的接口,例如BaseRichSpout或者IRichSpout等。

其次,在open函数中,实现资源的初始化等操作,这里没有特殊操作,只将流获取绑定到本身Collector上即可。

第三,声明输出流的格式,即 declareOutputFields函数。

最后,实现流的生成操作nextTuple函数,这里在人名中随机选择一个,并通过emit进行发送,Bolt接收到这个人名,并进行下一步的处理。

至此,一个简单的Spout就完成了。

 

3.4 Bolt实现

Bolt的操作分为两部分,第一部分是统计计算,第二部分是进行”!!!”的添加。其也需要继承Storm对应的类BaseRichBolt或者其他的接口。具体实现如下。

[java] view plaincopy技术分享技术分享

  1. package storm.bolt;  

  2.   

  3. import java.util.HashMap;  

  4. import java.util.Map;  

  5.   

  6. import backtype.storm.task.OutputCollector;  

  7. import backtype.storm.task.TopologyContext;  

  8. import backtype.storm.topology.OutputFieldsDeclarer;  

  9. import backtype.storm.topology.base.BaseRichBolt;  

  10. import backtype.storm.tuple.Fields;  

  11. import backtype.storm.tuple.Tuple;  

  12.   

  13. public class ExclamationBolt extends BaseRichBolt {  

  14.     OutputCollector m_collector;  

  15.     public Map<String, Integer> NameCountMap = new HashMap<String, Integer>();  

  16.   

  17.     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {  

  18.         m_collector = collector;  

  19.     }  

  20.   

  21.     public void execute(Tuple input) {  

  22.         // 第一步,统计计算  

  23.         Integer value = 0;  

  24.         if (NameCountMap.containsKey(input.getString(0))) {  

  25.             value = NameCountMap.get(input.getString(0));  

  26.         }  

  27.         NameCountMap.put(input.getString(0), ++value);  

  28.           

  29.         // 第二步,输出  

  30.         System.out.println(input.getString(0) + "!!!");  

  31.         System.out.println(value);  

  32.           

  33.         m_collector.ack(input);  

  34.     }  

  35.   

  36.     public void declareOutputFields(OutputFieldsDeclarer declarer) {  

  37.         declarer.declare(new Fields("name"));  

  38.     }  

  39. }  

关于初始化的prepare函数和声明输出流的函数declareOutputFields不在重新说明,和Spout的相关函数类似。

这里定义了一个map,用来统计名字出现的次数,另外名字修改后会打印到控制台信息中。

统计计算部分都在execute接口中实现,较复杂的情况下,可以拆分为多个Bolt来分别执行不同的计算部分。


3.5 Topology实现

主要的两大部分都已实现完毕,那么该如何将Topology执行起来呢?Topology执行分两种模式,第一个是本地模式,即Debug模式;第二个是提交到Storm框架上,远程执行。

首先按照本地模式来讲解,远程模式可以增加一个执行参数来区分。具体实现如下。

[java] view plaincopy技术分享技术分享

  1. package storm.topology;  

  2.   

  3. import storm.bolt.ExclamationBolt;  

  4. import storm.spout.NamesSpout;  

  5. import backtype.storm.Config;  

  6. import backtype.storm.LocalCluster;  

  7. import backtype.storm.StormSubmitter;  

  8. import backtype.storm.topology.TopologyBuilder;  

  9. import backtype.storm.utils.Utils;  

  10.   

  11. public class ExclamationTopology {  

  12.   

  13.     public static void main(String[] args) throws Exception {  

  14.         TopologyBuilder builder =  new TopologyBuilder();  

  15.         builder.setSpout("name"new NamesSpout(), 5);  

  16.         builder.setBolt("exclaim"new ExclamationBolt(), 5).shuffleGrouping("name");  

  17.           

  18.         Config conf = new Config();  

  19.         conf.setDebug(false);  

  20.         conf.put(Config.TOPOLOGY_DEBUG, false);  

  21.           

  22.         if (args != null && args.length > 0) {  

  23.             conf.setNumWorkers(10);  

  24.             StormSubmitter.submitTopology(args[0], conf, builder.createTopology());  

  25.         } else {  

  26.             LocalCluster cluster = new LocalCluster();  

  27.             cluster.submitTopology("test", conf, builder.createTopology());  

  28.             Utils.sleep(10000);  

  29.             cluster.killTopology("test");  

  30.             cluster.shutdown();  

  31.         }  

  32.     }  

  33.   

  34. }  

Topology中即实现主方法main,其中创建Topology,Topology要把Spout和Bolt的关系建立起来,建立关系的方法主要是通过名称建立。例如指定Spout输出流的处理Bolt时,通过设置shuffleGrouping中的名字即可,即将名字设置为Spout的名字”name”。

最后,载入配置,并执行。这里通过参数区分本地模式和远程模式,如果含有参数,则为远程模式,否则是本地模式。

完成这部分后,点击Eclipse的执行按钮,即可将Topology执行起来,输出窗口中也可以看到Bolt中打印的消息。

技术分享


3.6 Stream grouping实现

接下来,是一个很有趣的部分。Stream grouping对Stream进行分组,具体是怎么用的呢?

先看一下上一次执行的结果,如下:

技术分享

可以看到,mike两次结果都为3,这明显的是错误的,这是为什么呢?

回来看我们Topology部分的实现,有这一行代码:

[java] view plaincopy技术分享技术分享

  1. builder.setBolt("exclaim"new ExclamationBolt(), 5).shuffleGrouping("name");  

可以看到后面有个shuffleGrouping,这个就是所谓的Stream grouping了。当前设置的是随机分组,那么map中的统计数目自然也就是错乱的了。我们将这行代码,换成以下形式:

[java] view plaincopy技术分享技术分享

  1. builder.setBolt("exclaim"new ExclamationBolt(), 5).fieldsGrouping("name"new Fields("name"));  

那么,新的结果如下:
技术分享

可以发现结果都是正确的,复合我们的计算要求。

Storm里面的Stream分组方式有7种,具体信息可以去官网查看文档,另外,也可以自己定义需要的分组方式。


4 打包与执行

如何创建一个Topology和Topology的执行部分都已经描述完毕,接下来就是如何将这个Topology提交到Storm框架里面来执行了。现在,需要用到我们之前下载安装的Maven工具来进行打包。

 

4.1 Maven打包

4.1.1 安装Eclipse的Maven插件

在Eclipse的菜单栏,选择【Help】中的【Install New Software】,如下图:

技术分享

在【Workwith】中输入Maven的更新站点:

http://download.eclipse.org/technology/m2e/release

然后选择要安装的组件后,一直点击【Next】,等待安装完成后,重启Eclipse即可。

 

4.1.2 进行打包

在Eclipse中,右键点击要打包的Project,并在右键菜单中选择【Run as】中的【Maven build】,如下图所示。

技术分享

增加参数:clean package,当在输出窗口中看到Success,则表示打包成功,否则根据错误提示进行修改并重新打包。


4.2 提交jar包到Storm上

打开控制台,进入到工程目录,并进入到工程目录下的target目录下,使用ls命令查看所有文件如下:

技术分享

其中,StormDemo-0.0.1-SNAPSHOT.jar文件就是我们要提交到Storm上执行的jar包。使用以下命令进行jar包提交。

storm jar StormDemo-0.0.1-SNAPSHOT.jarstorm.topology.ExclamationTopology demo

其中,storm.topology.ExclamationTopology是jar包主入口所在位置,后面的demo为参数,前面我们提到过,使用该参数来区分本地模式和远程模式。


4.3 查看提交及执行结果

提交后,可以在Storm的网页上看到对应的执行情况。

技术分享


5 进一步思考

一直以来,基本全在C++这条不归路上奋斗着,随着这段时间来的各种突发感受,发现了更多以前见识还是太过短浅。“完美”固然是一个非常宏伟的目标,但是在这种快速开发迭代出产品的情况下,却应该收起这种心态。一直看着一些缺陷漏洞百出的产品,会产生一种越来越疲惫的心态吧?

试着去理解另外一种心态,以“曳光弹”出发,快速开发,快速迭代,逐步完善,向完美靠拢。

时代在改变,技术在爆发,走向未来吧!


实时计算框架之二:Storm之入门实例

标签:

原文地址:http://my.oschina.net/HIJAY/blog/494685

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!