Elasticjob在同一个实例上运行的多个分片任务
一、#创作灵感#
elasticjob在同一个实例上运行的多个分片任务(实例数量小于分片数量)
- 很多贴子、AI说可以
- 实际不可以,很可能是官方说明翻译时出现了误差
- 每个分片由一个独立的作业服务器执行,可以同时处理多个分片,实现作业的并行执行(同时、并行都是指不同的服务器/实例之间并行)
- 网上的文章在部分没有写明代码对应的elasticjob版本,导致可读性差就成了鸡肋
需求:作业需要分多片,但不不想启动太多pod/实例
方案:当然这是有解决方案的。本文作者分享编程、配置两种实现方案
二、环境
- SpringBoot 2.7.18 官方下载地址:SpringBoot 2.7.18
- ElasticJob 3.0.4 官方下载地址:ElasticJob 3.0.4
- ElasticJob-UI 3.0.2 图形化工具 官方下载地址:ElasticJob-UI 3.0.2
- Nacos 2.2.3 官方下载地址:Nacos 2.2.3
- Oracle JDK8u202(Oracle JDK8最后一个非商业版本) 下载地址:Oracle JDK8u202
三、方案
1、使用不同的Job名称
为每个任务定义不同的Job名称。虽然这不一定直接解决在同一实例上运行多个分片的问题,但它可以帮助你更清晰地管理和识别不同的任务。
高版本(ElasticJob 3.0.4+)
/*** 作业配置1* @return*/@Bean(name="myJob1")public JobConfiguration jobConfiguration1() {return JobConfiguration.newBuilder("myJob1", 2 ).shardingItemParameters("0=Beijing,1=Shanghai").jobParameter("param=value").build();}/*** 作业配置2* @return*/@Bean(name="myJob2")public JobConfiguration jobConfiguration2() {return JobConfiguration.newBuilder("myJob2", 2 ).shardingItemParameters("0=Beijing,1=Shanghai").jobParameter("param=value").build();}
低版本
// 第一个任务
SimpleJobConfiguration firstJobConfig = SimpleJobConfiguration.newBuilder(MyJob.class).cron(cron1).shardingTotalCount(2).jobParameter("param1").build();// 第二个任务
SimpleJobConfiguration secondJobConfig = SimpleJobConfiguration.newBuilder(MyJob.class).cron(cron2).shardingTotalCount(2).jobParameter("param2").build();
2、配置多个分片在同一JVM中执行
在ElasticJob中,你可以通过配置JobBootstrap
来指定在一个JVM进程中运行多个分片。这通常通过在JobBootstrap
中配置多个SimpleJobConfiguration
对象实现。
高版本(ElasticJob 3.0.4+)
private void init(){ZookeeperRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("localhost:2181", "elastic_job_demo"));regCenter.init();JobConfiguration cfg1 = JobConfiguration.newBuilder("saasHealthJob1", 2 ).shardingItemParameters("0=A:01,1=B:01").jobParameter("param=value1").build();JobScheduler jobScheduler1 = new JobScheduler( regCenter, myJob, cfg1 );jobScheduler1.getJobScheduleController().scheduleJob("0 0/1 * * * ?","GMT+8:00");JobConfiguration cfg2 = JobConfiguration.newBuilder("saasHealthJob2", 2 ).shardingItemParameters("0=A:02,1=B:02").jobParameter("param=value2").build();JobScheduler jobScheduler2 = new JobScheduler( regCenter, myJob, cfg2 );jobScheduler2.getJobScheduleController().scheduleJob("0 0/1 * * * ?","GMT+8:00");}
低版本
List<SimpleJobConfiguration> jobConfigurations = new ArrayList<>();
jobConfigurations.add(firstJobConfig);
jobConfigurations.add(secondJobConfig);JobBootstrap.getInstance().setJobConfigNames("firstJobConfig", "secondJobConfig").initialize();
3、配置方式实现
这种方式只能是把类复制一份(改个名,代码一样)
# job1
elasticjob.jobs.ESLoadJob.elasticJobClass=person.dand.job.MyJob1
elasticjob.jobs.ESLoadJob.shardingTotalCount=2
elasticjob.jobs.ESLoadJob.sharding-item-parameters=0=A
elasticjob.jobs.ESLoadJob.jobErrorHandlerType=LOG
elasticjob.jobs.ESLoadJob.cron=0 0/1 * * * ?
elasticjob.jobs.ESLoadJob.jobExecutorServiceHandlerType=SINGLE_THREAD
elasticjob.jobs.ESLoadJob.misfire=false
elasticjob.jobs.ESLoadJob.failover=false
elasticjob.jobs.ESLoadJob.disabled=false
elasticjob.jobs.ESLoadJob.overwrite=true
elasticjob.jobs.ESLoadJob.description=XXXX任务1# job2
elasticjob.jobs.ESLoadJob.elasticJobClass=person.dand.job.MyJob2
elasticjob.jobs.ESLoadJob.shardingTotalCount=2
elasticjob.jobs.ESLoadJob.sharding-item-parameters=1=B
elasticjob.jobs.ESLoadJob.jobErrorHandlerType=LOG
elasticjob.jobs.ESLoadJob.cron=0 0/1 * * * ?
elasticjob.jobs.ESLoadJob.jobExecutorServiceHandlerType=SINGLE_THREAD
elasticjob.jobs.ESLoadJob.misfire=false
elasticjob.jobs.ESLoadJob.failover=false
elasticjob.jobs.ESLoadJob.disabled=false
elasticjob.jobs.ESLoadJob.overwrite=true
elasticjob.jobs.ESLoadJob.description=XXXX任务2
四、注意事项
-
资源管理:在同一实例上运行多个分片任务可能会增加资源使用(如CPU、内存等),确保你的服务器硬件能够支持这种配置。
-
任务隔离:尽管ElasticJob允许在同一JVM中运行多个任务,但在设计时应确保各个任务之间不会互相干扰,例如通过合理的日志记录、资源锁定等措施。
-
监控与日志:由于多个任务可能同时运行,确保有适当的监控和日志记录机制来跟踪每个任务的执行情况。
-
配置管理:合理管理配置文件,确保每个任务的配置(如分片数量、执行策略等)正确无误。
五、示例代码整合
下面是一个整合上述配置的示例:
高版本示例(ElasticJob 3.0.4+)
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobScheduler;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @Description: java代码创建elasticjob作业* 针对3个按资源池和集群分片的job(有6片),为了不用启6个实例,实现6个分片在两个实例上并行执行* @Author: dand* @CreateDate: 2025/2/13 09:20* @Version: 1.0*/
@Configuration
public class ElasticJobConfig {@AutowiredMyJob myJob;private void init(){ZookeeperRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("localhost:2181", "elastic_job_demo"));regCenter.init();JobConfiguration cfg1 = JobConfiguration.newBuilder("myJob1", 2 ).shardingItemParameters("0=A:01,1=B:01").jobParameter("param=value1").build();JobScheduler jobScheduler1 = new JobScheduler( regCenter, myJob, cfg1 );jobScheduler1.getJobScheduleController().scheduleJob("0 0/1 * * * ?","GMT+8:00");JobConfiguration cfg2 = JobConfiguration.newBuilder("myJob2", 2 ).shardingItemParameters("0=A:02,1=B:02").jobParameter("param=value2").build();JobScheduler jobScheduler2 = new JobScheduler( regCenter, myJob, cfg2 );jobScheduler2.getJobScheduleController().scheduleJob("0 0/1 * * * ?","GMT+8:00");}// /**
// * 配置注册中心(这里使用 Zookeeper)
// * @return
// */
// @Bean
// public ZookeeperRegistryCenter registryCenter() {
// ZookeeperConfiguration zkConfig = new ZookeeperConfiguration("localhost:2181", "my-elastic-job");
// ZookeeperRegistryCenter regCenter = new ZookeeperRegistryCenter(zkConfig);
// regCenter.init();
// return regCenter;
// }
//
// /**
// * 作业配置1
// * @return
// */
// @Bean(name="myJob1")
// public JobConfiguration jobConfiguration1() {
//
// return JobConfiguration.newBuilder("myJob1", 2 )
// .shardingItemParameters("0=Beijing,1=Shanghai")
// .jobParameter("param=value")
// .build();
// }
//
// /**
// * 作业配置2
// * @return
// */
// @Bean(name="myJob2")
// public JobConfiguration jobConfiguration2() {
//
// return JobConfiguration.newBuilder("myJob2", 2 )
// .shardingItemParameters("0=Beijing,1=Shanghai")
// .jobParameter("param=value")
// .build();
// }
}
低版本示例
public class ElasticJobApplication {public static void main(String[] args) {SimpleJobConfiguration firstJobConfig = SimpleJobConfiguration.newBuilder(MyJob.class).cron("0/5 * * * * ?") // 每5秒执行一次.shardingTotalCount(2) // 分片总数为2.jobParameter("param1") // 参数设置.build();SimpleJobConfiguration secondJobConfig = SimpleJobConfiguration.newBuilder(MyJob.class).cron("0/10 * * * * ?") // 每10秒执行一次.shardingTotalCount(2) // 分片总数为2.jobParameter("param2") // 参数设置.build();List<SimpleJobConfiguration> jobConfigurations = new ArrayList<>();jobConfigurations.add(firstJobConfig);jobConfigurations.add(secondJobConfig);JobBootstrap.getInstance().setJobConfigurations(jobConfigurations).initialize();}
}
通过以上设置,你可以在同一实例上高效地运行多个分片任务。
相关文章推荐:
多数据源SpringBoot应用集成ElasticJob3.0.4并实现作业事件追踪【最佳实践】_elastic job 3.-CSDN博客