码迷,mamicode.com
首页 > 其他好文 > 详细

Storm 测试

时间:2019-07-19 12:40:50      阅读:96      评论:0      收藏:0      [点我收藏+]

标签:reduce   static   auto   boolean   ash   inf   jni   发送   必须   

本文将学习如何使用java创建Storm拓扑并将其部署到Storm集群。

Storm集群的组件

Storm集群类似于Hadoop集群,只不过 Hadoop 上运行"MapReduce jobs", Storm 上运行"topologies"。
两者最大的差别是,MapReducejobs 最终是完成的,而 topologies 是一直处理消息(或直到你杀死它)。

集群 任务名称 任务时效性
Storm topologies(拓扑) 一直处理消息(或直到你杀死它)
Hadoop MapReduce jobs 最终是完成的

Storm集群上有两种节点:master 和 worker 节点

  • master:
    运行一个名为 Nimbus 的守护进程,
    负责在集群周围分发代码,
    为机器分配任务以及监控故障。
    (类似 Hadoop 的 JobTracker)
  • worker:
    运行一个名为 Supervisor 的守护进程,
    负责监听、并根据需要启动、停止 "Nimbus" 分配给其的任务。
    每个工作进程都执行拓扑的子集。 运行拓扑由分布在许多计算机上的许多工作进程组成。

Nimbus 和 Supervisors 之间的协调是通过 Zookeeper 实现的。
此外,Nimbus 守护程序和 Supervisors 守护程序是 fail-fast 和 stateless;
所有状态都保存在Zookeeper或本地磁盘上。这意味着你可以通过 kill -9 杀死 Nimbus 或者 Supervisors ,但是它们会像没事一样重新开始。
这种设计使Storm集群非常稳定。
技术图片

Topologies

要想在 Storm 上进行实时计算,你需要创建一个 topologies 。
topologies 是一个计算图,topologies中的每个节点包含计算逻辑,并且通过节点之间的连接定义了数据在节点之间的流动方向。

运行拓扑很简单。首先,将所有代码和依赖项打包到一个jar中。然后,运行如下命令:

storm jar all-my-code.jar org.apache.storm.MyTopology arg1 arg2
额 这个命令没啥好解释的....

Streams

Stream 是一个无限的元组序列,是 Storm 抽象的核心。
Storm 提供了以分布式和可靠的方式进行 Stream 传换的原语。
比如将微博 Stream 转换为转换热门主题 Stream。

Storm 为进行 Stream 转换提供 spouts 和 bolt 两个基本原语。

  • spouts
    spouts 是 Stream 的来源。
    例如,spout可以读取Kestrel队列中的元组并将其作为 Stream 发出。或者 spout 可以连接到Twitter API并发出推文流。
  • bolt
    bolt会消耗任意数量的输入流,进行一些处理,并可能发出新的流。
    像从推文流计算趋势主题流之类的复杂流转换,需要多个步骤,因此需要多个 bolt 。
    bolt 可以执行任何操作,包括运行函数,过滤元组,进行流聚合,进行流连接,与数据库对话等等。

spout 和 bolt 网络被打包成一个 topology ,这是提交给 Storm 集群执行的顶级抽象。
拓扑是流转换的图形,其中每个节点都是一个 spout 或 bolt 。
图中的表示哪些 bolt 订阅了哪些流。
当一个 spout 或 bolt 向一个流发出一个元组时,它会将元组发送给订阅该流的每个 bolt 。

技术图片

拓扑中节点之间的链接指示应如何传递元组。
如上图,Spout A 和Bolt B 之间有链接,Spout A 到 Bolt C 之间有链接,以及从 Bolt B 到 Bolt C 之间有链接。
那么每次 Spout A 发出一个元组时,它都会将元组发送给 Bolt B 和 Bolt C .所有 Bolt B 的输出元组也将送给 Bolt C.

Storm拓扑中的每个节点并行执行。
在拓扑中,你可以为每个节点指定所需的并行度,Storm将在集群中生成该数量的线程以执行。

拓扑会一直执行(或直到你杀死它)。
Storm会自动重新分配失败的任务。
此外,Storm保证不会丢失数据,即使计算机出现故障并且消息丢失也是如此。


Data model

