当前位置: 首页 > news >正文

paimon,基础查询语句测试

基础设置

-- 创建catalog/加载catalog,如果这个catalog已经存在就不会创建,自动加载元数据信息CREATE CATALOG fs_paimon_catalog WITH (
'type' = 'paimon',
'warehouse' = 'hdfs://wsl01:8020/paimon/catalog'
);
-- 使用catalog
use catalog fs_paimon_catalog;-- sqlClinet使用
-- 设置为批处理模式
SET 'execution.runtime-mode' = 'batch';
-- 设置为流处理模式
SET 'execution.runtime-mode' = 'streaming';
-- 设置查询结果显示方式,sql-clinet 特有
SET 'sql-client.execution.result-mode' = 'tableau';
-- 设置checkpoint,如果使用流模式,必须设置
SET 'execution.checkpointing.interval' = '10 s';root@wsl01:~/soft/paimon/flink-1.17.0# cat fs_catalog.sql
CREATE CATALOG fs_catalog WITH ('type'='paimon','warehouse'='file:/mnt/d/wsl/soft/paimon/catalog'-- 'warehouse' = 'hdfs://wsl01:8020/paimon/catalog'
);
use catalog fs_catalog ;
SET 'sql-client.execution.result-mode' = 'tableau';
-- 默认批处理
SET 'execution.runtime-mode' = 'batch';-- 指定默认启动catalog
bin/sql-client.sh -i fs_catalog.sql

DDL

创建普通表

-- 普通表,没有主键CREATE TABLE t_sample (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING
);

创建主键表

CREATE TABLE t_pk (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
);

创建分区表

-- 分区表的分区字段必须是主键的子集CREATE TABLE t_partition (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh);

创建临时表

-- 如果进入paimon创建的catalog后,无法创建非paimon类型的表,如果需要借助第三方的表,需要创建临时表来使用CREATE TEMPORARY TABLE t_temporary (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) WITH ('connector' = 'filesystem','path' = 'file:///mnt/d/wsl/soft/paimon/temp_table.csv','format' = 'csv'
);

复制表-AS,create table as


-- create as 创建主键表,CREATE TABLE t_create_as_pk AS SELECT * FROM t_pk;
show create table t__pk;
show create table t_create_as_pk;-- create as 创建分区表
show create table t_partition ;
CREATE TABLE t_create_as_partition AS SELECT * FROM t_partition;
show create table t_create_as_partition ;

上述执行结果告诉我们,create as 的表,只保留原表的字段,不保留其他属性信息


-- 通过with 重新指定,关于with的用法,参考flink
CREATE TABLE t_create_as_with with ('primary-key' = 'dt,hh','partition' = 'dt') AS SELECT * FROM t_pk  ;
show create table t_create_as_with;

上述执行结果告诉我们,create as 的表可以通过with 重新指定属性信息

复制表-LIKE,create table like

CREATE TABLE t_create_like like t_pk;
show create table t_pk;
show create table t_create_like;

上述执行结果告诉我们,create like 的表,保留全部信息

DML

常用管理语句

desc #{name}
show create table #{name}
show catalogs;
show databases;
show tables;

新增-普通表

insert into t_sample(user_id,item_id,behavior,dt,hh) values(100,100,'sing-sample','1','2');insert into t_sample  values(101,101,'jump-sample','1','2');insert into t_sample  select 102,102,'rap-sample','1','2';Flink SQL> select * from t_sample;
+---------+---------+-------------+----+----+
| user_id | item_id |    behavior | dt | hh |
+---------+---------+-------------+----+----+
|     100 |     100 | sing-sample |  1 |  2 |
|     101 |     101 | jump-sample |  1 |  2 |
|     102 |     102 |  rap-sample |  1 |  2 |
+---------+---------+-------------+----+----+
3 rows in set

新增-主键表

