码迷,mamicode.com
首页 > 其他好文 > 详细

自定义协议传输

时间:2018-04-04 16:16:43      阅读:218      评论:0      收藏:0      [点我收藏+]

标签:connected   http   min   slf4j   col   play   boolean   java.net   dispose   

step1:协议格式

技术分享图片

step2:根据协议定义出对应的模型

技术分享图片
  1 package com.superb.mina.entity;
  2 
  3 import java.io.ByteArrayOutputStream;
  4 import java.io.DataOutputStream;
  5 import java.io.IOException;
  6 import java.io.Serializable;
  7 import java.nio.charset.Charset;
  8 
  9 /**
 10  * 协议包
 11  * @author sundg
 12  *
 13  * 2018年3月30日上午10:55:33
 14  */
 15 public class BaseMsg implements Serializable {
 16     /** 序列号*/
 17     private static final long serialVersionUID = -4614096987747485330L;
 18     public static int HEAD_SIZE =18;
 19     /**报文总长度*/
 20     private Integer length;
 21     /**序列号*/
 22     private Integer sequence;
 23     /**命令号*/
 24     private Integer command;
 25     /**保留字*/
 26     private Integer reserved;
 27     /**数据段*/
 28     private byte[] data;
 29     
 30     private Integer separator=0xFFFF;
 31     
 32     
 33     public static long getSerialversionuid() {
 34         return serialVersionUID;
 35     }
 36 
 37     public void setLength(Integer length) {
 38         this.length = length;
 39     }
 40 
 41     public Integer getLength() {
 42         return length;
 43     }
 44 
 45     public Integer getSequence() {
 46         return sequence;
 47     }
 48 
 49     public void setSequence(Integer sequence) {
 50         this.sequence = sequence;
 51     }
 52 
 53     public Integer getCommand() {
 54         return command;
 55     }
 56 
 57     public void setCommand(Integer command) {
 58         this.command = command;
 59     }
 60 
 61     public Integer getReserved() {
 62         return reserved;
 63     }
 64 
 65     public void setReserved(Integer reserved) {
 66         this.reserved = reserved;
 67     }
 68 
 69     public byte[] getData() {
 70         return data;
 71     }
 72 
 73     public void setData(byte[] data) {
 74         this.data = data;
 75         if (data != null) {
 76             this.length = HEAD_SIZE + data.length;
 77         } else {
 78             this.length = HEAD_SIZE;
 79         }
 80     }
 81 
 82     public Integer getSeparator() {
 83         return separator;
 84     }
 85 
 86     public void setSeparator(Integer separator) {
 87         this.separator = separator;
 88     }
 89     
 90     /**
 91      * 将要发送的信息保存为二进制流进行传输
 92      * @return
 93      */
 94     public byte[] toBytes() {
 95         byte[] ret = null;
 96         try {
 97             ByteArrayOutputStream bos = new ByteArrayOutputStream(this.length);
 98             DataOutputStream dos = new DataOutputStream(bos);
 99 
100             dos.writeInt(this.length);
101             dos.writeInt(this.sequence);
102             dos.writeInt(this.command);
103             if(this.reserved==null){
104                 dos.writeInt(0);
105             }else{
106                 dos.writeInt(this.reserved);
107             }
108             if (this.data != null) {
109                 dos.write(this.data);
110             }
111             dos.writeShort(0xFFFF);
112             ret = bos.toByteArray();
113         } catch (IOException e) {
114             e.printStackTrace();
115         }
116 
117         return ret;
118     }
119     
120 }
协议包

step3:自定义编码解码器

技术分享图片
 1 package com.superb.mina.coder;
 2 
 3 import org.apache.mina.core.session.IoSession;
 4 import org.apache.mina.filter.codec.ProtocolCodecFactory;
 5 import org.apache.mina.filter.codec.ProtocolDecoder;
 6 import org.apache.mina.filter.codec.ProtocolEncoder;
 7 
 8 public class SuperbProtocolCodecFactory implements ProtocolCodecFactory {
 9     private ProtocolDecoder decoder;
10     private ProtocolEncoder encoder;
11 
12     public SuperbProtocolCodecFactory() {
13         this.encoder = new SuperbProtocolEncoder();
14         this.decoder = new SuperbProtocolDecoder();
15     }
16 
17     @Override
18     public ProtocolDecoder getDecoder(IoSession ioSession) throws Exception {
19         return this.decoder;
20     }
21 
22     @Override
23     public ProtocolEncoder getEncoder(IoSession ioSession) throws Exception {
24         return this.encoder;
25     }
26 }
编码解码器工厂
技术分享图片
 1 package com.superb.mina.coder;
 2 
 3 import org.apache.mina.core.buffer.IoBuffer;
 4 import org.apache.mina.core.session.IoSession;
 5 import org.apache.mina.filter.codec.ProtocolEncoder;
 6 import org.apache.mina.filter.codec.ProtocolEncoderOutput;
 7 
 8 import com.superb.mina.entity.BaseMsg;
 9 
