码迷,mamicode.com
首页 > 其他好文 > 详细

engine.io分析3--socket.io的基石

时间:2014-12-15 06:29:16      阅读:322      评论:0      收藏:0      [点我收藏+]

标签:des   style   blog   http   io   ar   color   os   sp   

转载请注明: TheViper http://www.cnblogs.com/TheViper

上一篇讲了第一次请求(握手)的执行流程,这篇说说握手后,engine.io是怎么传输数据的。

engine.io对http的request事件进行了绑定。在回调函数中,根据有没有socket id来判断是不是握手请求。

    if (req._query.sid) {
      debug(‘setting new request for existing client‘);
      self.clients[req._query.sid].transport.onRequest(req);
    } else {
      self.handshake(req._query.transport, req);
    }

 上一篇走的是self.handshake(req._query.transport, req);,这篇走self.clients[req._query.sid].transport.onRequest(req);。

上一篇的握手请求是get方法。维持长连接的请求也是get方法。广播时,engine.io收到数据,会向长连接中写入数据,返回长连接,然后关闭。如果长连接超时了或者返回错误,客户端也会再次发出。如果在规定的时间(pingTimeout,默认60秒)内,服务器端没有收到客户端发出的长连接请求,则认定这个socket无效,从clients中清除该socket.

function Socket (id, server, transport, req) {
。。。。
  this.setTransport(transport);
  this.onOpen();
}
Socket.prototype.setPingTimeout = function () {
  var self = this;
  clearTimeout(self.pingTimeoutTimer);
  self.pingTimeoutTimer = setTimeout(function () {
    self.onClose(‘ping timeout‘);
  }, self.server.pingInterval + self.server.pingTimeout);
};
Socket.prototype.onOpen = function () {
  this.readyState = ‘open‘;

  // sends an `open` packet
  this.transport.sid = this.id;
  this.sendPacket(‘open‘, JSON.stringify({
      sid: this.id
    , upgrades: this.getAvailableUpgrades()
    , pingInterval: this.server.pingInterval
    , pingTimeout: this.server.pingTimeout
  }));

  this.emit(‘open‘);
  this.setPingTimeout();
};

可以看到,每次握手成功,初始化一个socket的时候,就会设置一个超时定时器。超时的话执行onClose(),里面就是各种清空,初始化,然后触发close事件,取消其他事件绑定等。

又回到onRequest(),针对请求方法的不同,执行不同的策略,

Polling.prototype.onRequest = function (req) {
  var res = req.res;
  
  if (‘GET‘ == req.method) {
    this.onPollRequest(req, res);
  } else if (‘POST‘ == req.method) {
    this.onDataRequest(req, res);
  } else {
    res.writeHead(500);
    res.end();
  }
};

 如果是get方法

Polling.prototype.onPollRequest = function (req, res) {
  debug(‘setting request‘);

  this.req = req;
  this.res = res;

  var self = this;

  function onClose () {
    self.onError(‘poll connection closed prematurely‘);
  }

  function cleanup () {
    req.removeListener(‘close‘, onClose);
    self.req = self.res = null;
  }
  req.cleanup = cleanup;
  req.on(‘close‘, onClose);

  this.writable = true;
  this.emit(‘drain‘);
};

 onPollRequest()里面其实就做了close事件绑定,触发drain事件。drain事件是在socket.js里面 this.setTransport(transport);中绑定的。

this.transport.on(‘drain‘, this.flush.bind(this));可以看到,最终还是用了flush().

如果是post方法

