Flink窗口之增量(滚动)聚合算子与全量聚合算子

  • 增量(滚动)聚合算子:如min、max、minBy、maxBy、sum、reduce、aggregate
    一次取一条数据,用聚合函数对中间累加器更新;窗口触发时,取累加器输出结果;
    优点:实时性提升,性能比较好,数据一进入窗口就计算,仅仅缓存计算中间值
    缺点:窗口数据无法排序,无法获取窗口信息
  • 全量聚合蒜子:如apply、process
    数据”攒”在状态容器中,窗口触发时,把整个窗口的数据交给聚合函数
    优点:对窗口中所有数据排序、获取窗口信息,比如时间窗口,获取窗口开始时间和结束时间
    缺点:窗口数据量较大时,数据可能很多,一起处理,比较耗时

pq0peM

增量聚合示例

简单聚合算子

1
2
3
4
5
6
keyedStream.countWindow(5,2)
// .max('score') // 得到的结果中,除了score是符合逻辑的结果外,其他字段是窗口中的第一条的值
// .min('score')
// .maxBy('score') // 得到结果是:最大score所在的那一行数据
// .minBy('score') // 得到结果是:最小score所在的那一行数据
// .sum('score') // 得到的结果中,除了score是符合逻辑(score之和)的结果外,其他字段是不可预料的,一直在更新

reduce 聚合算子

1
2
3
4
5
6
7
source.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.reduce(new ReduceFunction<long>(){
@override
public Long reduce(Long value1,Long value2) throws exception{
return null
}
})

aggregate 聚合算子

1
2
3
4
watermarkedBeanStream
.keyBy(EventBean::getGuid)
.window(SlidingEventTimeWindows.of(Time.seconds(30),Time.seconds(10)))
.aggregate(new AggregateFunction......)

全量聚合示例

apply 聚合算子

1
2
3
4
watermarkedBeanStream
.keyBy(EventBean::getGuid)
.window(SlidingEventTimeWindows.of(Time.seconds(30),Time.seconds(10)))
.appply(new WindowFunction...)

process 聚合算子(richFunction)

1
2
3
4
watermarkedBeanStream
.keyBy(EventBean::getGuid)
.window(SlidingEventTimeWindows.of(Time.seconds(30),Time.seconds(10)))
.process(new WindowFunction...)

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
package cn.doitedu.flink.java.demos;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.*;

/**
* @Author: deep as the sea
* @Site: <a href="www.51doit.com">多易教育</a>
* @QQ: 657270652
* @Date: 2022/5/2
* @Desc:
*
* 测试数据 :
* 1,e01,10000,p01,10
* 1,e02,11000,p02,20
* 1,e02,12000,p03,40
* 1,e03,20000,p02,10
* 1,e01,21000,p03,50
* 1,e04,22000,p04,10
* 1,e06,28000,p05,60
* 1,e07,30000,p02,10
**/
public class _20_Window_Api_Demo1 {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

// 1,e01,3000,pg02
DataStreamSource<String> source = env.socketTextStream("localhost", 9999);

SingleOutputStreamOperator<EventBean2> beanStream = source.map(s -> {
String[] split = s.split(",");
return new EventBean2(Long.parseLong(split[0]), split[1], Long.parseLong(split[2]), split[3], Integer.parseInt(split[4]));
}).returns(EventBean2.class);


// 分配 watermark ,以推进事件时间
SingleOutputStreamOperator<EventBean2> watermarkedBeanStream = beanStream.assignTimestampsAndWatermarks(
WatermarkStrategy.<EventBean2>forBoundedOutOfOrderness(Duration.ofMillis(0))
.withTimestampAssigner(new SerializableTimestampAssigner<EventBean2>() {
@Override
public long extractTimestamp(EventBean2 eventBean, long recordTimestamp) {
return eventBean.getTimeStamp();
}
})
);

/**
* 滚动聚合api使用示例
* 需求 一 : 每隔10s,统计最近 30s 的数据中,每个用户的行为事件条数
* 使用aggregate算子来实现
*/
SingleOutputStreamOperator<Integer> resultStream = watermarkedBeanStream
.keyBy(EventBean2::getGuid)
// 参数1: 窗口长度 ; 参数2:滑动步长
.window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10)))
// reduce :滚动聚合算子,它有个限制 ,聚合结果的数据类型 与 数据源中的数据类型 ,是一致
/*.reduce(new ReduceFunction<EventBean>() {
@Override
public EventBean reduce(EventBean value1, EventBean value2) throws Exception {
return null;
}
})*/
.aggregate(new AggregateFunction<EventBean2, Integer, Integer>() {
/**
* 初始化累加器
* @return
*/
@Override
public Integer createAccumulator() {
return 0;
}

/**
* 滚动聚合的逻辑(拿到一条数据,如何去更新累加器)
* @param value The value to add
* @param accumulator The accumulator to add the value to
* @return
*/
@Override
public Integer add(EventBean2 value, Integer accumulator) {
return accumulator + 1;
}

/**
* 从累加器中,计算出最终要输出的窗口结算结果
* @param accumulator The accumulator of the aggregation
* @return
*/
@Override
public Integer getResult(Integer accumulator) {
return accumulator;
}

/**
* 批计算模式下,可能需要将多个上游的局部聚合累加器,放在下游进行全局聚合
* 因为需要对两个累加器进行合并
* 这里就是合并的逻辑
* 流计算模式下,不用实现!
* @param a An accumulator to merge
* @param b Another accumulator to merge
* @return
*/
@Override
public Integer merge(Integer a, Integer b) {
return a + b;
}
});
/*resultStream.print();*/