10 public class SuperbProtocolEncoder implements ProtocolEncoder {
11 
12     @Override
13     public void dispose(IoSession session) throws Exception {
14         // nothing to dispose
15     }
16 
17     @Override
18     public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
19         BaseMsg msg = (BaseMsg) message;
20         byte[] bys = msg.toBytes();
21 
22         IoBuffer buffer = IoBuffer.allocate(bys.length, false);
23         buffer.put(bys);
24         buffer.flip();
25         out.write(buffer);
26     }
27 }
编码器
技术分享图片
 1 package com.superb.mina.coder;
 2 
 3 import org.apache.mina.core.buffer.BufferDataException;
 4 import org.apache.mina.core.buffer.IoBuffer;
 5 import org.apache.mina.core.session.IoSession;
 6 import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
 7 import org.apache.mina.filter.codec.ProtocolDecoderOutput;
 8 
 9 import com.superb.mina.entity.BaseMsg;
10 
11 
12 public class SuperbProtocolDecoder extends CumulativeProtocolDecoder {
13     private boolean prefixedDataAvailable(IoBuffer in) {
14         if (in.remaining() < 4) {
15             return false;
16         }
17 
18         int dataLength = in.getInt(in.position());
19 
20         if (dataLength < BaseMsg.HEAD_SIZE) {
21             throw new BufferDataException("dataLength: " + dataLength);
22         }
23 
24         return in.remaining() >= dataLength;
25     }
26 
27     @Override
28     protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
29         if (!prefixedDataAvailable(in)) {
30             return false;
31         }
32 
33         BaseMsg msg = new BaseMsg();
34         int length = in.getInt();
35         msg.setLength(length);
36         msg.setSequence(in.getInt());
37         msg.setCommand(in.getInt());
38         msg.setReserved(in.getInt());
39         
40         int body_len = length - BaseMsg.HEAD_SIZE;
41 
42         if (body_len < 0 ) {
43             throw new Exception("body length error.(" + body_len + ")");
44         }
45 
46         if (body_len > 0) {
47             byte[] body = new byte[body_len];
48             in.get(body);
49             msg.setData(body);
50         }
51         in.getShort();
52         msg.setSeparator(0xFFFF);
53         out.write(msg);
54         return true;
55     }
56 }
解码器

step4:编写对应的客户端与服务器端

技术分享图片
 1 package com.superb.mina.clientServer;
 2 
 3 import java.net.InetSocketAddress;
 4 import java.util.Date;
 5 import java.util.HashMap;
 6 import java.util.Map;
 7 
 8 import org.apache.mina.core.future.ConnectFuture;
 9 import org.apache.mina.core.service.IoConnector;
