玩转redis-简单消息队列

栏目: IT技术 · 发布时间: 4年前

内容简介:redis的添加数据和获取数据的操作也是非常简单的

使用 go 语言基于 redis 写了一个简单的消息队列

源码地址

使用demo

redis的 list 非常的灵活,可以从左边或者右边添加元素,当然也以从任意一头读取数据

玩转redis-简单消息队列

添加数据和获取数据的操作也是非常简单的

LPUSH 从左边插入数据

RPUSH 大右边插入数据

LPOP 从左边取出一个数据

RPOP 从右边取出一个数据

127.0.0.1:6379> LPUSH list1 a
(integer) 1
127.0.0.1:6379> RPUSH list1 b
(integer) 2
127.0.0.1:6379> LPOP list1
"a"
127.0.0.1:6379> RPOP list1
"b"

或者使用 BLPOP BRPOP 来读取数据,不同之处是取数据时,如果没有数据会等待指定的时间,

如果这期间有数据写入,则会读取并返回,没有数据则会返回空

在一个 窗口1 读取

127.0.0.1:6379> BLPOP list1 10
1) "list1"
2) "a"

在另一个 窗口2 写入

127.0.0.1:6379> RPUSH list1 a b c
(integer) 3

再开一个 窗口3 读取,第二次读取时, list 是空的,所以等待1秒后返回空。

127.0.0.1:6379> BRPOP list1 1
1) "list1"
2) "c"

127.0.0.1:6379> BRPOP list1 1
(nil)
(1.04s)

简单消息队列的实现

如果我们只从一边新增元素,向另一边取出元素,这就不是一个消息队列么。但我估计你会有一个疑问,在消费数据时,同一个消息会不会同时被多个 consumer 消费掉?

玩转redis-简单消息队列

当然不会,因为 redis 是单线程的,在从 list 取数据时天然不会出现并发问题。但是这是一个简单的消息队列,消费不成功怎么处理还是需要我们自己写代码来实现的

下面我说一下使用 list 实现一个简单的消息队列的整体思路

comsumer的实现

consumer 主要做的就是从list里读取数据,使用 LPOP 或者 BLPOP 都可以,

这里做了一个开关 optionsUseBLopp 如果为 true 时会使用 BLPOP

type consumer struct {
	once            sync.Once
	redisCmd        redis.Cmdable
	ctx             context.Context
	topicName       string
	handler         Handler
	rateLimitPeriod time.Duration
	options         ConsumerOptions
	_               struct{}
}

type ConsumerOptions struct {
	RateLimitPeriod time.Duration
	UseBLPop        bool
}

看一下创建 consumer 的代码,最后面的 opts 参数是可选的配置

type Consumer = *consumer

func NewSimpleMQConsumer(ctx context.Context, redisCmd redis.Cmdable, topicName string, opts ...ConsumerOption) Consumer {
	consumer := &consumer{
		redisCmd:  redisCmd,
		ctx:       ctx,
		topicName: topicName,
	}
	for _, o := range opts {
		o(&consumer.options)
	}
	if consumer.options.RateLimitPeriod == 0 {
		consumer.options.RateLimitPeriod = time.Microsecond * 200
	}
	return consumer
}

读取数据后具体怎么进行处理调用者可以根据自己的业务逻辑进行相应处理

有一个小的 interface 调用者根据自己的逻辑去实现

type Handler interface {
	HandleMessage(msg *Message)
}

读取数据的逻辑使用一个gorouting实现

func (s *consumer) startGetMessage() {
	go func() {
		ticker := time.NewTicker(s.options.RateLimitPeriod)
		defer func() {
			log.Println("stop get message.")
			ticker.Stop()
		}()
		for {
			select {
			case <-s.ctx.Done():
				log.Printf("context Done msg: %#v \n", s.ctx.Err())
				return
			case <-ticker.C:
				var revBody []byte
				var err error
				if !s.options.UseBLPop {
					revBody, err = s.redisCmd.LPop(s.topicName).Bytes()
				} else {
					revs := s.redisCmd.BLPop(time.Second, s.topicName)
					err = revs.Err()
					revValues := revs.Val()
					if len(revValues) >= 2 {
						revBody = []byte(revValues[1])
					}
				}
				if err == redis.Nil {
					continue
				}
				if err != nil {
					log.Printf("LPOP error: %#v \n", err)
					continue
				}

				if len(revBody) == 0 {
					continue
				}
				msg := &Message{}
				json.Unmarshal(revBody, msg)
				if s.handler != nil {
					s.handler.HandleMessage(msg)
				}
			}
		}
	}()
}

Producer 的实现

Producer 还是很简单的就是把数据推送到 reids

type Producer struct {
	redisCmd redis.Cmdable
	_        struct{}
}

func NewProducer(cmd redis.Cmdable) *Producer {
	return &Producer{redisCmd: cmd}
}

func (p *Producer) Publish(topicName string, body []byte) error {
	msg := NewMessage("", body)
	sendData, _ := json.Marshal(msg)
	return p.redisCmd.RPush(topicName, string(sendData)).Err()
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

编写高质量代码

编写高质量代码

秦小波 / 机械工业出版社华章公司 / 2011-12-28 / 59.00元

在通往“Java技术殿堂”的路上,本书将为你指点迷津!内容全部由Java编码的最佳实践组成,从语法、程序设计和架构、工具和框架、编码风格和编程思想等五大方面对Java程序员遇到的各种棘手的疑难问题给出了经验性的解决方案,为Java程序员如何编写高质量的Java代码提出了151条极为宝贵的建议。对于每一个问题,不仅以建议的方式从正反两面给出了被实践证明为十分优秀的解决方案和非常糟糕的解决方案,而且还......一起来看看 《编写高质量代码》 这本书的介绍吧!

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

随机密码生成器
随机密码生成器

多种字符组合密码

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具