背景

物理机器有显卡L20 (4 * 48G),安装vmware esxi系统,希望虚拟机rocky linux 10 直通物理显卡。

  • vmware ESXi-8.0U2-22380479
  • rocky linux 10

直通模式配置

vmware esxi 配置

由于ESXi 8.0没有对12代以后的CPU做默认的支持,所以在安装ESXi 8.0过程中会遇到紫屏的问题。为了解决这一问题,需要对ESXi的启动参数进行修改。

    1. 修改启动参数
      在ESXi启动界面,按 Shift + O 进入启动选项编辑模式,然后添加以下参数:cpuUniformityHardCheckPanic=FALSE
      完整的启动参数示例如下:kernelopt=… cpuUniformityHardCheckPanic=FALSE
    1. 启动后通过SSH调整系统配置
      启动进入ESXi系统后,通过SSH连接到主机,并执行以下命令以永久应用修改:
      1
      esxcli system settings kernel set -s cpuUniformityHardCheckPanic -v FALSE
      此命令将启动参数持久化,确保每次启动ESXi时都应用该设置,防止紫屏问题再次出现。
    1. 配置NVIDIA显卡直通
      为了在虚拟机中成功直通NVIDIA显卡,还需要进行特定的配置
    • lspci -v | grep -i nvidia
      1
      2
      3
      4
      0000:81:00.0 Display controller 3D controller: NVIDIA Corporation Device 26ba
      0000:82:00.0 Display controller 3D controller: NVIDIA Corporation Device 26ba
      0000:c1:00.0 Display controller 3D controller: NVIDIA Corporation Device 26ba
      0000:c2:00.0 Display controller 3D controller: NVIDIA Corporation Device 26ba
    • lspci -v | grep -i nvidia -A1
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      0000:81:00.0 Display controller 3D controller: NVIDIA Corporation Device 26ba
      Class 0302: 10de:26ba
      --
      0000:82:00.0 Display controller 3D controller: NVIDIA Corporation Device 26ba
      Class 0302: 10de:26ba
      --
      0000:c1:00.0 Display controller 3D controller: NVIDIA Corporation Device 26ba
      Class 0302: 10de:26ba
      --
      0000:c2:00.0 Display controller 3D controller: NVIDIA Corporation Device 26ba
      Class 0302: 10de:26ba
    • 编辑ESXi配置文件:
      通过SSH连接到ESXi主机,执行以下命令将显卡设备标记为直通设备:
      1
      2
      3
      4
      echo '/device/0000:c2:00.0/owner = "passthru"' >> /etc/vmware/esx.conf
      echo '/device/0000:81:00.0/owner = "passthru"' >> /etc/vmware/esx.conf
      echo '/device/0000:c1:00.0/owner = "passthru"' >> /etc/vmware/esx.conf
      echo '/device/0000:82:00.0/owner = "passthru"' >> /etc/vmware/esx.conf
      其中,0000:01:00.0为NVIDIA显卡的PCI地址,请根据实际硬件调整。
    • 更新Passthru映射文件:
      为了确保显卡的各项功能能够正确直通,需在passthru.map文件中添加相关配置。执行以下命令:
      1
      2
      3
      echo '10de 26ba bridge false' >> /etc/vmware/passthru.map
      echo '10de 26ba link false' >> /etc/vmware/passthru.map
      echo '10de 26ba d3d0 false' >> /etc/vmware/passthru.map
      这里的10de 26ba代表NVIDIA显卡的厂商ID和设备ID,请根据你的显卡型号进行相应调整。
    • 重启ESXi服务:
      为使上述配置生效,需要重启ESXi的管理代理服务。执行以下命令:
      1
      2
      /etc/init.d/hostd restart
      /etc/init.d/vpxa restart
      或者,您也可以选择重启整个ESXi主机。

虚拟机配置

  • 创建虚拟机及配置参数
    • 预留全部内存
      wUml1N
    • 在“虚拟机设置”中,点击“添加其他设备” > “PCI设备”,在列表中找到并勾选NVIDIA显卡(例如,0000:c1:00.0)。
      PZmAZx
    • 取消UEFI安全启动
      jQILo6

为了优化GPU直通的性能和兼容性,需要在虚拟机的高级配置中添加以下参数:

1
2
3
4
5
6
# 该参数用于隐藏虚拟化环境,使得虚拟机能够更好地识别并利用NVIDIA显卡。
hypervisor.cpuid.v0 = FALSE
# 启用64位内存映射输入/输出(MMIO),提高显卡的内存访问效率。
pciPassthru.use64bitMMIO = TRUE
# 设置64位MMIO的大小,确保显卡有足够的内存资源进行高性能计算和渲染任务。
pciPassthru.64bitMMIOSizeGB = 128

pciPassthru.64bitMMIOSizeGB计算方式 https://earlruby.org/tag/use64bitmmio/
64bitMMIOSizeGB值的计算方法是将连接到VM的所有GPU上的显存总量(GB)相加。如果总显存为2的幂次方,则将pciPassthru.64bitMMIOSizeGB设置为下一个2的幂次方即可。
如果总显存介于2的2次方之间,则向上舍入到下一个2的幂次方,然后再次向上舍入。
2的幂数是2、4、8、16、32、64、128、256、512、1024…
例如虚拟机直通两张24G显存的显卡,则64bitMMIOSizeGB应设置为128。计算方式为24*2=48,在32和64之间,先舍入到64,再次舍入到128

BCylVm

linux 驱动安装

  • 内核版本及更新

    1
    2
    uname -r
    sudo dnf update -y
  • 安装基本开发工具

    1
    2
    sudo dnf install epel-release -y
    sudo dnf groupinstall "Development Tools" -y
  • 内核制备

    安装内核开发包;提高兼容性,安装与当前内核版本匹配的内核头文件

    1
    2
    sudo dnf install kernel-devel -y
    sudo dnf install kernel-headers-$(uname -r) -y
  • 安装 Dynamic Kernel Module Support (DKMS)

    DKMS 会在内核更新发生时自动重建内核模块,确保您的 NVIDIA 驱动程序在系统更新后仍然正常运行:

    1
    sudo dnf install dkms -y
  • 将官方 NVIDIA CUDA 存储库添加到您的系统。对于 Rocky Linux 10,请使用 RHEL 9 兼容存储库:

    1
    2
    3
    sudo dnf config-manager --add-repo http://developer.download.nvidia.com/compute/cuda/repos/rhel9/$(uname -i)/cuda-rhel9.repo
    # 更新您的包缓存以识别新的仓库:
    sudo dnf makecache
  • 在安装实际的 NVIDIA 驱动程序之前,请确保存在所有必要的依赖项:

    1
    sudo dnf install kernel-headers-$(uname -r) kernel-devel-$(uname -r) tar bzip2 make automake gcc gcc-c++ pciutils elfutils-libelf-devel libglvnd-opengl libglvnd-glx libglvnd-devel acpid pkgconfig dkms -y
  • 禁用 Nouveau 驱动程序

    Nouveau 驱动程序提供基本功能,但缺乏 CUDA 支持、最佳游戏性能和专业工作站功能等高级功能。专有的 NVIDIA 驱动程序提供完整的功能集和更好的性能优化。
    使用 grubby 命令修改内核参数并将 Nouveau 驱动程序列入黑名单:

    1
    sudo grubby --args="nouveau.modeset=0 rd.driver.blacklist=nouveau" --update-kernel=ALL
    • 为了提高安全性,创建一个黑名单配置文件
      1
      2
      echo "blacklist nouveau" | sudo tee /etc/modprobe.d/blacklist-nouveau.conf
      echo 'omit_drivers+=" nouveau "' | sudo tee /etc/dracut.conf.d/blacklist-nouveau.conf
    • 重新生成初始 RAM 文件系统:
      1
      2
      sudo dracut --regenerate-all --force
      sudo depmod -a
  • 基本驱动程序验证

    1
    2
    3
    yum install pciutils
    sudo lspci | grep NVIDIA
    nvidia-smi
  • GUI 验证方法

    1
    nvidia-settings

参考链接

从坑中爬起:ESXi 8.0直通NVIDIA显卡的血泪经验
How To Install Nvidia Drivers on Rocky Linux 10

