Apache Druid: обзор, запуск в Kubernetes и мониторинг с Prometheus

Автор: | 14/09/2022
 

Apache Druid – колоночная база данных, ориентированная на работу с большими объемами данных, сочетающая в себе возможности и преимущества Time-Series Database, Data Warehouse и поисковой системы.

Общая задача – настроить мониторинг кластера Druid в Kubernetes, для чего сначала посмотрим что это вообще такое и как оно всё работает, а затем запустим Друид и потрогаем его руками.

Для запуска самого Druid в Kubernetes используем druid-operator, для сбора метрик – druid-exporter, для мониторинга – Kubernetes Prometheus Stack, а запускать всё это будем в Minikube.

Обзор Apache Druid

Главная фича Друида лично для меня это то, система изначально разрабатывалась под использование в облаках типа AWS и Kubernetes, поэтому имеет отличные возможности по скейлингу, хранению данных и восстановлению работы в случае падения одного из сервисов.

Кроме того, Друид это:

  • колоночный формат хранения данных: для обработки загружается только необходимая таблица, что значительно увеличивает скорость обработки запросов. Кроме того, для быстрого сканирования и агрегации данных Друид оптимизирует хранилище колонки в соответствии с её типом данных
  • масштабируемая распределённая система: кластера Друида могут располагаться на десятках и сотнях отдельных серверов
  • параллельная обработка данных: Друид может обрабатывать каждый запрос параллельно на независимых инстансах
  • self-healing и self-balancing: в любой момент можно добавить или удалить новые ноды в кластер, при этом кластер сам выполняет балансировку запросов и переключение между ними без какого либо даунтайма. Более того, Друид позволяет без даунтайма выполнять и обновление версий
  • собственно, cloud-native и fault-tolerant архитектура: Друид изначально спроектирован для работы в распределённых системах. После получения новых данных, он сразу же копирует их в свой deep storage, в роли которого могут быть сервисы хранения данных типа Amazon S3, HDFS или сетевая файловая система, и позволяет восстановить данные даже в том случае, если все сервера Друида упадут. В случае, если упала только часть серверов Друида – встроенная репликация данных позволит продолжать выполнение запросов (уже чешутся руки поубивать его ноды, и посмотреть на реакцию Друида – но не в этот раз)
  • идексы для быстрого поиска: Друид использует сжатые Bitmap-индексы, позволяя выполнять быстрый поиск по нескольким колонкам
  • Time-based partitioning: по умолчанию Друид разделяет данные по времени, позволяя создавать дополнительные сегменты по другим полям. В результате, при выполнении запросов с временными промежутками, будут использованы только необходимые сегменты данных

Архитектура и компоненты Druid

Общая архитектура:

Или так:

См. Processes and servers.

Three-server configuration – Master, Query, Data

У Друида имеется несколько внутренних сервисов, используемых для работы с данными, и эти сервисы можно группировать по трём типам серверов – Master, Query и Data, см. Pros and cons of colocation.

Master servers – управление добавлением новых данных и доступностью (ingest/indexing) – отвечают за запуск новых задач по добавлению новых данных и координацию доступности данных:

  • Coordinator: отвечает за размещение сегментов данных на конкретные ноды с Historical процессами
  • Overlord: отвечают за размещение задач по добавлению поступающих данных на Middle Managers и за координацию передачи созданных сегментов в Deep Store

Query servers – управление запросами от клиентов

  • Broker: получает запросы от клиентов, определяет какие из Historical или Middle Manager процессов/нод содержат необходимые сегменты, и из исходного запроса клиента формирует и отправляет подзапрос (sub-query) каждому из этих процессов, после чего получает от них ответы, формирует из них общий ответ с данными для клиента, и отправляет ему.
    При этом, Historical отвечают на подзапросы, которые относятся к сегментам данных, которые уже хранятся в Deep Store, а Middle Manager – отвечают на запросы, которые относятся к недавно полученным данным, которые ещё находятся в памяти и не отправлены в Deep Store
  • Router: опциональный сервис, который предоставляет общий API для работы с Брокерами, Оверлордами и Координаторами, хотя можно их использовать и напрямую. Кроме того, Router предоставляет Druid Console – WebUI для работы с кластером, увидим её чуть позже

