码迷,mamicode.com
首页 > 其他好文 > 详细

基于Celery + RabbitMQ实现订阅发布模式

时间:2020-10-05 22:08:39      阅读:27      评论:0      收藏:0      [点我收藏+]

标签:client   发布   war   alt   代码   inf   数据处理   cas   交换   

Celery + RabbitMQ实现订阅发布模式

使用RabbitMQ的扇形交换机(fanout exchange)实现订阅发布模式

消息(Message)由Client发送,RabbitMQ接收到消息之后通过交换机转发到对应的队列上面。Worker会从队列中获取未被读取的数据处理。
RabbitMQ消息模式的核心理念是:生产者没有直接发送任何消费到队列。实际上,生产者都不知道这个消费是发送给哪个队列的。
相反,生产者只能发送消息给交换机,交换机是非常简单的。一方面它接受生产者的消息,另一方面向队列推送消息。

扇形交换机介绍

扇形交换机是最基本的交换机类型,它所能做的事情非常简单———广播消息。扇形交换机会把能接收到的消息全部发送给绑定在自己身上的队列。因为广播不需要“思考”,所以扇形交换机处理消息的速度也是所有的交换机类型里面最快的。

实现方式

在Celery中,扇形交换机被定义为广播路由的形式进行分发消息,下面两张图诠释了RabbitMQ本身的扇形交换机工作方式与Celery广播路由的工作方式。
技术图片

技术图片
在RabbitMq中,你可以绑定多个队列到同一个扇形交换机来实现消息广播。

而celery使用rabbitmq做broker时,自己又实现的一套广播机制:通过一个创建一个广播Queue,来广播消息至订阅此队列的所有消费者。

实现具体代码如下:

消费者代码实现:值得注意: 在task_queues中Broadcast方法中需要指定一个固定的队列(多个消费者一定要不同)而且auto_delete需要指定为False
这样才能保证消费者在重新启动的时候不会删除fanout exchange中bind的队列,这样就不会丢失fanout exchange中的消息

#  fanout_celery_worker.py

from celery import Celery as Create_celery
from project import create_app

def init_publish_celery(app=None):
    app = app or create_app()
    celery_app = Create_celery(__name__, broker="amqp://admin:admin@192.168.2.229/pubsub-stage", include=[
        "project.task.fanout_task"])

    # 发布广播的队列
    from kombu.common import Broadcast
    celery_app.conf.task_queues = (Broadcast(name="mange", queue=‘mange.consumer1‘, auto_delete=False),)

    # 忽略消息队列未注册的任务
    from celery.worker.consumer import Consumer
    def on_unknown_task(self, body, message, exc):
        message.ack()

    Consumer.on_unknown_task = on_unknown_task

    class ContextTask(celery_app.Task):
        """Make celery tasks work with Flask app context"""

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    def publish(task_name, queue, **kwargs):
        celery_app.send_task(task_name, queue=queue, kwargs=kwargs)

    # 给Celery导入flask的上下文
    celery_app.Task = ContextTask
    # 发布广播才会用到
    celery_app.publish2broadcast = publish
    return celery_app

celery_publish = init_publish_celery()

# fanout_task.py

from project.task.fanout_celery_worker import celery_publish

@celery_publish.task(name=‘unit_update‘, queue=‘mange‘)
def unit_update(**kwargs):
    print("********************************")
    for _k, _v in kwargs.items():
        print("{}: {}".format(_k, _v))

启动之后监听队列如图:

技术图片

rabbitmq中mange exchange中绑定队列:(消费者服务关闭,该bind队列也不会自动删除)

技术图片

发送任务代码:

值得注意的是在第二个发送的异步任务是消费者未注册的任务,将不会被接收

from project.task.fanout_celery_worker import celery_publish

celery_publish.send_task(‘unit_update‘, queue=‘mange‘, kwargs={"user": "developer", "remark": "测试订阅发布模式"})

celery_publish.send_task(‘unit_delete‘, queue=‘mange‘, kwargs={"user": "developer", "remark": "未注册的任务将被忽略"})

执行结果如下:

仅有第一个任务执行

技术图片


  • From: xaohuihui
  • 手搓不易,记得点赞哦

基于Celery + RabbitMQ实现订阅发布模式

标签:client   发布   war   alt   代码   inf   数据处理   cas   交换   

原文地址:https://blog.51cto.com/14612701/2539925

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!