istio 源码 – pilot 源码分析(原创)

栏目: 后端 · 发布时间: 5年前

内容简介:完整的 yaml 文件参见 pilot-yaml

[TOC]

架构

istio 源码 – pilot 源码分析(原创)

介绍

完整的 yaml 文件参见 pilot-yaml

Dockerfile

FROM istionightly/base_debug

ADD pilot-discovery /usr/local/bin/
ADD cacert.pem /cacert.pem
ENTRYPOINT ["/usr/local/bin/pilot-discovery"]

启动命令行

# ps -ef -www|grep pilot
$/usr/local/bin/pilot-discovery discovery

命令行帮助

# kubectl exec -ti istio-pilot-f9d78b7b9-fmhfb -n istio-system -c discovery -- /usr/local/bin/pilot-discovery --help
Usage:
  pilot-discovery [command]

Available Commands:
  discovery   Start Istio proxy discovery service
  help        Help about any command
  request     Makes an HTTP request to Pilot metrics/debug endpoint
  version     Prints out build version information

discovery 相关的帮助

# kubectl exec -ti istio-pilot-f9d78b7b9-fmhfb -n istio-system -c discovery -- /usr/local/bin/pilot-discovery discovery --help
Defaulting container name to discovery.

Start Istio proxy discovery service

Usage:
  pilot-discovery discovery [flags]

