码迷,mamicode.com
首页 > 编程语言 > 详细

java后端IM消息推送服务开发——协议

时间:2016-06-28 14:34:06      阅读:405      评论:0      收藏:0      [点我收藏+]

标签:

最近在一家saas企业使用Mqtt开发IM消息推送服务,把开发中的一些问题记录下来,项目仍在商用中,完整的消息服务包括4个模块---协议protocol,信令Signal,规则Rule,状态Status,这个主题主要是协议protocol部分。

主要技术涉及到MongoDB,webservice,httpclient,Mqtt等

protocol分为四个模块类来实现,当然这是为了以后的扩展性比较好

首先看一下我们的主类,主要是mqtt基础方法的一个框架

public class MqttProtocol 
{
	private static Logger logger = Logger.getLogger(MqttProtocol.class);  
    public static final String HOST = "tcp://xx.xx.xx.xx:1883";
    private static final String CLIENTID = "yyyy";
    private MqttClient client;
    private MqttConnectOptions options = new MqttConnectOptions();
    //private String userName = "admin";
    //private String passWord = "public";
    public MqttMessage message;
    private PushCallback callback;
    /**
     * 用于初始化mqttclient客户端,设置回调函数,同时连接mqtt服务器
     * @throws MqttException
     */
       public MqttProtocol() throws MqttException 
       {
           //MemoryPersistence设置clientid的保存形式,默认为以内存保存
           client = new MqttClient(HOST, CLIENTID, new MemoryPersistence());
           callback = new PushCallback();
           client.setCallback(callback);
           options = new MqttConnectOptions();
           options.setCleanSession(false);
           options.setKeepAliveInterval(60);
           connect();
       }
       /**
        * 连接mqtt消息服务器,同时设置了断开重连的功能,主要是为了高可用性考虑,在断网服务器崩溃时候我们的程序仍然不会终止
        */
       private void connect() 
       {
      	 SimpleDateFormat sdf= new SimpleDateFormat(Constant.DATE_FORMAT_MDYHMS);
      	 System.out.println(sdf.format(System.currentTimeMillis()));
           boolean tryConnecting = true;
           while (tryConnecting) {
             try {
               client.connect(options);
             } catch (Exception e1) {
          	   System.out.println("Connection attempt failed with '"+e1.getCause()+
                    "'. Retrying.");
             }
             if (client.isConnected()) {
          	   System.out.println("Connected.");
               tryConnecting = false;
             } else {
               pause();
             }
           }
       }
       private void pause() {
   	    try {
   	      Thread.sleep(1000);
   	    } catch (InterruptedException e) {
   	      // Error handling goes here...
   	    }
   	  }
       /**
        * 
        * @param topic
        * @param qos
        * @throws MqttPersistenceException
        * @throws MqttException
        * 订阅相关主题
        */
       public void subscribe(String topic , int qos) throws MqttPersistenceException,
       		MqttException 
       {
      	 client.subscribe(topic, qos);    	
       }
       /**
        * 
        * @throws MqttPersistenceException
        * @throws MqttException
        * 断开连接服务器
        */
       public void disconnect() throws MqttPersistenceException,
  		MqttException 
  	 {
      	 client.disconnect();
       }
       /**
        * 
        * @author binshi
        *实现mqttcallback接口,主要用于接收消息后的处理方法
        */
       private class PushCallback implements MqttCallback { 
      	 /**
      	  * 断开后 系统会自动调用这个函数,同时在这个函数里进行重连操作
      	  */
   	    public void connectionLost(Throwable cause) {  
   	        // 连接丢失后,一般在这里面进行重连  
   	    	System.out.println("连接断开,可以做重连");  
   	        connect();
   	        try {
				subscribe(Constant.TOPIC_MQTT_PROTOCOL, 2);
			} catch (MqttPersistenceException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			} catch (MqttException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
   	    } 
   	    /**
   	     * 消息成功传送后,系统会自动调用此函数,表明成功向topic发送消息
   	     */
   		@Override
  		public void deliveryComplete(IMqttDeliveryToken arg0) {
  			// TODO Auto-generated method stub
   			System.out.println("deliveryComplete---------" + arg0.isComplete());
  		}
   		/**
   		 * 连接mongo数据库,返回关于具体collection的Mongocollection
   		 * @param collectionname
   		 * @return
   		 */
   		
   		public void messageArrived(String topic, MqttMessage message) throws Exception
   		{
			System.out.println(topic);
	    	SimpleDateFormat sdf= new SimpleDateFormat(Constant.DATE_FORMAT_MDYHMS);
	    	System.out.println(sdf.format(System.currentTimeMillis()));
	    	System.out.println("接收消息主题 : " + topic);  
	    	System.out.println("接收消息Qos : " + message.getQos());  
	    	System.out.println("接收消息内容 : " + new String(message.getPayload()));
	    	//1 抽取事件信令消息
		    String messagejudge=new String(message.getPayload());
		    System.out.println("忽略所有robot消息以及offline离线消息");
		    JSONObject jo=new JSONObject();
			try {
				 jo=JSONObject.fromObject(messagejudge);		
			} catch (Exception e) {							
				e.printStackTrace();
			}
			String from=jo.getString("from");
			System.out.println("获得from"+from);
			System.out.println("确定消息是否包含offline,如果包含取得offline,为1就不处理");
			String offline=null;
			if(messagejudge.contains("offline"))
			{
				offline=jo.getString("offline");
			}
			if((offline==null)&&(!from.contains("robot")))
			{
				System.out.println("处理非系统消息和非离线消息");
				String type=jo.getString("type");
				System.out.println("获得type"+type);
				if(type.equals("shakehand"))
				{
					System.out.println("处理shakehand消息");
					String admin="doyounkowwhy";
					if(jo.toString().contains("admin"))
					{
						admin=jo.getString("admin");
					}
					System.out.println("取得admin 如果为1定义为客服,否则为普通用户 admin为"+admin);
					if(admin.equals("1"))
					{
						System.out.println("处理客服握手消息");
						System.out.println("发送握手成功消息");
						MqttTopic retopic=client.getTopic(topic);
						MsgOperation.sendSysMsgToClient(from,"0", "1005", "握手成功", null,retopic);
						System.out.println("向客户端发送离线未接收的消息");
						String convid=jo.getString("convid");
						String database="dolina";
						String collection="messages";
						MongoDBDao.getMongoDBDaoInstance().sendOfflineMsgToClient(from, convid,retopic,database,collection);
					}
					else
					{
						System.out.println("处理普通用户的握手消息");
						String appid=jo.getString("appid");
						String pageid=jo.getString("pageid");
						String convid=jo.getString("convid");
						MqttTopic retopic=client.getTopic(topic);
						MsgOperation.sendShakeHandInfo(from,convid,appid,pageid,retopic);
					}
				}
				else if(type.equals("text")||type.equals("image"))
				{
					System.out.println("处理图片和文字消息");
					String tmpindex=jo.getString("tmpindex");
					String convid=jo.getString("convid");
					MqttTopic retopic=client.getTopic(topic);
					MsgOperation.getTextMsg( tmpindex, from, convid, retopic);
					System.out.println("保存图片文字消息");
					String database="dolina";
					String collection="messages";
					MongoDBDao.getMongoDBDaoInstance().saveTextMsg(database,collection,jo);
				}
				else if(type.equals("ack"))
				{
					System.out.println("处理ack消息");
					String tmpindex=jo.getString("tmpindex");
					String convid=jo.getString("convid");
					String database="dolina";
					String collection="messages";
					MongoDBDao.getMongoDBDaoInstance().getAck(tmpindex,convid,from,database,collection);
				}
			}
		    
		   
   		}
   	}
       /**
        * 
        * @param args
        * @throws MqttException
        * 整个工程从这里开始执行,生成可执行jar包,这个设置为主类。
        */
       public static void main(String[] args) throws MqttException 
       {  
    	   MqttProtocol signal = new MqttProtocol();
           signal.message = new MqttMessage();
           /**
           server.message.setQos(2);
           server.message.setRetained(false);
           server.message.setPayload("给客户端124推送的信息".getBytes()); 
           server.subscribe("/engyne/1/7/169573fcbc96a816281192222", 2);
           */
           signal.subscribe(Constant.TOPIC_MQTT_PROTOCOL, 2);
           System.out.println(signal.message.isRetained() + "------ratained状态");
  	}
}
接下来使我们的远程连接模块,主要是通过给定的url调用远程接口

public class RemoteOperation
{
	private static Logger logger = Logger.getLogger(MqttProtocol.class);  
	public static JSONObject remoteCall(String url) throws HttpException, IOException
		{
			HttpClient httpClient = new HttpClient(); 
	    	GetMethod method =null ;
	    	method=new GetMethod(url);
	    	int retcode = httpClient.executeMethod(method);
	    	if (retcode != HttpStatus.SC_OK)
	    	{// 发送不成功  
	          logger.info("远程调用出错"); 
	          return null;
	        }
	    	else 
	        {  
	        	String body = method.getResponseBodyAsString();  
	        	logger.info(body+"远程调用php成功");
	        	JSONObject jsonObject=new JSONObject();
				try {
					 jsonObject=JSONObject.fromObject(body);		
				} catch (Exception e) {							
					e.printStackTrace();
				}
				if (method != null) 
			    {  
			        method.releaseConnection();  
			    } 
				return jsonObject;
	       } 
	      
	       
		}
}

下面是Mongo数据库的相关操作的一个封装,设计为单例模式,相当于每次都使用同一个client打开连接,类似于连接池的概念,当然业务逻辑部分可以更换

public class MongoDBDao
{
	private static Logger logger = Logger.getLogger(MongoDBDao.class);
	/** 
     * MongoClient的实例代表数据库连接池,是线程安全的,可以被多线程共享,客户端在多线程条件下仅维持一个实例即可 
     * Mongo是非线程安全的,目前mongodb API中已经建议用MongoClient替代Mongo 
     */  
    private MongoClient mongoClient = null;  
    /** 
     *  
     * 私有的构造函数 
     * 作者:shibin
     */  
    private MongoDBDao(){  
        if(mongoClient == null){  
        	String url = Constant.MONGO_MQTT_URL;
    	    String user = Constant.MONGO_MQTT_USER;
    	    String password = Constant.MONGO_MQTT_PASSWORD;
    	    String database = Constant.MONGO_MQTT_DATABASE;
    	    int port = 27017;
    	    ServerAddress serverAddress = new ServerAddress(url, port);
            List<ServerAddress> serverAddresses = new ArrayList<ServerAddress>();
            serverAddresses.add(serverAddress);
            MongoCredential credential = MongoCredential.createCredential(user, database, password.toCharArray());
            List<MongoCredential> credentials = new ArrayList<MongoCredential>();
            credentials.add(credential);
            mongoClient = new MongoClient(serverAddresses, credentials);
            System.out.println(mongoClient);
            System.out.println("初始化client完成");
        }
    }  
      