Polling.prototype.onDataRequest = function (req, res) {
  if (this.dataReq) {
    // assert: this.dataRes, ‘.dataReq and .dataRes should be (un)set together‘
    this.onError(‘data request overlap from client‘);
    res.writeHead(500);
    return;
  }

  var isBinary = ‘application/octet-stream‘ == req.headers[‘content-type‘];

  this.dataReq = req;
  this.dataRes = res;

  var chunks = isBinary ? new Buffer(0) : ‘‘;
  var self = this;

  function cleanup () {
    chunks = isBinary ? new Buffer(0) : ‘‘;
    req.removeListener(‘data‘, onData);
    req.removeListener(‘end‘, onEnd);
    req.removeListener(‘close‘, onClose);
    self.dataReq = self.dataRes = null;
  }

  function onClose () {
    cleanup();
    self.onError(‘data request connection closed prematurely‘);
  }

  function onData (data) {
    var contentLength;
    if (typeof data == ‘string‘) {
      chunks += data;
      contentLength = Buffer.byteLength(chunks);
    } else {
      chunks = Buffer.concat([chunks, data]);
      contentLength = chunks.length;
    }

    if (contentLength > self.maxHttpBufferSize) {
      chunks = ‘‘;
      req.connection.destroy();
    }
  }

  function onEnd () {
    self.onData(chunks);

    var headers = {
      // text/html is required instead of text/plain to avoid an
      // unwanted download dialog on certain user-agents (GH-43)
      ‘Content-Type‘: ‘text/html‘,
      ‘Content-Length‘: 2
    };

    // prevent XSS warnings on IE
    // https://github.com/LearnBoost/socket.io/pull/1333
    var ua = req.headers[‘user-agent‘];
    if (ua && (~ua.indexOf(‘;MSIE‘) || ~ua.indexOf(‘Trident/‘))) {
      headers[‘X-XSS-Protection‘] = ‘0‘;
    }

    res.writeHead(200, self.headers(req, headers));
    res.end(‘ok‘);
    cleanup();
  }

  req.abort = cleanup;
  req.on(‘close‘, onClose);
  req.on(‘data‘, onData);
  req.on(‘end‘, onEnd);
  if (!isBinary) req.setEncoding(‘utf8‘);
};

里面其实就是对http request里面事件的绑定。当有数据传入时,onData(),判断是不是超出设定的buffer最大长度,如果没有超出,则将数据写入chunks。

当收到数据后,onEnd().然后是self.onData(chunks);

Polling.prototype.onData = function (data) {
  debug(‘received "%s"‘, data);
  var self = this;
  var callback = function(packet) {
    if (‘close‘ == packet.type) {
      debug(‘got xhr close packet‘);
      self.onClose();
      return false;
    }
    self.onPacket(packet);
  };

  parser.decodePayload(data, callback);
};

parser.decodePayload(data, callback);用来处理字符编码的,可以简单的看成是对数据处理后执行回调函数。

self.onPacket(packet);在父类transport.js里面。

Transport.prototype.onPacket = function (packet) {
  this.emit(‘packet‘, packet);
};

触发transport上的packet事件,这个又是在socket.js的setTransport()里绑定的。前面的onPollRequest()里的drain事件也是。

Socket.prototype.setTransport = function (transport) {
  this.transport = transport;
  this.transport.once(‘error‘, this.onError.bind(this));
  this.transport.on(‘packet‘, this.onPacket.bind(this));
  this.transport.on(‘drain‘, this.flush.bind(this));
  this.transport.once(‘close‘, this.onClose.bind(this, ‘transport close‘));
  //this function will manage packet events (also message callbacks)
  this.setupSendCallback();
};

然后是onPacket()回调

Socket.prototype.onPacket = function (packet) {
  if (‘open‘ == this.readyState) {
    // export packet event
    debug(‘packet‘);
    this.emit(‘packet‘, packet);
    // Reset ping timeout on any packet, incoming data is a good sign of
    // other side‘s liveness
    this.setPingTimeout();

    switch (packet.type) {

      case ‘ping‘:
        debug(‘got ping‘);
        this.sendPacket(‘pong‘);
        this.emit(‘heartbeat‘);
        break;

      case ‘error‘:
        this.onClose(‘parse error‘);
        break;

      case ‘message‘:
        this.emit(‘data‘, packet.data);
        this.emit(‘message‘, packet.data);
        break;
    }
  } else {
    debug(‘packet received with closed socket‘);
  }
};

重新设置超时定时器.由于每次长连接关闭的时候总会再发出一个post请求,engine.io由此确定客户端还在,这时,packet.type是ping,然后返回响应的心跳。客户端收到才会重新发出新的get长连接请求。

如果post请求中带有数据,packet.type会是message.触发data,message事件。绑定,回调在服务端自定义。这样服务器端就收到了客户端的数据。

最后说下socket.send,socket.write.这也是向客户端返回数据的方法。很简单。

Socket.prototype.send =
Socket.prototype.write = function(data, callback){
  this.sendPacket(‘message‘, data, callback);
  return this;
};

 sendPacket()在握手时也是用的这个返回握手响应,这个方法最后也用了flush().

engine.io分析3--socket.io的基石

标签:des   style   blog   http   io   ar   color   os   sp   

原文地址:http://www.cnblogs.com/TheViper/p/4163891.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!