Flink之广播BroadcastStream

背景

当小变量、小数据集需要与大数据集或大数据流进行联合计算时,通常把小数据集广播到各个并行任务,让每个任务直接用完整的小数据集与自己负责的那部分大数据集(流)数据进行计算,最后再合并计算结果。这样效率更高,省去了分布式节点之间的数据传输及二次计算。

例如:在Flink的使用场景中,对外部配置文件、计算规则、维表等进行预加载并定期更新,或者在流式计算中广播小变量等。

场景预设

  • 流s1:用户行为日志(持续不断,同一个人会反复出现,次数不定)
  • 流s2:用户信息(姓名、年龄等信息,同一个用户的数据只出现一次,作为广播流)

示例代码如下:

package com.race.wc;

import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class broadcast {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

// * 流s1:用户行为日志(持续不断,同一个人会反复出现,次数不定)
// * 流s2:用户信息(姓名、年龄等信息,同一个数据只有一次,作为广播刘)

// s1,行为日志流 id,eventId
DataStream<String> stream1 = env
.socketTextStream("localhost", 9999);

SingleOutputStreamOperator<Tuple2<String, String>> s1 = stream1.map(s -> {
String[] arr = s.split(",");
return Tuple2.of(arr[0], arr[1]);
}).returns(new TypeHint<Tuple2<String, String>>() {
});

// s2,用户信息流 id,age,city
DataStream<String> stream2 = env
.socketTextStream("localhost", 9999);

SingleOutputStreamOperator<Tuple3<String, String, String>> s2 = stream2.map(s -> {
String[] arr = s.split(",");
return Tuple3.of(arr[0], arr[1], arr[2]);
}).returns(new TypeHint<Tuple3<String, String, String>>() {
});

// 将s2转换为广播流 MapStateDescriptor->hashMap

MapStateDescriptor<String, Tuple2<String, String>> userInfoStateDesc = new MapStateDescriptor<>("userInfoState", TypeInformation.of(String.class), TypeInformation.of(new TypeHint<Tuple2<String, String>>() {
}));

BroadcastStream<Tuple3<String, String, String>> s2BroadcastStream = s2.broadcast(userInfoStateDesc);

// s1连接s2广播流
BroadcastConnectedStream<Tuple2<String, String>, Tuple3<String, String, String>> connect = s1.connect(s2BroadcastStream);

// s1 是否keyedStream 决定用 KeyedBroadcastProcessFunction 还是 BroadcastProcessFunction
// 左流、右流的处理
SingleOutputStreamOperator<String> resultStream = connect.process(new BroadcastProcessFunction<Tuple2<String, String>, Tuple3<String, String, String>, String>() {


/**
* 方法:s1流每来一条处理一次
* @param element 左流数据
* @param readOnlyContext 上下文,只读形式不能做修改,安全
* @param collector 输出器
*/
@Override
public void processElement(Tuple2<String, String> element, BroadcastProcessFunction<Tuple2<String, String>, Tuple3<String, String, String>, String>.ReadOnlyContext readOnlyContext, Collector<String> collector) throws Exception {

// 取到广播状态,只读
ReadOnlyBroadcastState<String, Tuple2<String, String>> broadcastState = readOnlyContext.getBroadcastState(userInfoStateDesc);

if (broadcastState != null) {
// 获取广播状态的用户信息
Tuple2<String, String> userInfo = broadcastState.get(element.f0);
// 返回组装数据
collector.collect(element.f0 + "," + element.f1 + "," + (userInfo == null ? null : userInfo.f0) + "," + (userInfo == null ? null : userInfo.f1));
} else {
collector.collect(element.f0 + "," + element.f1 + "," + null + "," + null);
}
}

/**
* 方法:s2广播流处理的一条数据
* @param element 左流数据
* @param context 上下文
* @param collector 输出器
*/
@Override
public void processBroadcastElement(Tuple3<String, String, String> element, BroadcastProcessFunction<Tuple2<String, String>, Tuple3<String, String, String>, String>.Context context, Collector<String> collector) throws Exception {

// 从上下文中,获取广播状态
BroadcastState<String, Tuple2<String, String>> broadcastState = context.getBroadcastState(userInfoStateDesc);
// 然后将获得的数据 装入广播状态
broadcastState.put(element.f0, Tuple2.of(element.f1, element.f2));

}
});

resultStream.print();

env.execute();
}
}

总结