标签:版权 ble and body edit 排序 href 没有 try
版权声明:本文为博主原创文章,未经博主允许不得转载。
转载自同事(董重)写得一篇wiki博客
Kafka Producer处理逻辑
Kafka Producer产生数据发送给Kafka Server,具体的分发逻辑及负载均衡逻辑,全部由producer维护。
默认Partition逻辑
1、没有key时的分发逻辑
每隔 topic.metadata.refresh.interval.ms 的时间,随机选择一个partition。这个时间窗口内的所有记录发送到这个partition。
发送数据出错后也会重新选择一个partition
2、根据key分发
对key求hash,然后对partition数量求模
Utils.abs(key.hashCode) % numPartitions |
如何获取Partition的leader信息(元数据)
决定好发送到哪个Partition后,需要明确该Partition的leader是哪台broker才能决定发送到哪里。
具体实现位置
kafka.client.ClientUtils#fetchTopicMetadata |
实现方案
1、从broker获取Partition的元数据。由于Kafka所有broker存有所有的元数据,所以任何一个broker都可以返回所有的元数据
2、broker选取策略:将broker列表随机排序,从首个broker开始访问,如果出错,访问下一个
3、出错处理:出错后向下一个broker请求元数据
注意
错误处理
producer的send函数默认没有返回值。出错处理有EventHandler实现。
DefaultEventHandler的错误处理如下:
出错重试次数由配置 message.send.max.retries 决定
所有重试全部失败时,DefaultEventHandler会抛出异常。代码如下
if(outstandingProduceRequests.size >0) { producerStats.failedSendRate.mark() val correlationIdEnd = correlationId.get() error("Failed to send requests for topics %s with correlation ids in [%d,%d]" .format(outstandingProduceRequests.map(_.topic).toSet.mkString(","), correlationIdStart, correlationIdEnd-1)) thrownewFailedToSendMessageException("Failed to send messages after "+ config.messageSendMaxRetries +" tries.", null) } |
请注明转载自:http://write.blog.csdn.NET/postedit/26687109
标签:版权 ble and body edit 排序 href 没有 try
原文地址:http://www.cnblogs.com/the-tops/p/6473168.html