码迷,mamicode.com
首页 > 其他好文 > 详细

高并发计算总数和去重

时间:2015-12-25 13:39:04      阅读:247      评论:0      收藏:0      [点我收藏+]

标签:

spout:

package com.storm.WordCount;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

import java.util.Map;
import java.util.Random;

/**
 * Spout that repeatedly emits a fixed set of sample sentences.
 *
 * <p>Despite the class name, nothing is randomized: each call to
 * {@link #nextTuple()} emits every entry of {@code sentences} in array order
 * and then sleeps. Tuples are emitted without a message id.
 */
public class MyRandomSentenceSpout extends BaseRichSpout {

    private static final long serialVersionUID = 1L;

    /** Pause after each emission round, in milliseconds. */
    private static final long EMIT_INTERVAL_MS = 5000L;

    /** Collector handed over in {@link #open}; used to emit tuples. */
    SpoutOutputCollector _collector;

    /** Created in {@link #open}; not read anywhere in this class. */
    Random _rand;

    /** Sample sentences emitted on every round. */
    String[] sentences = new String[] { "a b c d", "b c d e f", "a d e f" };

    /**
     * Stores the collector and creates the random generator.
     *
     * @param conf      topology configuration (unused)
     * @param context   topology context (unused)
     * @param collector collector used by {@link #nextTuple()} to emit tuples
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        _rand = new Random();
    }

    /**
     * Emits each sample sentence as a single-field tuple, then sleeps for
     * {@code EMIT_INTERVAL_MS} milliseconds.
     */
    @Override
    public void nextTuple() {
        String[] batch = sentences;
        for (int index = 0; index < batch.length; index++) {
            _collector.emit(new Values(batch[index]));
        }
        Utils.sleep(EMIT_INTERVAL_MS);
    }

    /** No-op. */
    @Override
    public void ack(Object id) {
        // nothing to do
    }

    /** No-op. */
    @Override
    public void fail(Object id) {
        // nothing to do
    }

    /**
     * Declares a single output field named {@code "word"}; the value carried
     * in it is a whole sentence.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        Fields outputFields = new Fields("word");
        declarer.declare(outputFields);
    }

}

 

splitBolt:

package com.storm.WordCount;

import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.FailedException;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Bolt that splits each incoming sentence into tokens and emits one
 * single-field tuple per token.
 *
 * <p>The sentence is read from the input field {@code "word"} and each token
 * is emitted under the output field {@code "word"}.
 */
public class MySplit  extends BaseBasicBolt{
    private static final long serialVersionUID = 1L;
    
    /**
     * Delimiter. It is passed to {@link String#split(String)}, so it is
     * interpreted as a regular expression.
     */
    String patton;

    /**
     * @param patton delimiter (regular expression) used to split sentences
     */
    public MySplit(String patton){
        this.patton = patton;
    }

    
    /**
     * Splits the sentence carried by {@code input} and emits every token.
     * A {@code null} sentence is skipped without emitting anything.
     *
     * <p>Acking is managed by the framework; no explicit ack call is needed.
     *
     * @param input     tuple whose {@code "word"} field holds the sentence
     * @param collector collector used to emit one tuple per token
     * @throws FailedException if reading, splitting or emitting fails; the
     *                         original exception is attached as the cause
     */
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        
        try {
            String sen = input.getStringByField("word");
            if (sen != null) {
                for (String word : sen.split(patton)) {
                    // The emitted value must be a Values (List) instance.
                    collector.emit(new Values(word));
                }
            }
        } catch (Exception e) {
            // Chain the original exception so the real failure stays diagnosable.
            throw new FailedException("split fail!", e);
        }
    }
    
    
    /** No-op; this bolt needs no preparation. */
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        
    }
    
    /** Declares a single output field named {@code "word"}. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        
        declarer.declare(new Fields("word"));
        
    }

    /** @return {@code null}; no component-specific configuration is supplied */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
    
    /** No-op; this bolt holds no resources to release. */
    @Override
    public void cleanup() {
        
    }

}

 

wordCountBolt:

package com.storm.WordCount;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Bolt that keeps an in-memory occurrence count per word and, for every
 * incoming tuple, emits the word together with its updated count.
 */
public class WordCount  extends BaseBasicBolt {
    private static final long serialVersionUID = 1L;

    /** Number of times each word has been seen by this bolt instance. */
    Map<String, Integer> counts = new HashMap<String, Integer>();

