码迷,mamicode.com
首页 > 编程语言 > 详细

多线程(11) — NIO

时间:2019-08-13 00:23:17      阅读:97      评论:0      收藏:0      [点我收藏+]

标签:enqueue   att   rri   serve   线程管理   service   而且   创建   input   

  Java NIO是new IO的简称,是一种可以替代Java IO的一套新的IO机制。它提供了一套不同于Java标准IO的操作机制,严格来说,NIO与并发并无直接关系,但是使用NIO技术可以大大提高线程的使用效率。Java NIO设计的基础内容有通道(Channel)、缓冲区(Buffer)、Selector(选择器)。下面说说这几个内容

1)通道(Channel)

  Channel:Channel是一对象,可以通过它读取和写入数据。可以把它看做是IO中的流,不同的是:

  • Channel是双向的,既可以读又可以写,而流是单向的
  • Channel可以进行异步的读写
  • 对Channel的读写必须通过buffer对象

  正如上面提到的,所有数据都通过Buffer对象处理,所以不会将字节写入到Channel中,而是将数据写入到Buffer中;不会从Channel中读取字节,而是将数据从Channel读入Buffer,再从Buffer获取这个字节。Channel可以比流更好地反映出底层操作系统的真实情况。特别是在Unix模型中,底层操作系统通常都是双向的。在Java NIO中的Channel主要有如下几种类型:

  • FileChannel:从文件读取数据的
  • DatagramChannel:读写UDP网络协议数据
  • SocketChannel:读写TCP网络协议数据
  • ServerSocketChannel:可以监听TCP连接

2)缓冲区(Buffer)

  Buffer是一对象,它包含一些要写入或者读到的Stream对象。应用程序不能直接对 Channel 进行读写操作,而必须通过 Buffer 来进行,即 Channel 是通过 Buffer 来读写数据的。在NIO中,所有的数据都是用Buffer处理的,它是NIO读写数据的中转池。Buffer实质上是一个数组,通常是一个字节数据,但也可以是其他类型的数组。但一个缓冲区不仅仅是一个数组,重要的是它提供了对数据的结构化访问,而且还可以跟踪系统的读写进程。使用 Buffer 读写数据一般遵循以下四个步骤:

  1. 写入数据到 Buffer;
  2. 调用 flip() 方法;
  3. 从 Buffer 中读取数据;
  4. 调用 clear() 方法或者 compact() 方法。

  当向 Buffer 写入数据时,Buffer 会记录下写了多少数据。一旦要读取数据,需要通过 flip() 方法将 Buffer 从写模式切换到读模式。在读模式下,可以读取之前写入到 Buffer 的所有数据。一旦读完了所有的数据,就需要清空缓冲区,让它可以再次被写入。有两种方式能清空缓冲区:调用 clear() 或 compact() 方法。clear() 方法会清空整个缓冲区。compact() 方法只会清除已经读过的数据。任何未读的数据都被移到缓冲区的起始处,新写入的数据将放到缓冲区未读数据的后面。Buffer主要有如下几种:

  • ByteBuffer
  • CharBuffer
  • DoubleBuffer
  • FloatBuffer
  • IntBuffer
  • LongBuffer
  • ShortBuffer

  CopyFile执行三个基本的操作:创建一个Buffer,然后从源文件读取数据到缓冲区,然后再将缓冲区写入目标文件。

public static void copyFileUseNIO(String src,String dst) throws IOException{
    //声明源文件和目标文件
    FileInputStream fi=new FileInputStream(new File(src));
    FileOutputStream fo=new FileOutputStream(new File(dst));
    //获得传输通道channel
    FileChannel inChannel=fi.getChannel();
    FileChannel outChannel=fo.getChannel();
    //获得容器buffer
    ByteBuffer buffer=ByteBuffer.allocate(1024);
    while(true){
        //判断是否读完文件
        int eof =inChannel.read(buffer);
        if(eof==-1){
            break;  
        }
        //重设一下buffer的position=0,limit=position
        buffer.flip();
        //开始写
        outChannel.write(buffer);
        //写完要重置buffer,重设position=0,limit=capacity
        buffer.clear();
    }
    inChannel.close();
    outChannel.close();
    fi.close();
    fo.close();
}   

