Background

DataX is an offline synchronization tool for heterogeneous data sources. The requirement here is scheduled database synchronization:

  • MongoDB and MySQL synced to Elasticsearch on a schedule
  • MongoDB and MySQL synced to StarRocks on a schedule

Ideally this should work together with DolphinScheduler, but DolphinScheduler currently looks a bit heavyweight, so that evaluation is deferred.

Job

  • MySQL to StarRocks

    {
        "job": {
            "setting": {
                "speed": {
                    "channel": 3
                },
                "errorLimit": {
                    "record": 0,
                    "percentage": 0.02
                }
            },
            "content": [
                {
                    "reader": {
                        "name": "mysqlreader",
                        "parameter": {
                            "username": "root",
                            "password": "XXX",
                            "column": [
                                "id",
                                "user_name",
                                "wecom_user_id",
                                "sap_code",
                                "password",
                                "name",
                                "cellphone",
                                "email"
                            ],
                            "splitPk": "id",
                            "connection": [
                                {
                                    "table": ["user"],
                                    "jdbcUrl": ["jdbc:mysql://192.168.103.113:3306/user"]
                                }
                            ]
                        }
                    },
                    "writer": {
                        "name": "starrockswriter",
                        "parameter": {
                            "username": "root",
                            "password": "XXX",
                            "column": [
                                "id",
                                "user_name",
                                "wecom_user_id",
                                "sap_code",
                                "password",
                                "name",
                                "cellphone",
                                "email"
                            ],
                            "preSql": [],
                            "postSql": [],
                            "connection": [
                                {
                                    "table": ["user"],
                                    "jdbcUrl": "jdbc:mysql://192.168.103.202:9030/",
                                    "selectedDatabase": "assistant"
                                }
                            ],
                            "loadUrl": [
                                "192.168.103.202:8040" // FE http port; here the CN's HTTP port is used directly
                            ],
                            "loadProps": {}
                        }
                    }
                }
            ]
        }
    }
  • MongoDB to Elasticsearch

    {
        "job": {
            "setting": {
                "speed": {
                    "channel": 2
                }
            },
            "content": [
                {
                    "reader": {
                        "name": "mongodbreader",
                        "parameter": {
                            "address": ["192.168.103.113:27011"],
                            "userName": "root",
                            "userPassword": "XXX",
                            "authDb": "admin",
                            "dbName": "biocitydb",
                            "collectionName": "companies",
                            "column": [
                                { "name": "_id", "type": "string" },
                                { "name": "name", "type": "string" },
                                { "name": "sapCode", "type": "string" }
                            ]
                        }
                    },
                    "writer": {
                        "name": "elasticsearchwriter",
                        "parameter": {
                            "endpoint": "http://192.168.199.113:9200",
                            "accessId": "root",
                            "accessKey": "XXX",
                            "index": "companies",
                            "type": "default",
                            "cleanup": true,
                            "settings": {
                                "index": {
                                    "number_of_shards": 1,
                                    "number_of_replicas": 0
                                }
                            },
                            "discovery": false,
                            "batchSize": 1000,
                            "splitter": ",",
                            "column": [
                                { "name": "id", "type": "id" },
                                { "name": "name", "type": "keyword" },
                                { "name": "sapCode", "type": "keyword" }
                            ]
                        }
                    }
                }
            ]
        }
    }

Startup

$ cd  {YOUR_DATAX_HOME}/bin
$ python datax.py {YOUR_JOB.json}

Background

Starting to unify the data warehouse; a first round of evaluating SeaTunnel.

mongodb-cdc

  • MongoDB CDC real-time sync to MySQL

    env {
      parallelism = 1
      job.mode = "STREAMING"
      checkpoint.interval = 5000
    }

    source {
      MongoDB-CDC {
        hosts = "192.168.103.113:27011"
        database = ["pms"]
        collection = ["pms.demand_item_row"]
        username = root
        password = "XXX"
        schema = {
          fields {
            "_id" : string,
            "ras" : string,
            "planSn" : string,
            "isPlaned" : boolean
          }
        }
      }
    }

    transform {
      FieldMapper {
        field_mapper = {
          _id = _id
          ras = ras
          planSn = planSn
          isPlaned = isPlaned
        }
      }
    }

    sink {
      jdbc {
        url = "jdbc:mysql://192.168.103.113:3306"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "root"
        password = "XXX"
        generate_sink_sql = true
        database = test
        table = row
        primary_keys = ["_id"]
      }
    }
  • MongoDB CDC real-time sync to StarRocks

    env {
      parallelism = 1
      job.mode = "STREAMING"
      checkpoint.interval = 5000
    }

    source {
      MongoDB-CDC {
        hosts = "192.168.103.113:27011"
        database = ["biocitydb"]
        collection = ["biocitydb.users"]
        username = root
        password = "XXX"
        schema = {
          fields {
            "id" : int,
            "username" : string,
            "wecomUserId" : string,
            "password" : string,
            "name" : string,
            "cellphone" : string,
            "email" : string
          }
        }
      }
    }

    transform {
      FieldMapper {
        field_mapper = {
          id = id
          username = user_name
          wecomUserId = wecom_user_id
          password = password
          name = name
          cellphone = cellphone
          email = email
        }
      }
    }

    sink {
      StarRocks {
        batch_max_rows = 10240
        table = "user"
        database = "assistant"
        base-url = "jdbc:mysql://192.168.103.202:9030"
        password = "XXX"
        username = "root"
        nodeUrls = ["192.168.103.202:8040"]
      }
    }
  • MongoDB CDC real-time sync to Elasticsearch

    env {
      parallelism = 1
      job.mode = "STREAMING"
      checkpoint.interval = 5000
    }

    source {
      MongoDB-CDC {
        hosts = "192.168.103.113:27011"
        database = ["pms"]
        collection = ["pms.demand_item_row"]
        username = root
        password = "XXX"
        schema = {
          fields {
            "_id" : string,
            "ras" : string,
            "planSn" : string,
            "isPlaned" : boolean
          }
        }
      }
    }

    transform {
      FieldMapper {
        field_mapper = {
          _id = _id
          ras = ras
          planSn = planSn
          isPlaned = isPlaned
        }
      }
    }

    sink {
      Elasticsearch {
        hosts = ["192.168.103.113:9200"]
        index = "row"
        username = "elastic"
        password = "XXX"
        tls_verify_certificate = false
        primary_keys = ["_id"]
      }
    }

SQL Server CDC

-- Enable CDC on the database
EXEC sys.sp_cdc_enable_db;

-- Confirm that SQL Server Agent is running
EXEC master.dbo.xp_servicecontrol N'QUERYSTATE', N'SQLSERVERAGENT'

-- Check whether the current database has CDC enabled (is_cdc_enabled = 1 means enabled)
SELECT name, is_cdc_enabled FROM sys.databases WHERE is_cdc_enabled = 1;

-- Look up schema_id
SELECT * FROM sys.schemas

-- Check which tables have CDC enabled (is_tracked_by_cdc = 1 means enabled)
SELECT name, is_tracked_by_cdc, schema_id FROM sys.tables WHERE is_tracked_by_cdc = 1;

-- source_schema is the name of the schema the table belongs to.
-- source_name is the name of the table to enable CDC tracking on.
-- role_name is the name of the role used by CDC; if none is specified, a default role is created.
EXEC sys.sp_cdc_enable_table
@source_schema = 'wmwhse1',
@source_name = 'PICKDETAIL',
@role_name = 'cdc_role';

-- Verify the CDC configuration once more
USE SCPRD;
GO
EXEC sys.sp_cdc_help_change_data_capture
GO
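
Once CDC is enabled on the table, a quick way to confirm that changes are actually being captured is to query the CDC change functions directly. The sketch below assumes the default capture instance name <schema>_<table>, i.e. wmwhse1_PICKDETAIL; adjust it if a custom capture instance was specified.

-- Sketch: list everything captured so far for wmwhse1.PICKDETAIL
DECLARE @from_lsn BINARY(10), @to_lsn BINARY(10);
SET @from_lsn = sys.fn_cdc_get_min_lsn('wmwhse1_PICKDETAIL'); -- lowest LSN retained for this capture instance
SET @to_lsn   = sys.fn_cdc_get_max_lsn();                     -- highest LSN currently available

SELECT *
FROM cdc.fn_cdc_get_all_changes_wmwhse1_PICKDETAIL(@from_lsn, @to_lsn, N'all');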

Startup

# start
./bin/seatunnel.sh --config ./example/mongo2sr3 -e local
# resume
./bin/seatunnel.sh --config ./example/mongo2sr3 -r {jobId} -e local

Other

Summary of SQL Server CDC configuration

Background

Requirement for the internal k8s cluster: StarRocks port 9030 and MySQL port 3306 need to be exposed externally. They are TCP (L4) services, while Ingress speaks HTTP (L7), so they cannot be exposed through an Ingress rule.

  • k8s StarRocks status
    • services: starrocks/starrockscluster-fe-service

Related configuration

  • deployment: ingress-nginx-controller configuration

    • Add hostNetwork: true, so that the application running in the pod sees the host's network interfaces directly and the application and its ports are reachable from every network interface on the host's LAN
    • Add - '--tcp-services-configmap=$(POD_NAMESPACE)/tcp-services'
      spec:
        hostNetwork: true        # added
        containers:
          - name: controller
            image: dyrnq/ingress-nginx-controller:v1.6.4
            args:
              - /nginx-ingress-controller
              - '--election-id=ingress-nginx-leader'
              - '--controller-class=k8s.io/ingress-nginx'
              - '--ingress-class=nginx'
              - '--configmap=$(POD_NAMESPACE)/ingress-nginx-controller'
              - '--validating-webhook=:8443'
              - '--validating-webhook-certificate=/usr/local/certificates/cert'
              - '--validating-webhook-key=/usr/local/certificates/key'
              - '--tcp-services-configmap=$(POD_NAMESPACE)/tcp-services'   # added
              - '--udp-services-configmap=$(POD_NAMESPACE)/udp-services'
  • Write TCP/UDP port-forwarding rules to expose the L4 services
    kubectl create -f tcp-services-configmap.yaml -n ingress-nginx

    kind: ConfigMap
    apiVersion: v1
    metadata:
      name: tcp-services
      namespace: ingress-nginx
    data:
      '8030': starrocks/starrockscluster-fe-service:8030
      '8040': starrocks/starrockscluster-cn-service:8040
      '9030': starrocks/starrockscluster-fe-service:9030
  • Verify the L4 exposure of the TCP ports: check the IP of the nginx-ingress-controller pod

    > kubectl get pod -n ingress-nginx -owide
    NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES
    ingress-nginx-admission-create-cpjcl 0/1 Completed 0 70d 10.244.3.20 k8s-node3 <none> <none>
    ingress-nginx-admission-patch-r6ql7 0/1 Completed 0 70d 10.244.2.12 k8s-node1 <none> <none>
    ingress-nginx-controller-58bcff6c76-xdmzq 1/1 Running 0 14m 192.168.103.202 k8s-master1 <none> <none>
  • Navicat connection

Related links

Nginx + Ingress-controller for service exposure and load balancing

Background

To let MongoDB be queried with standard SQL, we use the official mongo-bi connector as a translation layer; this step is key to unifying the data warehouse.

For now this approach is not viable, for two reasons:

  • bi-connector requires SSL when connecting to the MongoDB data source, which forces external clients to connect to bi-connector with useSSL=true, while StarRocks catalogs do not yet support SSL
  • it is unstable: judging from the BI external connections, it errors out frequently

Download and install

bi-connector can be installed on various platforms; here I record the installation and configuration on Linux.

Download it from the official site: https://www.mongodb.com/try/download/bi-connector

The file I downloaded is mongodb-bi-linux-x86_64-rhel70-v2.14.12.tgz.
After downloading, extract it to /opt/mongodb-bi.

Create a certificate

When MongoDB has authentication enabled, bi-connector must be configured with a certificate before connections to MongoDB can go through bi-connector.

First create the certificate:

# Create the SSL certificate
mkdir -p /opt/mongodb-bi/certs
cd /opt/mongodb-bi/certs
openssl req -nodes -newkey rsa:2048 -keyout dakeweBI.key -out dakeweBI.crt -x509 -days 365 -subj "/C=US/ST=dakeweBI/L=dakeweBI/O=dakeweBI Security/OU=IT Department/CN=kayakwise.com"
cat dakeweBI.crt dakeweBI.key > dakeweBI.pem

Install MongoDB BI Connector

sudo install -m755 bin/mongo* /usr/bin/

Configure the MongoDB BI configuration file

mkdir -p /opt/mongodb-bi/conf/
mkdir -p /opt/mongodb-bi/logs/
mkdir -p /opt/mongodb-bi/schemas
net:
  bindIp: "0.0.0.0"
  port: 3307
  ssl:
    mode: allowSSL
    PEMKeyFile: '/opt/mongodb-bi/certs/dakeweBI.pem'
    allowInvalidCertificates: true
    minimumTLSVersion: TLS1_0
mongodb:
  net:
    uri: "mongodb://192.168.103.113:27011"
    ssl:
      enabled: false
    auth:
      username: root
      password: 'XXX'
      source: admin
      mechanism: SCRAM-SHA-1
security:
  enabled: true
  defaultMechanism: "SCRAM-SHA-1"
  defaultSource: "admin"
systemLog:
  path: /opt/mongodb-bi/logs/mongosqld.log
  verbosity: 2
  logAppend: true
schema:
  path: /opt/mongodb-bi/schemas
  maxVarcharLength: 65535
processManagement:
  service:
    name: "mongosql"
    displayName: "MongoSQL Service"
    description: "MongoSQL accesses MongoDB data with SQL"

Install the MongoDB BI Connector service

mongosqld install --config /opt/mongodb-bi/conf/mongosqld-config.yml
# reload systemd units
systemctl daemon-reload
# enable start on boot
systemctl enable mongosql.service

Startup

# Generate the schema
mongodrdl --host 192.168.103.113:27011 --username root --password XXX --db assistant --authenticationDatabase admin --authenticationMechanism SCRAM-SHA-1 --out /opt/mongodb-bi/schemas/schemas.drdl
# foreground (ad-hoc) start
mongosqld --config=/opt/mongodb-bi/conf/mongosqld-config.yml
# run as a service
systemctl start mongosql.service

Connect and query

  • mysql CLI; note that the username and password are the MongoDB username and password

    mysql --enable-cleartext-plugin --user='root?source=admin&mechanism=SCRAM-SHA-1' --host=192.168.103.153 --protocol=tcp --port=3307 -p
  • Navicat: the "use SSL" option must be checked

  • JDBC connections need these extra connection-string parameters:
    characterEncoding=UTF-8&connectTimeout=5000&useSSL=true&allowPublicKeyRetrieval=true&verifyServerCertificate=false
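
Once connected (over the MySQL protocol on port 3307), the DRDL-mapped schema can be sanity-checked with plain SQL. This is only a sketch: mongosqld exposes the MongoDB databases mapped by the schemas.drdl generated above, and the collection name below is a placeholder.

SHOW DATABASES;
SHOW TABLES FROM assistant;                       -- databases/tables come from the generated schemas.drdl
SELECT COUNT(*) FROM assistant.some_collection;   -- "some_collection" is a placeholder name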

Other related commands

11
systemctl start mongosql.service
systemctl status mongosql.service
systemctl stop mongosql.service
systemctl disable mongosql.service
journalctl -u mongosql.service
rm /etc/systemd/system/mongosql.service
rm /usr/lib/systemd/system/mongosql.service
systemctl daemon-reload
systemctl reset-failed


k8s cluster background

k8s 1.25.4 deployment notes (containerd)
Using NFS as a dynamic storageClass in k8s

Deploy the StarRocks Operator

  • Add the StarRocksCluster custom resource
    kubectl apply -f https://raw.githubusercontent.com/StarRocks/starrocks-kubernetes-operator/main/deploy/starrocks.com_starrocksclusters.yaml
  • Deploy the StarRocks Operator
    kubectl apply -f https://raw.githubusercontent.com/StarRocks/starrocks-kubernetes-operator/main/deploy/operator.yaml

Deploy the StarRocks cluster

Example configuration file

Here I configure shared-data mode (separated storage and compute) backed by Tencent Cloud COS, with a storageClass persisting the FE metadata.

shared_data_mode.yaml

apiVersion: starrocks.com/v1
kind: StarRocksCluster
metadata:
  name: starrockscluster
  namespace: starrocks
spec:
  starRocksFeSpec:
    image: starrocks/fe-ubuntu:3.2.2
    replicas: 1
    limits:
      cpu: 2
      memory: 4Gi
    requests:
      cpu: 2
      memory: 4Gi
    # service:
    #   type: NodePort # export fe service
    #   ports:
    #     - name: query # fill the name from the fe service ports
    #       nodePort: 32755
    #       port: 9030
    #       containerPort: 9030
    storageVolumes:
      - name: fe-storage-meta
        storageClassName: "nfs-client" # you can remove this line if you want to use the default storage class
        storageSize: 10Gi # the size of storage volume for metadata
        mountPath: /opt/starrocks/fe/meta # the path of metadata
      - name: fe-storage-log
        storageClassName: "nfs-client" # you can remove this line if you want to use the default storage class
        storageSize: 1Gi # the size of storage volume for log
        mountPath: /opt/starrocks/fe/log # the path of log
    configMapInfo:
      configMapName: starrockscluster-fe-cm
      resolveKey: fe.conf
  starRocksCnSpec:
    image: starrocks/cn-ubuntu:3.2.2
    replicas: 1
    limits:
      cpu: 2
      memory: 4Gi
    requests:
      cpu: 2
      memory: 4Gi
    configMapInfo:
      configMapName: starrockscluster-cn-cm
      resolveKey: cn.conf
    storageVolumes:
      - name: cn-storage-data
        storageClassName: "nfs-client" # you can remove this line if you want to use the default storage class
        storageSize: 10Gi # the size of storage volume for data
        mountPath: /opt/starrocks/cn/storage # the path of data
      - name: cn-storage-log
        storageClassName: "nfs-client" # you can remove this line if you want to use the default storage class
        storageSize: 1Gi # the size of storage volume for log
        mountPath: /opt/starrocks/cn/log # the path of log
  starRocksFeProxySpec:
    replicas: 1
    limits:
      cpu: 1
      memory: 2Gi
    requests:
      cpu: 1
      memory: 2Gi
    service:
      type: NodePort # export fe proxy service
      ports:
        - name: http-port # fill the name from the fe proxy service ports
          containerPort: 8080
          nodePort: 30180 # The range of valid ports is 30000-32767
          port: 8080
    resolver: "kube-dns.kube-system.svc.cluster.local" # this is the default dns server.

---

# fe config
apiVersion: v1
kind: ConfigMap
metadata:
  name: starrockscluster-fe-cm
  namespace: starrocks
  labels:
    cluster: starrockscluster
data:
  fe.conf: |
    LOG_DIR = ${STARROCKS_HOME}/log
    DATE = "$(date +%Y%m%d-%H%M%S)"
    JAVA_OPTS="-Dlog4j2.formatMsgNoLookups=true -Xmx8192m -XX:+UseMembar -XX:SurvivorRatio=8 -XX:MaxTenuringThreshold=7 -XX:+PrintGCDateStamps -XX:+PrintGCDetails -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+CMSClassUnloadingEnabled -XX:-CMSParallelRemarkEnabled -XX:CMSInitiatingOccupancyFraction=80 -XX:SoftRefLRUPolicyMSPerMB=0 -Xloggc:${LOG_DIR}/fe.gc.log.$DATE"
    JAVA_OPTS_FOR_JDK_9="-Dlog4j2.formatMsgNoLookups=true -Xmx8192m -XX:SurvivorRatio=8 -XX:MaxTenuringThreshold=7 -XX:+CMSClassUnloadingEnabled -XX:-CMSParallelRemarkEnabled -XX:CMSInitiatingOccupancyFraction=80 -XX:SoftRefLRUPolicyMSPerMB=0 -Xlog:gc*:${LOG_DIR}/fe.gc.log.$DATE:time"
    JAVA_OPTS_FOR_JDK_11="-Dlog4j2.formatMsgNoLookups=true -Xmx8192m -XX:+UseG1GC -Xlog:gc*:${LOG_DIR}/fe.gc.log.$DATE:time"
    http_port = 8030
    rpc_port = 9020
    query_port = 9030
    edit_log_port = 9010
    mysql_service_nio_enabled = true
    sys_log_level = INFO
    run_mode = shared_data
    cloud_native_meta_port = 6090
    # whether StarRocks may create the default storage volume from the storage-related properties in the FE config file
    enable_load_volume_from_conf = true
    aws_s3_path = files-prod-1253767413/starrocks
    aws_s3_region = ap-guangzhou
    aws_s3_endpoint = https://cos.ap-guangzhou.myqcloud.com
    aws_s3_access_key = XXX
    aws_s3_secret_key = XXX

---

# cn config
apiVersion: v1
kind: ConfigMap
metadata:
  name: starrockscluster-cn-cm
  namespace: starrocks
  labels:
    cluster: starrockscluster
data:
  cn.conf: |
    sys_log_level = INFO
    # ports for admin, web, heartbeat service
    thrift_port = 9060
    webserver_port = 8040
    heartbeat_service_port = 9050
    brpc_port = 8060
kubectl apply -f shared_data_mode.yaml

Related links

Deploy a StarRocks cluster with the Operator

Background

A round of validation for querying external data lakes.

Catalog

  • JDBC ✅ verified (see the query sketch after this list)

    CREATE EXTERNAL CATALOG jdbc1
    PROPERTIES
    (
    "type"="jdbc",
    "user"="root",
    "password"="XXX",
    "jdbc_uri"="jdbc:mysql://172.16.0.9:3306",
    "driver_url"="https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar",
    "driver_class"="com.mysql.cj.jdbc.Driver"
    );
  • es

    • Shared-nothing (integrated storage and compute) + self-hosted ES ✅ verified
    • Shared-data on Tencent Cloud COS + Tencent Cloud ES ✅ verified

      Known issue: when the Tencent Cloud default index has a dynamic template enabled, StarRocks has a bug parsing the mapping, and SHOW TABLES returns nothing

      CREATE EXTERNAL CATALOG es_test
      COMMENT 'es'
      PROPERTIES
      (
      "type" = "es",
      "es.type" = "_doc",
      "hosts" = "http://172.16.0.8:9200",
      "es.net.ssl" = "false",
      "user" = "elastic",
      "password" = "xxx",
      "es.nodes.wan.only" = "false"
      );
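
As a quick smoke test after creating a catalog, the external data can be browsed and queried by fully qualifying catalog.database.table names. A minimal sketch (the database and table names below are placeholders, not from the original notes):

SHOW DATABASES FROM jdbc1;
SHOW TABLES FROM jdbc1.some_db;                  -- "some_db" is a placeholder
SELECT * FROM jdbc1.some_db.some_table LIMIT 10; -- "some_table" is a placeholder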

EXTERNAL TABLE

  • COS file external table ❌ verification failed
    CREATE EXTERNAL TABLE user_behavior_fet
    (
    UserID int,
    ItemID int,
    CategoryID int,
    BehaviorType string
    )
    ENGINE=file
    PROPERTIES
    (
    "path" = "s3://race-1301288126/input/user_behavior_ten_million_rows.parquet",
    "format" = "parquet",
    "aws.s3.enable_ssl" = "false",
    "aws.s3.enable_path_style_access" = "true",
    "aws.s3.endpoint" = "cos.ap-guangzhou.myqcloud.com",
    "aws.s3.access_key" = "AKIDAFICxeJwnror5OarCRYXQMF1f5X7lJtO",
    "aws.s3.secret_key" = "XXX"
    );

Many modern databases support partitioning (Partition) and bucketing (a bucket, or Tablet, is sometimes also called a shard), mainly to improve query performance.
StarRocks partitions first and then buckets, and flexibly supports two distribution schemes:

  • Hash distribution: no partitioning; the whole table is one partition and only the number of buckets needs to be specified.
  • Range-Hash combined distribution: partitions are defined and the number of buckets is specified per partition.
    StarRocks supports partitioning and bucketing together; a Partition is made up of several Tablets.

Partitions

The main purpose of partitioning is to split a table into management units by a partition key, so that a suitable storage policy (replica count, hot/cold policy, storage medium, and so on) can be chosen per unit. Frequently accessed partitions can be placed on SSD and rarely accessed ones on SATA. A well-chosen partition key effectively prunes the amount of data scanned; a date or a region column is the usual choice.

  • PARTITION BY RANGE()
  • In practice a time column is usually chosen as the partition key. The granularity depends on data volume; keeping the raw data of a single partition under 100 GB is recommended.

Bucketing

Bucketing scatters the data into logical slices (Tablets). The Tablet is the smallest unit of data balancing, so data is spread as evenly as possible across the BE nodes of the cluster, letting queries fully exploit the cluster's machines and cores.

  • DISTRIBUTED BY HASH()
  • Within each partition StarRocks hash-buckets the data; bucketing is set at table creation with the DISTRIBUTED BY HASH() clause.
  • In the StarRocks storage engine, user data is horizontally split into Tablets (also called data buckets). Each Tablet holds a number of rows, Tablets do not overlap, and they are physically stored independently of each other.
  • Multiple Tablets logically belong to different partitions (Partition). A Tablet belongs to exactly one Partition, and a Partition contains several Tablets. Because Tablets are stored independently, Partitions can also be regarded as physically independent. The Tablet is the smallest physical storage unit for operations such as data movement and replication.
  • Several Partitions make up a Table. The Partition is the smallest logical management unit: data import and deletion can, or can only, be performed per Partition.

Replica count

The replica count in StarRocks is the number of copies kept of the same Tablet. It is set at table creation with the replication_num parameter and can be changed later. When not specified, StarRocks defaults to three replicas, i.e. each Tablet is stored on three different nodes (the replica policy places the replicas of a tablet on nodes with different IPs).

To make this concrete, assume a cluster with 3 BE nodes and two tables, Table A and Table B, both created without partitions (each is effectively one big partition), with 3 buckets and replication_num 2. One possible data layout in the cluster is shown in the figure below.
(figure: possible tablet distribution of Table A and Table B across the 3 BE nodes)

Worked example

Below: StarRocks data division and the multi-replica Tablet mechanism.

(figure: StarRocks data division and the Tablet multi-replica mechanism)

  • The table is split by date into 4 partitions, and the first partition is split into 4 Tablets. Each Tablet is backed by 3 replicas placed on 3 different BE nodes.

  • Because a table is split into many Tablets, StarRocks can process all Tablets concurrently when executing a SQL statement, making full use of the compute power of multiple machines and cores.

  • Users can also exploit this data layout to spread high-concurrency request load across multiple physical nodes, so the system's capacity for high concurrency can be scaled by adding nodes.

To summarize

In StarRocks, the Partition is the smallest logical unit for data import and backup/restore, and the Tablet is the smallest physical unit for data replication and balancing. The relationship between Table, Partition, and Tablet is shown below:
(figure: relationship between Table, Partition, and Tablet)

Partitioning applies to the table and segments its data. Bucketing applies to each partition and scatters that partition's data into logical slices (Tablets). The replica count applies to each Tablet and is the number of copies kept of it. So, for a table whose partitions all use the same bucket count, the total number of Tablets is: total tablets = partitions × buckets per partition × replicas.

Take table01 as an example: with 3 partitions, 20 buckets per partition, and 1 replica per tablet, its total tablet count = 3 * 20 * 1 = 60. Checking table01's tablet information indeed shows 60 tablets:

show tablet from table01;
60 rows in set (0.01 sec)
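
For reference, a table with this 3 × 20 × 1 layout could be declared roughly as follows. This is only a sketch: the column names and the three monthly ranges are made up for illustration and are not table01's actual schema.

CREATE TABLE table01 (
    event_date DATE NOT NULL,
    user_id    INT,
    metric     BIGINT
)
DUPLICATE KEY(event_date, user_id)
PARTITION BY RANGE(event_date) (                       -- 3 partitions
    PARTITION p202201 VALUES LESS THAN ("2022-02-01"),
    PARTITION p202202 VALUES LESS THAN ("2022-03-01"),
    PARTITION p202203 VALUES LESS THAN ("2022-04-01")
)
DISTRIBUTED BY HASH(user_id) BUCKETS 20                -- 20 buckets per partition
PROPERTIES ("replication_num" = "1");                  -- 1 replica per tablet => 3 * 20 * 1 = 60 tablets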

Background and data preparation

This only records some common import methods.

  • New York City crash data
    curl -O https://raw.githubusercontent.com/StarRocks/demo/master/documentation-samples/quickstart/datasets/NYPD_Crash_Data.csv
  • Weather data
    curl -O https://raw.githubusercontent.com/StarRocks/demo/master/documentation-samples/quickstart/datasets/72505394728.csv
  • User behavior dataset
    curl -O https://starrocks-datasets.s3.amazonaws.com/user_behavior_ten_million_rows.parquet
  • file1.csv
    1,Lily,21
    2,Rose,22
    3,Alice,23
    4,Julia,24
  • file2.csv
    5,Tony,25
    6,Adam,26
    7,Allen,27
    8,Jacky,28
  • file3.json
    {"name": "北京", "code": 2}

Import from the local file system (Stream Load)

TIP: http://<fe_host>:<fe_http_port> uses default port 8030; http://<be_host>:<be_http_port> (default port 8040) can also be used.

  • table1 or table2

    • Create table1
      CREATE TABLE `table1`
      (
      `id` int(11) NOT NULL COMMENT "用户 ID",
      `name` varchar(65533) NULL DEFAULT "" COMMENT "用户姓名",
      `score` int(11) NOT NULL DEFAULT "0" COMMENT "用户得分"
      )
      ENGINE=OLAP
      PRIMARY KEY(`id`)
      DISTRIBUTED BY HASH(`id`);
    • Import file1.csv
      curl --location-trusted -u <username>:<password> -H "label:123" \
      -H "Expect:100-continue" \
      -H "column_separator:," \
      -H "columns: id, name, score" \
      -T file1.csv -XPUT \
      http://<fe_host>:<fe_http_port>/api/import_example/table1/_stream_load
  • table_json

    • Create table_json
      CREATE TABLE `table_json`
      (
      `id` int(11) NOT NULL COMMENT "城市 ID",
      `city` varchar(65533) NULL COMMENT "城市名称"
      )
      ENGINE=OLAP
      PRIMARY KEY(`id`)
      DISTRIBUTED BY HASH(`id`);
    • Import file3.json
      curl -v --location-trusted -u <username>:<password> -H "strict_mode: true" \
      -H "Expect:100-continue" \
      -H "format: json" -H "jsonpaths: [\"$.name\", \"$.code\"]" \
      -H "columns: city,tmp_id, id = tmp_id * 100" \
      -T file3.json -XPUT \
      http://<fe_host>:<fe_http_port>/api/import_example/table_json/_stream_load
  • crashdata table

    • Create the crashdata table to hold the crash dataset. Its columns are trimmed to only those relevant to this tutorial.
      CREATE DATABASE IF NOT EXISTS import_example;
      USE import_example;

      CREATE TABLE IF NOT EXISTS crashdata (
      CRASH_DATE DATETIME,
      BOROUGH STRING,
      ZIP_CODE STRING,
      LATITUDE INT,
      LONGITUDE INT,
      LOCATION STRING,
      ON_STREET_NAME STRING,
      CROSS_STREET_NAME STRING,
      OFF_STREET_NAME STRING,
      CONTRIBUTING_FACTOR_VEHICLE_1 STRING,
      CONTRIBUTING_FACTOR_VEHICLE_2 STRING,
      COLLISION_ID INT,
      VEHICLE_TYPE_CODE_1 STRING,
      VEHICLE_TYPE_CODE_2 STRING
      );
    • Import the NYC crash data
      curl --location-trusted -u root             \
      -T ./NYPD_Crash_Data.csv \
      -H "label:crashdata-0" \
      -H "column_separator:," \
      -H "skip_header:1" \
      -H "enclose:\"" \
      -H "max_filter_ratio:1" \
      -H "columns:tmp_CRASH_DATE, tmp_CRASH_TIME, CRASH_DATE=str_to_date(concat_ws(' ', tmp_CRASH_DATE, tmp_CRASH_TIME), '%m/%d/%Y %H:%i'),BOROUGH,ZIP_CODE,LATITUDE,LONGITUDE,LOCATION,ON_STREET_NAME,CROSS_STREET_NAME,OFF_STREET_NAME,NUMBER_OF_PERSONS_INJURED,NUMBER_OF_PERSONS_KILLED,NUMBER_OF_PEDESTRIANS_INJURED,NUMBER_OF_PEDESTRIANS_KILLED,NUMBER_OF_CYCLIST_INJURED,NUMBER_OF_CYCLIST_KILLED,NUMBER_OF_MOTORIST_INJURED,NUMBER_OF_MOTORIST_KILLED,CONTRIBUTING_FACTOR_VEHICLE_1,CONTRIBUTING_FACTOR_VEHICLE_2,CONTRIBUTING_FACTOR_VEHICLE_3,CONTRIBUTING_FACTOR_VEHICLE_4,CONTRIBUTING_FACTOR_VEHICLE_5,COLLISION_ID,VEHICLE_TYPE_CODE_1,VEHICLE_TYPE_CODE_2,VEHICLE_TYPE_CODE_3,VEHICLE_TYPE_CODE_4,VEHICLE_TYPE_CODE_5" \
      -XPUT http://<fe_host>:<fe_http_port>/api/import_example/crashdata/_stream_load
  • weatherdata table

    • Create the weatherdata table to hold the weather dataset. Its columns are likewise trimmed to only those relevant to this tutorial.
      CREATE DATABASE IF NOT EXISTS import_example;
      USE import_example;
      CREATE TABLE IF NOT EXISTS weatherdata (
      DATE DATETIME,
      NAME STRING,
      HourlyDewPointTemperature STRING,
      HourlyDryBulbTemperature STRING,
      HourlyPrecipitation STRING,
      HourlyPresentWeatherType STRING,
      HourlyPressureChange STRING,
      HourlyPressureTendency STRING,
      HourlyRelativeHumidity STRING,
      HourlySkyConditions STRING,
      HourlyVisibility STRING,
      HourlyWetBulbTemperature STRING,
      HourlyWindDirection STRING,
      HourlyWindGustSpeed STRING,
      HourlyWindSpeed STRING
      );
    • Import the weather data
      curl --location-trusted -u root             \
      -T ./72505394728.csv \
      -H "label:weather-0" \
      -H "column_separator:," \
      -H "skip_header:1" \
      -H "enclose:\"" \
      -H "max_filter_ratio:1" \
      -H "columns: STATION, DATE, LATITUDE, LONGITUDE, ELEVATION, NAME, REPORT_TYPE, SOURCE, HourlyAltimeterSetting, HourlyDewPointTemperature, HourlyDryBulbTemperature, HourlyPrecipitation, HourlyPresentWeatherType, HourlyPressureChange, HourlyPressureTendency, HourlyRelativeHumidity, HourlySkyConditions, HourlySeaLevelPressure, HourlyStationPressure, HourlyVisibility, HourlyWetBulbTemperature, HourlyWindDirection, HourlyWindGustSpeed, HourlyWindSpeed, Sunrise, Sunset, DailyAverageDewPointTemperature, DailyAverageDryBulbTemperature, DailyAverageRelativeHumidity, DailyAverageSeaLevelPressure, DailyAverageStationPressure, DailyAverageWetBulbTemperature, DailyAverageWindSpeed, DailyCoolingDegreeDays, DailyDepartureFromNormalAverageTemperature, DailyHeatingDegreeDays, DailyMaximumDryBulbTemperature, DailyMinimumDryBulbTemperature, DailyPeakWindDirection, DailyPeakWindSpeed, DailyPrecipitation, DailySnowDepth, DailySnowfall, DailySustainedWindDirection, DailySustainedWindSpeed, DailyWeather, MonthlyAverageRH, MonthlyDaysWithGT001Precip, MonthlyDaysWithGT010Precip, MonthlyDaysWithGT32Temp, MonthlyDaysWithGT90Temp, MonthlyDaysWithLT0Temp, MonthlyDaysWithLT32Temp, MonthlyDepartureFromNormalAverageTemperature, MonthlyDepartureFromNormalCoolingDegreeDays, MonthlyDepartureFromNormalHeatingDegreeDays, MonthlyDepartureFromNormalMaximumTemperature, MonthlyDepartureFromNormalMinimumTemperature, MonthlyDepartureFromNormalPrecipitation, MonthlyDewpointTemperature, MonthlyGreatestPrecip, MonthlyGreatestPrecipDate, MonthlyGreatestSnowDepth, MonthlyGreatestSnowDepthDate, MonthlyGreatestSnowfall, MonthlyGreatestSnowfallDate, MonthlyMaxSeaLevelPressureValue, MonthlyMaxSeaLevelPressureValueDate, MonthlyMaxSeaLevelPressureValueTime, MonthlyMaximumTemperature, MonthlyMeanTemperature, MonthlyMinSeaLevelPressureValue, MonthlyMinSeaLevelPressureValueDate, MonthlyMinSeaLevelPressureValueTime, MonthlyMinimumTemperature, MonthlySeaLevelPressure, MonthlyStationPressure, MonthlyTotalLiquidPrecipitation, MonthlyTotalSnowfall, MonthlyWetBulb, AWND, CDSD, CLDD, DSNW, HDSD, HTDD, NormalsCoolingDegreeDay, NormalsHeatingDegreeDay, ShortDurationEndDate005, ShortDurationEndDate010, ShortDurationEndDate015, ShortDurationEndDate020, ShortDurationEndDate030, ShortDurationEndDate045, ShortDurationEndDate060, ShortDurationEndDate080, ShortDurationEndDate100, ShortDurationEndDate120, ShortDurationEndDate150, ShortDurationEndDate180, ShortDurationPrecipitationValue005, ShortDurationPrecipitationValue010, ShortDurationPrecipitationValue015, ShortDurationPrecipitationValue020, ShortDurationPrecipitationValue030, ShortDurationPrecipitationValue045, ShortDurationPrecipitationValue060, ShortDurationPrecipitationValue080, ShortDurationPrecipitationValue100, ShortDurationPrecipitationValue120, ShortDurationPrecipitationValue150, ShortDurationPrecipitationValue180, REM, BackupDirection, BackupDistance, BackupDistanceUnit, BackupElements, BackupElevation, BackupEquipment, BackupLatitude, BackupLongitude, BackupName, WindEquipmentChangeDate" \
      -XPUT http://<fe_host>:<fe_http_port>/api/import_example/weatherdata/_stream_load

Import from cloud storage ✅

Put the files into COS cloud storage, then import them into StarRocks.

  • The following statements import the data file user_behavior_ten_million_rows.parquet from the input folder of COS bucket race-1301288126 into the target table user_behavior_replica
    • Create the table
      CREATE DATABASE IF NOT EXISTS mydatabase;
      USE mydatabase;

      CREATE TABLE user_behavior_replica
      (
      UserID int(11),
      ItemID int(11),
      CategoryID int(11),
      BehaviorType varchar(65533),
      Timestamp varbinary
      )
      ENGINE = OLAP
      DUPLICATE KEY(UserID)
      DISTRIBUTED BY HASH(UserID);
    • Load job
      LOAD LABEL mydatabase.label_brokerloadtest_502
      (
      DATA INFILE("cosn://race-1301288126/input/user_behavior_ten_million_rows.parquet")
      INTO TABLE user_behavior_replica
      FORMAT AS "parquet"
      )
      WITH BROKER
      (
      "fs.cosn.userinfo.secretId" = "xxx",
      "fs.cosn.userinfo.secretKey" = "xxx",
      "fs.cosn.bucket.endpoint_suffix" = "cos.ap-guangzhou.myqcloud.com"
      )
      PROPERTIES
      (
      "timeout" = "3600"
      );
  • The following statements import the data files file1.csv and file2.csv from the input folder of COS bucket race-1301288126 into the target tables table1 and table2
    • Create the tables
      CREATE TABLE `table1`
      (
      `id` int(11) NOT NULL COMMENT "用户 ID",
      `name` varchar(65533) NULL DEFAULT "" COMMENT "用户姓名",
      `score` int(11) NOT NULL DEFAULT "0" COMMENT "用户得分"
      )
      ENGINE=OLAP
      PRIMARY KEY(`id`)
      DISTRIBUTED BY HASH(`id`);

      CREATE TABLE `table2`
      (
      `id` int(11) NOT NULL COMMENT "用户 ID",
      `name` varchar(65533) NULL DEFAULT "" COMMENT "用户姓名",
      `score` int(11) NOT NULL DEFAULT "0" COMMENT "用户得分"
      )
      ENGINE=OLAP
      PRIMARY KEY(`id`)
      DISTRIBUTED BY HASH(`id`);
    • Load a single file into a single table
      LOAD LABEL mydatabase.label_brokerloadtest_501
      (
      DATA INFILE("cosn://race-1301288126/input/file1.csv")
      INTO TABLE table1
      COLUMNS TERMINATED BY ","
      (id, name, score)
      )
      WITH BROKER
      (
      "fs.cosn.userinfo.secretId" = "xxx",
      "fs.cosn.userinfo.secretKey" = "xxx",
      "fs.cosn.bucket.endpoint_suffix" = "cos.ap-guangzhou.myqcloud.com"
      )
      PROPERTIES
      (
      "timeout" = "3600"
      );
    • Load multiple files into a single table
      LOAD LABEL mydatabase.label_brokerloadtest_402
      (
      DATA INFILE("cosn://race-1301288126/input/*")
      INTO TABLE table1
      COLUMNS TERMINATED BY ","
      (id, name, score)
      )
      WITH BROKER
      (
      "fs.cosn.userinfo.secretId" = "xxx",
      "fs.cosn.userinfo.secretKey" = "xxx",
      "fs.cosn.bucket.endpoint_suffix" = "cos.ap-guangzhou.myqcloud.com"
      )
      PROPERTIES
      (
      "timeout" = "3600"
      );
    • Load multiple data files into multiple tables
      LOAD LABEL mydatabase.label_brokerloadtest_403
      (
      DATA INFILE("cosn://race-1301288126/input/file1.csv")
      INTO TABLE table1
      COLUMNS TERMINATED BY ","
      (id, name, score)
      ,
      DATA INFILE("cosn://race-1301288126/input/file2.csv")
      INTO TABLE table2
      COLUMNS TERMINATED BY ","
      (id, name, score)
      )
      WITH BROKER
      (
      "fs.cosn.userinfo.secretId" = "xxx",
      "fs.cosn.userinfo.secretKey" = "xxx",
      "fs.cosn.bucket.endpoint_suffix" = "cos.ap-guangzhou.myqcloud.com"
      )
      PROPERTIES
      (
      "timeout" = "3600"
      );

Other datasets

  • 130 million Amazon product review records, about 37 GB in total

    Each row contains 15 columns, including the user ID (customer_id), review ID (review_id), purchased product ID (product_id), product category (product_category), rating (star_rating), review title (review_headline), and review body (review_body). A hedged load sketch follows the table definition below.
    amazon_reviews/amazon_reviews_2010
    amazon_reviews/amazon_reviews_2011
    amazon_reviews/amazon_reviews_2012
    amazon_reviews/amazon_reviews_2013
    amazon_reviews/amazon_reviews_2014
    amazon_reviews/amazon_reviews_2015

  • Create the table
    CREATE TABLE `amazon_reviews` (  
    `review_date` int(11) NULL,
    `marketplace` varchar(20) NULL,
    `customer_id` bigint(20) NULL,
    `review_id` varchar(40) NULL,
    `product_id` varchar(10) NULL,
    `product_parent` bigint(20) NULL,
    `product_title` varchar(500) NULL,
    `product_category` varchar(50) NULL,
    `star_rating` smallint(6) NULL,
    `helpful_votes` int(11) NULL,
    `total_votes` int(11) NULL,
    `vine` boolean NULL,
    `verified_purchase` boolean NULL,
    `review_headline` varchar(500) NULL,
    `review_body` string NULL
    ) ENGINE=OLAP
    DUPLICATE KEY(`review_date`)
    COMMENT 'OLAP'
    DISTRIBUTED BY HASH(`review_date`);
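
Loading the yearly files could then follow the same Broker Load pattern used above. This is only a sketch: the COS bucket, the amazon_reviews/ prefix, and the Parquet format are assumptions, not something stated in these notes.

LOAD LABEL mydatabase.label_amazon_reviews_01
(
    DATA INFILE("cosn://race-1301288126/amazon_reviews/amazon_reviews_*")  -- bucket/prefix are assumptions
    INTO TABLE amazon_reviews
    FORMAT AS "parquet"                                                    -- format is an assumption
)
WITH BROKER
(
    "fs.cosn.userinfo.secretId" = "xxx",
    "fs.cosn.userinfo.secretKey" = "xxx",
    "fs.cosn.bucket.endpoint_suffix" = "cos.ap-guangzhou.myqcloud.com"
)
PROPERTIES
(
    "timeout" = "3600"
);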

Synchronous materialized views

  • Base data preparation
    The following example is based on the table sales_records, which holds, for each transaction, the transaction ID record_id, salesperson seller_id, store store_id, sale date sale_date, and sale amount sale_amt.
  • Create the table
    CREATE TABLE sales_records(
    record_id INT,
    seller_id INT,
    store_id INT,
    sale_date DATE,
    sale_amt BIGINT
    ) DISTRIBUTED BY HASH(record_id);

    INSERT INTO sales_records
    VALUES
    (001,01,1,"2022-03-13",8573),
    (002,02,2,"2022-03-14",6948),
    (003,01,1,"2022-03-14",4319),
    (004,03,3,"2022-03-15",8734),
    (005,03,3,"2022-03-16",4212),
    (006,02,2,"2022-03-17",9515);
  • Business requirement
    This scenario frequently analyzes the sales amount of different stores, so queries call sum() heavily and consume a lot of system resources. The EXPLAIN command can be used to view the Query Profile of this query.
    SELECT store_id, SUM(sale_amt)
    FROM sales_records
    GROUP BY store_id;
    The rollup item in its Query Profile shows sales_records (the base table), which means the query is not accelerated by any materialized view.
    MySQL > EXPLAIN SELECT store_id, SUM(sale_amt)
    FROM sales_records
    GROUP BY store_id;
    +-----------------------------------------------------------------------------+
    | Explain String |
    +-----------------------------------------------------------------------------+
    | PLAN FRAGMENT 0 |
    | OUTPUT EXPRS:3: store_id | 6: sum |
    | PARTITION: UNPARTITIONED |
    | |
    | RESULT SINK |
    | |
    | 4:EXCHANGE |
    | |
    | PLAN FRAGMENT 1 |
    | OUTPUT EXPRS: |
    | PARTITION: HASH_PARTITIONED: 3: store_id |
    | |
    | STREAM DATA SINK |
    | EXCHANGE ID: 04 |
    | UNPARTITIONED |
    | |
    | 3:AGGREGATE (merge finalize) |
    | | output: sum(6: sum) |
    | | group by: 3: store_id |
    | | |
    | 2:EXCHANGE |
    | |
    | PLAN FRAGMENT 2 |
    | OUTPUT EXPRS: |
    | PARTITION: RANDOM |
    | |
    | STREAM DATA SINK |
    | EXCHANGE ID: 02 |
    | HASH_PARTITIONED: 3: store_id |
    | |
    | 1:AGGREGATE (update serialize) |
    | | STREAMING |
    | | output: sum(5: sale_amt) |
    | | group by: 3: store_id |
    | | |
    | 0:OlapScanNode |
    | TABLE: sales_records |
    | PREAGGREGATION: ON |
    | partitions=1/1 |
    | rollup: sales_records <------ |
    | tabletRatio=10/10 |
    | tabletList=12049,12053,12057,12061,12065,12069,12073,12077,12081,12085 |
    | cardinality=1 |
    | avgRowSize=2.0 |
    | numNodes=0 |
    +-----------------------------------------------------------------------------+
    45 rows in set (0.00 sec)
  • Create the synchronous materialized view
    CREATE MATERIALIZED VIEW store_amt 
    AS
    SELECT store_id, SUM(sale_amt)
    FROM sales_records
    GROUP BY store_id;
  • Verify that the query hits the synchronous materialized view
    MySQL > EXPLAIN SELECT store_id, SUM(sale_amt) FROM sales_records GROUP BY store_id;
    +-----------------------------------------------------------------------------+
    | Explain String |
    +-----------------------------------------------------------------------------+
    | PLAN FRAGMENT 0 |
    | OUTPUT EXPRS:3: store_id | 6: sum |
    | PARTITION: UNPARTITIONED |
    | |
    | RESULT SINK |
    | |
    | 4:EXCHANGE |
    | |
    | PLAN FRAGMENT 1 |
    | OUTPUT EXPRS: |
    | PARTITION: HASH_PARTITIONED: 3: store_id |
    | |
    | STREAM DATA SINK |
    | EXCHANGE ID: 04 |
    | UNPARTITIONED |
    | |
    | 3:AGGREGATE (merge finalize) |
    | | output: sum(6: sum) |
    | | group by: 3: store_id |
    | | |
    | 2:EXCHANGE |
    | |
    | PLAN FRAGMENT 2 |
    | OUTPUT EXPRS: |
    | PARTITION: RANDOM |
    | |
    | STREAM DATA SINK |
    | EXCHANGE ID: 02 |
    | HASH_PARTITIONED: 3: store_id |
    | |
    | 1:AGGREGATE (update serialize) |
    | | STREAMING |
    | | output: sum(5: sale_amt) |
    | | group by: 3: store_id |
    | | |
    | 0:OlapScanNode |
    | TABLE: sales_records |
    | PREAGGREGATION: ON |
    | partitions=1/1 |
    | rollup: store_amt <------ |
    | tabletRatio=10/10 |
    | tabletList=12092,12096,12100,12104,12108,12112,12116,12120,12124,12128 |
    | cardinality=6 |
    | avgRowSize=2.0 |
    | numNodes=0 |
    +-----------------------------------------------------------------------------+
    45 rows in set (0.00 sec)
    As shown, the rollup item in the Query Profile now displays store_amt (the synchronous materialized view), meaning the query hits the synchronous materialized view.

Asynchronous materialized views

  • Base data preparation
    The following example is based on two base tables in the Default Catalog:
    • Table goods holds the product ID item_id1, product name item_name, and product price price.
    • Table order_list holds the order ID order_id, customer ID client_id, and product ID item_id2.
    • item_id1 and item_id2 are equivalent.
  • Create the tables
    CREATE TABLE goods(
    item_id1 INT,
    item_name STRING,
    price FLOAT
    ) DISTRIBUTED BY HASH(item_id1);

    INSERT INTO goods
    VALUES
    (1001,"apple",6.5),
    (1002,"pear",8.0),
    (1003,"potato",2.2);

    CREATE TABLE order_list(
    order_id INT,
    client_id INT,
    item_id2 INT,
    order_date DATE
    ) DISTRIBUTED BY HASH(order_id);

    INSERT INTO order_list
    VALUES
    (10001,101,1001,"2022-03-13"),
    (10001,101,1002,"2022-03-13"),
    (10002,103,1002,"2022-03-13"),
    (10002,103,1003,"2022-03-14"),
    (10003,102,1003,"2022-03-14"),
    (10003,102,1001,"2022-03-14");
  • Business requirement
    This scenario frequently analyzes order totals: the query joins the two tables and calls sum() to produce a new result keyed by order ID and total amount. In addition, the order totals need to be refreshed every day.
    SELECT
    order_id,
    sum(goods.price) as total
    FROM order_list INNER JOIN goods ON goods.item_id1 = order_list.item_id2
    GROUP BY order_id;
  • Create the asynchronous materialized view
    Based on the query above, the following example creates, over tables goods and order_list, an asynchronous materialized view that groups by order ID and sums the prices of all items in each order, with the refresh mode set to ASYNC and an automatic refresh every day.
    CREATE MATERIALIZED VIEW order_mv
    DISTRIBUTED BY HASH(`order_id`)
    REFRESH ASYNC START('2022-09-01 10:00:00') EVERY (interval 1 day)
    AS SELECT
    order_list.order_id,
    sum(goods.price) as total
    FROM order_list INNER JOIN goods ON goods.item_id1 = order_list.item_id2
    GROUP BY order_id;
  • Manage the asynchronous materialized view
    • Query the asynchronous materialized view
      MySQL > SELECT * FROM order_mv;
      +----------+--------------------+
      | order_id | total |
      +----------+--------------------+
      | 10001 | 14.5 |
      | 10002 | 10.200000047683716 |
      | 10003 | 8.700000047683716 |
      +----------+--------------------+
      3 rows in set (0.01 sec)
    • Alter the asynchronous materialized view
      -- Activate a disabled asynchronous materialized view (set its status to Active).
      ALTER MATERIALIZED VIEW order_mv ACTIVE;
      -- Rename the asynchronous materialized view to order_total.
      ALTER MATERIALIZED VIEW order_mv RENAME order_total;
      -- Change the maximum refresh interval of the asynchronous materialized view to 2 days.
      ALTER MATERIALIZED VIEW order_mv REFRESH ASYNC EVERY(INTERVAL 2 DAY);
    • Show asynchronous materialized views
      SHOW MATERIALIZED VIEWS;
      SHOW MATERIALIZED VIEWS WHERE NAME = "order_mv";
      SHOW MATERIALIZED VIEWS WHERE NAME LIKE "order%";
    • Show the creation statement of the asynchronous materialized view
      SHOW CREATE MATERIALIZED VIEW order_mv;
    • Check the execution status of the asynchronous materialized view
      select * from information_schema.tasks  order by CREATE_TIME desc limit 1\G;
    • Drop the asynchronous materialized view
      DROP MATERIALIZED VIEW order_mv;

Based on the mapping between ingested data and the data actually stored, StarRocks currently provides four table models: the duplicate key model, the aggregate key model, the unique key model, and the primary key model.

The four models target different business scenarios.

Duplicate key model

StarRocks uses the duplicate key model by default when creating a table. The sort columns use a sparse index, which allows fast filtering. The duplicate key model keeps all historical data, and dimension columns that appear frequently in filter conditions are good candidates for the sort key; for example, if users often query by a point in time, the event time and event type can be used as the sort key.

  • Create the table, specifying the model and sort key
    CREATE TABLE IF NOT EXISTS detail (
    event_time DATETIME NOT NULL COMMENT "datetime of event",
    event_type INT NOT NULL COMMENT "type of event",
    user_id INT COMMENT "id of user",
    device_code INT COMMENT "device of ",
    channel INT COMMENT ""
    )
    DUPLICATE KEY ( event_time, event_type )
    DISTRIBUTED BY HASH ( user_id );
  • Insert test data
    INSERT INTO detail VALUES('2021-11-18 12:00:00.00',1,1001,1,1);
    INSERT INTO detail VALUES('2021-11-17 12:00:00.00',2,1001,1,1);
    INSERT INTO detail VALUES('2021-11-16 12:00:00.00',3,1001,1,1);
    INSERT INTO detail VALUES('2021-11-15 12:00:00.00',1,1001,1,1);
    INSERT INTO detail VALUES('2021-11-14 12:00:00.00',2,1001,1,1);
  • Query
    SELECT * FROM detail;

    +---------------------+------------+---------+-------------+---------+
    | event_time | event_type | user_id | device_code | channel |
    +---------------------+------------+---------+-------------+---------+
    | 2021-11-18 12:00:00 | 1 | 1001 | 1 | 1 |
    | 2021-11-17 12:00:00 | 2 | 1001 | 1 | 1 |
    | 2021-11-16 12:00:00 | 3 | 1001 | 1 | 1 |
    | 2021-11-15 12:00:00 | 1 | 1001 | 1 | 1 |
    | 2021-11-14 12:00:00 | 2 | 1001 | 1 | 1 |
    +---------------------+------------+---------+-------------+---------+

Aggregate key model

In data analysis, many scenarios aggregate and summarize detail data, for example app traffic statistics, user session duration, visit counts, total impressions, spend statistics, and so on; the aggregate model fits these cases.
Business scenarios suited to the aggregate model share these traits:

  • queries are aggregations such as sum, count, or max
  • the raw detail data does not need to be inspected
  • old data is rarely modified; data is only appended

  • Create the table, specifying the aggregate model
    CREATE TABLE IF NOT EXISTS aggregate_tbl (
    site_id LARGEINT NOT NULL COMMENT "id of site",
    date DATE NOT NULL COMMENT "time of event",
    city_code VARCHAR ( 20 ) COMMENT "city_code of user",
    pv BIGINT SUM DEFAULT "0" COMMENT "total page views",
    mt BIGINT MAX
    )
    AGGREGATE KEY(site_id, date, city_code)
    DISTRIBUTED BY HASH (site_id);
  • Insert test data
    INSERT INTO aggregate_tbl VALUES(1001,'2021-11-18 12:00:00.00',100,1,5);
    INSERT INTO aggregate_tbl VALUES(1001,'2021-11-18 12:00:00.00',100,1,10);
    INSERT INTO aggregate_tbl VALUES(1001,'2021-11-18 12:00:00.00',100,1,15);
    INSERT INTO aggregate_tbl VALUES(1001,'2021-11-18 12:00:00.00',100,1,100);
    INSERT INTO aggregate_tbl VALUES(1001,'2021-11-18 12:00:00.00',100,1,20);
    INSERT INTO aggregate_tbl VALUES(1002,'2021-11-18 12:00:00.00',100,1,5);
    INSERT INTO aggregate_tbl VALUES(1002,'2021-11-18 12:00:00.00',100,3,25);
    INSERT INTO aggregate_tbl VALUES(1002,'2021-11-18 12:00:00.00',100,1,15);
  • Query the test data: pv is the SUM-accumulated value and mt is the maximum of the detail values. If only the aggregated metrics need to be viewed, this model greatly reduces the amount of data stored.
    select * from aggregate_tbl;

    +---------+------------+-----------+------+------+
    | site_id | date | city_code | pv | mt |
    +---------+------------+-----------+------+------+
    | 1001 | 2021-11-18 | 100 | 5 | 100 |
    | 1002 | 2021-11-18 | 100 | 5 | 25 |
    +---------+------------+-----------+------+------+

Unique key model

In some analytics scenarios the data has to be updated, for example slowly changing ("zipper") tables; StarRocks offers the unique key model for this. In e-commerce, for instance, order status changes constantly and daily order updates can exceed a hundred million rows. The duplicate key model with delete+insert cannot keep up with such frequent updates, so the unique key model (which uses a unique key to deduplicate) is used instead. For even more real-time or frequent updates, the primary key model is recommended.
Scenarios suited to the unique key model have these traits:

  • already-written data needs a large volume of updates
  • real-time data analysis is required

  • Create the table, specifying the unique key model
    CREATE TABLE IF NOT EXISTS update_detail (
    create_time DATE NOT NULL COMMENT "create time of an order",
    order_id BIGINT NOT NULL COMMENT "id of an order",
    order_state INT COMMENT "state of an order",
    total_price BIGINT COMMENT "price of an order"
    )
    UNIQUE KEY ( create_time, order_id )
    DISTRIBUTED BY HASH ( order_id );
  • Insert test data. Note: since create_time and order_id form the unique key, rows with the same date and order ID overwrite each other
    INSERT INTO update_detail VALUES('2011-11-18',1001,1,1000);
    INSERT INTO update_detail VALUES('2011-11-18',1001,2,2000);
    INSERT INTO update_detail VALUES('2011-11-17',1001,2,500);
    INSERT INTO update_detail VALUES('2011-11-18',1002,3,3000);
    INSERT INTO update_detail VALUES('2011-11-18',1002,4,4500);
  • Query the result: rows with the same date and order ID have indeed been overwritten.
    select * from update_detail;

    +-------------+----------+-------------+-------------+
    | create_time | order_id | order_state | total_price |
    +-------------+----------+-------------+-------------+
    | 2011-11-17 | 1001 | 2 | 500 |
    | 2011-11-18 | 1001 | 2 | 2000 |
    | 2011-11-18 | 1002 | 4 | 4500 |
    +-------------+----------+-------------+-------------+

Primary key model

Compared with the unique key model, the primary key model supports real-time and frequent updates better. The unique key model can also update data in near real time, but its Merge-on-Read strategy severely limits query performance; the primary key model handles row-level updates much better. Combined with flink-connector-starrocks it can implement real-time MySQL CDC synchronization.
Note that the storage engine builds an index on the primary key and loads that index into memory during data import, so the primary key model requires more memory; quite a few scenarios are therefore not a good fit for it.
Two kinds of scenarios currently suit the primary key model:

  • Data with a hot/cold pattern, where only the last few days of data are modified and old, cold data rarely changes. Orders are a typical case: once completed they are no longer updated, and with day-based partitioning the primary keys of historical partitions are not loaded during import and occupy no memory; only the indexes of the most recent days stay in memory.
  • Large wide tables (hundreds to thousands of columns) where the primary key is only a tiny fraction of the data, so the memory overhead is low. A user status/profile table is an example: it has very many columns, but the total number of users is not huge (tens of millions to hundreds of millions), so the memory usage of the primary key index stays manageable.

Principle: because the unique key model relies on a merge strategy, predicates cannot be pushed down and indexes cannot be used, which severely hurts query performance. The primary key model instead enforces a primary key constraint, so only a single record exists per key and the merge is avoided. When StarRocks receives an update for a record, it locates the row through the primary key index, marks it as deleted, and inserts a new row; an update is effectively rewritten as delete+insert.

  • Create the table, specifying the primary key model

    CREATE TABLE IF NOT EXISTS users (
    user_id BIGINT NOT NULL,
    name STRING NOT NULL,
    email STRING NULL,
    address STRING NULL,
    age TINYINT NULL,
    sex TINYINT NULL
    )
    PRIMARY KEY ( user_id )
    DISTRIBUTED BY HASH (user_id);
  • Insert test data; as with the unique key model, when the same user_id conflicts the row is overwritten

    INSERT INTO users VALUES(1001,'张三','zhang@qq.com','address1',17,'0');
    INSERT INTO users VALUES(1001,'李四','li@qq.com','address2',18,'1');
    INSERT INTO users VALUES(1002,'alice','alice@qq.com','address3',18,'0');
    INSERT INTO users VALUES(1002,'peter','peter@qq.com','address4',18,'1');
  • Query the data

    select * from users;
    +---------+--------+--------------+----------+------+------+
    | user_id | name | email | address | age | sex |
    +---------+--------+--------------+----------+------+------+
    | 1001 | 李四 | li@qq.com | address2 | 18 | 1 |
    | 1002 | peter | peter@qq.com | address4 | 18 | 1 |
    +---------+--------+--------------+----------+------+------+

Other

StarRocks getting-started tutorial
