当前位置: 首页 > news >正文

Flink学习连载文章8--时间语义

Time的分类 (时间语义)

EventTime:事件(数据)时间,是事件/数据真真正正发生时/产生时的时间

IngestionTime:摄入时间,是事件/数据到达流处理系统的时间

ProcessingTime:处理时间,是事件/数据被处理/计算时的系统的时间

EventTime的重要性

假设,你正在去往地下停车场的路上,并且打算用手机点一份外卖。选好了外卖后,你就用在线支付功能付款了,这个时候是11点59分(EventTime)。恰好这时,你走进了地下停车库,而这里并没有手机信号。因此外卖的在线支付并没有立刻成功,而支付系统一直在Retry重试“支付”这个操作。
当你找到自己的车并且开出地下停车场的时候,已经是12点01分了(ProcessingTime)。这个时候手机重新有了信号,手机上的支付数据成功发到了外卖在线支付系统,支付完成。在上面这个场景中你可以看到,
支付数据的事件时间是11点59分,而支付数据的处理时间是12点01分问题:
如果要统计12之前的订单金额,那么这笔交易是否应被统计?
答案:
应该被统计,因为该数据的真真正正的产生时间为11点59分,即该数据的事件时间为11点59分,
事件时间能够真正反映/代表事件的本质! 所以一般在实际开发中会以事件时间作为计算标准。还可以通过钉钉打卡、饭卡机 等 举例子。一条错误日志的内容为:
2020-11-11 23:59:58 error NullPointExcep --事件时间
进入Flink的时间为2020-11-11 23:59:59      --摄入时间
到达Window的时间为2020-11-12 00:00:01     --处理时间
问题:
对于业务来说,要统计每天的的故障日志个数,哪个时间是最有意义的?
答案:
EventTime事件时间,因为bug真真正正产生的时间就是事件时间,只有事件时间才能真正反映/代表事件的本质! 

总结:
1.事件时间确实重要, 因为它能够代表事件/数据的本质,是事件/数据真真正正发生/产生的时间
2.按照事件时间进去处理/计算,会存在一定的难度, 因为数据可能会因为网路延迟等原因, 发生乱序或延迟到达, 那么最后的计算结果就有可能错误或数据丢失
3.需要有技术来解决上面的问题,使用Watermark技术来解决! 

Watermark是什么?-水印,水位线

为什么会有WaterMark?

当flink 以 EventTime 模式处理流数据时,它会根据数据里的时间戳来处理基于时间的算子。但是由于网络、分布式等原因,会导致数据乱序的情况。如下图所示

只要使用event time,就必须使用watermark,在上游指定,比如:source、map算子后.

Watermark的核心本质可以理解成一个延迟触发机制。

我们知道,流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的,虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络等原因,导致乱序的产生,所谓乱序,就是指Flink接收到的事件的先后顺序不是严格按照事件的Event Time顺序排列的。

Watermark就是给数据额外添加的一列时间戳!

Watermark = 当前最大的事件时间 - 最大允许的延迟时间(或最大允许的乱序度时间)

假如明天出去玩,09:00集合,最多允许迟到10分钟。
08:50 胜赛来了    08:50 - 10 = 08:40 
09:05 步迅来了    09:05 - 10 = 08:55 
09:35 青林来了    watermark = 09:35 - 10 = 09:25
能否上到车上的条件是:watermark <= 时间点

Watermark能解决什么问题,如何解决的?

有了Watermark 就可以在一定程度上解决数据乱序或延迟达到问题!

不添加watermark ,窗口如何触发:

1)窗口有数据

2)窗口的结束时间到了。

班车:到了时间点立即发车,来了数据也不要。

有了Watermark就可以根据Watermark来决定窗口的触发时机,满足下面的条件才触发:

1.窗口有数据

2.Watermark >= 窗口的结束时间

满足以上条件则触发窗口计算!

以前窗口触发:系统时间到了窗口结束时间就触发

现在窗口触发:Watermark >= 窗口的结束时间

而Watermark = 当前最大的事件时间 - 最大允许的延迟时间(或最大允许的乱序度时间)

就意味着, 通过Watermark改变了窗口的触发时机了, 那么接下来我们看如何改变的/如何解决前面的问题的

需要记住:

Watermark = 当前最大的事件时间 - 最大允许的延迟时间(或最大允许的乱序度时间)

