Flink之process、广播、主流、侧流、分组、分组最大值、sink实践

需求

  • 流1 eventCnt

    id eventId cnt
    1 event01 3
    1 event02 2
    2 event02 4
  • 流2 userInfo

    id gender city
    1 male shanghai
    2 female beijing
  1. 将流1的数据展开
    比如,一条数据:1,event01,3需要展开成3条:
    • 1,event01,随机数1
    • 1,event01,随机数2
    • 1,event01,随机数3
  2. 流1的数据,还需要关联上流2的数据(性别,城市)
    • 并且把关联失败的流1的数据写入一个侧流,关联成功的数据输出到主流
  3. 对主流数据按照性别分组,取最大随机数所在的那一条数据,作为结果输出
  4. 把侧流处理结果写入文件系统,并写成parquet格式
  5. 把主流处理结果,写入mysql,并实现幂等更新

实践

package com.race.wc.exercise;

import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * Flink exercise using process functions, broadcast state, side outputs, keyBy and maxBy.
 *
 * <p>Pipeline:
 * <ol>
 *   <li>Read event-count lines {@code id,eventId,cnt} (e.g. {@code 1,event01,3}) and expand each
 *       one into {@code cnt} records whose {@code cnt} field holds a random number.</li>
 *   <li>Read user lines {@code id,gender,city} (e.g. {@code 1,male,shanghai}), broadcast them, and
 *       enrich every expanded event with the user's gender and city. Events with no matching user
 *       in the broadcast state go to a side output.</li>
 *   <li>Group the enriched events by gender and keep the record with the largest random value.</li>
 * </ol>
 *
 * <p>Usage: {@code Exercise [host] [eventPort] [userPort]}; defaults are
 * {@code localhost 9991 9992}.
 *
 * <p>NOTE: the Parquet file sink and the idempotent MySQL sink described in the task are not
 * implemented here; both the main result and the side output are only printed.
 */
public class Exercise {
    private static final String DEFAULT_HOST = "localhost";
    private static final int DEFAULT_EVENT_PORT = 9991;
    /** Must differ from the event port: each socket source needs its own server. */
    private static final int DEFAULT_USER_PORT = 9992;

    public static void main(String[] args) throws Exception {
        String host = args.length > 0 ? args[0] : DEFAULT_HOST;
        int eventPort = args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_EVENT_PORT;
        int userPort = args.length > 2 ? Integer.parseInt(args[2]) : DEFAULT_USER_PORT;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Stream 1: id,eventId,cnt
        DataStream<String> ds1 = env.socketTextStream(host, eventPort);
        SingleOutputStreamOperator<EventCount> s1 = ds1.map(line -> parseEventCount(line));

        // Stream 2: id,gender,city (read from its own port, not the event port)
        DataStream<String> ds2 = env.socketTextStream(host, userPort);
        SingleOutputStreamOperator<UserInfo> s2 = ds2.map(line -> parseUserInfo(line));

        // Expand each stream-1 record into cnt records carrying a random number in the cnt field,
        // e.g. 1,event01,3 -> 1,event01,r1 / 1,event01,r2 / 1,event01,r3.
        SingleOutputStreamOperator<EventCount> flattened = s1.process(new ProcessFunction<EventCount, EventCount>() {
            @Override
            public void processElement(EventCount eventCount,
                                       ProcessFunction<EventCount, EventCount>.Context context,
                                       Collector<EventCount> collector) {
                // Take out the count; i starts at 0 so exactly cnt records are emitted.
                int cnt = eventCount.getCnt();
                for (int i = 0; i < cnt; i++) {
                    collector.collect(new EventCount(eventCount.getId(), eventCount.getEventId(),
                            RandomUtils.nextInt(10, 100)));
                }
            }
        });

        // Broadcast state keyed by user id, filled from stream 2.
        MapStateDescriptor<Integer, UserInfo> stateDescriptor =
                new MapStateDescriptor<>("s", Integer.class, UserInfo.class);
        // Side output for events whose user id could not be joined.
        OutputTag<EventCount> cOutputTag = new OutputTag<>("c", TypeInformation.of(EventCount.class));
        BroadcastStream<UserInfo> broadcast2 = s2.broadcast(stateDescriptor);

        // Join the expanded stream 1 with the broadcast user info (gender, city);
        // unmatched events go to the side output, matched ones to the main output.
        BroadcastConnectedStream<EventCount, UserInfo> connectedStream = flattened.connect(broadcast2);
        SingleOutputStreamOperator<EventUserInfo> joinedResult = connectedStream.process(
                new BroadcastProcessFunction<EventCount, UserInfo, EventUserInfo>() {
                    // Main (non-broadcast) input: look up the user and enrich, or divert to the side output.
                    @Override
                    public void processElement(EventCount eventCount,
                                               BroadcastProcessFunction<EventCount, UserInfo, EventUserInfo>.ReadOnlyContext readOnlyContext,
                                               Collector<EventUserInfo> collector) throws Exception {
                        ReadOnlyBroadcastState<Integer, UserInfo> broadcastState =
                                readOnlyContext.getBroadcastState(stateDescriptor);
                        UserInfo userInfo = broadcastState.get(eventCount.getId());
                        if (userInfo != null) {
                            collector.collect(new EventUserInfo(eventCount.getId(), eventCount.getEventId(),
                                    eventCount.getCnt(), userInfo.getGender(), userInfo.getCity()));
                        } else {
                            // No user record in the broadcast state (yet): emit to the side output.
                            readOnlyContext.output(cOutputTag, eventCount);
                        }
                    }

                    // Broadcast input: store/overwrite the user record under its id.
                    @Override
                    public void processBroadcastElement(UserInfo userInfo,
                                                        BroadcastProcessFunction<EventCount, UserInfo, EventUserInfo>.Context context,
                                                        Collector<EventUserInfo> collector) throws Exception {
                        BroadcastState<Integer, UserInfo> broadcastState = context.getBroadcastState(stateDescriptor);
                        broadcastState.put(userInfo.getId(), userInfo);
                    }
                });

        // Group the main output by gender and keep the whole record with the largest random value.
        SingleOutputStreamOperator<EventUserInfo> mainResult = joinedResult
                .keyBy(EventUserInfo::getGender)
                .maxBy("cnt");

        mainResult.print("main");
        joinedResult.getSideOutput(cOutputTag).print("side");
        env.execute();
    }

    /** Parses an {@code id,eventId,cnt} line into an {@link EventCount}. */
    private static EventCount parseEventCount(String line) {
        String[] arr = splitFields(line, 3);
        return new EventCount(Integer.parseInt(arr[0]), arr[1], Integer.parseInt(arr[2]));
    }

    /** Parses an {@code id,gender,city} line into a {@link UserInfo}. */
    private static UserInfo parseUserInfo(String line) {
        String[] arr = splitFields(line, 3);
        return new UserInfo(Integer.parseInt(arr[0]), arr[1], arr[2]);
    }

    /**
     * Splits a comma-separated line and trims each field.
     *
     * @throws IllegalArgumentException if the line has fewer than {@code expected} fields
     */
    private static String[] splitFields(String line, int expected) {
        String[] fields = line.split(",");
        if (fields.length < expected) {
            throw new IllegalArgumentException(
                    "Expected " + expected + " comma-separated fields but got: " + line);
        }
        for (int i = 0; i < fields.length; i++) {
            fields[i] = fields[i].trim();
        }
        return fields;
    }
}