标签:
SimpleMqttClient.java
import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttTopic; public class SimpleMqttClient implements MqttCallback { MqttClient myClient; MqttConnectOptions connOpt; static final String BROKER_URL = "tcp://q.m2m.io:1883"; static final String M2MIO_DOMAIN = "<Insert m2m.io domain here>"; static final String M2MIO_STUFF = "things"; static final String M2MIO_THING = "<Unique device ID>"; static final String M2MIO_USERNAME = "<m2m.io username>"; static final String M2MIO_PASSWORD_MD5 = "<m2m.io password (MD5 sum of password)>"; // the following two flags control whether this example is a publisher, a subscriber or both static final Boolean subscriber = true; static final Boolean publisher = true; /** * * connectionLost * This callback is invoked upon losing the MQTT connection. * */ @Override public void connectionLost(Throwable t) { System.out.println("Connection lost!"); // code to reconnect to the broker would go here if desired } /** * * deliveryComplete * This callback is invoked when a message published by this client * is successfully received by the broker. * */ @Override public void deliveryComplete(MqttDeliveryToken token) { //System.out.println("Pub complete" + new String(token.getMessage().getPayload())); } /** * * messageArrived * This callback is invoked when a message is received on a subscribed topic. * */ @Override public void messageArrived(MqttTopic topic, MqttMessage message) throws Exception { System.out.println("-------------------------------------------------"); System.out.println("| Topic:" + topic.getName()); System.out.println("| Message: " + new String(message.getPayload())); System.out.println("-------------------------------------------------"); } /** * * MAIN * */ public static void main(String[] args) { SimpleMqttClient smc = new SimpleMqttClient(); smc.runClient(); } /** * * runClient * The main functionality of this simple example. * Create a MQTT client, connect to broker, pub/sub, disconnect. * */ public void runClient() { // setup MQTT Client String clientID = M2MIO_THING; connOpt = new MqttConnectOptions(); connOpt.setCleanSession(true); connOpt.setKeepAliveInterval(30); connOpt.setUserName(M2MIO_USERNAME); connOpt.setPassword(M2MIO_PASSWORD_MD5.toCharArray()); // Connect to Broker try { myClient = new MqttClient(BROKER_URL, clientID); myClient.setCallback(this); myClient.connect(connOpt); } catch (MqttException e) { e.printStackTrace(); System.exit(-1); } System.out.println("Connected to " + BROKER_URL); // setup topic // topics on m2m.io are in the form <domain>/<stuff>/<thing> String myTopic = M2MIO_DOMAIN + "/" + M2MIO_STUFF + "/" + M2MIO_THING; MqttTopic topic = myClient.getTopic(myTopic); // subscribe to topic if subscriber if (subscriber) { try { int subQoS = 0; myClient.subscribe(myTopic, subQoS); } catch (Exception e) { e.printStackTrace(); } } // publish messages if publisher if (publisher) { for (int i=1; i<=10; i++) { String pubMsg = "{\"pubmsg\":" + i + "}"; int pubQoS = 0; MqttMessage message = new MqttMessage(pubMsg.getBytes()); message.setQos(pubQoS); message.setRetained(false); // Publish the message System.out.println("Publishing to topic \"" + topic + "\" qos " + pubQoS); MqttDeliveryToken token = null; try { // publish message to broker token = topic.publish(message); // Wait until the message has been delivered to the broker token.waitForCompletion(); Thread.sleep(100); } catch (Exception e) { e.printStackTrace(); } } } // disconnect try { // wait to ensure subscribed messages are delivered if (subscriber) { Thread.sleep(5000); } myClient.disconnect(); } catch (Exception e) { e.printStackTrace(); } } }
标签:
原文地址:http://my.oschina.net/u/221951/blog/491759