标签:客户端程序 自己 stack keepalive remote nal local start foreach
服务端 Server
package com.oy.groupchat; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; public class Server { private int port; public Server(int port) { this.port = port; } public static void main(String[] args) throws Exception { new Server(8000).run(); } public void run() throws Exception { EventLoopGroup boss = new NioEventLoopGroup(1); EventLoopGroup work = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap .channel(NioServerSocketChannel.class) .group(boss, work) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new GroupChatServerChannelInitializer()); // 绑定端口,启动服务 ChannelFuture future = bootstrap.bind(port).sync(); System.out.println("server started and listen " + port); // 监听关闭 future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { boss.shutdownGracefully(); work.shutdownGracefully(); } } }
服务器端 Channel 初始化器 GroupChatServerChannelInitializer
package com.oy.groupchat; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; public class GroupChatServerChannelInitializer extends ChannelInitializer<SocketChannel> { protected void initChannel(SocketChannel socketChannel) throws Exception { /* 向管道加入处理器 */ ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast("decoder", new StringDecoder()); // 解码器 pipeline.addLast("encoder", new StringEncoder()); // 编码器 // 添加自定义的处理器 pipeline.addLast("GroupChatServerHandler", new GroupChatServerHandler()); } }
服务器端 自定义的 handler---GroupChatServerHandler
package com.oy.groupchat; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; import java.text.SimpleDateFormat; import java.util.Date; public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> { // 定义一个 channel 组,管理所有的 channel // GlobalEventExecutor.INSTANCE 是全局的事件执行器,单例 private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); /** * 一旦连接,第一个执行 * 将当前 channel 加入到 channelGroup */ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); // 将该客户加入聊天的信息推送给其他在线的客户端 channelGroup.writeAndFlush(sdf.format(new Date()) + " [客户端]" + channel.remoteAddress() + " 加入聊天\n"); channelGroup.add(channel); } /** * 断开连接 */ @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); // 将该客户离开聊天的信息推送给其他在线的客户端 channelGroup.writeAndFlush(sdf.format(new Date()) + " [客户端]" + channel.remoteAddress() + " 离开\n"); System.out.println("当前 channelGroup 大小:" + channelGroup.size()); } /** * 表示 channel 处于活动状态, 提示上线 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println(ctx.channel().remoteAddress() + "上线了。" + sdf.format(new Date())); } /** * 表示 channel 处于不活动状态, 提示下线 */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println(ctx.channel().remoteAddress() + "下线了。" + sdf.format(new Date())); } protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { // 获取当前 channel final Channel channel = ctx.channel(); // 遍历 channelGroup,根据不同的情况,返回不同的消息 channelGroup.forEach(ch -> { if (channel != ch) { // 不是当前 channel, 转发消息 ch.writeAndFlush("[客户端]" + channel.remoteAddress() + "发送了消息:" + msg + "\n"); } else { ch.writeAndFlush("[自己]" + channel.remoteAddress() + "发送了消息:" + msg + "\n"); } }); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
客户端 Client
package com.oy.groupchat; import com.oy.helloworld.NettyClient; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import java.util.Scanner; public class Client { private static final String HOST = "127.0.0.1"; private static final int PORT = 8000; public static void main(String[] args) { new Client().run(HOST, PORT); } public void run(String host, int port) { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap client = new Bootstrap() .group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new GroupChatClientChannelInitializer()); } }); ChannelFuture future = client.connect(host, port).sync(); System.out.println("--------------" + future.channel().localAddress() + "--------------"); Channel channel = future.channel(); // 客户端输入信息 Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String msg = scanner.nextLine(); channel.writeAndFlush(msg + "\r\n"); } future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } } }
GroupChatClientChannelInitializer
package com.oy.groupchat; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; public class GroupChatClientChannelInitializer extends ChannelInitializer<SocketChannel> { protected void initChannel(SocketChannel socketChannel) throws Exception { /* 向管道加入处理器 */ ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast("decoder", new StringDecoder()); // 解码器 pipeline.addLast("encoder", new StringEncoder()); // 编码器 // 添加自定义的处理器 pipeline.addLast("GroupChatClientHandler", new GroupChatClientHandler()); } }
GroupChatClientHandler
package com.oy.groupchat; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception { System.out.println(msg); } }
启动服务端程序 和 3 个 客户端程序
客户端发送消息
---
标签:客户端程序 自己 stack keepalive remote nal local start foreach
原文地址:https://www.cnblogs.com/xy-ouyang/p/12824998.html