标签:this param lse 线程安全 false mic nios 绑定 读取数据
package com.cn; import java.io.IOException; import java.nio.channels.Selector; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import com.cn.pool.NioSelectorRunnablePool; /** * 抽象selector线程基类 */ public abstract class AbstractNioSelector implements Runnable { /** * 线程池 */ private final Executor executor; /** * 选择器 */ protected Selector selector; /** * 选择器wakenUp状态标记 */ protected final AtomicBoolean wakenUp = new AtomicBoolean(); /** * 线程安全任务队列 */ private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>(); /** * 线程名称 */ private String threadName; /** * 线程池管理对象 */ protected NioSelectorRunnablePool selectorRunnablePool; AbstractNioSelector(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) { this.executor = executor; this.threadName = threadName; this.selectorRunnablePool = selectorRunnablePool; openSelector(); } /** * 获取selector并启动线程,一个线程拥有了select才能为多个客服服务。 */ private void openSelector() { try { this.selector = Selector.open(); } catch (IOException e) { throw new RuntimeException("Failed to create a selector."); } executor.execute(this);//像线程池中加入一个任务,并执行任务的run方法。运行当前任务,执行run方法。从线程池拿出一个线程执行这个任务。 } @Override public void run() { Thread.currentThread().setName(this.threadName);//给当前线程付一个名字 while (true) { try { wakenUp.set(false); select(selector);//接口,执行NioServerBoss或者NioServerWorker的select方法 processTaskQueue();//执行完任务队列里面的任务 process(selector);//接口,执行NioServerBoss或者NioServerWorker的process方法 } catch (Exception e) { // ignore } } } /** * 注册一个任务并激活selector * * @param task */ protected final void registerTask(Runnable task) { taskQueue.add(task); Selector selector = this.selector; if (selector != null) { if (wakenUp.compareAndSet(false, true)) {//wakenUp是不是false,是false就置为true, selector.wakeup(); } } else { taskQueue.remove(task); } } /** * 执行队列里的任务 */ private void processTaskQueue() { for (;;) { final Runnable task = taskQueue.poll(); if (task == null) { break; } task.run();//task是runnable元素 } } /** * 获取线程管理对象 * @return */ public NioSelectorRunnablePool getSelectorRunnablePool() { return selectorRunnablePool; } /** * select抽象方法 子类有重写 */ protected abstract int select(Selector selector) throws IOException; /** * selector的业务处理 子类有重写 */ protected abstract void process(Selector selector) throws IOException; }
package com.cn; import java.io.IOException; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; import java.util.concurrent.Executor; import com.cn.pool.Boss; import com.cn.pool.NioSelectorRunnablePool; import com.cn.pool.Worker; /** * boss实现类,每一个NioServerBoss再一个线程里面 */ public class NioServerBoss extends AbstractNioSelector implements Boss{ public NioServerBoss(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) { super(executor, threadName, selectorRunnablePool); } @Override protected void process(Selector selector) throws IOException { Set<SelectionKey> selectedKeys = selector.selectedKeys(); if (selectedKeys.isEmpty()) { return; } for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) { SelectionKey key = i.next(); i.remove(); ServerSocketChannel server = (ServerSocketChannel) key.channel(); // 新客户端 SocketChannel channel = server.accept(); // 设置为非阻塞 channel.configureBlocking(false); // 获取一个worker Worker nextworker = getSelectorRunnablePool().nextWorker();//通过线程管理对象获取一个worker(runnable任务对象), // 注册新客户端接入任务,将新的连接请求交给worker。 nextworker.registerNewChannelTask(channel);//往别的任务队列里面加任务 //安卓里面,子线程不能改变UI,要改变就要向主线程的任务队列里面加任务。 System.out.println("新客户端链接"); } } public void registerAcceptChannelTask(final ServerSocketChannel serverChannel){ final Selector selector = this.selector; registerTask(new Runnable() { @Override public void run() { try { //注册serverChannel到selector serverChannel.register(selector, SelectionKey.OP_ACCEPT); } catch (ClosedChannelException e) { e.printStackTrace(); } } }); } @Override protected int select(Selector selector) throws IOException { return selector.select(); } }
package com.cn; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; import java.util.concurrent.Executor; import com.cn.pool.NioSelectorRunnablePool; import com.cn.pool.Worker; /** * worker实现类,每一个NioServerWorker再一个线程里面 */ public class NioServerWorker extends AbstractNioSelector implements Worker{ public NioServerWorker(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) { super(executor, threadName, selectorRunnablePool); } @Override protected void process(Selector selector) throws IOException { Set<SelectionKey> selectedKeys = selector.selectedKeys(); if (selectedKeys.isEmpty()) { return; } Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator(); while (ite.hasNext()) { SelectionKey key = (SelectionKey) ite.next(); // 移除,防止重复处理 ite.remove(); // 得到事件发生的Socket通道 SocketChannel channel = (SocketChannel) key.channel(); // 数据总长度 int ret = 0; boolean failure = true; ByteBuffer buffer = ByteBuffer.allocate(1024); //读取数据 try { ret = channel.read(buffer); failure = false; } catch (Exception e) { // ignore } //判断是否连接已断开 if (ret <= 0 || failure) { key.cancel(); System.out.println("客户端断开连接"); }else{ System.out.println("收到数据:" + new String(buffer.array())); //回写数据 ByteBuffer outBuffer = ByteBuffer.wrap("收到\n".getBytes()); channel.write(outBuffer);// 将消息回送给客户端 } } } /** * 加入一个新的socket客户端 */ public void registerNewChannelTask(final SocketChannel channel){ final Selector selector = this.selector; registerTask(new Runnable() { @Override public void run() { try { //将客户端注册到selector中 channel.register(selector, SelectionKey.OP_READ); } catch (ClosedChannelException e) { e.printStackTrace(); } } }); } @Override protected int select(Selector selector) throws IOException { return selector.select(60000); } }
package com.cn; import java.net.SocketAddress; import java.nio.channels.ServerSocketChannel; import com.cn.pool.Boss; import com.cn.pool.NioSelectorRunnablePool; /** * 服务类 */ public class ServerBootstrap { private NioSelectorRunnablePool selectorRunnablePool; public ServerBootstrap(NioSelectorRunnablePool selectorRunnablePool) { this.selectorRunnablePool = selectorRunnablePool; } /** * 监听端口 * @param localAddress */ public void bind(final SocketAddress localAddress){ try { // 获得一个ServerSocket通道 ServerSocketChannel serverChannel = ServerSocketChannel.open(); // 设置通道为非阻塞 serverChannel.configureBlocking(false); // 将该通道对应的ServerSocket绑定到port端口 serverChannel.socket().bind(localAddress); //获取一个boss线程 Boss nextBoss = selectorRunnablePool.nextBoss(); //向boss注册一个ServerSocket通道 nextBoss.registerAcceptChannelTask(serverChannel); } catch (Exception e) { e.printStackTrace(); } } }
package com.cn; import java.net.InetSocketAddress; import java.util.concurrent.Executors; import com.cn.pool.NioSelectorRunnablePool; /** * 启动函数 */ public class Start { public static void main(String[] args) { //管理线程池的,初始化2个线程池,一个boss一个work, NioSelectorRunnablePool nioSelectorRunnablePool = new NioSelectorRunnablePool(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); //获取服务类 ServerBootstrap bootstrap = new ServerBootstrap(nioSelectorRunnablePool); //绑定端口 bootstrap.bind(new InetSocketAddress(10101)); System.out.println("start"); } }
package com.cn.pool; import java.nio.channels.ServerSocketChannel; /** * boss接口 */ public interface Boss { /** * 加入一个新的ServerSocket,监听连接 */ public void registerAcceptChannelTask(ServerSocketChannel serverChannel); }
package com.cn.pool; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import com.cn.NioServerBoss; import com.cn.NioServerWorker; /** * selector线程管理者 * * 线程池是有多个线程,每个线程里面有一个任务队列,线程run的时候会从任务队列取一个任务出来,执行任务的run方法, 队列里面没有任务就阻塞等待新的任务进来。 */ public class NioSelectorRunnablePool { /** * boss任务数组,boss用来监听端口的, */ private final AtomicInteger bossIndex = new AtomicInteger(); private Boss[] bosses; /** * worker任务数组,用来处理事件的, */ private final AtomicInteger workerIndex = new AtomicInteger(); private Worker[] workeres; //boss和worker是一个线程池 public NioSelectorRunnablePool(Executor boss, Executor worker) { initBoss(boss, 1);//boss是一个线程池。 initWorker(worker, Runtime.getRuntime().availableProcessors() * 2); } /** * 初始化boss线程池的runable任务数组 * @param boss * @param count */ private void initBoss(Executor boss, int count) { this.bosses = new NioServerBoss[count]; //this.bosses是一个数组,里面是一个个的NioServerBoss, //NioServerBoss是runnable任务对象。runnable对象里面有线程池、选择器、线程名、线程管理者。 //executor.execute(this);通过NioServerBoss里面的线程池把任务对象NioServerBoss自己运行起来。 //所有的NioServerBoss任务对象都是通过boss线程池来调度的。 for (int i = 0; i < bosses.length; i++) { bosses[i] = new NioServerBoss(boss, "boss thread " + (i+1), this);//this是NioSelectorRunnablePool线程池管理者。 //boss thread 是任务runable的名字 } } /** * 初始化worker线程池的runable任务数组 * @param worker * @param count */ private void initWorker(Executor worker, int count) { this.workeres = new NioServerWorker[2/*count*/]; for (int i = 0; i < workeres.length; i++) { //所有的NioServerWorker任务对象都是通过worker线程池来调度的。 workeres[i] = new NioServerWorker(worker, "worker thread " + (i+1), this); } //boss线程池里面有8个NioServerBoss.runable对象(8个大任务,开了8个线程), //每一个NioServerWorker再一个线程里面。8个NioServerBoss.runable对象一开始就去run, //每个NioServerBoss.runable对象里面有一个任务队列taskQueue,队列里面是一个个的Runnable对象。 /* public static void main(String[] args) { //创建一个线程池,可回收的,没任务就回收了。newCachedThreadPool可以很大。60秒没任务就回收。 ExecutorService pool = Executors.newCachedThreadPool();//线程池 for(int i = 1; i < 5; i++){//4个任务,一个任务就是一个Runnable pool.execute(new Runnable() {//没有返回值 @Override public void run() { try { Thread.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("thread name: " + Thread.currentThread().getName()); } }); try { Thread.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } } pool.shutdown();//任务执行完就关了。 /*thread name: pool-1-thread-1 thread name: pool-1-thread-2 thread name: pool-1-thread-1 thread name: pool-1-thread-2 线程执行完了会回收,不一定开4个线程*/ } /** * 获取一个worker的runable任务,给每个work平均分配 */ public Worker nextWorker() { return workeres[Math.abs(workerIndex.getAndIncrement() % workeres.length)]; } /** * 获取一个boss的runable任务 */ public Boss nextBoss() { return bosses[Math.abs(bossIndex.getAndIncrement() % bosses.length)]; } }
package com.cn.pool; import java.nio.channels.SocketChannel; /** * worker接口 */ public interface Worker { /** * 加入一个新的客户端会话,监听客户端的处理 */ public void registerNewChannelTask(SocketChannel channel); }
标签:this param lse 线程安全 false mic nios 绑定 读取数据
原文地址:https://www.cnblogs.com/yaowen/p/9083763.html