环境与背景

在特定的情况下,要保证信息安全的同时还能享受到AIGC大模型带来的乐趣和功能,那么,私有化部署就能帮助到你,最起码,它是一个真正可用的方案。私有化部署指的是将AI应用部署在企业内部的服务器上,而非云端。这种部署方式可以在保证数据安全的同时,提高企业对于自身数据资产的控制权。

简单描述下本地电脑的配置:

  • 处理器:Apple M2 Pro
  • 内存:32 GB
  • 系统:14.3.1 (23D60)

本次只是初步评估ChatGLM3-6B的效果,尽可能在已有本地设备的情况下进行低成本本地模型部署,如果要更好的效果,还是上专业的硬件设备。

MAC部署

ChatGLM3 下载

1
git clone https://github.com/THUDM/ChatGLM3

但是,默认里面是没有模型的,只有自带的简单的聊天项目以及相关的接口示例项目,还得继续下载模型。

ChatGLM3-6B 模型下载

当然,如果你自己不下载这些模型,这些模型就会在运行的时候自动下载(网络不好的话会影响使用体验,所以,建议提前下载)
ZHGKaj

1
2
3
4
git lfs install
git clone https://huggingface.co/THUDM/chatglm3-6b
或者
git clone https://www.modelscope.cn/ZhipuAI/chatglm3-6b.git

项目配置和部署

把下载的服务直接放到需要运行的地方,然后执行python环境管理

1
2
conda create --name chatglm3 python=3.10
conda activate chatglm3

然后,进入到主项目中,开始配置一些环境

1
2
3
4
cd ChatGLM3
pip install -r requirements.txt -i https://mirror.sjtu.edu.cn/pypi/web/simple
pip list //查看安装了什么包
pip show openai // 查看包安装到了哪里

vq1ItJ
可以看到,实际上我们可以运行7种案例。

  1. 基础例子(cli_demo , web_demo_streamlit )
  2. 综合例子(聊天,工具,代码解释)
  3. 模型微调
  4. 类似于langchain的案例
  5. openai接口的案例
  6. TensorRT-LLM推理部署
  7. 工具调用

目前,只有第二个的综合例子,是比较有趣的,就以它为案例进行配置修改。

composite_demo例子

看到,这个demo下还有requirements.txt文件,我们把他给安装了

1
pip install -r requirements.txt -i https://mirror.sjtu.edu.cn/pypi/web/simple

演示中使用 Code Interpreter 还需要安装 Jupyter 内核:

1
2
pip install ipykernel -i https://mirror.sjtu.edu.cn/pypi/web/simple
ipython kernel install --name chatglm3 --user

接着修改client.py里面的配置信息
PG7zl2

1
2
// 修改 MODEL_PATH , chatglm3-6b 绝对路径
MODEL_PATH = os.environ.get('MODEL_PATH', '/Users/junyao/Desktop/chatglm/chatglm3-6b')

对于搭载了Apple Silicon或者AMD GPU的 Mac,可以使用MPS后端来在GPU上运行ChatGLM3-6B。需要参考Apple的官方说明 安装 PyTorch-Nightly(正确的版本号应该是2.x.x.dev2023xxxx,而不是 2.x.x)。

1
2
pip uninstall torch torchvision torchaudio 
pip install --pre torch torchvision torchaudio --extra-index-url https://download.pytorch.org/whl/nightly/cpu

使用命令

1
pip list | grep torch 

看到类似这样的带dev的就可以下一步了

1
2
3
torch 2.3.0.dev20231224
torchaudio 2.2.0.dev20231224
torchvision 0.18.0.dev20231224

client.pydevice_map = "auto"改为device_map = "mps"
136-140行

1
2
3
4
5
self.model = AutoModel.from_pretrained(
model_path,
trust_remote_code=True,
config=config,
device_map="mps").eval()

150行

1
self.model = AutoModel.from_pretrained(MODEL_PATH, trust_remote_code=True, device_map="mps").eval()

QJx6CW
然后,执行以下命令启动服务

1
streamlit run main.py

效果展示

这回答速度真绝,非常的快。

对话模式

FXB1yl

输入你是谁,它就输自动的输出信息,速度还挺快。而控制台也会显示你输入的信息以及返回的信息。

gVeMKf

工具模式

工具模式,需要自己先定义工具,我这边没有定义,有兴趣的可以整一下。
以下是自带的工具进行的演示:我调用了一个查询天气的工具(tool_registry.py) 文件可以看到 get_weather的代码
7X0GG2

代码解释器模式

fC4KjC

总结

一开始的时候,没有按照Apple的官方说明安装PyTorch-Nightly,并配置MPS,结果效果喜人,一直在推理。后来配置后,感觉速度不亚于chatgpt3.5,答复效果也非常好。下一步开始使用chatGLM搭建私有知识库。

背景

​DataX 是一个异构数据源离线同步工具,本次需求是定时调度数据库,

  • mongodb、mysql定时同步到es
  • mongodb、mysql定时同步到StarRocks

原则是要配合海豚调度DolphinScheduler,但是DolphinScheduler目前看有点重,晚点评估。

job

  • mysql同步StarRocks

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    {
    "job":
    {
    "setting":
    {
    "speed":
    {
    "channel": 3
    },
    "errorLimit":
    {
    "record": 0,
    "percentage": 0.02
    }
    },
    "content":
    [
    {
    "reader":
    {
    "name": "mysqlreader",
    "parameter":
    {
    "username": "root",
    "password": "XXX",
    "column":
    [
    "id",
    "user_name",
    "wecom_user_id",
    "sap_code",
    "password",
    "name",
    "cellphone",
    "email"
    ],
    "splitPk": "id",
    "connection":
    [
    {
    "table":
    [
    "user"
    ],
    "jdbcUrl":
    [
    "jdbc:mysql://192.168.103.113:3306/user"
    ]
    }
    ]
    }
    },
    "writer":
    {
    "name": "starrockswriter",
    "parameter":
    {
    "username": "root",
    "password": "XXX",
    "column":
    [
    "id",
    "user_name",
    "wecom_user_id",
    "sap_code",
    "password",
    "name",
    "cellphone",
    "email"
    ],
    "preSql":
    [],
    "postSql":
    [],
    "connection":
    [
    {
    "table":
    [
    "user"
    ],
    "jdbcUrl": "jdbc:mysql://192.168.103.202:9030/",
    "selectedDatabase": "assistant"
    }
    ],
    "loadUrl":
    [
    "192.168.103.202:8040" // FE的http port,这里直接用CN的port
    ],
    "loadProps":
    {}
    }
    }
    }
    ]
    }
    }
  • mongodb同步es

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    {
    "job":
    {
    "setting":
    {
    "speed":
    {
    "channel": 2
    }
    },
    "content":
    [
    {
    "reader":
    {
    "name": "mongodbreader",
    "parameter":
    {
    "address":
    [
    "192.168.103.113:27011"
    ],
    "userName": "root",
    "userPassword": "XXX",
    "authDb": "admin",
    "dbName": "biocitydb",
    "collectionName": "companies",
    "column":
    [
    {
    "name": "_id",
    "type": "string"
    },
    {
    "name": "name",
    "type": "string"
    },
    {
    "name": "sapCode",
    "type": "string"
    }
    ]
    }
    },
    "writer":
    {
    "name": "elasticsearchwriter",
    "parameter":
    {
    "endpoint": "http://192.168.199.113:9200",
    "accessId": "root",
    "accessKey": "XXX",
    "index": "companies",
    "type": "default",
    "cleanup": true,
    "settings":
    {
    "index":
    {
    "number_of_shards": 1,
    "number_of_replicas": 0
    }
    },
    "discovery": false,
    "batchSize": 1000,
    "splitter": ",",
    "column":
    [
    {
    "name": "id",
    "type": "id"
    },
    {
    "name": "name",
    "type": "keyword"
    },
    {
    "name": "sapCode",
    "type": "keyword"
    }
    ]
    }
    }
    }
    ]
    }
    }

