码迷,mamicode.com
首页 > 其他好文 > 详细

利用redis实现带优先级的消息队列

时间:2015-03-31 09:12:19      阅读:238      评论:0      收藏:0      [点我收藏+]

标签:redis

前言

以前一直有使用celery的优先级机制(基于redis的任务队列),一直很好奇它的实现机制,在查阅了部分资料后,决定写这篇文章,作为总结。

1. 利用Sorted Set 实现

使用Sorted Set 做优先级队列最大的优点是直观明了。

ZADD key score member [[score member] [score member] ...]

score 作为优先级,member 作为相应的任务
在Sorted Set 中,score 小的,位于优先级队列的头部,即优先级较高
由于score 就是menber的优先级,因此非常直观
可以使用

MULTI
ZRANGE key 0 0 WITHSCORES
ZREMRANGEBYRANK task_list 0 0
EXEC

来获取任务队列中优先级最高的元素
ZRANGE 用于获取任务,ZREMRANGEBYRANK 用于从消息队列中移除

注意:由于Sorted Set本身是一个set,因此消息队列中的消息不能重复,否则新加入的消息会覆盖以前加入的消息
注意:对于score 相同的消息,Sorted Set 会按照字典序进行排序

2. 利用List实现

应该一下就能想到,list 是作为消息队列的最理想的选择,但这里使用list 实现带优先级的消息队列也可以有好几种不同的实现方式。

2.1 准备

首先,如果我们假定消息队列中的消息,从消息队列的右侧推入(RPUSH),从左侧取出(LPOP)
那么单个list 很容易构造成一个FIFO 队列。但是如果优先级只有两级,高和低,那么我们可以把高优先级的消息,使用LPUSH 推入队列左侧,把低优先级的消息,使用RPUSH推入到队列右侧, 这样单个list就可以实现2级的带优先级的消息队列。

2.2 使用BLPOP

redis 提供了列表的阻塞式(blocking)弹出原语。

BLPOP key [key ...] timeout

当给定多个 key 参数时,按参数 key 的先后顺序依次检查各个列表,弹出第一个非空列表的头元素。
这样我们可以创建三个队列,high,normal, low ,分别代表高优先级,普通优先级,低优先级

BLPOP high normal low

2.3 基于多个key 的LPOP

有时候我们并不想要阻塞式的原语,那么在业务层,我们可以在多个队列中遍历,查找来获取消息

queue_list = ["high", "normal", "low"]
def get_msg():
    for queue in queue_list:
        msg = redis_db.lpop(queue)
        if msg is not None:
            return msg
    return None

翻阅rq 的源码时,我发现rq的带优先级的任务队列正是这样实现的

2.4 扩展

如果我们需要10个优先级的消息队列,可以想到我们需要至少5个队列(参考2.1)
这时候我们的消息队列的命名可能就需要采取某种规则
比如,原打算命名的消息队列的名称为 msg_queue
那么这5个消息队列就可以被命名为
msg_queue-0
msg_queue-1
msg_queue-2
msg_queue-3
msg_queue-4

如果再结合

KEYS pattern

我们就可以得到对任意多个优先级支持的消息队列

# priority 1 ~ 10
# push message into list
def push_message(queue, priority, message):
    num = (priority - 1) / 2
    target_queue = queue + "-" + str(num)
    # direct
    if priority % 2 == 1:
        redis_db.lpush(target_queue, message)
    else:
        redis_db.rpush(target_queue, message)
# fetch  a message
def fetch_message(queue):
    queue_list = redis_db.keys(queue + "-?")
    queue_list = sorted(queue_list)
    for queue in queue_list:
        msg = redis_db.lpop(queue)
        if msg is not None:
            return msg
    return None

注意:采用这种做法,同一优先级的消息,并不满足FIFO

利用redis实现带优先级的消息队列

标签:redis

原文地址:http://blog.csdn.net/woshiaotian/article/details/44757621

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!