Golang: запис логів AWS Loab Balancer до VictoriaLogs

Автор |  26/11/2025
 

Наступна задача, яку хочеться вирішити з Go – це написати власний logs collector для збору логів AWS Load Balancer з AWS S3 і запису їх до VictoriaLogs.

Це, звісно, можна було б вирішити просто з Vector.dev, як це робив для AWS VPC Flow Logs, див. Vector.dev: знайомство, логи з AWS S3 та інтеграція з VictoriaLogs, але є можливість трохи попрактикуватись в Go, тому будемо робити власний колектор.

Тож основна ідея зараз така:

  • Load Balancer пише логи до S3
  • в S3 створюємо нотифікацію до AWS SQS
  • наш колектор опитує SQS, отримує інформацію про нові об’єкти в S3
  • робить запит до S3, отримує gz-архів
  • розпаковує, парсить дані, і відправляє до VictoriaLogs

Поїхали.

Налаштування S3 та SQS notifications

Коли продумував ідею, то головне питання було – як знати, які об’єкти в S3 ми вже обробили, а які ні?

Мати якусь базу, в яку писати інформацію про вже оброблені об’єкти – перший варіант. Але з часом записів про такі обєкти буде все більше, плюс не дуже хочеться тягнути якийсь stateful-сервіс в Kubernetes, де потім буде запускатись наш колектор.

Тому робимо простіше і надійніше, так само як це зроблено з Vector.dev та VPC Flow Logs – створимо SQS чергу, в яку будуть приходити повідомлення про нові S3 objects.

Читаємо повідомлення з SQS, отримуємо з них інформацію про нові файли, оброблюємо файли, видаляємо меседж із SQS queue.

Див. документацію AWS – Walkthrough: Configuring a bucket for notifications (SNS topic or SQS queue).

Створення SQS queue

Створюємо нову queue, поки руками, потім зробимо нормально, з Terraform.

Тип queue лишаємо Standart, бо:

Amazon Simple Queue Service FIFO (First-In-First-Out) queues aren’t supported as an Amazon S3 event notification destination

Див. Amazon S3 Event Notifications.

Крім того, нам порядок нам не важливий, бо в логах є власний timestamp, який ми і будемо парсити.

Переходимо до SQS, створюємо нову чергу:

Дал, в Access policy описуємо політику.

Писати в чергу у нас буде S3, а читати – Kubernetes Pod із ServiceAccount.

ServiceAccount і IAM Role для нього будемо робити вже потім, потім просто даємо право читати всім з нашого AWS Account.

А для S3 додаємо дозвіл на SQS:SendMessage:

{
    "Version": "2012-10-17",
    "Id": "loggerID",
    "Statement": [
        {
            "Sid": "sendFromS3LogsAllow",
            "Effect": "Allow",
            "Principal": {
                "Service": "s3.amazonaws.com"
            },
            "Action": [
                "SQS:SendMessage"
            ],
            "Resource": "arn:aws:sqs:us-east-1:492***148:testing-alb-logs-s3-notifier",
            "Condition": {
                "ArnLike": {
                    "aws:SourceArn": "arn:aws:s3:*:*:ops-1-33-devops-ingress-ops-alb-loki-logs"
                }
            }
        },
        {
            "Sid": "receiveFromQueueAllowSameAccount",
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::492***148:root"
            },
            "Action": [
                "SQS:ReceiveMessage",
                "SQS:DeleteMessage",
                "SQS:GetQueueAttributes",
                "SQS:GetQueueUrl"
            ],
            "Resource": "arn:aws:sqs:us-east-1:492***148:testing-alb-logs-s3-notifier"
        }
    ]
}

Решту залишаємо дефолтним.

Налаштування S3 Event notifications

Переходимо до S3 з логами, Properties > Event notifications:

Створюємо новий Event notification – відправляти повідомлення про всі операції s3:ObjectCreated:

В Destination вибираємо SQS і нашу чергу:

Чекаємо кілька хвилин, перевіряємо Monitoring в SQS:

Окей – є повідомлення, тепер їх треба зібрати і прочитати.

Переходимо до Go.

AWS SDK for Go

Для роботи з AWS нам буде потрібен AWS SDK for Go, з яким можемо виконати всі потрібні операції.

Для отримання даних доступу використовуємо aws-sdk-go-v2/config і функцію LoadDefaultConfig(), яка виконує стандартний пошук credentials – в змінних оточення, файлах ~/.aws/credentials та ~/.aws/config, або використовує EC2 IAM Roles.

До LoadDefaultConfig() першим аргументом потрібно передати context, писав про нього у Golang: створення OpenAI Exporter для VictoriaMetrics, поки запускаємо з context.Background(), потім напишемо обробку і завершення роботи:

package main

import (
  "context"
  "log"

  "github.com/aws/aws-sdk-go-v2/config"
)

func main() {
  // TODO: add exit handler
  ctx := context.Background()
  // Load the Shared AWS Configuration (~/.aws/config)
  cfg, err := config.LoadDefaultConfig(ctx)
  if err != nil {
    log.Fatal(err)
  }
}

Go AWS SDK SQS

Для роботи з SQS в AWS SDK є окремий пакет sqs.

SQS client

Додаємо його імпорт, додаємо створення клієнту з NewFromConfig(), якому передаємо AWS config, який створили вище.

Для роботи з SQS queue треба мати URL – отримуємо його з GetQueueUrl().

Додаємо отримання QueueName зі змінних оточення, бо потім в Kubernetes будемо передавати із Helm chart values:

package main

import (
  "context"
  "fmt"
  "log"
  "os"

  "github.com/aws/aws-sdk-go-v2/aws"
  "github.com/aws/aws-sdk-go-v2/config"
  "github.com/aws/aws-sdk-go-v2/service/sqs"
)

func main() {
  // TODO: add exit handler
  ctx := context.Background()

  // Load the Shared AWS Configuration (~/.aws/config)
  cfg, err := config.LoadDefaultConfig(ctx)
  if err != nil {
    log.Fatal(err)
  }

  // create SQS client using the shared AWS config
  sqsClient := sqs.NewFromConfig(cfg)

  // read queue name from environment
  queueName := os.Getenv("ALB_LOGS_QUEUE")
  if queueName == "" {
    log.Fatal("environment variable ALB_LOGS_QUEUE is not set")
  }

  // request queue URL by name
  getURLResp, err := sqsClient.GetQueueUrl(ctx, &sqs.GetQueueUrlInput{
    QueueName: aws.String(queueName),
  })
  if err != nil {
    log.Fatal(err)
  }

  queueURL := getURLResp.QueueUrl
  fmt.Println("Queue URL:", *queueURL)
}

Виконуємо go mod init:

$ go mod init alb-logs-collector-poc
$ go mod tidy

Задаємо змінну з іменем черги:

$ export ALB_LOGS_QUEUE="testing-alb-logs-s3-notifier"

Запускаємо наш код:

$ go run main.go 
Queue URL: https://sqs.us-east-1.amazonaws.com/492***148/testing-alb-logs-s3-notifier

ОК, тепер можемо отримати повідомлення.

SQS ReceiveMessage()

Читаємо нові меседжи в queue з ReceiveMessage(), куди передаємо конфіг ReceiveMessageInput:

...
  // receive a single message from SQS
  msgResp, err := sqsClient.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
    QueueUrl:            queueURL, // URL returned by the GetQueueUrl()
    MaxNumberOfMessages: 1,        // receive only one message
    WaitTimeSeconds:     10,       // enable long polling
  })
  if err != nil {
    log.Fatal(err)
  }

  if len(msgResp.Messages) == 0 {
    fmt.Println("no messages received")
    return
  }
  // print the received message body
  fmt.Println("Received message:", aws.ToString(msgResp.Messages[0].Body))
...

Перевіряємо:

$ go run main.go | jq
{
  "Records": [
    {
      "eventVersion": "2.1",
      "eventSource": "aws:s3",
      ...
      "eventName": "ObjectCreated:Put",
      ...
      "s3": {
        ...
        "bucket": {
          "name": "ops-1-33-devops-ingress-ops-alb-loki-logs",
        ...
        "object": {
          "key": "AWSLogs/492***148/elasticloadbalancing/us-east-1/2025/11/25/492***148_elasticloadbalancing_us-east-1_app.k8s-ops133externalalb-***.336cddd33c043f33_20251125T0935Z_34.***.***.15_25hsfvwt.log.gz",
          ...
        }
      }
    }
  ]
}

Нас тут цікавлять два поля – bucket.name та object.key.

Створюємо struct і з json.Unmarshal() заносимо в неї дані, парсинг JSON розбирав в тому ж пості про OpenAI Exporter в частині Створення Go struct для JSON Unmarshall:

...
  // define a struct to unmarshal S3 event message
  type S3Event struct {
    Records []struct {
      S3 struct {
        Bucket struct {
          Name string `json:"name"`
        } `json:"bucket"`
        Object struct {
          Key string `json:"key"`
        } `json:"object"`
      } `json:"s3"`
    } `json:"Records"`
  }

  // take SQS message body
  msgBody := aws.ToString(msgResp.Messages[0].Body)

  // decode JSON into the struct
  var event S3Event
  if err := json.Unmarshal([]byte(msgBody), &event); err != nil {
    log.Fatal("failed to parse S3 event:", err)
  }

  // extract bucket and key
  // we always have 1 message, so use [0]
  bucket := event.Records[0].S3.Bucket.Name
  key := event.Records[0].S3.Object.Key

  fmt.Println("bucket:", bucket)
  fmt.Println("key:", key)
...

Перевіряємо ще раз:

$ go run main.go 
bucket: ops-1-33-devops-ingress-ops-alb-loki-logs
key: AWSLogs/492***148/elasticloadbalancing/us-east-1/2025/11/25/492***148_elasticloadbalancing_us-east-1_app.k8s-ops133externalalb-***.336cddd33c043f33_20251125T0935Z_52.***.***.213_60mjhvf6.log.gz

Можемо переходити до S3.

Go AWS SDK S3

Аналогічно до SQS – використовуємо пакет s3.

S3 client

Створюємо клієнт з NewFromConfig():

...
  // create S3 client using the shared AWS config
  s3Client := s3.NewFromConfig(cfg)
...

S3 GetObject()

Додаємо читання файлу з GetObject().

Теж аналогічно з SQS – передаємо GetObjectInput:

...
  // fetch the S3 object and return its streaming body
  objResp, err := s3Client.GetObject(ctx, &s3.GetObjectInput{
    Bucket: aws.String(bucket),
    Key:    aws.String(key),
  })
  if err != nil {
    log.Fatal("failed to download object:", err)
  }
  defer objResp.Body.Close()

  fmt.Println("S3 object stream opened:", bucket, key)
...

GetObject() повертає *GetObjectOutput, в якому є поле Body з типом io.ReadCloser, а io.ReadCloser – це інтерфейс, який визначає два методи – Reader та Closer.

Читання файлу з gzip

Логи в S3 зберігаються в gz, тому додаємо пакет gzip, і з NewReader() читаємо дані:

...
  // create gzip reader from S3 object stream
  gzReader, err := gzip.NewReader(objResp.Body)
  if err != nil {
    log.Fatal("failed to create gzip reader:", err)
  }
  defer gzReader.Close()

  fmt.Println("gzip stream opened")
...

NewReader() приймає аргумент з типом io.Reader interface, а тому ми можемо до нього передати objResp.Body.

Але сам по собі gzip.NewReader() дані нікуди не повертає – він тільки відкриє буфер, в який буде писати розархівовані строки.

Тому далі додаємо bufio.

Читання gzip output з bufio

Додаємо bufio, і з NewScanner() читаємо дані від gzip.NewReader() до буферу, з якого далі зі Scan() та Text() формуємо строки:

...
  // create scanner to read decompressed log lines
  scanner := bufio.NewScanner(gzReader)
  
  // increase buffer size if ALB logs have long lines
  // default scanner buffer = 64 KB
  buf := make([]byte, 0, 1024*1024) // 1 MB
  scanner.Buffer(buf, 1024*1024)
  
  // iterate over every line in the decompressed file
  for scanner.Scan() {
    line := scanner.Text()
    fmt.Println("log line:", line)
  }
  if err := scanner.Err(); err != nil {
    log.Fatal("scanner error:", err)
  }
...

Перевіряємо:

$ go run main.go  | head
S3 object stream opened: ops-1-33-devops-ingress-ops-alb-loki-logs AWSLogs/492***148/elasticloadbalancing/us-east-1/2025/11/25/492***148_elasticloadbalancing_us-east-1_app.k8s-ops133externalalb-***.336cddd33c043f33_20251125T1005Z_52.***.***.213_2nbanv4o.log.gz

log line: h2 2025-11-25T10:00:02.271561Z app/k8s-ops133externalalb-***/336cddd33c043f33 52.***.***.183:60978 10.0.47.34:8080 0.001 0.005 0.001 200 200 38 5492 "GET https://lightdash.example.co:443/ HTTP/2.0" "Blackbox Exporter/0.27.0" ECDHE-RSA-AES128-GCM-SHA256 TLSv1.2 arn:aws:elasticloadbalancing:us-east-1:492***148:targetgroup/k8s-proddata-lightdas-fa7b1ce474/c400fe91849c401a "Root=1-69257e22-4b16fe8d62d2f8955edc6da8" "lightdash.example.co" "arn:aws:acm:us-east-1:492***148:certificate/77230c5f-d0c2-4e58-b579-8b8422686986" 15 2025-11-25T10:00:02.264000Z "forward" "-" "-" "10.0.47.34:8080" "200" "-" "-" TID_48df8c3791b69144b4ae0f6084e015d6 "-" "-" "-"
...

Розбиття main() на функції

У нас вже доволі великий main(), і основні операції зробили, все працює.

Давайте наведемо трохи красоти.

Весь код зараз:

package main

import (
  "bufio"
  "compress/gzip"
  "context"
  "encoding/json"
  "fmt"
  "log"
  "os"

  "github.com/aws/aws-sdk-go-v2/aws"
  "github.com/aws/aws-sdk-go-v2/config"
  "github.com/aws/aws-sdk-go-v2/service/s3"
  "github.com/aws/aws-sdk-go-v2/service/sqs"
)

func main() {
  // TODO: add exit handler
  ctx := context.Background()

  // Load the Shared AWS Configuration (~/.aws/config)
  cfg, err := config.LoadDefaultConfig(ctx)
  if err != nil {
    log.Fatal(err)
  }

  // create SQS client using the shared AWS config
  sqsClient := sqs.NewFromConfig(cfg)

  // read queue name from environment
  queueName := os.Getenv("ALB_LOGS_QUEUE")
  if queueName == "" {
    log.Fatal("environment variable ALB_LOGS_QUEUE is not set")
  }

  // request queue URL by name
  getURLResp, err := sqsClient.GetQueueUrl(ctx, &sqs.GetQueueUrlInput{
    QueueName: aws.String(queueName),
  })
  if err != nil {
    log.Fatal(err)
  }

  queueURL := getURLResp.QueueUrl
  //fmt.Println("Queue URL:", *queueURL)

  // receive a single message from SQS
  msgResp, err := sqsClient.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
    QueueUrl:            queueURL, // URL який ми отримали раніше
    MaxNumberOfMessages: 1,        // receive only one message
    WaitTimeSeconds:     10,       // enable long polling (recommended)
  })
  if err != nil {
    log.Fatal(err)
  }

  if len(msgResp.Messages) == 0 {
    fmt.Println("no messages received")
    return
  }
  // print the received message body
  //fmt.Println(aws.ToString(msgResp.Messages[0].Body))

  // define a struct to unmarshal S3 event message
  type S3Event struct {
    Records []struct {
      S3 struct {
        Bucket struct {
          Name string `json:"name"`
        } `json:"bucket"`
        Object struct {
          Key string `json:"key"`
        } `json:"object"`
      } `json:"s3"`
    } `json:"Records"`
  }

  // take SQS message body
  msgBody := aws.ToString(msgResp.Messages[0].Body)

  // decode JSON into the struct
  var event S3Event
  if err := json.Unmarshal([]byte(msgBody), &event); err != nil {
    log.Fatal("failed to parse S3 event:", err)
  }

  // extract bucket and key
  // we always have 1 message, so use [0]
  bucket := event.Records[0].S3.Bucket.Name
  key := event.Records[0].S3.Object.Key

  //fmt.Println("bucket:", bucket)
  //fmt.Println("key:", key)

  // create S3 client using the shared AWS config
  s3Client := s3.NewFromConfig(cfg)

  // fetch the S3 object and return its streaming body
  objResp, err := s3Client.GetObject(ctx, &s3.GetObjectInput{
    Bucket: aws.String(bucket),
    Key:    aws.String(key),
  })
  if err != nil {
    log.Fatal("failed to download object:", err)
  }
  defer objResp.Body.Close()

  fmt.Println("S3 object stream opened:", bucket, key)

  // create gzip reader from S3 object stream
  gzReader, err := gzip.NewReader(objResp.Body)
  if err != nil {
    log.Fatal("failed to create gzip reader:", err)
  }
  defer gzReader.Close()
  //fmt.Println("gzip stream opened")

  // create scanner to read decompressed log lines
  scanner := bufio.NewScanner(gzReader)

  // increase buffer size if ALB logs have long lines
  // default scanner buffer = 64 KB
  buf := make([]byte, 0, 1024*1024) // 1 MB
  scanner.Buffer(buf, 1024*1024)

  // iterate over every line in the decompressed file
  for scanner.Scan() {
    line := scanner.Text()
    fmt.Println("log line:", line)
  }
  if err := scanner.Err(); err != nil {
    log.Fatal("scanner error:", err)
  }
}