insert into t_pk values(1,1,'sing','1','2');
insert into t_pk values(2,2,'jump','1','2');
insert into t_pk values(3,3,'rap','1','2');
Flink SQL> select * from t_pk;
+---------+---------+----------+----+----+
| user_id | item_id | behavior | dt | hh |
+---------+---------+----------+----+----+
|       1 |       1 |     sing |  1 |  2 |
|       2 |       2 |     jump |  1 |  2 |
|       3 |       3 |      rap |  1 |  2 |
+---------+---------+----------+----+----+
3 rows in set
insert into t_pk values(3,3,'basketball','1','2');
Flink SQL>  select * from t_pk;
+---------+---------+------------+----+----+
| user_id | item_id |   behavior | dt | hh |
+---------+---------+------------+----+----+
|       1 |       1 |       sing |  1 |  2 |
|       2 |       2 |       jump |  1 |  2 |
|       3 |       3 | basketball |  1 |  2 |
+---------+---------+------------+----+----+
3 rows in set-- 我们发现,主键表,写入两条相同主键的数据,后者会覆盖前者
-- 主键表有一个默认引擎,默认是就是 'merge-engine' = 'deduplicate',因此才有这个效果

新增-分区表

insert into t_partition partition(dt='2024-10-08',hh='15')(user_id,item_id,behavior) values(1,1,'sing');
insert into t_partition partition(dt='2024-10-08',hh='15')(user_id,item_id,behavior) values(2,2,'jump');
insert into t_partition(user_id,item_id,behavior,dt,hh) values(3,3,'rap','2024-10-08','16');
insert into t_partition values(4,4,'basketball','2024-10-08','16');Flink SQL> select * from t_partition;
+---------+---------+------------+------------+----+
| user_id | item_id |   behavior |         dt | hh |
+---------+---------+------------+------------+----+
|       1 |       1 |       sing | 2024-10-08 | 15 |
|       2 |       2 |       jump | 2024-10-08 | 15 |
|       3 |       3 |        rap | 2024-10-08 | 16 |
|       4 |       4 | basketball | 2024-10-08 | 16 |
+---------+---------+------------+------------+----+
4 rows in setinsert into t_partition as select * from t_sample;insert into t_partition partition(dt='2099-10-08',hh='15')(user_id,item_id,behavior) select user_id,item_id,behavior from t_sample;Flink SQL> select * from t_partition;
+---------+---------+-------------+------------+----+
| user_id | item_id |    behavior |         dt | hh |
+---------+---------+-------------+------------+----+
|       1 |       1 |        sing | 2024-10-08 | 15 |
|       2 |       2 |        jump | 2024-10-08 | 15 |
|     100 |     100 | sing-sample | 2099-10-08 | 15 |
|     101 |     101 | jump-sample | 2099-10-08 | 15 |
|     102 |     102 |  rap-sample | 2099-10-08 | 15 |
|       3 |       3 |         rap | 2024-10-08 | 16 |
|       4 |       4 |  basketball | 2024-10-08 | 16 |
|     100 |     100 | sing-sample |          1 |  2 |
|     101 |     101 | jump-sample |          1 |  2 |
|     102 |     102 |  rap-sample |          1 |  2 |
+---------+---------+-------------+------------+----+
10 rows in setFlink SQL> insert overwrite t_partition(user_id,item_id,behavior) values(5,5,'non-partition');
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Column 'dt' has no default value and does not allow NULLs

以上多种写入方式都支持,把分区字段当成普通字段用就行,但是分区字段不能为空

新增-覆盖写入

Flink SQL> select * from t_pk;
+---------+---------+------------+----+----+
| user_id | item_id |   behavior | dt | hh |
+---------+---------+------------+----+----+
|       1 |       1 |       sing |  1 |  2 |
|       2 |       2 |       jump |  1 |  2 |
|       3 |       3 | basketball |  1 |  2 |
+---------+---------+------------+----+----+insert into t_pk values(10,10,'overwrite','1','2');Flink SQL> select * from t_pk;
+---------+---------+-----------+----+----+
| user_id | item_id |  behavior | dt | hh |
+---------+---------+-----------+----+----+
|      10 |      10 | overwrite |  1 |  2 |
+---------+---------+-----------+----+----+
1 row in set

overwrite 会直接清空表,不会考虑主键


