RabbitMQ三四事

栏目: 后端 · 发布时间: 6年前

内容简介:对于非常健壮稳定的后台系统,我们必须得考虑到各种宕机的情况:物理宕机,应用自身出错崩溃等,而这个时候我们的应用需要做到重启后数据依旧不丢失,这个问题就是在RabbitMQ中,如果要保证消息发送到需要注意的一点是,持久化会造成

数据的持久化

对于非常健壮稳定的后台系统,我们必须得考虑到各种宕机的情况:物理宕机,应用自身出错崩溃等,而这个时候我们的应用需要做到重启后数据依旧不丢失,这个问题就是 数据持久化 ,也就是说数据持久化到了磁盘。

在RabbitMQ中,如果要保证消息发送到 broker ,我们首先需要做到三点

  1. 持久化的 exchange (交换器):声明时开启 durable 选项
  2. 持久化的 queue (队列):声明时开启 durable 选项
  3. 持久化的 messagedelivery_mode 设置为2(php,python之类的库,2可以换成更友好的常量),在node的 amqp.node 库中是设置 persistenttrue

需要注意的一点是,持久化会造成 性能损耗 (写磁盘操作),但为了保证生产环境的数据 一致性 ,我们必须这么做。

发送消息的confirm机制

其实光光做到以上三点,数据依旧有丢失的可能,因为在客户端成功调用api存入消息之后,RabbitMQ还需要一段时间(很短,但不可忽略)才能落盘,RabbitMQ并不是为每条消息都做fsync的处理,可能仅仅保存到cache中而不是物理磁盘上,而在这段时间内RabbitMQ broker发生crash, 消息保存到cache但是还没来得及落盘,那么这些消息将会丢失。

为了解决以上问题,我们需要使用RabbitMQ的 生产者确认模式

为了开启确认模式,需要生产者将channel设置成confirm模式,一旦channel进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的, 那么确认消息会在将消息写入磁盘之后发出 ,broker回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号。

confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息 (来自参考1)

简单confirm示例

const QUEUE_NAME = 'notify_queue'
const EXCHANGE_NAME = 'notify_queue_exchange'
const config = require("./config")
const amqp = require('amqplib')

async function getMQConnection() {
    return await amqp.connect({
        protocol: 'amqp',
        hostname: config.MQ.host,
        port: config.MQ.port,
        username: config.MQ.user,
        password: config.MQ.pass,
        locale: 'en_US',
        frameMax: 0,
        heartbeat: 2, // 心跳
        vhost: config.MQ.vhost,
    })
}

async function run(rmqConn, msgArr) {
    try {
        const ch = await rmqConn.createConfirmChannel() // 开启confirm
        await ch.assertExchange(EXCHANGE_NAME, 'direct', { durable: true, autoDelete: false }) // 不存在exchange就新建exchange
        // queue name当routing key
        msgArr.forEach(str => {
            ch.publish(EXCHANGE_NAME, QUEUE_NAME, Buffer.from(str), { persistent: true })  // 没有绑定队列的情况下,也可以发送
        })
        await ch.waitForConfirms()
        console.log('发送批量数据成功')
        await ch.close()
    } catch(err) {
        // do something with err
        console.log('发送批量数据失败:' + err.message)
    }
}

async function testSendBatchMsg() {
    const conn = await getMQConnection()
    await run(conn, [
        'cat',
        'dog',
        'pig',
        'mouse',
        'mouse',
        'penguin'
    ])
    await conn.close()
}
testSendBatchMsg()

说明


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Growth Hacker Marketing

Growth Hacker Marketing

Ryan Holiday / Portfolio / 2013-9-3 / USD 10.31

Dropbox, Facebook, AirBnb, Twitter. A new generation of multibillion dollar brands built without spending a dime on “traditional marketing.” No press releases, no PR firms, and no billboards in Times ......一起来看看 《Growth Hacker Marketing》 这本书的介绍吧!

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

MD5 加密
MD5 加密

MD5 加密工具

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具