标签:java asynchronous socket thread 异步
我们通过nio学习了Reactor模式,但是在java7中又出现了NIO.2,新的异步框架出来了,在上节中的服务端视线中看不到Reactor的影子了,但是Netty in action中写到:But notice that NIO.2 handles threading and the creation of the so-called event loop for you.所以模式还是没变,只是封装了而已!那让我们来分解下AIO(NIO.2)的封装吧!
首先看下AsynchronousServerSocketChannel类结构图:
AsynchronousSocketChannel的类结构图:
不同的jdk版本会有不同的实现,这里用的是windows的实现,只有一个类,所以跟踪源码不是很麻烦。
这个类的官方解释是:A grouping of asynchronous channels for the purpose of resource sharing(实现异步通道的资源共享).
An asynchronous channel group encapsulates the mechanics required to handle the completion of I/O operations initiated byasynchronous channels that are bound to the group(通道组封装了加入组中的那些通道的IO操作).A
group has an associated thread pool to which tasks are submitted to handle I/O events and dispatch tocompletion-handlers that consume the result of asynchronous operations performed on channels in
the group. In addition to handling I/O events, the pooled threads may also execute other tasks required to support the execution of asynchronous I/O operations.(每个组都有一个线程池,这个线程池可以运行IO事件的分发和Handler的业务处理,如果没有显式的创建通道组,那么 在创建通道的时候没有指定group参数的时候会用jvm默认的通道组)
因为有了默认的通道组,所以如果没有显式制定的话,都会用系统默认的那一套,这对于所有的通道都一样。
/**
* Opens an asynchronous server-socket channel.
*
* <p> The new channel is created by invoking the {@link
* java.nio.channels.spi.AsynchronousChannelProvider#openAsynchronousServerSocketChannel
* openAsynchronousServerSocketChannel} method on the {@link
* java.nio.channels.spi.AsynchronousChannelProvider} object that created
* the given group. If the group parameter is <tt>null</tt> then the
* resulting channel is created by the system-wide default provider, and
* bound to the <em>default group</em>.
*
* @param group
* The group to which the newly constructed channel should be bound,
* or <tt>null</tt> for the default group
*
* @return A new asynchronous server socket channel
*
* @throws ShutdownChannelGroupException
* If the channel group is shutdown
* @throws IOException
* If an I/O error occurs
*/
public static AsynchronousServerSocketChannel open(AsynchronousChannelGroup group)
throws IOException
{
AsynchronousChannelProvider provider = (group == null) ?
AsynchronousChannelProvider.provider() : group.provider();
return provider.openAsynchronousServerSocketChannel(group);
}
/**
* Opens an asynchronous server-socket channel.
*
* <p> This method returns an asynchronous server socket channel that is
* bound to the <em>default group</em>. This method is equivalent to evaluating
* the expression:
* <blockquote><pre>
* open((AsynchronousChannelGroup)null);
* </pre></blockquote>
*
* @return A new asynchronous server socket channel
*
* @throws IOException
* If an I/O error occurs
*/
public static AsynchronousServerSocketChannel open()
throws IOException
{
return open(null);
}看下解释:
When a new connection is accepted then the resulting AsynchronousSocketChannel will be bound to the sameAsynchronousChannelGroup as this channel.
所以当一切都是以AsynchronousServerSocketChannel为开头的时候,所有他创建的AsynchronousSocketChannel都是和他一个group的所以他们共享一个线程池。
public void serve(int port) throws IOException {
System.out.println("Listening for connections on port " + port);
final AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel
.open();
InetSocketAddress address = new InetSocketAddress(port);
serverChannel.bind(address);
final CountDownLatch latch = new CountDownLatch(1);
serverChannel.accept(null,
new CompletionHandler<AsynchronousSocketChannel, Object>() {
@Override
public void completed(
final AsynchronousSocketChannel channel,
Object attachment) {
serverChannel.accept(null, this);
ByteBuffer buffer = ByteBuffer.allocate(100);
channel.read(buffer, buffer, new EchoCompletionHandler(
channel));
}
@Override
public void failed(Throwable throwable, Object attachment) {
try {
serverChannel.close();
} catch (IOException e) {
// ingnore on close
} finally {
latch.countDown();
}
}
});
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}Future<AsynchronousSocketChannel> implAccept(Object attachment,
final CompletionHandler<AsynchronousSocketChannel,Object> handler)
{
if (!isOpen()) {
Throwable exc = new ClosedChannelException();
if (handler == null)
return CompletedFuture.withFailure(exc);
Invoker.invokeIndirectly(this, handler, attachment, null, exc);
return null;
}
if (isAcceptKilled())
throw new RuntimeException("Accept not allowed due to cancellation");
// ensure channel is bound to local address
if (localAddress == null)
throw new NotYetBoundException();
// create the socket that will be accepted. The creation of the socket
// is enclosed by a begin/end for the listener socket to ensure that
// we check that the listener is open and also to prevent the I/O
// port from being closed as the new socket is registered.
WindowsAsynchronousSocketChannelImpl ch = null;
IOException ioe = null;
try {
begin();
ch = new WindowsAsynchronousSocketChannelImpl(iocp, false);
} catch (IOException x) {
ioe = x;
} finally {
end();
}
if (ioe != null) {
if (handler == null)
return CompletedFuture.withFailure(ioe);
Invoker.invokeIndirectly(this, handler, attachment, null, ioe);
return null;
}
// need calling context when there is security manager as
// permission check may be done in a different thread without
// any application call frames on the stack
AccessControlContext acc = (System.getSecurityManager() == null) ?
null : AccessController.getContext();
PendingFuture<AsynchronousSocketChannel,Object> result =
new PendingFuture<AsynchronousSocketChannel,Object>(this, handler, attachment);
AcceptTask task = new AcceptTask(ch, acc, result);
result.setContext(task);
// check and set flag to prevent concurrent accepting
if (!accepting.compareAndSet(false, true))
throw new AcceptPendingException();
// initiate I/O
if (Iocp.supportsThreadAgnosticIo()) {
task.run();
} else {
Invoker.invokeOnThreadInThreadPool(this, task);
}
return result;
}
/**
* Invokes the given task on the thread pool associated with the given
* channel. If the current thread is in the thread pool then the task is
* invoked directly.
*/
static void invokeOnThreadInThreadPool(Groupable channel,
Runnable task)
{
boolean invokeDirect;
GroupAndInvokeCount thisGroupAndInvokeCount = myGroupAndInvokeCount.get();
AsynchronousChannelGroupImpl targetGroup = channel.group();
if (thisGroupAndInvokeCount == null) {
invokeDirect = false;
} else {
invokeDirect = (thisGroupAndInvokeCount.group == targetGroup);
}
try {
if (invokeDirect) {
task.run();
} else {
targetGroup.executeOnPooledThread(task);
}
} catch (RejectedExecutionException ree) {
throw new ShutdownChannelGroupException();
}
}/**
* Initiates the accept operation.
*/
@Override
public void run() {
long overlapped = 0L;
try {
// begin usage of listener socket
begin();
try {
// begin usage of child socket (as it is registered with
// completion port and so may be closed in the event that
// the group is forcefully closed).
channel.begin();
synchronized (result) {
overlapped = ioCache.add(result);
int n = accept0(handle, channel.handle(), overlapped, dataBuffer);
if (n == IOStatus.UNAVAILABLE) {
return;
}
// connection accepted immediately
finishAccept();
// allow another accept before the result is set
enableAccept();
result.setResult(channel);
}
} finally {
// end usage on child socket
channel.end();
}
} catch (Throwable x) {
// failed to initiate accept so release resources
if (overlapped != 0L)
ioCache.remove(overlapped);
closeChildChannel();
if (x instanceof ClosedChannelException)
x = new AsynchronousCloseException();
if (!(x instanceof IOException) && !(x instanceof SecurityException))
x = new IOException(x);
enableAccept();
result.setFailure(x);
} finally {
// end of usage of listener socket
end();
}
// accept completed immediately but may not have executed on
// initiating thread in which case the operation may have been
// cancelled.
if (result.isCancelled()) {
closeChildChannel();
}
// invoke completion handler
Invoker.invokeIndirectly(result);
}1 . invokeIndirectly
/**
* Invoke handler with completed result. The handler is invoked indirectly,
* via the channel group's thread pool.
*/
static <V,A> void invokeIndirectly(PendingFuture<V,A> future) {
assert future.isDone();
CompletionHandler<V,? super A> handler = future.handler();
if (handler != null) {
invokeIndirectly(future.channel(),
handler,
future.attachment(),
future.value(),
future.exception());
}
}2. invokeUnchecked
/**
* Invokes the handler indirectly via the channel group's thread pool.
*/
static <V,A> void invokeIndirectly(AsynchronousChannel channel,
final CompletionHandler<V,? super A> handler,
final A attachment,
final V result,
final Throwable exc)
{
try {
((Groupable)channel).group().executeOnPooledThread(new Runnable() {
public void run() {
GroupAndInvokeCount thisGroupAndInvokeCount =
myGroupAndInvokeCount.get();
if (thisGroupAndInvokeCount != null)
thisGroupAndInvokeCount.setInvokeCount(1);
invokeUnchecked(handler, attachment, result, exc);
}
});
} catch (RejectedExecutionException ree) {
throw new ShutdownChannelGroupException();
}
} /**
* Invoke handler without checking the thread identity or number of handlers
* on the thread stack.
*/
static <V,A> void invokeUnchecked(CompletionHandler<V,? super A> handler,
A attachment,
V value,
Throwable exc)
{
if (exc == null) {
handler.completed(value, attachment);
} else {
handler.failed(exc, attachment);
}
// clear interrupt
Thread.interrupted();
}所以封装毕竟是封装啊,但是可以看到很多资源的公用啊,做的优化~
Nio学习5——对NIO.2(AIO) Reactor模式封装的拆解,布布扣,bubuko.com
Nio学习5——对NIO.2(AIO) Reactor模式封装的拆解
标签:java asynchronous socket thread 异步
原文地址:http://blog.csdn.net/working_brain/article/details/27714003