玩转redis-简单消息队列

栏目: IT技术 · 发布时间: 4年前

内容简介:redis的添加数据和获取数据的操作也是非常简单的

使用 go 语言基于 redis 写了一个简单的消息队列

源码地址

使用demo

redis的 list 非常的灵活,可以从左边或者右边添加元素,当然也以从任意一头读取数据

玩转redis-简单消息队列

添加数据和获取数据的操作也是非常简单的

LPUSH 从左边插入数据

RPUSH 大右边插入数据

LPOP 从左边取出一个数据

RPOP 从右边取出一个数据

127.0.0.1:6379> LPUSH list1 a
(integer) 1
127.0.0.1:6379> RPUSH list1 b
(integer) 2
127.0.0.1:6379> LPOP list1
"a"
127.0.0.1:6379> RPOP list1
"b"

或者使用 BLPOP BRPOP 来读取数据,不同之处是取数据时,如果没有数据会等待指定的时间,

如果这期间有数据写入,则会读取并返回,没有数据则会返回空

在一个 窗口1 读取

127.0.0.1:6379> BLPOP list1 10
1) "list1"
2) "a"

在另一个 窗口2 写入

127.0.0.1:6379> RPUSH list1 a b c
(integer) 3

再开一个 窗口3 读取,第二次读取时, list 是空的,所以等待1秒后返回空。

127.0.0.1:6379> BRPOP list1 1
1) "list1"
2) "c"

127.0.0.1:6379> BRPOP list1 1
(nil)
(1.04s)

简单消息队列的实现

如果我们只从一边新增元素,向另一边取出元素,这就不是一个消息队列么。但我估计你会有一个疑问,在消费数据时,同一个消息会不会同时被多个 consumer 消费掉?

玩转redis-简单消息队列

当然不会,因为 redis 是单线程的,在从 list 取数据时天然不会出现并发问题。但是这是一个简单的消息队列,消费不成功怎么处理还是需要我们自己写代码来实现的

下面我说一下使用 list 实现一个简单的消息队列的整体思路

comsumer的实现

consumer 主要做的就是从list里读取数据,使用 LPOP 或者 BLPOP 都可以,

这里做了一个开关 optionsUseBLopp 如果为 true 时会使用 BLPOP

type consumer struct {
	once            sync.Once
	redisCmd        redis.Cmdable
	ctx             context.Context
	topicName       string
	handler         Handler
	rateLimitPeriod time.Duration
	options         ConsumerOptions
	_               struct{}
}

type ConsumerOptions struct {
	RateLimitPeriod time.Duration
	UseBLPop        bool
}

看一下创建 consumer 的代码,最后面的 opts 参数是可选的配置

type Consumer = *consumer

func NewSimpleMQConsumer(ctx context.Context, redisCmd redis.Cmdable, topicName string, opts ...ConsumerOption) Consumer {
	consumer := &consumer{
		redisCmd:  redisCmd,
		ctx:       ctx,
		topicName: topicName,
	}
	for _, o := range opts {
		o(&consumer.options)
	}
	if consumer.options.RateLimitPeriod == 0 {
		consumer.options.RateLimitPeriod = time.Microsecond * 200
	}
	return consumer
}

读取数据后具体怎么进行处理调用者可以根据自己的业务逻辑进行相应处理

有一个小的 interface 调用者根据自己的逻辑去实现

type Handler interface {
	HandleMessage(msg *Message)
}

读取数据的逻辑使用一个gorouting实现

func (s *consumer) startGetMessage() {
	go func() {
		ticker := time.NewTicker(s.options.RateLimitPeriod)
		defer func() {
			log.Println("stop get message.")
			ticker.Stop()
		}()
		for {
			select {
			case <-s.ctx.Done():
				log.Printf("context Done msg: %#v \n", s.ctx.Err())
				return
			case <-ticker.C:
				var revBody []byte
				var err error
				if !s.options.UseBLPop {
					revBody, err = s.redisCmd.LPop(s.topicName).Bytes()
				} else {
					revs := s.redisCmd.BLPop(time.Second, s.topicName)
					err = revs.Err()
					revValues := revs.Val()
					if len(revValues) >= 2 {
						revBody = []byte(revValues[1])
					}
				}
				if err == redis.Nil {
					continue
				}
				if err != nil {
					log.Printf("LPOP error: %#v \n", err)
					continue
				}

				if len(revBody) == 0 {
					continue
				}
				msg := &Message{}
				json.Unmarshal(revBody, msg)
				if s.handler != nil {
					s.handler.HandleMessage(msg)
				}
			}
		}
	}()
}

Producer 的实现

Producer 还是很简单的就是把数据推送到 reids

type Producer struct {
	redisCmd redis.Cmdable
	_        struct{}
}

func NewProducer(cmd redis.Cmdable) *Producer {
	return &Producer{redisCmd: cmd}
}

func (p *Producer) Publish(topicName string, body []byte) error {
	msg := NewMessage("", body)
	sendData, _ := json.Marshal(msg)
	return p.redisCmd.RPush(topicName, string(sendData)).Err()
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Introduction to the Design and Analysis of Algorithms

Introduction to the Design and Analysis of Algorithms

Anany Levitin / Addison Wesley / 2006-2-24 / USD 122.00

Based on a Based on a new classification of algorithm design techniques and a clear delineation of analysis methods, "Introduction to the Design and Analysis of Algorithms" presents the subject in a c......一起来看看 《Introduction to the Design and Analysis of Algorithms》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换