当前位置: 首页 > news >正文

定时任务上云改造方案

优质博文:IT-BLOG-CN

一、Job单元化

什么Job需要单元化:所有会往单元化DB更新/删除/插入数据的Job都需要进行单元化改造。

什么是单元化DB
【1】指配置为DRC双向复制的DB
【2】单元化的DB部署在多个Zone,每个Zone内的实例都会被读写,以便达到:1)Zone故障时无需等待DB切换;2)请求的处理链路完全封闭在Zone内,低延迟;不同Zone负责不同的数据,例如按照OrderId%100进行Shard流量,ShardZone是多对一关系,以保证多个Zone间数据的双向复制不会产生冲突。

上云改造方案

【1】Job部署在所有Zone,调用所有Zone中的实例,每个ZoneJob实例只处理属于本Zone的数据,不处理其他Zone的数据。
【2】当流量切分规则发生变化时,终止当前调度,重新获取当前Job所在Zone应该处理哪些Shard并用这些Shard的进度最小值作为新的进度开始调度。同时需要保证Job逻辑的幂等性,防止跳跃性数据修改规则后重复执行。

为什么当规则发生变化时,取最小值和保证Job数据的幂等性:
举例:SH处理Shard S1S2SIN处理S0,当需要把S1SH切换至SIN时,此时SH的进度是3,SIN的进度是7。在SIN响应流量切换规则变化时,发现自己此时可以处理Shard S0S1的数据,如果按照之前的进度7来进行,就会漏掉SH未处理的5。同时,JOB逻辑需要幂等。

Job为什么要ShardingZone执行部署数据:
因为DB进行了单元化改造,读写操作都在ZoneMaster上进行,上云前的框架都是依赖一个全局唯一的DB Master进行一些互斥和可见性的操作,所以DB单元化后JOB执行的数据也需要分Zone执行。

改造后的Job

确定数据是否归属当前Zone处理: 将处理的数据orderId传给单元化组件UCS客户端API判断。

流量切分规则变化:Zone故障或者Zone之间流量比例调整时,Shard->Zone的映射关系会发生变化,每个Zone内的Job实例处理的数据也会发生变化。
【1】QSchedule JobQSchedule会通知每个ZoneJob实例终止当前调度并发起一次新的调度,Job实例通过QSchedule传递过来的上下文获取当前Job实例应该处理哪些Shard并重新开始业务处理逻辑。
【2】非QSchedule JobJob实例需要自行监听UCS客户端的策略变化,收到通知后终止当前业务处理逻辑,然后通过UCS客户端重新获取当前Job实例应该处理哪些Shard并重新开始业务处理逻辑。

代码逻辑改动

每次调度执行一次批量计算的Job

改造前

public void startQSchedule(QScheduleContext ctx) {// 从 Offset 存储中获取上次扫描过的 OffsetOffset startOffset = offsetDao.queryStartOffset();// 根据 Offset 获取本地调度需要处理的数据List<SourceItem> sourceItems = sourceDao.selectBatch(startOffset, maxItemCount);// 业务逻辑处理process(sourceItems);// 计算并保存最新 OffsetoffsetDao.save(calcucateLatestOffset(startOffset, sourceItems));
}

改造后

public void startQSchedule(QScheduleContext ctx) {// #改造点#  通过 ctx 中本次调度负责的 Shards, 计算本次调度的起始 Offset (负责的Shards中最小的Offset)Offset startOffset = offsetDao.queryStartOffset();// 根据 Offset 获取本地调度需要处理的数据List<SourceItem> sourceItems = sourceDao.selectBatch(startOffset, maxItemCount);// #改造点# 使用 RouteContextBuilders 作为 UCS 路由 过滤本次调度处理的数据List<SourceItem> filteredItems = UCSHelper.filterCurrentZoneItems(ctx, sourceItems, RouteContextBuilders.Builder()...build());// 业务逻辑处理process(filteredItems);// #改造点# 计算并保本次调度完成后的最新 Offset,并映射到本次调度时单元化策略涉及到的 ShardUCSHelper.saveOffset(ctx, calcucateLatestOffset(startOffset, sourceItems));
}

Long-Running Job:指Job启动函数里存在无限循环,触发调度后,除非循环条件不满足,否则会一直持续运行,每次循环需要做的事情由业务逻辑自行控制,典型的Long-Running Job

改造前

