VictoriaLogs: дашборда в Grafana з AWS VPC Flow Logs – мігруємо з Grafana Loki

Автор |  04/12/2024
 

В попередньому пості – AWS: VPC Flow Logs – логи до S3 та Grafana dashboard з Loki ми створили дашборду в Grafana, яка відображає статистику використання NAT Gateway.

Що саме нас там цікавило – це які Kubernetes Pods використовують найбільше байт, бо це напряму впливає на наші AWS Costs.

І все наче добре з цією бордою, окрім одного – Loki не здатна обробити “сирі” логи і побудувати графіки більше ніж за 30 хвилин, максимум – 1 година, і то вже частина візуалізацій не прогружаються, хоча я намагався її затюнити – див. Grafana Loki: оптимізація роботи – Recording Rules, кешування та паралельні запити.

Тому я вирішив спробувати такий же підхід – з S3 для VPC Flow Logs, Lambda та Promtail – але вже з VictoriaLogs, тим більш з версії 0.8.0 VictoriaLogs Grafana data source вже завели кращу підтримку запитів, і тепер можна будувати візуалізації без Grafana Transformations.

Отже, що будемо робити:

  • швиденько покажу Terraform код, який створює S3 для VPC Flow Logs і AWS Lambda з Promtail, який шле дані до VictoriaLogs
  • створимо нову Grafana dashboard з VictoriaLogs datasource, і перенесемо запити з Loki та її LogQL до VictoriaLogs та LogsQL

Нагадаю з попереднього поста що ми маємо в нашому сетапі:

  • знаємо CIDR приватних сабнетів для Kubernetes Pods
    • у нас використовується тільки одна мережа в us-east-1a AvailabilityZone – 10.0.32.0/20
  • знаємо Elastic Network Interface ID нашого NAT Gateway – він у нас один, тому тут все просто
  • в логах маємо поля pkt_src_addr та pkt_dst_addr, по яким можемо вибирати трафік тільки з/до Kubernetes Pods

Також варто глянути інші пости по цій темі:

Terraform

S3 та Promtail Lambda

Детально тут розписувати не буду, бо в коді наче достатньо коментарів, які описують кожен ресурс. Просто приклад того, як таке можна зробити. Крім того, першу версію модуля описував в Terraform: створення модулю для збору логів AWS ALB в Grafana Loki, але тут трохи перероблений варіант аби мати можливість налаштування і Loki і VictoriaLogs, і не тільки логи ALB, але і VPC Flow Logs.

Отже, як це реалізував я:

  • репозиторій atlas-tf-modules: модулі Terraform, в якому є код для створення S3 бакетів, Lambda, нотифікацій і пермішенів
  • репозиторій atlas-monitoring: код Terraform та Helm-чарт нашого моніторинга, де створюються необхідні ресурси – RDS, різні додаткові S3-бакети, сертифікати AWS ACM, та викликається модуль з atlas-tf-modules/alb-s3-logs для налаштування збору логів з S3 бакетів

Почнемо з самого модуля alb-s3-logs для S3 та Lambda. Про Terraform та модулі писав в Terraform: модулі, Outputs та Variables.

Структура файлів в alb-s3-logs:

