码迷,mamicode.com
首页 > 编程语言 > 详细

javaNIO通信

时间:2018-06-12 13:37:29      阅读:205      评论:0      收藏:0      [点我收藏+]

标签:网速   net   jvm   数据格式   shm   under   icm   ssi   wak   

传统BIO模式

  服务端ServerSocket负责绑定IP地址,启动监听端口。客户端Socket负责发起连接操作,服务端接受到连接请求后为每个客户端创建一个新的线程进行链路处理,连路处理通过输入和输出流进行同步阻塞式通信。

  该模型最大的问题就是缺乏弹性伸缩能力,当客户端并发访问量增加后,服务端的线程个数和客户端并发访问数呈1:1的正比关系。可以通过线程池来解决这个问题,使用线程池后,线程池会控制最大的线程数量以及线程的重复使用率。但本质上每个线程在处理访问时并没有大负荷允许,通信过程中每个通信都需要占一个线程,但其实线程使用率并不高。所以更理想的方式是,线程的连接与处理解藕,有一个统一的对象来管理所有连接,请求的处理交给独立的线程处理,这样连接的数量就会不再受限制线程的数量

NIO模式

  服务端ServerSocketChannel负责绑定IP地址,启动监听端口。然后将该ServerSocketChannel注册到一个Selector中。创建一个线程,轮询Selector来获取响应的事件。如果是ACCEPT事件,则有请求访问,获取响应的ServerSocketChannel,然后获取想要的SocketChannel,完成连接。然后再将SocketChannel注册到Selector中。如果是WRITE事件,则获取想要的SocketChannel,通过SocketChannel把一个缓存的数据写入;如果是READ事件,则获取想要的SocketChannel,通过SocketChannel读取数据到一个缓存中。实际上即使把ServerSocketChannel和SocketChannel绑定到Selector,通过Selector单线程去处理请求。

  客户端SocketChannel首先注册到一个Selector中。然后连接服务器。轮询Selector来获取响应的事件,如果是CONNECT事件,则获取想要的SocketChannel,完成连接,然后再将SocketChannel注册到Selector;如果是READ事件,则获取想要的SocketChannel,通过SocketChannel读取数据到一个缓存中;如果是WRITE事件,则获取想要的SocketChannel,通过SocketChannel把一个缓存的数据写入;

断包和粘包问题

  NIO读取和写入都是通过缓存实现的,由于存在缓存池的大小限制和网速的不均匀会造成一次读写的操作放入缓存池中的数据不完整,这就是断包问题。同理,如果一次性读入两个及两个以上的数据,则无法分辨两个数据包的界限问题,也就造成了粘包。总结就是NIO在读取和写入缓存时数据长度是不确定的,我们可以定制一个固定长度的缓存,通过判断缓存是否剩余来控制读取和写入数据的长度。那么缓存的长度定多少合适呢,一般会在读写数据时,定义一个头部数据来指定数据的长度。

NIO体系

  NIO中有三个核心概念。第一个是通道,有点类似与IO中的流的概念;第二个是缓存,NIO中数据都是先存储在缓存中,然后统一输出的或者统一输入到缓存;第三个是选折器,选择器像是一个消息监听器,我们可以把相应的通道的相应事件注册到选择器,然后通过选择器获取相应事件的发生。

  从最容易理解的缓存说起, IO中通信是直接端对端的,数据通信耦合非常大,时间比较浪费。在NIO中,通过缓存解藕了两端,数据首先进入缓存中,然后再将缓存中的数据全部写入输入通道;输出通道接收到数据后,首先将所有数据写入缓存中,然后交由相应程序处理。NIO提供了几种类型缓存:存储byte数据的缓存(ByteBuffer);存储char数据的缓存(CharBuffer);存储double数据的缓存(DoubleBuffer);存储float数据的缓存(FloatBuffer);存储int数据的缓存(IntBuffer);存储long数据的缓存(LongBuffer);存储short数据的缓存(ShortBuffer);文件数据缓存(MappedByteBuffer)不同的的缓存类型,只是存储的数据类型不同,本质都是二进制数据,只是定义的数据格式不同。所以可以通过ByteBuffer来存储各种类型数据,然后使用特定的编码方式读取。缓存内部本质是一个数组,有一个position标志标识当前的读写位置,有一个limit标识读写的范围缓存都是线程不安全的

