Paimon aggregation: verifying how corrected statistics propagate
Background
In a previous post, Flink & Paimon & StarRocks & Dinky streaming lakehouse layering practice, we already used Paimon for layering and built the following 4 tables; the merge-engine of each streaming job is:
- order_dw.dwd_orders (partial-update)
- order_dw.dwm_users_shops (aggregation)
- order_dw.dws_users (aggregation)
- order_dw.dws_shops (aggregation)
We often have questions like these:
1. Suppose buy_fee from the MySQL source table order_dw.orders has already been aggregated into the downstream Paimon streaming tables dwm_users_shops, dws_users and dws_shops. If a row of order_dw.orders is then modified to correct it (the typical business case is a price change), what will the downstream aggregation jobs do? Will the data be corrected, and what are the principle and logic behind the correction?
2. Suppose the dwm_users_shops job stops unexpectedly and has to be recovered from a checkpoint/savepoint. How will the aggregation job handle the aggregated data in that case?
With these questions in mind, let's verify them in practice; a small simulation of the expected correction behaviour follows right below.
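Before running the real jobs, here is that small simulation. It is a plain-Python sketch of the behaviour we expect for question 1, assuming the upstream changelog carries retraction records (-U for the old row, +U for the corrected row), which is what a CDC source plus a changelog-producing DWD table should emit. It only illustrates the idea of how a sum aggregate gets corrected, not Paimon's actual implementation.

# Illustration only: a sum aggregate fed by a changelog that contains retractions.
# Row kinds follow Flink's convention: +I insert, -U update-before, +U update-after, -D delete.
totals = {}  # (user_id, shop_id, ds) -> payed_buy_fee_sum

def apply_change(row_kind, key, fee):
    delta = fee if row_kind in ("+I", "+U") else -fee   # retractions subtract
    totals[key] = totals.get(key, 0) + delta

key = ("user_001", 12345, "20230215")
apply_change("+I", key, 5000)   # the original order amount is aggregated
apply_change("-U", key, 5000)   # price corrected in MySQL: the old value is retracted ...
apply_change("+U", key, 4500)   # ... and the new value is applied
print(totals[key])              # 4500 -> the downstream aggregate ends up corrected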
Practice
Hugging Face LLMs: sentiment analysis
Hugging Face model download and usage
Install dependencies
pip install transformers datasets tokenizers
Download
Let's try downloading and using the following models:
- bert-base-chinese
- gpt2-chinese-cluecorpussmall
- roberta-base-chinese-extractive-qa
download_LLM.py
from transformers import AutoModel, AutoTokenizer
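The download script is only shown partially above, so here is a minimal sketch of what download_LLM.py could look like. The cache_dir value is an assumed local path; the model ids match the three models listed above.

# download_LLM.py -- minimal download sketch (cache_dir is an assumed local path)
from transformers import AutoModel, AutoTokenizer

cache_dir = "./huggingFace/models"   # files land in cache_dir/models--<org>--<name>/snapshots/<hash>
model_names = [
    "bert-base-chinese",
    "uer/gpt2-chinese-cluecorpussmall",
    "uer/roberta-base-chinese-extractive-qa",
]
for name in model_names:
    AutoTokenizer.from_pretrained(name, cache_dir=cache_dir)   # downloads tokenizer files
    AutoModel.from_pretrained(name, cache_dir=cache_dir)       # downloads model weights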
Usage
Using the local models
Text generation model
from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM
model_dir = f"{绝对路径}/huggingFace/models/gpt2-chinese-cluecorpussmall/models--uer--gpt2-chinese-cluecorpussmall/snapshots/c2c0249d8a2731f269414cc3b22dff021f8e07a3"
tokenizer = AutoTokenizer.from_pretrained(model_dir)
model = AutoModelForCausalLM.from_pretrained(model_dir)
generator = pipeline("text-generation", model=model, tokenizer=tokenizer, device=0)
result = generator("你好,我是一款大模型",
max_length=150,
num_return_sequences=1,
truncation=True,
temperature=0.7,
top_k=50,
top_p=0.9,
clean_up_tokenization_spaces=False
)
print(result)
Classification model
from transformers import pipeline, BertTokenizer, BertForSequenceClassification
model_dir = f"{绝对路径}/huggingFace/models/bert-base-chinese/models--bert-base-chinese/snapshots/c30a6ed22ab4564dc1e3b2ecbf6e766b0611a33f"
model = BertForSequenceClassification.from_pretrained(model_dir)
tokenizer = BertTokenizer.from_pretrained(model_dir)
classifier = pipeline("text-classification", model=model, tokenizer=tokenizer)
result = classifier("你好,我是一款大模型")
print(result)
Question answering model
from transformers import pipeline, AutoTokenizer, AutoModelForQuestionAnswering
model_dir = f"{绝对路径}/huggingFace/models/roberta-base-chinese-extractive-qa/models--uer--roberta-base-chinese-extractive-qa/snapshots/9b02143727b9c4655d18b43a69fc39d5eb3ddd53"
tokenizer = AutoTokenizer.from_pretrained(model_dir)
model = AutoModelForQuestionAnswering.from_pretrained(model_dir)
qa_pipeline = pipeline("question-answering", model=model, tokenizer=tokenizer)
result = qa_pipeline({
"question":"Hugging Face 是什么",
"context": "Hugging Face 是一个自然语言处理平台"
})
print(result)
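Since the article title mentions sentiment analysis, note that the classification example above loads bert-base-chinese into BertForSequenceClassification with a randomly initialized classification head, so its labels are not meaningful sentiment. For real sentiment analysis you would load a model fine-tuned for that task; a minimal sketch follows, where the model id is an assumption and should be replaced by whichever fine-tuned Chinese sentiment model you have downloaded locally.

# Sentiment analysis sketch -- the model id below is an assumed fine-tuned Chinese sentiment model
from transformers import pipeline

classifier = pipeline(
    "sentiment-analysis",
    model="uer/roberta-base-finetuned-jd-binary-chinese",  # replace with a local snapshot path if offline
)
print(classifier("这款手机质量很好,非常满意"))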
DolphinScheduler & SeaTunnel: scheduled HTTP API synchronization into the lakehouse
Background and goals
We previously evaluated SeaTunnel for CDC ingestion into the lake (SeaTunnel CDC ingestion practice); all of those scenarios assumed a direct connection to the source database.
In real business requirements there are often cases where a direct database connection for CDC is not possible; the data then has to be pulled through an API, with DolphinScheduler running the synchronization on a schedule.
A practical example:
- syncing ERP (SAP) stock data into the lakehouse for inventory analysis
Another goal this time is that colleagues can follow the same recipe and wire up future HTTP-to-lakehouse integrations themselves, instead of every new interface requiring custom code; a small script for inspecting the API response first is sketched just below.
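As mentioned above, before writing the SeaTunnel job it helps to inspect the API response once, so that the schema fields and content_field can be filled in correctly. The sketch below uses the same placeholder URL, Basic auth header and request body as the job file further down; replace them with real values.

# Inspect the SAP stock API response (placeholder URL/credentials, same as in the job config below)
import requests

resp = requests.post(
    "https://ip/http/prd/query_sap_stock",
    headers={"Authorization": "Basic XXX", "Content-Type": "application/json"},
    json={"IT_WERKS": [{"VALUE": "1080"}]},
    timeout=30,
)
rows = resp.json().get("ET_RETURN", [])   # the rows that content_field "$.ET_RETURN.*" will extract
print(len(rows), rows[:1])                # compare field names with the schema block (MATNR, MAKTX, ...)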
Preparation
- seatunnel 2.3.10
First, add the connector names to the ${SEATUNNEL_HOME}/config/plugin_config file, then run the install command and confirm that the connectors are present under the ${SEATUNNEL_HOME}/connectors/ directory.
In this example we use connector-jdbc and connector-paimon.
Writing to StarRocks could also be done with connector-starrocks, but the scenario here is a better fit for connector-jdbc, so connector-jdbc is used.
# configure connector names
--connectors-v2--
connector-jdbc
connector-starrocks
connector-paimon
--end--
# install the connectors
sh bin/install-plugin.sh 2.3.10
SeaTunnel jobs
First make sure the SeaTunnel job runs locally, then integrate it with DolphinScheduler.
HTTP to StarRocks
example/http2starrocks
env {
parallelism = 1
job.mode = "BATCH"
}
source {
Http {
plugin_output = "stock"
url = "https://ip/http/prd/query_sap_stock"
method = "POST"
headers {
Authorization = "Basic XXX"
Content-Type = "application/json"
}
body = """{"IT_WERKS": [{"VALUE": "1080"}]}"""
format = "json"
content_field = "$.ET_RETURN.*"
schema {
fields {
MATNR = "string"
MAKTX = "string"
WERKS = "string"
NAME1 = "string"
LGORT = "string"
LGOBE = "string"
CHARG = "string"
MEINS = "string"
LABST = "double"
UMLME = "double"
INSME = "double"
EINME = "double"
SPEME = "double"
RETME = "double"
}
}
}
}
# this transform is mainly for convenience, e.g. renaming fields
transform {
Sql {
plugin_input = "stock"
plugin_output = "stock-tf-out"
query = "select MATNR, MAKTX, WERKS,NAME1,LGORT,LGOBE,CHARG,MEINS,LABST,UMLME,INSME,EINME,SPEME,RETME from stock"
}
}
# write to StarRocks with partition overwrite; this example assumes the StarRocks table already exists and uses INSERT OVERWRITE per partition
sink {
jdbc {
plugin_input = "stock-tf-out"
url = "jdbc:mysql://XXX:9030/scm?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
driver = "com.mysql.cj.jdbc.Driver"
user = "lab"
password = "XXX"
compatible_mode="starrocks"
query = """insert overwrite ods_sap_stock PARTITION (WERKS='1080') (MATNR, MAKTX, WERKS,NAME1,LGORT,LGOBE,CHARG,MEINS,LABST,UMLME,INSME,EINME,SPEME,RETME) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)"""
}
}
# alternative: connector-starrocks (no support for INSERT OVERWRITE via SQL statements was found, so it does not fit this scenario); better suited to loads where the whole table is dropped and rebuilt
// sink {
// StarRocks {
// plugin_input = "stock-tf-out"
// nodeUrls = ["ip:8030"]
// base-url = "jdbc:mysql://ip:9030/"
// username = "lab"
// password = "XXX"
// database = "scm"
// table = "ods_sap_stock"
// batch_max_rows = 1000
// data_save_mode="DROP_DATA"
// starrocks.config = {
// format = "JSON"
// strip_outer_array = true
// }
// schema_save_mode = "RECREATE_SCHEMA"
// save_mode_create_template="""
// CREATE TABLE IF NOT EXISTS `scm`.`ods_sap_stock` (
// MATNR STRING COMMENT '物料',
// WERKS STRING COMMENT '工厂',
// LGORT STRING COMMENT '库存地点',
// MAKTX STRING COMMENT '物料描述',
// NAME1 STRING COMMENT '工厂名称',
// LGOBE STRING COMMENT '地点描述',
// CHARG STRING COMMENT '批次编号',
// MEINS STRING COMMENT '单位',
// LABST DOUBLE COMMENT '非限制使用库存',
// UMLME DOUBLE COMMENT '在途库存',
// INSME DOUBLE COMMENT '质检库存',
// EINME DOUBLE COMMENT '受限制使用的库存',
// SPEME DOUBLE COMMENT '已冻结的库存',
// RETME DOUBLE COMMENT '退货'
// ) ENGINE=OLAP
// PRIMARY KEY ( MATNR,WERKS,LGORT)
// COMMENT 'sap库存'
// DISTRIBUTED BY HASH (WERKS) PROPERTIES (
// "replication_num" = "1"
// )
// """
// }
// }
HTTP to Paimon
example/http2paimon
env {
parallelism = 1
job.mode = "BATCH"
}
source {
Http {
plugin_output = "stock"
url = "https://ip/http/prd/query_sap_stock"
method = "POST"
headers {
Authorization = "Basic XXX"
Content-Type = "application/json"
}
body = """{"IT_WERKS": [{"VALUE": "1080"}]}"""
format = "json"
content_field = "$.ET_RETURN.*"
schema {
fields {
MATNR = "string"
MAKTX = "string"
WERKS = "string"
NAME1 = "string"
LGORT = "string"
LGOBE = "string"
CHARG = "string"
MEINS = "string"
LABST = "double"
UMLME = "double"
INSME = "double"
EINME = "double"
SPEME = "double"
RETME = "double"
}
}
}
}
# this transform is mainly for convenience, e.g. renaming fields
transform {
Sql {
plugin_input = "stock"
plugin_output = "stock-tf-out"
query = "select MATNR, MAKTX, WERKS,NAME1,LGORT,LGOBE,CHARG,MEINS,LABST,UMLME,INSME,EINME,SPEME,RETME from stock"
}
}
# sync data into Paimon; the Paimon sink does not yet appear to support INSERT OVERWRITE partition rewrites, so this job is for reference only and does not meet this example's requirement
sink {
Paimon {
warehouse = "s3a://test/"
database = "sap"
table = "ods_sap_stock"
paimon.hadoop.conf = {
fs.s3a.access-key=XXX
fs.s3a.secret-key=XXX
fs.s3a.endpoint="http://minio:9000"
fs.s3a.path.style.access=true
fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
}
}
}
Integrating SeaTunnel into DolphinScheduler
- Build a worker image that bundles SeaTunnel and push it to the image registry
FROM dolphinscheduler.docker.scarf.sh/apache/dolphinscheduler-worker:3.2.2
RUN mkdir /opt/seatunnel
RUN mkdir /opt/seatunnel/apache-seatunnel-2.3.10
# bundle SeaTunnel into the container
COPY apache-seatunnel-2.3.10/ /opt/seatunnel/apache-seatunnel-2.3.10/
docker build --platform=linux/amd64 -t apache/dolphinscheduler-worker:3.2.2-seatunnel .
- Deploy a worker with the new image: edit docker-compose.yaml and add a dolphinscheduler-worker-seatunnel service
...
dolphinscheduler-worker-seatunnel:
image: xxx/dolphinscheduler-worker:3.2.2-seatunnel
profiles: ["all"]
env_file: .env
healthcheck:
test: [ "CMD", "curl", "http://localhost:1235/actuator/health" ]
interval: 30s
timeout: 5s
retries: 3
depends_on:
dolphinscheduler-zookeeper:
condition: service_healthy
volumes:
- ./dolphinscheduler-worker-seatunnel-data:/tmp/dolphinscheduler
- ./dolphinscheduler-logs:/opt/dolphinscheduler/logs
- ./dolphinscheduler-shared-local:/opt/soft
- ./dolphinscheduler-resource-local:/dolphinscheduler
networks:
dolphinscheduler:
ipv4_address: 172.15.0.18
...
- Configure a SeaTunnel worker group and environment in DolphinScheduler
- In Security Center - Worker Group Management, create a group with this node's IP, so that future SeaTunnel tasks run on that group
- In Environment Management - Create Environment, add an environment for executing SeaTunnel and bind it to the worker group created in the previous step
- Create a workflow definition and fill in the SeaTunnel job configuration from above
- When running it, select the SeaTunnel worker group and environment so the task runs on the node that has SeaTunnel bundled
K8s Native Application & Dinky: production deployment verification
Background
Last time we walked through the Flink & Paimon & StarRocks & Dinky streaming lakehouse layering practice.
This time we verify deploying it fully to production, using the Application mode of Flink on K8s.
This deployment plan is only one possible approach; adjust it to your actual production environment.
Preparation
- Prepare a K8s cluster in advance
- Prepare an object store; I use minio here, mainly for storing and downloading jar packages. Upload the project dependency jars you need into the minio/libs/1.20 directory
- Build the base image: create a directory named extends and place the following 3 files in it:
dinky-app-1.20-1.2.1-jar-with-dependencies.jar (taken from the jar directory of the Dinky distribution)
mysql-connector-java-8.0.27.jar
Dockerfile (the image build file)
Dockerfile:
ARG FLINK_VERSION=1.20.1
FROM flink:${FLINK_VERSION}-scala_2.12
ADD ./*.jar $FLINK_HOME/lib/ # add the jars in this directory to the image's lib directory
RUN rm -rf /opt/flink/lib/flink-table-planner-loader-*.jar # remove flink-table-planner-loader
RUN mv /opt/flink/opt/flink-table-planner_2.12-*.jar /opt/flink/lib # add flink-table-planner_2.12
Dinky needs planner-loader replaced with the planner because features such as lineage analysis and syntax enhancement rely on the planner's dependencies; the loader loads them in an isolated classloader, which prevents Dinky from using them.
- Build the image and push it to the registry; with this the base image is ready.
docker build --platform linux/amd64 -t dinky-flink:1.2.1-1.20.1 . --no-cache
docker tag dinky-flink:1.2.1-1.20.1 XXX/dinky-flink:1.2.1-1.20.1
docker push XXX/dinky-flink:1.2.1-1.20.1
Practice
- Create and fill in the cluster configuration correctly in Dinky
Flink image address: the image built and pushed in the steps above
K8s kubeConfig: the K8s config file
Default Pod Template: following the official Example of Pod Template, write a pod template
<URL> <ACCESSKEY> <SECRETKEY>: fill in your own minio address and credentials
The initContainers step downloads the jars from minio and mounts them into Flink's lib directory
The tolerations are needed because our nodes are tainted so that arbitrary pods are not scheduled on them and certain nodes stay dedicated to Flink jobs
apiVersion: v1
kind: Pod
metadata:
name: pod-template
spec:
initContainers:
- command:
- sh
- -c
- mc alias set minio <URL> <ACCESSKEY> <SECRETKEY>
&& mc cp --recursive minio/libs/1.20/ /tmp/ext-lib
&& mc ls minio
image: minio/mc:RELEASE.2025-02-15T10-36-16Z
imagePullPolicy: IfNotPresent
name: init-lib
volumeMounts:
- mountPath: /tmp/ext-lib
name: ext-lib
tolerations:
- key: "env"
operator: "Equal"
value: "prod"
effect: "NoSchedule"
containers:
- name: flink-main-container
imagePullPolicy: Always
volumeMounts:
- mountPath: /opt/flink/lib/ext-lib
name: ext-lib
volumes:
- name: ext-lib
emptyDir: {}
Savepoint path and checkpoint path: it is recommended to use a persistent location here, for example create a PVC and mount it
- Jar file path: fill in local:///opt/flink/lib/dinky-app-1.20-1.2.1-jar-with-dependencies.jar
- Run the Paimon CDC job so that it runs on K8s in Application mode
Flink & Paimon & StarRocks & Dinky: streaming lakehouse layering practice
Background
In real production we often need to take a raw data stream as the base and join it with many external tables to enrich attributes. For example, for order data we want the order's payment information and the product category.
The characteristics are:
- the payment data volume is large
- the product category data is small and rarely modified
When widening the data we face a limitation:
- a dual-stream join has to keep all state, which is too costly.
In this post we verify the feasibility of widening and layering with partial column updates (partial-update) + lookup join, to reduce the pressure on the streaming computation; a small sketch of the partial-update merge idea follows below.
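The sketch mentioned above: partial-update merges rows that share a primary key and lets non-null columns overwrite the stored values, so the order stream and the payment stream can each fill in their own columns of the same wide row. This is only an illustration of the merge semantics in plain Python, not Paimon code.

# Illustration of partial-update semantics: non-null fields overwrite, nulls are ignored
wide_row = {}

def partial_update(row):
    for col, val in row.items():
        if val is not None:
            wide_row[col] = val

# the order stream writes the order columns; the payment columns stay NULL
partial_update({"order_id": 100001, "order_fee": 5000, "pay_id": None, "pay_platform": None})
# the payment stream later fills the payment columns for the same order_id
partial_update({"order_id": 100001, "order_fee": None, "pay_id": 2001, "pay_platform": 1})
print(wide_row)   # {'order_id': 100001, 'order_fee': 5000, 'pay_id': 2001, 'pay_platform': 1}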
Preparation
- Source data preparation
In MySQL, create a database named order_dw, create the three tables below, and insert the sample data.
CREATE TABLE `orders` (
order_id bigint not null primary key,
user_id varchar(50) not null,
shop_id bigint not null,
product_id bigint not null,
buy_fee bigint not null,
create_time timestamp not null,
update_time timestamp not null default now(),
state int not null
);
CREATE TABLE `orders_pay` (
pay_id bigint not null primary key,
order_id bigint not null,
pay_platform int not null,
create_time timestamp not null
);
CREATE TABLE `product_catalog` (
product_id bigint not null primary key,
catalog_name varchar(50) not null
);
-- 准备数据
INSERT INTO product_catalog VALUES(1, 'phone_aaa'),(2, 'phone_bbb'),(3, 'phone_ccc'),(4, 'phone_ddd'),(5, 'phone_eee');
INSERT INTO orders VALUES
(100001, 'user_001', 12345, 1, 5000, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1),
(100002, 'user_002', 12346, 2, 4000, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1),
(100003, 'user_003', 12347, 3, 3000, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1),
(100004, 'user_001', 12347, 4, 2000, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1),
(100005, 'user_002', 12348, 5, 1000, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1),
(100006, 'user_001', 12348, 1, 1000, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1),
(100007, 'user_003', 12347, 4, 2000, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);
INSERT INTO orders_pay VALUES
(2001, 100001, 1, '2023-02-15 17:40:56'),
(2002, 100002, 1, '2023-02-15 17:40:56'),
(2003, 100003, 0, '2023-02-15 17:40:56'),
(2004, 100004, 0, '2023-02-15 17:40:56'),
(2005, 100005, 0, '2023-02-15 18:40:56'),
(2006, 100006, 0, '2023-02-15 18:40:56'),
(2007, 100007, 0, '2023-02-15 18:40:56');
- Jar preparation
flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
paimon-s3-1.0.1.jar
paimon-flink-action-1.0.1.jar
paimon-flink-1.20-1.0.1.jar
mysql-connector-java-8.0.27.jar
flink-sql-connector-mysql-cdc-3.3.0.jar
Put the files above into Dinky's opt/dinky/customJar directory and into the lib directory of the corresponding Flink cluster. (Restart or reload packages as needed.)
ODS: real-time ingestion of the business database into the lake
- Original Flink command-line submission
./bin/flink run -d \
./lib/paimon-flink-action-1.0.1.jar \
mysql_sync_database \
--warehouse s3://lakehouse/paimon \
--database order_dw \
--mysql_conf hostname=XXX \
--mysql_conf port=63950 \
--mysql_conf server-time-zone=Asia/Shanghai \
--mysql_conf username=root \
--mysql_conf password=XXX \
--mysql_conf database-name=order_dw \
--catalog_conf s3.endpoint=http://192.168.103.113:9010 \
--catalog_conf s3.path.style.access=true \
--catalog_conf s3.access-key=XXX \
--catalog_conf s3.secret-key=XXX \
--table_conf bucket=1 \
--table_conf changelog-producer=input \
--table_conf sink.parallelism=1
- Dinky submission
- Create a Flink Jar task and upload the paimon-flink-action-1.0.1.jar package under "Resources"
- Fill in the jar run parameters correctly:
- select the just-uploaded jar as the "Program Path";
- set the main class to org.apache.paimon.flink.action.FlinkActions;
- set the program arguments to the same arguments as in the CLI command above (everything from mysql_sync_database onwards)
DWD: cleansing and widening
Create a Flink SQL task dwd_orders in Dinky.
Widen with partial column updates (partial-update) + dimension-table lookup join.
-- define the paimon catalog
CREATE CATALOG paimoncatalog
WITH
(
'type' = 'paimon',
'warehouse' = 's3://lakehouse/paimon',
's3.endpoint' = 'http://192.168.103.113:9010',
's3.access-key' = 'XXX',
's3.secret-key' = 'XXX'
);
USE CATALOG paimoncatalog;
-- create the dwd_orders table
CREATE TABLE IF NOT EXISTS order_dw.dwd_orders (
order_id BIGINT,
order_user_id STRING,
order_shop_id BIGINT,
order_product_id BIGINT,
order_product_catalog_name STRING,
order_fee BIGINT,
order_create_time TIMESTAMP,
order_update_time TIMESTAMP,
order_state INT,
pay_id BIGINT,
pay_platform INT COMMENT 'platform 0: phone, 1: pc',
pay_create_time TIMESTAMP,
PRIMARY KEY (order_id) NOT ENFORCED
)
WITH
(
'merge-engine' = 'partial-update', -- use the partial-update merge engine to produce the wide table
'partial-update.remove-record-on-delete' = 'true', -- let partial-update handle deletes
'changelog-producer' = 'full-compaction' -- use full-compaction or lookup changelog production to emit change data with low latency
);
SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
SET 'table.exec.sink.upsert-materialize' = 'NONE';
SET 'execution.checkpointing.interval' = '10s';
SET 'execution.checkpointing.min-pause' = '10s';
-- widen the orders table by joining the product category dimension
INSERT INTO order_dw.dwd_orders
SELECT
o.order_id,
o.user_id,
o.shop_id,
o.product_id,
dim.catalog_name,
o.buy_fee,
o.create_time,
o.update_time,
o.state,
CAST(NULL AS BIGINT) AS pay_id,
CAST(NULL AS INT) AS pay_platform,
CAST(NULL AS TIMESTAMP) AS pay_create_time
FROM
(
SELECT *, PROCTIME() AS proctime FROM order_dw.orders
) o
LEFT JOIN order_dw.product_catalog FOR SYSTEM_TIME AS OF o.proctime AS dim ON o.product_id = dim.product_id -- lookup join against the dimension table
UNION ALL -- Paimon does not yet support writing the same table from multiple INSERT statements in one job, hence UNION ALL
-- widen with the order payment table
SELECT
order_id,
CAST(NULL AS STRING) AS user_id,
CAST(NULL AS BIGINT) AS shop_id,
CAST(NULL AS BIGINT) AS product_id,
CAST(NULL AS STRING) AS order_product_catalog_name,
CAST(NULL AS BIGINT) AS order_fee,
CAST(NULL AS TIMESTAMP) AS order_create_time,
CAST(NULL AS TIMESTAMP) AS order_update_time,
CAST(NULL AS INT) AS order_state,
pay_id,
pay_platform,
create_time
FROM
order_dw.orders_pay;
DWS: metric computation
Our goal is to create the DWS-layer aggregate tables dws_users and dws_shops. To compute the user-view and the shop-view aggregates at the same time, we additionally create an intermediate table dwm_users_shops keyed by user + shop; a small sketch at the end of this DWS section shows why this helps.
- Create a streaming SQL job named dwm_users_shops that uses Paimon's pre-aggregation merge engine to automatically sum order_fee, producing each user's total spend per shop; summing the constant 1 at the same time also yields the user's number of purchases at that shop.
CREATE CATALOG paimoncatalog
WITH
(
'type' = 'paimon',
'warehouse' = 's3://lakehouse/paimon',
's3.endpoint' = 'http://192.168.103.113:9010',
's3.access-key' = 'XXX',
's3.secret-key' = 'XXX'
);
USE CATALOG paimoncatalog;
SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
SET 'table.exec.sink.upsert-materialize' = 'NONE';
SET 'execution.checkpointing.interval' = '10s';
SET 'execution.checkpointing.min-pause' = '10s';
-- to compute both the user-view and the shop-view aggregates, create an additional intermediate table keyed by user + shop.
CREATE TABLE IF NOT EXISTS order_dw.dwm_users_shops (
user_id STRING,
shop_id BIGINT,
ds STRING,
payed_buy_fee_sum BIGINT COMMENT '当日用户在商户完成支付的总金额',
pv BIGINT COMMENT '当日用户在商户购买的次数',
PRIMARY KEY (user_id, shop_id, ds) NOT ENFORCED
) WITH (
'merge-engine' = 'aggregation', -- use the pre-aggregation merge engine to produce the aggregate table
'fields.payed_buy_fee_sum.aggregate-function' = 'sum', -- sum payed_buy_fee_sum to produce the aggregate
'fields.pv.aggregate-function' = 'sum', -- sum pv to produce the aggregate
'changelog-producer' = 'lookup', -- use lookup changelog production to emit change data with low latency
-- dwm intermediate tables are usually not queried directly by upper-layer applications, so writes can be optimised.
'file.format' = 'avro', -- the avro row format gives more efficient writes.
'metadata.stats-mode' = 'none' -- dropping statistics makes OLAP queries more expensive (no impact on continuous stream processing) but makes writes more efficient.
);
INSERT INTO order_dw.dwm_users_shops
SELECT
order_user_id,
order_shop_id,
DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds,
order_fee,
1 -- one input record represents one purchase
FROM order_dw.dwd_orders
WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL;
- Create the SQL job dws_shops & dws_users
CREATE CATALOG paimoncatalog
WITH
(
'type' = 'paimon',
'warehouse' = 's3://lakehouse/paimon',
's3.endpoint' = 'http://192.168.103.113:9010',
's3.access-key' = 'XXX',
's3.secret-key' = 'XXX'
);
USE CATALOG paimoncatalog;
-- user-dimension aggregate metric table.
CREATE TABLE IF NOT EXISTS order_dw.dws_users (
user_id STRING,
ds STRING,
payed_buy_fee_sum BIGINT COMMENT '当日完成支付的总金额',
PRIMARY KEY (user_id, ds) NOT ENFORCED
) WITH (
'merge-engine' = 'aggregation', -- use the pre-aggregation merge engine to produce the aggregate table
'fields.payed_buy_fee_sum.aggregate-function' = 'sum' -- sum payed_buy_fee_sum to produce the aggregate
-- since dws_users is not consumed by further streaming jobs, no changelog producer needs to be specified
);
-- shop-dimension aggregate metric table.
CREATE TABLE IF NOT EXISTS order_dw.dws_shops (
shop_id BIGINT,
ds STRING,
payed_buy_fee_sum BIGINT COMMENT '当日完成支付总金额',
uv BIGINT COMMENT '当日不同购买用户总人数',
pv BIGINT COMMENT '当日购买用户总人次',
PRIMARY KEY (shop_id, ds) NOT ENFORCED
) WITH (
'merge-engine' = 'aggregation', -- use the pre-aggregation merge engine to produce the aggregate table
'fields.payed_buy_fee_sum.aggregate-function' = 'sum', -- sum payed_buy_fee_sum to produce the aggregate
'fields.uv.aggregate-function' = 'sum', -- sum uv to produce the aggregate
'fields.pv.aggregate-function' = 'sum' -- sum pv to produce the aggregate
-- since dws_shops is not consumed by further streaming jobs, no changelog producer needs to be specified
);
SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
SET 'table.exec.sink.upsert-materialize' = 'NONE';
SET 'execution.checkpointing.interval' = '10s';
SET 'execution.checkpointing.min-pause' = '10s';
INSERT INTO order_dw.dws_users
SELECT
user_id,
ds,
payed_buy_fee_sum
FROM order_dw.dwm_users_shops;
-- with shop as the primary key, a few hot shops may have far more data than the others.
-- therefore use local merge to pre-aggregate in memory before writing to Paimon, mitigating data skew.
INSERT INTO order_dw.dws_shops /*+ OPTIONS('local-merge-buffer-size' = '64mb') */
SELECT
shop_id,
ds,
payed_buy_fee_sum,
1, -- one input record represents all of one user's purchases at this shop
pv
FROM order_dw.dwm_users_shops;
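As referenced in the DWS introduction, the sketch below shows why the intermediate table makes uv easy: dwm_users_shops keeps exactly one aggregated row per (user, shop, day), so dws_shops can obtain uv (distinct buyers) by simply summing the constant 1 over those rows, which would be wrong if it aggregated the dwd rows directly. Plain-Python illustration only.

# Why dwm_users_shops makes uv easy to compute: one row per (user, shop, ds)
dwd_orders = [
    {"user": "user_001", "shop": 12347, "ds": "20230215", "fee": 2000},
    {"user": "user_003", "shop": 12347, "ds": "20230215", "fee": 3000},
    {"user": "user_003", "shop": 12347, "ds": "20230215", "fee": 2000},  # the same user buys again
]

dwm = {}  # (user, shop, ds) -> [payed_buy_fee_sum, pv]
for o in dwd_orders:
    agg = dwm.setdefault((o["user"], o["shop"], o["ds"]), [0, 0])
    agg[0] += o["fee"]
    agg[1] += 1

dws_shops = {}  # (shop, ds) -> [payed_buy_fee_sum, uv, pv]
for (user, shop, ds), (fee_sum, pv) in dwm.items():
    agg = dws_shops.setdefault((shop, ds), [0, 0, 0])
    agg[0] += fee_sum
    agg[1] += 1          # one dwm row == one distinct user for this shop/day
    agg[2] += pv
print(dws_shops)         # {(12347, '20230215'): [7000, 2, 3]}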
ADS: querying from StarRocks with materialized views
For the ADS layer we use StarRocks to query the Paimon data and build materialized views.
- Add the Paimon catalog
CREATE EXTERNAL CATALOG `paimon_lab`
PROPERTIES (
"paimon.catalog.type" = "filesystem",
"aws.s3.access_key" = "XXX",
"aws.s3.secret_key" = "XXX",
"aws.s3.endpoint" = "http://172.16.32.16:2002",
"type" = "paimon",
"paimon.catalog.warehouse" = "s3://lakehouse/paimon"
)
- Query
select * from paimon_lab.order_dw.dws_users
- Materialized view
CREATE MATERIALIZED VIEW ads_users
AS
select * from paimon_lab.order_dw.dws_users
Conclusion
This example was verified with 4 jobs in total.
Related
Cloud native: building a real-time computing platform with Flink Native Session Kubernetes + Dinky
Background
Verifying a cloud-native Flink stream-computing platform solution.
The whole setup is based on cloud-native K8s; put simply, Flink jobs run on K8s.
Environment requirements
For K8s deployment, see the k8s-1.25.4 deployment notes (containerd).
Flink Native Kubernetes cluster deployment
- Prerequisites
- Kubernetes version >= 1.9
➜ ~ kubectl version --short
Client Version: v1.24.4
Kustomize Version: v4.5.4
Server Version: v1.24.4
- Make sure your ~/.kube/config file is correctly configured for access to the Kubernetes cluster
➜ ~ export KUBECONFIG=~/.kube/config
➜ ~ kubectl get nodes
NAME STATUS ROLES AGE VERSION
k8s-master2 Ready control-plane 9d v1.24.4
k8s-node1 Ready <none> 9d v1.24.4
k8s-node2 Ready <none> 9d v1.24.4
k8s-node3 Ready <none> 25h v1.24.4
- Check that Kubernetes DNS is enabled and working
➜ ~ kubectl cluster-info
Kubernetes control plane is running at https://192.168.103.201:6443
CoreDNS is running at https://192.168.103.201:6443/api/v1/namespaces/kube-system/services/kube-dns:dns/proxy
- RBAC permissions: make sure the <nameSpace> service account in your namespace has the RBAC permissions needed to create and delete Pods. I create a new namespace flink-native
kubectl create namespace flink-native
kubectl create serviceaccount flink-sa -n flink-native
kubectl create clusterrolebinding flinknative-role-binding-flinknative -n flink-native --clusterrole=edit --serviceaccount=flink-native:flink-sa
- Start the Flink cluster on K8s (Flink 1.20)
./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=flink-cluster1 \
-Dtaskmanager.memory.process.size=4096m \
-Dkubernetes.taskmanager.cpu=2 \
-Dtaskmanager.numberOfTaskSlots=4 \
-Dkubernetes.namespace=flink-native \
-Dkubernetes.service-account=flink-sa \
-Dresourcemanager.taskmanager-timeout=3600000
- Shut down the cluster
kubectl delete deployment/flink-cluster1
Dinky stream-computing platform deployment (Helm)
Create the PVCs
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
name: dinky-config-volume
namespace: data-center
spec:
storageClassName: nfs-client
accessModes:
- ReadWriteMany
resources:
requests:
storage: 5Gi
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
name: dinky-lib-volume
namespace: data-center
spec:
storageClassName: nfs-client
accessModes:
- ReadWriteMany
resources:
requests:
storage: 5Gi
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
name: dinky-resource-volume
namespace: data-center
spec:
storageClassName: nfs-client
accessModes:
- ReadWriteMany
resources:
requests:
storage: 5Gi
dinky-config-volume holds the configuration files (the conf directory files from the Helm package)
dinky-lib-volume holds custom jars, mapped to /opt/dinky/customJar/
Adjusting the Helm package
- Deployment files
The Helm chart has not been maintained for a long time, so I made a few changes.
dinky.yaml: add volumes:
volumes:
- name: dinky-lib-volume
persistentVolumeClaim:
claimName: dinky-lib-volume
dinky.yaml: add volumeMounts:
volumeMounts:
- mountPath: /opt/dinky/customJar
name: dinky-lib-volume
dinky.yaml: fix the wrong auto.sh path (it used to be /opt/dinky/auto.sh)
command:
- /bin/bash
- '-c'
- >-
/opt/dinky/bin/auto.sh startOnPending {{ .Values.spec.extraEnv.flinkVersion}}
values.yaml: configure MySQL
mysql:
enabled: true
url: "192.168.103.113:3306"
auth:
username: "root"
password: "XXX"
database: "dinky"
Deploy
helm install dinky . -f values.yaml -n data-center
helm uninstall dinky -n data-center
Then register the Flink Native Kubernetes cluster created above in Dinky.
Stream-computing practice
Practice 1: MySQL CDC connector writing into Paimon
A Dinky job of type Flink SQL.
Open the Dinky page and create a new Flink SQL task.
EXECUTE CDCSOURCE demo WITH (
Practice 2: Paimon CDC writing into Paimon
A Dinky job of type Flink Jar (paimon-flink-action-1.0.0.jar).
Open the Dinky page and create a new Flink Jar task.
- Original submission command:
./bin/flink run \
./lib/paimon-flink-action-0.9.0.jar \
mysql_sync_database \
--warehouse s3://lakehouse-1253767413/paimon \
--database app_db \
--mysql_conf hostname=192.168.103.113 \
--mysql_conf username=root \
--mysql_conf password=XXX \
--mysql_conf database-name=app_db \
--catalog_conf s3.endpoint=cos.ap-guangzhou.myqcloud.com \
--catalog_conf s3.access-key=XXX \
--catalog_conf s3.secret-key=XXX \
--table_conf bucket=1
- Dinky job
Upload the paimon-flink-action-0.9.0.jar package through the Resource Center, then fill in the Program Path, Main Class and Program Arguments according to the original command above.
Practice 3: Flink CDC pipeline writing into Paimon
A Dinky job of type Flink SQL.
Open the Dinky page and create a new Flink SQL task.
To verify: whether the just-released Flink CDC 3.3 can now write to Paimon; in an earlier test (Paimon CDC ingestion & StarRocks lakehouse analysis practice) the Flink CDC pipeline could not write to Paimon successfully.
Example:
SET 'execution.checkpointing.interval' = '10s';
Other references
Native Kubernetes
Flink 1.10 Native Kubernetes 原理与实践
Flink on Kubernetes - Native Kubernetes - 配置基础环境
Flink CDC+Dinky同步到paimon(HDFS)
FlinkCDC pipline+Dinky整库同步StarRocks
Verifying a ZeroTier-based SD-WAN hybrid-cloud networking solution
Background
Previously, as a personal project, I self-hosted headscale for networking, mainly so that my personal computer (on the road or at the office) could reach the Synology NAS at home and fetch data from it at any time; see the earlier notes for details.
Today's requirement is an enterprise scenario, which is somewhat different from the personal one: it is mainly about networking a hybrid cloud, and the requirements are higher.
The requirements here are:
- a fully private solution, with no security concerns
- connect Tencent Cloud and the self-built IDC into a hybrid cloud
- after networking, the different internal subnets must be able to reach each other.
Let's verify whether ztnet can meet these three requirements. First, plan the resources:
- Tencent Cloud Guangzhou
- Guangzhou zone 6 - qs server, 172.16.32.16, self-hosted private planet
- Guangzhou zone 3 - scm server, 172.16.0.4, leaf client
- Guangzhou zone 3 - MySQL service, 172.16.0.9:3306
- IDC server room
- data-center server, leaf client
- SQL Server database
Verification
- ztnet deployment
ztnet is a solution for running a private ZeroTier root server; it removes the dependency on root servers operated by others and the slow access to the public root nodes.
We deploy it on the Tencent Cloud Guangzhou zone 6 qs server.
services:
postgres:
image: postgres:15.2-alpine
container_name: postgres
restart: unless-stopped
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: ztnet
volumes:
- postgres-data:/var/lib/postgresql/data
networks:
- app-network
zerotier:
image: zyclonite/zerotier:1.14.2
hostname: zerotier
container_name: zerotier
restart: unless-stopped
volumes:
- zerotier:/var/lib/zerotier-one
cap_add:
- NET_ADMIN
- SYS_ADMIN
devices:
- /dev/net/tun:/dev/net/tun
networks:
- app-network
ports:
- "9993:9993/udp"
environment:
- ZT_OVERRIDE_LOCAL_CONF=true
- ZT_ALLOW_MANAGEMENT_FROM=172.31.255.0/29
ztnet:
image: sinamics/ztnet:latest
container_name: ztnet
working_dir: /app
volumes:
- zerotier:/var/lib/zerotier-one
restart: unless-stopped
ports:
- 3000:3000
# - 127.0.0.1:3000:3000 <--- Use / Uncomment this line to restrict access to localhost only
environment:
POSTGRES_HOST: postgres
POSTGRES_PORT: 5432
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: ztnet
NEXTAUTH_URL: "http://localhost:3000" # !! Important !! Set the NEXTAUTH_URL environment variable to the canonical URL or IP of your site with port 3000
NEXTAUTH_SECRET: "random_secret"
NEXTAUTH_URL_INTERNAL: "http://ztnet:3000" # Internal NextAuth URL for 'ztnet' container on port 3000. Do not change unless modifying container name.
networks:
- app-network
links:
- postgres
depends_on:
- postgres
- zerotier
############################################################################
# #
# Uncomment the section below to enable HTTPS reverse proxy with Caddy. #
# #
# Steps: #
# 1. Replace <YOUR-PUBLIC-HOST-NAME> with your actual public domain name. #
# 2. Uncomment the caddy_data volume definition in the volumes section. #
# #
############################################################################
# https-proxy:
# image: caddy:latest
# container_name: ztnet-https-proxy
# restart: unless-stopped
# depends_on:
# - ztnet
# command: caddy reverse-proxy --from <YOUR-PUBLIC-HOST-NAME> --to ztnet:3000
# volumes:
# - caddy_data:/data
# networks:
# - app-network
# links:
# - ztnet
# ports:
# - "80:80"
# - "443:443"
volumes:
zerotier:
postgres-data:
# caddy_data:
networks:
app-network:
driver: bridge
ipam:
driver: default
config:
- subnet: 172.31.255.0/29
Verifying StarRocks "incremental" daily partition refresh on Paimon
Background
Note: "incremental" here means a T+1 daily refresh, not that the materialized view only refreshes the incremental rows inside a Paimon partition.
StarRocks does not yet support truly incremental materialized view refresh; it only supports refreshing a partition's data on a schedule. That is a pity, because fully re-reading the partition every time is costly. For example, if the orders table has 10,000 rows for today and the view refreshes by day, every refresh reprocesses those 10,000 rows.
Still, this refresh mode is basically good enough for T+1 scenarios, which are very common in BI; the sketch below illustrates the idea.
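The sketch referenced above illustrates the refresh model described here: the materialized view tracks which Paimon day-partitions have changed and re-computes those partitions in full, instead of applying row-level increments. This is a conceptual illustration only, not StarRocks internals.

# Conceptual sketch of partition-level ("T+1") refresh
paimon_partition_version = {"20241101": 3, "20241102": 5, "20241103": 1}  # a new day has appeared
mv_refreshed_version = {"20241101": 3, "20241102": 5}                     # MV state before the refresh

stale = [pt for pt, v in paimon_partition_version.items()
         if mv_refreshed_version.get(pt) != v]

for pt in stale:
    # the whole partition is re-read, even if only one row in it changed
    print(f"refresh partition {pt}")
    mv_refreshed_version[pt] = paimon_partition_version[pt]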
paimon 1.0
StarRocks 3.3.8
flink 1.20
paimon-s3-1.0.0.jar (put into the lib directory)
paimon-flink-action-1.0.0.jar (put into the lib directory)
paimon-flink-1.20-1.0.0.jar (put into the lib directory)
Goals:
- Sync the MySQL table into Paimon
Use Paimon CDC to sync the MySQL source orders table into the lake as the ODS-layer table ods_orders of our warehouse, partitioned by day.
- Create a daily-partition, incrementally refreshed materialized view through the Paimon catalog
Create the materialized view dwd_orders_mv in StarRocks on top of the partitioned Paimon table and verify whether StarRocks can refresh it day by day following Paimon's partitions.
Practice
MySQL table preparation
Prepare an orders table in MySQL with a creation-time column created; this column will be used for daily partitioning when syncing to Paimon.
-- create the orders table
CREATE TABLE `orders` (
`id` int NOT NULL AUTO_INCREMENT,
`created` datetime DEFAULT NULL,
`type` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
`k1` int DEFAULT NULL,
`v2` int DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
-- insert sample data
INSERT INTO `orders` VALUES(1,'2024-11-01 11:17:41',"1",1,1),(2,'2024-11-02 22:17:41',"1",1,1),(3,'2024-11-02 22:17:41',"1",1,1);
-- query the data
SELECT * FROM orders;
id created type k1 v2
1 2024-11-01 11:17:41 1 1 1
2 2024-11-02 22:17:41 1 1 1
3 2024-11-02 22:17:41 1 1 1
Paimon CDC sync with a partition key (ods_orders)
partition_keys is set to partition by day.
./bin/flink run -d \
./lib/paimon-flink-action-1.0.0.jar \
mysql_sync_table \
--warehouse s3://lakehouse/paimon-dev \
--database test \
--table ods_orders \
--primary_keys id,pt \
--partition_keys pt \
--computed_column 'pt=date_format(created, yyyyMMdd)' \
--mysql_conf hostname=192.168.103.113 \
--mysql_conf username=root \
--mysql_conf password=XXX \
--mysql_conf database-name=test \
--mysql_conf table-name='orders' \
--catalog_conf s3.endpoint=http://192.168.103.113:9010 \
--catalog_conf s3.path.style.access=true \
--catalog_conf s3.access-key=yTaSTipLTvJY86qkJGEO \
--catalog_conf s3.secret-key=SisnaqWR0uMxa52PjeZGq5HilQA1a3PRXDz4R8v4 \
--table_conf bucket=1 \
--table_conf changelog-producer=input
StarRocks: create the daily-partitioned materialized view (dwd_orders_mv)
- Create the materialized view
DROP MATERIALIZED VIEW `test`.`dwd_orders_mv`;
CREATE MATERIALIZED VIEW `test`.`dwd_orders_mv`
COMMENT "dwd-order"
REFRESH ASYNC START('2024-01-01 10:00:00') EVERY (interval 3 MINUTE)
PARTITION BY pt
ORDER BY( created )
PROPERTIES ( "partition_refresh_number" = "30" )
AS
SELECT
*
FROM paimon.test.ods_orders so
- Query the materialized view
SELECT * FROM `test`.`dwd_orders_mv`
created id type k1 v2 pt
2024-11-01 11:17:41 1 1 1 1 20241101
2024-11-02 22:17:41 3 1 1 1 20241102
2024-11-02 22:17:41 2 1 1 1 20241102
MySQL: add new data
insert into orders values(4,'2024-11-03 11:17:41',"1",1,1)
Check the partition refresh status
SELECT
*
FROM
information_schema.task_runs
WHERE
task_name IN (
SELECT
TASK_NAME
FROM
information_schema.materialized_views
WHERE
TABLE_NAME IN ( "dwd_orders_mv" ))
ORDER BY
TASK_NAME ASC,
CREATE_TIME DESC
LIMIT 10
Conclusion
After the 2024-11-03 row is added in MySQL, Paimon CDC syncs it into the 20241103 partition of the Paimon table, and StarRocks correctly refreshes the 2024-11-03 partition of the materialized view on its daily-partition schedule.