标签:
Storm简介
Storm是一个分布式实时流式框架,大多应用于以下场景:实时分析、在线机器学习、流式计算、分布式RPC ETL(BL分析)等等。同类型的框架有hadoop和spark。hadoop侧重于海量数据的离线计算,spark则更擅长实时迭代计算。要注意的是,storm并不直接处理数据,而是把我们的业务程序(逻辑)放在很多服务器上并发运行,待处理消息被分散到很多服务器上并发处理,以此扩展程序的负载能力。
Direction
简单来说的话,Storm框架包含两个部分。一个是Storm程序,一个是Storm集群。
Storm程序包含两个部分。一个是Spout,而另外一个是Bolt。同时可以有多个Bolt存在。
Storm集群也包含两个角色:Nimbus(集群主节点)和Supervisor(集群从节点)。
这里只介绍Storm程序,Storm集群只简单了解,以后有机会再来详细介绍。
Storm程序
Storm的处理逻辑如下图所示:
接下来我用一个例子来详细介绍Storm程序的编写,使用Storm框架时要提前导入Storm的jar包,这些jar包可以在Apache的官网下载。
业务要求如下:
假设需求要将小写转换为大写并加后缀,例如将 iphone转换为 IPHONE_suffix,我们分为两个步骤做。业务处理流程如下:
1、Spout 读取数据并封装为tuple发送出去
2、Upper_Bolt 将商品名称转换为大写
3、Suffix_Bolt 将大写后的商品名加上一个后缀
4、TopoMain 描述topology的结构以及创建topology并提交给集群
Spout的处理代码如下:
简单起见,Spout的数据不是从外部获取的,而是从内部数组中随机获取。所以将类名命名为RandomSpout
1 public class RandomSpout extends BaseRichSpout { 2 SpoutOutputCollector collector = null; 3 String[] goods = {"iphone","xiaomi","meizu","zhongxing","huawei","moto","sangsung"}; 4 5 /** 6 * 获取消息并发送给下一个组件的方法,会被storm不断地调用 7 * 8 * 从goods中随机取一个商品名称封装到tuple中发送出去 9 */ 10 @Override 11 public void nextTuple() { 12 //随机取到一个商品名称 13 Random random = new Random(); 14 String good = goods[random.nextInt(goods.length)]; 15 16 //封装到tuple中发送出去 17 collector.emit(new Values(good)); 18 19 //休眠一段时间 20 Utils.sleep(2000); 21 } 22 23 //进行初始化,只在开始时调用一次 24 @Override 25 public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { 26 this.collector = collector; 27 } 28 29 /** 30 * 定义tuple的scheme 31 */ 32 @Override 33 public void declareOutputFields(OutputFieldsDeclarer declarer) { 34 declarer.declare(new Fields("src_word")); 35 } 36 }
Upper_Bolt的处理代码如下:
1 /** 2 * 将受到的原始商品名转换为大写再发送出去 3 * @author Shy 4 * 5 */ 6 public class UpperBolt extends BaseBasicBolt { 7 @Override 8 public void execute(Tuple tuple, BasicOutputCollector collector) { 9 //从tuple中拿到数据——原始商品名 10 String src_word = tuple.getString(0); 11 //转换成大写 12 String upper_word = src_word.toUpperCase(); 13 //发送出去 14 collector.emit(new Values(upper_word)); 15 } 16 17 @Override 18 public void declareOutputFields(OutputFieldsDeclarer declarer) { 19 declarer.declare(new Fields("upper_word")); 20 } 21 }
Suffix_Bolt的处理代码如下:
1 /** 2 * 将商品名称添加后缀,将数据写入文件中 3 * @author Shy 4 * 5 */ 6 public class SuffixBolt extends BaseBasicBolt { 7 FileWriter fileWriter = null; 8 public void prepare(Map stormConf, TopologyContext context){ 9 try { 10 fileWriter = new FileWriter("/home/hadoop"+UUID.randomUUID()); 11 } catch (IOException e) { 12 e.printStackTrace(); 13 } 14 } 15 16 @Override 17 public void execute(Tuple tuple, BasicOutputCollector collector) { 18 //从消息元组中拿到上一个组件发送过来的数据 19 String upper_word = tuple.getString(0); 20 21 //给商品名称添加后缀 22 String result = upper_word +"_suffix"; 23 24 //将结果保存到文件 25 try { 26 fileWriter.append(result); 27 fileWriter.append("\n"); 28 fileWriter.flush(); 29 } catch (IOException e) { 30 e.printStackTrace(); 31 } 32 } 33 34 //声明该组件要发送出去的tuple的字段定义 35 @Override 36 public void declareOutputFields(OutputFieldsDeclarer arg0) { 37 // TODO Auto-generated method stub 38 } 39 }
TopoMain的处理代码如下:
1 /** 2 * 描述topology的结构以及创建topology并提交给集群 3 * @author Shy 4 * 5 */ 6 public class TopoMain { 7 8 public static void main(String[] args) throws Exception { 9 10 TopologyBuilder topologyBuilder = new TopologyBuilder(); 11 //设置消息源组件为RandomSpout 12 topologyBuilder.setSpout("randomspout", new RandomSpout(), 4); 13 14 //设置逻辑处理组件UpperBolt,并指定接受randomspout的消息 15 topologyBuilder.setBolt("upperbolt", new UpperBolt(), 4).shuffleGrouping("randomspout"); 16 17 //设置逻辑处理组件SuffixBolt,并指定接收upperbolt的消息 18 topologyBuilder.setBolt("suffixbolt", new SuffixBolt(), 4).shuffleGrouping("upperbolt"); 19 20 //创建一个topology 21 StormTopology topo = topologyBuilder.createTopology(); 22 23 //创建一个storm的配置参数对象 24 Config conf = new Config(); 25 //设置storm集群为这个topo启动的进程数 26 conf.setNumWorkers(4); 27 conf.setDebug(true); 28 conf.setNumAckers(0); 29 30 //提交topo到storm集群中 31 StormSubmitter.submitTopology("demotopo", conf, topo); 32 } 33 }
storm程序写好之后就可以打包成jar包提交到storm集群上面去运行了。
Storm集群
集群搭建包括以下步骤:
1、集群安装,先安装zookeeper集群
2、上传storm安装包到服务器
3、解压安装包
4、修改配置文件
5、启动集群
启动storm集群的步骤:
1、先启动nimbus
bin/storm nimbus 1>/dev/null 2>&1 &
2、启动一个web服务进程
bin/storm ui 1>/dev/null 2>&1 &
3、启动各节点上的supervisor进程
bin/storm supervisor 1>/dev/null 2>&1 &
标签:
原文地址:http://www.cnblogs.com/shen-smile/p/5143564.html