
Manually Committing Offsets with a Kafka Consumer



Each time the consumer calls poll(), it returns records that producers have written to Kafka but that this consumer group has not yet read. As long as the consumer keeps running, the partition offsets are of little use. But if the consumer crashes, or a new consumer joins the group, a rebalance is triggered; after the rebalance each consumer may be assigned new partitions rather than the ones it was processing before. To continue where the previous owner left off, the consumer needs to read the last committed offset of each partition and resume from the position that offset points to. Consumers commit offsets by sending messages to an internal topic named __consumer_offsets; each message contains the offset of one partition.
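These committed offsets can also be read back from the group coordinator. The sketch below is not part of the original post; it reuses the same bootstrap address, group id, and topic Demo3 as the examples that follow, and assumes a kafka-clients version that still provides the single-partition committed() method. It prints the last offset the group committed for partition 0:

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommittedOffsetCheck {

    public static void main(String[] args){
        Properties properties = new Properties();
        properties.put("bootstrap.servers","ip:9092");
        properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("group.id","DemoConsumerGroup");

        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
        try {
            // Ask the group coordinator for the last offset this group committed
            // for partition 0 of Demo3; null means the group has never committed one
            TopicPartition partition = new TopicPartition("Demo3", 0);
            OffsetAndMetadata committed = consumer.committed(partition);
            System.out.println(partition + " last committed offset: "
                    + (committed == null ? "none" : committed.offset()));
        } finally {
            consumer.close();
        }
    }
}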

1. Synchronous commit

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

/**
 * Created by zhangpeiran on 2018/10/9.
 */
public class MyConsumer {

    public static void main(String[] args){
        Properties properties = new Properties();
        properties.put("bootstrap.servers","ip:9092");
        properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("group.id","DemoConsumerGroup");

        //Default is latest: when the partition has no committed offset, or the offset is invalid, the consumer starts from the newest records
        //A consumer group subscribing to a topic for the first time falls into this case, so data produced before the Consumer starts would not be read
        //Setting it to earliest makes the consumer read from the beginning of the partition
        properties.put("auto.offset.reset","earliest");

        //Disable auto-commit so that offsets are committed manually
        properties.put("enable.auto.commit","false");

        //Maximum number of records returned by a single poll()
        properties.put("max.poll.records",10);


        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);

        consumer.subscribe(Collections.singletonList("Demo3"));

        int count = 0;
        try {
            while (true){
                ConsumerRecords<String,String> records = consumer.poll(10);
                for(ConsumerRecord<String ,String> record : records){
                    count ++;
                    //Synchronously commit the offsets returned by the latest poll()
                    //once the 50th record has been read
                    if(count == 50)
                        consumer.commitSync();
                    System.out.println(record.topic() + "," + record.partition() + "," + record.offset() + "," + record.key() + "," + record.value());
                }
                System.out.println(count);
            }
        } finally {
            consumer.close();
        }
    }
}

Note: in the example above, topic Demo3 already contains 100 messages. On the first run of the Consumer, the offset is committed once after 50 messages have been read, and the printed count reaches 100. On a second run with the same consumer group, reading resumes from message 51, so the printed count is 50.
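commitSync() with no arguments commits the latest offsets returned by poll(). It also accepts an explicit offset map if you want to commit the position of a particular record instead. The helper below is not from the original post (the class and method names are made up for illustration); it could be called from inside the record loop of the example above:

import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class OffsetUtil {

    //Commit the position of a single processed record. The committed offset
    //must point at the next message to read, hence record.offset() + 1
    public static void commitRecord(KafkaConsumer<String,String> consumer,
                                    ConsumerRecord<String,String> record) {
        consumer.commitSync(Collections.singletonMap(
                new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1)));
    }
}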

2. Asynchronous commit

With a synchronous commit, the application blocks and retries until the broker responds, which limits throughput, so an asynchronous commit can be used instead. A failed asynchronous commit is not retried: if the failure was caused by a temporary problem, a later commit will succeed anyway.

consumer.commitAsync();
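commitAsync() can also take a callback that is invoked when the broker responds, which is handy for logging failed commits. A minimal sketch, assuming it replaces the plain commitAsync() call inside the poll loop of the examples here:

import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;

//after processing the records returned by one poll():
consumer.commitAsync(new OffsetCommitCallback() {
    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
        if (exception != null) {
            //Log the failure but do not retry; a later commit will supersede these offsets
            System.err.println("Offset commit failed for " + offsets + ": " + exception.getMessage());
        }
    }
});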

3. Combined synchronous and asynchronous commits: ensure the last offsets are committed before the consumer closes or a rebalance happens

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

/**
 * Created by zhangpeiran on 2018/10/9.
 */
public class MyConsumer {

    public static void main(String[] args){
        Properties properties = new Properties();
        properties.put("bootstrap.servers","ip:9092");
        properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("group.id","DemoConsumerGroup");

        //Default is latest: when the partition has no committed offset, or the offset is invalid, the consumer starts from the newest records
        //A consumer group subscribing to a topic for the first time falls into this case, so data produced before the Consumer starts would not be read
        //Setting it to earliest makes the consumer read from the beginning of the partition
        properties.put("auto.offset.reset","earliest");

        //Disable auto-commit so that offsets are committed manually
        properties.put("enable.auto.commit","false");

        //Maximum number of records returned by a single poll()
        properties.put("max.poll.records",10);


        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);

        consumer.subscribe(Collections.singletonList("Demo3"));

        int count = 0;
        try {
            while (true){
                ConsumerRecords<String,String> records = consumer.poll(10);
                for(ConsumerRecord<String ,String> record : records){
                    count ++;
                    System.out.println(record.topic() + "," + record.partition() + "," + record.offset() + "," + record.key() + "," + record.value());
                }
                //Fast, non-blocking commit on the normal path; an occasional failure is
                //tolerated because the next commitAsync() (or the final commitSync()) supersedes it
                consumer.commitAsync();
            }
        } finally {
            try {
                //On shutdown, make one final synchronous commit: it retries until it
                //succeeds (or hits an unrecoverable error), so no processed offsets are lost
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }
    }
}
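The combined pattern above covers a clean shutdown, but the section title also mentions rebalances. To commit before partitions are taken away during a rebalance, the consumer can register a ConsumerRebalanceListener when subscribing. This sketch is not part of the original post and only illustrates the idea; consumer is the KafkaConsumer configured as in the examples above:

import java.util.Collection;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

consumer.subscribe(Collections.singletonList("Demo3"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        //Called before partitions are revoked: commit what has been processed
        //so far, so the next owner of these partitions resumes in the right place
        consumer.commitSync();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        //Nothing special to do when partitions are assigned
    }
});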

 


Original article: https://www.cnblogs.com/darange/p/9768791.html
