码迷,mamicode.com
首页 > 其他好文 > 详细

storm 事务和DRPC结合

时间:2015-10-24 10:05:07      阅读:253      评论:0      收藏:0      [点我收藏+]

标签:

示例代码:

package com.lky.topology;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.coordination.BatchOutputCollector;
import backtype.storm.drpc.LinearDRPCTopologyBuilder;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.topology.base.BaseBatchBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

import java.util.*;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;


/**
 * Storm DRPC topology that computes the "reach" of a URL: how many distinct
 * people can ultimately see a tweet containing that URL.
 *
 * <p>Created 2015-10-23.
 *
 * @author lky
 * @version V1.0
 */
public class ReachTopology {
    private static Log log = LogFactory.getLog(ReachTopology.class);
    public static Map<String, List<String>> TWEETERS_DB = new HashMap<String, List<String>>() {
        {
            put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", "nathan"));
            put("engineering.twitter.com/blog/5", Arrays.asList("adam", "david", "sally", "nathan"));
            put("tech.backtype.com/blog/123", Arrays.asList("tim", "mike", "john"));
        }
    };

    public static Map<String, List<String>> FOLLOWERS_DB = new HashMap<String, List<String>>() {
        {
            put("sally", Arrays.asList("bob", "tim", "alice", "adam", "jim", "chris", "jai"));
            put("bob", Arrays.asList("sally", "nathan", "jim", "mary", "david", "vivian"));
            put("tim", Arrays.asList("alex"));
            put("nathan", Arrays.asList("sally", "bob", "adam", "harry", "chris", "vivian", "emily", "jordan"));
            put("adam", Arrays.asList("david", "carissa"));
            put("mike", Arrays.asList("john", "bob"));
            put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob"));
        }
    };

