标签:mqtt 读写 更新 字段名 catch 应用开发 异常 链接 资源
Internet(全球互联网)是无数台机器基于TCP/IP协议族相互通信产生的。TCP/IP协议族分了四层实现,链路层、网络层、传输层、应用层。
顺序
|
字段名 | 长度(字节) |
字段类型
|
描述
|
1
|
消息长度 |
4(32bit)
|
int | socket报文的长度最长2^31-1字节,大文件传输不使用此字段 |
2
|
行为标识
|
1(8bit)
|
byte
|
用于分支处理数据1字节可标识256种行为,一般够用
|
3
|
加密标识 |
1(8bit)
|
byte
|
区分加密方式0不加密
|
4
|
时间戳 |
8(64bit)
|
long | 消息时间戳,其实也没啥用,加着玩的,忽视掉吧 |
5
|
消息体
|
|
String
|
长度为消息长度-10字节,建议使用json,具体解析行为由行为标识字段定义
|
2、规定通信工作流程
/** 消息包(报文) **/ class SocketPackage { int length;// 长度 byte action;// 行为标识 byte encryption;// 加密标识 long timestamp;// 时间戳 String data;// 消息体 /** TODO:将此消息包转换为适当的byte数组 **/ byte[] toBytes() { byte[] lengthBytes = int2bytes(length); // ...将各个字段都做了转换成bytes的操作后,合并byte数组并返回 } /** TODO:读取输入流转换成一个消息包 **/ static SocketPackage parse(InputStream in) throws IOException { SocketPackage sp = new SocketPackage(); byte[] lengthBytes = new byte[4]; in.read(lengthBytes);// 未收到信息时此步将会阻塞 sp.length = bytes2int(lengthBytes); // .....其他字段读取就不写了,这里要控制好异常,不要随意catch住,如果发生异常,不是socket坏了就是报文异常了,应当采用拒绝连接的形式向对方跑出异常 } }
/** 封装下socket,使其可以保存更多的连接信息,不要纠结名字,我纠结了好一会儿不知道怎么命名,反正是伪代码,就这样写着吧 **/ class NiuxzSocket { Socket socket; volatile long lastUse;// 上次使用时间 // ...这里还可以再加其他属性,比如是否是写状态,写操作开始时间,上次非心跳包时间等 NiuxzSocket(Socket socket) { this.socket = socket; this.lastUse = System.currentTimeMillis(); } InputStream getIn() { return socket.getInputStream(); } void write(byte[] bytes) throws IOException { this.socket.getOutputStream().write(bytes); } }
/** 封装一个发送信息的接口,提供常用的发送信息方法。 **/ interface SocketClient { SocketPackage sendData(SocketPackage sp);// 发送一个消息包,并等待返回的消息包 // TODO:还可以根据双方的业务和协议添加几个更方便使用的接口方法。比如只返回消息体字段,或者直接返回json内容的 void sendHeartBeat(NiuxzSocket socket);// 发送一个心跳包,这个方法后面讲心跳包时会用到 } class DefaultSocketClient implements SocketClient { SocketPool socketPool;// 先假装有一个socket连接池,用来管理socket。不使用连接池的话,在这里直接注入一个NiuxzSocket就可以了。下面代码中也直接使用socket,但是一定要在使用时进行加锁操作。否则就会造成多线程访问同一个socket导致数据错乱了。 /** 此方法就是主动端工作入口了,业务代码可以直接调用这里进行发送数据 **/ SocketPackage sendData(SocketPackage sp){ NiuxzSocket niuxzSocket = socketPool.get();//获取一个socket,这里可以看到获取的socket并不是原生的socket,其实是我们自己封装后的socket try{ niuxzSocket.write(sp.toBytes());//阻塞持续写到缓存中 niuxzSocket.lastUse = System.currentTimeMillis();//根据业务方法更新socket的状态信息 SocketPackage sp = SocketPackage.parse(niuxzSocket.getIn());//阻塞读,等待消息的返回,因为是单线程操作socket所以不存在消息插队的情况。 return sp; }catch(Exception e){ LOG.error("发送消息包失败",e); socketPool.destroy(niuxzSocket) //在发生不可复用的异常时才关闭socket,并销毁这个NiuxzSocke。不可复用异常意思是IO操作到了一半不知道具体到哪了所以整个socket都不可用了。 } finally{ if(socketPool!=null){ socketPool.recycle(niuxzSocket );//使用完这个socket后我们不要关闭,因为还要复用,让连接池回收这个socket。recycle内要判断socket是否是销毁状态。 } } } }
/** 定义一个连接池接口SocketPool **/ interface SocketPool { /** 获取一个连接 **/ NiuxzSocket get(); /** 回收Socket **/ void recycle(NiuxzSocket ns); /** 销毁Socket **/ void destroy(NiuxzSocket ns); } /** 实现连接池 **/ class DefaultSocketPool implements SocketPool { BlockingQueue<NiuxzSocket> sockets;// 存放socket的容器,也可以使用数组 NiuxzSocket get() { // TODO:池里有就获取,没有就开一个线程去创建 并且等待创建完成,可使用synchronized/wait或Lock/condition } // TODO:实现socketPool,实现连接池是属于性能可靠性优化,要做的事情会比较多。偷个懒,大家懂就好,具体实现,等有时间我把我的连接池代码整理后再写一篇文章,有想了解的可以给我评论讨论下。 }
/**开启一个ServerSocket并等待连接,联入后开启一个线程进行处理**/ class NiuxzServer{ ServerSocket serverSocket; HashMap<NiuxzSocket> sockets = new HashMap<NiuxzSocket>(); public static AtomicInteger workerCount = 0; public Object waitLock = new Object(); int maxWorkerCount = 100;//允许100个连接进入 int port;//配置一个端口号 /**工作入口**/ void work(){ serverSocket = new ServerSocket(port); while(true){ Socket socekt = serverSocket.accept();//阻塞等待连接 NiuxzSocket niuxzSocket = new NiuxzSocket(socket); sockets.put(niuxzSocket ,1);//将连接放入map中 Worker worker = new Worker(niuxzSocket );//创建一个工作线程 worker.start();//开始线程 while(true){ if(workerCount.incrementAndGet()>=maxWorkerCount){//如果超过了规定的最大线程数,就进入等待,等待其他连接销毁 synchronized(waitLock){ if(workerCount.incrementAndGet()>=maxWorkerCount){//double check 确定进入等待前没有正在断开的socket waitLock.wait(); }else{ break; } } }else{ break; } } } } /**销毁一个连接**/ void destroy(NiuxzSocket socket){ synchronized(waitLock){ sockets.remove(socket);//从池子里删除 workerCount.decrementAndGet();//当前连接数减一 waitLock.notify();//通知work方法 可以继续接受请求了 } } /**创建一个工作者线程类,处理连入的socket**/ class Worker extends Thread{ HashMap<Integer,SocketHandler> handlers;//针对每种行为标识做的消息处理器。 NiuxzSocket socket; Worker(NiuxzSocketsocket){//构造函数 this.socket = socket; } void run(){ try{ while(true){ SocketPackage sp = SocketPackage.parse(socket.getIn());//阻塞读,直到读完一个消息包未知,这样可以解决粘包或半包的问题 SocketHandler handler = handlers.get(sp.getAction());//根据行为标识获取响应的处理器 handler.handle(sp,socket);//处理结果和响应信息都在handler中回写 } }cache(Exception e){ LOG.error("连接异常中断",e); NiuxzServer.destroy(socket); } } } }
/** 创建一个消息处理器 SocketHandler 接收所有内容后 回显 **/ class EchoSocketHandler implements SocketHandler { /** 处理socket请求 **/ void handle(SocketPackage sp, NiuxzSocket socket) { sp.setAction(10);// 比如协议中的行为标识10是响应成功的意思 socket.write(sp.toBytes());// 直接回写 } }
至此两端的工作代码已经初步完成。socket可以按照相互制定的通讯方式进行通讯了。
3、心跳机制:
心跳机制socket长链接通讯中不可或缺的一个机制。主动端可以检测socket是否存活,被动端可以检测对方是否还在线。因为有时候网络并不一定那么完美,会出现链路上的异常,此时应用层可能并不能发现问题,等下次再用这个连接的时候就会抛出异常了,如果是被动端,还会白白占用着一个线程,不如在那之前就发现一部分异常,并销毁连接,下次通讯时出错的概率就降低了很多,被动端也会释放线程,释放资源。
@Scheduled(fixedDelay=30*1000)//延时30秒执行一次 void HeartBeat(){ for(NiuxzSocket socket:socketPool.getAllSocket()){ if(System.curTime() - socket.getLastUse() > 30*1000){//如果系统时间减上次使用时间大于30秒 //开启线程,从连接池中取出这个连接remove(socket)移除成功再继续操作,保证不会有其他线程同时使用这个socket。发送一个SocketPackage,socketClient.sendHeartBeat() if(socketPool.remove(socket)){ socketClient.snedHeartBeat(socket);//socketClient.snedHeartBeat这个方法实现:行为标识设置为心跳包,比如规定1就是心跳包。完事回收这个链接socketPool.recycle(socket),但当中间反生异常,则代表这个连接不可用了,就销毁socketPool.destroy(socket)。 } } } }
以上便是我用同步socket实现第一版分布式文件系统时总结的经验,有些问题其实在NIO中变得不是问题了。NIO和AIO更适合会持有大量连接的服务器端。
标签:mqtt 读写 更新 字段名 catch 应用开发 异常 链接 资源
原文地址:http://www.cnblogs.com/niuxiaozu/p/7942804.html