10 import org.apache.mina.core.session.IoSession;
11 import org.apache.mina.filter.codec.ProtocolCodecFilter;
12 import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
13 import org.apache.mina.transport.socket.nio.NioSocketConnector;
14 import org.slf4j.Logger;
15 import org.slf4j.LoggerFactory;
16 
17 import com.superb.mina.clientServer.handler.ClientServerHandler;
18 import com.superb.mina.coder.MinaProtobufDecoder;
19 import com.superb.mina.coder.MinaProtobufEncoder;
20 import com.superb.mina.coder.SuperbProtocolCodecFactory;
21 import com.superb.mina.coder.SuperbProtocolDecoder;
22 import com.superb.mina.coder.SuperbProtocolEncoder;
23 import com.superb.mina.entity.BaseMsg;
24 import com.superb.mina.entity.Init;
25 
26 
27 public class ClientServer {
28     private static Logger logger=LoggerFactory.getLogger(ClientServer.class);
29     private static String HOST="192.168.1.115";
30     private static int PORT=8003;
31     public static void main(String[] args) {
32         IoConnector connector=null;
33         connector=new NioSocketConnector();
34         connector.setConnectTimeoutMillis(30000);
35         connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new SuperbProtocolCodecFactory()));
36         connector.setHandler(new ClientServerHandler());
37         IoSession session=null;
38         try {
39             ConnectFuture future= connector.connect(new InetSocketAddress(HOST,PORT));
40             future.awaitUninterruptibly();
41             session=future.getSession();
42             BaseMsg msg=new BaseMsg();
43             Thread t=new Thread();
44             int i=0;
45             while(true){
46                 i++;
47                 Init init=new Init();
48                 init.setAc1(25);
49                 init.setAc1FaltAlertEnable(1);
50                 init.setcTime(new Date());
51                 byte[] bytes=init.getData(init);
52                 msg.setSequence(i);
53                 msg.setCommand(100);
54                 msg.setData(bytes);
55                 session.write(msg);
56                 t.sleep(8000);
57             }
58         } catch (Exception e) {
59             logger.info("客户端连接异常....",e);
60         }
61     }
62     
63     /**
64      * 尝试与服务器建立连接,如果连接成功返回IoSession 
65      * @param ip 服务器IP地址
66      * @param port 服务器端口号
67      * @return 返回连接成功之后的IoSession
68      */
69     public static IoSession connectToServer(String ip,int port){
70         IoConnector connector=null;
71         connector=new NioSocketConnector();
72         connector.setConnectTimeoutMillis(30000);
73         connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new SuperbProtocolCodecFactory()));
74         connector.setHandler(new ClientServerHandler());
75         IoSession session=null;
76         try {
77             ConnectFuture future= connector.connect(new InetSocketAddress(ip,port));
78             future.awaitUninterruptibly();
79             session=future.getSession();
80             logger.info("客户端连接成功");
81         } catch (Exception e) {
82             logger.info("客户端连接异常....",e);
83         }
84         return session;
85     }
86     
87     /**
88      * 发送心跳包
89      * @param session 与服务器连接成功之后的IoSession
90      * @param baseMsg 将要发送的心跳包
91      */
92     public static void sendHeartBeat(IoSession session, BaseMsg baseMsg) {
93         logger.info("命令号为:"+baseMsg.getCommand());
94         session.write(baseMsg);
95     }
96 }
客户端通信层
技术分享图片
  1 package com.superb.mina.clientServer.handler;
  2 
  3 import java.io.ByteArrayInputStream;
  4 import java.io.ObjectInputStream;
  5 import java.net.InetSocketAddress;
  6 
  7 import org.apache.log4j.Logger;
  8 import org.apache.mina.core.future.ConnectFuture;
  9 import org.apache.mina.core.service.IoConnector;
 10 import org.apache.mina.core.service.IoHandlerAdapter;
 11 import org.apache.mina.core.session.IdleStatus;
 12 import org.apache.mina.core.session.IoSession;
 13 import org.apache.mina.filter.codec.ProtocolCodecFilter;
 14 import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
 15 import org.apache.mina.transport.socket.nio.NioSocketConnector;
 16 
 17 import com.superb.mina.entity.BaseMsg;
 18 import com.superb.mina.helper.Helper;
 19 
 20 
 21 public class ClientServerHandler extends IoHandlerAdapter{
 22     private static Logger logger=Logger.getLogger(ClientServerHandler.class);
 23     @Override
 24     public void sessionCreated(IoSession session) throws Exception {
 25         // TODO Auto-generated method stub
 26         super.sessionCreated(session);
 27         logger.info("创建session");
 28     }
 29     @Override
 30     public void sessionIdle(IoSession session, IdleStatus status)
 31             throws Exception {
 32         // TODO Auto-generated method stub
 33         super.sessionIdle(session, status);
 34         logger.info("session进入空闲状态");
 35     }
 36     @Override
 37     public void messageReceived(IoSession session, Object message)
 38             throws Exception {
 39         BaseMsg msg=(BaseMsg) message;
 40         logger.info("客户端接收到服务器的消息为:"+msg);
 41         logger.info("报文的长度为:"+msg.getLength());
 42         logger.info("报文序列号为:"+msg.getSequence());
 43         logger.info("报文命令号为:"+msg.getCommand());
 44         logger.info("报文保留字为:"+msg.getReserved());
 45         ByteArrayInputStream bi=new ByteArrayInputStream(msg.getData());
 46         ObjectInputStream oi=new ObjectInputStream(bi);
 47         Object obj=oi.readObject();
 48         
 49         if(0x80000000==msg.getCommand()){
 50             //心跳应答包
 51         }else if(0x80000001==msg.getCommand()){
 52             //初始化应答包
 53             
 54         }else if(0x80000002==msg.getCommand()){
 55             //设备动作上报应答包
 56         }else if(0x80000003==msg.getCommand()){
 57             //报警上报应答包
 58         }else if(0x00008000==msg.getCommand()){
 59             //状态查询请求
 60         }else if(0x00008001==msg.getCommand()){
 61             //设备控制请求
 62         }else if(0x00008002==msg.getCommand()){
 63             //常用配置参数
 64         }else if(0x00008003==msg.getCommand()){
 65             //用户参数设置
 66         }else if(0x00008004==msg.getCommand()){
 67             //报警使能设置
 68         }
 69         
 70     }
 71     
 72     
 73     @Override
 74     public void messageSent(IoSession session, Object message) throws Exception {
 75         // TODO Auto-generated method stub
 76         int i=0;
 77         super.messageSent(session, message);
 78         //logger.info(message.toString());
 79         logger.info("消息发送成功");
 80     }
 81     
 82     @Override
 83     public void exceptionCaught(IoSession session, Throwable cause)
 84             throws Exception {
 85         logger.error("客户端发生异常",cause);
 86     }
 87     @Override
 88     public void sessionOpened(IoSession session) throws Exception {
 89         super.sessionOpened(session);
 90         logger.info("连接成功");
 91     }
 92     @Override
 93     public void sessionClosed(IoSession session) throws Exception {
 94          logger.info("连接断开");  
 95          IoConnector connector=null;
 96          connector=new NioSocketConnector();
 97          connector.setConnectTimeoutMillis(30000);
 98          connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
 99          connector.setHandler(new ClientServerHandler());
100          //i=0;  
101          while(true) {  
102              try {  
103                  Thread.sleep(3000);  
104                  // 这里是异步操作 连接后立即返回  
105                  ConnectFuture future = connector.connect(new InetSocketAddress(  
106                          Helper.getMap().get("server").toString(), (int) Helper.getMap().get("server_port")));  
107                  future.awaitUninterruptibly();// 等待连接创建完成  
108                  session = future.getSession();  
109                  if(session.isConnected()) {  
110                      logger.info("重新连接成功");
111                      break;  
112                  }else{  
113 //                   Toast.makeText(getBaseContext(), "请检查网络", Toast.LENGTH_SHORT).show();  
114                      logger.error("请检查网络是否正常");
115                  }  
116              } catch (Exception e) {  
117                  logger.info("正在尝试重新连接....");
118              }  
119          }  
120     }
121 }
客户端业务层
技术分享图片
 1 package com.superb.mina.server;
 2 
 3 import java.net.InetSocketAddress;
 4 
 5 import org.apache.mina.core.service.IoAcceptor;
 6 import org.apache.mina.core.session.IdleStatus;
 7 import org.apache.mina.filter.codec.ProtocolCodecFilter;
 8 import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
 9 import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
