聊聊链路追踪 OpenTracing

什么是 Tracing

Logging Metrics Tracing

对 Tracing 的定义是,在软件工程中,Tracing 指使用特定的日志记录程序的执行信息,与之相近的还有两个概念,它们分别是 Logging 和 Metrics。

  • Logging:用于记录离散的事件,包含程序执行到某一点或某一阶段的详细信息。
  • Metrics:可聚合的数据,且通常是固定类型的时序数据,包括 Counter、Gauge、Histogram 等。
  • Tracing:记录单个请求的处理流程,其中包括服务调用和处理时长等信息。

同时这三种定义相交的情况也比较常见。

  • Logging & Metrics:可聚合的事件。例如分析某对象存储的 Nginx 日志,统计某段时间内 GET、PUT、DELETE、OPTIONS 操作的总数。
  • Metrics & Tracing:单个请求中的可计量数据。例如 SQL 执行总时长、gRPC 调用总次数。
  • Tracing & Logging:请求阶段的标签数据。例如在 Tracing 的信息中标记详细的错误原因。

针对每种分析需求,我们都有非常强大的集中式分析工具。

  • Logging:ELK,近几年势头最猛的日志分析服务,无须多言。

  • Metrics:Prometheus,第二个加入 CNCF 的开源项目,非常好用。

  • Tracing:OpenTracingJaeger,Jaeger 是 Uber 开源的一个兼容 OpenTracing 标准的分布式追踪服务。目前 Jaeger 也加入了 CNCF

原理

​ 分布式追踪系统大体分为三个部分,数据采集、数据持久化、数据展示。数据采集是指在代码中埋点,设置请求中要上报的阶段,以及设置当前记录的阶段隶属于哪个上级阶段。数据持久化则是指将上报的数据落盘存储,例如 Jaeger 就支持多种存储后端,可选用 Cassandra 或者 Elasticsearch。数据展示则是前端根据 Trace ID 查询与之关联的请求阶段,并在界面上呈现。

Request Demonstration

上图是一个请求的流程例子,请求从客户端发出,到达负载均衡,再依次进行认证、计费,最后取到目标资源。

OpenTracing Demonstration

请求过程被采集之后,会以上图的形式呈现,横坐标是时间,圆角矩形是请求的执行的各个阶段。

发展历史

​ 早在 2005 年,Google 就在内部部署了一套分布式追踪系统 Dapper,并发表了一篇论文《Dapper, a Large-Scale Distributed Systems Tracing Infrastructure》,阐述了该分布式追踪系统的设计和实现,可以视为分布式追踪领域的鼻祖。随后出现了受此启发的开源实现,如 Zipkin、SourceGraph 开源的 Appdash、Red Hat 的 Hawkular APM、Uber 开源的 Jaeger 等。但各家的分布式追踪方案是互不兼容的,这才诞生了 OpenTracing。

​ OpenTracing 是一个 Library,定义了一套通用的数据上报接口,要求各个分布式追踪系统都来实现这套接口。这样一来,应用程序只需要对接 OpenTracing,而无需关心后端采用的到底什么分布式追踪系统,因此开发者可以无缝切换分布式追踪系统,也使得在通用代码库增加对分布式追踪的支持成为可能。

OpenTracing Diagram

数据模型

a Trace can be thought of as a directed acyclic graph (DAG) of Spans。- Trace 是多个 Span 组成的有向非循环图。

这部分在 OpenTracing 的规范中写的非常清楚,下面只大概翻译一下其中的关键部分,细节可参考原始文档 《The OpenTracing Semantic Specification》

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
Causal relationships between Spans in a single Trace

        [Span A]  ←←←(the root span)
            |
     +------+------+
     |             |
 [Span B]      [Span C] ←←←(Span C is a `ChildOf` Span A)
     |             |
 [Span D]      +---+-------+
               |           |
           [Span E]    [Span F] >>> [Span G] >>> [Span H]
                         (Span G `FollowsFrom` Span F)

Trace 是调用链,每个调用链由多个 Span 组成。Span 的单词含义是范围,可以理解为某个处理阶段。Span 和 Span 的关系称为 Reference。上图中,总共有标号为 A-H 的 8 个阶段。