三)Selector(选择器对象)

  Selector是一个对象,它可以注册到很多个Channel上,监听各个Channel上发生的事件,并且能够根据事件情况决定Channel读写。这样,通过一个线程管理多个Channel,就可以处理大量网络连接了。有了Selector,我们就可以利用一个线程来处理所有的channels。线程之间的切换对操作系统来说代价是很高的,并且每个线程也会占用一定的系统资源。所以,对系统来说使用的线程越少越好。Selector 就是注册对各种 I/O 事件的地方,而且当那些事件发生时,就是这个对象告诉您所发生的事件。

Selector selector = Selector.open();

  为了能让Channel和Selector配合使用,我们需要把Channel注册到Selector上。通过调用 channel.register()方法来实现注册:

channel.configureBlocking(false);
SelectionKey key =channel.register(selector,SelectionKey.OP_READ);

  注意,注册的Channel 必须设置成异步模式 才可以,否则异步IO就无法工作,这就意味着我们不能把一个FileChannel注册到Selector,因为FileChannel没有异步模式,但是网络编程中的SocketChannel是可以的。

  register()的调用的返回值是一个SelectionKey,代表这个通道在此 Selector 上注册。当某个 Selector 通知您某个传入事件时,它是通过提供对应于该事件的 SelectionKey 来进行的。SelectionKey 还可以用于取消通道的注册。

SelectionKey中包含如下属性:

(1)interestSet

  把Channel注册到Selector来监听感兴趣的事件,interestSet就是你要选择的感兴趣的事件的集合。可以通过SelectionKey对象来读写interest set:

int interestSet = selectionKey.interestOps();
boolean isInterestedInAccept  = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead    = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite   = interestSet & SelectionKey.OP_WRITE; 

  通过上面例子可以看到,我们可以通过用 & 和 SelectionKey 中的常量做运算,从SelectionKey中找到我们感兴趣的事件。

(2)readySet

  readySet 是通道已经准备就绪进行操作的集合。在一次选Selection之后,你应该会首先访问这个readySet。Selection将在下一小节进行解释。可以这样访问ready集合,也可以用像检测interest集合那样的方法,来检测Channel中什么事件或操作已经就绪:

int readySet = selectionKey.readyOps();
selectionKey.isAcceptable(); selectionKey.isConnectable(); selectionKey.isReadable(); selectionKey.isWritable();

(3)Channel 和 Selector

我们可以通过SelectionKey获得Selector和注册的Channel:

Channel  channel  = selectionKey.channel();
Selector selector = selectionKey.selector(); 

(4)Attach一个对象

  可以将一个对象或者更多信息attach 到SelectionKey上,这样就能识别某个给定的通道。例如,可以附加与通道一起使用的Buffer,或包含聚集数据对象。使用方法如下:

selectionKey.attach(theObject);
Object attachedObj = selectionKey.attachment();

  还可以在用register()方法向Selector注册Channel的时候附加对象。如:

SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);

NIO多路复用

主要步骤和元素:

  • 首先,通过 Selector.open() 创建一个 Selector,作为类似调度员的角色。

  • 然后,创建一个 ServerSocketChannel,并且向 Selector 注册,通过指定 SelectionKey.OP_ACCEPT,告诉调度员,它关注的是新的连接请求。

  • 注意,为什么我们要明确配置非阻塞模式呢?这是因为阻塞模式下,注册操作是不允许的,会抛出 IllegalBlockingModeException 异常。

  • Selector 阻塞在 select 操作,当有 Channel 发生接入请求,就会被唤醒。

  • 在具体的方法中,通过 SocketChannel 和 Buffer 进行数据操作

  IO 都是同步阻塞模式,所以需要多线程以实现多任务处理。而 NIO 则是利用了单线程轮询事件的机制,通过高效地定位就绪的 Channel,来决定做什么,仅仅 select 阶段是阻塞的,可以有效避免大量客户端连接时,频繁线程切换带来的问题,应用的扩展能力有了非常大的提高

 

