go-zero+opentelemetry+jaeger云原生链路跟踪实践


本文示例程序用到的组件和框架如下:

  • jaeger:云原生可观测链路跟踪工具。关于jaeger部署详见链路跟踪工具Jaeger+OpenTelemetry Collector部署详解
  • go-zero:一个go语言版的集成了各种工程实践的api和rpc微服务框架,本文将使用go-zero的api框架
  • imroc/req:一个开源的go语言轻量级HTTP客户端库
  • opentelemetry:提供了一组API和SDK来标准化遥测数据的采集和传输,相对于skywalking等字节码实现的非侵入式链路跟踪工具来说,opentelemetry是一组可观测领域的标准和规范,基于这个标准提供的各种语言的SDK,通过在代码中打埋点的方式(侵入式),可实现市面上绝大多数语言的链路跟踪功能(比如skywalking、听云等对ktor都不支持,但opentelemetry支持)

go-zero示例程序

以一个封装了蓝鲸CMDB接口的示例工程为例(api工程),工程名叫bkcmdb,目录结构如下:

蓝鲸CMDB提供了HTTP接口,示例工程通过imroc/req库封装了蓝鲸CMDB接口,通过在调用蓝鲸接口的前后打入埋点,实现一次调用的链路跟踪功能。

如下的一次调用,将会在一个链路(即trace)上产生两个节点(即span):
postman --> bkcmdb(go-zero) --> 蓝鲸接口

该链路第一个节点是bkcmdb,第二个节点是蓝鲸接口。默认go-zero是没有开启链路跟踪功能的,下文将说明如何启用。

go-zero启动链路跟踪功能

在正式介绍opentrace基本概念之前,先通过简单的配置启动go-zero的链路跟踪功能看一看效果。go-zero自带了基于opentelemetry的链路追踪功能,默认未开启,在工程的配置文件etc/bkcmdb.yaml中增加如下配置:

Telemetry:
  Name: fiops-bkcmdb
  Endpoint: http://jaeger-collector.xxx.io/api/traces
  Sampler: 1.0
  Batcher: jaeger

只需在配置文件中增加Telemetry配置块即可,无需修改任何代码。其中Name是该链路上的ServiceName,将会在jaeger(或zipkin)上显示为Service。Endpoint是链路跟踪工具(jaeger或zipkin)的collector地址。Batcher这里指定链路跟踪工具为jaeger。

注意:默认配置了Telemetry后,只能在链路中采集到客户端(比如postman)访问bkcmdb服务端这前半条链路,bkcmdb通过imroc/req访问蓝鲸CMDB的后半条链路现在还采集不到。

现在用postman访问bkcmdb

curl --location 'http://<bkcmdb-url>/v1/project/search' \
--header 'Content-Type: application/json' \
--data '{
    "condition": {
        "project": [ ]
    },
    "page": {
        "limit": 100,
        "sort": "-bk_inst_id",
        "start": 0
    }
}'

打开jaeger前端页面,Service搜索fiops-bkcmdb,可以看到出现了一条链路,且链路中只有一个span,这个span就是客户端访问bkcmdb服务端的链路。

点击这条链路进入详情页,展开trace,地址栏url红框内是traceId,右下角红框是spanId

到此示例工程就算成功开启了链路跟踪功能,但这样还不能满足我们的需求,我们还需要bkcmdb访问蓝鲸CMDB的链路。在继续修改代码之前,我们先来了解一下opentracing、opentelemetry的基本概念。

OpenTracing

https://opentracing.io/

分布式追踪为描述和分析跨进程事务、甚至是跨网络调用提供了一种解决方案。早在 2005 年,Google 就在内部部署了一套分布式追踪系统 Dapper,并发表了一篇论文《Dapper, a Large-Scale Distributed Systems Tracing Infrastructure》,阐述了该分布式追踪系统的设计和实现,可以视为分布式追踪领域的鼻祖。后来各家厂商公司受此启发,纷纷开源了自家的链路跟踪产品,如zipkin、Uber的jaeger等。但各家的分布式追踪方案互不兼容,于是就诞生了OpenTracing。OpenTracing 是一个 Library,定义了一套通用的数据上报接口,要求各个分布式追踪系统都来实现这套接口。

数据模型

OpenTracing规范中定义了数据模型,可参考原始文档The OpenTracing Semantic Specification