1
2
3
4
5
6
7
8
9
Temporal relationships between Spans in a single Trace

––|–––––––|–––––––|–––––––|–––––––|–––––––|–––––––|–––––––|–> time

 [Span A···················································]
   [Span B··············································]
      [Span D··········································]
    [Span C········································]
         [Span E·······]        [Span F··] [Span G··] [Span H··]

上图是按照时间顺序呈现的调用链。

每个阶段(Span)包含如下状态:

  • 操作名称
  • 起始时间
  • 结束时间
  • 一组零或多个键:值结构的 Span标签 ( span Tags)。键必须是字符串。值可以是字符串,布尔或数值类型.
  • 一组零或多个 Span日志 (span Logs),其中每个都是一个键:值映射并与一个时间戳配对。键必须是字符串,值可以是任何类型。 并非所有的 OpenTracing 实现都必须支持每种值类型。
  • 阶段上下文(SpanContext),其中包含 Trace ID 和 Span ID
  • 引用关系(References):零或多个因果相关的 Span 间的 References (通过那些相关的 SpanSpanContext )

每个 SpanContext 封装了如下状态:

  • 任何需要跟跨进程 Span 关联的,依赖于 OpenTracing 实现的状态(例如 Trace 和 Span 的 id)
  • 键:值结构的跨进程的 Baggage Items(区别于 span tag,baggage 是全局范围,在 span 间保持传递,而tag 是 span 内部,不会被子 span 继承使用。)

​ 阶段(Span)可以有 ChildOfFollowsFrom 两种引用关系。ChildOf 用于表示父子关系,即在某个阶段中发生了另一个阶段,是最常见的阶段关系,典型的场景如调用 RPC 接口、执行 SQL、写数据。FollowsFrom 表示跟随关系,意为在某个阶段之后发生了另一个阶段,用来描述顺序执行关系。

一个 Trace 的 json 案例

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
{
    "data": [
        {
            "traceID": "790e003e22209ca4",
            "spans": [
                {
                    "traceID": "790e003e22209ca4",
                    "spanID": "790e003e22209ca4",
                    "flags": 1,
                    "operationName": "say-hello",
                    "references": [],
                    "startTime": 1611318627992154,
                    "duration": 524139,
                    "tags": [
                        {
                            "key": "sampler.type",
                            "type": "string",
                            "value": "const"
                        },
                        {
                            "key": "sampler.param",
                            "type": "bool",
                            "value": true
                        },
                        {
                            "key": "hello-to",
                            "type": "string",
                            "value": "This trace"
                        },
                        {
                            "key": "internal.span.format",
                            "type": "string",
                            "value": "proto"
                        }
                    ],
                    "logs": [],
                    "processID": "p1",
                    "warnings": null
                },
                {
                    "traceID": "790e003e22209ca4",
                    "spanID": "d32970aca4d7a39b",
                    "flags": 1,
                    "operationName": "format-string",
                    "references": [],
                    "startTime": 1611318628080551,
                    "duration": 432791,
                    "tags": [
                        {
                            "key": "span.kind",
                            "type": "string",
                            "value": "client"
                        },
                        {
                            "key": "http.method",
                            "type": "string",
                            "value": "GET"
                        },
                        {
                            "key": "http.url",
                            "type": "string",
                            "value": "http://localhost:8081/api/format/This trace"
                        },
                        {
                            "key": "internal.span.format",
                            "type": "string",
                            "value": "proto"
                        }
                    ],
                    "logs": [
                        {
                            "timestamp": 1611318628510535,
                            "fields": [
                                {
                                    "key": "event",
                                    "type": "string",
                                    "value": "string.Format"
                                },
                                {
                                    "key": "value",
                                    "type": "string",
                                    "value": "Hello, This trace!"
                                }
                            ]
                        }
                    ],
                    "processID": "p1",
                    "warnings": null
                },
                {
                    "traceID": "790e003e22209ca4",
                    "spanID": "4b73f8e8e77fe9dc",
                    "flags": 1,
                    "operationName": "print-hello",
                    "references": [],
                    "startTime": 1611318628515966,
                    "duration": 259,
                    "tags": [
                        {
                            "key": "internal.span.format",
                            "type": "string",
                            "value": "proto"
                        }
                    ],
                    "logs": [
                        {
                            "timestamp": 1611318628516206,
                            "fields": [
                                {
                                    "key": "event",
                                    "type": "string",
                                    "value": "WriteLine"
                                }
                            ]
                        }
                    ],
                    "processID": "p1",
                    "warnings": null
                }
            ],
            "processes": {
                "p1": {
                    "serviceName": "hello-world",
                    "tags": [
                        {
                            "key": "hostname",
                            "type": "string",
                            "value": "Whuanle-PC"
                        },
                        {
                            "key": "ip",
                            "type": "string",
                            "value": "172.20.240.1"
                        },
                        {
                            "key": "jaeger.version",
                            "type": "string",
                            "value": "CSharp-0.4.2.0"
                        }
                    ]
                }
            },
            "warnings": null
        }
    ],
    "total": 0,
    "limit": 0,
    "offset": 0,
    "errors": null
}

