search
数据采集 采集源配置 通过 zipkin 采集链路追踪数据

通过 zipkin 采集链路追踪数据

简介

接收 符合 zipkin 协议格式的链路数据,并把数据经过统一转换成 DataFlux 的链路数据后上报到 DataFlux 中。

前置条件

配置

使用 Zipkin 的数据发送到 DataKit一共需要三步:

第一步:在 DataKit 中开启链路数据接收服务

第二步:配置和获取链路数据监听的地址和端口

第三步:按项目需求对进行埋点,并配置数据上报地址为 DataKit 的链路数据接收地址

具体请参考(Python语言示例代码)

开启链路数据接收服务

打开 DataKit 采集源配置文件夹(默认路径为 DataKit 安装目录的 conf.d 文件夹),找到 conf.d/traceZipkin 文件夹,复制 traceZipkin.conf.sample 并命名为 traceZipkin.conf, 去掉 [inputs.traceZipkin] 前面的注释,即可开启链路数据接收服务,另外如果需要对链路数据添加其他自定义的标签,可以去掉 [inputs.traceZipkin.tags] 的注释,并添加自定义标签。

设置:

#[inputs.traceZipkin]
#       pathV1 = "/api/v1/spans"    # zipkin V1版本trace数据接收路径,默认与zipkin官方定义的路径相同
#       pathV2 = "/api/v2/spans"    # zipkin V2版本trace数据接收路径,默认与zipkin官方定义的路径相同
#       [inputs.traceZipkin.tags]   # 自定义标签组
#               tag1 = "tag1"       # 自定义标签1
#               tag2 = "tag2"       # 自定义标签2
#               tag3 = "tag3"       # 自定义标签3

配置和链路数据监听的地址和端口

打开 DataKit 安装目录下的 datakit.conf,找到 http_server_addr 配置项,配置数据接收的监听地址和端口,默认监听地址是 0.0.0.0,端口为 9529

完成以上配置后,即可获取:zipkin V1版本链路数据的接收地址HTTP协议//:绑定地址:链路端口/api/v1/spans;zipkin V2版本链路数据的接收地址HTTP协议//:绑定地址:链路端口/api/v2/spans。例如 DataWay 的地址是 1.2.3.4,配置的监听地址和端口是 0.0.0.0:9529,zipkin V1与V2版本链路数据的接收地址分别是: http://1.2.3.4:9529/api/v1/spanshttp://1.2.3.4:9529/api/v2/spans,如果是本机也可以使用 localhost127.0.0.1,如果是内网也可以使用内网地址。

注意:

  • V1版本请求的默认路径是 /api/v1/spans
  • V2版本请求的默认路径是 /api/v2/spans
  • 需保证数据采集端能访问该地址

埋点配置

通过 Zipkin 采集数据需要根据当前项目的开发语言引入对应的 Zipkin SDK,然后按照 SDK 的文档项目进行埋点,对于 DataFlux 的链路数据采集来说,只需要将 Zipkin SDK 中的数据上报地址配置为上一步中获取到的链路数据接收地址即可(相当于将 Datakit 当做 Zipkin Server)。

这里以 Zipkin 的 py-zipkin 客户端作为示范

第一步,引入相关客户端包

import requests
from py_zipkin import Encoding
from py_zipkin.zipkin import zipkin_span, create_http_headers_for_new_span

第二步,设置上报地址

def http_transport(body):
    # 将 zipkin_url 配置为 DataKit 的链路数据接收地址,以V2版本为例
    zipkin_url = "http://1.2.3.4:9529/api/v2/spans"
    #设置数据格式
    headers = {"Content-Type": "application/json"}
    requests.post(zipkin_url, data=body, headers=headers)

另外,同一种trace类型同一个版本数据格式也有所不同,这由 SDK 的 HTTP/POST 请求头中 Content-Type 来区分。目前 zipkin 支持类型如下表所示:

source version Content-Type
zipkin v1 application/json
zipkin v1 application/x-thrift
zipkin v2 application/json
zipkin v2 application/x-protobuf

第三步,埋点创建span信息

    with zipkin_span(
            service_name='webapp',  # 服务名
            span_name='index',     # span 名
            transport_handler=http_transport,  # 自定义上报函数,http_transport 是我们自定义的上报函数
            port=5010,  # 本服务的端口
            sample_rate=100,  # 采样率,设置范围:0.0 - 100.0,我们示例的 100 代表每一个请求都会被追踪。
            encoding= Encoding.V2_JSON,
    )as zipkin_context:
        # 添加 binary_annotation:自定义信息
        zipkin_context.update_binary_annotations({
            "key": "jdbc.query",
            "value": "select distinct",
        }
        )
        time.sleep(1)
        # index span 下建立子 span,只需设置好service_name,span_name,
        with zipkin_span(service_name='webapp', span_name='do_stuff', encoding= Encoding.V2_JSON,):
            do_stuff()

