标签:
转载请注明出处:jiq?钦‘s technical Blog - 季义钦
BIO和NIO是两种不同的网络通信模型,现如今NIO已经大量应用在Jetty、ZooKeeper、Netty等开源框架中。
下面通过一个例子解释两者区别:
假设当前服务端程序需要同时从与多个客户端建立的连接读取数据。
如果采用阻塞式IO,单线程情况下,处理者线程可能阻塞在其中一个套接字的read上,导致另一个套接字即使准备好了数据也无法处理,这个时候解决的方法就是针对每一个套接字,都新建一个线程处理其数据读取。
所以说,在BIO工作模式下,服务端程序要想同时处理多个套接字的数据读取,在等待接收连接请求的主线程之外,还要为每一个建立好的连接分配一个新的线程进行处理。
轮询方式
如果将套接字读操作换成非阻塞的,那么只需要一个线程就可以同时处理套接字,每次检查一个套接字,有数据则读取,没有则检查下一个,因为是非阻塞的,所以执行read操作时若没有数据准备好则立即返回,不会发生阻塞。
I/O多路复用
这种轮询的方式缺点是浪费CPU资源,大部分时间可能都是无数据可读的,不必仍不间断的反复执行read操作,I/O多路复用(IOmultiplexing)是一种更好的方法,调用select函数时,其内部会维护一张监听的套接字的列表,其会一直阻塞直到其中某一个套接字有数据准备好才返回,并告诉是哪个套接字可读,这时再调用该套接字的read函数效率更高。
所以基本可以认为 “NIO = I/O多路复用 + 非阻塞式I/O”,大部分情况下是单线程,但也有超过一个线程实现NIO的情况
NIO三种模型
上面所讲到的只需要一个线程就可以同时处理多个套接字,这只是其中的一种单线程模型,是一种较为极端的情况,NIO主要包含三种线程模型:
1) Reactor单线程模型
2) Reactor多线程模型
3)主从Reactor多线程模型
Reactor单线程模型:
单个线程完成所有事情包括接收客户端的TCP连接请求,读取和写入套接字数据等。
对于一些小容量应用场景,可以使用单线程模型。但是对于高负载、大并发的应用却不合适,主要原因如下:
1) 一个NIO线程同时处理成百上千的链路,性能上无法支撑,即便NIO线程的CPU负荷达到100%,也无法满足海量消息的编码、解码、读取和发送;
2) 当NIO线程负载过重之后,处理速度将变慢,这会导致大量客户端连接超时,超时之后往往会进行重发,这更加重了NIO线程的负载,最终会导致大量消息积压和处理超时,NIO线程会成为系统的性能瓶颈;
3) 可靠性问题:一旦NIO线程意外跑飞,或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障。
为了解决这些问题,演进出了Reactor多线程模型。
Reactor多线程模型:
Rector多线程模型与单线程模型最大的区别就是有一组NIO线程处理真实的IO操作。
Reactor多线程模型的特点:
1) 有专门一个NIO线程-Acceptor线程用于监听服务端,接收客户端的TCP连接请求;
2) 网络IO操作-读、写等由一个NIO线程池负责,线程池可以采用标准的JDK线程池实现,它包含一个任务队列和N个可用的线程,由这些NIO线程负责消息的读取、解码、编码和发送;
3) 1个NIO线程可以同时处理N条链路,但是1个链路只对应1个NIO线程,防止发生并发操作问题。
在绝大多数场景下,Reactor多线程模型都可以满足性能需求;但是,在极特殊应用场景中,一个NIO线程负责监听和处理所有的客户端连接可能会存在性能问题。例如百万客户端并发连接,或者服务端需要对客户端的握手消息进行安全认证,认证本身非常损耗性能。在这类场景下,单独一个Acceptor线程可能会存在性能不足问题,为了解决性能问题,产生了第三种Reactor线程模型-主从Reactor多线程模型。
即从单线程中由一个线程即监听连接事件、读写事件、由完成数据读写,拆分为由一个线程专门监听各种事件,再由专门的线程池负责处理真正的IO数据读写。
主从Reactor多线程模型
主从Reactor线程模型与Reactor多线程模型的最大区别就是有一组NIO线程处理连接、读写事件。
主从Reactor线程模型的特点是:服务端用于接收客户端连接的不再是个1个单独的NIO线程,而是一个独立的NIO线程池。Acceptor接收到客户端TCP连接请求处理完成后(可能包含接入认证等),将新创建的SocketChannel注册到IO线程池(sub reactor线程池)的某个IO线程上,由它负责SocketChannel的读写和编解码工作。Acceptor线程池仅仅只用于客户端的登陆、握手和安全认证,一旦链路建立成功,就将链路注册到后端subReactor线程池的IO线程上,由IO线程负责后续的IO操作。
即从多线程模型中由一个线程来监听连接事件和数据读写事件,拆分为一个线程监听连接事件,线程池的多个线程监听已经建立连接的套接字的数据读写事件,另外和多线程模型一样有专门的线程池处理真正的IO操作。
NIO适用场景
服务器需要支持超大量的长时间连接。比如10000个连接以上,并且每个客户端并不会频繁地发送太多数据。例如总公司的一个中心服务器需要收集全国便利店各个收银机的交易信息,只需要少量线程按需处理维护的大量长期连接。
Jetty、Mina、Netty、ZooKeeper等都是基于NIO方式实现。
BIO适用场景
适用于连接数目比较小且固定的场景,这种方式对服务器资源要求比较高,并发局限于应用中。
服务端:
1. package cn.nio; 2. 3. import java.io.IOException; 4. import java.net.InetSocketAddress; 5. import java.nio.ByteBuffer; 6. import java.nio.channels.SelectionKey; 7. import java.nio.channels.Selector; 8. import java.nio.channels.ServerSocketChannel; 9. import java.nio.channels.SocketChannel; 10.import java.util.Iterator; 11. 12./** 13. * NIO服务端 14. */ 15.public class NIOServer { 16. //通道管理器 17. private Selector selector; 18. 19. /** 20. * 获得一个ServerSocket通道,并对该通道做一些初始化的工作 21. * @param port 绑定的端口号 22. * @throws IOException 23. */ 24. public void initServer(int port) throws IOException { 25. // 获得一个ServerSocket通道 26. ServerSocketChannel serverChannel = ServerSocketChannel.open(); 27. // 设置通道为非阻塞 28. serverChannel.configureBlocking(false); 29. // 将该通道对应的ServerSocket绑定到port端口 30. serverChannel.socket().bind(new InetSocketAddress(port)); 31. // 获得一个通道管理器 32. this.selector = Selector.open(); 33. //将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_ACCEPT事件,注册该事件后, 34. //当该事件到达时,selector.select()会返回,如果该事件没到达selector.select()会一直阻塞。 35. serverChannel.register(selector, SelectionKey.OP_ACCEPT); 36. } 37. 38. /** 39. * 采用轮询的方式监听selector上是否有需要处理的事件,如果有,则进行处理 40. * @throws IOException 41. */ 42. @SuppressWarnings("unchecked") 43. public void listen() throws IOException { 44. System.out.println("服务端启动成功!"); 45. // 轮询访问selector 46. while (true) { 47. //当注册的事件到达时,方法返回;否则,该方法会一直阻塞 48. selector.select(); 49. // 获得selector中选中的项的迭代器,选中的项为注册的事件 50. Iterator ite = this.selector.selectedKeys().iterator(); 51. while (ite.hasNext()) { 52. SelectionKey key = (SelectionKey) ite.next(); 53. // 删除已选的key,以防重复处理 54. ite.remove(); 55. // 客户端请求连接事件 56. if (key.isAcceptable()) { 57. ServerSocketChannel server = (ServerSocketChannel) key 58. .channel(); 59. // 获得和客户端连接的通道 60. SocketChannel channel = server.accept(); 61. // 设置成非阻塞 62. channel.configureBlocking(false); 63. 64. //在这里可以给客户端发送信息哦 65. channel.write(ByteBuffer.wrap(new String("向客户端发送了一条信息").getBytes())); 66. //在和客户端连接成功之后,为了可以接收到客户端的信息,需要给通道设置读的权限。 67. channel.register(this.selector, SelectionKey.OP_READ); 68. 69. // 获得了可读的事件 70. } else if (key.isReadable()) { 71. read(key); 72. } 73. 74. } 75. 76. } 77. } 78. /** 79. * 处理读取客户端发来的信息 的事件 80. * @param key 81. * @throws IOException 82. */ 83. public void read(SelectionKey key) throws IOException{ 84. // 服务器可读取消息:得到事件发生的Socket通道 85. SocketChannel channel = (SocketChannel) key.channel(); 86. // 创建读取的缓冲区 87. ByteBuffer buffer = ByteBuffer.allocate(10); 88. channel.read(buffer); 89. byte[] data = buffer.array(); 90. String msg = new String(data).trim(); 91. System.out.println("服务端收到信息:"+msg); 92. ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes()); 93. channel.write(outBuffer);// 将消息回送给客户端 94. } 95. 96. /** 97. * 启动服务端测试 98. * @throws IOException 99. */ 100. public static void main(String[] args) throws IOException { 101. NIOServer server = new NIOServer(); 102. server.initServer(8000); 103. server.listen(); 104. } 105. 106.}
客户端:
1. package cn.nio; 2. 3. import java.io.IOException; 4. import java.net.InetSocketAddress; 5. import java.nio.ByteBuffer; 6. import java.nio.channels.SelectionKey; 7. import java.nio.channels.Selector; 8. import java.nio.channels.SocketChannel; 9. import java.util.Iterator; 10. 11./** 12. * NIO客户端 13. */ 14.public class NIOClient { 15. //通道管理器 16. private Selector selector; 17. 18. /** 19. * 获得一个Socket通道,并对该通道做一些初始化的工作 20. * @param ip 连接的服务器的ip 21. * @param port 连接的服务器的端口号 22. * @throws IOException 23. */ 24. public void initClient(String ip,int port) throws IOException { 25. // 获得一个Socket通道 26. SocketChannel channel = SocketChannel.open(); 27. // 设置通道为非阻塞 28. channel.configureBlocking(false); 29. // 获得一个通道管理器 30. this.selector = Selector.open(); 31. 32. // 客户端连接服务器,其实方法执行并没有实现连接,需要在listen()方法中调 33. //用channel.finishConnect();才能完成连接 34. channel.connect(new InetSocketAddress(ip,port)); 35. //将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_CONNECT事件。 36. channel.register(selector, SelectionKey.OP_CONNECT); 37. } 38. 39. /** 40. * 采用轮询的方式监听selector上是否有需要处理的事件,如果有,则进行处理 41. * @throws IOException 42. */ 43. @SuppressWarnings("unchecked") 44. public void listen() throws IOException { 45. // 轮询访问selector 46. while (true) { 47. selector.select(); 48. // 获得selector中选中的项的迭代器 49. Iterator ite = this.selector.selectedKeys().iterator(); 50. while (ite.hasNext()) { 51. SelectionKey key = (SelectionKey) ite.next(); 52. // 删除已选的key,以防重复处理 53. ite.remove(); 54. // 连接事件发生 55. if (key.isConnectable()) { 56. SocketChannel channel = (SocketChannel) key 57. .channel(); 58. // 如果正在连接,则完成连接 59. if(channel.isConnectionPending()){ 60. channel.finishConnect(); 61. 62. } 63. // 设置成非阻塞 64. channel.configureBlocking(false); 65. 66. //在这里可以给服务端发送信息哦 67. channel.write(ByteBuffer.wrap(new String("向服务端发送了一条信息").getBytes())); 68. //在和服务端连接成功之后,为了可以接收到服务端的信息,需要给通道设置读的权限。 69. channel.register(this.selector, SelectionKey.OP_READ); 70. 71. // 获得了可读的事件 72. } else if (key.isReadable()) { 73. read(key); 74. } 75. 76. } 77. 78. } 79. } 80. /** 81. * 处理读取服务端发来的信息 的事件 82. * @param key 83. * @throws IOException 84. */ 85. public void read(SelectionKey key) throws IOException{ 86. //和服务端的read方法一样 87. } 88. 89. 90. /** 91. * 启动客户端测试 92. * @throws IOException 93. */ 94. public static void main(String[] args) throws IOException { 95. NIOClient client = new NIOClient(); 96. client.initClient("localhost",8000); 97. client.listen(); 98. } 99. 100.}
标签:
原文地址:http://blog.csdn.net/jiyiqinlovexx/article/details/51204726