标签:方式 序列 data broker 使用 this 文章 高级 loading
在使用MQ之前,我们回顾一下前两篇博文的内容.
RocketMQ
的四个概念,分别是:Producer
,Consumer
,Message
和Broker
RocketMQ
和其后台系统在本篇博文中,我们会使用使用SpringBoot构建两个微服务,一个作为生产者,一个作为消费者,通过RocketMQ
传递消息,了解在Java
中使用RocketMQ的方法.
在灰皮书第一篇文章中,我画了下面这个图:
现在我们本地的RocketMQ
也部署起来了,接下来我们创建两个微服务通过MQ来收发消息,实现基本的流程.
首先我们创建两个基于SpringBoot
的微服务,分别是:
rocketmq-consumer
消息消费者rocketmq-producer
消息生产者两个服务里面,rocketmq-consumer
的端口号是2001,rocketmq-producer
的端口号是2002
分别在两个微服务写两个测试方法,启动测试:
rocketmq-consumer
@RestController
public class ConsumerController {
@GetMapping("/consumer")
public String index() {
return "rocketmq-consumer";
}
}
rocketmq-producer
@RestController
public class ProducerController {
@GetMapping("/producer")
public String index() {
return "rocketmq-producer";
}
}
启动测试,两个接口都成功访问.
根据我们最上面的图,服务A发送消息到服务B,在这里,我们用rocketmq-producer
来发送消息,消息发送到rocketmq
以后,由服务Brocketmq-consumer
消费消息.
使用rocketmq发送消息有很多种方式,因为我们使用的是SpringBoot
,这里直接使用官方提供的rocketmq-spring-boot-starter
包来开发
在github
上有个项目:RocketMQ-Spring
它就是RocketMq官方提供的整合了SpringBoot
的rocketmq工具包,git地址如下:https://github.com/apache/rocketmq-spring
当然,你也可以使用原生的rocketmq-client
包,在官方的示例中,使用的就是这种方式,具体可以查看官方文档,下面我们直接使用rocketmq-spring-boot-starter
来发送消息.
我们可以看到有很多的版本可以用:
这里我们使用2.0.3
这个版本吧,具体的官方细节可以查看https://github.com/apache/rocketmq-spring/blob/release-2.0.3/README_zh_CN.md
首先是pom坐标:
<!--add dependency in pom.xml-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>
然后再rocketmq-producer
的配置文件中配置rocketmq的name-server
和group
## application.properties
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=producer
rocketmq-spring-boot-starter
中提供了一个RocketMQTemplate
来方便我们发送消息,我们可以直接注入这个类来使用.
RocketMQTemplate
有send
方法和convertAndSend
方法,都可以用来发送消息,区别是,前者的方法入参是rocketmq
规定的Message
类型,而后者可以发送对象,并且帮我们转换,源码如下:
/**
* Send a message to the given destination.
* @param destination the target destination
* @param message the message to send
*/
void send(D destination, Message<?> message) throws MessagingException;
/**
* Convert the given Object to serialized form, possibly using a
* {@link org.springframework.messaging.converter.MessageConverter},
* wrap it as a message and send it to a default destination.
* @param payload the Object to use as payload
*/
void convertAndSend(Object payload) throws MessagingException;
下面我们直接发送消息到mq
@Autowired
private RocketMQTemplate rocketMQTemplate;
@GetMapping("/producer")
public String index() {
rocketMQTemplate.convertAndSend("test-topic", "消息发送成功!");
return "rocketmq-producer";
}
convertAndSend
方法有两个参数,第一个参数是消息要发送到的topic
,也就是目的地,第二个参数就是消息本身,至于topic到底是什么,这个我们后面详细来说,我们只需要知道,我们的消息发送到了rocketmq
的一个叫做test-topic的地方即可.
并且,由于我们在灰皮书第二章的时候,启动mq的时候,指定了autoCreateTopicEnable=true
,也就是说,我们使用RocketMQTemplate
发送的消息,就算topic之前不存在,rocket也会帮我们创建好.
编码完成,重启项目,我们只要访问http://localhost:2002/producer
就会发送消息到mq,我们可以通过rocketmq-console
查看我们发送的消息
可以看到mq自动为我们创建了topic:
在message页签,可以查看到我们刚才发送的消息:
详细的消息内容:
在上面的例子中,我们直接发送字符串到MQ,一般来说,我们发送的消息体是一个java对象,在这里也是可以的,我们改造一下代码:
@GetMapping("/producer")
public String index() {
rocketMQTemplate.convertAndSend("test-topic", new User("张三", 20));
return "rocketmq-producer";
}
@Data
class User implements Serializable {
private static final long serialVersionUID = -3486413003967431764L;
private String name;
private Integer age;
User() {}
User(String name, Integer age) {
this.name = name;
this.age = age;
}
}
这样我们发送了一个User对象到RocketMQ
中,我们再去rocketmq-console
查看:
可以看到,消息成功发送到了mq中,需要注意的是,这里我们发送的对象要实现Serializable
接口,不然会抛异常.
那么我们发送的消息的内容是怎么序列化的呢?
RocketMQ的消息体都是以
byte[]
方式存储的,如果内容体是java.lang.String
类型时,统一按照UTF-8
编码转成byte[]
;如果消息内容不是String类型的,则采用jackson-databind
序列化成JSON格式的字符串后,再统一按照UTF-8
编码转换成byte[]
以上释义源于RocketMQ
官方文档,所以说,有问题多看看官方文档能很大程度上解决我们的疑惑!
好了,我们的消息发送成功了,接下来我们在rocketmq-consumer
应用中消费之前发送出来的消息.
在开发之前我们先想一下: 消息的生产者随着用户的请求,不断的往MQ中发送消息,那么消费者在消费消息的时候,是怎么知道它要取哪一条消息呢?
我们之前的文章中提到过一个topic
,生产者在发送消息的时候,会指定一个topic,消息会发送到某个topic下,那么自然而然的,消费者在获取消息的时候,也是需要知道它要从哪个topic
里面去获取消息的.
而获取消息,则是通过监听器
来完成的.
首先在rocketmq-consumer
项目的配置文件中指定mq的nameServer
的地址:rocketmq.name-server=127.0.0.1:9876
创建一个监听器:
@Component
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer")
@Slf4j
public class Consumerlistener implements RocketMQListener<User> {
@Override
public void onMessage(User message) {
log.info("收到消息 : {}", message);
}
}
@RocketMQMessageListener
注解中我们指定了2个参数:
其次,我们自定义的监听器还要实现RocketMQListener<T>
接口,该接口的泛型类型就是我们生产者发送消息的消息类型,之前我们发送的是User
对象,因此这里也是User
对象
实现RocketMQListener
接口的onMessage
方法,方法的入参就是我们发送出来的消息,在这个方法中我们可以进行自己的业务处理.
启动服务rocketmq-consumer
,可以看到正常消费到了消息:
以上,我们成功的在我们的微服务中使用RocketMQ
进行了消息的发送和消费.
不仅仅是简单的消息,RocketMQ
还支持更高级的功能,比如事务消息
、消息轨迹
等,这些高级特效我们会下后面的进阶文章中详细讲解.
在本篇博文中,我们使用RocketMQ
官方提供的pom包进行了消息的发送和接收,也成功的在rocketmq-console
中查看到了消息.
在这个工程中,我们接触了很多新的概念:
以上这些概念,以及前面篇文章中遗留下来的概念,我们将在下一篇文章中详细介绍.
个人公众号<橙耘自留地>日前已经开通,后续博主发布的文章都会一并更新到公众号,如有需要,自行查阅.
关于橙耘自留地,是我个人聚焦互联网技术栈学习分享的一个平台,创立之初是因为目前业内各种技术课程资料层次不齐,褒贬不一,有时候一门课花费高价买入,其实内含草包,有时偶尔低价得之,却又大有干货.因此我会根据大家的意见和评价,选择不同的技术栈去学习,一为提升我自己的技术,二为大家梳理出质量比较好的课程,以作参考.同时,相关的学习心得也会一并更新到博客和公众号.
标签:方式 序列 data broker 使用 this 文章 高级 loading
原文地址:https://www.cnblogs.com/jianxiaochun/p/14412209.html