Як можемо організувати процес?

В main() виконуємо всякі ініціалізації, а потім в циклі будемо опитувати SQS:

  • створюємо context
  • створюємо AWS config
  • створюємо клієнти sqsClient та s3Client
  • зчитуємо змінні середовища (ALB_LOGS_QUEUE, потім додамо ще)
  • і потім в циклі:
    • викликаємо функцію receiveFromSQS() – перевіряємо, чи з’явились нові меседжи
    • викликаємо функцію getS3Object() – якщо меседжи є, то йдемо до S3 і читаємо звідти новий архів
    • викликаємо функцію processLogFile() – зчитуємо строки з кожного отриманого логу

Які функції для цього знадобляться?

  • функція ReceiveFromSQS()
    • сюди будемо передавати context, SQS client, SQS queue URL, і записувати в структуру S3Event ім’я бакету та key – ім’я файлу
    • для подальшого видалення меседжів після успішної обробки – потрібно буде повертати receiptHandle
  • функція GetS3Object()
    • отримує context, AWS Config, bucket, key
    • виконує GetObject() і повертає GetObjectOutput
  • функція GzipReader()
    • читає дані від GetS3Object(), розпаковує, повертає строки
  • функція ScanLines()
    • отримує дані від GzipReader(), зчитує з Text(), і поки що просто виводить на консоль

Package collector

Для зручності і аби все було структуровано – розділимо все по окремим файлам:

collector/
    sqs.go
    s3.go
    gzip.go
    scan.go
main.go

Файл collector/sqs.go

Коли тестив, то зустрів таку помилку:

$ go run main.go | head 
panic: runtime error: index out of range [0] with length 0

goroutine 1 [running]:
alb-logs-collector/collector.ReceiveFromSQS({0xa83530, 0xdda060}, 0xc00021d888, {0xc000160050, 0x4d})
        /home/setevoy/Projects/Go/alb-logs-collector/collector/sqs.go:52 +0x25d
main.main()
        /home/setevoy/Projects/Go/alb-logs-collector/main.go:46 +0x305
exit status 2

Виникає через, то AWS пише тестові повідомлення до черги:

{"Service":"Amazon S3","Event":"s3:TestEvent","Time":"2025-11-24T11:10:31.573Z","Bucket":"ops-1-33-devops-ingress-ops-alb-loki-logs", ...}

Тому до ReceiveFromSQS() додамо перевірку.

Функція ReceiveFromSQS()

Описуємо отримання повідомлень:

package collector

import (
  "context"
  "encoding/json"
  "fmt"
  "strings"

  "github.com/aws/aws-sdk-go-v2/aws"
  "github.com/aws/aws-sdk-go-v2/service/sqs"
)

// S3Event for SQS message
type S3Event struct {
  Records []struct {
    S3 struct {
      Bucket struct {
        Name string `json:"name"`
      } `json:"bucket"`
      Object struct {
        Key string `json:"key"`
      } `json:"object"`
    } `json:"s3"`
  } `json:"Records"`
}

// ReceiveFromSQS reads one message and returns bucket, key, receiptHandle, queueURL
func ReceiveFromSQS(ctx context.Context, client *sqs.Client, queueName string) (bucket, key, receiptHandle, queueURL string, err error) {

  // get real queue URL
  getURL, err := client.GetQueueUrl(ctx, &sqs.GetQueueUrlInput{
    QueueName: aws.String(queueName),
  })
  if err != nil {
    return "", "", "", "", err
  }

  queueURL = *getURL.QueueUrl

  // receive one message
  resp, err := client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
    QueueUrl:            aws.String(queueURL),
    MaxNumberOfMessages: 1,
    WaitTimeSeconds:     10,
  })
  if err != nil {
    return "", "", "", "", err
  }

  if len(resp.Messages) == 0 {
    return "", "", "", "", fmt.Errorf("no messages")
  }

  // receive one message
  msg := resp.Messages[0]
  // create ReceiptHandle to return to the delete function
  receiptHandle = *msg.ReceiptHandle

  raw := aws.ToString(msg.Body)
  fmt.Println("SQS RAW:", raw)

  // skip AWS test event
  if strings.Contains(raw, `"Event":"s3:TestEvent"`) {
    fmt.Println("Skipping AWS S3 test event")
    return "", "", "", queueURL, fmt.Errorf("test event skipped")
  }

  // parse S3 event message
  var event S3Event
  if err := json.Unmarshal([]byte(raw), &event); err != nil {
    return "", "", "", "", err
  }

  bucket = event.Records[0].S3.Bucket.Name
  key = event.Records[0].S3.Object.Key

  return bucket, key, receiptHandle, queueURL, nil
}

Функція DeleteFromSQS()

І вже додаємо видалення, будемо викликати після успішної обробки логів:

// DeleteFromSQS deletes message after successful processing
func DeleteFromSQS(ctx context.Context, client *sqs.Client, queueURL, receiptHandle string) error {
  _, err := client.DeleteMessage(ctx, &sqs.DeleteMessageInput{
    QueueUrl:      aws.String(queueURL),
    ReceiptHandle: aws.String(receiptHandle),
  })
  return err
}

Файл collector/s3.go та функція GetS3Object()

package collector

import (
  "context"

  "github.com/aws/aws-sdk-go-v2/aws"
  "github.com/aws/aws-sdk-go-v2/service/s3"
)

// GetS3Object returns stream reader of S3 file
func GetS3Object(ctx context.Context, client *s3.Client, bucket, key string) (*s3.GetObjectOutput, error) {
  return client.GetObject(ctx, &s3.GetObjectInput{
    Bucket: aws.String(bucket),
    Key:    aws.String(key),
  })
}

Файл collector/gzip.go та функція GzipReader()

Виносимо теж окремо, аби весь код був більш логічним.

Крім того, потім, можливо, потрібно буде додати якісь перевірки:

package collector

import (
  "compress/gzip"
  "io"
)

// GzipReader wraps S3 body
func GzipReader(r io.Reader) (*gzip.Reader, error) {
  return gzip.NewReader(r)
}

Файл collector/scan.go та функція ScanLines()

package collector

import (
  "bufio"
  "fmt"
  "io"
)

// ScanLines reads log lines from reader
func ScanLines(r io.Reader) error {
  scanner := bufio.NewScanner(r)

  // increase max line size
  buf := make([]byte, 0, 1024*1024)
  scanner.Buffer(buf, 1024*1024)

  // iterate over every line in the decompressed file
  for scanner.Scan() {
    line := scanner.Text()
    fmt.Println("log line:", line)
  }

  return scanner.Err()
}

Зміни в main.go

Додаємо імпорт нашого пакету collector, і викликаємо нові функції.

Тепер весь файл main.go виглядає так:

package main

