码迷,mamicode.com
首页 > 编程语言 > 详细

springboot kafka发送消息支持成功失败通知

时间:2020-06-25 12:04:39      阅读:143      评论:0      收藏:0      [点我收藏+]

标签:tms   cti   list   gen   cep   llb   span   get   public   

springboot集成kafka是比较简单的是事情,但是kafka发送消息的失败回调在日常工作中,如果不容忍消息丢失的话,发送失败需要再次发送或者放到数据库中用任务重推。
以下是演示用的发送类代码

@Slf4j
@Component
public class TestRunner implements ApplicationRunner {
    @Autowired
    KafkaTemplate kafkaTemplate;
    @Override
    public void run(ApplicationArguments args) throws Exception {
        KafkaMsgEntity kafkaMsgEntity = new KafkaMsgEntity();
        kafkaMsgEntity.setActionName("login");
        String tmpStr = "id:%d,msg:login";
        for (int i = 1; i < 500; i++) {
            String tmpStr1 = tmpStr.replace("%d", String.valueOf(i));
            Thread.sleep(500);
            kafkaMsgEntity.setMsgBody(tmpStr1);
            kafkaTemplate.send("test", JSON.toJSONString(kafkaMsgEntity)).addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
                @Override
                public void onFailure(Throwable throwable) {
                    if (throwable instanceof KafkaProducerException) {
                        String value = (String) ((KafkaProducerException) throwable).getProducerRecord().value();
                        log.info("{} get throwable msg:{}", value, throwable.getMessage());
                    } else {
                        log.info("get throwable msg:{}", throwable.getMessage());
                    }
                }

                @Override
                public void onSuccess(SendResult<String, String> o) {
                    log.info("{}, success", o.getProducerRecord().value());
                }
            });
        }
    }
}

在kafka运行过程中kill进程达到异常发送的条件。

 

技术图片

 

springboot kafka发送消息支持成功失败通知

标签:tms   cti   list   gen   cep   llb   span   get   public   

原文地址:https://www.cnblogs.com/gavinjunftd/p/13191166.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!