内容简介:redis的添加数据和获取数据的操作也是非常简单的
redis的 list 非常的灵活,可以从左边或者右边添加元素,当然也以从任意一头读取数据
添加数据和获取数据的操作也是非常简单的
LPUSH 从左边插入数据
RPUSH 大右边插入数据
LPOP 从左边取出一个数据
RPOP 从右边取出一个数据
127.0.0.1:6379> LPUSH list1 a (integer) 1 127.0.0.1:6379> RPUSH list1 b (integer) 2 127.0.0.1:6379> LPOP list1 "a" 127.0.0.1:6379> RPOP list1 "b"
或者使用 BLPOP BRPOP 来读取数据,不同之处是取数据时,如果没有数据会等待指定的时间,
如果这期间有数据写入,则会读取并返回,没有数据则会返回空
在一个 窗口1 读取
127.0.0.1:6379> BLPOP list1 10 1) "list1" 2) "a"
在另一个 窗口2 写入
127.0.0.1:6379> RPUSH list1 a b c (integer) 3
再开一个 窗口3 读取,第二次读取时, list 是空的,所以等待1秒后返回空。
127.0.0.1:6379> BRPOP list1 1 1) "list1" 2) "c" 127.0.0.1:6379> BRPOP list1 1 (nil) (1.04s)
简单消息队列的实现
如果我们只从一边新增元素,向另一边取出元素,这就不是一个消息队列么。但我估计你会有一个疑问,在消费数据时,同一个消息会不会同时被多个 consumer 消费掉?
当然不会,因为 redis 是单线程的,在从 list 取数据时天然不会出现并发问题。但是这是一个简单的消息队列,消费不成功怎么处理还是需要我们自己写代码来实现的
下面我说一下使用 list 实现一个简单的消息队列的整体思路
comsumer的实现
consumer 主要做的就是从list里读取数据,使用 LPOP 或者 BLPOP 都可以,
这里做了一个开关 options 的 UseBLopp 如果为 true 时会使用 BLPOP 。
type consumer struct {
once sync.Once
redisCmd redis.Cmdable
ctx context.Context
topicName string
handler Handler
rateLimitPeriod time.Duration
options ConsumerOptions
_ struct{}
}
type ConsumerOptions struct {
RateLimitPeriod time.Duration
UseBLPop bool
}
看一下创建 consumer 的代码,最后面的 opts 参数是可选的配置
type Consumer = *consumer
func NewSimpleMQConsumer(ctx context.Context, redisCmd redis.Cmdable, topicName string, opts ...ConsumerOption) Consumer {
consumer := &consumer{
redisCmd: redisCmd,
ctx: ctx,
topicName: topicName,
}
for _, o := range opts {
o(&consumer.options)
}
if consumer.options.RateLimitPeriod == 0 {
consumer.options.RateLimitPeriod = time.Microsecond * 200
}
return consumer
}
读取数据后具体怎么进行处理调用者可以根据自己的业务逻辑进行相应处理
有一个小的 interface 调用者根据自己的逻辑去实现
type Handler interface {
HandleMessage(msg *Message)
}
读取数据的逻辑使用一个gorouting实现
func (s *consumer) startGetMessage() {
go func() {
ticker := time.NewTicker(s.options.RateLimitPeriod)
defer func() {
log.Println("stop get message.")
ticker.Stop()
}()
for {
select {
case <-s.ctx.Done():
log.Printf("context Done msg: %#v \n", s.ctx.Err())
return
case <-ticker.C:
var revBody []byte
var err error
if !s.options.UseBLPop {
revBody, err = s.redisCmd.LPop(s.topicName).Bytes()
} else {
revs := s.redisCmd.BLPop(time.Second, s.topicName)
err = revs.Err()
revValues := revs.Val()
if len(revValues) >= 2 {
revBody = []byte(revValues[1])
}
}
if err == redis.Nil {
continue
}
if err != nil {
log.Printf("LPOP error: %#v \n", err)
continue
}
if len(revBody) == 0 {
continue
}
msg := &Message{}
json.Unmarshal(revBody, msg)
if s.handler != nil {
s.handler.HandleMessage(msg)
}
}
}
}()
}
Producer 的实现
Producer 还是很简单的就是把数据推送到 reids
type Producer struct {
redisCmd redis.Cmdable
_ struct{}
}
func NewProducer(cmd redis.Cmdable) *Producer {
return &Producer{redisCmd: cmd}
}
func (p *Producer) Publish(topicName string, body []byte) error {
msg := NewMessage("", body)
sendData, _ := json.Marshal(msg)
return p.redisCmd.RPush(topicName, string(sendData)).Err()
}
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 消息队列(三)常见消息队列介绍
- 消息队列探秘 – RabbitMQ 消息队列介绍
- springboot整合各种消息队列(二):rabbitmq消息队列
- springboot整合各种消息队列(一):redis消息队列
- 消息队列系列二(IOT中消息队列的应用)
- 消息队列(七)RocketMQ消息发送
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。