码迷,mamicode.com
首页 > Windows程序 > 详细

MQTT的学习研究(十二) MQTT moquette 的 Future API 消息发布订阅的实现

时间:2015-07-02 13:58:22      阅读:285      评论:0      收藏:0      [点我收藏+]

标签:

 MQTT moquette 的Server发布主题

Java代码  技术分享
  1. package com.etrip.mqtt.future;  
  2.   
  3. import java.net.URISyntaxException;  
  4.   
  5. import org.fusesource.mqtt.client.FutureConnection;  
  6. import org.fusesource.mqtt.client.MQTT;  
  7. import org.fusesource.mqtt.client.QoS;  
  8. import org.fusesource.mqtt.client.Topic;  
  9. import org.slf4j.Logger;  
  10. import org.slf4j.LoggerFactory;  
  11.   
  12. /** 
  13.  *  
  14.  *  
  15.  *  
  16.  * 采用Future式 发布主题  
  17.  *  
  18.  * @author longgangbai 
  19.  */  
  20. public class MQTTFutureServer {  
  21.         private static final Logger LOG = LoggerFactory.getLogger(MQTTFutureServer.class);  
  22.         private final static String CONNECTION_STRING = "tcp://192.168.208.46:1883";  
  23.         private final static boolean CLEAN_START = true;  
  24.         private final static short KEEP_ALIVE = 30;// 低耗网络,但是又需要及时获取数据,心跳30s  
  25.         public  static Topic[] topics = {  
  26.                         new Topic("china/beijing", QoS.EXACTLY_ONCE),  
  27.                         new Topic("china/tianjin", QoS.AT_LEAST_ONCE),  
  28.                         new Topic("china/henan", QoS.AT_MOST_ONCE)};  
  29.         public final  static long RECONNECTION_ATTEMPT_MAX=6;  
  30.         public final  static long RECONNECTION_DELAY=2000;  
  31.           
  32.         public final static int SEND_BUFFER_SIZE=2*1024*1024;//发送最大缓冲为2M  
  33.         public static void main(String[] args)   {  
  34.             MQTT mqtt = new MQTT();  
  35.             try {  
  36.                 //设置服务端的ip  
  37.                 mqtt.setHost(CONNECTION_STRING);  
  38.                 //连接前清空会话信息  
  39.                 mqtt.setCleanSession(CLEAN_START);  
  40.                 //设置重新连接的次数  
  41.                 mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX);  
  42.                 //设置重连的间隔时间  
  43.                 mqtt.setReconnectDelay(RECONNECTION_DELAY);  
  44.                 //设置心跳时间  
  45.                 mqtt.setKeepAlive(KEEP_ALIVE);  
  46.                 //设置缓冲的大小  
  47.                 mqtt.setSendBufferSize(SEND_BUFFER_SIZE);  
  48.       
  49.                 //创建连接   
  50.                 final FutureConnection connection= mqtt.futureConnection();  
  51.                 connection.connect();  
  52.                 int count=1;  
  53.                 while(true){  
  54.                     count++;  
  55.                     // 用于发布消息,目前手机段不需要向服务端发送消息  
  56.                     //主题的内容  
  57.                     String message="hello "+count+"chinese people !";  
  58.                     String topic = "china/beijing";  
  59.                     connection.publish(topic, message.getBytes(), QoS.AT_LEAST_ONCE,  
  60.                             false);  
  61.                     System.out.println("MQTTFutureServer.publish Message "+"Topic Title :"+topic+" context :"+message);  
  62.                       
  63.                 }  
  64.             } catch (URISyntaxException e) {  
  65.                 // TODO Auto-generated catch block  
  66.                 e.printStackTrace();  
  67.             } catch (Exception e) {  
  68.                 // TODO Auto-generated catch block  
  69.                 e.printStackTrace();  
  70.             }  
  71.         }  
  72. }  

 

 

 MQTT moquette 的Client接收主题

 

