Engineer in Tokyo

Kubernetesを拡張しよう

この記事は Kubernetes Advent Calendar 2016の第17日目の記事。 第16目はyuanyingさんの 「Openstack で Kubernetes を使う」 でした。

KubernetesはDeployment, Secret, ConfigMap, Ingressなど、いろいろ機能があります。それぞれの機能はあることを自動化しているようなものです。 例えば、Deploymentはアプリケーションのデプロイ・更新を自動化するもの。Ingressはロードバランサーの作成・管理を自動化しているようなもの。その機能は便利ですが、ある程度Kubernetesに取り込んだら、自分で拡張したくなる場合がが多くなる。例えば、証明書の更新・管理の自動化だとか、etcdクラスターの管理の自動化だとか。

Kubernetesアーキテクチャ

Kubernetesをどうやって拡張するかを説明する前に、そもそもKubernetesのアーキテクチャを説明しなくちゃいけない。KubernetesのマスターにAPIサーバーはもちろんあるんですが、APIサーバーは基本的にKubernetesのオブジェクトデータのCRUDオペレーションくらいしかやっていない。例えば、Deploymentの動きの実装はAPIサーバーには入っていない。ノードが落ちたら、そこに入っていたポッドを別のサーバーに移動したり、ローリングアップデートの動きなどはDeploymentコントローラーで実装されていて、コントローラーマネジャー(kube-controller-manager)というデーモンに入っている。コントローラーマネジャーは何かというと管理に便利だったため、Kubernetesの標準オブジェクト(Deployment, ReplicaSet, DaemonSet, StatefulSetなど)の複数のコントローラーが合わせて入っているデーモン。

kube-controller-manager

コントローラーは何かと言いますと、コントロールループで、クラスタのあるべき状態(APIサーバー・etcdに入っているデータ)とクラスタの実際の状態を常に比較して、クラスタのあるべき状態をクラスタに実現させるデーモン。ユーザーがAPIサーバーにあるべき状態を保存したあとに動作するものなので、必然的に非同期のアーキテクチャになる。

control-loop diagram

APIサーバーにオブジェクトの追加、変更、削除を監視できるWatch APIがある。コントローラーマネジャーのコントローラーたちは、APIサーバーのWatch APIを使って、該当のオブジェクトを監視して、他のオブジェクトを作ったり、更新したりする。例えば、DeploymentコントローラーはDeploymentが新しく作られたら、そのDeploymentに紐づくReplicaSetを作くったり、Deploymentreplicasが更新されたら、紐づくReplicaSetreplicasを更新したりします。

controller diagram

このコントローラーを組み合わせることもできます。例えば、ReplicaSetのコントローラーがさらにあります。DeploymentコントローラーがReplicaSetを更新したりするけど、ReplicaSetreplicasに従って、Podを作成したり、監視するのがReplicaSetコントローラーの役目です。

deployment diagram

Kubernetesオブジェクトではなくて、IngressServiceのように外部APIを使う場合もあるだろう。type=LoadBalancerServiceを作ったら、クラウドプロバイダーのServiceコントローラーが勝手にロードバランサーを作ってくれることもできます。

cloud diagram

Kubernetes を拡張する

Kubernetesを拡張するには標準コントローラーと同じことをする。あるオブジェクトを監視して、追加、変更などがあったら、必要なアクションをするコントローラーを作ります。ただ、標準オブジェクトはAPIがあるんですけど、カスタムオブジェクトを作りたい場合はどうするか? Kubernetes自体を修正して再コンパイルしてデプロイしたくないので、KubernetesではThirdPartyResourceというカスタムオブジェクトの定義を作ることができます。ThirdPartyResourceを作れば、APIサーバーに新しいAPI URLができて、そのURLでカスタムオブジェクトを作ることができます。簡単な例をみてみよう。

ThirdPartyResourceを定義する

この例では定期的にバッチJobを作るCronコントローラーを作ります。以下のThirdPartyResourceではCronTabというオブジェクト形を作ります。

metadata:
  name: cron-tab.alpha.ianlewis.org
apiVersion: extensions/v1beta1
kind: ThirdPartyResource
description: "A specification of a Job to run on a cron style schedule"
versions:
  - name: v1

このCronTabresource.yamlに保存して、kubectlで作成する。CronTabのオブジェクト名はcron-tab.alpha.ianlewis.orgの最初の部分cron-tabをCamel Caseにした名称になる。

$ kubectl create -f resource.yaml
thirdpartyresource "cron-tab.alpha.ianlewis.org" created

こうするとAPIサーバーで/apis/alpha.ianlewis.org/v1/namespaces/<namespace>/crontabs/というURLエンドポイントが使えるようになります。このURLを使うとCronTabオブジェクトを作ることができますが、簡単な操作はkubectlを使えます。

kubectl get crontab