Causal relationships between Spans in a single Trace
 
         [Span A]  ←←←(the root span)
             |
      +------+------+
      |             |
  [Span B]      [Span C] ←←←(Span C is a `ChildOf` Span A)
      |             |
  [Span D]      +---+-------+
               |           |
           [Span E]    [Span F] >>> [Span G] >>> [Span H]
                                       ↑
                                       ↑
                                       ↑
                         (Span G `FollowsFrom` Span F)

在OpenTracing模型中有三个基本概念:

  • Trace:调用链,一次HTTP调用或者一次RPC调用(从起始到最后终止),一个调用链会分配一个全局唯一的TraceID
  • Span:每个调用链由多个Span组成。Span的含义是范围,可以理解为一个处理阶段(例如一次HTTP调用、一次数据库访问)。Span是一个分布式追踪的最小单位。Span和Span的关系称为Reference。下面是一个Span的结构。
type Span struct {
  ctx           spanContext
  serviceName   string
  operationName string
  startTime     time.Time
  flag          string
  children      int
}

Span会封装一组零个或多个key:value的Tags,用于标识一些想在链路中展示的数据

  • SpanContext:保存链路的上下文信息[TraceID,SpanID,或者是其它想要传递的内容]实现了tracer接口
type spanContext struct {
  traceId string
  spanId  string
}

Span之间的关系

  • ChildOf:两个Span可以组成父子关系
[-Parent Span---------]
         [-Child Span----]

    [-Parent Span--------------]
         [-Child Span A----]
          [-Child Span B----]
        [-Child Span C----]
         [-Child Span D---------------]
         [-Child Span E----]
  • FollowsFrom:某些父Span在任何方面都不依赖子Span的结果
[-Parent Span-]  [-Child Span-]


    [-Parent Span--]
     [-Child Span-]


    [-Parent Span-]
                [-Child Span-]

OpenCensus

https://opencensus.io/

OpenCensus是Google开源的,作为最早提出Tracing概念的公司,OpenCensus也是Google Dapper的社区版本。相对于OpenTracing只支持tracing,OpenCensus不仅支持tracing还支持metrics。OpenTracing只制定规范,OpenCensus不仅制定规范,还包含了Agent和Collector。

OpenTelemetry

https://opentelemetry.io/

为了更好的将Tracing、Metrics、Logging融合在一起,OpenTelemetry 诞生了。作为 CNCF 的孵化项目,OpenTelemetry 由 OpenTracing 和 OpenCensus 项目合并而成,是一组规范、API 接口、SDK、工具和集成。为众多开发人员带来 Metrics、Tracing、Loging 的统一标准,三者都有相同的元数据结构,可以轻松实现互相关联。

OpenTelemetry 与厂商、平台无关,不提供与可观测性相关的后端服务。可根据用户需求将可观测类数据导出到存储、查询、可视化等不同后端,如 Prometheus、Jaeger 、云厂商服务中。

有了基本概念以后,下来看看如何使用imroc/req在发送HTTP的时候加入tracing。

自定义imroc/req中间件实现发送HTTP的trace跟踪

这里我们建一个HttpClient封装一下req。在svc目录下新建httpclient.go,并在里面新建一个结构体:

type HttpClient struct {
    *req.Client
}

定义NewHttpClient函数用于返回一个实例化的HttpClient,在OnBeforeRequest里定义请求发送前的动作,在OnAfterResponse里定义请求返回后的动作。到此为止都是常规操作,还没涉及到任何tracing动作。

// 需要传入go-zero的Config对象
func NewHttpClient(conf config.Config) *HttpClient {
  headers := map[string]string{
    "BK_USER":                   "admin",
    "HTTP_BLUEKING_SUPPLIER_ID": "0",
  }
  c := req.C().
    SetCommonHeaders(headers).SetBaseURL(fmt.Sprintf("http://%s", conf.BkcmdbUrl)).
    OnBeforeRequest(func(client *req.Client, req *req.Request) error {
      if req.RetryAttempt > 0 {
        return nil
      }
      req.EnableDump()
      return nil
    }).
    OnAfterResponse(func(client *req.Client, res *req.Response) (err error) {
      responseCode := strconv.Itoa(res.StatusCode)
      if !strings.HasPrefix(responseCode, "2") && !strings.HasPrefix(responseCode, "3") {
        defer func() {
          if e := recover(); e != nil {
            err = errorx.NewError(res.StatusCode, res.String(), nil)
          }
        }()
        ress := make(map[string]interface{})
        res.UnmarshalJson(&ress)
        err = errorx.NewError(res.StatusCode, ress["message"].(string), ress["data"])
      }
      if res.Err != nil {
        err = errorx.NewError(http.StatusInternalServerError, res.Err.Error(), nil)
      }
      return
    })
  logx.Infof("init req client %s success", c.BaseURL)
  return &HttpClient{
    Client: c,
  }
}

