内容简介:[TOC]完整的 yaml 文件参见
[TOC]
架构
介绍
完整的 yaml 文件参见 pilot-yaml 。
Dockerfile
FROM istionightly/base_debug ADD pilot-discovery /usr/local/bin/ ADD cacert.pem /cacert.pem ENTRYPOINT ["/usr/local/bin/pilot-discovery"]
启动命令行
# ps -ef -www|grep pilot $/usr/local/bin/pilot-discovery discovery
命令行帮助
# kubectl exec -ti istio-pilot-f9d78b7b9-fmhfb -n istio-system -c discovery -- /usr/local/bin/pilot-discovery --help Usage: pilot-discovery [command] Available Commands: discovery Start Istio proxy discovery service help Help about any command request Makes an HTTP request to Pilot metrics/debug endpoint version Prints out build version information
discovery 相关的帮助
# kubectl exec -ti istio-pilot-f9d78b7b9-fmhfb -n istio-system -c discovery -- /usr/local/bin/pilot-discovery discovery --help Defaulting container name to discovery. Start Istio proxy discovery service Usage: pilot-discovery discovery [flags] Flags: -a, --appNamespace string Restrict the applications namespace the controller manages; if not set, controller watches all namespaces --cfConfig string Cloud Foundry config file --clusterRegistriesConfigMap string ConfigMap map for clusters config store --clusterRegistriesNamespace string Namespace for ConfigMap which stores clusters configs --configDir string Directory to watch for updates to config yaml files. If specified, the files will be used as the source of config, rather than a CRD client. --consulserverInterval duration Interval (in seconds) for polling the Consul service registry (default 2s) --consulserverURL string URL for the Consul server --disable-install-crds Disable discovery service from verifying the existence of CRDs at startup and then installing if not detected. It is recommended to be disable for highly available setups. --discovery_cache Enable caching discovery service responses (default true) --domain string DNS domain suffix (default "cluster.local") --grpcAddr string Discovery service grpc address (default ":15010") -h, --help help for discovery --httpAddr string Discovery service HTTP address (default ":8080") --kubeconfig string Use a Kubernetes configuration file instead of in-cluster configuration --meshConfig string File name for Istio mesh configuration. If not specified, a default mesh will be used. (default "/etc/istio/config/mesh") --monitoringAddr string HTTP address to use for the exposing pilot self-monitoring information (default ":9093") -n, --namespace string Select a namespace where the controller resides. 
If not set, uses ${POD_NAMESPACE} environment variable --plugins stringSlice comma separated list of networking plugins to enable (default [authn,authz,health,mixer,envoyfilter]) --profile Enable profiling via web interface host:port/debug/pprof (default true) --registries stringSlice Comma separated list of platform service registries to read from (choose one or more from {Kubernetes, Consul, CloudFoundry, Mock, Config}) (default [Kubernetes]) --resync duration Controller resync interval (default 1m0s) --secureGrpcAddr string Discovery service grpc address, with https (default ":15012") --webhookEndpoint string Webhook API endpoint (supports http://sockethost, and unix:///absolute/path/to/socket Global Flags: 省略
主要参数:
名称 | 默认值 | 备注 |
---|---|---|
--appNamespace | 空 | 与 helm 安装中的 oneNamespace 对应 |
--configDir | 空 | 表明 pilot 的两种来源:配置文件和 CRD |
--discovery_cache | true | 启动 cache,有助于提升性能 |
--domain | "cluster.local" | k8s 中域后缀 |
--grpcAddr | :15010 | |
--httpAddr | :8080 | |
--meshConfig | "/etc/istio/config/mesh" | |
--registries | Kubernetes | |
代码分析
函数入口
discoveryCmd = &cobra.Command{ Use: "discovery", Short: "Start Istio proxy discovery service.", Args: cobra.ExactArgs(0), RunE: func(c *cobra.Command, args []string) error { // ... // Create the stop channel for all of the servers. stop := make(chan struct{}) // Create the server for the discovery service. discoveryServer, err := bootstrap.NewServer(serverArgs) if err != nil { return fmt.Errorf("failed to create discovery service: %v", err) } // Start the server if err := discoveryServer.Start(stop); err != nil { return fmt.Errorf("failed to start discovery service: %v", err) } cmd.WaitSignal(stop) return nil }, }
istio.io/istio/pilot/pkg/bootstrap/server.go
mesh 的默认配置参见:https://gist.github.com/DavadDi/f110459d339e260f818250287fc78ccc#file-mesh
// NewServer creates a new Server instance based on the provided arguments. func NewServer(args PilotArgs) (*Server, error) { // ... s := &Server{ filewatcher: filewatcher.NewWatcher(), } // 省略错误处理 s.initKubeClient(&args) // 初始化到 k8s 集群的客户端 s.kubeClient s.initMesh(&args) // 初始化配置,并添加到 filewatcher 中, /etc/istio/config/mesh // 1.0.5 版本的 cmd 中未包括,应该是 1.1 中添加 // serverArgs.NetworksConfigFile, "networksConfig", "/etc/istio/config/meshNetworks" // 初始化配置,并加入到 filewatcher 中监听 s.initMeshNetworks(&args) // initMixerSan configures the mixerSAN configuration item. // The mesh must already have been configured. s.initMixerSan(&args) // creates the config controller in the pilotConfig. // 最终会创建一个 crd.NewController 实例 s.initConfigController(&args) /* 内部以 kube 为例,表明主要流程 controller, err := s.makeKubeConfigController(args) s.configController = controller s.addStartFunc(func(stop <-chan struct{}) error { go s.configController.Run(stop) // 1. 第一个启动的 configController }) */ // creates and initializes the service controllers s.initServiceControllers(&args) /* s.createK8sServiceControllers(serviceControllers, args) --> kube.NewController(s.kubeClient, args.Config.ControllerOptions) s.addStartFunc(func(stop <-chan struct{}) error { go s.ServiceController.Run(stop) // 2. 第二个启动的 ServiceController return nil }) */ // 初始化 DiscoveryService gRPC 服务器,后面详细讲解 // 添加2个 func 到 startFuncs 中 s.initDiscoveryService(&args) // initializes the configuration for the pilot monitoring server. // 添加1个 func 到 startFuncs 中 s.initMonitor(&args) // starts the secret controller to watch for remote // clusters and initialize the multicluster structures. // 暂时不分析 s.initClusterRegistries(&args) return s, nil } // 将 NewServer 函数中初始化的函数依次启动起来 func (s *Server) Start(stop <-chan struct{}) error { // Now start all of the components. for _, fn := range s.startFuncs { fn(stop) } }
istio.io/api/mesh/v1alpha1/network.pb.go
其中 MeshNetworks 结构体和定义说明如下:
// MeshNetworks (config map) provides information about the set of networks // inside a mesh and how to route to endpoints in each network. For example // // MeshNetworks(file/config map): // networks: // - network1: // - endpoints: // - fromRegistry: registry1 #must match secret name in kubernetes // - fromCidr: 192.168.100.0/22 #a VM network for example // gateways: // - registryServiceName: istio-ingressgateway.istio-system.svc.cluster.local // port: 15443 // locality: us-east-1a type MeshNetworks struct { // REQUIRED: The set of networks inside this mesh. Each network should // have a unique name and information about how to infer the endpoints in // the network as well as the gateways associated with the network. Networks map[string]*Network `protobuf:"bytes,1,rep,name=networks" json:"networks,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value"` }
对 NewServer 函数进行简化后,其主要流程如下:
func NewServer(args PilotArgs) (*Server, error) { // 省略错误处理 s.initKubeClient(&args) // 初始化到 k8s 集群的客户端 s.kubeClient // 初始化相关配置,并监视配置的变化情况 // ... // creates the config controller in the pilotConfig. // 最终会创建一个 crd.NewController 实例 s.initConfigController(&args) // s.makeKubeConfigController(args) // s.configController.Run(stop) // creates and initializes the service controllers s.initServiceControllers(&args) // kube.NewController(s.kubeClient, args.Config.ControllerOptions) // go s.ServiceController.Run(stop) // 初始化 DiscoveryService gRPC 服务器,后面详细讲解 // 添加 2 个 func 到 startFuncs 中 s.initDiscoveryService(&args) return s, nil }
ConfigController
// initConfigController creates the config controller in the pilotConfig. func (s *Server) initConfigController(args *PilotArgs) error { // k8s 方式下初始化 controller, err := s.makeKubeConfigController(args) s.configController = controller // Defer starting the controller until after the service is created. s.addStartFunc(func(stop <-chan struct{}) error { go s.configController.Run(stop) return nil }) //... // Create the config store. s.istioConfigStore = model.MakeIstioStore(s.configController) return nil }
func (s *Server) makeKubeConfigController(args *PilotArgs) (model.ConfigStoreCache, error) { kubeCfgFile := s.getKubeCfgFile(args) configClient, err := crd.NewClient( kubeCfgFile, "", model.IstioConfigTypes, // 全部相关的 CRD 定义 args.Config.ControllerOptions.DomainSuffix) if !args.Config.DisableInstallCRDs { // 注册自定义的 CRD configClient.RegisterResources() } return crd.NewController(configClient, args.Config.ControllerOptions), nil }
其中 crd.NewClient 中的参数 model.IstioConfigTypes 包含了相关的全部 CRD 的定义:
istio.io/istio/pilot/pkg/model/config.go
// IstioConfigTypes lists all Istio config types with schemas and validation IstioConfigTypes = ConfigDescriptor{ VirtualService, Gateway, ServiceEntry, DestinationRule, EnvoyFilter, Sidecar, HTTPAPISpec, HTTPAPISpecBinding, QuotaSpec, QuotaSpecBinding, AuthenticationPolicy, AuthenticationMeshPolicy, ServiceRole, ServiceRoleBinding, RbacConfig, ClusterRbacConfig, }
其中 crd.NewController 定义在 istio.io/istio/pilot/pkg/config/kube/crd/controller.go 文件中:
// NewController creates a new Kubernetes controller for CRDs // Use "" for namespace to listen for all namespace changes func NewController(client *Client, options kube.ControllerOptions) model.ConfigStoreCache { log.Infof("CRD controller watching namespaces %q", options.WatchedNamespace) // Queue requires a time duration for a retry delay after a handler error out := &controller{ client: client, queue: kube.NewQueue(1 * time.Second), kinds: make(map[string]cacheHandler), } // add stores for CRD kinds for _, schema := range client.ConfigDescriptor() { out.addInformer(schema, options.WatchedNamespace, options.ResyncPeriod) } return out }
对于 CRD 的监视,每一类资源需要启动一个单独的客户端连接,目前 CRD 的 Group 主要有 networking/config/authentication/rbac 等;
// controller is a collection of synchronized resource watchers. // Caches are thread-safe type controller struct { client *Client // 每类资源一个连接 queue kube.Queue kinds map[string]cacheHandler } type cacheHandler struct { informer cache.SharedIndexInformer handler *kube.ChainHandler }
// Client is a basic REST client for CRDs implementing config store type Client struct { // Map of apiVersion to restClient. clientset map[string]*restClient // domainSuffix for the config metadata domainSuffix string }
ServiceController
istio.io/istio/pilot/pkg/serviceregistry/kube/controller.go
结构定义如下:
type Controller struct { domainSuffix string client kubernetes.Interface queue Queue services cacheHandler endpoints cacheHandler nodes cacheHandler pods *PodCache //.... }
NewController 函数定义如下:
// NewController creates a new Kubernetes controller // Created by bootstrap and multicluster (see secretcontroler). func NewController(client kubernetes.Interface, options ControllerOptions) *Controller { log.Infof("Service controller watching namespace %q for services, endpoints, nodes and pods, refresh %s", options.WatchedNamespace, options.ResyncPeriod) // Queue requires a time duration for a retry delay after a handler error out := &Controller{ domainSuffix: options.DomainSuffix, client: client, queue: NewQueue(1 * time.Second), ClusterID: options.ClusterID, XDSUpdater: options.XDSUpdater, servicesMap: make(map[model.Hostname]*model.Service), externalNameSvcInstanceMap: make(map[model.Hostname][]*model.ServiceInstance), } sharedInformers := informers.NewSharedInformerFactoryWithOptions(client, options.ResyncPeriod, informers.WithNamespace(options.WatchedNamespace)) svcInformer := sharedInformers.Core().V1().Services().Informer() out.services = out.createCacheHandler(svcInformer, "Services") epInformer := sharedInformers.Core().V1().Endpoints().Informer() out.endpoints = out.createEDSCacheHandler(epInformer, "Endpoints") nodeInformer := sharedInformers.Core().V1().Nodes().Informer() out.nodes = out.createCacheHandler(nodeInformer, "Nodes") podInformer := sharedInformers.Core().V1().Pods().Informer() out.pods = newPodCache(out.createCacheHandler(podInformer, "Pod"), out) return out }
从上面代码可以很清晰看到,ServiceController 监听特定 namespace 下的以下资源:
- Services
- Endpoints
- Nodes
- Pod
ServiceDiscovery
istio.io/istio/pilot/pkg/bootstrap/server.go
func (s *Server) initDiscoveryService(args *PilotArgs) error { // 需要注意 env 保存了后面使用的变量包括 // s.istioConfigStore s.ServiceController s.ServiceController environment := &model.Environment{ Mesh: s.mesh, MeshNetworks: s.meshNetworks, IstioConfigStore: s.istioConfigStore, // istio routing rules ServiceDiscovery: s.ServiceController, // service list ServiceAccounts: s.ServiceController, MixerSAN: s.mixerSAN, } // Set up discovery service discovery, err := envoy.NewDiscoveryService( environment, args.DiscoveryOptions, ) s.mux = discovery.RestContainer.ServeMux // 创建 envoyv2.NewDiscoveryServer 对应的 gRPC Server s.EnvoyXdsServer = envoyv2.NewDiscoveryServer( environment, istio_networking.NewConfigGenerator(args.Plugins), s.ServiceController, s.configController) s.EnvoyXdsServer.InitDebug(s.mux, s.ServiceController) // ... // create grpc/http server s.initGrpcServer(args.KeepaliveOptions) s.httpServer = &http.Server{ Addr: args.DiscoveryOptions.HTTPAddr, Handler: s.mux, } // create http listener listener, err := net.Listen("tcp", args.DiscoveryOptions.HTTPAddr) s.HTTPListeningAddr = listener.Addr() // create grpc listener grpcListener, err := net.Listen("tcp", args.DiscoveryOptions.GrpcAddr) s.GRPCListeningAddr = grpcListener.Addr() s.addStartFunc(func(stop <-chan struct{}) error { // 启动 http server goroutine // 启动 gRPC server goroutine // 等待关闭 goroutine return nil }) // run secure grpc server return nil }
istio.io/istio/pilot/pkg/proxy/envoy/v2/discovery.go, envoyv2.NewDiscoveryServer
函数定义:
// s.EnvoyXdsServer = envoyv2.NewDiscoveryServer( // environment, // istio_networking.NewConfigGenerator(args.Plugins), // s.ServiceController, // s.configController) // NewDiscoveryServer creates DiscoveryServer that sources data from Pilot's internal mesh data structures func NewDiscoveryServer(env *model.Environment, generator core.ConfigGenerator, ctl model.Controller, configCache model.ConfigStoreCache) *DiscoveryServer { // 创建 DiscoveryServer 对象 out := &DiscoveryServer{ Env: env, ConfigGenerator: generator, // generates xDS responses ConfigController: configCache, EndpointShardsByService: map[string]*EndpointShards{}, WorkloadsByID: map[string]*Workload{}, edsUpdates: map[string]*EndpointShards{}, concurrentPushLimit: make(chan struct{}, 20), updateChannel: make(chan *updateReq, 10), } env.PushContext = model.NewPushContext() // 处理相关的更新操作 // handleUpdates处理来自 updateChannel 的事件它确保自上次事件处理之前至少已经过了minQuiet时间。 // 它还确保在接收事件和处理事件之间最多经过 maxDelay。 // 最后调用 doPush 函数进行推送,根据全量推送标记,将最近更新的 eds 相关信息通过 // XDS Incremental Push 或者 全量推送出去 go out.handleUpdates() // 以下三种清空 DiscoveryServer 的本地缓存,并注册清理缓存的函数 // 1. service 相关的信息有变化的时候, // 2. jwt public key 发生变化 // 3. Istio CRD 有变化的 // 当以上三种任一情况发生的时候,会设置信息到 out.handleUpdates() 函数 // 周期性更新 go out.periodicRefresh() // 周期性更新 metrics go out.periodicRefreshMetrics() out.DebugConfigs = pilot.DebugConfigs pushThrottle := intEnv(pilot.PushThrottle, 10) pushBurst := intEnv(pilot.PushBurst, 100) adsLog.Infof("Starting ADS server with rateLimiter=%d burst=%d", pushThrottle, pushBurst) out.rateLimiter = rate.NewLimiter(rate.Limit(pushThrottle), pushBurst) out.initRateLimiter = rate.NewLimiter(rate.Limit(pushThrottle*2), pushBurst*2) return out }
ADS 相关定义:
https://github.com/envoyproxy/data-plane-api/blob/master/envoy/service/discovery/v2/ads.proto
// See https://github.com/lyft/envoy-api#apis for a description of the role of // ADS and how it is intended to be used by a management server. ADS requests // have the same structure as their singleton xDS counterparts, but can // multiplex many resource types on a single stream. The type_url in the // DiscoveryRequest/DiscoveryResponse provides sufficient information to recover // the multiplexed singleton APIs at the Envoy instance and management server. service AggregatedDiscoveryService { // This is a gRPC-only API. rpc StreamAggregatedResources(stream envoy.api.v2.DiscoveryRequest) returns (stream envoy.api.v2.DiscoveryResponse) { } rpc IncrementalAggregatedResources(stream envoy.api.v2.IncrementalDiscoveryRequest) returns (stream envoy.api.v2.IncrementalDiscoveryResponse) { } }
https://github.com/envoyproxy/data-plane-api/blob/master/envoy/api/v2/discovery.proto
// A DiscoveryRequest requests a set of versioned resources of the same type for // a given Envoy node on some API. message DiscoveryRequest { // The version_info provided in the request messages will be the version_info // received with the most recent successfully processed response or empty on // the first request. It is expected that no new request is sent after a // response is received until the Envoy instance is ready to ACK/NACK the new // configuration. ACK/NACK takes place by returning the new API config version // as applied or the previous API config version respectively. Each type_url // (see below) has an independent version associated with it. string version_info = 1; // The node making the request. core.Node node = 2; // List of resources to subscribe to, e.g. list of cluster names or a route // configuration name. If this is empty, all resources for the API are // returned. LDS/CDS expect empty resource_names, since this is global // discovery for the Envoy instance. The LDS and CDS responses will then imply // a number of resources that need to be fetched via EDS/RDS, which will be // explicitly enumerated in resource_names. repeated string resource_names = 3; // Type of the resource that is being requested, e.g. // "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment". This is implicit // in requests made via singleton xDS APIs such as CDS, LDS, etc. but is // required for ADS. string type_url = 4; // nonce corresponding to DiscoveryResponse being ACK/NACKed. See above // discussion on version_info and the DiscoveryResponse nonce comment. This // may be empty if no nonce is available, e.g. at startup or for non-stream // xDS implementations. string response_nonce = 5; // This is populated when the previous :ref:`DiscoveryResponse <envoy_api_msg_DiscoveryResponse>` // failed to update configuration. The *message* field in *error_details* provides the Envoy // internal exception related to the failure. 
It is only intended for consumption during manual // debugging, the string provided is not guaranteed to be stable across Envoy versions. google.rpc.Status error_detail = 6; } message DiscoveryResponse { // The version of the response data. string version_info = 1; // The response resources. These resources are typed and depend on the API being called. repeated google.protobuf.Any resources = 2 [(gogoproto.nullable) = false]; // [#not-implemented-hide:] // Canary is used to support two Envoy command line flags: // // * --terminate-on-canary-transition-failure. When set, Envoy is able to // terminate if it detects that configuration is stuck at canary. Consider // this example sequence of updates: // - Management server applies a canary config successfully. // - Management server rolls back to a production config. // - Envoy rejects the new production config. // Since there is no sensible way to continue receiving configuration // updates, Envoy will then terminate and apply production config from a // clean slate. // * --dry-run-canary. When set, a canary response will never be applied, only // validated via a dry run. bool canary = 3; // Type URL for resources. This must be consistent with the type_url in the // Any messages for resources if resources is non-empty. This effectively // identifies the xDS API when muxing over ADS. string type_url = 4; // For gRPC based subscriptions, the nonce provides a way to explicitly ack a // specific DiscoveryResponse in a following DiscoveryRequest. Additional // messages may have been sent by Envoy to the management server for the // previous version on the stream prior to this DiscoveryResponse, that were // unprocessed at response send time. The nonce allows the management server // to ignore any further DiscoveryRequests for the previous version until a // DiscoveryRequest bearing the nonce. The nonce is optional and is not // required for non-stream based xDS implementations. 
string nonce = 5; // [#not-implemented-hide:] // The control plane instance that sent the response. core.ControlPlane control_plane = 6; }
关于 ADS 保证一致性的内容参见:《Envoy 的 xDS REST 和 gRPC 协议详解》中的“最终一致性考虑”章节:
一般来说,为避免流量丢弃,更新的顺序应该遵循 make before break 模型,其中
* 必须始终先推送 CDS 更新(如果有)。
* EDS 更新(如果有)必须在相应集群的 CDS 更新后到达。
* LDS 更新必须在相应的 CDS/EDS 更新后到达。
* 与新添加的监听器相关的 RDS 更新必须在最后到达。
* 最后,删除过期的 CDS 集群和相关的 EDS 端点(不再被引用的端点)。
ADS 允许单一管理服务器通过单个 gRPC 流,提供所有的 API 更新。配合仔细规划的更新顺序,ADS 可规避更新过程中流量丢失。
pilot/pkg/proxy/envoy/v2/ads.go
// StreamAggregatedResources implements the ADS interface. func (s *DiscoveryServer) StreamAggregatedResources(stream ads.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error { // ... // InitContext returns immediately if the context was already initialized. // InitContext 从 env 从取出需要发送到客户端的数据,后续会继续分析 // 1. initServiceRegistry // 2. initVirtualServices // 3. initDestinationRules // 4. initAuthorizationPolicies // 5. InitSidecarScopes err := s.globalPushContext().InitContext(s.Env) con := newXdsConnection(peerAddr, stream) reqChannel := make(chan *xdsapi.DiscoveryRequest, 1) // 启动一个新的 goroutine 来从客户端接受相关的数据 xdsapi.DiscoveryRequest go receiveThread(con, reqChannel, &receiveError) for { // Block until either a request is received or a push is triggered. select { case discReq, ok := <-reqChannel: err = s.initConnectionNode(discReq, con) switch discReq.TypeUrl { case ClusterType: // ... // CDS REQ is the first request an envoy makes. This shows up // immediately after connect. It is followed by EDS REQ as // soon as the CDS push is returned. adsLog.Infof("ADS:CDS: REQ %v %s %v raw: %s", peerAddr, con.ConID, time.Since(t0), discReq.String()) con.CDSWatch = true err := s.pushCds(con, s.globalPushContext(), versionInfo()) case ListenerType: // ... adsLog.Debugf("ADS:LDS: REQ %s %v", con.ConID, peerAddr) con.LDSWatch = true err := s.pushLds(con, s.globalPushContext(), true, versionInfo()) case RouteType: // ... adsLog.Debugf("ADS:RDS: REQ %s %s routes: %d", peerAddr, con.ConID, len(con.Routes)) err := s.pushRoute(con, s.globalPushContext()) case EndpointType: // ... 
// 各种错误处理 for _, cn := range con.Clusters { s.removeEdsCon(cn, con.ConID, con) } for _, cn := range clusters { s.addEdsCon(cn, con.ConID, con) } con.Clusters = clusters adsLog.Debugf("ADS:EDS: REQ %s %s clusters: %d", peerAddr, con.ConID, len(con.Clusters)) err := s.pushEds(s.globalPushContext(), con, true, nil) if err != nil { return err } default: adsLog.Warnf("ADS: Unknown watched resources %s", discReq.String()) } // ... case pushEv := <-con.pushChannel: // ... err := s.pushConnection(con, pushEv) if err != nil { return nil } } } }
istio.io/istio/pilot/pkg/model/push_context.go
// PushContext tracks the status of a push - metrics and errors. // Metrics are reset after a push - at the beginning all // values are zero, and when push completes the status is reset. // The struct is exposed in a debug endpoint - fields public to allow // easy serialization as json. type PushContext struct { // privateServices are reachable within the same namespace. privateServicesByNamespace map[string][]*Service // publicServices are services reachable within the mesh. publicServices []*Service privateVirtualServicesByNamespace map[string][]Config publicVirtualServices []Config // destination rules are of three types: // namespaceLocalDestRules: all public/private dest rules pertaining to a service defined in a given namespace // namespaceExportedDestRules: all public dest rules pertaining to a service defined in a namespace // allExportedDestRules: all (public) dest rules across all namespaces // We need the allExportedDestRules in addition to namespaceExportedDestRules because we select // the dest rule based on the most specific host match, and not just any destination rule namespaceLocalDestRules map[string]*processedDestRules namespaceExportedDestRules map[string]*processedDestRules allExportedDestRules *processedDestRules // sidecars for each namespace sidecarsByNamespace map[string][]*SidecarScope ////////// END //////// // The following data is either a global index or used in the inbound path. // Namespace specific views do not apply here. // ServiceByHostname has all services, indexed by hostname. ServiceByHostname map[Hostname]*Service `json:"-"` // AuthzPolicies stores the existing authorization policies in the cluster. Could be nil if there // are no authorization policies in the cluster. AuthzPolicies *AuthorizationPolicies `json:"-"` // ServicePort2Name is used to keep track of service name and port mapping. // This is needed because ADS names use port numbers, while endpoints use // port names. The key is the service name. 
If a service or port are not found, // the endpoint needs to be re-evaluated later (eventual consistency) ServicePort2Name map[string]PortList `json:"-"` initDone bool } // InitContext will initialize the data structures used for code generation. // This should be called before starting the push, from the thread creating // the push context. func (ps *PushContext) InitContext(env *Environment) error { ps.Mutex.Lock() defer ps.Mutex.Unlock() if ps.initDone { return nil } ps.Env = env var err error // Caches list of services in the registry, and creates a map // of hostname to service -> ServicePort2Name map[string]PortList `json:"-"` ps.initServiceRegistry(env) // Caches list of virtual services -> publicVirtualServices []Config ps.initVirtualServices(env) // Split out of DestinationRule expensive conversions - once per push. // 最后保存到以下三个变量中: // * namespaceLocalDestRules map[string]*processedDestRules // * namespaceExportedDestRules map[string]*processedDestRules // * allExportedDestRules *processedDestRules ps.initDestinationRules(env) // Get the ClusterRbacConfig -> AuthzPolicies *AuthorizationPolicies ps.initAuthorizationPolicies(env) // Must be initialized in the end -> sidecarsByNamespace map[string][]*SidecarScope ps.InitSidecarScopes(env) ps.initDone = true return nil }
CDS 为 ADS 中第一个发送的信息,后续我们以 CDS 为例进行详细分析
// StreamAggregatedResources implements the ADS interface. func (s *DiscoveryServer) StreamAggregatedResources(stream ads.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error { // ... // InitContext returns immediately if the context was already initialized. // InitContext 从 env 从取出需要发送到客户端的数据,后续会继续分析 // 1. initServiceRegistry // 2. initVirtualServices // 3. initDestinationRules // 4. initAuthorizationPolicies // 5. InitSidecarScopes err := s.globalPushContext().InitContext(s.Env) con := newXdsConnection(peerAddr, stream) reqChannel := make(chan *xdsapi.DiscoveryRequest, 1) // 启动一个新的 goroutine 来从客户端接受相关的数据 xdsapi.DiscoveryRequest go receiveThread(con, reqChannel, &receiveError) for { // Block until either a request is received or a push is triggered. select { case discReq, ok := <-reqChannel: // 主要是调用 `ParseServiceNodeWithMetadata` 函数, // 从 Req 的消息中获取到各种信息,并生产 `Proxy` 对象 // 当前 discReq.Node.Id 格式为 Type~IPAddress~ID~Domain err = s.initConnectionNode(discReq, con) switch discReq.TypeUrl { case ClusterType: // 如果已经发送过 CS 数据后的响应消息的处理 if con.CDSWatch { // ... } // CDS REQ is the first request an envoy makes. This shows up // immediately after connect. It is followed by EDS REQ as // soon as the CDS push is returned. adsLog.Infof("ADS:CDS: REQ %v %s %v raw: %s", peerAddr, con.ConID, time.Since(t0), discReq.String()) con.CDSWatch = true err := s.pushCds(con, s.globalPushContext(), versionInfo()) if err != nil { return err } //... }
Identifies a specific Envoy instance. The node identifier is presented to the management server, which may use this identifier to distinguish per Envoy configuration for serving.
{
"id": "...",
"cluster": "...",
"metadata": "{...}",
"locality": "{...}",
"build_version": "..."
}
id
(string) An opaque node identifier for the Envoy node. This also provides the local service node name. It should be set if any of the following features are used: statsd, CDS, and HTTP tracing, either in this message or via --service-node.
cluster
(string) Defines the local service cluster name where Envoy is running. Though optional, it should be set if any of the following features are used: statsd, health check cluster verification, runtime override directory, user agent addition, HTTP global rate limiting, CDS, and HTTP tracing, either in this message or via --service-cluster.
metadata
(Struct) Opaque metadata extending the node identifier. Envoy will pass this directly to the management server.
locality
(core.Locality) Locality specifying where the Envoy instance is running.
build_version
(string) This is motivated by informing a management server during canary which version of Envoy is being tested in a heterogeneous fleet. This will be set by Envoy in management server RPCs.
在 initConnectionNode 函数中,主要是调用 ParseServiceNodeWithMetadata 函数,从 Req 的消息中获取到各种信息,并生成 Proxy 对象;
istio.io/istio/pilot/pkg/model/context.go
func ParseServiceNodeWithMetadata(s string, metadata map[string]string) (*Proxy, error) { }
// Proxy contains information about an specific instance of a proxy (envoy sidecar, gateway, // etc). The Proxy is initialized when a sidecar connects to Pilot, and populated from // 'node' info in the protocol as well as data extracted from registries. // // In current Istio implementation nodes use a 4-parts '~' delimited ID. // Type~IPAddress~ID~Domain type Proxy struct { // ClusterID specifies the cluster where the proxy resides. // TODO: clarify if this is needed in the new 'network' model, likely needs to // be renamed to 'network' ClusterID string // Type specifies the node type. First part of the ID. Type NodeType // IPAddresses is the IP addresses of the proxy used to identify it and its // co-located service instances. Example: "10.60.1.6". In some cases, the host // where the poxy and service instances reside may have more than one IP address IPAddresses []string // ID is the unique platform-specific sidecar proxy ID. For k8s it is the pod ID and // namespace. ID string // Locality is the location of where Envoy proxy runs. Locality Locality // DNSDomain defines the DNS domain suffix for short hostnames (e.g. // "default.svc.cluster.local") DNSDomain string // TrustDomain defines the trust domain of the certificate TrustDomain string // ConfigNamespace defines the namespace where this proxy resides // for the purposes of network scoping. // NOTE: DO NOT USE THIS FIELD TO CONSTRUCT DNS NAMES ConfigNamespace string // Metadata key-value pairs extending the Node identifier Metadata map[string]string // the sidecarScope associated with the proxy SidecarScope *SidecarScope }
推送 CDS 的核心实现通过函数 s.pushCds(con, s.globalPushContext(), versionInfo())
istio.io/istio/pilot/pkg/proxy/envoy/v2/cds.go
func (s *DiscoveryServer) pushCds(con *XdsConnection, push *model.PushContext, version string) error { // 通过当前的 con.modelNode 和 push 的上下文生成对应的 rawClusters 对象 []*xdsapi.Cluster rawClusters, err := s.generateRawClusters(con.modelNode, push) response := con.clusters(rawClusters) err = con.send(response) return nil }
因此 s.generateRawClusters(con.modelNode, push) 的作用不言而喻,就是将 push 上下文中与 CDS 相关的数据整理并封装成 CDS Response 的格式。
func (s *DiscoveryServer) generateRawClusters(node *model.Proxy, push *model.PushContext) ([]*xdsapi.Cluster, error) { rawClusters, err := s.ConfigGenerator.BuildClusters(s.Env, node, push) // 对 rawClusters 中的信息进行 Validate 验证 return rawClusters, nil }
对象 rawClusters 是通过 s.ConfigGenerator.BuildClusters 函数基于 s.Env, node, push 三者的信息组合而生成出来的:
- s.Env 保存了 ServiceController 和 ConfigController 等资源的本地缓存信息
- node 为本次 Req 请求中生成的包含 Node 相关信息的对象
- push 为本次推送的上下文,已经将本次推送过程中需要的信息完成了初步的整理(从 s.Env 中生成出来的)
istio.io/istio/pilot/pkg/networking/core/v1alpha3/cluster.go
BuildClusters 函数的实现如下:
// BuildClusters returns the list of clusters for the given proxy. This is the CDS output // For outbound: Cluster for each service/subset hostname or cidr with SNI set to service hostname // Cluster type based on resolution // For inbound (sidecar only): Cluster for each inbound endpoint port and for each service port func (configgen *ConfigGeneratorImpl) BuildClusters(env *model.Environment, proxy *model.Proxy, push *model.PushContext) ([]*apiv2.Cluster, error) { clusters := make([]*apiv2.Cluster, 0) switch proxy.Type { case model.SidecarProxy: // GetProxyServiceInstances returns service instances co-located with the proxy // 获取与 proxy 所在主机上的 service instance,包括 headless 服务 instances, err := env.GetProxyServiceInstances(proxy) sidecarScope := proxy.SidecarScope recomputeOutboundClusters := true // 追加 OutboundClusters if recomputeOutboundClusters { clusters = append(clusters, configgen.buildOutboundClusters(env, proxy, push)...) } // Let ServiceDiscovery decide which IP and Port are used for management if // there are multiple IPs managementPorts := make([]*model.Port, 0) for _, ip := range proxy.IPAddresses { managementPorts = append(managementPorts, env.ManagementPorts(ip)...) } // 追加与 proxy ip 上 managementPorts 相关的 InboundClusters clusters = append(clusters, configgen.buildInboundClusters(env, proxy, push, instances, managementPorts)...) default: // Gateways // ... } // Add a blackhole and passthrough cluster for catching traffic to unresolved routes // DO NOT CALL PLUGINS for these two clusters. clusters = append(clusters, buildBlackHoleCluster()) clusters = append(clusters, buildDefaultPassthroughCluster()) // resolves cluster name conflicts. return normalizeClusters(push, proxy, clusters), nil }
临时的调试方法:
istio.io/istio/pilot/pkg/proxy/envoy/v2/debug.go
// InitDebug initializes the debug handlers and adds a debug in-memory registry. func (s *DiscoveryServer) InitDebug(mux *http.ServeMux, sctl *aggregate.Controller) { // For debugging and load testing v2 we add an memory registry. s.MemRegistry = NewMemServiceDiscovery( map[model.Hostname]*model.Service{ // mock.HelloService.Hostname: mock.HelloService, }, 2) s.MemRegistry.EDSUpdater = s s.MemRegistry.ClusterID = "v2-debug" sctl.AddRegistry(aggregate.Registry{ ClusterID: "v2-debug", Name: serviceregistry.ServiceRegistry("memAdapter"), ServiceDiscovery: s.MemRegistry, ServiceAccounts: s.MemRegistry, Controller: s.MemRegistry.controller, }) mux.HandleFunc("/ready", s.ready) mux.HandleFunc("/debug/edsz", s.edsz) mux.HandleFunc("/debug/adsz", s.adsz) mux.HandleFunc("/debug/cdsz", cdsz) mux.HandleFunc("/debug/syncz", Syncz) mux.HandleFunc("/debug/registryz", s.registryz) mux.HandleFunc("/debug/endpointz", s.endpointz) mux.HandleFunc("/debug/endpointShardz", s.endpointShardz) mux.HandleFunc("/debug/workloadz", s.workloadz) mux.HandleFunc("/debug/configz", s.configz) mux.HandleFunc("/debug/authenticationz", s.authenticationz) mux.HandleFunc("/debug/config_dump", s.ConfigDump) mux.HandleFunc("/debug/push_status", s.PushStatusHandler) }
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 以太坊源码分析(36)ethdb源码分析
- [源码分析] kubelet源码分析(一)之 NewKubeletCommand
- libmodbus源码分析(3)从机(服务端)功能源码分析
- [源码分析] nfs-client-provisioner源码分析
- [源码分析] kubelet源码分析(三)之 Pod的创建
- Spring事务源码分析专题(一)JdbcTemplate使用及源码分析
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
ANSI Common Lisp
Paul Graham / Prentice Hall / 1995-11-12 / USD 116.40
For use as a core text supplement in any course covering common LISP such as Artificial Intelligence or Concepts of Programming Languages. Teaching students new and more powerful ways of thinking abo......一起来看看 《ANSI Common Lisp》 这本书的介绍吧!