当前位置: 首页 > news >正文

DataX实战|使用Python 构建简易的DataX数据血缘工具(一)

导读:

在这篇文章中,我介绍了如何使用 Python 构建简易的 DataX 数据血缘工具,以便解决 DataXWeb 在查询表上下游关系时的不足。
选择 Flask 作为框架,利用 DataXWeb 的元数据 job_info 表和 job_json,通过 JSON 解析与 sqllineage 的 SQL 解析能力提取出任务相关的库表信息。

1. 背景

由于我们在 ETL 时全部在 datax 上,在 datax 里写了大量复杂的 sql,导致在排查问题时困难重重,而唯一的可视化页面是 dataxweb。

如果我需要查询一个表的上下游,只能在 dataxweb 里去搜,而 dataxweb 只支持搜任务名,若任务名与表名相同,那算走运;若历史任务名比较特殊,则无从查起。

因此,需要这么一个工具,输入一个表,就能知道这个表出现在哪些 datax 任务中,不管他是 source 表还是 target 表。相当于一个简单的血缘,或者上下游。

那么,如何实现呢?人生苦短,我选 Python。

2. 框架选择

显然这是一个前后端结合的项目,在 python 中,常用的前端框架有两种:

  • flask
  • django

这两个如何选择呢?

2.1 flask VS django

2.1.1 项目规模与复杂度:

Flask:轻量级,适合小型项目、微服务及功能单一应用,如简易 API、小型博客,代码结构扁平,上手快。

Django:全栈式,自带诸多功能,契合大型、复杂项目,像电商网站这类有繁杂业务逻辑、多模型与权限管理需求的企业级应用,借助其完善架构与内置组件能高效搭建。

2.1.2 开发速度与灵活性:

Flask:灵活性强,可按需自选第三方库集成构建,开发速度依开发者对组件熟悉度而定,适合特殊或实验性场景。

Django:内置功能助力快速开发大型应用,ORM、管理界面等省却基础搭建精力,但项目结构较固定,遵循既定模式,限制部分灵活性,利于大型项目组织维护。

2.1.3 性能与资源占用;

Flask:自身轻量,资源占用少,在简单场景性能佳,不过性能优化多靠开发者引入第三方库实现。

Django:自带缓存、连接池等优化手段提升大型项目性能,但因功能丰富,资源占用相对多,小型项目需适当优化。

2.1.4 社区与文档支持:

Flask:社区活跃,文档简洁且示例多,便于初学者入门,也能助开发者解决常见问题、选用扩展。

Django:社区庞大、生态丰富,遇问题易获解答,还有海量插件可选;官方文档详尽全面,覆盖各层次开发知识需求。

基于以上,本次的小项目使用 flask 是比较快速的。

3. 整体思路

3.1 dataxweb 元数据

既然所有的信息都在 dataxweb 中,那么先观察一下 dataxweb 的元数据。

在这里插入图片描述

DataXWeb元数据

这是所有的表,见名之意,任务相关信息在 job_info 中。

观察一下 job_info 的 DDL

CREATE TABLE `job_info` (`id` int NOT NULL AUTO_INCREMENT,`job_group` int NOT NULL COMMENT '执行器主键ID',`job_cron` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '任务执行CRON',`job_desc` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,`project_id` int DEFAULT NULL COMMENT '所属项目id',`add_time` datetime DEFAULT NULL,`update_time` datetime DEFAULT NULL,`user_id` int NOT NULL COMMENT '修改用户',`alarm_email` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '报警邮件',`executor_route_strategy` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '执行器路由策略',`executor_handler` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '执行器任务handler',`executor_param` varchar(512) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '执行器任务参数',`executor_block_strategy` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '阻塞处理策略',`executor_timeout` int NOT NULL DEFAULT '0' COMMENT '任务执行超时时间,单位分钟',`executor_fail_retry_count` int NOT NULL DEFAULT '0' COMMENT '失败重试次数',`glue_type` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'GLUE类型',`glue_source` mediumtext CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci COMMENT 'GLUE源代码',`glue_remark` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'GLUE备注',`glue_updatetime` datetime DEFAULT NULL COMMENT 'GLUE更新时间',`child_jobid` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '子任务ID,多个逗号分隔',`trigger_status` tinyint NOT NULL DEFAULT '0' COMMENT '调度状态:0-停止,1-运行',`trigger_last_time` bigint NOT NULL DEFAULT '0' COMMENT '上次调度时间',`trigger_next_time` bigint NOT NULL DEFAULT '0' COMMENT '下次调度时间',`job_json` text CHARACTER SET utf8 COLLATE utf8_general_ci COMMENT 'datax运行脚本',`replace_param` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '动态参数',`jvm_param` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'jvm参数',`inc_start_time` datetime DEFAULT NULL COMMENT '增量初始时间',`partition_info` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '分区信息',`last_handle_code` int DEFAULT '0' COMMENT '最近一次执行状态',`replace_param_type` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '增量时间格式',`reader_table` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'reader表名称',`primary_key` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '增量表主键',`inc_start_id` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '增量初始id',`increment_type` tinyint DEFAULT '0' COMMENT '增量类型',`datasource_id` bigint DEFAULT NULL COMMENT '数据源id',PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=600 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC;

这里有几个关键字段,

  • id,
  • job_desc 任务名,
  • project_id 所属项目,
  • trigger_status 调度状态:0-停止,1-运行 ,
  • job_json datax 运行脚本

3.2 job_json 分析

这里还是比较简单的,主体就是 job_json 。熟悉 datax 的朋友 应该了解,datax 主要是 json,分为 reader 和 writer 。大体上分为两种,

reader 里读整个表,
reader 里写复杂的 sql
第一种的 json 如下:

"reader": {"name": "mysqlreader","parameter": {"username": "root","password": "root","column": ["id","name"],"splitPk": "db_id","connection": [{"table": ["table"],"jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/database"]}]}
}

