目的:本编文章主要想分享一下NIO方面的知识,由于最近几天工作不忙,趁机学习了下Java NIO Selector的相关知识;主要是实践操作的;具体的理论知识,可以参考网上的文章。
测试用例主要有三种方式:
其实,是服务器端的逻辑不变,客户端有三种方式而已。
服务器端:2个selector + channel, 客户端:一个channel
服务器端:2个selector + channel, 客户端:多个channel(多线程方式)
服务器端:2个selector + channel, 客户端:1个selector + channel
服务端,如果想要一个selector+channel的话,直接在initAndRegister()方法中,注释掉相关代码即可了,当然,客户端也要修改端口部分
服务端代码:
package xingej.selector.test002; //基本思路逻辑: //------------------------------------------------------------------------------ //1、创建一个通道选择器Selector //2、创建服务器端的ServerSocketChannel通道 // 设置ServerSocketChannel属性, // 端口号的绑定 // 3、将通道选择器 与 ServerSocketChannel通道进行绑定,并向通道选择器注册感兴趣的事件 //------------------------------------------------------------------------------ // 4、通道选择器开始工作监听管道事件,调用select()方法,死循环的方式调用 // 如果用户感兴趣的事件发生,就去处理 // 否则,就阻塞在这里 import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Iterator; import java.util.Set; public class NIOSelectorServer { //这里声明了两个缓存区,发送和接收缓冲区 //其实,一个就可以了 private static ByteBuffer sendBuffer = ByteBuffer.allocate(1024); private static ByteBuffer receiveBuffer = ByteBuffer.allocate(1024); private Selector selector; public void initAndRegister() throws Exception { //监听两个服务,因此需要两个端口的 int listenPortA = 8081; int listenPortB = 8082; //创建第一个ServerSocketChannel对象实例 ServerSocketChannel serverSocketChannelA = builderServerSocketChannel(listenPortA); //创建第二个ServerSocketChannel对象实例 ServerSocketChannel serverSocketChannelB = builderServerSocketChannel(listenPortB); //创建通道选择器Selector selector = Selector.open(); //将serverSocketChannelA 通道注册到通道选择器Selector里 register(selector, serverSocketChannelA); //将serverSocketChannelB 通道注册到通道选择器Selector里 register(selector, serverSocketChannelB); } //开始业务监听了 public void listen() throws Exception { System.out.println("-----服务器-------开始接收请求-------OK--------"); while (true) { int readyChannelNum = selector.select(); if (0 == readyChannelNum) { continue; } //从选择器中的selectedKeys,可以获取此时已经准备好的管道事件 Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); //从迭代器移除刚选好的键 iterator.remove(); dealSelectionKey(selector, selectionKey); } Thread.sleep(2000); } } //处理具体事件 private void dealSelectionKey(Selector selector, SelectionKey selectionKey) throws Exception { if (selectionKey.isAcceptable()) { ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel(); SocketChannel clientSocketChannel = serverSocketChannel.accept(); clientSocketChannel.configureBlocking(false); clientSocketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); } else //读取客户端的内容 if (selectionKey.isReadable()) { SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); receiveBuffer.clear(); StringBuilder msg = new StringBuilder(); //将客户端发送过来的数据,从管道中读取到或者说写到 接收缓存里 while (socketChannel.read(receiveBuffer) > 0) { receiveBuffer.flip(); msg.append(new String(receiveBuffer.array())); receiveBuffer.clear();//清楚数据,下次可以重新写入 } socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); //打印输出从客户端读取到的信息 System.out.println("------>:\t" + msg.toString()); // socketChannel.close(); } else //向客户端 发送数据 if (selectionKey.isWritable()) { SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); sendBuffer.flip(); socketChannel.write(sendBuffer); selectionKey.interestOps(SelectionKey.OP_READ); } } //将ServerSocketChannel 向 Selector进行注册,也就是将两者绑定在一起, private void register(Selector selector, ServerSocketChannel serverSocketChannel) throws Exception { serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); } //创建ServerSocketChannel对象,并进行属性设置 private ServerSocketChannel builderServerSocketChannel(int port) throws Exception { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //设置属性,如非阻塞模式 serverSocketChannel.configureBlocking(false); //绑定端口号 serverSocketChannel.bind(new InetSocketAddress(port)); return serverSocketChannel; } public static void main(String[] args) throws Exception { NIOSelectorServer nioSelectorServer = new NIOSelectorServer(); //初始化 并 注册 nioSelectorServer.initAndRegister(); //开始监听 nioSelectorServer.listen(); } }
客户端请求方式一:
模型如下:
代码如下:
package xingej.selector.test002; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; public class NIOClient { public static void main(String[] args) throws Exception { SocketChannel clientChannel = SocketChannel.open(); clientChannel.connect(new InetSocketAddress("localhost", 8081)); clientChannel.configureBlocking(false); ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.put(new String ("hello, server! ").getBytes()); buffer.flip(); clientChannel.write(buffer); clientChannel.close(); } }
客户端请求方式二:
模型如下:
代码如下:
package xingej.selector.test002; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.util.Random; public class NIOClient2 { public static void main(String[] args) throws Exception { String msg = "hello, NIO Server, I‘m "; int[] ports = {8081, 8082}; for (int i = 0; i < 10; i++) { int index = i % 2; int port = ports[index]; new Thread(new SocketChannelThread(msg + i +" client", port)).start(); } } } class SocketChannelThread implements Runnable { //向服务器发送的消息体 private String msg; private int port; private SocketChannel clientChannel; public SocketChannelThread(String msg, int port) { this.msg = msg; this.port = port; } @Override public void run() { try { //创建一个SocketChannel对象实例 clientChannel = SocketChannel.open(); //链接服务器 clientChannel.connect(new InetSocketAddress("localhost", port)); //设置通道未非阻塞模式 clientChannel.configureBlocking(false); ByteBuffer buffer = ByteBuffer.allocate(1024); int sendNum = new Random().nextInt(5) + 1; for(int i = 0; i < sendNum; i++) { buffer.put(new String(msg).getBytes()); buffer.flip(); //将缓冲区的内容发送到通道里 clientChannel.write(buffer); //清理缓存区,下次重新写入 buffer.clear(); //每次发送完成后,休息几秒中,就是为了测试 Thread.sleep(sendNum * 1000); } } catch (Exception e) { e.printStackTrace(); } finally { try{ //如果此通过处于开通状态的话,就关闭此通道 if (clientChannel.isOpen()) { System.out.println("-----关闭通道了------"); clientChannel.close(); } }catch (IOException e) { e.printStackTrace(); } } } }
客户端请求方式三:
模型如下:
代码如下:
package xingej.selector.test002; //创建SocketChannel // 链接服务器 //向服务器发送消息 import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Random; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; // public class NIOSelectorClient { private static Selector selector; private static boolean flag = false; private ByteBuffer sendBuffer = ByteBuffer.allocate(1024); public void initAndRegister() throws Exception{ selector = Selector.open(); createAndRegister(5); } private void createAndRegister(int socketChannelNum) throws Exception{ ExecutorService socketThreadPool = Executors.newFixedThreadPool(5); CountDownLatch _latchs = new CountDownLatch(socketChannelNum); Integer[] ports = {8081, 8082}; for(int i = 0; i < socketChannelNum; i++) { int port = ports[i % 2]; socketThreadPool.submit(new SocketChannelThread(port, _latchs)); } _latchs.await(); socketThreadPool.shutdown(); flag = true; } class SocketChannelThread implements Runnable{ private CountDownLatch _latch; private int port; private SocketChannel socketChannel; public SocketChannelThread(int port, CountDownLatch _latch) { this.port = port; this._latch = _latch; } @Override public void run() { try { socketChannel= SocketChannel.open(); socketChannel.configureBlocking(false); //1到10秒钟,随机休息 //这里,添加时间的目的,是想模拟一下,不想同一时间,向服务器发起请求 int time = (new Random().nextInt(10) + 1) * 1000; System.out.println("----此通道----休息的时间是------:\t" + time / 1000 + " 秒"); Thread.sleep(time); System.out.println("--------2-------port:\t" + port); socketChannel.connect(new InetSocketAddress("localhost", port)); System.out.println("--------3-------"); socketChannel.register(selector, SelectionKey.OP_CONNECT); } catch (Exception e) { e.printStackTrace(); } finally { //计数器,减一 _latch.countDown(); } } } public void listen() throws Exception{ while (true) { System.out.println("-----客户端----准备好了----:\t"); int readyChannelNum = selector.select(); System.out.println("-----客户端----准备好的管道数量是-----:\t" + readyChannelNum); if (0 == readyChannelNum) { continue; } Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); //下面的方法,就可以将selectionKey 键移除 iterator.remove(); if (selectionKey.isConnectable()) { if (socketChannel.isConnectionPending()) { socketChannel.finishConnect(); System.out.println("----客户端----链接完毕了-----"); } socketChannel.register(selector, SelectionKey.OP_WRITE); }else if (selectionKey.isWritable()) { sendBuffer.clear(); sendBuffer.put("hello, server, I‘m client! Are you OK!!!".getBytes()); //flip()必须有的 sendBuffer.flip(); socketChannel.write(sendBuffer); System.out.println("----客户端---向服务器---发送消息-----完毕----OK-----"); //这里注册的事件是write, //效果就是,客户端不断的发送消息 //当然,也可以修改成其他事件,如SelectionKey.OP_READ selectionKey.interestOps(SelectionKey.OP_WRITE); } } //每隔1秒中,就向服务器发送信息 Thread.sleep(1000); } } public static void main(String[] args) throws Exception{ NIOSelectorClient nioSelectorClient = new NIOSelectorClient(); nioSelectorClient.initAndRegister(); //死循环的方式,来监听标志位, //一旦标志位发生改变,就开始监听 while (true) { if (flag) { nioSelectorClient.listen(); break; } } } }
总结:
1、在调用Selector.select()方法之前,最好将要使用的一个SocketChannel或者多个SocketChannel 完成注册功能;也就是说,所有SocketChannel完成注册事件后,才能调用select方法;
不然,很容易出现死锁现象。
如下图所示:
解决措施方式一: 客户端请求方式三,刚开始并没有添加
CountDownLatch 计数器
,针对死锁才添加的。
主线程再调用监听方法时,最好使用观察者模式,目前这里使用了死循环的方式监听,感觉不太好。
2、SocketChannel 通道属于长链接方式,客户端不再发送消息时,通道依旧存在,因此,可以调用Channel.close方法进行关闭
学习方式的建议
如果想更加深入的了解NIO,Selector的话,最好还是不断的进行测试,
如在客户端添加Channel.close(),修改感兴趣的事件,等等
去观察客户端,服务器端的现象,
去总结,去研究源码,
研究源码的目的,不光光是搞清楚背后的原理,
还希望能够学到背后优秀的设计模式,设计思路,使用场景等等,
扩展眼界
代码已分享到git上
https://github.com/xej520/xingej-nio
本文出自 “XEJ分布式工作室” 博客,请务必保留此出处http://xingej.blog.51cto.com/7912529/1969782
原文地址:http://xingej.blog.51cto.com/7912529/1969782