当前位置: 首页 > news >正文

[实战-11] FlinkSql 设置时区对TIMESTAMP和TIMESTAMP_LTZ的影响

table.local-time-zone

  • table.local-time-zone
  • DataStream-to-Table Conversion(拓展知识)
  • 代码测试
  • flinksql代码
  • 执行结果截图
    • 1. Asia/Shanghai 结果如下
    • 2. UTC结果如下

table.local-time-zone

table.local-time-zone可用于设置flinksql的时区。
flink的内置数据类型TIMESTAMP(n)或者是TIMESTAMP_LTZ(n), 我们设置水位线都是基于这两种类型,不同的是前者本质是字符串形势,后者本质是long,也因此前者不受时区影响,后者受时区影响类型。(n指的毫秒级的精度取值范围是 0~9)
原始数据库如果不是时间类型,可能要用TO_TIMESTAMP(字符串格式的时间)或者TO_TIMESTAMP_LTZ(long数字,n)

如果原始数据库是string则需要用TO_TIMESTAMP(字符串格式的时间字段)转成TIMESTAMP(n)
如果原始数据库中是long则需要用TO_TIMESTAMP_LTZ(long数字,n) 转成TIMESTAMP_LTZ(n)

DataStream-to-Table Conversion(拓展知识)

datastream API到Table Api转换的时候,是以后string的形式传递event_time, 并且这个string在DataStream Api是以UTC时区转换的,如果你的原始数据中是long, 如果不做处理展示出来的string就是UTC字符串,为了在东八区展示,则需要将long再加上8小时

        // 水位线 允许乱序WatermarkStrategy<String> waterStrategy = WatermarkStrategy.<String>forMonotonousTimestamps() //ofSeconds(20).withTimestampAssigner(new SerializableTimestampAssigner<String>() {@Overridepublic long extractTimestamp(String element, long recordTimestamp) {try {Mybook book= JSON.parseObject(element,Mybook.class);return boo.time+8*60*60*1000  //转成东八区}catch (Exception e){return recordTimestamp;}}}).withIdleness(Duration.ofSeconds(timeWindowIdleness));SingleOutputStreamOperator<UserSlotGame> processStream = env.fromSource(source, waterStrategy, "readKafka").process(new ProcessFunction<String, UserSlotGame>() {@Overridepublic void processElement(String value, Context ctx, Collector<UserSlotGame> out) throws Exception {// 省略}}) ;

代码测试

mysql时区是Asia/Shanghai

