В попередньому пості – AWS: VPC Flow Logs – логи до S3 та Grafana dashboard з Loki ми створили дашборду в Grafana, яка відображає статистику використання NAT Gateway.
Що саме нас там цікавило – це які Kubernetes Pods використовують найбільше байт, бо це напряму впливає на наші AWS Costs.
І все наче добре з цією бордою, окрім одного – Loki не здатна обробити “сирі” логи і побудувати графіки більше ніж за 30 хвилин, максимум – 1 година, і то вже частина візуалізацій не прогружаються, хоча я намагався її затюнити – див. Grafana Loki: оптимізація роботи – Recording Rules, кешування та паралельні запити.
Тому я вирішив спробувати такий же підхід – з S3 для VPC Flow Logs, Lambda та Promtail – але вже з VictoriaLogs, тим більш з версії 0.8.0 VictoriaLogs Grafana data source вже завели кращу підтримку запитів, і тепер можна будувати візуалізації без Grafana Transformations.
Отже, що будемо робити:
- швиденько покажу Terraform код, який створює S3 для VPC Flow Logs і AWS Lambda з Promtail, який шле дані до VictoriaLogs
- створимо нову Grafana dashboard з VictoriaLogs datasource, і перенесемо запити з Loki та її LogQL до VictoriaLogs та LogsQL
Нагадаю з попереднього поста що ми маємо в нашому сетапі:
- знаємо CIDR приватних сабнетів для Kubernetes Pods
- у нас використовується тільки одна мережа в us-east-1a AvailabilityZone –
10.0.32.0/20
- у нас використовується тільки одна мережа в us-east-1a AvailabilityZone –
- знаємо Elastic Network Interface ID нашого NAT Gateway – він у нас один, тому тут все просто
- в логах маємо поля
pkt_src_addrтаpkt_dst_addr, по яким можемо вибирати трафік тільки з/до Kubernetes Pods
Також варто глянути інші пости по цій темі:
- VictoriaLogs: знайомство, запуск в Kubernetes, LogsQL та Grafana
- AWS: VPC Flow Logs, NAT Gateways, та Kubernetes Pods – детальний обзор
- Grafana Loki: збираємо логи AWS LoadBalancer з S3 за допомогою Promtail Lambda
- Loki: збір логів з CloudWatch Logs з використанням Lambda Promtail
- VictoriaMetrics: створення Kubernetes monitoring stack з власним Helm-чартом
Зміст
Terraform
S3 та Promtail Lambda
Детально тут розписувати не буду, бо в коді наче достатньо коментарів, які описують кожен ресурс. Просто приклад того, як таке можна зробити. Крім того, першу версію модуля описував в Terraform: створення модулю для збору логів AWS ALB в Grafana Loki, але тут трохи перероблений варіант аби мати можливість налаштування і Loki і VictoriaLogs, і не тільки логи ALB, але і VPC Flow Logs.
Отже, як це реалізував я:
- репозиторій
atlas-tf-modules: модулі Terraform, в якому є код для створення S3 бакетів, Lambda, нотифікацій і пермішенів -
репозиторій
atlas-monitoring: код Terraform та Helm-чарт нашого моніторинга, де створюються необхідні ресурси – RDS, різні додаткові S3-бакети, сертифікати AWS ACM, та викликається модуль зatlas-tf-modules/alb-s3-logsдля налаштування збору логів з S3 бакетів
Почнемо з самого модуля alb-s3-logs для S3 та Lambda. Про Terraform та модулі писав в Terraform: модулі, Outputs та Variables.
Структура файлів в alb-s3-logs:
$ tree alb-s3-logs/ alb-s3-logs/ |-- README.md |-- lambda.tf |-- outputs.tf |-- s3.tf `-- variables.tf
Створення S3 buckets
Файл s3.tf – створення бакетів:
# define S3 bucket names from parameteres passed from a calling/root module in the 'atlas-monitoring' repository
locals {
# ops-1-30-devops-ingress-ops-alb-loki-logs
# "ops" "1-30" "devops" "ingress" "ops" "alb" "loki" "logs"
# <aws_env>-<eks_version>-<component>-<application>-<app_env>-<aws_service>-<logger_type>-logs
# ops-1-30-devops-vpc-ops-flow-loki-logs
# "ops" "1-30" "devops" "vpc" "ops" "flow" "loki" "logs"
# <aws_env>-<eks_version>-<component>-<application>-<app_env>-<aws_service>-<logger_type>-logs
logs_bucket_names = { for env in var.app_environments : env => "${var.aws_env}-${var.eks_version}-${var.component}-${var.application}-${env}-${var.aws_service}-${var.logger_type}-logs" }
}
resource "aws_s3_bucket" "s3_logs" {
for_each = local.logs_bucket_names
bucket = each.value
# to drop a bucket, set to `true` first
# run `terraform apply`
# then remove the block
# and run `terraform apply` again
force_destroy = true
}
# remove logs older than 30 days
resource "aws_s3_bucket_lifecycle_configuration" "bucket_config" {
for_each = aws_s3_bucket.s3_logs
bucket = each.value.id
rule {
id = "logs"
status = "Enabled"
expiration {
days = 30
}
}
}
# block S3 bucket public access
resource "aws_s3_bucket_public_access_block" "s3_logs_backend_acl" {
for_each = aws_s3_bucket.s3_logs
bucket = each.value.id
block_public_acls = true
block_public_policy = true
ignore_public_acls = true
restrict_public_buckets = true
}
# using the 'var.aws_service == "alb"', attach the S3 bucket Policy to buckets for ALB Logs only
resource "aws_s3_bucket_policy" "s3_logs_alb" {
for_each = {
for key, bucket_name in aws_s3_bucket.s3_logs :
key => bucket_name if var.aws_service == "alb"
}
bucket = each.value.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Sid = "RegionELBLogsWrite"
Effect = "Allow"
Principal = {
AWS = "arn:aws:iam::${var.elb_account_id}:root"
}
Action = "s3:PutObject"
Resource = "arn:aws:s3:::${each.value.id}/AWSLogs/${var.aws_account_id}/*"
},
{
Sid = "PromtailLambdaLogsGet"
Effect = "Allow"
Principal = {
AWS = module.logs_promtail_lambda[each.key].lambda_role_arn
}
Action = "s3:GetObject"
Resource = "arn:aws:s3:::${each.value.id}/*"
}
]
})
}
# using the 'var.aws_service == "flow"', attach attach the S3 bucket Policy to buckets for VPC Flow Logs only
resource "aws_s3_bucket_policy" "s3_logs_flow" {
for_each = {
for key, bucket_name in aws_s3_bucket.s3_logs :
key => bucket_name if var.aws_service == "flow"
}
bucket = each.value.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Sid = "VPCFlowLogsDeliveryWrite",
Effect = "Allow",
Principal = {
Service = "delivery.logs.amazonaws.com"
},
Action = "s3:PutObject",
Resource = "arn:aws:s3:::${each.value.id}/AWSLogs/${var.aws_account_id}/*",
Condition = {
StringEquals = {
"aws:SourceAccount": "${var.aws_account_id}",
"s3:x-amz-acl": "bucket-owner-full-control"
},
ArnLike = {
"aws:SourceArn": "arn:aws:logs:us-east-1:${var.aws_account_id}:*"
}
}
},
{
Sid = "VPCFlowLogsAclCheck",
Effect = "Allow",
Principal = {
Service = "delivery.logs.amazonaws.com"
},
Action = "s3:GetBucketAcl",
Resource = "arn:aws:s3:::${each.value.id}",
Condition = {
StringEquals = {
"aws:SourceAccount": "${var.aws_account_id}"
},
ArnLike = {
"aws:SourceArn": "arn:aws:logs:us-east-1:${var.aws_account_id}:*"
}
}
},
{
Sid = "PromtailLambdaLogsGet"
Effect = "Allow"
Principal = {
AWS = module.logs_promtail_lambda[each.key].lambda_role_arn
}
Action = "s3:GetObject"
Resource = "arn:aws:s3:::${each.value.id}/*"
}
]
})
}
# send notifications to a Lambda function with Promtail when a new object is created in the S3 bucket
resource "aws_s3_bucket_notification" "s3_logs_lambda_notification" {
for_each = aws_s3_bucket.s3_logs
bucket = each.value.id
lambda_function {
lambda_function_arn = module.logs_promtail_lambda[each.key].lambda_function_arn
events = ["s3:ObjectCreated:*"]
filter_prefix = "AWSLogs/${var.aws_account_id}/"
}
}
Створення Lambda функцій з Promtail
Файл lambda.tf:
# to allow network connections from S3 buckets IP range
data "aws_prefix_list" "s3" {
filter {
name = "prefix-list-name"
values = ["com.amazonaws.us-east-1.s3"]
}
}
# allow connections from S3 and from/to VPC Private Subnets to access Loki and VictoriaLogs
module "logs_security_group_lambda" {
source = "terraform-aws-modules/security-group/aws"
version = "~> 5.2.0"
# 'ops-1-30-loki-lambda-sg'
name = "${var.aws_env}-${var.eks_version}-lambda-${var.logger_type}-sg"
description = "Security Group for Lambda Egress"
vpc_id = var.vpc_id
egress_cidr_blocks = var.vpc_private_subnets_cidrs
egress_ipv6_cidr_blocks = []
egress_prefix_list_ids = [data.aws_prefix_list.s3.id]
ingress_cidr_blocks = var.vpc_private_subnets_cidrs
ingress_ipv6_cidr_blocks = []
egress_rules = ["https-443-tcp"]
ingress_rules = ["https-443-tcp"]
}
# S3 buckets names:
# ops-1-30-devops-ingress-ops-alb-loki-logs
# "ops" "1-30" "devops" "ingress" "ops" "alb" "loki" "logs"
# <aws_env>-<eks_version>-<component>-<application>-<app_env>-<aws_service>-<logger_type>-logs
# ops-1-30-devops-vpc-ops-flow-loki-logs
# "ops" "1-30" "devops" "vpc" "ops" "flow" "loki" "logs"
# <aws_env>-<eks_version>-<component>-<application>-<app_env>-<aws_service>-<logger_type>-logs
module "logs_promtail_lambda" {
source = "terraform-aws-modules/lambda/aws"
version = "~> 7.16.0"
# key: 'ops'
# value: 'ops-1-30-devops-vpc-ops-flow-loki-logs'
for_each = aws_s3_bucket.s3_logs
# build Lambda function name like 'ops-1-30-devops-vpc-ops-flow-loki-logs-logger'
function_name = "${each.value.id}-${var.logger_type}-logger"
description = "Promtail instance to collect logs from S3"
create_package = false
# https://github.com/terraform-aws-modules/terraform-aws-lambda/issues/36
publish = true
# an error when sending logs from Flow Logs S3:
# 'Task timed out after 3.05 seconds'
timeout = 60
image_uri = var.promtail_image
package_type = "Image"
architectures = ["x86_64"]
# component=devops, logtype=alb, environment=ops, logger_type=loki
# component=devops, logtype=flow, environment=ops, logger_type=loki
environment_variables = {
EXTRA_LABELS = "component,${var.component},logtype,${var.aws_service},environment,${each.key},logger_type,${var.logger_type}"
KEEP_STREAM = "true"
OMIT_EXTRA_LABELS_PREFIX = "true"
PRINT_LOG_LINE = "true"
WRITE_ADDRESS = var.logger_write_address
}
vpc_subnet_ids = var.vpc_private_subnets_ids
vpc_security_group_ids = [module.logs_security_group_lambda.security_group_id]
attach_network_policy = true
# writing too many logs
# see in CloudWatch Metrics by the 'IncomingBytes' metric
# to save CloudWatch Logs costs, decrease the logs number
# set to 'INFO' for debugging
logging_application_log_level = "FATAL"
logging_system_log_level = "WARN"
logging_log_format = "JSON"
# allow calling the Lambda from an S3 bucket
# bucket name: ops-1-28-backend-api-dev-alb-logs
allowed_triggers = {
S3 = {
principal = "s3.amazonaws.com"
source_arn = "arn:aws:s3:::${each.value.id}"
}
}
}
Виклик модуля atlas-tf-modules з коду моніторинга
Далі описуємо ресурси в коді Terraform в репозиторії atlas-monitoring – файл logs.tf.
Тут створюється три модулі:
- Load Balancers logs в Loki
- VPC Flow Logs в Loki
- VPC Flow Logs в VictoriaLogs
/*
Collect ALB Logs to Loki module
S3:
- will create an aws_s3_bucket for each app_environments[]:
# bucket names:
# '<eks_env>-<eks_version>-<component>-<application>-<app_env>-alb-logs'
# i.e:
# 'ops-1-28-backend-api-dev-alb-logs'
- will create an aws_s3_bucket_policy with Allow for each Lambda
- will create an aws_s3_bucket_notification with Push event on each s3:ObjectCreated to each Lambda
Lambda:
- will create a security_group_lambda with Allow 443 from VPC CIDR
- will create a Lambda with Promtail for each aws_s3_bucket
*/
module "vpc_flow_logs_loki" {
# create the module for each EKS cluster by its version
# for_each = var.eks_versions
for_each = toset(["1-30"])
source = "[email protected]:ORG-NAME/atlas-tf-modules//alb-s3-logs?ref=master"
# for local development
# source = "/home/setevoy/Work/atlas-tf-modules//alb-s3-logs"
# ops-1-30-devops-ingress-ops-alb-loki-logs
# "ops" "1-30" "devops" "ingress" "ops" "alb" "loki" "logs"
# <aws_env>-<eks_version>-<component>-<application>-<app_env>-<aws_service>-<logger_type>-logs
# ops-1-30-devops-vpc-ops-flow-loki-logs
# "ops" "1-30" "devops" "vpc" "ops" "flow" "loki" "logs"
# <aws_env>-<eks_version>-<component>-<application>-<app_env>-<aws_service>-<logger_type>-logs
# 'ops'
aws_env = var.aws_environment
# '1-30'
eks_version = each.value
# by team: 'backend', 'devops'
component = "devops"
application = "vpc"
app_environments = ["ops"]
aws_service = "flow"
logger_type = "loki"
vpc_id = local.vpc_out.vpc_id
vpc_private_subnets_cidrs = local.vpc_out.vpc_private_subnets_cidrs
vpc_private_subnets_ids = local.vpc_out.vpc_private_subnets_ids
# 'https://loki.monitoring.1-30.ops.example.co:443/loki/api/v1/push'
logger_write_address = "https://loki.monitoring.${each.value}.ops.example.com:443/loki/api/v1/push"
aws_account_id = data.aws_caller_identity.current.account_id
}
module "vpc_flow_logs_vmlogs" {
# create the module for each EKS cluster by its version
# for_each = var.eks_versions
for_each = toset(["1-30"])
source = "[email protected]:ORG-NAME/atlas-tf-modules//alb-s3-logs?ref=master"
# for local development
# source = "/home/setevoy/Work/atlas-tf-modules//alb-s3-logs"
# ops-1-30-devops-ingress-ops-alb-loki-logs
# "ops" "1-30" "devops" "ingress" "ops" "alb" "loki" "logs"
# <aws_env>-<eks_version>-<component>-<application>-<app_env>-<aws_service>-<logger_type>-logs
# ops-1-30-devops-vpc-ops-flow-loki-logs
# "ops" "1-30" "devops" "vpc" "ops" "flow" "loki" "logs"
# <aws_env>-<eks_version>-<component>-<application>-<app_env>-<aws_service>-<logger_type>-logs
# 'ops'
aws_env = var.aws_environment
# '1-30'
eks_version = each.value
# by team: 'backend', 'devops'
component = "devops"
application = "vpc"
app_environments = ["ops"]
aws_service = "flow"
logger_type = "vmlogs"
vpc_id = local.vpc_out.vpc_id
vpc_private_subnets_cidrs = local.vpc_out.vpc_private_subnets_cidrs
vpc_private_subnets_ids = local.vpc_out.vpc_private_subnets_ids
# create log streams by the 'logtype,environment,logger_type' fields
# see https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields
logger_write_address = "https://vmlogs.monitoring.${each.value}.ops.example.com:443/insert/loki/api/v1/push?_stream_fields=logtype,environment,logger_type"
aws_account_id = data.aws_caller_identity.current.account_id
}
# ../../atlas-load-balancers/helm/templates/external-ingress-alb.yaml:
# alb.ingress.kubernetes.io/load-balancer-attributes: access_logs.s3.enabled=true,access_logs.s3.bucket=ops-1-30-devops-ingress-ops-alb-logs
# two ALB are using this buckets for their logs - the External, 'ops-external-ingress', and the Internal one, 'ops-internal-ingress'
# both are in the 'ops-common-alb-ns' Namespace
module "single_ingress_alb_logs_loki" {
# create the module for each EKS cluster by its version
# for_each = var.eks_versions
for_each = toset(["1-30"])
source = "[email protected]:ORG-NAME/atlas-tf-modules//alb-s3-logs?ref=master"
# for local development
# source = "/home/setevoy/Work/atlas-tf-modules//alb-s3-logs"
# ops-1-30-devops-ingress-ops-alb-loki-logs
# "ops" "1-30" "devops" "ingress" "ops" "alb" "loki" "logs"
# <aws_env>-<eks_version>-<component>-<application>-<app_env>-<aws_service>-<logger_type>-logs
# ops-1-30-devops-vpc-ops-flow-loki-logs
# "ops" "1-30" "devops" "vpc" "ops" "flow" "loki" "logs"
# <aws_env>-<eks_version>-<component>-<application>-<app_env>-<aws_service>-<logger_type>-logs
# 'ops'
aws_env = var.aws_environment
# '1-30'
eks_version = each.value
component = "devops"
application = "ingress"
app_environments = ["ops"]
aws_service = "alb"
logger_type = "loki"
vpc_id = local.vpc_out.vpc_id
vpc_private_subnets_cidrs = local.vpc_out.vpc_private_subnets_cidrs
vpc_private_subnets_ids = local.vpc_out.vpc_private_subnets_ids
# 'https://loki.monitoring.1-30.ops.example.co:443/loki/api/v1/push'
logger_write_address = "https://loki.monitoring.${each.value}.ops.example.com:443/loki/api/v1/push"
aws_account_id = data.aws_caller_identity.current.account_id
}
З цим наче все.
Модуль VPC та Flow Logs
В модулі terraform-aws-modules/vpc/aws є підтримка Flow Logs, але там можна задати тільки один flow_log_destination_arn, в якому в мене зараз Grafana Loki – S3-бакет ops-1-30-devops-vpc-ops-flow-loki-logs:
module "vpc" {
source = "terraform-aws-modules/vpc/aws"
version = "~> 5.16.0"
...
enable_flow_log = var.vpc_params.enable_flow_log
# Default: "cloud-watch-logs"
flow_log_destination_type = "s3"
# disalbe to use S3
create_flow_log_cloudwatch_log_group = false
create_flow_log_cloudwatch_iam_role = false
# ARN of the CloudWatch log group or S3 bucket
# disable if use 'create_flow_log_cloudwatch_log_group' and the default 'flow_log_destination_type' value (cloud-watch-logs)
flow_log_destination_arn = "arn:aws:s3:::ops-1-30-devops-vpc-ops-flow-loki-logs"
flow_log_cloudwatch_log_group_name_prefix = "/aws/${local.env_name}-flow-logs/"
flow_log_log_format = "$${region} $${vpc-id} $${az-id} $${subnet-id} $${instance-id} $${interface-id} $${flow-direction} $${srcaddr} $${dstaddr} $${srcport} $${dstport} $${pkt-srcaddr} $${pkt-dstaddr} $${pkt-src-aws-service} $${pkt-dst-aws-service} $${traffic-path} $${packets} $${bytes} $${action}"
vpc_flow_log_tags = {
"Name" = "flow-logs-s3-to-loki"
}
}
Аби писати відразу в два S3 бакета – просто додаємо ресурс aws_flow_log.
VPC Flow Logs пишуться з custom format:
resource "aws_flow_log" "vpc_flow_vmlogs" {
vpc_id = module.vpc.vpc_id
log_destination = "arn:aws:s3:::ops-1-30-devops-vpc-ops-flow-vmlogs-logs"
log_destination_type = "s3"
traffic_type = "ALL"
log_format = "$${region} $${vpc-id} $${az-id} $${subnet-id} $${instance-id} $${interface-id} $${flow-direction} $${srcaddr} $${dstaddr} $${srcport} $${dstport} $${pkt-srcaddr} $${pkt-dstaddr} $${pkt-src-aws-service} $${pkt-dst-aws-service} $${traffic-path} $${packets} $${bytes} $${action}"
tags = {
"Name" = "flow-logs-s3-to-vmlogs"
}
}
Крім того, я ще вручну створив Flow Logs з destination в CloudWatch Logs аби перевіряти дані в Loki та VictoriaLogs.
Створення Grafana dashboard
NAT Gateway Total processed
Першим у нас йде відображення загальної статистики по тому, скільки через NAT Gateway пройшло трафіку – і від Kubernetes Pods в інтернет, і з інтернету до Kubernetes Pods.
Запит в Loki
В Loki запит виглядає так:
sum (
sum_over_time(
({logtype="flow", logger_type="loki"} | pattern `<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_add> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>`
| interface_id="eni-0352f8c82da6aa229"
| pkt_src_addr=ip("10.0.32.0/20") OR pkt_dst_addr=ip("10.0.32.0/20")
| pkt_src_addr=~"${kubernetes_pod_ip}"
| pkt_dst_addr=~"${remote_svc_ip}"
| unwrap bytes
| __error__=""
)[$__range]
)
)
Тут:
- рахуємо
sum_over_time()за період, вибраний в Grafana dashboard –$__range - рахуємо трафік або від Kubernetes Pods –
pkt_src_addr=ip("10.0.32.0/20"), або навпаки до Kubernetes Pods –pkt_dst_addr=ip("10.0.32.0/20") - рахуємо по полю
bytes–unwrap bytes
З таким запитом маємо такі дані:
kubernetes_pod_ip та remote_svc_ip – змінні в Grafana dashboard аби мати можливість перевірки даних по конкретним адресам:
Запит з VictoriaLogs
Тепер нам треба перевести цей запит в формат LogsQL для VictoriaLogs.
Виглядати він буде так:
_time:$__range {logtype=flow, environment=ops, logger_type=vmlogs} seq("eni-0352f8c82da6aa229", "ACCEPT")
| extract "<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_addr> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>"
| filter
interface_id:="eni-0352f8c82da6aa229"
action:="ACCEPT"
(pkt_src_addr:ipv4_range("10.0.32.0/20") OR pkt_dst_addr:ipv4_range("10.0.32.0/20"))
pkt_src_addr:~"${kubernetes_pod_ip}"
pkt_dst_addr:~"${remote_svc_ip}"
| stats sum(bytes) bytes_total
Підтримку змінної Grafana $__range завезли тільки вчора в версії датасорсу 0.9.0, тому оновіться.
Тут ми:
- вибираємо дані за
_time:$__range - в
{logtype=flow, environment=ops, logger_type=vmlogs}використовуємо log stream selector з лейблами, які задаються в Lambda Promtail під час запису логів –/insert/loki/api/v1/push?_stream_fields=logtype,environment,logger_type seq("eni-0352f8c82da6aa229", "ACCEPT")– використовуємо Sequence filter – вибираємо тільки записи з інтерфейсу NAT Gateway іACCEPTаби пришвидшити виконання запиту (див. коментар від Olexandr Valialkin тут>>>)- з
extractформуємо поля із записів в логу - з
filterвибираємо інтерфейсу NAT Gateway,ACCEPT, і як і в запиті Loki – фільтруємо трафік або від Kubernetes Pods з IPv4 range filter –pkt_src_addr:ipv4_range("10.0.32.0/20"), або навпаки до Kubernetes Pods –pkt_dst_addr:ipv4_range("10.0.32.0/20")(зверніть увагу, що умовиORзаключені в дужки) - і в кінці з
statsрахуємо суму по полюbytes, а результат пишемо в полеbytes_total
Перевірка з CloudWatch Logs Insights
Аби мати можливість перевірити дані в Loki і VictoriaLogs, VPC зараз пише ще й в CloudWatch Logs.
Зробимо такий запит в Logs Insights:
parse @message "* * * * * * * * * * * * * * * * * * *" | as region, vpc_id, az_id, subnet_id, instance_id, interface_id, | flow_direction, srcaddr, dstaddr, srcport, dstport, | pkt_srcaddr, pkt_dstaddr, pkt_src_aws_service, pkt_dst_aws_service, | traffic_path, packets, bytes, action | filter ((interface_id="eni-0352f8c82da6aa229") AND ((isIpv4InSubnet(pkt_srcaddr,"10.0.32.0/20") OR isIpv4InSubnet(pkt_dstaddr,"10.0.32.0/20") )) | stats sum(bytes) as bytes_total
В результаті маємо 8547192734 байт:
Що в форматі SI (див. Binary prefixes) дає нам 1.87 гігабайт – рахуємо з калькулятором:
$ bc scale=2 8547192734/1000/1000/1000 8.54
В Loki у нас було 7.56 GiB, в VictoriaLogs – 8.66 GiB.
Інколи ті ж самі дані між Loki, VictoriaLogs та CloudWatch можуть відрізнятись, тим більш при виборках всього за 30 хвилин, бо самі Flow Logs пишуться з різницею в кілька хвилин.
Наприклад, в бакеті Loki останній об’єкт створено в 13:06:50:
А в VMLogs – в 13:05:29:
Перевірка з Cost Explorer
Ще можна перевірити дані в Cost Explorer.
Вибираємо Service == EC2-Other, Usage type == NatGateway-Bytes (GB):
За минулу добу маємо 129 гігабайт трафіку через NAT Gateway.
Якщо ми в Grafana (нарешті ми це можемо зробити, бо є VictoriaLogs) зробимо range в 24 години – то побачимо в “NAT Gateway Total processed” 135 гігабайт:
Плюс-мінус сходиться, бо Cost Explorer рахує не останні 24 години, як в Grafana, а за попередню добу, крім того, там використовується UTC (+00:00) time zone.
NAT Gateway Total OUT та IN processed
Далі, хочеться бачити розподілення трафіку – від Kubernetes Pods в інтернет, та з інтернету до Kubernetes Pods.
Згадаємо, що ми маємо в записах для пакетів, які проходять через NAT Gateway – розбирали в Трафік з Pod до External Server через NAT Gateway, та VPC Flow Logs:
- по полю
interface_idфільтруємо тільки ті записи, які були зроблені з інтерфейсу NAT Gateway - якщо пакет йде від Kubernetes Pod в інтернет – то в полі
pkt_src_addrбуде IP цього Pod - якщо пакет йде з інтернету до Kubernetes Pod – то в полі
pkt_dst_addrбуде IP цього Pod
Запити Loki
Тому аби порахувати байти з інтернету – до Kubernetes Pods ми можемо зробити такий запит в Loki з sum_over_time() та $__range, аби вибрати дані за 30 хвилин, а в pkt_dst_addr=ip("10.0.32.0/20") вибираємо IP тільки VPC Private Subnet, яка використовується для Kubernetes Pods:
sum (
sum_over_time(
({logtype="flow", logger_type="loki"} | pattern `<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_add> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>`
| interface_id="eni-0352f8c82da6aa229"
| pkt_dst_addr=ip("10.0.32.0/20")
| pkt_src_addr=~"${kubernetes_pod_ip}"
| pkt_dst_addr=~"${remote_svc_ip}"
| unwrap bytes
| __error__=""
)[$__range]
)
)
Аби запит швидше оброблювався, внизу в Options можна поставити Type == Instant.
І аналогічно, але рахуємо від Kubernetes Pod в інтернет:
sum (
sum_over_time(
({logtype="flow", logger_type="loki"} | pattern `<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_add> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>`
| interface_id="eni-0352f8c82da6aa229"
| flow_direction="ingress"
| pkt_src_addr=ip("10.0.32.0/20")
| pkt_src_addr=~"${kubernetes_pod_ip}"
| pkt_dst_addr=~"${remote_svc_ip}"
| unwrap bytes
| __error__=""
)[$__range]
)
)
Запити VictoriaLogs
Запити для VictoriaLogs будуть виглядати так – з інтернету до Kubernetes Pods:
_time:$__range {logtype=flow, environment=ops, logger_type=vmlogs} seq("eni-0352f8c82da6aa229", "ACCEPT")
| extract "<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_addr> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>"
| filter
interface_id:="eni-0352f8c82da6aa229"
action:="ACCEPT"
pkt_dst_addr:ipv4_range("10.0.32.0/20")
pkt_src_addr:~"${kubernetes_pod_ip}"
pkt_dst_addr:~"${remote_svc_ip}"
| stats sum(bytes) bytes_total
З Kubernetes Pods в інтернет:
_time:$__range {logtype=flow, environment=ops, logger_type=vmlogs} seq("eni-0352f8c82da6aa229", "ACCEPT")
| extract "<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_addr> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>"
| filter
interface_id:="eni-0352f8c82da6aa229"
action:="ACCEPT"
pkt_src_addr:ipv4_range("10.0.32.0/20")
pkt_src_addr:~"${kubernetes_pod_ip}"
pkt_dst_addr:~"${remote_svc_ip}"
| stats sum(bytes) bytes_total
І всі три панелі разом:
NAT Gateway Total processed bytes/sec
Крім Stat панелей хотілося б бачити і історичну картину – в який час як мінявся трафік.
Запит Loki
В Loki все просто – просто використовуємо функцію rate():
sum (
rate(
({logtype="flow", logger_type="loki"} | pattern `<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_add> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>`
| interface_id="eni-0352f8c82da6aa229"
| pkt_dst_addr=ip("10.0.32.0/20")
| pkt_src_addr=~"${kubernetes_pod_ip}"
| pkt_dst_addr=~"${remote_svc_ip}"
| unwrap bytes
| __error__=""
)[$__interval]
)
)
В Options і в rate() використовуємо інтервал 5 хвилин, в Standart Options > Unit bytes/sec(SI), в результаті маємо 7.25 МБ/с в 12:20:
Запит VictoriaLogs
А з VictoriaLogs трохи цікавіше, бо з коробки вона не має функції rate() (але обіцяють скоро додати).
Крім того, є ще один нюанс:
- Loki рахує дані “назад”, тобто – точка на графіку в 12:25 а
rate()бере попередні 5 хвилин –[5m]з Options, які передаються в$__interval - в VictoriaLogs графік буде відображатись на момент виконання запиту
Аби порахувати per-second rate наших bytes – можемо використати math pipe.
Отже, запит буде таким:
_time:$__range {logtype=flow, environment=ops, logger_type=vmlogs} seq("eni-0352f8c82da6aa229", "ACCEPT")
| extract "<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_addr> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>"
| filter
interface_id:="eni-0352f8c82da6aa229"
action:="ACCEPT"
pkt_dst_addr:ipv4_range("10.0.32.0/20")
pkt_src_addr:~"${kubernetes_pod_ip}"
pkt_dst_addr:~"${remote_svc_ip}"
| stats sum(bytes) sum_bytes
| math (sum_bytes / ($__interval/1s)) as bytes_total
Тут:
- в
stats sum(bytes)рахуємо суму байт за інтервал, заданий в Options (5 хвилин), результат зберігаємо якsum_bytes - далі з
mathрахуємо суму байт зsum_bytesза кожен інтервал на графіку, і їх ділимо на кількість секунд в обраному$__interval
Тут у нас 8.30 МБ/с в 12:20. Плюс-мінус схоже. Можна вже зовсім заморочитись з перевіркою, і порахувати вручну з логів – але прям супер-точні цифри тут не дуже важливі, цікавить саме тренд, тому ОК.
Взагалі, при побудові саме графіків можна не прописувати _time:$__range, бо це виконується в самій VMLogs “під капотом”, але тут нехай буде для ясності.
Kubernetes Pods IN From IP
Наступним відобразимо топ Kubernetes Pods IP по отриманому з інтернету трафіку.
Запит Loki
Для Loki використовуємо sum_over_time() за $__range, у нас в дашборді це 30 хвилин:
topk(5,
sum by (pkt_src_addr) (
sum_over_time(
(
{logtype="flow", logger_type="loki"}
| pattern `<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_addr> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>`
| interface_id="eni-0352f8c82da6aa229"
| action="ACCEPT"
| pkt_dst_addr=ip("10.0.32.0/20")
| pkt_src_addr=~"${remote_svc_ip}"
| pkt_dst_addr=~"${kubernetes_pod_ip}"
| unwrap bytes
| __error__=""
)[$__range]
)
)
)
Див. Grafana Data links – дуже корисна штука.
Запит VictoriaLogs
І аналогічний запит для VictoriaLogs буде виглядати так:
_time:$__range {logtype=flow, environment=ops, logger_type=vmlogs} seq("eni-0352f8c82da6aa229", "ACCEPT")
| extract "<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_addr> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>"
| filter
interface_id:="eni-0352f8c82da6aa229"
action:=ACCEPT
pkt_dst_addr:ipv4_range("10.0.32.0/20")
pkt_dst_addr:~"${kubernetes_pod_ip}"
pkt_src_addr:~"${remote_svc_ip}"
| stats by (pkt_src_addr) sum(bytes) sum_bytes
| sort by (sum_bytes) desc limit 10
VictoriaLogs поки не підтримує Options для Legend і повертає результат просто в JSON.
Тому, аби все було красиво і без зайвих даних – можемо додати Transformations > Rename fields by regex, в якому з регуляркою .*addr="(.*)".* “виріжемо” тільки IP-адреси:
І що ми маємо:
- в Loki у нас в топі 20.150.90.164 з 954 МБ
- в VictoriaLogs в топі 20.150.90.164 з 954 МБ
І цілому дані схожі, хоча в Loki трохи відрізняється сортування, знов-таки – через невелику затримку. Ну і topk() в Loki працює трохи дивно, я колись намагався покопати цей момент, але забив. В VictoriaLogs limit працює краще (хоча теж є баг, далі побачимо).
Давайте перевіримо IP 20.150.90.164 в CloudWatch Logs Insights з таким запитом:
parse @message "* * * * * * * * * * * * * * * * * * *" | as region, vpc_id, az_id, subnet_id, instance_id, interface_id, | flow_direction, srcaddr, dstaddr, srcport, dstport, | pkt_srcaddr, pkt_dstaddr, pkt_src_aws_service, pkt_dst_aws_service, | traffic_path, packets, bytes, action | filter ((interface_id="eni-0352f8c82da6aa229") AND (isIpv4InSubnet(pkt_dstaddr,"10.0.32.0/20"))) | stats sum(bytes) as bytes_total by pkt_srcaddr | sort bytes_total desc
Дані в VictoriaLogs більш схожі на правду, але в цілому обидві системи виводять дані правильно.
Знов-таки, якщо брати більший проміжок часу (чого ми не можемо зробити з Loki, але можемо в VictoriaLogs) – то дані в CloudWatch Logs та VictoriaLogs будуть ще більш точні.
Kubernetes Pods IN From IP bytes/sec
Тут аналогічно тому, як ми робили для панельки “NAT Gateway Total IN processed” – аби мати історичну картину по трафіку.
Запит Loki
topk(5,
sum by (pkt_src_addr) (
rate(
(
{logtype="flow", logger_type="loki"}
| pattern `<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_addr> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>`
| interface_id="eni-0352f8c82da6aa229"
| action="ACCEPT"
| pkt_dst_addr=ip("10.0.32.0/20")
| pkt_src_addr=~"${remote_svc_ip}"
| pkt_dst_addr=~"${kubernetes_pod_ip}"
| unwrap bytes
| __error__=""
)[$__interval]
)
)
)
Запит VictoriaLogs
_time:$__range {logtype=flow, environment=ops, logger_type=vmlogs} seq("eni-0352f8c82da6aa229", "ACCEPT")
| extract "<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_addr> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>"
| filter
interface_id:="eni-0352f8c82da6aa229"
action:="ACCEPT"
pkt_dst_addr:ipv4_range("10.0.32.0/20")
pkt_drc_addr:~"${kubernetes_pod_ip}"
pkt_src_addr:~"${remote_svc_ip}"
| stats by (pkt_src_addr) sum(bytes) bytes_total
| sort by (bytes_total) desc
| math (bytes_total / ($__interval/1s)) as bytes_total
Теж плюс-мінус дані схожі.
Але тут знов є проблема з topk() в Loki – бо задано ліміт в топ-5 результатів, а виводить 11.
В VictoriaLogs також є проблема з limit, наприклад задамо | sort by (bytes_total) desc limit 5:
І в результаті маємо не топ-5 IP, а просто 5 точок на графіку.
Говорив з девелоперами VictoriaMetrics – кажуть, що схоже на баг, завів їм GitHub Issue, подивимось, що буде в найближчих релізах з багфіксами.
Kubernetes Pods IN by IP and Port
Залишилось відобразити інформацію по IP і портам – буває корисно при визначені сервісу, який генерує трафік – див. pkt_src_aws_service, pkt_dst_aws_service та визначення сервісу.
Запит Loki
Використовуємо тип візуалізації Table і такий запит:
topk(10,
sum by (pkt_src_addr, src_port, pkt_dst_addr, dst_port) (
sum_over_time(
(
{logtype="flow", logger_type="loki"}
| pattern `<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_addr> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>`
| interface_id="eni-0352f8c82da6aa229"
| action="ACCEPT"
| pkt_dst_addr=ip("10.0.32.0/20")
| pkt_src_addr=~"${remote_svc_ip}"
| pkt_dst_addr=~"${kubernetes_pod_ip}"
| unwrap bytes
| __error__=""
)[$__range]
)
)
)
В Fields override задаємо Unit поля Value в bytes(SI), і для кожної колонки – власний Data link.
Змінити заголовки колонок і сховати поле Time можемо в Transformations:
Запит VictoriaLogs
Запит буде таким:
_time:$__range {logtype=flow, environment=ops, logger_type=vmlogs} seq("eni-0352f8c82da6aa229", "ACCEPT")
| extract "<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_addr> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>"
| filter
interface_id:="eni-0352f8c82da6aa229"
action:="ACCEPT"
pkt_dst_addr:ipv4_range("10.0.32.0/20")
pkt_dst_addr:~"${kubernetes_pod_ip}"
pkt_src_addr:~"${remote_svc_ip}"
| stats by (pkt_src_addr, src_port, pkt_dst_addr, dst_port) sum(bytes) bytes_total
| sort by (bytes_total) desc limit 10
Через те, що VictoriaLogs повертає (поки що) результати в JSON – то додамо трансформацію Extract fields.
В Filter fields by name як і для Loki – прибираємо колонку Time.
А в Organize fields by name – міняємо заголовки колонок і робимо сортування колонок:
Фінальний результат та перформанс Loki vs VictoriaLogs
Результат в VictoriaLogs за 12 (!) годин:
І ресурси:
$ kk top pod atlas-victoriametrics-victoria-logs-single-server-0 NAME CPU(cores) MEMORY(bytes) atlas-victoriametrics-victoria-logs-single-server-0 2754m 34Mi
Результат в Loki за 30 хвилин:
І ресурси:
$ kk top pod -l app.kubernetes.io/name=loki,app.kubernetes.io/component=read NAME CPU(cores) MEMORY(bytes) loki-read-89fbb8f5c-874xq 683m 402Mi loki-read-89fbb8f5c-8qpdw 952m 399Mi loki-read-89fbb8f5c-qh8dg 848m 413Mi
Ну і на цьому власне все.
Залишилось дочекатись мінорних апдейтів по датасорсу і самій VictoriaLogs.
Що далі?
А далі все ж хочеться мати поля з Kubernetes Pods IP та адреси зовнішніх ресурсах в полях логів, а не парсити їх в дашборді самою VictoriaLogs – тоді буде можливість взагалі робити виборки за кілька днів або може навіть тижнів.
Для цього підсказали ідею із vector.dev – збирати нею логи з S3, там виконувати трансформації і додавати поля, а потім вже писати ці логи в VictoriaLogs.
Скоріш за все, як буде час, спробую, бо виглядає дуже цікавим рішенням.



























