Cross-datacenter CDC in practice: Debezium (SQL Server) → Kafka → StarRocks
Objective
- Datacenter A: SQL Server, sharded across databases and tables
- Datacenter B (Tencent Cloud): Kafka and the StarRocks data warehouse

Stream the SQL Server table data into Kafka, and from Kafka into StarRocks.

- db: SCPRD
- schema: test2, test3
- table: test2.user, test3.user
- column: id, name, age

Of the columns, only id and name should be captured; age is filtered out.
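Since only id and name flow downstream, the StarRocks landing table needs just those two columns. A minimal sketch of one possible target table, assuming a primary-key model keyed on id; the database/table name dw.dw_user, the column sizes, and replication_num are placeholders, not part of the original setup:

```sql
-- Hypothetical StarRocks target table; names, types and properties are assumptions.
CREATE TABLE dw.dw_user (
    id   BIGINT      NOT NULL COMMENT "id from the sqlserver user tables",
    name VARCHAR(64)          COMMENT "name column"
)
PRIMARY KEY (id)
DISTRIBUTED BY HASH (id) BUCKETS 8
PROPERTIES (
    "replication_num" = "1"   -- single replica for a test cluster; adjust for production
);
```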
Prerequisites
- Enable database-level CDC for db SCPRD

```sql
USE SCPRD
GO
EXEC sys.sp_cdc_enable_db
GO
```

- Enable table-level CDC for table test2.user and test3.user

```sql
USE SCPRD
GO
EXEC sys.sp_cdc_enable_table
    @source_schema = N'test2',            -- schema the source table belongs to
    @source_name = N'user',               -- name of the source table to capture
    @role_name = N'cdc_role',
    @filegroup_name = NULL,
    @supports_net_changes = 1,
    @captured_column_list = N'[id],[name]'
GO
```

```sql
USE SCPRD
GO
EXEC sys.sp_cdc_enable_table
    @source_schema = N'test3',            -- schema the source table belongs to
    @source_name = N'user',               -- name of the source table to capture
    @role_name = N'cdc_role',
    @filegroup_name = NULL,
    @supports_net_changes = 1,
    @captured_column_list = N'[id],[name]'
GO
```

Once CDC is enabled for a table, SQL Server creates two Agent jobs: cdc.dbname_capture and cdc.dbname_cleanup.
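To confirm the prerequisites took effect, the Agent jobs and the capture instances can be inspected from the same database. A quick check, assuming the statements above ran successfully:

```sql
USE SCPRD
GO
-- Show the cdc capture and cleanup Agent jobs created for this database
EXEC sys.sp_cdc_help_jobs
GO
-- List the capture instances (one per enabled table) and where capture starts
SELECT capture_instance, source_object_id, start_lsn, create_date
FROM cdc.change_tables;
GO
```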
Implementation
There are many ways to deploy this. If the network between Datacenter A and Datacenter B can be bridged directly, that is the best option; when it cannot, we pick one of the following two approaches.

- Preferred: deploy debezium/connect in Datacenter A and connect remotely to the Kafka cluster in Datacenter B (Tencent Cloud).
- Fallback: deploy debezium/connect in Datacenter B (Tencent Cloud) and connect remotely to the SQL Server in Datacenter A.

- Deploy debezium/connect in Datacenter A. Because the link crosses datacenters, it is safer and more stable to access the Tencent Cloud Kafka over SASL_PLAINTEXT:
```yaml
version: '2'
services:
  connect:
    image: ccr.ccs.tencentyun.com/da-debezium/connect:2.6.2.Final
    ports:
      - 8083:8083
    environment:
      BOOTSTRAP_SERVERS: 159.75.194.XXX:50001
      CONNECT_SASL_MECHANISM: PLAIN
      CONNECT_SECURITY_PROTOCOL: SASL_PLAINTEXT
      CONNECT_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="USER_NAME" password="PWD";
      GROUP_ID: connect-cluster
      CONFIG_STORAGE_TOPIC: config.storage.topic
      OFFSET_STORAGE_TOPIC: offset.storage.topic
      STATUS_STORAGE_TOPIC: status.storage.topic
      CONNECT_PRODUCER_SASL_MECHANISM: PLAIN
      CONNECT_PRODUCER_SECURITY_PROTOCOL: SASL_PLAINTEXT
      CONNECT_PRODUCER_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="USER_NAME" password="PWD";
      CONNECT_CONSUMER_SASL_MECHANISM: PLAIN
      CONNECT_CONSUMER_SECURITY_PROTOCOL: SASL_PLAINTEXT
      CONNECT_CONSUMER_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="USER_NAME" password="PWD";
networks:
  kafka_network:
    name: debezium
```

- Create the SQL Server connector by POSTing the following JSON to the Connect REST API (POST http://<connect-host>:8083/connectors):
```json
{
  "name": "source-sqlserver-user-cdc_v1",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "database.hostname": "192.168.103.XX",
    "database.port": "1433",
    "database.user": "sa",
    "database.password": "XXX",
    "database.names": "SCPRD",
    "topic.prefix": "sqlserver-user-cdc_v1",
    "schema.include.list": "test2,test3",
    "table.include.list": "test2.user,test3.user",
    "column.include.list": "test(.*).user.id,test(.*).user.name",
    "schema.history.internal.kafka.bootstrap.servers": "159.75.194.XXX:50001",
    "schema.history.internal.kafka.topic": "schemahistory.sqlserver-user-cdc_v1",
    "schema.history.internal.producer.sasl.mechanism": "PLAIN",
    "schema.history.internal.producer.security.protocol": "SASL_PLAINTEXT",
    "schema.history.internal.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USER_NAME\" password=\"PWD\";",
    "schema.history.internal.consumer.sasl.mechanism": "PLAIN",
    "schema.history.internal.consumer.security.protocol": "SASL_PLAINTEXT",
    "schema.history.internal.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USER_NAME\" password=\"PWD\";",
    "schema.history.internal.store.only.captured.tables.ddl": true,
    "schema.history.internal.store.only.captured.databases.ddl": true,
    "database.encrypt": "false",
    "transforms": "Reroute",
    "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
    "transforms.Reroute.topic.regex": "(.*)test(.*)user",
    "transforms.Reroute.topic.replacement": "$1user"
  }
}
```
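The steps above stop at the Kafka side; to complete the pipeline into StarRocks, one option is a Routine Load job that consumes the rerouted topic with the same SASL_PLAINTEXT credentials. A rough sketch, assuming the rerouted topic name resolves to sqlserver-user-cdc_v1.SCPRD.user, that the target table is the hypothetical dw.dw_user from above, and that reading only the Debezium after image (inserts and updates) is acceptable:

```sql
-- Sketch only: job name, target database/table and credentials are placeholders.
CREATE ROUTINE LOAD dw.load_user_cdc ON dw_user
COLUMNS (id, name)
PROPERTIES
(
    "format" = "json",
    -- Pull id/name out of the Debezium change envelope (payload.after)
    "jsonpaths" = "[\"$.payload.after.id\", \"$.payload.after.name\"]"
)
FROM KAFKA
(
    "kafka_broker_list" = "159.75.194.XXX:50001",
    "kafka_topic" = "sqlserver-user-cdc_v1.SCPRD.user",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING",
    "property.security.protocol" = "SASL_PLAINTEXT",
    "property.sasl.mechanism" = "PLAIN",
    "property.sasl.username" = "USER_NAME",
    "property.sasl.password" = "PWD"
);
```

This reads only the after image, so deletes are not propagated; handling them would require flattening the envelope (for example with Debezium's ExtractNewRecordState SMT) and a delete-aware load path, which is beyond this sketch.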
Common operations commands
Enable and disable CDC for database SCPRD

Enable (note: CDC cannot be enabled on the master database):
```sql
USE SCPRD
GO
EXEC sys.sp_cdc_enable_db
GO
```

Disable:

```sql
USE SCPRD
GO
EXEC sys.sp_cdc_disable_db
GO
```

Enable and disable change data capture for a SQL Server source table
Enable:

```sql
USE SCPRD
GO
EXEC sys.sp_cdc_enable_table
    @source_schema = N'test2',            -- schema the source table belongs to
    @source_name = N'user',               -- name of the source table to capture
    @role_name = N'cdc_role',
    @filegroup_name = NULL,
    @supports_net_changes = 1,
    @captured_column_list = N'[id],[name]'
GO
```

Disable (sys.sp_cdc_disable_table takes only the source schema, source table, and capture instance):

```sql
USE SCPRD
GO
EXEC sys.sp_cdc_disable_table
    @source_schema = N'test2',            -- schema the source table belongs to
    @source_name = N'user',               -- name of the source table to stop capturing
    @capture_instance = N'all'            -- drop every capture instance on this table
GO
```

Query whether CDC is enabled for the database and for individual tables
```sql
-- Is CDC enabled for the database?
SELECT name, is_cdc_enabled FROM sys.databases;
-- Is CDC enabled for a table?
SELECT C.name AS schema_name,
       B.name AS table_name,
       A.is_tracked_by_cdc
FROM sys.tables AS A
    LEFT JOIN sys.objects AS B ON A.object_id = B.object_id
    LEFT JOIN sys.schemas AS C ON C.schema_id = B.schema_id;
```

Check which source tables have change data capture enabled
```sql
USE SCPRD
GO
EXEC sys.sp_cdc_help_change_data_capture
GO
```
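Beyond checking flags, it is sometimes useful to look at the captured rows themselves. A small example against the default capture instance name (schema_table, so test2_user here); the instance name is an assumption if a custom @capture_instance was used when enabling the table:

```sql
USE SCPRD
GO
-- Read all changes captured so far for the test2_user capture instance
DECLARE @from_lsn BINARY(10) = sys.fn_cdc_get_min_lsn('test2_user');
DECLARE @to_lsn   BINARY(10) = sys.fn_cdc_get_max_lsn();
SELECT __$operation,   -- 1=delete, 2=insert, 3=update(before image), 4=update(after image)
       id, name
FROM cdc.fn_cdc_get_all_changes_test2_user(@from_lsn, @to_lsn, N'all');
GO
```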
Related links

- SQL Server Change Data Capture (CDC) manual
- Debezium SASL_PLAINTEXT