接下来给结构体定义一个SetTracer方法用于设置tracing,这里用到了req的WrapRoundTripFunc用来给req自定义一个中间件,该中间件类似一个拦截器,可在请求发送前和响应后做一些集中处理的工作。

func (c *HttpClient) SetTracer(tracer trace.Tracer) {
  c.WrapRoundTripFunc(func(rt req.RoundTripper) req.RoundTripFunc {
    return func(req *req.Request) (resp *req.Response, err error) {
      // Context().Value(key)可以从context中提取预先设定的值
      spanName, ok := req.Context().Value("SpanName").(string)
      if !ok {
        spanName = req.URL.Path // 如果没有预先设定SpanName,就以请求url作为SpanName
      }
      // 最关键的一步,启动span,并传入上下文context(来自于*req.Request),会返回一个加入了span的context和一个span对象
      _, span := tracer.Start(req.Context(), spanName) 
      defer span.End() // 无论成功与否这里需要用End将span发射出去(发射到jaeger或zipkin)
      // 下面给span添加一些tags
      span.SetAttributes(
        attribute.String("http.url", req.URL.String()),
        attribute.String("http.method", req.Method),
        attribute.String("http.request.header", req.HeaderToString()),
      )
      if len(req.Body) > 0 {
        span.SetAttributes(
          attribute.String("http.request.body", string(req.Body)),
        )
      }
      // 下面这句话将流程交还给req去执行发送请求动作
      resp, err = rt.RoundTrip(req)
      if err != nil { // 失败了记录在span里
        span.RecordError(err)
        span.SetStatus(codes.Error, err.Error())
        return
      }
      // 添加一些响应tags
      span.SetAttributes(
        attribute.Int("http.status_code", resp.StatusCode),
        attribute.String("http.response.header", resp.HeaderToString()),
        attribute.String("http.response.body", resp.String()),
      )
      return
    }
  })
}

封装好了HttpClient,接下来把它放入servicecontext方便被调用,并在servicecontext中实例化它

type ServiceContext struct {
  Config        config.Config
  Client_bkcmdb *HttpClient
}
 
func NewServiceContext(c config.Config) *ServiceContext {
  client := NewHttpClient(c) // 实例化HttpClient
  client.SetTracer(otel.Tracer("imroc/req")) // 设置开启Trace,并用otel.Tracer("imroc/req")设置otel.library
  return &ServiceContext{
    Config:        c,
    Client_bkcmdb: client,
  }
}

接下来封装一个蓝鲸CMDB接口的结构体

type BkcmdbUtil struct {
  logx.Logger
  ctx    context.Context
  svcCtx *ServiceContext
}

func GetBkcmdbUtil(ctx context.Context, svcCtx *ServiceContext) *BkcmdbUtil {
  return &BkcmdbUtil{
    Logger: logx.WithContext(ctx),
    ctx:    ctx,
    svcCtx: svcCtx,
  }
}

然后以一个蓝鲸添加模型实例的接口为例

func (b *BkcmdbUtil) AddInstance(tablename string, v interface{}) (resp *types.BkcmdbRes, err error) {
  // 下面这句话的核心是.Do(context.WithValue(b.ctx, "SpanName", "AddInstance"))
  // 通过调用context.WithValue给ctx设置SpanName,然后就可以在前文定义的中间件中用req.Context().Value("SpanName").(string)把SpanName取出来了
  err = b.svcCtx.Client_bkcmdb.Post(fmt.Sprintf("/api/v3/create/instance/object/%s", tablename)).SetBody(v).SetResult(&resp).Do(context.WithValue(b.ctx, "SpanName", "AddInstance")).Err
  if err != nil {
    b.Logger.Error(err)
    return
  }
  if resp.Code != 0 {
    b.Logger.Errorf("add %s instance failed: %v", tablename, resp.Message)
    // errorx是自定义的错误处理类
    err = errorx.NewDefaultError(fmt.Sprintf("add %s instance failed: %v", tablename, resp.Message))
  }
  return // 返回值types.BkcmdbRes是自定义的返回结构体
}