    /**
     * Increments the count of the word in field 0 of {@code tuple} and emits
     * {@code (word, count)}.
     *
     * @param tuple     input tuple; field 0 is read as the word
     * @param collector collector used to emit the updated pair
     */
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String word = tuple.getString(0);

        // A word seen for the first time starts at 1.
        Integer previous = counts.get(word);
        int updated = (previous == null) ? 1 : previous + 1;

        counts.put(word, updated);
        collector.emit(new Values(word, updated));
    }

    /** Declares the two output fields {@code "word"} and {@code "count"}. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        Fields outputFields = new Fields("word", "count");
        declarer.declare(outputFields);
    }
}

 

sumBolt:

package com.storm.WordCount;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.FailedException;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

/**
 * Bolt that aggregates {@code (word, count)} tuples into two figures: the
 * total number of words seen and the number of distinct words.
 *
 * <p>After every tuple both figures are recomputed from {@code counts} and
 * printed to {@code System.err}. The bolt emits nothing and declares no
 * output fields.
 */
public class SumBolt implements IBasicBolt {

    private static final long serialVersionUID = 1L;
    
    /** Most recent count received for each word. */
    Map<String, Integer> counts = new HashMap<String, Integer>();

    /**
     * Records the count carried by {@code input} and prints the refreshed
     * totals.
     *
     * @param input     tuple whose field 0 is the word and field 1 its count
     * @param collector unused; this bolt emits no tuples
     * @throws FailedException if the tuple cannot be read or has no count;
     *                         the original exception is attached as the cause
     */
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        
        try {
            String word = input.getString(0);
            Integer count = input.getInteger(1);
            if (count == null) {
                // Reject before storing: a null value kept in the map would
                // make the summation below fail on every later tuple as well.
                throw new IllegalArgumentException("missing count for word: " + word);
            }
            counts.put(word, count);
            
            // Total number of words: sum of all stored counts.
            long wordsSum = 0L;
            // Number of distinct words: stored keys, ignoring a null key.
            long wordCount = 0L;
            for (Map.Entry<String, Integer> entry : counts.entrySet()) {
                wordsSum += entry.getValue();
                if (entry.getKey() != null) {
                    wordCount++;
                }
            }
            
            System.err.println("  总数  =  "+wordsSum +" 去重总数   = "+wordCount);
        } catch (Exception e) {
            // Chain the original exception so the real failure stays diagnosable.
            throw new FailedException("sum fail!", e);
        }
    }
    
    
    
    /** Declares nothing; this bolt has no output stream fields. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        
    }

    /** @return {@code null}; no component-specific configuration is supplied */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    /** No-op; this bolt needs no preparation. */
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        
    }

    /** No-op; this bolt holds no resources to release. */
    @Override
    public void cleanup() {
        
    }

}

 

Topo:

package com.storm.WordCount;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;


/**
 * Entry point that wires the word-count topology:
 * {@code spout -> split -> count -> sum}.
 */
public class WordCountTopology {

  /**
   * Builds the topology and submits it.
   *
   * <p>When at least one argument is given, the topology is submitted through
   * {@link StormSubmitter} under the name {@code args[0]}; otherwise it is
   * submitted to a new {@link LocalCluster} as {@code "word-count"}.
   *
   * @param args optional; {@code args[0]} is the topology name for remote submission
   * @throws Exception propagated from topology submission
   */
  public static void main(String[] args) throws Exception {

    TopologyBuilder topology = new TopologyBuilder();

    topology.setSpout("spout", new MyRandomSentenceSpout(), 1);

    // Split sentences into words.
    topology.setBolt("split", new MySplit(" "), 8)
        .shuffleGrouping("spout");

    // Count per word, grouped on the "word" field.
    topology.setBolt("count", new WordCount(), 12)
        .fieldsGrouping("split", new Fields("word"));

    // The global aggregation must run single-threaded.
    topology.setBolt("sum", new SumBolt(), 1)
        .shuffleGrouping("count");

    Config conf = new Config();
    conf.setDebug(true);

    boolean hasTopologyName = args != null && args.length > 0;
    if (!hasTopologyName) {
      conf.setMaxTaskParallelism(3);

      // The local cluster is never shut down by this method.
      LocalCluster localCluster = new LocalCluster();
      localCluster.submitTopology("word-count", conf, topology.createTopology());
      return;
    }

    conf.setNumWorkers(3);
    StormSubmitter.submitTopology(args[0], conf, topology.createTopology());
  }
}

高并发计算总数和去重

标签:

原文地址:http://www.cnblogs.com/thinkpad/p/5075568.html

(0)
(0)
   
举报
评论 一句话评论(0)
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!