Kubeflow 1.0

https://www.kubeflow.org/docs/started/kubeflow-overview/

 


Reference: 쿠브플로우 (Kubeflow) by 이명환, https://github.com/mojokb

 

https://www.kubeflow.org/docs/started/k8s/overview/

 

Installing Kubeflow 1.1 requires Kubernetes 1.16.

Remove the previously installed Kubernetes 1.17.

 

Uninstall Kubernetes

kubectl drain {nodename} --delete-local-data --force --ignore-daemonsets
kubectl delete node {nodename}

kubeadm reset 
#on centos base
sudo yum remove kubeadm kubectl kubelet kubernetes-cni kube*
#on centos base
sudo yum autoremove
 
sudo rm -rf ~/.kube

 

 

 

Install specific version of Kubernetes 

 

Reference: Create Kubernetes Cluster

 

Install Kubeflow

https://www.kubeflow.org/docs/started/k8s/kfctl-k8s-istio/

 

Use the kfctl_k8s_istio config to install Kubeflow on an existing Kubernetes cluster.

 

local path provisioner

This requires a dynamic volume provisioner. Let's install the local path provisioner in the home-cloud environment.

 

https://github.com/rancher/local-path-provisioner#deployment

 

$ kubectl apply -f https://raw.githubusercontent.com/rancher/local-path-provisioner/master/deploy/local-path-storage.yaml

namespace/local-path-storage created
serviceaccount/local-path-provisioner-service-account created
clusterrole.rbac.authorization.k8s.io/local-path-provisioner-role created
clusterrolebinding.rbac.authorization.k8s.io/local-path-provisioner-bind created
deployment.apps/local-path-provisioner created
storageclass.storage.k8s.io/local-path created
configmap/local-path-config created

$ kubectl -n local-path-storage get pod

NAME                                      READY   STATUS    RESTARTS   AGE
local-path-provisioner-84f4c8b584-lmxkj   1/1     Running   0          59s

※ In a single-node environment, remove the taint from the master node.

$ kubectl taint node {nodename} node-role.kubernetes.io/master:NoSchedule-
node/kubeflow untainted

 

NFS provisioner

A multi-node k8s cluster needs an NFS provisioner.

 

$ yum install nfs-utils

$ mkdir /nfsroot

chmod -R 755 /nfsroot

chown nfsnobody:nfsnobody /nfsroot

systemctl enable rpcbind
systemctl enable nfs-server
systemctl enable nfs-lock
systemctl enable nfs-idmap
systemctl start rpcbind
systemctl start nfs-server
systemctl start nfs-lock
systemctl start nfs-idmap

 

$ vi /etc/exports

/nfsroot *(rw,no_root_squash,no_subtree_check)

 

$ helm install nfs-client-provisioner --set nfs.server=192.168.19.129 --set nfs.path=/nfsroot --set storageClass.name=nfs --set storageClass.defaultClass=true stable/nfs-client-provisioner

 

$ kubectl get storageclass

NAME            PROVISIONER                            AGE 
local-path      rancher.io/local-path                  71m 
nfs (default)   cluster.local/nfs-client-provisioner   12m

 

install private docker registry

 

https://github.com/mojokb/handson-kubeflow/blob/master/registry/kubeflow-registry-deploy.yaml

https://github.com/mojokb/handson-kubeflow/blob/master/registry/kubeflow-registry-svc.yaml

$ kubectl apply -f kubeflow-registry-deploy.yaml

$ kubectl apply -f kubeflow-registry-svc.yaml

 

 

On each Kubernetes node, configure the hosts file and the insecure-registries setting.

 

$ vi /etc/hosts

192.168.19.129  kubeflow-registry.default.svc.cluster.local


$ vi /etc/docker/daemon.json 

{
  "exec-opts": ["native.cgroupdriver=systemd"],
  "log-driver": "json-file",
  "log-opts": {
    "max-size": "100m"
  },
  "storage-driver": "overlay2",
  "storage-opts": [
    "overlay2.override_kernel_check=true"
  ],
  "insecure-registries": [
    "kubeflow-registry.default.svc.cluster.local:30000"
  ]
}

$ systemctl restart docker
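Because daemon.json has to be edited on every node, the insecure-registries change could also be scripted. The following is only a minimal sketch: the helper name `add_insecure_registry` is hypothetical, and it works on the JSON text rather than touching /etc/docker/daemon.json directly, so writing the file and restarting Docker are left to you.

```python
import json

# Registry address used throughout this post.
REGISTRY = "kubeflow-registry.default.svc.cluster.local:30000"


def add_insecure_registry(daemon_json_text, registry):
    """Return daemon.json text with `registry` listed under insecure-registries."""
    # An empty file is treated as an empty configuration.
    config = json.loads(daemon_json_text) if daemon_json_text.strip() else {}
    registries = config.setdefault("insecure-registries", [])
    if registry not in registries:  # keep the edit idempotent
        registries.append(registry)
    return json.dumps(config, indent=2)


if __name__ == "__main__":
    before = '{"storage-driver": "overlay2"}'
    print(add_insecure_registry(before, REGISTRY))
```

Running the helper twice on the same text leaves it unchanged, which makes it safe to repeat across nodes.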

 

$ curl kubeflow-registry.default.svc.cluster.local:30000/v2/_catalog

{"repositories":[]} 

 

 

install kustomize 

https://github.com/kubernetes-sigs/kustomize/blob/master/docs/INSTALL.md

 

$ curl -s "https://raw.githubusercontent.com/kubernetes-sigs/kustomize/master/hack/install_kustomize.sh" | bash

 

{Version:kustomize/v3.5.4 GitCommit:3af514fa9f85430f0c1557c4a0291e62112ab026 BuildDate:2020-01-11T03:12:59Z GoOs:linux GoArch:amd64}
kustomize installed to current directory.

$ mv kustomize /usr/local/bin

 

 

Download kubeflow 

https://github.com/kubeflow/kfctl/releases/tag/v1.0.1/

 

$ tar -xvf kfctl_v1.0.1-0-gf3edb9b_linux.tar.gz

 

Extracting the archive yields the kfctl binary; move it to /usr/local/bin.

 

Environment setup

export PATH=$PATH:"/usr/local/bin"

export KF_NAME=kfterry

export BASE_DIR=/root/kubeflow

export KF_DIR=${BASE_DIR}/${KF_NAME}

export CONFIG_URI="https://raw.githubusercontent.com/kubeflow/manifests/v1.0-branch/kfdef/kfctl_istio_dex.v1.0.2.yaml"

 

$ mkdir -p ${KF_DIR}

 

$ cd ${KF_DIR}

 

$ wget -O kfctl_istio_dex.yaml $CONFIG_URI

 

$ export CONFIG_FILE=${KF_DIR}/kfctl_istio_dex.yaml

 

$ kfctl apply -V -f ${CONFIG_FILE}

 

$ kubectl get pod -n kubeflow

NAME                                                           READY   STATUS      RESTARTS   AGE
admission-webhook-bootstrap-stateful-set-0                     1/1     Running     0          10m
admission-webhook-deployment-569558c8b6-tbp75                  1/1     Running     0          2m25s
application-controller-stateful-set-0                          1/1     Running     0          2m27s
argo-ui-7ffb9b6577-vw692                                       1/1     Running     0          10m
centraldashboard-659bd78c-p6pp5                                1/1     Running     0          2m24s
jupyter-web-app-deployment-679d5f5dc4-9cwrx                    1/1     Running     0          2m24s
katib-controller-7f58569f7d-xjwtt                              1/1     Running     1          10m
katib-db-manager-54b66f9f9d-w8rmf                              1/1     Running     0          2m24s
katib-mysql-dcf7dcbd5-8hdmn                                    1/1     Running     0          10m
katib-ui-6f97756598-z5dkj                                      1/1     Running     0          2m27s
kfserving-controller-manager-0                                 2/2     Running     1          10m
metacontroller-0                                               1/1     Running     0          10m
metadata-db-65fb5b695d-p7xjs                                   1/1     Running     0          10m
metadata-deployment-65ccddfd4c-hjrsr                           1/1     Running     0          10m
metadata-envoy-deployment-7754f56bff-859ck                     1/1     Running     0          10m
metadata-grpc-deployment-75f9888cbf-d77lc                      1/1     Running     6          10m
metadata-ui-7c85545947-lttcl                                   1/1     Running     0          2m25s
minio-69b4676bb7-7zwdr                                         1/1     Running     0          10m
ml-pipeline-5cddb75848-2km8g                                   1/1     Running     1          10m
ml-pipeline-ml-pipeline-visualizationserver-7f6fcb68c8-8bvmq   1/1     Running     0          10m
ml-pipeline-persistenceagent-6ff9fb86dc-d48z5                  1/1     Running     3          10m
ml-pipeline-scheduledworkflow-7f84b54646-rv64b                 1/1     Running     0          2m27s
ml-pipeline-ui-6758f58868-nxn6j                                1/1     Running     0          2m25s
ml-pipeline-viewer-controller-deployment-745dbb444d-cnhx7      1/1     Running     0          2m25s
mysql-6bcbfbb6b8-w5qjv                                         1/1     Running     0          2m27s
notebook-controller-deployment-5c55f5845b-mcbqs                1/1     Running     0          10m
profiles-deployment-c775584c7-hd2dk                            2/2     Running     0          10m
pytorch-operator-cf8c5c497-sc858                               1/1     Running     4          10m
seldon-controller-manager-6b4b969447-zdhph                     1/1     Running     2          10m
spark-operatorcrd-cleanup-pz6jh                                0/2     Completed   0          10m
spark-operatorsparkoperator-76dd5f5688-886k2                   1/1     Running     0          10m
spartakus-volunteer-5dc96f4447-z6d4x                           1/1     Running     0          10m
tensorboard-5f685f9d79-wrmg8                                   1/1     Running     0          2m27s
tf-job-operator-5fb85c5fb7-rpc5x                               1/1     Running     0          2m25s
workflow-controller-689d6c8846-c7q9r                           1/1     Running     0          10m

$ kubectl get pv

NAME                                       CAPACITY   ACCESS MODES   RECLAIM POLICY   STATUS   CLAIM                     STORAGECLASS   REASON   AGE
pvc-03be65bc-26a1-411c-9a83-6c63395c25a6   20Gi       RWO            Delete           Bound    kubeflow/minio-pv-claim   nfs                     116s
pvc-50256b14-f9ac-4b73-bea5-b9482a81f164   10Gi       RWO            Delete           Bound    kubeflow/katib-mysql      nfs                     116s
pvc-51bb8698-7fc7-4b6a-93f6-3e57aaad83d0   10Gi       RWO            Delete           Bound    kubeflow/metadata-mysql   nfs                     6m46s
pvc-b4c1de43-fb75-4d8e-b999-817f65752a2d   20Gi       RWO            Delete           Bound    kubeflow/mysql-pv-claim   nfs                     116s

$ kubectl get pod -n istio-system

NAME                                                         READY   STATUS              RESTARTS   AGE
authservice-0                                                1/1     Running             1          38m
cluster-local-gateway-f4967d447-dsfwh                        0/1     Running             0          35s
istio-citadel-79b5b568b-w4bwt                                1/1     Running             1          12m
istio-galley-756f5f45c4-79gbl                                1/1     Running             1          39m
istio-ingressgateway-77f74c944c-f47cg                        0/1     ContainerCreating   0          35s
istio-nodeagent-wf9sc                                        1/1     Running             1          39m
istio-pilot-55f7f6f6df-fkcsc                                 0/2     ContainerCreating   0          35s
istio-policy-76dbd68445-jsb6r                                0/2     ContainerCreating   0          35s
istio-security-post-install-release-1.3-latest-daily-gtsw8   0/1     Completed           0          39m
istio-sidecar-injector-5d9f474dcb-d7sgk                      1/1     Running             1          39m
istio-telemetry-697c8fd794-dq2tw                             0/2     ContainerCreating   0          35s
prometheus-b845cc6fc-jkzhv                                   1/1     Running             1          39m

If some pods fail to start and you cannot connect normally, add the following flags to the kube-apiserver manifest:

 

$ vi /etc/kubernetes/manifests/kube-apiserver.yaml

    - --service-account-issuer=kubernetes.default.svc
    - --service-account-signing-key-file=/etc/kubernetes/pki/sa.key

 

$ kubectl port-forward svc/istio-ingressgateway -n istio-system 8080:80

 

Kubeflow Dashboard

To reach the dashboard from a PC, connect through the NodePort.

(For the Dex version, log in with admin@kubeflow.org / 12341234.)

 

$ kubectl get service istio-ingressgateway -n istio-system

NAME                   TYPE           CLUSTER-IP     EXTERNAL-IP   PORT(S)                                                                                                                                      AGE
istio-ingressgateway   LoadBalancer   10.107.21.45        15020:31501/TCP,80:31717/TCP,443:31682/TCP,31400:32025/TCP,15029:32270/TCP,15030:31088/TCP,15031:30360/TCP,15032:30391/TCP,15443:30494/TCP   3d9h


Open http://{server IP}:31717 in a web browser on the PC.

 

Kubeflow networking structure

Inbound network traffic enters through istio-ingressgateway and is mapped to individual pods by Gateway and VirtualService resources.

A default Kubeflow install configures three gateways:

$ kubectl get gateway --all-namespaces

NAMESPACE         NAME                      AGE 
knative-serving   cluster-local-gateway     18h 
knative-serving   knative-ingress-gateway   18h 
kubeflow          kubeflow-gateway          18h

Of these, the virtual services are mapped to kubeflow-gateway.

$ kubectl get virtualservice --all-namespaces

NAMESPACE   NAME                            GATEWAYS                      HOSTS                      AGE 
kubeflow    argo-ui                         [kubeflow-gateway]            [*]                        18h 
kubeflow    centraldashboard                [kubeflow-gateway]            [*]                        18h 
kubeflow    google-api-vs                                                 [www.googleapis.com]       18h 
kubeflow    google-storage-api-vs                                         [storage.googleapis.com]   18h 
kubeflow    grafana-vs                      [kubeflow-gateway]            [*]                        18h 
kubeflow    jupyter-web-app                 [kubeflow-gateway]            [*]                        18h 
kubeflow    katib-ui                        [kubeflow-gateway]            [*]                        18h 
kubeflow    kfam                            [kubeflow-gateway]            [*]                        18h 
kubeflow    metadata-grpc                   [kubeflow-gateway]            [*]                        18h 
kubeflow    metadata-ui                     [kubeflow-gateway]            [*]                        18h 
kubeflow    ml-pipeline-tensorboard-ui      [kubeflow-gateway]            [*]                        18h 
kubeflow    ml-pipeline-ui                  [kubeflow-gateway]            [*]                        18h 
kubeflow    tensorboard                     [kubeflow-gateway]            [*]                        18h 
terry       notebook-terry-customnotebook   [kubeflow/kubeflow-gateway]   [*]                        14h 
terry       notebook-terry-notebook         [kubeflow/kubeflow-gateway]   [*]                        16h

kubeflow-gateway handles port 80 for all hosts ('*') arriving at the Istio ingressgateway.

$ kubectl get gateway kubeflow-gateway -n kubeflow -o yaml

apiVersion: networking.istio.io/v1alpha3 
kind: Gateway 
metadata: 
  name: kubeflow-gateway 
  namespace: kubeflow 
spec: 
  selector: 
    istio: ingressgateway 
  servers: 
  - hosts: 
    - '*' 
    port: 
      name: http 
      number: 80 
      protocol: HTTP 

Each Kubeflow component is distinguished by its own URI prefix.

$ kubectl get virtualservice katib-ui -n kubeflow -o yaml

apiVersion: networking.istio.io/v1alpha3 
kind: VirtualService 
metadata: 
  name: katib-ui 
  namespace: kubeflow 
spec: 
  gateways: 
  - kubeflow-gateway 
  hosts: 
  - '*' 
  http: 
  - match: 
    - uri: 
        prefix: /katib/ 
    rewrite: 
      uri: /katib/ 
    route: 
    - destination: 
        host: katib-ui.kubeflow.svc.cluster.local 
        port: 
          number: 80 
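To make the prefix idea concrete, here is a rough Python model of how a request path could be mapped to a backend service. It is a simplification and not how Istio is actually implemented. Only the /katib/ entry comes from the output above; the other two routes and the longest-prefix rule are assumptions made for the sketch.

```python
# Simplified model of URI-prefix routing; not Istio's actual implementation.
ROUTES = [
    # (prefix, destination host) -- only the /katib/ entry comes from the output above
    ("/katib/", "katib-ui.kubeflow.svc.cluster.local"),
    ("/pipeline/", "ml-pipeline-ui.kubeflow.svc.cluster.local"),  # hypothetical
    ("/", "centraldashboard.kubeflow.svc.cluster.local"),  # hypothetical catch-all
]


def resolve(path, routes=ROUTES):
    """Return the destination host of the longest prefix that matches `path`."""
    matches = [(prefix, host) for prefix, host in routes if path.startswith(prefix)]
    if not matches:
        return None
    # Prefer the most specific (longest) prefix; an assumption of this sketch.
    return max(matches, key=lambda match: len(match[0]))[1]


if __name__ == "__main__":
    for path in ("/katib/experiments", "/pipeline/runs", "/"):
        print(path, "->", resolve(path))
```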

 

Notebook Servers

https://www.kubeflow.org/docs/notebooks/setup/

 

 

You can create a Jupyter notebook server from the Kubeflow dashboard.

Let's create one from a custom image that also includes the Kubeflow Fairing and Pipelines libraries.

!pip3 install kfp --upgrade --user

 

Run the sample code in the Jupyter notebook.

from tensorflow.examples.tutorials.mnist import input_data
mnist = input_data.read_data_sets("MNIST_data/", one_hot=True)

import tensorflow as tf

x = tf.placeholder(tf.float32, [None, 784])

W = tf.Variable(tf.zeros([784, 10]))
b = tf.Variable(tf.zeros([10]))

y = tf.nn.softmax(tf.matmul(x, W) + b)

y_ = tf.placeholder(tf.float32, [None, 10])
cross_entropy = tf.reduce_mean(-tf.reduce_sum(y_ * tf.log(y), reduction_indices=[1]))

train_step = tf.train.GradientDescentOptimizer(0.05).minimize(cross_entropy)

sess = tf.InteractiveSession()
tf.global_variables_initializer().run()

for _ in range(1000):
  batch_xs, batch_ys = mnist.train.next_batch(100)
  sess.run(train_step, feed_dict={x: batch_xs, y_: batch_ys})

correct_prediction = tf.equal(tf.argmax(y,1), tf.argmax(y_,1))
accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
print("Accuracy: ", sess.run(accuracy, feed_dict={x: mnist.test.images, y_: mnist.test.labels}))

 

Jupyter notebook terminal

Configure the hosts file in the notebook server container

$ vi /etc/hosts

192.168.19.134 kubeflow-registry.default.svc.cluster.local

 

Upgrade Kubeflow Fairing

!pip install kubeflow-fairing --upgrade --user

 

Fairing

https://www.kubeflow.org/docs/fairing/install-fairing/#set-up-kubeflow-fairing-in-a-hosted-jupyter-notebook

 

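Run the sample Fairing code below in the Jupyter notebook.

It builds a Docker image on top of tensorflow/tensorflow:1.14.0-py3, pushes it to the registry, and deploys a Kubernetes job that prints the HOSTNAME OS environment variable.

The job starts a pod, which prints the result and then goes away.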

import os
import tensorflow as tf
from kubeflow import fairing

DOCKER_REGISTRY = 'kubeflow-registry.default.svc.cluster.local:30000'

fairing.config.set_builder(
    'append',
    base_image='tensorflow/tensorflow:1.14.0-py3',
    registry=DOCKER_REGISTRY, push=True)

fairing.config.set_deployer('job')

def train():
        hostname = tf.constant(os.environ['HOSTNAME'])
        sess = tf.Session()
        print('Hostname: ', sess.run(hostname).decode('utf-8'))
        
if __name__ == '__main__':
        remote_train = fairing.config.fn(train)
        remote_train()

 

Katib

 

The hyperparameter tuning tool provided by Kubeflow.

Run the HP sample: https://github.com/mojokb/handson-kubeflow/blob/master/katib/mnist_experiment_random.yaml

KFServing

$ git clone https://github.com/kubeflow/kfserving.git

Cloning into 'kfserving'...
remote: Enumerating objects: 5, done.
remote: Counting objects: 100% (5/5), done.
remote: Compressing objects: 100% (5/5), done.
remote: Total 35452 (delta 1), reused 2 (delta 0), pack-reused 35447
Receiving objects: 100% (35452/35452), 83.49 MiB | 11.95 MiB/s, done.
Resolving deltas: 100% (12541/12541), done.
Checking out files: 100% (18841/18841), done.

$ cd kfserving

$ TAG=v0.3.0

$ kubectl apply -f ./install/$TAG/kfserving.yaml

namespace/kfserving-system created
customresourcedefinition.apiextensions.k8s.io/inferenceservices.serving.kubeflow.org configured
clusterrole.rbac.authorization.k8s.io/kfserving-proxy-role configured
clusterrole.rbac.authorization.k8s.io/manager-role configured
clusterrolebinding.rbac.authorization.k8s.io/kfserving-proxy-rolebinding configured
clusterrolebinding.rbac.authorization.k8s.io/manager-rolebinding configured
configmap/inferenceservice-config created
secret/kfserving-webhook-server-secret created
service/kfserving-controller-manager-metrics-service created
service/kfserving-controller-manager-service created
statefulset.apps/kfserving-controller-manager created

 

Install the KFServing SDK in the Jupyter notebook

$ pip install kfserving --user

$ pip install kfserving --upgrade --user

 

$ /usr/bin/python3 -m pip install --upgrade pip

$ pip freeze | grep kf

Run the KFServing sample

https://github.com/kubeflow/kfserving/blob/master/docs/samples/client/kfserving_sdk_sample.ipynb

 

 

 

Pipeline

 

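A platform for building and deploying end-to-end ML workflows (it uses Argo Workflows).

Component : a set of code that performs one step of the ML workflow

Graph : represents the runtime execution of a pipeline

Run : a single execution of a pipeline

Recurring Run : a run that executes a pipeline periodically

Run Trigger : periodic (for example, every 30 minutes) or cron

Step : the execution of one component in a pipeline

Experiment : a workspace for running pipelines; a logical grouping of pipeline runs

Output Artifact : the output of a pipeline component; it helps you understand how the component behaved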
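The relationship between these terms can be pictured with a toy model in plain Python. This is not the kfp SDK: the component names (`preprocess`, `train`, `evaluate`) and the `run_pipeline` helper are hypothetical, and the only point is that a run executes each component once, as a step, in the order given by the graph.

```python
from graphlib import TopologicalSorter  # Python 3.9+


# Hypothetical components: each one is a unit of work in the workflow.
def preprocess():
    return "preprocessed"


def train():
    return "trained"


def evaluate():
    return "evaluated"


COMPONENTS = {"preprocess": preprocess, "train": train, "evaluate": evaluate}

# Graph: each key maps to the set of components it depends on.
GRAPH = {"preprocess": set(), "train": {"preprocess"}, "evaluate": {"train"}}


def run_pipeline(graph, components):
    """One run: execute every component once (a step) in dependency order."""
    outputs = {}
    for name in TopologicalSorter(graph).static_order():
        outputs[name] = components[name]()  # a step is one execution of a component
    return outputs


if __name__ == "__main__":
    print(run_pipeline(GRAPH, COMPONENTS))
```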

 

Creating a pipeline with the SDK

Write the containerop.py file

import kfp.dsl as dsl

def echo_op():
    return dsl.ContainerOp(
        name='echo',
        image='library/bash:4.4.23',
        command=['sh', '-c'],
        arguments=['echo "Hello World"']
    )

@dsl.pipeline(
    name='ContainerOp pipeline',
    description='ContainerOp'
)
def hello_world_pipeline():
    echo_task = echo_op()


if __name__ == "__main__":
    import kfp.compiler as compiler
    compiler.Compiler().compile(hello_world_pipeline, 'containerop.pipeline.tar.gz')

This produces the containerop.pipeline.tar.gz file.

 

Upload pipeline

Create Run

 

Hands-on Kubeflow

https://github.com/mojokb/handson-kubeflow 

https://github.com/mojokb/kubeflow-book 

 

Run Fashion MNIST with TensorFlow Keras in a Jupyter notebook.

* Keras : a deep learning framework for Python that makes it easy to build and train almost any kind of deep learning model

* MNIST

* Fashion MNIST

Run fashion-mnist

import tensorflow as tf

class MyFashionMnist(object):
  def train(self):

    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.fashion_mnist.load_data()
    x_train, x_test = x_train / 255.0, x_test / 255.0

    model = tf.keras.models.Sequential([
      tf.keras.layers.Flatten(input_shape=(28, 28)),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dropout(0.2),
      tf.keras.layers.Dense(10, activation='softmax')
    ])
    model.summary()
    model.compile(optimizer='adam',
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])

    model.fit(x_train, y_train, epochs=10)

    model.evaluate(x_test,  y_test, verbose=2)

if __name__ == '__main__':
    local_train = MyFashionMnist()
    local_train.train()

Convert Fashion MNIST to a Fairing job

: The goal is to wrap the Fashion MNIST code in Fairing code so that it builds the image, pushes it, and submits the job to the k8s cluster.

https://github.com/mojokb/kubeflow-book/blob/master/dist-fashion-mnist-fairing-.ipynb

import tensorflow as tf
import os

class MyFashionMnist(object):
  def train(self):

    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.fashion_mnist.load_data()
    x_train, x_test = x_train / 255.0, x_test / 255.0

    model = tf.keras.models.Sequential([
      tf.keras.layers.Flatten(input_shape=(28, 28)),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dropout(0.2),
      tf.keras.layers.Dense(10, activation='softmax')
    ])
    model.summary()
    model.compile(optimizer='adam',
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])

    model.fit(x_train, y_train, epochs=10)

    model.evaluate(x_test,  y_test, verbose=2)

if __name__ == '__main__':
    if os.getenv('FAIRING_RUNTIME', None) is None:
        from kubeflow import fairing
        from kubeflow.fairing.kubernetes import utils as k8s_utils

        DOCKER_REGISTRY = 'kubeflow-registry.default.svc.cluster.local:30000'
        fairing.config.set_builder(
            'append',
            image_name='fairing-job',
            base_image='brightfly/kubeflow-jupyter-lab:tf2.0-cpu',
            registry=DOCKER_REGISTRY, 
            push=True)
        # cpu 1, memory 3GiB
        fairing.config.set_deployer('job',
                                    namespace='terry',
                                    pod_spec_mutators=[
                                        k8s_utils.get_resource_mutator(cpu=1,
                                                                       memory=3)]
         
                                   )
        fairing.config.run()
    else:
        remote_train = MyFashionMnist()
        remote_train.train()

The Fairing job pod fairing-job-ffkzf-4k72s starts, runs, and then goes away.

$ kubectl get all -n terry

NAME                          READY   STATUS    RESTARTS   AGE
pod/customnotebook-0          2/2     Running   2          16h
pod/fairing-job-ffkzf-4k72s   1/1     Running   0          55s

NAME                     TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)   AGE
service/customnotebook   ClusterIP   10.111.82.243           80/TCP    16h

NAME                              READY   AGE
statefulset.apps/customnotebook   1/1     16h

NAME                          COMPLETIONS   DURATION   AGE
job.batch/fairing-job-ffkzf   0/1           55s        56s
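The Fairing script above runs twice: once in the notebook, where it builds, pushes, and submits, and once inside the job pod, where it trains. It tells the two apart by checking whether the FAIRING_RUNTIME environment variable is set. Below is a stripped-down sketch of that dispatch pattern with no Fairing dependency. The `launch`, `train`, and `main` functions are hypothetical stand-ins for the two branches.

```python
import os


def launch():
    """Stand-in for the notebook-side branch (build image, push, submit job)."""
    return "launcher: build, push, submit job"


def train():
    """Stand-in for the pod-side branch (the actual training)."""
    return "trainer: run training"


def main(environ=os.environ):
    # The script above relies on FAIRING_RUNTIME being set inside the job container.
    if environ.get("FAIRING_RUNTIME") is None:
        return launch()
    return train()


if __name__ == "__main__":
    print(main())
```

Passing the environment as an argument keeps both branches easy to try out without actually submitting a job.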

 

Fashion MNIST HP tuning with Katib

Partially modified (the logger function is implemented as a Keras callback)

https://github.com/mojokb/kubeflow-book/blob/master/fashion-mnist-katib.ipynb

import tensorflow as tf
import os
import argparse
from tensorflow.python.keras.callbacks import Callback



class MyFashionMnist(object):
  def train(self):
    
    # Accept the hyperparameters as command-line arguments.
    parser = argparse.ArgumentParser()
    parser.add_argument('--learning_rate', required=False, type=float, default=0.01)
    parser.add_argument('--dropout_rate', required=False, type=float, default=0.2)
    parser.add_argument('--opt', required=False, type=int, default=1)    
    args = parser.parse_args()    
    
    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.fashion_mnist.load_data()
    x_train, x_test = x_train / 255.0, x_test / 255.0

    model = tf.keras.models.Sequential([
      tf.keras.layers.Flatten(input_shape=(28, 28)),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dropout(args.dropout_rate),
      tf.keras.layers.Dense(10, activation='softmax')
    ])

    model.summary()
    
    sgd = tf.keras.optimizers.SGD(lr=args.learning_rate)
    adam = tf.keras.optimizers.Adam(lr=args.learning_rate)

    optimizers= [sgd, adam]
    model.compile(optimizer=optimizers[args.opt],
                  loss='sparse_categorical_crossentropy',
                  metrics=['acc'])

    model.fit(x_train, y_train,
              verbose=0,
              validation_data=(x_test, y_test),
              epochs=5,
              callbacks=[KatibMetricLog()])

    model.evaluate(x_test,  y_test, verbose=2)

class KatibMetricLog(Callback):
    def on_batch_end(self, batch, logs={}):
        print("batch=" + str(batch),
              "accuracy=" + str(logs.get('acc')),
              "loss=" + str(logs.get('loss')))
    def on_epoch_begin(self, epoch, logs={}):
        print("epoch " + str(epoch) + ":")
    
    def on_epoch_end(self, epoch, logs={}):
        print("Validation-accuracy=" + str(logs.get('val_acc')),
              "Validation-loss=" + str(logs.get('val_loss')))
        return

if __name__ == '__main__':
    if os.getenv('FAIRING_RUNTIME', None) is None:
        from kubeflow import fairing
        from kubeflow.fairing.kubernetes import utils as k8s_utils

        DOCKER_REGISTRY = 'kubeflow-registry.default.svc.cluster.local:30000'
        fairing.config.set_builder(
            'append',
            image_name='fairing-job',
            base_image='brightfly/kubeflow-jupyter-lab:tf2.0-cpu',
            registry=DOCKER_REGISTRY, 
            push=True)

        fairing.config.set_deployer('job',
                                    namespace='terry',
                                    pod_spec_mutators=[
                                        k8s_utils.get_resource_mutator(cpu=2, memory=5)]
                                   )
        fairing.config.run()
    else:
        remote_train = MyFashionMnist()
        remote_train.train()
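The KatibMetricLog callback prints metrics as name=value pairs so that they can be picked out of the pod's standard output. The sketch below shows how such lines could be parsed. The regular expression is a simplified stand-in chosen for this example, not Katib's own filter, and `parse_metrics` is a hypothetical helper.

```python
import re

# Simplified pattern for "name=value" pairs; an assumption, not Katib's own filter.
METRIC_RE = re.compile(r"([\w-]+)=(-?\d+(?:\.\d+)?(?:[eE][-+]?\d+)?)")


def parse_metrics(line, wanted):
    """Return {metric name: float} for the wanted metrics found in one log line."""
    return {
        name: float(value)
        for name, value in METRIC_RE.findall(line)
        if name in wanted
    }


if __name__ == "__main__":
    wanted = {"Validation-accuracy", "Validation-loss", "accuracy", "loss"}
    for line in (
        "epoch 3:",
        "batch=12 accuracy=0.81 loss=0.52",
        "Validation-accuracy=0.8712 Validation-loss=0.3561",
    ):
        print(parse_metrics(line, wanted))
```

The metric names have to match objectiveMetricName and additionalMetricNames in the Experiment exactly, including the capital V in Validation-accuracy.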

 

Create the Katib Experiment CRD

 

https://github.com/mojokb/kubeflow-book/blob/master/fashion-katib-random-v2.yaml

Enter the tag of the image that was just built (23CEC3F1).

/app/fashion-mnist-katib.py is the name of the .py file that was just run through Fairing.

Submit it from the Katib UI.

apiVersion: "kubeflow.org/v1alpha3"
kind: Experiment
metadata:
  namespace: terry
  labels:
    controller-tools.k8s.io: "1.0"
  name: fashion-mnist-cpu-experiment-v2-1
spec:
  parallelTrialCount: 5 
  maxTrialCount: 30
  maxFailedTrialCount: 3
  objective:
    type: maximize
    goal: 0.99  
    objectiveMetricName: Validation-accuracy
    additionalMetricNames:
      - accuracy
      - loss
      - Validation-loss
  algorithm:
    algorithmName: random
  trialTemplate:
    goTemplate:
        rawTemplate: |-
          apiVersion: batch/v1
          kind: Job
          metadata:
            name: {{.Trial}}
            namespace: {{.NameSpace}}
          spec:
            template:
              spec:
                containers:
                - name: {{.Trial}}
                  image: kubeflow-registry.default.svc.cluster.local:30000/fairing-job:23CEC3F1
                  command:
                  - "python"
                  - "/app/fashion-mnist-katib.py"
                  {{- with .HyperParameters}}
                  {{- range .}}
                  - "{{.Name}}={{.Value}}"
                  {{- end}}
                  {{- end}}
                restartPolicy: Never
  parameters:
    - name: --learning_rate
      parameterType: double
      feasibleSpace:
        min: "0.0005"
        max: "0.0015"
    - name: --dropout_rate
      parameterType: double
      feasibleSpace:
        min: "0.1"
        max: "0.9"
    - name: --layer
      parameterType: int
      feasibleSpace:
        min: "1"
        max: "5"        
    - name: --epoch
      parameterType: int
      feasibleSpace:
        min: "5"
        max: "15"             
    - name: --act
      parameterType: categorical
      feasibleSpace:
        list: # relu, sigmoid, softmax, tanh
        - "relu"
        - "sigmoid" 
        - "softmax"         
        - "tanh"

 

$ kubectl get all -n terry

NAME                                                            READY   STATUS    RESTARTS   AGE
pod/customnotebook-0                                            2/2     Running   2          20h
pod/fashion-mnist-cpu-experiment-v2-1-2jrtrw7v-r47rm            2/2     Running   0          82s
pod/fashion-mnist-cpu-experiment-v2-1-f6cxb6ws-tg44t            2/2     Running   0          81s
pod/fashion-mnist-cpu-experiment-v2-1-kq7nr9gx-n89mt            2/2     Running   0          80s
pod/fashion-mnist-cpu-experiment-v2-1-lbs27rtn-4vrs7            2/2     Running   0          82s
pod/fashion-mnist-cpu-experiment-v2-1-random-6798568796-7hkvs   1/1     Running   0          2m2s
pod/fashion-mnist-cpu-experiment-v2-1-tcdpdmbx-pzp96            2/2     Running   0          80s


NAME                                               TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)    AGE
service/customnotebook                             ClusterIP   10.111.82.243           80/TCP     20h
service/fashion-mnist-cpu-experiment-v2-1-random   ClusterIP   10.96.49.0              6789/TCP   2m2s


NAME                                                       READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/fashion-mnist-cpu-experiment-v2-1-random   1/1     1            1           2m2s

NAME                                                                  DESIRED   CURRENT   READY   AGE
replicaset.apps/fashion-mnist-cpu-experiment-v2-1-random-6798568796   1         1         1       2m2s

NAME                              READY   AGE
statefulset.apps/customnotebook   1/1     20h


NAME                                                   COMPLETIONS   DURATION   AGE
job.batch/fashion-mnist-cpu-experiment-v2-1-2jrtrw7v   0/1           82s        82s
job.batch/fashion-mnist-cpu-experiment-v2-1-f6cxb6ws   0/1           81s        81s
job.batch/fashion-mnist-cpu-experiment-v2-1-kq7nr9gx   0/1           81s        81s
job.batch/fashion-mnist-cpu-experiment-v2-1-lbs27rtn   0/1           82s        82s
job.batch/fashion-mnist-cpu-experiment-v2-1-tcdpdmbx   0/1           81s        81s


NAME                                                        TYPE      STATUS   REQUESTED   ASSIGNED   AGE
suggestion.kubeflow.org/fashion-mnist-cpu-experiment-v2-1   Running   True     5           5          2m2s

NAME                                                        STATUS    AGE
experiment.kubeflow.org/fashion-mnist-cpu-experiment-v2-1   Running   2m3s

NAME                                                            TYPE      STATUS   AGE
trial.kubeflow.org/fashion-mnist-cpu-experiment-v2-1-2jrtrw7v   Running   True     83s
trial.kubeflow.org/fashion-mnist-cpu-experiment-v2-1-f6cxb6ws   Running   True     83s
trial.kubeflow.org/fashion-mnist-cpu-experiment-v2-1-kq7nr9gx   Running   True     83s
trial.kubeflow.org/fashion-mnist-cpu-experiment-v2-1-lbs27rtn   Running   True     83s
trial.kubeflow.org/fashion-mnist-cpu-experiment-v2-1-tcdpdmbx   Running   True     83s
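The Experiment uses algorithmName: random, so each trial receives one independently drawn value per parameter, and the trial template turns those values into Name=Value command-line arguments. The sketch below imitates that idea in plain Python. It is not Katib's suggestion service: `sample_trial` and `to_args` are hypothetical helpers, and the search space is copied by hand from four of the five parameters in the YAML.

```python
import random

# Search space copied from the Experiment above (four of the five parameters).
PARAMETERS = [
    {"name": "--learning_rate", "type": "double", "min": 0.0005, "max": 0.0015},
    {"name": "--dropout_rate", "type": "double", "min": 0.1, "max": 0.9},
    {"name": "--epoch", "type": "int", "min": 5, "max": 15},
    {"name": "--act", "type": "categorical",
     "list": ["relu", "sigmoid", "softmax", "tanh"]},
]


def sample_trial(parameters, rng):
    """Draw one value per parameter, as a random-search suggestion would."""
    values = {}
    for param in parameters:
        if param["type"] == "double":
            values[param["name"]] = rng.uniform(param["min"], param["max"])
        elif param["type"] == "int":
            values[param["name"]] = rng.randint(param["min"], param["max"])  # inclusive
        else:
            values[param["name"]] = rng.choice(param["list"])
    return values


def to_args(values):
    """Render values in the Name=Value form used by the trial template."""
    return [f"{name}={value}" for name, value in values.items()]


if __name__ == "__main__":
    rng = random.Random(0)  # fixed seed only to make the demo repeatable
    for _ in range(3):
        print(to_args(sample_trial(PARAMETERS, rng)))
```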

* Trials with higher validation accuracy tend to have a relatively low dropout_rate.

* Of the four activation functions explored, tanh and sigmoid show relatively high accuracy.

* This much can be learned from 50 trials; running many more trials would raise the chance of finding the optimal hyperparameters.

 

 

Inferencing Model Server

Add code that saves the trained model to the existing model code.

pvc_name="workspace-customnotebook" attaches the PVC that is allocated to the notebook server.

import tensorflow as tf
import os
import argparse
from tensorflow.python.keras.callbacks import Callback



class MyFashionMnist(object):
  def train(self):
    
    parser = argparse.ArgumentParser()
    parser.add_argument('--learning_rate', required=False, type=float, default=0.001)
    parser.add_argument('--dropout_rate', required=False, type=float, default=0.3)
    parser.add_argument('--opt', required=False, type=int, default=1)    
    parser.add_argument('--checkpoint_dir', required=False, default='/result/training_checkpoints')
    parser.add_argument('--saved_model_dir', required=False, default='/result/saved_model/001')        
    parser.add_argument('--tensorboard_log', required=False, default='/result/log')     
    args = parser.parse_args()    
    
    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.fashion_mnist.load_data()
    x_train, x_test = x_train / 255.0, x_test / 255.0

    model = tf.keras.models.Sequential([
      tf.keras.layers.Flatten(input_shape=(28, 28)),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dropout(args.dropout_rate),
      tf.keras.layers.Dense(10, activation='softmax')
    ])
    
    model.summary()
    
    sgd = tf.keras.optimizers.SGD(lr=args.learning_rate)
    adam = tf.keras.optimizers.Adam(lr=args.learning_rate)
    
    optimizers= [sgd, adam]
    model.compile(optimizer=optimizers[args.opt],
                  loss='sparse_categorical_crossentropy',
                  metrics=['acc'])
    
    # Directory where checkpoints are stored.
    checkpoint_dir = args.checkpoint_dir
    # Checkpoint file name ({epoch} is filled in by Keras).
    checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")

    model.fit(x_train, y_train,
              verbose=0,
              validation_data=(x_test, y_test),
              epochs=5,
              callbacks=[KatibMetricLog(),
                        tf.keras.callbacks.TensorBoard(log_dir=args.tensorboard_log),
                        tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_prefix,
                               save_weights_only=True)
                        ])
    path = args.saved_model_dir        
    model.save(path, save_format='tf')

class KatibMetricLog(Callback):
    def on_batch_end(self, batch, logs={}):
        print("batch=" + str(batch),
              "accuracy=" + str(logs.get('acc')),
              "loss=" + str(logs.get('loss')))
    def on_epoch_begin(self, epoch, logs={}):
        print("epoch " + str(epoch) + ":")
    
    def on_epoch_end(self, epoch, logs={}):
        print("Validation-accuracy=" + str(logs.get('val_acc')),
              "Validation-loss=" + str(logs.get('val_loss')))
        return

if __name__ == '__main__':
    if os.getenv('FAIRING_RUNTIME', None) is None:
        from kubeflow import fairing
        from kubeflow.fairing.kubernetes import utils as k8s_utils

        DOCKER_REGISTRY = 'kubeflow-registry.default.svc.cluster.local:30000'
        fairing.config.set_builder(
            'append',
            image_name='fairing-job',
            base_image='brightfly/kubeflow-jupyter-lab:tf2.0-cpu',
            registry=DOCKER_REGISTRY, 
            push=True)
        # cpu 2, memory 5GiB
        fairing.config.set_deployer('job',
                                    namespace='terry',
                                    pod_spec_mutators=[
                                        k8s_utils.mounting_pvc(pvc_name="workspace-customnotebook", 
                                                              pvc_mount_path="/result"),
                                        k8s_utils.get_resource_mutator(cpu=2,
                                                                       memory=5)]
         
                                   )
        fairing.config.run()
    else:
        remote_train = MyFashionMnist()
        remote_train.train()

Because the notebook's volume is mounted as a PVC in the fairing-job pod, you can see logs, models, and other outputs being saved as training proceeds.
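The script saves the model under a numbered directory (`/result/saved_model/001`). TensorFlow Serving expects numeric version subdirectories under the model base path, so the layout matters once the model is served. The snippet below is a minimal sketch of building that path; `versioned_model_dir` is a hypothetical helper name, not something from the original script.

```python
import posixpath


def versioned_model_dir(base_dir, version):
    """Return base_dir/<version>, insisting on a numeric version directory."""
    version = str(version)
    if not version.isdigit():
        raise ValueError("model version must be numeric, got %r" % version)
    # posixpath is used because the path lives inside a Linux container.
    return posixpath.join(base_dir, version)


print(versioned_model_dir("/result/saved_model", "001"))
```

A non-numeric version such as `"v1"` raises `ValueError` here, which surfaces the problem before training rather than at serving time.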

 

Pipeline : e2e ML Workflow 

 

$ kubectl label namespace kubeflow serving.kubeflow.org/inferenceservice=enabled

$ kubectl get namespace kubeflow --show-labels

NAME       STATUS   AGE   LABELS
kubeflow   Active   28h   control-plane=kubeflow,katib-metricscollector-injection=enabled,serving.kubeflow.org/inferenceservice=enabled

Attaching a volume to a pipeline

https://github.com/mojokb/kubeflow-book/blob/master/pipeline_storage.ipynb

 

Creating a pipeline with the SDK

 

Write the attatch_pvc_pipeline.py file

import kfp.dsl as dsl

@dsl.pipeline(
    name='AttachStorage',
    description='Create a pvc, attach it to pipeline.'
)
def attatch_pvc_pipeline():
    
    vop = dsl.VolumeOp(
        name="volume_creation",
        resource_name="vol-a",
        storage_class="nfs",
        modes=dsl.VOLUME_MODE_RWM,
        size="1Gi"
    )
    
    cop1 = dsl.ContainerOp(
        name='HelloKubeflow',
        image='alpine',
        command=['sh', '-c'],
        arguments=['echo "hello Kubeflow" > /mnt/content.txt'],
        pvolumes={"/mnt": vop.volume}
    )
    
    cop2 = dsl.ContainerOp(
        name='cat-content',
        image='alpine',
        command=['cat'],
        arguments=['/mnt/content.txt'],
        pvolumes={"/mnt": vop.volume}
    )

    cop1.after(vop)
    cop2.after(cop1)
        
     
if __name__ == "__main__":
    import kfp.compiler as compiler
    compiler.Compiler().compile(attatch_pvc_pipeline, 'attatch_pvc.pipeline.tar.gz')

The attatch_pvc.pipeline.tar.gz file is generated.
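Before uploading, it can help to look inside the generated package. The sketch below lists the archive members with the standard `tarfile` module. `list_package_members` is a hypothetical helper, and the assumption that the archive holds a single `pipeline.yaml` reflects how the KFP v1 compiler typically behaves, so verify it against your own output.

```python
import os
import tarfile


def list_package_members(package_path):
    """Return the file names stored in a compiled pipeline .tar.gz package."""
    with tarfile.open(package_path, "r:gz") as archive:
        return archive.getnames()


package = "attatch_pvc.pipeline.tar.gz"
if os.path.exists(package):
    # Expected (assumption): something like ['pipeline.yaml']
    print(list_package_members(package))
else:
    print(package + " not found; compile the pipeline first")
```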

 

Upload pipeline

Create Run

 

create kubeflow-pvc

$ kubectl create -f kubeflow-pvc.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  annotations:
    pv.kubernetes.io/bind-completed: "yes"
    pv.kubernetes.io/bound-by-controller: "yes"
    volume.beta.kubernetes.io/storage-provisioner: cluster.local/nfs-client-provisioner
  finalizers:
  - kubernetes.io/pvc-protection
  name: kubeflow-pvc
  namespace: kubeflow
spec:
  accessModes:
  - ReadWriteMany
  resources:
    requests:
      storage: 5Gi
  storageClassName: nfs
  volumeMode: Filesystem
  volumeName: pvc-9fda4d4e-94e3-41ec-9e66-a1346c359f59

$ kubectl get pvc -n kubeflow kubeflow-pvc

NAME                        STATUS   VOLUME                                     CAPACITY   ACCESS MODES   STORAGECLASS   AGE  
kubeflow-pvc                Bound    pvc-9fda4d4e-94e3-41ec-9e66-a1346c359f59   5Gi        RWX            nfs            91s
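The manifest above carries fields such as the `pv.kubernetes.io/bind-completed` annotations and `volumeName`, which Kubernetes normally fills in once a claim is bound. With a dynamic provisioner, a fresh claim usually needs only the access mode, size, and storage class. The following is a minimal sketch that generates such a claim as JSON (which `kubectl create -f` also accepts); `build_pvc_manifest` is a hypothetical helper, and whether the trimmed claim fits your cluster is an assumption to check.

```python
import json


def build_pvc_manifest(name, namespace, storage_class, size):
    """Build a minimal PersistentVolumeClaim for a dynamic provisioner."""
    return {
        "apiVersion": "v1",
        "kind": "PersistentVolumeClaim",
        "metadata": {"name": name, "namespace": namespace},
        "spec": {
            "accessModes": ["ReadWriteMany"],
            "resources": {"requests": {"storage": size}},
            "storageClassName": storage_class,
        },
    }


manifest = build_pvc_manifest("kubeflow-pvc", "kubeflow", "nfs", "5Gi")
print(json.dumps(manifest, indent=2))
```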

 

Build an image that saves random Fashion-MNIST images

import tensorflow as tf
import errno
import os
import numpy as np
from PIL import Image
from datetime import datetime
import random

class StoreImage(object):
  def save(self):
    (train_images, train_labels), (test_images, test_labels) = tf.keras.datasets.fashion_mnist.load_data()
    
    save_path = "/mnt/data/"
    folder_name = save_path + str(datetime.today().strftime("%Y%m%d%H%M"))
    
    # create a folder named after the current time (to the minute)
    try:
        if not(os.path.isdir(folder_name)):
            os.makedirs(os.path.join(folder_name))
    except OSError as e:
        if e.errno != errno.EEXIST:
            print("Failed to create directory!!!!!")
            raise
            
    # save 10 random images (index 0~9999)
    for i in range(10):
        random_num = random.randint(0, 9999)
        file_name = str(test_labels[random_num]) + "_" + str(i) + ".jpg"
        im = Image.fromarray(test_images[random_num])
        im.save(folder_name + "/" +  file_name)
    
    for (path, dir, files) in os.walk(save_path):
        print(path)
        for filename in files:
            print("%s/%s" % (path, filename))
    
if __name__ == '__main__':
    if os.getenv('FAIRING_RUNTIME', None) is None:
        from kubeflow import fairing
        from kubeflow.fairing.kubernetes import utils as k8s_utils

        DOCKER_REGISTRY = 'kubeflow-registry.default.svc.cluster.local:30000'
        fairing.config.set_builder(
            'append',
            image_name='store-fashion-minst',
            base_image='brightfly/kubeflow-jupyter-lab:tf2.0-cpu-pil',
            registry=DOCKER_REGISTRY, 
            push=True)
        # cpu 0.5, memory 0.5GiB
        fairing.config.set_deployer('job',
            namespace='terry',
            pod_spec_mutators=[
                k8s_utils.get_resource_mutator(cpu=0.5,
                                               memory=0.5)])
        fairing.config.run()
    else:
        remote = StoreImage()
        remote.save()
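The naming logic in `StoreImage.save` can be tried without TensorFlow or a cluster. The sketch below isolates it; `output_folder` and `pick_files` are hypothetical helpers, the label list is a stand-in for `test_labels`, and `random.sample` is swapped in for `random.randint` so the ten picks are guaranteed to be distinct (the original may pick the same index twice).

```python
import posixpath
import random
from datetime import datetime


def output_folder(save_path, now):
    """Folder named after the timestamp, to the minute, as in the script."""
    return posixpath.join(save_path, now.strftime("%Y%m%d%H%M"))


def pick_files(labels, count, rng):
    """Pick `count` distinct indices and build '<label>_<i>.jpg' names."""
    indices = rng.sample(range(len(labels)), count)
    return [(idx, "%s_%d.jpg" % (labels[idx], i)) for i, idx in enumerate(indices)]


rng = random.Random(0)                     # seeded only to make the demo repeatable
labels = [i % 10 for i in range(10000)]    # stand-in for the 10,000 test labels
folder = output_folder("/mnt/data/", datetime(2020, 3, 13, 9, 30))
files = pick_files(labels, 10, rng)
print(folder)
for idx, name in files:
    print(idx, name)
```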

For convenience, the brightfly/store-fahsionm-image:latest image is used.

 

Create a pipeline that mounts kubeflow-pvc at /mnt/data and saves the random images

import kfp
from kfp import dsl

@dsl.pipeline(
    name='storeImage',
    description='Save fashion image'
)
def store_image():
    
    kubeflow_pvc = dsl.PipelineVolume(pvc="kubeflow-pvc", name="kubeflow-pvc")
    
    cop1 = dsl.ContainerOp(
        name='StoreFashionMinst',
        image='brightfly/store-fahsionm-image:latest',
        command=['python', '/app/save_random_image_from_dataset.py'],
        pvolumes={"/mnt/data" : kubeflow_pvc}
    )

if __name__ == '__main__':
    kfp.Client().create_run_from_pipeline_func(pipeline_func=store_image,  
                                               arguments={})

 

Creating a pipeline from training to serving

 

1) Train on Fashion MNIST and create the model

2) Check that the trained model was written correctly

3) Register an InferenceService using KFServing

4) Run the InferenceService through the Web UI
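The four steps form a simple chain, which is what the `.after(...)` calls in the pipeline code express. As a rough illustration only (this is plain Python, not the Kubeflow Pipelines SDK, and the step names are made up for the example), the ordering can be modelled with the standard library's `graphlib` (Python 3.9+):

```python
from graphlib import TopologicalSorter

# step -> set of steps it has to run after (mirrors op.after(...) in kfp.dsl)
dependencies = {
    "train": set(),
    "check_model": {"train"},
    "register_inferenceservice": {"check_model"},
    "web_ui": {"register_inferenceservice"},
}

order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

Any step added later only needs to declare which steps it depends on; the execution order follows from the graph.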

 

Run the following in Jupyter Notebook

import tensorflow as tf
import os
import argparse
from tensorflow.python.keras.callbacks import Callback


class MyFashionMnist(object):
  def train(self):
    
    parser = argparse.ArgumentParser()
    parser.add_argument('--learning_rate', required=False, type=float, default=0.001)
    parser.add_argument('--dropout_rate', required=False, type=float, default=0.3)
    parser.add_argument('--epoch', required=False, type=int, default=5)    
    parser.add_argument('--act', required=False, type=str, default='relu')        
    parser.add_argument('--layer', required=False, type=int, default=1)      
    parser.add_argument('--model_version', required=False, type=str, default='0001')    
    parser.add_argument('--checkpoint_dir', required=False, default='/result/training_checkpoints')
    parser.add_argument('--saved_model_dir', required=False, default='/result/saved_model')        
    parser.add_argument('--tensorboard_log', required=False, default='/result/log')     
    args = parser.parse_args()    
    
    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.fashion_mnist.load_data()
    x_train, x_test = x_train / 255.0, x_test / 255.0

    model = tf.keras.models.Sequential()
    model.add(tf.keras.layers.Flatten(input_shape=(28, 28)))
    
    for i in range(int(args.layer)):    
        model.add(tf.keras.layers.Dense(128, activation=args.act))
        if(i > 2) :
            model.add(tf.keras.layers.Dropout(args.dropout_rate))
        
    model.add(tf.keras.layers.Dense(10, activation='softmax'))
    
    model.compile(optimizer=tf.keras.optimizers.Adam(lr=args.learning_rate),
                  loss='sparse_categorical_crossentropy',
                  metrics=['acc'])
    
    model.summary()    
    
    # Directory where checkpoints are stored.
    checkpoint_dir = args.checkpoint_dir
    # Checkpoint file name ({epoch} is filled in by Keras).
    checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")

    model.fit(x_train, y_train,
              verbose=0,
              validation_data=(x_test, y_test),
              epochs=args.epoch,
              callbacks=[KatibMetricLog(),
                        tf.keras.callbacks.TensorBoard(log_dir=args.tensorboard_log),
                        tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_prefix,
                               save_weights_only=True)
                        ])
    
    path = args.saved_model_dir + "/" + args.model_version        
    model.save(path, save_format='tf')

class KatibMetricLog(Callback):
    def on_batch_end(self, batch, logs={}):
        print("batch=" + str(batch),
              "accuracy=" + str(logs.get('acc')),
              "loss=" + str(logs.get('loss')))
    def on_epoch_begin(self, epoch, logs={}):
        print("epoch " + str(epoch) + ":")
    
    def on_epoch_end(self, epoch, logs={}):
        print("Validation-accuracy=" + str(logs.get('val_acc')),
              "Validation-loss=" + str(logs.get('val_loss')))
        return

if __name__ == '__main__':
    if os.getenv('FAIRING_RUNTIME', None) is None:
        from kubeflow import fairing
        from kubeflow.fairing.kubernetes import utils as k8s_utils

        DOCKER_REGISTRY = 'kubeflow-registry.default.svc.cluster.local:30000'
        fairing.config.set_builder(
            'append',
            image_name='katib-job',
            base_image='brightfly/kubeflow-jupyter-lab:tf2.0-cpu',
            registry=DOCKER_REGISTRY, 
            push=True)
        # cpu 1, memory 4GiB
        fairing.config.set_deployer('job',
                                    namespace='kubeflow',
                                    pod_spec_mutators=[
                                        k8s_utils.mounting_pvc(pvc_name="kubeflow-pvc", 
                                                              pvc_mount_path="/result"),
                                        k8s_utils.get_resource_mutator(cpu=1,
                                                                       memory=4)]
         
                                   )
        fairing.config.run()
    else:
        remote_train = MyFashionMnist()
        remote_train.train()

 

 

 

Create a pipeline that runs the katib-job:A70BE781 image built above with kubeflow-pvc mounted at /result

import kfp
import kfp.dsl as dsl
import kfp.onprem as onprem

def echo_op(text):
    return dsl.ContainerOp(
        name='echo',
        image='library/bash:4.4.23',
        command=['sh', '-c'],
        arguments=['echo "$0"', text],
    )

@dsl.pipeline(
    name='FMnistPipeline',
    description='mnist '
)
def fmnist_pipeline(learning_rate, dropout_rate, epoch, act, 
                    layer, checkpoint_dir, saved_model_dir, tensorboard_log):
    
    exit_task = echo_op("Done!")
    
    with dsl.ExitHandler(exit_task): 
        
        kubeflow_pvc = dsl.PipelineVolume(pvc="kubeflow-pvc", name="kubeflow-pvc")
        
        mnist = dsl.ContainerOp(
            name='FMnist',
            image='kubeflow-registry.default.svc.cluster.local:30000/katib-job:A70BE781',
            command=['python', '/app/fmnist-save-model-renew.py'],
            arguments=[
                "--learning_rate", learning_rate,
                "--dropout_rate", dropout_rate,
                "--epoch", epoch,
                "--act", act,
                "--layer", layer,
                "--checkpoint_dir", checkpoint_dir,
                "--saved_model_dir", saved_model_dir,
                "--tensorboard_log", tensorboard_log
            ],
            pvolumes={"/result": kubeflow_pvc}
        )
        
        result = dsl.ContainerOp(
            name='list_list',
            image='library/bash:4.4.23',
            command=['ls', '-R', '/result'],
            pvolumes={"/result": mnist.pvolume}
        )

        result.after(mnist)
    

arguments = {'learning_rate': '0.001397',
             'dropout_rate': '0.18',
             'epoch' : '11',
             'act' : 'sigmoid',
             'layer': '2',
             'checkpoint_dir': '/result/training_checkpoints',
             'saved_model_dir':'/result/saved_model',
             'tensorboard_log': '/result/log' 
            }
    
if __name__ == '__main__':
    kfp.Client().create_run_from_pipeline_func(pipeline_func=fmnist_pipeline, 
                                               arguments=arguments)

Registering an InferenceService with KFServing

Create KFServing-fairing.ipynb in Jupyter Notebook

from kubernetes import client
from kfserving import KFServingClient
from kfserving import constants
from kfserving import utils
from kfserving import V1alpha2EndpointSpec
from kfserving import V1alpha2PredictorSpec
from kfserving import V1alpha2TensorflowSpec
from kfserving import V1alpha2InferenceServiceSpec
from kfserving import V1alpha2InferenceService
from kubernetes.client import V1ResourceRequirements
import os
import sys
import argparse
import logging
import time

'''
'''
class KFServing(object):
    def run(self):
        parser = argparse.ArgumentParser()
        parser.add_argument('--namespace', required=False, default='kubeflow')
        # pvc://${PVCNAME}/dir
        parser.add_argument('--storage_uri', required=False, default='/mnt/export')
        parser.add_argument('--name', required=False, default='kfserving-sample')        
        args = parser.parse_args()
        namespace = args.namespace
        serving_name =  args.name
        
        api_version = constants.KFSERVING_GROUP + '/' + constants.KFSERVING_VERSION
        default_endpoint_spec = V1alpha2EndpointSpec(
                                  predictor=V1alpha2PredictorSpec(
                                    tensorflow=V1alpha2TensorflowSpec(
                                      storage_uri=args.storage_uri,
                                      resources=V1ResourceRequirements(
                                          requests={'cpu':'100m','memory':'1Gi'},
                                          limits={'cpu':'100m', 'memory':'1Gi'}))))
        isvc = V1alpha2InferenceService(api_version=api_version,
                                  kind=constants.KFSERVING_KIND,
                                  metadata=client.V1ObjectMeta(
                                      name=serving_name, namespace=namespace),
                                  spec=V1alpha2InferenceServiceSpec(default=default_endpoint_spec))        
        
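        # Use a distinct local name so it does not shadow the KFServing class.
        kfserving_client = KFServingClient()
        kfserving_client.create(isvc)
        print('Waiting 5 seconds for the InferenceService to be created')
        time.sleep(5)
        
        kfserving_client.get(serving_name, namespace=namespace, watch=True, timeout_seconds=300)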
        
if __name__ == '__main__':
    if os.getenv('FAIRING_RUNTIME', None) is None:
        from kubeflow.fairing.builders.append.append import AppendBuilder
        from kubeflow.fairing.preprocessors.converted_notebook import \
            ConvertNotebookPreprocessor

        DOCKER_REGISTRY = 'kubeflow-registry.default.svc.cluster.local:30000'
        base_image = 'brightfly/kubeflow-kfserving:latest'
        image_name = 'kfserving'

        builder = AppendBuilder(
            registry=DOCKER_REGISTRY,
            image_name=image_name,
            base_image=base_image,
            push=True,
            preprocessor=ConvertNotebookPreprocessor(
                notebook_file="KFServing-fairing.ipynb"
            )
        )
        builder.build()
    else:
        serving = KFServing()
        serving.run()

Run the KFServing pipeline using the katib-job:A70BE781 image and the kfserving image built above.

import kfp
import kfp.dsl as dsl
import kfp.onprem as onprem
import kfp.components as comp


    
def echo_op(text):
    return dsl.ContainerOp(
        name='echo',
        image='library/bash:4.4.23',
        command=['sh', '-c'],
        arguments=['echo "$0"', text],
    )  

@dsl.pipeline(
    name='FMnistPipeline',
    description='mnist '
)
def fmnist_pipeline(learning_rate, dropout_rate, epoch, act, layer,  
                    checkpoint_dir, saved_model_dir, pvc_name, tensorboard_log,
                    name, model_version, namespace):
  
    exit_task = echo_op("Done!")
    
    with dsl.ExitHandler(exit_task): 

        kubeflow_pvc = dsl.PipelineVolume(pvc=str(pvc_name))
        
        mnist = dsl.ContainerOp(
            name='FMnist',
            image='kubeflow-registry.default.svc.cluster.local:30000/katib-job:A70BE781',
            command=['python', '/app/fmnist-save-model-renew.py'],
            arguments=[
                "--learning_rate", learning_rate,
                "--dropout_rate", dropout_rate,
                "--epoch", epoch,
                "--act", act,
                "--layer", layer,
                "--checkpoint_dir", checkpoint_dir,
                "--saved_model_dir", saved_model_dir,
                "--model_version", model_version,
                "--tensorboard_log", tensorboard_log
            ],
            pvolumes={"/result": kubeflow_pvc}
        )
        
        result = dsl.ContainerOp(
            name='list_list',
            image='library/bash:4.4.23',
            command=['ls', '-R', '/result'],
            pvolumes={"/result": mnist.pvolume}
        )
        
        kfserving = dsl.ContainerOp(
            name='kfserving',
            image='kubeflow-registry.default.svc.cluster.local:30000/kfserving:2CC7FFD3',
            command=['python', '/app/KFServing-fairing.py'],
            arguments=[
                "--namespace", namespace,
                "--storage_uri", "pvc://" +  str(pvc_name) + "/saved_model",
                "--name", name
            ]
        )        
                
        result.after(mnist)
        kfserving.after(result)
    

arguments = {'learning_rate': '0.001397',
             'dropout_rate': '0.18',
             'epoch' : '11',
             'act' : 'sigmoid',
             'layer': '2',
             'checkpoint_dir': '/result/training_checkpoints',
             'saved_model_dir':'/result/saved_model/',
             'pvc_name' : 'kubeflow-pvc',
             'tensorboard_log': '/result/log',
             'name' : 'kfserving-fmnist',
             'model_version' : '0001',
             'namespace' : 'kubeflow'
            }
    
if __name__ == '__main__':
    kfp.Client().create_run_from_pipeline_func(pipeline_func=fmnist_pipeline, 
                                               arguments=arguments)
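The `--storage_uri` passed to the kfserving step is built as `pvc://<pvc name>/<path inside the volume>`. The sketch below shows how such a URI splits back into its two parts, which is handy for checking that the path matches where the training step wrote the model. `split_pvc_uri` is a hypothetical helper for illustration, not a KFServing function.

```python
from urllib.parse import urlparse


def split_pvc_uri(storage_uri):
    """Split 'pvc://<claim>/<path>' into (claim name, path inside the volume)."""
    parsed = urlparse(storage_uri)
    if parsed.scheme != "pvc" or not parsed.netloc:
        raise ValueError("not a pvc:// URI: %r" % storage_uri)
    return parsed.netloc, parsed.path.lstrip("/")


pvc_name = "kubeflow-pvc"
storage_uri = "pvc://" + pvc_name + "/saved_model"
print(split_pvc_uri(storage_uri))
```

With the arguments used above, the training step writes to `/result/saved_model/0001` on the same claim, so the served path `saved_model` contains the numeric version directory.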

$ kubectl apply -f kubeflow_role_binding.yaml

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: access-kubeflow
  namespace: kubeflow
rules:
- apiGroups: ["", "kubeflow.org", "batch"]
  resources: ["pods", "pods/log", "jobs", "experiments", "persistentvolumeclaims"]
  verbs: ["create", "delete", "update", "patch", "get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: kubeflow-access-role
  namespace: kubeflow
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: access-kubeflow
subjects:
- kind: ServiceAccount
  name: default-editor
  namespace: terry

 

Grant pipeline-runner permission on inferenceservices

$ vi kfserving-pipeline-role.yaml

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: access-kfserving
  namespace: kubeflow
rules:
- apiGroups: ["serving.kubeflow.org"]
  resources: ["inferenceservices"]
  verbs: ["create", "delete", "update", "patch", "get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: kfserving-access-role
  namespace: kubeflow
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: access-kfserving
subjects:
- kind: ServiceAccount
  name: pipeline-runner
  namespace: kubeflow

$ kubectl apply -f kfserving-pipeline-role.yaml

role.rbac.authorization.k8s.io/access-kfserving created
rolebinding.rbac.authorization.k8s.io/kfserving-access-role created
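To reason about what the Role above permits, the rule matching can be modelled in a few lines. This is a deliberately simplified sketch: `rule_allows` and `role_allows` are hypothetical helpers, and the model ignores `resourceNames`, subresources, and ClusterRoles, so it is not a substitute for the API server's own check.

```python
def rule_allows(rule, api_group, resource, verb):
    """True if a single RBAC rule covers the given group/resource/verb."""
    def matches(allowed, value):
        return "*" in allowed or value in allowed

    return (matches(rule.get("apiGroups", []), api_group)
            and matches(rule.get("resources", []), resource)
            and matches(rule.get("verbs", []), verb))


def role_allows(rules, api_group, resource, verb):
    """True if any rule in the Role covers the request."""
    return any(rule_allows(rule, api_group, resource, verb) for rule in rules)


# Rules copied from the access-kfserving Role above.
access_kfserving_rules = [{
    "apiGroups": ["serving.kubeflow.org"],
    "resources": ["inferenceservices"],
    "verbs": ["create", "delete", "update", "patch", "get", "list", "watch"],
}]

print(role_allows(access_kfserving_rules, "serving.kubeflow.org", "inferenceservices", "create"))
print(role_allows(access_kfserving_rules, "", "pods", "create"))
```

On a live cluster, `kubectl auth can-i create inferenceservices.serving.kubeflow.org -n kubeflow --as=system:serviceaccount:kubeflow:pipeline-runner` gives the authoritative answer.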

$ kubectl label namespace kubeflow serving.kubeflow.org/inferenceservice=enabled

[root@master01 ~]# kubectl get namespaces --show-labels
NAME                 STATUS   AGE   LABELS
kubeflow             Active   15h   control-plane=kubeflow,katib-metricscollector-injection=enabled,serving.kubeflow.org/inferenceservice=enabled
terry                Active   52m   istio-injection=enabled,katib-metricscollector-injection=enabled,serving.kubeflow.org/inferenceservice=enabled

 

WEB UI 

 

import kfp
import kfp.dsl as dsl
import kfp.onprem as onprem
import kfp.components as comp


    
def echo_op(text):
    return dsl.ContainerOp(
        name='echo',
        image='library/bash:4.4.23',
        command=['sh', '-c'],
        arguments=['echo "$0"', text],
    )  

@dsl.pipeline(
    name='FMnistPipeline',
    description='mnist '
)
def fmnist_pipeline(learning_rate, dropout_rate, epoch, act, layer,  
                    checkpoint_dir, saved_model_dir, pvc_name, tensorboard_log,
                    name, model_version, namespace):
  
    exit_task = echo_op("Done!")
    
    with dsl.ExitHandler(exit_task): 

        kubeflow_pvc = dsl.PipelineVolume(pvc=str(pvc_name))
        
        mnist = dsl.ContainerOp(
            name='FMnist',
            image='kubeflow-registry.default.svc.cluster.local:30000/katib-job:105E605F',
            command=['python', '/app/fmnist-save-model-renew.py'],
            arguments=[
                "--learning_rate", learning_rate,
                "--dropout_rate", dropout_rate,
                "--epoch", epoch,
                "--act", act,
                "--layer", layer,
                "--checkpoint_dir", checkpoint_dir,
                "--saved_model_dir", saved_model_dir,
                "--model_version", model_version,
                "--tensorboard_log", tensorboard_log
            ],
            pvolumes={"/result": kubeflow_pvc}
        ).set_gpu_limit(1)
        
        result = dsl.ContainerOp(
            name='list_list',
            image='library/bash:4.4.23',
            command=['ls', '-R', '/result'],
            pvolumes={"/result": mnist.pvolume}
        )
        
        kfserving = dsl.ContainerOp(
            name='kfserving',
            image='kubeflow-registry.default.svc.cluster.local:30000/kfserving:857CCC8F',
            command=['python', '/app/KFServing-fairing.py'],
            arguments=[
                "--namespace", namespace,
                "--storage_uri", "pvc://" +  str(pvc_name) + "/saved_model",
                "--name", name
            ]
          )        
                
        mnist_web_ui = dsl.ContainerOp(
            name='mnist_web_ui',
            image='brightfly/fmnist-webui-deploy:latest',
        )                
        
        result.after(mnist)
        kfserving.after(result)
        mnist_web_ui.after(kfserving)
    

arguments = {'learning_rate': '0.001397',
             'dropout_rate': '0.18',
             'epoch' : '1',
             'act' : 'sigmoid',
             'layer': '2',
             'checkpoint_dir': '/result/training_checkpoints',
             'saved_model_dir':'/result/saved_model/',
             'pvc_name' : 'kubeflow-pvc',
             'tensorboard_log': '/result/log',
             'name' : 'kfserving-fmnist',
             'model_version' : '0001',
             'namespace' : 'kubeflow'
            }
    
if __name__ == '__main__':
    kfp.Client().create_run_from_pipeline_func(pipeline_func=fmnist_pipeline, 
                                               arguments=arguments)
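The contents of the web UI image are not shown here, but a client of the registered InferenceService would typically send a TensorFlow Serving style predict request. The sketch below only builds the request path and JSON body for one 28x28 image. It assumes the tensorflow predictor exposes the `/v1/models/<name>:predict` REST endpoint; `build_predict_request` is a hypothetical helper, and the ingress host and port (not shown) depend on your cluster.

```python
import json


def build_predict_request(model_name, images):
    """Return (url_path, json_body) for a TF Serving style predict call."""
    for image in images:
        if len(image) != 28 or any(len(row) != 28 for row in image):
            raise ValueError("each image must be 28x28")
    path = "/v1/models/%s:predict" % model_name
    body = json.dumps({"instances": images})
    return path, body


# An all-zero image stands in for a real, normalized Fashion-MNIST sample.
blank_image = [[0.0] * 28 for _ in range(28)]
path, body = build_predict_request("kfserving-fmnist", [blank_image])
print(path)
print(len(body), "bytes of JSON")
```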

 

 


Sample Machine Learning Workflow 

https://www.kubeflow.org/docs/examples/kubeflow-samples/

 

MNIST on Kubeflow on Vanilla k8s

https://github.com/kubeflow/examples/tree/master/mnist#vanilla

 

 

 

 

 

 

 

 

 

 
