标签:cat 状态码 nec new 实体类 locking map 实例 stop
目录
最近公司在预研设备app端与服务端的交互方案,主要方案有
虽然上面的一些成熟方案肯定更利于上生产环境,但它们通讯基础也都是socket长连接,所以本人主要是预研了一下socket长连接的交互,写了个简单demo,采用了BIO的多线程方案,实现了自定义简单协议,心跳机制,socket客户端身份强制验证,socket客户端断线获知等功能,并暴露了一些接口,可通过接口简单实现客户端与服务端的socket交互。
IO通讯模型主要包括阻塞式同步IO(BIO),非阻塞式同步IO,多路复用IO以及异步IO。大神博客请点此
BIO就是:blocking IO。最容易理解、最容易实现的IO工作方式,应用程序向操作系统请求网络IO操作,这时应用程序会一直等待;另一方面,操作系统收到请求后,也会等待,直到网络上有数据传到监听端口;操作系统在收集数据后,会把数据发送给应用程序;最后应用程序受到数据,并解除等待状态。
这种模式下,应用程序的线程不再一直等待操作系统的IO状态,而是在等待一段时间后,就解除阻塞。如果没有得到想要的结果,则再次进行相同的操作。这样的工作方式,暴增了应用程序的线程可以不会一直阻塞,而是可以进行一些其他工作。
目前流程的多路复用IO实现主要包括四种:select、poll、epoll、kqueue。下表是他们的一些重要特性的比较:
异步IO则是采用“订阅-通知”模式:即应用程序向操作系统注册IO监听,然后继续做自己的事情。当操作系统发生IO事件,并且准备好数据后,在主动通知应用程序,触发相应的函数。
Java
对阻塞式同步IO的支持主要是java.net
包中的Socket
套接字实现;Java
中非阻塞同步IO模式通过设置serverSocket.setSoTimeout(100);
即可实现;Java 1.4
中引入了NIO
框架(java.nio
包)可以构建多路复用、同步非阻塞IO
程序;Java 7
中对NIO
进行了进一步改进,即NIO2
,引入了异步非阻塞IO方式。由于是要实现socket长连接的demo,主要关注其一些实现注意点及方案,所以本demo采用了BIO
的多线程方案,该方案代码比较简单、直观,引入了多线程技术后,IO的处理吞吐量也大大提高了。下面是BIO
多线程方案server
端的简单实现:
public static void main(String[] args) throws Exception{
ServerSocket serverSocket = new ServerSocket(83);
try {
while(true) {
Socket socket = null;
socket = serverSocket.accept();
//这边获得socket连接后开启一个线程监听处理数据
SocketServerThread socketServerThread = new SocketServerThread(socket);
new Thread(socketServerThread).start();
}
} catch(Exception e) {
log.error("Socket accept failed. Exception:{}", e.getMessage());
} finally {
if(serverSocket != null) {
serverSocket.close();
}
}
}
}
@slf4j
class SocketServerThread implements Runnable {
private Socket socket;
public SocketServerThread (Socket socket) {
this.socket = socket;
}
@Override
public void run() {
InputStream in = null;
OutputStream out = null;
try {
in = socket.getInputStream();
out = socket.getOutputStream();
Integer sourcePort = socket.getPort();
int maxLen = 2048;
byte[] contextBytes = new byte[maxLen];
int realLen;
StringBuffer message = new StringBuffer();
BIORead:while(true) {
try {
while((realLen = in.read(contextBytes, 0, maxLen)) != -1) {
message.append(new String(contextBytes , 0 , realLen));
/*
* 我们假设读取到“over”关键字,
* 表示客户端的所有信息在经过若干次传送后,完成
* */
if(message.indexOf("over") != -1) {
break BIORead;
}
}
}
//下面打印信息
log.info("服务器(收到来自于端口:" + sourcePort + "的信息:" + message);
//下面开始发送信息
out.write("回发响应信息!".getBytes());
//关闭
out.close();
in.close();
this.socket.close();
} catch(Exception e) {
log.error("Socket read failed. Exception:{}", e.getMessage());
}
}
}
假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到的字节数是不确定的,故可能存在以下4种情况。
由于底层的TCP无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,这个问题只能通过上层的应用协议栈设计来解决,根据业界的主流协议的解决方案,可以归纳如下:
作为socket长连接的demo,使用了上述的解决思路2,即在包尾增加回车换行符进行数据的分割,同时整体数据使用约定的Json
体进行作为消息的传输格式。
使用换行符进行数据分割,可如下进行数据的单行读取:
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
String message;
while ((message = reader.readLine()) != null) {
//....
}
可如下进行数据的单行写入:
PrintWriter writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()), true);
writer.println(message);
Json
消息格式如下:
@Data
public class ServerReceiveDto implements Serializable {
private static final long serialVersionUID = 6600253865619639317L;
/**
* 功能码 0 心跳 1 登陆 2 登出 3 发送消息
*/
private Integer functionCode;
/**
* 用户id
*/
private String userId;
/**
* 这边假设是string的消息体
*/
private String message;
}
@Data
public class ServerSendDto implements Serializable {
private static final long serialVersionUID = -7453297551797390215L;
/**
* 状态码 20000 成功,否则有errorMessage
*/
private Integer statusCode;
private String message;
/**
* 功能码
*/
private Integer functionCode;
/**
* 错误消息
*/
private String errorMessage;
}
@Data
public class ClientSendDto implements Serializable {
private static final long serialVersionUID = 97085384412852967L;
/**
* 功能码 0 心跳 1 登陆 2 登出 3 发送消息
*/
private Integer functionCode;
/**
* 用户id
*/
private String userId;
/**
* 这边假设是string的消息体
*/
private String message;
}
通过自定义心跳包来实现掉线检测功能,具体思路如下:
客户端连接上服务端后,在服务端会维护一个在线客户端列表。客户端每隔一段时间,向服务端发送一个心跳包,服务端受收到包以后,会更新客户端最近一次在线时间。一旦服务端超过规定时间没有接收到客户端发来的包,则视为掉线。
维护一个客户端map,其中key代表用户的唯一id(用户唯一id的身份验证下面会说明),value代表用户对应的一个实体
/**
* 存储当前由用户信息活跃的的socket线程
*/
private ConcurrentMap<String, Connection> existSocketMap = new ConcurrentHashMap<>();
其中Connection
对象包含的信息如下:
@Slf4j
@Data
public class Connection {
/**
* 当前的socket连接实例
*/
private Socket socket;
/**
* 当前连接线程
*/
private ConnectionThread connectionThread;
/**
* 当前连接是否登陆
*/
private boolean isLogin;
/**
* 存储当前的user信息
*/
private String userId;
/**
* 创建时间
*/
private Date createTime;
/**
* 最后一次更新时间,用于判断心跳
*/
private Date lastOnTime;
}
主要关注其中的lastOnTime
字段,每次服务端接收到标识是心跳数据,会更新当前的lastOnTime
字段,代码如下:
if (functionCode.equals(FunctionCodeEnum.HEART.getValue())) {
//心跳类型
connection.setLastOnTime(new Date());
//发送同样的心跳数据给客户端
ServerSendDto dto = new ServerSendDto();
dto.setFunctionCode(FunctionCodeEnum.HEART.getValue());
connection.println(JSONObject.toJSONString(dto));
}
额外会有一个监测进程,以一定频率来监测上述维护的map中的每一个Connection对象,如果当前时间与lastOnTime
的时间间隔超过自定义的长度,则自动将其对应的socket连接关闭,代码如下:
Date now = new Date();
Date lastOnTime = connectionThread.getConnection().getLastOnTime();
long heartDuration = now.getTime() - lastOnTime.getTime();
if (heartDuration > SocketConstant.HEART_RATE) {
//心跳超时,关闭当前线程
log.error("心跳超时");
connectionThread.stopRunning();
}
在上面代码中,服务端收到标识是心跳数据的时候,除了更新该socket
对应的lastOnTime
,还会同样同样心跳类型的数据给客户端,客户端收到标识是心跳数据的时候也会更新自己的lastOnTime
字段,同时也有一个心跳监测线程在监测当前的socket连接心跳是否超时
通过代码socket = serverSocket.accept()
获得的一个socket
连接我们仅仅只能知道其客户端的ip
以及端口号,并不能获知这个socket
连接对应的到底是哪一个客户端,因此必须得先获得客户端的身份并且验证通过其身份才能让其正常连接。
具体的实现思路是:
自定义一个登陆处理接口,当server
端受到标识是用户登陆的时候(此时会携带用户信息或者token,此处简化为用户id),调用用户的登陆验证,验证通过的话则将该socket
连接与用户信息绑定,设置其为已登录,并且封装对应的对象放入前面提的客户端map中,由此可获得具体用户对应的哪一个socket
连接。
为了实现socket
连接的强制验证,在监测线程中,也会判断当前用户多长时间内没有实现登录态,若超时则认为该socket
连接为非法连接,主动关闭该socket
连接。
自定义登陆处理接口,这边简单以userId来判断是否允许登陆:
public interface LoginHandler {
/**
* client登陆的处理函数
*
* @param userId 用户id
*
* @return 是否验证通过
*/
boolean canLogin(String userId);
}
收到客户端发来的数据时候的处理:
if (functionCode.equals(FunctionCodeEnum.LOGIN.getValue())) {
//登陆,身份验证
String userId = receiveDto.getUserId();
if (socketServer.getLoginHandler().canLogin(userId)) {
//设置用户对象已登录状态
connection.setLogin(true);
connection.setUserId(userId);
if (socketServer.getExistSocketMap().containsKey(userId)) {
//存在已登录的用户,发送登出指令并主动关闭该socket
Connection existConnection = socketServer.getExistSocketMap().get(userId);
ServerSendDto dto = new ServerSendDto();
dto.setStatusCode(999);
dto.setFunctionCode(FunctionCodeEnum.MESSAGE.getValue());
dto.setErrorMessage("force logout");
existConnection.println(JSONObject.toJSONString(dto));
existConnection.getConnectionThread().stopRunning();
log.error("用户被客户端重入踢出,userId:{}", userId);
}
//添加到已登录map中
socketServer.getExistSocketMap().put(userId, connection);
}
监测线程判断用户是否完成身份验证:
if (!connectionThread.getConnection().isLogin()) {
//还没有用户登陆成功
Date createTime = connectionThread.getConnection().getCreateTime();
long loginDuration = now.getTime() - createTime.getTime();
if (loginDuration > SocketConstant.LOGIN_DELAY) {
//身份验证超时
log.error("身份验证超时");
connectionThread.stopRunning();
}
}
socket
在读取数据或者发送数据的时候会出现各种异常,比如客户端的socket
已断开连接(正常断开或物理连接断开等),但是服务端还在发送数据或者还在接受数据的过程中,此时socket
会抛出相关异常,对于该异常的处理需要将自身的socket
连接关闭,避免资源的浪费,同时由于是多线程方案,还需将该socket
对应的线程正常清理。
下面以server端发送数据为例,改代码中加入了重试机制:
public void println(String message) {
int count = 0;
PrintWriter writer;
do {
try {
writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()), true);
writer.println(message);
break;
} catch (IOException e) {
count++;
if (count >= RETRY_COUNT) {
//重试多次失败,说明client端socket异常
this.connectionThread.stopRunning();
}
}
try {
Thread.sleep(2 * 1000);
} catch (InterruptedException e1) {
log.error("Connection.println.IOException interrupt,userId:{}", userId);
}
} while (count < 3);
}
上述调用的this.connectionThread.stopRunning();
代码如下:
public void stopRunning() {
//设置线程对象状态,便于线程清理
isRunning = false;
try {
//异常情况需要将该socket资源释放
socket.close();
} catch (IOException e) {
log.error("ConnectionThread.stopRunning failed.exception:{}", e);
}
}
上述代码中设置了线程对象的状态,下述代码在监测线程中执行,将没有运行的线程给清理掉
/**
* 存储只要有socket处理的线程
*/
private List<ConnectionThread> existConnectionThreadList = Collections.synchronizedList(new ArrayList<>());
/**
* 中间list,用于遍历的时候删除
*/
private List<ConnectionThread> noConnectionThreadList = Collections.synchronizedList(new ArrayList<>());
//...
//删除list中没有用的thread引用
existConnectionThreadList.forEach(connectionThread -> {
if (!connectionThread.isRunning()) {
noConnectionThreadList.add(connectionThread);
}
});
noConnectionThreadList.forEach(connectionThread -> {
existConnectionThreadList.remove(connectionThread);
if (connectionThread.getConnection().isLogin()) {
//说明用户已经身份验证成功了,需要删除map
this.existSocketMap.remove(connectionThread.getConnection().getUserId());
}
});
noConnectionThreadList.clear();
由于使用了springboot
框架来实现该demo,所以项目结构如下:
socket
工具包目录如下:
pom
文件主要添加了springboot
的相关依赖,以及json
工具和lombok
工具等,依赖如下:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.3.RELEASE</version>
<relativePath/>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.36</version>
</dependency>
</dependencies>
自己写的socket
工具包的使用方式如下:
@Configuration
@Slf4j
public class SocketServerConfig {
@Bean
public SocketServer socketServer() {
SocketServer socketServer = new SocketServer(60000);
socketServer.setLoginHandler(userId -> {
log.info("处理socket用户身份验证,userId:{}", userId);
//用户名中包含了dingxu则允许登陆
return userId.contains("dingxu");
});
socketServer.setMessageHandler((connection, receiveDto) -> log
.info("处理socket消息,userId:{},receiveDto:{}", connection.getUserId(),
JSONObject.toJSONString(receiveDto)));
socketServer.start();
return socketServer;
}
}
该demo中主要提供了以下几个接口进行测试:
具体的postman文件也放已在项目中,具体可点此链接获得
demo中还提供了一个简单压测函数,如下:
@Slf4j
public class SocketClientTest {
public static void main(String[] args) {
ExecutorService clientService = Executors.newCachedThreadPool();
String userId = "dingxu";
for (int i = 0; i < 1000; i++) {
int index = i;
clientService.execute(() -> {
try {
SocketClient client;
client = new SocketClient(InetAddress.getByName("127.0.0.1"), 60000);
//登陆
ClientSendDto dto = new ClientSendDto();
dto.setFunctionCode(FunctionCodeEnum.LOGIN.getValue());
dto.setUserId(userId + index);
client.println(JSONObject.toJSONString(dto));
ScheduledExecutorService clientHeartExecutor = Executors.newSingleThreadScheduledExecutor(
r -> new Thread(r, "socket_client+heart_" + r.hashCode()));
clientHeartExecutor.scheduleWithFixedDelay(() -> {
try {
ClientSendDto heartDto = new ClientSendDto();
heartDto.setFunctionCode(FunctionCodeEnum.HEART.getValue());
client.println(JSONObject.toJSONString(heartDto));
} catch (Exception e) {
log.error("客户端异常,userId:{},exception:{}", userId, e.getMessage());
client.close();
}
}, 0, 5, TimeUnit.SECONDS);
while (true){
}
} catch (Exception e) {
log.error(e.getMessage());
}
});
}
}
}
标签:cat 状态码 nec new 实体类 locking map 实例 stop
原文地址:https://www.cnblogs.com/ading-blog/p/10344447.html