标签:enqueue att rri serve 线程管理 service 而且 创建 input
Java NIO是new IO的简称,是一种可以替代Java IO的一套新的IO机制。它提供了一套不同于Java标准IO的操作机制,严格来说,NIO与并发并无直接关系,但是使用NIO技术可以大大提高线程的使用效率。Java NIO设计的基础内容有通道(Channel)、缓冲区(Buffer)、Selector(选择器)。下面说说这几个内容
Channel:Channel是一对象,可以通过它读取和写入数据。可以把它看做是IO中的流,不同的是:
正如上面提到的,所有数据都通过Buffer对象处理,所以不会将字节写入到Channel中,而是将数据写入到Buffer中;不会从Channel中读取字节,而是将数据从Channel读入Buffer,再从Buffer获取这个字节。Channel可以比流更好地反映出底层操作系统的真实情况。特别是在Unix模型中,底层操作系统通常都是双向的。在Java NIO中的Channel主要有如下几种类型:
Buffer是一对象,它包含一些要写入或者读到的Stream对象。应用程序不能直接对 Channel 进行读写操作,而必须通过 Buffer 来进行,即 Channel 是通过 Buffer 来读写数据的。在NIO中,所有的数据都是用Buffer处理的,它是NIO读写数据的中转池。Buffer实质上是一个数组,通常是一个字节数据,但也可以是其他类型的数组。但一个缓冲区不仅仅是一个数组,重要的是它提供了对数据的结构化访问,而且还可以跟踪系统的读写进程。使用 Buffer 读写数据一般遵循以下四个步骤:
当向 Buffer 写入数据时,Buffer 会记录下写了多少数据。一旦要读取数据,需要通过 flip() 方法将 Buffer 从写模式切换到读模式。在读模式下,可以读取之前写入到 Buffer 的所有数据。一旦读完了所有的数据,就需要清空缓冲区,让它可以再次被写入。有两种方式能清空缓冲区:调用 clear() 或 compact() 方法。clear() 方法会清空整个缓冲区。compact() 方法只会清除已经读过的数据。任何未读的数据都被移到缓冲区的起始处,新写入的数据将放到缓冲区未读数据的后面。Buffer主要有如下几种:
CopyFile执行三个基本的操作:创建一个Buffer,然后从源文件读取数据到缓冲区,然后再将缓冲区写入目标文件。
public static void copyFileUseNIO(String src,String dst) throws IOException{ //声明源文件和目标文件 FileInputStream fi=new FileInputStream(new File(src)); FileOutputStream fo=new FileOutputStream(new File(dst)); //获得传输通道channel FileChannel inChannel=fi.getChannel(); FileChannel outChannel=fo.getChannel(); //获得容器buffer ByteBuffer buffer=ByteBuffer.allocate(1024); while(true){ //判断是否读完文件 int eof =inChannel.read(buffer); if(eof==-1){ break; } //重设一下buffer的position=0,limit=position buffer.flip(); //开始写 outChannel.write(buffer); //写完要重置buffer,重设position=0,limit=capacity buffer.clear(); } inChannel.close(); outChannel.close(); fi.close(); fo.close(); }
Selector是一个对象,它可以注册到很多个Channel上,监听各个Channel上发生的事件,并且能够根据事件情况决定Channel读写。这样,通过一个线程管理多个Channel,就可以处理大量网络连接了。有了Selector,我们就可以利用一个线程来处理所有的channels。线程之间的切换对操作系统来说代价是很高的,并且每个线程也会占用一定的系统资源。所以,对系统来说使用的线程越少越好。Selector 就是注册对各种 I/O 事件的地方,而且当那些事件发生时,就是这个对象告诉您所发生的事件。
Selector selector = Selector.open();
为了能让Channel和Selector配合使用,我们需要把Channel注册到Selector上。通过调用 channel.register()方法来实现注册:
channel.configureBlocking(false); SelectionKey key =channel.register(selector,SelectionKey.OP_READ);
注意,注册的Channel 必须设置成异步模式 才可以,否则异步IO就无法工作,这就意味着我们不能把一个FileChannel注册到Selector,因为FileChannel没有异步模式,但是网络编程中的SocketChannel是可以的。
register()的调用的返回值是一个SelectionKey,代表这个通道在此 Selector 上注册。当某个 Selector 通知您某个传入事件时,它是通过提供对应于该事件的 SelectionKey 来进行的。SelectionKey 还可以用于取消通道的注册。
SelectionKey中包含如下属性:
把Channel注册到Selector来监听感兴趣的事件,interestSet就是你要选择的感兴趣的事件的集合。可以通过SelectionKey对象来读写interest set:
int interestSet = selectionKey.interestOps(); boolean isInterestedInAccept = interestSet & SelectionKey.OP_ACCEPT; boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT; boolean isInterestedInRead = interestSet & SelectionKey.OP_READ; boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;
通过上面例子可以看到,我们可以通过用 & 和 SelectionKey 中的常量做运算,从SelectionKey中找到我们感兴趣的事件。
readySet 是通道已经准备就绪进行操作的集合。在一次选Selection之后,你应该会首先访问这个readySet。Selection将在下一小节进行解释。可以这样访问ready集合,也可以用像检测interest集合那样的方法,来检测Channel中什么事件或操作已经就绪:
int readySet = selectionKey.readyOps();
selectionKey.isAcceptable(); selectionKey.isConnectable(); selectionKey.isReadable(); selectionKey.isWritable();
我们可以通过SelectionKey获得Selector和注册的Channel:
Channel channel = selectionKey.channel();
Selector selector = selectionKey.selector();
可以将一个对象或者更多信息attach 到SelectionKey上,这样就能识别某个给定的通道。例如,可以附加与通道一起使用的Buffer,或包含聚集数据对象。使用方法如下:
selectionKey.attach(theObject);
Object attachedObj = selectionKey.attachment();
还可以在用register()方法向Selector注册Channel的时候附加对象。如:
SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);
主要步骤和元素:
首先,通过 Selector.open() 创建一个 Selector,作为类似调度员的角色。
然后,创建一个 ServerSocketChannel,并且向 Selector 注册,通过指定 SelectionKey.OP_ACCEPT,告诉调度员,它关注的是新的连接请求。
注意,为什么我们要明确配置非阻塞模式呢?这是因为阻塞模式下,注册操作是不允许的,会抛出 IllegalBlockingModeException 异常。
Selector 阻塞在 select 操作,当有 Channel 发生接入请求,就会被唤醒。
在具体的方法中,通过 SocketChannel 和 Buffer 进行数据操作
IO 都是同步阻塞模式,所以需要多线程以实现多任务处理。而 NIO 则是利用了单线程轮询事件的机制,通过高效地定位就绪的 Channel,来决定做什么,仅仅 select 阶段是阻塞的,可以有效避免大量客户端连接时,频繁线程切换带来的问题,应用的扩展能力有了非常大的提高
下面用NIO设计一个Echo服务器:
首先定义一个Selector和线程池
private Selector selector; private ExecutorService tp = Executors.newCachedThreadPool();
selector处理所有的网络连接,tp线程池处理每一个客户端请求。为了统计服务器线程在客户端花费的时间,还需要定义一个时间统计有关的变量,用于统计在某一个Socket上花费的时间,time_stat的key为Socket,value为时间戳:
public static Map<Socket,Long> time_stat = new HashMap<Socket,Long>(10240);
下面来看一下NIO服务器的核心代码,startServer()方法用于启动NIO Server。
private void startServer() throws IOException{ this.selector = SelectorProvider.provider().openSelector(); ServerSocketChannel ssc = ServerSocketChannel.open(); // 服务端SocketChannel ssc.configureBlocking(false); // 设置为非阻塞模式 InetSocketAddress isa = new InetSocketAddress(InetAddress.getLocalHost(),8000);// 使用8000端口 ssc.socket().bind(isa); SelectionKey acceptKey = ssc.register(selector, SelectionKey.OP_ACCEPT); // 将ServerSocketChannel绑定到Selector上,感兴趣的时间为Accept for(;;){ // 主要任务是等待-分发网络消息 this.selector.select(); // 阻塞方法,如果当前没有准备好的的数据,就会等待,如果有的话返回已经准备好的SelectionKey数量 Set<SelectionKey> readyKeys = this.selector.selectedKeys(); // 获取准备好的SelectionKey Iterator<SelectionKey> i = readyKeys.iterator(); long e = 0; while(i.hasNext()){ SelectionKey sk = i.next(); i.remove();// 处理一个删除一个,不然可能重复处理 if(sk.isAcceptable()){ doAccept(sk); }else if(sk.isValid() && sk.isReadable()){// 判断是否可以读 if(!time_stat.containsKey(((SocketChannel) sk.channel()).socket())){ time_stat.put(((SocketChannel) sk.channel()).socket(), System.currentTimeMillis()); } doRead(sk); }else if(sk.isValid() && sk.isWritable()){ // 判断是否可以写 doWrite(sk); e = System.currentTimeMillis(); long b = time_stat.remove(((SocketChannel) sk.channel()).socket()); System.out.println("spend: "+(b-e)+"ms"); } } } }
在了解服务端整体框架后,下面从具体的方法中看看几个主要方法的使用:
private void doAccept(SelectionKey sk) { ServerSocketChannel server = (ServerSocketChannel) sk.channel(); SocketChannel clientChannel; try { clientChannel = server.accept(); clientChannel.configureBlocking(false);// 非阻塞 SelectionKey clientKey = clientChannel.register(selector, SelectionKey.OP_READ);//将Channel注册到Selector上,并告诉Selector对读感兴趣,Channel准备好读时给线程一个通知 EchoClient ec = new EchoClient(); clientKey.attach(ec);// 客户端实例作为附件,附加到表示这个连接的SelectionKey上,可以在整个连接过程共享ec InetAddress clientAddress = clientChannel.socket().getInetAddress(); System.out.println("Accepted connection from "+clientAddress.getHostAddress()); } catch (Exception e) {} }
EchoClient封装一个队列,保存在需要恢复给这个客户端所有信息上,这样再进行回复,只要outq对象中弹出元素即可。
public class EchoClient { private LinkedList<ByteBuffer> outq; public EchoClient() { this.outq = new LinkedList<ByteBuffer>(); } public LinkedList<ByteBuffer> getOutq() { return outq; } public void enqueue(ByteBuffer bb) { this.outq.addFirst(bb); } }
下面看看doRead()方法的实现。
private void doRead(SelectionKey sk) { SocketChannel c = (SocketChannel) sk.channel(); ByteBuffer bb = ByteBuffer.allocate(8192); int len; try { len = c.read(bb);// 存放读取的数据 if(len<0){ disconnect(sk); return; } } catch (Exception e) { System.out.println("Failed to read from client!"); e.printStackTrace(); disconnect(sk); return; } bb.flip(); tp.execute(new HandleMsg(sk,bb)); // 线程池处理数据 }
HandleMsg的实现很简单:
public class HandleMsg implements Runnable{ SelectionKey sk; ByteBuffer bb; public HandleMsg(SelectionKey sk,ByteBuffer bb){ this.sk = sk; this.bb = bb; } @Override public void run() { EchoClient ec = (EchoClient) sk.attachment(); ec.enqueue(bb);// 将收到的数据压入队列,业务逻辑也可以在这个地方处理了 sk.interestOps(SelectionKey.OP_READ|SelectionKey.OP_WRITE); selector.wakeup();// 强迫Selector立即返回 } }
doWrite()代码如下,这个方法拿到的sk和doread()方法拿到的是同一个,通过这个sk可以操作共享的EchoClient
private void doWrite(SelectionKey sk) { SocketChannel c = (SocketChannel) sk.channel(); EchoClient ec = (EchoClient) sk.attachment(); LinkedList<ByteBuffer> outq = ec.getOutq(); ByteBuffer bb = outq.getLast();// 列表顶部元素,写回客户端 try { int len = c.write(bb); if(len == -1){ disconnect(sk); return; } if(bb.remaining()== 0){ outq.removeLast();// 缓冲区已经完成写,删除它 } } catch (Exception e) { System.out.println("Failed to write to client."); e.printStackTrace(); disconnect(sk); return; } if(outq.size()==0){ sk.interestOps(SelectionKey.OP_READ); } }
下面用NIO设计一个客户端
首先初始化Selector和Channel
private Selector selector; public void init(String ip,int port) throws IOException{ SocketChannel s = SocketChannel.open(); s.configureBlocking(false); this.selector = SelectorProvider.provider().openSelector(); s.connect(new InetSocketAddress(ip,port));// 并不定连接成功,需要finishConnect()确认 s.register(selector, SelectionKey.OP_CONNECT); }
程序的工作执行逻辑,主要两件事,一个是链接就绪的Connect,一个是刻度的read()事件:
public void working() throws IOException{ while(true){ if(!this.selector.isOpen()){ break; } this.selector.select(); Iterator<SelectionKey> i = this.selector.selectedKeys().iterator(); while(i.hasNext()){ SelectionKey key = i.next(); i.remove(); if(key.isConnectable()){ connect(key);// 判断有没有完成连接,没有的话使用finishConnect()方法完成连接,并向Channel中写入数据及感兴趣的事情 }else if(key.isReadable()){ read(key); } } } }
下面是read事件
private void read(SelectionKey key) throws IOException { SocketChannel c = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(100); c.read(buffer); byte[] bs = buffer.array(); String msg = new String(bs).trim(); System.out.println("客户端收到信息:"+msg); c.close(); key.selector().close(); }
标签:enqueue att rri serve 线程管理 service 而且 创建 input
原文地址:https://www.cnblogs.com/wangyongwen/p/11337420.html