窗口触发时机 : Watermark >= 窗口的结束时间

水印(watermark)就是一个时间戳,Flink可以给数据流添加水印,可以理解为:收到一条消息后,额外给这个消息添加了一个时间字段,这就是添加水印,一般人为添加的消息的水印都会比当前消息的事件时间一些

窗口是否关闭,按照水印时间来判断,但原有事件时间不会被修改,窗口的边界依旧是事件时间来决定。

  • 水印并不会影响原有Eventtime
  • 当数据流添加水印后,会按照水印时间来触发窗口计算
  • 一般会设置水印时间,比Eventtime小一些(一般几秒钟)
  • 当接收到的水印时间>= 窗口的endTime且窗口内有数据,则触发计算

水印(水印时间)的计算:事件时间– 设置的水印长度 = 水印时间

比如,事件时间是10分30秒, 水印长度是2秒,那么水印时间就是10分28秒

Watermark图解原理

总结:

Watermark 是一个单独计算出来的时间戳
Watermark = 当前最大的事件时间 - 最大允许的延迟时间(乱序度)
Watermark可以通过改变窗口的触发时机 在 一定程度上解决数据乱序或延迟达到的问题
Watermark >= 窗口结束时间 时 就会触发窗口计算(窗口中得有数据)
延迟或乱序严重的数据还是丢失, 但是可以通过调大 最大允许的延迟时间(乱序度) 来解决, 或 使用后面要学习的侧道输出流来单独收集延迟或乱序严重的数据,保证数据不丢失!

多并行度的水印触发

在多并行度下,每个并行有一个水印

比如并行度是6,那么程序中就有6个watermark

分别属于这6个并行度(线程)

那么,触发条件以6个水印中最小的那个为准

比如, 有个窗口是0-5

其中5个并行度的水印都超过了5

但有一个并行度的水印是3

那么,不管另外5个并行度中的水印达到了多大,都不会触发

因为6个并行度中的6个水印,最小的是3,不满足大于等于窗口结束5的条件

在测试水印的时候,记得把并行度设置为1 ,好看结果,否则,结果不太容易看出来。

Watermark代码演示

需求

实时模拟生成订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)

要求每隔5s,计算5秒内,每个用户的订单总金额

并添加Watermark来解决一定程度上的数据延迟和数据乱序问题。

不使用水印的时候【不能使用eventtime时间语义】,进行开发:

package com.bigdata.day05;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;import java.time.Duration;
import java.util.Random;
import java.util.UUID;/*** @基本功能:* @program:FlinkDemo* @author: 闫哥* @create:2023-11-23 10:07:21** 实时模拟生成订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)* 要求每隔5s,计算5秒内,每个用户的订单总金额* 并添加Watermark来解决一定程度上的数据延迟和数据乱序问题。**/public class _01WatermarkDemo {@Data  // set get toString@AllArgsConstructor@NoArgsConstructorpublic static class OrderInfo2{private String orderId;private int uid;private int money;private long timeStamp;}public static class MySource implements SourceFunction<OrderInfo2> {boolean flag = true;@Overridepublic void run(SourceContext ctx) throws Exception {// 源源不断的产生数据Random random = new Random();while(flag){OrderInfo2 orderInfo = new OrderInfo2();orderInfo.setOrderId(UUID.randomUUID().toString());orderInfo.setUid(random.nextInt(3));orderInfo.setMoney(random.nextInt(101));orderInfo.setTimeStamp(System.currentTimeMillis());ctx.collect(orderInfo);Thread.sleep(1000);// 间隔1s}}// source 停止之前需要干点啥@Overridepublic void cancel() {flag = false;}}public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStreamSource<OrderInfo2> orderSourceStream = env.addSource(new MySource());//3. transformation-数据处理转换// 每个用户的订单总额KeyedStream<OrderInfo2, Integer> keyedStream = orderDSWithWatermark.keyBy(orderInfo2 -> orderInfo2.getUid());SingleOutputStreamOperator<OrderInfo2> result1 = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum("money");result1.print();//4. sink-数据输出//5. execute-执行env.execute();}
}

假如出现如下错误:

Exception in thread "main" org.apache.flink.api.common.typeutils.CompositeType$InvalidFieldReferenceException: Cannot reference field by field expression on GenericType<com.bigdata.day05.OrderInfo2>Field expressions are only supported on POJO types, tuples, and case classes. (See the Flink documentation on what is considered a POJO.)at org.apache.flink.streaming.util.typeutils.FieldAccessorFactory.getAccessor(FieldAccessorFactory.java:224)at org.apache.flink.streaming.api.functions.aggregation.SumAggregator.<init>(SumAggregator.java:53)at org.apache.flink.streaming.api.datastream.WindowedStream.sum(WindowedStream.java:688)at com.bigdata.day05._01WatermarkDemo.main(_01WatermarkDemo.java:103)

说明这个pojo 必须是public 的,否则不解析。

修正过的代码:

package com.bigdata.time;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data  // set get toString
@AllArgsConstructor
@NoArgsConstructor
public class OrderInfo{private String orderId;private int uid;private int money;private long timeStamp;
}
package com.bigdata.time;import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.util.Date;
import java.util.Random;
import java.util.UUID;/*** @基本功能:* @program:FlinkDemo* @author: 闫哥* @create:2024-11-26 10:30:28**/class MySource extends RichSourceFunction<OrderInfo> {boolean flag = true;@Overridepublic void run(SourceContext<OrderInfo> ctx) throws Exception {while(flag){OrderInfo orderInfo = new OrderInfo();Random random = new Random();int userId = random.nextInt(10);int money = random.nextInt(100);long timeStamp = System.currentTimeMillis() - random.nextInt(3000);orderInfo.setOrderId(UUID.randomUUID().toString());orderInfo.setUid(userId);orderInfo.setMoney(money);orderInfo.setTimeStamp(timeStamp);ctx.collect(orderInfo);Thread.sleep(20);}}@Overridepublic void cancel() {flag = false;}
}
public class _01_OrderDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);//2. source-加载数据DataStreamSource<OrderInfo> streamSource = env.addSource(new MySource());//3. transformation-数据处理转换streamSource.keyBy(new KeySelector<OrderInfo, Integer>() {@Overridepublic Integer getKey(OrderInfo orderInfo) throws Exception {return orderInfo.getUid();}}).window(TumblingProcessingTimeWindows.of(Time.seconds(5)))//.sum("money").print();.apply(new WindowFunction<OrderInfo, String, Integer, TimeWindow>() {@Overridepublic void apply(Integer userId, TimeWindow window, Iterable<OrderInfo> input, Collector<String> out) throws Exception {String beginTime = DateFormatUtils.format(window.getStart(), "yyyy-MM-dd HH:mm:ss");String endTime = DateFormatUtils.format(window.getEnd(), "yyyy-MM-dd HH:mm:ss");int sum = 0;for (OrderInfo orderInfo : input) {sum += orderInfo.getMoney();}out.collect(beginTime+","+endTime+","+userId +","+sum);}}).print();//4. sink-数据输出//5. execute-执行env.execute();}
}

需求升级:

实时模拟生成订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)

* 要求每隔5s,计算5秒内,每个用户的订单总金额

* 并添加Watermark来解决一定程度上的数据延迟和数据乱序问题。

假如你添加了 eventTime 缺没有添加水印的代码,会报如下错误:

Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?at org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:83)at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:302)

代码演示-开发版

生成 Watermark | Apache Flink

package com.bigdata.time;import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.time.Duration;/*** @基本功能:* @program:FlinkDemo* @author: 闫哥* @create:2024-11-26 10:30:28**/public class _02_OrderDemoWithWaterMark {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);//2. source-加载数据DataStreamSource<OrderInfo> streamSource = env.addSource(new MySource());//3. transformation-数据处理转换streamSource.assignTimestampsAndWatermarks(WatermarkStrategy.<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<OrderInfo>() {// long 是时间戳吗?是秒值还是毫秒呢?年月日时分秒的的字段怎么办呢?@Overridepublic long extractTimestamp(OrderInfo orderInfo, long recordTimestamp) {// 这个方法的返回值是毫秒,所有的数据只要不是这个毫秒值,都需要转换为毫秒return orderInfo.getTimeStamp();}})).keyBy(new KeySelector<OrderInfo, Integer>() {@Overridepublic Integer getKey(OrderInfo orderInfo) throws Exception {return orderInfo.getUid();}}).window(TumblingEventTimeWindows.of(Time.seconds(5)))//.sum("money").print();.apply(new WindowFunction<OrderInfo, String, Integer, TimeWindow>() {@Overridepublic void apply(Integer userId, TimeWindow window, Iterable<OrderInfo> input, Collector<String> out) throws Exception {String beginTime = DateFormatUtils.format(window.getStart(), "yyyy-MM-dd HH:mm:ss");String endTime = DateFormatUtils.format(window.getEnd(), "yyyy-MM-dd HH:mm:ss");int sum = 0;for (OrderInfo orderInfo : input) {sum += orderInfo.getMoney();}out.collect(beginTime+","+endTime+","+userId +","+sum);}}).print();//4. sink-数据输出//5. execute-执行env.execute();}
}