示例代码写到这就可以了,接下来再用postman发起一次调用,打开jaeger页面,会发现一个链路里现在出现了两个span,其中一个是postman访问go-zero接口的span,第二个就是go-zero通过req访问蓝鲸CMDB的span。

其它通用操作的trace跟踪

有了如上示例,我们其实可以在任意操作前后打埋点(开启一个span),比如访问数据库、MongoDB、Redis、Elasticsearch、调用其它平台SDK(比如kubernetes client-go)等等,下来以Elasticsearch为例说明。

封装一个EsUtil用来操作elasticsearch

type EsUtil struct {
  logx.Logger
  ctx    context.Context
  svcCtx *ServiceContext
  tracer trace.Tracer
}

func GetEsUtil(ctx context.Context, svcCtx *ServiceContext) *EsUtil {
  return &EsUtil{
    Logger: logx.WithContext(ctx),
    ctx:    ctx,
    svcCtx: svcCtx,
    tracer: otel.Tracer("go-elasticsearch"),
  }
}

在发起一个ES操作(例如插入一个文档)的前后打入埋点startTracerendTracer

func (es *EsUtil) EsIndex(index string, body interface{}, id string, ctx context.Context) (*types.EsAddResponse, error) {
  b, _ := json.Marshal(body)
  es.Logger.Debugf("es index request=%v", string(b))
  span := es.newSpan(ctx, index) // 创建一个span
  defer span.End() // 用End把span发射出去
  es.startTracer(index, id, "index", b, span, ctx) // 请求前开启span
  res, err := es.svcCtx.Esclient.Index(
    index,
    bytes.NewReader(b),
    es.svcCtx.Esclient.Index.WithDocumentID(id),
    es.svcCtx.Esclient.Index.WithContext(ctx),
  )
  defer res.Body.Close()
  if err != nil {
    return nil, err
  }
  if res.IsError() {
    err = errorx.NewDefaultError(fmt.Sprintf("[%s] error indexing document ID=%v", res.Status(), id))
    es.endTracer(err, res.StatusCode, nil, span) // 出错后结束span并写入错误信息
    return nil, err
  } else {
    var r types.EsAddResponse // 自定义了es返回结构体
    json.NewDecoder(res.Body).Decode(&r)
    b, _ = json.Marshal(r)
    es.Logger.Debugf("es index response=%v", string(b))
    es.endTracer(nil, res.StatusCode, b, span) // 正常返回结束span并写入返回值
    return &r, nil
  }
}

func (es *EsUtil) newSpan(ctx context.Context, name string) (span trace.Span) {
  spanName, ok := ctx.Value("SpanName").(string)
  if !ok {
      spanName = name
  }
  _, span = es.tracer.Start(ctx, spanName)
  return
}

func (es *EsUtil) startTracer(index, id, action string, body []byte, span trace.Span, ctx context.Context) {
  span.SetAttributes(
    attribute.String("elastic._index", index),
    attribute.String("elastic._id", id),
    attribute.String("elastic.action", action),
    attribute.String("elastic.client.version", elasticsearch.Version),
  )
  if len(body) > 0 {
    span.SetAttributes(
        attribute.String("elastic.request.body", string(body)),
    )
  }
}

func (es *EsUtil) endTracer(err error, status_code int, body []byte, span trace.Span) {
  if err != nil {
    span.RecordError(err)
    span.SetStatus(codes.Error, err.Error())
    return
  }
  span.SetAttributes(
    attribute.Int("elastic.status", status_code),
    attribute.String("elastic.response.body", string(body)),
  )
}

到此,从go-zero的服务A接收到一个请求,到由当前服务A发送出去的第一跳操作(HTTP、Elasticsearch、数据库等)都可以在jaerger里看到trace了,大概长这样:

microserviceA
  |
  |--> HTTP(蓝鲸CMDB)
  |--> Elasticsearch
  |--> PostgreSQL
  |--> HTTP(蓝鲸CMDB)
  |--> HTTP(蓝鲸CMDB)
  | ……

此时我们发现,跨服务(A服务调用B服务)现在还是没有链路,单独看服务A的链路也有,服务B的链路也有,但AB没连起来。原因在于我们在链路传递参数的过程中,并没有把traceId、spanId传递到下一跳。自然下一跳和上一跳就没法连接起来了。

改造代码之前还是先看原理。

Opentelemetry链路传递核心原理

