如何利用Redis实现延时处理

栏目: 数据库 · 发布时间: 5年前

内容简介:最近需要做一个延时处理的功能,主要是从kafka中消费消息后根据消息中的某个延时字段来进行延时处理,在实际的实现过程中有一些需要注意的地方,记录如下。说到java中的定时功能,首先想到的Timer和ScheduledThreadPoolExecutor,但是相比之下Timer可以排除,主要原因有以下几点:其中NamedThreadFactory是我自定义的一个线程工厂,主要给线程池定义名称及相关日志打印便于后续的问题分析,这里就不多做介绍了。拒绝策略也是采用默认的拒绝策略。 然后测试了一下,满足目标需求的功

最近需要做一个延时处理的功能,主要是从kafka中消费消息后根据消息中的某个延时字段来进行延时处理,在实际的实现过程中有一些需要注意的地方,记录如下。

实现过程

说到 java 中的定时功能,首先想到的Timer和ScheduledThreadPoolExecutor,但是相比之下Timer可以排除,主要原因有以下几点:

  • Timer使用的是绝对时间,系统时间的改变会对Timer产生一定的影响;而ScheduledThreadPoolExecutor使用的是相对时间,所以不会有这个问题。
  • Timer使用单线程来处理任务,长时间运行的任务会导致其他任务的延时处理,而ScheduledThreadPoolExecutor可以自定义线程数量。
  • Timer没有对运行时异常进行处理,一旦某个任务触发运行时异常,会导致整个Timer崩溃,而ScheduledThreadPoolExecutor对运行时异常做了捕获(可以在 afterExecute() 回调方法中进行处理),所以更加安全。
  1. ScheduledThreadPoolExecutor 决定了用ScheduledThreadPoolExecutor来进行实现,接下来就是代码编写啦(大体流程代码)。 主要的延时实现如下:
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10, new NamedThreadFactory("scheduleThreadPool"), new 
ThreadPoolExecutor.AbortPolicy());
//从消息中取出延迟时间及相关信息的代码略
int delayTime = 0;
executorService.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            //具体操作逻辑
        }},0,delayTime, TimeUnit.SECONDS);
复制代码

其中NamedThreadFactory是我自定义的一个线程工厂,主要给线程池定义名称及相关日志打印便于后续的问题分析,这里就不多做介绍了。拒绝策略也是采用默认的拒绝策略。 然后测试了一下,满足目标需求的功能,可以做到延迟指定时间后执行,至此似乎功能就被完成了。 大家可能疑问,这也太简单了有什么好说的,但是这种方式实现简单是简单但是存在一个潜在的问题,问题在哪呢,让我们看一下ScheduledThreadPoolExecutor的源码:

public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, 
    TimeUnit.NANOSECONDS,new DelayedWorkQueue(), threadFactory);}
复制代码

ScheduledThreadPoolExecutor 由于它自身的延时和周期的特性,默认使用了DelayWorkQueue,而并不像我们平时使用的SingleThreadExecutor等构造是可以使用自己定义的LinkedBlockingQueue并且设置队列大小,问题就出在这里。DelayWrokQueue是一个无界队列,而我们的目标数据源是kafka,也就是一个高并发高吞吐的消息队列,很大可能在某一时间段有大量的消息过来从而导致OOM,在使用多线程时我们是肯定要考虑到OOM的可能性的,因为OOM带来的后果往往比较严重,系统OOM临时的解决办法一般只能是重启,可能会导致用户数据丢失等不可能挽回的问题,所以从编码设计阶段要采用尽可能稳妥的手段来避免这些问题。

  1. 采用 redis 和线程结合

这一次换了思路,采用redis来帮助我们做缓冲,从而避免消息过多OOM的问题。 相关redis zset api:

//添加元素
ZADD key score member [[score member] [score member] …]
//根据分值及限制数量查询
ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]
//从zset中删除指定成员
ZREM key member [member …]
复制代码

我们采用redis基础数据结构的zset结构,采用score来存储我们目标发送时间的数值,整体处理流程如下:

  • 第一步数据存储:9:10分从kafka接收了一条a的订单消息,要求30分钟后进行发货通知,那我们就将当前时间加上30分钟然后转为时间戳作为a的score,key为a的订单号存入redis中。代码如下:
