RabbitMQ 消息队列 消息的传递
安装 http://www.rabbitmq.com/install-standalone-mac.html
如果是在windows上安装还要安装erlang语言
安装python RabbitMQ
pip install pika or easy_install pika or 源码 https://pypi.python.org/pypi/pika
实现最简单的队列通信
http://www.rabbitmq.com/getstarted.html
producer(生产者)
import pika connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) #建立一个socket channel = connection.channel()#建立一个管道 channel.queue_declare(queue="hello")#声明queue channel.basic_publish(exchange="",routing_key="hello",body="Hello world") #n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. print("produce send to consume") connection.close()#关闭
consumer(消费者)
import pika connetion = pika.BlockingConnection(pika.ConnectionParameters("localhost")) channel = connetion.channel() channel.queue_declare(queue="hello") #You may ask why we declare the queue again ? we have already declared it in our previous code. # We could avoid that if we were sure that the queue already exists. For example if send.py program #was run before. But we‘re not yet sure which program to run first. In such cases it‘s a good # practice to repeat declaring the queue in both programs. def callback(ch,method,properties,body): print(ch,method,properties) print(body) channel.basic_consume(callback,queue="hello",no_ack=True) print("waiting for messages. To exit press ctrl+c") channel.start_consuming()
消息分发轮询
这是一个一对多的情况,一个生产者对应多个消费者
在这种模式下,RabbitMQ会默认把p发的消息依次分发给各个消费者(c),跟负载均衡差不多
producer
import pika connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) #建立一个socket channel = connection.channel()#建立一个管道 channel.queue_declare(queue="hello")#声明queue channel.basic_publish(exchange="",routing_key="hello",body="Hello world") #n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. print("produce send to consume") connection.close()#关闭
consumer
import pika
connetion = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connetion.channel()
channel.queue_declare(queue="hello")
#You may ask why we declare the queue again ? we have already declared it in our previous code.
# We could avoid that if we were sure that the queue already exists. For example if send.py program
#was run before. But we‘re not yet sure which program to run first. In such cases it‘s a good
# practice to repeat declaring the queue in both programs.
def callback(ch,method,properties,body):
print(ch,method,properties)
print(body)
ch.basic_ack(delivery_tag = method.delivery_tag) #手动确认消息完毕
channel.basic_consume(callback,queue="hello") #这里主要把no_ack去掉
print("waiting for messages. To exit press ctrl+c")
channel.start_consuming()
先启动生产者,然后启动3个消费者,生产者多发几条消息,你会发先消息依次被消费者接收。
如果生产者在发送数据时,突然消费者断开,怎么保障数据的不丢失?
去除消费者中的no_ack,如果生产者正在发送,突然消费者断开,那么第一个消费者没接收完,转到第2个消费者接收,再断开,转到第3个消费者,以此类推。。
消息持久化
生产者如果在发送数据时突然断开,就会导致消息和消息队列丢失,怎么才能保障生产者在断开时,消息和消息队列不丢失呢?
channel.queue_declare(queue=‘hello‘, durable=True) #保证消息队列不丢失
channel.basic_publish(exchange=‘‘, routing_key="hello", body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent 保证消息不丢失 ))
消息的公平分发
如果Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,很可能出现,一个机器配置不高的消费者那里堆积了很多消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,可以在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。
消息持久化+消息的公平分发的完整代码
producer
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) #建立一个socket
channel = connection.channel()#建立一个管道
channel.queue_declare(queue="hello1",
durable=True
)#声明queue
channel.basic_publish(exchange="",
routing_key="hello1",
body="Hello world",
properties=pika.BasicProperties
(delivery_mode = 2,) # make message persistent
)
print("produce send to consume")
connection.close()#关闭
consumer
import pika connetion = pika.BlockingConnection(pika.ConnectionParameters("localhost")) channel = connetion.channel() channel.queue_declare(queue="hello1", durable=True) def callback(ch,method,properties,body): print(ch,method,properties) print(body) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) #消息公平分发主要是添加prefech_count=1 channel.basic_consume( callback, queue="hello1" ) print("waiting for messages. To exit press ctrl+c") channel.start_consuming()
消息发布和订阅
之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了,
An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.
Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息
fanout: 所有bind到此exchange的queue都可以接收消息
direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息
topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息
表达式符号说明:#代表一个或多个字符,*代表任何字符
例:#.a会匹配a.a,aa.a,aaa.a等
*.a会匹配a.a,b.a,c.a等
注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout
headers: 通过headers 来决定把消息发给哪些queue
fanout 广播式接收消息
fanout_produce
import pika connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) channel = connection.channel() channel.exchange_declare(exchange="logs",exchange_type="fanout") channel.basic_publish(exchange="logs", routing_key="", body="hello world!555" ) print("[x] Sent hello world") connection.close()
fanout_consume
import pika connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) channel = connection.channel() channel.exchange_declare(exchange="logs",exchange_type="fanout") result = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除 queuename = result.method.queue channel.queue_bind(exchange="logs",queue=queuename) print(‘ [*] Waiting for logs. To exit press CTRL+C‘) def callback(ch,method,properties,body): print(body) channel.basic_consume( callback, queue=queuename, no_ack=True ) channel.start_consuming()
direct有选择的接收消息
RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。
direct_producer
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host=‘localhost‘)) channel = connection.channel() channel.exchange_declare(exchange=‘direct_logs‘, exchange_type="direct") severity = sys.argv[1] if len(sys.argv) > 1 else ‘info‘ message = ‘ ‘.join(sys.argv[2:]) or ‘Hello World!‘ channel.basic_publish(exchange=‘direct_logs‘, routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()
direct_consumer
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host=‘localhost‘)) channel = connection.channel() channel.exchange_declare(exchange=‘direct_logs‘, exchange_type="direct") result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange=‘direct_logs‘, queue=queue_name, routing_key=severity) print(‘ [*] Waiting for logs. To exit press CTRL+C‘) def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
topic更加细致的接收消息
To receive all the logs run:
python receive_logs_topic.py "#"
To receive all logs from the facility "kern":
python receive_logs_topic.py "kern.*"
Or if you want to hear only about "critical" logs:
python receive_logs_topic.py "*.critical"
You can create multiple bindings:
python receive_logs_topic.py "kern.*" "*.critical"
And to emit a log with a routing key "kern.critical" type:
python emit_log_topic.py "kern.critical" "A critical kernel error"
topic_producer
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host=‘localhost‘)) channel = connection.channel() channel.exchange_declare(exchange=‘topic_logs‘, exchange_type="topic") severity = sys.argv[1] if len(sys.argv) > 1 else ‘info‘ message = ‘ ‘.join(sys.argv[2:]) or ‘Hello World!‘ channel.basic_publish(exchange=‘topic_logs‘, routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()
topic_consumer
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host=‘localhost‘)) channel = connection.channel() channel.exchange_declare(exchange=‘topic_logs‘, exchange_type="topic") result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange=‘topic_logs‘, queue=queue_name, routing_key=severity) print(‘ [*] Waiting for logs. To exit press CTRL+C‘) def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
Remote procedure call (RPC) 远程过程调用
RPC是指远程过程调用,也就是说两台服务器A,B,一个应用部署在A服务器上,想要调用B服务器上应用提供的函数/方法,由于不在一个内存空间,不能直接调用,需要通过网络来表达调用的语义和传达调用的数据。
为什么RPC呢?就是无法在一个进程内,甚至一个计算机内通过本地调用的方式完成的需求,比如不同的系统间的通讯,甚至不同的组织间的通讯。由于计算能力需要横向扩展,需要在多台机器组成的集群上部署应用,
RPC的协议有很多,比如最早的CORBA,Java RMI,Web Service的RPC风格,Hessian,Thrift,甚至Rest API。
RPC的处理流程:
- 当客户端启动时,创建一个匿名的回调队列。
- 客户端为RPC请求设置2个属性:replyTo,设置回调队列名字;correlationId,标记request。
- 请求被发送到rpc_queue队列中。
- RPC服务器端监听rpc_queue队列中的请求,当请求到来时,服务器端会处理并且把带有结果的消息发送给客户端。接收的队列就是replyTo设定的回调队列。
- 客户端监听回调队列,当有消息时,检查correlationId属性,如果与request中匹配,那就是结果了。
rpc_server
import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host=‘localhost‘)) channel = connection.channel() channel.queue_declare(queue=‘rpc_queue‘) def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n - 1) + fib(n - 2) def on_request(ch, method, props, body): n = int(body) print(" [.] fib(%s)" % n) response = fib(n) ch.basic_publish(exchange=‘‘, routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id= props.correlation_id), body=str(response)) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(on_request, queue=‘rpc_queue‘) print(" [x] Awaiting RPC requests") channel.start_consuming()
rpc_client
import pika import uuid class FibonacciRpcClient(object): def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters( host=‘localhost‘)) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: self.response = body def call(self, n): self.response = None self.corr_id = str(uuid.uuid4()) self.channel.basic_publish(exchange=‘‘, routing_key=‘rpc_queue‘, properties=pika.BasicProperties( reply_to=self.callback_queue, correlation_id=self.corr_id, ), body=str(n)) while self.response is None: self.connection.process_data_events() return int(self.response) fibonacci_rpc = FibonacciRpcClient() print(" [x] Requesting fib(10)") response = fibonacci_rpc.call(10) print(" [.] Got %r" % response)