基于2PC和延迟更新完成分布式消息队列多条事务Golang版本

栏目: Go · 发布时间: 5年前

内容简介:在消息队列使用场景中,有时需要同时下发多条消息,但现在的消息队列比如kafka只支持单条消息的事务保证,不能保证多条消息,今天说的这个方案就时kafka内部的一个子项目中基于2PC和延迟更新来实现分布式事务2PC俗称两阶段提交,通过将一个操作分为两个阶段:准备阶段和提交阶段来尽可能保证操作的原子执行(实际上不可能,大家有个概念先)延迟更新其实是一个很常用的技术手段,简单来说,当某个操作条件不满足时,通过一定手段将数据暂存,等条件满足时在进行执行
基于2PC和延迟更新完成分布式消息队列多条事务Golang版本

在消息队列使用场景中,有时需要同时下发多条消息,但现在的消息队列比如kafka只支持单条消息的事务保证,不能保证多条消息,今天说的这个方案就时kafka内部的一个子项目中基于2PC和延迟更新来实现分布式事务

2PC

基于2PC和延迟更新完成分布式消息队列多条事务Golang版本

2PC俗称两阶段提交,通过将一个操作分为两个阶段:准备阶段和提交阶段来尽可能保证操作的原子执行(实际上不可能,大家有个概念先)

延迟更新

基于2PC和延迟更新完成分布式消息队列多条事务Golang版本

延迟更新其实是一个很常用的技术手段,简单来说,当某个操作条件不满足时,通过一定手段将数据暂存,等条件满足时在进行执行

基于2PC和延迟队列的分布式事务实现

系统架构

基于2PC和延迟更新完成分布式消息队列多条事务Golang版本

实现也蛮简单的, 在原来的业务消息之后再添加一条事务消息(事务消息可以通过类似唯一ID来关联到之前提交的消息), worker未消费到事物提交的消息,就会一直将消息放在本地延迟存储中,只有当接收到事物提交消息,才会进行业务逻辑处理

业务流程

生产者

  1. 逐条发送业务消息组
  2. 发送事务提交消息

消费者

  1. 消费消息队列,将业务消息存放本地延迟存储
  2. 接收提交事务消息,从本地延迟存储获取所有数据,然后从延迟存储中删除该消息

代码实现

核心组件

基于2PC和延迟更新完成分布式消息队列多条事务Golang版本

MemoryQuue: 用于模拟消息队列,接收事件分发事件 Worker: 模拟具体业务服务,接收消息,存入本地延迟更新存储,或者提交事务触发业务回调

Event与EventListener

Event: 用于标识事件,用户将业务数据封装成事件存入到MemoryQueue中 EventListener: 事件回调接口,用于MemoryQueue接收到数据后的回调 事件在发送的时候,需要通过一个前缀来进行事件类型标识,这里有三种TaskPrefix、CommitTaskPrefix、ClearTaskPrefix

const (
	// TaskPrefix 任务key前缀
	TaskPrefix string = "task-"
	// CommitTaskPrefix 提交任务key前缀
	CommitTaskPrefix string = "commit-"
	// ClearTaskPrefix 清除任务
	ClearTaskPrefix string = "clear-"
)

// Event 事件类型
type Event struct {
	Key   string
	Name  string
	Value interface{}
}

// EventListener 用于接收消息回调
type EventListener interface {
	onEvent(event *Event)
}
复制代码

MemoryQueue

MemoryQueue内存消息队列,通过Push接口接收用户数据,通过AddListener来注册EventListener, 同时内部通过poll来从chan event取出数据分发给所有的Listener

// MemoryQueue 内存消息队列
type MemoryQueue struct {
	done      chan struct{}
	queue     chan Event
	listeners []EventListener
	wg        sync.WaitGroup
}

// Push 添加数据
func (mq *MemoryQueue) Push(eventType, name string, value interface{}) {
	mq.queue <- Event{Key: eventType + name, Name: name, Value: value}
	mq.wg.Add(1)
}

// AddListener 添加监听器
func (mq *MemoryQueue) AddListener(listener EventListener) bool {
	for _, item := range mq.listeners {
		if item == listener {
			return false
		}
	}
	mq.listeners = append(mq.listeners, listener)
	return true
}

// Notify 分发消息
func (mq *MemoryQueue) Notify(event *Event) {
	defer mq.wg.Done()
	for _, listener := range mq.listeners {
		listener.onEvent(event)
	}
}

func (mq *MemoryQueue) poll() {
	for {
		select {
		case <-mq.done:
			break
		case event := <-mq.queue:
			mq.Notify(&event)
		}
	}
}

// Start 启动内存队列
func (mq *MemoryQueue) Start() {
	go mq.poll()
}

// Stop 停止内存队列
func (mq *MemoryQueue) Stop() {
	mq.wg.Wait()
	close(mq.done)
}
复制代码

