码迷,mamicode.com
首页 > 编程语言 > 详细

java的nio之:java的nio的服务器实现模型

时间:2016-09-05 20:38:12      阅读:224      评论:0      收藏:0      [点我收藏+]

标签:

【nio服务端序列图】

技术分享

一:nio服务器启动类

技术分享
 1 package com.yeepay.sxf.testnio;
 2 /**
 3  * nio创建的的timerServer服务器
 4  * 
 5  * @author sxf
 6  *
 7  */
 8 public class NIOTimerServer {
 9     
10     /**
11      * nio服务器启动的入口
12      * @param args
13      */
14     public static void main(String[] args) {
15         //启动服务器绑定的端口号
16         int port=8000;
17         //获取端口号
18         if(args!=null && args.length>0){
19             try {
20                 port=Integer.valueOf(args[0]);
21             } catch (Exception e) {
22                 e.printStackTrace();
23             }
24         }
25         
26         //新建nio服务器类
27         MultiplexerTimerServer timerServer=new MultiplexerTimerServer(port);
28         
29         //启动服务类的主线程
30         new Thread(timerServer,"NIO-MultiplexerTimerServer-001").start();
31     }
32 }
View Code

二:nio服务器

技术分享
  1 package com.yeepay.sxf.testnio;
  2 
  3 import java.io.BufferedReader;
  4 import java.io.IOException;
  5 import java.net.InetSocketAddress;
  6 import java.nio.ByteBuffer;
  7 import java.nio.channels.SelectionKey;
  8 import java.nio.channels.Selector;
  9 import java.nio.channels.ServerSocketChannel;
 10 import java.nio.channels.SocketChannel;
 11 import java.util.Date;
 12 import java.util.Iterator;
 13 import java.util.Set;
 14 
 15 import com.sun.org.apache.xml.internal.utils.StopParseException;
 16 
 17 /**
 18  * nio的时间服务器
 19  * @author sxf
 20  *
 21  */
 22 public class MultiplexerTimerServer implements Runnable {
 23     
 24     //选择器
 25     private Selector selector;
 26 
 27     //
 28     private ServerSocketChannel serverSocketChannel;
 29     
 30     private volatile boolean stop;
 31     
 32     //启动服务
 33     public MultiplexerTimerServer(int port){
 34         try {
 35             //初始化多路复用器
 36             selector=Selector.open();
 37             //初始化socket通道
 38             serverSocketChannel=ServerSocketChannel.open();
 39             //设置通道为非阻塞模式
 40             serverSocketChannel.configureBlocking(false);
 41             //将该通道绑定地址和端口号
 42             serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024);
 43             //将该通道注册到多路复用器,并注册链接请求事件
 44             serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
 45             System.out.println("The time server is start in port:"+port);
 46         } catch (Exception e) {
 47             // TODO: handle exception
 48             e.printStackTrace();
 49             System.exit(1);
 50         }
 51     }
 52     
 53     /**
 54      * 停止服务器
 55      */
 56     public void stop(){
 57         this.stop=true;
 58     }
 59     
 60     
 61     /**
 62      * 服务器运行主体
 63      */
 64     @Override
 65     public void run() {
 66         while(!stop){
 67             try {
 68                 System.out.println("MultiplexerTimerServer.run()");
 69                 //select()阻塞到至少有一个通道在你注册的事件上就绪了。
 70                 selector.select();
 71                 //获取注册在这个多路复用器上的已经就绪的通道的集合
 72                 Set<SelectionKey> selectionKeys=selector.selectedKeys();
 73                 //循环迭代已经就绪的通道集合
 74                 Iterator<SelectionKey> it=selectionKeys.iterator();
 75                 SelectionKey key=null;
 76                 while(it.hasNext()){
 77                     key=it.next();
 78                     //防止重复执行通道事件
 79                     it.remove();
 80                     //处理该通道上的事件
 81                     try {
 82                         handleInput(key);
 83                     } catch (Exception e) {
 84                         if(key!=null){
 85                             key.cancel();
 86                             if(key.channel()!=null){
 87                                 key.channel().close();
 88                             }
 89                         }
 90                     }
 91                 }
 92                 
 93             } catch (Exception e) {
 94                 e.printStackTrace();
 95             }
 96         
 97             
 98         }
 99     }
