[TOC]
架构
介绍
完整的 yaml 文件参见 pilot-yaml。
Dockerfile
FROM istionightly/base_debug
ADD pilot-discovery /usr/local/bin/
ADD cacert.pem /cacert.pem
ENTRYPOINT ["/usr/local/bin/pilot-discovery"]
启动命令行
# ps -ef -www | grep pilot
/usr/local/bin/pilot-discovery discovery
命令行帮助
# kubectl exec -ti istio-pilot-f9d78b7b9-fmhfb -n istio-system -c discovery -- /usr/local/bin/pilot-discovery --help
Usage:
  pilot-discovery [command]

Available Commands:
  discovery   Start Istio proxy discovery service
  help        Help about any command
  request     Makes an HTTP request to Pilot metrics/debug endpoint
  version     Prints out build version information
discovery 相关的帮助
# kubectl exec -ti istio-pilot-f9d78b7b9-fmhfb -n istio-system -c discovery -- /usr/local/bin/pilot-discovery discovery --help
Defaulting container name to discovery.
Start Istio proxy discovery service
Usage:
pilot-discovery discovery [flags]
Flags:
-a, --appNamespace string Restrict the applications namespace the controller manages; if not set, controller watches all namespaces
--cfConfig string Cloud Foundry config file
--clusterRegistriesConfigMap string ConfigMap map for clusters config store
--clusterRegistriesNamespace string Namespace for ConfigMap which stores clusters configs
--configDir string Directory to watch for updates to config yaml files. If specified, the files will be used as the source of config, rather than a CRD client.
--consulserverInterval duration Interval (in seconds) for polling the Consul service registry (default 2s)
--consulserverURL string URL for the Consul server
--disable-install-crds Disable discovery service from verifying the existence of CRDs at startup and then installing if not detected. It is recommended to be disable for highly available setups.
--discovery_cache Enable caching discovery service responses (default true)
--domain string DNS domain suffix (default "cluster.local")
--grpcAddr string Discovery service grpc address (default ":15010")
-h, --help help for discovery
--httpAddr string Discovery service HTTP address (default ":8080")
--kubeconfig string Use a Kubernetes configuration file instead of in-cluster configuration
--meshConfig string File name for Istio mesh configuration. If not specified, a default mesh will be used. (default "/etc/istio/config/mesh")
--monitoringAddr string HTTP address to use for the exposing pilot self-monitoring information (default ":9093")
-n, --namespace string Select a namespace where the controller resides. If not set, uses ${POD_NAMESPACE} environment variable
--plugins stringSlice comma separated list of networking plugins to enable (default [authn,authz,health,mixer,envoyfilter])
--profile Enable profiling via web interface host:port/debug/pprof (default true)
--registries stringSlice Comma separated list of platform service registries to read from (choose one or more from {Kubernetes, Consul, CloudFoundry, Mock, Config}) (default [Kubernetes])
--resync duration Controller resync interval (default 1m0s)
--secureGrpcAddr string Discovery service grpc address, with https (default ":15012")
--webhookEndpoint string Webhook API endpoint (supports http://sockethost, and unix:///absolute/path/to/socket
Global Flags:
省略
主要参数:
| 名称 | 默认值 | 备注 |
|---|---|---|
| --appNamespace | 空 | 与 helm 安装中的 oneNamespace 对应 |
| --configDir | 空 | 对应 pilot 配置的两种来源:本地配置文件或 CRD;设置后以文件为准 |
| --discovery_cache | true | 启用 cache,有助于提升性能 |
| --domain | "cluster.local" | k8s 中域后缀 |
| --grpcAddr | :15010 | |
| --httpAddr | :8080 | |
| --meshConfig | "/etc/istio/config/mesh" | |
| --registries | Kubernetes | |
代码分析
函数入口
discoveryCmd = &cobra.Command{
Use: "discovery",
Short: "Start Istio proxy discovery service.",
Args: cobra.ExactArgs(0),
RunE: func(c *cobra.Command, args []string) error {
// ...
// Create the stop channel for all of the servers.
stop := make(chan struct{})
// Create the server for the discovery service.
discoveryServer, err := bootstrap.NewServer(serverArgs)
if err != nil {
return fmt.Errorf("failed to create discovery service: %v", err)
}
// Start the server
if err := discoveryServer.Start(stop); err != nil {
return fmt.Errorf("failed to start discovery service: %v", err)
}
cmd.WaitSignal(stop)
return nil
},
}
istio.io/istio/pilot/pkg/bootstrap/server.go
mesh 的默认配置参见:https://gist.github.com/DavadDi/f110459d339e260f818250287fc78ccc#file-mesh
// NewServer creates a new Server instance based on the provided arguments.
func NewServer(args PilotArgs) (*Server, error) {
// ...
s := &Server{
filewatcher: filewatcher.NewWatcher(),
}
// 省略错误处理
s.initKubeClient(&args) // 初始化到 k8s 集群的客户端 s.kubeClient
s.initMesh(&args) // 初始化配置,并添加到 filewatcher 中, /etc/istio/config/mesh
// 1.0.5 版本的 cmd 中未包括,应该是 1.1 中添加
// serverArgs.NetworksConfigFile, "networksConfig", "/etc/istio/config/meshNetworks"
// 初始化配置,并加入到 filewatcher 中监听
s.initMeshNetworks(&args)
// initMixerSan configures the mixerSAN configuration item.
// The mesh must already have been configured.
s.initMixerSan(&args)
// creates the config controller in the pilotConfig.
// 最终会创建一个 crd.NewController 实例
s.initConfigController(&args)
/* 内部以 kube 为例,表明主要流程
controller, err := s.makeKubeConfigController(args)
s.configController = controller
s.addStartFunc(func(stop <-chan struct{}) error {
go s.configController.Run(stop) // 1. 第一个启动的 configController
})
*/
// creates and initializes the service controllers
s.initServiceControllers(&args)
/*
s.createK8sServiceControllers(serviceControllers, args)
--> kube.NewController(s.kubeClient, args.Config.ControllerOptions)
s.addStartFunc(func(stop <-chan struct{}) error {
go s.ServiceController.Run(stop) // 2. 第二个启动的 ServiceController
return nil
})
*/
// 初始化 DiscoveryService gRPC 服务器,后面详细讲解
// 添加2个 func 到 startFuncs 中
s.initDiscoveryService(&args)
// initializes the configuration for the pilot monitoring server.
// 添加1个 func 到 startFuncs 中
s.initMonitor(&args)
// starts the secret controller to watch for remote
// clusters and initialize the multicluster structures.
// 暂时不分析
s.initClusterRegistries(&args)
return s, nil
}
// 将 NewServer 函数中初始化的函数依次启动起来
func (s *Server) Start(stop <-chan struct{}) error {
// Now start all of the components.
for _, fn := range s.startFuncs {
        if err := fn(stop); err != nil {
            return err
        }
    }
    return nil
}
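NewServer 中的各个 init 函数并不会立刻启动组件,而是通过 addStartFunc 把启动逻辑注册到 s.startFuncs 中,再由 Start 统一执行。addStartFunc 的实现大致如下(示意片段,非逐字源码):

// startFunc 是接收 stop channel 的启动函数类型
type startFunc func(stop <-chan struct{}) error

// addStartFunc 只负责把启动函数追加到 startFuncs 切片,真正的启动发生在 Start 中
func (s *Server) addStartFunc(fn startFunc) {
    s.startFuncs = append(s.startFuncs, fn)
}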
istio.io/api/mesh/v1alpha1/network.pb.go
其中 MeshNetworks 结构体和定义说明如下:
// MeshNetworks (config map) provides information about the set of networks
// inside a mesh and how to route to endpoints in each network. For example
//
// MeshNetworks(file/config map):
// networks:
// - network1:
// - endpoints:
// - fromRegistry: registry1 #must match secret name in kubernetes
// - fromCidr: 192.168.100.0/22 #a VM network for example
// gateways:
// - registryServiceName: istio-ingressgateway.istio-system.svc.cluster.local
// port: 15443
// locality: us-east-1a
type MeshNetworks struct {
// REQUIRED: The set of networks inside this mesh. Each network should
// have a unique name and information about how to infer the endpoints in
// the network as well as the gateways associated with the network.
Networks map[string]*Network `protobuf:"bytes,1,rep,name=networks" json:"networks,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value"`
}
对 NewServer 函数做精简后,其主要流程如下:
func NewServer(args PilotArgs) (*Server, error) {
// 省略错误处理
s.initKubeClient(&args) // 初始化到 k8s 集群的客户端 s.kubeClient
// 初始化相关配置,并监视配置的变化情况
// ...
// creates the config controller in the pilotConfig.
// 最终会创建一个 crd.NewController 实例
s.initConfigController(&args)
// s.makeKubeConfigController(args)
// s.configController.Run(stop)
// creates and initializes the service controllers
s.initServiceControllers(&args)
// kube.NewController(s.kubeClient, args.Config.ControllerOptions)
// go s.ServiceController.Run(stop)
// 初始化 DiscoveryService gRPC 服务器,后面详细讲解
// 添加 2 个 func 到 startFuncs 中
s.initDiscoveryService(&args)
return s, nil
}
ConfigController
// initConfigController creates the config controller in the pilotConfig.
func (s *Server) initConfigController(args *PilotArgs) error {
// k8s 方式下初始化
controller, err := s.makeKubeConfigController(args)
s.configController = controller
// Defer starting the controller until after the service is created.
s.addStartFunc(func(stop <-chan struct{}) error {
go s.configController.Run(stop)
return nil
})
//...
// Create the config store.
s.istioConfigStore = model.MakeIstioStore(s.configController)
return nil
}
func (s *Server) makeKubeConfigController(args *PilotArgs) (model.ConfigStoreCache, error) {
kubeCfgFile := s.getKubeCfgFile(args)
configClient, err := crd.NewClient(
kubeCfgFile, "",
model.IstioConfigTypes, // 全部相关的 CRD 定义
args.Config.ControllerOptions.DomainSuffix)
if !args.Config.DisableInstallCRDs {
// 注册自定义的 CRD
configClient.RegisterResources()
}
return crd.NewController(configClient, args.Config.ControllerOptions), nil
}
其中 crd.NewClient 中的参数 model.IstioConfigTypes 包含了相关的全部 CRD 的定义:
istio.io/istio/pilot/pkg/model/config.go
// IstioConfigTypes lists all Istio config types with schemas and validation
IstioConfigTypes = ConfigDescriptor{
VirtualService,
Gateway,
ServiceEntry,
DestinationRule,
EnvoyFilter,
Sidecar,
HTTPAPISpec,
HTTPAPISpecBinding,
QuotaSpec,
QuotaSpecBinding,
AuthenticationPolicy,
AuthenticationMeshPolicy,
ServiceRole,
ServiceRoleBinding,
RbacConfig,
ClusterRbacConfig,
}
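这些 schema 最终会被 crd.Client 映射为对应的 CRD 资源并注册/访问。下面是一个示意片段(假设 ProtoSchema 含有 Type、Plural、Group、Version 字段,仅用于演示各 schema 携带的信息,并非源码):

package main

import (
    "fmt"

    "istio.io/istio/pilot/pkg/model"
)

func main() {
    // 遍历 IstioConfigTypes,打印每个 schema 的类型、复数名、Group 和 Version;
    // crd.Client 正是依据这些字段来构造并注册对应的 CRD
    // (例如 virtual-service 对应 virtualservices.networking.istio.io)
    for _, schema := range model.IstioConfigTypes {
        fmt.Printf("type=%-28s plural=%-30s group=%s version=%s\n",
            schema.Type, schema.Plural, schema.Group, schema.Version)
    }
}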
其中 crd.NewController 定义在 istio.io/istio/pilot/pkg/config/kube/crd/controller.go 文件中:
// NewController creates a new Kubernetes controller for CRDs
// Use "" for namespace to listen for all namespace changes
func NewController(client *Client, options kube.ControllerOptions) model.ConfigStoreCache {
log.Infof("CRD controller watching namespaces %q", options.WatchedNamespace)
// Queue requires a time duration for a retry delay after a handler error
out := &controller{
client: client,
queue: kube.NewQueue(1 * time.Second),
kinds: make(map[string]cacheHandler),
}
// add stores for CRD kinds
for _, schema := range client.ConfigDescriptor() {
out.addInformer(schema, options.WatchedNamespace, options.ResyncPeriod)
}
return out
}
对于 CRD 的监视,每个 API Group/Version 会使用一个单独的 REST 客户端连接(见下面 Client 结构中按 apiVersion 组织的 clientset),目前涉及的 Group 主要有 networking/config/authentication/rbac 等。
// controller is a collection of synchronized resource watchers.
// Caches are thread-safe
type controller struct {
client *Client // 每个 apiVersion(Group/Version)对应一个 restClient
queue kube.Queue
kinds map[string]cacheHandler
}
type cacheHandler struct {
informer cache.SharedIndexInformer
handler *kube.ChainHandler
}
// Client is a basic REST client for CRDs implementing config store
type Client struct {
// Map of apiVersion to restClient.
clientset map[string]*restClient
// domainSuffix for the config metadata
domainSuffix string
}
ServiceController
istio.io/istio/pilot/pkg/serviceregistry/kube/controller.go
结构定义如下:
type Controller struct {
domainSuffix string
client kubernetes.Interface
queue Queue
services cacheHandler
endpoints cacheHandler
nodes cacheHandler
pods *PodCache
//....
}
NewController 函数定义如下:
// NewController creates a new Kubernetes controller
// Created by bootstrap and multicluster (see secretcontroler).
func NewController(client kubernetes.Interface, options ControllerOptions) *Controller {
log.Infof("Service controller watching namespace %q for services, endpoints, nodes and pods, refresh %s",
options.WatchedNamespace, options.ResyncPeriod)
// Queue requires a time duration for a retry delay after a handler error
out := &Controller{
domainSuffix: options.DomainSuffix,
client: client,
queue: NewQueue(1 * time.Second),
ClusterID: options.ClusterID,
XDSUpdater: options.XDSUpdater,
servicesMap: make(map[model.Hostname]*model.Service),
externalNameSvcInstanceMap: make(map[model.Hostname][]*model.ServiceInstance),
}
sharedInformers := informers.NewSharedInformerFactoryWithOptions(client, options.ResyncPeriod, informers.WithNamespace(options.WatchedNamespace))
svcInformer := sharedInformers.Core().V1().Services().Informer()
out.services = out.createCacheHandler(svcInformer, "Services")
epInformer := sharedInformers.Core().V1().Endpoints().Informer()
out.endpoints = out.createEDSCacheHandler(epInformer, "Endpoints")
nodeInformer := sharedInformers.Core().V1().Nodes().Informer()
out.nodes = out.createCacheHandler(nodeInformer, "Nodes")
podInformer := sharedInformers.Core().V1().Pods().Informer()
out.pods = newPodCache(out.createCacheHandler(podInformer, "Pod"), out)
return out
}
从上面代码可以很清晰看到,ServiceController 监听特定 namespace 下的以下资源:
- Services
- Endpoints
- Nodes
- Pod
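每类资源的处理套路都类似:为对应的 SharedInformer 注册事件回调,把 Add/Update/Delete 事件放入内部队列串行处理,最终触发 xDS 推送。下面是基于 client-go 通用写法的简化示意(仅演示 Services 一种资源,事件处理用打印代替,并非 createCacheHandler 的逐字实现):

package example

import (
    "fmt"

    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
)

// watchServices 演示 informer + 事件回调的基本模式
func watchServices(client kubernetes.Interface, namespace string) cache.SharedIndexInformer {
    factory := informers.NewSharedInformerFactoryWithOptions(
        client, 0, informers.WithNamespace(namespace))
    informer := factory.Core().V1().Services().Informer()

    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    func(obj interface{}) { fmt.Println("service added") },
        UpdateFunc: func(old, cur interface{}) { fmt.Println("service updated") },
        DeleteFunc: func(obj interface{}) { fmt.Println("service deleted") },
    })
    return informer
}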
ServiceDiscovery
istio.io/istio/pilot/pkg/bootstrap/server.go
func (s *Server) initDiscoveryService(args *PilotArgs) error {
// 需要注意 env 保存了后面会用到的变量,包括
// s.istioConfigStore 和 s.ServiceController(后者同时用作 ServiceDiscovery 与 ServiceAccounts)
environment := &model.Environment{
Mesh: s.mesh,
MeshNetworks: s.meshNetworks,
IstioConfigStore: s.istioConfigStore, // istio routing rules
ServiceDiscovery: s.ServiceController, // service list
ServiceAccounts: s.ServiceController,
MixerSAN: s.mixerSAN,
}
// Set up discovery service
discovery, err := envoy.NewDiscoveryService(
environment,
args.DiscoveryOptions,
)
s.mux = discovery.RestContainer.ServeMux
// 创建 envoyv2.NewDiscoveryServer 对应的 gRPC Server
s.EnvoyXdsServer = envoyv2.NewDiscoveryServer(
environment,
istio_networking.NewConfigGenerator(args.Plugins),
s.ServiceController,
s.configController)
s.EnvoyXdsServer.InitDebug(s.mux, s.ServiceController)
// ...
// create grpc/http server
s.initGrpcServer(args.KeepaliveOptions)
s.httpServer = &http.Server{
Addr: args.DiscoveryOptions.HTTPAddr,
Handler: s.mux,
}
// create http listener
listener, err := net.Listen("tcp", args.DiscoveryOptions.HTTPAddr)
s.HTTPListeningAddr = listener.Addr()
// create grpc listener
grpcListener, err := net.Listen("tcp", args.DiscoveryOptions.GrpcAddr)
s.GRPCListeningAddr = grpcListener.Addr()
s.addStartFunc(func(stop <-chan struct{}) error {
// 启动 http server goroutine
// 启动 gRPC server goroutine
// 等待关闭 goroutine
return nil
})
// run secure grpc server
return nil
}
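上面 addStartFunc 注册的启动函数大致做三件事:分别在独立 goroutine 中启动 HTTP server 和 gRPC server,并在收到 stop 信号后关闭它们。一个简化示意如下(接上面片段中的 listener / grpcListener,非逐字源码,错误处理从略):

s.addStartFunc(func(stop <-chan struct{}) error {
    // 1. HTTP server:承载 REST 接口与 /debug 调试接口
    go func() { _ = s.httpServer.Serve(listener) }()

    // 2. gRPC server:承载 ADS/xDS 服务
    go func() { _ = s.grpcServer.Serve(grpcListener) }()

    // 3. 等待 stop 信号后优雅关闭两个 server
    go func() {
        <-stop
        _ = s.httpServer.Close()
        s.grpcServer.GracefulStop()
    }()
    return nil
})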
istio.io/istio/pilot/pkg/proxy/envoy/v2/discovery.go, envoyv2.NewDiscoveryServer 函数定义:
// s.EnvoyXdsServer = envoyv2.NewDiscoveryServer(
// environment,
// istio_networking.NewConfigGenerator(args.Plugins),
// s.ServiceController,
// s.configController)
// NewDiscoveryServer creates DiscoveryServer that sources data from Pilot's internal mesh data structures
func NewDiscoveryServer(env *model.Environment, generator core.ConfigGenerator, ctl model.Controller, configCache model.ConfigStoreCache) *DiscoveryServer {
// 创建 DiscoveryServer 对象
out := &DiscoveryServer{
Env: env,
ConfigGenerator: generator, // generates xDS responses
ConfigController: configCache,
EndpointShardsByService: map[string]*EndpointShards{},
WorkloadsByID: map[string]*Workload{},
edsUpdates: map[string]*EndpointShards{},
concurrentPushLimit: make(chan struct{}, 20),
updateChannel: make(chan *updateReq, 10),
}
env.PushContext = model.NewPushContext()
// 处理相关的更新操作
// handleUpdates 处理来自 updateChannel 的事件:它确保距上一次事件处理至少已经过了 minQuiet 时间,
// 同时保证从接收事件到开始处理之间最多经过 maxDelay。
// 满足条件后调用 doPush 函数,根据全量推送标记,将最近更新的 EDS 相关信息
// 通过 XDS Incremental Push 或者全量推送的方式下发出去
go out.handleUpdates()
// 以下三种情况会清空 DiscoveryServer 的本地缓存(这里注册了相应的清理缓存回调):
// 1. service 相关的信息发生变化
// 2. jwt public key 发生变化
// 3. Istio CRD 配置发生变化
// 当以上任一情况发生时,会向 updateChannel 发送事件,由 out.handleUpdates() 处理
// 周期性更新
go out.periodicRefresh()
// 周期性更新 metrics
go out.periodicRefreshMetrics()
out.DebugConfigs = pilot.DebugConfigs
pushThrottle := intEnv(pilot.PushThrottle, 10)
pushBurst := intEnv(pilot.PushBurst, 100)
adsLog.Infof("Starting ADS server with rateLimiter=%d burst=%d", pushThrottle, pushBurst)
out.rateLimiter = rate.NewLimiter(rate.Limit(pushThrottle), pushBurst)
out.initRateLimiter = rate.NewLimiter(rate.Limit(pushThrottle*2), pushBurst*2)
return out
}
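上面注释中提到的去抖(debounce)逻辑可以用下面的简化示意来理解:收到更新后不立即推送,而是等到事件“安静”至少 minQuiet,或从第一个事件算起已累计等待 maxDelay,才调用 doPush 一次性推送(示意代码,并非 handleUpdates 的逐字实现,依赖标准库 time):

// debounce 演示 handleUpdates 的核心思路(函数名与实现均为示意)
func debounce(updates <-chan struct{}, minQuiet, maxDelay time.Duration, doPush func()) {
    var firstEvent, lastEvent time.Time
    pending := 0

    ticker := time.NewTicker(100 * time.Millisecond) // 周期性检查是否满足推送条件
    defer ticker.Stop()

    for {
        select {
        case <-updates:
            if pending == 0 {
                firstEvent = time.Now()
            }
            lastEvent = time.Now()
            pending++
        case <-ticker.C:
            if pending == 0 {
                continue
            }
            quiet := time.Since(lastEvent)
            elapsed := time.Since(firstEvent)
            if quiet >= minQuiet || elapsed >= maxDelay {
                doPush() // 一次性推送累积的更新
                pending = 0
            }
        }
    }
}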
ADS 相关定义:
https://github.com/envoyproxy/data-plane-api/blob/master/envoy/service/discovery/v2/ads.proto
// See https://github.com/lyft/envoy-api#apis for a description of the role of
// ADS and how it is intended to be used by a management server. ADS requests
// have the same structure as their singleton xDS counterparts, but can
// multiplex many resource types on a single stream. The type_url in the
// DiscoveryRequest/DiscoveryResponse provides sufficient information to recover
// the multiplexed singleton APIs at the Envoy instance and management server.
service AggregatedDiscoveryService {
// This is a gRPC-only API.
rpc StreamAggregatedResources(stream envoy.api.v2.DiscoveryRequest)
returns (stream envoy.api.v2.DiscoveryResponse) {
}
rpc IncrementalAggregatedResources(stream envoy.api.v2.IncrementalDiscoveryRequest)
returns (stream envoy.api.v2.IncrementalDiscoveryResponse) {
}
}
https://github.com/envoyproxy/data-plane-api/blob/master/envoy/api/v2/discovery.proto
// A DiscoveryRequest requests a set of versioned resources of the same type for
// a given Envoy node on some API.
message DiscoveryRequest {
// The version_info provided in the request messages will be the version_info
// received with the most recent successfully processed response or empty on
// the first request. It is expected that no new request is sent after a
// response is received until the Envoy instance is ready to ACK/NACK the new
// configuration. ACK/NACK takes place by returning the new API config version
// as applied or the previous API config version respectively. Each type_url
// (see below) has an independent version associated with it.
string version_info = 1;
// The node making the request.
core.Node node = 2;
// List of resources to subscribe to, e.g. list of cluster names or a route
// configuration name. If this is empty, all resources for the API are
// returned. LDS/CDS expect empty resource_names, since this is global
// discovery for the Envoy instance. The LDS and CDS responses will then imply
// a number of resources that need to be fetched via EDS/RDS, which will be
// explicitly enumerated in resource_names.
repeated string resource_names = 3;
// Type of the resource that is being requested, e.g.
// "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment". This is implicit
// in requests made via singleton xDS APIs such as CDS, LDS, etc. but is
// required for ADS.
string type_url = 4;
// nonce corresponding to DiscoveryResponse being ACK/NACKed. See above
// discussion on version_info and the DiscoveryResponse nonce comment. This
// may be empty if no nonce is available, e.g. at startup or for non-stream
// xDS implementations.
string response_nonce = 5;
// This is populated when the previous :ref:`DiscoveryResponse <envoy_api_msg_DiscoveryResponse>`
// failed to update configuration. The *message* field in *error_details* provides the Envoy
// internal exception related to the failure. It is only intended for consumption during manual
// debugging, the string provided is not guaranteed to be stable across Envoy versions.
google.rpc.Status error_detail = 6;
}
message DiscoveryResponse {
// The version of the response data.
string version_info = 1;
// The response resources. These resources are typed and depend on the API being called.
repeated google.protobuf.Any resources = 2 [(gogoproto.nullable) = false];
// [#not-implemented-hide:]
// Canary is used to support two Envoy command line flags:
//
// * --terminate-on-canary-transition-failure. When set, Envoy is able to
// terminate if it detects that configuration is stuck at canary. Consider
// this example sequence of updates:
// - Management server applies a canary config successfully.
// - Management server rolls back to a production config.
// - Envoy rejects the new production config.
// Since there is no sensible way to continue receiving configuration
// updates, Envoy will then terminate and apply production config from a
// clean slate.
// * --dry-run-canary. When set, a canary response will never be applied, only
// validated via a dry run.
bool canary = 3;
// Type URL for resources. This must be consistent with the type_url in the
// Any messages for resources if resources is non-empty. This effectively
// identifies the xDS API when muxing over ADS.
string type_url = 4;
// For gRPC based subscriptions, the nonce provides a way to explicitly ack a
// specific DiscoveryResponse in a following DiscoveryRequest. Additional
// messages may have been sent by Envoy to the management server for the
// previous version on the stream prior to this DiscoveryResponse, that were
// unprocessed at response send time. The nonce allows the management server
// to ignore any further DiscoveryRequests for the previous version until a
// DiscoveryRequest bearing the nonce. The nonce is optional and is not
// required for non-stream based xDS implementations.
string nonce = 5;
// [#not-implemented-hide:]
// The control plane instance that sent the response.
core.ControlPlane control_plane = 6;
}
关于 ADS 保证一致性的内容参见: envoy-的-xds-rest-和-grpc-协议详解 中的“最终一致性考虑”章节:
一般来说,为避免流量丢弃,更新的顺序应该遵循 make before break 模型,其中
* 必须始终先推送 CDS 更新(如果有)。
* EDS 更新(如果有)必须在相应集群的 CDS 更新后到达。
* LDS 更新必须在相应的 CDS/EDS 更新后到达。
* 与新添加的监听器相关的 RDS 更新必须在最后到达。
* 最后,删除过期的 CDS 集群和相关的 EDS 端点(不再被引用的端点)。
ADS 允许单一管理服务器通过单个 gRPC 流,提供所有的 API 更新。配合仔细规划的更新顺序,ADS 可规避更新过程中流量丢失。
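对应到 Pilot 侧,全量推送(即后面提到的 pushConnection)大体按 CDS -> EDS -> LDS -> RDS 的顺序下发,以满足上述 make before break 约束。简化示意如下(非逐字源码,参数以实际源码为准,错误处理从略):

// 按 make-before-break 顺序推送各类 xDS 资源(示意)
if con.CDSWatch {
    _ = s.pushCds(con, push, version)
}
if len(con.Clusters) > 0 {
    _ = s.pushEds(push, con, true, nil)
}
if con.LDSWatch {
    _ = s.pushLds(con, push, false, version)
}
if len(con.Routes) > 0 {
    _ = s.pushRoute(con, push)
}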
pilot/pkg/proxy/envoy/v2/ads.go
// StreamAggregatedResources implements the ADS interface.
func (s *DiscoveryServer) StreamAggregatedResources(stream ads.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
// ...
// InitContext returns immediately if the context was already initialized.
// InitContext 从 env 中取出需要发送到客户端的数据,后续会继续分析
// 1. initServiceRegistry
// 2. initVirtualServices
// 3. initDestinationRules
// 4. initAuthorizationPolicies
// 5. InitSidecarScopes
err := s.globalPushContext().InitContext(s.Env)
con := newXdsConnection(peerAddr, stream)
reqChannel := make(chan *xdsapi.DiscoveryRequest, 1)
// 启动一个新的 goroutine 来从客户端接收相关的数据 xdsapi.DiscoveryRequest
go receiveThread(con, reqChannel, &receiveError)
for {
// Block until either a request is received or a push is triggered.
select {
case discReq, ok := <-reqChannel:
err = s.initConnectionNode(discReq, con)
switch discReq.TypeUrl {
case ClusterType:
// ...
// CDS REQ is the first request an envoy makes. This shows up
// immediately after connect. It is followed by EDS REQ as
// soon as the CDS push is returned.
adsLog.Infof("ADS:CDS: REQ %v %s %v raw: %s", peerAddr, con.ConID, time.Since(t0), discReq.String())
con.CDSWatch = true
err := s.pushCds(con, s.globalPushContext(), versionInfo())
case ListenerType:
// ...
adsLog.Debugf("ADS:LDS: REQ %s %v", con.ConID, peerAddr)
con.LDSWatch = true
err := s.pushLds(con, s.globalPushContext(), true, versionInfo())
case RouteType:
// ...
adsLog.Debugf("ADS:RDS: REQ %s %s routes: %d", peerAddr, con.ConID, len(con.Routes))
err := s.pushRoute(con, s.globalPushContext())
case EndpointType:
// ...
// 各种错误处理
for _, cn := range con.Clusters {
s.removeEdsCon(cn, con.ConID, con)
}
for _, cn := range clusters {
s.addEdsCon(cn, con.ConID, con)
}
con.Clusters = clusters
adsLog.Debugf("ADS:EDS: REQ %s %s clusters: %d", peerAddr, con.ConID, len(con.Clusters))
err := s.pushEds(s.globalPushContext(), con, true, nil)
if err != nil {
return err
}
default:
adsLog.Warnf("ADS: Unknown watched resources %s", discReq.String())
}
// ...
case pushEv := <-con.pushChannel:
// ...
err := s.pushConnection(con, pushEv)
if err != nil {
return nil
}
}
}
}
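其中 receiveThread 在独立的 goroutine 中循环读取 gRPC stream 上的 DiscoveryRequest 并写入 reqChannel,使主循环可以在“收到请求”与“触发推送”两类事件之间 select。简化示意如下(非逐字源码):

func receiveThread(con *XdsConnection, reqChannel chan *xdsapi.DiscoveryRequest, errP *error) {
    defer close(reqChannel)
    for {
        req, err := con.stream.Recv() // 阻塞读取客户端(Envoy)发来的请求
        if err != nil {
            *errP = err // 记录错误,主循环据此退出
            return
        }
        reqChannel <- req
    }
}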
istio.io/istio/pilot/pkg/model/push_context.go
// PushContext tracks the status of a push - metrics and errors.
// Metrics are reset after a push - at the beginning all
// values are zero, and when push completes the status is reset.
// The struct is exposed in a debug endpoint - fields public to allow
// easy serialization as json.
type PushContext struct {
// privateServices are reachable within the same namespace.
privateServicesByNamespace map[string][]*Service
// publicServices are services reachable within the mesh.
publicServices []*Service
privateVirtualServicesByNamespace map[string][]Config
publicVirtualServices []Config
// destination rules are of three types:
// namespaceLocalDestRules: all public/private dest rules pertaining to a service defined in a given namespace
// namespaceExportedDestRules: all public dest rules pertaining to a service defined in a namespace
// allExportedDestRules: all (public) dest rules across all namespaces
// We need the allExportedDestRules in addition to namespaceExportedDestRules because we select
// the dest rule based on the most specific host match, and not just any destination rule
namespaceLocalDestRules map[string]*processedDestRules
namespaceExportedDestRules map[string]*processedDestRules
allExportedDestRules *processedDestRules
// sidecars for each namespace
sidecarsByNamespace map[string][]*SidecarScope
////////// END ////////
// The following data is either a global index or used in the inbound path.
// Namespace specific views do not apply here.
// ServiceByHostname has all services, indexed by hostname.
ServiceByHostname map[Hostname]*Service `json:"-"`
// AuthzPolicies stores the existing authorization policies in the cluster. Could be nil if there
// are no authorization policies in the cluster.
AuthzPolicies *AuthorizationPolicies `json:"-"`
// ServicePort2Name is used to keep track of service name and port mapping.
// This is needed because ADS names use port numbers, while endpoints use
// port names. The key is the service name. If a service or port are not found,
// the endpoint needs to be re-evaluated later (eventual consistency)
ServicePort2Name map[string]PortList `json:"-"`
initDone bool
}
// InitContext will initialize the data structures used for code generation.
// This should be called before starting the push, from the thread creating
// the push context.
func (ps *PushContext) InitContext(env *Environment) error {
ps.Mutex.Lock()
defer ps.Mutex.Unlock()
if ps.initDone {
return nil
}
ps.Env = env
var err error
// Caches list of services in the registry, and creates a map
// of hostname to service -> ServicePort2Name map[string]PortList `json:"-"`
ps.initServiceRegistry(env)
// Caches list of virtual services -> publicVirtualServices []Config
ps.initVirtualServices(env)
// Split out of DestinationRule expensive conversions - once per push.
// 最后保存到以下三个变量中:
// * namespaceLocalDestRules map[string]*processedDestRules
// * namespaceExportedDestRules map[string]*processedDestRules
// * allExportedDestRules *processedDestRules
ps.initDestinationRules(env)
// Get the ClusterRbacConfig -> AuthzPolicies *AuthorizationPolicies
ps.initAuthorizationPolicies(env)
// Must be initialized in the end -> sidecarsByNamespace map[string][]*SidecarScope
ps.InitSidecarScopes(env)
ps.initDone = true
return nil
}
CDS 为 ADS 中第一个发送的信息,后续我们以 CDS 为例进行详细分析
// StreamAggregatedResources implements the ADS interface.
func (s *DiscoveryServer) StreamAggregatedResources(stream ads.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
// ...
// InitContext returns immediately if the context was already initialized.
// InitContext 从 env 中取出需要发送到客户端的数据,后续会继续分析
// 1. initServiceRegistry
// 2. initVirtualServices
// 3. initDestinationRules
// 4. initAuthorizationPolicies
// 5. InitSidecarScopes
err := s.globalPushContext().InitContext(s.Env)
con := newXdsConnection(peerAddr, stream)
reqChannel := make(chan *xdsapi.DiscoveryRequest, 1)
// 启动一个新的 goroutine 来从客户端接收相关的数据 xdsapi.DiscoveryRequest
go receiveThread(con, reqChannel, &receiveError)
for {
// Block until either a request is received or a push is triggered.
select {
case discReq, ok := <-reqChannel:
// 主要是调用 `ParseServiceNodeWithMetadata` 函数,
// 从 Req 的消息中获取到各种信息,并生成 `Proxy` 对象
// 当前 discReq.Node.Id 格式为 Type~IPAddress~ID~Domain
err = s.initConnectionNode(discReq, con)
switch discReq.TypeUrl {
case ClusterType:
// 如果已经发送过 CDS 数据,这里处理的是其后续响应(ACK/NACK)消息
if con.CDSWatch {
// ...
}
// CDS REQ is the first request an envoy makes. This shows up
// immediately after connect. It is followed by EDS REQ as
// soon as the CDS push is returned.
adsLog.Infof("ADS:CDS: REQ %v %s %v raw: %s", peerAddr, con.ConID, time.Since(t0), discReq.String())
con.CDSWatch = true
err := s.pushCds(con, s.globalPushContext(), versionInfo())
if err != nil {
return err
}
//...
}
Identifies a specific Envoy instance. The node identifier is presented to the management server, which may use this identifier to distinguish per Envoy configuration for serving.

{
"id": "...",
"cluster": "...",
"metadata": "{...}",
"locality": "{...}",
"build_version": "..."
}
- id (string): An opaque node identifier for the Envoy node. This also provides the local service node name. It should be set if any of the following features are used: statsd, CDS, and HTTP tracing, either in this message or via --service-node.
- cluster (string): Defines the local service cluster name where Envoy is running. Though optional, it should be set if any of the following features are used: statsd, health check cluster verification, runtime override directory, user agent addition, HTTP global rate limiting, CDS, and HTTP tracing, either in this message or via --service-cluster.
- metadata (Struct): Opaque metadata extending the node identifier. Envoy will pass this directly to the management server.
- locality (core.Locality): Locality specifying where the Envoy instance is running.
- build_version (string): This is motivated by informing a management server during canary which version of Envoy is being tested in a heterogeneous fleet. This will be set by Envoy in management server RPCs.
在 initConnectionNode 函数中,主要是调用 ParseServiceNodeWithMetadata 函数,从 Req 的消息中获取到各种信息,并生成 Proxy 对象。
istio.io/istio/pilot/pkg/model/context.go
func ParseServiceNodeWithMetadata(s string, metadata map[string]string) (*Proxy, error) {
}
// Proxy contains information about an specific instance of a proxy (envoy sidecar, gateway,
// etc). The Proxy is initialized when a sidecar connects to Pilot, and populated from
// 'node' info in the protocol as well as data extracted from registries.
//
// In current Istio implementation nodes use a 4-parts '~' delimited ID.
// Type~IPAddress~ID~Domain
type Proxy struct {
// ClusterID specifies the cluster where the proxy resides.
// TODO: clarify if this is needed in the new 'network' model, likely needs to
// be renamed to 'network'
ClusterID string
// Type specifies the node type. First part of the ID.
Type NodeType
// IPAddresses is the IP addresses of the proxy used to identify it and its
// co-located service instances. Example: "10.60.1.6". In some cases, the host
// where the poxy and service instances reside may have more than one IP address
IPAddresses []string
// ID is the unique platform-specific sidecar proxy ID. For k8s it is the pod ID and
// namespace.
ID string
// Locality is the location of where Envoy proxy runs.
Locality Locality
// DNSDomain defines the DNS domain suffix for short hostnames (e.g.
// "default.svc.cluster.local")
DNSDomain string
// TrustDomain defines the trust domain of the certificate
TrustDomain string
// ConfigNamespace defines the namespace where this proxy resides
// for the purposes of network scoping.
// NOTE: DO NOT USE THIS FIELD TO CONSTRUCT DNS NAMES
ConfigNamespace string
// Metadata key-value pairs extending the Node identifier
Metadata map[string]string
// the sidecarScope associated with the proxy
SidecarScope *SidecarScope
}
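结合上面的 Proxy 结构,ParseServiceNodeWithMetadata 的核心就是按 Type~IPAddress~ID~Domain 四段格式拆分 node id 并填充 Proxy。简化示意如下(函数名 parseServiceNode 为示意,省略了校验以及从 metadata 中覆盖 IP 等逻辑):

func parseServiceNode(s string, metadata map[string]string) (*Proxy, error) {
    parts := strings.Split(s, "~")
    if len(parts) != 4 {
        return nil, fmt.Errorf("missing parts in the service node %q", s)
    }
    return &Proxy{
        Type:        NodeType(parts[0]), // sidecar / router 等
        IPAddresses: []string{parts[1]},
        ID:          parts[2],
        DNSDomain:   parts[3],
        Metadata:    metadata,
    }, nil
}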
推送 CDS 的核心实现是 s.pushCds(con, s.globalPushContext(), versionInfo()) 函数:
istio.io/istio/pilot/pkg/proxy/envoy/v2/cds.go
func (s *DiscoveryServer) pushCds(con *XdsConnection, push *model.PushContext, version string) error {
// 通过当前的 con.modelNode 和 push 的上下文生成对应的 rawClusters 对象 []*xdsapi.Cluster
rawClusters, err := s.generateRawClusters(con.modelNode, push)
response := con.clusters(rawClusters)
err = con.send(response)
return nil
}
因此 s.generateRawClusters(con.modelNode, push) 的作用不言而喻,就是将 push 上下文中与 CDS 相关的数据整理并封装成 CDS Response 的格式。
func (s *DiscoveryServer) generateRawClusters(node *model.Proxy, push *model.PushContext) ([]*xdsapi.Cluster, error) {
rawClusters, err := s.ConfigGenerator.BuildClusters(s.Env, node, push)
// 对 rawClusters 中的信息进行 Validate 验证
return rawClusters, nil
}
对象 rawClusters 是通过 s.ConfigGenerator.BuildClusters 函数基于 s.Env、node、push 三者的信息组合生成的:
- s.Env 保存了 ServiceController 和 ConfigController 等资源的本地缓存信息
- node 为本次 Req 请求中生成的包含 Node 相关信息的对象
- push 为本次推送的上下文,已经将本次推送过程中需要的信息完成了初步的整理(从 s.Env 中生成出来)
istio.io/istio/pilot/pkg/networking/core/v1alpha3/cluster.go
BuildClusters 函数的实现如下:
// BuildClusters returns the list of clusters for the given proxy. This is the CDS output
// For outbound: Cluster for each service/subset hostname or cidr with SNI set to service hostname
// Cluster type based on resolution
// For inbound (sidecar only): Cluster for each inbound endpoint port and for each service port
func (configgen *ConfigGeneratorImpl) BuildClusters(env *model.Environment, proxy *model.Proxy, push *model.PushContext) ([]*apiv2.Cluster, error) {
clusters := make([]*apiv2.Cluster, 0)
switch proxy.Type {
case model.SidecarProxy:
// GetProxyServiceInstances returns service instances co-located with the proxy
// 获取与 proxy 所在主机上的 service instance,包括 headless 服务
instances, err := env.GetProxyServiceInstances(proxy)
sidecarScope := proxy.SidecarScope
recomputeOutboundClusters := true
// 追加 OutboundClusters
if recomputeOutboundClusters {
clusters = append(clusters, configgen.buildOutboundClusters(env, proxy, push)...)
}
// Let ServiceDiscovery decide which IP and Port are used for management if
// there are multiple IPs
managementPorts := make([]*model.Port, 0)
for _, ip := range proxy.IPAddresses {
managementPorts = append(managementPorts, env.ManagementPorts(ip)...)
}
// 追加与 proxy ip 上 managementPorts 相关的 InboundClusters
clusters = append(clusters, configgen.buildInboundClusters(env, proxy, push, instances, managementPorts)...)
default: // Gateways
// ...
}
// Add a blackhole and passthrough cluster for catching traffic to unresolved routes
// DO NOT CALL PLUGINS for these two clusters.
clusters = append(clusters, buildBlackHoleCluster())
clusters = append(clusters, buildDefaultPassthroughCluster())
// resolves cluster name conflicts.
return normalizeClusters(push, proxy, clusters), nil
}
此外,Pilot 还内置了一组用于调试的 HTTP 接口:
istio.io/istio/pilot/pkg/proxy/envoy/v2/debug.go
// InitDebug initializes the debug handlers and adds a debug in-memory registry.
func (s *DiscoveryServer) InitDebug(mux *http.ServeMux, sctl *aggregate.Controller) {
// For debugging and load testing v2 we add an memory registry.
s.MemRegistry = NewMemServiceDiscovery(
map[model.Hostname]*model.Service{ // mock.HelloService.Hostname: mock.HelloService,
}, 2)
s.MemRegistry.EDSUpdater = s
s.MemRegistry.ClusterID = "v2-debug"
sctl.AddRegistry(aggregate.Registry{
ClusterID: "v2-debug",
Name: serviceregistry.ServiceRegistry("memAdapter"),
ServiceDiscovery: s.MemRegistry,
ServiceAccounts: s.MemRegistry,
Controller: s.MemRegistry.controller,
})
mux.HandleFunc("/ready", s.ready)
mux.HandleFunc("/debug/edsz", s.edsz)
mux.HandleFunc("/debug/adsz", s.adsz)
mux.HandleFunc("/debug/cdsz", cdsz)
mux.HandleFunc("/debug/syncz", Syncz)
mux.HandleFunc("/debug/registryz", s.registryz)
mux.HandleFunc("/debug/endpointz", s.endpointz)
mux.HandleFunc("/debug/endpointShardz", s.endpointShardz)
mux.HandleFunc("/debug/workloadz", s.workloadz)
mux.HandleFunc("/debug/configz", s.configz)
mux.HandleFunc("/debug/authenticationz", s.authenticationz)
mux.HandleFunc("/debug/config_dump", s.ConfigDump)
mux.HandleFunc("/debug/push_status", s.PushStatusHandler)
}
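这些 debug 接口挂在 discovery 的 HTTP 端口(默认 :8080)的 mux 上,可以直接用一个简单的 HTTP 客户端访问。例如(地址为示例,实际取决于部署方式,可先用 kubectl port-forward 把 pilot 的 8080 端口映射到本地):

package main

import (
    "fmt"
    "io/ioutil"
    "net/http"
)

func main() {
    // 假设已将 pilot 的 8080 端口转发到本地
    resp, err := http.Get("http://127.0.0.1:8080/debug/adsz")
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    body, _ := ioutil.ReadAll(resp.Body)
    fmt.Println(string(body)) // 输出当前 ADS 连接及其推送状态的调试信息
}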