gobox中的consumer处理框架

栏目: Go · 发布时间: 7年前

内容简介:我们都会有从异步队列中消费的需求,今天来说下gobox中的consumer处理框架定义每条消息

我们都会有从异步队列中消费的需求,今天来说下gobox中的consumer处理框架

consumer处理架构图

gobox中的consumer处理框架

重要的对象

IMessage

定义每条消息

type IMessage interface {
	Body() []byte
}

ConsumerHandleFunc

consumer中从队列收到每条消息后,调用这个方法

type ConsumerHandleFunc func(message IMessage) error

IConsumer

定义消费者行为

type IConsumer interface {
	SetHandleFunc(hf ConsumerHandleFunc)
	Start()
	Stop()
}

NewWorkerFunc

每个Worker的构造方法

type NewWorkerFunc func() IWorker

IWorker

定义Worker

type IWorker interface {
	SetWorkId(id int)
	SetLogger(logger golog.ILogger)

	Work(wg *sync.WaitGroup, lineCh chan []byte, stopCh chan bool)
}

LineProcessFunc

每条消息在Worker中的实际处理方法

type LineProcessFunc func(line []byte) error

BaseWorker

框架提供的一个简单基础Worker对象,组合这个对象后,只需要实现 LineProcessFunc 即可

type BaseWorker struct

Task

Task用于实现consumer的处理框架

使用示例

package main

import (
	"github.com/goinbox/goconsumer"

	"fmt"
	"strconv"
	"time"
)

// 这里实现Worker
type DemoWorker struct {
	*goconsumer.BaseWorker
}

func NewDemoWorker() goconsumer.IWorker {
	worker := &DemoWorker{goconsumer.NewBaseWorker()}
	worker.SetLineProcessFunc(worker.LineProcessFunc)

	return worker
}

func (d *DemoWorker) LineProcessFunc(line []byte) error {
	idStr := strconv.Itoa(d.Id)
	fmt.Println("wid:" + idStr + " process line:" + string(line))

	return nil
}

// 这里实现Message
type DemoMessage struct {
	body []byte
}

func (d *DemoMessage) Body() []byte {
	return d.body
}

// 这里实现一个简单的Consumer,模拟从队列中获得100条消息
type DemoConsumer struct {
	hf goconsumer.ConsumerHandleFunc
}

func (d *DemoConsumer) SetHandleFunc(hf goconsumer.ConsumerHandleFunc) {
	d.hf = hf
}

func (d *DemoConsumer) Start() {
	for i := 0; i < 100; i++ {
		str := "This message is from DemoConsumer loop " + strconv.Itoa(i)
		d.hf(&DemoMessage{[]byte(str)})
	}

	time.Sleep(time.Second * 1)
}

func (d *DemoConsumer) Stop() {

}


// 执行Task任务,调用consumer处理框架
func main() {
	task := goconsumer.NewTask("Demo")
	consumer := new(DemoConsumer)

	task.SetConsumer(consumer).
		SetWorker(10, NewDemoWorker).
		Start()
}

欢迎大家使用,使用中有遇到问题随时反馈,我们会尽快响应,谢谢!


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

逻辑的引擎

逻辑的引擎

[美] 马丁·戴维斯 / 张卜天 / 湖南科学技术出版社 / 2005-5 / 20.00元

本书介绍了现代计算机背后的那些基本概念和发展这些概念的人,描写了莱布尼茨、布尔、费雷格、康托尔、希尔伯特、哥德尔、图灵等天才的生活和工作,讲述了数学家们如何在成果付诸应用之前很久就已经提出了其背后的思想。博达著作权代理有限公司授权出版据美国W.W.Norton公司2000年版本译出。2007年第二版亦使用同一ISBN。一起来看看 《逻辑的引擎》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具