本文共 4548 字,大约阅读时间需要 15 分钟。
Windows是流计算的核心。Windows将流分成有限大小的“buckets”,我们可以在其上应用聚合计算(ProcessWindowFunction
,ReduceFunction
,AggregateFunction
或FoldFunction
)等。在Flink中编写一个窗口计算的基本结构如下:
Keyed Windows
stream .keyBy(...) .window(...) <- 必须制定: 窗口类型 [.trigger(...)] <- 可选: "trigger" (都有默认 触发器),决定窗口什么时候触发 [.evictor(...)] <- 可选: "evictor" (默认 没有剔出),剔出窗口中的元素 [.allowedLateness(...)] <- 可选: "lateness" (默认 0),不允许又迟到的数据 [.sideOutputLateData(...)] <- 可选: "output tag" 将迟到的数据输出到 指定流中 .reduce/aggregate/fold/apply() <- 必须指定: "function",实现对窗口数据的聚合计算 [.getSideOutput(...)] <- 可选: "output tag" 获取Sideout的数据,一般处理迟到数据
Non-Keyed Windows
stream .windowAll(...) <- 必须制定: 窗口类型 [.trigger(...)] <- 可选: "trigger" (都有默认 触发器),决定窗口什么时候触发 [.evictor(...)] <- 可选: "evictor" (默认 没有剔出),剔出窗口中的元素 [.allowedLateness(...)] <- 可选: "lateness" (默认 0),不允许又迟到的数据 [.sideOutputLateData(...)] <- 可选: "output tag" 将迟到的数据输出到 指定流中 .reduce/aggregate/fold/apply() <- 必须指定: "function",实现对窗口数据的聚合计算 [.getSideOutput(...)] <- 可选: "output tag" 获取Sideout的数据,一般处理迟到数据
一旦应属于该窗口的第一个元素到达,就会创建窗口,并且当时间(Event/Process Time)超过其结束时间戳时,会完全删除该窗口, Flink保证只删除基于时间的窗口,而不能删除其他类型的窗口,如全局窗口。
此外,每个窗口都会有一个触发器(Trigger)和一个函数(“ ProcessWindowFunction”,“ ReduceFunction”,“ AggregateFunction”或“ FoldFunction)附加到它。该函数将包含要应用于窗口内容的计算,而Trigger则指定条件,在该条件下,该窗口被视为可以应用该函数。
除上述内容外,您还可以指定一个“ Evictor”将能够在触发触发器之后以及应用此功能之前和/或之后从窗口中删除元素。
本文主要介绍flink的Window Assigners(窗口分配器)
窗口分配器定义了如何将元素分配给窗口。 这是通过在 window(…)(对于 keyed streams)或 windowAll()(对于 noneded streams)调用中指定您选择的WindowAssigner来完成的。
WindowAssigner负责将每个传入的元素分配给一个或多个窗口。 Flink带有针对最常见用例的预定义窗口分配器,即滚动窗口,滑动窗口,会话窗口和全局窗口,接下博主会对每个窗口分享案例,本文基于ProcessTime处理,后续会分享基于Event Time案例。
滚动窗口长度固定,滑动间隔等于窗口长度,窗口元素之间没有交叠。
var env=StreamExecutionEnvironment.getExecutionEnvironment env.socketTextStream("centos",9999) .flatMap(_.split("\\s+")) .map((_,1)) .keyBy(0) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .reduce((v1,v2)=>(v1._1,v1._2+v2._2)) .print()env.execute("window")
滑动窗口长度固定,窗口长度大于窗口滑动间隔,元素存在交叠。
var env=StreamExecutionEnvironment.getExecutionEnvironment env.socketTextStream("centos",9999) .flatMap(_.split("\\s+")) .map((_,1)) .keyBy(_._1) .window(SlidingProcessingTimeWindows.of(Time.seconds(4),Time.seconds(2))) .process(new ProcessWindowFunction[(String,Int),String,String,TimeWindow]{ override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[String]): Unit = { val sdf = new SimpleDateFormat("HH:mm:ss") val window = context.window println(sdf.format(window.getStart)+"\t"+sdf.format(window.getEnd)) for(e <- elements){ print(e+"\t") } println() } })env.execute("window")
通过计算元素时间间隔,如果间隔小于session gap,则会合并到一个窗口中;如果大于时间间隔,当前窗口关闭,后续的元素属于新的窗口。与滚动窗口和滑动窗口不同的是会话窗口没有固定的窗口大小,底层本质上做的是窗口合并。
var env=StreamExecutionEnvironment.getExecutionEnvironmentenv.socketTextStream("centos",9999) .flatMap(_.split("\\s+")) .map((_,1)) .keyBy(_._1) .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5))) .apply(new WindowFunction[(String,Int),String,String,TimeWindow]{ override def apply(key: String, window: TimeWindow, input: Iterable[(String, Int)], out: Collector[String]): Unit = { val sdf = new SimpleDateFormat("HH:mm:ss") println(sdf.format(window.getStart)+"\t"+sdf.format(window.getEnd)) for(e<- input){ print(e+"\t") } println() } })env.execute("window")
全局窗口会将所有key相同的元素放到一个窗口中,默认该窗口永远都不会关闭(永远都不会触发),因为该窗口没有默认的窗口触发器Trigger,因此需要用户自定义Trigger。
注意:global window与前面的三个窗口不同,前面的三个窗口都是基于时间的,而global window是基于计数的。
var env=StreamExecutionEnvironment.getExecutionEnvironmentenv.socketTextStream("centos",9999) .flatMap(_.split("\\s+")) .map((_,1)) .keyBy(_._1) .window(GlobalWindows.create()) .trigger(CountTrigger.of[GlobalWindow](3)) .apply(new WindowFunction[(String,Int),String,String,GlobalWindow]{ override def apply(key: String, window: GlobalWindow, input: Iterable[(String, Int)], out: Collector[String]): Unit = { println("=======window========") for(e<- input){ print(e+"\t") } println() } })env.execute("window")
好了,关于flink的窗口WindowAssigner理解起来还是比较简单的,在这里博主做了一个简单的介绍,在接下的博文中,博主会分享flink的四大Window Functions,喜欢的点击这里吧!
转载地址:http://mimzi.baihongyu.com/