标签:
MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。
MQ是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息。MQ和JMS类似,但不同的是JMS是SUN JAVA消息中间件服务的一个标准和API定义,而MQ则是遵循了AMQP协议的具体实现和产品。
send:
#!/usr/bin/env python # -*- coding:utf-8 -*- # Author:Jerry Shi import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host=‘localhost‘)) channel = connection.channel() channel.queue_declare(queue=‘hello‘) channel.basic_publish(exchange=‘‘, routing_key=‘hello‘, body=‘Hello World!‘) print("已发送 Hello World!") connection.close()
receive:
#!/usr/bin/env python # -*- coding:utf-8 -*- # Author:Jerry Shi import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host=‘localhost‘)) channel = connection.channel() channel.queue_declare(queue=‘hello‘) print ‘ [*] Waiting for messages. To exit press CTRL+C‘ def callback(ch, method, properties, body): print(" 已收到 %r" %body) channel.basic_consume(callback, queue=‘hello‘, no_ack=True) channel.start_consuming()
send:
#!/usr/bin/env python # -*- coding:utf-8 -*- # Author:Jerry Shi import pika connection = pika.BlockingConnection(pika.ConnectionParameters( ‘localhost‘)) ##相当建立一个socket channel = connection.channel() ##声明一个管道 channel.queue_declare(queue=‘hello‘) #声明queue channel.basic_publish(exchange=‘‘, routing_key=‘hello‘, #发给一个queue名字 body=‘Hello World!‘) #发送的内容 print(" 已发送 ‘Hello World!‘") #发送完成后打印消息出来 connection.close()
receive:
#!/usr/bin/env python # -*- coding:utf-8 -*- # Author:Jerry Shi import pika connection = pika.BlockingConnection(pika.ConnectionParameters( ‘localhost‘)) channel = connection.channel() channel.queue_declare(queue=‘hello‘) #声明从哪个队列收消息 def callback(ch, method, properties, body): print("-->", ch, method, properties) print(" [x] Received %r" % body) #开始消费消息 channel.basic_consume(callback, #如果收到消息就调用callback函数来处理消息 queue=‘hello‘, no_ack=True) print(‘ [*] Waiting for messages. To exit press CTRL+C‘) channel.start_consuming() #运行后一直等待收取消息
rpc_client:
#!/usr/bin/env python # -*- coding:utf-8 -*- # Author:Jerry Shi import pika,uuid,time class RpcClient(object): def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters( host=‘localhost‘)) #建立远程连接 self.channel = self.connection.channel() #建立声明隧道 result = self.channel.queue_declare(exclusive=True) #生成一个随机queue self.callback_queue = result.method.queue #随机queue self.channel.basic_consume(self.on_response, #只要收到消息就调用op_response no_ack=True, queue=self.callback_queue) #声明要收callback_queue def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: #判断是否是上次我发送出去的数据 self.response = body #body是队列的返回 def call(self, n): self.response = None #开始设置response为none self.corr_id = str(uuid.uuid4()) #生成一个随机的字符串 self.channel.basic_publish(exchange=‘‘, routing_key=‘rpc_queue‘, #发消息到rpc_queue里 properties=pika.BasicProperties( #消息持久化pika.BasicProperties reply_to=self.callback_queue, #生成完随机queue后发给我,我告诉服务端返回信息发送的这个queue correlation_id=self.corr_id, #发送uuid字符串给服务端 ), body=str(n)) #信息必须是字符串 while self.response is None: #respnse如果是空就执行本循环 self.connection.process_data_events() #非阻塞版的start_consuming(),有消息返回,没有消息也返回。 print("") time.sleep(0.5) return int(self.response) rpc = RpcClient() #实例化 print(" 您要做的是:") response = rpc.call(6) #调用call方法,传参一个30 print(" 您的结果是: %r" % response)
rpc_server:
#!/usr/bin/env python # -*- coding:utf-8 -*- # Author:Jerry Shi import pika,time connection = pika.BlockingConnection(pika.ConnectionParameters( host=‘localhost‘)) channel = connection.channel() channel.queue_declare(queue=‘rpc_queue‘) #声明一个rpe_queue def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n - 1) + fib(n - 2) def on_request(ch, method, props, body): n = int(body) #收到信息 print(" [.] fib(%s)" % n) response = fib(n) ch.basic_publish(exchange=‘‘, routing_key=props.reply_to, #客户单随机生成的那个随机queue properties=pika.BasicProperties(correlation_id= props.correlation_id), #收到客户端生成的uuid后,自己再生成一个uuid并返回给客户端 body=str(response)) #发送消息给客户端 ch.basic_ack(delivery_tag=method.delivery_tag) #确认消息被消费 channel.basic_qos(prefetch_count=1) channel.basic_consume(on_request, queue=‘rpc_queue‘) print(" [x] Awaiting RPC requests") channel.start_consuming()
标签:
原文地址:http://www.cnblogs.com/shiyongzhi/p/5983260.html