Goal

Data center A: SQL Server, sharded across multiple databases and tables
Data center B (Tencent Cloud): Kafka and a StarRocks data warehouse

Stream SQL Server table data into Kafka, then out of Kafka into StarRocks.


  • db: SCPRD
  • schema: test2, test3
  • table: test2.user, test3.user
  • column: id, name, age
    Only the id and name columns should be captured; age is filtered out.

Prerequisites

  • Enable CDC on database SCPRD
    use SCPRD
    GO

    EXEC sys.sp_cdc_enable_db
    GO
  • Enable CDC on tables test2.user and test3.user
    USE SCPRD
    GO

    EXEC sys.sp_cdc_enable_table
    @source_schema = N'test2', -- schema that owns the source table
    @source_name = N'user', -- source table to capture
    @role_name = N'cdc_role',
    @filegroup_name = NULL,
    @supports_net_changes = 1,
    @captured_column_list = N'[id],[name]'
    GO
    USE SCPRD
    GO

    EXEC sys.sp_cdc_enable_table
    @source_schema = N'test3', -- schema that owns the source table
    @source_name = N'user', -- source table to capture
    @role_name = N'cdc_role',
    @filegroup_name = NULL,
    @supports_net_changes = 1,
    @captured_column_list = N'[id],[name]'
    GO
    After CDC is enabled on a table, SQL Server creates two Agent jobs, cdc.dbname_capture and cdc.dbname_cleanup (a quick way to list them follows below).
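    A simple way to confirm the two jobs exist is sys.sp_cdc_help_jobs. The sketch below is an assumption-laden example: it presumes sqlcmd is installed on a host that can reach the instance and reuses the sa credentials from the connector config further down.

    # list the CDC capture/cleanup job settings for the current database
    sqlcmd -S 192.168.103.XX -U sa -P 'XXX' -d SCPRD -Q "EXEC sys.sp_cdc_help_jobs"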

Implementation

There are many ways to deploy this. Bridging the networks of data center A and data center B would be ideal; since that is not possible, we choose between the following two options.

  • Preferred: deploy debezium/connect in data center A and connect remotely to the Kafka cluster in data center B (Tencent Cloud).
  • Fallback: deploy debezium/connect in data center B (Tencent Cloud) and connect remotely to the SQL Server in data center A.
  • Deploy debezium/connect in data center A

    Because the traffic crosses data centers, connecting to Tencent Cloud Kafka over SASL_PLAINTEXT is the safer, more stable choice.

    version: '2'
    services:
      connect:
        image: ccr.ccs.tencentyun.com/da-debezium/connect:2.6.2.Final
        ports:
          - 8083:8083
        environment:
          BOOTSTRAP_SERVERS: 159.75.194.XXX:50001
          CONNECT_SASL_MECHANISM: PLAIN
          CONNECT_SECURITY_PROTOCOL: SASL_PLAINTEXT
          CONNECT_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="USER_NAME" password="PWD";
          GROUP_ID: connect-cluster
          CONFIG_STORAGE_TOPIC: config.storage.topic
          OFFSET_STORAGE_TOPIC: offset.storage.topic
          STATUS_STORAGE_TOPIC: status.storage.topic
          CONNECT_PRODUCER_SASL_MECHANISM: PLAIN
          CONNECT_PRODUCER_SECURITY_PROTOCOL: SASL_PLAINTEXT
          CONNECT_PRODUCER_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="USER_NAME" password="PWD";
          CONNECT_CONSUMER_SASL_MECHANISM: PLAIN
          CONNECT_CONSUMER_SECURITY_PROTOCOL: SASL_PLAINTEXT
          CONNECT_CONSUMER_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="USER_NAME" password="PWD";
    networks:
      kafka_network:
        name: debezium
  • Create the SQL Server connector
    {
      "name": "source-sqlserver-user-cdc_v1",
      "config": {
        "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
        "database.hostname": "192.168.103.XX",
        "database.port": "1433",
        "database.user": "sa",
        "database.password": "XXX",
        "database.names": "SCPRD",
        "topic.prefix": "sqlserver-user-cdc_v1",
        "schema.include.list": "test2,test3",
        "table.include.list": "test2.user,test3.user",
        "column.include.list": "test(.*).user.id,test(.*).user.name",
        "schema.history.internal.kafka.bootstrap.servers": "159.75.194.XXX:50001",
        "schema.history.internal.kafka.topic": "schemahistory.sqlserver-user-cdc_v1",
        "schema.history.internal.producer.sasl.mechanism": "PLAIN",
        "schema.history.internal.producer.security.protocol": "SASL_PLAINTEXT",
        "schema.history.internal.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USER_NAME\" password=\"PWD\";",
        "schema.history.internal.consumer.sasl.mechanism": "PLAIN",
        "schema.history.internal.consumer.security.protocol": "SASL_PLAINTEXT",
        "schema.history.internal.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USER_NAME\" password=\"PWD\";",
        "schema.history.internal.store.only.captured.tables.ddl": true,
        "schema.history.internal.store.only.captured.databases.ddl": true,
        "database.encrypt": "false",
        "transforms": "Reroute",
        "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
        "transforms.Reroute.topic.regex": "(.*)test(.*)user",
        "transforms.Reroute.topic.replacement": "$1user"
      }
    }
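    Registering the connector means POSTing the JSON above to the Kafka Connect REST API. The commands below are a sketch: they assume the config is saved locally as sqlserver-user-cdc_v1.json and that the connect container from the compose file is reachable on localhost:8083.

    # register the connector, then poll its status
    curl -s -X POST -H 'Content-Type: application/json' \
         --data @sqlserver-user-cdc_v1.json http://localhost:8083/connectors
    curl -s http://localhost:8083/connectors/source-sqlserver-user-cdc_v1/status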

Common operational commands

  • Enable / disable CDC for database SCPRD
    Enable

    Note: CDC cannot be enabled on the master database.

    use SCPRD
    GO

    EXEC sys.sp_cdc_enable_db
    GO

    Disable

    USE SCPRD
    GO

    exec sys.sp_cdc_disable_db
    GO
  • Enable / disable change data capture for a SQL Server source table
    Enable

    USE SCPRD
    GO

    EXEC sys.sp_cdc_enable_table
    @source_schema = N'test2', -- schema that owns the source table
    @source_name = N'user', -- source table to capture
    @role_name = N'cdc_role',
    @filegroup_name = NULL,
    @supports_net_changes = 1,
    @captured_column_list = N'[id],[name]'
    GO

    Disable

    USE SCPRD
    GO

    EXEC sys.sp_cdc_disable_table
    @source_schema = N'test2', -- schema that owns the source table
    @source_name = N'user', -- source table to stop capturing
    @capture_instance = N'all' -- sp_cdc_disable_table takes a capture instance rather than the enable-time options
    GO
  • Check CDC status for the database and for individual tables

    --check whether CDC is enabled at the database level
    SELECT name,is_cdc_enabled FROM sys.databases

    --check whether CDC is enabled per table
    SELECT C.name,
    B.name,
    is_tracked_by_cdc
    FROM sys.tables AS A
    LEFT JOIN sys.objects AS B ON A.object_id = B.object_id
    LEFT JOIN sys.schemas AS C ON C.schema_id = B.schema_id;
  • Check which source tables have change data capture enabled

    USE SCPRD
    GO

    EXEC sys.sp_cdc_help_change_data_capture
    GO

Related links

SQL Server Change Data Capture (CDC) manual
Debezium SASL_PLAINTEXT

Conclusion first

We did get MongoDB CDC to emit the before message body, but because of a glaring bug in the StarRocks Connector for Kafka, this round of validation failed.

Goal

Material on this is scarce; I could not find any Chinese write-up on Debezium CDC handling MongoDB deletes, so after working through the English docs I ran the experiment myself. The main thing validated here is the StarRocks Connector for Kafka.

  • source: MongoDbConnector
  • sink: StarRocksSinkConnector

Our goal is to CDC MongoDB collection data into the StarRocks warehouse in real time, with delete operations supported.

Prerequisites

mongodb

First, make sure MongoDB is at least 6.0, because 6.0 introduced the changeStreamPreAndPostImages change stream option.
Why: before 6.0, change streams could not carry the pre-image, so the before field could not appear in the CDC event, and under the Debezium message format a delete could not be applied.

To get the before field in the Debezium CDC output you need to:

  • Enable changeStreamPreAndPostImages, new in MongoDB 6.0, on the collection (a verification sketch follows this list)
    use user
    db.runCommand({
    collMod: "user",
    changeStreamPreAndPostImages: {
    enabled: true
    }
    })
  • Set capture.mode on the Debezium MongoDB connector to one of the following:
    • change_streams_with_pre_image: outputs before but not after
    • change_streams_update_full_with_pre_image: outputs both before and after
      Mode and description:
      change_streams: emits the change stream, but update events carry no after field
      change_streams_update_full: change_streams plus an after field containing the full post-change document
      change_streams_with_pre_image: change_streams plus a before field (requires the collection option above)
      change_streams_update_full_with_pre_image: change_streams_update_full plus a before field, i.e. both the pre- and post-change documents
      {
        "name": "source-user-mongodb-connector",
        "config": {
          "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
          "mongodb.connection.string": "mongodb://xxx:xxx@172.16.0.11:27017,172.16.0.15:27017/?&authSource=admin",
          "collection.include.list": "mydb.user",
          "topic.prefix": "user-cdc",
          "capture.mode": "change_streams_update_full_with_pre_image"
        }
      }
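      To confirm that the collection option from the first bullet actually took effect, the collection options can be inspected with mongosh. This is only a sketch; it reuses the connection string above and assumes the collection lives in the user database as in the collMod example.

      # expect the options to include changeStreamPreAndPostImages: { enabled: true }
      mongosh "mongodb://xxx:xxx@172.16.0.11:27017/?authSource=admin" --quiet \
        --eval 'db.getSiblingDB("user").getCollectionInfos({name: "user"})[0].options'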

kafka

Topic auto-creation is enabled on our cluster.
If it is not, the topics Debezium/Kafka Connect needs must be created by hand:

  • config.storage.topic
  • offset.storage.topic
  • status.storage.topic

For manual creation see "Kafka Connect 接入 CKafka"; Kafka Connect has a number of requirements for these topics, so auto-creation is recommended.
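If you do create them by hand, something along these lines covers the usual Kafka Connect requirements (all three topics compacted, the config topic with a single partition). The bootstrap address matches the compose file below; the partition and replication numbers are only illustrative defaults.

# create the three internal topics Kafka Connect needs (illustrative settings)
kafka-topics.sh --bootstrap-server 172.16.0.XX:9092 --create --topic config.storage.topic --partitions 1 --replication-factor 3 --config cleanup.policy=compact
kafka-topics.sh --bootstrap-server 172.16.0.XX:9092 --create --topic offset.storage.topic --partitions 25 --replication-factor 3 --config cleanup.policy=compact
kafka-topics.sh --bootstrap-server 172.16.0.XX:9092 --create --topic status.storage.topic --partitions 5 --replication-factor 3 --config cleanup.policy=compact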

Deploy debezium/connect

We use the containerized, distributed deployment so connectors can be managed through the REST API,
and mount starrocks-kafka-connector into the image via volumes.
Note that the debezium/connect image already bundles the common MySQL, MongoDB and SQL Server connectors.

version: '2'
services:
  connect:
    image: debezium/connect:2.4.2.Final
    ports:
      - 8083:8083
    environment:
      - BOOTSTRAP_SERVERS=172.16.0.XX:9092
      - GROUP_ID=connect-cluster
      - CONFIG_STORAGE_TOPIC=config.storage.topic
      - OFFSET_STORAGE_TOPIC=offset.storage.topic
      - STATUS_STORAGE_TOPIC=status.storage.topic
    volumes:
      - /home/kafka-connect/starrocks-kafka-connector-1.0.3:/kafka/connect/starrocks-kafka-connector-1.0.3
  debezium-ui:
    image: debezium/debezium-ui:2.4
    ports:
      - 8085:8080
    environment:
      - KAFKA_CONNECT_URIS=http://connect:8083
networks:
  kafka_network:
    name: debezium
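With the file in place, bring the stack up and confirm that Kafka Connect sees the mounted StarRocks plugin alongside the bundled Debezium connectors (this assumes port 8083 is reachable from where curl is run):

# start the stack and list the available connector plugins
docker compose up -d
curl -s http://localhost:8083/connector-plugins | grep -i starrocks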

Inflow: mongo -> kafka ✅

Stream the mongo user collection into Kafka.

Insert one document into the user collection first, so the connector has data for the initial full snapshot when the task is submitted.

db.getCollection("user").insert( {
_id: ObjectId("667820b8d767977a330e9241"),
name: "danny"
} );

The MongoDB source connector task can be submitted via http://ip:8083/connectors:

curl --location 'http://ip:8083/connectors' \
--header 'Accept: application/json' \
--header 'Content-Type: application/json' \
--data-raw '{
"name": "source-test23-user-cdc",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"mongodb.connection.string": "mongodb://mongouser:xxx@172.16.0.11:27017,172.16.0.15:27017/?&authSource=admin",
"collection.include.list": "test.user",
"topic.prefix": "user-cdc23",
"capture.mode": "change_streams_update_full_with_pre_image",
"snapshot.mode": "initial",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter.schemas.enable": "false"
}
}'
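A quick way to confirm events are reaching Kafka is to read a few messages off the CDC topic. The sketch below assumes the internal broker from the compose file accepts plaintext connections and that the Kafka CLI tools are on the path.

# peek at the topic produced by the source connector
kafka-console-consumer.sh --bootstrap-server 172.16.0.XX:9092 \
  --topic user-cdc23.test.user --from-beginning --max-messages 5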

Let's look at the message structure for inserts, updates and deletes.

  • Insert

    On insert, before is null, after is the JSON string of the newly created document, and op is c.

    {
    "schema": {...},
    "payload": {
    "before": null,
    "after": "{\"_id\": {\"$oid\": \"6676dccb988e8964430f7e46\"},\"name\": \"junyao\"}",
    "updateDescription": null,
    "source": {
    "version": "2.4.2.Final",
    "connector": "mongodb",
    "name": "user",
    "ts_ms": 1719065804000,
    "snapshot": "false",
    "db": "mydb",
    "sequence": null,
    "rs": "cmgo-6t0e5t7x_0",
    "collection": "user",
    "ord": 1,
    "lsid": null,
    "txnNumber": null,
    "wallTime": 1719065804058
    },
    "op": "c",
    "ts_ms": 1719065804054,
    "transaction": null
    }
    }
  • Update

    before is the pre-update document, after is the post-update document, and op is u.

    {
    "schema": {...},
    "payload": {
    "before": "{\"_id\": {\"$oid\": \"6676dbe5988e8964430f7e45\"},\"name\": \"tony\"}",
    "after": "{\"_id\": {\"$oid\": \"6676dbe5988e8964430f7e45\"},\"name\": \"danny\"}",
    "updateDescription": {
    "removedFields": null,
    "updatedFields": "{\"name\": \"danny\"}",
    "truncatedArrays": null
    },
    "source": {
    "version": "2.4.2.Final",
    "connector": "mongodb",
    "name": "user",
    "ts_ms": 1719065650000,
    "snapshot": "false",
    "db": "mydb",
    "sequence": null,
    "rs": "cmgo-6t0e5t7x_0",
    "collection": "user",
    "ord": 1,
    "lsid": null,
    "txnNumber": null,
    "wallTime": 1719065650298
    },
    "op": "u",
    "ts_ms": 1719065650298,
    "transaction": null
    }
    }
  • Delete

    before is the document prior to deletion, after is null, and op is d.

    {
    "schema": {...},
    "payload": {
    "before": "{\"_id\": {\"$oid\": \"6676dbe5988e8964430f7e45\"},\"name\": \"danny\"}",
    "after": null,
    "updateDescription": null,
    "source": {
    "version": "2.4.2.Final",
    "connector": "mongodb",
    "name": "user",
    "ts_ms": 1719065732000,
    "snapshot": "false",
    "db": "mydb",
    "sequence": null,
    "rs": "cmgo-6t0e5t7x_0",
    "collection": "user",
    "ord": 1,
    "lsid": null,
    "txnNumber": null,
    "wallTime": 1719065732402
    },
    "op": "d",
    "ts_ms": 1719065732399,
    "transaction": null
    }
    }

Outflow: kafka -> StarRocksSinkConnector ⚠️

Kafka out to StarRocks; this path does not work yet.

  • Create the StarRocks table
    CREATE TABLE IF NOT EXISTS `scm`.`ods_test_user` (
    `_id` STRING ,
    `name` STRING NULL
    ) ENGINE=OLAP
    PRIMARY KEY (_id) DISTRIBUTED BY HASH (_id) PROPERTIES
    (
    "replication_num" = "1"
    )
  • Submit the task
    curl --location 'http://ip:8083/connectors' \
    --header 'Content-Type: application/json' \
    --data '{
    "name": "sink-test23-user-cdc",
    "config": {
    "connector.class": "com.starrocks.connector.kafka.StarRocksSinkConnector",
    "topics": "user-cdc23.test.user",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter.schemas.enable": "false",
    "starrocks.http.url": "xxx.xxx.xxx.xxx:38030",
    "starrocks.topic2table.map": "user-cdc23.test.user:ods_test_user",
    "starrocks.username": "root",
    "starrocks.password": "xxx",
    "starrocks.database.name": "scm",
    "sink.properties.strip_outer_array": "true",
    "transforms": "addfield,unwrap",
    "transforms.addfield.type": "com.starrocks.connector.kafka.transforms.AddOpFieldForDebeziumRecord",
    "transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
    "transforms.unwrap.drop.tombstones": "true",
    "transforms.unwrap.delete.handling.mode": "rewrite"
    }
    }'

Note that, per the StarRocks documentation on loading Debezium CDC format data, transforms.unwrap.type is different for MongoDB. Since I am on MongoDB 6.0 I checked the latest MongoDB New Document State Extraction; compared with New Record State Extraction, the difference is that

it needs to be set to io.debezium.connector.mongodb.transforms.ExtractNewDocumentState.

After making that change and resubmitting the task, the error log shows:

root@starrockscluster-cn-1:/opt/starrocks# curl http://starrockscluster-cn-1.starrockscluster-cn-search.starrocks.svc.cluster.local:8040/api/_load_error_log?file=error_log_864bbf459bb530a6_44aa9dc5433d4c0
Error: NULL value in non-nullable column '_id'. Row: [NULL, NULL, 0]
root@starrockscluster-cn-1:/opt/starrocks#

Failed. Thumbs down!

Outflow: kafka -> StarRocksSinkConnector, a second attempt after the failure ⚠️

New idea: can the format be shaped on the MongoDB source side, before the data even lands in Kafka? So I added the following to the source:

"transforms": "addfield,unwrap",
"transforms.addfield.type": "com.starrocks.connector.kafka.transforms.AddOpFieldForDebeziumRecord",
"transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
"transforms.unwrap.drop.tombstones": "true",
"transforms.unwrap.delete.handling.mode": "rewrite"
  • mongo source
    curl --location 'ip:8083/connectors' \
    --header 'Accept: application/json' \
    --header 'Content-Type: application/json' \
    --data-raw '{
    "name": "source-test26-user-cdc",
    "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.connection.string": "mongodb://mongouser:XXX@172.16.0.11:27017,172.16.0.15:27017/?&authSource=admin",
    "collection.include.list": "test.user",
    "topic.prefix": "user-cdc26",
    "capture.mode": "change_streams_update_full_with_pre_image",
    "snapshot.mode": "initial",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter.schemas.enable": "false",
    "transforms": "addfield,unwrap",
    "transforms.addfield.type": "com.starrocks.connector.kafka.transforms.AddOpFieldForDebeziumRecord",
    "transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
    "transforms.unwrap.drop.tombstones": "true",
    "transforms.unwrap.delete.handling.mode": "rewrite"
    }
    }'
    which produces records shaped like this:
    {
    "_id": "66793958d767977a330e9245",
    "name": "jy",
    "__deleted": false
    }
  • starrocks sink
    curl --location 'http://ip:8083/connectors' \
    --header 'Content-Type: application/json' \
    --data '{
    "name": "sink-test27-user-cdc",
    "config": {
    "connector.class": "com.starrocks.connector.kafka.StarRocksSinkConnector",
    "topics": "user-cdc27.test.user",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter.schemas.enable": "false",
    "starrocks.http.url": "159.75.190.XXX:38030",
    "starrocks.topic2table.map": "user-cdc27.test.user:ods_test_user",
    "starrocks.username": "root",
    "starrocks.password": "XXX",
    "starrocks.database.name": "scm",
    "sink.properties.strip_outer_array": "true"
    }
    }'
  • Verification

    Let's verify insert, update and delete.

    • Insert
      {
      "_id": "66793e78d767977a330e9246",
      "name": "ivan",
      "__deleted": false
      }
      ⚠️ The record synced to StarRocks, but no __op:0 was added; when it is absent the system defaults to __op:0.
    • Update
      {
      "_id": "66793958d767977a330e9245",
      "name": "danny",
      "__deleted": false
      }
      ⚠️ The record synced to StarRocks, but no __op:0 was added; when it is absent the system defaults to __op:0.
    • Delete
      {
      "_id": "66793e78d767977a330e9246",
      "name": "ivan",
      "__deleted": true
      }
      ⚠️ Failed. StarRocks did not delete the row: no __op:1 was added, and since the default without it is __op:0 the delete cannot happen; the record is applied as an update instead.

As you can see, the transformed messages contain no __op at all,
while StarRocks primary key tables decide between UPSERT and DELETE based on that field.

When the message body carries no __op, everything is applied as an UPSERT, which explains why the delete never succeeded (see the Stream Load sketch below).
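As a sanity check that the table itself honors __op-based deletes, one delete row can be hand-fed through Stream Load. This is only a sketch following the StarRocks "change data through loading" pattern, reusing the FE HTTP address, credentials and table from the sink config; the exact headers may need adjusting for your version.

# write one record with __op = 1 (delete) directly into ods_test_user
cat > /tmp/delete_one.json <<'EOF'
[{"_id": "66793e78d767977a330e9246", "name": "ivan", "op": 1}]
EOF
curl --location-trusted -u root:XXX -XPUT \
  -H "Expect:100-continue" -H "format: json" -H "strip_outer_array: true" \
  -H "jsonpaths: [\"$._id\", \"$.name\", \"$.op\"]" \
  -H "columns: _id, name, temp, __op = temp" \
  -T /tmp/delete_one.json \
  http://xxx.xxx.xxx.xxx:38030/api/scm/ods_test_user/_stream_load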

Thumbs down!

Other

server {
listen 443 ssl;
server_name XXX.XXX.cn;

ssl_certificate /etc/nginx/ssl/xx.crt;
ssl_certificate_key /etc/nginx/ssl/xx.key;

location / {
proxy_pass https://registry-1.docker.io;
proxy_set_header Host registry-1.docker.io;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_buffering off;
proxy_set_header Authorization $http_authorization;
proxy_pass_header Authorization;

proxy_intercept_errors on;
recursive_error_pages on;
error_page 301 302 307 = @handle_redirect;
}

location @handle_redirect {
resolver 1.1.1.1;
set $saved_redirect_location '$upstream_http_location';
proxy_pass $saved_redirect_location;
}

}

/etc/docker/daemon.json

{
"registry-mirrors": [
"https://my-docker-hub.mydomain.com"
]
}
systemctl daemon-reload
systemctl restart docker

Demo

This time we use the dataset from the 书生·浦语大模型挑战赛(春季赛) (Top 12, Creative Application Award) and fine-tune the LLaMA3-8B model with it.

Environment

  • Download the LLaMA3-8B fine-tuning code archive and extract it
  • In a terminal, enter the extracted folder and create a new Conda virtual environment
    cd llama3-ft
    conda create -n llama3-ft python=3.10
    conda activate llama3-ft
  • Install the dependencies
    pip install -r requirements.txt

Dataset

You can use the dataset/huanhuan.json dataset directly (it comes from https://github.com/KMnO4-zx), or prepare your own, such as a customer-service FAQ dataset, to fine-tune a model that answers your support questions more accurately.
The dataset format is simple; an example follows:
instruction is the question, output is the answer

[
{
"instruction": "你好",
"input": "",
"output": "皇上好,我是甄嬛,家父是大理寺少卿甄远道。"
},
{
"instruction": "你不愿意见我?",
"input": "",
"output": "不该相见自然不愿见,还望王爷尊重我的意愿。"
}
]

Fine-tuning

  • Model selection
    I use LLM-Research/Meta-Llama-3-8B-Instruct; you can pick a different model simply by changing the model_id variable in train.py.
    Since HuggingFace is hard to reach from mainland China, the model is pulled from ModelScope instead.
    # base model to fine-tune

    # https://www.modelscope.cn/studios/LLM-Research/Chat_Llama-3-8B/summary
    model_id = 'LLM-Research/Meta-Llama-3-8B-Instruct'

    # you could also use the Qwen1.5-4B-Chat model, for example
    # https://www.modelscope.cn/models/qwen/Qwen1.5-4B-Chat/summary
    # model_id = 'qwen/Qwen1.5-4B-Chat'
  • Start fine-tuning
    Just run the following command from the project root.
    python train.py

Test

When fine-tuning finishes, you can start a ChatBot for an interactive test with:

streamlit run chat.py

The command automatically opens the chat page in your browser.

Notes

  • Fine-tuning time depends on the dataset size and the model size.
    Without a GPU it took me about 2 hours; with a GPU it should take roughly 30 minutes.

  • The code downloads the model automatically and then starts fine-tuning.

  • When fine-tuning finishes, everything is saved under the models folder, structured as follows:

    ├── models
    ├── checkpoint # fine-tuning checkpoints
    │ ├── LLM-Research
    │ │ └── Meta-Llama-3-8B-Instruct
    │ │ ├── checkpoint-100
    │ │ ├── checkpoint-200
    │ │ ├── checkpoint-xxx
    │ └── qwen
    │ └── Qwen1.5-4B-Chat
    │ ├── checkpoint-100
    │ ├── checkpoint-200
    │ ├── checkpoint-xxx
    ├── lora # LoRA files produced by fine-tuning
    │ ├── LLM-Research
    │ │ └── Meta-Llama-3-8B-Instruct
    │ └── qwen
    │ └── Qwen1.5-4B-Chat
    └── model # base models downloaded automatically
    ├── LLM-Research
    │ └── Meta-Llama-3-8B-Instruct
    └── qwen
    └── Qwen1___5-4B-Chat
  • Cannot copy out of meta tensor; no data
    Error:

    `NotImplementedError:Cannot copy out of meta tensor; no data! Please use torch.nn.Module.to_empty() instead of torch.nn.Module.to() when moving module from meta to a different device.`

    Fix: force device = "mps"

    # check for CUDA first, then MPS, then fall back to CPU
    # device = torch.device("cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu")
    device = "mps"

Environment and background

In some situations you need to keep information secure while still enjoying what large AIGC models can do; private deployment helps with that, and at the very least it is a genuinely workable option. Private deployment means running the AI application on the company's own servers instead of in the cloud, which keeps data safe while giving the company more control over its own data assets.

A quick description of the local machine:

  • Processor: Apple M2 Pro
  • Memory: 32 GB
  • OS: macOS 14.3.1 (23D60)

This is only a first evaluation of ChatGLM3-6B, done as cheaply as possible on hardware I already have; for better results, use proper dedicated hardware.

Mac deployment

Download ChatGLM3

git clone https://github.com/THUDM/ChatGLM3

By default the repository contains no model weights, only the simple bundled chat demo and the API example projects, so the model still has to be downloaded.

Download the ChatGLM3-6B model

If you do not download the model yourself it will be downloaded automatically at run time (a poor network connection will hurt the experience, so downloading in advance is recommended).

git lfs install
git clone https://huggingface.co/THUDM/chatglm3-6b
# or
git clone https://www.modelscope.cn/ZhipuAI/chatglm3-6b.git

Project configuration and deployment

Put what you downloaded wherever you want to run it, then set up the Python environment:

conda create --name chatglm3 python=3.10
conda activate chatglm3

Then enter the main project and configure the environment:

cd ChatGLM3
pip install -r requirements.txt -i https://mirror.sjtu.edu.cn/pypi/web/simple
pip list            # list the installed packages
pip show openai     # show where a package was installed

As you can see, there are actually seven kinds of demos we can run:

  1. Basic examples (cli_demo, web_demo_streamlit)
  2. Composite demo (chat, tools, code interpreter)
  3. Model fine-tuning
  4. A LangChain-style example
  5. An OpenAI-compatible API example
  6. TensorRT-LLM inference deployment
  7. Tool calling

For now only the second one, the composite demo, is really interesting, so that is the one we will configure.

The composite_demo example

This demo has its own requirements.txt, so install that as well:

pip install -r requirements.txt -i https://mirror.sjtu.edu.cn/pypi/web/simple

Using the Code Interpreter in the demo also requires a Jupyter kernel:

pip install ipykernel -i https://mirror.sjtu.edu.cn/pypi/web/simple
ipython kernel install --name chatglm3 --user

Next, edit the configuration in client.py.

# set MODEL_PATH to the absolute path of chatglm3-6b
MODEL_PATH = os.environ.get('MODEL_PATH', '/Users/junyao/Desktop/chatglm/chatglm3-6b')

On Macs with Apple Silicon or an AMD GPU, the MPS backend can run ChatGLM3-6B on the GPU. Follow Apple's official instructions to install PyTorch-Nightly (the version number should look like 2.x.x.dev2023xxxx, not 2.x.x).

pip uninstall torch torchvision torchaudio 
pip install --pre torch torchvision torchaudio --extra-index-url https://download.pytorch.org/whl/nightly/cpu

Run the command

pip list | grep torch 

and if you see dev builds like these, you can continue:

torch 2.3.0.dev20231224
torchaudio 2.2.0.dev20231224
torchvision 0.18.0.dev20231224

In client.py, change device_map = "auto" to device_map = "mps".
Lines 136-140:

self.model = AutoModel.from_pretrained(
model_path,
trust_remote_code=True,
config=config,
device_map="mps").eval()

Line 150:

self.model = AutoModel.from_pretrained(MODEL_PATH, trust_remote_code=True, device_map="mps").eval()

Then start the service with:

streamlit run main.py

Results

The response speed is genuinely impressive; it is very fast.

Chat mode


Type 你是谁 ("who are you") and it streams the answer back quickly; the console also logs your input and the returned text.


Tool mode

Tool mode requires defining your own tools first; I have not done that here, but it is worth exploring.
The demo below uses the bundled tools: I call a weather-lookup tool, and the get_weather code can be found in tool_registry.py.

Code interpreter mode


Summary

At first I had not followed Apple's instructions to install PyTorch-Nightly and configure MPS, and the "result" was that it just kept grinding away at inference. After configuring it properly, the speed feels no worse than ChatGPT-3.5 and the answers are very good. Next step: use ChatGLM to build a private knowledge base.

Background

DataX is an offline synchronization tool for heterogeneous data sources. The requirement this time is scheduled database synchronization:

  • mongodb and mysql synced to ES on a schedule
  • mongodb and mysql synced to StarRocks on a schedule

In principle this should be paired with DolphinScheduler, but DolphinScheduler currently looks a bit heavyweight, so that evaluation is postponed.

job

  • mysql to StarRocks

    {
    "job":
    {
    "setting":
    {
    "speed":
    {
    "channel": 3
    },
    "errorLimit":
    {
    "record": 0,
    "percentage": 0.02
    }
    },
    "content":
    [
    {
    "reader":
    {
    "name": "mysqlreader",
    "parameter":
    {
    "username": "root",
    "password": "XXX",
    "column":
    [
    "id",
    "user_name",
    "wecom_user_id",
    "sap_code",
    "password",
    "name",
    "cellphone",
    "email"
    ],
    "splitPk": "id",
    "connection":
    [
    {
    "table":
    [
    "user"
    ],
    "jdbcUrl":
    [
    "jdbc:mysql://192.168.103.113:3306/user"
    ]
    }
    ]
    }
    },
    "writer":
    {
    "name": "starrockswriter",
    "parameter":
    {
    "username": "root",
    "password": "XXX",
    "column":
    [
    "id",
    "user_name",
    "wecom_user_id",
    "sap_code",
    "password",
    "name",
    "cellphone",
    "email"
    ],
    "preSql":
    [],
    "postSql":
    [],
    "connection":
    [
    {
    "table":
    [
    "user"
    ],
    "jdbcUrl": "jdbc:mysql://192.168.103.202:9030/",
    "selectedDatabase": "assistant"
    }
    ],
    "loadUrl":
    [
    "192.168.103.202:8040" // FE http port; here the CN port is used directly
    ],
    "loadProps":
    {}
    }
    }
    }
    ]
    }
    }
  • mongodb to es

    {
    "job":
    {
    "setting":
    {
    "speed":
    {
    "channel": 2
    }
    },
    "content":
    [
    {
    "reader":
    {
    "name": "mongodbreader",
    "parameter":
    {
    "address":
    [
    "192.168.103.113:27011"
    ],
    "userName": "root",
    "userPassword": "XXX",
    "authDb": "admin",
    "dbName": "biocitydb",
    "collectionName": "companies",
    "column":
    [
    {
    "name": "_id",
    "type": "string"
    },
    {
    "name": "name",
    "type": "string"
    },
    {
    "name": "sapCode",
    "type": "string"
    }
    ]
    }
    },
    "writer":
    {
    "name": "elasticsearchwriter",
    "parameter":
    {
    "endpoint": "http://192.168.199.113:9200",
    "accessId": "root",
    "accessKey": "XXX",
    "index": "companies",
    "type": "default",
    "cleanup": true,
    "settings":
    {
    "index":
    {
    "number_of_shards": 1,
    "number_of_replicas": 0
    }
    },
    "discovery": false,
    "batchSize": 1000,
    "splitter": ",",
    "column":
    [
    {
    "name": "id",
    "type": "id"
    },
    {
    "name": "name",
    "type": "keyword"
    },
    {
    "name": "sapCode",
    "type": "keyword"
    }
    ]
    }
    }
    }
    ]
    }
    }

Run

$ cd  {YOUR_DATAX_HOME}/bin
$ python datax.py {YOUR_JOB.json}
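Since the requirement is periodic synchronization and DolphinScheduler is postponed, the simplest interim scheduler is cron. The entry below is only an example, with placeholder paths for the DataX installation and the job file.

# run the mysql -> StarRocks job at the top of every hour
0 * * * * cd /opt/datax/bin && python datax.py ../job/mysql2starrocks.json >> /opt/datax/log/mysql2starrocks.log 2>&1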

Background

As part of unifying the data warehouse, here is a round of SeaTunnel evaluation.

mongodb-cdc

  • mongodb-cdc real-time sync to mysql

    env {
    parallelism = 1
    job.mode = "STREAMING"
    checkpoint.interval = 5000
    }

    source {
    MongoDB-CDC {
    hosts = "192.168.103.113:27011"
    database = ["pms"]
    collection = ["pms.demand_item_row"]
    username = root
    password = "XXX"
    schema = {
    fields {
    "_id" : string,
    "ras" : string,
    "planSn" : string,
    "isPlaned" : boolean
    }
    }
    }
    }

    transform {
    FieldMapper {
    field_mapper = {
    _id = _id
    ras = ras
    planSn = planSn
    isPlaned = isPlaned
    }
    }
    }

    sink {
    jdbc {
    url = "jdbc:mysql://192.168.103.113:3306"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "root"
    password = "XXX"
    generate_sink_sql = true
    database = test
    table = row
    primary_keys = ["_id"]
    }
    }
  • mongodb-cdc real-time sync to starrocks

    env {
    parallelism = 1
    job.mode = "STREAMING"
    checkpoint.interval = 5000
    }

    source {
    MongoDB-CDC {
    hosts = "192.168.103.113:27011"
    database = ["biocitydb"]
    collection = ["biocitydb.users"]
    username = root
    password = "XXX"
    schema = {
    fields {
    "id" : int,
    "username" : string,
    "wecomUserId" : string,
    "password" : string,
    "name" : string,
    "cellphone" : string,
    "email" : string
    }
    }
    }
    }

    transform {
    FieldMapper {
    field_mapper = {
    id = id
    username = user_name
    wecomUserId = wecom_user_id
    password = password
    name = name
    cellphone = cellphone
    email = email
    }
    }
    }

    sink {
    StarRocks {
    batch_max_rows=10240
    table="user"
    database="assistant"
    base-url="jdbc:mysql://192.168.103.202:9030"
    password="XXX"
    username="root"
    nodeUrls=[
    "192.168.103.202:8040"
    ]
    }
    }
  • mongodb-cdc real-time sync to es

    env {
    parallelism = 1
    job.mode = "STREAMING"
    checkpoint.interval = 5000
    }

    source {
    MongoDB-CDC {
    hosts = "192.168.103.113:27011"
    database = ["pms"]
    collection = ["pms.demand_item_row"]
    username = root
    password = "XXX"
    schema = {
    fields {
    "_id" : string,
    "ras" : string,
    "planSn" : string,
    "isPlaned" : boolean
    }
    }
    }
    }

    transform {
    FieldMapper {
    field_mapper = {
    _id = _id
    ras = ras
    planSn = planSn
    isPlaned = isPlaned
    }
    }
    }

    sink {
    Elasticsearch {
    hosts = ["192.168.103.113:9200"]
    index = "row"
    username = "elastic"
    password = "XXX"
    tls_verify_certificate = false
    primary_keys = ["_id"]
    }
    }

sqlserver cdc

-- enable CDC on the database
EXEC sys.sp_cdc_enable_db;

-- confirm SQL Server Agent is running
EXEC master.dbo.xp_servicecontrol N'QUERYSTATE',N'SQLSERVERAGENT'

-- check whether the current database has CDC enabled (1 = enabled)
SELECT name,is_cdc_enabled FROM sys.databases WHERE is_cdc_enabled = 1;

-- look up schema_id
SELECT * FROM sys.schemas

-- check which tables have CDC enabled (1 = enabled)
SELECT name, is_tracked_by_cdc, schema_id FROM sys.tables WHERE is_tracked_by_cdc = 1;

-- source_schema is the schema the table belongs to.
-- source_name is the table to enable CDC tracking on.
-- cdc_role is the role CDC uses; if no role name is given, a default role is created.
EXEC sys.sp_cdc_enable_table
@source_schema = 'wmwhse1',
@source_name = 'PICKDETAIL',
@role_name = 'cdc_role';

-- final check that CDC is enabled
USE SCPRD;
GO
EXEC sys.sp_cdc_help_change_data_capture
GO

Run

# start
./bin/seatunnel.sh --config ./example/mongo2sr3 -e local
# resume
./bin/seatunnel.sh --config ./example/mongo2sr3 -r {jobId} -e local

Other

SQL Server CDC configuration summary

Background

Requirement in the on-prem k8s cluster: StarRocks port 9030 and MySQL port 3306 must be exposed externally. These are TCP (L4) services, while Ingress speaks HTTP (L7), so they cannot be exposed through an Ingress resource.

  • k8s StarRocks situation
    • services: starrocks/starrockscluster-fe-service

Configuration

  • deployment: ingress-nginx-controller changes

    • Add hostNetwork: true, so the application running in the pod binds directly to the host's network interfaces and is reachable on every interface of the host's LAN
    • Add - '--tcp-services-configmap=$(POD_NAMESPACE)/tcp-services'
      spec:
        hostNetwork: true # added
        containers:
          - name: controller
            image: dyrnq/ingress-nginx-controller:v1.6.4
            args:
              - /nginx-ingress-controller
              - '--election-id=ingress-nginx-leader'
              - '--controller-class=k8s.io/ingress-nginx'
              - '--ingress-class=nginx'
              - '--configmap=$(POD_NAMESPACE)/ingress-nginx-controller'
              - '--validating-webhook=:8443'
              - '--validating-webhook-certificate=/usr/local/certificates/cert'
              - '--validating-webhook-key=/usr/local/certificates/key'
              - '--tcp-services-configmap=$(POD_NAMESPACE)/tcp-services' # added
              - '--udp-services-configmap=$(POD_NAMESPACE)/udp-services'
  • Write TCP/UDP port-forwarding rules to expose the L4 services
    kubectl create -f tcp-services-configmap.yaml -n ingress-nginx

    kind: ConfigMap
    apiVersion: v1
    metadata:
      name: tcp-services
      namespace: ingress-nginx
    data:
      '8030': starrocks/starrockscluster-fe-service:8030
      '8040': starrocks/starrockscluster-cn-service:8040
      '9030': starrocks/starrockscluster-fe-service:9030
  • Verify the L4 TCP exposure by finding the IP of the nginx-ingress-controller pod (a connection test follows this list)

    > kubectl get pod -n ingress-nginx -owide
    NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES
    ingress-nginx-admission-create-cpjcl 0/1 Completed 0 70d 10.244.3.20 k8s-node3 <none> <none>
    ingress-nginx-admission-patch-r6ql7 0/1 Completed 0 70d 10.244.2.12 k8s-node1 <none> <none>
    ingress-nginx-controller-58bcff6c76-xdmzq 1/1 Running 0 14m 192.168.103.202 k8s-master1 <none> <none>
  • Connect with Navicat
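    With hostNetwork enabled and the tcp-services ConfigMap applied, port 9030 is served on the controller's node IP, so the exposure can be tested with any MySQL client from outside the cluster; the node IP below is taken from the kubectl output above.

    # StarRocks speaks the MySQL protocol on 9030
    mysql -h 192.168.103.202 -P 9030 -u root -p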

Related links

Nginx + Ingress-controller for service exposure and load balancing

Background

To let MongoDB be queried with standard SQL, we put the official mongo-bi connector in front of it as a translation layer; this is a key step toward the unified warehouse.

For now this approach is not viable, for two reasons:

  • bi-connector requires SSL on its MongoDB data-source connection, which forces external clients to connect to bi-connector with useSSL=true, while StarRocks catalogs do not support SSL yet
  • It is unstable; seen from the BI side, the external connection errors out frequently

Download and install

bi-connector can be installed on several platforms; here I record the installation and configuration on Linux.

Download from the official site: https://www.mongodb.com/try/download/bi-connector

The file I downloaded is mongodb-bi-linux-x86_64-rhel70-v2.14.12.tgz.
After downloading, extract it to /opt/mongodb-bi.

Create a certificate

When MongoDB has authentication enabled, bi-connector must be configured with a certificate before it can connect to MongoDB.

Create the certificate first:

# create the SSL certificate
mkdir -p /opt/mongodb-bi/certs
cd /opt/mongodb-bi/certs
openssl req -nodes -newkey rsa:2048 -keyout dakeweBI.key -out dakeweBI.crt -x509 -days 365 -subj "/C=US/ST=dakeweBI/L=dakeweBI/O=dakeweBI Security/OU=IT Department/CN=kayakwise.com"
cat dakeweBI.crt dakeweBI.key > dakeweBI.pem
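Optionally, check the generated certificate before wiring it into mongosqld:

# confirm the subject and validity period of the combined PEM
openssl x509 -in dakeweBI.pem -noout -subject -dates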

Install MongoDB BI Connector

sudo install -m755 bin/mongo* /usr/bin/

Write the MongoDB BI configuration file

mkdir -p /opt/mongodb-bi/conf/
mkdir -p /opt/mongodb-bi/logs/
mkdir -p /opt/mongodb-bi/schemas
net:
  bindIp: "0.0.0.0"
  port: 3307
  ssl:
    mode: allowSSL
    PEMKeyFile: '/opt/mongodb-bi/certs/dakeweBI.pem'
    allowInvalidCertificates: true
    minimumTLSVersion: TLS1_0
mongodb:
  net:
    uri: "mongodb://192.168.103.113:27011"
    ssl:
      enabled: false
    auth:
      username: root
      password: 'XXX'
      source: admin
      mechanism: SCRAM-SHA-1
security:
  enabled: true
  defaultMechanism: "SCRAM-SHA-1"
  defaultSource: "admin"
systemLog:
  path: /opt/mongodb-bi/logs/mongosqld.log
  verbosity: 2
  logAppend: true
schema:
  path: /opt/mongodb-bi/schemas
  maxVarcharLength: 65535
processManagement:
  service:
    name: "mongosql"
    displayName: "MongoSQL Service"
    description: "MongoSQL accesses MongoDB data with SQL"

Install the MongoDB BI Connector service

mongosqld install --config /opt/mongodb-bi/conf/mongosqld-config.yml
# reload systemd
systemctl daemon-reload
# enable start at boot
systemctl enable mongosql.service

Start

# generate the schema
mongodrdl --host 192.168.103.113:27011 --username root --password XXX --db assistant --authenticationDatabase admin --authenticationMechanism SCRAM-SHA-1 --out /opt/mongodb-bi/schemas/schemas.drdl
# run ad hoc in the foreground
mongosqld --config=/opt/mongodb-bi/conf/mongosqld-config.yml
# run as a service
systemctl start mongosql.service

Connect and query

  • mysql cli; note that the username and password are the MongoDB credentials

    mysql --enable-cleartext-plugin --user='root?source=admin&mechanism=SCRAM-SHA-1' --host=192.168.103.153 --protocol=tcp --port=3307 -p
  • Navicat: the "use SSL" option must be ticked

  • JDBC connections need these extra connection-string parameters:
    characterEncoding=UTF-8&connectTimeout=5000&useSSL=true&allowPublicKeyRetrieval=true&verifyServerCertificate=false

Other useful commands

systemctl start mongosql.service
systemctl status mongosql.service
systemctl stop mongosql.service
systemctl disable mongosql.service
journalctl -u mongosql.service
rm /etc/systemd/system/mongosql.service
rm /usr/lib/systemd/system/mongosql.service
systemctl daemon-reload
systemctl reset-failed


k8s cluster background

k8s 1.25.4 deployment notes (containerd)
Using NFS as a dynamic storageClass in k8s

Deploy the StarRocks Operator

  • Add the StarRocksCluster custom resource definition
    kubectl apply -f https://raw.githubusercontent.com/StarRocks/starrocks-kubernetes-operator/main/deploy/starrocks.com_starrocksclusters.yaml
  • Deploy the StarRocks Operator
    kubectl apply -f https://raw.githubusercontent.com/StarRocks/starrocks-kubernetes-operator/main/deploy/operator.yaml

Deploy the StarRocks cluster

Example configuration

Here I configure shared-data mode backed by Tencent Cloud COS, with a storageClass persisting the FE metadata.

shared_data_mode.yaml

apiVersion: starrocks.com/v1
kind: StarRocksCluster
metadata:
  name: starrockscluster
  namespace: starrocks
spec:
  starRocksFeSpec:
    image: starrocks/fe-ubuntu:3.2.2
    replicas: 1
    limits:
      cpu: 2
      memory: 4Gi
    requests:
      cpu: 2
      memory: 4Gi
    # service:
    #   type: NodePort # export fe service
    #   ports:
    #     - name: query # fill the name from the fe service ports
    #       nodePort: 32755
    #       port: 9030
    #       containerPort: 9030
    storageVolumes:
      - name: fe-storage-meta
        storageClassName: "nfs-client" # you can remove this line if you want to use the default storage class
        storageSize: 10Gi # the size of storage volume for metadata
        mountPath: /opt/starrocks/fe/meta # the path of metadata
      - name: fe-storage-log
        storageClassName: "nfs-client" # you can remove this line if you want to use the default storage class
        storageSize: 1Gi # the size of storage volume for log
        mountPath: /opt/starrocks/fe/log # the path of log
    configMapInfo:
      configMapName: starrockscluster-fe-cm
      resolveKey: fe.conf
  starRocksCnSpec:
    image: starrocks/cn-ubuntu:3.2.2
    replicas: 1
    limits:
      cpu: 2
      memory: 4Gi
    requests:
      cpu: 2
      memory: 4Gi
    configMapInfo:
      configMapName: starrockscluster-cn-cm
      resolveKey: cn.conf
    storageVolumes:
      - name: cn-storage-data
        storageClassName: "nfs-client" # you can remove this line if you want to use the default storage class
        storageSize: 10Gi # the size of storage volume for data
        mountPath: /opt/starrocks/cn/storage # the path of data
      - name: cn-storage-log
        storageClassName: "nfs-client" # you can remove this line if you want to use the default storage class
        storageSize: 1Gi # the size of storage volume for log
        mountPath: /opt/starrocks/cn/log # the path of log
  starRocksFeProxySpec:
    replicas: 1
    limits:
      cpu: 1
      memory: 2Gi
    requests:
      cpu: 1
      memory: 2Gi
    service:
      type: NodePort # export fe proxy service
      ports:
        - name: http-port # fill the name from the fe proxy service ports
          containerPort: 8080
          nodePort: 30180 # The range of valid ports is 30000-32767
          port: 8080
    resolver: "kube-dns.kube-system.svc.cluster.local" # this is the default dns server.

---

# fe config
apiVersion: v1
kind: ConfigMap
metadata:
  name: starrockscluster-fe-cm
  namespace: starrocks
  labels:
    cluster: starrockscluster
data:
  fe.conf: |
    LOG_DIR = ${STARROCKS_HOME}/log
    DATE = "$(date +%Y%m%d-%H%M%S)"
    JAVA_OPTS="-Dlog4j2.formatMsgNoLookups=true -Xmx8192m -XX:+UseMembar -XX:SurvivorRatio=8 -XX:MaxTenuringThreshold=7 -XX:+PrintGCDateStamps -XX:+PrintGCDetails -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+CMSClassUnloadingEnabled -XX:-CMSParallelRemarkEnabled -XX:CMSInitiatingOccupancyFraction=80 -XX:SoftRefLRUPolicyMSPerMB=0 -Xloggc:${LOG_DIR}/fe.gc.log.$DATE"
    JAVA_OPTS_FOR_JDK_9="-Dlog4j2.formatMsgNoLookups=true -Xmx8192m -XX:SurvivorRatio=8 -XX:MaxTenuringThreshold=7 -XX:+CMSClassUnloadingEnabled -XX:-CMSParallelRemarkEnabled -XX:CMSInitiatingOccupancyFraction=80 -XX:SoftRefLRUPolicyMSPerMB=0 -Xlog:gc*:${LOG_DIR}/fe.gc.log.$DATE:time"
    JAVA_OPTS_FOR_JDK_11="-Dlog4j2.formatMsgNoLookups=true -Xmx8192m -XX:+UseG1GC -Xlog:gc*:${LOG_DIR}/fe.gc.log.$DATE:time"
    http_port = 8030
    rpc_port = 9020
    query_port = 9030
    edit_log_port = 9010
    mysql_service_nio_enabled = true
    sys_log_level = INFO
    run_mode = shared_data
    cloud_native_meta_port = 6090
    # whether StarRocks may create the default storage volume from the storage-related properties in fe.conf
    enable_load_volume_from_conf = true
    aws_s3_path = files-prod-1253767413/starrocks
    aws_s3_region = ap-guangzhou
    aws_s3_endpoint = https://cos.ap-guangzhou.myqcloud.com
    aws_s3_access_key = XXX
    aws_s3_secret_key = XXX

---

# cn config
apiVersion: v1
kind: ConfigMap
metadata:
  name: starrockscluster-cn-cm
  namespace: starrocks
  labels:
    cluster: starrockscluster
data:
  cn.conf: |
    sys_log_level = INFO
    # ports for admin, web, heartbeat service
    thrift_port = 9060
    webserver_port = 8040
    heartbeat_service_port = 9050
    brpc_port = 8060
kubectl apply -f shared_data_mode.yaml
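After applying, the operator reconciles the custom resource into pods and services in the starrocks namespace. A quick check (resource names follow the manifest above):

# watch the cluster come up
kubectl -n starrocks get starrockscluster
kubectl -n starrocks get pods
kubectl -n starrocks get svc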

Related links

Deploy a StarRocks cluster with the Operator
