博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
flink window之四大WindowAssigner
阅读量:3959 次
发布时间:2019-05-24

本文共 4548 字,大约阅读时间需要 15 分钟。

Windows是流计算的核心。Windows将流分成有限大小的“buckets”,我们可以在其上应用聚合计算(ProcessWindowFunctionReduceFunctionAggregateFunctionFoldFunction)等。在Flink中编写一个窗口计算的基本结构如下:

Keyed Windows

stream    .keyBy(...)                    .window(...)               <-  必须制定: 窗口类型    [.trigger(...)]            <-  可选: "trigger" (都有默认 触发器),决定窗口什么时候触发    [.evictor(...)]            <-  可选: "evictor" (默认 没有剔出),剔出窗口中的元素    [.allowedLateness(...)]    <-  可选: "lateness" (默认 0),不允许又迟到的数据    [.sideOutputLateData(...)] <-  可选: "output tag" 将迟到的数据输出到 指定流中    .reduce/aggregate/fold/apply()  <-  必须指定: "function",实现对窗口数据的聚合计算    [.getSideOutput(...)]      <-  可选: "output tag" 获取Sideout的数据,一般处理迟到数据

Non-Keyed Windows

stream    .windowAll(...)            <-  必须制定: 窗口类型    [.trigger(...)]            <-  可选: "trigger" (都有默认 触发器),决定窗口什么时候触发    [.evictor(...)]            <-  可选: "evictor" (默认 没有剔出),剔出窗口中的元素    [.allowedLateness(...)]    <-  可选: "lateness" (默认 0),不允许又迟到的数据    [.sideOutputLateData(...)] <-  可选: "output tag" 将迟到的数据输出到 指定流中    .reduce/aggregate/fold/apply()  <-  必须指定: "function",实现对窗口数据的聚合计算    [.getSideOutput(...)]      <-  可选: "output tag" 获取Sideout的数据,一般处理迟到数据

一旦应属于该窗口的第一个元素到达,就会创建窗口,并且当时间(Event/Process Time)超过其结束时间戳时,会完全删除该窗口, Flink保证只删除基于时间的窗口,而不能删除其他类型的窗口,如全局窗口。

此外,每个窗口都会有一个触发器(Trigger)和一个函数(“ ProcessWindowFunction”,“ ReduceFunction”,“ AggregateFunction”或“ FoldFunction)附加到它。该函数将包含要应用于窗口内容的计算,而Trigger则指定条件,在该条件下,该窗口被视为可以应用该函数。

除上述内容外,您还可以指定一个“ Evictor”将能够在触发触发器之后以及应用此功能之前和/或之后从窗口中删除元素。

本文主要介绍flink的Window Assigners(窗口分配器)

Window Assigners(窗口分配器)

窗口分配器定义了如何将元素分配给窗口。 这是通过在 window(…)(对于 keyed streams)或 windowAll()(对于 noneded streams)调用中指定您选择的WindowAssigner来完成的。

WindowAssigner负责将每个传入的元素分配给一个或多个窗口。 Flink带有针对最常见用例的预定义窗口分配器,即滚动窗口滑动窗口会话窗口全局窗口,接下博主会对每个窗口分享案例,本文基于ProcessTime处理,后续会分享基于Event Time案例。

(1)Tumbling Windows

滚动窗口长度固定,滑动间隔等于窗口长度,窗口元素之间没有交叠。

在这里插入图片描述

var env=StreamExecutionEnvironment.getExecutionEnvironment	env.socketTextStream("centos",9999)		.flatMap(_.split("\\s+"))		.map((_,1))		.keyBy(0)		.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))		.reduce((v1,v2)=>(v1._1,v1._2+v2._2))		.print()env.execute("window")

(2) Sliding Windows

滑动窗口长度固定,窗口长度大于窗口滑动间隔,元素存在交叠。

在这里插入图片描述

var env=StreamExecutionEnvironment.getExecutionEnvironment	env.socketTextStream("centos",9999)	.flatMap(_.split("\\s+"))	.map((_,1))	.keyBy(_._1)	.window(SlidingProcessingTimeWindows.of(Time.seconds(4),Time.seconds(2)))	.process(new ProcessWindowFunction[(String,Int),String,String,TimeWindow]{
override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[String]): Unit = {
val sdf = new SimpleDateFormat("HH:mm:ss") val window = context.window println(sdf.format(window.getStart)+"\t"+sdf.format(window.getEnd)) for(e <- elements){
print(e+"\t") } println() } })env.execute("window")

(3) Session Windows(MergerWindow)

通过计算元素时间间隔,如果间隔小于session gap,则会合并到一个窗口中;如果大于时间间隔,当前窗口关闭,后续的元素属于新的窗口。与滚动窗口和滑动窗口不同的是会话窗口没有固定的窗口大小,底层本质上做的是窗口合并。

在这里插入图片描述

var env=StreamExecutionEnvironment.getExecutionEnvironmentenv.socketTextStream("centos",9999)	.flatMap(_.split("\\s+"))	.map((_,1))	.keyBy(_._1)	.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))	.apply(new WindowFunction[(String,Int),String,String,TimeWindow]{
override def apply(key: String, window: TimeWindow, input: Iterable[(String, Int)], out: Collector[String]): Unit = {
val sdf = new SimpleDateFormat("HH:mm:ss") println(sdf.format(window.getStart)+"\t"+sdf.format(window.getEnd)) for(e<- input){
print(e+"\t") } println() } })env.execute("window")

(4) Global Windows

全局窗口会将所有key相同的元素放到一个窗口中,默认该窗口永远都不会关闭(永远都不会触发),因为该窗口没有默认的窗口触发器Trigger,因此需要用户自定义Trigger。

注意:global window与前面的三个窗口不同,前面的三个窗口都是基于时间的,而global window是基于计数的。

var env=StreamExecutionEnvironment.getExecutionEnvironmentenv.socketTextStream("centos",9999)	.flatMap(_.split("\\s+"))	.map((_,1))	.keyBy(_._1)	.window(GlobalWindows.create())	.trigger(CountTrigger.of[GlobalWindow](3))	.apply(new WindowFunction[(String,Int),String,String,GlobalWindow]{
override def apply(key: String, window: GlobalWindow, input: Iterable[(String, Int)], out: Collector[String]): Unit = {
println("=======window========") for(e<- input){
print(e+"\t") } println() } })env.execute("window")

好了,关于flink的窗口WindowAssigner理解起来还是比较简单的,在这里博主做了一个简单的介绍,在接下的博文中,博主会分享flink的四大Window Functions,喜欢的点击这里吧!

转载地址:http://mimzi.baihongyu.com/

你可能感兴趣的文章
记录1年免费亚马逊AWS云服务器申请方法过程及使用技巧
查看>>
golang文章
查看>>
一些特殊的符号
查看>>
shell脚本的exit问题(退出脚本还是退出终端)
查看>>
linux export命令参数及用法详解--linux设置环境变量命令
查看>>
Shell单引号,双引号,反引号,反斜杠
查看>>
Qt中内存泄露和退出崩溃的问题
查看>>
常见颜色
查看>>
Source Insight 经典教程
查看>>
快速打开菜单附件中的工具
查看>>
Windows系统进程间通信
查看>>
linux exec的用法
查看>>
C语言中如何使用宏
查看>>
Http与RPC通信协议的比较
查看>>
Source Insight的对齐问题
查看>>
ubuntu设置开机默认进入字符界面方法
查看>>
chrome 快捷键
查看>>
Linux下buffer和cache的区别
查看>>
程序员不应该再犯的五大编程错误
查看>>
utf8中文编码范围
查看>>