基于debezium(sqlserver)-kafka-starrocks跨机房cdc实践

目标需求

机房A:sqlserver分库分表
机房B(腾讯云):kafka,starrocks数仓

将sqlserver表数据流入kafka,流出到starrocks

需求明细

  • db: SCPRD
  • schema: test2,test3
  • table: test2.user,test3.user
  • column: id,name,age
    其中column字段只想同步 id、name 两列,过滤掉 age

前置配置

  • 开启db:SCPRD的数据库cdc
    1
    2
    3
    4
    5
    use SCPRD
    GO

    EXEC sys.sp_cdc_enable_db
    GO
  • 开启table:test2.user,test3.user的表cdc
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    USE SCPRD
    GO

    EXEC sys.sp_cdc_enable_table
    @source_schema = N'test2', -- 指定源表所属的 schema 名
    @source_name = N'user', -- 指定需要读取的源表名
    @role_name = N'cdc_role',
    @filegroup_name = NULL,
    @supports_net_changes = 1,
    @captured_column_list = N'[id],[name]'
    GO
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    USE SCPRD
    GO

    EXEC sys.sp_cdc_enable_table
    @source_schema = N'test3', -- 指定源表所属的 schema 名
    @source_name = N'user', -- 指定需要读取的源表名
    @role_name = N'cdc_role',
    @filegroup_name = NULL,
    @supports_net_changes = 1,
    @captured_column_list = N'[id],[name]'
    GO
    为表启用CDC后,SQLServer生成两个Agent作业:cdc.dbname_capture 和 cdc.dbname_cleanup

实践

部署的方案有很多,如果能打通机房A和机房B的网络,那是最优的,无法打通的情况我们选择如下2个方案。

  • 优先方案:选择在机房A部署debezium/connect,远程连接机房B(腾讯云)的kafka,
  • 备选方案:选择在机房B(腾讯云)部署debezium/connect,远程连接机房A的sqlserver
  • 机房A部署debezium/connect

    因为跨机房,所以使用SASL_PLAINTEXT接入腾讯云kafka更稳妥一点

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    version: '2'
    services:
      connect:
        image: ccr.ccs.tencentyun.com/da-debezium/connect:2.6.2.Final
        ports:
          - 8083:8083
        environment:
          BOOTSTRAP_SERVERS: 159.75.194.XXX:50001
          CONNECT_SASL_MECHANISM: PLAIN
          CONNECT_SECURITY_PROTOCOL: SASL_PLAINTEXT
          CONNECT_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="USER_NAME" password="PWD";
          GROUP_ID: connect-cluster
          CONFIG_STORAGE_TOPIC: config.storage.topic
          OFFSET_STORAGE_TOPIC: offset.storage.topic
          STATUS_STORAGE_TOPIC: status.storage.topic
          CONNECT_PRODUCER_SASL_MECHANISM: PLAIN
          CONNECT_PRODUCER_SECURITY_PROTOCOL: SASL_PLAINTEXT
          CONNECT_PRODUCER_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="USER_NAME" password="PWD";
          CONNECT_CONSUMER_SASL_MECHANISM: PLAIN
          CONNECT_CONSUMER_SECURITY_PROTOCOL: SASL_PLAINTEXT
          CONNECT_CONSUMER_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="USER_NAME" password="PWD";
        networks:
          - kafka_network
    networks:
      kafka_network:
        name: debezium
  • 创建sqlserver connector
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    {
    "name": "source-sqlserver-user-cdc_v1",
    "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "database.hostname": "192.168.103.XX",
    "database.port": "1433",
    "database.user": "sa",
    "database.password": "XXX",
    "database.names": "SCPRD",
    "topic.prefix": "sqlserver-user-cdc_v1",
    "schema.include.list": "test2,test3",
    "table.include.list": "test2.user,test3.user",
    "column.include.list": "test(.*).user.id,test(.*).user.name",
    "schema.history.internal.kafka.bootstrap.servers": "159.75.194.XXX:50001",
    "schema.history.internal.kafka.topic": "schemahistory.sqlserver-user-cdc_v1",
    "schema.history.internal.producer.sasl.mechanism": "PLAIN",
    "schema.history.internal.producer.security.protocol": "SASL_PLAINTEXT",
    "schema.history.internal.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USER_NAME\" password=\"PWD\";",
    "schema.history.internal.consumer.sasl.mechanism": "PLAIN",
    "schema.history.internal.consumer.security.protocol": "SASL_PLAINTEXT",
    "schema.history.internal.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USER_NAME\" password=\"PWD\";",
    "schema.history.internal.store.only.captured.tables.ddl": true,
    "schema.history.internal.store.only.captured.databases.ddl": true,
    "database.encrypt": "false",
    "transforms": "Reroute",
    "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
    "transforms.Reroute.topic.regex": "(.*)test(.*)user",
    "transforms.Reroute.topic.replacement": "$1user"
    }
    }

运维常见命令

  • 为数据库 SCPRD 启用、关闭 CDC
    开启

    注意:开启数据库cdc功能时,不能为master数据库启用该功能

    1
    2
    3
    4
    5
    use SCPRD
    GO

    EXEC sys.sp_cdc_enable_db
    GO

    关闭

    1
    2
    3
    4
    5
    USE SCPRD
    GO

    exec sys.sp_cdc_disable_db
    GO
  • 为 SQLServer 源表启用、关闭变更数据捕获
    开启

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    USE SCPRD
    GO

    EXEC sys.sp_cdc_enable_table
    @source_schema = N'test2', -- 指定源表所属的 schema 名
    @source_name = N'user', -- 指定需要读取的源表名
    @role_name = N'cdc_role',
    @filegroup_name = NULL,
    @supports_net_changes = 1,
    @captured_column_list = N'[id],[name]'
    GO

    关闭

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    USE SCPRD
    GO

    EXEC sys.sp_cdc_disable_table
    @source_schema = N'test2', -- 指定源表所属的 schema 名
    @source_name = N'user', -- 指定需要读取的源表名
    @capture_instance = N'all' -- 'all' 表示删除该表上的所有捕获实例
    GO
  • 查询数据库的cdc开启状态、查询表cdc开启状态

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    --查询数据库是否开启CDC
    SELECT name,is_cdc_enabled FROM sys.databases

    --查询表是否开启CDC
    SELECT C.name AS schema_name,
    B.name AS table_name,
    A.is_tracked_by_cdc
    FROM sys.tables AS A
    LEFT JOIN sys.objects AS B ON A.object_id = B.object_id
    LEFT JOIN sys.schemas AS C ON C.schema_id = B.schema_id;
  • 检查源表是否启动变更数据捕获

    1
    2
    3
    4
    5
    USE SCPRD
    GO

    EXEC sys.sp_cdc_help_change_data_capture
    GO

相关链接

SqlServer Change Data Capture CDC使用手册
Debezium SASL_PLAINTEXT