码迷,mamicode.com
首页 > 其他好文 > 详细

redis实现消息队列

时间:2015-04-01 19:25:50      阅读:174      评论:0      收藏:0      [点我收藏+]

标签:

/**
* @author sam
* @date 2015/04/01
* @desc a message queue
*/
// Node core modules: EventEmitter is the base type of the queue, util supplies inherits().
// (String delimiters restored to plain ASCII quotes; typographic quotes are a syntax error.)
var EventEmitter = require('events').EventEmitter;

var util = require('util');

/**
 * Priority message queue backed by Redis lists.
 *
 * One list key is generated per priority level (see __init); index 0 is the
 * highest priority.
 *
 * @constructor
 * @param {Object} client   redis client; stored and used by the prototype methods
 * @param {string} name     queue name, embedded in every generated list key
 * @param {number} [priority=3] number of priority levels; values below 1 are raised to 1
 * @throws {Error} if name is null or undefined
 */
var RedisMessageQueue = function(client, name, priority){

    // `== null` rejects undefined as well as null; an omitted name would otherwise
    // silently produce keys such as "message_queue_undefined_0".
    if(name == null){
        throw new Error('message queue name is null');
    }

    // Run the parent constructor so the emitter state lives on the instance,
    // as recommended when inheriting through util.inherits.
    EventEmitter.call(this);

    this.name = name;

    // Three priority levels by default; a missing, zero or NaN value falls back to it.
    var levels = priority || 3;

    // Guard against zero/negative level counts.
    this.priority = levels < 1 ? 1 : levels;

    this.__client = client;

    // Filled by __init with one list key per priority level.
    this.__queueNames = [];

    this.__init();
};

// Names of the events a RedisMessageQueue instance emits.
RedisMessageQueue.EVENTS = {
    ERROR : 'error'
};

// Queue instances are event emitters: client errors are re-emitted on the queue itself.
util.inherits(RedisMessageQueue, EventEmitter);


/**
 * Fills __queueNames with one Redis list key per priority level and forwards
 * 'error' events of the redis client to listeners of this queue.
 *
 * Called once from the constructor.
 */
RedisMessageQueue.prototype.__init = function(){
    var self = this;

    // Keys look like message_queue_<name>_0, message_queue_<name>_1, ...
    // A smaller numeric suffix means a higher priority.
    for(var i = 0 ; i < self.priority; i++){
        self.__queueNames[i] = 'message_queue_' + self.name + '_' + i;
    }
    // NOTE: existing lists are left untouched; the client.del() call that used to
    // clear each key here was already commented out in the original.

    self.__client.on('error', function(err){
        self.emit(RedisMessageQueue.EVENTS.ERROR, err);
    });

};


/**
 * Pushes a message onto the list of the requested priority level (LPUSH).
 *
 * @param {*} message        strings are stored as-is, anything else is JSON-serialized;
 *                           null/undefined is ignored and the callback receives null
 * @param {number} [level]   0 is the highest priority; defaults to the lowest level.
 *                           Out-of-range values are clamped to the nearest valid level,
 *                           non-numeric values fall back to the lowest level.
 * @param {Function} [callback] passed to client.lpush; also called with the error
 *                           when the message cannot be serialized
 */
RedisMessageQueue.prototype.produce = function(message, level, callback){
    var self = this;

    if(message == null){
        callback && callback(null);
        return;
    }

    var lowest = self.__queueNames.length - 1;

    // Only a missing level falls back to the default. `level || default` would also
    // swallow 0 and send highest-priority messages to the lowest-priority list.
    var index = (level == null) ? lowest : Math.floor(Number(level));

    if(isNaN(index) || index > lowest){
        index = lowest;
    }else if(index < 0){
        // A negative index would resolve to an undefined list key.
        index = 0;
    }

    var payload = null;

    if(typeof message === 'string'){
        payload = message;
    }else{
        try{
            payload = JSON.stringify(message);
        }catch (ex){
            // e.g. circular structures
            callback && callback(ex);
            return;
        }
    }

    self.__client.lpush(self.__queueNames[index], payload, callback);

};

RedisMessageQueue.prototype.consume = function(cb){

var self = this; var queueNames = self.__queueNames; var time = 0; //表示客户端永远等待 queueNames.push(time); var client = self.__client; client.brpop(queueNames,function(err,res){ if(err){ cb && cb(err); return; } if(result === null){ cb && cb(null,null); return; } var result = res[1]; try{ cb && cb(null, JSON.parse(result)); return; }catch (ex){ if(typeof result === ‘string‘){ cb && cb(null, result); return; } cb && cb(ex); } });};exports.RedisMessageQueue = RedisMessageQueue;

redis实现消息队列

标签:

原文地址:http://www.cnblogs.com/inode/p/4384573.html

(0)
(0)
   
举报
评论 一句话评论(0)
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!