当前位置: 首页 > news >正文

Elasticjob在同一个实例上运行的多个分片任务

 一、#创作灵感#

elasticjob在同一个实例上运行的多个分片任务(实例数量小于分片数量)

  1. 很多贴子、AI说可以
  2. 实际不可以,很可能是官方说明翻译时出现了误差
  3. 每个分片由一个独立的作业服务器执行,可以同时处理多个分片,实现作业的并行执行(同时、并行都是指不同的服务器/实例之间并行)
  4. 网上的文章在部分没有写明代码对应的elasticjob版本,导致可读性差就成了鸡肋

需求:作业需要分多片,但不不想启动太多pod/实例

方案:当然这是有解决方案的。本文作者分享编程、配置两种实现方案

二、环境

- SpringBoot 2.7.18   官方下载地址:SpringBoot 2.7.18

- ElasticJob 3.0.4   官方下载地址:ElasticJob 3.0.4

- ElasticJob-UI 3.0.2 图形化工具  官方下载地址:ElasticJob-UI 3.0.2

- Nacos 2.2.3   官方下载地址:Nacos 2.2.3

- Oracle JDK8u202(Oracle JDK8最后一个非商业版本)   下载地址:Oracle JDK8u202

三、方案

1、使用不同的Job名称

为每个任务定义不同的Job名称。虽然这不一定直接解决在同一实例上运行多个分片的问题,但它可以帮助你更清晰地管理和识别不同的任务。

高版本(ElasticJob 3.0.4+)

    /*** 作业配置1* @return*/@Bean(name="myJob1")public JobConfiguration jobConfiguration1() {return JobConfiguration.newBuilder("myJob1", 2 ).shardingItemParameters("0=Beijing,1=Shanghai").jobParameter("param=value").build();}/*** 作业配置2* @return*/@Bean(name="myJob2")public JobConfiguration jobConfiguration2() {return JobConfiguration.newBuilder("myJob2", 2 ).shardingItemParameters("0=Beijing,1=Shanghai").jobParameter("param=value").build();}

低版本

// 第一个任务
SimpleJobConfiguration firstJobConfig = SimpleJobConfiguration.newBuilder(MyJob.class).cron(cron1).shardingTotalCount(2).jobParameter("param1").build();// 第二个任务
SimpleJobConfiguration secondJobConfig = SimpleJobConfiguration.newBuilder(MyJob.class).cron(cron2).shardingTotalCount(2).jobParameter("param2").build();

2、配置多个分片在同一JVM中执行

在ElasticJob中,你可以通过配置JobBootstrap来指定在一个JVM进程中运行多个分片。这通常通过在JobBootstrap中配置多个SimpleJobConfiguration对象实现。

高版本(ElasticJob 3.0.4+)

private void init(){ZookeeperRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("localhost:2181", "elastic_job_demo"));regCenter.init();JobConfiguration cfg1 = JobConfiguration.newBuilder("saasHealthJob1", 2 ).shardingItemParameters("0=A:01,1=B:01").jobParameter("param=value1").build();JobScheduler jobScheduler1 = new JobScheduler( regCenter, myJob, cfg1 );jobScheduler1.getJobScheduleController().scheduleJob("0 0/1 * * * ?","GMT+8:00");JobConfiguration cfg2 = JobConfiguration.newBuilder("saasHealthJob2", 2 ).shardingItemParameters("0=A:02,1=B:02").jobParameter("param=value2").build();JobScheduler jobScheduler2 = new JobScheduler( regCenter, myJob, cfg2 );jobScheduler2.getJobScheduleController().scheduleJob("0 0/1 * * * ?","GMT+8:00");}

低版本

List<SimpleJobConfiguration> jobConfigurations = new ArrayList<>();
jobConfigurations.add(firstJobConfig);
jobConfigurations.add(secondJobConfig);JobBootstrap.getInstance().setJobConfigNames("firstJobConfig", "secondJobConfig").initialize();

3、配置方式实现

这种方式只能是把类复制一份(改个名,代码一样)

# job1
elasticjob.jobs.ESLoadJob.elasticJobClass=person.dand.job.MyJob1
elasticjob.jobs.ESLoadJob.shardingTotalCount=2
elasticjob.jobs.ESLoadJob.sharding-item-parameters=0=A
elasticjob.jobs.ESLoadJob.jobErrorHandlerType=LOG
elasticjob.jobs.ESLoadJob.cron=0 0/1 * * * ?
elasticjob.jobs.ESLoadJob.jobExecutorServiceHandlerType=SINGLE_THREAD
elasticjob.jobs.ESLoadJob.misfire=false
elasticjob.jobs.ESLoadJob.failover=false
elasticjob.jobs.ESLoadJob.disabled=false
elasticjob.jobs.ESLoadJob.overwrite=true
elasticjob.jobs.ESLoadJob.description=XXXX任务1# job2
elasticjob.jobs.ESLoadJob.elasticJobClass=person.dand.job.MyJob2
elasticjob.jobs.ESLoadJob.shardingTotalCount=2
elasticjob.jobs.ESLoadJob.sharding-item-parameters=1=B
elasticjob.jobs.ESLoadJob.jobErrorHandlerType=LOG
elasticjob.jobs.ESLoadJob.cron=0 0/1 * * * ?
elasticjob.jobs.ESLoadJob.jobExecutorServiceHandlerType=SINGLE_THREAD
elasticjob.jobs.ESLoadJob.misfire=false
elasticjob.jobs.ESLoadJob.failover=false
elasticjob.jobs.ESLoadJob.disabled=false
elasticjob.jobs.ESLoadJob.overwrite=true
elasticjob.jobs.ESLoadJob.description=XXXX任务2

