码迷,mamicode.com
首页 > Web开发 > 详细

Netty实现WebSocket

时间:2018-05-25 13:35:11      阅读:225      评论:0      收藏:0      [点我收藏+]

标签:else   factor   hand   obj   inactive   控制   header   utils   rgs   

package com.qmtt.server;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * Spring-managed lifecycle wrapper around the Netty server socket.
 *
 * <p>{@link #run()} is invoked after bean construction and binds the listening
 * socket; {@link #stop()} is invoked before bean destruction and releases the
 * channel and both event loop groups. Neither method blocks the Spring startup
 * thread waiting for the server channel to close.
 */
@Service
public class NettyServer {
    private static final Logger log = LoggerFactory.getLogger(NettyServer.class);

    /** TCP port the server listens on. */
    private static final int PORT = 7397;

    EventLoopGroup bossGroup;
    EventLoopGroup workGroup;
    Channel channel;

    /**
     * Creates the event loop groups and binds the server socket on {@code PORT}.
     *
     * <p>Failures are logged rather than propagated. When the bind fails the
     * event loop groups are shut down again so that their threads are not left
     * running without a listening socket.
     */
    @PostConstruct
    public void run() {
        log.info("启动netty");
        bossGroup = new NioEventLoopGroup();
        workGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workGroup);
            b.channel(NioServerSocketChannel.class);
            b.childHandler(new ChildChannelHandler());
            // Wait only for the bind to complete; the close future is deliberately
            // not awaited here because that would block the calling thread.
            channel = b.bind(PORT).sync().channel();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt status for whoever owns this thread.
                Thread.currentThread().interrupt();
            }
            log.error("", e);
            shutdownGroups();
        }
    }

    /**
     * Closes the server channel (if it was ever bound) and shuts down both
     * event loop groups. Safe to call when {@link #run()} failed or was never
     * called.
     */
    @PreDestroy
    public void stop() {
        log.info("关闭netty");
        try {
            if (null == channel) {
                log.error("server channel is null");
            } else {
                // Stop accepting connections and wait until the socket is closed.
                channel.close().syncUninterruptibly();
            }
        } finally {
            // Always release the event loop threads, even if closing failed.
            shutdownGroups();
            channel = null;
        }
    }

    /** Shuts down whichever event loop groups exist and clears the references. */
    private void shutdownGroups() {
        if (bossGroup != null) {
            bossGroup.shutdownGracefully();
            bossGroup = null;
        }
        if (workGroup != null) {
            workGroup.shutdownGracefully();
            workGroup = null;
        }
    }
}
package com.qmtt.server;

import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;

import com.qmtt.tools.JsonUtils;
import com.qmtt.tools.SpringUtil;
import com.qmtt.websocket.GameFunction2;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.util.CharsetUtil;

/**
 * Channel handler that upgrades incoming HTTP requests to WebSocket and
 * dispatches the JSON text frames that follow to {@link GameFunction2}.
 *
 * <p>A new instance is expected per connection (it keeps the per-connection
 * {@code handshaker}). Connections are registered in a static map keyed by the
 * {@code openid} field of the client's messages so that other components can
 * push messages through the static {@code sendMsg} methods.
 */
public class MyWebSocket2 extends SimpleChannelInboundHandler<Object> {
    private static final Logger log = LoggerFactory.getLogger(MyWebSocket2.class);

    /** Handshaker created from this connection's upgrade request; reused to answer close frames. */
    private WebSocketServerHandshaker handshaker;

    /**
     * Registered connections keyed by openid. {@link Hashtable} synchronizes
     * individual calls; iteration must additionally hold the map's monitor.
     */
    private static final Map<String, ChannelHandlerContext> webSocketMap = new Hashtable<String, ChannelHandlerContext>();

    public GameFunction2 gameFunction;

    RedisTemplate redisTemplate;

