当前位置: 首页 > news >正文

大数据组件(一)快速入门调度组件Airflow

大数据组件(一)快速入门调度组件Airflow

  • DolphinScheduler和 Airflow是数据领域很流行的两款开源任务调度系统。DolphinScheduler 致力于用可视化的方式去完成一个 DAG 工作流,而 Airflow 则想的是用类似于编程的方式完成一个 DAG 工作流。
    • Apache DolphinScheduler 可以直接在页面上完成对 DAG 工作流的开发。
    • 而 Apache Airflow 需要提交一个 Python 文件到后台服务器上,由 Apache Airflow 去解析这个 Python 文件,进而生成一个 DAG 工作流。
  • 我们,今天以几个简单的案例,快速了解下基于python的调度组件Airflow。
    • 官网地址:https://airflow.incubator.apache.org/

1 Airflow的安装(单机版)

这里,我们利用conda进行安装。

首先,创建环境:

conda create -n airflow  python=3.8 -y
conda activate airflow

然后,利用pip进行安装,需要注意的是:安装时候,需要指定约束文件,否则很容易会出现依赖冲突

CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-2.7.2/constraints-3.8.txt"
pip install "apache-airflow==2.7.2" --constraint "${CONSTRAINT_URL}"

我们这里利用MySQL数据库进行配置:

# 3、查询版本
(airflow) [root@centos01 ~]# airflow version
2.7.2
(airflow) [root@centos01 airflow]# pwd
/root/airflow# 4、数据库初始化、改为Mysql数据库
(airflow) [root@centos01 airflow]# pip install mysql-connector-python
(airflow) [root@centos01 airflow]# airflow db init
(airflow) [root@centos01 airflow]# vim airflow.cfg#sql_alchemy_conn = sqlite:root/airflow/airflow.db
sql_alchemy_conn = mysql+mysqlconnector://root:123456@127.0.0.1:3306/airflow_db# 再次初始化
(airflow) [root@centos01 airflow]# airflow db init
# 报错如下:
sqlalchemy.exc.ProgrammingError: (mysql.connector.errors.ProgrammingError) 1067 (42000): Invalid default value for 'updated_at'
[SQL: 
CREATE TABLE dataset (id INTEGER NOT NULL AUTO_INCREMENT, uri VARCHAR(3000) COLLATE latin1_general_cs NOT NULL, extra JSON NOT NULL, created_at TIMESTAMP(6) NOT NULL, updated_at TIMESTAMP(6) NOT NULL, is_orphaned BOOL NOT NULL DEFAULT '0', CONSTRAINT dataset_pkey PRIMARY KEY (id)
)]
(Background on this error at: https://sqlalche.me/e/14/f405)# 解决方案
原因:字段 'update_at' 为 timestamp类型,取值范围是:1970-01-01 00:00:00 到 2037-12-31 23:59:59(UTC +8 北京时间从1970-01-01 08:00:00 开始),而这里默认给了空值,所以导致失败。
推荐修改mysql存储时间戳格式:
mysql> set GLOBAL sql_mode ='STRICT_TRANS_TABLES,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION' 
重启MySQL会造成参数失效,推荐将参数写入到配置文件my.cnf中。# 在[mysqld]添加下面两行
[root@centos01 apps]# vim /etc/my.cnf
[mysqld]
skip_ssl
sql_mode=STRICT_TRANS_TABLES,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION
# 重启mysql
[root@centos01 apps]# systemctl restart mysqld# 5、添加admin用户
(airflow) [root@centos01 airflow]# airflow users create \
> --username admin \
> --firstname Undo \
> --lastname Undo \
> --role Admin \
> --email undo@163.com
# 输入密码
[2024-12-27T16:28:17.107+0800] {manager.py:555} INFO - Added Permission %s to role %s
Password:# 6、启动airflow web服务和调度器, 启动后浏览器访问http://centos01:8080
(airflow) [root@centos01 airflow]# airflow webserver -p 8080 -D
(airflow) [root@centos01 airflow]# airflow scheduler -D

在这里插入图片描述

上图是默认安装时候,会出现的两条信息:

  • Airflow使用SQLite数据库,建议改为Mysql数据库,我们已经修改;
  • SequentialExecutor按顺序运行任务实例,不能并行执行任务,我们下面修改;
  • 注意:右上角默认是UTC时间,我们一定要点击修改时区,同时要修改airflow.cfg文件。
# 7、修改airflow的配置文件
(airflow) [root@centos01 airflow]# vim airflow.cfg 
# 可以使用官方推荐的几种执行器,也可以自定义。这里我们选择本地执行器即可。
[core]
# 存放python调度脚本的目录
dags_folder = /root/airflow/dags# The executor class that airflow should use. Choices include
# ``SequentialExecutor``, ``LocalExecutor``, ``CeleryExecutor``, ``DaskExecutor``,
# ``KubernetesExecutor``, ``CeleryKubernetesExecutor`` or the
# full import path to the class when using a custom executor.
executor = LocalExecutor# 修改时区信息# default_ui_timezone = UTC
default_ui_timezone = Asia/Shanghai# default_timezone = utc
default_timezone = Asia/Shanghai# 创建目录,用来存放python调度脚本
(airflow) [root@centos01 airflow]# mkdir dags

我们把airflow启动、关闭封装为脚本:

(airflow) [root@centos01 ~]# vim af.sh
(airflow) [root@centos01 ~]# chmod +x af.sh 
#!/bin/bash
case $1 in
"start"){echo " --------start airflow-------"conda activate airflow;airflow webserver -p 8080 -D;airflow scheduler -D;conda deactivate
};;
"stop"){echo " --------stop airflow-------"ps -ef | egrep 'scheduler|airflow-webserver' | grep -v grep | awk '{print $2}' | xargs kill -15
};;
esac# 然后重启
(airflow) [root@centos01 ~]# ./af.sh stop--------stop airflow-------
(airflow) [root@centos01 ~]# ./af.sh start--------start airflow-------# 查看是否启动成功(airflow) [root@centos01 ~]# ps -ef | grep 8080
root       1935      1  1 20:55 ?        00:00:00 /opt/apps/minoconda3/envs/airflow/bin/python /opt/apps/minoconda3/envs/airflow/bin/airflow webserver -p 8080 -D
root       2618   1630  0 20:55 pts/0    00:00:00 grep --color=auto 8080# 访问http://centos01:8080/页面,页面上会出现很多官方示例
# 当然,我们也能通过命令查看
(airflow) [root@centos01 ~]# airflow dags list
dag_id                                                  | filepath                                                                                                                             | owner   | paused
========================================================+======================================================================================================================================+=========+=======
dataset_consumes_1                                      | /opt/apps/minoconda3/envs/airflow/lib/python3.8/site-packages/airflow/example_dags/example_datasets.py                               | airflow | True  
dataset_consumes_1_and_2                                | /opt/apps/minoconda3/envs/airflow/lib/python3.8/site-packages/airflow/example_dags/example_datasets.py                               | airflow | True  
dataset_consumes_1_never_scheduled                      | /opt/apps/minoconda3/envs/airflow/lib/python3.8/site-packages/airflow/example_dags/example_datasets.py                               | airflow | True  
......

