标签:返回 默认 消费 def value 封装 cto listener timeout
1)引入maven依赖
我这里使用的是springboot 2.1.3.RELEASE 版本:
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
会引入一对的kafka包:
2)生产者配置:
所有配置参考kafka-clients-2.5.0.jar包里的org.apache.kafka.clients.producer.ProducerConfig类,并且在该类中可以查看所有配置项的默认值: CONFIG = (new ConfigDef()).define( 这里的define方法的第三个参数就是默认值
application.properties里可以这样配置:
##################### 重要配置 ###################### spring.kafka.producer.bootstrap.servers=192.168.2.60:9092,192.168.2.62:9092 spring.kafka.producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer # acks=0 如果设置为0,生产者将不等待任何来自服务器的确认。每个记录返回的偏移量将始终设置为-1。 # acks=1 这意味着leader确认消息即可,但不等待所有副本的完全确认的情况下进行响应。在这种情况下,如果leader在确认记录后立即失败,但是在副本复制它之前,那么记录将丢失。 # acks=all 不仅需要leader确认收到消息,还将等待全部的副本确认。这保证了只要至少有一个副本保持活动状态,记录就不会丢失。这是最有力的保证。这相当于ack =-1设置。 # acks=-1 跟集群有关 # 默认 1 spring.kafka.producer.acks=1 # 一个批次发送的大小,默认16KB,超过这个大小就会发送数据 spring.kafka.producer.batch.size=16384 # 一个批次最长等待多久就发送数据,默认0,即马上发送 spring.kafka.producer.linger.ms=5000 # 控制生产者最大发送大小,默认 1MB。这个值必须小于kafka服务器server.properties配置文件里的最大可接收数据大小配置:socket.request.max.bytes=104857600 (默认104857600 = 100MB) spring.kafka.producer.max.request.size=1048576 ##################### 非重要配置 ###################### # 生产者内存缓冲区大小。默认33554432bytes=32MB spring.kafka.producer.buffer.memory=33554432 # 发送重试次数,默认 2147483647,接近无限大 spring.kafka.producer.retries=3 # 请求超时时间,默认30秒 spring.kafka.producer.request.timeout.ms=30000 # 默认值5。并发状态下,kafka生产者允许存在最大的kafka服务端未确认接收的消息个数最大值。 # 注意,如果该值设置为1,并且开启重试机制,则会在允许的重试次数内,阻塞其他消息发送到kafka Server端。并且为1的话,会严重影响生产者的吞吐量。仅适用于对数据有严格顺序要求的场景。 spring.kafka.producer.max.in.flight.requests.per.connection=5 # 最大阻塞时间,超过则抛出异常。默认60秒 spring.kafka.max.block.ms=60000 # 数据压缩类型:none、gzip、snappy、lz4、zstd,默认none什么都不做 spring.kafka.compression.type=none
在springboot框架里,手动封装 @bean对象:
@Configuration @EnableKafka public class KafkaProducerConfig { @Value("${kafka.producer.servers}") private String servers; @Value("${kafka.producer.retries}") private int retries; @Value("${kafka.producer.batch.size}") private int batchSize; @Value("${kafka.producer.linger}") private int linger; @Value("${kafka.producer.buffer.memory}") private int bufferMemory; @Bean public KafkaTemplate<String, String> kafkaTemplate() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(ProducerConfig.RETRIES_CONFIG, retries); props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); props.put(ProducerConfig.LINGER_MS_CONFIG, linger); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(props); KafkaTemplate kafkaTemplate = new KafkaTemplate<String, String>(factory) ; //kafkaTemplate.setProducerListener(); return kafkaTemplate; } }
3)消费者配置:
end.
标签:返回 默认 消费 def value 封装 cto listener timeout
原文地址:https://www.cnblogs.com/zhuwenjoyce/p/13191163.html