Storm使用元组作为其数据模型。
元组是一个命名的值列表,元组中的字段可以是任何类型的对象。
Storm支持所有原始类型,字符串和字节数组作为元组字段值。
要使用其他类型的对象,需要为该类型实现一个序列化程序。

拓扑中的每个节点都必须声明它发出的元组的输出字段。
例如下面代码中的bolt 声明它发出2元组,字段为 "double" 和 "triple"

package com.aaa.test;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
 * @author lillcol
 * 2019/7/18-11:46
 */
public class DoubleAndTripleBolt extends BaseRichBolt {
    private OutputCollector _collector;

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        int val = input.getInteger(0);
        _collector.emit(input, new Values(val*2, val*3));
        _collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("double", "triple"));//声明["double", "triple"]组件的输出字段
    }
}

一个简单的拓扑(A simple topology)

如何实现一个简单的拓扑?
本地 idea测试
sbt构建

// libraryDependencies += "org.apache.storm" % "storm-core" % "2.0.0" % "provided"
libraryDependencies += "org.apache.storm" % "storm-core" % "2.0.0"

定义一个Spout,此处采用随机数

package com.test.storm;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;
import java.util.Random;

/**
 * @author lillcol
 * 2019/7/18-12:03
 */
public class TestWordSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;

    @Override
    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        //Spouts负责向拓扑中发送新消息
        Utils.sleep(100);
        //每隔100ms就会从列表中随机选一个单词发出
        final String[] words = new String[]{"hellow", "lillcol", "study", "storm"};
        final Random rand = new Random();
        final String word = words[rand.nextInt(words.length)];
        collector.emit(new Values(word));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

定义Bolt,功能接收到的信息追加"levelUp!"

package com.test.storm;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
 * @author lillcol
 * 2019/7/18-12:04
 */
public class ExclamationBolt extends BaseRichBolt {
    OutputCollector collector;

   //prepare方法为 Bolt 提供了一个OutputCollector用于从 Bolt 中发出元组 。
   //元组可以随时的从prepare,execute,cleanup,甚至在另一个线程中异步发出。
   //当前prepare实现只是将OutputCollector作为实例变量保存,以便稍后在execute方法中使用。
    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        this.collector =collector;
    }
    //execute方法从一个Bolt的输入接收一个元组。
    //此execute取数组的第一个字段并发出追加字符串“levelUp!” 得字符串。
    //如果您实现了一个订阅多个输入源的bolt,您可以通过使用Tuple#getSourceComponent方法找出Tuple来自哪个组件。
    @Override
    public void execute(Tuple input) {
        String sourceComponent = input.getSourceComponent();
        //输入元组作为第一个参数传递emit
        collector.emit(input, new Values(input.getString(0) + "levelUp!"));
        System.out.println(input.getString(0));
        // 输入元组在最后一行被激活。这些是Storm的可靠性API的一部分,用于保证不会丢失数据
        collector.ack(input);
    }

    //declareOutputFields方法声明ExclamationBolt发出1元组,其中一个字段称为“word”。
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

//如果implements IRichBol
//还需要重写下面两个方法
//当Bolt被关闭时调用cleanup方法,并且应该清除所有打开的资源。
//无法保证在集群上调用此方法:例如,如果任务正在运行的计算机爆炸,则无法调用该方法。
 @Override
    public void cleanup() {
    }
//getComponentConfiguration方法允许配置此组件运行方式的各个方面    
@Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
//但是一般情况下我们不需要这两个方法,所以我们可以通过继承BaseRichBolt来定义Bolt

定义调用类

package com.test.storm;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.shade.org.apache.jute.Utils;
import org.apache.storm.topology.TopologyBuilder;


/**
 * @author lillcol
 * 2019/7/18-12:03
 */
public class SimpleTopology {
    public static void main(String[] args) throws Exception {
        SimpleTopology topology = new SimpleTopology();
        topology.runLocal(60);

    }