public void startJob(QScheduleContext ctx) {// 从 Offset 存储中获取上次扫描过的 OffsetOffset startOffset = offsetDao.queryStartOffset();// 这里是无限循环的条件,使用QSchedule提供的API来判断是否本次调度已被终止while (!ctx.isStopped()) {// 根据 Offset 获取本地调度需要处理的数据List<SourceItem> sourceItems = sourceDao.selectBatch(startOffset, maxItemCount);// 业务逻辑处理process(sourceItems, latestScanedOffset);// 等待一段时间TimeUnit.SECONDS.sleep(1);}
}

改造后

public void startJob(QScheduleContext ctx) {// #改造点# 通过ctx中本次调度应该负责的Shard,计算本次调度的起始 Offset,一般是用多个 Shard 的最小 OffsetOffset startOffset = UCSHelper.calculateMinimumOffsetOfShards(ctx);// 这里是无限循环的条件,使用QSchedule提供的API来判断是否本次调度已被终止// 当单元化策略发生变化时,调度将会停止,等待 QSchedule Server 再次启动调度,此时会更新 ctx 中的 Shards 信息while (!ctx.isStopped()) {// 根据 Offset 获取本地调度需要处理的数据List<SourceItem> sourceItems = sourceDao.selectBatch(startOffset, maxItemCount);// #改造点# 使用 RouteContextBuilders 作为 UCS 路由 过滤本次调度处理的数据List<SourceItem> filteredItems = UCSHelper.filterCurrentZoneItems(ctx, sourceItems, RouteContextBuilders.Builder()...build());// 业务逻辑处理 process(filteredItems);// #改造点# 计算并更新本次调度完成后的最新 OffsetstartOffset = calcucateLatestOffset(startOffset, filteredItems);// #改造点#UCSHelper.saveOffset(ctx, startOffset);// 等待一段时间TimeUnit.SECONDS.sleep(1);}
}

正确性保证

【1】QSchedule调度上下文会封装本次调度涉及到的Shard信息。多次调度过程看到的Shard信息会随单元化策略变化而变化。一次完整的调度过程内部使用的Shard信息是一致的,不会因为单元化策略变化而变化。
【2】调度过程中,下面三个步骤使用到的Shard信息是完全相同的,保证Shard信息的原子性:一下三个步骤如果检测到单元化策略发生变化,会及时终止本次调度,等待QSchedule服务端再次调度。
  ● 根据Shard获取对应的StartOffset
  ● 根据Shard过滤扫描到的数据;
  ● 保存处理的最新Offset和上述Shard的对应变化;

二、PaaS基础组件多IDC接入

根据RegionCode确定数据所在Region,使得常用的数据查询或业务处理操作可以在单个节点上执行,以达到数据单元化处理和数据合规策略动态调整的效果,从而避免跨节点带来额外性能消耗和数据跨境合规问题。

分布式调度中心: 因为业务中大部分JOB都是通过扫表来对数据进行批量处理,所以多IDC场景下则基于存储的RegionCode将任务分散到多个IDC,数据经过单元化过滤后,进行分片处理。


http://www.mrgr.cn/news/37215.html

相关文章:

  • HI3521DV200 22AP10/SS524V100 芯片及开发板
  • GNU链接器(LD):PHDRS 命令用法及实例详解
  • 解决图片放大模糊
  • 多线程计算π
  • C语言 | Leetcode C语言题解之第440题字典序的第K小数字
  • LM393 电压比较器和典型电路
  • DSP——从入门到放弃系列——多核导航器(持续更新)
  • C++中的动态图形与音频同步:实现罗盘时钟与音乐播放器
  • Flask 本地测试完成,如何部署到网络上,买什么样的空间
  • HarmonyOS异常处理实践
  • VC++同时处理ANSI和Unicode字符集,除了使用TCHAR和_T()宏外,还有其他方法可以实现吗?
  • 基于51单片机的方向盘模拟系统
  • 【学习笔记】手写 Tomcat 七
  • 算法学习021 c++有多少张桌子 并查集算法学习 中小学算法思维学习 比赛算法题解 信奥算法解析
  • TMR技术的发展及其应用技术的介绍
  • PDF 秒变 JPG,2024 这些工具来助力
  • 2024四川省赛 The 2024 Sichuan Provincial Collegiate Programming Contest补题记录
  • Java | Leetcode Java题解之第440题字典序的第K小数字
  • 增量式编码器实现原理
  • Materials - 基础视差原理