idou老师教你学Istio 27:解读Mixer Report流程

栏目: 后端 · 发布时间: 5年前

内容简介:1、概述Mixer是Istio的核心组件,提供了遥测数据收集的功能,能够实时采集服务的请求状态等信息,以达到监控服务状态目的。1.1 核心功能

1、概述

Mixer是Istio的核心组件,提供了遥测数据收集的功能,能够实时采集服务的请求状态等信息,以达到监控服务状态目的。

1.1 核心功能

•前置检查(Check):某服务接收并响应外部请求前,先通过Envoy向Mixer(Policy组件)发送Check请求,做一些access检查,同时确认adaptor所需cache字段,供之后Report接口使用;

•配额管理(Quota):通过配额管理机制,处理多请求时发生的资源竞争;

•遥测数据上报(Report):该服务请求处理结束后,将请求相关的日志,监控等数据,通过Envoy上报给Mixer(telemetry)

1.2 示例图

idou老师教你学Istio 27:解读Mixer Report流程

2、代码分析

2.1 Report代码分析

本节主要介绍Report的详细流程(基于Istio release1.0.0版本,commit id为3a136c90)。Report是mixer server的一个接口,供Envoy通过grpc调用。首先,我们从mixer server的启动入口main函数看起:

func main() {

rootCmd := cmd.GetRootCmd(os.Args[1:], supportedTemplates(), supportedAdapters(), shared.Printf, shared.Fatalf)



if err := rootCmd.Execute(); err != nil {

  os.Exit(-1)

}

}

在rootCmd中,mixs通过server命令启动了mixer server,从而触发了runserver函数,在runserver中初始化(New)了一个server,我们一起看下newServer的函数,在这个函数中,与我们本节相关的内容就是Mixer初始化了一个grpc服务器NewGRPCServer。

rootCmd.AddCommand(serverCmd(info, adapters, printf, fatalf))

func serverCmd(info map[string]template.Info, adapters []adapter.InfoFn, printf, fatalf shared.FormatFn) *cobra.Command {

sa := server.DefaultArgs()

sa.Templates = info

sa.Adapters = adapters



serverCmd := &cobra.Command{

  Use:   "server",

  Short: "Starts Mixer as a server",

  Run: func(cmd *cobra.Command, args []string) {

     runServer(sa, printf, fatalf)

  },

}… …

}

func newServer(a *Args, p *patchTable) (*Server, error) {

grpc.EnableTracing = a.EnableGRPCTracing

s.server = grpc.NewServer(grpcOptions...)

mixerpb.RegisterMixerServer(s.server, api.NewGRPCServer(s.dispatcher, s.gp, s.checkCache))

}

在这个grpc的服务端中,定义了一个Report接口,这就是我们这节课主要关注的内容(可以看到Check接口也在此定义,我们下节再讲)

