码迷,mamicode.com
首页 > 其他好文 > 详细

rabbitmq集群搭建(centos6.5)

时间:2016-07-03 23:50:12      阅读:443      评论:0      收藏:0      [点我收藏+]

标签:centos   rabbitmq   

一:rabbitmq的安装:

参考:http://www.blogjava.net/hellxoul/archive/2014/06/25/415135.html

      http://blog.haohtml.com/archives/15249

  

说明:修改机器名字后再安装(为后面集群做准备)

vi /etc/sysconfig/network 修改名字

vi /etc/hosts 修改地址映射表,如192.168.1.112   rabbitmq-node1.com rabbitmq-node1 #做集群时设置

重启

1.安装erlang:

#rpm -Uvh http://mirrors.hustunique.com/epel/6/x86_64/epel-release-6-8.noarch.rpm

#yum install erlang

测试:erl:

io:format("hello").

2.安装rabbitmq:

上传rabbitmq-server-3.3.4-1.noarch.rpm

#yum install rabbitmq-server-3.3.4-1.noarch.rpm

3.安装完成

4.启动:

/etc/init.d/rabbitmq-server start

5.开启插件:

rabbitmq-plugins enable rabbitmq_management

允许远程访问:

vi /etc/rabbitmq.config

[

{rabbit, [{tcp_listeners, [5672]}, {loopback_users, []}]}

].

        关闭iptables:service iptables stop 

              chkconfig iptables off禁止开机启动

访问http://ip:15672 账号密码为guest

二:集群搭建

参考:http://www.cnblogs.com/flat_peach/archive/2013/04/07/3004008.html

      http://www.centoscn.com/CentosServer/cluster/2014/1216/4324.html

  

说明:3台机器,名为rabbitmq-node1,rabbitmq-node2,rabbitmq-node3

1.修改地址映射:

vi /etc/hosts 修改地址映射表,如192.168.1.112   rabbitmq-node1.com rabbitmq-node1

2.设置节点cookie:

先关闭rabbitmq-server:/etc/init.d/rabbitmq-server stop

cook存放在/var/lib/rabbitmq/.erlang.cookie文件,该文件权限为400,修改前需先修改为777,修改后再改为400

把其中一台的该文件内容复制到其他所有机器上,使集群中的所有机器cookie保持一致,否则不能通讯

chmod 777  /var/lib/rabbitmq/.erlang.cookie

复制好后启动rabbitmq:/etc/init.d/rabbitmq-server start

3.rabbitmq-node1,rabbitmq-node2作为内存节点,rabbitmq-node3作为磁盘节点,连接起来做集群:

rabbitmq-node1:

rabbitmqctl stop_app :停止应用

rabbitmqctl join_cluster --ram rabbit@rabbitmq-node3 :增加节点

rabbitmqctl start_app :开启应用

rabbitmqctl cluster_status :查看状态

rabbitmq-node2:

rabbitmqctl stop_app :停止应用

rabbitmqctl join_cluster --ram rabbit@rabbitmq-node3 :增加节点

rabbitmqctl start_app :开启应用

rabbitmqctl cluster_status :查看状态

rabbitmq-node3:

rabbitmqctl cluster_status :查看状态

如果要使rabbitmq-node1或rabbitmq-node2在集群里也是磁盘节点,join_cluster 命令去掉--ram参数即可: rabbitmqctl join_cluster rabbit@rabbitmq-node3

集群模式分为普通模式(上述模式)和镜像模式:

普通模式:消息实体只存在其中一个节点,当连接在其他节点的消费者获取消息时,其他节点先从该节点获取消息,再传给消费者,容易造成该节点瓶颈。

镜像模式:消息实体会主动在各节点之间同步,而不是在消费者获取时才同步。降低系统性能消耗带宽

  在任意节点执行:rabbitmqctl set_policy ha-all "^" ‘{"ha-mode":"all"}‘,就成为镜像模式了

三:python操作rabbitmq:

参考:  http://www.01happy.com/python-rabbitmq-work-queues/

http://www.01happy.com/python-rabbitmq-exchanges/

http://yidao620c.iteye.com/blog/1947338

1.发送消息:

#!/usr/bin/env python

# -*- coding: utf-8 -*-

import sys

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘))   #连接服务器