insert into t_partition partition(dt='2024-10-08',hh='15')(user_id,item_id,behavior) values(1,1,'sing');
insert into t_partition partition(dt='2024-10-08',hh='15')(user_id,item_id,behavior) values(2,2,'jump');
insert into t_partition partition(dt='2024-10-09',hh='15')(user_id,item_id,behavior) values(3,3,'rap');
Flink SQL> select * from t_partition;
+---------+---------+----------+------------+----+
| user_id | item_id | behavior |         dt | hh |
+---------+---------+----------+------------+----+
|       3 |       3 |      rap | 2024-10-09 | 15 |
|       1 |       1 |     sing | 2024-10-08 | 15 |
|       2 |       2 |     jump | 2024-10-08 | 15 |
+---------+---------+----------+------------+----+
3 rows in setinsert overwrite t_partition partition(dt='2024-10-08',hh='15')(user_id,item_id,behavior) values(4,4,'basketball');Flink SQL> select * from t_partition;
+---------+---------+------------+------------+----+
| user_id | item_id |   behavior |         dt | hh |
+---------+---------+------------+------------+----+
|       3 |       3 |        rap | 2024-10-09 | 15 |
|       4 |       4 | basketball | 2024-10-08 | 15 |
+---------+---------+------------+------------+----+
2 rows in set

分区表只会overwrite 当前要写入分区


Flink SQL> select * from t_partition;
+---------+---------+----------+------------+----+
| user_id | item_id | behavior |         dt | hh |
+---------+---------+----------+------------+----+
|       3 |       3 |      rap | 2024-10-09 | 15 |
|       1 |       1 |     sing | 2024-10-08 | 15 |
|       2 |       2 |     jump | 2024-10-08 | 15 |
+---------+---------+----------+------------+----+
3 rows in set
-- 对指定分区写入空记录,没有效果INSERT OVERWRITE t_partition  PARTITION (`dt` = '2024-10-08', `hh` = '15') SELECT user_id,item_id,behavior FROM t_partition WHERE false;Flink SQL> select * from t_partition;
+---------+---------+----------+------------+----+
| user_id | item_id | behavior |         dt | hh |
+---------+---------+----------+------------+----+
|       3 |       3 |      rap | 2024-10-09 | 15 |
|       1 |       1 |     sing | 2024-10-08 | 15 |
|       2 |       2 |     jump | 2024-10-08 | 15 |
+---------+---------+----------+------------+----+
3 rows in set-- 对指定分区写入空记录,指定 /*+ OPTIONS('dynamic-partition-overwrite'='false') */,会清空指定的分区INSERT OVERWRITE t_partition  /*+ OPTIONS('dynamic-partition-overwrite'='false') */  PARTITION (`dt` = '2024-10-08', `hh` = '15') SELECT user_id,item_id,behavior FROM t_partition WHERE false;
Flink SQL> select * from t_partition;
+---------+---------+----------+------------+----+
| user_id | item_id | behavior |         dt | hh |
+---------+---------+----------+------------+----+
|       3 |       3 |      rap | 2024-10-09 | 15 |
+---------+---------+----------+------------+----+
1 row in set-- 不指定分区写入空记录,指定 /*+ OPTIONS('dynamic-partition-overwrite'='false') */,会清空所有分区,truncate
INSERT OVERWRITE t_partition /*+ OPTIONS('dynamic-partition-overwrite'='false') */ SELECT * FROM t_partition WHERE false;Flink SQL> select * from t_partition;
Empty set

/*+ OPTIONS('dynamic-partition-overwrite'='false') */ Flink默认的覆盖模式是动态分区覆盖 (即Paimon只删除覆盖数据中出现的分区)。可以配置动态分区覆盖将其更改为静态覆盖。

paimon 没有truncate,因此我们可以借助overwite+静态覆盖,这个实现truncate

查询


修改

Important table properties setting:

  • Only primary key table supports this feature. 表必须有主键
  • MergeEngine needs to be deduplicate or partial-update to support this feature. 合并引擎必须为deduplicate,以后会支持partial-update
  • Do not support updating primary keys. 不能修改主键
  • Flink 版本1.17 及以上版本才支持
  • 必须是批处理模式
UPDATE table_identifier SET column1 = value1, column2 = value2, ... WHERE condition;Flink SQL> select * from t_partition;
+---------+---------+-------------+----+----+
| user_id | item_id |    behavior | dt | hh |
+---------+---------+-------------+----+----+
|     100 |     100 | sing-sample |  1 |  2 |
|     101 |     101 | jump-sample |  1 |  2 |
|     102 |     102 |  rap-sample |  1 |  2 |
+---------+---------+-------------+----+----+
3 rows in setupdate t_partition set behavior = 'baskteball-sample' where user_id =100;Flink SQL> select * from t_partition;
+---------+---------+-------------------+----+----+
| user_id | item_id |          behavior | dt | hh |
+---------+---------+-------------------+----+----+
|     100 |     100 | baskteball-sample |  1 |  2 |
|     101 |     101 |       jump-sample |  1 |  2 |
|     102 |     102 |        rap-sample |  1 |  2 |
+---------+---------+-------------------+----+----+
3 rows in set

