ᕕ( ᐛ )ᕗ Jimyag's Blog

从0到1实现完整的微服务框架-链路追踪

在分布式系统,尤其是微服务系统中,一次外部请求往往需要内部多个模块,多个中间件,多台机器的相互调用才能完成。在这一系列的调用中,可能有些是串行的,而有些是并行的。在这种情况下,我们如何才能确定这整个请求调用了哪些应用?哪些模块?哪些节点?以及它们的先后顺序和各部分的性能如何呢?

这就是涉及到链路追踪。

jaeger安装

docker run -d --name jaeger   -e COLLECTOR_ZIPKIN_HOST_PORT=:9411   -p 5775:5775/udp   -p 6831:6831/udp   -p 6832:6832/udp   -p 5778:5778   -p 16686:16686   -p 14250:14250   -p 14268:14268   -p 14269:14269   -p 9411:9411 jaegertracing/all-in-one:1.32

api层添加链路追踪

链路追踪的起点在每次发起http请求的地方,这时候就需要一个拦截器来生成tracer

shop\api\user-api\middlewares\tracing.go

package middlewares

import (
	"fmt"

	"github.com/gin-gonic/gin"
	"github.com/uber/jaeger-client-go"
	jaegercfg "github.com/uber/jaeger-client-go/config"
	"go.uber.org/zap"

	"github.com/jimyag/shop/api/user/global"
)

func Tracing() gin.HandlerFunc {
	return func(ctx *gin.Context) {
		cfg := jaegercfg.Configuration{
			Sampler: &jaegercfg.SamplerConfig{
				Type:  jaeger.SamplerTypeConst,
				Param: 1, // 全部采样
			},
			Reporter: &jaegercfg.ReporterConfig{
				LogSpans: true,
				LocalAgentHostPort: fmt.Sprintf("%s:%d", 
					global.ServerConfig.JaegerInfo.Host, // jaeger 位置
					global.ServerConfig.JaegerInfo.Port, // 6831
				),
			},
			ServiceName: global.ServerConfig.Name,
		}
		tracer, close, err := cfg.NewTracer(jaegercfg.Logger(jaeger.StdLogger))
		if err != nil {
			global.Logger.Fatal("创建 tracer 失败", zap.Error(err))
		}
		defer close.Close()
		startSpan := tracer.StartSpan(ctx.Request.URL.Path)
		defer startSpan.Finish()
		ctx.Set("tracer", tracer)
		ctx.Set("parentSpan", startSpan)
		ctx.Next()
	}
}

将这个中间件配置到需要链路追踪的router上

shop\api\user-api\initialize\router.go全局都加

router.Use(middlewares.Tracing())

由于我们使用了负载均衡,所以对于其他的grpc的链接要加一个拦截器,来将context加入到grpc服务中。

package initialize

import (
	"fmt"

	"github.com/hashicorp/consul/api"
	_ "github.com/mbobakov/grpc-consul-resolver"
	"github.com/opentracing/opentracing-go"
	"go.uber.org/zap"
	"google.golang.org/grpc"

	"github.com/jimyag/shop/api/user/global"
	"github.com/jimyag/shop/api/user/proto"
	"github.com/jimyag/shop/api/user/util/otgrpc"
)

