search
数据采集 采集源配置 opentrace 数据采集

opentrace 数据采集

简介

目前有很多厂家实现各自的代码追踪技术来达到代码流程可视化管理,方便流程优化,性能瓶颈分析以及排除故障。成熟的技术有:Google 有 Dapper 和 StackDriver, Twitter 的 Zipkin ,淘宝的鹰眼,Uber 的 jaeger 等。由于上述各个厂家在技术细节的实现上各有特点,若用户代码追踪从一种技术实现转为另一种,可能伴随着软件产品结构的调整,带来较大的工作量和风险。为此,Opentrace 提出了一种接口标准或者是规范,它规定了多种编码语言(go,Python,Java,C++等)实现 trace 的接口规范。只要每个实现 trace 技术的厂家都遵守这个规范,软件开发者用户从一个技术实现切换到另一个,并不会带来额外的工作量,只需在初始化部分进行少量配置修改即可。Opentrace 的 GitHub 地址为https://github.com/opentracing。

Opentrace术语

Trace:一次完整的追踪,一个 trace 可以由一个或多个 span组成;
Span:调用或者跨度,可表示函数调用,RPC,数据库读写等,只要有时间维度都可以用一个span来表示,一个 span 通常有如下信息组成;

  • name,Span 的名字;
  • 起止时间,Span的开始与结束时间戳;
  • tag,存储键值对数据;
  • log,包含有时间戳键值对数据;
  • reference,Span之间关系,目前仅支持 ChildOfRef 与FollowsFromRef ;
  • spancontext,span 上下文信息,包含 traceid 与 baggage 等。向 baggage 中添加信息会传播到其所有后代 span 中,会增加负载开销。

Jaeger

jaeger 是 Uber 实现的代码追踪技术,其符合 open-trace 规范,基于 Jaeger 实现流程追踪的效果图如图1所示,可以得到流程每一步调用的耗时分析,快速分析性能瓶颈点,有助于流程优化。

上图的demo示例采用All IN ONE部署,即采用单机方式,数据收集器,存储,查询等都部署到一个机器上,可以通过 docker 方式实现,方便个人用户快速熟悉 jaeger 特性及使用。容器部署的方式命令为:docker run -d --name jaeger -e COLLECTOR_ZIPKIN_HTTP_PORT=9411 -p 5775:5775/udp -p 6831:6831/udp -p 6832:6832/udp -p 5778:5778 -p 16686:16686 -p 14268:14268 -p 9411:9411 jaegertracing/all-in-one:1.15,部署完后通过访问http://127.0.0.1:16686/查询trace信息。Jaeger结构信息如下图2所示。

Application 即应用代码程序;jaeger-client 为注入应用程序中符合 open-trace 规范的代码,其负责收集 span 信息,并把 span 信息通过 UDP 或者 HTTP 方式发送给 jaeger-agent;jaeger-agent 定期批量把数据发送给 jaeger-collector;jaeger-collector 对数据整理并写入 DB;UI 与 jaeger-query 提供基于HTTP 访问查询服务。前面 All IN ONE 部署只启动 jaeger-agent,jaeger-collector,DB,Spark-jobs,jaeger-query与UI,并不包含Application与jaeger-client,所以要查看 trace 信息还需要部署具体 Application 应用服务。

Opentrace Demo

Demo 是使用 opentrace 的 go 语言实现的,其它语言版本没有深入研究过,大致类似。另外把 jaeger 中 jaeger-client 默认以 UDP 方式发送给 jaeger-agent 的 span 数据改为 HTTP 方式发向 dataway。并且,jaeger-client 发出的数据是 Thrift 格式。

Demo场景

demo 代码主要描述跨应用使用 opentrace。 client_app.go 中创建一个应用,此应用发送一个 HTTP 请求到http://127.0.0.1:12345/x,在这个应用中创建一个span,名字为 clientspan,并且把 trace/span 信息注入到 HTTP 的 Header 中。在 server.go 中创建一个 HTTP 服务器,此服务器有两个 api,分别是:http://127.0.0.1:12345/xhttp://127.0.0.1:12345/y。/x用于接收 client_app 的请求,在这个请求处理中创建一个 testspan ,并且请求流程有两个子流程 auth 和 data 组成,在子流程分别创建 authspan 与 dataspan。可以看出,这个 tracer 横跨两个应用:client_app 与server,由4个 span 组成:clientspan,testspan,authspan与dataspan,其中 clientspan 位于 client_app 应用中,testspan,authspan 与 dataspan 位于 server 应用中。这4个span 关系是:authspan 与dataspan 的父 span 是 testspan,而testspan 是 clientspan 的子 span。另外 demo 代码基本涵盖open-trace 接口常用使用,包括 tracer,span 的创建,baggage 在所有 span 间的传递,log 与tag 数据设置等。/y用于接收4个 span 具体追踪信息,并把 span 信息打印出来,方便查看 tracer 与 span 间关系

基本使用

  1. 运行server.go
  2. 运行client_app.go

