当前位置:首页 > 行业发展 > 正文

像Google一样构建机器学习系统2 - 开发你的机器学习工作流

按照上篇文章搭建了一套KubeflowPipelines之后,我们一起小试牛刀,用一个真实的案例,学习如何开发一套基于KubeflowPipelines的机器学习工作流。准备工作机器学习工作流是一个任务驱动的流程,同时也是数据驱动的流程,这里涉及到数据的导入和准备,模型训练Checkpoint的导出...

按照上篇文章搭建了一套KubeflowPipelines之后,我们一起小试牛刀,用一个真实的案例,学习如何开发一套基于KubeflowPipelines的机器学习工作流。准备工作机器学习工作流是一个任......

按照上篇文章搭建了一套KubeflowPipelines之后,我们一起小试牛刀,用一个真实的案例,学习如何开发一套基于KubeflowPipelines的机器学习工作流。

准备工作

机器学习工作流是一个任务驱动的流程,同时也是数据驱动的流程,这里涉及到数据的导入和准备,模型训练Checkpoint的导出评估,到最终模型的导出。这就需要分布式存储作为传输的媒介,这里使用NAS作为分布式存储。

创建分布式存储,这里以NAS为例。这里NFS_SERVER_IP需要替换成真实NAS服务器地址

1.创建阿里云NAS服务,可以参考文档

2.需要在NFSServer中创建/data

mount-tnfs-overs=4.0NFS_SERVER_IP://nfscd/:v1kind:PersistentVolumemetadata:name:user-susanlabels:user-susan:pipelinesspec:persistentVolumeReclaimPolicy:Retaincapacity:storage:10GiaccessModes:-ReadWriteManynfs:server:NFS_SERVER_IPpath:"/data":v1kind:PersistentVolumeClaimmetadata:name:user-susanannotations:description:"thisisthemnistdemo"owner:Tomspec:accessModes:-ReadWriteManyresources:requests:storage:5Giselector:matchLabels:user-susan:_data=_job_op(name="prepare-data",image="byrnedo/alpine-curl",data=data,command="mkdir-p/training/dataset/mnist\cd/training/dataset/mnist\curl-O;\curl-O;\curl-O;\curl-O")3.exportthemodelexport_model=_job_op(name="export-model",image="tensorflow/tensorflow:1.11.0-py3",sync_source="",env=["GIT_SYNC_REV=%s"%(commit)],data=data,command="echo%s;pythoncode/tensorflow-sample-code/tfjob/docker/mnist/export__version=%s--checkpoint_path=/training/output/mnist/training/output/models"%(,model_version))

KubeflowPipelines会将上面的代码转化成一个有向无环图(DAG),其中的每一个节点就是Component(组件),而Component(组件)之间的连线代表它们之间的依赖关系。从PipelinesUI可以看到DAG图:

首先具体理解一下数据准备的部分,这里我们提供了_job_op的PythonAPI,需要指定该步骤的名称:name,需要使用的容器镜像:image,要使用的数据以及其对应到容器内部的挂载目录:data,这里的data是一个数组格式,如data=["user-susan:/training"],表示可以挂载到多个数据。user-susan是之前创建的PersistentVolumeClaim,而/training为容器内部的挂载目录。

prepare_data=_job_op(name="prepare-data",image="byrnedo/alpine-curl",data=data,command="mkdir-p/training/dataset/mnist\cd/training/dataset/mnist\curl-O;\curl-O;\curl-O;\curl-O")

而上述步骤实际上是从指定地址利用curl下载数据到分布式存储对应的目录/training/dataset/mnist,请注意这里的/training为分布式存储的根目录,类似大家熟悉的根mount点;而/training/dataset/mnist是子目录。其实后面的步骤可以通过使用同样的根mount点,读到数据,进行运算。

第二步是利用下载到分布式存储的数据,并通过git指定固定commitid下载代码,并进行模型训练

train=_job_op(name="train",image="tensorflow/tensorflow:1.11.0-gpu-py3",sync_source="",env=["GIT_SYNC_REV=%s"%(commit)],gpus=gpus,data=data,command='''echo%s;pythoncode/tensorflow-sample-code/tfjob/docker/mnist/\--max_steps500--data_dir/training/dataset/mnist\--log_dir/training/output/mnist--learning_rate%s\--dropout%s'''%(prepare_,learning_rate,dropout),metrics=["Train-accuracy:PERCENTAGE"])

可以看到这个步骤比数据准备要相对复杂一点,除了和第一步骤中的name,image,data和command之外,在模型训练步骤中,还需要指定:

获取代码的方式:从可重现实验的角度来看,对于运行试验代码的追本溯源,是非常重要的一环。可以在API调用时指定sync_source的git代码源,同时通过设定env中GIT_SYNC_REV指定训练代码的commitid

gpu:默认为0,就是不使用GPU;如果为大于0的整数值,就代表该步骤需要这个数量的GPU数。

