一、背景
服务发现与注册在分布式系统中是必要的,因为它解决了以下问题:
- 在云原生和微服务架构中,服务的数量和实例的变化是常态。服务发现与注册提供了一种动态的机制,使得服务能够自动注册和注销,并及时通知其他组件进行更新。这样,系统可以适应服务的扩展、收缩和变化。
- 弹性和可靠性:分布式系统中的服务实例可能会发生故障或不可用。服务发现与注册可以监测和探测服务的可用性,并自动将流量导向可用的实例,以实现故障恢复和提高系统的可靠性。
- 负载均衡:服务发现与注册可以根据负载均衡策略将流量分配到可用的服务实例上,以平衡请求的负载。通过合理分配请求,服务发现与注册可以提高系统的性能和吞吐量。
- 透明性和抽象层:服务发现与注册对于服务消费者来说是透明的,消费者不需要知道服务的具体实例位置和数量,只需要通过服务名称来发现和访问服务。这种抽象层可以简化开发和部署,提高系统的可维护性和可扩展性。
- 多环境支持:服务发现与注册可以帮助跨多个环境(例如开发、测试、生产)进行部署和管理服务。它们提供了一种集中式的方式来管理和发现服务,而不需要为每个环境进行硬编码和静态配置。
- 高可用性和容错性:服务发现与注册允许服务部署在多个节点上,并通过负载均衡和故障恢复机制来提供高可用性和容错性。如果某个服务实例不可用,请求可以被重定向到其他可用的实例上,从而保证系统的稳定性和可用性。
总而言之,服务发现与注册是分布式系统中实现弹性、可伸缩和可靠性的关键机制。它们提供了动态的服务管理和发现能力,使得服务能够灵活地进行部署、扩展和调整,并提供了负载均衡、故障恢复和透明访问的功能,为分布式系统的可靠运行提供了基础。
二、Istio服务发现与注册功能
三、Istio服务发现与注册结构
3.1 Kubernetes
istio对原生k8s资源的支持如下表ServiceController:
Istio通过监听k8s的资源,再通过Lister机制将原生资源转换成对应的Istio的资源定义,再通过回调注册事件,过滤器链决定是否推送,调用Handler处理Event,最终通过DiscoveryServer将变动的资源配置以XDS协议下发给Envoy。
Controller结构如下:
type Controller struct {
opts Options
client kubelib.Client
queue queue.Instance
nsInformer cache.SharedIndexInformer
nsLister listerv1.NamespaceLister
serviceInformer informer.FilteredSharedIndexInformer
serviceLister listerv1.ServiceLister
endpoints kubeEndpointsController
//用于监视可从远程群集访问的节点。在多集群(共享控制平面多网络)场景中,入口网关服务可以是nodePort类型。这样,我们就可以用节点ips填充mesh的网关地址。
nodeInformer cache.SharedIndexInformer
nodeLister listerv1.NodeLister
exports serviceExportCache
imports serviceImportCache
pods *PodCache
handlers model.ControllerHandlers
namespaceDiscoveryHandlers []func(ns string, event model.Event)
// 只用于测试
stop chan struct{}
sync.RWMutex
// servicesMap存储hostname=>service,它用于减少convertService调用。
servicesMap map[host.Name]*model.Service
// hostNamesForNamespacedName返回给定服务名称的所有可能的主机名。如果启用了Kubernetes多集群服务(MCS),它将包含常规主机名和MCS主机名(clusterset.local)。否则,将只返回常规主机名。
hostNamesForNamespacedName func(name types.NamespacedName) []host.Name
//servicesForNamespacedName返回给定服务名称的所有服务。如果启用了Kubernetes多集群服务(MCS),它将包含常规服务以及MCS服务(clusterset.local)(如果可用)。否则,将只返回常规服务。
servicesForNamespacedName func(name types.NamespacedName) []*model.Service
// nodeSelectorsForServices存储hostname=>标签选择器,可用于细化服务的节点端口IP集。
nodeSelectorsForServices map[host.Name]labels.Instance
// 节点名称及其地址的映射+标签-这是我们从vm到k8s或跨集群的节点所需要的唯一东西。当节点端口服务通过标签选择特定节点时,我们在这里运行标签选择器,只选择我们需要的节点。此映射中仅包含具有ExternalIP地址的节点!
nodeInfoMap map[string]kubernetesNode
// externalNameSvcInstanceMap存储hostname==>实例,用于存储ExternalName k8s服务的实例
externalNameSvcInstanceMap map[host.Name][]*model.ServiceInstance
// 根据工作负载条目对工作负载实例进行索引
workloadInstancesIndex workloadinstances.Index
multinetwork
// 一旦控制器成功运行,informerInit就会设置为true。这样可以确保在运行之前不会返回HasSynced=true
informerInit *atomic.Bool
// 调用SyncAll时,beginSync设置为true,表示控制器已开始同步资源。
beginSync *atomic.Bool
// initialSync在对所有对象执行初始按顺序处理之后被设置为true。
initialSync *atomic.Bool
}
Namespace
通过DiscoveryNamespacesFilter的过滤器选择istio监控的命名空间,当选择的命名空间事件触发,会触发对应的Service、Pod和Endpoint的create、delete事件处理程序,并更新DiscoveryNamespacesFilter。
Service
众所周知,当我们创建service后,kube-controller中的Endpoint controller会根据其信息创建相应的endpoint,istio就使用这一机制,监听service事件,根据service信息获取所对应的endpoint,然后构建自己的istio-endpoint与model.service然后更新缓存,推送service更新命令。
func (c *Controller) onServiceEvent(curr any, event model.Event) error {
//转换成v1.Service
svc, err := extractService(curr)
if err != nil {
log.Error(err)
return nil
}
log.Debugf("Handle event %s for service %s in namespace %s", event, svc.Name, svc.Namespace)
// 转换成标准的Istio service
svcConv := kube.ConvertService(*svc, c.opts.DomainSuffix, c.Cluster())
switch event {
case model.EventDelete:
c.deleteService(svcConv)
default:
c.addOrUpdateService(svc, svcConv, event, false)
}
return nil
}
func (c *Controller) addOrUpdateService(svc *v1.Service, svcConv *model.Service, event model.Event, updateEDSCache bool) {
needsFullPush := false
//首先,处理指定了外部IP的nodePort网关服务和负载均衡器网关服务
if !svcConv.Attributes.ClusterExternalAddresses.IsEmpty() {
needsFullPush = c.extractGatewaysFromService(svcConv)
} else if isNodePortGatewayService(svc) {
// 我们需要知道哪些服务正在使用节点选择器,因为在节点事件期间,我们必须相应地更新所有节点端口服务。
nodeSelector := getNodeSelectorsForService(svc)
c.Lock()
// 仅当它是nodePort网关服务时添加
c.nodeSelectorsForServices[svcConv.Hostname] = nodeSelector
c.Unlock()
needsFullPush = c.updateServiceNodePortAddresses(svcConv)
}
// 只有在添加更新服务时才需要实例转换
instances := kube.ExternalNameServiceInstances(svc, svcConv)
c.Lock()
c.servicesMap[svcConv.Hostname] = svcConv
if len(instances) > 0 {
c.externalNameSvcInstanceMap[svcConv.Hostname] = instances
}
c.Unlock()
// 这个完全推送需要更新所有端点,尽管我们在服务addupdate上进行了完全推送,因为该完全推送仅针对特定服务触发。
if needsFullPush {
// networks are different, we need to update all eds endpoints
c.opts.XDSUpdater.ConfigUpdate(&model.PushRequest{Full: true, Reason: []model.TriggerReason{model.NetworksTrigger}})
}
shard := model.ShardKeyFromRegistry(c)
ns := svcConv.Attributes.Namespace
// 我们还需要在服务更改时进行更新。对于Kubernetes,服务更改将导致Endpoint更新,但工作负载条目也需要更新。
// TODO(nmittler): Build different sets of endpoints for cluster.local and clusterset.local.
if updateEDSCache || features.EnableK8SServiceSelectWorkloadEntries {
//构建IstioEndpoint
endpoints := c.buildEndpointsForService(svcConv, updateEDSCache)
if len(endpoints) > 0 {
c.opts.XDSUpdater.EDSCacheUpdate(shard, string(svcConv.Hostname), ns, endpoints)
}
}
c.opts.XDSUpdater.SvcUpdate(shard, string(svcConv.Hostname), ns, event)
c.handlers.NotifyServiceHandlers(svcConv, event)
}
Endpoint
EndpointMode有仅有的Kubernetes Endpoints和Kubernetes EndpointSlices。
func processEndpointEvent(c *Controller, epc kubeEndpointsController, name string, namespace string, event model.Event, ep any) error {
// 无论是哪种服务,甚至是无头服务,都要更新内部端点缓存。至于网关,集群发现类型是“EDS”,用于无头服务。
updateEDS(c, epc, ep, event)
if features.EnableHeadlessService {
if svc, _ := c.serviceLister.Services(namespace).Get(name); svc != nil {
for _, modelSvc := range c.servicesForNamespacedName(kube.NamespacedNameForK8sObject(svc)) {
// if the service is headless service, trigger a full push.
if svc.Spec.ClusterIP == v1.ClusterIPNone {
c.opts.XDSUpdater.ConfigUpdate(&model.PushRequest{
Full: true,
// TODO: extend and set service instance type, so no need to re-init push context
ConfigsUpdated: map[model.ConfigKey]struct{}{{
Kind: kind.ServiceEntry,
Name: modelSvc.Hostname.String(),
Namespace: svc.Namespace,
}: {}},
Reason: []model.TriggerReason{model.EndpointUpdate},
})
return nil
}
}
}
}
return nil
}
func updateEDS(c *Controller, epc kubeEndpointsController, ep any, event model.Event) {
namespacedName := epc.getServiceNamespacedName(ep)
log.Debugf("Handle EDS endpoint %s %s in namespace %s", namespacedName.Name, event, namespacedName.Namespace)
var forgottenEndpointsByHost map[host.Name][]*model.IstioEndpoint
if event == model.EventDelete {
forgottenEndpointsByHost = epc.forgetEndpoint(ep)
}
shard := model.ShardKeyFromRegistry(c)
//从name和Namespaced获取hostname
for _, hostName := range c.hostNamesForNamespacedName(namespacedName) {
var endpoints []*model.IstioEndpoint
if forgottenEndpointsByHost != nil {
endpoints = forgottenEndpointsByHost[hostName]
} else {
endpoints = epc.buildIstioEndpoints(ep, hostName)
}
if features.EnableK8SServiceSelectWorkloadEntries {
//通过指定的主机名实现service目录操作
svc := c.GetService(hostName)
if svc != nil {
//用于收集updateEDS调用中的所有工作负载入口端点。
fep := c.collectWorkloadInstanceEndpoints(svc)
endpoints = append(endpoints, fep...)
} else {
log.Debugf("Handle EDS endpoint: skip collecting workload entry endpoints, service %s/%s has not been populated",
namespacedName.Namespace, namespacedName.Name)
}
}
c.opts.XDSUpdater.EDSUpdate(shard, string(hostName), namespacedName.Namespace, endpoints)
}
}
Pod
首先我们需要了解到istio中工作负载之间的通讯其实是sidecar也就是envoy之间的通讯(工作负载数据会发送到envoy中,envoy会根据配置寻找目的IP然后进行转发),POD在创建的时候,内部proxy会注册到istio,istio会将envoy配置发送给proxy,这时候就可以访问其他的工作负载。
//更新基于IP的索引(pc.podsByIP)。
func (pc *PodCache) onEvent(curr any, ev model.Event) error {
// 当pod被删除时,obj可能是v1.pod或DeletionFinalStateUnknown标记项。
pod, ok := curr.(*v1.Pod)
if !ok {
tombstone, ok := curr.(cache.DeletedFinalStateUnknown)
if !ok {
return fmt.Errorf("couldn't get object from tombstone %+v", curr)
}
pod, ok = tombstone.Obj.(*v1.Pod)
if !ok {
return fmt.Errorf("tombstone contained object that is not a pod %#v", curr)
}
}
ip := pod.Status.PodIP
// 在刚刚创建pod时,但在通过UpdateStatus分配IP之前,PodIP将为空。
if len(ip) == 0 {
return nil
}
key := kube.KeyFunc(pod.Name, pod.Namespace)
switch ev {
case model.EventAdd:
// 可能发生在istiod刚开始的时候
if pod.DeletionTimestamp != nil || !IsPodReady(pod) {
return nil
} else if shouldPodBeInEndpoints(pod) {
pc.update(ip, key)
} else {
return nil
}
case model.EventUpdate:
if pod.DeletionTimestamp != nil || !IsPodReady(pod) {
// 仅当此pod在缓存中时删除
pc.deleteIP(ip, key)
ev = model.EventDelete
} else if shouldPodBeInEndpoints(pod) {
pc.update(ip, key)
} else {
return nil
}
case model.EventDelete:
// 只有当这个pod在缓存中时才删除,在大多数情况下,它已经在设置了“删除时间戳”的“更新”中被删除。
if !pc.deleteIP(ip, key) {
return nil
}
}
pc.notifyWorkloadHandlers(pod, ev)
return nil
}
notifyWorkloadHandlers将pod包装成workloadInstance,其结构如下:
type WorkloadInstance struct {
Name string `json:"name,omitempty"`
Namespace string `json:"namespace,omitempty"`
// Where the workloadInstance come from, valid values are`Pod` or `WorkloadEntry`
Kind workloadKind `json:"kind"`
Endpoint *IstioEndpoint `json:"endpoint,omitempty"`
PortMap map[string]uint32 `json:"portMap,omitempty"`
// Can only be selected by service entry of DNS type.
DNSServiceEntryOnly bool `json:"dnsServiceEntryOnly,omitempty"`
}
在istio的设计当中,最初的时候开发者使用的pod作为工作负载,注册到中心,但是这样对于外部服务的管理无能为力,所以开发者们设计了workload资源来替代pod,一个workload代表一个工作负载,当我们引入外部服务的时候可以通过添加workload来实现。
workLoadHandler在initializeCluster中注册:
if configStore, err := createWleConfigStore(client, m.revision, options); err == nil {
kubeController.workloadEntryController = serviceentry.NewWorkloadEntryController(
configStore, options.XDSUpdater,
serviceentry.WithClusterID(cluster.ID),
serviceentry.WithNetworkIDCb(kubeRegistry.Network))
// 服务可以从同一集群中选择WorkloadEntry。我们只复制服务来配置kube-dns。
kubeController.workloadEntryController.AppendWorkloadHandler(kubeRegistry.WorkloadInstanceHandler)
// ServiceEntry从远程集群中选择WorkloadEntry
kubeController.workloadEntryController.AppendWorkloadHandler(m.serviceEntryController.WorkloadInstanceHandler)
if features.EnableEnhancedResourceScoping.Load() {
kubeRegistry.AppendNamespaceDiscoveryHandlers(kubeController.workloadEntryController.NamespaceDiscoveryHandler)
}
m.opts.MeshServiceController.AddRegistryAndRun(kubeController.workloadEntryController, clusterStopCh)
go configStore.Run(clusterStopCh)
} else {
return fmt.Errorf("failed creating config configStore for cluster %s: %v", cluster.ID, err)
}
serviceEntryController.WorkloadInstanceHandler为其他注册中心生成的服务实例定义处理程序:
func (s *Controller) WorkloadInstanceHandler(wi *model.WorkloadInstance, event model.Event) {
log.Debugf("Handle event %s for workload instance (%s/%s) in namespace %s", event,
wi.Kind, wi.Endpoint.Address, wi.Namespace)
key := configKey{
kind: podConfigType,
name: wi.Name,
namespace: wi.Namespace,
}
//用于指示此事件是否是为pod->workloadentry转换而触发的,以及由于workloadentries中没有相关更改而可以忽略该事件
redundantEventForPod := false
var addressToDelete string
s.mutex.Lock()
// 这是来自一个pod。将其存储在单独的映射中,以便refreshIndexes函数可以使用这些索引以及存储索引。
switch event {
case model.EventDelete:
redundantEventForPod = s.workloadInstances.Delete(wi) == nil
default: // add or update
if old := s.workloadInstances.Insert(wi); old != nil {
if old.Endpoint.Address != wi.Endpoint.Address {
addressToDelete = old.Endpoint.Address
}
// 如果多个k8s服务选择同一个pod,或者一个服务有多个端口,我们可能会得到多个事件忽略它们,因为我们只关心Endpoint IP本身。
if model.WorkloadInstancesEqual(old, wi) {
// 忽略更新,因为没有任何更改
redundantEventForPod = true
}
}
}
if redundantEventForPod {
s.mutex.Unlock()
return
}
// 我们将只选择同一命名空间中的条目
cfgs, _ := s.store.List(gvk.ServiceEntry, wi.Namespace)
if len(cfgs) == 0 {
s.mutex.Unlock()
return
}
instances := []*model.ServiceInstance{}
instancesDeleted := []*model.ServiceInstance{}
configsUpdated := map[model.ConfigKey]struct{}{}
fullPush := false
for _, cfg := range cfgs {
se := cfg.Spec.(*networking.ServiceEntry)
if se.WorkloadSelector == nil || !labels.Instance(se.WorkloadSelector.Labels).SubsetOf(wi.Endpoint.Labels) {
//不匹配,跳过这个
continue
}
seNamespacedName := types.NamespacedName{Namespace: cfg.Namespace, Name: cfg.Name}
services := s.services.getServices(seNamespacedName)
instance := convertWorkloadInstanceToServiceInstance(wi, services, se)
instances = append(instances, instance...)
if addressToDelete != "" {
for _, i := range instance {
di := i.DeepCopy()
di.Endpoint.Address = addressToDelete
instancesDeleted = append(instancesDeleted, di)
}
s.serviceInstances.deleteServiceEntryInstances(seNamespacedName, key)
} else if event == model.EventDelete {
s.serviceInstances.deleteServiceEntryInstances(seNamespacedName, key)
} else {
s.serviceInstances.updateServiceEntryInstancesPerConfig(seNamespacedName, key, instance)
}
// 如果serviceentry的解析是DNS,则进行全面推送
if (se.Resolution == networking.ServiceEntry_DNS || se.Resolution == networking.ServiceEntry_DNS_ROUND_ROBIN) &&
se.WorkloadSelector != nil {
fullPush = true
for _, inst := range instance {
configsUpdated[model.ConfigKey{
Kind: kind.ServiceEntry,
Name: string(inst.Service.Hostname),
Namespace: cfg.Namespace,
}] = struct{}{}
}
}
}
if len(instancesDeleted) > 0 {
s.serviceInstances.deleteInstances(key, instancesDeleted)
}
if event == model.EventDelete {
s.serviceInstances.deleteInstances(key, instances)
} else {
s.serviceInstances.updateInstances(key, instances)
}
s.mutex.Unlock()
s.edsUpdate(instances)
// 带有WorkloadEntry的ServiceEntry导致具有硬编码端点的STRICT_DNS集群需要更新CDS以刷新端点
if fullPush {
log.Debugf("Full push triggered during event %s for workload instance (%s/%s) in namespace %s", event,
wi.Kind, wi.Endpoint.Address, wi.Namespace)
pushReq := &model.PushRequest{
Full: true,
ConfigsUpdated: configsUpdated,
Reason: []model.TriggerReason{model.EndpointUpdate},
}
s.XdsUpdater.ConfigUpdate(pushReq)
}
}
该方法主要是针对workloadInstance的增改事件,将其放入serviceEntryController.workloadInstances的索引缓存中,然后获取serviceEntryController.ServiceEntry来计算选择器是否匹配,如果匹配则进行更新。添加到serviceEntryController的缓存中实现的是自定义服务资源访问内部POD资源。
这里我们需要注意pod的workloadInstance添加到了serviceEntryController的workloadInstances缓存中,并没有添加到处理原生资源的kubeController的workload缓存中,原因是默认开启了serviceEntry标签选择器可以选择POD所以才将workloadInstance添加到该缓存中.而kubeController中并没有使用到POD的。
那么kubeController中的workload缓存又有什么用,之前的serviceEntryController的workload缓存作用是自定义服务访问内部Pod资源,那想要内部服务资源(service)访问外部资源又该如何操作,kubeController中的workload缓存就实现这个功能。当我们创建workload资源后,会存储到此缓存中并根据该信息构建pod结构体(并不添加到k8s中),通过POD结构体获取原生的k8sServices然后根据标签选择器进行推送更新。
func (c *Controller) WorkloadInstanceHandler(si *model.WorkloadInstance, event model.Event) {
// 忽略格式错误的工作负载条目。忽略任何没有标签的工作负载条目,因为我们无法选择它们
if si.Namespace == "" || len(si.Endpoint.Labels) == 0 {
return
}
// 这是来自一个工作负载条目。将其存储在单独的索引中,以便InstancesByPort可以使用这些以及k8s pod。
switch event {
case model.EventDelete:
c.workloadInstancesIndex.Delete(si)
default: // add or update
c.workloadInstancesIndex.Insert(si)
}
// 通过标签选择器查找工作负载条目的服务,而不是扫描我们的model.services内部映射,通过k8s api获取服务
dummyPod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Namespace: si.Namespace, Labels: si.Endpoint.Labels},
}
shard := model.ShardKeyFromRegistry(c)
// 找到映射到此工作负载条目的服务,如果服务类型为客户端lb,则启动eds更新
if k8sServices, err := getPodServices(c.serviceLister, dummyPod); err == nil && len(k8sServices) > 0 {
for _, k8sSvc := range k8sServices {
service := c.GetService(kube.ServiceHostname(k8sSvc.Name, k8sSvc.Namespace, c.opts.DomainSuffix))
// 请注意,这不能是外部服务,因为k8s外部服务没有标签选择器。
if service == nil || service.Resolution != model.ClientSideLB {
// 可能是无头服务
continue
}
// 获取更新的端点列表,其中包括此服务的k8s pod和工作负载条目,然后通知EDS服务器此服务的端点已更改。每个服务端口需要一个端点对象
endpoints := make([]*model.IstioEndpoint, 0)
for _, port := range service.Ports {
if port.Protocol == protocol.UDP {
continue
}
// 类似于eds.go中UpdateServiceShards的代码
instances := c.InstancesByPort(service, port.Port, nil)
for _, inst := range instances {
endpoints = append(endpoints, inst.Endpoint)
}
}
// 更新EDS
c.opts.XDSUpdater.EDSUpdate(shard, string(service.Hostname), service.Attributes.Namespace, endpoints)
}
}
}
3.2 Serviceentry(External)
在 Istio 中,ServiceEntry 是一种资源对象,用于定义对外部服务的访问方式和流量管理策略。ServiceEntry 的作用是允许 Istio 控制平面了解和管理集群外部的服务。
ServiceEntry 的作用包括以下几个方面:
- 对外部服务的访问:ServiceEntry 允许您定义与集群外部的服务进行通信的方式。您可以通过 ServiceEntry 将外部服务暴露给 Istio 网格内的其他服务,并为这些服务提供路由规则和流量管理策略。
- 流量管理和策略控制:通过 ServiceEntry,您可以指定对外部服务的流量管理策略,如负载均衡算法、超时设置、连接池等。这使得 Istio 能够对流经网格的流量进行控制和管理,并实现诸如故障转移、熔断、流量限制等功能。
- 名字解析:ServiceEntry 可以指定外部服务的主机名和 IP 地址,以帮助 Istio 控制平面解析这些服务的地址。这对于与无 DNS 记录的外部服务进行通信非常有用,或者当您希望使用固定 IP 地址来访问特定的外部服务时。
- 安全策略:ServiceEntry 可以与 Istio 的安全功能集成,例如通过定义网格边界流量、配置 TLS 加密、应用策略和访问控制规则等来保护对外部服务的访问。
3.2.1 结构
ServiceEntryController的结构如下:
//控制器与ServiceEntry CRD通信,并监控变更。
type Controller struct {
XdsUpdater model.XDSUpdater
//配置缓存
store model.ConfigStore
clusterID cluster.ID
// 这个锁是为了对以下store进行多重操作。例如,在某些情况下,它要求删除所有实例,然后更新新实例。
mutex sync.RWMutex
//存储来自SE、WLE和pod的所有服务实例
serviceInstances serviceInstancesStore
// NOTE: 历史版本,WorkloadEntry和Pod都有一个索引;小心命名冲突
//表示工作负载条目中的工作负载实例的索引。
workloadInstances workloadinstances.Index
//存储从serviceEntries转换的所有服务
services serviceStore
//为了确保eds更新以串行方式运行以防止过时的更新在并发调用edsUpdate时覆盖新的更新。如果所有线程共享一个锁,那么所有线程的性能都会明显下降。
//Controller运行从该队列里取实例
edsQueue queue.Instance
workloadHandlers []func(*model.WorkloadInstance, model.Event)
// 用于根据工作负载ip和标签获取networkID。
networkIDCallback func(IP string, labels labels.Instance) network.ID
// 指示此控制器是否用于工作负载条目。
workloadEntryController bool
model.NetworkGatewaysHandler
}
3.2.2 运行
AggregateController作为所有服务注册中心的聚合服务,在将ServiceEntry服务添加到其中时,会给Serviceregistry注册一些事件处理函数。
func (c *Controller) addRegistry(registry serviceregistry.Instance, stop <-chan struct{}) {
c.registries = append(c.registries, ®istryEntry{Instance: registry, stop: stop})
// Observe the registry for events.
registry.AppendNetworkGatewayHandler(c.NotifyGatewayHandlers)
registry.AppendServiceHandler(c.handlers.NotifyServiceHandlers)
registry.AppendServiceHandler(func(service *model.Service, event model.Event) {
for _, handlers := range c.getClusterHandlers() {
handlers.NotifyServiceHandlers(service, event)
}
})
}
从edsQueue的队列中取出任务执行,queue中存储的是匿名函数Task,要想知道Task到底是什么,我们要从来源分析
总共有三个方法调用到了ServiceEntryController的queueEdsEvent方法,分别是edsCacheUpdate,edsUpdate,serviceEntryHandler,
而edsCacheUpdate又被workloadEntryHandler调用,edsUpdate被workloadEntryHandler、serviceEntryHandler、WorkloadInstanceHandler这三个方法调用。
经过上述调用链分析,可以知道,ServiceEntryController真正运行的是它注册的各种Handler事件处理器。我们真正关注的是workloadEntryHandler、serviceEntryHandler、workloadInstanceHandler这三个Handler。
serviceEntryHandler
func (s *Controller) serviceEntryHandler(_, curr config.Config, event model.Event) {
log.Debugf("Handle event %s for service entry %s/%s", event, curr.Namespace, curr.Name)
currentServiceEntry := curr.Spec.(*networking.ServiceEntry)
//convertServices将ServiceEntry配置转换为内部服务(Istio service)对象的列表。
cs := convertServices(curr)
configsUpdated := map[model.ConfigKey]struct{}{}
key := types.NamespacedName{Namespace: curr.Namespace, Name: curr.Name}
s.mutex.Lock()
// 如果是add/delete事件,我们应该总是做一个完整的推送。如果是更新事件,我们应该只在服务发生变化时进行完全推送,否则,只推送endpoint更新。
var addedSvcs, deletedSvcs, updatedSvcs, unchangedSvcs []*model.Service
switch event {
case model.EventUpdate:
//比对增、删、改、未变化的service
addedSvcs, deletedSvcs, updatedSvcs, unchangedSvcs = servicesDiff(s.services.getServices(key), cs)
s.services.updateServices(key, cs)
case model.EventDelete:
deletedSvcs = cs
s.services.deleteServices(key)
case model.EventAdd:
addedSvcs = cs
s.services.updateServices(key, cs)
default:
// this should not happen
unchangedSvcs = cs
}
//根据Service Entry的WorkloadSelector,从workloadinstances的索引找到workloadInstances,把其中的WorkloadInstance转换为ServiceInstance放入到serviceInstances中,然后区分类型serviceEntryConfigType、workloadEntryConfigType、podConfigType放入Map中
serviceInstancesByConfig, serviceInstances := s.buildServiceInstances(curr, cs)
oldInstances := s.serviceInstances.getServiceEntryInstances(key)
for configKey, old := range oldInstances {
s.serviceInstances.deleteInstances(configKey, old)
}
if event == model.EventDelete {
s.serviceInstances.deleteAllServiceEntryInstances(key)
} else {
// 使用新实例更新索引。
for ckey, value := range serviceInstancesByConfig {
s.serviceInstances.addInstances(ckey, value)
}
s.serviceInstances.updateServiceEntryInstances(key, serviceInstancesByConfig)
}
shard := model.ShardKeyFromRegistry(s)
for _, svc := range addedSvcs {
//服务信息更改时来自服务发现的回调。
s.XdsUpdater.SvcUpdate(shard, string(svc.Hostname), svc.Attributes.Namespace, model.EventAdd)
configsUpdated[makeConfigKey(svc)] = struct{}{}
}
for _, svc := range updatedSvcs {
s.XdsUpdater.SvcUpdate(shard, string(svc.Hostname), svc.Attributes.Namespace, model.EventUpdate)
configsUpdated[makeConfigKey(svc)] = struct{}{}
}
//如果删除了服务条目,请调用SvcUpdate来清理服务的endpoints碎片。
for _, svc := range deletedSvcs {
instanceKey := instancesKey{namespace: svc.Attributes.Namespace, hostname: svc.Hostname}
// 同一host的多个service entries可以驻留在同一命名空间中。只有在没有服务实例的情况下才删除endpoints碎片。
if len(s.serviceInstances.getByKey(instanceKey)) == 0 {
s.XdsUpdater.SvcUpdate(shard, string(svc.Hostname), svc.Attributes.Namespace, model.EventDelete)
}
configsUpdated[makeConfigKey(svc)] = struct{}{}
}
// 如果一个服务已更新,并且不是updatedSvcs的一部分,则意味着它的端点可能已更改。如果此服务条目具有带有IP的端点(即resolution为STATIC),则我们会进行EDS更新。如果服务条目有带有fqdn的端点(即解析DNS),那么我们需要进行完全推送(因为fqdn端点通过cd中的strict_DNS集群)。
if len(unchangedSvcs) > 0 {
if currentServiceEntry.Resolution == networking.ServiceEntry_DNS || currentServiceEntry.Resolution == networking.ServiceEntry_DNS_ROUND_ROBIN {
for _, svc := range unchangedSvcs {
configsUpdated[makeConfigKey(svc)] = struct{}{}
}
}
}
s.mutex.Unlock()
fullPush := len(configsUpdated) > 0
// 如果不需要完全推送,则至少有一个服务保持不变
if !fullPush {
s.edsUpdate(serviceInstances)
return
}
// 当进行完全推送时,非DNS添加的、更新的、未更改的服务会触发eds更新,从而更新endpoints碎片。
allServices := make([]*model.Service, 0, len(addedSvcs)+len(updatedSvcs)+len(unchangedSvcs))
nonDNSServices := make([]*model.Service, 0, len(addedSvcs)+len(updatedSvcs)+len(unchangedSvcs))
allServices = append(allServices, addedSvcs...)
allServices = append(allServices, updatedSvcs...)
allServices = append(allServices, unchangedSvcs...)
for _, svc := range allServices {
if !(svc.Resolution == model.DNSLB || svc.Resolution == model.DNSRoundRobinLB) {
nonDNSServices = append(nonDNSServices, svc)
}
}
// 非dns服务实例
keys := map[instancesKey]struct{}{}
for _, svc := range nonDNSServices {
keys[instancesKey{hostname: svc.Hostname, namespace: curr.Namespace}] = struct{}{}
}
//ServiceEntryController的queue中放入Task
s.queueEdsEvent(keys, s.doEdsCacheUpdate)
pushReq := &model.PushRequest{
Full: true,
ConfigsUpdated: configsUpdated,
Reason: []model.TriggerReason{model.ServiceUpdate},
}
s.XdsUpdater.ConfigUpdate(pushReq)
}
serviceEntryHandler方法主要是将ServiceEntry的外部服务配置转换成Istio网格内部的Service,然后根据Service的变化,分类出增、删、改、未变化的几种类型,针对不同类型更新serviceInstances缓存,调用Discovery服务推送Service变化,变更Service对应的endpoints,调用Discovery服务推送Eds变更。
ServiceEntry的pb结构如下:
type ServiceEntry struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// 与ServiceEntry关联的Host。可以是带有通配符前缀的DNS名称。1.Host字段用于在VirtualServices和DestinationRules中选择匹配的Host。2.对于HTTP流量,HTTP HostAuthority标头将与hosts字段相匹配。3.对于包含服务器名称指示(SNI)的HTTPs或TLS流量,SNI值将与hosts字段相匹配。
//
// 注1:当解析设置为类型DNS且未指定endpoints时,host字段将用作将流量路由到的endpoint的DNS名称。
//
// 注2:如果hostname与另一个服务注册表(如Kubernetes)中的服务名称匹配,该服务注册表也提供自己的一组endpoints,则ServiceEntry将被视为现有Kubernete服务的装饰器。如果适用,service entry中的属性将被添加到Kubernetes服务中。目前,“istiod”只会考虑以下附加属性:
// 1. subjectAltNames:除了验证与服务pod关联的服务帐户的SAN之外,还将验证此处指定的SAN。
Hosts []string `protobuf:"bytes,1,rep,name=hosts,proto3" json:"hosts,omitempty"`
// 与服务关联的虚拟IP地址。可能是CIDR前缀。对于HTTP流量,生成的路由配置将包括“地址”和“主机”字段值的HTTP路由域,目的地将根据HTTP HostAuthority标头进行识别。如果指定了一个或多个IP地址,则如果目标IP与地址字段中指定的IPCIDR匹配,则传入流量将被标识为属于此服务。如果Addresses(地址)字段为空,则将仅根据目标端口识别流量。在这种情况下,访问服务的端口不得与网格中的任何其他服务共享。换句话说,sidecar将充当一个简单的TCP代理,将指定端口上的传入流量转发到指定的目标端点IPhost。此字段不支持Unix域套接字地址。
Addresses []string `protobuf:"bytes,2,rep,name=addresses,proto3" json:"addresses,omitempty"`
// 与外部服务关联的端口。如果端点是Unix域套接字地址,则必须只有一个端口。
Ports []*Port `protobuf:"bytes,3,rep,name=ports,proto3" json:"ports,omitempty"`
// 指定是否应将服务视为网格外部或网格的一部分。
Location ServiceEntry_Location `protobuf:"varint,4,opt,name=location,proto3,enum=istio.networking.v1alpha3.ServiceEntry_Location" json:"location,omitempty"`
// host的服务发现模式。对于没有附带IP地址的TCP端口,将解析模式设置为NONE时必须小心。在这种情况下,将允许到所述端口上的任何IP的流量(即“0.0.0.0:<port>'”)。
Resolution ServiceEntry_Resolution `protobuf:"varint,5,opt,name=resolution,proto3,enum=istio.networking.v1alpha3.ServiceEntry_Resolution" json:"resolution,omitempty"`
// 与服务关联的一个或多个endpoints。只能指定“endpoints”或“workloadSelector”中的一个。
Endpoints []*WorkloadEntry `protobuf:"bytes,6,rep,name=endpoints,proto3" json:"endpoints,omitempty"`
// 仅适用于MESH_INTERNAL服务。只能指定“endpoints”或“workloadSelector”中的一个。根据标签选择一个或多个Kubernetes pod或VM工作负载(使用“WorkloadEntry”指定)。代表虚拟机的“WorkloadEntry”对象应与ServiceEntry在同一命名空间中定义。
WorkloadSelector *WorkloadSelector `protobuf:"bytes,9,opt,name=workload_selector,json=workloadSelector,proto3" json:"workload_selector,omitempty"`
// 此服务导出到的命名空间列表。导出服务允许sidecar、网关和其他命名空间中定义的虚拟服务使用它。此功能为服务所有者和网格管理员提供了一种机制,用于控制跨命名空间边界的服务可见性。如果未指定名称空间,则默认情况下会将服务导出到所有名称空间。值“.”是保留的,并定义了对在其中声明服务的同一命名空间的导出。同样,值“”也是保留的,它定义了对所有命名空间的输出。对于Kubernetes服务,可以通过将注释“networking.istio.ioexportTo”设置为以逗号分隔的命名空间名称列表来实现等效效果。
ExportTo []string `protobuf:"bytes,7,rep,name=export_to,json=exportTo,proto3" json:"export_to,omitempty"`
// 如果指定,代理将验证服务器证书的使用者备用名称是否与指定值之一匹配。注意:当将workloadEntry与workloadSelectors一起使用时,workloadEntry中指定的服务帐户也将用于派生应验证的其他主题备用名称。
SubjectAltNames []string `protobuf:"bytes,8,rep,name=subject_alt_names,json=subjectAltNames,proto3" json:"subject_alt_names,omitempty"`
}
3.3 AggregateController
结构如下:
//聚合不同注册表中的数据并监视更改
type Controller struct {
meshHolder mesh.Holder
// 锁用于保护注册表和控制器的运行状态。
storeLock sync.RWMutex
//服务注册实例列表
registries []*registryEntry
// 指示控制器是否已运行。如果为true,则以后添加的所有注册表都应该手动运行。
running bool
//serviceHandlers、workloadHandlers列表
handlers model.ControllerHandlers
handlersByCluster map[cluster.ID]*model.ControllerHandlers
model.NetworkGatewaysHandler
}
实例化代码如下:
func (s *Server) initServiceControllers(args *PilotArgs) error {
serviceControllers := s.ServiceController()
s.serviceEntryController = serviceentry.NewController(
s.configController, s.XDSServer,
serviceentry.WithClusterID(s.clusterID),
)
//添加外部的Service Entry的外部注册服务
serviceControllers.AddRegistry(s.serviceEntryController)
registered := make(map[provider.ID]bool)
for _, r := range args.RegistryOptions.Registries {
serviceRegistry := provider.ID(r)
if _, exists := registered[serviceRegistry]; exists {
log.Warnf("%s registry specified multiple times.", r)
continue
}
registered[serviceRegistry] = true
log.Infof("Adding %s registry adapter", serviceRegistry)
switch serviceRegistry {
case provider.Kubernetes:
//初始化K8s的服务注册中心、将k8s的资源转换为istio格式的资源文件
if err := s.initKubeRegistry(args); err != nil {
return err
}
default:
return fmt.Errorf("service registry %s is not supported", r)
}
}
s.addStartFunc(func(stop <-chan struct{}) error {
go serviceControllers.Run(stop)
return nil
})
return nil
}