import (
  "alb-logs-collector-poc/collector"
  "context"
  "fmt"
  "log"
  "os"

  "github.com/aws/aws-sdk-go-v2/config"
  "github.com/aws/aws-sdk-go-v2/service/s3"
  "github.com/aws/aws-sdk-go-v2/service/sqs"
)

func main() {
  // TODO: add exit handler
  ctx := context.Background()

  // Load the Shared AWS Configuration (~/.aws/config)
  cfg, err := config.LoadDefaultConfig(ctx)
  if err != nil {
    log.Fatal(err)
  }

  // create SQS client using the shared AWS config
  sqsClient := sqs.NewFromConfig(cfg)
  // create S3 client using the shared AWS config
  s3Client := s3.NewFromConfig(cfg)

  // read queue name from environment
  queueName := os.Getenv("ALB_LOGS_QUEUE")
  if queueName == "" {
    log.Fatal("environment variable ALB_LOGS_QUEUE is not set")
  }

  // 1. get message
  //bucket, key, receiptHandle, queueURL, err := collector.ReceiveFromSQS(ctx, sqsClient, queue)
  bucket, key, _, _, err := collector.ReceiveFromSQS(ctx, sqsClient, queueName)
  if err != nil {
    if err.Error() == "no messages" {
      fmt.Println("NO MESSAGES")
      return
    }
    fmt.Println("ERROR receiving message:", err)
    return
  }

  fmt.Println("BUCKET:", bucket)
  fmt.Println("KEY:", key)

  // 2. get S3 object
  s3Obj, err := collector.GetS3Object(ctx, s3Client, bucket, key)
  if err != nil {
    fmt.Println("S3 error:", err)
    return
  }
  defer s3Obj.Body.Close()

  fmt.Println("S3 object stream opened:", bucket, key)

  // 3. gzip reader
  gzReader, err := collector.GzipReader(s3Obj.Body)
  if err != nil {
    fmt.Println("gzip error:", err)
    return
  }
  defer gzReader.Close()

  // 4. scan all lines in log file
  err = collector.ScanLines(gzReader)
  if err != nil {
    log.Fatal("scanner error:", err)
  }
}

Перевіряємо:

$ go run main.go
SQS RAW: {"Records":[{"eventVersion":"2.1","eventSource":"aws:s3","awsRegion":"us-east-1","eventTime":"2025-11-25T11:15:01.559Z","eventName":"ObjectCreated:Put","userIdentity":{"principalId":"AWS:***.***:elblogdelivery-session"},"requestParameters":{"sourceIPAddress":"2600:***.***:c0ef"},"responseElements":{"x-amz-request-id":"Y6523DEXMF2DS6FG","x-amz-id-2":"tWgdxqzRbKrjUGDMLLkrOtKQW6S6aWT31VHomgaNm0UAIlzeshheXEGgZN3yRH4pMlWdzfBLqBlZuh3BO0QghDkTVU2WllFDKmh22/B1Cqk="},"s3":{"s3SchemaVersion":"1.0","configurationId":"objectCreatedEventSqs","bucket":{"name":"ops-1-33-devops-ingress-ops-alb-loki-logs","ownerIdentity":{"principalId":"***.***"},"arn":"arn:aws:s3:::ops-1-33-devops-ingress-ops-alb-loki-logs"},"object":{"key":"AWSLogs/492***148/elasticloadbalancing/us-east-1/2025/11/25/492***148_elasticloadbalancing_us-east-1_app.k8s-ops133externalalb-***.336cddd33c043f33_20251125T1115Z_34***.***.15_44qtqgut.log.gz","size":17823,"eTag":"36d1243edd2a7a98546e7af645c36068","sequencer":"0069258FB5806A2506"}}}]}
BUCKET: ops-1-33-devops-ingress-ops-alb-loki-logs
KEY: AWSLogs/492***148/elasticloadbalancing/us-east-1/2025/11/25/492***148_elasticloadbalancing_us-east-1_app.k8s-ops133externalalb-***.336cddd33c043f33_20251125T1115Z_34.225.155.15_44qtqgut.log.gz
S3 object stream opened: ops-1-33-devops-ingress-ops-alb-loki-logs AWSLogs/492***148/elasticloadbalancing/us-east-1/2025/11/25/492***148_elasticloadbalancing_us-east-1_app.k8s-ops133externalalb-***.336cddd33c043f33_20251125T1115Z_34.***.***.15_44qtqgut.log.gz
log line: h2 2025-11-25T11:10:06.230595Z app/k8s-ops133externalalb-***/336cddd33c043f33 62.***.***.83:32026 10.0.44.225:8000 0.000 0.034 0.000 200 200 887 1165 "GET https://staging.api.challenge.example.co:443/admin/users/list?limit=1&user=test_thread_1_ci_ios_ui_participant2%40test.example.co HTTP/2.0" "Challenge App UI Tests-Runner/1.0 (com.challengeapp.uitests.Challenge-App-UI-Tests.xctrunner; build:1; iOS 18.5.0) Alamofire/5.9.0" ECDHE-RSA-AES128-GCM-SHA256 TLSv1.2 arn:aws:elasticloadbalancing:us-east-1:492***148:targetgroup/k8s-stagingb-backenda-410aa6288b/0d639f3b859bc8fb "Root=1-69258e8e-04c4384c05934a37738043b0" "staging.api.challenge.example.co" "arn:aws:acm:us-east-1:492***148:certificate/c3b6ec41-50a0-488e-93bb-b03967405f8c" 24 2025-11-25T11:10:06.195000Z "forward" "-" "-" "10.0.44.225:8000" "200" "-" "-" TID_d1e2c09f4fe6cf46b7c573f42b97889d "-" "-" "-"
...

Запис логів до VictoriaLogs

VictoriaLogs підтримує різні формати запису, ми будемо робити через JSON stream API.

Спершу спробуємо руками.

Відкриваємо порт до VictoriaLogs Kubernetes Service:

$ kk -n ops-monitoring-ns port-forward svc/atlas-victoriametrics-victoria-logs-single-server 9428

І робимо запит з curl:

$ echo '{ "log": { "level": "info", "message": "TEST" }, "date": "0", "stream": "alb" }
' | curl -X POST -H 'Content-Type: application/stream+json' --data-binary @- \
 'http://localhost:9428/insert/jsonline?_stream_fields=stream&_time_field=date&_msg_field=log.message'

Перевіряємо:

Створення файлу collector/victoria.go

Створюємо мінімальний файл для тесту.

В ньому:

  • JSONLogRecord struct: тут будемо формувати JSON для передачі на VictoriaLogs HTTP API
  • заповнюємо його даними, в поле Timestamp можна передавати “Unix timestamp in seconds, milliseconds, microseconds or nanoseconds” – ми робимо в time.UnixMilli()
  • і задаємо значення для полів stream та message
  • формуємо HTTP request
  • відправляємо стандартним http.Client.Do()
package collector

import (
  "bytes"
  "encoding/json"
  "fmt"
  "net/http"
  "time"
)

// JSONLogRecord is the minimal structure for VictoriaLogs JSONLine API
type JSONLogRecord struct {
  Timestamp int64  `json:"date"`
  Stream    string `json:"stream"`
  Message   string `json:"message"`
}

// SendTestRecord sends simple test log record to VictoriaLogs
func SendTestRecord(url string) error {
  rec := JSONLogRecord{
    Timestamp: time.Now().UTC().UnixMilli(),
    Stream:    "test",
    Message:   "TEST VICTORIA",
  }

  body, err := json.Marshal(rec)
  if err != nil {
    return fmt.Errorf("marshal error: %w", err)
  }

  req, err := http.NewRequest("POST", url, bytes.NewBuffer(body))
  if err != nil {
    return fmt.Errorf("request error: %w", err)
  }

  req.Header.Set("Content-Type", "application/stream+json")

  resp, err := http.DefaultClient.Do(req)
  if err != nil {
    return fmt.Errorf("http error: %w", err)
  }
  defer resp.Body.Close()

  if resp.StatusCode != 200 {
    return fmt.Errorf("victoria returned %d", resp.StatusCode)
  }

  fmt.Println("TEST RECORD SENT TO VICTORIA")
  return nil
}

