idou老师教你学Istio 27:解读Mixer Report流程

栏目: 后端 · 发布时间: 5年前

内容简介:1、概述Mixer是Istio的核心组件,提供了遥测数据收集的功能,能够实时采集服务的请求状态等信息,以达到监控服务状态目的。1.1 核心功能

1、概述

Mixer是Istio的核心组件,提供了遥测数据收集的功能,能够实时采集服务的请求状态等信息,以达到监控服务状态目的。

1.1 核心功能

•前置检查(Check):某服务接收并响应外部请求前,先通过Envoy向Mixer(Policy组件)发送Check请求,做一些access检查,同时确认adaptor所需cache字段,供之后Report接口使用;

•配额管理(Quota):通过配额管理机制,处理多请求时发生的资源竞争;

•遥测数据上报(Report):该服务请求处理结束后,将请求相关的日志,监控等数据,通过Envoy上报给Mixer(telemetry)

1.2 示例图

idou老师教你学Istio 27:解读Mixer Report流程

2、代码分析

2.1 Report代码分析

本节主要介绍Report的详细流程(基于Istio release1.0.0版本,commit id为3a136c90)。Report是mixer server的一个接口,供Envoy通过grpc调用。首先,我们从mixer server的启动入口main函数看起:

func main() {

rootCmd := cmd.GetRootCmd(os.Args[1:], supportedTemplates(), supportedAdapters(), shared.Printf, shared.Fatalf)



if err := rootCmd.Execute(); err != nil {

  os.Exit(-1)

}

}

在rootCmd中,mixs通过server命令启动了mixer server,从而触发了runserver函数,在runserver中初始化(New)了一个server,我们一起看下newServer的函数,在这个函数中,与我们本节相关的内容就是Mixer初始化了一个grpc服务器NewGRPCServer。

rootCmd.AddCommand(serverCmd(info, adapters, printf, fatalf))

func serverCmd(info map[string]template.Info, adapters []adapter.InfoFn, printf, fatalf shared.FormatFn) *cobra.Command {

sa := server.DefaultArgs()

sa.Templates = info

sa.Adapters = adapters



serverCmd := &cobra.Command{

  Use:   "server",

  Short: "Starts Mixer as a server",

  Run: func(cmd *cobra.Command, args []string) {

     runServer(sa, printf, fatalf)

  },

}… …

}

func newServer(a *Args, p *patchTable) (*Server, error) {

grpc.EnableTracing = a.EnableGRPCTracing

s.server = grpc.NewServer(grpcOptions...)

mixerpb.RegisterMixerServer(s.server, api.NewGRPCServer(s.dispatcher, s.gp, s.checkCache))

}

在这个grpc的服务端中,定义了一个Report接口,这就是我们这节课主要关注的内容(可以看到Check接口也在此定义,我们下节再讲)