CronTabオブジェクトを作ってみましょう。ThirdPartyResourceのオブジェクトはKubernetesオブジェクトの標準フィールドapiVersion, kind, metadataが必要ですが、それ以外のフィールドはすべて任意JSONデータ。CronTabspecというフィールドにJobオブジェクトのspecと同じデータを入れます。以下のYamlをbackup.yamlに保存します。

apiVersion: "alpha.ianlewis.org/v1"
kind: "CronTab"
metadata:
  name: backup
spec:
  schedule: "@daily"
  jobTemplate:
    containers:
      - image: mybackupscript:v9
        name: backup
    restartPolicy: Never

backupCronTabを作成する

$ kubectl create -f backup.yaml
crontab "backup" created

コントローラーを作る

ThirdPartyResourceを作ることでAPIサーバーでオブジェクトを保存してくれるんだけど、特に何も処理、動作はしない。処理をするコントローラーを書かなくちゃいけない。コントローラーは非同期処理は多いので、Goでは一番書きやすいと思う。

基本のロジックを書いておく

package main

import (
    "net/http"
    "time"
    "fmt"
    "encoding/json"

    "github.com/robfig/cron"
)

// CronTabの処理をやってくれるcron.CronとCronTabオブジェクトのマッピング
type cronServer struct {
    Server *cron.Cron
    Object cronTab
}

var cronServers = make(map[string]cronServer, 0)

// Kubernetesオブジェクトの`metadata`フィールド
type objectMeta struct {
    Name            string `json:"name"`
    UID             string `json:"uid,omitempty"`
    ResourceVersion string `json:"resourceVersion,omitempty"`
}

// リストで取ってきた場合のJSON型
type cronTabList struct {
    Items []cronTab `json:"items"`
}

type cronTabSpec struct {
    Schedule    string          `json:"schedule"`
    JobTemplate json.RawMessage `json:"jobTemplate"`
}

// CronTabオブジェクト
type cronTab struct {
    // The following fields mirror the fields in the third party resource.
    ObjectMeta objectMeta  `json:"metadata"`
    Spec       cronTabSpec `json:"spec"`
}

func main() {
    for {
        // APIを15秒ごとにポーリングする
        time.Sleep(15 * time.Second)
        resp, err := http.Get("http://localhost:8001/apis/alpha.ianlewis.org/v1/namespaces/default/crontabs")
        if err != nil {
            log.Printf("Could not connect to Kubernetes API: %v", err)
            continue
        }

        // APIから取ってきたJSONをデコード
        decoder := json.NewDecoder(resp.Body)
        var l cronTabList

        err = decoder.Decode(&l)
        if err != nil {
            log.Printf("Could not decode JSON event object: %v", err)
            continue
        }

        // 削除されたCronTabを処理する
        removeDeletedCronTabs(l)

        // 追加、更新されたCronTabを処理
        updateCronTabs(l)
    }
}

上の処理で15秒ごとにAPIを叩いて、コントローラーの内部の状態を更新する。removeDeletedCronTabs()updateCronTabs()でCronサーバーを操作する。まずはremoveDeletedCronTabs()を実装する。APIサーバーがつくてくれるUIDでオブジェクトを特定する。

func removeDeletedCrontabs(l cronTabList) {
    for _, s := range cronServers {
        found := false
        for _, c := range l.Items {
            if c.ObjectMeta.UID == s.Object.ObjectMeta.UID {
                found = true
            }
        }
        if !found {
            removeCronTab(s.Object)
        }
    }
}

次、updateCronTabs()を実装する。ここにもUIDでオブジェクトを特定する上、オブジェクトが更新されているかどうかをresourceVersionでチェックする。

func updateCronTabs(l cronTabList) {
    for _, c := range l.Items {
        found := false
        for _, s := range cronServers {
            if c.ObjectMeta.UID == s.Object.ObjectMeta.UID {
                if c.ObjectMeta.ResourceVersion != s.Object.ObjectMeta.ResourceVersion {
                    log.Printf("Updating crontab %s", c.ObjectMeta.Name)
                    removeCronTab(s.Object)
                    err := addCronTab(c)
                    if err != nil {
                        log.Printf("Could not create crontab %#v: %v", c, err)
                    }
                }
                found = true
            }
        }
        if !found {
            err := addCronTab(c)
            if err != nil {
                log.Printf("Could not create crontab %#v: %v", c, err)
            }
        }
    }
}

addCronTab()はこんな感じでcron.Cronのサーバーオブジェクトを起動する。このオブジェクトはgoroutineでスケジュールに従ってaddFunc()で指定した関数を呼び出す。