Flags:
  -a, --appNamespace string                 Restrict the applications namespace the controller manages; if not set, controller watches all namespaces
      --cfConfig string                     Cloud Foundry config file
      --clusterRegistriesConfigMap string   ConfigMap map for clusters config store
      --clusterRegistriesNamespace string   Namespace for ConfigMap which stores clusters configs
      --configDir string                    Directory to watch for updates to config yaml files. If specified, the files will be used as the source of config, rather than a CRD client.
      --consulserverInterval duration       Interval (in seconds) for polling the Consul service registry (default 2s)
      --consulserverURL string              URL for the Consul server
      --disable-install-crds                Disable discovery service from verifying the existence of CRDs at startup and then installing if not detected.  It is recommended to be disable for highly available setups.
      --discovery_cache                     Enable caching discovery service responses (default true)
      --domain string                       DNS domain suffix (default "cluster.local")
      --grpcAddr string                     Discovery service grpc address (default ":15010")
  -h, --help                                help for discovery
      --httpAddr string                     Discovery service HTTP address (default ":8080")
      --kubeconfig string                   Use a Kubernetes configuration file instead of in-cluster configuration
      --meshConfig string                   File name for Istio mesh configuration. If not specified, a default mesh will be used. (default "/etc/istio/config/mesh")
      --monitoringAddr string               HTTP address to use for the exposing pilot self-monitoring information (default ":9093")
  -n, --namespace string                    Select a namespace where the controller resides. If not set, uses ${POD_NAMESPACE} environment variable
      --plugins stringSlice                 comma separated list of networking plugins to enable (default [authn,authz,health,mixer,envoyfilter])
      --profile                             Enable profiling via web interface host:port/debug/pprof (default true)
      --registries stringSlice              Comma separated list of platform service registries to read from (choose one or more from {Kubernetes, Consul, CloudFoundry, Mock, Config}) (default [Kubernetes])
      --resync duration                     Controller resync interval (default 1m0s)
      --secureGrpcAddr string               Discovery service grpc address, with https (default ":15012")
      --webhookEndpoint string              Webhook API endpoint (supports http://sockethost, and unix:///absolute/path/to/socket

Global Flags:
   省略

主要参数:

名称 默认值 备注
--appNamespace 与 helm 安装中的 oneNamespace 对应
--configDir 表明 pilot 的两种来源:配置文件和 CRD
--discovery_cache true 启用 cache,有助于提升性能
--domain "cluster.local" k8s 中域后缀
--grpcAddr :15010
--httpAddr :8080
--meshConfig "/etc/istio/config/mesh"
--registries Kubernetes

代码分析

函数入口

// discoveryCmd is the cobra command behind "pilot-discovery discovery".
// It accepts no positional arguments. Its RunE builds the Pilot server from
// serverArgs via bootstrap.NewServer, starts it with a shared stop channel and
// then hands that channel to cmd.WaitSignal.
discoveryCmd = &cobra.Command{
        Use:   "discovery",
        Short: "Start Istio proxy discovery service.",
        Args:  cobra.ExactArgs(0),
        RunE: func(c *cobra.Command, args []string) error {
            // ...

            // Create the stop channel for all of the servers.
            stop := make(chan struct{})

            // Create the server for the discovery service.
            discoveryServer, err := bootstrap.NewServer(serverArgs)
            if err != nil {
                return fmt.Errorf("failed to create discovery service: %v", err)
            }

            // Start the server
            if err := discoveryServer.Start(stop); err != nil {
                return fmt.Errorf("failed to start discovery service: %v", err)
            }

            // NOTE(review): presumably blocks until a termination signal arrives
            // and then closes stop; confirm against cmd.WaitSignal.
            cmd.WaitSignal(stop)
            return nil
        },
    }

istio.io/istio/pilot/pkg/bootstrap/server.go

mesh 的默认配置参见:https://gist.github.com/DavadDi/f110459d339e260f818250287fc78ccc#file-mesh

// NewServer creates a new Server instance based on the provided arguments.
//
// The init* helpers run in the order shown. Several of them register start
// functions on the server (see the inline notes); those functions are executed
// later by Server.Start. Error handling is omitted in this excerpt.
func NewServer(args PilotArgs) (*Server, error) {
    // ...
    s := &Server{
        filewatcher: filewatcher.NewWatcher(),
    }

    // Error handling is omitted here.
    s.initKubeClient(&args) // initializes s.kubeClient, the client for the k8s cluster
    s.initMesh(&args) // loads the mesh config (/etc/istio/config/mesh) and adds it to the filewatcher

    // Not included in the 1.0.5 cmd; presumably added in 1.1:
    // serverArgs.NetworksConfigFile, "networksConfig", "/etc/istio/config/meshNetworks"
    // Loads the config and adds it to the filewatcher so that it is watched.
    s.initMeshNetworks(&args)

    // initMixerSan configures the mixerSAN configuration item. 
    // The mesh must already have been configured.
    s.initMixerSan(&args)

    // creates the config controller in the pilotConfig.
    // Ultimately this creates a crd.NewController instance.
    s.initConfigController(&args)
    /* Main flow inside, using the kube case as an example:
        controller, err := s.makeKubeConfigController(args)
        s.configController = controller

        s.addStartFunc(func(stop <-chan struct{}) error {
            go s.configController.Run(stop)  // 1. configController is the first to be started
        })
    */

    // creates and initializes the service controllers
    s.initServiceControllers(&args)
    /*
       s.createK8sServiceControllers(serviceControllers, args)
        --> kube.NewController(s.kubeClient, args.Config.ControllerOptions)

        s.addStartFunc(func(stop <-chan struct{}) error {
        go s.ServiceController.Run(stop)  // 2. ServiceController is the second to be started
        return nil
    })
    */

    // Initializes the DiscoveryService gRPC server; covered in detail later.
    // Adds 2 funcs to startFuncs.
    s.initDiscoveryService(&args)

    // initializes the configuration for the pilot monitoring server.
    // Adds 1 func to startFuncs.
    s.initMonitor(&args)

    // starts the secret controller to watch for remote
    // clusters and initialize the multicluster structures.
    // Not analyzed for now.
    s.initClusterRegistries(&args)

    return s, nil
}

// Start runs, in registration order, every start function stored in
// s.startFuncs (these are registered through addStartFunc while NewServer
// initializes the server), passing each one the shared stop channel.
//
// It returns the first non-nil error reported by a start function and does not
// run the remaining ones. It returns nil when every start function succeeds,
// including when none are registered.
func (s *Server) Start(stop <-chan struct{}) error {
    // Now start all of the components.
    for _, fn := range s.startFuncs {
        // A component that fails to start aborts the whole start-up instead of
        // being silently ignored.
        if err := fn(stop); err != nil {
            return err
        }
    }
    return nil
}

istio.io/api/mesh/v1alpha1/network.pb.go

其中 MeshNetworks 结构体和定义说明如下:

// MeshNetworks (config map) provides information about the set of networks
// inside a mesh and how to route to endpoints in each network. For example
//
// MeshNetworks(file/config map):
// networks:
// - network1:
//   - endpoints:
//     - fromRegistry: registry1 #must match secret name in kubernetes
//     - fromCidr: 192.168.100.0/22 #a VM network for example
//     gateways:
//     - registryServiceName: istio-ingressgateway.istio-system.svc.cluster.local
//       port: 15443
//       locality: us-east-1a
type MeshNetworks struct {
    // REQUIRED: The set of networks inside this mesh. Each network should
    // have a unique name and information about how to infer the endpoints in
    // the network as well as the gateways associated with the network.
    Networks map[string]*Network `protobuf:"bytes,1,rep,name=networks" json:"networks,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value"`
}

对 NewServer 函数进行简化后,主要流程如下:

// NewServer, condensed to the steps examined in the sections that follow.
// This is a summary rather than compilable code: the creation of s and all
// error handling are elided.
func NewServer(args PilotArgs) (*Server, error) {
    // Error handling is omitted here.
    s.initKubeClient(&args) // initializes s.kubeClient, the client for the k8s cluster

    // Initializes the related configuration and watches it for changes.
    // ...

    // creates the config controller in the pilotConfig.
    // Ultimately this creates a crd.NewController instance.
    s.initConfigController(&args)
    // s.makeKubeConfigController(args)
    // s.configController.Run(stop)


    // creates and initializes the service controllers
    s.initServiceControllers(&args)
    // kube.NewController(s.kubeClient, args.Config.ControllerOptions)
    // go s.ServiceController.Run(stop)

    // Initializes the DiscoveryService gRPC server; covered in detail later.
    // Adds 2 funcs to startFuncs.
    s.initDiscoveryService(&args)

    return s, nil
}

ConfigController

// initConfigController creates the config controller in the pilotConfig.
//
// This excerpt shows only the Kubernetes path. It builds the controller with
// makeKubeConfigController, stores it in s.configController, registers a start
// function that runs the controller in its own goroutine, and finally wraps
// the controller in the Istio config store kept in s.istioConfigStore.
//
// It returns the error from makeKubeConfigController, if any. In that case the
// server is left unmodified and no start function is registered.
func (s *Server) initConfigController(args *PilotArgs) error {
    // Initialization for the k8s case.
    controller, err := s.makeKubeConfigController(args)
    if err != nil {
        // Without this check a nil controller would be stored and later run.
        return err
    }
    s.configController = controller

    // Defer starting the controller until after the service is created.
    s.addStartFunc(func(stop <-chan struct{}) error {
        go s.configController.Run(stop)
        return nil
    })

    //...

    // Create the config store.
    s.istioConfigStore = model.MakeIstioStore(s.configController)

    return nil
}
// makeKubeConfigController builds the CRD-backed config controller.
//
// It creates a CRD client from the kubeconfig file selected by
// s.getKubeCfgFile, passing model.IstioConfigTypes as the set of config types
// and the configured domain suffix. Unless args.Config.DisableInstallCRDs is
// set, it then registers the custom resources. The client is finally wrapped
// in a controller created by crd.NewController.
//
// Any error from creating the client or from registering the resources is
// returned together with a nil controller.
func (s *Server) makeKubeConfigController(args *PilotArgs) (model.ConfigStoreCache, error) {
    kubeCfgFile := s.getKubeCfgFile(args)
    configClient, err := crd.NewClient(
        kubeCfgFile, "",
        model.IstioConfigTypes, // all of the related CRD definitions
        args.Config.ControllerOptions.DomainSuffix)
    if err != nil {
        return nil, err
    }

    if !args.Config.DisableInstallCRDs {
        // Register the custom CRDs; a failure here must not be dropped.
        if err := configClient.RegisterResources(); err != nil {
            return nil, err
        }
    }

    return crd.NewController(configClient, args.Config.ControllerOptions), nil
}

其中 crd.NewClient 中的参数 model.IstioConfigTypes 包含了相关的全部 CRD 的定义:

istio.io/istio/pilot/pkg/model/config.go

// IstioConfigTypes lists all Istio config types with schemas and validation
    IstioConfigTypes = ConfigDescriptor{
        VirtualService,
        Gateway,
        ServiceEntry,
        DestinationRule,
        EnvoyFilter,
        Sidecar,
        HTTPAPISpec,
        HTTPAPISpecBinding,
        QuotaSpec,
        QuotaSpecBinding,
        AuthenticationPolicy,
        AuthenticationMeshPolicy,
        ServiceRole,
        ServiceRoleBinding,
        RbacConfig,
        ClusterRbacConfig,
    }

其中 crd.NewController 定义在 istio.io/istio/pilot/pkg/config/kube/crd/controller.go 文件中:

// NewController creates a new Kubernetes controller for CRDs
// Use "" for namespace to listen for all namespace changes
//
// The returned controller shares the given client and owns a work queue with a
// one-second retry delay. One informer is added for every schema in the
// client's config descriptor, each given options.WatchedNamespace and
// options.ResyncPeriod.
func NewController(client *Client, options kube.ControllerOptions) model.ConfigStoreCache {
    log.Infof("CRD controller watching namespaces %q", options.WatchedNamespace)

    // Queue requires a time duration for a retry delay after a handler error
    out := &controller{
        client: client,
        queue:  kube.NewQueue(1 * time.Second),
        kinds:  make(map[string]cacheHandler),
    }

    // add stores for CRD kinds
    for _, schema := range client.ConfigDescriptor() {
        out.addInformer(schema, options.WatchedNamespace, options.ResyncPeriod)
    }

    return out
}

对于 CRD 的监视,每一类资源需要启动一个单独的客户端连接,目前 CRD 的 Group 主要有 networking/config/authentication/rbac 等;

// controller is a collection of synchronized resource watchers.
// Caches are thread-safe
type controller struct {
    client *Client  // one connection per class of resource
    queue  kube.Queue // work queue; NewController creates it with a 1s retry delay
    kinds  map[string]cacheHandler // one cacheHandler per CRD kind (NOTE(review): key is presumably the schema type name; confirm in addInformer)
}

// cacheHandler pairs a shared index informer with the handler chain attached
// to it. The controller keeps one cacheHandler per kind it watches.
type cacheHandler struct {
    informer cache.SharedIndexInformer
    handler  *kube.ChainHandler
}
// Client is a basic REST client for CRDs implementing config store
type Client struct {
    // Map of apiVersion to restClient.
    clientset map[string]*restClient

    // domainSuffix for the config metadata
    domainSuffix string
}

ServiceController

istio.io/istio/pilot/pkg/serviceregistry/kube/controller.go

结构定义如下:

// Controller is the Kubernetes service-registry controller. It holds the
// Kubernetes client, a work queue, cache handlers for services, endpoints and
// nodes, and a pod cache. The remaining fields are elided in this excerpt.
type Controller struct {
    domainSuffix string

    client    kubernetes.Interface
    queue     Queue
    services  cacheHandler
    endpoints cacheHandler
    nodes     cacheHandler

    pods *PodCache

    //....
}

NewController 函数定义如下:

// NewController creates a new Kubernetes controller
// Created by bootstrap and multicluster (see secretcontroller).
//
// All informers come from a single shared informer factory that is configured
// with options.ResyncPeriod and restricted to options.WatchedNamespace.
func NewController(client kubernetes.Interface, options ControllerOptions) *Controller {
    log.Infof("Service controller watching namespace %q for services, endpoints, nodes and pods, refresh %s",
        options.WatchedNamespace, options.ResyncPeriod)

    // Queue requires a time duration for a retry delay after a handler error
    out := &Controller{
        domainSuffix:               options.DomainSuffix,
        client:                     client,
        queue:                      NewQueue(1 * time.Second),
        ClusterID:                  options.ClusterID,
        XDSUpdater:                 options.XDSUpdater,
        servicesMap:                make(map[model.Hostname]*model.Service),
        externalNameSvcInstanceMap: make(map[model.Hostname][]*model.ServiceInstance),
    }

    sharedInformers := informers.NewSharedInformerFactoryWithOptions(client, options.ResyncPeriod, informers.WithNamespace(options.WatchedNamespace))

    svcInformer := sharedInformers.Core().V1().Services().Informer()
    out.services = out.createCacheHandler(svcInformer, "Services")

    // Endpoints use a dedicated EDS cache handler rather than the generic one.
    epInformer := sharedInformers.Core().V1().Endpoints().Informer()
    out.endpoints = out.createEDSCacheHandler(epInformer, "Endpoints")

    nodeInformer := sharedInformers.Core().V1().Nodes().Informer()
    out.nodes = out.createCacheHandler(nodeInformer, "Nodes")

    // The pod handler is wrapped in a PodCache instead of being stored directly.
    podInformer := sharedInformers.Core().V1().Pods().Informer()
    out.pods = newPodCache(out.createCacheHandler(podInformer, "Pod"), out)

    return out
}

从上面代码可以很清晰看到,ServiceController 监听特定 namespace 下的以下资源:

  1. Services
  2. Endpoints
  3. Nodes
  4. Pod

ServiceDiscovery

istio.io/istio/pilot/pkg/bootstrap/server.go

// initDiscoveryService wires up Pilot's discovery endpoints.
//
// It assembles the model.Environment shared by the discovery components,
// creates the discovery service and reuses its ServeMux as s.mux, creates the
// xDS server (s.EnvoyXdsServer) and attaches its debug handlers to the same
// mux, prepares the gRPC and HTTP servers, opens the HTTP and gRPC listeners,
// and registers a start function that launches the servers.
//
// It returns the error from envoy.NewDiscoveryService or from either
// net.Listen call. No start function is registered when an error is returned.
func (s *Server) initDiscoveryService(args *PilotArgs) error {
    // Note that the environment carries the values used later on, including
    // s.istioConfigStore and s.ServiceController.
    environment := &model.Environment{
        Mesh:             s.mesh,
        MeshNetworks:     s.meshNetworks,
        IstioConfigStore: s.istioConfigStore, // istio routing rules
        ServiceDiscovery: s.ServiceController, // service list
        ServiceAccounts:  s.ServiceController,
        MixerSAN:         s.mixerSAN,
    }

    // Set up discovery service
    discovery, err := envoy.NewDiscoveryService(
        environment,
        args.DiscoveryOptions,
    )
    if err != nil {
        return err
    }

    s.mux = discovery.RestContainer.ServeMux

    // Create the gRPC-facing xDS server via envoyv2.NewDiscoveryServer.
    s.EnvoyXdsServer = envoyv2.NewDiscoveryServer(
        environment,
        istio_networking.NewConfigGenerator(args.Plugins),
        s.ServiceController,
        s.configController)

    s.EnvoyXdsServer.InitDebug(s.mux, s.ServiceController)

    // ...

    // create grpc/http server
    s.initGrpcServer(args.KeepaliveOptions)
    s.httpServer = &http.Server{
        Addr:    args.DiscoveryOptions.HTTPAddr,
        Handler: s.mux,
    }

    // create http listener
    listener, err := net.Listen("tcp", args.DiscoveryOptions.HTTPAddr)
    if err != nil {
        // listener is nil here; calling Addr on it would panic.
        return err
    }
    s.HTTPListeningAddr = listener.Addr()

    // create grpc listener
    grpcListener, err := net.Listen("tcp", args.DiscoveryOptions.GrpcAddr)
    if err != nil {
        // Release the already opened HTTP listener; nothing will serve on it.
        _ = listener.Close()
        return err
    }
    s.GRPCListeningAddr = grpcListener.Addr()

    s.addStartFunc(func(stop <-chan struct{}) error {
        // start the http server goroutine
        // start the gRPC server goroutine
        // start the goroutine that waits for shutdown

        return nil
    })

    // run secure grpc server

    return nil
}

istio.io/istio/pilot/pkg/proxy/envoy/v2/discovery.go, envoyv2.NewDiscoveryServer 函数定义:

// Call site in initDiscoveryService:
//  s.EnvoyXdsServer = envoyv2.NewDiscoveryServer(
//        environment,
//        istio_networking.NewConfigGenerator(args.Plugins),
//        s.ServiceController, 
//        s.configController)
// NewDiscoveryServer creates DiscoveryServer that sources data from Pilot's internal mesh data structures
//
// Besides building the server it installs a fresh PushContext on env, starts
// three background goroutines (handleUpdates, periodicRefresh and
// periodicRefreshMetrics) and configures two rate limiters from the
// pilot.PushThrottle / pilot.PushBurst settings.
func NewDiscoveryServer(env *model.Environment, generator core.ConfigGenerator, ctl model.Controller, configCache model.ConfigStoreCache) *DiscoveryServer {
    // Create the DiscoveryServer object.
    out := &DiscoveryServer{
        Env:                     env,
        ConfigGenerator:         generator, // generates xDS responses
        ConfigController:        configCache,
        EndpointShardsByService: map[string]*EndpointShards{},
        WorkloadsByID:           map[string]*Workload{},
        edsUpdates:              map[string]*EndpointShards{},
        concurrentPushLimit:     make(chan struct{}, 20), 
        updateChannel:           make(chan *updateReq, 10),
    }
    env.PushContext = model.NewPushContext()

    // Handles the update operations.
    // handleUpdates processes events from updateChannel. It ensures that at
    // least minQuiet time has elapsed since the last event before processing.
    // It also ensures that at most maxDelay elapses between receiving an event
    // and processing it.
    // Finally it calls doPush, which, depending on the full-push flag, sends
    // the recently updated EDS information either as an XDS incremental push
    // or as a full push.
    go out.handleUpdates()

    // (Elided here.) In the following three cases the DiscoveryServer's local
    // cache is cleared, and the cache-clearing functions are registered:
    // 1. service-related information changes,
    // 2. the JWT public key changes,
    // 3. an Istio CRD changes.
    // When any of the three happens, the event is passed on to out.handleUpdates().

    // Periodic refresh.
    go out.periodicRefresh()

    // Periodically refresh the metrics.
    go out.periodicRefreshMetrics()

    out.DebugConfigs = pilot.DebugConfigs

    pushThrottle := intEnv(pilot.PushThrottle, 10)
    pushBurst := intEnv(pilot.PushBurst, 100)

    adsLog.Infof("Starting ADS server with rateLimiter=%d burst=%d", pushThrottle, pushBurst)
    out.rateLimiter = rate.NewLimiter(rate.Limit(pushThrottle), pushBurst)
    // The init limiter is twice as permissive as the regular one.
    out.initRateLimiter = rate.NewLimiter(rate.Limit(pushThrottle*2), pushBurst*2)

    return out
}

ADS 相关定义:

https://github.com/envoyproxy/data-plane-api/blob/master/envoy/service/discovery/v2/ads.proto

// See https://github.com/lyft/envoy-api#apis for a description of the role of
// ADS and how it is intended to be used by a management server. ADS requests
// have the same structure as their singleton xDS counterparts, but can
// multiplex many resource types on a single stream. The type_url in the
// DiscoveryRequest/DiscoveryResponse provides sufficient information to recover
// the multiplexed singleton APIs at the Envoy instance and management server.
service AggregatedDiscoveryService {
  // This is a gRPC-only API.
  rpc StreamAggregatedResources(stream envoy.api.v2.DiscoveryRequest)
      returns (stream envoy.api.v2.DiscoveryResponse) {
  }

  rpc IncrementalAggregatedResources(stream envoy.api.v2.IncrementalDiscoveryRequest)
      returns (stream envoy.api.v2.IncrementalDiscoveryResponse) {
  }
}

https://github.com/envoyproxy/data-plane-api/blob/master/envoy/api/v2/discovery.proto

// A DiscoveryRequest requests a set of versioned resources of the same type for
// a given Envoy node on some API.
message DiscoveryRequest {
  // The version_info provided in the request messages will be the version_info
  // received with the most recent successfully processed response or empty on
  // the first request. It is expected that no new request is sent after a
  // response is received until the Envoy instance is ready to ACK/NACK the new
  // configuration. ACK/NACK takes place by returning the new API config version
  // as applied or the previous API config version respectively. Each type_url
  // (see below) has an independent version associated with it.
  string version_info = 1;

  // The node making the request.
  core.Node node = 2;

  // List of resources to subscribe to, e.g. list of cluster names or a route
  // configuration name. If this is empty, all resources for the API are
  // returned. LDS/CDS expect empty resource_names, since this is global
  // discovery for the Envoy instance. The LDS and CDS responses will then imply
  // a number of resources that need to be fetched via EDS/RDS, which will be
  // explicitly enumerated in resource_names.
  repeated string resource_names = 3;

  // Type of the resource that is being requested, e.g.
  // "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment". This is implicit
  // in requests made via singleton xDS APIs such as CDS, LDS, etc. but is
  // required for ADS.
  string type_url = 4;

  // nonce corresponding to DiscoveryResponse being ACK/NACKed. See above
  // discussion on version_info and the DiscoveryResponse nonce comment. This
  // may be empty if no nonce is available, e.g. at startup or for non-stream
  // xDS implementations.
  string response_nonce = 5;

  // This is populated when the previous :ref:`DiscoveryResponse <envoy_api_msg_DiscoveryResponse>`
  // failed to update configuration. The *message* field in *error_details* provides the Envoy
  // internal exception related to the failure. It is only intended for consumption during manual
  // debugging, the string provided is not guaranteed to be stable across Envoy versions.
  google.rpc.Status error_detail = 6;
}

message DiscoveryResponse {
  // The version of the response data.
  string version_info = 1;

  // The response resources. These resources are typed and depend on the API being called.
  repeated google.protobuf.Any resources = 2 [(gogoproto.nullable) = false];

  // [#not-implemented-hide:]
  // Canary is used to support two Envoy command line flags:
  //
  // * --terminate-on-canary-transition-failure. When set, Envoy is able to
  //   terminate if it detects that configuration is stuck at canary. Consider
  //   this example sequence of updates:
  //   - Management server applies a canary config successfully.
  //   - Management server rolls back to a production config.
  //   - Envoy rejects the new production config.
  //   Since there is no sensible way to continue receiving configuration
  //   updates, Envoy will then terminate and apply production config from a
  //   clean slate.
  // * --dry-run-canary. When set, a canary response will never be applied, only
  //   validated via a dry run.
  bool canary = 3;

  // Type URL for resources. This must be consistent with the type_url in the
  // Any messages for resources if resources is non-empty. This effectively
  // identifies the xDS API when muxing over ADS.
  string type_url = 4;

  // For gRPC based subscriptions, the nonce provides a way to explicitly ack a
  // specific DiscoveryResponse in a following DiscoveryRequest. Additional
  // messages may have been sent by Envoy to the management server for the
  // previous version on the stream prior to this DiscoveryResponse, that were
  // unprocessed at response send time. The nonce allows the management server
  // to ignore any further DiscoveryRequests for the previous version until a
  // DiscoveryRequest bearing the nonce. The nonce is optional and is not
  // required for non-stream based xDS implementations.
  string nonce = 5;

  // [#not-implemented-hide:]
  // The control plane instance that sent the response.
  core.ControlPlane control_plane = 6;
}

关于 ADS 保证一致性的内容参见: envoy-的-xds-rest-和-grpc-协议详解 中的 “最终一致性考虑” 章节:

一般来说,为避免流量丢弃,更新的顺序应该遵循 make before break 模型,其中

* 必须始终先推送 CDS 更新(如果有)。

* EDS 更新(如果有)必须在相应集群的 CDS 更新后到达。

* LDS 更新必须在相应的 CDS/EDS 更新后到达。

* 与新添加的监听器相关的 RDS 更新必须在最后到达。

* 最后,删除过期的 CDS 集群和相关的 EDS 端点(不再被引用的端点)。

ADS 允许单一管理服务器通过单个 gRPC 流,提供所有的 API 更新。配合仔细规划的更新顺序,ADS 可规避更新过程中流量丢失。

pilot/pkg/proxy/envoy/v2/ads.go

// StreamAggregatedResources implements the ADS interface.
//
// After initializing the global push context and creating the connection
// object, it starts a receive goroutine and then loops forever, handling
// either a DiscoveryRequest read from the client (dispatched on its TypeUrl to
// pushCds, pushLds, pushRoute or pushEds) or a push event delivered on
// con.pushChannel. Much of the original function is elided in this excerpt.
func (s *DiscoveryServer) StreamAggregatedResources(stream ads.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
    // ...

    // InitContext returns immediately if the context was already initialized.
    // InitContext pulls the data that has to be sent to clients out of env;
    // it is analyzed further below.
    // 1. initServiceRegistry
    // 2. initVirtualServices
    // 3. initDestinationRules
    // 4. initAuthorizationPolicies
    // 5. InitSidecarScopes
    err := s.globalPushContext().InitContext(s.Env)
    con := newXdsConnection(peerAddr, stream)

    reqChannel := make(chan *xdsapi.DiscoveryRequest, 1)
    // Start a new goroutine that receives xdsapi.DiscoveryRequest messages
    // from the client.
    go receiveThread(con, reqChannel, &receiveError)

    for {
        // Block until either a request is received or a push is triggered.
        select {
        case discReq, ok := <-reqChannel:
              err = s.initConnectionNode(discReq, con)

            switch discReq.TypeUrl {
            case ClusterType:
                // ...
                // CDS REQ is the first request an envoy makes. This shows up
                // immediately after connect. It is followed by EDS REQ as
                // soon as the CDS push is returned.
                adsLog.Infof("ADS:CDS: REQ %v %s %v raw: %s", peerAddr, con.ConID, time.Since(t0), discReq.String())
                con.CDSWatch = true
                err := s.pushCds(con, s.globalPushContext(), versionInfo())

            case ListenerType:
                // ...
                adsLog.Debugf("ADS:LDS: REQ %s %v", con.ConID, peerAddr)
                con.LDSWatch = true
                err := s.pushLds(con, s.globalPushContext(), true, versionInfo())

            case RouteType:
                // ...
                adsLog.Debugf("ADS:RDS: REQ %s %s  routes: %d", peerAddr, con.ConID, len(con.Routes))
                err := s.pushRoute(con, s.globalPushContext())

            case EndpointType:
                // ...
                // various error handling

                // Replace the connection's EDS subscriptions: drop the
                // previously watched clusters, then register the new ones.
                for _, cn := range con.Clusters {
                    s.removeEdsCon(cn, con.ConID, con)
                }

                for _, cn := range clusters {
                    s.addEdsCon(cn, con.ConID, con)
                }

                con.Clusters = clusters
                adsLog.Debugf("ADS:EDS: REQ %s %s clusters: %d", peerAddr, con.ConID, len(con.Clusters))
                err := s.pushEds(s.globalPushContext(), con, true, nil)
                if err != nil {
                    return err
                }

            default:
                adsLog.Warnf("ADS: Unknown watched resources %s", discReq.String())
            }
            // ...
        case pushEv := <-con.pushChannel:
            // ...

            err := s.pushConnection(con, pushEv)
            if err != nil {
                // NOTE(review): returns nil rather than err, so the stream ends
                // without reporting the push failure; confirm this is intended.
                return nil
            }

        }
    }
}

istio.io/istio/pilot/pkg/model/push_context.go

// PushContext tracks the status of a push - metrics and errors.
// Metrics are reset after a push - at the beginning all
// values are zero, and when push completes the status is reset.
// The struct is exposed in a debug endpoint - fields public to allow
// easy serialization as json.
type PushContext struct {

    // privateServices are reachable within the same namespace.
    privateServicesByNamespace map[string][]*Service
    // publicServices are services reachable within the mesh.
    publicServices []*Service

    privateVirtualServicesByNamespace map[string][]Config
    publicVirtualServices             []Config

    // destination rules are of three types:
    // namespaceLocalDestRules: all public/private dest rules pertaining to a service defined in a given namespace
    //  namespaceExportedDestRules: all public dest rules pertaining to a service defined in a namespace
    //  allExportedDestRules: all (public) dest rules across all namespaces
    // We need the allExportedDestRules in addition to namespaceExportedDestRules because we select
    // the dest rule based on the most specific host match, and not just any destination rule
    namespaceLocalDestRules    map[string]*processedDestRules
    namespaceExportedDestRules map[string]*processedDestRules
    allExportedDestRules       *processedDestRules

    // sidecars for each namespace
    sidecarsByNamespace map[string][]*SidecarScope
    ////////// END ////////

    // The following data is either a global index or used in the inbound path.
    // Namespace specific views do not apply here.

    // ServiceByHostname has all services, indexed by hostname.
    ServiceByHostname map[Hostname]*Service `json:"-"`

    // AuthzPolicies stores the existing authorization policies in the cluster. Could be nil if there
    // are no authorization policies in the cluster.
    AuthzPolicies *AuthorizationPolicies `json:"-"`

    // ServicePort2Name is used to keep track of service name and port mapping.
    // This is needed because ADS names use port numbers, while endpoints use
    // port names. The key is the service name. If a service or port are not found,
    // the endpoint needs to be re-evaluated later (eventual consistency)
    ServicePort2Name map[string]PortList `json:"-"`

    initDone bool
}


// InitContext will initialize the data structures used for code generation.
// This should be called before starting the push, from the thread creating
// the push context.
//
// The work is done under ps.Mutex and only once: after the first successful
// call initDone is set and later calls return nil immediately.
func (ps *PushContext) InitContext(env *Environment) error {
    ps.Mutex.Lock()
    defer ps.Mutex.Unlock()
    if ps.initDone {
        return nil
    }
    ps.Env = env
    var err error

    // Caches list of services in the registry, and creates a map
    // of hostname to service -> ServicePort2Name map[string]PortList `json:"-"`
    ps.initServiceRegistry(env)

    // Caches list of virtual services -> publicVirtualServices []Config
    ps.initVirtualServices(env)

    // Split out of DestinationRule expensive conversions - once per push.
    // The results end up in the following three fields:
    //  * namespaceLocalDestRules    map[string]*processedDestRules
    //  * namespaceExportedDestRules map[string]*processedDestRules
    //  * allExportedDestRules       *processedDestRules
    ps.initDestinationRules(env)

    // Get the ClusterRbacConfig -> AuthzPolicies *AuthorizationPolicies
    ps.initAuthorizationPolicies(env)

    // Must be initialized in the end -> sidecarsByNamespace map[string][]*SidecarScope
    ps.InitSidecarScopes(env)

    ps.initDone = true
    return nil
}

CDS 为 ADS 中第一个发送的信息,后续我们以 CDS 为例进行详细分析

// StreamAggregatedResources implements the ADS interface.
//
// This excerpt repeats the start of the function and stops inside the CDS
// (ClusterType) branch, which the following sections analyze; the rest of the
// function is cut off.
func (s *DiscoveryServer) StreamAggregatedResources(stream ads.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
    // ...

    // InitContext returns immediately if the context was already initialized.
    // InitContext pulls the data that has to be sent to clients out of env;
    // it is analyzed further below.
    // 1. initServiceRegistry
    // 2. initVirtualServices
    // 3. initDestinationRules
    // 4. initAuthorizationPolicies
    // 5. InitSidecarScopes
    err := s.globalPushContext().InitContext(s.Env)
    con := newXdsConnection(peerAddr, stream)

    reqChannel := make(chan *xdsapi.DiscoveryRequest, 1)
    // Start a new goroutine that receives xdsapi.DiscoveryRequest messages
    // from the client.
    go receiveThread(con, reqChannel, &receiveError)

    for {
        // Block until either a request is received or a push is triggered.
        select {
        case discReq, ok := <-reqChannel:
            // Mainly calls `ParseServiceNodeWithMetadata`, which extracts the
            // various pieces of information from the request and produces a
            // `Proxy` object.
            // The current discReq.Node.Id format is Type~IPAddress~ID~Domain
            err = s.initConnectionNode(discReq, con)

            switch discReq.TypeUrl {
            case ClusterType:
                // Handling of the response message that arrives after CDS
                // data has already been sent.
                if con.CDSWatch {
                    // ...
                }
                // CDS REQ is the first request an envoy makes. This shows up
                // immediately after connect. It is followed by EDS REQ as
                // soon as the CDS push is returned.
                adsLog.Infof("ADS:CDS: REQ %v %s %v raw: %s", peerAddr, con.ConID, time.Since(t0), discReq.String())
                con.CDSWatch = true
                err := s.pushCds(con, s.globalPushContext(), versionInfo())
                if err != nil {
                    return err
                }

                //...
    }

core.Node

Identifies a specific Envoy instance. The node identifier is presented to the management server, which may use this identifier to distinguish per Envoy configuration for serving.

{

"id": "...",

"cluster": "...",

"metadata": "{...}",

"locality": "{...}",

"build_version": "..."

}

id

(string) An opaque node identifier for the Envoy node. This also provides the local service node name. It should be set if any of the following features are used: statsd, CDS, and HTTP tracing, either in this message or via --service-node.

cluster

(string) Defines the local service cluster name where Envoy is running. Though optional, it should be set if any of the following features are used: statsd, health check cluster verification, runtime override directory, user agent addition, HTTP global rate limiting, CDS, and HTTP tracing, either in this message or via --service-cluster.

metadata

(Struct) Opaque metadata extending the node identifier. Envoy will pass this directly to the management server.

locality

(core.Locality) Locality specifying where the Envoy instance is running.

build_version

(string) This is motivated by informing a management server during canary which version of Envoy is being tested in a heterogeneous fleet. This will be set by Envoy in management server RPCs.

initConnectionNode 函数中,主要是调用 ParseServiceNodeWithMetadata 函数,从 Req 的消息中获取到各种信息,并生成 Proxy 对象;

istio.io/istio/pilot/pkg/model/context.go

// ParseServiceNodeWithMetadata builds a Proxy from the service node string s
// and the node metadata map.
// NOTE(review): the function body is omitted in this excerpt; see
// istio.io/istio/pilot/pkg/model/context.go for the implementation.
func ParseServiceNodeWithMetadata(s string, metadata map[string]string) (*Proxy, error) {
}
// Proxy contains information about a specific instance of a proxy (Envoy sidecar,
// gateway, etc.). The Proxy is initialized when a sidecar connects to Pilot, and is
// populated from the 'node' info in the protocol as well as data extracted from
// registries.
//
// In the current Istio implementation nodes use a 4-part '~'-delimited ID:
// Type~IPAddress~ID~Domain
type Proxy struct {
    // ClusterID specifies the cluster where the proxy resides.
    // TODO: clarify if this is needed in the new 'network' model; it likely needs
    // to be renamed to 'network'.
    ClusterID string

    // Type specifies the node type. It is the first part of the ID.
    Type NodeType

    // IPAddresses holds the IP addresses of the proxy, used to identify it and its
    // co-located service instances. Example: "10.60.1.6". In some cases, the host
    // where the proxy and service instances reside may have more than one IP address.
    IPAddresses []string

    // ID is the unique platform-specific sidecar proxy ID. For k8s it is the pod ID
    // and namespace.
    ID string

    // Locality is the location where the Envoy proxy runs.
    Locality Locality

    // DNSDomain defines the DNS domain suffix for short hostnames (e.g.
    // "default.svc.cluster.local").
    DNSDomain string

    // TrustDomain defines the trust domain of the certificate.
    TrustDomain string

    // ConfigNamespace defines the namespace where this proxy resides
    // for the purposes of network scoping.
    // NOTE: DO NOT USE THIS FIELD TO CONSTRUCT DNS NAMES.
    ConfigNamespace string

    // Metadata holds key-value pairs extending the Node identifier.
    Metadata map[string]string

    // SidecarScope is the sidecar scope associated with the proxy.
    SidecarScope *SidecarScope
}

推送 CDS 的核心实现通过函数 s.pushCds(con, s.globalPushContext(), versionInfo()) 完成:

istio.io/istio/pilot/pkg/proxy/envoy/v2/cds.go

// pushCds generates the CDS (cluster discovery) response for one connection
// and sends it to the connected proxy.
//
// con is the connection to push to; con.modelNode identifies the proxy the
// clusters are generated for. push is the push context the clusters are built
// from. version is part of the signature but is not used in this excerpt.
//
// It returns the error from cluster generation or from sending the response,
// so that the caller can react to a failed push; nil means the response was
// handed to con.send successfully.
func (s *DiscoveryServer) pushCds(con *XdsConnection, push *model.PushContext, version string) error {
    // Build the raw clusters ([]*xdsapi.Cluster) for this proxy from the push context.
    rawClusters, err := s.generateRawClusters(con.modelNode, push)
    if err != nil {
        // Never send a response assembled from a failed build.
        return err
    }

    // Wrap the clusters into a discovery response and deliver it. A send
    // failure must surface to the caller instead of being reported as success.
    response := con.clusters(rawClusters)
    if err := con.send(response); err != nil {
        return err
    }
    return nil
}

因此 s.generateRawClusters(con.modelNode, push) 的作用不言而喻,就是将 push 上下文中与 CDS 相关的数据整理并封装成 CDS Response 的格式。

// generateRawClusters asks the config generator to build the clusters for
// node, using the server environment (s.Env) and the given push context.
//
// It returns the generated clusters, or a nil slice and the error reported by
// BuildClusters when generation fails.
func (s *DiscoveryServer) generateRawClusters(node *model.Proxy, push *model.PushContext) ([]*xdsapi.Cluster, error) {
    rawClusters, err := s.ConfigGenerator.BuildClusters(s.Env, node, push)
    if err != nil {
        // Do not hand partially built clusters back together with a nil error.
        return nil, err
    }

    // Validation of the entries in rawClusters happens here (omitted in this excerpt).
    return rawClusters, nil
}

对象 rawClusters 是通过 s.ConfigGenerator.BuildClusters 函数基于 s.Env, node, push 三者的信息组合而生成出来的:

  • s.Env 保存了 ServiceController 和 ConfigController 等资源的本地缓存信息
  • node 为本次 Req 请求中生成的包含 Node 相关信息的对象

  • push 为本次推送的上下文,已经将本次推送过程中需要的信息完成了初步的整理(从 s.Env 中生成出来的)

istio.io/istio/pilot/pkg/networking/core/v1alpha3/cluster.go

BuildClusters 函数的实现如下:

// BuildClusters returns the list of clusters for the given proxy. This is the CDS output.
// For outbound: a cluster for each service/subset hostname or CIDR, with SNI set to the
// service hostname; the cluster type is based on the resolution.
// For inbound (sidecar only): a cluster for each inbound endpoint port and for each
// service port.
//
// env provides the service instances and management ports, proxy is the node the
// clusters are built for, and push is the context of the current push.
//
// It returns the normalized cluster list, or a nil slice and an error when the
// service instances co-located with the proxy cannot be retrieved.
func (configgen *ConfigGeneratorImpl) BuildClusters(env *model.Environment, proxy *model.Proxy, push *model.PushContext) ([]*apiv2.Cluster, error) {
    clusters := make([]*apiv2.Cluster, 0)

    switch proxy.Type {
    case model.SidecarProxy:
        // GetProxyServiceInstances returns the service instances co-located with the
        // proxy, i.e. those on the proxy's host, including headless services.
        instances, err := env.GetProxyServiceInstances(proxy)
        if err != nil {
            // Without the instances the inbound clusters cannot be built correctly.
            return nil, err
        }

        // NOTE(review): this excerpt always recomputes the outbound clusters; the flag
        // presumably exists so a precomputed result can be reused instead. Confirm
        // against the upstream implementation.
        recomputeOutboundClusters := true
        // Append the outbound clusters.
        if recomputeOutboundClusters {
            clusters = append(clusters, configgen.buildOutboundClusters(env, proxy, push)...)
        }

        // Let ServiceDiscovery decide which IP and Port are used for management if
        // there are multiple IPs.
        managementPorts := make([]*model.Port, 0)
        for _, ip := range proxy.IPAddresses {
            managementPorts = append(managementPorts, env.ManagementPorts(ip)...)
        }

        // Append the inbound clusters, including those for the management ports
        // found on the proxy's IPs.
        clusters = append(clusters, configgen.buildInboundClusters(env, proxy, push, instances, managementPorts)...)

    default: // Gateways
        // ...
    }

    // Add a blackhole and passthrough cluster for catching traffic to unresolved routes.
    // DO NOT CALL PLUGINS for these two clusters.
    clusters = append(clusters, buildBlackHoleCluster())
    clusters = append(clusters, buildDefaultPassthroughCluster())

    // Resolve cluster name conflicts.
    return normalizeClusters(push, proxy, clusters), nil
}

临时的调试方法:

istio.io/istio/pilot/pkg/proxy/envoy/v2/debug.go

// InitDebug registers the readiness and debug HTTP handlers on mux and adds an
// in-memory debug registry to the aggregate service controller.
func (s *DiscoveryServer) InitDebug(mux *http.ServeMux, sctl *aggregate.Controller) {
    // Cluster ID shared by the in-memory registry and its aggregate entry.
    const debugClusterID = "v2-debug"

    // The in-memory registry is used for debugging and load testing v2. It starts
    // with no services (e.g. mock.HelloService.Hostname: mock.HelloService is not seeded).
    initialServices := map[model.Hostname]*model.Service{}
    s.MemRegistry = NewMemServiceDiscovery(initialServices, 2)
    s.MemRegistry.EDSUpdater = s
    s.MemRegistry.ClusterID = debugClusterID

    memAdapter := aggregate.Registry{
        ClusterID:        debugClusterID,
        Name:             serviceregistry.ServiceRegistry("memAdapter"),
        ServiceDiscovery: s.MemRegistry,
        ServiceAccounts:  s.MemRegistry,
        Controller:       s.MemRegistry.controller,
    }
    sctl.AddRegistry(memAdapter)

    // Route table: each pattern is bound to its handler, in the order listed.
    routes := []struct {
        pattern string
        handler func(http.ResponseWriter, *http.Request)
    }{
        {"/ready", s.ready},

        {"/debug/edsz", s.edsz},
        {"/debug/adsz", s.adsz},
        {"/debug/cdsz", cdsz},
        {"/debug/syncz", Syncz},

        {"/debug/registryz", s.registryz},
        {"/debug/endpointz", s.endpointz},
        {"/debug/endpointShardz", s.endpointShardz},
        {"/debug/workloadz", s.workloadz},
        {"/debug/configz", s.configz},

        {"/debug/authenticationz", s.authenticationz},
        {"/debug/config_dump", s.ConfigDump},
        {"/debug/push_status", s.PushStatusHandler},
    }
    for _, route := range routes {
        mux.HandleFunc(route.pattern, route.handler)
    }
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

ANSI Common Lisp

ANSI Common Lisp

Paul Graham / Prentice Hall / 1995-11-12 / USD 116.40

For use as a core text supplement in any course covering common LISP such as Artificial Intelligence or Concepts of Programming Languages. Teaching students new and more powerful ways of thinking abo......一起来看看 《ANSI Common Lisp》 这本书的介绍吧!

SHA 加密
SHA 加密

SHA 加密工具

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具