NVIDIA/k8s-device-plugin Source Code Analysis


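Disclaimer: This article is reposted from https://my.oschina.net/jxcdwangtao/blog/1797047 to share the information more widely, for study and exchange only. If it infringes any rights, please contact me and I will delete it promptly.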

Author: xidianwangtao@gmail.com

Internal architecture of k8s-device-plugin

The earlier post "How Kubernetes Uses NVIDIA GPUs via Device Plugins" analyzed in depth how NVIDIA/k8s-device-plugin works. For convenience, here is its internal architecture diagram again:

[Figure: internal architecture of NVIDIA/k8s-device-plugin]

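The PreStartContainer and GetDevicePluginOptions interfaces can be ignored in NVIDIA/k8s-device-plugin: they are effectively empty implementations. We focus mainly on the implementations of ListAndWatch and Allocate.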
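To make "empty implementation" concrete, here is a minimal sketch of what such stubs look like. It is not the plugin's exact source: it uses simplified local stand-ins for the v1beta1 message types (`Empty`, `DevicePluginOptions`, `PreStartContainerRequest`, `PreStartContainerResponse`) and a hypothetical `stubPlugin` type so that it compiles without the Kubernetes packages. Returning zero-value options leaves `PreStartRequired` false, so kubelet has no reason to call `PreStartContainer`.

```go
package main

import (
	"context"
	"fmt"
)

// Simplified local stand-ins for the v1beta1 pluginapi message types
// (hypothetical; the real ones are generated from protobuf).
type Empty struct{}

type DevicePluginOptions struct {
	// When true, kubelet calls PreStartContainer before starting a container.
	PreStartRequired bool
}

type PreStartContainerRequest struct {
	DevicesIDs []string
}

type PreStartContainerResponse struct{}

// stubPlugin is a hypothetical plugin type used only for this illustration.
type stubPlugin struct{}

// GetDevicePluginOptions returns zero-value options (PreStartRequired == false).
func (p *stubPlugin) GetDevicePluginOptions(ctx context.Context, e *Empty) (*DevicePluginOptions, error) {
	return &DevicePluginOptions{}, nil
}

// PreStartContainer performs no device-specific work and returns an empty response.
func (p *stubPlugin) PreStartContainer(ctx context.Context, r *PreStartContainerRequest) (*PreStartContainerResponse, error) {
	return &PreStartContainerResponse{}, nil
}

func main() {
	p := &stubPlugin{}
	opts, err := p.GetDevicePluginOptions(context.Background(), &Empty{})
	fmt.Println("PreStartRequired:", opts.PreStartRequired, "err:", err)
}
```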

Startup

Everything starts from the main function. The core code is as follows:

```go
func main() {
	log.Println("Loading NVML")
	if err := nvml.Init(); err != nil {
		// NVML could not be initialized: block forever instead of exiting
		select {}
	}
	...
	log.Println("Fetching devices.")
	if len(getDevices()) == 0 {
		// No GPU found on this node: block forever
		select {}
	}

	log.Println("Starting FS watcher.")
	// Watch the kubelet device-plugin directory to detect kubelet restarts
	watcher, err := newFSWatcher(pluginapi.DevicePluginPath)
	if err != nil {
		os.Exit(1)
	}
	...
	log.Println("Starting OS watcher.")
	sigs := newOSWatcher(syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)

	restart := true
	var devicePlugin *NvidiaDevicePlugin

L:
	for {
		if restart {
			// Tear down the previous instance before starting a new one
			if devicePlugin != nil {
				devicePlugin.Stop()
			}

			devicePlugin = NewNvidiaDevicePlugin()
			if err := devicePlugin.Serve(); err != nil {
				...
			} else {
				restart = false
			}
		}

		select {
		// kubelet.sock was re-created, i.e. kubelet restarted: re-register
		case event := <-watcher.Events:
			if event.Name == pluginapi.KubeletSocket && event.Op&fsnotify.Create == fsnotify.Create {
				restart = true
			}

		case err := <-watcher.Errors:
			log.Printf("inotify: %s", err)

		case s := <-sigs:
			switch s {
			case syscall.SIGHUP:
				// SIGHUP triggers a restart of the plugin
				restart = true
			default:
				// Any other watched signal stops the plugin and leaves the loop
				devicePlugin.Stop()
				break L
			}
		}
	}
}
```

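The code is largely self-explanatory. In short: main initializes NVML and blocks forever (select {}) if that fails or if no GPU is found; it then watches the kubelet device-plugin directory with fsnotify and listens for OS signals. The plugin is (re)started whenever kubelet.sock is re-created (that is, kubelet restarted) or SIGHUP is received, while any other watched signal stops the plugin and exits the loop. The flow is shown in the diagram below: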

[Figure: startup flow of main]
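The restart logic of the main loop can be simulated without NVML, fsnotify or real signals. The sketch below is an illustration under stated assumptions: the `event` type, its `kind` strings and the `supervise` function are hypothetical, all inputs arrive on a single channel, and Serve is assumed to always succeed. It counts how often the plugin is started and stopped for a given sequence of events.

```go
package main

import "fmt"

// kubeletSocket stands in for pluginapi.KubeletSocket (hypothetical value;
// the real constant is a full path inside the device-plugin directory).
const kubeletSocket = "kubelet.sock"

// event is a hypothetical, simplified merge of the inputs main() selects on:
// fsnotify events and OS signals.
type event struct {
	kind string // "fs-create", "sighup" or "sigterm" (illustrative values)
	name string // file name, only meaningful for "fs-create"
}

// supervise mirrors the restart flag logic of main() and returns how many
// times the plugin was started and stopped.
func supervise(events <-chan event) (starts, stops int) {
	restart := true
	running := false
	for {
		if restart {
			if running {
				stops++ // devicePlugin.Stop() on the old instance
			}
			starts++ // NewNvidiaDevicePlugin() + Serve(), assumed to succeed
			running = true
			restart = false
		}

		ev, ok := <-events
		if !ok {
			return // input exhausted: end the simulation
		}
		switch {
		case ev.kind == "fs-create" && ev.name == kubeletSocket:
			restart = true // kubelet restarted and re-created its socket
		case ev.kind == "sighup":
			restart = true
		case ev.kind == "sigterm":
			stops++ // final Stop(), then leave the loop
			return
		}
	}
}

func main() {
	ch := make(chan event, 4)
	ch <- event{kind: "fs-create", name: kubeletSocket}
	ch <- event{kind: "fs-create", name: "other.sock"}
	ch <- event{kind: "sighup"}
	ch <- event{kind: "sigterm"}
	close(ch)

	starts, stops := supervise(ch)
	fmt.Println("starts:", starts, "stops:", stops)
}
```

For this sequence the plugin is started three times (initial start, kubelet restart, SIGHUP) and stopped three times; the creation of an unrelated file in the directory is ignored.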

Serve

During startup, devicePlugin.Serve launches the gRPC server (Start) to serve requests and then registers the plugin with kubelet (Register).

```go
// Serve starts the gRPC server and registers the device plugin with Kubelet
func (m *NvidiaDevicePlugin) Serve() error {
	err := m.Start()
	if err != nil {
		log.Printf("Could not start device plugin: %s", err)
		return err
	}
	log.Println("Starting to serve on", m.socket)

	err = m.Register(pluginapi.KubeletSocket, resourceName)
	if err != nil {
		log.Printf("Could not register device plugin: %s", err)
		// Undo Start: stop the gRPC server and remove the socket
		m.Stop()
		return err
	}
	log.Println("Registered device plugin with Kubelet")

	return nil
}
```
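Serve's error handling is easy to miss: if Register fails, the gRPC server that Start has already launched is stopped again, so no orphaned socket is left behind. The sketch below isolates that control flow; the `steps` struct and its function fields are hypothetical stand-ins for `m.Start`, `m.Register` and `m.Stop`.

```go
package main

import (
	"errors"
	"fmt"
)

// steps holds hypothetical hooks standing in for m.Start, m.Register and m.Stop.
type steps struct {
	start, register, stop func() error
}

// serve reproduces the control flow of Serve: start first, then register,
// and undo the start (Stop) if registration with kubelet fails.
func serve(s steps) error {
	if err := s.start(); err != nil {
		return err
	}
	if err := s.register(); err != nil {
		s.stop()
		return err
	}
	return nil
}

// registerFailureStopsServer runs serve with a failing registration and
// reports whether the stop hook was called.
func registerFailureStopsServer() (stopped bool, err error) {
	err = serve(steps{
		start:    func() error { return nil },
		register: func() error { return errors.New("kubelet not reachable") },
		stop:     func() error { stopped = true; return nil },
	})
	return stopped, err
}

func main() {
	stopped, err := registerFailureStopsServer()
	fmt.Println("stopped:", stopped, "err:", err)
}
```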

Start

The code of Start is as follows:

```go
// Start starts the gRPC server of the device plugin
func (m *NvidiaDevicePlugin) Start() error {
	// Remove any stale socket left over from a previous run
	err := m.cleanup()
	if err != nil {
		return err
	}

	// Listening on the Unix socket creates the nvidia.sock file
	sock, err := net.Listen("unix", m.socket)
	if err != nil {
		return err
	}

	m.server = grpc.NewServer([]grpc.ServerOption{}...)
	pluginapi.RegisterDevicePluginServer(m.server, m)

	go m.server.Serve(sock)

	// Wait for server to start by launching a blocking connection
	conn, err := dial(m.socket, 5*time.Second)
	if err != nil {
		return err
	}
	conn.Close()

	go m.healthcheck()

	return nil
}
```

We won't go deeper into the call chain here; below is the implementation flow of Start:

[Figure: Start implementation flow]

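The Start flow is responsible for creating the nvidia.sock file: cleanup first removes any stale socket, and net.Listen("unix", m.socket) then creates a fresh one.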
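Start's pattern (remove any stale socket, listen on a Unix socket, serve in a goroutine, then dial with a timeout to confirm the endpoint is reachable) can be reproduced with the standard library alone. In this sketch a bare accept loop replaces the gRPC server and `net.DialTimeout` replaces the plugin's own `dial` helper; `startUnixServer` and `demoSocketRoundTrip` are hypothetical names, and the socket lives in a temporary directory rather than /var/lib/kubelet/device-plugins/.

```go
package main

import (
	"fmt"
	"net"
	"os"
	"path/filepath"
	"time"
)

// startUnixServer mimics the shape of Start() without gRPC.
func startUnixServer(socket string) (net.Listener, error) {
	// cleanup(): delete a socket left over from a previous run
	if err := os.Remove(socket); err != nil && !os.IsNotExist(err) {
		return nil, err
	}

	// Listening creates the socket file on disk
	lis, err := net.Listen("unix", socket)
	if err != nil {
		return nil, err
	}

	// Stand-in for go m.server.Serve(sock): accept and drop connections
	go func() {
		for {
			conn, err := lis.Accept()
			if err != nil {
				return // listener closed
			}
			conn.Close()
		}
	}()

	// Stand-in for dial(m.socket, 5*time.Second): confirm the socket is reachable
	conn, err := net.DialTimeout("unix", socket, 5*time.Second)
	if err != nil {
		lis.Close()
		return nil, err
	}
	conn.Close()
	return lis, nil
}

// demoSocketRoundTrip starts a server on a socket in a temp directory and
// reports whether the socket file existed while the server was running.
func demoSocketRoundTrip() (bool, error) {
	dir, err := os.MkdirTemp("", "dp")
	if err != nil {
		return false, err
	}
	defer os.RemoveAll(dir)

	sock := filepath.Join(dir, "nvidia.sock")
	lis, err := startUnixServer(sock)
	if err != nil {
		return false, err
	}
	defer lis.Close()

	_, statErr := os.Stat(sock)
	return statErr == nil, nil
}

func main() {
	ok, err := demoSocketRoundTrip()
	fmt.Println("socket created:", ok, "err:", err)
}
```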

The healthcheck part deserves special mention:

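  • healthcheck starts a goroutine that monitors the health of the managed devices. As soon as a device is found to be unhealthy, it is sent to the health channel of NvidiaDevicePlugin; the plugin's ListAndWatch reads these unhealthy devices from the health channel and notifies kubelet so it can update its state.
  • Only the nvmlEventTypeXidCriticalError event is monitored: once this event is observed for a device, that device is considered unhealthy. For details on nvmlEventTypeXidCriticalError, see NVIDIA's NVML API documentation.
  • Health checking can be disabled by setting the environment variable DP_DISABLE_HEALTHCHECKS to "all" in the NVIDIA device plugin Pod. Leaving it unset or setting any other value keeps health checking enabled; the default deployment does not set it.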
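The two healthcheck rules above can be sketched as follows. This is a simplified model, not the plugin's code: `xidEvent` is a hypothetical stand-in for an NVML event, devices are identified by plain strings, and `healthChecksDisabled` implements only the "all" rule described above (it takes a getenv function so it can be tested without touching the real environment).

```go
package main

import (
	"fmt"
	"os"
)

// healthChecksDisabled applies the rule described above: only the value
// "all" disables health checking. getenv is injected for testability.
func healthChecksDisabled(getenv func(string) string) bool {
	return getenv("DP_DISABLE_HEALTHCHECKS") == "all"
}

// xidEvent is a hypothetical stand-in for an NVML event.
type xidEvent struct {
	deviceID string
	critical bool // true for an nvmlEventTypeXidCriticalError event
}

// watchXids forwards each device hit by a critical Xid event to health,
// the channel that ListAndWatch reads from. Other events are ignored.
func watchXids(events []xidEvent, health chan<- string) {
	for _, e := range events {
		if e.critical {
			health <- e.deviceID
		}
	}
}

func main() {
	if healthChecksDisabled(os.Getenv) {
		fmt.Println("health checks disabled")
		return
	}

	health := make(chan string, 2)
	watchXids([]xidEvent{
		{deviceID: "GPU-0", critical: false},
		{deviceID: "GPU-1", critical: true},
	}, health)
	close(health)

	for id := range health {
		fmt.Println("unhealthy:", id)
	}
}
```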

Register

After Start, the Register flow follows. Its code is:

```go
// Register registers the device plugin for the given resourceName with Kubelet.
func (m *NvidiaDevicePlugin) Register(kubeletEndpoint, resourceName string) error {
	conn, err := dial(kubeletEndpoint, 5*time.Second)
	if err != nil {
		return err
	}
	defer conn.Close()

	client := pluginapi.NewRegistrationClient(conn)
	reqt := &pluginapi.RegisterRequest{
		Version:      pluginapi.Version,
		Endpoint:     path.Base(m.socket),
		ResourceName: resourceName,
	}

	_, err = client.Register(context.Background(), reqt)
	if err != nil {
		return err
	}
	return nil
}
```

The Register flow is shown below:

[Figure: Register flow]

  • The registered resource name is nvidia.com/gpu.
  • The registered version is v1beta1.
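The registration request itself is small. The sketch below builds its three fields with a local stand-in for `pluginapi.RegisterRequest`, assuming the default kubelet device-plugin directory `/var/lib/kubelet/device-plugins/`. `Endpoint` carries only the socket's file name (via `path.Base`), because kubelet resolves the endpoint inside its own device-plugin directory.

```go
package main

import (
	"fmt"
	"path"
)

// registerRequest is a local stand-in for pluginapi.RegisterRequest.
type registerRequest struct {
	Version      string
	Endpoint     string
	ResourceName string
}

// Values taken from the description above; the socket path assumes the
// default kubelet device-plugin directory.
const (
	apiVersion   = "v1beta1"
	resourceName = "nvidia.com/gpu"
	pluginSocket = "/var/lib/kubelet/device-plugins/nvidia.sock"
)

// newRegisterRequest fills the request the way Register does.
func newRegisterRequest(socket string) registerRequest {
	return registerRequest{
		Version: apiVersion,
		// Only the file name is sent; kubelet joins it with its own directory
		Endpoint:     path.Base(socket),
		ResourceName: resourceName,
	}
}

func main() {
	fmt.Printf("%+v\n", newRegisterRequest(pluginSocket))
}
```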

Stop

The code of Stop is as follows:

```go
// Stop stops the gRPC server
func (m *NvidiaDevicePlugin) Stop() error {
	if m.server == nil {
		return nil
	}

	m.server.Stop()
	m.server = nil
	close(m.stop)

	return m.cleanup()
}
```

The Stop flow is shown below:

[Figure: Stop flow]

  • Stop stops the gRPC server, closes the stop channel (which makes ListAndWatch return), and deletes nvidia.sock via cleanup.
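Two details of Stop matter for the restart loop in main: the `m.server == nil` check makes a second call a no-op (closing `m.stop` twice would panic), and `close(m.stop)` wakes every goroutine waiting on that channel at once. The sketch below demonstrates both; `fakeServer` is a hypothetical stand-in for `*grpc.Server`, and `waitersReleased` is an illustrative helper.

```go
package main

import (
	"fmt"
	"sync"
)

// fakeServer is a hypothetical stand-in for *grpc.Server.
type fakeServer struct{ stopped bool }

func (s *fakeServer) Stop() { s.stopped = true }

type plugin struct {
	server *fakeServer
	stop   chan struct{}
}

func newPlugin() *plugin {
	return &plugin{server: &fakeServer{}, stop: make(chan struct{})}
}

// Stop has the same shape as the excerpt: return early if already stopped,
// stop the server, clear it, then close(stop) to wake all waiters.
func (p *plugin) Stop() {
	if p.server == nil {
		return
	}
	p.server.Stop()
	p.server = nil
	close(p.stop)
}

// waitersReleased starts n goroutines blocked on p.stop (like ListAndWatch
// streams), calls Stop twice and returns how many goroutines were released.
func waitersReleased(n int) int {
	p := newPlugin()
	var wg sync.WaitGroup
	var mu sync.Mutex
	released := 0

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-p.stop // returns once the channel is closed
			mu.Lock()
			released++
			mu.Unlock()
		}()
	}

	p.Stop()
	p.Stop() // no-op thanks to the nil check; no double close
	wg.Wait()
	return released
}

func main() {
	fmt.Println("released:", waitersReleased(3))
}
```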

ListAndWatch

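ListAndWatch first sends the full device list to kubelet and then monitors the health channel. Whenever a GPU becomes unhealthy, it sends the complete GPU list (IDs and health status) to kubelet again so that kubelet can update its state.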

```go
// ListAndWatch lists devices and updates that list according to the health status
func (m *NvidiaDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
	// Send the full device list once when kubelet opens the stream
	s.Send(&pluginapi.ListAndWatchResponse{Devices: m.devs})

	for {
		select {
		case <-m.stop:
			return nil
		case d := <-m.health:
			// FIXME: there is no way to recover from the Unhealthy state.
			d.Health = pluginapi.Unhealthy
			// Resend the complete list, now including the unhealthy device
			s.Send(&pluginapi.ListAndWatchResponse{Devices: m.devs})
		}
	}
}
```

The ListAndWatch flow is shown below:

[Figure: ListAndWatch flow]
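The behaviour of ListAndWatch can be reproduced with a fake stream that records every list it receives. In this sketch `device`, `sender`, `recorder`, `listAndWatch` and `runListAndWatchDemo` are hypothetical, and `Send` takes the device slice directly instead of a `ListAndWatchResponse`. Because the health channel carries pointers into the same slice that is resent, setting `d.Health` is enough to change the next list; as the FIXME notes, nothing ever sets a device back to Healthy.

```go
package main

import "fmt"

// Health values as used by the v1beta1 API.
const (
	healthy   = "Healthy"
	unhealthy = "Unhealthy"
)

// device is a local stand-in for pluginapi.Device.
type device struct {
	ID     string
	Health string
}

// sender models the only stream method that ListAndWatch uses.
type sender interface {
	Send(devs []*device) error
}

// recorder is a fake stream that keeps a copy of every list it is sent.
type recorder struct {
	snapshots [][]device
}

func (r *recorder) Send(devs []*device) error {
	snap := make([]device, len(devs))
	for i, d := range devs {
		snap[i] = *d
	}
	r.snapshots = append(r.snapshots, snap)
	return nil
}

// listAndWatch mirrors the excerpt: send the full list once, then resend it
// every time a device arrives on health, until stop is closed.
func listAndWatch(devs []*device, health <-chan *device, stop <-chan struct{}, s sender) {
	s.Send(devs)
	for {
		select {
		case <-stop:
			return
		case d := <-health:
			d.Health = unhealthy // d points into devs
			s.Send(devs)
		}
	}
}

// runListAndWatchDemo reports GPU-1 as unhealthy, then stops the stream.
func runListAndWatchDemo() [][]device {
	devs := []*device{{ID: "GPU-0", Health: healthy}, {ID: "GPU-1", Health: healthy}}
	health := make(chan *device) // unbuffered: send returns once received
	stop := make(chan struct{})
	done := make(chan struct{})
	r := &recorder{}

	go func() {
		listAndWatch(devs, health, stop, r)
		close(done)
	}()

	health <- devs[1]
	close(stop)
	<-done
	return r.snapshots
}

func main() {
	for i, snap := range runListAndWatchDemo() {
		fmt.Println("update", i, snap)
	}
}
```

The recorder ends up with two snapshots: the initial list with both GPUs Healthy, and a second one in which GPU-1 is Unhealthy.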

Allocate

Allocate handles kubelet's requests to allocate GPUs to containers. The request structures are defined as follows:

```go
// - Allocate is expected to be called during pod creation since allocation
//   failures for any container would result in pod startup failure.
// - Allocate allows kubelet to expose additional artifacts in a pod's
//   environment as directed by the plugin.
// - Allocate allows Device Plugin to run device specific operations on
//   the Devices requested
type AllocateRequest struct {
	ContainerRequests []*ContainerAllocateRequest `protobuf:"bytes,1,rep,name=container_requests,json=containerRequests" json:"container_requests,omitempty"`
}

type ContainerAllocateRequest struct {
	DevicesIDs []string `protobuf:"bytes,1,rep,name=devicesIDs" json:"devicesIDs,omitempty"`
}
```

The response structures of the device plugin's Allocate are defined as follows:

```go
// AllocateResponse includes the artifacts that need to be injected into
// a container for accessing 'deviceIDs' that were mentioned as part of
// 'AllocateRequest'.
// Failure Handling:
// if Kubelet sends an allocation request for dev1 and dev2.
// Allocation on dev1 succeeds but allocation on dev2 fails.
// The Device plugin should send a ListAndWatch update and fail the
// Allocation request
type AllocateResponse struct {
	ContainerResponses []*ContainerAllocateResponse `protobuf:"bytes,1,rep,name=container_responses,json=containerResponses" json:"container_responses,omitempty"`
}

type ContainerAllocateResponse struct {
	// List of environment variable to be set in the container to access one or more devices.
	Envs map[string]string `protobuf:"bytes,1,rep,name=envs" json:"envs,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
	// Mounts for the container.
	Mounts []*Mount `protobuf:"bytes,2,rep,name=mounts" json:"mounts,omitempty"`
	// Devices for the container.
	Devices []*DeviceSpec `protobuf:"bytes,3,rep,name=devices" json:"devices,omitempty"`
	// Container annotations to pass to the container runtime
	Annotations map[string]string `protobuf:"bytes,4,rep,name=annotations" json:"annotations,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
}
```

The implementation of Allocate is as follows:

```go
// Allocate returns, for each container request, the environment needed to access the requested devices.
func (m *NvidiaDevicePlugin) Allocate(ctx context.Context, reqs *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
	devs := m.devs
	responses := pluginapi.AllocateResponse{}
	for _, req := range reqs.ContainerRequests {
		// Expose the requested GPUs to the container via NVIDIA_VISIBLE_DEVICES
		response := pluginapi.ContainerAllocateResponse{
			Envs: map[string]string{
				"NVIDIA_VISIBLE_DEVICES": strings.Join(req.DevicesIDs, ","),
			},
		}

		// Reject the whole request if any ID is unknown to this plugin
		for _, id := range req.DevicesIDs {
			if !deviceExists(devs, id) {
				return nil, fmt.Errorf("invalid allocation request: unknown device: %s", id)
			}
		}

		responses.ContainerResponses = append(responses.ContainerResponses, &response)
	}

	return &responses, nil
}
```

Its logic is shown below:

[Figure: Allocate logic]

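  • Allocate iterates over ContainerRequests and puts each request's DevicesIDs into the NVIDIA_VISIBLE_DEVICES entry of ContainerAllocateResponse.Envs, formatted as "${ID_1},${ID_2},...". If any requested ID is unknown, the whole request fails.
  • Apart from that, it does not fill in Mounts, Devices or Annotations.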
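The Allocate logic can be exercised in isolation with simplified request and response types. This sketch is not the plugin's code: the types are local stand-ins, and a map lookup replaces the plugin's `deviceExists` helper, whose implementation is not shown above.

```go
package main

import (
	"fmt"
	"strings"
)

// Local stand-ins for the pluginapi request/response types (Envs only).
type containerAllocateRequest struct {
	DevicesIDs []string
}

type containerAllocateResponse struct {
	Envs map[string]string
}

// allocate validates every requested ID against the known devices and
// returns NVIDIA_VISIBLE_DEVICES as a comma-separated list per container.
func allocate(known []string, reqs []containerAllocateRequest) ([]containerAllocateResponse, error) {
	exists := make(map[string]bool, len(known))
	for _, id := range known {
		exists[id] = true
	}

	var out []containerAllocateResponse
	for _, req := range reqs {
		for _, id := range req.DevicesIDs {
			if !exists[id] {
				return nil, fmt.Errorf("invalid allocation request: unknown device: %s", id)
			}
		}
		out = append(out, containerAllocateResponse{
			Envs: map[string]string{
				"NVIDIA_VISIBLE_DEVICES": strings.Join(req.DevicesIDs, ","),
			},
		})
	}
	return out, nil
}

func main() {
	resps, err := allocate(
		[]string{"GPU-0", "GPU-1"},
		[]containerAllocateRequest{{DevicesIDs: []string{"GPU-0", "GPU-1"}}},
	)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(resps[0].Envs["NVIDIA_VISIBLE_DEVICES"])
}
```

For a request of GPU-0 and GPU-1 the container receives NVIDIA_VISIBLE_DEVICES=GPU-0,GPU-1, while a request containing an unknown ID fails the whole call.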

Summary

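The NVIDIA/k8s-device-plugin code depends on the nvidia-docker code base and contains many places where Go calls into C libraries, so you will need to look up the corresponding C function declarations in the [NVML API documentation](https://docs.nvidia.com/deploy/nvml-api) yourself. The previous post covered how Kubernetes uses NVIDIA GPUs through Device Plugins, and this post walked through the implementation of NVIDIA/k8s-device-plugin. I think the next post should analyze the code of the kubelet device plugin manager; only then can the whole interaction be fully understood.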

This article was published on April 18, 2018 at 22:38.