标签:size 文章 headers mono als outer ble interrupt path
class kombu.Producer(channel, exchange=None, routing_key=None,
serializer=None, auto_declare=None,
compression=None, on_return=None)
# 发布消息
.publish(body, routing_key=None, delivery_mode=None, mandatory=False,
immediate=False, priority=0, content_type=None, content_encoding=None,
serializer=None, headers=None, compression=None, exchange=None,
retry=False, retry_policy=None, declare=None, expiration=None, **properties)
class kombu.Consumer(channel, queues=None, no_ack=None,
auto_declare=None, callbacks=None, on_decode_error=None,
on_message=None, accept=None, prefetch_count=None, tag_prefix=None)
# 消费
.consume(no_ack=None)
from kombu import Exchange, Queue, Connection, Consumer, Producer
# Queue 'tasks' bound to a direct exchange of the same name, using 'tasks'
# as the routing key.
task_queue = Queue(
    'tasks',
    exchange=Exchange('tasks', type='direct'),
    routing_key='tasks',
)
# Producer: publish a single message to the task queue.
with Connection('amqp://guest@localhost:5672//') as conn:
    with conn.channel() as channel:
        producer = Producer(channel)
        producer.publish(
            {'hello': 'world'},
            retry=True,
            exchange=task_queue.exchange,
            routing_key=task_queue.routing_key,
            # Have the queue declared as part of publishing, so the target
            # exists on the broker before the message is sent.
            declare=[task_queue],
        )
def get_message(body, message):
    """Consumer callback: print the decoded body of a received message.

    Args:
        body: Decoded message payload.
        message: The message object handed to the callback; unused here.
    """
    print(body)
    # No message.ack() here: the consumer in this example is started with
    # consume(no_ack=True). Acknowledge explicitly if that ever changes.
# Consumer: register the callback, then pump events so it actually runs.
import socket

with Connection('amqp://guest@localhost:5672//') as conn:
    with conn.channel() as channel:
        consumer = Consumer(
            channel,
            queues=task_queue,
            callbacks=[get_message],
            prefetch_count=10,
        )
        consumer.consume(no_ack=True)
        # consume() only registers the consumer with the broker; callbacks
        # fire while drain_events() is running. Without this loop the
        # connection would close before any message is handled. Stop once
        # no event has arrived for one second.
        try:
            while True:
                conn.drain_events(timeout=1)
        except socket.timeout:
            pass
我们先创建相关的 exchange 和 queue,queues.py 文件如下:
from kombu import Exchange, Queue
# One direct exchange shared by three priority queues; each queue uses its
# own name as the routing key.
task_exchange = Exchange('tasks', type='direct')
task_queues = [
    Queue('high', exchange=task_exchange, routing_key='high'),
    Queue('middle', exchange=task_exchange, routing_key='middle'),
    Queue('low', exchange=task_exchange, routing_key='low'),
]
from kombu.mixins import ConsumerMixin
from queues import task_queues
# Consumer
class Worker(ConsumerMixin):
    """Worker that consumes task messages from ``task_queues`` and runs them.

    Each message body is expected to be a mapping with the keys ``'fun'``
    (a callable), ``'args'`` (positional arguments) and ``'kwargs'``
    (keyword arguments).
    """

    def __init__(self, connection):
        """Store the broker connection the mixin will consume from."""
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        """Build the list of consumers for this worker.

        Args:
            Consumer: Consumer factory supplied by the caller. It is invoked
                here without a channel, so it is presumably pre-bound to
                one by the mixin (confirm against the kombu docs).
            channel: Channel supplied by the caller; not used directly.

        Returns:
            A one-element list holding the configured consumer.
        """
        # NOTE(security): accepting 'pickle' lets any publisher that can
        # reach these queues execute arbitrary code in this process. Only
        # keep it enabled on a trusted broker.
        consumer = Consumer(
            queues=task_queues,
            callbacks=[self.process_task],
            accept=['text/plain', 'json', 'pickle'],
        )
        consumer.qos(prefetch_count=10)  # fetch at most 10 tasks at a time
        return [consumer]

    def process_task(self, body, message):
        """Run the task described by ``body`` and settle ``message``.

        The message is acknowledged on success and requeued if the task
        raises. A payload that lacks the expected keys (or is not a
        mapping) is rejected instead of crashing the worker.
        """
        try:
            fun = body['fun']
            args = body['args']
            kwargs = body['kwargs']
        except (KeyError, TypeError) as exc:
            # Requeueing a malformed payload would never succeed; drop it.
            print('malformed task payload: {!r}'.format(exc))
            message.reject()
            return

        try:
            fun(*args, **kwargs)
        except Exception as exc:
            print(exc)
            # NOTE: a task that always fails will be redelivered forever.
            message.requeue()
        else:
            message.ack()
if __name__ == '__main__':
    from kombu import Connection

    # Run the worker until it is interrupted from the keyboard.
    with Connection('amqp://guest@localhost:5672//') as conn:
        try:
            worker = Worker(conn)
            worker.run()
        except KeyboardInterrupt:
            # Ctrl-C is the expected way to stop the worker.
            print('bye bye')
def hello_task(who='world'):
    """Example task: wait one second, then print a greeting.

    Args:
        who: Name inserted into the greeting.
    """
    import time

    print('wait one second')
    time.sleep(1)
    print('Hello {}'.format(who))
from kombu.pools import producers
from queues import task_exchange
# Maps a priority name to the routing key used when publishing.
routing_keys = {
    'high': 'high',
    'middle': 'middle',
    'low': 'low',
}
# Serialize the task and publish it to the queue.
def send_as_task(connection, fun, key='middle', args=(), kwargs=None):
    """Publish a call to ``fun`` as a task message.

    Args:
        connection: Broker connection used to pick a producer from the pool.
        fun: Callable to run on the worker. It is sent with the pickle
            serializer, which stores functions by reference, so the worker
            must be able to import it.
        key: Priority name looked up in ``routing_keys``.
        args: Positional arguments for ``fun``.
        kwargs: Keyword arguments for ``fun``; defaults to no keyword
            arguments.

    Raises:
        KeyError: If ``key`` is not present in ``routing_keys``.
    """
    # A fresh dict per call: a ``{}`` default would be shared across calls.
    if kwargs is None:
        kwargs = {}
    payload = {'fun': fun, 'args': args, 'kwargs': kwargs}
    routing_key = routing_keys[key]
    with producers[connection].acquire(block=True) as producer:
        # NOTE(security): pickle payloads must only be consumed by workers
        # that trust every publisher on this broker.
        producer.publish(
            payload,
            serializer='pickle',
            exchange=task_exchange,
            routing_key=routing_key,
            declare=[task_exchange],
        )
if __name__ == '__main__':
    from kombu import Connection
    from tasks import hello_task

    # Send one hello_task call with the default ('middle') priority.
    with Connection('amqp://guest@localhost:5672//') as conn:
        send_as_task(conn, fun=hello_task, args=('wang',))
标签:size 文章 headers mono als outer ble interrupt path
原文地址:http://www.cnblogs.com/George1994/p/7377026.html