标签:
在了解RabbitMQ之前,首先要了解AMQP协议。AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
当前各种应用大量使用异步消息模型,并随之产生众多消息中间件产品及协议,标准的不一致使应用与中间件之间的耦合限制产品的选择,并增加维护成本。AMQP是一个提供统一消息服务的应用层标准协议,基于此协议的客户端与消息中间件可传递消息,并不受不同客户端/中间件产品,不同开发语言等条件的限制。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
AMQP的实现有:
OpenAMQ
AMQP的开源实现,用C语言编写,运行于Linux、AIX、Solaris、Windows、OpenVMS
Apache Qpid
Apache的开源项目,支持C++、Ruby、Java、JMS、Python和.NET
Redhat Enterprise MRG
实现了AMQP的最新版本0-10,提供了丰富的特征集,比如完全管理、联合、Active-Active集群,有Web控制台,还有许多企业级特征,客户端支持C++、Ruby、Java、JMS、Python和.NET
RabbitMQ
一个独立的开源实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。RabbitMQ发布在Ubuntu、FreeBSD平台
AMQP Infrastructure
Linux下,包括Broker、管理工具、Agent和客户端
?MQ
一个高性能的消息平台,在分布式消息网络可作为兼容AMQP的Broker节点,绑定了多种语言,包括Python、C、C++、Lisp、Ruby等
Zyre
是一个Broker,实现了RestMS协议和AMQP协议,提供了RESTful HTTP访问网络AMQP的能力
以上是AMQP中的核心概念:
Broker
消息服务器的实体
虚拟主机(Virtual Host)
一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。客户端应用程序在登录到服务器之后,可以选择一个虚拟主机。每个连接(包括所有channel)都必须关联至一个虚拟主机
交换器(Exchange)
服务器中的实体,用来接收生产者发送的消息并将这些消息路由给服务器中的队列
消息队列(Message Queue)
服务器中的实体,用来保存消息直到发送给消费者
生产者(Producer)
一个向交换器发布消息的客户端应用程序
消费者(Consumer)
一个从消息队列中请求消息的客户端应用程序
绑定器(Binding)
将交换器和队列连接起来,并且封装消息的路由信息
所有这些组件的属性各不相同,但是只有交换器和队列被命名。客户端可以通过交换器的名字来发送消息,也可以通过队列的名字收取信息。因为AMQ 协议没有一个通用的标准方法来获得所有组件的名称,所以客户端对队列和交换器的访问被限制在仅能使用熟知的或者只有自己知道的名字。
绑定器没有名字,它们的生命期依赖于所紧密连接的交换器和队列。如果这两者任意一个被删除掉,那么绑定器便失效了。这就说明,若要知道交换器和队列的名字,还需要设置消息路由。
消息是一个不透明的数据包,这些包有如下性质:
元数据,例如内容的编码或者表明来源的字段
标志位,标记消息投递时候的一些保障机制
一个特殊的字段叫做routing key
发送消息是一个非常简单的过程。客户端声明一个它想要发送消息的目的交换器,然后将消息传递给交换器。
接受消息的最简单办法是设置一个订阅。客户端需要声明一个队列,并且使用一个绑定器将之前的交换器和队列绑定起来,这样的话,订阅就设置完毕。
交换器的类型:
fanout交换器
不会解释任何东西:它只是将消息投递到所有绑定到它的队列中
direct交换器
将消息根据其routing-key属性投递到包含对应key属性的绑定器上
topic交换器
模式匹配分析消息的routing-key属性。它将routing-key和binding-key的字符串切分成单词。这些单词之间用点隔开。它同样也会识别两个通配符:#匹配0个或者多个单词,*匹配一个单词。例如,binding key *.stock.#匹配routing-key usd.stcok和eur.stock.db,但是不匹配stock.nasdaq
header交换器
根据应用程序消息的特定属性进行匹配
failover和system交换器
当前RabbitMQ版本中均未实现
没有绑定器,哪怕是最简单的消息,交换器也不能将其投递到队列中,只能抛弃它。通过订阅一个队列,消费者能够从队列中获取消息,然后在使用过后将其从队列中删除。
不同于队列的是,交换器有相应的类型,表明它们的投递方式(通常是在和绑定器协作的时候)。因为交换器是命名实体,所以声明一个已经存在的交换器, 但是试图赋予不同类型是会导致错误。客户端需要删除这个已经存在的交换器,然后重新声明并且赋予新的类型。
交换器也有一些性质:
持久性:如果启用,交换器将会在Broker重启前都有效
自动删除:如果启用,那么交换器将会在其绑定的队列都被删除掉之后自动删除掉自身
惰性:如果没有声明交换器,那么在执行到使用的时候会导致异常,并不会主动声明
AMQP Broker都会对其支持的每种交换器类型(为每一个虚拟主机)声明一个实例。这些交换器的命名规则是amq.前缀加上类型名。例如 amq.fanout。空的交换器名称等于amq.direct。对这个默认的direct交换器(也仅仅是对这个交换器),Broker将会声明一个绑定了系统中所有队列的绑定器。
这个特点告诉我们,在系统中,任意队列都可以和默认的direct交换器绑定在一起,只要其routing-key等于队列名字。
默认绑定器的行为揭示了多绑定器的存在,将一个或者多个队列和一个或者多个交换器绑定起来。这使得可以将发送到不同交换器的具有不同routing key(或者其他属性)的消息发送到同一个队列中。
队列也有以下属性,这些属性和交换器所具有的属性类似。
持久性:如果启用,队列将会在Broker重启前都有效
自动删除:如果启用,那么队列将会在所有的消费者停止使用之后自动删除掉自身
惰性:如果没有声明队列,那么在执行到使用的时候会导致异常,并不会主动声明
排他性:如果启用,队列只能被声明它的消费者使用
这些性质可以用来创建例如排他和自删除的transient或者私有队列。这种队列将会在所有链接到它的客户端断开连接之后被自动删除掉 – 它们只是短暂地连接到Broker,但是可以用于实现例如RPC或者在AMQ上的对等通信。
AMQP上的RPC是这样的:RPC客户端声明一个回复队列,唯一命名(例如用UUID19), 并且是自删除和排他的。然后它发送请求给一些交换器,在消息的reply-to字段中包含了之前声明的回复队列的名字。RPC服务器将会回答这些请求,使用消息的reply-to作为routing key(之前提到过默认绑定器会绑定所有的队列到默认交换器)发送到默认交换器。注意仅仅是惯例而已。根据和RPC服务器的约定,它可以解释消息的任何属性(甚至数据体)来决定回复给谁。
队列也可以是持久的,可共享,非自动删除以及非排他的。使用同一个队列的多个用户接收到的并不是发送到这个队列的消息的一份拷贝,而是这些用户共享这队列中的一份数据,然后在使用完之后删除掉。
RabbitMQ是一个遵循AMQP协议的消息中间件,它从生产者接收消息并递送给消费者,在这个过程中,根据规则进行路由,缓存与持久化。
几个概念说明(完全遵循AMQP中的概念):
Broker:简单来说就是消息队列服务器实体
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列
Queue:消息队列载体,每个消息都会被投入到一个或多个队列
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来
Routing Key:路由关键字,exchange根据这个关键字进行消息投递
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离
producer:消息生产者,就是投递消息的程序
consumer:消息消费者,就是接受消息的程序
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务
消息队列的使用过程大概如下:
客户端连接到消息队列服务器,打开一个channel
客户端声明一个exchange,并设置相关属性
客户端声明一个queue,并设置相关属性
客户端使用routing key,在exchange和queue之间建立好绑定关系
客户端投递消息到exchange
exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。
exchange也有几个类型,完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。对key进行模式匹配后进行投递的叫做Topic交换机,符号”#”匹配一个或多个词,符号”*”匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。还有一种不需要key的,叫做Fanout交换机,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。
RabbitMQ支持消息的持久化,消息队列持久化包括3个部分:
exchange持久化,在声明时指定durable为true
queue持久化,在声明时指定durable为true
消息持久化,在投递时指定delivery_mode 为2(1是非持久化)
如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的。如果exchange和queue两者之间有一个持久化,一个非持久化,就不允许建立绑定。
RabbitMQ的特性:
可靠性:包括消息持久化,消费者和生产者的消息确认
灵活路由:遵循AMQP协议,支持多种Exchange类型实现不同路由策略
分布式:集群的支持,包括本地网络与远程网络
高可用性:支持主从备份与镜像队列
多语言支持:支持多语言的客户端
WEB界面管理:可以管理用户权限,exhange,queue,binding,与实时监控
访问控制:基于vhosts实现访问控制
调试追踪:支持tracing,方便调试
因为RabbitMQ由ERLANG实现,安装RabbitMQ之前要先安装ERLANG
安装包:otp_src_R15B03-1.tar.gz ERLANG安装包
rabbitmq-server-generic-unix-3.0.0.tar.gz RabbitMQ服务端
rabbitmq-java-client-bin-3.0.0.tar.gz RabbitMQ客户端,包含性能测试脚本
以下是上述版本为例的安装步骤,后续章节描述的内容都对应此版本ERLANG的安装步骤:
tar -zxf otp_src_R15B03-1.tar.gzz cd otp_src_R15B03 ./configure make make install
RabbitMQ客户端与服务端的安装直接解压安装包即可,客户端的目录中,rabbitmq-client.jar为JAVA版的客户端,编写客户端程序时需要引用,脚本文件为性能测试脚本
$RABBIT_MQ_HOME/sbin目录中的文件说明及命令使用请参考http://www.rabbitmq.com/manpages.html
RabbitMQ的启停:
rabbitmq-server启动服务,如要以后台方式启动,增加-detached参数
rabbitmqctl stop停止服务
rabbitmq-plugins enable rabbitmq_management打开WEB管理界面插件,默认访问地址:
http://服务器IP:15672
通过配置环境变量或者配置文件,修改诸如端口,绑定IP,broker的名称等,参考配置管理章节
例如:
修改$RABBIT_MQ_HOME/sbin/rabbitmq-env文件,增加配置:
HOSTNAME=broker_138 如果是集群,每台机器的名称要不同
RABBITMQ_NODE_IP_ADDRESS=192.168.100.138 绑定机器IP
RabbitMQ集群的运行需要集群中的所有节点共享erlang.cookie,以其中一台RabbitMQ中用户目录下~/.erlang.cookie文件为准,复制文件内容,将所有节点的erlang.cookie文件都修改为此值。
先启动所有节点的RabbitMQ,然后依次在每台RabbitMQ中执行命令:
./rabbitmqctl stop_app ./rabbitmqctl join_cluster rabbit@broker_138 ./rabbitmqctl start_app
rabbit@broker_138为其中一台RabbitMQ的实例名称,所有RabbitMQ节点都添加同一节点即可。
一个简单的示例,P是生产者,C是消费者。P发送消息到队列,C从队列取消息。代码如下:
生产者:
首先建立连接,在连接上建立channel,通常一个连接会建立多个channel,可以提高消息的发送速度。这里只建立了一个连接,建立多个channel时,客户端可使用多线程,每个线程里使用一个channel
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //然后声明队列,如果队列没有预先创建,会创建队列。消息以字节码的形式发送,所以在客户端可以使用任何编码格式。 channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent ‘" + message + "‘"); //最后别忘了关闭channel和connection channel.close(); connection.close();
消费者:
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received ‘" + message + "‘"); }
一个队列可以有多个消费者,队列发送消息采用Round-robin方式,即顺序发送给每个消费者,在单位时间内,每个消费者收到的消息数量是相同的。
以上是假设每个消费者处理消息的速度是一样的,如果每个消费者处理消息的速度不同,那么Round-robin方式的效率就不高了,这时可以设置prefetch参数。prefetch的值表示在消费者为返回上一条消息的确认信息时,队列最多发送给此消费者的消息数目。如果消息数目达到prefetch值,队列就停止发送消息给这个消费者,并随之发送给不忙的消费者。prefetch通过以下代码设置:
channel.basicQos(prefetchCount);
在上一个示例中,队列保证每条消息发送给其中一个消费者,即每个消息只被处理一次。在实际应用中,经常会有这样的需求,每条消息要同时发送给多个消费者或者更复杂的情况。也就是说消息需要根据一定的规则发送给不同的消费者。
为实现消息路由,需要引入Exchange,图中用X表示。生产者不再直接发送消息给队列,而是先发送到Exchange。然后Exchange与队列绑定。这样消息会根据不同规则发送给不同队列,最终到达不同的消费者。
实现代码如下:
生产者:
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent ‘" + message + "‘"); channel.close(); connection.close();
消费者:
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received ‘" + message + "‘"); }
很多时候,队列是非持久并且是自动删除的,这时队列名称也就不重要了,可以通过以下代码,由服务器自动生成队列名称。自动生成的队列以amq开头
String queueName = channel.queueDeclare().getQueue();
Exchange的类型不同,消息的路由规则也不同,Exchange的类型介绍请参考RabbitMQ简介。以下是以direct类型的Exchange为例的生产者代码实现, 最重要的两步就是声明Exhange类型与发送时指定routeKey
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String severity = getSeverity(argv); //返回info,error,warning作为routeKey String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes()); System.out.println(" [x] Sent ‘" + severity + "‘:‘" + message + "‘"); channel.close(); connection.close();
以下是以topic类型的Exchange为例的生产者代码实现:
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String routingKey = getRouting(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes()); System.out.println(" [x] Sent ‘" + routingKey + "‘:‘" + message + "‘"); connection.close();
一条以quick.orange.rabbit为routeKey的消息,Q1和Q2都会收到,lazy.orange.elephant也是。quick.orange.fox只能发送到Q1,lazy.brown.fox只能到Q2. lazy.pink.rabbit虽然符合两个匹配规则,但只发送到Q2,因为先匹配的lasy.#规则。quick.brown.fox则Q1和Q2都收不到,会被直接丢弃。
以上示例都是异步的,即生产者不需要等待消费者的反馈。在实际情况中,有些时候在消息处理比较快,且需要及时反馈时,则需要同步的方式,生产者发送消息,在收到消费者的反馈前一直处于阻塞状态。因为等待的返回来自远程主机,这种方式也被称为RPC(Remote procedure call)。RPC的实现有很多,比如JAVA平台下的RMI,JMX。
以下是在RabbitMQ中的实现:
RPCClient:
private Connection connection; private Channel channel; private String requestQueueName = "rpc_queue"; private String replyQueueName; private QueueingConsumer consumer; public RPCClient() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); replyQueueName = channel.queueDeclare().getQueue(); consumer = new QueueingConsumer(channel); channel.basicConsume(replyQueueName, true, consumer); } public String call(String message) throws Exception { String response = null; String corrId = java.util.UUID.randomUUID().toString(); BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build(); channel.basicPublish("", requestQueueName, props, message.getBytes()); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); if (delivery.getProperties().getCorrelationId().equals(corrId)) { response = new String(delivery.getBody()); break; } } return response; } public void close() throws Exception { connection.close(); }
RPCServer:
private static final String RPC_QUEUE_NAME = "rpc_queue"; ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(RPC_QUEUE_NAME, false, consumer); System.out.println(" [x] Awaiting RPC requests"); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); BasicProperties props = delivery.getProperties(); BasicProperties replyProps = new BasicProperties.Builder().correlationId(props.getCorrelationId()).build(); String message = new String(delivery.getBody()); int n = Integer.parseInt(message); System.out.println(" [.] fib(" + message + ")"); String response = "" + fib(n); channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes()); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }
工作流程如下:
客户端启动,建立发送队列与反馈队列
当RPC客户端发送消息时,设置replyTo和correlationId参数。replyTo参数为反馈队列的名称,correlationId作为一次请求的唯一标识,要每次请求都不同,用于关联服务端的反馈消息
请求发送到rpc_queue
服务端等待请求,当收到请求后,处理请求,并将反馈通过replyTo指定的反馈队列发送回去
客户端收到反馈,并校验correlationId的值是否与发送的一致,如果一致,则一次请求完成
RabbitMQ不支持连接的failover,所以需要客户端自己实现失败重连。
为保证消息的可靠传递,服务器使用持久化保证消息不丢失。包括exchange与queue必须定义为持久的,同时发送消息时,也要设置消息为持久消息。
在代码中可以通过以下语句设置发送持久消息:
channel.basicPublish(exchangeName, routeKey,MessageProperties.PERSISTENT_TEXT_PLAIN,msg)
或者:
BasicProperties basicProperties = new AMQP.BasicProperties.Builder().deliveryMode(2).build(); // deliveryMode为1是非持久 channel.basicPublish(exchangeName, routeKey, basicProperties, msg)
生产者的消息确认叫做confirm,confirm确保消息已经发送到MQ中。当connection或channel异常时,会重新发送消息,如果消息是持久的,并不能一定保证消息持久化到磁盘中,因为消息可能存在与磁盘的缓存中。为进一步提高可靠性,可以使用事务。Confirm与事务不能同时使用。
当生产者收不到confirm时,消息可能会重复,所以如果消息不允许重复,则消费者需要自己实现消息去重。
使用以下代码打开confirm,默认是关闭的
channel.confirmSelect();
消费者的消息确认叫做Acknowledgements,Acknowledgements确保消费者已经处理了消息,如果收不到消费者的Acknowledgements,MQ会重新发送消息。
默认Acknowledgements是自动确认,如需客户端控制,在消费者的代码中设置:
channel.basicConsume(queueName,false,consumer);//声明队列时,设置autoack为false 。。。 //消息处理代码 。。。 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); //发送确认
同样,MQ也可能收不到消费者的Acknowledgements,就会重复发送消息,若要避免,消费者需要自己实现消息去重。
RabbitMQ提供了3中分布式的解决方案,cluster,federation,shovel。cluster用于可靠的本地局域网,后两种用于不可靠的网络。
Cluster将多台机器连接为一个逻辑broker,各机器之间使用Erlang消息通信,所以cluster中各机器必须有一样的Erlang cookie,并且机器之间的网络要是可靠的,并且都运行相同版本的Erlang。
Virtual hosts,exchanges,用户及权限都在所有节点同步,queues可以位于本机,也可以作为镜像队列,在各个机器之间同步。
通常使用cluster来提高可靠性与增加吞吐量。
Federation允许一个exchange从另外一台机器或者cluster的exchange中接收消息,因为是两个exchange联合起来,所以必须有相同的用户权限。
联合起来的exchange是单向的点对点的连接。
通常应该在通过internet连接broker的时候使用Federation
Shovel与Federation的概念类似,只是工作在更低的层次。
Federation是从一个exchange到另一个exchange,而Shovel是从一边的queue中取走消息并发送到另一个exchange。
通常在通过internet连接broker的时,并且需要获得比Federation更多控制权的时候使用Shovel。
以下是三种分布式模式的简要对比:
Federation / Shovel |
Clustering |
Brokers are logically separate and may have different owners. |
A cluster forms a single logical broker. |
Brokers can run different versions of RabbitMQ and Erlang. |
Nodes must run the same version of RabbitMQ, and frequently Erlang. |
Brokers can run different versions of RabbitMQ and Erlang. |
Brokers must be connected via reliable LAN links. Communication is via Erlang internode messaging, requiring a shared Erlang cookie. |
Brokers can be connected in whatever topology you arrange. Links can be one- or two-way. |
All nodes connect to all other nodes in both directions. |
Brokers can be connected in whatever topology you arrange. Links can be one- or two-way. |
Chooses Consistency and Availability from the CAP theorem. |
Some exchanges in a broker may be federated while some may be local. |
Clustering is all-or-nothing. |
A client connecting to any broker can only see queues in that broker. |
A client connecting to any node can see queues on all nodes. |
当生产者发送消息的速率大于消息被路由到queue的速率时,会触发流量控制,发送速率受到限制,但不会完全阻塞。
当内存使用达到vm_memory_high_watermark的值时,会触发流量控制,生产者被阻塞。vm_memory_high_watermark的默认值是系统内存的40%,这个值可以在配置文件中修改。
[{rabbit, [{vm_memory_high_watermark, 0.4}]}].
或者在运行时通过命令rabbitmqctlset_vm_memory_high_watermark fraction修改,修改立即生效,但下次重启后恢复。所以要永久修改,必须同时修改配置文件。
当磁盘剩余空间小于disk_free_limit的值时,触发流量控制,生产者被阻塞。disk_free_limit的默认值是1GB,可在配置文件中修改。
[{rabbit, [{disk_free_limit, 25000000000}]}].
通过命令rabbitmqctl status可以查看内存使用状态,或者在WEB管理界面中点击节点后查看。
其中Queues表示队列中消息占用的内存
Mnesia表示MQ中定义的exchange,queue,bindings,用户及权限占用的内存
详细说明请参考http://www.rabbitmq.com/memory-use.html
RabbitMQ的默认配置在大部分情况下是最佳配置,如果服务运行良好,不需要修改。RabbitMQ支持3种方式修改配置:环境变量、配置文件、运行时参数与策略。
环境变量可以配置到shell环境变量中,也可以在RabbitMQ的环境变量中配置。例如:配置服务绑定IP,可以在shell环境变量里配置RABBITMQ_NODE_IP_ADDRESS的值,也可以在RabbitMQ的环境变量中配置NODE_IP_ADDRESS的值,即RabbitMQ的环境变量中变量名称要去掉RABBITMQ_。RabbitMQ的环境变量文件在$RABBITMQ_HOME/sbin/rabbitmq-env。配置的优先级为shell环境变量优先于RabbitMQ的环境变量,RabbitMQ的环境变量优先于RabbitMQ默认的环境变量。
通过配置文件配置,要先在环境变量中指定配置文件路径,例如:
CONFIG_FILE=/etc/rabbitmq/rabbitmq.config
然后添加配置,例如:
[ {mnesia, [{dump_log_write_threshold, 1000}]}, {rabbit, [{tcp_listeners, [5673]}]} ].
通过rabbitmqctl命令可以在运行时修改配置,例如修改vm_memory_high_watermark。还有些配置,比如镜像队列,是通过管理界面或命令配置策略实现的。
详细的配置项请参考http://www.rabbitmq.com/configure.html
RabbitMQ支持主从备份,当主服务器不可用时,存在磁盘中的消息可以由从服务器恢复。
也可以在集群的基础上配置主从备份。主从备份依赖Pacemaker来管理资源,主从备份的方式已不推荐使用,而镜像队列则更容易使用,且可靠性更高。
虽然使用cluster可以提高可靠性,exchange,binding在各个机器是共享的,但是queue中的消息实际上还是存在单独的机器,如果一台机器不可用,那么在这台机器恢复前,这台机器中存储的消息也是不可用的。
为解决这样的问题,引入了镜像队列,镜像队列是在集群中为队列建立的一个或多个物理镜像,这些镜像分别存储在主节点之外的其他节点,所有节点中的队列共同组成一个逻辑队列。将一个队列做镜像后,即使此机器不可用,RabbitMQ会自动从镜像中选择一个继续使用,不会导致队列中的消息不可用。
如果为一个队列建立多个镜像,前者称为主节点,后者称为从节点。如果主节点有问题,那么RabbitMQ会从从节点中选择最早同步的一个作为新的主节点,以保证尽量不丢失消息,然而原主节点中同步之前的消息还是会丢失。
镜像队列运行在cluster中,不建议通过WAN使用,也就是不建议在Federation和Shovel中使用。
镜像队列是通过策略配置的,添加一个策略,匹配相应的队列,然后指定一个key为ha-mode的参数,例如:
rabbitmqctl set_policy ha-all "^ha." ‘{"ha-mode":"all"}‘
这个策略设置所有的节点都为ha.开头的队列做镜像。这个设置也可以在管理界面中添加,详细信息请参考http://www.rabbitmq.com/ha.html
RabbitMQ的JAVA客户端中附带了性能测试脚本,以下数据都由此脚本测试得到。
硬件环境:CPU::Intel(R) Core(TM) i5-2400 CPU @ 3.10GHz
内存:4G
磁盘:500G 10000转/分
软件环境:otp_src_R15B03-1.tar.gz
rabbitmq-server-generic-unix-3.0.0.tar.gz (单台)
rabbitmq-java-client-bin-3.0.0.tar.gz
Red Hat 4.1.2-48 (Linux version 2.6.18)
以下是发送0.5KB大小消息的测试结果:
producer consumer confirm(max unconfirmed publishes 100) ack persistent throughput (msg/s)
1 1 N N N 17650
1 1 Y N N 15640
1 1 N Y N 17100
1 1 N N Y 17368
1 1 Y N Y 15635
1 1 N Y Y 9154
1 1 Y Y N 15266
1 1 Y Y Y 6111
max unconfirmed publishes的值对于吞吐量的影响较大.
在发送持久消息与打开消费者的acknowledgements时,吞吐量变化明显。
关于性能,请参考以下文章:
http://www.rabbitmq.com/blog/2012/04/17/rabbitmq-performance-measurements-part-1/
http://www.rabbitmq.com/blog/2012/04/25/rabbitmq-performance-measurements-part-2/
RabbitMQ中的队列性能是一个值得关注的地方。在设计方案时就应该考虑到。队列只有在保持队列中不积压消息时,性能才是最佳的,队列中积压的消息越多,性能下降越多。
例如生产者发送消息的速度是600msg/s,消费者接收的速度是1200msg/s,正常情况下,是没有性能问题的。这时如果停止消费者一段时间,让消息在队列中积压,然后在打开消费者。按理消费者的速度大于生产者速度,可以转发新消息,并把老消息也取走,最终队列又回到为空的状态。但实际情况则不是,队列中的消息会继续积压,而且会继续变多,而这时消费者的速度就不如之前的了。
RabbitMQ中的队列,在实现上又分为多个小的队列,每个队列里存储着不同状态的消息。当消息不积压时,消息由交换器到达队列,就会被直接发送给消费者。而当消息堆积时,由于占用较多内存,RabbitMQ会把消息放入更深层次的队列,例如将内存中的消息换出到磁盘上(不管消息是否持久化),而这些操作会消耗更多的CPU等系统资源,从而导致影响队列中消息的发送。
为了不使消息积压,可以采取两种方法:
停止向队列发送消息
停止发送消息,让系统资源都集中到向消费者发送消息,队列中的消息逐渐减少,队列最终会恢复至为空状态。
转移负载
有些时候不能停止生产者,这时可以改变绑定,让新消息发送到新的队列,新队列必须位于新的机器上。当然也需要新的消费者来连接。这样可以让老队列中的消息慢慢取走,也不影响新消息的发送。
默认的集群模式下,虽然消息可以发送到一台机器,然后从另一台机器取出,但是因为每台机器的queue实际上消息是本地存储,所以消息发到A的queue,从B中取,首先需要从A再次发送到B中,这样会导致取消息的效率不高。
如果使用镜像模式,A中的消息会同步到B中,消费者从B中取消息,消息是从本地取了,但是队列做镜像依然对性能影响很大,尤其是镜像的数目增加,性能会成倍下降。镜像队列优于普通模式的地方在于可靠性,普通模式中,A如果有故障,那么A中的消息就无法取出。镜像模式中,A有故障,消息依然可以从B中取出。
以下是我们生产环境的集群配置方案,因为对于吞吐量要求很高,单台RabbitMQ无法满足性能要求,所以选择使用cluster,而镜像模式对于性能影响很大,只能采取其他方案:假设3台RabbitMQ组成一个集群。然后建立多个queue,exchange使用direct类型,并绑定所有queue,routeKey为0到2(和MQ的数量一致)中随机发送。生产者发送消息到exchange,并路由到各个queue,消费者也有多个,同时从各个queue获取消息。生产者与消费者使用多channel提高速度,同时消费者使用异步接收方式。
使用多个队列,可以显著提高集群的吞吐量,每个队列要位于不同的物理机器上。考虑性能优先,也取消了消息持久化。但是在可靠性方面,如果某个队列不可用,那么发送给这个队列的消息就会被丢弃。为避免这种情况,采用备用绑定与备用队列的方式,即建立多个绑定,默认情况exchange通过routeKey 0,1,2绑定队列a,b,c(橙色线路) ,备用绑定是exchange通过routeKey 0,1,2 绑定队列d(紫色线路)。比如当队列a不可用时,默认的绑定routeKey为0的消息就无法发送到a队列,这时备用策略自动生效,routeKey为0的消息会被发送到队列d上(走紫色线路),routeKey为1和2的消息照常发到b和c(还是橙色线路)。这样就可以确保消息不丢失。若要进一步提高可靠性,降低备用队列的压力,可以建立多个备用队列,然后将绑定分散开来。
1百万条1K的消息
[1] http://www.rabbitmq.com/documentation.html
[2] http://www.infoq.com/cn/articles/AMQP-RabbitMQ#ftn.26
[3] http://langyu.iteye.com/blog/759663/
[4] http://mysql.taobao.org/index.php/Rabbitmq
[5] http://blog.163.com/clevertanglei900@126/blog/static/111352259201011121041853/
[6] http://www.rabbitmq.com/blog/2012/04/17/rabbitmq-performance-measurements-part-1/
[7] http://www.rabbitmq.com/blog/2012/04/25/rabbitmq-performance-measurements-part-2/
[8] http://www.rabbitmq.com/blog/2011/10/27/performance-of-queues-when-less-is-more/
[9] http://www.rabbitmq.com/blog/2011/09/24/sizing-your-rabbits/
[10] http://www.oschina.net/news/17973/message-queue-shootout
转自:http://changmengnan.com/284.html
标签:
原文地址:http://my.oschina.net/u/1861837/blog/504009