通过静态方法forBoundedOutOfOrderness提供,入参接收一个Duration类型的时间间隔,也就是我们可以接受的最大的延迟时间.使用这种延迟策略的时候需要我们对数据的延迟时间有一个大概的预估判断。

WatermarkStrategy#forBoundedOutOfOrderness(Duration maxOutOfOrderness)

我们实现一个延迟3秒的固定延迟水印,可以这样做:

DataStream dataStream = ...... ;
dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)));

Flink对于迟到数据的处理

水印:对于迟到数据不长

allowedLateness: 迟到时间很长

侧道输出:对于迟到时间特别长

对于延迟数据的理解:

水印机制(水位线、watermark)机制可以帮助我们在短期延迟下,允许乱序数据的到来。

这个机制很好的处理了那些因为网络等情况短期延迟的数据,让窗口等它们一会儿。

但是水印机制无法长期的等待下去,因为水印机制简单说就是让窗口一直等在那里,等达到水印时间才会触发计算和关闭窗口

这个等待不能一直等,因为会一直缓着数据不计算。

一般水印也就是几秒钟最多几分钟而已(看业务)

那么,在现实世界中,延迟数据除了有短期延迟外,长期延迟也是很常见的。

比如:

l 客户端断网,等了好几个小时才恢复

l 车联网系统进入隧道后没有信号无法上报数据

l 手机欠费没有网

等等,这些场景下数据的迟到就不是简单的网络堵塞造成的几秒延迟了

而是小时、天级别的延迟

对于水印来说,这样的长期延迟数据是无法很好处理的。

那么有没有什么办法去处理这些长期延迟的数据呢?让其可以找到其所属的窗口正常完成计算,哪怕晚了几个小时。

这个场景的解决方式就是:延迟数据处理机制(allowedLateness方法)

水印:乱序数据处理(时间很短的延迟)

延迟处理:长期延迟数据的处理机制。

延迟数据的处理:

waterMark和Window机制解决了流式数据的乱序问题,对于因为延迟而顺序有误的数据,可以根据eventTime进行业务处理,对于延迟的数据Flink也有自己的解决办法,

主要的办法是给定一个允许延迟的时间,在该时间范围内仍可以接受处理延迟数据

设置允许延迟的时间是通过allowedLateness(lateness: Time)设置

保存延迟数据则是通过sideOutputLateData(outputTag: OutputTag[T])保存

获取延迟数据是通过DataStream.getSideOutput(tag: OutputTag[X])获取

1)allowedLateness(lateness: Time)

当我们对流设置窗口后得到的WindowedSteam对象就可以使用allowedLateness方法

该方法传入一个Time值,设置允许的长期延迟(迟到)的时间。

和watermark不同。

未设置allowedLateness(为0),当watermark满足条件,会触发窗口的 执行 + 关闭

当设置了allowedLateness,当watermark满足条件后,只会触发窗口的执行,不会触发窗口关闭

也就是,watermark满足条件后会正常触发窗口计算,将已有的数据完成计算。

但是,不会关闭窗口。如果在allowedLateness允许的时间内仍有这个窗口的数据进来,那么每进来一条,会和已经计算过的(被watermark触发的)数据一起在计算一次

水印:短期延迟,达到条件后触发计算并且关闭窗口(触发+关闭同时进行)

水印+allowedLateness : 短期延迟+ 等待长期延迟效果, 达到水印条件后,会触发窗口计算,但是不关闭窗口。事件时间延迟达到水印+allowedLateness之和后会关闭窗口。

2) 侧输出-SideOutput

Flink 通过watermark在短时间内允许了乱序到来的数据

通过延迟数据处理机制,可以处理长期迟到的数据。