四、注意事项

  • 资源管理:在同一实例上运行多个分片任务可能会增加资源使用(如CPU、内存等),确保你的服务器硬件能够支持这种配置。

  • 任务隔离:尽管ElasticJob允许在同一JVM中运行多个任务,但在设计时应确保各个任务之间不会互相干扰,例如通过合理的日志记录、资源锁定等措施。

  • 监控与日志:由于多个任务可能同时运行,确保有适当的监控和日志记录机制来跟踪每个任务的执行情况。

  • 配置管理:合理管理配置文件,确保每个任务的配置(如分片数量、执行策略等)正确无误。

五、示例代码整合 

下面是一个整合上述配置的示例:

高版本示例(ElasticJob 3.0.4+)

import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobScheduler;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @Description: java代码创建elasticjob作业*               针对3个按资源池和集群分片的job(有6片),为了不用启6个实例,实现6个分片在两个实例上并行执行* @Author: dand* @CreateDate: 2025/2/13 09:20* @Version: 1.0*/
@Configuration
public class ElasticJobConfig {@AutowiredMyJob myJob;private void init(){ZookeeperRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("localhost:2181", "elastic_job_demo"));regCenter.init();JobConfiguration cfg1 = JobConfiguration.newBuilder("myJob1", 2 ).shardingItemParameters("0=A:01,1=B:01").jobParameter("param=value1").build();JobScheduler jobScheduler1 = new JobScheduler( regCenter, myJob, cfg1 );jobScheduler1.getJobScheduleController().scheduleJob("0 0/1 * * * ?","GMT+8:00");JobConfiguration cfg2 = JobConfiguration.newBuilder("myJob2", 2 ).shardingItemParameters("0=A:02,1=B:02").jobParameter("param=value2").build();JobScheduler jobScheduler2 = new JobScheduler( regCenter, myJob, cfg2 );jobScheduler2.getJobScheduleController().scheduleJob("0 0/1 * * * ?","GMT+8:00");}//    /**
//     * 配置注册中心(这里使用 Zookeeper)
//     * @return
//     */
//    @Bean
//    public ZookeeperRegistryCenter registryCenter() {
//        ZookeeperConfiguration zkConfig = new ZookeeperConfiguration("localhost:2181", "my-elastic-job");
//        ZookeeperRegistryCenter regCenter = new ZookeeperRegistryCenter(zkConfig);
//        regCenter.init();
//        return regCenter;
//    }
//
//    /**
//     * 作业配置1
//     * @return
//     */
//    @Bean(name="myJob1")
//    public JobConfiguration jobConfiguration1() {
//
//        return JobConfiguration.newBuilder("myJob1", 2 )
//                .shardingItemParameters("0=Beijing,1=Shanghai")
//                .jobParameter("param=value")
//                .build();
//    }
//
//    /**
//     * 作业配置2
//     * @return
//     */
//    @Bean(name="myJob2")
//    public JobConfiguration jobConfiguration2() {
//
//        return JobConfiguration.newBuilder("myJob2", 2 )
//                .shardingItemParameters("0=Beijing,1=Shanghai")
//                .jobParameter("param=value")
//                .build();
//    }
}

低版本示例

public class ElasticJobApplication {public static void main(String[] args) {SimpleJobConfiguration firstJobConfig = SimpleJobConfiguration.newBuilder(MyJob.class).cron("0/5 * * * * ?") // 每5秒执行一次.shardingTotalCount(2) // 分片总数为2.jobParameter("param1") // 参数设置.build();SimpleJobConfiguration secondJobConfig = SimpleJobConfiguration.newBuilder(MyJob.class).cron("0/10 * * * * ?") // 每10秒执行一次.shardingTotalCount(2) // 分片总数为2.jobParameter("param2") // 参数设置.build();List<SimpleJobConfiguration> jobConfigurations = new ArrayList<>();jobConfigurations.add(firstJobConfig);jobConfigurations.add(secondJobConfig);JobBootstrap.getInstance().setJobConfigurations(jobConfigurations).initialize();}
}

通过以上设置,你可以在同一实例上高效地运行多个分片任务。

相关文章推荐:

多数据源SpringBoot应用集成ElasticJob3.0.4并实现作业事件追踪【最佳实践】_elastic job 3.-CSDN博客


http://www.mrgr.cn/news/90503.html

相关文章:

  • 本地部署DeepSeek-R1(Mac版)
  • Java中关于JSON的基本使用
  • 面向服务架构(SOA)详细介绍
  • Redis企业开发实战(四)——点评项目之分布式锁
  • 【Leetcode 热题 100】287. 寻找重复数
  • 以简单的方式过一生
  • Flask Web开发的重要概念和示例
  • 洛谷 P4552 [Poetize6] IncDec Sequence
  • 阿里云 DeepSeek 模型部署与使用技术评测
  • 机器学习 - 机器学习模型的评价指标
  • 十大知识领域中涉及到的工具与技术(三)
  • 数据类型/运算符/输入与输出/注释
  • 【云安全】云原生-K8S API Server 未授权访问
  • Openssl的使用,CA证书,中间证书,服务器证书的生成与使用
  • ffmpeg学习:ubuntu下编译Android版ffmpeg-kit
  • java lambda表达式
  • Cherry Studio 连接私域deepseek-r1模型搭建私域知识库和智能体(也可使用第三方模型)
  • 从零开始搭建一个英语学习网站
  • 国产ARM处理器工控机如何助力企业实现自主可控?
  • JVM学习
  • 微服务与网关
  • 【环境安装】重装Docker-26.0.2版本
  • DeepSeek渣机部署编程用的模型,边缘设备部署模型
  • 微信小程序医院挂号系统
  • tkinter-TinUI-xml实战(12)应用组启动器
  • windows系统远程桌面连接ubuntu18.04