标签:tag self read parameter 自己 callback lis 不同的 无法
消息服务器rabbmit
RabbitMQ 消息队列
python里有两个Q, threading queue、不同线程间数据交互
进程Queue: 不同进程间交互这个说法是错误的。 这个是用于父进程与子进程间交互、或者同属于同一父进程下多个子进程进行交互。
两个python程序的进程间是无法通信的。
各个独立进程间通信:
QQ 要发送消息给 world。1、通过socket,这个需要自己去写很多东西(沾包、收到的是什么需要返回的又是什么……) 。2、消息队列,
那两个不同的程序要通信、两台机器要通信?
消息队列: 可以跟各种进程通信、且不用再去写什么沾包啊之类的
zeromq\activemq 这些都是消息队列
很多事件需要自己去体验一下。
Rabbitmq: erlang开发的
发消息:轮询
消息发出去了、需要确认吗? 还是发出去就完了,管你的。
大部份情况都需要回发一下处理完的消息。。万一处理过程中、挂了怎么办?发送端没收到回复是吧
挂了,socket断了,发送端自然知道、这消息立马就重新发给另一个在线的接收端。
no_ack=False# 需要收到了回应才放弃这条消息
回应保证的是:客户端、接收消息端挂掉 channel.basic_ack()
持久化: 保证的是,服务器挂掉之后
Rabbitmq安装:
需要安装好环境:erlang
brew install rabbitmq
看例子:
import time import pika import uuid class MyRabbmit(object): def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters( "localhost" )) self.channel = self.connection.channel() # self.channel.queue_declare(queue="rpc_queue") result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response, queue=self.callback_queue, # no_ack=True ) def on_response(self, ch, method, property, body): print(property.reply_to) print(body) self.response = body.decode() def call(self, cmd): self.response = None self.uuid = str(uuid.uuid4()) print(self.uuid) self.channel.basic_publish(exchange="", routing_key="rpc_queue", body = cmd, properties = pika.BasicProperties( reply_to=self.callback_queue, correlation_id=self.uuid ) ) while self.response is None: time.sleep(0.5) self.connection.process_data_events() # print("wait...") return self.response my_rpc = MyRabbmit() cmd = input(">>").strip() if cmd: result = my_rpc.call(cmd) print("result: ", result)
import os import pika connection = pika.BlockingConnection(pika.ConnectionParameters( "localhost" )) channel = connection.channel() channel.queue_declare("rpc_queue") def func(cmd): result = os.popen(cmd).read() print(result) return result def on_response(ch, method, property, body): print(body) print(property.reply_to, property.correlation_id) body = func(body.decode()) channel.basic_publish(exchange="", routing_key=property.reply_to, body=body ) ch.basic_ack(delivery_tag=method.delivery_tag) # 回复一下结果 channel.basic_consume(on_response, queue="rpc_queue", no_ack=False, ) print("等待收消息...") channel.start_consuming()
标签:tag self read parameter 自己 callback lis 不同的 无法
原文地址:http://www.cnblogs.com/otcsnow/p/6568876.html