标签:
RabbitMQ使用Work Queues的主要目的是为了避免资源使用密集的任务,它不同于定时任务处理的方式,而是把任务封装为消息添加到队列中。而消息队列正是共享于多个工作者中使用,它们可以随意pop出数据进行处理。
为了保证`rabbitmq`意外重启等原因造成的消息丢失,通过设置消息的durable来实现数据的持久化,但是需要生产者和消费者同时设置持久化才能生效。
需要注意的是,`rabbitmq`并不允许更改已经创建的消息队列的属性,假如之前已经创建过非持久化的hello消息队列,那么会返回一个错误信息。
设置消息队列的可持久化属性(第二个参数):
channel.queue_declare(queue=‘hello‘, durable=True)
在消息发送时,需要指定`delivery_mode`来实现消息持久化:
channel.basic_publish(exchange=‘‘, routing_key="task_queue", body=message, properties=pika.BasicProperties(delivery_mode = 2, # make message persistent))
`rabbitmq`实现了消息均分的功能,通过设置`basic.qos`方法的`prefetch_count`来实现。它会告诉`rabbitmq`的生产者不要给一个消费者分配过多的任务,也就是说不要在消费者处理完成已经接收到的任务之前分配新的任务。
channel.basic_qos(prefetch_count=1)
其中prefetch_count为可以接受处理的任务个数,如果未达到上限rabbitmq会继续向消费者推送任务。
#!/usr/bin/env python # coding=utf-8 import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘)) channel = connection.channel() channel.queue_declare(queue=‘task_queue‘, durable=True) for i in range(100): message = str(i) + ‘ Hello World!‘ channel.basic_publish(exchange=‘‘, routing_key=‘task_queue‘, body=message, properties=pika.BasicProperties(delivery_mode = 2, # make message persistent)) print " [x] Sent %r" % (message,) time.sleep(1) connection.close()
#!/usr/bin/env python # coding=utf-8 import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘)) channel = connection.channel() channel.queue_declare(queue=‘task_queue‘, durable=True) print ‘ [*] Waiting for messages. To exit press CTRL+C‘ def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep(2) print " [x] Done" ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue=‘task_queue‘) channel.start_consuming()
标签:
原文地址:http://www.cnblogs.com/coder2012/p/4339565.html