search
数据采集 采集器 DataX

DataX

简介

DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能。在此,我们也可以将上述数据,导入成时序数据存储。

DataX 安装

点击下载

注意,目前此处只有 MySQL -> InfluxDB 的同步,其它同步方式待验证。

DataX 配置

更多不同数据源的配置方式,请参见 DataX 官方文档,此处只以 MySQL 为例。

假定我们有一个数据库的表 biz_license 需要同步,下面是一个简单地示例:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",

                    "parameter": {
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://172.16.0.20:10490/dataflux_license" # MySQL 数据库地址
                                ],
                                "table": [
                                    "biz_license" # 被同步的表名称
                                ]
                            }
                        ],
                        "password": "xxxxxxxx",        # 数据库密码
                        "username": "license-db-user", # 数据库用户名

                        "splitPk": "id",

                        "column": [ # 被同步的数据列
                            "uuid",
                            "owner",
                            "createAgent",
                            "md5Sign",
                            "effect",
                            "from_unixtime(createAt)"
                        ]
                    }
                },
                "writer": {
                    "name": "influxdbwriter",
                    "parameter": {
                        "column": [ # 写入时序之后的对应 tag/filed 名称
                            "uuid",
                            "owner",
                            "createAgent",
                            "md5Sign",
                            "effect",
                            "createAt"
                        ],
                        "columnType": [ # 标明tag/field/timestamp 列
                            "tag",
                            "tag",
                            "tag",
                            "field",
                            "field",
                            "timestamp"
                        ],

                        "sourceDbType": "RDB",                        # 数据源是关系数据库
                        "endpoint": "http://dataway:9528",            # 将数据同步到 dataway
                        "username": "xxxxxx",                         # 可随便填
                        "password": "xxxxxxxxxxxxxxxxx",              # 可随便填
                        "database": "tkn_6cb6e15edd9a40629673ee7ecd5b9f6e", # 填写 dataway 上配置的 token 值
                        "retentionPolicy": "rp3",                     # 如果不确定,则填 rp3
                        "measurement": "mysql_biz_license"            # 写入 influxDB 之后的 measurement name
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}

有几点需要注意一下:

  • DataX 只支持将数据库中 Date 类型的数据当做时间戳。假定 MySQL 原始数据库中,时间格式是 Unix 时间戳,那么可以用 from_unixtime() 这个 MySQL 内置函数对时间进行转换
  • 由于 InfluxDB 对原始数据列进行了 tag/field 的区分,所以在配置字段映射时,需要对业务进行细致规划,不能导致时间线暴涨
  • 此处利用 DataX 将数据通过 Dataway 同步到 InfluxDB 中心存储,所以不需要配置 InfluxDB 的用户名、密码等信息。但需要配置 database 信息,此处我们填入 Dataway 的 token 字段,因为中心通过这个 token 来映射具体的 DB
  • Dataway 需要升级到特定的版本。较低版本的 Dataway 不支持 DataX 数据同步功能(TODO: 具体参见产品发布情况)
  • retentionPolicy 这个字段必须指定,不然 DataX 会使用 autogen 这个 RP,在我们中心,这个 RP 有特殊用途,不能在此处使用。(中心的默认 RP 可能调整,这个地方需要注意)
  • 各 field/tag key/value 要符合 InfluxDB 行协议标准,参见这里

将上述 json 保存为 sample.json,进入 DataX 安装目录,即可执行

$ cd path/to/datax
$ python bin/datax sample.json

执行完后,DataX 会输出类似如下的一些信息:

任务启动时刻                    : 2020-02-26 04:22:53
任务结束时刻                    : 2020-02-26 04:23:04
任务总计耗时                    :                 11s
任务平均流量                    :              170B/s
记录写入速度                    :              1rec/s
读出记录总数                    :                  16
读写失败总数                    :                   0