标签:
最近在一家SaaS企业使用MQTT开发IM消息推送服务,在此把开发中遇到的一些问题记录下来。该项目目前仍在商用中。完整的消息服务包括4个模块:协议(Protocol)、信令(Signal)、规则(Rule)、状态(Status),本文主要介绍协议(Protocol)部分。
主要技术涉及到MongoDB,webservice,httpclient,Mqtt等
protocol部分拆分为四个模块类来实现,这样做是为了以后有更好的扩展性
首先看一下我们的主类,主要是mqtt基础方法的一个框架
public class MqttProtocol { private static Logger logger = Logger.getLogger(MqttProtocol.class); public static final String HOST = "tcp://xx.xx.xx.xx:1883"; private static final String CLIENTID = "yyyy"; private MqttClient client; private MqttConnectOptions options = new MqttConnectOptions(); //private String userName = "admin"; //private String passWord = "public"; public MqttMessage message; private PushCallback callback; /** * 用于初始化mqttclient客户端,设置回调函数,同时连接mqtt服务器 * @throws MqttException */ public MqttProtocol() throws MqttException { //MemoryPersistence设置clientid的保存形式,默认为以内存保存 client = new MqttClient(HOST, CLIENTID, new MemoryPersistence()); callback = new PushCallback(); client.setCallback(callback); options = new MqttConnectOptions(); options.setCleanSession(false); options.setKeepAliveInterval(60); connect(); } /** * 连接mqtt消息服务器,同时设置了断开重连的功能,主要是为了高可用性考虑,在断网服务器崩溃时候我们的程序仍然不会终止 */ private void connect() { SimpleDateFormat sdf= new SimpleDateFormat(Constant.DATE_FORMAT_MDYHMS); System.out.println(sdf.format(System.currentTimeMillis())); boolean tryConnecting = true; while (tryConnecting) { try { client.connect(options); } catch (Exception e1) { System.out.println("Connection attempt failed with '"+e1.getCause()+ "'. Retrying."); } if (client.isConnected()) { System.out.println("Connected."); tryConnecting = false; } else { pause(); } } } private void pause() { try { Thread.sleep(1000); } catch (InterruptedException e) { // Error handling goes here... 
} } /** * * @param topic * @param qos * @throws MqttPersistenceException * @throws MqttException * 订阅相关主题 */ public void subscribe(String topic , int qos) throws MqttPersistenceException, MqttException { client.subscribe(topic, qos); } /** * * @throws MqttPersistenceException * @throws MqttException * 断开连接服务器 */ public void disconnect() throws MqttPersistenceException, MqttException { client.disconnect(); } /** * * @author binshi *实现mqttcallback接口,主要用于接收消息后的处理方法 */ private class PushCallback implements MqttCallback { /** * 断开后 系统会自动调用这个函数,同时在这个函数里进行重连操作 */ public void connectionLost(Throwable cause) { // 连接丢失后,一般在这里面进行重连 System.out.println("连接断开,可以做重连"); connect(); try { subscribe(Constant.TOPIC_MQTT_PROTOCOL, 2); } catch (MqttPersistenceException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * 消息成功传送后,系统会自动调用此函数,表明成功向topic发送消息 */ @Override public void deliveryComplete(IMqttDeliveryToken arg0) { // TODO Auto-generated method stub System.out.println("deliveryComplete---------" + arg0.isComplete()); } /** * 连接mongo数据库,返回关于具体collection的Mongocollection * @param collectionname * @return */ public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.println(topic); SimpleDateFormat sdf= new SimpleDateFormat(Constant.DATE_FORMAT_MDYHMS); System.out.println(sdf.format(System.currentTimeMillis())); System.out.println("接收消息主题 : " + topic); System.out.println("接收消息Qos : " + message.getQos()); System.out.println("接收消息内容 : " + new String(message.getPayload())); //1 抽取事件信令消息 String messagejudge=new String(message.getPayload()); System.out.println("忽略所有robot消息以及offline离线消息"); JSONObject jo=new JSONObject(); try { jo=JSONObject.fromObject(messagejudge); } catch (Exception e) { e.printStackTrace(); } String from=jo.getString("from"); System.out.println("获得from"+from); System.out.println("确定消息是否包含offline,如果包含取得offline,为1就不处理"); String 
offline=null; if(messagejudge.contains("offline")) { offline=jo.getString("offline"); } if((offline==null)&&(!from.contains("robot"))) { System.out.println("处理非系统消息和非离线消息"); String type=jo.getString("type"); System.out.println("获得type"+type); if(type.equals("shakehand")) { System.out.println("处理shakehand消息"); String admin="doyounkowwhy"; if(jo.toString().contains("admin")) { admin=jo.getString("admin"); } System.out.println("取得admin 如果为1定义为客服,否则为普通用户 admin为"+admin); if(admin.equals("1")) { System.out.println("处理客服握手消息"); System.out.println("发送握手成功消息"); MqttTopic retopic=client.getTopic(topic); MsgOperation.sendSysMsgToClient(from,"0", "1005", "握手成功", null,retopic); System.out.println("向客户端发送离线未接收的消息"); String convid=jo.getString("convid"); String database="dolina"; String collection="messages"; MongoDBDao.getMongoDBDaoInstance().sendOfflineMsgToClient(from, convid,retopic,database,collection); } else { System.out.println("处理普通用户的握手消息"); String appid=jo.getString("appid"); String pageid=jo.getString("pageid"); String convid=jo.getString("convid"); MqttTopic retopic=client.getTopic(topic); MsgOperation.sendShakeHandInfo(from,convid,appid,pageid,retopic); } } else if(type.equals("text")||type.equals("image")) { System.out.println("处理图片和文字消息"); String tmpindex=jo.getString("tmpindex"); String convid=jo.getString("convid"); MqttTopic retopic=client.getTopic(topic); MsgOperation.getTextMsg( tmpindex, from, convid, retopic); System.out.println("保存图片文字消息"); String database="dolina"; String collection="messages"; MongoDBDao.getMongoDBDaoInstance().saveTextMsg(database,collection,jo); } else if(type.equals("ack")) { System.out.println("处理ack消息"); String tmpindex=jo.getString("tmpindex"); String convid=jo.getString("convid"); String database="dolina"; String collection="messages"; MongoDBDao.getMongoDBDaoInstance().getAck(tmpindex,convid,from,database,collection); } } } } /** * * @param args * @throws MqttException * 整个工程从这里开始执行,生成可执行jar包,这个设置为主类。 */ public static void 
main(String[] args) throws MqttException { MqttProtocol signal = new MqttProtocol(); signal.message = new MqttMessage(); /** server.message.setQos(2); server.message.setRetained(false); server.message.setPayload("给客户端124推送的信息".getBytes()); server.subscribe("/engyne/1/7/169573fcbc96a816281192222", 2); */ signal.subscribe(Constant.TOPIC_MQTT_PROTOCOL, 2); System.out.println(signal.message.isRetained() + "------ratained状态"); } }接下来使我们的远程连接模块,主要是通过给定的url调用远程接口
public class RemoteOperation { private static Logger logger = Logger.getLogger(MqttProtocol.class); public static JSONObject remoteCall(String url) throws HttpException, IOException { HttpClient httpClient = new HttpClient(); GetMethod method =null ; method=new GetMethod(url); int retcode = httpClient.executeMethod(method); if (retcode != HttpStatus.SC_OK) {// 发送不成功 logger.info("远程调用出错"); return null; } else { String body = method.getResponseBodyAsString(); logger.info(body+"远程调用php成功"); JSONObject jsonObject=new JSONObject(); try { jsonObject=JSONObject.fromObject(body); } catch (Exception e) { e.printStackTrace(); } if (method != null) { method.releaseConnection(); } return jsonObject; } } }
public class MongoDBDao { private static Logger logger = Logger.getLogger(MongoDBDao.class); /** * MongoClient的实例代表数据库连接池,是线程安全的,可以被多线程共享,客户端在多线程条件下仅维持一个实例即可 * Mongo是非线程安全的,目前mongodb API中已经建议用MongoClient替代Mongo */ private MongoClient mongoClient = null; /** * * 私有的构造函数 * 作者:shibin */ private MongoDBDao(){ if(mongoClient == null){ String url = Constant.MONGO_MQTT_URL; String user = Constant.MONGO_MQTT_USER; String password = Constant.MONGO_MQTT_PASSWORD; String database = Constant.MONGO_MQTT_DATABASE; int port = 27017; ServerAddress serverAddress = new ServerAddress(url, port); List<ServerAddress> serverAddresses = new ArrayList<ServerAddress>(); serverAddresses.add(serverAddress); MongoCredential credential = MongoCredential.createCredential(user, database, password.toCharArray()); List<MongoCredential> credentials = new ArrayList<MongoCredential>(); credentials.add(credential); mongoClient = new MongoClient(serverAddresses, credentials); System.out.println(mongoClient); System.out.println("初始化client完成"); } } /********单例模式声明开始,采用饿汉式方式生成,保证线程安全********************/ //类初始化时,自行实例化,饿汉式单例模式 private static final MongoDBDao mongoDBDao = new MongoDBDao(); /** * * 方法名:getMongoDBDaoImplInstance * 作者:shibin * * 描述:单例的静态工厂方法 * @return */ public static MongoDBDao getMongoDBDaoInstance(){ return mongoDBDao; } public void sendOfflineMsgToClient(String from, String convid,MqttTopic retopic,String database,String collection) throws MqttPersistenceException, MqttException { System.out.println("获得message的连接"); MongoDatabase mongoDatabase = mongoClient.getDatabase(database); MongoCollection mongoCollection = mongoDatabase.getCollection(collection); System.out.println("取得convid所对应的msg列表"); BasicDBObject query = new BasicDBObject(); query.put("_id", convid); FindIterable<Document> iterable=null; iterable = mongoCollection.find(query); if(iterable.first()!=null) { System.out.println(iterable.first()); String res= iterable.first().toJson(); JSONObject jo=new JSONObject(); try { 
jo=JSONObject.fromObject(res); } catch (Exception e) { e.printStackTrace(); } JSONArray jsonArray=jo.getJSONArray("msg"); for(int i=0;i<jsonArray.length();i++) { String read=jsonArray.getJSONObject(i).getString("read"); System.out.println("获得msg对应的第"+i+"条记录的read信息"+read); System.out.println("判断read是否包含from的信息,如果不包含且这条消息不是他自己发的就给她发送这条消息"); if(!read.contains(from)&&!jsonArray.getJSONObject(i).getString("from").equals(from)) { System.out.println("获得这条消息的原型,然后加上offline=1并发送消息"); JSONObject msg=jsonArray.getJSONObject(i); msg.put("offline", "1"); retopic.publish(msg.toString().getBytes(), 0, false); } else { System.out.println("no offline message for "+from); } } } } public void saveTextMsg(String database,String collection,JSONObject jo) { MongoDatabase mongoDatabase = mongoClient.getDatabase(database); MongoCollection mongoCollection = mongoDatabase.getCollection(collection); BasicDBObject query = new BasicDBObject(); String convid=jo.getString("convid"); query.put("_id", convid); FindIterable iterable; iterable = mongoCollection.find(query); System.out.println("更新message之前的值"+iterable.first()); Bson filter = Filters.eq("_id", convid); Document content = new Document(); String type=jo.getString("type"); if(type.equals("text")) { String contentMsg=jo.getJSONObject("content").getString("content"); content.put("content", contentMsg); } else { String url=jo.getJSONObject("content").getString("url"); content.put("url", url); } String admin=jo.getJSONObject("extra").getString("admin"); String headimgurl=jo.getJSONObject("extra").getString("headimgurl"); String nickname=jo.getJSONObject("extra").getString("nickname"); String from=jo.getString("from"); String tmpindex=jo.getString("tmpindex"); Document extra = new Document(); extra.put("nickname", nickname); Document doc = new Document(); doc.put("from",from ); ArrayList<String> read=new ArrayList<String>(); doc.put("read", read); Document tdoc = new Document(); tdoc.put("msg", doc); UpdateOptions updateOptions=new 
UpdateOptions(); updateOptions.upsert(true); mongoCollection.updateOne(filter, new Document("$addToSet", tdoc), updateOptions); iterable = mongoCollection.find(query); System.out.println("更新message之后的值"+iterable.first()); } public void getAck(String tmpindex,String convid,String from,String database,String collection) { System.out.println("接收到ack消息后更新message中的read字段"); MongoDatabase mongoDatabase = mongoClient.getDatabase(database); MongoCollection mongoCollection = mongoDatabase.getCollection(collection); BasicDBObject query = new BasicDBObject(); query.put("_id", convid); query.put("msg.tmpindex", tmpindex); BasicDBObject query1 = new BasicDBObject(); query1.put("_id", convid); FindIterable iterable; FindIterable iterable2; iterable = mongoCollection.find(query1); iterable2 = mongoCollection.find(query); System.out.println("更新message满足id过滤条件之前的值"+iterable.first()); System.out.println("更新message满足id和tmpindex过滤条件之前的值"+iterable2.first()); if(iterable2.first()!=null) { Document doc = new Document(); doc.put("msg.$.read", from); UpdateOptions updateOptions=new UpdateOptions(); updateOptions.upsert(true); mongoCollection.updateOne(query, new Document("$addToSet", doc), updateOptions); } iterable = mongoCollection.find(query1); System.out.println("更新messages之后的值"+iterable.first()); } }
标签:
原文地址:http://blog.csdn.net/a2274335673/article/details/51774919