Kubernetes: що таке Kubernetes Operator та CustomResourceDefinition

Автор |  18/07/2025

Мабуть, всі користувались операторами в Kubernetes, наприклад – PostgreSQL operator, VictoriaMetircs Operator.

Але що там відбувається “під капотом”? Як і до чого застосовуються CustomResourceDefinition (CRD), і що таке, власне “оператор”?

І, врешті решт – в чому різниця між “Kubernetes Operator” та “Kubernetes Controller”?

В попередній частині – Kubernetes: Kubernetes API, API Groups, CRD та etcd – трохи копнули в те, як працює Kubernetes API і що таке CRD, а тепер можемо спробувати написати власний мікро-опертор, простенький MVP, і на його прикладі розібратись з деталями.

Kubernetes Controller vs Kubernetes Operator

Отже, в чому головна різниця між Controller та Operator?

What is: Kubernetes Controller

Якщо просто, то Controller – то просто якийсь сервіс, який моніторить ресурси в кластері, і приводить їхній стан у відповідність до того, як цей стан описаний в базі даних – etcd.

В Kubernetes ми маємо набір дефолтних контролерів – Core Controllers у складі Kube Controller Manager, такі як ReplicaSet Controller, який перевіряє кількість подів в Deployment на відповідність до значення replicas, або Deployment Controller, який контролює створення та оновлення ReplicaSets, чи PersistentVolume Controller та PersistentVolumeClaim Binder для роботи з дисками тощо.

Окрім цих дефолтних контролерів можемо створити власний контролер, або взяти існуючий – наприклад, ExternalDNS Controller. Це приклади кастомних контролерів (Custom Controllers).

Контролери працюють у control loop – циклічному процесі, в якому постійно перевіряють задані їм ресурси – або зміни вже існуючі ресурсів в системі, або реагують на додавання нових.

Під час кожної перевірки (reconciliation loop), Controller порівнює поточний стан (current state) ресурсу та порівнює його з бажаним станом (desired state) – тобто параметрами, заданими в його маніфесті при створенні або оновлені ресурсу.

Якщо desired стан не відповідає current state – то контролер виконує потрібні дії, аби ці стани узгодити.

What is: Kubernetes Operator

В свою чергу Kubernetes Operator – це такий собі “контролер на стероїдах”: фактично, Operator являє собою Custom Controller в тому сенсі, що він має власний сервіс у вигляді Pod, який комунікує з Kubernetes API для отримання та апдейту інформації про ресурси.

Але якщо звичайні контролери працюють з “дефолтними” типами ресурсів (Pod, Endpoint Slice, Node, PVC) – то для Operator ми описуємо власні, кастомні ресурси, використовуючи маніфест з Custom Resource.

А те, як ці ресурси будуть виглядати і які параметри мати – задаємо через CustomResourceDefinition які записуються в базу Kubernetes та додаються до Kubernetes API, і таким чином Kubernetes API дозволяє нашому кастомному Контролеру оперувати з цими ресурсами.

Тобто:

  • Controller – це компонент, сервіс, а Operator – це поєднання одного чи кількох кастомних Controller та відповідних CRD
  • Controller – реагує на зміну ресурсів, а Operator – додає нові типи ресурсів + контролер, який ці ресурси контролює

Kubernetes Operator frameworks

Існує кілька рішень, які спрощують створення операторів.

Основні – Kubebuilder, фреймворк для створення контролерів на Go, та Kopf – на Python.

Також є Operator SDK, який взагалі дозволяє працювати з контролерами навіть за допомогою Helm, без коду.

Я спочатку думав робити взагалі на “голому Go”, без фреймворків, аби краще зрозуміти, як усе працює під капотом – але цей пост почав перетворюватись на 95% Golang.

А так як основна ідея матеріалу була показати чисто концептуально що таке Kubernetes Operator, яку роль грають CustomResourceDefinitions та як вони один з одним взаємодіють і дозволяють керувати ресурсами – то все ж зупинився на Kopf, бо він дуже простий, і для цих цілей цілком підходить.

Створення CustomResourceDefinition

Почнемо з написання CRD.

Власне CustomResourceDefinition – це просто опис того, які поля у нашого кастомного ресурсу будуть, аби контролер міг їх використовувати через Kubernetes API для створення реальних ресурсів – будь то якісь ресурси в самому Kubernetes, чи зовнішні типу AWS Load Balancer чи AWS Route 53.

Що будемо робити: напишемо CRD, який буде описувати ресурс MyApp, і у цього ресурсу будуть поля для Docker image та кастомне поле з якимось текстом, який потім буде записувати в логи Kubernetes Pod.