Qwen3概述

  • 多种思考模式

    可用户提示或系统消息中添加 /think 和 /no_think 来逐轮切换模型的思考模式

    • 思考模式:在这种模式下,模型会逐步推理,经过深思熟虑后给出最终答案。这种方法非常适合需要深入思考的复杂问题。
    • 非思考模式:在此模式中,模型提供快速、近乎即时的响应,适用于那些对速度要求高于深度的简单问题。
  • 多语言
    119 种语言和方言

  • MCP 支持

Qwen3-30B-A3B

  • 一个拥有约 300 亿总参数和 30 亿激活参数的小型 MoE 模型
  • 需24GB+显存

Qwen3-Embedding & Qwen3-Reranker

Model Type Models Size Layers Sequence Length Embedding Dimension MRL Support Instruction Aware
Text Embedding Qwen3-Embedding-0.6B 0.6B 28 32K 1024 Yes Yes
Text Embedding Qwen3-Embedding-4B 4B 36 32K 2560 Yes Yes
Text Embedding Qwen3-Embedding-8B 8B 36 32K 4096 Yes Yes
Text Reranking Qwen3-Reranker-0.6B 0.6B 28 32K - - Yes
Text Reranking Qwen3-Reranker-4B 4B 36 32K - - Yes
Text Reranking Qwen3-Reranker-8B 8B 36 32K - - Yes
  • 经济型:Embedding-4B + Reranker-4B(显存总需求<30GB)
  • 高性能型:Embedding-8B + Reranker-8B(需多GPU,吞吐量提升40%+)

对比BGE-M3:全方位代差优势

指标 Qwen3-8B BGE-M3 优势幅度
综合得分 70.58 59.56 ↑11.02
上下文长度 32K 8K ↑ 4倍
检索任务(MSMARCO) 57.65 40.88 ↑41%
开放问答(NQ) 10.06 -3.11 实现负分逆转
多语言理解 28.66 20.10 ↑42%

vllm 安装

  • uv(首选)

    1
    2
    3
    uv venv llvm --python 3.12 --seed
    source llvm/bin/activate
    uv pip install vllm
  • conda(有 license 问题)

    1
    2
    3
    4
    5
    conda env list ## 查看conda创建的所以虚拟环境
    conda create -n llvm python=3.12 ## 创建特定版本python
    conda activate llvm ## 进入某个虚拟环境
    conda env remove -n llvm ## 删除某个虚拟环境
    pip install vllm

模型下载

1
2
3
4
5
6
7
8
## 安装下载环境
pip install modelscope
## 下载模型
modelscope download --model Qwen/Qwen3-30B-A3B
## 下载指定目录下
modelscope download --model Qwen/Qwen3-30B-A3B --local_dir /home/models
modelscope download --model Qwen/Qwen3-Embedding-8B --local_dir /home/models
modelscope download --model Qwen/Qwen3-Reranker-8B --local_dir /home/models

vllm 服务启动

vllm serve <model_path>

1
2
3
4
5
6
7
vllm serve /home/models/Qwen3-30B-A3B \
--port 8000 \
--host 0.0.0.0 \
--gpu-memory-utilization 0.95 \
--enable-reasoning \
--reasoning-parser deepseek_r1 \
--served-model-name Qwen3-30B-A3B

参数相关

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
usage: vllm serve [-h] [--model MODEL]
[--task {auto,generate,embedding,embed,classify,score,reward,transcription}]
[--tokenizer TOKENIZER] [--hf-config-path HF_CONFIG_PATH]
[--skip-tokenizer-init] [--revision REVISION]
[--code-revision CODE_REVISION]
[--tokenizer-revision TOKENIZER_REVISION]
[--tokenizer-mode {auto,slow,mistral,custom}]
[--trust-remote-code]
[--allowed-local-media-path ALLOWED_LOCAL_MEDIA_PATH]
[--download-dir DOWNLOAD_DIR]
[--load-format {auto,pt,safetensors,npcache,dummy,tensorizer,sharded_state,gguf,bitsandbytes,mistral,runai_streamer}]
[--config-format {auto,hf,mistral}]
[--dtype {auto,half,float16,bfloat16,float,float32}]
[--kv-cache-dtype {auto,fp8,fp8_e5m2,fp8_e4m3}]
[--max-model-len MAX_MODEL_LEN]
[--guided-decoding-backend GUIDED_DECODING_BACKEND]
[--logits-processor-pattern LOGITS_PROCESSOR_PATTERN]
[--model-impl {auto,vllm,transformers}]
[--distributed-executor-backend {ray,mp,uni,external_launcher}]
[--pipeline-parallel-size PIPELINE_PARALLEL_SIZE]
[--tensor-parallel-size TENSOR_PARALLEL_SIZE]
[--enable-expert-parallel]
[--max-parallel-loading-workers MAX_PARALLEL_LOADING_WORKERS]
[--ray-workers-use-nsight] [--block-size {8,16,32,64,128}]
[--enable-prefix-caching | --no-enable-prefix-caching]
[--disable-sliding-window] [--use-v2-block-manager]
[--num-lookahead-slots NUM_LOOKAHEAD_SLOTS] [--seed SEED]
[--swap-space SWAP_SPACE] [--cpu-offload-gb CPU_OFFLOAD_GB]
[--gpu-memory-utilization GPU_MEMORY_UTILIZATION]
[--num-gpu-blocks-override NUM_GPU_BLOCKS_OVERRIDE]
[--max-num-batched-tokens MAX_NUM_BATCHED_TOKENS]
[--max-num-partial-prefills MAX_NUM_PARTIAL_PREFILLS]
[--max-long-partial-prefills MAX_LONG_PARTIAL_PREFILLS]
[--long-prefill-token-threshold LONG_PREFILL_TOKEN_THRESHOLD]
[--max-num-seqs MAX_NUM_SEQS] [--max-logprobs MAX_LOGPROBS]
[--disable-log-stats]
[--quantization {aqlm,awq,deepspeedfp,tpu_int8,fp8,ptpc_fp8,fbgemm_fp8,modelopt,nvfp4,marlin,gguf,gptq_marlin_24,gptq_marlin,awq_marlin,gptq,compressed-tensors,bitsandbytes,qqq,hqq,experts_int8,neuron_quant,ipex,quark,moe_wna16,None}]
[--rope-scaling ROPE_SCALING] [--rope-theta ROPE_THETA]
[--hf-overrides HF_OVERRIDES] [--enforce-eager]
[--max-seq-len-to-capture MAX_SEQ_LEN_TO_CAPTURE]
[--disable-custom-all-reduce]
[--tokenizer-pool-size TOKENIZER_POOL_SIZE]
[--tokenizer-pool-type TOKENIZER_POOL_TYPE]
[--tokenizer-pool-extra-config TOKENIZER_POOL_EXTRA_CONFIG]
[--limit-mm-per-prompt LIMIT_MM_PER_PROMPT]
[--mm-processor-kwargs MM_PROCESSOR_KWARGS]
[--disable-mm-preprocessor-cache] [--enable-lora]
[--enable-lora-bias] [--max-loras MAX_LORAS]
[--max-lora-rank MAX_LORA_RANK]
[--lora-extra-vocab-size LORA_EXTRA_VOCAB_SIZE]
[--lora-dtype {auto,float16,bfloat16}]
[--long-lora-scaling-factors LONG_LORA_SCALING_FACTORS]
[--max-cpu-loras MAX_CPU_LORAS] [--fully-sharded-loras]
[--enable-prompt-adapter]
[--max-prompt-adapters MAX_PROMPT_ADAPTERS]
[--max-prompt-adapter-token MAX_PROMPT_ADAPTER_TOKEN]
[--device {auto,cuda,neuron,cpu,openvino,tpu,xpu,hpu}]
[--num-scheduler-steps NUM_SCHEDULER_STEPS]
[--use-tqdm-on-load | --no-use-tqdm-on-load]
[--multi-step-stream-outputs [MULTI_STEP_STREAM_OUTPUTS]]
[--scheduler-delay-factor SCHEDULER_DELAY_FACTOR]
[--enable-chunked-prefill [ENABLE_CHUNKED_PREFILL]]
[--speculative-model SPECULATIVE_MODEL]
[--speculative-model-quantization {aqlm,awq,deepspeedfp,tpu_int8,fp8,ptpc_fp8,fbgemm_fp8,modelopt,nvfp4,marlin,gguf,gptq_marlin_24,gptq_marlin,awq_marlin,gptq,compressed-tensors,bitsandbytes,qqq,hqq,experts_int8,neuron_quant,ipex,quark,moe_wna16,None}]
[--num-speculative-tokens NUM_SPECULATIVE_TOKENS]
[--speculative-disable-mqa-scorer]
[--speculative-draft-tensor-parallel-size SPECULATIVE_DRAFT_TENSOR_PARALLEL_SIZE]
[--speculative-max-model-len SPECULATIVE_MAX_MODEL_LEN]
[--speculative-disable-by-batch-size SPECULATIVE_DISABLE_BY_BATCH_SIZE]
[--ngram-prompt-lookup-max NGRAM_PROMPT_LOOKUP_MAX]
[--ngram-prompt-lookup-min NGRAM_PROMPT_LOOKUP_MIN]
[--spec-decoding-acceptance-method {rejection_sampler,typical_acceptance_sampler}]
[--typical-acceptance-sampler-posterior-threshold TYPICAL_ACCEPTANCE_SAMPLER_POSTERIOR_THRESHOLD]
[--typical-acceptance-sampler-posterior-alpha TYPICAL_ACCEPTANCE_SAMPLER_POSTERIOR_ALPHA]
[--disable-logprobs-during-spec-decoding [DISABLE_LOGPROBS_DURING_SPEC_DECODING]]
[--model-loader-extra-config MODEL_LOADER_EXTRA_CONFIG]
[--ignore-patterns IGNORE_PATTERNS]
[--preemption-mode PREEMPTION_MODE]
[--served-model-name SERVED_MODEL_NAME [SERVED_MODEL_NAME ...]]
[--qlora-adapter-name-or-path QLORA_ADAPTER_NAME_OR_PATH]
[--show-hidden-metrics-for-version SHOW_HIDDEN_METRICS_FOR_VERSION]
[--otlp-traces-endpoint OTLP_TRACES_ENDPOINT]
[--collect-detailed-traces COLLECT_DETAILED_TRACES]
[--disable-async-output-proc]
[--scheduling-policy {fcfs,priority}]
[--scheduler-cls SCHEDULER_CLS]
[--override-neuron-config OVERRIDE_NEURON_CONFIG]
[--override-pooler-config OVERRIDE_POOLER_CONFIG]
[--compilation-config COMPILATION_CONFIG]
[--kv-transfer-config KV_TRANSFER_CONFIG]
[--worker-cls WORKER_CLS]
[--worker-extension-cls WORKER_EXTENSION_CLS]
[--generation-config GENERATION_CONFIG]
[--override-generation-config OVERRIDE_GENERATION_CONFIG]
[--enable-sleep-mode] [--calculate-kv-scales]
[--additional-config ADDITIONAL_CONFIG] [--enable-reasoning]
[--reasoning-parser {deepseek_r1}]