启动

1
2
3
$ cd  {YOUR_DATAX_HOME}/bin
$ python datax.py {YOUR_JOB.json}

背景

开始统一数仓,seatunnel评估一轮

mongodb-cdc

  • mongodb-cdc实时同步mysql

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    env {
    parallelism = 1
    job.mode = "STREAMING"
    checkpoint.interval = 5000
    }

    source {
    MongoDB-CDC {
    hosts = "192.168.103.113:27011"
    database = ["pms"]
    collection = ["pms.demand_item_row"]
    username = root
    password = "XXX"
    schema = {
    fields {
    "_id" : string,
    "ras" : string,
    "planSn" : string,
    "isPlaned" : boolean
    }
    }
    }
    }

    transform {
    FieldMapper {
    field_mapper = {
    _id = _id
    ras = ras
    planSn = planSn
    isPlaned = isPlaned
    }
    }
    }

    sink {
    jdbc {
    url = "jdbc:mysql://192.168.103.113:3306"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "root"
    password = "XXX"
    generate_sink_sql = true
    database = test
    table = row
    primary_keys = ["_id"]
    }
    }
  • mongodb-cdc实时同步starrocks

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    env {
    parallelism = 1
    job.mode = "STREAMING"
    checkpoint.interval = 5000
    }

    source {
    MongoDB-CDC {
    hosts = "192.168.103.113:27011"
    database = ["biocitydb"]
    collection = ["biocitydb.users"]
    username = root
    password = "XXX"
    schema = {
    fields {
    "id" : int,
    "username" : string,
    "wecomUserId" : string,
    "password" : string,
    "name" : string,
    "cellphone" : string,
    "email" : string
    }
    }
    }
    }

    transform {
    FieldMapper {
    field_mapper = {
    id = id
    username = user_name
    wecomUserId = wecom_user_id
    password = password
    name = name
    cellphone = cellphone
    email = email
    }
    }
    }

    sink {
    StarRocks {
    batch_max_rows=10240
    table="user"
    database="assistant"
    base-url="jdbc:mysql://192.168.103.202:9030"
    password="XXX"
    username="root"
    nodeUrls=[
    "192.168.103.202:8040"
    ]
    }
    }
  • mongodb-cdc 实时同步es

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    env {
    parallelism = 1
    job.mode = "STREAMING"
    checkpoint.interval = 5000
    }

    source {
    MongoDB-CDC {
    hosts = "192.168.103.113:27011"
    database = ["pms"]
    collection = ["pms.demand_item_row"]
    username = root
    password = "XXX"
    schema = {
    fields {
    "_id" : string,
    "ras" : string,
    "planSn" : string,
    "isPlaned" : boolean
    }
    }
    }
    }

    transform {
    FieldMapper {
    field_mapper = {
    _id = _id
    ras = ras
    planSn = planSn
    isPlaned = isPlaned
    }
    }
    }

    sink {
    Elasticsearch {
    hosts = ["192.168.103.113:9200"]
    index = "row"
    username = "elastic"
    password = "XXX"
    tls_verify_certificate = false
    primary_keys = ["_id"]
    }
    }

sqlserver cdc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
-- 启用CDC功能
EXEC sys.sp_cdc_enable_db;

-- 确认SQL Server Agent已开启
EXEC master.dbo.xp_servicecontrol N'QUERYSTATE',N'SQLSERVERAGENT'

-- 判断当前数据库是否启用了CDC(如果返回1,表示已启用)
SELECT name,is_cdc_enabled FROM sys.databases WHERE is_cdc_enabled = 1;

-- 查询schema_id
SELECT * FROM sys.schemas

-- 判断当前数据表是否启用了CDC(如果返回1,表示已启用)
SELECT name, is_tracked_by_cdc, schema_id FROM sys.tables WHERE is_tracked_by_cdc = 1;

-- source_schema 是表所属的架构(schema)的名称。
-- source_name 是要启用 CDC 跟踪的表的名称。
-- cdc_role 是 CDC 使用的角色的名称。如果没有指定角色名称,系统将创建一个默认角色。
EXEC sys.sp_cdc_enable_table
@source_schema = 'wmwhse1',
@source_name = 'PICKDETAIL',
@role_name = 'cdc_role';

-- 一次核验是否开启CDC
USE SCPRD;
GO
EXEC sys.sp_cdc_help_change_data_capture
GO

启动

1
2
3
4
开始
./bin/seatunnel.sh --config ./example/mongo2sr3 -e local
// 恢复
./bin/seatunnel.sh --config ./example/mongo2sr3 -r {jobId} -e local

其他

SQL Server CDC功能配置总结

背景

内网k8s集群需求:StarRocks的9030端口或mysql的3306端口需要暴露出去,而他们TCP协议,是L4层服务,而ingress是http协议,是L7层服务,不能使用ingress暴露出去

  • k8s-Starrocks情况
    • services: starrocks/starrockscluster-fe-service
      eaVxzN

相关配置

  • deployment: ingress-nginx-controller配置

    • 增加 hostNetwork: true,pod中运行的应用程序可以直接看到宿主主机的网络接口,宿主机所在的局域网上所有网络接口都可以访问到该应用程序及端口
    • 增加 - '--tcp-services-configmap=$(POD_NAMESPACE)/tcp-services'
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      spec:
      hostNetwork: true // 增加
      containers:
      - name: controller
      image: dyrnq/ingress-nginx-controller:v1.6.4
      args:
      - /nginx-ingress-controller
      - '--election-id=ingress-nginx-leader'
      - '--controller-class=k8s.io/ingress-nginx'
      - '--ingress-class=nginx'
      - '--configmap=$(POD_NAMESPACE)/ingress-nginx-controller'
      - '--validating-webhook=:8443'
      - '--validating-webhook-certificate=/usr/local/certificates/cert'
      - '--validating-webhook-key=/usr/local/certificates/key'
      - '--tcp-services-configmap=$(POD_NAMESPACE)/tcp-services' // 增加
      - '--udp-services-configmap=$(POD_NAMESPACE)/udp-services'
  • 编写TCP/UDP端口转发规则实现L4层服务暴露
    kubectl create -f tcp-services-configmap.yaml -n ingress-nginx

    1
    2
    3
    4
    5
    6
    7
    8
    9
    kind: ConfigMap
    apiVersion: v1
    metadata:
    name: tcp-services
    namespace: ingress-nginx
    data:
    '8030': starrocks/starrockscluster-fe-service:8030
    '8040': starrocks/starrockscluster-cn-service:8040
    '9030': starrocks/starrockscluster-fe-service:9030
  • 验证TCP 端口的L4服务暴露,查看pod nginx-ingress-controller的ip

    1
    2
    3
    4
    5
    > kubectl get pod -n ingress-nginx -owide
    NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES
    ingress-nginx-admission-create-cpjcl 0/1 Completed 0 70d 10.244.3.20 k8s-node3 <none> <none>
    ingress-nginx-admission-patch-r6ql7 0/1 Completed 0 70d 10.244.2.12 k8s-node1 <none> <none>
    ingress-nginx-controller-58bcff6c76-xdmzq 1/1 Running 0 14m 192.168.103.202 k8s-master1 <none> <none>
  • navicat连接
    I7GLoM