func addCronTab(c cronTab) error {
    server := cron.New()

    // robfig/cronのCronは秒間のスケジュールに対応しているけど、
    // ここに標準のcronに追従して分間単位でスケジュールする
    spec := c.Spec.Schedule
    if !strings.HasPrefix(c.Spec.Schedule, "@") {
        spec = "0 " + c.Spec.Schedule
    }

    err := server.AddFunc(spec, func() {
        if err := runCronJob(c); err != nil {
            log.Printf("Error running cron job: %v", err)
        }
    })
    if err != nil {
        return fmt.Errorf("error adding crontab: %v", err)
    }

    cronServers[c.ObjectMeta.UID] = cronServer{
        Server: server,
        Object: c,
    }

    server.Start()

    log.Printf("Added crontab: %s", c.ObjectMeta.Name)

    return nil
}

removeCronTab()はこんな感じ

func removeCronTab(c cronTab) {
    if server, ok := cronServers[c.ObjectMeta.UID]; ok {
        server.Server.Stop()
        delete(cronServers, c.ObjectMeta.UID)
        log.Printf("Removed crontab: %s", c.ObjectMeta.Name)
    }
}

最後にrunCronJob()の実際にCronジョブを実行してくれる関数を定義する。ここにKubernetesのJobオブジェクトを作ってその後、Kubernetesに任せる。

type labels *map[string]string

type selector struct {
  MatchLabels labels `json:"matchLabels,omitempty"`
}

type jobTemplate struct {
  ObjectMeta      objectMeta       `json:"metadata"`
  JobTemplateSpec json.RawMessage `json:"spec,omitempty"`
}

type jobSpec struct {
  Selector *selector    `json:"selector,omitempty"`
  Template *jobTemplate `json:"template,omitempty"`
}

func runCronJob(c cronTab) error {
    name := makeJobName(c)
    log.Printf("Creating job %s for crontab %s", name, c.ObjectMeta.Name)

    job := job{
        ObjectMeta: objectMeta{
            Name: name,
        },
        JobSpec: jobSpec{
            Selector: &selector{
                MatchLabels: &map[string]string{
                    "name": name,
                },
            },
            Template: &jobTemplate{
                ObjectMeta: objectMeta{
                    Name: name,
                    Labels: &map[string]string{
                        "name": name,
                    },
                },
                JobTemplateSpec: &c.Spec.JobTemplate,
            },
        }
    }

    j, err := json.Marshal(job)
    if err != nil {
        return fmt.Errorf("could not marshal job to JSON: %s", err)
    }

    resp, err := http.Post("http://localhost:8001/apis/extensions/v1beta1/namespaces/default/jobs", "application/json", bytes.NewReader(j))
    if err != nil {
        return fmt.Errorf("HTTP request failed: %s", err)
    }

    return nil
}

コントローラーをデプロイする

このコントローラーはローカルPCなどAPIサーバーにアクセスさえできれば、どこにも動かしてもいいんだけど、最終的にクラスターにデプロイした方が良い。上のコントローラーのコードにはどこにもAPIの認証の処理をやっていないけど、kubectl proxyを使えば、認証をkubectlに任せることができる。Kubernetesクラスタにデプロイすれば、サービスアカウントがPodに紐付けられる。デフォルトサービスアカウントを使えば、Podの/var/run/secrets/kubernetes.io/serviceaccount/に認証用のトークンをマウントして、クラスタ内のPodからAPIへアクセスさせる。kubectlはこういうトークンファイルを探していて、自動的に認証してくれますので、使うのが非常に便利。

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: cron-controller
spec:
  replicas: 1
  template:
    metadata:
      labels:
        name: cron-controller
    spec:
      containers:
        - name: cron
          image: my.registry.com/cron-controller:0.0.1
        # 認証するためにkubectl proxyを使う
        - name: kubectl
          image: my.registry.com/kubectl:v1.5.1
          args:
            - "proxy"
          ports:
            - name: proxy
              containerPort: 8001

これでクラスタにデプロイして、Cronサーバーを動かす。

$ kubectl create -f deploy.yaml
deployment "cron-controller" created

うまく行けば、CronTabのスケジュールに従って、Jobが作成される

$ kubectl logs cron-controller-3711479224-7z3t0
2016/12/16 04:28:33 Watching for crontab objects...
2016/12/16 04:28:48 Added crontab: backup
2016/12/17 00:00:00 Creating job backup-8dasy for crontab backup

それでJobオブジェクトが見れるはず。

$ kubectl get jobs
NAME                    DESIRED   SUCCESSFUL   AGE
backup-8dasy            1         1            5m

まとめ

ThirdPartyResourceとコントローラーの組み合わせでKubernetesの標準機能と同じように、Kubernetesらしく拡張できる。このアーキテクチャを使うと安定性の高いシステム作るができるでしょう。

また、もし興味がある方は Kubernetes Slackチャンネルにジョインすると、他のKubernetes開発者と話せますし、#jp-users に日本のユーザーもいるのでぜひジョインしてみてください。

以上、 Kubernetes Advent Calendar 2016 の第17日の記事でした。明日は、hiyosiさんの「認証関連で何か」を期待しましょう。