[root@localhost ~]# redis-server /opt/redis-2.4.10/redis.conf [7719] 16 Apr 11:37:22 # Warning: 32 bit instance detected but no memory limit set. Setting 3.5 GB maxmemory limit with ‘noeviction‘ policy now. [7719] 16 Apr 11:37:22 * Server started, Redis version 2.4.10 [7719] 16 Apr 11:37:22 # WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. To fix this issue add ‘vm.overcommit_memory = 1‘ to /etc/sysctl.conf and then reboot or run the command ‘sysctl vm.overcommit_memory=1‘ for this to take effect.
[root@localhost ~]# redis-cli redis> subscribe news.sports Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "news.sports" 3) (integer) 1
可以看到已经开始了监听,向news.sports channel发布一条消息
[root@localhost ~]# redis-cli redis> publish news.sports "kaka is back" (integer) 1
redis> subscribe news.sports Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "news.sports" 3) (integer) 1 1) "message" 2) "news.sports" 3) "kaka is back"
发布消息是通过jedis.publish(String channel, String message)来发布的,其实就是往redis服务器发布一条publish命令。
public void publish(final byte[] channel, final byte[] message) { sendCommand(PUBLISH, channel, message); }
订阅消息是通过jedis.subscribe(JedisPub pub,String channel)来进行的,channel好理解,那么JedisPub是什么呢。
public void subscribe(JedisPubSub jedisPubSub, String... channels) { checkIsInMulti(); connect(); client.setTimeoutInfinite(); jedisPubSub.proceed(client, channels); client.rollbackTimeout(); }
可以看到,主要是通过jedisPubSub.proceed(client, channels);来进行订阅的。看proceed方法。
public void proceed(Client client, String... channels) { this.client = client; client.subscribe(channels); client.flush(); process(client); }
public void subscribe(final byte[]... channels) { sendCommand(SUBSCRIBE, channels); }
private void process(Client client) { do { } while (isSubscribed()); }
List<Object> reply = client.getObjectMultiBulkReply(); final Object firstObj = reply.get(0); if (!(firstObj instanceof byte[])) { throw new JedisException("Unknown message type: " + firstObj); } final byte[] resp = (byte[]) firstObj; if (Arrays.equals(SUBSCRIBE.raw, resp)) { subscribedChannels = ((Long) reply.get(2)).intValue(); final byte[] bchannel = (byte[]) reply.get(1); final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel); onSubscribe(strchannel, subscribedChannels); } else if (Arrays.equals(UNSUBSCRIBE.raw, resp)) { subscribedChannels = ((Long) reply.get(2)).intValue(); final byte[] bchannel = (byte[]) reply.get(1); final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel); onUnsubscribe(strchannel, subscribedChannels); } else if (Arrays.equals(MESSAGE.raw, resp)) { final byte[] bchannel = (byte[]) reply.get(1); final byte[] bmesg = (byte[]) reply.get(2); final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel); final String strmesg = (bmesg == null) ? null : SafeEncoder .encode(bmesg); onMessage(strchannel, strmesg); } else if (Arrays.equals(PMESSAGE.raw, resp)) { final byte[] bpattern = (byte[]) reply.get(1); final byte[] bchannel = (byte[]) reply.get(2); final byte[] bmesg = (byte[]) reply.get(3); final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern); final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel); final String strmesg = (bmesg == null) ? null : SafeEncoder .encode(bmesg); onPMessage(strpattern, strchannel, strmesg); } else if (Arrays.equals(PSUBSCRIBE.raw, resp)) { subscribedChannels = ((Long) reply.get(2)).intValue(); final byte[] bpattern = (byte[]) reply.get(1); final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern); onPSubscribe(strpattern, subscribedChannels); } else if (Arrays.equals(PUNSUBSCRIBE.raw, resp)) { subscribedChannels = ((Long) reply.get(2)).intValue(); final byte[] bpattern = (byte[]) reply.get(1); final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern); onPUnsubscribe(strpattern, subscribedChannels); } else { throw new JedisException("Unknown message type: " + firstObj); }
可以看到,通过client.getObjectMultiBulkReply()来得到返回来的消息。判断消息的类型来进行不同的操作。比如Arrays.equals(SUBSCRIBE.raw, resp)判断返回来的消息是订阅,subscribedChannels = ((Long) reply.get(2)).intValue();是取得消息,也是do...while判断循环的条件,也就是说这一次如果读到消息了,则进行下一次循环。那么onSubscribe(String channel, int subscribedChannels)究竟做了什么事,看开头
public abstract void onMessage(String channel, String message); public abstract void onPMessage(String pattern, String channel, String message); public abstract void onSubscribe(String channel, int subscribedChannels); public abstract void onUnsubscribe(String channel, int subscribedChannels); public abstract void onPUnsubscribe(String pattern, int subscribedChannels); public abstract void onPSubscribe(String pattern, int subscribedChannels);
package redis.client.jredis.tests; import java.util.Timer; import java.util.TimerTask; import org.junit.Test; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; import redis.clients.jedis.JedisPubSub; public class JedisTest extends JedisTestBase { JedisPool pool = null; /** * 测试发布验证 */ @Test public void testPS(){ /** Jedis jedis = new Jedis("",6379); jedis.set("name", "xiaoruoen"); jedis.publish("news.blog.title", "Hello,World"); //*/ final String host = ""; JedisPoolConfig config = new JedisPoolConfig(); pool = new JedisPool(new JedisPoolConfig(),host); subscribe(new NewsListener(), "news.sports"); Timer timer = new Timer(); timer.schedule(new TimerTask() { @Override public void run() { // TODO Auto-generated method stub publish("news.sports", "{\"_id\":335566,\"author\":\"xiaoruoen\",\"title\":\"kaka is back\"}"); } }, 1000, 3000); } public Jedis getResource(int dbnum){ Jedis jedis = pool.getResource(); jedis.select(dbnum); return jedis; } /** * * @param channel * @param message ex:"{\"_id\":335566,\"author\":\"xiaoruoen\",\"title\":\"kaka is back\"}" */ public void publish(String channel,String message){ Jedis jedis = getResource(12); jedis.publish(channel, message); pool.returnResource(jedis); } public void subscribe(JedisPubSub listener,String channel){ Jedis jedis = getResource(12); jedis.subscribe(listener, channel); pool.returnResource(jedis); } }
package redis.client.jredis.tests; import redis.clients.jedis.JedisPubSub; public class NewsListener extends JedisPubSub { @Override public void onMessage(String channel, String message) { System.out.println("get message from"+channel+" "+message); } @Override public void onPMessage(String pattern, String channel, String message) { System.out.println("get message from"+channel+" "+message); } @Override public void onSubscribe(String channel, int subscribedChannels) { System.out.println("subscribe the channel:"+channel); } @Override public void onUnsubscribe(String channel, int subscribedChannels) { System.out.println("get message from"+channel); } @Override public void onPUnsubscribe(String pattern, int subscribedChannels) { System.out.println("get message from"+subscribedChannels); } @Override public void onPSubscribe(String pattern, int subscribedChannels) { System.out.println("get message from"+subscribedChannels); } }
发现只打印了一条数据subscribe the channel:news.sports
看到Note that subscribe is a blocking operation operation because it will poll Redis for responses on the thread that calls subscribe.可以看到subcribe是一个线程中的块操作。我猜测是在发布与接收的过程中,如果在同一线程里面进行操作,一边阻塞着流,另一边无法进行操作。于是将publish改写为另一线程启动。修改如下:
public static void main(String[] args){ final String host = ""; JedisPoolConfig config = new JedisPoolConfig(); pool = new JedisPool(new JedisPoolConfig(),host); Thread thread = new Thread(new Test().new PublishThread()); thread.start(); subscribe(new NewsListener(), "news.sports"); }
class PublishThread implements Runnable{ @Override public void run() { Timer timer = new Timer(); timer.schedule(new TimerTask() { @Override public void run() { // TODO Auto-generated method stub publish("news.sports", "{\"_id\":335566,\"author\":\"xiaoruoen\",\"title\":\"kaka is back\"}"); } }, 1000, 3000); } }
Pool:redis.clients.jedis.JedisPool@18e2b22 subscribe the channel:news.sports Pool:redis.clients.jedis.JedisPool@18e2b22 get message fromnews.sports {"_id":335566,"author":"xiaoruoen","title":"kaka is back"} Pool:redis.clients.jedis.JedisPool@18e2b22 get message fromnews.sports {"_id":335566,"author":"xiaoruoen","title":"kaka is back"} Pool:redis.clients.jedis.JedisPool@18e2b22 get message fromnews.sports {"_id":335566,"author":"xiaoruoen","title":"kaka is back"} Pool:redis.clients.jedis.JedisPool@18e2b22 get message fromnews.sports {"_id":335566,"author":"xiaoruoen","title":"kaka is back"} Pool:redis.clients.jedis.JedisPool@18e2b22
