标签:
最近在总结Dubbo关于Netty通信方面的实现,于是也就借此机会深入体会了一下Netty。一般启动Netty的Server端时都会设置两个ExecutorService对象,我们都习惯用boss,worker两个变量来引用这两个对象,于是从我一开始接触Netty就有了boss和worker的概念。这篇博客将对boss和worker进行介绍,但并不是涉及Netty其他部分介绍。
在Netty的里面有一个Boss,他开了一家公司(开启一个服务端口)对外提供业务服务,它手下有一群做事情的workers。Boss一直对外宣传自己公司提供的业务,并且接受(accept)有需要的客户(client),当一位客户找到Boss说需要他公司提供的业务,Boss便会为这位客户安排一个worker,这个worker全程为这位客户服务(read/write)。如果公司业务繁忙,一个worker可能会为多个客户进行服务。这就是Netty里面Boss和worker之间的关系。下面看看Netty是如何让Boss和Worker进行协助的。
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
bootstrap = new ServerBootstrap(channelFactory);
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
channels = nettyHandler.getChannels();
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
// bind
channel = bootstrap.bind(getBindAddress());
}
上面这段代码是Dubbo用来开启服务的,也是大部分使用Netty进行服务端开发常用的方式启动服务端。首先是设置boss和worker的线程池,以能够让它们在各自的线程池里面异步执行。当调用bootstrap.bind(getBindAddress())
的时候最终受理绑定操作的是NioServerSocketPipelineSink
的eventSunk
方法,看类名和方法签名就应该知道是处理IO事件的。方法eventSunk
实现如下:
public void eventSunk(
ChannelPipeline pipeline, ChannelEvent e) throws Exception {
Channel channel = e.getChannel();
if (channel instanceof NioServerSocketChannel) {
handleServerSocket(e);
} else if (channel instanceof NioSocketChannel) {
handleAcceptedSocket(e);
}
}
由于这个时候Server还处于bind阶段,所以channel肯定不是NioSocketChannel
,于是就到了方法handleServerSocket
里面,最后将会调用bind
方法来绑定某个端口启动服务。下面是bind
方法实现:
private void bind(
NioServerSocketChannel channel, ChannelFuture future,
SocketAddress localAddress) {
boolean bound = false;
boolean bossStarted = false;
try {
channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
bound = true;
future.setSuccess();
fireChannelBound(channel, channel.getLocalAddress());
Executor bossExecutor =
((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
DeadLockProofWorker.start(
bossExecutor,
new ThreadRenamingRunnable(
new Boss(channel),
"New I/O server boss #" + id + " (" + channel + ‘)‘));
bossStarted = true;
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
} finally {
if (!bossStarted && bound) {
close(channel, future);
}
}
}
可以看到socket的绑定以及设置异步的future成功,已通知服务启动成功,同时将绑定成功事件通知出去。接下来我看的重点来了,就是bossExecutor,可以看到它是通过NioServerSocketChannelFactory
里面去获取的,NioServerSocketChannelFactory
里面的boss就是之前我们设置进去的,可以确定我们之前设置boss的异步线程池是在这里被使用了。紧接下来的是启动我们的异步线程池,到这里进入了Boss该做的事情,Boss其实是实现了Runnable接口,从而可以交给boss的线程池运行,接下来的关注点就是Boss的run
方法,这里才是Boss做事情的地方。再此之前先看看Boss初始化做了什么事情:
Boss(NioServerSocketChannel channel) throws IOException {
this.channel = channel;
selector = Selector.open();
boolean registered = false;
try {
channel.socket.register(selector, SelectionKey.OP_ACCEPT);
registered = true;
} finally {
if (!registered) {
closeSelector();
}
}
channel.selector = selector;
}
Boss初始化过程中其实就是将serversocket注册到一个selector里面,从而可以实现NIO的异步IO处理。
public void run() {
final Thread currentThread = Thread.currentThread();
channel.shutdownLock.lock();
try {
for (;;) {
try {
if (selector.select(1000) > 0) {
selector.selectedKeys().clear();
}
SocketChannel acceptedSocket = channel.socket.accept();
if (acceptedSocket != null) {
registerAcceptedChannel(acceptedSocket, currentThread);
}
} catch (SocketTimeoutException e) {
// Thrown every second to get ClosedChannelException
// raised.
} catch (CancelledKeyException e) {
// Raised by accept() when the server socket was closed.
} catch (ClosedSelectorException e) {
// Raised by accept() when the server socket was closed.
} catch (ClosedChannelException e) {
// Closed as requested.
break;
} catch (Throwable e) {
logger.warn(
"Failed to accept a connection.", e);
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
// Ignore
}
}
}
} finally {
channel.shutdownLock.unlock();
closeSelector();
}
}
在run
方法里面是一个死循环,里面在不间断的等待客户端的连接,如果有客户端的连接,那么将会调用方法registerAcceptedChannel
进行后续的处理。
private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {
try {
ChannelPipeline pipeline =
channel.getConfig().getPipelineFactory().getPipeline();
NioWorker worker = nextWorker();
worker.register(new NioAcceptedSocketChannel(
channel.getFactory(), pipeline, channel,
NioServerSocketPipelineSink.this, acceptedSocket,
worker, currentThread), null);
} catch (Exception e) {
logger.warn(
"Failed to initialize an accepted socket.", e);
try {
acceptedSocket.close();
} catch (IOException e2) {
logger.warn(
"Failed to close a partially accepted socket.",
e2);
}
}
}
方法registerAcceptedChannel
就是将客户端的channle分配给一个worker,而这个worker是通过方法nextWorker
获取
NioWorker nextWorker() {
return workers[Math.abs(
workerIndex.getAndIncrement() % workers.length)];
}
可以看到方法nextWorker
是一个让worker里面的客户端channel保持平衡的作用,可能你会疑问这个workers是哪里来的,其实是在上面初始化NioServerSocketChannelFactory
的时候,NioServerSocketChannelFactory
再去初始化NioServerSocketPipelineSink
时候构造出来的,默认情况下workers的数量是我们初始化NioServerSocketChannelFactory
设置进去的。可以看到是调用worker的register
方法将客户端的channel注册到worker里面的。
void register(NioSocketChannel channel, ChannelFuture future) {
boolean server = !(channel instanceof NioClientSocketChannel);
Runnable registerTask = new RegisterTask(channel, future, server);
Selector selector;
synchronized (startStopLock) {
if (!started) {
.....
this.selector = selector = Selector.open();
.....
DeadLockProofWorker.start(
executor, new ThreadRenamingRunnable(this, threadName));
success = true;
.....
} else {
selector = this.selector;
}
assert selector != null && selector.isOpen();
started = true;
boolean offered = registerTaskQueue.offer(registerTask);
assert offered;
}
if (wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}
上面对worker有一个started状态的检测,如果没启动,则启动worker,这个额一般都是将第一个客户端的channel注册到worker里面才进行的。由于worker也是实现了Rannable接口,所以启动的主要工作就是让worker在某个线程里面跑起来,并且为这个worker分配一个selector,用来进行监控IO事件。下面便是这个过程实现:
DeadLockProofWorker.start(
executor, new ThreadRenamingRunnable(this, threadName));
success = true;
其中的executor
便是我们一开始设置的workerExecutor。
worker启动成功之后,接下来要做的便是让worker管理器客户端的channel
Runnable registerTask = new RegisterTask(channel, future, server);
.......
boolean offered = registerTaskQueue.offer(registerTask);
assert offered;
worker是将客户端包装成一个RegisterTask,然后放入队列,可见RegisterTask也实现了Runnable接口。那放入队列以后谁去取这个队列里面的数据呢?当然,肯定是worker去取。上面介绍启动worker的时候是让worker在某个线程里面跑起来,并且worker是实现了Rannable方法,于是运行worker的线程肯定是调用worker的run方法。
public void run() {
thread = Thread.currentThread();
boolean shutdown = false;
Selector selector = this.selector;
for (;;) {
.....
try {
SelectorUtil.select(selector);
.....
cancelledKeys = 0;
processRegisterTaskQueue();
processWriteTaskQueue();
processSelectedKeys(selector.selectedKeys());
.....
} catch (Throwable t) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
}
}
可以看到run
方法里面也是一个死循环,在不断的轮询调用selector的select IO的事件。接下来会调用三个方法processRegisterTaskQueue
,processWriteTaskQueue
和processSelectedKeys
。通过方法签名就应该知道这个三个方法具体是做什么事情的,第一个是处理上面registerTaskQueue
的,并且queue里面对象的run
方法,而第二个processWriteTaskQueue
是处理写任务的,而processSelectedKeys
是处理selector匹配的IO事件。我们先看看registerTaskQueue
是做了什么?
private void processRegisterTaskQueue() throws IOException {
for (;;) {
final Runnable task = registerTaskQueue.poll();
if (task == null) {
break;
}
task.run();
cleanUpCancelledKeys();
}
}
上面介绍过registerTaskQueue
里面的元素是RegisterTask
。所以需要去看看RegisterTask
的run方法实现,其中RegisterTask
是NioWorker
里面的内部类,所以RegisterTask
是可以访问NioWorker
的元素信息。
public void run() {
SocketAddress localAddress = channel.getLocalAddress();
SocketAddress remoteAddress = channel.getRemoteAddress();
if (localAddress == null || remoteAddress == null) {
if (future != null) {
future.setFailure(new ClosedChannelException());
}
close(channel, succeededFuture(channel));
return;
}
try {
if (server) {
channel.socket.configureBlocking(false);
}
synchronized (channel.interestOpsLock) {
channel.socket.register(
selector, channel.getRawInterestOps(), channel);
}
if (future != null) {
channel.setConnected();
future.setSuccess();
}
} catch (IOException e) {
if (future != null) {
future.setFailure(e);
}
close(channel, succeededFuture(channel));
....
}
if (!server) {
if (!((NioClientSocketChannel) channel).boundManually) {
fireChannelBound(channel, localAddress);
}
fireChannelConnected(channel, remoteAddress);
}
}
可以看到这里面主要做的事情是将Boss分配给worker的客户端channel和worker的selector关联上,从而worker可以处理该客户端channel的IO事件。
到这里就完成了由Boss接收到一个客户端连接,到分配给某个worker,以及worker是怎么去和客户端的channel关联的,其中由于worker有可能为多个客户端channel服务,所以worker并不会直接和某个channel产生引用,而是将客户端的channel注册在该worker的selector上面,worker的run方法里面通过不断对selector的select轮询,以达到对channel进行处理。接下来看看worker怎么处理selector的io事件的
<!--java:lang-->
private void processSelectedKeys(Set<SelectionKey> selectedKeys) throws IOException {
for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
SelectionKey k = i.next();
i.remove();
try {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
if (!read(k)) {
// Connection already closed - no need to handle write.
continue;
}
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
writeFromSelectorLoop(k);
}
} catch (CancelledKeyException e) {
close(k);
}
if (cleanUpCancelledKeys()) {
break; // break the loop to avoid ConcurrentModificationException
}
}
}
上面的方法完成的是处理selector产生的io事件,其中如果当前IO时间是读,那么将SelectionKey中的channel流进行读出,并且向上交给Netty的Handler。如果是当前某个channel的写满足条件,则触发writeFromSelectorLoop
查看是否有待写出的内容。
对于写数据Netty在worker提供了三种入口
void writeFromUserCode(final NioSocketChannel channel) {
if (!channel.isConnected()) {
cleanUpWriteBuffer(channel);
return;
}
if (scheduleWriteIfNecessary(channel)) {
return;
}
if (channel.writeSuspended) {
return;
}
if (channel.inWriteNowLoop) {
return;
}
write0(channel);
}
void writeFromTaskLoop(final NioSocketChannel ch) {
if (!ch.writeSuspended) {
write0(ch);
}
}
void writeFromSelectorLoop(final SelectionKey k) {
NioSocketChannel ch = (NioSocketChannel) k.attachment();
ch.writeSuspended = false;
write0(ch);
}
其中writeFromUserCode
是提供外部直接写出的,writeFromTaskLoop
是在worker的run方法调用processWriteTaskQueue
时候会触发。
标签:
原文地址:http://my.oschina.net/bieber/blog/406799