内容简介:开始找解决方案:上述的解决方案是在建立连接之后对连接添加或者连接出错以及之后的步骤出错都在10s之后重新调用方法本身实现重连
RabbitMQ
Node.js 客户端( AMQP 0-9-1 V0.5.2
)自动重连
重启策略
开始找解决方案:
-
通过查看AMQP的源码,发现没有reconnect的选项
-
然后上GitHub上看有没有人提出类似的问题 github repo ,通过输入
reconnect
搜索issue区找到题为 Support Auto-reconnection 的 issue ,发现这个问题是早在 2013 年就提出来的,可是现在还是 open的状态!
-
在这个Issue区发现有一个解决方案还是可以实践一下的:
function connectRMQ() { amqp.connect(config.rabbitmq.URI).then(function(conn) { conn.on('close', function() { console.error('Lost connection to RMQ. Reconnecting in 60 seconds...'); return setTimeout(connectRMQ, 60 * 1000); }); return conn.createChannel().then(function(ch) { var ok = ch.assertQueue(config.rabbitmq.queue, {durable: true}); ok = ok.then(function() { ch.prefetch(1); ch.consume(config.rabbitmq.queue, doWork, {noAck: false}); }); return ok.then(function() { console.log(" [*] Waiting in %s.", config.rabbitmq.queue); }); function doWork(msg) { var body = msg.content.toString(); console.log(" [x] Received %s", body); setTimeout(function() { ch.ack(msg); }, config.rabbitmq.timeout); } }); }).then(null, function() { setTimeout(connectRMQ, 10 * 1000); return console.log('connection failed'); }); } connectRMQ();
上述的解决方案是在建立连接之后对连接添加 close
的监听事件,当 close
事件触发,
或者连接出错以及之后的步骤出错都在10s之后重新调用方法本身实现重连
-
还有一种简单粗暴的方法,监听
close
、error
事件, 有错误就抛出来,然后依靠外部的守护程序将此客户端重启
实践
按照Issue区发现的解决方案进行实践,修改之前的代码
#!/usr/bin/env node const MQ_CONFIG = require('./conf/rabbitmq') const REDIS_CONFIG = require('./conf/redis') const Utils = require('./lib/Utils') const pubRedisCli = Utils.connectRedis(REDIS_CONFIG.url) var amqp = require('amqplib') var ex = MQ_CONFIG.ex var patten = MQ_CONFIG.routing_key var exType = MQ_CONFIG.ex_type var q = MQ_CONFIG.q || 'signals' var cnt = 0 function connect() { amqp .connect(MQ_CONFIG.url) .then(conn => { conn.on('close', e => { reconnect(e) }) return conn }) .then(conn => { cnt = 0 log('connect RMQ OK') console.log(' [*] Waiting for signals. To exit press CTRL+C') return conn.createChannel() }) .then(ch => { return ch.assertQueue(q, { durable: true }) .then(() => { return ch.assertExchange(ex, exType, { durable: true }) }) .then(() => { return ch.assertQueue(q, { durable: true }) }) .then(() => { return ch.bindQueue(q, ex, patten) }) .then(() => { return ch.consume(q, (msg) => { pubRedisCli.publish(msg.fields.routingKey, msg.content.toString(), function (err, reply) { if (err) { log(err) ch.nack(msg) } else { if (reply !== 0) { ch.ack(msg) } else { ch.ack(msg) saveUnSubscribeMsg(msg.content.toString()) } } }) }) }) }) .catch(e => { reconnect(e) }) } function reconnect(e) { log(e.message) log('lost RMQ connection') cnt++ log(`正在第${cnt}次重新连接RMQ...`) setTimeout(() => { connect() }, 10 * 1000) } connect() /** * if signals didn't be subscribed, they would be saved to ./data dir * @param {string} msg */ function saveUnSubscribeMsg(msg) { let date = new Date().toLocaleDateString() const fs = require('fs') const dir = './data' if (!fs.existsSync(dir)) { fs.mkdirSync(dir) } let path = `${dir}/${date}.txt` let isExist = fs.existsSync(path) if (isExist) { fs.appendFileSync(path, msg) } else { fs.writeFileSync(path, msg) } } function log(...args) { console.log(...args, new Date().toLocaleString()) }
然后进行测试:
通过对MQserver的重启,均正常,然后将MQserver的机器的网断掉测试,发现了close事件并没有监听到,而是报了heartbeat超时的错误,从而程序直接退出了,于是又在代码中加入对error事件的监听:
amqp .connect(MQ_CONFIG.url) .then(conn => { conn.on('error', e => { reconnect(e) }) conn.on('close', e => { reconnect(e) }) return conn })
这下应该不会导致程序退出了吧,然而又引入了新的问题,当重启MQserver时,报了 ECONNECTRET
的错误,两个监听事件都监听到了,所以程序重连了两次,导致一个项目在MQserver上建立了两个连接,当再一次重启MQserver时,建立了四个连接!
这是很严重的错误,然而并不是所有时候两个监听事件都能监听到,比如 heartbeat
超时就只报 error
的错误,所有需要想出一个策略,让程序始终与MQserver之间只有一个连接。
采用声明一个变量,记录是不是正在连接
var isConnecting = false
如果已经在连接了,其他的重连都不做处理
function reconnect(e) { if (!isConnecting) { isConnecting = true log(e.message) log('lost RMQ connection') cnt++ log(`正在第${cnt}次重新连接RMQ...`) setTimeout(() => { connect() }, 10 * 1000) } }
连接上时将重连的标志设为 false
.then(conn => { cnt = 0 log('connect RMQ OK') isConnecting = false
于是乎,完整代码如下:
#!/usr/bin/env node const MQ_CONFIG = require('./conf/rabbitmq') const REDIS_CONFIG = require('./conf/redis') const Utils = require('./lib/Utils') const pubRedisCli = Utils.connectRedis(REDIS_CONFIG.url) var amqp = require('amqplib') var ex = MQ_CONFIG.ex var patten = MQ_CONFIG.routing_key var exType = MQ_CONFIG.ex_type var q = MQ_CONFIG.q || 'signals' var cnt = 0 var isConnecting = false function connect() { amqp .connect(MQ_CONFIG.url) .then(conn => { conn.on('error', (e) => { reconnect(e) }) conn.on('close', e => { reconnect(e) }) return conn }) .then(conn => { cnt = 0 log('connect RMQ OK') isConnecting = false console.log(' [*] Waiting for signals. To exit press CTRL+C') return conn.createChannel() }) .then(ch => { return ch.assertQueue(q, { durable: true }) .then(() => { return ch.assertExchange(ex, exType, { durable: true }) }) .then(() => { return ch.assertQueue(q, { durable: true }) }) .then(() => { return ch.bindQueue(q, ex, patten) }) .then(() => { return ch.consume(q, (msg) => { pubRedisCli.publish(msg.fields.routingKey, msg.content.toString(), function (err, reply) { if (err) { log(err) ch.nack(msg) } else { if (reply !== 0) { ch.ack(msg) } else { ch.ack(msg) saveUnSubscribeMsg(msg.content.toString()) } } }) }) }) }) .catch(e => { isConnecting = false reconnect(e) }) } function reconnect(e) { if (!isConnecting) { isConnecting = true log(e.message) log('lost RMQ connection') cnt++ log(`正在第${cnt}次重新连接RMQ...`) setTimeout(() => { connect() }, 10 * 1000) } } connect() /** * if signals didn't be subscribed, they would be saved to ./data dir * @param {string} msg */ function saveUnSubscribeMsg(msg) { let date = new Date().toLocaleDateString() const fs = require('fs') const dir = './data' if (!fs.existsSync(dir)) { fs.mkdirSync(dir) } let path = `${dir}/${date}.txt` let isExist = fs.existsSync(path) if (isExist) { fs.appendFileSync(path, msg) } else { fs.writeFileSync(path, msg) } } function log(...args) { console.log(...args, new Date().toLocaleString()) }
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 支付宝客户端架构解析:iOS 客户端启动性能优化初探
- 自己动手做数据库客户端: BashSQL开源数据库客户端
- 支付宝客户端架构解析:Android 客户端启动速度优化之「垃圾回收」
- 客户端HTTP缓存
- 简析移动客户端安全
- 配置Hadoop集群客户端
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
高性能网站建设指南(第二版)
Steve Souders / 刘彦博 / 电子工业出版社 / 2015-5 / 55.00元
《高性能网站建设指南:前端工程师技能精髓》结合Web 2.0以来Web开发领域的最新形势和特点,介绍了网站性能问题的现状、产生的原因,以及改善或解决性能问题的原则、技术技巧和最佳实践。重点关注网页的行为特征,阐释优化Ajax、CSS、JavaScript、Flash和图片处理等要素的技术,全面涵盖浏览器端性能问题的方方面面。在《高性能网站建设指南:前端工程师技能精髓》中,作者给出了14条具体的优化......一起来看看 《高性能网站建设指南(第二版)》 这本书的介绍吧!