Flink之window & watermark

背景

Flink中的时间概念

Flink在流处理程序支持不同的时间概念。分别为Event Time/Processing Time/Ingestion Time,也就是事件时间、处理时间、提取时间。

从时间序列角度来说,发生的先后顺序是:

1
事件时间(Event Time)----> 提取时间(Ingestion Time)----> 处理时间(Processing Time)
  • Event Time 是事件在现实世界中发生的时间,它通常由事件中的时间戳描述。
  • Ingestion Time 是数据进入Apache Flink流处理系统的时间,也就是Flink读取数据源时间。
  • Processing Time 是数据流入到具体某个算子 (消息被计算处理) 时候相应的系统时间。也就是Flink程序处理该事件时当前系统时间。

mU8ra5

处理时间

是数据流入到具体某个算子时候相应的系统时间。

这个系统时间指的是执行相应操作的机器的系统时间。当一个流程序通过处理时间来运行时,所有基于时间的操作(如: 时间窗口)将使用各自操作所在的物理机的系统时间。

提取时间

提取时间在概念上位于事件时间和处理时间之间。IngestionTime是数据进入Apache Flink框架的时间,提取时间在概念上位于事件时间和处理时间之间。

事件时间

事件时间就是事件在真实世界的发生时间,即每个事件在产生它的设备上发生的时间(当地时间)。比如一个点击事件的时间发生时间,是用户点击操作所在的手机或电脑的时间。在进入Apache Flink框架之前EventTime通常要嵌入到记录中,并且EventTime也可以从记录中提取出来。在实际的网上购物订单等业务场景中,大多会使用EventTime来进行数据计算。
基于事件时间处理的强大之处在于即使在乱序事件,延迟事件,历史数据以及从备份或持久化日志中的重复数据也能获得正确的结果。对于事件时间,时间的进度取决于数据,而不是任何时钟。

事件时间程序必须指定如何生成事件时间的Watermarks,这是表示事件时间进度的机制。

现在假设我们正在创建一个排序的数据流。这意味着应用程序处理流中的乱序到达的事件,并生成同样事件但按时间戳(事件时间)排序的新数据流。

比如:

1
2
3
有1~10个事件。
乱序到达的序列是:1,2,4,5,6,3,8,9,10,7
经过按 事件时间 处理后的序列是:1,2,3,4,5,6,7,8,9,10

为了处理事件时间,Flink需要知道事件的时间戳,这意味着流中的每条数据都需要分配其事件时间戳。这通常通过提取每条数据中的固定字段来完成时间戳的获取。

watermark(水位线)

Watermark是一种告诉Flink一个消息延迟多少的方式。它定义了什么时候不再等待更早的数据。

Watermark是Apache Flink为了处理EventTime 窗口计算提出的一种机制,本质上也是一种时间戳。watermark是用于处理乱序事件或延迟数据的,这通常用watermark机制结合window来实现(Watermarks用来触发window窗口计算)。

比如对于late element,我们不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是watermark。 可以把Watermark看作是一种告诉Flink一个消息延迟多少的方式。定义了什么时候不再等待更早的数据。

简而言之,只要属于此窗口的第一个元素到达,就会创建一个窗口,当时间(事件或处理时间)超过其结束时间戳加上用户指定的允许延迟时,窗口将被完全删除。

例如:

1
2
3
4
5
使用基于事件时间的窗口策略,每5分钟创建一个不重叠(或翻滚)的窗口并允许延迟1分钟。

假定目前是12:00。

当具有落入该间隔的时间戳的第一个元素到达时,Flink将为12:00到12:05之间的间隔创建一个新窗口,当水位线(watermark)到12:06时间戳时将删除它。

window(窗口)

  • 滚动窗口(Tumbling Windows)
    滚动窗口分配器将每个元素分配给固定窗口大小的窗口。滚动窗口大小固定的并且不重叠。例如,如果指定大小为5分钟的滚动窗口,则将执行当前窗口,并且每五分钟将启动一个新窗口。

  • 滑动窗口(Sliding Windows)
    滑动窗口与滚动窗口的区别就是滑动窗口有重复的计算部分。
    例如,你可以使用窗口大小为10分钟的窗口,滑动大小为5分钟。这样,每5分钟会生成一个窗口,包含最后10分钟内到达的事件。

  • 会话窗口(Session Window)
    会话窗口分配器通过活动会话分组元素。与滚动窗口和滑动窗口相比,会话窗口不会重叠,也没有固定的开始和结束时间。相反,当会话窗口在一段时间内没有接收到元素时会关闭。
    例如,不活动的间隙时。会话窗口分配器配置会话间隙,定义所需的不活动时间长度(defines how long is the required period of inactivity)。当此时间段到期时,当前会话关闭,后续元素被分配到新的会话窗口。

  • 全局窗口(Global Window)

如何处理watermark & window 的触发

对于out-of-order(乱序)及正常的数据而言

  • watermark的时间戳 > = window endTime
  • 在 [window_start_time,window_end_time] 中有数据存在。

对于late element太多的数据而言

  • Event Time > watermark的时间戳

看看如何触发窗口
我们明白了窗口的触发机制,这里我们添加了水位线,到底是个怎么个情况?我们来看下面
假如我们设置10s的时间窗口(window),那么0-10s,10-20s都是一个窗口,以0~10s为例,0为start-time,10为end-time。
假如有4个数据的event-time分别是8(A),12.5(B),9(C),13.5(D),我们设置Watermarks为当前所有到达数据event-time的最大值减去延迟值3.5秒