删除


常用属性

CREATE TABLE my_table (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh);

统计数优化

paimon 会默认为每一列添加3个统计属性:最大值、最小值、null值数量

有四种配置来约束统计属性

  • full:为所有数据添加最大值、最小值、null值数量统计
  • truncate(length):截断length长度后,为所有数据添加最大值、最小值、null值数量统计,这个是默认值:默认length 16,为了避免长文本字段的统计
  • counts:只对null值数量统计
  • none:不统计

如果需要修改某个字段的统计属性

  • fields.{field_name}.stats-mode,with ( ‘fields.behavior.stats-mode’ = ‘full’ )

官网原文:https://paimon.apache.org/docs/master/flink/sql-ddl/#specify-statistics-mode
在这里插入图片描述

字段默认值

paimon表可以设置字段默认值,但是 不能 对主键字段设置默认值

如果需要修改某个字段的统计属性

  • fields.{field_name}.default-value
  • with ( ‘fields.behavior.default-value’ = ‘sing’ )

官网原文:https://paimon.apache.org/docs/master/flink/sql-ddl/#field-default-value
在这里插入图片描述

聚类写入

在批作业中配置该参数可以启用聚类写入功能,使数据在特定列上按大小范围聚集分布,从而提升该列的查询速度。只能在批处理或者append table(流处理)中使用。

多个列名请使用英文逗号(,)进行分隔,例如’col1,col2’。

  • sink.clustering.by-columns
  • with ( ‘sink.clustering.by-columns’ = ‘user_id,item_id’ )

也可以使用Hints

  • INSERT INTO my_table /*+ OPTIONS(‘sink.clustering.by-columns’ = ‘a,b’) */ SELECT * FROM source;

动态覆盖

Flink overwrite模式,在分区表中默认是动态分区覆盖,也就是说在使用overwrite时,只覆盖当前写入分区的数据,写入数据为空时,不进行覆盖,我们可以设置为静态覆盖,当写入数据为空时,也会覆盖。如果写入的分区为空则覆盖所有分区!

Hints,跟在表的后边,也就是声明本次sql的执行策略,dynamic-partition-overwrite=false > 静态覆盖,truncate的替代语法(TRUNCATE TABLE my_table 需要flink 1.18 之后才支持)
INSERT OVERWRITE my_table /*+ OPTIONS(‘dynamic-partition-overwrite’=‘false’) */

合并引擎


http://www.mrgr.cn/news/46375.html

相关文章:

  • CSS的小知识
  • Redis--21--大Key问题解决方案
  • mysql性能压测
  • ros2笔记-6.2 使用urdf创建机器人模型
  • STC89C51与AT89C51芯片区别深度剖析
  • 设计一篇利用python爬虫获取1688详情API接口的长篇软文
  • 【Oracle APEX开发小技巧9】通过页面设置文本大写避免upper()函数转换占用额外资源
  • Hugging face简要介绍
  • 【Java】集合中单列集合详解(一):Collection与List
  • 算法 动态规划
  • C#中Json序列化的进阶用法
  • [投稿优惠|稳定检索]2024年电子器件与机械工程、材料国际会议(EDMEM 2024)
  • 系统架构师备考记忆不太清楚的点-信息系统-需求分析
  • 10.9今日错题解析(软考)
  • 低代码开发平台应该归属哪个部门管理?
  • 2003 -Can‘t connect to MySQL server on‘192.168.‘(10060 “Unknown error“
  • Maven 三种项目打包方式:POM、JAR 和 WAR 的区别详解
  • 基于开元鸿蒙(OpenHarmony)的【智能药房与药品管理综合应用系统】
  • 微服务实战——登录(普通登录、社交登录、SSO单点登录)
  • 新硬盘第一次使用需要怎样做?
  • JDK17安装教程
  • 超详解C++类与对象(中)
  • 学习内容的记录学习方向
  • 关于halcon深度图tiff生成点云文件
  • 【新品发布】数字能源EMS管理再掀新篇章
  • 环保数采仪下噪音在线监测,打造声环境精准管控解决方案