docker compose 部署

  • Qwen3-30B-A3B

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    version: '3.8'

    services:
    qwen3:
    image: vllm/vllm-openai:latest # 使用最新的vLLM镜像
    container_name: qwen3-vllm
    restart: unless-stopped
    environment:
    - HF_ENDPOINT=https://hf-mirror.com
    - MODEL_PATH=/models/Qwen3-30B-A3B # 模型路径
    - SERVED_MODEL_NAME=Qwen3-30B-A3B # 对外服务时使用的模型名称
    - API_KEY=dakewe # 设置API密钥
    - MEMORY_UTILIZATION=0.95 # GPU内存利用率,接近但不超过显存限制
    volumes:
    - ./models:/models # 将本地的模型目录挂载到容器内的/models路径
    runtime: nvidia # 使用NVIDIA运行时以支持GPU
    deploy:
    resources:
    reservations:
    devices:
    - driver: nvidia
    count: all
    capabilities: [gpu]
    ports:
    - "8000:8000" # 将容器的8000端口映射到主机的8000端口
  • Qwen3-Embedding-8B

    临时,等vllm-openai 支持
    issue: https://github.com/vllm-project/vllm/issues/19229

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    services:
    Qwen3-Embedding-8B:
    container_name: Qwen3-Embedding-8B
    restart: no
    image: dengcao/vllm-openai:v0.9.2-dev #采用vllm最新的开发版制作的镜像,经测试正常,可放心使用
    ipc: host
    volumes:
    - ./models:/models
    command: ["--model", "/models/Qwen3-Embedding-8B", "--served-model-name", "Qwen3-Embedding-8B", "--gpu-memory-utilization", "0.90"]
    ports:
    - 8001:8000
    deploy:
    resources:
    reservations:
    devices:
    - driver: nvidia
    count: all
    capabilities: [gpu]
  • Qwen3-Reranker-8B

    临时,等vllm-openai 支持
    issue: https://github.com/vllm-project/vllm/issues/19229

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    services:  
    Qwen3-Reranker-8B:
    container_name: Qwen3-Reranker-8B
    restart: no
    image: dengcao/vllm-openai:v0.9.2-dev #采用vllm最新的开发版制作的镜像,经在NVIDIA RTX3060平台主机上测试正常,可放心使用。
    ipc: host
    volumes:
    - ./models:/models
    command: ['--model', '/models/Qwen3-Reranker-8B', '--served-model-name', 'Qwen3-Reranker-8B', '--gpu-memory-utilization', '0.90', '--hf_overrides','{"architectures": ["Qwen3ForSequenceClassification"],"classifier_from_token": ["no", "yes"],"is_original_qwen3_reranker": true}']
    deploy:
    resources:
    reservations:
    devices:
    - driver: nvidia
    count: all
    capabilities: [gpu]
    ports:
    - 8002:8000

背景

解决文档内容提取

前期调研

整体上看,大部分都是基于PaddleOCR基础上集成,比较有特点的是 MinerU

部署

docker-compose.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
version: '3.8'

services:
mineru-api:
image: mineru-api-full
container_name: mineru-api
runtime: nvidia # 需要NVIDIA Container Runtime支持
deploy:
resources:
reservations:
devices:
- driver: nvidia
count: all
capabilities: [gpu]
shm_size: 32g
ports:
- "8888:8888"
- "30000:30000"
restart: unless-stopped

相关链接

基于MinerU 2.0的PDF解析API

背景

我们在过往尝试过使用paimon进行分层 Flink & Paimon & StarRocks & Dinky 流式湖仓分层实践验证,一共制作了如下4张表,其中流任务的merge-engine分别为:

  • order_dw.dwd_orders (partial-update)
  • order_dw.dwm_users_shops (aggregation )
  • order_dw.dws_users (aggregation )
  • order_dw.dws_shops (aggregation )

我们经常会有这样的疑问:

1、假设mysql源表中的order_dw.orders的buy_fee在下游paimon流任务表dwm_users_shopsdws_usersdws_shops中都已经完成聚合统计后,这时候,这时候突然对order_dw.orders表中的某一条数据就行修改纠正,正常的业务场景就是改价,那么下游的aggregation任务会是如何?数据是否会纠正?纠正的原理和逻辑是什么?
2、假设我的dwm_users_shops任务突然终止,并且需要从某一个checkpoint/savepoint恢复,那么这时候aggregation任务又会是如何处理聚合数据的?

带着这个疑惑,我们来实践验证一下

实践

我们以Flink & Paimon & StarRocks & Dinky 流式湖仓分层实践验证为基础

目标

本文通过huggingface的Transform类进行BERT的文本分类代码训练与验证,数据集采用网上整理包括正向和负向评论的携程网数据

通过实战完整地去掌握完整代码步骤,包括:

  • 数据的加载
  • 创建数据集
  • 划分训练集和验证集
  • 创建模型和优化器
  • 模型的训练
  • 模型的验证
  • 模型的预测

实现当输入一个对酒店的评价的一段文字,模型输出对于这个酒店的分析,判断是正向评价还是负面评价