    public void runLocal(int waitSeconds) throws Exception {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        //第一个参数是给Spout一个id "words", 
        //第二个参数是要调用的Spout类
        //第三个参数是节点所需的并行度,是可选的。它指示应在群集中执行该组件的线程数。如果省略它,Storm将只为该节点分配一个线程。
        topologyBuilder.setSpout("words", new TestWordSpout(), 1);
        //Bolt的参数与Spout
        //只是要通过shuffleGrouping 指定数据来源"words")
        //“shuffle grouping”意味着元组应该从输入任务随机分配到bolt的任务中。
        topologyBuilder.setBolt("DoubleAndTripleBolt1", new ExclamationBolt(), 1)
                .shuffleGrouping("words");
        //一个Bolt可以接收多个数据来源,是要多次调用shuffleGrouping即可       
        topologyBuilder.setBolt("DoubleAndTripleBolt2", new ExclamationBolt(), 1)
                .shuffleGrouping("DoubleAndTripleBolt1")
                .shuffleGrouping("words");

        //loacl 测试
        Config config = new Config();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("word_count", config, topologyBuilder.createTopology());

        org.apache.storm.utils.Utils.sleep(1000*10);
        cluster.killTopology("word_count");
        cluster.shutdown();
    }
}
运行结果:
study
study
studylevelUp!
study
study
studylevelUp!
hellow
hellow
hellowlevelUp!
lillcol
lillcol
lillcollevelUp!
hellow
hellow
hellowlevelUp!
hellow
hellow
hellowlevelUp!
lillcol
lillcol
lillcollevelUp!
. . .

异常

可能出现异常1:

java.lang.NoClassDefFoundError: org/apache/storm/topology/IRichSpout
    at java.lang.Class.getDeclaredMethods0(Native Method)
    at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
    at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
    at java.lang.Class.getMethod0(Class.java:3018)
    at java.lang.Class.getMethod(Class.java:1784)
    at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:544)
    at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:526)
Caused by: java.lang.ClassNotFoundException: org.apache.storm.topology.IRichSpout
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 7 more
Error: A JNI error has occurred, please check your installation and try again

这个是因为在sbt构建的时候  % "provided" 意思是已提供相关jar,但是我们idea测试的时候并没有相关jar
libraryDependencies += "org.apache.storm" % "storm-core" % "2.0.0" % "provided"

所以不能用上面的语句,改成下面的即可
libraryDependencies += "org.apache.storm" % "storm-core" % "2.0.0"

maven 对应着改就可以了

可能出现异常2:

Exception in thread "main" java.lang.ExceptionInInitializerError
    at org.apache.log4j.LogManager.getLogger(LogManager.java:44)
    at org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:64)
    at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:358)
    at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:383)
    at org.apache.storm.LocalCluster.<clinit>(LocalCluster.java:128)
    at com.test.storm.SimpleTopology.runLocal(SimpleTopology.java:28)
    at com.test.storm.SimpleTopology.main(SimpleTopology.java:16)
Caused by: java.lang.IllegalStateException: Detected both log4j-over-slf4j.jar AND slf4j-log4j12.jar on the class path, preempting StackOverflowError. See also http://www.slf4j.org/codes.html#log4jDelegationLoop for more details.
    at org.apache.log4j.Log4jLoggerFactory.<clinit>(Log4jLoggerFactory.java:49)
    ... 7 more
    
错误报的很明显
log4j-over-slf4j.jar AND slf4j-log4j12.jar  冲突了
我的解决办法是在测试的时候随便删掉一个,但是生产的时候在可能冲突的依赖中把它去掉

Storm 的 的hellow word(word count)
//定义Spout WordReader
package com.test.storm;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;

/**
 * @author lillcol
 * 2019/7/19-9:17
 */
public class WordReader extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private FileReader fileReader;
    private boolean completed = false;

    /**
     * open方法,接收三个参数:
     * 第一个是创建Topology的配置,
     * 第二个是所有的Topology数据
     * 第三个是用来把Spout的数据发射给bolt
     **/
    @Override
    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        try {
            //获取创建Topology时指定的要读取的文件路径
            this.fileReader = new FileReader(conf.get("wordsFile").toString());
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Error reading file ["
                    + conf.get("wordFile") + "]");
        }
        //初始化发射器
        this.collector = collector;
    }

    /**
     * nextTuple是Spout最主要的方法:
     * 在这里我们读取文本文件,并把它的每一行发射出去(给bolt)
     * 这个方法会不断被调用,为了降低它对CPU的消耗,当任务完成时让它sleep一下
     **/
    @Override
    public void nextTuple() {
        if (completed) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return;
        }
        String str;
        BufferedReader bufferedReader = new BufferedReader(fileReader);
        try {
            while ((str = bufferedReader.readLine()) != null) {
                //发送一行
                collector.emit(new Values(str), str);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            completed = true;
        }

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}
//定义Bolt WordSplit 实现切割
package com.test.storm;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * @author lillcol
 * 2019/7/19-9:38
 */
