当前位置: 首页 > news >正文

Seatunnel 整合 xxl-job 实现批处理任务定时执行

为解决 Seatunnel Web 没有定时任务的问题,本文采用 xxl-job 调度器结合 Seatunnel 实现 批处理的定时执行。
环境要求:在执行器的环境中存在 Seatunnel 环境,并且配置了 Seatunnel 环境变量。
说明:任务采用 Seatunnel 的 ./bin/seatunnel.sh -c config.template 的方式实现任务的提交 ,通过 -i 参数实现 参数动态传递

创建任务

在这里插入图片描述

编写脚本

在这里插入图片描述
将处理任务添加到 Shell 脚本中,具体脚本如下。保存后运行即可。通过修改 config_content 值实现任务的修改

批处理

#!/bin/bashSEATUNNEL_CMD="$SEATUNNEL_HOME/bin/seatunnel.sh"# 定义任务停止时执行的清理操作
exit_func() {# 在这里放入你希望在任务停止时执行的操作,比如释放资源、记录日志等$SEATUNNEL_CMD -can "$JOB_ID"exit;
}# 捕获 SIGINT (Ctrl+C) 和 SIGTERM (手动终止) 信号
trap exit_func SIGINT SIGTERM SIGHUP SIGQUIT SIGKILL# 将配置内容写入变量
config_content=$(cat <<EOL
env {# You can set SeaTunnel environment configuration hereparallelism = 2job.mode = "BATCH"checkpoint.interval = 10000
}source {# This is a example source plugin **only for test and demonstrate the feature source plugin**FakeSource {parallelism = 2result_table_name = "fake"row.num = 16schema = {fields {name = "string"age = "int"}}}# If you would like to get more information about how to configure SeaTunnel and see full list of source plugins,# please go to https://seatunnel.apache.org/docs/connector-v2/source
}sink {Console {}
}
EOL
)echo "开始执行任务"## 同步任务,日志中会打印运行日志
echo "$config_content" | $SEATUNNEL_CMD --config /dev/stdin## 异步任务,日志不会记录运行日志# 将配置内容写入标准输入并传递给 SeaTunnel
# SUBJOB_OUTPUT=$(echo "$config_content" | $SEATUNNEL_CMD --config /dev/stdin --async 2>&1)
# #JOB_ID=$(echo "$SUBJOB_OUTPUT" | grep "job name" | awk -F'job id: ' '{print $2}' | awk -F',' '{print $1}')# #echo "任务Id: $JOB_ID"# # 监控任务状态
# while true; do
#     # 查询任务状态
#     STATUS_OUTPUT=$($SEATUNNEL_CMD -j "$JOB_ID" 2>&1)
#     TASK_STATE=$(echo "$STATUS_OUTPUT" | grep "$JOB_ID" | awk -F'"jobStatus":"' '{print $2}' | awk -F '","' '{print $1}')#     if [[ "$TASK_STATE" == "FINISHED" ]]; then
#         echo "任务完成, 状态: $TASK_STATE"
#         exit 0
#     fi
#     # 检查任务是否已完成
#     if [[ "$TASK_STATE" != "RUNNING" ]]; then
#         echo "任务已结束,状态:$TASK_STATE"
#         exit 1
#     else
#         echo "任务运行中 ... 状态: $TASK_STATE"
#         # 等待 5 秒后再次查询
#         sleep 5
#     fi
# done

流处理

针对流操作还有部分问题

  1. 任务无法自启动
  2. 两次任务无法实现增量同步,每次同步对于Seatunnel来说,都是新任务。
#!/bin/bash
SEATUNNEL_CMD="$SEATUNNEL_HOME/bin/seatunnel.sh"
SEATUNNEL_HOST=localhost
SEATUNNEL_PORT=5801# 定义任务停止时执行的清理操作
exit_func() {# 在这里放入你希望在任务停止时执行的操作,比如释放资源、记录日志等$SEATUNNEL_CMD -can "$JOB_ID"exit;
}# 捕获 SIGINT (Ctrl+C) 和 SIGTERM (手动终止) 信号
trap exit_func SIGINT SIGTERM SIGHUP SIGQUIT SIGKILL# 将配置内容写入变量
config_content=$(cat <<EOL
env {parallelism = 2job.mode = "STREAMING"checkpoint.interval = 2000
}source {FakeSource {parallelism = 2plugin_output = "fake"row.num = 16schema = {fields {name = "string"age = "int"}}}
}sink {Console {}
}
EOL
)echo "开始执行任务"
echo "--------    配置信息    --------------"
echo "$config_content"
echo "--------    end    --------------"# 将配置内容写入标准输入并传递给 SeaTunnel
SUBJOB_OUTPUT=$(echo "$config_content" | $SEATUNNEL_CMD --config /dev/stdin --async 2>&1)
JOB_ID=$(echo "$SUBJOB_OUTPUT" | grep "job name" | awk -F'job id: ' '{print $2}' | awk -F',' '{print $1}')echo "任务Id: $JOB_ID"# 监控任务状态
while true; doSTATUS_OUTPUT=$(curl -s http://$SEATUNNEL_HOST:$SEATUNNEL_PORT/hazelcast/rest/maps/job-info/$JOB_ID)echo $(date "+%Y-%m-%d %H:%M:%S.%3N") "写入数量 : "$(echo "$STATUS_OUTPUT" | awk -F'"SinkWriteCount":"' '{print $2}' | awk -F '","' '{print $1}')", 读取数量 :"$(echo "$STATUS_OUTPUT" | awk -F'"SourceReceivedCount":"' '{print $2}' | awk -F '","' '{print $1}')TASK_STATE=$(echo "$STATUS_OUTPUT" | awk -F'"jobStatus":"' '{print $2}' | awk -F '","' '{print $1}')if [[ "$TASK_STATE" == "FINISHED" ]]; thenecho "任务完成, 状态: $TASK_STATE"exit 0fiif [[ "$TASK_STATE" != "RUNNING" ]]; thenecho "任务已结束,状态:$TASK_STATE"exit 1elseecho "任务运行中 ... 状态: $TASK_STATE"sleep 300fi
done

http://www.mrgr.cn/news/80837.html

相关文章:

  • 快速掌握源码部署Filebeat
  • Python权限系统-前后端分离
  • 使用Chat-LangChain模块创建一个与用户交流的机器人
  • golang中的值传递与引用传递,如何理解结构体的方法?为什么 T 和 *T 有不同的方法集?
  • 队列与树型结构中的二叉树
  • [LitCTF 2023]easy_math (中级)
  • springboot双数据源配置及进行数据库操作
  • Swin transformer 论文阅读记录 代码分析
  • 【Rust自学】4.2. 所有权规则、内存与分配
  • 将HTML转换为PDF:使用Spire.Doc的详细指南(一) 试用版
  • java开发入门学习五-流程控制
  • MySQL高可用
  • 【5】C#期末复习第5套
  • Android 10 Launcher3 删除谷歌搜索
  • 【软件设计_设计模式】设计模式代码笔记
  • 关于在浏览器里面获取手机方向的事件
  • Java收发邮件 Jakarta mail
  • 每日十题八股-2024年12月19日
  • el-table 多表头+跨行跨列案例
  • 使用Gradio编写大模型ollama客户端 -界面版
  • Kaggler日志--Day9
  • Docker:Dockerfile(补充四)
  • 【Rust自学】4.1. 所有权:栈内存 vs. 堆内存
  • 【Linux】NET9运行时移植到低版本GLIBC的Linux纯内核板卡上
  • 初学stm32 ——— 串口通信
  • Qt Quick:CheckBox 复选框