BERT适用场景

BERT(Bidirectional Encoder Representations from Transformers)是一种基于Transformer的预训练语言模型,它在自然语言处理(NLP)领域中具有广泛的应用,以下是一些BERT特别适用的场景:

  • 1、文本分类:BERT可以用于情感分析、主题分类、垃圾邮件检测等文本分类任务。它能够捕捉到文本中细微的语义差异,从而实现更准确的分类。
  • 2、问答系统:BERT可以用于构建问答系统,它能够理解问题的上下文,并在大量文本中找到正确的答案。
  • 3、命名实体识别(NER):在NER任务中,BERT能够识别文本中的特定实体,如人名、地点、组织等。
  • 4、机器翻译:虽然BERT最初是为英语设计的,但它也可以通过多语言预训练模型来支持机器翻译任务。
  • 5、文本摘要:BERT可以用于生成文本的摘要,无论是提取式摘要还是生成式摘要。
  • 6、语言模型评估:BERT可以用于评估其他语言模型的性能,通过比较预训练模型和目标模型的表示。
  • 7、文本相似度:BERT可以用于计算文本之间的相似度,这在推荐系统、搜索引擎优化等领域非常有用。
  • 8、对话系统:BERT可以用于构建对话系统,理解用户的意图,并生成合适的回复。
  • 9、文档分类:在法律、医疗等领域,BERT可以用于对文档进行分类,帮助专业人士快速定位信息。
  • 10、文本生成:虽然BERT主要用于理解语言,但它也可以用于文本生成任务,如续写故事、生成诗歌等。
  • 11、语义匹配:BERT可以用于比较两个句子的语义相似度,这在语义搜索、信息检索等领域非常有用。
  • 12、文本纠错:BERT可以用于检测和纠正文本中的错误,提高文本质量。
  • 13、多任务学习:BERT可以同时处理多个NLP任务,通过共享表示来提高各个任务的性能。

本次数据集介绍

数据集来源于网络整理的携程网数据, 包括7000 多条酒店评论数据,5000 多条正向评论,2000 多条负向评论

地址:https://github.com/SophonPlus/ChineseNlpCorpus/tree/master/datasets/ChnSentiCorp_htl_all

字段说明

字段 说明
label 1 表示正向评论,0 表示负向评论
review 评论内容

MQPdAa

硬件环境

本例比较简单,好点配置的电脑应该都跑得了,但为了熟悉还是用GPU来玩一下,熟悉一下方便以后租借算力。
本次实验使用AutoDL AI算力云 租借算力,采用GPU卡进行训练,使用Ubuntu 24.04LTS版本,Python使用Python 3.12.3版本

基本的硬件配置如下:

  • CPU: 16 核,Xeon(R) Gold 6430
  • 内存: 120 GB
  • GPU: Nvidia RTX 4090 / 24 GB

IX3I8x

https://github.com/zyds/transformers-code/blob/master/01-Getting%20Started/07-trainer/classification_demo.ipynb

其他可用数据集

https://github.com/CLUEbenchmark/CLUEDatasetSearch

安装依赖

1
2
3
pip install transformers datasets tokenizers
pip install torch==2.2.0
pip install "numpy<2"

下载

我们来尝试一下下载并使用模型

  • bert-base-chinese
  • gpt2-chinese-cluecorpussmall
  • roberta-base-chinese-extractive-qa

download_LLM.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from transformers import AutoModel, AutoTokenizer
# 分类模型
# model_name = "bert-base-chinese"
# cache_dir = "./models/bert-base-chinese"

# 文本生成模型
# model_name = "uer/gpt2-chinese-cluecorpussmall"
# cache_dir = "./models/gpt2-chinese-cluecorpussmall"

# 问答模型
model_name = "uer/roberta-base-chinese-extractive-qa"
cache_dir = "./models/roberta-base-chinese-extractive-qa"

model = AutoModel.from_pretrained(model_name, cache_dir=cache_dir)
tokenizer = AutoTokenizer.from_pretrained(model_name, cache_dir=cache_dir)

使用

使用本地模型

  • 文本生成模型

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM
    model_dir = f"{绝对路径}/huggingFace/models/gpt2-chinese-cluecorpussmall/models--uer--gpt2-chinese-cluecorpussmall/snapshots/c2c0249d8a2731f269414cc3b22dff021f8e07a3"
    tokenizer = AutoTokenizer.from_pretrained(model_dir)
    model = AutoModelForCausalLM.from_pretrained(model_dir)
    generator = pipeline("text-generation", model=model, tokenizer=tokenizer, device=0)
    result = generator("你好,我是一款大模型",
    max_length=150,
    num_return_sequences=1,
    truncation=True,
    temperature=0.7,
    top_k=50,
    top_p=0.9,
    clean_up_tokenization_spaces=False
    )
    print(result)
  • 分类模型

    1
    2
    3
    4
    5
    6
    7
    from transformers import pipeline, BertTokenizer, BertForSequenceClassification
    model_dir = f"{绝对路径}/huggingFace/models/bert-base-chinese/models--bert-base-chinese/snapshots/c30a6ed22ab4564dc1e3b2ecbf6e766b0611a33f"
    model = BertForSequenceClassification.from_pretrained(model_dir)
    tokenizer = BertTokenizer.from_pretrained(model_dir)
    classifier = pipeline("text-classification", model=model, tokenizer=tokenizer)
    result = classifier("你好,我是一款大模型")
    print(result)
  • 问答模型

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    from transformers import pipeline, AutoTokenizer, AutoModelForQuestionAnswering
    model_dir = f"{绝对路径}/huggingFace/models/roberta-base-chinese-extractive-qa/models--uer--roberta-base-chinese-extractive-qa/snapshots/9b02143727b9c4655d18b43a69fc39d5eb3ddd53"
    tokenizer = AutoTokenizer.from_pretrained(model_dir)
    model = AutoModelForQuestionAnswering.from_pretrained(model_dir)
    qa_pipeline = pipeline("question-answering", model=model, tokenizer=tokenizer)
    result = qa_pipeline({
    "question":"Hugging Face 是什么",
    "context": "Hugging Face 是一个自然语言处理平台"
    })
    print(result)

背景与目标

我们之前曾评估使用过seatunnel做cdc入湖验证:seatunnel-cdc入湖实践,这些场景都是能直连数据库的场景,
业务需求中经常会出现无法直连数据库做cdc进行数据同步的场景,而这些场景就需要使用api进行数据对接,用海豚调度定时同步数据。

举个实际中的例子:

  • ERP(SAP)的库存数据进行同步入湖仓做库存分析

同时,本次目标希望其他同事能依样画葫芦,在以后的对接http接口到湖仓的时候能够自行完成,而非每遇到一个对接需求,就需要通过代码方式进行对接。

准备工作

  • seatunnel 2.3.10

    首先,您需要在${SEATUNNEL_HOME}/config/plugin_config文件中加入连接器名称,然后,执行命令来安装连接器,确认连接器在${SEATUNNEL_HOME}/connectors/目录下即可。
    本例中我们会用到:connector-jdbcconnector-paimon
    写入StarRocks也可以使用connector-starrocks,本例中的场景比较适合用connector-jdbc,所以使用connector-jdbc

    1
    2
    3
    4
    5
    6
    # 配置连接器名称
    --connectors-v2--
    connector-jdbc
    connector-starrocks
    connector-paimon
    --end--
    1
    2
    # 安装连接器
    sh bin/install-plugin.sh 2.3.10

seatunnel 任务

