码迷,mamicode.com
首页 > 编程语言 > 详细

SpringBoot Kafka 整合实例教程

时间:2018-10-31 22:38:21      阅读:819      评论:0      收藏:0      [点我收藏+]

标签:produce   sum   server   str   public   实例   日期   nta   project   

 

1、使用IDEA新建工程引导方式,创建消息生产工程 springboot-kafka-producer。

技术分享图片

工程POM文件代码如下:

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 3          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 4     <modelVersion>4.0.0</modelVersion>
 5 
 6     <groupId>com.miniooc</groupId>
 7     <artifactId>springboot-kafka-producer</artifactId>
 8     <version>1.0.0-SNAPSHOT</version>
 9     <packaging>jar</packaging>
10 
11     <name>springboot-kafka-producer</name>
12     <description>Demo project for Spring Boot</description>
13 
14     <parent>
15         <groupId>org.springframework.boot</groupId>
16         <artifactId>spring-boot-starter-parent</artifactId>
17         <version>2.0.3.RELEASE</version>
18         <relativePath/>
19     </parent>
20 
21     <properties>
22         <spring-cloud.version>Finchley.RELEASE</spring-cloud.version>
23     </properties>
24 
25     <dependencies>
26         <dependency>
27             <groupId>org.springframework.boot</groupId>
28             <artifactId>spring-boot-starter-web</artifactId>
29         </dependency>
30         <dependency>
31             <groupId>org.springframework.boot</groupId>
32             <artifactId>spring-boot-starter-actuator</artifactId>
33         </dependency>
34         <dependency>
35             <groupId>org.springframework.kafka</groupId>
36             <artifactId>spring-kafka</artifactId>
37         </dependency>
38         <dependency>
39             <groupId>org.springframework.boot</groupId>
40             <artifactId>spring-boot-starter-test</artifactId>
41             <scope>test</scope>
42         </dependency>
43 
44         <!-- Add the gson dependency -->
45         <dependency>
46             <groupId>com.google.code.gson</groupId>
47             <artifactId>gson</artifactId>
48             <version>2.8.5</version>
49         </dependency>
50         <!-- Add the lombok dependency -->
51         <dependency>
52             <groupId>org.projectlombok</groupId>
53             <artifactId>lombok</artifactId>
54             <version>1.16.22</version>
55             <scope>provided</scope>
56         </dependency>
57     </dependencies>
58 
59     <dependencyManagement>
60         <dependencies>
61             <dependency>
62                 <groupId>org.springframework.cloud</groupId>
63                 <artifactId>spring-cloud-dependencies</artifactId>
64                 <version>${spring-cloud.version}</version>
65                 <type>pom</type>
66                 <scope>import</scope>
67             </dependency>
68         </dependencies>
69     </dependencyManagement>
70 
71     <build>
72         <plugins>
73             <plugin>
74                 <groupId>org.springframework.boot</groupId>
75                 <artifactId>spring-boot-maven-plugin</artifactId>
76             </plugin>
77         </plugins>
78     </build>
79 
80 
81 </project>

注释部分为手动添加的 gson、lombok 依赖。

2、创建消息实体类

 1 package com.miniooc.kafka.message;
 2 
 3 import lombok.Data;
 4 
 5 import java.io.Serializable;
 6 import java.util.Date;
 7 import java.util.List;
 8 
 9 /**
10  * Order message payload: the producer serializes it to JSON and the consumer parses it back.
11  */
12 @Data
13 public class OrderBasic implements Serializable {
14 
15     /**
16      * Explicit serialization version, so the serialized form does not depend on
17      * the compiler-generated default.
18      */
19     private static final long serialVersionUID = 1L;
20 
21     /**
22      * Order ID.
23      */
24     private String orderId;
25     /**
26      * Order number.
27      */
28     private String orderNumber;
29     /**
30      * Order date.
31      */
32     private Date date;
33     /**
34      * Order information.
35      */
36     private List<String> desc;
37 
38 }

3、创建消息生产类

 1 /**
 2  *
 3  */
 4 package com.miniooc.kafka.producer;
 5 
 6 import com.google.gson.GsonBuilder;
 7 import com.miniooc.kafka.message.OrderBasic;
 8 import lombok.extern.java.Log;
 9 import org.springframework.beans.factory.annotation.Value;
10 import org.springframework.kafka.core.KafkaTemplate;
11 import org.springframework.stereotype.Component;
12 
13 import javax.annotation.Resource;
14 
15 /**
16  * Kafka消息生产类
17  */
18 @Log
19 @Component
20 public class KafkaProducer {
21 
22     @Resource
23     private KafkaTemplate<String, String> kafkaTemplate;
24 
25     @Value("${kafka.topic.order}")
26     private String topicOrder;
27 
28     /**
29      * 发送订单消息
30      *
31      * @param orderBasic 订单信息
32      */
33     public void sendOrderMessage(OrderBasic orderBasic) {
34         GsonBuilder builder = new GsonBuilder();
35         builder.setPrettyPrinting();
36         builder.setDateFormat("yyyy-MM-dd HH:mm:ss");
37         String message = builder.create().toJson(orderBasic);
38         kafkaTemplate.send(topicOrder, message);
39         log.info("\n" + message);
40     }
41 }

 4、编辑资源配置文件 application.properties

1 server.port=9526
2 spring.application.name=kafka-producer
3 kafka.bootstrap.servers=localhost:9092
4 kafka.topic.order=topic-order
5 kafka.group.id=group-order

5、启动 zookeeper

D:\kafka>bin\windows\zookeeper-server-start.bat config\zookeeper.properties

技术分享图片

6、启动 kafka

D:\kafka>bin\windows\kafka-server-start.bat config\server.properties

技术分享图片

7、运行工程,通过控制器调用消息生产类,创建一条消息到kafka

技术分享图片

看到红框内容,说明消息发送成功。

8、再使用IDEA新建工程引导方式,创建消息消费工程 springboot-kafka-consumer。

9、创建消息消费类,并监听topic。

 1 package com.miniooc.kafka.consumer;
 2 
 3 import com.google.gson.Gson;
 4 import com.google.gson.GsonBuilder;
 5 import com.google.gson.reflect.TypeToken;
 6 import com.miniooc.kafka.message.OrderBasic;
 7 import lombok.extern.java.Log;
 8 import org.springframework.kafka.annotation.KafkaListener;
 9 import org.springframework.messaging.handler.annotation.Payload;
10 import org.springframework.stereotype.Component;
11 
12 @Log
13 @Component
14 public class KafkaConsumer {
15 
16     @KafkaListener(topics = "${kafka.topic.order}", containerFactory = "kafkaListenerContainerFactory")
17     public void consume(@Payload String message) {
18         GsonBuilder builder = new GsonBuilder();
19         builder.setPrettyPrinting();
20         builder.setDateFormat("yyyy-MM-dd HH:mm:ss");
21         Gson gson = builder.create();
22         OrderBasic orderBasic = gson.fromJson(message, new TypeToken<OrderBasic>() {
23         }.getType());
24         String json = gson.toJson(orderBasic);
25         log.info("\n接受并消费消息\n" + json);
26     }
27 }

10、运行工程。

技术分享图片

看到红框内容,说明消息消费成功。

SpringBoot Kafka 整合完成!

SpringBoot Kafka 整合实例教程

标签:produce   sum   server   str   public   实例   日期   nta   project   

原文地址:https://www.cnblogs.com/songlu/p/9885892.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!