标签:注册 pen 解释 工具类 意义 throw 登录 一对一 depend
真的想尽快学完种种框架,综合搭建起一个项目来,然后看着他出Bug、慢慢地自己去优化,重构,再完善。
笔者经常能看到MQ这个词,知道其作为消息队列,但始终没有接触过,现在刚好有个机会(不知道在抢答系统中能不能用上),首先当然要知道MQ有什么作用:
Docker快速安装,想不到之前学Docker为了简化环境搭建,现在这么快就能体验上了
# 安装带有标签的版本,开启了插件有web管理页面
docker pull rabbitmq:management
# 运行rabbit,默认账号密码为 guest
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management
# 也可以改变环境变量来改变初始账号密码
docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:management
RabbitMQ的端口是:5672,其插件manage的端口为:15672
还有手动安装的小伙伴其配置文件在: /etc/RabbitMQ/rabbit.conf(有些需要手动创建)
RabbitMQ提供了插件功能,上面的manager插件也就是Web管理页面给我们提供了Web页面管理MQ的途径。进去首先改密码、创建新的账户、创建新的虚拟主机(库)、将新账号分配新虚拟机等(这些名词后面会有解释)
那么就打开Web管理页面
// 打开浏览器输入,就会看见登录页面
// 默认账号密码都是guest
http://localhost:15672
进去先不要慌,点击上方的Admin标签,尝试改密码(主要为了熟悉界面,可以直接跳过)
这里提前解释一些后面会遇到的名词,方便大家构建对MQ的理解。
非常强烈建议去官网看Docs,其文档内容不多,有各语言的实操代码与解释。笔者就是看相关文档,加上自己实操与理解写下的笔记,以下内容均来自官网,笔者做了部分修改来契合自己的书写习惯,下面就以发送一条语句为例说明
首先需要导包:使用普通maven工程或Springboot工程都可,笔者就按照官网的硬编码方式走一遍
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
队列的名字就叫 Hello World ,是一对一模型,中间不需要交换机
public class Send {
// name the queue
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) {
// then we can create a connection to the server
// 根据方法名就知道各种参数是什么意思了,主要用于建立连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/hello");
factory.setUsername("guest");
factory.setPassword("guest");
// 使用try,resouce方式关闭连接
try (Connection conn = factory.newConnection();
Channel channel = conn.createChannel()){
// 声明消息队列,各参数为:队列名字,持久化与否,连接是否独占队列,是否消费完自动删除,最后一个不管
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 要发送的消息
String message = "Hello World!";
// 发布消息:交换机,队列名,传递消息额外设置,消息内容需要字节
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println(" [x] Sent ‘" + message + "‘");
} catch (TimeoutException | IOException e) {
e.printStackTrace();
}
}
}
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
// then we can create a connection to the server
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/test");
factory.setUsername("guest");
factory.setPassword("guest");
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 消费消息:队列名,是否开启自动确认机制,回调接口
channel.basicConsume("hello",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 最后一个参数,即消息队列中取出的消息
System.out.println(" [x] Received ‘" + new String(body) + "‘");
}
});
// channel.close();
// conn.close();
}
}
创建工厂,获取连接,通道存在各业务中属于冗余代码,所以将其封装成一个工具类,方便后面使用,以及简化后面的逻辑,聚集在模型理解
public class RabbitMQUtil {
private static ConnectionFactory factory;
static {
factory = new ConnectionFactory();
factory.setHost("47.56.143.47");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
}
public static Connection get() {
try {
return factory.newConnection();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
public static void close(Channel channel, Connection conn) {
try {
if(channel != null){
channel.close();
}
if(conn != null){
conn.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
与Hello World相比,这种队列是增加了消费者,应该容易理解
public class NewTask {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] args) throws IOException {
Connection conn = RabbitMQUtil.get();
Channel channel = conn.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
String message = "Word Queue!";
for (int i = 0; i < 100; i++) {
channel.basicPublish("",TASK_QUEUE_NAME,null,(message + ": " + i).getBytes());
}
System.out.println(" [x] Sent ‘" + message + "‘");
RabbitMQUtil.close(channel,conn);
}
}
// Worker2代码一样,不重复写了
public class Worker1 {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] args) throws IOException {
Connection conn = RabbitMQUtil.get();
Channel channel = conn.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
channel.basicConsume(TASK_QUEUE_NAME, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(" [x] Received ‘" + new String(body) + "‘");
}
});
}
}
// 仅接收一次未确认的消息
channel.basicQos(1);
// 队列持久化
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
// 消息消费完后手动确认
channel.basicAck(envelope.getDeliveryTag(),false);
该模型中添加了交换机X,与以往不同。RabbitMQ消息传递模型中的核心思想是生产者从不将任何消息直接发送到队列。实际上,生产者经常甚至根本不知道是否将消息传递到任何队列
相反,生产者只能将消息发送到交换机。交流是一件非常简单的事情。一方面,它接收来自生产者的消息,另一方面,将它们推入队列。交易所必须确切知道如何处理收到的消息。是否应将其附加到特定队列?是否应该将其附加到许多队列中?还是应该丢弃它。规则由交换类型定义 。
We‘ll focus on the last one -- the fanout,下面我们将主要讨论扇出这个模型
扇出类似于广播
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException {
Connection conn = RabbitMQUtil.get();
Channel channel = conn.createChannel();
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
String message = "Fanout!";
channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
System.out.println(" [x] Sent ‘" + message + "‘");
RabbitMQUtil.close(channel,conn);
}
}
public class ReceiveLogs1 {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException {
Connection conn = RabbitMQUtil.get();
Channel channel = conn.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
// 获取随机队列
String queueName = channel.queueDeclare().getQueue();
// 第三个参数是路由key,广播中无意义
channel.queueBind(queueName,EXCHANGE_NAME,"");
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
}
});
}
}
笔者理解为是通过设置路由关键字使消息定向到不同的队列
public class EmitLogDirect {
// 定义路由键
private static final String EXCHANGE_NAME = "direct_logs";
private static final String ROUTING_KEY = "info";
public static void main(String[] args) throws IOException {
Connection conn = RabbitMQUtil.get();
Channel channel = conn.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
String message = "Routing!";
channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,null,message.getBytes());
System.out.println(" [x] Sent ‘" + ROUTING_KEY + "‘:‘" + message + "‘");
RabbitMQUtil.close(channel,conn);
}
}
public class ReceiveLogsDirect1 {
private static final String EXCHANGE_NAME = "direct_logs";
private static final String ROUTING_KEY = "info";
public static void main(String[] args) throws IOException {
Connection conn = RabbitMQUtil.get();
Channel channel = conn.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName,EXCHANGE_NAME,ROUTING_KEY);
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
}
});
}
}
主题的路由键它必须是单词列表,以点分隔。功能类似于动态路由,其中 * 匹配一个单词, # 匹配0或多个,eg:quick.orange.rabbit
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
private static final String ROUNTING_KEY = "lazy.origin";
public static void main(String[] args) throws IOException {
Connection conn = RabbitMQUtil.get();
Channel channel = conn.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String message = "Topic!";
channel.basicPublish(EXCHANGE_NAME,ROUNTING_KEY,null,message.getBytes());
System.out.println(" [x] Sent ‘" + ROUNTING_KEY + "‘:‘" + message + "‘");
RabbitMQUtil.close(channel,conn);
}
}
public class ReceiveLogsTopic1 {
private static final String EXCHANGE_NAME = "topic_logs";
private static final String ROUNTING_KEY = "lazy.*";
public static void main(String[] args) throws IOException {
Connection conn = RabbitMQUtil.get();
Channel channel = conn.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME,ROUNTING_KEY);
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
}
});
}
}
订阅
,对自己感兴趣的事接收消息,而这个订阅
功能用路由键来实现后面还有RPC以及新出的Publisher Confirms模型,这里简单给出RPC模型,因为笔者暂时使用不到这些模型,后期需要用到再来补坑
参考
https://www.rabbitmq.com/getstarted.html
标签:注册 pen 解释 工具类 意义 throw 登录 一对一 depend
原文地址:https://www.cnblogs.com/Howlet/p/12784391.html