Golang: writing an AWS Load Balancer logs collector for VictoriaLogs


The next task I want to do with Golang is building a custom log collector that retrieves AWS Application Load Balancer logs from S3 and sends them to VictoriaLogs.

Sure, we could simply use Vector.dev, as I did for AWS VPC Flow Logs (see: Vector.dev introduction, logs from AWS S3, and VictoriaLogs integration). But this is a good opportunity to get more hands-on experience with Go, so I’ll create our own collector instead.

So, the main idea now is this:

  • The Load Balancer writes logs to S3.
  • In S3, we create a notification to AWS SQS.
  • Our collector polls SQS and receives information about new objects in S3.
  • It makes a request to S3 and receives a gzip archive.
  • It unpacks the archive, parses the data, and sends it to VictoriaLogs.

Let’s go.

Configuring S3 and SQS notifications

When designing the solution, the main question was, how do we keep track of which S3 objects have already been processed and which ones are new?

One option would be to use a database to store metadata about processed files. But this would eventually accumulate a large amount of state, and I don’t really want to introduce a stateful service into our Kubernetes environment just for this.

Instead, we will keep it simple and reliable, following the same approach we used with Vector.dev and AWS VPC Flow Logs by using an SQS queue to receive notifications about new objects in S3.

Our collector will poll messages from SQS, extract information about new log files, process them, and then delete the messages from the queue.

See documentation on AWS – Walkthrough: Configuring a bucket for notifications (SNS topic or SQS queue).

Creating an SQS queue

For now, we will create an SQS queue manually and later manage it properly using Terraform.

Keep the queue type set to Standard because:

Amazon Simple Queue Service FIFO (First-In-First-Out) queues aren’t supported as an Amazon S3 event notification destination

See Amazon S3 Event Notifications.

Also, message ordering is not important in our case, since each log entry contains its own timestamp that we will parse anyway.

So let’s go to SQS and create a new queue:

Next, we need to describe a policy in the Access policy.

S3 will send messages to the queue, while our Kubernetes Pod (using a dedicated ServiceAccount) will read those messages. So the Policy must allow these actions.

For Kubernetes access, we’ll create the ServiceAccount and IAM Role later, so for now, let’s simply grant SQS permissions to everyone in our AWS Account.

And for S3, add permission for the SQS:SendMessage API call:

{
    "Version": "2012-10-17",
    "Id": "loggerID",
    "Statement": [
        {
            "Sid": "sendFromS3LogsAllow",
            "Effect": "Allow",
            "Principal": {
                "Service": "s3.amazonaws.com"
            },
            "Action": [
                "SQS:SendMessage"
            ],
            "Resource": "arn:aws:sqs:us-east-1:492***148:testing-alb-logs-s3-notifier",
            "Condition": {
                "ArnLike": {
                    "aws:SourceArn": "arn:aws:s3:*:*:ops-1-33-devops-ingress-ops-alb-loki-logs"
                }
            }
        },
        {
            "Sid": "receiveFromQueueAllowSameAccount",
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::492***148:root"
            },
            "Action": [
                "SQS:ReceiveMessage",
                "SQS:DeleteMessage",
                "SQS:GetQueueAttributes",
                "SQS:GetQueueUrl"
            ],
            "Resource": "arn:aws:sqs:us-east-1:492***148:testing-alb-logs-s3-notifier"
        }
    ]
}

Leave the rest as default.

Configuring S3 Event Notifications

Go to an S3 bucket with logs > Properties > Event notifications:

Create a new Event notification to send notifications about all s3:ObjectCreated operations:

In the Destination, select SQS and our queue:
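
By the way, the same Event notification can also be created from code instead of the AWS Console – a rough sketch with the AWS SDK (the bucket name and queue ARN below are just the example values used in this post):

package main

import (
  "context"
  "log"

  "github.com/aws/aws-sdk-go-v2/aws"
  "github.com/aws/aws-sdk-go-v2/config"
  "github.com/aws/aws-sdk-go-v2/service/s3"
  "github.com/aws/aws-sdk-go-v2/service/s3/types"
)

func main() {
  ctx := context.Background()

  cfg, err := config.LoadDefaultConfig(ctx)
  if err != nil {
    log.Fatal(err)
  }

  client := s3.NewFromConfig(cfg)

  // send all s3:ObjectCreated:* events from the logs bucket to our SQS queue
  _, err = client.PutBucketNotificationConfiguration(ctx, &s3.PutBucketNotificationConfigurationInput{
    Bucket: aws.String("ops-1-33-devops-ingress-ops-alb-loki-logs"),
    NotificationConfiguration: &types.NotificationConfiguration{
      QueueConfigurations: []types.QueueConfiguration{
        {
          QueueArn: aws.String("arn:aws:sqs:us-east-1:492***148:testing-alb-logs-s3-notifier"),
          Events:   []types.Event{"s3:ObjectCreated:*"},
        },
      },
    },
  })
  if err != nil {
    log.Fatal(err)
  }
}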

Wait a few minutes, and check the Monitoring tab in the SQS queue:

Okay, messages are here. Now we need to collect and read them.

Let’s move on to Go.

AWS SDK for Go

To interact with AWS, we will use the AWS SDK for Go, which provides all the required APIs.

For authentication, we import aws-sdk-go-v2/config and call the LoadDefaultConfig() function. It automatically looks for credentials in environment variables, ~/.aws/credentials and ~/.aws/config files, or uses EC2 IAM Roles.

The first argument to LoadDefaultConfig() must be a context. I covered Go contexts earlier in my previous post Golang: Creating an OpenAI Exporter for VictoriaMetrics.

For now, we will simply use context.Background(). Later, we will add proper handling and graceful shutdown logic:

package main

import (
  "context"
  "log"

  "github.com/aws/aws-sdk-go-v2/config"
)

func main() {
  // TODO: add exit handler
  ctx := context.Background()
  // Load the Shared AWS Configuration (~/.aws/config)
  cfg, err := config.LoadDefaultConfig(ctx)
  if err != nil {
    log.Fatal(err)
  }

  // cfg is not used yet: it will be passed to the SQS and S3 clients later;
  // without this line, the compiler fails with "declared and not used"
  _ = cfg
}
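
For reference: when we get to the graceful shutdown part, it will most likely look something like this – a minimal sketch using signal.NotifyContext() from the standard library, not the final implementation:

package main

import (
  "context"
  "log"
  "os/signal"
  "syscall"

  "github.com/aws/aws-sdk-go-v2/config"
)

func main() {
  // a context that is cancelled on SIGINT/SIGTERM,
  // so in-flight AWS calls can be aborted on shutdown
  ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
  defer stop()

  // Load the Shared AWS Configuration (~/.aws/config)
  cfg, err := config.LoadDefaultConfig(ctx)
  if err != nil {
    log.Fatal(err)
  }

  // cfg will be used to create AWS service clients, as in the rest of the post
  _ = cfg
}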

Go and AWS SDK SQS

To work with Amazon SQS, the AWS SDK provides a separate module named sqs.

SQS client

Add its import and create the SQS client using NewFromConfig(), passing in the AWS config object we initialized earlier.

To interact with a queue, we need its URL, which we can obtain by calling GetQueueUrl().

We also load the QueueName from an environment variable, since later in Kubernetes it will be passed through Helm chart values:

package main

import (
  "context"
  "fmt"
  "log"
  "os"

  "github.com/aws/aws-sdk-go-v2/aws"
  "github.com/aws/aws-sdk-go-v2/config"
  "github.com/aws/aws-sdk-go-v2/service/sqs"
)

func main() {
  // TODO: add exit handler
  ctx := context.Background()

  // Load the Shared AWS Configuration (~/.aws/config)
  cfg, err := config.LoadDefaultConfig(ctx)
  if err != nil {
    log.Fatal(err)
  }

  // create SQS client using the shared AWS config
  sqsClient := sqs.NewFromConfig(cfg)

  // read queue name from environment
  queueName := os.Getenv("ALB_LOGS_QUEUE")
  if queueName == "" {
    log.Fatal("environment variable ALB_LOGS_QUEUE is not set")
  }

  // request queue URL by name
  getURLResp, err := sqsClient.GetQueueUrl(ctx, &sqs.GetQueueUrlInput{
    QueueName: aws.String(queueName),
  })
  if err != nil {
    log.Fatal(err)
  }

  queueURL := getURLResp.QueueUrl
  fmt.Println("Queue URL:", *queueURL)
}

Run go mod init:

$ go mod init alb-logs-collector-poc
$ go mod tidy

Set a variable with the name of the queue:

$ export ALB_LOGS_QUEUE="testing-alb-logs-s3-notifier"

Let’s run our code:

$ go run main.go 
Queue URL: https://sqs.us-east-1.amazonaws.com/492***148/testing-alb-logs-s3-notifier

OK, now we can receive messages.

SQS ReceiveMessage()

We read messages in the queue using ReceiveMessage(), passing a ReceiveMessageInput configuration that defines how many messages to fetch, how long to wait, and other parameters:

...
  // receive a single message from SQS
  msgResp, err := sqsClient.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
    QueueUrl:            queueURL, // URL returned by the GetQueueUrl()
    MaxNumberOfMessages: 1,        // receive only one message
    WaitTimeSeconds:     10,       // enable long polling
  })
  if err != nil {
    log.Fatal(err)
  }

  if len(msgResp.Messages) == 0 {
    fmt.Println("no messages received")
    return
  }
  // print the received message body
  fmt.Println("Received message:", aws.ToString(msgResp.Messages[0].Body))
...

Let’s check:

$ go run main.go | jq
{
  "Records": [
    {
      "eventVersion": "2.1",
      "eventSource": "aws:s3",
      ...
      "eventName": "ObjectCreated:Put",
      ...
      "s3": {
        ...
        "bucket": {
          "name": "ops-1-33-devops-ingress-ops-alb-loki-logs",
        ...
        "object": {
          "key": "AWSLogs/492***148/elasticloadbalancing/us-east-1/2025/11/25/492*** 148_elasticloadbalancing_us-east-1_app.k8s-ops133externalalb-***.336cddd33c043f33_20251125T0935Z_34.***.***. 15_25hsfvwt.log.gz",
          ...
        }
      }
    }
  ]
}

We are interested in two fields in the SQS message payload: bucket.name and object.key.

Create a struct for this data and use json.Unmarshal() to decode the JSON into it. I covered JSON parsing earlier in the same post about the OpenAI Exporter, in the section “Creating a Go struct for JSON Unmarshal”:

...
  // define a struct to unmarshal S3 event message
  type S3Event struct {
    Records []struct {
      S3 struct {
        Bucket struct {
          Name string `json:"name"`
        } `json:"bucket"`
        Object struct {
          Key string `json:"key"`
        } `json:"object"`
      } `json:"s3"`
    } `json:"Records"`
  }

  // take SQS message body
  msgBody := aws.ToString(msgResp.Messages[0].Body)

  // decode JSON into the struct
  var event S3Event
  if err := json.Unmarshal([]byte(msgBody), &event); err != nil {
    log.Fatal("failed to parse S3 event:", err)
  }

  // extract bucket and key
  // we always have 1 message, so use [0]
  bucket := event.Records[0].S3.Bucket.Name
  key := event.Records[0].S3.Object.Key

  fmt.Println("bucket:", bucket)
  fmt.Println("key:", key)
...

Let’s check again:

$ go run main.go 
bucket: ops-1-33-devops-ingress-ops-alb-loki-logs
key: AWSLogs/492***148/elasticloadbalancing/us-east-1/2025/11/25/492*** 148_elasticloadbalancing_us-east-1_app.k8s-ops133externalalb-***.336cddd33c043f33_20251125T0935Z_52.***.***.213_60mjhvf6.log.gz

Now, we can move on to S3.

Go and AWS SDK S3

Similar to SQS, we use a dedicated s3 package.

S3 client

Create a client with NewFromConfig():

...
  // create S3 client using the shared AWS config
  s3Client := s3.NewFromConfig(cfg)
...

S3 GetObject()

Add reading the file with GetObject().

Similar to SQS, we provide a GetObjectInput configuration when calling it:

...
  // fetch the S3 object and return its streaming body
  objResp, err := s3Client.GetObject(ctx, &s3.GetObjectInput{
    Bucket: aws.String(bucket),
    Key:    aws.String(key),
  })
  if err != nil {
    log.Fatal("failed to download object:", err)
  }
  defer objResp.Body.Close()

  fmt.Println("S3 object stream opened:", bucket, key)
...

GetObject() returns *GetObjectOutput, which has a Body field of type io.ReadCloser. io.ReadCloser is an interface that combines two other interfaces – Reader and Closer – that is, the Read() and Close() methods.
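
For reference, this is how the standard io package defines these interfaces:

type Reader interface {
  Read(p []byte) (n int, err error)
}

type Closer interface {
  Close() error
}

// ReadCloser embeds both, so any ReadCloser can be read from and then closed
type ReadCloser interface {
  Reader
  Closer
}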

Reading a file from gzip

Since the log files in S3 are gzip-compressed, we import the gzip package and use NewReader() to decode the data before processing it:

...
  // create gzip reader from S3 object stream
  gzReader, err := gzip.NewReader(objResp.Body)
  if err != nil {
    log.Fatal("failed to create gzip reader:", err)
  }
  defer gzReader.Close()

  fmt.Println("gzip stream opened")
...

NewReader() expects an io.Reader, so we can simply pass the objResp.Body to it.

However, gzip.NewReader() does not return the actual data directly – it only creates a reader that will produce decompressed content on demand.

Therefore, we wrap it in bufio to read the decompressed log lines efficiently.

Reading gzip output from bufio

Add the bufio import and use NewScanner() to read decompressed data from gzip.NewReader().

Then, using Scan() and Text(), we convert each scanned line into a string for further processing:

...
  // create scanner to read decompressed log lines
  scanner := bufio.NewScanner(gzReader)
  
  // increase buffer size if ALB logs have long lines
  // default scanner buffer = 64 KB
  buf := make([]byte, 0, 1024*1024) // 1 MB
  scanner.Buffer(buf, 1024*1024)
  
  // iterate over every line in the decompressed file
  for scanner.Scan() {
    line := scanner.Text()
    fmt.Println("log line:", line)
  }
  if err := scanner.Err(); err != nil {
    log.Fatal("scanner error:", err)
  }
...

Let’s check:

$ go run main.go  | head
S3 object stream opened: ops-1-33-devops-ingress-ops-alb-loki-logs AWSLogs/492***148/elasticloadbalancing/us-east-1/2025/11/25/492*** 148_elasticloadbalancing_us-east-1_app.k8s-ops133externalalb-***.336cddd33c043f33_20251125T1005Z_52.***.***.213_2nbanv4o.log.gz

log line: h2 2025-11-25T10:00:02.271561Z app/k8s-ops133externalalb-***/336cddd33c043f33 52.***.***. 183:60978 10.0.47.34:8080 0.001 0.005 0.001 200 200 38 5492 "GET https://lightdash.example.co:443/ HTTP/2.0" "Blackbox Exporter/0.27.0" ECDHE-RSA-AES128-GCM-SHA256 TLSv1.2 arn:aws:elasticloadbalancing:us-east-1:492*** 148:targetgroup/k8s-proddata-lightdas-fa7b1ce474/c400fe91849c401a "Root=1-69257e22-4b16fe8d62d2f8955edc6da8" "lightdash.example.co" "arn:aws:acm:us-east-1:492***148:certificate/77230c5f-d0c2-4e58-b579-8b8422686986" 15 2025-11-25T10:00:02. 264000Z "forward" "-" "-" "10.0.47.34:8080" "200" "-" "-" TID_48df8c3791b69144b4ae0f6084e015d6 "-" "-" "-"
...

Breaking main() into functions

Our main() has grown quite a bit, and the core functionality is already working.

Now let’s make the code cleaner and more readable.

Here is the full code now:

package main

import (
  "bufio"
  "compress/gzip"
  "context"
  "encoding/json"
  "fmt"
  "log"
  "os"

  "github.com/aws/aws-sdk-go-v2/aws"
  "github.com/aws/aws-sdk-go-v2/config"
  "github.com/aws/aws-sdk-go-v2/service/s3"
  "github.com/aws/aws-sdk-go-v2/service/sqs"
)

func main() {
  // TODO: add exit handler
  ctx := context.Background()

  // Load the Shared AWS Configuration (~/.aws/config)
  cfg, err := config.LoadDefaultConfig(ctx)
  if err != nil {
    log.Fatal(err)
  }

  // create SQS client using the shared AWS config
  sqsClient := sqs.NewFromConfig(cfg)

  // read queue name from environment
  queueName := os.Getenv("ALB_LOGS_QUEUE")
  if queueName == "" {
    log.Fatal("environment variable ALB_LOGS_QUEUE is not set")
  }

  // request queue URL by name
  getURLResp, err := sqsClient.GetQueueUrl(ctx, &sqs.GetQueueUrlInput{
    QueueName: aws.String(queueName),
  })
  if err != nil {
    log.Fatal(err)
  }

  queueURL := getURLResp.QueueUrl
  //fmt.Println("Queue URL:", *queueURL)

  // receive a single message from SQS
  msgResp, err := sqsClient.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
    QueueUrl:            queueURL, // URL returned by the GetQueueUrl() call above
    MaxNumberOfMessages: 1,        // receive only one message
    WaitTimeSeconds:     10,       // enable long polling (recommended)
  })
  if err != nil {
    log.Fatal(err)
  }

  if len(msgResp.Messages) == 0 {
    fmt.Println("no messages received")
    return
  }
  // print the received message body
  //fmt.Println(aws.ToString(msgResp.Messages[0].Body))

  // define a struct to unmarshal S3 event message
  type S3Event struct {
    Records []struct {
      S3 struct {
        Bucket struct {
          Name string `json:"name"`
        } `json:"bucket"`
        Object struct {
          Key string `json:"key"`
        } `json:"object"`
      } `json:"s3"`
    } `json:"Records"`
  }

  // take SQS message body
  msgBody := aws.ToString(msgResp.Messages[0].Body)

  // decode JSON into the struct
  var event S3Event
  if err := json.Unmarshal([]byte(msgBody), &event); err != nil {
    log.Fatal("failed to parse S3 event:", err)
  }

  // extract bucket and key
  // we always have 1 message, so use [0]
  bucket := event.Records[0].S3.Bucket.Name
  key := event.Records[0].S3.Object.Key

  //fmt.Println("bucket:", bucket)
  //fmt.Println("key:", key)

  // create S3 client using the shared AWS config
  s3Client := s3.NewFromConfig(cfg)

  // fetch the S3 object and return its streaming body
  objResp, err := s3Client.GetObject(ctx, &s3.GetObjectInput{
    Bucket: aws.String(bucket),
    Key:    aws.String(key),
  })
  if err != nil {
    log.Fatal("failed to download object:", err)
  }
  defer objResp.Body.Close()

  fmt.Println("S3 object stream opened:", bucket, key)

  // create gzip reader from S3 object stream
  gzReader, err := gzip.NewReader(objResp.Body)
  if err != nil {
    log.Fatal("failed to create gzip reader:", err)
  }
  defer gzReader.Close()
  //fmt.Println("gzip stream opened")

  // create scanner to read decompressed log lines
  scanner := bufio.NewScanner(gzReader)

  // increase buffer size if ALB logs have long lines
  // default scanner buffer = 64 KB
  buf := make([]byte, 0, 1024*1024) // 1 MB
  scanner.Buffer(buf, 1024*1024)

  // iterate over every line in the decompressed file
  for scanner.Scan() {
    line := scanner.Text()
    fmt.Println("log line:", line)
  }
  if err := scanner.Err(); err != nil {
    log.Fatal("scanner error:", err)
  }
}

How can we organize the process?

In main(), we perform all the initializations and then poll SQS in a loop:

  • create a context
  • create the AWS config
  • create the sqsClient and s3Client clients
  • read the environment variables (ALB_LOGS_QUEUE, more will be added later)
  • and then, in a loop:
    • call the receiveFromSQS() function – check whether there are any new messages
    • call the getS3Object() function – if there are messages, go to S3 and read the new archive
    • call the processLogFile() function – read the lines from each received log file

What functions will be needed for this?

  • function ReceiveFromSQS():
    • receives the context, the SQS client, and the SQS queue name, and writes the bucket name and key (file name) into an S3Event structure
    • to be able to delete messages after successful processing, it also needs to return the receiptHandle
  • function GetS3Object():
    • receives the context, an S3 client, the bucket, and the key
    • executes GetObject() and returns the GetObjectOutput
  • function GzipReader():
    • takes the stream from GetS3Object(), unpacks it, and returns the decompressed stream
  • function ScanLines():
    • receives data from GzipReader(), reads lines with Text(), and for now simply prints them to the console

Package collector

For convenience and to keep everything organized, let’s divide everything into separate files:

collector/
    sqs.go
    s3.go
    gzip.go
    scan.go
main.go

File collector/sqs.go

When testing, I ran into the following error:

$ go run main.go | head 
panic: runtime error: index out of range [0] with length 0

goroutine 1 [running]:
alb-logs-collector/collector.ReceiveFromSQS({0xa83530, 0xdda060}, 0xc00021d888, {0xc000160050, 0x4d})
        /home/setevoy/Projects/Go/alb-logs-collector/collector/sqs.go:52 +0x25d
main.main()
        /home/setevoy/Projects/Go/alb-logs-collector/main.go:46 +0x305
exit status 2

It happens because AWS adds test messages to the queue:

{"Service":"Amazon S3","Event":"s3:TestEvent","Time":"2025-11-24T11:10:31.573Z","Bucket":"ops-1-33-devops-ingress-ops-alb-loki-logs", ...}

Therefore, we will add a check to ReceiveFromSQS().

ReceiveFromSQS() function

Describe receiving messages from SQS:

package collector

import (
  "context"
  "encoding/json"
  "fmt"
  "strings"

  "github.com/aws/aws-sdk-go-v2/aws"
  "github.com/aws/aws-sdk-go-v2/service/sqs"
)

// S3Event for SQS message
type S3Event struct {
  Records []struct {
    S3 struct {
      Bucket struct {
        Name string `json:"name"`
      } `json:"bucket"`
      Object struct {
        Key string `json:"key"`
      } `json:"object"`
    } `json:"s3"`
  } `json:"Records"`
}

// ReceiveFromSQS reads one message and returns bucket, key, receiptHandle, queueURL
func ReceiveFromSQS(ctx context.Context, client *sqs.Client, queueName string) (bucket, key, receiptHandle, queueURL string, err error) {

  // get real queue URL
  getURL, err := client.GetQueueUrl(ctx, &sqs.GetQueueUrlInput{
    QueueName: aws.String(queueName),
  })
  if err != nil {
    return "", "", "", "", err
  }

  queueURL = *getURL.QueueUrl

  // receive one message
  resp, err := client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
    QueueUrl:            aws.String(queueURL),
    MaxNumberOfMessages: 1,
    WaitTimeSeconds:     10,
  })
  if err != nil {
    return "", "", "", "", err
  }

  if len(resp.Messages) == 0 {
    return "", "", "", "", fmt.Errorf("no messages")
  }

  // take the first (and only) message
  msg := resp.Messages[0]
  // keep the ReceiptHandle to pass to the delete function later
  receiptHandle = *msg.ReceiptHandle

  raw := aws.ToString(msg.Body)
  fmt.Println("SQS RAW:", raw)

  // skip AWS test event
  if strings.Contains(raw, `"Event":"s3:TestEvent"`) {
    fmt.Println("Skipping AWS S3 test event")
    return "", "", "", queueURL, fmt.Errorf("test event skipped")
  }

  // parse S3 event message
  var event S3Event
  if err := json.Unmarshal([]byte(raw), &event); err != nil {
    return "", "", "", "", err
  }

  bucket = event.Records[0].S3.Bucket.Name
  key = event.Records[0].S3.Object.Key

  return bucket, key, receiptHandle, queueURL, nil
}

DeleteFromSQS() function

Now, add a deletion function, which we will call after successful log processing:

// DeleteFromSQS deletes message after successful processing
func DeleteFromSQS(ctx context.Context, client *sqs.Client, queueURL, receiptHandle string) error {
  _, err := client.DeleteMessage(ctx, &sqs.DeleteMessageInput{
    QueueUrl:      aws.String(queueURL),
    ReceiptHandle: aws.String(receiptHandle),
  })
  return err
}

collector/s3.go file and GetS3Object() function

package collector

import (
  "context"

  "github.com/aws/aws-sdk-go-v2/aws"
  "github.com/aws/aws-sdk-go-v2/service/s3"
)

// GetS3Object returns stream reader of S3 file
func GetS3Object(ctx context.Context, client *s3.Client, bucket, key string) (*s3.GetObjectOutput, error) {
  return client.GetObject(ctx, &s3.GetObjectInput{
    Bucket: aws.String(bucket),
    Key:    aws.String(key),
  })
}

collector/gzip.go file and GzipReader() function

We also move it to a dedicated file to make the entire code more logical.

And later, we may need to add some checks here:

package collector

import (
  "compress/gzip"
  "io"
)

// GzipReader wraps S3 body
func GzipReader(r io.Reader) (*gzip.Reader, error) {
  return gzip.NewReader(r)
}

collector/scan.go file and ScanLines() function

package collector

import (
  "bufio"
  "fmt"
  "io"
)

// ScanLines reads log lines from reader
func ScanLines(r io.Reader) error {
  scanner := bufio.NewScanner(r)

  // increase max line size
  buf := make([]byte, 0, 1024*1024)
  scanner.Buffer(buf, 1024*1024)

  // iterate over every line in the decompressed file
  for scanner.Scan() {
    line := scanner.Text()
    fmt.Println("log line:", line)
  }

  return scanner.Err()
}

Changes in the main.go

Add the import of our collector package to the main.go and call the new functions.

Now the entire main.go file looks like this:

package main

import (
  "alb-logs-collector-poc/collector"
  "context"
  "fmt"
  "log"
  "os"

  "github.com/aws/aws-sdk-go-v2/config"
  "github.com/aws/aws-sdk-go-v2/service/s3"
  "github.com/aws/aws-sdk-go-v2/service/sqs"
)

func main() {
  // TODO: add exit handler
  ctx := context.Background()

  // Load the Shared AWS Configuration (~/.aws/config)
  cfg, err := config.LoadDefaultConfig(ctx)
  if err != nil {
    log.Fatal(err)
  }

  // create SQS client using the shared AWS config
  sqsClient := sqs.NewFromConfig(cfg)
  // create S3 client using the shared AWS config
  s3Client := s3.NewFromConfig(cfg)

  // read queue name from environment
  queueName := os.Getenv("ALB_LOGS_QUEUE")
  if queueName == "" {
    log.Fatal("environment variable ALB_LOGS_QUEUE is not set")
  }

  // 1. get message
  //bucket, key, receiptHandle, queueURL, err := collector.ReceiveFromSQS(ctx, sqsClient, queue)
  bucket, key, _, _, err := collector.ReceiveFromSQS(ctx, sqsClient, queueName)
  if err != nil {
    if err.Error() == "no messages" {
      fmt.Println("NO MESSAGES")
      return
    }
    fmt.Println("ERROR receiving message:", err)
    return
  }

  fmt.Println("BUCKET:", bucket)
  fmt.Println("KEY:", key)

  // 2. get S3 object
  s3Obj, err := collector.GetS3Object(ctx, s3Client, bucket, key)
  if err != nil {
    fmt.Println("S3 error:", err)
    return
  }
  defer s3Obj.Body.Close()

  fmt.Println("S3 object stream opened:", bucket, key)

  // 3. gzip reader
  gzReader, err := collector.GzipReader(s3Obj.Body)
  if err != nil {
    fmt.Println("gzip error:", err)
    return
  }
  defer gzReader.Close()

  // 4. scan all lines in log file
  err = collector.ScanLines(gzReader)
  if err != nil {
    log.Fatal("scanner error:", err)
  }
}

Let’s check:

$ go run main.go
SQS RAW: {"Records":[{"eventVersion":"2.1"," eventSource":"aws:s3","awsRegion":"us-east-1","eventTime":"2025-11-25T11:15:01.559Z","eventName":"ObjectCreated:Put","userIdentity":{"principalId":"AWS:***.***: elblogdelivery-session"},"requestParameters":{"sourceIPAddress":"2600:***.***:c0ef"},"responseElements":{"x-amz-request-id":"Y6523DEXMF2DS6FG"," x-amz-id-2":"tWgdxqzRbKrjUGDMLLkrOtKQW6S6aWT31VHomgaNm0UAIlzeshheXEGgZN3yRH4pMlWdzfBLqBlZuh3BO0QghDkTVU2WllFDKmh22/B1Cqk="}," s3":{"s3SchemaVersion":"1.0","configurationId":"objectCreatedEventSqs","bucket":{"name":"ops-1-33-devops-ingress-ops-alb-loki-logs","ownerIdentity":{"principalId":"***.***"}," arn":"arn:aws:s3:::ops-1-33-devops-ingress-ops-alb-loki-logs"},"object":{"key":"AWSLogs/492***148/elasticloadbalancing/us-east-1/2025/11/25/492*** 148_elasticloadbalancing_us-east-1_app.k8s-ops133externalalb-***.336cddd33c043f33_20251125T1115Z_34***.***. 15_44qtqgut.log.gz","size":17823,"eTag":"36d1243edd2a7a98546e7af645c36068","sequencer":"0069258FB5806A2506"}}}]}
BUCKET: ops-1-33-devops-ingress-ops-alb-loki-logs
KEY: AWSLogs/492***148/elasticloadbalancing/us-east-1/2025/11/25/492*** 148_elasticloadbalancing_us-east-1_app.k8s-ops133externalalb-***.336cddd33c043f33_20251125T1115Z_34.225.155.15_44qtqgut.log.gz
S3 object stream opened: ops-1-33-devops-ingress-ops-alb-loki-logs AWSLogs/492***148/elasticloadbalancing/us-east-1/2025/11/25/492*** 148_elasticloadbalancing_us-east-1_app.k8s-ops133externalalb-***.336cddd33c043f33_20251125T1115Z_34.***.***. 15_44qtqgut.log.gz
log line: h2 2025-11-25T11:10:06.230595Z app/k8s-ops133externalalb-***/336cddd33c043f33 62.***.***. 83:32026 10.0.44.225:8000 0.000 0.034 0.000 200 200 887 1165 "GET https://staging.api.challenge.example.co:443/admin/users/list?limit=1&user=test_thread_1_ci_ios_ui_participant2%40test.example.co HTTP/2.0" "Challenge App UI Tests-Runner/1.0 (com.challengeapp.uitests. Challenge-App-UI-Tests.xctrunner; build:1; iOS 18.5.0) Alamofire/5.9.0" ECDHE-RSA-AES128-GCM-SHA256 TLSv1.2 arn:aws:elasticloadbalancing:us-east-1:492*** 148:targetgroup/k8s-stagingb-backenda-410aa6288b/0d639f3b859bc8fb "Root=1-69258e8e-04c4384c05934a37738043b0" "staging.api.challenge.example.co" "arn:aws:acm:us-east-1:492***148:certificate/c3b6ec41-50a0-488e-93bb-b03967405f8c" 24 2025-11-25T11:10:06. 195000Z "forward" "-" "-" "10.0.44.225:8000" "200" "-" "-" TID_d1e2c09f4fe6cf46b7c573f42b97889d "-" "-" "-"
...

Logging to VictoriaLogs

VictoriaLogs supports several ingestion formats, and we’ll use the JSON stream API.

First, let’s try to write some logs manually.

Open a port to VictoriaLogs Kubernetes Service:

$ kk -n ops-monitoring-ns port-forward svc/atlas-victoriametrics-victoria-logs-single-server 9428

And make a request with curl:

$ echo '{ "log": { "level": "info", "message": "TEST" }, "date": "0", "stream": "alb" }
' | curl -X POST -H 'Content-Type: application/stream+json' --data-binary @- \
 'http://localhost:9428/insert/jsonline?_stream_fields=stream&_time_field=date&_msg_field=log.message'

Let’s check:

Creating the file collector/victoria.go

Let’s create a minimal file for testing.

It contains:

  • JSONLogRecord struct: here we will generate JSON for sending to the VictoriaLogs HTTP API
  • we fill it with data, and the Timestamp field can hold a “Unix timestamp in seconds, milliseconds, microseconds, or nanoseconds”, which we will produce with time.UnixMilli()
  • then, set values for the stream and message fields
  • form an HTTP request
  • and send it with the standard http.Client.Do()

package collector

import (
  "bytes"
  "encoding/json"
  "fmt"
  "net/http"
  "time"
)

// JSONLogRecord is the minimal structure for VictoriaLogs JSONLine API
type JSONLogRecord struct {
  Timestamp int64  `json:"date"`
  Stream    string `json:"stream"`
  Message   string `json:"message"`
}

// SendTestRecord sends simple test log record to VictoriaLogs
func SendTestRecord(url string) error {
  rec := JSONLogRecord{
    Timestamp: time.Now().UTC().UnixMilli(),
    Stream:    "test",
    Message:   "TEST VICTORIA",
  }

  body, err := json.Marshal(rec)
  if err != nil {
    return fmt.Errorf("marshal error: %w", err)
  }

  req, err := http.NewRequest("POST", url, bytes.NewBuffer(body))
  if err != nil {
    return fmt.Errorf("request error: %w", err)
  }

  req.Header.Set("Content-Type", "application/stream+json")

  resp, err := http.DefaultClient.Do(req)
  if err != nil {
    return fmt.Errorf("http error: %w", err)
  }
  defer resp.Body.Close()

  if resp.StatusCode != 200 {
    return fmt.Errorf("victoria returned %d", resp.StatusCode)
  }

  fmt.Println("TEST RECORD SENT TO VICTORIA")
  return nil
}

Add the VictoriaLogs endpoint from an environment variable to main.go and use it to build the full URL, specifying which field should be treated as _msg, which field holds the _time, the time format, and which field to use to create a log stream:

...
  // "http://localhost:9428/insert/jsonline"
  vmLogsEp := os.Getenv("VICTORIA_LOGS_URL")
  if vmLogsEp == "" {
    log.Fatal("environment variable VICTORIA_LOGS_URL is not set")
  }

  // VictoriaLogs endpoint
  vmLogsURL := vmLogsEp +
    "?_msg_field=message" +
    "&_time_field=date" +
    "&_time_format=unix_ms" +
    "&_stream_fields=stream"

  // 5. send test record to VictoriaLogs
  err = collector.SendTestRecord(vmLogsURL)
  if err != nil {
    fmt.Println("ERROR sending test record:", err)
    return
  }
...

Set a variable:

$ export VICTORIA_LOGS_URL="http://localhost:9428/insert/jsonline"

Run the code:

$ go run main.go 
...
TEST RECORD SENT TO VICTORIA

Check:

OK, it works.

Creating a Log Parser

Now let’s update our code and add new functions:

  • collector/scan.go: will read data from gzip and write it to the channel as strings
  • collector/parser.go: generates SimpleLog with fields Timestamp and Message

Edit scan.go: instead of creating a buffer and printing each line, we now create a channel, and the result of scanner.Text() is sent into that channel:

package collector

import (
  "bufio"
  "io"
)

// ScanLines reads lines from an io.Reader and sends them into a channel
// caller must range over the returned channel
func ScanLines(r io.Reader) <-chan string {
  ch := make(chan string)

  go func() {
    defer close(ch)

    scanner := bufio.NewScanner(r)

    // - bufio.Scanner reads decompressed bytes from GzipReader()
    // - it splits input by '\n' and returns each line as a complete string
    // - each line is sent into the channel for further processing
    for scanner.Scan() {
      ch <- scanner.Text()
    }
  }()

  return ch
}

Create parser.go with the SimpleLog structure.

We will keep the time from the log records in the structure, and put the entire log record into the Message field:

package collector

import (
  "fmt"
  "strings"
  "time"
)

// SimpleLog contains parsed timestamp and original line
type SimpleLog struct {
  Timestamp time.Time
  Message   string
}

// ParseRawLine parses ALB log timestamp from the 2nd field.
// Everything else we keep raw.
func ParseRawLine(line string) (*SimpleLog, error) {
  fields := strings.Fields(line)
  if len(fields) < 2 {
    return nil, fmt.Errorf("invalid ALB log line")
  }

  fmt.Println("--- PARSER SEND ---")
  fmt.Println(line)
  fmt.Println("--- PARSER DEBUG ---")

  ts, err := time.Parse(time.RFC3339Nano, fields[1])
  if err != nil {
    return nil, fmt.Errorf("timestamp parse error: %w", err)
  }

  return &SimpleLog{
    Timestamp: ts.UTC(),
    Message:   line,
  }, nil
}

Update victoria.go: it will no longer fill in the JSONLogRecord data itself, but will accept it as an argument:

package collector

import (
  "bytes"
  "encoding/json"
  "fmt"
  "net/http"
  "time"
)

// JSONLogRecord is the minimal structure for VictoriaLogs JSONLine API
type JSONLogRecord struct {
  Timestamp int64  `json:"date"`
  Stream    string `json:"stream"`
  Message   string `json:"message"`
}

// SendToVictoria sends one JSON LINE into VictoriaLogs JSON Stream API.
func SendToVictoria(url string, rec *JSONLogRecord) error {
  body, err := json.Marshal(rec)
  if err != nil {
    return err
  }

  fmt.Println("--- VMLOGS SEND ---")
  fmt.Println(string(body))
  fmt.Println("--- VMLOGS DEBUG ---")

  req, err := http.NewRequest("POST", url, bytes.NewReader(body))
  if err != nil {
    return err
  }

  req.Header.Set("Content-Type", "application/stream+json")

  resp, err := http.DefaultClient.Do(req)
  if err != nil {
    return err
  }
  defer resp.Body.Close()

  if resp.StatusCode >= 300 {
    return fmt.Errorf("victoria response status: %s", resp.Status)
  }

  return nil
}

In main.go, add a loop over the log lines, and add message deletion from SQS with the DeleteFromSQS() call we created earlier:

...
  // 4. Pipeline: read lines → parse → send
  for line := range collector.ScanLines(gzReader) {

    rec, err := collector.ParseRawLine(line)
    if err != nil {
      continue
    }

    out := &collector.JSONLogRecord{
      Timestamp: rec.Timestamp.UnixMilli(),
      Stream:    "alb",
      Message:   rec.Message,
    }

    if err := collector.SendToVictoria(vmLogsURL, out); err != nil {
      fmt.Println("send error:", err)
    }
  }

  // 5. Delete SQS message
  if err := collector.DeleteFromSQS(ctx, sqsClient, queueURL, receiptHandle); err != nil {
    fmt.Println("delete error:", err)
  }
}

While collector.ScanLines() keeps returning lines, we pass each one to ParseRawLine(), which fills a SimpleLog with Timestamp and Message.

Then we fill in the JSONLogRecord and send it to VictoriaLogs.

Here, we can purge the SQS queue: while I was writing the code, old messages accumulated there, and the exporter started pulling old logs. I spent some time looking for why the time in VictoriaLogs did not match the expected time, but the problem turned out to be trivial – I was looking at data for the last 15 minutes, while morning records were being imported.
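
The purge itself can be done from the AWS Console, or from code – a small sketch, not part of the collector, assuming queueURL is the queue URL string we already have:

...
  // one-off cleanup: PurgeQueue removes ALL messages from the queue,
  // including messages that have not been received yet
  if _, err := sqsClient.PurgeQueue(ctx, &sqs.PurgeQueueInput{
    QueueUrl: &queueURL,
  }); err != nil {
    log.Fatal("failed to purge queue:", err)
  }
...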

Let’s check:

$ go run main.go 
...
--- PARSER SEND ---
https 2025-11-25T12:36:50.859909Z app/k8s-ops133externalalb-***/336cddd33c043f33 3.***.***. 78:3883 10.0.46.229:8000 0.000 0.010 0.000 200 200 2023 574 "POST https://api.challenge.example.co:443/auth/auth0-webhooks/post-login HTTP/1.1" "axios/1.6.5" ECDHE-RSA-AES128-GCM-SHA256 TLSv1.2 arn:aws:elasticloadbalancing:us-east-1:492*** 148:targetgroup/k8s-prodback-backenda-47ba3e0f35/9ec763ecd48352da "Root=1-6925a2e2-6e4507dd3a23de072c2f6ae9" "api.challenge.example.co" "arn:aws:acm:us-east-1:492***148:certificate/beeb3714-511e-414b-b1f3-5440746bb5ea" 12 2025-11-25T12:36:50.837000Z "forward" "-" "-" "10.0.46.229:8000" "200" "-" "-" TID_5ed365cd6c6f57409a2566d1dcaf049c "-" "-" "-"
--- PARSER DEBUG ---
--- VMLOGS SEND ---
{"date":1764074210859,"stream":"alb","message":"https 2025-11-25T12:36:50.859909Z app/k8s-ops133externalalb-***/336cddd33c043f33 3.** *.***. 78:3883 10.0.46.229:8000 0.000 0.010 0.000 200 200 2023 574 \"POST https://api.challenge.example.co:443/auth/auth0-webhooks/post-login HTTP/1.1\" \"axios/1.6.5\" ECDHE-RSA-AES128-GCM-SHA256 TLSv1.2 arn:aws:elasticloadbalancing:us-east-1:492*** 148:targetgroup/k8s-prodback-backenda-47ba3e0f35/9ec763ecd48352da \"Root=1-6925a2e2-6e4507dd3a23de072c2f6ae9\" \"api.challenge.example.co\" \"arn:aws:acm:us-east-1:492***148:certificate/beeb3714-511e-414b-b1f3-5440746bb5ea\" 12 2025-11-25T12:36:50.837000Z \"forward\" \"-\" \"-\" \"10.0.46.229:8000\" \"200\" \"-\" \"-\" TID_5ed365cd6c6f57409a2566d1dcaf049c \"-\" \"-\" \"-\""}
--- VMLOGS DEBUG ---
...

And in the VictoriaLogs:

Everything is here.

What we have left:

  • form additional fields for VictoriaLogs
  • add gocron to run the collector in a loop

Starting a loop with gocron

Add a job that runs every minute with gocron.

Now the entire main.go looks like this:

func main() {
  // TODO: add exit handler
  ctx := context.Background()

  // Load the Shared AWS Configuration (~/.aws/config)
  cfg, err := config.LoadDefaultConfig(ctx)
  if err != nil {
    log.Fatal(err)
  }

  // create SQS client using the shared AWS config
  sqsClient := sqs.NewFromConfig(cfg)
  // create S3 client using the shared AWS config
  s3Client := s3.NewFromConfig(cfg)

  // read queue name from environment
  queueName := os.Getenv("ALB_LOGS_QUEUE")
  if queueName == "" {
    log.Fatal("environment variable ALB_LOGS_QUEUE is not set")
  }

  // scheduler
  s := gocron.NewScheduler(time.UTC)

  // job: check SQS every minute
  s.Every(1).Minute().Do(func() {
    fmt.Println("CHECKING SQS...")

    // 1. get message
    bucket, key, receiptHandle, queueURL, err := collector.ReceiveFromSQS(ctx, sqsClient, queueName)
    //bucket, key, _, _, err := collector.ReceiveFromSQS(ctx, sqsClient, queueName)
    if err != nil {
      if err.Error() == "no messages" {
        fmt.Println("NO MESSAGES")
        return
      }
      fmt.Println("ERROR receiving message:", err)
      return
    }

    fmt.Println("BUCKET:", bucket)
    fmt.Println("KEY:", key)

    // 2. get S3 object
    s3Obj, err := collector.GetS3Object(ctx, s3Client, bucket, key)
    if err != nil {
      fmt.Println("S3 error:", err)
      return
    }
    defer s3Obj.Body.Close()

    fmt.Println("S3 object stream opened:", bucket, key)

    // 3. gzip reader
    gzReader, err := collector.GzipReader(s3Obj.Body)
    if err != nil {
      fmt.Println("gzip error:", err)
      return
    }
    defer gzReader.Close()

    // "http://localhost:9428/insert/jsonline"
    vmLogsEp := os.Getenv("VICTORIA_LOGS_URL")
    if vmLogsEp == "" {
      log.Fatal("environment variable VICTORIA_LOGS_URL is not set")
    }

    // VictoriaLogs endpoint
    vmLogsURL := vmLogsEp +
      "?_msg_field=message" +
      "&_time_field=date" +
      "&_time_format=unix_ms" +
      "&_stream_fields=stream"

    // 4. Pipeline: read lines → parse → send
    for line := range collector.ScanLines(gzReader) {

      rec, err := collector.ParseRawLine(line)
      if err != nil {
        continue
      }

      out := &collector.JSONLogRecord{
        Timestamp: rec.Timestamp.UnixMilli(),
        Stream:    "alb",
        Message:   rec.Message,
      }

      if err := collector.SendToVictoria(vmLogsURL, out); err != nil {
        fmt.Println("send error:", err)
      }
    }

    // 5. delete message from SQS
    if err := collector.DeleteFromSQS(ctx, sqsClient, queueURL, receiptHandle); err != nil {
      fmt.Println("FAILED TO DELETE SQS MESSAGE:", err)
    } else {
      fmt.Println("SQS MESSAGE DELETED")
    }
  })

  // start the scheduler and block the main goroutine
  s.StartBlocking()
}

Adding fields to VictoriaLogs message 

Documentation on AWS ALB fields – Access log entries.

What we can add now to the VictoriaLogs records: client_ip from the client:port field, target_ip (the Kubernetes Pod) from target:port, elb_status_code – the ALB response code – and target_status_code.
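
For reference, the positions of the whitespace-separated fields we will use (the constant names below are mine, just to illustrate the indexes; see the AWS documentation for the full list):

// indexes of the space-separated ALB access log fields used in this post
const (
  fieldType         = 0 // h2 / https / http / ws / wss
  fieldTimestamp    = 1 // e.g. 2025-11-25T12:36:50.859909Z
  fieldELB          = 2 // load balancer resource ID
  fieldClientPort   = 3 // client:port
  fieldTargetPort   = 4 // target:port
  fieldReqTime      = 5 // request_processing_time
  fieldTargetTime   = 6 // target_processing_time
  fieldRespTime     = 7 // response_processing_time
  fieldELBStatus    = 8 // elb_status_code
  fieldTargetStatus = 9 // target_status_code
)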

Update the SimpleLog struct, add new fields:

type SimpleLog struct {
    Timestamp    time.Time
    Message      string
    ClientIP     string
    TargetIP     string
    ELBStatus    int
    TargetStatus int
}

In ParseRawLine(), populate these fields using their indexes in the fields slice:

...
    elbStatus, _ := strconv.Atoi(fields[8])
    targetStatus, _ := strconv.Atoi(fields[9])

    return &SimpleLog{
        Timestamp:    ts.UTC(),
        Message:      line,
        ClientIP:     fields[3],
        TargetIP:     fields[4],
        ELBStatus:    elbStatus,
        TargetStatus: targetStatus,
    }, nil
...

Add these fields to JSONLogRecord:

type JSONLogRecord struct {
    Timestamp    int64  `json:"date"`
    Message      string `json:"message"`
    Stream       string `json:"stream"`
    ClientIP     string `json:"client_ip"`
    TargetIP     string `json:"target_ip"`
    ELBStatus    int    `json:"elb_status"`
    TargetStatus int    `json:"target_status"`
}

And add them to main.go:

...
      out := &collector.JSONLogRecord{
        Timestamp:    rec.Timestamp.UnixMilli(),
        Message:      rec.Message,
        Stream:       "alb",
        ClientIP:     rec.ClientIP,
        TargetIP:     rec.TargetIP,
        ELBStatus:    rec.ELBStatus,
        TargetStatus: rec.TargetStatus,
      }
...

Now we have new fields in VictoriaLogs:

The ports in client_ip and target_ip are clearly redundant here, so we can remove them during parsing.

Add a function with the strings.SplitN() to our parser.go:

// cut port from "ip:port"
func stripPort(s string) string {
  parts := strings.SplitN(s, ":", 2)
  return parts[0]
}

And use it when filling in SimpleLog:

...
  return &SimpleLog{
    Timestamp:    ts.UTC(),
    Message:      line,
    ClientIP:     stripPort(fields[3]),
    TargetIP:     stripPort(fields[4]),
    ELBStatus:    elbStatus,
    TargetStatus: targetStatus,
  }, nil
...

Now we have fields without unnecessary ports:

The final result

All code together.

I asked AI to write comments so that all functions would be explained in detail.

File main.go

package main

import (
  "alb-logs-collector/collector"
  "context"
  "fmt"
  "log"
  "os"
  "time"

  "github.com/aws/aws-sdk-go-v2/config"
  "github.com/aws/aws-sdk-go-v2/service/s3"
  "github.com/aws/aws-sdk-go-v2/service/sqs"
  "github.com/go-co-op/gocron"
)

func main() {
  // create a base context for all AWS operations
  // this context is passed into SQS/S3 functions
  ctx := context.Background()

  // load shared AWS config from ~/.aws/config and ~/.aws/credentials
  // this provides region, credentials, retry settings, etc.
  cfg, err := config.LoadDefaultConfig(ctx)
  if err != nil {
    log.Fatal(err)
  }

  // create SQS client using the shared AWS config
  // used later for reading and deleting queue messages
  sqsClient := sqs.NewFromConfig(cfg)

  // create S3 client for reading S3 objects
  s3Client := s3.NewFromConfig(cfg)

  // read queue name from environment
  // this avoids hardcoding queue names in code
  queueName := os.Getenv("ALB_LOGS_QUEUE")
  if queueName == "" {
    log.Fatal("environment variable ALB_LOGS_QUEUE is not set")
  }

  // create the scheduler that runs periodic jobs
  // gocron automatically creates goroutines for scheduled tasks
  s := gocron.NewScheduler(time.UTC)

  // schedule: run the function every minute
  s.Every(1).Minute().Do(func() {
    fmt.Println("CHECKING SQS...")

    // 1. read one message from SQS
    // ReceiveFromSQS is implemented in collector/sqs.go
    bucket, key, receiptHandle, queueURL, err := collector.ReceiveFromSQS(ctx, sqsClient, queueName)
    if err != nil {
      // "no messages" is not an error - just no new logs
      if err.Error() == "no messages" {
        fmt.Println("NO MESSAGES")
        return
      }
      fmt.Println("ERROR receiving message:", err)
      return
    }

    fmt.Println("BUCKET:", bucket)
    fmt.Println("KEY:", key)

    // 2. download the S3 object stream
    // GetS3Object located in collector/s3.go
    s3Obj, err := collector.GetS3Object(ctx, s3Client, bucket, key)
    if err != nil {
      fmt.Println("S3 error:", err)
      return
    }
    defer s3Obj.Body.Close()

    fmt.Println("S3 object stream opened:", bucket, key)

    // 3. wrap the S3 stream into gzip reader
    // GzipReader implemented in collector/gzip.go
    gzReader, err := collector.GzipReader(s3Obj.Body)
    if err != nil {
      fmt.Println("gzip error:", err)
      return
    }
    defer gzReader.Close()

    // read VictoriaLogs endpoint
    vmLogsEp := os.Getenv("VICTORIA_LOGS_URL")
    if vmLogsEp == "" {
      log.Fatal("environment variable VICTORIA_LOGS_URL is not set")
    }

    // final URL with parameters for jsonline ingestion
    vmLogsURL := vmLogsEp +
      "?_msg_field=message" + // which JSON field contains the log message
      "&_time_field=date" + // which JSON field contains timestamp
      "&_time_format=unix_ms" + // tell VictoriaLogs that the timestamp is unix milliseconds
      "&_stream_fields=stream" // which field defines the stream name

    // 4. process the log file line-by-line
    // ScanLines implemented in collector/scan.go
    // It returns a channel of RAW log lines (already ungzipped)
    for line := range collector.ScanLines(gzReader) {

      // parse timestamp + extract fields
      // ParseRawLine implemented in collector/parser.go
      rec, err := collector.ParseRawLine(line)
      if err != nil {
        continue // skip invalid ALB lines
      }

      // prepare the JSON record for VictoriaLogs
      // JSONLogRecord defined in collector/victoria.go
      out := &collector.JSONLogRecord{
        Timestamp:    rec.Timestamp.UnixMilli(), // ALB timestamp in unix ms
        Message:      rec.Message,               // full raw ALB log line
        Stream:       "alb",                     // log stream name
        ClientIP:     rec.ClientIP,              // extracted from ALB log
        TargetIP:     rec.TargetIP,              // extracted from ALB log
        ELBStatus:    rec.ELBStatus,             // HTTP status from ALB
        TargetStatus: rec.TargetStatus,          // backend response status
      }

      // send it to VictoriaLogs
      // SendToVictoria implemented in collector/victoria.go
      if err := collector.SendToVictoria(vmLogsURL, out); err != nil {
        fmt.Println("send error:", err)
      }
    }

    // 5. delete message from SQS after successful processing
    // DeleteFromSQS implemented in collector/sqs.go
    if err := collector.DeleteFromSQS(ctx, sqsClient, queueURL, receiptHandle); err != nil {
      fmt.Println("FAILED TO DELETE SQS MESSAGE:", err)
    } else {
      fmt.Println("SQS MESSAGE DELETED")
    }
  })

  // start the scheduler and block main goroutine forever
  s.StartBlocking()
}

File sqs.go

package collector

import (
  "context"
  "encoding/json"
  "fmt"
  "strings"

  "github.com/aws/aws-sdk-go-v2/aws"
  "github.com/aws/aws-sdk-go-v2/service/sqs"
)

// S3Event describes the JSON format sent by S3 to SQS.
// It matches the structure of the S3 event notification:
// "Records" → "s3" → "bucket.name" and "object.key".
// This struct is used in ReceiveFromSQS() to extract bucket/key.
type S3Event struct {
  Records []struct {
    S3 struct {
      Bucket struct {
        Name string `json:"name"` // S3 bucketName
      } `json:"bucket"`
      Object struct {
        Key string `json:"key"` // S3 object key (log filename)
      } `json:"object"`
    } `json:"s3"`
  } `json:"Records"`
}

// ReceiveFromSQS reads a single SQS message.
// It returns:
// - bucket: the S3 bucket from event
// - key: the object key (filename in S3)
// - receiptHandle: required later to delete the message
// - queueURL: actual AWS queue URL (needed for deletion)
// - err: error, or "no messages" if queue is empty
//
// This function is called from main.go inside the scheduler loop.
// After processing the file from S3, the caller must call DeleteFromSQS().
func ReceiveFromSQS(ctx context.Context, client *sqs.Client, queueName string) (bucket, key, receiptHandle, queueURL string, err error) {

  // resolve real SQS queue URL from queue name
  // SQS APIs always operate on queueURL, not the name
  getURL, err := client.GetQueueUrl(ctx, &sqs.GetQueueUrlInput{
    QueueName: aws.String(queueName),
  })
  if err != nil {
    return "", "", "", "", err
  }

  queueURL = *getURL.QueueUrl

  // receive exactly one message
  // WaitTimeSeconds enables long polling for up to 10s
  resp, err := client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
    QueueUrl:            aws.String(queueURL),
    MaxNumberOfMessages: 1,
    WaitTimeSeconds:     10,
  })
  if err != nil {
    return "", "", "", "", err
  }

  // queue empty → no new logs
  if len(resp.Messages) == 0 {
    return "", "", "", "", fmt.Errorf("no messages")
  }

  // take the first (and only) message
  msg := resp.Messages[0]

  // ReceiptHandle is mandatory for deletion
  receiptHandle = *msg.ReceiptHandle

  // raw JSON body received from S3 → SQS notification
  raw := aws.ToString(msg.Body)
  fmt.Println("SQS RAW:", raw)

  // filter out AWS TestEvent generated when enabling notifications
  // Test events do NOT contain real logs and must be ignored
  if strings.Contains(raw, `"Event":"s3:TestEvent"`) {
    fmt.Println("Skipping AWS S3 test event")
    return "", "", "", queueURL, fmt.Errorf("test event skipped")
  }

  // unmarshal JSON into S3Event struct
  var event S3Event
  if err := json.Unmarshal([]byte(raw), &event); err != nil {
    return "", "", "", "", err
  }

  // extract bucket and key from the event
  bucket = event.Records[0].S3.Bucket.Name
  key = event.Records[0].S3.Object.Key

  return bucket, key, receiptHandle, queueURL, nil
}

// DeleteFromSQS permanently removes the processed message.
// Must be called only after successful S3 → gzip → parsing → VictoriaLogs ingestion.
// If not deleted, SQS will retry delivery (visibility timeout expires).
// Implemented in collector/sqs.go, called from main.go.
func DeleteFromSQS(ctx context.Context, client *sqs.Client, queueURL, receiptHandle string) error {
  _, err := client.DeleteMessage(ctx, &sqs.DeleteMessageInput{
    QueueUrl:      aws.String(queueURL),
    ReceiptHandle: aws.String(receiptHandle),
  })
  return err
}

File s3.go

package collector

import (
  "context"

  "github.com/aws/aws-sdk-go-v2/aws"
  "github.com/aws/aws-sdk-go-v2/service/s3"
)

// GetS3Object retrieves an S3 object and returns a streaming reader.
//
// This function does NOT download the file to disk.
// It returns a network stream (`GetObjectOutput.Body`), which allows the caller
// to read the object lazily, byte-by-byte, directly from S3.
//
// In our pipeline, the call chain looks like:
//
//	main.go
//	  → ReceiveFromSQS() to get bucket/key      (collector/sqs.go)
//	  → GetS3Object() to open S3 object stream  (this file)
//	  → GzipReader() to decompress gzip data    (collector/gzip.go)
//	  → ScanLines() to iterate log lines        (collector/scan.go)
//
// Notes:
// - The returned object must be closed by the caller: `defer obj.Body.Close()`
// - S3 GetObject supports range requests, but here we read the whole file.
// - The body is read sequentially; this is efficient for log ingestion patterns.
func GetS3Object(ctx context.Context, client *s3.Client, bucket, key string) (*s3.GetObjectOutput, error) {
  return client.GetObject(ctx, &s3.GetObjectInput{
    Bucket: aws.String(bucket), // name of S3 bucket with ALB logs
    Key:    aws.String(key),    // key (path/filename) of the gzip log file
  })
}

File gzip.go

package collector

import (
  "compress/gzip"
  "io"
)

// GzipReader wraps an existing io.Reader with a gzip decompressor.
//
// This function takes the S3 object body (returned by GetS3Object in
// collector/s3.go) and turns it into a gzip.Reader capable of producing
// the decompressed content of the ALB log file.
//
// Typical pipeline:
//
//	S3Object.Body (io.ReadCloser)
//	    ↓ passed into
//	GzipReader() → *gzip.Reader (still a stream)
//	    ↓ passed into
//	ScanLines() → yields plain-text log lines
//
// Notes:
// - gzip.NewReader expects the input stream to be in .gz format.
// - Caller must close the returned reader: `defer gz.Close()`.
// - No buffering or scanning is done here; this function only wraps the stream.
func GzipReader(r io.Reader) (*gzip.Reader, error) {
  return gzip.NewReader(r)
}

File scan.go

package collector

import (
  "bufio"
  "io"
)

// ScanLines reads an arbitrary io.Reader line-by-line and returns a channel
// that produces each extracted line as a string.
//
// This function is used in main.go to iterate through the contents of an S3
// Gzip file. The call chain is:
//
//	main.go → GetS3Object() (collector/s3.go)
//	        → GzipReader() (collector/gzip.go)
//	        → ScanLines()  (this file)
//
// Because ScanLines() uses a goroutine, the caller can process lines
// asynchronously using: `for line := range ScanLines(r) { ... }`.
//
// Notes:
//   - The returned channel is *unbuffered*, so each send blocks until the caller
//     receives the value. This naturally rate-controls the goroutine.
//   - bufio.Scanner splits input by '\n', automatically handling different line
//     lengths (up to scanner’s max buffer).
//   - When the reader is fully consumed, the goroutine closes the channel.
func ScanLines(r io.Reader) <-chan string {
  ch := make(chan string)

  // launch a goroutine that streams lines out of the reader
  go func() {
    // ensure channel is closed when scanning finishes
    defer close(ch)

    // bufio.Scanner provides efficient, line-oriented reading of text streams
    scanner := bufio.NewScanner(r)

    // loop until EOF or error. Each iteration reads the next line
    for scanner.Scan() {
      // push extracted line into the channel
      // (blocks until caller receives it)
      ch <- scanner.Text()
    }

    // scanner.Err() is intentionally ignored here, because error handling
    // is performed by the caller when needed, and the channel-based design
    // treats EOF as natural termination.
  }()

  // return read-only channel of strings
  return ch
}

File parser.go

package collector

import (
  "fmt"
  "strconv"
  "strings"
  "time"
)

// SimpleLog represents a minimal, lightweight structure containing:
//
// - Timestamp: parsed ALB timestamp in UTC
// - Message:   the full raw log line (used as message in VictoriaLogs)
// - ClientIP:  extracted client IP (port removed)
// - TargetIP:  extracted backend target IP (port removed)
// - ELBStatus: HTTP status returned by ALB to the client
// - TargetStatus: HTTP status returned by the backend to ALB
//
// This struct is designed for the simplified ingestion phase,
// before implementing full ALB field parsing.
type SimpleLog struct {
  Timestamp    time.Time
  Message      string
  ClientIP     string
  TargetIP     string
  ELBStatus    int
  TargetStatus int
}

// stripPort removes the ":port" suffix from IP strings like "1.2.3.4:5678".
// This keeps VictoriaLogs cardinality low, avoiding creation of thousands
// of separate series due to ephemeral client ports.
func stripPort(s string) string {
  parts := strings.SplitN(s, ":", 2)
  return parts[0]
}

// ParseRawLine parses the essential ALB log fields:
//
// - Timestamp from the 2nd field
// - Client IP (without port)
// - Target IP (without port)
// - ALB HTTP status
// - Target HTTP status
//
// Everything else is kept untouched in the Message field.
//
// ALB logs are space-delimited, except for quoted sections
// (like the request line and user agent). At this simplified stage,
// we do *not* parse quoted fields - we only extract the mandatory parts.
func ParseRawLine(line string) (*SimpleLog, error) {
  fields := strings.Fields(line)

  // we expect at least: protocol, timestamp, elb, client, target, ... status codes
  // ALB format is consistent - <10 fields means corrupted input
  if len(fields) < 10 {
    return nil, fmt.Errorf("invalid ALB log line")
  }

  fmt.Println("--- PARSER SEND ---")
  fmt.Println(line)
  fmt.Println("--- PARSER DEBUG ---")

  // parse timestamp in RFC3339Nano format
  // ALB always emits timestamps in UTC
  ts, err := time.Parse(time.RFC3339Nano, fields[1])
  if err != nil {
    return nil, fmt.Errorf("timestamp parse error: %w", err)
  }

  // convert status codes from string → int
  elbStatus, _ := strconv.Atoi(fields[8])
  targetStatus, _ := strconv.Atoi(fields[9])

  return &SimpleLog{
    Timestamp:    ts.UTC(), // ensure strict UTC normalization
    Message:      line,     // pass full raw line to VictoriaLogs
    ClientIP:     stripPort(fields[3]),
    TargetIP:     stripPort(fields[4]),
    ELBStatus:    elbStatus,
    TargetStatus: targetStatus,
  }, nil
}

File victoria.go

package collector

import (
  "bytes"
  "encoding/json"
  "fmt"
  "net/http"
)

// JSONLogRecord represents a single log entry formatted for
// VictoriaLogs JSONLine ingestion API.
//
// Field mapping:
//
//   - Timestamp → "date"
//     UNIX milliseconds timestamp of the log event.
//     This must match ?_time_field=date&_time_format=unix_ms in ingestion URL.
//
//   - Message → "message"
//     The complete raw ALB log line. Used as the primary log message.
//
//   - Stream → "stream"
//     Logical log stream identifier. Used in ingestion via ?_stream_fields=stream.
//
//   - ClientIP / TargetIP / ELBStatus / TargetStatus
//     Additional parsed metadata fields. These become searchable log fields.
//
// The structure intentionally avoids nested JSON - VictoriaLogs processes
// flat objects more efficiently and without ambiguity in field extraction.
type JSONLogRecord struct {
  Timestamp    int64  `json:"date"`
  Message      string `json:"message"`
  Stream       string `json:"stream"`
  ClientIP     string `json:"client_ip"`
  TargetIP     string `json:"target_ip"`
  ELBStatus    int    `json:"elb_status"`
  TargetStatus int    `json:"target_status"`
}

// SendToVictoria sends exactly ONE JSON record to the VictoriaLogs JSONLine API.
//
// This function is called from main.go inside the ingestion loop.
// It performs the following steps:
//
//  1. Marshal the JSONLogRecord into a compact JSON object
//  2. Build an HTTP POST request with Content-Type: application/stream+json
//  3. Send the request to VictoriaLogs
//  4. Check for non-2xx HTTP status codes
//
// Notes:
//   - VictoriaLogs expects a "streaming JSON" format, where each POST body
//     contains a single JSON object (or multiple lines if needed).
//   - We send logs one-by-one for simplicity, but batching can be added later.
//   - The caller controls the ingestion endpoint URL, including query params:
//     ?_msg_field=message&_time_field=date&_time_format=unix_ms&_stream_fields=stream
//
// Errors returned from this function are logged in main.go, and do not
// interrupt the ingestion pipeline.
func SendToVictoria(url string, rec *JSONLogRecord) error {
  // serialize record into JSON line
  body, err := json.Marshal(rec)
  if err != nil {
    return err
  }

  // debug output for transparency
  fmt.Println("--- VMLOGS SEND ---")
  fmt.Println(string(body))
  fmt.Println("--- VMLOGS DEBUG ---")

  // create POST request with streaming JSON payload
  req, err := http.NewRequest("POST", url, bytes.NewReader(body))
  if err != nil {
    return err
  }

  // JSONLine ingestion requires this content type
  req.Header.Set("Content-Type", "application/stream+json")

  // send HTTP request using default HTTP client
  resp, err := http.DefaultClient.Do(req)
  if err != nil {
    return err
  }
  defer resp.Body.Close()

  // check server-side errors
  if resp.StatusCode >= 300 {
    return fmt.Errorf("victoria response status: %s", resp.Status)
  }

  return nil
}

I have already launched it in our Kubernetes cluster, and everything works.

The only thing is that SQS and S3 operations do not fail loudly when access is denied, so we need to add more error checks.
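
For example, something like this could help – a sketch (not part of the collector yet) that checks the AWS error code via the smithy-go APIError interface, which aws-sdk-go-v2 errors implement:

package collector

import (
  "errors"

  "github.com/aws/smithy-go"
)

// IsAccessDenied reports whether an AWS SDK error was caused by missing permissions:
// S3 returns the "AccessDenied" code, some other services use "AccessDeniedException"
func IsAccessDenied(err error) bool {
  var apiErr smithy.APIError
  if errors.As(err, &apiErr) {
    code := apiErr.ErrorCode()
    return code == "AccessDenied" || code == "AccessDeniedException"
  }
  return false
}

Then in main.go, the errors returned from ReceiveFromSQS() and GetS3Object() could be passed through such a check and logged explicitly instead of being silently ignored.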