Документація Kubernetes по CRD – Extend the Kubernetes API with CustomResourceDefinitions.

Створюємо файл myapp-crd.yaml:

apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: myapps.demo.rtfm.co.ua
spec:
  group: demo.rtfm.co.ua
  versions:
    - name: v1
      served: true
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              properties:
                image:
                  type: string
                banner: 
                  type: string
                  description: "Optional banner text for the application"
  scope: Namespaced
  names:
    plural: myapps
    singular: myapp
    kind: MyApp
    shortNames:
      - ma

Тут:

  • spec.group: demo.rtfm.co.ua: створюємо нову API Group, всі ресурси цього типу будуть доступні за адресою /apis/demo.rtfm.co.ua/...
  • versions: список версій нового ресурсу
    • name.v1: будемо описувати тільки одну версію
    • served: true: додаємо новий ресурс в Kube API – можна робити kubectl get myapp (GET /apis/demo.rtfm.co.ua/v1/myapps)
    • storage: true: ця версія буде використовуватись для зберігання в etcd (якщо описується кілька версій – то тільки одна повинна бути із storage: true)
    • schema:
      • openAPIV3Schema: описуємо API-схему за стандартом OpenAPI v3
        • type: object: описуємо об’єкт із вкладеними полями (key: value)
        • properties: які поля у об’єкта будуть
          • spec: що ми зможемо використовувати у YAML-маніфестах при його створенні
            • type: object – описуємо наступні поля
            • properties:
              • image.type: string: Docker-образ
              • banner.type: string: наше кастомне поле, через яке ми будемо додавати якийсь запис в логах ресурсу
  • scope: Namespaced: всі ресурси цього типу будуть існувати в конкретному Kubernetes Namespace
  • names:
    • plural: myapps:  ресурси будуть доступні через /apis/demo.rtfm.co.ua/v1/namespaces/<ns>/myapps/, і як ми зможемо “звертатись” до ресурсу (kubectl get myapp), використовується в RBAC де треба вказати resources: ["myapps"]
    • singular: myap: аліас для зручності
    • shortNames: [ma] короткий аліас для зручності

Запускаємо Minikube:

$ minikube start

Додаємо CRD:

$ kk apply -f myapp-crd.yaml 
customresourcedefinition.apiextensions.k8s.io/myapps.demo.rtfm.co.ua created

Глянемо API Groups:

$ kubectl api-versions
...
demo.rtfm.co.ua/v1
...

І новий ресурс в цій API Group:

$ kubectl api-resources --api-group=demo.rtfm.co.ua
NAME     SHORTNAMES   APIVERSION           NAMESPACED   KIND
myapps   ma           demo.rtfm.co.ua/v1   true         MyApp

ОК – ми створили CRD, і тепер можемо навіть створити CustomResource (CR).

Створюємо файл myapp-example-resource.yaml:

apiVersion: demo.rtfm.co.ua/v1      # matches the CRD's group and version
kind: MyApp                         # kind from the CRD's 'spec.names.kind'
metadata:
  name: example-app                 # name of this custom resource
  namespace: default                # namespace (CRD has scope: Namespaced)
spec:
  image: nginx:latest               # container image to use (from our schema)
  banner: "This pod was created by MyApp operator 🚀"

Деплоїмо:

$ kk apply -f myapp-example-resource.yaml 
myapp.demo.rtfm.co.ua/example-app created

І перевіряємо:

$ kk get myapp
NAME          AGE
example-app   15s

Але ніякий ресурсів типу Pod нема – бо у нас нема контролера, який буде працювати з цим типом ресурсів.

Створення Kubernetes Operator з Kopf

Отже, будемо використовувати Kopf, який буде створювати Kubernetes Pod, але використовуючи наш власний CRD.

Створюємо Python virtual environment:

$ python -m venv venv
$ . ./venv/bin/activate
(venv)

Додаємо залежності – файл requirements.txt:

kopf
kubernetes
PyYAML

Встановлюємо їх – з pip або uv:

$ pip install -r requirements.txt

Пишемо код оператора:

import os
import kopf
import kubernetes
import yaml

