StarRocks之使用Routine Load实现CDC实践
背景
在 基于debezium(mongo)-kafka-starrocks的cdc实践,我们使用StarRocks Connector for Kafka
将debezium格式的数据实时写入到starrocks。这次我想尝试一下Routine Load
是否能够将debezium json
数据(带有删除标记)写入到starrocks。
我们明确一下目标:
- 支持将kafka中debezium json 格式的数据导入,支持增删改
- 支持日期按需转换
那么我们使用debezium将源数据导入到kafka,在kafka中造一下debezium json格式的数据。
源表数据构建
我特意在建表的时候包含了:日期时间datetime、日期date、布尔tinyint、整形int、字符串varchar,以此来验证这些类型的数据经过Routine Load,是如何进行转换的,我们重点关注不同时间格式的转换,因为在进入kafka中,时间格式的数据都被转换成了不同的时间戳。
1 | CREATE TABLE `test_date` ( |
使用debezium将源数据写入到kafka
1 | { |
得到kafka中如下结构的2条数据
1 | { |
1 | { |
我们观察一下kafka数据:
__deleted
标记了是否删除,类型是字符串,c_datetime
、c_date
、c_timestamp
进入到了kafka后,变成了不同的数据。- mysql
datetime
类型:最终转成了时间戳(1970年01月01日0时0分0秒到指定日期的毫秒数),形如:1730479999000
。 - mysql
date
类型:最终转成了天数(1970年01月01日到指定日期的天数),形如:17648
。 - mysql
timestamp
类型:最终转成了UTC时间,形如:2024-11-01T08:53:19Z
。 - mysql
time
类型:最终转成了微妙数(将小时转成了微妙),形如:34341000000
。 (该例未展示)
- mysql
这里就产生了一个疑问🤔,这些数据如何通过Routine Load
导入转换正确的数据进入到StarRocks,并且支持删除?
使用Routine Load导入StarRocks
StarRocks创建目标主键表
1
2
3
4
5
6
7
8
9
10
11CREATE TABLE ods_example (
id INT NOT NULL,
c_name string,
c_datetime datetime,
c_date date,
c_married BOOLEAN,
c_age int,
c_timestamp datetime
)
PRIMARY KEY (id)
DISTRIBUTED BY HASH (id);StarRocks创建导入,首先分析一下
支持删除的导入
:在starrocks无非就是__op
标识,当数据中带有__op:1
的时候,做DELETE操作,当数据中带有__op:0
的时候,就是更新操作,那么我们其实就是转换一下kafka中debezium json 格式的删除标识,由__deleted: "true"
转换成__op:1
即可实现数据支持删除的cdc,具体可以看看文档 通过导入实现数据变更时间的转换
:因为在kafka内的时间都是时间戳,所以使用starrocks的from_unixtime和时区转换convert_tz,根据自己需求转换即可。
来试一下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27CREATE ROUTINE LOAD test.ods_example_load_32 ON ods_example
COLUMNS (
id,
c_name,
temp_datetime,
temp_date,
c_married,
c_age,
temp_timestamp,
temp_deleted,
__op= cast( cast( temp_deleted AS BOOLEAN) AS TINYINT),
c_timestamp = convert_tz(temp_timestamp,'+00:00', '+08:00'),
c_datetime = convert_tz(from_unixtime( cast( temp_datetime AS BIGINT )/ 1000 ),'+00:00', '-08:00'),
c_date = from_unixtime( cast( temp_date AS BIGINT )* 24 * 60 * 60, '%Y%m%d' ))
PROPERTIES
(
"desired_concurrent_number" = "5",
"format" = "json",
"jsonpaths" = "[\"$.id\",\"$.c_name\",\"$.c_datetime\",\"$.c_date\",\"$.c_married\",\"$.c_age\",\"$.c_timestamp\",\"$.__deleted\"]"
)
FROM KAFKA
(
"kafka_broker_list" = "172.16.0.51:9092",
"kafka_topic" = "mysql-example.test.example",
"kafka_partitions" = "0,1,2",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
至此,完美支持替代StarRocks Connector for Kafka
来做实时cdc导入。但是我有个疑问,假设我有100个字段,COLUMNS和jsonpaths就得对齐100次??留着再研究吧