我们先至少保证能在本地完成seatunnel任务,再完成对海豚调度的对接

  • http to starRocks
    example/http2starrocks

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    env {
    parallelism = 1
    job.mode = "BATCH"
    }

    source {
    Http {
    plugin_output = "stock"
    url = "https://ip/http/prd/query_sap_stock"
    method = "POST"
    headers {
    Authorization = "Basic XXX"
    Content-Type = "application/json"
    }
    body = """{"IT_WERKS": [{"VALUE": "1080"}]}"""
    format = "json"
    content_field = "$.ET_RETURN.*"
    schema {
    fields {
    MATNR = "string"
    MAKTX = "string"
    WERKS = "string"
    NAME1 = "string"
    LGORT = "string"
    LGOBE = "string"
    CHARG = "string"
    MEINS = "string"
    LABST = "double"
    UMLME = "double"
    INSME = "double"
    EINME = "double"
    SPEME = "double"
    RETME = "double"
    }
    }
    }
    }

    # 此转换操作主要用于字段重命名等用途
    transform {
    Sql {
    plugin_input = "stock"
    plugin_output = "stock-tf-out"
    query = "select MATNR, MAKTX, WERKS,NAME1,LGORT,LGOBE,CHARG,MEINS,LABST,UMLME,INSME,EINME,SPEME,RETME from stock"
    }
    }

    # 连接starRocks 进行数据分区覆写,本例适用starRocks建表,按照分区insert overwrite 覆写
    sink {
    jdbc {
    plugin_input = "stock-tf-out"
    url = "jdbc:mysql://XXX:9030/scm?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "lab"
    password = "XXX"
    compatible_mode="starrocks"
    query = """insert overwrite ods_sap_stock PARTITION (WERKS='1080') (MATNR, MAKTX, WERKS,NAME1,LGORT,LGOBE,CHARG,MEINS,LABST,UMLME,INSME,EINME,SPEME,RETME) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)"""
    }
    }

    # connector-starrocks进行对接 (未看到支持sql语句进行数据insert overwrite,本例子场景不适合),比较适合表数据全部删除重建场景
    // sink {
    // StarRocks {
    // plugin_input = "stock-tf-out"
    // nodeUrls = ["ip:8030"]
    // base-url = "jdbc:mysql://ip:9030/"
    // username = "lab"
    // password = "XXX"
    // database = "scm"
    // table = "ods_sap_stock"
    // batch_max_rows = 1000
    // data_save_mode="DROP_DATA"
    // starrocks.config = {
    // format = "JSON"
    // strip_outer_array = true
    // }
    // schema_save_mode = "RECREATE_SCHEMA"
    // save_mode_create_template="""
    // CREATE TABLE IF NOT EXISTS `scm`.`ods_sap_stock` (
    // MATNR STRING COMMENT '物料',
    // WERKS STRING COMMENT '工厂',
    // LGORT STRING COMMENT '库存地点',
    // MAKTX STRING COMMENT '物料描述',
    // NAME1 STRING COMMENT '工厂名称',
    // LGOBE STRING COMMENT '地点描述',
    // CHARG STRING COMMENT '批次编号',
    // MEINS STRING COMMENT '单位',
    // LABST DOUBLE COMMENT '非限制使用库存',
    // UMLME DOUBLE COMMENT '在途库存',
    // INSME DOUBLE COMMENT '质检库存',
    // EINME DOUBLE COMMENT '受限制使用的库存',
    // SPEME DOUBLE COMMENT '已冻结的库存',
    // RETME DOUBLE COMMENT '退货'
    // ) ENGINE=OLAP
    // PRIMARY KEY ( MATNR,WERKS,LGORT)
    // COMMENT 'sap库存'
    // DISTRIBUTED BY HASH (WERKS) PROPERTIES (
    // "replication_num" = "1"
    // )
    // """
    // }
    // }
  • http to paimon
    example/http2paimon

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    env {
    parallelism = 1
    job.mode = "BATCH"
    }

    source {
    Http {
    plugin_output = "stock"
    url = "https://ip/http/prd/query_sap_stock"
    method = "POST"
    headers {
    Authorization = "Basic XXX"
    Content-Type = "application/json"
    }
    body = """{"IT_WERKS": [{"VALUE": "1080"}]}"""
    format = "json"
    content_field = "$.ET_RETURN.*"
    schema {
    fields {
    MATNR = "string"
    MAKTX = "string"
    WERKS = "string"
    NAME1 = "string"
    LGORT = "string"
    LGOBE = "string"
    CHARG = "string"
    MEINS = "string"
    LABST = "double"
    UMLME = "double"
    INSME = "double"
    EINME = "double"
    SPEME = "double"
    RETME = "double"
    }
    }
    }
    }
    # 此转换操作主要用于字段从命名等方便用途
    transform {
    Sql {
    plugin_input = "stock"
    plugin_output = "stock-tf-out"
    query = "select MATNR, MAKTX, WERKS,NAME1,LGORT,LGOBE,CHARG,MEINS,LABST,UMLME,INSME,EINME,SPEME,RETME from stock"
    }
    }

    # 连接paimon进行数据同步,paimon 暂时 未看到有支持 insert overwrite 分区覆写,此例仅作为参考,不适用本此例子需求
    sink {
    Paimon {
    warehouse = "s3a://test/"
    database = "sap"
    table = "ods_sap_stock"
    paimon.hadoop.conf = {
    fs.s3a.access-key=XXX
    fs.s3a.secret-key=XXX
    fs.s3a.endpoint="http://minio:9000"
    fs.s3a.path.style.access=true
    fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
    }
    }
    }

dolphinscheduler 集成seatunnel

  • 制作worker镜像
    1
    2
    3
    4
    5
    FROM dolphinscheduler.docker.scarf.sh/apache/dolphinscheduler-worker:3.2.2
    RUN mkdir /opt/seatunnel
    RUN mkdir /opt/seatunnel/apache-seatunnel-2.3.10
    # 容器集成seatunnel
    COPY apache-seatunnel-2.3.10/ /opt/seatunnel/apache-seatunnel-2.3.10/
    打包镜像,推送到镜像仓库
    1
    docker build --platform=linux/amd64 -t apache/dolphinscheduler-worker:3.2.2-seatunnel .
  • 使用新镜像部署一个worker,此处修改 docker-compose.yaml,增加一个 dolphinscheduler-worker-seatunnel节点
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    ...
    dolphinscheduler-worker-seatunnel:
    image: xxx/dolphinscheduler-worker:3.2.2-seatunnel
    profiles: ["all"]
    env_file: .env
    healthcheck:
    test: [ "CMD", "curl", "http://localhost:1235/actuator/health" ]
    interval: 30s
    timeout: 5s
    retries: 3
    depends_on:
    dolphinscheduler-zookeeper:
    condition: service_healthy
    volumes:
    - ./dolphinscheduler-worker-seatunnel-data:/tmp/dolphinscheduler
    - ./dolphinscheduler-logs:/opt/dolphinscheduler/logs
    - ./dolphinscheduler-shared-local:/opt/soft
    - ./dolphinscheduler-resource-local:/dolphinscheduler
    networks:
    dolphinscheduler:
    ipv4_address: 172.15.0.18
    ...
  • dolphinscheduler配置seatunnel 分组及环境配置
    • 安全中心-Worker分组管理,创建一个这个节点ip的分组,用于以后需要seatunnel的任务跑该分组
      iX5FOs
    • 环境管理-创建环境,增加一个用于执行seatunnel的环境,同时需要绑定Worker分组为上一步创建的seatunnel分组
      GXYYpj
    • 创建工作流定义,把上面的seatunnel任务配置填写上
      fwe13Y
    • 运行时候,选择seatunnel的worker分组和环境即可跑在这个集成了seatunnel的环境上
      n8Y2w9

背景

上一次我们实践了

这次我们生产实践验证将 Flink & Paimon & StarRocks & Dinky 流式湖仓分层实践验证 完整部署到生产,并且采用 Flink on k8s 的 Application 方式进行部署。

本部署方案仅提供一个部署思路,可以根据自己的实际生产情况调节