func (s *grpcServer) Report(ctx context.Context, req *mixerpb.ReportRequest) (*mixerpb.ReportResponse, error) {

lg.Debugf("Report (Count: %d)", len(req.Attributes))

// 校验attribute是否为空,空则直接return

if len(req.Attributes) == 0 {

  return reportResp, nil

}



// 若属性word为空,赋为默认值

for i := 0; i < len(req.Attributes); i++ {

  iflen(req.Attributes[i].Words) == 0 {

     req.Attributes[i].Words = req.DefaultWords

  }

}



// 根据第一条attribute,生成proto包,proto包能跟踪一组attributes

protoBag := attribute.NewProtoBag(&req.Attributes[0], s.globalDict, s.globalWordList)



// 初始化,开始跟踪attributes各个条目中属性

accumBag := attribute.GetMutableBag(protoBag)



// 保存accumBag的增量状态

reportBag := attribute.GetMutableBag(accumBag)



reportSpan, reportCtx := opentracing.StartSpanFromContext(ctx, "Report")

reporter := s.dispatcher.GetReporter(reportCtx)



var errors *multierror.Error

for i := 0; i < len(req.Attributes); i++ {

  span, newctx := opentracing.StartSpanFromContext(reportCtx, fmt.Sprintf("attribute bag %d", i))



  // 第一个属性已经在创建proto包时创建,在此追踪所有attributes

  if i > 0 {

     if err := accumBag.UpdateBagFromProto(&req.Attributes[i], s.globalWordList); err != nil {

        err = fmt.Errorf("request could not be processed due to invalid attributes: %v", err)

        span.LogFields(otlog.String("error", err.Error()))

        span.Finish()

        errors = multierror.Append(errors, err)

        break

     }

  }



  lg.Debug("Dispatching Preprocess")

  // 真正开始分发,预处理阶段

  if err := s.dispatcher.Preprocess(newctx, accumBag, reportBag); err != nil {

     err = fmt.Errorf("preprocessing attributes failed: %v", err)

     span.LogFields(otlog.String("error", err.Error()))

     span.Finish()

     errors = multierror.Append(errors, err)

     continue

  }



  lg.Debug("Dispatching to main adapters after running preprocessors")

  lg.Debuga("Attribute Bag: \n", reportBag)

  lg.Debugf("Dispatching Report %d out of %d", i+1, len(req.Attributes))

  // 真正开始分发,将数据逐步加入到缓存中

  if err := reporter.Report(reportBag); err != nil {

     span.LogFields(otlog.String("error", err.Error()))

     span.Finish()

     errors = multierror.Append(errors, err)

     continue

  }



  span.Finish()



  // purge the effect of the Preprocess call so that the next time through everything is clean

  reportBag.Reset()

}



reportBag.Done()

accumBag.Done()

protoBag.Done()

// 真正的发送函数,从缓存中取出并发送到adaptor

if err := reporter.Flush(); err != nil {

  errors = multierror.Append(errors, err)

}

reporter.Done()



if errors != nil {

  reportSpan.LogFields(otlog.String("error", errors.Error()))

}

reportSpan.Finish()



if errors != nil {

  lg.Errora("Report failed:", errors.Error())

  return nil, grpc.Errorf(codes.Unknown, errors.Error())

}

// 过程结束

return reportResp, nil

}

通过上述代码解读,我们了解了Report接口的工作流程,但此时我们还并不知道一个请求的状态是如何报给adaptor的,下面我们通过简要的函数串接,把这部分流程串起来:

上述的预处理阶段Preprocess与上报阶段Report,最终都会调用到dispatch函数,仅通过不同的type来区分要做的事情;

func (d *Impl) Preprocess(ctx context.Context, bag attribute.Bag, responseBag *attribute.MutableBag) error {

s := d.getSession(ctx, tpb.TEMPLATE_VARIETY_ATTRIBUTE_GENERATOR, bag)

s.responseBag = responseBag

err := s.dispatch()

if err == nil {

  err = s.err

}

… …

}

func (r *reporter) Report(bag attribute.Bag) error {

s := r.impl.getSession(r.ctx, tpb.TEMPLATE_VARIETY_REPORT, bag)

s.reportStates = r.states

err := s.dispatch()

if err == nil {

  err = s.err

}

… …

}

而dispatch函数中做了真正的分发动作,包括:

1.遍历所有adaptor,调用adaptor中的函数,针对不同的adaptor生成不同的instance,并将instance缓存放入reportstates

var instance interface{}

if instance, err = input.Builder(s.bag); err != nil {

log.Errorf("error creating instance: destination='%v', error='%v'", destination.FriendlyName, err)

s.err = multierror.Append(s.err, err)

continue

}

type NamedBuilder struct {

InstanceShortName string

Builder           template.InstanceBuilderFn

}

InstanceBuilderFn func(attrs attribute.Bag) (interface{}, error)

CreateInstanceBuilder: func(instanceName string, param proto.Message, expb *compiled.ExpressionBuilder) (template.InstanceBuilderFn, error)

builder.build(attr)

// For report templates, accumulate instances as much as possible before commencing dispatch.

