标签:int git star utf-8 pen 建立连接 rip init override
1.maven依赖
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.10.Final</version> </dependency>
2.springboot入口启动类
import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.web.support.SpringBootServletInitializer; import org.springframework.context.annotation.EnableAspectJAutoProxy; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.transaction.annotation.EnableTransactionManagement; import com.xxxxx.netty.NettyServer; /** * Spring Boot 应用启动类 * * Created by bysocket on 16/4/26. */ // Spring Boot 应用的标识 @SpringBootApplication // mapper 接口类扫描包配置 @EnableTransactionManagement @EnableScheduling @EnableAspectJAutoProxy(proxyTargetClass = true) //开启AspectJ代理,并将proxyTargetClass置为true,表示启用cglib对Class也进行代理 @MapperScan("com.xxxxx.dao") public class Application extends SpringBootServletInitializer { public static void main(String[] args) { // 程序启动入口 // 启动嵌入式的 Tomcat 并初始化 Spring 环境及其各 Spring 组件 SpringApplication.run(Application.class, args); try { new NettyServer(8091).start(); }catch(Exception e) { System.out.println("NettyServerError:"+e.getMessage()); } } }
3. NettyServer
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.stream.ChunkedWriteHandler; /** * NettyServer Netty服务器配置 * @author * @date */ public class NettyServer { private static Logger logger = LoggerFactory.getLogger(NettyServer.class); private final int port; public NettyServer(int port) { this.port = port; } public void start() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup group = new NioEventLoopGroup(); try { ServerBootstrap sb = new ServerBootstrap(); sb.option(ChannelOption.SO_BACKLOG, 1024); sb.group(group, bossGroup) // 绑定线程池 .channel(NioServerSocketChannel.class) // 指定使用的channel .localAddress(this.port)// 绑定监听端口 .childHandler(new ChannelInitializer<SocketChannel>() { // 绑定客户端连接时候触发操作 @Override protected void initChannel(SocketChannel ch) throws Exception { logger.info("收到新的客户端连接: {}",ch.toString()); //websocket协议本身是基于http协议的,所以这边也要使用http解编码器 ch.pipeline().addLast(new HttpServerCodec()); //以块的方式来写的处理器 ch.pipeline().addLast(new ChunkedWriteHandler()); ch.pipeline().addLast(new HttpObjectAggregator(8192)); ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65536 * 10)); ch.pipeline().addLast(new MyWebSocketHandler()); } }); ChannelFuture cf = sb.bind().sync(); // 服务器异步创建绑定 System.out.println(NettyServer.class + " 启动正在监听: " + cf.channel().localAddress()); cf.channel().closeFuture().sync(); // 关闭服务器通道 } finally { group.shutdownGracefully().sync(); // 释放线程池资源 bossGroup.shutdownGracefully().sync(); } } }
4.MyWebSocketHandler
import java.text.SimpleDateFormat; import java.util.Collection; import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.alibaba.fastjson.JSONObject; import com.github.pagehelper.Page; import com.xxxxx.service.IServiceXfzhQz; import com.xxxxx.util.SpringUtil; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.util.concurrent.GlobalEventExecutor; /** * MyWebSocketHandler * WebSocket处理器,处理websocket连接相关 * @author * @date */ public class MyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{ private static Logger logger = LoggerFactory.getLogger(MyWebSocketHandler.class); public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); //用户id=>channel示例 //可以通过用户的唯一标识保存用户的channel //这样就可以发送给指定的用户 public static ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>(); //由于@Autowired注解注入不进去,所以取巧了 static IServiceXfzhQz serviceXfzhQz; static { serviceXfzhQz = SpringUtil.getBean(IServiceXfzhQz.class); } /** * 每当服务端收到新的客户端连接时,客户端的channel存入ChannelGroup列表中,并通知列表中其他客户端channel * @param ctx * @throws Exception */ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { //获取连接的channel Channel incomming = ctx.channel(); //通知所有已经连接到服务器的客户端,有一个新的通道加入 /*for(Channel channel:channelGroup){ channel.writeAndFlush("[SERVER]-"+incomming.remoteAddress()+"加入\n"); }*/ channelGroup.add(ctx.channel()); } /** *每当服务端断开客户端连接时,客户端的channel从ChannelGroup中移除,并通知列表中其他客户端channel * @param ctx * @throws Exception */ @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { //获取连接的channel /*Channel incomming = ctx.channel(); for(Channel channel:channelGroup){ channel.writeAndFlush("[SERVER]-"+incomming.remoteAddress()+"离开\n"); }*/ //从服务端的channelGroup中移除当前离开的客户端 channelGroup.remove(ctx.channel()); //从服务端的channelMap中移除当前离开的客户端 Collection<Channel> col = channelMap.values(); while(true == col.contains(ctx.channel())) { col.remove(ctx.channel()); logger.info("netty客户端连接删除成功!"); } } /** * 服务端监听到客户端活动 * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { logger.info("netty与客户端建立连接,通道开启!"); //添加到channelGroup通道组 //channelGroup.add(ctx.channel()); } /** * 服务端监听到客户端不活动 * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { logger.info("netty与客户端断开连接,通道关闭!"); //添加到channelGroup 通道组 //channelGroup.remove(ctx.channel()); } /** * 每当从服务端读到客户端写入信息时,将信息转发给其他客户端的Channel. * @param ctx * @param msg * @throws Exception */ @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { logger.info("netty客户端收到服务器数据: {}" , msg.text()); String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()); //消息处理类 message(ctx,msg.text(),date); //channelGroup.writeAndFlush( new TextWebSocketFrame(msg.text())); } /** * 当服务端的IO 抛出异常时被调用 * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //super.exceptionCaught(ctx, cause); Channel incoming = ctx.channel(); System.out.println("SimpleChatClient:" + incoming.remoteAddress()+"异常"); //异常出现就关闭连接 cause.printStackTrace(); ctx.close(); } //消息处理类 public void message(ChannelHandlerContext ctx,String msg,String date) { try { Map<String,Object> resultmap = (Map<String,Object>)JSONObject.parse(msg); resultmap.put("CREATEDATE", date); //这里需要用户信息跟channel通道绑定 //所以每当一个客户端连接成功时,第一时间传一条登录信息 //该字段用来判断是登录绑定信息,还是发送信息 String msgtype = (String)resultmap.get("MSGTYPE"); if(msgtype.equals("DL")){//用户登录信息绑定 Channel channel = ctx.channel(); channelMap.put((String) resultmap.get("USERID"), channel); resultmap.put("success", true); resultmap.put("message", "用户链接绑定成功!"); channel.writeAndFlush(new TextWebSocketFrame(resultmap.toString())); logger.info("netty用户: {} 连接绑定成功!" , (String) resultmap.get("USERID")); }else if(msgtype.equals("DH")){//消息对话 //这里根据群组ID查询出来所有的用户信息并循环发送消息 //如果是单聊 可以直接获取channelMap中用户channel并发送 Page<Map<String, Object>> list = serviceXfzhQz.selectXfzhQzByQzId((String)resultmap.get("QZID")); for (Map<String, Object> map : list) { String userid = (String) map.get("USERID"); //判断该用户ID是否绑定通道 if(channelMap.containsKey(userid)){ Channel channel = channelMap.get(userid); channel.writeAndFlush(new TextWebSocketFrame(resultmap.toString())); } } } } catch (Exception e) { e.printStackTrace(); } } }
5.测试页面
向服务端发送消息时,数据格式是json字符串.
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1transitional.dtd"> <html xmlns="http://www.w3.org/1999/xhtml"> <head> <meta http-equiv="Content-Type" content="text/html; charset=utf-8" /> <title>Netty-Websocket</title> <script type="text/javascript"> var socket; if(!window.WebSocket){ window.WebSocket = window.MozWebSocket; } if(window.WebSocket){ socket = new WebSocket("ws://127.0.0.1:8091/ws"); socket.onmessage = function(event){ var ta = document.getElementById(‘responseText‘); ta.value += event.data+"\r\n"; }; socket.onopen = function(event){ var ta = document.getElementById(‘responseText‘); ta.value = "Netty-WebSocket服务器。。。。。。连接 \r\n"; }; socket.onclose = function(event){ var ta = document.getElementById(‘responseText‘); ta.value = "Netty-WebSocket服务器。。。。。。关闭 \r\n"; }; }else{ alert("您的浏览器不支持WebSocket协议!"); } function send(message){ if(!window.WebSocket){return;} if(socket.readyState == WebSocket.OPEN){ socket.send(message); }else{ alert("WebSocket 连接没有建立成功!"); } } </script> </head> <body> <form onSubmit="return false;"> <label>TEXT</label><input type="text" name="message" value="这里输入消息" style="width: 1024px;height: 100px;"/> <br /> <br /> <input type="button" value="发送ws消息" onClick="send(this.form.message.value)" /> <hr color="black" /> <h3>服务端返回的应答消息</h3> <textarea id="responseText" style="width: 1024px;height: 300px;"></textarea> </form> </body> </html>
springboot 集成Netty+websocket实现简单的聊天功能
标签:int git star utf-8 pen 建立连接 rip init override
原文地址:https://www.cnblogs.com/Lixiaogang/p/13045300.html