方法 说明
clear 将position标志设置为0;limit标识设置为容量大小,mark会被丢弃,常用在从管道中读写数据前。
flip 将position标志设置为0;limit标识设置为当前位置,mark会被丢弃,常用在从管道中读写数据后。
rewind 将position标志设置为0;mark会被丢弃,常用在从管道中读写数据后(如果limit已经设置正确)。
mark 将position标志设置mark点。
reset 将position标志设置成之前的mark点。
remaining 返回position到limit的距离
hasRemaining 是否还有剩余

 

方法 说明
allocate 分配指定的空间
wrap 将指定的数组包装成缓存对象,缓存的改变会影响原来的数组
slice 创建新的缓存分片,内容从position到capacity,并且共享这段存储空间。
duplicate 创建新的缓存,共享存储空间。
compact 将position到limit的内容拷贝到缓存最前面,复制后,position = limit -position,limit = capacity
isDirect  是否是直接内存。
put 写入数据
get 读取数据

  通道

按操作对象

  网络通道(SocketChannel、ServerSocketChannel):

  ServerSocketChannel内部本质封装了一个ServerSocket;

  

  

 

 SocketChannel

方法 参数 返回值 说明
       
       
       
       
       
       
       

   

ZooKeeper代码实例:

  服务端通过NIO接收请求,反序列化为相应对象,交给zookeeper服务器进行处理,处理结果写入缓存队列力,然后交给nio发送到客户端。

服务端NIO服务器,通过NIO建立通道(NIOServerCnxn.Factory):

技术分享图片
  1 static public class Factory extends Thread {
  2         static {
  3             //设置全局的异常处理
  4             Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
  5                 public void uncaughtException(Thread t, Throwable e) {
  6                     LOG.error("Thread " + t + " died", e);
  7                 }
  8             });
  9             /**
 10              * jvm早期的nullpoint bug
 11              * http://bugs.sun.com/view_bug.do?bug_id=6427854
 12              */
 13             try {
 14                 Selector.open().close();
 15             } catch(IOException ie) {
 16                 LOG.error("Selector failed to open", ie);
 17             }
 18         }
 19         //服务器通道
 20         final ServerSocketChannel ss;
 21         //选择器
 22         final Selector selector = Selector.open();
 23         ZooKeeperServer zks;
 24         
 25         final ByteBuffer directBuffer = ByteBuffer.allocateDirect(64 * 1024);
 26         //所有的上下文环境
 27         final HashSet<NIOServerCnxn> cnxns = new HashSet<NIOServerCnxn>();
 28         //ip对应的上下文环境
 29         final HashMap<InetAddress, Set<NIOServerCnxn>> ipMap =
 30             new HashMap<InetAddress, Set<NIOServerCnxn>>( );
 31         int maxClientCnxns = 10;
 32         
 33         public Factory(InetSocketAddress addr, int maxcc) throws IOException {
 34             setDaemon(true);
 35             //单个client链接最大数
 36             maxClientCnxns = maxcc;
 37           //创建服务器通道
 38             this.ss = ServerSocketChannel.open();
 39             ss.socket().setReuseAddress(true);
 40           //绑定端口
 41             ss.socket().bind(addr);
 42           //设置通道为非阻塞通道
 43             ss.configureBlocking(false);
 44           //把通道注册到选择器中
 45             ss.register(selector, SelectionKey.OP_ACCEPT);
 46         }
 47         public void run() {
 48             while (!ss.socket().isClosed()) {
 49                 try {
 50                     //选择一组键
 51                     selector.select(1000);
 52                     Set<SelectionKey> selected;
 53                     synchronized (this) {
 54                         selected = selector.selectedKeys();
 55                     }
 56                     ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
 57                             selected);
 58                     Collections.shuffle(selectedList);
 59                     for (SelectionKey k : selectedList) {
 60                         //如果通道已经准备好接收套接字
 61                         if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
 62                             SocketChannel sc = ((ServerSocketChannel) k
 63                                     .channel()).accept();
 64                             InetAddress ia = sc.socket().getInetAddress();
 65                             //判断最大连接数
 66                             int cnxncount = getClientCnxnCount(ia);
 67                             if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
 68                                 sc.close();
 69                             } else {
 70                                 // 配置为非阻塞
 71                                 sc.configureBlocking(false);
 72                               //把通道注册到选择器中
 73                                 SelectionKey sk = sc.register(selector,
 74                                         SelectionKey.OP_READ);
 75                                 NIOServerCnxn cnxn = createConnection(sc, sk);
 76                                 //给该通道附带一个上下文环境
 77                                 sk.attach(cnxn);
 78                                 addCnxn(cnxn);
 79                             }
 80                         } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
 81                             //通过上线文件来进行读写
 82                             NIOServerCnxn c = (NIOServerCnxn) k.attachment();
 83                             c.doIO(k);
 84                         } else {
 85                             if (LOG.isDebugEnabled()) {
 86                                 LOG.debug("Unexpected ops in select "
 87                                           + k.readyOps());
 88                             }
 89                         }
 90                     }
 91                     selected.clear();
 92                  catch (Exception e) {
 93                     LOG.warn("Ignoring exception", e);
 94                 }
 95             }
 96             clear();
 97         }
 98         //关闭
 99         public void shutdown() {
100                 try {
101                     ss.close();
102                     clear();
103                     this.interrupt();
104                     this.join();
105                 } catch (Exception e) {
106                     LOG.warn("Ignoring unexpected exception during shutdown", e);
107                 }
108                 try {
109                     selector.close();
110                 } catch (IOException e) {
111                     LOG.warn("Selector closing", e);
112                 }
113                 if (zks != null) {
114                     zks.shutdown();
115                 }
116         }
117         synchronized public void clear() {
118             selector.wakeup();
119             HashSet<NIOServerCnxn> cnxns;
120             synchronized (this.cnxns) {
121                 cnxns = (HashSet<NIOServerCnxn>)this.cnxns.clone();
122             }
123             // got to clear all the connections that we have in the selector
124             for (NIOServerCnxn cnxn: cnxns) {
125                 try {
126                     // don‘t hold this.cnxns lock as deadlock may occur
127                     cnxn.close();
128                 } catch (Exception e) {
129                     LOG.warn("Ignoring exception closing cnxn sessionid 0x"
130                             + Long.toHexString(cnxn.sessionId), e);
131                 }
132             }
133         }
134     }
View Code