2 Airflow入门案例

2.1 BashOperator

  • Airflow中最重要的还是各种Operator,其允许生成特定类型的任务,这个任务在实例化时称为DAG中的任务节点,所有的Operator均派生自BaseOparator,并且继承了许多属性和方法。

在这里插入图片描述

  • BashOperator主要执行bash脚本或命令
  • Operator参考:https://airflow.apache.org/docs/
(airflow) [root@centos01 shell_jobs]# cat first_shell.sh 
#!/bin/bashdt=$1echo "==== execute first shell ===="echo "---- first : time is ${dt}"
(airflow) [root@centos01 shell_jobs]# cat second_shell.sh 
#!/bin/bash
dt=$1
echo "==== execute second shell ===="
echo "---- second : time is ${dt}"
  • python脚本如下(可以用VS Code远程连接),注意:要放在配置的目录下

在这里插入图片描述

#!/usr/bin/python
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperatordefault_args = {'owner':'root','start_date':datetime(2024, 12, 27, 22, 0),   # 调度开始时间'retries': 1,                                 # 失败重试次数'retry_delay': timedelta(minutes=5)           # 失败重试间隔
}dag = DAG(dag_id='MyShellTest',default_args=default_args,schedule_interval='*/15 * * * *'    # 每15min运行一次
)first=BashOperator(task_id='first',# 脚本路径建议写绝对路径bash_command='sh /root/shell_jobs/first_shell.sh %s'%datetime.now().strftime("%Y-%m-%d-%H"),dag=dag
)second=BashOperator(task_id='second',# 脚本路径建议写绝对路径bash_command='sh /root/shell_jobs/second_shell.sh %s'%datetime.now().strftime("%Y-%m-%d-%H"),dag=dag
)first >> second

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