这种比较舒服,解析 json 就能把所有的信息获取。难点在于第二种。

"reader": {"name": "mysqlreader","parameter": {"username": "root","password": "root","splitPk": "db_id","connection": [{"querySql": ["select * from table"],"jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/database"]}]}
}

就库表信息而言,都在 connection 里,第二种的难点在于 querySql 中有复杂的 sql,需要把 sql 解析出来 。(writer 都相似,不再赘述)

3.3 sql 解析

在 python 中 有一个解析 sql 的优秀项目叫 sqllineage 。官网地址[1]这里简单演示一下使用 。

from sqllineage.runner import LineageRunnersql = ''
with open('sql.sql', 'r', encoding='utf-8') as f:sql = f.read()
result = LineageRunner(sql)
print(result)

会得到如下结果

Source Tables:<default>.bd_a<default>.bd_b<default>.bd_c<default>.res_a<default>.res_c
Target Tables:

3.4 思路总结

至此,我们来理一下思路:

所有任务都在 dataxweb 的元数据 job_info 表里存在
datax 任务的主体在 job_info 里的 job_json 中
job_json 一种可以直接获取库、表信息,另一种写复杂 sql 的较为麻烦,但是可以用 sqllineage 来解决。
采用 flask 框架,url 中提供目标表,通过查询数据库和解析 json 的方式来找到和目标表相关的任务 。

下一篇,讲如何具体实现 ,以及实现过程的踩坑如何解决。

查询效果预览
在这里插入图片描述

第二篇文章在这里,含代码

▼ 关注「DataSpeed」,获取更多技术干货 ▼


http://www.mrgr.cn/news/78796.html

相关文章:

  • nginx配置 - 资源参数配置(性能优化)
  • 安卓cpu调度优化
  • Unity 对Sprite或者UI使用模板测试扣洞
  • 基于云架构Web端的工业MES系统:赋能制造业数字化变革
  • springboot3 redis 批量删除特定的 key 或带有特定前缀的 key
  • IDEA 撤销 merge 操作(详解)
  • Proxy详解
  • 大数据hadoop、spark、flink、kafka发展的过程
  • 【MySQL-6】MySQL的复合查询
  • RPC中定时器制作思路
  • 视觉语言动作模型VLA的持续升级:从π0之参考基线Octo到OpenVLA、TinyVLA、DeeR-VLA、3D-VLA
  • 贪心算法理论
  • JavaScript 前端开发:从入门到精通的奇幻之旅
  • 自动驾驶目标检测融合全貌
  • 点击A组件跳转到B页面的tab的某一列
  • 云备份实战项目
  • 探索嵌入式硬件设计:揭秘智能设备的心脏
  • 【Rust在WASM中实现pdf文件的生成】
  • debian 11 虚拟机环境搭建过坑记录
  • Flink常见面试题
  • 嵌入式C编程:宏定义与typedef的深入对比与应用
  • Python知识分享第十六天
  • Hadoop生态圈框架部署(九)- Hive部署
  • MySQL中如何减少回表
  • 微服务即时通讯系统的实现(服务端)----(3)
  • 基础Web安全|SQL注入