Flink & Paimon & StarRocks & Dinky 流式湖仓分层实践验证
背景
在实际生产中,我们经常会有这样的需求,需要以原始数据流作为基础,然后关联大量的外部表来补充一些属性。例如,我们在订单数据中,希望能得到订单的支付信息和产品分类信息。
特点是:
- 支付信息数据量大
- 产品分类信息数据量少,修改不是太频繁
打宽数据时,我们面临的局限性:
- 双流join存储所有state成本过高。
本篇我们来实践验证一下 部分列更新Partial Update + Lookup Join 打宽做分层的可行性。降低一下流计算压力。
准备
- 源数据准备
MySql创建名称为order_dw的数据库,创建三张表,并插入相应数据。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43CREATE TABLE `orders` (
order_id bigint not null primary key,
user_id varchar(50) not null,
shop_id bigint not null,
product_id bigint not null,
buy_fee bigint not null,
create_time timestamp not null,
update_time timestamp not null default now(),
state int not null
);
CREATE TABLE `orders_pay` (
pay_id bigint not null primary key,
order_id bigint not null,
pay_platform int not null,
create_time timestamp not null
);
CREATE TABLE `product_catalog` (
product_id bigint not null primary key,
catalog_name varchar(50) not null
);
-- 准备数据
INSERT INTO product_catalog VALUES(1, 'phone_aaa'),(2, 'phone_bbb'),(3, 'phone_ccc'),(4, 'phone_ddd'),(5, 'phone_eee');
INSERT INTO orders VALUES
(100001, 'user_001', 12345, 1, 5000, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1),
(100002, 'user_002', 12346, 2, 4000, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1),
(100003, 'user_003', 12347, 3, 3000, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1),
(100004, 'user_001', 12347, 4, 2000, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1),
(100005, 'user_002', 12348, 5, 1000, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1),
(100006, 'user_001', 12348, 1, 1000, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1),
(100007, 'user_003', 12347, 4, 2000, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);
INSERT INTO orders_pay VALUES
(2001, 100001, 1, '2023-02-15 17:40:56'),
(2002, 100002, 1, '2023-02-15 17:40:56'),
(2003, 100003, 0, '2023-02-15 17:40:56'),
(2004, 100004, 0, '2023-02-15 17:40:56'),
(2005, 100005, 0, '2023-02-15 18:40:56'),
(2006, 100006, 0, '2023-02-15 18:40:56'),
(2007, 100007, 0, '2023-02-15 18:40:56'); - 包准备
paimon-s3-1.0.1.jar
paimon-flink-action-1.0.1.jar
paimon-flink-1.20-1.0.1.jar
mysql-connector-java-8.0.27.jar
flink-sql-connector-mysql-cdc-3.3.0.jar
以上文件放进去 dinky 的opt/dinky/customJar
以及对应flink集群的lib
目录下。(该重启重启,该重新加载包重新加载包)
ODS 业务数据库实时入湖
- 原始flink 命令行提交
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18./bin/flink run -d \
./lib/paimon-flink-action-1.0.1.jar \
mysql_sync_database \
--warehouse s3://lakehouse/paimon \
--database order_dw \
--mysql_conf hostname=XXX \
--mysql_conf port=63950 \
--mysql_conf server-time-zone=Asia/Shanghai \
--mysql_conf username=root \
--mysql_conf password=XXX \
--mysql_conf database-name=order_dw \
--catalog_conf s3.endpoint=http://192.168.103.113:9010 \
--catalog_conf s3.path.style.access=true \
--catalog_conf s3.access-key=XXX \
--catalog_conf s3.secret-key=XXX \
--table_conf bucket=1 \
--table_conf changelog-producer=input \
--table_conf sink.parallelism=1 - dinky提交方式
- 创建一个flink jar任务,并且在“资源” 上传
paimon-flink-action-1.0.1.jar
包 - 正确填写jar包运行参数
- 正确选择 “程序路径” 为刚上传的jar包;
- 程序运行类填写
org.apache.paimon.flink.action.FlinkActions
; - 程序运行参数填写
DWD 清洗打宽
dinky创建dwd_orders
的flink sql任务
部分列更新 partial-update
+维表 lookup join
打宽1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77-- 定义paimon catalog
CREATE CATALOG paimoncatalog
WITH
(
'type' = 'paimon',
'warehouse' = 's3://lakehouse/paimon',
's3.endpoint' = 'http://192.168.103.113:9010',
's3.access-key' = 'XXX',
's3.secret-key' = 'XXX'
);
USE CATALOG paimoncatalog;
-- 创建 dwd_orders表
CREATE TABLE IF NOT EXISTS order_dw.dwd_orders (
order_id BIGINT,
order_user_id STRING,
order_shop_id BIGINT,
order_product_id BIGINT,
order_product_catalog_name STRING,
order_fee BIGINT,
order_create_time TIMESTAMP,
order_update_time TIMESTAMP,
order_state INT,
pay_id BIGINT,
pay_platform INT COMMENT 'platform 0: phone, 1: pc',
pay_create_time TIMESTAMP,
PRIMARY KEY (order_id) NOT ENFORCED
)
WITH
(
'merge-engine' = 'partial-update', -- 使用部分更新数据合并机制产生宽表
'partial-update.remove-record-on-delete' = 'true', -- 让partial-update支持删除
'changelog-producer' = 'full-compaction' -- 使用full-compaction或lookup增量数据产生机制以低延时产出变更数据
);
SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
SET 'table.exec.sink.upsert-materialize' = 'NONE';
SET 'execution.checkpointing.interval' = '10s';
SET 'execution.checkpointing.min-pause' = '10s';
-- 订单表关联产品分类 打宽
INSERT INTO order_dw.dwd_orders
SELECT
o.order_id,
o.user_id,
o.shop_id,
o.product_id,
dim.catalog_name,
o.buy_fee,
o.create_time,
o.update_time,
o.state,
CAST(NULL AS BIGINT) AS pay_id,
CAST(NULL AS INT) AS pay_platform,
CAST(NULL AS TIMESTAMP) AS pay_create_time
FROM
(
SELECT *, PROCTIME() AS proctime FROM order_dw.orders
) o
LEFT JOIN order_dw.product_catalog FOR SYSTEM_TIME AS OF o.proctime AS dim ON o.product_id = dim.product_id -- lookup join 维表
UNION ALL -- Paimon目前暂不支持在同一个作业里通过多条INSERT语句写入同一张表,因此这里使用UNION ALL
-- 订单支付表 打宽
SELECT
order_id,
CAST(NULL AS STRING) AS user_id,
CAST(NULL AS BIGINT) AS shop_id,
CAST(NULL AS BIGINT) AS product_id,
CAST(NULL AS STRING) AS order_product_catalog_name,
CAST(NULL AS BIGINT) AS order_fee,
CAST(NULL AS TIMESTAMP) AS order_create_time,
CAST(NULL AS TIMESTAMP) AS order_update_time,
CAST(NULL AS INT) AS order_state,
pay_id,
pay_platform,
create_time
FROM
order_dw.orders_pay;
DWS 指标计算
我们目标创建DWS层的聚合表dws_users
以及dws_shops
,但是为了同时计算用户视角的聚合表以及商户视角的聚合表,我们额外创建一个以用户 + 商户为主键的中间表 dwm_users_shops
- 创建名为
dwm_users_shops
的SQL流作业,利用Paimon表的预聚合数据合并机制,自动对order_fee求和,算出用户在商户的消费总额。同时,自动对1求和,也能算出用户在商户的消费次数。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45CREATE CATALOG paimoncatalog
WITH
(
'type' = 'paimon',
'warehouse' = 's3://lakehouse/paimon',
's3.endpoint' = 'http://192.168.103.113:9010',
's3.access-key' = 'XXX',
's3.secret-key' = 'XXX'
);
USE CATALOG paimoncatalog;
SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
SET 'table.exec.sink.upsert-materialize' = 'NONE';
SET 'execution.checkpointing.interval' = '10s';
SET 'execution.checkpointing.min-pause' = '10s';
-- 为了同时计算用户视角的聚合表以及商户视角的聚合表,另外创建一个以用户 + 商户为主键的中间表。
CREATE TABLE IF NOT EXISTS order_dw.dwm_users_shops (
user_id STRING,
shop_id BIGINT,
ds STRING,
payed_buy_fee_sum BIGINT COMMENT '当日用户在商户完成支付的总金额',
pv BIGINT COMMENT '当日用户在商户购买的次数',
PRIMARY KEY (user_id, shop_id, ds) NOT ENFORCED
) WITH (
'merge-engine' = 'aggregation', -- 使用预聚合数据合并机制产生聚合表
'fields.payed_buy_fee_sum.aggregate-function' = 'sum', -- 对 payed_buy_fee_sum 的数据求和产生聚合结果
'fields.pv.aggregate-function' = 'sum', -- 对 pv 的数据求和产生聚合结果
'changelog-producer' = 'lookup', -- 使用lookup增量数据产生机制以低延时产出变更数据
-- dwm层的中间表一般不直接提供上层应用查询,因此可以针对写入性能进行优化。
'file.format' = 'avro', -- 使用avro行存格式的写入性能更加高效。
'metadata.stats-mode' = 'none' -- 放弃统计信息会增加OLAP查询代价(对持续的流处理无影响),但会让写入性能更加高效。
);
INSERT INTO order_dw.dwm_users_shops
SELECT
order_user_id,
order_shop_id,
DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds,
order_fee,
1 -- 一条输入记录代表一次消费
FROM order_dw.dwd_orders
WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL; - 创建SQL作业
dws_shops & dws_users
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63CREATE CATALOG paimoncatalog
WITH
(
'type' = 'paimon',
'warehouse' = 's3://lakehouse/paimon',
's3.endpoint' = 'http://192.168.103.113:9010',
's3.access-key' = 'XXX',
's3.secret-key' = 'XXX'
);
USE CATALOG paimoncatalog;
-- 用户维度聚合指标表。
CREATE TABLE IF NOT EXISTS order_dw.dws_users (
user_id STRING,
ds STRING,
payed_buy_fee_sum BIGINT COMMENT '当日完成支付的总金额',
PRIMARY KEY (user_id, ds) NOT ENFORCED
) WITH (
'merge-engine' = 'aggregation', -- 使用预聚合数据合并机制产生聚合表
'fields.payed_buy_fee_sum.aggregate-function' = 'sum' -- 对 payed_buy_fee_sum 的数据求和产生聚合结果
-- 由于dws_users表不再被下游流式消费,因此无需指定增量数据产生机制
);
-- 商户维度聚合指标表。
CREATE TABLE IF NOT EXISTS order_dw.dws_shops (
shop_id BIGINT,
ds STRING,
payed_buy_fee_sum BIGINT COMMENT '当日完成支付总金额',
uv BIGINT COMMENT '当日不同购买用户总人数',
pv BIGINT COMMENT '当日购买用户总人次',
PRIMARY KEY (shop_id, ds) NOT ENFORCED
) WITH (
'merge-engine' = 'aggregation', -- 使用预聚合数据合并机制产生聚合表
'fields.payed_buy_fee_sum.aggregate-function' = 'sum', -- 对 payed_buy_fee_sum 的数据求和产生聚合结果
'fields.uv.aggregate-function' = 'sum', -- 对 uv 的数据求和产生聚合结果
'fields.pv.aggregate-function' = 'sum' -- 对 pv 的数据求和产生聚合结果
-- 由于dws_shops表不再被下游流式消费,因此无需指定增量数据产生机制
);
SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
SET 'table.exec.sink.upsert-materialize' = 'NONE';
SET 'execution.checkpointing.interval' = '10s';
SET 'execution.checkpointing.min-pause' = '10s';
INSERT INTO order_dw.dws_users
SELECT
user_id,
ds,
payed_buy_fee_sum
FROM order_dw.dwm_users_shops;
-- 以商户为主键,部分热门商户的数据量可能远高于其他商户。
-- 因此使用local merge在写入Paimon之前先在内存中进行预聚合,缓解数据倾斜问题。
INSERT INTO order_dw.dws_shops /*+ OPTIONS('local-merge-buffer-size' = '64mb') */
SELECT
shop_id,
ds,
payed_buy_fee_sum,
1, -- 一条输入记录代表一名用户在该商户的所有消费
pv
FROM order_dw.dwm_users_shops;
ADS 物化视图StarRocks使用
ADS 这边使用StarRocks进行查询paimon 数据,并且构建物化视图
- 添加paimon catalog
1
2
3
4
5
6
7
8
9CREATE EXTERNAL CATALOG `paimon_lab`
PROPERTIES (
"paimon.catalog.type" = "filesystem",
"aws.s3.access_key" = "XXX",
"aws.s3.secret_key" = "XXX",
"aws.s3.endpoint" = "http://172.16.32.16:2002",
"type" = "paimon",
"paimon.catalog.warehouse" = "s3://lakehouse/paimon"
) - 查询
1
select * from paimon_lab.order_dw.dws_users
- 物化视图
1
2
3CREATE MATERIALIZED VIEW ads_users
AS
select * from paimon_lab.order_dw.dws_users
结束
使用4个job完成本例验证