内容简介:鉴于对Spring实现的@Scheduled的调度和SchedulerFactoryBean的研究发现,基于Spring的调度封装虽满足了大多需求,但为了简化使用方式使得Job并不容易得到控制,导致开发对Job的控制和运维成本上升;下面是本人基于Quartz和Spring及Annotation开发的单机版调度配置,满足单机调度的大部分需求和管理、运维操作并解放对配置文件的繁琐操作;通过页面可对作业进行统一的监控和管理(触发、暂停、恢复、动态添加、参数下发)及报警等操作;简要列出以下功能点:
鉴于对Spring实现的@Scheduled的调度和SchedulerFactoryBean的研究发现,基于Spring的调度封装虽满足了大多需求,但为了简化使用方式使得Job并不容易得到控制,导致开发对Job的控制和运维成本上升;下面是本人基于Quartz和Spring及Annotation开发的单机版调度配置,满足单机调度的大部分需求和管理、运维操作并解放对配置文件的繁琐操作;
功能点描述
功能点 | Spring @Scheduled | 自定义@SchedulerJob |
---|---|---|
可控制 | 否 | 是 |
可运维 | 否 | 是 |
可页面化 | 否 | 是 |
可统一跟踪业务状态 | 否 | 是 |
可统一跟踪调度状态 | 否 | 是 |
支持cron表达式 | 是 | 是 |
支持类似ScheduledExecutorService的定时调度 | 是 | 否 |
代码演示
- 基于注解进行作业配置
@Slf4j(topic = "dynamic-datasource") @Component public class DetectJob { /** * 作业配置 value=作业名,group=作业所属组,init=true为容器创建完毕时立即触发 */ @SchedulerJob(value = "detectDataSource",cron = "${cron.detect.data.source}",group = "dynamic-datasource", descrption="动态数据源切换",init = true) public void detectDataSource(){ log.info(VariableUtils.join(SystemConstants.Symbol.DELIMITER,"dynamic-datasource","detectDataSource")); } } 复制代码
##cron表达式 cron.detect.data.source=1 * * * * ? 复制代码
- 代码执行效果
页面演示
通过页面可对作业进行统一的监控和管理(触发、暂停、恢复、动态添加、参数下发)及报警等操作;
简要列出以下功能点:
- 作业展示
- 作业运维报警
- 作业参数下发
- 作业事件跟踪
设计思路
- 应当满足什么业务场景
- 如何简化操作、降低开发成本
- 如何对业务、系统功能进行监控、控制、运维
- 如何设计才能便于后期业务和功能的扩展
功能设计
- 设计思路
- 如何获取方法上的注解及配置
- 如何实现通过Quartz定时执行注解方法
- 如何对每个方法上的注解进行统一的资源管理和监控、控制、运维
- 如何对调度进行性能的优化
- 功能点分析
- 基本调度
- 初始化立即调度
- 人工或系统控制调度(任务创建后不执行调度,控制权交给外部)
- 定时执行调度(及按照指定cron配置周期调度)
- 是否可并发执行
- 资源管理
- 统一管理系统内全部的配置资源(作业所属组、描述、cron表达式、是否开启报警、是否开启监控等)
- 调度管理
- 调度状态管理(系统状态、业务状态)
- 调度行为管理
- 作业业务参数下发(弥补业务过失)
- 调度跟踪、业务跟踪
- 调度报警、业务报警
- 基本调度
- 基本功能点实现
- 注解配置
@Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public @interface SchedulerJob { /** * 作业名 * @return */ String value(); /** * 表达式 * @return */ String cron(); /** * 是否初始化时立即执行 * @return */ boolean init() default false; /** * 是否人为控制 * @return */ boolean control() default false; /** * 所属组 * @return */ String group() default "default"; /** * 作业描述 * @return */ String descrption() default ""; /** * 作业执行器 * @return */ Class jobClass() default SimpleJob.class; } @Target({ElementType.METHOD,ElementType.ANNOTATION_TYPE}) @Retention(RetentionPolicy.RUNTIME) public @interface SchedulerJobs { /** * 注解集 * @return */ SchedulerJob[] value(); } 复制代码
- 调度创建
@Slf4j @Configuration public class SchedulerBean implements InitializingBean, DisposableBean { private Scheduler scheduler; @Value("#{schdulerProperties['quartz.thread.count']}") private String threadCount; @Override public void destroy() throws Exception { scheduler.shutdown(); } @Override public void afterPropertiesSet() throws Exception { createScheduler(); } /** * 创建调度 * @throws SchedulerException */ public void createScheduler() throws SchedulerException { StdSchedulerFactory factory = new StdSchedulerFactory(); factory.initialize(getBaseQuartzProperties()); this.scheduler = factory.getScheduler(); } /** * 作业配置 * @return */ private Properties getBaseQuartzProperties() { Properties result = new Properties(); result.put("org.quartz.threadPool.class", org.quartz.simpl.SimpleThreadPool.class.getName()); result.put("org.quartz.threadPool.threadCount", threadCount); result.put("org.quartz.scheduler.threadName", "baiyunpeng-scheduler"); result.put("org.quartz.scheduler.instanceName", "baiyunpeng-scheduler"); result.put("org.quartz.jobStore.misfireThreshold", "1"); return result; } /** * 创建作业 * @param jobParam * @throws SchedulerException */ public void createJob(JobParam jobParam) throws SchedulerException { SchedulerJob schedulerJob = jobParam.getSchedulerJob(); JobDetail jobDetail = JobBuilder.newJob(schedulerJob.jobClass()) .withIdentity(jobParam.getJobKey()) .withDescription(jobParam.getJobKey().getName()) .build(); addJobDataMap(jobDetail,jobParam.getTarget(),jobParam.getTargetMethod()); this.scheduler.scheduleJob(jobDetail,createTrigger(jobParam.getJobKey(),jobParam.getCron())); } /** * 创建触发器 * @param jobKey * @param cron * @return */ private Trigger createTrigger(JobKey jobKey, String cron) { return TriggerBuilder.newTrigger().withIdentity(jobKey.getName(),jobKey.getGroup()) .withSchedule(CronScheduleBuilder.cronSchedule(cron) .withMisfireHandlingInstructionDoNothing()).build(); } /** * 添加作业map * @param jobDetail * @param target * @param targetMethod */ private void addJobDataMap(JobDetail jobDetail, Object target, Method targetMethod) { JobDataMap jobDataMap = jobDetail.getJobDataMap(); jobDataMap.put("executeJob",target); jobDataMap.put("executeMethod",targetMethod); } public Scheduler getScheduler() { return scheduler; } public void start() throws SchedulerException { this.scheduler.start(); } } 复制代码
- 简单的作业执行器创建
/** * 作业抽象类 * @author baiyunpeng */ public abstract class ExecuteJob implements Job { protected Object executeJob; protected Method executeMethod; public void setExecuteJob(Object executeJob) { this.executeJob = executeJob; } public void setExecuteMethod(Method executeMethod) { this.executeMethod = executeMethod; } } /** * 非并发执行 * @author baiyunpeng */ @Slf4j public class SimpleJob extends ExecuteJob { @Override public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { try { executeMethod.invoke(executeJob); } catch (IllegalAccessException | InvocationTargetException e) { log.error(VariableUtils.join(SystemConstants.Symbol.DELIMITER,this.getClass().getName(), ExceptionUtils.getRootCauseMessage(e))); } } } /** * 可并发执行 * @author baiyunpeng */ @Slf4j @PersistJobDataAfterExecution @DisallowConcurrentExecution public class ConcurrentJob extends ExecuteJob{ @Override public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { try { executeMethod.invoke(executeJob); } catch (IllegalAccessException | InvocationTargetException e) { log.error(VariableUtils.join(SystemConstants.Symbol.DELIMITER,this.getClass().getName(), ExceptionUtils.getRootCauseMessage(e))); } } } 复制代码
- 作业创建
/** * 作业配置解析 * @param scheduled * @param method * @param bean */ protected void processScheduled(SchedulerJob scheduled, Method method, Object bean) { Method invocableMethod = AopUtils.selectInvocableMethod(method, bean.getClass()); String cron = scheduled.cron(); if(StringUtils.hasText(cron)){ if(Objects.nonNull(this.embeddedValueResolver)){ cron = this.embeddedValueResolver.resolveStringValue(cron); } jobParams.add(new JobParam(scheduled,bean,invocableMethod,new JobKey(scheduled.value(),scheduled.group()),cron)); } } /** * 作业初始化 */ private void finishRegister() { if(Objects.isNull(this.schedulerBean)){ SchedulerBean schedulerBean = beanFactory.getBean(SCHEDULER_BEAN, SchedulerBean.class); AssertUtil.assertNull(schedulerBean, SystemErrorCode.NS000000,"the scheduler bean init error"); this.schedulerBean = schedulerBean; try { jobParams.parallelStream().forEach(jobParam -> { try { this.schedulerBean.createJob(jobParam); SchedulerJob schedulerJob = jobParam.getSchedulerJob(); if(!schedulerJob.control()){ if (schedulerJob.init()){ this.schedulerBean.getScheduler().triggerJob(jobParam.getJobKey()); } }else { this.schedulerBean.getScheduler().pauseJob(jobParam.getJobKey()); } } catch (SchedulerException e) { log.error(VariableUtils.join(SystemConstants.Symbol.DELIMITER,"the scheduler job init error", ExceptionUtils.getRootCauseMessage(e))); System.exit(1); } }); schedulerBean.start(); }catch (Exception e){ log.error(VariableUtils.join(SystemConstants.Symbol.DELIMITER,"the scheduler job init error", ExceptionUtils.getRootCauseMessage(e))); System.exit(1); } } } 复制代码
总结
- 如何异步执行方法,首先得获取该方法的实例
- 如何定时执行,首先创建并获取定时器
- 如何基于Quarzt监控作业执行,需获Schedule和Jobkey等
后续更新
- 如何统一监控调度状态和业务状态
- 如何解决work线程池被任务阻塞的问题
- 如何做任务补发(note:除了misfire机制外还有哪些做法)
- 如何基于单机调度实现基本的分布式调度
- 分布式调度需要考虑的点有哪些
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- Solr单机版的搭建
- Linux安装单机版Redis
- Windows7系统搭建单机版Spark开发环境
- 理解golang调度之一 :操作系统调度
- 理解golang调度之二 :Go调度器
- Golang 源码学习调度逻辑(三):工作线程的执行流程与调度循环
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。