
背景

在实际生产中,我们经常会有这样的需求,需要以原始数据流作为基础,然后关联大量的外部表来补充一些属性。例如,我们在订单数据中,希望能得到订单的支付信息和产品分类信息。
特点是:

  • 支付信息数据量大
  • 产品分类信息数据量少,修改不是太频繁

打宽数据时,我们面临的局限性:

  • 双流join存储所有state成本过高。

本篇我们来实践验证一下“部分列更新 Partial Update + Lookup Join”打宽做分层的可行性,以降低流计算压力。

准备

  • 源数据准备
    在 MySQL 中创建名为 order_dw 的数据库,创建三张表,并插入相应数据。
    CREATE TABLE `orders` (
    order_id bigint not null primary key,
    user_id varchar(50) not null,
    shop_id bigint not null,
    product_id bigint not null,
    buy_fee bigint not null,
    create_time timestamp not null,
    update_time timestamp not null default now(),
    state int not null
    );

    CREATE TABLE `orders_pay` (
    pay_id bigint not null primary key,
    order_id bigint not null,
    pay_platform int not null,
    create_time timestamp not null
    );

    CREATE TABLE `product_catalog` (
    product_id bigint not null primary key,
    catalog_name varchar(50) not null
    );

    -- 准备数据
    INSERT INTO product_catalog VALUES(1, 'phone_aaa'),(2, 'phone_bbb'),(3, 'phone_ccc'),(4, 'phone_ddd'),(5, 'phone_eee');

    INSERT INTO orders VALUES
    (100001, 'user_001', 12345, 1, 5000, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1),
    (100002, 'user_002', 12346, 2, 4000, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1),
    (100003, 'user_003', 12347, 3, 3000, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1),
    (100004, 'user_001', 12347, 4, 2000, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1),
    (100005, 'user_002', 12348, 5, 1000, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1),
    (100006, 'user_001', 12348, 1, 1000, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1),
    (100007, 'user_003', 12347, 4, 2000, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);

    INSERT INTO orders_pay VALUES
    (2001, 100001, 1, '2023-02-15 17:40:56'),
    (2002, 100002, 1, '2023-02-15 17:40:56'),
    (2003, 100003, 0, '2023-02-15 17:40:56'),
    (2004, 100004, 0, '2023-02-15 17:40:56'),
    (2005, 100005, 0, '2023-02-15 18:40:56'),
    (2006, 100006, 0, '2023-02-15 18:40:56'),
    (2007, 100007, 0, '2023-02-15 18:40:56');
  • 包准备
    • paimon-s3-1.0.1.jar
    • paimon-flink-action-1.0.1.jar
    • paimon-flink-1.20-1.0.1.jar
    • mysql-connector-java-8.0.27.jar
    • flink-sql-connector-mysql-cdc-3.3.0.jar
      以上文件放入 dinky 的 /opt/dinky/customJar 目录以及对应 Flink 集群的 lib 目录下(该重启的重启,该重新加载包的重新加载)。

ODS 业务数据库实时入湖

  • 原始flink 命令行提交
    ./bin/flink run -d \
    ./lib/paimon-flink-action-1.0.1.jar \
    mysql_sync_database \
    --warehouse s3://lakehouse/paimon \
    --database order_dw \
    --mysql_conf hostname=XXX \
    --mysql_conf port=63950 \
    --mysql_conf server-time-zone=Asia/Shanghai \
    --mysql_conf username=root \
    --mysql_conf password=XXX \
    --mysql_conf database-name=order_dw \
    --catalog_conf s3.endpoint=http://192.168.103.113:9010 \
    --catalog_conf s3.path.style.access=true \
    --catalog_conf s3.access-key=XXX \
    --catalog_conf s3.secret-key=XXX \
    --table_conf bucket=1 \
    --table_conf changelog-producer=input \
    --table_conf sink.parallelism=1
  • dinky提交方式
  • 创建一个flink jar任务,并且在“资源” 上传 paimon-flink-action-1.0.1.jar
  • 正确填写jar包运行参数
    • 正确选择 “程序路径” 为刚上传的jar包;
    • 程序运行类填写 org.apache.paimon.flink.action.FlinkActions;
    • 程序运行参数填写(即上文命令行中 jar 包之后的参数,示例见下)
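      一个示意(即上文命令行中 jar 包之后的部分,warehouse、主机、密钥等按实际环境替换):

      mysql_sync_database
      --warehouse s3://lakehouse/paimon
      --database order_dw
      --mysql_conf hostname=XXX
      --mysql_conf port=63950
      --mysql_conf server-time-zone=Asia/Shanghai
      --mysql_conf username=root
      --mysql_conf password=XXX
      --mysql_conf database-name=order_dw
      --catalog_conf s3.endpoint=http://192.168.103.113:9010
      --catalog_conf s3.path.style.access=true
      --catalog_conf s3.access-key=XXX
      --catalog_conf s3.secret-key=XXX
      --table_conf bucket=1
      --table_conf changelog-producer=input
      --table_conf sink.parallelism=1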

DWD 清洗打宽

dinky创建dwd_orders的flink sql任务

  • 部分列更新 partial-update + 维表 lookup join 打宽
    -- 定义paimon catalog
    CREATE CATALOG paimoncatalog
    WITH
    (
    'type' = 'paimon',
    'warehouse' = 's3://lakehouse/paimon',
    's3.endpoint' = 'http://192.168.103.113:9010',
    's3.access-key' = 'XXX',
    's3.secret-key' = 'XXX'
    );

    USE CATALOG paimoncatalog;
    -- 创建 dwd_orders表
    CREATE TABLE IF NOT EXISTS order_dw.dwd_orders (
    order_id BIGINT,
    order_user_id STRING,
    order_shop_id BIGINT,
    order_product_id BIGINT,
    order_product_catalog_name STRING,
    order_fee BIGINT,
    order_create_time TIMESTAMP,
    order_update_time TIMESTAMP,
    order_state INT,
    pay_id BIGINT,
    pay_platform INT COMMENT 'platform 0: phone, 1: pc',
    pay_create_time TIMESTAMP,
    PRIMARY KEY (order_id) NOT ENFORCED
    )
    WITH
    (
    'merge-engine' = 'partial-update', -- 使用部分更新数据合并机制产生宽表
    'partial-update.remove-record-on-delete' = 'true', -- 让partial-update支持删除
    'changelog-producer' = 'full-compaction' -- 使用full-compaction或lookup增量数据产生机制以低延时产出变更数据
    );

    SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
    SET 'table.exec.sink.upsert-materialize' = 'NONE';
    SET 'execution.checkpointing.interval' = '10s';
    SET 'execution.checkpointing.min-pause' = '10s';

    -- 订单表关联产品分类 打宽
    INSERT INTO order_dw.dwd_orders
    SELECT
    o.order_id,
    o.user_id,
    o.shop_id,
    o.product_id,
    dim.catalog_name,
    o.buy_fee,
    o.create_time,
    o.update_time,
    o.state,
    CAST(NULL AS BIGINT) AS pay_id,
    CAST(NULL AS INT) AS pay_platform,
    CAST(NULL AS TIMESTAMP) AS pay_create_time
    FROM
    (
    SELECT *, PROCTIME() AS proctime FROM order_dw.orders
    ) o
    LEFT JOIN order_dw.product_catalog FOR SYSTEM_TIME AS OF o.proctime AS dim ON o.product_id = dim.product_id -- lookup join 维表
    UNION ALL -- Paimon目前暂不支持在同一个作业里通过多条INSERT语句写入同一张表,因此这里使用UNION ALL
    -- 订单支付表 打宽
    SELECT
    order_id,
    CAST(NULL AS STRING) AS user_id,
    CAST(NULL AS BIGINT) AS shop_id,
    CAST(NULL AS BIGINT) AS product_id,
    CAST(NULL AS STRING) AS order_product_catalog_name,
    CAST(NULL AS BIGINT) AS order_fee,
    CAST(NULL AS TIMESTAMP) AS order_create_time,
    CAST(NULL AS TIMESTAMP) AS order_update_time,
    CAST(NULL AS INT) AS order_state,
    pay_id,
    pay_platform,
    create_time
    FROM
    order_dw.orders_pay;

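作业稳定运行后,可以在 Flink SQL 批模式下抽查打宽结果(示意,假设已按上文创建并 USE 了 paimoncatalog):

-- 批模式抽查 dwd_orders 打宽结果(示意)
SET 'execution.runtime-mode' = 'batch';
SELECT order_id, order_product_catalog_name, order_fee, pay_id, pay_platform
FROM order_dw.dwd_orders
WHERE order_id = 100001;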

DWS 指标计算

我们目标创建DWS层的聚合表dws_users以及dws_shops,但是为了同时计算用户视角的聚合表以及商户视角的聚合表,我们额外创建一个以用户 + 商户为主键的中间表 dwm_users_shops

  • 创建名为dwm_users_shops的SQL流作业,利用Paimon表的预聚合数据合并机制,自动对order_fee求和,算出用户在商户的消费总额。同时,自动对1求和,也能算出用户在商户的消费次数。
    CREATE CATALOG paimoncatalog
    WITH
    (
    'type' = 'paimon',
    'warehouse' = 's3://lakehouse/paimon',
    's3.endpoint' = 'http://192.168.103.113:9010',
    's3.access-key' = 'XXX',
    's3.secret-key' = 'XXX'
    );

    USE CATALOG paimoncatalog;

    SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
    SET 'table.exec.sink.upsert-materialize' = 'NONE';

    SET 'execution.checkpointing.interval' = '10s';
    SET 'execution.checkpointing.min-pause' = '10s';

    -- 为了同时计算用户视角的聚合表以及商户视角的聚合表,另外创建一个以用户 + 商户为主键的中间表。
    CREATE TABLE IF NOT EXISTS order_dw.dwm_users_shops (
    user_id STRING,
    shop_id BIGINT,
    ds STRING,
    payed_buy_fee_sum BIGINT COMMENT '当日用户在商户完成支付的总金额',
    pv BIGINT COMMENT '当日用户在商户购买的次数',
    PRIMARY KEY (user_id, shop_id, ds) NOT ENFORCED
    ) WITH (
    'merge-engine' = 'aggregation', -- 使用预聚合数据合并机制产生聚合表
    'fields.payed_buy_fee_sum.aggregate-function' = 'sum', -- 对 payed_buy_fee_sum 的数据求和产生聚合结果
    'fields.pv.aggregate-function' = 'sum', -- 对 pv 的数据求和产生聚合结果
    'changelog-producer' = 'lookup', -- 使用lookup增量数据产生机制以低延时产出变更数据
    -- dwm层的中间表一般不直接提供上层应用查询,因此可以针对写入性能进行优化。
    'file.format' = 'avro', -- 使用avro行存格式的写入性能更加高效。
    'metadata.stats-mode' = 'none' -- 放弃统计信息会增加OLAP查询代价(对持续的流处理无影响),但会让写入性能更加高效。
    );

    INSERT INTO order_dw.dwm_users_shops
    SELECT
    order_user_id,
    order_shop_id,
    DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds,
    order_fee,
    1 -- 一条输入记录代表一次消费
    FROM order_dw.dwd_orders
    WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL;
  • 创建SQL作业 dws_shops & dws_users
    CREATE CATALOG paimoncatalog
    WITH
    (
    'type' = 'paimon',
    'warehouse' = 's3://lakehouse/paimon',
    's3.endpoint' = 'http://192.168.103.113:9010',
    's3.access-key' = 'XXX',
    's3.secret-key' = 'XXX'
    );

    USE CATALOG paimoncatalog;

    -- 用户维度聚合指标表。
    CREATE TABLE IF NOT EXISTS order_dw.dws_users (
    user_id STRING,
    ds STRING,
    payed_buy_fee_sum BIGINT COMMENT '当日完成支付的总金额',
    PRIMARY KEY (user_id, ds) NOT ENFORCED
    ) WITH (
    'merge-engine' = 'aggregation', -- 使用预聚合数据合并机制产生聚合表
    'fields.payed_buy_fee_sum.aggregate-function' = 'sum' -- 对 payed_buy_fee_sum 的数据求和产生聚合结果
    -- 由于dws_users表不再被下游流式消费,因此无需指定增量数据产生机制
    );

    -- 商户维度聚合指标表。
    CREATE TABLE IF NOT EXISTS order_dw.dws_shops (
    shop_id BIGINT,
    ds STRING,
    payed_buy_fee_sum BIGINT COMMENT '当日完成支付总金额',
    uv BIGINT COMMENT '当日不同购买用户总人数',
    pv BIGINT COMMENT '当日购买用户总人次',
    PRIMARY KEY (shop_id, ds) NOT ENFORCED
    ) WITH (
    'merge-engine' = 'aggregation', -- 使用预聚合数据合并机制产生聚合表
    'fields.payed_buy_fee_sum.aggregate-function' = 'sum', -- 对 payed_buy_fee_sum 的数据求和产生聚合结果
    'fields.uv.aggregate-function' = 'sum', -- 对 uv 的数据求和产生聚合结果
    'fields.pv.aggregate-function' = 'sum' -- 对 pv 的数据求和产生聚合结果
    -- 由于dws_shops表不再被下游流式消费,因此无需指定增量数据产生机制
    );

    SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
    SET 'table.exec.sink.upsert-materialize' = 'NONE';

    SET 'execution.checkpointing.interval' = '10s';
    SET 'execution.checkpointing.min-pause' = '10s';

    INSERT INTO order_dw.dws_users
    SELECT
    user_id,
    ds,
    payed_buy_fee_sum
    FROM order_dw.dwm_users_shops;

    -- 以商户为主键,部分热门商户的数据量可能远高于其他商户。
    -- 因此使用local merge在写入Paimon之前先在内存中进行预聚合,缓解数据倾斜问题。
    INSERT INTO order_dw.dws_shops /*+ OPTIONS('local-merge-buffer-size' = '64mb') */
    SELECT
    shop_id,
    ds,
    payed_buy_fee_sum,
    1, -- 一条输入记录代表一名用户在该商户的所有消费
    pv
    FROM order_dw.dwm_users_shops;
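    同样地,DWS 作业运行一段时间后,可以在批模式下抽查聚合结果是否符合预期(示意,假设已创建并 USE 了 paimoncatalog):

    SET 'execution.runtime-mode' = 'batch';
    SELECT * FROM order_dw.dws_shops;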

ADS 物化视图StarRocks使用

ADS 层这边使用 StarRocks 查询 paimon 数据,并构建物化视图

  • 添加paimon catalog
    CREATE EXTERNAL CATALOG `paimon_lab`
    PROPERTIES (
    "paimon.catalog.type" = "filesystem",
    "aws.s3.access_key" = "XXX",
    "aws.s3.secret_key" = "XXX",
    "aws.s3.endpoint" = "http://172.16.32.16:2002",
    "type" = "paimon",
    "paimon.catalog.warehouse" = "s3://lakehouse/paimon"
    );
  • 查询
    select * from paimon_lab.order_dw.dws_users
  • 物化视图
    CREATE MATERIALIZED VIEW ads_users
    AS
    select * from paimon_lab.order_dw.dws_users
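    上面的物化视图没有指定刷新策略,可以先手动触发一次刷新再查询验证(示意):

    REFRESH MATERIALIZED VIEW ads_users;
    SELECT * FROM ads_users;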

结束

使用4个job完成本例验证

相关

优刻得:使用USDP实践近实时数据湖仓

背景

云原生flink流计算平台解决方案验证

该架设方案全部基于云原生k8s,通俗讲就是 flink任务跑在k8s上

环境要求

k8s部署的话可以看看 k8s-1.25.4部署笔记(containerd)

  • 前提条件
    • Kubernetes 版本 >= 1.9
      ➜  ~ kubectl version --short
      Client Version: v1.24.4
      Kustomize Version: v4.5.4
      Server Version: v1.24.4
    • 确保您的 ~/.kube/config 文件已正确配置以访问 Kubernetes 集群
      ➜  ~ export KUBECONFIG=~/.kube/config
      ➜ ~ kubectl get nodes
      NAME STATUS ROLES AGE VERSION
      k8s-master2 Ready control-plane 9d v1.24.4
      k8s-node1 Ready <none> 9d v1.24.4
      k8s-node2 Ready <none> 9d v1.24.4
      k8s-node3 Ready <none> 25h v1.24.4
    • 是否启用 Kubernetes DNS正常
      ➜  ~ kubectl cluster-info 
      Kubernetes control plane is running at https://192.168.103.201:6443
      CoreDNS is running at https://192.168.103.201:6443/api/v1/namespaces/kube-system/services/kube-dns:dns/proxy
    • 账户具有 RBAC 权限:确保您的命名空间中的 <nameSpace> 服务账户具有创建和删除 Pod 的必要 RBAC 权限。这里我创建的新命名空间为 flink-native
      kubectl create namespace flink-native     
      kubectl create serviceaccount flink-sa -n flink-native
      kubectl create clusterrolebinding flinknative-role-binding-flinknative -n flink-native --clusterrole=edit --serviceaccount=flink-native:flink-sa
  • 在k8s中启动flink集群

    flink1.20

    ./bin/kubernetes-session.sh \
    -Dkubernetes.cluster-id=flink-cluster1 \
    -Dtaskmanager.memory.process.size=4096m \
    -Dkubernetes.taskmanager.cpu=2 \
    -Dtaskmanager.numberOfTaskSlots=4 \
    -Dkubernetes.namespace=flink-native \
    -Dkubernetes.service-account=flink-sa \
    -Dresourcemanager.taskmanager-timeout=3600000
  • 关闭集群
    kubectl delete deployment/flink-cluster1 -n flink-native
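    在关闭集群之前,可以先向该 session 集群提交一个 Flink 自带的示例作业验证环境可用(示意,示例 jar 路径以实际发行包为准):

    # 向上面启动的 session 集群提交自带示例作业(示意)
    ./bin/flink run \
    --target kubernetes-session \
    -Dkubernetes.cluster-id=flink-cluster1 \
    -Dkubernetes.namespace=flink-native \
    ./examples/streaming/TopSpeedWindowing.jar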

Dinky 流计算平台部署(helm)

  • 创建pvc

    kind: PersistentVolumeClaim
    apiVersion: v1
    metadata:
      name: dinky-config-volume
      namespace: data-center
    spec:
      storageClassName: nfs-client
      accessModes:
        - ReadWriteMany
      resources:
        requests:
          storage: 5Gi
    ---
    kind: PersistentVolumeClaim
    apiVersion: v1
    metadata:
      name: dinky-lib-volume
      namespace: data-center
    spec:
      storageClassName: nfs-client
      accessModes:
        - ReadWriteMany
      resources:
        requests:
          storage: 5Gi
    ---
    kind: PersistentVolumeClaim
    apiVersion: v1
    metadata:
      name: dinky-resource-volume
      namespace: data-center
    spec:
      storageClassName: nfs-client
      accessModes:
        - ReadWriteMany
      resources:
        requests:
          storage: 5Gi
    • dinky-config-volume 用于放置配置文件(helm 包内的conf目录文件)
    • dinky-lib-volume 用于放置自定义jar包,映射的/opt/dinky/customJar/
  • 调整helm包

    • 部署文件

      helm包经久未维护,我改了下

    • dinky.yaml 增加volumes:
      volumes:
        - name: dinky-lib-volume
          persistentVolumeClaim:
            claimName: dinky-lib-volume
    • dinky.yaml 增加volumeMounts:
      volumeMounts:
        - mountPath: /opt/dinky/customJar
          name: dinky-lib-volume
    • dinky.yaml 修正auto.sh目录位置错误,原来是/opt/dinky/auto.sh
      command:
        - /bin/bash
        - '-c'
        - >-
          /opt/dinky/bin/auto.sh startOnPending {{ .Values.spec.extraEnv.flinkVersion }}
    • values.yaml 配置mysql
      mysql:
        enabled: true
        url: "192.168.103.113:3306"
        auth:
          username: "root"
          password: "XXX"
          database: "dinky"
  • 部署

    helm install dinky . -f values.yaml -n data-center
    helm uninstall dinky -n data-center
  • 在dinky内增加刚刚创建的Flink Native Kubernetes集群

流计算实践

实践1: mysql cdc connector 写入 paimon

dinky基于flink sql的作业类型
打开dinky页面,新建Flink Sql任务

EXECUTE CDCSOURCE demo WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'checkpoint' = '10000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'test\..*',
'sink.connector' = 'sql-catalog',
'sink.catalog.name' = 'fts',
'sink.catalog.type' = 'table-store',
'sink.catalog.warehouse'='file:/tmp/table_store',
'sink.auto-create' = 'true' -- 可自动paimon建表
);
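作业运行后,可以在 Flink SQL 里创建 paimon catalog 抽查自动建出的表(示意,warehouse 与上面 sink 配置对应;老版本 flink-table-store 的 catalog type 可能是 table-store,这里以较新的 paimon 连接器写法为例):

-- 抽查整库同步自动创建的表(示意)
CREATE CATALOG fts WITH (
'type' = 'paimon',
'warehouse' = 'file:/tmp/table_store'
);
USE CATALOG fts;
SHOW TABLES;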

实践2: paimon cdc 写入 paimon

dinky基于flink jar的作业类型(paimon-flink-action-0.9.0.jar)
打开dinky页面,新建Flink jar任务

  • 原始提交命令:
    ./bin/flink run \
    ./lib/paimon-flink-action-0.9.0.jar \
    mysql_sync_database \
    --warehouse s3://lakehouse-1253767413/paimon \
    --database app_db \
    --mysql_conf hostname=192.168.103.113 \
    --mysql_conf username=root \
    --mysql_conf password=XXX \
    --mysql_conf database-name=app_db \
    --catalog_conf s3.endpoint=cos.ap-guangzhou.myqcloud.com \
    --catalog_conf s3.access-key=XXX \
    --catalog_conf s3.secret-key=XXX \
    --table_conf bucket=1
  • dinky作业

    通过资源中心上传paimon-flink-action-0.9.0.jar包,然后按照上面原始命令分别填写“程序路径”、“程序运行类”、“程序运行参数”

dinky基于flink sql的作业类型
打开dinky页面,新建Flink Sql任务

待验证:刚发布的 flink cdc 3.3 是否可以写入 paimon。以前验证 flink cdc pipeline 是无法成功写入 paimon 的,见《paimon cdc入湖 & StarRocks湖仓分析实践》
示例:

SET 'execution.checkpointing.interval' = '10s';
EXECUTE PIPELINE WITHYAML (
source:
  type: mysql
  name: MySQL Source
  hostname: 127.0.0.1
  port: 3306
  username: admin
  password: pass
  tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
  server-id: 5401-5404

sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: filesystem
  catalog.properties.warehouse: /path/warehouse

pipeline:
  name: MySQL to Paimon Pipeline
  parallelism: 1
)

其他相关参考

Native Kubernetes
Flink 1.10 Native Kubernetes 原理与实践
Flink on Kubernetes - Native Kubernetes - 配置基础环境
Flink CDC+Dinky同步到paimon(HDFS)
FlinkCDC pipline+Dinky整库同步StarRocks

背景

以前曾个人玩法自建部署headscale进行组网,主要用于个人电脑(在外或在公司)能与家里群晖nas进行组网,解决随时从nas获取数据的需求。具体可看看过往的记录

今天我们的需求是企业场景,企业场景与个人场景略显不同,它主要是对混合云的组网,要求会更高。
这里的要求如下:

  • 希望是完全私有化的方案,不允许有安全问题
  • 希望能将腾讯云和自建IDC组网成混合云
  • 希望组网后能互相访问内网不同的网段。

我们来验证一下ztnet能否做到以上这3点,为此我们先规划一下资源

  • 腾讯云广州
    • 腾讯云广州六区-qs服务器 172.16.32.16 私有planet自建
    • 腾讯云广州三区-scm服务器 172.16.0.4 leaf客户端
    • 腾讯云广州三区-mysql服务 172.16.0.9:3306
  • IDC机房
    • 机房-data-center服务器 leaf客户端
    • 机房-sqlserver数据库

验证

  • ztnet部署

    ztnet是zerotier私有根服务器的解决方案,解决根服务在别人手里的问题及根节点访问慢的问题
    我们部署在腾讯云广州六区-qs服务器

    services:
    postgres:
    image: postgres:15.2-alpine
    container_name: postgres
    restart: unless-stopped
    environment:
    POSTGRES_USER: postgres
    POSTGRES_PASSWORD: postgres
    POSTGRES_DB: ztnet
    volumes:
    - postgres-data:/var/lib/postgresql/data
    networks:
    - app-network

    zerotier:
    image: zyclonite/zerotier:1.14.2
    hostname: zerotier
    container_name: zerotier
    restart: unless-stopped
    volumes:
    - zerotier:/var/lib/zerotier-one
    cap_add:
    - NET_ADMIN
    - SYS_ADMIN
    devices:
    - /dev/net/tun:/dev/net/tun
    networks:
    - app-network
    ports:
    - "9993:9993/udp"
    environment:
    - ZT_OVERRIDE_LOCAL_CONF=true
    - ZT_ALLOW_MANAGEMENT_FROM=172.31.255.0/29

    ztnet:
    image: sinamics/ztnet:latest
    container_name: ztnet
    working_dir: /app
    volumes:
    - zerotier:/var/lib/zerotier-one
    restart: unless-stopped
    ports:
    - 3000:3000
    # - 127.0.0.1:3000:3000 <--- Use / Uncomment this line to restrict access to localhost only
    environment:
    POSTGRES_HOST: postgres
    POSTGRES_PORT: 5432
    POSTGRES_USER: postgres
    POSTGRES_PASSWORD: postgres
    POSTGRES_DB: ztnet
    NEXTAUTH_URL: "http://localhost:3000" # !! Important !! Set the NEXTAUTH_URL environment variable to the canonical URL or IP of your site with port 3000
    NEXTAUTH_SECRET: "random_secret"
    NEXTAUTH_URL_INTERNAL: "http://ztnet:3000" # Internal NextAuth URL for 'ztnet' container on port 3000. Do not change unless modifying container name.
    networks:
    - app-network
    links:
    - postgres
    depends_on:
    - postgres
    - zerotier

    ############################################################################
    # #
    # Uncomment the section below to enable HTTPS reverse proxy with Caddy. #
    # #
    # Steps: #
    # 1. Replace <YOUR-PUBLIC-HOST-NAME> with your actual public domain name. #
    # 2. Uncomment the caddy_data volume definition in the volumes section. #
    # #
    ############################################################################

    # https-proxy:
    # image: caddy:latest
    # container_name: ztnet-https-proxy
    # restart: unless-stopped
    # depends_on:
    # - ztnet
    # command: caddy reverse-proxy --from <YOUR-PUBLIC-HOST-NAME> --to ztnet:3000
    # volumes:
    # - caddy_data:/data
    # networks:
    # - app-network
    # links:
    # - ztnet
    # ports:
    # - "80:80"
    # - "443:443"

    volumes:
    zerotier:
    postgres-data:
    # caddy_data:

    networks:
    app-network:
    driver: bridge
    ipam:
    driver: default
    config:
    - subnet: 172.31.255.0/29
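    控制面起来后,各 leaf 客户端(如 scm、data-center 服务器)大致按如下方式加入在 ztnet 上创建的网络(示意,network id 为占位值;私有 planet 还需按 ztnet 文档替换客户端的 planet 文件):

    # 在 leaf 客户端安装 zerotier 并加入网络(network id 为占位示例)
    curl -s https://install.zerotier.com | sudo bash
    sudo zerotier-cli join <network-id>
    sudo zerotier-cli listnetworks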

背景

注意:此处的增量指的T+1天的增量刷新,并非paimon数据湖中有增量数据,物化视图只刷新增量数据。

StarRocks 并未支持真正的增量物化视图刷新,仅支持按分区定时刷新该分区的全量数据,这是极为遗憾的,因为对分区每一次都全量刷新,是极其损耗性能的。

举个例子:订单表今天有1W条数据,按照天刷新,也就是每一次都要刷新1W条数据。

但是该种刷新方式,对于T+1天的场景还是基本能满足要求的,毕竟BI场景中,T+1天的场景非常常见。

  • paimon 1.0
  • StarRocks 3.3.8
  • flink 1.20
    • paimon-s3-1.0.0.jar 放入lib目录
    • paimon-flink-action-1.0.0.jar 放入lib目录
    • paimon-flink-1.20-1.0.0.jar 放入lib目录

目标:

  • 将mysql表同步到paimon

    使用paimon cdc同步mysql源数据orders表到数据湖作为我们数仓的贴源层ods_orders,并且设置paimon表分区按天。

  • 通过paimon catalog,创建按天分区增量刷新的物化视图

    在Starrocks创建基于paimon分区表的物化视图dwd_orders_mv,验证StarRocks是否能够按照Paimon的分区进行按天增量刷新。

实践

  • mysql表准备

    mysql准备一张orders表,字段有一个创建时间created,后续同步到paimon将使用该字段进行按天分区

    -- 创建orders表
    CREATE TABLE `orders` (
    `id` int NOT NULL AUTO_INCREMENT,
    `created` datetime DEFAULT NULL,
    `type` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
    `k1` int DEFAULT NULL,
    `v2` int DEFAULT NULL,
    PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
    -- 造数据
    INSERT INTO `orders` VALUES(1,'2024-11-01 11:17:41',"1",1,1),(2,'2024-11-02 22:17:41',"1",1,1),(3,'2024-11-02 22:17:41',"1",1,1);
    -- 查询数据
    SELECT * FROM orders;
    id created type k1 v2
    1 2024-11-01 11:17:41 1 1 1
    2 2024-11-02 22:17:41 1 1 1
    3 2024-11-02 22:17:41 1 1 1
  • paimon cdc 设置分区键进行同步 (ods_orders)

    partition_keys 设置按天分区

    ./bin/flink run -d \
    ./lib/paimon-flink-action-1.0.0.jar \
    mysql_sync_table \
    --warehouse s3://lakehouse/paimon-dev \
    --database test \
    --table ods_orders \
    --primary_keys id,pt \
    --partition_keys pt \
    --computed_column 'pt=date_format(created, yyyyMMdd)' \
    --mysql_conf hostname=192.168.103.113 \
    --mysql_conf username=root \
    --mysql_conf password=XXX \
    --mysql_conf database-name=test \
    --mysql_conf table-name='orders' \
    --catalog_conf s3.endpoint=http://192.168.103.113:9010 \
    --catalog_conf s3.path.style.access=true \
    --catalog_conf s3.access-key=XXX \
    --catalog_conf s3.secret-key=XXX \
    --table_conf bucket=1 \
    --table_conf changelog-producer=input
  • StarRocks创建按天分区的物化视图 (dwd_orders_mv)

    • 创建物化视图
      DROP MATERIALIZED VIEW `test`.`dwd_orders_mv`;
      CREATE MATERIALIZED VIEW `test`.`dwd_orders_mv`
      COMMENT "dwd-order"
      REFRESH ASYNC START('2024-01-01 10:00:00') EVERY (interval 3 MINUTE)
      PARTITION BY pt
      ORDER BY( created )
      PROPERTIES ( "partition_refresh_number" = "30" )
      AS
      SELECT
      *
      FROM paimon.test.ods_orders so
    • 查询物化视图
      SELECT * FROM `test`.`dwd_orders_mv`

      created id type k1 v2 pt
      2024-11-01 11:17:41 1 1 1 1 20241101
      2024-11-02 22:17:41 3 1 1 1 20241102
      2024-11-02 22:17:41 2 1 1 1 20241102
  • mysql更新数据

    insert into orders values(4,'2024-11-03 11:17:41',"1",1,1)
  • 查询分区刷新情况

    SELECT
    *
    FROM
    information_schema.task_runs
    WHERE
    task_name IN (
    SELECT
    TASK_NAME
    FROM
    information_schema.materialized_views
    WHERE
    TABLE_NAME IN ( "dwd_orders_mv" ))
    ORDER BY
    TASK_NAME ASC,
    CREATE_TIME DESC
    LIMIT 10


结论

mysql新增2024-11-03这一天的数据后,paimon cdc同步到paimon表,存入20241103分区,starrocks正常按天分区刷新2024-11-03号的数据。

参考

mysql到paimon并基于starrocks做异步分区物化视图全流程

背景

本次验证的场景是订单表基于日期按天分区的场景,主要验证物化视图是否能正常的按天增量刷新,以及验证是否能忽略基表刷新(excluded_refresh_tables)

涉及表:

  • 字典表ods_dict、字典物化视图dwd_dict_mv
  • 订单表ods_order
  • 订单表关联字典表 物化视图 dwd_order_mv1(关联table字典)、dwd_order_mv2(关联mv字典)

版本3.3.6-3.3.8,存算分离

本次验证的结论令人震惊:无法控制由基表刷新所引发的物化视图刷新。

测试数据

  • ods_order

    CREATE TABLE IF NOT EXISTS test.ods_order
    (
    id INT,
    created DATETIME,
    type STRING,
    k1 INT,
    v2 INT
    )PRIMARY KEY (`id`,created)
    PARTITION BY date_trunc('day', `created`);

    insert into ods_order values(1,'2024-11-01 11:17:41',"1",1,1),(2,'2024-11-02 22:17:41',"1",1,1),(3,'2024-11-02 22:17:41',"1",1,1)
    id	created	type	k1	v2
    1 2024-11-01 11:17:41 1 1 1
    2 2024-11-02 22:17:41 1 1 1
    3 2024-11-02 22:17:41 1 1 1
  • ods_dict

    CREATE TABLE IF NOT EXISTS test.ods_dict
    (
    id INT,
    dict_label STRING,
    dict_value STRING
    )PRIMARY KEY (`id`);

    insert into ods_dict values(1,'电商订单',"1"),(2,'普通订单',"2")
    id	dict_label	dict_value
    2 普通订单 2
    1 电商订单 1
  • dwd_dict_mv

    建这张表的目的是验证关联table和关联mv对分区刷新的差异,注意这里将刷新时间设置成了10天

    CREATE MATERIALIZED VIEW dwd_dict_mv
    DISTRIBUTED BY HASH(id)
    REFRESH ASYNC START('2024-10-01 10:00:00') EVERY (interval 10 DAY)
    AS SELECT *
    FROM ods_dict

创建物化视图

定时3分钟刷新

  • dwd_order_mv1

    关联的ods_dict

    CREATE MATERIALIZED VIEW dwd_order_mv1
    COMMENT "dwd-订单"
    REFRESH ASYNC START('2024-10-01 10:00:00') EVERY(INTERVAL 3 MINUTE)
    PARTITION BY date_trunc('day', created)
    PROPERTIES(
    "partition_refresh_number" = "30"
    )
    AS
    SELECT
    o.id,
    o.created,
    o.type,
    dict.dict_label as type_label,
    o.k1,
    o.v2
    FROM
    ods_order o
    LEFT JOIN ods_dict dict ON o.type = dict.dict_value
  • dwd_order_mv2

    关联的dwd_dict_mv

    CREATE MATERIALIZED VIEW dwd_order_mv2
    COMMENT "dwd-订单"
    REFRESH ASYNC START('2024-10-01 10:00:00') EVERY(INTERVAL 3 MINUTE)
    PARTITION BY date_trunc('day', created)
    PROPERTIES(
    "partition_refresh_number" = "30"
    )
    AS
    SELECT
    o.id,
    o.created,
    o.type,
    dict.dict_label as type_label,
    o.k1,
    o.v2
    FROM
    ods_order o
    LEFT JOIN dwd_dict_mv dict ON o.type = dict.dict_value

验证

我的需求是:ods_order 订单表关联字典表,清洗出一张 dwd_order_mv 物化视图。这里分别验证关联 table 或 mv 的差异。

  • ods_order更新

    增加1条11月3号的数据

    insert into ods_order values(4,'2024-11-03 11:17:41',"1",1,1)


    ✅ 结论: dwd_order_mv1、dwd_order_mv2各自增量刷新了11月3号的分区数据,并且在后续没有新增数据的情况下,没有再刷新任何分区数据

  • ods_dict更新

    UPDATE `ods_dict` SET `dict_label` = '电商订单_new' WHERE id = 1;

    这里我们一定要记得dwd_dict_mv表是设置了10天刷新,所以这时候dwd_dict_mv的数据是还没有更新的

    --- ods_dict
    id dict_label dict_value
    1 电商订单_new 1
    2 普通订单 2

    --- dwd_dict_mv
    id dict_label dict_value
    1 电商订单 1
    2 普通订单 2

    那么dwd_order_mv1和dwd_order_mv2的刷新会有什么表现呢?

    -- dwd_order_mv1
    {
    "forceRefresh": false,
    "mvPartitionsToRefresh": ["p20241102_20241103", "p20241101_20241102", "p20241103_20241104"],
    "refBasePartitionsToRefreshMap": {
    "ods_order": ["p20241101", "p20241102", "p20241103"]
    },
    "basePartitionsToRefreshMap": {
    "ods_dict": ["ods_dict"],
    "ods_order": ["p20241101", "p20241102", "p20241103"]
    },
    "processStartTime": 1733076180617,
    "executeOption": {
    "priority": 0,
    "taskRunProperties": {
    "mvId": "1337190"
    },
    "isMergeRedundant": true,
    "isManual": false,
    "isSync": false,
    "isReplay": false
    },
    "planBuilderMessage": {
    "ods_order": "p20241101,p20241102,p20241103"
    }
    }

    -- dwd_order_mv2
    {
    "forceRefresh": false,
    "mvPartitionsToRefresh": ["p20241102_20241103", "p20241101_20241102", "p20241103_20241104"],
    "refBasePartitionsToRefreshMap": {
    "ods_order": ["p20241101", "p20241102", "p20241103"]
    },
    "basePartitionsToRefreshMap": {
    "dwd_dict_mv": ["dwd_dict_mv"],
    "ods_order": ["p20241101", "p20241102", "p20241103"]
    },
    "processStartTime": 1733076181616,
    "executeOption": {
    "priority": 0,
    "taskRunProperties": {
    "mvId": "1337239"
    },
    "isMergeRedundant": true,
    "isManual": false,
    "isSync": false,
    "isReplay": false
    },
    "planBuilderMessage": {
    "ods_order": "p20241101,p20241102,p20241103"
    }
    }


⚠️结论1:我们震惊地发现,即使dwd_dict_mv的数据尚未到设置的10天刷新周期、还没有更新,物化视图dwd_order_mv2(ods_order 关联 dwd_dict_mv)却进行了刷新!!


⚠️结论2:更糟糕的是,此后dwd_order_mv2每到3分钟的定时刷新时间点,每一次都进行了全分区刷新!!!

在出现这种异常情况后,我决定试一试excluded_trigger_tables和excluded_refresh_tables

  • 重新创建物化视图

    记得先恢复一下数据:UPDATE `ods_dict` SET `dict_label` = '电商订单' WHERE id = 1;

    ---dwd_order_mv1配置忽略ods_dict更新导致物化视图更新
    CREATE MATERIALIZED VIEW dwd_order_mv1
    COMMENT "dwd-订单"
    REFRESH ASYNC START('2024-10-01 10:00:00') EVERY(INTERVAL 3 MINUTE)
    PARTITION BY date_trunc('day', created)
    PROPERTIES(
    "partition_refresh_number" = "30",
    "excluded_trigger_tables" = "test.ods_dict",
    "excluded_refresh_tables" = "test.ods_dict"
    )
    AS
    SELECT
    o.id,
    o.created,
    o.type,
    dict.dict_label as type_label,
    o.k1,
    o.v2
    FROM
    ods_order o
    LEFT JOIN ods_dict dict ON o.type = dict.dict_value

    --- dwd_order_mv2配置忽略dwd_dict_mv更新导致物化视图更新
    CREATE MATERIALIZED VIEW dwd_order_mv2
    COMMENT "dwd-订单"
    REFRESH ASYNC START('2024-10-01 10:00:00') EVERY(INTERVAL 3 MINUTE)
    PARTITION BY date_trunc('day', created)
    PROPERTIES(
    "partition_refresh_number" = "30",
    "excluded_trigger_tables" = "test.dwd_dict_mv",
    "excluded_refresh_tables" = "test.dwd_dict_mv"
    )
    AS
    SELECT
    o.id,
    o.created,
    o.type,
    dict.dict_label as type_label,
    o.k1,
    o.v2
    FROM
    ods_order o
    LEFT JOIN dwd_dict_mv dict ON o.type = dict.dict_value


    ⚠️结论3:你猜出现了什么情况?在没有任何基表刷新的情况下,dwd_order_mv2在第一次全量全分区刷新之后,每一次3分钟的定时刷新仍然会进行全量全分区刷新

  • 更新 ods_dict

    UPDATE `ods_dict` SET `dict_label` = '电商订单_new' WHERE id = 1;


    ✅结论4:dwd_order_mv1对excluded_trigger_tables和excluded_refresh_tables的配置生效:配置的基表刷新了,但没有触发dwd_order_mv1刷新
    ⚠️结论5:dwd_order_mv2对excluded_trigger_tables和excluded_refresh_tables的配置不生效,每一次定时刷新仍然是全量全分区刷新
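    顺带一提,反复验证不同属性组合时,不一定每次都 DROP 重建物化视图,也可以尝试直接修改已有物化视图的属性(示意,属性键及是否支持修改以官方文档为准):

    -- 直接为已有物化视图调整属性(示意)
    ALTER MATERIALIZED VIEW dwd_order_mv1
    SET ("excluded_trigger_tables" = "test.ods_dict");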

结论

如果物化视图关联的基表是一个table,那么与分区刷新相关的行为都正常;
如果物化视图关联的基表是一个mv,即使这个基表没有任何更新,在第一次刷新之后的每一次定时刷新,都会全量刷新全部分区。

这究竟是本身不支持,还是一个究极大BUG?

其他

查看dwd_order_mv2和dwd_order_mv1的最近10次刷新历史

(SELECT
TASK_NAME,
CREATE_TIME,
DEFINITION,
EXTRA_MESSAGE
FROM
information_schema.task_runs
WHERE
task_name IN (
SELECT
TASK_NAME
FROM
information_schema.materialized_views
WHERE
TABLE_NAME IN ( "dwd_order_mv2" ))
ORDER BY
TASK_NAME ASC,
CREATE_TIME DESC
LIMIT 10 )
UNION ALL
(SELECT
TASK_NAME,
CREATE_TIME,
DEFINITION,
EXTRA_MESSAGE
FROM
information_schema.task_runs
WHERE
task_name IN (
SELECT
TASK_NAME
FROM
information_schema.materialized_views
WHERE
TABLE_NAME IN ( "dwd_order_mv1" ))
ORDER BY
TASK_NAME ASC,
CREATE_TIME DESC
LIMIT 10)
ORDER BY
TASK_NAME ASC,
CREATE_TIME DESC

背景

为了实现数据源快速入湖,实现湖仓快速分析,这边评估一下paimon的真实可用情况,验证一下是否能上生产。

首先我们明确一下目标

  • 提交cdc方案验证
    • flink cdc pipeline ❌验证失败
    • paimon-flink-action ✅通过验证
  • 源数据入湖验证

    分别验证一下业务中2个常用的源数据库入湖

    • mysql ✅通过验证
    • mongo ⌛️待验证

      📢问题:mongo cdc导入paimon后,所有字段类型都是字符串,boolean无法正确转换

  • 文件系统验证

    先用本地文件系统进行验证,然后验证cos存储

    • file://储存 ✅通过验证
    • s3://储存 ✅通过验证
  • StarRocks湖仓分析验证

    从StarRocks添加paimon catalog

    • 验证含有json的字段类型入湖后是什么数据类型,及StarRocks是否支持查询含有json的信息 ✅通过验证
    • 验证通过paimon catalog,创建物化视图是否能正常工作 ✅通过验证

环境及准备工作

  • 环境
    • flink 1.20
    • paimon 0.9
    • flink cdc 3.2.1
  • 准备工作
    • FLINK_HOME/lib 中添加
      • paimon-flink-1.20-0.9.0.jar
      • paimon-flink-action-0.9.0.jar
      • paimon-s3-0.9.0.jar
      • flink-sql-connector-mongodb-cdc-3.2.1.jar
      • flink-sql-connector-mysql-cdc-3.2.1.jar
    • FLINK_CDC_HOME/lib 中添加
      • flink-cdc-pipeline-connector-mysql-3.2.1.jar
      • flink-cdc-pipeline-connector-paimon-3.2.1.jar

方案

我将采取2种方案去验证:

  • 方案一:flink cdc 3.X 提供的pipeline方式提交

    之前有用过flink cdc pipeline 从mysql -> starrocks,生产验证ok,
    但一直没有验证过mysql -> paimon

  • 方案二:paimon官方提供的paimon-flink-action提交

    该方案是因为flink cdc pipeline的mysql->paimon无法正常工作,并且paimon-flink-action为paimon官方推荐的提交作业方式

开始验证吧

⚠️ 这个方案目前没有走通,无论是 flink 1.20 还是换到 1.18,提交均报错 ❌
mysql-to-paimon.yaml

################################################################################
# Description: Sync MySQL all tables to Paimon
################################################################################
source:
  type: mysql
  hostname: 192.168.103.113
  port: 3306
  username: root
  password: 'XXX'
  tables: app_db.\.*
  server-id: 5400-5404
  server-time-zone: Asia/Shanghai

sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: filesystem
  catalog.properties.warehouse: file:///tmp/warehouse1

pipeline:
  name: Sync MySQL Database to Paimon
  parallelism: 1

flink cdc 目录下进行提交

bash bin/flink-cdc.sh mysql-to-paimon.yaml --flink-home ../flink-1.20.0
  • file

    ./bin/flink run \
    ./lib/paimon-flink-action-0.9.0.jar \
    mysql_sync_database \
    --warehouse /tmp/warehouse1 \
    --database app_db \
    --mysql_conf hostname=192.168.103.113 \
    --mysql_conf username=root \
    --mysql_conf password=XXX \
    --mysql_conf database-name=app_db \
    --table_conf bucket=1
  • s3

    这个s3的配置有点技巧,mysql-cdc文档内没有说明。我试了下可以通过catalog_conf配置;经过咨询官方,原来也可以在flink的配置文件内设置

    ./bin/flink run \
    ./lib/paimon-flink-action-0.9.0.jar \
    mysql_sync_database \
    --warehouse s3://lakehouse-1253767413/paimon \
    --database app_db \
    --mysql_conf hostname=192.168.103.113 \
    --mysql_conf username=root \
    --mysql_conf password=XXX \
    --mysql_conf database-name=app_db \
    --catalog_conf s3.endpoint=cos.ap-guangzhou.myqcloud.com \
    --catalog_conf s3.access-key=XXX \
    --catalog_conf s3.secret-key=XXX \
    --table_conf bucket=1

starrocks 湖查询

  • 添加 starrocks paimon catalog
    CREATE EXTERNAL CATALOG paimon
    PROPERTIES
    (
    "type" = "paimon",
    "paimon.catalog.type" = "filesystem",
    "paimon.catalog.warehouse" = "s3://lakehouse-1253767413/paimon",
    "aws.s3.endpoint" = "cos.ap-guangzhou.myqcloud.com",
    "aws.s3.access_key" = "XXX",
    "aws.s3.secret_key" = "XXX"
    );
  • 验证
    • 查询paimon数据验证 ✅
      select * from paimon.app_db.orders
      id price log
      1 4.00 ["1"]
      2 100.00 ["2"]
    • json验证 ✅
      这里我们发现log字段本来在 mysql 内的是json类型,到了paimon则为string字符串类型
    • 增删改同步 ✅
      mysql 增删改,可以同步到paimon,从而starrocks进行查询
    • 创建异步物化视图验证 ✅
      CREATE MATERIALIZED VIEW paimon_mv_test
      COMMENT "paimon-mv-test"
      REFRESH ASYNC START ( '2024-10-01 10:00:00' ) EVERY ( INTERVAL 10 MINUTE )
      AS

      select * from paimon.app_db.orders

背景

我们前期根据kube-prometheus-stack部署实践进行了监控的部署,并且很好的对k8s集群的各项指标进行了grafana可视化监控。
但是我们还有一个监控需求来源于数仓,日常管理数仓中,我会出现如下几个需求点:

  • 缓存数据到磁盘
    这个需求源于我们的TKE使用腾讯云的CFS作为存储,而CFS是按量收费的,那么StarRocks缓存到磁盘到底占用了多少磁盘空间、是否需要清理,就迫在眉睫
  • 数仓与对象储存流量情况
    我们需要日常关注StarRocks与对象存储的流量带宽情况
  • 物化视图的成功与否及监控告警
    StarRocks中创建了非常多的物化视图,而这些物化视图的成功失败及时间节点,需要更好的监控到位

基于以上需求,我们来尝试解决这些问题

StarRocks配置prometheus metrics scrape

根据 StarRocks Cluster Integration With Prometheus and Grafana Service 指南,我们先给StarRocks配置好 prometheus metrics scrape

我是根据operator安装的而非helm,所以根据文档我的配置如下:

重点关注spec.starRocksBeSpec.service.annotations和spec.starRocksFeSpec.service.annotations

apiVersion: starrocks.com/v1
kind: StarRocksCluster
metadata:
  name: kube-starrocks
  namespace: default
spec:
  starRocksBeSpec:
    configMapInfo:
      configMapName: kube-starrocks-be-cm
      resolveKey: be.conf
    image: starrocks/be-ubuntu:3.3-latest
    limits:
      cpu: 4
      memory: 4Gi
    replicas: 1
    requests:
      cpu: 1
      memory: 2Gi
    service:
      annotations:
        prometheus.io/path: /metrics
        prometheus.io/port: "8040"
        prometheus.io/scrape: "true"
  starRocksFeSpec:
    configMapInfo:
      configMapName: kube-starrocks-fe-cm
      resolveKey: fe.conf
    image: starrocks/fe-ubuntu:3.3-latest
    limits:
      cpu: 4
      memory: 4Gi
    replicas: 1
    requests:
      cpu: 1
      memory: 2Gi
    service:
      annotations:
        prometheus.io/path: /metrics
        prometheus.io/port: "8030"
        prometheus.io/scrape: "true"
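修改后重新 apply 使配置生效,并可以先在集群内确认 metrics 端点能拉到数据(示意,文件名 starrocks-cluster.yaml 与 service 名称 kube-starrocks-fe-service 为假设,按实际部署调整):

# 应用修改后的 StarRocksCluster 配置(文件名为假设)
kubectl apply -f starrocks-cluster.yaml
# 在集群内临时起一个 pod 访问 FE 的 metrics 端点验证(service 名按实际为准)
kubectl run curl-test --rm -i --image=curlimages/curl --restart=Never -- curl -s http://kube-starrocks-fe-service.default:8030/metrics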

根据 Service 注解动态采集 参考

prometheus-additional.yaml

- job_name: 'StarRocks_Cluster'
kubernetes_sd_configs:
- role: endpoints
relabel_configs:
- source_labels: [__meta_kubernetes_service_annotation_prometheus_io_scrape]
action: keep
regex: true
- source_labels: [__meta_kubernetes_service_annotation_prometheus_io_scheme]
action: replace
target_label: __scheme__
regex: (https?)
- source_labels: [__meta_kubernetes_service_annotation_prometheus_io_path]
action: replace
target_label: __metrics_path__
regex: (.+)
- source_labels: [__address__, __meta_kubernetes_service_annotation_prometheus_io_port]
action: replace
target_label: __address__
regex: ([^:]+)(?::\d+)?;(\d+)
replacement: $1:$2
- action: labelmap
regex: __meta_kubernetes_service_label_(.+)
- source_labels: [__meta_kubernetes_namespace]
action: keep
regex: starrocks # 过滤starrocks命名空间
- source_labels: [__meta_kubernetes_namespace]
action: replace
target_label: kubernetes_namespace
- source_labels: [__meta_kubernetes_service_name]
action: replace
target_label: kubernetes_name
- source_labels: [__meta_kubernetes_pod_name]
action: replace
target_label: kubernetes_pod_name
scrape_configs:
- job_name: starrocks-fe-monitor
honor_labels: true
scrape_interval: 15s
metrics_path: /metrics
scheme: http
kubernetes_sd_configs:
- role: endpoints
namespaces:
names:
- starrocks
relabel_configs:
- source_labels:
- __meta_kubernetes_endpoint_port_name
regex: http
action: keep
- source_labels:
- __meta_kubernetes_service_name
regex: starrockscluster-fe-service
action: keep
- source_labels:
- __meta_kubernetes_pod_node_name
target_label: node
- source_labels:
- __meta_kubernetes_namespace
target_label: namespace
- source_labels:
- __meta_kubernetes_service_name
target_label: service
- source_labels:
- __meta_kubernetes_pod_name
target_label: pod

scrape_configs:
- job_name: starrocks-be-monitor
honor_labels: true
scrape_interval: 15s
metrics_path: /metrics
scheme: http
kubernetes_sd_configs:
- role: endpoints
namespaces:
names:
- starrocks
relabel_configs:
- source_labels:
- __meta_kubernetes_endpoint_port_name
regex: webserver
action: keep
- source_labels:
- __meta_kubernetes_service_name
regex: starrockscluster-cn-service
action: keep
- source_labels:
- __meta_kubernetes_pod_node_name
target_label: node
- source_labels:
- __meta_kubernetes_namespace
target_label: namespace
- source_labels:
- __meta_kubernetes_service_name
target_label: service
- source_labels:
- __meta_kubernetes_pod_name
target_label: pod

kube-prometheus-stack 采集配置方法

如果你使用 kube-prometheus-stack 来安装 Prometheus,需要在 additionalScrapeConfigs或者additionalScrapeConfigsSecret里加上采集配置,示例:

  • 在additionalScrapeConfigsSecret配置
    kubectl create secret generic additional-configs --from-file=prometheus-additional.yaml -n monitoring
    additionalScrapeConfigsSecret:
    enabled: true
    name: additional-configs
    key: prometheus-additional.yaml

  • 在additionalScrapeConfigs配置
    prometheus:
    prometheusSpec:
    additionalScrapeConfigs:
    - job_name: 'StarRocks_Cluster'
    kubernetes_sd_configs:
    - role: endpoints
    relabel_configs:
    - source_labels: [__meta_kubernetes_service_annotation_prometheus_io_scrape]
    action: keep
    regex: true
    - source_labels: [__meta_kubernetes_service_annotation_prometheus_io_scheme]
    action: replace
    target_label: __scheme__
    regex: (https?)
    - source_labels: [__meta_kubernetes_service_annotation_prometheus_io_path]
    action: replace
    target_label: __metrics_path__
    regex: (.+)
    - source_labels: [__address__, __meta_kubernetes_service_annotation_prometheus_io_port]
    action: replace
    target_label: __address__
    regex: ([^:]+)(?::\d+)?;(\d+)
    replacement: $1:$2
    - action: labelmap
    regex: __meta_kubernetes_service_label_(.+)
    - source_labels: [__meta_kubernetes_namespace]
    action: keep
    regex: starrocks # 过滤starrocks命名空间
    - source_labels: [__meta_kubernetes_namespace]
    action: replace
    target_label: kubernetes_namespace
    - source_labels: [__meta_kubernetes_service_name]
    action: replace
    target_label: kubernetes_name
    - source_labels: [__meta_kubernetes_pod_name]
    action: replace
    target_label: kubernetes_pod_name
    配置好后,我们到Prometheus web界面观察,发现已经正常在采集了。

Grafana 监控可视化展示

按照文档Import StarRocks Grafana Dashboard,导入Grafana模板,发现毛数据都木有,哈哈哈🤣,至此等待StarRocks官方修复。

我们来试试其他几个模板
Dashboard 模板



其他参考

Kubernetes 监控:Prometheus Operator

Environment

  • Kubernetes 集群
    需要一个已经部署完成且可用的Kubernetes 1.16+集群。
  • Helm
    helm version v3+

Steps

  • 添加 Prometheus chart repo 到 Helm

    helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
    helm repo update
  • 查看版本信息

    $ helm search repo prometheus-community/kube-prometheus-stack
    NAME CHART VERSION APP VERSION DESCRIPTION
    prometheus-community/kube-prometheus-stack 57.0.2 v0.72.0 kube-prometheus-stack collects Kubernetes manif...
  • 将仓库拉取到本地

    helm pull prometheus-community/kube-prometheus-stack
  • 修改values.yaml

    • 配置简单的本地 NFS 存储卷
      $ vim ./values.yaml
      ---
      ## Storage is the definition of how storage will be used by the Alertmanager instances.
      storage:
      volumeClaimTemplate:
      spec:
      storageClassName: nfs-client
      accessModes: ["ReadWriteOnce"]
      resources:
      requests:
      storage: 10Gi
      ...
      grafana:
      enabled: true
      namespaceOverride: ""

      defaultDashboardsTimezone: Asia/Shanghai
      adminPassword: prom-operator
      persistence:
      enabled: true
      type: pvc
      storageClassName: nfs-client
      accessModes:
      - ReadWriteOnce
      size: 20Gi
      finalizers:
      - kubernetes.io/pvc-protection
      ...
      # 配置Prometheus持久化NFS存储
      prometheus:
      prometheusSpec:
      podMonitorSelectorNilUsesHelmValues: false
      serviceMonitorSelectorNilUsesHelmValues: false
      ## Prometheus StorageSpec for persistent data
      storageSpec: {}
      ## Using PersistentVolumeClaim
      volumeClaimTemplate:
      spec:
      storageClassName: nfs-client
      accessModes: ["ReadWriteOnce"]
      resources:
      requests:
      storage: 20Gi
      ...
      ## Storage is the definition of how storage will be used by the ThanosRuler instances.
      ## ref: https://github.com/prometheus-operator/prometheus-operator/blob/main/Documentation/user-guides/storage.md
      ##
      storage:
      volumeClaimTemplate:
      spec:
      storageClassName: nfs-client
      accessModes: ["ReadWriteOnce"]
      resources:
      requests:
      storage: 5Gi
    • 配置grafana ingress
      ingress:
      ## If true, Grafana Ingress will be created
      ##
      enabled: true

      ## IngressClassName for Grafana Ingress.
      ## Should be provided if Ingress is enable.
      ##
      ingressClassName: nginx

      ## Annotations for Grafana Ingress
      ##
      annotations:
      kubernetes.io/ingress.class: nginx
      # kubernetes.io/tls-acme: "true"

      ## Labels to be added to the Ingress
      ##
      labels: {}

      ## Hostnames.
      ## Must be provided if Ingress is enable.
      ##
      # hosts:
      # - grafana.domain.com
      hosts:
      - grafana.dev.XXX.cn
      ## Path for grafana ingress
      path: /
      ## TLS configuration for grafana Ingress
      ## Secret must be manually created in the namespace
      ##
      tls: []
      # - secretName: grafana-general-tls
      # hosts:
      # - grafana.example.com
  • 使用 Helm 更新版本重新部署

    helm upgrade prometheus  --namespace monitoring --create-namespace -f values.yaml .
  • 查看资源组件情况

    kubectl get pod -n monitoring
    NAME READY STATUS RESTARTS AGE
    alertmanager-prometheus-kube-prometheus-alertmanager-0 2/2 Running 0 18m
    prometheus-grafana-0 3/3 Running 0 10m
    prometheus-kube-prometheus-operator-546f866469-rvssk 1/1 Running 1 (15h ago) 18h
    prometheus-kube-state-metrics-868cc5957b-9lgt5 1/1 Running 1 (15h ago) 18h
    prometheus-prometheus-kube-prometheus-prometheus-0 2/2 Running 0 18m
    prometheus-prometheus-node-exporter-577kd 1/1 Running 1 (15h ago) 18h
    prometheus-prometheus-node-exporter-f5g8r 1/1 Running 1 (15h ago) 18h
    prometheus-prometheus-node-exporter-gkhmw 1/1 Running 0 18h
    prometheus-prometheus-node-exporter-lql7g 1/1 Running 0 18h

Usage

  • 我们可以通过Grafana ingress 地址进行访问,浏览Grafana仪表板
  • 可以通过prometheus 9090端口的web界面进行访问查看prometheus信息

背景

基于debezium(mongo)-kafka-starrocks的cdc实践,我们使用StarRocks Connector for Kafka将debezium格式的数据实时写入到starrocks。这次我想尝试一下Routine Load是否能够将debezium json数据(带有删除标记)写入到starrocks。
我们明确一下目标:

  • 支持将kafka中debezium json 格式的数据导入,支持增删改
  • 支持日期按需转换

那么我们使用debezium将源数据导入到kafka,在kafka中造一下debezium json格式的数据。

源表数据构建

我特意在建表的时候包含了:日期时间datetime、日期date、布尔tinyint、整形int、字符串varchar,以此来验证这些类型的数据经过Routine Load,是如何进行转换的,我们重点关注不同时间格式的转换,因为在进入kafka中,时间格式的数据都被转换成了不同的时间戳。

CREATE TABLE `test_date` (
`id` int NOT NULL,
`c_name` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
`c_datetime` datetime DEFAULT NULL,
`c_date` date DEFAULT NULL,
`c_married` tinyint(1) DEFAULT NULL,
`c_age` int DEFAULT NULL,
`c_timestamp` timestamp NULL DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;


INSERT INTO `test`.`example` (`id`, `c_name`, `c_datetime`, `c_date`, `c_married`, `c_age`, `c_timestamp`) VALUES (1, 'Kinoshita Momoka', '2024-11-01 16:53:19', '2009-08-16', 1, 11, '2024-11-01 16:53:19');
INSERT INTO `test`.`example` (`id`, `c_name`, `c_datetime`, `c_date`, `c_married`, `c_age`, `c_timestamp`) VALUES (2, 'Kimura Hazuki', '2024-11-01 16:53:19', '2018-04-27', 0, 12, '2024-11-01 16:53:19');


使用debezium将源数据写入到kafka

{
"name": "source-mysql-example-cdc",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "172.16.0.9",
"database.port": "3306",
"database.user": "root",
"database.password": "XXX",
"database.server.id": "1",
"database.server.name": "mysql-example-cdc",
"database.include.list": "test",
"table.include.list": "test.example",
"schema.history.internal.kafka.bootstrap.servers": "172.16.0.51:9092",
"schema.history.internal.kafka.topic": "da.schemahistory.example",
"decimal.handling.mode": "string",
"topic.prefix": "mysql-example",
"include.schema.changes": "false",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "true",
"transforms.unwrap.delete.handling.mode": "rewrite"
}
}
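该连接器配置可以通过 Kafka Connect 的 REST API 提交(示意,Connect 地址与文件名为假设,端口按实际 Connect 服务为准):

# 将上述配置保存为 source-mysql-example-cdc.json 后注册连接器(地址与文件名为假设)
curl -X POST -H "Content-Type: application/json" \
--data @source-mysql-example-cdc.json \
http://172.16.0.51:8083/connectors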

得到kafka中如下结构的2条数据

{
"id": 1,
"c_name": "Kinoshita Momoka",
"c_datetime": 1730479999000,
"c_date": 14472,
"c_married": 1,
"c_age": 11,
"c_timestamp": "2024-11-01T08:53:19Z",
"__deleted": "false"
}
{
"id": 2,
"c_name": "Kimura Hazuki",
"c_datetime": 1730479999000,
"c_date": 17648,
"c_married": 0,
"c_age": 12,
"c_timestamp": "2024-11-01T08:53:19Z",
"__deleted": "false"
}

我们观察一下kafka数据:

  • __deleted标记了是否删除,类型是字符串。

  • c_datetime、c_date、c_timestamp 进入到了kafka后,变成了不同的数据。

    • mysql datetime类型:最终转成了时间戳(1970年01月01日0时0分0秒到指定日期的毫秒数),形如:1730479999000
    • mysql date类型:最终转成了天数(1970年01月01日到指定日期的天数),形如:17648
    • mysql timestamp类型:最终转成了UTC时间,形如:2024-11-01T08:53:19Z
    • mysql time类型:最终转成了微秒数(将时间转成了微秒),形如:34341000000。 (该例未展示)

这里就产生了一个疑问🤔,这些数据如何通过Routine Load导入转换正确的数据进入到StarRocks,并且支持删除?

使用Routine Load导入StarRocks

  • StarRocks创建目标主键表

    CREATE TABLE ods_example (
    id INT NOT NULL,
    c_name string,
    c_datetime datetime,
    c_date date,
    c_married BOOLEAN,
    c_age int,
    c_timestamp datetime
    )
    PRIMARY KEY (id)
    DISTRIBUTED BY HASH (id);
  • StarRocks创建导入,首先分析一下

    • 支持删除的导入:在 StarRocks 里就是 __op 标识。当数据中带有 __op:1 时做 DELETE 操作,带有 __op:0 时做 UPSERT(新增或更新)操作。因此只需把 kafka 中 debezium json 格式的删除标识由 __deleted: "true" 转换成 __op:1,即可实现支持删除的 cdc,具体可以看看文档 通过导入实现数据变更
    • 时间的转换:因为在kafka内的时间都是时间戳,所以使用starrocks的from_unixtime和时区转换convert_tz,根据自己需求转换即可。
      来试一下:
      CREATE ROUTINE LOAD test.ods_example_load_32 ON ods_example
      COLUMNS (
      id,
      c_name,
      temp_datetime,
      temp_date,
      c_married,
      c_age,
      temp_timestamp,
      temp_deleted,
      __op= cast( cast( temp_deleted AS BOOLEAN) AS TINYINT),
      c_timestamp = convert_tz(temp_timestamp,'+00:00', '+08:00'),
      c_datetime = convert_tz(from_unixtime( cast( temp_datetime AS BIGINT )/ 1000 ),'+00:00', '-08:00'),
      c_date = from_unixtime( cast( temp_date AS BIGINT )* 24 * 60 * 60, '%Y%m%d' ))
      PROPERTIES
      (
      "desired_concurrent_number" = "5",
      "format" = "json",
      "jsonpaths" = "[\"$.id\",\"$.c_name\",\"$.c_datetime\",\"$.c_date\",\"$.c_married\",\"$.c_age\",\"$.c_timestamp\",\"$.__deleted\"]"
      )
      FROM KAFKA
      (
      "kafka_broker_list" = "172.16.0.51:9092",
      "kafka_topic" = "mysql-example.test.example",
      "kafka_partitions" = "0,1,2",
      "property.kafka_default_offsets" = "OFFSET_BEGINNING"
      );
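      创建后可以先确认导入作业处于 RUNNING 状态、消费位点在持续推进(示意):

      -- 查看 Routine Load 作业状态(示意)
      SHOW ROUTINE LOAD FOR test.ods_example_load_32;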

至此,完美支持替代StarRocks Connector for Kafka 来做实时cdc导入。但是我有个疑问,假设我有100个字段,COLUMNS和jsonpaths就得对齐100次??留着再研究吧

其他资料

使用Routine Load导入StarRocks
导入过程中实现数据转换

Kafka Connect是Apache Kafka®的一部分,在Kafka和其它系统之间提供可靠的、可扩展的分布式流式集成。Kafka Connect具有可用于许多系统的连接器,它是一个配置驱动的工具,不需要编码。

Kafka Connect API还提供了一个简单的接口,用于处理从源端通过数据管道到接收端的记录,该API称为单消息转换(SMT),顾名思义,当数据通过Kafka Connect连接器时,它可以对数据管道中的每条消息进行操作。

连接器分为源端或接收端,它们要么从Kafka上游的系统中提取数据,要么向Kafka的下游推送数据。这个转换可以配置为在任何一侧进行,源端连接器可以在写入Kafka主题之前对数据进行转换,接收端连接器也可以在将数据写入接收端之前对其进行转换。

转换的一些常见用途是:

  • 对字段重命名;
  • 掩蔽值;
  • 根据值将记录路由到主题;
  • 将时间戳转换或插入记录中;
  • 操作主键,例如根据字段的值设置主键。

Kafka自带了许多转换器,但是开发自定义的转换器也非常容易。

配置Kafka Connect的单消息转换

需要给转换器指定一个名字,该名字将用于指定该转换器的其他属性。例如,下面是JDBC源端利用RegexRouter转换器的配置片段,该转换器将固定字符串附加到要写入的主题的末尾:

{
"name": "jdbcSource",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",

"transforms": "routeRecords",
"transforms.routeRecords.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.routeRecords.regex": "(.*)",
"transforms.routeRecords.replacement": "$1-test"
}
}

该转换器被命名为routeRecords,且在后续中用于传递属性。注意,上面的示例显示了RegexRouter的两个配置属性:正则表达式regex和匹配组引用replacement。此设置将从JDBC源端获取表名,并将其加上-test后缀。根据转换器的功能不同,也可能会有不同的配置属性,具体可以参见相关的文档。

执行多次转换

有时需要执行多次转换,Kafka Connect支持定义多个转化器,他们在配置中链接在一起。这些消息按照在transforms属性中定义的顺序执行转换。

下面的转换使用ValueToKey转换器将值转换为主键,并使用ExtractField转换器仅使用ID整数值作为主键:

"transforms": "createKey,extractInt",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "c1",
"transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractInt.field": "c1"

注意,使用上述$Key符号,会指定此转换将作用于记录的Key,如果要针对记录的Value,需要在这里指定$Value。最后ConnectRecord看起来像这样:

key        value
------------------------------
null {"c1":{"int":100},"c2":{"string":"bar"}}

转换后:

key        value
------------------------------
100 {"c1":{"int":100},"c2":{"string":"bar"}}

单消息转换深入解读

下面深入地看下连接器如何处理数据。转换器被编译为JAR,并通过Connect工作节点的属性文件中的plugin.path属性,指定其可用于Kafka Connect,安装后就可以在连接器属性中配置转换。

配置和部署后,源端连接器将从上游系统接收记录,将其转换为ConnectRecord,然后将该记录传递给配置的转换器的apply()函数,然后等待返回记录。接收端连接器也是执行类似的过程,从Kafka主题读取并反序列化每个消息之后,将调用转换器的apply()方法,并将结果记录发送到目标系统。

如何开发单消息转换器

要开发将UUID插入每个记录的简单转换器,需要逐步执行以下的步骤。

apply方法是转换器的核心,这种转换支持带有模式和不带有模式的数据,因此每个都有一个转换:

@Override
public R apply(R record) {
if (operatingSchema(record) == null) {
return applySchemaless(record);
} else {
return applyWithSchema(record);
}
}

private R applySchemaless(R record) {
final Map<String, Object> value = requireMap(operatingValue(record), PURPOSE);

final Map<String, Object> updatedValue = new HashMap<>(value);

updatedValue.put(fieldName, getRandomUuid());

return newRecord(record, null, updatedValue);
}

private R applyWithSchema(R record) {
final Struct value = requireStruct(operatingValue(record), PURPOSE);

Schema updatedSchema = schemaUpdateCache.get(value.schema());
if(updatedSchema == null) {
updatedSchema = makeUpdatedSchema(value.schema());
schemaUpdateCache.put(value.schema(), updatedSchema);
}

final Struct updatedValue = new Struct(updatedSchema);

for (Field field : value.schema().fields()) {
updatedValue.put(field.name(), value.get(field));
}

updatedValue.put(fieldName, getRandomUuid());

return newRecord(record, updatedSchema, updatedValue);
}

此转换器可以应用于记录的键或值,因此需要实现Key和Value子类,其扩展了InsertUuid类并实现apply方法调用的newRecord方法:

public static class Key<R extends ConnectRecord<R>> extends InsertUuid<R> {

@Override
protected Schema operatingSchema(R record) {
return record.keySchema();
}

@Override
protected Object operatingValue(R record) {
return record.key();
}

@Override
protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedValue, record.valueSchema(), record.value(), record.timestamp());
}

}

public static class Value<R extends ConnectRecord<R>> extends InsertUuid<R> {

@Override
protected Schema operatingSchema(R record) {
return record.valueSchema();
}

@Override
protected Object operatingValue(R record) {
return record.value();
}

@Override
protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, updatedValue, record.timestamp());
}

}

该转换器仅改变了模式和值,但是要注意其可以操纵ConnectRecord的所有部分:Key、Value、Key和Value的模式、目标主题、目标分区和时间戳。

该转换器具有可选的参数,这些参数可以在运行时配置,并可以通过转换器类中重写的configure()方法访问:

@Override
public void configure(Map<String, ?> props) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
fieldName = config.getString(ConfigName.UUID_FIELD_NAME);

schemaUpdateCache = new SynchronizedCache<>(new LRUCache<Schema, Schema>(16));
}

如上所示,该Transformation接口很简单,它实现了一个apply()方法来接收ConnectRecord然后再返回ConnectRecord,它可以选择通过configure()方法接收参数。
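上面 configure() 里引用的 CONFIG_DEF 与 ConfigName 原文没有给出,下面是一个最小示意(字段名与默认值参考 kafka-connect-insert-uuid 的思路,属于假设),仍是该转换器类内部的片段:

// CONFIG_DEF 的最小示意:声明可配置的 uuid.field.name 参数及其默认值
private interface ConfigName {
String UUID_FIELD_NAME = "uuid.field.name";
}

public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(ConfigName.UUID_FIELD_NAME, ConfigDef.Type.STRING, "uuid",
ConfigDef.Importance.HIGH, "插入到记录中的 UUID 字段名");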

接下来,编译此JAR并将其放入Connect工作节点中plugin.path指定的路径中。注意需要将转换器所依赖的任何依赖项打包到它的路径中或编译为胖JAR。然后在连接器配置中调用它,如下所示(注意$Value内部类约定,以指示此转换应作用于记录的值):

transforms=insertuuid
transforms.insertuuid.type=kafka.connect.smt.InsertUuid$Value
transforms.insertuuid.uuid.field.name="uuid"

相关

kafka-connect-insert-uuid
