search
数据采集 采集源配置 flink 指标采集

flink 指标采集

简介

采集 flink 数据指标上报到 DataFlux 中。

前置条件

配置

flink 开启指标功能

  • flink v1.8 至 v1.10 版本,将 flink-v1.XX/opt/flink-metrics-influxdb-1.XXX.0.jar 复制到 flink-v1.XX/lib 目录下,官方描述链接
  • flink v1.11 版本,将 flink-v1.XX/opt/flink-metrics-influxdb-1.XXX.0.jar 复制到 flink-v1.XX/plugins/influxdb 目录下,官方描述链接

flink 通常会默认包含这个文件,如果在相应目录下找不到 metrics-influx 再进行复制。

修改 flink-v1.XX.X/conf/flink-conf.yaml 配置文件,添加以下内容:

metrics.reporter.influxdb.factory.class: org.apache.flink.metrics.influxdb.InfluxdbReporterFactory
metrics.reporter.influxdb.host: localhost        # DataKit 地址
metrics.reporter.influxdb.port: 8086             # DataKit HTTP 端口,默认是 9529
metrics.reporter.influxdb.db: flink              # flink 数据接口,需要和配置文件中对应,以添加对应的 tags
metrics.reporter.influxdb.interval: 20 SECONDS   # 数据上传间隔

重启 flink 程序,根据配置文件中的数据上传间隔,会将指标数据上报给 DataKit。

配置 datakit

进入 DataKit 安装目录下的 conf.d/flink 目录,复制 flink.conf.sample 并命名为 flink.conf。示例如下:

[[inputs.flink]]

    # 开启当前 db 数据接口
    db = "flink"

    # 自定义 tags
    # [inputs.flink.tags]
    # tags1 = "value1"

db 参数为开启的数据接口,与 flink 配置文件中的 metrics.reporter.influxdb.db 对应,可以是“一对一”或“一对多”

db 的值为最终数据的 measurement。同一个 db 共享同一份 tags

HTTP 服务器监听的 ip 和端口在 datakit.conf 中配置,默认为 0.0.0.0:9529

采集指标

指标 类型 单位
jobmanager_Status_JVM_CPU_Load fields float
jobmanager_Status_JVM_CPU_Time fields int
jobmanager_Status_JVM_ClassLoader_ClassesLoaded fields int
jobmanager_Status_JVM_ClassLoader_ClassesUnloaded fields int
jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count fields int
jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Time fields int
jobmanager_Status_JVM_GarbageCollector_PS_Scavenge_Count fields int
jobmanager_Status_JVM_GarbageCollector_PS_Scavenge_Time fields int
jobmanager_Status_JVM_Memory_Direct_Count fields int
jobmanager_Status_JVM_Memory_Direct_MemoryUsed fields int
jobmanager_Status_JVM_Memory_Direct_TotalCapacity fields int
jobmanager_Status_JVM_Memory_Heap_Committed fields int
jobmanager_Status_JVM_Memory_Heap_Max fields int
jobmanager_Status_JVM_Memory_Heap_Used fields int
jobmanager_Status_JVM_Memory_Mapped_Count fields int
jobmanager_Status_JVM_Memory_Mapped_MemoryUsed fields int
jobmanager_Status_JVM_Memory_Mapped_TotalCapacity fields int
jobmanager_Status_JVM_Memory_NonHeap_Committed fields int
jobmanager_Status_JVM_Memory_NonHeap_Max fields int
jobmanager_Status_JVM_Memory_NonHeap_Used fields int
jobmanager_Status_JVM_Threads_Count fields int
jobmanager_numRegisteredTaskManagers fields int
jobmanager_numRunningJobs fields int
jobmanager_taskSlotsAvailable fields int
jobmanager_taskSlotsTotal fields int

示例输出

flink,tag1=value1 jobmanager_Status_JVM_CPU_Load=0.002942097026604069,jobmanager_Status_JVM_CPU_Time=7740000000i,\
jobmanager_Status_JVM_ClassLoader_ClassesLoaded=7021i,jobmanager_Status_JVM_ClassLoader_ClassesUnloaded=9i,\
jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count=2i,jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Time=54i,\
jobmanager_Status_JVM_GarbageCollector_PS_Scavenge_Count=2i,jobmanager_Status_JVM_GarbageCollector_PS_Scavenge_Time=24i,\
jobmanager_Status_JVM_Memory_Direct_Count=11i,jobmanager_Status_JVM_Memory_Direct_MemoryUsed=525469i,\
jobmanager_Status_JVM_Memory_Direct_TotalCapacity=525468i,jobmanager_Status_JVM_Memory_Heap_Committed=1029177344i,\
jobmanager_Status_JVM_Memory_Heap_Max=1029177344i,jobmanager_Status_JVM_Memory_Heap_Used=92128400i,\
jobmanager_Status_JVM_Memory_Mapped_Count=0i,jobmanager_Status_JVM_Memory_Mapped_MemoryUsed=0i,\
jobmanager_Status_JVM_Memory_Mapped_TotalCapacity=0i,jobmanager_Status_JVM_Memory_NonHeap_Committed=59244544i,\
jobmanager_Status_JVM_Memory_NonHeap_Max=780140544i,jobmanager_Status_JVM_Memory_NonHeap_Used=56317792i,\
jobmanager_Status_JVM_Threads_Count=45i,jobmanager_numRegisteredTaskManagers=0i,jobmanager_numRunningJobs=0i,\
jobmanager_taskSlotsAvailable=0i,jobmanager_taskSlotsTotal=0i 1597130792538000000