当前位置: 首页 > news >正文

Flink四大基石之窗口(Window)使用详解

目录

一、引言

二、为什么需要 Window

三、Window 的控制属性

窗口的长度(大小)

窗口的间隔

四、Flink 窗口应用代码结构

是否分组

Keyed Window --键控窗

Non-Keyed Window

核心操作流程

五、Window 的生命周期

分配阶段

触发计算

六、Window 的分类

滚动窗口- TumblingWindow概念

滑动窗口– SlidingWindow概念

会话窗口 [了解]

七、Windows Function 窗口函数

分类剖析

增量聚合函数(以 AggregateFunction 为例)

全量聚合函数

八、案例实战

案例一

滚动窗口演示        

滑动窗口演示

热词统计案例

kafka发送消息的模板代码

九、总结


        本文深入探讨 Flink 中高级 API 里窗口(Window)的相关知识,涵盖为什么需要窗口、其控制属性、应用代码结构、生命周期、分类,以及窗口函数的各类细节,并辅以实例进行讲解,旨在助力开发者透彻理解并熟练运用 Flink 的窗口机制处理流数据。

一、引言

        在大数据实时处理领域,Apache Flink 凭借其卓越性能与丰富功能占据重要地位。而窗口(Window)作为 Flink 从流处理(Streaming)到批处理(Batch)的关键桥梁,理解与掌握其使用对高效数据处理意义非凡,接下来将全方位剖析其奥秘。

二、为什么需要 Window

        在流处理场景中,数据如潺潺溪流般持续涌入、无休无止。但诸多业务场景要求我们对特定时段数据做聚合操作,像统计 “过去的 1 分钟内有多少用户点击了我们的网页”。若不划定范围,面对无尽数据洪流,根本无法开展有针对性计算。窗口恰似神奇 “箩筐”,按规则收集一定时长或一定数据量数据,将无限流拆分成有限 “桶”,便于精准计算,满足如 “每隔 10min,计算最近 24h 的热搜词” 这类实时需求。

三、Window 的控制属性

窗口的长度(大小)

        明确要计算最近多久的数据,以时间维度举例,若关注 24 小时内热搜词数据量,那 24 小时即窗口长度;计数维度下,设定统计前 N 条数据,N 就是计数窗口的长度规格。

窗口的间隔

        决定隔多久进行一次计算操作。像 “每隔 10min,计算最近 24h 的热搜词” 里,每隔 10 分钟便是间隔设定,它把控着计算频次节奏。

四、Flink 窗口应用代码结构

是否分组

        首先要判定是否依 Key 对 DataStream 分组,经 keyBy 操作后,数据流成多组,下游算子多实例可并行跑,提效显著;若用 windowAll 则不分组,所有数据送下游单个实例(并行度为 1),后续窗口操作逻辑与分组情形(Keyed Window)类似,仅执行主体有别。

Keyed Window --键控窗

// Keyed Window
stream.keyBy(...)              <-  按照一个Key进行分组.window(...)            <-  将数据流中的元素分配到相应的窗口中[.trigger(...)]            <-  指定触发器Trigger(可选)[.evictor(...)]            <-  指定清除器Evictor(可选).reduce/aggregate/process/apply()      <-  窗口处理函数Window Function

Non-Keyed Window

// Non-Keyed Window
stream.windowAll(...)         <-  不分组,将数据流中的所有元素分配到相应的窗口中[.trigger(...)]            <-  指定触发器Trigger(可选)[.evictor(...)]            <-  指定清除器Evictor(可选).reduce/aggregate/process()      <-  窗口处理函数Window Function

核心操作流程

        借助窗口分配器(WindowAssigner)依时间(Event Time 或 Processing Time)把数据流元素 “分拣” 进对应窗口;待满足触发条件(常是窗口结束时间到等情况),用窗口处理函数(如 reduce、aggregate、process 等常用函数)处理窗口内数据,此外,trigger、evictor 是面向高级自定义需求的触发、销毁附加项,默认配置也能应对常见场景。

五、Window 的生命周期