Worker

Worker接收MemoryQueue里面的数据,然后在本地根据不同类型来进行对应事件事件类型处理, 主要是通过事件的前缀来进行对应事件回调函数的选择

// Worker 工作进程
type Worker struct {
	name                string
	deferredTaskUpdates map[string][]Task
	onCommit            ConfigUpdateCallback
}

func (w *Worker) onEvent(event *Event) {
	switch {
	// 获取任务事件
	case strings.Contains(event.Key, TaskPrefix):
		w.onTaskEvent(event)
		// 清除本地延迟队列里面的任务
	case strings.Contains(event.Key, ClearTaskPrefix):
		w.onTaskClear(event)
		// 获取commit事件
	case strings.Contains(event.Key, CommitTaskPrefix):
		w.onTaskCommit(event)
	}
}
复制代码

事件处理任务

事件处理任务主要分为:onTaskClear(从本地清楚该数据)、onTaskEvent(数据存储本地延迟存储进行暂存)、onTaskCommit(事务提交)

func (w *Worker) onTaskClear(event *Event) {
	task, err := event.Value.(Task)
	if !err {
		// log
		return
	}
	_, found := w.deferredTaskUpdates[task.Group]
	if !found {
		return
	}
	delete(w.deferredTaskUpdates, task.Group)
	// 还可以继续停止本地已经启动的任务
}

// onTaskCommit 接收任务提交, 从延迟队列中取出数据然后进行业务逻辑处理
func (w *Worker) onTaskCommit(event *Event) {
	// 获取之前本地接收的所有任务
	tasks, found := w.deferredTaskUpdates[event.Name]
	if !found {
		return
	}

	// 获取配置
	config := w.getTasksConfig(tasks)
	if w.onCommit != nil {
		w.onCommit(config)
	}
	delete(w.deferredTaskUpdates, event.Name)
}

// onTaskEvent 接收任务数据,此时需要丢到本地暂存不能进行应用
func (w *Worker) onTaskEvent(event *Event) {
	task, err := event.Value.(Task)
	if !err {
		// log
		return
	}

	// 保存任务到延迟更新map
	configs, found := w.deferredTaskUpdates[task.Group]
	if !found {
		configs = make([]Task, 0)
	}
	configs = append(configs, task)
	w.deferredTaskUpdates[task.Group] = configs
}

// getTasksConfig 获取task任务列表
func (w *Worker) getTasksConfig(tasks []Task) map[string]string {
	config := make(map[string]string)
	for _, t := range tasks {
		config = t.updateConfig(config)
	}
	return config
}
复制代码

主流程

unc main() {

	// 生成一个内存队列启动
	queue := NewMemoryQueue(10)
	queue.Start()

	// 生成一个worker
	name := "test"
	worker := NewWorker(name, func(data map[string]string) {
		for key, value := range data {
			println("worker get task key: " + key + " value: " + value)
		}
	})
	// 注册到队列中
	queue.AddListener(worker)

	taskName := "test"
	// events 发送的任务事件
	configs := []map[string]string{
		map[string]string{"task1": "SendEmail", "params1": "Hello world"},
		map[string]string{"task2": "SendMQ", "params2": "Hello world"},
	}

	// 分发任务
	queue.Push(ClearTaskPrefix, taskName, nil)
	for _, conf := range configs {
		queue.Push(TaskPrefix, taskName, Task{Name: taskName, Group: taskName, Config: conf})
	}
	queue.Push(CommitTaskPrefix, taskName, nil)
	// 停止队列
	queue.Stop()
}
复制代码

输出

# go run main.go
worker get task key: params1 value: Hello world
worker get task key: task1 value: SendEmail
worker get task key: params2 value: Hello world
worker get task key: task2 value: SendMQ
复制代码

总结

在分布式环境中,很多时候并不需要使用CP模型,更多时候是满足最终一致性即可

基于2PC和延迟队列的这种设计,主要是依赖于事件驱动的架构

在kafka connect中, 每次节点变化都会触发一次任务的重分配,所以延迟存储直接用的就是内存中的HashMap, 因为即使分配消息的主节点挂了,那就再触发一次事件,直接将HashMap里面的数据清掉,进行下一次事务即可,并不需要保证延迟存储里面的数据不丢,

所以方案因环境、需求不同,可以做一些取舍,没必要什么东西都去加一个CP模型的中间件进来,当然其实那样更简单

未完待续!

更多文章可以访问www.sreguide.com/


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Text Processing in Python

Text Processing in Python

David Mertz / Addison-Wesley Professional / 2003-6-12 / USD 54.99

Text Processing in Python describes techniques for manipulation of text using the Python programming language. At the broadest level, text processing is simply taking textual information and doing som......一起来看看 《Text Processing in Python》 这本书的介绍吧!

随机密码生成器
随机密码生成器

多种字符组合密码

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具