内容简介:微服务的概念可以说给程序设计打开了一个新世界,带来了众多的优点,但是也将一些以往容易处理的问题变得复杂,例如:缓存、事务、定时任务等。缓存可以用中间件例如redis、memcached等,事务有诸多分布式事务框架解决,定时任务也有分布式的解决方案,例如quartz、elastic job等,今天我要讲的是就是定时任务。既然已经有成熟的分布式定时任务框架,我要讲的东西并不是用另一种设计去实现相同的功能,而是从不同的角度去解决分布式定时任务的问题。这个问题来起源于一个小功能,我们有一个发送短信的微服务,需要获
微服务的概念可以说给程序设计打开了一个新世界,带来了众多的优点,但是也将一些以往容易处理的问题变得复杂,例如:缓存、事务、定时任务等。缓存可以用中间件例如 redis 、 memcached 等,事务有诸多分布式事务框架解决,定时任务也有分布式的解决方案,例如quartz、elastic job等,今天我要讲的是就是定时任务。
既然已经有成熟的分布式定时任务框架,我要讲的东西并不是用另一种设计去实现相同的功能,而是从不同的角度去解决分布式定时任务的问题。
问题来源
这个问题来起源于一个小功能,我们有一个发送短信的微服务,需要获取短信的状态报告,状态报告对于短信发送不是同步的,短信提交到服务商,服务商要提交运营商发送之后才能生成状态报告,因此有一定的延迟,需要异步获取,并且服务商提供的接口有频率限制,因此需要做一个定时任务,且需要单点执行,那么问题来了,因为这一个功能我就需要引入一个定时任务框架吗,总感觉有点大材小用的意思。
之前我们的定时任务处理既有用过quartz,也用过elastic job,但是只为这样一个小功能就引入一个框架,再加上配置又得好半天,想想都不划算。
例如要用quartz,要创建一堆数据库表,但表里面只存储了一个任务信息。
用elastic job吧,还要使用zookeeper,即便用lite版,也需要一堆配置,远比我写业务的时间要长。
我只想简简单单的写逻辑!!!
解决方案
谈分布式解决方案大致总离不开中间件,联想到上次解决websocket的分布式方案(参见 Spring Cloud 微服务架构下的 WebSocket 解决方案 )使用到的Spring Cloud Stream,大概有了思路:
- 我需要一个任务分发中心,专门负责触发定时任务
- 其他服务如果需要触发定时任务,接收特定的触发消息
- 任务执行完成向任务分发中心推送任务完成的确认消息
- 为任务执行端提供一个公共的spring boot starter晚上2,3的步骤,实际需要编码的几乎就剩下业务逻辑本身了
详细设计
根据上一步的方案,需要确认一些细节,以及一些特殊的情况,例如定时任务可能是由微服务集群中单个实例执行,也可能存在集体执行(例如更新内存中的缓存),还可能存在分区执行。
客户端(需要定时任务的为服务端)需要建立以下消息队列:
- 集群接收的队列,每个微服务实例建立一个,每个微服务实例都会收到相同消息
- 单独接收的队列,每个应用集群建立一个,确保消息只被一个实例消费
- 按分区接收的队列,每个分区建立一个,确保只被分区内一个实例消费
客户端与服务端需要通过唯一的任务id来确认需要执行的定时任务
服务端(任务分发微服务)需要根据情况将消息推送到不同的队列,不能直接使用Spring Cloud Stream,需要使用rabbitmq
服务端本身也是分布式的,因此需要一个定时任务框架用于任务触发,我这里选择了quartz
代码实现
Spring Cloud Stream的基本知识我不再复述了, Spring Cloud 微服务架构下的 WebSocket 解决方案 中有讲解。
定时任务分发服务
定义定时任务
data class ScheduleTask(
/** 任务的id,全局唯一,与客户端的taskId完全匹配 */
var taskId: String = "",
/** 定时任务的cron 表达式 */
var cron: String = "",
/** 关联应用 */
var appId: Int = 0,
/** 任务描述 */
var description: String = "",
/** 接收任务的分区 */
var zone: String? = null,
/** 调度方式,广播到集群或单例执行,默认单例 */
var dispatchMode: DispatchMode = DispatchMode.Singleton,
/** 是否启用 */
var enabled: Boolean = true,
/** 任务的数据库记录 id,自增 */
var id: Int = -1)
复制代码
任务调度
使用quartz进行任务调度
private fun scheduleJob(task: ScheduleTask) {
val job = JobBuilder.newJob(TaskEmitterJob::class.java)
.withIdentity(task.taskId, task.appId.toString())
.withDescription(task.description)
.storeDurably()
.requestRecovery()
.usingJobData("id", task.id)
.usingJobData("taskId", task.taskId)
.build()
val trigger = TriggerBuilder.newTrigger()
.withIdentity(task.taskId, task.appId.toString())
.withSchedule(CronScheduleBuilder.cronSchedule(task.cron))
.forJob(job)
.build()
scheduler.addJob(job, true, true)
if (scheduler.checkExists(trigger.key)) {
scheduler.rescheduleJob(trigger.key, trigger)
} else {
scheduler.scheduleJob(trigger)
}
}
复制代码
ScheduleTask是持久化的,插入的时候同时向quartz插入任务,更新的时候也要向quartz更新,删除的时候同时删除
quartz的任务触发
class TaskEmitterJob : Job {
companion object {
private val log = LogFactory.getLog(TaskEmitterJob::class.java)
}
override fun execute(context: JobExecutionContext) {
try {
val taskId = context.jobDetail.jobDataMap["taskId"] as String
log.info("任务分发:$taskId")
val service = ScheduleCenterApplication.context.getBean(ScheduleTaskService::class.java)
service.launch(taskId)
} catch (e: Exception) {
log.error("任务失败$[taskId]", e)
}
}
}
复制代码
rabbitmq的发送逻辑
/**
* 发布定时任务事件
*/
fun launch(task: ScheduleTask) {
val exchange = when (task.dispatchMode) {
Cluster -> "aegisScheduleCluster"
Singleton -> "aegisScheduleSingleton"
}
val routingKey = when (task.dispatchMode) {
Cluster -> exchange
Singleton -> "$exchange.${task.appName}"
}
val executeTaskInfo = ScheduleTaskInfo(task.taskId, task.appName!!)
amqpTemplate.convertAndSend(exchange, routingKey,
executeTaskInfo)
taskExecuteRecordDAO.save(
TaskExecuteRecord(executeTaskInfo.uid, task.id, Date())
)
}
复制代码
客户端spring boot starter的实现
定义定时任务接口,只要在项目中实现该接口并将实现声明为bean,即可完成定时任务的定义
@FunctionalInterface
interface ScheduledJob {
/**
* 执行定时任务
*/
fun execute(properties: Map<String, Any>)
/**
* 获取定时任务id
* @return 定时任务id,对应任务分发中心ScheduleTask的taskId
*/
fun getId(): String
}
复制代码
接收任务
/**
* 接收单例任务
*/
@StreamListener(SINGLETON_INPUT)
fun acceptGroupTask(taskInfo: ScheduleTaskInfo) {
if (taskInfo.app == application) {
val receivedTime = Date()
val job = jobsProvider.ifAvailable?.firstOrNull {
it.getId() == taskInfo.id
}
job?.execute(taskInfo.properties ?: mapOf())
singletonOutput.send(GenericMessage(
ConfirmInfo(taskInfo.id, taskInfo.uid, job != null, receivedTime, Date())
))
}
}
复制代码
集群全体执行任务与单例任务的区别只在stream的配置,一个需要声明binding的group,一个不需要,这属于Spring Cloud Stream的知识范畴,可以自己看官方文档或查看我前面提到的文档,如果有不懂的可以私聊我。
stream的事件流声明
/**
* 定时任务信息的事件流接口
* @author 吴昊
* @since 0.1.0
*/
interface AegisScheduleClient {
companion object {
const val CLUSTER_INPUT = "aegisScheduleClusterInput"
const val SINGLETON_INPUT = "aegisScheduleSingletonInput"
const val CONFIRM_OUTPUT = "aegisScheduleGroupOutput"
}
/**
*
* @return
*/
@Input(CLUSTER_INPUT)
fun scheduleInput(): SubscribableChannel
/**
*
* @return
*/
@Input(SINGLETON_INPUT)
fun singletonScheduleInput(): SubscribableChannel
/**
*
* @return
*/
@Output(CONFIRM_OUTPUT)
fun confirmOutput(): MessageChannel
}
复制代码
最后再加上服务端确认消息的接收代码:
@StreamListener(CONFIRM_INPUT)
fun acceptGroupTask(confirmInfo: ConfirmInfo) {
LOG.info("接收到确认消息:$confirmInfo")
scheduleTaskService.confirm(confirmInfo)
}
复制代码
主要的代码已经全部放上来了,整体思路也很简单,后面仍有很多需要优化的地方,例如消息推送失败,或者确认消息未送达等等,于整体设计并没有多大的影响了。
这样在微服务端如果需要添加定时任务,只需要
- 引入starter
- 实现ScheduledJob接口
- 在任务调度中心添加任务
至于在任务中心添加任务,主题代码有了,实现个简单管理界面很容易对不对,也就几个字段的输入。
最后附上管理界面的截图:
任务列表
任务详情
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
The Intersectional Internet
Safiya Umoja Noble、Brendesha M. Tynes / Peter Lang Publishing / 2016
From race, sex, class, and culture, the multidisciplinary field of Internet studies needs theoretical and methodological approaches that allow us to question the organization of social relations that ......一起来看看 《The Intersectional Internet》 这本书的介绍吧!