码迷,mamicode.com
首页 > 编程语言 > 详细

spring-kafka —— 生产者消费者重要配置

时间:2019-07-15 10:43:48      阅读:277      评论:0      收藏:0      [点我收藏+]

标签:lan   内存   第一条   分区   使用   style   选项   topic   一个   

 

一、生产者配置

    # 以逗号分隔的主机:端口对列表,用于建立与Kafka群集的初始连接
    spring.kafka.producer.bootstrap-servers=TopKafka1:9092,TopKafka2:9092,TopKafka3:9092
     
    # 设置大于0的值将使客户端重新发送任何数据,一旦这些数据发送失败。注意,这些重试与客户端接收到发送错误时的重试没有什么不同。允许重试将潜在的改变数据的顺序,如果这两个消息记录都是发送到同一个partition,则第一个消息失败第二个发送成功,则第二条消息会比第一条消息出现要早。
    spring.kafka.producer.retries=0
     
    # 每当多个记录被发送到同一分区时,生产者将尝试将记录一起批量处理为更少的请求,
    # 这有助于提升客户端和服务端之间的性能,此配置控制默认批量大小(以字节为单位),默认值为16384
    spring.kafka.producer.batch-size=16384
     
    # producer可以用来缓存数据的内存大小。如果数据产生速度大于向broker发送的速度,producer会阻塞或者抛出异常,以“block.on.buffer.full”来表明。这项设置将和producer能够使用的总内存相关,但并不是一个硬性的限制,因为不是producer使用的所有内存都是用于缓存。一些额外的内存会用于压缩(如果引入压缩机制),同样还有一些用于维护请求。
    spring.kafka.producer.buffer-memory=33554432
     
    # key的Serializer类,实现了org.apache.kafka.common.serialization.Serializer接口
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
     
    # 值的Serializer类,实现了org.apache.kafka.common.serialization.Serializer接口
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
     
    # procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
    # acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
    # acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
    # acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。
    # 可以设置的值为:all, -1, 0, 1
    spring.kafka.producer.acks=-1
     
    # 当向server发出请求时,这个字符串会发送给server。目的是能够追踪请求源头,以此来允许ip/port许可列表之外的一些应用可以发送信息。这项应用可以设置任意字符串,因为没有任何功能性的目的,除了记录和跟踪
    spring.kafka.producer.client-id=1
     
    # producer用于压缩数据的压缩类型。默认是无压缩。正确的选项值是none、gzip、snappy。压缩最好用于批量处理,批量处理消息越多,压缩性能越好
    spring.kafka.producer.compression-type=snappy

 

二、消费者配置

    # 以逗号分隔的主机:端口对列表,用于建立与Kafka群集的初始连接
    spring.kafka.consumer.bootstrap-servers=TopKafka1:9092,TopKafka2:9092,TopKafka3:9092
     
    # 用来唯一标识consumer进程所在组的字符串,如果设置同样的group id,表示这些processes都是属于同一个consumer group,默认:""
    spring.kafka.consumer.group-id=TyyLoveZyy
     
    # max.poll.records条数据需要在session.timeout.ms这个时间内处理完,默认:500
    spring.kafka.consumer.max-poll-records=500
     
    # 消费超时时间,大小不能超过session.timeout.ms,默认:3000
    spring.kafka.consumer.heartbeat-interval=3000
     
    # 如果为真,consumer所fetch的消息的offset将会自动的同步到zookeeper。这项提交的offset将在进程挂掉时,由新的consumer使用,默认:true
    spring.kafka.consumer.enable-auto-commit=true
     
    # consumer自动向zookeeper提交offset的频率,默认:5000
    spring.kafka.consumer.auto-commit-interval=5000
     
    # 没有初始化的offset时,可以设置以下三种情况:(默认:latest)
    # earliest
    # 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
    # latest
    # 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
    # none
    # topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
    spring.kafka.consumer.auto-offset-reset=earliest
     
    # 每次fetch请求时,server应该返回的最小字节数。如果没有足够的数据返回,请求会等待,直到足够的数据才会返回。默认:1
    spring.kafka.consumer.fetch-min-size=1
     
    # Fetch请求发给broker后,在broker中可能会被阻塞的(当topic中records的总size小于fetch.min.bytes时),此时这个fetch请求耗时就会比较长。这个配置就是来配置consumer最多等待response多久。
    spring.kafka.consumer.fetch-max-wait=500
     
    # 消费者进程的标识。如果设置一个人为可读的值,跟踪问题会比较方便。。默认:""
    spring.kafka.consumer.client-id=1
     
    # key的反序列化类。实现了org.apache.kafka.common.serialization.Deserializer接口
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
     
    # 值的反序列化类。实现了org.apache.kafka.common.serialization.Deserializer接口
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

 

三、其它重要配置

    # consumer是通过拉取的方式向服务端拉取数据,当超过指定时间间隔max.poll.interval.ms没有向服务端发送poll()请求,而心跳heartbeat线程仍然在继续,会认为该consumer锁死,就会将该consumer退出group,并进行再分配。默认:300000
    spring.kafka.consumer.properties.max.poll.interval.ms=300000
     
    # 会话的超时限制。如果consumer在这段时间内没有发送心跳信息,则它会被认为挂掉了,并且reblance将会产生,必须在[group.min.session.timeout.ms, group.max.session.timeout.ms]范围内。默认:10000
    spring.kafka.consumer.properties.session.timeout.ms=10000

 

spring-kafka —— 生产者消费者重要配置

标签:lan   内存   第一条   分区   使用   style   选项   topic   一个   

原文地址:https://www.cnblogs.com/caoweixiong/p/11187404.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!