gobox中的consumer处理框架

栏目: Go · 发布时间: 7年前

内容简介:我们都会有从异步队列中消费的需求,今天来说下gobox中的consumer处理框架定义每条消息

我们都会有从异步队列中消费的需求,今天来说下gobox中的consumer处理框架

consumer处理架构图

gobox中的consumer处理框架

重要的对象

IMessage

定义每条消息

type IMessage interface {
	Body() []byte
}

ConsumerHandleFunc

consumer中从队列收到每条消息后,调用这个方法

type ConsumerHandleFunc func(message IMessage) error

IConsumer

定义消费者行为

type IConsumer interface {
	SetHandleFunc(hf ConsumerHandleFunc)
	Start()
	Stop()
}

NewWorkerFunc

每个Worker的构造方法

type NewWorkerFunc func() IWorker

IWorker

定义Worker

type IWorker interface {
	SetWorkId(id int)
	SetLogger(logger golog.ILogger)

	Work(wg *sync.WaitGroup, lineCh chan []byte, stopCh chan bool)
}

LineProcessFunc

每条消息在Worker中的实际处理方法

type LineProcessFunc func(line []byte) error

BaseWorker

框架提供的一个简单基础Worker对象,组合这个对象后,只需要实现 LineProcessFunc 即可

type BaseWorker struct

Task

Task用于实现consumer的处理框架

使用示例

package main

import (
	"github.com/goinbox/goconsumer"

	"fmt"
	"strconv"
	"time"
)

// 这里实现Worker
type DemoWorker struct {
	*goconsumer.BaseWorker
}

func NewDemoWorker() goconsumer.IWorker {
	worker := &DemoWorker{goconsumer.NewBaseWorker()}
	worker.SetLineProcessFunc(worker.LineProcessFunc)

	return worker
}

func (d *DemoWorker) LineProcessFunc(line []byte) error {
	idStr := strconv.Itoa(d.Id)
	fmt.Println("wid:" + idStr + " process line:" + string(line))

	return nil
}

// 这里实现Message
type DemoMessage struct {
	body []byte
}

func (d *DemoMessage) Body() []byte {
	return d.body
}

// 这里实现一个简单的Consumer,模拟从队列中获得100条消息
type DemoConsumer struct {
	hf goconsumer.ConsumerHandleFunc
}

func (d *DemoConsumer) SetHandleFunc(hf goconsumer.ConsumerHandleFunc) {
	d.hf = hf
}

func (d *DemoConsumer) Start() {
	for i := 0; i < 100; i++ {
		str := "This message is from DemoConsumer loop " + strconv.Itoa(i)
		d.hf(&DemoMessage{[]byte(str)})
	}

	time.Sleep(time.Second * 1)
}

func (d *DemoConsumer) Stop() {

}


// 执行Task任务,调用consumer处理框架
func main() {
	task := goconsumer.NewTask("Demo")
	consumer := new(DemoConsumer)

	task.SetConsumer(consumer).
		SetWorker(10, NewDemoWorker).
		Start()
}

欢迎大家使用,使用中有遇到问题随时反馈,我们会尽快响应,谢谢!


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

一网打尽

一网打尽

[美]布拉德·斯通 / 李晶、李静 / 中信出版社 / 2014-1-15 / 49.00元

亚马逊最早起步于通过邮购来经营图书业务。但贝佐斯却不满足于仅做一名书商,他希望缔造亚马逊万货商店的神话——能提供海量的货源,并以超低的价格提供最具吸引力的便捷服务。为了实现这一诺言,他发展了一种企业文化,这种文化蕴含着执着的雄心与难以破解 的秘诀。亚马逊的这 一文化现在依旧在发扬光大。 布拉德·斯通非常幸运地得到采访亚马逊的前任和现任高管、员工以及贝佐斯本人、家人的机会,使我们第一次有机会深......一起来看看 《一网打尽》 这本书的介绍吧!

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具