func (s *grpcServer) Report(ctx context.Context, req *mixerpb.ReportRequest) (*mixerpb.ReportResponse, error) {

lg.Debugf("Report (Count: %d)", len(req.Attributes))

// 校验attribute是否为空,空则直接return

if len(req.Attributes) == 0 {

  return reportResp, nil

}



// 若属性word为空,赋为默认值

for i := 0; i < len(req.Attributes); i++ {

  iflen(req.Attributes[i].Words) == 0 {

     req.Attributes[i].Words = req.DefaultWords

  }

}



// 根据第一条attribute,生成proto包,proto包能跟踪一组attributes

protoBag := attribute.NewProtoBag(&req.Attributes[0], s.globalDict, s.globalWordList)



// 初始化,开始跟踪attributes各个条目中属性

accumBag := attribute.GetMutableBag(protoBag)



// 保存accumBag的增量状态

reportBag := attribute.GetMutableBag(accumBag)



reportSpan, reportCtx := opentracing.StartSpanFromContext(ctx, "Report")

reporter := s.dispatcher.GetReporter(reportCtx)



var errors *multierror.Error

for i := 0; i < len(req.Attributes); i++ {

  span, newctx := opentracing.StartSpanFromContext(reportCtx, fmt.Sprintf("attribute bag %d", i))



  // 第一个属性已经在创建proto包时创建,在此追踪所有attributes

  if i > 0 {

     if err := accumBag.UpdateBagFromProto(&req.Attributes[i], s.globalWordList); err != nil {

        err = fmt.Errorf("request could not be processed due to invalid attributes: %v", err)

        span.LogFields(otlog.String("error", err.Error()))

        span.Finish()

        errors = multierror.Append(errors, err)

        break

     }

  }



  lg.Debug("Dispatching Preprocess")

  // 真正开始分发,预处理阶段

  if err := s.dispatcher.Preprocess(newctx, accumBag, reportBag); err != nil {

     err = fmt.Errorf("preprocessing attributes failed: %v", err)

     span.LogFields(otlog.String("error", err.Error()))

     span.Finish()

     errors = multierror.Append(errors, err)

     continue

  }



  lg.Debug("Dispatching to main adapters after running preprocessors")

  lg.Debuga("Attribute Bag: \n", reportBag)

  lg.Debugf("Dispatching Report %d out of %d", i+1, len(req.Attributes))

  // 真正开始分发,将数据逐步加入到缓存中

  if err := reporter.Report(reportBag); err != nil {

     span.LogFields(otlog.String("error", err.Error()))

     span.Finish()

     errors = multierror.Append(errors, err)

     continue

  }



  span.Finish()



  // purge the effect of the Preprocess call so that the next time through everything is clean

  reportBag.Reset()

}



reportBag.Done()

accumBag.Done()

protoBag.Done()

// 真正的发送函数,从缓存中取出并发送到adaptor

if err := reporter.Flush(); err != nil {

  errors = multierror.Append(errors, err)

}

reporter.Done()



if errors != nil {

  reportSpan.LogFields(otlog.String("error", errors.Error()))

}

reportSpan.Finish()



if errors != nil {

  lg.Errora("Report failed:", errors.Error())

  return nil, grpc.Errorf(codes.Unknown, errors.Error())

}

// 过程结束

return reportResp, nil

}

通过上述代码解读,我们了解了Report接口的工作流程,但此时我们还并不知道一个请求的状态是如何报给adaptor的,下面我们通过简要的函数串接,把这部分流程串起来:

上述的预处理阶段Preprocess与上报阶段Report,最终都会调用到dispatch函数,仅通过不同的type来区分要做的事情;

func (d *Impl) Preprocess(ctx context.Context, bag attribute.Bag, responseBag *attribute.MutableBag) error {

s := d.getSession(ctx, tpb.TEMPLATE_VARIETY_ATTRIBUTE_GENERATOR, bag)

s.responseBag = responseBag

err := s.dispatch()

if err == nil {

  err = s.err

}

… …

}

func (r *reporter) Report(bag attribute.Bag) error {

s := r.impl.getSession(r.ctx, tpb.TEMPLATE_VARIETY_REPORT, bag)

s.reportStates = r.states

err := s.dispatch()

if err == nil {

  err = s.err

}

… …

}

而dispatch函数中做了真正的分发动作,包括:

1.遍历所有adaptor,调用adaptor中的函数,针对不同的adaptor生成不同的instance,并将instance缓存放入reportstates

var instance interface{}

if instance, err = input.Builder(s.bag); err != nil {

log.Errorf("error creating instance: destination='%v', error='%v'", destination.FriendlyName, err)

s.err = multierror.Append(s.err, err)

continue

}

type NamedBuilder struct {

InstanceShortName string

Builder           template.InstanceBuilderFn

}

InstanceBuilderFn func(attrs attribute.Bag) (interface{}, error)

CreateInstanceBuilder: func(instanceName string, param proto.Message, expb *compiled.ExpressionBuilder) (template.InstanceBuilderFn, error)

builder.build(attr)

// For report templates, accumulate instances as much as possible before commencing dispatch.

