码迷,mamicode.com
首页 > 编程语言 > 详细

Java NIO案例

时间:2018-03-01 23:35:24      阅读:237      评论:0      收藏:0      [点我收藏+]

标签:任务   rom   style   inf   stop   react   --   没有   cli   

服务端、客户端

package com.dsp.nio;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 *
 * 监控是否可连接、可读、可写
 *
 * 代码中巧妙使用了SocketChannel的attach功能,将Hanlder和可能会发生事件的channel链接在一起,当发生事件时,可以立即触发相应链接的Handler
 *
 */
public class Reactor implements Runnable {

    private static Logger log = LoggerFactory.getLogger(Reactor.class);

    final Selector selector;

    final ServerSocketChannel serverSocket;

    /**
     * 服务端配置初始化,监听端口
     * @param port
     * @throws IOException
     */
    public Reactor(int port) throws IOException {
        this.selector = Selector.open();
        this.serverSocket = ServerSocketChannel.open();
        this.serverSocket.socket().bind(new InetSocketAddress(port));
        this.serverSocket.configureBlocking(false);
        SelectionKey selectionKey = this.serverSocket.register(selector, SelectionKey.OP_ACCEPT);
        // 利用selectionKey的attache功能绑定Acceptor 如果有事情,触发Acceptor
        selectionKey.attach(new Acceptor());
        log.info("===>>> attach(new Acceptor())");
    }
    /*
     * SPI
     */
    // Alternatively, use explicit SPI provider
    // SelectorProvider selectorProvider = SelectorProvider.provider();
    // selector = selectorProvider.openSelector();
    // serverSocket = selectorProvider.openServerSocketChannel();

    /**
     * 分发请求
     *
     * @param selectionKey
     */
    void dispatch(SelectionKey selectionKey) {
        Runnable run = (Runnable) (selectionKey.attachment());
        if (run != null) {
            run.run();
        }
    }

    /**
     * 监听连接和channel是否就绪
     */
    public void run() {
        try {
            /**
             * 线程未被中断
             */
            while (!Thread.interrupted()) {
                int readySize = this.selector.select();
                log.info("I/O ready size = {}", readySize);
                Set<?> selectedKeys = this.selector.selectedKeys();
                Iterator<?> iterator = selectedKeys.iterator();
                // Selector如果发现channel有OP_ACCEPT或READ事件发生,下列遍历就会进行。
                while (iterator.hasNext()) {
                    /*
                     * 一个新的连接,第一次出发Accepter线程任务,之后触发Handler线程任务
                     */
                    SelectionKey selectionKey = (SelectionKey) iterator.next();
                    log.info("===>>> acceptable = {}, connectable = {}, readable = {}, writable = {}.", 
              selectionKey.isAcceptable(), selectionKey.isConnectable(),
              selectionKey.isReadable(), selectionKey.isWritable()); dispatch(selectionKey); } selectedKeys.clear(); } }
catch (IOException ex) { log.info("reactor stop!" + ex); } } /** * 处理新连接 * * @author dsp * */ class Acceptor implements Runnable { @Override public void run() { try { log.debug("===>>> ready for accept!"); SocketChannel socketChannel = serverSocket.accept(); if (socketChannel != null) { new Handler(selector, socketChannel); } } catch (IOException ex) { /* . . . */ } } } }
package com.dsp.nio;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

/**
 *
 * 处理读写
 *
 */
final class Handler implements Runnable {

    private static Logger log = LoggerFactory.getLogger(Reactor.class);

    static final int MAX_IN = 1024;

    static final int MAX_OUT = 1024;

    ByteBuffer inputBuffer = ByteBuffer.allocate(MAX_IN);

    ByteBuffer output = ByteBuffer.allocate(MAX_OUT);

    final SocketChannel socketChannel;

    final SelectionKey selectionKey;

    static final int READING = 0, SENDING = 1;

    int state = READING;

    /**
     * 注意在Handler里面又执行了一次attach,覆盖前面的Acceptor,下次该Handler又有READ事件发生时,将直接触发Handler,从而开始了数据的
     * “读 -> 处理 -> 写 -> 发出” 等流程处理。
     *
     * @param selector
     * @param socketChannel
     * @throws IOException
     */
    Handler(Selector selector, SocketChannel socketChannel) throws IOException {
        this.socketChannel = socketChannel;
        this.socketChannel.configureBlocking(false);
        this.selectionKey = this.socketChannel.register(selector, 0);
        this.selectionKey.attach(this);
        this.selectionKey.interestOps(SelectionKey.OP_READ);
        // selector.wakeup();
    }