Java代码  技术分享
  1. package com.etrip.mqtt.future;  
  2.   
  3. import java.net.URISyntaxException;  
  4.   
  5. import org.fusesource.mqtt.client.Future;  
  6. import org.fusesource.mqtt.client.FutureConnection;  
  7. import org.fusesource.mqtt.client.MQTT;  
  8. import org.fusesource.mqtt.client.Message;  
  9. import org.fusesource.mqtt.client.QoS;  
  10. import org.fusesource.mqtt.client.Topic;  
  11. import org.slf4j.Logger;  
  12. import org.slf4j.LoggerFactory;  
  13. /** 
  14.  *  
  15.  * MQTT moquette 的Client 段用于订阅主题,并接收主题信息 
  16.  *  
  17.  * 采用Future 式 订阅主题  
  18.  *  
  19.  * @author longgangbai 
  20.  */  
  21. public class MQTTFutureClient {  
  22.         private static final Logger LOG = LoggerFactory.getLogger(MQTTFutureClient.class);  
  23.         private final static String CONNECTION_STRING = "tcp://192.168.208.46:1883";  
  24.         private final static boolean CLEAN_START = true;  
  25.         private final static short KEEP_ALIVE = 30;// 低耗网络,但是又需要及时获取数据,心跳30s  
  26.         private final static String CLIENT_ID = "publishService";  
  27.         public  static Topic[] topics = {  
  28.                         new Topic("china/beijing", QoS.EXACTLY_ONCE),  
  29.                         new Topic("china/tianjin", QoS.AT_LEAST_ONCE),  
  30.                         new Topic("china/henan", QoS.AT_MOST_ONCE)};  
  31.         public final  static long RECONNECTION_ATTEMPT_MAX=6;  
  32.         public final  static long RECONNECTION_DELAY=2000;  
  33.           
  34.         public final static int SEND_BUFFER_SIZE=2*1024*1024;//发送最大缓冲为2M  
  35.           
  36.           
  37.           public static void main(String[] args)   {  
  38.                 //创建MQTT对象  
  39.                 MQTT mqtt = new MQTT();  
  40.                 try {  
  41.                     //设置mqtt broker的ip和端口  
  42.                     mqtt.setHost(CONNECTION_STRING);  
  43.                     //连接前清空会话信息  
  44.                     mqtt.setCleanSession(CLEAN_START);  
  45.                     //设置重新连接的次数  
  46.                     mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX);  
  47.                     //设置重连的间隔时间  
  48.                     mqtt.setReconnectDelay(RECONNECTION_DELAY);  
  49.                     //设置心跳时间  
  50.                     mqtt.setKeepAlive(KEEP_ALIVE);  
  51.                     //设置缓冲的大小  
  52.                     mqtt.setSendBufferSize(SEND_BUFFER_SIZE);  
  53.                       
  54.                     //获取mqtt的连接对象BlockingConnection  
  55.                     final FutureConnection connection= mqtt.futureConnection();  
  56.                     connection.connect();  
  57.                     connection.subscribe(topics);  
  58.                     while(true){  
  59.                         Future<Message> futrueMessage=connection.receive();  
  60.                         Message message =futrueMessage.await();  
  61.                           
  62.                           
  63.                         System.out.println("MQTTFutureClient.Receive Message "+ "Topic Title :"+message.getTopic()+" context :"+String.valueOf(message.getPayloadBuffer()));  
  64.                     }  
  65.                 } catch (URISyntaxException e) {  
  66.                     // TODO Auto-generated catch block  
  67.                     e.printStackTrace();  
  68.                 } catch (Exception e) {  
  69.                     // TODO Auto-generated catch block  
  70.                     e.printStackTrace();  
  71.                 }finally{  
  72.                       
  73.                 }  
  74.             }  
  75. }  

 

MQTT的学习研究(十二) MQTT moquette 的 Future API 消息发布订阅的实现

标签:

原文地址:http://www.cnblogs.com/yudar/p/4615695.html

(0)
(0)
   
举报
评论 一句话评论(0)
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!