码迷,mamicode.com
首页 > 其他好文 > 详细

MQTT 消息 发布 订阅

时间:2016-10-21 19:10:43      阅读:5753      评论:0      收藏:0      [点我收藏+]

标签:locking   意思   url   method   技术分享   sdn   sync   archive   qos   


 

当连接向一个mqtt服务器时,clientId必须是唯一的。设置一样,导致client.setCallback总是走到 connectionLost回调。报connection reset。调查一天才发现是clientid重复导致。

client = new MqttAsyncClient(serverURIString, "client-id");

 


 

clientId是用来保存会话信息。

MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);

当服务器将message发布向所有订阅过的客户端,就会清除这个message。如果当前客户端不在线,等它连接时发送。

 

session与client之间的关系是怎样的?

这样的,比如你一个板子,作为客户端,发起mqtt的连接请求connect到mqtt服务器,比如说就是emqtt服务吧,emqtt服务端收到这个板子的连接请求之后,在tcp层上会和板子建立一个tcp的连接,在emqtt内部,会产生一个进程,和这个板子做数据通讯,同时还会产生一个进程,叫session,这个sessoin是专门管理这个板子订阅的主题,其它板子如果发布了这个板子感兴趣的主题的时候,也会发到这个板子对应的这个session里面,如果这个session收到订阅的主题之后,发现对用的client还活着,就通过这个client把数据经过tcp发到这个板子上,如果发现client已经没有了,就是说板子和服务端断掉了,那么session就会把收到的订阅的主题,先保存在session里面,下次板子连接上了,而且cleansession=false,那么这个session就不会清除,在这次连接时,就会把以前收到的订阅消息,发给板子,大概就是这个意思。


 

参考:

http://www.blogjava.net/yongboy/archive/2014/02/15/409893.html

http://www.cnblogs.com/znlgis/p/4930990.html

http://blog.csdn.net/ljf10010/article/details/51424506

paho客户端示例

 

https://github.com/eclipse/paho.mqtt.java/tree/master/org.eclipse.paho.client.mqttv3.test/src/test/java/org/eclipse/paho/client/mqttv3/test

http://www.eclipse.org/paho/files/javadoc/index.html api文档

ibm客户端paho示例:

http://www.programcreek.com/java-api-examples/index.php?source_dir=streamsx.messaging-master/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/mqtt/MqttAsyncClientWrapper.java

