码迷,mamicode.com
首页 > 编程语言 > 详细

Java NIO框架Netty教程(五) 消息收发次数不匹配的问题

时间:2017-10-11 00:49:22      阅读:239      评论:0      收藏:0      [点我收藏+]

标签:let   ram   tor   att   break   user   ogre   使用   部分   

本来周末是最好的学习时间,不过这周末收房子,可想而知事情自然也不会少。这段时间的周末,可能很少有时间学习了。见缝插针吧。

不说废话了,好好学习。上回通过代码理解了Netty底层信息的流的传递机制,不过只是一个感性上的认识。教会你应该如何使用和使用的时候应该注意的方面。但是有一些细节的问题,并没有提及。比如在上回(《Java NIO框架Netty教程(四)- ChannelBuffer》http://www.it165.net/pro/html/201207/3198.html)的代码里,我们通过:

 

1.private void sendMessageByFrame(ChannelStateEvent e) {
2.String msgOne = "Hello, ";
3.String msgTwo = "I‘m ";
4.String msgThree = "client.";
5.e.getChannel().write(tranStr2Buffer(msgOne));
6.e.getChannel().write(tranStr2Buffer(msgTwo));
7.e.getChannel().write(tranStr2Buffer(msgThree));
8.}

 

这样的方式,连续返送三次消息。但是如果你在服务端进行接收计数的话,你会发现,大部分时候都是接收到两次的事件请求。不过消息都是完整的。网上也有人提到过,进行10000次的连续放松,往往接受到的消息个数是999X的,总是就是消息数目上不匹配,这又是为何呢?笔者也只能通过阅读Netty的源码来找原因,我们一起来慢慢分析吧www.it165.net

起点自然是选择在e.getChannel().writer()方法上。一路跟踪首先来到了:AbstractNioWorker.java类

 

001.protected void write0(AbstractNioChannel<?> channel) {
002.boolean open = true;
003.boolean addOpWrite = false;
004.boolean removeOpWrite = false;
005.boolean iothread = isIoThread(channel);
006. 
007.long writtenBytes = 0;
008. 
009.final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
010.final WritableByteChannel ch = channel.channel;
011.final Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
012.final int writeSpinCount = channel.getConfig().getWriteSpinCount();
013.synchronized (channel.writeLock) {
014.channel.inWriteNowLoop = true;
015.for (;;) {
016.MessageEvent evt = channel.currentWriteEvent;
017.SendBuffer buf;
018.if (evt == null) {
019.if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
020.removeOpWrite = true;
021.channel.writeSuspended = false;
022.break;
023.}
024. 
025.channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
026.else {
027.buf = channel.currentWriteBuffer;
028.}
029. 
030.ChannelFuture future = evt.getFuture();
031.try {
032.long localWrittenBytes = 0;
033.for (int i = writeSpinCount; i > 0; i --) {
034.localWrittenBytes = buf.transferTo(ch);
035.if (localWrittenBytes != 0) {
036.writtenBytes += localWrittenBytes;
037.break;
038.}
039.if (buf.finished()) {
040.break;
041.}
042.}
043. 
044.if (buf.finished()) {
045.// Successful write - proceed to the next message.
046.buf.release();
047.channel.currentWriteEvent = null;
048.channel.currentWriteBuffer = null;
049.evt = null;
050.buf = null;
051.future.setSuccess();
052.else {
053.// Not written fully - perhaps the kernel buffer is full.
054.addOpWrite = true;
055.channel.writeSuspended = true;
056. 
057.if (localWrittenBytes > 0) {
058.// Notify progress listeners if necessary.
059.future.setProgress(
060.localWrittenBytes,
061.buf.writtenBytes(), buf.totalBytes());
062.}
063.break;
064.}
065.catch (AsynchronousCloseException e) {
066.// Doesn‘t need a user attention - ignore.
067.catch (Throwable t) {
068.if (buf != null) {
069.buf.release();
070.}
071.channel.currentWriteEvent = null;
072.channel.currentWriteBuffer = null;
073.buf = null;
074.evt = null;
075.future.setFailure(t);
076.if (iothread) {
077.fireExceptionCaught(channel, t);
078.else {
079.fireExceptionCaughtLater(channel, t);
080.}
081.if (t instanceof IOException) {
082.open = false;
083.close(channel, succeededFuture(channel));
084.}
085.}
086.}
087.channel.inWriteNowLoop = false;
088. 
089.// Initially, the following block was executed after releasing
090.// the writeLock, but there was a race condition, and it has to be
091.// executed before releasing the writeLock:
092.//
094.//
095.if (open) {
096.if (addOpWrite) {
097.setOpWrite(channel);
098.else if (removeOpWrite) {
099.clearOpWrite(channel);
100.}
101.}
102.}
103.if (iothread) {
104.fireWriteComplete(channel, writtenBytes);
105.else {
106.fireWriteCompleteLater(channel, writtenBytes);
107.}
108.}

 

这里, buf.transferTo(ch);的就是调用底层WritableByteChannel的write方法,把buffer写到管道里,传递过去。通过Debug可以看到,没调用一次这个方法,服务端的messageReceived方法就会进入断点一次。当然这个也只是表相,或者说也是在预料之内的。因为笔者从开始就怀疑是连续写入过快导致的问题,所以测试过每次write后停顿1秒。再write下一次。结果一切正常。

 
那么我们跟到这里的意义何在呢?笔者的思路是先证明不是在write端出现的写覆盖的问题,这样就可以从read端寻找问题。这里笔者也在这里加入了一个计数,测试究竟transferTo了几次。结果确实是3次。

 

1.for (int i = writeSpinCount; i > 0; i --) {
2.localWrittenBytes = buf.transferTo(ch);
3.System.out.println(++count);


接下来就从接收端找找原因,在NioWorker的read方法,实现如下:

01.@Override
02.protected boolean read(SelectionKey k) {
03.final SocketChannel ch = (SocketChannel) k.channel();
04.final NioSocketChannel channel = (NioSocketChannel) k.attachment();
05. 
06.final ReceiveBufferSizePredictor predictor =
07.channel.getConfig().getReceiveBufferSizePredictor();
08.final int predictedRecvBufSize = predictor.nextReceiveBufferSize();
09. 
10.int ret = 0;
11.int readBytes = 0;
12.boolean failure = true;
13. 
14.ByteBuffer bb = recvBufferPool.acquire(predictedRecvBufSize);
15.try {
16.while ((ret = ch.read(bb)) > 0) {
17.readBytes += ret;
18.if (!bb.hasRemaining()) {
19.break;
20.}
21.}
22.failure = false;
23.catch (ClosedChannelException e) {
24.// Can happen, and does not need a user attention.
25.catch (Throwable t) {
26.fireExceptionCaught(channel, t);
27.}
28. 
29.if (readBytes > 0) {
30.bb.flip();
31. 
32.final ChannelBufferFactory bufferFactory =
33.channel.getConfig().getBufferFactory();
34.final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
35.buffer.setBytes(0, bb);
36.buffer.writerIndex(readBytes);
37. 
38.recvBufferPool.release(bb);
39. 
40.// Update the predictor.
41.predictor.previousReceiveBufferSize(readBytes);
42. 
43.// Fire the event.
44.fireMessageReceived(channel, buffer);
45.else {
46.recvBufferPool.release(bb);
47.}
48. 
49.if (ret < 0 || failure) {
50.k.cancel(); // Some JDK implementations run into an infinite loop without this.
51.close(channel, succeededFuture(channel));
52.return false;
53.}
54. 
55.return true;
56.}

 

在这个方法的外层是一个循环,不停的遍历,如果有SelectionKey k存在,则进入此方法读取buffer中的数据。这个SelectionKey 区分只是一种类型,这个设计到Java NIO中的Seletor机制,这个笔者准备下讲穿插一下。属于Netty底层的一个重要的机制。

messageReceived事件的触发,是在读取完当前缓冲池中所有的信息之后在触发的。这倒是可以解释,为什么即使我们收到事件的次数少,但是消息是完整的。
 
从目前来看,Netty通过Java 的NIO机制传递数据,数据读写跟事件没有严格的绑定机制。数据是以流的形式独立存在,读写都有一个缓冲池。
 
不过,这些还远未解决笔者的疑惑。笔者决定先了解一下Seletor机制,再回头来探索这个问题。
 
待解决……

Java NIO框架Netty教程(五) 消息收发次数不匹配的问题

标签:let   ram   tor   att   break   user   ogre   使用   部分   

原文地址:http://www.cnblogs.com/hashcoder/p/7648407.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!