10 import org.slf4j.Logger;
11 import org.slf4j.LoggerFactory;
12 
13 import com.superb.mina.coder.MinaProtobufDecoder;
14 import com.superb.mina.coder.MinaProtobufEncoder;
15 import com.superb.mina.coder.SuperbProtocolCodecFactory;
16 import com.superb.mina.coder.SuperbProtocolDecoder;
17 import com.superb.mina.coder.SuperbProtocolEncoder;
18 import com.superb.mina.server.handler.ServerHandler;
19 
20 
21 public class Server {
22 private static Logger logger=LoggerFactory.getLogger(Server.class);
23     
24     private static int PORT=8003;
25     
26     public static void main(String[] args) {
27         IoAcceptor acceptor=null;
28         try {
29             acceptor=new NioSocketAcceptor();
30             acceptor.getFilterChain().addLast("codec", 
31                                     new ProtocolCodecFilter(new SuperbProtocolCodecFactory()));
32             acceptor.getSessionConfig().setReadBufferSize(2048);
33             acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
34             acceptor.setHandler(new ServerHandler());
35             acceptor.bind(new InetSocketAddress(PORT));
36             logger.info("服务器端启动成功...  端口号为:"+PORT);
37         } catch (Exception e) {
38             logger.error("服务器启动异常...", e);
39         }
40     }
41 }
服务端通信层
技术分享图片
 1 package com.superb.mina.server.handler;
 2 
 3 import java.io.ByteArrayInputStream;
 4 import java.io.ObjectInputStream;
 5 import java.util.Date;
 6 
 7 import org.apache.log4j.Logger;
 8 import org.apache.mina.core.service.IoHandlerAdapter;
 9 import org.apache.mina.core.session.IoSession;
