码迷,mamicode.com
首页 > 其他好文 > 详细

storm实现求和操作

时间:2019-03-09 20:20:14      阅读:172      评论:0      收藏:0      [点我收藏+]

标签:@param   初始化   etop   上下文   调用   col   接受   .sh   pac   

storm求和简单操作

 

package com.xiaodao.big;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;

/**
 * 累积求和
 */
public class LocalSumStormTopology {

    /**
     * spout 需要继承baserichspout
     * 数据源需要产生并发送数据
     */
    public static class DataSourceSpout extends BaseRichSpout{


        private SpoutOutputCollector collector;
        /**
         * 初始化方法只会被调用一次
         *
         * @param conf 配置参数
         * @param context   上下文
         * @param collector 数据发射器
         */
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

            int num = 0;
        /**
         * 会产生数据,在生产上肯定是从消息队列中获取数据
         * 这个方法是一个死循环,是因为storm一直运行,会一直不行的执行
         */
        public void nextTuple() {
            collector.emit( new Values(num++));
            System.out.println("Spout:发送 "+ num);
                Utils.sleep(2000);

        }

        /**
         * 声明下一个blot接受的名称,不然blot不知道接受到了什么
         * @param declarer
         */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("no"));
        }
    }

    /**
     * 数据的累积求和 blot,接受数据,并处理
     */
    public static class SumBlot extends BaseRichBolt{

        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

        }

        int sum =0;
        /**
         * 也是一个自旋锁.(死循环)
         * @param input
         */
        public void execute(Tuple input) {
            //这里获取方式有很多
            Integer no =  input.getIntegerByField("no");
            sum +=no;
            System.out.println("Blot: sum = ["+ sum+"]");

        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        }
    }

    public static void main(String[] args) {
        //任何一个作业都需要topology
        //需要控制好blot spout 顺序
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("DataSourceSpout",new DataSourceSpout(),1);
        builder.setBolt("SumBlot",new SumBlot(),1).shuffleGrouping("DataSourceSpout");
        Config conf = new Config();
        conf.setNumWorkers(2);
        //如果到200个消息就不发送了
        conf.setMaxSpoutPending(200);
        //创建一个本地的模式,不需要搭建
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("LocalSumStormTopology",conf,builder.createTopology());
    }
}

执行运行就可以

storm实现求和操作

标签:@param   初始化   etop   上下文   调用   col   接受   .sh   pac   

原文地址:https://www.cnblogs.com/bj-xiaodao/p/10502756.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!