标签:ace connect channel str 定义 toc extend oid oop
package test; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * netty服务器启动类 * @author songyan * */ public class HttpProxyServer { public static void main(String[] args) throws Exception { int LOCAL_PORT = (args.length > 0) ? Integer.parseInt(args[0]) : 5688;// 代理的端口号 System.out.println("Proxying on port " + LOCAL_PORT); // 主从线程组模型 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // 创建核心类 ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) // 添加助手类 .childHandler(new ServerInitialzer()).bind(LOCAL_PORT).sync().channel().closeFuture().sync(); } finally { // 关闭主从线程 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
package test; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.stream.ChunkedWriteHandler; /** * * @author songyan * 通用的初始化类 */ public class ServerInitialzer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //netty是基于http的,所以要添加http编码器 pipeline.addLast(new HttpServerCodec()); //对写大数据流的支持 pipeline.addLast(new ChunkedWriteHandler()); //设置单次请求的文件大小上限 pipeline.addLast(new HttpObjectAggregator(1024*1024*10)); //websocket 服务器处理的协议,用于指定给客户端连接访问的路由 : /ws pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); //自定义的路由 pipeline.addLast(new HttpHandler()); } }
package test; import java.time.LocalDateTime; import java.util.regex.Matcher; import java.util.regex.Pattern; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.util.concurrent.GlobalEventExecutor; /** * 自定义的路由 既可以实现http又可以实现socket * * @author songyan * */ public class HttpHandler extends SimpleChannelInboundHandler<Object> { // 用于记录和管理所有客户端的channle private Channel outboundChannel; private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); /** * 打开链接 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("websocket::::::::::: active"); super.channelActive(ctx); } /** * 获取客户端的channle,添加到ChannelGroup中 */ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { System.out.println("websocket::::::::::: add"); clients.add(ctx.channel()); } /** * 从ChannelGroup中移除channel */ @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { System.out.println("websocket::::::::::: Removed"); } /** * 销毁channel */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("websocket::::::::::: destroyed"); if (clients != null) { closeOnFlush(outboundChannel); } } /** * 关闭释放channel * @param ch */ static void closeOnFlush(Channel ch) { if (ch != null && ch.isActive()) { ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } /** * 异常捕获 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { System.err.println("出错了"); cause.printStackTrace(); ctx.close(); } /** * 路由 * 对http,websocket单独处理 */ @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof FullHttpRequest) {// 如果是HTTP请求,进行HTTP操作 handleHttpRequest(ctx, (FullHttpRequest) msg); } else if (msg instanceof WebSocketFrame) {// 如果是Websocket请求,则进行websocket操作 handleWebSocketFrame(ctx, (WebSocketFrame) msg); } } /** * 对http请求的处理 */ private void handleHttpRequest(ChannelHandlerContext ctx, final FullHttpRequest msg) { final Channel inboundChannel = ctx.channel(); String host = msg.headers().get("Host"); int port = 80; String pattern = "(http://|https://)?([^:]+)(:[\\d]+)?"; Pattern r = Pattern.compile(pattern); Matcher m = r.matcher(host); if (m.find()) { host = m.group(2); port = (m.group(3) == null) ? 80 : Integer.parseInt(m.group(3).substring(1)); } Bootstrap b = new Bootstrap(); b.group(inboundChannel.eventLoop()) // use inboundChannel thread .channel(ctx.channel().getClass()).handler(new BackendHandlerInitializer(inboundChannel)); ChannelFuture f = b.connect("127.0.0.1", 8015); outboundChannel = f.channel(); msg.retain(); ChannelFuture channelFuture = f.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { outboundChannel.writeAndFlush(msg); } else { inboundChannel.close(); } } }); } /** * 对socket请求的处理 */ private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame msg) { // 获取客户端传输过来的消息 String content = msg.toString(); System.out.println("websocket::: 接受到的数据:" + content); clients.writeAndFlush(new TextWebSocketFrame("[服务器在]" + LocalDateTime.now() + "接受到消息, 消息为:" + content)); } }
标签:ace connect channel str 定义 toc extend oid oop
原文地址:https://www.cnblogs.com/excellencesy/p/11241063.html