标签:des style blog http io ar os 使用 java
这篇文章主要和大家分享一下,在我们基础软件升级过程中遇到的经典Netty问题。当然,官方资料也许是一个更好的补充。另外,大家如果对Netty及其Grizzly架构以及源码有疑问的,欢迎交流。后续会为大家奉献我们基于Grizzly和Netty构建的RPC框架的压测分析,希望大家能够喜欢!
好了,言归正传~
Netty小组大概从3.3.0开始,将依赖坐标从
<dependency>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
<version>3.2.10.Final</version>
</dependency> 改成了(Netty作者离开了Jboss公司)<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
<version>3.3.0.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.23.Final</version>
</dependency> 但请注意,从4开始,Netty团队做了模块依赖的优化,像Grizzly一样,分离出很多功能独立的Package。比方说,你希望使用Netty的buffer组件,只需简单依赖这个包就好了。还是让我们来看下netty-all里面都有哪些依赖吧,如:
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netty-buffer</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netty-codec</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netty-codec-http</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netty-codec-socks</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netty-common</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netty-handler</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netty-transport</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netty-transport-rxtx</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netty-transport-sctp</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netty-transport-udt</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
<optional>true</optional>
</dependency>每个包都代表什么呢?描述如下:
| Artifact ID | Description |
|---|---|
| netty-parent | Maven parent POM |
| netty-common | Utility classes and logging facade |
| netty-buffer | ByteBuf API that replaces java.nio.ByteBuffer |
| netty-transport | Channel API and core transports |
| netty-transport-rxtx | Rxtx transport |
| netty-transport-sctp | SCTP transport |
| netty-transport-udt | UDT transport |
| netty-handler | Useful ChannelHandler implementations |
| netty-codec | Codec framework that helps write an encoder and a decoder |
| netty-codec-http | Codecs related with HTTP, Web Sockets, SPDY, and RTSP |
| netty-codec-socks | Codecs related with SOCKS protocol |
| netty-all | All-in-one JAR that combines all artifacts above |
| netty-tarball | Tarball distribution |
| netty-example | Examples |
| netty-testsuite-* | A collection of integration tests |
| netty-microbench | Microbenchmarks |
通过依赖分析,最终我选择了精简依赖,如下:
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>4.0.23.Final</version>
</dependency>
</dependencies> <dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<version>${project.version}</version>
<classifier>${os.detected.classifier}</classifier>
<scope>compile</scope>
<optional>true</optional>
</dependency>
用户可以指定自己的 EventExecutor来执行特定的 handler。通常情况下,这种EventExecutor是单线程的,当然,如果你指定了多线程的 EventExecutor或者 EventLoop,线程sticky特性会保证,除非出现 deregistration,否则其中的一个线程将一直占用。如果两个handler分别注册了不同的EventExecutor,这时就要注意线程安全问题了。
Netty4的线程模型还是有很多可以优化的地方,比方说目前Eventloop对channel的处理不均等问题,而这些问题都会在Netty 5里面优化掉,感兴趣的朋友可以参看官方Issues
先来看两幅图,第一幅图是Netty3的Channel状态模型,第二附图是Netty4优化过的模型。可以看到,channelOpen,channelBound,和channelConnected 已经被channelActive替代。channelDisconnected,channelUnbound和channelClosed 也被 channelInactive替代。
Netty 3
Netty 4
这里就产生了两个问题:
其一,channelRegistered and channelUnregistered 不等价于 channelOpen and channelClosed,它是Netty4新引入的状态为了实现Channel的dynamic registration, deregistration, and re-registration。
第二, 既然是合并,那原先针对channelOpen的方法如何迁移?简单来做,可以直接迁移到替代方法里面。
ChannelPipeline cp = Channels.pipeline();
ChannelPipeline cp = ch.pipeline();
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception
{
if (e instanceof ChannelStateEvent) {
ChannelStateEvent cse = (ChannelStateEvent) e;
switch (cse.getState()) {
case OPEN:
if (Boolean.TRUE.equals(cse.getValue())) {
// connect
channelCount.incrementAndGet();
allChannels.add(e.getChannel());
}
else {
// disconnect
channelCount.decrementAndGet();
allChannels.remove(e.getChannel());
}
break;
case BOUND:
break;
}
}
if (e instanceof UpstreamMessageEvent) {
UpstreamMessageEvent ume = (UpstreamMessageEvent) e;
if (ume.getMessage() instanceof ChannelBuffer) {
ChannelBuffer cb = (ChannelBuffer) ume.getMessage();
int readableBytes = cb.readableBytes();
// compute stats here, bytes read from remote
bytesRead.getAndAdd(readableBytes);
}
}
ctx.sendUpstream(e);
}
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception
{
if (e instanceof DownstreamMessageEvent) {
DownstreamMessageEvent dme = (DownstreamMessageEvent) e;
if (dme.getMessage() instanceof ChannelBuffer) {
ChannelBuffer cb = (ChannelBuffer) dme.getMessage();
int readableBytes = cb.readableBytes();
// compute stats here, bytes written to remote
bytesWritten.getAndAdd(readableBytes);
}
}
ctx.sendDownstream(e);
} ctx.channel().setReadable(false);//Before
ctx.channel().config().setAutoRead(false);//After 2. TCP参数优化// Before:
cfg.setOption("tcpNoDelay", true);
cfg.setOption("tcpNoDelay", 0); // Runtime ClassCastException
cfg.setOption("tcpNoDelays", true); // Typo in the option name - ignored silently
// After:
cfg.setOption(ChannelOption.TCP_NODELAY, true);
cfg.setOption(ChannelOption.TCP_NODELAY, 0); // Compile error3. 单元测试经常用到的CodecEmbedder类已经变名为EmbeddedChannel@Test
public void testMultipleLinesStrippedDelimiters() {
EmbeddedChannel ch = new EmbeddedChannel(new DelimiterBasedFrameDecoder(8192, true,
Delimiters.lineDelimiter()));
ch.writeInbound(Unpooled.copiedBuffer("TestLine\r\ng\r\n", Charset.defaultCharset()));
assertEquals("TestLine", releaseLater((ByteBuf) ch.readInbound()).toString(Charset.defaultCharset()));
assertEquals("g", releaseLater((ByteBuf) ch.readInbound()).toString(Charset.defaultCharset()));
assertNull(ch.readInbound());
ch.finish();
}4. 简化的关闭操作,以前我是这么玩stop的 if (serverChannel != null) {
log.info("stopping transport {}:{}",getName(), port);
// first stop accepting
final CountDownLatch latch = new CountDownLatch(1);
serverChannel.close().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// stop and process remaining in-flight invocations
if (def.getExecutor() instanceof ExecutorService) {
ExecutorService exe = (ExecutorService) getExecutor();
ShutdownUtil.shutdownExecutor(exe, "dispatcher");
}
latch.countDown();
}
});
latch.await();
serverChannel = null;
}
// If the channelFactory was created by us, we should also clean it up. If the
// channelFactory was passed in by Bootstrap, then it may be shared so don't clean it up.
if (channelFactory != null) {
ShutdownUtil.shutdownChannelFactory(channelFactory, bossExecutor, ioWorkerExecutor,allChannels);
}
}
public void stop() throws InterruptedException {
// Wait until the server socket is closed.
channelFuture.channel().closeFuture().syncUninterruptibly();
bossGroup.shutdownGracefully().syncUninterruptibly();
workerGroup.shutdownGracefully().syncUninterruptibly();
} cp.addLast("idleTimeoutHandler", new IdleStateHandler(getTimer(),
getClientIdleTimeout().toMillis(),
NO_WRITER_IDLE_TIMEOUT,
NO_ALL_IDLE_TIMEOUT,
TimeUnit.MILLISECONDS));
cp.addLast("heartbeatHandler", new HeartbeatHandler()); cp.addLast("idleTimeoutHandler", new IdleStateHandler(
NO_WRITER_IDLE_TIMEOUT, NO_WRITER_IDLE_TIMEOUT,
NO_ALL_IDLE_TIMEOUT, TimeUnit.MILLISECONDS));
cp.addLast("heartbeatHandler", new HeartbeatHandler());
标签:des style blog http io ar os 使用 java
原文地址:http://blog.csdn.net/fengjia10/article/details/40789807