istio source code – pilot source code analysis (original)




Architecture


Introduction

For the complete yaml file, see pilot-yaml.

Dockerfile

FROM istionightly/base_debug

ADD pilot-discovery /usr/local/bin/
ADD cacert.pem /cacert.pem
ENTRYPOINT ["/usr/local/bin/pilot-discovery"]

Startup command

# ps -ef -www|grep pilot
/usr/local/bin/pilot-discovery discovery

Command-line help

# kubectl exec -ti istio-pilot-f9d78b7b9-fmhfb -n istio-system -c discovery -- /usr/local/bin/pilot-discovery --help
Usage:
  pilot-discovery [command]

Available Commands:
  discovery   Start Istio proxy discovery service
  help        Help about any command
  request     Makes an HTTP request to Pilot metrics/debug endpoint
  version     Prints out build version information

Help for the discovery subcommand

# kubectl exec -ti istio-pilot-f9d78b7b9-fmhfb -n istio-system -c discovery -- /usr/local/bin/pilot-discovery discovery --help
Defaulting container name to discovery.

Start Istio proxy discovery service

Usage:
  pilot-discovery discovery [flags]

Flags:
  -a, --appNamespace string                 Restrict the applications namespace the controller manages; if not set, controller watches all namespaces
      --cfConfig string                     Cloud Foundry config file
      --clusterRegistriesConfigMap string   ConfigMap map for clusters config store
      --clusterRegistriesNamespace string   Namespace for ConfigMap which stores clusters configs
      --configDir string                    Directory to watch for updates to config yaml files. If specified, the files will be used as the source of config, rather than a CRD client.
      --consulserverInterval duration       Interval (in seconds) for polling the Consul service registry (default 2s)
      --consulserverURL string              URL for the Consul server
      --disable-install-crds                Disable discovery service from verifying the existence of CRDs at startup and then installing if not detected.  It is recommended to be disable for highly available setups.
      --discovery_cache                     Enable caching discovery service responses (default true)
      --domain string                       DNS domain suffix (default "cluster.local")
      --grpcAddr string                     Discovery service grpc address (default ":15010")
  -h, --help                                help for discovery
      --httpAddr string                     Discovery service HTTP address (default ":8080")
      --kubeconfig string                   Use a Kubernetes configuration file instead of in-cluster configuration
      --meshConfig string                   File name for Istio mesh configuration. If not specified, a default mesh will be used. (default "/etc/istio/config/mesh")
      --monitoringAddr string               HTTP address to use for the exposing pilot self-monitoring information (default ":9093")
  -n, --namespace string                    Select a namespace where the controller resides. If not set, uses ${POD_NAMESPACE} environment variable
      --plugins stringSlice                 comma separated list of networking plugins to enable (default [authn,authz,health,mixer,envoyfilter])
      --profile                             Enable profiling via web interface host:port/debug/pprof (default true)
      --registries stringSlice              Comma separated list of platform service registries to read from (choose one or more from {Kubernetes, Consul, CloudFoundry, Mock, Config}) (default [Kubernetes])
      --resync duration                     Controller resync interval (default 1m0s)
      --secureGrpcAddr string               Discovery service grpc address, with https (default ":15012")
      --webhookEndpoint string              Webhook API endpoint (supports http://sockethost, and unix:///absolute/path/to/socket

Global Flags:
   (omitted)

Main parameters:

Flag                Default                    Notes
--appNamespace                                 corresponds to oneNamespace in the helm chart
--configDir                                    reflects pilot's two config sources: config files and CRDs
--discovery_cache   true                       enables response caching, which helps performance
--domain            "cluster.local"            the DNS domain suffix in k8s
--grpcAddr          :15010
--httpAddr          :8080
--meshConfig        "/etc/istio/config/mesh"
--registries        Kubernetes
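
As a concrete example, with these defaults the following invocation (all flags spelled out, taken from the help output above) is equivalent to running the binary with no flags at all:

/usr/local/bin/pilot-discovery discovery \
    --httpAddr :8080 \
    --grpcAddr :15010 \
    --meshConfig /etc/istio/config/mesh \
    --domain cluster.local \
    --registries Kubernetes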

Code analysis

Entry point

discoveryCmd = &cobra.Command{
        Use:   "discovery",
        Short: "Start Istio proxy discovery service.",
        Args:  cobra.ExactArgs(0),
        RunE: func(c *cobra.Command, args []string) error {
            // ...

            // Create the stop channel for all of the servers.
            stop := make(chan struct{})

            // Create the server for the discovery service.
            discoveryServer, err := bootstrap.NewServer(serverArgs)
            if err != nil {
                return fmt.Errorf("failed to create discovery service: %v", err)
            }

            // Start the server
            if err := discoveryServer.Start(stop); err != nil {
                return fmt.Errorf("failed to start discovery service: %v", err)
            }

            cmd.WaitSignal(stop)
            return nil
        },
    }

istio.io/istio/pilot/pkg/bootstrap/server.go

For the default mesh configuration, see: https://gist.github.com/DavadDi/f110459d339e260f818250287fc78ccc#file-mesh

// NewServer creates a new Server instance based on the provided arguments.
func NewServer(args PilotArgs) (*Server, error) {
    // ...
    s := &Server{
        filewatcher: filewatcher.NewWatcher(),
    }

    // error handling omitted
    s.initKubeClient(&args) // initialize the client to the k8s cluster, s.kubeClient
    s.initMesh(&args) // load the mesh config and add it to the filewatcher, /etc/istio/config/mesh

    // not present in the 1.0.5 cmd; presumably added in 1.1
    // serverArgs.NetworksConfigFile, "networksConfig", "/etc/istio/config/meshNetworks"
    // load the config and add it to the filewatcher
    s.initMeshNetworks(&args)

    // initMixerSan configures the mixerSAN configuration item. 
    // The mesh must already have been configured.
    s.initMixerSan(&args)

    // creates the config controller in the pilotConfig.
    // ultimately creates a crd.NewController instance
    s.initConfigController(&args)
    /* taking kube as an example, the main flow inside is:
        controller, err := s.makeKubeConfigController(args)
        s.configController = controller

        s.addStartFunc(func(stop <-chan struct{}) error {
            go s.configController.Run(stop)  // 1. configController starts first
        })
    */

    // creates and initializes the service controllers
    s.initServiceControllers(&args)
    /*
       s.createK8sServiceControllers(serviceControllers, args)
        --> kube.NewController(s.kubeClient, args.Config.ControllerOptions)

        s.addStartFunc(func(stop <-chan struct{}) error {
        go s.ServiceController.Run(stop)  // 2. ServiceController starts second
        return nil
    })
    */

    // initialize the DiscoveryService gRPC server, covered in detail below
    // adds 2 funcs to startFuncs
    s.initDiscoveryService(&args)

    // initializes the configuration for the pilot monitoring server.
    // adds 1 func to startFuncs
    s.initMonitor(&args)

    // starts the secret controller to watch for remote
    // clusters and initialize the multicluster structures.
    // not analyzed here
    s.initClusterRegistries(&args)

    return s, nil
}

// start, one by one, the functions registered during NewServer
func (s *Server) Start(stop <-chan struct{}) error {
    // Now start all of the components.
    for _, fn := range s.startFuncs {
        fn(stop)
    }
    return nil
}
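
The addStartFunc/Start pairing is a simple deferred-startup pattern: components register themselves during NewServer, and nothing runs until Start fans them out over a shared stop channel. A minimal self-contained sketch of the pattern (the server type here is a stand-in, not Pilot's):

package main

import "fmt"

type startFunc func(stop <-chan struct{}) error

type server struct {
    startFuncs []startFunc
}

// addStartFunc queues a component's start routine; nothing runs yet.
func (s *server) addStartFunc(fn startFunc) {
    s.startFuncs = append(s.startFuncs, fn)
}

// Start launches every queued component against a shared stop channel,
// mirroring Server.Start above.
func (s *server) Start(stop <-chan struct{}) error {
    for _, fn := range s.startFuncs {
        if err := fn(stop); err != nil {
            return err
        }
    }
    return nil
}

func main() {
    s := &server{}
    s.addStartFunc(func(stop <-chan struct{}) error {
        fmt.Println("component started")
        return nil
    })
    stop := make(chan struct{})
    _ = s.Start(stop)
    close(stop)
}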

istio.io/api/mesh/v1alpha1/network.pb.go

The MeshNetworks struct and its accompanying documentation are as follows:

// MeshNetworks (config map) provides information about the set of networks
// inside a mesh and how to route to endpoints in each network. For example
//
// MeshNetworks(file/config map):
// networks:
// - network1:
//   - endpoints:
//     - fromRegistry: registry1 #must match secret name in kubernetes
//     - fromCidr: 192.168.100.0/22 #a VM network for example
//     gateways:
//     - registryServiceName: istio-ingressgateway.istio-system.svc.cluster.local
//       port: 15443
//       locality: us-east-1a
type MeshNetworks struct {
    // REQUIRED: The set of networks inside this mesh. Each network should
    // have a unique name and information about how to infer the endpoints in
    // the network as well as the gateways associated with the network.
    Networks map[string]*Network `protobuf:"bytes,1,rep,name=networks" json:"networks,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value"`
}

Condensing the above, the skeleton of NewServer looks like this:

func NewServer(args PilotArgs) (*Server, error) {
    // error handling omitted
    s.initKubeClient(&args) // initialize the client to the k8s cluster, s.kubeClient

    // load the related configs and watch them for changes
    // ...

    // creates the config controller in the pilotConfig.
    // ultimately creates a crd.NewController instance
    s.initConfigController(&args)
    // s.makeKubeConfigController(args)
    // s.configController.Run(stop)


    // creates and initializes the service controllers
    s.initServiceControllers(&args)
    // kube.NewController(s.kubeClient, args.Config.ControllerOptions)
    // go s.ServiceController.Run(stop)

    // initialize the DiscoveryService gRPC server, covered in detail below
    // adds 2 funcs to startFuncs
    s.initDiscoveryService(&args)

    return s, nil
}

ConfigController

// initConfigController creates the config controller in the pilotConfig.
func (s *Server) initConfigController(args *PilotArgs) error {
    // initialization in the k8s case
    controller, err := s.makeKubeConfigController(args)
    s.configController = controller

    // Defer starting the controller until after the service is created.
    s.addStartFunc(func(stop <-chan struct{}) error {
        go s.configController.Run(stop)
        return nil
    })

    //...

    // Create the config store.
    s.istioConfigStore = model.MakeIstioStore(s.configController)

    return nil
}
func (s *Server) makeKubeConfigController(args *PilotArgs) (model.ConfigStoreCache, error) {
    kubeCfgFile := s.getKubeCfgFile(args)
    configClient, err := crd.NewClient(
        kubeCfgFile, "", 
        model.IstioConfigTypes, // all the relevant CRD definitions
        args.Config.ControllerOptions.DomainSuffix)


    if !args.Config.DisableInstallCRDs {
        // register the custom CRDs
        configClient.RegisterResources()
    }

    return crd.NewController(configClient, args.Config.ControllerOptions), nil
}

The model.IstioConfigTypes argument passed to crd.NewClient contains the definitions of all the relevant CRDs:

istio.io/istio/pilot/pkg/model/config.go

// IstioConfigTypes lists all Istio config types with schemas and validation
    IstioConfigTypes = ConfigDescriptor{
        VirtualService,
        Gateway,
        ServiceEntry,
        DestinationRule,
        EnvoyFilter,
        Sidecar,
        HTTPAPISpec,
        HTTPAPISpecBinding,
        QuotaSpec,
        QuotaSpecBinding,
        AuthenticationPolicy,
        AuthenticationMeshPolicy,
        ServiceRole,
        ServiceRoleBinding,
        RbacConfig,
        ClusterRbacConfig,
    }
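
Each entry above is a schema object describing one CRD (kind, group/version, protobuf message name, and a validation function). As a rough illustration only (the field names follow the 1.1-era model.ProtoSchema and may differ between versions), the VirtualService entry looks like:

// Illustrative; see pilot/pkg/model/config.go for the authoritative definition.
VirtualService = ProtoSchema{
    Type:        "virtual-service",
    Plural:      "virtual-services",
    Group:       "networking",
    Version:     "v1alpha3",
    MessageName: "istio.networking.v1alpha3.VirtualService",
    Validate:    ValidateVirtualService,
}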

crd.NewController is defined in istio.io/istio/pilot/pkg/config/kube/crd/controller.go:

// NewController creates a new Kubernetes controller for CRDs
// Use "" for namespace to listen for all namespace changes
func NewController(client *Client, options kube.ControllerOptions) model.ConfigStoreCache {
    log.Infof("CRD controller watching namespaces %q", options.WatchedNamespace)

    // Queue requires a time duration for a retry delay after a handler error
    out := &controller{
        client: client,
        queue:  kube.NewQueue(1 * time.Second),
        kinds:  make(map[string]cacheHandler),
    }

    // add stores for CRD kinds
    for _, schema := range client.ConfigDescriptor() {
        out.addInformer(schema, options.WatchedNamespace, options.ResyncPeriod)
    }

    return out
}

To watch CRDs, each kind of resource needs its own client connection; the main CRD groups currently are networking, config, authentication, rbac, and so on.

// controller is a collection of synchronized resource watchers.
// Caches are thread-safe
type controller struct {
    client *Client  // one connection per resource kind
    queue  kube.Queue
    kinds  map[string]cacheHandler
}

type cacheHandler struct {
    informer cache.SharedIndexInformer
    handler  *kube.ChainHandler
}
// Client is a basic REST client for CRDs implementing config store
type Client struct {
    // Map of apiVersion to restClient.
    clientset map[string]*restClient

    // domainSuffix for the config metadata
    domainSuffix string
}
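
The kube.NewQueue(1 * time.Second) used by NewController is a task queue whose duration argument is the retry delay after a handler error. A simplified stand-in for the idea (not the actual Pilot implementation):

package queue

import "time"

// Task is a unit of work; returning an error causes a delayed retry.
type Task func() error

// Queue retries failed tasks after a fixed delay, loosely like Pilot's kube.Queue.
type Queue struct {
    tasks chan Task
    delay time.Duration
}

func NewQueue(delay time.Duration) *Queue {
    return &Queue{tasks: make(chan Task, 100), delay: delay}
}

func (q *Queue) Push(t Task) { q.tasks <- t }

// Run processes tasks until stop is closed; a failed task is re-queued
// after q.delay.
func (q *Queue) Run(stop <-chan struct{}) {
    for {
        select {
        case <-stop:
            return
        case t := <-q.tasks:
            if err := t(); err != nil {
                time.AfterFunc(q.delay, func() { q.Push(t) })
            }
        }
    }
}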

ServiceController

istio.io/istio/pilot/pkg/serviceregistry/kube/controller.go

The struct definition is as follows:

type Controller struct {
    domainSuffix string

    client    kubernetes.Interface
    queue     Queue
    services  cacheHandler
    endpoints cacheHandler
    nodes     cacheHandler

    pods *PodCache

    //....
}

The NewController function is defined as follows:

// NewController creates a new Kubernetes controller
// Created by bootstrap and multicluster (see secretcontroler).
func NewController(client kubernetes.Interface, options ControllerOptions) *Controller {
    log.Infof("Service controller watching namespace %q for services, endpoints, nodes and pods, refresh %s",
        options.WatchedNamespace, options.ResyncPeriod)

    // Queue requires a time duration for a retry delay after a handler error
    out := &Controller{
        domainSuffix:               options.DomainSuffix,
        client:                     client,
        queue:                      NewQueue(1 * time.Second),
        ClusterID:                  options.ClusterID,
        XDSUpdater:                 options.XDSUpdater,
        servicesMap:                make(map[model.Hostname]*model.Service),
        externalNameSvcInstanceMap: make(map[model.Hostname][]*model.ServiceInstance),
    }

    sharedInformers := informers.NewSharedInformerFactoryWithOptions(client, options.ResyncPeriod, informers.WithNamespace(options.WatchedNamespace))

    svcInformer := sharedInformers.Core().V1().Services().Informer()
    out.services = out.createCacheHandler(svcInformer, "Services")

    epInformer := sharedInformers.Core().V1().Endpoints().Informer()
    out.endpoints = out.createEDSCacheHandler(epInformer, "Endpoints")

    nodeInformer := sharedInformers.Core().V1().Nodes().Informer()
    out.nodes = out.createCacheHandler(nodeInformer, "Nodes")

    podInformer := sharedInformers.Core().V1().Pods().Informer()
    out.pods = newPodCache(out.createCacheHandler(podInformer, "Pod"), out)

    return out
}

From the code above it is clear that the ServiceController watches the following resources in the specified namespace (a standalone client-go sketch of this informer wiring follows the list):

  1. Services
  2. Endpoints
  3. Nodes
  4. Pod
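
To see that wiring in isolation, here is a standalone client-go program that watches Services the same way the controller above does (the kubeconfig path and namespace are placeholders):

package main

import (
    "fmt"
    "time"

    v1 "k8s.io/api/core/v1"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // Out-of-cluster config for demonstration; Pilot normally runs in-cluster.
    config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
    if err != nil {
        panic(err)
    }
    client := kubernetes.NewForConfigOrDie(config)

    // Same factory call NewController uses: resync period + namespace filter.
    factory := informers.NewSharedInformerFactoryWithOptions(
        client, time.Minute, informers.WithNamespace("default"))

    svcInformer := factory.Core().V1().Services().Informer()
    svcInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            svc := obj.(*v1.Service)
            fmt.Printf("service added: %s/%s\n", svc.Namespace, svc.Name)
        },
        DeleteFunc: func(obj interface{}) {
            fmt.Println("service deleted")
        },
    })

    stop := make(chan struct{})
    factory.Start(stop)
    cache.WaitForCacheSync(stop, svcInformer.HasSynced)
    select {} // keep watching until killed
}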

ServiceDiscovery

istio.io/istio/pilot/pkg/bootstrap/server.go

func (s *Server) initDiscoveryService(args *PilotArgs) error {
    // note that env holds variables used later, including s.istioConfigStore
    // and s.ServiceController (serving as both ServiceDiscovery and ServiceAccounts)
    environment := &model.Environment{
        Mesh:             s.mesh,
        MeshNetworks:     s.meshNetworks,
        IstioConfigStore: s.istioConfigStore, // istio routing rules
        ServiceDiscovery: s.ServiceController, // service list 
        ServiceAccounts:  s.ServiceController,
        MixerSAN:         s.mixerSAN,
    }

    // Set up discovery service
    discovery, err := envoy.NewDiscoveryService(
        environment,
        args.DiscoveryOptions,
    )

    s.mux = discovery.RestContainer.ServeMux

    // create the gRPC server implementation via envoyv2.NewDiscoveryServer
    s.EnvoyXdsServer = envoyv2.NewDiscoveryServer(
        environment,
        istio_networking.NewConfigGenerator(args.Plugins),
        s.ServiceController, 
        s.configController)

    s.EnvoyXdsServer.InitDebug(s.mux, s.ServiceController)

    // ...

    // create grpc/http server
    s.initGrpcServer(args.KeepaliveOptions)
    s.httpServer = &http.Server{
        Addr:    args.DiscoveryOptions.HTTPAddr,
        Handler: s.mux,
    }

    // create http listener
    listener, err := net.Listen("tcp", args.DiscoveryOptions.HTTPAddr)
    s.HTTPListeningAddr = listener.Addr()

    // create grpc listener
    grpcListener, err := net.Listen("tcp", args.DiscoveryOptions.GrpcAddr)
    s.GRPCListeningAddr = grpcListener.Addr()

    s.addStartFunc(func(stop <-chan struct{}) error {
        // start the http server goroutine
        // start the gRPC server goroutine
        // start a goroutine waiting for shutdown

        return nil
    })

    // run secure grpc server

    return nil
}
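
Stripped of the Pilot-specific wiring, the HTTP-plus-gRPC serving pattern above reduces to roughly the following (addresses are the flag defaults; error handling and the stop channel are elided):

package main

import (
    "net"
    "net/http"

    "google.golang.org/grpc"
)

func main() {
    mux := http.NewServeMux() // Pilot hangs its debug handlers off a mux like this
    httpServer := &http.Server{Handler: mux}
    grpcServer := grpc.NewServer() // the xDS services are registered on this

    httpListener, err := net.Listen("tcp", ":8080")
    if err != nil {
        panic(err)
    }
    grpcListener, err := net.Listen("tcp", ":15010")
    if err != nil {
        panic(err)
    }

    // The startFunc registered by initDiscoveryService does essentially this:
    go func() { _ = httpServer.Serve(httpListener) }()
    go func() { _ = grpcServer.Serve(grpcListener) }()

    select {} // Pilot instead blocks on the stop channel
}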

istio.io/istio/pilot/pkg/proxy/envoy/v2/discovery.go, the definition of envoyv2.NewDiscoveryServer:

//  s.EnvoyXdsServer = envoyv2.NewDiscoveryServer(
//        environment,
//        istio_networking.NewConfigGenerator(args.Plugins),
//        s.ServiceController, 
//        s.configController)
// NewDiscoveryServer creates DiscoveryServer that sources data from Pilot's internal mesh data structures
func NewDiscoveryServer(env *model.Environment, generator core.ConfigGenerator, ctl model.Controller, configCache model.ConfigStoreCache) *DiscoveryServer {
    // create the DiscoveryServer object
    out := &DiscoveryServer{
        Env:                     env,
        ConfigGenerator:         generator, // generates xDS responses
        ConfigController:        configCache,
        EndpointShardsByService: map[string]*EndpointShards{},
        WorkloadsByID:           map[string]*Workload{},
        edsUpdates:              map[string]*EndpointShards{},
        concurrentPushLimit:     make(chan struct{}, 20), 
        updateChannel:           make(chan *updateReq, 10),
    }
    env.PushContext = model.NewPushContext()

    // handle the relevant update operations:
    // handleUpdates processes events from updateChannel. It ensures that at least
    // minQuiet time has elapsed since the last event was processed, and that at
    // most maxDelay passes between receiving an event and processing it.
    // Finally it calls doPush which, depending on the full-push flag, sends the
    // recently updated EDS state via XDS incremental push or as a full push.
    go out.handleUpdates()

    // three kinds of events flush DiscoveryServer's local caches (functions to
    // clear the caches are registered for each):
    // 1. service-related information changes
    // 2. the jwt public key changes
    // 3. an Istio CRD changes
    // when any of these occurs, an event is fed into out.handleUpdates()

    // periodic refresh
    go out.periodicRefresh()

    // periodic metrics refresh
    go out.periodicRefreshMetrics()

    out.DebugConfigs = pilot.DebugConfigs

    pushThrottle := intEnv(pilot.PushThrottle, 10)
    pushBurst := intEnv(pilot.PushBurst, 100)

    adsLog.Infof("Starting ADS server with rateLimiter=%d burst=%d", pushThrottle, pushBurst)
    out.rateLimiter = rate.NewLimiter(rate.Limit(pushThrottle), pushBurst)
    out.initRateLimiter = rate.NewLimiter(rate.Limit(pushThrottle*2), pushBurst*2)

    return out
}
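
A minimal sketch of the handleUpdates debounce described in the comments above (the minQuiet/maxDelay values are invented; the real handleUpdates also tracks per-service EDS state):

package main

import "time"

// debounce coalesces bursts of update events: it waits until the stream has
// been quiet for minQuiet, but never delays a push longer than maxDelay
// after the first pending event.
func debounce(updates <-chan struct{}, push func()) {
    const minQuiet = 100 * time.Millisecond
    const maxDelay = 10 * time.Second

    var timer <-chan time.Time
    var firstEvent time.Time

    for {
        select {
        case _, ok := <-updates:
            if !ok {
                return
            }
            if timer == nil {
                firstEvent = time.Now()
            }
            delay := minQuiet
            // Cap the total delay at maxDelay from the first pending event.
            if remaining := time.Until(firstEvent.Add(maxDelay)); remaining < delay {
                delay = remaining
            }
            timer = time.After(delay)
        case <-timer:
            timer = nil
            push()
        }
    }
}

func main() {
    updates := make(chan struct{}, 10)
    go debounce(updates, func() { println("push") })
    updates <- struct{}{}
    updates <- struct{}{}
    time.Sleep(200 * time.Millisecond) // the quiet period elapses and one push fires
}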

ADS-related definitions:

https://github.com/envoyproxy/data-plane-api/blob/master/envoy/service/discovery/v2/ads.proto

// See https://github.com/lyft/envoy-api#apis for a description of the role of
// ADS and how it is intended to be used by a management server. ADS requests
// have the same structure as their singleton xDS counterparts, but can
// multiplex many resource types on a single stream. The type_url in the
// DiscoveryRequest/DiscoveryResponse provides sufficient information to recover
// the multiplexed singleton APIs at the Envoy instance and management server.
service AggregatedDiscoveryService {
  // This is a gRPC-only API.
  rpc StreamAggregatedResources(stream envoy.api.v2.DiscoveryRequest)
      returns (stream envoy.api.v2.DiscoveryResponse) {
  }

  rpc IncrementalAggregatedResources(stream envoy.api.v2.IncrementalDiscoveryRequest)
      returns (stream envoy.api.v2.IncrementalDiscoveryResponse) {
  }
}

https://github.com/envoyproxy/data-plane-api/blob/master/envoy/api/v2/discovery.proto

// A DiscoveryRequest requests a set of versioned resources of the same type for
// a given Envoy node on some API.
message DiscoveryRequest {
  // The version_info provided in the request messages will be the version_info
  // received with the most recent successfully processed response or empty on
  // the first request. It is expected that no new request is sent after a
  // response is received until the Envoy instance is ready to ACK/NACK the new
  // configuration. ACK/NACK takes place by returning the new API config version
  // as applied or the previous API config version respectively. Each type_url
  // (see below) has an independent version associated with it.
  string version_info = 1;

  // The node making the request.
  core.Node node = 2;

  // List of resources to subscribe to, e.g. list of cluster names or a route
  // configuration name. If this is empty, all resources for the API are
  // returned. LDS/CDS expect empty resource_names, since this is global
  // discovery for the Envoy instance. The LDS and CDS responses will then imply
  // a number of resources that need to be fetched via EDS/RDS, which will be
  // explicitly enumerated in resource_names.
  repeated string resource_names = 3;

  // Type of the resource that is being requested, e.g.
  // "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment". This is implicit
  // in requests made via singleton xDS APIs such as CDS, LDS, etc. but is
  // required for ADS.
  string type_url = 4;

  // nonce corresponding to DiscoveryResponse being ACK/NACKed. See above
  // discussion on version_info and the DiscoveryResponse nonce comment. This
  // may be empty if no nonce is available, e.g. at startup or for non-stream
  // xDS implementations.
  string response_nonce = 5;

  // This is populated when the previous :ref:`DiscoveryResponse <envoy_api_msg_DiscoveryResponse>`
  // failed to update configuration. The *message* field in *error_details* provides the Envoy
  // internal exception related to the failure. It is only intended for consumption during manual
  // debugging, the string provided is not guaranteed to be stable across Envoy versions.
  google.rpc.Status error_detail = 6;
}

message DiscoveryResponse {
  // The version of the response data.
  string version_info = 1;

  // The response resources. These resources are typed and depend on the API being called.
  repeated google.protobuf.Any resources = 2 [(gogoproto.nullable) = false];

  // [#not-implemented-hide:]
  // Canary is used to support two Envoy command line flags:
  //
  // * --terminate-on-canary-transition-failure. When set, Envoy is able to
  //   terminate if it detects that configuration is stuck at canary. Consider
  //   this example sequence of updates:
  //   - Management server applies a canary config successfully.
  //   - Management server rolls back to a production config.
  //   - Envoy rejects the new production config.
  //   Since there is no sensible way to continue receiving configuration
  //   updates, Envoy will then terminate and apply production config from a
  //   clean slate.
  // * --dry-run-canary. When set, a canary response will never be applied, only
  //   validated via a dry run.
  bool canary = 3;

  // Type URL for resources. This must be consistent with the type_url in the
  // Any messages for resources if resources is non-empty. This effectively
  // identifies the xDS API when muxing over ADS.
  string type_url = 4;

  // For gRPC based subscriptions, the nonce provides a way to explicitly ack a
  // specific DiscoveryResponse in a following DiscoveryRequest. Additional
  // messages may have been sent by Envoy to the management server for the
  // previous version on the stream prior to this DiscoveryResponse, that were
  // unprocessed at response send time. The nonce allows the management server
  // to ignore any further DiscoveryRequests for the previous version until a
  // DiscoveryRequest bearing the nonce. The nonce is optional and is not
  // required for non-stream based xDS implementations.
  string nonce = 5;

  // [#not-implemented-hide:]
  // The control plane instance that sent the response.
  core.ControlPlane control_plane = 6;
}

For how ADS preserves consistency, see the "eventual consistency considerations" section of envoy-的-xds-rest-和-grpc-协议详解:

In general, to avoid dropping traffic, updates should follow the make-before-break model, in which:

* CDS updates (if any) must always be pushed first.

* EDS updates (if any) must arrive after the CDS updates for the corresponding clusters.

* LDS updates must arrive after the corresponding CDS/EDS updates.

* RDS updates related to newly added listeners must arrive last.

* Finally, stale CDS clusters and the related EDS endpoints (endpoints no longer referenced) are removed.

ADS allows a single management server to deliver all API updates over a single gRPC stream. Combined with carefully sequenced updates, ADS can avoid traffic loss during the update process.

pilot/pkg/proxy/envoy/v2/ads.go

// StreamAggregatedResources implements the ADS interface.
func (s *DiscoveryServer) StreamAggregatedResources(stream ads.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
    // ...

    // InitContext returns immediately if the context was already initialized.
    // InitContext extracts from env the data that needs to be sent to clients; analyzed further below
    // 1. initServiceRegistry
    // 2. initVirtualServices
    // 3. initDestinationRules
    // 4. initAuthorizationPolicies
    // 5. InitSidecarScopes
    err := s.globalPushContext().InitContext(s.Env)
    con := newXdsConnection(peerAddr, stream)

    reqChannel := make(chan *xdsapi.DiscoveryRequest, 1)
    // start a new goroutine that receives xdsapi.DiscoveryRequest messages from the client
    go receiveThread(con, reqChannel, &receiveError)

    for {
        // Block until either a request is received or a push is triggered.
        select {
        case discReq, ok := <-reqChannel:
              err = s.initConnectionNode(discReq, con)

            switch discReq.TypeUrl {
            case ClusterType:
                // ...
                // CDS REQ is the first request an envoy makes. This shows up
                // immediately after connect. It is followed by EDS REQ as
                // soon as the CDS push is returned.
                adsLog.Infof("ADS:CDS: REQ %v %s %v raw: %s", peerAddr, con.ConID, time.Since(t0), discReq.String())
                con.CDSWatch = true
                err := s.pushCds(con, s.globalPushContext(), versionInfo())

            case ListenerType:
                // ...
                adsLog.Debugf("ADS:LDS: REQ %s %v", con.ConID, peerAddr)
                con.LDSWatch = true
                err := s.pushLds(con, s.globalPushContext(), true, versionInfo())

            case RouteType:
                // ...
                adsLog.Debugf("ADS:RDS: REQ %s %s  routes: %d", peerAddr, con.ConID, len(con.Routes))
                err := s.pushRoute(con, s.globalPushContext())

            case EndpointType:
                // ...
                // assorted error handling

                for _, cn := range con.Clusters {
                    s.removeEdsCon(cn, con.ConID, con)
                }

                for _, cn := range clusters {
                    s.addEdsCon(cn, con.ConID, con)
                }

                con.Clusters = clusters
                adsLog.Debugf("ADS:EDS: REQ %s %s clusters: %d", peerAddr, con.ConID, len(con.Clusters))
                err := s.pushEds(s.globalPushContext(), con, true, nil)
                if err != nil {
                    return err
                }

            default:
                adsLog.Warnf("ADS: Unknown watched resources %s", discReq.String())
            }
            // ...
        case pushEv := <-con.pushChannel:
            // ...

            err := s.pushConnection(con, pushEv)
            if err != nil {
                return nil
            }

        }
    }
}
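
The shape of this loop — one goroutine draining the stream into a channel while the main loop selects between incoming requests and push events — is a common pattern for bidirectional gRPC streams. Stripped down (all types here are stand-ins for the real xDS types):

package main

type request struct{ typeURL string }
type pushEvent struct{}

// streamLoop mirrors the structure of StreamAggregatedResources: blocking
// stream reads are isolated in a goroutine so the main loop can multiplex.
func streamLoop(recv func() (*request, error), reqChannel chan *request, pushChannel chan *pushEvent) error {
    errCh := make(chan error, 1)

    go func() {
        for {
            req, err := recv()
            if err != nil {
                errCh <- err
                return
            }
            reqChannel <- req
        }
    }()

    for {
        select {
        case req := <-reqChannel:
            _ = req.typeURL // dispatch on TypeUrl: CDS/LDS/RDS/EDS
        case <-pushChannel:
            // a config change triggered a push to this connection
        case err := <-errCh:
            return err
        }
    }
}

func main() {}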

istio.io/istio/pilot/pkg/model/push_context.go

// PushContext tracks the status of a push - metrics and errors.
// Metrics are reset after a push - at the beginning all
// values are zero, and when push completes the status is reset.
// The struct is exposed in a debug endpoint - fields public to allow
// easy serialization as json.
type PushContext struct {

    // privateServices are reachable within the same namespace.
    privateServicesByNamespace map[string][]*Service
    // publicServices are services reachable within the mesh.
    publicServices []*Service

    privateVirtualServicesByNamespace map[string][]Config
    publicVirtualServices             []Config

    // destination rules are of three types:
    // namespaceLocalDestRules: all public/private dest rules pertaining to a service defined in a given namespace
    //  namespaceExportedDestRules: all public dest rules pertaining to a service defined in a namespace
    //  allExportedDestRules: all (public) dest rules across all namespaces
    // We need the allExportedDestRules in addition to namespaceExportedDestRules because we select
    // the dest rule based on the most specific host match, and not just any destination rule
    namespaceLocalDestRules    map[string]*processedDestRules
    namespaceExportedDestRules map[string]*processedDestRules
    allExportedDestRules       *processedDestRules

    // sidecars for each namespace
    sidecarsByNamespace map[string][]*SidecarScope
    ////////// END ////////

    // The following data is either a global index or used in the inbound path.
    // Namespace specific views do not apply here.

    // ServiceByHostname has all services, indexed by hostname.
    ServiceByHostname map[Hostname]*Service `json:"-"`

    // AuthzPolicies stores the existing authorization policies in the cluster. Could be nil if there
    // are no authorization policies in the cluster.
    AuthzPolicies *AuthorizationPolicies `json:"-"`

    // ServicePort2Name is used to keep track of service name and port mapping.
    // This is needed because ADS names use port numbers, while endpoints use
    // port names. The key is the service name. If a service or port are not found,
    // the endpoint needs to be re-evaluated later (eventual consistency)
    ServicePort2Name map[string]PortList `json:"-"`

    initDone bool
}


// InitContext will initialize the data structures used for code generation.
// This should be called before starting the push, from the thread creating
// the push context.
func (ps *PushContext) InitContext(env *Environment) error {
    ps.Mutex.Lock()
    defer ps.Mutex.Unlock()
    if ps.initDone {
        return nil
    }
    ps.Env = env
    var err error

    // Caches list of services in the registry, and creates a map
    // of hostname to service -> ServicePort2Name map[string]PortList `json:"-"`
    ps.initServiceRegistry(env)

    // Caches list of virtual services -> publicVirtualServices []Config
    ps.initVirtualServices(env)

    // Split out of DestinationRule expensive conversions - once per push.
    // the results are stored in these three fields:
    //  * namespaceLocalDestRules    map[string]*processedDestRules
    //  * namespaceExportedDestRules map[string]*processedDestRules
    //  * allExportedDestRules       *processedDestRules
    ps.initDestinationRules(env)

    // Get the ClusterRbacConfig -> AuthzPolicies *AuthorizationPolicies
    ps.initAuthorizationPolicies(env)

    // Must be initialized in the end -> sidecarsByNamespace map[string][]*SidecarScope
    ps.InitSidecarScopes(env)

    ps.initDone = true
    return nil
}

CDS is the first message sent over ADS; below we use CDS as the example for a detailed walkthrough.

// StreamAggregatedResources implements the ADS interface.
func (s *DiscoveryServer) StreamAggregatedResources(stream ads.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
    // ...

    // InitContext returns immediately if the context was already initialized.
    // InitContext extracts from env the data that needs to be sent to clients; analyzed further below
    // 1. initServiceRegistry
    // 2. initVirtualServices
    // 3. initDestinationRules
    // 4. initAuthorizationPolicies
    // 5. InitSidecarScopes
    err := s.globalPushContext().InitContext(s.Env)
    con := newXdsConnection(peerAddr, stream)

    reqChannel := make(chan *xdsapi.DiscoveryRequest, 1)
    // start a new goroutine that receives xdsapi.DiscoveryRequest messages from the client
    go receiveThread(con, reqChannel, &receiveError)

    for {
        // Block until either a request is received or a push is triggered.
        select {
        case discReq, ok := <-reqChannel:
            // mainly calls `ParseServiceNodeWithMetadata` to extract
            // information from the request message and build a `Proxy` object;
            // discReq.Node.Id currently has the format Type~IPAddress~ID~Domain
            err = s.initConnectionNode(discReq, con)

            switch discReq.TypeUrl {
            case ClusterType:
                // handling of the response message once CDS data has already been sent
                if con.CDSWatch {
                    // ...
                }
                // CDS REQ is the first request an envoy makes. This shows up
                // immediately after connect. It is followed by EDS REQ as
                // soon as the CDS push is returned.
                adsLog.Infof("ADS:CDS: REQ %v %s %v raw: %s", peerAddr, con.ConID, time.Since(t0), discReq.String())
                con.CDSWatch = true
                err := s.pushCds(con, s.globalPushContext(), versionInfo())
                if err != nil {
                    return err
                }

                //...
    }

core.Node

Identifies a specific Envoy instance. The node identifier is presented to the management server, which may use this identifier to distinguish per Envoy configuration for serving.

{
  "id": "...",
  "cluster": "...",
  "metadata": "{...}",
  "locality": "{...}",
  "build_version": "..."
}

id

(string) An opaque node identifier for the Envoy node. This also provides the local service node name. It should be set if any of the following features are used: statsd, CDS, and HTTP tracing, either in this message or via --service-node.

cluster

(string) Defines the local service cluster name where Envoy is running. Though optional, it should be set if any of the following features are used: statsd, health check cluster verification, runtime override directory, user agent addition, HTTP global rate limiting, CDS, and HTTP tracing, either in this message or via --service-cluster.

metadata

(Struct) Opaque metadata extending the node identifier. Envoy will pass this directly to the management server.

locality

(core.Locality) Locality specifying where the Envoy instance is running.

build_version

(string) This is motivated by informing a management server during canary which version of Envoy is being tested in a heterogeneous fleet. This will be set by Envoy in management server RPCs.

initConnectionNode mainly calls the ParseServiceNodeWithMetadata function, which extracts the various pieces of information from the request message and produces a Proxy object;

istio.io/istio/pilot/pkg/model/context.go

func ParseServiceNodeWithMetadata(s string, metadata map[string]string) (*Proxy, error) {
}

// Proxy contains information about a specific instance of a proxy (envoy sidecar, gateway,
// etc). The Proxy is initialized when a sidecar connects to Pilot, and populated from
// 'node' info in the protocol as well as data extracted from registries.
//
// In current Istio implementation nodes use a 4-parts '~' delimited ID.
// Type~IPAddress~ID~Domain
type Proxy struct {
    // ClusterID specifies the cluster where the proxy resides.
    // TODO: clarify if this is needed in the new 'network' model, likely needs to
    // be renamed to 'network'
    ClusterID string

    // Type specifies the node type. First part of the ID.
    Type NodeType

    // IPAddresses is the IP addresses of the proxy used to identify it and its
    // co-located service instances. Example: "10.60.1.6". In some cases, the host
    // where the proxy and service instances reside may have more than one IP address
    IPAddresses []string

    // ID is the unique platform-specific sidecar proxy ID. For k8s it is the pod ID and
    // namespace.
    ID string

    // Locality is the location of where Envoy proxy runs.
    Locality Locality

    // DNSDomain defines the DNS domain suffix for short hostnames (e.g.
    // "default.svc.cluster.local")
    DNSDomain string

    // TrustDomain defines the trust domain of the certificate
    TrustDomain string

    // ConfigNamespace defines the namespace where this proxy resides
    // for the purposes of network scoping.
    // NOTE: DO NOT USE THIS FIELD TO CONSTRUCT DNS NAMES
    ConfigNamespace string

    // Metadata key-value pairs extending the Node identifier
    Metadata map[string]string

    // the sidecarScope associated with the proxy
    SidecarScope *SidecarScope
}
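
Since the Proxy is parsed from the 4-part node ID, here is a trivial standalone illustration of that format (the ID value below is made up):

package main

import (
    "fmt"
    "strings"
)

func main() {
    // Type~IPAddress~ID~Domain, as consumed by ParseServiceNodeWithMetadata.
    nodeID := "sidecar~10.60.1.6~productpage-v1-abc.default~default.svc.cluster.local"
    parts := strings.Split(nodeID, "~")
    if len(parts) != 4 {
        panic("expected Type~IPAddress~ID~Domain")
    }
    fmt.Printf("type=%s ip=%s id=%s domain=%s\n", parts[0], parts[1], parts[2], parts[3])
}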

The core of the CDS push is the function s.pushCds(con, s.globalPushContext(), versionInfo()):

istio.io/istio/pilot/pkg/proxy/envoy/v2/cds.go

func (s *DiscoveryServer) pushCds(con *XdsConnection, push *model.PushContext, version string) error {
    // build the rawClusters object ([]*xdsapi.Cluster) from the current con.modelNode and the push context
    rawClusters, err := s.generateRawClusters(con.modelNode, push)

    response := con.clusters(rawClusters)
    err = con.send(response)
    return nil
}

The role of s.generateRawClusters(con.modelNode, push) is therefore self-evident: it organizes the CDS-related data in the push context and packages it into the CDS response format.

func (s *DiscoveryServer) generateRawClusters(node *model.Proxy, push *model.PushContext) ([]*xdsapi.Cluster, error) {
    rawClusters, err := s.ConfigGenerator.BuildClusters(s.Env, node, push)

    // validate the contents of rawClusters
    return rawClusters, nil
}

The rawClusters object is generated by s.ConfigGenerator.BuildClusters from the combined information of s.Env, node, and push:

  • s.Env holds the locally cached state of resources such as the ServiceController and ConfigController
  • node is the object built from this request, carrying the node-related information
  • push is the context for this push, with the information it needs already assembled (derived from s.Env)

istio.io/istio/pilot/pkg/networking/core/v1alpha3/cluster.go

The BuildClusters function is implemented as follows:

// BuildClusters returns the list of clusters for the given proxy. This is the CDS output
// For outbound: Cluster for each service/subset hostname or cidr with SNI set to service hostname
// Cluster type based on resolution
// For inbound (sidecar only): Cluster for each inbound endpoint port and for each service port
func (configgen *ConfigGeneratorImpl) BuildClusters(env *model.Environment, proxy *model.Proxy, push *model.PushContext) ([]*apiv2.Cluster, error) {
    clusters := make([]*apiv2.Cluster, 0)

    switch proxy.Type {
    case model.SidecarProxy:
        // GetProxyServiceInstances returns service instances co-located with the proxy
        // get the service instances co-located with the proxy, including headless services
        instances, err := env.GetProxyServiceInstances(proxy)

        sidecarScope := proxy.SidecarScope
        recomputeOutboundClusters := true
        // append the outbound clusters
        if recomputeOutboundClusters {
            clusters = append(clusters, configgen.buildOutboundClusters(env, proxy, push)...)
        }

        // Let ServiceDiscovery decide which IP and Port are used for management if
        // there are multiple IPs
        managementPorts := make([]*model.Port, 0)
        for _, ip := range proxy.IPAddresses {
            managementPorts = append(managementPorts, env.ManagementPorts(ip)...)
        }

        // append the inbound clusters for the managementPorts on the proxy's IPs
        clusters = append(clusters, configgen.buildInboundClusters(env, proxy, push, instances, managementPorts)...)

    default: // Gateways
        // ...
    }

    // Add a blackhole and passthrough cluster for catching traffic to unresolved routes
    // DO NOT CALL PLUGINS for these two clusters.
    clusters = append(clusters, buildBlackHoleCluster())
    clusters = append(clusters, buildDefaultPassthroughCluster())

    // resolves cluster name conflicts. 
    return normalizeClusters(push, proxy, clusters), nil
}

Ad-hoc debugging:

istio.io/istio/pilot/pkg/proxy/envoy/v2/debug.go

// InitDebug initializes the debug handlers and adds a debug in-memory registry.
func (s *DiscoveryServer) InitDebug(mux *http.ServeMux, sctl *aggregate.Controller) {
    // For debugging and load testing v2 we add an memory registry.
    s.MemRegistry = NewMemServiceDiscovery(
        map[model.Hostname]*model.Service{ // mock.HelloService.Hostname: mock.HelloService,
        }, 2)
    s.MemRegistry.EDSUpdater = s
    s.MemRegistry.ClusterID = "v2-debug"

    sctl.AddRegistry(aggregate.Registry{
        ClusterID:        "v2-debug",
        Name:             serviceregistry.ServiceRegistry("memAdapter"),
        ServiceDiscovery: s.MemRegistry,
        ServiceAccounts:  s.MemRegistry,
        Controller:       s.MemRegistry.controller,
    })

    mux.HandleFunc("/ready", s.ready)

    mux.HandleFunc("/debug/edsz", s.edsz)
    mux.HandleFunc("/debug/adsz", s.adsz)
    mux.HandleFunc("/debug/cdsz", cdsz)
    mux.HandleFunc("/debug/syncz", Syncz)

    mux.HandleFunc("/debug/registryz", s.registryz)
    mux.HandleFunc("/debug/endpointz", s.endpointz)
    mux.HandleFunc("/debug/endpointShardz", s.endpointShardz)
    mux.HandleFunc("/debug/workloadz", s.workloadz)
    mux.HandleFunc("/debug/configz", s.configz)

    mux.HandleFunc("/debug/authenticationz", s.authenticationz)
    mux.HandleFunc("/debug/config_dump", s.ConfigDump)
    mux.HandleFunc("/debug/push_status", s.PushStatusHandler)
}
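
With these handlers registered on the HTTP mux (port 8080 by default, per --httpAddr), the endpoints can be queried directly; for example (the deployment name may differ per install):

# forward Pilot's HTTP port locally
kubectl -n istio-system port-forward deploy/istio-pilot 8080:8080 &

# dump the aggregated service registry and the connected ADS clients
curl localhost:8080/debug/registryz
curl localhost:8080/debug/adsz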
