API Server源码分析之入口点解析
作者:阳明 2023-03-17 07:53:20
云计算
云原生 从本文开始,我们将对 K8s API Server 的代码进行详细分析,并探讨其应用入口点、框架以及与 etcd 的通信。
Kubernetes(K8s)集群中最关键的组件之一是 API Server,它是所有集群管理活动的入口点。从本文开始,我们将对 K8s API Server 的代码进行详细分析,并探讨其应用入口点、框架以及与 etcd 的通信。
应用入口点
K8s API Server 的主要入口点位于 cmd/kube-apiserver/apiserver.go 文件的。
// cmd/kube-apiserver/apiserver.go
// apiserver is the main api server and master for the cluster.
// it is responsible for serving the cluster management API.
packagemain
import (
"os"
"k8s.io/component-base/cli"
_"k8s.io/component-base/logs/json/register"// 用于JSON日志格式注册
_"k8s.io/component-base/metrics/prometheus/clientgo"// 加载所有的 prometheus client-go 插件
_"k8s.io/component-base/metrics/prometheus/version"// 用于版本指标注册
"k8s.io/kubernetes/cmd/kube-apiserver/app"
)
funcmain() {
command :=app.NewAPIServerCommand()
code :=cli.Run(command)
os.Exit(code)
}
其中的 app.NewAPIServerCommand() 是构建的一个 cobra 的命令对象,cli.Run 然后执行该命令即可,所以我们直接查看 NewAPIServerCommand 函数是如果构造 cobra.Command 对象的:
// cmd/kube-apiserver/app/server.go
// NewAPIServerCommand 使用默认参数创建一个 *cobra.Command 对象
funcNewAPIServerCommand() *cobra.Command {
// NewServerRunOptions 使用默认参数创建一个新的 ServerRunOptions 对象。
// ServerRunOption 对象是运行 apiserver 需要的对象
s :=options.NewServerRunOptions()
cmd :=&cobra.Command{
Use: "kube-apiserver",
Long: `TheKubernetesAPIservervalidatesandconfiguresdata
fortheapiobjectswhichincludepods, services, replicationcontrollers, and
others. TheAPIServerservicesRESToperationsandprovidesthefrontendtothe
cluster's shared state through which all other components interact.`,
// ......
RunE: func(cmd*cobra.Command, args []string) error {
verflag.PrintAndExitIfRequested()
fs :=cmd.Flags()
iferr :=s.Logs.ValidateAndApply(); err!=nil {
returnerr
}
cliflag.PrintFlags(fs)
err :=checkNonZeroInsecurePort(fs)
iferr!=nil {
returnerr
}
// 设置默认选项
completedOptions, err :=Complete(s)
iferr!=nil {
returnerr
}
// 校验选项
iferrs :=completedOptions.Validate(); len(errs) !=0 {
returnutilerrors.NewAggregate(errs)
}
returnRun(completedOptions, genericapiserver.SetupSignalHandler())
},
}
// ......
returncmd
}
该函数最核心的功能就是使用 Complete(s) 函数来生成 apiserver 启动需要的默认参数,然后将默认参数传递给 Run 函数进行启动。
// cmd/kube-apiserver/app/server.go
// Run 运行指定的 APIServer,不能退出.
funcRun(completeOptionscompletedServerRunOptions, stopCh<-chanstruct{}) error {
// 创建服务链(包含的3个server组件)
server, err :=CreateServerChain(completeOptions, stopCh)
// 服务启动前的准备工作,包括健康检查、存活检查、OpenAPI路由注册等
prepared, err :=server.PrepareRun()
// 正式启动运行
returnprepared.Run(stopCh)
}
在 Run 函数中首先会通过 CreateServerChain 函数通过委托创建连接的 APIServer 对象。
// cmd/kube-apiserver/app/server.go
// CreateServerChain 通过委托创建连接的APIServer
funcCreateServerChain(completedOptionscompletedServerRunOptions, stopCh<-chanstruct{}) (*aggregatorapiserver.APIAggregator, error) {
// CreateKubeAPIServerConfig 创建用于运行 APIServer 的所有配置资源,但不运行任何资源
kubeAPIServerConfig, serviceResolver, pluginInitializer, err :=CreateKubeAPIServerConfig(completedOptions)
// // 创建 APIExtensionsServer 配置
apiExtensionsConfig, err :=createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(kubeAPIServerConfig.ExtraConfig.ProxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig, kubeAPIServerConfig.GenericConfig.TracerProvider))
// 创建APIExtensionsServer并注册路由
apiExtensionsServer, err :=createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))
// 创建KubeAPIServer并注册路由
kubeAPIServer, err :=CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)
// // 创建 aggregatorServer 配置
aggregatorConfig, err :=createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, kubeAPIServerConfig.ExtraConfig.ProxyTransport, pluginInitializer)
// 创建aggregatorServer并注册路由
aggregatorServer, err :=createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
returnaggregatorServer, nil
}
上面的函数中可以看到 CreateServerChain 会创建3个 server:APIExtensionServer、KubeAPIServer、AggregratorServer,APIServer 就是依靠这3个组件来对不同类型的请求进行处理的:
- APIExtensionServer: 主要负责处理 CustomResourceDefinition(CRD)方面的请求。
- KubeAPIServer: 主要负责处理 K8s 内置资源的请求,此外还会包括通用处理、认证、鉴权等。
- AggregratorServer: 主要负责聚合器方面的处理,它充当一个代理服务器,将请求转发到聚合进来的 K8s service 中。
创建每个 server 都有对应的 config,可以看出上面函数中的 apiExtensionServer 和 aggregatorServer 的 Config 需要依赖 kubeAPIServerConfig,而这几个 ServerConfig 都需要依赖 GenericConfig,CreateKubeAPIServerConfig 函数创建 kubeAPIServerConfig ,在该函数中通过调用 buildGenericConfig 来创建 GenericConfig 对象,如下代码所示。
// cmd/kube-apiserver/app/server.go
// CreateKubeAPIServerConfig 创建用于运行 APIServer 的所有配置资源
funcCreateKubeAPIServerConfig(scompletedServerRunOptions) (
*controlplane.Config,
aggregatorapiserver.ServiceResolver,
[]admission.PluginInitializer,
error,
) {
proxyTransport :=CreateProxyTransport()
// 构建通用配置
genericConfig, versionedInformers, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, err :=buildGenericConfig(s.ServerRunOptions, proxyTransport)
// ......
config :=&controlplane.Config{
GenericConfig: genericConfig,
ExtraConfig: controlplane.ExtraConfig{
APIResourceConfigSource: storageFactory.APIResourceConfigSource,
StorageFactory: storageFactory,
EventTTL: s.EventTTL,
KubeletClientConfig: s.KubeletConfig,
EnableLogsSupport: s.EnableLogsHandler,
ProxyTransport: proxyTransport,
ServiceIPRange: s.PrimaryServiceClusterIPRange,
APIServerServiceIP: s.APIServerServiceIP,
SecondaryServiceIPRange: s.SecondaryServiceClusterIPRange,
APIServerServicePort: 443,
ServiceNodePortRange: s.ServiceNodePortRange,
KubernetesServiceNodePort: s.KubernetesServiceNodePort,
EndpointReconcilerType: reconcilers.Type(s.EndpointReconcilerType),
MasterCount: s.MasterCount,
ServiceAccountIssuer: s.ServiceAccountIssuer,
ServiceAccountMaxExpiration: s.ServiceAccountTokenMaxExpiration,
ExtendExpiration: s.Authentication.ServiceAccounts.ExtendExpiration,
VersionedInformers: versionedInformers,
IdentityLeaseDurationSeconds: s.IdentityLeaseDurationSeconds,
IdentityLeaseRenewIntervalSeconds: s.IdentityLeaseRenewIntervalSeconds,
},
}
// ......
returnconfig, serviceResolver, pluginInitializers, nil
}
funcbuildGenericConfig(
s*options.ServerRunOptions,
proxyTransport*http.Transport,
)(...){
//创建一个通用配置对象
genericConfig=genericapiserver.NewConfig(legacyscheme.Codecs)
// ......
//创建认证实例
iflastErr=s.Authentication.ApplyTo(&genericConfig.Authentication, genericConfig.SecureServing, genericConfig.EgressSelector, genericConfig.OpenAPIConfig, clientgoExternalClient, versionedInformers); lastErr!=nil {
return
}
// ...
// openapi/swagger配置,OpenAPIConfig 用于生成 OpenAPI 规范
getOpenAPIDefinitions :=openapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(generatedopenapi.GetOpenAPIDefinitions)
genericConfig.OpenAPIConfig=genericapiserver.DefaultOpenAPIConfig(getOpenAPIDefinitions, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensionsapiserver.Scheme, aggregatorscheme.Scheme))
genericConfig.OpenAPIConfig.Info.Title="Kubernetes"
genericConfig.LongRunningFunc=filters.BasicLongRunningRequestCheck(
sets.NewString("watch", "proxy"),
sets.NewString("attach", "exec", "proxy", "log", "portforward"),
)
// storageFactoryConfig 对象定义了 kube-apiserver 与 etcd 的交互方式,如:etcd认证、地址、存储前缀等
// 该对象也定义了资源存储方式,如:资源信息、资源编码信息、资源状态等
storageFactoryConfig :=kubeapiserver.NewStorageFactoryConfig()
storageFactoryConfig.APIResourceConfig=genericConfig.MergedResourceConfig
completedStorageFactoryConfig, err :=storageFactoryConfig.Complete(s.Etcd)
storageFactory, lastErr=completedStorageFactoryConfig.New()
iflastErr=s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr!=nil {
return
}
// ......
// 初始化 SharedInformerFactory
kubeClientConfig :=genericConfig.LoopbackClientConfig
clientgoExternalClient, err :=clientgoclientset.NewForConfig(kubeClientConfig)
versionedInformers=clientgoinformers.NewSharedInformerFactory(clientgoExternalClient, 10*time.Minute)
// 认证配置,内部调用 authenticatorConfig.New()
// K8s提供了9种认证机制,每种认证机制被实例化后都成为认证器
iflastErr=s.Authentication.ApplyTo(&genericConfig.Authentication, genericConfig.SecureServing, genericConfig.EgressSelector, genericConfig.OpenAPIConfig, clientgoExternalClient, versionedInformers); lastErr!=nil {
return
}
// 创建鉴权实例,K8s也提供了6种授权机制,每种授权机制被实例化后都成为授权器
genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err=BuildAuthorizer(s, genericConfig.EgressSelector, versionedInformers)
// ...
// 审计
lastErr=s.Audit.ApplyTo(genericConfig)
// 准入控制器
// k8s资源在认证和授权通过,被持久化到etcd之前进入准入控制逻辑
// 准入控制包括:对请求的资源进行自定义操作(校验、修改、拒绝)
// 准入控制器通过 Plugins 数据结构统一注册、存放、管理
admissionConfig :=&kubeapiserveradmission.Config{
ExternalInformers: versionedInformers,
LoopbackClientConfig: genericConfig.LoopbackClientConfig,
CloudConfigFile: s.CloudProvider.CloudConfigFile,
}
serviceResolver=buildServiceResolver(s.EnableAggregatorRouting, genericConfig.LoopbackClientConfig.Host, versionedInformers)
pluginInitializers, admissionPostStartHook, err=admissionConfig.New(proxyTransport, genericConfig.EgressSelector, serviceResolver, genericConfig.TracerProvider)
err=s.Admission.ApplyTo(
genericConfig,
versionedInformers,
kubeClientConfig,
feature.DefaultFeatureGate,
pluginInitializers...)
// ...
}
然后我们再来分别看看这3个 Server 是如何构建的。
go-restful框架
这里我们就不得不先了解下 go-restful 这个框架了,因为 APIServer 就使用的这个框架。下面的代码是 go-restful 官方的一个示例,这个 demo 了解后基本上就知道 go-restful 框架是如何使用的了:
packagemain
import (
"log"
"net/http"
restfulspec"github.com/emicklei/go-restful-openapi/v2"
restful"github.com/emicklei/go-restful/v3"
"github.com/go-openapi/spec"
)
// UserResource is the REST layer to the User domain
typeUserResourcestruct {
// normally one would use DAO (data access object)
usersmap[string]User
}
// WebService creates a new service that can handle REST requests for User resources.
func (uUserResource) WebService() *restful.WebService {
ws :=new(restful.WebService)
ws.
Path("/users").
Consumes(restful.MIME_XML, restful.MIME_JSON).
Produces(restful.MIME_JSON, restful.MIME_XML) // you can specify this per route as well
tags := []string{"users"}
ws.Route(ws.GET("/").To(u.findAllUsers).
// docs
Doc("get all users").
Metadata(restfulspec.KeyOpenAPITags, tags).
Writes([]User{}).
Returns(200, "OK", []User{}))
ws.Route(ws.GET("/{user-id}").To(u.findUser).
// docs
Doc("get a user").
Param(ws.PathParameter("user-id", "identifier of the user").DataType("integer").DefaultValue("1")).
Metadata(restfulspec.KeyOpenAPITags, tags).
Writes(User{}). // on the response
Returns(200, "OK", User{}).
Returns(404, "Not Found", nil))
ws.Route(ws.PUT("/{user-id}").To(u.updateUser).
// docs
Doc("update a user").
Param(ws.PathParameter("user-id", "identifier of the user").DataType("string")).
Metadata(restfulspec.KeyOpenAPITags, tags).
Reads(User{})) // from the request
ws.Route(ws.PUT("").To(u.createUser).
// docs
Doc("create a user").
Metadata(restfulspec.KeyOpenAPITags, tags).
Reads(User{})) // from the request
ws.Route(ws.DELETE("/{user-id}").To(u.removeUser).
// docs
Doc("delete a user").
Metadata(restfulspec.KeyOpenAPITags, tags).
Param(ws.PathParameter("user-id", "identifier of the user").DataType("string")))
returnws
}
// GET http://localhost:8080/users
//
func (uUserResource) findAllUsers(request*restful.Request, response*restful.Response) {
list := []User{}
for_, each :=rangeu.users {
list=append(list, each)
}
response.WriteEntity(list)
}
// GET http://localhost:8080/users/1
//
func (uUserResource) findUser(request*restful.Request, response*restful.Response) {
id :=request.PathParameter("user-id")
usr :=u.users[id]
iflen(usr.ID) ==0 {
response.WriteErrorString(http.StatusNotFound, "User could not be found.")
} else {
response.WriteEntity(usr)
}
}
// PUT http://localhost:8080/users/1
// <User><Id>1</Id><Name>Melissa Raspberry</Name></User>
//
func (u*UserResource) updateUser(request*restful.Request, response*restful.Response) {
usr :=new(User)
err :=request.ReadEntity(&usr)
iferr==nil {
u.users[usr.ID] =*usr
response.WriteEntity(usr)
} else {
response.WriteError(http.StatusInternalServerError, err)
}
}
// PUT http://localhost:8080/users/1
// <User><Id>1</Id><Name>Melissa</Name></User>
//
func (u*UserResource) createUser(request*restful.Request, response*restful.Response) {
usr :=User{ID: request.PathParameter("user-id")}
err :=request.ReadEntity(&usr)
iferr==nil {
u.users[usr.ID] =usr
response.WriteHeaderAndEntity(http.StatusCreated, usr)
} else {
response.WriteError(http.StatusInternalServerError, err)
}
}
// DELETE http://localhost:8080/users/1
//
func (u*UserResource) removeUser(request*restful.Request, response*restful.Response) {
id :=request.PathParameter("user-id")
delete(u.users, id)
}
funcmain() {
u :=UserResource{map[string]User{}}
restful.DefaultContainer.Add(u.WebService())
config :=restfulspec.Config{
WebServices: restful.RegisteredWebServices(), // you control what services are visible
APIPath: "/apidocs.json",
PostBuildSwaggerObjectHandler: enrichSwaggerObject}
restful.DefaultContainer.Add(restfulspec.NewOpenAPIService(config))
// Optionally, you can install the Swagger Service which provides a nice Web UI on your REST API
// You need to download the Swagger HTML5 assets and change the FilePath location in the config below.
// Open http://localhost:8080/apidocs/?url=http://localhost:8080/apidocs.json
http.Handle("/apidocs/", http.StripPrefix("/apidocs/", http.FileServer(http.Dir("/Users/emicklei/Projects/swagger-ui/dist"))))
log.Printf("start listening on localhost:8080")
log.Fatal(http.ListenAndServe(":8080", nil))
}
funcenrichSwaggerObject(swo*spec.Swagger) {
swo.Info=&spec.Info{
InfoProps: spec.InfoProps{
Title: "UserService",
Description: "Resource for managing Users",
Contact: &spec.ContactInfo{
ContactInfoProps: spec.ContactInfoProps{
Name: "john",
Email: "[email protected]",
URL: "http://johndoe.org",
},
},
License: &spec.License{
LicenseProps: spec.LicenseProps{
Name: "MIT",
URL: "http://mit.org",
},
},
Version: "1.0.0",
},
}
swo.Tags= []spec.Tag{spec.Tag{TagProps: spec.TagProps{
Name: "users",
Description: "Managing users"}}}
}
// User is just a sample type
typeUserstruct {
IDstring`json:"id"description:"identifier of the user"`
Namestring`json:"name"description:"name of the user"default:"john"`
Ageint`json:"age"description:"age of the user"default:"21"`
}
这个示例代码,就是使用 go-restful 的核心功能实现了一个简单的 RESTful 的 API,实现了对 User 的增删查改,其中有这么几个核心概念:Container、WebService、Route。
- Container:服务器容器,包含多个 WebService 和一个 http.ServerMux。
- WebService:服务,由多个 Route 组成,一个 WebService 其实代表某一个对象相关的服务,如上例中的 /users,针对该 /users 要实现RESTful API,那么需要向其添加增删查改的路由,即 Route,它是 Route 的集合。
- Route:路由,包含了 url,http 方法,接收和响应的媒体类型以及处理函数。每一个 Route,根据 Method 和 Path,映射到对应的方法中,即是 Method/Path 到 Function 映射关系的抽象,如上例中的 ws.Route(ws.GET(“/{user-id}”).To(u.findUser)),就是针对 /users/{user-id}该路径的GET请求,则被路由到 findUser 方法中进行处理。
- Container 是 WebService 的集合,可以向 Container 中添加多个 WebService,而 Container 因为实现了 ServeHTTP() 方法,其本质上还是一个http Handler,可以直接用在 http Server 中。
Kubernetes 中对 go-restful 的使用比较基础,就使用到了其最基础的路由功能,由于 K8s 有很多内置的资源对象,也包括 CRD 这种自定义资源对象,所以一开始并不是直接将这些资源对应对应的接口硬编码的,而是通过一系列代码动态注册的,所以接下来我们分析的其实就是想办法让 APIServer 能够提供如下所示的路由处理出来:
GET/apis/apps/v1/namespaces/{namespace}/deployments/{name}
POST/apis/apps/v1/namespaces/{namespace}/deployments
GET/apis/apps/v1/namespaces/{namespace}/daemonsets/{name}
POST/apis/apps/v1/namespaces/{namespace}/daemonsets
对 go-restful 有一个基础了解后,后面就可以去了解下这3个 Server 具体是如何实例化的了。
文章来源网络,作者:运维,如若转载,请注明出处:https://shuyeidc.com/wp/272158.html<

