rabbitmq客户端自动重连

栏目: Redis · 发布时间: 6年前

内容简介:开始找解决方案:上述的解决方案是在建立连接之后对连接添加或者连接出错以及之后的步骤出错都在10s之后重新调用方法本身实现重连
编程rookie, 如有错误请指出 ☞: 253065903@qq.com

RabbitMQ Node.js 客户端( AMQP 0-9-1 V0.5.2 )自动重连

重启策略

开始找解决方案:

  1. 通过查看AMQP的源码,发现没有reconnect的选项

  2. 然后上GitHub上看有没有人提出类似的问题 github repo ,通过输入 reconnect 搜索issue区找到题为 Support Auto-reconnectionissue ,发现这个问题是早在 2013 年就提出来的,可是现在还是 open

    的状态!

  3. 在这个Issue区发现有一个解决方案还是可以实践一下的:

    function connectRMQ() {
      amqp.connect(config.rabbitmq.URI).then(function(conn) {
        conn.on('close', function() {
          console.error('Lost connection to RMQ.  Reconnecting in 60 seconds...');
          return setTimeout(connectRMQ, 60 * 1000);
        });
        return conn.createChannel().then(function(ch) {
            var ok = ch.assertQueue(config.rabbitmq.queue, {durable: true});
            ok = ok.then(function() {
                ch.prefetch(1);
                ch.consume(config.rabbitmq.queue, doWork, {noAck: false});
            });
            return ok.then(function() {
                console.log(" [*] Waiting in %s.", config.rabbitmq.queue);
            });
    
            function doWork(msg) {
                var body = msg.content.toString();
                console.log(" [x] Received %s", body);
                setTimeout(function() {
                    ch.ack(msg);
                }, config.rabbitmq.timeout);
            }
        });
      }).then(null, function() {
         setTimeout(connectRMQ, 10 * 1000);
         return console.log('connection failed');
      });
    }
    
    connectRMQ();
    

上述的解决方案是在建立连接之后对连接添加 close 的监听事件,当 close 事件触发,

或者连接出错以及之后的步骤出错都在10s之后重新调用方法本身实现重连

  1. 还有一种简单粗暴的方法,监听 closeerror 事件, 有错误就抛出来,然后依靠外部的守护程序将此客户端重启

实践

按照Issue区发现的解决方案进行实践,修改之前的代码

#!/usr/bin/env node

const MQ_CONFIG = require('./conf/rabbitmq')
const REDIS_CONFIG = require('./conf/redis')
const Utils = require('./lib/Utils')
const pubRedisCli = Utils.connectRedis(REDIS_CONFIG.url)
var amqp = require('amqplib')
var ex = MQ_CONFIG.ex
var patten = MQ_CONFIG.routing_key
var exType = MQ_CONFIG.ex_type
var q = MQ_CONFIG.q || 'signals'
var cnt = 0

function connect() {
  amqp
    .connect(MQ_CONFIG.url)
    .then(conn => {
      conn.on('close', e => {
        reconnect(e)
      })
      return conn
    })
    .then(conn => {
      cnt = 0
      log('connect RMQ OK')
      console.log(' [*] Waiting for signals. To exit press CTRL+C')
      return conn.createChannel()
    })
    .then(ch => {
      return ch.assertQueue(q, {
        durable: true
      })
        .then(() => {
          return ch.assertExchange(ex, exType, {
            durable: true
          })
        })
        .then(() => {
          return ch.assertQueue(q, {
            durable: true
          })
        })
        .then(() => {
          return ch.bindQueue(q, ex, patten)
        })
        .then(() => {
          return ch.consume(q, (msg) => {
            pubRedisCli.publish(msg.fields.routingKey, msg.content.toString(), function (err, reply) {
              if (err) {
                log(err)
                ch.nack(msg)
              } else {
                if (reply !== 0) {
                  ch.ack(msg)
                } else {
                  ch.ack(msg)
                  saveUnSubscribeMsg(msg.content.toString())
                }
              }
            })
          })
        })
    })
    .catch(e => {
      reconnect(e)
    })
}

function reconnect(e) {
    log(e.message)
    log('lost RMQ connection')
    cnt++
    log(`正在第${cnt}次重新连接RMQ...`)
    setTimeout(() => {
        connect()
    }, 10 * 1000)
}
connect()
/**
 * if signals didn't be subscribed, they would be saved to ./data dir
 * @param {string} msg
 */
function saveUnSubscribeMsg(msg) {
  let date = new Date().toLocaleDateString()
  const fs = require('fs')
  const dir = './data'
  if (!fs.existsSync(dir)) {
    fs.mkdirSync(dir)
  }
  let path = `${dir}/${date}.txt`
  let isExist = fs.existsSync(path)
  if (isExist) {
    fs.appendFileSync(path, msg)
  } else {
    fs.writeFileSync(path, msg)
  }
}

function log(...args) {
  console.log(...args, new Date().toLocaleString())
}

然后进行测试:

通过对MQserver的重启,均正常,然后将MQserver的机器的网断掉测试,发现了close事件并没有监听到,而是报了heartbeat超时的错误,从而程序直接退出了,于是又在代码中加入对error事件的监听:

amqp
  .connect(MQ_CONFIG.url)
  .then(conn => {
    conn.on('error', e => {
      reconnect(e)
    })
    conn.on('close', e => {
      reconnect(e)
    })
    return conn
  })

这下应该不会导致程序退出了吧,然而又引入了新的问题,当重启MQserver时,报了 ECONNECTRET 的错误,两个监听事件都监听到了,所以程序重连了两次,导致一个项目在MQserver上建立了两个连接,当再一次重启MQserver时,建立了四个连接!

这是很严重的错误,然而并不是所有时候两个监听事件都能监听到,比如 heartbeat 超时就只报 error 的错误,所有需要想出一个策略,让程序始终与MQserver之间只有一个连接。

采用声明一个变量,记录是不是正在连接

var isConnecting = false

如果已经在连接了,其他的重连都不做处理

function reconnect(e) {
  if (!isConnecting) {
    isConnecting = true
    log(e.message)
    log('lost RMQ connection')
    cnt++
    log(`正在第${cnt}次重新连接RMQ...`)
    setTimeout(() => {
      connect()
    }, 10 * 1000)
  }
}

连接上时将重连的标志设为 false

.then(conn => {
  cnt = 0
  log('connect RMQ OK')
  isConnecting = false

于是乎,完整代码如下:

#!/usr/bin/env node

const MQ_CONFIG = require('./conf/rabbitmq')
const REDIS_CONFIG = require('./conf/redis')
const Utils = require('./lib/Utils')
const pubRedisCli = Utils.connectRedis(REDIS_CONFIG.url)
var amqp = require('amqplib')
var ex = MQ_CONFIG.ex
var patten = MQ_CONFIG.routing_key
var exType = MQ_CONFIG.ex_type
var q = MQ_CONFIG.q || 'signals'
var cnt = 0
var isConnecting = false

function connect() {
  amqp
    .connect(MQ_CONFIG.url)
    .then(conn => {
      conn.on('error', (e) => {
        reconnect(e)
      })
      conn.on('close', e => {
        reconnect(e)
      })
      return conn
    })
    .then(conn => {
      cnt = 0
      log('connect RMQ OK')
      isConnecting = false
      console.log(' [*] Waiting for signals. To exit press CTRL+C')
      return conn.createChannel()
    })
    .then(ch => {
      return ch.assertQueue(q, {
        durable: true
      })
        .then(() => {
          return ch.assertExchange(ex, exType, {
            durable: true
          })
        })
        .then(() => {
          return ch.assertQueue(q, {
            durable: true
          })
        })
        .then(() => {
          return ch.bindQueue(q, ex, patten)
        })
        .then(() => {
          return ch.consume(q, (msg) => {
            pubRedisCli.publish(msg.fields.routingKey, msg.content.toString(), function (err, reply) {
              if (err) {
                log(err)
                ch.nack(msg)
              } else {
                if (reply !== 0) {
                  ch.ack(msg)
                } else {
                  ch.ack(msg)
                  saveUnSubscribeMsg(msg.content.toString())
                }
              }
            })
          })
        })
    })
    .catch(e => {
      isConnecting = false
      reconnect(e)
    })
}

function reconnect(e) {
  if (!isConnecting) {
    isConnecting = true
    log(e.message)
    log('lost RMQ connection')
    cnt++
    log(`正在第${cnt}次重新连接RMQ...`)
    setTimeout(() => {
      connect()
    }, 10 * 1000)
  }
}
connect()
/**
 * if signals didn't be subscribed, they would be saved to ./data dir
 * @param {string} msg
 */
function saveUnSubscribeMsg(msg) {
  let date = new Date().toLocaleDateString()
  const fs = require('fs')
  const dir = './data'
  if (!fs.existsSync(dir)) {
    fs.mkdirSync(dir)
  }
  let path = `${dir}/${date}.txt`
  let isExist = fs.existsSync(path)
  if (isExist) {
    fs.appendFileSync(path, msg)
  } else {
    fs.writeFileSync(path, msg)
  }
}

function log(...args) {
  console.log(...args, new Date().toLocaleString())
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

马云现象的经济学分析:互联网经济的八个关键命题

马云现象的经济学分析:互联网经济的八个关键命题

胡晓鹏 / 上海社会科学院出版社 / 2016-11-1 / CNY 68.00

互联网经济的产生、发展与扩张,在冲击传统经济理论观点的同时,也彰显了自身理论体系的独特内核,并与那种立足于工业经济时代的经典理论发生显著分野。今天看来,“马云”们的成功是中国经济长期“重制造、轻服务,重产能、轻消费,重国有、轻民营”发展逻辑的结果。但互联网经济的发展却不应仅仅止步于商业技巧的翻新,还需要在理论上进行一番审慎的思考。对此,我们不禁要问:互联网经济驱动交易发生的机理是什么?用户基数和诚......一起来看看 《马云现象的经济学分析:互联网经济的八个关键命题》 这本书的介绍吧!

MD5 加密
MD5 加密

MD5 加密工具

SHA 加密
SHA 加密

SHA 加密工具

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具