在阅读本文之前,假设已经在 GCP 上安装好了 Kubeflow。

首先进入 Kubeflow,点击 Notebook Server,新建一个 Jupyter Notebook

新建的时候会让你输入 Name 和 Namespace,在 Kubeflow 中,每个用户都在 k8s 集群上有自己的 Namespace。

这里输入的 Name 对应的 Notebook Pod 最后会在自己的 Namespace 下。

新的 Notebook 里面是空的,我们需要下载一些例子。打开 terminal

然后输入 git clone 命令:

git clone https://github.com/kubeflow/examples.git

回到默认界面会看到刚刚 clone 的项目,打开 mnist 目录下的 mnist_gcp.ipynb

开始

首先第一个问题,当我打开这个 Jupyter Notebook 的 WebUI 时,它运行在哪里?

Notebook 是在哪个 Pod

$ kubectl -n Your-namespace get pod

NAME                                 READY   STATUS      RESTARTS   AGE
fairing-builder-chvkq-6s4cn          0/1     Completed   0          3d23h
mnist-model-7886dcbb5b-t2kk8         1/1     Running     0          3d22h
mnist-tensorboard-774c585b7c-65766   2/2     Running     0          21h
mnist-train-2596-chief-0             0/1     Completed   0          3d22h
mnist-train-2596-worker-0            0/1     Completed   0          3d22h
mnist-ui-7f95c8498b-xqsfs            2/2     Running     0          3d22h
test1-0                              2/2     Running     0          3d23h

test1-0 是之前在 UI里面创建 Notebook server 时定下的名字,于是test1-0

这个 Pod 的镜像其实是 TensorFlow 的基础上,为 Kubeflow 做了稍微一些定制。

进入到这个 Pod 里面之后用 ps 命令看一下

________                               _______________
___  __/__________________________________  ____/__  /________      __
__  /  _  _ \_  __ \_  ___/  __ \_  ___/_  /_   __  /_  __ \_ | /| / /
_  /   /  __/  / / /(__  )/ /_/ /  /   _  __/   _  / / /_/ /_ |/ |/ /
/_/    \___//_/ /_//____/ \____//_/    /_/      /_/  \____/____/|__/


You are running this container as user with ID 1000 and group 100,
which should map to the ID and group for your user on the Docker host. Great!

tf-docker ~ > ps aux
USER         PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
jovyan         1  0.0  0.0   4520   764 ?        Ss   Mar18   0:07 tini -- sh -c jupyter notebook --notebook-dir=/home/${NB_USER} --ip=0.0.0.0 --no-browser
jovyan         7  0.0  0.0   4628   788 ?        S    Mar18   0:00 sh -c jupyter notebook --notebook-dir=/home/${NB_USER} --ip=0.0.0.0 --no-browser --allow-
jovyan         8  0.0  0.2 801768 67724 ?        Sl   Mar18   0:55 /usr/bin/python3 /usr/local/bin/jupyter-notebook --notebook-dir=/home/jovyan --ip=0.0.0.0
jovyan        61  0.0  0.0   4628  1680 pts/0    Ss+  Mar18   0:00 /bin/sh -l
jovyan        76  0.0  0.4 1875928 139504 ?      Ssl  Mar18   5:35 /usr/bin/python3 -m ipykernel_launcher -f /home/jovyan/.local/share/jupyter/runtime/kerne
jovyan       545  0.0  0.0  20180  3856 pts/1    Ss+  02:04   0:00 bash

第一行的 TensorFlow logo 说明了这个image 是基于 TensorFlow 的。

其中 /usr/bin/python3 -m ipykernel_launcher 这个进程对应 WebUI 里面的 notebook,每当你打开一个新的 Notebook 写代码时,Pod 里面就会多出一个进程。

其中,ipykernel 是一个交互式 python 模块。

Step-by-Step

一开始是执行一些 import 操作。

from importlib import reload
from oauth2client.client import GoogleCredentials
credentials = GoogleCredentials.get_application_default()

import notebook_setup
reload(notebook_setup)
notebook_setup.notebook_setup()

这些 import 的模块应该都是定制 TensorFlow image 中预先配置好的。如果想知道源码具体位置,可以

import name
printf(name.__file__)

notebook_setup.py 模块是在 example/mnist 目录下。

在 notebook_setup.py 中 setup_gcp(): 中可以直接调用 gcloud 命令,并且已经配置好可以直接访问当前 project,猜测是因为 GCP 定制的 kubeflow image 预先设置好了 ?

from kubernetes import client as k8s_client
from kubernetes.client import rest as k8s_rest
from kubeflow import fairing   
from kubeflow.fairing import utils as fairing_utils
from kubeflow.fairing.builders import append
from kubeflow.fairing.deployers import job
from kubeflow.fairing.preprocessors import base as base_preprocessor

# Setting up Google Container Registry (GCR) for storing output containers.
# You can use any Docker container registry instead of GCR.
GCP_PROJECT = fairing.cloud.gcp.guess_project_name()
DOCKER_REGISTRY = 'gcr.io/{}/fairing-job'.format(GCP_PROJECT)
namespace = fairing_utils.get_current_k8s_namespace()

再 import kubeflow 的一些模块,其中比较有意思的是 kubeflow.fairing。 导入这个模块后,可以很方便的获取 GCP 的配置,比如 project name, namespace 等。

Use Kubeflow Fairing to build the Docker

这里有意思的是用 kaniko-project 在 GKE 上 build image。

有意思在什么地方呢? 我们知道 GKE 上的运行单位是 Pod,Pod 本身就是容器的集合,一般我们都是在本地制作好 image 后 push 到 gcr 上,而这个项目是在 Pod 里面 build image 后直接 push 到 gcr。

具体参考 kaniko项目

