DataX实战|使用Python 构建简易的DataX数据血缘工具(一)
导读:
在这篇文章中,我介绍了如何使用 Python 构建简易的 DataX 数据血缘工具,以便解决 DataXWeb 在查询表上下游关系时的不足。
选择 Flask 作为框架,利用 DataXWeb 的元数据 job_info 表和 job_json,通过 JSON 解析与 sqllineage 的 SQL 解析能力提取出任务相关的库表信息。
1. 背景
由于我们在 ETL 时全部在 datax 上,在 datax 里写了大量复杂的 sql,导致在排查问题时困难重重,而唯一的可视化页面是 dataxweb。
如果我需要查询一个表的上下游,只能在 dataxweb 里去搜,而 dataxweb 只支持搜任务名,若任务名与表名相同,那算走运;若历史任务名比较特殊,则无从查起。
因此,需要这么一个工具,输入一个表,就能知道这个表出现在哪些 datax 任务中,不管他是 source 表还是 target 表。相当于一个简单的血缘,或者上下游。
那么,如何实现呢?人生苦短,我选 Python。
2. 框架选择
显然这是一个前后端结合的项目,在 python 中,常用的前端框架有两种:
- flask
- django
这两个如何选择呢?
2.1 flask VS django
2.1.1 项目规模与复杂度:
Flask:轻量级,适合小型项目、微服务及功能单一应用,如简易 API、小型博客,代码结构扁平,上手快。
Django:全栈式,自带诸多功能,契合大型、复杂项目,像电商网站这类有繁杂业务逻辑、多模型与权限管理需求的企业级应用,借助其完善架构与内置组件能高效搭建。
2.1.2 开发速度与灵活性:
Flask:灵活性强,可按需自选第三方库集成构建,开发速度依开发者对组件熟悉度而定,适合特殊或实验性场景。
Django:内置功能助力快速开发大型应用,ORM、管理界面等省却基础搭建精力,但项目结构较固定,遵循既定模式,限制部分灵活性,利于大型项目组织维护。
2.1.3 性能与资源占用;
Flask:自身轻量,资源占用少,在简单场景性能佳,不过性能优化多靠开发者引入第三方库实现。
Django:自带缓存、连接池等优化手段提升大型项目性能,但因功能丰富,资源占用相对多,小型项目需适当优化。
2.1.4 社区与文档支持:
Flask:社区活跃,文档简洁且示例多,便于初学者入门,也能助开发者解决常见问题、选用扩展。
Django:社区庞大、生态丰富,遇问题易获解答,还有海量插件可选;官方文档详尽全面,覆盖各层次开发知识需求。
基于以上,本次的小项目使用 flask 是比较快速的。
3. 整体思路
3.1 dataxweb 元数据
既然所有的信息都在 dataxweb 中,那么先观察一下 dataxweb 的元数据。
DataXWeb元数据
这是所有的表,见名之意,任务相关信息在 job_info 中。
观察一下 job_info 的 DDL
CREATE TABLE `job_info` (`id` int NOT NULL AUTO_INCREMENT,`job_group` int NOT NULL COMMENT '执行器主键ID',`job_cron` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '任务执行CRON',`job_desc` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,`project_id` int DEFAULT NULL COMMENT '所属项目id',`add_time` datetime DEFAULT NULL,`update_time` datetime DEFAULT NULL,`user_id` int NOT NULL COMMENT '修改用户',`alarm_email` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '报警邮件',`executor_route_strategy` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '执行器路由策略',`executor_handler` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '执行器任务handler',`executor_param` varchar(512) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '执行器任务参数',`executor_block_strategy` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '阻塞处理策略',`executor_timeout` int NOT NULL DEFAULT '0' COMMENT '任务执行超时时间,单位分钟',`executor_fail_retry_count` int NOT NULL DEFAULT '0' COMMENT '失败重试次数',`glue_type` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'GLUE类型',`glue_source` mediumtext CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci COMMENT 'GLUE源代码',`glue_remark` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'GLUE备注',`glue_updatetime` datetime DEFAULT NULL COMMENT 'GLUE更新时间',`child_jobid` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '子任务ID,多个逗号分隔',`trigger_status` tinyint NOT NULL DEFAULT '0' COMMENT '调度状态:0-停止,1-运行',`trigger_last_time` bigint NOT NULL DEFAULT '0' COMMENT '上次调度时间',`trigger_next_time` bigint NOT NULL DEFAULT '0' COMMENT '下次调度时间',`job_json` text CHARACTER SET utf8 COLLATE utf8_general_ci COMMENT 'datax运行脚本',`replace_param` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '动态参数',`jvm_param` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'jvm参数',`inc_start_time` datetime DEFAULT NULL COMMENT '增量初始时间',`partition_info` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '分区信息',`last_handle_code` int DEFAULT '0' COMMENT '最近一次执行状态',`replace_param_type` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '增量时间格式',`reader_table` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'reader表名称',`primary_key` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '增量表主键',`inc_start_id` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '增量初始id',`increment_type` tinyint DEFAULT '0' COMMENT '增量类型',`datasource_id` bigint DEFAULT NULL COMMENT '数据源id',PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=600 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC;
这里有几个关键字段,
- id,
- job_desc 任务名,
- project_id 所属项目,
- trigger_status 调度状态:0-停止,1-运行 ,
- job_json datax 运行脚本
3.2 job_json 分析
这里还是比较简单的,主体就是 job_json 。熟悉 datax 的朋友 应该了解,datax 主要是 json,分为 reader 和 writer 。大体上分为两种,
reader 里读整个表,
reader 里写复杂的 sql
第一种的 json 如下:
"reader": {"name": "mysqlreader","parameter": {"username": "root","password": "root","column": ["id","name"],"splitPk": "db_id","connection": [{"table": ["table"],"jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/database"]}]}
}
这种比较舒服,解析 json 就能把所有的信息获取。难点在于第二种。
"reader": {"name": "mysqlreader","parameter": {"username": "root","password": "root","splitPk": "db_id","connection": [{"querySql": ["select * from table"],"jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/database"]}]}
}
就库表信息而言,都在 connection 里,第二种的难点在于 querySql 中有复杂的 sql,需要把 sql 解析出来 。(writer 都相似,不再赘述)
3.3 sql 解析
在 python 中 有一个解析 sql 的优秀项目叫 sqllineage 。官网地址[1]这里简单演示一下使用 。
from sqllineage.runner import LineageRunnersql = ''
with open('sql.sql', 'r', encoding='utf-8') as f:sql = f.read()
result = LineageRunner(sql)
print(result)
会得到如下结果
Source Tables:<default>.bd_a<default>.bd_b<default>.bd_c<default>.res_a<default>.res_c
Target Tables:
3.4 思路总结
至此,我们来理一下思路:
所有任务都在 dataxweb 的元数据 job_info 表里存在
datax 任务的主体在 job_info 里的 job_json 中
job_json 一种可以直接获取库、表信息,另一种写复杂 sql 的较为麻烦,但是可以用 sqllineage 来解决。
采用 flask 框架,url 中提供目标表,通过查询数据库和解析 json 的方式来找到和目标表相关的任务 。
下一篇,讲如何具体实现 ,以及实现过程的踩坑如何解决。
查询效果预览
第二篇文章在这里,含代码
▼ 关注「DataSpeed」,获取更多技术干货 ▼