<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.0</version>
</dependency>
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaProducerClient {
    private Producer<String, String> producer;

    private KafkaProducerClient() {
    }

    /**
     * Create a Kafka producer instance
     */
    public KafkaProducerClient(Properties props) {
        // Detailed producer configuration reference: https://www.jianshu.com/p/9a31538ea4b3
        // Wait for acknowledgement from all in-sync replicas
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // Retry transient send failures up to 3 times
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        // Prevent duplicate records when retries occur
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        // Batch size per partition, in bytes
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // Total memory (32 MB) for buffering records awaiting send
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        // Wait up to 1 ms for additional records to fill a batch
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
    }
    /**
     * Send a message synchronously (blocking until the broker acknowledges)
     */
    public String sendSyncMessage(String topic, String message) throws Exception {
        RecordMetadata recordMetadata = producer.send(new ProducerRecord<>(topic, message)).get();
        return recordMetadata.offset() + "|" + recordMetadata.partition();
    }
    /**
     * Send a message asynchronously (non-blocking, fire-and-forget)
     */
    public void sendAsyncMessage(String topic, String message) {
        producer.send(new ProducerRecord<>(topic, message));
    }
    /**
     * Flush pending records and release client resources
     */
    public void close() {
        producer.close();
    }
}
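The sendAsyncMessage above is fire-and-forget: a failed send is silently dropped. The two-argument producer.send(record, callback) overload reports the outcome once the send completes. A minimal sketch of an overload that could be added to the class (this method and the usage below are illustrative, not part of the original post):

import org.apache.kafka.clients.producer.Callback;

// Illustrative overload: the callback fires when the send succeeds or fails
public void sendAsyncMessage(String topic, String message, Callback callback) {
    producer.send(new ProducerRecord<>(topic, message), callback);
}

// Example usage: log metadata on success, the exception otherwise
// client.sendAsyncMessage("topic-zcx", "hello kafka", (metadata, exception) -> {
//     if (exception != null) {
//         exception.printStackTrace();
//     } else {
//         System.out.println(metadata.partition() + "|" + metadata.offset());
//     }
// });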
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.junit.Test; // JUnit 4 assumed

public class KafkaProducerClientTest {
    private static final String TOPIC = "topic-zcx";

    @Test
    public void sendMessage() throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "47.97.167.84:19090");
        // Enable idempotence so retried sends do not write duplicate records
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        // Optionally compress record batches
        //props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
        KafkaProducerClient kafkaProducerClient = new KafkaProducerClient(props);
        String offsetInfo = kafkaProducerClient.sendSyncMessage(TOPIC, "hello kafka");
        System.out.println("offset|partition: " + offsetInfo);
        kafkaProducerClient.close();
    }
}
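The records above carry no key, so the default partitioner spreads them across the topic's partitions. When related messages must preserve order they need to land on the same partition; the three-argument ProducerRecord constructor takes a key, and records with equal keys hash to the same partition. A minimal sketch of such a method (illustrative, not part of the original post):

// Illustrative: records sharing a key are routed to the same partition
public String sendSyncKeyedMessage(String topic, String key, String message) throws Exception {
    RecordMetadata recordMetadata = producer.send(new ProducerRecord<>(topic, key, message)).get();
    return recordMetadata.offset() + "|" + recordMetadata.partition();
}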
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaConsumerClient {
    private KafkaConsumer<String, String> consumer;

    private KafkaConsumerClient() {
    }

    /**
     * Create a Kafka consumer instance
     */
    public KafkaConsumerClient(Properties properties) {
        // Detailed consumer configuration reference: https://blog.csdn.net/Dongguabai/article/details/86524023
        // Offset auto-commit interval; only effective when enable.auto.commit=true
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // Where to start reading when no committed offset exists
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        // Time without heartbeats before the broker considers this consumer dead
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(properties);
    }
    /**
     * Subscribe to the given topic and repeatedly hand the consumer to the listener
     */
    public void receive(String topic, IConsumerListener callable) {
        consumer.subscribe(Arrays.asList(topic));
        while (true) {
            callable.doReceive(consumer);
        }
    }

    public interface IConsumerListener {
        void doReceive(KafkaConsumer<String, String> consumer);
    }
}
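The while (true) loop above never exits, so the consumer is never closed. KafkaConsumer.wakeup() is the thread-safe way to break a blocked poll(): it makes poll() throw WakeupException. A minimal sketch of a receive variant with shutdown handling (the shutdown-hook approach and method name are illustrative, not from the original post):

import org.apache.kafka.common.errors.WakeupException;

// Illustrative variant: a JVM shutdown hook interrupts the poll loop cleanly
public void receiveUntilShutdown(String topic, IConsumerListener callable) {
    Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));
    consumer.subscribe(Arrays.asList(topic));
    try {
        while (true) {
            callable.doReceive(consumer);
        }
    } catch (WakeupException e) {
        // Expected on shutdown; fall through to close
    } finally {
        consumer.close();
    }
}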
import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.junit.Test; // JUnit 4 assumed

public class KafkaConsumerClientTest {
    private static final String TOPIC = "topic-zcx";

    @Test
    public void receive() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "47.97.167.84:19090,47.97.167.84:19091,47.97.167.84:19092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group-test");
        // Auto-commit is disabled, so offsets must be committed manually below
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // Cap the number of records returned by a single poll()
        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
        KafkaConsumerClient kafkaConsumerClient = new KafkaConsumerClient(properties);
        kafkaConsumerClient.receive(TOPIC, consumer -> {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(30));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition = %d, offset = %d, value = %s%n", record.partition(), record.offset(), record.value());
            }
            // Commit the batch's offsets; otherwise they would be reprocessed after a restart
            consumer.commitSync();
        });
    }
}
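commitSync() blocks until the broker confirms the commit, which adds latency to every poll cycle. For routine commits, commitAsync() can be used instead, typically paired with a final commitSync() in a shutdown path. A minimal sketch of the non-blocking form (illustrative, not from the original post):

// Illustrative: non-blocking commit with an error-logging callback
consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        System.err.println("offset commit failed: " + exception.getMessage());
    }
});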
Original article: https://www.cnblogs.com/hunna/p/13194321.html