Я давно фанат ThinkPad, дуже люблю всю їхню лінійку.
Нещодавно десь зустрів модель X200, які випускались з 2008 року – просто десь побачив картинку, і дуже захотів собі в “колекцію”. Неочікувано – але він навіть знайшовся в продажу на OLX, тому купив собі цей чудо-девайс.
ThinkPad X200 overview
Він… Ну – це просто бімба 🙂
Перше, що відрізняє цей ноутбук від інших моделей – це поворотний екран на 360 градусів, і це “фірмова фішка” моделі саме X200 Tablet, бо є X200 без цього.
Друге – це тачскрін.
Зовнішній вигляд
Виглядає ThinkPad X200 Tablet так:
На борту навіть є dial-up модем!
Ну і, звісно, ніяких HDMI – для зовнішнього монітору тільки VGA.
Хоча HDMI 1.0 з’явився ще у 2003, але тоді він використовувався для DVD-плеєрів та телевізорах, а ноутбуки в ті роки ще йшли переважно з VGA.
Замовив перехідник VGA на HDMI, бо в моєму моніторі DELL U3421WE ніяких VGA, само собою, вже нема – подивимось, чи спрацює.
Справа є фізичний перемикач для відключення WiFi/Bluetooth, хоча в моїй моделі блютузу нема:
В комплекті йде стилус:
ThinkPad X200 Hardware
Звісно, залізо вже стареньке:
дисплей: 12.1 дюймів, 1440×900 точок
процесор: Intel Core 2 Duo SU9400, 1.4 GHz
відео: Integrated card Intel 4500MHD
оперативна пам’ять: DDR3-1066 MHz
в офіційній документації Lenovo завжди писали, що максимум пам’яті 4 гігабайти, але гугол каже, що 8 гіг працює без проблем
поки що в мене 4 GB, замовив нові 2 планки по 4 ГБ, подивимось чи запрацює
зараз (2025 рік) DDR3-1066 MHz на 4 ГБ коштує 440 гривень 🙂
диск: SATA, але сам диск HDD, 250 ГБ, 5400 RPM
начебто є можливість замінити диск на SSD, але корпус ще не розбирав, потім подивлюся, і якщо можна – то поставлю якийсь SSD
мережа:
Ethernet 1000 bmps
WiFi: 802.11a, 802.11g, 802.11n
Встановлення FreeBSD
Чому FreeBSD? Бо це моя перша UNIX-система, на якій я вперше збирав ядро, ще у 2006 чи 2007 році, і потім до 2012 чи 2013 це була моя основна система на моїх перших сервера.
Моя перша версія FreeBSD була… не пам’ятаю точно, чи то 5, чи то 6. Але згодом, після виходу FreeBSD 9 перейшов на Linux (CentOS). Правда чому саме – теж не пам’ятаю. Щось пов’язане зі змінами в роботі з пакетами і FreeBSD ports.
Ну і раз вже налаштовуємо “раритетний” ноутбук – то чому б не спробувати і “раритетну” систему? 🙂
Трохи ностальгії по тим часам, коли вчився працювати з портами FreeBSD, коли вперше розбирався з runlevels в Linux часів SysV init і їхніми аналогами у FreeBSD – режимами завантаження системи.
Хоча, звісно, встановлювати будемо актуальну версію FreeBSD, 14.3 на сьогодняшній день.
Єдине, що тачскрін завести не вдалося, і, судячи з результатів гуглінгу, не вдасться – бо FreeBSD просто не має драйвера.
Тому я все ж встановлю FreeBSD, пограюсь і опишу якісь деталі, але потім на цьому ноутбуці буде Arch Linux – там хоч і довелось трохи попрацювати напильником, але тачскрін працює відмінно.
Вибір образу FreeBSD
Для FreeBSD існує три гілки системи:
RELEASE: основна стабільна версія, апдейти тільки security + critical fixes – максимум стабільності, мінімум сюрпризів
STABLE: гілка розробки, з якої формується майбутній RELEASE – вже достатньо стабільна, але все ще можуть бути баги
CURRENT: гілка активної розробки з усіма новими фічами і плюшками – мінімум стабільності, максимум нових експерементальних штук
FreeBSD має окремі версії для майже всіх платформ:
Для нашого ThinkPad X200 вибираємо amd64, завантажуємо образ зі сторінки Get FreeBSD, і починаємо установку.
Встановлення FreeBSD на віртуальну машину
Аби не робити 100500 фотографій екрану – процес установки FreeBSD покажу на віртуалці з QEMU/KVM, бо принципової різниці нема.
По віртуалізації QEMU/KVM в Linux є чорнетка, допишу якось, давно лежить.
У FreeBSD дуже приємний інсталятор bsdinstall, з яким можна зробити все і відразу. І, як на мене – він зручніший за archintsall, з яким я взагалі не подружився і Arch Linux завжди встановлюю руками:
Вибираємо 1, далі маємо вибір – або запустити установку, або використати ISO як live-cd:
Задаємо ім’я хоста:
Далі вибір компонентів.
Нам тут потрібні тільки порти, “ports – Ports tree“, і можна додати “src – System source tree” – тут весь код FreeBSD, корисно, якщо хочеться зібрати власне ядро і для установку апдейдів з утилітою freebsd-update, про неї трохи далі:
Наступний крок – розбивка диску:
ZFS: файлова система з підтримкою snapshots, RAID і можливістю зміни розміру розділів
з’явилась у 2005 році для операційної системи Solaris від Sun Microsystems, і на той час була революційною системою
UFS: класична для FreeBSD файлова система – дуже стабільна, мінімум використання RAM, але і не має всіх фіч ZFS
Залишаємо дефолтну опцію “Auto (ZFS)”:
Забігаючи наперед – ось так потім виглядають розділи вже на самому ноутбуку:
І навіть пропонується відразу налаштувати RAID – але це явно не наш кейс:
Вибираємо диск для установки:
Чекаємо завершення установки:
Задаємо пароль root:
Вибираємо мережевий інтерфейс:
Це установка на віртуалку, WiFi тут нема.
Але на самому ноуті можна відразу налаштувати і його, хоча в моєму випадку з ThinkPad X200 потім довелось трохи робити руками, далі покажу.
Залишаємо DHCP:
Вибираємо таймзону:
Після чого налаштовуємо базові параметри системи:
local_unbound: локальний DNS-кеш + DNSSEC валідація, для домашнього ноута можна не включати
sshd: включаємо
moused: підтримка миші в консолі – користі мало, але виглядає прикольно 🙂
ntpd: синхронізація часу
powerd: динамічне керування частотою CPU, дуже корисно, особливо для ноутбука – зберігати час роботи батареї
dumpdev: допомагає дебажити kernel panic
Далі налаштування для security – цікаво, але для домашнього ноута можна пропустити, має сенс для серверів:
Детально можна почитати в документації Section 2.8.4, “Enabling Hardening Security Options”, але коротко пройдемось по опціях, бо тут проявляються перші (якщо не говорити про файлу систему) відмінності від Linux:
hide_uids і hide_gids: ховає процеси інших користувачів системи (root, звісно, бачить все)
hide_jail: ховає процеси, які запущені в jail
FreeBSD Jails – аналог “класичних” контейнерів в Linux, але з більше жорсткою ізоляцією
і з’явились вони ще до того, як в Linux ядрі взагалі з’явилась підтримка чогось схожого:
jails були додані у FreeBSD ще у 2000 році, з FreeBSD 4.0
в Linux на той час був тільки chroot, де ізоляція була дуже слабка, і можна було легко вийти за межі “контейнера” і тримати доступ до всієї системи
і тільки у 2006-2008 в Linux з’явились cgroups, аж в 2008-2013 були додані namespaces, і тільки після цього, у 2013 році, з’явився Docker і контейнери в Linux, якими ми їх знаємо зараз
proc_debug: вимикає для звичайних користувачів можливість дебажити чужі процеси і обмежує частину інформації з /proc
random_pid: якщо включено, то PID задається з рандомним зміщенням номеру, а не по черзі
clear_tmp: у FreeBSD каталог /tmp по дефолту не очищається при ребуті, з clear_tmp це можна включити
в Linux залежить від дистрибутива і того, як саме монтується /tmp, бо зазвичай це tmpfs в RAM, який, звісно, очищається
FreeBSD свідомо не очищає його, бо політика FreeBSD – “адміністратор сам вирішує, що треба робити системі“
disable_syslogd: заборона відкриття мережевого сокету для демона syslogd – на ноуті можна включити
але сам syslogd продовжує використання локального Unix socket /var/run/log для роботи
secure_console: блокує root-вхід з консолі без пароля при завантаженні системи в single-user mode (див. Chapter 15. The FreeBSD Booting Process – у FreeBSD дуже класна документація)
disable_dtrace: DTrace у FreeBSD – аналог strace/eBPF в Linux, для трасування процесів, системних викликів і роботи ядра, і має можливість вносити “на льоту” зміни в пам’ять та ядро; disable_dtrace блокує цю можливість навіть для root
Окей – залишаємо тут все по дефолту, і переходимо до user management:
Додаємо юзера:
У FreeBSD є можливість вказати Login class, і це фішка чисто FreeBSD, див. Configuring Login Classes.
Ну і власне на цьому установка завершена.
В останньому вікні є можливість щось дотюнити:
Ребутаємось, і маємо нову систему:
І екран загрузки вже на самому ноуті:
FreeBSD load options
Коротко по доступних опціях:
Boot Multi user (Enter): стандартний запуск, монтуються файлові системи, запускаються сервіси із /etc/rc.conf
аналог в Linux – звичайний boot у runlevel/systemd multi-user.target
Boot Single user: коли система зламалась, і треба щось пофіксити
запускається тільки ядро і мінімальний shell
“/” монтується в read-only (можна перемонтувати в r/w)
аналог в Linux – init=/bin/bash або systemd.unit=rescue.target
аналог в Linux – GRUB console, але FreeBSD loader глибше інтегрований з системою
Reboot: reboot 🙂
Cons: Video: фішка FreeBSD, задає тип для system console – video console (звичайний екран) або serial console
Kernel: default/kernel: вибір ядра для завантаження
при апгрейдах попередня версія ядра зберігається у /boot/kernel.old/, і при проблемах можна завантажити інші ядра
Boot Options: додаткові параметри для завантаження – відключити ACPI, DMA, ZFS cache, etc
Повертаємось до ноутбука.
Налаштування WiFi
При встановленні на ноутбук побачило карту:
І мережі:
Але після налаштування і завершення установки інтерфейсу нема:
Робимо вручну.
З pciconf -lv перевіряємо пристрої на PCI/PCI-Express шині:
Нас цікавить device iwn – “Intel WiFi Link 5300“:
The Intel® WiFi Link 5300 Series is a family of IEEE 802.11a/b/g/Draft-N1 wireless network adapters that operate in both the 2.4 GHz and 5.0 GHz spectra
Тут FreeBSD – це user-facing пакети, а FreeBSD-kmods – репозиторій для додаткових модулів ядра (nvidia-driver, virtualbox-kmod тощо).
FreeBSD Ports Collection – каталог із source code пакетів та Makefiles, розбиті по категоріям, наприклад порт для xfce4-conf буде в директорії /usr/ports/x11/:
root@setevoy-x200:/home/setevoy # ls -1 /usr/ports/x11/xfce4-conf/
Makefile
distinfo
files
pkg-descr
pkg-plist
Використовувати порти варто, якщо потрібні якісь особливі параметри для зборки та установки, але здебільшого все можна встановлювати з pkg.
Апгрейд системи і пакетів
Тут у нас є дві окремі частини – сама система, і пакети, які ми встановлюємо з pkg.
Для апгрейду системи у FreeBSD є класна утиліта freebsd-update – дозволяє і отримувати останні апдейти, і виконувати апгрейд самої системи.
Аби отримати і встановити вручну – робимо:
# freebsd-update fetch
# freebsd-update install
fetch – завантажити останні апдейти, install – встановити їх.
Або можна відразу додати в cron:
@daily root freebsd-update cron
freebsd-update fetch та freebsd-update install виконають апгрейд ядра, системних утиліт в /bin, /sbin, /usr/bin, бібліотеки в /lib, /usr/lib і драйверів.
Для апгрейду пакетів, які ми встановлюємо з pkg – використовуємо аналогічні команди:
# pkg update
# pkg upgrade
Аналогічно до freebsd-update – з update отримуємо оновлення, з upgrade встановлюємо їх.
# pkg install xorg drm-kmod
...
Number of packages to be installed: 318
The process will require 3 GiB more space.
431 MiB to be downloaded.
...
Встановлюємо сам XFCE – теж немаленький:
# pkg install xfce xfce4-goodies
...
Number of packages to be installed: 317
The process will require 1 GiB more space.
248 MiB to be downloaded.
...
Але році у 2010 встановлював KDE з ports – то процес зборки зайняв багато годин, бо pkg тоді ще не було, і в ті часи майже все встановлювалось із ports collection.
Була і тоді система pkg_tools, але в ній не було системи залежностей, пакети качались по FTP, а апгрейд пакетів виконувався фактично через видалення і встановлення заново.
До /etc/rc.conf додаємо запуск DBus (потрібен для XFCE) та Slim:
sysrc dbus_enable="YES"
sysrc slim_enable="YES"
Створюємо файл ~/.xinitrcв домашній директорії юзера, аби Slim знав що запускати:
# echo "exec startxfce4" > /home/setevoy/.xinitrc
Ребутаємо машинку:
# reboot
І вуаля – все готово:
Можна перевірити з яким драйвером працює відео:
# cat /var/log/Xorg.0.log | grep -E "Driver|scfb|vesa"
[ 145.813] X.Org Video Driver: 25.2
[ 145.862] (==) Matched scfb as autoconfigured driver 2
[ 145.862] (==) Matched vesa as autoconfigured driver 3
[ 145.864] Module class: X.Org Video Driver
[ 145.864] ABI class: X.Org Video Driver, version 25.2
[ 145.864] (II) LoadModule: "scfb"
[ 145.864] (II) Loading /usr/local/lib/xorg/modules/drivers/scfb_drv.so
[ 145.864] (II) Module scfb: vendor="X.Org Foundation"
[ 145.864] ABI class: X.Org Video Driver, version 25.2
[ 145.864] (II) LoadModule: "vesa"
[ 145.864] (II) Loading /usr/local/lib/xorg/modules/drivers/vesa_drv.so
[ 145.864] (II) Module vesa: vendor="X.Org Foundation"
[ 145.864] Module class: X.Org Video Driver
[ 145.865] ABI class: X.Org Video Driver, version 25.2
[ 145.865] (II) modesetting: Driver for Modesetting Kernel Drivers: kms
[ 145.865] (II) scfb: driver for wsdisplay framebuffer: scfb
[ 145.865] (II) VESA: driver for VESA chipsets: vesa
[ 145.873] (WW) Falling back to old probe method for scfb
[ 145.873] scfb trace: probe start
[ 145.873] scfb trace: probe done
[ 145.874] ABI class: X.Org Video Driver, version 25.2
[ 145.875] ABI class: X.Org Video Driver, version 25.2
[ 146.006] (II) UnloadModule: "scfb"
[ 146.006] (II) Unloading scfb
scfb чимось не сподобався, буде з vesa, але на цьому залізі взагалі не принципово.
Додаємо інші корисні пакети:
# pkg install vim sudo bash
Я вже звик до bash, тому встановлюю і його.
Аби змінити юзеру його shell – використовуємо chsh:
# chsh -s /usr/local/bin/bash setevoy
chsh: user information updated
Але, як казав на початку – на такому старому залізі під FreeBSD не працює тачскрін, тому потім все ж встановлю Arch Linux – з ним вже тестив, там тачскрін пальцями і стилусом працює без проблем.
Наступна задача, яку хочеться вирішити з Go – це написати власний logs collector для збору логів AWS Load Balancer з AWS S3 і запису їх до VictoriaLogs.
наш колектор опитує SQS, отримує інформацію про нові об’єкти в S3
робить запит до S3, отримує gz-архів
розпаковує, парсить дані, і відправляє до VictoriaLogs
Поїхали.
Налаштування S3 та SQS notifications
Коли продумував ідею, то головне питання було – як знати, які об’єкти в S3 ми вже обробили, а які ні?
Мати якусь базу, в яку писати інформацію про вже оброблені об’єкти – перший варіант. Але з часом записів про такі обєкти буде все більше, плюс не дуже хочеться тягнути якийсь stateful-сервіс в Kubernetes, де потім буде запускатись наш колектор.
Тому робимо простіше і надійніше, так само як це зроблено з Vector.dev та VPC Flow Logs – створимо SQS чергу, в яку будуть приходити повідомлення про нові S3 objects.
Читаємо повідомлення з SQS, отримуємо з них інформацію про нові файли, оброблюємо файли, видаляємо меседж із SQS queue.
Переходимо до S3 з логами, Properties > Event notifications:
Створюємо новий Event notification – відправляти повідомлення про всі операції s3:ObjectCreated:
В Destination вибираємо SQS і нашу чергу:
Чекаємо кілька хвилин, перевіряємо Monitoring в SQS:
Окей – є повідомлення, тепер їх треба зібрати і прочитати.
Переходимо до Go.
AWS SDK for Go
Для роботи з AWS нам буде потрібен AWS SDK for Go, з яким можемо виконати всі потрібні операції.
Для отримання даних доступу використовуємо aws-sdk-go-v2/config і функцію LoadDefaultConfig(), яка виконує стандартний пошук credentials – в змінних оточення, файлах ~/.aws/credentials та ~/.aws/config, або використовує EC2 IAM Roles.
...
// receive a single message from SQS
msgResp, err := sqsClient.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
QueueUrl: queueURL, // URL returned by the GetQueueUrl()
MaxNumberOfMessages: 1, // receive only one message
WaitTimeSeconds: 10, // enable long polling
})
if err != nil {
log.Fatal(err)
}
if len(msgResp.Messages) == 0 {
fmt.Println("no messages received")
return
}
// print the received message body
fmt.Println("Received message:", aws.ToString(msgResp.Messages[0].Body))
...
...
// define a struct to unmarshal S3 event message
type S3Event struct {
Records []struct {
S3 struct {
Bucket struct {
Name string `json:"name"`
} `json:"bucket"`
Object struct {
Key string `json:"key"`
} `json:"object"`
} `json:"s3"`
} `json:"Records"`
}
// take SQS message body
msgBody := aws.ToString(msgResp.Messages[0].Body)
// decode JSON into the struct
var event S3Event
if err := json.Unmarshal([]byte(msgBody), &event); err != nil {
log.Fatal("failed to parse S3 event:", err)
}
// extract bucket and key
// we always have 1 message, so use [0]
bucket := event.Records[0].S3.Bucket.Name
key := event.Records[0].S3.Object.Key
fmt.Println("bucket:", bucket)
fmt.Println("key:", key)
...
Перевіряємо ще раз:
$ go run main.go
bucket: ops-1-33-devops-ingress-ops-alb-loki-logs
key: AWSLogs/492***148/elasticloadbalancing/us-east-1/2025/11/25/492***148_elasticloadbalancing_us-east-1_app.k8s-ops133externalalb-***.336cddd33c043f33_20251125T0935Z_52.***.***.213_60mjhvf6.log.gz
...
// fetch the S3 object and return its streaming body
objResp, err := s3Client.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
})
if err != nil {
log.Fatal("failed to download object:", err)
}
defer objResp.Body.Close()
fmt.Println("S3 object stream opened:", bucket, key)
...
GetObject() повертає *GetObjectOutput, в якому є поле Body з типом io.ReadCloser, а io.ReadCloser – це інтерфейс, який визначає два методи – Reader та Closer.
Читання файлу з gzip
Логи в S3 зберігаються в gz, тому додаємо пакет gzip, і з NewReader() читаємо дані:
У нас вже доволі великий main(), і основні операції зробили, все працює.
Давайте наведемо трохи красоти.
Весь код зараз:
package main
import (
"bufio"
"compress/gzip"
"context"
"encoding/json"
"fmt"
"log"
"os"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/sqs"
)
func main() {
// TODO: add exit handler
ctx := context.Background()
// Load the Shared AWS Configuration (~/.aws/config)
cfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
log.Fatal(err)
}
// create SQS client using the shared AWS config
sqsClient := sqs.NewFromConfig(cfg)
// read queue name from environment
queueName := os.Getenv("ALB_LOGS_QUEUE")
if queueName == "" {
log.Fatal("environment variable ALB_LOGS_QUEUE is not set")
}
// request queue URL by name
getURLResp, err := sqsClient.GetQueueUrl(ctx, &sqs.GetQueueUrlInput{
QueueName: aws.String(queueName),
})
if err != nil {
log.Fatal(err)
}
queueURL := getURLResp.QueueUrl
//fmt.Println("Queue URL:", *queueURL)
// receive a single message from SQS
msgResp, err := sqsClient.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
QueueUrl: queueURL, // URL який ми отримали раніше
MaxNumberOfMessages: 1, // receive only one message
WaitTimeSeconds: 10, // enable long polling (recommended)
})
if err != nil {
log.Fatal(err)
}
if len(msgResp.Messages) == 0 {
fmt.Println("no messages received")
return
}
// print the received message body
//fmt.Println(aws.ToString(msgResp.Messages[0].Body))
// define a struct to unmarshal S3 event message
type S3Event struct {
Records []struct {
S3 struct {
Bucket struct {
Name string `json:"name"`
} `json:"bucket"`
Object struct {
Key string `json:"key"`
} `json:"object"`
} `json:"s3"`
} `json:"Records"`
}
// take SQS message body
msgBody := aws.ToString(msgResp.Messages[0].Body)
// decode JSON into the struct
var event S3Event
if err := json.Unmarshal([]byte(msgBody), &event); err != nil {
log.Fatal("failed to parse S3 event:", err)
}
// extract bucket and key
// we always have 1 message, so use [0]
bucket := event.Records[0].S3.Bucket.Name
key := event.Records[0].S3.Object.Key
//fmt.Println("bucket:", bucket)
//fmt.Println("key:", key)
// create S3 client using the shared AWS config
s3Client := s3.NewFromConfig(cfg)
// fetch the S3 object and return its streaming body
objResp, err := s3Client.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
})
if err != nil {
log.Fatal("failed to download object:", err)
}
defer objResp.Body.Close()
fmt.Println("S3 object stream opened:", bucket, key)
// create gzip reader from S3 object stream
gzReader, err := gzip.NewReader(objResp.Body)
if err != nil {
log.Fatal("failed to create gzip reader:", err)
}
defer gzReader.Close()
//fmt.Println("gzip stream opened")
// create scanner to read decompressed log lines
scanner := bufio.NewScanner(gzReader)
// increase buffer size if ALB logs have long lines
// default scanner buffer = 64 KB
buf := make([]byte, 0, 1024*1024) // 1 MB
scanner.Buffer(buf, 1024*1024)
// iterate over every line in the decompressed file
for scanner.Scan() {
line := scanner.Text()
fmt.Println("log line:", line)
}
if err := scanner.Err(); err != nil {
log.Fatal("scanner error:", err)
}
}
Як можемо організувати процес?
В main() виконуємо всякі ініціалізації, а потім в циклі будемо опитувати SQS:
створюємо context
створюємо AWS config
створюємо клієнти sqsClient та s3Client
зчитуємо змінні середовища (ALB_LOGS_QUEUE, потім додамо ще)
і потім в циклі:
викликаємо функцію receiveFromSQS() – перевіряємо, чи з’явились нові меседжи
викликаємо функцію getS3Object() – якщо меседжи є, то йдемо до S3 і читаємо звідти новий архів
викликаємо функцію processLogFile() – зчитуємо строки з кожного отриманого логу
Які функції для цього знадобляться?
функція ReceiveFromSQS()
сюди будемо передавати context, SQS client, SQS queue URL, і записувати в структуру S3Event ім’я бакету та key – ім’я файлу
для подальшого видалення меседжів після успішної обробки – потрібно буде повертати receiptHandle
функція GetS3Object()
отримує context, AWS Config, bucket, key
виконує GetObject() і повертає GetObjectOutput
функція GzipReader()
читає дані від GetS3Object(), розпаковує, повертає строки
функція ScanLines()
отримує дані від GzipReader(), зчитує з Text(), і поки що просто виводить на консоль
Package collector
Для зручності і аби все було структуровано – розділимо все по окремим файлам:
collector/
sqs.go
s3.go
gzip.go
scan.go
main.go
Файл collector/sqs.go
Коли тестив, то зустрів таку помилку:
$ go run main.go | head
panic: runtime error: index out of range [0] with length 0
goroutine 1 [running]:
alb-logs-collector/collector.ReceiveFromSQS({0xa83530, 0xdda060}, 0xc00021d888, {0xc000160050, 0x4d})
/home/setevoy/Projects/Go/alb-logs-collector/collector/sqs.go:52 +0x25d
main.main()
/home/setevoy/Projects/Go/alb-logs-collector/main.go:46 +0x305
exit status 2
Виникає через, то AWS пише тестові повідомлення до черги:
JSONLogRecord struct: тут будемо формувати JSON для передачі на VictoriaLogs HTTP API
заповнюємо його даними, в поле Timestamp можна передавати “Unix timestamp in seconds, milliseconds, microseconds or nanoseconds” – ми робимо в time.UnixMilli()
package collector
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"time"
)
// JSONLogRecord is the minimal structure for VictoriaLogs JSONLine API
type JSONLogRecord struct {
Timestamp int64 `json:"date"`
Stream string `json:"stream"`
Message string `json:"message"`
}
// SendTestRecord sends simple test log record to VictoriaLogs
func SendTestRecord(url string) error {
rec := JSONLogRecord{
Timestamp: time.Now().UTC().UnixMilli(),
Stream: "test",
Message: "TEST VICTORIA",
}
body, err := json.Marshal(rec)
if err != nil {
return fmt.Errorf("marshal error: %w", err)
}
req, err := http.NewRequest("POST", url, bytes.NewBuffer(body))
if err != nil {
return fmt.Errorf("request error: %w", err)
}
req.Header.Set("Content-Type", "application/stream+json")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("http error: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
return fmt.Errorf("victoria returned %d", resp.StatusCode)
}
fmt.Println("TEST RECORD SENT TO VICTORIA")
return nil
}
До main.go додаємо отримання VictoriaLogs ендпоінта зі змінних оточення і з ним будуємо повний URL, в якому вказуємо які поля треба вважати як _msg, де буде _time, в якому форматі час, і за яким полем створювати Log stream:
...
// "http://localhost:9428/insert/jsonline"
vmLogsEp := os.Getenv("VICTORIA_LOGS_URL")
if vmLogsEp == "" {
log.Fatal("environment variable VICTORIA_LOGS_URL is not set")
}
// VictoriaLogs endpoint
vmLogsURL := vmLogsEp +
"?_msg_field=message" +
"&_time_field=date" +
"&_time_format=unix_ms" +
"&_stream_fields=stream"
// 5. send test record to VictoriaLogs
err = collector.SendVlTestRecord(vmLogsURL)
if err != nil {
fmt.Println("ERROR sending test record:", err)
return
}
...
collector/scan.go буде читати дані від gzip і писати в канал у вигляді готових strings
collector/parser.go – формує SimpleLog з полями Timestamp і Message
Редагуємо scan.go – замість створення буферу створюємо channel, і scanner.Text()тепер буде писати в канал замість буферу:
package collector
import (
"bufio"
"io"
)
// ScanLines reads lines from an io.Reader and sends them into a channel
// caller must range over the returned channel
func ScanLines(r io.Reader) <-chan string {
ch := make(chan string)
go func() {
defer close(ch)
scanner := bufio.NewScanner(r)
// - bufio.Scanner reads decompressed bytes from GzipReader()
// - it splits input by '\n' and returns each line as a complete string
// - each line is sent into the channel for further processing
for scanner.Scan() {
ch <- scanner.Text()
}
}()
return ch
}
Створюємо parser.go зі структурою SimpleLog.
В структурі будемо тримати час, отриманий із запису в log record, а в Message будемо заносити весь текст:
Поки collector.ScanLines() повертає дані – передаємо їх до ParseRawLine(), який заповнює SimpleLog з Timestamp і Message.
Потім заповнюємо JSONLogRecord і передаємо до VictoriaLogs.
Зараз можна очистити SQS-чергу: поки я писав код, там назбирались старі повідомлення, і експортер почав тягнути старі логи. Я довго шукав, чому у VictoriaLogs час не збігається з очікуваним, але проблема виявилась банальною – я дивився дані за останні 15 хвилин, а імпортувались ранкові записи.
Але тоді доведеться почекати до 5 хвилин, поки з’явиться новий меседж.
Що зараз можемо додати до логів у VictoriaLogs – client_ip з client:port, target_ip (Kubernetes Pod) із target:port, elb_status_code – код відповіді ALB, аби потім простіше робити алерти, і target_status_code – аналогічно.
Оновлюємо структуру SimpleLog – додаємо нові поля:
type SimpleLog struct {
Timestamp time.Time
Message string
ClientIP string
TargetIP string
ELBStatus int
TargetStatus int
}
До ParseRawLine() додаємо запис в ці поля, використовуючи array index зі строки fields:
Коменти попросив написати AI, щоб детально по всім функціям було пояснення.
Файл main.go
package main
import (
"alb-logs-collector/collector"
"context"
"fmt"
"log"
"os"
"time"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/go-co-op/gocron"
)
func main() {
// create a base context for all AWS operations
// this context is passed into SQS/S3 functions
ctx := context.Background()
// load shared AWS config from ~/.aws/config and ~/.aws/credentials
// this provides region, credentials, retry settings, etc.
cfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
log.Fatal(err)
}
// create SQS client using the shared AWS config
// used later for reading and deleting queue messages
sqsClient := sqs.NewFromConfig(cfg)
// create S3 client for reading S3 objects
s3Client := s3.NewFromConfig(cfg)
// read queue name from environment
// this avoids hardcoding queue names in code
queueName := os.Getenv("ALB_LOGS_QUEUE")
if queueName == "" {
log.Fatal("environment variable ALB_LOGS_QUEUE is not set")
}
// create the scheduler that runs periodic jobs
// gocron automatically creates goroutines for scheduled tasks
s := gocron.NewScheduler(time.UTC)
// schedule: run the function every minute
s.Every(1).Minute().Do(func() {
fmt.Println("CHECKING SQS...")
// 1. read one message from SQS
// ReceiveFromSQS is implemented in collector/sqs.go
bucket, key, receiptHandle, queueURL, err := collector.ReceiveFromSQS(ctx, sqsClient, queueName)
if err != nil {
// "no messages" is not an error — just no new logs
if err.Error() == "no messages" {
fmt.Println("NO MESSAGES")
return
}
fmt.Println("ERROR receiving message:", err)
return
}
fmt.Println("BUCKET:", bucket)
fmt.Println("KEY:", key)
// 2. download the S3 object stream
// GetS3Object located in collector/s3.go
s3Obj, err := collector.GetS3Object(ctx, s3Client, bucket, key)
if err != nil {
fmt.Println("S3 error:", err)
return
}
defer s3Obj.Body.Close()
fmt.Println("S3 object stream opened:", bucket, key)
// 3. wrap the S3 stream into gzip reader
// GzipReader implemented in collector/gzip.go
gzReader, err := collector.GzipReader(s3Obj.Body)
if err != nil {
fmt.Println("gzip error:", err)
return
}
defer gzReader.Close()
// read VictoriaLogs endpoint
vmLogsEp := os.Getenv("VICTORIA_LOGS_URL")
if vmLogsEp == "" {
log.Fatal("environment variable VICTORIA_LOGS_URL is not set")
}
// final URL with parameters for jsonline ingestion
vmLogsURL := vmLogsEp +
"?_msg_field=message" + // which JSON field contains the log message
"&_time_field=date" + // which JSON field contains timestamp
"&_time_format=unix_ms" + // tell VictoriaLogs that the timestamp is unix milliseconds
"&_stream_fields=stream" // which field defines the stream name
// 4. process the log file line-by-line
// ScanLines implemented in collector/scan.go
// It returns a channel of RAW log lines (already ungzipped)
for line := range collector.ScanLines(gzReader) {
// parse timestamp + extract fields
// ParseRawLine implemented in collector/parser.go
rec, err := collector.ParseRawLine(line)
if err != nil {
continue // skip invalid ALB lines
}
// prepare the JSON record for VictoriaLogs
// JSONLogRecord defined in collector/victoria.go
out := &collector.JSONLogRecord{
Timestamp: rec.Timestamp.UnixMilli(), // ALB timestamp in unix ms
Message: rec.Message, // full raw ALB log line
Stream: "alb", // log stream name
ClientIP: rec.ClientIP, // extracted from ALB log
TargetIP: rec.TargetIP, // extracted from ALB log
ELBStatus: rec.ELBStatus, // HTTP status from ALB
TargetStatus: rec.TargetStatus, // backend response status
}
// send it to VictoriaLogs
// SendToVictoria implemented in collector/victoria.go
if err := collector.SendToVictoria(vmLogsURL, out); err != nil {
fmt.Println("send error:", err)
}
}
// 5. delete message from SQS after successful processing
// DeleteFromSQS implemented in collector/sqs.go
if err := collector.DeleteFromSQS(ctx, sqsClient, queueURL, receiptHandle); err != nil {
fmt.Println("FAILED TO DELETE SQS MESSAGE:", err)
} else {
fmt.Println("SQS MESSAGE DELETED")
}
})
// start the scheduler and block main goroutine forever
s.StartBlocking()
}
Файл sqs.go
package collector
import (
"context"
"encoding/json"
"fmt"
"strings"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/sqs"
)
// S3Event describes the JSON format sent by S3 to SQS.
// It matches the structure of the S3 event notification:
// "Records" → "s3" → "bucket.name" and "object.key".
// This struct is used in ReceiveFromSQS() to extract bucket/key.
type S3Event struct {
Records []struct {
S3 struct {
Bucket struct {
Name string `json:"name"` // S3 bucketName
} `json:"bucket"`
Object struct {
Key string `json:"key"` // S3 object key (log filename)
} `json:"object"`
} `json:"s3"`
} `json:"Records"`
}
// ReceiveFromSQS reads a single SQS message.
// It returns:
// - bucket: the S3 bucket from event
// - key: the object key (filename in S3)
// - receiptHandle: required later to delete the message
// - queueURL: actual AWS queue URL (needed for deletion)
// - err: error, or "no messages" if queue is empty
//
// This function is called from main.go inside the scheduler loop.
// After processing the file from S3, the caller must call DeleteFromSQS().
func ReceiveFromSQS(ctx context.Context, client *sqs.Client, queueName string) (bucket, key, receiptHandle, queueURL string, err error) {
// resolve real SQS queue URL from queue name
// SQS APIs always operate on queueURL, not the name
getURL, err := client.GetQueueUrl(ctx, &sqs.GetQueueUrlInput{
QueueName: aws.String(queueName),
})
if err != nil {
return "", "", "", "", err
}
queueURL = *getURL.QueueUrl
// receive exactly one message
// WaitTimeSeconds enables long polling for up to 10s
resp, err := client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
QueueUrl: aws.String(queueURL),
MaxNumberOfMessages: 1,
WaitTimeSeconds: 10,
})
if err != nil {
return "", "", "", "", err
}
// queue empty → no new logs
if len(resp.Messages) == 0 {
return "", "", "", "", fmt.Errorf("no messages")
}
// take the first (and only) message
msg := resp.Messages[0]
// ReceiptHandle is mandatory for deletion
receiptHandle = *msg.ReceiptHandle
// raw JSON body received from S3 → SQS notification
raw := aws.ToString(msg.Body)
fmt.Println("SQS RAW:", raw)
// filter out AWS TestEvent generated when enabling notifications
// Test events do NOT contain real logs and must be ignored
if strings.Contains(raw, `"Event":"s3:TestEvent"`) {
fmt.Println("Skipping AWS S3 test event")
return "", "", "", queueURL, fmt.Errorf("test event skipped")
}
// unmarshal JSON into S3Event struct
var event S3Event
if err := json.Unmarshal([]byte(raw), &event); err != nil {
return "", "", "", "", err
}
// extract bucket and key from the event
bucket = event.Records[0].S3.Bucket.Name
key = event.Records[0].S3.Object.Key
return bucket, key, receiptHandle, queueURL, nil
}
// DeleteFromSQS permanently removes the processed message.
// Must be called only after successful S3 → gzip → parsing → VictoriaLogs ingestion.
// If not deleted, SQS will retry delivery (visibility timeout expires).
// Implemented in collector/sqs.go, called from main.go.
func DeleteFromSQS(ctx context.Context, client *sqs.Client, queueURL, receiptHandle string) error {
_, err := client.DeleteMessage(ctx, &sqs.DeleteMessageInput{
QueueUrl: aws.String(queueURL),
ReceiptHandle: aws.String(receiptHandle),
})
return err
}
Файл s3.go
package collector
import (
"context"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
)
// GetS3Object retrieves an S3 object and returns a streaming reader.
//
// This function does NOT download the file to disk.
// It returns a network stream (`GetObjectOutput.Body`), which allows the caller
// to read the object lazily, byte-by-byte, directly from S3.
//
// In our pipeline, the call chain looks like:
//
// main.go
// → ReceiveFromSQS() to get bucket/key (collector/sqs.go)
// → GetS3Object() to open S3 object stream (this file)
// → GzipReader() to decompress gzip data (collector/gzip.go)
// → ScanLines() to iterate log lines (collector/scan.go)
//
// Notes:
// - The returned object must be closed by the caller: `defer obj.Body.Close()`
// - S3 GetObject supports range requests, but here we read the whole file.
// - The body is read sequentially; this is efficient for log ingestion patterns.
func GetS3Object(ctx context.Context, client *s3.Client, bucket, key string) (*s3.GetObjectOutput, error) {
return client.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(bucket), // name of S3 bucket with ALB logs
Key: aws.String(key), // key (path/filename) of the gzip log file
})
}
Файл gzip.go
package collector
import (
"compress/gzip"
"io"
)
// GzipReader wraps an existing io.Reader with a gzip decompressor.
//
// This function takes the S3 object body (returned by GetS3Object in
// collector/s3.go) and turns it into a gzip.Reader capable of producing
// the decompressed content of the ALB log file.
//
// Typical pipeline:
//
// S3Object.Body (io.ReadCloser)
// ↓ passed into
// GzipReader() → *gzip.Reader (still a stream)
// ↓ passed into
// ScanLines() → yields plain-text log lines
//
// Notes:
// - gzip.NewReader expects the input stream to be in .gz format.
// - Caller must close the returned reader: `defer gz.Close()`.
// - No buffering or scanning is done here; this function only wraps the stream.
func GzipReader(r io.Reader) (*gzip.Reader, error) {
return gzip.NewReader(r)
}
Файл scan.go
package collector
import (
"bufio"
"io"
)
// ScanLines reads an arbitrary io.Reader line-by-line and returns a channel
// that produces each extracted line as a string.
//
// This function is used in main.go to iterate through the contents of an S3
// Gzip file. The call chain is:
//
// main.go → GetS3Object() (collector/s3.go)
// → GzipReader() (collector/gzip.go)
// → ScanLines() (this file)
//
// Because ScanLines() uses a goroutine, the caller can process lines
// asynchronously using: `for line := range ScanLines(r) { ... }`.
//
// Notes:
// - The returned channel is *unbuffered*, so each send blocks until the caller
// receives the value. This naturally rate-controls the goroutine.
// - bufio.Scanner splits input by '\n', automatically handling different line
// lengths (up to scanner’s max buffer).
// - When the reader is fully consumed, the goroutine closes the channel.
func ScanLines(r io.Reader) <-chan string {
ch := make(chan string)
// launch a goroutine that streams lines out of the reader
go func() {
// ensure channel is closed when scanning finishes
defer close(ch)
// bufio.Scanner provides efficient, line-oriented reading of text streams
scanner := bufio.NewScanner(r)
// loop until EOF or error. Each iteration reads the next line
for scanner.Scan() {
// push extracted line into the channel
// (blocks until caller receives it)
ch <- scanner.Text()
}
// scanner.Err() is intentionally ignored here, because error handling
// is performed by the caller when needed, and the channel-based design
// treats EOF as natural termination.
}()
// return read-only channel of strings
return ch
}
Файл parser.go
package collector
import (
"fmt"
"strconv"
"strings"
"time"
)
// SimpleLog represents a minimal, lightweight structure containing:
//
// - Timestamp: parsed ALB timestamp in UTC
// - Message: the full raw log line (used as message in VictoriaLogs)
// - ClientIP: extracted client IP (port removed)
// - TargetIP: extracted backend target IP (port removed)
// - ELBStatus: HTTP status returned by ALB to the client
// - TargetStatus: HTTP status returned by the backend to ALB
//
// This struct is designed for the simplified ingestion phase,
// before implementing full ALB field parsing.
type SimpleLog struct {
Timestamp time.Time
Message string
ClientIP string
TargetIP string
ELBStatus int
TargetStatus int
}
// stripPort removes the ":port" suffix from IP strings like "1.2.3.4:5678".
// This keeps VictoriaLogs cardinality low, avoiding creation of thousands
// of separate series due to ephemeral client ports.
func stripPort(s string) string {
parts := strings.SplitN(s, ":", 2)
return parts[0]
}
// ParseRawLine parses the essential ALB log fields:
//
// - Timestamp from the 2nd field
// - Client IP (without port)
// - Target IP (without port)
// - ALB HTTP status
// - Target HTTP status
//
// Everything else is kept untouched in the Message field.
//
// ALB logs are space-delimited, except for quoted sections
// (like the request line and user agent). At this simplified stage,
// we do *not* parse quoted fields — we only extract the mandatory parts.
func ParseRawLine(line string) (*SimpleLog, error) {
fields := strings.Fields(line)
// we expect at least: protocol, timestamp, elb, client, target, ... status codes
// ALB format is consistent — <10 fields means corrupted input
if len(fields) < 10 {
return nil, fmt.Errorf("invalid ALB log line")
}
fmt.Println("--- PARSER SEND ---")
fmt.Println(line)
fmt.Println("--- PARSER DEBUG ---")
// parse timestamp in RFC3339Nano format
// ALB always emits timestamps in UTC
ts, err := time.Parse(time.RFC3339Nano, fields[1])
if err != nil {
return nil, fmt.Errorf("timestamp parse error: %w", err)
}
// convert status codes from string → int
elbStatus, _ := strconv.Atoi(fields[8])
targetStatus, _ := strconv.Atoi(fields[9])
return &SimpleLog{
Timestamp: ts.UTC(), // ensure strict UTC normalization
Message: line, // pass full raw line to VictoriaLogs
ClientIP: stripPort(fields[3]),
TargetIP: stripPort(fields[4]),
ELBStatus: elbStatus,
TargetStatus: targetStatus,
}, nil
}
Файл victoria.go
package collector
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
)
// JSONLogRecord represents a single log entry formatted for
// VictoriaLogs JSONLine ingestion API.
//
// Field mapping:
//
// - Timestamp → "date"
// UNIX milliseconds timestamp of the log event.
// This must match ?_time_field=date&_time_format=unix_ms in ingestion URL.
//
// - Message → "message"
// The complete raw ALB log line. Used as the primary log message.
//
// - Stream → "stream"
// Logical log stream identifier. Used in ingestion via ?_stream_fields=stream.
//
// - ClientIP / TargetIP / ELBStatus / TargetStatus
// Additional parsed metadata fields. These become searchable log fields.
//
// The structure intentionally avoids nested JSON — VictoriaLogs processes
// flat objects more efficiently and without ambiguity in field extraction.
type JSONLogRecord struct {
Timestamp int64 `json:"date"`
Message string `json:"message"`
Stream string `json:"stream"`
ClientIP string `json:"client_ip"`
TargetIP string `json:"target_ip"`
ELBStatus int `json:"elb_status"`
TargetStatus int `json:"target_status"`
}
// SendToVictoria sends exactly ONE JSON record to the VictoriaLogs JSONLine API.
//
// This function is called from main.go inside the ingestion loop.
// It performs the following steps:
//
// 1. Marshal the JSONLogRecord into a compact JSON object
// 2. Build an HTTP POST request with Content-Type: application/stream+json
// 3. Send the request to VictoriaLogs
// 4. Check for non-2xx HTTP status codes
//
// Notes:
// - VictoriaLogs expects a "streaming JSON" format, where each POST body
// contains a single JSON object (or multiple lines if needed).
// - We send logs one-by-one for simplicity, but batching can be added later.
// - The caller controls the ingestion endpoint URL, including query params:
// ?_msg_field=message&_time_field=date&_time_format=unix_ms&_stream_fields=stream
//
// Errors returned from this function are logged in main.go, and do not
// interrupt the ingestion pipeline.
func SendToVictoria(url string, rec *JSONLogRecord) error {
// serialize record into JSON line
body, err := json.Marshal(rec)
if err != nil {
return err
}
// debug output for transparency
fmt.Println("--- VMLOGS SEND ---")
fmt.Println(string(body))
fmt.Println("--- VMLOGS DEBUG ---")
// create POST request with streaming JSON payload
req, err := http.NewRequest("POST", url, bytes.NewReader(body))
if err != nil {
return err
}
// JSONLine ingestion requires this content type
req.Header.Set("Content-Type", "application/stream+json")
// send HTTP request using default HTTP client
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
// check server-side errors
if resp.StatusCode >= 300 {
return fmt.Errorf("victoria response status: %s", resp.Status)
}
return nil
}
Вже запустив в Kubernetes, все працює.
Єдиний момент, що SQS та S3 операції не падають при access denied, треба додати перевірку помилок.
Почав писати log collector з S3 до VictoriaLogs з використанням AWS GO SDK, і в коді достатньо багато використовуються різні Input/Ouput операції, бо треба отримати лог, розпарсити, записати дані.
Тож цього разу подивимось на дуже класний приклад використання інтерфейсів при роботі з функцією io.Copy() і ще раз трохи зазирнемо під капот внутрішньої реалізації інтерфейсів в Go.
Basic I/O example – os.Open(), os.Create() та io.Copy()
Напишемо простий код, який буде з одного файлу копіювати в інший:
package main
import (
"fmt"
"io"
"os"
)
func main() {
// open the source file for reading
// returns pointer to os.File:
// 'func os.Open(name string) (*os.File, error)':
//
// os.File represents an open file descriptor:
// type File struct {
// // contains filtered or unexported fields
// }
sourceFile, err := os.Open("source.txt")
if err != nil {
panic(err)
}
// always close files when done
defer sourceFile.Close()
// create destination file for writing
// 'func os.Create(name string) (*os.File, error)'
destFile, err := os.Create("dest.txt")
if err != nil {
panic(err)
}
defer destFile.Close()
// copy data from source to destination
// io.Copy() pulls bytes from any Reader and pushes them into any Writer
// 'func io.Copy(dst io.Writer, src io.Reader) (written int64, err error)'
bytesWritten, err := io.Copy(destFile, sourceFile)
if err != nil {
panic(err)
}
fmt.Println("Copied bytes:", bytesWritten)
}
Тут ми:
з os.Open("source.txt") відкриваємо файл на читання
з os.Create("dest.txt") створюємо файл, в який будемо копіювати дані
і з io.Copy() копіюємо дані з “source.txt” до “dest.txt“
І для обох функцій – і Copy(), і copyBuffer() – аргументи є interface type:
А type Writer interface описує вимоги до типу, що може бути використаний через цей інтерфейс: такий тип повинен мати метод Write(), приймати аргумент з типом slice of bytes, і повертати значення int та err:
// Writer is the interface that wraps the basic Write method.
type Writer interface {
Write(p []byte) (n int, err error)
}
Об’єкт destFile повинен мати метод Write() – який в нього є, бо destFile – це *os.File struct, в якої є набір методів, в тому числі як раз Read() та Write():
$ go doc os.File
package os // import "os"
type File struct {
// Has unexported fields.
}
File represents an open file descriptor.
...
func (f *File) Read(b []byte) (n int, err error)
...
func (f *File) Write(b []byte) (n int, err error)
А отже, маючи об’єкт з типом *os.File – ми через відповідні інтерфейси можемо викликати *os.File.Write():
func Copy(dst Writer, ...) каже – “dst повинен мати метод Write([]byte) (int, error)“
тип *os.File має метод Write() – а значить він задовольняє Writer interface
Тобто: інтерфейси в Go описують не типи даних, а вимоги до методів, які мають бути реалізовані, щоб ці методи можна було викликати через інтерфейс.
І коли ми пишемо і запускаємо io.Copy(destFile, ...) – під капотом Go під час компіляції програми:
перевіряє, який тип приймає io.Copy() – це interface type
аби задовільнити конкретно цей interface type – об’єкт (тип), який передається аргументом до io.Copy(), повинен мати метод Write()
Go перевіряє, чи є у переданого типу такий метод – чи є для об’єкту *os.File метод Write()
Далі – “магія”, описана в попередньому пості: ще раз глянемо на те, як працюють інтерфейси, як через них викликаються методи, і що саме знаходиться в аргументах io.Copy() та copyBuffer() при роботі програми.
Структури iface та itab
Коли ми передаємо обʼєкт (pointer на *os.File) у параметр із типом інтерфейсу (dst Writer) – то Go формує дві внутрішні структури, які передаються до функцій як interface value.
type iface struct {
// Pointer to the 'itab' (interface table)
tab unsafe.Pointer
// Pointer to the actual data (our *os.File struct)
data unsafe.Pointer
}
Де:
tab unsafe.Pointer: pointer на другий тип type itab, який описаний в type ITab struct
раніше було type itab struct, зараз перенесли в Go ABI, про ABI в наступному пості
data unsafe.Pointer: pointer на наш об’єкт з типом os.File struct, який має метод Write()
Друга структура, ITab struct, має свої три поля:
type ITab struct {
// pointer to the 'type Writer interface'
Inter *InterfaceType
// pointer to the 'type File struct'
Type *Type
// in our case if we have 1 method, thus '[N]uintptr' == [1]uintptr
// and in the 'fun[0]' will be the address of the method 'Write()' of the 'os.File' struct
Fun [1]uintptr // will have '[1]uintptr', and
}
Тут:
Inter *InterfaceType: pointer на опис type Writer interface
“який інтерфейс треба задовольнити“
Type: pointer на опис конкретного типу значення (у нашому випадку тип *os.File)
“який тип ми передаємо“
'Fun[0]': буде посиланням на метод Write() структури os.File
“ось адреси методів, які цей тип використовує для реалізації цього інтерфейсу“
І коли ми в коді передаємо значення типу *os.File в параметр інтерфейсного типу (dst Writer) – то Go створює ці структури, і передає структура iface з полями tab і data до виклику io.Copy(), а потім далі – до copyBuffer():
io.Copy(iface):
- iface.tab => вказівник на структуру itab
- iface.data => вказівник на *os.File
В itab struct маємо таблицю методів, пов’язаних з цим інтерфейсом (або – які імплементують цей інтерфейс), а в полі fun структури itab знаходиться масив з pointers, де кожен елемент містить адресу функції, яка реалізує відповідний метод інтерфейсу для конкретного типу.
І у випадку з інтерфейсом Writer – це буде масив fun[0] зі значенням, наприклад, 0xc000014070, де за адресою 0xc000014070 буде розташований метод Write() типу *os.File.
І коли в copyBuffer(dst Writer) виконується виклик Write(), який описаний як:
Повертаючись до твердження “при виклику io.Copy(*os.File) – викликається copyBuffer(), якому першим аргументом передається структура iface” – давайте подивимось на аргументи, з якими ми працюємо.
Перевірка типів інтерфейсних значень в аргументах
Аби побачити все своїми очима – повторимо “хак” з попереднього поста – створимо власну структуру, яка аналогічна до iface, бо напряму до iface ми звернутись не можемо – але можемо прочитати її памʼять через unsafe.Pointer.
І на додачу створимо власну функцію myCopy(), яка буде мати в параметрах наші власні інтерфейси – аналогічно тому, як це зроблено для io.Copy().
Тобто – ми повністю повторюємо поведінку оригінального io.Copy(), але замість справжніх io.Reader та io.Writer використовуємо свої інтерфейси і власну структуру myIfaceStruct, аби подивитись, як Go зберігає інтерфейс у памʼяті:
створюємо два об’єкти sourceFile та destFile, які є pointers на *os.File
описуємо власну функцію myCopy(), яка в параметрах описує отримання інтерфейсних типів
наші інтерфейси myReaderInterface та myWriterInterface вимагають методів Read() та Write(), які є у sourceFile та destFile
Код виходить такий:
package main
import (
"fmt"
"io"
"os"
"unsafe"
)
type myIfaceStruct struct {
tab unsafe.Pointer
data unsafe.Pointer
}
// Writer is the interface that wraps the basic Write method.
type myWriterInterface interface {
// define Write method to satisfy the myWriterInterface interface
Write(p []byte) (n int, err error)
}
// Reader is the interface that wraps the basic Read method.
type myReaderInterface interface {
// define Read method to satisfy the myReaderInterface interface
Read(p []byte) (n int, err error)
}
// accept any type which has Read and Write methods
func myCopy(src myReaderInterface, dst myWriterInterface) (int64, error) {
// '&src' gives us the address of the interface variable 'src'
// 'unsafe.Pointer(&src)' allows us to reinterpret that memory as a different type
// the interface value occupies 16 bytes:
// - first 8 bytes: pointer to the method/type table ('tab')
// - next 8 bytes: pointer to the actual value ('data')
// '(*myIfaceStruct)(...)' tells Go to treat those bytes as a 'myIfaceStruct'
// '*(*myIfaceStruct)(...)' finally copies those bytes into the 'rawIface' variable
rawIface := *(*myIfaceStruct)(unsafe.Pointer(&src))
fmt.Println()
// Print diagnostic messages
//
// we intentionally use '%p' modifier with a non-pointer value argument
// this causes a formatting error, and 'fmt' prints a diagnostic message
// that includes the full content of 'rawIface' (its type and both fields)
fmt.Printf("'rawIface' data: %p\n", rawIface)
// same idea for %s: &src is a *myReaderInterface, not a string
// so fmt prints a diagnostic message showing the type and value
fmt.Printf("'src' data: %s\n", &src)
fmt.Println()
// Print addresses from the 'iface' struct
//
// 'tab' field is a pointer to the interface's method table (the 'itab' struct)
// this value is copied from the real interface value stored in 'src'
fmt.Printf("Copy of the 'iface.tab': address stored inside 'rawIface.tab': %p\n", rawIface.tab)
// 'data' field is a pointer to the underlying object (the *os.File struct)
// also copied directly from the actual interface storage
fmt.Printf("Copy of the 'iface.data': address stored inside 'rawIface.data': %p\n", rawIface.data)
// print the address of the real underlying object (*os.File)
// this should match the value stored in rawIface.data
fmt.Printf("The 'src' (*os.File) actual object address: %p\n", src)
fmt.Println()
// Test sizes
//
// 'src' will have 16 bytes
// because 'iface' has two fields: 'tab' and 'data'
// they are pointers, each of 8 bytes
fmt.Println("sizeof the 'src' (size of 'iface' with two pointers):", unsafe.Sizeof(src))
// but pointer to the '*os.File' object size will be 8 bytes
testSource, _ := os.Open("source.txt")
fmt.Println("sizeof the 'testSource' (size of '*os.File' with one pointer):", unsafe.Sizeof(testSource))
fmt.Println()
// demonstrate "dynamic types"
//
// - Printf '%T' modifier will print the type of the variable
// - Printf '%p' modifier will print the address pointed to by '&'
fmt.Printf("'src' type: %T\n", src)
// address of the the 'src'
fmt.Printf("'src' address: %p\n", &src)
fmt.Println()
return io.Copy(dst, src)
}
func main() {
// sourceFile is *os.File
sourceFile, _ := os.Open("source.txt")
defer sourceFile.Close()
// destFile is *os.File
destFile, _ := os.Create("dest.txt")
defer destFile.Close()
myCopy(sourceFile, destFile)
}
Запускаємо:
$ go run test-int.go
'rawIface' data: %!p(main.myIfaceStruct={0x4eee38 0xc000062030})
'src' data: %!s(*main.myReaderInterface=0xc000014070)
Copy of the 'iface.tab': address stored inside 'rawIface.tab': 0x4eee38
Copy of the 'iface.data': address stored inside 'rawIface.data': 0xc000062030
The 'src' (*os.File) actual object address: 0xc000062030
sizeof the 'src' (size of 'iface' with two pointers): 16
sizeof the 'testSource' (size of '*os.File' with one pointer): 8
'src' type: *os.File
'src' address: 0xc000014070
І розбираємо результат.
Перші два – зовсім “грязний хак”, випадково на нього натрапив: якщо до модифікатора в fmt.Printf() передати не той тип даних, який він очікує – він виводить повідомлення з деталями по помилці, де можемо побачити, що саме повністю передавалось (хоча як виявилось, під капотом просто викликається (reflect.TypeOf(p.arg).String())).
Перший блок:
rawIface є типом main.myIfaceStruct, яка містить два вказівники на адреси 0x4eee38 та 0xc000062030 – див. далі про зміст rawIface
src є поінтером на *main.myReaderInterface – структуру, яка знаходиться за адресою 0xc000014070
Далі – виводимо адреси, які зберігаються в полях iface (і які ми отримали через нашу власну структуру):
'rawIface.tab': 0x4eee38 – тут адреса розміщення itab struct
'rawIface.data': 0xc000062030 – тут адреса переданого через src об’єкту os.File
і ту саму адресу ми бачимо в наступному рядку – src є pointer на *os.File, з Printf(%p) отримуємо адресу, на яку src вказує
Найбільш явний доказ того, що насправді myCopy() у (src myReaderInterface) працює з інтерфейсом, а не *os.File – це розмір:
з unsafe.Sizeof(src) отримуємо розмір самого інтерфейсного значення (iface), яке складається з двох pointers – tab і data, по 8 байт кожен
а testSource := os.Open("source.txt") має розмір 8 байт, бо це один поінтер
Інтерфейси Go та “dynamic type”
А далі ми бачимо те, що називають “динамічними типами”: в результатах unsafe.Sizeof(src)) ми побачили, що там 2 поінтери, тобто це 100% тип interface value з двома pointers.
Але в fmt.Printf("'src' type: %T\n", src) ми отримуємо тип *os.File – бо це pointer на структуру os.File:
$ go run test-int.go
...
'src' data: %!s(*main.myReaderInterface=0xc000014070)
...
'src' type: *os.File
'src' address: 0xc000014070
The static type (or just type) of a variable is the type given in its declaration, the type provided in the new call or composite literal, or the type of an element of a structured variable. Variables of interface type also have a distinct dynamic type, which is the (non-interface) type of the value assigned to the variable at run time (unless the value is the predeclared identifier nil, which has no type). The dynamic type may vary during execution but values stored in interface variables are always assignable to the static type of the variable.
Отже, змінна має static type, коли:
змінна оголошується (var i int)
тип заданий під час присвоювання даних при виклику функцій (x := new(int))
Проте змінні інтерфейсного типу завжди мають фіксований статичний тип (сам інтерфейс) – але реальний об’єкт всередині неї має окремий dynamic type – це конкретний тип значення, присвоєного під час виконання.
І в нашому прикладі вище – iface.data як раз і є тою змінною, яка визначає dynamic type, і тому ми в результаті fmt.Printf("'src' type: %T\n", src) бачимо саме *os.File.
Додаємо до нашого коду ще трохи дебагу:
...
// show the static type of the interface itself
// - (*myReaderInterface)(nil) creates a nil pointer to the interface type
// - reflect.TypeOf(...) gives the type of that pointer
// - Elem() gives the type the pointer points to (the interface type)
// this demonstrates that the static type is 'myReaderInterface'
fmt.Println("static type of the 'myReaderInterface':", reflect.TypeOf((*myReaderInterface)(nil)).Elem())
// show the dynamic type stored inside the interface variable 'src'
// - 'src' is an interface value (16-byte iface: tab + data)
// - reflect.TypeOf(src) reads the real type stored in iface.data
// this prints the actual type, '*os.File' in our case
//
// and this is exactly the same information that 'fmt.Printf("%T", src)' prints:
// both reflect.TypeOf(src) and %T reveal the dynamic type stored in the interface
fmt.Printf("'src' dynamic type: %v\n", reflect.TypeOf(src))
// show the type of the variable 'src' itself, not the value stored inside it
// this is exactly what the myCopy() function "sees" when receiving its argument
// - '&src' is a pointer to the interface variable
// - reflect.TypeOf(&src) therefore reports: "*myReaderInterface"
// this confirms that 'src' is an interface-typed variable, not a concrete value
fmt.Println("'src' variable type: ", reflect.TypeOf(&src))
...
Результат:
$ go run test-int.go
...
static type of the 'myReaderInterface': main.myReaderInterface
'src' dynamic type: *os.File
'src' variable type: *main.myReaderInterface
Тут ми:
в першій перевірці просто створюємо вказівник на інтерфейсний тип (але без створення самого об’єкту): результат є main.myReaderInterface
другий результат – “прочитай значення інтерфейсної змінної src, і скажи, який там тип” – саме тут ми бачимо, що в iface.data зберігається pointer на об’єкт типу – *os.File
третя перевірка – “сходи за адресою, де зберігається змінна src, і скажи який за цією адресою тип даних” – отримуємо pointer на *main.myReaderInterface
Використання інтерфейсів на прикладі io.Copy()
То що це все значить для нас?
А значить, що використовуючи інтерфейси, ми можемо передати будь-які значення (типи), які реалізують інтерфейс.
Якщо повернутись до нашого першого коду, то в io.Copy() першим параметром ми можемо передати будь-який тип, який має метод Write([]byte) (int, error), а в другий – аналогічно, тільки Read(), бо під капотом Copy() викликає copyBuffer(), а той просто створює буфер розміром в 32 кілобайти, чей який “переливає” з одного “каналу” в інший:
func copyBuffer(dst Writer, src Reader, buf []byte) (written int64, err error) {
...
if buf == nil {
size := 32 * 1024
...
buf = make([]byte, size)
}
for {
nr, er := src.Read(buf)
if nr > 0 {
nw, ew := dst.Write(buf[0:nr])
...
А значить – ми можемо у Writer передати os.Stdout, тобто просто вивести на консоль:
...
func main() {
// open the source file for reading
// returs pointer to os.File:
// 'func os.Open(name string) (*os.File, error)':
//
// os.File represents an open file descriptor:
// type File struct {
// // contains filtered or unexported fields
// }
sourceFile, err := os.Open("source.txt")
if err != nil {
panic(err)
}
// always close files when done
defer sourceFile.Close()
// printto console instead
// actualy, os.Stdout is also *os.File
// thus it also has Write() method
bytesWritten, err := io.Copy(os.Stdout, sourceFile)
if err != nil {
panic(err)
}
fmt.Println("Copied bytes:", bytesWritten)
}
Результат:
$ go run main.go
source
Copied bytes: 7
Або можемо створити власний буфер в пам’яті, і писати в нього, бо bytes.Buffer теж має метод Write():
Є задачка на моніторинг костів на OpenAI – бачити скільки за добу витрачено кожним проектом, і слати алерти в Slack, коли витрати завеликі.
Потикав кілька готових експортерів для OpenAI, але не побачив там метрик саме по костам, тому просто напишемо свій.
Писати будемо на Golang, ідея дуже проста – з OpenAI API отримуємо дані, генеруємо метрику, відправляємо її до VictoriaMetrics.
На Go останній раз писав у 2019 році, і то один раз, тому заодно будемо згадувати що і як працює, і місцями дивитись деталі реалізації різних бібліотек.
Поїхали.
OpenAI API
Документація по OpenAI API – Costs та повертаєме значення – Costs object.
Для доступу до Costs потрібен окремий ключ – робимо на platform.openai.com в Admin keys:
Для отримання Costs треба задавати параметр start_time в Unix форматі – створюємо змінну:
для структури Request маємо метод EnableTrace() який теж повертає Request
і для того ж Request маємо метод Get(), який теж повертає Request плюс error
І це дозволяє нам будувати ланцюжки запитів – Client => R() => Request => EnableTrace() => Request => Get().
Окей, давайте до коду.
Створення resty клієнта
Пишемо main.go:
package main
import (
"fmt"
"github.com/go-resty/resty/v2"
)
// set global const as ay be used in other packages
const (
baseURL = "https://api.openai.com/v1"
costsPath = "/organization/costs"
)
func main() {
client := resty.New()
// build 'https://api.openai.com/v1/organization/costs'
response, err := client.R().Get(baseURL + costsPath)
if err != nil {
panic(err)
}
fmt.Println(response)
}
Запускаємо:
$ go run main.go
{
"error": {
"message": "You didn't provide an API key. You need to provide your API key in an Authorization header using Bearer auth (i.e. Authorization: Bearer YOUR_KEY). You can obtain an API key from https://platform.openai.com/account/api-keys."
...
Далі нам треба додати auth header до нашого запиту – використовуємо метод func (*Client) SetAuthToken, який просто додає значення до поля Token в об’єкті Client.
Ще є окремий метод func (r *Request) SetAuthToken, який задає токен на конкретні реквести, а не на весь клієнт, але в нашому випадку робимо простіше, через загальний Client.
Робимо method chaining із прикладу вище – для Client викликаємо SetAuthToken(), який задає токен, наступним викликаємо R() для створення request, і наступним викликаємо Get(), в який передаємо URL:
Зараз нам треба тільки start_time, але потім будемо додавати ще, тому можна їх відразу записати в map, який потім передамо до SetQueryParams().
Для start_time нам треба передати час – робимо з time.Now(), і передавати дату до OpenAI API нам треба в Unix форматі, тому використовуємо функцію Unix().
Перевіряємо як воно буде виглядати:
gore> :import time
gore> timeNow := time.Now().Unix()
1762956432
Додаємо в код створення змінної timeNow з часом, створення setQueryParams map of strings зі списком параметрів теж в strings, і додаємо виклик SetQueryParams() до client:
А в функції parseResponseBody() викликається метод Unmarshalc, який в свою чергу викликає Client.JSONUnmarshal(), а поле JSONUnmarshal містить функцію json.Unmarshal():
...
func createClient(hc *http.Client) *Client {
if hc.Transport == nil {
hc.Transport = createTransport(nil)
}
c := &Client{ // not setting lang default values
...
JSONUnmarshal: json.Unmarshal,
...
array, масив: фіксована довжина, індексований тип, всі об’єкти того самого типу – [3]int{1,2,3}
slice: аналогічний до array, але не фіксованої довжини – []int{1,2,3}
maps: набір key:value елементів змінної довжини одного типу – map[string]string{"key_name": "value_value"}
structs: комплексний тип, який може включати в себе інші типи – struct{ Name string; Age int }{ Name: "Nino", Age: 35 }
Так як ми знаємо, які типи ми отримуємо з API та всі поля в них – то нам підійде slice of structs, де кожен елемент slice буде структурою з полями, в яких ми будемо зберігати project_id, amount та project_name.
Структура для Project ID та Amount
Структура може виглядати так:
type ProjectSpend struct {
ProjectID string
ProjectSpend int
}
А потім створимо slice з цією структурою:
data := []ProjectSpend{}
Тепер давайте подивимось на те, що нам повертає OpenAI API.
має кілька JSON properties – "object": "page", etc
далі йде масив data []
який містить в собі інший object {}
який починається з properties "object": "bucket", etc
і в якому є інший масив results []
який включає в себе ще один object {}
який починається із property "object": "organization.costs.result"
за яким слідує property amount, який містить в собі вкладений object {}
з двома property – value та value
Якщо ми хочемо це відобразити в Go struct – то нам потрібно створити кілька структур, які будуть передавати дані одна до одної:
перша структура “захоплює” data[]
друга структура – отримує results[]
третя – отримує значення поля project_id
а четверта – зчитує amount
Як це може виглядати в коді – з використання structs composition, коли одна структура містить в собі поле, яке є іншою структурою:
type ResponceAmount struct {
Value float64
}
type ResponceProjectID struct {
ProjectID string `json:"project_id"`
Amount ResponceAmount
}
type ResponseResults struct {
Results []ResponceProjectID
}
type ResponseData struct {
Data []ResponseResults
}
res := &ResponseData{}
І тепер можемо виконати json.Unmarshall через виклик SetResult(), в який ме передаємо pointer – res := &ResponseData{}:
$ go run main.go
...
Result: &{[{[{proj_1 {2.16911625}} {proj_Agtar0XzJdXXLhGt8YCRNZMY {0.1846203}} {proj_2 {0.1531728}} {proj_3 {0.19788874999999997}}]}]}
Або можемо зробити більш лаконічно – використовуючи nested anonymous structs:
...
// catch data[] and pass to nested struct
// catch results[] and pass to next nested struct
// catch 'project_id' property to the 'ProjectID' field, and pass to next nested struct
// catch 'amount' property to the 'Amount' field, and pass to next nested struct
// finally, catch 'value' property to the 'Value' field
type ResponseData struct {
Data []struct {
Results []struct {
ProjectID string `json:"project_id"`
Amount struct {
Value float64
}
}
}
}
...
І отримаємо той самий результат.
А далі нам потрібно буде згенерувати метрики з лейблами.
Робимо це у два цикли for, в яких перебираємо поля кожної структури:
...
// catch each item from the 'Response.Data[]'
for _, dataItem := range res.Data {
// catch each iteam from the 'Response.Data[].Results[]'
for _, result := range dataItem.Results {
project := result.ProjectID
amount := result.Amount.Value
// print in VictoriaMetrics gauge format
fmt.Printf("openai_stats{type=\"costs\", project=\"%s\"} %f\n", project, amount)
}
}
...
Результат:
$ go run main.go
openai_stats{type="costs", project="proj_1"} 2.170784
openai_stats{type="costs", project="proj_2"} 0.241411
openai_stats{type="costs", project="proj_3"} 0.213558
openai_stats{type="costs", project="proj_4"} 0.198619
А тепер зробимо аналогічно, але для імен проектів, бо мати в лейблах метрик значення у вигляді “proj_123” зовсім незручно, хочеться вивести нормальні імена.
Структура для Project Names
Додаємо другий ендпоінт, див. документацію List projects:
Додавання OPENAI_ADMIN_KEY ключа і параметрів переносимо в створення клієнта, після чого викликаємо нашу функцію, якій передаємо створений і налаштований клієнт:
sanitize імен – форматування даних зі strings.Replace()
Але в іменах у нас є пробіли та символи “/”, і імена проектів містять заглавні букви – а нам в лейблах метрик треба мати вид “my_project_name“.
Додамо функцію, яка буде виконувати нормалізацію використовуючи методи ToLower() та ReplaceAll() із пакету strings:
...
func normalizeLabel(s string) string {
s = strings.ToLower(s)
s = strings.ReplaceAll(s, " ", "_")
s = strings.ReplaceAll(s, "/", "_")
return s
}
...
Наступний крок – побудувати map, в якій ми будемо мати project_id та project_names:
...
projectNames := make(map[string]string)
// get each 'ProjectsResponse.Data[].ID'
// get each 'ProjectsResponse.Data[].Name'
// populate the projectNames map with:
// 'project_id' = 'project_name'
for _, p := range projectsRes.Data {
projectNames[p.ID] = normalizeLabel(p.Name)
}
fmt.Println("Projects Names: ", projectNames)
...
В результаті маємо:
$ go run main.go
Projects Names: map[proj_1:kraken_production proj_2:assistant_test_eval proj_3:knowledge_base proj_4:default_project]
І тепер оновлюємо наші два цикли – використовуємо в лейблі імена замість ID:
...
// catch each item from the 'Response.Data[]'
for _, dataItem := range costsRes.Data {
// catch each item from the 'Response.Data[].Results[]'
for _, result := range dataItem.Results {
// get ''Response.Data[].Results[].ProjectID'
id := result.ProjectID
// get ''Response.Data[].Results[].Amount.Value'
amount := result.Amount.Value
// use the 'id' to get the project name from the projectNames map
project := projectNames[id]
if project == "" {
project = "unknown"
}
// print in VictoriaMetrics gauge format
fmt.Printf("openai_stats{type=\"costs\", project=\"%s\"} %f\n", project, amount)
}
}
...
І результат:
$ go run main.go
openai_stats{type="costs", project="knowledge_base"} 2.170784
openai_stats{type="costs", project="kraken_production"} 0.241411
openai_stats{type="costs", project="assistant_test_eval"} 1.083077
openai_stats{type="costs", project="default_project"} 0.461237
Зараз весь код у нас такий:
package main
import (
"fmt"
"os"
"strconv"
"strings"
"time"
"github.com/go-resty/resty/v2"
)
// set global const as ay be used in other packages
const (
baseURL = "https://api.openai.com/v1"
costsPath = "/organization/costs"
projectsPath = "/organization/projects"
)
// catch data[] and pass to nested struct
// catch results[] and pass to next nested struct
// catch 'project_id' property to the 'ProjectID' field, and pass to next nested struct
// catch 'amount' property to the 'Amount' field, and pass to next nested struct
// finally, catch 'value' property to the 'Value' field
type CostsResponseData struct {
Data []struct {
Results []struct {
ProjectID string `json:"project_id"`
Amount struct {
Value float64
}
}
}
}
type ProjectsResponse struct {
Data []struct {
ID string
Name string
}
}
func getOpenAi(client *resty.Client, path string, out any) error {
_, err := client.R().
SetResult(out).
Get(path)
return err
}
func normalizeLabel(s string) string {
s = strings.ToLower(s)
s = strings.ReplaceAll(s, " ", "_")
s = strings.ReplaceAll(s, "/", "_")
return s
}
func main() {
//client := resty.New()
apiKey := os.Getenv("OPENAI_ADMIN_KEY")
timeNow := strconv.FormatInt(time.Now().Unix(), 10)
setQueryParams := map[string]string{
"start_time": timeNow,
"group_by": "project_id",
}
client := resty.New().
SetAuthToken(apiKey).
SetQueryParams(setQueryParams)
// use pointer to ResponseData struct
// as 'json.Unmarshal' requires a pointer to write results
costsRes := &CostsResponseData{}
if err := getOpenAi(client, baseURL+costsPath, costsRes); err != nil {
panic(err)
}
projectsRes := &ProjectsResponse{}
if err := getOpenAi(client, baseURL+projectsPath, projectsRes); err != nil {
panic(err)
}
projectNames := make(map[string]string)
// get each 'ProjectsResponse.Data[].ID'
// get each 'ProjectsResponse.Data[].Name'
// populate the projectNames map with:
// 'project_id' = 'project_name'
for _, p := range projectsRes.Data {
projectNames[p.ID] = normalizeLabel(p.Name)
}
// catch each item from the 'Response.Data[]'
for _, dataItem := range costsRes.Data {
// catch each item from the 'Response.Data[].Results[]'
for _, result := range dataItem.Results {
// get ''Response.Data[].Results[].ProjectID'
id := result.ProjectID
// get ''Response.Data[].Results[].Amount.Value'
amount := result.Amount.Value
// use the 'id' to get the project name from the projectNames map
project := projectNames[id]
if project == "" {
project = "unknown"
}
// print in VictoriaMetrics gauge format
fmt.Printf("openai_stats{type=\"costs\", project=\"%s\"} %f\n", project, amount)
}
}
}
Тепер можемо переходити до формування реальних метрик та записати їх до VictoriaMetrics.
Планування метрик для VictoriaMetrics
Отже, метрики у нас будуть у вигляді openai_stats{type="costs", project="prodject_id"} 5.55.
А що сказано в задачі, що треба в результаті?
якщо денний спендінг на опенаі перевищує середній за останні дні (з певним трешолдом) – кричати в слак
Значить, нам потрібна буде сума за добу, і маючи її, ми можемо робити порівняння з попередніми періодами часу.
І потім можемо для алерту створити запит на кшталт такого:
if
avg_over_time(openai_stats{type="costs", project="prodject_id"}[1d)
>
avg_over_time(openai_stats{type="costs", project="prodject_id"}[3d)
then send alert
Але з Counter є нюанс – він обнуляється, якщо експортер перезапуститься – див. counter reset.
Крім того, якщо ми отримуємо дані починаючи з 00:00 – то з наступного дня значення буде починатись з 0,00 USD.
А значить, у нас значення в метриці може і збільшуватись, і зменшуватись, а значить – нам потрібен не Counter, а Gauge.
VictoriaMetrics Go client
Є бібліотека для Prometheus, але так як у нас VictoriaMetrics – то беремо їхній пакет, який до того ж має функцію PushMetrics(), з якою ми можемо відразу пушити метрики до VictoriaMetrics.
Дивимось документацію по type Gauge, там є приклад створення об’єкта метрики.
Функція NewGauge() приймає два аргументи – ім’я метрики з лейблами та функцію, яка виконує оновлення значення для цієї метрики, див. gauge.go:
$ go run main.go
test_openai_stats{type="costs", project="assistant_test_eval"} 4.9838991
test_openai_stats{type="costs", project="default_project"} 0.5281144000000001
test_openai_stats{type="costs", project="knowledge_base"} 2.17244425
test_openai_stats{type="costs", project="kraken_production"} 0.5510669499999999
Супер.
А тепер подумаємо над всією логікою виконання.
Що у нас є зараз:
створення resty.Client
ініціалізація структури costsRes := &CostsResponseData{}
виклик getOpenAi() з аргументами (client, baseURL+costsPath, costsRes), де ми заповнюємо дані в структурі CostsResponseData
ініціалізація projectsRes := &ProjectsResponse{}
виклик getOpenAi() з аргументами (client, baseURL+projectsPath, projectsRes), де ми заповнюємо дані в структурі ProjectsResponse
ініціалізація мапи projectNames
заповнення її з даними "project_id": "project_name"
далі цикли, в яких:
отримуємо project_id
отримуємо amount
по project_id отримуємо ім’я проекту, записуємо в змінну project
генеруємо ім’я метрики і лейблу з project в metricName
з metrics.NewGauge генеруємо нову метрику
з gauge.Set(amount) записуємо в неї значення
з metrics.WritePrometheus() всі згенеровані метрики виводимо на консоль
І все це зараз виконується при виклику main().
Натомість нам при виклику main(), тобто при старті експортера, треба:
створити resty.Client
далі періодично виконувати оновлення даних та записувати дані до VictoriaMetrics:
з getOpenAi() заповнити структуру ProjectsResponse
з getOpenAi() заповнити структуру CostsResponseData
заповнити projectNames
запустити цикли для генерації метрик і виконання Set()
в кінці циклу виконати WritePrometheus()
Правда, при такому підході ми кожну годину будемо перезаписувати поля в ProjectsResponse, CostsResponseData та projectNames, що наче не дуже ОК з точки зору перформансу – але якщо у нас з’явиться новий проект, то ми його відразу “спіймаємо”, і додамо нову метрику для нього.
Отже, що треба зробити – це винести нашу логіку в окрему функцію, раз на годину викликати її, а потім виконувати WritePrometheus().
Пишемо цю функцію, тільки міняємо NewGauge() на GetOrCreateGauge(), бо при наступному виклику нашої функції метрики вже будуть створені:
...
func fetchAndPush(client *resty.Client, costsRes *CostsResponseData, projectsRes *ProjectsResponse, projectNames map[string]string) {
if err := getOpenAi(client, baseURL+costsPath, costsRes); err != nil {
panic(err)
}
if err := getOpenAi(client, baseURL+projectsPath, projectsRes); err != nil {
panic(err)
}
// get each 'ProjectsResponse.Data[].ID'
// get each 'ProjectsResponse.Data[].Name'
// populate the projectNames map with:
// 'project_id' = 'project_name'
for _, p := range projectsRes.Data {
projectNames[p.ID] = normalizeLabel(p.Name)
}
// catch each item from the 'Response.Data[]'
for _, dataItem := range costsRes.Data {
// catch each item from the 'Response.Data[].Results[]'
for _, result := range dataItem.Results {
// get 'Response.Data[].Results[].ProjectID'
// i.e. 'proj_123'
id := result.ProjectID
// get 'Response.Data[].Results[].Amount.Value'
amount := result.Amount.Value
// use the 'id' to get the project name from the projectNames map
project := projectNames[id]
if project == "" {
project = "unknown"
}
// print in VictoriaMetrics gauge format
//fmt.Printf("openai_stats{type=\"costs\", project=\"%s\"} %f\n", project, amount)
metricName := fmt.Sprintf(`test_openai_stats{type="costs", project="%s"}`, project)
gauge := metrics.GetOrCreateGauge(metricName, nil)
gauge.Set(amount)
}
}
metrics.WritePrometheus(os.Stdout, false)
}
...
Тепер в main() у нас залишається:
...
func main() {
//client := resty.New()
apiKey := os.Getenv("OPENAI_ADMIN_KEY")
timeNow := strconv.FormatInt(time.Now().Unix(), 10)
setQueryParams := map[string]string{
"start_time": timeNow,
"group_by": "project_id",
}
client := resty.New().
SetAuthToken(apiKey).
SetQueryParams(setQueryParams)
// use pointer to ResponseData struct
// as 'json.Unmarshal' requires a pointer to write results
costsRes := &CostsResponseData{}
projectsRes := &ProjectsResponse{}
// will be populated with key:value pairs:
// 'proj_123' = 'kraken_production'
projectNames := make(map[string]string)
fetchAndPush(client, costsRes, projectsRes, projectNames)
}
Запускаємо для перевірки:
$ go run main.go
test_openai_stats{type="costs", project="assistant_test_eval"} 6.3417053
test_openai_stats{type="costs", project="default_project"} 0.6592560500000001
test_openai_stats{type="costs", project="knowledge_base"} 2.17244425
test_openai_stats{type="costs", project="kraken_production"} 0.6170747
Тепер нам треба замість простого виводу на консоль записати дані до VictoriaMetrics.
Запис метрик до VictoriaMetrics з InitPush() та PushMetrics()
Для запису метрик до VictoriaMetrics маємо дві основні функції – InitPush() та PushMetrics().
Функція InitPush()
Функція InitPush() дозволяє виконувати періодичні записи із заданим interval, а PushMetrics() – просто разово записати всі метрики, які є в Set struct. Про Set трохи далі.
Тепер просто інтересу заради розберемо, як саме VictoriaMetrics клієнт виконує запис.
ми в нашому коді викликаємо InitPush(), передаємо до цієї функції URL та інтервал
InitPush() створює змінну writeMetrics – анонімну функцію, яка приймає аргумент типу io.Writer, і яка потім буде викликати функцію WritePrometheus(), в яку передається цей io.Writer
далі викликається функція InitPushExt(), якій передається pushURL, interval, та об’єкт writeMetrics
Тут просто додаються параметри зі структури PushOptions, в яку можемо передати параметри типу extraLabels, і потім викликається InitPushExtWithOptions(), в яку передається наш writeMetrics.
Дивимось InitPushExtWithOptions(): тут створюється goroutine, яка із заданим interval викликає pushMetrics(), в яку передається наш об’єкт writeMetrics (тобто та анонімна функція, яка буде викликати WritePrometheus()):
В свою чергу pushMetrics() створює буфер bytes.Buffer, передає його до writeMetrics(), writeMetrics() викликає WritePrometheus(), яка отримує цей буфер:
// NewSet creates new set of metrics.
//
// Pass the set to RegisterSet() function in order to export its metrics via global WritePrometheus() call.
func NewSet() *Set {
return &Set{
m: make(map[string]*namedMetric),
}
}
Тобто, при виклику NetGauge() ми передаємо аргумент з іменем метрики, NetGauge() викликає NewSet(), передає цю метрику, а NewSet() виконує ініціалізацію структури Set, в поле namedMetric задаючи нашу метрику.
Функція PushMetrics()
Ну а з PushMetrics() все майже аналогічно – створюється writeMetrics, викликається PushMetricsExt():
Отже, що нам треба зробити зараз – це замість WritePrometheus() викликати PushMetrics().
Створення context та виклик PushMetrics()
Для PushMetrics() потрібно передати context, який керує goroutines і завершує їх або по таймауту, або якщо сама програма отримала від системи сигнали SIGTERM чи SIGKILL.
Детальніше про context трохи далі, поки просто додаємо import "context", в main() створюємо пустий контекст з Background():
...
import (
"context"
...
func main() {
...
// will be populated with key:value pairs:
// 'proj_123' = 'kraken_production'
projectNames := make(map[string]string)
ctx := context.Background()
...
В нашій функції fetchAndPush() додаємо параметр з типом context.Context:
Потім можна переробити на виклик раз на годину – s.Every(1).Hour().Do( ... ), або на початку кожної години – s.Cron("0 * * * *").Do( ... ).
І в кінці запускаємо крон зі StartBlocking(), який блокує завершення самої функції main().
Відкриваємо доступ до VictoriaMetrics в Kubernetes:
$ kk -n ops-monitoring-ns port-forward svc/vmsingle-vm-k8s-stack 8428
Запускаємо наш експортер:
$ go run main.go
test_openai_stats{type="costs", project="assistant_test_eval"} 6.501765299999999
test_openai_stats{type="costs", project="default_project"} 0.6592560500000001
test_openai_stats{type="costs", project="knowledge_base"} 2.17411225
test_openai_stats{type="costs", project="kraken_production"} 0.6471627999999999
^Csignal: interrupt
І перевіряємо дані вже у VictoriaMetrics:
Правда, тут з’явився якийсь “unknown” проект, треба буде додати логування.
Що ще треба поправити:
зараз ініціалізація структур CostsResponseData та ProjectsResponse виконується в main(), і потім при кожному виклику fetchAndPush() в них записуються дані
якщо проект видалиться з OpenAI – він залишиться в структурах, і ми будемо продовжувати писати метрики для проекту, якого вже нема
треба винести в саму fetchAndPush() і просто кожного разу заповнювати їх з нуля
аналогічно з projectNames – перенести ініціалізацію в саму fetchAndPush()
SetQueryParams – зараз передається однаково для обох викликів getOpenAi(), але в /organization/projects нема параметра group_by
в метриці лейблу type="" краще замінити на category=""
додати external lablels – щось типу “job="openai-exporter"“
замість використання panic(err) – записувати в лог, повертати помилку до викликаючої функції і обробляти там
додати коректну обробку сигналів SIGTERM та SIGINT
resty.client вміє виконувати retry при помилках, треба додати SetRetryCount() і SetRetryWaitTime()
ну і додати логи виконання і помилок
Створення Golang context
Під час роботи в нашому коді запускається кілька одночасних операцій – з gocron.NewScheduler() ми запускаємо виконання нашої функції fetchAndPush(), в ній у нас запускаються HTTP-запити з resty.Client.Get(), у VictoriaMetrics запускаються виклики для запису до VictoriaMetrics endpoint.
Аби все це діло коректно завершити, а не просто “вбити” під час отримання SIGINT або SIGTERM – Go дозволяє нам керувати процесом завершення наших функцій і goroutines через context виконання.
Інший приклад, коли нам треба керувати виконанням операції – це задати ліміт на час виконання, як це, наприклад, зроблено в VictoriaMetrics у функції InitPushExtWithOptions():
...
go func() {
ticker := time.NewTicker(interval)
defer ticker.Stop()
stopCh := ctx.Done()
for {
select {
case <-ticker.C:
ctxLocal, cancel := context.WithTimeout(ctx, interval+time.Second)
err := pc.pushMetrics(ctxLocal, writeMetrics)
...
Тут виконання pc.pushMetrics() обмежено interval, який передається при виклику InitPush().
При цьому context виконання включає в себе не тільки обробку сигналів і керування життєвим циклом функцій і goroutine, но і всю пов’язану з цим виконанням інформацію:
Package context defines the Context type, which carries deadlines, cancellation signals, and other request-scoped values across API boundaries and between processes
Я виніс описання роботи context окремою частиною, бо дуже цікавий механізм, а зараз просто давайте його додамо в наш код.
Отже, що нам треба:
створити context
створити “перехоплювач сигналів” SIGINT (Ctrl+C) та SIGTERM (сигнал від операційної системи, коли виконання програми завершується, наприклад – коли kubelet зупиняє контейнер)
відправити сигнал зупинки всім дочірнім функціям і goroutines
Далі описуємо запуск gocron.NewScheduler(), а в кінці main() запускаємо створення та читання з каналу:
...
// block until Ctrl+C cancels rootCtx
<-rootCtx.Done()
}
Як тільки NotifyContext() отримає SIGTERM – він закриє канал rootCtx.Done(), після чого каскадно закриються канали всіх дочірніх контекстів, потім всі дочірні goroutines, що слухають ці контексти, завершать роботу, і main() зможе коректно завершитись.
resty.client теж вміє працювати з context через SetContext(), йому передаємо наш rootCtx при виклику if err := getOpenAI(ctx, ... ) {...}.
Фінальний результат
Після всіх правок весь код експортеру тепер виглядає так:
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"
"github.com/VictoriaMetrics/metrics"
"github.com/go-co-op/gocron"
"github.com/go-resty/resty/v2"
)
const (
// base URL of the OpenAI Admin API
baseURL = "https://api.openai.com/v1"
// endpoints that we call
costsPath = "/organization/costs"
projectsPath = "/organization/projects"
// VictoriaMetrics push endpoint (Prometheus remote write format)
pushURL = "http://localhost:8428/api/v1/import/prometheus"
)
// structure describing the JSON for costs API
// resty will unmarshal into this struct automatically
type CostsResponseData struct {
Data []struct {
Results []struct {
ProjectID string `json:"project_id"`
Amount struct {
Value float64 `json:"value"`
} `json:"amount"`
} `json:"results"`
} `json:"data"`
}
// structure describing the JSON for projects API
// used to map project_id → readable project name
type ProjectsResponse struct {
Data []struct {
ID string `json:"id"`
Name string `json:"name"`
} `json:"data"`
}
// normalizeLabel converts a project name into a Prometheus-safe label
// - lowercases
// - replaces spaces with underscores
// - replaces slashes to avoid label parser issues
func normalizeLabel(s string) string {
s = strings.ToLower(s)
s = strings.ReplaceAll(s, " ", "_")
s = strings.ReplaceAll(s, "/", "_")
return s
}
// getOpenAI performs a GET request to the OpenAI Admin API
// and unmarshals the returned JSON into the 'out' structure.
//
// ctx: allows cancellation (we pass rootCtx so Ctrl+C cancels requests)
// client: the resty client with authentication
// path: "/organization/costs" or "/organization/projects"
// params: optional query parameters
func getOpenAI(ctx context.Context, client *resty.Client, path string, params map[string]string, out any) error {
// create HTTP request object
req := client.R().
SetContext(ctx). // attach context so cancellation works
SetResult(out) // register target structure for unmarshalling JSON
// set optional query parameters
if params != nil {
req.SetQueryParams(params)
}
// execute HTTP GET request
if _, err := req.Get(baseURL + path); err != nil {
return fmt.Errorf("request to %s failed: %w", path, err)
}
return nil
}
// fetchAndPush performs one exporter cycle:
//
// 1. fetch costs grouped by project_id
// 2. fetch readable project names
// 3. build project_id → normalized_name map
// 4. create/update Prometheus gauges
// 5. push all metrics to VictoriaMetrics
//
// ctx: the root context (cancelled when Ctrl+C is pressed)
func fetchAndPush(ctx context.Context, client *resty.Client) error {
// create fresh response holders for every iteration
costsRes := &CostsResponseData{}
projectsRes := &ProjectsResponse{}
projectNames := make(map[string]string)
// build query parameters for costs API
// start_time: current timestamp (Unix)
// group_by: instruct API to group costs per project_id
timeNow := strconv.FormatInt(time.Now().Unix(), 10)
costParams := map[string]string{
"start_time": timeNow,
"group_by": "project_id",
}
// fetch costs data
if err := getOpenAI(ctx, client, costsPath, costParams, costsRes); err != nil {
return fmt.Errorf("fetch costs: %w", err)
}
// fetch project definitions
if err := getOpenAI(ctx, client, projectsPath, nil, projectsRes); err != nil {
return fmt.Errorf("fetch projects: %w", err)
}
// fill map: project_id → normalized_label
for _, p := range projectsRes.Data {
projectNames[p.ID] = normalizeLabel(p.Name)
}
// process returned costs
for _, dataItem := range costsRes.Data {
for _, result := range dataItem.Results {
id := result.ProjectID
amount := result.Amount.Value
// resolve project readable name
project := projectNames[id]
if project == "" {
project = "unknown"
}
metricName := fmt.Sprintf(
`openai_stats{project="%s",category="costs"}`,
project,
)
// get or create gauge
gauge := metrics.GetOrCreateGauge(metricName, nil)
// update gauge value
gauge.Set(amount)
// log written metric
log.Printf("metric updated: name=%s value=%f", metricName, amount)
}
}
// push metrics with job="openai_exporter"
pushOpts := &metrics.PushOptions{
ExtraLabels: `job="openai_exporter"`,
}
// push all collected metrics
if err := metrics.PushMetrics(ctx, pushURL, false, pushOpts); err != nil {
return fmt.Errorf("push metrics: %w", err)
}
return nil
}
func main() {
// create a context that automatically cancels on OS signals (Ctrl+C, kill, SIGTERM)
//
// how it works:
// - signal.NotifyContext wraps the parent context and subscribes it to OS signals
// - when the program receives Ctrl+C (SIGINT) or SIGTERM:
// Go internally calls rootCancel()
// the context's Done() channel is closed
// - all goroutines waiting on <-rootCtx.Done() are instantly unblocked
// - any operation bound to this context (HTTP requests, timeouts, jobs)
// receives ctx.Err()==context.Canceled and stops gracefully
//
// practically:
// - main goroutine waits for <-rootCtx.Done()
// - when Ctrl+C arrives => rootCtx.Done() closes => program starts graceful shutdown
//
// 'defer rootCancel()' is used to clean up internal signal resources when main() exits normally
rootCtx, rootCancel := signal.NotifyContext(
context.Background(),
os.Interrupt,
syscall.SIGTERM,
)
defer rootCancel()
// load OpenAI admin API key
apiKey := os.Getenv("OPENAI_ADMIN_KEY")
if apiKey == "" {
log.Fatal("OPENAI_ADMIN_KEY is not set")
}
// create resty client with:
// - bearer token
// - automatic retries (3 attempts)
client := resty.New().
SetAuthToken(apiKey).
SetRetryCount(3).
SetRetryWaitTime(2 * time.Second)
// create scheduler using local timezone
s := gocron.NewScheduler(time.Local)
// register a job that runs every 1 minute
s.Every(1).Minute().Do(func() {
start := time.Now()
log.Println("starting fetch-and-push cycle")
// run our exporter cycle
if err := fetchAndPush(rootCtx, client); err != nil {
log.Println("ERROR during fetchAndPush:", err)
return
}
log.Println("fetch-and-push completed in", time.Since(start))
})
log.Println("starting scheduler...")
// run scheduler in background goroutine
s.StartAsync()
// block until Ctrl+C cancels rootCtx
<-rootCtx.Done()
log.Println("received Ctrl+C, stopping scheduler...")
// shutdown scheduler gracefully
s.Stop()
log.Println("scheduler stopped, exiting")
}
Ті самі 6.95 долари, що ми бачимо у VictoriaMetrics від нашого експортеру.
Можна б ще покращити код, наприклад розбити велику функцію fetchAndPush(), і треба додати передачу URL до VictoriaMetrics зі змінних оточення, але поки поживемо з таким варіантом.
Bonus: як працює контроль виконання через Golang context
Ми в нашій функції fetchAndPush() використовуємо metrics.PushMetrics(), передавши йому контекст.
Але для кращої картини – давайте знову повернемося до InitPush(), бо там використання context більш явне.
Отже, InitPush() викликає InitPushExt(), а InitPushExt() викликає InitPushExtWithOptions(), якому передає пустий context.Background() – return InitPushExtWithOptions(context.Background() ...).
В InitPushExtWithOptions() запускається goroutine, go func() {}, в якій створюється локальний context :
таким чином timerCtx тепер має доступ до всіх методів структури cancelCtx
далі WithDeadlineCause() перевіряє умову if dur <= 0 і, і якщо час виконання завершився, то:
викликає c.cancel(true, DeadlineExceeded, cause)
повертає “return c, func() { c.cancel(false, Canceled, nil) }“, яка повертається до InitPushExtWithOptions() в частині ctxLocal, cancel := context.WithTimeout() і "func() { c.cancel() }" і стає cancel()
c.cancel() – це метод структури timerCtx – func (c *timerCtx) cancel(), який викликає c.cancelCtx.cancel()
а c.cancelCtx.cancel() – це метод структури cancelCtx – func (c *cancelCtx) cancel(), який викликає d, _ := c.done.Load().(chan struct{})
і викликає close(d)
Ось тут:
func (c *cancelCtx) cancel(removeFromParent bool, err, cause error) {
...
d, _ := c.done.Load().(chan struct{})
if d == nil {
...
} else {
close(d)
}
...
c.done.Load() – викликається з поля done структури cancelCtx:
type cancelCtx struct {
...
done atomic.Value // of chan struct{}, created lazily, closed by first cancel call
...
}
Тобто, в d, _ := c.done.Load().(chan struct{}) викликається Load(), в (chan struct{}) виконується type assertion, тобто перевіряється, що це тип chan struct{}, після чого d стає chan struct{}, після чого виконується close(channel).
А close() – це вбудована функція Go, яка закриває отриманий аргументом канал.
Як тільки канал Done() закривається – всі goroutines, які виконують <-ctx.Done(), миттєво пробуджуються і можуть коректно завершити свою роботу.
В InitPushExtWithOptions() це виконується тут:
go func() {
...
stopCh := ctx.Done()
...
case <-stopCh:
...
return
}
}
}()
Закриття каналу – це читання нульового значення, що призводить до спрацювання умови case => що призводить до завершення циклу через виклик return => що призводить до завершення всієї go func() {}.
Окей.
А звідки канал взявся?
Відкриття каналу
Для того, аби функція чи рутина постійно “слухали” цей канал в очікуванні його закриття – ми викликаємо Done():
...
go func() {
ticker := time.NewTicker(interval)
defer ticker.Stop()
stopCh := ctx.Done()
...
case <-stopCh:
if wg != nil {
wg.Done()
}
return
}
...
Інтерфейси в Go дозволяють описати доступ до даних або методів без створення самих реалізацій в цих інтерфейсах.
Таким чином ми створюємо “загальну шину”, яку далі можемо використовувати для “підключення” зовнішніх “систем”.
Тобто інтерфейс – це абстракція, яка описує доступ до іншого типу, але конкретна реалізація цієї поведінки вже буде залежати від того, що саме ми підключимо до інтерфейсу.
Взагалі хотів просто написати пост при використання інтерфейсів, але натомість вийшов пост про те, як інтерфейси реалізовані взагалі, і про “магію” того, як через них відбувається виклик даних.
Насправді спочатку було б добре написати про pointers та методи в Go, бо в цьому матеріалі саме на них побудоване все пояснення роботи інтерфейсів, але це вже іншим разом.
Найпростіший приклад інтерфейсу – коли в ньому не задається ані метод, ані тип даних, який цей метод повертає.
Використовуючи такий інтерфейс ми можемо створити функцію, яка буде приймати будь-який тип даних – бо інакше при оголошенні функції нам треба вказати тип даних, який вона приймає в параметрі:
package main
import "fmt"
// define an empty interface
// it can hold a value of any type
type Any interface{}
func printValue(v Any) {
// print the value
fmt.Println("Value:", v)
}
func main() {
// pass int
printValue(42)
// pass string
printValue("hello")
// pass float
printValue(3.14)
// pass slice
printValue([]int{1, 2, 3})
}
Без використання інтерфейсу – нам довелось би створювати окремі функції для кожного типу, який ми хочемо передати, або, як альтернатива – використовувати generics.
Інший варіант створення порожнього інтерфейсу – це використання типу any, який по факту є аліасом на interface{}:
...
type MyAny any
func printValue(v Any) {
// print the value
fmt.Println("Value:", v)
}
...
Interfaces та Methods
Якщо “порожній” інтерфейс any каже “я приймаю будь-яке значення”, то “класичний” інтерфейс каже: “мене цікавить лише певна поведінка”.
Ця поведінка описується через набір сигнатур методів (method signatures), які тип має реалізувати, щоб відповідати інтерфейсу
Коли ми створюємо змінну інтерфейсного типу або передаємо значення у функцію, що приймає інтерфейс, Go перевіряє, чи тип реалізує всі методи цього інтерфейсу, і створює зв’язок між ними.
І потім ми, використовуючи цей інтерфейс, можемо викликати пов’язані з ним методи.
Тобто інтерфейс – це посередник, який дозволяє викликати метод незалежно від конкретного типу.
Наприклад:
package main
import "fmt"
// define an interface 'MyInterface' with a single method 'MyMethod' returning a string
type MyInterface interface {
MyMethod() string
}
// define 'MyStruct' struct with a 'MyField' field
type MyStruct struct {
MyField string
}
// define 'MyMethod' method for the 'MyStruct' struct
// this makes 'MyStruct' implicitly implement 'MyInterface'
// 'MyMethod' method uses 'MyStruct' as the receiver, so this method is tied to the 'MyStruct' type
func (receiver MyStruct) MyMethod() string {
return "Executing " + receiver.MyField
}
// define a function 'sayHello()' which accepts any type that implements 'MyInterface'
// and prints the value returned by its 'MyMethod'
func sayHello(g MyInterface) {
fmt.Println(g.MyMethod())
}
func main() {
// create an instance of MyStruct
myObj := MyStruct{MyField: "Hello, Interface!"}
// pass the MyStruct instance to the function.
// this works because MyStruct implements MyInterface.
sayHello(myObj)
}
Тут ми:
оголошуємо власний інтерфейсний тип з іменем MyInterface
цей інтерфейс описує одну сигнатуру методу – MyMethod(), і цей метод має повертати дані з типом string
створюємо власний тип даних MyStruct з типом struct, в якому є одне поле MyField з типом string
до цієї структури “прив’язуємо” функцію MyMethod() – через вказання ресивера (receiver MyStruct), завдяки чому MyStructреалізуєінтерфейсMyInterface
описуємо нашу “основному робочу” функцію sayHello(), яка аргументом приймає інтерфейс і викликає метод MyMethod(), який є в цьому інтерфейсі
створюємо інстанс нашого типу даних MyStruct, якому в поле MyField записуємо значення “Hello, Interface!“
і викликаємо нашу робочу функцію, передаючи аргументом цю структуру
Постарався відобразити зв’язки між всіма об’єктами, бо вони дуже не явні, вийшло щось таке:
створюємо об’єкт myObj з типом MyStruct
викликаємо sayHello(), передаючи аргументом myObj, який всередині функції sayHello() стає змінною g, яка пов’язується з нашим інтерфейсом MyInterface, який надає доступ до методу MyMethod()
в функції sayHello() через виклик g.MyMethod() ми звертаємось до інтерфейсу MyInterface, кажучи “мені потрібен твій метод MyMethod()“
інтерфейс MyInterface “бачить”, що всередині нього зараз схований об’єкт myObj (типу MyStruct), тому він перенаправляє цей виклик саме до методу цієї конкретної структури
Окей – тепер картина стає більш зрозумілою.
Окрім одного моменту – як саме інтерфейс “бачить”, що “в ньому” є об’єкт myObj з методом MyMethod()?
The interface’s “magic”: type iface struct
Для того, аби розібратись з цим – трохи зануримось в магію вказівників (pointers), а саме – створимо власну структуру, яка буде копіювати те, як в type MyInterface interface структуровані дані.
А потім через вказівники – подивимось на адреси і зміст даних.
“Трохи” перепишемо наш код:
package main
import (
"fmt"
"unsafe"
)
// define MyInterface interface
// (same as before)
type MyInterface interface {
MyMethod() string
}
// define MyStruct struct
// (same as before)
type MyStruct struct {
MyField string
}
// define MyMethod method with a POINTER receiver
// - before was func (p MyStruct) MyMethod() ... - by value
// - now is func (p *MyStruct) MyMethod() ... - by pointer
// This means the method operates on the original data.
func (p *MyStruct) MyMethod() string {
return "Executing " + p.MyField
}
// This is the helper struct to inspect an interface
// It represents the internal memory layout of an interface variable
// the 'tab' has a table with information about the interface's type and methods
//
// type iface struct {
// // pointer to the 'itab' struct, see below
// tab *itab
// // here will be a pointer to the 'myObj' struct
// data unsafe.Pointer
// }
//
// type itab struct {
// // pointer to the 'type MyInterface interface'
// inter *interfacetype
// // pointer to the 'type MyStruct struct'
// typ *rtype
// // in our case we have 1 method, thus '[N]uintptr' == [1]uintptr
// // and in the 'fun[0]' will be the address of the method 'MyMethod'
// fun [N]uintptr // will have '[1]uintptr', and
// }
type ifaceStruct struct {
// Pointer to type/method info table
tab unsafe.Pointer
// Pointer to the actual data
// in our case, here will be a pointer to the 'myObj' struct
data unsafe.Pointer
}
// HERE IS THE "MAGIC"
// We modify sayHello to inspect the `g` it receives.
//
// 'g' is a new, local variable of the 'MyInterface' type.
// When the function is called, `myObj` is assigned to `g`.
//
// Because 'g' is an interface, it internally consists of two pointers:
// 1. tab: A pointer to the "interface table" (itab) that links
// the interface type (MyInterface) to the concrete type (*MyStruct)
// and stores pointers to the methods that satisfy the interface
// 2. data: A pointer to the actual data. In our case, this will be
// the pointer we passed in (`myObj`).
func sayHello(g MyInterface) {
fmt.Println("Inside sayHello()")
// Get the address of `g` and cast it to our helper struct 'ifaceStruct'
// This line does three things in one go:
// 1. &g - takes the memory address of our interface variable `g`
// 2. unsafe.Pointer(&g) - casts that address to a raw, untyped pointer
// 3. (*ifaceStruct)(...) - re-interprets that raw pointer as a pointer to our helper struct
// As a result, `g_internal` is now a `*ifaceStruct` that points to
// the exact same memory location as `g`, letting us access its .tab and .data fields.
g_internal := (*ifaceStruct)(unsafe.Pointer(&g))
fmt.Printf("Internal 'Type' pointer (tab): %p\n", g_internal.tab)
fmt.Printf("Internal 'Data' pointer (data): %p\n", g_internal.data)
fmt.Println("Result:", g.MyMethod())
}
func main() {
// Create the object and get a pointer to it
// 'myObj' now holds a pointer to a MyStruct instance in memory
myObj := &MyStruct{MyField: "Hello, Interface!"}
// Print location of the 'myObj' struct
fmt.Println("Inside main()")
fmt.Printf("Address of the original 'myObj' in main(): %p\n", myObj)
// Pass the pointer to the function
// i.e. we pass an address of the 'myObj' struct location
sayHello(myObj)
}
Запускаємо:
$ go run interface-details.go
Inside main()
Address of the original 'myObj' in main(): 0xc000014070
Inside sayHello()
Internal 'Type' pointer (tab): 0x4e5a28
Internal 'Data' pointer (data): 0xc000014070
Result: Executing Hello, Interface!
В коментах розписав все детально, але по суті коротко ми:
у виклику sayHello(myObj) до функції sayHello() передаємо адресу “0xc000014070” – посилання на структуру MyStruct з полем MyField, в яке записане значення “Hello, Interface!“
функція sayHello() приймає аргумент типу інтерфейс, і змінна g містить два вказівники – tab (на структуру itab, яка зберігає інформацію про тип і методи), та data (на значення типу MyStruct)
А сама цікава магія відбувається під час компіляції програми і створення структури itab:
Go перевіряє методи в коді, знаходить структуру MyStruct з методом MyMethod()
перевіряє інтерфейси, і знаходить MyInterface, який вимагає метод MyMethod() string
перевіряє, що MyStruct.MyMethod() та MyInterface.MyMethod() збігаються
створює таблицю інтерфейсу (itab – interface table), яка пов’язує MyStruct з MyInterface і зберігає адреси методів, що реалізують інтерфейс
І далі під час виконання програми під час виклику sayHello(myObj) Go створює нову змінну g типу iface, у якій ці два вказівники (tab та data) поєднуються:
вказівник на itab (яку компілятор створив для пари MyStruct + MyInterface) буде поміщено в g.tab
вказівник на myObj (тобто адреса типу “0xc000014070“) буде поміщено в g.data
В результаті в g.tab у нас буде структура itab – в полі fun[0] якої буде адреса функції MyMethod(), а в g.data – буде вказівник на екземпляр MyStruct з полем MyField.
І тоді при виклику:
...
fmt.Println("\nResult:", g.MyMethod())
...
Ми запускаємо:
...
return "Executing " + *MyStruct.MyField
...
Наостанок – можна ще вивести і саму itab, аналогічно тому, як зробили для самого інтерфейсу, через створення власної структури type itabStruct struct:
package main
import (
"fmt"
"unsafe"
)
// define MyInterface interface
type MyInterface interface {
MyMethod() string
}
// define MyStruct struct
type MyStruct struct {
MyField string
}
// define MyMethod method with a POINTER receiver
func (p *MyStruct) MyMethod() string {
return "Executing " + p.MyField
}
// This helper represents the interface value itself (the 2-word struct)
type ifaceStruct struct {
// Pointer to the 'itab' (interface table)
tab unsafe.Pointer
// Pointer to the actual data (our *MyStruct)
data unsafe.Pointer
}
// NEW
// This helper represents the internal 'runtime.itab' struct
type itabStruct struct {
// inter: Pointer to the interface type's definition (MyInterface)
inter unsafe.Pointer
// typ: Pointer to the concrete type's definition (*MyStruct)
typ unsafe.Pointer
// hash: Hash of the concrete type, used for lookups
hash uint32
// _ [4]byte: Padding (on 64-bit systems)
_ [4]byte
// fun: The method dispatch table - an array of function pointers
// Each entry corresponds to a method defined in the interface
// Here we have one entry: the address of MyStruct.MyMethod()
fun [1]uintptr
}
// HERE IS THE "MAGIC"
func sayHello(g MyInterface) {
fmt.Println("--- Inside sayHello() ---")
// 1. Get the address of `g` and cast it to our helper struct
g_internal := (*ifaceStruct)(unsafe.Pointer(&g))
// Print the two main pointers
fmt.Printf("g.tab (pointer to itab): %p\n", g_internal.tab)
fmt.Printf("g.data (pointer to myObj): %p\n", g_internal.data)
// NEW - 2. DE-REFERENCE THE 'tab' POINTER
// Cast the 'tab' pointer to our itabStruct pointer
itab_ptr := (*itabStruct)(g_internal.tab)
// NEW - 3. PRINT THE CONTENTS OF THE 'itab'
fmt.Println("\n--- Inspecting the 'itab' (at address g.tab) ---")
fmt.Printf("itab.inter (ptr to MyInterface info): %p\n", itab_ptr.inter)
fmt.Printf("itab.typ (ptr to *MyStruct info): %p\n", itab_ptr.typ)
fmt.Printf("itab.hash (hash of *MyStruct type): %x\n", itab_ptr.hash)
// This is the final link!
// This is the actual memory address of the function to be called, i.e. the 'g.MyMethod()' in this case
fmt.Printf("itab.fun[0] (ADDRESS OF THE METHOD): 0x%x\n", itab_ptr.fun[0])
// 4. Call the method as usual
fmt.Println("\nResult:", g.MyMethod())
}
func main() {
// Create the object and get a pointer to it
myObj := &MyStruct{MyField: "Hello, Interface!"}
fmt.Println("--- Inside main() ---")
fmt.Printf("Address of original 'myObj' in main(): %p\n", myObj)
// Pass the pointer to the function - Go will create an 'iface' value
// linking the interface 'MyInterface' with the concrete type *MyStruct.
sayHello(myObj)
}
Результат:
$ go run interface-details-3.go
--- Inside main() ---
Address of original 'myObj' in main(): 0xc00019a020
--- Inside sayHello() ---
g.tab (pointer to itab): 0x4e6c08
g.data (pointer to myObj): 0xc00019a020
--- Inspecting the 'itab' (at address g.tab) ---
itab.inter (ptr to MyInterface info): 0x4a9d80
itab.typ (ptr to *MyStruct info): 0x4a86e0
itab.hash (hash of *MyStruct type): 1ac3179f
itab.fun[0] (ADDRESS OF THE METHOD): 0x499c40
Result: Executing Hello, Interface!
Тобто, коли ми викликаємо g.MyMethod(), Go бере адресу функції з itab.fun[0] і викликає її, передаючи їй як аргумент вказівник з g.data – от і вся “магія” динамічного виклику методів через інтерфейс.
Ну і тепер можна використовувати інтерфейси, вже маючи уявлення про те, як саме вони працюють.
Прилетів мені один з дефолтних алертів VictoriaMetrics, які створюються під час деплою Helm-чарту victoria-metrics-k8s-stack:
Думав написати коротенький пост типу “що таке Churn Rate і як його пофіксати”, але в результаті вийшло доволі глибоко зануритись в те, як взагалі VictoriaMetrics працює з даними – і це виявилось дуже цікавою темою.
Давайте спочатку коротко розберемо що таке “метрика” і тайм-серія взагалі, і потім подивимось як вони впливають на ресурси системи – CPU, пам’ять та диск.
Metric vs Time Series vs Sample
Всі ми маємо справу з метриками в моніторингу – будь то Prometheus, чи VictoriaMetrics, чи InfluxDB, і ці метрики ми потім використовуємо в наших дашбордах Grafana або в алерт-рулах VMAlert.
Але що таке власне “метрика”? А що таке тайм-серія, sample чи data point? І як кількість різних значень однієї label для метрики впливає на використання диску та пам’яті?
Бо, наприклад, я в постах зазвичай просто використовую слово “метрика”, бо в 99% цього достатньо, аби описати об’єкт, про який йде мова.
Але для повноцінної роботи з системами моніторингу треба добре уявляти різницю між цими поняттями.
Що таке Metric?
Метрика (Metric): що вимірюється
Наприклад – cpu_usage, memory_free, http_requests_total, database_connections.
В документації VictoriaMetrics є дуже точний вираз – це як імена змінних, через які ми передаємо дані, див. Structure of a metric.
Метрика має власне ім’я, та опціонально набір labels (лейбл або тегів), які дозволяють додати більше контексту для конкретного вимірювання – але без значень цих лейбл.
Крім того, лейбли впливають на те, як дані по ції метриці будуть зберігатись і шукатись.
Тобто метрика – це “схема”, яка описує що ми вимірюємо, та за якими ознаками (лейблами) можемо групувати дані.
Приклад:
Metric: "cpu_usage{server, core}"
Тут:
ім’я метрики: cpu_usage
ім’я label: server
ім’я label: core
Що таке Time Series?
Таймсерія (Time Series): послідовність даних
Це повна послідовність записів, які згруповані для конкретної метрики та її labels зі значеннями – тобто набору metric_name{label_name="label_value"}, і які впорядковані за часом.
для таймсерії cpu_usage{server="web01", core="0"} маємо чотири семпла:
1753857852, 75.5
1753857912, 76.2
1753857972, 74.8
1753858032, 73.1
І дані за весь період спостережень по кожній унікальній комбінації cpu_usage{server="some_server", core="some_core"} будуть формувати одну і ту ж таймсерію, навіть якщо ці дані збираються роками – допоки не зміниться значення або в server, або в core.
High Cardinality vs High Churn rate
Обидві проблеми мають однакове “походження”, але трохи відрізняються по суті.
High cardinality – це “persistent проблема”, яка впливає на зберігання, індексацію та пошук даних.
Вона виникає, коли у нас є багато унікальних комбінацій лейблів, навіть якщо значення самих метрик надходять рідко або перестають надходити.
Це призводить до великої кількості живих та неактивних серій, що збільшує розмір IndexDB, використання памʼяті та час пошуку. Про IndexDB детальніше будемо говорити далі.
High churn rate – це “online проблема”, коли у нас постійно створюються нові тайм-серії через зміну значень лейблів, особливо короткоживучих або динамічних (як у Kubernetes – pod_name, container_id, job_id, або щось типу client_ip).
Це створює великий потік нових записів у IndexDB, завантажуючи CPU, пам’ять, та диск.
“Життя метрики”
Є дуже класне відео, яке побачив багато років тому – The Inner Life of the Cell, чомусь воно тут згадалось.
Аби зрозуміти як кількість лейбл (точніше – значення в них) впливають на розмір даних в системі і на використання CPU та пам’яті – давайте подивимось як у VictoriaMetrics взагалі відбувається весь процес “під капотом”.
Там 7 частин, і для дійсно “глибокого занурення” у внутрішню архітектуру VictoriaMetrics дуже рекомендую їх прочитати.
Але зараз ми відносно швидко пройдемося по процесу додавання нових даних і їхньому пошуку, і більше сконцентруємось саме на питанні Churn Rate.
“Write-path”: vminsert та vmstorage
Отже – почнемо з початку: vmagent збирає метрики з експортерів, і далі ці дані через vminsert треба записати до vmstorage.
У випадку vmsingle у на всі компоненти працюють в одному процесі, але для кращої картини – давайте їх розділяти.
vminsert збирає дані до себе в пам’ять, після чого відправляє до vmstorage блоками до 100 мегабайт.
На початку кожного блоку від vminsert задається загальний розмір блоку, після чого vmstorage починає зчитувати дані в ньому блоками по 24+n байт, строкам (row):
в перших 8 байтах вказується розмір n – розмір наступного сектору, який містить в собі ім’я метрики та її лейбли
другий сектор – ці n байт з іменем метрики і лейблами
третій сектор розміром 8 байт містить в собі значення семпла (“75.5” з прикладів вище)
четвертий містить Timestamp, ще 8 байт
В результаті формується row із 8*3 байт (24) + n байт, де n – це довжина імені метрики і її лейбл.
vmstorage формує власні блоки, в кожному максимум 10,000 строк:
vmstorage, IndexDB та TSID
Після чого починає сама цікава магія – це Time Series ID, або TSID.
Для кожної унікальної комбінації метрика+лейбли+значення лейбл VictoriaMetrics має власний унікальний ідентифікатор, який використовується для збереження даних та при подальшому пошуку даних.
Сам TSID це ідентифікатор (див type TSID struct), суто внутрішній механізм самої VictoriaMetrisc, який ми, нажаль, ніде побачити не можемо:
// TSID is unique id for a time series.
//
// Time series blocks are sorted by TSID.
type TSID struct {
MetricGroupID uint64
JobID uint32
InstanceID uint32
// MetricID is the unique id of the metric (time series).
//
// All the other TSID fields may be obtained by MetricID.
MetricID uint64
}
Маючи набір з імені метрики та її тегів (лейбл), vmstorage спершу перевіряє свій TSID Cache. Якщо для ції комбінації ми вже маємо згенерований TSID – використовуємо його.
Якщо в кеші даних нема (значення vm_slow_row_inserts_total росте) – vmstorage звертається до IndexDB, і починає пошук TSID там.
Якщо в IndexDB знайдений TSID – він додається в кеш vmstorage, і процес йде далі:
Якщо ж це абсолютно нові імена метрики і лейбл з їхніми значеннями – генерується новий TSID, який реєструється в кеші vmstorage.
IndexDB зберігає два індекси, в кожному кілька мапінгів між полями та ID, описано в частині How IndexDB is Structured:
1 – Tag to metric IDs (Global index): кожен тег (лейбла) мапиться на ім’я метрики (її ID)
2 – Metric ID to TSID (Global index): ID кожної метрики мапиться на TSID
3 – Metric ID to metric name (Global index): мапінг власне імені метрики на її ID
5 – Date to metric ID (Per-day index): мапінг дат на metric ID для швидкого пошуку по датам (“чи є за цей день дані по цій метриці”)
6 – Date with tag to metric IDs (Per-day index): аналогічний до першого Tag to metric IDs мапінгу, але по датам
7 – Date with metric name to TSID (Per-day index): схожий на другого Metric ID to TSID мапінгу, але по іменам метрик і датам
Ці індекси тримаються як в пам’яті, і періодично записуються на диск (flush) в persistant storage IndexDB в каталог indexdb/, де – як і в каталозі data/, в якому зберігають самі тайм-серії – виконується merge даних для оптимізації зберігання та пошуку.
І повертаючись до питання Churn Rate та High cardinality – кожна окрема метрика+лейбли створюють окремі TSID, для кожної лейбли створюються мапінги в індексах, при великій кількості нових даних, які постійно записуються з пам’яті в диск – частіше викликаються дискові операції – маємо навантаження на CPU, пам’ять, I/O операції диска.
vmstorage та збереження даних на диску
В принципі, саме цікаве ми вже побачили – ролі IndexDB та TSID, але давайте пройдемось по решті процесу.
З отриманих від vminsert даних прочитали дані, сформували власні block з rows.
В кожній row vmstorage зберігає вже не ім’я метрики – а її TSID, а для кожного TSID містить записи з values та часом (власне, тайм-серії):
Тут в small “скидаються” дані з in-memory parts, і small потім merge в big parts.
Кожен part містить в собі власний індекс, який відповідає за мапінг даних на timestamps та values:
“Read-path”: пошук даних з vmselect та vmstorage
Коли ж ми робимо пошук по даним – то vmselect передає до vmstorage запит з метрикою, лейблами (тегами) та датою, за яку треба виконати пошук.
vmstorage в IndexDB по tag to metric IDs знаходить відповідні MetricIDs – для всіх метрик, які має цей тег.
Далі по Metric ID IndexDB в записах metric ID to TSID знаходить відповідні TSID, які повертає до vmstorage.
Маючи TSID – vmtorage вже перевіряє in-memory, small та big parts, шукаючи потрібний TSID в файлах metaindex.bin.
А знайшовши потрібний metadata.bin – він читає відповідний index.bin, який вже каже в яких строках timestamp.bin та values.bin знайти потрібні дані, які потім повертаються до vmselect.
Практичний приклад: запис 10,000 метрик і 10,000 labels
Це все цікаво почитати в теорії – але давайте трохи практики, бо завжди ж цікаво подивитись як воно виглядає в реальності.
Що будемо робити:
запустимо два контейнери з VictoriaMetrics
в кожен через API запишемо 10,000 метрик, але:
в один інстанс для всіх метрик лейбла буде мати однакове значення
в другий інстанс значення label буде постійно змінюватись
А потім глянемо як це вплинуло на розмір даних.
Створюємо директорії:
$ mkdir vm-data-light
$ mkdir vm-data-heavy
Запускаємо два контейнери – vm-light та vm-heavy, кожному підключаємо відповідний каталог – ./vm-data-light та ./vm-data-heavy, кожен слухає власний TCP-порт:
$ du -sh vm-data-light/
76K vm-data-light/
$ du -sh vm-data-heavy/
76K vm-data-heavy/
І кількість файлів в них:
$ find vm-data-light/ -type f | wc -l
5
$ find vm-data-heavy/ -type f | wc -l
5
Всюди все однаково.
Тепер пишемо два скрипти – теж “light” та “heavy”.
Спочатку “light” версія:
#!/usr/bin/env bash
for i in $(seq 1 10000); do
echo "my_metric{label=\"value-1\"} $i" | curl -s \
--data-binary @- \
http://localhost:8428/api/v1/import/prometheus
done
echo "DONE: stable series sent"
Тут в циклі від 1 до 10000 виконуємо запис метрики my_metric{label="value-1"}, але з кожним разом просто збільшуємо значення, яке зберігаємо.
Другий скрипт – “heavy” версія:
#!/usr/bin/env bash
for i in $(seq 1 10000); do
echo "my_metric{label=\"value-$i\"} $i" | curl -s \
--data-binary @- \
http://localhost:8429/api/v1/import/prometheus
done
echo "DONE: high churn series sent"
Він аналогічний, але тут значення змінної $i використовуємо ще і для зміни значення в label – my_metric{label="value-$i"} $i.
Запускаємо тести:
$ bash light.sh
$ bash heavy.sh
І порівнюємо дані.
Розмір даних в data/:
$ du -sh vm-data-light/data/
152K vm-data-light/data/
$ du -sh vm-data-heavy/data/
372K vm-data-heavy/data/
Розмір даних в indexdb/:
$ du -sh vm-data-light/indexdb/
56K vm-data-light/indexdb/
$ du -sh vm-data-heavy/indexdb/
764K vm-data-heavy/indexdb/
Кількість файлів в data/:
$ find vm-data-light/data/ -type f | wc -l
26
$ find vm-data-heavy/data/ -type f | wc -l
26
Кількість файлів в indexdb/:
$ find vm-data-light/indexdb/ -type f | wc -l
8
$ find vm-data-heavy/indexdb/ -type f | wc -l
53
8 vs 53!
Дерево каталогів і файлів в vm-data-light/data/ і vm-data-heavy/data/ буде однаковим, але давайте глянемо на IndexDB.
Там познайомились з InfluxDB в цілому, тепер час будувати з ним реальні рішення.
Що будемо робити – запустимо InfluxDB на Debian, налаштуємо NGINX, імпортуємо дані з Google Sheets в .csv, а потім мігруємо їх до InfluxDB та підключимо Grafana. І додатково трохи пограємось з Python Falsk для створення веб-форми.
Мій “self-monitoring” проект
Власне, для чого я все це роблю: я веду такий собі “self human monitoring” – кожного дня записую в Google Sheets різні показники – як добре спав, який був настрій, наскільки добре голова працювала і багато іншого, загалом там 23 метрики.
Далі це все прямо в Google Sheets виводиться в графіки, де я в будь-який момент можу глянути в який період яке в мене було самопочуття.
Система дуже класна, веду її вже два з половиною роки і активно користуюсь, але є проблема – це візуалізація даних, бо дефолтні графіки в сами гуглотаблицях дуже обмежені.
Минулого року для візуалізації підключав Google Looker Studio, який нативно вміє інтеграцію з Google Sheets – але з ним постійно виникали якісь проблеми, особливо якщо змінювався формат в таблиці типу перейменування колонок, тому згодом я Looker Studio закинув.
І врешті-решт прийшла ідея того, що, камон! Девопс я, ілі тварь дрожащая?
Чому б не використати мої знання в моніторингу інфраструктури в цій справі теж?
Тому вирішив побудувати власний стек моніторингу, де дані будуть зберігатись в InfluxDB.
Взагалі, InfluxDB вибрав, бо трохи погрався і сподобалось як там все з коробки є, але коли почав вже робити дашборди – то поняв, що вона все ж доволі обмежена, і мені не вистачає Grafana.
Тому поки що InfluxDB залишиться як база, а до неї додамо Grafana.
А вже пізніше, мабуть, все ж мігрую дані до VictoriaMetrics.
Втім, цей пост, звісно, не про цей селф-мониторинг, а просто непоганий приклад того, як запустити Influx з NGINX і Grafana, як імпортувати дані, і як створити веб-сторінку з Flask для додавання нових метрик в InfluxDB.
Поточні дані в Google Sheets
На прикладі таблиці Sleep:
Тут Sleep_rate – суб’єктивна оцінка якості сну, Sleepy_day – наскільки сильна була сонливість цього дня, Wake_ups – скільки раз за ніч прокидався, і Mults – наскільки яскраві і насичені були сни, бо іноді вони бувають дійсно “мультфільмами” – наче всю ніч в кінотеатрі просидів 🙂
План дій
Робитись все буде на тому самому сервері з Debian, де зараз хоститься сам блог RTFM.
Що будемо робити:
запустимо InfluxDB в Docker
налаштуємо vitrtualhost в NGINX
імпортуємо існуючі дані з Google Sheets в InfluxDB
подивимось, які дашборди можемо зробити в InfluxDB
додамо форму для введення нових даних
додамо Grafana для повноцінної візуалізації
Окремо треба буде зробити бекап і підтюнити InfluxDB та Grafana, бо сервер маленький, лише 2 гігабайти пам’яті, але це вже іншим разом.
Поїхали.
Запуск InfluxDB з Docker Compose
Простіше всього зробити з docker-compose, аби потім легше було переносити на інший сервер.
Встановлення Docker та Docker Compose на Debian
Встановлюємо Docker та Docker Compose, документація тут>>>:
INFLUXD_REPORTING_DISABLED: телеметрія в InfluxData (О.о)
INFLUXD_TASKS_ENABLED: користуватись поки не планую
INFLUXD_FLUX_LOG_ENABLED: детальні логи Flux queries, поки логи нехай будуть, але потім можна буде відключити
INFLUXD_QUERY_MEMORY_BYTES: можна задати ліміт по пам’яті на кожен запит, але з моїм об’ємом даних – не варте
INFLUXD_UI_DISABLED: можна відключити веб-інтерфейс і працювати тільки з API, поки нехай буде, як повністю на Grafana переключусь – можна буде відключити
Для даних буду робити каталог в /data, там в мене зараз живуть сайти, це окремий Digtical Ocean volume, який автоматом бекапиться самим Digtical Ocean:
...
Press Enter to Continue
Successfully received certificate.
Certificate is saved at: /etc/letsencrypt/live/monitoring.example.org.ua/fullchain.pem
Key is saved at: /etc/letsencrypt/live/monitoring.example.org.ua/privkey.pem
This certificate expires on 2026-01-24.
Додавання NGINX virtualhost
В файлі /etc/nginx/conf.d/monitoring.example.org.ua.conf описуємо новий server і location:
root@setevoy-do-2023-09-02:/opt/influx# nginx -t
nginx: the configuration file /etc/nginx/nginx.conf syntax is ok
nginx: configuration file /etc/nginx/nginx.conf test is successful
Треба імпортувати вже існуючі дані з Google Sheets в InfluxDB і згенерувати метрики. Благо в мене з попередніх років в Таблицях все структуровано, InfluxDB вміє приймати .csv, тому тут проблем (майже) не було.
Завантажуємо таблицю собі на машину в .csv:
Отримуємо такий документ:
$ head 2025-Daily-Sleep-self.csv
Date,Sleep_rate_my_day,Sleepy_day,Wake_ups,Mults
2025-01-01,7,1,,
2025-01-02,7,1,,
2025-01-03,7,2,,
2025-01-04,5,3,,
Таблиць в мене кілька:
Для кожної зробимо окрему метрику, а в тегах використаємо імена колонок:
Найпростіший спосіб завантажити csv – через UI:
Але в даному випадку він не спрацює, бо не той формат дати – в мене 2025-01-09, а InfluxDB хоче повний rfc3339, тобто 2025-01-09T00:00:00Z.
Згадуємо, що колись вміли в awk Йдемо до ChatGPT, отримуємо команду для форматування дати:
Додаємо собі $PATH:/usr/libexec/docker/cli-plugins/:/opt/influx, налаштовуємо підключення:
root@setevoy-do-2023-09-02:/opt/influx# influx config create --config-name local --host-url http://localhost:8086 --org setevoy --token $INFLUX_TOKEN --active
Active Name URL Org
* local http://localhost:8086 setevoy
І завантажуємо дані – додаємо --header, бо формат InfluxDB вимагає цих анотацій, див. Extended annotated CSV:
root@setevoy-do-2023-09-02:/data/influx/import# influx write --bucket self-monitoring-1 --file 2025-Daily-Sleep-self-rfc3339.csv --format csv --header "#constant measurement,sleep_daily" --header "#datatype dateTime:RFC3339,double,double,double,double"
2025/10/26 11:32:24 line 303: no field data found
2025/10/26 11:32:24 line 304: no field data found
2025/10/26 11:32:24 Unable to batcher to error-file: invalid argument
2025/10/26 11:32:24 line 305: no field data found
2025/10/26 11:32:24 Unable to batcher to error-file: invalid argument
2025/10/26 11:32:24 line 306: no field data found
...
Таблиці за 2023 і 204 в мене окремими документами, аналогічно додаємо їх – і тепер маємо всі дані в одному місці:
Всі дані за 2.5 роки на одній дашборді.
Офігєть.
Веб-форма з Flask для внесення даних
Наступна задача – додати можливість вносити нові дані.
Перший варіант – продовжити писати в Google Sheets, на сервері скриптом отримувати їх, фіксити дату і пушити в базу, а скрипт запускати по крону.
Плюси – звична схема, і є “бекап” у вигляді гугл-таблиць.
Мінуси – буде проблема з тим, як в скрипті перевіряти які дані в базі вже є, аби не дублювати старі записи, і нові дані з Google Sheets в базі з’являться не відразу, а коли відпрацює крон.
Другий варіант – повністю нова схема: написати простеньку веб-сторінку, яка через InfluxDB клієнт буде записувати нові дані.
Мінуси – доведеться налаштовувати додатковий location в NGINX і запускати якийсь сервіс, який це скрипт буде оброблювати.
Врешті-решт все ж зупинився на другому варіанті.
Як це буде працювати:
gunicorn для запуску Flask app
index.html шаблон
metrics.json з описом метрик і їхніх тегів
app.py, який отримує дані з форми вводу в HTML і виконує операції в InfluxDB
Шаблон для метрик
Аби спростити життя далі – щоб простіше було додавати нові метрики – створимо JSON, який буде використовуватись в app.py аби формувати список метрик і їхніх тегів.
import os
import json
from datetime import date, datetime, time, timezone
from flask import Flask, render_template, request, jsonify
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
app = Flask(__name__)
# === InfluxDB config ===
INFLUX_URL = "http://localhost:8086"
INFLUX_TOKEN = "tOx***iuw=="
INFLUX_ORG = "setevoy"
# default bucket, if user doesn't choose one from the html form
DEFAULT_BUCKET = "self-monitoring-1"
# load metrics from the 'metrics.json'
METRICS_FILE = os.path.join(os.path.dirname(__file__), "metrics.json")
with open(METRICS_FILE, "r") as f:
METRICS = json.load(f)
@app.get("/set")
@app.get("/set/")
def index():
"""Render HTML form with today's date pre-filled"""
return render_template(
"index.html",
metrics=METRICS,
today_date=date.today().isoformat()
)
@app.post("/set/submit")
def submit():
"""Handle form submission and write data to InfluxDB"""
form = request.form
# --- 1) Date from form or today ---
date_str = form.get("date")
if date_str:
try:
selected_date = datetime.fromisoformat(date_str).date()
except ValueError:
return jsonify({"ok": False, "error": "Bad date format, expected YYYY-MM-DD"}), 400
else:
selected_date = date.today()
# --- 2) Fixed time: 03:00 UTC ---
ts = datetime.combine(selected_date, time(3, 0, 0), tzinfo=timezone.utc)
wrote, errors = [], []
# --- 3) Get bucket from form or use default ---
bucket = form.get("bucket", DEFAULT_BUCKET)
try:
with InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG) as client:
write_api = client.write_api(write_options=SYNCHRONOUS)
for measurement, fields in METRICS.items():
for field in fields:
raw = form.get(field)
if raw is None or raw == "":
continue
try:
val = float(raw)
except ValueError:
errors.append(f"{measurement}.{field}: not a number: {raw!r}")
continue
point = (
Point(measurement)
.field(field, val)
.time(ts, WritePrecision.NS)
)
# write to selected bucket
write_api.write(bucket=bucket, record=point)
wrote.append(f"{bucket}: {measurement}.{field}={val}")
except Exception as e:
return jsonify({"ok": False, "error": str(e)}), 500
html = f"""
<html>
<body style="font-family:Arial;margin:40px;">
<h3>Data successfully written</h3>
<p><b>Date:</b> {selected_date.isoformat()}</p>
<ul>
{''.join(f'<li>{w}</li>' for w in wrote)}
</ul>
<p><a href="/set"><button>Return to main page</button></a></p>
</body>
</html>
"""
return html
if __name__ == "__main__":
app.run(host="0.0.0.0", port=8080, debug=True)
В принципі, тут доволі простий скрипт:
@app.get("/set/"): роут, де буде наша форма, генерує сторінку з файлу index.html
@app.post("/set/submit") і функція submit(): де логіка виконання – є можливість задати дату, вибрати корзину в InfluxDB, в яку будемо писати, бере список метрик і тегів з metrics.json, і через InfluxDBClient вносить дані в InfluxDB
в кінці виводиться ще одна форма з інформацією про те, що саме було записано, і малює кнопку “повернутись назад”
Файл templates/index.html:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Self Monitoring</title>
<style>
body { font-family: Arial; margin: 40px; }
.metric-block { margin-bottom: 30px; }
label { display: inline-block; width: 180px; }
input { width: 80px; }
</style>
<script>
// set date input to yesterday in local time (YYYY-MM-DD)
function setYesterday() {
const d = new Date();
d.setDate(d.getDate() - 1);
const y = d.getFullYear();
const m = String(d.getMonth() + 1).padStart(2, '0');
const day = String(d.getDate()).padStart(2, '0');
document.getElementById('date').value = `${y}-${m}-${day}`;
}
</script>
</head>
<body>
<h2>Self Monitoring</h2>
<form action="/set/submit" method="post">
<!-- Bucket selector -->
<div style="margin-bottom:16px;">
<label for="bucket">Bucket:</label>
<select id="bucket" name="bucket" required>
<option value="self-monitoring-1">self-monitoring-1</option>
<option value="self-monitoring-test">self-monitoring-test</option>
</select>
</div>
<!-- Date picker -->
<div style="margin-bottom:16px;">
<label for="date">Date:</label>
<input type="date" id="date" name="date" value="{{ today_date }}" required>
<button type="button" onclick="setYesterday()">Yesterday</button>
<small>UTC midnight will be used</small>
</div>
{% for measurement, fields in metrics.items() %}
<div class="metric-block">
<h3>{{ measurement }}</h3>
{% for field in fields %}
<div>
<label for="{{ field }}">{{ field }}:</label>
<input type="number" step="any" name="{{ field }}" id="{{ field }}">
</div>
{% endfor %}
</div>
{% endfor %}
<input type="submit" value="Submit">
</form>
</body>
</html>
Я трохи повозився з influx backup, але постійно ловив 401, не став заморачуватись, бо дані оновлюються рідко, тому просто навайбокодив простенький скрипт на bash:
#!/bin/bash
# backup InfluxDB data directory and upload to S3
# set vars
SRC_DIR="/opt/influx"
BACKUP_DIR="/backups/influx"
DATE=$(date +%Y-%m-%d)
ARCHIVE_NAME="${DATE}-influx.tar.gz"
ARCHIVE_PATH="${BACKUP_DIR}/${ARCHIVE_NAME}"
S3_BUCKET="s3://setevoy-influx-backups"
# create backup directory if not exists
mkdir -p "$BACKUP_DIR"
# create tar.gz archive
tar -czf "$ARCHIVE_PATH" -C "$SRC_DIR" .
# check that archive was created
if [ ! -f "$ARCHIVE_PATH" ]; then
echo "❌ Failed to create backup archive!"
exit 1
fi
# upload to S3
aws s3 cp "$ARCHIVE_PATH" "$S3_BUCKET/$ARCHIVE_NAME"
# check upload result
if [ $? -eq 0 ]; then
echo "✅ Uploaded to S3: $S3_BUCKET/$ARCHIVE_NAME"
# remove local archive after successful upload
rm -f "$ARCHIVE_PATH"
echo "🧹 Local archive removed: $ARCHIVE_PATH"
else
echo "⚠️ Upload to S3 failed, keeping local copy."
exit 1
fi
Запускаємо для перевірки:
root@setevoy-do-2023-09-02:~# chmod +x /opt/influx/backup_data.sh
root@setevoy-do-2023-09-02:~# /opt/influx/backup_data.sh
upload: ../backups/influx/2025-10-27-influx.tar.gz to s3://setevoy-influx-backups/2025-10-27-influx.tar.gz
✅ Uploaded to S3: s3://setevoy-influx-backups/2025-10-27-influx.tar.gz
🧹 Local archive removed: /backups/influx/2025-10-27-influx.tar.gz
Є в мене давня ідея self-monitoring, яку, сподіваюсь, я такі почну робити і про яку напишу окремо.
Але суть її така сама, як і в етіх ваших моніторингах – збирати метрики, і відображати графіки.
Почав під цю систему вибирати базу даних, і хоча там частота запису метрик невелика, 1 метрика на день, але хочу її робити у звичному мені time series форматі – як ми це робимо в VictoriaMetrics/Prometheus.
А в рамках написання іншого поста, про структуру TSDB та метрики (все ще в чернетках), я торкнувся InfluxDB, про яку згадав і цього разу.
Саму InfluxDB я трохи використовував ще років п’ять тому, але зовсім трохи – вона просто була одним з бекендів для Grafana, коли ми будували автоматичний load testing з JMeter в Kubernetes (колись до цього знов дійде, і напишу теж, бо там дуже класний сетап).
Але так, щоб самому використовувати InfluxDB – досвіду не було. І коли я зараз глянув на неї – то система прям дуже сподобалась, а тому для свого self-monitoring буду використовувати її.
Якщо дуже коротко – то для повноцінного моніторингу, для відносно великого проекту я все ж взяв би саме VictoriaMetrics, бо на великих об’ємах вона буде набагато краща в плані CPU/Memory.
Але для якогось pet project – InfluxDB можливо підійде краще за рахунок того, що в ній “з коробки” є можливість будувати дашборди з графіками, є власний alertmanager, є цікаві штуки для різних автоматизацій.
Втім, у InfluxDB є (відносний) недолік – це більш складна мова запитів, яких до того цілих дві – Flux та InfluxQL. Але можливості query builder для простого використання цілком достатньо.
InfluxDB overview
Власне InfluxDB – ще одна Time Series Database, як вже згадувані VictoriaMetrics або Prometheus.
Головна різниця – VictoriaMetrics та Prometheus працюють по pull-моделі (збирають дані з експортерів), а InfluxDB – це push-модель, коли експортери самі, власне, пушать дані в базу.
Різні і мови запитів – в VictoriaMetrics MetricsQL та PromQL в Prometheus маємо звичні нам функції типу rate() і sum by (), тоді як в InfluxDB це мова Flux (“functional data scripting language“), яка по суті являється повноцінною мовою програмування, та InfluxQL – яка більше схожа на SQL, але в InfluxDB v2 вмикається через костиль, і дефолтна мова саме Flux (але в InfluxDB v3 наче знову буде InfluxQL).
VictoriaMetrics/Prometheus – це частина CNCF-екосистеми і LGPT (Loki + Grafana + Prometheus + Tempo) або PLG (Prometheus + Loki + Grafana) стеків, а InfluxDB – це про TICK stack (Telegraf + InfluxDB + Chronograf + Kapacitor).
При цьому в InfluxDB v2 Chronograf та Kapacitor вже вбудовані в саму систему, окремо запускати не треба.
Ну і дані – VictoriaMetrics та Prometheus заточені під зберігання і роботу саме з “класичними” метриками, тоді як в InfluxDB можна збирати логи, дані з IoT девайсів, events, дані від Telegraf-плагінів тощо.
Крім того, InfluxDB наче краще підходить для довготривалого зберігання даних – і за рахунок самої моделі зберігання даних, і за рахунок вбудованих механізмів для data retention.
Ну і можливості візуалізації даних – якщо в VictoriaMetrics та Prometheus у нас “з коробки” є тільки базові графіки, бо це всеж більше бази даних, то в InfluxDB у нас є повноцінний інтерфейс, через який ми можемо робити всі потрібні налаштування і візуалізації
Запуск InfluxDB з Docker
Для “погратись” просто запустимо локально з Docker:
Але в v3 багато змін, вона не дуже сумісна з другою версією, а більшість гайдів будуть саме по другій, тому давайте працювати з нею.
Note: по ходу гуглінга знайшов цікавий матеріал – What InfluxDB Got Wrong, де як раз говориться про те, що команда InfluxData робить нові версії несумісні з попередніми, і це, звісно, не дуже гуд
Відкриваємо в браузері http://localhost:8086, налаштовуємо юзера, організацію, і дефолтний бакет (про бакети і інші концепти далі):
Відразу отримуємо пропозицію налаштування – “погратись”, advanced, або просто перейти в базу:
Клікаємо Quick start аби отримати якісь базові дані, де нам відразу автоматично налаштовується збір власних метрик InfluxDB і створюється дашборда:
Key concepts
Коротко пройдемось по основних поняттях.
Bucket: на відміну від VictoriaMetrics/Prometheus, в InfluxDB дані організовані в такі собі “корзини” або “бази даних”
Measurement: це по факту звичні нам з VictoriaMetrics/Prometheus метрики, і метрики (я їх буду назвати саме так, хоча, мабуть, це не дуже коректно з технічної точки зору) складаються з:
Tags: labels для метрик, індексуються для швидкого пошуку
Fields: поля зі значеннями, не індексуються
Timestamp: час додавання метрики
Point: конкретний запис (метрика + теги + значення + час), аналог Sample або data points в термінах VictoriaMetircs/Prometheus
Series: група записів (метрика + теги + значення), аналог Time Series в термінах VictoriaMetircs/Prometheus
Формат метрик відрізняється від VictoriaMetrics/Prometheus і записується в форматі line protocol.
Наприклад, у VictoriaMetircs запис може виглядати так:
Тут в InfluxDB метриці маємо власне ім’я метрики node_cpu_seconds_total, два теги зі значеннями – cpu=0,mode=user, поле value зі значенням, і timestamp.
Timestamp можна задавати в UNIX epoch, можна в ISO 8601, тобто 2025-10-25T12:00:00Z, але рекомендований і дефолтний формат – саме UNIX.
Доступ до InfluxDB
Тут маємо на вибір сам UI і Data Exporter, CLI-утиліту influx, та InfluxDB HTTP API для всякої автоматизації.
$ docker exec -ti influxdb influx --help
NAME:
influx - Influx Client
USAGE:
influx [command]
HINT: If you are looking for the InfluxQL shell from 1.x, run "influx v1 shell"
COMMANDS:
version Print the influx CLI version
write Write points to InfluxDB
bucket Bucket management commands
...
Дуже цікава фішка, аналог Jupyter Notebook – “жива” аналітика, експерименти із запитами, автоматизація запитів:
Дозволяє зберігати послідовності, які потім можна використати в InfluxDB Tasks.
Кожен Notebook розбитий на кілька cell, які можуть бути data source для отримання даних, visualization для графіків, і action – створити алерт або Task.
Насправді доволі потужний інструмент з купою плагінів, але для прикладу зберемо метрики CPU з хоста:
Зберігаємо:
І навіть отримуємо інструкції як запустити:
Прямо при запуску ми в Telegraf передаємо URL з конфігом – і він отримає саме ті налаштування, які ми робили на попередньому екрані, тобто нам взагалі не треба писати локальний telegraf.conf.
Прийшла задачка підняти для проекту цікавий сервіс Arize Phoenix для моніторингу і тюнингу використання LLM.
За сам сервіс багато не скажу, бо не користувався, але його запуск вийшов доволі цікавим.
Що будемо робити – спочатку з Helm запустимо тестовий варіант, подивитись як воно взагалі виглядає, потім зробимо повноцінну автоматизацію – Terraform для всяких сікретів, Helm для самого Phoenix.
Власне цей пост буде не стільки про сам Arize Phoenix, скільки просто приклад як з Terraform створити AWS Secrets, і як з Helm та External Secrets Operator ці сікрети отримати.
Тестовий запуск з Helm в Kubernetes
Phoenix підтримує різні варіанти запуску. нам цікавий Helm, документація тут – Kubernetes (helm).
Сам чарт є в Docker Hub (і далі це трохи вилізе боком), всі values є там жеж.
$ kk get all
NAME READY STATUS RESTARTS AGE
pod/phoenix-8677bcc44f-k8w2k 1/1 Running 1 (49s ago) 70s
pod/phoenix-postgresql-0 1/1 Running 0 70s
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/phoenix-postgresql ClusterIP 172.20.11.177 <none> 5432/TCP 70s
service/phoenix-svc NodePort 172.20.85.64 <none> 4317:31314/TCP,6006:31180/TCP,9090:31897/TCP 70s
NAME READY UP-TO-DATE AVAILABLE AGE
deployment.apps/phoenix 1/1 1 1 70s
NAME DESIRED CURRENT READY AGE
replicaset.apps/phoenix-8677bcc44f 1 1 1 70s
NAME READY AGE
statefulset.apps/phoenix-postgresql 1/1 71s
По дефолту використовує власний контейнер з PostgreSQL, для Production будемо робити в AWS RDS.
Відкриваємо доступ до порту для WebUI:
$ kk port-forward service/phoenix-svc 6006
Переходимо в браузері на http://localhost:6006, логінимось.
Документація по аутентифікація – тут>>>, і там є цікаві моменти. наприклад, змінити пошту для адміна (і для Member? тобто для звичайних юзерів? не пробував) не міжна:
Neither an Admin nor Member is permitted to change email addresses.
ОК, воно працює – давайте думати про продакшен сетап.
AWS та Terraform
Що нам треба буде з ресурсів в AWS:
запис Route 53 з доменом для доступу юзерів
TLS сертифікат в AWS Certificate Manager
AWS Secrets Manager:
пароль для доступу до Postgres
два паролі для самого Phoenix
пароль для SMTP – навіть якщо він не використовується
resource "aws_route53_record" "phoenix_dns" {
zone_id = data.aws_route53_zone.ops.zone_id
name = local.phoenix_domain_name
type = "CNAME"
ttl = 300
records = [
data.aws_lb.shared_alb.dns_name
]
}
Виконуємо terraform init та terraform plan, перевіряємо, що все ок:
...
Terraform will perform the following actions:
# aws_route53_record.phoenix_dns will be created
+ resource "aws_route53_record" "phoenix_dns" {
+ allow_overwrite = (known after apply)
+ fqdn = (known after apply)
+ id = (known after apply)
+ name = "phoenix.ops.example.co"
+ records = [
+ "k8s-ops133externalalb-***.us-east-1.elb.amazonaws.com",
]
+ ttl = 300
+ type = "CNAME"
+ zone_id = "Z02***OYY"
}
Plan: 1 to add, 0 to change, 0 to destroy.
Сертифікат в AWS ACM
Далі для Ingress та ALB нам потрібно створити сертифікат під цей DNS:
PHOENIX_DEFAULT_ADMIN_INITIAL_PASSWORD: пароль при сетапі
PHOENIX_ADMIN_SECRET: пароль після сетапу
чесно тут не дуже зрозумів, бо навіть якщо відразу створити і передати PHOENIX_ADMIN_SECRET – то перший логін все одно буде з PHOENIX_DEFAULT_ADMIN_INITIAL_PASSWORD
# auth.defaultAdminPassword or PHOENIX_DEFAULT_ADMIN_INITIAL_PASSWORD
# PHOENIX_ADMIN_SECRET
# PHOENIX_SECRET: A long string value that is used to sign JWTs for your deployment.
# PHOENIX_POSTGRES_PASSWORD
# PHOENIX_SMTP_PASSWORD
##############################################
### PHOENIX_DEFAULT_ADMIN_INITIAL_PASSWORD ###
##############################################
# generate a random password
ephemeral "random_password" "ops_phoenix_default_admin_initail_secret_random_password" {
length = 12
special = true
}
# create an AWS Secret resource
resource "aws_secretsmanager_secret" "ops_phoenix_default_admin_initial_secret" {
name = "/ops/phoenix/phoenix_default_admin_initial_secret"
description = "Default Phoenix admin username and password"
recovery_window_in_days = 0
}
# create an AWS Secret value
resource "aws_secretsmanager_secret_version" "ops_phoenix_default_admin_initial_secret_version" {
secret_id = aws_secretsmanager_secret.ops_phoenix_default_admin_initial_secret.id
secret_string_wo = ephemeral.random_password.ops_phoenix_default_admin_initail_secret_random_password.result
secret_string_wo_version = 1
}
Деплоїмо, перевіряємо Route 53, ACM та Secrets Manager:
Повторюємо для решти – вони всі більш-менш однакові, тільки в деяких просто пароль, в деяких логін:пароль в JSON, і різна довжина.
Бо, наприклад, для PHOENIX_ADMIN_SECRET є перевірка на кількість символів:
...
atlas-phoenix-6865f69ffc-k7hwl:phoenix File "/phoenix/env/phoenix/config.py", line 772, in get_env_phoenix_admin_secret
atlas-phoenix-6865f69ffc-k7hwl:phoenix REQUIREMENTS_FOR_PHOENIX_SECRET.validate(phoenix_admin_secret, "Phoenix secret")
atlas-phoenix-6865f69ffc-k7hwl:phoenix File "/phoenix/env/phoenix/auth.py", line 255, in validate
atlas-phoenix-6865f69ffc-k7hwl:phoenix raise ValueError(err_text)
atlas-phoenix-6865f69ffc-k7hwl:phoenix ValueError: Phoenix secret must be at least 32 characters long
....
Описуємо ресурси:
...
############################
### PHOENIX_ADMIN_SECRET ###
############################
# generate a random password
ephemeral "random_password" "ops_phoenix_admin_secret_random_password" {
length = 32
special = true
}
# create an AWS Secret resource
resource "aws_secretsmanager_secret" "ops_phoenix_admin_secret" {
name = "/ops/phoenix/phoenix_admin_secret"
description = "Phoenix admin username and password"
recovery_window_in_days = 0
}
# create an AWS Secret value
resource "aws_secretsmanager_secret_version" "ops_phoenix_admin_secret_version" {
secret_id = aws_secretsmanager_secret.ops_phoenix_admin_secret.id
secret_string_wo = jsonencode({
login = "admin@localhost"
password = ephemeral.random_password.ops_phoenix_admin_secret_random_password.result
})
secret_string_wo_version = 3
}
######################
### PHOENIX_SECRET ###
######################
# generate a random password
ephemeral "random_password" "ops_phoenix_secret_random_password" {
length = 65
special = false
}
# create an AWS Secret resource
resource "aws_secretsmanager_secret" "ops_phoenix_secret" {
name = "/ops/phoenix/phoenix_secret"
description = "Phoenix secret string used to sign JWTs"
recovery_window_in_days = 0
}
# create an AWS Secret value
resource "aws_secretsmanager_secret_version" "ops_phoenix_secret_version" {
secret_id = aws_secretsmanager_secret.ops_phoenix_secret.id
secret_string_wo = ephemeral.random_password.ops_phoenix_secret_random_password.result
secret_string_wo_version = 1
}
#################################
### PHOENIX_POSTGRES_PASSWORD ###
#################################
# generate a random password
ephemeral "random_password" "ops_phoenix_postgres_random_password" {
length = 12
special = false
}
# create an AWS Secret resource
resource "aws_secretsmanager_secret" "ops_phoenix_postgres_credentials" {
name = "/ops/phoenix/phoenix_postgres_credentials"
description = "Phoenix PostgreSQL username and password"
recovery_window_in_days = 0
}
# create an AWS Secret value
resource "aws_secretsmanager_secret_version" "ops_phoenix_postgres_credentials_version" {
secret_id = aws_secretsmanager_secret.ops_phoenix_postgres_credentials.id
secret_string_wo = ephemeral.random_password.ops_phoenix_postgres_random_password.result
secret_string_wo_version = 3
}
#############################
### PHOENIX_SMTP_PASSWORD ###
#############################
# generate a random password
ephemeral "random_password" "ops_phoenix_smtp_password_random_password" {
length = 12
special = false
}
# create an AWS Secret resource
resource "aws_secretsmanager_secret" "ops_phoenix_smtp_password" {
name = "/ops/phoenix/ops_phoenix_smtp_password"
description = "Phoenix secret string used to sign JWTs"
recovery_window_in_days = 0
}
# create an AWS Secret value
resource "aws_secretsmanager_secret_version" "ops_phoenix_smtp_password_version" {
secret_id = aws_secretsmanager_secret.ops_phoenix_smtp_password.id
secret_string_wo = ephemeral.random_password.ops_phoenix_smtp_password_random_password.result
secret_string_wo_version = 2
}
З Terraform все, можемо готувати базу Postgres.
PostgreSQL user and database
Сервер у нас вже є, тому зараз просто створити базу і юзера.
Створюємо юзера, базу, даємо повний доступ до цієї бази:
ops_grafana_db=> CREATE USER ops_phoenix_user WITH PASSWORD '***';
CREATE ROLE
ops_grafana_db=> CREATE DATABASE ops_phoenix_db OWNER ops_phoenix_user;
CREATE DATABASE
ops_grafana_db=> GRANT ALL PRIVILEGES ON DATABASE ops_phoenix_db TO ops_phoenix_user;
GRANT
Тепер робимо helm dependency update, і ловимо “response status code 401” від Docker Hub:
...
Update Complete. ⎈Happy Helming!⎈
Error: could not retrieve list of tags for repository oci://registry-1.docker.io/arizephoenix/phoenix-helm: GET "https://registry-1.docker.io/v2/arizephoenix/phoenix-helm/phoenix/tags/list": response status code 401: unauthorized: authentication required: [map[Action:pull Class: Name:arizephoenix/phoenix-helm/phoenix Type:repository]]
Тому що Helm при dependency update намагається отримати всі доступні теги з tags/list, а в Docker Hub для цього потрібно залогінитись.
Логінитись туди я і не хочу, і це зламає можилу майбутню автоматизацію, тому робимо костиль.
Пишемо Makefile в якому додаємо таргет на helm pull oci://:
...
phoenix-helm:
auth:
# Kubernetes Secret name
name: phoenix-secret
# use AWS RDS instead of deploying local
postgresql:
enabled: false
database:
postgres:
host: db.monitoring.ops.example.co
user: ops_phoenix_user
db: ops_phoenix_db
...
Деплоїмо, перевіряємо:
Налаштування Ingress
Сам Ingress enabled by default, тому нам треба тільки додати атрибути, через які він “замапиться” на наш загальний AWS Application Load Balancer через анотацію alb.ingress.kubernetes.io/group.name.
Але і тут є нюанс: в чарті нема можливості задати spec.ingressClassName="alb".
Тому робимо трохи deprecated way, теж через annotations:
Перший логін робимо з паролем PHOENIX_DEFAULT_ADMIN_INITIAL_PASSWORD, далі Phoenix запросить його змінити – задаємо наш із PHOENIX_ADMIN_SECRET, віддаємо девелоперам на погратись:
Дебажимо одну проблему з використанням пам’яті в Kubernetes Pods, і вирішили подивитись на пам’ять і кількість процесів на нодах.
Сама проблема полягає в тому, що зазвичай Kubernetes Pod з Livekit споживає близько 2 гігабайт пам’яті, але іноді бувають спайки до 10-11 гіг, через що под вбивається:
Що ми хочемо визначити: це один процес починає стільки пам’яті “їсти” – чи просто створюється багато процесів в контейнері?
Самий простий варіант тут – використати Prometheus Process Exporter, який запускається у вигляді DaemonSet, на кожній WorkerNode створює власний контейнер, і для всіх чи обраних процесів на EC2 збирає статистику з /proc.
root@backend-celery-workers-deployment-5bc64557c8-zbq2j:/app# ps aux
USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND
root 1 0.2 1.4 544832 236720 ? Ss 07:27 0:24 /usr/local/bin/python /usr/local/bin/celery -A celery_app.app worker [...]
...
Та Livekit:
root@backend-livekit-agent-deployment-7d9bf86564-qgjzb:/app# ps aux
USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND
root 1 0.4 1.8 2112944 294772 ? Ssl 07:06 0:46 python -m cortex.livekit_agent.main start
root 24 0.0 0.0 15788 12860 ? S 07:06 0:00 /usr/local/bin/python -c from multiprocessing.resource_tracker import main;main(34)
root 25 0.0 0.6 342976 102852 ? S 07:06 0:02 /usr/local/bin/python -c from multiprocessing.forkserver import main [...]
...
Додаємо конфіг для process-exporter – описуємо nameMatchers:
...
process-exporter:
enabled: true
tolerations:
operator: Exists
effect: NoSchedule
- key: CriticalAddonsOnly
config:
# metrics will be broken down by thread name as well as group name
threads: true
# any process that otherwise isn't part of its own group becomes part of the first group found (if any) when walking the process tree upwards
children: true
# means that on each scrape the process names are re-evaluated
recheck: false
# remove_empty_groups drop empty groups if no processes found
remove_empty_groups: true
nameMatchers:
# gunicorn (python + uvicorn workers)
- name: "gunicorn"
exe:
- /usr/local/bin/python
cmdline:
- ".*gunicorn.*"
# celery worker
- name: "celery-worker"
exe:
- /usr/local/bin/python
cmdline:
- ".*celery.*worker.*"
# livekit agent
- name: "livekit-agent"
exe:
- python
- /usr/local/bin/python
cmdline:
- ".*cortex.livekit_agent.main.*"
# livekit multiprocessing helpers
- name: "livekit-multiproc"
exe:
- /usr/local/bin/python
cmdline:
- ".*multiprocessing.*"
Тут в exe – список самого executable (можна кілька), а в cmdline – аргументи, з якими процес запущено.
Тобто для Livekit у нас exe – “/usr/local/bin/python“, а cmdline – це “-c from multiprocessing.resource_tracker [...]” або “-c from multiprocessing.forkserver [...]“.
Деплоїмо, і тепер залишилось тільки три групи:
Але є нюанси.
Перше – статистика збирається з кожної ноди по всій групі процесів.
Тобто, якщо ми зробимо:
sum(namedprocess_namegroup_memory_bytes{memtype="resident", groupname="celery-worker"}) by (groupname, instance, pod)
То отримаємо суму всіх RSS всіх Celery-воркерів на ноді, де запущений відповідний process-exporter Pod:
А друга проблема – що Process Exporter не має лейбли з іменем WorkerNode, з якої зібрані метрики.
Тому тут тільки шукати вручну – по Pod IP (лейбла instance) можемо знайти його Node:
$ kk get pod -o wide | grep 10.0.45.166
atlas-victoriametrics-process-exporter-4zdzl 1/1 Running 0 6m51s 10.0.45.166 ip-10-0-40-195.ec2.internal <none> <none>
Повертаючись до питання того, що немає інформації по кожному процесу: ми можемо отримати середнє значення по кожному, бо у нас є метрика namedprocess_namegroup_num_procs:
sum(namedprocess_namegroup_memory_bytes{memtype="resident", groupname="celery-worker", instance="10.0.45.166:9256"}) by (groupname, instance, pod)
/
sum(namedprocess_namegroup_num_procs{groupname="celery-worker", instance="10.0.45.166:9256"}) by (groupname, instance, pod)
Результат ~230 MB:
Як ми і бачили в ps -eo rss,cmd.
Name Group Template variables та інформація по кожному процесу
Або, якщо нам прям дуже хочеться бачити статистику по кожному процесу – ми можемо використати динамічні імена для groupname з {{.PID}} – тоді для кожного процесу буде формуватись окрема група, див. Using a config file: group name:
Але цей варіант ОК тільки для якщо вам треба щось подебажити, і відключити, бо призведе до High cardinality issue.
Результат нашого дебагу
Власне, що нам потрібно було дізнатись – пам’ять “утікає” в якомусь одному процесі, чи просто створюється багато процесів в одному Pod?
Для цього в Grafana зробили графік із запитом:
sum(namedprocess_namegroup_memory_bytes{memtype="resident", groupname=~"livekit-multiproc-.*"}) by (groupname, instance)
До нього додали графіки з метриками самого Livekit – lk_agents_active_job_count та lk_agents_child_process_count, і окремо – графік з VictoriaLogs, де виводимо кількість API-запитів кожного юзера по полю token_email:
Де і бачимо, що один і той же юзер починає робити пачку запитів для підключення до Livekit, через що в Livekit Pod створюється пачка процесів (по новій Livekit Job на кожен запит), і в результаті загальна кількість пам’яті в поді зашкалює, бо 40 процесів по ~380 MB це ~15 гігабайт пам’яті.
Але в кожному конкретному процесі пам’ять тримається на рівні 300-400 мегабайт.
Залишилось розібратись чому саме спавняться процеси, але то вже задачка девелоперам.