Data servers – управляют добавлением новых данных в кластер и хранят данные для клиентов:

  • Historical: “главные рабочие лошадки” (с) Друида, которые обслуживают хранилище и запросы к “историческим” данным, т.е. тем, которые уже находятся в Deep Store – загружают оттуда сегменты на локальный диск хоста, на котором работает Historical процесс, и формируют ответы по этим данным для клиентов
  • Middle Manager: обрабатывает добавление новых данных в кластер – читают данные с внешних источников и создают из них новые сегменты данных, которые затем загружаются в Deep Store
    • Peons: за добавление данных в кластер может отвечать несколько задач. Для логирования и изоляции ресурсов, Middle Manager создаёт Peons, которые являются отдельными процессами JVM и отвечают за выполнение конкретной задачи от Middle Manager. Запускается всегда на том же хосте, где запущен процесс Middle Manager, который породил Peon-процесс.
  • Indexer: альтернатива Middle Manager и Peon – вместо создания отдельной JVM на каждую задачу от Middle Manager, Indexer выполняет их в отдельных потоках своей JVM.
    Разрабатывается как более простая альтернатива Middle Manager и Peon, пока является експерементальной фичей, но в будущем заменит Middle Manager и Peon

Кроме внутренних процессов, для работы Друид также использует внешние сервисы – Deep storage, Metadata storage и ZooKeeper:

  • deep storage: используется для хранения всех данных, добавленных в систему и представляет собой распределённое хранилище, доступное каждому серверу Друида. Это могут быть Amazon S3, HDFS или NFS
  • metadata storage: используется для хранения внутренних метаданных, таких как информация об использовании сегментов и текущих задачах. В роли такого хранилища могут быть классические СУДБ типа PostgreSQL или MySQL
  • ZooKeeper: используется для service discovery и координации работы сервисов (в Kubernetes вместо него можно использовать druid-kubernetes-extensions, попробуем тоже, но в другой раз)

Druid data flow

И посмотрим, как выполняется добавление данных (Data ingest) и ответы клиентам (Data query, Query Answering).

Data ingest

Добавление данных можно разделить на два этапа: в первом, инстансы Middle Manager запускают задачи по индексированию, эти задачи создают сегменты данных и отправляют их в Deep Store, а во втором – Historical инстансы загружают сегменты данных из Deep store, что бы использовать их при формировании ответов на запросы клиентов.

Первый этап – получение данных из внешних источников, индексирование и создание сегментов данных, и их загрузка в Deep store:

При этом в процессе создания сегментов данных – они уже доступны для выполнения запросов по ним.

Второй этап – Coordinator опрашивает Metadata store в поисках новых сегментов. Как только процесс Coordinator-а их находит, он выбирает инстанс Historical, который должен загрузить сегмент из Deep store, что бы он стал доступен  для обработки запросов:

Coordinator опрашивает Metadata Store в поисках новых сегментов. Как только он их находит, Coordinator выбирает инстанс Historical, который должен загрузить сегмент из Deep store. Когда сегмент загружен – Historical готов использовать его для обработки запросов

Data query

Клиенты шлют запросы к Broker напрямую или через Router.

Когда такой запрос получен, Broker определяет какие процессы обслуживают необходимые сегменты данных.

Если данные требуют сегментов одновременно и из Deep store (какие-то старые), и из Middle Manager (которые получаем прямо сейчас из стрима), то Брокер формирует и отправляет отдельные подзапросы к Historical и Middle Manager, где каждый из них выполнит свою часть и вернёт данные Брокеру. Брокер их агрегирует, и отвечает клиенту с финальным результатом.

