码迷,mamicode.com
首页 > 其他好文 > 详细

第一个Storm的程序例子

时间:2015-10-12 21:12:59      阅读:267      评论:0      收藏:0      [点我收藏+]

标签:

 

-----------------------------WordSpout 

 

package com.hzw.storm.Spout;

 

import java.util.Map;

import java.util.Random;

 

import backtype.storm.spout.SpoutOutputCollector;

import backtype.storm.task.TopologyContext;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseRichSpout;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Values;

 

public class WordSpout extends BaseRichSpout {

 

private SpoutOutputCollector collector;

 

private static final String[] msgs = new String[] { "I have a dream""my dream is to be a data analyst",

"you can do what you are dreaming""don‘t give up your dreams" };

 

private static final Random random = new Random();

 

@Override

public void open(Map conf, TopologyContext context, SpoutOutputCollector c) {

// 初始化,只执行1次

this.collector = c;

}

 

@Override

public void nextTuple() {

//不停的被调用

String sentence = msgs[random.nextInt(4)];

this.collector.emit(new Values(sentence));

 

}

 

@Override

public void declareOutputFields(OutputFieldsDeclarer declarer) {

//给数据值加一个标签

declarer.declare(new Fields("sentence"));

}

 

}

 

-----------------------------SplitSentenceBolt 

package com.hzw.storm.bolt;

 

import java.util.Map;

 

import backtype.storm.task.TopologyContext;

import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.IBasicBolt;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;

 

public class SplitSentenceBolt implements IBasicBolt {

 

@Override

public void prepare(Map conf, TopologyContext context) {

// 一次性的初始化工作

}

 

@Override

public void execute(Tuple tuple, BasicOutputCollector collector) {

String sentence = tuple.getString(0);

for (String word : sentence.split(" ")) {

collector.emit(new Values(word));

}

}

 

@Override

public void cleanup() {

 

}

 

@Override

public void declareOutputFields(OutputFieldsDeclarer declarer) {

declarer.declare(new Fields("word"));

}

 

@Override

public Map<String, Object> getComponentConfiguration() {

 

return null;

}

 

}

 

-----------------------------WordCountBolt

package com.hzw.storm.bolt;

 

import java.util.HashMap;

import java.util.Map;

 

import backtype.storm.task.TopologyContext;

import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.IBasicBolt;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;

 

public class WordCountBolt implements IBasicBolt {

private Map<String, Integer> _counts = new HashMap<String, Integer>();

 

@Override

public void prepare(Map arg0, TopologyContext arg1) {

 

}

 

@Override

public void execute(Tuple tuple, BasicOutputCollector collector) {

String word = tuple.getString(0);

int count;

if (_counts.containsKey(word)) {

count = _counts.get(word);

else {

count = 0;

}

count++;

_counts.put(wordcount);

collector.emit(new Values(wordcount));

 

}

 

@Override

public void cleanup() {

 

}

 

@Override

public void declareOutputFields(OutputFieldsDeclarer declarer) {

declarer.declare(new Fields("word""count"));

}

 

@Override

public Map<String, Object> getComponentConfiguration() {

return null;

}

 

}

 -----------------------------MyTopology

 

package com.hzw.storm.topology;

 

import com.hzw.storm.Spout.WordSpout;

import com.hzw.storm.bolt.SplitSentenceBolt;

import com.hzw.storm.bolt.WordCountBolt;

 

import backtype.storm.topology.TopologyBuilder;

import backtype.storm.tuple.Fields;

 

public class MyTopology {

 

public static void main(String[] args) {

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("WordSpout"new WordSpout(), 2);

builder.setBolt("SplitSentenceBolt"new SplitSentenceBolt(), 10).shuffleGrouping("WordSpout");

builder.setBolt("WordCountBolt"new WordCountBolt(), 20).fieldsGrouping("SplitSentenceBolt"new Fields("word"));

}

 

}

 

这样,整个拓扑图就写好了。

关于拓扑图的具体的意思,请参考文章:http://www.csdn.net/article/2012-12-24/2813117-storm-realtime-big-data-analysis

 

可以先小摘片段放在这里备忘:

Stream Groupings:

Stream Grouping定义了一个流在Bolt任务间该如何被切分。这里有Storm提供的6个Stream Grouping类型:

1. 随机分组(Shuffle grouping):随机分发tuple到Bolt的任务,保证每个任务获得相等数量的tuple。

2. 字段分组(Fields grouping):根据指定字段分割数据流,并分组。例如,根据“user-id”字段,相同“user-id”的元组总是分发到同一个任务,不同“user-id”的元组可能分发到不同的任务。

3. 全部分组(All grouping):tuple被复制到bolt的所有任务。这种类型需要谨慎使用。

4. 全局分组(Global grouping):全部流都分配到bolt的同一个任务。明确地说,是分配给ID最小的那个task。

5. 无分组(None grouping):你不需要关心流是如何分组。目前,无分组等效于随机分组。但最终,Storm将把无分组的Bolts放到Bolts或Spouts订阅它们的同一线程去执行(如果可能)。

6. 直接分组(Direct grouping):这是一个特别的分组类型。元组生产者决定tuple由哪个元组处理者任务接收。

当然还可以实现CustomStreamGroupimg接口来定制自己需要的分组。

 

 

第一个Storm的程序例子

标签:

原文地址:http://my.oschina.net/qiangzigege/blog/516035

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!