StarRocks Data Import in Practice

Background and Data Preparation

This post only records a few common import methods.

  • New York City crash data
    curl -O https://raw.githubusercontent.com/StarRocks/demo/master/documentation-samples/quickstart/datasets/NYPD_Crash_Data.csv
  • Weather data
    curl -O https://raw.githubusercontent.com/StarRocks/demo/master/documentation-samples/quickstart/datasets/72505394728.csv
  • User behavior dataset
    curl -O https://starrocks-datasets.s3.amazonaws.com/user_behavior_ten_million_rows.parquet
  • file1.csv
    1,Lily,21
    2,Rose,22
    3,Alice,23
    4,Julia,24
  • file2.csv
    5,Tony,25
    6,Adam,26
    7,Allen,27
    8,Jacky,28
  • file3.json
    {"name": "北京", "code": 2}

Importing from the Local File System (Stream Load)

TIP: For http://<fe_host>:<fe_http_port> the default port is 8030; you can also submit to http://<be_host>:<be_http_port>, whose default port is 8040.
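
If you are not sure which HTTP ports your cluster actually exposes, you can look them up from any MySQL client connected to StarRocks; the HttpPort column in the output is the value to use in the URLs above (a quick check that may require admin privileges):

    SHOW FRONTENDS;
    SHOW BACKENDS;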

  • Table table1 (or table2)

    • Create table table1
      CREATE TABLE `table1`
      (
      `id` int(11) NOT NULL COMMENT "用户 ID",
      `name` varchar(65533) NULL DEFAULT "" COMMENT "用户姓名",
      `score` int(11) NOT NULL DEFAULT "0" COMMENT "用户得分"
      )
      ENGINE=OLAP
      PRIMARY KEY(`id`)
      DISTRIBUTED BY HASH(`id`);
    • Import file1.csv
      curl --location-trusted -u <username>:<password> -H "label:123" \
      -H "Expect:100-continue" \
      -H "column_separator:," \
      -H "columns: id, name, score" \
      -T file1.csv -XPUT \
      http://<fe_host>:<fe_http_port>/api/import_example/table1/_stream_load
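    • Verify the load (optional). Stream Load is synchronous, so the curl command above returns a JSON result directly; as an extra sanity check you can query the table. Given the contents of file1.csv, the expected rows are noted in the comment below (a sketch, assuming the load succeeded):
      SELECT * FROM import_example.table1 ORDER BY id;
      -- expected (from file1.csv): (1, Lily, 21), (2, Rose, 22), (3, Alice, 23), (4, Julia, 24)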
  • Table table_json

    • Create table table_json
      CREATE TABLE `table_json`
      (
      `id` int(11) NOT NULL COMMENT "城市 ID",
      `city` varchar(65533) NULL COMMENT "城市名称"
      )
      ENGINE=OLAP
      PRIMARY KEY(`id`)
      DISTRIBUTED BY HASH(`id`);
    • Import file3.json
      curl -v --location-trusted -u <username>:<password> -H "strict_mode: true" \
      -H "Expect:100-continue" \
      -H "format: json" -H "jsonpaths: [\"$.name\", \"$.code\"]" \
      -H "columns: city,tmp_id, id = tmp_id * 100" \
      -T file3.json -XPUT \
      http://<fe_host>:<fe_http_port>/api/import_example/table_json/_stream_load
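    • Verify the column mapping (optional). The columns header above derives id from the JSON field code (id = tmp_id * 100), so the single row in file3.json should land as id = 200, city = 北京; a quick check, assuming the load succeeded:
      SELECT id, city FROM import_example.table_json;
      -- expected: (200, 北京)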
  • Table crashdata

    • Create the crashdata table to store the crash dataset. The table's columns have been trimmed to include only the fields relevant to this tutorial.
      CREATE DATABASE IF NOT EXISTS import_example;
      USE import_example;

      CREATE TABLE IF NOT EXISTS crashdata (
      CRASH_DATE DATETIME,
      BOROUGH STRING,
      ZIP_CODE STRING,
      LATITUDE INT,
      LONGITUDE INT,
      LOCATION STRING,
      ON_STREET_NAME STRING,
      CROSS_STREET_NAME STRING,
      OFF_STREET_NAME STRING,
      CONTRIBUTING_FACTOR_VEHICLE_1 STRING,
      CONTRIBUTING_FACTOR_VEHICLE_2 STRING,
      COLLISION_ID INT,
      VEHICLE_TYPE_CODE_1 STRING,
      VEHICLE_TYPE_CODE_2 STRING
      );
    • Import the New York City crash data
      curl --location-trusted -u root             \
      -T ./NYPD_Crash_Data.csv \
      -H "label:crashdata-0" \
      -H "column_separator:," \
      -H "skip_header:1" \
      -H "enclose:\"" \
      -H "max_filter_ratio:1" \
      -H "columns:tmp_CRASH_DATE, tmp_CRASH_TIME, CRASH_DATE=str_to_date(concat_ws(' ', tmp_CRASH_DATE, tmp_CRASH_TIME), '%m/%d/%Y %H:%i'),BOROUGH,ZIP_CODE,LATITUDE,LONGITUDE,LOCATION,ON_STREET_NAME,CROSS_STREET_NAME,OFF_STREET_NAME,NUMBER_OF_PERSONS_INJURED,NUMBER_OF_PERSONS_KILLED,NUMBER_OF_PEDESTRIANS_INJURED,NUMBER_OF_PEDESTRIANS_KILLED,NUMBER_OF_CYCLIST_INJURED,NUMBER_OF_CYCLIST_KILLED,NUMBER_OF_MOTORIST_INJURED,NUMBER_OF_MOTORIST_KILLED,CONTRIBUTING_FACTOR_VEHICLE_1,CONTRIBUTING_FACTOR_VEHICLE_2,CONTRIBUTING_FACTOR_VEHICLE_3,CONTRIBUTING_FACTOR_VEHICLE_4,CONTRIBUTING_FACTOR_VEHICLE_5,COLLISION_ID,VEHICLE_TYPE_CODE_1,VEHICLE_TYPE_CODE_2,VEHICLE_TYPE_CODE_3,VEHICLE_TYPE_CODE_4,VEHICLE_TYPE_CODE_5" \
      -XPUT http://<fe_host>:<fe_http_port>/api/import_example/crashdata/_stream_load
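    • Verify the load (optional). The columns header above rebuilds CRASH_DATE from the raw date and time strings via str_to_date, so a quick way to confirm the parsing worked is to count the rows and look at a few of them (a sketch; the exact values depend on the dataset snapshot you downloaded):
      SELECT COUNT(*) FROM import_example.crashdata;
      SELECT CRASH_DATE, BOROUGH, COLLISION_ID FROM import_example.crashdata LIMIT 3;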
  • Table weatherdata

    • Create the weatherdata table to store the weather dataset. Its columns have likewise been trimmed to include only the fields relevant to this tutorial.
      CREATE DATABASE IF NOT EXISTS import_example;
      USE import_example;
      CREATE TABLE IF NOT EXISTS weatherdata (
      DATE DATETIME,
      NAME STRING,
      HourlyDewPointTemperature STRING,
      HourlyDryBulbTemperature STRING,
      HourlyPrecipitation STRING,
      HourlyPresentWeatherType STRING,
      HourlyPressureChange STRING,
      HourlyPressureTendency STRING,
      HourlyRelativeHumidity STRING,
      HourlySkyConditions STRING,
      HourlyVisibility STRING,
      HourlyWetBulbTemperature STRING,
      HourlyWindDirection STRING,
      HourlyWindGustSpeed STRING,
      HourlyWindSpeed STRING
      );
    • Import the weather data
      curl --location-trusted -u root             \
      -T ./72505394728.csv \
      -H "label:weather-0" \
      -H "column_separator:," \
      -H "skip_header:1" \
      -H "enclose:\"" \
      -H "max_filter_ratio:1" \
      -H "columns: STATION, DATE, LATITUDE, LONGITUDE, ELEVATION, NAME, REPORT_TYPE, SOURCE, HourlyAltimeterSetting, HourlyDewPointTemperature, HourlyDryBulbTemperature, HourlyPrecipitation, HourlyPresentWeatherType, HourlyPressureChange, HourlyPressureTendency, HourlyRelativeHumidity, HourlySkyConditions, HourlySeaLevelPressure, HourlyStationPressure, HourlyVisibility, HourlyWetBulbTemperature, HourlyWindDirection, HourlyWindGustSpeed, HourlyWindSpeed, Sunrise, Sunset, DailyAverageDewPointTemperature, DailyAverageDryBulbTemperature, DailyAverageRelativeHumidity, DailyAverageSeaLevelPressure, DailyAverageStationPressure, DailyAverageWetBulbTemperature, DailyAverageWindSpeed, DailyCoolingDegreeDays, DailyDepartureFromNormalAverageTemperature, DailyHeatingDegreeDays, DailyMaximumDryBulbTemperature, DailyMinimumDryBulbTemperature, DailyPeakWindDirection, DailyPeakWindSpeed, DailyPrecipitation, DailySnowDepth, DailySnowfall, DailySustainedWindDirection, DailySustainedWindSpeed, DailyWeather, MonthlyAverageRH, MonthlyDaysWithGT001Precip, MonthlyDaysWithGT010Precip, MonthlyDaysWithGT32Temp, MonthlyDaysWithGT90Temp, MonthlyDaysWithLT0Temp, MonthlyDaysWithLT32Temp, MonthlyDepartureFromNormalAverageTemperature, MonthlyDepartureFromNormalCoolingDegreeDays, MonthlyDepartureFromNormalHeatingDegreeDays, MonthlyDepartureFromNormalMaximumTemperature, MonthlyDepartureFromNormalMinimumTemperature, MonthlyDepartureFromNormalPrecipitation, MonthlyDewpointTemperature, MonthlyGreatestPrecip, MonthlyGreatestPrecipDate, MonthlyGreatestSnowDepth, MonthlyGreatestSnowDepthDate, MonthlyGreatestSnowfall, MonthlyGreatestSnowfallDate, MonthlyMaxSeaLevelPressureValue, MonthlyMaxSeaLevelPressureValueDate, MonthlyMaxSeaLevelPressureValueTime, MonthlyMaximumTemperature, MonthlyMeanTemperature, MonthlyMinSeaLevelPressureValue, MonthlyMinSeaLevelPressureValueDate, MonthlyMinSeaLevelPressureValueTime, MonthlyMinimumTemperature, MonthlySeaLevelPressure, MonthlyStationPressure, MonthlyTotalLiquidPrecipitation, MonthlyTotalSnowfall, MonthlyWetBulb, AWND, CDSD, CLDD, DSNW, HDSD, HTDD, NormalsCoolingDegreeDay, NormalsHeatingDegreeDay, ShortDurationEndDate005, ShortDurationEndDate010, ShortDurationEndDate015, ShortDurationEndDate020, ShortDurationEndDate030, ShortDurationEndDate045, ShortDurationEndDate060, ShortDurationEndDate080, ShortDurationEndDate100, ShortDurationEndDate120, ShortDurationEndDate150, ShortDurationEndDate180, ShortDurationPrecipitationValue005, ShortDurationPrecipitationValue010, ShortDurationPrecipitationValue015, ShortDurationPrecipitationValue020, ShortDurationPrecipitationValue030, ShortDurationPrecipitationValue045, ShortDurationPrecipitationValue060, ShortDurationPrecipitationValue080, ShortDurationPrecipitationValue100, ShortDurationPrecipitationValue120, ShortDurationPrecipitationValue150, ShortDurationPrecipitationValue180, REM, BackupDirection, BackupDistance, BackupDistanceUnit, BackupElements, BackupElevation, BackupEquipment, BackupLatitude, BackupLongitude, BackupName, WindEquipmentChangeDate" \
      -XPUT http://<fe_host>:<fe_http_port>/api/import_example/weatherdata/_stream_load
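    • With both tables loaded, crashes can be joined to the weather observed in the same hour. The sketch below only illustrates that kind of query; it assumes HourlyPrecipitation holds string values such as '0.00', which is not validated here:
      SELECT COUNT(DISTINCT c.COLLISION_ID) AS crashes_with_precipitation
      FROM import_example.crashdata AS c
      JOIN import_example.weatherdata AS w
      ON DATE_FORMAT(c.CRASH_DATE, '%Y-%m-%d %H') = DATE_FORMAT(w.DATE, '%Y-%m-%d %H')
      WHERE w.HourlyPrecipitation NOT IN ('', '0.00');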

Importing from Cloud Storage (Broker Load)

Upload the files to COS object storage, then load them into StarRocks.

  • Use the following statements to load the data file user_behavior_ten_million_rows.parquet from the input folder of the COS bucket race-1301288126 into the target table user_behavior_replica.
    • Create the table
      CREATE DATABASE IF NOT EXISTS mydatabase;
      USE mydatabase;

      CREATE TABLE user_behavior_replica
      (
      UserID int(11),
      ItemID int(11),
      CategoryID int(11),
      BehaviorType varchar(65533),
      Timestamp varbinary
      )
      ENGINE = OLAP
      DUPLICATE KEY(UserID)
      DISTRIBUTED BY HASH(UserID);
    • Load job
      LOAD LABEL mydatabase.label_brokerloadtest_502
      (
      DATA INFILE("cosn://race-1301288126/input/user_behavior_ten_million_rows.parquet")
      INTO TABLE user_behavior_replica
      FORMAT AS "parquet"
      )
      WITH BROKER
      (
      "fs.cosn.userinfo.secretId" = "xxx",
      "fs.cosn.userinfo.secretKey" = "xxx",
      "fs.cosn.bucket.endpoint_suffix" = "cos.ap-guangzhou.myqcloud.com"
      )
      PROPERTIES
      (
      "timeout" = "3600"
      );
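    • Check the job status. Broker Load is asynchronous, so the statement above only submits the job; poll its state by label until it reaches FINISHED (a sketch using SHOW LOAD; on recent versions the information_schema.loads view exposes the same information):
      SHOW LOAD FROM mydatabase WHERE LABEL = "label_brokerloadtest_502";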
  • Use the following statements to load the data files file1.csv and file2.csv from the input folder of the COS bucket race-1301288126 into the target tables table1 and table2.
    • Create the tables
      CREATE TABLE `table1`
      (
      `id` int(11) NOT NULL COMMENT "用户 ID",
      `name` varchar(65533) NULL DEFAULT "" COMMENT "用户姓名",
      `score` int(11) NOT NULL DEFAULT "0" COMMENT "用户得分"
      )
      ENGINE=OLAP
      PRIMARY KEY(`id`)
      DISTRIBUTED BY HASH(`id`);

      CREATE TABLE `table2`
      (
      `id` int(11) NOT NULL COMMENT "用户 ID",
      `name` varchar(65533) NULL DEFAULT "" COMMENT "用户姓名",
      `score` int(11) NOT NULL DEFAULT "0" COMMENT "用户得分"
      )
      ENGINE=OLAP
      PRIMARY KEY(`id`)
      DISTRIBUTED BY HASH(`id`);
    • Load a single file into a single table
      LOAD LABEL mydatabase.label_brokerloadtest_501
      (
      DATA INFILE("cosn://race-1301288126/input/file1.csv")
      INTO TABLE table1
      COLUMNS TERMINATED BY ","
      (id, name, score)
      )
      WITH BROKER
      (
      "fs.cosn.userinfo.secretId" = "xxx",
      "fs.cosn.userinfo.secretKey" = "xxx",
      "fs.cosn.bucket.endpoint_suffix" = "cos.ap-guangzhou.myqcloud.com"
      )
      PROPERTIES
      (
      "timeout" = "3600"
      );
    • Load multiple files into a single table
      LOAD LABEL mydatabase.label_brokerloadtest_402
      (
      DATA INFILE("cosn://race-1301288126/input/*")
      INTO TABLE table1
      COLUMNS TERMINATED BY ","
      (id, name, score)
      )
      WITH BROKER
      (
      "fs.cosn.userinfo.secretId" = "xxx",
      "fs.cosn.userinfo.secretKey" = "xxx",
      "fs.cosn.bucket.endpoint_suffix" = "cos.ap-guangzhou.myqcloud.com"
      )
      PROPERTIES
      (
      "timeout" = "3600"
      );
    • Load multiple data files into multiple tables
      LOAD LABEL mydatabase.label_brokerloadtest_403
      (
      DATA INFILE("cosn://race-1301288126/input/file1.csv")
      INTO TABLE table1
      COLUMNS TERMINATED BY ","
      (id, name, score)
      ,
      DATA INFILE("cosn://race-1301288126/input/file2.csv")
      INTO TABLE table2
      COLUMNS TERMINATED BY ","
      (id, name, score)
      )
      WITH BROKER
      (
      "fs.cosn.userinfo.secretId" = "xxx",
      "fs.cosn.userinfo.secretKey" = "xxx",
      "fs.cosn.bucket.endpoint_suffix" = "cos.ap-guangzhou.myqcloud.com"
      )
      PROPERTIES
      (
      "timeout" = "3600"
      );
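    • Verify the result (optional). Assuming the multi-table job above finished successfully, table2 should contain exactly the four rows of file2.csv:
      SELECT * FROM mydatabase.table2 ORDER BY id;
      -- expected (from file2.csv): (5, Tony, 25), (6, Adam, 26), (7, Allen, 27), (8, Jacky, 28)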

Other Datasets

  • 130 million user reviews of Amazon products, about 37 GB in total

    Each row has 15 columns, including the user ID (customer_id), review ID (review_id), purchased product ID (product_id), product category (product_category), star rating (star_rating), review headline (review_headline), and review body (review_body).
    amazon_reviews/amazon_reviews_2010
    amazon_reviews/amazon_reviews_2011
    amazon_reviews/amazon_reviews_2012
    amazon_reviews/amazon_reviews_2013
    amazon_reviews/amazon_reviews_2014
    amazon_reviews/amazon_reviews_2015

  • Create the table
    CREATE TABLE `amazon_reviews` (  
    `review_date` int(11) NULL,
    `marketplace` varchar(20) NULL,
    `customer_id` bigint(20) NULL,
    `review_id` varchar(40) NULL,
    `product_id` varchar(10) NULL,
    `product_parent` bigint(20) NULL,
    `product_title` varchar(500) NULL,
    `product_category` varchar(50) NULL,
    `star_rating` smallint(6) NULL,
    `helpful_votes` int(11) NULL,
    `total_votes` int(11) NULL,
    `vine` boolean NULL,
    `verified_purchase` boolean NULL,
    `review_headline` varchar(500) NULL,
    `review_body` string NULL
    ) ENGINE=OLAP
    DUPLICATE KEY(`review_date`)
    COMMENT 'OLAP'
    DISTRIBUTED BY HASH(`review_date`);
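  • Load job (illustrative sketch only). The dataset above is listed only by object prefix (amazon_reviews/amazon_reviews_2010 through amazon_reviews_2015), without a bucket, region, or file format, so the statement below simply mirrors the Broker Load pattern used earlier: the bucket placeholder <your_bucket>, the region, the credentials, and the "parquet" format are all assumptions to replace with your actual storage details, and it is meant to be run in the database where amazon_reviews was created.
    -- all bucket/credential/format values below are placeholders (see note above)
    LOAD LABEL label_amazon_reviews_2010
    (
    DATA INFILE("s3://<your_bucket>/amazon_reviews/amazon_reviews_2010*")
    INTO TABLE amazon_reviews
    FORMAT AS "parquet"
    )
    WITH BROKER
    (
    "aws.s3.use_instance_profile" = "false",
    "aws.s3.access_key" = "xxx",
    "aws.s3.secret_key" = "xxx",
    "aws.s3.region" = "us-east-1"
    )
    PROPERTIES
    (
    "timeout" = "7200"
    );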