Далее, попробуем это всё дело развернуть и потыкать веточкой, а потом сверху добавим мониторинга.

Запуск Druid в Kubernetes

Запускаем Minikube, добавляем памяти и ЦПУ, что бы спокойно запускать все поды, тут выделяем 24 RAM и 8 ядер – там Java, пусть кушает:

[simterm]

$ minikube start --memory 12000 --cpus 8
😄  minikube v1.26.1 on Arch "rolling"
✨  Automatically selected the virtualbox driver
👍  Starting control plane node minikube in cluster minikube
🔥  Creating virtualbox VM (CPUs=8, Memory=12000MB, Disk=20000MB) ...
🐳  Preparing Kubernetes v1.24.3 on Docker 20.10.17 ...
    ▪ Generating certificates and keys ...
    ▪ Booting up control plane ...
    ▪ Configuring RBAC rules ...
🔎  Verifying Kubernetes components...
    ▪ Using image gcr.io/k8s-minikube/storage-provisioner:v5
🌟  Enabled addons: storage-provisioner, default-storageclass
🏄  Done! kubectl is now configured to use "minikube" cluster and "default" namespace by default

[/simterm]

Проверяем:

[simterm]

$ minikube status
minikube
type: Control Plane
host: Running
kubelet: Running
apiserver: Running
kubeconfig: Configured

[/simterm]

И ноды:

[simterm]

$ kubectl get node
NAME       STATUS   ROLES           AGE   VERSION
minikube   Ready    control-plane   41s   v1.24.3

[/simterm]

Переходим к установке Apache Druid operator.

Установка Druid Operator

Создаём Namespace для оператора:

[simterm]

$ kubectl create namespace druid-operator
namespace/druid-operator created

[/simterm]

Устанавливаем оператор в этот неймспейс:

[simterm]

$ git clone https://github.com/druid-io/druid-operator.git 
$ cd druid-operator/
$ helm -n druid-operator install cluster-druid-operator ./chart

[/simterm]

Проверяем под:

[simterm]

$ kubectl -n druid-operator get pod
NAME                                     READY   STATUS    RESTARTS   AGE
cluster-druid-operator-9c8c44f78-8svhc   1/1     Running   0          49s

[/simterm]

Переходим к созданию кластера.

Запуск Druid Cluster

Создаём неймспейс:

[simterm]

$ kubectl create ns druid
namespace/druid created

[/simterm]

Устанавливаем ZooKeeper:

[simterm]

$ kubectl -n druid apply -f examples/tiny-cluster-zk.yaml
service/tiny-cluster-zk created
statefulset.apps/tiny-cluster-zk created

[/simterm]

Для создания тестового кластера используем конфиг из примеров – examples/tiny-cluster.yaml.

Создаём кластер:

[simterm]

$ kubectl -n druid apply -f examples/tiny-cluster.yaml
druid.druid.apache.org/tiny-cluster created

[/simterm]

Проверяем ресурсы:

[simterm]

$ kubectl -n druid get all
NAME                                    READY   STATUS              RESTARTS   AGE
pod/druid-tiny-cluster-brokers-0        0/1     ContainerCreating   0          21s
pod/druid-tiny-cluster-coordinators-0   0/1     ContainerCreating   0          21s
pod/druid-tiny-cluster-historicals-0    0/1     ContainerCreating   0          21s
pod/druid-tiny-cluster-routers-0        0/1     ContainerCreating   0          21s
pod/tiny-cluster-zk-0                   1/1     Running             0          40s

NAME                                      TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)                      AGE
service/druid-tiny-cluster-brokers        ClusterIP   None         <none>        8088/TCP                     21s
service/druid-tiny-cluster-coordinators   ClusterIP   None         <none>        8088/TCP                     21s
service/druid-tiny-cluster-historicals    ClusterIP   None         <none>        8088/TCP                     21s
service/druid-tiny-cluster-routers        ClusterIP   None         <none>        8088/TCP                     21s
service/tiny-cluster-zk                   ClusterIP   None         <none>        2181/TCP,2888/TCP,3888/TCP   40s