服务端通道上下文,通过NIO进行读写(NIOServerCnxn.doIO):

技术分享图片
 1 void doIO(SelectionKey k) throws InterruptedException {
 2         try {
 3             //如果通道可以读取数据
 4             if (k.isReadable()) {
 5                 //读取数据到缓存中
 6                 int rc = sock.read(incomingBuffer);
 7                 //如果读满缓存
 8                 if (incomingBuffer.remaining() == 0) {
 9                     boolean isPayload;
10                     //如果第一次读取,则先读取长度内容,相应分配缓存;否则读取指定长度的数据内容
11                     if (incomingBuffer == lenBuffer) {
12                         incomingBuffer.flip();
13                         isPayload = readLength(k);
14                         incomingBuffer.clear();
15                     } else {
16                         isPayload = true;
17                     }
18                     if (isPayload) {
19                         //读取数据,初始化、读取请求数据封装成packet
20                         readPayload();
21                     }
22                 }
23             }
24           //如果通道可以读写数据
25             if (k.isWritable()) {
26                 //缓存中有数据需要写入
27                 if (outgoingBuffers.size() > 0) {
28                     //创建bytebuffer
29                     ByteBuffer directBuffer = factory.directBuffer;
30                     directBuffer.clear();
31                     //从队列中读取数据知道缓存读满
32                     for (ByteBuffer b : outgoingBuffers) {
33                         if (directBuffer.remaining() < b.remaining()) {
34                             b = (ByteBuffer) b.slice().limit(
35                                     directBuffer.remaining());
36                         }
37                         int p = b.position();
38                         directBuffer.put(b);
39                         b.position(p);
40                         if (directBuffer.remaining() == 0) {
41                             break;
42                         }
43                     }
44                     //将数据写入通道
45                     directBuffer.flip();
46                     int sent = sock.write(directBuffer);
47                     ByteBuffer bb;
48                     //从缓存中已经发送的删除数据
49                     while (outgoingBuffers.size() > 0) {
50                         bb = outgoingBuffers.peek();
51                         int left = bb.remaining() - sent;
52                         if (left > 0) {
53                             bb.position(bb.position() + sent);
54                             break;
55                         }
56                         sent -= bb.remaining();
57                         outgoingBuffers.remove();
58                     }
59                 }
60 
61                 synchronized(this.factory){
62                     if (outgoingBuffers.size() == 0) {
63                         sk.interestOps(sk.interestOps()
64                                 & (~SelectionKey.OP_WRITE));
65                     } else {
66                         sk.interestOps(sk.interestOps()
67                                 | SelectionKey.OP_WRITE);
68                     }
69                 }
70             }
71         }catch (IOException e) {
72             close();
73         }
74     }
75     private void readPayload() throws IOException, InterruptedException {
76         if (incomingBuffer.remaining() != 0) { // have we read length bytes?
77             int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
78         }
79         if (incomingBuffer.remaining() == 0) { 
80             //重置缓存
81             incomingBuffer.flip();
82             //如果没有进行初始化,首先要初始化;如果已经链接,则读取请求数据,封装成packet
83             if (!initialized) {
84                 readConnectRequest();
85             } else {
86                 readRequest();
87             }
88           //重置
89             lenBuffer.clear();
90             incomingBuffer = lenBuffer;
91         }
92     }
View Code

