paimon cdc入湖 & StarRocks湖仓分析实践
背景
为了实现数据源快速入湖,实现湖仓快速分析,这边评估一下paimon的真实可用情况,验证一下是否能上生产。
首先我们明确一下目标
- 提交cdc方案验证
- flink cdc pipline ❌验证失败
- paimon-flink-action ✅通过验证
- 源数据入湖验证
分别验证一下业务中2个常用的源数据库入湖
- mysql ✅通过验证
- mongo ⌛️待验证
📢问题:mongo cdc导入paimon,所有字段类型都是字符串
boolean无法正确转换
- 文件系统验证
先用本地文件系统进行验证,然后验证cos存储
- file://储存 ✅通过验证
- s3://储存 ✅通过验证
- StarRocks湖仓分析验证
从StarRocks添加paimon catalog
- 验证含有json的字段类型入湖后是什么数据类型,及StarRocks是否支持查询含有json的信息 ✅通过验证
- 验证通过paimon catalog,创建物化视图是否能正常工作 ✅通过验证
环境及准备工作
- 环境
- flink 1.20
- paimon 0.9
- flink cdc 3.2.1
- 准备工作
FLINK_HOME/lib
中添加paimon-flink-1.20-0.9.0.jar
paimon-flink-1.20-0.9.0.jar
paimon-flink-action-0.9.0.jar
paimon-s3-0.9.0.jar
flink-sql-connector-mongodb-cdc-3.2.1.jar
flink-sql-connector-mysql-cdc-3.2.1.jar
FLINK_CDC_HOME/lib
中添加flink-cdc-pipeline-connector-mysql-3.2.1.jar
flink-cdc-pipeline-connector-paimon-3.2.1.jar
方案
我将采取2种方案去验证:
- 方案一:flink cdc 3.X 提供的pipline方式提交
之前有用过flink cdc pipline 从mysql -> starrocks,生产验证ok,
但一直没有验证过mysql -> paimon - 方案二:paimon官方提供的paimon-flink-action提交
该方案是因为flink cdc pipline的mysql->paimon无法正常工作,并且
paimon-flink-action
为paimon官方推荐的提交作业方式
开始验证吧
使用flink cdc pipline
⚠️ 这个方案目前没有走通,无论是从flink1.20换到1.18,均提交报错 ❌mysql-to-paimon.yaml
1 | ################################################################################ |
在 flink cdc
目录下进行提交
1 | bash bin/flink-cdc.sh mysql-to-paimon.yaml --flink-home ../flink-1.20.0 |
使用paimon-flink-action
file
1
2
3
4
5
6
7
8
9
10./bin/flink run \
./lib/paimon-flink-action-0.9.0.jar \
mysql_sync_database \
--warehouse /tmp/warehouse1 \
--database app_db \
--mysql_conf hostname=192.168.103.113 \
--mysql_conf username=root \
--mysql_conf password=XXX \
--mysql_conf database-name=app_db \
--table-conf bucket=1s3
这个s3的配置有点技巧,mysql-cdc内没有说明,我试了下可以catalog_conf配置,经过咨询官方,原来是在flink的配置文件内可以设置
1
2
3
4
5
6
7
8
9
10
11
12
13./bin/flink run \
./lib/paimon-flink-action-0.9.0.jar \
mysql_sync_database \
--warehouse s3://lakehouse-1253767413/paimon \
--database app_db \
--mysql_conf hostname=192.168.103.113 \
--mysql_conf username=root \
--mysql_conf password=XXX \
--mysql_conf database-name=app_db \
--catalog_conf s3.endpoint=cos.ap-guangzhou.myqcloud.com \
--catalog_conf s3.access-key=XXX \
--catalog_conf s3.secret-key=XXX \
--table-conf bucket=1
starrocks 湖查询
- 添加 starrocks paimon catalog
1
2
3
4
5
6
7
8
9
10CREATE EXTERNAL CATALOG paimon
PROPERTIES
(
"type" = "paimon",
"paimon.catalog.type" = "filesystem",
"paimon.catalog.warehouse" = "s3://lakehouse-1253767413/paimon",
"aws.s3.endpoint" = "cos.ap-guangzhou.myqcloud.com",
"aws.s3.access_key" = "XXX",
"aws.s3.secret_key" = "XXX"
); - 验证
- 查询paimon数据验证 ✅
1
2
3
4select * from paimon.app_db.orders
id price log
1 4.00 ["1"]
2 100.00 ["2"] - json验证 ✅
这里我们发现log字段本来在 mysql 内的是json类型,到了paimon则为string字符串类型 - 增删改同步 ✅
mysql 增删改,可以同步到paimon,从而starrocks进行查询 - 创建异步物化视图验证 ✅
1
2
3
4
5
6CREATE MATERIALIZED VIEW paimon_mv_test
COMMENT "paimon-mv-test"
REFRESH ASYNC START ( '2024-10-01 10:00:00' ) EVERY ( INTERVAL 10 MINUTE )
AS
select * from paimon.app_db.orders
- 查询paimon数据验证 ✅