内容简介:在消息队列使用场景中,有时需要同时下发多条消息,但现在的消息队列比如kafka只支持单条消息的事务保证,不能保证多条消息,今天说的这个方案就时kafka内部的一个子项目中基于2PC和延迟更新来实现分布式事务2PC俗称两阶段提交,通过将一个操作分为两个阶段:准备阶段和提交阶段来尽可能保证操作的原子执行(实际上不可能,大家有个概念先)延迟更新其实是一个很常用的技术手段,简单来说,当某个操作条件不满足时,通过一定手段将数据暂存,等条件满足时在进行执行
在消息队列使用场景中,有时需要同时下发多条消息,但现在的消息队列比如kafka只支持单条消息的事务保证,不能保证多条消息,今天说的这个方案就时kafka内部的一个子项目中基于2PC和延迟更新来实现分布式事务
2PC
2PC俗称两阶段提交,通过将一个操作分为两个阶段:准备阶段和提交阶段来尽可能保证操作的原子执行(实际上不可能,大家有个概念先)
延迟更新
延迟更新其实是一个很常用的技术手段,简单来说,当某个操作条件不满足时,通过一定手段将数据暂存,等条件满足时在进行执行
基于2PC和延迟队列的分布式事务实现
系统架构
实现也蛮简单的, 在原来的业务消息之后再添加一条事务消息(事务消息可以通过类似唯一ID来关联到之前提交的消息), worker未消费到事物提交的消息,就会一直将消息放在本地延迟存储中,只有当接收到事物提交消息,才会进行业务逻辑处理
业务流程
生产者
- 逐条发送业务消息组
- 发送事务提交消息
消费者
- 消费消息队列,将业务消息存放本地延迟存储
- 接收提交事务消息,从本地延迟存储获取所有数据,然后从延迟存储中删除该消息
代码实现
核心组件
MemoryQuue: 用于模拟消息队列,接收事件分发事件 Worker: 模拟具体业务服务,接收消息,存入本地延迟更新存储,或者提交事务触发业务回调
Event与EventListener
Event: 用于标识事件,用户将业务数据封装成事件存入到MemoryQueue中 EventListener: 事件回调接口,用于MemoryQueue接收到数据后的回调 事件在发送的时候,需要通过一个前缀来进行事件类型标识,这里有三种TaskPrefix、CommitTaskPrefix、ClearTaskPrefix
const ( // TaskPrefix 任务key前缀 TaskPrefix string = "task-" // CommitTaskPrefix 提交任务key前缀 CommitTaskPrefix string = "commit-" // ClearTaskPrefix 清除任务 ClearTaskPrefix string = "clear-" ) // Event 事件类型 type Event struct { Key string Name string Value interface{} } // EventListener 用于接收消息回调 type EventListener interface { onEvent(event *Event) } 复制代码
MemoryQueue
MemoryQueue内存消息队列,通过Push接口接收用户数据,通过AddListener来注册EventListener, 同时内部通过poll来从chan event取出数据分发给所有的Listener
// MemoryQueue 内存消息队列 type MemoryQueue struct { done chan struct{} queue chan Event listeners []EventListener wg sync.WaitGroup } // Push 添加数据 func (mq *MemoryQueue) Push(eventType, name string, value interface{}) { mq.queue <- Event{Key: eventType + name, Name: name, Value: value} mq.wg.Add(1) } // AddListener 添加监听器 func (mq *MemoryQueue) AddListener(listener EventListener) bool { for _, item := range mq.listeners { if item == listener { return false } } mq.listeners = append(mq.listeners, listener) return true } // Notify 分发消息 func (mq *MemoryQueue) Notify(event *Event) { defer mq.wg.Done() for _, listener := range mq.listeners { listener.onEvent(event) } } func (mq *MemoryQueue) poll() { for { select { case <-mq.done: break case event := <-mq.queue: mq.Notify(&event) } } } // Start 启动内存队列 func (mq *MemoryQueue) Start() { go mq.poll() } // Stop 停止内存队列 func (mq *MemoryQueue) Stop() { mq.wg.Wait() close(mq.done) } 复制代码
Worker
Worker接收MemoryQueue里面的数据,然后在本地根据不同类型来进行对应事件事件类型处理, 主要是通过事件的前缀来进行对应事件回调函数的选择
// Worker 工作进程 type Worker struct { name string deferredTaskUpdates map[string][]Task onCommit ConfigUpdateCallback } func (w *Worker) onEvent(event *Event) { switch { // 获取任务事件 case strings.Contains(event.Key, TaskPrefix): w.onTaskEvent(event) // 清除本地延迟队列里面的任务 case strings.Contains(event.Key, ClearTaskPrefix): w.onTaskClear(event) // 获取commit事件 case strings.Contains(event.Key, CommitTaskPrefix): w.onTaskCommit(event) } } 复制代码
事件处理任务
事件处理任务主要分为:onTaskClear(从本地清楚该数据)、onTaskEvent(数据存储本地延迟存储进行暂存)、onTaskCommit(事务提交)
func (w *Worker) onTaskClear(event *Event) { task, err := event.Value.(Task) if !err { // log return } _, found := w.deferredTaskUpdates[task.Group] if !found { return } delete(w.deferredTaskUpdates, task.Group) // 还可以继续停止本地已经启动的任务 } // onTaskCommit 接收任务提交, 从延迟队列中取出数据然后进行业务逻辑处理 func (w *Worker) onTaskCommit(event *Event) { // 获取之前本地接收的所有任务 tasks, found := w.deferredTaskUpdates[event.Name] if !found { return } // 获取配置 config := w.getTasksConfig(tasks) if w.onCommit != nil { w.onCommit(config) } delete(w.deferredTaskUpdates, event.Name) } // onTaskEvent 接收任务数据,此时需要丢到本地暂存不能进行应用 func (w *Worker) onTaskEvent(event *Event) { task, err := event.Value.(Task) if !err { // log return } // 保存任务到延迟更新map configs, found := w.deferredTaskUpdates[task.Group] if !found { configs = make([]Task, 0) } configs = append(configs, task) w.deferredTaskUpdates[task.Group] = configs } // getTasksConfig 获取task任务列表 func (w *Worker) getTasksConfig(tasks []Task) map[string]string { config := make(map[string]string) for _, t := range tasks { config = t.updateConfig(config) } return config } 复制代码
主流程
unc main() { // 生成一个内存队列启动 queue := NewMemoryQueue(10) queue.Start() // 生成一个worker name := "test" worker := NewWorker(name, func(data map[string]string) { for key, value := range data { println("worker get task key: " + key + " value: " + value) } }) // 注册到队列中 queue.AddListener(worker) taskName := "test" // events 发送的任务事件 configs := []map[string]string{ map[string]string{"task1": "SendEmail", "params1": "Hello world"}, map[string]string{"task2": "SendMQ", "params2": "Hello world"}, } // 分发任务 queue.Push(ClearTaskPrefix, taskName, nil) for _, conf := range configs { queue.Push(TaskPrefix, taskName, Task{Name: taskName, Group: taskName, Config: conf}) } queue.Push(CommitTaskPrefix, taskName, nil) // 停止队列 queue.Stop() } 复制代码
输出
# go run main.go worker get task key: params1 value: Hello world worker get task key: task1 value: SendEmail worker get task key: params2 value: Hello world worker get task key: task2 value: SendMQ 复制代码
总结
在分布式环境中,很多时候并不需要使用CP模型,更多时候是满足最终一致性即可
基于2PC和延迟队列的这种设计,主要是依赖于事件驱动的架构
在kafka connect中, 每次节点变化都会触发一次任务的重分配,所以延迟存储直接用的就是内存中的HashMap, 因为即使分配消息的主节点挂了,那就再触发一次事件,直接将HashMap里面的数据清掉,进行下一次事务即可,并不需要保证延迟存储里面的数据不丢,
所以方案因环境、需求不同,可以做一些取舍,没必要什么东西都去加一个CP模型的中间件进来,当然其实那样更简单
未完待续!
更多文章可以访问www.sreguide.com/
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 什么是分布式队列?
- 分布式之消息队列复习精讲
- 分布式任务队列Celery使用说明
- 实力分享,聚焦分布式高可用消息队列
- 基于消息队列的分布式事务解决方案
- Golang 分布式异步任务队列 Machinery 教程
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Text Processing in Python
David Mertz / Addison-Wesley Professional / 2003-6-12 / USD 54.99
Text Processing in Python describes techniques for manipulation of text using the Python programming language. At the broadest level, text processing is simply taking textual information and doing som......一起来看看 《Text Processing in Python》 这本书的介绍吧!