package cn.doitedu.flink.java.demos;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.*;
/**
 * @Author: deep as the sea
 * @Site: <a href="www.51doit.com">多易教育</a>
 * @QQ: 657270652
 * @Date: 2022/5/2
 * @Desc: Window API demos (sliding event-time windows with aggregate / reduce / sum / max / maxBy / apply / process)
 *
 * Test data:
 *   1,e01,10000,p01,10
 *   1,e02,11000,p02,20
 *   1,e02,12000,p03,40
 *   1,e03,20000,p02,10
 *   1,e01,21000,p03,50
 *   1,e04,22000,p04,10
 *   1,e06,28000,p05,60
 *   1,e07,30000,p02,10
 **/
public class _20_Window_Api_Demo1 {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        // input line format: guid,eventId,timestamp,pageId,actTimelong (e.g. 1,e01,10000,p01,10)
        DataStreamSource<String> source = env.socketTextStream("localhost", 9999);

        SingleOutputStreamOperator<EventBean2> beanStream = source.map(s -> {
            String[] split = s.split(",");
            return new EventBean2(Long.parseLong(split[0]), split[1], Long.parseLong(split[2]), split[3], Integer.parseInt(split[4]));
        }).returns(EventBean2.class);
        // Assign watermarks so that event time can advance
        // (0 ms of allowed out-of-orderness: the input is assumed to arrive in timestamp order)
        SingleOutputStreamOperator<EventBean2> watermarkedBeanStream = beanStream.assignTimestampsAndWatermarks(
                WatermarkStrategy.<EventBean2>forBoundedOutOfOrderness(Duration.ofMillis(0))
                        .withTimestampAssigner(new SerializableTimestampAssigner<EventBean2>() {
                            @Override
                            public long extractTimestamp(EventBean2 eventBean, long recordTimestamp) {
                                return eventBean.getTimeStamp();
                            }
                        })
        );
        /**
         * Rolling (incremental) aggregation API example
         * Requirement 1: every 10 s, for the last 30 s of data, count the number of behavior events per user
         * Implemented with the aggregate operator
         */
        SingleOutputStreamOperator<Integer> resultStream = watermarkedBeanStream
                .keyBy(EventBean2::getGuid)
                // param 1: window length; param 2: slide step
                .window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10)))
                // reduce: a rolling aggregation operator with one restriction -- the result type must be the same as the input type
                /*.reduce(new ReduceFunction<EventBean>() {
                    @Override
                    public EventBean reduce(EventBean value1, EventBean value2) throws Exception {
                        return null;
                    }
                })*/
                .aggregate(new AggregateFunction<EventBean2, Integer, Integer>() {
                    /**
                     * Initialize the accumulator
                     * @return the initial accumulator
                     */
                    @Override
                    public Integer createAccumulator() {
                        return 0;
                    }

                    /**
                     * Rolling aggregation logic (how to update the accumulator for each incoming record)
                     * @param value The value to add
                     * @param accumulator The accumulator to add the value to
                     * @return the updated accumulator
                     */
                    @Override
                    public Integer add(EventBean2 value, Integer accumulator) {
                        return accumulator + 1;
                    }

                    /**
                     * Compute the final window result from the accumulator
                     * @param accumulator The accumulator of the aggregation
                     * @return the window result
                     */
                    @Override
                    public Integer getResult(Integer accumulator) {
                        return accumulator;
                    }

                    /**
                     * In batch execution mode, partial accumulators from several upstream tasks may have to be
                     * merged downstream into a global aggregation, so the logic for merging two accumulators
                     * goes here. It normally does not need a real implementation in streaming mode
                     * (merging windows such as session windows are the exception).
                     * @param a An accumulator to merge
                     * @param b Another accumulator to merge
                     * @return the merged accumulator
                     */
                    @Override
                    public Integer merge(Integer a, Integer b) {
                        return a + b;
                    }
                });
        /*resultStream.print();*/
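
        /*
         * Not part of the original demo: a minimal sketch of how the same count-per-user requirement
         * could also be solved with reduce. Because reduce's result type must equal its input type,
         * the stream is first mapped to Tuple2<bean, 1> (the same trick as in the sum() exercise
         * further below) so the running count can be carried in field f1. The variable name
         * reduceCountStream is illustrative only.
         */
        SingleOutputStreamOperator<Tuple2<EventBean2, Integer>> reduceCountStream = watermarkedBeanStream
                .map(bean -> Tuple2.of(bean, 1))
                .returns(new TypeHint<Tuple2<EventBean2, Integer>>() {})
                .keyBy(tp -> tp.f0.getGuid())
                .window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10)))
                // keep the bean from the first element, add up the counts
                .reduce((tp1, tp2) -> Tuple2.of(tp1.f0, tp1.f1 + tp2.f1));
        /*reduceCountStream.print();*/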

        /**
         * Requirement 2: every 10 s, for the last 30 s of data, compute each user's average duration per behavior event
         * Required to use the aggregate operator
         * Rolling (incremental) aggregation API example
         */
        SingleOutputStreamOperator<Double> resultStream2 = watermarkedBeanStream
                .keyBy(EventBean2::getGuid)
                /*.window(TumblingProcessingTimeWindows.of(Time.seconds(30)))*/
                .window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10)))
                // type param 1: input type; type param 2: accumulator type; type param 3: final result type
                .aggregate(new AggregateFunction<EventBean2, Tuple2<Integer, Integer>, Double>() {
                    @Override
                    public Tuple2<Integer, Integer> createAccumulator() {
                        return Tuple2.of(0, 0);
                    }

                    @Override
                    public Tuple2<Integer, Integer> add(EventBean2 eventBean, Tuple2<Integer, Integer> accumulator) {
                        // accumulator.setField(accumulator.f0+1,0);
                        // accumulator.setField(accumulator.f1+eventBean.getActTimelong(),1);
                        // return accumulator;
                        return Tuple2.of(accumulator.f0 + 1, accumulator.f1 + eventBean.getActTimelong());
                    }

                    @Override
                    public Double getResult(Tuple2<Integer, Integer> accumulator) {
                        return accumulator.f1 / (double) accumulator.f0;
                    }

                    /**
                     * In batch execution mode, the upstream side of a shuffle can pre-aggregate locally and hand
                     * its partial results to the downstream side for a global aggregation, so the logic for
                     * merging two partial accumulators must be provided here.
                     *
                     * In streaming mode this local/global mechanism does not exist, so this method normally does
                     * not need a real implementation (merging windows such as session windows being the exception).
                     * @param a An accumulator to merge
                     * @param b Another accumulator to merge
                     * @return the merged accumulator
                     */
                    @Override
                    public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
                        return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
                    }
                });
        /*resultStream2.print();*/
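
        /*
         * Not part of the original demo: a minimal sketch showing how the same per-user average could be
         * emitted together with its key and window end time, by passing a ProcessWindowFunction as the
         * second argument of aggregate(). The AggregateFunction still does the incremental pre-aggregation;
         * the ProcessWindowFunction then receives only the single pre-aggregated value per window plus the
         * window metadata. The variable name avgWithWindowInfo is illustrative only.
         */
        SingleOutputStreamOperator<Tuple3<Long, Long, Double>> avgWithWindowInfo = watermarkedBeanStream
                .keyBy(EventBean2::getGuid)
                .window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10)))
                .aggregate(
                        new AggregateFunction<EventBean2, Tuple2<Integer, Integer>, Double>() {
                            @Override
                            public Tuple2<Integer, Integer> createAccumulator() {
                                return Tuple2.of(0, 0);
                            }

                            @Override
                            public Tuple2<Integer, Integer> add(EventBean2 bean, Tuple2<Integer, Integer> acc) {
                                return Tuple2.of(acc.f0 + 1, acc.f1 + bean.getActTimelong());
                            }

                            @Override
                            public Double getResult(Tuple2<Integer, Integer> acc) {
                                return acc.f1 / (double) acc.f0;
                            }

                            @Override
                            public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
                                return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
                            }
                        },
                        new ProcessWindowFunction<Double, Tuple3<Long, Long, Double>, Long, TimeWindow>() {
                            @Override
                            public void process(Long key, Context context, Iterable<Double> elements, Collector<Tuple3<Long, Long, Double>> out) {
                                // elements holds exactly one value here: the pre-aggregated average
                                out.collect(Tuple3.of(key, context.window().getEnd(), elements.iterator().next()));
                            }
                        });
        /*avgWithWindowInfo.print();*/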

        /**
         * TODO Supplementary exercise 1
         * Requirement: every 10 s, for the last 30 s of data, count the number of behavior events per user
         * Rolling aggregation API example
         * Implemented with the sum operator
         */
        watermarkedBeanStream
                .map(bean -> Tuple2.of(bean, 1)).returns(new TypeHint<Tuple2<EventBean2, Integer>>() {})
                .keyBy(tp -> tp.f0.getGuid())
                .window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10)))
                // elements are Tuple2<bean, 1>; summing field f1 yields the count
                .sum("f1")
                /*.print()*/;

        /**
         * TODO Supplementary exercise 2
         * Requirement: every 10 s, for the last 30 s of data, find each user's maximum behavior duration
         * Rolling aggregation API example
         * Implemented with the max operator (only the aggregated field of the output record is guaranteed to be meaningful)
         */
        watermarkedBeanStream
                .keyBy(EventBean2::getGuid)
                .window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10)))
                .max("actTimelong")
                /*.print()*/;

        /**
         * TODO Supplementary exercise 3
         * Requirement: every 10 s, for the last 30 s of data, find each user's maximum behavior duration together with the record it occurred in
         * Rolling aggregation API example
         * Implemented with the maxBy operator (returns the whole record holding the maximum)
         */
        watermarkedBeanStream
                .keyBy(EventBean2::getGuid)
                .window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10)))
                .maxBy("actTimelong")
                /*.print()*/;

        /**
         * TODO Supplementary exercise 4
         * Requirement: every 10 s, for the last 30 s of data, for each page find the top 2 event types by average duration, with their average durations
         * Implemented with the process operator
         */
        watermarkedBeanStream
                .keyBy(bean -> bean.getPageId())
                .window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10)))
                .process(new ProcessWindowFunction<EventBean2, Tuple3<String, String, Double>, String, TimeWindow>() {
                    @Override
                    public void process(String key, ProcessWindowFunction<EventBean2, Tuple3<String, String, Double>, String, TimeWindow>.Context context, Iterable<EventBean2> elements, Collector<Tuple3<String, String, Double>> out) throws Exception {
                        // Use a HashMap to record, per event type, the total occurrence count and the total behavior duration
                        HashMap<String, Tuple2<Integer, Long>> tmpMap = new HashMap<>();

                        // Iterate over every record in the window
                        for (EventBean2 element : elements) {
                            String eventId = element.getEventId();
                            Tuple2<Integer, Long> countAndTimelong = tmpMap.getOrDefault(eventId, Tuple2.of(0, 0L));
                            tmpMap.put(eventId, Tuple2.of(countAndTimelong.f0 + 1, countAndTimelong.f1 + element.getActTimelong()));
                        }

                        // Then compute the average duration of each event type from tmpMap
                        ArrayList<Tuple2<String, Double>> tmpList = new ArrayList<>();
                        for (Map.Entry<String, Tuple2<Integer, Long>> entry : tmpMap.entrySet()) {
                            String eventId = entry.getKey();
                            Tuple2<Integer, Long> tuple = entry.getValue();
                            double avgTimelong = tuple.f1 / (double) tuple.f0;
                            tmpList.add(Tuple2.of(eventId, avgTimelong));
                        }

                        // Sort tmpList by average duration, descending
                        Collections.sort(tmpList, new Comparator<Tuple2<String, Double>>() {
                            @Override
                            public int compare(Tuple2<String, Double> tp1, Tuple2<String, Double> tp2) {
                                /* return tp2.f1.compareTo(tp1.f1);*/
                                return Double.compare(tp2.f1, tp1.f1);
                            }
                        });

                        // Emit the top 2
                        for (int i = 0; i < Math.min(tmpList.size(), 2); i++) {
                            out.collect(Tuple3.of(key, tmpList.get(i).f0, tmpList.get(i).f1));
                        }
                    }
                })
                .print();

        /**
         * Full-window computation API example
         * Requirement 3: every 10 s, for the last 30 s of data, for each user output the 2 records with the longest behavior duration
         * Required to use the apply or the process operator
         */
        // 1. Implement the requirement with the apply operator
        SingleOutputStreamOperator<EventBean2> resultStream3 = watermarkedBeanStream.keyBy(EventBean2::getGuid)
                .window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10)))
                // type param 1: input type; type param 2: output type; type param 3: key type; type param 4: window type
                .apply(new WindowFunction<EventBean2, EventBean2, Long, TimeWindow>() {
                    /**
                     * @param key the key this window belongs to
                     * @param window metadata of the window being processed (e.g. its start and end time)
                     * @param input an iterable over all the data in this window
                     * @param out the collector for emitting results
                     * @throws Exception
                     */
                    @Override
                    public void apply(Long key, TimeWindow window, Iterable<EventBean2> input, Collector<EventBean2> out) throws Exception {
                        // Naive approach: copy the data from the iterable into an ArrayList, sort it, and emit the first 2 records
                        ArrayList<EventBean2> tmpList = new ArrayList<>();

                        // Copy the data into the list
                        for (EventBean2 eventBean2 : input) {
                            tmpList.add(eventBean2);
                        }
                        // Sort by behavior duration, descending
                        Collections.sort(tmpList, new Comparator<EventBean2>() {
                            @Override
                            public int compare(EventBean2 o1, EventBean2 o2) {
                                return Integer.compare(o2.getActTimelong(), o1.getActTimelong());
                            }
                        });

                        // Emit the top 2 records
                        for (int i = 0; i < Math.min(tmpList.size(), 2); i++) {
                            out.collect(tmpList.get(i));
                        }
                    }
                });
        /*resultStream3.print();*/

        // 2. Implement the requirement with the process operator
        SingleOutputStreamOperator<String> resultStream4 = watermarkedBeanStream.keyBy(EventBean2::getGuid)
                .window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10)))
                .process(new ProcessWindowFunction<EventBean2, String, Long, TimeWindow>() {
                    @Override
                    public void process(Long aLong, ProcessWindowFunction<EventBean2, String, Long, TimeWindow>.Context context, Iterable<EventBean2> input, Collector<String> out) throws Exception {
                        // Metadata of the window being processed
                        TimeWindow window = context.window();
                        // The largest timestamp this window allows: for a window [1000, 2000), 1999 is the max allowed timestamp and 2000 is the window end
                        long maxTimestamp = window.maxTimestamp();
                        long windowStart = window.getStart();
                        long windowEnd = window.getEnd();

                        // Naive approach: copy the data from the iterable into an ArrayList, sort it, and emit the first 2 records
                        ArrayList<EventBean2> tmpList = new ArrayList<>();

                        // Copy the data into the list
                        for (EventBean2 eventBean2 : input) {
                            tmpList.add(eventBean2);
                        }
                        // Sort by behavior duration, descending
                        Collections.sort(tmpList, new Comparator<EventBean2>() {
                            @Override
                            public int compare(EventBean2 o1, EventBean2 o2) {
                                return Integer.compare(o2.getActTimelong(), o1.getActTimelong());
                            }
                        });

                        // Emit the top 2 records, together with the window bounds
                        for (int i = 0; i < Math.min(tmpList.size(), 2); i++) {
                            EventBean2 bean = tmpList.get(i);
                            out.collect("window start:" + windowStart + "," + "window end:" + windowEnd + ","
                                    + bean.getGuid() + "," + bean.getEventId() + "," + bean.getTimeStamp() + ","
                                    + bean.getPageId() + "," + bean.getActTimelong());
                        }
                    }
                });
        /*resultStream4.print();*/

        env.execute();
    }
}
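
/*
 * For reference only (not part of the source file): a minimal sketch of what the EventBean2 POJO used
 * above is assumed to look like, inferred from how it is constructed and accessed in this demo.
 * Field names and types are assumptions; the real class lives in its own file in
 * cn.doitedu.flink.java.demos (and may generate its boilerplate with Lombok), so the sketch is kept
 * inside a comment to avoid a duplicate class definition.
 *
 * public class EventBean2 {
 *     private long guid;         // user id, e.g. 1
 *     private String eventId;    // event type, e.g. e01
 *     private long timeStamp;    // event time in epoch millis, e.g. 10000
 *     private String pageId;     // page the event happened on, e.g. p01
 *     private int actTimelong;   // duration of the action, e.g. 10
 *
 *     public EventBean2() { }
 *
 *     public EventBean2(long guid, String eventId, long timeStamp, String pageId, int actTimelong) {
 *         this.guid = guid;
 *         this.eventId = eventId;
 *         this.timeStamp = timeStamp;
 *         this.pageId = pageId;
 *         this.actTimelong = actTimelong;
 *     }
 *
 *     // getters and setters: getGuid(), getEventId(), getTimeStamp(), getPageId(), getActTimelong(), ...
 * }
 */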