2.2 SSHOperator

  • 在实际的调度任务中,任务脚本大多分布在不同的机器上,我们可以使用SSHOperator来调用远程机器上的脚本任务。

  • Airflow中提供了很多的providers,需要通过pip安装apache-airflow-providers-ssh包。

    • 安装哪个版本的apache-airflow-providers-ssh包呢?

    • 需要查看我们安装时候的https://raw.githubusercontent.com/apache/airflow/constraints-2.7.2/constraints-3.8.txt约束文件

    • 在这里插入图片描述

  • 最后,配置SSH Connection连接。登录airflow webui ,选择“Admin”->“Connections”。

在这里插入图片描述

# 首先停止airflow webserver与scheduler# 我们在https://raw.githubusercontent.com/apache/airflow/constraints-2.7.2/constraints-3.8.txt找相应的版本安装
(airflow) [root@centos01 airflow]# pip install apache-airflow-providers-ssh==3.7.3# 然后启动airflow webserver与scheduler
我们在另一台机器上创建两个shell脚本:
#!/bin/bash
# 获取主机名
HOSTNAME=$(hostname)
echo "==== 执行脚本主机是: $HOSTNAME, execute first shell ===="#!/bin/bash
# 获取主机名
HOSTNAME=$(hostname)
echo "==== 执行脚本主机是: $HOSTNAME, execute second shell ===="[root@centos02 shell_jobs]# ll
total 8
-rw-r--r--. 1 root root 122 Dec 28 16:34 first_shell.sh
-rw-r--r--. 1 root root 123 Dec 28 16:34 second_shell.sh

然后,创建Python脚本

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.ssh.operators.ssh import SSHOperatordefault_args = {'owner':'root','start_date': datetime(2024, 12, 28, 16),   # 开始执行时间'retries': 1,                               # 失败重试次数'retry_delay': timedelta(minutes=5)         # 失败重试间隔
}# 声明任务图, Airflow使用的是“前一个周期”来调度 DAG 运行
# 即:在2024-12-28 00:00:00时,Airflow会执行的调度账期是:2024-12-27 00:00:00
# dag = DAG('MyTaskTest', default_args=default_args, schedule_interval=timedelta(days=1))dag = DAG(dag_id = 'Myssh2centos02',default_args=default_args,schedule_interval=timedelta(minutes=10)  # 10min执行一次
)first=SSHOperator(task_id='first',ssh_conn_id='AA-My-ssh-centos02',                  # 配置在Airflow webui Connection中配置的SSH Conn idcommand='sh /root/shell_jobs/first_shell.sh ',     # 注意:带一个空格dag = dag
)second=SSHOperator(task_id='second',ssh_conn_id='AA-My-ssh-centos02',                     # 配置在Airflow webui Connection中配置的SSH Conn idcommand='sh /root/shell_jobs/second_shell.sh ',    # 注意:带一个空格remote_host="192.168.42.102",                      # 如果配置remote_host,将会替换Connection中的SSH配置的hostdag=dag
)first >> second

在这里插入图片描述

2.3 PythonOperator

  • PythonOperator可以调用Python函数,由于Python基本可以调用任何类型的任务,如果实在找不到合适的Operator,将任务转为Python函数,使用PythonOperator即可。
import random
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
import pytzdef print__hello1(*a,**b):"""*  关键字参数允许传入0个或任意个参数,这些可变参数在函数调用时自动组装为一个tuple** 关键字参数允许传入0个或任意个含【参数名】的参数,这些关键字参数在函数内部自动组装为一个dict"""print("hello airflow1")print(a)print(b)# 返回的值只会打印到日志中return {"sss1":"print__hello1"}def print__hello2(execution_ds, execution_ts, random_base):print("hello airflow2")print("Today:{}, 当前执行的账期为:{}, 随机数为:{}".format(execution_ds, execution_ts, random_base))# 将字符串解析为带有时区信息的datetime对象utc_dt = datetime.strptime(execution_ts, "%Y-%m-%dT%H:%M:%S%z")# 定义目标时区(上海时区)shanghai_tz = pytz.timezone('Asia/Shanghai')# 将UTC时间转换为目标时区的时间shanghai_dt = utc_dt.astimezone(shanghai_tz)# 格式化输出execution_ts_sh = shanghai_dt.strftime('%Y-%m-%d %H:%M:%S')print("Today:{}, 当前执行的账期为:{}, 随机数为:{}".format(execution_ds, execution_ts_sh, random_base))# 返回的值只会打印到日志中return {"sss2":"print__hello2"}default_args = {'owner':'root','start_date':  datetime(2024, 12, 28, 14), # 开始执行时间'retries': 1,  # 失败重试次数'retry_delay': timedelta(minutes=5) # 失败重试间隔
}dag = DAG(dag_id = 'MyPythonOperator',default_args=default_args,schedule_interval='*/10 * * * *'  # 每10min运行一次
)first=PythonOperator(task_id='MyPython_first',#填写print__hello1方法时,不要加上()python_callable=print__hello1,# op_args对应print_hello1方法中的a参数op_args=[1,2,3,"hello","world"],# op_kwargs对应print__hello1方法中的b参数,带参数名称op_kwargs={"id":"1","name":"zs","age":18},dag = dag
)second=PythonOperator(task_id='MyPython_second',# 同样,填写print__hello2 方法时,不要加上()python_callable=print__hello2,op_kwargs={# {{ ds }} 是一个Airflow Jinja模板变量,表示当前的执行日期(格式为 YYYY-MM-DD)'execution_ds': '{{ ds }}', # {{ ts }} 是一个Airflow Jinja模板变量,表示当前的执行时间戳,格式为 YYYY-MM-DDTHH:MM:SS# 注意:默认传递是UTC时间,例如:2024-12-28T07:30:00+00:00'execution_ts': '{{ ts }}', # random_base参数对应print_hello2方法中参数的random_base"random_base": random.randint(0, 9) },dag=dag
)first >> second