1
2
3
4
当A到达的时候,Watermarks为max{8}-3.5=8-3.5 = 4.5 < 10,不会触发计算
当B到达的时候,Watermarks为max(12.5,8)-3.5=12.5-3.5 = 9 < 10,不会触发计算
当C到达的时候,Watermarks为max(12.5,8,9)-3.5=12.5-3.5 = 9 < 10,不会触发计算
当D到达的时候,Watermarks为max(13.5,12.5,8,9)-3.5=13.5-3.5 = 10 = 10,触发计算

触发计算的时候,会将A,C(因为他们都小于10)都计算进去,其中C是迟到的。
max这个很关键,就是当前窗口内,所有事件的最大事件。
这里的延迟3.5s是我们假设一个数据到达的时候,比他早3.5s的数据肯定也都到达了,这个是需要根据经验推算。假设加入D到达以后有到达了一个E,event-time=6,但是由于0~10的时间窗口已经开始计算了,所以E就丢了。
从这里上面E的丢失说明,水位线也不是万能的,但是如果根据我们自己的生产经验+侧道输出等方案,可以做到数据不丢失。

时间语义示例

1.12之前的语义(已deprecated)

1.12以前,flink默认以processing time作为默认的时间语义,可以在env上设置所想要的时间语义;

1
2
3
4
5
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

1.12以后的语义

1.12及以后,flink默认以上述的event time 作为默认的时间语义(已deprecated)
再需要指定时间语义的相关操作(如时间窗口)时,可以通过显式的api来使用特定的时间语义

  • 滚动窗口
    1
    2
    keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(5)))
    keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
  • 滑动窗口
    1
    2
    keyedStream.window(SlidingEventTimeWindows.of(Time.seconds(5),Time.seconds(1)))
    keyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(1)))

如果要禁用event time机制,则可以通过设置watermark生成评率间隔来实现

1
ExecutionConfig.setAutoWatermarkinterval(long);

watermark生成策略

1.12之前(已过期)

  • AssignerWithPeriodicWatermarks 周期性生成watermark
  • AssignerWithPunctuatedWatermarks 按指定标记性时间生成watermark

1.12之后,新版api内置的watermark策略

  • 单调递增的watermark生成策略 (完全不容忍乱序)
    czSNzw
    WatermarkStrategy.forMonotonousTimestamps();
  • 允许乱序的watermark生成策略
    W2gSLW
    WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10))
  • 自定义watermark生成策略
    WatermarkStrategy.forGenerator(new WatermarkGenerator(){...})

watermark策略的代码模板

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 1,e01,16876768678,pg01
DataStreamSource<String> s1 = env.socketTextStream( hostname: "localhost",port: 9999) ;

//给上面的source 算子,添加watermark生成策略

//策略1: WatermarkStrategy.nowatermarks() 不生成watermark,禁用了事件时间的推进机制
//策略2: WatermarkStrategy. forMonotonousTimestamps() 有序,紧跟最大事件事件
//策略3: WatermarkStrategy.forBoundedOutOfOrderness() 无序
//策略4: WatermarkStrategy.forGenerator() 自定义

s1.assignTimestampsAndWatermarks(
WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofMillis(0))
.withTimestampAssigner((element, timestamp) -> Long.parseLong(element.split(regex:",")[2]));
)

窗口的代码模板

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/**
* 一、各种全局窗口开窗api
*/

// 全局 计数滚动窗口
beanStream.countWindowAll(10) // 10条数据一个窗口
.apply(new AllWindowFunction<EventBean2, String, GlobalWindow>() {
@Override
public void apply(GlobalWindow window, Iterable<EventBean2> values, Collector<String> out) throws Exception {

}
});


// 全局 计数滑动窗口
beanStream.countWindowAll(10, 2); // 窗口长度为10条数据,滑动步长为2条数据
/*.apply()*/


// 全局 事件时间滚动窗口
beanStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(30))) // 窗口长度为30s的滚动窗口
.apply(new AllWindowFunction<EventBean2, String, TimeWindow>() {
@Override
public void apply(TimeWindow window, Iterable<EventBean2> values, Collector<String> out) throws Exception {

}
});


// 全局 事件时间滑动窗口
beanStream.windowAll(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10))); // 窗口长度:30s,滑动步长:10s
/*.apply()*/

// 全局 事件时间会话窗口
beanStream.windowAll(EventTimeSessionWindows.withGap(Time.seconds(30))); // 前后两个事件的间隙超过30s就划分窗口
/*.apply()*/

// 全局 处理时间滚动窗口
beanStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(30)));


// 全局 处理时间滑动窗口
beanStream.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(30), Time.seconds(10)));

// 全局 处理间会话窗口
beanStream.windowAll(ProcessingTimeSessionWindows.withGap(Time.seconds(30)));


/**
* 二、各种Keyed窗口开窗api
*/

KeyedStream<EventBean2, Long> keyedStream = beanStream.keyBy(EventBean2::getGuid);

// Keyed 计数滚动窗口
keyedStream.countWindow(10);


// Keyed 计数滑动窗口
keyedStream.countWindow(10, 2);


// Keyed 事件时间滚动窗口
keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(30)));


// Keyed 事件时间滑动窗口
keyedStream.window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10)));


// Keyed 事件时间会话窗口
keyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(30)));


// Keyed 处理时间滚动窗口
keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(30)));


// Keyed 处理时间滑动窗口
keyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(30), Time.seconds(10)));


// Keyed 处理时间会话窗口
keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(30)));