Vector.dev: знайомство, логи з AWS S3 та інтеграція з VictoriaLogs

Автор |  17/12/2024
 

Отже, знов повертаємось до теми AWS VPC Flow Logs, VictoriaLogs, та Grafana dashboard.

В пості VictoriaLogs: дашборда в Grafana з AWS VPC Flow Logs – мігруємо з Grafana Loki ми створили прикольну дашборду для відображення різної статистики по трафіку AWS NAT Gateway.

Але там є маленький недолік – всі дані будуються з raw logs, які пишуться з VPC Flow Logs в AWS S3, з S3 їх збирає Promtail в AWS Lambda, і потім пише до VictoriaLogs.

Проблема: перформанс з raw logs

В цій Grafana dashboard з VictoriaLogs виконуються запити типу:

_time:$__range {logtype=flow, environment=ops, logger_type=vmlogs} seq("eni-0352f8c82da6aa229", "ACCEPT")
| extract "<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_addr> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>" keep_original_fields
  | filter 
    interface_id:="eni-0352f8c82da6aa229"
    action:=ACCEPT
    pkt_dst_addr:ipv4_range("10.0.32.0/20")
    pkt_dst_addr:~"${kubernetes_pod_ip}"
    pkt_src_addr:~"${remote_svc_ip}"    
  | stats by (pkt_src_addr) sum(bytes) sum_bytes
  | sort by (sum_bytes) desc limit 10

Де з extract отримуємо значення для нових полів прямо із логу.

І все це більш-менш працює, але максимальний період, за який вдається побудувати графіки – 24 години (з Loki було взагалі 30 хвилин).

Але є інший варіант роботи з логами: замість того, аби парсити поля прямо під час виконання запиту з використанням exctract – ми можемо створювати ці поля ще на етапі збору логів з S3, і далі в запитах використовувати вже їх.

В принципі, це можна було б зробити прямо з поточним сетапом – через Promtail. Щось схоже я робив в Grafana Loki: alerts from Ruler and labels from logs, але – ну не хочеться мені мати справу з Lambda Promtail від Grafana, бо мені навіть не вдалося оновити версію Promtail в моєму Docker image – а я не пам’ятаю, як робив перший. Тому в мене Promtail в Lambda досі той, який я створив ще у 2023 році – див. Loki: збір логів з CloudWatch Logs з використанням Lambda Promtail.

Тому замість Promtail вирішив спробувати Vector.dev. Він трохи складний в налаштуванні, але має просто безліч можливостей.

Власне, чим більше можливостей – тим більш складно налаштувати систему. Втім, мені все ж вдалось зробити те, чого я хотів, і вийшло навіть доволі просто, тому можна пробувати робити це для Production.

Тож сьогодні зробимо простенький Proof of Concept з Flow Logs, Vector.dev та VictoriaLogs:

  • встановимо Helm-чарт з Vector
  • створимо новий AWS S3, налаштуємо VPC Flow Logs з custom format для запису в цей бакет
  • подивимось, як ми можемо збирати логи з S3 до Vector.dev і додавати нові поля
  • і порівняємо швидкість роботи з raw logs vs логи з Vector з полями

Vector.dev

Отже, що таке Vector.dev?

Vector is a high-performance observability data pipeline that puts organizations in control of their observability data. Collecttransform, and route all your logs, metrics, and traces to any vendors

Тобто основна ідея – збирати будь-які дані моніторингу, будь то метрики або логи, виконувати над ними якісь дії, і потім кудись писати.

В моєму випадку мені треба взяти запис лога, додати до нього якісь поля, і записати до VictoriaLogs.

Components

Див. Concepts.

Нас зараз цікавлять три компоненти:

  • Sources: звідки збираємо дані
  • Transforms: що ми з даними робимо
  • Sinks: куди ми оброблені дані передаємо далі

В нашому випадку Sources буде AWS S3, в Transforms – будемо парсити логи VPC FLow logs і створювати нові fields, а в Sinks – використаємо Elasticsearch Sink для VictoriaLogs, див. документацію по Vector setup в VictoriaLogs docs.