100 
101     
102     /**
103      * 处理请求的事件
104      * @param key
105      * @throws IOException
106      */
107     private void handleInput(SelectionKey key) throws IOException{
108         if(key.isValid()){
109             //处理新接入的请求消息
110             if(key.isAcceptable()){
111                 //请求链接事件就绪
112                 ServerSocketChannel ssc=(ServerSocketChannel) key.channel();
113                 SocketChannel  sc=ssc.accept();
114                 sc.configureBlocking(false);
115                 //在多路复用器上注册一个soketChannel,当有读事件则触发
116                 sc.register(selector, SelectionKey.OP_READ);
117             }
118             
119             if(key.isReadable()){
120                 //读事件就绪
121                 SocketChannel sc=(SocketChannel) key.channel();
122                 //声明一个缓冲区
123                 ByteBuffer readBuffer=ByteBuffer.allocate(1024);
124                 //从通道里读取数据写入缓冲区
125                 int readBytes=sc.read(readBuffer);
126                 //readBytes>0:表示读到了字节,对字节进行编解码。
127                 //readBytes=0:没有读取到字节,属于正常场景,忽略
128                 //readBytes=-1;链路已经关闭,需要关闭socketChannel,释放资源
129                 if(readBytes>0){
130                     //将ByteBuffer的limit设置为position,position设置为0
131                     readBuffer.flip();
132                     //编解码数据
133                     byte[] bytes=new byte[readBuffer.remaining()];
134                     //将数据从缓冲区复制到数组里
135                     readBuffer.get(bytes);
136                     //翻译请求的内容
137                     String body=new String(bytes,"UTF-8");
138                     //打印请求的内容
139                     System.out.println("the timerserver receive order:"+body);
140                 
141                     //处理请求内容
142                     String currentTime=null;
143                     if("shangxiaofei".equals(body)){
144                         currentTime=new Date().toString();
145                     }else{
146                         currentTime="request param is error";
147                     }
148                     
149                     //将处理的结果响应给客户端
150                     doWrite(sc, currentTime);
151                 }else if(readBytes<0){
152                     //对链路进行关闭
153                     key.cancel();
154                     sc.close();
155                 }else{
156                     //忽略
157                 }
158             }
159         }
160     }
161     
162     /**
163      * 响应请求的内容
164      * @param channel
165      * @param response
166      * @throws IOException 
167      */
168     private void doWrite(SocketChannel channel,String response) throws IOException{
169         if(response!=null&&response.trim().length()>0){
170             //将响应的内容转化成byte[]
171             byte[] bytes=response.getBytes();
172             //声明缓冲区
173             ByteBuffer writeBuffer=ByteBuffer.allocate(bytes.length);
174             //将数据写入缓冲区
175             writeBuffer.put(bytes);
176             //修改ByteBuffer的imit设置为position,position设置为0
177             writeBuffer.flip();
178             //将数据从缓冲区写入通道
179             channel.write(writeBuffer);
180         }
181     }
182     
183     
184 }
View Code

【nio客户端序列图】

技术分享

 

三:nio服务器客户端启动类

技术分享
 1 package com.yeepay.sxf.testnio;
 2 
 3 
 4 /**
 5  * 向TimerServer发送请求的客户端
 6  * @author sxf
 7  *
 8  */
 9 public class NIOTimerClient {
10     
11     public static void main(String[] args) {
12         int port=8000;
13         
14         if(args!=null&&args.length>0){
15             port=Integer.valueOf(args[0]);
16         }
17         new Thread(new TimerClientHandler("127.0.0.1", port),"TimeClient-001").start();
18     }
19 }
View Code

四:nio服务器的客户端