下面用NIO设计一个Echo服务器:

首先定义一个Selector和线程池

private Selector selector;
private ExecutorService tp = Executors.newCachedThreadPool();

  selector处理所有的网络连接,tp线程池处理每一个客户端请求。为了统计服务器线程在客户端花费的时间,还需要定义一个时间统计有关的变量,用于统计在某一个Socket上花费的时间,time_stat的key为Socket,value为时间戳:

public static Map<Socket,Long> time_stat = new HashMap<Socket,Long>(10240);

  下面来看一下NIO服务器的核心代码,startServer()方法用于启动NIO Server。

    private void startServer() throws IOException{
        this.selector = SelectorProvider.provider().openSelector();
        ServerSocketChannel ssc = ServerSocketChannel.open();                          // 服务端SocketChannel
        ssc.configureBlocking(false);                                                  // 设置为非阻塞模式
        InetSocketAddress isa = new InetSocketAddress(InetAddress.getLocalHost(),8000);// 使用8000端口
        ssc.socket().bind(isa);
        SelectionKey acceptKey = ssc.register(selector, SelectionKey.OP_ACCEPT);   // 将ServerSocketChannel绑定到Selector上,感兴趣的时间为Accept
        for(;;){                     // 主要任务是等待-分发网络消息
            this.selector.select(); // 阻塞方法,如果当前没有准备好的的数据,就会等待,如果有的话返回已经准备好的SelectionKey数量
            Set<SelectionKey> readyKeys = this.selector.selectedKeys(); // 获取准备好的SelectionKey
            Iterator<SelectionKey> i = readyKeys.iterator();
            long e = 0;
            while(i.hasNext()){
                SelectionKey sk = i.next();
                i.remove();// 处理一个删除一个,不然可能重复处理
                if(sk.isAcceptable()){
                    doAccept(sk);
                }else if(sk.isValid() && sk.isReadable()){// 判断是否可以读
                    if(!time_stat.containsKey(((SocketChannel) sk.channel()).socket())){
                        time_stat.put(((SocketChannel) sk.channel()).socket(), System.currentTimeMillis());
                    }
                    doRead(sk);
                }else if(sk.isValid() && sk.isWritable()){ // 判断是否可以写
                    doWrite(sk);
                    e = System.currentTimeMillis();
                    long b = time_stat.remove(((SocketChannel) sk.channel()).socket());
                    System.out.println("spend: "+(b-e)+"ms");
                }
            }
        }
    }

  在了解服务端整体框架后,下面从具体的方法中看看几个主要方法的使用:

    private void doAccept(SelectionKey sk) {
        ServerSocketChannel server = (ServerSocketChannel) sk.channel();
        SocketChannel clientChannel;
        try {
            clientChannel = server.accept();
            clientChannel.configureBlocking(false);// 非阻塞
            SelectionKey clientKey = clientChannel.register(selector, SelectionKey.OP_READ);//将Channel注册到Selector上,并告诉Selector对读感兴趣,Channel准备好读时给线程一个通知
            EchoClient ec = new EchoClient();
            clientKey.attach(ec);// 客户端实例作为附件,附加到表示这个连接的SelectionKey上,可以在整个连接过程共享ec
            InetAddress clientAddress  = clientChannel.socket().getInetAddress();
            System.out.println("Accepted connection from "+clientAddress.getHostAddress());
        } catch (Exception e) {}
    }

  EchoClient封装一个队列,保存在需要恢复给这个客户端所有信息上,这样再进行回复,只要outq对象中弹出元素即可。