但总有那么些数据来的晚的太久了。允许迟到1天的设置,它迟到了2天才来。

对于这样的迟到数据,水印无能为力,设置allowedLateness也无能为力,那对于这样的数据Flink就只能任其丢掉了吗?

不会,Flink的两个迟到机制尽量确保了数据不会错过了属于他们的窗口,但是真的迟到太久了,Flink也有一个机制将这些数据收集起来

保存成为一个DataStream,然后,交由开发人员自行处理。

那么这个机制就叫做侧输出机制(Side Output)

侧输出机制:可以将错过水印又错过allowedLateness允许的时间的数据,单独的存放到一个DataStream中,然后开发人员可以自定逻辑对这些超级迟到数据进行处理。

处理主要使用两个方式:

对窗口对象调用sideOutputLateData(OutputTag outputTag)方法,将数据存储到一个地方

对DataStream对象调用getSideOutput(OutputTag outputTag)方法,取出这些被单独处理的数据的DataStream

注意,取到的是一个DataStream,这意味着你可以对这些超级迟到数据继续写 如keyBy, window等处理逻辑。


使用方式:
先定义OutputTag对象(注意,必须new一个匿名内部类形式的OutputTag对象的实例)
然后调用sideOutputLateData方法
// side output   OutputTag对象必须是匿名内部类的形式创建出来, 本质上得到的是OutputTag对象的一个匿名子类
OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("side output"){};
WindowedStream<Tuple2<String, Long>, Tuple, TimeWindow> sideOutputLateData =allowedLateness.sideOutputLateData(outputTag);
用法:
DataStream<Tuple2<String, Long>> sideOutput = result.getSideOutput(outputTag);
// 对得到的保存超级迟到数据的DataStream进行处理
sideOutput.print("late>>>");

代码演示-完美版/企业版

前面的案例中已经可以使用Watermark 来解决一定程度上的数据延迟和数据乱序问题

但是对于延迟/迟到/乱序严重的数据还是会丢失,所以接下来

使用Watermark + AllowedLateness + SideOutput ,即使用侧道输出机制来单独收集延迟/迟到/乱序严重的数据,避免数据丢失!