分配阶段

        窗口分配器依据设定规则(像按时间间隔、计数规则等),为流入数据 “找家”,安置到合适窗口 “桶” 内,确定数据归属,构建基础计算单元。

触发计算

        当预设触发条件达成,如时间窗口到结束点,对应窗口函数 “登场”,对窗口内数据按既定逻辑聚合处理,不同窗口函数(reduce、aggregate、process)处理细节、能力有差异,像 process 更底层、功能更强大,自带 open/close 生命周期方法且能获取 RuntimeContext。

        上图是窗口的生命周期示意图,假如我们设置的是一个10分钟的滚动窗口,第一个窗口的起始时间是0:00,结束时间是0:10,后面以此类推。当数据流中的元素流入后,窗口分配器会根据时间(Event Time或Processing Time)分配给相应的窗口。相应窗口满足了触发条件,比如已经到了窗口的结束时间,会触发相应的Window Function进行计算。注意,本图只是一个大致示意图,不同的Window Function的处理方式略有不同。

 

        从数据类型上来看,一个DataStream经过keyBy转换成KeyedStream,再经过window转换成WindowedStream,我们要在之上进行reduce、aggregate或process等Window Function,对数据进行必要的聚合操作。

六、Window 的分类

Window可以分成两类:

CountWindow按指定数据条数生成窗口,与时间脱钩。

滚动计数窗口:每隔 N 条数据,聚焦统计前 N 条,如每来 10 条统计前 10 条信息。

滑动计数窗口:每隔 N 条数据,统计前 M 条(N≠M),像每过 20 条统计前 15 条情况。

TimeWindow(重点):基于时间划定窗口。

滚动时间窗口:每隔 N 时间,统计前 N 时间范围数据,如每隔 5 分钟统计前 5 分钟车辆通过量,窗口长度与滑动距离均为 5 分钟。

滑动时间窗口:每隔 N 时间,统计前 M 时间范围数据(M≠N),像每隔 30 秒统计前 1 分钟车辆数据,窗口长度 1 分钟、滑动距离 30 秒。

会话窗口:设会话超时时间(如 10 分钟),期间无数据来则结算上一窗口数据,按毫秒精细界定范围,与 Key 值关联紧密,Key 值无新输入达设定时长就统计,不受全局新数据流入干扰。

滚动窗口- TumblingWindow概念