先看一张在HTTP通信下,trace如何进行链路传递参数

链路传递数据时用到一个载体叫做propagators,什么是 Propagator?OpenTelemetry 是这样定义的

Cross-cutting concerns send their state to the next process using Propagators, which are defined as objects used to read and write context data to and from messages exchanged by the applications.

大概意思就是:从系统的横向切入面看,通过定义一个可读可写的context对象,在应用间进行状态的传递。

这里还有一个Baggage的概念,但是本文暂未涉及到,因此不再赘述,示例代码只需要propagator就可以满足需求了

Propagator注入trace实现链路信息传递

服务A通过imroc/req调用服务B(HTTP调用),需要在服务A端HTTP发起之前给propagator注入载体(carrier),并写入HTTP headers,如下代码

func (c *HttpClient) roundTripFunc(rt req.RoundTripper) req.RoundTripFunc {
  return func(req *req.Request) (resp *req.Response, err error) {
    // 省略……
    spanCtx, span := c.Tracer.Start(req.Context(), spanName) // 拿到关键的spanCtx
    defer span.End()
    // 最关键的一句话,把spanCtx里的trace信息inject到载体propagation.HeaderCarrier里,并通过HTTP headers传输
    otel.GetTextMapPropagator().Inject(spanCtx, propagation.HeaderCarrier(req.Headers))
    span.SetAttributes(
      // 省略……
    )
  }
}

然后在服务B端,从HTTP headers中提取trace信息到载体(carrier),并放入context里,这样就完成了应用之间的链路传递。这部分功能go-zero已经默认实现了,当开启了Telemetry后,go-zero会通过内置的trace中间件从载体中提取trace信息,这部分源码如下(https://github.com/zeromicro/go-zero/blob/v1.5.2/rest/handler/tracehandler.go):

// TraceHandler return a middleware that process the opentelemetry.
func TraceHandler(serviceName, path string, opts ...TraceOption) func(http.Handler) http.Handler {
  var options traceOptions
  for _, opt := range opts {
    opt(&options)
  }

  ignorePaths := collection.NewSet()
  ignorePaths.AddStr(options.traceIgnorePaths...)

  return func(next http.Handler) http.Handler {
    tracer := otel.Tracer(trace.TraceName)
    propagator := otel.GetTextMapPropagator()

    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
      spanName := path
      if len(spanName) == 0 {
        spanName = r.URL.Path
      }

      if ignorePaths.Contains(spanName) {
        next.ServeHTTP(w, r)
        return
      }

      ctx := propagator.Extract(r.Context(), propagation.HeaderCarrier(r.Header))
      spanCtx, span := tracer.Start(
        ctx,
        spanName,
        oteltrace.WithSpanKind(oteltrace.SpanKindServer),
        oteltrace.WithAttributes(semconv.HTTPServerAttributesFromHTTPRequest(
            serviceName, spanName, r)...),
      )
      defer span.End()

      // convenient for tracking error messages
      propagator.Inject(spanCtx, propagation.HeaderCarrier(w.Header()))

      trw := &response.WithCodeResponseWriter{Writer: w, Code: http.StatusOK}
      next.ServeHTTP(trw, r.WithContext(spanCtx))

      span.SetAttributes(semconv.HTTPAttributesFromHTTPStatusCode(trw.Code)...)
      span.SetStatus(semconv.SpanStatusFromHTTPStatusCodeAndSpanKind(
        trw.Code, oteltrace.SpanKindServer))
    })
  }
}

现在打开jaeger页面,可以看到两个服务之间的链路已经连起来了

展开GetProjects,可以看到注入的header长这样


文章作者: 洪宇轩
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 洪宇轩 !
评论
 上一篇
K8S二次开发系列之一:进阶玩家该如何扩展自己的K8S K8S二次开发系列之一:进阶玩家该如何扩展自己的K8S
本文介绍了K8S进阶玩家可以从哪些地方入手去扩展自己的K8S集群。
2023-07-06
下一篇 
云原生链路跟踪工具Jaeger+OpenTelemetry Collector部署详解 云原生链路跟踪工具Jaeger+OpenTelemetry Collector部署详解
提到链路跟踪,或者叫全链路监控,或者叫APM(Application Performance Management),具体含义和原理不赘述,开源方案有skywalking、zipkin、elasticapm等工具,商业产品有基调听云等等,但在云原生领域,也有一个CNCF已毕业项目jaeger同样发展迅速。
2023-04-04
  目录