相关连接

Nginx+Ingress-controller解决服务暴露和负载均衡

背景

为了让mongodb能使用标准化的sql语句查询,我们使用官方的mongo-bi做一层转换,这一步是统一数仓的关键。

目前该方案不具可行性,原因:

  • bi-connect连接mongodb数据源必须开启ssl,这就导致外部客户端连接bi-connect必须useSSL=true,而StarRocks的catalogs未支持SSL
  • 不稳定,从BI的外连接看,经常报错

下载安装

bi-connector支持不同平台安装部署,这里针对Linux环境安装部署配置进行记录。

通过官网下载:https://www.mongodb.com/try/download/bi-connector

我这里下载的文件版本为mongodb-bi-linux-x86_64-rhel70-v2.14.12.tgz
下载后解压到/opt/mongodb-bi目录

创建证书

当MongoDB启用认证时,bi-connector必须要配置使用证书,才能通过bi-connector连接mongodb

这里先创建证书

1
2
3
4
5
#执行创建 SSL 证书
mkdir -p /opt/mongodb-bi/certs
cd /opt/mongodb-bi/certs
openssl req -nodes -newkey rsa:2048 -keyout dakeweBI.key -out dakeweBI.crt -x509 -days 365 -subj "/C=US/ST=dakeweBI/L=dakeweBI/O=dakeweBI Security/OU=IT Department/CN=kayakwise.com"
cat dakeweBI.crt dakeweBI.key > dakeweBI.pem

安装 MongoDB BI Connector

1
sudo install -m755 bin/mongo* /usr/bin/

配置 MongoDB BI 配置文件

1
2
3
mkdir -p /opt/mongodb-bi/conf/
mkdir -p /opt/mongodb-bi/logs/
mkdir -p /opt/mongodb-bi/schemas
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
net:
bindIp: "0.0.0.0"
port: 3307
ssl:
mode: allowSSL
PEMKeyFile: '/opt/mongodb-bi/certs/dakeweBI.pem'
allowInvalidCertificates: true
minimumTLSVersion: TLS1_0
mongodb:
net:
uri: "mongodb://192.168.103.113:27011"
ssl:
enabled: false
auth:
username: root
password: 'XXX'
source: admin
mechanism: SCRAM-SHA-1
security:
enabled: true
defaultMechanism: "SCRAM-SHA-1"
defaultSource: "admin"
systemLog:
path: /opt/mongodb-bi/logs/mongosqld.log
verbosity: 2
logAppend: true
schema:
path: /opt/mongodb-bi/schemas
maxVarcharLength: 65535
processManagement:
service:
name: "mongosql"
displayName: "MongoSQL Service"
description: "MongoSQL accesses MongoDB data with SQL"

安装 MongoDB BI Connector 服务

1
2
3
4
5
mongosqld install --config /opt/mongodb-bi/conf/mongosqld-config.yml
#重新加载
systemctl daemon-reload
#设置开机自启
systemctl enable mongosql.service

启动

1
2
3
4
5
6
#执行生成 schema 
mongodrdl --host 192.168.103.113:27011 --username root --password XXX --db assistant --authenticationDatabase admin --authenticationMechanism SCRAM-SHA-1 --out /opt/mongodb-bi/schemas/schemas.drdl
# 临时启动
mongosqld --config=/opt/mongodb-bi/conf/mongosqld-config.yml
# 常驻启动
systemctl start mongosql.service

连接查询

  • mysql cil,注意用户名密码是mongodb的用户名密码

    1
    mysql --enable-cleartext-plugin --user='root?source=admin&mechanism=SCRAM-SHA-1' --host=192.168.103.153 --protocol=tcp --port=3307 -p
  • navicat,需要勾选使用ssl
    4qsgkY

  • jdbc连接需要添加额外的JDBC连接字符串
    characterEncoding=UTF-8&connectTimeout=5000&useSSL=true&allowPublicKeyRetrieval=true&verifyServerCertificate=false

其他相关命令

1
2
3
4
5
6
7
8
9
10
11
systemctl start mongosql.service
systemctl status mongosql.service
systemctl stop mongosql.service
systemctl disable mongosql.service
journalctl -u mongosql.service
rm /etc/systemd/system/mongosql.service
rm /etc/systemd/system/mongosql.service
rm /usr/lib/systemd/system/mongosql.service
rm /usr/lib/systemd/system/mongosql.service
systemctl daemon-reload
systemctl reset-failed

lSj3FY

k8s集群背景

k8s-1.25.4部署笔记(containerd)
k8s使用nfs作为动态storageClass存储

部署 StarRocks Operator

  • 添加定制资源 StarRocksCluster
    1
    kubectl apply -f https://raw.githubusercontent.com/StarRocks/starrocks-kubernetes-operator/main/deploy/starrocks.com_starrocksclusters.yaml
  • 部署 StarRocks Operator
    1
    kubectl apply -f https://raw.githubusercontent.com/StarRocks/starrocks-kubernetes-operator/main/deploy/operator.yaml

部署 StarRocks 集群

配置文件范例

我这里配置一个使用腾讯云COS做存算分离+storageClass做FE元数据持久化

shared_data_mode.yaml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
apiVersion: starrocks.com/v1
kind: StarRocksCluster
metadata:
name: starrockscluster
namespace: starrocks
spec:
starRocksFeSpec:
image: starrocks/fe-ubuntu:3.2.2
replicas: 1
limits:
cpu: 2
memory: 4Gi
requests:
cpu: 2
memory: 4Gi
# service:
# type: NodePort # export fe service
# ports:
# - name: query # fill the name from the fe service ports
# nodePort: 32755
# port: 9030
# containerPort: 9030
storageVolumes:
- name: fe-storage-meta
storageClassName: "nfs-client" # you can remove this line if you want to use the default storage class
storageSize: 10Gi # the size of storage volume for metadata
mountPath: /opt/starrocks/fe/meta # the path of metadata
- name: fe-storage-log
storageClassName: "nfs-client" # you can remove this line if you want to use the default storage class
storageSize: 1Gi # the size of storage volume for log
mountPath: /opt/starrocks/fe/log # the path of log
configMapInfo:
configMapName: starrockscluster-fe-cm
resolveKey: fe.conf
starRocksCnSpec:
image: starrocks/cn-ubuntu:3.2.2
replicas: 1
limits:
cpu: 2
memory: 4Gi
requests:
cpu: 2
memory: 4Gi
configMapInfo:
configMapName: starrockscluster-cn-cm
resolveKey: cn.conf
storageVolumes:
- name: cn-storage-data
storageClassName: "nfs-client" # you can remove this line if you want to use the default storage class
storageSize: 10Gi # the size of storage volume for data
mountPath: /opt/starrocks/cn/storage # the path of data
- name: cn-storage-log
storageClassName: "nfs-client" # you can remove this line if you want to use the default storage class
storageSize: 1Gi # the size of storage volume for log
mountPath: /opt/starrocks/cn/log # the path of log
starRocksFeProxySpec:
replicas: 1
limits:
cpu: 1
memory: 2Gi
requests:
cpu: 1
memory: 2Gi
service:
type: NodePort # export fe proxy service
ports:
- name: http-port # fill the name from the fe proxy service ports
containerPort: 8080
nodePort: 30180 # The range of valid ports is 30000-32767
port: 8080
resolver: "kube-dns.kube-system.svc.cluster.local" # this is the default dns server.

