码迷,mamicode.com
首页 > 其他好文 > 详细

kafka producer batch 发送消息

时间:2019-01-06 13:31:34      阅读:270      评论:0      收藏:0      [点我收藏+]

标签:api   ==   gis   cer   sre   ack   数据   ref   client   

1. 使用 KafkaProducer 发送消息,是按 batch 发送的,producer 首先把消息放入 ProducerBatch 中:
org.apache.kafka.clients.producer.internals.ProducerBatch

2. KafkaProduer 类中有一个 Thread 属性,负责 IO,发送和接收数据:
            this.sender = new Sender(logContext,
                    client,
                    this.metadata,
                    this.accumulator,
                    maxInflightRequests == 1,
                    config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                    acks,
                    retries,
                    metricsRegistry.senderMetrics,
                    Time.SYSTEM,
                    this.requestTimeoutMs,
                    config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
                    this.transactionManager,
                    apiVersions);
            String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
            this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
            this.ioThread.start();

Sender 类实现了 Runnable 接口,封装了具体的逻辑,发送消息和接收响应都在这个类中。

// 发送消息
long pollTimeout = sendProducerData(now);
// 接收响应
client.poll(pollTimeout, now);

 

3. 执行回调

org.apache.kafka.clients.producer.internals.Sender#completeBatch()

 

kafka producer batch 发送消息

标签:api   ==   gis   cer   sre   ack   数据   ref   client   

原文地址:https://www.cnblogs.com/allenwas3/p/9768149.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!