服务端、客户端
package com.dsp.nio; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; /** * * 监控是否可连接、可读、可写 * * 代码中巧妙使用了SocketChannel的attach功能,将Hanlder和可能会发生事件的channel链接在一起,当发生事件时,可以立即触发相应链接的Handler * */ public class Reactor implements Runnable { private static Logger log = LoggerFactory.getLogger(Reactor.class); final Selector selector; final ServerSocketChannel serverSocket; /** * 服务端配置初始化,监听端口 * @param port * @throws IOException */ public Reactor(int port) throws IOException { this.selector = Selector.open(); this.serverSocket = ServerSocketChannel.open(); this.serverSocket.socket().bind(new InetSocketAddress(port)); this.serverSocket.configureBlocking(false); SelectionKey selectionKey = this.serverSocket.register(selector, SelectionKey.OP_ACCEPT); // 利用selectionKey的attache功能绑定Acceptor 如果有事情,触发Acceptor selectionKey.attach(new Acceptor()); log.info("===>>> attach(new Acceptor())"); } /* * SPI */ // Alternatively, use explicit SPI provider // SelectorProvider selectorProvider = SelectorProvider.provider(); // selector = selectorProvider.openSelector(); // serverSocket = selectorProvider.openServerSocketChannel(); /** * 分发请求 * * @param selectionKey */ void dispatch(SelectionKey selectionKey) { Runnable run = (Runnable) (selectionKey.attachment()); if (run != null) { run.run(); } } /** * 监听连接和channel是否就绪 */ public void run() { try { /** * 线程未被中断 */ while (!Thread.interrupted()) { int readySize = this.selector.select(); log.info("I/O ready size = {}", readySize); Set<?> selectedKeys = this.selector.selectedKeys(); Iterator<?> iterator = selectedKeys.iterator(); // Selector如果发现channel有OP_ACCEPT或READ事件发生,下列遍历就会进行。 while (iterator.hasNext()) { /* * 一个新的连接,第一次出发Accepter线程任务,之后触发Handler线程任务 */ SelectionKey 
selectionKey = (SelectionKey) iterator.next(); log.info("===>>> acceptable = {}, connectable = {}, readable = {}, writable = {}.",
selectionKey.isAcceptable(), selectionKey.isConnectable(),
selectionKey.isReadable(), selectionKey.isWritable()); dispatch(selectionKey); } selectedKeys.clear(); } } catch (IOException ex) { log.info("reactor stop!" + ex); } } /** * 处理新连接 * * @author dsp * */ class Acceptor implements Runnable { @Override public void run() { try { log.debug("===>>> ready for accept!"); SocketChannel socketChannel = serverSocket.accept(); if (socketChannel != null) { new Handler(selector, socketChannel); } } catch (IOException ex) { /* . . . */ } } } }
package com.dsp.nio; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; /** * * 处理读写 * */ final class Handler implements Runnable { private static Logger log = LoggerFactory.getLogger(Reactor.class); static final int MAX_IN = 1024; static final int MAX_OUT = 1024; ByteBuffer inputBuffer = ByteBuffer.allocate(MAX_IN); ByteBuffer output = ByteBuffer.allocate(MAX_OUT); final SocketChannel socketChannel; final SelectionKey selectionKey; static final int READING = 0, SENDING = 1; int state = READING; /** * 注意在Handler里面又执行了一次attach,覆盖前面的Acceptor,下次该Handler又有READ事件发生时,将直接触发Handler,从而开始了数据的 * “读 -> 处理 -> 写 -> 发出” 等流程处理。 * * @param selector * @param socketChannel * @throws IOException */ Handler(Selector selector, SocketChannel socketChannel) throws IOException { this.socketChannel = socketChannel; this.socketChannel.configureBlocking(false); this.selectionKey = this.socketChannel.register(selector, 0); this.selectionKey.attach(this); this.selectionKey.interestOps(SelectionKey.OP_READ); // selector.wakeup(); } /** * 只是返回true,具体的判断没有实现 * * @return */ boolean inputIsComplete() { return true; } /** * 只是返回true,具体的判断没有实现 * * @return */ boolean outputIsComplete() { return true; } /** * 处理数据(无具体实现) */ void process(String msg) { // output.put("hello world, hello dsp!".getBytes()); String outMsg = "out + " + msg; output.put(outMsg.getBytes()); output.flip(); } /** * 读取请求数据并处理 * * @throws IOException */ void read() throws IOException { log.info("===>>> read into bytebuffer from socketchannel inputs."); if (inputIsComplete()) { socketChannel.read(inputBuffer); inputBuffer.flip(); byte[] inputBytes = new byte[inputBuffer.limit()]; inputBuffer.get(inputBytes); String inputString = new String(inputBytes); log.info("===>>> 从客户端读取请求信息 = {}", inputString); log.info("===>>> read complete."); process(inputString); state 
= SENDING; // 读完了数据之后,注册OP_WRITE事件 selectionKey.interestOps(SelectionKey.OP_WRITE); } } /** * 返回响应信息 * * @throws IOException */ void send() throws IOException { log.info("===>>> write into socketchannel from bytebuffer outputs"); socketChannel.write(output); if (outputIsComplete()) { // The key will be removed from all of the selector‘s key sets during the next // selection operation. selectionKey.cancel(); // 关闭通过,也就关闭了连接 socketChannel.close(); log.info("===>>> close socketchannel after write complete"); } } @Override public void run() { try { if (state == READING) read(); else if (state == SENDING) send(); } catch (IOException ex) { /* . . . */ } } }
package com.dsp.nio; import java.io.IOException; /** * * Model: Reactor in SingleThread * * 利用NIO多路复用机制,多路IO复用一个线程 * * @author dsp * */ public class ReactorInSingleThreadServer { public static void main(String args[]) throws IOException { Reactor reactor = new Reactor(9999); reactor.run(); // 不会开启线程,相当于普通方法调用 } }
package com.dsp.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.concurrent.LinkedBlockingQueue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test client for the NIO server: each instance is a thread that connects to port 9999,
 * sends one request, logs the response and closes the connection.
 *
 * @author dsp
 */
public class ReactorInSingleThreadClient extends Thread {

    private static final Logger log = LoggerFactory.getLogger(ReactorInSingleThreadClient.class);

    /** Clients whose connection attempt was refused. */
    private static final LinkedBlockingQueue<Thread> failureQueue = new LinkedBlockingQueue<Thread>();

    /**
     * Performs one request/response exchange over a blocking {@link SocketChannel}. The channel
     * is closed on every path, including failures.
     */
    @Override
    public void run() {
        try (SocketChannel socketChannel = SocketChannel.open()) {
            boolean connected = socketChannel.connect(new InetSocketAddress(9999));
            if (!connected) {
                log.error("连不上服务器...");
                return;
            }
            log.info("===>>> 和服务器 {} 已连接...", socketChannel.getRemoteAddress());

            // Request. Platform default charset on purpose: it is what the server side uses.
            String msg = "in + 你好,dsp!" + Thread.currentThread().getName();
            ByteBuffer request = ByteBuffer.wrap(msg.getBytes());
            while (request.hasRemaining()) {
                socketChannel.write(request); // a single write may be partial
            }

            // Response: the server closes the connection after answering, so read until end
            // of stream (or until the buffer is full) rather than trusting one read() call.
            ByteBuffer response = ByteBuffer.allocate(1024);
            while (response.hasRemaining() && socketChannel.read(response) != -1) {
                // keep reading
            }
            response.flip();
            byte[] data = new byte[response.remaining()];
            response.get(data);
            String string = new String(data);
            log.info("===>>> {}", string);
        } catch (java.net.ConnectException e) {
            failureQueue.offer(this);
        } catch (Exception e) {
            log.error("client request failed", e);
        }
    }

    /**
     * Starts 3000 concurrent clients, waits for all of them to finish and logs how many could
     * not connect.
     *
     * @param args ignored
     * @throws IOException          never thrown; kept for signature compatibility
     * @throws InterruptedException if interrupted while waiting for the clients
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        int maxThreads = 3000;
        Thread[] clients = new Thread[maxThreads];
        for (int i = 0; i < maxThreads; i++) {
            // The client already is a Thread; start it directly instead of wrapping it.
            clients[i] = new ReactorInSingleThreadClient();
            clients[i].start();
        }
        // Wait for the workers instead of sleeping for an arbitrary (very long) time.
        for (Thread client : clients) {
            client.join();
        }
        log.info("===>>> {} of {} clients could not connect", failureQueue.size(), maxThreads);
    }
}
^_^