技术分享
  1 package com.yeepay.sxf.testnio;
  2 
  3 import java.io.IOException;
  4 import java.net.InetSocketAddress;
  5 import java.nio.ByteBuffer;
  6 import java.nio.channels.SelectionKey;
  7 import java.nio.channels.Selector;
  8 import java.nio.channels.SocketChannel;
  9 import java.util.Iterator;
 10 import java.util.Set;
 11 
 12 /**
 13  * timerclient请求线程
 14  * @author sxf
 15  *
 16  */
 17 public class TimerClientHandler implements Runnable{
 18     //链接timer服务器的ip地址
 19     private String host;
 20     //链接timer服务器服务的端口号
 21     private int port;
 22     //多路复用器
 23     private Selector selector;
 24     //通道
 25     private SocketChannel socketChannel;
 26     //当前请求线程是否停止
 27     private volatile boolean stop;
 28     
 29     
 30     public TimerClientHandler(String host,int port) {
 31         this.host=host==null?"127.0.0.1":host;
 32         this.port=port;
 33         try {
 34             this.selector=Selector.open();
 35             this.socketChannel=SocketChannel.open();
 36             socketChannel.configureBlocking(false);
 37         } catch (Exception e) {
 38             e.printStackTrace();
 39             System.exit(1);
 40         }
 41     }
 42     
 43     /**
 44      * 链接时间服务器
 45      * @throws IOException 
 46      */
 47     private void doConnect() throws IOException{
 48         if(socketChannel.connect(new InetSocketAddress(host, port))){
 49             socketChannel.register(selector, SelectionKey.OP_READ);
 50             //doWrite(socketChannel);
 51         }else{
 52             socketChannel.register(selector, SelectionKey.OP_CONNECT);
 53         }
 54     }
 55     
 56     /**
 57      * 向时间服务器发送请求
 58      * @param sc
 59      * @throws IOException 
 60      */
 61     private void doWrite(SocketChannel sc) throws IOException{
 62         //发送请求的请求内容
 63         byte[] req="shangxiaofei".getBytes();
 64         //声明缓冲区
 65         ByteBuffer writeBuffer=ByteBuffer.allocate(req.length);
 66         //将请求体写入缓冲区
 67         writeBuffer.put(req);
 68         //设置limit
 69         writeBuffer.flip();
 70         //将缓冲区的内容写入通道
 71         sc.write(writeBuffer);
 72         if(!writeBuffer.hasRemaining()){
 73             System.out.println("send order to server success........");
 74         }
 75         
 76     }
 77     
 78     
 79     private void handleInput(SelectionKey key) throws IOException{
 80         if(key.isValid()){
 81             //判断链接是否成功
 82             SocketChannel sc=(SocketChannel) key.channel();
 83         
 84                 //链接事件就绪
 85                 if(sc.finishConnect()){
 86                     //是否链接完成
 87                     sc.register(selector, SelectionKey.OP_READ);
 88                     doWrite(sc);
 89                 }else{
 90                     //链接失败,进程退出
 91                     System.exit(1);
 92                 }
 93                 
 94                 if(key.isReadable()){
 95                     //读事件就绪
 96                     ByteBuffer readBuffer=ByteBuffer.allocate(1024);
 97                     int readBytes=sc.read(readBuffer);
 98                     if(readBytes>0){
 99                         readBuffer.flip();
100                         byte[] bytes=new byte[readBuffer.remaining()];
101                         readBuffer.get(bytes);
102                         String body=new String(bytes,"UTF-8");
103                         System.out.println("TimerServer response:"+body);
104                         this.stop=true;
105                     }else if(readBytes<0){
106                         //对端链路关闭
107                         key.cancel();
108                         sc.close();
109                     }else{
110                         //读到0字节,忽略
111                     }
112                 }
113             
114         }
115     }
116     
117     @Override
118     public void run() {
119         try {
120             //链接并发送请求
121             doConnect();
122         } catch (Exception e) {
123             // TODO: handle exception
124             e.printStackTrace();
125         }
126         
127         while(!stop){
128             try {
129                 //等待响应
130                 selector.select();
131                 //获取已经就绪的通道事件集合,在这个多路复用器上
132                 Set<SelectionKey> selectedKeys=selector.selectedKeys();
133                 //循环迭代处理事件集合
134                 Iterator<SelectionKey> it=selectedKeys.iterator();
135                 SelectionKey key=null;
136                 while (it.hasNext()) {
137                     key=it.next();
138                     it.remove();
139                     try {
140                         handleInput(key);
141                     } catch (Exception e) {
142                         e.printStackTrace();
143                     }
144                     
145                 }
146             } catch (Exception e) {
147                 e.printStackTrace();
148             }
149         }
150         
151         //多路复用器关闭后,所有注册在上面的channel和Pipe等资源都会被自动去注册并关闭
152         //所以不需要重复释放资源
153 //        if(selector!=null){
154 //            try {
155 //                selector.close();
156 //            } catch (Exception e) {
157 //                e.printStackTrace();
158 //            }
159 //        }
160         
161     }
162     
163 
164 }
View Code

 

java的nio之:java的nio的服务器实现模型

标签:

原文地址:http://www.cnblogs.com/shangxiaofei/p/5843284.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!