Flink CDC: a full survey and practice of sources and sinks


Background

In our earlier post on Flink CDC data synchronization in practice, we got a quick taste of pulling data from MySQL and PostgreSQL and assembling it into a wide table in Elasticsearch. But the actual source-side capabilities for other databases, such as SQL Server, were still unknown, so this time we survey and test them one by one.

Installation: building from source

Normally you can simply download a prebuilt release.

However, new Flink CDC features land on the master branch before they are released. For example, the new SQL Server connector is only on master and release-2.2 has not shipped yet, so we build from source:

git clone https://github.com/ververica/flink-cdc-connectors.git
cd flink-cdc-connectors
mvn clean install -DskipTests
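After the build finishes, the connector jars have to be placed where the Flink SQL client can load them, i.e. Flink's lib directory. A minimal sketch, assuming a FLINK_HOME of /opt/flink; the jar names and versions on your machine will differ:

```shell
# Copy the freshly built CDC connector jars into Flink's lib directory.
# FLINK_HOME and the exact jar paths are assumptions -- adjust to your install.
export FLINK_HOME=/opt/flink
cp flink-sql-connector-mysql-cdc/target/flink-sql-connector-mysql-cdc-*.jar "$FLINK_HOME/lib/"
cp flink-sql-connector-sqlserver-cdc/target/flink-sql-connector-sqlserver-cdc-*.jar "$FLINK_HOME/lib/"
# restart the cluster so the new jars are picked up
"$FLINK_HOME/bin/stop-cluster.sh" && "$FLINK_HOME/bin/start-cluster.sh"
```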

Common Flink SQL commands

First, start the cluster:

# start the cluster
bin/start-cluster.sh
# stop the cluster
bin/stop-cluster.sh
# after starting the cluster, enter the Flink SQL client CLI
bin/sql-client.sh embedded

-- table mode
SET 'sql-client.execution.result-mode' = 'table';
-- changelog mode
SET 'sql-client.execution.result-mode' = 'changelog';
-- tableau mode
SET 'sql-client.execution.result-mode' = 'tableau';

# list currently running jobs
bin/flink list
# list all jobs, including failed, finished, and cancelled ones
bin/flink list -a
# cancel a job
bin/flink cancel jobID
SHOW CATALOGS;
SHOW DATABASES;
SHOW TABLES;
SHOW VIEWS;
SHOW FUNCTIONS;
SELECT CURRENT_TIMESTAMP;
RESET table.planner;
RESET;
QUIT;

Source tables

DataGen ☑️ tested and working

Flink 1.11 introduced a built-in DataGen connector that generates random data. It is handy for testing streaming jobs and running performance tests when no real data source is available.

CREATE TABLE datagen (
f_sequence INT,
f_random INT,
f_random_str STRING,
ts AS localtimestamp,
WATERMARK FOR ts AS ts
) WITH (
'connector' = 'datagen',
-- optional options --
'rows-per-second'='5',
'fields.f_sequence.kind'='sequence',
'fields.f_sequence.start'='1',
'fields.f_sequence.end'='1000',
'fields.f_random.min'='1',
'fields.f_random.max'='1000',
'fields.f_random_str.length'='10'
);

select * from datagen;
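Since the DDL above declares a watermark on ts, the generated stream can also drive event-time operations. A sketch of a tumbling-window count over the random rows (the 10-second window size is arbitrary):

```sql
-- count generated rows per 10-second event-time window (illustrative)
SELECT
  TUMBLE_START(ts, INTERVAL '10' SECOND) AS window_start,
  COUNT(*) AS cnt
FROM datagen
GROUP BY TUMBLE(ts, INTERVAL '10' SECOND);
```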

filesystem ☑️ tested and working

CREATE TABLE employee_information (
emp_id INT,
name VARCHAR,
dept_id INT
) WITH (
'connector' = 'filesystem',
'path' = '/path/to/something.csv',
'format' = 'csv'
);

SELECT * from employee_information WHERE dept_id = 1;

mongodb ☑️ tested and working

Prerequisite: MongoDB must run as a replica set (the connector reads change streams); you know the drill.

CREATE TABLE offices (
_id STRING,
name STRING,
addr STRING,
status BOOLEAN,
PRIMARY KEY (_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb-cdc',
'hosts' = '10.8.99.44:27011',
'username' = 'root',
'password' = '@junyao2022',
'database' = 'biocitydb',
'collection' = 'offices'
);

select * from offices;

mysql ☑️ tested and working

Prerequisite: binlog must be enabled; you know the drill.

CREATE TABLE products (
id INT,
name STRING,
description STRING
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'products'
);

postgres ☑️ tested and working

Prerequisite: the server's wal_level must be set to logical so the connector can consume logical replication changes.

CREATE TABLE shipments (
shipment_id INT,
order_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN
) WITH (
'connector' = 'postgres-cdc',
'hostname' = 'localhost',
'port' = '5432',
'username' = 'postgres',
'password' = 'postgres',
'database-name' = 'postgres',
'schema-name' = 'public',
'table-name' = 'shipments'
);

SQL Server ☑️ tested and working

Prerequisite: CDC must first be enabled on the database with EXEC sys.sp_cdc_enable_db; (and on each captured table via sys.sp_cdc_enable_table).

CREATE TABLE material (
FMATERIALID INT,
FNUMBER STRING,
PRIMARY KEY (FMATERIALID) NOT ENFORCED
) WITH (
'connector' = 'sqlserver-cdc',
'hostname' = '10.8.99.31',
'port' = '1433',
'username' = 'sa',
'password' = 'XXXsa1999$',
'database-name' = 'dkw',
'schema-name' = 'dbo',
'table-name' = 'T_BD_MATERIAL'
);

select * from material;

Sink tables

elasticsearch

Plain HTTP: verified ✅
HTTPS/SSL with certificates: how to configure this is still unknown ❎

CREATE TABLE enriched_orders (
order_id INT,
order_date TIMESTAMP_LTZ(3),
customer_id INT,
price DECIMAL(10, 5),
product ROW<name STRING, description STRING>,
order_status BOOLEAN,
customer_name STRING,
customer_address STRING,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://localhost:9200',
'index' = 'enriched_orders',
'username' = 'root',
'password' = 'password'
);
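The enriched_orders schema above matches the classic Flink CDC wide-table demo. Assuming CDC source tables orders, products, and customers have already been declared (these table and column names are hypothetical, not defined in this post), the sink would be fed with a continuous streaming join:

```sql
-- continuously join the assumed CDC source tables and upsert into Elasticsearch
INSERT INTO enriched_orders
SELECT
  o.order_id,
  o.order_date,
  o.customer_id,
  o.price,
  ROW(p.name, p.description),
  o.order_status,
  c.customer_name,
  c.customer_address
FROM orders AS o
LEFT JOIN products AS p ON o.product_id = p.id
LEFT JOIN customers AS c ON o.customer_id = c.id;
```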

Doris

CREATE TABLE doris_test_sink (
id INT,
name STRING
)
WITH (
'connector' = 'doris',
'fenodes' = 'localhost:8030',
'table.identifier' = 'db_audit.doris_test',
'sink.batch.size' = '2',
'sink.batch.interval'='1',
'username' = 'root',
'password' = ''
)
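With the sink declared, data reaches Doris through an ordinary INSERT from any source table. For example, reading from the mysql-cdc products table defined earlier (the column mapping here is illustrative):

```sql
-- stream id and name from the MySQL CDC source into Doris
INSERT INTO doris_test_sink
SELECT id, name FROM products;
```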

Writing into a sink table

INSERT INTO department_counts
SELECT
  dept_id,
  COUNT(*) AS emp_count
FROM employee_information
GROUP BY dept_id;

Related links

flink-cdc-connectors