其他语言的 SDK,依次类推,配置成功后约 1-2 分钟即可在 DataFlux Studio 的 「链路追踪」中查看相关的链路数据。

示例代码

以 Python 语言作为示例代码,其它编程语言与此类似,示例中SERVICE_A提供 HTTP 服务,并且调用SERVICE_B HTTP服务。

SERVICE_A

import requests
from flask import Flask
from py_zipkin.zipkin import zipkin_span, create_http_headers_for_new_span
import time

app = Flask(__name__)

# 这里将数据上报的host,prot配置,也可以通过常量(python中变量也可以)的方式放置
app.config.update({
    "DATAKIT_HOST": "10.100.64.205",  # 数据上报的host
    "DATAKIT_PORT": "9529",
})


def do_stuff():
    time.sleep(2)
    # 当有需要跨服务请求时,create_http_headers_for_new_span() 将 trace_id,span_id,parent_span_id,is_sampled 等关键信息存入请求头里传送给另外一个服务的span里
    headers = create_http_headers_for_new_span()
    requests.get('http://localhost:6000/service1/', headers=headers)
    return 'OK'


def http_transport(encoded_span):
    body = encoded_span

    # 上报地址,host,prot,直接拿配置好的
    zipkin_url = "http://{host}:{port}/api/v1/spans".format(host=app.config["DATAKIT_HOST"],
                                                            port=app.config["DATAKIT_PORT"])
    # 请求头设置使用 thrift 格式
    headers = {"Content-Type": "application/x-thrift"}

    # http post 方式上报数据,建议使用 try/except 对上报失败的数据做处理。
    r = requests.post(zipkin_url, data=body, headers=headers)


@app.route('/')
def index():
    with zipkin_span(
            service_name='SERVICE_A',         # 服务名
            span_name='index',                # span 名
            transport_handler=http_transport, # 自定义上报函数,http_transport 是我们自定义的上报函数
            port=5010,                        # 本服务的端口
            sample_rate=100,                  # 采样率,设置范围:0.0 - 100.0,我们示例的 100 代表每一个请求都会被追踪。
    )as zipkin_context:
        # 添加 binary_annotation:自定义信息
        zipkin_context.update_binary_annotations({
            "key": "jdbc.query",
            "value": "select distinct",
        }
        )
        time.sleep(1)
        # index span 下建立子 span,只需设置好service_name,span_name,
        with zipkin_span(service_name='SERVICE_A', span_name='do_stuff'):
            do_stuff()
        time.sleep(1)
    return 'OK', 200


if __name__ == '__main__':
    app.run(host="0.0.0.0", port=5010, debug=True) 

SERVICE_B

from flask import request
import requests
from flask import Flask
from py_zipkin.zipkin import zipkin_span,ZipkinAttrs
import time

app = Flask(__name__)
app.config.update({
    "DATAKIT_HOST": "10.100.64.205",
    "DATAKIT_PORT": "9529",
})


def do_stuff():
    time.sleep(2)
    return 'OK'


def http_transport(encoded_span):

    body=encoded_span
    zipkin_url = "http://{host}:{port}/api/v1/spans".format(host=app.config["DATAKIT_HOST"], port=app.config["DATAKIT_PORT"])
    headers = {"Content-Type": "application/x-thrift"}
    requests.post(zipkin_url, data=body, headers=headers)
    print(body)


@app.route('/service1/')
def index():
    with zipkin_span(
        service_name='SERVICE_B',
        zipkin_attrs=ZipkinAttrs(
            trace_id=request.headers['X-B3-TraceID'],
            span_id=request.headers['X-B3-SpanID'],
            parent_span_id=request.headers['X-B3-ParentSpanID'],
            flags=request.headers['X-B3-Flags'],
            is_sampled=request.headers['X-B3-Sampled'],
        ),
        span_name='index_service1',
        transport_handler=http_transport,
        port=6000,
        sample_rate=100,
    ):
        with zipkin_span(service_name='SERVICE_B', span_name='service1_do_stuff'):
            do_stuff()
    return 'OK', 200


if __name__=='__main__':
    app.run(host="0.0.0.0", port=6000, debug=True) 

采集数据

具体请参考 链路数据采集文档中的字段说明

事件

该数据源无关键事件数据

日志

该数据源无日志

FAQ

配置好后为什么在 DataFlux Studio 中看不到数据?

请按照下面的项依次进行检查:

  1. 确认更改了 DataKit 的相关配置文件后是否重启 DataKit
  2. 请确认在 DataKit 中配置的上报地址是否正确
  3. 请确认采集链路数据的网络环境是否能访问 DataKit 的链路数据接收地址
  4. 开启 DataKit Debug 模式(在 datakit.conf 中将 log_level 设置为 debug,重启),查看日志 datakit.log,如果有链路数据,又产生如下的 debug 日志。