准备工作

  • 提前准备好一个k8s集群
  • 准备一个对象存储,我这里用minio,主要用于jar包的存储及下载

    minio/libs/1.20 目录下上传自己需要的项目以来jar包
    FIhbuL

  • 制作基础镜像

    新建一个extends的目录,目录下放置下面3个文件

    • dinky-app-1.20-1.2.1-jar-with-dependencies.jar 从dinky目录下的jar获取
    • mysql-connector-java-8.0.27.jar
    • Dockerfile 镜像制作文件
      目录看起来像这样:
      kh82Yi
      Dockerfile
      1
      2
      3
      4
      5
      ARG FLINK_VERSION=1.20.1
      FROM flink:${FLINK_VERSION}-scala_2.12
      ADD ./*.jar $FLINK_HOME/lib/ # 将目录下的jar包添加到镜像中的lib目录
      RUN rm -rf /opt/flink/lib/flink-table-planner-loader-*.jar # 删除flink-table-planner-loader
      RUN mv /opt/flink/opt/flink-table-planner_2.12-*.jar /opt/flink/lib # 添加flink-table-planner_2.12

      dinky需要替换planner-loader为 planner的原因是血缘分析、语法增强等功能使用到了planner的依赖,但是loader是独立加载的,所以导致dinky无法使用planner的依赖

    • 打包镜像,推送到镜像仓库
      1
      2
      3
      docker build --platform linux/amd64 -t dinky-flink:1.2.1-1.20.1 . --no-cache 
      docker tag dinky-flink:1.2.1-1.20.0 XXX/dinky-flink:1.2.1-1.20.1
      docker push XXX/dinky-flink:1.2.1-1.20.1
      到此我们就制作好了我们的基础镜像。

实践

  • 正确在dinky创建和填写集群配置
    4BqhoQ
    rfiIy9
    • Flink镜像地址 填写上面步骤打包好的镜像地址
    • k8s kubeConfig 填写k8s的config文件
    • Default Pod Template

      我们参考官网Example of Pod Template,编写pod template
      <URL> <ACCESSKEY> <SECRETKEY> 请填写自己的minio地址
      initContainers 的作用是将minio中的jar包下载下来,并挂载到flink的lib目录下
      tolerations 是我们node节点打了污点,不然随随便便的pod运行,某个节点专供flink跑任务

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      apiVersion: v1
      kind: Pod
      metadata:
      name: pod-template
      spec:
      initContainers:
      - command:
      - sh
      - -c
      - mc alias set minio <URL> <ACCESSKEY> <SECRETKEY>
      && mc cp --recursive minio/libs/1.20/ /tmp/ext-lib
      && mc ls minio
      image: minio/mc:RELEASE.2025-02-15T10-36-16Z
      imagePullPolicy: IfNotPresent
      name: init-lib
      volumeMounts:
      - mountPath: /tmp/ext-lib
      name: ext-lib
      tolerations:
      - key: "env"
      operator: "Equal"
      value: "prod"
      effect: "NoSchedule"
      containers:
      - name: flink-main-container
      imagePullPolicy: Always
      volumeMounts:
      - mountPath: /opt/flink/lib/ext-lib
      name: ext-lib
      volumes:
      - name: ext-lib
      emptyDir: {}
    • 保存点路径检查点路径

      这里建议填写一个持久化的地址,可以创建一个pvc挂载上去目录

    • Jar文件路径

      这里填写 local:///opt/flink/lib/dinky-app-1.20-1.2.1-jar-with-dependencies.jar

  • 运行paimon cdc 任务,让其以application模式跑在 k8s上
    WYVxHu

vOjcqn

背景

Ilnldj
在实际生产中,我们经常会有这样的需求,需要以原始数据流作为基础,然后关联大量的外部表来补充一些属性。例如,我们在订单数据中,希望能得到订单的支付信息和产品分类信息。
特点是:

  • 支付信息数据量大
  • 产品分类信息数据量少,修改不是太频繁

打宽数据时,我们面临的局限性:

  • 双流join存储所有state成本过高。

本篇我们来实践验证一下 部分列更新Partial Update + Lookup Join 打宽做分层的可行性。降低一下流计算压力。

准备

  • 源数据准备
    MySql创建名称为order_dw的数据库,创建三张表,并插入相应数据。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    CREATE TABLE `orders` (
    order_id bigint not null primary key,
    user_id varchar(50) not null,
    shop_id bigint not null,
    product_id bigint not null,
    buy_fee bigint not null,
    create_time timestamp not null,
    update_time timestamp not null default now(),
    state int not null
    );

    CREATE TABLE `orders_pay` (
    pay_id bigint not null primary key,
    order_id bigint not null,
    pay_platform int not null,
    create_time timestamp not null
    );

    CREATE TABLE `product_catalog` (
    product_id bigint not null primary key,
    catalog_name varchar(50) not null
    );

    -- 准备数据
    INSERT INTO product_catalog VALUES(1, 'phone_aaa'),(2, 'phone_bbb'),(3, 'phone_ccc'),(4, 'phone_ddd'),(5, 'phone_eee');

    INSERT INTO orders VALUES
    (100001, 'user_001', 12345, 1, 5000, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1),
    (100002, 'user_002', 12346, 2, 4000, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1),
    (100003, 'user_003', 12347, 3, 3000, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1),
    (100004, 'user_001', 12347, 4, 2000, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1),
    (100005, 'user_002', 12348, 5, 1000, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1),
    (100006, 'user_001', 12348, 1, 1000, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1),
    (100007, 'user_003', 12347, 4, 2000, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);

    INSERT INTO orders_pay VALUES
    (2001, 100001, 1, '2023-02-15 17:40:56'),
    (2002, 100002, 1, '2023-02-15 17:40:56'),
    (2003, 100003, 0, '2023-02-15 17:40:56'),
    (2004, 100004, 0, '2023-02-15 17:40:56'),
    (2005, 100005, 0, '2023-02-15 18:40:56'),
    (2006, 100006, 0, '2023-02-15 18:40:56'),
    (2007, 100007, 0, '2023-02-15 18:40:56');
  • 包准备
    • flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
    • paimon-s3-1.0.1.jar
    • paimon-flink-action-1.0.1.jar
    • paimon-flink-1.20-1.0.1.jar
    • mysql-connector-java-8.0.27.jar
    • flink-sql-connector-mysql-cdc-3.3.0.jar
      以上文件放进去 dinky 的 opt/dinky/customJar 以及对应flink集群的 lib 目录下。(该重启重启,该重新加载包重新加载包)

ODS 业务数据库实时入湖

  • 原始flink 命令行提交
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    ./bin/flink run -d \
    ./lib/paimon-flink-action-1.0.1.jar \
    mysql_sync_database \
    --warehouse s3://lakehouse/paimon \
    --database order_dw \
    --mysql_conf hostname=XXX \
    --mysql_conf port=63950 \
    --mysql_conf server-time-zone=Asia/Shanghai \
    --mysql_conf username=root \
    --mysql_conf password=XXX \
    --mysql_conf database-name=order_dw \
    --catalog_conf s3.endpoint=http://192.168.103.113:9010 \
    --catalog_conf s3.path.style.access=true \
    --catalog_conf s3.access-key=XXX \
    --catalog_conf s3.secret-key=XXX \
    --table_conf bucket=1 \
    --table_conf changelog-producer=input \
    --table_conf sink.parallelism=1
  • dinky提交方式
    WSTA5i
  • 创建一个flink jar任务,并且在“资源” 上传 paimon-flink-action-1.0.1.jar
  • 正确填写jar包运行参数
    • 正确选择 “程序路径” 为刚上传的jar包;
    • 程序运行类填写 org.apache.paimon.flink.action.FlinkActions;
    • 程序运行参数填写

DWD 清洗打宽

dinky创建dwd_orders的flink sql任务
gno0hu

  • 部分列更新 partial-update + 维表 lookup join 打宽
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    -- 定义paimon catalog
    CREATE CATALOG paimoncatalog
    WITH
    (
    'type' = 'paimon',
    'warehouse' = 's3://lakehouse/paimon',
    's3.endpoint' = 'http://192.168.103.113:9010',
    's3.access-key' = 'XXX',
    's3.secret-key' = 'XXX'
    );

    USE CATALOG paimoncatalog;
    -- 创建 dwd_orders表
    CREATE TABLE IF NOT EXISTS order_dw.dwd_orders (
    order_id BIGINT,
    order_user_id STRING,
    order_shop_id BIGINT,
    order_product_id BIGINT,
    order_product_catalog_name STRING,
    order_fee BIGINT,
    order_create_time TIMESTAMP,
    order_update_time TIMESTAMP,
    order_state INT,
    pay_id BIGINT,
    pay_platform INT COMMENT 'platform 0: phone, 1: pc',
    pay_create_time TIMESTAMP,
    PRIMARY KEY (order_id) NOT ENFORCED
    )
    WITH
    (
    'merge-engine' = 'partial-update', -- 使用部分更新数据合并机制产生宽表
    'partial-update.remove-record-on-delete' = 'true', -- 让partial-update支持删除
    'changelog-producer' = 'full-compaction' -- 使用full-compaction或lookup增量数据产生机制以低延时产出变更数据
    );

    SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
    SET 'table.exec.sink.upsert-materialize' = 'NONE';
    SET 'execution.checkpointing.interval' = '10s';
    SET 'execution.checkpointing.min-pause' = '10s';

    -- 订单表关联产品分类 打宽
    INSERT INTO order_dw.dwd_orders
    SELECT
    o.order_id,
    o.user_id,
    o.shop_id,
    o.product_id,
    dim.catalog_name,
    o.buy_fee,
    o.create_time,
    o.update_time,
    o.state,
    CAST(NULL AS BIGINT) AS pay_id,
    CAST(NULL AS INT) AS pay_platform,
    CAST(NULL AS TIMESTAMP) AS pay_create_time
    FROM
    (
    SELECT *, PROCTIME() AS proctime FROM order_dw.orders
    ) o
    LEFT JOIN order_dw.product_catalog FOR SYSTEM_TIME AS OF o.proctime AS dim ON o.product_id = dim.product_id -- lookup join 维表
    UNION ALL -- Paimon目前暂不支持在同一个作业里通过多条INSERT语句写入同一张表,因此这里使用UNION ALL
    -- 订单支付表 打宽
    SELECT
    order_id,
    CAST(NULL AS STRING) AS user_id,
    CAST(NULL AS BIGINT) AS shop_id,
    CAST(NULL AS BIGINT) AS product_id,
    CAST(NULL AS STRING) AS order_product_catalog_name,
    CAST(NULL AS BIGINT) AS order_fee,
    CAST(NULL AS TIMESTAMP) AS order_create_time,
    CAST(NULL AS TIMESTAMP) AS order_update_time,
    CAST(NULL AS INT) AS order_state,
    pay_id,
    pay_platform,
    create_time
    FROM
    order_dw.orders_pay;
    oTtvEK

s1AjzS

DWS 指标计算

我们目标创建DWS层的聚合表dws_users以及dws_shops,但是为了同时计算用户视角的聚合表以及商户视角的聚合表,我们额外创建一个以用户 + 商户为主键的中间表 dwm_users_shops

  • 创建名为dwm_users_shops的SQL流作业,利用Paimon表的预聚合数据合并机制,自动对order_fee求和,算出用户在商户的消费总额。同时,自动对1求和,也能算出用户在商户的消费次数。
    WYDcWZ
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    CREATE CATALOG paimoncatalog
    WITH
    (
    'type' = 'paimon',
    'warehouse' = 's3://lakehouse/paimon',
    's3.endpoint' = 'http://192.168.103.113:9010',
    's3.access-key' = 'XXX',
    's3.secret-key' = 'XXX'
    );

    USE CATALOG paimoncatalog;

    SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
    SET 'table.exec.sink.upsert-materialize' = 'NONE';

    SET 'execution.checkpointing.interval' = '10s';
    SET 'execution.checkpointing.min-pause' = '10s';

    -- 为了同时计算用户视角的聚合表以及商户视角的聚合表,另外创建一个以用户 + 商户为主键的中间表。
    CREATE TABLE IF NOT EXISTS order_dw.dwm_users_shops (
    user_id STRING,
    shop_id BIGINT,
    ds STRING,
    payed_buy_fee_sum BIGINT COMMENT '当日用户在商户完成支付的总金额',
    pv BIGINT COMMENT '当日用户在商户购买的次数',
    PRIMARY KEY (user_id, shop_id, ds) NOT ENFORCED
    ) WITH (
    'merge-engine' = 'aggregation', -- 使用预聚合数据合并机制产生聚合表
    'fields.payed_buy_fee_sum.aggregate-function' = 'sum', -- 对 payed_buy_fee_sum 的数据求和产生聚合结果
    'fields.pv.aggregate-function' = 'sum', -- 对 pv 的数据求和产生聚合结果
    'changelog-producer' = 'lookup', -- 使用lookup增量数据产生机制以低延时产出变更数据
    -- dwm层的中间表一般不直接提供上层应用查询,因此可以针对写入性能进行优化。
    'file.format' = 'avro', -- 使用avro行存格式的写入性能更加高效。
    'metadata.stats-mode' = 'none' -- 放弃统计信息会增加OLAP查询代价(对持续的流处理无影响),但会让写入性能更加高效。
    );

    INSERT INTO order_dw.dwm_users_shops
    SELECT
    order_user_id,
    order_shop_id,
    DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds,
    order_fee,
    1 -- 一条输入记录代表一次消费
    FROM order_dw.dwd_orders
    WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL;
    FeWBHr
    JnwTVK
  • 创建SQL作业 dws_shops & dws_users
    a0sZjg
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    CREATE CATALOG paimoncatalog
    WITH
    (
    'type' = 'paimon',
    'warehouse' = 's3://lakehouse/paimon',
    's3.endpoint' = 'http://192.168.103.113:9010',
    's3.access-key' = 'XXX',
    's3.secret-key' = 'XXX'
    );

    USE CATALOG paimoncatalog;

    -- 用户维度聚合指标表。
    CREATE TABLE IF NOT EXISTS order_dw.dws_users (
    user_id STRING,
    ds STRING,
    payed_buy_fee_sum BIGINT COMMENT '当日完成支付的总金额',
    PRIMARY KEY (user_id, ds) NOT ENFORCED
    ) WITH (
    'merge-engine' = 'aggregation', -- 使用预聚合数据合并机制产生聚合表
    'fields.payed_buy_fee_sum.aggregate-function' = 'sum' -- 对 payed_buy_fee_sum 的数据求和产生聚合结果
    -- 由于dws_users表不再被下游流式消费,因此无需指定增量数据产生机制
    );

    -- 商户维度聚合指标表。
    CREATE TABLE IF NOT EXISTS order_dw.dws_shops (
    shop_id BIGINT,
    ds STRING,
    payed_buy_fee_sum BIGINT COMMENT '当日完成支付总金额',
    uv BIGINT COMMENT '当日不同购买用户总人数',
    pv BIGINT COMMENT '当日购买用户总人次',
    PRIMARY KEY (shop_id, ds) NOT ENFORCED
    ) WITH (
    'merge-engine' = 'aggregation', -- 使用预聚合数据合并机制产生聚合表
    'fields.payed_buy_fee_sum.aggregate-function' = 'sum', -- 对 payed_buy_fee_sum 的数据求和产生聚合结果
    'fields.uv.aggregate-function' = 'sum', -- 对 uv 的数据求和产生聚合结果
    'fields.pv.aggregate-function' = 'sum' -- 对 pv 的数据求和产生聚合结果
    -- 由于dws_shops表不再被下游流式消费,因此无需指定增量数据产生机制
    );

    SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
    SET 'table.exec.sink.upsert-materialize' = 'NONE';

    SET 'execution.checkpointing.interval' = '10s';
    SET 'execution.checkpointing.min-pause' = '10s';

    INSERT INTO order_dw.dws_users
    SELECT
    user_id,
    ds,
    payed_buy_fee_sum
    FROM order_dw.dwm_users_shops;

    -- 以商户为主键,部分热门商户的数据量可能远高于其他商户。
    -- 因此使用local merge在写入Paimon之前先在内存中进行预聚合,缓解数据倾斜问题。
    INSERT INTO order_dw.dws_shops /*+ OPTIONS('local-merge-buffer-size' = '64mb') */
    SELECT
    shop_id,
    ds,
    payed_buy_fee_sum,
    1, -- 一条输入记录代表一名用户在该商户的所有消费
    pv
    FROM order_dw.dwm_users_shops;
    Mzglef
    5oGgHE
    RS5JtQ