建议使用方式

​ 首先假设某微服务已经有了中心化的日志收集和处理系统,如果还没有的话,强烈建议部署一套 ELK。再假设对于每一个请求,都会有一个贯穿整个请求流程的 Request ID,如果还没有的话,强烈建议加一个。以上准备完毕后,可以选取一个分布式追踪系统,集成到服务当中,建议采用 Jaeger。重点在最后,在 Trace 的起始处,将 Trace ID 设置为 Request ID,这么一来就打通了日志系统和分布式追踪系统,可以使用同一个 ID 查询请求的事件流和日志流,从此开启了上帝视角。

具体使用

​ 脱离分布式追踪系统单独讲 OpenTracing 的使用方法的话,意义不大,所以本文就不介绍具体的使用方法,之后会以 Jaeger 为例,解释如何给微服务增加分布式追踪,以及如何与现有的日志系统集合。

如果想简单了解一下使用方式,可参考 OpenTracing 的《Quick Start》

非入侵式

​ 除了通过修改应用程序代码增加分布式追踪之外,还有一种不需要修改代码的非入侵的方式,那就是 Service Mesh。Service Mesh 一般会被翻译成服务啮合层,它是在网络层面做文章,通过 Sidecar 的方式为 Pod 增加一层代理,通过这层网络代理来实现一些服务治理的功能,因为是工作在网络层面,可以做到跨语言、非入侵。Istio 则是目前最成熟的 Service Mash 工具,支持启用分布式追踪服务。Istio 会修改微服务之间发送的网络请求,在请求中注入 Trace 和 Span 标记,再将采集到的数据发送到支持 OpenTracing 的分布式追踪服务中,从而拿到请求在微服务中的调用链。当然这种方式也有缺点,它无法追踪某个微服务内部的调用过程,并且目前阶段 Istio 只能追踪 HTTP 请求,能够覆盖的范围比较有限。如果想追踪更详细的数据,还是需要在中间件和代码中埋点,不过好在埋点的过程并不复杂,不会成为一个额外的负担。

实战环节

opentelemetry POC

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
// main.go

package main

import (
	"context"
	"io"
	"log"
	"os"
	"os/signal"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
	"go.opentelemetry.io/otel/sdk/resource"
	"go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
)

func newExporter(w io.Writer) (trace.SpanExporter, error) {
	return stdouttrace.New(
		stdouttrace.WithWriter(w),
		// Use human-readable output.
		stdouttrace.WithPrettyPrint(),
		// Do not print timestamps for the demo.
		stdouttrace.WithoutTimestamps(),
	)
}

func newResource() *resource.Resource {
	r, _ := resource.Merge(
		resource.Default(),
		resource.NewWithAttributes(
			semconv.SchemaURL,
			semconv.ServiceNameKey.String("fib"),
			semconv.ServiceVersionKey.String("v0.1.0"),
			attribute.String("environment", "demo"),
		),
	)
	return r
}

