码迷,mamicode.com
首页 > 其他好文 > 详细

kafka 生产者

时间:2019-10-01 13:45:37      阅读:73      评论:0      收藏:0      [点我收藏+]

标签:固定   发送消息   cap   dos   batch   kafka   select   pos   cer   

KafkaProducer 创建一个 KafkaThread 来运行 Sender.run 方法。

1. 发送消息的入口在 KafkaProducer#doSend 中,但其实是把消息加入到 batches 中:

kafka 生产者是按 batch 发送消息,RecordAccumulator 类有个变量 ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches,
KafkaProducer#doSend 方法会把当前的这条消息放入到 ProducerBatch 中。然后调用 Sender#wakeup 方法,尝试唤醒阻塞的 io 线程。


2. 从 batches 取出数据发送,入口在 Sender.run,主要的逻辑抽象为 2 步:

2.1 NetworkClient.send
这里的 send 不是真正的网络发送,先把 ProducerReuquest 序列化成 Send 对象,然后加入到 inFlightRequests 的头部,调用 selector 的 send,实则是 KafkaChannel.setSend()

Send send = request.toSend(nodeId, header);

this.inFlightRequests.add(inFlightRequest);

selector.send(inFlightRequest.send);

 

2.2 NetworkClient.poll
真正的网络发送

Selector#pollSelectionKeys 处理网络读写事件,发送消息即写事件,同时把响应存放在 Selector#completedReceives 中
producer 发送消息,如果 acks = -1 和 1,即 producer 请求需要响应,
在 NetworkClient#handleCompletedSends 中,把不需要响应的请求,从 inFlightRequests 中删除
在 NetworkClient#handleCompletedReceives 处理响应
producer 设置了 ack 的值是固定的,producer 要么都需要响应,要么都不需要响应。
新的请求加在头部,收到的响应对应最旧的请求,即尾部的请求。

 

3. 主要的类
KafkaProducer: 直接暴露给用户的 api 类;Sender: 主要管理 ProducerBatch
NetworkClient: ProducerBatch 是对象,通过网络发送需要序列化,该类管理连接,更接近 io 层
Selector 对 java nio Selector 的封装
KafkaChannel

 

4. ByteBuffer

// ByteBuffer 的使用
// ByteBuffer 初始是写模式
public static void main(String[] args) throws UnsupportedEncodingException {
    // capacity = 512, limit = 512, position = 0
    ByteBuffer buffer = ByteBuffer.allocate(512);
    buffer.put((byte)‘h‘);
    buffer.put((byte)‘e‘);
    buffer.put((byte)‘l‘);
    buffer.put((byte)‘l‘);
    buffer.put((byte)‘o‘);

    // limit = position, position = 0
    buffer.flip();

    // 获取字节数
    int len = buffer.remaining();
    byte[] dst = new byte[len];
    buffer.get(dst);
    System.out.println(new String(dst));
    // 结论:ByteBuffer 只是对 byte[] 的封装
}

//SocketChannel
//输出
//SocketChannel#write(java.nio.ByteBuffer)
//读取输入
//SocketChannel#read(java.nio.ByteBuffer)

 

kafka 生产者

标签:固定   发送消息   cap   dos   batch   kafka   select   pos   cer   

原文地址:https://www.cnblogs.com/allenwas3/p/11615210.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!