流是连续的,无界的(有明确的开始,无明确的结束

假设有个红绿灯,提出个问题:计算一下通过这个路口的汽车数量

对于这个问题,肯定是无法回答的,为何?

因为,统计是一种对固定数据进行计算的动作。

因为流的数据是源源不断的,无法满足固定数据的要求(因为不知道何时结束)

那么,我们换个问题:统计1分钟内通过的汽车数量

那么,对于这个问题,我们就可以解答了。因为这个问题确定了数据的边界,从无界的流数据中,取出了一部分有边界的数据子集合进行计算。

描述完整就是:每隔1分钟,统计这1分钟内通过汽车的数量。窗口长度是1分钟,时间间隔是1分钟,所以这样的窗口就是滚动窗口。

那么,这个行为或者说这个统计的数据边界,就称之为窗口。

同时,我们的问题,是以时间来划分被处理的数据边界的,那么按照时间划分边界的就称之为:时间窗口

反之,如果换个问题,统计100辆通过的车里面有多少宝马品牌,那么这个边界的划分就是按照数量的,这样的称之为:计数窗口

同时,这样的窗口被称之为滚动窗口,按照窗口划分依据分为:滚动时间窗口、滚动计数窗口。

滑动窗口– SlidingWindow概念

同样是需求,改为:

每隔1分钟,统计前面2分钟内通过的车辆数

对于这个需求我们可以看出,窗口长度是2分钟,每隔1分钟统计一次,窗口长度和时间间隔不相等,并且是大于关系,就是滑动窗口

或者:每通过100辆车,统计前面通过的50辆车的品牌占比

对于这个需求可以看出,窗口长度是50辆车,但是每隔100辆车统计一次

对于这样的窗口,我们称之为滑动窗口。

那么在这里面,统计多少数据是窗口长度(如统计2分钟内的数据,统计50辆车中的数据)

隔多久统计一次称之为滑动距离(如,每隔1分钟,每隔100辆车)

那么可以看出,滑动窗口,就是滑动距离不等于窗口长度的一种窗口

比如,每隔1分钟 统计先前5分钟的数据,窗口长度5分钟,滑动距离1分钟,不相等

比如,每隔100条数据,统计先前50条数据,窗口长度50条,滑动距离100条,不相等

那如果相等呢?相等就是比如:每隔1分钟统计前面1分钟的数据,窗口长度1分钟,滑动距离1分钟,相等。

对于这样的需求可以简化成:每隔1分钟统计一次数据,这就是前面说的滚动窗口

那么,我们可以看出:

滚动窗口:窗口长度= 滑动距离

滑动窗口:窗口长度!= 滑动距离

总结:其中可以发现,对于滑动窗口:

滑动距离> 窗口长度,会漏掉数据,比如:每隔5分钟,统计前面1分钟的数据(滑动距离5分钟,窗口长度1分钟,漏掉4分钟的数据)这样的东西,没人用。

滑动距离< 窗口长度,会重复处理数据,比如:每隔1分钟,统计前面5分钟的数据(滑动距离1分钟,窗口长度5分钟,重复处理4分钟的数据)

滑动距离= 窗口长度,不漏也不会重复,也就是滚动窗口

窗口的长度(大小) > 窗口的间隔 : 如每隔5s, 计算最近10s的数据 【滑动窗口】

窗口的长度(大小) = 窗口的间隔: 如每隔10s,计算最近10s的数据 【滚动窗口】

窗口的长度(大小) < 窗口的间隔: 每隔15s,计算最近10s的数据 【没有名字,不用】

会话窗口 [了解]

Session 会话,一次会话。就是谈话。

设置一个会话超时时间间隔即可, 如10分钟,那么表示:

如果10分钟没有数据到来, 就计算上一个窗口的数据

代码中,并行度设置为1,测试比较 方便。

窗口的范围:

窗口的判断是按照毫秒为单位

如果窗口长度是5秒

窗口的开始: start

窗口的结束: start + 窗口长度 -1 毫秒

比如窗口长度是5秒, 从0开始

那么窗口结束是: 0 + 5000 -1 = 4999

七、Windows Function 窗口函数

分类剖析

全量函数:耐心缓存窗口所有元素,直至触发条件成熟,才对全量数据 “开刀” 计算,此特性可满足数据排序等复杂需求。

增量函数:保存中间数据 “蓝本”,新元素流入就与之融合更新,持续迭代中间成果,高效且灵活。

增量聚合函数(以 AggregateFunction 为例)

        每有新数据 “入局”,立马按规则计算,其接口含输入类型(IN)、累加器类型(ACC)、输出类型(OUT)参数,有对应 add、createAccumulator、merge、extractOutput 等方法,构建严谨聚合流程。

实现方法(常见的增量聚合函数如下):
reduce(reduceFunction)
aggregate(aggregateFunction)
sum()
min()
max()

reduce接受两个相同类型的输入,生成一个同类型输出,所以泛型就一个 <T>
maxBy、minBy、sum这3个底层都是由reduce实现的
aggregate的输入值、中间结果值、输出值它们3个类型可以各不相同,泛型有<T, ACC, R>

AggregateFunction 【了解】

AggregateFunction 比 ReduceFunction 更加的通用,它有三个参数:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)

输入类型是输入流中的元素类型,AggregateFunction有一个add方法可以将一个输入元素添加到一个累加器中。该接口还具有创建初始累加器(createAccumulator方法)、将两个累加器合并到一个累加器(merge方法)以及从累加器中提取输出(类型为OUT)的方法。

