Flink & Paimon & StarRocks & Dinky: Verifying a Layered Streaming Lakehouse in Practice


Background

In real production scenarios, we often need to take a raw data stream as the base and join it against a number of external tables to enrich it with additional attributes. For example, given order data, we want to attach each order's payment information and product category information.
Their characteristics:

  • Payment data is large in volume.
  • Product category data is small in volume and changes infrequently.

When widening the data, we face a limitation:

  • A dual-stream join has to keep every record in state, which is prohibitively expensive.

In this post we verify the feasibility of building the layered pipeline by widening with partial column updates (partial-update) + lookup join, thereby reducing the pressure on stream computation.
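
Before diving in, a minimal sketch of the partial-update semantics we rely on (the table and values here are illustrative only, not part of the walkthrough): several writers fill different columns of the same primary key, and Paimon merges the latest non-NULL values into one wide row.

    -- Illustrative Paimon primary-key table using the partial-update merge engine.
    CREATE TABLE t_wide (
        k  INT,
        c1 STRING,
        c2 STRING,
        PRIMARY KEY (k) NOT ENFORCED
    ) WITH ('merge-engine' = 'partial-update');

    -- Writer A fills c1, writer B fills c2; by default NULLs do not overwrite existing values.
    INSERT INTO t_wide VALUES (1, 'from_A', CAST(NULL AS STRING));
    INSERT INTO t_wide VALUES (1, CAST(NULL AS STRING), 'from_B');

    -- After merging, SELECT * FROM t_wide yields (1, 'from_A', 'from_B').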

Preparation

  • Source data preparation
    In MySQL, create a database named order_dw, create the following three tables, and insert the sample data.
    CREATE TABLE `orders` (
    order_id bigint not null primary key,
    user_id varchar(50) not null,
    shop_id bigint not null,
    product_id bigint not null,
    buy_fee bigint not null,
    create_time timestamp not null,
    update_time timestamp not null default now(),
    state int not null
    );

    CREATE TABLE `orders_pay` (
    pay_id bigint not null primary key,
    order_id bigint not null,
    pay_platform int not null,
    create_time timestamp not null
    );

    CREATE TABLE `product_catalog` (
    product_id bigint not null primary key,
    catalog_name varchar(50) not null
    );

    -- seed the sample data
    INSERT INTO product_catalog VALUES(1, 'phone_aaa'),(2, 'phone_bbb'),(3, 'phone_ccc'),(4, 'phone_ddd'),(5, 'phone_eee');

    INSERT INTO orders VALUES
    (100001, 'user_001', 12345, 1, 5000, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1),
    (100002, 'user_002', 12346, 2, 4000, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1),
    (100003, 'user_003', 12347, 3, 3000, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1),
    (100004, 'user_001', 12347, 4, 2000, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1),
    (100005, 'user_002', 12348, 5, 1000, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1),
    (100006, 'user_001', 12348, 1, 1000, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1),
    (100007, 'user_003', 12347, 4, 2000, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);

    INSERT INTO orders_pay VALUES
    (2001, 100001, 1, '2023-02-15 17:40:56'),
    (2002, 100002, 1, '2023-02-15 17:40:56'),
    (2003, 100003, 0, '2023-02-15 17:40:56'),
    (2004, 100004, 0, '2023-02-15 17:40:56'),
    (2005, 100005, 0, '2023-02-15 18:40:56'),
    (2006, 100006, 0, '2023-02-15 18:40:56'),
    (2007, 100007, 0, '2023-02-15 18:40:56');
  • Jar preparation
    • paimon-s3-1.0.1.jar
    • paimon-flink-action-1.0.1.jar
    • paimon-flink-1.20-1.0.1.jar
    • mysql-connector-java-8.0.27.jar
    • flink-sql-connector-mysql-cdc-3.3.0.jar
      Place the files above into Dinky's opt/dinky/customJar directory and into the lib directory of the corresponding Flink cluster. (Restart, or reload the jars, wherever each side requires it; a sketch of this step follows below.)
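
A sketch of the copy step, assuming a Dinky install under /opt/dinky and FLINK_HOME pointing at the Flink cluster's home (both paths are assumptions; adjust to your environment):

    # Assumed locations; adjust to your install.
    cp paimon-s3-1.0.1.jar paimon-flink-action-1.0.1.jar paimon-flink-1.20-1.0.1.jar \
       mysql-connector-java-8.0.27.jar flink-sql-connector-mysql-cdc-3.3.0.jar \
       /opt/dinky/customJar/
    cp paimon-*.jar mysql-connector-java-8.0.27.jar flink-sql-connector-mysql-cdc-3.3.0.jar \
       "$FLINK_HOME/lib/"
    # Restart Dinky and/or the Flink cluster (or reload the jars) so they are picked up.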

ODS: Real-Time Ingestion of the Business Database into the Lake

  • Plain Flink command-line submission
    ./bin/flink run -d \
    ./lib/paimon-flink-action-1.0.1.jar \
    mysql_sync_database \
    --warehouse s3://lakehouse/paimon \
    --database order_dw \
    --mysql_conf hostname=XXX \
    --mysql_conf port=63950 \
    --mysql_conf server-time-zone=Asia/Shanghai \
    --mysql_conf username=root \
    --mysql_conf password=XXX \
    --mysql_conf database-name=order_dw \
    --catalog_conf s3.endpoint=http://192.168.103.113:9010 \
    --catalog_conf s3.path.style.access=true \
    --catalog_conf s3.access-key=XXX \
    --catalog_conf s3.secret-key=XXX \
    --table_conf bucket=1 \
    --table_conf changelog-producer=input \
    --table_conf sink.parallelism=1
  • Dinky submission
  • Create a Flink JAR task, and upload paimon-flink-action-1.0.1.jar under "Resources".
  • Fill in the JAR run settings correctly:
    • Select the just-uploaded JAR as the "Program Path";
    • Set the main class to org.apache.paimon.flink.action.FlinkActions;
    • Fill in the program run parameters (see the sketch after this list).
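
The program run parameters are the same action arguments as in the command-line submission above, i.e. everything after the JAR path:

    mysql_sync_database
    --warehouse s3://lakehouse/paimon
    --database order_dw
    --mysql_conf hostname=XXX
    --mysql_conf port=63950
    --mysql_conf server-time-zone=Asia/Shanghai
    --mysql_conf username=root
    --mysql_conf password=XXX
    --mysql_conf database-name=order_dw
    --catalog_conf s3.endpoint=http://192.168.103.113:9010
    --catalog_conf s3.path.style.access=true
    --catalog_conf s3.access-key=XXX
    --catalog_conf s3.secret-key=XXX
    --table_conf bucket=1
    --table_conf changelog-producer=input
    --table_conf sink.parallelism=1

Once this job is running, the three order_dw tables are continuously synchronized into the Paimon warehouse.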

DWD: Cleansing and Widening

Create a Flink SQL task named dwd_orders in Dinky.

  • Widening via partial column updates (partial-update) + dimension-table lookup join
    -- define the Paimon catalog
    CREATE CATALOG paimoncatalog
    WITH
    (
    'type' = 'paimon',
    'warehouse' = 's3://lakehouse/paimon',
    's3.endpoint' = 'http://192.168.103.113:9010',
    's3.access-key' = 'XXX',
    's3.secret-key' = 'XXX'
    );

    USE CATALOG paimoncatalog;
    -- create the dwd_orders table
    CREATE TABLE IF NOT EXISTS order_dw.dwd_orders (
    order_id BIGINT,
    order_user_id STRING,
    order_shop_id BIGINT,
    order_product_id BIGINT,
    order_product_catalog_name STRING,
    order_fee BIGINT,
    order_create_time TIMESTAMP,
    order_update_time TIMESTAMP,
    order_state INT,
    pay_id BIGINT,
    pay_platform INT COMMENT 'platform 0: phone, 1: pc',
    pay_create_time TIMESTAMP,
    PRIMARY KEY (order_id) NOT ENFORCED
    )
    WITH
    (
    'merge-engine' = 'partial-update', -- the partial-update merge engine assembles the wide row
    'partial-update.remove-record-on-delete' = 'true', -- let partial-update handle deletes
    'changelog-producer' = 'full-compaction' -- use the full-compaction (or lookup) changelog producer to emit change data with low latency
    );

    SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
    SET 'table.exec.sink.upsert-materialize' = 'NONE';
    SET 'execution.checkpointing.interval' = '10s';
    SET 'execution.checkpointing.min-pause' = '10s';

    -- widen orders with the product catalog dimension
    INSERT INTO order_dw.dwd_orders
    SELECT
    o.order_id,
    o.user_id,
    o.shop_id,
    o.product_id,
    dim.catalog_name,
    o.buy_fee,
    o.create_time,
    o.update_time,
    o.state,
    CAST(NULL AS BIGINT) AS pay_id,
    CAST(NULL AS INT) AS pay_platform,
    CAST(NULL AS TIMESTAMP) AS pay_create_time
    FROM
    (
    SELECT *, PROCTIME() AS proctime FROM order_dw.orders
    ) o
    LEFT JOIN order_dw.product_catalog FOR SYSTEM_TIME AS OF o.proctime AS dim ON o.product_id = dim.product_id -- lookup join against the dimension table
    UNION ALL -- Paimon does not yet support multiple INSERT statements writing to the same table within one job, hence the UNION ALL
    -- widen with the order payments
    SELECT
    order_id,
    CAST(NULL AS STRING) AS user_id,
    CAST(NULL AS BIGINT) AS shop_id,
    CAST(NULL AS BIGINT) AS product_id,
    CAST(NULL AS STRING) AS order_product_catalog_name,
    CAST(NULL AS BIGINT) AS order_fee,
    CAST(NULL AS TIMESTAMP) AS order_create_time,
    CAST(NULL AS TIMESTAMP) AS order_update_time,
    CAST(NULL AS INT) AS order_state,
    pay_id,
    pay_platform,
    create_time
    FROM
    order_dw.orders_pay;

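To spot-check the widened result, a hedged sketch: run a batch query against the same catalog (any Flink SQL entry point works; here we flip the runtime mode to batch):

    SET 'execution.runtime-mode' = 'batch';
    SELECT * FROM order_dw.dwd_orders ORDER BY order_id;
    -- Each order should carry its catalog_name (from the lookup join) as well as its
    -- pay_* columns (merged in from the orders_pay branch by partial-update).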

DWS: Metric Computation

Our goal is to create the DWS-layer aggregate tables dws_users and dws_shops. To compute both the user-view and the shop-view aggregates, we additionally create an intermediate table, dwm_users_shops, keyed by user + shop.

  • Create a streaming SQL job named dwm_users_shops. Using Paimon's pre-aggregation merge engine, the table automatically sums order_fee to produce each user's total spend per shop, and sums the constant 1 to produce the user's purchase count at that shop.
    CREATE CATALOG paimoncatalog
    WITH
    (
    'type' = 'paimon',
    'warehouse' = 's3://lakehouse/paimon',
    's3.endpoint' = 'http://192.168.103.113:9010',
    's3.access-key' = 'XXX',
    's3.secret-key' = 'XXX'
    );

    USE CATALOG paimoncatalog;

    SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
    SET 'table.exec.sink.upsert-materialize' = 'NONE';

    SET 'execution.checkpointing.interval' = '10s';
    SET 'execution.checkpointing.min-pause' = '10s';

    -- To compute both the user-view and the shop-view aggregates, create an intermediate table keyed by user + shop.
    CREATE TABLE IF NOT EXISTS order_dw.dwm_users_shops (
    user_id STRING,
    shop_id BIGINT,
    ds STRING,
    payed_buy_fee_sum BIGINT COMMENT 'total amount the user paid at the shop that day',
    pv BIGINT COMMENT 'number of purchases the user made at the shop that day',
    PRIMARY KEY (user_id, shop_id, ds) NOT ENFORCED
    ) WITH (
    'merge-engine' = 'aggregation', -- the pre-aggregation merge engine builds the aggregate table
    'fields.payed_buy_fee_sum.aggregate-function' = 'sum', -- sum payed_buy_fee_sum into the aggregate
    'fields.pv.aggregate-function' = 'sum', -- sum pv into the aggregate
    'changelog-producer' = 'lookup', -- the lookup changelog producer emits change data with low latency
    -- A DWM intermediate table is normally not queried by applications directly, so it can be tuned for write performance.
    'file.format' = 'avro', -- the avro row format writes more efficiently.
    'metadata.stats-mode' = 'none' -- dropping statistics makes OLAP queries costlier (no impact on continuous stream processing) but speeds up writes.
    );

    INSERT INTO order_dw.dwm_users_shops
    SELECT
    order_user_id,
    order_shop_id,
    DATE_FORMAT(pay_create_time, 'yyyyMMdd') AS ds,
    order_fee,
    1 -- each input record represents one purchase
    FROM order_dw.dwd_orders
    WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL;
  • Create the SQL job for dws_shops & dws_users (a verification sketch follows after this list)
    CREATE CATALOG paimoncatalog
    WITH
    (
    'type' = 'paimon',
    'warehouse' = 's3://lakehouse/paimon',
    's3.endpoint' = 'http://192.168.103.113:9010',
    's3.access-key' = 'XXX',
    's3.secret-key' = 'XXX'
    );

    USE CATALOG paimoncatalog;

    -- user-dimension aggregate metrics table.
    CREATE TABLE IF NOT EXISTS order_dw.dws_users (
    user_id STRING,
    ds STRING,
    payed_buy_fee_sum BIGINT COMMENT 'total amount paid that day',
    PRIMARY KEY (user_id, ds) NOT ENFORCED
    ) WITH (
    'merge-engine' = 'aggregation', -- the pre-aggregation merge engine builds the aggregate table
    'fields.payed_buy_fee_sum.aggregate-function' = 'sum' -- sum payed_buy_fee_sum into the aggregate
    -- dws_users is not consumed by further streaming jobs, so no changelog producer needs to be specified
    );

    -- shop-dimension aggregate metrics table.
    CREATE TABLE IF NOT EXISTS order_dw.dws_shops (
    shop_id BIGINT,
    ds STRING,
    payed_buy_fee_sum BIGINT COMMENT 'total amount paid at the shop that day',
    uv BIGINT COMMENT 'number of distinct purchasing users that day',
    pv BIGINT COMMENT 'number of purchasing user visits that day',
    PRIMARY KEY (shop_id, ds) NOT ENFORCED
    ) WITH (
    'merge-engine' = 'aggregation', -- the pre-aggregation merge engine builds the aggregate table
    'fields.payed_buy_fee_sum.aggregate-function' = 'sum', -- sum payed_buy_fee_sum into the aggregate
    'fields.uv.aggregate-function' = 'sum', -- sum uv into the aggregate
    'fields.pv.aggregate-function' = 'sum' -- sum pv into the aggregate
    -- dws_shops is not consumed by further streaming jobs, so no changelog producer needs to be specified
    );

    SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
    SET 'table.exec.sink.upsert-materialize' = 'NONE';

    SET 'execution.checkpointing.interval' = '10s';
    SET 'execution.checkpointing.min-pause' = '10s';

    INSERT INTO order_dw.dws_users
    SELECT
    user_id,
    ds,
    payed_buy_fee_sum
    FROM order_dw.dwm_users_shops;

    -- With shop_id as the primary key, a few hot shops may carry far more data than the rest.
    -- Use local merge to pre-aggregate in memory before writing to Paimon, mitigating data skew.
    INSERT INTO order_dw.dws_shops /*+ OPTIONS('local-merge-buffer-size' = '64mb') */
    SELECT
    shop_id,
    ds,
    payed_buy_fee_sum,
    1, -- one input record represents one user's entire consumption at this shop
    pv
    FROM order_dw.dwm_users_shops;
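
To verify this layer end to end, a hedged sketch: batch spot-checks of the tables, then a new order inserted into MySQL to watch the change propagate through ODS → DWD → DWM → DWS. The new rows are illustrative values consistent with the schemas above.

    -- Flink SQL, batch mode, in the paimoncatalog defined above:
    SET 'execution.runtime-mode' = 'batch';
    SELECT * FROM order_dw.dwm_users_shops ORDER BY user_id, shop_id;
    -- With the sample data fully ingested, user_003 at shop 12347 should show
    -- payed_buy_fee_sum = 5000 and pv = 2 (orders 100003 and 100007 merged by 'sum').
    SELECT * FROM order_dw.dws_users ORDER BY user_id, ds;
    SELECT * FROM order_dw.dws_shops ORDER BY shop_id, ds;

    -- MySQL: insert a new paid order; the CDC ingestion job picks it up automatically.
    INSERT INTO orders VALUES
    (100008, 'user_001', 12345, 2, 6000, '2023-02-15 19:40:56', '2023-02-15 19:42:56', 1);
    INSERT INTO orders_pay VALUES
    (2008, 100008, 1, '2023-02-15 19:45:56');
    -- After a few checkpoints, payed_buy_fee_sum for user_001 / ds '20230215' in
    -- dws_users should increase by 6000.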

ADS: Querying Paimon from StarRocks with Materialized Views

For the ADS layer we query the Paimon data with StarRocks and build a materialized view on top.

  • Add the Paimon catalog
    CREATE EXTERNAL CATALOG `paimon_lab`
    PROPERTIES (
    "paimon.catalog.type" = "filesystem",
    "aws.s3.access_key" = "XXX",
    "aws.s3.secret_key" = "XXX",
    "aws.s3.endpoint" = "http://172.16.32.16:2002",
    "type" = "paimon",
    "paimon.catalog.warehouse" = "s3://lakehouse/paimon"
    )
  • Query
    select * from paimon_lab.order_dw.dws_users
  • Materialized view
    CREATE MATERIALIZED VIEW ads_users
    AS
    select * from paimon_lab.order_dw.dws_users
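
Note that a StarRocks asynchronous materialized view created without a REFRESH clause is, to our knowledge, refreshed manually by default (check the behavior of your StarRocks version). A hedged usage sketch:

    -- Trigger a refresh of the view, then query it directly:
    REFRESH MATERIALIZED VIEW ads_users;
    SELECT * FROM ads_users;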

Wrap-up

This walkthrough was completed with four jobs: the ODS CDC ingestion job, dwd_orders, dwm_users_shops, and dws_shops & dws_users.

Related

UCloud (优刻得): Practicing a near-real-time data lakehouse with USDP