码迷,mamicode.com
首页 > 其他好文 > 详细

Storm入门(三)HelloWorld示例

时间:2017-06-09 00:52:29      阅读:344      评论:0      收藏:0      [点我收藏+]

标签:play   tor   main   usr   junit   pen   exti   tput   util   

一、关联代码

使用maven,代码如下。

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>cn.ljh.storm</groupId>
  <artifactId>storm-helloworld</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>storm-helloworld</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>1.1.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.6</version>
    </dependency>
  </dependencies>
</project>

ExclamationTopology.java

package cn.ljh.storm.helloworld;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;

public class ExclamationTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("word", new TestWordSpout(), 1);
        builder.setBolt("exclaim", new ExclamationBolt(), 1).shuffleGrouping("word");
        builder.setBolt("print", new PrintBolt(), 1).shuffleGrouping("exclaim");

        Config conf = new Config();
        conf.setDebug(true);

        if (args != null && args.length > 0) {
          conf.setNumWorkers(3);

          StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
        }
        else {

          LocalCluster cluster = new LocalCluster();
          cluster.submitTopology("test3", conf, builder.createTopology());
          Utils.sleep(10000);
          cluster.killTopology("test3");
          cluster.shutdown();
        }
      }
}

TestWordSpout.java

package cn.ljh.storm.helloworld;

import org.apache.storm.topology.OutputFieldsDeclarer;
import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class TestWordSpout extends BaseRichSpout {
   public static Logger LOG = LoggerFactory.getLogger(TestWordSpout.class);
   SpoutOutputCollector _collector;
       
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
       _collector = collector;
   }
   
   public void nextTuple() {
       Utils.sleep(100);
       final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
       final Random rand = new Random();
       final String word = words[rand.nextInt(words.length)];
       _collector.emit(new Values(word));
   }
   
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
       declarer.declare(new Fields("word"));
   }
}

ExclamationBolt.java

package cn.ljh.storm.helloworld;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class ExclamationBolt extends BaseRichBolt {
    OutputCollector _collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      _collector = collector;
    }

    public void execute(Tuple tuple) {
      _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
      _collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
    }

  }

PrintBolt.java

package cn.ljh.storm.helloworld;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PrintBolt extends BaseRichBolt {
        private static Logger LOG = LoggerFactory.getLogger(PrintBolt.class);
        OutputCollector _collector;

        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
          _collector = collector;
        }

        public void execute(Tuple tuple) {
          LOG.info(tuple.getString(0) + " Hello World!");
          _collector.ack(tuple);
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }
}

 

storm有本地模式和集群模式。

本地模式一般用于测试和开发阶段,直接在Eclipse执行ExclamationTopology的main函数进行。

集群模式需要先把应用达成jar,然后使用storm命令提交到集群中去。

提交命令:storm jar /home/test/storm-helloworld-0.0.1-SNAPSHOT.jar cn.ljh.storm.helloworld.ExclamationTopology ExclamationTest

杀死命令:storm kill ExclamationTest

 

二、集群运行效果

运行提交命令后,出现如下log,说明提交成功。

技术分享

查看集群的进程jps,两个Supervisor节点出现了worker进程

技术分享技术分享

在Nimbus节点的/usr/local/storm/data/nimbus/inbox下面有提交的jar

技术分享

UI界面显示提交topology

技术分享

技术分享

技术分享

 

至此HelloWorld示例完成。

Storm入门(三)HelloWorld示例

标签:play   tor   main   usr   junit   pen   exti   tput   util   

原文地址:http://www.cnblogs.com/hd3013779515/p/6965311.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!