标签:roc art ring into 关闭 margin shu bootstra key
本篇文章是对《Netty In Action》一书第十二章"WebSocket"的学习摘记,主要内容为开发一个基于广播的WEB聊天室
请求的 URL 以/ws 结尾时,通过升级握手的机制把该协议升级为 WebSocket,之后客户端发送一个消息,这个消息会被广播到所有其它连接的客户端
当有新的客户端连入时,其它客户端也能得到通知
首先实现该处理 HTTP 请求的组件,当请求的url没有指定的WebSocket连接的后缀时(如后缀/ws),这个组件将提供聊天室网页页面的http response响应
public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
private final String wsUri;
private static final File INDEX;
static {
URL location = HttpRequestHandler.class
.getProtectionDomain()
.getCodeSource().getLocation();
try {
String path = location.toURI() + "index.html";
path = !path.contains("file:") ? path : path.substring(6);
INDEX = new File(path);
} catch (URISyntaxException e) {
throw new IllegalStateException("Unable to locate index.html", e);
}
}
public HttpRequestHandler(String wsUri) {
this.wsUri = wsUri;
}
@Override
public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request)
throws Exception {
//(1) 如果请求了 WebSocket 协议升级,则增加引用计数(调用 retain()方法)
// 并将它传递给下一个 ChannelInboundHandler
if (wsUri.equalsIgnoreCase(request.uri())) {
ctx.fireChannelRead(request.retain());
} else {
//(2) 处理 100 Continue 请求以符合 HTTP 1.1 规范
if (HttpUtil.is100ContinueExpected(request)) {
send100Continue(ctx);
}
//读取 index.html
RandomAccessFile file = new RandomAccessFile(INDEX, "r");
HttpResponse response = new DefaultHttpResponse(
request.protocolVersion(), HttpResponseStatus.OK);
response.headers().set(
HttpHeaderNames.CONTENT_TYPE,"text/html; charset=UTF-8");
boolean keepAlive = HttpUtil.isKeepAlive(request);
//如果请求了keep-alive,则添加所需要的 HTTP 头信息
if (keepAlive) {
response.headers().set(
HttpHeaderNames.CONTENT_LENGTH,
file.length());
response.headers().set(
HttpHeaderNames.CONNECTION,
HttpHeaderValues.KEEP_ALIVE);
}
//(3) 将 HttpResponse 写到客户端
// 只是设置了响应的头信息,因此没有进行flush冲刷
ctx.write(response);
//(4) 将 index.html 写到客户端,也不进行flush冲刷
if (ctx.pipeline().get(SslHandler.class) == null) {
// 如果不需要进行加密,则利用零拷贝特性达到最佳效率
ctx.write(new DefaultFileRegion(
file.getChannel(), 0, file.length()));
} else {
ctx.write(new ChunkedNioFile(
file.getChannel()));
}
//(5) 写 LastHttpContent 标记响应的结束并冲刷至客户端
ChannelFuture future = ctx.writeAndFlush(
LastHttpContent.EMPTY_LAST_CONTENT);
//(6)如果没有请求keep-alive,则在写操作完成后关闭 Channel
if (!keepAlive) {
future.addListener(ChannelFutureListener.CLOSE);
}
}
}
private static void send100Continue(ChannelHandlerContext ctx) {
FullHttpResponse response = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
ctx.writeAndFlush(response);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
ctx.close();
}
}
由 IETF 发布的 WebSocket RFC,定义了 6 种帧,Netty 为它们每种都提供了一个 POJO 实现
TextWebSocketFrame 是我们唯一真正需要处理的帧类型,下面展示了处理代码
public class TextWebSocketFrameHandler
extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private final ChannelGroup group;
public TextWebSocketFrameHandler(ChannelGroup group) {
this.group = group;
}
//重写 userEventTriggered()方法以处理自定义事件
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
//如果该事件表示握手成功,则从该 ChannelPipeline 中移除HttpRequest-Handler
//因为将不会接收到任何HTTP消息了
if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
ctx.pipeline().remove(HttpRequestHandler.class);
//(1) 通知所有已经连接的WebSocket 客户端新的客户端已经连接上了
group.writeAndFlush(new TextWebSocketFrame("Client " + ctx.channel() + " joined"));
//(2) 将新的 WebSocket Channel 添加到 ChannelGroup 中,以便它可以接收到所有的消息
group.add(ctx.channel());
System.out.println("a new channel added to group");
} else {
super.userEventTriggered(ctx, evt);
}
}
@Override
public void channelRead0(ChannelHandlerContext ctx, final TextWebSocketFrame msg)
throws Exception {
//(3) 增加消息的引用计数,并将它写到 ChannelGroup中所有已经连接的客户端
group.writeAndFlush(msg.retain());
}
}
To know once a handshake was done you can intercept the ChannelInboundHandler.userEventTriggered(ChannelHandlerContext, Object) and check if the event was instance of WebSocketServerProtocolHandler.HandshakeComplete, the event will contain extra information about the handshake such as the request and selected subprotocol.
A thread-safe Set that contains open Channels and provides various bulk operations on them. Using ChannelGroup, you can categorize Channels into a meaningful group (e.g. on a per-service or per-state basis.) A closed Channel is automatically removed from the collection, so that you don‘t need to worry about the life cycle of the added Channel. A Channel can belong to more than one ChannelGroup.
将所有需要的ChannelHandler添加到ChannelPipeline
public class ChatServerInitializer extends ChannelInitializer<Channel> {
private final ChannelGroup group;
public ChatServerInitializer(ChannelGroup group) {
this.group = group;
}
@Override
//将所有需要的ChannelHandler 添加到 ChannelPipeline 中
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new HttpObjectAggregator(64 * 1024));
pipeline.addLast(new HttpRequestHandler("/ws"));
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
pipeline.addLast(new TextWebSocketFrameHandler(group));
}
}
各个ChannelHandler的作用如下
这里有一个我们从未接触过的ChannelHandler —— WebSocketServerProtocolHandler,它能够帮我们处理如"升级握手",以及Close、Ping、Pong三种控制帧等繁重的工作,Text和Binary两种数据帧会被发送到下一个ChannelHandler,能够方便我们将工作重点落在实际的数据处理上
This handler does all the heavy lifting for you to run a websocket server. It takes care of websocket handshaking as well as processing of control frames (Close, Ping, Pong). Text and Binary data frames are passed to the next handler in the pipeline (implemented by you) for processing.
WebSocket 协议升级之前的 ChannelPipeline 的状态如图所示。这代表了刚刚被 ChatServerInitializer初始化之后的ChannelPipeline
当 WebSocket 协议升级完成之后,WebSocketServerProtocolHandler 将会把 Http- RequestDecoder 替换为 WebSocketFrameDecoder,把 HttpResponseEncoder 替换为 WebSocketFrameEncoder。为了性能最大化,我们移除了不再被 WebSocket 连接所需要的 HttpRequestHandler
将各组件组合到一起
public class ChatServer {
//创建 DefaultChannelGroup,其将保存所有已经连接的 WebSocket Channel
private final ChannelGroup channelGroup =
new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
private final EventLoopGroup bossGroup = new NioEventLoopGroup();
private final EventLoopGroup workGroup = new NioEventLoopGroup();
private Channel channel;
public ChannelFuture start(InetSocketAddress address) {
//引导服务器
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(createInitializer(channelGroup));
ChannelFuture future = bootstrap.bind(address);
future.syncUninterruptibly();
channel = future.channel();
return future;
}
//创建 ChatServerInitializer
protected ChannelInitializer<Channel> createInitializer(ChannelGroup group) {
return new ChatServerInitializer(group);
}
//处理服务器关闭,并释放所有的资源
public void destroy() {
if (channel != null) {
channel.close();
}
channelGroup.close();
bossGroup.shutdownGracefully();
}
public static void main(String[] args) throws Exception {
if (args.length != 1) {
System.err.println("Please give port as argument");
System.exit(1);
}
int port = Integer.parseInt(args[0]);
final ChatServer endpoint = new ChatServer();
ChannelFuture future = endpoint.start(new InetSocketAddress(port));
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
endpoint.destroy();
}
});
future.channel().closeFuture().syncUninterruptibly();
}
}
当请求的URI不是以/ws结尾时,返回index.html页面内容,可见页面内容的长度为3985字节
下面是聊天功能的展示
我们需要将SslHandler添加到ChannelPipeline的首部
public class SecureChatServerInitializer extends ChatServerInitializer {
private final SslContext context;
public SecureChatServerInitializer(ChannelGroup group, SslContext context) {
super(group);
this.context = context;
}
@Override
protected void initChannel(Channel ch) throws Exception {
//调用父类的 initChannel() 方法
super.initChannel(ch);
SSLEngine engine = context.newEngine(ch.alloc());
engine.setUseClientMode(false);
//将 SslHandler 添加到 ChannelPipeline 中
ch.pipeline().addFirst(new SslHandler(engine));
}
}
"引导"处的代码也要做相应调整
public class SecureChatServer extends ChatServer {
private final SslContext context;
public SecureChatServer(SslContext context) {
this.context = context;
}
@Override
protected ChannelInitializer<Channel> createInitializer(ChannelGroup group) {
//返回之前创建的 SecureChatServerInitializer 以启用加密
return new SecureChatServerInitializer(group, context);
}
public static void main(String[] args) throws Exception {
if (args.length != 1) {
System.err.println("Please give port as argument");
System.exit(1);
}
int port = Integer.parseInt(args[0]);
SelfSignedCertificate cert = new SelfSignedCertificate();
SslContext context = SslContextBuilder.forServer(
cert.certificate(), cert.privateKey()).build();
final SecureChatServer endpoint = new SecureChatServer(context);
ChannelFuture future = endpoint.start(new InetSocketAddress(port));
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
endpoint.destroy();
}
});
future.channel().closeFuture().syncUninterruptibly();
}
}
注意要用HTTPS连接进行测试
标签:roc art ring into 关闭 margin shu bootstra key
原文地址:https://www.cnblogs.com/kuluo/p/12687942.html