Java中的BIO、NIO、AIO-3
java
这一篇是代码篇,敲代码有助于理解记忆这些抽象的东西:
参考资料:
目录
Java BIO代码
服务器
客户端
- package sock;
-
- import java.io.BufferedReader;
- import java.io.IOException;
- import java.io.InputStreamReader;
- import java.io.PrintWriter;
- import java.net.Socket;
-
- public class socketClientT extends Socket{
- private static final String server = "127.0.0.1";
- private static final int port = 2018;
-
- private Socket sock;
- private PrintWriter send;
- private BufferedReader rec;
- socketClientT() throws IOException{
- super(server,port);
- sock = this;
- send = new PrintWriter(sock.getOutputStream(),true);
- rec = new BufferedReader(new InputStreamReader(sock.getInputStream()));
- Thread t = new recvThread();
- BufferedReader sysBuff = new BufferedReader(new InputStreamReader(System.in));
- String line = "";
- while(! line.contains("bye")){
- line = sysBuff.readLine();
- send.println(line);
- }
- send.close();
- rec.close();
- sysBuff.close();
- this.close();
- }
- class recvThread extends Thread{
- private BufferedReader buff;
- recvThread(){
- try {
- buff = new BufferedReader(new InputStreamReader(sock.getInputStream()));
- start();
- } catch (Exception e){
- e.printStackTrace();
- }
- }
- public void run(){
- String res = "";
- try{
- while(true){
- res = buff.readLine();
- if(res.contains("bye"))
- break;
- System.out.println(res);
- }
- send.close();
- buff.close();
- sock.close();
- }catch (Exception e){
- e.printStackTrace();
- }
- }
- }
-
- public static void main(String args[]){
- try {
- socketClientT s = new socketClientT();
- }catch (Exception e){
- e.printStackTrace();
- }
- }
- }
-
Java NIO
JDK 1.4的java.util.*;
包中引入了新的Java I/O库,其目的是提高IO操作的速度。
简介
NIO我们一般认为是New I/O(也是官方的叫法),因为它是相对于老的I/O类库新增的(其实在JDK 1.4中就已经被引入了,但这个名词还会继续用很久,即使它们在现在看来已经是“旧”的了,所以也提示我们在命名时,需要好好考虑),做了很大的改变。但民间跟多人称之为Non-block I/O,即非阻塞I/O,因为这样叫,更能体现它的特点。而下文中的NIO,不是指整个新的I/O库,而是非阻塞I/O。
NIO提供了与传统BIO模型中的Socket和ServerSocket相对应的SocketChannel和ServerSocketChannel两种不同的套接字通道实现。
新增的着两种通道都支持阻塞和非阻塞两种模式。
阻塞模式使用就像传统中的支持一样,比较简单,但是性能和可靠性都不好;非阻塞模式正好与之相反。
对于低负载、低并发的应用程序,可以使用同步阻塞I/O来提升开发速率和更好的维护性;对于高负载、高并发的(网络)应用,应使用NIO的非阻塞模式来开发。
下面会先对基础知识进行介绍。
缓冲区Buffer
Buffer是一个对象,包含一些要写入或者读出的数据。
在NIO库中,所有数据都是用缓冲区处理的。在读取数据时,它是直接读到缓冲区中的;在写入数据时,也是写入到缓冲区中。任何时候访问NIO中的数据,都是通过缓冲区进行操作。
缓冲区实际上是一个数组,并提供了对数据结构化访问以及维护读写位置等信息。
具体的缓存区有这些:ByteBuffe、CharBuffer、 ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer。他们实现了相同的接口:Buffer。
通道Channel
我们对数据的读取和写入要通过Channel,它就像水管一样,是一个通道。通道不同于流的地方就是通道是双向的,可以用于读、写和同时读写操作。
底层的操作系统的通道一般都是全双工的,所以全双工的Channel比流能更好的映射底层操作系统的API。
Channel主要分两大类:
- SelectableChannel:用户网络读写
- FileChannel:用于文件操作
后面代码会涉及的ServerSocketChannel和SocketChannel都是SelectableChannel的子类。
多路复用器
Selector是Java NIO 编程的基础。
Selector提供选择已经就绪的任务的能力:Selector会不断轮询注册在其上的Channel,如果某个Channel上面发生读或者写事件,这个Channel就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以获取就绪Channel的集合,进行后续的I/O操作。
一个Selector可以同时轮询多个Channel,因为JDK使用了epoll()代替传统的select实现,所以没有最大连接句柄1024/2048的限制。所以,只需要一个线程负责Selector的轮询,就可以接入成千上万的客户端。
服务器端代码
server:
-
-
-
- public class server {
-
- private static int port = 8000;
- private static serverHandle sHandle;
-
- public static void start() {
- start(port);
- }
- private static synchronized void start(int port) {
- if (sHandle != null) {
- sHandle.setStarted(false);
- }
- sHandle = new serverHandle(port);
- new Thread(sHandle,"server").start();
-
- }
- public static void main(String[] args) {
- start();
- }
- }
serverHandle
客户端代码
client
- import java.util.Scanner;
-
-
-
-
- public class clientChannel {
- private static String host = "127.0.0.1";
- private static int port = 8000;
- private static clientHandle cHandle;
-
- public static void start() {
- start(host,port);
- }
- public static synchronized void start(String host, int port) {
- if (cHandle != null) {
- cHandle.stop();
- }
- cHandle = new clientHandle(host, port);
- new Thread(cHandle, "client").start();;
- }
-
- public static Boolean sendMsg(String msg) throws Exception {
- if (msg.contains("q")) {
- return false;
- }
- cHandle.sendMsg(msg);
- return true;
- }
- public static void main(String[] args) {
- try {
- start();
- Scanner s = new Scanner(System.in);
- String tmp;
- while ((tmp = s.nextLine())!= null) {
- sendMsg(tmp);
- }
- } catch (Exception e) {
-
- e.printStackTrace();
- }
-
- }
- }
clientHandle
##测试及解析
测试代码:
- import java.util.Scanner;
-
-
-
-
- public class Test {
-
- public static void main(String[] args) {
- server.start();
- try {
- Thread.sleep(3000);
- } catch (Exception e) {
-
- e.printStackTrace();
- }
- clientChannel.start();
- Scanner s = new Scanner(System.in);
- String tmp;
- try {
- while ((tmp = s.nextLine()) != null) {
- clientChannel.sendMsg(tmp);
- }
- } catch (Exception e) {
-
- e.printStackTrace();
- }
-
- }
-
- }
可以看到,创建NIO服务端的主要步骤如下:
- 打开ServerSocketChannel,监听客户端连接
- 绑定监听端口,设置连接为非阻塞模式
- 创建Reactor线程,创建多路复用器并启动线程
- 将ServerSocketChannel注册到Reactor线程中的Selector上,监听ACCEPT事件
- Selector轮询准备就绪的key
- Selector监听到新的客户端接入,处理新的接入请求,完成TCP三次握手,简历物理链路
- 设置客户端链路为非阻塞模式
- 将新接入的客户端连接注册到Reactor线程的Selector上,监听读操作,读取客户端发送的网络消息
- 异步读取客户端消息到缓冲区
- 对Buffer编解码,处理半包消息,将解码成功的消息封装成Task
- 将应答消息编码为Buffer,调用SocketChannel的write将消息异步发送给客户端
Java AIO
Java nio 2.0的主要改进就是引入了异步IO(包括文件和网络),这里主要介绍下异步网络IO API的使用以及框架的设计,以TCP服务端为例。首先看下为了支持AIO引入的新的类和接口:
** java.nio.channels.AsynchronousChannel**
标记一个channel支持异步IO操作。
** java.nio.channels.AsynchronousServerSocketChannel**
ServerSocket的aio版本,创建TCP服务端,绑定地址,监听端口等。
** java.nio.channels.AsynchronousSocketChannel**
面向流的异步socket channel,表示一个连接。
** java.nio.channels.AsynchronousChannelGroup**
异步channel的分组管理,目的是为了资源共享。一个AsynchronousChannelGroup
绑定一个线程池,这个线程池执行两个任务:处理IO事件和派发CompletionHandler
。AsynchronousServerSocketChannel
创建的时候可以传入一个AsynchronousChannelGroup
,那么通过AsynchronousServerSocketChannel
创建的AsynchronousSocketChannel
将同属于一个组,共享资。
** java.nio.channels.CompletionHandler**
异步IO操作结果的回调接口,用于定义在IO操作完成后所作的回调工作。AIO的API允许两种方式来处理异步操作的结果:返回的Future
模式或者注册CompletionHandler
,推荐用CompletionHandler
的方式,这些handler的调用是由AsynchronousChannelGroup
的线程池派发的。显然,线程池的大小是性能的关键因素。AsynchronousChannelGroup
允许绑定不同的线程池,通过三个静态方法来创建:
public static AsynchronousChannelGroup withFixedThreadPool(int nThreads, ThreadFactory threadFactory) throws IOException
public static AsynchronousChannelGroup withCachedThreadPool(ExecutorService executor, int initialSize)
public static AsynchronousChannelGroup withThreadPool(ExecutorService executor) throws IOException
需要根据具体应用相应调整,从框架角度出发,需要暴露这样的配置选项给用户。
在介绍完了aio引入的TCP的主要接口和类之后,我们来设想下一个aio框架应该怎么设计。参考非阻塞nio框架的设计,一般都是采用Reactor
模式,Reactor负责事件的注册、select、事件的派发;相应地,异步IO有个Proactor模式,Proactor负责CompletionHandler
的派发,查看一个典型的IO写操作的流程来看两者的区别:
Reactor: send(msg) -> 消息队列是否为空,如果为空 -> 向Reactor注册OP_WRITE,然后返回 -> Reactor select -> 触发Writable,通知用户线程去处理 ->先注销Writable(很多人遇到的cpu 100%的问题就在于没有注销),处理Writeable,如果没有完全写入,继续注册OP_WRITE。注意到,写入的工作还是用户线程在处理。
Proactor: send(msg) -> 消息队列是否为空,如果为空,发起read异步调用,并注册CompletionHandler,然后返回。 -> 操作系统负责将你的消息写入,并返回结果(写入的字节数)给Proactor -> Proactor派发CompletionHandler。可见,写入的工作是操作系统在处理,无需用户线程参与。事实上在aio的API中,AsynchronousChannelGroup就扮演了Proactor的角色。
CompletionHandler有三个方法,分别对应于处理成功、失败、被取消(通过返回的Future)情况下的回调处理:
public interface CompletionHandler<V,A> {
void completed(V result, A attachment);
void failed(Throwable exc, A attachment);
void cancelled(A attachment);
}
其中的泛型参数V表示IO调用的结果,而A是发起调用时传入的attchment。
server端代码
server
-
-
- public class Server {
- public static int clientCount = 0;
- public static int port = 8000;
- public static String hoString = "127.0.0.1";
-
- public static void start() {
- start(Server.port);
- }
- public static void start(int port) {
- AsyncServerHandler serverHandler = new AsyncServerHandler(port);
- Thread t1 = new Thread(serverHandler);
- t1.start();
- }
- public static void main(String[] args) {
- start();
- }
- }
-
AsyncServerHandler
-
-
- import java.io.IOException;
- import java.net.InetSocketAddress;
- import java.nio.channels.AsynchronousServerSocketChannel;
- import java.nio.channels.CompletionHandler;
- import java.util.concurrent.CountDownLatch;
-
- public class AsyncServerHandler implements Runnable{
- private AsynchronousServerSocketChannel serverSocketChannel;
- private CountDownLatch latch;
- public AsyncServerHandler(int port) {
-
- InetSocketAddress address = new InetSocketAddress(port);
- try {
- serverSocketChannel = AsynchronousServerSocketChannel.open();
- serverSocketChannel.bind(address);
- } catch (IOException e) {
-
- e.printStackTrace();
- }
- System.out.println("服务器已经启动,端口号:" + port);
- }
-
- @Override
- public void run() {
-
-
-
-
-
-
- this.latch = new CountDownLatch(1);
- serverSocketChannel.accept(this, new AcceptHandler(this.latch));
-
- try {
- latch.await();
- } catch (InterruptedException e) {
-
- e.printStackTrace();
- }
- }
-
- public AsynchronousServerSocketChannel getServerSocketChannel() {
- return serverSocketChannel;
- }
-
- public void setServerSocketChannel(AsynchronousServerSocketChannel serverSocketChannel) {
- this.serverSocketChannel = serverSocketChannel;
- }
-
- public CountDownLatch getLatch() {
- return latch;
- }
-
- public void setLatch(CountDownLatch latch) {
- this.latch = latch;
- }
-
- }
-
AcceptHandler
-
-
- import java.nio.ByteBuffer;
- import java.nio.channels.AsynchronousServerSocketChannel;
- import java.nio.channels.AsynchronousSocketChannel;
- import java.nio.channels.CompletionHandler;
- import java.util.concurrent.CountDownLatch;
-
- public class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel,AsyncServerHandler> {
-
- private CountDownLatch latch;
- public AcceptHandler(CountDownLatch latch) {
-
- this.latch = latch;
- }
- @Override
- public void completed(AsynchronousSocketChannel socketChannel, AsyncServerHandler serverHandler) {
-
-
- Server.clientCount ++;
- System.out.println("当前连接的客户数:" + Server.clientCount);
-
- AsynchronousServerSocketChannel channel = serverHandler.getServerSocketChannel();
- channel.accept(serverHandler, this);
-
- ByteBuffer buffer = ByteBuffer.allocate(1024);
- socketChannel.read(buffer, buffer, new serverReadHandler(socketChannel));
-
- }
-
- @Override
- public void failed(Throwable exc, AsyncServerHandler attachment) {
-
- exc.printStackTrace();
- this.latch.countDown();
- }
-
-
- }
-
serverReadHandler
-
-
- import java.io.IOException;
- import java.io.UnsupportedEncodingException;
- import java.nio.ByteBuffer;
- import java.nio.channels.AsynchronousSocketChannel;
- import java.nio.channels.CompletionHandler;
-
- public class serverReadHandler implements CompletionHandler<Integer,ByteBuffer> {
-
- private AsynchronousSocketChannel serverChannel;
- public serverReadHandler(AsynchronousSocketChannel channel) {
-
- this.serverChannel = channel;
- }
-
-
- @Override
- public void completed(Integer result, ByteBuffer buffer) {
-
-
-
-
- if (buffer.hasRemaining()) {
- byte[] bytes = new byte[buffer.remaining()];
- buffer.get(bytes);
- String msg = null;
- try {
- msg = new String(bytes, "UTF-8");
- } catch (UnsupportedEncodingException e) {
-
- e.printStackTrace();
- }
- System.out.println("服务器收到消息:" + msg);
-
- String calResult = null;
- StringBuffer stringBuffer = new StringBuffer(msg);
- calResult = stringBuffer.reverse().toString();
-
- byte[] resultBytes = calResult.getBytes();
- ByteBuffer rBuffer = ByteBuffer.allocate(resultBytes.length);
- rBuffer.put(resultBytes);
- this.serverChannel.write(rBuffer, rBuffer, new ServerWriteHandler(this.serverChannel));
- }else {
- System.out.println("服务器没有读取到数据");
- }
- }
-
- @Override
- public void failed(Throwable exc, ByteBuffer buffer) {
-
- try {
- this.serverChannel.close();
- System.out.println("服务器socket关闭~~~");
- } catch (IOException e) {
-
- e.printStackTrace();
- }
- }
-
- }
-
serverWriteHandler
-
-
- import java.io.IOException;
- import java.nio.ByteBuffer;
- import java.nio.channels.AsynchronousSocketChannel;
- import java.nio.channels.CompletionHandler;
-
- public class ServerWriteHandler implements CompletionHandler<Integer,ByteBuffer> {
- private AsynchronousSocketChannel serverChannel;
- public ServerWriteHandler(AsynchronousSocketChannel channel) {
-
- this.serverChannel = channel;
- }
- @Override
- public void completed(Integer result, ByteBuffer buffer) {
-
-
-
- if (buffer.hasRemaining()) {
- System.out.println("服务器输出数据~~~");
- buffer.clear();
-
- this.serverChannel.write(buffer, buffer, this);
- } else {
-
- ByteBuffer readBuffer = ByteBuffer.allocate(1024);
- this.serverChannel.read(readBuffer, readBuffer, new serverReadHandler(this.serverChannel));
- }
- }
- @Override
- public void failed(Throwable exc, ByteBuffer buffer) {
-
-
- try {
- this.serverChannel.close();
- } catch (IOException e) {
-
- e.printStackTrace();
- }
- }
-
- }
-
客户端
client
-
-
- import java.util.Scanner;
-
- public class client {
- private static String DEFAULT_HOST = "127.0.0.1";
- private static int DEFAULT_PORT = 8000;
- private static AsyncClientHandler clientHandle;
- public static void start(){
- start(DEFAULT_HOST,DEFAULT_PORT);
- }
- public static synchronized void start(String ip,int port){
- if(clientHandle!=null)
- return;
- clientHandle = new AsyncClientHandler(ip,port);
- new Thread(clientHandle,"Client").start();
- }
-
- public static boolean sendMsg(String msg) throws Exception{
- if(msg.equals("q")) return false;
- clientHandle.sendMsg(msg);
- return true;
- }
-
- public static void main(String[] args) throws Exception{
-
- start();
- System.out.println("请输入请求消息:");
- Scanner scanner = new Scanner(System.in);
- String tmp = null;
- for (int i = 0; i < 10; i++) {
- tmp = scanner.nextLine();
- clientHandle.sendMsg(tmp);
- }
-
- }
- }
-
AsyncClientHandler
-
-
- import java.io.IOException;
- import java.net.InetSocketAddress;
- import java.nio.ByteBuffer;
- import java.nio.channels.AsynchronousSocketChannel;
- import java.nio.channels.CompletionHandler;
- import java.util.concurrent.CountDownLatch;
-
- public class AsyncClientHandler implements CompletionHandler<Void, AsyncClientHandler>, Runnable{
- private AsynchronousSocketChannel clientChannel;
- private String host;
- private int port;
- private CountDownLatch latch;
-
- public AsyncClientHandler(String host, int port) {
-
- this.host = host;
- this.port = port;
- try {
-
- clientChannel = AsynchronousSocketChannel.open();
- } catch (Exception e) {
-
- e.printStackTrace();
- }
- }
-
-
- @Override
- public void run() {
-
- latch = new CountDownLatch(1);
-
- InetSocketAddress address = new InetSocketAddress(this.host, this.port);
-
- clientChannel.connect(address, this, this);
- try {
- latch.await();
- } catch (InterruptedException e) {
-
- e.printStackTrace();
- }
- try {
- clientChannel.close();
- } catch (IOException e) {
-
- e.printStackTrace();
- }
- }
-
- @Override
- public void completed(Void result, AsyncClientHandler attachment) {
-
-
- System.out.println("连接服务器成功!!!!");
- }
-
- @Override
- public void failed(Throwable exc, AsyncClientHandler attachment) {
-
-
- System.out.println("连接服务器失败!!!");
- exc.printStackTrace();
-
- try {
- clientChannel.close();
- latch.countDown();
- } catch (IOException e) {
-
- e.printStackTrace();
- }
- }
-
-
- public void sendMsg(String msg) {
- System.out.println(msg);
- byte[] bytes = msg.getBytes();
- ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
- buffer.put(bytes);
-
- buffer.flip();
-
- clientChannel.write(buffer, buffer, new WriteHandler(clientChannel,latch));
-
-
- }
-
-
- }
-
-
writeHandler
-
-
- import java.io.IOException;
- import java.nio.ByteBuffer;
- import java.nio.channels.AsynchronousSocketChannel;
- import java.nio.channels.CompletionHandler;
- import java.util.concurrent.CountDownLatch;
-
- public class WriteHandler implements CompletionHandler<Integer, ByteBuffer>{
-
- private AsynchronousSocketChannel clientChannel;
- private CountDownLatch latch;
- public WriteHandler(AsynchronousSocketChannel channel, CountDownLatch latch) {
-
- this.clientChannel = channel;
- this.latch = latch;
- }
- @Override
- public void completed(Integer result, ByteBuffer buffer) {
-
- System.out.println("发送数据成功!~~~");
-
-
-
- if (buffer.hasRemaining()) {
- System.out.println("进入写数据!!!");
- clientChannel.write(buffer, buffer, this);
- System.out.println("发送数据成功~~~");
- }
-
- System.out.println("进入读取数据");
- ByteBuffer readBuffer = ByteBuffer.allocate(1024);
- clientChannel.read(readBuffer, readBuffer, new ReadHandle(clientChannel,latch));
-
- }
-
- @Override
- public void failed(Throwable exc, ByteBuffer buffer) {
-
- System.err.println("发送数据失败~~~");
- try {
- clientChannel.close();
- latch.countDown();
- } catch (IOException e) {
-
- e.printStackTrace();
- }
-
- }
-
- }
-
ReadHandle
-
-
- import java.io.IOException;
- import java.io.UnsupportedEncodingException;
- import java.nio.ByteBuffer;
- import java.nio.channels.AsynchronousSocketChannel;
- import java.nio.channels.CompletionHandler;
- import java.util.concurrent.CountDownLatch;
-
- public class ReadHandle implements CompletionHandler<Integer,ByteBuffer> {
- private AsynchronousSocketChannel clientChannel;
- private CountDownLatch latch;
- public ReadHandle(AsynchronousSocketChannel channel, CountDownLatch latch) {
-
- this.latch = latch;
- this.clientChannel = channel;
- }
- @Override
- public void completed(Integer result, ByteBuffer buffer) {
-
-
-
- System.out.println("读取数据成功!!!");
- byte[] bytes = new byte[buffer.remaining()];
- buffer.get(bytes);
- String res;
- try {
- res = new String(bytes, "UTF-8");
- System.out.println("收到的数据: " + res);
- } catch (UnsupportedEncodingException e) {
-
- e.printStackTrace();
- }
-
-
-
-
- }
- @Override
- public void failed(Throwable exc, ByteBuffer attachment) {
-
- System.err.println("读取数据失败~~~");
-
- try {
- clientChannel.close();
- this.latch.countDown();
- } catch (IOException e) {
-
- e.printStackTrace();
- }
- }
-
-
- }
-