public class EchoClient {
    private LinkedList<ByteBuffer> outq;
    public EchoClient() {
        this.outq = new LinkedList<ByteBuffer>();
    }
    public LinkedList<ByteBuffer> getOutq() {
        return outq;
    }
    public void enqueue(ByteBuffer bb) {
        this.outq.addFirst(bb);
    }
}

下面看看doRead()方法的实现。

    private void doRead(SelectionKey sk) {
        SocketChannel c = (SocketChannel) sk.channel();
        ByteBuffer bb = ByteBuffer.allocate(8192);
        int len;
        try {
            len = c.read(bb);// 存放读取的数据
            if(len<0){
                disconnect(sk);
                return;
            }
        } catch (Exception e) {
            System.out.println("Failed to read from client!");
            e.printStackTrace();
            disconnect(sk);
            return;
        }
        bb.flip();
        tp.execute(new HandleMsg(sk,bb)); // 线程池处理数据
    }

  HandleMsg的实现很简单:

public class HandleMsg implements Runnable{

    SelectionKey sk;
    ByteBuffer bb;
    public HandleMsg(SelectionKey sk,ByteBuffer bb){
        this.sk = sk;
        this.bb = bb;
    }
    @Override
    public void run() {
        EchoClient ec = (EchoClient) sk.attachment();
        ec.enqueue(bb);// 将收到的数据压入队列,业务逻辑也可以在这个地方处理了
        sk.interestOps(SelectionKey.OP_READ|SelectionKey.OP_WRITE);
        selector.wakeup();// 强迫Selector立即返回
    }
}

  doWrite()代码如下,这个方法拿到的sk和doread()方法拿到的是同一个,通过这个sk可以操作共享的EchoClient

    private void doWrite(SelectionKey sk) {
        SocketChannel c = (SocketChannel) sk.channel();
        EchoClient ec = (EchoClient) sk.attachment();
        LinkedList<ByteBuffer> outq = ec.getOutq();
        ByteBuffer bb = outq.getLast();// 列表顶部元素,写回客户端
        try {
            int len = c.write(bb);
            if(len == -1){
                disconnect(sk);
                return;
            }
            if(bb.remaining()== 0){
                outq.removeLast();// 缓冲区已经完成写,删除它
            }
        } catch (Exception e) {
            System.out.println("Failed to write to client.");
            e.printStackTrace();
            disconnect(sk);
            return;
        }
        if(outq.size()==0){
            sk.interestOps(SelectionKey.OP_READ);
        }
    }

下面用NIO设计一个客户端

  首先初始化Selector和Channel

private Selector selector;
public void init(String ip,int port) throws IOException{
    SocketChannel s = SocketChannel.open();
    s.configureBlocking(false);
    this.selector = SelectorProvider.provider().openSelector();
    s.connect(new InetSocketAddress(ip,port));// 并不定连接成功,需要finishConnect()确认
    s.register(selector, SelectionKey.OP_CONNECT);
}

  程序的工作执行逻辑,主要两件事,一个是链接就绪的Connect,一个是刻度的read()事件:

    public void working() throws IOException{
        while(true){
            if(!this.selector.isOpen()){
                break;
            }
            this.selector.select();
            Iterator<SelectionKey> i = this.selector.selectedKeys().iterator();
            while(i.hasNext()){
                SelectionKey key = i.next();
                i.remove();
                if(key.isConnectable()){
                    connect(key);// 判断有没有完成连接,没有的话使用finishConnect()方法完成连接,并向Channel中写入数据及感兴趣的事情
                }else if(key.isReadable()){
                    read(key);
                }
            }
        }
    }

下面是read事件

    private void read(SelectionKey key) throws IOException {
        SocketChannel c = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(100);
        c.read(buffer);
        byte[] bs = buffer.array();
        String msg = new String(bs).trim();
        System.out.println("客户端收到信息:"+msg);
        c.close();
        key.selector().close();
    }

 

多线程(11) — NIO

标签:enqueue   att   rri   serve   线程管理   service   而且   创建   input   

原文地址:https://www.cnblogs.com/wangyongwen/p/11337420.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!