Наступна задача, яку хочеться вирішити з Go – це написати власний logs collector для збору логів AWS Load Balancer з AWS S3 і запису їх до VictoriaLogs.
Це, звісно, можна було б вирішити просто з Vector.dev, як це робив для AWS VPC Flow Logs, див. Vector.dev: знайомство, логи з AWS S3 та інтеграція з VictoriaLogs, але є можливість трохи попрактикуватись в Go, тому будемо робити власний колектор.
Тож основна ідея зараз така:
- Load Balancer пише логи до S3
- в S3 створюємо нотифікацію до AWS SQS
- наш колектор опитує SQS, отримує інформацію про нові об’єкти в S3
- робить запит до S3, отримує gz-архів
- розпаковує, парсить дані, і відправляє до VictoriaLogs
Поїхали.
Зміст
Налаштування S3 та SQS notifications
Коли продумував ідею, то головне питання було – як знати, які об’єкти в S3 ми вже обробили, а які ні?
Мати якусь базу, в яку писати інформацію про вже оброблені об’єкти – перший варіант. Але з часом записів про такі обєкти буде все більше, плюс не дуже хочеться тягнути якийсь stateful-сервіс в Kubernetes, де потім буде запускатись наш колектор.
Тому робимо простіше і надійніше, так само як це зроблено з Vector.dev та VPC Flow Logs – створимо SQS чергу, в яку будуть приходити повідомлення про нові S3 objects.
Читаємо повідомлення з SQS, отримуємо з них інформацію про нові файли, оброблюємо файли, видаляємо меседж із SQS queue.
Див. документацію AWS – Walkthrough: Configuring a bucket for notifications (SNS topic or SQS queue).
Створення SQS queue
Створюємо нову queue, поки руками, потім зробимо нормально, з Terraform.
Тип queue лишаємо Standart, бо:
Amazon Simple Queue Service FIFO (First-In-First-Out) queues aren’t supported as an Amazon S3 event notification destination
Див. Amazon S3 Event Notifications.
Крім того, нам порядок нам не важливий, бо в логах є власний timestamp, який ми і будемо парсити.
Переходимо до SQS, створюємо нову чергу:
Дал, в Access policy описуємо політику.
Писати в чергу у нас буде S3, а читати – Kubernetes Pod із ServiceAccount.
ServiceAccount і IAM Role для нього будемо робити вже потім, потім просто даємо право читати всім з нашого AWS Account.
А для S3 додаємо дозвіл на SQS:SendMessage:
{
"Version": "2012-10-17",
"Id": "loggerID",
"Statement": [
{
"Sid": "sendFromS3LogsAllow",
"Effect": "Allow",
"Principal": {
"Service": "s3.amazonaws.com"
},
"Action": [
"SQS:SendMessage"
],
"Resource": "arn:aws:sqs:us-east-1:492***148:testing-alb-logs-s3-notifier",
"Condition": {
"ArnLike": {
"aws:SourceArn": "arn:aws:s3:*:*:ops-1-33-devops-ingress-ops-alb-loki-logs"
}
}
},
{
"Sid": "receiveFromQueueAllowSameAccount",
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::492***148:root"
},
"Action": [
"SQS:ReceiveMessage",
"SQS:DeleteMessage",
"SQS:GetQueueAttributes",
"SQS:GetQueueUrl"
],
"Resource": "arn:aws:sqs:us-east-1:492***148:testing-alb-logs-s3-notifier"
}
]
}
Решту залишаємо дефолтним.
Налаштування S3 Event notifications
Переходимо до S3 з логами, Properties > Event notifications:
Створюємо новий Event notification – відправляти повідомлення про всі операції s3:ObjectCreated:
В Destination вибираємо SQS і нашу чергу:
Чекаємо кілька хвилин, перевіряємо Monitoring в SQS:
Окей – є повідомлення, тепер їх треба зібрати і прочитати.
Переходимо до Go.
AWS SDK for Go
Для роботи з AWS нам буде потрібен AWS SDK for Go, з яким можемо виконати всі потрібні операції.
Для отримання даних доступу використовуємо aws-sdk-go-v2/config і функцію LoadDefaultConfig(), яка виконує стандартний пошук credentials – в змінних оточення, файлах ~/.aws/credentials та ~/.aws/config, або використовує EC2 IAM Roles.
До LoadDefaultConfig() першим аргументом потрібно передати context, писав про нього у Golang: створення OpenAI Exporter для VictoriaMetrics, поки запускаємо з context.Background(), потім напишемо обробку і завершення роботи:
package main
import (
"context"
"log"
"github.com/aws/aws-sdk-go-v2/config"
)
func main() {
// TODO: add exit handler
ctx := context.Background()
// Load the Shared AWS Configuration (~/.aws/config)
cfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
log.Fatal(err)
}
}
Go AWS SDK SQS
Для роботи з SQS в AWS SDK є окремий пакет sqs.
SQS client
Додаємо його імпорт, додаємо створення клієнту з NewFromConfig(), якому передаємо AWS config, який створили вище.
Для роботи з SQS queue треба мати URL – отримуємо його з GetQueueUrl().
Додаємо отримання QueueName зі змінних оточення, бо потім в Kubernetes будемо передавати із Helm chart values:
package main
import (
"context"
"fmt"
"log"
"os"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/sqs"
)
func main() {
// TODO: add exit handler
ctx := context.Background()
// Load the Shared AWS Configuration (~/.aws/config)
cfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
log.Fatal(err)
}
// create SQS client using the shared AWS config
sqsClient := sqs.NewFromConfig(cfg)
// read queue name from environment
queueName := os.Getenv("ALB_LOGS_QUEUE")
if queueName == "" {
log.Fatal("environment variable ALB_LOGS_QUEUE is not set")
}
// request queue URL by name
getURLResp, err := sqsClient.GetQueueUrl(ctx, &sqs.GetQueueUrlInput{
QueueName: aws.String(queueName),
})
if err != nil {
log.Fatal(err)
}
queueURL := getURLResp.QueueUrl
fmt.Println("Queue URL:", *queueURL)
}
Виконуємо go mod init:
$ go mod init alb-logs-collector-poc $ go mod tidy
Задаємо змінну з іменем черги:
$ export ALB_LOGS_QUEUE="testing-alb-logs-s3-notifier"
Запускаємо наш код:
$ go run main.go Queue URL: https://sqs.us-east-1.amazonaws.com/492***148/testing-alb-logs-s3-notifier
ОК, тепер можемо отримати повідомлення.
SQS ReceiveMessage()
Читаємо нові меседжи в queue з ReceiveMessage(), куди передаємо конфіг ReceiveMessageInput:
...
// receive a single message from SQS
msgResp, err := sqsClient.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
QueueUrl: queueURL, // URL returned by the GetQueueUrl()
MaxNumberOfMessages: 1, // receive only one message
WaitTimeSeconds: 10, // enable long polling
})
if err != nil {
log.Fatal(err)
}
if len(msgResp.Messages) == 0 {
fmt.Println("no messages received")
return
}
// print the received message body
fmt.Println("Received message:", aws.ToString(msgResp.Messages[0].Body))
...
Перевіряємо:
$ go run main.go | jq
{
"Records": [
{
"eventVersion": "2.1",
"eventSource": "aws:s3",
...
"eventName": "ObjectCreated:Put",
...
"s3": {
...
"bucket": {
"name": "ops-1-33-devops-ingress-ops-alb-loki-logs",
...
"object": {
"key": "AWSLogs/492***148/elasticloadbalancing/us-east-1/2025/11/25/492***148_elasticloadbalancing_us-east-1_app.k8s-ops133externalalb-***.336cddd33c043f33_20251125T0935Z_34.***.***.15_25hsfvwt.log.gz",
...
}
}
}
]
}
Нас тут цікавлять два поля – bucket.name та object.key.
Створюємо struct і з json.Unmarshal() заносимо в неї дані, парсинг JSON розбирав в тому ж пості про OpenAI Exporter в частині Створення Go struct для JSON Unmarshall:
...
// define a struct to unmarshal S3 event message
type S3Event struct {
Records []struct {
S3 struct {
Bucket struct {
Name string `json:"name"`
} `json:"bucket"`
Object struct {
Key string `json:"key"`
} `json:"object"`
} `json:"s3"`
} `json:"Records"`
}
// take SQS message body
msgBody := aws.ToString(msgResp.Messages[0].Body)
// decode JSON into the struct
var event S3Event
if err := json.Unmarshal([]byte(msgBody), &event); err != nil {
log.Fatal("failed to parse S3 event:", err)
}
// extract bucket and key
// we always have 1 message, so use [0]
bucket := event.Records[0].S3.Bucket.Name
key := event.Records[0].S3.Object.Key
fmt.Println("bucket:", bucket)
fmt.Println("key:", key)
...
Перевіряємо ще раз:
$ go run main.go bucket: ops-1-33-devops-ingress-ops-alb-loki-logs key: AWSLogs/492***148/elasticloadbalancing/us-east-1/2025/11/25/492***148_elasticloadbalancing_us-east-1_app.k8s-ops133externalalb-***.336cddd33c043f33_20251125T0935Z_52.***.***.213_60mjhvf6.log.gz
Можемо переходити до S3.
Go AWS SDK S3
Аналогічно до SQS – використовуємо пакет s3.
S3 client
Створюємо клієнт з NewFromConfig():
... // create S3 client using the shared AWS config s3Client := s3.NewFromConfig(cfg) ...
S3 GetObject()
Додаємо читання файлу з GetObject().
Теж аналогічно з SQS – передаємо GetObjectInput:
...
// fetch the S3 object and return its streaming body
objResp, err := s3Client.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
})
if err != nil {
log.Fatal("failed to download object:", err)
}
defer objResp.Body.Close()
fmt.Println("S3 object stream opened:", bucket, key)
...
GetObject() повертає *GetObjectOutput, в якому є поле Body з типом io.ReadCloser, а io.ReadCloser – це інтерфейс, який визначає два методи – Reader та Closer.
Читання файлу з gzip
Логи в S3 зберігаються в gz, тому додаємо пакет gzip, і з NewReader() читаємо дані:
...
// create gzip reader from S3 object stream
gzReader, err := gzip.NewReader(objResp.Body)
if err != nil {
log.Fatal("failed to create gzip reader:", err)
}
defer gzReader.Close()
fmt.Println("gzip stream opened")
...
NewReader() приймає аргумент з типом io.Reader interface, а тому ми можемо до нього передати objResp.Body.
Але сам по собі gzip.NewReader() дані нікуди не повертає – він тільки відкриє буфер, в який буде писати розархівовані строки.
Тому далі додаємо bufio.
Читання gzip output з bufio
Додаємо bufio, і з NewScanner() читаємо дані від gzip.NewReader() до буферу, з якого далі зі Scan() та Text() формуємо строки:
...
// create scanner to read decompressed log lines
scanner := bufio.NewScanner(gzReader)
// increase buffer size if ALB logs have long lines
// default scanner buffer = 64 KB
buf := make([]byte, 0, 1024*1024) // 1 MB
scanner.Buffer(buf, 1024*1024)
// iterate over every line in the decompressed file
for scanner.Scan() {
line := scanner.Text()
fmt.Println("log line:", line)
}
if err := scanner.Err(); err != nil {
log.Fatal("scanner error:", err)
}
...
Перевіряємо:
$ go run main.go | head S3 object stream opened: ops-1-33-devops-ingress-ops-alb-loki-logs AWSLogs/492***148/elasticloadbalancing/us-east-1/2025/11/25/492***148_elasticloadbalancing_us-east-1_app.k8s-ops133externalalb-***.336cddd33c043f33_20251125T1005Z_52.***.***.213_2nbanv4o.log.gz log line: h2 2025-11-25T10:00:02.271561Z app/k8s-ops133externalalb-***/336cddd33c043f33 52.***.***.183:60978 10.0.47.34:8080 0.001 0.005 0.001 200 200 38 5492 "GET https://lightdash.example.co:443/ HTTP/2.0" "Blackbox Exporter/0.27.0" ECDHE-RSA-AES128-GCM-SHA256 TLSv1.2 arn:aws:elasticloadbalancing:us-east-1:492***148:targetgroup/k8s-proddata-lightdas-fa7b1ce474/c400fe91849c401a "Root=1-69257e22-4b16fe8d62d2f8955edc6da8" "lightdash.example.co" "arn:aws:acm:us-east-1:492***148:certificate/77230c5f-d0c2-4e58-b579-8b8422686986" 15 2025-11-25T10:00:02.264000Z "forward" "-" "-" "10.0.47.34:8080" "200" "-" "-" TID_48df8c3791b69144b4ae0f6084e015d6 "-" "-" "-" ...
Розбиття main() на функції
У нас вже доволі великий main(), і основні операції зробили, все працює.
Давайте наведемо трохи красоти.
Весь код зараз:
package main
import (
"bufio"
"compress/gzip"
"context"
"encoding/json"
"fmt"
"log"
"os"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/sqs"
)
func main() {
// TODO: add exit handler
ctx := context.Background()
// Load the Shared AWS Configuration (~/.aws/config)
cfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
log.Fatal(err)
}
// create SQS client using the shared AWS config
sqsClient := sqs.NewFromConfig(cfg)
// read queue name from environment
queueName := os.Getenv("ALB_LOGS_QUEUE")
if queueName == "" {
log.Fatal("environment variable ALB_LOGS_QUEUE is not set")
}
// request queue URL by name
getURLResp, err := sqsClient.GetQueueUrl(ctx, &sqs.GetQueueUrlInput{
QueueName: aws.String(queueName),
})
if err != nil {
log.Fatal(err)
}
queueURL := getURLResp.QueueUrl
//fmt.Println("Queue URL:", *queueURL)
// receive a single message from SQS
msgResp, err := sqsClient.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
QueueUrl: queueURL, // URL який ми отримали раніше
MaxNumberOfMessages: 1, // receive only one message
WaitTimeSeconds: 10, // enable long polling (recommended)
})
if err != nil {
log.Fatal(err)
}
if len(msgResp.Messages) == 0 {
fmt.Println("no messages received")
return
}
// print the received message body
//fmt.Println(aws.ToString(msgResp.Messages[0].Body))
// define a struct to unmarshal S3 event message
type S3Event struct {
Records []struct {
S3 struct {
Bucket struct {
Name string `json:"name"`
} `json:"bucket"`
Object struct {
Key string `json:"key"`
} `json:"object"`
} `json:"s3"`
} `json:"Records"`
}
// take SQS message body
msgBody := aws.ToString(msgResp.Messages[0].Body)
// decode JSON into the struct
var event S3Event
if err := json.Unmarshal([]byte(msgBody), &event); err != nil {
log.Fatal("failed to parse S3 event:", err)
}
// extract bucket and key
// we always have 1 message, so use [0]
bucket := event.Records[0].S3.Bucket.Name
key := event.Records[0].S3.Object.Key
//fmt.Println("bucket:", bucket)
//fmt.Println("key:", key)
// create S3 client using the shared AWS config
s3Client := s3.NewFromConfig(cfg)
// fetch the S3 object and return its streaming body
objResp, err := s3Client.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
})
if err != nil {
log.Fatal("failed to download object:", err)
}
defer objResp.Body.Close()
fmt.Println("S3 object stream opened:", bucket, key)
// create gzip reader from S3 object stream
gzReader, err := gzip.NewReader(objResp.Body)
if err != nil {
log.Fatal("failed to create gzip reader:", err)
}
defer gzReader.Close()
//fmt.Println("gzip stream opened")
// create scanner to read decompressed log lines
scanner := bufio.NewScanner(gzReader)
// increase buffer size if ALB logs have long lines
// default scanner buffer = 64 KB
buf := make([]byte, 0, 1024*1024) // 1 MB
scanner.Buffer(buf, 1024*1024)
// iterate over every line in the decompressed file
for scanner.Scan() {
line := scanner.Text()
fmt.Println("log line:", line)
}
if err := scanner.Err(); err != nil {
log.Fatal("scanner error:", err)
}
}
Як можемо організувати процес?
В main() виконуємо всякі ініціалізації, а потім в циклі будемо опитувати SQS:
- створюємо
context - створюємо AWS config
- створюємо клієнти
sqsClientтаs3Client - зчитуємо змінні середовища (
ALB_LOGS_QUEUE, потім додамо ще) - і потім в циклі:
- викликаємо функцію
receiveFromSQS()– перевіряємо, чи з’явились нові меседжи - викликаємо функцію
getS3Object()– якщо меседжи є, то йдемо до S3 і читаємо звідти новий архів - викликаємо функцію
processLogFile()– зчитуємо строки з кожного отриманого логу
- викликаємо функцію
Які функції для цього знадобляться?
- функція
ReceiveFromSQS()- сюди будемо передавати
context, SQS client, SQS queue URL, і записувати в структуруS3Eventім’я бакету та key – ім’я файлу - для подальшого видалення меседжів після успішної обробки – потрібно буде повертати
receiptHandle
- сюди будемо передавати
- функція
GetS3Object()- отримує
context, AWS Config,bucket,key - виконує
GetObject()і повертаєGetObjectOutput
- отримує
- функція
GzipReader()- читає дані від
GetS3Object(), розпаковує, повертає строки
- читає дані від
- функція
ScanLines()- отримує дані від
GzipReader(), зчитує зText(), і поки що просто виводить на консоль
- отримує дані від
Package collector
Для зручності і аби все було структуровано – розділимо все по окремим файлам:
collector/
sqs.go
s3.go
gzip.go
scan.go
main.go
Файл collector/sqs.go
Коли тестив, то зустрів таку помилку:
$ go run main.go | head
panic: runtime error: index out of range [0] with length 0
goroutine 1 [running]:
alb-logs-collector/collector.ReceiveFromSQS({0xa83530, 0xdda060}, 0xc00021d888, {0xc000160050, 0x4d})
/home/setevoy/Projects/Go/alb-logs-collector/collector/sqs.go:52 +0x25d
main.main()
/home/setevoy/Projects/Go/alb-logs-collector/main.go:46 +0x305
exit status 2
Виникає через, то AWS пише тестові повідомлення до черги:
{"Service":"Amazon S3","Event":"s3:TestEvent","Time":"2025-11-24T11:10:31.573Z","Bucket":"ops-1-33-devops-ingress-ops-alb-loki-logs", ...}
Тому до ReceiveFromSQS() додамо перевірку.
Функція ReceiveFromSQS()
Описуємо отримання повідомлень:
package collector
import (
"context"
"encoding/json"
"fmt"
"strings"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/sqs"
)
// S3Event for SQS message
type S3Event struct {
Records []struct {
S3 struct {
Bucket struct {
Name string `json:"name"`
} `json:"bucket"`
Object struct {
Key string `json:"key"`
} `json:"object"`
} `json:"s3"`
} `json:"Records"`
}
// ReceiveFromSQS reads one message and returns bucket, key, receiptHandle, queueURL
func ReceiveFromSQS(ctx context.Context, client *sqs.Client, queueName string) (bucket, key, receiptHandle, queueURL string, err error) {
// get real queue URL
getURL, err := client.GetQueueUrl(ctx, &sqs.GetQueueUrlInput{
QueueName: aws.String(queueName),
})
if err != nil {
return "", "", "", "", err
}
queueURL = *getURL.QueueUrl
// receive one message
resp, err := client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
QueueUrl: aws.String(queueURL),
MaxNumberOfMessages: 1,
WaitTimeSeconds: 10,
})
if err != nil {
return "", "", "", "", err
}
if len(resp.Messages) == 0 {
return "", "", "", "", fmt.Errorf("no messages")
}
// receive one message
msg := resp.Messages[0]
// create ReceiptHandle to return to the delete function
receiptHandle = *msg.ReceiptHandle
raw := aws.ToString(msg.Body)
fmt.Println("SQS RAW:", raw)
// skip AWS test event
if strings.Contains(raw, `"Event":"s3:TestEvent"`) {
fmt.Println("Skipping AWS S3 test event")
return "", "", "", queueURL, fmt.Errorf("test event skipped")
}
// parse S3 event message
var event S3Event
if err := json.Unmarshal([]byte(raw), &event); err != nil {
return "", "", "", "", err
}
bucket = event.Records[0].S3.Bucket.Name
key = event.Records[0].S3.Object.Key
return bucket, key, receiptHandle, queueURL, nil
}
Функція DeleteFromSQS()
І вже додаємо видалення, будемо викликати після успішної обробки логів:
// DeleteFromSQS deletes message after successful processing
func DeleteFromSQS(ctx context.Context, client *sqs.Client, queueURL, receiptHandle string) error {
_, err := client.DeleteMessage(ctx, &sqs.DeleteMessageInput{
QueueUrl: aws.String(queueURL),
ReceiptHandle: aws.String(receiptHandle),
})
return err
}
Файл collector/s3.go та функція GetS3Object()
package collector
import (
"context"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
)
// GetS3Object returns stream reader of S3 file
func GetS3Object(ctx context.Context, client *s3.Client, bucket, key string) (*s3.GetObjectOutput, error) {
return client.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
})
}
Файл collector/gzip.go та функція GzipReader()
Виносимо теж окремо, аби весь код був більш логічним.
Крім того, потім, можливо, потрібно буде додати якісь перевірки:
package collector
import (
"compress/gzip"
"io"
)
// GzipReader wraps S3 body
func GzipReader(r io.Reader) (*gzip.Reader, error) {
return gzip.NewReader(r)
}
Файл collector/scan.go та функція ScanLines()
package collector
import (
"bufio"
"fmt"
"io"
)
// ScanLines reads log lines from reader
func ScanLines(r io.Reader) error {
scanner := bufio.NewScanner(r)
// increase max line size
buf := make([]byte, 0, 1024*1024)
scanner.Buffer(buf, 1024*1024)
// iterate over every line in the decompressed file
for scanner.Scan() {
line := scanner.Text()
fmt.Println("log line:", line)
}
return scanner.Err()
}
Зміни в main.go
Додаємо імпорт нашого пакету collector, і викликаємо нові функції.
Тепер весь файл main.go виглядає так:
package main
import (
"alb-logs-collector-poc/collector"
"context"
"fmt"
"log"
"os"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/sqs"
)
func main() {
// TODO: add exit handler
ctx := context.Background()
// Load the Shared AWS Configuration (~/.aws/config)
cfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
log.Fatal(err)
}
// create SQS client using the shared AWS config
sqsClient := sqs.NewFromConfig(cfg)
// create S3 client using the shared AWS config
s3Client := s3.NewFromConfig(cfg)
// read queue name from environment
queueName := os.Getenv("ALB_LOGS_QUEUE")
if queueName == "" {
log.Fatal("environment variable ALB_LOGS_QUEUE is not set")
}
// 1. get message
//bucket, key, receiptHandle, queueURL, err := collector.ReceiveFromSQS(ctx, sqsClient, queue)
bucket, key, _, _, err := collector.ReceiveFromSQS(ctx, sqsClient, queueName)
if err != nil {
if err.Error() == "no messages" {
fmt.Println("NO MESSAGES")
return
}
fmt.Println("ERROR receiving message:", err)
return
}
fmt.Println("BUCKET:", bucket)
fmt.Println("KEY:", key)
// 2. get S3 object
s3Obj, err := collector.GetS3Object(ctx, s3Client, bucket, key)
if err != nil {
fmt.Println("S3 error:", err)
return
}
defer s3Obj.Body.Close()
fmt.Println("S3 object stream opened:", bucket, key)
// 3. gzip reader
gzReader, err := collector.GzipReader(s3Obj.Body)
if err != nil {
fmt.Println("gzip error:", err)
return
}
defer gzReader.Close()
// 4. scan all lines in log file
err = collector.ScanLines(gzReader)
if err != nil {
log.Fatal("scanner error:", err)
}
}
Перевіряємо:
$ go run main.go
SQS RAW: {"Records":[{"eventVersion":"2.1","eventSource":"aws:s3","awsRegion":"us-east-1","eventTime":"2025-11-25T11:15:01.559Z","eventName":"ObjectCreated:Put","userIdentity":{"principalId":"AWS:***.***:elblogdelivery-session"},"requestParameters":{"sourceIPAddress":"2600:***.***:c0ef"},"responseElements":{"x-amz-request-id":"Y6523DEXMF2DS6FG","x-amz-id-2":"tWgdxqzRbKrjUGDMLLkrOtKQW6S6aWT31VHomgaNm0UAIlzeshheXEGgZN3yRH4pMlWdzfBLqBlZuh3BO0QghDkTVU2WllFDKmh22/B1Cqk="},"s3":{"s3SchemaVersion":"1.0","configurationId":"objectCreatedEventSqs","bucket":{"name":"ops-1-33-devops-ingress-ops-alb-loki-logs","ownerIdentity":{"principalId":"***.***"},"arn":"arn:aws:s3:::ops-1-33-devops-ingress-ops-alb-loki-logs"},"object":{"key":"AWSLogs/492***148/elasticloadbalancing/us-east-1/2025/11/25/492***148_elasticloadbalancing_us-east-1_app.k8s-ops133externalalb-***.336cddd33c043f33_20251125T1115Z_34***.***.15_44qtqgut.log.gz","size":17823,"eTag":"36d1243edd2a7a98546e7af645c36068","sequencer":"0069258FB5806A2506"}}}]}
BUCKET: ops-1-33-devops-ingress-ops-alb-loki-logs
KEY: AWSLogs/492***148/elasticloadbalancing/us-east-1/2025/11/25/492***148_elasticloadbalancing_us-east-1_app.k8s-ops133externalalb-***.336cddd33c043f33_20251125T1115Z_34.225.155.15_44qtqgut.log.gz
S3 object stream opened: ops-1-33-devops-ingress-ops-alb-loki-logs AWSLogs/492***148/elasticloadbalancing/us-east-1/2025/11/25/492***148_elasticloadbalancing_us-east-1_app.k8s-ops133externalalb-***.336cddd33c043f33_20251125T1115Z_34.***.***.15_44qtqgut.log.gz
log line: h2 2025-11-25T11:10:06.230595Z app/k8s-ops133externalalb-***/336cddd33c043f33 62.***.***.83:32026 10.0.44.225:8000 0.000 0.034 0.000 200 200 887 1165 "GET https://staging.api.challenge.example.co:443/admin/users/list?limit=1&user=test_thread_1_ci_ios_ui_participant2%40test.example.co HTTP/2.0" "Challenge App UI Tests-Runner/1.0 (com.challengeapp.uitests.Challenge-App-UI-Tests.xctrunner; build:1; iOS 18.5.0) Alamofire/5.9.0" ECDHE-RSA-AES128-GCM-SHA256 TLSv1.2 arn:aws:elasticloadbalancing:us-east-1:492***148:targetgroup/k8s-stagingb-backenda-410aa6288b/0d639f3b859bc8fb "Root=1-69258e8e-04c4384c05934a37738043b0" "staging.api.challenge.example.co" "arn:aws:acm:us-east-1:492***148:certificate/c3b6ec41-50a0-488e-93bb-b03967405f8c" 24 2025-11-25T11:10:06.195000Z "forward" "-" "-" "10.0.44.225:8000" "200" "-" "-" TID_d1e2c09f4fe6cf46b7c573f42b97889d "-" "-" "-"
...
Запис логів до VictoriaLogs
VictoriaLogs підтримує різні формати запису, ми будемо робити через JSON stream API.
Спершу спробуємо руками.
Відкриваємо порт до VictoriaLogs Kubernetes Service:
$ kk -n ops-monitoring-ns port-forward svc/atlas-victoriametrics-victoria-logs-single-server 9428
І робимо запит з curl:
$ echo '{ "log": { "level": "info", "message": "TEST" }, "date": "0", "stream": "alb" }
' | curl -X POST -H 'Content-Type: application/stream+json' --data-binary @- \
'http://localhost:9428/insert/jsonline?_stream_fields=stream&_time_field=date&_msg_field=log.message'
Перевіряємо:
Створення файлу collector/victoria.go
Створюємо мінімальний файл для тесту.
В ньому:
JSONLogRecordstruct: тут будемо формувати JSON для передачі на VictoriaLogs HTTP API- заповнюємо його даними, в поле
Timestampможна передавати “Unix timestamp in seconds, milliseconds, microseconds or nanoseconds” – ми робимо вtime.UnixMilli() - і задаємо значення для полів
streamтаmessage - формуємо HTTP request
- відправляємо стандартним
http.Client.Do()
package collector
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"time"
)
// JSONLogRecord is the minimal structure for VictoriaLogs JSONLine API
type JSONLogRecord struct {
Timestamp int64 `json:"date"`
Stream string `json:"stream"`
Message string `json:"message"`
}
// SendTestRecord sends simple test log record to VictoriaLogs
func SendTestRecord(url string) error {
rec := JSONLogRecord{
Timestamp: time.Now().UTC().UnixMilli(),
Stream: "test",
Message: "TEST VICTORIA",
}
body, err := json.Marshal(rec)
if err != nil {
return fmt.Errorf("marshal error: %w", err)
}
req, err := http.NewRequest("POST", url, bytes.NewBuffer(body))
if err != nil {
return fmt.Errorf("request error: %w", err)
}
req.Header.Set("Content-Type", "application/stream+json")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("http error: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
return fmt.Errorf("victoria returned %d", resp.StatusCode)
}
fmt.Println("TEST RECORD SENT TO VICTORIA")
return nil
}
До main.go додаємо отримання VictoriaLogs ендпоінта зі змінних оточення і з ним будуємо повний URL, в якому вказуємо які поля треба вважати як _msg, де буде _time, в якому форматі час, і за яким полем створювати Log stream:
...
// "http://localhost:9428/insert/jsonline"
vmLogsEp := os.Getenv("VICTORIA_LOGS_URL")
if vmLogsEp == "" {
log.Fatal("environment variable VICTORIA_LOGS_URL is not set")
}
// VictoriaLogs endpoint
vmLogsURL := vmLogsEp +
"?_msg_field=message" +
"&_time_field=date" +
"&_time_format=unix_ms" +
"&_stream_fields=stream"
// 5. send test record to VictoriaLogs
err = collector.SendVlTestRecord(vmLogsURL)
if err != nil {
fmt.Println("ERROR sending test record:", err)
return
}
...
Задаємо змінну:
$ export VICTORIA_LOGS_URL="http://localhost:9428/insert/jsonline"
Запсукаємо:
$ go run main.go ... TEST RECORD SENT TO VICTORIA
Перервіряємо:
ОК, працює.
Створення Log parser
Тепер оновлюємо наш код – додаємо нові функції:
collector/scan.goбуде читати дані відgzipі писати в канал у вигляді готових stringscollector/parser.go– формуєSimpleLogз полямиTimestampіMessage
Редагуємо scan.go – замість створення буферу створюємо channel, і scanner.Text() тепер буде писати в канал замість буферу:
package collector
import (
"bufio"
"io"
)
// ScanLines reads lines from an io.Reader and sends them into a channel
// caller must range over the returned channel
func ScanLines(r io.Reader) <-chan string {
ch := make(chan string)
go func() {
defer close(ch)
scanner := bufio.NewScanner(r)
// - bufio.Scanner reads decompressed bytes from GzipReader()
// - it splits input by '\n' and returns each line as a complete string
// - each line is sent into the channel for further processing
for scanner.Scan() {
ch <- scanner.Text()
}
}()
return ch
}
Створюємо parser.go зі структурою SimpleLog.
В структурі будемо тримати час, отриманий із запису в log record, а в Message будемо заносити весь текст:
package collector
import (
"fmt"
"strings"
"time"
)
// SimpleLog contains parsed timestamp and original line
type SimpleLog struct {
Timestamp time.Time
Message string
}
// ParseRawLine parses ALB log timestamp from the 2nd field.
// Everything else we keep raw.
func ParseRawLine(line string) (*SimpleLog, error) {
fields := strings.Fields(line)
if len(fields) < 2 {
return nil, fmt.Errorf("invalid ALB log line")
}
fmt.Println("--- PARSER SEND ---")
fmt.Println(line)
fmt.Println("--- PARSER DEBUG ---")
ts, err := time.Parse(time.RFC3339Nano, fields[1])
if err != nil {
return nil, fmt.Errorf("timestamp parse error: %w", err)
}
return &SimpleLog{
Timestamp: ts.UTC(),
Message: line,
}, nil
}
Оновлюємо victoria.go – тепер не вона буде заповнювати дані у JSONLogRecord, а буде приймати їх аргументом:
package collector
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"time"
)
// JSONLogRecord is the minimal structure for VictoriaLogs JSONLine API
type JSONLogRecord struct {
Timestamp int64 `json:"date"`
Stream string `json:"stream"`
Message string `json:"message"`
}
// SendToVictoria sends one JSON LINE into VictoriaLogs JSON Stream API.
func SendToVictoria(url string, rec *JSONLogRecord) error {
body, err := json.Marshal(rec)
if err != nil {
return err
}
fmt.Println("--- VMLOGS SEND ---")
fmt.Println(string(body))
fmt.Println("--- VMLOGS DEBUG ---")
req, err := http.NewRequest("POST", url, bytes.NewReader(body))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/stream+json")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode >= 300 {
return fmt.Errorf("victoria response status: %s", resp.Status)
}
return nil
}
В main.go() робимо цикл і вже додаємо видалення повідомлень з SQS викликом DeleteFromSQS(), яку створювали раніше:
...
// 4. Pipeline: read lines → parse → send
for line := range collector.ScanLines(gzReader) {
rec, err := collector.ParseRawLine(line)
if err != nil {
continue
}
out := &collector.JSONLogRecord{
Timestamp: rec.Timestamp.UnixMilli(),
Stream: "alb",
Message: rec.Message,
}
if err := collector.SendToVictoria(vmLogsURL, out); err != nil {
fmt.Println("send error:", err)
}
}
// 5. Delete SQS message
if err := collector.DeleteFromSQS(ctx, sqsClient, queueURL, receiptHandle); err != nil {
fmt.Println("delete error:", err)
}
}
Поки collector.ScanLines() повертає дані – передаємо їх до ParseRawLine(), який заповнює SimpleLog з Timestamp і Message.
Потім заповнюємо JSONLogRecord і передаємо до VictoriaLogs.
Зараз можна очистити SQS-чергу: поки я писав код, там назбирались старі повідомлення, і експортер почав тягнути старі логи. Я довго шукав, чому у VictoriaLogs час не збігається з очікуваним, але проблема виявилась банальною – я дивився дані за останні 15 хвилин, а імпортувались ранкові записи.
Але тоді доведеться почекати до 5 хвилин, поки з’явиться новий меседж.
Перевіряємо:
$ go run main.go
...
--- PARSER SEND ---
https 2025-11-25T12:36:50.859909Z app/k8s-ops133externalalb-***/336cddd33c043f33 3.***.***.78:3883 10.0.46.229:8000 0.000 0.010 0.000 200 200 2023 574 "POST https://api.challenge.example.co:443/auth/auth0-webhooks/post-login HTTP/1.1" "axios/1.6.5" ECDHE-RSA-AES128-GCM-SHA256 TLSv1.2 arn:aws:elasticloadbalancing:us-east-1:492***148:targetgroup/k8s-prodback-backenda-47ba3e0f35/9ec763ecd48352da "Root=1-6925a2e2-6e4507dd3a23de072c2f6ae9" "api.challenge.example.co" "arn:aws:acm:us-east-1:492***148:certificate/beeb3714-511e-414b-b1f3-5440746bb5ea" 12 2025-11-25T12:36:50.837000Z "forward" "-" "-" "10.0.46.229:8000" "200" "-" "-" TID_5ed365cd6c6f57409a2566d1dcaf049c "-" "-" "-"
--- PARSER DEBUG ---
--- VMLOGS SEND ---
{"date":1764074210859,"stream":"alb","message":"https 2025-11-25T12:36:50.859909Z app/k8s-ops133externalalb-***/336cddd33c043f33 3.***.***.78:3883 10.0.46.229:8000 0.000 0.010 0.000 200 200 2023 574 \"POST https://api.challenge.example.co:443/auth/auth0-webhooks/post-login HTTP/1.1\" \"axios/1.6.5\" ECDHE-RSA-AES128-GCM-SHA256 TLSv1.2 arn:aws:elasticloadbalancing:us-east-1:492***148:targetgroup/k8s-prodback-backenda-47ba3e0f35/9ec763ecd48352da \"Root=1-6925a2e2-6e4507dd3a23de072c2f6ae9\" \"api.challenge.example.co\" \"arn:aws:acm:us-east-1:492***148:certificate/beeb3714-511e-414b-b1f3-5440746bb5ea\" 12 2025-11-25T12:36:50.837000Z \"forward\" \"-\" \"-\" \"10.0.46.229:8000\" \"200\" \"-\" \"-\" TID_5ed365cd6c6f57409a2566d1dcaf049c \"-\" \"-\" \"-\""}
--- VMLOGS DEBUG ---
...
І у VictoriaLogs:
Все тут.
Що нам залишилось:
- формувати поля
- додати
gocron
Запуск циклу з gocron
Додаємо запуск кожну хвилину з gocron.
Тепер весь main.go виглядає так:
func main() {
// TODO: add exit handler
ctx := context.Background()
// Load the Shared AWS Configuration (~/.aws/config)
cfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
log.Fatal(err)
}
// create SQS client using the shared AWS config
sqsClient := sqs.NewFromConfig(cfg)
// create S3 client using the shared AWS config
s3Client := s3.NewFromConfig(cfg)
// read queue name from environment
queueName := os.Getenv("ALB_LOGS_QUEUE")
if queueName == "" {
log.Fatal("environment variable ALB_LOGS_QUEUE is not set")
}
// scheduler
s := gocron.NewScheduler(time.UTC)
// job: check SQS every minute
s.Every(1).Minute().Do(func() {
fmt.Println("CHECKING SQS...")
// 1. get message
bucket, key, receiptHandle, queueURL, err := collector.ReceiveFromSQS(ctx, sqsClient, queueName)
//bucket, key, _, _, err := collector.ReceiveFromSQS(ctx, sqsClient, queueName)
if err != nil {
if err.Error() == "no messages" {
fmt.Println("NO MESSAGES")
return
}
fmt.Println("ERROR receiving message:", err)
return
}
fmt.Println("BUCKET:", bucket)
fmt.Println("KEY:", key)
// 2. get S3 object
s3Obj, err := collector.GetS3Object(ctx, s3Client, bucket, key)
if err != nil {
fmt.Println("S3 error:", err)
return
}
defer s3Obj.Body.Close()
fmt.Println("S3 object stream opened:", bucket, key)
// 3. gzip reader
gzReader, err := collector.GzipReader(s3Obj.Body)
if err != nil {
fmt.Println("gzip error:", err)
return
}
defer gzReader.Close()
// "http://localhost:9428/insert/jsonline"
vmLogsEp := os.Getenv("VICTORIA_LOGS_URL")
if vmLogsEp == "" {
log.Fatal("environment variable VICTORIA_LOGS_URL is not set")
}
// VictoriaLogs endpoint
vmLogsURL := vmLogsEp +
"?_msg_field=message" +
"&_time_field=date" +
"&_time_format=unix_ms" +
"&_stream_fields=stream"
// 4. Pipeline: read lines → parse → send
for line := range collector.ScanLines(gzReader) {
rec, err := collector.ParseRawLine(line)
if err != nil {
continue
}
out := &collector.JSONLogRecord{
Timestamp: rec.Timestamp.UnixMilli(),
Stream: "alb",
Message: rec.Message,
}
if err := collector.SendToVictoria(vmLogsURL, out); err != nil {
fmt.Println("send error:", err)
}
}
// 5. delete message from SQS
if err := collector.DeleteFromSQS(ctx, sqsClient, queueURL, receiptHandle); err != nil {
fmt.Println("FAILED TO DELETE SQS MESSAGE:", err)
} else {
fmt.Println("SQS MESSAGE DELETED")
}
})
// start async
s.StartBlocking()
}
Додавання fields до VictoriaLogs message
Документація по полям AWS ALB – Access log entries.
Що зараз можемо додати до логів у VictoriaLogs – client_ip з client:port, target_ip (Kubernetes Pod) із target:port, elb_status_code – код відповіді ALB, аби потім простіше робити алерти, і target_status_code – аналогічно.
Оновлюємо структуру SimpleLog – додаємо нові поля:
type SimpleLog struct {
Timestamp time.Time
Message string
ClientIP string
TargetIP string
ELBStatus int
TargetStatus int
}
До ParseRawLine() додаємо запис в ці поля, використовуючи array index зі строки fields:
...
elbStatus, _ := strconv.Atoi(fields[8])
targetStatus, _ := strconv.Atoi(fields[9])
return &SimpleLog{
Timestamp: ts.UTC(),
Message: line,
ClientIP: fields[3],
TargetIP: fields[4],
ELBStatus: elbStatus,
TargetStatus: targetStatus,
}, nil
...
Додаємо ці поля до JSONLogRecord:
type JSONLogRecord struct {
Timestamp int64 `json:"date"`
Message string `json:"message"`
Stream string `json:"stream"`
ClientIP string `json:"client_ip"`
TargetIP string `json:"target_ip"`
ELBStatus int `json:"elb_status"`
TargetStatus int `json:"target_status"`
}
І додаємо їх до main.go:
...
out := &collector.JSONLogRecord{
Timestamp: rec.Timestamp.UnixMilli(),
Message: rec.Message,
Stream: "alb",
ClientIP: rec.ClientIP,
TargetIP: rec.TargetIP,
ELBStatus: rec.ELBStatus,
TargetStatus: rec.TargetStatus,
}
...
Тепер у VictoriaLogs маємо нові fields:
Порти в client_ip та target_ip тут явно зайві, можемо їх вирізати під час парсингу.
До parser.go додаємо функцію зі strings.SplitN():
// cut port from "ip:port"
func stripPort(s string) string {
parts := strings.SplitN(s, ":", 2)
return parts[0]
}
І використовуємо її при заповненні SimpleLog:
...
return &SimpleLog{
Timestamp: ts.UTC(),
Message: line,
ClientIP: stripPort(fields[3]),
TargetIP: stripPort(fields[4]),
ELBStatus: elbStatus,
TargetStatus: targetStatus,
}, nil
...
Тепер маємо поля без зайвих портів:
The final result
Весь код разом.
Коменти попросив написати AI, щоб детально по всім функціям було пояснення.
Файл main.go
package main
import (
"alb-logs-collector/collector"
"context"
"fmt"
"log"
"os"
"time"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/go-co-op/gocron"
)
func main() {
// create a base context for all AWS operations
// this context is passed into SQS/S3 functions
ctx := context.Background()
// load shared AWS config from ~/.aws/config and ~/.aws/credentials
// this provides region, credentials, retry settings, etc.
cfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
log.Fatal(err)
}
// create SQS client using the shared AWS config
// used later for reading and deleting queue messages
sqsClient := sqs.NewFromConfig(cfg)
// create S3 client for reading S3 objects
s3Client := s3.NewFromConfig(cfg)
// read queue name from environment
// this avoids hardcoding queue names in code
queueName := os.Getenv("ALB_LOGS_QUEUE")
if queueName == "" {
log.Fatal("environment variable ALB_LOGS_QUEUE is not set")
}
// create the scheduler that runs periodic jobs
// gocron automatically creates goroutines for scheduled tasks
s := gocron.NewScheduler(time.UTC)
// schedule: run the function every minute
s.Every(1).Minute().Do(func() {
fmt.Println("CHECKING SQS...")
// 1. read one message from SQS
// ReceiveFromSQS is implemented in collector/sqs.go
bucket, key, receiptHandle, queueURL, err := collector.ReceiveFromSQS(ctx, sqsClient, queueName)
if err != nil {
// "no messages" is not an error — just no new logs
if err.Error() == "no messages" {
fmt.Println("NO MESSAGES")
return
}
fmt.Println("ERROR receiving message:", err)
return
}
fmt.Println("BUCKET:", bucket)
fmt.Println("KEY:", key)
// 2. download the S3 object stream
// GetS3Object located in collector/s3.go
s3Obj, err := collector.GetS3Object(ctx, s3Client, bucket, key)
if err != nil {
fmt.Println("S3 error:", err)
return
}
defer s3Obj.Body.Close()
fmt.Println("S3 object stream opened:", bucket, key)
// 3. wrap the S3 stream into gzip reader
// GzipReader implemented in collector/gzip.go
gzReader, err := collector.GzipReader(s3Obj.Body)
if err != nil {
fmt.Println("gzip error:", err)
return
}
defer gzReader.Close()
// read VictoriaLogs endpoint
vmLogsEp := os.Getenv("VICTORIA_LOGS_URL")
if vmLogsEp == "" {
log.Fatal("environment variable VICTORIA_LOGS_URL is not set")
}
// final URL with parameters for jsonline ingestion
vmLogsURL := vmLogsEp +
"?_msg_field=message" + // which JSON field contains the log message
"&_time_field=date" + // which JSON field contains timestamp
"&_time_format=unix_ms" + // tell VictoriaLogs that the timestamp is unix milliseconds
"&_stream_fields=stream" // which field defines the stream name
// 4. process the log file line-by-line
// ScanLines implemented in collector/scan.go
// It returns a channel of RAW log lines (already ungzipped)
for line := range collector.ScanLines(gzReader) {
// parse timestamp + extract fields
// ParseRawLine implemented in collector/parser.go
rec, err := collector.ParseRawLine(line)
if err != nil {
continue // skip invalid ALB lines
}
// prepare the JSON record for VictoriaLogs
// JSONLogRecord defined in collector/victoria.go
out := &collector.JSONLogRecord{
Timestamp: rec.Timestamp.UnixMilli(), // ALB timestamp in unix ms
Message: rec.Message, // full raw ALB log line
Stream: "alb", // log stream name
ClientIP: rec.ClientIP, // extracted from ALB log
TargetIP: rec.TargetIP, // extracted from ALB log
ELBStatus: rec.ELBStatus, // HTTP status from ALB
TargetStatus: rec.TargetStatus, // backend response status
}
// send it to VictoriaLogs
// SendToVictoria implemented in collector/victoria.go
if err := collector.SendToVictoria(vmLogsURL, out); err != nil {
fmt.Println("send error:", err)
}
}
// 5. delete message from SQS after successful processing
// DeleteFromSQS implemented in collector/sqs.go
if err := collector.DeleteFromSQS(ctx, sqsClient, queueURL, receiptHandle); err != nil {
fmt.Println("FAILED TO DELETE SQS MESSAGE:", err)
} else {
fmt.Println("SQS MESSAGE DELETED")
}
})
// start the scheduler and block main goroutine forever
s.StartBlocking()
}
Файл sqs.go
package collector
import (
"context"
"encoding/json"
"fmt"
"strings"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/sqs"
)
// S3Event describes the JSON format sent by S3 to SQS.
// It matches the structure of the S3 event notification:
// "Records" → "s3" → "bucket.name" and "object.key".
// This struct is used in ReceiveFromSQS() to extract bucket/key.
type S3Event struct {
Records []struct {
S3 struct {
Bucket struct {
Name string `json:"name"` // S3 bucketName
} `json:"bucket"`
Object struct {
Key string `json:"key"` // S3 object key (log filename)
} `json:"object"`
} `json:"s3"`
} `json:"Records"`
}
// ReceiveFromSQS reads a single SQS message.
// It returns:
// - bucket: the S3 bucket from event
// - key: the object key (filename in S3)
// - receiptHandle: required later to delete the message
// - queueURL: actual AWS queue URL (needed for deletion)
// - err: error, or "no messages" if queue is empty
//
// This function is called from main.go inside the scheduler loop.
// After processing the file from S3, the caller must call DeleteFromSQS().
func ReceiveFromSQS(ctx context.Context, client *sqs.Client, queueName string) (bucket, key, receiptHandle, queueURL string, err error) {
// resolve real SQS queue URL from queue name
// SQS APIs always operate on queueURL, not the name
getURL, err := client.GetQueueUrl(ctx, &sqs.GetQueueUrlInput{
QueueName: aws.String(queueName),
})
if err != nil {
return "", "", "", "", err
}
queueURL = *getURL.QueueUrl
// receive exactly one message
// WaitTimeSeconds enables long polling for up to 10s
resp, err := client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
QueueUrl: aws.String(queueURL),
MaxNumberOfMessages: 1,
WaitTimeSeconds: 10,
})
if err != nil {
return "", "", "", "", err
}
// queue empty → no new logs
if len(resp.Messages) == 0 {
return "", "", "", "", fmt.Errorf("no messages")
}
// take the first (and only) message
msg := resp.Messages[0]
// ReceiptHandle is mandatory for deletion
receiptHandle = *msg.ReceiptHandle
// raw JSON body received from S3 → SQS notification
raw := aws.ToString(msg.Body)
fmt.Println("SQS RAW:", raw)
// filter out AWS TestEvent generated when enabling notifications
// Test events do NOT contain real logs and must be ignored
if strings.Contains(raw, `"Event":"s3:TestEvent"`) {
fmt.Println("Skipping AWS S3 test event")
return "", "", "", queueURL, fmt.Errorf("test event skipped")
}
// unmarshal JSON into S3Event struct
var event S3Event
if err := json.Unmarshal([]byte(raw), &event); err != nil {
return "", "", "", "", err
}
// extract bucket and key from the event
bucket = event.Records[0].S3.Bucket.Name
key = event.Records[0].S3.Object.Key
return bucket, key, receiptHandle, queueURL, nil
}
// DeleteFromSQS permanently removes the processed message.
// Must be called only after successful S3 → gzip → parsing → VictoriaLogs ingestion.
// If not deleted, SQS will retry delivery (visibility timeout expires).
// Implemented in collector/sqs.go, called from main.go.
func DeleteFromSQS(ctx context.Context, client *sqs.Client, queueURL, receiptHandle string) error {
_, err := client.DeleteMessage(ctx, &sqs.DeleteMessageInput{
QueueUrl: aws.String(queueURL),
ReceiptHandle: aws.String(receiptHandle),
})
return err
}
Файл s3.go
package collector
import (
"context"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
)
// GetS3Object retrieves an S3 object and returns a streaming reader.
//
// This function does NOT download the file to disk.
// It returns a network stream (`GetObjectOutput.Body`), which allows the caller
// to read the object lazily, byte-by-byte, directly from S3.
//
// In our pipeline, the call chain looks like:
//
// main.go
// → ReceiveFromSQS() to get bucket/key (collector/sqs.go)
// → GetS3Object() to open S3 object stream (this file)
// → GzipReader() to decompress gzip data (collector/gzip.go)
// → ScanLines() to iterate log lines (collector/scan.go)
//
// Notes:
// - The returned object must be closed by the caller: `defer obj.Body.Close()`
// - S3 GetObject supports range requests, but here we read the whole file.
// - The body is read sequentially; this is efficient for log ingestion patterns.
func GetS3Object(ctx context.Context, client *s3.Client, bucket, key string) (*s3.GetObjectOutput, error) {
return client.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(bucket), // name of S3 bucket with ALB logs
Key: aws.String(key), // key (path/filename) of the gzip log file
})
}
Файл gzip.go
package collector
import (
"compress/gzip"
"io"
)
// GzipReader wraps an existing io.Reader with a gzip decompressor.
//
// This function takes the S3 object body (returned by GetS3Object in
// collector/s3.go) and turns it into a gzip.Reader capable of producing
// the decompressed content of the ALB log file.
//
// Typical pipeline:
//
// S3Object.Body (io.ReadCloser)
// ↓ passed into
// GzipReader() → *gzip.Reader (still a stream)
// ↓ passed into
// ScanLines() → yields plain-text log lines
//
// Notes:
// - gzip.NewReader expects the input stream to be in .gz format.
// - Caller must close the returned reader: `defer gz.Close()`.
// - No buffering or scanning is done here; this function only wraps the stream.
func GzipReader(r io.Reader) (*gzip.Reader, error) {
return gzip.NewReader(r)
}
Файл scan.go
package collector
import (
"bufio"
"io"
)
// ScanLines reads an arbitrary io.Reader line-by-line and returns a channel
// that produces each extracted line as a string.
//
// This function is used in main.go to iterate through the contents of an S3
// Gzip file. The call chain is:
//
// main.go → GetS3Object() (collector/s3.go)
// → GzipReader() (collector/gzip.go)
// → ScanLines() (this file)
//
// Because ScanLines() uses a goroutine, the caller can process lines
// asynchronously using: `for line := range ScanLines(r) { ... }`.
//
// Notes:
// - The returned channel is *unbuffered*, so each send blocks until the caller
// receives the value. This naturally rate-controls the goroutine.
// - bufio.Scanner splits input by '\n', automatically handling different line
// lengths (up to scanner’s max buffer).
// - When the reader is fully consumed, the goroutine closes the channel.
func ScanLines(r io.Reader) <-chan string {
ch := make(chan string)
// launch a goroutine that streams lines out of the reader
go func() {
// ensure channel is closed when scanning finishes
defer close(ch)
// bufio.Scanner provides efficient, line-oriented reading of text streams
scanner := bufio.NewScanner(r)
// loop until EOF or error. Each iteration reads the next line
for scanner.Scan() {
// push extracted line into the channel
// (blocks until caller receives it)
ch <- scanner.Text()
}
// scanner.Err() is intentionally ignored here, because error handling
// is performed by the caller when needed, and the channel-based design
// treats EOF as natural termination.
}()
// return read-only channel of strings
return ch
}
Файл parser.go
package collector
import (
"fmt"
"strconv"
"strings"
"time"
)
// SimpleLog represents a minimal, lightweight structure containing:
//
// - Timestamp: parsed ALB timestamp in UTC
// - Message: the full raw log line (used as message in VictoriaLogs)
// - ClientIP: extracted client IP (port removed)
// - TargetIP: extracted backend target IP (port removed)
// - ELBStatus: HTTP status returned by ALB to the client
// - TargetStatus: HTTP status returned by the backend to ALB
//
// This struct is designed for the simplified ingestion phase,
// before implementing full ALB field parsing.
type SimpleLog struct {
Timestamp time.Time
Message string
ClientIP string
TargetIP string
ELBStatus int
TargetStatus int
}
// stripPort removes the ":port" suffix from IP strings like "1.2.3.4:5678".
// This keeps VictoriaLogs cardinality low, avoiding creation of thousands
// of separate series due to ephemeral client ports.
func stripPort(s string) string {
parts := strings.SplitN(s, ":", 2)
return parts[0]
}
// ParseRawLine parses the essential ALB log fields:
//
// - Timestamp from the 2nd field
// - Client IP (without port)
// - Target IP (without port)
// - ALB HTTP status
// - Target HTTP status
//
// Everything else is kept untouched in the Message field.
//
// ALB logs are space-delimited, except for quoted sections
// (like the request line and user agent). At this simplified stage,
// we do *not* parse quoted fields — we only extract the mandatory parts.
func ParseRawLine(line string) (*SimpleLog, error) {
fields := strings.Fields(line)
// we expect at least: protocol, timestamp, elb, client, target, ... status codes
// ALB format is consistent — <10 fields means corrupted input
if len(fields) < 10 {
return nil, fmt.Errorf("invalid ALB log line")
}
fmt.Println("--- PARSER SEND ---")
fmt.Println(line)
fmt.Println("--- PARSER DEBUG ---")
// parse timestamp in RFC3339Nano format
// ALB always emits timestamps in UTC
ts, err := time.Parse(time.RFC3339Nano, fields[1])
if err != nil {
return nil, fmt.Errorf("timestamp parse error: %w", err)
}
// convert status codes from string → int
elbStatus, _ := strconv.Atoi(fields[8])
targetStatus, _ := strconv.Atoi(fields[9])
return &SimpleLog{
Timestamp: ts.UTC(), // ensure strict UTC normalization
Message: line, // pass full raw line to VictoriaLogs
ClientIP: stripPort(fields[3]),
TargetIP: stripPort(fields[4]),
ELBStatus: elbStatus,
TargetStatus: targetStatus,
}, nil
}
Файл victoria.go
package collector
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
)
// JSONLogRecord represents a single log entry formatted for
// VictoriaLogs JSONLine ingestion API.
//
// Field mapping:
//
// - Timestamp → "date"
// UNIX milliseconds timestamp of the log event.
// This must match ?_time_field=date&_time_format=unix_ms in ingestion URL.
//
// - Message → "message"
// The complete raw ALB log line. Used as the primary log message.
//
// - Stream → "stream"
// Logical log stream identifier. Used in ingestion via ?_stream_fields=stream.
//
// - ClientIP / TargetIP / ELBStatus / TargetStatus
// Additional parsed metadata fields. These become searchable log fields.
//
// The structure intentionally avoids nested JSON — VictoriaLogs processes
// flat objects more efficiently and without ambiguity in field extraction.
type JSONLogRecord struct {
Timestamp int64 `json:"date"`
Message string `json:"message"`
Stream string `json:"stream"`
ClientIP string `json:"client_ip"`
TargetIP string `json:"target_ip"`
ELBStatus int `json:"elb_status"`
TargetStatus int `json:"target_status"`
}
// SendToVictoria sends exactly ONE JSON record to the VictoriaLogs JSONLine API.
//
// This function is called from main.go inside the ingestion loop.
// It performs the following steps:
//
// 1. Marshal the JSONLogRecord into a compact JSON object
// 2. Build an HTTP POST request with Content-Type: application/stream+json
// 3. Send the request to VictoriaLogs
// 4. Check for non-2xx HTTP status codes
//
// Notes:
// - VictoriaLogs expects a "streaming JSON" format, where each POST body
// contains a single JSON object (or multiple lines if needed).
// - We send logs one-by-one for simplicity, but batching can be added later.
// - The caller controls the ingestion endpoint URL, including query params:
// ?_msg_field=message&_time_field=date&_time_format=unix_ms&_stream_fields=stream
//
// Errors returned from this function are logged in main.go, and do not
// interrupt the ingestion pipeline.
func SendToVictoria(url string, rec *JSONLogRecord) error {
// serialize record into JSON line
body, err := json.Marshal(rec)
if err != nil {
return err
}
// debug output for transparency
fmt.Println("--- VMLOGS SEND ---")
fmt.Println(string(body))
fmt.Println("--- VMLOGS DEBUG ---")
// create POST request with streaming JSON payload
req, err := http.NewRequest("POST", url, bytes.NewReader(body))
if err != nil {
return err
}
// JSONLine ingestion requires this content type
req.Header.Set("Content-Type", "application/stream+json")
// send HTTP request using default HTTP client
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
// check server-side errors
if resp.StatusCode >= 300 {
return fmt.Errorf("victoria response status: %s", resp.Status)
}
return nil
}
Вже запустив в Kubernetes, все працює.
Єдиний момент, що SQS та S3 операції не падають при access denied, треба додати перевірку помилок.