До main.go додаємо отримання VictoriaLogs ендпоінта зі змінних оточення і з ним будуємо повний URL, в якому вказуємо які поля треба вважати як _msg, де буде _time, в якому форматі час, і за яким полем створювати Log stream:

...
  // "http://localhost:9428/insert/jsonline"
  vmLogsEp := os.Getenv("VICTORIA_LOGS_URL")
  if vmLogsEp == "" {
    log.Fatal("environment variable VICTORIA_LOGS_URL is not set")
  }

  // VictoriaLogs endpoint
  vmLogsURL := vmLogsEp +
    "?_msg_field=message" +
    "&_time_field=date" +
    "&_time_format=unix_ms" +
    "&_stream_fields=stream"

  // 5. send test record to VictoriaLogs
  err = collector.SendVlTestRecord(vmLogsURL)
  if err != nil {
    fmt.Println("ERROR sending test record:", err)
    return
  }
...

Задаємо змінну:

$ export VICTORIA_LOGS_URL="http://localhost:9428/insert/jsonline"

Запсукаємо:

$ go run main.go 
...
TEST RECORD SENT TO VICTORIA

Перервіряємо:

ОК, працює.

Створення Log parser

Тепер оновлюємо наш код – додаємо нові функції:

  • collector/scan.go буде читати дані від gzip і писати в канал у вигляді готових strings
  • collector/parser.go – формує SimpleLog з полями Timestamp і Message

Редагуємо scan.go – замість створення буферу створюємо channel, і scanner.Text() тепер буде писати в канал замість буферу:

package collector

import (
  "bufio"
  "io"
)

// ScanLines reads lines from an io.Reader and sends them into a channel
// caller must range over the returned channel
func ScanLines(r io.Reader) <-chan string {
  ch := make(chan string)

  go func() {
    defer close(ch)

    scanner := bufio.NewScanner(r)

    // - bufio.Scanner reads decompressed bytes from GzipReader()
    // - it splits input by '\n' and returns each line as a complete string
    // - each line is sent into the channel for further processing
    for scanner.Scan() {
      ch <- scanner.Text()
    }
  }()

  return ch
}

Створюємо parser.go зі структурою SimpleLog.

В структурі будемо тримати час, отриманий із запису в log record, а в Message будемо заносити весь текст:

package collector

import (
  "fmt"
  "strings"
  "time"
)

// SimpleLog contains parsed timestamp and original line
type SimpleLog struct {
  Timestamp time.Time
  Message   string
}

// ParseRawLine parses ALB log timestamp from the 2nd field.
// Everything else we keep raw.
func ParseRawLine(line string) (*SimpleLog, error) {
  fields := strings.Fields(line)
  if len(fields) < 2 {
    return nil, fmt.Errorf("invalid ALB log line")
  }

  fmt.Println("--- PARSER SEND ---")
  fmt.Println(line)
  fmt.Println("--- PARSER DEBUG ---")

  ts, err := time.Parse(time.RFC3339Nano, fields[1])
  if err != nil {
    return nil, fmt.Errorf("timestamp parse error: %w", err)
  }

  return &SimpleLog{
    Timestamp: ts.UTC(),
    Message:   line,
  }, nil
}

Оновлюємо victoria.go – тепер не вона буде заповнювати дані у JSONLogRecord, а буде приймати їх аргументом:

package collector

import (
  "bytes"
  "encoding/json"
  "fmt"
  "net/http"
  "time"
)

// JSONLogRecord is the minimal structure for VictoriaLogs JSONLine API
type JSONLogRecord struct {
  Timestamp int64  `json:"date"`
  Stream    string `json:"stream"`
  Message   string `json:"message"`
}

// SendToVictoria sends one JSON LINE into VictoriaLogs JSON Stream API.
func SendToVictoria(url string, rec *JSONLogRecord) error {
  body, err := json.Marshal(rec)
  if err != nil {
    return err
  }

  fmt.Println("--- VMLOGS SEND ---")
  fmt.Println(string(body))
  fmt.Println("--- VMLOGS DEBUG ---")

  req, err := http.NewRequest("POST", url, bytes.NewReader(body))
  if err != nil {
    return err
  }

  req.Header.Set("Content-Type", "application/stream+json")

  resp, err := http.DefaultClient.Do(req)
  if err != nil {
    return err
  }
  defer resp.Body.Close()

  if resp.StatusCode >= 300 {
    return fmt.Errorf("victoria response status: %s", resp.Status)
  }

  return nil
}

В main.go() робимо цикл і вже додаємо видалення повідомлень з SQS викликом DeleteFromSQS(), яку створювали раніше:

...
  // 4. Pipeline: read lines → parse → send
  for line := range collector.ScanLines(gzReader) {

    rec, err := collector.ParseRawLine(line)
    if err != nil {
      continue
    }

    out := &collector.JSONLogRecord{
      Timestamp: rec.Timestamp.UnixMilli(),
      Stream:    "alb",
      Message:   rec.Message,
    }

    if err := collector.SendToVictoria(vmLogsURL, out); err != nil {
      fmt.Println("send error:", err)
    }
  }

  // 5. Delete SQS message
  if err := collector.DeleteFromSQS(ctx, sqsClient, queueURL, receiptHandle); err != nil {
    fmt.Println("delete error:", err)
  }
}

Поки collector.ScanLines() повертає дані – передаємо їх до ParseRawLine(), який заповнює SimpleLog з Timestamp і Message.

Потім заповнюємо JSONLogRecord і передаємо до VictoriaLogs.

Зараз можна очистити SQS-чергу: поки я писав код, там назбирались старі повідомлення, і експортер почав тягнути старі логи. Я довго шукав, чому у VictoriaLogs час не збігається з очікуваним, але проблема виявилась банальною – я дивився дані за останні 15 хвилин, а імпортувались ранкові записи.

Але тоді доведеться почекати до 5 хвилин, поки з’явиться новий меседж.

Перевіряємо:

$ go run main.go 
...
--- PARSER SEND ---
https 2025-11-25T12:36:50.859909Z app/k8s-ops133externalalb-***/336cddd33c043f33 3.***.***.78:3883 10.0.46.229:8000 0.000 0.010 0.000 200 200 2023 574 "POST https://api.challenge.example.co:443/auth/auth0-webhooks/post-login HTTP/1.1" "axios/1.6.5" ECDHE-RSA-AES128-GCM-SHA256 TLSv1.2 arn:aws:elasticloadbalancing:us-east-1:492***148:targetgroup/k8s-prodback-backenda-47ba3e0f35/9ec763ecd48352da "Root=1-6925a2e2-6e4507dd3a23de072c2f6ae9" "api.challenge.example.co" "arn:aws:acm:us-east-1:492***148:certificate/beeb3714-511e-414b-b1f3-5440746bb5ea" 12 2025-11-25T12:36:50.837000Z "forward" "-" "-" "10.0.46.229:8000" "200" "-" "-" TID_5ed365cd6c6f57409a2566d1dcaf049c "-" "-" "-"
--- PARSER DEBUG ---
--- VMLOGS SEND ---
{"date":1764074210859,"stream":"alb","message":"https 2025-11-25T12:36:50.859909Z app/k8s-ops133externalalb-***/336cddd33c043f33 3.***.***.78:3883 10.0.46.229:8000 0.000 0.010 0.000 200 200 2023 574 \"POST https://api.challenge.example.co:443/auth/auth0-webhooks/post-login HTTP/1.1\" \"axios/1.6.5\" ECDHE-RSA-AES128-GCM-SHA256 TLSv1.2 arn:aws:elasticloadbalancing:us-east-1:492***148:targetgroup/k8s-prodback-backenda-47ba3e0f35/9ec763ecd48352da \"Root=1-6925a2e2-6e4507dd3a23de072c2f6ae9\" \"api.challenge.example.co\" \"arn:aws:acm:us-east-1:492***148:certificate/beeb3714-511e-414b-b1f3-5440746bb5ea\" 12 2025-11-25T12:36:50.837000Z \"forward\" \"-\" \"-\" \"10.0.46.229:8000\" \"200\" \"-\" \"-\" TID_5ed365cd6c6f57409a2566d1dcaf049c \"-\" \"-\" \"-\""}
--- VMLOGS DEBUG ---
...

