内容简介:说道消息队列,你肯定会想到Redis通过废话补不多说上代码:
说道消息队列,你肯定会想到 Kafka
、 Rabbitmq
等消息中间件,这些专业的消息中间件提供了很多功能特性,当然他的部署使用维护都是比较麻烦的。如果你对消息队列没那么高要求,想要轻量级的,使用 Redis 就没错啦。
Redis通过 list
数据结构来实现消息队列.主要使用到如下命令:
- lpush和rpush入队列
- lpop和rpop出队列
- blpop和brpop阻塞式出队列
废话补不多说上代码:
$redis = new Redis(); $redis->connect('127.0.0.1', 6379); //发送消息 $redis->lPush($list, $value); //消费消息 while (true) { try { $msg = $redis->rPop($list); if (!$msg) { sleep(1); } //业务处理 } catch (Exception $e) { echo $e->getMessage(); } } 复制代码
上面代码会有个问题如果队列长时间是空的,那pop就不会不断的循环,这样会导致redis的QPS升高,影响性能。所以我们使用 sleep
来解决,当没有消息的时候阻塞一段时间。但其实这样还会带来另一个问题,就是 sleep
会导致消息的处理延迟增加。这个问题我们可以通过 blpop/brpop
来阻塞读取队列。
blpop/brpop
在队列没有数据的时候,会立即进入休眠状态,一旦数据到来,则立刻醒过来。消息的延迟几乎为零。用blpop/brpop替代前面的lpop/rpop,就完美解决了上面的问题。
还有一个需要注意的点是我们需要是用 try/catch
来进行异常捕获,如果一直阻塞在那里,Redis服务器一般会主动断开掉空链接,来减少闲置资源的占用。
延迟队列
你是否在做电商项目的时候会遇到如下场景:
- 订单下单后超过一小时用户未支付,需要关闭订单
- 订单的评论如果7天未评价,系统需要自动产生一条评论
这个时候我们就需要用到延时队列了,顾名思义就是需要延迟一段时间后执行。Redis可通过 zset
来实现。我们可以将有序集合的value设置为我们的消息任务,把value的score设置为消息的到期时间,然后轮询获取有序集合的中的到期消息进行处理。
实现代码如下:
$redis = new Redis(); $redis->connect('127.0.0.1', 6379); $redis->zAdd($delayQueue,$tts, $value); while(true) { try{ $msg = $redis->zRangeByScore($delayQueue,0,time(),0,1); if($msg){ continue; } //删除消息 $ok = $redis.zrem($delayQueue,$msg); if($ok){ //业务处理 } } catch(\Exception $e) { } } 复制代码
这里又产生了一个问题,同一个任务可能会被多个进程取到之后再使用 zrem 进行争抢,那些没抢到的进程都是白取了一次任务,这是浪费。解决办法:将 zrangebyscore
和 zrem
使用 lua 脚本进行原子化操作,这样多个进程之间争抢任务时就不会出现这种浪费了。
以上所述就是小编给大家介绍的《Redis应用-异步消息队列与延时队列》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- Redis应用-异步消息队列与延时队列
- 连续同源异步操作队列
- Django - Celery异步任务队列
- Web开发系列(九):消息队列,异步任务
- machinery 入门看这一篇(异步任务队列)
- Golang 分布式异步任务队列 Machinery 教程
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。