标签:
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
AMQP,即Advanced message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ Server: 也叫broker server,是一种传输服务,负责维护一条从Producer到consumer的路线,保证数据能够按照指定的方式进行传输。
Producer,数据的发送方。
Consumer,数据的接收方。
Exchanges 接收消息,转发消息到绑定的队列。主要使用3种类型:direct, topic, fanout。
Queue RabbitMQ内部存储消息的对象。相同属性的queue可以重复定义,但只有第一次定义的有效。
Bindings 绑定Exchanges和Queue之间的路由。
Connection: 就是一个TCP的连接。Producer和consumer都是通过TCP连接到RabbitMQ Server的。
Channel:虚拟连接。它建立在上述的TCP连接中。数据流动都是在Channel中进行的。也就是说,一般情况是程序起始建立TCP连接,第二步就是建立这个Channel。
Ubuntu 安装
编辑 /etc/apt/sources.list 文件添加如下行
deb http://www.rabbitmq.com/debian/ testing main
执行更新软件源命令
#apt-get update.
安装rabbitmq
#apt-get install rabbitmq-server
启动rabbitmq
# service rabbitmq-server start
Centos 安装
从官网下载erlang和rabbitmq 的rpm包
安装erlang
#rpm –i erlang-17.4-1.el6.x86_64.rpm
安装rabbitmq
#rpm –i rabbitmq-server-3.5.3-1.noarch.rpm
rabbitmq的rpm安装包不能指定安装路径
RABBITMQ配置文件位置:
启用web插件
#rabbitmq-plugins enable rabbitmq_management #启用
关闭web插件
# rabbitmq-plugins disable rabbitmq_management
可以用默认账号guest,guest登陆http://主机IP:15672,如果要远程登录,需要先创建帐户,可查看下一节。
如果是集群的话,只要在一台主机设置即可,其它会自动同步。
#rabbitmqctl add_user iom 123456 –iom为新建的用户,123456为密码
#rabbitmqctl set_user_tags iom administrator –将用户设置为管理员角色
#rabbitmqctl set_permissions -p / iom “.*” “.*” “.*”
–在 / 虚拟主机里设置iom用户配置权限,写权限,读权限。.*是正则表达式里用法。rabbitmq的权限是根据不同的虚拟主机(virtual hosts)配置的,同用户在不同的虚拟主机(virtual hosts)里可能不一样。
如果采用标准的 AMQP 协议,则唯一能够保证消息不会丢失的方式是利用事务机制 — 令 channel 处于 transactional 模式、向其 publish 消息、执行 commit 动作。在这种方式下,事务机制会带来大量的多余开销,并会导致吞吐量下降 250% 。为了补救事务带来的问题,引入了 confirmation 机制(即 Publisher Confirm)。
confirm 机制是在channel上使用 confirm.select方法,处于 transactional 模式的 channel 不能再被设置成 confirm 模式,反之亦然。
在 channel 被设置成 confirm 模式之后,所有被 publish 的后续消息都将被 confirm(即 ack) 或者被 nack 一次。但是没有对消息被 confirm 的快慢做任何保证,并且同一条消息不会既被 confirm 又被 nack 。
RabbitMQ 将在下面的情况中对消息进行 confirm :
RabbitMQ发现当前消息无法被路由到指定的 queues 中;
非持久属性的消息到达了其所应该到达的所有 queue 中(和镜像 queue 中);
持久消息到达了其所应该到达的所有 queue 中(和镜像 queue 中),并被持久化到了磁盘(被 fsync);
持久消息从其所在的所有 queue 中被 consume 了(如果必要则会被 acknowledge)。
为了保证数据不被丢失,RabbitMQ支持消息确认机制,即acknowledgments。
如果没启动消息确认机制,RabbitMQ在consumer收到消息后就会把消息删除。
启用消息确认后,consumer在处理数据后应通过回调函数显示发送ack, RabbitMQ收到ack后才会删掉数据。如果consumer一段时间内不回馈,RabbitMQ会将该消息重新分配给另外一个绑定在该队列上的consumer。另一种情况是consumer断开连接,但是获取到的消息没有回馈,则RabbitMQ同样重新分配。
注意:如果consumer 没调用basic.qos 方法设置prefetch_count=1,那即使该consumer有未ack的messages,RabbitMQ仍会继续发messages给它。
消息确认机制确保了consumer退出时消息不会丢失,但如果是RabbitMQ本身因故障退出,消息还是会丢失。为了保证在RabbitMQ出现意外情况时数据仍没有丢失,需要将queue和message都要持久化。
queue持久化:channel.queue_declare(queue=’hello’, durable=True)
message持久化:channel.basic_publish(exchange=”,
routing_key=”task_queue”,
body=message,
properties=pika.BasicProperties(
delivery_mode = 2,) #消息持久化
)
即使有消息持久化,数据也有可能丢失,因为rabbitmq是先将数据缓存起来,到一定条件才保存到硬盘上,这期间rabbitmq出现意外数据有可能丢失。
网上有测试表明:持久化会对RabbitMQ的性能造成比较大的影响,可能会下降10倍不止。
一个RABBITMQ集 群中可以共享user,virtualhosts,queues(开启Highly Available Queues),exchanges等。但message只会在创建的节点上传输。当message进入A节点的queue中后,consumer从B节点拉取时,RabbitMQ会临时在A、B间进行消息传输,把A中的消息实体取出并经过B发送给consumer。所以consumer应尽量连接每一个节点,从中取消息。
RABBITMQ的集群节点包括内存节点、磁盘节点。内存节点的元数据仅放在内存中,性能比磁盘节点会有所提升。不过,如果在投递message时,打开了message的持久化,那么内存节点的性能只能体现在资源管理上,比如增加或删除队列(queue),虚拟主机(vrtual hosts),交换机(exchange)等,发送和接受message速度同磁盘节点一样。一个集群至少要有一个磁盘节点。
环境:有三台主机,主机名和IP如下,rabbitmq的执行用户为rabbitmq,所属组为rabbitmq。
主机名 IP
rabbitmq1 192.168.10.2
rabbitmq2 192.168.10.3
rabbitmq3 192.168.10.4
杀掉rabbitmq2和rabbitmq3的rabbitmq进程:
#ps –ef|grep rab|awk ‘{print $2}’|xargs kill -9。–用service rabbitmq-servier stop停会有遗留进程。
登陆rabbitmq1(rabbitmq1上的rabbitmq服务不能关),执行
#cd /var/lib/rabbitmq –进入erlang.cookie所在目录,只有ls –al能看见此文件
#chmod 777 .erlang* –该文件默认为400权限,为方便传输,先修改权限,非必须操作
#scp .erlang.cookie rabbitmq@192.168.10.3:/var/lib/rabbitmq –将此文件传给另外两条主机
#scp .erlang.cookie rabbitmq@192.168.10.4:/var/lib/rabbitmq
#chmod 400 .er* –恢复文件权限
分别在rabbitmq2和rabbitmq3 上执行
#chown rabbitmq:rabbitmq .er* –修改文件所属用户和所属组
#chmod 400 .er* –修改文件权限
#service rabbitmq-server start
查询rabbitmq1节点名称
#rabbitmqctl cluster_status
Cluster status of node rabbit@rabbitmq1 …
[{nodes,[{disc,[rabbit@rabbitmq1]}]},{running_nodes,[rabbit@ rabbitmq1]}]
…done.
rabbitmq2 加入rabbitmq1 节点.
# rabbitmqctl stop_app –关掉rabbitmq2服务
# rabbitmqctl join_cluster rabbit@rabbitmq1 — rabbitmq2加入rabbitmq1, rabbitmq2必须能通过rabbitmq1的主机名ping通rabbitmq1。
# rabbitmqctl start_app –启动rabbitmq2服务
查看集群信息
# rabbitmqctl cluster_status –此时里面就应该能看见两个节点。集群名字为rabbit@rabbitmq。
用相同的方法把rabbitmq3也加入rabbitmq1。
#rabbitmqctl stop_app –停止rabbitmq服务
#rabbitmqctl change_cluster_node_type disc/ram –更改节点为磁盘或内存节点
#rabbitmqctl start_app –开启rabbitmq服务
#rabbitmqctl cluster_status
[{nodes,[{disc,[rabbit@rabbitmq1,rabbit@rabbitmq2,rabbit@rabbitmq3]}]}, {running_nodes,[rabbit@rabbitmq1,rabbit@rabbitmq2, rabbit@rabbitmq3]}]…done.
–第一行是集群中的节点成员,disc表示这些都是磁盘节点。
–第二行是正在运行的节点成员
假设要把rabbitmq2退出集群
在rabbitmq2上执行
#rabbitmqctl stop_app
#rabbitmqctl reset
#rabbitmqctl start_app
在集群主节点上执行
# rabbitmqctl forget_cluster_node rabbit@rabbitmq2
集群重启时,最后一个挂掉的节点应该第一个重启,如果因特殊原因(比如同时断电),而不知道哪个节点最后一个挂掉。可用以下方法重启:
先在一个节点上执行
#rabbitmqctl force_boot
#service rabbitmq-server start
在其他节点上执行
#service rabbitmq-server start
查看cluster状态是否正常(要在所有节点上查询)。
#rabbitmqctl cluster_status
如果有节点没加入集群,可以先退出集群,然后再重新加入集群。
上述方法不适合内存节点重启,内存节点重启的时候是会去磁盘节点同步数据,如果磁盘节点没起来,内存节点一直失败。
镜像队列可以同步queue和message,当主queue挂掉,从queue中会有一个变为主queue来接替工作。
镜像队列是基于普通的集群模式的,所以你还是得先配置普通集群,然后才能设置镜像队列。
镜像队列设置后,会分一个主节点和多个从节点,如果主节点宕机,从节点会有一个选为主节点,原先的主节点起来后会变为从节点。
queue和message虽然会存在所有镜像队列中,但客户端读取时不论物理面连接的主节点还是从节点,都是从主节点读取数据,然后主节点再将queue和message的状态同步给从节点,因此多个客户端连接不同的镜像队列不会产生同一message被多次接受的情况。
沿用3.2的环境,现在我们把名为“hello”的队列设置为同步给所有节点
#rabbitmqctl set_policy ha-all “hello” ‘{“ha-mode”:”all”}’
ha-all 是同步模式,指同步给所有节点,还有另外两种模式ha-exactly表示在指定个数的节点上进行镜像,节点的个数由ha-params指定,ha-nodes表示在指定的节点上进行镜像,节点名称通过ha-params指定;
hello 是同步的队列名,可以用正则表达式匹配;
{“ha-mode”:”all”} 表示同步给所有,同步模式的不同,此参数也不同。
执行上面命令后,可以在web管理界面查看queue 页面,里面hello队列的node节点后会出现+2标签,表示有2个从节点,而主节点则是当前显示的node(xf7021是测试用的名字,按4-2应该为rabbitmq(1-3))。
红字为手工加的备注,原文件里没有。
vi /etc/keepalived/keepalived.cnf文件
global_defs {
router_id LVS_MASTER }
vrrp_instance VI_1 {
state MASTER
interface eth0
virtual_router_id 51
priority 100
advert_int 1
authentication {
auth_type PASS
auth_pass 1111
}
virtual_ipaddress {
192.168.10.251/24 #rabbitmq
}
}
virtual_server 192.168.10.251 5672 {
delay_loop 6
lb_algo rr
lb_kind DR
protocol TCP
real_server 192.168.10.3 5672 {
weight 3
TCP_CHECK {
connect_timeout 3
nb_get_retry 3
delay_before_retry 3
connect_port 5672
}
}
real_server 192.168.10.4 5672 {
weight 3
TCP_CHECK {
connect_timeout 3
nb_get_retry 3
delay_before_retry 3
connect_port 5672
}
}
}
lvs_rabbitmq.sh脚本内容:
#!/bin/bash
VIP=192.168.10.251
case “$1” in
start)
ifconfig lo:0 $VIP netmask 255.255.255.255 broadcast $VIP
/sbin/route add -host $VIP dev lo:0
echo “1” >/proc/sys/net/ipv4/conf/lo/arp_ignore
echo “2” >/proc/sys/net/ipv4/conf/lo/arp_announce
echo “1” >/proc/sys/net/ipv4/conf/all/arp_ignore
echo “2” >/proc/sys/net/ipv4/conf/all/arp_announce
sysctl -p >/dev/null 2>&1
echo “lvs_vip server start ok!”;;
stop)
ifconfig lo:0 down
/sbin/route del $VIP >/dev/null 2>&1
echo “0” >/proc/sys/net/ipv4/conf/lo/arp_ignore
echo “0” >/proc/sys/net/ipv4/conf/lo/arp_announce
echo “0” >/proc/sys/net/ipv4/conf/all/arp_ignore
echo “0” >/proc/sys/net/ipv4/conf/all/arp_announce
echo “lvs_vip server stoped.”;;
*)
echo “arg start|stop.”
exit 1
esac
exit 0
RabbitMQ监控项目很多,可通过web管理界面监控。
OVERVIEW页面下有4个标签。主要关注totals和nodes两个。
ready为待处理消息量,total为总消息量。
publish为每秒发送消息量,deliver为每秒接受消息量。
下面5个灰色长方块分别代表对应的模块连接数。
name为节点名称,后面5个蓝色方块分别代表文件打开数,socket连接数,erlang processes(暂时未知),内存占用两,磁盘空余量;info里显示节点属性,将鼠标放在内容上会显示对应的统计内容。
主机名 IP
VIP: 192.168.10.251
client 192.168.10.2 –本测试发送(producer)和接收(consumer)在同一台机器
rabbitmq1 192.168.10.3
rabbitmq2 192.168.10.4
负载机启动keepalived
# service keepalived start
rabbitmq1和rabbitmq2执行5-2的脚本
#./lvs_rabbitmq.sh start
按第3和第4章的方法组建集群,配置镜像队列,节点类型最好都设置为磁盘节点。
按第1-5创建用户。
测试用RabbitMQ的python语言客户端,注意python是靠缩进量来区分语句块。红色部分为注释,源码上没有。
发送源码:
#vi send.py
#!/usr/bin/env python
import pika
import time
credentials=pika.PlainCredentials(‘iom’,’123456′) –配置连接的用户名和密码
parameters=pika.ConnectionParameters(‘192.168.10.251′,5672,’/’,credentials)
connection=pika.BlockingConnection(parameters)
channel=connection.channel()
channel.queue_declare(queue=’hello’)
count=0
while count<9999:
message=’Hello World’+str(count)
count=count+1
channel.basic_publish(exchange=”,routing_key=’hello’,body=message)
print “Sent %s” %(message)
time.sleep(1)
connection.close()
接收源码
#vi receive.py
#!/usr/bin/env python
import pika
connection=pika.BlockingConnection(pika.ConnectionParameters(host=’xf7027′))
channel=connection.channel()
channel.queue_declare(queue=’hello’)
print ‘[*] Waiting for message.To exit press CTRL+C’
def callback(ch,method,properties,body):
print “[x] Received %r” %(body,)
channel.basic_consume(callback,queue=’hello’,no_ack=True)
channel.start_consuming()
在客户机上执行
#python send.py
在负载机上执行
#watch ipvsadm –Ln
可以看到rabbitmq1或rabbitmq2的activeconn列数值为1。
客户机重新执行发送程序
#python send.py
在负载机上可以看到另一个rabbitmq服务的activeconn 列数值也变为1。
测试容灾性:
在客户机上分别执行发送和接受程序。
#python send.py
#python receive.py
然后关掉一个rabbitmq节点,如果关掉的正好是客户机连的那个节点的话,客户机发送和接收程序会报错退出(程序本身如果有错误重发机制则不受任何影响)。如果关掉的是另外的节点,程序不受任何影响。
标签:
原文地址:http://www.cnblogs.com/grimm/p/5771038.html