标签:
虽然Redis有订阅功能,但是订阅功能是实时的,过了这个点,就接收不到消息了。
同时,如果订阅的客户端因为某些特殊原因shutdown了,那也就找不回未处理完整的订阅事件了。
但好在,Redis还有一个消息队列,通过消息队列,我们不仅可以把发布提交的更快速(发布会遍历所有订阅者,并通知到所有订阅者),又可能不用担心订阅者遗漏掉发生过异常的通知。
我将在后续随笔通过swoole,phpredis,yii三者来协调订阅者,邮局,发布者角色。
PUBLISH 命令的实际实现由 pubsubPublishMessage 函数完成,它的完整定义如下:
// 发送消息 int pubsubPublishMessage(robj *channel, robj *message) { int receivers = 0; struct dictEntry *de; listNode *ln; listIter li; /* Send to clients listening for that channel */ // 向所有频道的订阅者发送消息 de = dictFind(server.pubsub_channels,channel); if (de) { list *list = dictGetVal(de); // 取出所有订阅者 listNode *ln; listIter li; // 遍历所有订阅者, 向它们发送消息 listRewind(list,&li); while ((ln = listNext(&li)) != NULL) { redisClient *c = ln->value; addReply(c,shared.mbulkhdr[3]); addReply(c,shared.messagebulk); addReplyBulk(c,channel); // 打印频道名 addReplyBulk(c,message); // 打印消息 receivers++; // 更新接收者数量 } } /* Send to clients listening to matching channels */ // 向所有被匹配模式的订阅者发送消息 if (listLength(server.pubsub_patterns)) { listRewind(server.pubsub_patterns,&li); // 取出所有模式 channel = getDecodedObject(channel); while ((ln = listNext(&li)) != NULL) { pubsubPattern *pat = ln->value; // 取出模式 // 如果模式和 channel 匹配的话 // 向这个模式的订阅者发送消息 if (stringmatchlen((char*)pat->pattern->ptr, sdslen(pat->pattern->ptr), (char*)channel->ptr, sdslen(channel->ptr),0)) { addReply(pat->client,shared.mbulkhdr[4]); addReply(pat->client,shared.pmessagebulk); addReplyBulk(pat->client,pat->pattern); // 打印被匹配的模式 addReplyBulk(pat->client,channel); // 打印频道名 addReplyBulk(pat->client,message); // 打印消息 receivers++; // 更新接收者数量 } } decrRefCount(channel); // 释放用过的 channel } return receivers; // 返回接收者数量 } |
标签:
原文地址:http://www.cnblogs.com/si812cn/p/4401431.html