---

# fe config
apiVersion: v1
kind: ConfigMap
metadata:
name: starrockscluster-fe-cm
namespace: starrocks
labels:
cluster: starrockscluster
data:
fe.conf: |
LOG_DIR = ${STARROCKS_HOME}/log
DATE = "$(date +%Y%m%d-%H%M%S)"
JAVA_OPTS="-Dlog4j2.formatMsgNoLookups=true -Xmx8192m -XX:+UseMembar -XX:SurvivorRatio=8 -XX:MaxTenuringThreshold=7 -XX:+PrintGCDateStamps -XX:+PrintGCDetails -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+CMSClassUnloadingEnabled -XX:-CMSParallelRemarkEnabled -XX:CMSInitiatingOccupancyFraction=80 -XX:SoftRefLRUPolicyMSPerMB=0 -Xloggc:${LOG_DIR}/fe.gc.log.$DATE"
JAVA_OPTS_FOR_JDK_9="-Dlog4j2.formatMsgNoLookups=true -Xmx8192m -XX:SurvivorRatio=8 -XX:MaxTenuringThreshold=7 -XX:+CMSClassUnloadingEnabled -XX:-CMSParallelRemarkEnabled -XX:CMSInitiatingOccupancyFraction=80 -XX:SoftRefLRUPolicyMSPerMB=0 -Xlog:gc*:${LOG_DIR}/fe.gc.log.$DATE:time"
JAVA_OPTS_FOR_JDK_11="-Dlog4j2.formatMsgNoLookups=true -Xmx8192m -XX:+UseG1GC -Xlog:gc*:${LOG_DIR}/fe.gc.log.$DATE:time"
http_port = 8030
rpc_port = 9020
query_port = 9030
edit_log_port = 9010
mysql_service_nio_enabled = true
sys_log_level = INFO
run_mode = shared_data
cloud_native_meta_port = 6090
# 是否允许 StarRocks 使用 FE 配置文件中指定的存储相关属性创建默认存储卷
enable_load_volume_from_conf = true
aws_s3_path = files-prod-1253767413/starrocks
aws_s3_region = ap-guangzhou
aws_s3_endpoint = https://cos.ap-guangzhou.myqcloud.com
aws_s3_access_key = XXX
aws_s3_secret_key = XXX

---

# cn config
apiVersion: v1
kind: ConfigMap
metadata:
name: starrockscluster-cn-cm
namespace: starrocks
labels:
cluster: starrockscluster
data:
cn.conf: |
sys_log_level = INFO
# ports for admin, web, heartbeat service
thrift_port = 9060
webserver_port = 8040
heartbeat_service_port = 9050
brpc_port = 8060
1
kubectl apply -f shared_data_mode.yaml

相关链接

使用 Operator 部署 StarRocks 集群

背景

验证一轮查询外部数据湖

Catalog

  • JDBC ✅已验证

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    CREATE EXTERNAL CATALOG jdbc1
    PROPERTIES
    (
    "type"="jdbc",
    "user"="root",
    "password"="XXX",
    "jdbc_uri"="jdbc:mysql://172.16.0.9:3306",
    "driver_url"="https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar",
    "driver_class"="com.mysql.cj.jdbc.Driver"
    );
  • es

    • 存算一体+自建es ✅已验证
    • 基于腾讯云cos存算分离 + 腾讯云es产品 ✅已验证

      存在问题,当使用腾讯云默认索引开了动态模板,starrocks解析mapping有bug,会导致show tables出不来

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      CREATE EXTERNAL CATALOG es_test
      COMMENT 'es'
      PROPERTIES
      (
      "type" = "es",
      "es.type" = "_doc",
      "hosts" = "http://172.16.0.8:9200",
      "es.net.ssl" = "false",
      "user" = "elastic",
      "password" = "xxx",
      "es.nodes.wan.only" = "false"
      );

EXTERNAL TABLE

  • COS文件外部表 ❌验证失败
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    CREATE EXTERNAL TABLE user_behavior_fet
    (
    UserID int,
    ItemID int,
    CategoryID int,
    BehaviorType string
    )
    ENGINE=file
    PROPERTIES
    (
    "path" = "s3://race-1301288126/input/user_behavior_ten_million_rows.parquet",
    "format" = "parquet",
    "aws.s3.enable_ssl" = "false",
    "aws.s3.enable_path_style_access" = "true",
    "aws.s3.endpoint" = "cos.ap-guangzhou.myqcloud.com",
    "aws.s3.access_key" = "AKIDAFICxeJwnror5OarCRYXQMF1f5X7lJtO",
    "aws.s3.secret_key" = "XXX"
    );

在现代数据库中,很多数据库都支持分区(Partition)或分桶(Tablet,分桶有时候又叫分片),它的主要目的是提高查询性能。
StarRocks 使用先分区后分桶的方式,可灵活地支持两种分布方式:

  • Hash分布: 不采用分区方式,整个table作为一个分区,只需指定分桶的数量。
  • Range-Hash组合数据分布: 设置分区,指定每个分区的分桶数量。
    StarRocks 同时支持分区和分桶,若干个Tablet组成一个Partition 。

分区

分区的主要作⽤是将⼀张表按照分区键拆分成不同的管理单元,针对每⼀个管理单元选择相应的存储策略,⽐如副本数、冷热策略和存储介质等等。对于访问频率高的分区,可以使用SSD存储;对于访问频率低的分区,可以使用STAT存储。选择合理的分区键可以有效的裁剪扫描的数据量,一般选择日期或者区域作为分区键

  • PARTITION BY RANGE()
  • 在实际应用中,用户一般选取时间列作为分区键,具体划分的粒度视数据量而定,单个分区原始数据量建议维持在100GB以内。

分桶

分桶的目的就是将数据打散为一个个逻辑分片(Tablet),以Tablet作为数据均衡的最小单位,使数据尽量均匀的分布在集群的各个BE节点上,以便在查询时充分发挥集群多机多核的优势。

  • DISTRIBUTED BY HASH()
  • 对每个分区的数据,StarRocks还会再进行Hash分桶。我们在建表时通过DISTRIBUTED BY HASH()语句来设置分桶
  • 在StarRocks的存储引擎中,用户数据被水平划分为若干个数据分片Tablet,也称作数据分桶)。每个 Tablet包含若干数据行,各个Tablet之间的数据没有交集,并且在物理上是独立存储的。
  • 多个Tablet在逻辑上归属于不同的分区(Partition)。一个Tablet只属于一个Partition,而一个Partition包含若干个 Tablet。因为Tablet在物理上是独立存储的,所以可以视为Partition在物理上也是独立。Tablet是数据移动、复制等操作的最小物理存储单元。
  • 若干个Partition组成一个Table。Partition可以视为是逻辑上最小的管理单元,数据的导入与删除,都可以或仅能针对一个Partition进行。