Взагалі, Vector має окремий Loki Sink, але з ним більше проблем, ніж користі, а з Elasticsearch (або HTTP) все запрацювало без проблем.

Запуск в Kubernetes з Helm

Документація по запуску з Helm – в Install Vector on Kubernetes та в самому чарті – README.md.

Додаємо собі новий репозиторій:

$ helm repo add vector https://helm.vector.dev
"vector" has been added to your repositories
$ helm repo update

Встановлюємо Vector – поки з дефолтними параметрами, потім створимо власний values.yaml:

$ helm install vector vector/vector
NAME: vector
LAST DEPLOYED: Mon Dec  2 15:13:30 2024
...

Переходимо до VPC Flow Logs.

Налаштування AWS VPC Flow Logs до S3

Далі, нам потрібна S3 корзина, в яку ми будемо писати VPC Flow Logs, і SQS, в яку будуть відправлятись повідомлення, коли в S3 створюються нові об’єкти, тобто логи.

Потім Vector буде читати повідомлення з цієї SQS, і забирати логи з S3.

Створення AWS SQS

Документація по SQS для S3 – Walkthrough: Configuring a bucket for notifications (SNS topic or SQS queue).

Створюємо нову чергу:

Тип – Standart:

Задаємо Access policy:

{
  "Version": "2012-10-17",
  "Id": "example-ID",
  "Statement": [
    {
      "Sid": "vpc-ops-flow-vmlogs-s3-allow",
      "Effect": "Allow",
      "Principal": {
        "Service": "s3.amazonaws.com"
      },
      "Action": "SQS:SendMessage",
      "c": "arn:aws:sqs:us-east-1:492***148:s3-vector-vmlogs-queue",
      "Condition": {
        "StringEquals": {
          "aws:SourceAccount": "492***148"
        },
        "ArnLike": {
          "aws:SourceArn": "arn:aws:s3:*:*:s3-vector-vmlogs-flow-logs-bucket"
        }
      }
    }
  ]
}

В Resource вказуємо ім’я нашої queue, а в Condition – дозволяємо доступ з ID нашого акаунту та S3-бакету з ім’ям s3-vector-vmlogs-flow-logs-bucket:

Тут все – параметри Dead-letter queue залишаємо дефолтні, клікаємо Create, і переходимо до S3.

Створення AWS S3

Створюємо новий S3 бакет з ім’ям s3-vector-vmlogs-flow-logs-bucket – як ми задали в SQS Access Policy.

ACL нам зараз не потрібна, але Block Public Access лишаємо в дефолтному Block All:

 

Клікаємо Create, переходимо в Properties > Event notifications:

Задаємо Event name, в Event types вибираємо s3:ObjectCreated:*:

В Destination задаємо нашу SQS:

Клікаємо Save changes, і переходимо вже до VPC Flow Logs.

Створення VPC Flow Logs до S3

Створюємо новий Flow Log.

Якщо у вас VPC створюється з Terraform – то можна використати ресурс aws_flow_log:

resource "aws_flow_log" "vpc_flow_vector" {
  vpc_id               = module.vpc.vpc_id
  log_destination      = "arn:aws:s3:::s3-vector-vmlogs-flow-logs-bucket"
  log_destination_type = "s3"
  traffic_type         = "ALL"
  log_format = "$${region} $${vpc-id} $${az-id} $${subnet-id} $${instance-id} $${interface-id} $${flow-direction} $${srcaddr} $${dstaddr} $${srcport} $${dstport} $${pkt-srcaddr} $${pkt-dstaddr} $${pkt-src-aws-service} $${pkt-dst-aws-service} $${traffic-path} $${packets} $${bytes} $${action}"
  tags = {
    "Name" = "flow-logs-s3-to-vector"
  }
}

Або робимо руками – переходимо в VPC, вкладка Flow logs, клікаємо Create flow log – тут я вже маю два Flow Logs для Promtail Lambda:

В Destination задаємо Send to an Amazon S3 bucket, і вказуємо ARN нашого бакета:

Я завжди використовую Custom format з додатковими полями:

Зберігаємо, і перевіряємо статус:

Все зелененьке, працює.

Можна зачекати 10 хвилин (дефолтний період доставки логів), і перевірити дані в самій S3:

І вкладку Monitoring в SQS:

Налаштування Vector.dev

Ну а тепер саме цікаве.

Отже, що нам треба:

  • додати Source S3 з параметром SQS – звідки будемо збирати логи
  • додати трансформацію – створення нових fields
  • і додати Sink для VictoriaLogs – куди будемо писати

Тобто створюється такий собі pipeline – Source збирає дані, Transform їх трансформує, а Sink – передає оброблені дані далі, в нашому випадку до VictoriaLogs.

Документація по AWS S3 source – тут>>>.

Документація по Transformations – тут>>>.

Документація по всім Sinks – тут>>>, і по Loki – тут>>>, але ми будемо використовувати інший, Elasticsearch.

Документація по Elasticsearch Sink в Vector.dev – тут>>>, і документація по Elasticsearch data ingest в VictoriaLogs – тут>>>.

Також може бути цікавим – як з Vector збирати логи зі звичайних файлів – тут>>>.

І ще цікавий use case – збирати логи Kubernetes, і пушити їх в AWS S3 – див. How to Collect, Transform, and Ship Logs from AWS S3 to Codegiant Observability Using Vector.

З документацією розібрались – поїхали конфігуряти.

Vector.dev: Sources – S3

Першим налаштуємо збір логів з AWS S3 бакету. Для цього нам потрібні такі параметри:

  • type: aws_s3
  • auth: як будемо виконувати аутентифікацію
    • поки зробимо банальним Access/Secret ключами, коли будемо це запускати в Production – то додамо EKS Pod Identity з IAM Role, яка буде дозволяти доступ Kubernetes Pod з Vector до S3 та SQS
  • sqs.queue_url: звідки Vector буде отримувати інформацію, що в S3 з’явились нові логи

Задавати параметри будемо через Helm chart values і параметр customConfig, до якого є важливий коментар:

# customConfig — Override Vector’s default configs, if used **all** options need to be specified.

Тобто, нам потрібно буде задати всі параметри.

Тому зараз конфіг буде таким:

image:
  repository: timberio/vector
  pullPolicy: IfNotPresent

replicas: 1

service:
  enabled: false

customConfig:

  sources:
    s3-vector-vmlogs-flow-logs-bucket: # source name to be used later in Transforms
      type: aws_s3
      region: us-east-1
      compression: gzip
      auth:
        region: us-east-1
        access_key_id: AKI***B7A
        secret_access_key: pAu***2gW
      sqs:
        queue_url: https://sqs.us-east-1.amazonaws.com/492***148/s3-vector-vmlogs-queue

Vector.dev: Transforms – remap та VRL

Transforms є багато, але нам зараз цікавий remap, в якому з Vector Remap Language (VRL) ми можемо виконувати прям безліч всяких операцій.

VRL – це domain-specific language (DSL) для самого Vector.dev, в якому є різні функції для роботи з даними.

Є навіть VRL Playground, де можна спробувати що і як працює.

З того, що може бути цікавим нам – це Parse functions, а саме – функція parse_aws_vpc_flow_log. А для роботи з AWS Load Balancer logs – є функція parse_aws_alb_log.

Сама parse_aws_vpc_flow_log описується тут – parse_aws_vpc_flow_log.rs.

А приклади є тут – VRL example reference.

Що ми нею можемо зробити – передати їй на “вхід” дані з наших логів, і задати custom format.

Самий простий конфіг, з яким власне все працює так, як мені треба, виглядає так:

...
  transforms:

    s3-vector-vmlogs-flow-logs-transform:
      type: remap
      inputs:
        - s3-vector-vmlogs-flow-logs-bucket # a name from the 'sources', can have several Inputs
      source: |
        . = parse_aws_vpc_flow_log!(
          .message,
          format: "region vpc_id az_id subnet_id instance_id interface_id flow_direction srcaddr dstaddr srcport dstport pkt_srcaddr pkt_dstaddr pkt_src_aws_service pkt_dst_aws_service traffic_path packets bytes action"
        )