# use kopf to register a handler for the creation of MyApp custom resources
@kopf.on.create('demo.rtfm.co.ua', 'v1', 'myapps')
# this function will be called when a new MyApp resource is created
def create_myapp(spec, name, namespace, logger, **kwargs):
    # get image value from the spec of the CustomResource manifest
    image = spec.get('image')
    if not image:
        raise kopf.PermanentError("Field 'spec.image' must be provided.")

    # get optional banner value from the CR manifest spec
    banner = spec.get('banner')

    # load pod template YAML from file
    path = os.path.join(os.path.dirname(__file__), 'pod.yaml')
    with open(path, 'rt') as f:
        pod_template = f.read()

    # render pod YAML with provided values
    pod_yaml = pod_template.format(
        name=f"{name}-pod",
        image=image,
        app_name=name,
    )
    # create Pod difinition from the rendered YAML
    # it uses PyYAML to parse the YAML string into a Python dictionary
    # which can be used by Kubernetes API client
    # it is used to create a Pod object in Kubernetes
    pod_spec = yaml.safe_load(pod_yaml)

    # inject banner as environment variable if provided
    if banner:
        # it is used to add a new environment variable into the container spec
        container = pod_spec['spec']['containers'][0]
        env = container.setdefault('env', [])
        env.append({
            'name': 'BANNER',
            'value': banner
        })

    # create Kubernetes CoreV1 API client
    # used to interact with the Kubernetes API
    api = kubernetes.client.CoreV1Api()

    try:
        # it sends a request to the Kubernetes API to create a new Pod
        # uses 'create_namespaced_pod' method to create the Pod in the specified namespace
        # 'namespace' is the namespace where the Pod will be created
        # 'body' is the Pod specification that was created from the YAML template
        api.create_namespaced_pod(namespace=namespace, body=pod_spec)
        logger.info(f"Pod {name}-pod created.")
    except kubernetes.client.exceptions.ApiException as e:
        logger.error(f"Failed to create pod {name}-pod: {e}")

Створюємо шаблон, який буде використовуватись нашим Оператором для створення ресурсів:

apiVersion: v1
kind: Pod
metadata:
  name: {name}
  labels:
    app: {app_name}
spec:
  containers:
    - name: {app_name}
      image: {image}
      ports:
        - containerPort: 80
      env:
        - name: BANNER
          value: ""  # will be overridden in code if provided
      command: ["/bin/sh", "-c"]
      args:
        - |
          if [ -n "$BANNER" ]; then
            echo "$BANNER";
          fi
          exec sleep infinity

Запускаємо оператор з kopf run myoperator.py.

У нас вже є створений CustomResource, і Оператор має його побачити та створити Kubernetes Pod:

$ kopf run  myoperator.py  --verbose
...
[2025-07-18 13:59:58,201] kopf._cogs.clients.w [DEBUG   ] Starting the watch-stream for customresourcedefinitions.v1.apiextensions.k8s.io cluster-wide.
[2025-07-18 13:59:58,201] kopf._cogs.clients.w [DEBUG   ] Starting the watch-stream for myapps.v1.demo.rtfm.co.ua cluster-wide.
[2025-07-18 13:59:58,305] kopf.objects         [DEBUG   ] [default/example-app] Creation is in progress: {'apiVersion': 'demo.rtfm.co.ua/v1', 'kind': 'MyApp', 'metadata': {'annotations': {'kubectl.kubernetes.io/last-applied-configuration': '{"apiVersion":"demo.rtfm.co.ua/v1","kind":"MyApp","metadata":{"annotations":{},"name":"example-app","namespace":"default"},"spec":{"banner":"This pod was created by MyApp operator 🚀","image":"nginx:latest","replicas":3}}\n'}, 'creationTimestamp': '2025-07-18T09:55:42Z', 'generation': 2, 'managedFields': [{'apiVersion': 'demo.rtfm.co.ua/v1', 'fieldsType': 'FieldsV1', 'fieldsV1': {'f:metadata': {'f:annotations': {'.': {}, 'f:kubectl.kubernetes.io/last-applied-configuration': {}}}, 'f:spec': {'.': {}, 'f:banner': {}, 'f:image': {}, 'f:replicas': {}}}, 'manager': 'kubectl-client-side-apply', 'operation': 'Update', 'time': '2025-07-18T10:48:27Z'}], 'name': 'example-app', 'namespace': 'default', 'resourceVersion': '2955', 'uid': '8b674a99-05ab-4d4b-8205-725de450890a'}, 'spec': {'banner': 'This pod was created by MyApp operator 🚀', 'image': 'nginx:latest', 'replicas': 3}}
...
[2025-07-18 13:59:58,325] kopf.objects         [INFO    ] [default/example-app] Pod example-app-pod created.
[2025-07-18 13:59:58,326] kopf.objects         [INFO    ] [default/example-app] Handler 'create_myapp' succeeded.
...

Перевіряємо Pod:

$ kk get pod
NAME              READY   STATUS    RESTARTS   AGE
example-app-pod   1/1     Running   0          68s

Та його логи:

$ kk logs -f example-app-pod
This pod was created by MyApp operator 🚀