public class WordSplit implements IRichBolt {
    private OutputCollector collector;


    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * execute是bolt中最重要的方法:
     * 当接收到一个tuple时,此方法被调用
     * 这个方法的作用就是把接收到的每一行切分成单个单词,并把单词发送出去(给下一个bolt处理)
     **/
    @Override
    public void execute(Tuple input) {
        String line = input.getString(0);
        String[] words = line.split(",| |\\|");
        for (String word : words) {
            word = word.trim();
            if (!word.isEmpty()) {
                List a = new ArrayList();
                a.add(input);
                collector.emit(a, new Values(word));
            }
        }
        collector.ack(input);
    }

    @Override
    public void cleanup() {

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
//定义Bolt WordCounter 实现统计
package com.test.storm;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;

import java.util.HashMap;
import java.util.Map;

/**
 * @author lillcol
 * 2019/7/19-10:01
 */
public class WordCounter implements IRichBolt {
    Integer id;
    String name;
    Map<String, Integer> counters;
    private OutputCollector collector;

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        this.counters = new HashMap<String, Integer>();
        this.collector = collector;
        this.name = context.getThisComponentId();
        this.id = context.getThisTaskId();
    }

    @Override
    public void execute(Tuple input) {
        String str = input.getString(0);
        if (!counters.containsKey(str)) {
            counters.put(str, 1);
        } else {
            counters.put(str, counters.get(str) + 1);
        }
        collector.ack(input);
    }

    @Override
    public void cleanup() {
        System.out.println("--Word Counter [" + name + "-" + id + "] --");
        for (Map.Entry<String, Integer> entry : counters.entrySet()) {
            System.out.println(entry.getKey() + ":" + entry.getValue());
        }
        counters.clear();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
//定义主类
package com.test.storm;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

/**
 * @author lillcol
 * 2019/7/19-10:33
 */
public class WordCountTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("wordReader",new WordReader(),1);
        topologyBuilder.setBolt("WordSplit",new WordSplit(),1)
                .shuffleGrouping("wordReader");
        topologyBuilder.setBolt("",new WordCounter(),2)
                .shuffleGrouping("WordSplit");

        //配置
        Config config = new Config();
        config.put("wordsFile","D:\\stromFile");
        config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        //创建一个本地模式cluster
        LocalCluster localCluster = new LocalCluster();
        //提交Topology
        localCluster.submitTopology("WordCountTopology",config,topologyBuilder.createTopology());

        Thread.sleep(2000);//这个时间要控制好,太短看不到效果
        localCluster.shutdown();

    }
}
//输出结果
11:35:16.845 [SLOT_1024] INFO  o.a.s.e.ExecutorShutdown - Shutting down executor :[2, 2]
11:35:16.845 [Thread-37--executor[2, 2]] INFO  o.a.s.u.Utils - Async loop interrupted!
--Word Counter [-2] --
Thread[SLOT_1027:73
40673ms:1
11:34:31.865:1
30724ms:1
11:34:23.065:1
11:34:27.365:1
. . .


11:35:16.846 [SLOT_1024] INFO  o.a.s.e.ExecutorShutdown - Shut down executor :[2, 2]
11:35:16.846 [SLOT_1024] INFO  o.a.s.e.ExecutorShutdown - Shutting down executor :[1, 1]
11:35:16.846 [Thread-38--executor[1, 1]] INFO  o.a.s.u.Utils - Async loop interrupted!
--Word Counter [-1] --
Thread[SLOT_1027:87
11:34:31.465:1
29524ms:1
26024ms:1
. . . 

Storm 测试

标签:reduce   static   auto   boolean   ash   inf   jni   发送   必须   

原文地址:https://www.cnblogs.com/lillcol/p/11212383.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!