if s.variety == tpb.TEMPLATE_VARIETY_REPORT {

state.instances = append(state.instances, instance)

continue

}

2.将instance分发到所有adaptor,最终调用并分发到adaptor的HandleMetric函数中

func (r *reporter) Flush() error {

s := r.impl.getSession(r.ctx, tpb.TEMPLATE_VARIETY_REPORT, nil)

s.reportStates = r.states



s.dispatchBufferedReports()

err := s.err

… …

}

func (s *session) dispatchBufferedReports() {

// Ensure that we can run dispatches to all destinations in parallel.

s.ensureParallelism(len(s.reportStates))



// dispatch the buffered dispatchStates we've got

for k, v := range s.reportStates {

  s.dispatchToHandler(v)

  delete(s.reportStates, k)

}



s.waitForDispatched()

}

func (s *session) dispatchToHandler(ds *dispatchState) {

s.activeDispatches++

ds.session = s

s.impl.gp.ScheduleWork(ds.invokeHandler, nil)

}

case tpb.TEMPLATE_VARIETY_REPORT:

ds.err = ds.destination.Template.DispatchReport(

  ctx, ds.destination.Handler, ds.instances)

type TemplateInfo struct {

Name             string

Variety          tpb.TemplateVariety

DispatchReport   template.DispatchReportFn

DispatchCheck    template.DispatchCheckFn

DispatchQuota    template.DispatchQuotaFn

DispatchGenAttrs template.DispatchGenerateAttributesFn

}

DispatchReport: func(ctx context.Context, handler adapter.Handler, inst []interface{}) error {



// Convert the instances from the generic []interface{}, to their specialized type.

instances := make([]*metric.Instance, len(inst))

for i, instance := range inst {

  instances[i] = instance.(*metric.Instance)

}



// Invoke the handler.

if err := handler.(metric.Handler).HandleMetric(ctx, instances); err != nil {

  return fmt.Errorf("failed to report all values: %v", err)

}

return nil

}

2.2 相关结构体定义

Report接口请求体定义

// Used to report telemetry after performing one or more actions.

type ReportRequest struct {

// 代表一个请求中的属性

// 每个attribute代表一个请求动作,多个动作可汇总在一条message中以提高效率

//虽然每个“属性”消息在语义上被视为与消息中的其他属性无关的独立独立实体,但此消息格式利用属性消息之间的增量编码,以便大幅减少请求大小并改进端到端 效率。 每组单独的属性用于修改前一组。 这消除了在单个请求中多次冗余地发送相同属性的需要。

// 如果客户端上报时不想使用增量编码,可全量的发送所有属性.

Attributes []CompressedAttributes `protobuf:"bytes,1,rep,name=attributes" json:"attributes"`

// 所有属性的默认消息级字典.

// 这使得可以为该请求中的所有属性共享相同的字典,这可以大大减少整体请求大小

DefaultWords []string `protobuf:"bytes,2,rep,name=default_words,json=defaultWords" json:"default_words,omitempty"`

// 全局字典的词条数,可检测客户端与服务端之间的全局字典是否同步

GlobalWordCount uint32 `protobuf:"varint,3,opt,name=global_word_count,json=globalWordCount,proto3" json:"global_word_count,omitempty"`

}

3、总结

Mixer中涉及很多缓存命中等用于优化性能的设计,本文仅介绍了Mixer中Report接口发送到adaptor的过程,一些性能优化设计,如protobag,dispatch缓存等内容,将会在后续文章中解析。

相关服务请访问 https://support.huaweicloud.co ... _2019


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Big Java Late Objects

Big Java Late Objects

Horstmann, Cay S. / 2012-2 / 896.00元

The introductory programming course is difficult. Many students fail to succeed or have trouble in the course because they don't understand the material and do not practice programming sufficiently. ......一起来看看 《Big Java Late Objects》 这本书的介绍吧!

URL 编码/解码
URL 编码/解码

URL 编码/解码

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试