Flink之DataStream API、Table API&SQL

rDT8Lc
Flink 根据使用的便捷性和表达能力的强弱提供了 3 层 API,由上到下,表达能力逐渐增强,比如 processFunction,是最底层的 API,表达能力最强,我们可以用他来操作 state 和 timer 等复杂功能。
Datastream API 相对于 processFunction 来说,又进行了进一步封装,提供了很多标准的语义算子给大家使用,比如我们常用的 window 算子(包括 Tumble, slide,session 等)。
最上面的 SQLTable API 使用最为便捷,具有自身的很多特点:
YZGCfu

我们从DataStream API、Table API、SQL依次看看使用方式

DataStream API

构成步骤

  • 一、获取执行环境(execution environment)
  • 二、读取数据源(source)
  • 三、定义给予数据的转换操作(transformations)
  • 四、定义计算结果的输出位置(sink)
  • 五、触发程序执行(execute)

示例1:不同的数据来源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package com.race.wc;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;

public class sourceTest {
public static void main(String[] args) throws Exception {

// 一、获取执行环境(execution environment)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

// 二、读取数据源(source)
// ①. 从文件中读取数据
DataStreamSource<String> stream1 = env.readTextFile("input/clicks.txt");

// ②. 从集合中读取数据
ArrayList<Integer> nums = new ArrayList<>();
nums.add(2);
nums.add(5);
DataStreamSource<Integer> numStream = env.fromCollection(nums);

ArrayList<Event> events = new ArrayList<>();
events.add(new Event("Marry", "./home", 1000L));
DataStreamSource<Event> stream2 = env.fromCollection(events);

// ③. 从元素读取数据
DataStreamSource<Event> stream3 = env.fromElements(
new Event("Marry", "./home", 1000L),
new Event("Marry", "./home", 1000L),
new Event("Marry", "./home", 1000L)
);

// ④. 从socket文本流获取数据 需打开nc -l 9999
DataStreamSource<String> socketStream = env.socketTextStream("localhost", 9999);

// ⑤. 从kafka读取数据
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("localhost:9092")
.setTopics("test")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();

DataStreamSource<String> kafkaStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

// 三、定义给予数据的转换操作(transformations)

// 四、定义计算结果的输出位置(sink)
stream1.print("1");
numStream.print("nums");
stream2.print("2");
stream3.print("3");
socketStream.print("socket");
kafkaStream.print("kafka");

// 五、触发程序执行(execute)
env.execute();
}
}

示例2:定义给予数据的转换操作(transformations)

1、演示 flatMap、keyBy、sum的转换操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.race.wc;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamWordCount {
public static void main(String[] args) throws Exception {
// 一、获取执行环境(execution environment)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 二、读取数据源(source)
DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("localhost", 9999)
// 三、定义给予数据的转换操作(transformations)
.flatMap(new Splitter())
.keyBy(value -> value.f0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.sum(1);

// 四、定义计算结果的输出位置(sink)
dataStream.print();

// 五、触发程序执行(execute)
env.execute("Window WordCount");
}

public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) {
for (String word: sentence.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
}

flatMap、keyBy都可以使用 Lambda 表达式,更直观简洁
注意 :使用 lambda表达式申明Java泛型时,需要显式声明类型信息 .returns(Types.TUPLE(Types.STRING, Types.LONG))

E4f7ct

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("localhost", 9999)
.flatMap((String sentence, Collector<Tuple2<String, Integer>> out)->{
for (String word: sentence.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
})
.returns(Types.TUPLE(Types.STRING, Types.LONG)) // // 显式提供类型信息
.keyBy(value -> value.f0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.sum(1);

dataStream.print();

env.execute("Window WordCount");
}

2、演示 max,maxBy,min,minby的转换操作

  • max返回最大值
    如果使用max,除了keyBy的字段和参与比较大小的字段,如果还有其他字段,为了返回结果对齐,返回的结果的其他字段就是第一次出现的字段
  • maxBy 把最大值对应的元素全部返回
    使用maxBy,除了keyBy的字段和参与比较大小的字段,如果还有其他字段,会返回最大值所在数据全部的数据,另外,maxBy有第二个的参数,用来确定当比较字段出现相同时,返回之前的还是现在的,默认返回之前的,设置为false则返回新的,设置为true返回之前的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.List;

public class TSource {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

List list = new ArrayList<Tuple3<Integer, Integer, String>>();
list.add(new Tuple3<>(0, 1, "a"));
list.add(new Tuple3<>(0, 3, "b"));
list.add(new Tuple3<>(0, 2, "c"));
list.add(new Tuple3<>(0, 4, "d"));
list.add(new Tuple3<>(1, 5, "a"));
list.add(new Tuple3<>(1, 2, "b"));
list.add(new Tuple3<>(1, 7, "c"));

DataStreamSource<Tuple3<Integer, Integer, String>> stringDataStreamSource = env.fromCollection(list);

KeyedStream<Tuple3<Integer, Integer, String>, Integer> result = stringDataStreamSource
.keyBy(0);

result.max(1).print("max最大值");
result.maxBy(1).print("maxBy元素");

// min,minBy同理

env.execute("测试");
}
}

原数据:

原数据:3> (0,1,a)
原数据:3> (0,3,b)
原数据:3> (0,2,c)
原数据:3> (0,4,d)
原数据:3> (1,5,a)
原数据:3> (1,2,b)
原数据:3> (1,7,c)


返回结果:
max最大值:3> (0,1,a)
max最大值:3> (0,3,a)
max最大值:3> (0,3,a)
max最大值:3> (0,4,a)
max最大值:3> (1,5,a)
max最大值:3> (1,5,a)
max最大值:3> (1,7,a)

maxBy元素:3> (0,1,a)
maxBy元素:3> (0,3,b)
maxBy元素:3> (0,3,b)
maxBy元素:3> (0,4,d)
maxBy元素:3> (1,5,a)
maxBy元素:3> (1,5,a)
maxBy元素:3> (1,7,c)

Table API & SQL

构成步骤

  • 一、创建表环境
  • 二、创建输入表 source
  • 三、创建输出表 sink
  • 四、查询转换得到一个新的表
  • 五、写入输出表

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 创建表环境
//方式1,使用setting的方式创建
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inStreamingMode() // 使用流处理模式
.build();

TableEnvironment tableEnv = TableEnvironment.create(settings);

// 方式2, 使用StreamExecutionEnvironment来构建table env
// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// 创建输入表,连接外部系统读取数据
tableEnv.executeSql("CREATE TEMPORARY TABLE inputTable ... WITH ( 'connector' = ... )");

// 注册一个表,连接到外部系统,用于输出
tableEnv.executeSql("CREATE TEMPORARY TABLE outputTable ... WITH ( 'connector' = ... )");

// 方式1、执行 SQL 对表进行查询转换,得到一个新的表
// Table table1 = tableEnv.sqlQuery("SELECT ... FROM inputTable... ");

// 方式2、使用 Table API 对表进行查询转换,得到一个新的表
Table table2 = tableEnv.from("inputTable").select(...);

// 将得到的结果写入输出表
TableResult tableResult = table1.executeInsert("outputTable");