Flink metric collection
Introduction
Collect Flink metrics and report them to DataFlux.
Prerequisites
- Flink v1.8 or later is installed (see the official Flink installation documentation) and the monitoring metrics feature is enabled
- DataKit is installed (see the DataKit installation documentation)
Configuration
Enable Flink metrics
- For Flink v1.8 through v1.10, copy flink-v1.XX/opt/flink-metrics-influxdb-1.XXX.0.jar into the flink-v1.XX/lib directory (see the official documentation).
- For Flink v1.11, copy flink-v1.XX/opt/flink-metrics-influxdb-1.XXX.0.jar into the flink-v1.XX/plugins/influxdb directory (see the official documentation).
Flink usually ships this file by default; only copy it if flink-metrics-influxdb cannot be found in the corresponding directory (a quick check is sketched below).
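If you are unsure whether the reporter jar is already in place, a small script can check both candidate locations. This is a minimal sketch; the Flink home path /opt/flink-1.11.2 is a placeholder and should be adjusted to your installation:

from pathlib import Path

# Hypothetical Flink installation directory; adjust to your setup.
flink_home = Path("/opt/flink-1.11.2")

# The reporter jar should sit in lib/ (Flink v1.8 - v1.10) or plugins/influxdb/ (v1.11).
for subdir in ("lib", "plugins/influxdb"):
    jars = list((flink_home / subdir).glob("flink-metrics-influxdb-*.jar"))
    print(subdir, "->", [j.name for j in jars] or "not found")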
Edit the flink-v1.XX.X/conf/flink-conf.yaml configuration file and add the following:
metrics.reporter.influxdb.factory.class: org.apache.flink.metrics.influxdb.InfluxdbReporterFactory
metrics.reporter.influxdb.host: localhost # DataKit address
metrics.reporter.influxdb.port: 9529 # DataKit HTTP port; defaults to 9529
metrics.reporter.influxdb.db: flink # data interface name; must match the db in the DataKit configuration so the corresponding tags are attached
metrics.reporter.influxdb.interval: 20 SECONDS # reporting interval
Restart Flink; metrics will then be reported to DataKit at the configured interval.
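Optionally, you can sanity-check that DataKit is reachable by posting a single test point by hand. This is only a sketch: it assumes the DataKit flink input accepts the InfluxDB v1 write API (POST /write?db=...), which is the endpoint the flink-metrics-influxdb reporter itself posts to; adjust the host, port, and db to match your configuration.

import urllib.request

# A hypothetical test point in InfluxDB line protocol; the measurement "flink"
# must match the db configured on both the Flink and DataKit sides.
line = b"flink,check=manual jobmanager_numRunningJobs=0i"

# Assumption: DataKit listens on port 9529 and exposes an InfluxDB-compatible /write route.
url = "http://localhost:9529/write?db=flink"
req = urllib.request.Request(url, data=line, method="POST")
with urllib.request.urlopen(req, timeout=5) as resp:
    print(resp.status)  # a 2xx status means the point was accepted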
Configure DataKit
Go to the conf.d/flink directory under the DataKit installation directory, copy flink.conf.sample, and rename it flink.conf. Example:
[[inputs.flink]]
# enable this db data interface
db = "flink"
# custom tags
# [inputs.flink.tags]
# tags1 = "value1"
The db parameter names the data interface to enable; it corresponds to metrics.reporter.influxdb.db in the Flink configuration, and the mapping can be one-to-one or one-to-many. The value of db becomes the measurement of the resulting data, and all data reported to the same db shares the same set of tags.
The IP and port the HTTP server listens on are configured in datakit.conf and default to 0.0.0.0:9529.
Collected metrics
Metric | Type | Data Type |
---|---|---|
jobmanager_Status_JVM_CPU_Load | fields | float |
jobmanager_Status_JVM_CPU_Time | fields | int |
jobmanager_Status_JVM_ClassLoader_ClassesLoaded | fields | int |
jobmanager_Status_JVM_ClassLoader_ClassesUnloaded | fields | int |
jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count | fields | int |
jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Time | fields | int |
jobmanager_Status_JVM_GarbageCollector_PS_Scavenge_Count | fields | int |
jobmanager_Status_JVM_GarbageCollector_PS_Scavenge_Time | fields | int |
jobmanager_Status_JVM_Memory_Direct_Count | fields | int |
jobmanager_Status_JVM_Memory_Direct_MemoryUsed | fields | int |
jobmanager_Status_JVM_Memory_Direct_TotalCapacity | fields | int |
jobmanager_Status_JVM_Memory_Heap_Committed | fields | int |
jobmanager_Status_JVM_Memory_Heap_Max | fields | int |
jobmanager_Status_JVM_Memory_Heap_Used | fields | int |
jobmanager_Status_JVM_Memory_Mapped_Count | fields | int |
jobmanager_Status_JVM_Memory_Mapped_MemoryUsed | fields | int |
jobmanager_Status_JVM_Memory_Mapped_TotalCapacity | fields | int |
jobmanager_Status_JVM_Memory_NonHeap_Committed | fields | int |
jobmanager_Status_JVM_Memory_NonHeap_Max | fields | int |
jobmanager_Status_JVM_Memory_NonHeap_Used | fields | int |
jobmanager_Status_JVM_Threads_Count | fields | int |
jobmanager_numRegisteredTaskManagers | fields | int |
jobmanager_numRunningJobs | fields | int |
jobmanager_taskSlotsAvailable | fields | int |
jobmanager_taskSlotsTotal | fields | int |
Sample output
flink,tag1=value1 jobmanager_Status_JVM_CPU_Load=0.002942097026604069,jobmanager_Status_JVM_CPU_Time=7740000000i,\
jobmanager_Status_JVM_ClassLoader_ClassesLoaded=7021i,jobmanager_Status_JVM_ClassLoader_ClassesUnloaded=9i,\
jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count=2i,jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Time=54i,\
jobmanager_Status_JVM_GarbageCollector_PS_Scavenge_Count=2i,jobmanager_Status_JVM_GarbageCollector_PS_Scavenge_Time=24i,\
jobmanager_Status_JVM_Memory_Direct_Count=11i,jobmanager_Status_JVM_Memory_Direct_MemoryUsed=525469i,\
jobmanager_Status_JVM_Memory_Direct_TotalCapacity=525468i,jobmanager_Status_JVM_Memory_Heap_Committed=1029177344i,\
jobmanager_Status_JVM_Memory_Heap_Max=1029177344i,jobmanager_Status_JVM_Memory_Heap_Used=92128400i,\
jobmanager_Status_JVM_Memory_Mapped_Count=0i,jobmanager_Status_JVM_Memory_Mapped_MemoryUsed=0i,\
jobmanager_Status_JVM_Memory_Mapped_TotalCapacity=0i,jobmanager_Status_JVM_Memory_NonHeap_Committed=59244544i,\
jobmanager_Status_JVM_Memory_NonHeap_Max=780140544i,jobmanager_Status_JVM_Memory_NonHeap_Used=56317792i,\
jobmanager_Status_JVM_Threads_Count=45i,jobmanager_numRegisteredTaskManagers=0i,jobmanager_numRunningJobs=0i,\
jobmanager_taskSlotsAvailable=0i,jobmanager_taskSlotsTotal=0i 1597130792538000000
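Each reported point is a single InfluxDB line-protocol line (wrapped above with trailing backslashes for readability): the db value is the measurement, the custom tags follow it, and every metric in the table is a field, with integer fields carrying a trailing i. The following minimal sketch shows how such a point decomposes, using a shortened sample; it is not a full line-protocol parser and assumes no escaped spaces or commas, which holds for this output:

# Split one line-protocol point into measurement, tags, fields, and timestamp.
def parse_point(line: str):
    head, field_part, ts = line.rsplit(" ", 2)
    measurement, *tag_pairs = head.split(",")
    tags = dict(p.split("=", 1) for p in tag_pairs)
    fields = {}
    for pair in field_part.split(","):
        key, value = pair.split("=", 1)
        # Integer fields end with "i"; the rest here are floats.
        fields[key] = int(value[:-1]) if value.endswith("i") else float(value)
    return measurement, tags, fields, int(ts)

sample = "flink,tag1=value1 jobmanager_Status_JVM_CPU_Load=0.0029,jobmanager_numRunningJobs=0i 1597130792538000000"
print(parse_point(sample))
# ('flink', {'tag1': 'value1'}, {'jobmanager_Status_JVM_CPU_Load': 0.0029, 'jobmanager_numRunningJobs': 0}, 1597130792538000000)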