    /********单例模式声明开始,采用饿汉式方式生成,保证线程安全********************/  
      
    //类初始化时,自行实例化,饿汉式单例模式  
    private static final MongoDBDao mongoDBDao = new MongoDBDao();  
    /** 
     *  
     * 方法名:getMongoDBDaoImplInstance 
     * 作者:shibin 
     * 
     * 描述:单例的静态工厂方法 
     * @return 
     */  
    public static MongoDBDao getMongoDBDaoInstance(){  
        return mongoDBDao;  
    }  
    public  void sendOfflineMsgToClient(String from, String convid,MqttTopic retopic,String database,String collection) throws MqttPersistenceException, MqttException
	{
		System.out.println("获得message的连接");
		MongoDatabase mongoDatabase = mongoClient.getDatabase(database);
        MongoCollection mongoCollection = mongoDatabase.getCollection(collection);
		System.out.println("取得convid所对应的msg列表");
		BasicDBObject query = new BasicDBObject();
		query.put("_id", convid);
		FindIterable<Document> iterable=null;
		iterable = mongoCollection.find(query);
		if(iterable.first()!=null)
		{
			System.out.println(iterable.first());
			String res= iterable.first().toJson();
			 JSONObject jo=new JSONObject();
			try {
				 jo=JSONObject.fromObject(res);		
			} catch (Exception e) {							
				e.printStackTrace();
			}
			JSONArray jsonArray=jo.getJSONArray("msg");
			for(int i=0;i<jsonArray.length();i++)
			{
				String read=jsonArray.getJSONObject(i).getString("read");
				System.out.println("获得msg对应的第"+i+"条记录的read信息"+read);
				System.out.println("判断read是否包含from的信息,如果不包含且这条消息不是他自己发的就给她发送这条消息");
				if(!read.contains(from)&&!jsonArray.getJSONObject(i).getString("from").equals(from))
				{
					System.out.println("获得这条消息的原型,然后加上offline=1并发送消息");
					JSONObject msg=jsonArray.getJSONObject(i);
					msg.put("offline", "1");
					retopic.publish(msg.toString().getBytes(), 0, false);
				}
				else
				{
					System.out.println("no  offline message for "+from);
				}
			}
		}
	}
    public  void saveTextMsg(String database,String collection,JSONObject jo)
	{
    	MongoDatabase mongoDatabase = mongoClient.getDatabase(database);
        MongoCollection mongoCollection = mongoDatabase.getCollection(collection);
		BasicDBObject query = new BasicDBObject();
		String convid=jo.getString("convid");
    query.put("_id", convid);
    FindIterable iterable;
    iterable = mongoCollection.find(query);
    System.out.println("更新message之前的值"+iterable.first());
	Bson filter = Filters.eq("_id", convid);
	Document content = new Document();
	
	String type=jo.getString("type");
	if(type.equals("text"))
	{
		String contentMsg=jo.getJSONObject("content").getString("content");
		content.put("content", contentMsg);
	}
	else
	{
		String url=jo.getJSONObject("content").getString("url");
		content.put("url", url);
	}
	String admin=jo.getJSONObject("extra").getString("admin");
	String headimgurl=jo.getJSONObject("extra").getString("headimgurl");
	String nickname=jo.getJSONObject("extra").getString("nickname");
	String from=jo.getString("from");
	String tmpindex=jo.getString("tmpindex");
	Document extra = new Document();
	extra.put("nickname", nickname);
	Document doc = new Document();
	doc.put("from",from );
	ArrayList<String> read=new ArrayList<String>();
	doc.put("read", read);
	Document tdoc = new Document();
	tdoc.put("msg", doc);
	UpdateOptions updateOptions=new UpdateOptions();
	updateOptions.upsert(true);
	mongoCollection.updateOne(filter, new Document("$addToSet", tdoc), updateOptions);
    iterable = mongoCollection.find(query);
    System.out.println("更新message之后的值"+iterable.first());
	}
public  void getAck(String tmpindex,String convid,String from,String database,String collection)
	{
		System.out.println("接收到ack消息后更新message中的read字段");
		MongoDatabase mongoDatabase = mongoClient.getDatabase(database);
        MongoCollection mongoCollection = mongoDatabase.getCollection(collection);
   	BasicDBObject query = new BasicDBObject();
    query.put("_id", convid);
    query.put("msg.tmpindex", tmpindex);
    BasicDBObject query1 = new BasicDBObject();
   	query1.put("_id", convid);
   	FindIterable iterable;
	FindIterable iterable2;
   	iterable = mongoCollection.find(query1);
	iterable2 = mongoCollection.find(query);
   	System.out.println("更新message满足id过滤条件之前的值"+iterable.first());
	System.out.println("更新message满足id和tmpindex过滤条件之前的值"+iterable2.first());
   	if(iterable2.first()!=null)
   	{
   		Document doc = new Document();
	   	doc.put("msg.$.read", from);
	   	UpdateOptions updateOptions=new UpdateOptions();
	   	updateOptions.upsert(true);
	   	mongoCollection.updateOne(query, new Document("$addToSet", doc), updateOptions);
   	}
   	
   	iterable = mongoCollection.find(query1);
   	System.out.println("更新messages之后的值"+iterable.first());
		
	}
}

剩下的关于业务逻辑方面的就不多说了,主要是关于mqtt高可用性断开重连的功能以及mongo相关的操作

java后端IM消息推送服务开发——协议

标签:

原文地址:http://blog.csdn.net/a2274335673/article/details/51774919

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!