kubernetesexec源码简析
1 概述:
1.1 环境
版本信息如下:
a、操作系统:centos 7.6
c、kubernetes版本:v1.15.0
1.2 exec原理概述
为进⼊⽬标pod的⽬标容器中执⾏命令(挂载标准输⼊和输出、标准错误的情景),kubectl exec访问kube-apiserver的connect接⼝(中间过程是通过http协议来握⼿,之后升级为spdy协议),kube-apiserver把请求转发⾄对应节点的kubelet进程,⽽kubelet进程此时是⼀个反向代理,再把请求转发⾄cri shim程序(kubelet进程中实现了docker shim),cri shim程序再调⽤容器运⾏时的exec接⼝。当cri是dockert,kubelet进程和docker shim可通过localhost⽹卡来通信。
补充说明:
1)v1.18版本开始,必须是通过kubelet代理客户端的streaming请求。
2 源码简析:
2.1 kube-apiserver侧
kube-apiserver是⽆状态的web服务,exec接⼝的注册:
// 注册http handler,重点看restfulConnectResource(...)。
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, error) {
/*
其他代码
*/
switch action.Verb {
case "CONNECT":
for _, method := range connecter.ConnectMethods() {
connectProducedObject := storageMeta.ProducesObject(method)
if connectProducedObject == nil {
connectProducedObject = "string"
}
doc := "connect " + method + " requests to " + kind
if isSubresource {
doc = "connect " + method + " requests to " + subresource + " of " + kind
}
// http handler,主要是restfulConnectResource(...)
handler := metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, request
Scope, metrics.APIServerComponent, restfulConne ctResource(connecter, reqScope, admit, path, isSubresource))
// 创建http路由
route := ws.Method(method).Path(action.Path).
To(handler).
Doc(doc).
Operation("connect" + strings.Title(strings.ToLower(method)) + namespaced + kind + strings.Title(subresource) + operationSuffix).
Produces("*/*").
Consumes("*/*").
Writes(connectProducedObject)
// 新增的http路由放⼊切⽚
routes = append(routes, route)
}
}
for _, route := range routes {
route.Metadata(ROUTE_META_GVK, metav1.GroupVersionKind{
Group:  reqScope.Kind.Group,
Version: reqScope.Kind.Version,
Kind:    reqScope.Kind.Kind,
})
route.Metadata(ROUTE_META_ACTION, strings.ToLower(action.Verb)
// 把http路由注册到ws对象
ws.Route(route)
}
/*
其他代码
*/
}
func restfulConnectResource(connecter rest.Connecter, scope handlers.RequestScope, admit admission.Interface, restPath string, isSubresource bool) re stful.RouteFunction {
return func(req *restful.Request, res *restful.Response) {
handlers.ConnectResource(connecter, &scope, admit, restPath, isSubresource)(res.ResponseWriter, req.Request)
}
}
func ConnectResource(connecter rest.Connecter, scope *RequestScope, admit admission.Interface, restPath string, isSubresource bool) http.HandlerFunc  {
return func(w http.ResponseWriter, req *http.Request) {
/*
其他代码
*/
requestInfo, _ := request.RequestInfoFrom(ctx)
metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() {
// connecter对象的类型是ExecREST
// 调⽤ExecREST结构体的Connect(...)⽅法来获得⼀个http handler
handler, err := connecter.Connect(ctx, name, opts, &responder{scope: scope, req: req, w: w})
if err != nil {
<(err, w, req)
return
}
// 处理kubectl exec的请求
handler.ServeHTTP(w, req)
})
}
}
// 为pod exec这个情景,返回⼀个 http handler
// 核⼼逻辑是到正确的后端服务(⽬标kubelet的ip地址、端⼝、uri等),进⾏反向代理。
func (r *ExecREST) Connect(ctx context.Context, name string, opts runtime.Object, responder rest.Responder) (http.Handler, error) {
execOpts, ok := opts.(*api.PodExecOptions)
if !ok {
return nil, fmt.Errorf("invalid options object: %#v", opts)
}
// location是⽬标kubelet的http url
location, transport, err := pod.ExecLocation(r.Store, r.KubeletConn, ctx, name, execOpts)
if err != nil {
return nil, err
}
// kube-apiserver此时是⼀个反向代理,访问的是⽬标kubelet的接⼝,然后进⾏流拷贝。
return newThrottledUpgradeAwareProxyHandler(location, transport, false, true, true, responder), nil
}
route add命令实例
// 调⽤proxy.NewUpgradeAwareHandler(...)创建了类型为*proxy.UpgradeAwareHandler的对象,并返回
func newThrottledUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired, interceptRedirects bool, r esponder rest.Responder) *proxy.UpgradeAwareHandler {
handler := proxy.NewUpgradeAwareHandler(location, transport, wrapTransport, upgradeRequired, proxy.NewErrorResponder(responder))
handler.InterceptRedirects = interceptRedirects && utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StreamingProxyRedirects)
handler.RequireSameHostRedirects = utilfeature.DefaultFeatureGate.Enabled(genericfeatures.ValidateProxyRedirects)
handler.MaxBytesPerSec = capabilities.Get().PerConnectionBandwidthLimitBytesPerSec
return handler
}
2.2 kubelet侧
kubelet除了启动很多协程做for循环,还会启动web服务,web服务中就包含了/exec接⼝。
// 为web server注册⽤于调试容器的接⼝,例如/exec、/log等。
// /exec接⼝的处理⽅法是getExec(...)
func (s *Server) InstallDebuggingHandlers(criHandler http.Handler) {
klog.Infof("Adding debug handlers to kubelet server.")
ws := new(restful.WebService)
/*
其他代码
*/
ws = new(restful.WebService)
ws.
Path("/exec")
ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
Exec).
Operation("getExec"))
ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
Exec).
Operation("getExec"))
ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}/{containerName}").
Exec).
Operation("getExec"))
ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
Exec).
Operation("getExec"))
/*
其他代码
*/
if criHandler != nil {
}
}
kubelet是⼀个反向代理,代理来⾃kube-apiserver的streaming请求
func (s *Server) getExec(request *restful.Request, response *restful.Response) {
params := getExecRequestParams(request)
streamOpts, err := remotecommandserver.NewOptions(request.Request)
/*
检查性代码
*/
pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
/
*
检查性代码
*/
podFullName := kubecontainer.GetPodFullName(pod)
// url其实所以⼀个127.0.0.1:端⼝/exec/{token},这是docker shim的接⼝
url, err := s.host.GetExec(podFullName, params.podUID, ainerName, d, *streamOpts)
/*
检查性代码
*/
// 让客户端重定向⾄url
directContainerStreaming {
http.Redirect(response.ResponseWriter, request.Request, url.String(), http.StatusFound)
return
}
// 此时kubelet作为⼀个反向代理(可看成nginx或traefik),url是反向代理背后的⼀个具体的服务实例
// v1.18版本开始,代码是⼀定到达此处,必须是通过kubelet代理客户端的streaming请求
proxyStream(response.ResponseWriter, request.Request, url)
}
// 和kube-apiserver⼀样的套路,还是获得UpgradeAwareHandler结构体对象,然后调⽤ServeHTTP(w, r)
func proxyStream(w http.ResponseWriter, r *http.Request, url *url.URL) {
handler := proxy.NewUpgradeAwareHandler(url, nil /*transport*/, false /*wrapTransport*/, true /*upgradeRequired*/, &responder{}) handler.ServeHTTP(w, r)
}
2.3 UpgradeAwareHandler
这是⼀个⼯具包下的⼀个结构体,专门作为反向代理。