DataX之离线数据源同步实践

背景

​DataX 是一个异构数据源离线同步工具,本次需求是定时调度数据库,

  • mongodb、mysql定时同步到es
  • mongodb、mysql定时同步到StarRocks

原则是要配合海豚调度DolphinScheduler,但是DolphinScheduler目前看有点重,晚点评估。

job

  • mysql同步StarRocks

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    {
    "job":
    {
    "setting":
    {
    "speed":
    {
    "channel": 3
    },
    "errorLimit":
    {
    "record": 0,
    "percentage": 0.02
    }
    },
    "content":
    [
    {
    "reader":
    {
    "name": "mysqlreader",
    "parameter":
    {
    "username": "root",
    "password": "XXX",
    "column":
    [
    "id",
    "user_name",
    "wecom_user_id",
    "sap_code",
    "password",
    "name",
    "cellphone",
    "email"
    ],
    "splitPk": "id",
    "connection":
    [
    {
    "table":
    [
    "user"
    ],
    "jdbcUrl":
    [
    "jdbc:mysql://192.168.103.113:3306/user"
    ]
    }
    ]
    }
    },
    "writer":
    {
    "name": "starrockswriter",
    "parameter":
    {
    "username": "root",
    "password": "XXX",
    "column":
    [
    "id",
    "user_name",
    "wecom_user_id",
    "sap_code",
    "password",
    "name",
    "cellphone",
    "email"
    ],
    "preSql":
    [],
    "postSql":
    [],
    "connection":
    [
    {
    "table":
    [
    "user"
    ],
    "jdbcUrl": "jdbc:mysql://192.168.103.202:9030/",
    "selectedDatabase": "assistant"
    }
    ],
    "loadUrl":
    [
    "192.168.103.202:8040" // FE的http port,这里直接用CN的port
    ],
    "loadProps":
    {}
    }
    }
    }
    ]
    }
    }
  • mongodb同步es

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    {
    "job":
    {
    "setting":
    {
    "speed":
    {
    "channel": 2
    }
    },
    "content":
    [
    {
    "reader":
    {
    "name": "mongodbreader",
    "parameter":
    {
    "address":
    [
    "192.168.103.113:27011"
    ],
    "userName": "root",
    "userPassword": "XXX",
    "authDb": "admin",
    "dbName": "biocitydb",
    "collectionName": "companies",
    "column":
    [
    {
    "name": "_id",
    "type": "string"
    },
    {
    "name": "name",
    "type": "string"
    },
    {
    "name": "sapCode",
    "type": "string"
    }
    ]
    }
    },
    "writer":
    {
    "name": "elasticsearchwriter",
    "parameter":
    {
    "endpoint": "http://192.168.199.113:9200",
    "accessId": "root",
    "accessKey": "XXX",
    "index": "companies",
    "type": "default",
    "cleanup": true,
    "settings":
    {
    "index":
    {
    "number_of_shards": 1,
    "number_of_replicas": 0
    }
    },
    "discovery": false,
    "batchSize": 1000,
    "splitter": ",",
    "column":
    [
    {
    "name": "id",
    "type": "id"
    },
    {
    "name": "name",
    "type": "keyword"
    },
    {
    "name": "sapCode",
    "type": "keyword"
    }
    ]
    }
    }
    }
    ]
    }
    }

启动

1
2
3
$ cd  {YOUR_DATAX_HOME}/bin
$ python datax.py {YOUR_JOB.json}