public void onMessage(String topic, String message) {
        String orderId;
		int delayTime = 0;
        try {
            Map<String, String> msgMap = gson.fromJson(message, new TypeToken<Map<String, String>>() {
            }.getType());
            if (msgMap.isEmpty()) {
                return;
            }
            LOGGER.info("onMessage kafka content:{}", msgMap.toString());
	    orderId = msgMap.get("orderId");
            if(StringUtils.isNotEmpty(orderId)){
                delayTime = Integer.parseInt(msgMap.get("delayTime"));
                Calendar calendar = Calendar.getInstance();
                //计算出预计发送时间
                calendar.add(Calendar.MINUTE, delayTime);
                long sendTime = calendar.getTimeInMillis();
                RedisUtils.getInstance().zetAdd(Constant.DELAY, sendTime, orderId);
                LOGGER.info("orderId:{}---放入redis中等待发送---sendTime:{}", ---orderId:{}, sendTime);
            }
        } catch (Exception e) {
            LOGGER.info("onMessage 延时发送异常:{}", e);
        }
    }

复制代码
  • 第二步数据处理:另起一个线程具体调度时间根据业务需求来定,我这里3分钟执行一次,内部逻辑:从redis中取出一定量的zset数据,如何取呢,使用zset的zrangeByScore方法,根据数据的score进行排序,当然可以带上时间段,这里从0到现在,来进行消费,需要注意的一点是,在取出数据后我们需要用zrem方法将取出的数据从zset中删除,防止其他线程重复消费数据。在此之后进行接下来的发货通知等相关逻辑。代码如下:
public void run(){
        //获取批量大小
        int orderNum = Integer.parseInt(PropertyUtil.get(Constant.ORDER_NUM,"100"));
        try {
            //批量获取离发送时间最近的orderNum条数据
	    Calendar calendar = Calendar.getInstance();
	    long now = calendar.getTimeInMillis();
	    //获取无限早到现在的事件key(防止上次批量数量小于放入数量,存在历史数据未消费情况)
	    Set<String> orderIds = RedisUtils.getInstance().zrangeByScore(Constant.DELAY, 0, now, 0, orderNum);
	    LOGGER.info("task.getOrderFromRedis---size:{}---orderIds:{}", orderIds.size(), gson.toJson(orderIds));
            if (CollectionUtils.isNotEmpty(orders)){
                //删除key 防止重复发送
                for (String orderId : orderIds) {
                    RedisUtils.getInstance().zrem(Constant.DELAY, orderId);
                }
	        //接下来执行发送等业务逻辑                 
            }
        } catch (Exception e) {
            LOGGER.warn("task.run exception:{}", e);
        }
    }
复制代码

至此完成了依赖redis和线程完成了延时发送的功能。

结语

那么对上面两种不同的实现方式进行一下优缺点比较:

  • 第一种方式实现简单,不依赖外部组件,能够快速的实现目标功能,但缺点也很明显,需要在特定的场景下使用,如果是我这种消息量大的情况下使用很可能是有问题,当然在数据源消息不多的情况下不失为好的选择。

  • 第二种方式实现稍微复杂一点,但是能够适应消息量大的场景,采用redis的zset作为了“中间件”的效果,并且帮助我们进行延时的功能实现能够较好的适应高并发场景,缺点在于在编写的过程中需要考虑实际的因素较多,例如线程的执行周期时间,发送可能会有一定时间的延迟,批量数据大小的设置等等。

综上是本人这次延时功能的实现过程的两种实现方式的总结,具体采用哪种方式还需大家根据实际情况选择,希望能给大家带来帮助。ps:由于本人的技术能力有限,文章中可能出现技术描述不准确或者错误的情况恳请各位大佬指出,我立马进行改正,避免误导大家,谢谢!


以上所述就是小编给大家介绍的《如何利用Redis实现延时处理》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

创业小败局

创业小败局

创业家、i黑马 / 时代华文书局 / 2014-8-1 / 42.00元

让别人的失败,成为你的成功之母! 《创业小败局》由徐小平、何伯权等六位经验丰富的业界大佬,从《创业家》五年来跟踪的数千个创业案例中,精心挑选而来。21个最具代表性的失败案例,每个案例都代表了一种最常见的失败规律,也基本上覆盖了当下中国创业浪潮中,最容易遭遇的创业陷阱。失 败是有规律的。有时候创业者的选择和 行为,必然会导致失败,但当事人却因为缺乏经验而没有察觉。比如在错误心态下引入错误的合伙......一起来看看 《创业小败局》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具