Отже, знов повертаємось до теми AWS VPC Flow Logs, VictoriaLogs, та Grafana dashboard.
В пості VictoriaLogs: дашборда в Grafana з AWS VPC Flow Logs – мігруємо з Grafana Loki ми створили прикольну дашборду для відображення різної статистики по трафіку AWS NAT Gateway.
Але там є маленький недолік – всі дані будуються з raw logs, які пишуться з VPC Flow Logs в AWS S3, з S3 їх збирає Promtail в AWS Lambda, і потім пише до VictoriaLogs.
Зміст
Проблема: перформанс з raw logs
В цій Grafana dashboard з VictoriaLogs виконуються запити типу:
_time:$__range {logtype=flow, environment=ops, logger_type=vmlogs} seq("eni-0352f8c82da6aa229", "ACCEPT") | extract "<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_addr> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>" keep_original_fields | filter interface_id:="eni-0352f8c82da6aa229" action:=ACCEPT pkt_dst_addr:ipv4_range("10.0.32.0/20") pkt_dst_addr:~"${kubernetes_pod_ip}" pkt_src_addr:~"${remote_svc_ip}" | stats by (pkt_src_addr) sum(bytes) sum_bytes | sort by (sum_bytes) desc limit 10
Де з extract
отримуємо значення для нових полів прямо із логу.
І все це більш-менш працює, але максимальний період, за який вдається побудувати графіки – 24 години (з Loki було взагалі 30 хвилин).
Але є інший варіант роботи з логами: замість того, аби парсити поля прямо під час виконання запиту з використанням exctract
– ми можемо створювати ці поля ще на етапі збору логів з S3, і далі в запитах використовувати вже їх.
В принципі, це можна було б зробити прямо з поточним сетапом – через Promtail. Щось схоже я робив в Grafana Loki: alerts from Ruler and labels from logs, але – ну не хочеться мені мати справу з Lambda Promtail від Grafana, бо мені навіть не вдалося оновити версію Promtail в моєму Docker image – а я не пам’ятаю, як робив перший. Тому в мене Promtail в Lambda досі той, який я створив ще у 2023 році – див. Loki: збір логів з CloudWatch Logs з використанням Lambda Promtail.
Тому замість Promtail вирішив спробувати Vector.dev. Він трохи складний в налаштуванні, але має просто безліч можливостей.
Власне, чим більше можливостей – тим більш складно налаштувати систему. Втім, мені все ж вдалось зробити те, чого я хотів, і вийшло навіть доволі просто, тому можна пробувати робити це для Production.
Тож сьогодні зробимо простенький Proof of Concept з Flow Logs, Vector.dev та VictoriaLogs:
- встановимо Helm-чарт з Vector
- створимо новий AWS S3, налаштуємо VPC Flow Logs з custom format для запису в цей бакет
- подивимось, як ми можемо збирати логи з S3 до Vector.dev і додавати нові поля
- і порівняємо швидкість роботи з raw logs vs логи з Vector з полями
Vector.dev
Отже, що таке Vector.dev?
Vector is a high-performance observability data pipeline that puts organizations in control of their observability data. Collect, transform, and route all your logs, metrics, and traces to any vendors
Тобто основна ідея – збирати будь-які дані моніторингу, будь то метрики або логи, виконувати над ними якісь дії, і потім кудись писати.
В моєму випадку мені треба взяти запис лога, додати до нього якісь поля, і записати до VictoriaLogs.
Components
Див. Concepts.
Нас зараз цікавлять три компоненти:
- Sources: звідки збираємо дані
- Transforms: що ми з даними робимо
- Sinks: куди ми оброблені дані передаємо далі
В нашому випадку Sources буде AWS S3, в Transforms – будемо парсити логи VPC FLow logs і створювати нові fields, а в Sinks – використаємо Elasticsearch Sink для VictoriaLogs, див. документацію по Vector setup в VictoriaLogs docs.
Взагалі, Vector має окремий Loki Sink, але з ним більше проблем, ніж користі, а з Elasticsearch (або HTTP) все запрацювало без проблем.
Запуск в Kubernetes з Helm
Документація по запуску з Helm – в Install Vector on Kubernetes та в самому чарті – README.md.
Додаємо собі новий репозиторій:
$ helm repo add vector https://helm.vector.dev "vector" has been added to your repositories $ helm repo update
Встановлюємо Vector – поки з дефолтними параметрами, потім створимо власний values.yaml
:
$ helm install vector vector/vector NAME: vector LAST DEPLOYED: Mon Dec 2 15:13:30 2024 ...
Переходимо до VPC Flow Logs.
Налаштування AWS VPC Flow Logs до S3
Далі, нам потрібна S3 корзина, в яку ми будемо писати VPC Flow Logs, і SQS, в яку будуть відправлятись повідомлення, коли в S3 створюються нові об’єкти, тобто логи.
Потім Vector буде читати повідомлення з цієї SQS, і забирати логи з S3.
Створення AWS SQS
Документація по SQS для S3 – Walkthrough: Configuring a bucket for notifications (SNS topic or SQS queue).
Створюємо нову чергу:
Тип – Standart:
Задаємо Access policy:
{ "Version": "2012-10-17", "Id": "example-ID", "Statement": [ { "Sid": "vpc-ops-flow-vmlogs-s3-allow", "Effect": "Allow", "Principal": { "Service": "s3.amazonaws.com" }, "Action": "SQS:SendMessage", "c": "arn:aws:sqs:us-east-1:492***148:s3-vector-vmlogs-queue", "Condition": { "StringEquals": { "aws:SourceAccount": "492***148" }, "ArnLike": { "aws:SourceArn": "arn:aws:s3:*:*:s3-vector-vmlogs-flow-logs-bucket" } } } ] }
В Resource вказуємо ім’я нашої queue, а в Condition – дозволяємо доступ з ID нашого акаунту та S3-бакету з ім’ям s3-vector-vmlogs-flow-logs-bucket
:
Тут все – параметри Dead-letter queue залишаємо дефолтні, клікаємо Create, і переходимо до S3.
Створення AWS S3
Створюємо новий S3 бакет з ім’ям s3-vector-vmlogs-flow-logs-bucket
– як ми задали в SQS Access Policy.
ACL нам зараз не потрібна, але Block Public Access лишаємо в дефолтному Block All:
Клікаємо Create, переходимо в Properties > Event notifications:
Задаємо Event name, в Event types вибираємо s3:ObjectCreated:*
:
В Destination задаємо нашу SQS:
Клікаємо Save changes, і переходимо вже до VPC Flow Logs.
Створення VPC Flow Logs до S3
Створюємо новий Flow Log.
Якщо у вас VPC створюється з Terraform – то можна використати ресурс aws_flow_log
:
resource "aws_flow_log" "vpc_flow_vector" { vpc_id = module.vpc.vpc_id log_destination = "arn:aws:s3:::s3-vector-vmlogs-flow-logs-bucket" log_destination_type = "s3" traffic_type = "ALL" log_format = "$${region} $${vpc-id} $${az-id} $${subnet-id} $${instance-id} $${interface-id} $${flow-direction} $${srcaddr} $${dstaddr} $${srcport} $${dstport} $${pkt-srcaddr} $${pkt-dstaddr} $${pkt-src-aws-service} $${pkt-dst-aws-service} $${traffic-path} $${packets} $${bytes} $${action}" tags = { "Name" = "flow-logs-s3-to-vector" } }
Або робимо руками – переходимо в VPC, вкладка Flow logs, клікаємо Create flow log – тут я вже маю два Flow Logs для Promtail Lambda:
В Destination задаємо Send to an Amazon S3 bucket, і вказуємо ARN нашого бакета:
Я завжди використовую Custom format з додатковими полями:
Зберігаємо, і перевіряємо статус:
Все зелененьке, працює.
Можна зачекати 10 хвилин (дефолтний період доставки логів), і перевірити дані в самій S3:
І вкладку Monitoring в SQS:
Налаштування Vector.dev
Ну а тепер саме цікаве.
Отже, що нам треба:
- додати Source S3 з параметром SQS – звідки будемо збирати логи
- додати трансформацію – створення нових fields
- і додати Sink для VictoriaLogs – куди будемо писати
Тобто створюється такий собі pipeline – Source збирає дані, Transform їх трансформує, а Sink – передає оброблені дані далі, в нашому випадку до VictoriaLogs.
Документація по AWS S3 source – тут>>>.
Документація по Transformations – тут>>>.
Документація по всім Sinks – тут>>>, і по Loki – тут>>>, але ми будемо використовувати інший, Elasticsearch.
Документація по Elasticsearch Sink в Vector.dev – тут>>>, і документація по Elasticsearch data ingest в VictoriaLogs – тут>>>.
Також може бути цікавим – як з Vector збирати логи зі звичайних файлів – тут>>>.
І ще цікавий use case – збирати логи Kubernetes, і пушити їх в AWS S3 – див. How to Collect, Transform, and Ship Logs from AWS S3 to Codegiant Observability Using Vector.
З документацією розібрались – поїхали конфігуряти.
Vector.dev: Sources – S3
Першим налаштуємо збір логів з AWS S3 бакету. Для цього нам потрібні такі параметри:
type
:aws_s3
auth
: як будемо виконувати аутентифікацію- поки зробимо банальним Access/Secret ключами, коли будемо це запускати в Production – то додамо EKS Pod Identity з IAM Role, яка буде дозволяти доступ Kubernetes Pod з Vector до S3 та SQS
sqs.queue_url
: звідки Vector буде отримувати інформацію, що в S3 з’явились нові логи
Задавати параметри будемо через Helm chart values і параметр customConfig
, до якого є важливий коментар:
# customConfig — Override Vector’s default configs, if used **all** options need to be specified.
Тобто, нам потрібно буде задати всі параметри.
Тому зараз конфіг буде таким:
image: repository: timberio/vector pullPolicy: IfNotPresent replicas: 1 service: enabled: false customConfig: sources: s3-vector-vmlogs-flow-logs-bucket: # source name to be used later in Transforms type: aws_s3 region: us-east-1 compression: gzip auth: region: us-east-1 access_key_id: AKI***B7A secret_access_key: pAu***2gW sqs: queue_url: https://sqs.us-east-1.amazonaws.com/492***148/s3-vector-vmlogs-queue
Vector.dev: Transforms – remap
та VRL
Transforms є багато, але нам зараз цікавий remap
, в якому з Vector Remap Language (VRL) ми можемо виконувати прям безліч всяких операцій.
VRL – це domain-specific language (DSL) для самого Vector.dev, в якому є різні функції для роботи з даними.
Є навіть VRL Playground, де можна спробувати що і як працює.
З того, що може бути цікавим нам – це Parse functions, а саме – функція parse_aws_vpc_flow_log
. А для роботи з AWS Load Balancer logs – є функція parse_aws_alb_log
.
Сама parse_aws_vpc_flow_log
описується тут – parse_aws_vpc_flow_log.rs.
А приклади є тут – VRL example reference.
Що ми нею можемо зробити – передати їй на “вхід” дані з наших логів, і задати custom format.
Самий простий конфіг, з яким власне все працює так, як мені треба, виглядає так:
... transforms: s3-vector-vmlogs-flow-logs-transform: type: remap inputs: - s3-vector-vmlogs-flow-logs-bucket # a name from the 'sources', can have several Inputs source: | . = parse_aws_vpc_flow_log!( .message, format: "region vpc_id az_id subnet_id instance_id interface_id flow_direction srcaddr dstaddr srcport dstport pkt_srcaddr pkt_dstaddr pkt_src_aws_service pkt_dst_aws_service traffic_path packets bytes action" )
Якщо хочеться виконати якісь операції над полями – то можна оформити таким чином:
... source: | .parsed = parse_aws_vpc_flow_log!( .message, format: "region vpc_id az_id subnet_id instance_id interface_id flow_direction srcaddr dstaddr srcport dstport pkt_srcaddr pkt_dstaddr pkt_src_aws_service pkt_dst_aws_service traffic_path packets bytes action" ) .region = .parsed.region .vpc_id = .parsed.vpc.id .az_id = .parsed.az_id .subnet_id = .parsed.subnet_id .instance_id = .parsed.instance_id .interface_id = .parsed.interface_id .account_id = .parsed.account_id .srcaddr = .parsed.srcaddr .dstaddr = .parsed.dstaddr .srcport = .parsed.srcport .dstport = .parsed.dstport .protocol = .parsed.protocol .packets = to_int(.parsed.packets) .bytes = to_int(.parsed.bytes) del(.parsed) ...
Тут ми створюємо власні поля region
, vpc_id
etc, приводимо поля packets
та bytes
до типу integer, і в кінці видаляємо весь message
з .parsed
викликом Path function del()
.
Але в данному випадку все чудово працює і без цього, просто експерементував з різними варіантами.
Vector.dev: Sinks – Elasticsearch та VictoriaLogs
І останнім нам потрібно задати Sink.
Я пробував це робити з Loki Sink, але з ним так і не вийшло правильно оформити нові поля, тому по рекомендації розробників VictoriaLogs просто взяв Elasticsearch Sink.
Описуємо наш конфіг:
... sinks: s3-flow-logs-to-victorialogs: inputs: - s3-vector-vmlogs-flow-logs-transform # a Transform name to get processed data from type: elasticsearch endpoints: - http://atlas-victoriametrics-victoria-logs-single-server:9428/insert/elasticsearch/ # VictoriaLogs Kubernetes Service URL and Elasticsearch endpoint api_version: v8 compression: gzip healthcheck: enabled: false query: # HTTP query params extra_fields: source=vector # add a custom label # _msg_field: message # ommited here, as we have everything in the fields from the Transform, but may be used for other data _time_field: timestamp # set the '_time' field for the VictoriaLogs _stream_fields: source,vpc_id,az_id # create Stream fields for the VictoriaLogs to save data in a dedicated Stream; specify fields without spaces
Власне, я тут наче все додав в коменти, але пройдемось ще:
inputs
: задаємо ім’я Transform, з якого беремо даніendpoints
: передаємо адресу VictoriaLogs в нашому Kubernetes кластеріhealthcheck
: відключаємо, бо VictoriaLogs поки не підтримує/ping
ендпоінтquery
: передаємо додаткові параметри, див. VictoriaLogs HTTP- в
_stream_fields
описуємо по яким полям VictoriaLogs буде створювати log stream – див. Stream fields
- в
Весь values тепер виглядає так:
image: repository: timberio/vector pullPolicy: IfNotPresent replicas: 1 service: enabled: false customConfig: sources: s3-vector-vmlogs-flow-logs-bucket: # source name to be used later in Transforms type: aws_s3 region: us-east-1 compression: gzip auth: region: us-east-1 access_key_id: AKI***B7A secret_access_key: pAu***2gW sqs: queue_url: https://sqs.us-east-1.amazonaws.com/492***148/s3-vector-vmlogs-queue transforms: s3-vector-vmlogs-flow-logs-transform: # a name from the 'sources', can have several Inputs type: remap inputs: - s3-vector-vmlogs-flow-logs-bucket source: | . = parse_aws_vpc_flow_log!( .message, format: "region vpc_id az_id subnet_id instance_id interface_id flow_direction srcaddr dstaddr srcport dstport pkt_srcaddr pkt_dstaddr pkt_src_aws_service pkt_dst_aws_service traffic_path packets bytes action" ) sinks: s3-flow-logs-to-victorialogs: inputs: - s3-vector-vmlogs-flow-logs-transform # a Transform name to get processed data from type: elasticsearch endpoints: - http://atlas-victoriametrics-victoria-logs-single-server:9428/insert/elasticsearch/ # VictoriaLogs Kubernetes Service URL and Elasticsearch endpoint api_version: v8 compression: gzip healthcheck: enabled: false query: # HTTP query params extra_fields: source=vector # add a custom label # _msg_field: message # ommited here, as we have everything in the fields from the Transform, but may be used for other data _time_field: timestamp # set the '_time' field for the VictoriaLogs _stream_fields: source,vpc_id,az_id # create Stream fields for the VictoriaLogs to save data in a dedicated Stream; specify fields without spaces
Деплоїмо наші зміни:
$ helm upgrade --install vector vector/vector -f vector-values.yaml
В логах чомусь помилка обробки поля srcport
з Flow Logs:
ERROR transform{component_kind="transform" component_id=s3-vector-vmlogs-flow-logs-transform component_type=remap}: vector::internal_events::remap: Mapping failed with event. error="function call error for \"parse_aws_vpc_flow_log\" at (4:254): failed to parse value as i64 (key: `srcport`): `srcport`" error_type="conversion_failed" stage="processing" internal_log_rate_limit=true
Чому – не знаю, бо поле таке саме і в Flow Logs, і в нашому custom format. Але воно наче ні на що не впливає, пізніше зроблю GitHub Issue, спитаю.
Чекаємо, коли з S3 прийдуть дані, і перевіряємо в нашій VictoriaLogs, використовуючи _stream: {source="vector", vpc_id="vpc-0fbaffe234c0d81ea", az_id="use1-az2"}
– поля, які ми задавали в _stream_fields
:
Вау!
“It works!” (c)
Grafana та VictoriaLogs
Давайте глянемо, як це все працює в Grafana.
Спершу – просто перевіримо дані там:
В моїй Grafana dashboard є така панелька:
З таким запитом:
_time:$__range {logtype=flow, environment=ops, logger_type=vmlogs} seq("eni-0352f8c82da6aa229", "ACCEPT") | extract "<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_addr> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>" keep_original_fields | filter interface_id:="eni-0352f8c82da6aa229" action:="ACCEPT" pkt_dst_addr:ipv4_range("10.0.32.0/20") pkt_dst_addr:~"${kubernetes_pod_ip}" pkt_src_addr:~"${remote_svc_ip}" | stats by (pkt_src_addr, src_port, pkt_dst_addr, dst_port) sum(bytes) bytes_total | sort by (bytes_total) desc limit 10
Перепишемо цей запит під нові дані – використовуємо новий stream, і приберемо filter
, бо у там тепер є готові поля – виконуємо виборку відразу по ним:
{source="vector", vpc_id="vpc-0fbaffe234c0d81ea", az_id="use1-az2"} interface_id:="eni-0352f8c82da6aa229" action:="ACCEPT" pkt_dstaddr:ipv4_range("10.0.32.0/20") | stats by (pkt_srcaddr, srcport, pkt_dstaddr, dstport) sum(bytes) bytes_total | sort by (bytes_total) desc
Performance: “raw logs” vs “fielded logs”
І порівняємо швидкість такого запиту із запитом з сирих логів.
Старий запит, візьмемо 3 години:
Новий запит за ті ж 3 години:
Різниця у 2 рази.
При цьому ресурси самого Vector:
$ kk top pod vector-0 NAME CPU(cores) MEMORY(bytes) vector-0 3m 104Mi
І VictoriaLogs:
$ kk top pod atlas-victoriametrics-victoria-logs-single-server-0 atlas-victoriametrics-victoria-logs-single-server-0 12m 840Mi
Можна пробувати цю схему запускати в Production.
Корисні посилання
- ALB Controller Logs With Vector and OpenSearch: A Guide
- Fast log management with SigLens and Vector.dev