标签:artifact 面向 which pom arc 时间 dep declare user
哪些互联网大厂在使用RabbitMQ,为什么?初识RabbitMQ:
哪些互联网大厂在使用RabbitMQ:
为什么使用RabbitMQ:
什么是AMQP高级消息队列协议:
AMQP协议模型:
RabbitMQ整体架构图:
RabbitMQ消息流转图:
官方下载地址:
我们知道RabbitMQ是基于Erlang编写的,所以在安装RabbitMQ之前需要确保安装了Erlang环境。RabbitMQ与Erlang是有版本对应关系的,可以参考官方列举的版本对应关系:
例如,我这里要安装3.8.9版本的RabbitMQ,那么按官方的说明,我需要安装 22.3 ~ 23.x 版本的Erlang环境,我这里选择23.1.3版本的Erlang。使用如下命令下载RPM安装包:
[root@rabbitmq01 ~]# cd /usr/local/src
[root@rabbitmq01 /usr/local/src]# wget https://github.com/rabbitmq/erlang-rpm/releases/download/v23.1.3/erlang-23.1.3-1.el7.x86_64.rpm
[root@rabbitmq01 /usr/local/src]# wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.9/rabbitmq-server-3.8.9-1.el7.noarch.rpm
[root@rabbitmq01 /usr/local/src]# ls
erlang-23.1.3-1.el7.x86_64.rpm rabbitmq-server-3.8.9-1.el7.noarch.rpm
[root@rabbitmq01 /usr/local/src]#
使用yum
命令进行安装,因为yum
可自动解决依赖关系:
[root@rabbitmq01 /usr/local/src]# yum install -y erlang-23.1.3-1.el7.x86_64.rpm
[root@rabbitmq01 /usr/local/src]# yum install -y rabbitmq-server-3.8.9-1.el7.noarch.rpm
RabbitMQ新版本没有提供配置文件的示例,需要自己去Github上下载:
将配置文件放到/etc/rabbitmq
目录下:
[root@rabbitmq01 /usr/local/src]# mv rabbitmq.conf.example /etc/rabbitmq/rabbitmq.conf
修改配置文件:
[root@rabbitmq01 ~]# vim /etc/rabbitmq/rabbitmq.conf
# 允许默认用户被外部网络访问
loopback_users.guest = false
完成配置后,启动RabbitMQ Server:
[root@rabbitmq01 ~]# rabbitmq-server start &
检查端口是否正常监听,5672是RabbitMQ的默认端口号:
[root@rabbitmq01 ~]# netstat -lntp |grep 5672
tcp 0 0 0.0.0.0:25672 0.0.0.0:* LISTEN 1922/beam.smp
tcp6 0 0 :::5672 :::* LISTEN 1922/beam.smp
[root@rabbitmq01 ~]#
启用RabbitMQ的管控台插件,我们可以在管控台中查看RabbitMQ的基础监控信息,以及对RabbitMQ进行管理:
[root@rabbitmq01 ~]# rabbitmq-plugins enable rabbitmq_management
使用浏览器访问管控台的15672端口,进入到登录界面,默认用户名密码均为guest:
登录成功,进入到管控台首页:
rabbitmqctl
命令行操作rabbitmqctl
基础操作命令:
# 关闭应用
rabbitmqctl stop_app
# 启动应用
rabbitmqctl start_app
# 节点状态
rabbitmqctl status
# 添加用户
rabbitmqctl add user username password
# 列出所有用户
rabbitmqctl list users
# 删除用户
rabbitmqctl delete_user username
# 清除用户权限
rabbitmqctl clear_permissions -p vhostpath username
# 列出用户权限
rabbitmqctl list_user_permissions username
# 修改密码
rabbitmqctl change_password username newpassword
# 设置用户权限
rabbitmqctl set permissions -p vhostpath username ".*" ".*" ".*"
# 创建虚拟主机
rabbitmqctl add vhost vhostpath
# 列出所有虚拟主机
rabbitmqctl list_vhosts
# 列出虚拟主机上所有权限
rabbitmqctl list_permissions -p vhostpath
# 删除虚拟主机
rabbitmqctl delete_vhost vhostpath
# 查看所有队列信息
rabbitmqctl list_queues
# 清除队列里的消息
rabbitmqctl -p vhostpath purge_queue blue
rabbitmqctl
高级操作命令:
# 移除所有数据,要在rabbitmqctIl stop_app之后使用
rabbitmqctl reset
# 组成集群命令
rabbitmqctl join_cluster <clusternode> [--ram]
# 查看集群状态
rabbitmqctl cluster_status
# 修改集群节点的存储形式
rabbitmqctl change_cluster_node_type disc | ram
# 忘记节点(摘除节点)
rabbitmqctl forget cluster_node [--offline]
# 修改节点名称
rabbitmqctl rename_cluster_node oldnode1 newnode1 [oldnode2] [newnode2..]
创建一个Maven工程,在pom
文件中添加如下依赖:
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.16</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
生产者代码示例:
package com.zj.rabbitmq.learn.basic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.SneakyThrows;
/**
* 生产者
*
* @author 01
* @date 2020-11-23
**/
public class MyProducer {
@SneakyThrows
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.243.164");
factory.setPort(5672);
factory.setVirtualHost("/");
// 通过连接工厂创建连接
try (Connection connection = factory.newConnection();
// 通过连接创建一个Channel
Channel channel = connection.createChannel()) {
for (int i = 0; i < 5; i++) {
// 通过Channel发送数据
String msg = "Hello RabbitMQ!";
// 不设置Exchange默认走default direct exchange,此时routingKey就是队列名称
channel.basicPublish("", "test001", null, msg.getBytes());
}
}
}
}
消费者代码示例:
package com.zj.rabbitmq.learn.basic;
import com.rabbitmq.client.*;
import lombok.SneakyThrows;
/**
* 消费者
*
* @author 01
* @date 2020-11-23
**/
public class MyConsumer {
@SneakyThrows
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.243.164");
factory.setPort(5672);
factory.setVirtualHost("/");
// 通过连接工厂创建连接
try (Connection connection = factory.newConnection();
// 通过连接创建一个Channel
Channel channel = connection.createChannel()) {
// 声明一个队列,队列不存在会自动创建
channel.queueDeclare("test001", true, false, false, null);
// 创建消费者
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) {
String message = new String(body);
System.out.println("Received: " + message);
}
};
// 持续监听,消费消息
while (true){
channel.basicConsume("test001", true, consumer);
Thread.sleep(1000);
}
}
}
}
先运行消费者,再运行生产者,此时消费者控制台输出如下:
Exchange(交换机)用于接收消息,并根据路由键转发消息所绑定的队列:
交换机属性:
生产者代码示例:
package com.zj.rabbitmq.learn.exchange;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.SneakyThrows;
public class ProducerOfDirectExchange {
@SneakyThrows
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.243.164");
factory.setPort(5672);
factory.setVirtualHost("/");
String exchangeName = "test_direct_exchange";
String routingKey = "test.direct";
// 通过连接工厂创建连接
try (Connection connection = factory.newConnection();
// 通过连接创建一个Channel
Channel channel = connection.createChannel()) {
// 通过Channel发送数据
String msg = "Hello RabbitMQ of Direct Exchange!";
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
}
}
}
消费者代码示例:
package com.zj.rabbitmq.learn.exchange;
import com.rabbitmq.client.*;
import lombok.SneakyThrows;
public class ConsumerOfDirectExchange {
@SneakyThrows
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.243.164");
factory.setPort(5672);
factory.setVirtualHost("/");
// 通过连接工厂创建连接
try (Connection connection = factory.newConnection();
// 通过连接创建一个Channel
Channel channel = connection.createChannel()) {
String exchangeName = "test_direct_exchange";
String exchangeType = "direct";
String queueName = "test_direct_queue";
String routingKey = "test.direct";
// 声明一个direct类型的Exchange
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
// 声明一个队列,队列不存在会自动创建
channel.queueDeclare(queueName, true, false, false, null);
// 将队列绑定到指定的Exchange上
channel.queueBind(queueName, exchangeName, routingKey);
// 创建消费者
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) {
String message = new String(body);
System.out.println("Received: " + message);
}
};
// 持续监听,消费消息
while (true) {
channel.basicConsume(queueName, true, consumer);
Thread.sleep(1000);
}
}
}
}
生产者代码示例:
package com.zj.rabbitmq.learn.exchange;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.SneakyThrows;
public class ProducerOfTopicExchange {
@SneakyThrows
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.243.164");
factory.setPort(5672);
factory.setVirtualHost("/");
String exchangeName = "test_topic_exchange";
String routingKey1 = "user.save";
String routingKey2 = "user.update";
String routingKey3 = "user.delete.abc";
// 通过连接工厂创建连接
try (Connection connection = factory.newConnection();
// 通过连接创建一个Channel
Channel channel = connection.createChannel()) {
// 通过Channel发送数据
String msg = "Hello RabbitMQ of Topic Exchange!";
channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());
channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());
channel.basicPublish(exchangeName, routingKey3, null, msg.getBytes());
}
}
}
消费者代码示例:
package com.zj.rabbitmq.learn.exchange;
import com.rabbitmq.client.*;
import lombok.SneakyThrows;
public class ConsumerOfTopicExchange {
@SneakyThrows
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.243.164");
factory.setPort(5672);
factory.setVirtualHost("/");
// 通过连接工厂创建连接
try (Connection connection = factory.newConnection();
// 通过连接创建一个Channel
Channel channel = connection.createChannel()) {
String exchangeName = "test_topic_exchange";
String exchangeType = "topic";
String queueName = "test_topic_queue";
//String routingKey = "user.*";
String routingKey = "user.#";
// 声明一个topic类型的Exchange
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
// 声明一个队列,队列不存在会自动创建
channel.queueDeclare(queueName, true, false, false, null);
// 将队列绑定到指定的Exchange上
channel.queueBind(queueName, exchangeName, routingKey);
// 创建消费者
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) {
String message = new String(body);
System.out.println("Received: " + message);
}
};
// 持续监听,消费消息
while (true) {
channel.basicConsume(queueName, true, consumer);
Thread.sleep(1000);
}
}
}
}
生产者代码示例:
package com.zj.rabbitmq.learn.exchange;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.SneakyThrows;
public class ProducerOfFanoutExchange {
@SneakyThrows
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.243.164");
factory.setPort(5672);
factory.setVirtualHost("/");
String exchangeName = "test_fanout_exchange";
// 通过连接工厂创建连接
try (Connection connection = factory.newConnection();
// 通过连接创建一个Channel
Channel channel = connection.createChannel()) {
for (int i = 0; i < 10; i++) {
// 通过Channel发送数据
String msg = "Hello RabbitMQ of Fanout Exchange!";
channel.basicPublish(exchangeName, "", null, msg.getBytes());
}
}
}
}
消费者代码示例:
package com.zj.rabbitmq.learn.exchange;
import com.rabbitmq.client.*;
import lombok.SneakyThrows;
public class ConsumerOfFanoutExchange {
@SneakyThrows
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.243.164");
factory.setPort(5672);
factory.setVirtualHost("/");
// 通过连接工厂创建连接
try (Connection connection = factory.newConnection();
// 通过连接创建一个Channel
Channel channel = connection.createChannel()) {
String exchangeName = "test_fanout_exchange";
String exchangeType = "fanout";
String queueName = "test_fanout_queue";
// 不设置routingKey
String routingKey = "";
// 声明一个fanout类型的Exchange
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
// 声明一个队列,队列不存在会自动创建
channel.queueDeclare(queueName, true, false, false, null);
// 将队列绑定到指定的Exchange上
channel.queueBind(queueName, exchangeName, routingKey);
// 创建消费者
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) {
String message = new String(body);
System.out.println("Received: " + message);
}
};
// 持续监听,消费消息
while (true) {
channel.basicConsume(queueName, true, consumer);
Thread.sleep(1000);
}
}
}
}
Binding - 绑定:
Queue - 消息队列:
Message - 消息:
设置Message属性代码示例:
package com.zj.rabbitmq.learn.message;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.SneakyThrows;
import java.util.HashMap;
import java.util.Map;
class MyProducer {
@SneakyThrows
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.243.164");
factory.setPort(5672);
factory.setVirtualHost("/");
Map<String, Object> headers = new HashMap<>();
headers.put("a", "1");
headers.put("b", "2");
// 自定义Message的一些属性
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
// 持久化模式
.deliveryMode(2)
// 消息的编码格式
.contentEncoding("UTF-8")
// 消息过期时间
.expiration("15000")
// 设置消息的头
.headers(headers)
.build();
// 通过连接工厂创建连接
try (Connection connection = factory.newConnection();
// 通过连接创建一个Channel
Channel channel = connection.createChannel()) {
for (int i = 0; i < 5; i++) {
// 通过Channel发送数据
String msg = "Hello RabbitMQ!";
// 不设置Exchange默认走direct exchange,此时routingKey就是队列名称
channel.basicPublish("", "test001", properties, msg.getBytes());
}
}
}
}
Virtual host - 虚拟主机:
标签:artifact 面向 which pom arc 时间 dep declare user
原文地址:https://blog.51cto.com/zero01/2553646