SeaTunnel CDC 入湖实践
背景
为了统一数仓,先对 SeaTunnel 做了一轮评估;下面记录评估中用到的几个 CDC 实时同步作业配置。
mongodb-cdc
mongodb-cdc 实时同步 MySQL
作业配置如下:
# SeaTunnel job: MongoDB-CDC source -> FieldMapper transform -> Jdbc (MySQL) sink.
env {
  # Streaming job with parallelism 1; checkpoint.interval is in milliseconds.
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  MongoDB-CDC {
    hosts = "192.168.103.113:27011"
    database = ["pms"]
    # Collections are written as "<database>.<collection>".
    collection = ["pms.demand_item_row"]
    username = root
    password = "XXX"
    # Fields (and their types) that are passed on to the transform and sink.
    schema = {
      fields {
        "_id" : string,
        "ras" : string,
        "planSn" : string,
        "isPlaned" : boolean
      }
    }
  }
}

transform {
  FieldMapper {
    # Identity mapping: every field keeps its source name.
    field_mapper = {
      _id = _id
      ras = ras
      planSn = planSn
      isPlaned = isPlaned
    }
  }
}

sink {
  jdbc {
    url = "jdbc:mysql://192.168.103.113:3306"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "root"
    password = "XXX"
    # Let the connector generate the write SQL from database/table/primary_keys.
    generate_sink_sql = true
    database = test
    # NOTE(review): ROW is a reserved word in MySQL 8 — confirm the generated SQL quotes it.
    table = row
    primary_keys = ["_id"]
  }
}
mongodb-cdc实时同步starrocks
作业配置如下:
# SeaTunnel job: MongoDB-CDC source -> FieldMapper transform -> StarRocks sink.
env {
  # Streaming job with parallelism 1; checkpoint.interval is in milliseconds.
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  MongoDB-CDC {
    hosts = "192.168.103.113:27011"
    database = ["biocitydb"]
    # Collections are written as "<database>.<collection>".
    collection = ["biocitydb.users"]
    username = root
    password = "XXX"
    # Fields (and their types) that are passed on to the transform and sink.
    schema = {
      fields {
        "id" : int,
        "username" : string,
        "wecomUserId" : string,
        "password" : string,
        "name" : string,
        "cellphone" : string,
        "email" : string
      }
    }
  }
}

transform {
  FieldMapper {
    # Renames username -> user_name and wecomUserId -> wecom_user_id;
    # all other fields keep their source names.
    # NOTE(review): the source "password" field is copied to the sink as-is — confirm this is intended.
    field_mapper = {
      id = id
      username = user_name
      wecomUserId = wecom_user_id
      password = password
      name = name
      cellphone = cellphone
      email = email
    }
  }
}

sink {
  StarRocks {
    batch_max_rows = 10240
    table = "user"
    database = "assistant"
    base-url = "jdbc:mysql://192.168.103.202:9030"
    password = "XXX"
    username = "root"
    nodeUrls = [
      "192.168.103.202:8040"
    ]
    # NOTE(review): for CDC update/delete events the StarRocks sink documents
    # `enable_upsert_delete = true` (primary-key tables only). It is not set here —
    # confirm whether updates/deletes from MongoDB must propagate to StarRocks.
  }
}
mongodb-cdc 实时同步es
作业配置如下:
# SeaTunnel job: MongoDB-CDC source -> FieldMapper transform -> Elasticsearch sink.
env {
  # Streaming job with parallelism 1; checkpoint.interval is in milliseconds.
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  MongoDB-CDC {
    hosts = "192.168.103.113:27011"
    database = ["pms"]
    # Collections are written as "<database>.<collection>".
    collection = ["pms.demand_item_row"]
    username = root
    password = "XXX"
    # Fields (and their types) that are passed on to the transform and sink.
    schema = {
      fields {
        "_id" : string,
        "ras" : string,
        "planSn" : string,
        "isPlaned" : boolean
      }
    }
  }
}

transform {
  FieldMapper {
    # Identity mapping: every field keeps its source name.
    field_mapper = {
      _id = _id
      ras = ras
      planSn = planSn
      isPlaned = isPlaned
    }
  }
}

sink {
  Elasticsearch {
    hosts = ["192.168.103.113:9200"]
    index = "row"
    username = "elastic"
    password = "XXX"
    # Certificate verification is turned off for this connection.
    tls_verify_certificate = false
    primary_keys = ["_id"]
  }
}
SQL Server CDC
-- 启用 CDC 功能
启动
1 | 开始 |