标签:
在工业生产设计中,我们往往需要实现一个基于消息订阅的模式,用来对非定时的的消息进行监听订阅。
这种设计模式在 总线设计模式中得到体现。微软以前的WCF中实现了服务总线 ServiceBus的设计模式。然并卵。WCF已经好像是上个世纪的产物................
基于事件订阅的模式,比如 EventBus类的组件产品。但是往往设计比较复杂。
如果依赖于 Redis做事件消息推送。那就大大简化了这种设计模式,而且性能也比较客观。
Redis在 2.0之后的版本中 实现了 事件推送的 pub/sub命令
PSUBSCRIBE订阅一个或多个符合给定模式的频道
PUBLISH将信息message 发送到指定的频道channel
PUBSUB是一个查看订阅与发布系统状态的内省命令
PUBSUB CHANNELS pattern 列出当前的活跃频道
PUBSUB NUMSUB channel-1 channel-N 返回给定频道的订阅者数量
PUBSUB NUMPAT 返回订阅模式的数量
PUNSUBSCRIBE 指示客户端退订所有给定模式
SUBSCRIBE 订阅给定的一个或多个频道的信息
UNSUBSCRIBE 指示客户端退订给定的频道
P 开头的 (pattern)支持通配符模式。
以下举例说明如何发布用户的概念工作。在下面的例子给出一个客户端订阅一个通道名为redisChat
redis 127.0.0.1:6379> SUBSCRIBE redisChat
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "redisChat"
3) (integer) 1
现在,两个客户端都发布在同一个通道名redisChat消息及以上的订阅客户端接收消息。
redis 127.0.0.1:6379> PUBLISH redisChat "Redis is a great caching technique"
(integer) 1
redis 127.0.0.1:6379> PUBLISH redisChat "Learn redis by tutorials point"
(integer) 1
1) "message"
2) "redisChat"
3) "Redis is a great caching technique"
1) "message"
2) "redisChat"
3) "Learn redis by tutorials point"
在 redis-cli客户端中 推送消息的时候,返回成功发送到订阅者的数目。
如:(integer) 1
原理:
RedisServer包含两个重要的结构:
1. channels:实际上就是一个key-value的Map结构,key为订阅地频道,value为Client的List
2. patterns:存放模式+client地址的列表
流程:从pubsub_channels中找出跟publish中channel相符的clients-list,然后再去pubsub_patterns中找出每一个相符的pattern和client。向这些客户端发送publish的消息。
订阅信道
消息推送
在C#中的实现
基于
1 Task.Factory.StartNew(() => 2 { 3 var client = new RedisClient("192.168.1.100", 6379, "你的密码"); 4 try 5 { 6 var isOpen = client.Ping(); 7 if (isOpen == false) 8 { 9 return; 10 } 11 var sub1 = client.CreateSubscription(); 12 13 //接受消息的委托 14 sub1.OnMessage = (chanel, message) => 15 { 16 Console.WriteLine("chanel is :{0}", chanel); 17 Console.WriteLine("message is :{0}", message); 18 }; 19 sub1.SubscribeToChannels(new string[] { "testchat" });//注意:订阅信道的时候 会开启阻塞模式,所以,需要将监听放到单独的线程里 20 } 21 catch (Exception) 22 { 23 24 throw; 25 } 26 27 28 29 30 });
注意:在程序终止或者类的实例被销毁的时候,请将订阅者实例注销掉,否则,在redis中一直存在这个订阅者。
1 使用idispose 显示释放
2 使用析构函数 CLR回收的时候 释放
sub1.Dispose();
例如:
1 public void Dispose() 2 { 3 Dispose(true); 4 GC.SuppressFinalize(this); 5 } 6 7 protected virtual void Dispose(bool disposing) 8 { 9 if (disposing) 10 { 11 if (redisClient != null) 12 redisClient.Dispose(); 13 if (subscription != null) 14 subscription.Dispose(); 15 if (log != null) 16 log.Dispose(); 17 } 18 } 19 20 ~RedisEx() 21 { 22 Dispose(false); 23 }
官方推荐这种写法
var clientsManager = new PooledRedisClientManager(new string[] { "密码@192.168.1.200:6379" }); var redisPubSub = new RedisPubSubServer(clientsManager, new string[] { "testchat" }) { OnMessage = (channel, msg) => { Console.WriteLine("方式2订阅演示............."); Console.WriteLine("Received ‘{0}‘ from ‘{1}‘", msg, channel); } }.Start();
To use RedisPubSubServer
, initialize it with the channels you want to subscribe to and assign handlers for each of the events you want to handle.
At a minimum you‘ll want to handle OnMessage
:
Calling Start()
after it‘s initialized will get it to start listening and processing any messages published to the subscribed channels.
官方文档:https://github.com/ServiceStack/ServiceStack.Redis
附加文档
PSUBSCRIBE pattern [pattern …]
订阅一个或多个符合给定模式的频道。
每个模式以* 作为匹配符,比如it* 匹配所有以it 开头的频道( it.news 、it.blog 、it.tweets 等等),
news.* 匹配所有以news. 开头的频道( news.it 、news.global.today 等等),诸如此类。
可用版本: >= 2.0.0
时间复杂度: O(N),N 是订阅的模式的数量。
返回值: 接收到的信息(请参见下面的代码说明)。
redis> psubscribe news.*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe" # 返回值的类型:显示订阅成功
2) "news.*" # 订阅的模式
3) (integer) 1 # 目前已订阅的模式的数量
1) "pmessage" # 返回值的类型:信息
2) "news.*" # 信息匹配的模式
3) "news.it" # 信息本身的目标频道
4) "Google buy Motorola" # 信息的内容
PUBLISH channel message
将信息message 发送到指定的频道channel 。
可用版本: >= 2.0.0
时间复杂度: O(N+M),其中N 是频道channel 的订阅者数量,而M 则是使用模式订阅(subscribed
patterns) 的客户端的数量。
返回值: 接收到信息message 的订阅者数量。
# 对没有订阅者的频道发送信息
redis> publish bad_channel "can any body hear me?"
(integer) 0
# 向有一个订阅者的频道发送信息
redis> publish msg "good morning"
(integer) 1
# 向有多个订阅者的频道发送信息
redis> publish chat_room "hello~ everyone"
(integer) 3
PUBSUB [argument [argument …]]
PUBSUB 是一个查看订阅与发布系统状态的内省命令,它由数个不同格式的子命令组成,以下将分别对这
些子命令进行介绍。
可用版本: >= 2.8.0
列出当前的活跃频道。
活跃频道指的是那些至少有一个订阅者的频道,订阅模式的客户端不计算在内。
pattern 参数是可选的:
• 如果不给出pattern 参数,那么列出订阅与发布系统中的所有活跃频道。
• 如果给出pattern 参数,那么只列出和给定模式pattern 相匹配的那些活跃频道。
复杂度: O(N) ,N 为活跃频道的数量(对于长度较短的频道和模式来说,将进行模式匹配的复杂度视为常
数)。
返回值: 一个由活跃频道组成的列表。
# client-1 订阅news.it 和news.sport 两个频道
client-1> SUBSCRIBE news.it news.sport
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "news.it"
3) (integer) 1
1) "subscribe"
2) "news.sport"
3) (integer) 2
# client-2 订阅news.it 和news.internet 两个频道
client-2> SUBSCRIBE news.it news.internet
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "news.it"
3) (integer) 1
1) "subscribe"
2) "news.internet"
3) (integer) 2
# 首先, client-3 打印所有活跃频道
# 注意,即使一个频道有多个订阅者,它也只输出一次,比如news.it
client-3> PUBSUB CHANNELS
1) "news.sport"
2) "news.internet"
3) "news.it"
# 接下来, client-3 打印那些与模式news.i* 相匹配的活跃频道
# 因为news.sport 不匹配news.i* ,所以它没有被打印
redis> PUBSUB CHANNELS news.i*
1) "news.internet"
2) "news.it"
返回给定频道的订阅者数量,订阅模式的客户端不计算在内。
复杂度: O(N) ,N 为给定频道的数量。
返回值: 一个多条批量回复( Multi-bulk reply),回复中包含给定的频道,以及频道的订阅者数量。格式为:频道 channel-1 ,channel-1 的订阅者数量,频道 channel-2 ,channel-2 的订阅者数量,诸如此类。
回复中频道的排列顺序和执行命令时给定频道的排列顺序一致。不给定任何频道而直接调用这个命令也是可以的,在这种情况下,命令只返回一个空列表。
# client-1 订阅 news.it 和 news.sport 两个频道
client-1> SUBSCRIBE news.it news.sport
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "news.it"
3) (integer) 1
1) "subscribe"
2) "news.sport"
3) (integer) 2
# client-2 订阅 news.it 和 news.internet 两个频道
client-2> SUBSCRIBE news.it news.internet
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "news.it"
3) (integer) 1
1) "subscribe"
2) "news.internet"
3) (integer) 2
# client-3 打印各个频道的订阅者数量
client-3> PUBSUB NUMSUB news.it news.internet news.sport news.music
1) "news.it" # 频道
2) "2" # 订阅该频道的客户端数量
3) "news.internet"
4) "1"
5) "news.sport"
6) "1"
7) "news.music" # 没有任何订阅者
8) "0"
注意,这个命令返回的不是订阅模式的客户端的数量,而是客户端订阅的所有模式的数量总和。
复杂度: O(1) 。
返回值: 一个整数回复( Integer reply)。
# client-1 订阅 news.* 和 discount.* 两个模式
client-1> PSUBSCRIBE news.* discount.*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "news.*"
3) (integer) 1
1) "psubscribe"
2) "discount.*"
3) (integer) 2
# client-2 订阅 tweet.* 一个模式
client-2> PSUBSCRIBE tweet.*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "tweet.*"
3) (integer) 1
# client-3 返回当前订阅模式的数量为 3
client-3> PUBSUB NUMPAT
(integer) 3
# 注意,当有多个客户端订阅相同的模式时,相同的订阅也被计算在 PUBSUB NUMPAT 之内
# 比如说,再新建一个客户端 client-4 ,让它也订阅 news.* 频道
client-4> PSUBSCRIBE news.*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "news.*"
3) (integer) 1
# 这时再计算被订阅模式的数量,就会得到数量为 4
client-3> PUBSUB NUMPAT
(integer) 4
PUNSUBSCRIBE [pattern [pattern …]]
指示客户端退订所有给定模式。
如果没有模式被指定,也即是,一个无参数的 PUNSUBSCRIBE 调用被执行,那么客户端使用PSUBSCRIBE
命令订阅的所有模式都会被退订。在这种情况下,命令会返回一个信息,告知客户端所有被退订的模式。
可用版本: >= 2.0.0
时间复杂度: O(N+M) ,其中 N 是客户端已订阅的模式的数量,M 则是系统中所有客户端订阅的模式的数量。
返回值: 这个命令在不同的客户端中有不同的表现。
订阅给定的一个或多个频道的信息。
可用版本: >= 2.0.0
时间复杂度: O(N),其中 N 是订阅的频道的数量。
返回值: 接收到的信息 (请参见下面的代码说明)。
# 订阅 msg 和 chat_room 两个频道
# 1 - 6 行是执行 subscribe 之后的反馈信息
# 第 7 - 9 行才是接收到的第一条信息
# 第 10 - 12 行是第二条
redis> subscribe msg chat_room
Reading messages... (press Ctrl-C to quit)
1) "subscribe" # 返回值的类型:显示订阅成功
2) "msg" # 订阅的频道名字
3) (integer) 1 # 目前已订阅的频道数量
1) "subscribe"
2) "chat_room"
3) (integer) 2
1) "message" # 返回值的类型:信息
2) "msg" # 来源 (从那个频道发送过来)
3) "hello moto" # 信息内容
1) "message"
2) "chat_room"
3) "testing...haha"
UNSUBSCRIBE [channel [channel …]]
指示客户端退订给定的频道。
如果没有频道被指定,也即是,一个无参数的 UNSUBSCRIBE 调用被执行,那么客户端使用SUBSCRIBE 命令订阅的所有频道都会被退订。在这种情况下,命令会返回一个信息,告知客户端所有被退订的频道。
可用版本: >= 2.0.0
时间复杂度: O(N) ,N 是客户端已订阅的频道的数量。
返回值: 这个命令在不同的客户端中有不同的表现。
参考文档:
http://blog.csdn.net/u011506468/article/details/47337839
http://my.oschina.net/itblog/blog/601284
http://www.oschina.net/code/snippet_584165_52231
http://redis.io/topics/pubsub
标签:
原文地址:http://www.cnblogs.com/micro-chen/p/5852598.html