可以看到上述4个 span tracer/span/baggage等信息打印输出。

JaegerUI使用

  1. 把 server.go 与 client_app.go 中如下代码注释掉,即 span信息不发送到http://127.0.0.1:12345/y,改为默认以 UDP6831 端口发送的 Jeager Collector ;
    reportCfg := new(config.ReporterConfig)
    reportCfg.CollectorEndpoint = "http://127.0.0.1:12345/y"
    cfg.Reporter                = reportCfg
    
  2. 启动Jeager Collector,JeagerUI等;
    docker run -d --name jaeger   -e COLLECTOR_ZIPKIN_HTTP_PORT=9411  -p 5775:5775/udp   -p 6831:6831/udp  -p 6832:6832/udp  -p 5778:5778  -p 16686:16686   -p 14268:14268  -p 9411:9411   jaegertracing/all-in-one:1.15
    
  3. 运行 server.go
  4. 运行 client_app.go
  5. 浏览器访问http://127.0.0.1:16686/即可查看上面4个 span 关系,log/tag 数据,调用耗时等。

示例代码

server.go 代码如下:

package main

import (
    "fmt"
    "github.com/opentracing/opentracing-go"
    "github.com/opentracing/opentracing-go/log"
    "github.com/uber/jaeger-client-go/config"
    "github.com/uber/jaeger-client-go/thrift"
    j "github.com/uber/jaeger-client-go/thrift-gen/jaeger"
    "io/ioutil"
    "net/http"
    "time"
)


func auth(s opentracing.Span) {
    //创建authspan,并且为testspan的子span
    authspan := opentracing.StartSpan("auth span", opentracing.ChildOf(s.Context()))

    //authspan结束后发送其追踪信息到http://127.0.0.1:12345/y
    defer authspan.Finish()

    //为testspan设置log信息
    authspan.LogFields(
        log.String("event", "start auth"),
        log.String("type", "name/password"),
        log.Int("ID", 1500))

    //获取baggage数据,应包括clientspan中clientSpanBaggage数据与testspan中testSpanBaggage数据
    fmt.Printf("authSpan baggage: %v\n", authspan.BaggageItem("clientspanbaggage"))
    fmt.Printf("dataSpan baggage: %v\n", authspan.BaggageItem("testSpanBaggage"))

    //打印authspan上下文信息
    fmt.Printf("dataSpan context: %#v\n", authspan.Context())

    //模拟操作耗时
    time.Sleep(100*time.Millisecond)
}
func data(s opentracing.Span) {
    //创建dataspan,并且为testspan的子span
    dataspan := opentracing.StartSpan("data span", opentracing.ChildOf(s.Context()))

    //authspan结束后发送其追踪信息到http://127.0.0.1:12345/y
    defer dataspan.Finish()

    //获取baggage数据,应包括clientspan中clientSpanBaggage数据与testspan中testSpanBaggage数据
    fmt.Printf("dataSpan baggage: %v\n", dataspan.BaggageItem("clientspanbaggage"))
    fmt.Printf("dataSpan baggage: %v\n", dataspan.BaggageItem("testSpanBaggage"))

    //为dataspan设置log信息
    dataspan.LogKV("action", "Process")

    //为dataspan设置baggage信息
    dataspan.SetBaggageItem("dataSpanBaggage", "dataSpanBaggageValue")

    //打印dataspan设置baggage信息
    fmt.Printf("dataSpan baggage: %v\n", dataspan.BaggageItem("dataSpanBaggage"))

    //打印dataspan上下文信息
    fmt.Printf("dataSpan context: %#v\n", dataspan.Context())

    //模拟操作耗时
    time.Sleep(100*time.Millisecond)
}
func Indexhandler(w http.ResponseWriter,r *http.Request)  {
    //获取tracer,即通过opentracing.SetGlobalTracer(tracer)设置的tracer
    tracer  := opentracing.GlobalTracer()

    //获取clientspan的context上下文载体
    carrier := opentracing.HTTPHeadersCarrier(r.Header)

    //提取clientspan的context上下文
    clientContext, err := tracer.Extract(opentracing.HTTPHeaders, carrier)

    if err != nil {
        fmt.Printf("Extract err: %v\n", err)
    }

    //创建TestSpan,并且为clientspan的子span
    testSpan := opentracing.StartSpan("TestSpan", opentracing.ChildOf(clientContext))

    //TestSpan结束后发送其追踪信息到http://127.0.0.1:12345/y
    defer testSpan.Finish()

    //为testspan设置baggage信息
    testSpan.SetBaggageItem("testSpanBaggage", "testSpanBaggageValue")

    //模拟请求验证
    auth(testSpan)

    //模拟数据访问
    data(testSpan)

    //HTTP响应
    fmt.Fprintln(w,"hello world")
}