channel = connection.channel()    #创建channel

channel.queue_declare(queue = ‘hello‘)    #声明消息队列,如果队列不存在将自动清除这些信息。本利中为hello

if len (sys.argv) < 2 :

print ‘message is empty!‘

sys.exit(0)

message = sys.argv[1]

channel.basic_publish(exchange = ‘‘, routing_key=‘hello‘, body = message)    #发送消息到上面声明的队列中。exchange为交换器,能精确指定该发到哪个队列;route_key为队列的名称,body为消息体

print "[x] sent: ‘" + message + "‘\n"

connection.close()    #关闭连接

2.接收消息:

#!/usr/bin/env python

# -*- coding: utf-8 -*-

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters( ‘localhost‘ ))

channel = connection.channel()

channel.queue_declare(queue = ‘hello‘ )

print ‘[*] Waiting for messages. To exit press CTRL+C‘

 

def callback(ch, method, properties, body):    #回调函数,处理接受的消息

print body

 

channel.basic_consume(callback, queue = ‘hello‘ , no_ack = True )    #告诉rabbitmq使用callback接收消息,no_ack=True 表示消费完了这个消息以后不主动把完成状态通知rabbitmq。

channel.start_consuming()    #开始接收信息,进入阻塞状态,队列里有信息才会调用callback进行处理。按ctrl+c退出

可以开启多个接收实例,每个实例可以接收一部分消息,多个实例接收的消息的总和为发送的消息数。

##########################################################################################

上面的消息接收实例,有一个处理消息出现错误后,消息就将会丢掉,不会被其他的实例再处理。为了保证每一个消息都不会被丢掉,可以在处理时加消息确认,

即修改回调函数,修改如下:

def callback(ch, method, properties, body):

print " [x] Received %r" % (body,)

time.sleep(5)

print " [x] Done"

ch.basic_ack(delivery_tag = method.delivery_tag)  #增加确认

channel.basic_consume(callback, queue=‘hello‘, no_ack=False) #需要ack

##########################################################################################

消息持久化:

rabbit挂掉的话任务就会丢失,需要消息持久化

channel.queue_declare(queue=‘hello‘, durable=True)    #增加durable=True,已存在的队列不行,会报错。需要重新创建队列

channel.basic_publish(exchange = ‘‘, routing_key=‘hello‘, body = message,properties=pika.BasicProperties(delivery_mode = 2,))

公平调度:

有的任务需要时间长,有的时间短,使用basic_qos设置prefetch_count=1,使得rabbitmq不会在同一时间给工作者分配多个任务,即只有工作者完成任务之后,才会再次接收到任务。

channel.basic_qos(prefetch_count=1)

发送任务源码:

#!/usr/bin/env python

import pika

import sys

 

connection = pika.BlockingConnection(pika.ConnectionParameters(

host=‘localhost‘))

channel = connection.channel()

 

channel.queue_declare(queue=‘task_queue‘, durable=True)

 

message = ‘ ‘.join(sys.argv[1:]) or "Hello World!"

channel.basic_publish(exchange=‘‘,

  routing_key=‘task_queue‘,

  body=message,

  properties=pika.BasicProperties(

 delivery_mode = 2, # make message persistent

  ))

print " [x] Sent %r" % (message,)

connection.close()

接收任务源码:

#!/usr/bin/env python

import pika

import time

 

connection = pika.BlockingConnection(pika.ConnectionParameters(

host=‘localhost‘))

channel = connection.channel()

 

channel.queue_declare(queue=‘task_queue‘, durable=True)

print ‘ [*] Waiting for messages. To exit press CTRL+C‘

 

def callback(ch, method, properties, body):

print " [x] Received %r" % (body,)

time.sleep( body.count(‘.‘) )

print " [x] Done"

ch.basic_ack(delivery_tag = method.delivery_tag)

 

channel.basic_qos(prefetch_count=1)

channel.basic_consume(callback,

  queue=‘task_queue‘)

 

channel.start_consuming()


本文出自 “黑色时间” 博客,请务必保留此出处http://blacktime.blog.51cto.com/11722918/1795298

rabbitmq集群搭建(centos6.5)

标签:centos   rabbitmq   

原文地址:http://blacktime.blog.51cto.com/11722918/1795298

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!