内容简介:我们都会有从异步队列中消费的需求,今天来说下gobox中的consumer处理框架定义每条消息
我们都会有从异步队列中消费的需求,今天来说下gobox中的consumer处理框架
consumer处理架构图
重要的对象
IMessage
定义每条消息
type IMessage interface {
Body() []byte
}
ConsumerHandleFunc
consumer中从队列收到每条消息后,调用这个方法
type ConsumerHandleFunc func(message IMessage) error
IConsumer
定义消费者行为
type IConsumer interface {
SetHandleFunc(hf ConsumerHandleFunc)
Start()
Stop()
}
NewWorkerFunc
每个Worker的构造方法
type NewWorkerFunc func() IWorker
IWorker
定义Worker
type IWorker interface {
SetWorkId(id int)
SetLogger(logger golog.ILogger)
Work(wg *sync.WaitGroup, lineCh chan []byte, stopCh chan bool)
}
LineProcessFunc
每条消息在Worker中的实际处理方法
type LineProcessFunc func(line []byte) error
BaseWorker
框架提供的一个简单基础Worker对象,组合这个对象后,只需要实现 LineProcessFunc 即可
type BaseWorker struct
Task
Task用于实现consumer的处理框架
使用示例
package main
import (
"github.com/goinbox/goconsumer"
"fmt"
"strconv"
"time"
)
// 这里实现Worker
type DemoWorker struct {
*goconsumer.BaseWorker
}
func NewDemoWorker() goconsumer.IWorker {
worker := &DemoWorker{goconsumer.NewBaseWorker()}
worker.SetLineProcessFunc(worker.LineProcessFunc)
return worker
}
func (d *DemoWorker) LineProcessFunc(line []byte) error {
idStr := strconv.Itoa(d.Id)
fmt.Println("wid:" + idStr + " process line:" + string(line))
return nil
}
// 这里实现Message
type DemoMessage struct {
body []byte
}
func (d *DemoMessage) Body() []byte {
return d.body
}
// 这里实现一个简单的Consumer,模拟从队列中获得100条消息
type DemoConsumer struct {
hf goconsumer.ConsumerHandleFunc
}
func (d *DemoConsumer) SetHandleFunc(hf goconsumer.ConsumerHandleFunc) {
d.hf = hf
}
func (d *DemoConsumer) Start() {
for i := 0; i < 100; i++ {
str := "This message is from DemoConsumer loop " + strconv.Itoa(i)
d.hf(&DemoMessage{[]byte(str)})
}
time.Sleep(time.Second * 1)
}
func (d *DemoConsumer) Stop() {
}
// 执行Task任务,调用consumer处理框架
func main() {
task := goconsumer.NewTask("Demo")
consumer := new(DemoConsumer)
task.SetConsumer(consumer).
SetWorker(10, NewDemoWorker).
Start()
}
欢迎大家使用,使用中有遇到问题随时反馈,我们会尽快响应,谢谢!
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 简述大数据实时处理框架
- 流式处理框架storm浅析(上篇)
- Bootstrap开发框架界面的调整处理
- iOS换肤功能的简单处理框架
- 一个不错的音视频快速处理框架
- Apache Flink 1.9.3 发布,流处理框架
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
部落:一呼百应的力量
高汀 (Godin.S.) / 刘晖 / 中信出版社 / 2009-7 / 26.00元
部落指的是任何一群人,规模可大可小,他们因追随领导、志同道合而相互联系在一起。人类其实数百万年前就有部落的出现,随之还形成了宗教、种族、政治或甚至音乐。 互联网消除了地理隔离,降低了沟通成本并缩短了时间。博客和社交网站都有益于现有的部落扩张,并促进了网络部落的诞生——这些部落的人数从10个到1000万个不等,他们所关注的也许是iPhone,或一场政治运动,或阻止全球变暖的新方法。 那么......一起来看看 《部落:一呼百应的力量》 这本书的介绍吧!