    /**
     * 只是返回true,具体的判断没有实现
     *
     * @return
     */
    boolean inputIsComplete() {
        return true;
    }

    /**
     * 只是返回true,具体的判断没有实现
     *
     * @return
     */
    boolean outputIsComplete() {
        return true;
    }

    /**
     * 处理数据(无具体实现)
     */
    void process(String msg) {
//        output.put("hello world, hello dsp!".getBytes());
        String outMsg = "out + " + msg;
        output.put(outMsg.getBytes());
        output.flip();
    }

    /**
     * 读取请求数据并处理
     *
     * @throws IOException
     */
    void read() throws IOException {
        log.info("===>>> read into bytebuffer from socketchannel inputs.");
        if (inputIsComplete()) {
            socketChannel.read(inputBuffer);
            inputBuffer.flip();
            byte[] inputBytes = new byte[inputBuffer.limit()];
            inputBuffer.get(inputBytes);
            String inputString = new String(inputBytes);
            log.info("===>>> 从客户端读取请求信息 = {}", inputString);
            log.info("===>>> read complete.");

            process(inputString);

            state = SENDING;
            // 读完了数据之后,注册OP_WRITE事件
            selectionKey.interestOps(SelectionKey.OP_WRITE);
        }
    }

    /**
     * 返回响应信息
     *
     * @throws IOException
     */
    void send() throws IOException {
        log.info("===>>> write into socketchannel from bytebuffer outputs");
        socketChannel.write(output);
        if (outputIsComplete()) {
            // The key will be removed from all of the selector‘s key sets during the next
            // selection operation.
            selectionKey.cancel();
            // 关闭通过,也就关闭了连接
            socketChannel.close();
            log.info("===>>> close socketchannel after write complete");
        }
    }

    @Override
    public void run() {
        try {
            if (state == READING)
                read();
            else if (state == SENDING)
                send();
        } catch (IOException ex) {
            /* . . . */
        }
    }

}
package com.dsp.nio;

import java.io.IOException;

/**
 *
 * Model: Reactor in SingleThread
 *
 * 利用NIO多路复用机制,多路IO复用一个线程
 *
 * @author dsp
 *
 */
public class ReactorInSingleThreadServer {

    public static void main(String args[]) throws IOException {
        Reactor reactor = new Reactor(9999);
        reactor.run(); // 不会开启线程,相当于普通方法调用
    }

}
package com.dsp.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.concurrent.LinkedBlockingQueue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 *
 * 访问NIO服务器的客户端
 *
 * @author dsp
 *
 */
public class ReactorInSingleThreadClient extends Thread {

    private static Logger log = LoggerFactory.getLogger(ReactorInSingleThreadClient.class);

    private static LinkedBlockingQueue<Thread> failureQueue = new LinkedBlockingQueue<Thread>();

    @Override
    public void run() {
        try {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            SocketChannel socketChannel = SocketChannel.open();
            boolean connected = socketChannel.connect(new InetSocketAddress(9999));
            if (connected) {
                log.info("===>>> 和服务器 {} 已连接...", socketChannel.getRemoteAddress());
                /*
                 * 请求
                 */
                String msg = "in  + 你好,dsp!" + Thread.currentThread().getName();
                buffer.put(msg.getBytes());
                buffer.flip();
                socketChannel.write(buffer);
                buffer.clear();

                /*
                 * 响应
                 */
                buffer.clear();
                socketChannel.read(buffer);
                buffer.flip();
                byte[] data = new byte[buffer.limit()];
                buffer.get(data);
                String string = new String(data);
                log.info("===>>> " + string);
                buffer.clear();

                socketChannel.close();
            } else {
                log.error("连不上服务器...");
            }
        } catch (java.net.ConnectException e) {
            failureQueue.offer(this);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        int maxThreads = 3000;
        while (maxThreads-- > 0) {
            new Thread(new ReactorInSingleThreadClient()).start();
        }

        Thread.sleep(Integer.MAX_VALUE);
    }

}

^_^

Java NIO案例

标签:任务   rom   style   inf   stop   react   --   没有   cli   

原文地址:https://www.cnblogs.com/gotodsp/p/8490702.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!