func main() {
	l := log.New(os.Stdout, "", 0)

	// Write telemetry data to a file.
	f, err := os.Create("traces.txt")
	if err != nil {
		l.Fatal(err)
	}
	defer f.Close()

	exp, err := newExporter(f)
	if err != nil {
		l.Fatal(err)
	}

	tp := trace.NewTracerProvider(
		trace.WithBatcher(exp),
		trace.WithResource(newResource()),
	)
	defer func() {
		if err := tp.Shutdown(context.Background()); err != nil {
			l.Fatal(err)
		}
	}()
	otel.SetTracerProvider(tp)

	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, os.Interrupt)

	errCh := make(chan error)
	app := NewApp(os.Stdin, l)
	go func() {
		errCh <- app.Run(context.Background())
	}()

	select {
	case <-sigCh:
		l.Println("\ngoodbye")
		return
	case err := <-errCh:
		if err != nil {
			l.Fatal(err)
		}
	}
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
// app.go

package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"strconv"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/codes"
	"go.opentelemetry.io/otel/trace"
)

const name = "fib"

func Fibonacci(n uint) (uint64, error) {
	if n <= 1 {
		return uint64(n), nil
	}
	var n2, n1 uint64 = 0, 1
	for i := uint(2); i < n; i++ {
		n2, n1 = n1, n1+n2
	}

	return n1 + n2, nil
}

type App struct {
	r io.Reader
	l *log.Logger
}

func NewApp(r io.Reader, l *log.Logger) *App {
	return &App{r, l}
}

func (a *App) Run(ctx context.Context) error {
	for {
		// Each execution of the run loop, we should get a new "root" span and context.
		newCtx, span := otel.Tracer(name).Start(ctx, "Run")

		n, err := a.Poll(newCtx)
		if err != nil {
			span.End()
			return err
		}
		a.Write(newCtx, n)
	}
}

func (a *App) Poll(ctx context.Context) (uint, error) {

	_, span := otel.Tracer(name).Start(ctx, "Poll")

	defer span.End()

	a.l.Print("What Fibonacci number would you like to konw: ")

	var n uint
	_, err := fmt.Fscanf(a.r, "%d\n", &n)

	// Store n as a string to not overflow an int64.
	nStr := strconv.FormatUint(uint64(n), 10)

	span.SetAttributes(attribute.String("request.n", nStr))

	return n, err
}

func (a *App) Write(ctx context.Context, n uint) {

	var span trace.Span
	ctx, span = otel.Tracer(name).Start(ctx, "Poll")
	defer span.End()

	f, err := func(ctx context.Context) (uint64, error) {
		_, span := otel.Tracer(name).Start(ctx, "Fibonacci")
		defer span.End()
		f, err := Fibonacci(n)
		if err != nil {
			span.RecordError(err)
			span.SetStatus(codes.Error, err.Error())
		}
		return f, err
	}(ctx)

	if err != nil {
		a.l.Printf("Fibonacci(%d): %v\n", n, err)
	} else {
		a.l.Printf("Fibonacci(%d)= %d\n", n, f)
	}
}

Jaeger 安装

  • docker-compose 文件安装
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
---
version: "3"

services:
  elasticsearch:
    image: elasticsearch:7.14.2
    networks:
      - elastic-jaeger
    ports:
      - "127.0.0.1:9200:9200"
      - "127.0.0.1:9300:9300"
    restart: on-failure
    environment:
      - cluster.name=jaeger-cluster
      - discovery.type=single-node
      - http.host=0.0.0.0
      - transport.host=127.0.0.1
      - ES_JAVA_OPTS=-Xms512m -Xmx512m
      - xpack.security.enabled=false
    volumes:
      - E:\data\es:/usr/share/elasticsearch/data

  jaeger-collector:
    image: jaegertracing/jaeger-collector
    ports:
      - "14269:14269"
      - "14268:14268"
      - "14267:14267"
      - "9411:9411"
    networks:
      - elastic-jaeger
    restart: on-failure
    environment:
      - SPAN_STORAGE_TYPE=elasticsearch
    command: [
      "--es.server-urls=http://elasticsearch:9200",
      "--es.num-shards=1",
      "--es.num-replicas=0",
      "--log-level=error"
    ]
    depends_on:
      - elasticsearch

  jaeger-agent:
    image: jaegertracing/jaeger-agent
    hostname: jaeger-agent
    command: ["--reporter.grpc.host-port=jaeger-collector:14267"]
    ports:
      - "5775:5775/udp"
      - "6831:6831/udp"
      - "6832:6832/udp"
      - "5778:5778"
    networks:
      - elastic-jaeger
    restart: on-failure
    environment:
      - SPAN_STORAGE_TYPE=elasticsearch
    depends_on:
      - jaeger-collector

  jaeger-query:
    image: jaegertracing/jaeger-query
    environment:
      - SPAN_STORAGE_TYPE=elasticsearch
      - no_proxy=localhost
    ports:
      - "16686:16686"
      - "16687:16687"
    networks:
      - elastic-jaeger
    restart: on-failure
    command: [
      "--es.server-urls=http://elasticsearch:9200",
      "--span-storage.type=elasticsearch",
      "--log-level=debug"
    ]
    depends_on:
      - jaeger-agent