CREATE TABLE `versioned_rates` (`operation_code` int DEFAULT NULL,`update_time` varchar(255) DEFAULT NULL, -- 注意这是字符串`product_id` varchar(255) DEFAULT NULL,`product_name` varchar(255) DEFAULT NULL,`price` float DEFAULT NULL,`time_long` bigint NOT NULL DEFAULT '0' -- 注意这是long
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ciINSERT INTO flink.versioned_rates
(operation_code, update_time, product_id, product_name, price, time_long)
VALUES(1, '2024-01-01 00:01:00', 'p_001', 'scooter', 11.11, 1730346179000);
INSERT INTO flink.versioned_rates
(operation_code, update_time, product_id, product_name, price, time_long)
VALUES(1, '2024-01-01 00:02:00', 'p_002', 'basketball', 23.11, 1730346179000);
INSERT INTO flink.versioned_rates
(operation_code, update_time, product_id, product_name, price, time_long)
VALUES(2, '2024-01-01 12:00:00', 'p_001', 'scooter', 11.11, 1730346179000);
INSERT INTO flink.versioned_rates
(operation_code, update_time, product_id, product_name, price, time_long)
VALUES(3, '2024-01-01 12:00:00', 'p_001', 'scooter', 12.99, 1730346179000);
INSERT INTO flink.versioned_rates
(operation_code, update_time, product_id, product_name, price, time_long)
VALUES(2, '2024-01-01 12:00:00', 'p_002', 'basketball', 23.11, 1730346179000);
INSERT INTO flink.versioned_rates
(operation_code, update_time, product_id, product_name, price, time_long)
VALUES(3, '2024-01-01 12:00:00', 'p_002', 'basketball', 19.99, 1730346179000);
INSERT INTO flink.versioned_rates
(operation_code, update_time, product_id, product_name, price, time_long)
VALUES(4, '2024-01-01 18:00:00', 'p_001', 'scooter', 12.99, 1730346179000);

flinksql代码

package com.pg.TableAndDataStreamApi;import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.TableConfigOptions;/*
*
* */
public class version_table {private static final String SOURCE="CREATE TABLE source_table(\n" +"\toperation_code int,\n" +"\tupdate_time string,\n" +"\tup_t AS TO_TIMESTAMP(update_time),\n" +"\ttime_long bigint,\n" +"\tbbb AS TO_TIMESTAMP_LTZ(time_long,3) \n" +"    ) WITH (\n" +"   'connector' = 'jdbc',\n" +"   'url' = 'jdbc:mysql://ip:3306/flink',\n" +"   'driver'='com.mysql.cj.jdbc.Driver',\n "+"   'username'='用户名',\n"+"   'password'='密码',\n"+"   'table-name' = 'versioned_rates'\n" +")";public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.executeSql(SOURCE);Configuration configuration = new Configuration();
//        configuration.set(TableConfigOptions.LOCAL_TIME_ZONE, "UTC");configuration.set(TableConfigOptions.LOCAL_TIME_ZONE, "Asia/Shanghai");tableEnv.getConfig().addConfiguration(configuration);// 从 MySQL 表中选择所有行Table t = tableEnv.sqlQuery("select * from source_table");t.execute().print();}
}

执行结果截图

TO_TIMESTAMP_LTZ 受时区影响
而TO_TIMESTAMP()意味着原始数据中本就是string, 是不会受到时区影响的

  1. 下方第一个红色列不管是UTC还是 Asia/Shanghai 我们看大的string都是一样的
  2. 下方第一个红色列UTC比 Asia/Shanghai 少了8个小时

1. Asia/Shanghai 结果如下

在这里插入图片描述

2. UTC结果如下

在这里插入图片描述


http://www.mrgr.cn/news/65294.html

相关文章:

  • Linux(inode + 软硬链接 图片+大白话)
  • 开源竞争-数据驱动成长-11/05-大专生的思考
  • qt QStatusBar详解
  • Matlab实现鲸鱼优化算法优化随机森林算法模型 (WOA-RF)(附源码)
  • leetcode224:基本计算器
  • git入门教程9:配置Git钩子
  • 每日一问:什么是SQL注入?注入方式和预防方法有哪些?
  • 100种算法【Python版】第35篇——PageRank算法
  • Java中的排序
  • 爱普生SG-8101CA可编程晶振应用在工业自动化机器人
  • 从0开始学习Linux——文本编辑器
  • java动态导入导出excel,javassist动态创建类
  • C/C++ stackful 有栈协同程式的一些缺点。
  • django电商易购系统-计算机设计毕业源码61059
  • JAVA通过AOP自定义注解记录日志
  • 100种算法【Python版】第38篇—— Tarjan算法
  • 智能推荐系统介绍
  • 【人工智能-初级】练习题:matplotlib基础练习30例
  • Python 中的迭代器与生成器详解
  • 关于halcon的可变形logo模板匹配find_local_deformable_modle_xld解释及简化匹配代码
  • JavaScript函数
  • 物联网赋能的人工智能图像检测系统
  • 探索 Python 的新天地:Helium 库揭秘
  • 代码随想录训练营Day15 | 530.二叉搜索树的最小绝对差 - 501.二叉搜索树中的众数 - 236. 二叉树的最近公共祖先
  • 15.函数的重载
  • 04741计算机网络原理真题-CRC的计算-案例分析