Якщо хочеться виконати якісь операції над полями – то можна оформити таким чином:

...
      source: |
        .parsed = parse_aws_vpc_flow_log!(
          .message,
          format: "region vpc_id az_id subnet_id instance_id interface_id flow_direction srcaddr dstaddr srcport dstport pkt_srcaddr pkt_dstaddr pkt_src_aws_service pkt_dst_aws_service traffic_path packets bytes action"
        )

        .region = .parsed.region
        .vpc_id = .parsed.vpc.id
        .az_id = .parsed.az_id
        .subnet_id = .parsed.subnet_id
        .instance_id = .parsed.instance_id
        .interface_id = .parsed.interface_id
        .account_id = .parsed.account_id
        .srcaddr = .parsed.srcaddr
        .dstaddr = .parsed.dstaddr
        .srcport = .parsed.srcport
        .dstport = .parsed.dstport
        .protocol = .parsed.protocol
        .packets = to_int(.parsed.packets)
        .bytes = to_int(.parsed.bytes)

        del(.parsed)
...

Тут ми створюємо власні поля region, vpc_id etc, приводимо поля packets та bytes до типу integer, і в кінці видаляємо весь message з .parsed викликом Path function del().

Але в данному випадку все чудово працює і без цього, просто експерементував з різними варіантами.

Vector.dev: Sinks – Elasticsearch та VictoriaLogs

І останнім нам потрібно задати Sink.

Я пробував це робити з Loki Sink, але з ним так і не вийшло правильно оформити нові поля, тому по рекомендації розробників VictoriaLogs просто взяв Elasticsearch Sink.

Описуємо наш конфіг:

...
  sinks:

    s3-flow-logs-to-victorialogs:
      inputs:
        - s3-vector-vmlogs-flow-logs-transform # a Transform name to get processed data from
      type: elasticsearch
      endpoints:
        - http://atlas-victoriametrics-victoria-logs-single-server:9428/insert/elasticsearch/ # VictoriaLogs Kubernetes Service URL and Elasticsearch endpoint
      api_version: v8
      compression: gzip
      healthcheck:
        enabled: false
      query: # HTTP query params
        extra_fields: source=vector # add a custom label
        # _msg_field: message # ommited here, as we have everything in the fields from the Transform, but may be used for other data
        _time_field: timestamp # set the '_time' field for the VictoriaLogs
        _stream_fields: source,vpc_id,az_id # create Stream fields for the VictoriaLogs to save data in a dedicated Stream; specify fields without spaces

Власне, я тут наче все додав в коменти, але пройдемось ще:

  • inputs: задаємо ім’я Transform, з якого беремо дані
  • endpoints: передаємо адресу VictoriaLogs в нашому Kubernetes кластері
  • healthcheck: відключаємо, бо VictoriaLogs поки не підтримує /ping ендпоінт
  • query: передаємо додаткові параметри, див. VictoriaLogs HTTP
    • в _stream_fields описуємо по яким полям VictoriaLogs буде створювати log stream – див. Stream fields

Весь values тепер виглядає так:

image:
  repository: timberio/vector
  pullPolicy: IfNotPresent

replicas: 1

service:
  enabled: false

