gobox中的consumer处理框架

栏目: Go · 发布时间: 6年前

内容简介:我们都会有从异步队列中消费的需求,今天来说下gobox中的consumer处理框架定义每条消息

我们都会有从异步队列中消费的需求,今天来说下gobox中的consumer处理框架

consumer处理架构图

gobox中的consumer处理框架

重要的对象

IMessage

定义每条消息

type IMessage interface {
	Body() []byte
}

ConsumerHandleFunc

consumer中从队列收到每条消息后,调用这个方法

type ConsumerHandleFunc func(message IMessage) error

IConsumer

定义消费者行为

type IConsumer interface {
	SetHandleFunc(hf ConsumerHandleFunc)
	Start()
	Stop()
}

NewWorkerFunc

每个Worker的构造方法

type NewWorkerFunc func() IWorker

IWorker

定义Worker

type IWorker interface {
	SetWorkId(id int)
	SetLogger(logger golog.ILogger)

	Work(wg *sync.WaitGroup, lineCh chan []byte, stopCh chan bool)
}

LineProcessFunc

每条消息在Worker中的实际处理方法

type LineProcessFunc func(line []byte) error

BaseWorker

框架提供的一个简单基础Worker对象,组合这个对象后,只需要实现 LineProcessFunc 即可

type BaseWorker struct

Task

Task用于实现consumer的处理框架

使用示例

package main

import (
	"github.com/goinbox/goconsumer"

	"fmt"
	"strconv"
	"time"
)

// 这里实现Worker
type DemoWorker struct {
	*goconsumer.BaseWorker
}

func NewDemoWorker() goconsumer.IWorker {
	worker := &DemoWorker{goconsumer.NewBaseWorker()}
	worker.SetLineProcessFunc(worker.LineProcessFunc)

	return worker
}

func (d *DemoWorker) LineProcessFunc(line []byte) error {
	idStr := strconv.Itoa(d.Id)
	fmt.Println("wid:" + idStr + " process line:" + string(line))

	return nil
}

// 这里实现Message
type DemoMessage struct {
	body []byte
}

func (d *DemoMessage) Body() []byte {
	return d.body
}

// 这里实现一个简单的Consumer,模拟从队列中获得100条消息
type DemoConsumer struct {
	hf goconsumer.ConsumerHandleFunc
}

func (d *DemoConsumer) SetHandleFunc(hf goconsumer.ConsumerHandleFunc) {
	d.hf = hf
}

func (d *DemoConsumer) Start() {
	for i := 0; i < 100; i++ {
		str := "This message is from DemoConsumer loop " + strconv.Itoa(i)
		d.hf(&DemoMessage{[]byte(str)})
	}

	time.Sleep(time.Second * 1)
}

func (d *DemoConsumer) Stop() {

}


// 执行Task任务,调用consumer处理框架
func main() {
	task := goconsumer.NewTask("Demo")
	consumer := new(DemoConsumer)

	task.SetConsumer(consumer).
		SetWorker(10, NewDemoWorker).
		Start()
}

欢迎大家使用,使用中有遇到问题随时反馈,我们会尽快响应,谢谢!


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Head First Servlets & JSP(中文版)

Head First Servlets & JSP(中文版)

(美)巴萨姆、(美)塞若、(美)贝茨 / 苏钰函、林剑 / 中国电力出版社 / 2006-10 / 98.00元

《Head First Servlets·JSP》(中文版)结合SCWCD考试大纲讲述了关于如何编写servlets和JSP代码,如何使用JSP表达式语言,如何部署Web应用,如何开发定制标记,以及会话状态、包装器、过滤器、企业设计模式等方面的知识,以一种轻松、幽默而又形象的方式让你了解、掌握servlets和JSP,并将其运用到你的项目中去。《Head First Servlets·JSP》(中......一起来看看 《Head First Servlets & JSP(中文版)》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试