副本数

StarRocks中的副本数就是同一个Tablet保存的份数,在建表时通过replication_num参数指定,也可以后面修改。默认不指定时,StarRocks使用三副本建表,也即每个Tablet会在不同节点存储三份(StarRocks的副本策略会将某个tablet的副本存储在与其不同IP的节点)。

为方便理解,我们假设当前有一个3BE节点的集群,有表Table A和Table B,表A和表B建表时都未设置分区(视为一个大分区),分桶数为3,副本数replication_num为2,则表A和表B在集群中数据分布的一种可能如下图:
yy05cc

案例讲解

如下:StarRocks 的数据划分以及 Tablet 多副本机制

dSQ1Ju

  • 表按照日期划分为4个分区,第一个分区切分成4个Tablet。每个Tablet使用3副本进行备份,分布在3个不同的BE节点上。

  • 由于一张表被切分成了多个Tablet,StarRocks在执行SQL语句时,可以对所有Tablet实现并发处理,从而充分的利用多机、多核提供的计算能力。

  • 用户也可以利用StarRocks数据的切分方式,将高并发请求压力分摊到多个物理节点,从而可以通过增加物理节点的方式来扩展系统支持高并发的能力。

总结一下

在StarRocks中,Partition是数据导入和备份恢复的最小逻辑单位,Tablet是数据复制和均衡的最小物理单位。表(Table)、分区(Partition)、逻辑分片(Tablet)的关系如下图:
bPDqhg

分区是针对表的,是对表的数据取段。分桶是针对每个分区的,会将分区后的每段数据打散为逻辑分片Tablet。副本数是针对Tablet的,是指Tablet保存的份数。那么我们不难发现,对某一个数据表,若每个分区的分桶数一致,其总Tablet数:总Tablet数=分区数分桶数副本数

以table01为例,我们为其设置了3个分区,为每个分区设置了20个分桶,又对分桶后的tablet设置了1副本,则table01的总tablet数= 3 * 20 * 1 = 60 个。查看table01的tablet信息,发现确实共有60个tablet:

1
2
show tablet from table01;
60 rows in set (0.01 sec)

背景及数据准备

本次仅记录一些常见的导入

  • 纽约市交通事故数据
    1
    curl -O https://raw.githubusercontent.com/StarRocks/demo/master/documentation-samples/quickstart/datasets/NYPD_Crash_Data.csv
  • 天气数据
    1
    curl -O https://raw.githubusercontent.com/StarRocks/demo/master/documentation-samples/quickstart/datasets/72505394728.csv
  • 用户行为数据集
    1
    curl -O https://starrocks-datasets.s3.amazonaws.com/user_behavior_ten_million_rows.parquet
  • file1.csv
    1
    2
    3
    4
    1,Lily,21
    2,Rose,22
    3,Alice,23
    4,Julia,24
  • file2.csv
    1
    2
    3
    4
    5,Tony,25
    6,Adam,26
    7,Allen,27
    8,Jacky,28
  • file3.json
    1
    {"name": "北京", "code": 2}

从本地文件系统导入(Stream Load)

