Cloud Native: Flink Native Session Kubernetes + Dinky Real-Time Computing Platform Setup in Practice
Background
Validation of a cloud-native Flink stream-computing platform solution.
The whole setup runs on cloud-native Kubernetes; in plain terms, Flink jobs run on k8s.
Environment Requirements
For deploying k8s itself, see the k8s-1.25.4 deployment notes (containerd).
Flink Native Kubernetes Cluster Deployment
- Prerequisites
- Kubernetes version >= 1.9
```
➜ ~ kubectl version --short
Client Version: v1.24.4
Kustomize Version: v4.5.4
Server Version: v1.24.4
```
- Make sure your ~/.kube/config file is correctly configured for access to the Kubernetes cluster:
```
➜ ~ export KUBECONFIG=~/.kube/config
➜ ~ kubectl get nodes
NAME          STATUS   ROLES           AGE   VERSION
k8s-master2   Ready    control-plane   9d    v1.24.4
k8s-node1     Ready    <none>          9d    v1.24.4
k8s-node2     Ready    <none>          9d    v1.24.4
k8s-node3     Ready    <none>          25h   v1.24.4
```
- Kubernetes DNS is enabled and working:
```
➜ ~ kubectl cluster-info
Kubernetes control plane is running at https://192.168.103.201:6443
CoreDNS is running at https://192.168.103.201:6443/api/v1/namespaces/kube-system/services/kube-dns:dns/proxy
```
- The account has RBAC permissions: make sure the <nameSpace> service account in your namespace has the RBAC permissions needed to create and delete Pods. I created a new namespace, flink-native:
```
kubectl create namespace flink-native
kubectl create serviceaccount flink-sa -n flink-native
kubectl create clusterrolebinding flinknative-role-binding-flinknative -n flink-native --clusterrole=edit --serviceaccount=flink-native:flink-sa
```
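A quick way to confirm the binding works is to check the service account's permissions with kubectl auth can-i (a verification step added here, not in the original steps):
```
# verify that flink-sa can create and delete Pods in the flink-native namespace
kubectl auth can-i create pods -n flink-native --as=system:serviceaccount:flink-native:flink-sa
kubectl auth can-i delete pods -n flink-native --as=system:serviceaccount:flink-native:flink-sa
```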
- Start the Flink cluster in k8s (Flink 1.20):
```
./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=flink-cluster1 \
-Dtaskmanager.memory.process.size=4096m \
-Dkubernetes.taskmanager.cpu=2 \
-Dtaskmanager.numberOfTaskSlots=4 \
-Dkubernetes.namespace=flink-native \
-Dkubernetes.service-account=flink-sa \
-Dresourcemanager.taskmanager-timeout=3600000
```
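Once the session is up, the native integration exposes a REST service named after the cluster-id; assuming Flink's default naming scheme (<cluster-id>-rest), the Flink Web UI can be reached by port-forwarding:
```
kubectl get svc -n flink-native
# forward the Flink Web UI (REST port 8081) to localhost
kubectl port-forward svc/flink-cluster1-rest 8081:8081 -n flink-native
```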
- Shut down the cluster (the deployment lives in the flink-native namespace):
```
kubectl delete deployment/flink-cluster1 -n flink-native
```
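Alternatively, following the attached-mode pattern from the Flink docs, the session can be stopped gracefully:
```
echo 'stop' | ./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=flink-cluster1 \
-Dkubernetes.namespace=flink-native \
-Dexecution.attached=true
```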
Dinky Stream-Computing Platform Deployment (Helm)
Create the PVCs:
```yaml
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: dinky-config-volume
  namespace: data-center
spec:
  storageClassName: nfs-client
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 5Gi
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: dinky-lib-volume
  namespace: data-center
spec:
  storageClassName: nfs-client
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 5Gi
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: dinky-resource-volume
  namespace: data-center
spec:
  storageClassName: nfs-client
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 5Gi
```
- dinky-config-volume: holds the configuration files (the conf directory files from the Helm chart)
- dinky-lib-volume: holds custom jar packages, mounted at /opt/dinky/customJar/
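Assuming the manifest above is saved as dinky-pvc.yaml (a filename chosen here for illustration), apply and verify it:
```
kubectl apply -f dinky-pvc.yaml
kubectl get pvc -n data-center
```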
Adjust the Helm Chart
- Deployment files
The Helm chart has gone unmaintained for a long time, so I made a few changes.
dinky.yaml: add volumes:
```yaml
volumes:
  - name: dinky-lib-volume
    persistentVolumeClaim:
      claimName: dinky-lib-volume
```
dinky.yaml: add volumeMounts:
```yaml
volumeMounts:
  - mountPath: /opt/dinky/customJar
    name: dinky-lib-volume
```
dinky.yaml: fix the wrong auto.sh path (it used to be /opt/dinky/auto.sh):
```yaml
command:
  - /bin/bash
  - '-c'
  - >-
    /opt/dinky/bin/auto.sh startOnPending {{ .Values.spec.extraEnv.flinkVersion }}
```
values.yaml: configure MySQL:
```yaml
mysql:
  enabled: true
  url: "192.168.103.113:3306"
  auth:
    username: "root"
    password: "XXX"
    database: "dinky"
```
Deploy
```
helm install dinky . -f values.yaml -n data-center
helm uninstall dinky -n data-center
```
Then register the Flink Native Kubernetes cluster created earlier inside Dinky.
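To check that the release came up and to reach the Dinky web UI (Dinky listens on port 8888 by default; the service name below assumes the chart's default naming):
```
kubectl get pods,svc -n data-center
kubectl port-forward svc/dinky 8888:8888 -n data-center
```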
Stream-Computing Practice
Practice 1: MySQL CDC connector writing to Paimon
A Dinky job of the Flink SQL type.
Open the Dinky page and create a new Flink SQL task; the job is built on Dinky's whole-database sync statement, EXECUTE CDCSOURCE demo WITH (...).
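As a minimal sketch (not the original job), a CDCSOURCE statement syncing the MySQL app_db database into a Paimon catalog could look like the following, assuming Dinky's documented CDCSOURCE options; hosts, credentials, and table patterns are placeholders:
```sql
EXECUTE CDCSOURCE demo WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.103.113',
  'port' = '3306',
  'username' = 'root',
  'password' = 'XXX',
  'checkpoint' = '10000',
  'scan.startup.mode' = 'initial',
  'parallelism' = '1',
  'table-name' = 'app_db\..*',
  -- sink side: write every captured table into a Paimon catalog
  'sink.connector' = 'sql-catalog',
  'sink.catalog.name' = 'paimon',
  'sink.catalog.type' = 'paimon',
  'sink.catalog.warehouse' = 's3://lakehouse-1253767413/paimon'
);
```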
Practice 2: Paimon CDC writing to Paimon
A Dinky job of the Flink Jar type (paimon-flink-action-0.9.0.jar).
Open the Dinky page and create a new Flink Jar task.
- Original submit command:
```
./bin/flink run \
    ./lib/paimon-flink-action-0.9.0.jar \
    mysql_sync_database \
    --warehouse s3://lakehouse-1253767413/paimon \
    --database app_db \
    --mysql_conf hostname=192.168.103.113 \
    --mysql_conf username=root \
    --mysql_conf password=XXX \
    --mysql_conf database-name=app_db \
    --catalog_conf s3.endpoint=cos.ap-guangzhou.myqcloud.com \
    --catalog_conf s3.access-key=XXX \
    --catalog_conf s3.secret-key=XXX \
    --table_conf bucket=1
```
- Dinky job
Upload the paimon-flink-action-0.9.0.jar package via the Resource Center, then fill in the program path, main class, and program arguments according to the original command above, as sketched below.
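A sketch of how the command maps onto the Flink Jar task fields; the rs:/ path and the entry class are assumptions on my part (Paimon's action jar declares org.apache.paimon.flink.action.FlinkActions as its manifest main class):
```
# program path: where the Resource Center stored the jar (path assumed)
rs:/paimon-flink-action-0.9.0.jar
# main class (assumed from the jar manifest)
org.apache.paimon.flink.action.FlinkActions
# program arguments: everything after the jar in the original command
mysql_sync_database --warehouse s3://lakehouse-1253767413/paimon --database app_db ... --table_conf bucket=1
```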
Practice 3: Flink CDC pipeline writing to Paimon
A Dinky job of the Flink SQL type.
Open the Dinky page and create a new Flink SQL task.
To be verified: whether the just-released Flink CDC 3.3 can now write to Paimon; in earlier tests the Flink CDC pipeline could not write to Paimon successfully (see the earlier post "Paimon CDC lake ingestion & StarRocks lakehouse analytics practice").
Example:
```sql
SET 'execution.checkpointing.interval' = '10s';
```
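A minimal sketch of a full pipeline definition, assuming Dinky's EXECUTE PIPELINE WITHYAML wrapper around a standard Flink CDC pipeline YAML with the Paimon sink; hosts, credentials, and table patterns are placeholders:
```sql
SET 'execution.checkpointing.interval' = '10s';

EXECUTE PIPELINE WITHYAML (
source:
  type: mysql
  hostname: 192.168.103.113
  port: 3306
  username: root
  password: XXX
  tables: app_db.\.*

sink:
  type: paimon
  catalog.properties.metastore: filesystem
  catalog.properties.warehouse: s3://lakehouse-1253767413/paimon

pipeline:
  name: mysql-to-paimon
  parallelism: 1
)
```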
Other References
Native Kubernetes
Flink 1.10 Native Kubernetes: Principles and Practice
Flink on Kubernetes - Native Kubernetes - Setting Up the Base Environment
Flink CDC + Dinky sync to Paimon (HDFS)
Flink CDC pipeline + Dinky whole-database sync to StarRocks