volumes:
  esdata:
    driver: local

networks:
  elastic-jaeger:
    driver: bridge

http和grpc中使用 Jaeger

1. Gin

通过 Middleware 可以追踪到最外层的 Handler,更深层方法需要追踪的话可以通过ctxspan传递到各个方法中去进一步追踪。

http 请求使用 request.Header 做载体。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package middleware

import (
	"context"
	"github.com/gin-gonic/gin"
	"github.com/opentracing/opentracing-go"
	"github.com/opentracing/opentracing-go/ext"
	"i-go/apm/trace/config"
)

// Jaeger 通过 middleware 将 tracer 和 ctx 注入到 gin.Context 中
func Jaeger() gin.HandlerFunc {
	return func(c *gin.Context) {
		var parentSpan opentracing.Span
		tracer, closer := config.NewTracer("gin-demo")
		defer closer.Close()
		// 直接从 c.Request.Header 中提取 span,如果没有就新建一个
		spCtx, err := opentracing.GlobalTracer().Extract(opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(c.Request.Header))
		if err != nil {
			parentSpan = tracer.StartSpan(c.Request.URL.Path)
			defer parentSpan.Finish()
		} else {
			parentSpan = opentracing.StartSpan(
				c.Request.URL.Path,
				opentracing.ChildOf(spCtx),
				opentracing.Tag{Key: string(ext.Component), Value: "HTTP"},
				ext.SpanKindRPCServer,
			)
			defer parentSpan.Finish()
		}
        // 然后存到 g.ctx 中 供后续使用
		c.Set("tracer", tracer)
		c.Set("ctx", opentracing.ContextWithSpan(context.Background(), parentSpan))
		c.Next()
	}
}

然后在 gin 中添加这个 middleware 即可。

1
	e := gin.New() e.Use(middleware.Jaeger()) 

需要更细粒度的追踪,只需要将 span 传递到各个方法即可

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func Register(e *gin.Engine) {
	e.GET("/ping", Ping)
}

func Ping(c *gin.Context) {
	psc, _ := c.Get("ctx")
	ctx := psc.(context.Context)
	doPing1(ctx)
	doPing2(ctx)
	c.JSON(200, gin.H{"message": "pong"})
}
func doPing1(ctx context.Context) {
	span, _ := opentracing.StartSpanFromContext(ctx, "doPing1")
	defer span.Finish()
	time.Sleep(time.Second)
	fmt.Println("pong")
}
func doPing2(ctx context.Context) {
	span, _ := opentracing.StartSpanFromContext(ctx, "doPing2")
	defer span.Finish()
	time.Sleep(time.Second)
	fmt.Println("pong")
}

2. gRPC

