Flink Table Store in practice: lookup join against a dimension table

Background

Note: this approach still needs to be verified.

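The business needs to widen a table that contains product IDs: enrich each row with more product attributes and write the result to Elasticsearch (ES) so that upstream and downstream systems can query it. We ran into the following problems:

  • 1. The product table serves as a dimension table, but it holds more than 2 million rows, which puts pressure on storage and compute (memory).

  • 2. The product table lives in MongoDB, and no source connector in the ecosystem supports lookup join against it.

  • 3. We found for the first time that a MongoDB temporal join only supports the primary key _id.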

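Until now we mostly used MySQL CDC and paid little attention to MongoDB, but these problems made me look, for the first time, at how well MongoDB is supported in the Flink ecosystem. So we take a different approach and introduce Flink Table Store as a lakehouse layer to address these problems along the ODS to DWD to ADS path:

  • 1. Make the product table a reusable dimension table.
  • 2. Make the product table usable in lookup joins, on non-primary-key columns as well as on the primary key.
  • 3. Reduce the pressure on storage and compute (memory).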


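Simulated scenario: suppose my order item table (order_item) has a product_id. Detailed product attributes are usually not all stored on the order, so I need to widen the order item table with the product table (product) as a dimension table and write the result to ES. Upstream and downstream systems can then search orders by product information, or aggregate the top 10 most-ordered products.

Task 1: sync the product source table ods_product into the lake as the dimension table dwd_product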

-- Create and use the FTS catalog
CREATE CATALOG `product_catalog` WITH (
'type' = 'table-store',
'warehouse' = '/tmp/table-store-101'
);

USE CATALOG `product_catalog`;


-- ODS table schema
-- Note: under the FTS catalog, tables that use other connectors must be declared as temporary tables
-- Product source table (ODS)
CREATE TEMPORARY TABLE ods_product (
_id STRING,
created TIMESTAMP_LTZ(3),
mfrId STRING,
mfrName STRING,
name STRING,
ras STRING,
sn STRING,
spec STRING,
status BOOLEAN,
taxrate INT,
unit STRING,
updated TIMESTAMP_LTZ(3),
price DECIMAL(10, 5),
taxcode STRING,
clone STRING,
lastOrderAt TIMESTAMP_LTZ(3),
manual STRING,
pn STRING,
cumulativeSales INT,
isDeprecated BOOLEAN,
ship STRING,
storage STRING,
isPublic BOOLEAN,
invtCode STRING,
PRIMARY KEY (_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb-cdc',
'hosts' = 'localhost:27017',
'username' = 'XXX',
'password' = 'XXX',
'database' = 'biocitydb',
'collection' = 'product'
);


-- DWD table schema
-- Create a table in table-store catalog
-- Product table in the lake (DWD)
CREATE TABLE `dwd_product` (
_id STRING,
created TIMESTAMP_LTZ(3),
mfrId STRING,
mfrName STRING,
name STRING,
ras STRING,
sn STRING,
spec STRING,
status BOOLEAN,
taxrate INT,
unit STRING,
updated TIMESTAMP_LTZ(3),
price DECIMAL(10, 5),
taxcode STRING,
clone STRING,
lastOrderAt TIMESTAMP_LTZ(3),
manual STRING,
pn STRING,
cumulativeSales INT,
isDeprecated BOOLEAN,
ship STRING,
storage STRING,
isPublic BOOLEAN,
invtCode STRING,
PRIMARY KEY (_id) NOT ENFORCED
);


-- ods to dwd
-- Ingest the source table into the lake
INSERT INTO
dwd_product
select
_id,
created,
mfrId,
mfrName,
name,
ras,
sn,
spec,
status,
taxrate,
unit,
updated,
price,
taxcode,
clone,
lastOrderAt,
manual,
pn,
cumulativeSales,
isDeprecated,
ship,
storage,
isPublic,
invtCode
from
ods_product;
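
One detail the job above depends on: in streaming mode Table Store only commits written data when a checkpoint completes, so a checkpoint interval has to be set before the INSERT is submitted. A minimal sketch, reusing the setting from the word_count example at the end of this post; the 10 s value is only an assumption and should be tuned for the real workload:

```sql
-- Run this before submitting the streaming INSERT INTO dwd_product.
-- Without checkpointing, the data written to Table Store is never committed
-- and therefore never becomes visible to readers or to the lookup join.
SET 'execution.checkpointing.interval' = '10 s';  -- assumed value, tune as needed
```

A shorter interval makes dimension updates visible sooner at the cost of more frequent, smaller commits.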

At this point we have a dimension table in Flink Table Store that is kept up to date by CDC.

Task 2: lookup join between the order item table ods_order_item and the dimension table dwd_product


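-- Catalogs registered with CREATE CATALOG are session-scoped;
-- if this job runs in a new SQL session, register the catalog again first
CREATE CATALOG `product_catalog` WITH (
'type' = 'table-store',
'warehouse' = '/tmp/table-store-101'
);

USE CATALOG `product_catalog`;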

-- DWD table schema
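-- Product table in the lake (DWD), used as the dimension table
-- It was already created in Task 1; IF NOT EXISTS keeps this statement from failing
CREATE TABLE IF NOT EXISTS `dwd_product` (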
_id STRING,
created TIMESTAMP_LTZ(3),
mfrId STRING,
mfrName STRING,
name STRING,
ras STRING,
sn STRING,
spec STRING,
status BOOLEAN,
taxrate INT,
unit STRING,
updated TIMESTAMP_LTZ(3),
price DECIMAL(10, 5),
taxcode STRING,
clone STRING,
lastOrderAt TIMESTAMP_LTZ(3),
manual STRING,
pn STRING,
cumulativeSales INT,
isDeprecated BOOLEAN,
ship STRING,
storage STRING,
isPublic BOOLEAN,
invtCode STRING,
PRIMARY KEY (_id) NOT ENFORCED
);

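-- ODS table schema
-- Note: under the FTS catalog, tables that use other connectors must be declared as temporary tables
-- Order item source table
CREATE TEMPORARY TABLE `ods_order_item` (
_id STRING, -- MongoDB document key, declared as the primary key as in ods_product
order_id INT,
status INT,
price INT,
order_date DATE,
product_id STRING,
proc_time AS PROCTIME(), -- processing-time attribute required by the lookup join
PRIMARY KEY (_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb-cdc',
'hosts' = 'localhost:27017',
'username' = 'XXX',
'password' = 'XXX',
'database' = 'biocitydb',
'collection' = 'order_item'
);


-- ADS table schema
-- Wide order item detail table in ES
CREATE TEMPORARY TABLE ads_es_enrich_order_item (
_id STRING,
order_id STRING,
status STRING,
price DECIMAL(15,2),
order_date DATE,
product_id STRING,
mfr_name STRING, -- this and the following columns are widened from the product table
product_name STRING,
ras STRING,
sn STRING,
PRIMARY KEY (_id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://localhost:9200',
'index' = 'es_enrich_order_item'
);


-- Widen the order items
-- The SELECT list must match the sink columns in number, order and type,
-- so _id is selected first and the INT columns are cast explicitly
INSERT INTO
ads_es_enrich_order_item
SELECT
o._id,
CAST(o.order_id AS STRING),
CAST(o.status AS STRING),
CAST(o.price AS DECIMAL(15, 2)),
o.order_date,
o.product_id,
p.mfrName,
p.name,
p.ras,
p.sn
FROM ods_order_item AS o
JOIN dwd_product FOR SYSTEM_TIME AS OF o.proc_time AS p
ON o.product_id = p._id; -- the lookup join is not limited to the _id primary key; non-primary-key columns work too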
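
To illustrate the non-primary-key case mentioned in the comment above, here is a rough sketch of the same lookup join keyed on the product serial number sn instead of _id. The column product_sn is hypothetical: it is not part of the order_item schema in this post and would have to be added to ods_order_item first. Like the rest of this approach, it has not been verified against a real cluster.

```sql
-- Sketch only: assumes ods_order_item has an extra (hypothetical) column
--   product_sn STRING
-- that carries the product serial number.
SELECT
o.order_id,
o.product_sn,
p._id AS product_id,
p.name AS product_name
FROM ods_order_item AS o
JOIN dwd_product FOR SYSTEM_TIME AS OF o.proc_time AS p
ON o.product_sn = p.sn; -- sn is not the primary key of dwd_product
```

Since sn is not unique by definition, one order item may match several product rows, so the output can contain more rows than the input.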

Related links

Flink Table Store: Lookup Join
flink-table-store-101
Best practices for building a streaming data warehouse with Flink Table Store 0.3

Example: auto-creating and connecting to a Table Store table


CREATE TEMPORARY TABLE word_count (
word STRING PRIMARY KEY NOT ENFORCED,
cnt BIGINT
) WITH (
'connector'='table-store',
'path'='file:/tmp/word',
'auto-create'='true'
);


CREATE TEMPORARY TABLE word_table (
word STRING
) WITH (
'connector' = 'datagen',
'fields.word.length' = '1'
);

SET 'execution.checkpointing.interval' = '10 s';


INSERT INTO word_count SELECT word, COUNT(*) FROM word_table GROUP BY word;


SET 'sql-client.execution.result-mode' = 'tableau';
SET 'execution.runtime-mode' = 'streaming';
SELECT * FROM word_count;