Using Kafka in Spring Boot

Posted: 2019-02-15 22:26:25

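Having started a Kafka server from the Windows command line and created a topic, a producer, and a consumer, the next step is to use Kafka from Java.

Create a Spring Boot project with IDEA

Create a new Spring Boot project in IDEA, or download one from https://start.spring.io/.

Configure dependencies

The pom file is as follows: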

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.2.0.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.2</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

The main additions are the spring-kafka, lombok, and gson dependencies.

Configuration

#============== kafka ===================
# Kafka broker address(es); separate multiple addresses with commas
spring.kafka.bootstrap-servers=127.0.0.1:9092

#=============== producer  =======================

# Number of times a failed send is retried (0 disables retries)
spring.kafka.producer.retries=0
# Upper bound, in bytes, on the size of one batch of records (not a message count)
spring.kafka.producer.batch-size=16384
# Total memory, in bytes, the producer may use to buffer records waiting to be sent
spring.kafka.producer.buffer-memory=33554432

# Serializers for the message key and value
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer  =======================
# Default consumer group id
spring.kafka.consumer.group-id=test-consumer-group

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100

# Deserializers for the message key and value
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
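
The two producer sizes are given in bytes, which is easy to misread. As a rough illustration, the sketch below loads the same keys with java.util.Properties and prints them in larger units. The class name ConfigSizes and the inlined property text are assumptions made for this example; in the actual project Spring Boot reads application.properties itself.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper for illustration; not part of the original project.
final class ConfigSizes {

    // A copy of the address and size settings from the configuration above.
    static final String SAMPLE =
            "spring.kafka.bootstrap-servers=127.0.0.1:9092\n"
          + "spring.kafka.producer.batch-size=16384\n"
          + "spring.kafka.producer.buffer-memory=33554432\n";

    // Parses property text in the standard key=value format.
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return props;
    }

    // Reads a byte-valued property and converts it to KiB.
    static long kib(Properties props, String key) {
        return Long.parseLong(props.getProperty(key).trim()) / 1024;
    }

    public static void main(String[] args) {
        Properties props = load(SAMPLE);
        long batchKib = kib(props, "spring.kafka.producer.batch-size");
        long bufferMib = kib(props, "spring.kafka.producer.buffer-memory") / 1024;
        System.out.println("batch-size    = " + batchKib + " KiB");
        System.out.println("buffer-memory = " + bufferMib + " MiB");
    }
}
```

With the values above, the batch limit works out to 16 KiB and the buffer to 32 MiB.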

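Create the producer class

KafkaSender sends messages to the topic through KafkaTemplate.

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Date;
import java.util.UUID;

@Component
public class KafkaSender {

    private static final Logger log = LoggerFactory.getLogger(KafkaSender.class);

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    private final Gson gson = new GsonBuilder().create();

    // Builds a Message, serializes it to JSON, and sends it to the "test22" topic
    public void send() {
        Message message = new Message();
        message.setId(System.currentTimeMillis());
        message.setMsg(UUID.randomUUID().toString());
        message.setSendTime(new Date());
        String json = gson.toJson(message);
        log.info("+++++++++++++++++++++  message = {}", json);
        kafkaTemplate.send("test22", json);
    }
}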

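Create the consumer class

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

@Component
public class KafkaReceiver {
    private static final Logger log = LoggerFactory.getLogger(KafkaReceiver.class);

    // Logs every record with a non-null value received from the "test22" topic
    @KafkaListener(topics = {"test22"})
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            log.info("----------------- record = {}", record);
            log.info("------------------ message = {}", message);
        }
    }
}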
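
The Optional.ofNullable check in the listener guards against records whose value is null. The same guard can be tried without a broker using a stand-in: in the sketch below, describe is a hypothetical helper that takes a plain Object in place of record.value(), and the marker text it returns for a null value is an assumption of this example (the listener above simply skips such records).

```java
import java.util.Optional;

// Hypothetical stand-in for the listener's null check; not part of the original project.
final class NullGuardDemo {

    // Returns the text that would be logged, or a marker when the value is absent.
    static String describe(Object value) {
        return Optional.ofNullable(value)
                .map(v -> "message = " + v)
                .orElse("(null value, skipped)");
    }

    public static void main(String[] args) {
        System.out.println(describe("{\"id\":1}"));
        System.out.println(describe(null));
    }
}
```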

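The Message entity class

import lombok.Data;

import java.util.Date;

// Lombok's @Data generates the getters, setters, equals, hashCode, and toString
@Data
public class Message {
    private Long id;

    private String msg;

    private Date sendTime;
}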

The getter and setter methods are omitted here; Lombok's @Data annotation generates them at compile time.
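
For readers who have not used Lombok, the following is roughly what @Data contributes for the three fields. PlainMessage is a hypothetical name chosen so that it does not clash with Message, and the equals, hashCode, and toString methods that @Data also generates are left out to keep the sketch short.

```java
import java.util.Date;

// Hand-written counterpart of the Lombok-annotated Message class (illustrative only).
class PlainMessage {
    private Long id;
    private String msg;
    private Date sendTime;

    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }

    public String getMsg() { return msg; }
    public void setMsg(String msg) { this.msg = msg; }

    public Date getSendTime() { return sendTime; }
    public void setSendTime(Date sendTime) { this.sendTime = sendTime; }

    public static void main(String[] args) {
        PlainMessage m = new PlainMessage();
        m.setId(System.currentTimeMillis());
        m.setMsg("hello");
        m.setSendTime(new Date());
        System.out.println(m.getId() + " " + m.getMsg() + " " + m.getSendTime());
    }
}
```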

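Before starting the project, start the Kafka service first and check that its port matches the configuration. For Kafka itself, see the previous post, "Installing and Using Kafka on Windows" (《windows下安装并使用kafka》).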
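
A quick pre-flight check can confirm that something is listening on the configured port before the application starts. The sketch below is only an illustration: BrokerCheck and isReachable are hypothetical names, the host and port are copied from spring.kafka.bootstrap-servers, and a successful TCP connection shows only that the port is open, not that the listener is Kafka.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical pre-flight check; not part of the original project.
final class BrokerCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Host and port as set in spring.kafka.bootstrap-servers.
        boolean up = isReachable("127.0.0.1", 9092, 1000);
        System.out.println(up
                ? "Port 9092 is open"
                : "Nothing is listening on 127.0.0.1:9092");
    }
}
```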

 

Reference: https://blog.csdn.net/tzs_1041218129/article/details/78988439

 

Original article: https://www.cnblogs.com/gidybzc/p/10386215.html
