Golang 分布式异步任务队列 Machinery 教程

栏目: Go · 发布时间: 5年前

内容简介:Golang的分布式任务队列还不算多,目前比较成熟的应该就只有这篇文章里我们简略的看一下Machinery怎么用。但是我们首先简单介绍一下异步任务这个概念。如果你熟悉Python中的异步任务框架的话,想必一定听过Celery。异步任务框架是什么呢?异步任务的主要作用是将需要长时间执行 的代码放到一个单独的程序中,例如调用第三方邮件接口,但是这个接口可能非常慢才响应,而你又想确保自己的API及时响应。这个 时候就可以采用异步任务来进行解耦。

Golang的分布式任务队列还不算多,目前比较成熟的应该就只有 Machinery 了。

这篇文章里我们简略的看一下Machinery怎么用。但是我们首先简单介绍一下异步任务这个概念。

如果你熟悉 Python 中的异步任务框架的话,想必一定听过Celery。异步任务框架是什么呢?异步任务的主要作用是将需要长时间执行 的代码放到一个单独的程序中,例如调用第三方邮件接口,但是这个接口可能非常慢才响应,而你又想确保自己的API及时响应。这个 时候就可以采用异步任务来进行解耦。

一般来说,异步任务都由这么几部分组成:

- broker:broker是用来传递信息的,我们可以想象成“信使”,“外卖配送员”,它的作用是暂时保存产生的任务以便于消费
- 生产者:它负责产生任务
- 消费者:它负责消费任务
- result backend:这个不是必需,但是如果有保存结果的需要,那么就需要它。

而流程则是:

生产者发布任务 -> broker -> 消费者竞争一个任务,然后进行消费 -> (可选:消费后向broker确认已经消费,然后broker删除此任务,
否则将超时重发任务) -> result backend保存结果

Machinery

首先我们来把 Machinery 代码拉下来:

$ go get -u github.com/RichardKnop/machinery/v1

Machinery 对消息的定义是:

// Signature represents a single task invocation
type Signature struct {
    UUID           string
    Name           string
    RoutingKey     string
    ETA            *time.Time
    GroupUUID      string
    GroupTaskCount int
    Args           []Arg
    Headers        Headers
    Immutable      bool
    RetryCount     int
    RetryTimeout   int
    OnSuccess      []*Signature
    OnError        []*Signature
    ChordCallback  *Signature
}

就如同自己写任务队列可能用json一样。

一般生产者先调用 signature := tasks.NewSignature 定义好任务,然后 machineryServer.SendTask 就完成了任务的产生。

Machinery 的异步任务长这样:

func Add(args ...int64) (int64, error) {
  sum := int64(0)
  for _, arg := range args {
    sum += arg
  }
  return sum, nil
}

要注意一点,函数的最后一个参数必需是 error。然后这样注册任务。

server.RegisterTasks(map[string]interface{}{
  "add":      Add,
})

消费者先调用 worker := machineryServer.NewWorker("send_sms", 10) 然后 worker.Launch() 开始监听broker并且消费任务。 当你产生一个任务,名字是 add 时,这个函数就会被调用。

一般你可以把生产者和消费者放到两个文件里,分别定义main函数,然后自己写Makefile,这样就可以直接make然后产生两个可执行 文件,不过我个人更喜欢用 flag 来标识到底是什么身份:

func main() {
    // parse cmd args
    flag.Parse()

    // init config
    initConfig()

    // init machinery worker
    initMachinery()

    // register tasks
    machineryServer.RegisterTask("sendSMS", sendSMS)

    if *worker {
        startWorker()
    } else {
        startWebServer()
    }
}

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

计算机组成(第 6 版)

计算机组成(第 6 版)

Andrew S. Tanenbaum、Todd Austin / 刘卫东、宋佳兴 / 机械工业出版社 / 2014-8-19 / CNY 99.00

本书采用结构化方法来介绍计算机系统,书的内容完全建立在“计算机是由层次结构组成的,每层完成规定的功能”这一概念之上。作者对本版进行了彻底的更新,以反映当今最重要的计算机技术以及计算机组成和体系结构方面的最新进展。书中详细讨论了数字逻辑层、微体系结构层、指令系统层、操作系统层和汇编语言层,并涵盖了并行体系结构的内容,而且每一章结尾都配有丰富的习题。本书适合作为计算机专业本科生计算机组成与结构课程的教......一起来看看 《计算机组成(第 6 版)》 这本书的介绍吧!

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具