技术分享
  1 package com.xxx.mqtt;
  2 
  3 import java.net.URI;
  4 
  5 import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions;
  6 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
  7 import org.eclipse.paho.client.mqttv3.IMqttToken;
  8 import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
  9 import org.eclipse.paho.client.mqttv3.MqttCallback;
 10 import org.eclipse.paho.client.mqttv3.MqttClient;
 11 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 12 import org.eclipse.paho.client.mqttv3.MqttException;
 13 import org.eclipse.paho.client.mqttv3.MqttMessage;
 14 import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
 15 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 16 
 17 
 18 public class MyMqttClient implements MqttCallback {
 19 
 20     private static final MemoryPersistence DATA_STORE = new MemoryPersistence();
 21     private static final String topic = "mytopic";
 22 
 23     private String HOST = "127.0.0.1";
 24     private int PORT = 1883;
 25     private String USERNAME = "user";
 26     private String PASSWORD = "password";
 27     private String serverURIString = "tcp://" + HOST + ":" + PORT;
 28     
 29     String clientId = "client-1";
 30 
 31     MqttAsyncClient client;
 32     // Tokens
 33     IMqttToken connectToken;
 34     IMqttDeliveryToken pubToken;
 35 
 36 
 37     public static void main(String[] args) {
 38         MyMqttClient app = new MyMqttClient();
 39         app.asyncClient();
 40         try {
 41             Thread.sleep(20000);
 42             app.disconnect();
 43         } catch (Exception e) {
 44             e.printStackTrace();
 45         }
 46         System.out.println("end");
 47     }
 48 
 49     public void blockingClient() {
 50         
 51         try {
 52             MqttClient sampleClient = new MqttClient(serverURIString, clientId);
 53             MqttConnectOptions connOpts = new MqttConnectOptions();
 54             connOpts.setCleanSession(true);
 55             connOpts.setUserName(USERNAME);
 56             connOpts.setPassword(PASSWORD.toCharArray());
 57             System.out.println("Connecting to broker: " + serverURIString);
 58             sampleClient.connect(connOpts);
 59             sampleClient.subscribe("#", 1);
 60             System.out.println("Connected");
 61 //                System.out.println("Publish message: " + content);
 62 //                MqttMessage message = new MqttMessage(content.getBytes());
 63 //                message.setQos(qos);
 64             sampleClient.setCallback(this);
 65 //                sampleClient.publish(topic, message);
 66 //                System.out.println("Message published");
 67             try {
 68                 Thread.sleep(10000000);
 69                 System.out.println("Disconnected");
 70                 sampleClient.disconnect();
 71             } catch (Exception e) {
 72                 e.printStackTrace();
 73             }
 74 
 75         } catch (MqttException me) {
 76             System.out.println("reason " + me.getReasonCode());
 77             System.out.println("msg " + me.getMessage());
 78             System.out.println("loc " + me.getLocalizedMessage());
 79             System.out.println("cause " + me.getCause());
 80             System.out.println("except " + me);
 81             me.printStackTrace();
 82         }
 83     }
 84 
 85     public void asyncClient() {
 86         info(" MQTT init start.");
 87 
 88         // Tokens
 89         IMqttToken connectToken;
 90         IMqttDeliveryToken pubToken;
 91 
 92         // Client Options
 93         MqttConnectOptions options = new MqttConnectOptions();
 94         options.setCleanSession(false);
 95         options.setAutomaticReconnect(true);
 96 
 97         options.setUserName(USERNAME);
 98         options.setPassword(PASSWORD.toCharArray());
 99 
100         try {
101             client = new MqttAsyncClient(serverURIString, clientId);
102 
103             DisconnectedBufferOptions disconnectedOpts = new DisconnectedBufferOptions();
104             disconnectedOpts.setBufferEnabled(true);
105             client.setBufferOpts(disconnectedOpts);
106 
107             connectToken = client.connect(options);
108             connectToken.waitForCompletion();//异步变成了同步。可以用IMqttCallbackListen..在connect时候设置回调。
109             boolean isConnected = client.isConnected();
110             info("Connection isConnected: " + isConnected);
111 
112             if (connectToken.isComplete() && connectToken.getException() == null && client.isConnected()) {
113                 info("[Connect:] Success: "); //$NON-NLS-1$ //$NON-NLS-2$ 
114                 client.setCallback(this);
115 
116             } else {
117                 info("[Connect:] faild: "); //$NON-NLS-1$ //$NON-NLS-2$ 
118             }
119 
120 //             MqttTopic topic = client.getTopic(topic); 
121 //             topic.
122             //setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息    
123 //                options.setWill(topic, "close".getBytes(), 2, true);  
124 
125             IMqttToken subToken = client.subscribe("#", 1);
126 
127             subToken.waitForCompletion(1000);
128 
129             if (subToken.isComplete()) {
130                 info("subToken  complete.");
131                 if (subToken.getException() != null) {
132                     info("Topics Subscription failed." + subToken.getException()); //$NON-NLS-1$ 
133                 }
134             } else {
135                 info("subToken not complete.");
136                 if (subToken.getException() != null) {
137                     info("Topics Subscription failed." + subToken.getException()); //$NON-NLS-1$ 
138                 }
139             }
140 
141             info("init end");
142 
143         } catch (MqttException e) {
144             // TODO Auto-generated catch block
145             e.printStackTrace();
146         }
147 
148     }
149 
150 //String clientId, String topic, String message
151     public void send() {
152         String topic;
153         String message;
154         info("===Send Message start.===");
155         message = "Hello, boy.";
156         
157 
158         boolean isConnected = client.isConnected();
159         if (!isConnected) {
160             //no need. it will auto reconnect and send.
161         }
162 
163         // Publish Message
164         try {
165             pubToken = client.publish(topic, new MqttMessage(message.getBytes()));
166 
167             info("Publish attempted: isComplete:" + pubToken.isComplete());
168 
169             pubToken.waitForCompletion();
170         } catch (MqttPersistenceException e) {
171             // TODO Auto-generated catch block
172             e.printStackTrace();
173         } catch (MqttException e) {
174             // TODO Auto-generated catch block
175             e.printStackTrace();
176         }
177 
178         // Check that Message has been delivered
179         info("Message Delivered: " + pubToken.isComplete());
180         info("=== send end.====");
181     }
182 
183     void disconnect() {
184         IMqttToken disconnectToken;
185         try {
186             disconnectToken = client.disconnect();
187             disconnectToken.waitForCompletion();
188             client.close();
189         } catch (MqttException e) {
190             // TODO Auto-generated catch block
191             e.printStackTrace();
192         }
193         client = null;
194     }
195 
196     void info(String s) {
197         System.out.println(s);
198     }
199 
200     public void connectionLost(Throwable thrwbl) {
201         // TODO Auto-generated method stub
202         info("connectionLost");
203 
204         info("MQTT is disconnected from topic: {}. Message: {}. Cause: {}" + topic + thrwbl.getMessage() + thrwbl.getCause().getMessage());
205         thrwbl.printStackTrace();
206 
207     }
208 
209     public void deliveryComplete(IMqttDeliveryToken arg0) {
210         // TODO Auto-generated method stub
211         info("deliveryComplete");
212 
213     }
214 
215     public void messageArrived(String arg0, MqttMessage arg1) throws Exception {
216         // TODO Auto-generated method stub
217         String message = new String(arg1.getPayload());
218         String topic = arg0;
219 
220         info("xxx Receive : topic=" + topic + "; message=" + message);
221 
222     }
223 }
View Code
技术分享
 1 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 2     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 3     <modelVersion>4.0.0</modelVersion>
 4 
 5     <groupId>com.italktv.mqtt.client</groupId>
 6     <artifactId>mqttclient</artifactId>
 7     <version>0.0.1-SNAPSHOT</version>
 8     <packaging>jar</packaging>
 9 
10     <name>mqttclient</name>
11     <url>http://maven.apache.org</url>
12 
13     <properties>
14         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
15     </properties>
16 
17     <dependencies>
18         <dependency>
19             <groupId>junit</groupId>
20             <artifactId>junit</artifactId>
21             <version>3.8.1</version>
22             <scope>test</scope>
23         </dependency>
24 
25 
26         <dependency>
27             <groupId>org.eclipse.paho</groupId>
28             <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
29             <version>1.1.0</version>
30         </dependency>
31 
32     </dependencies>
33 </project>
View Code

上面是maven管理项目的pom.xml

MQTT 消息 发布 订阅

标签:locking   意思   url   method   技术分享   sdn   sync   archive   qos   

原文地址:http://www.cnblogs.com/bigben0123/p/5985562.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!