标签:oar bytes dom sse instance notify 通信 pool art
转载请注明出处:http://blog.csdn.net/a906998248/article/details/52839425
前言,之前一直围绕着Http协议来开发项目,最近由于参与一个类似竞拍项目的开发,有这样一个场景,多个客户端竞拍一个商品,当一个客户端加价后,其它关注这个商品的客户端需要立即知道该商品的最新价格。
这里有个问题,Http协议是基于请求/响应的,客户端发送请求,然后服务端响应返回,客户端是主动方,服务端被动的接收客户端的请求来响应,无法解决上述场景中服务端主动将最新的数据推送给客户端的需求。
当然,有人会提出ajax轮询的方案,就是客户端不断的请求(假如1秒1次)最新竞拍价格。显然这种模式具有很明显的缺点,即浏览器需要不断地向服务器发出请求,但是Http request的Header是非常冗长的,里面包含的可用数据比例可能非常低,这会占用很多的带宽和服务器资源。
还有一种比较新颖的方案,long poll(长轮询)。利用长轮询,客户端可以打开指向服务端的Http连接,而服务器会一直保持连接打开,直到服务端数据更新再发送响应。虽然这种方式比ajax轮询有进步,但都存在一个共同问题:由于Http协议的开销,导致它们不适合用于低延迟应用。
一.WebSocket协议简介
WebSocket 是 Html5 开始提供的一种浏览器与服务器间进行全双工通信的网络技术。(全双工:同一时刻,数据可以在客户端和服务端两个方向上传输)
在 WebSocket API 中,浏览器和服务器只需要做一个握手的动作,然后浏览器和服务器之间就形成了一条快速通道,两者就可以直接互相传送数据了。
二.相比传统Http协议的优点及作用
1.Http协议的弊端:
a.Http协议为半双工协议。(半双工:同一时刻,数据只能在客户端和服务端一个方向上传输)
b.Http协议冗长且繁琐
c.易收到攻击,如长轮询
d.非持久化协议
2.WebSocket的特性:
a.单一的 TCP 连接,采用全双工模式通信
b.对代理、防火墙和路由器透明
c.无头部信息、Cookie 和身份验证
d.无安全开销
e.通过 ping/pong 帧保持链路激活
f.持久化协议,连接建立后,服务器可以主动传递消息给客户端,不再需要客户端轮询
三.聊天实例
前面提到过,WebSocket通信需要建立WebSocket连接,客户端首先要向服务端发起一个 Http 请求,这个请求和通常的 Http 请求不同,包含了一些附加头信息,其中附加信息"Upgrade:WebSocket"表明这是一个基于 Http 的 WebSocket 握手请求。如下:
- GET /chat HTTP/1.1
- Host: server.example.com
- Upgrade: websocket
- Connection: Upgrade
- Sec-WebSocket-Key: sdewgzgfewfsgergzgewrfaf==
- Sec-WebSocket-Protocol: chat, superchat
- Sec-WebSocket-Version: 13
- Origin: http://example.com
其中,Sec-WebSocket-Key是随机的,服务端会使用它加密后作为Sec-WebSocket-Accept的值返回;Sec-WebSocket-Protocol是一个用户定义的字符串,用来区分同URL下,不同的服务所需要的协议;Sec-WebSocket-Version是告诉服务器所使用的Websocket Draft(协议版本)
不出意外,服务端会返回下列信息表示握手成功,连接已经建立:
- HTTP/1.1 101 Switching Protocols
- Upgrade: websocket
- Connection: Upgrade
- Sec-WebSocket-Accept: sdgdfshgretghsdfgergtbd=
- Sec-WebSocket-Protocol: chat
到这里 WebSocket 连接已经成功建立,服务端和客户端可以正常通信了,此时服务端和客户端都是对等端点,都可以主动发送请求到另一端。
下面是前端和后端的实现过程,后端我采用了 Netty 的 API,因为最近在学 Netty,所以就采用了 Netty 中的 NIO 来构建 WebSocket 后端,我看了下网上也有用 Tomcat API 来实现,看起来也很简单,朋友们可以试试。前端使用HTML5 来构建,可以参考WebSocket接口文档,非常方便简单。
Lanucher用来启动WebSocket服务端
- import com.company.server.WebSocketServer;
-
- public class Lanucher {
-
- public static void main(String[] args) throws Exception {
-
- new WebSocketServer().run(WebSocketServer.WEBSOCKET_PORT);
- }
-
- }
使用 Netty 构建的 WebSocket 服务
- import org.apache.log4j.Logger;
-
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.Channel;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelPipeline;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import io.netty.handler.codec.http.HttpObjectAggregator;
- import io.netty.handler.codec.http.HttpServerCodec;
- import io.netty.handler.stream.ChunkedWriteHandler;
-
- public class WebSocketServer {
- private static final Logger LOG = Logger.getLogger(WebSocketServer.class);
-
-
- public static final int WEBSOCKET_PORT = 9090;
-
- public void run(int port) throws Exception {
- EventLoopGroup bossGroup = new NioEventLoopGroup();
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- try {
- ServerBootstrap b = new ServerBootstrap();
- b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<Channel>() {
-
- @Override
- protected void initChannel(Channel channel) throws Exception {
- ChannelPipeline pipeline = channel.pipeline();
- pipeline.addLast("http-codec", new HttpServerCodec());
- pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
- pipeline.addLast("http-chunked", new ChunkedWriteHandler());
- pipeline.addLast("handler", new BananaWebSocketServerHandler());
- }
- });
-
- Channel channel = b.bind(port).sync().channel();
- LOG.info("WebSocket 已经启动,端口:" + port + ".");
- channel.closeFuture().sync();
- } finally {
- bossGroup.shutdownGracefully();
- workerGroup.shutdownGracefully();
- }
- }
-
- }
WebSocket 服务端处理类,注意第一次握手是 Http 协议
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelFutureListener;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelPromise;
- import io.netty.channel.SimpleChannelInboundHandler;
- import io.netty.handler.codec.http.DefaultFullHttpResponse;
- import io.netty.handler.codec.http.FullHttpRequest;
- import io.netty.handler.codec.http.FullHttpResponse;
- import io.netty.handler.codec.http.HttpHeaders;
- import io.netty.handler.codec.http.HttpResponseStatus;
- import io.netty.handler.codec.http.HttpVersion;
- import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
- import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
- import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
- import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
- import io.netty.handler.codec.http.websocketx.WebSocketFrame;
- import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
- import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
- import io.netty.util.CharsetUtil;
-
- import org.apache.log4j.Logger;
-
- import com.company.serviceimpl.BananaService;
- import com.company.util.CODE;
- import com.company.util.Request;
- import com.company.util.Response;
- import com.google.common.base.Strings;
- import com.google.gson.JsonSyntaxException;
-
-
- public class BananaWebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
- private static final Logger LOG = Logger.getLogger(BananaWebSocketServerHandler.class.getName());
-
- private WebSocketServerHandshaker handshaker;
- private ChannelHandlerContext ctx;
- private String sessionId;
-
- @Override
- public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
- if (msg instanceof FullHttpRequest) {
- handleHttpRequest(ctx, (FullHttpRequest) msg);
- } else if (msg instanceof WebSocketFrame) {
- handleWebSocketFrame(ctx, (WebSocketFrame) msg);
- }
- }
-
- @Override
- public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
- ctx.flush();
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- LOG.error("WebSocket异常", cause);
- ctx.close();
- LOG.info(sessionId + " 注销");
- BananaService.logout(sessionId);
- BananaService.notifyDownline(sessionId);
- }
-
- @Override
- public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
- LOG.info("WebSocket关闭");
- super.close(ctx, promise);
- LOG.info(sessionId + " 注销");
- BananaService.logout(sessionId);
- BananaService.notifyDownline(sessionId);
- }
-
-
- private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
-
- if (!request.getDecoderResult().isSuccess() || (!"websocket".equals(request.headers().get("Upgrade")))) {
- sendHttpResponse(ctx, request, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
- return;
- }
-
-
- WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://" + request.headers().get(HttpHeaders.Names.HOST), null, false);
- handshaker = wsFactory.newHandshaker(request);
- if (handshaker == null) {
- WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
- } else {
- handshaker.handshake(ctx.channel(), request);
-
- this.ctx = ctx;
- }
- }
-
-
- private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
-
- if (frame instanceof CloseWebSocketFrame) {
- handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
- return;
- }
-
- if (frame instanceof PingWebSocketFrame) {
- ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
- return;
- }
-
- if (!(frame instanceof TextWebSocketFrame)) {
- throw new UnsupportedOperationException("当前只支持文本消息,不支持二进制消息");
- }
-
-
- try {
- Request request = Request.create(((TextWebSocketFrame)frame).text());
- Response response = new Response();
- response.setServiceId(request.getServiceId());
- if (CODE.online.code.intValue() == request.getServiceId()) {
- String requestId = request.getRequestId();
- if (Strings.isNullOrEmpty(requestId)) {
- response.setIsSucc(false).setMessage("requestId不能为空");
- return;
- } else if (Strings.isNullOrEmpty(request.getName())) {
- response.setIsSucc(false).setMessage("name不能为空");
- return;
- } else if (BananaService.bananaWatchMap.containsKey(requestId)) {
- response.setIsSucc(false).setMessage("您已经注册了,不能重复注册");
- return;
- }
- if (!BananaService.register(requestId, new BananaService(ctx, request.getName()))) {
- response.setIsSucc(false).setMessage("注册失败");
- } else {
- response.setIsSucc(true).setMessage("注册成功");
-
- BananaService.bananaWatchMap.forEach((reqId, callBack) -> {
- response.getHadOnline().put(reqId, ((BananaService)callBack).getName());
-
- if (!reqId.equals(requestId)) {
- Request serviceRequest = new Request();
- serviceRequest.setServiceId(CODE.online.code);
- serviceRequest.setRequestId(requestId);
- serviceRequest.setName(request.getName());
- try {
- callBack.send(serviceRequest);
- } catch (Exception e) {
- LOG.warn("回调发送消息给客户端异常", e);
- }
- }
- });
- }
- sendWebSocket(response.toJson());
- this.sessionId = requestId;
- } else if (CODE.send_message.code.intValue() == request.getServiceId()) {
- String requestId = request.getRequestId();
- if (Strings.isNullOrEmpty(requestId)) {
- response.setIsSucc(false).setMessage("requestId不能为空");
- } else if (Strings.isNullOrEmpty(request.getName())) {
- response.setIsSucc(false).setMessage("name不能为空");
- } else if (Strings.isNullOrEmpty(request.getMessage())) {
- response.setIsSucc(false).setMessage("message不能为空");
- } else {
- response.setIsSucc(true).setMessage("发送消息成功");
-
- BananaService.bananaWatchMap.forEach((reqId, callBack) -> {
- Request serviceRequest = new Request();
- serviceRequest.setServiceId(CODE.receive_message.code);
- serviceRequest.setRequestId(requestId);
- serviceRequest.setName(request.getName());
- serviceRequest.setMessage(request.getMessage());
- try {
- callBack.send(serviceRequest);
- } catch (Exception e) {
- LOG.warn("回调发送消息给客户端异常", e);
- }
- });
- }
- sendWebSocket(response.toJson());
- } else if (CODE.downline.code.intValue() == request.getServiceId()) {
- String requestId = request.getRequestId();
- if (Strings.isNullOrEmpty(requestId)) {
- sendWebSocket(response.setIsSucc(false).setMessage("requestId不能为空").toJson());
- } else {
- BananaService.logout(requestId);
- response.setIsSucc(true).setMessage("下线成功");
-
- BananaService.notifyDownline(requestId);
-
- sendWebSocket(response.toJson());
- }
-
- } else {
- sendWebSocket(response.setIsSucc(false).setMessage("未知请求").toJson());
- }
- } catch (JsonSyntaxException e1) {
- LOG.warn("Json解析异常", e1);
- } catch (Exception e2) {
- LOG.error("处理Socket请求异常", e2);
- }
- }
-
-
- private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest request, FullHttpResponse response) {
-
- if (response.getStatus().code() != 200) {
- ByteBuf buf = Unpooled.copiedBuffer(response.getStatus().toString(), CharsetUtil.UTF_8);
- response.content().writeBytes(buf);
- buf.release();
- HttpHeaders.setContentLength(response, response.content().readableBytes());
- }
-
-
- ChannelFuture f = ctx.channel().writeAndFlush(response);
- if (!HttpHeaders.isKeepAlive(request) || response.getStatus().code() != 200) {
- f.addListener(ChannelFutureListener.CLOSE);
- }
- }
-
-
- public void sendWebSocket(String msg) throws Exception {
- if (this.handshaker == null || this.ctx == null || this.ctx.isRemoved()) {
- throw new Exception("尚未握手成功,无法向客户端发送WebSocket消息");
- }
- this.ctx.channel().write(new TextWebSocketFrame(msg));
- this.ctx.flush();
- }
-
- }
聊天服务接口和实现类
- import com.company.util.Request;
-
- public interface BananaCallBack {
-
-
- void send(Request request) throws Exception;
-
- }
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
-
- import java.util.Map;
- import java.util.concurrent.ConcurrentHashMap;
-
- import org.apache.log4j.Logger;
-
- import com.company.service.BananaCallBack;
- import com.company.util.CODE;
- import com.company.util.Request;
- import com.google.common.base.Strings;
-
- public class BananaService implements BananaCallBack {
- private static final Logger LOG = Logger.getLogger(BananaService.class);
-
- public static final Map<String, BananaCallBack> bananaWatchMap = new ConcurrentHashMap<String, BananaCallBack>();
-
- private ChannelHandlerContext ctx;
- private String name;
-
- public BananaService(ChannelHandlerContext ctx, String name) {
- this.ctx = ctx;
- this.name = name;
- }
-
- public static boolean register(String requestId, BananaCallBack callBack) {
- if (Strings.isNullOrEmpty(requestId) || bananaWatchMap.containsKey(requestId)) {
- return false;
- }
- bananaWatchMap.put(requestId, callBack);
- return true;
- }
-
- public static boolean logout(String requestId) {
- if (Strings.isNullOrEmpty(requestId) || !bananaWatchMap.containsKey(requestId)) {
- return false;
- }
- bananaWatchMap.remove(requestId);
- return true;
- }
-
- @Override
- public void send(Request request) throws Exception {
- if (this.ctx == null || this.ctx.isRemoved()) {
- throw new Exception("尚未握手成功,无法向客户端发送WebSocket消息");
- }
- this.ctx.channel().write(new TextWebSocketFrame(request.toJson()));
- this.ctx.flush();
- }
-
-
-
- public static void notifyDownline(String requestId) {
- BananaService.bananaWatchMap.forEach((reqId, callBack) -> {
- Request serviceRequest = new Request();
- serviceRequest.setServiceId(CODE.downline.code);
- serviceRequest.setRequestId(requestId);
- try {
- callBack.send(serviceRequest);
- } catch (Exception e) {
- LOG.warn("回调发送消息给客户端异常", e);
- }
- });
- }
-
- public String getName() {
- return name;
- }
-
- }
前端html5聊天页面及js
- <!DOCTYPE html>
- <html>
- <head>
- <meta charset="UTF-8">
- <title>Netty WebSocket 聊天实例</title>
- </head>
- <script src="jquery.min.js" type="text/javascript"></script>
- <script src="map.js" type="text/javascript"></script>
- <script type="text/javascript">
- $(document).ready(function() {
- var uuid = guid(); // uuid在一个会话唯一
- var nameOnline = ‘‘; // 上线姓名
- var onlineName = new Map(); // 已上线人员, <requestId, name>
-
- $("#name").attr("disabled","disabled");
- $("#onlineBtn").attr("disabled","disabled");
- $("#downlineBtn").attr("disabled","disabled");
-
- $("#banana").hide();
-
- // 初始化websocket
- var socket;
- if (!window.WebSocket) {
- window.WebSocket = window.MozWebSocket;
- }
- if (window.WebSocket) {
- socket = new WebSocket("ws://localhost:9090/");
- socket.onmessage = function(event) {
- console.log("收到服务器消息:" + event.data);
- if (event.data.indexOf("isSucc") != -1) {// 这里需要判断是客户端请求服务端返回后的消息(response)
- var response = JSON.parse(event.data);
- if (response != undefined && response != null) {
- if (response.serviceId == 1001) { // 上线
- if (response.isSucc) {
- // 上线成功,初始化已上线人员
- onlineName.clear();
- $("#showOnlineNames").empty();
- for (var reqId in response.hadOnline) {
- onlineName.put(reqId, response.hadOnline[reqId]);
- }
- initOnline();
-
- $("#name").attr("disabled","disabled");
- $("#onlineBtn").attr("disabled","disabled");
- $("#downlineBtn").removeAttr("disabled");
- $("#banana").show();
- } else {
- alert("上线失败");
- }
- } else if (response.serviceId == 1004) {
- if (response.isSucc) {
- onlineName.clear();
- $("#showBanana").empty();
- $("#showOnlineNames").empty();
- $("#name").removeAttr("disabled");
- $("#onlineBtn").removeAttr("disabled");
- $("#downlineBtn").attr("disabled","disabled");
- $("#banana").hide();
- } else {
- alert("下线失败");
- }
- }
- }
- } else {// 还是服务端向客户端的请求(request)
- var request = JSON.parse(event.data);
- if (request != undefined && request != null) {
- if (request.serviceId == 1001 || request.serviceId == 1004) { // 有人上线/下线
- if (request.serviceId == 1001) {
- onlineName.put(request.requestId, request.name);
- }
- if (request.serviceId == 1004) {
- onlineName.removeByKey(request.requestId);
- }
-
- initOnline();
- } else if (request.serviceId == 1003) { // 有人发消息
- appendBanana(request.name, request.message);
- }
- }
- }
- };
- socket.onopen = function(event) {
- $("#name").removeAttr("disabled");
- $("#onlineBtn").removeAttr("disabled");
- console.log("已连接服务器");
- };
- socket.onclose = function(event) { // WebSocket 关闭
- console.log("WebSocket已经关闭!");
- };
- socket.onerror = function(event) {
- console.log("WebSocket异常!");
- };
- } else {
- alert("抱歉,您的浏览器不支持WebSocket协议!");
- }
-
- // WebSocket发送请求
- function send(message) {
- if (!window.WebSocket) { return; }
- if (socket.readyState == WebSocket.OPEN) {
- socket.send(message);
- } else {
- console.log("WebSocket连接没有建立成功!");
- alert("您还未连接上服务器,请刷新页面重试");
- }
- }
-
- // 刷新上线人员
- function initOnline() {
- $("#showOnlineNames").empty();
- for (var i=0;i<onlineName.size();i++) {
- $("#showOnlineNames").append(‘<tr><td>‘ + (i+1) + ‘</td>‘ +
- ‘<td>‘ + onlineName.element(i).value + ‘</td>‘ +
- ‘</tr>‘);
- }
- }
- // 追加聊天信息
- function appendBanana(name, message) {
- $("#showBanana").append(‘<tr><td>‘ + name + ‘: ‘ + message + ‘</td></tr>‘);
- }
-
- $("#onlineBtn").bind("click", function() {
- var name = $("#name").val();
- if (name == null || name == ‘‘) {
- alert("请输入您的尊姓大名");
- return;
- }
-
- nameOnline = name;
- // 上线
- send(JSON.stringify({"requestId":uuid, "serviceId":1001, "name":name}));
- });
-
- $("#downlineBtn").bind("click", function() {
- // 下线
- send(JSON.stringify({"requestId":uuid, "serviceId":1004}));
- });
-
- $("#sendBtn").bind("click", function() {
- var message = $("#messageInput").val();
- if (message == null || message == ‘‘) {
- alert("请输入您的聊天信息");
- return;
- }
-
- // 发送聊天消息
- send(JSON.stringify({"requestId":uuid, "serviceId":1002, "name":nameOnline, "message":message}));
- $("#messageInput").val("");
- });
-
- });
-
- function guid() {
- function S4() {
- return (((1+Math.random())*0x10000)|0).toString(16).substring(1);
- }
- return (S4()+S4()+"-"+S4()+"-"+S4()+"-"+S4()+"-"+S4()+S4()+S4());
- }
- </script>
- <body>
- <h1>Netty WebSocket 聊天实例</h1>
- <input type="text" id="name" value="佚名" placeholder="姓名" />
- <input type="button" id="onlineBtn" value="上线" />
- <input type="button" id="downlineBtn" value="下线" />
- <hr/>
- <table id="banana" border="1" >
- <tr>
- <td width="600" align="center">聊天</td>
- <td width="100" align="center">上线人员</td>
- </tr>
- <tr height="200" valign="top">
- <td>
- <table id="showBanana" border="0" width="600">
- <!--
- <tr>
- <td>张三: 大家好</td>
- </tr>
- <tr>
- <td>李四: 欢迎加入群聊</td>
- </tr>
- -->
- </table>
- </td>
- <td>
- <table id="showOnlineNames" border="0">
- <!--
- <tr>
- <td>1</td>
- <td>张三</td>
- <tr/>
- <tr>
- <td>2</td>
- <td>李四</td>
- <tr/>
- -->
- </table>
- </td>
- </tr>
- <tr height="40">
- <td></td>
- <td></td>
- </tr>
- <tr>
- <td>
- <input type="text" id="messageInput" style="width:590px" placeholder="巴拉巴拉点什么吧" />
- </td>
- <td>
- <input type="button" id="sendBtn" value="发送" />
- </td>
- </tr>
- </table>
-
- </body>
- </html>
运行方式:
1.运行Lanucher来启动后端的 WebSocket服务
2.打开Resources下的banana.html页面即可在线聊天,如下:
当有人上线/下线时,右边的"上线人员"会动态变化
综上,WebSocket 协议用于构建低延迟的服务,如竞拍、股票行情等,使用 Netty 可以方便的构建 WebSocket 服务,需要注意的是,WebSocket 协议基于 Http协议,采用 Http 握手成功后,就可以进行 TCP 全双工通信了。
GitHub上源码:https://github.com/leonzm/websocket_demo
参考:
《Netty 权威指南》
知乎上关于WebSocket
Websocket使用实例解读 -- tomcat
WebSocket API 接口
HTML5 WebSockets 教程
Netty笔记:使用WebSocket协议开发聊天系统
标签:oar bytes dom sse instance notify 通信 pool art
原文地址:http://www.cnblogs.com/sanhuan/p/6051696.html