StarRocks存算分离之Prometheus基于Service注解的服务发现
背景
我们前期根据kube-prometheus-stack部署实践进行了监控的部署,并且很好的对k8s集群的各项指标进行了grafana可视化监控。
但是我们还有一个监控需求来源于数仓,日常管理数仓中,我会出现如下几个需求点:
- 缓存数据到磁盘,
这个需求源于我们使用的TKE使用的腾讯云的CFS作为存储,而CFS是按量收费的,那么StarRocks缓存到磁盘到底占用的多少磁盘空间,以及是否需要清理,就迫在眉睫 - 数仓与对象储存流量情况
我们需要日常关注StarRocks与对象存储的流量带宽情况 - 物化视图的成功与否及监控告警
StarRocks中创建了非常多的物化视图,而这些物化视图的成功失败及时间节点,需要更好的监控到位
基于以上需求,我们来尝试解决这些问题
StarRocks配置prometheus metrics scrape
根据 StarRocks Cluster Integration With Prometheus and Grafana Service 指南,我们先给StarRocks配置好 prometheus metrics scrape
我是根据operator安装的而非helm,所以根据文档我的配置如下:
重点关注
spec.starRocksBeSpec.service.annotations
、spec.starRocksFeSpec.service.annotations
1 | piVersion: starrocks.com/v1 |
根据 Service 注解动态采集 参考
prometheus-additional.yaml
1 | - job_name: 'StarRocks_Cluster' |
1 | scrape_configs: |
1 | scrape_configs: |
kube-prometheus-stack 采集配置方法
如果你使用 kube-prometheus-stack 来安装 Prometheus,需要在 additionalScrapeConfigs或者additionalScrapeConfigsSecret里加上采集配置,示例:
- 在additionalScrapeConfigsSecret配置
1
kubectl create secret generic additional-configs --from-file=prometheus-additional.yaml -n monitoring
1
2
3
4
5additionalScrapeConfigsSecret:
enabled: true
name: additional-configs
key: prometheus-additional.yaml - 在additionalScrapeConfigs配置配置好后,我们到Prometheus web界面观察,发现已经正常在采集了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37prometheus:
prometheusSpec:
additionalScrapeConfigs:
- job_name: 'StarRocks_Cluster'
kubernetes_sd_configs:
- role: endpoints
relabel_configs:
- source_labels: [__meta_kubernetes_service_annotation_prometheus_io_scrape]
action: keep
regex: true
- source_labels: [__meta_kubernetes_service_annotation_prometheus_io_scheme]
action: replace
target_label: __scheme__
regex: (https?)
- source_labels: [__meta_kubernetes_service_annotation_prometheus_io_path]
action: replace
target_label: __metrics_path__
regex: (.+)
- source_labels: [__address__, __meta_kubernetes_service_annotation_prometheus_io_port]
action: replace
target_label: __address__
regex: ([^:]+)(?::\d+)?;(\d+)
replacement: $1:$2
- action: labelmap
regex: __meta_kubernetes_service_label_(.+)
- source_labels: [__meta_kubernetes_namespace]
action: keep
regex: starrocks # 过滤starrocks命名空间
- source_labels: [__meta_kubernetes_namespace]
action: replace
target_label: kubernetes_namespace
- source_labels: [__meta_kubernetes_service_name]
action: replace
target_label: kubernetes_name
- source_labels: [__meta_kubernetes_pod_name]
action: replace
target_label: kubernetes_pod_name
Grafana 监控可视化展示 坑
按照文档Import StarRocks Grafana Dashboard,导入Grafana模板,发现毛数据都木有,哈哈哈🤣,至此等待StarRocks官方修复。
我们来试试其他几个模板
Dashboard 模板
其他参考
kube-prometheus-stack部署实践
Environment
- Kubernetes 集群
需要一个已经部署完成且可用的Kubernetes 1.16+集群。 - Helm
helm version v3+
Steps
添加 Prometheus chart repo 到 Helm
1
2helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
helm repo update查看版本信息
1
2
3$ helm search repo prometheus-community/kube-prometheus-stack
NAME CHART VERSION APP VERSION DESCRIPTION
prometheus-community/kube-prometheus-stack 57.0.2 v0.72.0 kube-prometheus-stack collects Kubernetes manif...将仓库拉取到本地
1
helm pull prometheus-community/kube-prometheus-stack
修改values.yaml
- 配置简单的本地 NFS 存储卷
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55$ vim ./values.yaml
---
## Storage is the definition of how storage will be used by the Alertmanager instances.
storage:
volumeClaimTemplate:
spec:
storageClassName: nfs-client
accessModes: ["ReadWriteOnce"]
resources:
requests:
storage: 10Gi
...
grafana:
enabled: true
namespaceOverride: ""
defaultDashboardsTimezone: Asia/Shanghai
adminPassword: prom-operator
persistence:
enabled: true
type: pvc
storageClassName: nfs-client
accessModes:
- ReadWriteOnce
size: 20Gi
finalizers:
- kubernetes.io/pvc-protection
...
# 配置Prometheus持久化NFS存储
prometheus:
prometheusSpec:
podMonitorSelectorNilUsesHelmValues: false
serviceMonitorSelectorNilUsesHelmValues: false
## Prometheus StorageSpec for persistent data
storageSpec: {}
## Using PersistentVolumeClaim
volumeClaimTemplate:
spec:
storageClassName: nfs-client
accessModes: ["ReadWriteOnce"]
resources:
requests:
storage: 20Gi
...
## Storage is the definition of how storage will be used by the ThanosRuler instances.
## ref: https://github.com/prometheus-operator/prometheus-operator/blob/main/Documentation/user-guides/storage.md
##
storage:
volumeClaimTemplate:
spec:
storageClassName: nfs-client
accessModes: ["ReadWriteOnce"]
resources:
requests:
storage: 5Gi - 配置grafana ingress
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36ingress:
## If true, Grafana Ingress will be created
##
enabled: true
## IngressClassName for Grafana Ingress.
## Should be provided if Ingress is enable.
##
ingressClassName: nginx
## Annotations for Grafana Ingress
##
annotations:
kubernetes.io/ingress.class: nginx
# kubernetes.io/tls-acme: "true"
## Labels to be added to the Ingress
##
labels: {}
## Hostnames.
## Must be provided if Ingress is enable.
##
# hosts:
# - grafana.domain.com
hosts:
- grafana.dev.XXX.cn
## Path for grafana ingress
path: /
## TLS configuration for grafana Ingress
## Secret must be manually created in the namespace
##
tls: []
# - secretName: grafana-general-tls
# hosts:
# - grafana.example.com
- 配置简单的本地 NFS 存储卷
使用 Helm 更新版本重新部署
1
helm upgrade prometheus --namespace monitoring --create-namespace -f values.yaml .
查看资源组件情况
1
2
3
4
5
6
7
8
9
10
11kubectl get pod -n monitoring
NAME READY STATUS RESTARTS AGE
alertmanager-prometheus-kube-prometheus-alertmanager-0 2/2 Running 0 18m
prometheus-grafana-0 3/3 Running 0 10m
prometheus-kube-prometheus-operator-546f866469-rvssk 1/1 Running 1 (15h ago) 18h
prometheus-kube-state-metrics-868cc5957b-9lgt5 1/1 Running 1 (15h ago) 18h
prometheus-prometheus-kube-prometheus-prometheus-0 2/2 Running 0 18m
prometheus-prometheus-node-exporter-577kd 1/1 Running 1 (15h ago) 18h
prometheus-prometheus-node-exporter-f5g8r 1/1 Running 1 (15h ago) 18h
prometheus-prometheus-node-exporter-gkhmw 1/1 Running 0 18h
prometheus-prometheus-node-exporter-lql7g 1/1 Running 0 18h
Usage
- 我们可以通过Grafana ingress 地址进行访问,浏览Grafana仪表板
- 可以通过prometheus 9090端口的web界面进行访问查看prometheus信息
StarRocks之使用Routine Load实现CDC实践
背景
在 基于debezium(mongo)-kafka-starrocks的cdc实践,我们使用StarRocks Connector for Kafka
将debezium格式的数据实时写入到starrocks。这次我想尝试一下Routine Load
是否能够将debezium json
数据(带有删除标记)写入到starrocks。
我们明确一下目标:
- 支持将kafka中debezium json 格式的数据导入,支持增删改
- 支持日期按需转换
那么我们使用debezium将源数据导入到kafka,在kafka中造一下debezium json格式的数据。
源表数据构建
我特意在建表的时候包含了:日期时间datetime、日期date、布尔tinyint、整形int、字符串varchar,以此来验证这些类型的数据经过Routine Load,是如何进行转换的,我们重点关注不同时间格式的转换,因为在进入kafka中,时间格式的数据都被转换成了不同的时间戳。
1 | CREATE TABLE `test_date` ( |
使用debezium将源数据写入到kafka
1 | { |
得到kafka中如下结构的2条数据
1 | { |
1 | { |
我们观察一下kafka数据:
__deleted
标记了是否删除,类型是字符串,c_datetime
、c_date
、c_timestamp
进入到了kafka后,变成了不同的数据。- mysql
datetime
类型:最终转成了时间戳(1970年01月01日0时0分0秒到指定日期的毫秒数),形如:1730479999000
。 - mysql
date
类型:最终转成了天数(1970年01月01日到指定日期的天数),形如:17648
。 - mysql
timestamp
类型:最终转成了UTC时间,形如:2024-11-01T08:53:19Z
。 - mysql
time
类型:最终转成了微妙数(将小时转成了微妙),形如:34341000000
。 (该例未展示)
- mysql
这里就产生了一个疑问🤔,这些数据如何通过Routine Load
导入转换正确的数据进入到StarRocks,并且支持删除?
使用Routine Load导入StarRocks
StarRocks创建目标主键表
1
2
3
4
5
6
7
8
9
10
11CREATE TABLE ods_example (
id INT NOT NULL,
c_name string,
c_datetime datetime,
c_date date,
c_married BOOLEAN,
c_age int,
c_timestamp datetime
)
PRIMARY KEY (id)
DISTRIBUTED BY HASH (id);StarRocks创建导入,首先分析一下
支持删除的导入
:在starrocks无非就是__op
标识,当数据中带有__op:1
的时候,做DELETE操作,当数据中带有__op:0
的时候,就是更新操作,那么我们其实就是转换一下kafka中debezium json 格式的删除标识,由__deleted: "true"
转换成__op:1
即可实现数据支持删除的cdc,具体可以看看文档 通过导入实现数据变更时间的转换
:因为在kafka内的时间都是时间戳,所以使用starrocks的from_unixtime和时区转换convert_tz,根据自己需求转换即可。
来试一下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27CREATE ROUTINE LOAD test.ods_example_load_32 ON ods_example
COLUMNS (
id,
c_name,
temp_datetime,
temp_date,
c_married,
c_age,
temp_timestamp,
temp_deleted,
__op= cast( cast( temp_deleted AS BOOLEAN) AS TINYINT),
c_timestamp = convert_tz(temp_timestamp,'+00:00', '+08:00'),
c_datetime = convert_tz(from_unixtime( cast( temp_datetime AS BIGINT )/ 1000 ),'+00:00', '-08:00'),
c_date = from_unixtime( cast( temp_date AS BIGINT )* 24 * 60 * 60, '%Y%m%d' ))
PROPERTIES
(
"desired_concurrent_number" = "5",
"format" = "json",
"jsonpaths" = "[\"$.id\",\"$.c_name\",\"$.c_datetime\",\"$.c_date\",\"$.c_married\",\"$.c_age\",\"$.c_timestamp\",\"$.__deleted\"]"
)
FROM KAFKA
(
"kafka_broker_list" = "172.16.0.51:9092",
"kafka_topic" = "mysql-example.test.example",
"kafka_partitions" = "0,1,2",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
至此,完美支持替代StarRocks Connector for Kafka
来做实时cdc导入。但是我有个疑问,假设我有100个字段,COLUMNS和jsonpaths就得对齐100次??留着再研究吧
其他资料
debezium深度解读单消息转换SMT
Kafka Connect是Apache Kafka®的一部分,在Kafka和其它系统之间提供可靠的、可扩展的分布式流式集成。Kafka Connect具有可用于许多系统的连接器,它是一个配置驱动的工具,不需要编码。
Kafka Connect API还提供了一个简单的接口,用于处理从源端通过数据管道到接收端的记录,该API称为单消息转换(SMT),顾名思义,当数据通过Kafka Connect连接器时,它可以对数据管道中的每条消息进行操作。
连接器分为源端或接收端,它们要么从Kafka上游的系统中提取数据,要么向Kafka的下游推送数据。这个转换可以配置为在任何一侧进行,源端连接器可以在写入Kafka主题之前对数据进行转换,接收端连接器也可以在将数据写入接收端之前对其进行转换。
转换的一些常见用途是:
- 对字段重命名;
- 掩蔽值;
- 根据值将记录路由到主题;
- 将时间戳转换或插入记录中;
- 操作主键,例如根据字段的值设置主键。
Kafka自带了许多转换器,但是开发自定义的转换器也非常容易。
配置Kafka Connect的单消息转换
需要给转换器指定一个名字,该名字将用于指定该转换器的其他属性。例如,下面是JDBC源端利用RegexRouter转换器的配置片段,该转换器将固定字符串附加到要写入的主题的末尾:
1 | { |
该转换器被命名为routeRecords,且在后续中用于传递属性。注意,上面的示例显示了RegexRouter的两个配置属性:正则表达式regex和匹配组引用replacement。此设置将从JDBC源端获取表名,并将其加上-test后缀。根据转换器的功能不同,也可能会有不同的配置属性,具体可以参见相关的文档。
执行多次转换
有时需要执行多次转换,Kafka Connect支持定义多个转化器,他们在配置中链接在一起。这些消息按照在transforms属性中定义的顺序执行转换。
下面的转换使用ValueToKey转换器将值转换为主键,并使用ExtractField转换器仅使用ID整数值作为主键:
1 | “transforms”:”createKey,extractInt”, |
注意,使用上述$Key
符号,会指定此转换将作用于记录的Key
,如果要针对记录的Value
,需要在这里指定$Value
。最后ConnectRecord看起来像这样:
1 | key value |
转换后:
1 | key value |
单消息转换深入解读
下面深入地看下连接器如何处理数据。转换器被编译为JAR,并通过Connect工作节点的属性文件中的plugin.path属性,指定其可用于Kafka Connect,安装后就可以在连接器属性中配置转换。
配置和部署后,源端连接器将从上游系统接收记录,将其转换为ConnectRecord,然后将该记录传递给配置的转换器的apply()函数,然后等待返回记录。接收端连接器也是执行类似的过程,从Kafka主题读取并反序列化每个消息之后,将调用转换器的apply()方法,并将结果记录发送到目标系统。
如何开发单消息转换器
要开发将UUID插入每个记录的简单转换器,需要逐步执行以下的步骤。
apply方法是转换器的核心,这种转换支持带有模式和不带有模式的数据,因此每个都有一个转换:
1 | @Override |
此转换器可以应用于记录的键或值,因此需要实现Key和Value子类,其扩展了InsertUuid类并实现apply方法调用的newRecord方法:
1 | public static class Key<R extends ConnectRecord<R>> extends InsertUuid<R> { |
该转换器仅改变了模式和值,但是要注意其可以操纵ConnectRecord的所有部分:Key、Value、Key和Value的模式、目标主题、目标分区和时间戳。
该转换器具有可选的参数,这些参数可以在运行时配置,并可以通过转换器类中重写的configure()方法访问:
1 | @Override |
如上所示,该Transformation接口很简单,它实现了一个apply()方法来接收ConnectRecord然后再返回ConnectRecord,它可以选择通过configure()方法接收参数。
接下来,编译此JAR并将其放入Connect工作节点中plugin.path指定的路径中。注意需要将转换器所依赖的任何依赖项打包到它的路径中或编译为胖JAR。然后在连接器配置中调用它,如下所示(注意$Value内部类约定,以指示此转换应作用于记录的值):
1 | transforms=insertuuid |
相关
基于debezium(sqlserver)-kafka-starrocks跨机房cdc实践
目标需求
机房A
:sqlserver分库分表机房B(腾讯云)
:kafka,starrocks数仓
将sqlserver表数据流入kafka,流出到starrocks
db
:SCPRD
schema
:test2,test3
table
:test2.user,test3.user
column
:id,name,age
其中column
字段只想进id
和name
,过滤age
前置配置
- 开启
db:SCPRD
的数据库cdc1
2
3
4
5use SCPRD
GO
EXEC sys.sp_cdc_enable_db
GO - 开启
table:test2.user,test3.user
的表cdc1
2
3
4
5
6
7
8
9
10
11USE SCPRD
GO
EXEC sys.sp_cdc_enable_table
@source_schema = N'test2', -- 指定源表所属的 schema 名
@source_name = N'user', -- 指定需要读取的源表名
@role_name = N'cdc_role',
@filegroup_name = NULL,
@supports_net_changes = 1,
@captured_column_list = N'[id],[name]'
GO为表启用CDC后,SQLServer生成两个Agent作业1
2
3
4
5
6
7
8
9
10
11USE SCPRD
GO
EXEC sys.sp_cdc_enable_table
@source_schema = N'test3', -- 指定源表所属的 schema 名
@source_name = N'user', -- 指定需要读取的源表名
@role_name = N'cdc_role',
@filegroup_name = NULL,
@supports_net_changes = 1,
@captured_column_list = N'[id],[name]'
GOcdc.dbname_capture
、cdc.dbname_cleanup
实践
部署的方案有很多,如果能打通机房A和机房B的网络,那是最优的,无法打通的情况我们选择如下2个方案。
- 优先方案:选择在机房A部署
debezium/connect
,远程连接机房B(腾讯云)的kafka,- 备选方案:选择在机房B(腾讯云)部署
debezium/connect
,远程连接机房A的sqlserver
- 机房A部署
debezium/connect
因为跨机房,所有使用SASL_PLAINTEXT接入腾讯云kafka稳妥稳定一点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24version: '2'
services:
connect:
image: ccr.ccs.tencentyun.com/da-debezium/connect:2.6.2.Final
ports:
- 8083:8083
environment:
BOOTSTRAP_SERVERS: 159.75.194.XXX:50001
CONNECT_SASL_MECHANISM: PLAIN
CONNECT_SECURITY_PROTOCOL: SASL_PLAINTEXT
CONNECT_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="USER_NAME" password="PWD";
GROUP_ID: connect-cluster
CONFIG_STORAGE_TOPIC: config.storage.topic
OFFSET_STORAGE_TOPIC: offset.storage.topic
STATUS_STORAGE_TOPIC: status.storage.topic
CONNECT_PRODUCER_SASL_MECHANISM: PLAIN
CONNECT_PRODUCER_SECURITY_PROTOCOL: SASL_PLAINTEXT
CONNECT_PRODUCER_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="USER_NAME" password="PWD";
CONNECT_CONSUMER_SASL_MECHANISM: PLAIN
CONNECT_CONSUMER_SECURITY_PROTOCOL: SASL_PLAINTEXT
CONNECT_CONSUMER_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="USER_NAME" password="PWD";
networks:
kafka_network:
name: debezium - 创建sqlserver connector
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30{
"name": "source-sqlserver-user-cdc_v1",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"database.hostname": "192.168.103.XX",
"database.port": "1433",
"database.user": "sa",
"database.password": "XXX",
"database.names": "SCPRD",
"topic.prefix": "sqlserver-user-cdc_v1",
"schema.include.list": "test2,test3",
"table.include.list": "test2.user,test3.user",
"column.include.list": "test(.*).user.id,test(.*).user.name",
"schema.history.internal.kafka.bootstrap.servers": "159.75.194.XXX:50001",
"schema.history.internal.kafka.topic": "schemahistory.sqlserver-user-cdc_v1",
"schema.history.internal.producer.sasl.mechanism": "PLAIN",
"schema.history.internal.producer.security.protocol": "SASL_PLAINTEXT",
"schema.history.internal.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USER_NAME\" password=\"PWD\";",
"schema.history.internal.consumer.sasl.mechanism": "PLAIN",
"schema.history.internal.consumer.security.protocol": "SASL_PLAINTEXT",
"schema.history.internal.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USER_NAME\" password=\"PWD\";",
"schema.history.internal.store.only.captured.tables.ddl": true,
"schema.history.internal.store.only.captured.databases.ddl": true,
"database.encrypt": "false",
"transforms": "Reroute",
"transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
"transforms.Reroute.topic.regex": "(.*)test(.*)user",
"transforms.Reroute.topic.replacement": "$1user"
}
}
运维常见命令
为数据库 SCPRD 启用、关闭 CDC
开启注意:数据库开启cdc功能,注意不能为master数据库启动该功能
1
2
3
4
5use SCPRD
GO
EXEC sys.sp_cdc_enable_db
GO关闭
1
2
3
4
5USE SCPRD
GO
exec sys.sp_cdc_disable_db
GO为 SQLServer 源表启用、关闭变更数据捕获
开启1
2
3
4
5
6
7
8
9
10
11USE SCPRD
GO
EXEC sys.sp_cdc_enable_table
@source_schema = N'test2', -- 指定源表所属的 schema 名
@source_name = N'user', -- 指定需要读取的源表名
@role_name = N'cdc_role',
@filegroup_name = NULL,
@supports_net_changes = 1,
@captured_column_list = N'[id],[name]'
GO关闭
1
2
3
4
5
6
7
8
9
10
11USE SCPRD
GO
EXEC sys.sp_cdc_disable_table
@source_schema = N'test2', -- 指定源表所属的 schema 名
@source_name = N'user', -- 指定需要读取的源表名
@role_name = N'cdc_role',
@filegroup_name = NULL,
@supports_net_changes = 1,
@captured_column_list = N'[id],[name]'
GO查询数据库的cdc开启状态、查询表cdc开启状态
1
2
3
4
5
6
7
8
9
10--查询数据库是否开启CDC
SELECT name,is_cdc_enabled FROM sys.databases
--查询表是否开启CDC
SELECT C.name,
B.name,
is_tracked_by_cdc
FROM sys.tables AS A
LEFT JOIN sys.objects AS B ON A.object_id = B.object_id
LEFT JOIN sys.schemas AS C ON C.schema_id = B.schema_id;检查源表是否启动变更数据捕获
1
2
3
4
5USE SCPRD
GO
EXEC sys.sp_cdc_help_change_data_capture
GO
相关链接
SqlServer Change Data Capture CDC使用手册
Debezium SASL_PLAINTEXT
基于debezium(mongo)-kafka-starrocks的cdc实践
先说结论
解决了mongodb能支持cdc的before消息体,但因为 StarRocks Connector for Kafka
惊天巨bug,本轮实践验证失败。
- 临时解决措施:sink自行转换__op
1 | "transforms": "RenameField,CastBool,CastInt", |
目标
网上的资料真是少,怎么找都没有找到能支持mongodb删除的debezium cdc的中文资料,看了半天英文资料,实践一下,这里主要验证一下 StarRocks Connector for Kafka
source
: MongoDbConnectorsink
: StarRocksSinkConnector
我们的目标是将mongodb表数据实时CDC进入starrocks数仓,支持删除操作。
前置设施
mongodb
首先我们得确保mongodb是6.0,因为6.0后支持了changeStreamPreAndPostImages变更流。
原理:在6.0以前,因为mongodb不支持changeStreamPreAndPostImages
变更流,无法在cdc语句显示出before,所以当进行删除操作的时候,按照debizium的格式规范,无法完成删除操作
如果需要在debezium cdc输出的语句上显示before信息需要
- 开启mongodb版本 6.0 中的新增功能
changeStreamPreAndPostImages
。1
2
3
4
5
6
7use user
db.runCommand({
collMod: "user",
changeStreamPreAndPostImages: {
enabled: true
}
}) - 在debezium mongodb connector的
capture.mode
上使用以下任意一个change_streams_with_pre_image
输出before,不输出afterchange_streams_update_full_with_pre_image
输出before和after模式 描述 change_streams 输出变化流,但是在进行update操作时,不输出after字段 change_streams_update_full 在change_streams的基础上,增加after字段,用于输出现在变化后的数据的内容 change_streams_with_pre_image 在change_streams的基础上,增加before字段的输出,但需要进行配置 change_streams_update_full_with_pre_image 在change_streams_update_full的基础上增加,增加before字段,用于输出变化前变化后的数据的内容 1
2
3
4
5
6
7
8
9
10{
"name": "source-user-mongodb-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"mongodb.connection.string": "mongodb://xxx:xxx@172.16.0.11:27017,172.16.0.15:27017/?&authSource=admin",
"collection.include.list": "mydb.user",
"topic.prefix": "user-cdc",
"capture.mode": "change_streams_update_full_with_pre_image"
}
}
kafka
开启了自动创建topic。
如果没有自动开启创建,需要为debezium手工创建
config.storage.topic
offset.storage.topic
status.storage.topic
手工创建,参考Kafka Connect 接入 CKafka,有许多debezium的要求,所以建议自动创建。
部署 debezium/connect
使用容器分布式部署方式,使得支持通过rest api管理connector,
通过volumes,将starrocks-kafka-connector也集成支持进去
这里需要注意debezium/connect镜像已经将常见的mysql、mongodb、sqlserver集成了进去了
1 | version: '2' |
流入:mongo->kafka ✅
mongo user 表 流入 kafka
user表先插入一条数据吧,方便提交任务的时候,先走全量快照
1 | db.getCollection("user").insert( { |
可通过 http://ip:8083/connectors
,提交source mongodb connector任务
1 | curl --location 'http://ip:8083/connectors' \ |
我们来看看增删改查对应的消息结构
- 创建
创建的时候 before是null,after是创建后的
json字符串
,op是c1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26{
"schema": {...},
"payload": {
"before": null,
"after": "{\"_id\": {\"$oid\": \"6676dccb988e8964430f7e46\"},\"name\": \"junyao\"}",
"updateDescription": null,
"source": {
"version": "2.4.2.Final",
"connector": "mongodb",
"name": "user",
"ts_ms": 1719065804000,
"snapshot": "false",
"db": "mydb",
"sequence": null,
"rs": "cmgo-6t0e5t7x_0",
"collection": "user",
"ord": 1,
"lsid": null,
"txnNumber": null,
"wallTime": 1719065804058
},
"op": "c",
"ts_ms": 1719065804054,
"transaction": null
}
} - 更新
before是更新前,after是更新后,op是u
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30{
"schema": {...},
"payload": {
"before": "{\"_id\": {\"$oid\": \"6676dbe5988e8964430f7e45\"},\"name\": \"tony\"}",
"after": "{\"_id\": {\"$oid\": \"6676dbe5988e8964430f7e45\"},\"name\": \"danny\"}",
"updateDescription": {
"removedFields": null,
"updatedFields": "{\"name\": \"danny\"}",
"truncatedArrays": null
},
"source": {
"version": "2.4.2.Final",
"connector": "mongodb",
"name": "user",
"ts_ms": 1719065650000,
"snapshot": "false",
"db": "mydb",
"sequence": null,
"rs": "cmgo-6t0e5t7x_0",
"collection": "user",
"ord": 1,
"lsid": null,
"txnNumber": null,
"wallTime": 1719065650298
},
"op": "u",
"ts_ms": 1719065650298,
"transaction": null
}
} - 删除
before是删除前,after是删除后
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26{
"schema": {...},
"payload": {
"before": "{\"_id\": {\"$oid\": \"6676dbe5988e8964430f7e45\"},\"name\": \"danny\"}",
"after": null,
"updateDescription": null,
"source": {
"version": "2.4.2.Final",
"connector": "mongodb",
"name": "user",
"ts_ms": 1719065732000,
"snapshot": "false",
"db": "mydb",
"sequence": null,
"rs": "cmgo-6t0e5t7x_0",
"collection": "user",
"ord": 1,
"lsid": null,
"txnNumber": null,
"wallTime": 1719065732402
},
"op": "d",
"ts_ms": 1719065732399,
"transaction": null
}
}
流出:kafka -> StarRocksSinkConnector ⚠️
kafka流出starrocks,目前未走通
- 创建starrocks表
1
2
3
4
5
6
7
8CREATE TABLE IF NOT EXISTS `scm`.`ods_test_user` (
`_id` STRING ,
`name` STRING NULL
) ENGINE=OLAP
PRIMARY KEY (_id) DISTRIBUTED BY HASH (_id) PROPERTIES
(
"replication_num" = "1"
) - 提交任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24curl --location 'http://ip:8083/connectors' \
--header 'Content-Type: application/json' \
--data '{
"name": "sink-test23-user-cdc",
"config": {
"connector.class": "com.starrocks.connector.kafka.StarRocksSinkConnector",
"topics": "user-cdc23.test.user",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter.schemas.enable": "false",
"starrocks.http.url": "xxx.xxx.xxx.xxx:38030",
"starrocks.topic2table.map": "user-cdc23.test.user:ods_test_user",
"starrocks.username": "root",
"starrocks.password": "xxx",
"starrocks.database.name": "scm",
"sink.properties.strip_outer_array": "true",
"transforms": "addfield,unwrap",
"transforms.addfield.type": "com.starrocks.connector.kafka.transforms.AddOpFieldForDebeziumRecord",
"transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
"transforms.unwrap.drop.tombstones": "true",
"transforms.unwrap.delete.handling.mode": "rewrite"
}
}'
这里需要注意,按照starrocks的文档导入 Debezium CDC 格式数据,对于mongodb的transforms.unwrap.type存在差异,因为我是mongodb6.0,所以我看了一下最新的 MongoDB New Document State Extraction,相比较 New Record State Extraction,有以下的差异
- New Record State Extraction
1
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
- MongoDB New Document State Extraction
1
"transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
需要调整成 io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
调整好提交任务,看报错日志。
1 | root@starrockscluster-cn-1:/opt/starrocks# curl http://starrockscluster-cn-1.starrockscluster-cn-search.starrocks.svc.cluster.local:8040/api/_load_error_log?file=error_log_864bbf459bb530a6_44aa9dc5433d4c0 |
失败,差评!
流出:kafka -> StarRocksSinkConnector 失败后换个思路 ⚠️
换个思路,我可不可以在 mongo source流入kafka的时候就处理好格式? 于是我改了下,添加以下内容到source
1 | "transforms": "addfield,unwrap", |
- mongo source得到了这样的数据结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23curl --location 'ip:8083/connectors' \
--header 'Accept: application/json' \
--header 'Content-Type: application/json' \
--data-raw '{
"name": "source-test26-user-cdc",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"mongodb.connection.string": "mongodb://mongouser:XXX@172.16.0.11:27017,172.16.0.15:27017/?&authSource=admin",
"collection.include.list": "test.user",
"topic.prefix": "user-cdc26",
"capture.mode": "change_streams_update_full_with_pre_image",
"snapshot.mode": "initial",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter.schemas.enable": "false",
"transforms": "addfield,unwrap",
"transforms.addfield.type": "com.starrocks.connector.kafka.transforms.AddOpFieldForDebeziumRecord",
"transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
"transforms.unwrap.drop.tombstones": "true",
"transforms.unwrap.delete.handling.mode": "rewrite"
}
}'1
2
3
4
5{
"_id": "66793958d767977a330e9245",
"name": "jy",
"__deleted": false
} - starrocks sink
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19curl --location 'http://ip:8083/connectors' \
--header 'Content-Type: application/json' \
--data '{
"name": "sink-test27-user-cdc",
"config": {
"connector.class": "com.starrocks.connector.kafka.StarRocksSinkConnector",
"topics": "user-cdc27.test.user",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter.schemas.enable": "false",
"starrocks.http.url": "159.75.190.XXX:38030",
"starrocks.topic2table.map": "user-cdc27.test.user:ods_test_user",
"starrocks.username": "root",
"starrocks.password": "XXX",
"starrocks.database.name": "scm",
"sink.properties.strip_outer_array": "true"
}
}' - 验证
我们来验证一下增删改查一下
- 创建⚠️成功同步数据到starrocks,并没有添加__op:0, 没有添加的时候系统默认是__op:0
1
2
3
4
5{
"_id": "66793e78d767977a330e9246",
"name": "ivan",
"__deleted": false
} - 更新⚠️成功同步数据到starrocks,并没有添加__op:0, 没有添加的时候系统默认是__op:0
1
2
3
4
5{
"_id": "66793958d767977a330e9245",
"name": "danny",
"__deleted": false
} - 删除⚠️失败,starrocks并没有删除,并没有添加__op:1, 没有添加的时候系统默认是__op:0,所以无法删除,应该是做了更新操作
1
2
3
4
5{
"_id": "66793e78d767977a330e9246",
"name": "ivan",
"__deleted": true
}
- 创建
我们可以看到实际transforms出来的并没有包含__op,
而按照StarRocks 的主键表的UPSERT 和 DELETE 操作
我们发现,消息体内没有__op,都会按照UPSERT,这就能理解为什么没有成功删除了。
差评!
其他
建立自用的 Docker Hub 代理
1 | server { |
/etc/docker/daemon.json
1 | { |
1 | systemctl daemon-reload |
Mac M2之LLaMA3-8B微调(llama3-fine-tuning)
演示
咱们这次使用 书生·浦语大模型挑战赛(春季赛)Top12,创意应用奖
的数据集,使用LLaMA3-8B大模型微调
环境
- 点击下载 LLaMA3-8B 微调代码压缩包,并解压
- 在终端进入解压后的文件夹,创建一个新的
Conda 虚拟环境
1
2
3cd llama3-ft
conda create -n llama3-ft python=3.10
conda activate llama3-ft - 安装依赖包
1
pip install -r requirements.txt
数据集
你可以直接使用 dataset/huanhuan.json
数据集(该数据集来源于 https://github.com/KMnO4-zx ),也可以自己准备数据集 ,比如你的客服对话(FAQ)
数据集,这样就可以微调一个更适合你的智能客服的模型,客服回答更准确。
数据集的格式也比较简单,示例如下:instruction
是问题,output
是回答
1 | [ |
微调
- 模型选择
我使用的是 LLM-Research/Meta-Llama-3-8B-Instruct ,你也可以选择一个其他模型,只需要修改train.py
文件里面的model_id
变量即可。
由于国内访问HuggingFace
比较困难,因此使用ModelScope
提供的模型。1
2
3
4
5
6
7
8# 需要微调的基座模型
# https://www.modelscope.cn/studios/LLM-Research/Chat_Llama-3-8B/summary
model_id = 'LLM-Research/Meta-Llama-3-8B-Instruct'
# 比如你也可以使用 Qwen1.5-4B-Chat 模型
# https://www.modelscope.cn/models/qwen/Qwen1.5-4B-Chat/summary
# model_id = 'qwen/Qwen1.5-4B-Chat' - 开始微调
只需要在项目根目录下执行以下命令即可。1
python train.py
测试
微调完成后,你可以执行以下命令启动一个 ChatBot 进行对话测试。
1 | streamlit run chat.py |
该命令执行后,会自动打开浏览器对话页面
其他说明
微调的时间会根据你的数据集大小和模型大小而定。
我由于没有 GPU,因此耗时2个小时,如果你有 GPU,大概需要 30 分钟。代码会自动下载模型,然后开始微调
微调完成后,所有的文件会保存在 models 文件夹下面,结构如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22├── models
├── checkpoint #【模型微调的 checkpoint】
│ ├── LLM-Research
│ │ └── Meta-Llama-3-8B-Instruct
│ │ ├── checkpoint-100
│ │ ├── checkpoint-200
│ │ ├── checkpoint-xxx
│ └── qwen
│ └── Qwen1.5-4B-Chat
│ ├── checkpoint-100
│ ├── checkpoint-200
│ ├── checkpoint-xxx
├── lora #【模型微调的 lora 文件】
│ ├── LLM-Research
│ │ └── Meta-Llama-3-8B-Instruct
│ └── qwen
│ └── Qwen1.5-4B-Chat
└── model #【自动下载的基座模型】
├── LLM-Research
│ └── Meta-Llama-3-8B-Instruct
└── qwen
└── Qwen1___5-4B-ChatCannot copy out of meta tensor; no data
报错1
`NotImplementedError:Cannot copy out of meta tensor; no data! Please use torch.nn.Module.to_empty() instead of torch.nn.Module.to() when moving module from meta to a different device.`
解决:强制设置 device = “mps”
1
2
3# 检查CUDA是否可用,然后检查MPS是否可用,最后回退到CPU
# device = torch.device("cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu")
device = "mps"
Mac M2之ChatGLM3-6B大模型私有化部署
环境与背景
在特定的情况下,要保证信息安全的同时还能享受到AIGC大模型带来的乐趣和功能,那么,私有化部署就能帮助到你,最起码,它是一个真正可用的方案。私有化部署指的是将AI应用部署在企业内部的服务器上,而非云端。这种部署方式可以在保证数据安全的同时,提高企业对于自身数据资产的控制权。
简单描述下本地电脑的配置:
- 处理器:Apple M2 Pro
- 内存:32 GB
- 系统:14.3.1 (23D60)
本次只是初步评估ChatGLM3-6B的效果,尽可能在已有本地设备的情况下进行低成本本地模型部署,如果要更好的效果,还是上专业的硬件设备。
MAC部署
ChatGLM3 下载
1 | git clone https://github.com/THUDM/ChatGLM3 |
但是,默认里面是没有模型的,只有自带的简单的聊天项目以及相关的接口示例项目,还得继续下载模型。
ChatGLM3-6B 模型下载
当然,如果你自己不下载这些模型,这些模型就会在运行的时候自动下载(网络不好的话会影响使用体验,所以,建议提前下载)
1 | git lfs install |
项目配置和部署
把下载的服务直接放到需要运行的地方,然后执行python环境管理
1 | conda create --name chatglm3 python=3.10 |
然后,进入到主项目中,开始配置一些环境
1 | cd ChatGLM3 |
可以看到,实际上我们可以运行7种案例。
- 基础例子(cli_demo , web_demo_streamlit )
- 综合例子(聊天,工具,代码解释)
- 模型微调
- 类似于langchain的案例
- openai接口的案例
- TensorRT-LLM推理部署
- 工具调用
目前,只有第二个的综合例子,是比较有趣的,就以它为案例进行配置修改。
composite_demo例子
看到,这个demo下还有requirements.txt
文件,我们把他给安装了
1 | pip install -r requirements.txt -i https://mirror.sjtu.edu.cn/pypi/web/simple |
演示中使用 Code Interpreter
还需要安装 Jupyter
内核:
1 | pip install ipykernel -i https://mirror.sjtu.edu.cn/pypi/web/simple |
接着修改client.py里面的配置信息
1 | // 修改 MODEL_PATH , chatglm3-6b 绝对路径 |
对于搭载了Apple Silicon
或者AMD GPU
的 Mac,可以使用MPS后端来在GPU上运行ChatGLM3-6B
。需要参考Apple的官方说明 安装 PyTorch-Nightly
(正确的版本号应该是2.x.x.dev2023xxxx,而不是 2.x.x)。
1 | pip uninstall torch torchvision torchaudio |
使用命令
1 | pip list | grep torch |
看到类似这样的带dev的就可以下一步了
1 | torch 2.3.0.dev20231224 |
将client.py
中device_map = "auto"
改为device_map = "mps"
136-140行
1 | self.model = AutoModel.from_pretrained( |
150行
1 | self.model = AutoModel.from_pretrained(MODEL_PATH, trust_remote_code=True, device_map="mps").eval() |
然后,执行以下命令启动服务
1 | streamlit run main.py |
效果展示
这回答速度真绝,非常的快。
对话模式
输入你是谁,它就输自动的输出信息,速度还挺快。而控制台也会显示你输入的信息以及返回的信息。
工具模式
工具模式,需要自己先定义工具,我这边没有定义,有兴趣的可以整一下。
以下是自带的工具进行的演示:我调用了一个查询天气的工具(tool_registry.py) 文件可以看到 get_weather的代码
代码解释器模式
总结
一开始的时候,没有按照Apple的官方说明安装PyTorch-Nightly
,并配置MPS
,结果效果喜人,一直在推理。后来配置后,感觉速度不亚于chatgpt3.5,答复效果也非常好。下一步开始使用chatGLM搭建私有知识库。