step1:协议格式
step2:根据协议定义出对应的模型
1 package com.superb.mina.entity; 2 3 import java.io.ByteArrayOutputStream; 4 import java.io.DataOutputStream; 5 import java.io.IOException; 6 import java.io.Serializable; 7 import java.nio.charset.Charset; 8 9 /** 10 * 协议包 11 * @author sundg 12 * 13 * 2018年3月30日上午10:55:33 14 */ 15 public class BaseMsg implements Serializable { 16 /** 序列号*/ 17 private static final long serialVersionUID = -4614096987747485330L; 18 public static int HEAD_SIZE =18; 19 /**报文总长度*/ 20 private Integer length; 21 /**序列号*/ 22 private Integer sequence; 23 /**命令号*/ 24 private Integer command; 25 /**保留字*/ 26 private Integer reserved; 27 /**数据段*/ 28 private byte[] data; 29 30 private Integer separator=0xFFFF; 31 32 33 public static long getSerialversionuid() { 34 return serialVersionUID; 35 } 36 37 public void setLength(Integer length) { 38 this.length = length; 39 } 40 41 public Integer getLength() { 42 return length; 43 } 44 45 public Integer getSequence() { 46 return sequence; 47 } 48 49 public void setSequence(Integer sequence) { 50 this.sequence = sequence; 51 } 52 53 public Integer getCommand() { 54 return command; 55 } 56 57 public void setCommand(Integer command) { 58 this.command = command; 59 } 60 61 public Integer getReserved() { 62 return reserved; 63 } 64 65 public void setReserved(Integer reserved) { 66 this.reserved = reserved; 67 } 68 69 public byte[] getData() { 70 return data; 71 } 72 73 public void setData(byte[] data) { 74 this.data = data; 75 if (data != null) { 76 this.length = HEAD_SIZE + data.length; 77 } else { 78 this.length = HEAD_SIZE; 79 } 80 } 81 82 public Integer getSeparator() { 83 return separator; 84 } 85 86 public void setSeparator(Integer separator) { 87 this.separator = separator; 88 } 89 90 /** 91 * 将要发送的信息保存为二进制流进行传输 92 * @return 93 */ 94 public byte[] toBytes() { 95 byte[] ret = null; 96 try { 97 ByteArrayOutputStream bos = new ByteArrayOutputStream(this.length); 98 DataOutputStream dos = new DataOutputStream(bos); 99 100 dos.writeInt(this.length); 101 dos.writeInt(this.sequence); 102 dos.writeInt(this.command); 103 if(this.reserved==null){ 104 dos.writeInt(0); 105 }else{ 106 dos.writeInt(this.reserved); 107 } 108 if (this.data != null) { 109 dos.write(this.data); 110 } 111 dos.writeShort(0xFFFF); 112 ret = bos.toByteArray(); 113 } catch (IOException e) { 114 e.printStackTrace(); 115 } 116 117 return ret; 118 } 119 120 }
step3:自定义编码解码器
1 package com.superb.mina.coder; 2 3 import org.apache.mina.core.session.IoSession; 4 import org.apache.mina.filter.codec.ProtocolCodecFactory; 5 import org.apache.mina.filter.codec.ProtocolDecoder; 6 import org.apache.mina.filter.codec.ProtocolEncoder; 7 8 public class SuperbProtocolCodecFactory implements ProtocolCodecFactory { 9 private ProtocolDecoder decoder; 10 private ProtocolEncoder encoder; 11 12 public SuperbProtocolCodecFactory() { 13 this.encoder = new SuperbProtocolEncoder(); 14 this.decoder = new SuperbProtocolDecoder(); 15 } 16 17 @Override 18 public ProtocolDecoder getDecoder(IoSession ioSession) throws Exception { 19 return this.decoder; 20 } 21 22 @Override 23 public ProtocolEncoder getEncoder(IoSession ioSession) throws Exception { 24 return this.encoder; 25 } 26 }
1 package com.superb.mina.coder; 2 3 import org.apache.mina.core.buffer.IoBuffer; 4 import org.apache.mina.core.session.IoSession; 5 import org.apache.mina.filter.codec.ProtocolEncoder; 6 import org.apache.mina.filter.codec.ProtocolEncoderOutput; 7 8 import com.superb.mina.entity.BaseMsg; 9 10 public class SuperbProtocolEncoder implements ProtocolEncoder { 11 12 @Override 13 public void dispose(IoSession session) throws Exception { 14 // nothing to dispose 15 } 16 17 @Override 18 public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception { 19 BaseMsg msg = (BaseMsg) message; 20 byte[] bys = msg.toBytes(); 21 22 IoBuffer buffer = IoBuffer.allocate(bys.length, false); 23 buffer.put(bys); 24 buffer.flip(); 25 out.write(buffer); 26 } 27 }
1 package com.superb.mina.coder; 2 3 import org.apache.mina.core.buffer.BufferDataException; 4 import org.apache.mina.core.buffer.IoBuffer; 5 import org.apache.mina.core.session.IoSession; 6 import org.apache.mina.filter.codec.CumulativeProtocolDecoder; 7 import org.apache.mina.filter.codec.ProtocolDecoderOutput; 8 9 import com.superb.mina.entity.BaseMsg; 10 11 12 public class SuperbProtocolDecoder extends CumulativeProtocolDecoder { 13 private boolean prefixedDataAvailable(IoBuffer in) { 14 if (in.remaining() < 4) { 15 return false; 16 } 17 18 int dataLength = in.getInt(in.position()); 19 20 if (dataLength < BaseMsg.HEAD_SIZE) { 21 throw new BufferDataException("dataLength: " + dataLength); 22 } 23 24 return in.remaining() >= dataLength; 25 } 26 27 @Override 28 protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { 29 if (!prefixedDataAvailable(in)) { 30 return false; 31 } 32 33 BaseMsg msg = new BaseMsg(); 34 int length = in.getInt(); 35 msg.setLength(length); 36 msg.setSequence(in.getInt()); 37 msg.setCommand(in.getInt()); 38 msg.setReserved(in.getInt()); 39 40 int body_len = length - BaseMsg.HEAD_SIZE; 41 42 if (body_len < 0 ) { 43 throw new Exception("body length error.(" + body_len + ")"); 44 } 45 46 if (body_len > 0) { 47 byte[] body = new byte[body_len]; 48 in.get(body); 49 msg.setData(body); 50 } 51 in.getShort(); 52 msg.setSeparator(0xFFFF); 53 out.write(msg); 54 return true; 55 } 56 }
step4:编写对应的客户端与服务器端
1 package com.superb.mina.clientServer; 2 3 import java.net.InetSocketAddress; 4 import java.util.Date; 5 import java.util.HashMap; 6 import java.util.Map; 7 8 import org.apache.mina.core.future.ConnectFuture; 9 import org.apache.mina.core.service.IoConnector; 10 import org.apache.mina.core.session.IoSession; 11 import org.apache.mina.filter.codec.ProtocolCodecFilter; 12 import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory; 13 import org.apache.mina.transport.socket.nio.NioSocketConnector; 14 import org.slf4j.Logger; 15 import org.slf4j.LoggerFactory; 16 17 import com.superb.mina.clientServer.handler.ClientServerHandler; 18 import com.superb.mina.coder.MinaProtobufDecoder; 19 import com.superb.mina.coder.MinaProtobufEncoder; 20 import com.superb.mina.coder.SuperbProtocolCodecFactory; 21 import com.superb.mina.coder.SuperbProtocolDecoder; 22 import com.superb.mina.coder.SuperbProtocolEncoder; 23 import com.superb.mina.entity.BaseMsg; 24 import com.superb.mina.entity.Init; 25 26 27 public class ClientServer { 28 private static Logger logger=LoggerFactory.getLogger(ClientServer.class); 29 private static String HOST="192.168.1.115"; 30 private static int PORT=8003; 31 public static void main(String[] args) { 32 IoConnector connector=null; 33 connector=new NioSocketConnector(); 34 connector.setConnectTimeoutMillis(30000); 35 connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new SuperbProtocolCodecFactory())); 36 connector.setHandler(new ClientServerHandler()); 37 IoSession session=null; 38 try { 39 ConnectFuture future= connector.connect(new InetSocketAddress(HOST,PORT)); 40 future.awaitUninterruptibly(); 41 session=future.getSession(); 42 BaseMsg msg=new BaseMsg(); 43 Thread t=new Thread(); 44 int i=0; 45 while(true){ 46 i++; 47 Init init=new Init(); 48 init.setAc1(25); 49 init.setAc1FaltAlertEnable(1); 50 init.setcTime(new Date()); 51 byte[] bytes=init.getData(init); 52 msg.setSequence(i); 53 msg.setCommand(100); 54 msg.setData(bytes); 55 session.write(msg); 56 t.sleep(8000); 57 } 58 } catch (Exception e) { 59 logger.info("客户端连接异常....",e); 60 } 61 } 62 63 /** 64 * 尝试与服务器建立连接,如果连接成功返回IoSession 65 * @param ip 服务器IP地址 66 * @param port 服务器端口号 67 * @return 返回连接成功之后的IoSession 68 */ 69 public static IoSession connectToServer(String ip,int port){ 70 IoConnector connector=null; 71 connector=new NioSocketConnector(); 72 connector.setConnectTimeoutMillis(30000); 73 connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new SuperbProtocolCodecFactory())); 74 connector.setHandler(new ClientServerHandler()); 75 IoSession session=null; 76 try { 77 ConnectFuture future= connector.connect(new InetSocketAddress(ip,port)); 78 future.awaitUninterruptibly(); 79 session=future.getSession(); 80 logger.info("客户端连接成功"); 81 } catch (Exception e) { 82 logger.info("客户端连接异常....",e); 83 } 84 return session; 85 } 86 87 /** 88 * 发送心跳包 89 * @param session 与服务器连接成功之后的IoSession 90 * @param baseMsg 将要发送的心跳包 91 */ 92 public static void sendHeartBeat(IoSession session, BaseMsg baseMsg) { 93 logger.info("命令号为:"+baseMsg.getCommand()); 94 session.write(baseMsg); 95 } 96 }
1 package com.superb.mina.clientServer.handler; 2 3 import java.io.ByteArrayInputStream; 4 import java.io.ObjectInputStream; 5 import java.net.InetSocketAddress; 6 7 import org.apache.log4j.Logger; 8 import org.apache.mina.core.future.ConnectFuture; 9 import org.apache.mina.core.service.IoConnector; 10 import org.apache.mina.core.service.IoHandlerAdapter; 11 import org.apache.mina.core.session.IdleStatus; 12 import org.apache.mina.core.session.IoSession; 13 import org.apache.mina.filter.codec.ProtocolCodecFilter; 14 import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory; 15 import org.apache.mina.transport.socket.nio.NioSocketConnector; 16 17 import com.superb.mina.entity.BaseMsg; 18 import com.superb.mina.helper.Helper; 19 20 21 public class ClientServerHandler extends IoHandlerAdapter{ 22 private static Logger logger=Logger.getLogger(ClientServerHandler.class); 23 @Override 24 public void sessionCreated(IoSession session) throws Exception { 25 // TODO Auto-generated method stub 26 super.sessionCreated(session); 27 logger.info("创建session"); 28 } 29 @Override 30 public void sessionIdle(IoSession session, IdleStatus status) 31 throws Exception { 32 // TODO Auto-generated method stub 33 super.sessionIdle(session, status); 34 logger.info("session进入空闲状态"); 35 } 36 @Override 37 public void messageReceived(IoSession session, Object message) 38 throws Exception { 39 BaseMsg msg=(BaseMsg) message; 40 logger.info("客户端接收到服务器的消息为:"+msg); 41 logger.info("报文的长度为:"+msg.getLength()); 42 logger.info("报文序列号为:"+msg.getSequence()); 43 logger.info("报文命令号为:"+msg.getCommand()); 44 logger.info("报文保留字为:"+msg.getReserved()); 45 ByteArrayInputStream bi=new ByteArrayInputStream(msg.getData()); 46 ObjectInputStream oi=new ObjectInputStream(bi); 47 Object obj=oi.readObject(); 48 49 if(0x80000000==msg.getCommand()){ 50 //心跳应答包 51 }else if(0x80000001==msg.getCommand()){ 52 //初始化应答包 53 54 }else if(0x80000002==msg.getCommand()){ 55 //设备动作上报应答包 56 }else if(0x80000003==msg.getCommand()){ 57 //报警上报应答包 58 }else if(0x00008000==msg.getCommand()){ 59 //状态查询请求 60 }else if(0x00008001==msg.getCommand()){ 61 //设备控制请求 62 }else if(0x00008002==msg.getCommand()){ 63 //常用配置参数 64 }else if(0x00008003==msg.getCommand()){ 65 //用户参数设置 66 }else if(0x00008004==msg.getCommand()){ 67 //报警使能设置 68 } 69 70 } 71 72 73 @Override 74 public void messageSent(IoSession session, Object message) throws Exception { 75 // TODO Auto-generated method stub 76 int i=0; 77 super.messageSent(session, message); 78 //logger.info(message.toString()); 79 logger.info("消息发送成功"); 80 } 81 82 @Override 83 public void exceptionCaught(IoSession session, Throwable cause) 84 throws Exception { 85 logger.error("客户端发生异常",cause); 86 } 87 @Override 88 public void sessionOpened(IoSession session) throws Exception { 89 super.sessionOpened(session); 90 logger.info("连接成功"); 91 } 92 @Override 93 public void sessionClosed(IoSession session) throws Exception { 94 logger.info("连接断开"); 95 IoConnector connector=null; 96 connector=new NioSocketConnector(); 97 connector.setConnectTimeoutMillis(30000); 98 connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory())); 99 connector.setHandler(new ClientServerHandler()); 100 //i=0; 101 while(true) { 102 try { 103 Thread.sleep(3000); 104 // 这里是异步操作 连接后立即返回 105 ConnectFuture future = connector.connect(new InetSocketAddress( 106 Helper.getMap().get("server").toString(), (int) Helper.getMap().get("server_port"))); 107 future.awaitUninterruptibly();// 等待连接创建完成 108 session = future.getSession(); 109 if(session.isConnected()) { 110 logger.info("重新连接成功"); 111 break; 112 }else{ 113 // Toast.makeText(getBaseContext(), "请检查网络", Toast.LENGTH_SHORT).show(); 114 logger.error("请检查网络是否正常"); 115 } 116 } catch (Exception e) { 117 logger.info("正在尝试重新连接...."); 118 } 119 } 120 } 121 }
1 package com.superb.mina.server; 2 3 import java.net.InetSocketAddress; 4 5 import org.apache.mina.core.service.IoAcceptor; 6 import org.apache.mina.core.session.IdleStatus; 7 import org.apache.mina.filter.codec.ProtocolCodecFilter; 8 import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory; 9 import org.apache.mina.transport.socket.nio.NioSocketAcceptor; 10 import org.slf4j.Logger; 11 import org.slf4j.LoggerFactory; 12 13 import com.superb.mina.coder.MinaProtobufDecoder; 14 import com.superb.mina.coder.MinaProtobufEncoder; 15 import com.superb.mina.coder.SuperbProtocolCodecFactory; 16 import com.superb.mina.coder.SuperbProtocolDecoder; 17 import com.superb.mina.coder.SuperbProtocolEncoder; 18 import com.superb.mina.server.handler.ServerHandler; 19 20 21 public class Server { 22 private static Logger logger=LoggerFactory.getLogger(Server.class); 23 24 private static int PORT=8003; 25 26 public static void main(String[] args) { 27 IoAcceptor acceptor=null; 28 try { 29 acceptor=new NioSocketAcceptor(); 30 acceptor.getFilterChain().addLast("codec", 31 new ProtocolCodecFilter(new SuperbProtocolCodecFactory())); 32 acceptor.getSessionConfig().setReadBufferSize(2048); 33 acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10); 34 acceptor.setHandler(new ServerHandler()); 35 acceptor.bind(new InetSocketAddress(PORT)); 36 logger.info("服务器端启动成功... 端口号为:"+PORT); 37 } catch (Exception e) { 38 logger.error("服务器启动异常...", e); 39 } 40 } 41 }
1 package com.superb.mina.server.handler; 2 3 import java.io.ByteArrayInputStream; 4 import java.io.ObjectInputStream; 5 import java.util.Date; 6 7 import org.apache.log4j.Logger; 8 import org.apache.mina.core.service.IoHandlerAdapter; 9 import org.apache.mina.core.session.IoSession; 10 11 import com.superb.mina.entity.BaseMsg; 12 import com.superb.mina.entity.DeviceAction; 13 import com.superb.mina.entity.DeviceAlert; 14 import com.superb.mina.entity.HeartBeat; 15 import com.superb.mina.entity.Init; 16 17 18 public class ServerHandler extends IoHandlerAdapter{ 19 private static Logger logger=Logger.getLogger(ServerHandler.class); 20 21 @Override 22 public void messageReceived(IoSession session, Object message) 23 throws Exception { 24 int i=0; 25 BaseMsg msg=(BaseMsg) message; 26 logger.info("报文的长度为:"+msg.getLength()); 27 logger.info("报文序列号为:"+msg.getSequence()); 28 logger.info("报文命令号为:"+msg.getCommand()); 29 logger.info("报文保留字为:"+msg.getReserved()); 30 //logger.info(msg.getData()); 31 ByteArrayInputStream bi=new ByteArrayInputStream(msg.getData()); 32 ObjectInputStream oi=new ObjectInputStream(bi); 33 Object obj=oi.readObject(); 34 if(0==msg.getCommand()){ 35 HeartBeat heartBeat=new HeartBeat(); 36 logger.info("室内温度为:"+heartBeat.getInTemp()); 37 logger.info("风机电流为:"+heartBeat.getFanElec()); 38 logger.info("室外温度为:"+heartBeat.getOutTemp()); 39 }else if(1==msg.getCommand()){ 40 Init init=(Init)obj; 41 logger.info("初始化时间为:"+init.getcTime()); 42 logger.info("初始化mac为:"+init.getMac()); 43 logger.info("初始化室内温度为:"+init.getInTemp()); 44 logger.info("初始化室外温度为:"+init.getOutTemp()); 45 logger.info("初始化风机电流为:"+init.getFanElec()); 46 logger.info("初始化主风机1为:"+init.getInFan1()); 47 } 48 49 /*ByteArrayInputStream bi=new ByteArrayInputStream(msg.getData()); 50 ObjectInputStream oi=new ObjectInputStream(bi); 51 Object obj=oi.readObject(); 52 53 logger.info(obj); 54 bi.close(); 55 oi.close(); 56 if(obj instanceof Init){ 57 Init init=(Init)obj; 58 logger.info("初始化时间为:"+init.getcTime()); 59 logger.info("初始化mac为:"+init.getMac()); 60 logger.info("初始化室内温度为:"+init.getInTemp()); 61 logger.info("初始化室外温度为:"+init.getOutTemp()); 62 logger.info("初始化风机电流为:"+init.getFanElec()); 63 logger.info("初始化主风机1为:"+init.getInFan1()); 64 }else if(obj instanceof DeviceAction){ 65 66 }else if(obj instanceof DeviceAlert){ 67 68 }else if(obj instanceof HeartBeat){ 69 HeartBeat beat =(HeartBeat)obj; 70 logger.info("当前时间为:"+beat.getcTime()); 71 logger.info("风机电流为:"+beat.getFanElec()); 72 logger.info("室内温度为:"+beat.getInTemp()); 73 logger.info("室外温度为:"+beat.getOutTemp()); 74 } 75 76 77 78 msg.getData(); 79 logger.info("报文数据段为:"+msg.getData());*/ 80 session.write(msg); 81 } 82 @Override 83 public void exceptionCaught(IoSession session, Throwable cause) 84 throws Exception { 85 logger.info("服务器发生异常",cause); 86 } 87 }