通过 zipkin 采集链路追踪数据
简介
接收 符合 zipkin 协议格式的链路数据,并把数据经过统一转换成 DataFlux 的链路数据后上报到 DataFlux 中。
前置条件
已安装 DataKit(DataKit 安装文档)
部署对应语言的 Zipkin SDK,并配置数据上报地址为 DataKit 提供的地址
配置
使用 Zipkin 的数据发送到 DataKit一共需要三步:
第一步:在 DataKit 中开启链路数据接收服务
第二步:配置和获取链路数据监听的地址和端口
第三步:按项目需求对进行埋点,并配置数据上报地址为 DataKit 的链路数据接收地址
具体请参考(Python语言示例代码)
开启链路数据接收服务
打开 DataKit 采集源配置文件夹(默认路径为 DataKit 安装目录的 conf.d
文件夹),找到 conf.d/traceZipkin 文件夹,复制 traceZipkin.conf.sample 并命名为 traceZipkin.conf, 去掉 [inputs.traceZipkin]
前面的注释,即可开启链路数据接收服务,另外如果需要对链路数据添加其他自定义的标签,可以去掉 [inputs.traceZipkin.tags]
的注释,并添加自定义标签。
设置:
#[inputs.traceZipkin]
# pathV1 = "/api/v1/spans" # zipkin V1版本trace数据接收路径,默认与zipkin官方定义的路径相同
# pathV2 = "/api/v2/spans" # zipkin V2版本trace数据接收路径,默认与zipkin官方定义的路径相同
# [inputs.traceZipkin.tags] # 自定义标签组
# tag1 = "tag1" # 自定义标签1
# tag2 = "tag2" # 自定义标签2
# tag3 = "tag3" # 自定义标签3
配置和链路数据监听的地址和端口
打开 DataKit 安装目录下的 datakit.conf
,找到 http_server_addr
配置项,配置数据接收的监听地址和端口,默认监听地址是 0.0.0.0
,端口为 9529
完成以上配置后,即可获取:zipkin V1版本链路数据的接收地址HTTP协议//:绑定地址:链路端口/api/v1/spans
;zipkin V2版本链路数据的接收地址HTTP协议//:绑定地址:链路端口/api/v2/spans
。例如 DataWay 的地址是 1.2.3.4
,配置的监听地址和端口是 0.0.0.0:9529
,zipkin V1与V2版本链路数据的接收地址分别是: http://1.2.3.4:9529/api/v1/spans
与http://1.2.3.4:9529/api/v2/spans
,如果是本机也可以使用 localhost
或 127.0.0.1
,如果是内网也可以使用内网地址。
注意:
- V1版本请求的默认路径是
/api/v1/spans
- V2版本请求的默认路径是
/api/v2/spans
- 需保证数据采集端能访问该地址
埋点配置
通过 Zipkin 采集数据需要根据当前项目的开发语言引入对应的 Zipkin SDK,然后按照 SDK 的文档项目进行埋点,对于 DataFlux 的链路数据采集来说,只需要将 Zipkin SDK 中的数据上报地址配置为上一步中获取到的链路数据接收地址即可(相当于将 Datakit 当做 Zipkin Server)。
这里以 Zipkin 的 py-zipkin 客户端作为示范
第一步,引入相关客户端包
import requests
from py_zipkin import Encoding
from py_zipkin.zipkin import zipkin_span, create_http_headers_for_new_span
第二步,设置上报地址
def http_transport(body):
# 将 zipkin_url 配置为 DataKit 的链路数据接收地址,以V2版本为例
zipkin_url = "http://1.2.3.4:9529/api/v2/spans"
#设置数据格式
headers = {"Content-Type": "application/json"}
requests.post(zipkin_url, data=body, headers=headers)
另外,同一种trace类型同一个版本数据格式也有所不同,这由 SDK 的 HTTP/POST 请求头中 Content-Type
来区分。目前 zipkin 支持类型如下表所示:
source | version | Content-Type |
---|---|---|
zipkin | v1 | application/json |
zipkin | v1 | application/x-thrift |
zipkin | v2 | application/json |
zipkin | v2 | application/x-protobuf |
第三步,埋点创建span信息
with zipkin_span(
service_name='webapp', # 服务名
span_name='index', # span 名
transport_handler=http_transport, # 自定义上报函数,http_transport 是我们自定义的上报函数
port=5010, # 本服务的端口
sample_rate=100, # 采样率,设置范围:0.0 - 100.0,我们示例的 100 代表每一个请求都会被追踪。
encoding= Encoding.V2_JSON,
)as zipkin_context:
# 添加 binary_annotation:自定义信息
zipkin_context.update_binary_annotations({
"key": "jdbc.query",
"value": "select distinct",
}
)
time.sleep(1)
# index span 下建立子 span,只需设置好service_name,span_name,
with zipkin_span(service_name='webapp', span_name='do_stuff', encoding= Encoding.V2_JSON,):
do_stuff()
其他语言的 SDK,依次类推,配置成功后约 1-2 分钟即可在 DataFlux Studio 的 「链路追踪」中查看相关的链路数据。
示例代码
以 Python 语言作为示例代码,其它编程语言与此类似,示例中SERVICE_A
提供 HTTP 服务,并且调用SERVICE_B
HTTP服务。
SERVICE_A
import requests
from flask import Flask
from py_zipkin.zipkin import zipkin_span, create_http_headers_for_new_span
import time
app = Flask(__name__)
# 这里将数据上报的host,prot配置,也可以通过常量(python中变量也可以)的方式放置
app.config.update({
"DATAKIT_HOST": "10.100.64.205", # 数据上报的host
"DATAKIT_PORT": "9529",
})
def do_stuff():
time.sleep(2)
# 当有需要跨服务请求时,create_http_headers_for_new_span() 将 trace_id,span_id,parent_span_id,is_sampled 等关键信息存入请求头里传送给另外一个服务的span里
headers = create_http_headers_for_new_span()
requests.get('http://localhost:6000/service1/', headers=headers)
return 'OK'
def http_transport(encoded_span):
body = encoded_span
# 上报地址,host,prot,直接拿配置好的
zipkin_url = "http://{host}:{port}/api/v1/spans".format(host=app.config["DATAKIT_HOST"],
port=app.config["DATAKIT_PORT"])
# 请求头设置使用 thrift 格式
headers = {"Content-Type": "application/x-thrift"}
# http post 方式上报数据,建议使用 try/except 对上报失败的数据做处理。
r = requests.post(zipkin_url, data=body, headers=headers)
@app.route('/')
def index():
with zipkin_span(
service_name='SERVICE_A', # 服务名
span_name='index', # span 名
transport_handler=http_transport, # 自定义上报函数,http_transport 是我们自定义的上报函数
port=5010, # 本服务的端口
sample_rate=100, # 采样率,设置范围:0.0 - 100.0,我们示例的 100 代表每一个请求都会被追踪。
)as zipkin_context:
# 添加 binary_annotation:自定义信息
zipkin_context.update_binary_annotations({
"key": "jdbc.query",
"value": "select distinct",
}
)
time.sleep(1)
# index span 下建立子 span,只需设置好service_name,span_name,
with zipkin_span(service_name='SERVICE_A', span_name='do_stuff'):
do_stuff()
time.sleep(1)
return 'OK', 200
if __name__ == '__main__':
app.run(host="0.0.0.0", port=5010, debug=True)
SERVICE_B
from flask import request
import requests
from flask import Flask
from py_zipkin.zipkin import zipkin_span,ZipkinAttrs
import time
app = Flask(__name__)
app.config.update({
"DATAKIT_HOST": "10.100.64.205",
"DATAKIT_PORT": "9529",
})
def do_stuff():
time.sleep(2)
return 'OK'
def http_transport(encoded_span):
body=encoded_span
zipkin_url = "http://{host}:{port}/api/v1/spans".format(host=app.config["DATAKIT_HOST"], port=app.config["DATAKIT_PORT"])
headers = {"Content-Type": "application/x-thrift"}
requests.post(zipkin_url, data=body, headers=headers)
print(body)
@app.route('/service1/')
def index():
with zipkin_span(
service_name='SERVICE_B',
zipkin_attrs=ZipkinAttrs(
trace_id=request.headers['X-B3-TraceID'],
span_id=request.headers['X-B3-SpanID'],
parent_span_id=request.headers['X-B3-ParentSpanID'],
flags=request.headers['X-B3-Flags'],
is_sampled=request.headers['X-B3-Sampled'],
),
span_name='index_service1',
transport_handler=http_transport,
port=6000,
sample_rate=100,
):
with zipkin_span(service_name='SERVICE_B', span_name='service1_do_stuff'):
do_stuff()
return 'OK', 200
if __name__=='__main__':
app.run(host="0.0.0.0", port=6000, debug=True)
采集数据
具体请参考 链路数据采集文档中的字段说明
事件
该数据源无关键事件数据
日志
该数据源无日志
FAQ
配置好后为什么在 DataFlux Studio 中看不到数据?
请按照下面的项依次进行检查:
- 确认更改了 DataKit 的相关配置文件后是否重启 DataKit
- 请确认在 DataKit 中配置的上报地址是否正确
- 请确认采集链路数据的网络环境是否能访问 DataKit 的链路数据接收地址
- 开启 DataKit Debug 模式(在 datakit.conf 中将
log_level
设置为debug
,重启),查看日志datakit.log
,如果有链路数据,又产生如下的 debug 日志。