ADS 物化视图StarRocks使用

ADS 这边使用StarRocks进行查询paimon 数据,并且构建物化视图

  • 添加paimon catalog
    1
    2
    3
    4
    5
    6
    7
    8
    9
    CREATE EXTERNAL CATALOG `paimon_lab`
    PROPERTIES (
    "paimon.catalog.type" = "filesystem",
    "aws.s3.access_key" = "XXX",
    "aws.s3.secret_key" = "XXX",
    "aws.s3.endpoint" = "http://172.16.32.16:2002",
    "type" = "paimon",
    "paimon.catalog.warehouse" = "s3://lakehouse/paimon"
    )
  • 查询
    1
    select * from paimon_lab.order_dw.dws_users
    d5VhQg
  • 物化视图
    1
    2
    3
    CREATE MATERIALIZED VIEW ads_users
    AS
    select * from paimon_lab.order_dw.dws_users

结束

使用4个job完成本例验证
hHiqQW

相关

优刻得:使用USDP实践近实时数据湖仓

背景

云原生flink流计算平台解决方案验证

该架设方案全部基于云原生k8s,通俗讲就是 flink任务跑在k8s上

环境要求

k8s部署的话可以看看 k8s-1.25.4部署笔记(containerd)

  • 前提条件
    • Kubernetes 版本 >= 1.9
      1
      2
      3
      4
      ➜  ~ kubectl version --short
      Client ➜ ~ Version: v1.24.4
      Kustomize Version: v4.5.4
      Server Version: v1.24.4
    • 确保您的 ~/.kube/config 文件已正确配置以访问 Kubernetes 集群
      1
      2
      3
      4
      5
      6
      7
      ➜  ~ export KUBECONFIG=~/.kube/config
      ➜ ~ kubectl get nodes
      NAME STATUS ROLES AGE VERSION
      k8s-master2 Ready control-plane 9d v1.24.4
      k8s-node1 Ready <none> 9d v1.24.4
      k8s-node2 Ready <none> 9d v1.24.4
      k8s-node3 Ready <none> 25h v1.24.4
    • 是否启用 Kubernetes DNS正常
      1
      2
      3
      ➜  ~ kubectl cluster-info 
      Kubernetes control plane is running at https://192.168.103.201:6443
      CoreDNS is running at https://192.168.103.201:6443/api/v1/namespaces/kube-system/services/kube-dns:dns/proxy
    • 账户具有 RBAC 权限,确保您的命名空间中的 <nameSpace> 服务账户具有创建和删除 Pod 的必要 RBAC 权限。我创建新的命名空间为flink-native
      1
      2
      3
      kubectl create namespace flink-native     
      kubectl create serviceaccount flink-sa -n flink-native
      kubectl create clusterrolebinding flinknative-role-binding-flinknative -n flink-native --clusterrole=edit --serviceaccount=flink-native:flink-sa
  • 在k8s中启动flink集群

    flink1.20

    1
    2
    3
    4
    5
    6
    7
    8
    ./bin/kubernetes-session.sh \
    -Dkubernetes.cluster-id=flink-cluster1 \
    -Dtaskmanager.memory.process.size=4096m \
    -Dkubernetes.taskmanager.cpu=2 \
    -Dtaskmanager.numberOfTaskSlots=4 \
    -Dkubernetes.namespace=flink-native \
    -Dkubernetes.service-account=flink-sa \
    -Dresourcemanager.taskmanager-timeout=3600000
  • 关闭集群
    1
    kubectl delete deployment/flink-cluster1