NAME                                               READY   AGE
statefulset.apps/druid-tiny-cluster-brokers        0/1     21s
statefulset.apps/druid-tiny-cluster-coordinators   0/1     21s
statefulset.apps/druid-tiny-cluster-historicals    0/1     21s
statefulset.apps/druid-tiny-cluster-routers        0/1     21s
statefulset.apps/tiny-cluster-zk                   1/1     40s

[/simterm]

Ждём, пока поды перейдут в Running (минут 5 заняло), и пробрасываем порт к Druid Router:

[simterm]

$ kubectl port-forward svc/druid-tiny-cluster-routers 8888:8088 -n druid
Forwarding from 127.0.0.1:8888 -> 8088
Forwarding from [::1]:8888 -> 8088

[/simterm]

Открываем Druid Console – http://localhost:8888:

Можно заглянуть в Druid Services – увидим те же сервисы, что видели в виде Kubernetes-подов:

Вот тут есть неплохое короткое видео с примером загрузки тестовых данных – интереса ради можно пройтись.

Переходим к установке Prometheus.

Установка Kube Prometheus Stack

Создаём неймспейс:

[simterm]

$ kubectl create ns monitoring
namespace/monitoring created

[/simterm]

Устанавливаем KPS:

[simterm]

$ helm -n monitoring install kube-prometheus-stack prometheus-community/kube-prometheus-stack
NAME: kube-prometheus-stack
LAST DEPLOYED: Tue Sep 13 15:24:22 2022
NAMESPACE: monitoring
STATUS: deployed
...

[/simterm]

Проверяем поды:

[simterm]

$ kubectl -n monitoring get po
NAME                                                        READY   STATUS              RESTARTS   AGE
alertmanager-kube-prometheus-stack-alertmanager-0           0/2     ContainerCreating   0          22s
kube-prometheus-stack-grafana-595f8cff67-zrvxv              3/3     Running             0          42s
kube-prometheus-stack-kube-state-metrics-66dd655687-nkxpb   1/1     Running             0          42s
kube-prometheus-stack-operator-7bc9959dd6-d52gh             1/1     Running             0          42s
kube-prometheus-stack-prometheus-node-exporter-rvgxw        1/1     Running             0          42s
prometheus-kube-prometheus-stack-prometheus-0               0/2     Init:0/1            0          22s

[/simterm]

Ждём несколько минут, пока все станут Running и открываем себе доступ к Prometheus:

[simterm]

$ kubectl -n monitoring port-forward svc/kube-prometheus-stack-prometheus 9090:9090
Forwarding from 127.0.0.1:9090 -> 9090
Forwarding from [::1]:9090 -> 9090

[/simterm]

Открываем себе доступ к Grafana:

[simterm]

$ kubectl -n monitoring port-forward svc/kube-prometheus-stack-grafana 8080:80
Forwarding from 127.0.0.1:8080 -> 3000
Forwarding from [::1]:8080 -> 3000

[/simterm]

Grafana dashboard

Получаем пароль от Grafana – находим её Secret:

[simterm]

$ kubectl -n monitoring get secret | grep graf
kube-prometheus-stack-grafana                                  Opaque               3      102s

[/simterm]

И получаем значение:

[simterm]

$ kubectl -n monitoring get secret kube-prometheus-stack-grafana -o jsonpath="{.data.admin-password}" | base64 --decode ; echo
prom-operator

[/simterm]

Для Grafana есть готовая community дашборда, пока добавим её, а вообще потом сделаем свою.

Открываем в браузере localhost:8080, логинимся, переходим в Dashboards > Import:

Указываем ID 12155, загружаем:

Выбираем Prometheus как datasource:

Получаем такую борду, но пока без данных:

Запуск Druid Exporter

Клонируем репозиторий:

[simterm]

$ cd ../
$ git clone https://github.com/opstree/druid-exporter.git
$ cd druid-exporter/