/**
* 需求 二 : 每隔10s,统计最近 30s 的数据中,每个用户的平均每次行为时长
* 要求用 aggregate 算子来做聚合
* 滚动聚合api使用示例
*/
SingleOutputStreamOperator<Double> resultStream2 = watermarkedBeanStream
.keyBy(EventBean2::getGuid)
/*.window(TumblingProcessingTimeWindows.of(Time.seconds(30)))*/
.window(SlidingEventTimeWindows.of(Time.seconds(30), Time.milliseconds(10)))
// 泛型1: 输入的数据的类型 ; 泛型2: 累加器的数据类型 ; 泛型3: 最终结果的类型
.aggregate(new AggregateFunction<EventBean2, Tuple2<Integer, Integer>, Double>() {
@Override
public Tuple2<Integer, Integer> createAccumulator() {
return Tuple2.of(0, 0);
}

@Override
public Tuple2<Integer, Integer> add(EventBean2 eventBean, Tuple2<Integer, Integer> accumulator) {
// accumulator.setField(accumulator.f0+1,0);
// accumulator.setField(accumulator.f1+eventBean.getActTimelong(),1);
// return accumulator;

return Tuple2.of(accumulator.f0 + 1, accumulator.f1 + eventBean.getActTimelong());
}

@Override
public Double getResult(Tuple2<Integer, Integer> accumulator) {

return accumulator.f1 / (double) accumulator.f0;
}

/**
* 在批计算模式中,shuffle的上游可以做局部聚合,然后会把局部聚合结果交给下游去做全局聚合
* 因此,就需要提供 两个局部聚合结果进行合并的逻辑
*
* 在流式计算中,不存在这种 上游局部聚合和交给下游全局聚合的机制!
* 所以,在流式计算模式下,不用实现下面的方法
* @param a An accumulator to merge
* @param b Another accumulator to merge
* @return
*/
@Override
public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
}
});
/*resultStream2.print();*/


/**
* TODO 补充练习 1
* 需求 一 : 每隔10s,统计最近 30s 的数据中,每个用户的行为事件条数
* 滚动聚合api使用示例
* 使用sum算子来实现
*/
watermarkedBeanStream
.map(bean->Tuple2.of(bean,1)).returns(new TypeHint<Tuple2<EventBean2, Integer>>() {})
.keyBy(tp->tp.f0.getGuid())
.window(SlidingEventTimeWindows.of(Time.seconds(30),Time.seconds(10)))
// 数据: Tuple2<Bean,1>
.sum("f1")
/*.print()*/;


/**
* TODO 补充练习 2
* 需求 一 : 每隔10s,统计最近 30s 的数据中,每个用户的最大行为时长
* 滚动聚合api使用示例
* 用max算子来实现
*/
watermarkedBeanStream
.keyBy(EventBean2::getGuid)
.window(SlidingEventTimeWindows.of(Time.seconds(30),Time.seconds(10)))
.max("actTimelong")
/*.print()*/;



/**
* TODO 补充练习 3
* 需求 一 : 每隔10s,统计最近 30s 的数据中,每个用户的最大行为时长及其所在的那条行为记录
* 滚动聚合api使用示例
* 用maxBy算子来实现
*/
watermarkedBeanStream
.keyBy(EventBean2::getGuid)
.window(SlidingEventTimeWindows.of(Time.seconds(30),Time.seconds(10)))
.maxBy("actTimelong")
/*.print()*/;



