码迷,mamicode.com
首页 > 其他好文 > 详细

【Kafka踩坑系列之二】无限循环消费数据

时间:2017-09-30 19:53:45      阅读:1699      评论:0      收藏:0      [点我收藏+]

标签:bst   syn   服务   修改   inf   配置   partition   超时   dead   

一、Bug背景

业务上线后,发现Kafka的消费者一直在重复拉取同一批数据。被消费的topic配置了10个分区,只有每个分区的第一批数据能够出队,并且无限循环。

因测试环境数据量比较小,一直无法复现问题。只能查生产环境的日志排查。

二、解决问题的思路

      初步猜测数据被消费之后,没有正常commit到Kafka服务端,导致Topic分区offset再消费完毕后未进行更新,下次取数据时还是从老offset开始取数据。

    1. 检查客户端配置
             自动提交已开启(enable.auto.commit的默认配置为true), 自动提交时间为5s(offsets.commit.timeout.ms的默认配置为5000)。
             既然默认已开启自动提交,按道理offset应该会被更新吖。然而并没有,Why?

    2. 添加手动提交代码
             每批数据处理完毕后,执行 consumer.commitSync(); 
             然并卵,why? 

    3. 只能添加日志埋点。
             发现消费者程序每批取出了6000多条数据,每批处理时间长达5分钟。
             另外一条关键日志,info级别,在每批数据处理完毕后打印出来:o.a.k.c.c.internals.AbstractCoordinator : Marking the coordinator 2147483646 dead.
             可以看出,等到每批数据处理完毕时,消费者已经被标记为dead。可以推断出处理超时了!

    4. 查看客户端超时配置
            会话超时时间为5s(session.timeout.ms的默认值为5000),而消费者程序处理一批数据竟然要5分钟!
            先尝试修改会话超时时间为30分钟,结果提示其余几个超时时间也必须同步修改为合理值。
            仔细一想,Kafka作为流式处理系统,目的就是快速响应,把会话时间改为30分钟明显是不合理的。
        
    5. 优化消费者程序的性能
            尝试优化程序性能,每条数据逐条处理改为成批处理。速度明显提升,但是要6000+条数据在30秒内处理完毕,臣妾还是办不到啊!!

    6. 修改每次拉取的字节数
            查看文档,发现消费者每次拉取数据的最大字节数(max.partition.fetch.bytes)为 1048576 Byte,即:1 MB。
            1MB可取出6000+条数据,那我改成100KB,岂不是只取出600+条数据?  
            consumerConfig.put("max.partition.fetch.bytes", 100 * 1024); //100kb
            果然奏效!每次只取出600+条数据,加上原先的性能优化,每批数据都控制在10秒内处理完毕。
            至此,不再出现日志 Marking the coordinator 2147483646 dead. 数据也不再死循环了。

      问题至此解决,可以安心下班了。(*^_^*)

【Kafka踩坑系列之二】无限循环消费数据

标签:bst   syn   服务   修改   inf   配置   partition   超时   dead   

原文地址:http://www.cnblogs.com/sea-horse/p/7615811.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!