码迷,mamicode.com
首页 > 其他好文 > 详细

NIO基础篇(二)

时间:2015-06-26 00:20:30      阅读:195      评论:0      收藏:0      [点我收藏+]

标签:

  Selector(选择器)是Java NIO中能够检测一到多个NIO通道,并能够知晓通道是否为诸如读写事件做好准备的组件。这样,一个单独的线程可以管理多个channel,从而管理多个网络连接。 

  下面的例子是客户端从服务器端下载文件,客户端使用了多线程技术模拟同时下载。Selector可以同时处理多个客户端的连接事件并通知服务器端进行响应操作。

  服务器端的代码为:  

package nio;

import java.io.File;
import java.io.FileInputStream;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Callable;

//模拟下载服务
public class DownloadServer<T> implements Callable<T>{
    //A selector may be created by invoking the open method of this class, which will use the system‘s default selector provider to create a new selector. 
    //A selector may also be created by invoking the openSelector method of a custom selector provider.
    //A selector remains open until it is closed via its close method. 
    //A selectable channel‘s registration with a selector is represented by a SelectionKey object. 
    private Selector selector;//创建全局selector
    private Map<SocketChannel, Handle> map = new HashMap<SocketChannel, Handle>();//socketChannel和handle之间的映射

    //创建一个服务器serverSocketChannel,并与selector进行注册
    public DownloadServer() throws Exception {
        selector = Selector.open();
        //ServerSocketChannel is a selectable channel for stream-oriented listening sockets. 
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        //if false then it will be placed non-blocking mode 
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.socket().bind(new InetSocketAddress(2361));
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); //服务端接收客户端连接事件
    }

    //对selector.select进行迭代,并依次进行处理
    public T call() throws Exception {
        System.out.println("startTo listen in 2361....");
        for(; ;) {
            //Selects a set of keys whose corresponding channels are ready for I/O operations
            //select() performs a blocking selection operation. It returns only after at least one channel is selected
            selector.select();
            Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
            while(keyIterator.hasNext()) {
                //A selectionkey is a token representing the registration of a SelectableChannel with a Selector. 
                //A selectionkey is created each time a channel is registered with a selector. 
                SelectionKey key = keyIterator.next();
                if(key.isValid())
                    handle(key);
                keyIterator.remove();
            }
        }
    }

    //处理每个key,对于acceptable的key,由主类进行处理,而其他事件,则由内部类进行处理
    private void handle(final SelectionKey key) throws Exception {
        //Tests whether this key‘s channel is ready to accept a new socket connection.
        if(key.isAcceptable()) {
            ServerSocketChannel channel = (ServerSocketChannel) key.channel();
            //Accepts a connection made to this channel‘s socket. 
            SocketChannel socketChannel = channel.accept();
            socketChannel.configureBlocking(false);
            socketChannel.register(selector, SelectionKey.OP_READ);//注册读事件
            map.put(socketChannel, new Handle());//把socket和handle进行绑定
        }
        //用map中的handle处理read和write事件,以模拟多个文件同时进行下载
        if(key.isReadable() || key.isWritable()) {
            SocketChannel socketChannel = (SocketChannel) key.channel();
            final Handle handle = map.get(socketChannel);
            if(handle != null)
                handle.handle(key);
        }
    }

    //内部类,模拟一个内部类处理一个文件下载服务,多个类可以处理多个文件下载服务
    private class Handle{
        private StringBuilder message;
        private boolean writeOK = true;
        private ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        private FileChannel fileChannel;
        private String fileName;

        private void handle(SelectionKey key) throws Exception {
            if(key.isReadable()) {
                SocketChannel socketChannel = (SocketChannel) key.channel();
                if(writeOK)
                    message = new StringBuilder();
                while(true) {
                    byteBuffer.clear();
                    int r = socketChannel.read(byteBuffer);
                    if(r == 0)
                        break;
                    if(r == -1) {
                        socketChannel.close();
                        key.cancel();
                        return;
                    }
                    message.append(new String(byteBuffer.array(), 0, r));
                }
                //将接收到的信息转化成文件名,以映射到服务器上的指定文件
                if(writeOK && invokeMessage(message)) {
                    socketChannel.register(selector, SelectionKey.OP_WRITE);
                    writeOK = false;
                }
            }
            //向客户端写数据
            if(key.isWritable()) { //读方法中的socketChannel和写方法中的socketChannel应该是一个,不需要flip吗?
                if(!key.isValid())
                    return;
                SocketChannel socketChannel = (SocketChannel) key.channel();
                if(fileChannel == null)
                    fileChannel = new FileInputStream(fileName).getChannel();
                byteBuffer.clear();
                int w = fileChannel.read(byteBuffer);
                //如果文件已写完,则关掉key和socket
                if(w <= 0) {
                    fileName = null;
                    fileChannel.close();
                    fileChannel = null;
                    writeOK = true;
                    socketChannel.close();
                    key.channel();
                    return;
                }
                byteBuffer.flip();
                socketChannel.write(byteBuffer);
            }
        }

        //将信息转化成文件名
        private boolean invokeMessage(StringBuilder message) {
            String m = message.toString();
            try {
                File f = new File(m);
                if(!f.exists())
                    return false;
                fileName = m;
                return true;
            } catch(Exception e) {
                return false;
            }
        }

    }

    public static void main(String[] args) throws Exception {
        /*
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.submit(new DownloadServer<Object>());
        executorService.shutdown();
        */
        new DownloadServer().call();
    }
}

  客户端的代码为:

package nio;

import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SelectorClient<T> implements Callable<T>{
    private FileChannel fileChannel;
    private static Selector selector;
    private ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    private String serverFileName;//服务器上的文件
    private String localFileName;//下载到客户端的文件名

    public SelectorClient(String serverFileName, String localFileName) {
        this.serverFileName = serverFileName;
        this.localFileName = localFileName;
    }

    public T call() throws Exception {
        //开启selector,并建立socket到指定端口的连接
        if(selector == null)
            selector = Selector.open();
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false);
        channel.connect(new InetSocketAddress("172.16.72.181", 2361));
        channel.register(selector, SelectionKey.OP_CONNECT); //客户端连接服务端事件 
        //进行信息读取
        for(; ;) {
            selector.select();
            Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
            while(keyIterator.hasNext()) {
                SelectionKey key = keyIterator.next();
                keyIterator.remove();
                //连接事件
                if(key.isConnectable()) {
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    if(socketChannel.isConnectionPending())
                        socketChannel.finishConnect();
                    socketChannel.write(ByteBuffer.wrap(serverFileName.getBytes()));//向服务器发信息,信息中即服务器上的文件名
                    socketChannel.register(selector, SelectionKey.OP_READ);  // 读事件
                }
                //读事件
                if(key.isReadable()) {
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    byteBuffer.clear();
                    if(!socketChannel.isConnected())
                        return null;
                    //向本机下载文件创建文件channel
                    if(fileChannel == null)
                        fileChannel = new RandomAccessFile(localFileName, "rw").getChannel();
                    int r = socketChannel.read(byteBuffer);
                    //如果文件下载完毕,则关掉channel,同时关掉socketChannel
                    if(r <= 0) {
                        if(fileChannel != null)
                            fileChannel.close();
                        channel.close();
                        key.cancel();
                        return null;
                    }
                    byteBuffer.flip();
                    //写到下载文件中
                    fileChannel.write(byteBuffer);
                }
            }
        }
    }

    //客户端用10个线程向服务器端下载文件,并保存为不同的文件
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for(int i = 0; i < 10; i++) {
            executorService.submit(new SelectorClient<Object>("test.txt", "d:/down" + i + ".txt"));
        }
        executorService.shutdown();
    }
}

 

NIO基础篇(二)

标签:

原文地址:http://www.cnblogs.com/lnlvinso/p/4601138.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!