package com.race.wc;

import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
public class broadcast {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // * Stream s1: user behavior log (continuous; the same user shows up repeatedly, an arbitrary number of times)
        // * Stream s2: user info (name, age, etc.; each record arrives only once and is used as the broadcast stream)
        // s1: behavior log stream, format: id,eventId
        DataStream<String> stream1 = env
                .socketTextStream("localhost", 9999);

        SingleOutputStreamOperator<Tuple2<String, String>> s1 = stream1.map(s -> {
            String[] arr = s.split(",");
            return Tuple2.of(arr[0], arr[1]);
        }).returns(new TypeHint<Tuple2<String, String>>() {
        });

        // s2: user info stream, format: id,age,city
        // (note: this demo reads the same localhost:9999 socket as s1; in practice the two
        // streams would normally come from different ports or sources)
        DataStream<String> stream2 = env
                .socketTextStream("localhost", 9999);

        SingleOutputStreamOperator<Tuple3<String, String, String>> s2 = stream2.map(s -> {
            String[] arr = s.split(",");
            return Tuple3.of(arr[0], arr[1], arr[2]);
        }).returns(new TypeHint<Tuple3<String, String, String>>() {
        });
        // Turn s2 into a broadcast stream; the MapStateDescriptor describes the broadcast state (essentially a hash map)
        MapStateDescriptor<String, Tuple2<String, String>> userInfoStateDesc = new MapStateDescriptor<>(
                "userInfoState",
                TypeInformation.of(String.class),
                TypeInformation.of(new TypeHint<Tuple2<String, String>>() {
                }));
        BroadcastStream<Tuple3<String, String, String>> s2BroadcastStream = s2.broadcast(userInfoStateDesc);
        // Connect s1 with the broadcast stream built from s2
        BroadcastConnectedStream<Tuple2<String, String>, Tuple3<String, String, String>> connect =
                s1.connect(s2BroadcastStream);

        // Whether s1 is a KeyedStream decides between KeyedBroadcastProcessFunction and BroadcastProcessFunction
        // (a keyed variant is sketched after this class); process() handles the left and right streams separately
        SingleOutputStreamOperator<String> resultStream = connect.process(new BroadcastProcessFunction<Tuple2<String, String>, Tuple3<String, String, String>, String>() {
            /**
             * Called once for every element of s1 (the non-broadcast side).
             * @param element         left-stream (s1) element
             * @param readOnlyContext context; the broadcast state is exposed read-only here and cannot be modified, which is safe
             * @param collector       output collector
             */
            @Override
            public void processElement(Tuple2<String, String> element,
                                       BroadcastProcessFunction<Tuple2<String, String>, Tuple3<String, String, String>, String>.ReadOnlyContext readOnlyContext,
                                       Collector<String> collector) throws Exception {
                // Get the broadcast state (read-only)
                ReadOnlyBroadcastState<String, Tuple2<String, String>> broadcastState =
                        readOnlyContext.getBroadcastState(userInfoStateDesc);

                if (broadcastState != null) {
                    // Look up the user info for this id in the broadcast state
                    Tuple2<String, String> userInfo = broadcastState.get(element.f0);
                    // Emit the joined record (user info may still be null if it has not been broadcast yet)
                    collector.collect(element.f0 + "," + element.f1 + ","
                            + (userInfo == null ? null : userInfo.f0) + ","
                            + (userInfo == null ? null : userInfo.f1));
                } else {
                    collector.collect(element.f0 + "," + element.f1 + "," + null + "," + null);
                }
            }
            /**
             * Called once for every element of the broadcast stream s2.
             * @param element   broadcast-stream (s2) element
             * @param context   context; gives read-write access to the broadcast state
             * @param collector output collector
             */
            @Override
            public void processBroadcastElement(Tuple3<String, String, String> element,
                                                BroadcastProcessFunction<Tuple2<String, String>, Tuple3<String, String, String>, String>.Context context,
                                                Collector<String> collector) throws Exception {
                // Get the broadcast state from the context
                BroadcastState<String, Tuple2<String, String>> broadcastState =
                        context.getBroadcastState(userInfoStateDesc);
                // Put the received user info into the broadcast state, keyed by user id
                broadcastState.put(element.f0, Tuple2.of(element.f1, element.f2));
            }
        });
        resultStream.print();

        env.execute();
    }
}
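A quick way to exercise the job (an assumption about how it is run, not part of the listing above): both socketTextStream calls point at localhost:9999, so for a clean test the user-info stream is usually moved to its own port (for example 9998) and each port is fed with netcat, e.g. nc -lk 9999. Sending the user-info line 1,25,beijing first and then the behavior line 1,click produces 1,click,25,beijing; a behavior event whose id has not been broadcast yet, say 2,view, comes out as 2,view,null,null.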
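As noted in the comment before connect.process(...), a KeyedBroadcastProcessFunction is used instead when the non-broadcast side has been keyed, e.g. s1.keyBy(t -> t.f0). Below is a minimal sketch of that variant; it is an illustration, not part of the original job, and the class name KeyedBroadcastSketch, the joinFunction() helper, and the reuse of the same "userInfoState" descriptor are assumptions.

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical sketch: the keyed counterpart of the broadcast join above.
public class KeyedBroadcastSketch {

    // Same descriptor shape as userInfoStateDesc in the class above.
    static final MapStateDescriptor<String, Tuple2<String, String>> USER_INFO_DESC =
            new MapStateDescriptor<>(
                    "userInfoState",
                    TypeInformation.of(String.class),
                    TypeInformation.of(new TypeHint<Tuple2<String, String>>() {
                    }));

    // With a keyed left side, connect(...).process(...) takes a
    // KeyedBroadcastProcessFunction<KEY, LEFT, BROADCAST, OUT> instead of a BroadcastProcessFunction.
    static KeyedBroadcastProcessFunction<String, Tuple2<String, String>, Tuple3<String, String, String>, String> joinFunction() {
        return new KeyedBroadcastProcessFunction<String, Tuple2<String, String>, Tuple3<String, String, String>, String>() {

            @Override
            public void processElement(Tuple2<String, String> element,
                                       ReadOnlyContext readOnlyContext,
                                       Collector<String> collector) throws Exception {
                // Read-only lookup in the broadcast state, same as in the non-keyed version;
                // the keyed variant additionally allows per-key state and timers here.
                Tuple2<String, String> userInfo =
                        readOnlyContext.getBroadcastState(USER_INFO_DESC).get(element.f0);
                collector.collect(element.f0 + "," + element.f1 + ","
                        + (userInfo == null ? null : userInfo.f0) + ","
                        + (userInfo == null ? null : userInfo.f1));
            }

            @Override
            public void processBroadcastElement(Tuple3<String, String, String> element,
                                                Context context,
                                                Collector<String> collector) throws Exception {
                // Read-write access to the broadcast state, same as in the non-keyed version.
                context.getBroadcastState(USER_INFO_DESC).put(element.f0, Tuple2.of(element.f1, element.f2));
            }
        };
    }
}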