标签:await 任务 init create lan follow dead ges 类型
死信队列产生几种情况
设置DLX的两个参数:
deadLetterExchange
: 设置DLX,当正常队列的消息成为死信后会被路由到DLX中deadLetterRoutingKey
: 设置DLX指定的路由键注意
:Dead-Letter-Exchange也是一种普通的Exchange
消息的TTL指的是消息的存活时间,RabbitMQ支持消息、队列两种方式设置TTL,分别如下:
消息设置TTL:对消息的设置是在发送时进行TTL设置,通过x-message-ttl
或expiration
字段设置,单位为毫秒,代表消息的过期时间,每条消息的TTL可不同。
队列设置TTL:对队列的设置是在消息入队列时计算,通过 x-expires
设置,队列中的所有消息都有相同的过期时间,当超过了队列的超时设置,消息会自动的清除。
注意
:如果以上两种方式都做了设置,消息的TTL则以两者之中最小的那个为准。
推荐采用 amqplib库,一个Node.js实现的RabbitMQ客户端。
const amqp = require(‘amqplib‘); const log4js = require(‘log4js‘); const logger = log4js.getLogger(); logger.level = ‘info‘; module.exports = { logger, init: () => amqp.connect(‘amqp://122.51.9.11:5672‘).then((connection) => { logger.info(‘rabbitmq connect success‘); return connection; }), };
const random = require(‘string-random‘); const rabbitmq = require(‘./rabbitmq.js‘); const logger = rabbitmq.logger; const sleep = (time) => new Promise((resolve) => setTimeout(resolve, time)); async function producerDLX(connnection) { const testQueue = ‘testQu‘; const testExchange = ‘testEx‘; const testRoutingKey = ‘testRoutingKey‘; const testExchangeDLX = ‘testExDLX‘; const testRoutingKeyDLX = ‘testRoutingKeyDLX‘; const msg = ‘Producer‘; const ch = await connnection.createChannel(); await ch.assertExchange(testExchange, ‘direct‘, { durable: true }); const queueResult = await ch.assertQueue(testQueue, { exclusive: false, messageTtl: 10000, deadLetterExchange: testExchangeDLX, deadLetterRoutingKey: testRoutingKeyDLX, }); await ch.bindQueue(queueResult.queue, testExchange, testRoutingKey); for (let i = 0; i < 5; i++) { await sleep(2000); const cMsg = `${i}:${msg} 消息 =>${random(10)}`; logger.info(cMsg); await ch.publish(testExchange, testRoutingKey, Buffer.from(cMsg)); } await ch.close(); } // 发送消息 rabbitmq.init().then((connection) => producerDLX(connection));
const rabbitmq = require(‘./rabbitmq.js‘); const logger = rabbitmq.logger; /** * 消费一个死信队列 * @param { Object } connnection */ async function consumerDLX(connnection) { const testExchangeDLX = ‘testExDLX‘; const testRoutingKeyDLX = ‘testRoutingKeyDLX‘; const testQueueDLX = ‘testQueueDLX‘; const ch = await connnection.createChannel(); await ch.assertExchange(testExchangeDLX, ‘direct‘, { durable: true }); const queueResult = await ch.assertQueue(testQueueDLX, { exclusive: false, }); await ch.bindQueue(queueResult.queue, testExchangeDLX, testRoutingKeyDLX); await ch.consume( queueResult.queue, (msg) => { logger.info(‘consumer msg:‘, msg.content.toString()); }, { noAck: true } ); } // 消费消息 rabbitmq.init().then((connection) => consumerDLX(connection));
分别执行消费者和生产者,可以看到 producer 在45秒发布了消息,consumer 是在55秒接收到的消息,实现了定时10秒种执行
参考
标签:await 任务 init create lan follow dead ges 类型
原文地址:https://www.cnblogs.com/xiaosongJiang/p/13043711.html