if s.variety == tpb.TEMPLATE_VARIETY_REPORT {

state.instances = append(state.instances, instance)

continue

}

2.将instance分发到所有adaptor,最终调用并分发到adaptor的HandleMetric函数中

func (r *reporter) Flush() error {

s := r.impl.getSession(r.ctx, tpb.TEMPLATE_VARIETY_REPORT, nil)

s.reportStates = r.states



s.dispatchBufferedReports()

err := s.err

… …

}

func (s *session) dispatchBufferedReports() {

// Ensure that we can run dispatches to all destinations in parallel.

s.ensureParallelism(len(s.reportStates))



// dispatch the buffered dispatchStates we've got

for k, v := range s.reportStates {

  s.dispatchToHandler(v)

  delete(s.reportStates, k)

}



s.waitForDispatched()

}

func (s *session) dispatchToHandler(ds *dispatchState) {

s.activeDispatches++

ds.session = s

s.impl.gp.ScheduleWork(ds.invokeHandler, nil)

}

case tpb.TEMPLATE_VARIETY_REPORT:

ds.err = ds.destination.Template.DispatchReport(

  ctx, ds.destination.Handler, ds.instances)

type TemplateInfo struct {

Name             string

Variety          tpb.TemplateVariety

DispatchReport   template.DispatchReportFn

DispatchCheck    template.DispatchCheckFn

DispatchQuota    template.DispatchQuotaFn

DispatchGenAttrs template.DispatchGenerateAttributesFn

}

DispatchReport: func(ctx context.Context, handler adapter.Handler, inst []interface{}) error {



// Convert the instances from the generic []interface{}, to their specialized type.

instances := make([]*metric.Instance, len(inst))

for i, instance := range inst {

  instances[i] = instance.(*metric.Instance)

}



// Invoke the handler.

if err := handler.(metric.Handler).HandleMetric(ctx, instances); err != nil {

  return fmt.Errorf("failed to report all values: %v", err)

}

return nil

}

2.2 相关结构体定义

Report接口请求体定义

// Used to report telemetry after performing one or more actions.

type ReportRequest struct {

// 代表一个请求中的属性

// 每个attribute代表一个请求动作,多个动作可汇总在一条message中以提高效率

//虽然每个“属性”消息在语义上被视为与消息中的其他属性无关的独立独立实体,但此消息格式利用属性消息之间的增量编码,以便大幅减少请求大小并改进端到端 效率。 每组单独的属性用于修改前一组。 这消除了在单个请求中多次冗余地发送相同属性的需要。

// 如果客户端上报时不想使用增量编码,可全量的发送所有属性.

Attributes []CompressedAttributes `protobuf:"bytes,1,rep,name=attributes" json:"attributes"`

// 所有属性的默认消息级字典.

// 这使得可以为该请求中的所有属性共享相同的字典,这可以大大减少整体请求大小

DefaultWords []string `protobuf:"bytes,2,rep,name=default_words,json=defaultWords" json:"default_words,omitempty"`

// 全局字典的词条数,可检测客户端与服务端之间的全局字典是否同步

GlobalWordCount uint32 `protobuf:"varint,3,opt,name=global_word_count,json=globalWordCount,proto3" json:"global_word_count,omitempty"`

}

3、总结

Mixer中涉及很多缓存命中等用于优化性能的设计,本文仅介绍了Mixer中Report接口发送到adaptor的过程,一些性能优化设计,如protobag,dispatch缓存等内容,将会在后续文章中解析。

相关服务请访问 https://support.huaweicloud.co ... _2019


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Hackers

Hackers

Steven Levy / O'Reilly Media / 2010-5-30 / USD 21.99

This 25th anniversary edition of Steven Levy's classic book traces the exploits of the computer revolution's original hackers -- those brilliant and eccentric nerds from the late 1950s through the ear......一起来看看 《Hackers》 这本书的介绍吧!

html转js在线工具
html转js在线工具

html转js在线工具

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具