StarRocks之使用Routine Load实现CDC实践

背景

基于debezium(mongo)-kafka-starrocks的cdc实践,我们使用StarRocks Connector for Kafka将debezium格式的数据实时写入到starrocks。这次我想尝试一下Routine Load是否能够将debezium json数据(带有删除标记)写入到starrocks。
我们明确一下目标:

  • 支持将kafka中debezium json 格式的数据导入,支持增删改
  • 支持日期按需转换

那么我们使用debezium将源数据导入到kafka,在kafka中造一下debezium json格式的数据。

源表数据构建

我特意在建表的时候包含了:日期时间datetime、日期date、布尔tinyint、整形int、字符串varchar,以此来验证这些类型的数据经过Routine Load,是如何进行转换的,我们重点关注不同时间格式的转换,因为在进入kafka中,时间格式的数据都被转换成了不同的时间戳。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
CREATE TABLE `test_date` (
`id` int NOT NULL,
`c_name` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
`c_datetime` datetime DEFAULT NULL,
`c_date` date DEFAULT NULL,
`c_married` tinyint(1) DEFAULT NULL,
`c_age` int DEFAULT NULL,
`c_timestamp` timestamp NULL DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;


INSERT INTO `test`.`example` (`id`, `c_name`, `c_datetime`, `c_date`, `c_married`, `c_age`, `c_timestamp`) VALUES (1, 'Kinoshita Momoka', '2024-11-01 16:53:19', '2009-08-16', 1, 11, '2024-11-01 16:53:19');
INSERT INTO `test`.`example` (`id`, `c_name`, `c_datetime`, `c_date`, `c_married`, `c_age`, `c_timestamp`) VALUES (2, 'Kimura Hazuki', '2024-11-01 16:53:19', '2018-04-27', 0, 12, '2024-11-01 16:53:19');

i1avIe

使用debezium将源数据写入到kafka

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
{
"name": "source-mysql-example-cdc",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "172.16.0.9",
"database.port": "3306",
"database.user": "root",
"database.password": "XXX",
"database.server.id": "1",
"database.server.name": "mysql-example-cdc",
"database.include.list": "test",
"table.include.list": "test.example",
"schema.history.internal.kafka.bootstrap.servers": "172.16.0.51:9092",
"schema.history.internal.kafka.topic": "da.schemahistory.example",
"decimal.handling.mode": "string",
"topic.prefix": "mysql-example",
"include.schema.changes": "false",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "true",
"transforms.unwrap.delete.handling.mode": "rewrite"
}
}

得到kafka中如下结构的2条数据

1
2
3
4
5
6
7
8
9
10
{
"id": 1,
"c_name": "Kinoshita Momoka",
"c_datetime": 1730479999000,
"c_date": 14472,
"c_married": 1,
"c_age": 11,
"c_timestamp": "2024-11-01T08:53:19Z",
"__deleted": "false"
}
1
2
3
4
5
6
7
8
9
10
{
"id": 2,
"c_name": "Kimura Hazuki",
"c_datetime": 1730479999000,
"c_date": 17648,
"c_married": 0,
"c_age": 12,
"c_timestamp": "2024-11-01T08:53:19Z",
"__deleted": "false"
}

我们观察一下kafka数据:

  • __deleted标记了是否删除,类型是字符串,

  • c_datetimec_datec_timestamp 进入到了kafka后,变成了不同的数据。

    • mysql datetime类型:最终转成了时间戳(1970年01月01日0时0分0秒到指定日期的毫秒数),形如:1730479999000
    • mysql date类型:最终转成了天数(1970年01月01日到指定日期的天数),形如:17648
    • mysql timestamp类型:最终转成了UTC时间,形如:2024-11-01T08:53:19Z
    • mysql time类型:最终转成了微妙数(将小时转成了微妙),形如:34341000000。 (该例未展示)

这里就产生了一个疑问🤔,这些数据如何通过Routine Load导入转换正确的数据进入到StarRocks,并且支持删除?
JBKtQm

使用Routine Load导入StarRocks

  • StarRocks创建目标主键表

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    CREATE TABLE ods_example (
    id INT NOT NULL,
    c_name string,
    c_datetime datetime,
    c_date date,
    c_married BOOLEAN,
    c_age int,
    c_timestamp datetime
    )
    PRIMARY KEY (id)
    DISTRIBUTED BY HASH (id);
  • StarRocks创建导入,首先分析一下

    • 支持删除的导入:在starrocks无非就是__op标识,当数据中带有 __op:1的时候,做DELETE操作,当数据中带有__op:0的时候,就是更新操作,那么我们其实就是转换一下kafka中debezium json 格式的删除标识,由 __deleted: "true" 转换成 __op:1即可实现数据支持删除的cdc,具体可以看看文档 通过导入实现数据变更
    • 时间的转换:因为在kafka内的时间都是时间戳,所以使用starrocks的from_unixtime和时区转换convert_tz,根据自己需求转换即可。
      来试一下:
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      CREATE ROUTINE LOAD test.ods_example_load_32 ON ods_example
      COLUMNS (
      id,
      c_name,
      temp_datetime,
      temp_date,
      c_married,
      c_age,
      temp_timestamp,
      temp_deleted,
      __op= cast( cast( temp_deleted AS BOOLEAN) AS TINYINT),
      c_timestamp = convert_tz(temp_timestamp,'+00:00', '+08:00'),
      c_datetime = convert_tz(from_unixtime( cast( temp_datetime AS BIGINT )/ 1000 ),'+00:00', '-08:00'),
      c_date = from_unixtime( cast( temp_date AS BIGINT )* 24 * 60 * 60, '%Y%m%d' ))
      PROPERTIES
      (
      "desired_concurrent_number" = "5",
      "format" = "json",
      "jsonpaths" = "[\"$.id\",\"$.c_name\",\"$.c_datetime\",\"$.c_date\",\"$.c_married\",\"$.c_age\",\"$.c_timestamp\",\"$.__deleted\"]"
      )
      FROM KAFKA
      (
      "kafka_broker_list" = "172.16.0.51:9092",
      "kafka_topic" = "mysql-example.test.example",
      "kafka_partitions" = "0,1,2",
      "property.kafka_default_offsets" = "OFFSET_BEGINNING"
      );
      XT2p8R

至此,完美支持替代StarRocks Connector for Kafka 来做实时cdc导入。但是我有个疑问,假设我有100个字段,COLUMNS和jsonpaths就得对齐100次??留着再研究吧

其他资料

使用Routine Load导入StarRocks
导入过程中实现数据转换