服务端通道上下文,处理命令(NIOServerCnxn.readLength)

技术分享图片
 1  private boolean readLength(SelectionKey k) throws IOException {
 2         //如果是请求数据,根据长度分配缓存;如果是命令,执行相应命令。
 3         int len = lenBuffer.getInt();
 4         if (!initialized && checkFourLetterWord(k, len)) {
 5             return false;
 6         }
 7         if (len < 0 || len > BinaryInputArchive.maxBuffer) {
 8             throw new IOException("Len error " + len);
 9         }
10         if (zk == null) {
11             throw new IOException("ZooKeeperServer not running");
12         }
13         incomingBuffer = ByteBuffer.allocate(len);
14         return true;
15     }
16     private boolean checkFourLetterWord(final SelectionKey k, final int len)
17             throws IOException
18             {
19                 //获取命令
20                 String cmd = cmd2String.get(len);
21                 /** cancel the selection key to remove the socket handling
22                  * from selector. This is to prevent netcat problem wherein
23                  * netcat immediately closes the sending side after sending the
24                  * commands and still keeps the receiving channel open. 
25                  * The idea is to remove the selectionkey from the selector
26                  * so that the selector does not notice the closed read on the
27                  * socket channel and keep the socket alive to write the data to
28                  * and makes sure to close the socket after its done writing the data
29                  */
30                 if (k != null) {
31                     try {
32                         k.cancel();
33                     } catch(Exception e) {
34                         LOG.error("Error cancelling command selection key ", e);
35                     }
36                 }
37                 //根据命令类型,执行相应内容
38                 final PrintWriter pwriter = new PrintWriter(
39                         new BufferedWriter(new SendBufferWriter()));
40                 if (len == ruokCmd) {
41                     RuokCommand ruok = new RuokCommand(pwriter);
42                     ruok.start();
43                     return true;
44                 } else if (len == getTraceMaskCmd) {
45                     TraceMaskCommand tmask = new TraceMaskCommand(pwriter);
46                     tmask.start();
47                     return true;
48                 } else if (len == setTraceMaskCmd) {
49                     int rc = sock.read(incomingBuffer);
50                     if (rc < 0) {
51                         throw new IOException("Read error");
52                     }
53 
54                     incomingBuffer.flip();
55                     long traceMask = incomingBuffer.getLong();
56                     ZooTrace.setTextTraceLevel(traceMask);
57                     SetTraceMaskCommand setMask = new SetTraceMaskCommand(pwriter, traceMask);
58                     setMask.start();
59                     return true;
60                 } else if (len == enviCmd) {
61                     EnvCommand env = new EnvCommand(pwriter);
62                     env.start();
63                     return true;
64                 } else if (len == confCmd) {
65                     ConfCommand ccmd = new ConfCommand(pwriter);
66                     ccmd.start();
67                     return true;
68                 } else if (len == srstCmd) {
69                     StatResetCommand strst = new StatResetCommand(pwriter);
70                     strst.start();
71                     return true;
72                 } else if (len == crstCmd) {
73                     CnxnStatResetCommand crst = new CnxnStatResetCommand(pwriter);
74                     crst.start();
75                     return true;
76                 } else if (len == dumpCmd) {
77                     DumpCommand dump = new DumpCommand(pwriter);
78                     dump.start();
79                     return true;
80                 } else if (len == statCmd || len == srvrCmd) {
81                     StatCommand stat = new StatCommand(pwriter, len);
82                     stat.start();
83                     return true;
84                 } else if (len == consCmd) {
85                     ConsCommand cons = new ConsCommand(pwriter);
86                     cons.start();
87                     return true;
88                 } else if (len == wchpCmd || len == wchcCmd || len == wchsCmd) {
89                     WatchCommand wcmd = new WatchCommand(pwriter, len);
90                     wcmd.start();
91                     return true;
92                 }
93                 return false;
94             }
View Code