Dinky 流计算平台部署(helm)

  • 创建pvc

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    kind: PersistentVolumeClaim
    apiVersion: v1
    metadata:
    name: dinky-config-volume
    namespace: data-center
    spec:
    storageClassName: nfs-client
    accessModes:
    - ReadWriteMany
    resources:
    requests:
    storage: 5Gi
    ---
    kind: PersistentVolumeClaim
    apiVersion: v1
    metadata:
    name: dinky-lib-volume
    namespace: data-center
    spec:
    storageClassName: nfs-client
    accessModes:
    - ReadWriteMany
    resources:
    requests:
    storage: 5Gi
    ---
    kind: PersistentVolumeClaim
    apiVersion: v1
    metadata:
    name: dinky-resource-volume
    namespace: data-center
    spec:
    storageClassName: nfs-client
    accessModes:
    - ReadWriteMany
    resources:
    requests:
    storage: 5Gi
    • dinky-config-volume 用于放置配置文件(helm 包内的conf目录文件)
      XQW5uU
    • dinky-lib-volume 用于放置自定义jar包,映射的/opt/dinky/customJar/
      b2alEN
  • 调整helm包

    • 部署文件

      helm包经久未维护,我改了下

    • dinky.yaml 增加volumes:
      1
      2
      3
      4
      volumes:
      - name: dinky-lib-volume
      persistentVolumeClaim:
      claimName: dinky-lib-volume
    • dinky.yaml 增加volumeMounts:
      1
      2
      3
      volumeMounts:
      - mountPath: /opt/dinky/customJar
      name: dinky-lib-volume
    • dinky.yaml 修正auto.sh目录位置错误,原来是/opt/dinky/auto.sh
      1
      2
      3
      4
      5
      command:
      - /bin/bash
      - '-c'
      - >-
      /opt/dinky/bin/auto.sh startOnPending {{ .Values.spec.extraEnv.flinkVersion}}
    • values.yaml 配置mysql
      1
      2
      3
      4
      5
      6
      7
      mysql:
      enabled: true
      url: "192.168.103.113:3306"
      auth:
      username: "root"
      password: "XXX"
      database: "dinky"
  • 部署

    1
    2
    helm install dinky . -f values.yaml -n data-center
    helm uninstall dinky -n data-center
  • 在dinky内增加刚刚创建的Flink Native Kubernetes集群
    LoPzn6

流计算实践

实践1: mysql cdc connector 写入 paimon

dinky基于flink sql的作业类型
打开dinky页面,新建Flink Sql任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
EXECUTE CDCSOURCE demo WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'checkpoint' = '10000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'test\..*',
'sink.connector' = 'sql-catalog',
'sink.catalog.name' = 'fts',
'sink.catalog.type' = 'table-store',
'sink.catalog.warehouse'='file:/tmp/table_store',
'sink.auto-create' = 'true', -- 可自动paimon建表
);

实践2: paimon cdc 写入 paimon

dinky基于flink jar的作业类型 (paimon-flink-action-1.0.0.jar
打开dinky页面,新建Flink jar任务

  • 原始提交命令:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    ./bin/flink run \
    ./lib/paimon-flink-action-0.9.0.jar \
    mysql_sync_database \
    --warehouse s3://lakehouse-1253767413/paimon \
    --database app_db \
    --mysql_conf hostname=192.168.103.113 \
    --mysql_conf username=root \
    --mysql_conf password=XXX \
    --mysql_conf database-name=app_db \
    --catalog_conf s3.endpoint=cos.ap-guangzhou.myqcloud.com \
    --catalog_conf s3.access-key=XXX \
    --catalog_conf s3.secret-key=XXX \
    --table_conf bucket=1
  • dinky作业

    通过资源中心上传paimon-flink-action-0.9.0.jar包,然后按照上面原始命令分别填写程序路径程序运行类程序运行参数
    plpJLv

dinky基于flink sql的作业类型
打开dinky页面,新建Flink Sql任务

待验证 刚发布的flink cdc 3.3是否现在可以写入paimon,以前验证flink cdc pipline是无法成功写paimon paimon cdc入湖 & StarRocks湖仓分析实践
示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
SET 'execution.checkpointing.interval' = '10s';
EXECUTE PIPELINE WITHYAML (
source:
type: mysql
name: MySQL Source
hostname: 127.0.0.1
port: 3306
username: admin
password: pass
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
server-id: 5401-5404

sink:
type: paimon
name: Paimon Sink
catalog.properties.metastore: filesystem
catalog.properties.warehouse: /path/warehouse

pipeline:
name: MySQL to Paimon Pipeline
parallelism: 1
)

其他相关参考

Native Kubernetes
Flink 1.10 Native Kubernetes 原理与实践
Flink on Kubernetes - Native Kubernetes - 配置基础环境
Flink CDC+Dinky同步到paimon(HDFS)
FlinkCDC pipline+Dinky整库同步StarRocks

0%