Storm集群表面类似Hadoop集群。但在Hadoop上你运行的是”MapReduce jobs”,在Storm上你运行的是”topologies”。”Jobs”和”topologies”是大不同的,一个关键不同是一个MapReduce的Job最终会结束,而一个topology永远处理消息(或直到你kill它)。
Storm集群有两种节点:控制(master)节点和工作者(worker)节点。控制节点运行一个称之为”Nimbus”的后台程序,它类似于Haddop的”JobTracker”。Nimbus负责在集群范围内分发代码、为worker分配任务和故障监测。
注意:Hadoop 2.0以前使用JobTrack来进行Job的分发,但2.x之后就使用了全新的资源调度框架,即yarn,这点尤其需要注意。
每个工作者节点运行一个称之”Supervisor”的后台程序。Supervisor监听分配给它所在机器的工作,基于Nimbus分配给它的事情来决定启动或停止工作者进程。每个工作者进程执行一个topology的子集(也就是一个子拓扑结构);一个运行中的topology由许多跨多个机器的工作者进程组成。
一个Zookeeper集群负责Nimbus和多个Supervisor之间的所有协调工作(一个完整的拓扑可能被分为多个子拓扑并由多个supervisor完成)。
此外,Nimbus后台程序和Supervisor后台程序都是快速失败(fail-fast)和无状态的;所有状态维持在Zookeeper或本地磁盘。这意味着你可以kill -9杀掉nimbus进程和supervisor进程,然后重启,它们将恢复状态并继续工作,就像什么也没发生。这种设计使storm极其稳定。这种设计中Master并没有直接和worker通信,而是借助一个中介Zookeeper,这样一来可以分离master和worker的依赖,将状态信息存放在zookeeper集群内以快速恢复任何失败的一方。
可以参考官方文档:http://storm.apache.org/releases/1.0.6/Setting-up-a-Storm-cluster.html
官方文档对于配置中的解释是非常清晰明了和容易理解的。
下载地址:https://storm.apache.org/downloads.html
需要确保已经安装好了zookeeper环境,在我的环境中已经搭建好了zookeeper集群环境。
1.解压
[uplooking@uplooking01 soft]$ tar -zxvf apache-storm-1.0.2.tar.gz -C ../app/
[uplooking@uplooking01 app]$ mv apache-storm-1.0.2/ storm
2.修改配置文件
# storm-env.sh
export JAVA_HOME=/opt/jdk
export STORM_CONF_DIR="/home/uplooking/app/storm/conf"
# storm.yaml
storm.zookeeper.servers:
- "uplooking01"
- "uplooking02"
- "uplooking03"
nimbus.seeds: ["uplooking01", "uplooking02"]
storm.local.dir: "/home/uplooking/data/storm"
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
3.创建storm.local.dir
mkdir -p /home/uplooing/data/storm
4.配置环境变量
# .bash_profile
export STORM_HOME=/home/uplooking/app/storm
export PATH=$PATH:$STORM_HOME/bin
# 将其同步到其它节点
scp .bash_profile uplooking@uplooking02:/home/uplooking
scp .bash_profile uplooking@uplooking03:/home/uplooking
5.复制storm安装目录到其它节点
scp -r storm/ uplooking@uplooking02:/home/uplooking/app
scp -r storm/ uplooking@uplooking03:/home/uplooking/app
6.启动storm集群
# uplooking01
storm nimbus &
storm ui &
# uplooking02
storm nimbus &
storm supervisor &
# uplooking03
storm supervisor &
7.启动logviewer(可选)
在所有从节点执行"nohup bin/storm logviewer >/dev/null 2>&1 &"启动log后台程序,并放到后台执行。
(nimbus节点可以不用启动logviewer进程,因为logviewer进程主要是为了方便查看任务的执行日志,这些执行日志都在supervisor节点上)。
因为启动了storm ui,在地址栏中输入:http://uplooking01:8080就可以查看storm集群的相关信息:
同时查看其显示的信息,对于我们前面的配置也有一个十分直观的体现。
使用前面的计算总和的例子:
package cn.xpleaf.bigdata.storm.remote;
import cn.xpleaf.bigdata.storm.utils.StormUtil;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.Date;
import java.util.Map;
/**
* 1°、实现数字累加求和的案例:数据源不断产生递增数字,对产生的数字累加求和。
* <p>
* Storm组件:Spout、Bolt、数据是Tuple,使用main中的Topology将spout和bolt进行关联
* MapReduce的组件:Mapper和Reducer、数据是Writable,通过一个main中的job将二者关联
* <p>
* 适配器模式(Adapter):BaseRichSpout,其对继承接口中一些没必要的方法进行了重写,但其重写的代码没有实现任何功能。
* 我们称这为适配器模式
*/
public class StormSumTopology {
/**
* 数据源
*/
static class OrderSpout extends BaseRichSpout {
private Map conf; // 当前组件配置信息
private TopologyContext context; // 当前组件上下文对象
private SpoutOutputCollector collector; // 发送tuple的组件
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.conf = conf;
this.context = context;
this.collector = collector;
}
/**
* 接收数据的核心方法
*/
@Override
public void nextTuple() {
long num = 0;
while (true) {
num++;
StormUtil.sleep(1000);
System.out.println("当前时间" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "产生的订单金额:" + num);
this.collector.emit(new Values(num));
}
}
/**
* 是对发送出去的数据的描述schema
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("order_cost"));
}
}
/**
* 计算和的Bolt节点
*/
static class SumBolt extends BaseRichBolt {
private Map conf; // 当前组件配置信息
private TopologyContext context; // 当前组件上下文对象
private OutputCollector collector; // 发送tuple的组件
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.conf = conf;
this.context = context;
this.collector = collector;
}
private Long sumOrderCost = 0L;
/**
* 处理数据的核心方法
*/
@Override
public void execute(Tuple input) {
Long orderCost = input.getLongByField("order_cost");
sumOrderCost += orderCost;
System.out.println("商城网站到目前" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "的商品总交易额" + sumOrderCost);
StormUtil.sleep(1000);
}
/**
* 如果当前bolt为最后一个处理单元,该方法可以不用管
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
/**
* 构建拓扑,相当于在MapReduce中构建Job
*/
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
/**
* 设置spout和bolt的dag(有向无环图)
*/
builder.setSpout("id_order_spout", new OrderSpout());
builder.setBolt("id_sum_bolt", new SumBolt())
.shuffleGrouping("id_order_spout"); // 通过不同的数据流转方式,来指定数据的上游组件
// 使用builder构建topology
StormTopology topology = builder.createTopology();
String topologyName = StormSumTopology.class.getSimpleName(); // 拓扑的名称
Config config = new Config(); // Config()对象继承自HashMap,但本身封装了一些基本的配置
// 启动topology,本地启动使用LocalCluster,集群启动使用StormSubmitter
if (args == null || args.length < 1) { // 没有参数时使用本地模式,有参数时使用集群模式
LocalCluster localCluster = new LocalCluster(); // 本地开发模式,创建的对象为LocalCluster
localCluster.submitTopology(topologyName, config, topology);
} else {
StormSubmitter.submitTopology(topologyName, config, topology);
}
}
}
可以看到区别在于后面作业的提供方式,使用集群的方式为:StormSubmitter.submitTopology(topologyName, config, topology);
。
这里使用Maven的方式进行打包,确保pom.xml中已经配置了storm-core
依赖的可见范围和相关的打包插件:
<!--依赖-->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.0.2</version>
<!--可见范围为provided时,打包时不会对依赖进行打包,但在本地测试开发时应该注释掉,否则程序无法运行-->
<!--另外不需要打包storm的依赖是因为,集群中已经有storm的相关依赖jar包了-->
<scope>provided</scope>
</dependency>
<!--打包插件-->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<!-- 将依赖也一起打包 -->
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<!-- 可以在这里指定运行的主类,这样在打包为jar包后就可以不用指定需要运行的类 -->
<mainClass>
</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
在idea中配置maven打包的命令:
clean package -DskipTests
之后就可以打包并上传到我们的集群环境中了。
[uplooking@uplooking01 storm]$ cn.xpleaf.bigdata.storm.remote.StormSumTopology cluster
-bash: cn.xpleaf.bigdata.storm.remote.StormSumTopology: command not found
[uplooking@uplooking01 storm]$ storm jar storm-study-1.0-SNAPSHOT-jar-with-dependencies.jar cn.xpleaf.bigdata.storm.remote.StormSumTopology cluster
Running: /opt/jdk/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/home/uplooking/app/storm -Dstorm.log.dir=/home/uplooking/app/storm/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /home/uplooking/app/storm/lib/log4j-over-slf4j-1.6.6.jar:/home/uplooking/app/storm/lib/reflectasm-1.10.1.jar:/home/uplooking/app/storm/lib/disruptor-3.3.2.jar:/home/uplooking/app/storm/lib/clojure-1.7.0.jar:/home/uplooking/app/storm/lib/objenesis-2.1.jar:/home/uplooking/app/storm/lib/log4j-slf4j-impl-2.1.jar:/home/uplooking/app/storm/lib/slf4j-api-1.7.7.jar:/home/uplooking/app/storm/lib/log4j-core-2.1.jar:/home/uplooking/app/storm/lib/storm-core-1.0.2.jar:/home/uplooking/app/storm/lib/storm-rename-hack-1.0.2.jar:/home/uplooking/app/storm/lib/kryo-3.0.3.jar:/home/uplooking/app/storm/lib/asm-5.0.3.jar:/home/uplooking/app/storm/lib/log4j-api-2.1.jar:/home/uplooking/app/storm/lib/servlet-api-2.5.jar:/home/uplooking/app/storm/lib/minlog-1.3.0.jar:storm-study-1.0-SNAPSHOT-jar-with-dependencies.jar:/home/uplooking/app/storm/conf:/home/uplooking/app/storm/bin -Dstorm.jar=storm-study-1.0-SNAPSHOT-jar-with-dependencies.jar cn.xpleaf.bigdata.storm.remote.StormSumTopology cluster
842 [main] INFO o.a.s.StormSubmitter - Generated ZooKeeper secret payload for MD5-digest: -8973061592627522790:-5130577098800003128
934 [main] INFO o.a.s.s.a.AuthUtils - Got AutoCreds []
1036 [main] INFO o.a.s.StormSubmitter - Uploading topology jar storm-study-1.0-SNAPSHOT-jar-with-dependencies.jar to assigned location: /home/uplooking/data/storm/nimbus/inbox/stormjar-f51fd883-fe67-4cb8-8f61-67c053620fd6.jar
1064 [main] INFO o.a.s.StormSubmitter - Successfully uploaded topology jar to assigned location: /home/uplooking/data/storm/nimbus/inbox/stormjar-f51fd883-fe67-4cb8-8f61-67c053620fd6.jar
1064 [main] INFO o.a.s.StormSubmitter - Submitting topology StormSumTopology in distributed mode with conf {"storm.zookeeper.topology.auth.scheme":"digest","storm.zookeeper.topology.auth.payload":"-8973061592627522790:-5130577098800003128"}
1710 [main] INFO o.a.s.StormSubmitter - Finished submitting topology: StormSumTopology
注意看输出,jar包被上传到/home/uplooking/data/storm/nimbus/inbox/stormjar-f51fd883-fe67-4cb8-8f61-67c053620fd6.jar
,后面可以在leader
节点中查看到有该jar包:
[uplooking@uplooking02 inbox]$ pwd
/home/uplooking/data/storm/nimbus/inbox
[uplooking@uplooking02 inbox]$ ls
stormjar-f51fd883-fe67-4cb8-8f61-67c053620fd6.jar
因为此时uplooking01
节点不是leader
,所以在其上面是没有该jar包的,这点需要注意。
可以在storm ui中查看此时的集群状态信息:
再查看详细的Topology信息:
再查看spout或者bolt的详细信息:
可以看到是在uplooking02
上运行的Executors
,此时可以到该节点上查看输出信息:
[uplooking@uplooking02 6700]$ pwd
/home/uplooking/app/storm/logs/workers-artifacts/StormSumTopology-1-1523548000/6700
[uplooking@uplooking02 6700]$ tail -5 worker.log
2018-04-13 00:39:56.636 STDIO [INFO] 商城网站到目前20180413003956的商品总交易额5054610
2018-04-13 00:39:57.636 STDIO [INFO] 当前时间20180413003957产生的订单金额:3181
2018-04-13 00:39:57.637 STDIO [INFO] 商城网站到目前20180413003957的商品总交易额5057790
2018-04-13 00:39:58.638 STDIO [INFO] 当前时间20180413003958产生的订单金额:3182
2018-04-13 00:39:58.639 STDIO [INFO] 商城网站到目前20180413003958的商品总交易额5060971
需要注意的是,此时在uplooking03
上是没有这些信息的,因为集群将作业交给了uplooking02
上的supervisor
来运行。此外还需要知道的是,在uplooking02
的data目录下也可以查看到有前面的jar包,其是由nimbus
分发过来的:
[uplooking@uplooking02 StormSumTopology-1-1523548000]$ pwd
/home/uplooking/data/storm/supervisor/stormdist/StormSumTopology-1-1523548000
[uplooking@uplooking02 StormSumTopology-1-1523548000]$ ls
stormcode.ser stormconf.ser stormjar.jar
但是在uplooking03
上也是没有的。
另外也可以在uplooking02
上使用jps命令查看到有worker
进程:
[uplooking@uplooking02 ~]$ jps
2224 QuorumPeerMain
1858 Elasticsearch
27427 logviewer
2291 NameNode
27972 LogWriter
27988 worker
25878 nimbus
28006 Jps
26054 supervisor
2552 DFSZKFailoverController
2365 DataNode
2462 JournalNode
对于输出信息的查看,其实也可以在storm ui上直接进行查看,上面的界面,点击6700
的链接,就可以进行查看,但是前提是需要先在uplooking02
上运行了logviewer
:
storm logviewer &
查看到的输出如下:
由前面可以知道,目前worker
运行在uplooking02
上,如果在此节点上直接将该进程kill掉,那么其又会自动进行重启:
[uplooking@uplooking02 ~]$ jps | grep worker
27988 worker
[uplooking@uplooking02 ~]$ kill -9 27988
[uplooking@uplooking02 ~]$ jps | grep worker
kill 27988: 没有那个进程
[uplooking@uplooking02 ~]$ kill 27988: 没有那个进程
[uplooking@uplooking02 ~]$ jps | grep worker
28235 worker
当然如果真的希望停掉Topology作业,有两种方式:
第一种是在storm ui的topology界面中进行操作:
Topology actions中有Kill的操作,点击即可
第二种是在命令行中使用命令进行操作:
[uplooking@uplooking01 ~]$ storm kill
Syntax: [storm kill topology-name [-w wait-time-secs]]
-w后接秒数,表示多少秒后将停止该topology作业
再做进一步的验证,如果把三台主机上除了了worker
进程(nimbus、supervisor等)都关掉,那么此时worker
是可以继续正常运行的,数据也会正常产生,只是此时不同的是,不能够再向storm集群中添加作业了。
Storm笔记整理(三):Storm集群安装部署与Topology作业提交
原文地址:http://blog.51cto.com/xpleaf/2097682