$ tree alb-s3-logs/
alb-s3-logs/
|-- README.md
|-- lambda.tf
|-- outputs.tf
|-- s3.tf
`-- variables.tf

Створення S3 buckets

Файл s3.tf – створення бакетів:

# define S3 bucket names from parameteres passed from a calling/root module in the 'atlas-monitoring' repository
locals {

  # ops-1-30-devops-ingress-ops-alb-loki-logs
  # "ops"     "1-30"        "devops"    "ingress"     "ops"     "alb"         "loki"       "logs"
  # <aws_env>-<eks_version>-<component>-<application>-<app_env>-<aws_service>-<logger_type>-logs

  # ops-1-30-devops-vpc-ops-flow-loki-logs
  # "ops"     "1-30"        "devops"    "vpc"         "ops"     "flow"        "loki"       "logs"
  # <aws_env>-<eks_version>-<component>-<application>-<app_env>-<aws_service>-<logger_type>-logs
  logs_bucket_names = { for env in var.app_environments : env => "${var.aws_env}-${var.eks_version}-${var.component}-${var.application}-${env}-${var.aws_service}-${var.logger_type}-logs" }
}

resource "aws_s3_bucket" "s3_logs" {
  for_each = local.logs_bucket_names

  bucket = each.value

  # to drop a bucket, set to `true` first
  # run `terraform apply`
  # then remove the block
  # and run `terraform apply` again
  force_destroy = true
}

# remove logs older than 30 days
resource "aws_s3_bucket_lifecycle_configuration" "bucket_config" {
  for_each = aws_s3_bucket.s3_logs

  bucket = each.value.id

  rule {
    id     = "logs"
    status = "Enabled"

    expiration {
      days = 30
    }
  }
}

# block S3 bucket public access
resource "aws_s3_bucket_public_access_block" "s3_logs_backend_acl" {
  for_each = aws_s3_bucket.s3_logs

  bucket = each.value.id

  block_public_acls       = true
  block_public_policy     = true
  ignore_public_acls      = true
  restrict_public_buckets = true
}

# using the 'var.aws_service == "alb"', attach the S3 bucket Policy to buckets for ALB Logs only
resource "aws_s3_bucket_policy" "s3_logs_alb" {
  for_each = {
    for key, bucket_name in aws_s3_bucket.s3_logs :
    key => bucket_name if var.aws_service == "alb"
  }

  bucket = each.value.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid = "RegionELBLogsWrite"
        Effect = "Allow"
        Principal = {
          AWS = "arn:aws:iam::${var.elb_account_id}:root"
        }
        Action = "s3:PutObject"
        Resource = "arn:aws:s3:::${each.value.id}/AWSLogs/${var.aws_account_id}/*"
      },
      {
        Sid = "PromtailLambdaLogsGet"
        Effect = "Allow"
        Principal = {
          AWS = module.logs_promtail_lambda[each.key].lambda_role_arn
        }
        Action = "s3:GetObject"
        Resource = "arn:aws:s3:::${each.value.id}/*"
      }
    ]
  })
}

# using the 'var.aws_service == "flow"', attach attach the S3 bucket Policy to buckets for VPC Flow Logs only
resource "aws_s3_bucket_policy" "s3_logs_flow" {
  for_each = {
    for key, bucket_name in aws_s3_bucket.s3_logs :
    key => bucket_name if var.aws_service == "flow"
  }

  bucket = each.value.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid = "VPCFlowLogsDeliveryWrite",
        Effect = "Allow",
        Principal = {
          Service = "delivery.logs.amazonaws.com"
        },
        Action = "s3:PutObject",
        Resource = "arn:aws:s3:::${each.value.id}/AWSLogs/${var.aws_account_id}/*",
        Condition = {
          StringEquals = {
            "aws:SourceAccount": "${var.aws_account_id}",
            "s3:x-amz-acl": "bucket-owner-full-control"
          },
          ArnLike = {
            "aws:SourceArn": "arn:aws:logs:us-east-1:${var.aws_account_id}:*"
          }
        }
      },
      {
        Sid = "VPCFlowLogsAclCheck",
        Effect = "Allow",
        Principal = {
          Service = "delivery.logs.amazonaws.com"
        },
        Action = "s3:GetBucketAcl",
        Resource = "arn:aws:s3:::${each.value.id}",
        Condition = {
          StringEquals = {
            "aws:SourceAccount": "${var.aws_account_id}"
          },
          ArnLike = {
            "aws:SourceArn": "arn:aws:logs:us-east-1:${var.aws_account_id}:*"
          }
        }
      },
      {
        Sid = "PromtailLambdaLogsGet"
        Effect = "Allow"
        Principal = {
          AWS = module.logs_promtail_lambda[each.key].lambda_role_arn
        }
        Action = "s3:GetObject"
        Resource = "arn:aws:s3:::${each.value.id}/*"
      }      
    ]
  })
}

# send notifications to a Lambda function with Promtail when a new object is created in the S3 bucket
resource "aws_s3_bucket_notification" "s3_logs_lambda_notification" {
  for_each = aws_s3_bucket.s3_logs

  bucket = each.value.id

  lambda_function {
    lambda_function_arn = module.logs_promtail_lambda[each.key].lambda_function_arn
    events              = ["s3:ObjectCreated:*"]
    filter_prefix       = "AWSLogs/${var.aws_account_id}/"
  }
}

Створення Lambda функцій з Promtail

Файл lambda.tf:

# to allow network connections from S3 buckets IP range
data "aws_prefix_list" "s3" {
  filter {
    name   = "prefix-list-name"
    values = ["com.amazonaws.us-east-1.s3"]
  }
}

# allow connections from S3 and from/to VPC Private Subnets to access Loki and VictoriaLogs
module "logs_security_group_lambda" {
  source  = "terraform-aws-modules/security-group/aws"
  version = "~> 5.2.0"

  # 'ops-1-30-loki-lambda-sg'
  name        = "${var.aws_env}-${var.eks_version}-lambda-${var.logger_type}-sg"
  description = "Security Group for Lambda Egress"

  vpc_id = var.vpc_id

  egress_cidr_blocks      = var.vpc_private_subnets_cidrs
  egress_ipv6_cidr_blocks = []
  egress_prefix_list_ids  = [data.aws_prefix_list.s3.id]

  ingress_cidr_blocks      = var.vpc_private_subnets_cidrs
  ingress_ipv6_cidr_blocks = []

  egress_rules  = ["https-443-tcp"]
  ingress_rules = ["https-443-tcp"]
}

# S3 buckets names:

  # ops-1-30-devops-ingress-ops-alb-loki-logs
  # "ops"     "1-30"        "devops"    "ingress"     "ops"     "alb"         "loki"       "logs"
  # <aws_env>-<eks_version>-<component>-<application>-<app_env>-<aws_service>-<logger_type>-logs

  # ops-1-30-devops-vpc-ops-flow-loki-logs
  # "ops"     "1-30"        "devops"    "vpc"         "ops"     "flow"        "loki"       "logs"
  # <aws_env>-<eks_version>-<component>-<application>-<app_env>-<aws_service>-<logger_type>-logs

module "logs_promtail_lambda" {
  source  = "terraform-aws-modules/lambda/aws"
  version = "~> 7.16.0"
  # key: 'ops'
  # value:  'ops-1-30-devops-vpc-ops-flow-loki-logs'
  for_each = aws_s3_bucket.s3_logs

  # build Lambda function name like 'ops-1-30-devops-vpc-ops-flow-loki-logs-logger'
  function_name = "${each.value.id}-${var.logger_type}-logger"
  description   = "Promtail instance to collect logs from S3"

  create_package = false
  # https://github.com/terraform-aws-modules/terraform-aws-lambda/issues/36
  publish = true

  # an error when sending logs from Flow Logs S3:
  # 'Task timed out after 3.05 seconds'
  timeout = 60

  image_uri     = var.promtail_image
  package_type  = "Image"
  architectures = ["x86_64"]

  # component=devops, logtype=alb, environment=ops, logger_type=loki
  # component=devops, logtype=flow, environment=ops, logger_type=loki
  environment_variables = {
    EXTRA_LABELS             = "component,${var.component},logtype,${var.aws_service},environment,${each.key},logger_type,${var.logger_type}"
    KEEP_STREAM              = "true"
    OMIT_EXTRA_LABELS_PREFIX = "true"
    PRINT_LOG_LINE           = "true"
    WRITE_ADDRESS            = var.logger_write_address
  }

  vpc_subnet_ids         = var.vpc_private_subnets_ids
  vpc_security_group_ids = [module.logs_security_group_lambda.security_group_id]
  attach_network_policy  = true

  # writing too many logs
  # see in CloudWatch Metrics by the 'IncomingBytes' metric
  # to save CloudWatch Logs costs, decrease the logs number
  # set to 'INFO' for debugging
  logging_application_log_level = "FATAL"
  logging_system_log_level = "WARN"
  logging_log_format = "JSON"

  # allow calling the Lambda from an S3 bucket
  # bucket name: ops-1-28-backend-api-dev-alb-logs
  allowed_triggers = {
    S3 = {
      principal  = "s3.amazonaws.com"
      source_arn = "arn:aws:s3:::${each.value.id}"
    }
  }
}

Виклик модуля atlas-tf-modules з коду моніторинга

Далі описуємо ресурси в коді Terraform в репозиторії atlas-monitoring – файл logs.tf.

Тут створюється три модулі:

  • Load Balancers logs в Loki
  • VPC Flow Logs в Loki
  • VPC Flow Logs в VictoriaLogs
/*

Collect ALB Logs to Loki module

S3:

- will create an aws_s3_bucket for each app_environments[]:
  # bucket names:
  # '<eks_env>-<eks_version>-<component>-<application>-<app_env>-alb-logs'
  # i.e:
  # 'ops-1-28-backend-api-dev-alb-logs'
- will create an aws_s3_bucket_policy with Allow for each Lambda
- will create an aws_s3_bucket_notification with Push event on each s3:ObjectCreated to each Lambda

Lambda:

- will create a security_group_lambda with Allow 443 from VPC CIDR
- will create a Lambda with Promtail for each aws_s3_bucket

*/

module "vpc_flow_logs_loki" {
  # create the module for each EKS cluster by its version
  # for_each = var.eks_versions
  for_each = toset(["1-30"])
  source = "[email protected]:ORG-NAME/atlas-tf-modules//alb-s3-logs?ref=master"
  # for local development
  # source = "/home/setevoy/Work/atlas-tf-modules//alb-s3-logs"

  # ops-1-30-devops-ingress-ops-alb-loki-logs
  # "ops"     "1-30"        "devops"    "ingress"     "ops"     "alb"         "loki"       "logs"
  # <aws_env>-<eks_version>-<component>-<application>-<app_env>-<aws_service>-<logger_type>-logs

  # ops-1-30-devops-vpc-ops-flow-loki-logs
  # "ops"     "1-30"        "devops"    "vpc"         "ops"     "flow"        "loki"       "logs"
  # <aws_env>-<eks_version>-<component>-<application>-<app_env>-<aws_service>-<logger_type>-logs

  # 'ops'
  aws_env          = var.aws_environment
  # '1-30'
  eks_version = each.value
  # by team: 'backend', 'devops'
  component        = "devops"
  application      = "vpc"
  app_environments = ["ops"]
  aws_service = "flow"
  logger_type = "loki"

  vpc_id                    = local.vpc_out.vpc_id
  vpc_private_subnets_cidrs = local.vpc_out.vpc_private_subnets_cidrs
  vpc_private_subnets_ids   = local.vpc_out.vpc_private_subnets_ids
  # 'https://loki.monitoring.1-30.ops.example.co:443/loki/api/v1/push'
  logger_write_address        = "https://loki.monitoring.${each.value}.ops.example.com:443/loki/api/v1/push"
  aws_account_id            = data.aws_caller_identity.current.account_id
}

module "vpc_flow_logs_vmlogs" {
  # create the module for each EKS cluster by its version
  # for_each = var.eks_versions
  for_each = toset(["1-30"])
  source = "[email protected]:ORG-NAME/atlas-tf-modules//alb-s3-logs?ref=master"
  # for local development
  # source = "/home/setevoy/Work/atlas-tf-modules//alb-s3-logs"

  # ops-1-30-devops-ingress-ops-alb-loki-logs
  # "ops"     "1-30"        "devops"    "ingress"     "ops"     "alb"         "loki"       "logs"
  # <aws_env>-<eks_version>-<component>-<application>-<app_env>-<aws_service>-<logger_type>-logs

  # ops-1-30-devops-vpc-ops-flow-loki-logs
  # "ops"     "1-30"        "devops"    "vpc"         "ops"     "flow"        "loki"       "logs"
  # <aws_env>-<eks_version>-<component>-<application>-<app_env>-<aws_service>-<logger_type>-logs

  # 'ops'
  aws_env          = var.aws_environment
  # '1-30'
  eks_version = each.value
  # by team: 'backend', 'devops'
  component        = "devops"
  application      = "vpc"
  app_environments = ["ops"]
  aws_service = "flow"
  logger_type = "vmlogs"

  vpc_id                    = local.vpc_out.vpc_id
  vpc_private_subnets_cidrs = local.vpc_out.vpc_private_subnets_cidrs
  vpc_private_subnets_ids   = local.vpc_out.vpc_private_subnets_ids
  # create log streams by the 'logtype,environment,logger_type' fields
  # see https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields
  logger_write_address        = "https://vmlogs.monitoring.${each.value}.ops.example.com:443/insert/loki/api/v1/push?_stream_fields=logtype,environment,logger_type"
  aws_account_id            = data.aws_caller_identity.current.account_id
}

# ../../atlas-load-balancers/helm/templates/external-ingress-alb.yaml:    
# alb.ingress.kubernetes.io/load-balancer-attributes: access_logs.s3.enabled=true,access_logs.s3.bucket=ops-1-30-devops-ingress-ops-alb-logs
# two ALB are using this buckets for their logs - the External, 'ops-external-ingress', and the Internal one, 'ops-internal-ingress'
# both are in the 'ops-common-alb-ns' Namespace
module "single_ingress_alb_logs_loki" {
  # create the module for each EKS cluster by its version
  # for_each = var.eks_versions
  for_each = toset(["1-30"])
  source = "[email protected]:ORG-NAME/atlas-tf-modules//alb-s3-logs?ref=master"
  # for local development
  # source = "/home/setevoy/Work/atlas-tf-modules//alb-s3-logs"

  # ops-1-30-devops-ingress-ops-alb-loki-logs
  # "ops"     "1-30"        "devops"    "ingress"     "ops"     "alb"         "loki"       "logs"
  # <aws_env>-<eks_version>-<component>-<application>-<app_env>-<aws_service>-<logger_type>-logs

  # ops-1-30-devops-vpc-ops-flow-loki-logs
  # "ops"     "1-30"        "devops"    "vpc"         "ops"     "flow"        "loki"       "logs"
  # <aws_env>-<eks_version>-<component>-<application>-<app_env>-<aws_service>-<logger_type>-logs

  # 'ops'
  aws_env          = var.aws_environment
  # '1-30'
  eks_version = each.value
  component        = "devops"
  application      = "ingress"
  app_environments = ["ops"]
  aws_service = "alb"
  logger_type = "loki"

  vpc_id                    = local.vpc_out.vpc_id
  vpc_private_subnets_cidrs = local.vpc_out.vpc_private_subnets_cidrs
  vpc_private_subnets_ids   = local.vpc_out.vpc_private_subnets_ids
  # 'https://loki.monitoring.1-30.ops.example.co:443/loki/api/v1/push'
  logger_write_address        = "https://loki.monitoring.${each.value}.ops.example.com:443/loki/api/v1/push"
  aws_account_id            = data.aws_caller_identity.current.account_id
}

З цим наче все.

Модуль VPC та Flow Logs

В модулі terraform-aws-modules/vpc/aws є підтримка Flow Logs, але там можна задати тільки один flow_log_destination_arn, в якому в мене зараз Grafana Loki – S3-бакет ops-1-30-devops-vpc-ops-flow-loki-logs:

module "vpc" {
  source  = "terraform-aws-modules/vpc/aws"
  version = "~> 5.16.0"

  ...

  enable_flow_log = var.vpc_params.enable_flow_log

  # Default: "cloud-watch-logs"
  flow_log_destination_type = "s3"

  # disalbe to use S3
  create_flow_log_cloudwatch_log_group = false
  create_flow_log_cloudwatch_iam_role  = false

  # ARN of the CloudWatch log group or S3 bucket
  # disable if use 'create_flow_log_cloudwatch_log_group' and the default 'flow_log_destination_type' value (cloud-watch-logs)
  flow_log_destination_arn = "arn:aws:s3:::ops-1-30-devops-vpc-ops-flow-loki-logs"

  flow_log_cloudwatch_log_group_name_prefix = "/aws/${local.env_name}-flow-logs/"
  flow_log_log_format = "$${region} $${vpc-id} $${az-id} $${subnet-id} $${instance-id} $${interface-id} $${flow-direction} $${srcaddr} $${dstaddr} $${srcport} $${dstport} $${pkt-srcaddr} $${pkt-dstaddr} $${pkt-src-aws-service} $${pkt-dst-aws-service} $${traffic-path} $${packets} $${bytes} $${action}"

  vpc_flow_log_tags = {
    "Name" = "flow-logs-s3-to-loki"
  }
}

Аби писати відразу в два S3 бакета – просто додаємо ресурс aws_flow_log.

VPC Flow Logs пишуться з custom format:

resource "aws_flow_log" "vpc_flow_vmlogs" {
  vpc_id               = module.vpc.vpc_id
  log_destination      = "arn:aws:s3:::ops-1-30-devops-vpc-ops-flow-vmlogs-logs"
  log_destination_type = "s3"
  traffic_type         = "ALL"
  log_format = "$${region} $${vpc-id} $${az-id} $${subnet-id} $${instance-id} $${interface-id} $${flow-direction} $${srcaddr} $${dstaddr} $${srcport} $${dstport} $${pkt-srcaddr} $${pkt-dstaddr} $${pkt-src-aws-service} $${pkt-dst-aws-service} $${traffic-path} $${packets} $${bytes} $${action}"
  tags = {
    "Name" = "flow-logs-s3-to-vmlogs"
  }
}

Крім того, я ще вручну створив Flow Logs з destination в CloudWatch Logs аби перевіряти дані в Loki та VictoriaLogs.

Створення Grafana dashboard

NAT Gateway Total processed

Першим у нас йде відображення загальної статистики по тому, скільки через NAT Gateway пройшло трафіку – і від Kubernetes Pods в інтернет, і з інтернету до Kubernetes Pods.

Запит в Loki

В Loki запит виглядає так:

sum (
    sum_over_time(
        ({logtype="flow", logger_type="loki"} | pattern `<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_add> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>`
        | interface_id="eni-0352f8c82da6aa229"
        | pkt_src_addr=ip("10.0.32.0/20") OR pkt_dst_addr=ip("10.0.32.0/20")
        | pkt_src_addr=~"${kubernetes_pod_ip}"
        | pkt_dst_addr=~"${remote_svc_ip}"
        | unwrap bytes
        | __error__=""
        )[$__range]
    )
)

Тут:

  • рахуємо sum_over_time() за період, вибраний в Grafana dashboard – $__range
  • рахуємо трафік або від Kubernetes Pods – pkt_src_addr=ip("10.0.32.0/20"), або навпаки до Kubernetes Pods – pkt_dst_addr=ip("10.0.32.0/20")
  • рахуємо по полю bytesunwrap bytes

З таким запитом маємо такі дані:

kubernetes_pod_ip та remote_svc_ip – змінні в Grafana dashboard аби мати можливість перевірки даних по конкретним адресам:

Запит з VictoriaLogs

Тепер нам треба перевести цей запит в формат LogsQL для VictoriaLogs.

Виглядати він буде так:

_time:$__range {logtype=flow, environment=ops, logger_type=vmlogs} seq("eni-0352f8c82da6aa229", "ACCEPT")
| extract "<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_addr> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>"
  | filter
      interface_id:="eni-0352f8c82da6aa229"
      action:="ACCEPT"
      (pkt_src_addr:ipv4_range("10.0.32.0/20") OR pkt_dst_addr:ipv4_range("10.0.32.0/20"))
      pkt_src_addr:~"${kubernetes_pod_ip}"
      pkt_dst_addr:~"${remote_svc_ip}"   
  | stats sum(bytes) bytes_total

Підтримку змінної Grafana $__range завезли тільки вчора в версії датасорсу 0.9.0, тому оновіться.

Тут ми:

  • вибираємо дані за _time:$__range
  • в {logtype=flow, environment=ops, logger_type=vmlogs} використовуємо log stream selector з лейблами, які задаються в Lambda Promtail під час запису логів – /insert/loki/api/v1/push?_stream_fields=logtype,environment,logger_type
  • seq("eni-0352f8c82da6aa229", "ACCEPT") – використовуємо Sequence filter – вибираємо тільки записи з інтерфейсу NAT Gateway і ACCEPT аби пришвидшити виконання запиту (див. коментар від Olexandr Valialkin тут>>>)
  • з extract формуємо поля із записів в логу
  • з filter вибираємо інтерфейсу NAT Gateway, ACCEPT, і як і в запиті Loki – фільтруємо трафік або від Kubernetes Pods з IPv4 range filterpkt_src_addr:ipv4_range("10.0.32.0/20"), або навпаки до Kubernetes Pods – pkt_dst_addr:ipv4_range("10.0.32.0/20") (зверніть увагу, що умови OR заключені в дужки)
  • і в кінці з stats рахуємо суму по полю bytes, а результат пишемо в поле bytes_total

Перевірка з CloudWatch Logs Insights

Аби мати можливість перевірити дані в Loki і VictoriaLogs, VPC зараз пише ще й в CloudWatch Logs.

Зробимо такий запит в Logs Insights:

parse @message "* * * * * * * * * * * * * * * * * * *" 
| as region, vpc_id, az_id, subnet_id, instance_id, interface_id, 
| flow_direction, srcaddr, dstaddr, srcport, dstport, 
| pkt_srcaddr, pkt_dstaddr, pkt_src_aws_service, pkt_dst_aws_service, 
| traffic_path, packets, bytes, action 
| filter ((interface_id="eni-0352f8c82da6aa229") AND ((isIpv4InSubnet(pkt_srcaddr,"10.0.32.0/20") OR isIpv4InSubnet(pkt_dstaddr,"10.0.32.0/20") )) | stats sum(bytes) as bytes_total

В результаті маємо 8547192734 байт:

Що в форматі SI (див. Binary prefixes) дає нам 1.87 гігабайт – рахуємо з калькулятором:

$ bc
scale=2
8547192734/1000/1000/1000
8.54

В Loki у нас було 7.56 GiB, в VictoriaLogs – 8.66 GiB.

Інколи ті ж самі дані між Loki, VictoriaLogs та CloudWatch можуть відрізнятись, тим більш при виборках всього за 30 хвилин, бо самі Flow Logs пишуться з різницею в кілька хвилин.

Наприклад, в бакеті Loki останній об’єкт створено в 13:06:50:

А в VMLogs – в 13:05:29:

Перевірка з Cost Explorer

Ще можна перевірити дані в Cost Explorer.

Вибираємо Service == EC2-Other, Usage type == NatGateway-Bytes (GB):

За минулу добу маємо 129 гігабайт трафіку через NAT Gateway.

Якщо ми в Grafana (нарешті ми це можемо зробити, бо є VictoriaLogs) зробимо range в 24 години – то побачимо в “NAT Gateway Total processed” 135 гігабайт:

Плюс-мінус сходиться, бо Cost Explorer рахує не останні 24 години, як в Grafana, а за попередню добу, крім того, там використовується UTC (+00:00) time zone.

NAT Gateway Total OUT та IN processed

Далі, хочеться бачити розподілення трафіку – від Kubernetes Pods в інтернет, та з інтернету до Kubernetes Pods.

Згадаємо, що ми маємо в записах для пакетів, які проходять через NAT Gateway – розбирали в Трафік з Pod до External Server через NAT Gateway, та VPC Flow Logs:

  • по полю interface_id фільтруємо тільки ті записи, які були зроблені з інтерфейсу NAT Gateway
  • якщо пакет йде від Kubernetes Pod в інтернет – то в полі pkt_src_addr буде IP цього Pod
  • якщо пакет йде з інтернету до Kubernetes Pod –  то в полі pkt_dst_addr буде IP цього Pod

Запити Loki

Тому аби порахувати байти з інтернету – до Kubernetes Pods ми можемо зробити такий запит в Loki з sum_over_time() та $__range, аби вибрати дані за 30 хвилин, а в pkt_dst_addr=ip("10.0.32.0/20") вибираємо IP тільки VPC Private Subnet, яка використовується для Kubernetes Pods:

sum (
    sum_over_time(
        ({logtype="flow", logger_type="loki"} | pattern `<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_add> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>`
        | interface_id="eni-0352f8c82da6aa229"
        | pkt_dst_addr=ip("10.0.32.0/20")
        | pkt_src_addr=~"${kubernetes_pod_ip}"
        | pkt_dst_addr=~"${remote_svc_ip}"
        | unwrap bytes
        | __error__=""
        )[$__range]
    )
)

Аби запит швидше оброблювався, внизу в Options можна поставити Type == Instant.

І аналогічно, але рахуємо від Kubernetes Pod в інтернет:

sum (
    sum_over_time(
        ({logtype="flow", logger_type="loki"} | pattern `<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_add> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>`
        | interface_id="eni-0352f8c82da6aa229"
        | flow_direction="ingress"
        | pkt_src_addr=ip("10.0.32.0/20")
        | pkt_src_addr=~"${kubernetes_pod_ip}"
        | pkt_dst_addr=~"${remote_svc_ip}"
        | unwrap bytes
        | __error__=""
        )[$__range]
    )
)

Запити VictoriaLogs

Запити для VictoriaLogs будуть виглядати так – з інтернету до Kubernetes Pods:

_time:$__range {logtype=flow, environment=ops, logger_type=vmlogs} seq("eni-0352f8c82da6aa229", "ACCEPT")
| extract "<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_addr> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>"
  | filter
      interface_id:="eni-0352f8c82da6aa229"
      action:="ACCEPT"
      pkt_dst_addr:ipv4_range("10.0.32.0/20")
      pkt_src_addr:~"${kubernetes_pod_ip}"
      pkt_dst_addr:~"${remote_svc_ip}"        
  | stats sum(bytes) bytes_total

З Kubernetes Pods в інтернет:

_time:$__range {logtype=flow, environment=ops, logger_type=vmlogs} seq("eni-0352f8c82da6aa229", "ACCEPT")
| extract "<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_addr> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>"
  | filter
      interface_id:="eni-0352f8c82da6aa229"
      action:="ACCEPT"
      pkt_src_addr:ipv4_range("10.0.32.0/20")
      pkt_src_addr:~"${kubernetes_pod_ip}"
      pkt_dst_addr:~"${remote_svc_ip}"  
  | stats sum(bytes) bytes_total

І всі три панелі разом:

NAT Gateway Total processed bytes/sec

Крім Stat панелей хотілося б бачити і історичну картину – в який час як мінявся трафік.

Запит Loki

В Loki все просто – просто використовуємо функцію rate():

sum (
    rate(
        ({logtype="flow", logger_type="loki"} | pattern `<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_add> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>`
        | interface_id="eni-0352f8c82da6aa229"
        | pkt_dst_addr=ip("10.0.32.0/20")
        | pkt_src_addr=~"${kubernetes_pod_ip}"
        | pkt_dst_addr=~"${remote_svc_ip}"
        | unwrap bytes
        | __error__=""
        )[$__interval]
    )
)

В Options і в rate() використовуємо інтервал 5 хвилин, в Standart Options > Unit bytes/sec(SI), в результаті маємо 7.25 МБ/с в 12:20:

Запит VictoriaLogs

А з VictoriaLogs трохи цікавіше, бо з коробки вона не має функції rate() (але обіцяють скоро додати).

Крім того, є ще один нюанс:

  • Loki рахує дані “назад”, тобто – точка на графіку в 12:25 а rate() бере попередні 5 хвилин – [5m] з Options, які передаються в $__interval
  • в VictoriaLogs графік буде відображатись на момент виконання запиту

Аби порахувати per-second rate наших bytes – можемо використати  math pipe.

Отже, запит буде таким:

_time:$__range {logtype=flow, environment=ops, logger_type=vmlogs} seq("eni-0352f8c82da6aa229", "ACCEPT")
| extract "<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_addr> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>"
  | filter
      interface_id:="eni-0352f8c82da6aa229"
      action:="ACCEPT"
      pkt_dst_addr:ipv4_range("10.0.32.0/20")
      pkt_src_addr:~"${kubernetes_pod_ip}"
      pkt_dst_addr:~"${remote_svc_ip}"        
  | stats sum(bytes) sum_bytes
  | math (sum_bytes / ($__interval/1s)) as bytes_total

Тут:

  • в stats sum(bytes) рахуємо суму байт за інтервал, заданий в Options (5 хвилин), результат зберігаємо як sum_bytes
  • далі з math рахуємо суму байт з sum_bytes за кожен інтервал на графіку, і їх ділимо на кількість секунд в обраному $__interval

Тут у нас 8.30 МБ/с в 12:20. Плюс-мінус схоже. Можна вже зовсім заморочитись з перевіркою, і порахувати вручну з логів – але прям супер-точні цифри тут не дуже важливі, цікавить саме тренд, тому ОК.

Взагалі, при побудові саме графіків можна не прописувати _time:$__range, бо це виконується в самій VMLogs “під капотом”, але тут нехай буде для ясності.

Kubernetes Pods IN From IP

Наступним відобразимо топ Kubernetes Pods IP по отриманому з інтернету трафіку.

Запит Loki

Для Loki використовуємо sum_over_time() за $__range, у нас в дашборді це 30 хвилин:

topk(5,
  sum by (pkt_src_addr) (
    sum_over_time(
      (
        {logtype="flow", logger_type="loki"}
        | pattern `<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_addr> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>`
        | interface_id="eni-0352f8c82da6aa229"
        | action="ACCEPT"
        | pkt_dst_addr=ip("10.0.32.0/20")
        | pkt_src_addr=~"${remote_svc_ip}"
        | pkt_dst_addr=~"${kubernetes_pod_ip}"
        | unwrap bytes
        | __error__=""
      )[$__range]
    )
  )
)

Див. Grafana Data links – дуже корисна штука.

Запит VictoriaLogs

І аналогічний запит для VictoriaLogs буде виглядати так:

_time:$__range {logtype=flow, environment=ops, logger_type=vmlogs} seq("eni-0352f8c82da6aa229", "ACCEPT")
| extract "<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_addr> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>"
  | filter 
    interface_id:="eni-0352f8c82da6aa229"
    action:=ACCEPT
    pkt_dst_addr:ipv4_range("10.0.32.0/20")
    pkt_dst_addr:~"${kubernetes_pod_ip}"
    pkt_src_addr:~"${remote_svc_ip}"    
  | stats by (pkt_src_addr) sum(bytes) sum_bytes
  | sort by (sum_bytes) desc limit 10

VictoriaLogs поки не підтримує Options для Legend і повертає результат просто в JSON.

Тому, аби все було красиво і без зайвих даних – можемо додати Transformations > Rename fields by regex, в якому з регуляркою .*addr="(.*)".* “виріжемо” тільки IP-адреси:

І що ми маємо:

  • в Loki у нас в топі 20.150.90.164 з 954 МБ
  • в VictoriaLogs в топі 20.150.90.164 з 954 МБ

І цілому дані схожі, хоча в Loki трохи відрізняється сортування, знов-таки – через невелику затримку. Ну і topk() в Loki працює трохи дивно, я колись намагався покопати цей момент, але забив. В VictoriaLogs limit працює краще (хоча теж є баг, далі побачимо).

Давайте перевіримо IP 20.150.90.164 в CloudWatch Logs Insights з таким запитом:

parse @message "* * * * * * * * * * * * * * * * * * *" 
| as region, vpc_id, az_id, subnet_id, instance_id, interface_id, 
| flow_direction, srcaddr, dstaddr, srcport, dstport, 
| pkt_srcaddr, pkt_dstaddr, pkt_src_aws_service, pkt_dst_aws_service, 
| traffic_path, packets, bytes, action 
| filter ((interface_id="eni-0352f8c82da6aa229") AND (isIpv4InSubnet(pkt_dstaddr,"10.0.32.0/20"))) | stats sum(bytes) as bytes_total by pkt_srcaddr
| sort bytes_total desc

Дані в VictoriaLogs більш схожі на правду, але в цілому обидві системи виводять дані правильно.

Знов-таки, якщо брати більший проміжок часу (чого ми не можемо зробити з Loki, але можемо в VictoriaLogs) – то дані в CloudWatch Logs та VictoriaLogs будуть ще більш точні.

Kubernetes Pods IN From IP bytes/sec

Тут аналогічно тому, як ми робили для панельки “NAT Gateway Total IN processed” – аби мати історичну картину по трафіку.

Запит Loki

topk(5,
  sum by (pkt_src_addr) (
    rate(
      (
        {logtype="flow", logger_type="loki"}
        | pattern `<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_addr> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>`
        | interface_id="eni-0352f8c82da6aa229"
        | action="ACCEPT"
        | pkt_dst_addr=ip("10.0.32.0/20")
        | pkt_src_addr=~"${remote_svc_ip}"
        | pkt_dst_addr=~"${kubernetes_pod_ip}"
        | unwrap bytes
        | __error__=""
      )[$__interval]
    )
  )
)

Запит VictoriaLogs

_time:$__range {logtype=flow, environment=ops, logger_type=vmlogs} seq("eni-0352f8c82da6aa229", "ACCEPT")
| extract "<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_addr> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>"
  | filter
      interface_id:="eni-0352f8c82da6aa229"
      action:="ACCEPT"
      pkt_dst_addr:ipv4_range("10.0.32.0/20")
      pkt_drc_addr:~"${kubernetes_pod_ip}"
      pkt_src_addr:~"${remote_svc_ip}"
  | stats by (pkt_src_addr) sum(bytes) bytes_total
  | sort by (bytes_total) desc
  | math (bytes_total / ($__interval/1s)) as bytes_total

Теж плюс-мінус дані схожі.

Але тут знов є проблема з topk() в Loki – бо задано ліміт в топ-5 результатів, а виводить 11.

В VictoriaLogs також є проблема з limit, наприклад задамо | sort by (bytes_total) desc limit 5:

І в результаті маємо не топ-5 IP, а просто 5 точок на графіку.

Говорив з девелоперами VictoriaMetrics – кажуть, що схоже на баг, завів їм GitHub Issue, подивимось, що буде в найближчих релізах з багфіксами.

Kubernetes Pods IN by IP and Port

Залишилось відобразити інформацію по IP і портам – буває корисно при визначені сервісу, який генерує трафік – див. pkt_src_aws_service, pkt_dst_aws_service та визначення сервісу.

Запит Loki

Використовуємо тип візуалізації Table і такий запит:

topk(10,
  sum by (pkt_src_addr, src_port, pkt_dst_addr, dst_port) (
    sum_over_time(
      (
        {logtype="flow", logger_type="loki"}
        | pattern `<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_addr> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>`
        | interface_id="eni-0352f8c82da6aa229"
        | action="ACCEPT"
        | pkt_dst_addr=ip("10.0.32.0/20")
        | pkt_src_addr=~"${remote_svc_ip}"
        | pkt_dst_addr=~"${kubernetes_pod_ip}"
        | unwrap bytes
        | __error__=""
      )[$__range]
    )
  )
)

В Fields override задаємо Unit поля Value в bytes(SI), і для кожної колонки – власний Data link.

Змінити заголовки колонок і сховати поле Time можемо в Transformations:

Запит VictoriaLogs

Запит буде таким:

_time:$__range {logtype=flow, environment=ops, logger_type=vmlogs} seq("eni-0352f8c82da6aa229", "ACCEPT")
| extract "<region> <vpc_id> <az_id> <subnet_id> <instance_id> <interface_id> <flow_direction> <src_addr> <dst_addr> <src_port> <dst_port> <pkt_src_addr> <pkt_dst_addr> <pkt_src_aws_service> <pkt_dst_aws_service> <traffic_path> <packets> <bytes> <action>"
  | filter
      interface_id:="eni-0352f8c82da6aa229"
      action:="ACCEPT"
      pkt_dst_addr:ipv4_range("10.0.32.0/20")
      pkt_dst_addr:~"${kubernetes_pod_ip}"
      pkt_src_addr:~"${remote_svc_ip}"
  | stats by (pkt_src_addr, src_port, pkt_dst_addr, dst_port) sum(bytes) bytes_total
  | sort by (bytes_total) desc limit 10

Через те, що VictoriaLogs повертає (поки що) результати в JSON – то додамо трансформацію Extract fields.

В Filter fields by name як і для Loki – прибираємо колонку Time.

А в Organize fields by name – міняємо заголовки колонок і робимо сортування колонок:

Фінальний результат та перформанс Loki vs VictoriaLogs

Результат в VictoriaLogs за 12 (!) годин:

І ресурси:

$ kk top pod atlas-victoriametrics-victoria-logs-single-server-0 
NAME                                                  CPU(cores)   MEMORY(bytes)   
atlas-victoriametrics-victoria-logs-single-server-0   2754m        34Mi

Результат в Loki за 30 хвилин:

І ресурси:

$ kk top pod -l app.kubernetes.io/name=loki,app.kubernetes.io/component=read
NAME                        CPU(cores)   MEMORY(bytes)   
loki-read-89fbb8f5c-874xq   683m         402Mi           
loki-read-89fbb8f5c-8qpdw   952m         399Mi           
loki-read-89fbb8f5c-qh8dg   848m         413Mi

Ну і на цьому власне все.

Залишилось дочекатись мінорних апдейтів по датасорсу і самій VictoriaLogs.

Що далі?

А далі все ж хочеться мати поля з Kubernetes Pods IP та адреси зовнішніх ресурсах в полях логів, а не парсити їх в дашборді самою VictoriaLogs – тоді буде можливість взагалі робити виборки за кілька днів або може навіть тижнів.

Для цього підсказали ідею із vector.dev – збирати нею логи з S3, там виконувати трансформації і додавати поля, а потім вже писати ці логи в VictoriaLogs.

Скоріш за все, як буде час, спробую, бо виглядає дуже цікавим рішенням.