在这里插入图片描述

2.4 HiveOperator

  • 可以通过HiveOperator直接操作Hive SQL
  • 在airflow中使用HiveOperator调用Hive任务,首先需要安装以下依赖并配置Hive Metastore
# 在https://raw.githubusercontent.com/apache/airflow/constraints-2.7.2/constraints-3.8.txt找相应的版本安装
pip install apache-airflow-providers-apache-hive==6.1.6
  • 启动HDFS、Hive Metastore,在Hive中创建下面表,并加载文件。
-- 1、创建表
create table ods_person_info(id int,name string,age int, city_id int) 
row format delimited fields terminated by '\t';create table dim_city_info(city_id int,city_name string)
row format delimited fields terminated by '\t';-- 2、准备两个文件
ods_person_info.csv
1	John	30	1001
2	Jane	25	1002
3	Mike	40	1001
4	Sarah	35	1003
5	Liam	22	1004dim_city_info.csv
1001	New York
1002	Los Angeles
1003	Chicago
1004	Houston
1005	Phoenix-- 3、加载数据
LOAD DATA LOCAL INPATH '/root/input_doris_data/ods_person_info.csv' 
INTO TABLE ods_person_info;LOAD DATA LOCAL INPATH '/root/input_doris_data/dim_city_info.csv' 
INTO TABLE dim_city_info;
  • 同样,登录Airflow webui并设置Hive Metastore

