标签:art close 常见 oca sum list 阻塞 幂等性 重试
KafkaProducer 发送消息主要有以下 3 种方式:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");
// 发送并忘记(fire-and-forget)
producer.send(record);
// 同步发送
Future<RecordMetadata> future = producer.send(record);
RecordMetadata metadata = future.get();
// 异步发送
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
}
});
producer.close();
具体的发送流程可以参考 KafkaProducer发送流程简析。
KafkaProducer 是线程安全的,多个线程可以共享同一个 KafkaProducer 对象。
该参数可以是任意的字符串,broker 会用它来识别消息的来源,会在日志和监控指标里展示。
该属性指定 broker 的地址列表。
清单里不需要包含所有的 broker 地址,生产者会从给定的 broker 里查找到其他 broker 的信息。
不过建议至少要提供两个 broker 的信息,一旦其中一个宕机,生产者仍然能够连接到集群上。
这两个属性必须被设置为一个实现了org.apache.kafka.common.serialization.Serializer
接口的类。
生产者会使用这个类把键值对象序列化成字节数组。
设置 socket 读写数据时用到的 TCP 缓冲区大小。如果它们被设为 -1,就使用操作系统的默认值。
当生产者或消费者与 broker 处于不同的机房时,可以适当增大这些值。
设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。
如果应用程序发送消息的速度超过发送到服务器的速度,会导致生产者空间不足。
此时KafkaProducer.send()
会阻塞等待内存释放,等待时间超过 max.block.ms 后会抛出超时异常。
该参数指定了消息被发送给 broker 之前,使用哪一种压缩算法(snappy
,gzip
或lz4
)进行压缩。
使用压缩可以降低网络传输开销和存储开销,而这往往是向 Kafka 发送消息的瓶颈所在。
该参数指定了一个批次可以使用的内存字节数(而不是消息个数)。
消息批次ProducerBatch
包含了一组将要发送至同个分区的消息,当批次被填满,批次里的所有消息会被立即发送出去。
不过生产者并不一定都会等到批次被填满才发送,半满甚至只包含一个消息的批次也可能被发送。
所以就算把批次大小设置得很大,也不会造成延迟,只是会占用更多的内存而已。
但如果设置得太小,生产者会频繁地发送消息,会增加一些额外的网络开销。
该参数指定了生产者在发送批次之前等待的时间。
生产者会在批次填满或等待时间达到 linger.ms 时把批次发送出去。
设置linger.ms>0
会增加延迟,但也会提升吞吐量(一次性发送更多的消息,每个消息的开销就变小了)。
参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。
这个参数决定令消息丢失的可能性:
acks=0
生产者发出消息后不等待来自服务器的响应
如果当中出现了问题,导致服务器没有收到消息,那么生产者就无从得知,消息也就丢失了。
不过,因为生产者不需要等待服务器的响应,所以它可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量。
acks=1
只要集群的 leader 节点收到消息,生产者就会收到一个来自服务器的成功响应
如果消息无法到达 leader 节点(比如:leader节点崩溃,新的 leader 还没有被选举出来),生产者会收到一个错误响应。
为了避免数据丢失,生产者会重发消息。不过,如果一个没有收到消息的节点成为新 leader,消息还是会丢失。
这个时候的吞吐量取决于使用的是同步发送还是异步发送:
Future.get()
方法),显然会增加延迟(在网络上传输一个来回的延迟)acks=all
只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应
这种模式是最安全的,就算有服务器发生崩溃,数据也不会丢失。
不过,它的延迟比 acks=1
时更高,因为我们要等待不只一个服务器节点接收消息。
该参数决定了生产者可以重发消息的次数(每次重试之间等待 retry.backoff.ms
)。
服务器返回临时性的错误(比如:分区找不到 leader)时,生产者会自动重试,没必要在代码逻辑里处理可重试的错误。
作为开发者,只需要处理那些不可重试的错误(比如:消息字节数超过单个发送批次上限)或重试次数超出上限的情况即可。
该参数指定生产者,最多可以发送未响应在途消息批次数量。
在途消息批次越多,会占用更多的内存,不过也会提升吞吐量。
当retries > 0
且max.in.flight.requests.per.connection > 1
时,可能出现消息乱序。
如果第一个批次消息写入失败,而第二个批次写入成功,broker 会重试写入第一个批次。
如果此时第一个批次也写入成功,那么两个批次的顺序就反过来了。
一般不建议设置retries=0
,而是令max.in.flight.requests.per.connection = 1
来保证消息顺序。
在生产者尝试发送第一批消息时,就不会有其他的消息发送给 broker,即使发生重试消息也不会乱序。
不过这样会严重影响生产者的吞吐量,所以只有在对消息的顺序有严格要求的情况下才能这么做。
当 broker 失效时生产者可能会自动重试,导致一条消息被重复写入多次。
为了避免这种情况,Kafka 在生产者端提供来幂等保证:同一条消息被生产者发送多次,但在 broker端这条消息只会被写入日志一次。
在发送端设置 enable.idempotence = true
可以开启幂等性,此时配置同时满足以下条件:
max.in.flight.requests.per.connection ≤ 5
retries > 0
acks = all
其工作机制如下:
producer id
该过程对用户来说是完全透明的)sequence number
用于消息去重(每个分区都有独立的序列号)(PID, SN)
信息一同持久化到对应的分区日志中(保证 leader 切换后去重仍然生效)若重试导致 broker 接收到小于或等于已知最大序列号的消息,broker 会拒绝写入这些消息,从而保证每条消息也只会被保存在日志中一次。
由于每个 producer 实例都会被分配不同的 PID,该机制只能保证单个 producer 实例的幂等性,无法实现协同多个 producer 实现幂等。
Kafka 事务可以实现 producer 对多个主题和分区的原子写入,并且保证 consumer 不会读取到未提交的数据。
Kafka 要求应用程序必须提供一个全局唯一的 TIDtransactional id
:
如果某个 producer 实例失效,该机制能够保证下一个拥有相同 TID 的实例首先完成之前未完成的事务。
此外,broker 还会为自动每个 producer 分配一个epoch
用于隔离fencing out
失效但仍存活的 producer:
如果存在,则认为当前 producer 是一个僵尸实例zombie instance
并拒绝为其提供服务,防止其破坏事务的完整性。
下面是两个常见的应用场景:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("enable.idempotence", "true"); // 开启幂等
properties.setProperty("transactional.id", "my-transaction-id"); // 设置事务ID
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
ProducerRecord<String, String> record1 = new ProducerRecord<>("topic1", "key1", "value1");
ProducerRecord<String, String> record2 = new ProducerRecord<>("topic2", "key2", "value2");
ProducerRecord<String, String> record3 = new ProducerRecord<>("topic3", "key3", "value3");
producer.initTransactions(); // 初始化事务(只需执行一次)
try {
producer.beginTransaction(); // 开始事务
// 向多个不同的 topic 写入消息
producer.send(record1);
producer.send(record2);
producer.send(record3);
producer.commitTransaction(); // 提交事务
} catch (ProducerFencedException e) {
producer.close(); // 事务ID 已被占用
} catch (KafkaException e) {
producer.abortTransaction();
}
final String groupID = "my-group-id";
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");
producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.setProperty("enable.idempotence", "true"); // 开启幂等
producerProps.setProperty("transactional.id", "my-transaction-id"); // 设置事务ID
Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", "localhost:9092");
consumerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.setProperty("isolation.level","read_committed"); // 设置隔离级别
consumerProps.setProperty("group.id", groupID); // 设置消费者组群ID
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
producer.initTransactions();
consumer.subscribe(Collections.singletonList("ping"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE); // 读取消息
try {
producer.beginTransaction(); // 开启事务
// 处理消息(可以是任意业务场景)
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for(ConsumerRecord<String, String> record : records){
offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset())); // 记录消费偏移量
producer.send(new ProducerRecord<>("pong", record.value())); // 发送消息
}
producer.sendOffsetsToTransaction(offsets, groupID); // 提交消费偏移量
producer.commitTransaction(); // 事务提交
} catch (ProducerFencedException e) {
producer.close(); // 事务ID 已被占用
} catch (Exception e){
producer.abortTransaction(); // 回滚事务
}
}
标签:art close 常见 oca sum list 阻塞 幂等性 重试
原文地址:https://www.cnblogs.com/buttercup/p/14201153.html