这一步是 build 什么 docker image 呢 ?

在 example mnist 目录下,有一个 Dockerfile.model

> cat Dockerfile.model
#This container contains your model and any helper scripts specific to your model.
# When building the image inside mnist.ipynb the base docker image will be overwritten
FROM tensorflow/tensorflow:1.15.2-py3

ADD model.py /opt/model.py
RUN chmod +x /opt/model.py

ENTRYPOINT ["/usr/local/bin/python"]
CMD ["/opt/model.py"]

看来这个 image 是用来训练模型的。

最后生成的 image 被push 到了 gcr.io 上。

Built image gcr.io/project-name/fairing-job/mnist:85809C08

Create a Cloud Storage bucket

这一步没什么好说的,就是 google cloud 的 api 调用。

from google.cloud import storage
bucket = f"{GCP_PROJECT}-mnist"

client = storage.Client()
b = storage.Bucket(client=client, name=bucket)

Distributed training

这一步就是启动一个 Pod,它的 deployment 类型是 TensorFlow 定制的 TFJob, 所用的 image 和保存数据的 bucket 都是前两步准备好的。

train_name = f"mnist-train-{uuid.uuid4().hex[:4]}"
num_ps = 1
num_workers = 2
model_dir = f"gs://{bucket}/mnist"
export_path = f"gs://{bucket}/mnist/export" 
train_steps = 200
batch_size = 100
learning_rate = .01
image = cluster_builder.image_tag

训练的数据从哪来

那么 model.py 训练的数据从哪来呢? 在 example 目录似乎并没有看到。这要从 mnist 说起,MNIST 是 Modified National Institute of Standards and Technology 的缩写,这是一个手写数字的数据库

因此,model.py 中导入的库已经会自动下载训练数据。

然后,调用 kubeflow.tfjob.api 提供的函数 apply 这些配置。其作用等效于 kubectl apply, 但是因为 yaml 中有变量,所以只能用函数的方式。

from kubeflow.tfjob.api import tf_job_client as tf_job_client_module

tf_job_client = tf_job_client_module.TFJobClient()

tf_job_body = yaml.safe_load(train_spec)
tf_job = tf_job_client.create(tf_job_body, namespace=namespace)  
logging.info(f"Created job {namespace}.{train_name}")

另一个 apply 的函数是 k8s_util.apply_k8s_specs,在下文会遇到。

为什么叫 Distributed training 呢? 因为 TFJob yaml file 里有三个角色:PS,Chief,Worker。

Deploy TensorBoard

TensorBoard 是一个调参的 WebUI,不是本文的重点,先跳过。

Serve the model

Serving 一个模型用的是 TensorFlow 提供的 tensorflow/serving:1.15.0 镜像,TensorFlow serving 是一个用 C++ 编写的 serving 程序。

从给出的 YAML file 能看出,传入的参数包括了前面训练后生成的模型,它保存在 model_base_path = export_path 中,而export_path 在训练模型那一步已经指定。

其实说到底,最后用户是与 serving 打交道,因此这个 Pod 需要一个对应的 k8s Service 类型,YAML file 中指定了 9000 和 8500 端口,前者给 grpc 调用,后者是 http 接口。

Deploy the UI for the MNIST web

前面已经把训练模型、开启serving 都做好了,最后一步,是给一个用户界面让用户使用,因此这个 UI 是一个单独的组件。

从源码的 web-ui 目录中可以看出,是用 python flask 框架提供简单的 http server 服务,以及一个 Dockerfile。

其中最重要的是 mnist_client.py 文件中的 get_prediction() 函数,由它调用后端的 Model Serving 服务。

UI 和 Pod 内通信

那么 UI 是怎么与 Model Serving 通信的呢? 最主要的是调用 grpc 和 tensorflow_serving 的 python API。 至于 grpc,前面在部署 TensorFlow serving 的时候 service 文件提供了 grpc 和 http 端口。

from grpc.beta import implementations
import tensorflow as tf
from tensorflow_serving.apis import predict_pb2
from tensorflow_serving.apis import prediction_service_pb2

def get_prediction(image, server_host='mnist-service', server_port=9000,
                   server_name="mnist", timeout=10.0):
  """
  Retrieve a prediction from a TensorFlow model server

  :param image:       a MNIST image represented as a 1x784 array
  :param server_host: the address of the TensorFlow server
  :param server_port: the port used by the server
  :param server_name: the name of the server
  :param timeout:     the amount of time to wait for a prediction to complete
  :return 0:          the integer predicted in the MNIST image
  :return 1:          the confidence scores for all classes
  :return 2:          the version number of the model handling the request
  """
  print("connecting to:%s:%i" % (server_host, server_port))
  # initialize to server connection
  channel = implementations.insecure_channel(server_host, server_port)
  stub = prediction_service_pb2.beta_create_PredictionService_stub(channel)

  # build request
  request = predict_pb2.PredictRequest()
  request.model_spec.name = server_name
  request.model_spec.signature_name = 'serving_default'
  request.inputs['x'].CopyFrom(
      tf.contrib.util.make_tensor_proto(image, shape=image.shape))

  # retrieve results
  result = stub.Predict(request, timeout)
  resultVal = result.outputs["classes"].int_val[0]
  scores = result.outputs['predictions'].float_val
  version = result.outputs["classes"].int_val[0]
  return resultVal, scores, version

结束语

  1. 需要熟悉一堆第三方库怎么用。比如
  • 操作 GCP的 oauth2client.client
  • k8s_util
  • kubeflow,kubeflow.fairing.preprocessors
  • kubernetes
  • google.cloud
  1. 几个有意思的开源项目,比如 kubeflow fairing,在 GKE 里面 build image 的 kaniko 等。

参考资料