标签:one boot 完成 row 恢复 做了 resume while tom
kafka启动后开始消费的话, 如果此时消费流程中有其他依赖没有启动完成的话,比如 Redis , Mysql ,RPC等就会有空指针或其他问题。这时候就要延后kafka的消费时机。
kafka启动时,不在启动时开启消费线程。
public class KafkaConsumer {
private KafkaConsumer<String, byte[]> consumer;
private ThreadPoolExecutor threadPoolExecutor;
private ThreadPoolExecutor threadConsumerExecutor;
public void init(String kafkaTopic, String groupIdConfig) {
//init pool
//init consumer
。。。。。
}
//开始消费单独拿出方法
public void startConsumer(final Handler handler) {
threadConsumerExecutor.submit(new Runnable() {
@Override
public void run() {
while(true){
ConsumerRecords<String, byte[]> data = consumer.poll(200);
.....
handler.onMessage(data)
sleep(100)
}
}
});
}
然后在SpringBoot启动后,再启用kafka的消费线程
@Component
public class ApplicationRunnerImpl implements ApplicationRunner {
@Override
public void run(ApplicationArguments args) throws Exception {
//校验Redis
//校验Mysql
//校验RPC
kafkaConsumer.start()
}
}
此时启用kafkaConsumer,就不会有问题了。
可以设置标志位,来灵活做到启/停/暂停/恢复
public class KafkaConsumer {
//标识位,是否开始消费线程
private AtomicBoolean polling = new AtomicBoolean(true);
private KafkaConsumer<String, byte[]> consumer;
private ThreadPoolExecutor threadPoolExecutor;
private ThreadPoolExecutor threadConsumerExecutor;
private Handler handler;
public void init(String kafkaTopic, String groupIdConfig) {
//init pool
//init consumer
。。。。。
startConsumer()
}
//开始消费单独拿出方法
public void startConsumer() {
threadConsumerExecutor.submit(new Runnable() {
@Override
public void run() {
while(this.polling.get()){
ConsumerRecords<String, byte[]> data = consumer.poll(200);
.....
handler.onMessage(data)
sleep(100)
}
}
});
}
//暂停消费线程
public void pause(){
this.polling.set(false)
}
//恢复消费线程
public void resume(){
this.polling.set(true)
}
//关闭消费
public void shutDown(){
pause();
unsubscribe();
threadConsumerExecutor.shutdown();
...
}
代码做了些简略处理。明白思路就好做,然后就可以针对业务场景细化一下了。
标签:one boot 完成 row 恢复 做了 resume while tom
原文地址:https://www.cnblogs.com/ElEGenT/p/12516578.html