Kafka provides the Producer class as the Java producer API; it supports two send modes, sync and async.
Sync architecture diagram
Async architecture diagram
The call flow is as follows:
The code flow is as follows:
Producer: when you call new Producer(new ProducerConfig()), the implementation actually creates two core objects under the hood: a Producer and a DefaultEventHandler. At the same time it creates a ProducerPool by default, so every time we new a Java Producer we get a Producer, an EventHandler and a ProducerPool. The ProducerPool holds the connections to the different Kafka brokers, and the initial set of connections is determined by the broker list parameter.
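For orientation, here is a minimal usage sketch of that producer API (my own illustration, not from the original post; the broker addresses and topic name are placeholders, and the property names assume the 0.8.x producer, where the broker list is configured as metadata.broker.list):

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

val props = new Properties()
// one or a few live brokers are enough; the rest of the cluster is discovered via metadata requests
props.put("metadata.broker.list", "broker1:9092,broker2:9092")
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("producer.type", "sync") // or "async"

// new Producer(...) internally creates the DefaultEventHandler and the ProducerPool
val producer = new Producer[String, String](new ProducerConfig(props))
producer.send(new KeyedMessage[String, String]("my-topic", "some-key", "hello kafka"))
producer.close()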
Flow of calling producer.send:
When the application calls producer.send, it internally invokes eventhandler.handle(message). The eventHandler first serializes the messages; the call chain is:
eventHandler.serialize(events) --> dispatchSerializedData() --> partitionAndCollate() --> send() --> SyncProducer.send()
Explanation of the call logic: when the client application calls the producer to send messages (either a single message or a List of messages), eventhandler.serialize is called first to serialize all of them; users can customize the serialization by implementing the Encoder interface. Next, partitionAndCollate groups the messages of the topics and assigns them to dataPerBroker (a map keyed by the different target brokers). Then, broker by broker, the corresponding SyncProducer.send is called to send the batch of message data; SyncProducer wraps the NIO network operations.
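As a sketch of that pluggable serialization (my own illustration, assuming the 0.8.x kafka.serializer.Encoder trait, whose implementations take a VerifiableProperties constructor argument; the class name is hypothetical):

import kafka.serializer.Encoder
import kafka.utils.VerifiableProperties

// Encodes String payloads as UTF-8 bytes; wired in via serializer.class=com.example.Utf8Encoder
class Utf8Encoder(props: VerifiableProperties = null) extends Encoder[String] {
  override def toBytes(s: String): Array[Byte] = s.getBytes("UTF-8")
}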
As for the Producer's sync versus async handling of sends, the architecture diagrams above make it clear at a glance.
What partitionAndCollate does in detail: it looks up the leaderBrokerId for every partition (that is, which broker the leader of each partition lives on), builds a HashMap[Int, Map[TopicAndPartition, List[KeyedMessage[K, Message]]]], groups the messages into it by brokerId, and thereby prepares the data for SyncProducer to send to each broker separately.
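A simplified sketch of that grouping shape (my own illustration, not the actual source; the case classes and the two lookup functions stand in for Kafka's internals):

// Hypothetical stand-ins for kafka.common.TopicAndPartition and kafka.producer.KeyedMessage
case class TopicAndPartition(topic: String, partition: Int)
case class KeyedMessage[K, V](topic: String, key: K, message: V)

// brokerId -> (TopicAndPartition -> messages whose partition leader lives on that broker)
def collate[K, V](messages: Seq[KeyedMessage[K, V]],
                  partitionFor: KeyedMessage[K, V] => Int,    // e.g. Partitioner.partition(key, numPartitions)
                  leaderBrokerFor: TopicAndPartition => Int   // leader lookup from the topic metadata
                 ): Map[Int, Map[TopicAndPartition, Seq[KeyedMessage[K, V]]]] =
  messages
    .groupBy(m => TopicAndPartition(m.topic, partitionFor(m)))  // first group by target partition
    .groupBy { case (tp, _) => leaderBrokerFor(tp) }            // then group by the partition's leader broker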
Terminology: partKey is the partitioning key. When the client application implements the Partitioner interface, the key passed in is this partitioning key; based on the key and numPartitions, it returns a partition index. Remember that partition indexes start at 0.
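A minimal Partitioner sketch (my own illustration; it assumes the 0.8.1-style kafka.producer.Partitioner trait with partition(key: Any, numPartitions: Int), and the class name and partitioner.class wiring are illustrative):

import kafka.producer.Partitioner
import kafka.utils.VerifiableProperties

// Plugged in via partitioner.class=com.example.HashPartitioner
class HashPartitioner(props: VerifiableProperties = null) extends Partitioner {
  // non-negative hash of the partKey, modulo the partition count, so the result is in [0, numPartitions)
  override def partition(key: Any, numPartitions: Int): Int =
    (key.hashCode & 0x7fffffff) % numPartitions
}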
DefaultEventHandler.handle (the send and retry loop):

def handle(events: Seq[KeyedMessage[K,V]]) {
  ......
  while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {
    topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)
    if (topicMetadataRefreshInterval >= 0 &&
        SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {
      Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))
      sendPartitionPerTopicCache.clear()
      topicMetadataToRefresh.clear
      lastTopicMetadataRefreshTime = SystemTime.milliseconds
    }
    outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)
    if (outstandingProduceRequests.size > 0) {
      info("Back off for %d ms before retrying send. Remaining retries = %d".format(config.retryBackoffMs, remainingRetries-1))
      // back off for retryBackoffMs before the next retry
      Thread.sleep(config.retryBackoffMs)
      // the producer periodically refreshes the broker metadata for the topics that are still outstanding
      Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement))
      .....
    }
  }
}
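The loop above is driven by a few producer configs; a sketch of the relevant settings (the values shown are the commonly cited 0.8.x defaults, stated here as assumptions rather than taken from the post):

import java.util.Properties

val props = new Properties()
props.put("message.send.max.retries", "3")                // bounds remainingRetries
props.put("retry.backoff.ms", "100")                       // config.retryBackoffMs, the Thread.sleep between retries
props.put("topic.metadata.refresh.interval.ms", "600000")  // topicMetadataRefreshInterval used in the loop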
brokerPartitionInfo.updateInfo refreshes the topic metadata and then updates the producer pool:

def updateInfo(topics: Set[String], correlationId: Int) {
  var topicsMetadata: Seq[TopicMetadata] = Nil
  // Using the topics list, metadata.broker.list and the other config parameters; correlationId is just a request counter.
  // Build a TopicMetadataRequest and try randomly chosen brokers from the supplied list until the metadata is fetched.
  val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers, producerConfig, correlationId)
  topicsMetadata = topicMetadataResponse.topicsMetadata
  // throw partition specific exception
  topicsMetadata.foreach(tmd => {
    trace("Metadata for topic %s is %s".format(tmd.topic, tmd))
    if(tmd.errorCode == ErrorMapping.NoError) {
      topicPartitionInfo.put(tmd.topic, tmd)
    } else
      warn("Error while fetching metadata [%s] for topic [%s]: %s ".format(tmd, tmd.topic, ErrorMapping.exceptionFor(tmd.errorCode).getClass))
    tmd.partitionsMetadata.foreach(pmd => {
      if (pmd.errorCode != ErrorMapping.NoError && pmd.errorCode == ErrorMapping.LeaderNotAvailableCode) {
        warn("Error while fetching metadata %s for topic partition [%s,%d]: [%s]".format(pmd, tmd.topic, pmd.partitionId, ErrorMapping.exceptionFor(pmd.errorCode).getClass))
      }
      // any other error code (e.g. ReplicaNotAvailable) can be ignored since the producer does not need to access the replica and isr metadata
    })
  })
  producerPool.updateProducer(topicsMetadata)
}
ClientUtils.fetchTopicMetadata asks a randomly chosen broker for the topic metadata:

def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = {
  var fetchMetaDataSucceeded: Boolean = false
  var i: Int = 0
  val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, producerConfig.clientId, topics.toSeq)
  var topicMetadataResponse: TopicMetadataResponse = null
  var t: Throwable = null
  // shuffle the broker list so a random broker is tried first
  val shuffledBrokers = Random.shuffle(brokers)
  while(i < shuffledBrokers.size && !fetchMetaDataSucceeded) {
    // create a SyncProducer for the randomly chosen broker
    val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, shuffledBrokers(i))
    info("Fetching metadata from broker %s with correlation id %d for %d topic(s) %s".format(shuffledBrokers(i), correlationId, topics.size, topics))
    try {
      // send the TopicMetadataRequest to that broker to fetch the metadata, i.e. all broker info for these topics
      topicMetadataResponse = producer.send(topicMetadataRequest)
      fetchMetaDataSucceeded = true
    } catch {
      ......
    }
  }
  if(!fetchMetaDataSucceeded) {
    throw new KafkaException("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, shuffledBrokers), t)
  } else {
    debug("Successfully fetched metadata for %d topic(s) %s".format(topics.size, topics))
  }
  return topicMetadataResponse
}
ProducerPool.updateProducer recreates the SyncProducers for the brokers that currently lead a partition:

def updateProducer(topicMetadata: Seq[TopicMetadata]) {
  val newBrokers = new collection.mutable.HashSet[Broker]
  topicMetadata.foreach(tmd => {
    tmd.partitionsMetadata.foreach(pmd => {
      if(pmd.leader.isDefined)
        newBrokers += (pmd.leader.get)
    })
  })
  lock synchronized {
    newBrokers.foreach(b => {
      if(syncProducers.contains(b.id)) {
        syncProducers(b.id).close()
        syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))
      } else
        syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))
    })
  }
}
After the Kafka brokers are started and there are a large number of producers and consumers, the following message is often reported:
root@lizhitao:/opt/soft$ Closing socket connection to 192.168.11.166
It took the author quite a while of reading the source code to understand why ProducerConfig does not require the user to supply the complete list of brokers in the Kafka cluster, but only one or a few of them: the producer uses the brokers you provide, together with the topics, to fetch the latest information about all brokers.
It is also worth knowing that although the SyncProducer used to send the TopicMetadataRequest is created with ProducerPool.createSyncProducer, it is not returned to the ProducerPool after use; it is simply closed.
Original article: http://blog.csdn.net/lizhitao/article/details/38438123