码迷,mamicode.com
首页 > 其他好文 > 详细

Storm实现单词计数

时间:2015-08-18 06:27:39      阅读:124      评论:0      收藏:0      [点我收藏+]

标签:

package com.mengyao.storm;

import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.io.FileUtils;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

/**
 * Storm中的单词计数,拓扑结构为InputSpout->SplitBolt->CountBolt = WordCountTopology
 * @author mengyao
 *
 */
@SuppressWarnings("all")
public class WordCountTopology {

    public static class InputSpout extends BaseRichSpout{

        private Map conf;
        private TopologyContext context;
        private SpoutOutputCollector collector;
        
        /**
         * 实例化该Spout时预处理,仅会被调用一次,类似于MapReduce中Mapper/Reducer的setup()方法
         */
        @Override
        public void open(Map conf, TopologyContext context,
                SpoutOutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        /**
         * 死循环发射每行消息
         */
        @Override
        public void nextTuple() {
            Collection<File> listFiles = FileUtils.listFiles(new File("D:/"), new String[]{"log"}, false);
            for (File file : listFiles) {
                try {
                    List<String> lines = FileUtils.readLines(file);
                    for (String line : lines) {
                        this.collector.emit(new Values(line));
                        System.err.println("==== InputSpout:"+line+" ====");
                    }
                    FileUtils.moveFile(file, new File(file.getAbsoluteFile()+".tmp"));
                } catch (IOException e) {
                    e.printStackTrace();
                    throw new RuntimeException(e);
                }
            }
        }

        /**
         * 声明字段“line”提供给下一个Bolt组件订阅
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("line"));
        }
        
    }
    
    public static class SplitBolt extends BaseRichBolt{

        private Map stormConf;
        private TopologyContext context;
        private OutputCollector collector;
        
        /**
         * 实例化该Bolt时预处理,仅会被调用一次,类似于MapReduce中Mapper/Reducer的setup()方法
         */
        @Override
        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
            this.stormConf = stormConf;
            this.context = context;
            this.collector = collector;
        }

        /**
         * 死循环发送每个单词
         */
        @Override
        public void execute(Tuple input) {
            String line = input.getStringByField("line");
            String[] words = line.split("\t");
            for (String word : words) {
                this.collector.emit(new Values(word));
                System.err.println("==== SplitBolt:"+word+" ====");
            }
        }

        /**
         * 声明字段“word”提供给下一个Bolt组件订阅
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
        
    }
    
    public static class CountBolt extends BaseRichBolt{

        private Map stormConf;
        private TopologyContext context;
        private OutputCollector collector;
        HashMap<String, Long> map = new HashMap<String, Long>();
        
        /**
         * 实例化该Bolt时预处理,仅会被调用一次,类似于MapReduce中Mapper/Reducer的setup()方法
         */
        @Override
        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
            this.stormConf = stormConf;
            this.context = context;
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            String word = input.getStringByField("word");
            Long value = map.get(word);
            if (value==null) {
                value=0L;
            }
            value++;
            map.put(word, value);
            for (Entry<String, Long> entry : map.entrySet()) {
                System.err.println("==== CountBolt:"+entry+" ====");
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }
        
    }

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        String topologyName = WordCountTopology.class.getSimpleName();
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("input", new InputSpout());
        builder.setBolt("split", new SplitBolt()).shuffleGrouping("input");
        builder.setBolt("count", new CountBolt()).shuffleGrouping("split");
        
        Config config = new Config();
        config.setDebug(true);
        
        if (args!=null && args.length>0) {        //如果是生产环境中使用集群模式提交拓扑
            config.setNumWorkers(3);
            StormSubmitter.submitTopology(topologyName, config, builder.createTopology());
        } else {                                      //否则使用本地模式提交拓扑
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(topologyName, config, builder.createTopology());
            Utils.sleep(1000*100);
            cluster.killTopology(topologyName);
            cluster.shutdown();
        }
        
    }
}

依赖的jar包如下图:

技术分享

Storm实现单词计数

标签:

原文地址:http://www.cnblogs.com/mengyao/p/4738198.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!