package com.race.wc.exercise;
import org.apache.commons.lang3.RandomUtils; import org.apache.flink.api.common.state.BroadcastState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ReadOnlyBroadcastState; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream; import org.apache.flink.streaming.api.datastream.BroadcastStream; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag;
public class Exercise { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 创建流s1 DataStream<String> ds1 = env .socketTextStream("localhost", 9991); SingleOutputStreamOperator<EventCount> s1 = ds1.map(s -> { String[] arr = s.split(","); return new EventCount(Integer.parseInt(arr[0]), arr[1], Integer.parseInt(arr[2])); });
// 创建s2 DataStream<String> ds2 = env .socketTextStream("localhost", 9991); SingleOutputStreamOperator<UserInfo> s2 = ds2.map(s -> { String[] arr = s.split(","); return new UserInfo(Integer.parseInt(arr[0]), arr[1], arr[1]); }); // 将流1的数据展开 //比如,一条数据:1,event01,3需要展开成3条: //1,event01,随机数1 //1,event01,随机数2 //1,event01,随机数3 SingleOutputStreamOperator<EventCount> flattened = s1.process(new ProcessFunction<EventCount, EventCount>() { @Override public void processElement(EventCount eventCount, ProcessFunction<EventCount, EventCount>.Context context, Collector<EventCount> collector) { // 去除count int cnt = eventCount.getCnt(); for (int i = 1; i < cnt; i++) { collector.collect(new EventCount(eventCount.getId(), eventCount.getEventId(), RandomUtils.nextInt(10, 100))); } } }); // 广播流2 // 准备一个广播状态描述器 MapStateDescriptor<Integer, UserInfo> stateDescriptor = new MapStateDescriptor<>("s", Integer.class, UserInfo.class); // 准备一个测流输出标签 OutputTag<EventCount> cOutputTag = new OutputTag<>("c", TypeInformation.of(EventCount.class)); BroadcastStream<UserInfo> broadcast2 = s2.broadcast(stateDescriptor); // 连接s1和s2广播流 // 流1的数据,还需要关联上流2的数据(性别,城市) // 并且把关联失败的流1的数据,写入一个测流,否则输出到主流 BroadcastConnectedStream<EventCount, UserInfo> connectedStream = flattened.connect(broadcast2); SingleOutputStreamOperator<EventUserInfo> joinedResult = connectedStream.process(new BroadcastProcessFunction<EventCount, UserInfo, EventUserInfo>() { // 主流 @Override public void processElement(EventCount eventCount, BroadcastProcessFunction<EventCount, UserInfo, EventUserInfo>.ReadOnlyContext readOnlyContext, Collector<EventUserInfo> collector) throws Exception { ReadOnlyBroadcastState<Integer, UserInfo> broadcastState = readOnlyContext.getBroadcastState(stateDescriptor); UserInfo userInfo; if (broadcastState != null && (userInfo = broadcastState.get(eventCount.getId())) != null) { collector.collect(new EventUserInfo(eventCount.getId(), eventCount.getEventId(), eventCount.getCnt(), userInfo.getGender(), 
userInfo.getCity())); } else { // 关联失败的,输出测流 readOnlyContext.output(cOutputTag, eventCount); } } // 广播流处理方法 @Override public void processBroadcastElement(UserInfo userInfo, BroadcastProcessFunction<EventCount, UserInfo, EventUserInfo>.Context context, Collector<EventUserInfo> collector) throws Exception { // 数据放入广播状态 BroadcastState<Integer, UserInfo> broadcastState = context.getBroadcastState(stateDescriptor); broadcastState.put(userInfo.getId(), userInfo); } }); // 对主流数据按照性别分组,取最大随机数所在的那一条数据,作为结果输出 SingleOutputStreamOperator<EventUserInfo> mainResult = joinedResult .keyBy(EventUserInfo::getGender) .maxBy("cnt"); mainResult.print("main"); joinedResult.getSideOutput(cOutputTag).print("side"); env.execute();
} }
|