    /**
     * @Title: ReachTopology.java
     * @Package com.lky.topology
     * @Description: 获取包含该特定url的所有用户,随机发放到下游bolt中
     * @author lky
     * @date 2015年10月23日 下午11:46:19
     * @version V1.0
     */
    @SuppressWarnings("serial")
    public static class GetTweeters extends BaseBasicBolt {

        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            Object id = null;
            String url = null;
            try {
                id = input.getValue(0);
                url = input.getString(1);
                List<String> tweeters = new ArrayList<String>();//获取包含该url的所有用户

                if (null != id && null != url) {
                    tweeters = TWEETERS_DB.get(url);
                    if (null != tweeters) {
                        for (String tweeter : tweeters) {
                            log.info("execute1------>[id = " + id + " ]["+url+"---->tweeter=" + tweeter + "]");
                            collector.emit(new Values(id, tweeter));
                        }
                    }
                }
            } catch (Exception e) {
                log.error("execute 发射消息错误!!!!!");
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "tweeter"));
        }

    }

    /**
     * @Title: ReachTopology.java
     * @Package com.lky.topology
     * @Description:获取每一个用户的粉丝,然后按字段分组(id,fllower)到下游bolt中,保证同一类url的相同用户被分到相同的批次
     * @author lky
     * @date 2015年10月23日 下午11:47:45
     * @version V1.0
     */
    @SuppressWarnings("serial")
    public static class GetFollowers extends BaseBasicBolt {

        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            Object id = null;
            String _follower = null;

            try {
                id = input.getValue(0);
                _follower = input.getString(1);
                List<String> followers = new ArrayList<String>();

                if (null != id && null != _follower) {
                    followers = FOLLOWERS_DB.get(_follower);//获取该用户的所有粉丝
                    if (null != followers) {
                        for (String follower : followers) {
                            log.info("execute2------>[id = " + id + " ]["+_follower+"------>follower=" + follower + "]");
                            collector.emit(new Values(id, follower));
                        }
                    }
                }

            } catch (Exception e) {
                log.error("execute 发射消息异常!!!");
            }

        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "follower"));
        }

    }

    /**
     * @Title: ReachTopology.java
     * @Package com.lky.topology
     * @Description: 按批次统计粉丝数量
     * @author lky
     * @date 2015年10月23日 下午11:50:51
     * @version V1.0
     */
    @SuppressWarnings({ "serial", "rawtypes" })
    public static class PartialUniquer extends BaseBatchBolt {
        private BatchOutputCollector collector;
        private Object id;
        private Set<String> _followerSet = new HashSet<String>();

        @Override
        public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
            this.collector = collector;
            this.id = id;
        }

        @Override
        public void execute(Tuple tuple) {
            String uname = null;

            try {
                uname = tuple.getString(1);
                if (null != uname) {
                    log.info("execute3------>[id = " + tuple.getValue(0) + " ][ uname=" + uname + "]");
                    _followerSet.add(uname);
                }
            } catch (Exception e) {
                log.error("execute 接收消息异常!!!");
            }
        }

        @Override
        public void finishBatch() {
            log.info("execute4------>[id = " + id + " ][ size=" + _followerSet.size() + "]");
            collector.emit(new Values(id, _followerSet.size()));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "count"));
        }

    }

    /**
     * @Title: ReachTopology.java
     * @Package com.lky.topology
     * @Description: 按相同的id汇总批次
     * @author lky
     * @date 2015年10月23日 下午11:51:49
     * @version V1.0
     */
    @SuppressWarnings({ "serial", "rawtypes" })
    public static class CountAggregator extends BaseBatchBolt {
        private BatchOutputCollector collector;
        private Object id;
        private int _count = 0;

        @Override
        public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
            this.collector = collector;
            this.id = id;
        }

        @Override
        public void execute(Tuple tuple) {
            Integer count = null;
            try {
                count = tuple.getInteger(1);
                log.info("execute5------>[id = " + tuple.getValue(0) + " ][ count=" + count + "]");
                _count += count;
            } catch (Exception e) {
                log.error("execute 接收消息异常");
            }
        }

        @Override
        public void finishBatch() {
            collector.emit(new Values(id, _count));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "result"));
        }
    }

    /**
     * Assembles the linear DRPC topology for the {@code "reach"} function.
     *
     * <p>The stages are added in order: {@link GetTweeters} (parallelism 4),
     * {@link GetFollowers} (12, shuffle grouping), {@link PartialUniquer}
     * (6, grouped on {@code id} and {@code follower}) and {@link CountAggregator}
     * (3, grouped on {@code id}).
     *
     * @return the configured builder; the caller turns it into a topology
     */
    @SuppressWarnings("deprecation")
    public static LinearDRPCTopologyBuilder construct() {
        final LinearDRPCTopologyBuilder reach = new LinearDRPCTopologyBuilder("reach");

        // Stage 1: URL -> tweeters.
        reach.addBolt(new GetTweeters(), 4);

        // Stage 2: tweeter -> followers.
        reach.addBolt(new GetFollowers(), 12).shuffleGrouping();

        // Stage 3: partial distinct counts, grouped on (id, follower).
        final Fields byRequestAndFollower = new Fields("id", "follower");
        reach.addBolt(new PartialUniquer(), 6).fieldsGrouping(byRequestAndFollower);

        // Stage 4: sum the partial counts, grouped on id.
        final Fields byRequest = new Fields("id");
        reach.addBolt(new CountAggregator(), 3).fieldsGrouping(byRequest);

        return reach;
    }

    /**
     * Runs the reach topology on an in-process cluster and prints the reach of a few
     * sample URLs obtained through local DRPC calls.
     *
     * <p>The local cluster and the local DRPC server are shut down in {@code finally}
     * blocks so they are released even if submitting the topology or a DRPC call throws.
     *
     * @param args unused
     */
    @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        LinearDRPCTopologyBuilder builder = construct();

        Config conf = new Config();
        conf.setMaxTaskParallelism(3);

        LocalDRPC drpc = new LocalDRPC();
        try {
            LocalCluster cluster = new LocalCluster();
            try {
                cluster.submitTopology("reach-drpc", conf, builder.createLocalTopology(drpc));

                String[] urlsToTry = new String[] { "foo.com/blog/1", "engineering.twitter.com/blog/5", "notaurl.com" };
                for (String url : urlsToTry) {
                    System.out.println("Reach of " + url + ": " + drpc.execute("reach", url));
                }

                Utils.sleep(1000 * 10);
            } finally {
                // Same order as before: the cluster first, then the DRPC server.
                cluster.shutdown();
            }
        } finally {
            drpc.shutdown();
        }
    }
}

storm 事务和DRPC结合

标签:

原文地址:http://www.cnblogs.com/dmir/p/4906210.html

(0)
(0)
   
举报
评论 一句话评论(0)
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!