Vector.dev: introduction, AWS S3 logs, and integration with VictoriaLogs

12/21/2024

So, we’re back to the topic of AWS VPC Flow Logs, VictoriaLogs, and the Grafana dashboard.

In the post VictoriaLogs: a Grafana dashboard for AWS VPC Flow Logs – migrating from Grafana Loki, we created a cool dashboard to display various statistics on AWS NAT Gateway traffic.

But there is a small drawback: all the data is built from raw logs, which are written by VPC Flow Logs to AWS S3, collected from S3 by Promtail running in AWS Lambda, and then written to VictoriaLogs.

Problem: performance of raw log processing

In this Grafana dashboard, there are queries like:

_time:$__range {logtype=flow, environment=ops, logger_type=vmlogs} seq("eni-0352f8c82da6aa229", "ACCEPT")
| extract "<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_addr> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>" keep_original_fields
  | filter 
    interface_id:="eni-0352f8c82da6aa229"
    action:=ACCEPT
    pkt_dst_addr:ipv4_range("10.0.32.0/20")
    pkt_dst_addr:~"${kubernetes_pod_ip}"
    pkt_src_addr:~"${remote_svc_ip}"    
  | stats by (pkt_src_addr) sum(bytes) sum_bytes
  | sort by (sum_bytes) desc limit 10

Here, the extract pipe gets the values for new fields directly from the log records at query time.

And this more or less works, but the maximum period for which we can build graphs is 24 hours (with Loki, it was 30 minutes).

But there’s another way to work with logs: instead of parsing fields with extract during query execution, we can create these fields at the stage of collecting the logs from S3, and then use them in queries.

Actually, this could be done with the current setup – through Promtail. I did something similar in Grafana Loki: alerts from Ruler and labels from logs, but I don’t want to deal with Grafana’s Lambda Promtail, because I couldn’t even update the Promtail version in my Docker image – and I don’t remember how I did it the first time. So I still have the Promtail in Lambda that I created back in 2023 – see Loki: collecting logs from CloudWatch Logs using Lambda Promtail.

Thus, instead of Promtail, I decided to try Vector.dev. It’s a bit complicated to set up, but it has many features.

Well, the more features, the more difficult a system is to customize. However, I managed to do what I wanted, and it turned out to be quite simple, so we can try to do it for Production.

Therefore, today we’re going to make a simple Proof of Concept with AWS VPC Flow Logs, Vector.dev, and VictoriaLogs:

  • install a Helm chart with Vector
  • create a new AWS S3 bucket and configure VPC Flow Logs with a custom format to write to it
  • see how we can collect logs from S3 into Vector.dev and add new fields
  • and compare the speed of working with raw logs vs. logs from Vector with fields

Vector.dev

So, what is Vector.dev?

Vector is a high-performance observability data pipeline that puts organizations in control of their observability data. Collect, transform, and route all your logs, metrics, and traces to any vendors

The main idea is to collect any monitoring data, be it metrics or logs, perform some actions on it, and then write it somewhere.

In my case, I need to take a log entry, add some fields to it, and write it to VictoriaLogs.

Components

See Concepts.

We are currently interested in three components:

  • Sources: where we collect data from
  • Transforms: what we do with the data
  • Sinks: where we transfer the processed data further

In our case, the Source will be AWS S3, in the Transform we will parse VPC Flow Logs and create new fields, and as the Sink we will use the Elasticsearch Sink for VictoriaLogs – see the documentation on the Vector setup in the VictoriaLogs docs.

In general, Vector has a separate Loki Sink, but it’s more trouble than it’s worth, and with Elasticsearch (or HTTP) everything worked without any problems.
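
To make the pipeline idea concrete, here is a minimal sketch of how the three components are wired together in Vector’s configuration (the component names and URLs here are placeholders, the real values we will use are shown below):

sources:
  my-s3-source:                # where the data comes from
    type: aws_s3
    region: us-east-1
    sqs:
      queue_url: https://sqs.us-east-1.amazonaws.com/<ACCOUNT_ID>/<QUEUE_NAME>

transforms:
  my-flow-logs-transform:      # what we do with the data
    type: remap
    inputs:
      - my-s3-source           # takes events from the Source above
    source: |
      .source = "vector"       # VRL code goes here: parse the record, add fields

sinks:
  my-victorialogs-sink:        # where the processed data goes
    type: elasticsearch
    inputs:
      - my-flow-logs-transform # takes events from the Transform above
    endpoints:
      - http://<VICTORIALOGS_SERVICE>:9428/insert/elasticsearch/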

Running Vector.dev in Kubernetes with Helm

The documentation for running from Helm is in Install Vector on Kubernetes, and in the chart itself – README.md.

Add a new repository:

$ helm repo add vector https://helm.vector.dev
"vector" has been added to your repositories
$ helm repo update

Install Vector – for now with default parameters; later we will create our own values.yaml:

$ helm install vector vector/vector
NAME: vector
LAST DEPLOYED: Mon Dec  2 15:13:30 2024
...

Let’s go to the VPC Flow Logs.

Setting up AWS VPC Flow Logs to S3

Next, we need an S3 bucket to which VPC Flow Logs will be written, and an SQS queue to receive notifications when new objects, i.e. log files, are created in S3.

Then Vector will read the messages from this SQS and fetch the logs from S3.

Creating AWS SQS

SQS documentation for S3 – Walkthrough: Configuring a bucket for notifications (SNS topic or SQS queue).

Create a new queue:

Type – Standard:

Create an Access policy:

{
  "Version": "2012-10-17",
  "Id": "example-ID",
  "Statement": [
    {
      "Sid": "vpc-ops-flow-vmlogs-s3-allow",
      "Effect": "Allow",
      "Principal": {
        "Service": "s3.amazonaws.com"
      },
      "Action": "SQS:SendMessage",
      "c": "arn:aws:sqs:us-east-1:492***148:s3-vector-vmlogs-queue",
      "Condition": {
        "StringEquals": {
          "aws:SourceAccount": "492***148"
        },
        "ArnLike": {
          "aws:SourceArn": "arn:aws:s3:*:*:s3-vector-vmlogs-flow-logs-bucket"
        }
      }
    }
  ]
}

In the Resource, specify the ARN of our queue, and in the Condition, allow access from our account ID and from the S3 bucket named s3-vector-vmlogs-flow-logs-bucket:

Leave the Dead-letter queue parameters as default, click Create, and go to S3.

Creating AWS S3

Create a new S3 bucket with the name s3-vector-vmlogs-flow-logs-bucket – as we set in the SQS Access Policy.

We don’t need an ACL right now, but we leave Block Public Access at the default Block all:

Click Create, go to Properties > Event notifications:

Set the Event name, select s3:ObjectCreated:* in Event types:

In the Destination, we set our SQS:

Click Save changes, and go to the VPC Flow Logs.

Creating VPC Flow Logs with S3

Create a new Flow Log.

If your VPC is created with Terraform, you can use the aws_flow_log resource:

resource "aws_flow_log" "vpc_flow_vector" {
  vpc_id               = module.vpc.vpc_id
  log_destination      = "arn:aws:s3:::s3-vector-vmlogs-flow-logs-bucket"
  log_destination_type = "s3"
  traffic_type         = "ALL"
  log_format = "$${region} $${vpc-id} $${az-id} $${subnet-id} $${instance-id} $${interface-id} $${flow-direction} $${srcaddr} $${dstaddr} $${srcport} $${dstport} $${pkt-srcaddr} $${pkt-dstaddr} $${pkt-src-aws-service} $${pkt-dst-aws-service} $${traffic-path} $${packets} $${bytes} $${action}"
  tags = {
    "Name" = "flow-logs-s3-to-vector"
  }
}

Or we can do it manually – go to the VPC, the Flow logs tab, click Create flow log – here I already have two Flow Logs for Promtail Lambda:

In the Destination field, select Send to an Amazon S3 bucket and specify the ARN of our bucket:

I always use Custom format with additional fields:

Save it and check the status:

Everything is green and working.

You can wait for 10 minutes (the default log delivery period) and check the data in S3 itself:

And the Monitoring tab in SQS:

Setting up Vector.dev

And now for the most interesting part.

So, what do we need to do?

  • add an S3 Source with the SQS parameter – where we will collect logs from
  • add a Transformation – create new fields
  • and add a Sink for VictoriaLogs – where we will write the data to

That is, a kind of pipeline is created – Source collects data, Transform transforms it, and Sink transfers the processed data further, in our case to VictoriaLogs.

AWS S3 source documentation is here>>>.

The documentation for Transformations is here>>>.

Documentation on all Sinks is here>>>, and on Loki is here>>>, but we will use another one, Elasticsearch.

Documentation for Elasticsearch Sink in Vector.dev is here>>>, and documentation for Elasticsearch data ingest in VictoriaLogs is here>>>.

You may also be interested in how to collect logs from regular files with Vector – here>>>.

And another interesting use case is to collect Kubernetes logs and push them to AWS S3 – see How to Collect, Transform, and Ship Logs from AWS S3 to Codegiant Observability Using Vector.

Having figured out the documentation, let’s go to the configuration.

Vector.dev: Sources – S3

First, let’s configure the collection of logs from the AWS S3 bucket. To do this, we need the following parameters:

  • type: aws_s3
  • auth: how we will perform authentication
    • for now, we will do it with plain Access/Secret keys, but when we run it in Production, we will add EKS Pod Identity with an IAM Role that will allow the Vector Kubernetes Pod to access S3 and SQS
  • sqs.queue_url: where Vector will receive information that new logs have appeared in S3

We will set the parameters through the Helm chart values and the customConfig parameter, which has an important comment:

# customConfig — Override Vector’s default configs, if used **all** options need to be specified.

So now, the configuration will be as follows:

image:
  repository: timberio/vector
  pullPolicy: IfNotPresent

replicas: 1

service:
  enabled: false

customConfig:

  sources:
    s3-vector-vmlogs-flow-logs-bucket: # source name to be used later in Transforms
      type: aws_s3
      region: us-east-1
      compression: gzip
      auth:
        region: us-east-1
        access_key_id: AKI***B7A
        secret_access_key: pAu***2gW
      sqs:
        queue_url: https://sqs.us-east-1.amazonaws.com/492***148/s3-vector-vmlogs-queue
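
By the way, hardcoding the Access/Secret keys in values.yaml is fine only for a PoC. Even before switching to EKS Pod Identity, they can be moved into a Kubernetes Secret: if the auth block is omitted, Vector’s AWS components should fall back to the standard AWS credentials chain, which includes environment variables. A sketch of this approach, assuming a pre-created Secret named vector-aws-credentials and the chart’s env value:

# values.yaml fragment: pass AWS credentials via environment variables
env:
  - name: AWS_ACCESS_KEY_ID
    valueFrom:
      secretKeyRef:
        name: vector-aws-credentials   # hypothetical Secret created beforehand
        key: access_key_id
  - name: AWS_SECRET_ACCESS_KEY
    valueFrom:
      secretKeyRef:
        name: vector-aws-credentials
        key: secret_access_key

customConfig:
  sources:
    s3-vector-vmlogs-flow-logs-bucket:
      type: aws_s3
      region: us-east-1
      compression: gzip
      # no 'auth' block: the default AWS credentials chain picks up the env vars
      sqs:
        queue_url: https://sqs.us-east-1.amazonaws.com/492***148/s3-vector-vmlogs-queue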

Vector.dev: Transforms – remap and VRL

There are many Transforms, but right now we are interested in remap, which lets us perform many operations using the Vector Remap Language (VRL).

VRL is a domain-specific language (DSL) built into Vector.dev, with various functions for working with data.

There’s even a VRL Playground where you can try out what works and how it works.

Of particular interest to us are the Parse functions, namely the parse_aws_vpc_flow_log function. And for AWS Load Balancer logs, there is the parse_aws_alb_log function.

The parse_aws_vpc_flow_log itself is described here – parse_aws_vpc_flow_log.rs.

And there are examples here – VRL example reference.

What we can do with it is pass in the data from our log records and specify our custom format.

The simplest configuration, which actually works the way I want it to, looks like this:

...
  transforms:

    s3-vector-vmlogs-flow-logs-transform:
      type: remap
      inputs:
        - s3-vector-vmlogs-flow-logs-bucket # a name from the 'sources', can have several Inputs
      source: |
        . = parse_aws_vpc_flow_log!(
          .message,
          format: "region vpc_id az_id subnet_id instance_id interface_id flow_direction srcaddr dstaddr srcport dstport pkt_srcaddr pkt_dstaddr pkt_src_aws_service pkt_dst_aws_service traffic_path packets bytes action"
        )

If you want to perform any operations on the fields, you can do it this way:

...
      source: |
        .parsed = parse_aws_vpc_flow_log!(
          .message,
          format: "region vpc_id az_id subnet_id instance_id interface_id flow_direction srcaddr dstaddr srcport dstport pkt_srcaddr pkt_dstaddr pkt_src_aws_service pkt_dst_aws_service traffic_path packets bytes action"
        )

        .region = .parsed.region
        .vpc_id = .parsed.vpc_id
        .az_id = .parsed.az_id
        .subnet_id = .parsed.subnet_id
        .instance_id = .parsed.instance_id
        .interface_id = .parsed.interface_id
        .account_id = .parsed.account_id
        .srcaddr = .parsed.srcaddr
        .dstaddr = .parsed.dstaddr
        .srcport = .parsed.srcport
        .dstport = .parsed.dstport
        .protocol = .parsed.protocol
        .packets = to_int!(.parsed.packets)
        .bytes = to_int!(.parsed.bytes)

        del(.parsed)
...

Here, we create our own region, vpc_id, etc. fields, convert the packets and bytes fields to integers, and finally remove the intermediate .parsed object by calling the path function del().

But in the current case everything works fine without this – I was just experimenting with different options.

Vector.dev: Sinks – Elasticsearch and VictoriaLogs

And the last thing we need to do is create a Sink.

I tried to do this with Loki Sink, but it failed to format the new fields properly, so on the recommendation of the VictoriaLogs developers, I just used Elasticsearch Sink.

Let’s describe our configuration:

...
  sinks:

    s3-flow-logs-to-victorialogs:
      inputs:
        - s3-vector-vmlogs-flow-logs-transform # a Transform name to get processed data from
      type: elasticsearch
      endpoints:
        - http://atlas-victoriametrics-victoria-logs-single-server:9428/insert/elasticsearch/ # VictoriaLogs Kubernetes Service URL and Elasticsearch endpoint
      api_version: v8
      compression: gzip
      healthcheck:
        enabled: false
      query: # HTTP query params
        extra_fields: source=vector # add a custom label
        # _msg_field: message # omitted here, as we have everything in the fields from the Transform, but may be used for other data
        _time_field: timestamp # set the '_time' field for the VictoriaLogs
        _stream_fields: source,vpc_id,az_id # create Stream fields for the VictoriaLogs to save data in a dedicated Stream; specify fields without spaces

Actually, I’ve added everything here in the comments, but let’s go through it again:

  • inputs: set the name of the Transform from which we take data
  • endpoints: pass the address of VictoriaLogs Service in our Kubernetes cluster
  • healthcheck: disable it, because VictoriaLogs did not support the /ping endpoint at the time (UPD: it does now)
  • query: pass additional parameters, see VictoriaLogs HTTP
    • in the _stream_fields describe which fields VictoriaLogs will use to create a log stream – see Stream fields
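
One more optional thing worth mentioning: like other Vector sinks, the Elasticsearch sink supports the generic batch and buffer options. I didn’t need them for this PoC, but a sketch of what tuning could look like (the numbers here are arbitrary, not something I measured):

    s3-flow-logs-to-victorialogs:
      # ... the same sink as above ...
      batch:
        max_events: 1000    # flush a batch after this many events
        timeout_secs: 5     # or after this many seconds
      buffer:
        type: memory        # in-memory buffer; 'disk' is also available
        max_events: 10000   # apply back-pressure when the buffer is full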

The entire values.yaml now looks like this:

image:
  repository: timberio/vector
  pullPolicy: IfNotPresent

replicas: 1

service:
  enabled: false

customConfig:

  sources:
    s3-vector-vmlogs-flow-logs-bucket: # source name to be used later in Transforms
      type: aws_s3
      region: us-east-1
      compression: gzip
      auth:
        region: us-east-1
        access_key_id: AKI***B7A
        secret_access_key: pAu***2gW
      sqs:
        queue_url: https://sqs.us-east-1.amazonaws.com/492***148/s3-vector-vmlogs-queue

  transforms:

    s3-vector-vmlogs-flow-logs-transform:
      type: remap
      inputs:
        - s3-vector-vmlogs-flow-logs-bucket # a name from the 'sources', can have several Inputs
      source: |
        . = parse_aws_vpc_flow_log!(
          .message,
          format: "region vpc_id az_id subnet_id instance_id interface_id flow_direction srcaddr dstaddr srcport dstport pkt_srcaddr pkt_dstaddr pkt_src_aws_service pkt_dst_aws_service traffic_path packets bytes action"
        )

  sinks:

    s3-flow-logs-to-victorialogs:
      inputs:
        - s3-vector-vmlogs-flow-logs-transform # a Transform name to get processed data from
      type: elasticsearch
      endpoints:
        - http://atlas-victoriametrics-victoria-logs-single-server:9428/insert/elasticsearch/ # VictoriaLogs Kubernetes Service URL and Elasticsearch endpoint
      api_version: v8
      compression: gzip
      healthcheck:
        enabled: false
      query: # HTTP query params
        extra_fields: source=vector # add a custom label
        # _msg_field: message # omitted here, as we have everything in the fields from the Transform, but may be used for other data
        _time_field: timestamp # set the '_time' field for the VictoriaLogs
        _stream_fields: source,vpc_id,az_id # create Stream fields for the VictoriaLogs to save data in a dedicated Stream; specify fields without spaces

Let’s deploy our changes:

$ helm upgrade --install vector vector/vector -f vector-values.yaml

For some reason, the srcport field from Flow Logs is not being processed correctly:

ERROR transform{component_kind="transform" component_id=s3-vector-vmlogs-flow-logs-transform component_type=remap}: vector::internal_events::remap: Mapping failed with event. error="function call error for \"parse_aws_vpc_flow_log\" at (4:254): failed to parse value as i64 (key: `srcport`): `srcport`" error_type="conversion_failed" stage="processing" internal_log_rate_limit=true

I don’t know why, because the field is the same in both Flow Logs and our custom format. Judging by the error – the value of srcport is the literal string srcport – it is most likely just the header line that AWS writes at the top of each log file in S3. It doesn’t seem to affect anything anyway; I’ll create a GitHub Issue later and ask.
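
If this error ever becomes noisy, the transform can be made tolerant of such records: instead of the parse_aws_vpc_flow_log!() call with the error-raising !, use the fallible form and handle the error explicitly. A minimal sketch of such a remap (I didn’t end up needing this here):

    s3-vector-vmlogs-flow-logs-transform:
      type: remap
      inputs:
        - s3-vector-vmlogs-flow-logs-bucket
      source: |
        # fallible call: on failure 'err' is set and the original event is kept as-is
        parsed, err = parse_aws_vpc_flow_log(
          .message,
          format: "region vpc_id az_id subnet_id instance_id interface_id flow_direction srcaddr dstaddr srcport dstport pkt_srcaddr pkt_dstaddr pkt_src_aws_service pkt_dst_aws_service traffic_path packets bytes action"
        )
        if err == null {
          . = parsed
        } else {
          log(err, level: "warn", rate_limit_secs: 60)
        }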

Wait for the data to come from S3 and check it in our VictoriaLogs using _stream: {source="vector", vpc_id="vpc-0fbaffe234c0d81ea", az_id="use1-az2"} – the fields we’ve set in _stream_fields:

Yay!

“It works!” (c)

Grafana and VictoriaLogs

Let’s see how it all works in Grafana.

First, let’s just check the data there:

I have a Table panel like this in my Grafana dashboard:

With such a query:

_time:$__range {logtype=flow, environment=ops, logger_type=vmlogs} seq("eni-0352f8c82da6aa229", "ACCEPT")
| extract "<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_addr> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>" keep_original_fields
  | filter
      interface_id:="eni-0352f8c82da6aa229"
      action:="ACCEPT"
      pkt_dst_addr:ipv4_range("10.0.32.0/20")
      pkt_dst_addr:~"${kubernetes_pod_ip}"
      pkt_src_addr:~"${remote_svc_ip}"
  | stats by (pkt_src_addr, src_port, pkt_dst_addr, dst_port) sum(bytes) bytes_total
  | sort by (bytes_total) desc limit 10

Let’s rewrite this query for the new data – use the new stream, and drop the extract and filter pipes, because now we have ready-made fields and can filter on them directly:

{source="vector", vpc_id="vpc-0fbaffe234c0d81ea", az_id="use1-az2"} interface_id:="eni-0352f8c82da6aa229" action:="ACCEPT" pkt_dstaddr:ipv4_range("10.0.32.0/20")
  | stats by (pkt_srcaddr, srcport, pkt_dstaddr, dstport) sum(bytes) bytes_total 
  | sort by (bytes_total) desc

Performance: “raw logs” vs “fielded logs”

Let’s compare the speed of such a query with a query from raw logs.

The old query, on the raw logs, over a 3-hour period:

The new query, with the fields from Vector.dev, over the same 3-hour period:

The difference is about 2x.

At the same time, Vector’s resource usage:

$ kk top pod vector-0
NAME       CPU(cores)   MEMORY(bytes)   
vector-0   3m           104Mi           

And VictoriaLogs:

$ kk top pod atlas-victoriametrics-victoria-logs-single-server-0
atlas-victoriametrics-victoria-logs-single-server-0   12m   840Mi

Now we can try to run this setup in Production.

Useful links