func Posthandler(w http.ResponseWriter,r *http.Request)  {
    //读取请求body
    body, err := ioutil.ReadAll(r.Body)
    if err != nil {
        return
    }

    //span thrift格式数据解析
    buffer := thrift.NewTMemoryBuffer()
    if _, err = buffer.Write(body); err != nil {
        return
    }
    transport := thrift.NewTBinaryProtocolTransport(buffer)
    batch := &j.Batch{}
    if err = batch.Read(transport); err != nil {
        return
    }

    //打印span具体信息
    for _ , v :=range batch.Spans {
        fmt.Printf("%#v\n", v)
        fmt.Printf("%#v\n", v.References)
        fmt.Printf("%#v\n", v.Logs)
        fmt.Printf("%#v\n", v.Tags)
        fmt.Printf("-----------------------------------------------\n")
    }

    //返回HTTP响应
    fmt.Fprintln(w,"hello world")
}


func main() {
    //获取缺省配置
    cfg,err:= config.FromEnv()
    if err!=nil {
        fmt.Println(err)
    }
    //设置采样率
    cfg.Sampler=&config.SamplerConfig{
        Type:  "const",// 使用const采样器
        Param: 1,      // 采样所有追踪
    }
    // 设置服务名
    cfg.ServiceName = "AAA"

    //设置agent HTTP地址,可以是dataway的地址,此处示例把数据打到本地SERVER,即http://127.0.0.1:12345/y,
    //若下面三行注释,则默认Jaeger以UDP方式向AGENT发送数据
    reportCfg := new(config.ReporterConfig)
    reportCfg.CollectorEndpoint = "http://10.100.64.106:19528/v1/write/metrics/short"
    cfg.Reporter                = reportCfg
    // 创建Tracer
    tracer, closer, err := cfg.NewTracer()
    if err != nil {
        fmt.Println(err)
    }
    defer closer.Close()
    //设置全局tracer
    opentracing.SetGlobalTracer(tracer)

    //设置HTTP服务API路由
    //client 应用发送请求到/x,请求头中带有spancontext信息
    http.HandleFunc("/x",Indexhandler)

    //所有span追踪信息发送到/y,此API打印出每个span的详细信息,可以看出span之间的关系
    http.HandleFunc("/y",Posthandler)

    http.ListenAndServe("127.0.0.1:12345",nil)
}

client_app.go 代码如下:

package main

import (
    "fmt"
    "github.com/opentracing/opentracing-go"
    "github.com/uber/jaeger-client-go/config"
    "math/rand"
    "net/http"
    "time"
)

var test_cnt int = 1000
func main(){
    //获取缺省配置
    cfg,err:= config.FromEnv()
    if err!=nil {
        fmt.Println(err)
    }

    //设置采样率
    cfg.Sampler=&config.SamplerConfig{
        Type:  "const",// 使用const采样器
        Param: 1,      // 采样所有追踪
    }

    // 设置服务名
    cfg.ServiceName = "client demo"
    //设置agent HTTP地址,可以是dataway的地址,此处示例把数据打到本地SERVER,即http://127.0.0.1:12345/y,
    //若下面三行注释,则默认Jaeger以UDP方式向AGENT发送数据
    reportCfg := new(config.ReporterConfig)
    reportCfg.CollectorEndpoint = "http://10.100.64.106:19528/v1/write/metrics/short"
    cfg.Reporter                = reportCfg
    // 创建Tracer
    tracer, closer, err := cfg.NewTracer()
    if err != nil {
        fmt.Printf("Create Trace err: %v\n", err)
        return
    }
    defer closer.Close()
    //设置全局tracer
    opentracing.SetGlobalTracer(tracer)

    for i:=0; i < test_cnt; i++ {
        app_process()
    }
}

func app_process() {

    //创建一个span,若创建span时候没有指定父span,则此spanid即为traceid
    clientSpan := opentracing.StartSpan("ClientSpan")
    //向agent发送追踪数据
    defer clientSpan.Finish()

    //随机延时一段时间
    t := time.Duration(rand.Intn(100))
    time.Sleep((100+t)*time.Millisecond)

    //创建一个HTTP客户端
    client := &http.Client{
    }

    //创建一个HTTP请求
    req, err := http.NewRequest("GET", "http://127.0.0.1:12345/x", nil)
    if err != nil {
        fmt.Printf("Request err: %v\n", err)
        return
    }

    //当前span设置一个tag
    clientSpan.SetTag("clientSpanTag", "clientSpanValue")
    clientSpan.SetBaggageItem("clientSpanBaggage", "clientSpanBaggageValue")

    //取得请求头
    carrier := opentracing.HTTPHeadersCarrier(req.Header)
    //把当前span的context上下文写入HTTP请求头,即写入traceid,baggage等信息
    clientSpan.Tracer().Inject(clientSpan.Context(), opentracing.HTTPHeaders, carrier)

    //打印HTTP请求
    fmt.Printf("Request: %v\n", req)

    //发送请求
    resp, err := client.Do(req)
    if err != nil {
        fmt.Printf("Get response err: %v\n", err)
        return
    }

    //打印HTTP响应
    fmt.Printf("Response: %v\n", resp)
}