本文示例程序用到的组件和框架如下:
- jaeger:云原生可观测链路跟踪工具。关于jaeger部署详见链路跟踪工具Jaeger+OpenTelemetry Collector部署详解
- go-zero:一个go语言版的集成了各种工程实践的api和rpc微服务框架,本文将使用go-zero的api框架
- imroc/req:一个开源的go语言轻量级HTTP客户端库
- opentelemetry:提供了一组API和SDK来标准化遥测数据的采集和传输,相对于skywalking等字节码实现的非侵入式链路跟踪工具来说,opentelemetry是一组可观测领域的标准和规范,基于这个标准提供的各种语言的SDK,通过在代码中打埋点的方式(侵入式),可实现市面上绝大多数语言的链路跟踪功能(比如skywalking、听云等对ktor都不支持,但opentelemetry支持)
go-zero示例程序
以一个封装了蓝鲸CMDB接口的示例工程为例(api工程),工程名叫bkcmdb,目录结构如下:
蓝鲸CMDB提供了HTTP接口,示例工程通过imroc/req库封装了蓝鲸CMDB接口,通过在调用蓝鲸接口的前后打入埋点,实现一次调用的链路跟踪功能。
如下的一次调用,将会在一个链路(即trace)上产生两个节点(即span):
postman --> bkcmdb(go-zero) --> 蓝鲸接口
该链路第一个节点是bkcmdb,第二个节点是蓝鲸接口。默认go-zero是没有开启链路跟踪功能的,下文将说明如何启用。
go-zero启动链路跟踪功能
在正式介绍opentrace基本概念之前,先通过简单的配置启动go-zero的链路跟踪功能看一看效果。go-zero自带了基于opentelemetry的链路追踪功能,默认未开启,在工程的配置文件etc/bkcmdb.yaml中增加如下配置:
Telemetry:
Name: fiops-bkcmdb
Endpoint: http://jaeger-collector.xxx.io/api/traces
Sampler: 1.0
Batcher: jaeger
只需在配置文件中增加Telemetry
配置块即可,无需修改任何代码。其中Name
是该链路上的ServiceName,将会在jaeger(或zipkin)上显示为Service。Endpoint是链路跟踪工具(jaeger或zipkin)的collector地址。Batcher
这里指定链路跟踪工具为jaeger。
现在用postman访问bkcmdb
curl --location 'http://<bkcmdb-url>/v1/project/search' \
--header 'Content-Type: application/json' \
--data '{
"condition": {
"project": [ ]
},
"page": {
"limit": 100,
"sort": "-bk_inst_id",
"start": 0
}
}'
打开jaeger前端页面,Service搜索fiops-bkcmdb,可以看到出现了一条链路,且链路中只有一个span,这个span就是客户端访问bkcmdb服务端的链路。
点击这条链路进入详情页,展开trace,地址栏url红框内是traceId,右下角红框是spanId
到此示例工程就算成功开启了链路跟踪功能,但这样还不能满足我们的需求,我们还需要bkcmdb访问蓝鲸CMDB的链路。在继续修改代码之前,我们先来了解一下opentracing、opentelemetry的基本概念。
OpenTracing
分布式追踪为描述和分析跨进程事务、甚至是跨网络调用提供了一种解决方案。早在 2005 年,Google 就在内部部署了一套分布式追踪系统 Dapper,并发表了一篇论文《Dapper, a Large-Scale Distributed Systems Tracing Infrastructure》,阐述了该分布式追踪系统的设计和实现,可以视为分布式追踪领域的鼻祖。后来各家厂商公司受此启发,纷纷开源了自家的链路跟踪产品,如zipkin、Uber的jaeger等。但各家的分布式追踪方案互不兼容,于是就诞生了OpenTracing。OpenTracing 是一个 Library,定义了一套通用的数据上报接口,要求各个分布式追踪系统都来实现这套接口。
数据模型
OpenTracing规范中定义了数据模型,可参考原始文档The OpenTracing Semantic Specification
Causal relationships between Spans in a single Trace
[Span A] ←←←(the root span)
|
+------+------+
| |
[Span B] [Span C] ←←←(Span C is a `ChildOf` Span A)
| |
[Span D] +---+-------+
| |
[Span E] [Span F] >>> [Span G] >>> [Span H]
↑
↑
↑
(Span G `FollowsFrom` Span F)
在OpenTracing模型中有三个基本概念:
- Trace:调用链,一次HTTP调用或者一次RPC调用(从起始到最后终止),一个调用链会分配一个全局唯一的TraceID
- Span:每个调用链由多个Span组成。Span的含义是范围,可以理解为一个处理阶段(例如一次HTTP调用、一次数据库访问)。Span是一个分布式追踪的最小单位。Span和Span的关系称为Reference。下面是一个Span的结构。
type Span struct {
ctx spanContext
serviceName string
operationName string
startTime time.Time
flag string
children int
}
Span会封装一组零个或多个key:value的Tags,用于标识一些想在链路中展示的数据
- SpanContext:保存链路的上下文信息[TraceID,SpanID,或者是其它想要传递的内容]实现了tracer接口
type spanContext struct {
traceId string
spanId string
}
Span之间的关系
- ChildOf:两个Span可以组成父子关系
[-Parent Span---------]
[-Child Span----]
[-Parent Span--------------]
[-Child Span A----]
[-Child Span B----]
[-Child Span C----]
[-Child Span D---------------]
[-Child Span E----]
- FollowsFrom:某些父Span在任何方面都不依赖子Span的结果
[-Parent Span-] [-Child Span-]
[-Parent Span--]
[-Child Span-]
[-Parent Span-]
[-Child Span-]
OpenCensus
OpenCensus是Google开源的,作为最早提出Tracing概念的公司,OpenCensus也是Google Dapper的社区版本。相对于OpenTracing只支持tracing,OpenCensus不仅支持tracing还支持metrics。OpenTracing只制定规范,OpenCensus不仅制定规范,还包含了Agent和Collector。
OpenTelemetry
为了更好的将Tracing、Metrics、Logging融合在一起,OpenTelemetry 诞生了。作为 CNCF 的孵化项目,OpenTelemetry 由 OpenTracing 和 OpenCensus 项目合并而成,是一组规范、API 接口、SDK、工具和集成。为众多开发人员带来 Metrics、Tracing、Loging 的统一标准,三者都有相同的元数据结构,可以轻松实现互相关联。
OpenTelemetry 与厂商、平台无关,不提供与可观测性相关的后端服务。可根据用户需求将可观测类数据导出到存储、查询、可视化等不同后端,如 Prometheus、Jaeger 、云厂商服务中。
有了基本概念以后,下来看看如何使用imroc/req在发送HTTP的时候加入tracing。
自定义imroc/req中间件实现发送HTTP的trace跟踪
这里我们建一个HttpClient封装一下req。在svc目录下新建httpclient.go,并在里面新建一个结构体:
type HttpClient struct {
*req.Client
}
定义NewHttpClient函数用于返回一个实例化的HttpClient,在OnBeforeRequest里定义请求发送前的动作,在OnAfterResponse里定义请求返回后的动作。到此为止都是常规操作,还没涉及到任何tracing动作。
// 需要传入go-zero的Config对象
func NewHttpClient(conf config.Config) *HttpClient {
headers := map[string]string{
"BK_USER": "admin",
"HTTP_BLUEKING_SUPPLIER_ID": "0",
}
c := req.C().
SetCommonHeaders(headers).SetBaseURL(fmt.Sprintf("http://%s", conf.BkcmdbUrl)).
OnBeforeRequest(func(client *req.Client, req *req.Request) error {
if req.RetryAttempt > 0 {
return nil
}
req.EnableDump()
return nil
}).
OnAfterResponse(func(client *req.Client, res *req.Response) (err error) {
responseCode := strconv.Itoa(res.StatusCode)
if !strings.HasPrefix(responseCode, "2") && !strings.HasPrefix(responseCode, "3") {
defer func() {
if e := recover(); e != nil {
err = errorx.NewError(res.StatusCode, res.String(), nil)
}
}()
ress := make(map[string]interface{})
res.UnmarshalJson(&ress)
err = errorx.NewError(res.StatusCode, ress["message"].(string), ress["data"])
}
if res.Err != nil {
err = errorx.NewError(http.StatusInternalServerError, res.Err.Error(), nil)
}
return
})
logx.Infof("init req client %s success", c.BaseURL)
return &HttpClient{
Client: c,
}
}
接下来给结构体定义一个SetTracer
方法用于设置tracing,这里用到了req的WrapRoundTripFunc
用来给req自定义一个中间件,该中间件类似一个拦截器,可在请求发送前和响应后做一些集中处理的工作。
func (c *HttpClient) SetTracer(tracer trace.Tracer) {
c.WrapRoundTripFunc(func(rt req.RoundTripper) req.RoundTripFunc {
return func(req *req.Request) (resp *req.Response, err error) {
// Context().Value(key)可以从context中提取预先设定的值
spanName, ok := req.Context().Value("SpanName").(string)
if !ok {
spanName = req.URL.Path // 如果没有预先设定SpanName,就以请求url作为SpanName
}
// 最关键的一步,启动span,并传入上下文context(来自于*req.Request),会返回一个加入了span的context和一个span对象
_, span := tracer.Start(req.Context(), spanName)
defer span.End() // 无论成功与否这里需要用End将span发射出去(发射到jaeger或zipkin)
// 下面给span添加一些tags
span.SetAttributes(
attribute.String("http.url", req.URL.String()),
attribute.String("http.method", req.Method),
attribute.String("http.request.header", req.HeaderToString()),
)
if len(req.Body) > 0 {
span.SetAttributes(
attribute.String("http.request.body", string(req.Body)),
)
}
// 下面这句话将流程交还给req去执行发送请求动作
resp, err = rt.RoundTrip(req)
if err != nil { // 失败了记录在span里
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return
}
// 添加一些响应tags
span.SetAttributes(
attribute.Int("http.status_code", resp.StatusCode),
attribute.String("http.response.header", resp.HeaderToString()),
attribute.String("http.response.body", resp.String()),
)
return
}
})
}
封装好了HttpClient,接下来把它放入servicecontext方便被调用,并在servicecontext中实例化它
type ServiceContext struct {
Config config.Config
Client_bkcmdb *HttpClient
}
func NewServiceContext(c config.Config) *ServiceContext {
client := NewHttpClient(c) // 实例化HttpClient
client.SetTracer(otel.Tracer("imroc/req")) // 设置开启Trace,并用otel.Tracer("imroc/req")设置otel.library
return &ServiceContext{
Config: c,
Client_bkcmdb: client,
}
}
接下来封装一个蓝鲸CMDB接口的结构体
type BkcmdbUtil struct {
logx.Logger
ctx context.Context
svcCtx *ServiceContext
}
func GetBkcmdbUtil(ctx context.Context, svcCtx *ServiceContext) *BkcmdbUtil {
return &BkcmdbUtil{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
然后以一个蓝鲸添加模型实例的接口为例
func (b *BkcmdbUtil) AddInstance(tablename string, v interface{}) (resp *types.BkcmdbRes, err error) {
// 下面这句话的核心是.Do(context.WithValue(b.ctx, "SpanName", "AddInstance"))
// 通过调用context.WithValue给ctx设置SpanName,然后就可以在前文定义的中间件中用req.Context().Value("SpanName").(string)把SpanName取出来了
err = b.svcCtx.Client_bkcmdb.Post(fmt.Sprintf("/api/v3/create/instance/object/%s", tablename)).SetBody(v).SetResult(&resp).Do(context.WithValue(b.ctx, "SpanName", "AddInstance")).Err
if err != nil {
b.Logger.Error(err)
return
}
if resp.Code != 0 {
b.Logger.Errorf("add %s instance failed: %v", tablename, resp.Message)
// errorx是自定义的错误处理类
err = errorx.NewDefaultError(fmt.Sprintf("add %s instance failed: %v", tablename, resp.Message))
}
return // 返回值types.BkcmdbRes是自定义的返回结构体
}
示例代码写到这就可以了,接下来再用postman发起一次调用,打开jaeger页面,会发现一个链路里现在出现了两个span,其中一个是postman访问go-zero接口的span,第二个就是go-zero通过req访问蓝鲸CMDB的span。
其它通用操作的trace跟踪
有了如上示例,我们其实可以在任意操作前后打埋点(开启一个span),比如访问数据库、MongoDB、Redis、Elasticsearch、调用其它平台SDK(比如kubernetes client-go)等等,下来以Elasticsearch为例说明。
封装一个EsUtil
用来操作elasticsearch
type EsUtil struct {
logx.Logger
ctx context.Context
svcCtx *ServiceContext
tracer trace.Tracer
}
func GetEsUtil(ctx context.Context, svcCtx *ServiceContext) *EsUtil {
return &EsUtil{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
tracer: otel.Tracer("go-elasticsearch"),
}
}
在发起一个ES操作(例如插入一个文档)的前后打入埋点startTracer
、endTracer
func (es *EsUtil) EsIndex(index string, body interface{}, id string, ctx context.Context) (*types.EsAddResponse, error) {
b, _ := json.Marshal(body)
es.Logger.Debugf("es index request=%v", string(b))
span := es.newSpan(ctx, index) // 创建一个span
defer span.End() // 用End把span发射出去
es.startTracer(index, id, "index", b, span, ctx) // 请求前开启span
res, err := es.svcCtx.Esclient.Index(
index,
bytes.NewReader(b),
es.svcCtx.Esclient.Index.WithDocumentID(id),
es.svcCtx.Esclient.Index.WithContext(ctx),
)
defer res.Body.Close()
if err != nil {
return nil, err
}
if res.IsError() {
err = errorx.NewDefaultError(fmt.Sprintf("[%s] error indexing document ID=%v", res.Status(), id))
es.endTracer(err, res.StatusCode, nil, span) // 出错后结束span并写入错误信息
return nil, err
} else {
var r types.EsAddResponse // 自定义了es返回结构体
json.NewDecoder(res.Body).Decode(&r)
b, _ = json.Marshal(r)
es.Logger.Debugf("es index response=%v", string(b))
es.endTracer(nil, res.StatusCode, b, span) // 正常返回结束span并写入返回值
return &r, nil
}
}
func (es *EsUtil) newSpan(ctx context.Context, name string) (span trace.Span) {
spanName, ok := ctx.Value("SpanName").(string)
if !ok {
spanName = name
}
_, span = es.tracer.Start(ctx, spanName)
return
}
func (es *EsUtil) startTracer(index, id, action string, body []byte, span trace.Span, ctx context.Context) {
span.SetAttributes(
attribute.String("elastic._index", index),
attribute.String("elastic._id", id),
attribute.String("elastic.action", action),
attribute.String("elastic.client.version", elasticsearch.Version),
)
if len(body) > 0 {
span.SetAttributes(
attribute.String("elastic.request.body", string(body)),
)
}
}
func (es *EsUtil) endTracer(err error, status_code int, body []byte, span trace.Span) {
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return
}
span.SetAttributes(
attribute.Int("elastic.status", status_code),
attribute.String("elastic.response.body", string(body)),
)
}
到此,从go-zero的服务A接收到一个请求,到由当前服务A发送出去的第一跳操作(HTTP、Elasticsearch、数据库等)都可以在jaerger里看到trace了,大概长这样:
microserviceA
|
|--> HTTP(蓝鲸CMDB)
|--> Elasticsearch
|--> PostgreSQL
|--> HTTP(蓝鲸CMDB)
|--> HTTP(蓝鲸CMDB)
| ……
此时我们发现,跨服务(A服务调用B服务)现在还是没有链路,单独看服务A的链路也有,服务B的链路也有,但AB没连起来。原因在于我们在链路传递参数的过程中,并没有把traceId、spanId传递到下一跳。自然下一跳和上一跳就没法连接起来了。
改造代码之前还是先看原理。
Opentelemetry链路传递核心原理
先看一张在HTTP通信下,trace如何进行链路传递参数
链路传递数据时用到一个载体叫做propagators,什么是 Propagator?OpenTelemetry 是这样定义的
大概意思就是:从系统的横向切入面看,通过定义一个可读可写的context对象,在应用间进行状态的传递。
这里还有一个Baggage的概念,但是本文暂未涉及到,因此不再赘述,示例代码只需要propagator就可以满足需求了
Propagator注入trace实现链路信息传递
服务A通过imroc/req调用服务B(HTTP调用),需要在服务A端HTTP发起之前给propagator注入载体(carrier),并写入HTTP headers,如下代码
func (c *HttpClient) roundTripFunc(rt req.RoundTripper) req.RoundTripFunc {
return func(req *req.Request) (resp *req.Response, err error) {
// 省略……
spanCtx, span := c.Tracer.Start(req.Context(), spanName) // 拿到关键的spanCtx
defer span.End()
// 最关键的一句话,把spanCtx里的trace信息inject到载体propagation.HeaderCarrier里,并通过HTTP headers传输
otel.GetTextMapPropagator().Inject(spanCtx, propagation.HeaderCarrier(req.Headers))
span.SetAttributes(
// 省略……
)
}
}
然后在服务B端,从HTTP headers中提取trace信息到载体(carrier
),并放入context
里,这样就完成了应用之间的链路传递。这部分功能go-zero已经默认实现了,当开启了Telemetry
后,go-zero会通过内置的trace中间件从载体中提取trace信息,这部分源码如下(https://github.com/zeromicro/go-zero/blob/v1.5.2/rest/handler/tracehandler.go):
// TraceHandler return a middleware that process the opentelemetry.
func TraceHandler(serviceName, path string, opts ...TraceOption) func(http.Handler) http.Handler {
var options traceOptions
for _, opt := range opts {
opt(&options)
}
ignorePaths := collection.NewSet()
ignorePaths.AddStr(options.traceIgnorePaths...)
return func(next http.Handler) http.Handler {
tracer := otel.Tracer(trace.TraceName)
propagator := otel.GetTextMapPropagator()
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
spanName := path
if len(spanName) == 0 {
spanName = r.URL.Path
}
if ignorePaths.Contains(spanName) {
next.ServeHTTP(w, r)
return
}
ctx := propagator.Extract(r.Context(), propagation.HeaderCarrier(r.Header))
spanCtx, span := tracer.Start(
ctx,
spanName,
oteltrace.WithSpanKind(oteltrace.SpanKindServer),
oteltrace.WithAttributes(semconv.HTTPServerAttributesFromHTTPRequest(
serviceName, spanName, r)...),
)
defer span.End()
// convenient for tracking error messages
propagator.Inject(spanCtx, propagation.HeaderCarrier(w.Header()))
trw := &response.WithCodeResponseWriter{Writer: w, Code: http.StatusOK}
next.ServeHTTP(trw, r.WithContext(spanCtx))
span.SetAttributes(semconv.HTTPAttributesFromHTTPStatusCode(trw.Code)...)
span.SetStatus(semconv.SpanStatusFromHTTPStatusCodeAndSpanKind(
trw.Code, oteltrace.SpanKindServer))
})
}
}
现在打开jaeger页面,可以看到两个服务之间的链路已经连起来了
展开GetProjects,可以看到注入的header长这样