当前位置: 首页 > news >正文

数仓开发那些事(10)

某神州优秀员工:(没错,这个diao毛被评为了优秀员工)一闪,听说你跑路了,不做零售行业了
一闪:没错,老东家的新it总监上任后大家都开始躺平,失去了当年的动力,所以需要换个环境
某神州优秀员工:躺平不好吗
一闪:等你到我这个年纪你就明白了.....
某神州优秀员工:你tm不是00后吗.....

--------------------------

老员工:一闪,你去把道路抓拍的数据从kafka写到iceberg里面去
一闪:好的老板(关掉知乎,开始干活)
鼓捣了半天,结果数据迟迟写不进去...
kafka的offset明明是正常变化的,为什么sink里面没有数据呢...应该是要排查一下sink端的代码
核对了半天,也没有地方写错啊
老员工:整完了没,晚上加班这个数据要用的
一闪:写不进去呀,在检查呢
老员工:让我康康,卧槽你这个diao毛,你怎么不开ck,不开ck写入不会提交的
一闪:我在测试环节为啥开ck,不过真的是ck导致的吗?
开始排查↓
pipeline.operator-chaining = false 断开算子链
发现确实算子里面输出了数据,但是commit的条数为0
打开ck后,commit条数动了起来,sink里面也出现了数据
这到底是一个什么原理呢?iceberg.apache.org启动!

翻译:在 notifyCheckpointComplete 回调中的 Flink 检查点成功之后发生了iceberg提交。可能会发生 Iceberg 提交失败 (无论什么原因) ,而 Flink 检查点成功。

好的,现在已经明确了ck和iceberg提交的关系了,但是为什么会这么设计呢,这肯定和写入的逻辑有关,再去官网翻翻

所得斯内,原来flink写入的数据都是写入了一个临时文件之中,这个临时文件只有提交之后才会对查询语句可见,实际上flink完成了写入,不过不是直接写入的数据表。

一闪:好了好了,已经可以写入了,开启ck之后我还研究了下背后的原理,还是大佬牛b,来一根思谋克。

老员工:戒了,听说楼下新开了一家咖啡店?
一闪:懂了,我这就去买杯冰美式(一看就是刚抽完一根,进入cd了)
老员工:我要不加糖的
(9.9买了一杯某幸)
一闪:来来来,大佬喝咖啡
老员工:不错不错,看在你这么努力的份上,再给你安排一个任务,根据实时的抓拍数据,把所有连续同行(在60秒内被同一个设备拍到)超过三个路口的车辆组合都找出来,案件分析的时候可以用上
一闪:卧槽...泥马


先来分析一下表结构:
vehicle_id string comment'车牌号'
device_id string comment'设备ID'
ts timestamp(3) comment'时间戳'
再分析一下需求:
一个设备其实就对应了一个路口,需求就可以被描述为:某两辆车有连续三次30s先后出现在摄像头下,关键字就是连续、三次、60s

"连续"代表大概率需要用到row_number()这种窗口函数来判断两辆车的形式轨迹,即第一次被抓拍通过是a路口,第二次被抓拍通过的是b路口
"三次"代表需要限制符合同行情况的次数>=3
"60s"代表抓拍的ts时间差应该在<=60s


直接写出如下代码

合并代码如下

create view rk_flow as 
selecta.vehicle_id     as vehicle_id -- 目标车辆,a.ts             as ts         -- 抓拍时间戳,a.device_id      as device_id  -- 设备id,row_number() over(partition by a.vehicle_id order by a.ts) as rk -- 该车辆第几次被抓拍
from kafka_source a;create view follow_flow as     
selecta.vehicle_id  as vehicle_target -- 目标车辆,b.vehicle_id as co_vehicle     -- 同行车辆,a.ts         as ts             -- 同行开始时间,a.rk         as v_target_rk    -- 目标车辆第几次被抓拍,a.rk - b.rk  as gap            -- 同行车辆出现次数差
// gap相同的记录证明是连续同行的,但是有殊途同归的情况,所以在下一步还要继续处理      
from rk_flow a
inner join rk_flow b
on abs(timestampDiff(second, a.ts, b.ts)) <= 60  // 前后出现不超过60秒and a.device_id = b.device_idand a.vehicle_id <> b.vehicle_id;create view follow_rk_flow as 
selecta.vehicle_target  -- 目标车辆,a.co_vehicle     -- 同行车辆,a.ts             -- 同行时间,a.v_target_rk    -- 目标车辆第几次被抓拍,a.gap            -- 同行车辆出现次数差,row_number() over(partition by vehicle_target,co_vehicle order by ts) as follow_rk
-- 同行组合出现的次数
from follow_flow a;create view result_flow as 
selecta.vehicle_target       -- 目标车辆,a.co_vehicle     	   -- 同行车辆,min(a.ts)                   as start_ts -- 同行开始时间,max(a.ts)                   as end_ts   -- 同行结束时间,a.gap                       as gap      -- 出现判断列,a.v_target_rk - a.follow_rk as follow_gap -- 同行判断列,count(1)                    as cnt      --满足次数
from follow_rk_flow a
group by a.vehicle_target,a.co_vehicle,a.gap,a.v_target_rk - a.follow_rk
having count(1) >= 3;

这时候有小朋友就要问了

"啊,你这不就是最简单的连续性问题吗,为什么被你写了这么多代码,明明应该很简单的哇"

其实一开始我也是这么以为的,但是老员工给我提醒了如下两点点

1.同行不代表一定是甲先乙后通过摄像头,不论谁先谁后,这都是同行车辆组

2.要考虑殊途同归的特殊情况(针对上面我标绿的,必须再加一次rk判断,所以代码看起来长了)

老员工:写的还不错,没有大bug,不过我问问你,如果被你判断为同行的车辆组其实是因为有一部分数据迟到导致算错的,而且你还已经把结果写到数据库里去了,这种场景你怎么办?

一闪:谢特(忘记异常处理这回事了)!如果常有数据迟到的话,我建议是把抓拍数据落到前端数据库,然后每次查询都跑一遍sql去查询,而不是用flink来计算。

老员工:和我想得差不多,毕竟查同行车辆肯定是有目标的去查的,不可能把全市的同行车辆组合全拿出来看吧。对了,我咖啡喝完了,准备去走一根。

一闪:大哥抽烟


http://www.mrgr.cn/news/95095.html

相关文章:

  • YOLOv11 目标检测
  • 网络编程之客户端通过服务器与另外一个客户端交流
  • springCloud集成tdengine(原生和mapper方式) 其一
  • SpringBoot对接DeepSeek
  • dify+deepseek联网搜索:免费开源搜索引擎Searxng使用(让你的大模型也拥有联网的功能)
  • Python功能完美的宝库——内置的强大“武器库”builtins
  • 春天遇到了冬天的吻
  • 【Java】Mybatis学习笔记
  • 火星探测发展概述2025.3.20
  • 如何判断 MSF 的 Payload 是 Staged 还是 Stageless(含 Meterpreter 与普通 Shell 对比)
  • scrollIntoView 的behavior都有哪些属性
  • STM32HAL库,解决串口UART中断接收到的第一个字节数据丢失
  • 基于springboot的房屋租赁系统(008)
  • L2TP实验 作业
  • 数学之握手问题
  • 基于单片机控制的电动汽车双闭环调速系统(论文+源码)
  • 微前端 qiankun vite vue3
  • Day20:丑数
  • 爬虫案例-爬取某狗音乐
  • 神经网络中层与层之间的关联