RabbitMQ三四事

栏目: 后端 · 发布时间: 5年前

内容简介:对于非常健壮稳定的后台系统,我们必须得考虑到各种宕机的情况:物理宕机,应用自身出错崩溃等,而这个时候我们的应用需要做到重启后数据依旧不丢失,这个问题就是在RabbitMQ中,如果要保证消息发送到需要注意的一点是,持久化会造成

数据的持久化

对于非常健壮稳定的后台系统,我们必须得考虑到各种宕机的情况:物理宕机,应用自身出错崩溃等,而这个时候我们的应用需要做到重启后数据依旧不丢失,这个问题就是 数据持久化 ,也就是说数据持久化到了磁盘。

在RabbitMQ中,如果要保证消息发送到 broker ,我们首先需要做到三点

  1. 持久化的 exchange (交换器):声明时开启 durable 选项
  2. 持久化的 queue (队列):声明时开启 durable 选项
  3. 持久化的 messagedelivery_mode 设置为2(php,python之类的库,2可以换成更友好的常量),在node的 amqp.node 库中是设置 persistenttrue

需要注意的一点是,持久化会造成 性能损耗 (写磁盘操作),但为了保证生产环境的数据 一致性 ,我们必须这么做。

发送消息的confirm机制

其实光光做到以上三点,数据依旧有丢失的可能,因为在客户端成功调用api存入消息之后,RabbitMQ还需要一段时间(很短,但不可忽略)才能落盘,RabbitMQ并不是为每条消息都做fsync的处理,可能仅仅保存到cache中而不是物理磁盘上,而在这段时间内RabbitMQ broker发生crash, 消息保存到cache但是还没来得及落盘,那么这些消息将会丢失。

为了解决以上问题,我们需要使用RabbitMQ的 生产者确认模式

为了开启确认模式,需要生产者将channel设置成confirm模式,一旦channel进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的, 那么确认消息会在将消息写入磁盘之后发出 ,broker回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号。

confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息 (来自参考1)

简单confirm示例

const QUEUE_NAME = 'notify_queue'
const EXCHANGE_NAME = 'notify_queue_exchange'
const config = require("./config")
const amqp = require('amqplib')

async function getMQConnection() {
    return await amqp.connect({
        protocol: 'amqp',
        hostname: config.MQ.host,
        port: config.MQ.port,
        username: config.MQ.user,
        password: config.MQ.pass,
        locale: 'en_US',
        frameMax: 0,
        heartbeat: 2, // 心跳
        vhost: config.MQ.vhost,
    })
}

async function run(rmqConn, msgArr) {
    try {
        const ch = await rmqConn.createConfirmChannel() // 开启confirm
        await ch.assertExchange(EXCHANGE_NAME, 'direct', { durable: true, autoDelete: false }) // 不存在exchange就新建exchange
        // queue name当routing key
        msgArr.forEach(str => {
            ch.publish(EXCHANGE_NAME, QUEUE_NAME, Buffer.from(str), { persistent: true })  // 没有绑定队列的情况下,也可以发送
        })
        await ch.waitForConfirms()
        console.log('发送批量数据成功')
        await ch.close()
    } catch(err) {
        // do something with err
        console.log('发送批量数据失败:' + err.message)
    }
}

async function testSendBatchMsg() {
    const conn = await getMQConnection()
    await run(conn, [
        'cat',
        'dog',
        'pig',
        'mouse',
        'mouse',
        'penguin'
    ])
    await conn.close()
}
testSendBatchMsg()

说明


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

深度探索C++对象模型

深度探索C++对象模型

斯坦利•B.李普曼 (Stanley B. Lippman) / 侯捷 / 电子工业出版社 / 2012-1 / 69.00元

作者Lippman参与设计了全世界第一套C++编译程序cfront,这本书就是一位伟大的C++编译程序设计者向你阐述他如何处理各种explicit(明确出现于C++程序代码中)和implicit(隐藏于程序代码背后)的C++语意。 本书专注于C++面向对象程序设计的底层机制,包括结构式语意、临时性对象的生成、封装、继承,以及虚拟——虚拟函数和虚拟继承。这本书让你知道:一旦你能够了解底层实现模......一起来看看 《深度探索C++对象模型》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码