标签:
RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。
MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。
RabbitMQ安装
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
安装配置epel源 安装erlang # tar -xzvf otp_src_R15B03-1.tar.gz # ./configure --prefix=/usr/local/erlang --with-ssl --enable-threads --enable-smp-support --enable-kernel-poll --enable-hipe --without-javac # make && make install 配置erlang环境: #vim /etc/profile export PATH = $PATH: / usr / local / erlang / bin #source profile 安装RabbitMQ # rpm -i --nodeps rabbitmq-server-3.2.0-1.noarch.rpm |
安装python API模块
1
2
3
4
5
6
7
|
pip install pika or easy_install pika or 源码 https: / / pypi.python.org / pypi / pika windows:
python setup.py install
|
import Queue
import threading
message = Queue.Queue(10)
def producer(i):
while True:
message.put(i)
def consumer(i):
while True:
msg = message.get()
for i in range(12):
t = threading.Thread(target=producer, args=(i,)) #生产者
t.start()
for i in range(10):
t = threading.Thread(target=consumer, args=(i,)) #消费者
t.start()
生产者:
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘10.10.50.42‘)) #连接RabbitMQ服务器 channel = connection.channel() channel.queue_declare(queue=‘yangMQ‘) #声明一个队列,队列名为 yangMQ channel.basic_publish(exchange=‘‘, #往队列中插入数据 routing_key=‘yangMQ‘, body=‘Hello World!‘) #键值对应的内容 print(" [x] Sent ‘Hello World!‘") connection.close()
消费者
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘10.10.50.42‘)) #连接RabbitMQ服务器 channel = connection.channel() channel.queue_declare(queue=‘yangMQ‘) #声明队列 def callback(ch, method, properties, body): print(" [x] Received %r" % body) channel.basic_consume(callback, queue=‘yangMQ‘, no_ack=True) print(‘ [*] Waiting for messages. To exit press CTRL+C‘) channel.start_consuming()
1、acknowledgment 消息不丢失
no-ack = False,如果生产者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。
import pika # ######################### 消费者 ######################### connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘10.10.50.42‘)) channel = connection.channel() channel.queue_declare(queue=‘yangMQ‘) def callback(ch, method, properties, body): print(" [x] Received %r" % body) import time time.sleep(10) #暂停10秒,模拟异常中断,中断后不会发送ack应答,下次执行会再次接收 print ‘ok‘ ch.basic_ack(delivery_tag = method.delivery_tag) #ack应答后,表示接收成功 channel.basic_consume(callback, queue=‘yangMQ‘, no_ack=False) #表示启用ack应答功能 print(‘ [*] Waiting for messages. To exit press CTRL+C‘) channel.start_consuming()
import pika # ######################### 生产者 ######################### connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘10.10.50.42‘)) channel = connection.channel() channel.queue_declare(queue=‘yangMQ2‘, durable=True) #定义durable channel.basic_publish(exchange=‘‘, routing_key=‘yangMQ2‘, body=‘Hello World2!‘, properties=pika.BasicProperties( delivery_mode=2, #消息不间断 )) print(" [x] Sent ‘Hello World2!‘") connection.close() import pika
# ######################### 消费者 ######################### connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘10.10.50.42‘)) channel = connection.channel() channel.queue_declare(queue=‘yangMQ2‘, durable=True) #定义durable def callback(ch, method, properties, body): print(" [x] Received %r" % body) import time time.sleep(10) print ‘ok‘ ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback, queue=‘yangMQ2‘, no_ack=False) print(‘ [*] Waiting for messages. To exit press CTRL+C‘) channel.start_consuming()
3、消息获取顺序
默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者2去队列中获取 偶数 序列的任务。
channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照奇偶数排列
import pika # ######################### 消费者 ######################### connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘10.10.50.42‘)) channel = connection.channel() channel.queue_declare(queue=‘yangMQ2‘, durable=True) #定义durable def callback(ch, method, properties, body): print(" [x] Received %r" % body) import time time.sleep(10) print ‘ok‘ ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) #不按奇偶顺序 channel.basic_consume(callback, queue=‘yangMQ2‘, no_ack=False) print(‘ [*] Waiting for messages. To exit press CTRL+C‘) channel.start_consuming()
4、发布订阅
发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。
exchange type = fanout
发布者
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘10.10.50.42‘)) channel = connection.channel() channel.exchange_declare(exchange=‘mingwei‘, type=‘fanout‘) message = ‘ ‘.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange=‘mingwei‘, routing_key=‘‘, body=message) print(" [x] Sent %r" % message) connection.close()
订阅者
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘10.10.50.42‘)) channel = connection.channel() channel.exchange_declare(exchange=‘mingwei‘, type=‘fanout‘) result = channel.queue_declare(exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange=‘mingwei‘, queue=queue_name) print(‘ [*] Waiting for logs. To exit press CTRL+C‘) def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
5、关键字发送
exchange type = direct
之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。
import pika import sys # ######################### 发布者 ######################### connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘10.10.50.42‘)) channel = connection.channel() channel.exchange_declare(exchange=‘direct_logs‘, type=‘direct‘) message = "Hello World!" channel.basic_publish(exchange=‘direct_logs‘, routing_key=‘shanghai‘, #只给关键字有shanghai的订阅者发消息 body=message) print(" [x] Sent %r" % message) connection.close() # ######################### 订阅者 ######################### connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘10.10.50.42‘)) channel = connection.channel() channel.exchange_declare(exchange=‘direct_logs‘, type=‘direct‘) result = channel.queue_declare(exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange=‘direct_logs‘, queue=queue_name, routing_key=‘shanghai‘) #关键字上海 channel.queue_bind(exchange=‘direct_logs‘, queue=queue_name, routing_key=‘beijing‘) print(‘ [*] Waiting for logs. To exit press CTRL+C‘) def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
6、模糊匹配
exchange type = topic
在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。
1
2
3
|
发送者路由值 队列中 old.boy.python old. * - - 不匹配 old.boy.python old. # -- 匹配 |
import pika import sys # ######################### 发布者 ######################### connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘10.10.50.42‘)) channel = connection.channel() channel.exchange_declare(exchange=‘topic_logs‘, type=‘topic‘) message = "Hello World!" channel.basic_publish(exchange=‘topic_logs‘, routing_key=‘yangmv.com.zx‘, #模糊匹配2位 body=message) print(" [x] Sent %r" % message) connection.close() import pika import sys # ######################### 订阅者 ######################### connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘10.10.50.42‘)) channel = connection.channel() channel.exchange_declare(exchange=‘topic_logs‘, type=‘topic‘) result = channel.queue_declare(exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange=‘topic_logs‘, queue=queue_name, routing_key=‘yangmv.#‘) # #匹配0个或者多个,*只匹配1个 print(‘ [*] Waiting for logs. To exit press CTRL+C‘) def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
标签:
原文地址:http://www.cnblogs.com/yangmv/p/5201937.html