Apache Druid: overview, running in Kubernetes and monitoring with Prometheus

09/18/2022

Apache Druid is a columnar database focused on working with large amounts of data, combining the features and benefits of a time-series database, a data warehouse, and a search engine.

Our overall task is to set up monitoring of a Druid cluster in Kubernetes, so first we will look at what Druid is and how it works in general, and then we will launch Druid and configure its monitoring.

To run Druid itself in Kubernetes we will use the druid-operator, to collect metrics – the druid-exporter, for monitoring – the Kube Prometheus Stack, and we will run all of this in Minikube.

Apache Druid overview

The main feature of Druid for me personally is that the system was originally developed for use in clouds such as AWS and in Kubernetes, so it has excellent capabilities for scaling, data storage, and data recovery in case one of the services crashes.

Also, Apache Druid is:

  • columnar data storage format: only the columns needed for a query are loaded for processing, which significantly speeds up query processing. In addition, for fast scanning and data aggregation, Druid optimizes the storage of each column according to its data type.
  • scalable distributed system: Druid clusters can be located on tens and hundreds of individual servers
  • data parallel processing: Druid can process each request in parallel on independent instances
  • self-healing and self-balancing: at any time, you can add or remove nodes in the cluster, while the cluster itself rebalances queries and switches between nodes without any downtime. Moreover, Druid allows you to perform version upgrades without downtime.
  • actually cloud-native and fault-tolerant architecture: Druid was originally designed to work in distributed systems. After receiving new data, it immediately copies it to its Deep Storage, which can be a storage service such as Amazon S3, HDFS, or a network file system, and this allows you to restore data even if all Druid servers fail. If only some of Druid’s servers fail, the built-in data replication allows queries to keep running (it’s already itching me to kill its nodes and watch Druid’s reaction – but not this time)
  • indexes for fast searches: Druid uses compressed Bitmap indexes, allowing you to quickly search across multiple columns
  • Time-based partitioning: by default, Druid partitions data by time, and additional partitioning can be configured on other fields. As a result, when executing queries with time intervals, only the necessary data segments are used (see the granularity sketch after this list)
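
To give an idea of what this time-based partitioning looks like in practice, below is a small, purely illustrative granularitySpec fragment from a Druid ingestion spec: segmentGranularity controls how segments are partitioned by time, while queryGranularity controls how finely timestamps are stored inside a segment (the values here are just an example, not part of our cluster config):

"granularitySpec": {
  "type": "uniform",
  "segmentGranularity": "DAY",
  "queryGranularity": "HOUR",
  "rollup": true
}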

Druid Architecture and Components

General architecture:

Or like this:

See Processes and servers.

Three-server configuration – Master, Query, Data

Druid has several internal services used to work with data, and these services can be grouped into three types of servers – Master, Query, and Data, see Pros and cons of colocation.

Master servers – manage data ingestion and availability (ingest/indexing): responsible for starting new tasks to add incoming data and for coordinating data availability:

  • Coordinator: responsible for placing data segments on specific nodes with Historical processes
  • Overlord: responsible for placing tasks for adding incoming data to Middle Managers and for coordinating the transfer of created segments to the Deep Store

Query servers – handle queries from clients:

  • Broker: receives queries from clients, determines which Historical or Middle Manager processes/nodes hold the necessary segments, forms a sub-query for each of these processes from the original client query and sends it to them, then collects their responses, assembles the overall response with the data, and returns it to the client.
    Here, Historicals answer sub-queries that refer to data segments already stored in the Deep Store, while Middle Managers answer sub-queries that refer to recently received data that is still in memory and has not yet been sent to the Deep Store
  • Router: an optional service that provides a common API for working with Brokers, Overlords, and Coordinators, although you can use them directly. In addition, Router provides Druid Console – WebUI for working with the cluster, we will see it a little later

Data servers – manage the addition of new data to the cluster and store data for clients:

  • Historical: the “main workhorses” (c) of Druid, which handle storage of and queries to “historical” data, i.e. data that is already in the Deep Store – they download segments from there to the local disk of the host where the Historical process runs, and generate responses to client queries from this data
  • Middle Manager: handles adding new data to the cluster – Middle Managers read data from external sources and create new data segments from it, which are then loaded into the Deep Store
    • Peons: adding data to the cluster may involve many tasks. For log and resource isolation, the Middle Manager spawns Peons – separate JVM processes, each responsible for executing one specific task from the Middle Manager. A Peon always runs on the same host as the Middle Manager process that spawned it.
  • Indexer: an alternative to Middle Manager and Peons – instead of creating a separate JVM for each task, the Indexer executes tasks in separate threads of its own JVM.
    It was developed as a simpler alternative to Middle Manager and Peons; it is currently an experimental feature, but is expected to replace them in the future

In addition to its internal processes, Druid also uses external services – Deep Storage, Metadata Storage, and ZooKeeper (a configuration sketch follows this list):

  • deep storage: used to store all the data added to the system; it is a distributed storage available to every Druid server. It can be Amazon S3, HDFS, or NFS
  • metadata storage: used to store internal metadata such as segment usage information and current tasks. This role can be played by a classic RDBMS such as PostgreSQL or MySQL
  • ZooKeeper: used for service discovery and coordination of services (in Kubernetes, you can use druid-kubernetes-extensions instead, we’ll try it too, but another time)
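
Just to illustrate how these external dependencies are wired in, here is a rough sketch of the corresponding common.runtime.properties settings – the bucket name, hostnames, and credentials are placeholders, and the required extensions must be listed in druid.extensions.loadList; the tiny-cluster example we run below sticks to simpler local/embedded options instead:

# Deep Storage on S3 (requires the druid-s3-extensions extension)
druid.storage.type=s3
druid.storage.bucket=my-druid-deep-storage
druid.storage.baseKey=druid/segments

# Metadata Storage in PostgreSQL (requires the postgresql-metadata-storage extension)
druid.metadata.storage.type=postgresql
druid.metadata.storage.connector.connectURI=jdbc:postgresql://my-postgres:5432/druid
druid.metadata.storage.connector.user=druid
druid.metadata.storage.connector.password=druid

# ZooKeeper
druid.zk.service.host=tiny-cluster-zk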

Druid data flow

Now let’s see how data is added (data ingestion) and how client queries are answered (data querying).

Data ingest

Adding data can be divided into two stages: in the first, Middle Manager instances run indexing tasks; these tasks create data segments and send them to the Deep Store.

In the second, Historical instances download the data segments from the Deep Store to use them when forming responses to client queries.

The first stage is reading data from external sources, indexing it into data segments, and uploading them to the Deep Store:

Note that data segments are already available for queries while they are still being created.

In the second stage, the Coordinator polls the Metadata Store looking for new segments. As soon as it finds them, the Coordinator selects a Historical instance that must download the segment from the Deep Store. Once the segment is loaded, the Historical is ready to use it to serve queries:

Data query

Clients send requests to Broker directly or via Router.

When such a request is received, the Broker determines which processes are servicing the required data segments.

If the query requires both segments from the Deep Store (older data) and data held by the Middle Managers (data that has just arrived, e.g. from a stream), the Broker forms and sends separate sub-queries to the Historicals and Middle Managers, each of which executes its part and returns its data to the Broker. The Broker aggregates the results and returns the final answer to the client.
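
As a minimal illustration of this query path: once the cluster is running and the Router is port-forwarded to localhost:8888 (as we do below), a SQL query can be POSTed to the Router’s /druid/v2/sql endpoint. The sys.segments system table works even before any of your own data is loaded:

[simterm]

$ curl -s -X POST http://localhost:8888/druid/v2/sql \
    -H 'Content-Type: application/json' \
    -d '{"query": "SELECT datasource, COUNT(*) AS segments FROM sys.segments GROUP BY 1"}'

[/simterm]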

Next, let’s try to deploy all of this, poke around a bit, and then add monitoring on top.

Running Druid in Kubernetes

Start Minikube, adding memory and CPU so that all the pods can run comfortably – here we allocate 12 GB of RAM and 8 CPU cores (Java is there, let it eat):

[simterm]

$ minikube start --memory 12000 --cpus 8
😄  minikube v1.26.1 on Arch "rolling"
✨  Automatically selected the virtualbox driver
👍  Starting control plane node minikube in cluster minikube
🔥  Creating virtualbox VM (CPUs=8, Memory=12000MB, Disk=20000MB) ...
🐳  Preparing Kubernetes v1.24.3 on Docker 20.10.17 ...
    ▪ Generating certificates and keys ...
    ▪ Booting up control plane ...
    ▪ Configuring RBAC rules ...
🔎  Verifying Kubernetes components...
    ▪ Using image gcr.io/k8s-minikube/storage-provisioner:v5
🌟  Enabled addons: storage-provisioner, default-storageclass
🏄  Done! kubectl is now configured to use "minikube" cluster and "default" namespace by default

[/simterm]

Check:

[simterm]

$ minikube status
minikube
type: Control Plane
host: Running
kubelet: Running
apiserver: Running
kubeconfig: Configured

[/simterm]

And nodes:

[simterm]

$ kubectl get node
NAME       STATUS   ROLES           AGE   VERSION
minikube   Ready    control-plane   41s   v1.24.3

[/simterm]

Let’s move on to installing the Apache Druid operator.

Druid Operator install

Create a Namespace for the operator:

[simterm]

$ kubectl create namespace druid-operator
namespace/druid-operator created

[/simterm]

Install the operator into this namespace:

[simterm]

$ git clone https://github.com/druid-io/druid-operator.git 
$ cd druid-operator/
$ helm -n druid-operator install cluster-druid-operator ./chart

[/simterm]

Check pod:

[simterm]

$ kubectl -n druid-operator get pod
NAME                                     READY   STATUS    RESTARTS   AGE
cluster-druid-operator-9c8c44f78-8svhc   1/1     Running   0          49s

[/simterm]

Move on to creating a cluster.

Spin Up Druid Cluster

Create a namespace:

[simterm]

$ kubectl create ns druid
namespace/druid created

[/simterm]

Install Zookeeper:

[simterm]

$ kubectl -n druid apply -f examples/tiny-cluster-zk.yaml
service/tiny-cluster-zk created
statefulset.apps/tiny-cluster-zk created

[/simterm]

To create a test cluster, we use the config from the examples – examples/tiny-cluster.yaml (an abbreviated sketch of what it contains is shown below).
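
This file describes a Druid custom resource that the operator reconciles. Roughly – this is an abbreviated and approximate sketch, check the actual file in the repository for the exact fields and values – it looks like this:

apiVersion: "druid.apache.org/v1alpha1"
kind: "Druid"
metadata:
  name: tiny-cluster
spec:
  image: apache/druid:0.22.0        # the example pins a specific Druid version, it may differ
  startScript: /druid.sh
  common.runtime.properties: |
    # ZooKeeper, metadata store, deep storage, extensions, etc.
    druid.zk.service.host=tiny-cluster-zk
    ...
  nodes:
    brokers:
      nodeType: "broker"
      druid.port: 8088
      replicas: 1
      runtime.properties: |
        druid.service=druid/broker
        ...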

Create a cluster:

[simterm]

$ kubectl -n druid apply -f examples/tiny-cluster.yaml
druid.druid.apache.org/tiny-cluster created

[/simterm]

Check resources:

[simterm]

$ kubectl -n druid get all
NAME                                    READY   STATUS              RESTARTS   AGE
pod/druid-tiny-cluster-brokers-0        0/1     ContainerCreating   0          21s
pod/druid-tiny-cluster-coordinators-0   0/1     ContainerCreating   0          21s
pod/druid-tiny-cluster-historicals-0    0/1     ContainerCreating   0          21s
pod/druid-tiny-cluster-routers-0        0/1     ContainerCreating   0          21s
pod/tiny-cluster-zk-0                   1/1     Running             0          40s

NAME                                      TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)                      AGE
service/druid-tiny-cluster-brokers        ClusterIP   None         <none>        8088/TCP                     21s
service/druid-tiny-cluster-coordinators   ClusterIP   None         <none>        8088/TCP                     21s
service/druid-tiny-cluster-historicals    ClusterIP   None         <none>        8088/TCP                     21s
service/druid-tiny-cluster-routers        ClusterIP   None         <none>        8088/TCP                     21s
service/tiny-cluster-zk                   ClusterIP   None         <none>        2181/TCP,2888/TCP,3888/TCP   40s

NAME                                               READY   AGE
statefulset.apps/druid-tiny-cluster-brokers        0/1     21s
statefulset.apps/druid-tiny-cluster-coordinators   0/1     21s
statefulset.apps/druid-tiny-cluster-historicals    0/1     21s
statefulset.apps/druid-tiny-cluster-routers        0/1     21s
statefulset.apps/tiny-cluster-zk                   1/1     40s

[/simterm]

Wait for the pods to switch to Running (it took about 5 minutes), and forward the port to the Druid Router:

[simterm]

$ kubectl port-forward svc/druid-tiny-cluster-routers 8888:8088 -n druid
Forwarding from 127.0.0.1:8888 -> 8088
Forwarding from [::1]:8888 -> 8088

[/simterm]

Open Druid Console – http://localhost:8888:

You can look into Druid Services – here we will see the same services that we saw as Kubernetes pods:

Here is a good short video with an example of loading test data – for the sake of interest, you can follow along.

Let’s move on to installing Prometheus.

Installing Kube Prometheus Stack

Create a namespace:

[simterm]

$ kubectl create ns monitoring
namespace/monitoring created

[/simterm]

Install KPS:

[simterm]

$ helm -n monitoring install kube-prometheus-stack prometheus-community/kube-prometheus-stack
NAME: kube-prometheus-stack
LAST DEPLOYED: Tue Sep 13 15:24:22 2022
NAMESPACE: monitoring
STATUS: deployed
...

[/simterm]

Check pods:

[simterm]

$ kubectl -n monitoring get po
NAME                                                        READY   STATUS              RESTARTS   AGE
alertmanager-kube-prometheus-stack-alertmanager-0           0/2     ContainerCreating   0          22s
kube-prometheus-stack-grafana-595f8cff67-zrvxv              3/3     Running             0          42s
kube-prometheus-stack-kube-state-metrics-66dd655687-nkxpb   1/1     Running             0          42s
kube-prometheus-stack-operator-7bc9959dd6-d52gh             1/1     Running             0          42s
kube-prometheus-stack-prometheus-node-exporter-rvgxw        1/1     Running             0          42s
prometheus-kube-prometheus-stack-prometheus-0               0/2     Init:0/1            0          22s

[/simterm]

Wait a few minutes until all pods become Running, and open access to Prometheus:

[simterm]

$ kubectl -n monitoring port-forward svc/kube-prometheus-stack-prometheus 9090:9090
Forwarding from 127.0.0.1:9090 -> 9090
Forwarding from [::1]:9090 -> 9090

[/simterm]

Open access to Grafana:

[simterm]

$ kubectl -n monitoring port-forward svc/kube-prometheus-stack-grafana 8080:80
Forwarding from 127.0.0.1:8080 -> 3000
Forwarding from [::1]:8080 -> 3000

[/simterm]

Grafana dashboard

To get the Grafana password, find its Secret:

[simterm]

$ kubectl -n monitoring get secret | grep graf
kube-prometheus-stack-grafana                                  Opaque               3      102s

[/simterm]

Get its value:

[simterm]

$ kubectl -n monitoring get secret kube-prometheus-stack-grafana -o jsonpath="{.data.admin-password}" | base64 --decode ; echo
prom-operator

[/simterm]

For Grafana there is a ready-made community dashboard; we can add it for now, and later we will make our own.

Open localhost:8080 in a browser, log in and go to Dashboards > Import:

Enter the dashboard ID 12155 and click Load:

Choose Prometheus as a datasource:

And we got a dashboard, but so far without any data:

Running Druid Exporter

Clone the repository:

[simterm]

$ cd ../
$ git clone https://github.com/opstree/druid-exporter.git
$ cd druid-exporter/

[/simterm]

Check Druid Router’s Kubernetes Service – we need its full name to configure the Exporter:

[simterm]

$ kubectl -n druid get svc
NAME                              TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)                      AGE
...
druid-tiny-cluster-routers        ClusterIP   None         <none>        8088/TCP                     4m3s
...

[/simterm]

Install the exporter into the monitoring namespace. In the druidURL parameter, specify the URL and port of the Router Service of our Druid cluster that we just looked up above; also enable the creation of a Kubernetes ServiceMonitor and set the namespace where our Prometheus is running (monitoring):

[simterm]

$ helm -n monitoring install druid-exporter ./helm/ --set druidURL="http://druid-tiny-cluster-routers.druid.svc.cluster.local:8088" --set druidExporterPort="8080" --set logLevel="debug" --set logFormat="text" --set serviceMonitor.enabled=true --set serviceMonitor.namespace="monitoring"
NAME: druid-exporter
LAST DEPLOYED: Tue Sep 13 15:28:25 2022
NAMESPACE: monitoring
STATUS: deployed
...

[/simterm]

Open access to the Exporter Service:

[simterm]

$ kubectl -n monitoring port-forward svc/druid-exporter-prometheus-druid-exporter 8989:8080
Forwarding from 127.0.0.1:8989 -> 8080
Forwarding from [::1]:8989 -> 8080

[/simterm]

Check metrics:

[simterm]

$ curl -s localhost:8989/metrics | grep -v '#' | grep druid_
druid_completed_tasks 0
druid_health_status{druid="health"} 1
druid_pending_tasks 0
druid_running_tasks 0
druid_waiting_tasks 0

[/simterm]

Okay – that’s something already, although not much yet. Let’s add data collection to Prometheus, and then we will add more metrics.
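
Even with this handful of metrics we could already hang a basic alert on druid_health_status. A rough sketch of a PrometheusRule is below – the rule name and threshold are just an example; the release: kube-prometheus-stack label is there so that the operator’s ruleSelector (analogous to the serviceMonitorSelector we check later) picks it up:

apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  name: druid-health
  namespace: monitoring
  labels:
    release: kube-prometheus-stack
spec:
  groups:
    - name: druid
      rules:
        - alert: DruidUnhealthy
          expr: druid_health_status != 1
          for: 5m
          labels:
            severity: critical
          annotations:
            summary: "Druid cluster health check is failing"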

Prometheus ServiceMonitor

Check Prometheus Service Discovery – all our ServiceMonitors should be here:

Check if Druid ServiceMonitor is created in Kubernetes:

[simterm]

$ kubectl -n monitoring get servicemonitors
NAME                                             AGE
druid-exporter-prometheus-druid-exporter         87s
...

[/simterm]

Checking the Prometheus resource – its serviceMonitorSelector:

[simterm]

$ kubectl -n monitoring get prometheus -o yaml | yq .items[].spec.serviceMonitorSelector.matchLabels
{
  "release": "kube-prometheus-stack"
}

[/simterm]

Edit the Druid ServiceMonitor:

[simterm]

$ kubectl -n monitoring edit servicemonitor druid-exporter-prometheus-druid-exporter

[/simterm]

Add the label release: kube-prometheus-stack, so that Prometheus starts collecting metrics from this ServiceMonitor:
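
Instead of editing the resource by hand, the same label can be added with kubectl label (the ServiceMonitor name is the one we saw above):

[simterm]

$ kubectl -n monitoring label servicemonitor druid-exporter-prometheus-druid-exporter release=kube-prometheus-stack

[/simterm]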

Wait a minute or two, check the monitors again:

Check metrics:

 

Okay – now we have a Druid cluster and we have Prometheus collecting metrics – it remains to configure those metrics themselves.

Apache Druid Monitoring

Druid’s monitoring setup consists of Emitters and Monitors: emitters “push” metrics out of Druid, and monitors determine which metrics are collected. For some metrics, additional extensions have to be enabled.

See:

All services have common metrics, for example query/time, as well as their own – for example, the Broker has a metric named sqlQuery/time – and these additional metrics can be enabled for a specific service through its runtime.properties (a sketch is shown below).
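
With the druid-operator, such a per-service setting could go into the corresponding node group in examples/tiny-cluster.yaml. A hypothetical fragment for the Brokers that enables the QueryCountStatsMonitor (which emits the query success/failed/interrupted counters):

nodes:
  brokers:
    runtime.properties: |
      druid.service=druid/broker
      # Broker-specific monitors, in addition to those set in common.runtime.properties
      druid.monitoring.monitors=["org.apache.druid.server.metrics.QueryCountStatsMonitor"]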

Druid Metrics Emitters

To enable metrics emission, edit examples/tiny-cluster.yaml and add the following to common.runtime.properties:

druid.emitter=http
druid.emitter.logging.logLevel=debug
druid.emitter.http.recipientBaseUrl=http://druid-exporter-prometheus-druid-exporter.monitoring.svc.cluster.local:8080/druid

Save, update the cluster:

[simterm]

$ kubectl -n druid apply -f examples/tiny-cluster.yaml 
druid.druid.apache.org/tiny-cluster configured

[/simterm]

Wait a minute or two for the pods to restart and start collecting metrics, check the metrics in the Exporter:

[simterm]

$ curl -s localhost:8989/metrics | grep -v '#' | grep druid_ | head
druid_completed_tasks 0
druid_emitted_metrics{datasource="",host="druid-tiny-cluster-brokers-0:8088",metric_name="avatica-remote-JsonHandler-Handler/Serialization",service="druid-broker"} 0
druid_emitted_metrics{datasource="",host="druid-tiny-cluster-brokers-0:8088",metric_name="avatica-remote-ProtobufHandler-Handler/Serialization",service="druid-broker"} 0
druid_emitted_metrics{datasource="",host="druid-tiny-cluster-brokers-0:8088",metric_name="avatica-server-AvaticaJsonHandler-Handler/RequestTimings",service="druid-broker"} 0
druid_emitted_metrics{datasource="",host="druid-tiny-cluster-brokers-0:8088",metric_name="avatica-server-AvaticaProtobufHandler-Handler/RequestTimings",service="druid-broker"} 0
druid_emitted_metrics{datasource="",host="druid-tiny-cluster-brokers-0:8088",metric_name="jetty-numOpenConnections",service="druid-broker"} 1
druid_emitted_metrics{datasource="",host="druid-tiny-cluster-coordinators-0:8088",metric_name="compact-task-count",service="druid-coordinator"} 0
druid_emitted_metrics{datasource="",host="druid-tiny-cluster-coordinators-0:8088",metric_name="compactTask-availableSlot-count",service="druid-coordinator"} 0
druid_emitted_metrics{datasource="",host="druid-tiny-cluster-coordinators-0:8088",metric_name="compactTask-maxSlot-count",service="druid-coordinator"} 0
druid_emitted_metrics{datasource="",host="druid-tiny-cluster-coordinators-0:8088",metric_name="coordinator-global-time",service="druid-coordinator"} 2

[/simterm]

Got new druid_emitted_metrics – nice.

In the exported_service label, we can see the Druid service, and in metric_name – which metric exactly.
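
Using these labels, we can already write queries in Prometheus – a couple of examples based on the metric names visible in the output above (depending on relabeling, the service may show up in the service or the exported_service label):

# open Jetty connections per Druid service
sum by (exported_service) (druid_emitted_metrics{metric_name="jetty-numOpenConnections"})

# duration of the Coordinator's global run
max(druid_emitted_metrics{exported_service="druid-coordinator", metric_name="coordinator-global-time"})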

Check with Prometheus:

Perfect!

But I want more metrics – “Need more metrics, my lord!” (c)

Druid Monitors

For example, I want to know the CPU consumption of each of the processes/services.

To be able to see all system metrics (without the Prometheus Node Exporter), enable org.apache.druid.java.util.metrics.SysMonitor, and for data from the JVM – add org.apache.druid.java.util.metrics.JvmMonitor.

In the same examples/tiny-cluster.yaml, add the following to the common.runtime.properties block:

...
druid.monitoring.monitors=["org.apache.druid.java.util.metrics.SysMonitor", "org.apache.druid.java.util.metrics.JvmMonitor"]
...

Save, update the cluster, wait for the pods to restart, and after a couple of minutes check the metrics:

And a dashboard in Grafana:

Okay, we did it.

In Running/Failed Tasks, we still have No Data, since we haven’t run any ingestion tasks yet. If you load the test data from the video mentioned above, these metrics will appear.

There’s actually a lot more left:

  • add PostgreSQL as Metadata store
  • configure the collection of metrics from it
  • test work without ZooKeeper (druid-kubernetes-extensions)
  • test Prometheus Emitter
  • try Indexer instead of Middle Manager
  • configure the collection of logs (where? haven’t figured it out yet, but most likely Promtail && Loki)
  • well, actually – to assemble a normal dashboard for Grafana

And already having monitoring, it will be possible to drive some load tests and play around with cluster performance tuning, but this is all in the next parts.

Related links