Springboot整合分布式任务调度平台XXL-Job实现定时任务
什么是XXL-Job
xxl-job大众点评的员工徐雪里在15年发布的分布式任务调度平台,是轻量级的分布式任务调度框架,目标是开发迅速、简单、清理、易扩展; 老版本是依赖quartz的定时任务触发,在v2.1.0版本开始 移除quartz依赖
xxl-job将调度行为抽象形成“调度中心"公共平台,而平台自身并不承担业务逻辑,“调度中心"负责发起调度请求。 将任务抽象成分散的JobHandler,交由“执行器“统一管理 “执行器”负责接收调度请求并执行对应的JobHandler中业务逻辑 因此,“调度”和“任务"两部分可以相互解耦,提高系统整体稳定性和扩展性
官网:分布式任务调度平台XXL-JOB
Github:xuxueli/xxl-job: A distributed task scheduling framework.(分布式任务调度平台XXL-JOB)https://github.com/xuxueli/xxl-job
Gitee :xxl-job: 一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。https://gitee.com/xuxueli0323/xxl-job
使用场景
在平时的业务场景中,经常有一些场景需要使用定时任务,比如:
- 时间驱动的场景:某个时间点发送优惠券,发送短信等等。
- 批量处理数据:批量统计上个月的账单,统计上个月销售数据等等。
- 固定频率的场景:每隔5分钟需要执行一次。
所以定时任务在平时开发中并不少见,而且对于现在快速消费的时代,每天都需要发送各种推送,消息都需要依赖定时任务去完成,应用非常广泛。
简要介绍
架构图(图片来源xxl-job官网)
调度中心
负责管理调度的信息,按照调度的配置来发出调度请求
支持可视化、简单的动态管理调度信息,包括新建、删除、更新等,这些操作都会实时生效,同时也支持监控调度结果以及执行日志。
执行器
负责接收请求并且执行任务的逻辑。任务模块专注于任务的执行操作等等,使得开发和维护更加的简单与高效
XXL-Job具有哪些特性
调度中心HA(中心式):调度采用了中心式进行设计,“调度中心“支持集群部署,可保证调度中心HA
执行器HA(分布式):任务分布式的执行,任务执行器支持集群部署,可保证任务执行HA
触发策略:有Cron触发、固定间隔触发、固定延时触发、API事件触发、人工触发、父子任务触发
路由策略:执行器在集群部署的时候提供了丰富的路由策略,如:第一个、最后一个、轮询、随机、一致性HASH、最不经常使用LFU、最久未使用LRU、故障转移等等
故障转移:如果执行器集群的一台机器发生故障,会自动切换到一台正常的执行器发送任务调度
Rolling实时日志的监控:支持rolling方式查看输入的完整执行日志
脚本任务:支持GLUE模式开发和运行脚本任务,包括Shell、python、node.js、php等等类型脚本
更多特性可以去官网查看。
为什么需要任务调度平台
在Java中,传统的定时任务实现方案,比如Timer,Quartz等都或多或少存在一些问题:
- 不支持集群、不支持统计、没有管理平台、没有失败报警、没有监控等等
而且在现在分布式的架构中,有一些场景需要分布式任务调度:
- 同一个服务多个实例的任务存在互斥时,需要统一的调度。
- 任务调度需要支持高可用、监控、故障告警。
- 需要统一管理和追踪各个服务节点任务调度的结果,需要记录保存任务属性信息等。
显然传统的定时任务已经不满足现在的分布式架构,所以需要一个分布式任务调度平台,目前比较主流的是elasticjob和xxl-job。
使用xxl-job
安装
直接访问gitee选择自己需要的版本进行下载
或者拉取源代码:
git clone https://gitee.com/xuxueli0323/xxl-job.git
导入项目
导入到IDEA,配置一下Maven,下载相关的jar包,稍等一下后,就可以看到这样的项目结构:
创建数据库
执行doc下db文件夹中的sql脚本,数据库名为xxl_job
修改数据库密码
启动项目
运行XxlJobAdminApplication ,然后Ctrl+鼠标左键点击红色框部分跳转登录页面
账号:admin 密码:123456,登录成功后进入页面
功能介绍
创建执行器
填完信息后点击保存。
创建任务
任务管理(配置执行任务)
示例执行器:所用到的执行器
任务描述:概述该任务是做什么的
路由策略:
第一个:选择第一个机器
最后一个:选择最后一个机器
轮询:依次选择执行
随机:随机选择在线的机器
一致性HASH:每个任务按照Hash算法固定选择某一台机器,并目所有的任务均匀散列在不同的机器上
最不经常使用:使用频率最低的机器优先被使用
最近最久未使用:最久未使用的机器优先被选举
故障转移:按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标的执行器并且会发起任务调度
忙碌转移:按照顺序来依次进行空闲检测,第一个空闲检测成功的机器会被选定为目标群机器,并且会发起任务调度
分片广播:广播触发对于集群中的所有机器执行任务,同时会系统会自动传递分片的参数
Cron:执行规则
调度过期策略:调度中心错过调度时间的补偿处理策略,包括:忽略、立即补偿触发一次等
JobHandler:定义执行器的名字
阴塞处理策略:单机串行:新的调度任务在进入到执行器之后,该调度任务进入FIFO队列,并以串行的方式去进行
丢弃后续调度:新的调度任务在进入到执行器之后,如果存在相同的且正在运行的调度任务,本次的调度任务请求就会被丢弃掉,并且标记为失败
覆盖之前的调度:新的调度任务在进入到执行器之后,如果存在相同的且正在运行的调度任务,就会终止掉当前正在运行的调度任务,并且清空队列,运行新的调度任务。
子任务ID:输入子任务的任务id,可填写多个任务超时时间:添加任务超时的时候,单位s,设置时间大于0的时候就会生效
失败重试次数:设置失败重试的次数,设置时间大于0的时候就会生效
负责人:填写该任务调度的负责人
报警邮件:出现报警,则发送邮件
案例
1.创建Springboot项目
2.添加xxl-job依赖
<!--xxl-job 依赖-->
<dependency><groupId>com.xuxueli</groupId><artifactId>xxl-job-core</artifactId><version>2.4.0</version>
</dependency>
3.编写简单的任务调度例子
在项目的resources文件夹下新增logback.xml 文件
<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false" scan="true" scanPeriod="1 seconds"><contextName>logback</contextName><property name="log.path" value="./data/logs/xxl-job/xxl-job-executor-sample-springboot.log"/><appender name="console" class="ch.qos.logback.core.ConsoleAppender"><encoder><pattern>%d{HH:mm:ss.SSS} %contextName [%thread] %-5level %logger{36} - %msg%n</pattern></encoder></appender><appender name="file" class="ch.qos.logback.core.rolling.RollingFileAppender"><file>${log.path}</file><rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"><fileNamePattern>${log.path}.%d{yyyy-MM-dd}.zip</fileNamePattern></rollingPolicy><encoder><pattern>%date %level [%thread] %logger{36} [%file : %line] %msg%n</pattern></encoder></appender><root level="info"><appender-ref ref="console"/><appender-ref ref="file"/></root></configuration>
4.配置xxl-job
# web port
server:port: 8081# log config
logging:config: classpath:logback.xml### xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
xxl:job:admin:addresses: http://127.0.0.1:8080/xxl-job-adminaccessToken: default_tokenexecutor:#执行器app名称,必须和控制台那边配置一样,不然注册不上去appname: xxl-job-test address: # default use address to registry , otherwise use ip:port if address is nullip: # xxl-job executor server-infoport: 9999logpath: ./data/logs/xxl-job/jobhandler#日志保存天数logretentiondays: 30
5.编写配置类(XxlJobConfig )
@Configuration
public class XxlJobConfig {private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);@Value("${xxl.job.admin.addresses}")private String adminAddresses;@Value("${xxl.job.accessToken}")private String accessToken;@Value("${xxl.job.executor.appname}")private String appname;@Value("${xxl.job.executor.address}")private String address;@Value("${xxl.job.executor.ip}")private String ip;@Value("${xxl.job.executor.port}")private int port;@Value("${xxl.job.executor.logpath}")private String logPath;@Value("${xxl.job.executor.logretentiondays}")private int logRetentionDays;@Beanpublic XxlJobSpringExecutor xxlJobExecutor() {logger.info(">>>>>>>>>>> xxl-job config init.");XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();xxlJobSpringExecutor.setAdminAddresses(adminAddresses);xxlJobSpringExecutor.setAppname(appname);xxlJobSpringExecutor.setAddress(address);xxlJobSpringExecutor.setIp(ip);xxlJobSpringExecutor.setPort(port);xxlJobSpringExecutor.setAccessToken(accessToken);xxlJobSpringExecutor.setLogPath(logPath);xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);return xxlJobSpringExecutor;}/*** 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;** 1、引入依赖:* <dependency>* <groupId>org.springframework.cloud</groupId>* <artifactId>spring-cloud-commons</artifactId>* <version>${version}</version>* </dependency>** 2、配置文件,或者容器启动变量* spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'** 3、获取IP* String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();*/}
6.创建你的第一个xxl-job分布式调度任务
XxlJob开发示例(Bean模式)开发步骤:1、任务开发:在Spring Bean实例中,开发Job方法;2、注解配置:为Job方法添加注解 "@XxlJob(value="自定义jobhandler名称", init = "JobHandler初始化方法", destroy = "JobHandler销毁方法")",注解value值对应的是调度中心新建任务的JobHandler属性的值。3、执行日志:需要通过 "XxlJobHelper.log" 打印执行日志;4、任务结果:默认任务结果为 "成功" 状态,不需要主动设置;如有诉求,比如设置任务结果为失败,可以通过 "XxlJobHelper.handleFail/handleSuccess" 自主设置任务结果;注:@XxlJob中"自定义jobhandler名称"需要对应下图中的JobHandler,init和destroy选传,默认为空
创建任务的handler
package com.cczj.jobhandler;import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;@Slf4j
@Component
public class MyJobHandler {/*** 1、简单任务示例(Bean模式)*/@XxlJob(value = "demoJobHandler",init = "init",destroy = "destroy")public ReturnT<String> demoJobHandler() throws Exception {log.info("XXL-JOB, Hello World!"+ LocalDateTime.now());return ReturnT.SUCCESS;}/*** 2、带参数的任务示例(Bean模式)*/@XxlJob(value = "paramJobHandler", init = "init", destroy = "destroy")public ReturnT<String> paramJobHandler(String param) throws Exception {log.info("XXL-JOB, Hello World!"+ LocalDateTime.now());log.info("XXL-JOB, param: " + param);return ReturnT.SUCCESS;}/*** 3、阻塞任务示例(Bean模式)* 阻塞任务会导致调度线程阻塞,直到任务执行完成,适合耗时任务。*/@XxlJob(value = "blockJobHandler", init = "init", destroy = "destroy")public ReturnT<String> blockJobHandler() throws Exception {log.info("XXL-JOB, Hello World!"+ LocalDateTime.now());TimeUnit.SECONDS.sleep(3);log.info("XXL-JOB, Block job executed.");return ReturnT.SUCCESS;}/*** 4、初始化方法*/public void init(){log.info("XXL-JOB,init调用成功.");}/*** 5、销毁方法*/public void destroy(){log.info("XXL-JOB,destroy调用成功.");}}
配置任务管理
启动项目!项目启动成功,我们就可以在执行器里看到OnLine机器地址已经有了数据。
点击查看,就可以看到任务已经从我们本地的地址自动注册了 执行定时任务
这样我们就成功完成了xxl-job的使用,控制台也打印了对应的信息
如果想要一直执行定时任务,只需点击启动
然后状态变为RUNNING则代表该定时任务处于运行状态
可以看到控制台一直在打印信息
这样,我们就完成了xxl-job的简单使用。