metrics:同样是从可重现和可比较的实验目的出发,用户可以将需要的一系列指标导出,并且通过PipelinesUI上直观的显示和比较。具体使用方法分为两步,1.在调用API时以数组的形式指定要收集指标的metricsname和指标的展示格式PERCENTAGE或者是RAW,比如metrics=["Train-accuracy:PERCENTAGE"]。2.由于Pipelines默认会从stdout日志中收集指标,你需要在真正运行的模型代码中输出{metricsname}={value}或者{metricsname}:{value},可以参考具体样例代码

值得注意的是:

最后export_model是基于train训练产生的checkpoint,生成训练模型:

export_model=_job_op(name="export-model",image="tensorflow/tensorflow:1.11.0-py3",sync_source="",env=["GIT_SYNC_REV=%s"%(commit)],data=data,command="echo%s;pythoncode/tensorflow-sample-code/tfjob/docker/mnist/export__version=%s--checkpoint_path=/training/output/mnist/training/output/models"%(,model_version))

export_model和第二步train类似,甚至要更为简单,它只是从git同步模型导出代码并且利用共享目录/training/output/mnist中的checkpoint执行模型导出。

整个工作流程看起来还是很直观的,下面就可以定义一个Python方法将整个流程贯穿在一起。

@(name='pipelinetorunjobs',description='showshowtorunpipelinejobs.')defsample_pipeline(learning_rate='0.01',dropout='0.9',model_version='1',commit='f097575656f927d86d99dd64931042e1a9003cb2'):

而实际上,这些参数都可以在用户提交工作流时进行覆盖,以下就是提交工作流对应的UI:

提交Pipeline

您可以在自己的Kubernetes内将前面开发工作流的PythonDSL提交到KubeflowPipelines服务中,实际提交代码很简单:

KFP_SERVICE=":8888"().compile(sample_pipeline,__file__+'.')client=(host=KFP_SERVICE)try:experiment_id=_experiment(experiment_name=EXPERIMENT_NAME).idexcept:experiment_id=_experiment(EXPERIMENT_NAME).idrun=_pipeline(experiment_id,RUN_ID,__file__+'.',params={'learning_rate':learning_rate,'dropout':dropout,'model_version':model_version,'commit':commit})

在集群内准备一个python3的环境,并且安装KubeflowPipelinesSDK

kubectlexec-it-nkubeflow$(kubectlgetpo-ljob-name=pipeline-client-nkubeflow|grep-vNAME|awk'{print$1}')bash

登录到Python3的环境后,执行如下命令,连续提交两个不同参数的任务

/experiments,比如

总结

实现一个可以运行的KubeflowPipeline需要的步骤是:

1.构建Pipeline(流水线)中需要的最小执行单元Component(组件),如果是利用原生定义的_ops,需要构建两部分代码:

构建运行时代码:通常是为每个步骤构建容器镜像,作为Pipelines和真正执行业务逻辑代码之间的适配器。它所做的事情为获取Pipelines上下文的输入参数,调用业务逻辑代码,并且将需要传递到下个步骤的输出按照Pipelines的规则放到容器内的指定位置,由底层工作流组件负责传递。这样产生的结果是运行时代码与业务逻辑代码会耦合在一起。可以参考KubeflowPipelines的例子

构建客户端代码:这个步骤通常是长成下面的样子,熟悉Kubernetes的朋友会发现这个步骤实际上就是在编写PodSpec:

container_op=(name=name,image='train-image',arguments=['--input_dir',input_dir,'--output_dir',output_dir,'--model_name',model_name,'--model_version',model_version,'--epochs',epochs],file_outputs={'output':'/'})container__volume(k8s_(host_path=k8s_(path=persistent_volume_path),name=persistent_volume_name))container__volume_mount(k8s_(mount_path=persistent_volume_path,name=persistent_volume_name))

利用原生定义的_ops的好处在于灵活,由于开放了和Pipelines的交互接口,用户可以在container_ops这个层面做许多事情。但是它的问题在于:

复用度低,每个Component都需要构建镜像和开发运行时代码

复杂度高,使用者需要了解Kubernetes的概念,比如resourcelimit,PVC,nodeselector等一系列概念

支持分布式训练困难,由于container_op为单容器操作,如果需要支持分布式训练就需要在container_ops中提交和管理类似TFJob的任务。这里会带来复杂度和安全性的双重挑战,复杂度比较好理解,安全性是说提交TFJob这类任务的权限会需要开放额外的权限给Pipeline的开发者。

另一种方式是使用arena_op这种可以重用的ComponentAPI,它使用通用运行时代码,可以免去重复构建运行时代码的工作;同时利用通用一套的arena_opAPI简化用户的使用;也支持ParameterServer和MPI等场景。建议您使用这种方式编译Pipelines

2.将构建好的Component(组件)拼接成Pipeline(流水线)

3.将Pipeline(流水线)编译后Argo的执行引擎(Argo)识别的DAG配置文件,并提交的DAG配置文件到KubeflowPipelines,并利用KubeflowPipelines自身的UI查看流程结果。

最新文章