TIP:http://<fe_host>:<fe_http_port>默认端口号为8030; 也可以是http://<be_host>:<be_http_port>默认端口号为 8040。

  • table1表或table2表

    • 创建table1表
      1
      2
      3
      4
      5
      6
      7
      8
      9
      CREATE TABLE `table1`
      (
      `id` int(11) NOT NULL COMMENT "用户 ID",
      `name` varchar(65533) NULL DEFAULT "" COMMENT "用户姓名",
      `score` int(11) NOT NULL DEFAULT "0" COMMENT "用户得分"
      )
      ENGINE=OLAP
      PRIMARY KEY(`id`)
      DISTRIBUTED BY HASH(`id`);
    • 导入file1.json
      1
      2
      3
      4
      5
      6
      curl --location-trusted -u <username>:<password> -H "label:123" \
      -H "Expect:100-continue" \
      -H "column_separator:," \
      -H "columns: id, name, score" \
      -T file1.csv -XPUT \
      http://<fe_host>:<fe_http_port>/api/import_example/table1/_stream_load
  • table_json表

    • 创建 table_json表
      1
      2
      3
      4
      5
      6
      7
      8
      CREATE TABLE `table_json`
      (
      `id` int(11) NOT NULL COMMENT "城市 ID",
      `city` varchar(65533) NULL COMMENT "城市名称"
      )
      ENGINE=OLAP
      PRIMARY KEY(`id`)
      DISTRIBUTED BY HASH(`id`);
    • 导入file3.json
      1
      2
      3
      4
      5
      6
      curl -v --location-trusted -u <username>:<password> -H "strict_mode: true" \
      -H "Expect:100-continue" \
      -H "format: json" -H "jsonpaths: [\"$.name\", \"$.code\"]" \
      -H "columns: city,tmp_id, id = tmp_id * 100" \
      -T file3.json -XPUT \
      http://<fe_host>:<fe_http_port>/api/import_example/table_json/_stream_load
  • crashdata 表

    • 创建 crashdata 表,用于存储交通事故数据集中的数据。该表的字段经过裁剪,仅包含与该教程相关字段。
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      CREATE DATABASE IF NOT EXISTS import_example;
      USE import_example;

      CREATE TABLE IF NOT EXISTS crashdata (
      CRASH_DATE DATETIME,
      BOROUGH STRING,
      ZIP_CODE STRING,
      LATITUDE INT,
      LONGITUDE INT,
      LOCATION STRING,
      ON_STREET_NAME STRING,
      CROSS_STREET_NAME STRING,
      OFF_STREET_NAME STRING,
      CONTRIBUTING_FACTOR_VEHICLE_1 STRING,
      CONTRIBUTING_FACTOR_VEHICLE_2 STRING,
      COLLISION_ID INT,
      VEHICLE_TYPE_CODE_1 STRING,
      VEHICLE_TYPE_CODE_2 STRING
      );
    • 导入纽约市交通事故数据
      1
      2
      3
      4
      5
      6
      7
      8
      9
      curl --location-trusted -u root             \
      -T ./NYPD_Crash_Data.csv \
      -H "label:crashdata-0" \
      -H "column_separator:," \
      -H "skip_header:1" \
      -H "enclose:\"" \
      -H "max_filter_ratio:1" \
      -H "columns:tmp_CRASH_DATE, tmp_CRASH_TIME, CRASH_DATE=str_to_date(concat_ws(' ', tmp_CRASH_DATE, tmp_CRASH_TIME), '%m/%d/%Y %H:%i'),BOROUGH,ZIP_CODE,LATITUDE,LONGITUDE,LOCATION,ON_STREET_NAME,CROSS_STREET_NAME,OFF_STREET_NAME,NUMBER_OF_PERSONS_INJURED,NUMBER_OF_PERSONS_KILLED,NUMBER_OF_PEDESTRIANS_INJURED,NUMBER_OF_PEDESTRIANS_KILLED,NUMBER_OF_CYCLIST_INJURED,NUMBER_OF_CYCLIST_KILLED,NUMBER_OF_MOTORIST_INJURED,NUMBER_OF_MOTORIST_KILLED,CONTRIBUTING_FACTOR_VEHICLE_1,CONTRIBUTING_FACTOR_VEHICLE_2,CONTRIBUTING_FACTOR_VEHICLE_3,CONTRIBUTING_FACTOR_VEHICLE_4,CONTRIBUTING_FACTOR_VEHICLE_5,COLLISION_ID,VEHICLE_TYPE_CODE_1,VEHICLE_TYPE_CODE_2,VEHICLE_TYPE_CODE_3,VEHICLE_TYPE_CODE_4,VEHICLE_TYPE_CODE_5" \
      -XPUT http://<fe_host>:<fe_http_port>/api/import_example/crashdata/_stream_load
  • weatherdata 表

    • 创建 weatherdata 表,用于存储天气数据集中的数据。该表的字段同样经过裁剪,仅包含与该教程相关字段。
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      CREATE DATABASE IF NOT EXISTS import_example;
      USE import_example;
      CREATE TABLE IF NOT EXISTS weatherdata (
      DATE DATETIME,
      NAME STRING,
      HourlyDewPointTemperature STRING,
      HourlyDryBulbTemperature STRING,
      HourlyPrecipitation STRING,
      HourlyPresentWeatherType STRING,
      HourlyPressureChange STRING,
      HourlyPressureTendency STRING,
      HourlyRelativeHumidity STRING,
      HourlySkyConditions STRING,
      HourlyVisibility STRING,
      HourlyWetBulbTemperature STRING,
      HourlyWindDirection STRING,
      HourlyWindGustSpeed STRING,
      HourlyWindSpeed STRING
      );
    • 导入天气数据
      1
      2
      3
      4
      5
      6
      7
      8
      9
      curl --location-trusted -u root             \
      -T ./72505394728.csv \
      -H "label:weather-0" \
      -H "column_separator:," \
      -H "skip_header:1" \
      -H "enclose:\"" \
      -H "max_filter_ratio:1" \
      -H "columns: STATION, DATE, LATITUDE, LONGITUDE, ELEVATION, NAME, REPORT_TYPE, SOURCE, HourlyAltimeterSetting, HourlyDewPointTemperature, HourlyDryBulbTemperature, HourlyPrecipitation, HourlyPresentWeatherType, HourlyPressureChange, HourlyPressureTendency, HourlyRelativeHumidity, HourlySkyConditions, HourlySeaLevelPressure, HourlyStationPressure, HourlyVisibility, HourlyWetBulbTemperature, HourlyWindDirection, HourlyWindGustSpeed, HourlyWindSpeed, Sunrise, Sunset, DailyAverageDewPointTemperature, DailyAverageDryBulbTemperature, DailyAverageRelativeHumidity, DailyAverageSeaLevelPressure, DailyAverageStationPressure, DailyAverageWetBulbTemperature, DailyAverageWindSpeed, DailyCoolingDegreeDays, DailyDepartureFromNormalAverageTemperature, DailyHeatingDegreeDays, DailyMaximumDryBulbTemperature, DailyMinimumDryBulbTemperature, DailyPeakWindDirection, DailyPeakWindSpeed, DailyPrecipitation, DailySnowDepth, DailySnowfall, DailySustainedWindDirection, DailySustainedWindSpeed, DailyWeather, MonthlyAverageRH, MonthlyDaysWithGT001Precip, MonthlyDaysWithGT010Precip, MonthlyDaysWithGT32Temp, MonthlyDaysWithGT90Temp, MonthlyDaysWithLT0Temp, MonthlyDaysWithLT32Temp, MonthlyDepartureFromNormalAverageTemperature, MonthlyDepartureFromNormalCoolingDegreeDays, MonthlyDepartureFromNormalHeatingDegreeDays, MonthlyDepartureFromNormalMaximumTemperature, MonthlyDepartureFromNormalMinimumTemperature, MonthlyDepartureFromNormalPrecipitation, MonthlyDewpointTemperature, MonthlyGreatestPrecip, MonthlyGreatestPrecipDate, MonthlyGreatestSnowDepth, MonthlyGreatestSnowDepthDate, MonthlyGreatestSnowfall, MonthlyGreatestSnowfallDate, MonthlyMaxSeaLevelPressureValue, MonthlyMaxSeaLevelPressureValueDate, MonthlyMaxSeaLevelPressureValueTime, MonthlyMaximumTemperature, MonthlyMeanTemperature, MonthlyMinSeaLevelPressureValue, MonthlyMinSeaLevelPressureValueDate, MonthlyMinSeaLevelPressureValueTime, MonthlyMinimumTemperature, MonthlySeaLevelPressure, MonthlyStationPressure, MonthlyTotalLiquidPrecipitation, MonthlyTotalSnowfall, MonthlyWetBulb, AWND, CDSD, CLDD, DSNW, HDSD, HTDD, NormalsCoolingDegreeDay, NormalsHeatingDegreeDay, ShortDurationEndDate005, ShortDurationEndDate010, ShortDurationEndDate015, ShortDurationEndDate020, ShortDurationEndDate030, ShortDurationEndDate045, ShortDurationEndDate060, ShortDurationEndDate080, ShortDurationEndDate100, ShortDurationEndDate120, ShortDurationEndDate150, ShortDurationEndDate180, ShortDurationPrecipitationValue005, ShortDurationPrecipitationValue010, ShortDurationPrecipitationValue015, ShortDurationPrecipitationValue020, ShortDurationPrecipitationValue030, ShortDurationPrecipitationValue045, ShortDurationPrecipitationValue060, ShortDurationPrecipitationValue080, ShortDurationPrecipitationValue100, ShortDurationPrecipitationValue120, ShortDurationPrecipitationValue150, ShortDurationPrecipitationValue180, REM, BackupDirection, BackupDistance, BackupDistanceUnit, BackupElements, BackupElevation, BackupEquipment, BackupLatitude, BackupLongitude, BackupName, WindEquipmentChangeDate" \
      -XPUT http://<fe_host>:<fe_http_port>/api/import_example/weatherdata/_stream_load

从云储存导入 ✅