customConfig:

  sources:
    s3-vector-vmlogs-flow-logs-bucket: # source name to be used later in Transforms
      type: aws_s3
      region: us-east-1
      compression: gzip
      auth:
        region: us-east-1
        access_key_id: AKI***B7A
        secret_access_key: pAu***2gW
      sqs:
        queue_url: https://sqs.us-east-1.amazonaws.com/492***148/s3-vector-vmlogs-queue

  transforms:

    s3-vector-vmlogs-flow-logs-transform: # a name from the 'sources', can have several Inputs
      type: remap
      inputs:
        - s3-vector-vmlogs-flow-logs-bucket
      source: |
        . = parse_aws_vpc_flow_log!(
          .message,
          format: "region vpc_id az_id subnet_id instance_id interface_id flow_direction srcaddr dstaddr srcport dstport pkt_srcaddr pkt_dstaddr pkt_src_aws_service pkt_dst_aws_service traffic_path packets bytes action"
        )

  sinks:

    s3-flow-logs-to-victorialogs:
      inputs:
        - s3-vector-vmlogs-flow-logs-transform # a Transform name to get processed data from
      type: elasticsearch
      endpoints:
        - http://atlas-victoriametrics-victoria-logs-single-server:9428/insert/elasticsearch/ # VictoriaLogs Kubernetes Service URL and Elasticsearch endpoint
      api_version: v8
      compression: gzip
      healthcheck:
        enabled: false
      query: # HTTP query params
        extra_fields: source=vector # add a custom label
        # _msg_field: message # ommited here, as we have everything in the fields from the Transform, but may be used for other data
        _time_field: timestamp # set the '_time' field for the VictoriaLogs
        _stream_fields: source,vpc_id,az_id # create Stream fields for the VictoriaLogs to save data in a dedicated Stream; specify fields without spaces

Деплоїмо наші зміни:

$ helm upgrade --install vector vector/vector -f vector-values.yaml

В логах чомусь помилка обробки поля srcport з Flow Logs:

ERROR transform{component_kind="transform" component_id=s3-vector-vmlogs-flow-logs-transform component_type=remap}: vector::internal_events::remap: Mapping failed with event. error="function call error for \"parse_aws_vpc_flow_log\" at (4:254): failed to parse value as i64 (key: `srcport`): `srcport`" error_type="conversion_failed" stage="processing" internal_log_rate_limit=true

Чому – не знаю, бо поле таке саме і в Flow Logs, і в нашому custom format. Але воно наче ні на що не впливає, пізніше зроблю GitHub Issue, спитаю.

Чекаємо, коли з S3 прийдуть дані, і перевіряємо в нашій VictoriaLogs, використовуючи _stream: {source="vector", vpc_id="vpc-0fbaffe234c0d81ea", az_id="use1-az2"} – поля, які ми задавали в _stream_fields:

Вау!

“It works!” (c)

Grafana та VictoriaLogs

Давайте глянемо, як це все працює в Grafana.

Спершу – просто перевіримо дані там:

В моїй Grafana dashboard є така панелька:

З таким запитом:

_time:$__range {logtype=flow, environment=ops, logger_type=vmlogs} seq("eni-0352f8c82da6aa229", "ACCEPT")
| extract "<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_addr> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>" keep_original_fields
  | filter
      interface_id:="eni-0352f8c82da6aa229"
      action:="ACCEPT"
      pkt_dst_addr:ipv4_range("10.0.32.0/20")
      pkt_dst_addr:~"${kubernetes_pod_ip}"
      pkt_src_addr:~"${remote_svc_ip}"
  | stats by (pkt_src_addr, src_port, pkt_dst_addr, dst_port) sum(bytes) bytes_total
  | sort by (bytes_total) desc limit 10

Перепишемо цей запит під нові дані – використовуємо новий stream, і приберемо filter, бо у там тепер є готові поля – виконуємо виборку відразу по ним:

{source="vector", vpc_id="vpc-0fbaffe234c0d81ea", az_id="use1-az2"} interface_id:="eni-0352f8c82da6aa229" action:="ACCEPT" pkt_dstaddr:ipv4_range("10.0.32.0/20")
  | stats by (pkt_srcaddr, srcport, pkt_dstaddr, dstport) sum(bytes) bytes_total 
  | sort by (bytes_total) desc

Performance: “raw logs” vs “fielded logs”

І порівняємо швидкість такого запиту із запитом з сирих логів.

Старий запит, візьмемо 3 години:

Новий запит за ті ж 3 години:

Різниця у 2 рази.

При цьому ресурси самого Vector:

$ kk top pod vector-0
NAME       CPU(cores)   MEMORY(bytes)   
vector-0   3m           104Mi           

І VictoriaLogs:

$ kk top pod atlas-victoriametrics-victoria-logs-single-server-0
atlas-victoriametrics-victoria-logs-single-server-0   12m   840Mi

Можна пробувати цю схему запускати в Production.

Корисні посилання