І у VictoriaLogs:

Все тут.

Що нам залишилось:

  • формувати поля
  • додати gocron

Запуск циклу з gocron

Додаємо запуск кожну хвилину з gocron.

Тепер весь main.go виглядає так:

func main() {
  // TODO: add exit handler
  ctx := context.Background()

  // Load the Shared AWS Configuration (~/.aws/config)
  cfg, err := config.LoadDefaultConfig(ctx)
  if err != nil {
    log.Fatal(err)
  }

  // create SQS client using the shared AWS config
  sqsClient := sqs.NewFromConfig(cfg)
  // create S3 client using the shared AWS config
  s3Client := s3.NewFromConfig(cfg)

  // read queue name from environment
  queueName := os.Getenv("ALB_LOGS_QUEUE")
  if queueName == "" {
    log.Fatal("environment variable ALB_LOGS_QUEUE is not set")
  }

  // scheduler
  s := gocron.NewScheduler(time.UTC)

  // job: check SQS every minute
  s.Every(1).Minute().Do(func() {
    fmt.Println("CHECKING SQS...")

    // 1. get message
    bucket, key, receiptHandle, queueURL, err := collector.ReceiveFromSQS(ctx, sqsClient, queueName)
    //bucket, key, _, _, err := collector.ReceiveFromSQS(ctx, sqsClient, queueName)
    if err != nil {
      if err.Error() == "no messages" {
        fmt.Println("NO MESSAGES")
        return
      }
      fmt.Println("ERROR receiving message:", err)
      return
    }

    fmt.Println("BUCKET:", bucket)
    fmt.Println("KEY:", key)

    // 2. get S3 object
    s3Obj, err := collector.GetS3Object(ctx, s3Client, bucket, key)
    if err != nil {
      fmt.Println("S3 error:", err)
      return
    }
    defer s3Obj.Body.Close()

    fmt.Println("S3 object stream opened:", bucket, key)

    // 3. gzip reader
    gzReader, err := collector.GzipReader(s3Obj.Body)
    if err != nil {
      fmt.Println("gzip error:", err)
      return
    }
    defer gzReader.Close()

    // "http://localhost:9428/insert/jsonline"
    vmLogsEp := os.Getenv("VICTORIA_LOGS_URL")
    if vmLogsEp == "" {
      log.Fatal("environment variable VICTORIA_LOGS_URL is not set")
    }

    // VictoriaLogs endpoint
    vmLogsURL := vmLogsEp +
      "?_msg_field=message" +
      "&_time_field=date" +
      "&_time_format=unix_ms" +
      "&_stream_fields=stream"

    // 4. Pipeline: read lines → parse → send
    for line := range collector.ScanLines(gzReader) {

      rec, err := collector.ParseRawLine(line)
      if err != nil {
        continue
      }

      out := &collector.JSONLogRecord{
        Timestamp: rec.Timestamp.UnixMilli(),
        Stream:    "alb",
        Message:   rec.Message,
      }

      if err := collector.SendToVictoria(vmLogsURL, out); err != nil {
        fmt.Println("send error:", err)
      }
    }

    // 5. delete message from SQS
    if err := collector.DeleteFromSQS(ctx, sqsClient, queueURL, receiptHandle); err != nil {
      fmt.Println("FAILED TO DELETE SQS MESSAGE:", err)
    } else {
      fmt.Println("SQS MESSAGE DELETED")
    }
  })

  // start async
  s.StartBlocking()
}

Додавання fields до VictoriaLogs message 

Документація по полям AWS ALB – Access log entries.

Що зараз можемо додати до логів у VictoriaLogs – client_ip з client:port, target_ip (Kubernetes Pod) із target:port, elb_status_code – код відповіді ALB, аби потім простіше робити алерти, і target_status_code – аналогічно.

Оновлюємо структуру SimpleLog – додаємо нові поля:

type SimpleLog struct {
    Timestamp    time.Time
    Message      string
    ClientIP     string
    TargetIP     string
    ELBStatus    int
    TargetStatus int
}

До ParseRawLine() додаємо запис в ці поля, використовуючи array index зі строки fields:

...
    elbStatus, _ := strconv.Atoi(fields[8])
    targetStatus, _ := strconv.Atoi(fields[9])

    return &SimpleLog{
        Timestamp:    ts.UTC(),
        Message:      line,
        ClientIP:     fields[3],
        TargetIP:     fields[4],
        ELBStatus:    elbStatus,
        TargetStatus: targetStatus,
    }, nil
...

Додаємо ці поля до JSONLogRecord:

type JSONLogRecord struct {
    Timestamp    int64  `json:"date"`
    Message      string `json:"message"`
    Stream       string `json:"stream"`
    ClientIP     string `json:"client_ip"`
    TargetIP     string `json:"target_ip"`
    ELBStatus    int    `json:"elb_status"`
    TargetStatus int    `json:"target_status"`
}

І додаємо їх до main.go:

...
      out := &collector.JSONLogRecord{
        Timestamp:    rec.Timestamp.UnixMilli(),
        Message:      rec.Message,
        Stream:       "alb",
        ClientIP:     rec.ClientIP,
        TargetIP:     rec.TargetIP,
        ELBStatus:    rec.ELBStatus,
        TargetStatus: rec.TargetStatus,
      }
...

Тепер у VictoriaLogs маємо нові fields:

Порти в client_ip та target_ip тут явно зайві, можемо їх вирізати під час парсингу.

До parser.go додаємо функцію зі strings.SplitN():

// cut port from "ip:port"
func stripPort(s string) string {
  parts := strings.SplitN(s, ":", 2)
  return parts[0]
}

І використовуємо її при заповненні SimpleLog:

...
  return &SimpleLog{
    Timestamp:    ts.UTC(),
    Message:      line,
    ClientIP:     stripPort(fields[3]),
    TargetIP:     stripPort(fields[4]),
    ELBStatus:    elbStatus,
    TargetStatus: targetStatus,
  }, nil
...

Тепер маємо поля без зайвих портів:

The final result

Весь код разом.

Коменти попросив написати AI, щоб детально по всім функціям було пояснення.

Файл main.go

package main

import (
  "alb-logs-collector/collector"
  "context"
  "fmt"
  "log"
  "os"
  "time"

  "github.com/aws/aws-sdk-go-v2/config"
  "github.com/aws/aws-sdk-go-v2/service/s3"
  "github.com/aws/aws-sdk-go-v2/service/sqs"
  "github.com/go-co-op/gocron"
)

