paimon cdc入湖 & StarRocks湖仓分析实践

背景

为了实现数据源快速入湖,实现湖仓快速分析,这边评估一下paimon的真实可用情况,验证一下是否能上生产。

首先我们明确一下目标

  • 提交cdc方案验证
    • flink cdc pipline ❌验证失败
    • paimon-flink-action ✅通过验证
  • 源数据入湖验证

    分别验证一下业务中2个常用的源数据库入湖

    • mysql ✅通过验证
    • mongo ⌛️待验证

      📢问题:mongo cdc导入paimon,所有字段类型都是字符串
      boolean无法正确转换

  • 文件系统验证

    先用本地文件系统进行验证,然后验证cos存储

    • file://储存 ✅通过验证
    • s3://储存 ✅通过验证
  • StarRocks湖仓分析验证

    从StarRocks添加paimon catalog

    • 验证含有json的字段类型入湖后是什么数据类型,及StarRocks是否支持查询含有json的信息 ✅通过验证
    • 验证通过paimon catalog,创建物化视图是否能正常工作 ✅通过验证

环境及准备工作

  • 环境
    • flink 1.20
    • paimon 0.9
    • flink cdc 3.2.1
  • 准备工作
    • FLINK_HOME/lib 中添加
      • paimon-flink-1.20-0.9.0.jar
      • paimon-flink-1.20-0.9.0.jar
      • paimon-flink-action-0.9.0.jar
      • paimon-s3-0.9.0.jar
      • flink-sql-connector-mongodb-cdc-3.2.1.jar
      • flink-sql-connector-mysql-cdc-3.2.1.jar
    • FLINK_CDC_HOME/lib 中添加
      • flink-cdc-pipeline-connector-mysql-3.2.1.jar
      • flink-cdc-pipeline-connector-paimon-3.2.1.jar

方案

我将采取2种方案去验证:

  • 方案一:flink cdc 3.X 提供的pipline方式提交

    之前有用过flink cdc pipline 从mysql -> starrocks,生产验证ok,
    但一直没有验证过mysql -> paimon

  • 方案二:paimon官方提供的paimon-flink-action提交

    该方案是因为flink cdc pipline的mysql->paimon无法正常工作,并且paimon-flink-action为paimon官方推荐的提交作业方式

开始验证吧

⚠️ 这个方案目前没有走通,无论是从flink1.20换到1.18,均提交报错 ❌
mysql-to-paimon.yaml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
################################################################################
# Description: Sync MySQL all tables to Paimon
################################################################################
source:
type: mysql
hostname: 192.168.103.113
port: 3306
username: root
password: 'XXX'
tables: app_db.\.*
server-id: 5400-5404
server-time-zone: Asia/Shanghai

sink:
type: paimon
name: Paimon Sink
catalog.properties.metastore: filesystem
catalog.properties.warehouse: file:///tmp/warehouse1


pipeline:
name: Sync MySQL Database to Paimon
parallelism: 1

flink cdc 目录下进行提交

1
bash bin/flink-cdc.sh mysql-to-paimon.yaml --flink-home ../flink-1.20.0
  • file

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    ./bin/flink run \
    ./lib/paimon-flink-action-0.9.0.jar \
    mysql_sync_database \
    --warehouse /tmp/warehouse1 \
    --database app_db \
    --mysql_conf hostname=192.168.103.113 \
    --mysql_conf username=root \
    --mysql_conf password=XXX \
    --mysql_conf database-name=app_db \
    --table-conf bucket=1
  • s3

    这个s3的配置有点技巧,mysql-cdc内没有说明,我试了下可以catalog_conf配置,经过咨询官方,原来是在flink的配置文件内可以设置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    ./bin/flink run \
    ./lib/paimon-flink-action-0.9.0.jar \
    mysql_sync_database \
    --warehouse s3://lakehouse-1253767413/paimon \
    --database app_db \
    --mysql_conf hostname=192.168.103.113 \
    --mysql_conf username=root \
    --mysql_conf password=XXX \
    --mysql_conf database-name=app_db \
    --catalog_conf s3.endpoint=cos.ap-guangzhou.myqcloud.com \
    --catalog_conf s3.access-key=XXX \
    --catalog_conf s3.secret-key=XXX \
    --table-conf bucket=1

starrocks 湖查询

  • 添加 starrocks paimon catalog
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    CREATE EXTERNAL CATALOG paimon
    PROPERTIES
    (
    "type" = "paimon",
    "paimon.catalog.type" = "filesystem",
    "paimon.catalog.warehouse" = "s3://lakehouse-1253767413/paimon",
    "aws.s3.endpoint" = "cos.ap-guangzhou.myqcloud.com",
    "aws.s3.access_key" = "XXX",
    "aws.s3.secret_key" = "XXX"
    );
  • 验证
    • 查询paimon数据验证 ✅
      1
      2
      3
      4
      select * from paimon.app_db.orders
      id price log
      1 4.00 ["1"]
      2 100.00 ["2"]
    • json验证 ✅
      这里我们发现log字段本来在 mysql 内的是json类型,到了paimon则为string字符串类型
    • 增删改同步 ✅
      mysql 增删改,可以同步到paimon,从而starrocks进行查询
    • 创建异步物化视图验证 ✅
      1
      2
      3
      4
      5
      6
      CREATE MATERIALIZED VIEW paimon_mv_test
      COMMENT "paimon-mv-test"
      REFRESH ASYNC START ( '2024-10-01 10:00:00' ) EVERY ( INTERVAL 10 MINUTE )
      AS

      select * from paimon.app_db.orders