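1. A simple implementation of Spring scheduled tasks
To use scheduled tasks in Spring Boot, enable scheduling support with @EnableScheduling and add the @Scheduled annotation to every method that should be scheduled. That is enough to turn on scheduling in the project; the execution period and frequency are controlled through attributes such as cron, fixedRate, and fixedDelay.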
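1.1 Drawback
Once the period has been specified, changing it requires restarting the application.
1.2 Requirements
Hot-update the execution period of scheduled tasks, based on cron expressions and with support for external storage such as a database or nacos.
Keep changes minimal and stay compatible with existing scheduled tasks (only one extra annotation is needed).
Add scheduled tasks dynamically.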
2. Source code analysis of Spring scheduled tasks
2.1 @EnableScheduling
It imports the configuration class SchedulingConfiguration.

    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Import(SchedulingConfiguration.class)
    @Documented
    public @interface EnableScheduling {
    }
2.2 SchedulingConfiguration
It configures a single bean, ScheduledAnnotationBeanPostProcessor. As the name suggests, this class implements the BeanPostProcessor interface.

    @Configuration
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public class SchedulingConfiguration {

        @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
        @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
        public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
            return new ScheduledAnnotationBeanPostProcessor();
        }
    }
2.3 ScheduledAnnotationBeanPostProcessor
Its postProcessAfterInitialization implementation shows that the method which actually handles @Scheduled and sets up the scheduled task is processScheduled.

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) {
        if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
                bean instanceof ScheduledExecutorService) {
            // Ignore AOP infrastructure such as scoped proxies.
            return bean;
        }

        Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
        if (!this.nonAnnotatedClasses.contains(targetClass) &&
                AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {
            // Collect the bean's methods together with their @Scheduled annotations
            Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                    (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
                        Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
                                method, Scheduled.class, Schedules.class);
                        return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
                    });
            if (annotatedMethods.isEmpty()) {
                this.nonAnnotatedClasses.add(targetClass);
                if (logger.isTraceEnabled()) {
                    logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
                }
            }
            else {
                // Non-empty set of methods
                annotatedMethods.forEach((method, scheduledMethods) ->
                        // Process each @Scheduled annotation
                        scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
                if (logger.isTraceEnabled()) {
                    logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
                            "': " + annotatedMethods);
                }
            }
        }
        return bean;
    }
2.4 ScheduledAnnotationBeanPostProcessor.processScheduled
Only the key part that handles the cron expression is shown below.

    private final ScheduledTaskRegistrar registrar;

    public ScheduledAnnotationBeanPostProcessor() {
        this.registrar = new ScheduledTaskRegistrar();
    }

    protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
        try {
            // Wrap the scheduled method in a Runnable
            Runnable runnable = createRunnable(bean, method);
            boolean processedSchedule = false;
            Set<ScheduledTask> tasks = new LinkedHashSet<>(4);

            // Determine initial delay
            // Handling of scheduled.initialDelay() omitted...

            // Check cron expression
            String cron = scheduled.cron();
            if (StringUtils.hasText(cron)) {
                String zone = scheduled.zone();
                if (this.embeddedValueResolver != null) {
                    // Resolve ${...} placeholders
                    cron = this.embeddedValueResolver.resolveStringValue(cron);
                    zone = this.embeddedValueResolver.resolveStringValue(zone);
                }
                if (StringUtils.hasLength(cron)) {
                    Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
                    processedSchedule = true;
                    if (!Scheduled.CRON_DISABLED.equals(cron)) {
                        TimeZone timeZone;
                        if (StringUtils.hasText(zone)) {
                            timeZone = StringUtils.parseTimeZoneString(zone);
                        }
                        else {
                            timeZone = TimeZone.getDefault();
                        }
                        // Create the CronTrigger and register the CronTask
                        tasks.add(this.registrar.scheduleCronTask(
                                new CronTask(runnable, new CronTrigger(cron, timeZone))));
                    }
                }
            }
            // Handling of fixedDelay and fixedRate, and storing the ScheduledTasks for later destruction, omitted...
        }
        // catch block omitted...
    }
As shown above, the cron task is registered (or initialized) through this.registrar.scheduleCronTask.
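3. Implementing dynamic scheduled tasks
Approach: override ScheduledAnnotationBeanPostProcessor.processScheduled, change the part that handles cron, and register (or initialize) the scheduled task with this.registrar.scheduleTriggerTask instead.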
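The reason a trigger-based registration allows hot updates can be sketched without Spring. The following is a minimal, JDK-only illustration, not the article's implementation: `NextRun`, `ReloadingInterval`, and the plain interval (instead of a cron expression) are hypothetical stand-ins chosen to keep the example self-contained. The point is that the next run time is recomputed on every call, so a changed setting takes effect without re-registering anything.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical stand-in for a trigger: computes the next run from the last one.
interface NextRun {
    Instant after(Instant lastRun);
}

// Reads the interval on every call instead of capturing it once at registration time.
final class ReloadingInterval implements NextRun {
    private final Supplier<Duration> intervalSource;

    ReloadingInterval(Supplier<Duration> intervalSource) {
        this.intervalSource = intervalSource;
    }

    @Override
    public Instant after(Instant lastRun) {
        return lastRun.plus(intervalSource.get());
    }
}

final class ReloadingIntervalDemo {
    public static void main(String[] args) {
        // The AtomicReference plays the role of an external configuration store.
        AtomicReference<Duration> setting = new AtomicReference<>(Duration.ofSeconds(5));
        NextRun trigger = new ReloadingInterval(setting::get);
        Instant t0 = Instant.parse("2021-12-11T00:00:00Z");

        System.out.println(trigger.after(t0)); // 5 seconds after t0
        setting.set(Duration.ofSeconds(2));    // simulate a configuration change
        System.out.println(trigger.after(t0)); // 2 seconds after t0
    }
}
```

A trigger whose schedule is fixed at construction would keep returning the 5-second result here; re-reading the source on each call is what makes the schedule dynamic.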
3.1 Class diagram

    classDiagram
        DisposableBean <|-- DynamicCronScheduleTaskManager
        EnvironmentAware <|-- EnvironmentDynamicCronHandler
        AbstractDynamicCronHandler <|-- EnvironmentDynamicCronHandler
        Trigger <|-- DynamicCronTrigger
        EnvironmentAware : +setEnvironment()
        DisposableBean : +destroy() void
        Trigger : +nextExecutionTime(TriggerContext triggerContext) Date
        class DynamicCronScheduleTaskManager {
            +Map<String, ScheduledTask> dynamicScheduledTaskMap
            -ScheduledTaskRegistrar registrar
            +addTriggerTask(String cronName, TriggerTask task) ScheduledTask
            +contains(String cronName) boolean
            +updateTriggerTask(String cronName) void
            +removeTriggerTask(String cronName) void
        }
        class AbstractDynamicCronHandler {
            -DynamicCronScheduleTaskManager dynamicCronScheduleTaskManager
            +getCronExpression(String cronName) String
            +updateTriggerTask(String cronName) void
        }
        class EnvironmentDynamicCronHandler {
            +Environment environment
            +environmentChangeEvent(EnvironmentChangeEvent event) void
        }
        class DynamicCronTrigger {
            -String cronName
            -AbstractDynamicCronHandler dynamicCronHandler
            -String cronExpression
            -CronSequenceGenerator sequenceGenerator
        }
        class ScheduledDynamicCron {
            +value() String
            +cronName() String
            +handler() Class<? extends AbstractDynamicCronHandler>
        }
3.2 DynamicCronScheduleTaskManager
    import org.springframework.beans.factory.DisposableBean;
    import org.springframework.scheduling.config.ScheduledTask;
    import org.springframework.scheduling.config.ScheduledTaskRegistrar;
    import org.springframework.scheduling.config.TriggerTask;

    import java.util.HashMap;
    import java.util.Map;

    /**
     * @author HuangJS
     * @date 2021-12-11 3:04 PM
     */
    public class DynamicCronScheduleTaskManager implements DisposableBean {
        private Map<String, ScheduledTask> dynamicScheduledTaskMap = new HashMap<>();

        // Package-private; must be assigned before any task is added
        ScheduledTaskRegistrar registrar;

        // Add a scheduled task, replacing any task already registered under the same name
        public ScheduledTask addTriggerTask(String cronName, TriggerTask task) {
            ScheduledTask scheduledTask = dynamicScheduledTaskMap.get(cronName);
            if (scheduledTask != null) {
                scheduledTask.cancel();
            }
            scheduledTask = this.registrar.scheduleTriggerTask(task);
            dynamicScheduledTaskMap.put(cronName, scheduledTask);
            return scheduledTask;
        }

        public boolean contains(String cronName) {
            return this.dynamicScheduledTaskMap.containsKey(cronName);
        }

        // Update the trigger timing of a scheduled task by cancelling and re-scheduling it
        public void updateTriggerTask(String cronName) {
            ScheduledTask scheduledTask = dynamicScheduledTaskMap.get(cronName);
            if (scheduledTask == null) {
                throw new IllegalStateException("Invalid cronName \"" + cronName + "\", no ScheduledTask found");
            }
            scheduledTask.cancel();
            scheduledTask = this.registrar.scheduleTriggerTask((TriggerTask) scheduledTask.getTask());
            dynamicScheduledTaskMap.put(cronName, scheduledTask);
        }

        // Remove a scheduled task
        public void removeTriggerTask(String cronName) {
            ScheduledTask scheduledTask = dynamicScheduledTaskMap.remove(cronName);
            if (scheduledTask != null) {
                scheduledTask.cancel();
            }
        }

        @Override
        public void destroy() throws Exception {
            for (ScheduledTask value : dynamicScheduledTaskMap.values()) {
                value.cancel();
            }
            this.dynamicScheduledTaskMap.clear();
        }
    }
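The cancel-and-reschedule pattern used by the manager can be tried in isolation. Below is a rough JDK-only analogue, assuming a `ScheduledExecutorService` in place of Spring's `ScheduledTaskRegistrar`; the class name `NamedTaskRegistry` and its methods are hypothetical. It uses a `ConcurrentHashMap`, which may be worth considering if tasks can be added or updated from several threads (for instance from an event listener and a web request at the same time).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical JDK-only analogue of a name-keyed task registry.
final class NamedTaskRegistry implements AutoCloseable {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final Map<String, ScheduledFuture<?>> tasks = new ConcurrentHashMap<>();

    // Schedules the task under the given name and cancels any task previously registered there.
    ScheduledFuture<?> register(String name, Runnable task, long delaySeconds) {
        ScheduledFuture<?> next = executor.schedule(task, delaySeconds, TimeUnit.SECONDS);
        ScheduledFuture<?> previous = tasks.put(name, next);
        if (previous != null) {
            previous.cancel(false);
        }
        return next;
    }

    boolean contains(String name) {
        return tasks.containsKey(name);
    }

    // Removes and cancels the task; does nothing if the name is unknown.
    void remove(String name) {
        ScheduledFuture<?> removed = tasks.remove(name);
        if (removed != null) {
            removed.cancel(false);
        }
    }

    // Cancels everything and stops the worker thread.
    @Override
    public void close() {
        tasks.values().forEach(future -> future.cancel(false));
        tasks.clear();
        executor.shutdownNow();
    }
}
```

Registering twice under the same name leaves exactly one live task, which mirrors what addTriggerTask does when a cronName is already present.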
3.3 AbstractDynamicCronHandler
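    import org.springframework.beans.factory.annotation.Autowired;

    public abstract class AbstractDynamicCronHandler {
        // protected so that subclasses such as EnvironmentDynamicCronHandler can access it
        @Autowired
        protected DynamicCronScheduleTaskManager dynamicCronScheduleTaskManager;

        /**
         * Get the cron expression.
         * @param cronName name of the dynamic cron
         * @return the cron expression, or null if none is configured
         */
        public abstract String getCronExpression(String cronName);

        /**
         * Update the trigger timing of the scheduled task registered under cronName.
         * @param cronName name of the dynamic cron
         */
        public void updateTriggerTask(String cronName) {
            dynamicCronScheduleTaskManager.updateTriggerTask(cronName);
        }
    }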
3.4 EnvironmentDynamicCronHandler
This handler is based on Environment: when the configuration is refreshed, the trigger timing of the scheduled tasks is refreshed automatically, which also works for distributed, multi-node cluster deployments.
For example, if the cron expression is stored in nacos, updating the configuration there updates the trigger timing of the task, because the handler listens for EnvironmentChangeEvent.

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.cloud.context.environment.EnvironmentChangeEvent;
    import org.springframework.context.EnvironmentAware;
    import org.springframework.context.event.EventListener;
    import org.springframework.core.env.Environment;

    /**
     * @author HuangJS
     * @date 2021-12-11 11:46 AM
     */
    public class EnvironmentDynamicCronHandler extends AbstractDynamicCronHandler implements EnvironmentAware {
        private final Logger logger = LoggerFactory.getLogger(EnvironmentDynamicCronHandler.class);
        private Environment environment;

        // Look up the cron expression under the property key cronName
        @Override
        public String getCronExpression(String cronName) {
            try {
                return environment.getProperty(cronName);
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
            return null;
        }

        @Override
        public void setEnvironment(Environment environment) {
            this.environment = environment;
        }

        // Re-schedule every registered task whose property key has changed
        @EventListener
        public void environmentChangeEvent(EnvironmentChangeEvent event) {
            for (String key : event.getKeys()) {
                if (this.dynamicCronScheduleTaskManager.contains(key)) {
                    this.dynamicCronScheduleTaskManager.updateTriggerTask(key);
                }
            }
        }
    }
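The filtering step in the event listener, which refreshes only the changed keys that belong to a registered task, can be illustrated on its own. This is a small JDK-only sketch; `ChangedKeyFilter` and `keysToRefresh` are hypothetical names, and the key `server.port` is just an example of an unrelated property.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical helper that isolates the "which changed keys matter?" decision.
final class ChangedKeyFilter {
    // Returns the changed keys that are also registered task names.
    static Set<String> keysToRefresh(Set<String> changedKeys, Set<String> registeredNames) {
        Set<String> result = new LinkedHashSet<>(changedKeys);
        result.retainAll(registeredNames);
        return result;
    }

    public static void main(String[] args) {
        Set<String> changed = Set.of("cron.name1", "server.port");
        Set<String> registered = Set.of("cron.name1", "cron.name2");
        // Only cron.name1 is both changed and registered.
        System.out.println(keysToRefresh(changed, registered));
    }
}
```

Unrelated configuration changes therefore never touch the scheduler, and tasks whose key did not change keep their current schedule.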
3.5 DynamicCronTrigger
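    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.scheduling.Trigger;
    import org.springframework.scheduling.TriggerContext;
    import org.springframework.scheduling.support.CronSequenceGenerator;

    import java.util.Date;

    public class DynamicCronTrigger implements Trigger {
        private final static Logger LOGGER = LoggerFactory.getLogger(DynamicCronTrigger.class);
        private String cronName;
        private AbstractDynamicCronHandler dynamicCronHandler;
        private String cronExpression;
        private CronSequenceGenerator sequenceGenerator;

        public DynamicCronTrigger(String cronName, AbstractDynamicCronHandler dynamicCronHandler) {
            this.cronName = cronName;
            this.dynamicCronHandler = dynamicCronHandler;
        }

        @Override
        public Date nextExecutionTime(TriggerContext triggerContext) {
            // Ask the handler for the current expression on every call
            String cronExpression = dynamicCronHandler.getCronExpression(cronName);
            if (cronExpression == null) {
                return null;
            }
            // Re-parse only when the expression has changed
            if (this.sequenceGenerator == null || !cronExpression.equals(this.cronExpression)) {
                try {
                    this.sequenceGenerator = new CronSequenceGenerator(cronExpression);
                    this.cronExpression = cronExpression;
                } catch (Exception e) {
                    LOGGER.error(e.getMessage(), e);
                }
            }
            if (this.sequenceGenerator == null) {
                // The very first expression was invalid: nothing to schedule yet
                return null;
            }

            Date date = triggerContext.lastCompletionTime();
            if (date != null) {
                Date scheduled = triggerContext.lastScheduledExecutionTime();
                if (scheduled != null && date.before(scheduled)) {
                    // Previous task apparently executed too early...
                    // Let's simply use the last calculated execution time then,
                    // in order to prevent accidental re-fires in the same second.
                    date = scheduled;
                }
            } else {
                date = new Date();
            }
            return this.sequenceGenerator.next(date);
        }
    }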
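The caching logic in the trigger (re-parse only when the raw expression changes, and keep the previous parsed value when the new one is invalid) is independent of cron. Here is a minimal generic sketch using only the JDK; `CachedExpression` is a hypothetical helper, and `Integer::parseInt` merely stands in for a cron parser. Unlike the trigger, which returns null for a null expression, this sketch keeps the previous value in that case.

```java
import java.util.function.Function;

// Hypothetical helper: parses lazily and remembers the last valid result.
final class CachedExpression<T> {
    private final Function<String, T> parser;
    private String lastRaw;
    private T lastParsed;

    CachedExpression(Function<String, T> parser) {
        this.parser = parser;
    }

    // Returns the parsed form of raw, or the previous valid value (possibly null) on failure.
    synchronized T resolve(String raw) {
        if (raw != null && !raw.equals(lastRaw)) {
            try {
                lastParsed = parser.apply(raw);
                lastRaw = raw;
            } catch (RuntimeException e) {
                // Invalid expression: keep the previous value and raw string.
            }
        }
        return lastParsed;
    }

    public static void main(String[] args) {
        CachedExpression<Integer> seconds = new CachedExpression<>(Integer::parseInt);
        System.out.println(seconds.resolve("5"));    // parsed
        System.out.println(seconds.resolve("oops")); // invalid, previous value is kept
        System.out.println(seconds.resolve("2"));    // parsed again
    }
}
```

Keeping the last valid value means a typo in the external configuration does not stop a task that was already running on a valid schedule.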
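3.6 Annotation class ScheduledDynamicCron

    import org.springframework.core.annotation.AliasFor;

    import java.lang.annotation.Documented;
    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;

    @Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    public @interface ScheduledDynamicCron {
        /**
         * Name of the dynamic cron.
         * @return the cron name
         */
        @AliasFor("cronName")
        String value() default "";

        /**
         * Name of the dynamic cron.
         * @return the cron name
         */
        @AliasFor("value")
        String cronName() default "";

        /**
         * Handler class for the dynamic cron; defaults to the Environment-based handler.
         * @return the handler class
         */
        Class<? extends AbstractDynamicCronHandler> handler() default EnvironmentDynamicCronHandler.class;
    }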
3.7 DynamicScheduledAnnotationBeanPostProcessor
3.8 Configuration class SchedulerConfiguration
4. Spring Boot usage example
4.1 Configuration: import the configuration class SchedulerConfiguration in the startup class
The startup class imports SchedulerConfiguration, for example with @Import.
4.2 Example: cron configured in nacos
The nacos configuration defines the key cron.name1, initially with a cron expression that fires every 5 seconds.
Use @ScheduledDynamicCron to specify cron.name1 as the configuration key of the cron expression. If handler() is not specified, EnvironmentDynamicCronHandler is used by default; it reads the cron expression stored in nacos under the given key, cron.name1.
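@Scheduled must still be present, but its cron attribute is ignored.
After cron.name1 is changed in nacos to 0/2 * * * * ? and published, the scheduled task immediately switches from running every 5 seconds to running every 2 seconds.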
4.3 Example: cron stored in a database
Extend AbstractDynamicCronHandler and implement getCronExpression so that it queries the cron expression from the database.
The scheduled task class then selects this handler through the handler() attribute of @ScheduledDynamicCron.
The trigger timing is not refreshed automatically in this case; it has to be updated at the moment the database configuration is updated, through the handler's updateTriggerTask(cronName).
4.4 Updating scheduled tasks in a distributed cluster deployment
Updating the trigger timing right after the database configuration is updated, as above, only takes effect on the current service instance; the other nodes in the cluster are not updated.
The other nodes can be updated through a message bus: for example, broadcast a message over MQ, and have each node call updateTriggerTask(cronName) after consuming the message to refresh the trigger timing of its task.
4.5 Adding a scheduled task
A task can be added at runtime through a web endpoint; the example registers a task under the name cron.name2.
After the endpoint has been called, the task does not run yet, because cron.name2 has not been configured. Once a cron expression is configured for it in nacos, the task starts being scheduled, which can be confirmed from the console output.