标签:des style blog http io ar color os 使用
转载请注明: TheViper http://www.cnblogs.com/TheViper
源码分析
var engine = require(‘./node_modules/engine.io/lib/engine.io.js‘);
var server = engine.listen(8000,{
transports:[‘polling‘]
});
server.on(‘connection‘, function(socket){
socket.send(‘utf 8 string‘);
});
之前提到过engine.io只有websocket,polling两种传输方式。对于websocket,engine.io用了第三方库ws,这个从源码目录看的出来,调用很简单。所以本文只说polling这种方式.
另外精力有限,所以只分析服务端,就当是抛砖引玉,源码比较简单。
首先是加载engine.io.js.
exports = module.exports = function() {
// backwards compatible use as `.attach`
// if first argument is an http server
if (arguments.length && arguments[0] instanceof http.Server) {
return attach.apply(this, arguments);
}
// if first argument is not an http server, then just make a regular eio server
return exports.Server.apply(null, arguments);
};
注释上说的很清楚,if是为了兼容以前的版本,判断传入的是不是http.然后初始化Server.
Server初始化只是将参数写入,没有指定参数的,使用默认参数,还有就是定义后面用到的变量。最后,如果参数方式中有websocket,就初始化WebSocketServer。
if (~this.transports.indexOf(‘websocket‘)) {
this.ws = new WebSocketServer({ noServer: true, clientTracking: false });
}
接着是engine.listen().
function listen(port, options, fn) {
if (‘function‘ == typeof options) {
fn = options;
options = {};
}
var server = http.createServer(function (req, res) {
res.writeHead(501);
res.end(‘Not Implemented‘);
});
server.listen(port, fn);
// create engine server
var engine = exports.attach(server, options);
engine.httpServer = server;
return engine;
};
先判断是不是重载的方法,如果是,则没有指定端口,使用默认端口,并且参数移位。后面是将我这种定义方式,变成
var engine = require(‘engine.io‘);
var http = require(‘http‘).createServer().listen(3000);
var server = engine.attach(http);
这是文档上另一种定义方式。
接着是attach()
function attach(server, options) {
var engine = new exports.Server(options);
engine.attach(server, options);
return engine;
};
转到server里面的attach(),里面做的事就比较多了。简单说就是
// cache and clean up listeners
var listeners = server.listeners(‘request‘).slice(0);
server.removeAllListeners(‘request‘);
server.on(‘close‘, self.close.bind(self));
// add request handler
server.on(‘request‘, function(req, res){
if (check(req)) {
debug(‘intercepting request for path "%s"‘, path);
self.handleRequest(req, res);
} else {
for (var i = 0, l = listeners.length; i < l; i++) {
listeners[i].call(server, req, res);
}
}
});
清空监听器,有请求传入时用server.on(‘request‘)监听。
function check (req) {
return path == req.url.substr(0, path.length);
}
一般情况下,check()返回的是true,搞不懂什么时候返回false.看后面好像是负载均衡,需要配置多个监听的时候用。不明白!
接着是self.handleRequest(req, res);
Server.prototype.handleRequest = function(req, res){
debug(‘handling "%s" http request "%s"‘, req.method, req.url);
this.prepare(req);
req.res = res;
var self = this;
this.verify(req, false, function(err, success) {
if (!success) {
sendErrorMessage(req, res, err);
return;
}
if (req._query.sid) {
debug(‘setting new request for existing client‘);
self.clients[req._query.sid].transport.onRequest(req);
} else {
self.handshake(req._query.transport, req);
}
});
};
this.prepare(req)是将请求里面的查询参数封装到req._query上。然后是verify()
Server.prototype.verify = function(req, upgrade, fn){
// transport check
var transport = req._query.transport;
if (!~this.transports.indexOf(transport)) {
debug(‘unknown transport "%s"‘, transport);
return fn(Server.errors.UNKNOWN_TRANSPORT, false);
}
// sid check
var sid = req._query.sid;
if (sid) {
if (!this.clients.hasOwnProperty(sid))
return fn(Server.errors.UNKNOWN_SID, false);
if (!upgrade && this.clients[sid].transport.name !== transport) {
debug(‘bad request: unexpected transport without upgrade‘);
return fn(Server.errors.BAD_REQUEST, false);
}
} else {
// handshake is GET only
if (‘GET‘ != req.method) return fn(Server.errors.BAD_HANDSHAKE_METHOD, false);
if (!this.allowRequest) return fn(null, true);
return this.allowRequest(req, fn);
}
fn(null, true);
};
记下请求中要求的传输方式,例如http://localhost:3000/socket.io/?EIO=3&transport=polling&t=1418545776090-1&sid=M5e5hB6NAOmh7YW1AAAB。可以看到由transport参数指定。然后记下sid,这个请求中也有。注意,第一次请求的时候,由于没有实例socket,所以没有socket的id。这时判断有没有sid,确定是不是第一次请求。
下面我们假定是第一次请求,然后走else.
if (‘GET‘ != req.method),这里要判断下,因为engine.io会通过请求的方法的不同确定后面的执行流程,默认get方法维持长连接,当然发出握手请求的也是get方法,这个后面会说到。
this.allowRequest像是拦截器,拦截握手请求,和传输升级的请求(websocket).
例子没有设置allowRequest方法,就直接fn(null,true),进入回调函数.
this.verify(req, false, function(err, success) {
if (!success) {
sendErrorMessage(req, res, err);
return;
}
if (req._query.sid) {
debug(‘setting new request for existing client‘);
self.clients[req._query.sid].transport.onRequest(req);
} else {
self.handshake(req._query.transport, req);
}
});
如果是第一次请求,后面执行握手;如果不是,则选定已经存在的socket,调用socket上面的transport(传输方式,已经绑定到socket)的onRequest方法,这个后面会说到。现在我们还是第一次请求,所以开始握手。
Server.prototype.handshake = function(transport, req){
var id = base64id.generateId();
debug(‘handshaking client "%s"‘, id);
var transportName = transport;
try {
var transport = new transports[transport](req);
if (‘polling‘ == transportName) {
transport.maxHttpBufferSize = this.maxHttpBufferSize;
}
if (req._query && req._query.b64) {
transport.supportsBinary = false;
} else {
transport.supportsBinary = true;
}
}
catch (e) {
sendErrorMessage(req, req.res, Server.errors.BAD_REQUEST);
return;
}
var socket = new Socket(id, this, transport, req);
var self = this;
if (false !== this.cookie) {
transport.on(‘headers‘, function(headers){
headers[‘Set-Cookie‘] = self.cookie + ‘=‘ + id;
});
}
transport.onRequest(req);
this.clients[id] = socket;
this.clientsCount++;
socket.once(‘close‘, function(){
delete self.clients[id];
self.clientsCount--;
});
this.emit(‘connection‘, socket);
};
var transport = new transports[transport](req);这里的transports是transports = require(‘./transports‘)。加载的是transprots文件夹下面的index.js,不是transprots.js.
var XHR = require(‘./polling-xhr‘);
var JSONP = require(‘./polling-jsonp‘);
module.exports = exports = {
polling: polling,
websocket: require(‘./websocket‘)
};
exports.polling.upgradesTo = [‘websocket‘];
function polling (req) {
if (‘string‘ == typeof req._query.j) {
return new JSONP(req);
} else {
return new XHR(req);
}
}
由于transport由请求中的transprot参数决定,这里是polling。所以会执行polling(req),再看请求参数里有没有j,最终确定具体的传输方式。这里是xhr.
function XHR(req){
Polling.call(this, req);
}
XHR.prototype.__proto__ = Polling.prototype;
这里XHR继承了Polling.然后实例化Polling.
function Polling (req) {
Transport.call(this, req);
}
Polling.prototype.__proto__ = Transport.prototype;
又去实例化Transport.Transport继承EventEmitter。
回到主线,var socket = new Socket(id, this, transport, req);这里开始实例化socket了。socket里面做了什么,后面会说到。然后在header上设置cookie,值是上面产生的随机id.方便开发者做权限控制。
接着是transport.onRequest(req); 进入polling-xhr.js
XHR.prototype.onRequest = function (req) {
if (‘OPTIONS‘ == req.method) {
var res = req.res;
var headers = this.headers(req);
headers[‘Access-Control-Allow-Headers‘] = ‘Content-Type‘;
res.writeHead(200, headers);
res.end();
} else {
Polling.prototype.onRequest.call(this, req);
}
};
这里会先判断请求方法是不是options,这个好像只有高级浏览器中才会出现。
第一次请求是get方法,走后面。
Polling.prototype.onRequest = function (req) {
var res = req.res;
if (‘GET‘ == req.method) {
this.onPollRequest(req, res);
} else if (‘POST‘ == req.method) {
this.onDataRequest(req, res);
} else {
res.writeHead(500);
res.end();
}
};
又对请求方法进行判断。从方法名字就看的出,get方法用来维持长连接的,当然广播返回的数据也是通过这个get方法。post方法用来传输客户端的数据,比如客户端聊天的文字,如果没有数据,就会发出get请求。
我们还在握手请求,get方法。
Polling.prototype.onPollRequest = function (req, res) {
if (this.req) {
debug(‘request overlap‘);
// assert: this.res, ‘.req and .res should be (un)set together‘
this.onError(‘overlap from client‘);
res.writeHead(500);
return;
}
debug(‘setting request‘);
this.req = req;
this.res = res;
var self = this;
function onClose () {
self.onError(‘poll connection closed prematurely‘);
}
function cleanup () {
req.removeListener(‘close‘, onClose);
self.req = self.res = null;
}
req.cleanup = cleanup;
req.on(‘close‘, onClose);
this.writable = true;
this.emit(‘drain‘);
// if we‘re still writable but had a pending close, trigger an empty send
if (this.writable && this.shouldClose) {
debug(‘triggering empty send to append close packet‘);
this.send([{ type: ‘noop‘ }]);
}
};
onPollRequest()主要是对相应对象,如request,绑定事件,回调函数,后面传输数据时会用到。
回到前面看实例化socket,然后做了什么。
function Socket (id, server, transport, req) { this.id = id; this.server = server; this.upgraded = false; this.readyState = ‘opening‘; this.writeBuffer = []; this.packetsFn = []; this.sentCallbackFn = []; this.request = req; // Cache IP since it might not be in the req later this.remoteAddress = req.connection.remoteAddress; this.checkIntervalTimer = null; this.upgradeTimeoutTimer = null; this.pingTimeoutTimer = null; this.setTransport(transport); this.onOpen(); }
变量初始化后,this.setTransport(transport);
Socket.prototype.setTransport = function (transport) { this.transport = transport; this.transport.once(‘error‘, this.onError.bind(this)); this.transport.on(‘packet‘, this.onPacket.bind(this)); this.transport.on(‘drain‘, this.flush.bind(this)); this.transport.once(‘close‘, this.onClose.bind(this, ‘transport close‘)); //this function will manage packet events (also message callbacks) this.setupSendCallback(); };
对前面实例化的transport绑定事件,回调函数。然后是onOpen()
Socket.prototype.onOpen = function () { this.readyState = ‘open‘; // sends an `open` packet this.transport.sid = this.id; this.sendPacket(‘open‘, JSON.stringify({ sid: this.id , upgrades: this.getAvailableUpgrades() , pingInterval: this.server.pingInterval , pingTimeout: this.server.pingTimeout })); this.emit(‘open‘); this.setPingTimeout(); };
sendPacket()开始传输针对握手请求的响应,然后是设置超时定时器。
Socket.prototype.setPingTimeout = function () { var self = this; clearTimeout(self.pingTimeoutTimer); self.pingTimeoutTimer = setTimeout(function () { self.onClose(‘ping timeout‘); }, self.server.pingInterval + self.server.pingTimeout); };
回到sendPacket()
Socket.prototype.sendPacket = function (type, data, callback) { if (‘closing‘ != this.readyState) { debug(‘sending packet "%s" (%s)‘, type, data); var packet = { type: type }; if (data) packet.data = data; // exports packetCreate event this.emit(‘packetCreate‘, packet); this.writeBuffer.push(packet); //add send callback to object this.packetsFn.push(callback); this.flush(); } };
this.writeBuffer是暂时保存响应数据的缓冲。packet是用来包装响应数据的。然后flush输出。
Socket.prototype.flush = function () { if (‘closed‘ != this.readyState && this.transport.writable && this.writeBuffer.length) { debug(‘flushing buffer to transport‘); this.emit(‘flush‘, this.writeBuffer); this.server.emit(‘flush‘, this, this.writeBuffer); var wbuf = this.writeBuffer; this.writeBuffer = []; if (!this.transport.supportsFraming) { this.sentCallbackFn.push(this.packetsFn); } else { this.sentCallbackFn.push.apply(this.sentCallbackFn, this.packetsFn); } this.packetsFn = []; this.transport.send(wbuf); this.emit(‘drain‘); this.server.emit(‘drain‘, this); } };
触发各种绑定的事件,最后调用this.transport.send(wbuf);
Polling.prototype.send = function (packets) { if (this.shouldClose) { debug(‘appending close packet to payload‘); packets.push({ type: ‘close‘ }); this.shouldClose(); this.shouldClose = null; } var self = this; parser.encodePayload(packets, this.supportsBinary, function(data) { self.write(data); }); };
对buffer解析后,write()
Polling.prototype.write = function (data) { debug(‘writing "%s"‘, data); this.doWrite(data); this.req.cleanup(); this.writable = false; };
doWrite()由子类实现。
XHR.prototype.doWrite = function(data){ // explicit UTF-8 is required for pages not served under utf var isString = typeof data == ‘string‘; var contentType = isString ? ‘text/plain; charset=UTF-8‘ : ‘application/octet-stream‘; var contentLength = ‘‘ + (isString ? Buffer.byteLength(data) : data.length); var headers = { ‘Content-Type‘: contentType, ‘Content-Length‘: contentLength }; // prevent XSS warnings on IE // https://github.com/LearnBoost/socket.io/pull/1333 var ua = this.req.headers[‘user-agent‘]; if (ua && (~ua.indexOf(‘;MSIE‘) || ~ua.indexOf(‘Trident/‘))) { headers[‘X-XSS-Protection‘] = ‘0‘; } this.res.writeHead(200, this.headers(this.req, headers)); this.res.end(data); };
标签:des style blog http io ar color os 使用
原文地址:http://www.cnblogs.com/TheViper/p/4163884.html