SeaTunnel CDC 入湖实践

背景

为建设统一数仓，我们对 SeaTunnel 的 CDC 同步能力做了一轮评估。

MongoDB CDC

  • MongoDB CDC 实时同步到 MySQL

    # SeaTunnel job: continuously replicate the MongoDB collection
    # pms.demand_item_row into the MySQL table test.row via the JDBC sink.
    env {
      parallelism = 1
      job.mode = "STREAMING"
      checkpoint.interval = 5000
    }

    # Source: change stream of pms.demand_item_row, read with an explicit schema.
    source {
      MongoDB-CDC {
        hosts = "192.168.103.113:27011"
        database = ["pms"]
        collection = ["pms.demand_item_row"]
        username = "root"
        password = "XXX"
        schema {
          fields {
            "_id" : string,
            "ras" : string,
            "planSn" : string,
            "isPlaned" : boolean
          }
        }
      }
    }

    # Identity mapping: every source field keeps its own name in the sink.
    transform {
      FieldMapper {
        field_mapper {
          "_id" = "_id"
          "ras" = "ras"
          "planSn" = "planSn"
          "isPlaned" = "isPlaned"
        }
      }
    }

    # Sink: the connector generates the write SQL itself (generate_sink_sql),
    # keyed on the _id column.
    sink {
      jdbc {
        url = "jdbc:mysql://192.168.103.113:3306"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "root"
        password = "XXX"
        generate_sink_sql = true
        database = "test"
        table = "row"
        primary_keys = ["_id"]
      }
    }
  • MongoDB CDC 实时同步到 StarRocks

    # SeaTunnel job: continuously replicate the MongoDB collection
    # biocitydb.users into the StarRocks table assistant.user, renaming the
    # camelCase source fields to snake_case columns on the way.
    env {
      parallelism = 1
      job.mode = "STREAMING"
      checkpoint.interval = 5000
    }

    # Source: change stream of biocitydb.users, read with an explicit schema.
    source {
      MongoDB-CDC {
        hosts = "192.168.103.113:27011"
        database = ["biocitydb"]
        collection = ["biocitydb.users"]
        username = "root"
        password = "XXX"
        schema {
          fields {
            "id" : int,
            "username" : string,
            "wecomUserId" : string,
            # NOTE(review): this copies users.password into the warehouse;
            # confirm that replicating this column is intended.
            "password" : string,
            "name" : string,
            "cellphone" : string,
            "email" : string
          }
        }
      }
    }

    # Rename source fields (left) to target column names (right).
    transform {
      FieldMapper {
        field_mapper {
          id = id
          username = user_name
          wecomUserId = wecom_user_id
          password = password
          name = name
          cellphone = cellphone
          email = email
        }
      }
    }

    sink {
      StarRocks {
        # BE/FE HTTP endpoint(s) used for Stream Load, and the FE MySQL-protocol
        # endpoint used for metadata queries.
        nodeUrls = ["192.168.103.202:8040"]
        base-url = "jdbc:mysql://192.168.103.202:9030"
        username = "root"
        password = "XXX"
        database = "assistant"
        table = "user"
        batch_max_rows = 10240
        # This is a CDC stream: without this flag the StarRocks sink does not
        # apply DELETE / UPDATE_BEFORE change events as deletes, so rows removed
        # in MongoDB would stay in StarRocks. Per the connector docs this only
        # works when assistant.user is a Primary Key model table.
        enable_upsert_delete = true
      }
    }
  • MongoDB CDC 实时同步到 Elasticsearch

    # SeaTunnel job: continuously replicate the MongoDB collection
    # pms.demand_item_row into the Elasticsearch index "row".
    env {
      parallelism = 1
      job.mode = "STREAMING"
      checkpoint.interval = 5000
    }

    # Source: change stream of pms.demand_item_row, read with an explicit schema.
    source {
      MongoDB-CDC {
        hosts = "192.168.103.113:27011"
        database = ["pms"]
        collection = ["pms.demand_item_row"]
        username = "root"
        password = "XXX"
        schema {
          fields {
            "_id" : string,
            "ras" : string,
            "planSn" : string,
            "isPlaned" : boolean
          }
        }
      }
    }

    # Rename source fields (left) to target field names (right).
    transform {
      FieldMapper {
        field_mapper {
          # Elasticsearch refuses documents whose body contains the metadata
          # field `_id`, so expose the Mongo key as a regular field `id`; the
          # sink derives each document's _id from it via primary_keys below.
          _id = id
          ras = ras
          planSn = planSn
          isPlaned = isPlaned
        }
      }
    }

    sink {
      Elasticsearch {
        hosts = ["192.168.103.113:9200"]
        index = "row"
        username = "elastic"
        password = "XXX"
        # Disables TLS certificate verification for the ES connection.
        tls_verify_certificate = false
        # Key used to build the document _id, so updates/deletes hit the same doc.
        primary_keys = ["id"]
      }
    }

SQL Server CDC

-- Enable SQL Server change data capture (CDC) for table wmwhse1.PICKDETAIL in
-- database SCPRD, then verify the configuration.
-- Intended for SSMS / sqlcmd: GO is a client-side batch separator.

-- Switch to the target database first: sp_cdc_enable_db and the catalog
-- checks below all act on the *current* database.
USE SCPRD;
GO

-- Enable CDC at the database level.
EXEC sys.sp_cdc_enable_db;
GO

-- Confirm SQL Server Agent is running (CDC capture/cleanup jobs are Agent jobs).
EXEC master.dbo.xp_servicecontrol N'QUERYSTATE', N'SQLSERVERAGENT';

-- Check the current database: is_cdc_enabled = 1 means CDC is enabled.
SELECT name, is_cdc_enabled
FROM sys.databases
WHERE name = DB_NAME();

-- Look up schema ids (to match the schema_id reported for the table below).
SELECT schema_id, name
FROM sys.schemas;

-- Check the target table: is_tracked_by_cdc = 1 means CDC is already enabled.
-- Running sp_cdc_enable_table again on an already-tracked table fails.
SELECT name, is_tracked_by_cdc, schema_id
FROM sys.tables
WHERE name = N'PICKDETAIL'
  AND schema_id = SCHEMA_ID(N'wmwhse1');

-- Enable CDC on the table.
--   @source_schema: schema that owns the table.
--   @source_name:   name of the table to track.
--   @role_name:     database role that gates access to the change data. This
--                   parameter is required; pass NULL for no gating role. If the
--                   named role does not exist, SQL Server tries to create it.
EXEC sys.sp_cdc_enable_table
    @source_schema = N'wmwhse1',
    @source_name   = N'PICKDETAIL',
    @role_name     = N'cdc_role';
GO

-- Final verification: show the CDC configuration of every capture instance.
EXEC sys.sp_cdc_help_change_data_capture;
GO

启动

# Start the job with the local (in-process) deploy mode.
./bin/seatunnel.sh --config ./example/mongo2sr3 -e local
# Restore a previously started job by its id (replace {jobId} with the real id).
./bin/seatunnel.sh --config ./example/mongo2sr3 -r {jobId} -e local

其他

SQL Server CDC功能配置总结