

1. Hudi Overview

1.1 What is Apache Hudi

Apache Hudi is the next generation streaming data lake platform. Apache Hudi brings core warehouse and database functionality directly to a data lake. Hudi provides tables, transactions, efficient upserts/deletes, advanced indexes, streaming ingestion services, data clustering/compaction optimizations, and concurrency, all while keeping your data in open source file formats.

Row-oriented storage: .avro

Columnar storage: .parquet

ByteDance's practice of building an EB-scale data lake on Apache Hudi

Not only is Apache Hudi great for streaming workloads, but it also allows you to create efficient incremental batch pipelines. Read the docs for more use case descriptions and check out who's using Hudi, to see how some of the largest data lakes in the world, including Uber, Amazon, ByteDance, Robinhood and more, are transforming their production data lakes with Hudi.

Apache Hudi can easily be used on any cloud storage platform. Hudi's advanced performance optimizations make analytical workloads faster with any of the popular query engines, including Apache Spark, Flink, Presto, Trino, Hive, etc.

2. Building Hudi

2.1 Preparing the Build Environment





tar -zxvf apache-maven-3.6.1-bin.tar.gz -C /opt/module/

mv /opt/module/apache-maven-3.6.1 /opt/module/maven-3.6.1


sudo vim /etc/profile.d/my_env.sh


export MAVEN_HOME=/opt/module/maven-3.6.1
export PATH=$PATH:$MAVEN_HOME/bin



source /etc/profile

mvn -v



vim /opt/module/maven-3.6.1/conf/settings.xml

<!-- Add the Aliyun mirror -->
<mirror>
    <id>nexus-aliyun</id>
    <mirrorOf>central</mirrorOf>
    <name>Nexus aliyun</name>
    <url>http://maven.aliyun.com/nexus/content/groups/public</url>
</mirror>

2.2 Downloading the Source Package


wget https://dlcdn.apache.org/hudi/0.12.3/hudi-0.12.3.src.tgz






2.3 Build Command

mvn clean package -DskipTests -Dspark3.3 -Dflink1.13 -Dscala-2.12 -Dhadoop.version=3.3.1 -Pflink-bundle-shade-hive3



Cause: a compatibility issue between Hadoop 2 and Hadoop 3

The Hadoop 2 API:







2.4 Resolving Spark Module Dependency Conflicts

java.lang.AbstractMethodError: org.apache.hudi.org.apache.jetty.server.AllowedResourceAliasChecker$AllowedResourceAliasCheckListener.lifeCycleFailure(Lorg/apache/hudi/org/apache/jetty/util/component/LifeCycle;Ljava/lang/Throwable;)V

vim /opt/software/hudi-0.12.3/packaging/hudi-spark-bundle/pom.xml

    <!-- Hive -->









    <!-- Add the Jetty version configured by Hudi -->

vim /opt/software/hudi-0.12.3/packaging/hudi-utilities-bundle/pom.xml

    <!-- Hive -->










    <!-- Add the Jetty version configured by Hudi -->


mvn clean package -DskipTests -Dspark3.3 -Dflink1.13 -Dscala-2.12 -Dhadoop.version=3.3.1 -Pflink-bundle-shade-hive3 


3. Core Concepts

3.1 Basic Concepts

Apache Hudi core concepts, all in one place

3.1.1 Timeline

At its core, Hudi maintains a timeline of all actions performed on the table at different instants of time. The timeline helps provide instantaneous views of the table, while also efficiently supporting retrieval of data in the order of arrival. A Hudi instant consists of the following components:





instant: n. a moment, a point in time; adj. immediate, instantaneous

retrieval: n. fetching, recovery

  • Instant action : Type of action performed on the table
  • Instant time : Typically a timestamp (e.g. 20190117010349), which monotonically increases in the order of the action's begin time.
  • State : Current state of the instant



Hudi guarantees that the actions performed on the timeline are atomic & timeline consistent based on the instant time.


Key actions performed include

  • COMMITS - A commit denotes an atomic write of a batch of records into a table.
  • CLEANS - Background activity that gets rid of older versions of files in the table that are no longer needed.
  • DELTA_COMMIT - A delta commit refers to an atomic write of a batch of records into a MergeOnRead type table, where some/all of the data could be just written to delta logs.
  • COMPACTION - Background activity to reconcile differential data structures within Hudi, e.g. moving updates from row-based log files to columnar formats. Internally, compaction manifests as a special commit on the timeline.
  • ROLLBACK - Indicates that a commit/delta commit was unsuccessful and rolled back, removing any partial files produced during such a write.
  • SAVEPOINT - Marks certain file groups as "saved", such that the cleaner will not delete them. It helps restore the table to a point on the timeline in case of disaster/data recovery scenarios.

denote: vt. to indicate, to represent, to stand for

reconcile [ˈrekənˌsaɪl]: vt. to bring into agreement, to make consistent; vi. to come to terms

Any given instant can be in one of the following states

  • REQUESTED - Denotes an action has been scheduled, but has not initiated
  • INFLIGHT - Denotes that the action is currently being performed
  • COMPLETED - Denotes completion of an action on the timeline
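
The instant components, actions, and states described above can be pictured with a small model. This is only a sketch in plain Scala: `TimelineSketch`, `Instant`, `visible`, and `completedAfter` are hypothetical names chosen for illustration and are not Hudi's own classes. It assumes that readers only see completed instants and that instant times are fixed-width timestamps, so they sort correctly as strings.

```scala
// Hypothetical model of a Hudi timeline; none of these names are Hudi's real API.
object TimelineSketch {
  sealed trait State
  case object Requested extends State
  case object Inflight extends State
  case object Completed extends State

  // action: e.g. "commit", "deltacommit", "clean"; time: yyyyMMddHHmmss timestamp.
  final case class Instant(action: String, time: String, state: State)

  // Only completed instants are visible, ordered by instant time.
  // Fixed-width timestamps compare correctly as plain strings.
  def visible(timeline: Seq[Instant]): Seq[Instant] =
    timeline.filter(_.state == Completed).sortBy(_.time)

  // Completed instants strictly after the given instant time.
  def completedAfter(timeline: Seq[Instant], begin: String): Seq[Instant] =
    visible(timeline).filter(_.time > begin)

  val example: Seq[Instant] = Seq(
    Instant("clean", "20190117010500", Completed),
    Instant("commit", "20190117010349", Completed),
    Instant("commit", "20190117011000", Inflight) // still running, so not visible
  )
}
```

For instance, `TimelineSketch.visible(TimelineSketch.example)` keeps the two completed instants in time order and hides the inflight commit.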





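    Arrival time: the time the data arrives in Hudi, i.e. the commit time.
    Event time: the time recorded in the record itself.


In this example, time (by hour) is the partition field. Commits are produced from 10:00 onward, and at 10:20 a record with an event time of 9:00 arrives. Based on its event time, the record still lands in the 9:00 partition. By consuming the incremental updates after 10:00 (commit time) directly from the timeline (reading only the file groups that have new commits), this late record is still consumed.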

The example above shows upserts happening between 10:00 and 10:20 on a Hudi table, roughly every 5 minutes, leaving commit metadata on the Hudi timeline, along with other background cleaning/compactions. One key observation is that the commit time indicates the arrival time of the data (10:20 AM), while the actual data organization reflects the event time the data was intended for (hourly buckets from 07:00). These are two key concepts when reasoning about tradeoffs between latency and completeness of data.


When there is late arriving data (data intended for 9:00 arriving more than 1 hour late at 10:20), we can see the upsert producing new data into even older time buckets/folders. With the help of the timeline, an incremental query attempting to get all new data that was committed successfully since 10:00 is able to very efficiently consume only the changed files without, say, scanning all the time buckets > 07:00.
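
The difference between arrival time and event time can be made concrete with a toy model. The sketch below is plain Scala, not Hudi code: `IncrementalSketch`, `WrittenFile`, and `changedSince` are hypothetical names, the file names are made up, and it assumes "since 10:00" means commits strictly after 10:00. Each written file records the commit (arrival) time and the event-time partition it landed in.

```scala
// Toy model of incremental consumption; names and data are illustrative only.
object IncrementalSketch {
  // commitTime is the arrival time; partition is the event-time bucket (HH:mm).
  final case class WrittenFile(commitTime: String, partition: String, file: String)

  val written: Seq[WrittenFile] = Seq(
    WrittenFile("10:00", "10:00", "f1"),
    WrittenFile("10:05", "10:00", "f2"),
    WrittenFile("10:10", "10:00", "f3"),
    WrittenFile("10:20", "09:00", "f4") // late data: arrives at 10:20, belongs to 09:00
  )

  // An incremental read filters on commit time, so it never scans by partition.
  // Zero-padded HH:mm strings compare correctly as plain strings.
  def changedSince(files: Seq[WrittenFile], begin: String): Seq[WrittenFile] =
    files.filter(_.commitTime > begin)
}
```

Calling `IncrementalSketch.changedSince(IncrementalSketch.written, "10:00")` returns `f2`, `f3`, and the late file `f4`, even though `f4` sits in the older 09:00 partition.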


3.1.2 File Layout


The following describes the general file layout structure for Apache Hudi:

  • Hudi organizes data tables into a directory structure under a base path on a distributed file system
  • Tables are broken up into partitions
  • Within each partition, files are organized into file groups, uniquely identified by a file ID
  • Each file group contains several file slices
  • Each slice contains a base file (.parquet) produced at a certain commit/compaction instant time, along with a set of log files (.log.*) that contain inserts/updates to the base file since the base file was produced.

Hudi adopts Multiversion Concurrency Control (MVCC), where the compaction action merges logs and base files to produce new file slices, and the cleaning action gets rid of unused/older file slices to reclaim space on the file system.
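
The relationship between file groups, file slices, compaction, and cleaning can be sketched as a small data structure. This is a simplified illustration in plain Scala, not Hudi's implementation: `FileLayoutSketch` and its members are hypothetical, and the base file naming used in `compact` is invented for the example.

```scala
// Simplified model of file groups and slices; not Hudi's real classes or naming.
object FileLayoutSketch {
  // A file slice: one base file written at baseInstant plus the logs appended since.
  final case class FileSlice(baseInstant: String, baseFile: String, logFiles: List[String])

  // A file group: all slices that share one file ID, oldest first.
  final case class FileGroup(fileId: String, slices: List[FileSlice])

  // Compaction: merge the latest slice's base file and logs into a new slice
  // that starts with no log files. The older slice is kept (MVCC).
  def compact(group: FileGroup, instant: String): FileGroup =
    group.slices.lastOption match {
      case Some(latest) if latest.logFiles.nonEmpty =>
        val merged = FileSlice(instant, s"${group.fileId}_${instant}.parquet", Nil)
        group.copy(slices = group.slices :+ merged)
      case _ => group // nothing to merge
    }

  // Cleaning: keep only the newest `retain` slices to reclaim space.
  def clean(group: FileGroup, retain: Int): FileGroup =
    group.copy(slices = group.slices.takeRight(retain))
}
```

In this model, compaction adds a new slice next to the old one, and readers of the old slice are only affected once cleaning drops it.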


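(1) Hudi organizes a table into a directory structure under a base path (basepath) on a distributed file system.

(2) Data: as in Hive, data is stored by partition. Each partition holds base files (.parquet) and log files (.log.*).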





         1) A base file (.parquet): produced at a certain commit/compaction instant time (a MOR table may not have one);


(6) Hudi uses Multiversion Concurrency Control (MVCC).



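(7) Every Hudi file name carries the FileID it belongs to (the file group identifier) and the base commit time (the instant time). The file ID in the file name defines the logical grouping into file groups, and the base commit time in the file name defines the logical grouping into file slices.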

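(8) A Hudi base file (Parquet file) stores a Bloom filter built from its record keys in the footer metadata, which enables efficient key-containment checks in the file-based index implementation. Only keys that hit the Bloom filter require reading the file, to rule out false positives. (If a key is not in the Bloom filter, it is definitely not in the file, so the file does not need to be scanned for it. If a key is in the Bloom filter, it is probably, but not certainly, in the file, so the keys in the file must be checked to confirm. This greatly reduces the number of files that have to be read during an index lookup.)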
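
The lookup logic behind a Bloom filter check can be sketched as follows. This is a minimal, self-contained Bloom filter in plain Scala, not the filter Hudi ships: `BloomSketch`, `Bloom`, and `lookup` are hypothetical names, and the hash scheme (MurmurHash3 with different seeds) is an assumption made for the example. The point it illustrates is that a miss is definitive while a hit still has to be confirmed against the file.

```scala
import java.util.BitSet
import scala.util.hashing.MurmurHash3

// Minimal Bloom filter sketch; not the implementation Hudi uses.
object BloomSketch {
  final class Bloom(numBits: Int, numHashes: Int) {
    private val bits = new BitSet(numBits)

    // One bit position per seed, mapped into [0, numBits).
    private def positions(key: String): Seq[Int] =
      (0 until numHashes).map { seed =>
        val h = MurmurHash3.stringHash(key, seed)
        ((h % numBits) + numBits) % numBits
      }

    def add(key: String): Unit = positions(key).foreach(i => bits.set(i))

    // false means "definitely absent"; true means "possibly present".
    def mightContain(key: String): Boolean = positions(key).forall(i => bits.get(i))
  }

  // fileKeys stands in for the keys obtained by actually reading the base file.
  def lookup(filter: Bloom, fileKeys: Set[String], key: String): Boolean =
    if (!filter.mightContain(key)) false // definitely absent: skip reading the file
    else fileKeys.contains(key)          // possible hit: read the file to rule out a false positive
}
```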

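(9) Hudi log files (Avro) use Hudi's own encoding: data is accumulated in a buffer and written out in units of LogBlocks. Each LogBlock contains a magic number, size, content, footer, and other information, which is used for reading, validating, and filtering data.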
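
To make the idea of a self-describing block concrete, here is a simplified framing sketch in plain Scala. It is not Hudi's actual on-disk log format: the `#BLK#` marker, the field order, and the CRC32 footer are all assumptions chosen for illustration, and `LogBlockSketch` is a hypothetical name. It only shows how a magic marker, a size field, and a footer let a reader validate a block before trusting its content.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.util.zip.CRC32

// Hypothetical block framing; NOT Hudi's real log block layout.
object LogBlockSketch {
  // Made-up marker bytes, used to recognise the start of a block.
  private val Magic: Array[Byte] = "#BLK#".getBytes(StandardCharsets.UTF_8)

  private def checksum(content: Array[Byte]): Long = {
    val crc = new CRC32
    crc.update(content)
    crc.getValue
  }

  // Layout: magic | content size (4 bytes) | content | CRC32 footer (8 bytes).
  def encode(content: Array[Byte]): Array[Byte] = {
    val buf = ByteBuffer.allocate(Magic.length + 4 + content.length + 8)
    buf.put(Magic).putInt(content.length).put(content).putLong(checksum(content))
    buf.array()
  }

  // Returns the content only if the magic, size, and checksum all match.
  def decode(block: Array[Byte]): Option[Array[Byte]] =
    if (block.length < Magic.length + 12 || !block.take(Magic.length).sameElements(Magic)) None
    else {
      val buf = ByteBuffer.wrap(block, Magic.length, block.length - Magic.length)
      val size = buf.getInt
      if (size < 0 || buf.remaining != size + 8) None
      else {
        val content = new Array[Byte](size)
        buf.get(content)
        if (buf.getLong == checksum(content)) Some(content) else None
      }
    }
}
```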

3.1.3 Index

3.1.4 Table Types

3.1.5 Query Types

3.2 Writing Data

3.2.1 Write Operations

3.2.2 Write Flow (UPSERT)

3.2.3 Write Flow (INSERT)


3.2.5 Key Generation Strategies

3.2.6 Delete Strategies

3.2.7 Summary

3.3 Reading Data

3.3.1 Snapshot Reads

3.3.2 Incremental Reads

3.3.3 Streaming Reads

3.4 Compaction

4. Spark Integration

4.1 Environment Setup

4.2 Using spark-shell


# Spark 3.3
spark-shell \
--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.3 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

# Spark 3.2
spark-shell \
  --packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.12.3 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

# Spark 3.1
spark-shell \
  --packages org.apache.hudi:hudi-spark3.1-bundle_2.12:0.12.3 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

# Spark 2.4
spark-shell \
  --packages org.apache.hudi:hudi-spark2.4-bundle_2.11:0.12.3 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' 

Please note the following

  • For Spark 3.2 and above, the additional spark_catalog config is required: --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
  • We have used hudi-spark-bundle built for scala 2.12 since the spark-avro module used can also depend on 2.12.


// spark-shell
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord

val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator


// spark-shell
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord

val tableName = "hudi_trips_cow"
val basePath1 = "obs://bigdata-teach/tmp/hudi_trips_cow"
val dataGen = new DataGenerator

The DataGenerator can generate sample inserts and updates based on the sample trip schema.


3) Create Table

// scala
// No separate create table command required in spark. First batch of write to a table will create the table if not exists.  



// spark-shell
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath1)

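mode(Overwrite) overwrites and recreates the table if it already exists. You can check whether data has been generated under the table's base path (for example, /tmp/hudi_trips_cow when writing to the local basePath).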

We provided a record key (uuid in schema), partition field (region/country/city) and combine logic (ts in schema) to ensure trip records are unique within each partition. For more info, refer to Modeling data stored in Hudi, and for info on ways to ingest data into Hudi, refer to Writing Hudi Tables. Here we are using the default write operation: upsert. If you have a workload without updates, you can also issue insert or bulk_insert operations, which could be faster. To know more, refer to Write operations.
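
How the record key, partition path, and combine field work together can be illustrated without Spark. The sketch below is plain Scala and only approximates the idea: `PrecombineSketch`, `Trip`, and `precombine` are hypothetical names, and it assumes that when a batch holds several records with the same key in the same partition, the one with the largest `ts` wins.

```scala
// Illustrative de-duplication by (partition, record key); not Hudi's own code.
object PrecombineSketch {
  final case class Trip(uuid: String, partitionpath: String, ts: Long, fare: Double)

  // For each (partitionpath, uuid) pair, keep the record with the largest ts.
  def precombine(batch: Seq[Trip]): Seq[Trip] =
    batch
      .groupBy(t => (t.partitionpath, t.uuid))
      .values
      .map(_.maxBy(_.ts))
      .toSeq
}
```

With two versions of trip `a` in the same partition, only the one with the later `ts` survives, so the table ends up with one row per key in each partition.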


4.3 Using spark-sql

# Spark 3.3
spark-sql --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.3 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'

create table hudi_cow_pt_tbl (
id bigint,
name string,
ts bigint,
dt string,
hh string
) using hudi
tblproperties (
type = 'cow',
primaryKey = 'id',
preCombineField = 'ts'
)
partitioned by (dt, hh)
location 'obs://bigdata-test1233/tmp/tmp/hudi/hudi_cow_pt_tbl';

spark-sql (default)> show create table hudi_cow_nonpcf_tbl;
CREATE TABLE default.hudi_cow_nonpcf_tbl (
  _hoodie_commit_time STRING,
  _hoodie_commit_seqno STRING,
  _hoodie_record_key STRING,
  _hoodie_partition_path STRING,
  _hoodie_file_name STRING,
  uuid INT,
  name STRING,
  price DOUBLE)
USING hudi
LOCATION 'obs://bigdata-test1233/tmp/tmp/hudi/hudi_cow_pt_tbl'
TBLPROPERTIES (
  'primaryKey' = 'uuid',
  'type' = 'cow')

insert into hudi_cow_nonpcf_tbl select 1, 'a1', 20; 



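4.4 IDEA Integration

4.5 DeltaStreamer Ingestion Tool

4.6 Concurrency Control

4.7 General Tuning

5. Flink Integration

5.1 Environment Setup

5.2 Using sql-client

5.3 Coding in IDEA

5.4 Type Mapping

5.5 Core Parameter Settings

5.6 Memory Optimization

5.7 Read Modes

5.8 Rate Limiting

5.9 Write Methods

5.10 Write Modes

5.11 Bucket Index

5.12 Hudi Catalog

5.13 Offline Compaction

5.14 Offline Clustering

5.15 Common Basic Issues

5.16 Core Principles Analysis

6. Hive Integration

6.1 Integration Steps

6.2 Hive Sync

6.3 Using HiveCatalog with Flink

6.4 Creating Hive External Tables

6.5 Querying Hive External Tables

6.6 Hive sync tool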


