标签:storm maven 集群 thrift eclipse
本文描述如何将一个简单的HelloWorld拓扑提交到JStorm集群中运行。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> ... <repositories> <repository> <id>github-releases</id> <url>http://oss.sonatype.org/content/repositories/github-releases/</url> </repository> <repository> <id>clojars.org</id> <url>http://clojars.org/repo</url> </repository> <repository> <id>twitter4j</id> <url>http://twitter4j.org/maven2</url> </repository> </repositories> <dependencies> ... <dependency> <groupId>storm</groupId> <artifactId>storm</artifactId> <version>0.9.0</version> <scope>provided</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>2.0.2</version> <configuration> <source>1.7</source> <target>1.7</target> </configuration> </plugin> </plugins> </build> </project>
由于JStorm的Maven依赖(artifactId)目前无法从仓库下载,所以这里改用storm的依赖,同样可以编译和运行。
import java.util.Map; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Tuple; /** * Hello world! * */ public class HelloWorldBolt extends BaseRichBolt { private static final long serialVersionUID = 1L; private int myCount = 0; /* * prepare() => on create */ @Override public void prepare(@SuppressWarnings("rawtypes") Map map, TopologyContext topologyContext, OutputCollector outputCollector) { } /* * execute() => most important method in the bolt is execute(Tuple input), * which is called once per tuple received the bolt may emit several tuples * for each tuple received */ @Override public void execute(Tuple tuple) { String test = tuple.getStringByField("sentence"); if (test == "Hello World") { myCount++; System.out.println("Found a Hello World! My Count is now: " + Integer.toString(myCount)); } } /* * declareOutputFields => This bolt emits nothing hence no body for * declareOutputFields() */ @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { } }
import java.util.Map; import java.util.Random; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; /** * Hello world! * */ public class HelloWorldSpout extends BaseRichSpout { private static final long serialVersionUID = 1L; private SpoutOutputCollector collector; private int referenceRandom; private static final int MAX_RANDOM = 10; public HelloWorldSpout() { final Random rand = new Random(); referenceRandom = rand.nextInt(MAX_RANDOM); } /* * declareOutputFields() => you need to tell the Storm cluster which fields * this Spout emits within the declareOutputFields method. */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("sentence")); } /* * open() => The first method called in any spout is 'open' TopologyContext * => contains all our topology data SpoutOutputCollector => enables us to * emit the data that will be processed by the bolts conf => created in the * topology definition */ @Override public void open(@SuppressWarnings("rawtypes") Map conf, TopologyContext topologyContext, SpoutOutputCollector collector) { this.collector = collector; } /* * nextTuple() => Storm cluster will repeatedly call the nextTuple method * which will do all the work of the spout. nextTuple() must release the * control of the thread when there is no work to do so that the other * methods have a chance to be called. */ @Override public void nextTuple() { final Random rand = new Random(); int instanceRandom = rand.nextInt(MAX_RANDOM); if (instanceRandom == referenceRandom) { collector.emit(new Values("Hello World")); } else { collector.emit(new Values("Other Random Word")); } } }
import backtype.storm.Config; import backtype.storm.StormSubmitter; import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.InvalidTopologyException; import backtype.storm.topology.TopologyBuilder; /** * Hello world! * */ public class HelloWorldTopology { public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("randomHelloWorld", new HelloWorldSpout(), 10); builder.setBolt("HelloWorldBolt", new HelloWorldBolt(), 1).shuffleGrouping("randomHelloWorld"); Config conf = new Config(); conf.setDebug(true); if (args != null && args.length > 0) {// 如果在JStrom集群中运行 conf.setNumWorkers(3); // JStorm 安装完后,默认的NIMBUS端口配置为7672 conf.put(Config.NIMBUS_THRIFT_PORT, 7672); StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } } }
# $JSTORM_HOME/bin/jstorm jar HelloWorld.jar com.test.jstorm.HelloWorldTopology HelloWorld
提交后,再刷新JStorm的管理页面,即可查看是否提交成功。
标签:storm maven 集群 thrift eclipse
原文地址:http://blog.csdn.net/szzhaom/article/details/41792023