CDC practice based on debezium (mongo) -> kafka -> starrocks

Conclusion first

We solved getting MongoDB to emit the before message body needed for CDC, but this round of validation still failed because of a glaring bug in StarRocks Connector for Kafka.

  • Temporary workaround: have the sink convert __op itself (rename __deleted to __op and cast it):
"transforms": "RenameField,CastBool,CastInt",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameField.renames": "__deleted:__op",
"transforms.CastBool.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.CastBool.spec": "__op:boolean",
"transforms.CastInt.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.CastInt.spec": "__op:INT8",

Goal

There is surprisingly little material online. I could not find any Chinese-language resource on Debezium CDC that supports MongoDB deletes, so after digging through English docs for a while I decided to try it myself. The main thing to validate here is the StarRocks Connector for Kafka.

  • source: MongoDbConnector
  • sink: StarRocksSinkConnector

The goal is to CDC MongoDB collection data into the StarRocks data warehouse in real time, with support for delete operations.

Prerequisites

mongodb

First, make sure MongoDB is at least 6.0, because changeStreamPreAndPostImages change streams were introduced in 6.0.
Why it matters: before 6.0, MongoDB had no changeStreamPreAndPostImages change streams, so CDC events could not carry the before document; on a delete there was therefore no way, under the Debezium message format, to express which document was removed.

To get the before field in Debezium CDC output, you need to:

  • Enable changeStreamPreAndPostImages on the collection, a feature added in MongoDB 6.0 (a quick way to verify it took effect is sketched after this list):
    use user
    db.runCommand({
      collMod: "user",
      changeStreamPreAndPostImages: { enabled: true }
    })
  • Set capture.mode on the Debezium MongoDB connector to either of the following:
    • change_streams_with_pre_image: emits before, but not after
    • change_streams_update_full_with_pre_image: emits both before and after
      For reference, the four capture modes are:
      change_streams: emits the change stream, but without the after field on update operations
      change_streams_update_full: change_streams plus an after field containing the full post-change document
      change_streams_with_pre_image: change_streams plus a before field (requires the collection-level configuration above)
      change_streams_update_full_with_pre_image: change_streams_update_full plus a before field, so both the pre- and post-change documents are emitted
      {
        "name": "source-user-mongodb-connector",
        "config": {
          "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
          "mongodb.connection.string": "mongodb://xxx:xxx@172.16.0.11:27017,172.16.0.15:27017/?&authSource=admin",
          "collection.include.list": "mydb.user",
          "topic.prefix": "user-cdc",
          "capture.mode": "change_streams_update_full_with_pre_image"
        }
      }
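
To verify that the collection-level option actually took effect, a quick mongosh check (standard shell helper; the collection name matches the example above):

use user
db.getCollectionInfos({ name: "user" })
// the returned options should contain changeStreamPreAndPostImages: { enabled: true }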

kafka

Topic auto-creation is enabled here.
If auto-creation is not enabled, the following Kafka Connect internal topics must be created manually for Debezium:

  • config.storage.topic
  • offset.storage.topic
  • status.storage.topic

For manual creation, refer to the "Kafka Connect 接入 CKafka" guide; Debezium has quite a few requirements on these topics, so auto-creation is recommended.
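
If you do create them by hand, a minimal sketch (topic names match the docker-compose below; partition counts and replication factor are illustrative, but Kafka Connect requires cleanup.policy=compact for its internal topics):

bin/kafka-topics.sh --bootstrap-server 172.16.0.XX:9092 --create --topic config.storage.topic --partitions 1 --replication-factor 1 --config cleanup.policy=compact
bin/kafka-topics.sh --bootstrap-server 172.16.0.XX:9092 --create --topic offset.storage.topic --partitions 25 --replication-factor 1 --config cleanup.policy=compact
bin/kafka-topics.sh --bootstrap-server 172.16.0.XX:9092 --create --topic status.storage.topic --partitions 5 --replication-factor 1 --config cleanup.policy=compact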

Deploying debezium/connect

We use the containerized distributed deployment mode so connectors can be managed through the REST API.
The starrocks-kafka-connector is added via a volume mount.
Note that the debezium/connect image already bundles the common MySQL, MongoDB, and SQL Server connectors.

version: '2'
services:
  connect:
    image: debezium/connect:2.4.2.Final
    ports:
      - 8083:8083
    environment:
      - BOOTSTRAP_SERVERS=172.16.0.XX:9092
      - GROUP_ID=connect-cluster
      - CONFIG_STORAGE_TOPIC=config.storage.topic
      - OFFSET_STORAGE_TOPIC=offset.storage.topic
      - STATUS_STORAGE_TOPIC=status.storage.topic
    volumes:
      - /home/kafka-connect/starrocks-kafka-connector-1.0.3:/kafka/connect/starrocks-kafka-connector-1.0.3
  debezium-ui:
    image: debezium/debezium-ui:2.4
    ports:
      - 8085:8080
    environment:
      - KAFKA_CONNECT_URIS=http://connect:8083
networks:
  kafka_network:
    name: debezium
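
Once the containers are up, it is worth checking that the StarRocks connector was picked up from the mounted volume; the Kafka Connect REST API lists all installed plugins:

curl http://ip:8083/connector-plugins
# the output should include com.starrocks.connector.kafka.StarRocksSinkConnector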

Inflow: mongo -> kafka ✅

Streaming the mongo user collection into kafka

First insert a row into the user collection, so the initial full snapshot has something to pick up when the task is submitted.

db.getCollection("user").insert( {
  _id: ObjectId("667820b8d767977a330e9241"),
  name: "danny"
} );

Submit the source MongoDB connector task via http://ip:8083/connectors:

curl --location 'http://ip:8083/connectors' \
--header 'Accept: application/json' \
--header 'Content-Type: application/json' \
--data-raw '{
  "name": "source-test23-user-cdc",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.connection.string": "mongodb://mongouser:xxx@172.16.0.11:27017,172.16.0.15:27017/?&authSource=admin",
    "collection.include.list": "test.user",
    "topic.prefix": "user-cdc23",
    "capture.mode": "change_streams_update_full_with_pre_image",
    "snapshot.mode": "initial",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter.schemas.enable": "false"
  }
}'
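
After submitting, the connector state and the resulting topic can be checked quickly (connector and topic names as registered above; the consumer command is a generic Kafka CLI sketch):

# the connector should report RUNNING
curl http://ip:8083/connectors/source-test23-user-cdc/status
# peek at the CDC messages written by the source
bin/kafka-console-consumer.sh --bootstrap-server 172.16.0.XX:9092 --topic user-cdc23.test.user --from-beginning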

Let's look at the message structure for create, update, and delete operations.

  • Create

    On create, before is null, after is the JSON string of the newly created document, and op is c.

    {
      "schema": {...},
      "payload": {
        "before": null,
        "after": "{\"_id\": {\"$oid\": \"6676dccb988e8964430f7e46\"},\"name\": \"junyao\"}",
        "updateDescription": null,
        "source": {
          "version": "2.4.2.Final",
          "connector": "mongodb",
          "name": "user",
          "ts_ms": 1719065804000,
          "snapshot": "false",
          "db": "mydb",
          "sequence": null,
          "rs": "cmgo-6t0e5t7x_0",
          "collection": "user",
          "ord": 1,
          "lsid": null,
          "txnNumber": null,
          "wallTime": 1719065804058
        },
        "op": "c",
        "ts_ms": 1719065804054,
        "transaction": null
      }
    }
  • Update

    before is the document prior to the update, after is the document after the update, and op is u.

    {
      "schema": {...},
      "payload": {
        "before": "{\"_id\": {\"$oid\": \"6676dbe5988e8964430f7e45\"},\"name\": \"tony\"}",
        "after": "{\"_id\": {\"$oid\": \"6676dbe5988e8964430f7e45\"},\"name\": \"danny\"}",
        "updateDescription": {
          "removedFields": null,
          "updatedFields": "{\"name\": \"danny\"}",
          "truncatedArrays": null
        },
        "source": {
          "version": "2.4.2.Final",
          "connector": "mongodb",
          "name": "user",
          "ts_ms": 1719065650000,
          "snapshot": "false",
          "db": "mydb",
          "sequence": null,
          "rs": "cmgo-6t0e5t7x_0",
          "collection": "user",
          "ord": 1,
          "lsid": null,
          "txnNumber": null,
          "wallTime": 1719065650298
        },
        "op": "u",
        "ts_ms": 1719065650298,
        "transaction": null
      }
    }
  • Delete

    before is the document prior to deletion, after is null, and op is d.

    {
      "schema": {...},
      "payload": {
        "before": "{\"_id\": {\"$oid\": \"6676dbe5988e8964430f7e45\"},\"name\": \"danny\"}",
        "after": null,
        "updateDescription": null,
        "source": {
          "version": "2.4.2.Final",
          "connector": "mongodb",
          "name": "user",
          "ts_ms": 1719065732000,
          "snapshot": "false",
          "db": "mydb",
          "sequence": null,
          "rs": "cmgo-6t0e5t7x_0",
          "collection": "user",
          "ord": 1,
          "lsid": null,
          "txnNumber": null,
          "wallTime": 1719065732402
        },
        "op": "d",
        "ts_ms": 1719065732399,
        "transaction": null
      }
    }

Outflow: kafka -> StarRocksSinkConnector ⚠️

Kafka into StarRocks; this path is not working yet.

  • Create the StarRocks table
    CREATE TABLE IF NOT EXISTS `scm`.`ods_test_user` (
      `_id` STRING,
      `name` STRING NULL
    ) ENGINE=OLAP
    PRIMARY KEY (_id)
    DISTRIBUTED BY HASH (_id)
    PROPERTIES (
      "replication_num" = "1"
    );
  • Submit the task
    curl --location 'http://ip:8083/connectors' \
    --header 'Content-Type: application/json' \
    --data '{
      "name": "sink-test23-user-cdc",
      "config": {
        "connector.class": "com.starrocks.connector.kafka.StarRocksSinkConnector",
        "topics": "user-cdc23.test.user",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "true",
        "value.converter.schemas.enable": "false",
        "starrocks.http.url": "xxx.xxx.xxx.xxx:38030",
        "starrocks.topic2table.map": "user-cdc23.test.user:ods_test_user",
        "starrocks.username": "root",
        "starrocks.password": "xxx",
        "starrocks.database.name": "scm",
        "sink.properties.strip_outer_array": "true",
        "transforms": "addfield,unwrap",
        "transforms.addfield.type": "com.starrocks.connector.kafka.transforms.AddOpFieldForDebeziumRecord",
        "transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
        "transforms.unwrap.drop.tombstones": "true",
        "transforms.unwrap.delete.handling.mode": "rewrite"
      }
    }'

Note: the transforms.unwrap.type shown in the StarRocks documentation on loading Debezium CDC format data does not apply to MongoDB. Since I am on MongoDB 6.0, I checked the latest MongoDB New Document State Extraction documentation; compared with New Record State Extraction, the difference is as follows.

transforms.unwrap.type needs to be changed to io.debezium.connector.mongodb.transforms.ExtractNewDocumentState.
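
In config terms the difference is only the SMT class; the relational class name below is the one typically shown for MySQL-style Debezium sources:

Relational connectors (New Record State Extraction):
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
MongoDB connector (New Document State Extraction), used here:
"transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState"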

After making the adjustment and submitting the task, check the error log:

root@starrockscluster-cn-1:/opt/starrocks# curl http://starrockscluster-cn-1.starrockscluster-cn-search.starrocks.svc.cluster.local:8040/api/_load_error_log?file=error_log_864bbf459bb530a6_44aa9dc5433d4c0
Error: NULL value in non-nullable column '_id'. Row: [NULL, NULL, 0]
root@starrockscluster-cn-1:/opt/starrocks#

Failed. Thumbs down!

Outflow: kafka -> StarRocksSinkConnector, a different approach after the failure ⚠️

A different idea: could I shape the format on the mongo source side, before the data even lands in kafka? So I modified the source and added the following:

"transforms": "addfield,unwrap",
"transforms.addfield.type": "com.starrocks.connector.kafka.transforms.AddOpFieldForDebeziumRecord",
"transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
"transforms.unwrap.drop.tombstones": "true",
"transforms.unwrap.delete.handling.mode": "rewrite"
  • mongo source
    curl --location 'ip:8083/connectors' \
    --header 'Accept: application/json' \
    --header 'Content-Type: application/json' \
    --data-raw '{
      "name": "source-test26-user-cdc",
      "config": {
        "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
        "mongodb.connection.string": "mongodb://mongouser:XXX@172.16.0.11:27017,172.16.0.15:27017/?&authSource=admin",
        "collection.include.list": "test.user",
        "topic.prefix": "user-cdc26",
        "capture.mode": "change_streams_update_full_with_pre_image",
        "snapshot.mode": "initial",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "true",
        "value.converter.schemas.enable": "false",
        "transforms": "addfield,unwrap",
        "transforms.addfield.type": "com.starrocks.connector.kafka.transforms.AddOpFieldForDebeziumRecord",
        "transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
        "transforms.unwrap.drop.tombstones": "true",
        "transforms.unwrap.delete.handling.mode": "rewrite"
      }
    }'
    This produced records with the following structure:
    {
      "_id": "66793958d767977a330e9245",
      "name": "jy",
      "__deleted": false
    }
  • starrocks sink
    curl --location 'http://ip:8083/connectors' \
    --header 'Content-Type: application/json' \
    --data '{
      "name": "sink-test27-user-cdc",
      "config": {
        "connector.class": "com.starrocks.connector.kafka.StarRocksSinkConnector",
        "topics": "user-cdc27.test.user",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "true",
        "value.converter.schemas.enable": "false",
        "starrocks.http.url": "159.75.190.XXX:38030",
        "starrocks.topic2table.map": "user-cdc27.test.user:ods_test_user",
        "starrocks.username": "root",
        "starrocks.password": "XXX",
        "starrocks.database.name": "scm",
        "sink.properties.strip_outer_array": "true"
      }
    }'
  • Verify

    Let's verify create, update, and delete.

    • Create
      {
        "_id": "66793e78d767977a330e9246",
        "name": "ivan",
        "__deleted": false
      }
      ⚠️ Data was synced to StarRocks successfully. No __op:0 was added, but when __op is absent the system defaults to __op:0.
    • Update
      {
        "_id": "66793958d767977a330e9245",
        "name": "danny",
        "__deleted": false
      }
      ⚠️ Data was synced to StarRocks successfully. Again no __op:0 was added; the default of __op:0 when the field is absent covered it.
    • Delete
      {
        "_id": "66793e78d767977a330e9246",
        "name": "ivan",
        "__deleted": true
      }
      ⚠️ Failed. StarRocks did not delete the row: no __op:1 was added, and with the field absent the default is __op:0, so the delete cannot happen; it was presumably applied as an update instead.

We can see that the records coming out of the transforms do not contain __op at all.
According to the UPSERT and DELETE semantics of StarRocks primary key tables, any message without __op is treated as an UPSERT, which explains why the delete never happened.

Thumbs down!

Other