    /**
     * Looks up the collaborating Spring beans when the connection opens.
     *
     * @param ctx context of the newly active channel
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("客户端与服务端连接开启");
        gameFunction = SpringUtil.getBean(GameFunction2.class);
        redisTemplate = (RedisTemplate) SpringUtil.getBean("redisTemplate");
    }

    /**
     * Unregisters the openid bound to the closing connection and notifies the
     * game logic.
     *
     * <p>Only an entry whose value equals {@code ctx} is removed. If this
     * connection was never registered, or its openid has since been re-bound to
     * another connection, nothing is removed and the game logic is not notified.
     *
     * @param ctx context of the channel that became inactive
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("客户端与服务端连接关闭");
        String key = null;
        // Hold the map's monitor so concurrent put/remove calls from other event
        // loop threads cannot invalidate the iterator.
        synchronized (webSocketMap) {
            Iterator<Entry<String, ChannelHandlerContext>> iterator = webSocketMap.entrySet().iterator();
            while (iterator.hasNext()) {
                Entry<String, ChannelHandlerContext> entry = iterator.next();
                if (entry.getValue().equals(ctx)) {
                    key = entry.getKey();
                    iterator.remove();
                    break;
                }
            }
        }
        log.info("<{}>断开连接", key);
        if (key != null) {
            gameFunction.close(key);
        }
    }

    /**
     * Routes an inbound message: full HTTP requests go to the handshake path,
     * WebSocket frames go to the frame handler; anything else is ignored.
     */
    @Override
    protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof FullHttpRequest) {
            handleHttpRequest(ctx, ((FullHttpRequest) msg));
        } else if (msg instanceof WebSocketFrame) {
            handlerWebSocketFrame(ctx, (WebSocketFrame) msg);
        }
    }

    /** Flushes everything written while handling the current read batch. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    /**
     * Handles a single WebSocket frame. Close and ping frames are answered at
     * protocol level; text frames containing {@code msgType} are parsed as JSON
     * and dispatched by message type. Non-text frames are logged and dropped.
     *
     * @param ctx   context of the channel the frame arrived on
     * @param frame the received frame
     */
    private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
        // Close request: complete the closing handshake.
        if (frame instanceof CloseWebSocketFrame) {
            log.info("连接开闭");
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            return;
        }
        // Ping: reply with a pong carrying the same payload.
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        // Only text messages are supported; binary messages are not.
        if (!(frame instanceof TextWebSocketFrame)) {
            log.info("不支持二进制消息");
            return;
        }
        String message = ((TextWebSocketFrame) frame).text();
        if (!message.contains("msgType")) {
            return;
        }
        log.info("服务端收到消息:{}", message);
        try {
            Map map = JsonUtils.json2map(message);
            // A missing msgType/openid raises a NullPointerException here, which
            // is logged by the catch block below and the message is dropped.
            String msgTpye = map.get("msgType").toString();
            String openid = map.get("openid").toString();
            // Start a game.
            if (msgTpye.equals("start")) {
                webSocketMap.put(openid, ctx);
                gameFunction.joinGame(openid);
                return;
            }
            // Answer a question.
            if (msgTpye.equals("answer")) {
                gameFunction.answer(map);
                return;
            }
            // Game over.
            if (msgTpye.equals("gameover")) {
                gameFunction.gameover(map);
                return;
            }
            // Invitation sent, waiting for an opponent.
            if (msgTpye.equals("wait")) {
                webSocketMap.put(openid, ctx);
                gameFunction.waitEnter(openid);
                return;
            }
            // Invited opponent enters.
            if (msgTpye.equals("waitEnter")) {
                webSocketMap.put(openid, ctx);
                String inviteOpenid = (String) map.get("inviteOpenid");
                // Must check whether the user is already playing or has already left.
                gameFunction.checkUserStatus(openid, inviteOpenid);
                return;
            }
            // Start of an invited game.
            if (msgTpye.equals("waitStart")) {
                gameFunction.waitStart(openid);
                return;
            }
            // Play another round.
            if (msgTpye.equals("playAgain")) {
                gameFunction.playAgain(openid);
                return;
            }
        } catch (Exception e) {
            log.error("", e);
        }
    }

    /**
     * Performs the WebSocket opening handshake for an HTTP request, or answers
     * 400 Bad Request when the request failed to decode or is not a WebSocket
     * upgrade.
     *
     * @param ctx context of the channel the request arrived on
     * @param req the aggregated HTTP request
     */
    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
        if (!req.getDecoderResult().isSuccess() || !isWebSocketUpgrade(req)) {
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return;
        }
        // Note from the original author: do not be misled by the URL argument;
        // what is passed here does not matter, since reception of WS messages is
        // not controlled by it. Dispatching can be done on the URI of the request.
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("", null, false);
        handshaker = wsFactory.newHandshaker(req);
        if (handshaker == null) {
            WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
        } else {
            handshaker.handshake(ctx.channel(), req);
        }
    }

    /**
     * Tells whether the request carries {@code Upgrade: websocket}. The value is
     * compared case-insensitively, as RFC 6455 requires.
     */
    private static boolean isWebSocketUpgrade(FullHttpRequest req) {
        Object upgrade = req.headers().get("Upgrade");
        return upgrade != null && "websocket".equalsIgnoreCase(upgrade.toString());
    }

    /**
     * Writes an HTTP response to the client. For non-200 responses the status
     * line is used as the body. The connection is closed after the write unless
     * the request is keep-alive and the status is 200.
     */
    private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) {
        if (res.getStatus().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            buf.release();
        }
        ChannelFuture f = ctx.channel().writeAndFlush(res);
        if (!isKeepAlive(req) || res.getStatus().code() != 200) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }

    /** Keep-alive is never honoured: every plain HTTP response closes the connection. */
    private static boolean isKeepAlive(FullHttpRequest req) {
        return false;
    }

    /** Logs the failure through the class logger and closes the connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("websocket handler error, closing channel", cause);
        ctx.close();
    }

    /**
     * Returns the live registry of connections keyed by openid.
     *
     * @return the shared, mutable map (not a copy)
     */
    public static Map<String, ChannelHandlerContext> getWebSocketMap() {
        return webSocketMap;
    }

    /**
     * Serializes {@code msg} to JSON and sends it as a text frame to the
     * connection registered under {@code id}.
     *
     * @param id  openid the target connection was registered with
     * @param msg object to serialize
     * @return 1 if a connection was found and the write was issued, 0 if no
     *         connection is registered or an exception occurred
     */
    public static int sendMsg(String id, Object msg) {
        try {
            String ret = JsonUtils.toJsonStringIgnoreNull(msg);
            ChannelHandlerContext socket = MyWebSocket2.getWebSocketMap().get(id);
            if (socket != null) {
                log.info("给<{}>发送消息:{}", id, ret);
                socket.writeAndFlush(new TextWebSocketFrame(ret));
                return 1;
            } else {
                log.info("<{}>连接不存在,不处理", id);
            }
        } catch (Exception ex) {
            log.error("", ex);
        }
        return 0;
    }

    /**
     * Sends {@code msg} verbatim as a text frame to the connection registered
     * under {@code id}.
     *
     * @param id  openid the target connection was registered with
     * @param msg text payload
     * @return 1 if a connection was found and the write was issued, 0 if no
     *         connection is registered or an exception occurred
     */
    public static int sendMsg(String id, String msg) {
        try {
            ChannelHandlerContext socket = MyWebSocket2.getWebSocketMap().get(id);
            if (socket != null) {
                log.info("给<{}>发送消息:{}", id, msg);
                socket.writeAndFlush(new TextWebSocketFrame(msg));
                return 1;
            } else {
                log.info("连接不存在,不处理");
            }
        } catch (Exception ex) {
            log.error("", ex);
        }
        return 0;
    }
}
package com.qmtt.server;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;

/**
 * Builds the handler pipeline for every accepted connection: HTTP codec,
 * HTTP message aggregation, chunked-write support and finally the WebSocket
 * business handler.
 */
public class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
    /** Maximum content length passed to the HTTP aggregator. */
    private static final int MAX_HTTP_CONTENT_LENGTH = 65536;

    /**
     * Installs the four handlers, in order, on the new channel's pipeline.
     *
     * @param socketChannel the freshly accepted client channel
     */
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        socketChannel.pipeline()
                .addLast("http-codec", new HttpServerCodec())
                .addLast("aggregator", new HttpObjectAggregator(MAX_HTTP_CONTENT_LENGTH))
                .addLast("http-chunked", new ChunkedWriteHandler())
                // One handler instance per connection.
                .addLast("handler", new MyWebSocket2());
    }
}

此代码为“诗词荣耀”的 WebSocket 实现,解决了基于 Tomcat 实现的 WebSocket 连接不稳定的问题。

Netty实现WebSocket

标签:else   factor   hand   obj   inactive   控制   header   utils   rgs   

原文地址:https://www.cnblogs.com/wujf/p/9087394.html

(0)
(0)
   
举报
评论 一句话评论(0)
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!