标签:byte java io 自己 io操作 eal 进程间 text 验证 todo
网络编程的基本模型是C/S模型,即两个进程间的通信。
服务端提供IP和监听端口,客户端通过连接操作想服务端监听的地址发起连接请求,通过三次握手连接,如果连接成功建立,双方就可以通过套接字进行通信。
传统的同步阻塞模型开发中,ServerSocket负责绑定IP地址,启动监听端口;Socket负责发起连接操作。连接成功后,双方通过输入和输出流进行同步阻塞式通信。
最原始BIO通信模型图:
那有没有方法改进呢? ,答案是有的。改进后BIO通信模型图:
此种BIO通信模型的服务端,通常由一个独立的Acceptor线程负责监听客户端的连接,它接收到客户端连接请求之后为每个客户端创建一个新的线程进行链路处理没处理完成后,通过输出流返回应答给客户端,线程销毁。即典型的一请求一应答通宵模型。
代码演示
服务端:
package demo.com.test.io.bio; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.ServerSocket; import java.net.Socket; import demo.com.test.io.nio.NioSocketServer; public class BioSocketServer { //默认的端口号 private static int DEFAULT_PORT = 8083; public static void main(String[] args) { ServerSocket serverSocket = null; try { System.out.println("监听来自于"+DEFAULT_PORT+"的端口信息"); serverSocket = new ServerSocket(DEFAULT_PORT); while(true) { Socket socket = serverSocket.accept(); SocketServerThread socketServerThread = new SocketServerThread(socket); new Thread(socketServerThread).start(); } } catch(Exception e) { } finally { if(serverSocket != null) { try { serverSocket.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } //这个wait不涉及到具体的实验逻辑,只是为了保证守护线程在启动所有线程后,进入等待状态 synchronized (NioSocketServer.class) { try { BioSocketServer.class.wait(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } class SocketServerThread implements Runnable { private Socket socket; public SocketServerThread (Socket socket) { this.socket = socket; } @Override public void run() { InputStream in = null; OutputStream out = null; try { //下面我们收取信息 in = socket.getInputStream(); out = socket.getOutputStream(); Integer sourcePort = socket.getPort(); int maxLen = 1024; byte[] contextBytes = new byte[maxLen]; //使用线程,同样无法解决read方法的阻塞问题, //也就是说read方法处同样会被阻塞,直到操作系统有数据准备好 int realLen = in.read(contextBytes, 0, maxLen); //读取信息 String message = new String(contextBytes , 0 , realLen); //下面打印信息 System.out.println("服务器收到来自于端口:" + sourcePort + "的信息:" + message); //下面开始发送信息 out.write("回发响应信息!".getBytes()); } catch(Exception e) { System.out.println(e.getMessage()); } finally { //试图关闭 try { if(in != null) { in.close(); } if(out != null) { out.close(); } if(this.socket != null) { this.socket.close(); } } catch (IOException e) { System.out.println(e.getMessage()); } } } }
客户端:
package demo.com.test.io.bio; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; import java.net.URLDecoder; import java.util.concurrent.CountDownLatch; public class BioSocketClient{ public static void main(String[] args) throws Exception { Integer clientNumber = 20; CountDownLatch countDownLatch = new CountDownLatch(clientNumber); // 分别开始启动这20个客户端,并发访问 for (int index = 0; index < clientNumber; index++, countDownLatch.countDown()) { ClientRequestThread client = new ClientRequestThread(countDownLatch, index); new Thread(client).start(); } // 这个wait不涉及到具体的实验逻辑,只是为了保证守护线程在启动所有线程后,进入等待状态 synchronized (BioSocketClient.class) { BioSocketClient.class.wait(); } } } /** * 一个ClientRequestThread线程模拟一个客户端请求。 * @author keep_trying */ class ClientRequestThread implements Runnable { private CountDownLatch countDownLatch; /** * 这个线程的编号 * @param countDownLatch */ private Integer clientIndex; /** * countDownLatch是java提供的同步计数器。 * 当计数器数值减为0时,所有受其影响而等待的线程将会被激活。这样保证模拟并发请求的真实性 * @param countDownLatch */ public ClientRequestThread(CountDownLatch countDownLatch , Integer clientIndex) { this.countDownLatch = countDownLatch; this.clientIndex = clientIndex; } @Override public void run() { Socket socket = null; OutputStream clientRequest = null; InputStream clientResponse = null; try { socket = new Socket("localhost",8083); clientRequest = socket.getOutputStream(); clientResponse = socket.getInputStream(); //等待,直到SocketClientDaemon完成所有线程的启动,然后所有线程一起发送请求 this.countDownLatch.await(); //发送请求信息 clientRequest.write(("这是第" + this.clientIndex + " 个客户端的请求。 over").getBytes()); clientRequest.flush(); //在这里等待,直到服务器返回信息 System.out.println("第" + this.clientIndex + "个客户端的请求发送完成,等待服务器返回信息"); int maxLen = 1024; byte[] contextBytes = new byte[maxLen]; int realLen; String message = ""; //程序执行到这里,会一直等待服务器返回信息(注意,前提是in和out都不能close,如果close了就收不到服务器的反馈了) while((realLen = clientResponse.read(contextBytes, 0, maxLen)) != -1) { message += new String(contextBytes , 0 , realLen); } //String messageEncode = new String(message , "UTF-8"); message = URLDecoder.decode(message, "UTF-8"); System.out.println("第" + this.clientIndex + "个客户端接收到来自服务器的信息:" + message); } catch (Exception e) { } finally { try { if(clientRequest != null) { clientRequest.close(); } if(clientResponse != null) { clientResponse.close(); } } catch (IOException e) { } } } }
为了改进这种一连接一线程的模型,我们可以使用线程池来管理这些线程,实现1个或多个线程处理N个客户端的模型(但是底层还是使用的同步阻塞I/O),通常被称为“伪异步I/O模型“。
伪异步I/O模型图:
代码演示
只给出服务端,客户端和上面相同
package demo.com.test.io.bio; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import demo.com.test.io.nio.NioSocketServer; public class BioSocketServerThreadPool { //默认的端口号 private static int DEFAULT_PORT = 8083; //线程池 懒汉式的单例 private static ExecutorService executorService = Executors.newFixedThreadPool(60); public static void main(String[] args) { ServerSocket serverSocket = null; try { System.out.println("监听来自于"+DEFAULT_PORT+"的端口信息"); serverSocket = new ServerSocket(DEFAULT_PORT); while(true) { Socket socket = serverSocket.accept(); //当然业务处理过程可以交给一个线程(这里可以使用线程池),并且线程的创建是很耗资源的。 //最终改变不了.accept()只能一个一个接受socket的情况,并且被阻塞的情况 SocketServerThreadPool socketServerThreadPool = new SocketServerThreadPool(socket); executorService.execute(socketServerThreadPool); } } catch(Exception e) { } finally { if(serverSocket != null) { try { serverSocket.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } //这个wait不涉及到具体的实验逻辑,只是为了保证守护线程在启动所有线程后,进入等待状态 synchronized (NioSocketServer.class) { try { BioSocketServerThreadPool.class.wait(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } class SocketServerThreadPool implements Runnable { private Socket socket; public SocketServerThreadPool (Socket socket) { this.socket = socket; } @Override public void run() { InputStream in = null; OutputStream out = null; try { //下面我们收取信息 in = socket.getInputStream(); out = socket.getOutputStream(); Integer sourcePort = socket.getPort(); int maxLen = 1024; byte[] contextBytes = new byte[maxLen]; //使用线程,同样无法解决read方法的阻塞问题, //也就是说read方法处同样会被阻塞,直到操作系统有数据准备好 int realLen = in.read(contextBytes, 0, maxLen); //读取信息 String message = new String(contextBytes , 0 , realLen); //下面打印信息 System.out.println("服务器收到来自于端口:" + sourcePort + "的信息:" + message); //下面开始发送信息 out.write("回发响应信息!".getBytes()); } catch(Exception e) { System.out.println(e.getMessage()); } finally { //试图关闭 try { if(in != null) { in.close(); } if(out != null) { out.close(); } if(this.socket != null) { this.socket.close(); } } catch (IOException e) { System.out.println(e.getMessage()); } } } }
在 Socket socket = serverSocket.accept(); 处打了断点,有20个客户端同时发出请求,可服务端还是一个一个的处理,其它线程都处于阻塞状态
那么重点的问题并不是“是否使用了多线程、或是线程池”,而是为什么accept()、read()方法会被阻塞。API文档中对于 serverSocket.accept() 方法的使用描述:
Listens for a connection to be made to this socket and accepts it. The method blocks until a connection is made.
服务器线程发起一个accept动作,询问操作系统 是否有新的socket套接字信息从端口xx发送过来。
注意,是询问操作系统。也就是说socket套接字的IO模式支持是基于操作系统的,那么自然同步IO/异步IO的支持就是需要操作系统级别的了。如下图:
如果操作系统没有发现有套接字从指定的端口xx来,那么操作系统就会等待。这样serverSocket.accept()方法就会一直等待。这就是为什么accept()方法为什么会阻塞:它内部的实现是使用的操作系统级别的同步IO。
高级Java工程师必备 ----- 深入分析 Java IO (一)BIO
标签:byte java io 自己 io操作 eal 进程间 text 验证 todo
原文地址:https://www.cnblogs.com/java-chen-hao/p/11076176.html