码迷,mamicode.com
首页 > 编程语言 > 详细

SpringBoot整合kafka的简单应用

时间:2021-06-02 14:22:43      阅读:0      评论:0      收藏:0      [点我收藏+]

标签:obj   备份   批量   mem   cto   mvn   enc   resource   temp   

 

 

引入依赖

        <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.7.1</version>
        </dependency>

 

配置文件yml

修改kafka连接地址 其他按需修改

#kafka的topic名称
kafkaTopic: topic-test

spring:
  kafka:
    bootstrap-servers: 192.168.1.12:9092 #kafka连接地址
    producer:
      acks: 1  #应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
      batch-size: 16384  #批量大小
      properties:
        linger.ms: 0   # 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
      buffer-memory: 33554432  #生产端缓冲区大小
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: defaultConsumerGroup  # 默认的消费组ID
      enable-auto-commit: true  # 是否自动提交offset
      ## 当kafka中没有初始offset或offset超出范围时将自动重置offset
      ## earliest:重置为分区中最小的offset;
      ## latest:重置为分区中最新的offset(消费分区中新产生的数据);
      ## none:只要有一个分区不存在已提交的offset,就抛出异常;
      auto-commit-interval:
        ms: 1000
      auto-offset-reset: latest
      properties:
        session.timeout.ms: 120000    # 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
        request.timeout.ms: 180000   # 消费请求超时时间
  flyway:
    connect-retries: 0  #重试次数

 

 

消费者:

KafkaConsumer.java

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * @author yvioo
 */
@Component
public class KafkaConsumer {

    /**
     * 消费监听
     * @param record
     */
    @KafkaListener(topics = "${kafkaTopic}")
    public void onMessage(ConsumerRecord<?, ?> record){
        System.out.println("收到消息:topic名称:"+record.topic()+",分区:"+record.partition()+",值:"+record.value());
    }
}

 

生产者

KafkaProducer.java

import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * @author zhipeih
 */
@RestController
public class KafkaProducer {

    @Value("${kafkaTopic}")
    private String kafkaTopic;

    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     *  发送消息
     * @param message
     */
    @GetMapping("/send")
    public void sendMessage1(String message) {
        kafkaTemplate.send(kafkaTopic, message);
    }


    /**
     * 有发送结果回调
     * @param message
     */
    @GetMapping("/send/callback")
    public void sendMessage3(String message) {
        kafkaTemplate.send(kafkaTopic, message).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable ex) {
                System.out.println("fail:"+ex.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> result) {
                System.out.println("success:topic名称:" + result.getRecordMetadata().topic() + ",分区:"
                        + result.getRecordMetadata().partition() + ",消息在分区中的标识:" + result.getRecordMetadata().offset());
            }
        });
    }
}

 

SpringBoot整合kafka的简单应用

标签:obj   备份   批量   mem   cto   mvn   enc   resource   temp   

原文地址:https://www.cnblogs.com/pxblog/p/14821853.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!