[/simterm]

Проверяем Druid Router Service – нам надо его полное имя, что бы настроить Exporter:

[simterm]

$ kubectl -n druid get svc
NAME                              TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)                      AGE
...
druid-tiny-cluster-routers        ClusterIP   None         <none>        8088/TCP                     4m3s
...

[/simterm]

Устанавливаем експортер в неймспейс monitoring, в параметре druidURL указываем URL сервиса Router нашего Друид-кластера, смотрели выше, его порт, включаем создание Kubernetes ServiceMonitor и неймспейс, в котором у нас работает Prometheus (monitoring):

[simterm]

$ helm -n monitoring install druid-exporter ./helm/ --set druidURL="http://druid-tiny-cluster-routers.druid.svc.cluster.local:8088" --set druidExporterPort="8080" --set logLevel="debug" --set logFormat="text" --set serviceMonitor.enabled=true --set serviceMonitor.namespace="monitoring"
NAME: druid-exporter
LAST DEPLOYED: Tue Sep 13 15:28:25 2022
NAMESPACE: monitoring
STATUS: deployed
...

[/simterm]

Открываем доступ к Exporter Service:

[simterm]

$ kubectl -n monitoring port-forward svc/druid-exporter-prometheus-druid-exporter 8989:8080
Forwarding from 127.0.0.1:8989 -> 8080
Forwarding from [::1]:8989 -> 8080

[/simterm]

Проверяем метрики:

[simterm]

$ curl -s localhost:8989/metrics | grep -v '#' | grep druid_
druid_completed_tasks 0
druid_health_status{druid="health"} 1
druid_pending_tasks 0
druid_running_tasks 0
druid_waiting_tasks 0

[/simterm]

Окей – уже что-то, хотя пока мало. Добавляем сбор данных в Prometheus, а потом добавим ещё метрик.

Prometheus ServiceMonitor

Проверяем Prometheus Service Discovery – тут должны быть все наши СервисМониторы:

Проверяем, создан ли Druid ServiceMonitor в Kubernertes :

[simterm]

$ kubectl -n monitoring get servicemonitors
NAME                                             AGE
druid-exporter-prometheus-druid-exporter         87s
...

[/simterm]

Проверяем ресурс Prometheus – его serviceMonitorSelector:

[simterm]

$ kubectl -n monitoring get prometheus -o yaml | yq .items[].spec.serviceMonitorSelector.matchLabels
{
  "release": "kube-prometheus-stack"
}

[/simterm]

Редактируем Druid ServiceMonitor:

[simterm]

$ kubectl -n monitoring edit servicemonitor druid-exporter-prometheus-druid-exporter

[/simterm]

Добавляем лейблу release: kube-prometheus-stack, что бы Prometheus начал с этого ServiceMonitor собирать метрики:

Ждём минуту-две, проверяем мониторы ещё раз:

Проверяем метрики:

 

Окей – теперь у нас есть Druid Cluster, есть Prometheus, который собирает метрики – осталось настроить эти самые метрики.

Apache Druid Monitoring

Настройка мониторинга Друида включает в себя Emitters и Monitors: емитеры “пушат” метрики из Друида “наружу”, а мониторы определяют то, какие именно метрики будут доступны. При этом, для некоторых метрик требуется включать дополнительные расширения.

См:

У всех сервисов есть общие метрики, например query/time, и свои собственные, например для Broker есть метрика sqlQuery/time, которые могут заданы для конкретного сервиса через его runtime.properties.

Druid Metrics Emitters

Что бы включить “эмитинг” метрик – редактируем examples/tiny-cluster.yaml и в common.runtime.properties добавляем:

druid.emitter=http
druid.emitter.logging.logLevel=debug
druid.emitter.http.recipientBaseUrl=http://druid-exporter-prometheus-druid-exporter.monitoring.svc.cluster.local:8080/druid

Сохраняем, обновляем кластер:

[simterm]

