标签:rgs amp 释放 hand pos ini row sock 阻塞
自己对着源码敲一遍练习,写上注释。发现NIO编程难度好高啊。。虽然很复杂,但是NIO编程的有点还是很多:
1、客户端发起的连接操作是异步的,可以通过在多路复用器注册OP_CONNECTION等待后续结果,不需要像BIO的客户端一样被同步阻塞。
2、SocketChannel的读写操作都是异步的,如果没有可读写的数据它不会同步等待,直接返回,这样I/O通信模型就可以处理其他的链路,不需要同步等待这个链路可用。
3、线程模型的优化:由于JDK的Selector在Linux等主流操作系统上通过epoll实现,没有连接句柄的限制,那么Selector线程可以同时处理成千上万个客户端连接,而且性能不会随着客户端的增加而线性下降。所以它非常适合做高性能、高负载的网络服务器。
TimeClient:
1 package nio; 2 3 public class TimeClient { 4 public static void main(String args[]){ 5 int port = 8080; 6 if(args != null && args.length > 0){ 7 try{ 8 port = Integer.valueOf(args[0]); 9 }catch(NumberFormatException e){ 10 //采用默认值 11 } 12 } 13 new Thread(new TimeClientHandle("120.0.0.1",port),"TimeClient-001").start(); 14 } 15 }
TimeClientHandler:
1 package nio; 2 3 import java.io.IOException; 4 import java.net.InetSocketAddress; 5 import java.nio.ByteBuffer; 6 import java.nio.channels.SelectionKey; 7 import java.nio.channels.Selector; 8 import java.nio.channels.SocketChannel; 9 import java.util.Iterator; 10 import java.util.Set; 11 12 public class TimeClientHandle implements Runnable{ 13 private String host; 14 private int port; 15 private Selector selector; 16 private SocketChannel socketChannel; 17 private volatile boolean stop; 18 19 public TimeClientHandle(String host,int port){ 20 this.host = host == null ? "127.0.0.1" : host; 21 this.port = port; 22 try{ 23 selector = Selector.open(); 24 socketChannel = SocketChannel.open(); 25 socketChannel.configureBlocking(false); 26 }catch(IOException e){ 27 e.printStackTrace(); 28 System.exit(1); 29 } 30 } 31 32 33 public void run() { 34 //发送请求连接 35 try{ 36 doConnect(); 37 }catch(IOException e){ 38 e.printStackTrace(); 39 System.exit(1); 40 } 41 while(!stop){ 42 try{ 43 selector.select(1000); 44 Set<SelectionKey> selectedKeys = selector.selectedKeys(); 45 Iterator<SelectionKey> it = selectedKeys.iterator(); 46 SelectionKey key = null; 47 //当有就绪的Channel时,执行handleInput(key)方法 48 while(it.hasNext()){ 49 key = it.next(); 50 it.remove(); 51 try{ 52 handleInput(key); 53 }catch(Exception e){ 54 if(key != null){ 55 key.cancel(); 56 if(key.channel() != null){ 57 key.channel().close(); 58 } 59 } 60 } 61 } 62 }catch(Exception e){ 63 e.printStackTrace(); 64 System.exit(1); 65 } 66 } 67 68 //多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源 69 if(selector != null){ 70 try{ 71 selector.close(); 72 }catch(IOException e){ 73 e.printStackTrace(); 74 } 75 } 76 77 } 78 79 80 private void handleInput(SelectionKey key) throws IOException{ 81 if(key.isValid()){ 82 SocketChannel sc = (SocketChannel)key.channel(); 83 //判断是否连接成功 84 if(key.isConnectable()){ 85 if(sc.finishConnect()){ 86 sc.register(selector, SelectionKey.OP_READ); 87 }else{ 88 System.exit(1); 89 } 90 } 91 92 if(key.isReadable()){ 93 ByteBuffer readBuffer = ByteBuffer.allocate(1024); 94 int readBytes = sc.read(readBuffer); 95 if(readBytes > 0){ 96 readBuffer.flip(); 97 byte[] bytes = new byte[readBuffer.remaining()]; 98 readBuffer.get(bytes); 99 String body = new String(bytes,"UTF-8"); 100 System.out.println("Now is :" + body); 101 this.stop = true; 102 }else if(readBytes < 0){ 103 //对端链路关闭 104 key.cancel(); 105 sc.close(); 106 }else{ 107 ; //读到0字节,忽略 108 } 109 } 110 } 111 } 112 113 private void doConnect() throws IOException{ 114 //如果直接连接成功,则注册到多路复用器上,发送请求信息,读应答 115 if(socketChannel.connect(new InetSocketAddress(host,port))){ 116 socketChannel.register(selector, SelectionKey.OP_READ); 117 doWrite(socketChannel); 118 }else{ 119 //说明服务器没有返回TCP祸首应答消息,但这并不代表连接失败,当服务器返回TCP syn-ack消息后,Selector就能够轮训这个SocketChannel处于连接就绪状态 120 socketChannel.register(selector, SelectionKey.OP_CONNECT); 121 } 122 } 123 124 private void doWrite(SocketChannel sc) throws IOException{ 125 byte[] req = "QUERY TIME ORDER".getBytes(); 126 ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); 127 writeBuffer.put(req); 128 writeBuffer.flip(); 129 sc.write(writeBuffer); 130 if(!writeBuffer.hasRemaining()){ 131 System.out.println("Send order 2 server succeed."); 132 } 133 } 134 135 }
TimeServer:
1 package nio; 2 3 import java.io.IOException; 4 5 public class TimeServer { 6 7 public static void main(String[] args) throws IOException{ 8 int port = 8080; 9 if(args != null && args.length >0){ 10 try{ 11 port = Integer.valueOf(args[0]); 12 }catch(NumberFormatException e){ 13 //采用默认值 14 } 15 } 16 //多路复用类,是一个独立的线程,负责轮训多路复用器Selctor,处理多个客户端的并发接入。 17 MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port); 18 new Thread(timeServer,"NIO-MultiplexerTimeServer-001").start(); 19 } 20 }
MultiplexerTimeServer:
1 package nio; 2 3 import java.io.IOException; 4 import java.net.InetSocketAddress; 5 import java.nio.ByteBuffer; 6 import java.nio.channels.SelectionKey; 7 import java.nio.channels.Selector; 8 import java.nio.channels.ServerSocketChannel; 9 import java.nio.channels.SocketChannel; 10 import java.util.Iterator; 11 import java.util.Set; 12 13 public class MultiplexerTimeServer implements Runnable { 14 15 private Selector selector; 16 17 private ServerSocketChannel servChannel; 18 19 private volatile boolean stop; 20 21 public MultiplexerTimeServer(int port){ 22 try{ 23 24 selector = Selector.open(); 25 servChannel.configureBlocking(false); 26 //将ServerSocketChannel 设置为异步非阻塞,backlog设置为1024 27 servChannel.socket().bind(new InetSocketAddress(port),1024); 28 //将ServerSocket Channel注册到Selector,监听SelectionKey.OP_ACCEPT操作位,如果初始化失败,则退出 29 servChannel.register(selector,SelectionKey.OP_ACCEPT); 30 System.out.println("The time server is start in port:" + port); 31 }catch(IOException e){ 32 e.printStackTrace(); 33 System.exit(1); 34 } 35 } 36 37 public void stop(){ 38 this.stop = true; 39 } 40 41 public void run() { 42 while(!stop){ 43 try{ 44 //遍历时间设置1秒,每隔一秒唤醒一次,当有处于就绪状态的Channel时,selector将返回就绪状态的Channel的SelectionKey集合 45 selector.select(1000); 46 Set<SelectionKey> selectedKeys = selector.selectedKeys(); 47 Iterator<SelectionKey> it = selectedKeys.iterator(); 48 SelectionKey key = null; 49 //通过对就绪状态的Channel集合进行迭代,可以进行网络的异步读写操作 50 while(it.hasNext()){ 51 key = it.next(); 52 it.remove(); 53 try{ 54 handleInput(key); 55 }catch(Exception e){ 56 if(key != null){ 57 key.cancel(); 58 if(key.channel() != null){ 59 key.channel().close(); 60 } 61 } 62 } 63 } 64 }catch(Throwable t){ 65 t.printStackTrace(); 66 } 67 } 68 69 //多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源 70 if(selector != null){ 71 try{ 72 selector.close(); 73 }catch(IOException e){ 74 e.printStackTrace(); 75 } 76 } 77 } 78 79 //处理新接入的请求消息 80 private void handleInput(SelectionKey key) throws IOException{ 81 if(key.isValid()){ 82 83 //根据SelectionKey的操作位进行判断即可获知网络事件的类型,通过accept接收客户端的连接请求并创建SocketChannel实例,完成上述操作相当于 84 //完成了TCP的三次握手,TCP物理链路正式建立 85 if(key.isAcceptable()){ 86 ServerSocketChannel ssc = (ServerSocketChannel)key.channel(); 87 SocketChannel sc = ssc.accept(); 88 sc.configureBlocking(false); 89 //Add the new connection tothe selector 90 sc.register(selector, SelectionKey.OP_READ); 91 } 92 93 if(key.isReadable()){ 94 //Read the data 95 96 SocketChannel sc = (SocketChannel)key.channel(); 97 ByteBuffer readBuffer = ByteBuffer.allocate(1024); 98 int readBytes = sc.read(readBuffer); 99 if(readBytes > 0){ 100 //将缓冲区当前的limit设置为position,position设置为0,用于后续对缓冲区的读取操作 101 readBuffer.flip(); 102 byte[] bytes = new byte[readBuffer.remaining()]; 103 readBuffer.get(bytes); 104 String body = new String(bytes,"UTF-8"); 105 System.out.println("The time server receive order: + body"); 106 String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(System.currentTimeMillis()).toString() : "BAD ORDER"; 107 doWrite(sc,currentTime); 108 }else if(readBytes < 0){ 109 //对端链路关闭 110 key.cancel(); 111 sc.close(); 112 }else{ 113 ; //读到0字节,忽略 114 } 115 } 116 } 117 } 118 119 private void doWrite(SocketChannel channel,String response) throws IOException{ 120 if(response != null && response.trim().length() >0){ 121 byte[] bytes = response.getBytes(); 122 ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); 123 writeBuffer.put(bytes); 124 writeBuffer.flip(); 125 channel.write(writeBuffer); 126 } 127 } 128 }
标签:rgs amp 释放 hand pos ini row sock 阻塞
原文地址:http://www.cnblogs.com/yangsy0915/p/6136018.html