Отже, Оператор запустив Pod використовуючи наш CustomResource в якому взяв поле spec.banner зі рядком “This pod was created by MyApp operator 🚀“, і виконав в поді command /bin/sh -c " $BANNER".

Шаблони ресурсів – Kopf та Kubebuilder

Замість того, аби мати окремий файл pod-template.yaml ми могли б все описати прямо в коді оператора.

Тобто можна описати щось на кшталт:

...
    # get optional banner value
    banner = spec.get('banner', '')

    # define Pod spec as a Python dict
    pod_spec = {
        "apiVersion": "v1",
        "kind": "Pod",
        "metadata": {
            "name": f"{name}-pod",
            "labels": {
                "app": name,
            },
        },
        "spec": {
            "containers": [
                {
                    "name": name,
                    "image": image,
                    "env": [
                        {
                            "name": "BANNER",
                            "value": banner
                        }
                    ],
                    "command": ["/bin/sh", "-c"],
                    "args": [f'echo "$BANNER"; exec sleep infinity'],
                    "ports": [
                        {
                            "containerPort": 80
                        }
                    ]
                }
            ]
        }
    }

    # create Kubernetes API client
    api = kubernetes.client.CoreV1Api()
...

А у випадку з Kubebuilder зазвичай створюється функція, яка використовує маніфест CustomResource (cr *myappv1.MyApp) і формує об’єкт типу *corev1.Pod використовуючи Go-структури corev1.PodSpec та corev1.Container:

...
// newPod is a helper function that builds a Kubernetes Pod object
// based on the custom MyApp resource. It returns a pointer to corev1.Pod,
// which is later passed to controller-runtime's client.Create(...) to create the Pod in the cluster.
func newPod(cr *myappv1.MyApp) *corev1.Pod {
    // `cr` is a pointer to your CustomResource of kind MyApp
    // type MyApp is generated by Kubebuilder and lives in your `api/v1/myapp_types.go`
    // it contains fields like cr.Spec.Image, cr.Spec.Banner, cr.Name, cr.Namespace, etc.
    return &corev1.Pod{
        // corev1.Pod is a Go struct representing the built-in Kubernetes Pod type
        // it's defined in "k8s.io/api/core/v1" package (aliased here as corev1)
        // we return a pointer to it (`*corev1.Pod`) because client-go methods like
        // `client.Create()` expect pointer types

        ObjectMeta: metav1.ObjectMeta{
            // metav1.ObjectMeta comes from "k8s.io/apimachinery/pkg/apis/meta/v1"
            // it defines metadata like name, namespace, labels, annotations, ownerRefs, etc.
            Name:      cr.Name + "-pod",     // generate Pod name based on the CR's name
            Namespace: cr.Namespace,         // place the Pod in the same namespace as the CR
            Labels: map[string]string{       // set a label for identification or selection
                "app": cr.Name,              // e.g., `app=example-app`
            },
        },

        Spec: corev1.PodSpec{
            // corev1.PodSpec defines everything about how the Pod runs
            // including containers, volumes, restart policy, etc.

            Containers: []corev1.Container{
                // define a single container inside the Pod

                {
                    Name:  cr.Name,          // use CR name as container name (must be DNS compliant)
                    Image: cr.Spec.Image,    // container image (e.g., "nginx:1.25")

                    Env: []corev1.EnvVar{
                        // corev1.EnvVar is a struct that defines environment variables
                        {
                            Name:  "BANNER",           // name of the variable
                            Value: cr.Spec.Banner,     // value from the CR spec
                        },
                    },

                    Command: []string{"/bin/sh", "-c"},
                    // override container ENTRYPOINT to run a shell command

                    Args: []string{
                        // run a command that prints the banner and sleeps forever
                        // fmt.Sprintf(...) injects the value at runtime into the string
                        fmt.Sprintf(`echo "%s"; exec sleep infinity`, cr.Spec.Banner),
                    },

                    // optional: could also add ports, readiness/liveness probes, etc.
                },
            },
        },
    }
}
...

А як в реальних операторах?

Але це ми робили для”внутрішніх” ресурсів Kubernetes.

Як щодо зовнішніх ресурсів?

Тут просто приклад – не тестував, але загальна ідея така: просто беремо SDK (в прикладі з Python це буде boto3), і використовуючи поля з CustomResource (наприклад, subnets або scheme), виконуємо відповідні API-запити до AWS через SDK.

Приклад такого CustomResource:

apiVersion: demo.rtfm.co.ua/v1
kind: MyIngress
metadata:
  name: myapp
spec:
  subnets:
    - subnet-abc
    - subnet-def
  scheme: internet-facing