package com.bigdata.day05;import com.bigdata.day04.OrderInfo;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;
import java.util.Random;
import java.util.UUID;/*** @基本功能:* @program:FlinkDemo* @author: 闫哥* @create:2024-05-15 17:06:04**/class MyOrderSource2 implements SourceFunction<OrderInfo> {@Overridepublic void run(SourceContext<OrderInfo> ctx) throws Exception {Random random = new Random();while(true){OrderInfo orderInfo = new OrderInfo();orderInfo.setOrderId(UUID.randomUUID().toString().replace("-",""));// 在这个地方可以模拟迟到数据long orderTime = System.currentTimeMillis() - 1000*random.nextInt(100);orderInfo.setOrdertime(orderTime);int money = random.nextInt(10);System.out.println("订单产生的时间:"+ DateFormatUtils.format(orderTime,"yyyy-MM-dd HH:mm:ss")+",金额:"+money);orderInfo.setMoney(money);orderInfo.setUserId(random.nextInt(2));ctx.collect(orderInfo);Thread.sleep(500);}}@Overridepublic void cancel() {}
}
public class Demo01 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);// 每隔五秒统计每个用户的前面5秒的订单的总金额//2. source-加载数据DataStreamSource<OrderInfo> streamSource = env.addSource(new MyOrderSource2());//-2.告诉Flink最大允许的延迟时间/乱序时间为多少SingleOutputStreamOperator<OrderInfo> orderDSWithWatermark = streamSource.assignTimestampsAndWatermarks(WatermarkStrategy.<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3))//-3.告诉Flink哪一列是事件时间.withTimestampAssigner((order, time) -> order.getOrdertime()));OutputTag<OrderInfo> outputTag = new OutputTag<OrderInfo>("side output"){};//3. transformation-数据处理转换SingleOutputStreamOperator<String> result = orderDSWithWatermark.keyBy(orderInfo -> orderInfo.getUserId()).window(TumblingEventTimeWindows.of(Time.seconds(5))).allowedLateness(Time.seconds(4)).sideOutputLateData(outputTag).apply(new WindowFunction<OrderInfo, String, Integer, TimeWindow>() {@Overridepublic void apply(Integer key,  // 代表分组key值     五旬老太守国门TimeWindow window, // 代表窗口对象Iterable<OrderInfo> input, // 分组过之后的数据 [1,1,1,1,1]Collector<String> out  // 用于输出的对象) throws Exception {SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");long start = window.getStart();long end = window.getEnd();int sum = 0;// 专门存放迟到的订单时间for (OrderInfo orderInfo : input) {sum += orderInfo.getMoney();}out.collect(key + ",窗口开始:" + dateFormat.format(new Date(start)) + ",结束时间:" + dateFormat.format(new Date(end)) + "," + sum);//out.collect(key+",窗口开始:"+start+",结束时间:"+end+","+sum);}});result.print("流中的数据,包含迟到的数据:");result.getSideOutput(outputTag).print("严重迟到的数据:");//4. sink-数据输出//5. execute-执行env.execute();}
}
订单产生的时间:2024-05-16 11:19:00,金额:1
订单产生的时间:2024-05-16 11:18:13,金额:3
严重迟到的数据:> f96d34c438ce400eb21a25328fe772ee,1,3,2024-05-16 11:18:13
订单产生的时间:2024-05-16 11:19:10,金额:3
2024-05-16 11:19:00
流中的数据,包含迟到的数据:> 1,窗口开始:2024-05-16 11:19:00,结束时间:2024-05-16 11:19:05,1,迟到的订单时间:
订单产生的时间:2024-05-16 11:18:19,金额:8
严重迟到的数据:> cf85339600c647c99856841021466c5d,1,8,2024-05-16 11:18:19
订单产生的时间:2024-05-16 11:18:19,金额:9
严重迟到的数据:> f087ee9c9eaa4e3f9eac06a907b47fb4,1,9,2024-05-16 11:18:19
订单产生的时间:2024-05-16 11:17:48,金额:3
严重迟到的数据:> 22f48bb693874a99b80177f6764f0912,0,3,2024-05-16 11:17:48
订单产生的时间:2024-05-16 11:19:23,金额:1
2024-05-16 11:19:10
流中的数据,包含迟到的数据:> 1,窗口开始:2024-05-16 11:19:10,结束时间:2024-05-16 11:19:15,3,迟到的订单时间:
订单产生的时间:2024-05-16 11:19:19,金额:7
2024-05-16 11:19:19
流中的数据,包含迟到的数据:> 1,窗口开始:2024-05-16 11:19:15,结束时间:2024-05-16 11:19:20,7,迟到的订单时间:
订单产生的时间:2024-05-16 11:18:25,金额:2
严重迟到的数据:> 9b91cfd413784e4da9fb7f16b68186d0,0,2,2024-05-16 11:18:25
订单产生的时间:2024-05-16 11:18:48,金额:3
严重迟到的数据:> 62bdd621dd4d43b2b9f401949c1c5686,1,3,2024-05-16 11:18:48
订单产生的时间:2024-05-16 11:18:38,金额:1
严重迟到的数据:> e6fe834c88d043ef94b66853ad67dc46,1,1,2024-05-16 11:18:38
订单产生的时间:2024-05-16 11:18:39,金额:5
严重迟到的数据:> 472b35fc32744c39990d13bd490ae131,1,5,2024-05-16 11:18:39
订单产生的时间:2024-05-16 11:18:19,金额:0
严重迟到的数据:> 49270e8cf00445f38a4fbcfebedfdc0a,0,0,2024-05-16 11:18:19
订单产生的时间:2024-05-16 11:19:07,金额:8
严重迟到的数据:> a337af0e6f1c46e48e2ed2ec99d6dc53,0,8,2024-05-16 11:19:07
订单产生的时间:2024-05-16 11:19:22,金额:8
订单产生的时间:2024-05-16 11:19:01,金额:3
严重迟到的数据:> 5bf845744c9d486f97fbd8106deb22bb,0,3,2024-05-16 11:19:01
订单产生的时间:2024-05-16 11:18:42,金额:2
严重迟到的数据:> f861897e49a54b589863efd6866ce61f,0,2,2024-05-16 11:18:42
订单产生的时间:2024-05-16 11:18:15,金额:7
严重迟到的数据:> 37a8d94dc8b94854963ddcbefa3d00dc,0,7,2024-05-16 11:18:15
订单产生的时间:2024-05-16 11:17:56,金额:6
严重迟到的数据:> ab81cf409c604bb398d43b15f59d7e53,1,6,2024-05-16 11:17:56
订单产生的时间:2024-05-16 11:18:24,金额:3
严重迟到的数据:> f85598988f4b43599e719b873d930440,1,3,2024-05-16 11:18:24
订单产生的时间:2024-05-16 11:17:52,金额:2
严重迟到的数据:> 03ade6f3972d441289bd745f979c961a,1,2,2024-05-16 11:17:52
订单产生的时间:2024-05-16 11:19:20,金额:9
订单产生的时间:2024-05-16 11:18:07,金额:1
严重迟到的数据:> d8b8992ea2b74a5fb10bffb198917c8f,0,1,2024-05-16 11:18:07
订单产生的时间:2024-05-16 11:18:34,金额:2
严重迟到的数据:> a8855d90701449588efc4dfd48df1dd3,0,2,2024-05-16 11:18:34
订单产生的时间:2024-05-16 11:19:11,金额:3
严重迟到的数据:> a7e245864b2c4c03b463b121277309eb,0,3,2024-05-16 11:19:11
订单产生的时间:2024-05-16 11:18:11,金额:9
严重迟到的数据:> cb99f1b5c1e24e5bb6ccacb9f0f9ea78,0,9,2024-05-16 11:18:11
订单产生的时间:2024-05-16 11:18:36,金额:7Process finished with exit code 130

