码迷,mamicode.com
首页 > 编程语言 > 详细

redis pub/sub Spring StringRedisTemplate

时间:2015-04-23 09:34:28      阅读:317      评论:0      收藏:0      [点我收藏+]

标签:java   redis   spring   

redis 订阅发布

项目名称:SmRemind_NEW

@Service
public class PubServiceImpl implements PubService {
@Resource(name="stringRedisTemplate")
private  StringRedisTemplate stringRedisTemplate;

private String channelTopic = "Baojing";

/*发布消息到Channel*/
public void Publisher(String message) {


stringRedisTemplate.convertAndSend(channelTopic, message);
}
}




sub端我使用的是java做服务(Spring)管理

项目名称redis-service_pubsub

<!-- SDR Pub/Sub配置 -->
<bean id="topicMessageListener" class="com.chr.service.impl.SubServiceImpl">
            
</bean>
        <bean id="topicMessageListener_stop" class="com.chr.service.impl.stopServiceImpl"></bean>
<bean id="topicContainer"
class="org.springframework.data.redis.listener.RedisMessageListenerContainer"
destroy-method="destroy">
<property name="connectionFactory" ref="connectionFactory" />
<property name="messageListeners">
<map>
<entry key-ref="topicMessageListener">
<!-- <bean class="org.springframework.data.redis.listener.ChannelTopic"> 
<constructor-arg value="user:topic" /> </bean> -->
<ref bean="channelTopic" />
</entry>
                                <entry key-ref="topicMessageListener_stop">
<!-- <bean class="org.springframework.data.redis.listener.ChannelTopic"> 
<constructor-arg value="user:topic" /> </bean> -->
                                        <ref bean="channelTopics" />
</entry>
</map>
</property>
</bean>


<bean id="channelTopic" class="org.springframework.data.redis.listener.ChannelTopic">
<constructor-arg value="Baojing" />
</bean>
        <bean id="channelTopics" class="org.springframework.data.redis.listener.ChannelTopic">
<constructor-arg value="tuichu" />
</bean>



运行主类:

public class MainPcl {


    public static Object o = new Object();
    public static AtomicBoolean exit = new AtomicBoolean();


    public static void main(String[] args) throws InterruptedException {
        ApplicationContext context = new ClassPathXmlApplicationContext("spring-context.xml");
        RedisMessageListenerContainer r = (RedisMessageListenerContainer) context.getBean("topicContainer");
        exit.set(true);
        while (exit.get()) {
            synchronized (o) {
                o.wait();
            };
        }
        r.stop();
    }
}

写无限循环,设置AtomicBoolean  exit的值为true

AtomicBoolean  是线程保护的

当exit的值为false的时候终止循环


循环中调用object的wait()方法让程序等待,因为wait()方法会在jdk不管什么原因下异常退出,所以循环条件是exit的值为false

RedisMessageListenerContainer 为spring创建管理的pub/sub线程池

在循环结束的时候关闭线程池

因为object是静态的变量所以调用wait()方法时用synchronized


接收消息类

public class SubServiceImpl implements SubService {


    private Logger log = Logger.getLogger(this.getClass());


    @Autowired
    private ChannelTopic channelTopic;
    @Autowired
    private linkmanDao linkmanDao;
    private String bjhm = "13201706118";



    public void onMessage(Message message, byte[] pattern) {
        //Message为接收消息
//        System.out.println(message.toString() + "  1111" + channelTopic.getTopic() + "   " + Thread.currentThread().getName());
        log.info("redis-service_pubsub|" + channelTopic.getTopic() + "|" + message);
        //解析接收消息;
        String[] arg = message.toString().split("\\|");
        //主叫号码
        String callingnum = arg[0];
        //被叫号码
        String callednum = arg[1];
        //时间
        String calltime = arg[2];
        if (callednum.equals(bjhm)) {
            //被叫号码等于配置号码,查询数据库获取发送短信集合
            //临时使用我的手机号作为测试号码
            List<linkman> list = linkmanDao.getlinkman_Userid("13259781605");
            for (linkman l : list) {

            }
        }


    }


    public String send(String smsUrl, String message, String phonenum) {
        String sendurl = String.format(smsUrl, phonenum);
        log.info("redis-service_pubsub|send|" + phonenum + "|success|url:" + sendurl + "|message:" + message);
//        if (true) {
//            return "ok";
//        }


        try {
            URL url = new URL(sendurl);
            HttpURLConnection httpCon = (HttpURLConnection) url.openConnection();
            httpCon.setRequestProperty("Content-type", "application/x-www-form-urlencoded;charset=utf-8");
            httpCon.setRequestMethod("POST");
            httpCon.setUseCaches(false);//POST方法设置成无缓存
            httpCon.setDoOutput(true);
            httpCon.setConnectTimeout(5000);
            httpCon.setReadTimeout(5000);


            //POST方法将内容以流的方式写出
            OutputStreamWriter out = new OutputStreamWriter(httpCon.getOutputStream(), "utf-8");
            out.write(message);
            out.flush();
            out.close();


            //返回请求完成的状态码
            int httpcode = httpCon.getResponseCode();
            if (httpcode == 204) {
                return "ok";
            } else {
                return "exp";
            }


        } catch (Exception e) {
            log.error("redis-service_pubsub|send|" + phonenum + "|error|url:" + sendurl + "|message:" + message, e);
            return "exp";
        }


    }






暂停服务接收

public class stopServiceImpl implements SubService {


    private Logger log = Logger.getLogger(this.getClass());
    @Autowired
    private ChannelTopic channelTopics;


    public void onMessage(Message message, byte[] pattern) {
        log.info("redis-service_pubsub|"+channelTopics.getTopic()+"|"+message);
        //关闭服务
        MainPcl.exit.set(false);
        synchronized (MainPcl.o) {
            o.notify();
        };
    }
}

两个接收消息类实际是一样的,只是配置时

<bean id="channelTopic" class="org.springframework.data.redis.listener.ChannelTopic">
<constructor-arg value="Baojing" />
</bean>
        <bean id="channelTopics" class="org.springframework.data.redis.listener.ChannelTopic">
<constructor-arg value="tuichu" />
</bean>


配置的channel的value不一样。



redis pub/sub Spring StringRedisTemplate

标签:java   redis   spring   

原文地址:http://blog.csdn.net/ufo2910628/article/details/45216861

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!