在这里插入图片描述

  • 编写Python脚本
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.apache.hive.operators.hive import HiveOperator
from airflow.operators.python_operator import PythonOperator
import pytzdefault_args = {'owner': 'root','start_date': datetime(2024, 12, 30, 12),'retries': 1,'retry_delay': timedelta(minutes=5)
}dag = DAG(dag_id='MyHiveOperator',default_args=default_args,schedule_interval=timedelta(hours=1)
)# 1、利用PythonOperator封装分区变量
def get_partition_hour(**context):execution_dt = context['execution_date']# 转换Asia/Shanghai分区时间shanghai_tz = pytz.timezone('Asia/Shanghai')shanghai_dt = execution_dt.astimezone(shanghai_tz)# 分区时间partition_hour = shanghai_dt.strftime('%Y%m%d%H')context['ti'].xcom_push(key='partition_hour', value=partition_hour)return {'partition_hour': partition_hour}time_prep = PythonOperator(task_id='prepare_times',python_callable=get_partition_hour,provide_context=True,dag=dag
)# 2、创建结果表
create_table = HiveOperator(task_id='create_table',hive_cli_conn_id="AA-My-centos01-hive",hql="""CREATE TABLE IF NOT EXISTS dwd_person_city_info (id INT,name STRING,age INT,city_id INT,city_name STRING,insert_time STRING)PARTITIONED BY (p_hour STRING)ROW FORMAT DELIMITEDFIELDS TERMINATED BY '\t'""",dag=dag
)# 3、利用PythonOperator来预处理HQL语句,并将处理后的HQL语句传递给HiveOperator
def get_insert_time():# 获取当前的执行时间now = datetime.now()formatted_time = now.strftime('%Y-%m-%d %H:%M:%S')return formatted_timedef prepare_hql(**context):# Airflow中可以XCom功能来共享数据# 通过XCom,一个任务可以将数据推送到一个临时存储中,其他任务可以从这个存储中拉取数据partition_hour = context['ti'].xcom_pull(task_ids='prepare_times', key='partition_hour')# 调用本地方法formatted_time = get_insert_time()# 组装hive-sqlhql = f"""INSERT OVERWRITE TABLE dwd_person_city_info PARTITION(p_hour='{partition_hour}')SELECT a.id, a.name, a.age, a.city_id, b.city_name, '{formatted_time}' AS insert_timeFROM ods_person_info a JOIN dim_city_info b ON a.city_id = b.city_id"""return hqlprepare_hql_task = PythonOperator(task_id='prepare_hql',python_callable=prepare_hql,provide_context=True,dag=dag
)# 4、执行hive-sql插入到相应分区
insert_data = HiveOperator(task_id='insert_partitioned_data',hive_cli_conn_id="AA-My-centos01-hive",# 注意,这里使用了task_instance.xcom_pull而不是ti.xcom_pull# 因为在HiveOperator的模板上下文中,ti可能不是直接可用的# 而task_instance是Airflow在模板渲染时提供的一个全局变量,用于访问当前任务实例。hql="{{ task_instance.xcom_pull(task_ids='prepare_hql', key='return_value') }}",dag=dag
)time_prep >> create_table >> prepare_hql_task >> insert_data
0: jdbc:hive2://192.168.42.101:10000> show tables;
+-----------------------+
|       tab_name        |
+-----------------------+
| dim_city_info         |
| dwd_person_city_info  |
| ods_person_info       |
+-----------------------+
3 rows selected (0.931 seconds)
0: jdbc:hive2://192.168.42.101:10000> show partitions dwd_person_city_info;
+--------------------+
|     partition      |
+--------------------+
| p_hour=2024123012  |
| p_hour=2024123013  |
| p_hour=2024123014  |
| p_hour=2024123015  |
| p_hour=2024123016  |
| p_hour=2024123017  |
+--------------------+
6 rows selected (0.307 seconds)
0: jdbc:hive2://192.168.42.101:10000> select * from  dwd_person_city_info a  where p_hour=2024123017;
+-------+---------+--------+------------+--------------+----------------------+-------------+
| a.id  | a.name  | a.age  | a.city_id  | a.city_name  |    a.insert_time     |  a.p_hour   |
+-------+---------+--------+------------+--------------+----------------------+-------------+
| 1     | John    | 30     | 1001       | New York     | 2024-12-30 18:00:10  | 2024123017  |
| 2     | Jane    | 25     | 1002       | Los Angeles  | 2024-12-30 18:00:10  | 2024123017  |
| 3     | Mike    | 40     | 1001       | New York     | 2024-12-30 18:00:10  | 2024123017  |
| 4     | Sarah   | 35     | 1003       | Chicago      | 2024-12-30 18:00:10  | 2024123017  |
| 5     | Liam    | 22     | 1004       | Houston      | 2024-12-30 18:00:10  | 2024123017  |
+-------+---------+--------+------------+--------------+----------------------+-------------+
5 rows selected (2.301 seconds)

http://www.mrgr.cn/news/82045.html

相关文章:

  • 如何将服务器的镜像推送到阿里的容器镜像服务中
  • 系统设计——大文件传输方案设计
  • 【MySQL】发展起源与核心架构组件详细介绍
  • Colyseus Metadata 详解
  • Datawhale AI冬令营(第二期)动手学AI Agent task2--学Prompt工程,优化Agent效果
  • Linux postgresql-15部署文档
  • 人形机器人全身运动规划相关资料与文章
  • PaddleOCROCR关键信息抽取训练过程
  • 蓝桥杯(Java)(ing)
  • 【网络安全实验室】基础关实战详情
  • 沪深捉妖记(一)探寻妖股的特征
  • 数据结构与算法之动态规划: LeetCode 3105. 最长的严格递增或递减子数组 (Ts版)
  • Nginx - 整合lua 实现对POST请求的参数拦截校验(不使用Openresty)
  • L25.【LeetCode笔记】 三步问题的四种解法(含矩阵精彩解法!)
  • SAP SD信贷管理信用管理手册(下)
  • 通义千问QvQ-72B-Preview模型部署
  • FOC控制原理-ADC采样时机
  • HarmonyOS NEXT应用开发实战:免费练手的网络API接口分享
  • 数据结构与算法之动态规划: LeetCode 1143. 最长公共子序列 (Ts版)
  • 后端开发-Maven
  • 细说STM32F407单片机CAN基础知识及其HAL驱动程序
  • FPGA多路红外相机视频拼接输出,提供2套工程源码和技术支持
  • 数据结构与算法之动态规划: LeetCode 674. 最长连续递增序列 (Ts版)
  • 配置中心 之 apollo
  • Postman[8] 断言
  • python文件操作相关(excel)