The next task I want to do with Golang is building a custom log collector that retrieves AWS Application Load Balancer logs from S3 and sends them to VictoriaLogs.
Sure, we could simply use Vector.dev, as I did for AWS VPC Flow Logs (see: Vector.dev introduction, logs from AWS S3, and VictoriaLogs integration). But this is a good opportunity to get more hands-on experience with Go, so I’ll create our own collector instead.
So, the main idea now is this:
- The Load Balancer writes logs to S3.
- In S3, we create a notification to AWS SQS.
- Our collector polls SQS and receives information about new objects in S3.
- It makes a request to S3 and receives a gzip archive.
- It unpacks and parses the data, and sends it to VictoriaLogs.
Let’s go.
Configuring S3 and SQS notifications
When designing the solution, the main question was: how do we keep track of which S3 objects have already been processed, and which ones are new?
One option would be to use a database to store metadata about processed files. But this would eventually accumulate a large amount of state, and I don’t really want to introduce a stateful service into our Kubernetes environment just for this.
Instead, we will keep it simple and reliable, following the same approach we used with Vector.dev and AWS VPC Flow Logs by using an SQS queue to receive notifications about new objects in S3.
Our collector will poll messages from SQS, extract information about new log files, process them, and then delete the messages from the queue.
See documentation on AWS – Walkthrough: Configuring a bucket for notifications (SNS topic or SQS queue).
Creating an SQS queue
For now, we will create an SQS queue manually and later manage it properly using Terraform.
Keep the queue type set to Standard because:
Amazon Simple Queue Service FIFO (First-In-First-Out) queues aren’t supported as an Amazon S3 event notification destination
See Amazon S3 Event Notifications.
Also, message ordering is not important in our case, since each log entry contains its own timestamp that we will parse anyway.
So let’s go to SQS and create a new queue:
Next, we need to describe a policy in the Access policy.
S3 will send messages to the queue, while our Kubernetes Pod (using a dedicated ServiceAccount) will read those messages. So the Policy must allow these actions.
For Kubernetes access, we’ll create the ServiceAccount and IAM Role later, so for now, let’s simply grant SQS permissions to everyone in our AWS Account.
And for S3, add permission for the SQS:SendMessage API call:
{
"Version": "2012-10-17",
"Id": "loggerID",
"Statement": [
{
"Sid": "sendFromS3LogsAllow",
"Effect": "Allow",
"Principal": {
"Service": "s3.amazonaws.com"
},
"Action": [
"SQS:SendMessage"
],
"Resource": "arn:aws:sqs:us-east-1:492***148:testing-alb-logs-s3-notifier",
"Condition": {
"ArnLike": {
"aws:SourceArn": "arn:aws:s3:*:*:ops-1-33-devops-ingress-ops-alb-loki-logs"
}
}
},
{
"Sid": "receiveFromQueueAllowSameAccount",
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::492***148:root"
},
"Action": [
"SQS:ReceiveMessage",
"SQS:DeleteMessage",
"SQS:GetQueueAttributes",
"SQS:GetQueueUrl"
],
"Resource": "arn:aws:sqs:us-east-1:492***148:testing-alb-logs-s3-notifier"
}
]
}
Leave the rest as default.
Configuring S3 Event Notifications
Go to an S3 bucket with logs > Properties > Event notifications:
Create a new Event notification to send notifications about all s3:ObjectCreated operations:
In the Destination, select SQS and our queue:
Wait a few minutes, and check the Monitoring tab in the SQS queue:
Okay, messages are here. Now we need to collect and read them.
Let’s move on to Go.
AWS SDK for Go
To interact with AWS, we will use the AWS SDK for Go, which provides all the required APIs.
For authentication, we import aws-sdk-go-v2/config and call the LoadDefaultConfig() function. It automatically looks for credentials in environment variables, ~/.aws/credentials and ~/.aws/config files, or uses EC2 IAM Roles.
The first argument to LoadDefaultConfig() must be a context. I covered Go contexts earlier in my previous post Golang: Creating an OpenAI Exporter for VictoriaMetrics.
For now, we will simply use context.Background(). Later, we will add proper handling and graceful shutdown logic:
package main
import (
"context"
"log"
"github.com/aws/aws-sdk-go-v2/config"
)
func main() {
// TODO: add exit handler
ctx := context.Background()
// Load the Shared AWS Configuration (~/.aws/config)
cfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
log.Fatal(err)
}
}
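For reference, a minimal sketch of what that exit handler could eventually look like (an assumption at this point, not part of the collector yet), using signal.NotifyContext from the standard library so the context is cancelled on SIGINT/SIGTERM:
package main

import (
    "context"
    "log"
    "os/signal"
    "syscall"
)

func main() {
    // Cancel the context when the process receives SIGINT or SIGTERM,
    // so in-flight AWS calls and the polling loop can stop cleanly.
    ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer stop()

    // ... pass ctx into the AWS SDK calls as before ...

    <-ctx.Done()
    log.Println("shutting down")
}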
Go and AWS SDK SQS
To work with Amazon SQS, the AWS SDK provides a separate module named sqs.
SQS client
Add its import and create the SQS client using NewFromConfig(), passing in the AWS config object we initialized earlier.
To interact with a queue, we need its URL, which we can obtain by calling GetQueueUrl().
We also load the QueueName from an environment variable, since later in Kubernetes it will be passed through Helm chart values:
package main
import (
"context"
"fmt"
"log"
"os"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/sqs"
)
func main() {
// TODO: add exit handler
ctx := context.Background()
// Load the Shared AWS Configuration (~/.aws/config)
cfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
log.Fatal(err)
}
// create SQS client using the shared AWS config
sqsClient := sqs.NewFromConfig(cfg)
// read queue name from environment
queueName := os.Getenv("ALB_LOGS_QUEUE")
if queueName == "" {
log.Fatal("environment variable ALB_LOGS_QUEUE is not set")
}
// request queue URL by name
getURLResp, err := sqsClient.GetQueueUrl(ctx, &sqs.GetQueueUrlInput{
QueueName: aws.String(queueName),
})
if err != nil {
log.Fatal(err)
}
queueURL := getURLResp.QueueUrl
fmt.Println("Queue URL:", *queueURL)
}
Run go mod init:
$ go mod init alb-logs-collector-poc
$ go mod tidy
Set a variable with the name of the queue:
$ export ALB_LOGS_QUEUE="testing-alb-logs-s3-notifier"
Let’s run our code:
$ go run main.go
Queue URL: https://sqs.us-east-1.amazonaws.com/492***148/testing-alb-logs-s3-notifier
OK, now we can receive messages.
SQS ReceiveMessage()
We read messages in the queue using ReceiveMessage(), passing a ReceiveMessageInput configuration that defines how many messages to fetch, how long to wait, and other parameters:
...
// receive a single message from SQS
msgResp, err := sqsClient.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
QueueUrl: queueURL, // URL returned by the GetQueueUrl()
MaxNumberOfMessages: 1, // receive only one message
WaitTimeSeconds: 10, // enable long polling
})
if err != nil {
log.Fatal(err)
}
if len(msgResp.Messages) == 0 {
fmt.Println("no messages received")
return
}
// print the received message body
fmt.Println("Received message:", aws.ToString(msgResp.Messages[0].Body))
...
Let’s check:
$ go run main.go | jq
{
"Records": [
{
"eventVersion": "2.1",
"eventSource": "aws:s3",
...
"eventName": "ObjectCreated:Put",
...
"s3": {
...
"bucket": {
"name": "ops-1-33-devops-ingress-ops-alb-loki-logs",
...
"object": {
"key": "AWSLogs/492***148/elasticloadbalancing/us-east-1/2025/11/25/492*** 148_elasticloadbalancing_us-east-1_app.k8s-ops133externalalb-***.336cddd33c043f33_20251125T0935Z_34.***.***. 15_25hsfvwt.log.gz",
...
}
}
}
]
}
We are interested in two fields in the SQS message payload: bucket.name and object.key.
Create a struct for this data and use json.Unmarshal() to decode the JSON into it. I covered JSON parsing earlier in the same post about the OpenAI Exporter, in the section “Creating a Go struct for JSON Unmarshall”:
...
// define a struct to unmarshal S3 event message
type S3Event struct {
Records []struct {
S3 struct {
Bucket struct {
Name string `json:"name"`
} `json:"bucket"`
Object struct {
Key string `json:"key"`
} `json:"object"`
} `json:"s3"`
} `json:"Records"`
}
// take SQS message body
msgBody := aws.ToString(msgResp.Messages[0].Body)
// decode JSON into the struct
var event S3Event
if err := json.Unmarshal([]byte(msgBody), &event); err != nil {
log.Fatal("failed to parse S3 event:", err)
}
// extract bucket and key
// we always have 1 message, so use [0]
bucket := event.Records[0].S3.Bucket.Name
key := event.Records[0].S3.Object.Key
fmt.Println("bucket:", bucket)
fmt.Println("key:", key)
...
Let’s check again:
$ go run main.go
bucket: ops-1-33-devops-ingress-ops-alb-loki-logs
key: AWSLogs/492***148/elasticloadbalancing/us-east-1/2025/11/25/492***148_elasticloadbalancing_us-east-1_app.k8s-ops133externalalb-***.336cddd33c043f33_20251125T0935Z_52.***.***.213_60mjhvf6.log.gz
Now, we can move on to S3.
Go and AWS SDK S3
Similar to SQS, the AWS SDK provides a dedicated s3 package.
S3 client
Create a client with NewFromConfig():
...
// create S3 client using the shared AWS config
s3Client := s3.NewFromConfig(cfg)
...
S3 GetObject()
Add reading the file with GetObject().
Similar to SQS, we provide a GetObjectInput configuration when calling it:
...
// fetch the S3 object and return its streaming body
objResp, err := s3Client.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
})
if err != nil {
log.Fatal("failed to download object:", err)
}
defer objResp.Body.Close()
fmt.Println("S3 object stream opened:", bucket, key)
...
GetObject() returns a *GetObjectOutput, which has a Body field of type io.ReadCloser; io.ReadCloser is an interface that combines io.Reader and io.Closer, i.e. the Read and Close methods.
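For reference, this is how the standard library defines it in the io package:
type ReadCloser interface {
    Reader
    Closer
}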
Reading a file from gzip
Since the log files in S3 are gzip-compressed, we import the gzip package and use NewReader() to decode the data before processing it:
...
// create gzip reader from S3 object stream
gzReader, err := gzip.NewReader(objResp.Body)
if err != nil {
log.Fatal("failed to create gzip reader:", err)
}
defer gzReader.Close()
fmt.Println("gzip stream opened")
...
NewReader() expects an io.Reader, so we can simply pass the objResp.Body to it.
However, gzip.NewReader() does not return the actual data directly – it only creates a reader that will produce decompressed content on demand.
Therefore, we wrap it in bufio to read the decompressed log lines efficiently.
Reading gzip output from bufio
Add the bufio import and use NewScanner() to read decompressed data from gzip.NewReader().
Then, using Scan() and Text(), we convert each scanned line into a string for further processing:
...
// create scanner to read decompressed log lines
scanner := bufio.NewScanner(gzReader)
// increase buffer size if ALB logs have long lines
// default scanner buffer = 64 KB
buf := make([]byte, 0, 1024*1024) // 1 MB
scanner.Buffer(buf, 1024*1024)
// iterate over every line in the decompressed file
for scanner.Scan() {
line := scanner.Text()
fmt.Println("log line:", line)
}
if err := scanner.Err(); err != nil {
log.Fatal("scanner error:", err)
}
...
Let’s check:
$ go run main.go | head
S3 object stream opened: ops-1-33-devops-ingress-ops-alb-loki-logs AWSLogs/492***148/elasticloadbalancing/us-east-1/2025/11/25/492***148_elasticloadbalancing_us-east-1_app.k8s-ops133externalalb-***.336cddd33c043f33_20251125T1005Z_52.***.***.213_2nbanv4o.log.gz
log line: h2 2025-11-25T10:00:02.271561Z app/k8s-ops133externalalb-***/336cddd33c043f33 52.***.***.183:60978 10.0.47.34:8080 0.001 0.005 0.001 200 200 38 5492 "GET https://lightdash.example.co:443/ HTTP/2.0" "Blackbox Exporter/0.27.0" ECDHE-RSA-AES128-GCM-SHA256 TLSv1.2 arn:aws:elasticloadbalancing:us-east-1:492***148:targetgroup/k8s-proddata-lightdas-fa7b1ce474/c400fe91849c401a "Root=1-69257e22-4b16fe8d62d2f8955edc6da8" "lightdash.example.co" "arn:aws:acm:us-east-1:492***148:certificate/77230c5f-d0c2-4e58-b579-8b8422686986" 15 2025-11-25T10:00:02.264000Z "forward" "-" "-" "10.0.47.34:8080" "200" "-" "-" TID_48df8c3791b69144b4ae0f6084e015d6 "-" "-" "-"
...
Breaking main() into functions
Our main() has grown quite a bit, and the core functionality is already working.
Now let’s make the code cleaner and more readable.
Here is the full code now:
package main
import (
"bufio"
"compress/gzip"
"context"
"encoding/json"
"fmt"
"log"
"os"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/sqs"
)
func main() {
// TODO: add exit handler
ctx := context.Background()
// Load the Shared AWS Configuration (~/.aws/config)
cfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
log.Fatal(err)
}
// create SQS client using the shared AWS config
sqsClient := sqs.NewFromConfig(cfg)
// read queue name from environment
queueName := os.Getenv("ALB_LOGS_QUEUE")
if queueName == "" {
log.Fatal("environment variable ALB_LOGS_QUEUE is not set")
}
// request queue URL by name
getURLResp, err := sqsClient.GetQueueUrl(ctx, &sqs.GetQueueUrlInput{
QueueName: aws.String(queueName),
})
if err != nil {
log.Fatal(err)
}
queueURL := getURLResp.QueueUrl
//fmt.Println("Queue URL:", *queueURL)
// receive a single message from SQS
msgResp, err := sqsClient.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
QueueUrl: queueURL, // URL we obtained earlier
MaxNumberOfMessages: 1, // receive only one message
WaitTimeSeconds: 10, // enable long polling (recommended)
})
if err != nil {
log.Fatal(err)
}
if len(msgResp.Messages) == 0 {
fmt.Println("no messages received")
return
}
// print the received message body
//fmt.Println(aws.ToString(msgResp.Messages[0].Body))
// define a struct to unmarshal S3 event message
type S3Event struct {
Records []struct {
S3 struct {
Bucket struct {
Name string `json:"name"`
} `json:"bucket"`
Object struct {
Key string `json:"key"`
} `json:"object"`
} `json:"s3"`
} `json:"Records"`
}
// take SQS message body
msgBody := aws.ToString(msgResp.Messages[0].Body)
// decode JSON into the struct
var event S3Event
if err := json.Unmarshal([]byte(msgBody), &event); err != nil {
log.Fatal("failed to parse S3 event:", err)
}
// extract bucket and key
// we always have 1 message, so use [0]
bucket := event.Records[0].S3.Bucket.Name
key := event.Records[0].S3.Object.Key
//fmt.Println("bucket:", bucket)
//fmt.Println("key:", key)
// create S3 client using the shared AWS config
s3Client := s3.NewFromConfig(cfg)
// fetch the S3 object and return its streaming body
objResp, err := s3Client.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
})
if err != nil {
log.Fatal("failed to download object:", err)
}
defer objResp.Body.Close()
fmt.Println("S3 object stream opened:", bucket, key)
// create gzip reader from S3 object stream
gzReader, err := gzip.NewReader(objResp.Body)
if err != nil {
log.Fatal("failed to create gzip reader:", err)
}
defer gzReader.Close()
//fmt.Println("gzip stream opened")
// create scanner to read decompressed log lines
scanner := bufio.NewScanner(gzReader)
// increase buffer size if ALB logs have long lines
// default scanner buffer = 64 KB
buf := make([]byte, 0, 1024*1024) // 1 MB
scanner.Buffer(buf, 1024*1024)
// iterate over every line in the decompressed file
for scanner.Scan() {
line := scanner.Text()
fmt.Println("log line:", line)
}
if err := scanner.Err(); err != nil {
log.Fatal("scanner error:", err)
}
}
How can we organize the process?
In main(), we perform all the initialization, and then poll SQS in a loop:
- create a context
- create the AWS config
- create the sqsClient and s3Client clients
- read environment variables (ALB_LOGS_QUEUE, then add more)
- and then in a loop:
  - call the receiveFromSQS() function – check if there are any new messages
  - call the getS3Object() function – if there are messages, go to S3 and read the new archive from there
  - call the processLogFile() function – read lines from each received log
What functions will be needed for this?
- function ReceiveFromSQS():
  - here we will pass the context, the SQS client, and the SQS queue URL, and write the bucket name and key (file name) to the S3Event structure
  - for deleting messages after successful processing, we will also need to return the receiptHandle
- function GetS3Object():
  - receives the context, AWS config, bucket, and key
  - executes GetObject() and returns GetObjectOutput
- function GzipReader():
  - reads data from GetS3Object(), unpacks it, and returns strings
- function ScanLines():
  - receives data from GzipReader(), reads lines with Text(), and for now simply outputs them to the console
Package collector
For convenience and to keep everything organized, let’s divide everything into separate files:
collector/
sqs.go
s3.go
gzip.go
scan.go
main.go
File collector/sqs.go
When testing, I ran into the following error:
$ go run main.go | head
panic: runtime error: index out of range [0] with length 0
goroutine 1 [running]:
alb-logs-collector/collector.ReceiveFromSQS({0xa83530, 0xdda060}, 0xc00021d888, {0xc000160050, 0x4d})
/home/setevoy/Projects/Go/alb-logs-collector/collector/sqs.go:52 +0x25d
main.main()
/home/setevoy/Projects/Go/alb-logs-collector/main.go:46 +0x305
exit status 2
It happens because AWS adds test messages to the queue:
{"Service":"Amazon S3","Event":"s3:TestEvent","Time":"2025-11-24T11:10:31.573Z","Bucket":"ops-1-33-devops-ingress-ops-alb-loki-logs", ...}
Therefore, we will add a check to ReceiveFromSQS().
ReceiveFromSQS() function
Describe receiving messages from SQS:
package collector
import (
"context"
"encoding/json"
"fmt"
"strings"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/sqs"
)
// S3Event for SQS message
type S3Event struct {
Records []struct {
S3 struct {
Bucket struct {
Name string `json:"name"`
} `json:"bucket"`
Object struct {
Key string `json:"key"`
} `json:"object"`
} `json:"s3"`
} `json:"Records"`
}
// ReceiveFromSQS reads one message and returns bucket, key, receiptHandle, queueURL
func ReceiveFromSQS(ctx context.Context, client *sqs.Client, queueName string) (bucket, key, receiptHandle, queueURL string, err error) {
// get real queue URL
getURL, err := client.GetQueueUrl(ctx, &sqs.GetQueueUrlInput{
QueueName: aws.String(queueName),
})
if err != nil {
return "", "", "", "", err
}
queueURL = *getURL.QueueUrl
// receive one message
resp, err := client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
QueueUrl: aws.String(queueURL),
MaxNumberOfMessages: 1,
WaitTimeSeconds: 10,
})
if err != nil {
return "", "", "", "", err
}
if len(resp.Messages) == 0 {
return "", "", "", "", fmt.Errorf("no messages")
}
// take the first (and only) message
msg := resp.Messages[0]
// keep the ReceiptHandle so the message can be deleted later
receiptHandle = *msg.ReceiptHandle
raw := aws.ToString(msg.Body)
fmt.Println("SQS RAW:", raw)
// skip AWS test event
if strings.Contains(raw, `"Event":"s3:TestEvent"`) {
fmt.Println("Skipping AWS S3 test event")
return "", "", "", queueURL, fmt.Errorf("test event skipped")
}
// parse S3 event message
var event S3Event
if err := json.Unmarshal([]byte(raw), &event); err != nil {
return "", "", "", "", err
}
bucket = event.Records[0].S3.Bucket.Name
key = event.Records[0].S3.Object.Key
return bucket, key, receiptHandle, queueURL, nil
}
DeleteFromSQS() function
Now, add a deletion function, which we will call after successful log processing:
// DeleteFromSQS deletes message after successful processing
func DeleteFromSQS(ctx context.Context, client *sqs.Client, queueURL, receiptHandle string) error {
_, err := client.DeleteMessage(ctx, &sqs.DeleteMessageInput{
QueueUrl: aws.String(queueURL),
ReceiptHandle: aws.String(receiptHandle),
})
return err
}
collector/s3.go file and GetS3Object() function
package collector
import (
"context"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
)
// GetS3Object returns stream reader of S3 file
func GetS3Object(ctx context.Context, client *s3.Client, bucket, key string) (*s3.GetObjectOutput, error) {
return client.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
})
}
collector/gzip.go file and GzipReader() function
We also move it to a dedicated file to make the entire code more logical.
And later, we may need to add some checks here:
package collector
import (
"compress/gzip"
"io"
)
// GzipReader wraps S3 body
func GzipReader(r io.Reader) (*gzip.Reader, error) {
return gzip.NewReader(r)
}
collector/scan.go file and ScanLines() function
package collector
import (
"bufio"
"fmt"
"io"
)
// ScanLines reads log lines from reader
func ScanLines(r io.Reader) error {
scanner := bufio.NewScanner(r)
// increase max line size
buf := make([]byte, 0, 1024*1024)
scanner.Buffer(buf, 1024*1024)
// iterate over every line in the decompressed file
for scanner.Scan() {
line := scanner.Text()
fmt.Println("log line:", line)
}
return scanner.Err()
}
Changes in the main.go
Add the import of our collector package to the main.go and call the new functions.
Now the entire main.go file looks like this:
package main
import (
"alb-logs-collector-poc/collector"
"context"
"fmt"
"log"
"os"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/sqs"
)
func main() {
// TODO: add exit handler
ctx := context.Background()
// Load the Shared AWS Configuration (~/.aws/config)
cfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
log.Fatal(err)
}
// create SQS client using the shared AWS config
sqsClient := sqs.NewFromConfig(cfg)
// create S3 client using the shared AWS config
s3Client := s3.NewFromConfig(cfg)
// read queue name from environment
queueName := os.Getenv("ALB_LOGS_QUEUE")
if queueName == "" {
log.Fatal("environment variable ALB_LOGS_QUEUE is not set")
}
// 1. get message
//bucket, key, receiptHandle, queueURL, err := collector.ReceiveFromSQS(ctx, sqsClient, queue)
bucket, key, _, _, err := collector.ReceiveFromSQS(ctx, sqsClient, queueName)
if err != nil {
if err.Error() == "no messages" {
fmt.Println("NO MESSAGES")
return
}
fmt.Println("ERROR receiving message:", err)
return
}
fmt.Println("BUCKET:", bucket)
fmt.Println("KEY:", key)
// 2. get S3 object
s3Obj, err := collector.GetS3Object(ctx, s3Client, bucket, key)
if err != nil {
fmt.Println("S3 error:", err)
return
}
defer s3Obj.Body.Close()
fmt.Println("S3 object stream opened:", bucket, key)
// 3. gzip reader
gzReader, err := collector.GzipReader(s3Obj.Body)
if err != nil {
fmt.Println("gzip error:", err)
return
}
defer gzReader.Close()
// 4. scan all lines in log file
err = collector.ScanLines(gzReader)
if err != nil {
log.Fatal("scanner error:", err)
}
}
Let’s check:
$ go run main.go
SQS RAW: {"Records":[{"eventVersion":"2.1"," eventSource":"aws:s3","awsRegion":"us-east-1","eventTime":"2025-11-25T11:15:01.559Z","eventName":"ObjectCreated:Put","userIdentity":{"principalId":"AWS:***.***: elblogdelivery-session"},"requestParameters":{"sourceIPAddress":"2600:***.***:c0ef"},"responseElements":{"x-amz-request-id":"Y6523DEXMF2DS6FG"," x-amz-id-2":"tWgdxqzRbKrjUGDMLLkrOtKQW6S6aWT31VHomgaNm0UAIlzeshheXEGgZN3yRH4pMlWdzfBLqBlZuh3BO0QghDkTVU2WllFDKmh22/B1Cqk="}," s3":{"s3SchemaVersion":"1.0","configurationId":"objectCreatedEventSqs","bucket":{"name":"ops-1-33-devops-ingress-ops-alb-loki-logs","ownerIdentity":{"principalId":"***.***"}," arn":"arn:aws:s3:::ops-1-33-devops-ingress-ops-alb-loki-logs"},"object":{"key":"AWSLogs/492***148/elasticloadbalancing/us-east-1/2025/11/25/492*** 148_elasticloadbalancing_us-east-1_app.k8s-ops133externalalb-***.336cddd33c043f33_20251125T1115Z_34***.***. 15_44qtqgut.log.gz","size":17823,"eTag":"36d1243edd2a7a98546e7af645c36068","sequencer":"0069258FB5806A2506"}}}]}
BUCKET: ops-1-33-devops-ingress-ops-alb-loki-logs
KEY: AWSLogs/492***148/elasticloadbalancing/us-east-1/2025/11/25/492*** 148_elasticloadbalancing_us-east-1_app.k8s-ops133externalalb-***.336cddd33c043f33_20251125T1115Z_34.225.155.15_44qtqgut.log.gz
S3 object stream opened: ops-1-33-devops-ingress-ops-alb-loki-logs AWSLogs/492***148/elasticloadbalancing/us-east-1/2025/11/25/492*** 148_elasticloadbalancing_us-east-1_app.k8s-ops133externalalb-***.336cddd33c043f33_20251125T1115Z_34.***.***. 15_44qtqgut.log.gz
log line: h2 2025-11-25T11:10:06.230595Z app/k8s-ops133externalalb-***/336cddd33c043f33 62.***.***. 83:32026 10.0.44.225:8000 0.000 0.034 0.000 200 200 887 1165 "GET https://staging.api.challenge.example.co:443/admin/users/list?limit=1&user=test_thread_1_ci_ios_ui_participant2%40test.example.co HTTP/2.0" "Challenge App UI Tests-Runner/1.0 (com.challengeapp.uitests. Challenge-App-UI-Tests.xctrunner; build:1; iOS 18.5.0) Alamofire/5.9.0" ECDHE-RSA-AES128-GCM-SHA256 TLSv1.2 arn:aws:elasticloadbalancing:us-east-1:492*** 148:targetgroup/k8s-stagingb-backenda-410aa6288b/0d639f3b859bc8fb "Root=1-69258e8e-04c4384c05934a37738043b0" "staging.api.challenge.example.co" "arn:aws:acm:us-east-1:492***148:certificate/c3b6ec41-50a0-488e-93bb-b03967405f8c" 24 2025-11-25T11:10:06. 195000Z "forward" "-" "-" "10.0.44.225:8000" "200" "-" "-" TID_d1e2c09f4fe6cf46b7c573f42b97889d "-" "-" "-"
...
Logging to VictoriaLogs
VictoriaLogs supports several log ingestion formats; we’ll use the JSON stream API.
First, let’s try to write some logs manually.
Open a port to VictoriaLogs Kubernetes Service:
$ kk -n ops-monitoring-ns port-forward svc/atlas-victoriametrics-victoria-logs-single-server 9428
And make a request with curl:
$ echo '{ "log": { "level": "info", "message": "TEST" }, "date": "0", "stream": "alb" }
' | curl -X POST -H 'Content-Type: application/stream+json' --data-binary @- \
'http://localhost:9428/insert/jsonline?_stream_fields=stream&_time_field=date&_msg_field=log.message'
Let’s check:
Creating the file collector/victoria.go
Let’s create a minimal file for testing.
It contains:
- a JSONLogRecord struct: here we will generate the JSON for sending to the VictoriaLogs HTTP API
- we fill it with data; in the Timestamp field, we can pass a “Unix timestamp in seconds, milliseconds, microseconds, or nanoseconds“, which we do with UnixMilli()
- then set values for the stream and message fields
- form an HTTP request
- send it with the standard http.Client.Do()
package collector
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"time"
)
// JSONLogRecord is the minimal structure for VictoriaLogs JSONLine API
type JSONLogRecord struct {
Timestamp int64 `json:"date"`
Stream string `json:"stream"`
Message string `json:"message"`
}
// SendTestRecord sends simple test log record to VictoriaLogs
func SendTestRecord(url string) error {
rec := JSONLogRecord{
Timestamp: time.Now().UTC().UnixMilli(),
Stream: "test",
Message: "TEST VICTORIA",
}
body, err := json.Marshal(rec)
if err != nil {
return fmt.Errorf("marshal error: %w", err)
}
req, err := http.NewRequest("POST", url, bytes.NewBuffer(body))
if err != nil {
return fmt.Errorf("request error: %w", err)
}
req.Header.Set("Content-Type", "application/stream+json")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("http error: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
return fmt.Errorf("victoria returned %d", resp.StatusCode)
}
fmt.Println("TEST RECORD SENT TO VICTORIA")
return nil
}
Add the VictoriaLogs endpoint to main.go from an environment variable, and use it to build the full URL, specifying which field should be treated as _msg, which field holds the _time, the time format, and which field to use for the log stream:
...
// "http://localhost:9428/insert/jsonline"
vmLogsEp := os.Getenv("VICTORIA_LOGS_URL")
if vmLogsEp == "" {
log.Fatal("environment variable VICTORIA_LOGS_URL is not set")
}
// VictoriaLogs endpoint
vmLogsURL := vmLogsEp +
"?_msg_field=message" +
"&_time_field=date" +
"&_time_format=unix_ms" +
"&_stream_fields=stream"
// 5. send test record to VictoriaLogs
err = collector.SendTestRecord(vmLogsURL)
if err != nil {
fmt.Println("ERROR sending test record:", err)
return
}
...
Set a variable:
$ export VICTORIA_LOGS_URL="http://localhost:9428/insert/jsonline"
Run the code:
$ go run main.go
...
TEST RECORD SENT TO VICTORIA
Check:
OK, it works.
Creating a Log Parser
Now let’s update our code and add new functions:
- collector/scan.go: will read data from gzip and write it to a channel as strings
- collector/parser.go: generates SimpleLog with the Timestamp and Message fields
Edit scan.go – instead of creating a buffer, create a channel, and scanner.Text() will now write to the channel instead of the buffer:
package collector
import (
"bufio"
"io"
)
// ScanLines reads lines from an io.Reader and sends them into a channel
// caller must range over the returned channel
func ScanLines(r io.Reader) <-chan string {
ch := make(chan string)
go func() {
defer close(ch)
scanner := bufio.NewScanner(r)
// - bufio.Scanner reads decompressed bytes from GzipReader()
// - it splits input by '\n' and returns each line as a complete string
// - each line is sent into the channel for further processing
for scanner.Scan() {
ch <- scanner.Text()
}
}()
return ch
}
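Consuming the channel then looks like this (the same pattern is used in main.go below):
for line := range collector.ScanLines(gzReader) {
    // each received value is one raw ALB log line
    fmt.Println(line)
}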
Create parser.go with the SimpleLog structure.
We will keep the time from the log record in the structure, and put the entire log record into the Message field:
package collector
import (
"fmt"
"strings"
"time"
)
// SimpleLog contains parsed timestamp and original line
type SimpleLog struct {
Timestamp time.Time
Message string
}
// ParseRawLine parses ALB log timestamp from the 2nd field.
// Everything else we keep raw.
func ParseRawLine(line string) (*SimpleLog, error) {
fields := strings.Fields(line)
if len(fields) < 2 {
return nil, fmt.Errorf("invalid ALB log line")
}
fmt.Println("--- PARSER SEND ---")
fmt.Println(line)
fmt.Println("--- PARSER DEBUG ---")
ts, err := time.Parse(time.RFC3339Nano, fields[1])
if err != nil {
return nil, fmt.Errorf("timestamp parse error: %w", err)
}
return &SimpleLog{
Timestamp: ts.UTC(),
Message: line,
}, nil
}
Update victoria.go – now it will not fill in the data in JSONLogRecord, but will accept it as an argument:
package collector
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"time"
)
// JSONLogRecord is the minimal structure for VictoriaLogs JSONLine API
type JSONLogRecord struct {
Timestamp int64 `json:"date"`
Stream string `json:"stream"`
Message string `json:"message"`
}
// SendToVictoria sends one JSON LINE into VictoriaLogs JSON Stream API.
func SendToVictoria(url string, rec *JSONLogRecord) error {
body, err := json.Marshal(rec)
if err != nil {
return err
}
fmt.Println("--- VMLOGS SEND ---")
fmt.Println(string(body))
fmt.Println("--- VMLOGS DEBUG ---")
req, err := http.NewRequest("POST", url, bytes.NewReader(body))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/stream+json")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode >= 300 {
return fmt.Errorf("victoria response status: %s", resp.Status)
}
return nil
}
In main.go, add a loop and add message deletion from the SQS with the DeleteFromSQS() call, which we created earlier:
...
// 4. Pipeline: read lines → parse → send
for line := range collector.ScanLines(gzReader) {
rec, err := collector.ParseRawLine(line)
if err != nil {
continue
}
out := &collector.JSONLogRecord{
Timestamp: rec.Timestamp.UnixMilli(),
Stream: "alb",
Message: rec.Message,
}
if err := collector.SendToVictoria(vmLogsURL, out); err != nil {
fmt.Println("send error:", err)
}
}
// 5. Delete SQS message
if err := collector.DeleteFromSQS(ctx, sqsClient, queueURL, receiptHandle); err != nil {
fmt.Println("delete error:", err)
}
}
While collector.ScanLines() returns data, we pass it to ParseRawLine(), which fills SimpleLog with Timestamp and Message.
Then we fill in the JSONLogRecord and send it to VictoriaLogs.
At this point, we can purge the SQS queue: while I was writing the code, old messages accumulated there, and the collector started pulling old logs. I spent some time trying to figure out why the time in VictoriaLogs did not match the expected time, but the problem turned out to be trivial – I was looking at data for the last 15 minutes, while morning records were being imported.
Let’s check:
$ go run main.go
...
--- PARSER SEND ---
https 2025-11-25T12:36:50.859909Z app/k8s-ops133externalalb-***/336cddd33c043f33 3.***.***. 78:3883 10.0.46.229:8000 0.000 0.010 0.000 200 200 2023 574 "POST https://api.challenge.example.co:443/auth/auth0-webhooks/post-login HTTP/1.1" "axios/1.6.5" ECDHE-RSA-AES128-GCM-SHA256 TLSv1.2 arn:aws:elasticloadbalancing:us-east-1:492*** 148:targetgroup/k8s-prodback-backenda-47ba3e0f35/9ec763ecd48352da "Root=1-6925a2e2-6e4507dd3a23de072c2f6ae9" "api.challenge.example.co" "arn:aws:acm:us-east-1:492***148:certificate/beeb3714-511e-414b-b1f3-5440746bb5ea" 12 2025-11-25T12:36:50.837000Z "forward" "-" "-" "10.0.46.229:8000" "200" "-" "-" TID_5ed365cd6c6f57409a2566d1dcaf049c "-" "-" "-"
--- PARSER DEBUG ---
--- VMLOGS SEND ---
{"date":1764074210859,"stream":"alb","message":"https 2025-11-25T12:36:50.859909Z app/k8s-ops133externalalb-***/336cddd33c043f33 3.** *.***. 78:3883 10.0.46.229:8000 0.000 0.010 0.000 200 200 2023 574 \"POST https://api.challenge.example.co:443/auth/auth0-webhooks/post-login HTTP/1.1\" \"axios/1.6.5\" ECDHE-RSA-AES128-GCM-SHA256 TLSv1.2 arn:aws:elasticloadbalancing:us-east-1:492*** 148:targetgroup/k8s-prodback-backenda-47ba3e0f35/9ec763ecd48352da \"Root=1-6925a2e2-6e4507dd3a23de072c2f6ae9\" \"api.challenge.example.co\" \"arn:aws:acm:us-east-1:492***148:certificate/beeb3714-511e-414b-b1f3-5440746bb5ea\" 12 2025-11-25T12:36:50.837000Z \"forward\" \"-\" \"-\" \"10.0.46.229:8000\" \"200\" \"-\" \"-\" TID_5ed365cd6c6f57409a2566d1dcaf049c \"-\" \"-\" \"-\""}
--- VMLOGS DEBUG ---
...
And in the VictoriaLogs:
Everything is here.
What we have left:
- form the fields
- add gocron
Starting a loop with gocron
Add a launch every minute with gocron.
Now the entire main.go looks like this:
func main() {
// TODO: add exit handler
ctx := context.Background()
// Load the Shared AWS Configuration (~/.aws/config)
cfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
log.Fatal(err)
}
// create SQS client using the shared AWS config
sqsClient := sqs.NewFromConfig(cfg)
// create S3 client using the shared AWS config
s3Client := s3.NewFromConfig(cfg)
// read queue name from environment
queueName := os.Getenv("ALB_LOGS_QUEUE")
if queueName == "" {
log.Fatal("environment variable ALB_LOGS_QUEUE is not set")
}
// scheduler
s := gocron.NewScheduler(time.UTC)
// job: check SQS every minute
s.Every(1).Minute().Do(func() {
fmt.Println("CHECKING SQS...")
// 1. get message
bucket, key, receiptHandle, queueURL, err := collector.ReceiveFromSQS(ctx, sqsClient, queueName)
//bucket, key, _, _, err := collector.ReceiveFromSQS(ctx, sqsClient, queueName)
if err != nil {
if err.Error() == "no messages" {
fmt.Println("NO MESSAGES")
return
}
fmt.Println("ERROR receiving message:", err)
return
}
fmt.Println("BUCKET:", bucket)
fmt.Println("KEY:", key)
// 2. get S3 object
s3Obj, err := collector.GetS3Object(ctx, s3Client, bucket, key)
if err != nil {
fmt.Println("S3 error:", err)
return
}
defer s3Obj.Body.Close()
fmt.Println("S3 object stream opened:", bucket, key)
// 3. gzip reader
gzReader, err := collector.GzipReader(s3Obj.Body)
if err != nil {
fmt.Println("gzip error:", err)
return
}
defer gzReader.Close()
// "http://localhost:9428/insert/jsonline"
vmLogsEp := os.Getenv("VICTORIA_LOGS_URL")
if vmLogsEp == "" {
log.Fatal("environment variable VICTORIA_LOGS_URL is not set")
}
// VictoriaLogs endpoint
vmLogsURL := vmLogsEp +
"?_msg_field=message" +
"&_time_field=date" +
"&_time_format=unix_ms" +
"&_stream_fields=stream"
// 4. Pipeline: read lines → parse → send
for line := range collector.ScanLines(gzReader) {
rec, err := collector.ParseRawLine(line)
if err != nil {
continue
}
out := &collector.JSONLogRecord{
Timestamp: rec.Timestamp.UnixMilli(),
Stream: "alb",
Message: rec.Message,
}
if err := collector.SendToVictoria(vmLogsURL, out); err != nil {
fmt.Println("send error:", err)
}
}
// 5. delete message from SQS
if err := collector.DeleteFromSQS(ctx, sqsClient, queueURL, receiptHandle); err != nil {
fmt.Println("FAILED TO DELETE SQS MESSAGE:", err)
} else {
fmt.Println("SQS MESSAGE DELETED")
}
})
// start the scheduler and block the main goroutine
s.StartBlocking()
}
Adding fields to VictoriaLogs message
Documentation on AWS ALB fields – Access log entries.
What we can now add to the logs in VictoriaLogs: client_ip from the client:port field, target_ip (the Kubernetes Pod) from target:port, elb_status_code – the ALB response code, and target_status_code.
Update the SimpleLog struct, add new fields:
type SimpleLog struct {
Timestamp time.Time
Message string
ClientIP string
TargetIP string
ELBStatus int
TargetStatus int
}
In ParseRawLine(), populate these new fields using indexes into the fields slice:
...
elbStatus, _ := strconv.Atoi(fields[8])
targetStatus, _ := strconv.Atoi(fields[9])
return &SimpleLog{
Timestamp: ts.UTC(),
Message: line,
ClientIP: fields[3],
TargetIP: fields[4],
ELBStatus: elbStatus,
TargetStatus: targetStatus,
}, nil
...
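For reference, the field positions used here come from the ALB access log format (see the AWS “Access log entries” documentation linked above); the constant names below are just an illustration and are not part of the collector code:
// Field positions in an ALB access log line after strings.Fields()
// (per the AWS "Access log entries" documentation linked above).
// These names are illustrative only:
const (
    fieldType         = 0 // "http", "https", "h2", ...
    fieldTimestamp    = 1 // e.g. 2025-11-25T10:00:02.271561Z
    fieldELB          = 2 // load balancer resource ID
    fieldClientPort   = 3 // client:port
    fieldTargetPort   = 4 // target:port
    fieldELBStatus    = 8 // elb_status_code
    fieldTargetStatus = 9 // target_status_code
)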
Add these fields to JSONLogRecord:
type JSONLogRecord struct {
Timestamp int64 `json:"date"`
Message string `json:"message"`
Stream string `json:"stream"`
ClientIP string `json:"client_ip"`
TargetIP string `json:"target_ip"`
ELBStatus int `json:"elb_status"`
TargetStatus int `json:"target_status"`
}
And add them to main.go:
...
out := &collector.JSONLogRecord{
Timestamp: rec.Timestamp.UnixMilli(),
Message: rec.Message,
Stream: "alb",
ClientIP: rec.ClientIP,
TargetIP: rec.TargetIP,
ELBStatus: rec.ELBStatus,
TargetStatus: rec.TargetStatus,
}
...
Now we have new fields in VictoriaLogs:
The ports in client_ip and target_ip are clearly redundant here, so we can remove them during parsing.
Add a helper function using strings.SplitN() to our parser.go:
// cut port from "ip:port"
func stripPort(s string) string {
parts := strings.SplitN(s, ":", 2)
return parts[0]
}
And use it when filling in SimpleLog:
...
return &SimpleLog{
Timestamp: ts.UTC(),
Message: line,
ClientIP: stripPort(fields[3]),
TargetIP: stripPort(fields[4]),
ELBStatus: elbStatus,
TargetStatus: targetStatus,
}, nil
...
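One caveat: stripPort() splits on the first colon, which is fine for IPv4 client:port pairs; if IPv6 client addresses ever show up in the logs, the first colon is part of the address itself. A possible defensive variant (a sketch, not part of the code above) would cut at the last colon instead:
// stripPortSafe is a hypothetical variant of stripPort() that removes the
// ":port" suffix at the last colon, so IPv6 addresses are not truncated
// at their first colon.
func stripPortSafe(s string) string {
    if i := strings.LastIndex(s, ":"); i != -1 {
        return s[:i]
    }
    return s
}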
Now we have fields without unnecessary ports:
The final result
All code together.
I asked AI to write comments so that all functions would be explained in detail.
File main.go
package main
import (
"alb-logs-collector/collector"
"context"
"fmt"
"log"
"os"
"time"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/go-co-op/gocron"
)
func main() {
// create a base context for all AWS operations
// this context is passed into SQS/S3 functions
ctx := context.Background()
// load shared AWS config from ~/.aws/config and ~/.aws/credentials
// this provides region, credentials, retry settings, etc.
cfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
log.Fatal(err)
}
// create SQS client using the shared AWS config
// used later for reading and deleting queue messages
sqsClient := sqs.NewFromConfig(cfg)
// create S3 client for reading S3 objects
s3Client := s3.NewFromConfig(cfg)
// read queue name from environment
// this avoids hardcoding queue names in code
queueName := os.Getenv("ALB_LOGS_QUEUE")
if queueName == "" {
log.Fatal("environment variable ALB_LOGS_QUEUE is not set")
}
// create the scheduler that runs periodic jobs
// gocron automatically creates goroutines for scheduled tasks
s := gocron.NewScheduler(time.UTC)
// schedule: run the function every minute
s.Every(1).Minute().Do(func() {
fmt.Println("CHECKING SQS...")
// 1. read one message from SQS
// ReceiveFromSQS is implemented in collector/sqs.go
bucket, key, receiptHandle, queueURL, err := collector.ReceiveFromSQS(ctx, sqsClient, queueName)
if err != nil {
// "no messages" is not an error - just no new logs
if err.Error() == "no messages" {
fmt.Println("NO MESSAGES")
return
}
fmt.Println("ERROR receiving message:", err)
return
}
fmt.Println("BUCKET:", bucket)
fmt.Println("KEY:", key)
// 2. download the S3 object stream
// GetS3Object located in collector/s3.go
s3Obj, err := collector.GetS3Object(ctx, s3Client, bucket, key)
if err != nil {
fmt.Println("S3 error:", err)
return
}
defer s3Obj.Body.Close()
fmt.Println("S3 object stream opened:", bucket, key)
// 3. wrap the S3 stream into gzip reader
// GzipReader implemented in collector/gzip.go
gzReader, err := collector.GzipReader(s3Obj.Body)
if err != nil {
fmt.Println("gzip error:", err)
return
}
defer gzReader.Close()
// read VictoriaLogs endpoint
vmLogsEp := os.Getenv("VICTORIA_LOGS_URL")
if vmLogsEp == "" {
log.Fatal("environment variable VICTORIA_LOGS_URL is not set")
}
// final URL with parameters for jsonline ingestion
vmLogsURL := vmLogsEp +
"?_msg_field=message" + // which JSON field contains the log message
"&_time_field=date" + // which JSON field contains timestamp
"&_time_format=unix_ms" + // tell VictoriaLogs that the timestamp is unix milliseconds
"&_stream_fields=stream" // which field defines the stream name
// 4. process the log file line-by-line
// ScanLines implemented in collector/scan.go
// It returns a channel of RAW log lines (already ungzipped)
for line := range collector.ScanLines(gzReader) {
// parse timestamp + extract fields
// ParseRawLine implemented in collector/parser.go
rec, err := collector.ParseRawLine(line)
if err != nil {
continue // skip invalid ALB lines
}
// prepare the JSON record for VictoriaLogs
// JSONLogRecord defined in collector/victoria.go
out := &collector.JSONLogRecord{
Timestamp: rec.Timestamp.UnixMilli(), // ALB timestamp in unix ms
Message: rec.Message, // full raw ALB log line
Stream: "alb", // log stream name
ClientIP: rec.ClientIP, // extracted from ALB log
TargetIP: rec.TargetIP, // extracted from ALB log
ELBStatus: rec.ELBStatus, // HTTP status from ALB
TargetStatus: rec.TargetStatus, // backend response status
}
// send it to VictoriaLogs
// SendToVictoria implemented in collector/victoria.go
if err := collector.SendToVictoria(vmLogsURL, out); err != nil {
fmt.Println("send error:", err)
}
}
// 5. delete message from SQS after successful processing
// DeleteFromSQS implemented in collector/sqs.go
if err := collector.DeleteFromSQS(ctx, sqsClient, queueURL, receiptHandle); err != nil {
fmt.Println("FAILED TO DELETE SQS MESSAGE:", err)
} else {
fmt.Println("SQS MESSAGE DELETED")
}
})
// start the scheduler and block main goroutine forever
s.StartBlocking()
}
File sqs.go
package collector
import (
"context"
"encoding/json"
"fmt"
"strings"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/sqs"
)
// S3Event describes the JSON format sent by S3 to SQS.
// It matches the structure of the S3 event notification:
// "Records" → "s3" → "bucket.name" and "object.key".
// This struct is used in ReceiveFromSQS() to extract bucket/key.
type S3Event struct {
Records []struct {
S3 struct {
Bucket struct {
Name string `json:"name"` // S3 bucketName
} `json:"bucket"`
Object struct {
Key string `json:"key"` // S3 object key (log filename)
} `json:"object"`
} `json:"s3"`
} `json:"Records"`
}
// ReceiveFromSQS reads a single SQS message.
// It returns:
// - bucket: the S3 bucket from event
// - key: the object key (filename in S3)
// - receiptHandle: required later to delete the message
// - queueURL: actual AWS queue URL (needed for deletion)
// - err: error, or "no messages" if queue is empty
//
// This function is called from main.go inside the scheduler loop.
// After processing the file from S3, the caller must call DeleteFromSQS().
func ReceiveFromSQS(ctx context.Context, client *sqs.Client, queueName string) (bucket, key, receiptHandle, queueURL string, err error) {
// resolve real SQS queue URL from queue name
// SQS APIs always operate on queueURL, not the name
getURL, err := client.GetQueueUrl(ctx, &sqs.GetQueueUrlInput{
QueueName: aws.String(queueName),
})
if err != nil {
return "", "", "", "", err
}
queueURL = *getURL.QueueUrl
// receive exactly one message
// WaitTimeSeconds enables long polling for up to 10s
resp, err := client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
QueueUrl: aws.String(queueURL),
MaxNumberOfMessages: 1,
WaitTimeSeconds: 10,
})
if err != nil {
return "", "", "", "", err
}
// queue empty → no new logs
if len(resp.Messages) == 0 {
return "", "", "", "", fmt.Errorf("no messages")
}
// take the first (and only) message
msg := resp.Messages[0]
// ReceiptHandle is mandatory for deletion
receiptHandle = *msg.ReceiptHandle
// raw JSON body received from S3 → SQS notification
raw := aws.ToString(msg.Body)
fmt.Println("SQS RAW:", raw)
// filter out AWS TestEvent generated when enabling notifications
// Test events do NOT contain real logs and must be ignored
if strings.Contains(raw, `"Event":"s3:TestEvent"`) {
fmt.Println("Skipping AWS S3 test event")
return "", "", "", queueURL, fmt.Errorf("test event skipped")
}
// unmarshal JSON into S3Event struct
var event S3Event
if err := json.Unmarshal([]byte(raw), &event); err != nil {
return "", "", "", "", err
}
// extract bucket and key from the event
bucket = event.Records[0].S3.Bucket.Name
key = event.Records[0].S3.Object.Key
return bucket, key, receiptHandle, queueURL, nil
}
// DeleteFromSQS permanently removes the processed message.
// Must be called only after successful S3 → gzip → parsing → VictoriaLogs ingestion.
// If not deleted, SQS will retry delivery (visibility timeout expires).
// Implemented in collector/sqs.go, called from main.go.
func DeleteFromSQS(ctx context.Context, client *sqs.Client, queueURL, receiptHandle string) error {
_, err := client.DeleteMessage(ctx, &sqs.DeleteMessageInput{
QueueUrl: aws.String(queueURL),
ReceiptHandle: aws.String(receiptHandle),
})
return err
}
File s3.go
package collector
import (
"context"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
)
// GetS3Object retrieves an S3 object and returns a streaming reader.
//
// This function does NOT download the file to disk.
// It returns a network stream (`GetObjectOutput.Body`), which allows the caller
// to read the object lazily, byte-by-byte, directly from S3.
//
// In our pipeline, the call chain looks like:
//
// main.go
// → ReceiveFromSQS() to get bucket/key (collector/sqs.go)
// → GetS3Object() to open S3 object stream (this file)
// → GzipReader() to decompress gzip data (collector/gzip.go)
// → ScanLines() to iterate log lines (collector/scan.go)
//
// Notes:
// - The returned object must be closed by the caller: `defer obj.Body.Close()`
// - S3 GetObject supports range requests, but here we read the whole file.
// - The body is read sequentially; this is efficient for log ingestion patterns.
func GetS3Object(ctx context.Context, client *s3.Client, bucket, key string) (*s3.GetObjectOutput, error) {
return client.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(bucket), // name of S3 bucket with ALB logs
Key: aws.String(key), // key (path/filename) of the gzip log file
})
}
File gzip.go
package collector
import (
"compress/gzip"
"io"
)
// GzipReader wraps an existing io.Reader with a gzip decompressor.
//
// This function takes the S3 object body (returned by GetS3Object in
// collector/s3.go) and turns it into a gzip.Reader capable of producing
// the decompressed content of the ALB log file.
//
// Typical pipeline:
//
// S3Object.Body (io.ReadCloser)
// ↓ passed into
// GzipReader() → *gzip.Reader (still a stream)
// ↓ passed into
// ScanLines() → yields plain-text log lines
//
// Notes:
// - gzip.NewReader expects the input stream to be in .gz format.
// - Caller must close the returned reader: `defer gz.Close()`.
// - No buffering or scanning is done here; this function only wraps the stream.
func GzipReader(r io.Reader) (*gzip.Reader, error) {
return gzip.NewReader(r)
}
File scan.go
package collector
import (
"bufio"
"io"
)
// ScanLines reads an arbitrary io.Reader line-by-line and returns a channel
// that produces each extracted line as a string.
//
// This function is used in main.go to iterate through the contents of an S3
// Gzip file. The call chain is:
//
// main.go → GetS3Object() (collector/s3.go)
// → GzipReader() (collector/gzip.go)
// → ScanLines() (this file)
//
// Because ScanLines() uses a goroutine, the caller can process lines
// asynchronously using: `for line := range ScanLines(r) { ... }`.
//
// Notes:
// - The returned channel is *unbuffered*, so each send blocks until the caller
// receives the value. This naturally rate-controls the goroutine.
// - bufio.Scanner splits input by '\n', automatically handling different line
// lengths (up to scanner’s max buffer).
// - When the reader is fully consumed, the goroutine closes the channel.
func ScanLines(r io.Reader) <-chan string {
ch := make(chan string)
// launch a goroutine that streams lines out of the reader
go func() {
// ensure channel is closed when scanning finishes
defer close(ch)
// bufio.Scanner provides efficient, line-oriented reading of text streams
scanner := bufio.NewScanner(r)
// loop until EOF or error. Each iteration reads the next line
for scanner.Scan() {
// push extracted line into the channel
// (blocks until caller receives it)
ch <- scanner.Text()
}
// scanner.Err() is intentionally ignored here, because error handling
// is performed by the caller when needed, and the channel-based design
// treats EOF as natural termination.
}()
// return read-only channel of strings
return ch
}
File parser.go
package collector
import (
"fmt"
"strconv"
"strings"
"time"
)
// SimpleLog represents a minimal, lightweight structure containing:
//
// - Timestamp: parsed ALB timestamp in UTC
// - Message: the full raw log line (used as message in VictoriaLogs)
// - ClientIP: extracted client IP (port removed)
// - TargetIP: extracted backend target IP (port removed)
// - ELBStatus: HTTP status returned by ALB to the client
// - TargetStatus: HTTP status returned by the backend to ALB
//
// This struct is designed for the simplified ingestion phase,
// before implementing full ALB field parsing.
type SimpleLog struct {
Timestamp time.Time
Message string
ClientIP string
TargetIP string
ELBStatus int
TargetStatus int
}
// stripPort removes the ":port" suffix from IP strings like "1.2.3.4:5678".
// This keeps VictoriaLogs cardinality low, avoiding creation of thousands
// of separate series due to ephemeral client ports.
func stripPort(s string) string {
parts := strings.SplitN(s, ":", 2)
return parts[0]
}
// ParseRawLine parses the essential ALB log fields:
//
// - Timestamp from the 2nd field
// - Client IP (without port)
// - Target IP (without port)
// - ALB HTTP status
// - Target HTTP status
//
// Everything else is kept untouched in the Message field.
//
// ALB logs are space-delimited, except for quoted sections
// (like the request line and user agent). At this simplified stage,
// we do *not* parse quoted fields - we only extract the mandatory parts.
func ParseRawLine(line string) (*SimpleLog, error) {
fields := strings.Fields(line)
// we expect at least: protocol, timestamp, elb, client, target, ... status codes
// ALB format is consistent - <10 fields means corrupted input
if len(fields) < 10 {
return nil, fmt.Errorf("invalid ALB log line")
}
fmt.Println("--- PARSER SEND ---")
fmt.Println(line)
fmt.Println("--- PARSER DEBUG ---")
// parse timestamp in RFC3339Nano format
// ALB always emits timestamps in UTC
ts, err := time.Parse(time.RFC3339Nano, fields[1])
if err != nil {
return nil, fmt.Errorf("timestamp parse error: %w", err)
}
// convert status codes from string → int
elbStatus, _ := strconv.Atoi(fields[8])
targetStatus, _ := strconv.Atoi(fields[9])
return &SimpleLog{
Timestamp: ts.UTC(), // ensure strict UTC normalization
Message: line, // pass full raw line to VictoriaLogs
ClientIP: stripPort(fields[3]),
TargetIP: stripPort(fields[4]),
ELBStatus: elbStatus,
TargetStatus: targetStatus,
}, nil
}
File victoria.go
package collector
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
)
// JSONLogRecord represents a single log entry formatted for
// VictoriaLogs JSONLine ingestion API.
//
// Field mapping:
//
// - Timestamp → "date"
// UNIX milliseconds timestamp of the log event.
// This must match ?_time_field=date&_time_format=unix_ms in ingestion URL.
//
// - Message → "message"
// The complete raw ALB log line. Used as the primary log message.
//
// - Stream → "stream"
// Logical log stream identifier. Used in ingestion via ?_stream_fields=stream.
//
// - ClientIP / TargetIP / ELBStatus / TargetStatus
// Additional parsed metadata fields. These become searchable log fields.
//
// The structure intentionally avoids nested JSON - VictoriaLogs processes
// flat objects more efficiently and without ambiguity in field extraction.
type JSONLogRecord struct {
Timestamp int64 `json:"date"`
Message string `json:"message"`
Stream string `json:"stream"`
ClientIP string `json:"client_ip"`
TargetIP string `json:"target_ip"`
ELBStatus int `json:"elb_status"`
TargetStatus int `json:"target_status"`
}
// SendToVictoria sends exactly ONE JSON record to the VictoriaLogs JSONLine API.
//
// This function is called from main.go inside the ingestion loop.
// It performs the following steps:
//
// 1. Marshal the JSONLogRecord into a compact JSON object
// 2. Build an HTTP POST request with Content-Type: application/stream+json
// 3. Send the request to VictoriaLogs
// 4. Check for non-2xx HTTP status codes
//
// Notes:
// - VictoriaLogs expects a "streaming JSON" format, where each POST body
// contains a single JSON object (or multiple lines if needed).
// - We send logs one-by-one for simplicity, but batching can be added later.
// - The caller controls the ingestion endpoint URL, including query params:
// ?_msg_field=message&_time_field=date&_time_format=unix_ms&_stream_fields=stream
//
// Errors returned from this function are logged in main.go, and do not
// interrupt the ingestion pipeline.
func SendToVictoria(url string, rec *JSONLogRecord) error {
// serialize record into JSON line
body, err := json.Marshal(rec)
if err != nil {
return err
}
// debug output for transparency
fmt.Println("--- VMLOGS SEND ---")
fmt.Println(string(body))
fmt.Println("--- VMLOGS DEBUG ---")
// create POST request with streaming JSON payload
req, err := http.NewRequest("POST", url, bytes.NewReader(body))
if err != nil {
return err
}
// JSONLine ingestion requires this content type
req.Header.Set("Content-Type", "application/stream+json")
// send HTTP request using default HTTP client
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
// check server-side errors
if resp.StatusCode >= 300 {
return fmt.Errorf("victoria response status: %s", resp.Status)
}
return nil
}
I’ve already launched it in our Kubernetes cluster, and everything works.
The only thing is that the SQS and S3 operations do not fail when access is denied, so we need to add more error checks.
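A possible starting point (a sketch; it assumes the github.com/aws/smithy-go module, which aws-sdk-go-v2 already depends on) is to unwrap AWS API errors and log their error codes, so an AccessDenied from SQS or S3 becomes visible immediately:
package collector

import (
    "errors"
    "log"

    "github.com/aws/smithy-go"
)

// logAWSError is a hypothetical helper: it unwraps AWS SDK errors and logs
// the API error code (e.g. "AccessDenied") instead of a generic message.
func logAWSError(op string, err error) {
    var apiErr smithy.APIError
    if errors.As(err, &apiErr) {
        log.Printf("%s failed: code=%s, message=%s", op, apiErr.ErrorCode(), apiErr.ErrorMessage())
        return
    }
    log.Printf("%s failed: %v", op, err)
}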