将文件放到cos的云储存,导入到StarRocks内
mdDEHG

  • 通过如下语句,把COS存储空间race-1301288126input文件夹内数据文件user_behavior_ten_million_rows.parquet的数据导入到目标表 user_behavior_replica
    • 建表
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      CREATE DATABASE IF NOT EXISTS mydatabase;
      USE mydatabase;

      CREATE TABLE user_behavior_replica
      (
      UserID int(11),
      ItemID int(11),
      CategoryID int(11),
      BehaviorType varchar(65533),
      Timestamp varbinary
      )
      ENGINE = OLAP
      DUPLICATE KEY(UserID)
      DISTRIBUTED BY HASH(UserID);
    • 导入任务
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      LOAD LABEL mydatabase.label_brokerloadtest_502
      (
      DATA INFILE("cosn://race-1301288126/input/user_behavior_ten_million_rows.parquet")
      INTO TABLE user_behavior_replica
      FORMAT AS "parquet"
      )
      WITH BROKER
      (
      "fs.cosn.userinfo.secretId" = "xxx",
      "fs.cosn.userinfo.secretKey" = "xxx",
      "fs.cosn.bucket.endpoint_suffix" = "cos.ap-guangzhou.myqcloud.com"
      )
      PROPERTIES
      (
      "timeout" = "3600"
      );
  • 通过如下语句,把COS存储空间race-1301288126input文件夹内数据文件file1.csvfile2.csv的数据导入到目标表table1table2
    • 建表
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      CREATE TABLE `table1`
      (
      `id` int(11) NOT NULL COMMENT "用户 ID",
      `name` varchar(65533) NULL DEFAULT "" COMMENT "用户姓名",
      `score` int(11) NOT NULL DEFAULT "0" COMMENT "用户得分"
      )
      ENGINE=OLAP
      PRIMARY KEY(`id`)
      DISTRIBUTED BY HASH(`id`);

      CREATE TABLE `table2`
      (
      `id` int(11) NOT NULL COMMENT "用户 ID",
      `name` varchar(65533) NULL DEFAULT "" COMMENT "用户姓名",
      `score` int(11) NOT NULL DEFAULT "0" COMMENT "用户得分"
      )
      ENGINE=OLAP
      PRIMARY KEY(`id`)
      DISTRIBUTED BY HASH(`id`);
    • 导入单个文件到单表
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      LOAD LABEL mydatabase.label_brokerloadtest_501
      (
      DATA INFILE("cosn://race-1301288126/input/file1.csv")
      INTO TABLE table1
      COLUMNS TERMINATED BY ","
      (id, name, score)
      )
      WITH BROKER
      (
      "fs.cosn.userinfo.secretId" = "xxx",
      "fs.cosn.userinfo.secretKey" = "xxx",
      "fs.cosn.bucket.endpoint_suffix" = "cos.ap-guangzhou.myqcloud.com"
      )
      PROPERTIES
      (
      "timeout" = "3600"
      );
    • 导入多个文件到单表
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      LOAD LABEL mydatabase.label_brokerloadtest_402
      (
      DATA INFILE("cosn://race-1301288126/input/*")
      INTO TABLE table1
      COLUMNS TERMINATED BY ","
      (id, name, score)
      )
      WITH BROKER
      (
      "fs.cosn.userinfo.secretId" = "xxx",
      "fs.cosn.userinfo.secretKey" = "xxx",
      "fs.cosn.bucket.endpoint_suffix" = "cos.ap-guangzhou.myqcloud.com"
      )
      PROPERTIES
      (
      "timeout" = "3600"
      );
    • 导入多个数据文件到多表
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      LOAD LABEL mydatabase.label_brokerloadtest_403
      (
      DATA INFILE("cosn://race-1301288126/input/file1.csv")
      INTO TABLE table1
      COLUMNS TERMINATED BY ","
      (id, name, score)
      ,
      DATA INFILE("cosn://race-1301288126/input/file2.csv")
      INTO TABLE table2
      COLUMNS TERMINATED BY ","
      (id, name, score)
      )
      WITH BROKER
      (
      "fs.cosn.userinfo.secretId" = "xxx",
      "fs.cosn.userinfo.secretKey" = "xxx",
      "fs.cosn.bucket.endpoint_suffix" = "cos.ap-guangzhou.myqcloud.com"
      );
      PROPERTIES
      (
      "timeout" = "3600"
      );

其他数据集

  • 1.3 亿条亚马逊产品的用户评论信息,总大小约为 37GB

    每行包含用户 ID(customer_id)、评论 ID(review_id)、已购买产品 ID(product_id)、产品分类(product_category)、评分(star_rating)、评论标题(review_headline)、评论内容(review_body)等 15 列信息。
    amazon_reviews/amazon_reviews_2010
    amazon_reviews/amazon_reviews_2011
    amazon_reviews/amazon_reviews_2012
    amazon_reviews/amazon_reviews_2013
    amazon_reviews/amazon_reviews_2014
    amazon_reviews/amazon_reviews_2015

  • 建表
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    CREATE TABLE `amazon_reviews` (  
    `review_date` int(11) NULL,
    `marketplace` varchar(20) NULL,
    `customer_id` bigint(20) NULL,
    `review_id` varchar(40) NULL,
    `product_id` varchar(10) NULL,
    `product_parent` bigint(20) NULL,
    `product_title` varchar(500) NULL,
    `product_category` varchar(50) NULL,
    `star_rating` smallint(6) NULL,
    `helpful_votes` int(11) NULL,
    `total_votes` int(11) NULL,
    `vine` boolean NULL,
    `verified_purchase` boolean NULL,
    `review_headline` varchar(500) NULL,
    `review_body` string NULL
    ) ENGINE=OLAP
    DUPLICATE KEY(`review_date`)
    COMMENT 'OLAP'
    DISTRIBUTED BY HASH(`review_date`);

