码迷,mamicode.com
首页 > 编程语言 > 详细

Kafka consumer在项目中的多线程处理方式

时间:2018-07-02 15:39:01      阅读:2865      评论:0      收藏:0      [点我收藏+]

标签:creat   src   执行   erro   比较   pre   key   订阅   分享   

对于KafkaConsumer而言,它不像KafkaProducer,不是线程安全的,状态是在consumer中维护的,所以实现时要注意多线程的使用,一般有2种使用方法:
 
1:每个Consumer有自己的线程,consumer去拉取数据,并对数据处理,这种方式比较简单,易于实现,容易顺序处理消息
2:消费者处理者方式,创建一个线程池,在consumer拉取数据后,由线程池来中的线程来处理数据,把拉取数据与处理数据解耦,但数据处理有可能破坏partition的消息顺序
 
从Kafka 文档中我们也可以查到有关consumer多线程的处理方式
技术分享图片

 

项目实践:

下图是项目中对consumer的具体应用,虽然也使用了线程池,但其实还是上述第一种方式,线程池在此只是用于启动consumer的运行:
 
技术分享图片

描述:

ConsumerGroup类:这对应于消费组,在上面第1步将创建一个监听对象,其将被传入到ConsumerGroup对象的创建过程中,在cg中将创建一个RunnableConsumer的对象列表(list),也就是上图第3步,列表中的consumer对象的数量将对应所期望的在Group组中consumer的数量。同时创建一个线程池对象executor,此处线程池的数量和consumer的数量一致
 

RunnableConsumer类:这是一个线程类,实现了Runnable接口,里面创建了一个KafkaConsumer对象,线程启动程序中执行对topic的订阅,并拉取消息

public class RunnableConsumer<K,V> implements Runnable {
    private Consumer<K,V> consumer;
    private final IConsumerListener<ConsumerRecords<K,V>> listener;

        
    private RunnableConsumer(final IConsumerListener<ConsumerRecords<K,V>> listener, Properties... props) {

        this.consumer = new KafkaConsumer<>(props, keyDeserClass, valueDeserClass);
        this.listener = listener;
    }
    
    public void run() {
        try {
            consumer.subscribe(topics);
            while (true) {
                ConsumerRecords<K,V> records = null;
                try {

                    //now handle any new record(s)
                    records = consumer.poll(1000);
                    if(records != null && records.count() > 0) {
                        listener.notify(records);
                    }
                } catch(WakeupException wex) {
                    LOGGER.trace("Got a WakeupException. Doing nothing. Exception Details:",wex);
                } 

            }
        } catch (Throwable e) {
              // ignore as we are waking up from the poll, we need to cleanly shutdown our consumer 
            LOGGER.error("getting non-recoverable throwable: ", e);
            throw e;
        } finally {
            //TODO: need to check on consumer closing that any outstanding offset commit is done.
            //otherwise we need to manually do it here.
            processCommit(SyncMode.SYNC);
            LOGGER.info("Trying to close Kafka consumer, ConsumerGroup.isRunning: {}", ConsumerGroup.this.isRunning);
            consumer.close();
        }
    }
}

 

Listener类:这是一个监听类,用于实际处理某一topic消息,先创建监听对象,在创建cg时,注册到RunnerConsumer类中,如果consumer拉取到消息,则将消息通知监听类去具体处理,不同的业务需要定义不同的业务监听类

 

修改为第二种方式

如果想使用第二种方式,将数据的处理从consumer中解耦出来,可以将上面的listener修改为一个线程类,在consumer中有拉取到消息,则从线程池中取出线程处理数据,这种方式的一个最大的问题,就是如何保证消息是按顺序处理的,例如,如果一个partition中先后有2条消息,当consumer poll到消息后,将提交到2个线程处理,这就无法保证顺序处理,需要额外的线程同步处理机制。同时因为不需要在consumer中对数据进行处理,consumer的性能也提高了,而且避免了数据处理超时,consumer Rebalance等潜在问题

records = consumer.poll(1000);
if(records != null && records.count() > 0) {
       executor.submit(new listener(records));
}

 

参考:

http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#multithreaded

https://howtoprogram.xyz/2016/05/29/create-multi-threaded-apache-kafka-consumer/

 

Kafka consumer在项目中的多线程处理方式

标签:creat   src   执行   erro   比较   pre   key   订阅   分享   

原文地址:https://www.cnblogs.com/benfly/p/9242294.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!