标签:决定 roc 微软 hang 示例 任务分发 适合 through 选择
1 import pika 2 3 connection = pika.BlockingConnection( 4 pika.ConnectionParameters(‘localhost‘) 5 ) 6 # 声明一个管道 7 channel = connection.channel() 8 9 # 声明queue 10 channel.queue_declare(queue=‘hello‘) 11 12 # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. 13 channel.basic_publish( 14 exchange=‘‘, # 先不管 15 routing_key=‘hello‘, # queue名字 16 body=‘Hello World!‘ # 信息 17 ) 18 print(" [x] Sent ‘Hello World!‘") 19 connection.close()
receive端:
1 import pika 2 3 connection = pika.BlockingConnection( 4 pika.ConnectionParameters(‘localhost‘) 5 ) 6 7 channel = connection.channel() 8 9 # You may ask why we declare the queue again ? we have already declared it in our previous code. 10 # We could avoid that if we were sure that the queue already exists. For example if send.py program 11 # was run before. But we‘re not yet sure which program to run first. In such cases it‘s a good 12 # practice to repeat declaring the queue in both programs. 13 # 这句话的意思就是你如果确定这个队列存在,你是可以不需要再次声明这个队列的,但是如果队列还不存在的话,比如你先运行receive端,这时 14 # 就需要先声明这个队列,然后才能接收消息 15 channel.queue_declare(queue=‘hello‘) 16 17 def callback(ch, method, properties, body): 18 print(" [x] Received %r" % body) 19 20 channel.basic_consume( 21 callback, 22 queue=‘hello‘, 23 no_ack=True 24 ) 25 26 print(‘ [*] Waiting for messages. To exit press CTRL+C‘) 27 channel.start_consuming()
1 import pika 2 3 connection = pika.BlockingConnection( 4 pika.ConnectionParameters(‘localhost‘) 5 ) 6 # 声明一个管道 7 channel = connection.channel() 8 9 # 声明queue 10 channel.queue_declare(queue=‘task‘) 11 12 # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. 13 channel.basic_publish( 14 exchange=‘‘, # 先不管 15 routing_key=‘task‘, # queue名字 16 body=‘Hello World!‘ # 信息 17 ) 18 print(" [x] Sent ‘Hello World!‘") 19 connection.close()
receive端:
1 import pika 2 import time 3 4 connection = pika.BlockingConnection( 5 pika.ConnectionParameters(‘localhost‘) 6 ) 7 8 channel = connection.channel() 9 10 # You may ask why we declare the queue again ? we have already declared it in our previous code. 11 # We could avoid that if we were sure that the queue already exists. For example if send.py program 12 # was run before. But we‘re not yet sure which program to run first. In such cases it‘s a good 13 # practice to repeat declaring the queue in both programs. 14 # 这句话的意思就是你如果确定这个队列存在,你是可以不需要再次声明这个队列的,但是如果队列还不存在的话,比如你先运行receive端,这时 15 # 就需要先声明这个队列,然后才能接收消息 16 channel.queue_declare(queue=‘task‘) 17 18 def callback(ch, method, properties, body): 19 print(" [x] Received %r" % body) 20 time.sleep(10) 21 ch.basic_ack(delivery_tag=method.delivery_tag) # 这句代码即为确认任务完成 22 23 channel.basic_consume( 24 callback, 25 queue=‘task‘, 26 # 如果no_ack设置为True的话消费者处理完事情是不会给rabbitmq发送确认消息的,也就是队列不管任务处理没处理完,都会抹去这条任务; 27 # 默认为False,消费者处理完事情会发送一个确认信息,(确认信息是:ch.basic_ack(delivery_tag=method.delivery_tag)) 28 # 如果消费者宕机了,任务没处理完,这时rabbitmq检测到消费者的socket断了,则会将此次任务转交给下一个消费者接着处理, 29 # 直到任务处理完毕,消费者给rabbitmq发送一个确认消息,此任务才会从队列中抹去 30 # no_ack=True 31 ) 32 33 print(‘ [*] Waiting for messages. To exit press CTRL+C‘) 34 channel.start_consuming()
注:通过ch.basic_ack(delivery_tag=method.delivery_tag)来确认任务完成,只有任务完成了此任务才会从RabbitMQ队列中删除,也就是说处理这个任务的机器宕机了,该任务并不会消失,而是会被RabbitMQ分发到下一个机器上继续运行,直到任务被处理完成才会抹去。那么RabbitMQ是怎么知道处理该任务的机器宕机了呢?socket断了啊,机器宕机的话,socket会断开,这时RabbitMQ就回收此任务分发到下一个机器。
1 import pika 2 3 connection = pika.BlockingConnection( 4 pika.ConnectionParameters(‘localhost‘) 5 ) 6 # 声明一个管道 7 channel = connection.channel() 8 9 # 声明queue 10 channel.queue_declare(queue=‘forever1‘, durable=True) # durable=True是队列持久化 11 12 # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. 13 channel.basic_publish( 14 exchange=‘‘, # 先不管 15 routing_key=‘forever1‘, # queue名字 16 body=‘Hello World!‘, # 信息 17 properties=pika.BasicProperties( 18 delivery_mode=2, # 消息持久化 19 ) 20 ) 21 print(" [x] Sent ‘Hello World!‘") 22 connection.close()
receive端:
1 import pika 2 import time 3 4 connection = pika.BlockingConnection( 5 pika.ConnectionParameters(‘localhost‘) 6 ) 7 8 channel = connection.channel() 9 10 # You may ask why we declare the queue again ? we have already declared it in our previous code. 11 # We could avoid that if we were sure that the queue already exists. For example if send.py program 12 # was run before. But we‘re not yet sure which program to run first. In such cases it‘s a good 13 # practice to repeat declaring the queue in both programs. 14 # 这句话的意思就是你如果确定这个队列存在,你是可以不需要再次声明这个队列的,但是如果队列还不存在的话,比如你先运行receive端,这时 15 # 就需要先声明这个队列,然后才能接收消息 16 channel.queue_declare(queue=‘forever1‘, durable=True) 17 18 def callback(ch, method, properties, body): 19 print(" [x] Received %r" % body) 20 time.sleep(10) 21 ch.basic_ack(delivery_tag=method.delivery_tag) # 这句代码即为确认任务完成 22 23 channel.basic_consume( 24 callback, 25 queue=‘forever1‘, 26 # 如果no_ack设置为True的话消费者处理完事情是不会给rabbitmq发送确认消息的,也就是队列不管任务处理没处理完,都会抹去这条任务; 27 # 默认为False,消费者处理完事情会发送一个确认信息,(确认信息是:ch.basic_ack(delivery_tag=method.delivery_tag)) 28 # 如果消费者宕机了,任务没处理完,这时rabbitmq检测到消费者的socket断了,则会将此次任务转交给下一个消费者接着处理, 29 # 直到任务处理完毕,消费者给rabbitmq发送一个确认消息,此任务才会从队列中抹去 30 # no_ack=True 31 ) 32 33 print(‘ [*] Waiting for messages. To exit press CTRL+C‘) 34 channel.start_consuming()
核心代码:
channel.basic_qos(prefetch_count=1)
send端:(其实没什么变化)
1 import pika 2 3 connection = pika.BlockingConnection( 4 pika.ConnectionParameters(‘localhost‘) 5 ) 6 # 声明一个管道 7 channel = connection.channel() 8 9 # 声明queue 10 channel.queue_declare(queue=‘forever1‘, durable=True) # durable=True是队列持久化 11 12 # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. 13 channel.basic_publish( 14 exchange=‘‘, # 先不管 15 routing_key=‘forever1‘, # queue名字 16 body=‘Hello World!‘, # 信息 17 properties=pika.BasicProperties( 18 delivery_mode=2, # 消息持久化 19 ) 20 ) 21 print(" [x] Sent ‘Hello World!‘") 22 connection.close()
receive端:
1 import pika 2 import time 3 4 connection = pika.BlockingConnection( 5 pika.ConnectionParameters(‘localhost‘) 6 ) 7 8 channel = connection.channel() 9 10 # You may ask why we declare the queue again ? we have already declared it in our previous code. 11 # We could avoid that if we were sure that the queue already exists. For example if send.py program 12 # was run before. But we‘re not yet sure which program to run first. In such cases it‘s a good 13 # practice to repeat declaring the queue in both programs. 14 # 这句话的意思就是你如果确定这个队列存在,你是可以不需要再次声明这个队列的,但是如果队列还不存在的话,比如你先运行receive端,这时 15 # 就需要先声明这个队列,然后才能接收消息 16 channel.queue_declare(queue=‘forever1‘, durable=True) 17 18 def callback(ch, method, properties, body): 19 print(" [x] Received %r" % body) 20 time.sleep(30) 21 ch.basic_ack(delivery_tag=method.delivery_tag) # 这句代码即为确认任务完成 22 23 channel.basic_qos(prefetch_count=1) # 只处理一个任务 24 channel.basic_consume( 25 callback, 26 queue=‘forever1‘, 27 # 如果no_ack设置为True的话消费者处理完事情是不会给rabbitmq发送确认消息的,也就是队列不管任务处理没处理完,都会抹去这条任务; 28 # 默认为False,消费者处理完事情会发送一个确认信息,(确认信息是:ch.basic_ack(delivery_tag=method.delivery_tag)) 29 # 如果消费者宕机了,任务没处理完,这时rabbitmq检测到消费者的socket断了,则会将此次任务转交给下一个消费者接着处理, 30 # 直到任务处理完毕,消费者给rabbitmq发送一个确认消息,此任务才会从队列中抹去 31 # no_ack=True 32 ) 33 34 print(‘ [*] Waiting for messages. To exit press CTRL+C‘) 35 channel.start_consuming()
注:即在生产中可以给高配置机器设置多接收几个任务,而低配置机器则设置少接收几个任务,避免造成任务堆积。
send端:
1 import pika 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters( 4 host=‘localhost‘)) 5 channel = connection.channel() 6 7 # 绑定exchange,声明exchange类型 8 channel.exchange_declare( 9 exchange=‘logs‘, 10 type=‘fanout‘ 11 ) 12 13 message = "info: Hello World!2" 14 15 channel.basic_publish( 16 exchange=‘logs‘, # exchange名字 17 routing_key=‘‘, # 必须写,且为空,不然会报错 18 body=message # 消息内容 19 ) 20 print(" [x] Sent %r" % message) 21 22 connection.close()
receive端:
1 import pika 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters( 4 host=‘localhost‘)) 5 channel = connection.channel() 6 7 channel.exchange_declare( 8 exchange=‘logs‘, 9 type=‘fanout‘ 10 ) 11 # exclusive排他,唯一的,不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除 12 result = channel.queue_declare(exclusive=True) 13 queue_name = result.method.queue 14 print("random queuename", queue_name) 15 16 channel.queue_bind( 17 exchange=‘logs‘, 18 queue=queue_name 19 ) 20 21 print(‘ [*] Waiting for logs. To exit press CTRL+C‘) 22 23 def callback(ch, method, properties, body): 24 print(" [x] %r" % body) 25 26 channel.basic_consume( 27 callback, 28 queue=queue_name, 29 no_ack=True 30 ) 31 32 channel.start_consuming()
注:消息发布和订阅均是实时的,就像广播一样。
send端:
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘)) 5 channel = connection.channel() 6 7 channel.exchange_declare(exchange=‘direct_logs‘, type=‘direct‘) 8 9 severity = sys.argv[1] if len(sys.argv) > 1 else ‘info‘ 10 message = ‘ ‘.join(sys.argv[2:]) or ‘Hello World!‘ 11 12 channel.basic_publish(exchange=‘direct_logs‘, routing_key=severity, body=message) 13 print(" [x] Sent %r:%r" % (severity, message)) 14 connection.close()
receive端:
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘)) 5 channel = connection.channel() 6 7 channel.exchange_declare(exchange=‘direct_logs‘, type=‘direct‘) 8 9 result = channel.queue_declare(exclusive=True) 10 queue_name = result.method.queue 11 12 severities = sys.argv[1:] 13 if not severities: 14 sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) 15 sys.exit(1) 16 print(severities) 17 for severity in severities: 18 channel.queue_bind(exchange=‘direct_logs‘, queue=queue_name, routing_key=severity) 19 20 print(‘ [*] Waiting for logs. To exit press CTRL+C‘) 21 22 def callback(ch, method, properties, body): 23 print(" [x] %r:%r" % (method.routing_key, body)) 24 25 channel.basic_consume(callback, queue=queue_name, no_ack=True) 26 27 channel.start_consuming()
5.3RabbitMQ之topic(更细致的消息过滤)
send端:
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘)) 5 channel = connection.channel() 6 7 channel.exchange_declare(exchange=‘topic_logs‘, type=‘topic‘) 8 9 routing_key = sys.argv[1] if len(sys.argv) > 1 else ‘anonymous.info‘ 10 11 message = ‘ ‘.join(sys.argv[2:]) or ‘Hello World!‘ 12 channel.basic_publish(exchange=‘topic_logs‘, routing_key=routing_key, body=message) 13 print(" [x] Sent %r:%r" % (routing_key, message)) 14 connection.close()
receice端:
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘)) 5 channel = connection.channel() 6 7 channel.exchange_declare(exchange=‘topic_logs‘, type=‘topic‘) 8 9 result = channel.queue_declare(exclusive=True) 10 queue_name = result.method.queue 11 12 binding_keys = sys.argv[1:] 13 if not binding_keys: 14 sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) 15 sys.exit(1) 16 17 for binding_key in binding_keys: 18 channel.queue_bind(exchange=‘topic_logs‘, queue=queue_name, routing_key=binding_key) 19 20 print(‘ [*] Waiting for logs. To exit press CTRL+C‘) 21 22 def callback(ch, method, properties, body): 23 print(" [x] %r:%r" % (method.routing_key, body)) 24 25 channel.basic_consume(callback, queue=queue_name, no_ack=True) 26 27 channel.start_consuming()
6.RabbitMQ之RPC(Remote procedure call)
server端:
1 import pika 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘)) 4 5 channel = connection.channel() 6 channel.queue_declare(queue=‘rpc_queue‘) 7 8 def fib(n): 9 if n == 0: 10 return 0 11 elif n == 1: 12 return 1 13 else: 14 return fib(n - 1) + fib(n - 2) 15 16 def on_request(ch, method, props, body): 17 n = int(body) 18 19 print(" [.] fib(%s)" % n) 20 response = fib(n) 21 22 ch.basic_publish(exchange=‘‘, 23 routing_key=props.reply_to, 24 properties=pika.BasicProperties(correlation_id=props.correlation_id), 25 body=str(response)) 26 ch.basic_ack(delivery_tag=method.delivery_tag) 27 28 channel.basic_consume(on_request, queue=‘rpc_queue‘) 29 30 print(" [x] Awaiting RPC requests") 31 channel.start_consuming()
client端:
1 import pika 2 import uuid 3 4 class FibonacciRpcClient(object): 5 def __init__(self): 6 self.connection = pika.BlockingConnection(pika.ConnectionParameters( 7 host=‘localhost‘)) 8 self.channel = self.connection.channel() 9 result = self.channel.queue_declare(exclusive=True) 10 self.callback_queue = result.method.queue 11 12 self.channel.basic_consume(self.on_response, # 只要一收到消息就调用on_response 13 no_ack=True, 14 queue=self.callback_queue) 15 16 def on_response(self, ch, method, props, body): 17 if self.corr_id == props.correlation_id: 18 self.response = body 19 20 def call(self, n): 21 self.response = None 22 self.corr_id = str(uuid.uuid4()) 23 24 self.channel.basic_publish(exchange=‘‘, 25 routing_key=‘rpc_queue‘, 26 properties=pika.BasicProperties( 27 reply_to=self.callback_queue, 28 correlation_id=self.corr_id, 29 ), 30 body=str(n)) 31 32 while self.response is None: 33 self.connection.process_data_events() # 非阻塞版的start_consuming() 34 return int(self.response) 35 36 fibonacci_rpc = FibonacciRpcClient() 37 38 print(" [x] Requesting fib(30)") 39 response = fibonacci_rpc.call(30) 40 print(" [.] Got %r" % response) 41 42 print(" [x] Requesting fib(20)") 43 response = fibonacci_rpc.call(20) 44 print(" [.] Got %r" % response)
标签:决定 roc 微软 hang 示例 任务分发 适合 through 选择
原文地址:http://www.cnblogs.com/breakering/p/7041215.html