rabbitmq客户端自动重连

栏目: Redis · 发布时间: 6年前

内容简介:开始找解决方案:上述的解决方案是在建立连接之后对连接添加或者连接出错以及之后的步骤出错都在10s之后重新调用方法本身实现重连
编程rookie, 如有错误请指出 ☞: 253065903@qq.com

RabbitMQ Node.js 客户端( AMQP 0-9-1 V0.5.2 )自动重连

重启策略

开始找解决方案:

  1. 通过查看AMQP的源码,发现没有reconnect的选项

  2. 然后上GitHub上看有没有人提出类似的问题 github repo ,通过输入 reconnect 搜索issue区找到题为 Support Auto-reconnectionissue ,发现这个问题是早在 2013 年就提出来的,可是现在还是 open

    的状态!

  3. 在这个Issue区发现有一个解决方案还是可以实践一下的:

    function connectRMQ() {
      amqp.connect(config.rabbitmq.URI).then(function(conn) {
        conn.on('close', function() {
          console.error('Lost connection to RMQ.  Reconnecting in 60 seconds...');
          return setTimeout(connectRMQ, 60 * 1000);
        });
        return conn.createChannel().then(function(ch) {
            var ok = ch.assertQueue(config.rabbitmq.queue, {durable: true});
            ok = ok.then(function() {
                ch.prefetch(1);
                ch.consume(config.rabbitmq.queue, doWork, {noAck: false});
            });
            return ok.then(function() {
                console.log(" [*] Waiting in %s.", config.rabbitmq.queue);
            });
    
            function doWork(msg) {
                var body = msg.content.toString();
                console.log(" [x] Received %s", body);
                setTimeout(function() {
                    ch.ack(msg);
                }, config.rabbitmq.timeout);
            }
        });
      }).then(null, function() {
         setTimeout(connectRMQ, 10 * 1000);
         return console.log('connection failed');
      });
    }
    
    connectRMQ();
    

上述的解决方案是在建立连接之后对连接添加 close 的监听事件,当 close 事件触发,

或者连接出错以及之后的步骤出错都在10s之后重新调用方法本身实现重连

  1. 还有一种简单粗暴的方法,监听 closeerror 事件, 有错误就抛出来,然后依靠外部的守护程序将此客户端重启

实践

按照Issue区发现的解决方案进行实践,修改之前的代码

#!/usr/bin/env node

const MQ_CONFIG = require('./conf/rabbitmq')
const REDIS_CONFIG = require('./conf/redis')
const Utils = require('./lib/Utils')
const pubRedisCli = Utils.connectRedis(REDIS_CONFIG.url)
var amqp = require('amqplib')
var ex = MQ_CONFIG.ex
var patten = MQ_CONFIG.routing_key
var exType = MQ_CONFIG.ex_type
var q = MQ_CONFIG.q || 'signals'
var cnt = 0

function connect() {
  amqp
    .connect(MQ_CONFIG.url)
    .then(conn => {
      conn.on('close', e => {
        reconnect(e)
      })
      return conn
    })
    .then(conn => {
      cnt = 0
      log('connect RMQ OK')
      console.log(' [*] Waiting for signals. To exit press CTRL+C')
      return conn.createChannel()
    })
    .then(ch => {
      return ch.assertQueue(q, {
        durable: true
      })
        .then(() => {
          return ch.assertExchange(ex, exType, {
            durable: true
          })
        })
        .then(() => {
          return ch.assertQueue(q, {
            durable: true
          })
        })
        .then(() => {
          return ch.bindQueue(q, ex, patten)
        })
        .then(() => {
          return ch.consume(q, (msg) => {
            pubRedisCli.publish(msg.fields.routingKey, msg.content.toString(), function (err, reply) {
              if (err) {
                log(err)
                ch.nack(msg)
              } else {
                if (reply !== 0) {
                  ch.ack(msg)
                } else {
                  ch.ack(msg)
                  saveUnSubscribeMsg(msg.content.toString())
                }
              }
            })
          })
        })
    })
    .catch(e => {
      reconnect(e)
    })
}

function reconnect(e) {
    log(e.message)
    log('lost RMQ connection')
    cnt++
    log(`正在第${cnt}次重新连接RMQ...`)
    setTimeout(() => {
        connect()
    }, 10 * 1000)
}
connect()
/**
 * if signals didn't be subscribed, they would be saved to ./data dir
 * @param {string} msg
 */
function saveUnSubscribeMsg(msg) {
  let date = new Date().toLocaleDateString()
  const fs = require('fs')
  const dir = './data'
  if (!fs.existsSync(dir)) {
    fs.mkdirSync(dir)
  }
  let path = `${dir}/${date}.txt`
  let isExist = fs.existsSync(path)
  if (isExist) {
    fs.appendFileSync(path, msg)
  } else {
    fs.writeFileSync(path, msg)
  }
}

function log(...args) {
  console.log(...args, new Date().toLocaleString())
}

然后进行测试:

通过对MQserver的重启,均正常,然后将MQserver的机器的网断掉测试,发现了close事件并没有监听到,而是报了heartbeat超时的错误,从而程序直接退出了,于是又在代码中加入对error事件的监听:

amqp
  .connect(MQ_CONFIG.url)
  .then(conn => {
    conn.on('error', e => {
      reconnect(e)
    })
    conn.on('close', e => {
      reconnect(e)
    })
    return conn
  })

这下应该不会导致程序退出了吧,然而又引入了新的问题,当重启MQserver时,报了 ECONNECTRET 的错误,两个监听事件都监听到了,所以程序重连了两次,导致一个项目在MQserver上建立了两个连接,当再一次重启MQserver时,建立了四个连接!

这是很严重的错误,然而并不是所有时候两个监听事件都能监听到,比如 heartbeat 超时就只报 error 的错误,所有需要想出一个策略,让程序始终与MQserver之间只有一个连接。

采用声明一个变量,记录是不是正在连接

var isConnecting = false

如果已经在连接了,其他的重连都不做处理

function reconnect(e) {
  if (!isConnecting) {
    isConnecting = true
    log(e.message)
    log('lost RMQ connection')
    cnt++
    log(`正在第${cnt}次重新连接RMQ...`)
    setTimeout(() => {
      connect()
    }, 10 * 1000)
  }
}

连接上时将重连的标志设为 false

.then(conn => {
  cnt = 0
  log('connect RMQ OK')
  isConnecting = false

于是乎,完整代码如下:

#!/usr/bin/env node

const MQ_CONFIG = require('./conf/rabbitmq')
const REDIS_CONFIG = require('./conf/redis')
const Utils = require('./lib/Utils')
const pubRedisCli = Utils.connectRedis(REDIS_CONFIG.url)
var amqp = require('amqplib')
var ex = MQ_CONFIG.ex
var patten = MQ_CONFIG.routing_key
var exType = MQ_CONFIG.ex_type
var q = MQ_CONFIG.q || 'signals'
var cnt = 0
var isConnecting = false

function connect() {
  amqp
    .connect(MQ_CONFIG.url)
    .then(conn => {
      conn.on('error', (e) => {
        reconnect(e)
      })
      conn.on('close', e => {
        reconnect(e)
      })
      return conn
    })
    .then(conn => {
      cnt = 0
      log('connect RMQ OK')
      isConnecting = false
      console.log(' [*] Waiting for signals. To exit press CTRL+C')
      return conn.createChannel()
    })
    .then(ch => {
      return ch.assertQueue(q, {
        durable: true
      })
        .then(() => {
          return ch.assertExchange(ex, exType, {
            durable: true
          })
        })
        .then(() => {
          return ch.assertQueue(q, {
            durable: true
          })
        })
        .then(() => {
          return ch.bindQueue(q, ex, patten)
        })
        .then(() => {
          return ch.consume(q, (msg) => {
            pubRedisCli.publish(msg.fields.routingKey, msg.content.toString(), function (err, reply) {
              if (err) {
                log(err)
                ch.nack(msg)
              } else {
                if (reply !== 0) {
                  ch.ack(msg)
                } else {
                  ch.ack(msg)
                  saveUnSubscribeMsg(msg.content.toString())
                }
              }
            })
          })
        })
    })
    .catch(e => {
      isConnecting = false
      reconnect(e)
    })
}

function reconnect(e) {
  if (!isConnecting) {
    isConnecting = true
    log(e.message)
    log('lost RMQ connection')
    cnt++
    log(`正在第${cnt}次重新连接RMQ...`)
    setTimeout(() => {
      connect()
    }, 10 * 1000)
  }
}
connect()
/**
 * if signals didn't be subscribed, they would be saved to ./data dir
 * @param {string} msg
 */
function saveUnSubscribeMsg(msg) {
  let date = new Date().toLocaleDateString()
  const fs = require('fs')
  const dir = './data'
  if (!fs.existsSync(dir)) {
    fs.mkdirSync(dir)
  }
  let path = `${dir}/${date}.txt`
  let isExist = fs.existsSync(path)
  if (isExist) {
    fs.appendFileSync(path, msg)
  } else {
    fs.writeFileSync(path, msg)
  }
}

function log(...args) {
  console.log(...args, new Date().toLocaleString())
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

高性能网站建设指南(第二版)

高性能网站建设指南(第二版)

Steve Souders / 刘彦博 / 电子工业出版社 / 2015-5 / 55.00元

《高性能网站建设指南:前端工程师技能精髓》结合Web 2.0以来Web开发领域的最新形势和特点,介绍了网站性能问题的现状、产生的原因,以及改善或解决性能问题的原则、技术技巧和最佳实践。重点关注网页的行为特征,阐释优化Ajax、CSS、JavaScript、Flash和图片处理等要素的技术,全面涵盖浏览器端性能问题的方方面面。在《高性能网站建设指南:前端工程师技能精髓》中,作者给出了14条具体的优化......一起来看看 《高性能网站建设指南(第二版)》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码