Background
In the earlier post on Flink CDC data-sync practice, we got a quick taste of pulling data from MySQL and PostgreSQL and flattening it into a wide table in Elasticsearch. Whether other databases, such as SQL Server, actually work as source tables was still unknown, so this post puts that to the test.
Installation: building from source
Normally you can just download a pre-built release.
However, some new Flink CDC features live only on the master branch with no release yet (for example, the new SQL Server connector is on master, and release-2.2 has not been published), so we build from source:
```bash
git clone https://github.com/ververica/flink-cdc-connectors.git
cd flink-cdc-connectors
mvn clean install -DskipTests
```
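Once the build succeeds, the SQL connector jars land in each module's target/ directory; to make a connector visible to the SQL client, copy its jar into Flink's lib/ directory and restart the cluster. A minimal sketch (the module name, version wildcard, and FLINK_HOME are assumptions for your setup):

```bash
# Assumed paths: adjust the module name, version, and FLINK_HOME to your build
cp flink-sql-connector-sqlserver-cdc/target/flink-sql-connector-sqlserver-cdc-*.jar "$FLINK_HOME/lib/"
# Restart so the new jar is picked up
"$FLINK_HOME/bin/stop-cluster.sh" && "$FLINK_HOME/bin/start-cluster.sh"
```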
Common Flink SQL commands
First, start the cluster:
```bash
# Start the cluster
bin/start-cluster.sh
# Stop the cluster
bin/stop-cluster.sh
```
```bash
# With the cluster running, enter the Flink SQL client CLI
bin/sql-client.sh embedded
```

```sql
-- Table mode
SET 'sql-client.execution.result-mode' = 'table';
-- Changelog mode
SET 'sql-client.execution.result-mode' = 'changelog';
-- Tableau mode
SET 'sql-client.execution.result-mode' = 'tableau';
```

```bash
# List currently running jobs
bin/flink list
# List all jobs, including failed, finished, and cancelled ones
bin/flink list -a
# Cancel a job
bin/flink cancel <jobID>
```
```sql
SHOW CATALOGS;
SHOW DATABASES;
SHOW TABLES;
SHOW VIEWS;
SHOW FUNCTIONS;
SELECT CURRENT_TIMESTAMP;
-- Reset a single property
RESET table.planner;
-- Reset all properties
RESET;
-- Exit the client
quit;
```
Source tables
DataGen ☑️ verified
Flink 1.11 ships with a built-in DataGen connector that generates random rows. It is mainly used for testing streaming jobs and measuring performance when no real data source is available.
```sql
CREATE TABLE datagen (
  f_sequence INT,
  f_random INT,
  f_random_str STRING,
  ts AS localtimestamp,
  WATERMARK FOR ts AS ts
) WITH (
  'connector' = 'datagen',
  -- optional options --
  'rows-per-second'='5',
  'fields.f_sequence.kind'='sequence',
  'fields.f_sequence.start'='1',
  'fields.f_sequence.end'='1000',
  'fields.f_random.min'='1',
  'fields.f_random.max'='1000',
  'fields.f_random_str.length'='10'
);

SELECT * FROM datagen;
```
filesystem ☑️ verified
```sql
CREATE TABLE employee_information (
  emp_id INT,
  name VARCHAR,
  dept_id INT
) WITH (
  'connector' = 'filesystem',
  'path' = '/path/to/something.csv',
  'format' = 'csv'
);

SELECT * FROM employee_information WHERE dept_id = 1;
```
mongodb ☑️ verified
Prerequisite: MongoDB must run as a replica set (mongodb-cdc reads the oplog, which only exists in replica-set mode).
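If your instance is still standalone, a single-node replica set is enough for testing. A minimal sketch (the replica set name rs0, data path, and port are assumptions):

```bash
# Start mongod as a single-node replica set -- name, path, and port are assumptions
mongod --replSet rs0 --dbpath /data/db --port 27011 --bind_ip_all &
# Initiate the replica set; the node should then report itself as PRIMARY
mongo --port 27011 --eval 'rs.initiate()'
```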
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| CREATE TABLE offices ( _id STRING, name STRING, addr STRING, status BOOLEAN, PRIMARY KEY (_id) NOT ENFORCED ) WITH ( 'connector' = 'mongodb-cdc', 'hosts' = '10.8.99.44:27011', 'username' = 'root', 'password' = '@junyao2022', 'database' = 'biocitydb', 'collection' = 'offices' );
select * from offices;
|
mysql ☑️ verified
Prerequisite: the binlog must be enabled.
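Before creating the source table, you can confirm the binlog settings from a MySQL client; mysql-cdc relies on row-based binlog:

```sql
-- log_bin should report ON, and binlog_format should report ROW
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
```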
```sql
CREATE TABLE products (
  id INT,
  name STRING,
  description STRING
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'database-name' = 'mydb',
  'table-name' = 'products'
);
```
postgres ☑️ verified
```sql
CREATE TABLE shipments (
  shipment_id INT,
  order_id INT,
  origin STRING,
  destination STRING,
  is_arrived BOOLEAN
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'localhost',
  'port' = '5432',
  'username' = 'postgres',
  'password' = 'postgres',
  'database-name' = 'postgres',
  'schema-name' = 'public',
  'table-name' = 'shipments'
);
```
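Worth noting: postgres-cdc reads changes through logical replication, so the server's wal_level must be logical. A quick check-and-fix sketch (changing it requires superuser rights and a PostgreSQL restart):

```sql
-- Should return 'logical'; if not, change it and restart PostgreSQL
SHOW wal_level;
ALTER SYSTEM SET wal_level = 'logical';
```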
sql server ☑️ verified
Prerequisite: CDC must first be enabled on the SQL Server database: EXEC sys.sp_cdc_enable_db;
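Enabling CDC at the database level alone is not enough; each captured table must also be enabled. A sketch for the table used below (passing @role_name = NULL skips the gating role; run it inside the target database):

```sql
-- Run inside the target database (here: dkw)
EXEC sys.sp_cdc_enable_db;
EXEC sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name   = N'T_BD_MATERIAL',
    @role_name     = NULL;
```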
```sql
CREATE TABLE material (
  FMATERIALID INT,
  FNUMBER STRING,
  PRIMARY KEY (FMATERIALID) NOT ENFORCED
) WITH (
  'connector' = 'sqlserver-cdc',
  'hostname' = '10.8.99.31',
  'port' = '1433',
  'username' = 'sa',
  'password' = 'XXXsa1999$',
  'database-name' = 'dkw',
  'schema-name' = 'dbo',
  'table-name' = 'T_BD_MATERIAL'
);

SELECT * FROM material;
```
Sink tables
elasticsearch
HTTP: verified ✅
HTTPS/SSL with certificates: unclear how to configure ❎
```sql
CREATE TABLE enriched_orders (
  order_id INT,
  order_date TIMESTAMP_LTZ(3),
  customer_id INT,
  price DECIMAL(10, 5),
  product ROW<name STRING, description STRING>,
  order_status BOOLEAN,
  customer_name STRING,
  customer_address STRING,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://localhost:9200',
  'index' = 'enriched_orders',
  'username' = 'root',
  'password' = 'password'
);
```
Doris
```sql
CREATE TABLE doris_test_sink (
  id INT,
  name STRING
) WITH (
  'connector' = 'doris',
  'fenodes' = 'localhost:8030',
  'table.identifier' = 'db_audit.doris_test',
  'sink.batch.size' = '2',
  'sink.batch.interval' = '1',
  'username' = 'root',
  'password' = ''
);
```
Inserting into a sink table
```sql
INSERT INTO department_counts
SELECT
  dept_id,
  COUNT(*) AS emp_count
FROM employee_information
GROUP BY dept_id;
```
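Note that department_counts is not defined anywhere above. A minimal sink sketch using Flink's built-in print connector, which writes each row to the TaskManager log, would be:

```sql
-- Hypothetical sink so the INSERT above has somewhere to write;
-- COUNT(*) produces BIGINT in Flink SQL, hence the column type
CREATE TABLE department_counts (
  dept_id INT,
  emp_count BIGINT NOT NULL
) WITH (
  'connector' = 'print'
);
```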
Related links
flink-cdc-connectors