func InitSrvConn() {
	// consul
	conn, err := grpc.Dial(
		fmt.Sprintf("consul://%s:%d/%s?wait=14s",
			global.ServerConfig.ConsulInfo.Host,
			global.ServerConfig.ConsulInfo.Port,
			global.ServerConfig.UserSrv.Name,
		),
		grpc.WithInsecure(),
		grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`),
        // 添加的
		grpc.WithUnaryInterceptor(
			otgrpc.OpenTracingClientInterceptor(
				opentracing.GlobalTracer(),
			),
		),
        // 结束
	)
	if err != nil {
		global.Logger.Fatal("用户服务发现错误", zap.Error(err))
	}
	global.UserSrvClient = proto.NewUserClient(conn)

}

shop\api\user-api\util\otgrpc\client.go:31修改源码

func OpenTracingClientInterceptor(tracer opentracing.Tracer, optFuncs ...Option) grpc.UnaryClientInterceptor {
	otgrpcOpts := newOptions()
	otgrpcOpts.apply(optFuncs...)
	return func(
		ctx context.Context,
		method string,
		req, resp interface{},
		cc *grpc.ClientConn,
		invoker grpc.UnaryInvoker,
		opts ...grpc.CallOption,
	) error {
		var err error
		var parentCtx opentracing.SpanContext
		// 从 context 提取 父span
		if parent := opentracing.SpanFromContext(ctx); parent != nil {
			parentCtx = parent.Context()
		}
        // 修改的
		switch ctx.(type) {
		case *gin.Context:
			iTracer, ok := ctx.(*gin.Context).Get("tracer")
			if ok {
				tracer = iTracer.(opentracing.Tracer)
			}

			parentSpan, ok := ctx.(*gin.Context).Get("parentSpan")
			if ok {
				parentCtx = parentSpan.(*jaegerClient.Span).Context()
			}

		}

		if otgrpcOpts.inclusionFunc != nil &&
			!otgrpcOpts.inclusionFunc(parentCtx, method, req, resp) {
			return invoker(ctx, method, req, resp, cc, opts...)
		}
		clientSpan := tracer.StartSpan(
			method,
			opentracing.ChildOf(parentCtx),
			ext.SpanKindRPCClient,
			gRPCComponentTag,
		)
		defer clientSpan.Finish()
		// 使用metadata机制传递
		ctx = injectSpanContext(ctx, tracer, clientSpan)
		if otgrpcOpts.logPayloads {
			clientSpan.LogFields(log.Object("gRPC request", req))
		}
		err = invoker(ctx, method, req, resp, cc, opts...)
		if err == nil {
			if otgrpcOpts.logPayloads {
				clientSpan.LogFields(log.Object("gRPC response", resp))
			}
		} else {
			SetSpanTags(clientSpan, err, true)
			clientSpan.LogFields(log.String("event", "error"), log.String("message", err.Error()))
		}
		if otgrpcOpts.decorator != nil {
			otgrpcOpts.decorator(clientSpan, method, req, resp, err)
		}
		return err
	}
}

这里修改源码是拿到context中的tracerparentSpan

grpc集成jaeger

在服务端还有子的过程

client拦截器的原理

从context拿到父亲的span

// 通过parentSpan生成当前的span
clientSpan := tracer.StartSpan(
			method,
			opentracing.ChildOf(parentCtx),
			ext.SpanKindRPCClient,
			gRPCComponentTag,
		)
		defer clientSpan.Finish()

通过metadata的机制,将它的内容写到metadata中去

// 使用metadata机制传递
		ctx = injectSpanContext(ctx, tracer, clientSpan)

然后通过shop\api\user-api\util\otgrpc\client.go:243

func injectSpanContext(ctx context.Context, tracer opentracing.Tracer, clientSpan opentracing.Span) context.Context {
	md, ok := metadata.FromOutgoingContext(ctx)
	if !ok {
		md = metadata.New(nil)
	} else {
		md = md.Copy()
	}
	mdWriter := metadataReaderWriter{md}
	// 将服务端想要的信息注入到metadata中
	err := tracer.Inject(clientSpan.Context(), opentracing.HTTPHeaders, mdWriter)
	// We have no better place to record an error than the Span itself :-/
	if err != nil {
		clientSpan.LogFields(log.String("event", "Tracer.Inject() failed"), log.Error(err))
	}
	return metadata.NewOutgoingContext(ctx, md)
}

如何写到opentracing中去这是有一个标准,是由opentracing做的,如何提取也是由它来做的。

将服务端想要的信息注入到metadata中去,如果注入、拿数据我们不用关心。

在grpc服务端

// For example:
//
//     s := grpc.NewServer(
//         ...,  // (existing ServerOptions)
//         grpc.UnaryInterceptor(otgrpc.OpenTracingServerInterceptor(tracer)))

只要在new grpcserver的时候添加一个服务端的拦截器就行

shop\service\user_srv\main.go

// 初始化jaeger
	cfg := jaegercfg.Configuration{
		Sampler: &jaegercfg.SamplerConfig{
			Type:  jaeger.SamplerTypeConst,
			Param: 1, // 全部采样
		},
		Reporter: &jaegercfg.ReporterConfig{
			LogSpans: true,
			LocalAgentHostPort: fmt.Sprintf("%s:%d",
				global.RemoteConfig.JaegerInfo.Host,
				global.RemoteConfig.JaegerInfo.Port,
			),
		},
		ServiceName: "user-srv",
	}
	// 初始化一jaeger
	tracer, cl, err := cfg.NewTracer(jaegercfg.Logger(jaeger.StdLogger))
	if err != nil {
		global.Logger.Fatal("创建 tracer 失败", zap.Error(err))
	}
	opentracing.SetGlobalTracer(tracer)
	// 注册服务
	server := 		grpc.NewServer(grpc.UnaryInterceptor(otgrpc.OpenTracingServerInterceptor(tracer)))

我们这边可以自己生成tracer,没有必要用服务端的tracer,我们只要处理好父子关系就好,当整个服务挂了之后cl.Close()

在grpc的服务中如何拿到tracer,

shop\service\user_srv\util\otgrpc\server.go:39从context中拿到span

spanContext, err := extractSpanContext(ctx, tracer)
func extractSpanContext(ctx context.Context, tracer opentracing.Tracer) (opentracing.SpanContext, error) {
	md, ok := metadata.FromIncomingContext(ctx)
	if !ok {
		md = metadata.New(nil)
	}
    // 与之前的Inject对应
	return tracer.Extract(opentracing.HTTPHeaders, metadataReaderWriter{md})
}

在服务中使用:

D:\repository\shop\service\user_srv\handler\user.go

func (u *UserServer) GetUserList(ctx context.Context, req *proto.PageIngo) (*proto.UserListResponse, error) {
	// 省略之前的
    // 从context总拿到parentSpan
	parentSpan := opentracing.SpanFromContext(ctx)
    // 生成一个span并设置它的父亲
	getUserListSpan := opentracing.GlobalTracer().StartSpan("get user list form database", opentracing.ChildOf(parentSpan.Context()))
	users, err := u.Store.ListUsers(ctx, arg)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "获得用户列表信息失败")
	}
	getUserListSpan.Finish()
    // 追踪结束。
    // 省略其他
}

#微服务 #gRPC #链路追踪