码迷,mamicode.com
首页 > 其他好文 > 详细

Storm Starter例子RollingTopWords代码学习

时间:2015-04-26 21:14:51      阅读:825      评论:0      收藏:0      [点我收藏+]

标签:storm


作者对这个例子的说明:这里

RollingTopWords实现了每过N分钟更新M分钟内最热门话题,如每1分钟刷新一次过去5分钟的热门话题。

  • 实现思路
    • spouts负责推送所有话题(words),同一话题推送给同一个RollingCountBolt task(使用Storm的fieldsGrouping实现)
    • RollingCountBolt接收推送的话题,并保存话题的出现次数。每过1分钟,RollingCountBolt将每个话题的出现次数推送给中间处理节点IntermediateRankingsBolt(同样使用Storm的fieldsGrouping实现)
    • IntermediateRankingsBolt内部保存一个TOP N的List,接收到推送过来的消息后刷新TOP N(根据话题发表次数排序word)。每过两秒钟将结果推送给最终节点TotalRankingsBolt。
    • TotalRankingsBolt将2秒内收到的各个TOP N进行汇总后,选出最终的TOP N。

  • 关键实现说明
    • Strom提供了类似Timer的机制,每过指定的时间系统自动发一个Tuple给Bolt,可通过如下代码在Bolt中设置

@Override
public Map<String, Object> getComponentConfiguration() {
    Map<String, Object> conf = new HashMap<String, Object>();
    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
    return conf;
  }

//判断是否系统自动发送的Tuple
public static boolean isTick(Tuple tuple) {
    return tuple != null
           && Constants.SYSTEM_COMPONENT_ID  .equals(tuple.getSourceComponent())
           && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
}
    • 欲实现“每过1分钟刷新一次5分钟内热门话题”的要求,需要将5分钟划分为5段,每一分钟为一段。在该实现钟将5分钟称为一个窗口(window),每一段称做一个slot。系统中需要保存每一段的话题次数。每过1分钟对前面5分钟求和,然后扔掉第一段的数据。以上过程由SlidingWindowCounter类和SlotBasedCounter类实现,在RollingCountBolt中调用。


  • 疑惑
    • 本例在Bolt调用Rankings类中方法时做了线程同步,是否表明 Storm对每个Task的调用不是线程安全的?

Storm Starter例子RollingTopWords代码学习

标签:storm

原文地址:http://blog.csdn.net/fytain/article/details/45291911

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!