标签:tag self read parameter 自己 callback lis 不同的 无法
消息服务器rabbmit
RabbitMQ 消息队列
python里有两个Q, threading queue、不同线程间数据交互
进程Queue: 不同进程间交互这个说法是错误的。 这个是用于父进程与子进程间交互、或者同属于同一父进程下多个子进程进行交互。
两个python程序的进程间是无法通信的。
各个独立进程间通信:
QQ 要发送消息给 world。1、通过socket,这个需要自己去写很多东西(沾包、收到的是什么需要返回的又是什么……) 。2、消息队列,
那两个不同的程序要通信、两台机器要通信?
消息队列: 可以跟各种进程通信、且不用再去写什么沾包啊之类的
zeromq\activemq 这些都是消息队列
很多事件需要自己去体验一下。
Rabbitmq: erlang开发的
发消息:轮询
消息发出去了、需要确认吗? 还是发出去就完了,管你的。
大部份情况都需要回发一下处理完的消息。。万一处理过程中、挂了怎么办?发送端没收到回复是吧
挂了,socket断了,发送端自然知道、这消息立马就重新发给另一个在线的接收端。
no_ack=False# 需要收到了回应才放弃这条消息
回应保证的是:客户端、接收消息端挂掉 channel.basic_ack()
持久化: 保证的是,服务器挂掉之后
Rabbitmq安装:
需要安装好环境:erlang
brew install rabbitmq
看例子:
import time
import pika
import uuid
class MyRabbmit(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            "localhost"
        ))
        self.channel = self.connection.channel()
        # self.channel.queue_declare(queue="rpc_queue")
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue
        self.channel.basic_consume(self.on_response,
                                   queue=self.callback_queue,
                                   # no_ack=True
                                   )
    def on_response(self, ch, method, property, body):
        print(property.reply_to)
        print(body)
        self.response = body.decode()
    def call(self, cmd):
        self.response = None
        self.uuid = str(uuid.uuid4())
        print(self.uuid)
        self.channel.basic_publish(exchange="",
                                   routing_key="rpc_queue",
                                   body = cmd,
                                   properties = pika.BasicProperties(
                                       reply_to=self.callback_queue,
                                       correlation_id=self.uuid
                                   )
                                   )
        while self.response is None:
            time.sleep(0.5)
            self.connection.process_data_events()
            # print("wait...")
        return self.response
my_rpc = MyRabbmit()
cmd = input(">>").strip()
if cmd:
    result = my_rpc.call(cmd)
    print("result: ", result)
import os
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
    "localhost"
))
channel = connection.channel()
channel.queue_declare("rpc_queue")
def func(cmd):
    result = os.popen(cmd).read()
    print(result)
    return result
def on_response(ch, method, property, body):
    print(body)
    print(property.reply_to, property.correlation_id)
    body = func(body.decode())
    channel.basic_publish(exchange="",
                          routing_key=property.reply_to,
                          body=body
                          )
    ch.basic_ack(delivery_tag=method.delivery_tag) # 回复一下结果
channel.basic_consume(on_response,
                      queue="rpc_queue",
                      no_ack=False,
                      )
print("等待收消息...")
channel.start_consuming()
标签:tag self read parameter 自己 callback lis 不同的 无法
原文地址:http://www.cnblogs.com/otcsnow/p/6568876.html