package com.bigdata.windows;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class _04_AggDemo {public static final Tuple3[] ENGLISH = new Tuple3[] {Tuple3.of("class1", "张三", 100L),Tuple3.of("class1", "李四", 40L),Tuple3.of("class1", "王五", 60L),Tuple3.of("class2", "赵六", 20L),Tuple3.of("class2", "小七", 30L),Tuple3.of("class2", "小八", 50L)};public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);//2. source-加载数据DataStreamSource<Tuple3<String,String,Long>> dataStreamSource = env.fromElements(ENGLISH);KeyedStream<Tuple3<String,String,Long>, String> keyedStream = dataStreamSource.keyBy(new KeySelector<Tuple3<String,String,Long>, String>() {@Overridepublic String getKey(Tuple3<String,String,Long> tuple3) throws Exception {return tuple3.f0;}});//3. transformation-数据处理转换// 三个参数:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)keyedStream.countWindow(3).aggregate(new AggregateFunction<Tuple3<String,String,Long>, Tuple3<String,Long,Integer>, Tuple2<String,Double>>() {// 初始化一个中间变量Tuple3<String,Long,Integer> tuple3 = Tuple3.of(null,0L,0);@Overridepublic Tuple3<String,Long,Integer> createAccumulator() {return tuple3;}@Overridepublic Tuple3<String,Long,Integer> add(Tuple3<String, String, Long> value, Tuple3<String,Long,Integer> accumulator) {long tempScore = value.f2 + accumulator.f1;int length = accumulator.f2 + 1;return Tuple3.of(value.f0, tempScore,length);}@Overridepublic Tuple2<String, Double> getResult( Tuple3<String,Long,Integer> accumulator) {return Tuple2.of(accumulator.f0,(double) accumulator.f1 / accumulator.f2);}@Overridepublic Tuple3<String, Long, Integer> merge(Tuple3<String, Long, Integer> a, Tuple3<String, Long, Integer> b) {return Tuple3.of(a.f0,a.f1+b.f1,a.f2+b.f2);}}).print();//4. sink-数据输出//5. execute-执行env.execute();}
}

全量聚合函数

        坚守等窗口数据集齐 “发令枪响” 才运算原则,确保计算基于完整数据集,保障结果准确性、完整性,契合多场景聚合诉求。

实现方法
apply(windowFunction)
process(processWindowFunction)

全量聚合: 窗口需要维护全部原始数据,窗口触发进行全量聚合。
ProcessWindowFunction一次性迭代整个窗口里的所有元素,比较重要的一个对象是Context,可以获取到事件和状态信息,这样我们就可以实现更加灵活的控制,该算子会浪费很多性能,主要原因是不增量计算,要缓存整个窗口然后再去处理,所以要设计好内存。

package com.bigdata.day04;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;public class Demo03 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据Tuple3[] ENGLISH = new Tuple3[] {Tuple3.of("class1", "张三", 100L),Tuple3.of("class1", "李四", 40L),Tuple3.of("class1", "王五", 60L),Tuple3.of("class2", "赵六", 20L),Tuple3.of("class2", "小七", 30L),Tuple3.of("class2", "小八", 50L)};// 先求每个班级的总分数,再求每个班级的总人数DataStreamSource<Tuple3<String,String,Long>> streamSource = env.fromElements(ENGLISH);KeyedStream<Tuple3<String, String, Long>, String> keyedStream = streamSource.keyBy(v -> v.f0);// 每个分区中的数据都达到了3条才能触发,哪个分区达到了三条,哪个就触发,不够的不计算// //Tuple3<String, String, Long> 输入类型//    //Tuple2<Long, Long> 累加器ACC类型,保存中间状态 第一个值代表总成绩,第二个值代表总人数//    //Double 输出类型// 第一个泛型是输入数据的类型,第二个泛型是返回值类型   第三个是key 的类型, 第四个是窗口对象keyedStream.countWindow(3).apply(new WindowFunction<Tuple3<String, String, Long>, Double, String, GlobalWindow>() {@Overridepublic void apply(String s, GlobalWindow window, Iterable<Tuple3<String, String, Long>> input, Collector<Double> out) throws Exception {// 计算总成绩,计算总人数int sumScore = 0,sumPerson=0;for (Tuple3<String, String, Long> tuple3 : input) {sumScore += tuple3.f2;sumPerson += 1;}out.collect((double)sumScore/sumPerson);}}).print();//5. execute-执行env.execute();}
}