func main() {
  // create a base context for all AWS operations
  // this context is passed into SQS/S3 functions
  ctx := context.Background()

  // load shared AWS config from ~/.aws/config and ~/.aws/credentials
  // this provides region, credentials, retry settings, etc.
  cfg, err := config.LoadDefaultConfig(ctx)
  if err != nil {
    log.Fatal(err)
  }

  // create SQS client using the shared AWS config
  // used later for reading and deleting queue messages
  sqsClient := sqs.NewFromConfig(cfg)

  // create S3 client for reading S3 objects
  s3Client := s3.NewFromConfig(cfg)

  // read queue name from environment
  // this avoids hardcoding queue names in code
  queueName := os.Getenv("ALB_LOGS_QUEUE")
  if queueName == "" {
    log.Fatal("environment variable ALB_LOGS_QUEUE is not set")
  }

  // create the scheduler that runs periodic jobs
  // gocron automatically creates goroutines for scheduled tasks
  s := gocron.NewScheduler(time.UTC)

  // schedule: run the function every minute
  s.Every(1).Minute().Do(func() {
    fmt.Println("CHECKING SQS...")

    // 1. read one message from SQS
    // ReceiveFromSQS is implemented in collector/sqs.go
    bucket, key, receiptHandle, queueURL, err := collector.ReceiveFromSQS(ctx, sqsClient, queueName)
    if err != nil {
      // "no messages" is not an error — just no new logs
      if err.Error() == "no messages" {
        fmt.Println("NO MESSAGES")
        return
      }
      fmt.Println("ERROR receiving message:", err)
      return
    }

    fmt.Println("BUCKET:", bucket)
    fmt.Println("KEY:", key)

    // 2. download the S3 object stream
    // GetS3Object located in collector/s3.go
    s3Obj, err := collector.GetS3Object(ctx, s3Client, bucket, key)
    if err != nil {
      fmt.Println("S3 error:", err)
      return
    }
    defer s3Obj.Body.Close()

    fmt.Println("S3 object stream opened:", bucket, key)

    // 3. wrap the S3 stream into gzip reader
    // GzipReader implemented in collector/gzip.go
    gzReader, err := collector.GzipReader(s3Obj.Body)
    if err != nil {
      fmt.Println("gzip error:", err)
      return
    }
    defer gzReader.Close()

    // read VictoriaLogs endpoint
    vmLogsEp := os.Getenv("VICTORIA_LOGS_URL")
    if vmLogsEp == "" {
      log.Fatal("environment variable VICTORIA_LOGS_URL is not set")
    }

    // final URL with parameters for jsonline ingestion
    vmLogsURL := vmLogsEp +
      "?_msg_field=message" + // which JSON field contains the log message
      "&_time_field=date" + // which JSON field contains timestamp
      "&_time_format=unix_ms" + // tell VictoriaLogs that the timestamp is unix milliseconds
      "&_stream_fields=stream" // which field defines the stream name

    // 4. process the log file line-by-line
    // ScanLines implemented in collector/scan.go
    // It returns a channel of RAW log lines (already ungzipped)
    for line := range collector.ScanLines(gzReader) {

      // parse timestamp + extract fields
      // ParseRawLine implemented in collector/parser.go
      rec, err := collector.ParseRawLine(line)
      if err != nil {
        continue // skip invalid ALB lines
      }

      // prepare the JSON record for VictoriaLogs
      // JSONLogRecord defined in collector/victoria.go
      out := &collector.JSONLogRecord{
        Timestamp:    rec.Timestamp.UnixMilli(), // ALB timestamp in unix ms
        Message:      rec.Message,               // full raw ALB log line
        Stream:       "alb",                     // log stream name
        ClientIP:     rec.ClientIP,              // extracted from ALB log
        TargetIP:     rec.TargetIP,              // extracted from ALB log
        ELBStatus:    rec.ELBStatus,             // HTTP status from ALB
        TargetStatus: rec.TargetStatus,          // backend response status
      }

      // send it to VictoriaLogs
      // SendToVictoria implemented in collector/victoria.go
      if err := collector.SendToVictoria(vmLogsURL, out); err != nil {
        fmt.Println("send error:", err)
      }
    }

    // 5. delete message from SQS after successful processing
    // DeleteFromSQS implemented in collector/sqs.go
    if err := collector.DeleteFromSQS(ctx, sqsClient, queueURL, receiptHandle); err != nil {
      fmt.Println("FAILED TO DELETE SQS MESSAGE:", err)
    } else {
      fmt.Println("SQS MESSAGE DELETED")
    }
  })

  // start the scheduler and block main goroutine forever
  s.StartBlocking()
}

Файл sqs.go

package collector

import (
  "context"
  "encoding/json"
  "fmt"
  "strings"

  "github.com/aws/aws-sdk-go-v2/aws"
  "github.com/aws/aws-sdk-go-v2/service/sqs"
)

// S3Event describes the JSON format sent by S3 to SQS.
// It matches the structure of the S3 event notification:
// "Records" → "s3" → "bucket.name" and "object.key".
// This struct is used in ReceiveFromSQS() to extract bucket/key.
type S3Event struct {
  Records []struct {
    S3 struct {
      Bucket struct {
        Name string `json:"name"` // S3 bucketName
      } `json:"bucket"`
      Object struct {
        Key string `json:"key"` // S3 object key (log filename)
      } `json:"object"`
    } `json:"s3"`
  } `json:"Records"`
}

// ReceiveFromSQS reads a single SQS message.
// It returns:
// - bucket: the S3 bucket from event
// - key: the object key (filename in S3)
// - receiptHandle: required later to delete the message
// - queueURL: actual AWS queue URL (needed for deletion)
// - err: error, or "no messages" if queue is empty
//
// This function is called from main.go inside the scheduler loop.
// After processing the file from S3, the caller must call DeleteFromSQS().
func ReceiveFromSQS(ctx context.Context, client *sqs.Client, queueName string) (bucket, key, receiptHandle, queueURL string, err error) {

  // resolve real SQS queue URL from queue name
  // SQS APIs always operate on queueURL, not the name
  getURL, err := client.GetQueueUrl(ctx, &sqs.GetQueueUrlInput{
    QueueName: aws.String(queueName),
  })
  if err != nil {
    return "", "", "", "", err
  }

  queueURL = *getURL.QueueUrl

  // receive exactly one message
  // WaitTimeSeconds enables long polling for up to 10s
  resp, err := client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
    QueueUrl:            aws.String(queueURL),
    MaxNumberOfMessages: 1,
    WaitTimeSeconds:     10,
  })
  if err != nil {
    return "", "", "", "", err
  }

  // queue empty → no new logs
  if len(resp.Messages) == 0 {
    return "", "", "", "", fmt.Errorf("no messages")
  }

  // take the first (and only) message
  msg := resp.Messages[0]

  // ReceiptHandle is mandatory for deletion
  receiptHandle = *msg.ReceiptHandle

  // raw JSON body received from S3 → SQS notification
  raw := aws.ToString(msg.Body)
  fmt.Println("SQS RAW:", raw)

  // filter out AWS TestEvent generated when enabling notifications
  // Test events do NOT contain real logs and must be ignored
  if strings.Contains(raw, `"Event":"s3:TestEvent"`) {
    fmt.Println("Skipping AWS S3 test event")
    return "", "", "", queueURL, fmt.Errorf("test event skipped")
  }

  // unmarshal JSON into S3Event struct
  var event S3Event
  if err := json.Unmarshal([]byte(raw), &event); err != nil {
    return "", "", "", "", err
  }

  // extract bucket and key from the event
  bucket = event.Records[0].S3.Bucket.Name
  key = event.Records[0].S3.Object.Key

  return bucket, key, receiptHandle, queueURL, nil
}

// DeleteFromSQS permanently removes the processed message.
// Must be called only after successful S3 → gzip → parsing → VictoriaLogs ingestion.
// If not deleted, SQS will retry delivery (visibility timeout expires).
// Implemented in collector/sqs.go, called from main.go.
func DeleteFromSQS(ctx context.Context, client *sqs.Client, queueURL, receiptHandle string) error {
  _, err := client.DeleteMessage(ctx, &sqs.DeleteMessageInput{
    QueueUrl:      aws.String(queueURL),
    ReceiptHandle: aws.String(receiptHandle),
  })
  return err
}

Файл s3.go

package collector

import (
  "context"

  "github.com/aws/aws-sdk-go-v2/aws"
  "github.com/aws/aws-sdk-go-v2/service/s3"
)

// GetS3Object retrieves an S3 object and returns a streaming reader.
//
// This function does NOT download the file to disk.
// It returns a network stream (`GetObjectOutput.Body`), which allows the caller
// to read the object lazily, byte-by-byte, directly from S3.
//
// In our pipeline, the call chain looks like:
//
//	main.go
//	  → ReceiveFromSQS() to get bucket/key      (collector/sqs.go)
//	  → GetS3Object() to open S3 object stream  (this file)
//	  → GzipReader() to decompress gzip data    (collector/gzip.go)
//	  → ScanLines() to iterate log lines        (collector/scan.go)
//
// Notes:
// - The returned object must be closed by the caller: `defer obj.Body.Close()`
// - S3 GetObject supports range requests, but here we read the whole file.
// - The body is read sequentially; this is efficient for log ingestion patterns.
func GetS3Object(ctx context.Context, client *s3.Client, bucket, key string) (*s3.GetObjectOutput, error) {
  return client.GetObject(ctx, &s3.GetObjectInput{
    Bucket: aws.String(bucket), // name of S3 bucket with ALB logs
    Key:    aws.String(key),    // key (path/filename) of the gzip log file
  })
}

Файл gzip.go

package collector

import (
  "compress/gzip"
  "io"
)

