[实时计算flink]维表JOIN语句
对于每条流式数据,可以关联一个外部维表数据源,为实时计算Flink版提供数据关联查询。
背景信息
大部分连接器的维表Join都可以使用Cache策略,不同连接器对Cache策略的支持情况稍有不同,请查看对应的连接器文档确定具体的支持情况。通用的Cache策略详情如下:
-
None(默认值):无缓存。
-
LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据,如果没有找到,则去物理维表中查找。
-
ALL:缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查找数据都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在。全量的Cache有一个过期时间,过期后会重新加载一遍全量Cache。适用于远程表数据量小且MISS KEY(源表数据和维表JOIN时,ON条件无法关联)特别多的场景。
说明
-
您需要根据具体业务需求,在平衡实时性和性能之间进行权衡。如果对数据实时性要求非常高,需要实时更新,可以不使用Cache,直接从维表读取。
-
如果使用Cache策略,可以配合LRU和TTL来实现较新的缓存数据。TTL可以设置的较短,例如几秒至几十秒,定期从源表加载数据。
-
使用ALL缓存策略时,请注意节点内存大小,防止出现OOM。
-
因为系统会异步加载维表数据,所以在使用ALL缓存策略时,需要增加维表JOIN节点的内存,增加的内存大小为远程表数据量的两倍。
使用限制
-
维表JOIN仅支持对当前时刻维表快照的关联。
-
维表支持INNER JOIN和LEFT JOIN,不支持RIGHT JOIN或FULL JOIN。
注意事项
-
如果您有一对一JOIN需求,请确保连接条件中包含了维表中具有唯一性字段的等值连接条件。
-
对每条流式数据,只会关联当时维表的最新版本数据,即JOIN行为只发生在处理时间(Processing Time)。如果JOIN行为发生后,维表中的数据发生了变化(新增、更新或删除),则已关联的维表数据不会被同步变化。具体的维表的行为请参见对应连接器行为。
维表JOIN语法
SELECT column-names
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF PROCTIME() [AS <alias2>]
ON table1.column-name1 = table2.key-name1;
说明
-
必须加上FOR SYSTEM_TIME AS OF PROCTIME(),表示JOIN维表当前时刻所看到的每条数据。
-
ON条件中必须包含维表实际能支持随机查找的字段的等值条件。
-
ON条件中维表字段不能使用CAST等类型转换函数。如果您有类型转换需求,请在源表字段进行操作。
维表JOIN Hints
您可以通过使用维表Hints(Hint功能参见Flink SQL Hints)对维表Join的策略进行配置。维表Hints包含Lookup Hint与其他Join Hints。
说明
-
仅VVR 8.0及以上版本支持Lookup Hint。
-
仅VVR 8.0.8及以上版本支持通过Lookup Hint配置是否开启shuffle策略。
-
VVR 8.0以上支持使用别名,如果维表定义了别名,Hint中必须使用别名。
-
仅VVR 4.0及以上版本支持其他Join Hints。
Lookup Hint
Lookup Hint功能和社区保持一致,可以用于配置维表的同步、异步、重试查找策略,详情参见Lookup Hint。VVR 8.0.8及以上版本对Lookup Hint的功能进行了扩展,支持配置通过'shuffle' = 'true'
选项配置维表联接时的shuffle策略,不同场景的shuffle策略如下表所示。
场景 | 联接策略 |
不配置'shuffle' = 'true'选项 | 使用引擎默认的shuffle策略。 |
不配置'shuffle' = 'true'选项,且维表连接器不提供自定义联接策略 | |
配置'shuffle' = 'true' 选项,且维表连接器不提供自定义联接策略 | 默认使用SHUFFLE_HASH策略,含义请参见SHUFFLE_HASH。 |
配置'shuffle' = 'true' 选项,且维表连接器提供自定义联接策略 | 使用表连接器的自定义shuffle策略。 |
说明
目前仅流式数据湖仓Paimon会提供自定义shuffle策略,具体会在Join字段包含全部分桶字段的情况下基于bucket进行shuffle。
对维表配置联接时的shuffle策略代码示例如下。
-- 只对维表dim1配置维表联接shuffle策略。
SELECT /*+ LOOKUP('table'='dim1', 'shuffle' = 'true') */
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b-- 同时对维表dim1, dim2配置维表联接shuffle策略。
SELECT /*+ LOOKUP('table'='dim1', 'shuffle' = 'true'),LOOKUP('table'='dim2', 'shuffle' = 'true') */
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b-- 对维表dim1必须使用别名D1配置维表联接shuffle策略。
SELECT /*+ LOOKUP('table'='D1', 'shuffle' = 'true') */
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b-- 同时对维表dim1, dim2通过别名配置维表联接shuffle策略。
SELECT /*+ LOOKUP('table'='D1', 'shuffle' = 'true'),LOOKUP('table'='D2', 'shuffle' = 'true') */
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b
其他Join Hints
维表Join Hints仅用于配置维表联接策略,包括SHUFFLE_HASH、REPLICATED_SHUFFLE_HASH和SKEW。维表Cache策略和联接策略之间的适用场景详情如下表所示。
Cache策略 | SHUFFLE_HASH | REPLICATED_SHFFLE_HASH (和SKEW等价) |
None | 不建议使用该联接策略提示,主流会引入额外的网络开销。 | 不建议使用该联接策略提示,主流会引入额外的网络开销。 |
LRU | 在维表查找IO成为瓶颈时,建议考虑使用该联接策略提示。当主流数据在Join Key上有时间局部性时,可以提高Cache命中率,减少IO请求数,从而提升总吞吐。 重要 主流会引入额外的网络开销,当主流数据在Join Key上有倾斜,遇到性能瓶颈时,建议考虑REPLICATED_SHUFFLE_HASH。 | 在维表查找IO成为瓶颈且主流数据在Join Key上有倾斜时,建议考虑该联接策略提示。当主流数据在Join Key上有时间局部性时,可以提高Cache命中率,减少IO请求数,从而提升总吞吐。 |
ALL | 在维表内存使用量成为瓶颈时,建议使用该联接策略提示。内存使用率可降低为1/并发度。 重要 主流会引入额外的网络开销,当主流数据在Join Key上有倾斜,遇到性能瓶颈时,建议考虑REPLICATED_SHUFFLE_HASH。 | 在维表内存使用量成为瓶颈且主流数据在Join Key上有倾斜时,建议使用该联接策略提示。内存使用率降低为分桶数/并发度。 |
SHUFFLE_HASH
-
使用效果
在维表Join中使用Shuffle Hash策略,可以将主流数据在Join之前根据Join Key做一次shuffle。在使用LRU Cache策略时可以提高Cache命中率,减少IO请求数;在使用ALL Cache策略时可以减少内存使用量。每个SHUFFLE_HASH联接提示可指定多张维表。
-
使用限制
虽然SHUFFLE_HASH可以减少内存开销,但是由于上游数据需要按照Join Key做一次shuffle,引入额外的网络开销,因此下面两种场景不适合使用SHUFFLE_HASH联接策略。
-
主流数据在Join Key上存在严重的数据倾斜,这种场景下如果使用SHUFFLE_HASH联接,会因为数据倾斜导致Join节点成为性能瓶颈,从而会导致流作业出现严重反压或是批场景出现严重长尾,此时建议使用REPLICATED_SHUFFLE_HASH联接。
-
维表数据较小,ALL Cache策略加载没有内存瓶颈时,如果使用SHUFFLE_HASH联接,节约的内存开销和额外引入的网络开销相比,可能并不划算。
-
-
代码示例
-- 只对维表dim1开启SHUFFLE_HASH联接。 SELECT /*+ SHUFFLE_HASH(dim1) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b-- 同时对维表dim1, dim2均开启SHUFFLE_HASH联接。 SELECT /*+ SHUFFLE_HASH(dim1, dim2) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b-- 对维表dim1必须使用别名D1开启SHUFFLE_HASH联接。 SELECT /*+ SHUFFLE_HASH(D1) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b-- 同时对维表dim1, dim2通过别名开启SHUFFLE_HASH联接。 SELECT /*+ SHUFFLE_HASH(D1, D2) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b
REPLICATED_SHUFFLE_HASH
-
使用效果
在维表Join中使用Replicated Shuffle Hash策略,其效果基本与SHUFFLE_HASH一致,但不同点是其会将主流具有相同key的数据随机打散到指定的N个并发上,可以解决数据倾斜导致的性能瓶颈。每个REPLICATED_SHUFFLE_HASH联接提示中可指定多张维表。
-
使用限制
-
需要配置倾斜数据分桶数量参数
table.exec.skew-join.replicate-num
,其默认值为16,取值不能大于维表联接节点的并发。配置方法请参见如何配置作业运行参数?。 -
当前不支持更新流,当主流是更新流时,使用REPLICATED_SHUFFLE_HASH策略会报错。
-
-
代码示例
-- 对维表dim1开启REPLICATED_SHUFFLE_HASH联接 SELECT /*+ REPLICATED_SHUFFLE_HASH(dim1) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a-- 对维表dim1通过别名开启REPLICATED_SHUFFLE_HASH联接 SELECT /*+ REPLICATED_SHUFFLE_HASH(D1) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
SKEW
-
使用效果
当指定表存在数据倾斜时,优化器会在维表Join中使用Replicated Shuffle Hash策略(Skew只是一个语法糖,底层的实现是用的Replicated Shuffle Hash策略)。
-
使用限制
-
每个SKEW提示只能指定1张表。
-
表名需要为存在数据倾斜的主表名称,而不是维表名称。
-
当前不支持更新流,当主流是更新流时,使用SKEW策略会报错。
-
-
代码示例
SELECT /*+ SKEW(src) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
重要
-
当前LOOKUP Hint的shuffle选项已能覆盖 SHUFFLE_HASH hint功能,两者同时使用时,会优先采纳LOOKUP hint的shuffle选项。
-
当前LOOKUP Hint的shuffle选项还未支持解决数据倾斜的功能,当和REPLICATED_SHUFFLE_HASH、SKEW同时使用时,会优先采纳REPLICATED_SHUFFLE_HASH、SKEW对应的shuffle策略。
使用示例
-
测试数据
-
表1 kafka_input
id(bigint)
name(varchar)
age(bigint)
1
lilei
22
2
hanmeimei
20
3
libai
28
-
表2 phoneNumber
name(varchar)
phoneNumber(bigint)
dufu
1390000111
baijuyi
1390000222
libai
1390000333
lilei
1390000444
-
-
测试语句
CREATE TEMPORARY TABLE kafka_input (id BIGINT,name VARCHAR,age BIGINT ) WITH ('connector' = 'kafka','topic' = '<yourTopic>','properties.bootstrap.servers' = '<yourKafkaBrokers>','properties.group.id' = '<yourKafkaConsumerGroupId>','format' = 'csv' );CREATE TEMPORARY TABLE phoneNumber(name VARCHAR,phoneNumber BIGINT,PRIMARY KEY(name) NOT ENFORCED ) WITH ('connector' = 'mysql','hostname' = '<yourHostname>','port' = '3306','username' = '<yourUsername>','password' = '<yourPassword>','database-name' = '<yourDatabaseName>','table-name' = '<yourTableName>' );CREATE TEMPORARY TABLE result_infor(id BIGINT,phoneNumber BIGINT,name VARCHAR ) WITH ('connector' = 'blackhole' );INSERT INTO result_infor SELECTt.id,w.phoneNumber,t.name FROM kafka_input as t JOIN phoneNumber FOR SYSTEM_TIME AS OF PROCTIME() as w ON t.name = w.name;
-
测试结果
id(bigint)
phoneNumber(bigint)
name(varchar)
1
1390000444
lilei
3
1390000333
libai