八、案例实战

案例一

 需求为 “每 5 秒钟统计一次,最近 5 秒钟内,各个路口通过红绿灯汽车的数量”,借 Flink 代码实现,底层算法作用下,数据按节奏聚合统计,时间设 1 分钟更易观察效果,能清晰看到各时段车辆数统计产出。

nc -lk 9999
有如下数据表示:
信号灯编号和通过该信号灯的车的数量
9,3
9,2
9,7
4,9
2,6
1,5
2,3
5,7
5,4

没有添加窗口的写法:

package com.bigdata.day03.time;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Demo07 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStreamSource<String> streamSource = env.socketTextStream("localhost", 9999);// 9,2 --> (9,2)//3. transformation-数据处理转换streamSource.map(new MapFunction<String, Tuple2<Integer,Integer>>() {@Overridepublic Tuple2<Integer, Integer> map(String line) throws Exception {String[] arr = line.split(",");int monitor_id = Integer.valueOf(arr[0]);int num = Integer.valueOf(arr[1]);return Tuple2.of(monitor_id,num);}}).keyBy(tuple->tuple.f0).sum(1).print();//4. sink-数据输出//5. execute-执行env.execute();}
}

此处的sum求和,中count ,其实是CartInfo中的一个字段而已。

演示:

滚动窗口演示
        

package com.bigdata.day03.time;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;public class Demo08 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStreamSource<String> streamSource = env.socketTextStream("localhost", 9999);// 9,2 --> (9,2)//3. transformation-数据处理转换streamSource.map(new MapFunction<String, Tuple2<Integer,Integer>>() {@Overridepublic Tuple2<Integer, Integer> map(String line) throws Exception {String[] arr = line.split(",");int monitor_id = Integer.valueOf(arr[0]);int num = Integer.valueOf(arr[1]);return Tuple2.of(monitor_id,num);}}).keyBy(tuple->tuple.f0).window(TumblingProcessingTimeWindows.of(Time.minutes(1))).sum(1).print();//4. sink-数据输出//5. execute-执行env.execute();}
}

        以上代码的时间最好修改为1分钟,假如时间间隔是1分钟,那么48分03秒时输入的信号灯数据,49分整点会统计出来结果,原因是底层有一个算法。

        滑动窗口的话,不太容易看到效果,因为有些数据被算到了多个窗口中,需要我们拿笔自己计算一下,对比一下:

滑动窗口演示

        同样统计各路口汽车数量,但需求改为 “每 5 秒钟统计一次,最近 10 秒钟内”,因数据会在多窗口重复计算,需手动比对梳理,深入体会滑动窗口数据处理逻辑与特点。

package com.bigdata.day03.time;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;public class Demo09 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStreamSource<String> streamSource = env.socketTextStream("localhost", 9999);// 9,2 --> (9,2)//3. transformation-数据处理转换streamSource.map(new MapFunction<String, Tuple2<Integer,Integer>>() {@Overridepublic Tuple2<Integer, Integer> map(String line) throws Exception {String[] arr = line.split(",");int monitor_id = Integer.valueOf(arr[0]);int num = Integer.valueOf(arr[1]);return Tuple2.of(monitor_id,num);}}).keyBy(tuple->tuple.f0).window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5))).sum(1).print();//4. sink-数据输出//5. execute-执行env.execute();}
}

热词统计案例

        借助 Kafka 随机发送 50000 个热词(200 毫秒间隔),分别基于滚动、滑动窗口统计,编写 Flink 代码时着重体会 apply 方法,兼顾二者效果差异,同时知晓工作中 process 函数因更强大底层能力常成首选。

apply和process都是处理全量计算,但工作中正常用process。

process更加底层,更加强大,有open/close生命周期方法,又可获取RuntimeContext。