// GzipReader wraps an existing io.Reader with a gzip decompressor.
//
// This function takes the S3 object body (returned by GetS3Object in
// collector/s3.go) and turns it into a gzip.Reader capable of producing
// the decompressed content of the ALB log file.
//
// Typical pipeline:
//
//	S3Object.Body (io.ReadCloser)
//	    ↓ passed into
//	GzipReader() → *gzip.Reader (still a stream)
//	    ↓ passed into
//	ScanLines() → yields plain-text log lines
//
// Notes:
// - gzip.NewReader expects the input stream to be in .gz format.
// - Caller must close the returned reader: `defer gz.Close()`.
// - No buffering or scanning is done here; this function only wraps the stream.
func GzipReader(r io.Reader) (*gzip.Reader, error) {
  return gzip.NewReader(r)
}

Файл scan.go

package collector

import (
  "bufio"
  "io"
)

// ScanLines reads an arbitrary io.Reader line-by-line and returns a channel
// that produces each extracted line as a string.
//
// This function is used in main.go to iterate through the contents of an S3
// Gzip file. The call chain is:
//
//	main.go → GetS3Object() (collector/s3.go)
//	        → GzipReader() (collector/gzip.go)
//	        → ScanLines()  (this file)
//
// Because ScanLines() uses a goroutine, the caller can process lines
// asynchronously using: `for line := range ScanLines(r) { ... }`.
//
// Notes:
//   - The returned channel is *unbuffered*, so each send blocks until the caller
//     receives the value. This naturally rate-controls the goroutine.
//   - bufio.Scanner splits input by '\n', automatically handling different line
//     lengths (up to scanner’s max buffer).
//   - When the reader is fully consumed, the goroutine closes the channel.
func ScanLines(r io.Reader) <-chan string {
  ch := make(chan string)

  // launch a goroutine that streams lines out of the reader
  go func() {
    // ensure channel is closed when scanning finishes
    defer close(ch)

    // bufio.Scanner provides efficient, line-oriented reading of text streams
    scanner := bufio.NewScanner(r)

    // loop until EOF or error. Each iteration reads the next line
    for scanner.Scan() {
      // push extracted line into the channel
      // (blocks until caller receives it)
      ch <- scanner.Text()
    }

    // scanner.Err() is intentionally ignored here, because error handling
    // is performed by the caller when needed, and the channel-based design
    // treats EOF as natural termination.
  }()

  // return read-only channel of strings
  return ch
}

Файл parser.go

package collector

import (
  "fmt"
  "strconv"
  "strings"
  "time"
)

// SimpleLog represents a minimal, lightweight structure containing:
//
// - Timestamp: parsed ALB timestamp in UTC
// - Message:   the full raw log line (used as message in VictoriaLogs)
// - ClientIP:  extracted client IP (port removed)
// - TargetIP:  extracted backend target IP (port removed)
// - ELBStatus: HTTP status returned by ALB to the client
// - TargetStatus: HTTP status returned by the backend to ALB
//
// This struct is designed for the simplified ingestion phase,
// before implementing full ALB field parsing.
type SimpleLog struct {
  Timestamp    time.Time
  Message      string
  ClientIP     string
  TargetIP     string
  ELBStatus    int
  TargetStatus int
}

// stripPort removes the ":port" suffix from IP strings like "1.2.3.4:5678".
// This keeps VictoriaLogs cardinality low, avoiding creation of thousands
// of separate series due to ephemeral client ports.
func stripPort(s string) string {
  parts := strings.SplitN(s, ":", 2)
  return parts[0]
}

// ParseRawLine parses the essential ALB log fields:
//
// - Timestamp from the 2nd field
// - Client IP (without port)
// - Target IP (without port)
// - ALB HTTP status
// - Target HTTP status
//
// Everything else is kept untouched in the Message field.
//
// ALB logs are space-delimited, except for quoted sections
// (like the request line and user agent). At this simplified stage,
// we do *not* parse quoted fields — we only extract the mandatory parts.
func ParseRawLine(line string) (*SimpleLog, error) {
  fields := strings.Fields(line)

  // we expect at least: protocol, timestamp, elb, client, target, ... status codes
  // ALB format is consistent — <10 fields means corrupted input
  if len(fields) < 10 {
    return nil, fmt.Errorf("invalid ALB log line")
  }

  fmt.Println("--- PARSER SEND ---")
  fmt.Println(line)
  fmt.Println("--- PARSER DEBUG ---")

  // parse timestamp in RFC3339Nano format
  // ALB always emits timestamps in UTC
  ts, err := time.Parse(time.RFC3339Nano, fields[1])
  if err != nil {
    return nil, fmt.Errorf("timestamp parse error: %w", err)
  }

  // convert status codes from string → int
  elbStatus, _ := strconv.Atoi(fields[8])
  targetStatus, _ := strconv.Atoi(fields[9])

  return &SimpleLog{
    Timestamp:    ts.UTC(), // ensure strict UTC normalization
    Message:      line,     // pass full raw line to VictoriaLogs
    ClientIP:     stripPort(fields[3]),
    TargetIP:     stripPort(fields[4]),
    ELBStatus:    elbStatus,
    TargetStatus: targetStatus,
  }, nil
}

Файл victoria.go

package collector

import (
  "bytes"
  "encoding/json"
  "fmt"
  "net/http"
)

// JSONLogRecord represents a single log entry formatted for
// VictoriaLogs JSONLine ingestion API.
//
// Field mapping:
//
//   - Timestamp → "date"
//     UNIX milliseconds timestamp of the log event.
//     This must match ?_time_field=date&_time_format=unix_ms in ingestion URL.
//
//   - Message → "message"
//     The complete raw ALB log line. Used as the primary log message.
//
//   - Stream → "stream"
//     Logical log stream identifier. Used in ingestion via ?_stream_fields=stream.
//
//   - ClientIP / TargetIP / ELBStatus / TargetStatus
//     Additional parsed metadata fields. These become searchable log fields.
//
// The structure intentionally avoids nested JSON — VictoriaLogs processes
// flat objects more efficiently and without ambiguity in field extraction.
type JSONLogRecord struct {
  Timestamp    int64  `json:"date"`
  Message      string `json:"message"`
  Stream       string `json:"stream"`
  ClientIP     string `json:"client_ip"`
  TargetIP     string `json:"target_ip"`
  ELBStatus    int    `json:"elb_status"`
  TargetStatus int    `json:"target_status"`
}

// SendToVictoria sends exactly ONE JSON record to the VictoriaLogs JSONLine API.
//
// This function is called from main.go inside the ingestion loop.
// It performs the following steps:
//
//  1. Marshal the JSONLogRecord into a compact JSON object
//  2. Build an HTTP POST request with Content-Type: application/stream+json
//  3. Send the request to VictoriaLogs
//  4. Check for non-2xx HTTP status codes
//
// Notes:
//   - VictoriaLogs expects a "streaming JSON" format, where each POST body
//     contains a single JSON object (or multiple lines if needed).
//   - We send logs one-by-one for simplicity, but batching can be added later.
//   - The caller controls the ingestion endpoint URL, including query params:
//     ?_msg_field=message&_time_field=date&_time_format=unix_ms&_stream_fields=stream
//
// Errors returned from this function are logged in main.go, and do not
// interrupt the ingestion pipeline.
func SendToVictoria(url string, rec *JSONLogRecord) error {
  // serialize record into JSON line
  body, err := json.Marshal(rec)
  if err != nil {
    return err
  }

  // debug output for transparency
  fmt.Println("--- VMLOGS SEND ---")
  fmt.Println(string(body))
  fmt.Println("--- VMLOGS DEBUG ---")

  // create POST request with streaming JSON payload
  req, err := http.NewRequest("POST", url, bytes.NewReader(body))
  if err != nil {
    return err
  }

  // JSONLine ingestion requires this content type
  req.Header.Set("Content-Type", "application/stream+json")

  // send HTTP request using default HTTP client
  resp, err := http.DefaultClient.Do(req)
  if err != nil {
    return err
  }
  defer resp.Body.Close()

  // check server-side errors
  if resp.StatusCode >= 300 {
    return fmt.Errorf("victoria response status: %s", resp.Status)
  }

  return nil
}

Вже запустив в Kubernetes, все працює.

Єдиний момент, що SQS та S3 операції не падають при access denied, треба додати перевірку помилок.