k8s数据卷管理源码分析


声明:本文转载自https://my.oschina.net/u/1378920/blog/1537622,转载目的在于传递更多信息,仅供学习交流之用。如有侵权行为,请联系我,我会及时删除。

概述

volume是k8s中很重要的一个环节,主要用来存储k8s中pod生产的一些系统或者业务数据。k8s在kubelet中提供了volume管理的逻辑

源码分析

首先是kubelet启动方法

func main() {        s := options.NewKubeletServer()        s.AddFlags(pflag.CommandLine)         flag.InitFlags()        logs.InitLogs()        defer logs.FlushLogs()         verflag.PrintAndExitIfRequested()         if err := app.Run(s, nil); err != nil {               fmt.Fprintf(os.Stderr, "error: %v\n", err)               os.Exit(1)        } } 

很容易发现run方法中包含了kubelet所有重要信息

func run(s *options.KubeletServer, kubeDeps *kubelet.KubeletDeps) (err error) {          		//配置验证     	...         if kubeDeps == nil {               ...                kubeDeps, err = UnsecuredKubeletDeps(s)                ...        }         //初始化cAdvisor以及containerManager等管理器        ...          if err := RunKubelet(&s.KubeletConfiguration, kubeDeps, s.RunOnce, standaloneMode); err != nil {               return err        }         ... } 

里面有两个与volume管理相关的重要方法

  • UnsecuredKubeletDeps:会初始化docker client、网络管理插件、数据管理插件等系统核心组件,因为不方便对外部开放,所以命名为unsecure。其中我们需要关注的是它对volume plugin的初始化操作

    	func UnsecuredKubeletDeps(s *options.KubeletServer) (*kubelet.KubeletDeps, error) {  	    ...  		return &kubelet.KubeletDeps{ 			Auth:               nil,  			CAdvisorInterface:  nil,  			Cloud:              nil,  			ContainerManager:   nil, 			DockerClient:       dockerClient, 			KubeClient:         nil, 			ExternalKubeClient: nil, 			Mounter:            mounter, 			NetworkPlugins:     ProbeNetworkPlugins(s.NetworkPluginDir, s.CNIConfDir, s.CNIBinDir), 			OOMAdjuster:        oom.NewOOMAdjuster(), 			OSInterface:        kubecontainer.RealOS{}, 			Writer:             writer, 			VolumePlugins:      ProbeVolumePlugins(s.VolumePluginDir), 			TLSOptions:         tlsOptions, 		}, nil 	} 

    在初始化volume plugin的时候会传递VolumePluginDir作为自定义plugin的路径,默认路径为**/usr/libexec/kubernetes/kubelet-plugins/volume/exec/**

    	func ProbeVolumePlugins(pluginDir string) []volume.VolumePlugin { 		allPlugins := []volume.VolumePlugin{} 		allPlugins = append(allPlugins, aws_ebs.ProbeVolumePlugins()...) 		allPlugins = append(allPlugins, empty_dir.ProbeVolumePlugins()...) 		allPlugins = append(allPlugins, gce_pd.ProbeVolumePlugins()...) 		allPlugins = append(allPlugins, git_repo.ProbeVolumePlugins()...) 		allPlugins = append(allPlugins, host_path.ProbeVolumePlugins(volume.VolumeConfig{})...) 		allPlugins = append(allPlugins, nfs.ProbeVolumePlugins(volume.VolumeConfig{})...) 		allPlugins = append(allPlugins, secret.ProbeVolumePlugins()...) 		allPlugins = append(allPlugins, iscsi.ProbeVolumePlugins()...) 		allPlugins = append(allPlugins, glusterfs.ProbeVolumePlugins()...) 		allPlugins = append(allPlugins, rbd.ProbeVolumePlugins()...) 		allPlugins = append(allPlugins, cinder.ProbeVolumePlugins()...) 		allPlugins = append(allPlugins, quobyte.ProbeVolumePlugins()...) 		allPlugins = append(allPlugins, cephfs.ProbeVolumePlugins()...) 		allPlugins = append(allPlugins, downwardapi.ProbeVolumePlugins()...) 		allPlugins = append(allPlugins, fc.ProbeVolumePlugins()...) 		allPlugins = append(allPlugins, flocker.ProbeVolumePlugins()...) 		allPlugins = append(allPlugins, flexvolume.ProbeVolumePlugins(pluginDir)...) 		allPlugins = append(allPlugins, azure_file.ProbeVolumePlugins()...) 		allPlugins = append(allPlugins, configmap.ProbeVolumePlugins()...) 		allPlugins = append(allPlugins, vsphere_volume.ProbeVolumePlugins()...) 		allPlugins = append(allPlugins, azure_dd.ProbeVolumePlugins()...) 		allPlugins = append(allPlugins, photon_pd.ProbeVolumePlugins()...) 		allPlugins = append(allPlugins, projected.ProbeVolumePlugins()...) 		allPlugins = append(allPlugins, portworx.ProbeVolumePlugins()...) 		allPlugins = append(allPlugins, scaleio.ProbeVolumePlugins()...) 		return allPlugins 	}  

    可以观察到众多插件中,有一个名为flexvolume,只有这个插件带有参数pluginDir,说明只有这个插件支持自定义实现。具体kubelet怎么和这些插件交互,以及这些插件提供哪些接口,还需要继续阅读代码

  • RunKubelet:这才是kubelet服务的启动方法,其中最重要的功能都藏在startKubelet中

    	func RunKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps, runOnce bool, standaloneMode bool) error {  		//初始化启动器 		...  		if runOnce { 			if _, err := k.RunOnce(podCfg.Updates()); err != nil { 				return fmt.Errorf("runonce failed: %v", err) 			} 			glog.Infof("Started kubelet %s as runonce", version.Get().String()) 		} else { 			startKubelet(k, podCfg, kubeCfg, kubeDeps) 			glog.Infof("Started kubelet %s", version.Get().String()) 		} 		return nil 	} 

    startKubelet包含两个环节

    • 不断同步apiserver的pod信息,根据新增、删除的pod对volume状态进行同步更新
    • 启动服务,监听controller manager的请求。其中controller manager可以辅助kubelet管理volume,用户也可以选择禁用controller manager的管理
    	func startKubelet(k kubelet.KubeletBootstrap, podCfg *config.PodConfig, kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps) { 		// 同步pod信息 		go wait.Until(func() { k.Run(podCfg.Updates()) }, 0, wait.NeverStop)  		// 启动kubelet服务 		if kubeCfg.EnableServer { 			go wait.Until(func() { 				k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling) 			}, 0, wait.NeverStop) 		} 		if kubeCfg.ReadOnlyPort > 0 { 			go wait.Until(func() { 				k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort)) 			}, 0, wait.NeverStop) 		} 	} 

    跟踪同步pod信息的Run方法,会追查到这段代码

    	func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {  	    ...  		go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)  		if kl.kubeClient != nil { 			//同步node信息 			go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop) 		}  		// 同步pod信息 		kl.pleg.Start() 		kl.syncLoop(updates, kl) 	}  

    kl.volumeManager是kubelet进行数据卷管理的核心接口

    	type VolumeManager interface { 		Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})  		WaitForAttachAndMount(pod *v1.Pod) error  		GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap  		GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int64  		GetVolumesInUse() []v1.UniqueVolumeName  		ReconcilerStatesHasBeenSynced() bool  		VolumeIsAttached(volumeName v1.UniqueVolumeName) bool  		MarkVolumesAsReportedInUse(volumesReportedAsInUse []v1.UniqueVolumeName) 	} 

    VolumeManager的Run会执行一个异步循环,当pod被调度到该node,它会检查该pod所申请的所有volume,根据这些volume与pod的关系做attach/detach/mount/unmount操作

    	func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) { 		defer runtime.HandleCrash()  		go vm.desiredStateOfWorldPopulator.Run(sourcesReady, stopCh) 		glog.V(2).Infof("The desired_state_of_world populator starts")  		glog.Infof("Starting Kubelet Volume Manager") 		go vm.reconciler.Run(stopCh)  		<-stopCh 		glog.Infof("Shutting down Kubelet Volume Manager") 	} 

    其中重点关注的地方是vm.desiredStateOfWorldPopulator.Runvm.reconciler.Run这两个方法。在介绍这两个方法之前,需要补充一个关键信息,这也是理解这两个方法的关键信息。

    kubelet管理volume的方式基于两个不同的状态:

    • DesiredStateOfWorld:预期中,pod对volume的使用情况,简称预期状态。当pod.yaml定制好volume,并提交成功,预期状态就已经确定
    • ActualStateOfWorld:实际中,pod对voluem的使用情况,简称实际状态。实际状态是kubelet的后台线程监控的结果

    理解了这两个状态,就能大概知道vm.desiredStateOfWorldPopulator.Run这个方法是干什么的呢。很明显,它就是根据从apiserver同步到的pod信息,来更新DesiredStateOfWorld。另外一个方法vm.reconciler.Run,是预期状态和实际状态的协调者,它负责将实际状态调整成与预期状态。预期状态的更新实现,以及协调者具体如何协调,需要继续阅读代码才能理解

    追踪vm.desiredStateOfWorldPopulator.Run,我们发现这段逻辑

    	func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods() { 		for _, pod := range dswp.podManager.GetPods() { 			if dswp.isPodTerminated(pod) { 				continue 			} 			dswp.processPodVolumes(pod) 		} 	} 

    kubelet会同步新增的pod到desiredStateOfWorldPopulator的podManager中。这段代码就是轮询其中非结束状态的pod,并交给desiredStateOfWorldPopulator处理

    	func (dswp *desiredStateOfWorldPopulator) processPodVolumes(pod *v1.Pod) {  		...  		for _, podVolume := range pod.Spec.Volumes { 			volumeSpec, volumeGidValue, err := 				dswp.createVolumeSpec(podVolume, pod.Namespace) 			if err != nil { 				glog.Errorf( 					"Error processing volume %q for pod %q: %v", 					podVolume.Name, 					format.Pod(pod), 					err) 				continue 			}   			_, err = dswp.desiredStateOfWorld.AddPodToVolume( 				uniquePodName, pod, volumeSpec, podVolume.Name, volumeGidValue) 			if err != nil { 				glog.Errorf( 					"Failed to add volume %q (specName: %q) for pod %q to desiredStateOfWorld. err=%v", 					podVolume.Name, 					volumeSpec.Name(), 					uniquePodName, 					err) 			}  			glog.V(10).Infof( 				"Added volume %q (volSpec=%q) for pod %q to desired state.", 				podVolume.Name, 				volumeSpec.Name(), 				uniquePodName) 		}  		dswp.markPodProcessed(uniquePodName) 	} 

    desiredStateOfWorldPopulator并不处理很重的逻辑,只是作为一个代理,将控制某个pod预期状态的逻辑交付给desiredStateOfWorld,并标记为已处理

    	func (dsw *desiredStateOfWorld) AddPodToVolume( 		podName types.UniquePodName, 		pod *v1.Pod, 		volumeSpec *volume.Spec, 		outerVolumeSpecName string, 		volumeGidValue string) (v1.UniqueVolumeName, error) {  		...  		dsw.volumesToMount[volumeName].podsToMount[podName] = podToMount{ 			podName:             podName, 			pod:                 pod, 			spec:                volumeSpec, 			outerVolumeSpecName: outerVolumeSpecName, 		}  		return volumeName, nil 	} 

    这段逻辑中,我们忽略了前面一系列预处理操作,直接关注最核心的地方:确定预期状态的方式就是,用一个映射表结构,绑定volume到pod之间的关系,这个关系表就是绑定关系的参考依据

    看完了desiredStateOfWorldPopulator的处理逻辑,接着进入另一个核心接口reconciler。它才是volume manager中最重要的控制器

    追踪reconciler的Run方法,我们定位到最核心的一段代码

    	func (rc *reconciler) reconcile() {  		//umount 		for _, mountedVolume := range rc.actualStateOfWorld.GetMountedVolumes() { 			if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName) {  				...  				err := rc.operationExecutor.UnmountVolume( 					mountedVolume.MountedVolume, rc.actualStateOfWorld)  				... 			} 		}  		// attach/mount 		for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() { 			volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName) 			volumeToMount.DevicePath = devicePath 			if cache.IsVolumeNotAttachedError(err) {  				...  				err := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld)  				...  			} else if !volMounted || cache.IsRemountRequiredError(err) {  				...  				err := rc.operationExecutor.MountVolume( 					rc.waitForAttachTimeout, 					volumeToMount.VolumeToMount, 					rc.actualStateOfWorld)  				... 			} 		}  		//detach/unmount 		for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() { 			if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) && 				!rc.operationExecutor.IsOperationPending(attachedVolume.VolumeName, nestedpendingoperations.EmptyUniquePodName) { 				if attachedVolume.GloballyMounted {  					...  					err := rc.operationExecutor.UnmountDevice( 						attachedVolume.AttachedVolume, rc.actualStateOfWorld, rc.mounter) 					...  				} else {  					...  					err := rc.operationExecutor.DetachVolume( 							attachedVolume.AttachedVolume, false,rc.actualStateOfWorld)  					... 				} 			} 		} 	} 

    我略去了多余的代码,保留最核心的部分。这段控制逻辑就是一个协调器,具体要做的事情就是,根据实际状态与预期状态的差异,做协调操作

    • volume和pod的预期状态不存在绑定关系,则detach volume,并对pod和volume执行unmount操作
    • volume和pod的预期状态存在绑定关系,则attach volume,并对pod和volume执行mount操作

    如果采用自定义的flexvolume插件,上述这些方法会对插件中实现的方法进行系统调用

    • AttachVolume:调用attach
    • DetachVolume:调用detach
    • MountVolume:调用mountdevice,mount
    • UnmountVolume:调用unmount
    • UnmountDevice:调用umountdevice

    flex volume提供的lvm插件。如果需要支持mount和unmount操作,可以在这个脚本中补充

    	#!/bin/bash  	# Copyright 2015 The Kubernetes Authors. 	# 	# Licensed under the Apache License, Version 2.0 (the "License"); 	# you may not use this file except in compliance with the License. 	# You may obtain a copy of the License at 	# 	#     http://www.apache.org/licenses/LICENSE-2.0 	# 	# Unless required by applicable law or agreed to in writing, software 	# distributed under the License is distributed on an "AS IS" BASIS, 	# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 	# See the License for the specific language governing permissions and 	# limitations under the License.  	# Notes: 	#  - Please install "jq" package before using this driver. 	usage() { 		err "Invalid usage. Usage: " 		err "\t$0 init" 		err "\t$0 attach <json params> <nodename>" 		err "\t$0 detach <mount device> <nodename>" 		err "\t$0 waitforattach <mount device> <json params>" 		err "\t$0 mountdevice <mount dir> <mount device> <json params>" 		err "\t$0 unmountdevice <mount dir>" 		err "\t$0 isattached <json params> <nodename>" 		exit 1 	}  	err() { 		echo -ne $* 1>&2 	}  	log() { 		echo -ne $* >&1 	}  	ismounted() { 		MOUNT=`findmnt -n ${MNTPATH} 2>/dev/null | cut -d' ' -f1` 		if [ "${MOUNT}" == "${MNTPATH}" ]; then 			echo "1" 		else 			echo "0" 		fi 	}  	getdevice() { 		VOLUMEID=$(echo ${JSON_PARAMS} | jq -r '.volumeID') 		VG=$(echo ${JSON_PARAMS}|jq -r '.volumegroup')  		# LVM substitutes - with -- 		VOLUMEID=`echo $VOLUMEID|sed s/-/--/g` 		VG=`echo $VG|sed s/-/--/g`  		DMDEV="/dev/mapper/${VG}-${VOLUMEID}" 		echo ${DMDEV} 	}  	attach() { 		JSON_PARAMS=$1 		SIZE=$(echo $1 | jq -r '.size')  		DMDEV=$(getdevice) 		if [ ! -b "${DMDEV}" ]; then 			err "{\"status\": \"Failure\", \"message\": \"Volume ${VOLUMEID} does not exist\"}" 			exit 1 		fi 		log "{\"status\": \"Success\", \"device\":\"${DMDEV}\"}" 		exit 0 	}  	detach() { 		log "{\"status\": \"Success\"}" 		exit 0 	}  	waitforattach() { 		shift 		attach $* 	}  	domountdevice() { 		MNTPATH=$1 		DMDEV=$2 		FSTYPE=$(echo $3|jq -r '.["kubernetes.io/fsType"]')  		if [ ! -b "${DMDEV}" ]; then 			err "{\"status\": \"Failure\", \"message\": \"${DMDEV} does not exist\"}" 			exit 1 		fi  		if [ $(ismounted) -eq 1 ] ; then 			log "{\"status\": \"Success\"}" 			exit 0 		fi  		VOLFSTYPE=`blkid -o udev ${DMDEV} 2>/dev/null|grep "ID_FS_TYPE"|cut -d"=" -f2` 		if [ "${VOLFSTYPE}" == "" ]; then 			mkfs -t ${FSTYPE} ${DMDEV} >/dev/null 2>&1 			if [ $? -ne 0 ]; then 				err "{ \"status\": \"Failure\", \"message\": \"Failed to create fs ${FSTYPE} on device ${DMDEV}\"}" 				exit 1 			fi 		fi  		mkdir -p ${MNTPATH} &> /dev/null  		mount ${DMDEV} ${MNTPATH} &> /dev/null 		if [ $? -ne 0 ]; then 			err "{ \"status\": \"Failure\", \"message\": \"Failed to mount device ${DMDEV} at ${MNTPATH}\"}" 			exit 1 		fi 		log "{\"status\": \"Success\"}" 		exit 0 	}  	unmountdevice() { 		MNTPATH=$1 		if [ ! -d ${MNTPATH} ]; then 			log "{\"status\": \"Success\"}" 			exit 0 		fi  		if [ $(ismounted) -eq 0 ] ; then 			log "{\"status\": \"Success\"}" 			exit 0 		fi  		umount ${MNTPATH} &> /dev/null 		if [ $? -ne 0 ]; then 			err "{ \"status\": \"Failed\", \"message\": \"Failed to unmount volume at ${MNTPATH}\"}" 			exit 1 		fi  		log "{\"status\": \"Success\"}" 		exit 0 	}  	isattached() { 		log "{\"status\": \"Success\", \"attached\":true}" 		exit 0 	}  	op=$1  	if [ "$op" = "init" ]; then 		log "{\"status\": \"Success\"}" 		exit 0 	fi  	if [ $# -lt 2 ]; then 		usage 	fi  	shift  	case "$op" in 		attach) 			attach $* 			;; 		detach) 			detach $* 			;; 		waitforattach) 			waitforattach $* 			;; 		mountdevice) 			domountdevice $* 			;; 		unmountdevice) 			unmountdevice $* 			;; 		isattached) 	                isattached $* 	                ;; 		*) 			log "{ \"status\": \"Not supported\" }" 			exit 0 	esac  	exit 1 

    值得注意的是,为什么会有两次mount操作,一次mountdevice,一次mount。分别是做什么的?

    其实k8s提供的volume管理方式是,一个volume可以被多个pod挂载,如果某个device需要作为多个pod的volume,就需要多次挂载。但是device只能被挂载一次。所以,k8s采用的方式是,先用mountdevice将device挂载到一个全局目录,然后这个全局目录就可以被多次挂载到pod的卷目录。如此一来,就能完成多pod挂载同一个volume

总结

只有理解了volume manager的代码,在使用它提供的volume plugin或者实现自定义flex volume plugin时才能驾轻就熟。以上代码,都是基于k8s v1.6.6版本

本文发表于2017年09月15日 08:35
(c)注:本文转载自https://my.oschina.net/u/1378920/blog/1537622,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。如有侵权行为,请联系我们,我们会及时删除.

阅读 2472 讨论 0 喜欢 0

抢先体验

扫码体验
趣味小程序
文字表情生成器

闪念胶囊

你要过得好哇,这样我才能恨你啊,你要是过得不好,我都不知道该恨你还是拥抱你啊。

直抵黄龙府,与诸君痛饮尔。

那时陪伴我的人啊,你们如今在何方。

不出意外的话,我们再也不会见了,祝你前程似锦。

这世界真好,吃野东西也要留出这条命来看看

快捷链接
网站地图
提交友链
Copyright © 2016 - 2021 Cion.
All Rights Reserved.
京ICP备2021004668号-1