服务端通道上下文,处理请求数据(NIOServerCnxn.readRequest\NIOServerCnxn.readConnectRequest):

技术分享图片
 1 private void readRequest() throws IOException {
 2         //反序列化请求数据
 3         InputStream bais = new ByteBufferInputStream(incomingBuffer);
 4         BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
 5         RequestHeader h = new RequestHeader();
 6         h.deserialize(bia, "header");
 7         incomingBuffer = incomingBuffer.slice();
 8         if (h.getType() == OpCode.auth) {
 9             //如果是认证请求
10             AuthPacket authPacket = new AuthPacket();
11             ZooKeeperServer.byteBuffer2Record(incomingBuffer, authPacket);
12             String scheme = authPacket.getScheme();
13             //进行认证
14             AuthenticationProvider ap = ProviderRegistry.getProvider(scheme);
15             if (ap == null
16                     || (ap.handleAuthentication(this, authPacket.getAuth())
17                             != KeeperException.Code.OK)) {
18                 // 认证失败,返回失败内容
19                 ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
20                         KeeperException.Code.AUTHFAILED.intValue());
21                 sendResponse(rh, null, null);
22                 //关闭链接
23                 sendCloseSession();
24                 disableRecv();
25             } else {
26                 //认证成功,返回成功内容
27                 ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
28                         KeeperException.Code.OK.intValue());
29                 sendResponse(rh, null, null);
30             }
31             return;
32         } else {
33             //如果是请求,提交到zk处理
34             Request si = new Request(this, sessionId, h.getXid(), h.getType(), incomingBuffer, authInfo);
35             si.setOwner(ServerCnxn.me);
36             zk.submitRequest(si);
37         }
38     }
39     
40     private void readConnectRequest() throws IOException, InterruptedException {
41         //反序列化链接请求对象
42         BinaryInputArchive bia = BinaryInputArchive
43                 .getArchive(new ByteBufferInputStream(incomingBuffer));
44         ConnectRequest connReq = new ConnectRequest();
45         connReq.deserialize(bia, "connect");
46         if (connReq.getLastZxidSeen() > zk.getZKDatabase().getDataTreeLastProcessedZxid()) {
47             throw new CloseRequestException(msg);
48         }
49         sessionTimeout = connReq.getTimeOut();
50         byte passwd[] = connReq.getPasswd();
51         //初始化session
52         disableRecv();
53         if (connReq.getSessionId() != 0) {
54             long clientSessionId = connReq.getSessionId();
55             factory.closeSessionWithoutWakeup(clientSessionId);
56             setSessionId(clientSessionId);
57             zk.reopenSession(this, sessionId, passwd, sessionTimeout);
58         } else {
59             zk.createSession(this, passwd, sessionTimeout);
60         }
61         initialized = true;
62     }
View Code

 服务端通道上下文,写返回数据(NIOServerCnxn.sendResponse)

