一:rabbitmq的安装:
参考:http://www.blogjava.net/hellxoul/archive/2014/06/25/415135.html
http://blog.haohtml.com/archives/15249
说明:修改机器名字后再安装(为后面集群做准备)
vi /etc/sysconfig/network 修改名字
vi /etc/hosts 修改地址映射表,如192.168.1.112 rabbitmq-node1.com rabbitmq-node1 #做集群时设置
重启
1.安装erlang:
#rpm -Uvh http://mirrors.hustunique.com/epel/6/x86_64/epel-release-6-8.noarch.rpm
#yum install erlang
测试:erl:
io:format("hello").
2.安装rabbitmq:
上传rabbitmq-server-3.3.4-1.noarch.rpm
#yum install rabbitmq-server-3.3.4-1.noarch.rpm
3.安装完成
4.启动:
/etc/init.d/rabbitmq-server start
5.开启插件:
rabbitmq-plugins enable rabbitmq_management
允许远程访问:
vi /etc/rabbitmq.config
[
{rabbit, [{tcp_listeners, [5672]}, {loopback_users, []}]}
].
关闭iptables:service iptables stop
chkconfig iptables off禁止开机启动
访问http://ip:15672 账号密码为guest
二:集群搭建:
参考:http://www.cnblogs.com/flat_peach/archive/2013/04/07/3004008.html
http://www.centoscn.com/CentosServer/cluster/2014/1216/4324.html
说明:3台机器,名为rabbitmq-node1,rabbitmq-node2,rabbitmq-node3
1.修改地址映射:
vi /etc/hosts 修改地址映射表,如192.168.1.112 rabbitmq-node1.com rabbitmq-node1
2.设置节点cookie:
先关闭rabbitmq-server:/etc/init.d/rabbitmq-server stop
cook存放在/var/lib/rabbitmq/.erlang.cookie文件,该文件权限为400,修改前需先修改为777,修改后再改为400
把其中一台的该文件内容复制到其他所有机器上,使集群中的所有机器cookie保持一致,否则不能通讯
chmod 777 /var/lib/rabbitmq/.erlang.cookie
复制好后启动rabbitmq:/etc/init.d/rabbitmq-server start
3.rabbitmq-node1,rabbitmq-node2作为内存节点,rabbitmq-node3作为磁盘节点,连接起来做集群:
rabbitmq-node1:
rabbitmqctl stop_app :停止应用
rabbitmqctl join_cluster --ram rabbit@rabbitmq-node3 :增加节点
rabbitmqctl start_app :开启应用
rabbitmqctl cluster_status :查看状态
rabbitmq-node2:
rabbitmqctl stop_app :停止应用
rabbitmqctl join_cluster --ram rabbit@rabbitmq-node3 :增加节点
rabbitmqctl start_app :开启应用
rabbitmqctl cluster_status :查看状态
rabbitmq-node3:
rabbitmqctl cluster_status :查看状态
如果要使rabbitmq-node1或rabbitmq-node2在集群里也是磁盘节点,join_cluster 命令去掉--ram参数即可: rabbitmqctl join_cluster rabbit@rabbitmq-node3
集群模式分为普通模式(上述模式)和镜像模式:
普通模式:消息实体只存在其中一个节点,当连接在其他节点的消费者获取消息时,其他节点先从该节点获取消息,再传给消费者,容易造成该节点瓶颈。
镜像模式:消息实体会主动在各节点之间同步,而不是在消费者获取时才同步。降低系统性能消耗带宽
在任意节点执行:rabbitmqctl set_policy ha-all "^" ‘{"ha-mode":"all"}‘,就成为镜像模式了
三:python操作rabbitmq:
参考: http://www.01happy.com/python-rabbitmq-work-queues/
http://www.01happy.com/python-rabbitmq-exchanges/
http://yidao620c.iteye.com/blog/1947338
1.发送消息:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘)) #连接服务器
channel = connection.channel() #创建channel
channel.queue_declare(queue = ‘hello‘) #声明消息队列,如果队列不存在将自动清除这些信息。本利中为hello
if len (sys.argv) < 2 :
print ‘message is empty!‘
sys.exit(0)
message = sys.argv[1]
channel.basic_publish(exchange = ‘‘, routing_key=‘hello‘, body = message) #发送消息到上面声明的队列中。exchange为交换器,能精确指定该发到哪个队列;route_key为队列的名称,body为消息体
print "[x] sent: ‘" + message + "‘\n"
connection.close() #关闭连接
2.接收消息:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters( ‘localhost‘ ))
channel = connection.channel()
channel.queue_declare(queue = ‘hello‘ )
print ‘[*] Waiting for messages. To exit press CTRL+C‘
def callback(ch, method, properties, body): #回调函数,处理接受的消息
print body
channel.basic_consume(callback, queue = ‘hello‘ , no_ack = True ) #告诉rabbitmq使用callback接收消息,no_ack=True 表示消费完了这个消息以后不主动把完成状态通知rabbitmq。
channel.start_consuming() #开始接收信息,进入阻塞状态,队列里有信息才会调用callback进行处理。按ctrl+c退出
可以开启多个接收实例,每个实例可以接收一部分消息,多个实例接收的消息的总和为发送的消息数。
##########################################################################################
上面的消息接收实例,有一个处理消息出现错误后,消息就将会丢掉,不会被其他的实例再处理。为了保证每一个消息都不会被丢掉,可以在处理时加消息确认,
即修改回调函数,修改如下:
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
time.sleep(5)
print " [x] Done"
ch.basic_ack(delivery_tag = method.delivery_tag) #增加确认
channel.basic_consume(callback, queue=‘hello‘, no_ack=False) #需要ack
##########################################################################################
消息持久化:
rabbit挂掉的话任务就会丢失,需要消息持久化
channel.queue_declare(queue=‘hello‘, durable=True) #增加durable=True,已存在的队列不行,会报错。需要重新创建队列
channel.basic_publish(exchange = ‘‘, routing_key=‘hello‘, body = message,properties=pika.BasicProperties(delivery_mode = 2,))
公平调度:
有的任务需要时间长,有的时间短,使用basic_qos设置prefetch_count=1,使得rabbitmq不会在同一时间给工作者分配多个任务,即只有工作者完成任务之后,才会再次接收到任务。
channel.basic_qos(prefetch_count=1)
发送任务源码:
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host=‘localhost‘))
channel = connection.channel()
channel.queue_declare(queue=‘task_queue‘, durable=True)
message = ‘ ‘.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange=‘‘,
routing_key=‘task_queue‘,
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
))
print " [x] Sent %r" % (message,)
connection.close()
接收任务源码:
#!/usr/bin/env python
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
host=‘localhost‘))
channel = connection.channel()
channel.queue_declare(queue=‘task_queue‘, durable=True)
print ‘ [*] Waiting for messages. To exit press CTRL+C‘
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
time.sleep( body.count(‘.‘) )
print " [x] Done"
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue=‘task_queue‘)
channel.start_consuming()
本文出自 “黑色时间” 博客,请务必保留此出处http://blacktime.blog.51cto.com/11722918/1795298
原文地址:http://blacktime.blog.51cto.com/11722918/1795298