10 
11 import com.superb.mina.entity.BaseMsg;
12 import com.superb.mina.entity.DeviceAction;
13 import com.superb.mina.entity.DeviceAlert;
14 import com.superb.mina.entity.HeartBeat;
15 import com.superb.mina.entity.Init;
16 
17 
18 public class ServerHandler extends IoHandlerAdapter{
19     private static Logger logger=Logger.getLogger(ServerHandler.class);
20     
21     @Override
22     public void messageReceived(IoSession session, Object message)
23             throws Exception {
24         int i=0;
25         BaseMsg msg=(BaseMsg) message;
26         logger.info("报文的长度为:"+msg.getLength());
27         logger.info("报文序列号为:"+msg.getSequence());
28         logger.info("报文命令号为:"+msg.getCommand());
29         logger.info("报文保留字为:"+msg.getReserved());
30         //logger.info(msg.getData());
31         ByteArrayInputStream bi=new ByteArrayInputStream(msg.getData());
32         ObjectInputStream oi=new ObjectInputStream(bi);
33         Object obj=oi.readObject();
34         if(0==msg.getCommand()){
35             HeartBeat heartBeat=new HeartBeat();
36             logger.info("室内温度为:"+heartBeat.getInTemp());
37             logger.info("风机电流为:"+heartBeat.getFanElec());
38             logger.info("室外温度为:"+heartBeat.getOutTemp());
39         }else if(1==msg.getCommand()){
40             Init init=(Init)obj;
41             logger.info("初始化时间为:"+init.getcTime());
42             logger.info("初始化mac为:"+init.getMac());
43             logger.info("初始化室内温度为:"+init.getInTemp());
44             logger.info("初始化室外温度为:"+init.getOutTemp());
45             logger.info("初始化风机电流为:"+init.getFanElec());
46             logger.info("初始化主风机1为:"+init.getInFan1());
47         }
48         
49         /*ByteArrayInputStream bi=new ByteArrayInputStream(msg.getData());
50         ObjectInputStream oi=new ObjectInputStream(bi);
51         Object obj=oi.readObject();
52         
53         logger.info(obj);
54         bi.close();
55         oi.close();
56         if(obj instanceof Init){
57             Init init=(Init)obj;
58             logger.info("初始化时间为:"+init.getcTime());
59             logger.info("初始化mac为:"+init.getMac());
60             logger.info("初始化室内温度为:"+init.getInTemp());
61             logger.info("初始化室外温度为:"+init.getOutTemp());
62             logger.info("初始化风机电流为:"+init.getFanElec());
63             logger.info("初始化主风机1为:"+init.getInFan1());
64         }else if(obj instanceof DeviceAction){
65             
66         }else if(obj instanceof DeviceAlert){
67             
68         }else if(obj instanceof HeartBeat){
69             HeartBeat beat =(HeartBeat)obj;
70             logger.info("当前时间为:"+beat.getcTime());
71             logger.info("风机电流为:"+beat.getFanElec());
72             logger.info("室内温度为:"+beat.getInTemp());
73             logger.info("室外温度为:"+beat.getOutTemp());
74         }
75         
76         
77         
78         msg.getData();
79         logger.info("报文数据段为:"+msg.getData());*/
80         session.write(msg);
81     }
82     @Override
83     public void exceptionCaught(IoSession session, Throwable cause)
84             throws Exception {
85         logger.info("服务器发生异常",cause);
86     }
87 }
服务端业务层

 

自定义协议传输

标签:connected   http   min   slf4j   col   play   boolean   java.net   dispose   

原文地址:https://www.cnblogs.com/s-d-g/p/8717468.html

(0)
(0)
   
举报
评论 一句话评论(0)
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!