Redis publish/subscribe
Publisher project name: SmRemind_NEW
@Service
public class PubServiceImpl implements PubService {
@Resource(name="stringRedisTemplate")
private StringRedisTemplate stringRedisTemplate;
private String channelTopic = "Baojing";
/* Publish a message to the channel */
public void Publisher(String message) {
stringRedisTemplate.convertAndSend(channelTopic, message);
}
}
On the sub side I use a Java service managed by Spring.
Project name: redis-service_pubsub
<!-- SDR Pub/Sub configuration -->
<bean id="topicMessageListener" class="com.chr.service.impl.SubServiceImpl">
</bean>
<bean id="topicMessageListener_stop" class="com.chr.service.impl.stopServiceImpl"></bean>
<bean id="topicContainer"
class="org.springframework.data.redis.listener.RedisMessageListenerContainer"
destroy-method="destroy">
<property name="connectionFactory" ref="connectionFactory" />
<property name="messageListeners">
<map>
<entry key-ref="topicMessageListener">
<!-- <bean class="org.springframework.data.redis.listener.ChannelTopic">
<constructor-arg value="user:topic" /> </bean> -->
<ref bean="channelTopic" />
</entry>
<entry key-ref="topicMessageListener_stop">
<!-- <bean class="org.springframework.data.redis.listener.ChannelTopic">
<constructor-arg value="user:topic" /> </bean> -->
<ref bean="channelTopics" />
</entry>
</map>
</property>
</bean>
<bean id="channelTopic" class="org.springframework.data.redis.listener.ChannelTopic">
<constructor-arg value="Baojing" />
</bean>
<bean id="channelTopics" class="org.springframework.data.redis.listener.ChannelTopic">
<constructor-arg value="tuichu" />
</bean>
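For readers who prefer Java configuration, the XML above could be written roughly as follows. This is only a sketch: the class name PubSubConfig is made up, and it assumes Spring Data Redis is on the classpath, that a RedisConnectionFactory bean already exists, and that both listener beans implement MessageListener (which the messageListeners map requires anyway).

```java
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;

// Hypothetical Java-config counterpart of the XML; not part of the original project.
@Configuration
class PubSubConfig {

    // Same channel names as the XML beans channelTopic and channelTopics.
    @Bean
    ChannelTopic channelTopic() {
        return new ChannelTopic("Baojing");
    }

    @Bean
    ChannelTopic channelTopics() {
        return new ChannelTopic("tuichu");
    }

    // Registers each listener on its own channel, like the <map> entries in the XML.
    @Bean
    RedisMessageListenerContainer topicContainer(
            RedisConnectionFactory connectionFactory,
            @Qualifier("topicMessageListener") MessageListener alarmListener,
            @Qualifier("topicMessageListener_stop") MessageListener stopListener) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(alarmListener, channelTopic());
        container.addMessageListener(stopListener, channelTopics());
        return container;
    }
}
```

The container implements DisposableBean, so Spring calls destroy() on shutdown without an explicit destroy-method setting.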
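Main class:
import java.util.concurrent.atomic.AtomicBoolean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
public class MainPcl {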
public static Object o = new Object();
public static AtomicBoolean exit = new AtomicBoolean();
public static void main(String[] args) throws InterruptedException {
ApplicationContext context = new ClassPathXmlApplicationContext("spring-context.xml");
RedisMessageListenerContainer r = (RedisMessageListenerContainer) context.getBean("topicContainer");
exit.set(true);
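// Hold the monitor of o while checking the flag, so a notify() cannot slip in between the check and wait()
synchronized (o) {
while (exit.get()) {
o.wait();
}
}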
r.stop();
}
}
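The main thread runs an open-ended loop, with the AtomicBoolean exit set to true beforehand.
AtomicBoolean is thread-safe.
The loop ends when exit becomes false.
Inside the loop, wait() is called on the object so that the program blocks. Because wait() can return for reasons other than a notify (the JDK allows spurious wakeups), it is called inside the loop, and the loop only ends once exit is false.
RedisMessageListenerContainer is the container, created and managed by Spring, that runs the pub/sub listener threads.
When the loop ends, the container is stopped.
wait() may only be called by a thread that holds the object's monitor, so the call is wrapped in synchronized on the shared static object; the code that calls notify() must synchronize on the same object.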
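The wait/notify pattern described above can be tried out without Redis or Spring. The following is a minimal sketch, not the project's code: GuardedWaitDemo and its method names are made up for illustration, and a plain thread stands in for the stop listener. It checks and changes the flag while holding the lock, which avoids a notify being lost between the check and the wait.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; names are illustrative and not from the original project.
class GuardedWaitDemo {
    static final Object lock = new Object();
    static final AtomicBoolean running = new AtomicBoolean(true);

    // Blocks the calling thread until requestStop() has cleared the flag.
    static void awaitStop() throws InterruptedException {
        synchronized (lock) {
            // Re-check the flag after every wake-up: wait() may return spuriously.
            while (running.get()) {
                lock.wait();
            }
        }
    }

    // Clears the flag and wakes any waiter; notify requires holding the same monitor.
    static void requestStop() {
        synchronized (lock) {
            running.set(false);
            lock.notifyAll();
        }
    }

    // Starts a helper thread that requests the stop after delayMillis, waits for it,
    // and returns the final value of the running flag.
    static boolean runOnce(long delayMillis) {
        running.set(true);
        Thread stopper = new Thread(() -> {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            requestStop();
        });
        stopper.start();
        try {
            awaitStop();
            stopper.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return running.get();
    }

    public static void main(String[] args) {
        System.out.println("still running: " + runOnce(100));
    }
}
```

In the real service the role of the helper thread is played by the listener subscribed to the stop channel.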
Message receiver class
public class SubServiceImpl implements SubService {
private Logger log = Logger.getLogger(this.getClass());
@Autowired
private ChannelTopic channelTopic;
@Autowired
private linkmanDao linkmanDao;
private String bjhm = "13201706118";
public void onMessage(Message message, byte[] pattern) {
// message is the received message
// System.out.println(message.toString() + " 1111" + channelTopic.getTopic() + " " + Thread.currentThread().getName());
log.info("redis-service_pubsub|" + channelTopic.getTopic() + "|" + message);
// Parse the received message; the fields are separated by "|"
String[] arg = message.toString().split("\\|");
// Calling number
String callingnum = arg[0];
// Called number
String callednum = arg[1];
// Call time
String calltime = arg[2];
if (callednum.equals(bjhm)) {
// The called number matches the configured number: query the database for the list of SMS recipients
// Temporarily using my own phone number as the test number
List<linkman> list = linkmanDao.getlinkman_Userid("13259781605");
for (linkman l : list) {
}
}
}
public String send(String smsUrl, String message, String phonenum) {
String sendurl = String.format(smsUrl, phonenum);
log.info("redis-service_pubsub|send|" + phonenum + "|success|url:" + sendurl + "|message:" + message);
// if (true) {
// return "ok";
// }
try {
URL url = new URL(sendurl);
HttpURLConnection httpCon = (HttpURLConnection) url.openConnection();
httpCon.setRequestProperty("Content-type", "application/x-www-form-urlencoded;charset=utf-8");
httpCon.setRequestMethod("POST");
httpCon.setUseCaches(false); // Disable caching for the POST request
httpCon.setDoOutput(true);
httpCon.setConnectTimeout(5000);
httpCon.setReadTimeout(5000);
// Write the POST body to the output stream
OutputStreamWriter out = new OutputStreamWriter(httpCon.getOutputStream(), "utf-8");
out.write(message);
out.flush();
out.close();
// Read the status code of the completed request
int httpcode = httpCon.getResponseCode();
if (httpcode == 204) {
return "ok";
} else {
return "exp";
}
} catch (Exception e) {
log.error("redis-service_pubsub|send|" + phonenum + "|error|url:" + sendurl + "|message:" + message, e);
return "exp";
}
}
}
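onMessage indexes straight into the split result, so a payload with fewer than three fields would throw ArrayIndexOutOfBoundsException inside the listener. A defensive parse could look like the sketch below; CallRecord and its field names are hypothetical, and the sample payload in main is invented test data.

```java
// Hypothetical helper; class and field names are illustrative, not part of the original project.
final class CallRecord {
    final String callingNum;
    final String calledNum;
    final String callTime;

    private CallRecord(String callingNum, String calledNum, String callTime) {
        this.callingNum = callingNum;
        this.calledNum = calledNum;
        this.callTime = callTime;
    }

    // Parses "calling|called|time"; returns null when fewer than three fields are present.
    static CallRecord parse(String payload) {
        if (payload == null) {
            return null;
        }
        // Limit -1 keeps trailing empty fields, so "a|b|" still yields three parts.
        String[] parts = payload.split("\\|", -1);
        if (parts.length < 3) {
            return null;
        }
        return new CallRecord(parts[0], parts[1], parts[2]);
    }

    public static void main(String[] args) {
        // Invented sample payload in the same field order the listener expects.
        CallRecord r = CallRecord.parse("13800000000|13201706118|2015-04-23 10:00:00");
        System.out.println(r.callingNum + " -> " + r.calledNum + " at " + r.callTime);
    }
}
```

The listener could then log and skip a message when parse returns null instead of failing.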
Stop-service receiver
public class stopServiceImpl implements SubService {
private Logger log = Logger.getLogger(this.getClass());
@Autowired
private ChannelTopic channelTopics;
public void onMessage(Message message, byte[] pattern) {
log.info("redis-service_pubsub|"+channelTopics.getTopic()+"|"+message);
// Shut down the service: clear the flag, then wake the main thread waiting on MainPcl.o
MainPcl.exit.set(false);
synchronized (MainPcl.o) {
MainPcl.o.notify();
}
}
}
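As an alternative to the flag plus wait/notify handshake between MainPcl and stopServiceImpl, the same "block until a stop message arrives" behaviour could be built on java.util.concurrent.CountDownLatch. This is a different technique from the one the code above uses. The sketch is standalone, LatchShutdownDemo is a made-up name, and a plain thread plays the part of the stop listener.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of the original project.
class LatchShutdownDemo {

    // Waits up to timeoutMillis for a stop signal that a helper thread sends after delayMillis.
    // Returns true if the signal arrived before the timeout.
    static boolean runOnce(long delayMillis, long timeoutMillis) {
        CountDownLatch stopSignal = new CountDownLatch(1);
        Thread listener = new Thread(() -> {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            // Equivalent of the stop listener's onMessage: release the waiting thread.
            stopSignal.countDown();
        });
        listener.start();
        try {
            // await() returns immediately if countDown() has already happened,
            // so no explicit flag or synchronized block is needed.
            return stopSignal.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("stop signal received: " + runOnce(100, 5000));
    }
}
```

A latch cannot be reset, which fits a one-shot shutdown signal like this one.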
The two receiver classes are set up the same way; they differ only in the channel each is configured with:
<bean id="channelTopic" class="org.springframework.data.redis.listener.ChannelTopic">
<constructor-arg value="Baojing" />
</bean>
<bean id="channelTopics" class="org.springframework.data.redis.listener.ChannelTopic">
<constructor-arg value="tuichu" />
</bean>
Each ChannelTopic bean is configured with a different channel value.
Original article: http://blog.csdn.net/ufo2910628/article/details/45216861