内容简介:最近需要做一个延时处理的功能,主要是从kafka中消费消息后根据消息中的某个延时字段来进行延时处理,在实际的实现过程中有一些需要注意的地方,记录如下。说到java中的定时功能,首先想到的Timer和ScheduledThreadPoolExecutor,但是相比之下Timer可以排除,主要原因有以下几点:其中NamedThreadFactory是我自定义的一个线程工厂,主要给线程池定义名称及相关日志打印便于后续的问题分析,这里就不多做介绍了。拒绝策略也是采用默认的拒绝策略。 然后测试了一下,满足目标需求的功
最近需要做一个延时处理的功能,主要是从kafka中消费消息后根据消息中的某个延时字段来进行延时处理,在实际的实现过程中有一些需要注意的地方,记录如下。
实现过程
说到 java 中的定时功能,首先想到的Timer和ScheduledThreadPoolExecutor,但是相比之下Timer可以排除,主要原因有以下几点:
- Timer使用的是绝对时间,系统时间的改变会对Timer产生一定的影响;而ScheduledThreadPoolExecutor使用的是相对时间,所以不会有这个问题。
- Timer使用单线程来处理任务,长时间运行的任务会导致其他任务的延时处理,而ScheduledThreadPoolExecutor可以自定义线程数量。
- Timer没有对运行时异常进行处理,一旦某个任务触发运行时异常,会导致整个Timer崩溃,而ScheduledThreadPoolExecutor对运行时异常做了捕获(可以在 afterExecute() 回调方法中进行处理),所以更加安全。
- ScheduledThreadPoolExecutor 决定了用ScheduledThreadPoolExecutor来进行实现,接下来就是代码编写啦(大体流程代码)。 主要的延时实现如下:
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10, new NamedThreadFactory("scheduleThreadPool"), new ThreadPoolExecutor.AbortPolicy()); //从消息中取出延迟时间及相关信息的代码略 int delayTime = 0; executorService.scheduleWithFixedDelay(new Runnable() { @Override public void run() { //具体操作逻辑 }},0,delayTime, TimeUnit.SECONDS); 复制代码
其中NamedThreadFactory是我自定义的一个线程工厂,主要给线程池定义名称及相关日志打印便于后续的问题分析,这里就不多做介绍了。拒绝策略也是采用默认的拒绝策略。 然后测试了一下,满足目标需求的功能,可以做到延迟指定时间后执行,至此似乎功能就被完成了。 大家可能疑问,这也太简单了有什么好说的,但是这种方式实现简单是简单但是存在一个潜在的问题,问题在哪呢,让我们看一下ScheduledThreadPoolExecutor的源码:
public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,new DelayedWorkQueue(), threadFactory);} 复制代码
ScheduledThreadPoolExecutor 由于它自身的延时和周期的特性,默认使用了DelayWorkQueue,而并不像我们平时使用的SingleThreadExecutor等构造是可以使用自己定义的LinkedBlockingQueue并且设置队列大小,问题就出在这里。DelayWrokQueue是一个无界队列,而我们的目标数据源是kafka,也就是一个高并发高吞吐的消息队列,很大可能在某一时间段有大量的消息过来从而导致OOM,在使用多线程时我们是肯定要考虑到OOM的可能性的,因为OOM带来的后果往往比较严重,系统OOM临时的解决办法一般只能是重启,可能会导致用户数据丢失等不可能挽回的问题,所以从编码设计阶段要采用尽可能稳妥的手段来避免这些问题。
- 采用 redis 和线程结合
这一次换了思路,采用redis来帮助我们做缓冲,从而避免消息过多OOM的问题。 相关redis zset api:
//添加元素 ZADD key score member [[score member] [score member] …] //根据分值及限制数量查询 ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count] //从zset中删除指定成员 ZREM key member [member …] 复制代码
我们采用redis基础数据结构的zset结构,采用score来存储我们目标发送时间的数值,整体处理流程如下:
- 第一步数据存储:9:10分从kafka接收了一条a的订单消息,要求30分钟后进行发货通知,那我们就将当前时间加上30分钟然后转为时间戳作为a的score,key为a的订单号存入redis中。代码如下:
public void onMessage(String topic, String message) { String orderId; int delayTime = 0; try { Map<String, String> msgMap = gson.fromJson(message, new TypeToken<Map<String, String>>() { }.getType()); if (msgMap.isEmpty()) { return; } LOGGER.info("onMessage kafka content:{}", msgMap.toString()); orderId = msgMap.get("orderId"); if(StringUtils.isNotEmpty(orderId)){ delayTime = Integer.parseInt(msgMap.get("delayTime")); Calendar calendar = Calendar.getInstance(); //计算出预计发送时间 calendar.add(Calendar.MINUTE, delayTime); long sendTime = calendar.getTimeInMillis(); RedisUtils.getInstance().zetAdd(Constant.DELAY, sendTime, orderId); LOGGER.info("orderId:{}---放入redis中等待发送---sendTime:{}", ---orderId:{}, sendTime); } } catch (Exception e) { LOGGER.info("onMessage 延时发送异常:{}", e); } } 复制代码
- 第二步数据处理:另起一个线程具体调度时间根据业务需求来定,我这里3分钟执行一次,内部逻辑:从redis中取出一定量的zset数据,如何取呢,使用zset的zrangeByScore方法,根据数据的score进行排序,当然可以带上时间段,这里从0到现在,来进行消费,需要注意的一点是,在取出数据后我们需要用zrem方法将取出的数据从zset中删除,防止其他线程重复消费数据。在此之后进行接下来的发货通知等相关逻辑。代码如下:
public void run(){ //获取批量大小 int orderNum = Integer.parseInt(PropertyUtil.get(Constant.ORDER_NUM,"100")); try { //批量获取离发送时间最近的orderNum条数据 Calendar calendar = Calendar.getInstance(); long now = calendar.getTimeInMillis(); //获取无限早到现在的事件key(防止上次批量数量小于放入数量,存在历史数据未消费情况) Set<String> orderIds = RedisUtils.getInstance().zrangeByScore(Constant.DELAY, 0, now, 0, orderNum); LOGGER.info("task.getOrderFromRedis---size:{}---orderIds:{}", orderIds.size(), gson.toJson(orderIds)); if (CollectionUtils.isNotEmpty(orders)){ //删除key 防止重复发送 for (String orderId : orderIds) { RedisUtils.getInstance().zrem(Constant.DELAY, orderId); } //接下来执行发送等业务逻辑 } } catch (Exception e) { LOGGER.warn("task.run exception:{}", e); } } 复制代码
至此完成了依赖redis和线程完成了延时发送的功能。
结语
那么对上面两种不同的实现方式进行一下优缺点比较:
-
第一种方式实现简单,不依赖外部组件,能够快速的实现目标功能,但缺点也很明显,需要在特定的场景下使用,如果是我这种消息量大的情况下使用很可能是有问题,当然在数据源消息不多的情况下不失为好的选择。
-
第二种方式实现稍微复杂一点,但是能够适应消息量大的场景,采用redis的zset作为了“中间件”的效果,并且帮助我们进行延时的功能实现能够较好的适应高并发场景,缺点在于在编写的过程中需要考虑实际的因素较多,例如线程的执行周期时间,发送可能会有一定时间的延迟,批量数据大小的设置等等。
综上是本人这次延时功能的实现过程的两种实现方式的总结,具体采用哪种方式还需大家根据实际情况选择,希望能给大家带来帮助。ps:由于本人的技术能力有限,文章中可能出现技术描述不准确或者错误的情况恳请各位大佬指出,我立马进行改正,避免误导大家,谢谢!
以上所述就是小编给大家介绍的《如何利用Redis实现延时处理》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- [原理OK]利用Redis的notifications功能实现延时任务
- php订单延时处理-延时队列
- TCP协议之网络延时
- 什么是Hibernate延时加载?
- TCP协议之网络延时
- 低延时场景不要用 Webflux
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Game Engine Architecture, Second Edition
Jason Gregory / A K Peters/CRC Press / 2014-8-15 / USD 69.95
A 2010 CHOICE outstanding academic title, this updated book covers the theory and practice of game engine software development. It explains practical concepts and techniques used by real game studios,......一起来看看 《Game Engine Architecture, Second Edition》 这本书的介绍吧!