码迷,mamicode.com
首页 > 其他好文 > 详细

kombu源码Producer收获一

时间:2018-08-17 19:09:36      阅读:225      评论:0      收藏:0      [点我收藏+]

标签:code   att   add   connect   iat   close   sync   consumer   pattern   

celery内置了kombu库,看了一下kombu的源码,从官网最简单的一个例子来分析---消息发布,源码如下:

from __future__ import absolute_import, unicode_literals
import datetime

from kombu import Connection
with Connection(redis://localhost:6379/0) as conn:
    simple_queue = conn.SimpleQueue(simple_queue)
    message = helloworld, sent at {0}.format(datetime.datetime.today())
    simple_queue.put(message)
    print(Sent: {0}.format(message))
    simple_queue.close()

运行之前开启redis服务。这真是简单到不能到简单的例子-.-

一步步分析画出如下类图:

技术分享图片

大概十七八个类。流程省略几百万个字。

 

记一下关键步骤:

1、创建生产者 messaging.Producer 时不会操作redis。

2、创建消息者 messaging.Consumer 时会创建exchange,及其对应的 routing_key、patter、queue(队列名称),具体格式像这样:

 _kombu.binding.exchange_name => (routing_key\x06\x16pattern\x06\x16queue_name)
这是一个sadd操作,key是 _kombu.binding.exchange_name,前面是固定的,exchange_name是变化的;
value是 routing_key、pattern、和绑定的队列名。\x06\x16是分隔符。这可以从redis里面看出:

技术分享图片

 



生产者在publish消息时,调用的是:
def _publish(self, body, priority, content_type, content_encoding,
headers, properties, routing_key, mandatory,
immediate, exchange, declare):

可以看到,生产者只需要知道exchange、routing_key就可以发消息到队列。发送到redis的消息内容如下:

这是个lpush命令,key是队列名、value是消息内容连同元数据:

lpush queue_name => [message, ... ]

技术分享图片

 

 

生产者producer发布消息到此结束。

其中kombu对redis库做了一下简单的封装,里面有个AsyncRedis类,不过貌似没什么卵用。

借鉴kombu里对redis封装的设计,我封装了一下redis,使用简单,绝对无公害。地址在这:Python RedisChannel

 

kombu源码Producer收获一

标签:code   att   add   connect   iat   close   sync   consumer   pattern   

原文地址:https://www.cnblogs.com/cool-fire/p/9494786.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!