技术分享图片
 1 synchronized public void sendResponse(ReplyHeader h, Record r, String tag) {
 2         try {
 3             //序列化返回结果,
 4             ByteArrayOutputStream baos = new ByteArrayOutputStream();
 5             BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
 6             try {
 7                 baos.write(fourBytes);
 8                 bos.writeRecord(h, "header");
 9                 if (r != null) {
10                     bos.writeRecord(r, tag);
11                 }
12                 baos.close();
13             } catch (IOException e) {
14                 LOG.error("Error serializing response");
15             }
16             //写入数据长度
17             byte b[] = baos.toByteArray();
18             ByteBuffer bb = ByteBuffer.wrap(b);
19             bb.putInt(b.length - 4).rewind();
20             //
21             sendBuffer(bb);
22          } catch(Exception e) {
23             LOG.warn("Unexpected exception. Destruction averted.", e);
24          }
25     }
26     void sendBuffer(ByteBuffer bb) {
27         try {
28             if (bb != closeConn) {
29                 //直接发送数据
30                 if ((sk.interestOps() & SelectionKey.OP_WRITE) == 0) {
31                     try {
32                         sock.write(bb);
33                     } catch (IOException e) {
34                         // we are just doing best effort right now
35                     }
36                 }
37                 if (bb.remaining() == 0) {
38                     packetSent();
39                     return;
40                 }
41             }
42             //写入缓存中。
43             synchronized(this.factory){
44                 sk.selector().wakeup();
45                 outgoingBuffers.add(bb);
46                 if (sk.isValid()) {
47                     sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE);
48                 }
49             }
50             
51         } catch(Exception e) {
52             LOG.error("Unexpected Exception: ", e);
53         }
54     }
View Code

  客户端响应处理,处理返回事件(ClientCnxn.EventThread):

