Cloud Native in Practice: Building a Real-Time Computing Platform with Flink Native Session on Kubernetes + Dinky

Background

Validating a cloud-native Flink stream-computing platform solution.

The whole setup is based on cloud-native Kubernetes; in plain terms, the Flink jobs run on k8s.

Environment Requirements

For deploying Kubernetes itself, see the k8s-1.25.4 deployment notes (containerd).

  • Prerequisites
    • Kubernetes version >= 1.9
      ➜  ~ kubectl version --short
      Client Version: v1.24.4
      Kustomize Version: v4.5.4
      Server Version: v1.24.4
    • Make sure your ~/.kube/config file is correctly configured for access to the Kubernetes cluster
      ➜  ~ export KUBECONFIG=~/.kube/config
      ➜  ~ kubectl get nodes
      NAME          STATUS   ROLES           AGE   VERSION
      k8s-master2   Ready    control-plane   9d    v1.24.4
      k8s-node1    Ready    <none>          9d    v1.24.4
      k8s-node2    Ready    <none>          9d    v1.24.4
      k8s-node3    Ready    <none>          25h   v1.24.4
    • Check that Kubernetes DNS is enabled and working
      ➜  ~ kubectl cluster-info
      Kubernetes control plane is running at https://192.168.103.201:6443
      CoreDNS is running at https://192.168.103.201:6443/api/v1/namespaces/kube-system/services/kube-dns:dns/proxy
    • RBAC permissions: make sure the <nameSpace> service account in your namespace has the RBAC permissions needed to create and delete Pods. Here I create a new namespace called flink-native; a quick permission check follows the commands below.
      kubectl create namespace flink-native
      kubectl create serviceaccount flink-sa -n flink-native
      kubectl create clusterrolebinding flinknative-role-binding-flinknative -n flink-native --clusterrole=edit --serviceaccount=flink-native:flink-sa
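      With the binding in place, a quick sanity check via kubectl auth can-i (not part of the original setup) confirms the service account can manage Pods:

      kubectl auth can-i create pods -n flink-native --as=system:serviceaccount:flink-native:flink-sa
      kubectl auth can-i delete pods -n flink-native --as=system:serviceaccount:flink-native:flink-sa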
  • Start a Flink cluster on k8s

    Flink 1.20

    ./bin/kubernetes-session.sh \
    -Dkubernetes.cluster-id=flink-cluster1 \
    -Dtaskmanager.memory.process.size=4096m \
    -Dkubernetes.taskmanager.cpu=2 \
    -Dtaskmanager.numberOfTaskSlots=4 \
    -Dkubernetes.namespace=flink-native \
    -Dkubernetes.service-account=flink-sa \
    -Dresourcemanager.taskmanager-timeout=3600000
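    Once the session is up, the JobManager Deployment plus a rest Service named after the cluster-id should appear in the namespace; a quick way to check, and to reach the web UI locally (a sketch, assuming the settings above):

    kubectl get deployment,pods,svc -n flink-native
    # the JobManager web UI listens on 8081 behind the <cluster-id>-rest service
    kubectl port-forward svc/flink-cluster1-rest 8081:8081 -n flink-native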
  • Shut down the cluster (the Deployment lives in the flink-native namespace, so the namespace flag is needed)
    kubectl delete deployment/flink-cluster1 -n flink-native
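    Alternatively, the Flink docs describe an attached shutdown that stops the session cleanly (a sketch, reusing the ids above):

    echo 'stop' | ./bin/kubernetes-session.sh \
    -Dkubernetes.cluster-id=flink-cluster1 \
    -Dkubernetes.namespace=flink-native \
    -Dexecution.attached=true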

Dinky Stream-Computing Platform Deployment (Helm)

  • Create PVCs

    kind: PersistentVolumeClaim
    apiVersion: v1
    metadata:
      name: dinky-config-volume
      namespace: data-center
    spec:
      storageClassName: nfs-client
      accessModes:
        - ReadWriteMany
      resources:
        requests:
          storage: 5Gi
    ---
    kind: PersistentVolumeClaim
    apiVersion: v1
    metadata:
      name: dinky-lib-volume
      namespace: data-center
    spec:
      storageClassName: nfs-client
      accessModes:
        - ReadWriteMany
      resources:
        requests:
          storage: 5Gi
    ---
    kind: PersistentVolumeClaim
    apiVersion: v1
    metadata:
      name: dinky-resource-volume
      namespace: data-center
    spec:
      storageClassName: nfs-client
      accessModes:
        - ReadWriteMany
      resources:
        requests:
          storage: 5Gi
    • dinky-config-volume holds the configuration files (the conf directory files from the Helm chart)
    • dinky-lib-volume holds custom jar packages, mounted at /opt/dinky/customJar/
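    • Before installing the chart, you can confirm all three claims are Bound (a generic check, not in the original steps):
      kubectl get pvc -n data-center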
  • Adjust the Helm chart

    • Deployment files

      The Helm chart has not been maintained for a long time, so I modified it.

    • dinky.yaml: add volumes:
      volumes:
        - name: dinky-lib-volume
          persistentVolumeClaim:
            claimName: dinky-lib-volume
    • dinky.yaml: add volumeMounts:
      volumeMounts:
        - mountPath: /opt/dinky/customJar
          name: dinky-lib-volume
    • dinky.yaml: fix the wrong auto.sh path; it used to be /opt/dinky/auto.sh
      command:
        - /bin/bash
        - '-c'
        - >-
          /opt/dinky/bin/auto.sh startOnPending {{ .Values.spec.extraEnv.flinkVersion }}
    • values.yaml: configure MySQL
      mysql:
        enabled: true
        url: "192.168.103.113:3306"
        auth:
          username: "root"
          password: "XXX"
          database: "dinky"
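      Dinky initializes its tables on first start, but the database itself typically has to exist already; creating it up front avoids a crash loop (a sketch; host and credentials follow the values above):
      mysql -h 192.168.103.113 -uroot -p -e "CREATE DATABASE IF NOT EXISTS dinky DEFAULT CHARACTER SET utf8mb4;"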
  • Deploy

    # install
    helm install dinky . -f values.yaml -n data-center
    # uninstall when no longer needed
    helm uninstall dinky -n data-center
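    After installing, check that the release deployed and its pod comes up (generic Helm/kubectl checks):

    helm status dinky -n data-center
    kubectl get pods -n data-center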
  • In Dinky, register the Flink Native Kubernetes cluster created above

Stream Computing in Practice

Practice 1: MySQL CDC connector writing to Paimon

This uses Dinky's Flink SQL job type.
Open the Dinky UI and create a new Flink SQL task.

EXECUTE CDCSOURCE demo WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '127.0.0.1',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'checkpoint' = '10000',
  'scan.startup.mode' = 'initial',
  'parallelism' = '1',
  'table-name' = 'test\..*',
  'sink.connector' = 'sql-catalog',
  'sink.catalog.name' = 'fts',
  'sink.catalog.type' = 'table-store',
  'sink.catalog.warehouse' = 'file:/tmp/table_store',
  'sink.auto-create' = 'true' -- Paimon tables can be created automatically
);
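To spot-check what landed in the warehouse, the same catalog can be opened from any Flink SQL task (a sketch; 'paimon' is the catalog type name in current releases, and the synced tables take their names from the source database):

CREATE CATALOG fts WITH (
  'type' = 'paimon',
  'warehouse' = 'file:/tmp/table_store'
);
USE CATALOG fts;
SHOW TABLES;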

Practice 2: Paimon CDC ingestion into Paimon

This uses Dinky's Flink JAR job type (paimon-flink-action-0.9.0.jar).
Open the Dinky UI and create a new Flink JAR task.

  • Original submit command:
    ./bin/flink run \
    ./lib/paimon-flink-action-0.9.0.jar \
    mysql_sync_database \
    --warehouse s3://lakehouse-1253767413/paimon \
    --database app_db \
    --mysql_conf hostname=192.168.103.113 \
    --mysql_conf username=root \
    --mysql_conf password=XXX \
    --mysql_conf database-name=app_db \
    --catalog_conf s3.endpoint=cos.ap-guangzhou.myqcloud.com \
    --catalog_conf s3.access-key=XXX \
    --catalog_conf s3.secret-key=XXX \
    --table_conf bucket=1
  • Dinky job

    Upload the paimon-flink-action-0.9.0.jar package via the Resource Center, then, following the original command above, fill in the program path, main class, and program arguments. The mapping is sketched below.
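    For reference, the original command maps onto Dinky's form fields roughly like this (my reading of the UI; the main class below is an assumption taken from the action jar's manifest entry point):
    # program path:  the paimon-flink-action-0.9.0.jar uploaded to the Resource Center
    # main class:    org.apache.paimon.flink.action.FlinkActions  (assumed manifest Main-Class; Dinky can usually infer it)
    # program args:  mysql_sync_database --warehouse s3://... (everything after the jar in the command above)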

This uses Dinky's Flink SQL job type.
Open the Dinky UI and create a new Flink SQL task.

To be verified: whether the newly released Flink CDC 3.3 can write to Paimon now. Earlier tests showed the Flink CDC pipeline could not write to Paimon successfully; see Paimon CDC lake ingestion & StarRocks lakehouse analytics in practice.
Example:

SET 'execution.checkpointing.interval' = '10s';
EXECUTE PIPELINE WITHYAML (
source:
  type: mysql
  name: MySQL Source
  hostname: 127.0.0.1
  port: 3306
  username: admin
  password: pass
  tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
  server-id: 5401-5404

sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: filesystem
  catalog.properties.warehouse: /path/warehouse

pipeline:
  name: MySQL to Paimon Pipeline
  parallelism: 1
)

Other References

Native Kubernetes
Flink 1.10 Native Kubernetes: Principles and Practice
Flink on Kubernetes - Native Kubernetes - Setting Up the Base Environment
Flink CDC + Dinky sync to Paimon (HDFS)
Flink CDC pipeline + Dinky whole-database sync to StarRocks