/**
* TODO 补充练习 4
* 需求 一 : 每隔10s,统计最近 30s 的数据中,每个页面上发生的行为中,平均时长最大的前2种事件及其平均时长
* 用 process算子来实现
*/
watermarkedBeanStream
.keyBy(bean->bean.getPageId())
.window(SlidingEventTimeWindows.of(Time.seconds(30),Time.seconds(10)))
.process(new ProcessWindowFunction<EventBean2, Tuple3<String,String,Double>, String, TimeWindow>() {
@Override
public void process(String key, ProcessWindowFunction<EventBean2, Tuple3<String,String,Double>, String, TimeWindow>.Context context, Iterable<EventBean2> elements, Collector<Tuple3<String,String,Double>> out) throws Exception {
// 构造一个hashmap来记录每一个事件的发生总次数,和行为总时长
HashMap<String, Tuple2<Integer, Long>> tmpMap = new HashMap<>();

// 遍历窗口中的每一条数据
for (EventBean2 element : elements) {
String eventId = element.getEventId();
Tuple2<Integer, Long> countAndTimelong = tmpMap.getOrDefault(eventId,Tuple2.of(0,0L));

tmpMap.put(eventId,Tuple2.of(countAndTimelong.f0+1,countAndTimelong.f1+element.getActTimelong()) );
}

// 然后,从tmpMap中,取到 平均时长 最大的前两个事件
ArrayList<Tuple2<String, Double>> tmpList = new ArrayList<>();
for (Map.Entry<String, Tuple2<Integer, Long>> entry : tmpMap.entrySet()) {
String eventId = entry.getKey();
Tuple2<Integer, Long> tuple = entry.getValue();
double avgTimelong = tuple.f1/ (double)tuple.f0;
tmpList.add(Tuple2.of(eventId,avgTimelong));
}

// 然后对tmpList按平均时长排序
Collections.sort(tmpList, new Comparator<Tuple2<String, Double>>() {
@Override
public int compare(Tuple2<String, Double> tp1, Tuple2<String, Double> tp2) {
/* return tp2.f1.compareTo(tp1.f1);*/
return Double.compare(tp2.f1,tp1.f1);
}
});

// 输出前2个
for(int i=0;i<Math.min(tmpList.size(),2);i++){
out.collect(Tuple3.of(key,tmpList.get(i).f0,tmpList.get(i).f1));
}
}
})
.print();



/**
* 全窗口计算api使用示例
* 需求 三 : 每隔10s,统计最近 30s 的数据中,每个用户的行为事件中,行为时长最长的前2条记录
* 要求用 apply 或者 process 算子来实现
*
*/
// 1. 用apply算子来实现需求
SingleOutputStreamOperator<EventBean2> resultStream3 = watermarkedBeanStream.keyBy(EventBean2::getGuid)
.window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10)))

// 泛型1: 输入数据类型; 泛型2:输出结果类型; 泛型3: key的类型, 泛型4:窗口类型
.apply(new WindowFunction<EventBean2, EventBean2, Long, TimeWindow>() {
/**
*
* @param key 本次传给咱们的窗口是属于哪个key的
* @param window 本次传给咱们的窗口的各种元信息(比如本窗口的起始时间,结束时间)
* @param input 本次传给咱们的窗口中所有数据的迭代器
* @param out 结果数据输出器
* @throws Exception
*/
@Override
public void apply(Long key, TimeWindow window, Iterable<EventBean2> input, Collector<EventBean2> out) throws Exception {

// low bi写法: 从迭代器中迭代出数据,放入一个arraylist,然后排序,输出前2条
ArrayList<EventBean2> tmpList = new ArrayList<>();

// 迭代数据,存入list
for (EventBean2 eventBean2 : input) {
tmpList.add(eventBean2);
}
// 排序
Collections.sort(tmpList, new Comparator<EventBean2>() {
@Override
public int compare(EventBean2 o1, EventBean2 o2) {
return o2.getActTimelong() - o1.getActTimelong();
}
});

// 输出前2条
for (int i = 0; i < Math.min(tmpList.size(), 2); i++) {
out.collect(tmpList.get(i));
}

}
});
/*resultStream3.print();*/


// 2. 用process算子来实现需求
SingleOutputStreamOperator<String> resultStream4 = watermarkedBeanStream.keyBy(EventBean2::getGuid)
.window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10)))
.process(new ProcessWindowFunction<EventBean2, String, Long, TimeWindow>() {
@Override
public void process(Long aLong, ProcessWindowFunction<EventBean2, String, Long, TimeWindow>.Context context, Iterable<EventBean2> input, Collector<String> out) throws Exception {

// 本次窗口的元信息
TimeWindow window = context.window();
long maxTimestamp = window.maxTimestamp();// 本窗口允许的最大时间戳 [1000,2000) ,其中 1999就是允许的最大时间戳; 2000就是窗口的end
long windowStart = window.getStart();
long windowEnd = window.getEnd();


// low bi写法: 从迭代器中迭代出数据,放入一个arraylist,然后排序,输出前2条
ArrayList<EventBean2> tmpList = new ArrayList<>();

// 迭代数据,存入list
for (EventBean2 eventBean2 : input) {
tmpList.add(eventBean2);
}
// 排序
Collections.sort(tmpList, new Comparator<EventBean2>() {
@Override
public int compare(EventBean2 o1, EventBean2 o2) {
return o2.getActTimelong() - o1.getActTimelong();
}
});

// 输出前2条
for (int i = 0; i < Math.min(tmpList.size(), 2); i++) {
EventBean2 bean = tmpList.get(i);
out.collect( "窗口start:"+windowStart + "," +"窗口end:"+ windowEnd + "," + bean.getGuid() + "," + bean.getEventId() + "," + bean.getTimeStamp() + "," +bean.getPageId() + "," +bean.getActTimelong());
}
}

});
/*resultStream4.print();*/
env.execute();
}
}