追踪 gRPC 则通过拦截器实现。

  • 这里使用使用 gRPC 的metadata 来做载体。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func ClientInterceptor(tracer opentracing.Tracer) grpc.UnaryClientInterceptor {
	return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn,
		invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
		span, _ := opentracing.StartSpanFromContext(ctx,
			"call gRPC",
			opentracing.Tag{Key: string(ext.Component), Value: "gRPC"},
			ext.SpanKindRPCClient)
		defer span.Finish()

		md, ok := metadata.FromOutgoingContext(ctx)
		if !ok {
			md = metadata.New(nil)
		} else {
			md = md.Copy()
		}

		err := tracer.Inject(span.Context(), opentracing.TextMap, MDReaderWriter{md})
		if err != nil {
			span.LogFields(log.String("inject-error", err.Error()))
		}

		newCtx := metadata.NewOutgoingContext(ctx, md)
		err = invoker(newCtx, method, req, reply, cc, opts...)
		if err != nil {
			span.LogFields(log.String("call-error", err.Error()))
		}
		return err
	}
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func ServerInterceptor(tracer opentracing.Tracer) grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (
		resp interface{}, err error) {
		md, ok := metadata.FromIncomingContext(ctx)
		if !ok {
			md = metadata.New(nil)
		}
		// 服务端拦截器则是在MD中把 span提取出来
		spanContext, err := tracer.Extract(opentracing.TextMap, MDReaderWriter{md})
		if err != nil && err != opentracing.ErrSpanContextNotFound {
			fmt.Print("extract from metadata error: ", err)
		} else {
			span := tracer.StartSpan(
				info.FullMethod,
				ext.RPCServerOption(spanContext),
				opentracing.Tag{Key: string(ext.Component), Value: "gRPC"},
				ext.SpanKindRPCServer,
			)
			defer span.Finish()
			ctx = opentracing.ContextWithSpan(ctx, span)
		}
		return handler(ctx, req)
	}
}

MDReaderWriter 结构如下

为了做载体,必须要实现 opentracing.TextMapWriter opentracing.TextMapReader 这两个接口。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// TextMapWriter is the Inject() carrier for the TextMap builtin format.With
// it, the caller can encode a SpanContext for propagation as entries in a map
// of unicode strings.
type TextMapWriter interface {
   Set(key, val string)
}

// TextMapReader is the Extract() carrier for the TextMap builtin format. With it,
// the caller can decode a propagated SpanContext as entries in a map of
// unicode strings.
type TextMapReader interface {
   ForeachKey(handler func(key, val string) error) error
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func main() {
	// tracer
	tracer, closer := config.NewTracer("gRPC-hello")
	defer closer.Close()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()
	// conn
	conn, err := grpc.DialContext(
		ctx,
		"localhost:50051",
		grpc.WithInsecure(),
		grpc.WithBlock(),
		grpc.WithUnaryInterceptor(
			grpcMiddleware.ChainUnaryClient(
				interceptor.ClientInterceptor(tracer),
			),
		),
	)
	if err != nil {
		fmt.Println("grpc conn err:", err)
		return
	}
	client := proto.NewHelloClient(conn)
	r, err := client.SayHello(context.Background(), &proto.HelloReq{Name: "xiaoming"})
	if err != nil {
		log.Fatalf("could not greet: %v", err)
	}
	log.Printf("Greeting: %s", r.Message)
}

然后建立连接或者启动服务的时候把拦截器添加上即可

建立连接

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func main() {
	// tracer
	tracer, closer := config.NewTracer("gRPC-hello")
	defer closer.Close()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()
	// conn
	conn, err := grpc.DialContext(
		ctx,
		"localhost:50051",
		grpc.WithInsecure(),
		grpc.WithBlock(),
		grpc.WithUnaryInterceptor(
			grpcMiddleware.ChainUnaryClient(
				interceptor.ClientInterceptor(tracer),
			),
		),
	)
	if err != nil {
		fmt.Println("grpc conn err:", err)
		return
	}
	client := proto.NewHelloClient(conn)
	r, err := client.SayHello(context.Background(), &proto.HelloReq{Name: "xiaoming"})
	if err != nil {
		log.Fatalf("could not greet: %v", err)
	}
	log.Printf("Greeting: %s", r.Message)
}

启动服务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
func main() {
	lis, err := net.Listen("tcp", "50051")
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	tracer, closer := config.NewTracer("gRPC-hello")
	defer closer.Close()
	// UnaryInterceptor
	s := grpc.NewServer(grpc.UnaryInterceptor(
		grpc_middleware.ChainUnaryServer(
			interceptor.ServerInterceptor(tracer),
		),
	))
	proto.RegisterHelloServer(s, &helloServer{})
	if err := s.Serve(lis); err != nil {
		panic(err)
	}
}

参考文章