[TOC]
Architecture
Introduction
For the complete YAML file, see pilot-yaml.
Dockerfile
```dockerfile
FROM istionightly/base_debug
ADD pilot-discovery /usr/local/bin/
ADD cacert.pem /cacert.pem
ENTRYPOINT ["/usr/local/bin/pilot-discovery"]
```
Startup command line
```bash
# ps -ef -www | grep pilot
/usr/local/bin/pilot-discovery discovery
```
Command-line help
```bash
# kubectl exec -ti istio-pilot-f9d78b7b9-fmhfb -n istio-system -c discovery -- /usr/local/bin/pilot-discovery --help
Usage:
  pilot-discovery [command]

Available Commands:
  discovery   Start Istio proxy discovery service
  help        Help about any command
  request     Makes an HTTP request to Pilot metrics/debug endpoint
  version     Prints out build version information
```
Help for the discovery subcommand
```bash
# kubectl exec -ti istio-pilot-f9d78b7b9-fmhfb -n istio-system -c discovery -- /usr/local/bin/pilot-discovery discovery --help
Defaulting container name to discovery.
Start Istio proxy discovery service

Usage:
  pilot-discovery discovery [flags]

Flags:
  -a, --appNamespace string                Restrict the applications namespace the controller manages; if not set, controller watches all namespaces
      --cfConfig string                    Cloud Foundry config file
      --clusterRegistriesConfigMap string  ConfigMap map for clusters config store
      --clusterRegistriesNamespace string  Namespace for ConfigMap which stores clusters configs
      --configDir string                   Directory to watch for updates to config yaml files. If specified, the files will be used as the source of config, rather than a CRD client.
      --consulserverInterval duration      Interval (in seconds) for polling the Consul service registry (default 2s)
      --consulserverURL string             URL for the Consul server
      --disable-install-crds               Disable discovery service from verifying the existence of CRDs at startup and then installing if not detected. It is recommended to be disable for highly available setups.
      --discovery_cache                    Enable caching discovery service responses (default true)
      --domain string                      DNS domain suffix (default "cluster.local")
      --grpcAddr string                    Discovery service grpc address (default ":15010")
  -h, --help                               help for discovery
      --httpAddr string                    Discovery service HTTP address (default ":8080")
      --kubeconfig string                  Use a Kubernetes configuration file instead of in-cluster configuration
      --meshConfig string                  File name for Istio mesh configuration. If not specified, a default mesh will be used. (default "/etc/istio/config/mesh")
      --monitoringAddr string              HTTP address to use for the exposing pilot self-monitoring information (default ":9093")
  -n, --namespace string                   Select a namespace where the controller resides. If not set, uses ${POD_NAMESPACE} environment variable
      --plugins stringSlice                comma separated list of networking plugins to enable (default [authn,authz,health,mixer,envoyfilter])
      --profile                            Enable profiling via web interface host:port/debug/pprof (default true)
      --registries stringSlice             Comma separated list of platform service registries to read from (choose one or more from {Kubernetes, Consul, CloudFoundry, Mock, Config}) (default [Kubernetes])
      --resync duration                    Controller resync interval (default 1m0s)
      --secureGrpcAddr string              Discovery service grpc address, with https (default ":15012")
      --webhookEndpoint string             Webhook API endpoint (supports http://sockethost, and unix:///absolute/path/to/socket)

Global Flags:
  (omitted)
```
Main parameters:
| Name | Default | Notes |
|---|---|---|
| --appNamespace | (empty) | Corresponds to oneNamespace in the helm installation |
| --configDir | (empty) | Reflects pilot's two config sources: config files or CRDs |
| --discovery_cache | true | Enables response caching, which helps performance |
| --domain | "cluster.local" | DNS domain suffix in k8s |
| --grpcAddr | :15010 | |
| --httpAddr | :8080 | |
| --meshConfig | "/etc/istio/config/mesh" | |
| --registries | Kubernetes | |
Code analysis
Entry point
```go
discoveryCmd = &cobra.Command{
    Use:   "discovery",
    Short: "Start Istio proxy discovery service.",
    Args:  cobra.ExactArgs(0),
    RunE: func(c *cobra.Command, args []string) error {
        // ...
        // Create the stop channel for all of the servers.
        stop := make(chan struct{})

        // Create the server for the discovery service.
        discoveryServer, err := bootstrap.NewServer(serverArgs)
        if err != nil {
            return fmt.Errorf("failed to create discovery service: %v", err)
        }

        // Start the server
        if err := discoveryServer.Start(stop); err != nil {
            return fmt.Errorf("failed to start discovery service: %v", err)
        }

        cmd.WaitSignal(stop)
        return nil
    },
}
```
istio.io/istio/pilot/pkg/bootstrap/server.go
For the default mesh configuration, see: https://gist.github.com/DavadDi/f110459d339e260f818250287fc78ccc#file-mesh
```go
// NewServer creates a new Server instance based on the provided arguments.
func NewServer(args PilotArgs) (*Server, error) {
    // ...
    s := &Server{
        filewatcher: filewatcher.NewWatcher(),
    }

    // Error handling omitted.
    s.initKubeClient(&args) // Initialize the client to the k8s cluster: s.kubeClient

    s.initMesh(&args) // Initialize the mesh config and add it to the filewatcher, /etc/istio/config/mesh

    // Not present in the 1.0.5 cmd; presumably added in 1.1:
    // serverArgs.NetworksConfigFile, "networksConfig", "/etc/istio/config/meshNetworks"
    // Initialize the networks config and add it to the filewatcher as well.
    s.initMeshNetworks(&args)

    // initMixerSan configures the mixerSAN configuration item.
    // The mesh must already have been configured.
    s.initMixerSan(&args)

    // creates the config controller in the pilotConfig.
    // Ultimately creates a crd.NewController instance.
    s.initConfigController(&args)
    /* Using kube as the example to show the main flow:
    controller, err := s.makeKubeConfigController(args)
    s.configController = controller
    s.addStartFunc(func(stop <-chan struct{}) error {
        go s.configController.Run(stop) // 1. configController starts first
    })
    */

    // creates and initializes the service controllers
    s.initServiceControllers(&args)
    /*
    s.createK8sServiceControllers(serviceControllers, args)
      --> kube.NewController(s.kubeClient, args.Config.ControllerOptions)
    s.addStartFunc(func(stop <-chan struct{}) error {
        go s.ServiceController.Run(stop) // 2. ServiceController starts second
        return nil
    })
    */

    // Initialize the DiscoveryService gRPC server (covered in detail below).
    // Adds 2 funcs to startFuncs.
    s.initDiscoveryService(&args)

    // initializes the configuration for the pilot monitoring server.
    // Adds 1 func to startFuncs.
    s.initMonitor(&args)

    // starts the secret controller to watch for remote
    // clusters and initialize the multicluster structures.
    // Not analyzed here.
    s.initClusterRegistries(&args)

    return s, nil
}

// Start runs, in order, the functions registered during NewServer.
func (s *Server) Start(stop <-chan struct{}) error {
    // Now start all of the components.
    for _, fn := range s.startFuncs {
        fn(stop)
    }
    return nil
}
```
istio.io/api/mesh/v1alpha1/network.pb.go
The MeshNetworks struct and its definition are as follows:
```go
// MeshNetworks (config map) provides information about the set of networks
// inside a mesh and how to route to endpoints in each network. For example
//
// MeshNetworks(file/config map):
// networks:
// - network1:
//   - endpoints:
//     - fromRegistry: registry1 #must match secret name in kubernetes
//     - fromCidr: 192.168.100.0/22 #a VM network for example
//     gateways:
//     - registryServiceName: istio-ingressgateway.istio-system.svc.cluster.local
//       port: 15443
//       locality: us-east-1a
type MeshNetworks struct {
    // REQUIRED: The set of networks inside this mesh. Each network should
    // have a unique name and information about how to infer the endpoints in
    // the network as well as the gateways associated with the network.
    Networks map[string]*Network `protobuf:"bytes,1,rep,name=networks" json:"networks,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value"`
}
```
To summarize, a simplified outline of the NewServer function looks like this:
```go
func NewServer(args PilotArgs) (*Server, error) {
    // Error handling omitted.
    s.initKubeClient(&args) // Initialize the client to the k8s cluster: s.kubeClient

    // Initialize the related configuration and watch it for changes.
    // ...

    // creates the config controller in the pilotConfig.
    // Ultimately creates a crd.NewController instance.
    s.initConfigController(&args)
    // s.makeKubeConfigController(args)
    // s.configController.Run(stop)

    // creates and initializes the service controllers
    s.initServiceControllers(&args)
    // kube.NewController(s.kubeClient, args.Config.ControllerOptions)
    // go s.ServiceController.Run(stop)

    // Initialize the DiscoveryService gRPC server (covered in detail below).
    // Adds 2 funcs to startFuncs.
    s.initDiscoveryService(&args)

    return s, nil
}
```
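The sequencing above rests on a simple pattern: each init step registers a start function, and Start later runs them in registration order. A minimal standalone sketch of that pattern is shown below; the field and method names (startFuncs, addStartFunc, Start) mirror the code above, while the rest is illustrative:

```go
package main

import "fmt"

type startFunc func(stop <-chan struct{}) error

type server struct {
    startFuncs []startFunc
}

// addStartFunc defers a component's startup until Start is called.
func (s *server) addStartFunc(fn startFunc) {
    s.startFuncs = append(s.startFuncs, fn)
}

// Start runs the registered functions in the order they were added.
func (s *server) Start(stop <-chan struct{}) error {
    for _, fn := range s.startFuncs {
        if err := fn(stop); err != nil {
            return err
        }
    }
    return nil
}

func main() {
    s := &server{}
    s.addStartFunc(func(stop <-chan struct{}) error {
        fmt.Println("configController started")
        return nil
    })
    s.addStartFunc(func(stop <-chan struct{}) error {
        fmt.Println("serviceController started")
        return nil
    })

    stop := make(chan struct{})
    _ = s.Start(stop)
    close(stop)
}
```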
ConfigController
```go
// initConfigController creates the config controller in the pilotConfig.
func (s *Server) initConfigController(args *PilotArgs) error {
    // Initialization for the k8s case.
    controller, err := s.makeKubeConfigController(args)
    s.configController = controller

    // Defer starting the controller until after the service is created.
    s.addStartFunc(func(stop <-chan struct{}) error {
        go s.configController.Run(stop)
        return nil
    })
    //...

    // Create the config store.
    s.istioConfigStore = model.MakeIstioStore(s.configController)
    return nil
}
```
```go
func (s *Server) makeKubeConfigController(args *PilotArgs) (model.ConfigStoreCache, error) {
    kubeCfgFile := s.getKubeCfgFile(args)
    configClient, err := crd.NewClient(
        kubeCfgFile, "",
        model.IstioConfigTypes, // all of the related CRD definitions
        args.Config.ControllerOptions.DomainSuffix)

    if !args.Config.DisableInstallCRDs {
        // Register the custom CRDs.
        configClient.RegisterResources()
    }

    return crd.NewController(configClient, args.Config.ControllerOptions), nil
}
```
The model.IstioConfigTypes argument passed to crd.NewClient contains all of the related CRD definitions:
istio.io/istio/pilot/pkg/model/config.go
```go
// IstioConfigTypes lists all Istio config types with schemas and validation
IstioConfigTypes = ConfigDescriptor{
    VirtualService,
    Gateway,
    ServiceEntry,
    DestinationRule,
    EnvoyFilter,
    Sidecar,
    HTTPAPISpec,
    HTTPAPISpecBinding,
    QuotaSpec,
    QuotaSpecBinding,
    AuthenticationPolicy,
    AuthenticationMeshPolicy,
    ServiceRole,
    ServiceRoleBinding,
    RbacConfig,
    ClusterRbacConfig,
}
```
crd.NewController is defined in istio.io/istio/pilot/pkg/config/kube/crd/controller.go:
```go
// NewController creates a new Kubernetes controller for CRDs
// Use "" for namespace to listen for all namespace changes
func NewController(client *Client, options kube.ControllerOptions) model.ConfigStoreCache {
    log.Infof("CRD controller watching namespaces %q", options.WatchedNamespace)

    // Queue requires a time duration for a retry delay after a handler error
    out := &controller{
        client: client,
        queue:  kube.NewQueue(1 * time.Second),
        kinds:  make(map[string]cacheHandler),
    }

    // add stores for CRD kinds
    for _, schema := range client.ConfigDescriptor() {
        out.addInformer(schema, options.WatchedNamespace, options.ResyncPeriod)
    }

    return out
}
```
To watch the CRDs, a separate client connection is started per resource group; the CRD groups currently include networking, config, authentication, and rbac. The controller and client structures are shown below, followed by a short sketch of how a client might be selected by apiVersion.
```go
// controller is a collection of synchronized resource watchers.
// Caches are thread-safe
type controller struct {
    client *Client // one connection per resource group
    queue  kube.Queue
    kinds  map[string]cacheHandler
}

type cacheHandler struct {
    informer cache.SharedIndexInformer
    handler  *kube.ChainHandler
}
```
```go
// Client is a basic REST client for CRDs implementing config store
type Client struct {
    // Map of apiVersion to restClient.
    clientset map[string]*restClient

    // domainSuffix for the config metadata
    domainSuffix string
}
```
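The idea behind the clientset map can be sketched as follows: each CRD group/version gets its own REST client, and operations look up the right one by apiVersion. The restClient type and clientFor helper here are illustrative stand-ins, not the actual crd package API:

```go
package main

import "fmt"

// restClient stands in for a per-group/version Kubernetes REST client (illustrative).
type restClient struct {
    apiVersion string
}

// client keys one restClient per apiVersion, mirroring crd.Client's clientset map.
type client struct {
    clientset map[string]*restClient
}

// clientFor returns the REST client that serves the given group/version.
func (c *client) clientFor(group, version string) (*restClient, error) {
    key := group + "/" + version
    rc, ok := c.clientset[key]
    if !ok {
        return nil, fmt.Errorf("no REST client registered for %s", key)
    }
    return rc, nil
}

func main() {
    c := &client{clientset: map[string]*restClient{
        "networking.istio.io/v1alpha3":     {apiVersion: "networking.istio.io/v1alpha3"},
        "authentication.istio.io/v1alpha1": {apiVersion: "authentication.istio.io/v1alpha1"},
    }}
    rc, err := c.clientFor("networking.istio.io", "v1alpha3")
    fmt.Println(rc.apiVersion, err)
}
```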
ServiceController
istio.io/istio/pilot/pkg/serviceregistry/kube/controller.go
The struct definition is as follows:
```go
type Controller struct {
    domainSuffix string

    client    kubernetes.Interface
    queue     Queue
    services  cacheHandler
    endpoints cacheHandler
    nodes     cacheHandler

    pods *PodCache
    //....
}
```
The NewController function is defined as follows:
```go
// NewController creates a new Kubernetes controller
// Created by bootstrap and multicluster (see secretcontroler).
func NewController(client kubernetes.Interface, options ControllerOptions) *Controller {
    log.Infof("Service controller watching namespace %q for services, endpoints, nodes and pods, refresh %s",
        options.WatchedNamespace, options.ResyncPeriod)

    // Queue requires a time duration for a retry delay after a handler error
    out := &Controller{
        domainSuffix:               options.DomainSuffix,
        client:                     client,
        queue:                      NewQueue(1 * time.Second),
        ClusterID:                  options.ClusterID,
        XDSUpdater:                 options.XDSUpdater,
        servicesMap:                make(map[model.Hostname]*model.Service),
        externalNameSvcInstanceMap: make(map[model.Hostname][]*model.ServiceInstance),
    }

    sharedInformers := informers.NewSharedInformerFactoryWithOptions(client, options.ResyncPeriod, informers.WithNamespace(options.WatchedNamespace))

    svcInformer := sharedInformers.Core().V1().Services().Informer()
    out.services = out.createCacheHandler(svcInformer, "Services")

    epInformer := sharedInformers.Core().V1().Endpoints().Informer()
    out.endpoints = out.createEDSCacheHandler(epInformer, "Endpoints")

    nodeInformer := sharedInformers.Core().V1().Nodes().Informer()
    out.nodes = out.createCacheHandler(nodeInformer, "Nodes")

    podInformer := sharedInformers.Core().V1().Pods().Informer()
    out.pods = newPodCache(out.createCacheHandler(podInformer, "Pod"), out)

    return out
}
```
From the code above it is clear that ServiceController watches the following resources in the specified namespace (see the sketch after this list):
- Services
- Endpoints
- Nodes
- Pod
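The watch mechanism itself is plain client-go: a shared informer factory scoped to one namespace, one informer per resource type, and event handlers that enqueue changes. A minimal, self-contained sketch of that pattern (without Istio's own helpers such as createCacheHandler; the namespace and handler bodies are placeholders) might look like this:

```go
package main

import (
    "fmt"
    "time"

    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/cache"
)

func main() {
    // In-cluster config; pilot-discovery can also take a --kubeconfig file instead.
    cfg, err := rest.InClusterConfig()
    if err != nil {
        panic(err)
    }
    client := kubernetes.NewForConfigOrDie(cfg)

    // One factory, scoped to a single namespace, with a resync period
    // (the --resync flag, default 1m, plays this role in pilot-discovery).
    factory := informers.NewSharedInformerFactoryWithOptions(
        client, time.Minute, informers.WithNamespace("default"))

    // One informer per watched resource type.
    svcInformer := factory.Core().V1().Services().Informer()
    epInformer := factory.Core().V1().Endpoints().Informer()

    // Handlers only announce/enqueue changes; the controller's queue does the real work.
    svcInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    func(obj interface{}) { fmt.Println("service added") },
        UpdateFunc: func(old, new interface{}) { fmt.Println("service updated") },
        DeleteFunc: func(obj interface{}) { fmt.Println("service deleted") },
    })
    epInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) { fmt.Println("endpoints added") },
    })

    stop := make(chan struct{})
    defer close(stop)
    factory.Start(stop)
    factory.WaitForCacheSync(stop)
    select {} // block forever, as a controller process would
}
```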
ServiceDiscovery
istio.io/istio/pilot/pkg/bootstrap/server.go
```go
func (s *Server) initDiscoveryService(args *PilotArgs) error {
    // Note that env holds the variables used below, including
    // s.istioConfigStore and s.ServiceController.
    environment := &model.Environment{
        Mesh:             s.mesh,
        MeshNetworks:     s.meshNetworks,
        IstioConfigStore: s.istioConfigStore,  // istio routing rules
        ServiceDiscovery: s.ServiceController, // service list
        ServiceAccounts:  s.ServiceController,
        MixerSAN:         s.mixerSAN,
    }

    // Set up discovery service
    discovery, err := envoy.NewDiscoveryService(
        environment,
        args.DiscoveryOptions,
    )
    s.mux = discovery.RestContainer.ServeMux

    // Create the gRPC server backed by envoyv2.NewDiscoveryServer.
    s.EnvoyXdsServer = envoyv2.NewDiscoveryServer(
        environment,
        istio_networking.NewConfigGenerator(args.Plugins),
        s.ServiceController,
        s.configController)
    s.EnvoyXdsServer.InitDebug(s.mux, s.ServiceController)

    // ...
    // create grpc/http server
    s.initGrpcServer(args.KeepaliveOptions)
    s.httpServer = &http.Server{
        Addr:    args.DiscoveryOptions.HTTPAddr,
        Handler: s.mux,
    }

    // create http listener
    listener, err := net.Listen("tcp", args.DiscoveryOptions.HTTPAddr)
    s.HTTPListeningAddr = listener.Addr()

    // create grpc listener
    grpcListener, err := net.Listen("tcp", args.DiscoveryOptions.GrpcAddr)
    s.GRPCListeningAddr = grpcListener.Addr()

    s.addStartFunc(func(stop <-chan struct{}) error {
        // Start the http server goroutine.
        // Start the gRPC server goroutine.
        // Start a goroutine that waits for shutdown.
        return nil
    })

    // run secure grpc server
    return nil
}
```
istio.io/istio/pilot/pkg/proxy/envoy/v2/discovery.go, envoyv2.NewDiscoveryServer
Function definition:
```go
// s.EnvoyXdsServer = envoyv2.NewDiscoveryServer(
//     environment,
//     istio_networking.NewConfigGenerator(args.Plugins),
//     s.ServiceController,
//     s.configController)

// NewDiscoveryServer creates DiscoveryServer that sources data from Pilot's internal mesh data structures
func NewDiscoveryServer(env *model.Environment, generator core.ConfigGenerator, ctl model.Controller, configCache model.ConfigStoreCache) *DiscoveryServer {
    // Create the DiscoveryServer object.
    out := &DiscoveryServer{
        Env:                     env,
        ConfigGenerator:         generator, // generates xDS responses
        ConfigController:        configCache,
        EndpointShardsByService: map[string]*EndpointShards{},
        WorkloadsByID:           map[string]*Workload{},
        edsUpdates:              map[string]*EndpointShards{},
        concurrentPushLimit:     make(chan struct{}, 20),
        updateChannel:           make(chan *updateReq, 10),
    }
    env.PushContext = model.NewPushContext()

    // Handle update events.
    // handleUpdates processes events from updateChannel. It ensures that at least minQuiet
    // time has elapsed since the last event before processing, and that at most maxDelay
    // passes between receiving an event and processing it.
    // It finally calls doPush, which, depending on the full-push flag, sends the recently
    // updated EDS information either as an XDS incremental push or as a full push.
    go out.handleUpdates()

    // Three kinds of changes clear DiscoveryServer's local caches (cache-clearing callbacks
    // are registered for each):
    // 1. service-related information changes,
    // 2. the JWT public key changes,
    // 3. an Istio CRD changes.
    // Whenever any of them happens, an event is sent to out.handleUpdates().

    // Periodic refresh.
    go out.periodicRefresh()

    // Periodic metrics refresh.
    go out.periodicRefreshMetrics()

    out.DebugConfigs = pilot.DebugConfigs

    pushThrottle := intEnv(pilot.PushThrottle, 10)
    pushBurst := intEnv(pilot.PushBurst, 100)
    adsLog.Infof("Starting ADS server with rateLimiter=%d burst=%d", pushThrottle, pushBurst)
    out.rateLimiter = rate.NewLimiter(rate.Limit(pushThrottle), pushBurst)
    out.initRateLimiter = rate.NewLimiter(rate.Limit(pushThrottle*2), pushBurst*2)

    return out
}
```
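handleUpdates is essentially a debounce loop over updateChannel: wait for a quiet period before pushing, but never delay a pending push beyond a maximum bound. A minimal sketch of that debounce pattern is below; the minQuiet/maxDelay values and the push callback are illustrative, not Istio's actual tuning or implementation:

```go
package main

import (
    "fmt"
    "time"
)

// debounce drains events from ch and invokes push when either
// (a) no new event has arrived for minQuiet, or
// (b) maxDelay has elapsed since the first un-pushed event.
func debounce(ch <-chan string, minQuiet, maxDelay time.Duration, push func(pending int)) {
    var (
        pending   int
        firstSeen time.Time
        timer     = time.NewTimer(time.Hour)
    )
    timer.Stop()

    for {
        select {
        case _, ok := <-ch:
            if !ok {
                return
            }
            if pending == 0 {
                firstSeen = time.Now()
            }
            pending++
            // Re-arm the quiet timer, but cap the total wait at maxDelay.
            quiet := minQuiet
            if remaining := maxDelay - time.Since(firstSeen); remaining < quiet {
                quiet = remaining
            }
            timer.Reset(quiet)
        case <-timer.C:
            if pending > 0 {
                push(pending)
                pending = 0
            }
        }
    }
}

func main() {
    ch := make(chan string)
    go debounce(ch, 100*time.Millisecond, time.Second, func(n int) {
        fmt.Printf("pushing after %d batched updates\n", n)
    })

    for i := 0; i < 5; i++ {
        ch <- "config changed"
        time.Sleep(20 * time.Millisecond)
    }
    time.Sleep(300 * time.Millisecond) // allow the quiet period to expire
    close(ch)
    time.Sleep(50 * time.Millisecond)
}
```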
ADS-related definitions:
https://github.com/envoyproxy/data-plane-api/blob/master/envoy/service/discovery/v2/ads.proto
```protobuf
// See https://github.com/lyft/envoy-api#apis for a description of the role of
// ADS and how it is intended to be used by a management server. ADS requests
// have the same structure as their singleton xDS counterparts, but can
// multiplex many resource types on a single stream. The type_url in the
// DiscoveryRequest/DiscoveryResponse provides sufficient information to recover
// the multiplexed singleton APIs at the Envoy instance and management server.
service AggregatedDiscoveryService {
  // This is a gRPC-only API.
  rpc StreamAggregatedResources(stream envoy.api.v2.DiscoveryRequest)
      returns (stream envoy.api.v2.DiscoveryResponse) {
  }

  rpc IncrementalAggregatedResources(stream envoy.api.v2.IncrementalDiscoveryRequest)
      returns (stream envoy.api.v2.IncrementalDiscoveryResponse) {
  }
}
```
https://github.com/envoyproxy/data-plane-api/blob/master/envoy/api/v2/discovery.proto
```protobuf
// A DiscoveryRequest requests a set of versioned resources of the same type for
// a given Envoy node on some API.
message DiscoveryRequest {
  // The version_info provided in the request messages will be the version_info
  // received with the most recent successfully processed response or empty on
  // the first request. It is expected that no new request is sent after a
  // response is received until the Envoy instance is ready to ACK/NACK the new
  // configuration. ACK/NACK takes place by returning the new API config version
  // as applied or the previous API config version respectively. Each type_url
  // (see below) has an independent version associated with it.
  string version_info = 1;

  // The node making the request.
  core.Node node = 2;

  // List of resources to subscribe to, e.g. list of cluster names or a route
  // configuration name. If this is empty, all resources for the API are
  // returned. LDS/CDS expect empty resource_names, since this is global
  // discovery for the Envoy instance. The LDS and CDS responses will then imply
  // a number of resources that need to be fetched via EDS/RDS, which will be
  // explicitly enumerated in resource_names.
  repeated string resource_names = 3;

  // Type of the resource that is being requested, e.g.
  // "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment". This is implicit
  // in requests made via singleton xDS APIs such as CDS, LDS, etc. but is
  // required for ADS.
  string type_url = 4;

  // nonce corresponding to DiscoveryResponse being ACK/NACKed. See above
  // discussion on version_info and the DiscoveryResponse nonce comment. This
  // may be empty if no nonce is available, e.g. at startup or for non-stream
  // xDS implementations.
  string response_nonce = 5;

  // This is populated when the previous :ref:`DiscoveryResponse <envoy_api_msg_DiscoveryResponse>`
  // failed to update configuration. The *message* field in *error_details* provides the Envoy
  // internal exception related to the failure. It is only intended for consumption during manual
  // debugging, the string provided is not guaranteed to be stable across Envoy versions.
  google.rpc.Status error_detail = 6;
}

message DiscoveryResponse {
  // The version of the response data.
  string version_info = 1;

  // The response resources. These resources are typed and depend on the API being called.
  repeated google.protobuf.Any resources = 2 [(gogoproto.nullable) = false];

  // [#not-implemented-hide:]
  // Canary is used to support two Envoy command line flags:
  //
  // * --terminate-on-canary-transition-failure. When set, Envoy is able to
  //   terminate if it detects that configuration is stuck at canary. Consider
  //   this example sequence of updates:
  //   - Management server applies a canary config successfully.
  //   - Management server rolls back to a production config.
  //   - Envoy rejects the new production config.
  //   Since there is no sensible way to continue receiving configuration
  //   updates, Envoy will then terminate and apply production config from a
  //   clean slate.
  // * --dry-run-canary. When set, a canary response will never be applied, only
  //   validated via a dry run.
  bool canary = 3;

  // Type URL for resources. This must be consistent with the type_url in the
  // Any messages for resources if resources is non-empty. This effectively
  // identifies the xDS API when muxing over ADS.
  string type_url = 4;

  // For gRPC based subscriptions, the nonce provides a way to explicitly ack a
  // specific DiscoveryResponse in a following DiscoveryRequest. Additional
  // messages may have been sent by Envoy to the management server for the
  // previous version on the stream prior to this DiscoveryResponse, that were
  // unprocessed at response send time. The nonce allows the management server
  // to ignore any further DiscoveryRequests for the previous version until a
  // DiscoveryRequest bearing the nonce. The nonce is optional and is not
  // required for non-stream based xDS implementations.
  string nonce = 5;

  // [#not-implemented-hide:]
  // The control plane instance that sent the response.
  core.ControlPlane control_plane = 6;
}
```
For how ADS guarantees consistency, see the "Eventual consistency considerations" section of envoy-的-xds-rest-和-grpc-协议详解:
In general, to avoid dropping traffic, updates should follow the make-before-break model, in which:
* CDS updates (if any) must always be pushed first.
* EDS updates (if any) must arrive after the CDS updates for the corresponding clusters.
* LDS updates must arrive after the corresponding CDS/EDS updates.
* RDS updates related to newly added listeners must arrive last.
* Finally, stale CDS clusters and the related EDS endpoints (the ones no longer referenced) are removed.
ADS lets a single management server deliver all API updates over a single gRPC stream. Combined with carefully sequenced updates, ADS avoids traffic loss during configuration changes; a rough sketch of that ordering follows.
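As an illustration only, a full push that respects this ordering could be sequenced as below. The pushCds/pushEds/pushLds/pushRoute names mirror the methods seen later in ads.go, but this wrapper and its type URLs are a sketch, not Pilot's actual push routine:

```go
package main

import "fmt"

// xdsPush stands in for one typed xDS push (CDS, EDS, LDS, RDS); in Pilot these roughly
// correspond to pushCds/pushEds/pushLds/pushRoute in ads.go.
type xdsPush struct {
    typeURL string
    push    func() error
}

// fullPush applies a complete configuration update in make-before-break order:
// clusters first, then endpoints, then listeners, then routes.
func fullPush(pushes []xdsPush) error {
    for _, p := range pushes {
        if err := p.push(); err != nil {
            return fmt.Errorf("push %s: %w", p.typeURL, err)
        }
    }
    return nil
}

func main() {
    ordered := []xdsPush{
        {"type.googleapis.com/envoy.api.v2.Cluster", func() error { fmt.Println("CDS pushed"); return nil }},
        {"type.googleapis.com/envoy.api.v2.ClusterLoadAssignment", func() error { fmt.Println("EDS pushed"); return nil }},
        {"type.googleapis.com/envoy.api.v2.Listener", func() error { fmt.Println("LDS pushed"); return nil }},
        {"type.googleapis.com/envoy.api.v2.RouteConfiguration", func() error { fmt.Println("RDS pushed"); return nil }},
    }
    if err := fullPush(ordered); err != nil {
        fmt.Println("push failed:", err)
    }
    // Stale clusters/endpoints would only be removed after the new config is live.
}
```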
pilot/pkg/proxy/envoy/v2/ads.go
```go
// StreamAggregatedResources implements the ADS interface.
func (s *DiscoveryServer) StreamAggregatedResources(stream ads.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
    // ...
    // InitContext returns immediately if the context was already initialized.
    // InitContext pulls the data to be sent to clients out of env (analyzed further below):
    // 1. initServiceRegistry
    // 2. initVirtualServices
    // 3. initDestinationRules
    // 4. initAuthorizationPolicies
    // 5. InitSidecarScopes
    err := s.globalPushContext().InitContext(s.Env)

    con := newXdsConnection(peerAddr, stream)

    reqChannel := make(chan *xdsapi.DiscoveryRequest, 1)
    // Start a new goroutine that receives xdsapi.DiscoveryRequest messages from the client.
    go receiveThread(con, reqChannel, &receiveError)

    for {
        // Block until either a request is received or a push is triggered.
        select {
        case discReq, ok := <-reqChannel:
            err = s.initConnectionNode(discReq, con)

            switch discReq.TypeUrl {
            case ClusterType:
                // ...
                // CDS REQ is the first request an envoy makes. This shows up
                // immediately after connect. It is followed by EDS REQ as
                // soon as the CDS push is returned.
                adsLog.Infof("ADS:CDS: REQ %v %s %v raw: %s", peerAddr, con.ConID, time.Since(t0), discReq.String())
                con.CDSWatch = true
                err := s.pushCds(con, s.globalPushContext(), versionInfo())

            case ListenerType:
                // ...
                adsLog.Debugf("ADS:LDS: REQ %s %v", con.ConID, peerAddr)
                con.LDSWatch = true
                err := s.pushLds(con, s.globalPushContext(), true, versionInfo())

            case RouteType:
                // ...
                adsLog.Debugf("ADS:RDS: REQ %s %s routes: %d", peerAddr, con.ConID, len(con.Routes))
                err := s.pushRoute(con, s.globalPushContext())

            case EndpointType:
                // ...
                // Various error handling omitted.
                for _, cn := range con.Clusters {
                    s.removeEdsCon(cn, con.ConID, con)
                }

                for _, cn := range clusters {
                    s.addEdsCon(cn, con.ConID, con)
                }

                con.Clusters = clusters
                adsLog.Debugf("ADS:EDS: REQ %s %s clusters: %d", peerAddr, con.ConID, len(con.Clusters))
                err := s.pushEds(s.globalPushContext(), con, true, nil)
                if err != nil {
                    return err
                }

            default:
                adsLog.Warnf("ADS: Unknown watched resources %s", discReq.String())
            }
            // ...
        case pushEv := <-con.pushChannel:
            // ...
            err := s.pushConnection(con, pushEv)
            if err != nil {
                return nil
            }
        }
    }
}
```
istio.io/istio/pilot/pkg/model/push_context.go
```go
// PushContext tracks the status of a push - metrics and errors.
// Metrics are reset after a push - at the beginning all
// values are zero, and when push completes the status is reset.
// The struct is exposed in a debug endpoint - fields public to allow
// easy serialization as json.
type PushContext struct {
    // privateServices are reachable within the same namespace.
    privateServicesByNamespace map[string][]*Service
    // publicServices are services reachable within the mesh.
    publicServices []*Service

    privateVirtualServicesByNamespace map[string][]Config
    publicVirtualServices             []Config

    // destination rules are of three types:
    //  namespaceLocalDestRules: all public/private dest rules pertaining to a service defined in a given namespace
    //  namespaceExportedDestRules: all public dest rules pertaining to a service defined in a namespace
    //  allExportedDestRules: all (public) dest rules across all namespaces
    // We need the allExportedDestRules in addition to namespaceExportedDestRules because we select
    // the dest rule based on the most specific host match, and not just any destination rule
    namespaceLocalDestRules    map[string]*processedDestRules
    namespaceExportedDestRules map[string]*processedDestRules
    allExportedDestRules       *processedDestRules

    // sidecars for each namespace
    sidecarsByNamespace map[string][]*SidecarScope
    ////////// END ////////

    // The following data is either a global index or used in the inbound path.
    // Namespace specific views do not apply here.

    // ServiceByHostname has all services, indexed by hostname.
    ServiceByHostname map[Hostname]*Service `json:"-"`

    // AuthzPolicies stores the existing authorization policies in the cluster. Could be nil if there
    // are no authorization policies in the cluster.
    AuthzPolicies *AuthorizationPolicies `json:"-"`

    // ServicePort2Name is used to keep track of service name and port mapping.
    // This is needed because ADS names use port numbers, while endpoints use
    // port names. The key is the service name. If a service or port are not found,
    // the endpoint needs to be re-evaluated later (eventual consistency)
    ServicePort2Name map[string]PortList `json:"-"`

    initDone bool
}

// InitContext will initialize the data structures used for code generation.
// This should be called before starting the push, from the thread creating
// the push context.
func (ps *PushContext) InitContext(env *Environment) error {
    ps.Mutex.Lock()
    defer ps.Mutex.Unlock()
    if ps.initDone {
        return nil
    }
    ps.Env = env
    var err error

    // Caches list of services in the registry, and creates a map
    // of hostname to service -> ServicePort2Name map[string]PortList `json:"-"`
    ps.initServiceRegistry(env)

    // Caches list of virtual services -> publicVirtualServices []Config
    ps.initVirtualServices(env)

    // Split out of DestinationRule expensive conversions - once per push.
    // The results end up in the following three fields:
    // * namespaceLocalDestRules    map[string]*processedDestRules
    // * namespaceExportedDestRules map[string]*processedDestRules
    // * allExportedDestRules       *processedDestRules
    ps.initDestinationRules(env)

    // Get the ClusterRbacConfig -> AuthzPolicies *AuthorizationPolicies
    ps.initAuthorizationPolicies(env)

    // Must be initialized in the end -> sidecarsByNamespace map[string][]*SidecarScope
    ps.InitSidecarScopes(env)

    ps.initDone = true
    return nil
}
```
CDS is the first message sent over ADS; below we use CDS as the example for a detailed analysis.
```go
// StreamAggregatedResources implements the ADS interface.
func (s *DiscoveryServer) StreamAggregatedResources(stream ads.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
    // ...
    // InitContext returns immediately if the context was already initialized.
    // InitContext pulls the data to be sent to clients out of env (analyzed further below):
    // 1. initServiceRegistry
    // 2. initVirtualServices
    // 3. initDestinationRules
    // 4. initAuthorizationPolicies
    // 5. InitSidecarScopes
    err := s.globalPushContext().InitContext(s.Env)

    con := newXdsConnection(peerAddr, stream)

    reqChannel := make(chan *xdsapi.DiscoveryRequest, 1)
    // Start a new goroutine that receives xdsapi.DiscoveryRequest messages from the client.
    go receiveThread(con, reqChannel, &receiveError)

    for {
        // Block until either a request is received or a push is triggered.
        select {
        case discReq, ok := <-reqChannel:
            // Mainly calls ParseServiceNodeWithMetadata, which extracts the various
            // pieces of information from the request and produces a Proxy object.
            // The current discReq.Node.Id format is Type~IPAddress~ID~Domain.
            err = s.initConnectionNode(discReq, con)

            switch discReq.TypeUrl {
            case ClusterType:
                // Handling of responses after CDS data has already been sent.
                if con.CDSWatch {
                    // ...
                }

                // CDS REQ is the first request an envoy makes. This shows up
                // immediately after connect. It is followed by EDS REQ as
                // soon as the CDS push is returned.
                adsLog.Infof("ADS:CDS: REQ %v %s %v raw: %s", peerAddr, con.ConID, time.Since(t0), discReq.String())
                con.CDSWatch = true
                err := s.pushCds(con, s.globalPushContext(), versionInfo())
                if err != nil {
                    return err
                }
            //...
            }
```
The node field of the request is described in the Envoy API docs as follows: it identifies a specific Envoy instance. The node identifier is presented to the management server, which may use this identifier to distinguish per Envoy configuration for serving.

```json
{
  "id": "...",
  "cluster": "...",
  "metadata": "{...}",
  "locality": "{...}",
  "build_version": "..."
}
```
- id: (string) An opaque node identifier for the Envoy node. This also provides the local service node name. It should be set if any of the following features are used: statsd, CDS, and HTTP tracing, either in this message or via --service-node.
- cluster: (string) Defines the local service cluster name where Envoy is running. Though optional, it should be set if any of the following features are used: statsd, health check cluster verification, runtime override directory, user agent addition, HTTP global rate limiting, CDS, and HTTP tracing, either in this message or via --service-cluster.
- metadata: (Struct) Opaque metadata extending the node identifier. Envoy will pass this directly to the management server.
- locality: (core.Locality) Locality specifying where the Envoy instance is running.
- build_version: (string) This is motivated by informing a management server during canary which version of Envoy is being tested in a heterogeneous fleet. This will be set by Envoy in management server RPCs.
In initConnectionNode, the main work is calling ParseServiceNodeWithMetadata, which extracts the various pieces of information from the request message and produces a Proxy object; a hedged sketch of the node-ID parsing follows the Proxy definition below.
istio.io/istio/pilot/pkg/model/context.go
```go
func ParseServiceNodeWithMetadata(s string, metadata map[string]string) (*Proxy, error) {
}
```
```go
// Proxy contains information about an specific instance of a proxy (envoy sidecar, gateway,
// etc). The Proxy is initialized when a sidecar connects to Pilot, and populated from
// 'node' info in the protocol as well as data extracted from registries.
//
// In current Istio implementation nodes use a 4-parts '~' delimited ID.
// Type~IPAddress~ID~Domain
type Proxy struct {
    // ClusterID specifies the cluster where the proxy resides.
    // TODO: clarify if this is needed in the new 'network' model, likely needs to
    // be renamed to 'network'
    ClusterID string

    // Type specifies the node type. First part of the ID.
    Type NodeType

    // IPAddresses is the IP addresses of the proxy used to identify it and its
    // co-located service instances. Example: "10.60.1.6". In some cases, the host
    // where the poxy and service instances reside may have more than one IP address
    IPAddresses []string

    // ID is the unique platform-specific sidecar proxy ID. For k8s it is the pod ID and
    // namespace.
    ID string

    // Locality is the location of where Envoy proxy runs.
    Locality Locality

    // DNSDomain defines the DNS domain suffix for short hostnames (e.g.
    // "default.svc.cluster.local")
    DNSDomain string

    // TrustDomain defines the trust domain of the certificate
    TrustDomain string

    // ConfigNamespace defines the namespace where this proxy resides
    // for the purposes of network scoping.
    // NOTE: DO NOT USE THIS FIELD TO CONSTRUCT DNS NAMES
    ConfigNamespace string

    // Metadata key-value pairs extending the Node identifier
    Metadata map[string]string

    // the sidecarScope associated with the proxy
    SidecarScope *SidecarScope
}
```
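Since the node ID is a 4-part '~' delimited string (Type~IPAddress~ID~Domain), the core of the parsing can be sketched as below. The error handling and metadata enrichment done by the real ParseServiceNodeWithMetadata are omitted, and the proxyID type and example ID are simplified stand-ins:

```go
package main

import (
    "fmt"
    "strings"
)

// proxyID is a simplified stand-in for model.Proxy, holding only the four ID parts.
type proxyID struct {
    Type      string
    IPAddress string
    ID        string
    DNSDomain string
}

// parseServiceNode splits a node ID of the form Type~IPAddress~ID~Domain.
func parseServiceNode(s string) (proxyID, error) {
    parts := strings.Split(s, "~")
    if len(parts) != 4 {
        return proxyID{}, fmt.Errorf("missing parts in the service node %q", s)
    }
    return proxyID{
        Type:      parts[0],
        IPAddress: parts[1],
        ID:        parts[2],
        DNSDomain: parts[3],
    }, nil
}

func main() {
    // Example ID in the shape an Envoy sidecar would report (values are illustrative).
    p, err := parseServiceNode("sidecar~10.60.1.6~productpage-v1-1234.default~default.svc.cluster.local")
    if err != nil {
        panic(err)
    }
    fmt.Printf("%+v\n", p)
}
```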
The core of the CDS push is the call s.pushCds(con, s.globalPushContext(), versionInfo()).
istio.io/istio/pilot/pkg/proxy/envoy/v2/cds.go
```go
func (s *DiscoveryServer) pushCds(con *XdsConnection, push *model.PushContext, version string) error {
    // Generate the []*xdsapi.Cluster objects from the current con.modelNode and the push context.
    rawClusters, err := s.generateRawClusters(con.modelNode, push)

    response := con.clusters(rawClusters)
    err = con.send(response)

    return nil
}
```
The role of s.generateRawClusters(con.modelNode, push) is therefore obvious: it gathers the CDS-related data from the push context and packages it into the CDS response format.
```go
func (s *DiscoveryServer) generateRawClusters(node *model.Proxy, push *model.PushContext) ([]*xdsapi.Cluster, error) {
    rawClusters, err := s.ConfigGenerator.BuildClusters(s.Env, node, push)

    // Validate the information in rawClusters.

    return rawClusters, nil
}
```
The rawClusters object is produced by s.ConfigGenerator.BuildClusters from the combination of s.Env, node, and push:
- s.Env holds the local caches of the ServiceController, ConfigController, and other resources
- node is the object built from this request, carrying the Node-related information
- push is the push context for this push; the information needed for the push has already been pre-assembled into it (derived from s.Env)
istio.io/istio/pilot/pkg/networking/core/v1alpha3/cluster.go
The BuildClusters function is implemented as follows:
```go
// BuildClusters returns the list of clusters for the given proxy. This is the CDS output
// For outbound: Cluster for each service/subset hostname or cidr with SNI set to service hostname
// Cluster type based on resolution
// For inbound (sidecar only): Cluster for each inbound endpoint port and for each service port
func (configgen *ConfigGeneratorImpl) BuildClusters(env *model.Environment, proxy *model.Proxy, push *model.PushContext) ([]*apiv2.Cluster, error) {
    clusters := make([]*apiv2.Cluster, 0)

    switch proxy.Type {
    case model.SidecarProxy:
        // GetProxyServiceInstances returns service instances co-located with the proxy,
        // i.e. the service instances on the host where the proxy runs, including headless services.
        instances, err := env.GetProxyServiceInstances(proxy)

        sidecarScope := proxy.SidecarScope
        recomputeOutboundClusters := true

        // Append the outbound clusters.
        if recomputeOutboundClusters {
            clusters = append(clusters, configgen.buildOutboundClusters(env, proxy, push)...)
        }

        // Let ServiceDiscovery decide which IP and Port are used for management if
        // there are multiple IPs
        managementPorts := make([]*model.Port, 0)
        for _, ip := range proxy.IPAddresses {
            managementPorts = append(managementPorts, env.ManagementPorts(ip)...)
        }

        // Append the inbound clusters related to the managementPorts on the proxy's IPs.
        clusters = append(clusters, configgen.buildInboundClusters(env, proxy, push, instances, managementPorts)...)

    default: // Gateways
        // ...
    }

    // Add a blackhole and passthrough cluster for catching traffic to unresolved routes
    // DO NOT CALL PLUGINS for these two clusters.
    clusters = append(clusters, buildBlackHoleCluster())
    clusters = append(clusters, buildDefaultPassthroughCluster())

    // resolves cluster name conflicts.
    return normalizeClusters(push, proxy, clusters), nil
}
```
A quick way to debug:
istio.io/istio/pilot/pkg/proxy/envoy/v2/debug.go
```go
// InitDebug initializes the debug handlers and adds a debug in-memory registry.
func (s *DiscoveryServer) InitDebug(mux *http.ServeMux, sctl *aggregate.Controller) {
    // For debugging and load testing v2 we add an memory registry.
    s.MemRegistry = NewMemServiceDiscovery(
        map[model.Hostname]*model.Service{
            // mock.HelloService.Hostname: mock.HelloService,
        }, 2)
    s.MemRegistry.EDSUpdater = s
    s.MemRegistry.ClusterID = "v2-debug"

    sctl.AddRegistry(aggregate.Registry{
        ClusterID:        "v2-debug",
        Name:             serviceregistry.ServiceRegistry("memAdapter"),
        ServiceDiscovery: s.MemRegistry,
        ServiceAccounts:  s.MemRegistry,
        Controller:       s.MemRegistry.controller,
    })

    mux.HandleFunc("/ready", s.ready)

    mux.HandleFunc("/debug/edsz", s.edsz)
    mux.HandleFunc("/debug/adsz", s.adsz)
    mux.HandleFunc("/debug/cdsz", cdsz)
    mux.HandleFunc("/debug/syncz", Syncz)

    mux.HandleFunc("/debug/registryz", s.registryz)
    mux.HandleFunc("/debug/endpointz", s.endpointz)
    mux.HandleFunc("/debug/endpointShardz", s.endpointShardz)
    mux.HandleFunc("/debug/workloadz", s.workloadz)
    mux.HandleFunc("/debug/configz", s.configz)

    mux.HandleFunc("/debug/authenticationz", s.authenticationz)
    mux.HandleFunc("/debug/config_dump", s.ConfigDump)
    mux.HandleFunc("/debug/push_status", s.PushStatusHandler)
}
```
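These handlers are registered on s.mux, which is served on the discovery HTTP address (":8080" by default, per --httpAddr). Assuming that port is reachable, for example via a kubectl port-forward to the pilot pod (a hypothetical local setup), a small Go snippet to dump one of the debug endpoints might look like this:

```go
package main

import (
    "fmt"
    "io"
    "net/http"
)

func main() {
    // Assumes the pilot discovery HTTP port (default :8080) has been forwarded to localhost.
    resp, err := http.Get("http://localhost:8080/debug/registryz")
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    body, err := io.ReadAll(resp.Body)
    if err != nil {
        panic(err)
    }
    fmt.Println(string(body)) // JSON dump of the services known to the registries
}
```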
This concludes the walkthrough of pilot-discovery's startup flow and its ADS push path.