Flink CDC in Practice: Pre-aggregation with Doris

Background and Goals

1. Try syncing MySQL data to Doris via CDC.
2. Using website page views (PV) as the example, use Doris to pre-aggregate the PV data.

Source Tables

  • Create the MySQL table

    CREATE TABLE `mysql_pv` (
    siteid INT DEFAULT '10',
    citycode SMALLINT,
    username VARCHAR(32) DEFAULT '',
    pv BIGINT DEFAULT '0'
    );
  • Create the Doris table

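    Here pv is pre-aggregated with SUM: rows that share the same key columns (siteid, citycode, username) are merged, and their pv values are added together.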

    CREATE TABLE doris_pv
    (
    siteid INT DEFAULT '10',
    citycode SMALLINT,
    username VARCHAR(32) DEFAULT '',
    pv BIGINT SUM DEFAULT '0'
    )
    AGGREGATE KEY(siteid, citycode, username)
    DISTRIBUTED BY HASH(siteid) BUCKETS 10
    PROPERTIES("replication_num" = "1");
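To make the effect of the SUM column concrete, here is a minimal sketch that writes directly to the Doris table, bypassing Flink. The sample values are made up for illustration, and the statements assume they are run in the database where doris_pv was created.

```sql
-- Illustrative rows only: two of them share the same aggregate key (1, 1, 'jim').
INSERT INTO doris_pv (siteid, citycode, username, pv) VALUES
  (1, 1, 'jim',   2),
  (1, 1, 'jim',   3),
  (2, 1, 'grace', 2);

-- With AGGREGATE KEY(siteid, citycode, username) and pv declared as SUM,
-- the two 'jim' rows should be merged into one row whose pv is 2 + 3 = 5,
-- so this query is expected to return two rows rather than three.
SELECT siteid, citycode, username, pv
FROM doris_pv
ORDER BY siteid;
```

Because the merge happens inside Doris, the writer (here a plain INSERT, later the Flink job) does not need to do any grouping itself.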

Mapping Tables

source

mysql

CREATE TABLE mysql_pv_source ( 
siteid INT,
citycode SMALLINT,
username STRING,
pv BIGINT
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = 'password',
'database-name' = 'demo',
'table-name' = 'mysql_pv'
);
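One thing to watch: the MySQL table defined earlier has no primary key. In some versions of the mysql-cdc connector, the default incremental snapshot reader expects a primary key on the source table. The following is a sketch of one possible workaround, not a confirmed requirement for this setup: it assumes the connector version in use supports the `scan.incremental.snapshot.enabled` option and that the source table is the mysql_pv table in the demo database. The table name `mysql_pv_source_no_pk` is hypothetical.

```sql
-- Hypothetical variant of the source mapping for a MySQL table without a primary key.
-- Assumption: the installed mysql-cdc connector supports the option below;
-- check the connector documentation for your version before relying on it.
CREATE TABLE mysql_pv_source_no_pk (
  siteid INT,
  citycode SMALLINT,
  username STRING,
  pv BIGINT
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = 'password',
  'database-name' = 'demo',
  'table-name' = 'mysql_pv',
  -- Turn off the incremental snapshot reader, which may require a primary key.
  'scan.incremental.snapshot.enabled' = 'false'
);
```

An alternative is to add a primary key to the MySQL table and declare it in the Flink table with `PRIMARY KEY (...) NOT ENFORCED`, which keeps the default reader.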

sink

doris

CREATE TABLE doris_pv_sink (
siteid INT,
citycode SMALLINT,
username STRING,
pv BIGINT
)
WITH (
'connector' = 'doris',
'fenodes' = 'localhost:8030',
'table.identifier' = 'db_audit.doris_pv',
'sink.batch.size' = '2',
'sink.batch.interval'='1',
'username' = 'root',
'password' = ''
);

Execution

INSERT INTO doris_pv_sink SELECT siteid, citycode, username, pv FROM mysql_pv_source;
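Once the job is running, one way to check the pipeline end to end is to write a few rows into MySQL and compare a GROUP BY over the source with the contents of the Doris table. This is a sketch under assumptions: the sample rows are invented, the database names demo and db_audit are taken from the connector settings above, and the comparison only holds for insert-only workloads.

```sql
-- Step 1, run in MySQL: insert illustrative rows, two of them with the same key.
INSERT INTO demo.mysql_pv (siteid, citycode, username, pv) VALUES
  (3, 2, 'tom',  1),
  (3, 2, 'tom',  4),
  (4, 3, 'bush', 3);

-- Step 2, run in MySQL: compute the aggregate the slow way, as a reference.
SELECT siteid, citycode, username, SUM(pv) AS pv
FROM demo.mysql_pv
GROUP BY siteid, citycode, username
ORDER BY siteid;

-- Step 3, run in Doris after the Flink job has flushed the rows:
-- no GROUP BY is needed because the table is already pre-aggregated.
-- The result should match the output of step 2.
SELECT siteid, citycode, username, pv
FROM db_audit.doris_pv
ORDER BY siteid;
```

If the two results differ, check first that the source mapping points at the intended MySQL table and that the job has had time to flush its batch.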

Related Links

Using Flink CDC to load MySQL data into Apache Doris in real time