package com.bigdata.day03.time;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;public class Demo10 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据Properties properties = new Properties();properties.setProperty("bootstrap.servers","bigdata01:9092");properties.setProperty("group.id","g2");//2. source-加载数据FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>("flink-01",new SimpleStringSchema(),properties);DataStreamSource<String> kafkaSource = env.addSource(kafkaConsumer);//3. transformation-数据处理转换kafkaSource.map(new MapFunction<String, Tuple2<String,Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {return Tuple2.of(value,1);}}).keyBy(tuple->tuple.f0)//.window(TumblingProcessingTimeWindows.of(Time.seconds(5))).window(SlidingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(2)))// 第一个泛型是输入数据的类型,第二个泛型是返回值类型   第三个是key 的类型, 第四个是窗口对象.apply(new WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {@Overridepublic void apply(String key,  // 代表分组key值     五旬老太守国门TimeWindow window, // 代表窗口对象Iterable<Tuple2<String, Integer>> input, // 分组过之后的数据 [1,1,1,1,1]Collector<String> out  // 用于输出的对象) throws Exception {SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");long start = window.getStart();long end = window.getEnd();int sum = 0;for (Tuple2<String, Integer> tuple2 : input) {sum += tuple2.f1;}out.collect(key+",窗口开始:"+dateFormat.format(new Date(start))+",结束时间:"+dateFormat.format(new Date(end))+","+sum);//out.collect(key+",窗口开始:"+start+",结束时间:"+end+","+sum);}}).print();//4. sink-数据输出//5. execute-执行env.execute();}
}

kafka发送消息的模板代码

package com.bigdata.day03.time;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;
import java.util.Random;public class CustomProducer {public static void main(String[] args) {// Properties 它是map的一种Properties properties = new Properties();// 设置连接kafka集群的ip和端口properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 创建了一个消息生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 调用这个里面的send方法String[] hotWords= new String[]{"郭有才","歌手2024","五旬老太守国门","师夷长技以制夷"};Random random = new Random();for (int i = 0; i < 50000; i++) {String word = hotWords[random.nextInt(4)];ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("flink-01",word);kafkaProducer.send(producerRecord);}kafkaProducer.close();}
}

九、总结

        Flink 窗口机制犹如精密仪器,从控制属性、分类设计到函数运用,各环节紧密相扣。深入理解其原理、熟练实操代码,能为实时流数据处理注入强大动力,解锁更多高效、智能数据聚合分析场景,助力开发者在大数据浪潮中稳立潮头、驾驭数据。后续可深入探索自定义窗口逻辑、优化性能调优等进阶方向,深挖 Flink 窗口潜力。


http://www.mrgr.cn/news/78513.html

相关文章:

  • Windows 11 搭建 Docker 桌面版详细教程
  • linux系统中常用文件日常使用命令记录
  • 架构第十三章:zabbix-3
  • Android 手写签名板
  • 微信小程序按字母顺序渲染城市 功能实现详细讲解
  • 【大数据学习 | Spark-Core】广播变量和累加器
  • 群控系统服务端开发模式-应用开发-邮箱配置功能开发
  • 全链接神经网络拟合函数
  • 【halcon】Metrology工具系列之get_metrology_object_result_contour
  • 数据类型.
  • 【CSS】clip-path 属性(剪裁显示区域)
  • 【FPGA】UART串口通信
  • 常见的Web安全漏洞——XSS
  • LayaBox1.8.4实现战争迷雾效果
  • 数据结构与算法——1125—面试题位运算
  • redis的主从复制
  • 【通用】操作系统 知识总结:IPC方式 / 进程线程 / 死锁 / 虚拟内存 / 段页存储
  • Oracle对比表与表之间的结构
  • 20241128解决Ubuntu20.04安装libwxgtk3.0-dev异常的问题
  • linux内核面试题精选及参考答案
  • 探讨播客的生态系统
  • 零基础快速掌握——C语言基础【数据类型】【运算符】
  • python array矩阵相关操作
  • 《操作系统 - 清华大学》6 -1:局部页面置换算法:最优页面置换算法
  • 针对Qwen-Agent框架的Function Call及ReAct的源码阅读与解析:Agent基类篇
  • Robot Framework框架中常用的变量