$ kubectl -n druid apply -f examples/tiny-cluster.yaml 
druid.druid.apache.org/tiny-cluster configured

[/simterm]

Ждём минуту-две, что бы поды перезапустились и начали собираться метрики, проверяем метрики в Експортере:

[simterm]

$ curl -s localhost:8989/metrics | grep -v '#' | grep druid_ | head
druid_completed_tasks 0
druid_emitted_metrics{datasource="",host="druid-tiny-cluster-brokers-0:8088",metric_name="avatica-remote-JsonHandler-Handler/Serialization",service="druid-broker"} 0
druid_emitted_metrics{datasource="",host="druid-tiny-cluster-brokers-0:8088",metric_name="avatica-remote-ProtobufHandler-Handler/Serialization",service="druid-broker"} 0
druid_emitted_metrics{datasource="",host="druid-tiny-cluster-brokers-0:8088",metric_name="avatica-server-AvaticaJsonHandler-Handler/RequestTimings",service="druid-broker"} 0
druid_emitted_metrics{datasource="",host="druid-tiny-cluster-brokers-0:8088",metric_name="avatica-server-AvaticaProtobufHandler-Handler/RequestTimings",service="druid-broker"} 0
druid_emitted_metrics{datasource="",host="druid-tiny-cluster-brokers-0:8088",metric_name="jetty-numOpenConnections",service="druid-broker"} 1
druid_emitted_metrics{datasource="",host="druid-tiny-cluster-coordinators-0:8088",metric_name="compact-task-count",service="druid-coordinator"} 0
druid_emitted_metrics{datasource="",host="druid-tiny-cluster-coordinators-0:8088",metric_name="compactTask-availableSlot-count",service="druid-coordinator"} 0
druid_emitted_metrics{datasource="",host="druid-tiny-cluster-coordinators-0:8088",metric_name="compactTask-maxSlot-count",service="druid-coordinator"} 0
druid_emitted_metrics{datasource="",host="druid-tiny-cluster-coordinators-0:8088",metric_name="coordinator-global-time",service="druid-coordinator"} 2

[/simterm]

Появились druid_emitted_metrics – отлично.

В них в лейбле exported_service указывается с какого сервиса метрика получена, а в metric_name – какая именно метрика.

Проверяем в Prometheus:

Отлично!

Но хочется больше метрик – “Need more metrics, my lord!” (c)

Druid Monitors

К примеру, хочется знать потребление CPU каждым из процессов/сервисом.

Что бы иметь возможность видеть все системные метрики (без Prometheus Node Exporter) – включаем org.apache.druid.java.util.metrics.SysMonitor, а для данных по работе JVM – добавим org.apache.druid.java.util.metrics.JvmMonitor.

В тот же файл examples/tiny-cluster.yaml в блок common.runtime.properties добавляем:

...
druid.monitoring.monitors=["org.apache.druid.java.util.metrics.SysMonitor", "org.apache.druid.java.util.metrics.JvmMonitor"]
...

Сохраняем, обновляем кластер, ждём рестарта подов, и через пару минут проверяем метрики:

Другое дело!

И дашборда в Графане:

Отлично.

В Running/Failed Tasks у нас всё ещё No Data, так как ничего не запускали. Если загрузить тестовые данные из уже упомянутого виедо – то появятся и эти метрики.

Осталось на самом деле ещё много чего:

  • добавить PostgreSQL как Metadata store
  • настроить сбор метрик с него
  • протестировать работу без ZooKeeper (druid-kubernetes-extensions)
  • потестировать Prometheus Emitter
  • попробовать Indexer вместо Middle Manager
  • настроить сбор логов (куда? пока не придумали, но скорее всего Promtail && Loki)
  • ну и собственно – собрать нормальную дашборду для Графаны

А уже имея мониторинг можно будет погонять какие-то нагрузочные тесты и поиграться с тюнингом производительности кластера, но это всё уже в следующих частях.

Ссылки  по теме