虽然我们添加了延迟的效果,就是说侧道输出,侧道输出不能触发窗口的执行,窗口的执行只能通过水印时间触发 ,允许迟到的数据,不放入到当前窗口中,而是作为一个触发条件看到,它需要放入到它对应的窗口中。

只考虑 1 个并行度的问题

订单发生的真实事件:窗口5秒,间隔5秒,允许迟到 3秒 最晚允许迟到4秒

10:44:00 第一个区间就应该是10:44:00 10:44:05

10:44:01

10:44:02

10:44:03

10:44:04

10:44:05

10:44:07 第一个区间就应该是10:44:05 10:44:10

10:44:22 第一个区间就应该是10:44:20 10:44:25

10:44:30

10:44:28

10:44:20

通过上面这个图可以知道,44:07没有办法触发00~05的执行,但是07不放入00~05区间,而是放入10:44:05 10:44:10

44:22 一个数据触发了两个区间的执行 00~05 05~10

假如有一个订单时44:10产生的,放入哪个区间?应该放入10~15这个区间


http://www.mrgr.cn/news/78183.html

相关文章:

  • 【MySQL 保姆级教学】用户管理和数据库权限(16)
  • 边缘计算应用十大领域
  • WebRtc03: Web服务器原理和Nodejs搭建
  • 如何利用Java爬虫批量获取商品信息
  • 【数据结构】链表(2):双向链表和双向循环链表
  • el-table 实现纵向多级表头
  • Flink cdc同步增量数据timestamp字段相差八小时(分析|解决)不是粘贴复制的!
  • ESP8266 + DHT11 + OLED0.96温湿度中文显示和MQTT(二)
  • 面试学习准备
  • 学习与理解LabVIEW中多列列表框项名和项首字符串属性
  • 旋转磁体产生的场 - 实验视频资源下载
  • 东方微电嵌入式面试题及参考答案
  • Jir 关联 agit
  • 上下文信息、全局信息、局部信息
  • PostgreSQL外键全解析:从概念到实践的进阶指南
  • 软件团队的共担责任
  • 计算机毕业设计Hadoop+Spark音乐推荐系统 音乐预测系统 音乐可视化大屏 音乐爬虫 HDFS hive数据仓库 机器学习 深度学习 大数据毕业设计
  • vue项目的创建
  • 【Elasticsearch】开启大数据分析的探索与预处理之旅
  • 文件导入-使用java反射修改日期数据
  • SAR ADC系列15:基于Vcm-Base的开关切换策略
  • K8s的水平自动扩容和缩容HPA
  • C++ 优先算法 —— 无重复字符的最长子串(滑动窗口)
  • QT QRadioButton控件 全面详解
  • 数据结构 (12)串的存储实现
  • 大语言模型(LLM)不平衡的内存使用问题;训练过程中 Transformer层1和Transformer层2的反向传播计算量差异