码迷,mamicode.com
首页 > 其他好文 > 详细

kafka2.5.0生产者与消费者配置详解

时间:2020-06-25 12:20:32      阅读:155      评论:0      收藏:0      [点我收藏+]

标签:返回   默认   消费   def   value   封装   cto   listener   timeout   

1)引入maven依赖

我这里使用的是springboot 2.1.3.RELEASE 版本:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

会引入一对的kafka包:

技术图片

 

 2)生产者配置:

所有配置参考kafka-clients-2.5.0.jar包里的org.apache.kafka.clients.producer.ProducerConfig类,并且在该类中可以查看所有配置项的默认值: CONFIG = (new ConfigDef()).define(  这里的define方法的第三个参数就是默认值

application.properties里可以这样配置:

#####################  重要配置  ######################
spring.kafka.producer.bootstrap.servers=192.168.2.60:9092,192.168.2.62:9092
spring.kafka.producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer
# acks=0  如果设置为0,生产者将不等待任何来自服务器的确认。每个记录返回的偏移量将始终设置为-1。
# acks=1  这意味着leader确认消息即可,但不等待所有副本的完全确认的情况下进行响应。在这种情况下,如果leader在确认记录后立即失败,但是在副本复制它之前,那么记录将丢失。
# acks=all  不仅需要leader确认收到消息,还将等待全部的副本确认。这保证了只要至少有一个副本保持活动状态,记录就不会丢失。这是最有力的保证。这相当于ack =-1设置。
# acks=-1   跟集群有关
# 默认 1
spring.kafka.producer.acks=1
# 一个批次发送的大小,默认16KB,超过这个大小就会发送数据
spring.kafka.producer.batch.size=16384
# 一个批次最长等待多久就发送数据,默认0,即马上发送
spring.kafka.producer.linger.ms=5000
# 控制生产者最大发送大小,默认 1MB。这个值必须小于kafka服务器server.properties配置文件里的最大可接收数据大小配置:socket.request.max.bytes=104857600 (默认104857600 = 100MB)
spring.kafka.producer.max.request.size=1048576
 
#####################  非重要配置  ######################
# 生产者内存缓冲区大小。默认33554432bytes=32MB
spring.kafka.producer.buffer.memory=33554432
# 发送重试次数,默认 2147483647,接近无限大
spring.kafka.producer.retries=3
# 请求超时时间,默认30秒
spring.kafka.producer.request.timeout.ms=30000
# 默认值5。并发状态下,kafka生产者允许存在最大的kafka服务端未确认接收的消息个数最大值。
# 注意,如果该值设置为1,并且开启重试机制,则会在允许的重试次数内,阻塞其他消息发送到kafka Server端。并且为1的话,会严重影响生产者的吞吐量。仅适用于对数据有严格顺序要求的场景。
spring.kafka.producer.max.in.flight.requests.per.connection=5
# 最大阻塞时间,超过则抛出异常。默认60秒
spring.kafka.max.block.ms=60000
# 数据压缩类型:none、gzip、snappy、lz4、zstd,默认none什么都不做
spring.kafka.compression.type=none

在springboot框架里,手动封装 @bean对象:

@Configuration
@EnableKafka
public class KafkaProducerConfig {
    @Value("${kafka.producer.servers}")
    private String servers;
    @Value("${kafka.producer.retries}")
    private int retries;
    @Value("${kafka.producer.batch.size}")
    private int batchSize;
    @Value("${kafka.producer.linger}")
    private int linger;
    @Value("${kafka.producer.buffer.memory}")
    private int bufferMemory;

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(props);
        KafkaTemplate kafkaTemplate
                = new KafkaTemplate<String, String>(factory) ;
        //kafkaTemplate.setProducerListener();
        return kafkaTemplate;
    }
}

  

 

 

3)消费者配置:

 

 

 

end.

kafka2.5.0生产者与消费者配置详解

标签:返回   默认   消费   def   value   封装   cto   listener   timeout   

原文地址:https://www.cnblogs.com/zhuwenjoyce/p/13191163.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!