
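Storm troubleshooting notes (1): a Python script keeps writing messages to Kafka, a spout consumes them from Kafka and emits them to a bolt, but some messages are never processed by the bolt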

Date: 2016-07-05 17:05:01


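1. Background

    A Python script reads a txt file line by line, wraps each line in a message, and sends it to Kafka as a producer. A Storm spout reads these messages from Kafka, does some processing, and emits them to a bolt. The bolt finally writes the data to HBase in a predefined format.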

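2. Problem description

    There were about 14,000 records in total. Debug output showed that the spout read, processed, and emitted every message, but the bolt processed only part of them (a little over 2,000; the remaining 10,000 or so were clearly never handled). Only those 2,000-odd records, the ones the bolt actually received, were written to HBase.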

 

The last log output at the time of the problem:

OLT + acSN=210235A1AMB159000008clientMacStr = d3f4-6b29-c82eonLineTimeStr2015-07-03 17:49:18
BOLT count = 38215
472728 [Thread-14-HBASE_BOLT] INFO  b.s.d.executor - BOLT ack TASK: 1 TIME:  TUPLE: source: KAFKA_SPOUT:2, stream: default, id: {}, [{"acSN": "210235A1AMB159000008", "onLineTime": "2015-07-03 17:49:18", "clientMAC": "d3f4-6b29-c82e"}]
472728 [Thread-14-HBASE_BOLT] INFO  b.s.d.executor - Execute done TUPLE source: KAFKA_SPOUT:2, stream: default, id: {}, [{"acSN": "210235A1AMB159000008", "onLineTime": "2015-07-03 17:49:18", "clientMAC": "d3f4-6b29-c82e"}] TASK: 1 DELTA: 
472728 [Thread-14-HBASE_BOLT] INFO  b.s.d.executor - Processing received message FOR  TUPLE: source: __system:-1, stream: __metrics_tick, id: {}, [60]
472729 [Thread-14-HBASE_BOLT] INFO  b.s.d.task - Emitting: HBASE_BOLT __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@781650e2> [#<DataPoint [__ack-count = {KAFKA_SPOUT:default=37640}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1, capacity=16384, population=0}]> #<DataPoint [__receive = {write_pos=16394, read_pos=16387, capacity=16384, population=7}]> #<DataPoint [__process-latency = {KAFKA_SPOUT:default=7.727948990435706}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__execute-latency = {KAFKA_SPOUT:default=7.951167728237792}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__emit-count = {}]> #<DataPoint [__execute-count = {KAFKA_SPOUT:default=37680}]>]]
472734 [Thread-14-HBASE_BOLT] INFO  b.s.d.executor - Processing received message FOR  TUPLE: source: __system:-1, stream: __metrics_tick, id: {}, [60]
472735 [Thread-14-HBASE_BOLT] INFO  b.s.d.task - Emitting: HBASE_BOLT __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@5a51d4b2> [#<DataPoint [__ack-count = {KAFKA_SPOUT:default=0}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1, capacity=16384, population=0}]> #<DataPoint [__receive = {write_pos=16394, read_pos=16387, capacity=16384, population=7}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__transfer-count = {__metrics=0}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__emit-count = {__metrics=20}]> #<DataPoint [__execute-count = {KAFKA_SPOUT:default=0}]>]]
472735 [Thread-14-HBASE_BOLT] INFO  b.s.d.executor - Processing received message FOR  TUPLE: source: __system:-1, stream: __metrics_tick, id: {}, [60]
472735 [Thread-14-HBASE_BOLT] INFO  b.s.d.task - Emitting: HBASE_BOLT __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@32632071> [#<DataPoint [__ack-count = {KAFKA_SPOUT:default=0}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1, capacity=16384, population=0}]> #<DataPoint [__receive = {write_pos=16394, read_pos=16387, capacity=16384, population=7}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__transfer-count = {__metrics=0}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__emit-count = {__metrics=0}]> #<DataPoint [__execute-count = {KAFKA_SPOUT:default=0}]>]]
472735 [Thread-14-HBASE_BOLT] INFO  b.s.d.executor - Processing received message FOR  TUPLE: source: __system:-1, stream: __metrics_tick, id: {}, [60]
472736 [Thread-14-HBASE_BOLT] INFO  b.s.d.task - Emitting: HBASE_BOLT __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@9e6f48f> [#<DataPoint [__ack-count = {KAFKA_SPOUT:default=0}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1, capacity=16384, population=0}]> #<DataPoint [__receive = {write_pos=16394, read_pos=16392, capacity=16384, population=2}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__transfer-count = {__metrics=0}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__emit-count = {__metrics=0}]> #<DataPoint [__execute-count = {KAFKA_SPOUT:default=0}]>]]
472736 [Thread-14-HBASE_BOLT] INFO  b.s.d.executor - Processing received message FOR  TUPLE: source: __system:-1, stream: __metrics_tick, id: {}, [60]
472736 [Thread-14-HBASE_BOLT] INFO  b.s.d.task - Emitting: HBASE_BOLT __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@19c1dd5d> [#<DataPoint [__ack-count = {KAFKA_SPOUT:default=0}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1, capacity=16384, population=0}]> #<DataPoint [__receive = {write_pos=16394, read_pos=16392, capacity=16384, population=2}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__transfer-count = {__metrics=0}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__emit-count = {__metrics=0}]> #<DataPoint [__execute-count = {KAFKA_SPOUT:default=0}]>]]
500126 [Thread-12-__acker] INFO  b.s.d.executor - Processing received message FOR  TUPLE: source: __system:-1, stream: __metrics_tick, id: {}, [60]
500126 [Thread-16-__system] INFO  b.s.d.executor - Processing received message FOR  TUPLE: source: __system:-1, stream: __metrics_tick, id: {}, [60]
500127 [Thread-12-__acker] INFO  b.s.d.task - Emitting: __acker __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@16c17bfd> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1, capacity=16384, population=0}]> #<DataPoint [__receive = {write_pos=8, read_pos=7, capacity=16384, population=1}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__emit-count = {}]> #<DataPoint [__execute-count = {}]>]]
500128 [Thread-16-__system] INFO  b.s.d.task - Emitting: __system __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@16c17bfd> [#<DataPoint [__ack-count = {}]> #<DataPoint [__send-iconnection = {}]> #<DataPoint [GC/PSScavenge = {count=0, timeMs=0}]> #<DataPoint [memory/heap = {unusedBytes=48303576, usedBytes=632222248, maxBytes=1908932608, initBytes=134209408, virtualFreeBytes=1276710360, committedBytes=680525824}]> #<DataPoint [__receive = {write_pos=8, read_pos=7, capacity=16384, population=1}]> #<DataPoint [GC/PSMarkSweep = {count=0, timeMs=0}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [newWorkerEvent = 0]> #<DataPoint [__emit-count = {}]> #<DataPoint [__execute-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1, capacity=16384, population=0}]> #<DataPoint [memory/nonHeap = {unusedBytes=405520, usedBytes=63229936, maxBytes=136314880, initBytes=24576000, virtualFreeBytes=73084944, committedBytes=63635456}]> #<DataPoint [uptimeSecs = 500.127]> #<DataPoint [startTimeSecs = 1.467698245917E9]> #<DataPoint [__transfer = {write_pos=16387, read_pos=16387, capacity=32, population=0}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__transfer-count = {}]>]]
506118 [Thread-14-HBASE_BOLT] INFO  b.s.d.executor - Processing received message FOR  TUPLE: source: __system:-1, stream: __metrics_tick, id: {}, [60]
506119 [Thread-14-HBASE_BOLT] INFO  b.s.d.task - Emitting: HBASE_BOLT __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@21a8aeba> [#<DataPoint [__ack-count = {KAFKA_SPOUT:default=0}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1, capacity=16384, population=0}]> #<DataPoint [__receive = {write_pos=16395, read_pos=16394, capacity=16384, population=1}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__transfer-count = {__metrics=0}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__emit-count = {__metrics=0}]> #<DataPoint [__execute-count = {KAFKA_SPOUT:default=0}]>]]
514859 [Thread-10-KAFKA_SPOUT] INFO  b.s.d.task - Emitting: KAFKA_SPOUT default [{"acSN": "11111111", "onLineTime": "2018-12-01 11:06:25", "clientMAC": "0000-0000-0001"}]
receive:{"acSN": "11111111", "onLineTime": "2018-12-01 11:06:25", "clientMAC": "0000-0000-0001"}
spout emit:43173
514859 [Thread-10-KAFKA_SPOUT] INFO  b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: KAFKA_SPOUT:2, stream: default, id: {}, [{"acSN": "11111111", "onLineTime": "2018-12-01 11:06:25", "clientMAC": "0000-0000-0001"}]
560126 [Thread-12-__acker] INFO  b.s.d.executor - Processing received message FOR  TUPLE: source: __system:-1, stream: __metrics_tick, id: {}, [60]
560126 [Thread-16-__system] INFO  b.s.d.executor - Processing received message FOR  TUPLE: source: __system:-1, stream: __metrics_tick, id: {}, [60]
560128 [Thread-12-__acker] INFO  b.s.d.task - Emitting: __acker __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@73e0f92d> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1, capacity=16384, population=0}]> #<DataPoint [__receive = {write_pos=9, read_pos=8, capacity=16384, population=1}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__emit-count = {}]> #<DataPoint [__execute-count = {}]>]]
560128 [Thread-16-__system] INFO  b.s.d.task - Emitting: __system __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@370734ca> [#<DataPoint [__ack-count = {}]> #<DataPoint [__send-iconnection = {}]> #<DataPoint [GC/PSScavenge = {count=1, timeMs=13}]> #<DataPoint [memory/heap = {unusedBytes=522122784, usedBytes=146868704, maxBytes=1908932608, initBytes=134209408, virtualFreeBytes=1762063904, committedBytes=668991488}]> #<DataPoint [__receive = {write_pos=9, read_pos=8, capacity=16384, population=1}]> #<DataPoint [GC/PSMarkSweep = {count=0, timeMs=0}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [newWorkerEvent = 0]> #<DataPoint [__emit-count = {}]> #<DataPoint [__execute-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1, capacity=16384, population=0}]> #<DataPoint [memory/nonHeap = {unusedBytes=331208, usedBytes=63304248, maxBytes=136314880, initBytes=24576000, virtualFreeBytes=73010632, committedBytes=63635456}]> #<DataPoint [uptimeSecs = 560.128]> #<DataPoint [startTimeSecs = 1.467698245917E9]> #<DataPoint [__transfer = {write_pos=16387, read_pos=16387, capacity=32, population=0}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__transfer-count = {}]>]]
566119 [Thread-14-HBASE_BOLT] INFO  b.s.d.executor - Processing received message FOR  TUPLE: source: __system:-1, stream: __metrics_tick, id: {}, [60]
566120 [Thread-14-HBASE_BOLT] INFO  b.s.d.task - Emitting: HBASE_BOLT __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@4f2d2517> [#<DataPoint [__ack-count = {KAFKA_SPOUT:default=0}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1, capacity=16384, population=0}]> #<DataPoint [__receive = {write_pos=16396, read_pos=16395, capacity=16384, population=1}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__transfer-count = {__metrics=0}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__emit-count = {__metrics=0}]> #<DataPoint [__execute-count = {KAFKA_SPOUT:default=0}]>]]
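The `__receive` data points in the log above (`write_pos`, `read_pos`, `capacity`, `population`) are easier to follow once they are pulled out of the surrounding metrics noise. The following is a minimal sketch that extracts them from one log line with a regular expression. The class name `ReceiveQueueSample` and its fields are hypothetical, and the pattern only assumes the exact text layout visible in this log; it is not part of Storm.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: one __receive data point taken from a Storm metrics log line. */
final class ReceiveQueueSample {
    // Executor name inside "[Thread-14-HBASE_BOLT]" (layout assumed from the log above).
    private static final Pattern COMPONENT = Pattern.compile("\\[Thread-\\d+-([^\\]]+)\\]");
    // "__receive = {write_pos=16394, read_pos=16387, capacity=16384, population=7}"
    private static final Pattern RECEIVE = Pattern.compile(
            "__receive = \\{write_pos=(-?\\d+), read_pos=(-?\\d+), capacity=(\\d+), population=(\\d+)\\}");

    final String component;
    final long writePos;
    final long readPos;
    final long capacity;
    final long population;

    private ReceiveQueueSample(String component, long writePos, long readPos,
                               long capacity, long population) {
        this.component = component;
        this.writePos = writePos;
        this.readPos = readPos;
        this.capacity = capacity;
        this.population = population;
    }

    /** Returns the receive-queue sample in the line, or empty if the line has none. */
    static Optional<ReceiveQueueSample> parse(String logLine) {
        Matcher receive = RECEIVE.matcher(logLine);
        if (!receive.find()) {
            return Optional.empty();
        }
        Matcher thread = COMPONENT.matcher(logLine);
        String component = thread.find() ? thread.group(1) : "unknown";
        return Optional.of(new ReceiveQueueSample(
                component,
                Long.parseLong(receive.group(1)),
                Long.parseLong(receive.group(2)),
                Long.parseLong(receive.group(3)),
                Long.parseLong(receive.group(4))));
    }

    public static void main(String[] args) {
        // Shortened copy of one HBASE_BOLT metrics line from the log above.
        String line = "472729 [Thread-14-HBASE_BOLT] INFO  b.s.d.task - Emitting: HBASE_BOLT __metrics "
                + "[#<DataPoint [__receive = {write_pos=16394, read_pos=16387, capacity=16384, population=7}]>]";
        parse(line).ifPresent(s -> System.out.println(
                s.component + " receive queue: " + s.population + " of " + s.capacity + " slots in use"));
    }
}
```

Feeding every line of a log file through `parse` and printing the samples per component gives a quick view of how full each executor's receive queue was over time.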

 

3. Cause: see the following article, which explains how Storm's internal messaging works

http://www.michael-noll.com/blog/2013/06/21/understanding-storm-internal-message-buffers/

After investigation, the TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE parameter turned out to be the cause.
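To make the role of a bounded send buffer concrete, here is a toy, tick-based model in plain Java. It is only a sketch under stated assumptions: the class `SendBufferSim`, the per-tick rates, and above all the "reject when full" policy are hypothetical and chosen to mirror the symptom reported in this post; the code does not reproduce Storm's actual queue implementation.

```java
/** Hypothetical toy model of a fast producer writing into a bounded buffer. */
final class SendBufferSim {

    /**
     * Simulates a producer that emits up to emitPerTick tuples per tick into a
     * buffer of the given capacity, while a consumer drains up to drainPerTick
     * tuples per tick. Assumption: a tuple arriving at a full buffer is dropped.
     *
     * @return the number of tuples that were dropped
     */
    static int countDropped(int totalTuples, int emitPerTick, int drainPerTick, int capacity) {
        if (totalTuples < 0 || emitPerTick <= 0 || drainPerTick <= 0 || capacity <= 0) {
            throw new IllegalArgumentException("rates and capacity must be positive");
        }
        int queued = 0;
        int produced = 0;
        int dropped = 0;
        while (produced < totalTuples || queued > 0) {
            // Producer phase: try to enqueue this tick's burst.
            int burst = Math.min(emitPerTick, totalTuples - produced);
            for (int i = 0; i < burst; i++) {
                if (queued < capacity) {
                    queued++;
                } else {
                    dropped++;
                }
            }
            produced += burst;
            // Consumer phase: drain what the downstream side can handle.
            queued -= Math.min(drainPerTick, queued);
        }
        return dropped;
    }

    public static void main(String[] args) {
        // Made-up rates: the producer is ten times faster than the consumer.
        System.out.println("small buffer:  " + countDropped(14000, 100, 10, 1024) + " dropped");
        System.out.println("large buffer:  " + countDropped(14000, 100, 10, 16384) + " dropped");
        System.out.println("slow producer: " + countDropped(14000, 10, 10, 1024) + " dropped");
    }
}
```

In this model, losses disappear either when the buffer is large enough to hold the whole burst or when the producer emits no faster than the consumer drains, which are the same two levers the post ends up using.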

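4. Solution

See the article linked above.

In short, the spout was emitting too fast and filled up its own outgoing (send) buffer, and messages that arrived once the buffer was full were discarded.
In testing, either increasing TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE (the TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE line in the settings below, highlighted in red in the original post) or slowing down the spout's emit rate avoided the problem.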

conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32);
conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
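When changing these values, keep in mind that Storm's `Config` documentation states that the Disruptor-backed buffer sizes must be powers of 2. The sketch below checks that before building the settings. It uses a plain `Map` so that it runs without Storm on the classpath; the class `BufferSettings` is hypothetical, and the string keys are what the three `Config` constants are believed to resolve to in Storm 0.9.x, so treat them as an assumption and prefer the constants themselves in real code.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that validates buffer sizes before they go into a topology config. */
final class BufferSettings {
    // Assumed string values of Config.TOPOLOGY_TRANSFER_BUFFER_SIZE,
    // Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE and
    // Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE in Storm 0.9.x.
    static final String TRANSFER = "topology.transfer.buffer.size";
    static final String EXECUTOR_RECEIVE = "topology.executor.receive.buffer.size";
    static final String EXECUTOR_SEND = "topology.executor.send.buffer.size";

    /** True for 1, 2, 4, 8, ... and false for zero, negatives, and everything else. */
    static boolean isPowerOfTwo(int n) {
        return n > 0 && (n & (n - 1)) == 0;
    }

    /** Builds the three settings, rejecting any size that is not a power of two. */
    static Map<String, Object> build(int transfer, int executorReceive, int executorSend) {
        Map<String, Object> conf = new HashMap<>();
        put(conf, TRANSFER, transfer);
        put(conf, EXECUTOR_RECEIVE, executorReceive);
        put(conf, EXECUTOR_SEND, executorSend);
        return conf;
    }

    private static void put(Map<String, Object> conf, String key, int size) {
        if (!isPowerOfTwo(size)) {
            throw new IllegalArgumentException(key + " must be a power of 2, got " + size);
        }
        conf.put(key, size);
    }

    public static void main(String[] args) {
        // Same values as the conf.put(...) lines above.
        System.out.println(build(32, 16384, 16384));
    }
}
```

Because Storm's `Config` is itself a map of string keys to values, the result could be merged into it with `conf.putAll(...)`, assuming the keys above match the Storm version in use.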


Original post: http://www.cnblogs.com/zhengchunhao/p/5644164.html