技术分享图片
  1     class EventThread extends Thread {
  2         private final LinkedBlockingQueue<Object> waitingEvents =
  3             new LinkedBlockingQueue<Object>();
  4 
  5         /** This is really the queued session state until the event
  6          * thread actually processes the event and hands it to the watcher.
  7          * But for all intents and purposes this is the state.
  8          */
  9         private volatile KeeperState sessionState = KeeperState.Disconnected;
 10 
 11        private volatile boolean wasKilled = false;
 12        private volatile boolean isRunning = false;
 13 
 14       //添加响应事件到队列中
 15        public void queueEvent(WatchedEvent event) {
 16             WatcherSetEventPair pair = new WatcherSetEventPair(
 17                     watcher.materialize(event.getState(), event.getType(),
 18                             event.getPath()),
 19                             event);
 20             // queue the pair (watch set & event) for later processing
 21             waitingEvents.add(pair);
 22         }
 23        //添加数据包到队列中
 24        public void queuePacket(Packet packet) {
 25           if (wasKilled) {
 26              synchronized (waitingEvents) {
 27                 if (isRunning) waitingEvents.add(packet);
 28                 else processEvent(packet);
 29              }
 30           } else {
 31              waitingEvents.add(packet);
 32           }
 33        }
 34        //添加结束事件
 35         public void queueEventOfDeath() {
 36             waitingEvents.add(eventOfDeath);
 37         }
 38         
 39         @Override
 40         public void run() {
 41            try {
 42               while (true) {
 43                  // 获取响应事件
 44                  Object event = waitingEvents.take();
 45                  if (event == eventOfDeath) {
 46                     wasKilled = true;
 47                  } else {
 48                      //处理响应
 49                     processEvent(event);
 50                  }
 51                  if (wasKilled)
 52                     synchronized (waitingEvents) {
 53                        if (waitingEvents.isEmpty()) {
 54                           isRunning = false;
 55                           break;
 56                        }
 57                     }
 58               }
 59            } catch (InterruptedException e) {
 60               LOG.error("Event thread exiting due to interruption", e);
 61            }
 62             LOG.info("EventThread shut down");
 63         }
 64 
 65        private void processEvent(Object event) {
 66           try {
 67               if (event instanceof WatcherSetEventPair) {
 68                   // each watcher will process the event
 69                   WatcherSetEventPair pair = (WatcherSetEventPair) event;
 70                   for (Watcher watcher : pair.watchers) {
 71                       try {
 72                           watcher.process(pair.event);
 73                       } catch (Throwable t) {
 74                           LOG.error("Error while calling watcher ", t);
 75                       }
 76                   }
 77               } else {
 78                   Packet p = (Packet) event;
 79                   int rc = 0;
 80                   String clientPath = p.clientPath;
 81                   if (p.replyHeader.getErr() != 0) {
 82                       rc = p.replyHeader.getErr();
 83                   }
 84                   if (p.cb == null) {
 85                       LOG.warn("Somehow a null cb got to EventThread!");
 86                   } else if (p.response instanceof ExistsResponse
 87                           || p.response instanceof SetDataResponse
 88                           || p.response instanceof SetACLResponse) {
 89                       StatCallback cb = (StatCallback) p.cb;
 90                       if (rc == 0) {
 91                           if (p.response instanceof ExistsResponse) {
 92                               cb.processResult(rc, clientPath, p.ctx,
 93                                       ((ExistsResponse) p.response)
 94                                               .getStat());
 95                           } else if (p.response instanceof SetDataResponse) {
 96                               cb.processResult(rc, clientPath, p.ctx,
 97                                       ((SetDataResponse) p.response)
 98                                               .getStat());
 99                           } else if (p.response instanceof SetACLResponse) {
100                               cb.processResult(rc, clientPath, p.ctx,
101                                       ((SetACLResponse) p.response)
102                                               .getStat());
103                           }
104                       } else {
105                           cb.processResult(rc, clientPath, p.ctx, null);
106                       }
107                   } else if (p.response instanceof GetDataResponse) {
108                       DataCallback cb = (DataCallback) p.cb;
109                       GetDataResponse rsp = (GetDataResponse) p.response;
110                       if (rc == 0) {
111                           cb.processResult(rc, clientPath, p.ctx, rsp
112                                   .getData(), rsp.getStat());
113                       } else {
114                           cb.processResult(rc, clientPath, p.ctx, null,
115                                   null);
116                       }
117                   } else if (p.response instanceof GetACLResponse) {
118                       ACLCallback cb = (ACLCallback) p.cb;
119                       GetACLResponse rsp = (GetACLResponse) p.response;
120                       if (rc == 0) {
121                           cb.processResult(rc, clientPath, p.ctx, rsp
122                                   .getAcl(), rsp.getStat());
123                       } else {
124                           cb.processResult(rc, clientPath, p.ctx, null,
125                                   null);
126                       }
127                   } else if (p.response instanceof GetChildrenResponse) {
128                       ChildrenCallback cb = (ChildrenCallback) p.cb;
129                       GetChildrenResponse rsp = (GetChildrenResponse) p.response;
130                       if (rc == 0) {
131                           cb.processResult(rc, clientPath, p.ctx, rsp
132                                   .getChildren());
133                       } else {
134                           cb.processResult(rc, clientPath, p.ctx, null);
135                       }
136                   } else if (p.response instanceof GetChildren2Response) {
137                       Children2Callback cb = (Children2Callback) p.cb;
138                       GetChildren2Response rsp = (GetChildren2Response) p.response;
139                       if (rc == 0) {
140                           cb.processResult(rc, clientPath, p.ctx, rsp
141                                   .getChildren(), rsp.getStat());
142                       } else {
143                           cb.processResult(rc, clientPath, p.ctx, null, null);
144                       }
145                   } else if (p.response instanceof CreateResponse) {
146                       StringCallback cb = (StringCallback) p.cb;
147                       CreateResponse rsp = (CreateResponse) p.response;
148                       if (rc == 0) {
149                           cb.processResult(rc, clientPath, p.ctx,
150                                   (chrootPath == null
151                                           ? rsp.getPath()
152                                           : rsp.getPath()
153                                     .substring(chrootPath.length())));
154                       } else {
155                           cb.processResult(rc, clientPath, p.ctx, null);
156                       }
157                   } else if (p.cb instanceof VoidCallback) {
158                       VoidCallback cb = (VoidCallback) p.cb;
159                       cb.processResult(rc, clientPath, p.ctx);
160                   }
161               }
162           } catch (Throwable t) {
163               LOG.error("Caught unexpected throwable", t);
164           }
165        }
166     }
View Code

 

javaNIO通信

标签:网速   net   jvm   数据格式   shm   under   icm   ssi   wak   

原文地址:https://www.cnblogs.com/zhangwanhua/p/8528281.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!