同步物化视图

  • 基础数据准备
    以下示例基于表 sales_records,其中包含每笔交易的交易 ID record_id、销售员 seller_id、售卖门店 store_id、销售时间 sale_date 以及销售额 sale_amt。
  • 建表
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    CREATE TABLE sales_records(
    record_id INT,
    seller_id INT,
    store_id INT,
    sale_date DATE,
    sale_amt BIGINT
    ) DISTRIBUTED BY HASH(record_id);

    INSERT INTO sales_records
    VALUES
    (001,01,1,"2022-03-13",8573),
    (002,02,2,"2022-03-14",6948),
    (003,01,1,"2022-03-14",4319),
    (004,03,3,"2022-03-15",8734),
    (005,03,3,"2022-03-16",4212),
    (006,02,2,"2022-03-17",9515);
  • 业务需求
    该示例业务场景需要频繁分析不同门店的销售额,则查询需要大量调用 sum() 函数,耗费大量系统资源。可使用 EXPLAIN 命令查看此查询的 Query Profile。
    1
    2
    3
    SELECT store_id, SUM(sale_amt)
    FROM sales_records
    GROUP BY store_id;
    其 Query Profile 中的 rollup 项显示为 sales_records(即基表),说明该查询未使用物化视图加速。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    MySQL > EXPLAIN SELECT store_id, SUM(sale_amt)
    FROM sales_records
    GROUP BY store_id;
    +-----------------------------------------------------------------------------+
    | Explain String |
    +-----------------------------------------------------------------------------+
    | PLAN FRAGMENT 0 |
    | OUTPUT EXPRS:3: store_id | 6: sum |
    | PARTITION: UNPARTITIONED |
    | |
    | RESULT SINK |
    | |
    | 4:EXCHANGE |
    | |
    | PLAN FRAGMENT 1 |
    | OUTPUT EXPRS: |
    | PARTITION: HASH_PARTITIONED: 3: store_id |
    | |
    | STREAM DATA SINK |
    | EXCHANGE ID: 04 |
    | UNPARTITIONED |
    | |
    | 3:AGGREGATE (merge finalize) |
    | | output: sum(6: sum) |
    | | group by: 3: store_id |
    | | |
    | 2:EXCHANGE |
    | |
    | PLAN FRAGMENT 2 |
    | OUTPUT EXPRS: |
    | PARTITION: RANDOM |
    | |
    | STREAM DATA SINK |
    | EXCHANGE ID: 02 |
    | HASH_PARTITIONED: 3: store_id |
    | |
    | 1:AGGREGATE (update serialize) |
    | | STREAMING |
    | | output: sum(5: sale_amt) |
    | | group by: 3: store_id |
    | | |
    | 0:OlapScanNode |
    | TABLE: sales_records |
    | PREAGGREGATION: ON |
    | partitions=1/1 |
    | rollup: sales_records <------ |
    | tabletRatio=10/10 |
    | tabletList=12049,12053,12057,12061,12065,12069,12073,12077,12081,12085 |
    | cardinality=1 |
    | avgRowSize=2.0 |
    | numNodes=0 |
    +-----------------------------------------------------------------------------+
    45 rows in set (0.00 sec)
  • 创建同步物化视图
    1
    2
    3
    4
    5
    CREATE MATERIALIZED VIEW store_amt 
    AS
    SELECT store_id, SUM(sale_amt)
    FROM sales_records
    GROUP BY store_id;
  • 验证查询是否命中同步物化视图
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    MySQL > EXPLAIN SELECT store_id, SUM(sale_amt) FROM sales_records GROUP BY store_id;
    +-----------------------------------------------------------------------------+
    | Explain String |
    +-----------------------------------------------------------------------------+
    | PLAN FRAGMENT 0 |
    | OUTPUT EXPRS:3: store_id | 6: sum |
    | PARTITION: UNPARTITIONED |
    | |
    | RESULT SINK |
    | |
    | 4:EXCHANGE |
    | |
    | PLAN FRAGMENT 1 |
    | OUTPUT EXPRS: |
    | PARTITION: HASH_PARTITIONED: 3: store_id |
    | |
    | STREAM DATA SINK |
    | EXCHANGE ID: 04 |
    | UNPARTITIONED |
    | |
    | 3:AGGREGATE (merge finalize) |
    | | output: sum(6: sum) |
    | | group by: 3: store_id |
    | | |
    | 2:EXCHANGE |
    | |
    | PLAN FRAGMENT 2 |
    | OUTPUT EXPRS: |
    | PARTITION: RANDOM |
    | |
    | STREAM DATA SINK |
    | EXCHANGE ID: 02 |
    | HASH_PARTITIONED: 3: store_id |
    | |
    | 1:AGGREGATE (update serialize) |
    | | STREAMING |
    | | output: sum(5: sale_amt) |
    | | group by: 3: store_id |
    | | |
    | 0:OlapScanNode |
    | TABLE: sales_records |
    | PREAGGREGATION: ON |
    | partitions=1/1 |
    | rollup: store_amt <------ |
    | tabletRatio=10/10 |
    | tabletList=12092,12096,12100,12104,12108,12112,12116,12120,12124,12128 |
    | cardinality=6 |
    | avgRowSize=2.0 |
    | numNodes=0 |
    +-----------------------------------------------------------------------------+
    45 rows in set (0.00 sec)
    可以看到,此时 Query Profile 中的 rollup 项显示为 store_amt(即同步物化视图),说明该查询已命中同步物化视图。

异步物化视图

  • 基础数据准备
    以下示例基于 Default Catalog 中的两张基表:
    • 表 goods 包含产品 ID item_id1、产品名称 item_name 和产品价格 price。
    • 表 order_list 包含订单 ID order_id、客户 ID client_id 和产品 ID item_id2。
    • 其中 item_id1 与 item_id2 等价。
  • 建表
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    CREATE TABLE goods(
    item_id1 INT,
    item_name STRING,
    price FLOAT
    ) DISTRIBUTED BY HASH(item_id1);

    INSERT INTO goods
    VALUES
    (1001,"apple",6.5),
    (1002,"pear",8.0),
    (1003,"potato",2.2);

    CREATE TABLE order_list(
    order_id INT,
    client_id INT,
    item_id2 INT,
    order_date DATE
    ) DISTRIBUTED BY HASH(order_id);

    INSERT INTO order_list
    VALUES
    (10001,101,1001,"2022-03-13"),
    (10001,101,1002,"2022-03-13"),
    (10002,103,1002,"2022-03-13"),
    (10002,103,1003,"2022-03-14"),
    (10003,102,1003,"2022-03-14"),
    (10003,102,1001,"2022-03-14");
  • 业务需求
    该示例业务场景需要频繁分析订单总额,则查询需要将两张表关联并调用 sum() 函数,根据订单 ID 和总额生成一张新表。除此之外,该业务场景需要每天刷新订单总额。
    1
    2
    3
    4
    5
    SELECT
    order_id,
    sum(goods.price) as total
    FROM order_list INNER JOIN goods ON goods.item_id1 = order_list.item_id2
    GROUP BY order_id;
  • 创建异步物化视图
    以下示例根据上述查询语句,基于表 goods 和表 order_list 创建一个“以订单 ID 为分组,对订单中所有商品价格求和”的异步物化视图,并设定其刷新方式为 ASYNC,每天自动刷新。
    1
    2
    3
    4
    5
    6
    7
    8
    CREATE MATERIALIZED VIEW order_mv
    DISTRIBUTED BY HASH(`order_id`)
    REFRESH ASYNC START('2022-09-01 10:00:00') EVERY (interval 1 day)
    AS SELECT
    order_list.order_id,
    sum(goods.price) as total
    FROM order_list INNER JOIN goods ON goods.item_id1 = order_list.item_id2
    GROUP BY order_id;
  • 管理异步物化视图
    • 查询异步物化视图
      1
      2
      3
      4
      5
      6
      7
      8
      9
      MySQL > SELECT * FROM order_mv;
      +----------+--------------------+
      | order_id | total |
      +----------+--------------------+
      | 10001 | 14.5 |
      | 10002 | 10.200000047683716 |
      | 10003 | 8.700000047683716 |
      +----------+--------------------+
      3 rows in set (0.01 sec)
    • 修改异步物化视图
      1
      2
      3
      4
      5
      6
      // 启用被禁用的异步物化视图(将物化视图的状态设置为 Active)。
      ALTER MATERIALIZED VIEW order_mv ACTIVE;
      // 修改异步物化视图名称为 order_total
      ALTER MATERIALIZED VIEW order_mv RENAME order_total;
      // 修改异步物化视图的最大刷新间隔为 2 天。
      ALTER MATERIALIZED VIEW order_mv REFRESH ASYNC EVERY(INTERVAL 2 DAY);
    • 查看异步物化视图
      1
      2
      3
      SHOW MATERIALIZED VIEWS;
      SHOW MATERIALIZED VIEWS WHERE NAME = "order_mv";
      SHOW MATERIALIZED VIEWS WHERE NAME LIKE "order%";
    • 查看异步物化视图创建语句
      1
      SHOW CREATE MATERIALIZED VIEW order_mv;
    • 查看异步物化视图的执行状态
      1
      select * from information_schema.tasks  order by CREATE_TIME desc limit 1\G;
    • 删除异步物化视图
      1
      DROP MATERIALIZED VIEW order_mv;
0%