І код, який міг би створювати AWS ALB з нього:

import kopf
import boto3
import botocore
import logging

# create a global boto3 client for AWS ELBv2 service
# this client will be reused for all requests from the operator
# NOTE: region must match where your subnets and VPC exist
elbv2 = boto3.client("elbv2", region_name="us-east-1")

# define a handler that is triggered when a new MyIngress resource is created
@kopf.on.create('demo.rtfm.co.ua', 'v1', 'myingresses')
def create_ingress(spec, name, namespace, status, patch, logger, **kwargs):
    # extract the list of subnet IDs from the CustomResource 'spec.subnets' field
    # these subnets must belong to the same VPC and be public if scheme=internet-facing
    subnets = spec.get('subnets')

    # extract optional scheme (default to 'internet-facing' if not provided)
    scheme = spec.get('scheme', 'internet-facing')

    # validate input: at least 2 subnets are required to create an ALB
    if not subnets:
        raise kopf.PermanentError("spec.subnets is required.")

    # attempt to create an ALB in AWS using the provided spec
    # using the boto3 ELBv2 client
    try:
        response = elbv2.create_load_balancer(
            Name=f"{name}-alb",           # ALB name will be derived from CR name
            Subnets=subnets,              # list of subnet IDs provided by user
            Scheme=scheme,                # 'internet-facing' or 'internal'
            Type='application',           # we are creating an ALB (not NLB)
            IpAddressType='ipv4',         # only IPv4 supported here (could be 'dualstack')
            Tags=[                        # add tags for ownership tracking
                {'Key': 'ManagedBy', 'Value': 'kopf'},
            ]
        )
    except botocore.exceptions.ClientError as e:
        # if AWS API fails (e.g. invalid subnet, quota exceeded), retry later
        raise kopf.TemporaryError(f"Failed to create ALB: {e}", delay=30)

    # parse ALB metadata from AWS response
    lb = response['LoadBalancers'][0]       # ALB list should contain exactly one entry
    dns_name = lb['DNSName']                # external DNS of the ALB (e.g. abc.elb.amazonaws.com)
    arn = lb['LoadBalancerArn']             # unique ARN of the ALB (used for deletion or listeners)

    # log the creation for operator diagnostics
    logger.info(f"Created ALB: {dns_name}")

    # save ALB info into the CustomResource status field
    # this updates .status.alb.dns and .status.alb.arn in the CR object
    patch.status['alb'] = {
        'dns': dns_name,
        'arn': arn,
    }

    # return a dict, will be stored in the finalizer state
    # used later during deletion to clean up the ALB
    return {'alb-arn': arn}

У випадку з Go і Kubebuilder – використовували б бібліотеку aws-sdk-go:

import (
    "context"
    "fmt"

    elbv2 "github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2"
    "github.com/aws/aws-sdk-go-v2/aws"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    networkingv1 "k8s.io/api/networking/v1"
)

func newALB(ctx context.Context, client *elbv2.Client, cr *networkingv1.Ingress) (string, error) {
    // build input for the ALB
    input := &elbv2.CreateLoadBalancerInput{
        Name:           aws.String(fmt.Sprintf("%s-alb", cr.Name)),
        Subnets:        []string{"subnet-abc123", "subnet-def456"}, // replace with real subnets
        Scheme:         elbv2.LoadBalancerSchemeEnumInternetFacing,
        Type:           elbv2.LoadBalancerTypeEnumApplication,
        IpAddressType:  elbv2.IpAddressTypeIpv4,
        Tags: []types.Tag{
            {
                Key:   aws.String("ManagedBy"),
                Value: aws.String("MyIngressOperator"),
            },
        },
    }

    // create ALB
    output, err := client.CreateLoadBalancer(ctx, input)
    if err != nil {
        return "", fmt.Errorf("failed to create ALB: %w", err)
    }

    if len(output.LoadBalancers) == 0 {
        return "", fmt.Errorf("ALB was not returned by AWS")
    }

    // return the DNS name of the ALB
    return aws.ToString(output.LoadBalancers[0].DNSName), nil
}

В самому реальному AWS ALB Ingress Controller створення ALB викликається у файлі elbv2.go:

...
func (c *elbv2Client) CreateLoadBalancerWithContext(ctx context.Context, input *elasticloadbalancingv2.CreateLoadBalancerInput) (*elasticloadbalancingv2.CreateLoadBalancerOutput, error) {
  client, err := c.getClient(ctx, "CreateLoadBalancer")
  if err != nil {
    return nil, err
  }
  return client.CreateLoadBalancer(ctx, input)
}
...

Власне, на цьому і все.