码迷,mamicode.com
首页 > 编程语言 > 详细

redis的订阅与发布在java中的应用

时间:2017-09-25 20:41:56      阅读:183      评论:0      收藏:0      [点我收藏+]

标签:resource   lis   ace   private   xtend   final   start   ora   频道   

Redis为我们提供了publish/subscribe(发布/订阅)功能。我们可以对某个channel(频道)进行subscribe(订阅),当有人在这个channel上publish(发布)消息时,redis就会通知我们,这样我们可以收到别人发布的消息。
作为Java的redis客户端,Jedis提供了publish/subscribe的接口。本文讲述如何使用Jedis来实现redis的publish/subscribe。

定义一个Subscriber

Jedis定义了抽象类JedisPubSub,在这个类中,定义publish/subsribe的回调方法。通过继承JedisPubSub类并重新实现这些回调方法,当publish/subsribe事件发生时,我们可以定制自己的处理逻辑。

在以下例子中,我们定义了Subscriber类,这个类继承了JedisPubSub类,并重新实现了其中的回调方法。

import redis.clients.jedis.JedisPubSub;
public class Subscriber extends JedisPubSub {
    public Subscriber() {
    }
    public void onMessage(String channel, String message) {
        System.out.println(String.format("receive redis published message, channel %s, message %s", channel, message));
    }
    public void onSubscribe(String channel, int subscribedChannels) {
        System.out.println(String.format("subscribe redis channel success, channel %s, subscribedChannels %d", 
                channel, subscribedChannels));
    }
    public void onUnsubscribe(String channel, int subscribedChannels) {
        System.out.println(String.format("unsubscribe redis channel, channel %s, subscribedChannels %d", 
                channel, subscribedChannels));
    }
}

定义SubThread线程类

由于Jedis的subscribe操作是阻塞的,因此,我们另起一个线程来进行subscribe操作。

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
public class SubThread extends Thread {
    private final JedisPool jedisPool;
    private final Subscriber subscriber = new Subscriber();
    private final String channel = "mychannel";
    public SubThread(JedisPool jedisPool) {
        super("SubThread");
        this.jedisPool = jedisPool;
    }
    @Override
    public void run() {
        System.out.println(String.format("subscribe redis, channel %s, thread will be blocked", channel));
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.subscribe(subscriber, channel);
        } catch (Exception e) {
            System.out.println(String.format("subsrcibe channel error, %s", e));
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
    }
}
在上面的代码中,我们从JedisPool获取一个Jedis实例,并使用这个Jedis实例进行subscribe的操作。
Jedissubscribe的声明如下:
public void subscribe(final JedisPubSub jedisPubSub, final String… channels)
第一个参数接受一个JedisPubSub对象,第二个参数指定对哪个频道进行订阅。上例中,我们把我们定义的Subscriber对象传给subscribe方法。
当publish/subscribe的事件发生时,会自动调用我们Subscriber的方法。

定义Publisher类

Publisher类接受用户的输入,并将输入发布到channel。当用户输入”quit”后,输入结束。

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class Publisher {
    private final JedisPool jedisPool;

    public Publisher(JedisPool jedisPool) {
        this.jedisPool = jedisPool;
    }

    public void start() {
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        Jedis jedis = jedisPool.getResource();
        while (true) {
            String line = null;
            try {
                line = reader.readLine();
                if (!"quit".equals(line)) {
                    jedis.publish("mychannel", line);
                } else {
                    break;
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
         }
    }
}

定义入口代码

如下是我们的程序入口代码。

import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
public class PubSubDemo 
{
    public static void main( String[] args )
    {
        // 替换成你的reids地址和端口
        String redisIp = "127.0.0.1";
        int reidsPort = 6379;
        JedisPool jedisPool = new JedisPool(new JedisPoolConfig(), redisIp, reidsPort);
        System.out.println(String.format("redis pool is starting, redis ip %s, redis port %d", redisIp, reidsPort));
        SubThread subThread = new SubThread(jedisPool);
        subThread.start();
        Publisher publisher = new Publisher(jedisPool);
        publisher.start();
    }
}

在上面的代码中,我们首先生成了一个JedisPool的redis连接池,这是由于Jedis不是线程安全的,JedisPool是线程安全的。而我们的程序在主线程和订阅线程(SubThread)均需要使用Jedis,故在程序中我们使用JedisPool。
由于Jedis的subcribe操作是阻塞的,故我们另起了一个线程来进行subcribe操作。
通过调用Publisher::start()方法,接受用户的输入,并publish到指定的channel。



redis的订阅与发布在java中的应用

标签:resource   lis   ace   private   xtend   final   start   ora   频道   

原文地址:http://www.cnblogs.com/aizj/p/7593673.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!