标签:java类 main err art rgs 简单 iterator override 做什么
前段时间在Kafka QQ群中有人问及此事——关于Java consumer如何动态修改topic订阅的问题。仔细一想才发现这的确是个好问题,因为如果简单地在另一个线程中直接持有consumer实例然后调用subscribe进行修改,consumer端必然会抛出异常ConcurrentModificationException:KafkaConsumer is not safe for multi-threaded access
和KafkaProducer不同的是,KafkaConsumer不是线程安全的,所以我们不能直接在没有同步保护的机制下直接启用另一个线程调用consumer的任何方法(除了wakeup)。因此,实现这个需求有两种途径:
如果是第一种方式,则无论哪个线程访问consumer都必须要配备必要的同步保护机制,代价相当大且极易出错。本文选取第二种方式,我们可以借助Java提供的ConcurrentLinkedQueue来帮助我们实现。具体的步骤为:
完整样例代码如下:
public class ConsumerTest { public static void main(String[] args) { final ConcurrentLinkedQueue<String> subscribedTopics = new ConcurrentLinkedQueue<>(); // 创建另一个测试线程,启动后首先暂停10秒然后变更topic订阅 Runnable runnable = new Runnable() { @Override public void run() { try { Thread.sleep(10000); } catch (InterruptedException e) { // swallow it. } // 变更为订阅topic: btopic, ctopic subscribedTopics.addAll(Arrays.asList("btopic", "ctopic")); } }; new Thread(runnable).start(); Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "my-group1"); props.put("auto.offset.reset", "earliest"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // 最开始的订阅列表:atopic、btopic consumer.subscribe(Arrays.asList("atopic", "btopic")); while (true) { consumer.poll(2000); //表示每2秒consumer就有机会去轮询一下订阅状态是否需要变更 // 本例不关注消息消费,因此每次只是打印订阅结果! System.out.println(consumer.subscription()); if (!subscribedTopics.isEmpty()) { Iterator<String> iter = subscribedTopics.iterator(); List<String> topics = new ArrayList<>(); while (iter.hasNext()) { topics.add(iter.next()); } subscribedTopics.clear(); consumer.subscribe(topics); // 重新订阅topic } } // 本例只是测试之用,使用了while(true),所以这里没有显式关闭consumer // consumer.close(); } }
输出如下:
[atopic, btopic]
[atopic, btopic]
[atopic, btopic]
[ctopic, btopic]
[ctopic, btopic]
由此可见,本consumer在没有关闭的情况下动态进行了topic的订阅变更。另外需要说一下,动态变更时最好不要直接调用subscribe(topics),而是要显式地定义ConsumerRebalanceListener以避免位移提交的混乱。
Kafka Java consumer动态修改topic订阅
标签:java类 main err art rgs 简单 iterator override 做什么
原文地址:http://www.cnblogs.com/huxi2b/p/7040617.html