Flink Streaming Platform: MongoDB CDC in Practice

Background

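Half a year ago I wrote up "Syncing MongoDB to ES with monstache". Now that I have built a real-time data streaming platform, I will repeat the same exercise on the Flink SQL streaming platform and compare the result against the monstache approach.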

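| | monstache | Flink CDC |
|---|---|---|
| Near real-time | ☑️ | ☑️ |
| Full sync of existing data | ☑️ | ☑️ |
| Incremental sync (insert/update/delete) | ☑️ | ☑️ |
| Mainstream in the community | ☑️ Yes, today | ☑️ Expected to become mainstream |
| Sync mechanism | oplog at the data layer | oplog at the data layer, consumed as a stream |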

Hands-on

Structure of the MongoDB material collection (sample document)

{
"_id": ObjectId("5f601cd6afef2b4993dc7afd"),
"created": ISODate("2020-09-15T01:45:44.753Z"),
"mfrId": ObjectId("5ea279be76860394d14a4982"),
"mfrName": "Biolegend",
"name": "FITC anti-mouse CD2",
"ras": "100105-BLG",
"sn": "100105",
"spec": "50 μg",
"status": true,
"taxrate": 13,
"unit": "EA",
"updated": ISODate("2021-01-15T08:55:45.668Z"),
"price": NumberInt("980"),
"taxcode": "107030710",
"clone": "RM2-5",
"lastOrderAt": ISODate("2021-08-05T03:09:47.577Z"),
"manual": "https://www.biolegend.com/Default.aspx?ID=6664&productid=472",
"pn": "472",
"cumulativeSales": NumberInt("0"),
"isDeprecated": false,
"ship": "蓝冰",
"storage": "2°C-8°C",
"isPublic": true,
"invtCode": "1405.01"
}
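The sample document determines the Flink SQL types used in both tables of the job below (ObjectId as STRING, ISODate as TIMESTAMP_LTZ(3), and so on). Because the source and sink DDLs repeat the same 24 columns, a small generator can keep them from drifting apart. This is a minimal sketch, assuming a hypothetical Python helper: MATERIAL_COLUMNS and create_table are illustrative names, not part of Flink or of any streaming platform. The optional rename argument exists because Elasticsearch treats _id as a metadata field, so a sink table may need a different column name for it.

```python
# Hypothetical helper (not part of Flink): generates the source and sink DDLs
# of the material job from one column table so they cannot drift apart.

# (field, Flink SQL type) in document order; types copied from the DDL in this post.
MATERIAL_COLUMNS = [
    ("_id", "STRING"),                 # ObjectId
    ("created", "TIMESTAMP_LTZ(3)"),   # ISODate
    ("mfrId", "STRING"),               # ObjectId
    ("mfrName", "STRING"),
    ("name", "STRING"),
    ("ras", "STRING"),
    ("sn", "STRING"),
    ("spec", "STRING"),
    ("status", "BOOLEAN"),
    ("taxrate", "INT"),
    ("unit", "STRING"),
    ("updated", "TIMESTAMP_LTZ(3)"),
    ("price", "DECIMAL(10, 5)"),       # NumberInt in the sample document
    ("taxcode", "STRING"),
    ("clone", "STRING"),
    ("lastOrderAt", "TIMESTAMP_LTZ(3)"),
    ("manual", "STRING"),
    ("pn", "STRING"),
    ("cumulativeSales", "INT"),
    ("isDeprecated", "BOOLEAN"),
    ("ship", "STRING"),
    ("storage", "STRING"),
    ("isPublic", "BOOLEAN"),
    ("invtCode", "STRING"),
]


def create_table(table, columns, options, key="_id", rename=None):
    """Render a Flink SQL CREATE TABLE statement.

    rename optionally maps a field name to a different column name in this table.
    """
    rename = rename or {}
    body = [f"  {rename.get(name, name)} {sql_type}," for name, sql_type in columns]
    body.append(f"  PRIMARY KEY ({rename.get(key, key)}) NOT ENFORCED")
    opts = ",\n".join(f"  '{k}' = '{v}'" for k, v in options.items())
    return f"CREATE TABLE {table} (\n" + "\n".join(body) + f"\n) WITH (\n{opts}\n);"


if __name__ == "__main__":
    print(create_table("material", MATERIAL_COLUMNS,
                       {"connector": "mongodb-cdc", "database": "biocitydb",
                        "collection": "material"}))
    print(create_table("es_material", MATERIAL_COLUMNS,
                       {"connector": "elasticsearch-7", "index": "es_material"},
                       rename={"_id": "id"}))
```

The generated text can be pasted into the platform's SQL editor; connection options such as hosts and credentials are left to the caller.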

Add a real-time streaming job
Flink SQL:

CREATE TABLE material (
_id STRING,
created TIMESTAMP_LTZ(3),
mfrId STRING,
mfrName STRING,
name STRING,
ras STRING,
sn STRING,
spec STRING,
status BOOLEAN,
taxrate INT,
unit STRING,
updated TIMESTAMP_LTZ(3),
price DECIMAL(10, 5),
taxcode STRING,
clone STRING,
lastOrderAt TIMESTAMP_LTZ(3),
manual STRING,
pn STRING,
cumulativeSales INT,
isDeprecated BOOLEAN,
ship STRING,
storage STRING,
isPublic BOOLEAN,
invtCode STRING,
PRIMARY KEY (_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb-cdc',
'hosts' = 'localhost:27017',
'username' = 'XXX',
'password' = 'XXX',
'database' = 'biocitydb',
'collection' = 'material'
);
CREATE TABLE es_material (
id STRING, -- Elasticsearch rejects a source field named _id; the PRIMARY KEY value becomes the document _id
created TIMESTAMP_LTZ(3),
mfrId STRING,
mfrName STRING,
name STRING,
ras STRING,
sn STRING,
spec STRING,
status BOOLEAN,
taxrate INT,
unit STRING,
updated TIMESTAMP_LTZ(3),
price DECIMAL(10, 5),
taxcode STRING,
clone STRING,
lastOrderAt TIMESTAMP_LTZ(3),
manual STRING,
pn STRING,
cumulativeSales INT,
isDeprecated BOOLEAN,
ship STRING,
storage STRING,
isPublic BOOLEAN,
invtCode STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://localhost:9200',
'index' = 'es_material'
);
INSERT INTO
es_material
select
_id,
created,
mfrId,
mfrName,
name,
ras,
sn,
spec,
status,
taxrate,
unit,
updated,
price,
taxcode,
clone,
lastOrderAt,
manual,
pn,
cumulativeSales,
isDeprecated,
ship,
storage,
isPublic,
invtCode
from
material;
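Both tables declare PRIMARY KEY ... NOT ENFORCED, which puts the Elasticsearch sink into upsert mode: each insert or update-after row is written to the document whose id is the key value, each delete removes that document, and update-before rows are dropped. The sketch below models those semantics with a Python dict standing in for the index. It is a simplified illustration assuming Flink's four RowKind values, not the connector's actual code, and the ids and prices in the usage part are made up.

```python
from enum import Enum


class RowKind(Enum):
    """The four change kinds of a Flink changelog row (short strings as Flink prints them)."""
    INSERT = "+I"
    UPDATE_BEFORE = "-U"
    UPDATE_AFTER = "+U"
    DELETE = "-D"


def apply_changelog(index, changes, key="_id"):
    """Apply (RowKind, row) pairs to index, a dict keyed by the primary-key value."""
    for kind, row in changes:
        doc_id = row[key]
        if kind in (RowKind.INSERT, RowKind.UPDATE_AFTER):
            index[doc_id] = dict(row)      # upsert: create or overwrite the document
        elif kind is RowKind.DELETE:
            index.pop(doc_id, None)        # delete by id; unknown ids are ignored
        # RowKind.UPDATE_BEFORE: nothing to do, the matching UPDATE_AFTER overwrites
    return index


if __name__ == "__main__":
    oid = "5f601cd6afef2b4993dc7afd"
    changes = [
        (RowKind.INSERT, {"_id": oid, "price": 980}),
        (RowKind.UPDATE_BEFORE, {"_id": oid, "price": 980}),
        (RowKind.UPDATE_AFTER, {"_id": oid, "price": 1000}),    # made-up price change
        (RowKind.INSERT, {"_id": "hypothetical-id", "price": 1}),
        (RowKind.DELETE, {"_id": "hypothetical-id", "price": 1}),
    ]
    print(apply_changelog({}, changes))
    # {'5f601cd6afef2b4993dc7afd': {'_id': '5f601cd6afef2b4993dc7afd', 'price': 1000}}
```

Without a primary key the sink would instead append every row as a new document with an auto-generated id, so updates in MongoDB would show up as duplicates in the index.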

Add the Maven dependencies, or upload the connector JARs (their versions must fit the Flink version of the cluster):

  • flink-sql-connector-elasticsearch7_2.11-1.13.2.jar
  • flink-sql-connector-mongodb-cdc-2.2.1.jar
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mongodb-cdc</artifactId>
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_2.11</artifactId>
<version>1.14.4</version>
</dependency>
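The uploaded Elasticsearch JAR above is built for Flink 1.13.2, while the Maven snippet pulls 1.14.4. At these versions Apache Flink's own connectors are released under Flink's version number, so the one you deploy should match the cluster exactly, whereas the CDC connector follows its own version line. A minimal sketch of a pre-deployment check, assuming hypothetical names (parse_jar, find_mismatches, CDC_SUPPORTED_FLINK) and a compatibility entry taken from the Flink CDC README (2.2.x for Flink 1.13 and 1.14) that should be re-checked against the release actually in use:

```python
import re

# Matches names such as "flink-sql-connector-elasticsearch7_2.11-1.13.2.jar"
JAR_PATTERN = re.compile(
    r"^(?P<artifact>.+?)(?:_(?P<scala>2\.1[12]))?-(?P<version>\d+\.\d+\.\d+)\.jar$"
)

# Assumption copied from the Flink CDC compatibility table: CDC minor -> Flink minors.
CDC_SUPPORTED_FLINK = {"2.2": {"1.13", "1.14"}}


def parse_jar(filename):
    """Split a connector JAR name into (artifact, scala suffix or None, version)."""
    m = JAR_PATTERN.match(filename)
    if m is None:
        raise ValueError(f"unrecognised jar name: {filename}")
    return m.group("artifact"), m.group("scala"), m.group("version")


def minor(version):
    return ".".join(version.split(".")[:2])


def find_mismatches(jars, flink_version):
    """Return one message per JAR that does not fit the given Flink version."""
    problems = []
    for jar in jars:
        artifact, _scala, version = parse_jar(jar)
        if "cdc" in artifact:  # heuristic: CDC connectors have their own versioning
            if minor(flink_version) not in CDC_SUPPORTED_FLINK.get(minor(version), set()):
                problems.append(f"{jar}: CDC {version} not listed for Flink {flink_version}")
        elif version != flink_version:
            problems.append(f"{jar}: built for Flink {version}, cluster runs {flink_version}")
    return problems


if __name__ == "__main__":
    jars = ["flink-sql-connector-elasticsearch7_2.11-1.13.2.jar",
            "flink-sql-connector-mongodb-cdc-2.2.1.jar"]
    print(find_mismatches(jars, "1.13.2"))  # []
    print(find_mismatches(jars, "1.14.4"))  # flags the Elasticsearch JAR only
```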
