Почав писати log collector з S3 до VictoriaLogs з використанням AWS GO SDK, і в коді достатньо багато використовуються різні Input/Ouput операції, бо треба отримати лог, розпарсити, записати дані.
Тож цього разу подивимось на дуже класний приклад використання інтерфейсів при роботі з функцією io.Copy() і ще раз трохи зазирнемо під капот внутрішньої реалізації інтерфейсів в Go.
Basic I/O example – os.Open(), os.Create() та io.Copy()
Напишемо простий код, який буде з одного файлу копіювати в інший:
package main
import (
"fmt"
"io"
"os"
)
func main() {
// open the source file for reading
// returns pointer to os.File:
// 'func os.Open(name string) (*os.File, error)':
//
// os.File represents an open file descriptor:
// type File struct {
// // contains filtered or unexported fields
// }
sourceFile, err := os.Open("source.txt")
if err != nil {
panic(err)
}
// always close files when done
defer sourceFile.Close()
// create destination file for writing
// 'func os.Create(name string) (*os.File, error)'
destFile, err := os.Create("dest.txt")
if err != nil {
panic(err)
}
defer destFile.Close()
// copy data from source to destination
// io.Copy() pulls bytes from any Reader and pushes them into any Writer
// 'func io.Copy(dst io.Writer, src io.Reader) (written int64, err error)'
bytesWritten, err := io.Copy(destFile, sourceFile)
if err != nil {
panic(err)
}
fmt.Println("Copied bytes:", bytesWritten)
}
Тут ми:
з os.Open("source.txt") відкриваємо файл на читання
з os.Create("dest.txt") створюємо файл, в який будемо копіювати дані
і з io.Copy() копіюємо дані з “source.txt” до “dest.txt“
І для обох функцій – і Copy(), і copyBuffer() – аргументи є interface type:
А type Writer interface описує вимоги до типу, що може бути використаний через цей інтерфейс: такий тип повинен мати метод Write(), приймати аргумент з типом slice of bytes, і повертати значення int та err:
// Writer is the interface that wraps the basic Write method.
type Writer interface {
Write(p []byte) (n int, err error)
}
Об’єкт destFile повинен мати метод Write() – який в нього є, бо destFile – це *os.File struct, в якої є набір методів, в тому числі як раз Read() та Write():
$ go doc os.File
package os // import "os"
type File struct {
// Has unexported fields.
}
File represents an open file descriptor.
...
func (f *File) Read(b []byte) (n int, err error)
...
func (f *File) Write(b []byte) (n int, err error)
А отже, маючи об’єкт з типом *os.File – ми через відповідні інтерфейси можемо викликати *os.File.Write():
func Copy(dst Writer, ...) каже – “dst повинен мати метод Write([]byte) (int, error)“
тип *os.File має метод Write() – а значить він задовольняє Writer interface
Тобто: інтерфейси в Go описують не типи даних, а вимоги до методів, які мають бути реалізовані, щоб ці методи можна було викликати через інтерфейс.
І коли ми пишемо і запускаємо io.Copy(destFile, ...) – під капотом Go під час компіляції програми:
перевіряє, який тип приймає io.Copy() – це interface type
аби задовільнити конкретно цей interface type – об’єкт (тип), який передається аргументом до io.Copy(), повинен мати метод Write()
Go перевіряє, чи є у переданого типу такий метод – чи є для об’єкту *os.File метод Write()
Далі – “магія”, описана в попередньому пості: ще раз глянемо на те, як працюють інтерфейси, як через них викликаються методи, і що саме знаходиться в аргументах io.Copy() та copyBuffer() при роботі програми.
Структури iface та itab
Коли ми передаємо обʼєкт (pointer на *os.File) у параметр із типом інтерфейсу (dst Writer) – то Go формує дві внутрішні структури, які передаються до функцій як interface value.
type iface struct {
// Pointer to the 'itab' (interface table)
tab unsafe.Pointer
// Pointer to the actual data (our *os.File struct)
data unsafe.Pointer
}
Де:
tab unsafe.Pointer: pointer на другий тип type itab, який описаний в type ITab struct
раніше було type itab struct, зараз перенесли в Go ABI, про ABI в наступному пості
data unsafe.Pointer: pointer на наш об’єкт з типом os.File struct, який має метод Write()
Друга структура, ITab struct, має свої три поля:
type ITab struct {
// pointer to the 'type Writer interface'
Inter *InterfaceType
// pointer to the 'type File struct'
Type *Type
// in our case if we have 1 method, thus '[N]uintptr' == [1]uintptr
// and in the 'fun[0]' will be the address of the method 'Write()' of the 'os.File' struct
Fun [1]uintptr // will have '[1]uintptr', and
}
Тут:
Inter *InterfaceType: pointer на опис type Writer interface
“який інтерфейс треба задовольнити“
Type: pointer на опис конкретного типу значення (у нашому випадку тип *os.File)
“який тип ми передаємо“
'Fun[0]': буде посиланням на метод Write() структури os.File
“ось адреси методів, які цей тип використовує для реалізації цього інтерфейсу“
І коли ми в коді передаємо значення типу *os.File в параметр інтерфейсного типу (dst Writer) – то Go створює ці структури, і передає структура iface з полями tab і data до виклику io.Copy(), а потім далі – до copyBuffer():
io.Copy(iface):
- iface.tab => вказівник на структуру itab
- iface.data => вказівник на *os.File
В itab struct маємо таблицю методів, пов’язаних з цим інтерфейсом (або – які імплементують цей інтерфейс), а в полі fun структури itab знаходиться масив з pointers, де кожен елемент містить адресу функції, яка реалізує відповідний метод інтерфейсу для конкретного типу.
І у випадку з інтерфейсом Writer – це буде масив fun[0] зі значенням, наприклад, 0xc000014070, де за адресою 0xc000014070 буде розташований метод Write() типу *os.File.
І коли в copyBuffer(dst Writer) виконується виклик Write(), який описаний як:
Повертаючись до твердження “при виклику io.Copy(*os.File) – викликається copyBuffer(), якому першим аргументом передається структура iface” – давайте подивимось на аргументи, з якими ми працюємо.
Перевірка типів інтерфейсних значень в аргументах
Аби побачити все своїми очима – повторимо “хак” з попереднього поста – створимо власну структуру, яка аналогічна до iface, бо напряму до iface ми звернутись не можемо – але можемо прочитати її памʼять через unsafe.Pointer.
І на додачу створимо власну функцію myCopy(), яка буде мати в параметрах наші власні інтерфейси – аналогічно тому, як це зроблено для io.Copy().
Тобто – ми повністю повторюємо поведінку оригінального io.Copy(), але замість справжніх io.Reader та io.Writer використовуємо свої інтерфейси і власну структуру myIfaceStruct, аби подивитись, як Go зберігає інтерфейс у памʼяті:
створюємо два об’єкти sourceFile та destFile, які є pointers на *os.File
описуємо власну функцію myCopy(), яка в параметрах описує отримання інтерфейсних типів
наші інтерфейси myReaderInterface та myWriterInterface вимагають методів Read() та Write(), які є у sourceFile та destFile
Код виходить такий:
package main
import (
"fmt"
"io"
"os"
"unsafe"
)
type myIfaceStruct struct {
tab unsafe.Pointer
data unsafe.Pointer
}
// Writer is the interface that wraps the basic Write method.
type myWriterInterface interface {
// define Write method to satisfy the myWriterInterface interface
Write(p []byte) (n int, err error)
}
// Reader is the interface that wraps the basic Read method.
type myReaderInterface interface {
// define Read method to satisfy the myReaderInterface interface
Read(p []byte) (n int, err error)
}
// accept any type which has Read and Write methods
func myCopy(src myReaderInterface, dst myWriterInterface) (int64, error) {
// '&src' gives us the address of the interface variable 'src'
// 'unsafe.Pointer(&src)' allows us to reinterpret that memory as a different type
// the interface value occupies 16 bytes:
// - first 8 bytes: pointer to the method/type table ('tab')
// - next 8 bytes: pointer to the actual value ('data')
// '(*myIfaceStruct)(...)' tells Go to treat those bytes as a 'myIfaceStruct'
// '*(*myIfaceStruct)(...)' finally copies those bytes into the 'rawIface' variable
rawIface := *(*myIfaceStruct)(unsafe.Pointer(&src))
fmt.Println()
// Print diagnostic messages
//
// we intentionally use '%p' modifier with a non-pointer value argument
// this causes a formatting error, and 'fmt' prints a diagnostic message
// that includes the full content of 'rawIface' (its type and both fields)
fmt.Printf("'rawIface' data: %p\n", rawIface)
// same idea for %s: &src is a *myReaderInterface, not a string
// so fmt prints a diagnostic message showing the type and value
fmt.Printf("'src' data: %s\n", &src)
fmt.Println()
// Print addresses from the 'iface' struct
//
// 'tab' field is a pointer to the interface's method table (the 'itab' struct)
// this value is copied from the real interface value stored in 'src'
fmt.Printf("Copy of the 'iface.tab': address stored inside 'rawIface.tab': %p\n", rawIface.tab)
// 'data' field is a pointer to the underlying object (the *os.File struct)
// also copied directly from the actual interface storage
fmt.Printf("Copy of the 'iface.data': address stored inside 'rawIface.data': %p\n", rawIface.data)
// print the address of the real underlying object (*os.File)
// this should match the value stored in rawIface.data
fmt.Printf("The 'src' (*os.File) actual object address: %p\n", src)
fmt.Println()
// Test sizes
//
// 'src' will have 16 bytes
// because 'iface' has two fields: 'tab' and 'data'
// they are pointers, each of 8 bytes
fmt.Println("sizeof the 'src' (size of 'iface' with two pointers):", unsafe.Sizeof(src))
// but pointer to the '*os.File' object size will be 8 bytes
testSource, _ := os.Open("source.txt")
fmt.Println("sizeof the 'testSource' (size of '*os.File' with one pointer):", unsafe.Sizeof(testSource))
fmt.Println()
// demonstrate "dynamic types"
//
// - Printf '%T' modifier will print the type of the variable
// - Printf '%p' modifier will print the address pointed to by '&'
fmt.Printf("'src' type: %T\n", src)
// address of the the 'src'
fmt.Printf("'src' address: %p\n", &src)
fmt.Println()
return io.Copy(dst, src)
}
func main() {
// sourceFile is *os.File
sourceFile, _ := os.Open("source.txt")
defer sourceFile.Close()
// destFile is *os.File
destFile, _ := os.Create("dest.txt")
defer destFile.Close()
myCopy(sourceFile, destFile)
}
Запускаємо:
$ go run test-int.go
'rawIface' data: %!p(main.myIfaceStruct={0x4eee38 0xc000062030})
'src' data: %!s(*main.myReaderInterface=0xc000014070)
Copy of the 'iface.tab': address stored inside 'rawIface.tab': 0x4eee38
Copy of the 'iface.data': address stored inside 'rawIface.data': 0xc000062030
The 'src' (*os.File) actual object address: 0xc000062030
sizeof the 'src' (size of 'iface' with two pointers): 16
sizeof the 'testSource' (size of '*os.File' with one pointer): 8
'src' type: *os.File
'src' address: 0xc000014070
І розбираємо результат.
Перші два – зовсім “грязний хак”, випадково на нього натрапив: якщо до модифікатора в fmt.Printf() передати не той тип даних, який він очікує – він виводить повідомлення з деталями по помилці, де можемо побачити, що саме повністю передавалось (хоча як виявилось, під капотом просто викликається (reflect.TypeOf(p.arg).String())).
Перший блок:
rawIface є типом main.myIfaceStruct, яка містить два вказівники на адреси 0x4eee38 та 0xc000062030 – див. далі про зміст rawIface
src є поінтером на *main.myReaderInterface – структуру, яка знаходиться за адресою 0xc000014070
Далі – виводимо адреси, які зберігаються в полях iface (і які ми отримали через нашу власну структуру):
'rawIface.tab': 0x4eee38 – тут адреса розміщення itab struct
'rawIface.data': 0xc000062030 – тут адреса переданого через src об’єкту os.File
і ту саму адресу ми бачимо в наступному рядку – src є pointer на *os.File, з Printf(%p) отримуємо адресу, на яку src вказує
Найбільш явний доказ того, що насправді myCopy() у (src myReaderInterface) працює з інтерфейсом, а не *os.File – це розмір:
з unsafe.Sizeof(src) отримуємо розмір самого інтерфейсного значення (iface), яке складається з двох pointers – tab і data, по 8 байт кожен
а testSource := os.Open("source.txt") має розмір 8 байт, бо це один поінтер
Інтерфейси Go та “dynamic type”
А далі ми бачимо те, що називають “динамічними типами”: в результатах unsafe.Sizeof(src)) ми побачили, що там 2 поінтери, тобто це 100% тип interface value з двома pointers.
Але в fmt.Printf("'src' type: %T\n", src) ми отримуємо тип *os.File – бо це pointer на структуру os.File:
$ go run test-int.go
...
'src' data: %!s(*main.myReaderInterface=0xc000014070)
...
'src' type: *os.File
'src' address: 0xc000014070
The static type (or just type) of a variable is the type given in its declaration, the type provided in the new call or composite literal, or the type of an element of a structured variable. Variables of interface type also have a distinct dynamic type, which is the (non-interface) type of the value assigned to the variable at run time (unless the value is the predeclared identifier nil, which has no type). The dynamic type may vary during execution but values stored in interface variables are always assignable to the static type of the variable.
Отже, змінна має static type, коли:
змінна оголошується (var i int)
тип заданий під час присвоювання даних при виклику функцій (x := new(int))
Проте змінні інтерфейсного типу завжди мають фіксований статичний тип (сам інтерфейс) – але реальний об’єкт всередині неї має окремий dynamic type – це конкретний тип значення, присвоєного під час виконання.
І в нашому прикладі вище – iface.data як раз і є тою змінною, яка визначає dynamic type, і тому ми в результаті fmt.Printf("'src' type: %T\n", src) бачимо саме *os.File.
Додаємо до нашого коду ще трохи дебагу:
...
// show the static type of the interface itself
// - (*myReaderInterface)(nil) creates a nil pointer to the interface type
// - reflect.TypeOf(...) gives the type of that pointer
// - Elem() gives the type the pointer points to (the interface type)
// this demonstrates that the static type is 'myReaderInterface'
fmt.Println("static type of the 'myReaderInterface':", reflect.TypeOf((*myReaderInterface)(nil)).Elem())
// show the dynamic type stored inside the interface variable 'src'
// - 'src' is an interface value (16-byte iface: tab + data)
// - reflect.TypeOf(src) reads the real type stored in iface.data
// this prints the actual type, '*os.File' in our case
//
// and this is exactly the same information that 'fmt.Printf("%T", src)' prints:
// both reflect.TypeOf(src) and %T reveal the dynamic type stored in the interface
fmt.Printf("'src' dynamic type: %v\n", reflect.TypeOf(src))
// show the type of the variable 'src' itself, not the value stored inside it
// this is exactly what the myCopy() function "sees" when receiving its argument
// - '&src' is a pointer to the interface variable
// - reflect.TypeOf(&src) therefore reports: "*myReaderInterface"
// this confirms that 'src' is an interface-typed variable, not a concrete value
fmt.Println("'src' variable type: ", reflect.TypeOf(&src))
...
Результат:
$ go run test-int.go
...
static type of the 'myReaderInterface': main.myReaderInterface
'src' dynamic type: *os.File
'src' variable type: *main.myReaderInterface
Тут ми:
в першій перевірці просто створюємо вказівник на інтерфейсний тип (але без створення самого об’єкту): результат є main.myReaderInterface
другий результат – “прочитай значення інтерфейсної змінної src, і скажи, який там тип” – саме тут ми бачимо, що в iface.data зберігається pointer на об’єкт типу – *os.File
третя перевірка – “сходи за адресою, де зберігається змінна src, і скажи який за цією адресою тип даних” – отримуємо pointer на *main.myReaderInterface
Використання інтерфейсів на прикладі io.Copy()
То що це все значить для нас?
А значить, що використовуючи інтерфейси, ми можемо передати будь-які значення (типи), які реалізують інтерфейс.
Якщо повернутись до нашого першого коду, то в io.Copy() першим параметром ми можемо передати будь-який тип, який має метод Write([]byte) (int, error), а в другий – аналогічно, тільки Read(), бо під капотом Copy() викликає copyBuffer(), а той просто створює буфер розміром в 32 кілобайти, чей який “переливає” з одного “каналу” в інший:
func copyBuffer(dst Writer, src Reader, buf []byte) (written int64, err error) {
...
if buf == nil {
size := 32 * 1024
...
buf = make([]byte, size)
}
for {
nr, er := src.Read(buf)
if nr > 0 {
nw, ew := dst.Write(buf[0:nr])
...
А значить – ми можемо у Writer передати os.Stdout, тобто просто вивести на консоль:
...
func main() {
// open the source file for reading
// returs pointer to os.File:
// 'func os.Open(name string) (*os.File, error)':
//
// os.File represents an open file descriptor:
// type File struct {
// // contains filtered or unexported fields
// }
sourceFile, err := os.Open("source.txt")
if err != nil {
panic(err)
}
// always close files when done
defer sourceFile.Close()
// printto console instead
// actualy, os.Stdout is also *os.File
// thus it also has Write() method
bytesWritten, err := io.Copy(os.Stdout, sourceFile)
if err != nil {
panic(err)
}
fmt.Println("Copied bytes:", bytesWritten)
}
Результат:
$ go run main.go
source
Copied bytes: 7
Або можемо створити власний буфер в пам’яті, і писати в нього, бо bytes.Buffer теж має метод Write():
Є задачка на моніторинг костів на OpenAI – бачити скільки за добу витрачено кожним проектом, і слати алерти в Slack, коли витрати завеликі.
Потикав кілька готових експортерів для OpenAI, але не побачив там метрик саме по костам, тому просто напишемо свій.
Писати будемо на Golang, ідея дуже проста – з OpenAI API отримуємо дані, генеруємо метрику, відправляємо її до VictoriaMetrics.
На Go останній раз писав у 2019 році, і то один раз, тому заодно будемо згадувати що і як працює, і місцями дивитись деталі реалізації різних бібліотек.
Поїхали.
OpenAI API
Документація по OpenAI API – Costs та повертаєме значення – Costs object.
Для доступу до Costs потрібен окремий ключ – робимо на platform.openai.com в Admin keys:
Для отримання Costs треба задавати параметр start_time в Unix форматі – створюємо змінну:
для структури Request маємо метод EnableTrace() який теж повертає Request
і для того ж Request маємо метод Get(), який теж повертає Request плюс error
І це дозволяє нам будувати ланцюжки запитів – Client => R() => Request => EnableTrace() => Request => Get().
Окей, давайте до коду.
Створення resty клієнта
Пишемо main.go:
package main
import (
"fmt"
"github.com/go-resty/resty/v2"
)
// set global const as ay be used in other packages
const (
baseURL = "https://api.openai.com/v1"
costsPath = "/organization/costs"
)
func main() {
client := resty.New()
// build 'https://api.openai.com/v1/organization/costs'
response, err := client.R().Get(baseURL + costsPath)
if err != nil {
panic(err)
}
fmt.Println(response)
}
Запускаємо:
$ go run main.go
{
"error": {
"message": "You didn't provide an API key. You need to provide your API key in an Authorization header using Bearer auth (i.e. Authorization: Bearer YOUR_KEY). You can obtain an API key from https://platform.openai.com/account/api-keys."
...
Далі нам треба додати auth header до нашого запиту – використовуємо метод func (*Client) SetAuthToken, який просто додає значення до поля Token в об’єкті Client.
Ще є окремий метод func (r *Request) SetAuthToken, який задає токен на конкретні реквести, а не на весь клієнт, але в нашому випадку робимо простіше, через загальний Client.
Робимо method chaining із прикладу вище – для Client викликаємо SetAuthToken(), який задає токен, наступним викликаємо R() для створення request, і наступним викликаємо Get(), в який передаємо URL:
Зараз нам треба тільки start_time, але потім будемо додавати ще, тому можна їх відразу записати в map, який потім передамо до SetQueryParams().
Для start_time нам треба передати час – робимо з time.Now(), і передавати дату до OpenAI API нам треба в Unix форматі, тому використовуємо функцію Unix().
Перевіряємо як воно буде виглядати:
gore> :import time
gore> timeNow := time.Now().Unix()
1762956432
Додаємо в код створення змінної timeNow з часом, створення setQueryParams map of strings зі списком параметрів теж в strings, і додаємо виклик SetQueryParams() до client:
А в функції parseResponseBody() викликається метод Unmarshalc, який в свою чергу викликає Client.JSONUnmarshal(), а поле JSONUnmarshal містить функцію json.Unmarshal():
...
func createClient(hc *http.Client) *Client {
if hc.Transport == nil {
hc.Transport = createTransport(nil)
}
c := &Client{ // not setting lang default values
...
JSONUnmarshal: json.Unmarshal,
...
array, масив: фіксована довжина, індексований тип, всі об’єкти того самого типу – [3]int{1,2,3}
slice: аналогічний до array, але не фіксованої довжини – []int{1,2,3}
maps: набір key:value елементів змінної довжини одного типу – map[string]string{"key_name": "value_value"}
structs: комплексний тип, який може включати в себе інші типи – struct{ Name string; Age int }{ Name: "Nino", Age: 35 }
Так як ми знаємо, які типи ми отримуємо з API та всі поля в них – то нам підійде slice of structs, де кожен елемент slice буде структурою з полями, в яких ми будемо зберігати project_id, amount та project_name.
Структура для Project ID та Amount
Структура може виглядати так:
type ProjectSpend struct {
ProjectID string
ProjectSpend int
}
А потім створимо slice з цією структурою:
data := []ProjectSpend{}
Тепер давайте подивимось на те, що нам повертає OpenAI API.
має кілька JSON properties – "object": "page", etc
далі йде масив data []
який містить в собі інший object {}
який починається з properties "object": "bucket", etc
і в якому є інший масив results []
який включає в себе ще один object {}
який починається із property "object": "organization.costs.result"
за яким слідує property amount, який містить в собі вкладений object {}
з двома property – value та value
Якщо ми хочемо це відобразити в Go struct – то нам потрібно створити кілька структур, які будуть передавати дані одна до одної:
перша структура “захоплює” data[]
друга структура – отримує results[]
третя – отримує значення поля project_id
а четверта – зчитує amount
Як це може виглядати в коді – з використання structs composition, коли одна структура містить в собі поле, яке є іншою структурою:
type ResponceAmount struct {
Value float64
}
type ResponceProjectID struct {
ProjectID string `json:"project_id"`
Amount ResponceAmount
}
type ResponseResults struct {
Results []ResponceProjectID
}
type ResponseData struct {
Data []ResponseResults
}
res := &ResponseData{}
І тепер можемо виконати json.Unmarshall через виклик SetResult(), в який ме передаємо pointer – res := &ResponseData{}:
$ go run main.go
...
Result: &{[{[{proj_1 {2.16911625}} {proj_Agtar0XzJdXXLhGt8YCRNZMY {0.1846203}} {proj_2 {0.1531728}} {proj_3 {0.19788874999999997}}]}]}
Або можемо зробити більш лаконічно – використовуючи nested anonymous structs:
...
// catch data[] and pass to nested struct
// catch results[] and pass to next nested struct
// catch 'project_id' property to the 'ProjectID' field, and pass to next nested struct
// catch 'amount' property to the 'Amount' field, and pass to next nested struct
// finally, catch 'value' property to the 'Value' field
type ResponseData struct {
Data []struct {
Results []struct {
ProjectID string `json:"project_id"`
Amount struct {
Value float64
}
}
}
}
...
І отримаємо той самий результат.
А далі нам потрібно буде згенерувати метрики з лейблами.
Робимо це у два цикли for, в яких перебираємо поля кожної структури:
...
// catch each item from the 'Response.Data[]'
for _, dataItem := range res.Data {
// catch each iteam from the 'Response.Data[].Results[]'
for _, result := range dataItem.Results {
project := result.ProjectID
amount := result.Amount.Value
// print in VictoriaMetrics gauge format
fmt.Printf("openai_stats{type=\"costs\", project=\"%s\"} %f\n", project, amount)
}
}
...
Результат:
$ go run main.go
openai_stats{type="costs", project="proj_1"} 2.170784
openai_stats{type="costs", project="proj_2"} 0.241411
openai_stats{type="costs", project="proj_3"} 0.213558
openai_stats{type="costs", project="proj_4"} 0.198619
А тепер зробимо аналогічно, але для імен проектів, бо мати в лейблах метрик значення у вигляді “proj_123” зовсім незручно, хочеться вивести нормальні імена.
Структура для Project Names
Додаємо другий ендпоінт, див. документацію List projects:
Додавання OPENAI_ADMIN_KEY ключа і параметрів переносимо в створення клієнта, після чого викликаємо нашу функцію, якій передаємо створений і налаштований клієнт:
sanitize імен – форматування даних зі strings.Replace()
Але в іменах у нас є пробіли та символи “/”, і імена проектів містять заглавні букви – а нам в лейблах метрик треба мати вид “my_project_name“.
Додамо функцію, яка буде виконувати нормалізацію використовуючи методи ToLower() та ReplaceAll() із пакету strings:
...
func normalizeLabel(s string) string {
s = strings.ToLower(s)
s = strings.ReplaceAll(s, " ", "_")
s = strings.ReplaceAll(s, "/", "_")
return s
}
...
Наступний крок – побудувати map, в якій ми будемо мати project_id та project_names:
...
projectNames := make(map[string]string)
// get each 'ProjectsResponse.Data[].ID'
// get each 'ProjectsResponse.Data[].Name'
// populate the projectNames map with:
// 'project_id' = 'project_name'
for _, p := range projectsRes.Data {
projectNames[p.ID] = normalizeLabel(p.Name)
}
fmt.Println("Projects Names: ", projectNames)
...
В результаті маємо:
$ go run main.go
Projects Names: map[proj_1:kraken_production proj_2:assistant_test_eval proj_3:knowledge_base proj_4:default_project]
І тепер оновлюємо наші два цикли – використовуємо в лейблі імена замість ID:
...
// catch each item from the 'Response.Data[]'
for _, dataItem := range costsRes.Data {
// catch each item from the 'Response.Data[].Results[]'
for _, result := range dataItem.Results {
// get ''Response.Data[].Results[].ProjectID'
id := result.ProjectID
// get ''Response.Data[].Results[].Amount.Value'
amount := result.Amount.Value
// use the 'id' to get the project name from the projectNames map
project := projectNames[id]
if project == "" {
project = "unknown"
}
// print in VictoriaMetrics gauge format
fmt.Printf("openai_stats{type=\"costs\", project=\"%s\"} %f\n", project, amount)
}
}
...
І результат:
$ go run main.go
openai_stats{type="costs", project="knowledge_base"} 2.170784
openai_stats{type="costs", project="kraken_production"} 0.241411
openai_stats{type="costs", project="assistant_test_eval"} 1.083077
openai_stats{type="costs", project="default_project"} 0.461237
Зараз весь код у нас такий:
package main
import (
"fmt"
"os"
"strconv"
"strings"
"time"
"github.com/go-resty/resty/v2"
)
// set global const as ay be used in other packages
const (
baseURL = "https://api.openai.com/v1"
costsPath = "/organization/costs"
projectsPath = "/organization/projects"
)
// catch data[] and pass to nested struct
// catch results[] and pass to next nested struct
// catch 'project_id' property to the 'ProjectID' field, and pass to next nested struct
// catch 'amount' property to the 'Amount' field, and pass to next nested struct
// finally, catch 'value' property to the 'Value' field
type CostsResponseData struct {
Data []struct {
Results []struct {
ProjectID string `json:"project_id"`
Amount struct {
Value float64
}
}
}
}
type ProjectsResponse struct {
Data []struct {
ID string
Name string
}
}
func getOpenAi(client *resty.Client, path string, out any) error {
_, err := client.R().
SetResult(out).
Get(path)
return err
}
func normalizeLabel(s string) string {
s = strings.ToLower(s)
s = strings.ReplaceAll(s, " ", "_")
s = strings.ReplaceAll(s, "/", "_")
return s
}
func main() {
//client := resty.New()
apiKey := os.Getenv("OPENAI_ADMIN_KEY")
timeNow := strconv.FormatInt(time.Now().Unix(), 10)
setQueryParams := map[string]string{
"start_time": timeNow,
"group_by": "project_id",
}
client := resty.New().
SetAuthToken(apiKey).
SetQueryParams(setQueryParams)
// use pointer to ResponseData struct
// as 'json.Unmarshal' requires a pointer to write results
costsRes := &CostsResponseData{}
if err := getOpenAi(client, baseURL+costsPath, costsRes); err != nil {
panic(err)
}
projectsRes := &ProjectsResponse{}
if err := getOpenAi(client, baseURL+projectsPath, projectsRes); err != nil {
panic(err)
}
projectNames := make(map[string]string)
// get each 'ProjectsResponse.Data[].ID'
// get each 'ProjectsResponse.Data[].Name'
// populate the projectNames map with:
// 'project_id' = 'project_name'
for _, p := range projectsRes.Data {
projectNames[p.ID] = normalizeLabel(p.Name)
}
// catch each item from the 'Response.Data[]'
for _, dataItem := range costsRes.Data {
// catch each item from the 'Response.Data[].Results[]'
for _, result := range dataItem.Results {
// get ''Response.Data[].Results[].ProjectID'
id := result.ProjectID
// get ''Response.Data[].Results[].Amount.Value'
amount := result.Amount.Value
// use the 'id' to get the project name from the projectNames map
project := projectNames[id]
if project == "" {
project = "unknown"
}
// print in VictoriaMetrics gauge format
fmt.Printf("openai_stats{type=\"costs\", project=\"%s\"} %f\n", project, amount)
}
}
}
Тепер можемо переходити до формування реальних метрик та записати їх до VictoriaMetrics.
Планування метрик для VictoriaMetrics
Отже, метрики у нас будуть у вигляді openai_stats{type="costs", project="prodject_id"} 5.55.
А що сказано в задачі, що треба в результаті?
якщо денний спендінг на опенаі перевищує середній за останні дні (з певним трешолдом) – кричати в слак
Значить, нам потрібна буде сума за добу, і маючи її, ми можемо робити порівняння з попередніми періодами часу.
І потім можемо для алерту створити запит на кшталт такого:
if
avg_over_time(openai_stats{type="costs", project="prodject_id"}[1d)
>
avg_over_time(openai_stats{type="costs", project="prodject_id"}[3d)
then send alert
Але з Counter є нюанс – він обнуляється, якщо експортер перезапуститься – див. counter reset.
Крім того, якщо ми отримуємо дані починаючи з 00:00 – то з наступного дня значення буде починатись з 0,00 USD.
А значить, у нас значення в метриці може і збільшуватись, і зменшуватись, а значить – нам потрібен не Counter, а Gauge.
VictoriaMetrics Go client
Є бібліотека для Prometheus, але так як у нас VictoriaMetrics – то беремо їхній пакет, який до того ж має функцію PushMetrics(), з якою ми можемо відразу пушити метрики до VictoriaMetrics.
Дивимось документацію по type Gauge, там є приклад створення об’єкта метрики.
Функція NewGauge() приймає два аргументи – ім’я метрики з лейблами та функцію, яка виконує оновлення значення для цієї метрики, див. gauge.go:
$ go run main.go
test_openai_stats{type="costs", project="assistant_test_eval"} 4.9838991
test_openai_stats{type="costs", project="default_project"} 0.5281144000000001
test_openai_stats{type="costs", project="knowledge_base"} 2.17244425
test_openai_stats{type="costs", project="kraken_production"} 0.5510669499999999
Супер.
А тепер подумаємо над всією логікою виконання.
Що у нас є зараз:
створення resty.Client
ініціалізація структури costsRes := &CostsResponseData{}
виклик getOpenAi() з аргументами (client, baseURL+costsPath, costsRes), де ми заповнюємо дані в структурі CostsResponseData
ініціалізація projectsRes := &ProjectsResponse{}
виклик getOpenAi() з аргументами (client, baseURL+projectsPath, projectsRes), де ми заповнюємо дані в структурі ProjectsResponse
ініціалізація мапи projectNames
заповнення її з даними "project_id": "project_name"
далі цикли, в яких:
отримуємо project_id
отримуємо amount
по project_id отримуємо ім’я проекту, записуємо в змінну project
генеруємо ім’я метрики і лейблу з project в metricName
з metrics.NewGauge генеруємо нову метрику
з gauge.Set(amount) записуємо в неї значення
з metrics.WritePrometheus() всі згенеровані метрики виводимо на консоль
І все це зараз виконується при виклику main().
Натомість нам при виклику main(), тобто при старті експортера, треба:
створити resty.Client
далі періодично виконувати оновлення даних та записувати дані до VictoriaMetrics:
з getOpenAi() заповнити структуру ProjectsResponse
з getOpenAi() заповнити структуру CostsResponseData
заповнити projectNames
запустити цикли для генерації метрик і виконання Set()
в кінці циклу виконати WritePrometheus()
Правда, при такому підході ми кожну годину будемо перезаписувати поля в ProjectsResponse, CostsResponseData та projectNames, що наче не дуже ОК з точки зору перформансу – але якщо у нас з’явиться новий проект, то ми його відразу “спіймаємо”, і додамо нову метрику для нього.
Отже, що треба зробити – це винести нашу логіку в окрему функцію, раз на годину викликати її, а потім виконувати WritePrometheus().
Пишемо цю функцію, тільки міняємо NewGauge() на GetOrCreateGauge(), бо при наступному виклику нашої функції метрики вже будуть створені:
...
func fetchAndPush(client *resty.Client, costsRes *CostsResponseData, projectsRes *ProjectsResponse, projectNames map[string]string) {
if err := getOpenAi(client, baseURL+costsPath, costsRes); err != nil {
panic(err)
}
if err := getOpenAi(client, baseURL+projectsPath, projectsRes); err != nil {
panic(err)
}
// get each 'ProjectsResponse.Data[].ID'
// get each 'ProjectsResponse.Data[].Name'
// populate the projectNames map with:
// 'project_id' = 'project_name'
for _, p := range projectsRes.Data {
projectNames[p.ID] = normalizeLabel(p.Name)
}
// catch each item from the 'Response.Data[]'
for _, dataItem := range costsRes.Data {
// catch each item from the 'Response.Data[].Results[]'
for _, result := range dataItem.Results {
// get 'Response.Data[].Results[].ProjectID'
// i.e. 'proj_123'
id := result.ProjectID
// get 'Response.Data[].Results[].Amount.Value'
amount := result.Amount.Value
// use the 'id' to get the project name from the projectNames map
project := projectNames[id]
if project == "" {
project = "unknown"
}
// print in VictoriaMetrics gauge format
//fmt.Printf("openai_stats{type=\"costs\", project=\"%s\"} %f\n", project, amount)
metricName := fmt.Sprintf(`test_openai_stats{type="costs", project="%s"}`, project)
gauge := metrics.GetOrCreateGauge(metricName, nil)
gauge.Set(amount)
}
}
metrics.WritePrometheus(os.Stdout, false)
}
...
Тепер в main() у нас залишається:
...
func main() {
//client := resty.New()
apiKey := os.Getenv("OPENAI_ADMIN_KEY")
timeNow := strconv.FormatInt(time.Now().Unix(), 10)
setQueryParams := map[string]string{
"start_time": timeNow,
"group_by": "project_id",
}
client := resty.New().
SetAuthToken(apiKey).
SetQueryParams(setQueryParams)
// use pointer to ResponseData struct
// as 'json.Unmarshal' requires a pointer to write results
costsRes := &CostsResponseData{}
projectsRes := &ProjectsResponse{}
// will be populated with key:value pairs:
// 'proj_123' = 'kraken_production'
projectNames := make(map[string]string)
fetchAndPush(client, costsRes, projectsRes, projectNames)
}
Запускаємо для перевірки:
$ go run main.go
test_openai_stats{type="costs", project="assistant_test_eval"} 6.3417053
test_openai_stats{type="costs", project="default_project"} 0.6592560500000001
test_openai_stats{type="costs", project="knowledge_base"} 2.17244425
test_openai_stats{type="costs", project="kraken_production"} 0.6170747
Тепер нам треба замість простого виводу на консоль записати дані до VictoriaMetrics.
Запис метрик до VictoriaMetrics з InitPush() та PushMetrics()
Для запису метрик до VictoriaMetrics маємо дві основні функції – InitPush() та PushMetrics().
Функція InitPush()
Функція InitPush() дозволяє виконувати періодичні записи із заданим interval, а PushMetrics() – просто разово записати всі метрики, які є в Set struct. Про Set трохи далі.
Тепер просто інтересу заради розберемо, як саме VictoriaMetrics клієнт виконує запис.
ми в нашому коді викликаємо InitPush(), передаємо до цієї функції URL та інтервал
InitPush() створює змінну writeMetrics – анонімну функцію, яка приймає аргумент типу io.Writer, і яка потім буде викликати функцію WritePrometheus(), в яку передається цей io.Writer
далі викликається функція InitPushExt(), якій передається pushURL, interval, та об’єкт writeMetrics
Тут просто додаються параметри зі структури PushOptions, в яку можемо передати параметри типу extraLabels, і потім викликається InitPushExtWithOptions(), в яку передається наш writeMetrics.
Дивимось InitPushExtWithOptions(): тут створюється goroutine, яка із заданим interval викликає pushMetrics(), в яку передається наш об’єкт writeMetrics (тобто та анонімна функція, яка буде викликати WritePrometheus()):
В свою чергу pushMetrics() створює буфер bytes.Buffer, передає його до writeMetrics(), writeMetrics() викликає WritePrometheus(), яка отримує цей буфер:
// NewSet creates new set of metrics.
//
// Pass the set to RegisterSet() function in order to export its metrics via global WritePrometheus() call.
func NewSet() *Set {
return &Set{
m: make(map[string]*namedMetric),
}
}
Тобто, при виклику NetGauge() ми передаємо аргумент з іменем метрики, NetGauge() викликає NewSet(), передає цю метрику, а NewSet() виконує ініціалізацію структури Set, в поле namedMetric задаючи нашу метрику.
Функція PushMetrics()
Ну а з PushMetrics() все майже аналогічно – створюється writeMetrics, викликається PushMetricsExt():
Отже, що нам треба зробити зараз – це замість WritePrometheus() викликати PushMetrics().
Створення context та виклик PushMetrics()
Для PushMetrics() потрібно передати context, який керує goroutines і завершує їх або по таймауту, або якщо сама програма отримала від системи сигнали SIGTERM чи SIGKILL.
Детальніше про context трохи далі, поки просто додаємо import "context", в main() створюємо пустий контекст з Background():
...
import (
"context"
...
func main() {
...
// will be populated with key:value pairs:
// 'proj_123' = 'kraken_production'
projectNames := make(map[string]string)
ctx := context.Background()
...
В нашій функції fetchAndPush() додаємо параметр з типом context.Context:
Потім можна переробити на виклик раз на годину – s.Every(1).Hour().Do( ... ), або на початку кожної години – s.Cron("0 * * * *").Do( ... ).
І в кінці запускаємо крон зі StartBlocking(), який блокує завершення самої функції main().
Відкриваємо доступ до VictoriaMetrics в Kubernetes:
$ kk -n ops-monitoring-ns port-forward svc/vmsingle-vm-k8s-stack 8428
Запускаємо наш експортер:
$ go run main.go
test_openai_stats{type="costs", project="assistant_test_eval"} 6.501765299999999
test_openai_stats{type="costs", project="default_project"} 0.6592560500000001
test_openai_stats{type="costs", project="knowledge_base"} 2.17411225
test_openai_stats{type="costs", project="kraken_production"} 0.6471627999999999
^Csignal: interrupt
І перевіряємо дані вже у VictoriaMetrics:
Правда, тут з’явився якийсь “unknown” проект, треба буде додати логування.
Що ще треба поправити:
зараз ініціалізація структур CostsResponseData та ProjectsResponse виконується в main(), і потім при кожному виклику fetchAndPush() в них записуються дані
якщо проект видалиться з OpenAI – він залишиться в структурах, і ми будемо продовжувати писати метрики для проекту, якого вже нема
треба винести в саму fetchAndPush() і просто кожного разу заповнювати їх з нуля
аналогічно з projectNames – перенести ініціалізацію в саму fetchAndPush()
SetQueryParams – зараз передається однаково для обох викликів getOpenAi(), але в /organization/projects нема параметра group_by
в метриці лейблу type="" краще замінити на category=""
додати external lablels – щось типу “job="openai-exporter"“
замість використання panic(err) – записувати в лог, повертати помилку до викликаючої функції і обробляти там
додати коректну обробку сигналів SIGTERM та SIGINT
resty.client вміє виконувати retry при помилках, треба додати SetRetryCount() і SetRetryWaitTime()
ну і додати логи виконання і помилок
Створення Golang context
Під час роботи в нашому коді запускається кілька одночасних операцій – з gocron.NewScheduler() ми запускаємо виконання нашої функції fetchAndPush(), в ній у нас запускаються HTTP-запити з resty.Client.Get(), у VictoriaMetrics запускаються виклики для запису до VictoriaMetrics endpoint.
Аби все це діло коректно завершити, а не просто “вбити” під час отримання SIGINT або SIGTERM – Go дозволяє нам керувати процесом завершення наших функцій і goroutines через context виконання.
Інший приклад, коли нам треба керувати виконанням операції – це задати ліміт на час виконання, як це, наприклад, зроблено в VictoriaMetrics у функції InitPushExtWithOptions():
...
go func() {
ticker := time.NewTicker(interval)
defer ticker.Stop()
stopCh := ctx.Done()
for {
select {
case <-ticker.C:
ctxLocal, cancel := context.WithTimeout(ctx, interval+time.Second)
err := pc.pushMetrics(ctxLocal, writeMetrics)
...
Тут виконання pc.pushMetrics() обмежено interval, який передається при виклику InitPush().
При цьому context виконання включає в себе не тільки обробку сигналів і керування життєвим циклом функцій і goroutine, но і всю пов’язану з цим виконанням інформацію:
Package context defines the Context type, which carries deadlines, cancellation signals, and other request-scoped values across API boundaries and between processes
Я виніс описання роботи context окремою частиною, бо дуже цікавий механізм, а зараз просто давайте його додамо в наш код.
Отже, що нам треба:
створити context
створити “перехоплювач сигналів” SIGINT (Ctrl+C) та SIGTERM (сигнал від операційної системи, коли виконання програми завершується, наприклад – коли kubelet зупиняє контейнер)
відправити сигнал зупинки всім дочірнім функціям і goroutines
Далі описуємо запуск gocron.NewScheduler(), а в кінці main() запускаємо створення та читання з каналу:
...
// block until Ctrl+C cancels rootCtx
<-rootCtx.Done()
}
Як тільки NotifyContext() отримає SIGTERM – він закриє канал rootCtx.Done(), після чого каскадно закриються канали всіх дочірніх контекстів, потім всі дочірні goroutines, що слухають ці контексти, завершать роботу, і main() зможе коректно завершитись.
resty.client теж вміє працювати з context через SetContext(), йому передаємо наш rootCtx при виклику if err := getOpenAI(ctx, ... ) {...}.
Фінальний результат
Після всіх правок весь код експортеру тепер виглядає так:
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"
"github.com/VictoriaMetrics/metrics"
"github.com/go-co-op/gocron"
"github.com/go-resty/resty/v2"
)
const (
// base URL of the OpenAI Admin API
baseURL = "https://api.openai.com/v1"
// endpoints that we call
costsPath = "/organization/costs"
projectsPath = "/organization/projects"
// VictoriaMetrics push endpoint (Prometheus remote write format)
pushURL = "http://localhost:8428/api/v1/import/prometheus"
)
// structure describing the JSON for costs API
// resty will unmarshal into this struct automatically
type CostsResponseData struct {
Data []struct {
Results []struct {
ProjectID string `json:"project_id"`
Amount struct {
Value float64 `json:"value"`
} `json:"amount"`
} `json:"results"`
} `json:"data"`
}
// structure describing the JSON for projects API
// used to map project_id → readable project name
type ProjectsResponse struct {
Data []struct {
ID string `json:"id"`
Name string `json:"name"`
} `json:"data"`
}
// normalizeLabel converts a project name into a Prometheus-safe label
// - lowercases
// - replaces spaces with underscores
// - replaces slashes to avoid label parser issues
func normalizeLabel(s string) string {
s = strings.ToLower(s)
s = strings.ReplaceAll(s, " ", "_")
s = strings.ReplaceAll(s, "/", "_")
return s
}
// getOpenAI performs a GET request to the OpenAI Admin API
// and unmarshals the returned JSON into the 'out' structure.
//
// ctx: allows cancellation (we pass rootCtx so Ctrl+C cancels requests)
// client: the resty client with authentication
// path: "/organization/costs" or "/organization/projects"
// params: optional query parameters
func getOpenAI(ctx context.Context, client *resty.Client, path string, params map[string]string, out any) error {
// create HTTP request object
req := client.R().
SetContext(ctx). // attach context so cancellation works
SetResult(out) // register target structure for unmarshalling JSON
// set optional query parameters
if params != nil {
req.SetQueryParams(params)
}
// execute HTTP GET request
if _, err := req.Get(baseURL + path); err != nil {
return fmt.Errorf("request to %s failed: %w", path, err)
}
return nil
}
// fetchAndPush performs one exporter cycle:
//
// 1. fetch costs grouped by project_id
// 2. fetch readable project names
// 3. build project_id → normalized_name map
// 4. create/update Prometheus gauges
// 5. push all metrics to VictoriaMetrics
//
// ctx: the root context (cancelled when Ctrl+C is pressed)
func fetchAndPush(ctx context.Context, client *resty.Client) error {
// create fresh response holders for every iteration
costsRes := &CostsResponseData{}
projectsRes := &ProjectsResponse{}
projectNames := make(map[string]string)
// build query parameters for costs API
// start_time: current timestamp (Unix)
// group_by: instruct API to group costs per project_id
timeNow := strconv.FormatInt(time.Now().Unix(), 10)
costParams := map[string]string{
"start_time": timeNow,
"group_by": "project_id",
}
// fetch costs data
if err := getOpenAI(ctx, client, costsPath, costParams, costsRes); err != nil {
return fmt.Errorf("fetch costs: %w", err)
}
// fetch project definitions
if err := getOpenAI(ctx, client, projectsPath, nil, projectsRes); err != nil {
return fmt.Errorf("fetch projects: %w", err)
}
// fill map: project_id → normalized_label
for _, p := range projectsRes.Data {
projectNames[p.ID] = normalizeLabel(p.Name)
}
// process returned costs
for _, dataItem := range costsRes.Data {
for _, result := range dataItem.Results {
id := result.ProjectID
amount := result.Amount.Value
// resolve project readable name
project := projectNames[id]
if project == "" {
project = "unknown"
}
metricName := fmt.Sprintf(
`openai_stats{project="%s",category="costs"}`,
project,
)
// get or create gauge
gauge := metrics.GetOrCreateGauge(metricName, nil)
// update gauge value
gauge.Set(amount)
// log written metric
log.Printf("metric updated: name=%s value=%f", metricName, amount)
}
}
// push metrics with job="openai_exporter"
pushOpts := &metrics.PushOptions{
ExtraLabels: `job="openai_exporter"`,
}
// push all collected metrics
if err := metrics.PushMetrics(ctx, pushURL, false, pushOpts); err != nil {
return fmt.Errorf("push metrics: %w", err)
}
return nil
}
func main() {
// create a context that automatically cancels on OS signals (Ctrl+C, kill, SIGTERM)
//
// how it works:
// - signal.NotifyContext wraps the parent context and subscribes it to OS signals
// - when the program receives Ctrl+C (SIGINT) or SIGTERM:
// Go internally calls rootCancel()
// the context's Done() channel is closed
// - all goroutines waiting on <-rootCtx.Done() are instantly unblocked
// - any operation bound to this context (HTTP requests, timeouts, jobs)
// receives ctx.Err()==context.Canceled and stops gracefully
//
// practically:
// - main goroutine waits for <-rootCtx.Done()
// - when Ctrl+C arrives => rootCtx.Done() closes => program starts graceful shutdown
//
// 'defer rootCancel()' is used to clean up internal signal resources when main() exits normally
rootCtx, rootCancel := signal.NotifyContext(
context.Background(),
os.Interrupt,
syscall.SIGTERM,
)
defer rootCancel()
// load OpenAI admin API key
apiKey := os.Getenv("OPENAI_ADMIN_KEY")
if apiKey == "" {
log.Fatal("OPENAI_ADMIN_KEY is not set")
}
// create resty client with:
// - bearer token
// - automatic retries (3 attempts)
client := resty.New().
SetAuthToken(apiKey).
SetRetryCount(3).
SetRetryWaitTime(2 * time.Second)
// create scheduler using local timezone
s := gocron.NewScheduler(time.Local)
// register a job that runs every 1 minute
s.Every(1).Minute().Do(func() {
start := time.Now()
log.Println("starting fetch-and-push cycle")
// run our exporter cycle
if err := fetchAndPush(rootCtx, client); err != nil {
log.Println("ERROR during fetchAndPush:", err)
return
}
log.Println("fetch-and-push completed in", time.Since(start))
})
log.Println("starting scheduler...")
// run scheduler in background goroutine
s.StartAsync()
// block until Ctrl+C cancels rootCtx
<-rootCtx.Done()
log.Println("received Ctrl+C, stopping scheduler...")
// shutdown scheduler gracefully
s.Stop()
log.Println("scheduler stopped, exiting")
}
Ті самі 6.95 долари, що ми бачимо у VictoriaMetrics від нашого експортеру.
Можна б ще покращити код, наприклад розбити велику функцію fetchAndPush(), і треба додати передачу URL до VictoriaMetrics зі змінних оточення, але поки поживемо з таким варіантом.
Bonus: як працює контроль виконання через Golang context
Ми в нашій функції fetchAndPush() використовуємо metrics.PushMetrics(), передавши йому контекст.
Але для кращої картини – давайте знову повернемося до InitPush(), бо там використання context більш явне.
Отже, InitPush() викликає InitPushExt(), а InitPushExt() викликає InitPushExtWithOptions(), якому передає пустий context.Background() – return InitPushExtWithOptions(context.Background() ...).
В InitPushExtWithOptions() запускається goroutine, go func() {}, в якій створюється локальний context :
таким чином timerCtx тепер має доступ до всіх методів структури cancelCtx
далі WithDeadlineCause() перевіряє умову if dur <= 0 і, і якщо час виконання завершився, то:
викликає c.cancel(true, DeadlineExceeded, cause)
повертає “return c, func() { c.cancel(false, Canceled, nil) }“, яка повертається до InitPushExtWithOptions() в частині ctxLocal, cancel := context.WithTimeout() і "func() { c.cancel() }" і стає cancel()
c.cancel() – це метод структури timerCtx – func (c *timerCtx) cancel(), який викликає c.cancelCtx.cancel()
а c.cancelCtx.cancel() – це метод структури cancelCtx – func (c *cancelCtx) cancel(), який викликає d, _ := c.done.Load().(chan struct{})
і викликає close(d)
Ось тут:
func (c *cancelCtx) cancel(removeFromParent bool, err, cause error) {
...
d, _ := c.done.Load().(chan struct{})
if d == nil {
...
} else {
close(d)
}
...
c.done.Load() – викликається з поля done структури cancelCtx:
type cancelCtx struct {
...
done atomic.Value // of chan struct{}, created lazily, closed by first cancel call
...
}
Тобто, в d, _ := c.done.Load().(chan struct{}) викликається Load(), в (chan struct{}) виконується type assertion, тобто перевіряється, що це тип chan struct{}, після чого d стає chan struct{}, після чого виконується close(channel).
А close() – це вбудована функція Go, яка закриває отриманий аргументом канал.
Як тільки канал Done() закривається – всі goroutines, які виконують <-ctx.Done(), миттєво пробуджуються і можуть коректно завершити свою роботу.
В InitPushExtWithOptions() це виконується тут:
go func() {
...
stopCh := ctx.Done()
...
case <-stopCh:
...
return
}
}
}()
Закриття каналу – це читання нульового значення, що призводить до спрацювання умови case => що призводить до завершення циклу через виклик return => що призводить до завершення всієї go func() {}.
Окей.
А звідки канал взявся?
Відкриття каналу
Для того, аби функція чи рутина постійно “слухали” цей канал в очікуванні його закриття – ми викликаємо Done():
...
go func() {
ticker := time.NewTicker(interval)
defer ticker.Stop()
stopCh := ctx.Done()
...
case <-stopCh:
if wg != nil {
wg.Done()
}
return
}
...
Інтерфейси в Go дозволяють описати доступ до даних або методів без створення самих реалізацій в цих інтерфейсах.
Таким чином ми створюємо “загальну шину”, яку далі можемо використовувати для “підключення” зовнішніх “систем”.
Тобто інтерфейс – це абстракція, яка описує доступ до іншого типу, але конкретна реалізація цієї поведінки вже буде залежати від того, що саме ми підключимо до інтерфейсу.
Взагалі хотів просто написати пост при використання інтерфейсів, але натомість вийшов пост про те, як інтерфейси реалізовані взагалі, і про “магію” того, як через них відбувається виклик даних.
Насправді спочатку було б добре написати про pointers та методи в Go, бо в цьому матеріалі саме на них побудоване все пояснення роботи інтерфейсів, але це вже іншим разом.
Найпростіший приклад інтерфейсу – коли в ньому не задається ані метод, ані тип даних, який цей метод повертає.
Використовуючи такий інтерфейс ми можемо створити функцію, яка буде приймати будь-який тип даних – бо інакше при оголошенні функції нам треба вказати тип даних, який вона приймає в параметрі:
package main
import "fmt"
// define an empty interface
// it can hold a value of any type
type Any interface{}
func printValue(v Any) {
// print the value
fmt.Println("Value:", v)
}
func main() {
// pass int
printValue(42)
// pass string
printValue("hello")
// pass float
printValue(3.14)
// pass slice
printValue([]int{1, 2, 3})
}
Без використання інтерфейсу – нам довелось би створювати окремі функції для кожного типу, який ми хочемо передати, або, як альтернатива – використовувати generics.
Інший варіант створення порожнього інтерфейсу – це використання типу any, який по факту є аліасом на interface{}:
...
type MyAny any
func printValue(v Any) {
// print the value
fmt.Println("Value:", v)
}
...
Interfaces та Methods
Якщо “порожній” інтерфейс any каже “я приймаю будь-яке значення”, то “класичний” інтерфейс каже: “мене цікавить лише певна поведінка”.
Ця поведінка описується через набір сигнатур методів (method signatures), які тип має реалізувати, щоб відповідати інтерфейсу
Коли ми створюємо змінну інтерфейсного типу або передаємо значення у функцію, що приймає інтерфейс, Go перевіряє, чи тип реалізує всі методи цього інтерфейсу, і створює зв’язок між ними.
І потім ми, використовуючи цей інтерфейс, можемо викликати пов’язані з ним методи.
Тобто інтерфейс – це посередник, який дозволяє викликати метод незалежно від конкретного типу.
Наприклад:
package main
import "fmt"
// define an interface 'MyInterface' with a single method 'MyMethod' returning a string
type MyInterface interface {
MyMethod() string
}
// define 'MyStruct' struct with a 'MyField' field
type MyStruct struct {
MyField string
}
// define 'MyMethod' method for the 'MyStruct' struct
// this makes 'MyStruct' implicitly implement 'MyInterface'
// 'MyMethod' method uses 'MyStruct' as the receiver, so this method is tied to the 'MyStruct' type
func (receiver MyStruct) MyMethod() string {
return "Executing " + receiver.MyField
}
// define a function 'sayHello()' which accepts any type that implements 'MyInterface'
// and prints the value returned by its 'MyMethod'
func sayHello(g MyInterface) {
fmt.Println(g.MyMethod())
}
func main() {
// create an instance of MyStruct
myObj := MyStruct{MyField: "Hello, Interface!"}
// pass the MyStruct instance to the function.
// this works because MyStruct implements MyInterface.
sayHello(myObj)
}
Тут ми:
оголошуємо власний інтерфейсний тип з іменем MyInterface
цей інтерфейс описує одну сигнатуру методу – MyMethod(), і цей метод має повертати дані з типом string
створюємо власний тип даних MyStruct з типом struct, в якому є одне поле MyField з типом string
до цієї структури “прив’язуємо” функцію MyMethod() – через вказання ресивера (receiver MyStruct), завдяки чому MyStructреалізуєінтерфейсMyInterface
описуємо нашу “основному робочу” функцію sayHello(), яка аргументом приймає інтерфейс і викликає метод MyMethod(), який є в цьому інтерфейсі
створюємо інстанс нашого типу даних MyStruct, якому в поле MyField записуємо значення “Hello, Interface!“
і викликаємо нашу робочу функцію, передаючи аргументом цю структуру
Постарався відобразити зв’язки між всіма об’єктами, бо вони дуже не явні, вийшло щось таке:
створюємо об’єкт myObj з типом MyStruct
викликаємо sayHello(), передаючи аргументом myObj, який всередині функції sayHello() стає змінною g, яка пов’язується з нашим інтерфейсом MyInterface, який надає доступ до методу MyMethod()
в функції sayHello() через виклик g.MyMethod() ми звертаємось до інтерфейсу MyInterface, кажучи “мені потрібен твій метод MyMethod()“
інтерфейс MyInterface “бачить”, що всередині нього зараз схований об’єкт myObj (типу MyStruct), тому він перенаправляє цей виклик саме до методу цієї конкретної структури
Окей – тепер картина стає більш зрозумілою.
Окрім одного моменту – як саме інтерфейс “бачить”, що “в ньому” є об’єкт myObj з методом MyMethod()?
The interface’s “magic”: type iface struct
Для того, аби розібратись з цим – трохи зануримось в магію вказівників (pointers), а саме – створимо власну структуру, яка буде копіювати те, як в type MyInterface interface структуровані дані.
А потім через вказівники – подивимось на адреси і зміст даних.
“Трохи” перепишемо наш код:
package main
import (
"fmt"
"unsafe"
)
// define MyInterface interface
// (same as before)
type MyInterface interface {
MyMethod() string
}
// define MyStruct struct
// (same as before)
type MyStruct struct {
MyField string
}
// define MyMethod method with a POINTER receiver
// - before was func (p MyStruct) MyMethod() ... - by value
// - now is func (p *MyStruct) MyMethod() ... - by pointer
// This means the method operates on the original data.
func (p *MyStruct) MyMethod() string {
return "Executing " + p.MyField
}
// This is the helper struct to inspect an interface
// It represents the internal memory layout of an interface variable
// the 'tab' has a table with information about the interface's type and methods
//
// type iface struct {
// // pointer to the 'itab' struct, see below
// tab *itab
// // here will be a pointer to the 'myObj' struct
// data unsafe.Pointer
// }
//
// type itab struct {
// // pointer to the 'type MyInterface interface'
// inter *interfacetype
// // pointer to the 'type MyStruct struct'
// typ *rtype
// // in our case we have 1 method, thus '[N]uintptr' == [1]uintptr
// // and in the 'fun[0]' will be the address of the method 'MyMethod'
// fun [N]uintptr // will have '[1]uintptr', and
// }
type ifaceStruct struct {
// Pointer to type/method info table
tab unsafe.Pointer
// Pointer to the actual data
// in our case, here will be a pointer to the 'myObj' struct
data unsafe.Pointer
}
// HERE IS THE "MAGIC"
// We modify sayHello to inspect the `g` it receives.
//
// 'g' is a new, local variable of the 'MyInterface' type.
// When the function is called, `myObj` is assigned to `g`.
//
// Because 'g' is an interface, it internally consists of two pointers:
// 1. tab: A pointer to the "interface table" (itab) that links
// the interface type (MyInterface) to the concrete type (*MyStruct)
// and stores pointers to the methods that satisfy the interface
// 2. data: A pointer to the actual data. In our case, this will be
// the pointer we passed in (`myObj`).
func sayHello(g MyInterface) {
fmt.Println("Inside sayHello()")
// Get the address of `g` and cast it to our helper struct 'ifaceStruct'
// This line does three things in one go:
// 1. &g - takes the memory address of our interface variable `g`
// 2. unsafe.Pointer(&g) - casts that address to a raw, untyped pointer
// 3. (*ifaceStruct)(...) - re-interprets that raw pointer as a pointer to our helper struct
// As a result, `g_internal` is now a `*ifaceStruct` that points to
// the exact same memory location as `g`, letting us access its .tab and .data fields.
g_internal := (*ifaceStruct)(unsafe.Pointer(&g))
fmt.Printf("Internal 'Type' pointer (tab): %p\n", g_internal.tab)
fmt.Printf("Internal 'Data' pointer (data): %p\n", g_internal.data)
fmt.Println("Result:", g.MyMethod())
}
func main() {
// Create the object and get a pointer to it
// 'myObj' now holds a pointer to a MyStruct instance in memory
myObj := &MyStruct{MyField: "Hello, Interface!"}
// Print location of the 'myObj' struct
fmt.Println("Inside main()")
fmt.Printf("Address of the original 'myObj' in main(): %p\n", myObj)
// Pass the pointer to the function
// i.e. we pass an address of the 'myObj' struct location
sayHello(myObj)
}
Запускаємо:
$ go run interface-details.go
Inside main()
Address of the original 'myObj' in main(): 0xc000014070
Inside sayHello()
Internal 'Type' pointer (tab): 0x4e5a28
Internal 'Data' pointer (data): 0xc000014070
Result: Executing Hello, Interface!
В коментах розписав все детально, але по суті коротко ми:
у виклику sayHello(myObj) до функції sayHello() передаємо адресу “0xc000014070” – посилання на структуру MyStruct з полем MyField, в яке записане значення “Hello, Interface!“
функція sayHello() приймає аргумент типу інтерфейс, і змінна g містить два вказівники – tab (на структуру itab, яка зберігає інформацію про тип і методи) та data (на значення типу MyStruct)
А сама цікава магія відбувається під час компіляції програми і створення структури itab:
Go перевіряє методи в коді, знаходить структуру MyStruct з методом MyMethod()
перевіряє інтерфейси, і знаходить MyInterface, який вимагає метод MyMethod() string
перевіряє, що MyStruct.MyMethod() та MyInterface.MyMethod() збігаються
створює таблицю інтерфейсу (itab – interface table), яка пов’язує MyStruct з MyInterface і зберігає адреси методів, що реалізують інтерфейс
І далі під час виконання програми під час виклику sayHello(myObj) Go створює нову змінну g типу iface, у якій ці два вказівники (tab та data) поєднуються:
вказівник на itab (яку компілятор створив для пари MyStruct + MyInterface) буде поміщено в g.tab
вказівник на myObj (тобто адреса типу “0xc000014070“) буде поміщено в g.data
В результаті в g.tab у нас буде структура itab – в полі fun[0] якої буде адреса функції MyMethod(), а в g.data – буде вказівник на екземпляр MyStruct з полем MyField.
І тоді при виклику:
...
fmt.Println("\nResult:", g.MyMethod())
...
Ми запускаємо:
...
return "Executing " + *MyStruct.MyField
...
Наостанок – можна ще вивести і саму itab, аналогічно тому, як зробили для самого інтерфейсу, через створення власної структури type itabStruct struct:
package main
import (
"fmt"
"unsafe"
)
// define MyInterface interface
type MyInterface interface {
MyMethod() string
}
// define MyStruct struct
type MyStruct struct {
MyField string
}
// define MyMethod method with a POINTER receiver
func (p *MyStruct) MyMethod() string {
return "Executing " + p.MyField
}
// This helper represents the interface value itself (the 2-word struct)
type ifaceStruct struct {
// Pointer to the 'itab' (interface table)
tab unsafe.Pointer
// Pointer to the actual data (our *MyStruct)
data unsafe.Pointer
}
// NEW
// This helper represents the internal 'runtime.itab' struct
type itabStruct struct {
// inter: Pointer to the interface type's definition (MyInterface)
inter unsafe.Pointer
// typ: Pointer to the concrete type's definition (*MyStruct)
typ unsafe.Pointer
// hash: Hash of the concrete type, used for lookups
hash uint32
// _ [4]byte: Padding (on 64-bit systems)
_ [4]byte
// fun: The method dispatch table - an array of function pointers
// Each entry corresponds to a method defined in the interface
// Here we have one entry: the address of MyStruct.MyMethod()
fun [1]uintptr
}
// HERE IS THE "MAGIC"
func sayHello(g MyInterface) {
fmt.Println("--- Inside sayHello() ---")
// 1. Get the address of `g` and cast it to our helper struct
g_internal := (*ifaceStruct)(unsafe.Pointer(&g))
// Print the two main pointers
fmt.Printf("g.tab (pointer to itab): %p\n", g_internal.tab)
fmt.Printf("g.data (pointer to myObj): %p\n", g_internal.data)
// NEW - 2. DE-REFERENCE THE 'tab' POINTER
// Cast the 'tab' pointer to our itabStruct pointer
itab_ptr := (*itabStruct)(g_internal.tab)
// NEW - 3. PRINT THE CONTENTS OF THE 'itab'
fmt.Println("\n--- Inspecting the 'itab' (at address g.tab) ---")
fmt.Printf("itab.inter (ptr to MyInterface info): %p\n", itab_ptr.inter)
fmt.Printf("itab.typ (ptr to *MyStruct info): %p\n", itab_ptr.typ)
fmt.Printf("itab.hash (hash of *MyStruct type): %x\n", itab_ptr.hash)
// This is the final link!
// This is the actual memory address of the function to be called, i.e. the 'g.MyMethod()' in this case
fmt.Printf("itab.fun[0] (ADDRESS OF THE METHOD): 0x%x\n", itab_ptr.fun[0])
// 4. Call the method as usual
fmt.Println("\nResult:", g.MyMethod())
}
func main() {
// Create the object and get a pointer to it
myObj := &MyStruct{MyField: "Hello, Interface!"}
fmt.Println("--- Inside main() ---")
fmt.Printf("Address of original 'myObj' in main(): %p\n", myObj)
// Pass the pointer to the function - Go will create an 'iface' value
// linking the interface 'MyInterface' with the concrete type *MyStruct.
sayHello(myObj)
}
Результат:
$ go run interface-details-3.go
--- Inside main() ---
Address of original 'myObj' in main(): 0xc00019a020
--- Inside sayHello() ---
g.tab (pointer to itab): 0x4e6c08
g.data (pointer to myObj): 0xc00019a020
--- Inspecting the 'itab' (at address g.tab) ---
itab.inter (ptr to MyInterface info): 0x4a9d80
itab.typ (ptr to *MyStruct info): 0x4a86e0
itab.hash (hash of *MyStruct type): 1ac3179f
itab.fun[0] (ADDRESS OF THE METHOD): 0x499c40
Result: Executing Hello, Interface!
Тобто, коли ми викликаємо g.MyMethod(), Go бере адресу функції з itab.fun[0] і викликає її, передаючи їй як аргумент вказівник з g.data – от і вся “магія” динамічного виклику методів через інтерфейс.
Ну і тепер можна використовувати інтерфейси, вже маючи уявлення про те, як саме вони працюють.
Прилетів мені один з дефолтних алертів VictoriaMetrics, які створюються під час деплою Helm-чарту victoria-metrics-k8s-stack:
Думав написати коротенький пост типу “що таке Churn Rate і як його пофіксати”, але в результаті вийшло доволі глибоко зануритись в те, як взагалі VictoriaMetrics працює з даними – і це виявилось дуже цікавою темою.
Давайте спочатку коротко розберемо що таке “метрика” і тайм-серія взагалі, і потім подивимось як вони впливають на ресурси системи – CPU, пам’ять та диск.
Metric vs Time Series vs Sample
Всі ми маємо справу з метриками в моніторингу – будь то Prometheus, чи VictoriaMetrics, чи InfluxDB, і ці метрики ми потім використовуємо в наших дашбордах Grafana або в алерт-рулах VMAlert.
Але що таке власне “метрика”? А що таке тайм-серія, sample чи data point? І як кількість різних значень однієї label для метрики впливає на використання диску та пам’яті?
Бо, наприклад, я в постах зазвичай просто використовую слово “метрика”, бо в 99% цього достатньо, аби описати об’єкт, про який йде мова.
Але для повноцінної роботи з системами моніторингу треба добре уявляти різницю між цими поняттями.
Що таке Metric?
Метрика (Metric): що вимірюється
Наприклад – cpu_usage, memory_free, http_requests_total, database_connections.
В документації VictoriaMetrics є дуже точний вираз – це як імена змінних, через які ми передаємо дані, див. Structure of a metric.
Метрика має власне ім’я, та опціонально набір labels (лейбл або тегів), які дозволяють додати більше контексту для конкретного вимірювання – але без значень цих лейбл.
Крім того, лейбли впливають на те, як дані по ції метриці будуть зберігатись і шукатись.
Тобто метрика – це “схема”, яка описує що ми вимірюємо, та за якими ознаками (лейблами) можемо групувати дані.
Приклад:
Metric: "cpu_usage{server, core}"
Тут:
ім’я метрики: cpu_usage
ім’я label: server
ім’я label: core
Що таке Time Series?
Таймсерія (Time Series): послідовність даних
Це повна послідовність записів, які згруповані для конкретної метрики та її labels зі значеннями – тобто набору metric_name{label_name="label_value"}, і які впорядковані за часом.
для таймсерії cpu_usage{server="web01", core="0"} маємо чотири семпла:
1753857852, 75.5
1753857912, 76.2
1753857972, 74.8
1753858032, 73.1
І дані за весь період спостережень по кожній унікальній комбінації cpu_usage{server="some_server", core="some_core"} будуть формувати одну і ту ж таймсерію, навіть якщо ці дані збираються роками – допоки не зміниться значення або в server, або в core.
High Cardinality vs High Churn rate
Обидві проблеми мають однакове “походження”, але трохи відрізняються по суті.
High cardinality – це “persistent проблема”, яка впливає на зберігання, індексацію та пошук даних.
Вона виникає, коли у нас є багато унікальних комбінацій лейблів, навіть якщо значення самих метрик надходять рідко або перестають надходити.
Це призводить до великої кількості живих та неактивних серій, що збільшує розмір IndexDB, використання памʼяті та час пошуку. Про IndexDB детальніше будемо говорити далі.
High churn rate – це “online проблема”, коли у нас постійно створюються нові тайм-серії через зміну значень лейблів, особливо короткоживучих або динамічних (як у Kubernetes – pod_name, container_id, job_id, або щось типу client_ip).
Це створює великий потік нових записів у IndexDB, завантажуючи CPU, пам’ять, та диск.
“Життя метрики”
Є дуже класне відео, яке побачив багато років тому – The Inner Life of the Cell, чомусь воно тут згадалось.
Аби зрозуміти як кількість лейбл (точніше – значення в них) впливають на розмір даних в системі і на використання CPU та пам’яті – давайте подивимось як у VictoriaMetrics взагалі відбувається весь процес “під капотом”.
Там 7 частин, і для дійсно “глибокого занурення” у внутрішню архітектуру VictoriaMetrics дуже рекомендую їх прочитати.
Але зараз ми відносно швидко пройдемося по процесу додавання нових даних і їхньому пошуку, і більше сконцентруємось саме на питанні Churn Rate.
“Write-path”: vminsert та vmstorage
Отже – почнемо з початку: vmagent збирає метрики з експортерів, і далі ці дані через vminsert треба записати до vmstorage.
У випадку vmsingle у на всі компоненти працюють в одному процесі, але для кращої картини – давайте їх розділяти.
vminsert збирає дані до себе в пам’ять, після чого відправляє до vmstorage блоками до 100 мегабайт.
На початку кожного блоку від vminsert задається загальний розмір блоку, після чого vmstorage починає зчитувати дані в ньому блоками по 24+n байт, строкам (row):
в перших 8 байтах вказується розмір n – розмір наступного сектору, який містить в собі ім’я метрики та її лейбли
другий сектор – ці n байт з іменем метрики і лейблами
третій сектор розміром 8 байт містить в собі значення семпла (“75.5” з прикладів вище)
четвертий містить Timestamp, ще 8 байт
В результаті формується row із 8*3 байт (24) + n байт, де n – це довжина імені метрики і її лейбл.
vmstorage формує власні блоки, в кожному максимум 10,000 строк:
vmstorage, IndexDB та TSID
Після чого починає сама цікава магія – це Time Series ID, або TSID.
Для кожної унікальної комбінації метрика+лейбли+значення лейбл VictoriaMetrics має власний унікальний ідентифікатор, який використовується для збереження даних та при подальшому пошуку даних.
Сам TSID це ідентифікатор (див type TSID struct), суто внутрішній механізм самої VictoriaMetrisc, який ми, нажаль, ніде побачити не можемо:
// TSID is unique id for a time series.
//
// Time series blocks are sorted by TSID.
type TSID struct {
MetricGroupID uint64
JobID uint32
InstanceID uint32
// MetricID is the unique id of the metric (time series).
//
// All the other TSID fields may be obtained by MetricID.
MetricID uint64
}
Маючи набір з імені метрики та її тегів (лейбл), vmstorage спершу перевіряє свій TSID Cache. Якщо для ції комбінації ми вже маємо згенерований TSID – використовуємо його.
Якщо в кеші даних нема (значення vm_slow_row_inserts_total росте) – vmstorage звертається до IndexDB, і починає пошук TSID там.
Якщо в IndexDB знайдений TSID – він додається в кеш vmstorage, і процес йде далі:
Якщо ж це абсолютно нові імена метрики і лейбл з їхніми значеннями – генерується новий TSID, який реєструється в кеші vmstorage.
IndexDB зберігає два індекси, в кожному кілька мапінгів між полями та ID, описано в частині How IndexDB is Structured:
1 – Tag to metric IDs (Global index): кожен тег (лейбла) мапиться на ім’я метрики (її ID)
2 – Metric ID to TSID (Global index): ID кожної метрики мапиться на TSID
3 – Metric ID to metric name (Global index): мапінг власне імені метрики на її ID
5 – Date to metric ID (Per-day index): мапінг дат на metric ID для швидкого пошуку по датам (“чи є за цей день дані по цій метриці”)
6 – Date with tag to metric IDs (Per-day index): аналогічний до першого Tag to metric IDs мапінгу, але по датам
7 – Date with metric name to TSID (Per-day index): схожий на другого Metric ID to TSID мапінгу, але по іменам метрик і датам
Ці індекси тримаються як в пам’яті, і періодично записуються на диск (flush) в persistant storage IndexDB в каталог indexdb/, де – як і в каталозі data/, в якому зберігають самі тайм-серії – виконується merge даних для оптимізації зберігання та пошуку.
І повертаючись до питання Churn Rate та High cardinality – кожна окрема метрика+лейбли створюють окремі TSID, для кожної лейбли створюються мапінги в індексах, при великій кількості нових даних, які постійно записуються з пам’яті в диск – частіше викликаються дискові операції – маємо навантаження на CPU, пам’ять, I/O операції диска.
vmstorage та збереження даних на диску
В принципі, саме цікаве ми вже побачили – ролі IndexDB та TSID, але давайте пройдемось по решті процесу.
З отриманих від vminsert даних прочитали дані, сформували власні block з rows.
В кожній row vmstorage зберігає вже не ім’я метрики – а її TSID, а для кожного TSID містить записи з values та часом (власне, тайм-серії):
Тут в small “скидаються” дані з in-memory parts, і small потім merge в big parts.
Кожен part містить в собі власний індекс, який відповідає за мапінг даних на timestamps та values:
“Read-path”: пошук даних з vmselect та vmstorage
Коли ж ми робимо пошук по даним – то vmselect передає до vmstorage запит з метрикою, лейблами (тегами) та датою, за яку треба виконати пошук.
vmstorage в IndexDB по tag to metric IDs знаходить відповідні MetricIDs – для всіх метрик, які має цей тег.
Далі по Metric ID IndexDB в записах metric ID to TSID знаходить відповідні TSID, які повертає до vmstorage.
Маючи TSID – vmtorage вже перевіряє in-memory, small та big parts, шукаючи потрібний TSID в файлах metaindex.bin.
А знайшовши потрібний metadata.bin – він читає відповідний index.bin, який вже каже в яких строках timestamp.bin та values.bin знайти потрібні дані, які потім повертаються до vmselect.
Практичний приклад: запис 10,000 метрик і 10,000 labels
Це все цікаво почитати в теорії – але давайте трохи практики, бо завжди ж цікаво подивитись як воно виглядає в реальності.
Що будемо робити:
запустимо два контейнери з VictoriaMetrics
в кожен через API запишемо 10,000 метрик, але:
в один інстанс для всіх метрик лейбла буде мати однакове значення
в другий інстанс значення label буде постійно змінюватись
А потім глянемо як це вплинуло на розмір даних.
Створюємо директорії:
$ mkdir vm-data-light
$ mkdir vm-data-heavy
Запускаємо два контейнери – vm-light та vm-heavy, кожному підключаємо відповідний каталог – ./vm-data-light та ./vm-data-heavy, кожен слухає власний TCP-порт:
$ du -sh vm-data-light/
76K vm-data-light/
$ du -sh vm-data-heavy/
76K vm-data-heavy/
І кількість файлів в них:
$ find vm-data-light/ -type f | wc -l
5
$ find vm-data-heavy/ -type f | wc -l
5
Всюди все однаково.
Тепер пишемо два скрипти – теж “light” та “heavy”.
Спочатку “light” версія:
#!/usr/bin/env bash
for i in $(seq 1 10000); do
echo "my_metric{label=\"value-1\"} $i" | curl -s \
--data-binary @- \
http://localhost:8428/api/v1/import/prometheus
done
echo "DONE: stable series sent"
Тут в циклі від 1 до 10000 виконуємо запис метрики my_metric{label="value-1"}, але з кожним разом просто збільшуємо значення, яке зберігаємо.
Другий скрипт – “heavy” версія:
#!/usr/bin/env bash
for i in $(seq 1 10000); do
echo "my_metric{label=\"value-$i\"} $i" | curl -s \
--data-binary @- \
http://localhost:8429/api/v1/import/prometheus
done
echo "DONE: high churn series sent"
Він аналогічний, але тут значення змінної $i використовуємо ще і для зміни значення в label – my_metric{label="value-$i"} $i.
Запускаємо тести:
$ bash light.sh
$ bash heavy.sh
І порівнюємо дані.
Розмір даних в data/:
$ du -sh vm-data-light/data/
152K vm-data-light/data/
$ du -sh vm-data-heavy/data/
372K vm-data-heavy/data/
Розмір даних в indexdb/:
$ du -sh vm-data-light/indexdb/
56K vm-data-light/indexdb/
$ du -sh vm-data-heavy/indexdb/
764K vm-data-heavy/indexdb/
Кількість файлів в data/:
$ find vm-data-light/data/ -type f | wc -l
26
$ find vm-data-heavy/data/ -type f | wc -l
26
Кількість файлів в indexdb/:
$ find vm-data-light/indexdb/ -type f | wc -l
8
$ find vm-data-heavy/indexdb/ -type f | wc -l
53
8 vs 53!
Дерево каталогів і файлів в vm-data-light/data/ і vm-data-heavy/data/ буде однаковим, але давайте глянемо на IndexDB.
Там познайомились з InfluxDB в цілому, тепер час будувати з ним реальні рішення.
Що будемо робити – запустимо InfluxDB на Debian, налаштуємо NGINX, імпортуємо дані з Google Sheets в .csv, а потім мігруємо їх до InfluxDB та підключимо Grafana. І додатково трохи пограємось з Python Falsk для створення веб-форми.
Мій “self-monitoring” проект
Власне, для чого я все це роблю: я веду такий собі “self human monitoring” – кожного дня записую в Google Sheets різні показники – як добре спав, який був настрій, наскільки добре голова працювала і багато іншого, загалом там 23 метрики.
Далі це все прямо в Google Sheets виводиться в графіки, де я в будь-який момент можу глянути в який період яке в мене було самопочуття.
Система дуже класна, веду її вже два з половиною роки і активно користуюсь, але є проблема – це візуалізація даних, бо дефолтні графіки в сами гуглотаблицях дуже обмежені.
Минулого року для візуалізації підключав Google Looker Studio, який нативно вміє інтеграцію з Google Sheets – але з ним постійно виникали якісь проблеми, особливо якщо змінювався формат в таблиці типу перейменування колонок, тому згодом я Looker Studio закинув.
І врешті-решт прийшла ідея того, що, камон! Девопс я, ілі тварь дрожащая?
Чому б не використати мої знання в моніторингу інфраструктури в цій справі теж?
Тому вирішив побудувати власний стек моніторингу, де дані будуть зберігатись в InfluxDB.
Взагалі, InfluxDB вибрав, бо трохи погрався і сподобалось як там все з коробки є, але коли почав вже робити дашборди – то поняв, що вона все ж доволі обмежена, і мені не вистачає Grafana.
Тому поки що InfluxDB залишиться як база, а до неї додамо Grafana.
А вже пізніше, мабуть, все ж мігрую дані до VictoriaMetrics.
Втім, цей пост, звісно, не про цей селф-мониторинг, а просто непоганий приклад того, як запустити Influx з NGINX і Grafana, як імпортувати дані, і як створити веб-сторінку з Flask для додавання нових метрик в InfluxDB.
Поточні дані в Google Sheets
На прикладі таблиці Sleep:
Тут Sleep_rate – суб’єктивна оцінка якості сну, Sleepy_day – наскільки сильна була сонливість цього дня, Wake_ups – скільки раз за ніч прокидався, і Mults – наскільки яскраві і насичені були сни, бо іноді вони бувають дійсно “мультфільмами” – наче всю ніч в кінотеатрі просидів 🙂
План дій
Робитись все буде на тому самому сервері з Debian, де зараз хоститься сам блог RTFM.
Що будемо робити:
запустимо InfluxDB в Docker
налаштуємо vitrtualhost в NGINX
імпортуємо існуючі дані з Google Sheets в InfluxDB
подивимось, які дашборди можемо зробити в InfluxDB
додамо форму для введення нових даних
додамо Grafana для повноцінної візуалізації
Окремо треба буде зробити бекап і підтюнити InfluxDB та Grafana, бо сервер маленький, лише 2 гігабайти пам’яті, але це вже іншим разом.
Поїхали.
Запуск InfluxDB з Docker Compose
Простіше всього зробити з docker-compose, аби потім легше було переносити на інший сервер.
Встановлення Docker та Docker Compose на Debian
Встановлюємо Docker та Docker Compose, документація тут>>>:
INFLUXD_REPORTING_DISABLED: телеметрія в InfluxData (О.о)
INFLUXD_TASKS_ENABLED: користуватись поки не планую
INFLUXD_FLUX_LOG_ENABLED: детальні логи Flux queries, поки логи нехай будуть, але потім можна буде відключити
INFLUXD_QUERY_MEMORY_BYTES: можна задати ліміт по пам’яті на кожен запит, але з моїм об’ємом даних – не варте
INFLUXD_UI_DISABLED: можна відключити веб-інтерфейс і працювати тільки з API, поки нехай буде, як повністю на Grafana переключусь – можна буде відключити
Для даних буду робити каталог в /data, там в мене зараз живуть сайти, це окремий Digtical Ocean volume, який автоматом бекапиться самим Digtical Ocean:
...
Press Enter to Continue
Successfully received certificate.
Certificate is saved at: /etc/letsencrypt/live/monitoring.example.org.ua/fullchain.pem
Key is saved at: /etc/letsencrypt/live/monitoring.example.org.ua/privkey.pem
This certificate expires on 2026-01-24.
Додавання NGINX virtualhost
В файлі /etc/nginx/conf.d/monitoring.example.org.ua.conf описуємо новий server і location:
root@setevoy-do-2023-09-02:/opt/influx# nginx -t
nginx: the configuration file /etc/nginx/nginx.conf syntax is ok
nginx: configuration file /etc/nginx/nginx.conf test is successful
Треба імпортувати вже існуючі дані з Google Sheets в InfluxDB і згенерувати метрики. Благо в мене з попередніх років в Таблицях все структуровано, InfluxDB вміє приймати .csv, тому тут проблем (майже) не було.
Завантажуємо таблицю собі на машину в .csv:
Отримуємо такий документ:
$ head 2025-Daily-Sleep-self.csv
Date,Sleep_rate_my_day,Sleepy_day,Wake_ups,Mults
2025-01-01,7,1,,
2025-01-02,7,1,,
2025-01-03,7,2,,
2025-01-04,5,3,,
Таблиць в мене кілька:
Для кожної зробимо окрему метрику, а в тегах використаємо імена колонок:
Найпростіший спосіб завантажити csv – через UI:
Але в даному випадку він не спрацює, бо не той формат дати – в мене 2025-01-09, а InfluxDB хоче повний rfc3339, тобто 2025-01-09T00:00:00Z.
Згадуємо, що колись вміли в awk Йдемо до ChatGPT, отримуємо команду для форматування дати:
Додаємо собі $PATH:/usr/libexec/docker/cli-plugins/:/opt/influx, налаштовуємо підключення:
root@setevoy-do-2023-09-02:/opt/influx# influx config create --config-name local --host-url http://localhost:8086 --org setevoy --token $INFLUX_TOKEN --active
Active Name URL Org
* local http://localhost:8086 setevoy
І завантажуємо дані – додаємо --header, бо формат InfluxDB вимагає цих анотацій, див. Extended annotated CSV:
root@setevoy-do-2023-09-02:/data/influx/import# influx write --bucket self-monitoring-1 --file 2025-Daily-Sleep-self-rfc3339.csv --format csv --header "#constant measurement,sleep_daily" --header "#datatype dateTime:RFC3339,double,double,double,double"
2025/10/26 11:32:24 line 303: no field data found
2025/10/26 11:32:24 line 304: no field data found
2025/10/26 11:32:24 Unable to batcher to error-file: invalid argument
2025/10/26 11:32:24 line 305: no field data found
2025/10/26 11:32:24 Unable to batcher to error-file: invalid argument
2025/10/26 11:32:24 line 306: no field data found
...
Таблиці за 2023 і 204 в мене окремими документами, аналогічно додаємо їх – і тепер маємо всі дані в одному місці:
Всі дані за 2.5 роки на одній дашборді.
Офігєть.
Веб-форма з Flask для внесення даних
Наступна задача – додати можливість вносити нові дані.
Перший варіант – продовжити писати в Google Sheets, на сервері скриптом отримувати їх, фіксити дату і пушити в базу, а скрипт запускати по крону.
Плюси – звична схема, і є “бекап” у вигляді гугл-таблиць.
Мінуси – буде проблема з тим, як в скрипті перевіряти які дані в базі вже є, аби не дублювати старі записи, і нові дані з Google Sheets в базі з’являться не відразу, а коли відпрацює крон.
Другий варіант – повністю нова схема: написати простеньку веб-сторінку, яка через InfluxDB клієнт буде записувати нові дані.
Мінуси – доведеться налаштовувати додатковий location в NGINX і запускати якийсь сервіс, який це скрипт буде оброблювати.
Врешті-решт все ж зупинився на другому варіанті.
Як це буде працювати:
gunicorn для запуску Flask app
index.html шаблон
metrics.json з описом метрик і їхніх тегів
app.py, який отримує дані з форми вводу в HTML і виконує операції в InfluxDB
Шаблон для метрик
Аби спростити життя далі – щоб простіше було додавати нові метрики – створимо JSON, який буде використовуватись в app.py аби формувати список метрик і їхніх тегів.
import os
import json
from datetime import date, datetime, time, timezone
from flask import Flask, render_template, request, jsonify
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
app = Flask(__name__)
# === InfluxDB config ===
INFLUX_URL = "http://localhost:8086"
INFLUX_TOKEN = "tOx***iuw=="
INFLUX_ORG = "setevoy"
# default bucket, if user doesn't choose one from the html form
DEFAULT_BUCKET = "self-monitoring-1"
# load metrics from the 'metrics.json'
METRICS_FILE = os.path.join(os.path.dirname(__file__), "metrics.json")
with open(METRICS_FILE, "r") as f:
METRICS = json.load(f)
@app.get("/set")
@app.get("/set/")
def index():
"""Render HTML form with today's date pre-filled"""
return render_template(
"index.html",
metrics=METRICS,
today_date=date.today().isoformat()
)
@app.post("/set/submit")
def submit():
"""Handle form submission and write data to InfluxDB"""
form = request.form
# --- 1) Date from form or today ---
date_str = form.get("date")
if date_str:
try:
selected_date = datetime.fromisoformat(date_str).date()
except ValueError:
return jsonify({"ok": False, "error": "Bad date format, expected YYYY-MM-DD"}), 400
else:
selected_date = date.today()
# --- 2) Fixed time: 03:00 UTC ---
ts = datetime.combine(selected_date, time(3, 0, 0), tzinfo=timezone.utc)
wrote, errors = [], []
# --- 3) Get bucket from form or use default ---
bucket = form.get("bucket", DEFAULT_BUCKET)
try:
with InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG) as client:
write_api = client.write_api(write_options=SYNCHRONOUS)
for measurement, fields in METRICS.items():
for field in fields:
raw = form.get(field)
if raw is None or raw == "":
continue
try:
val = float(raw)
except ValueError:
errors.append(f"{measurement}.{field}: not a number: {raw!r}")
continue
point = (
Point(measurement)
.field(field, val)
.time(ts, WritePrecision.NS)
)
# write to selected bucket
write_api.write(bucket=bucket, record=point)
wrote.append(f"{bucket}: {measurement}.{field}={val}")
except Exception as e:
return jsonify({"ok": False, "error": str(e)}), 500
html = f"""
<html>
<body style="font-family:Arial;margin:40px;">
<h3>Data successfully written</h3>
<p><b>Date:</b> {selected_date.isoformat()}</p>
<ul>
{''.join(f'<li>{w}</li>' for w in wrote)}
</ul>
<p><a href="/set"><button>Return to main page</button></a></p>
</body>
</html>
"""
return html
if __name__ == "__main__":
app.run(host="0.0.0.0", port=8080, debug=True)
В принципі, тут доволі простий скрипт:
@app.get("/set/"): роут, де буде наша форма, генерує сторінку з файлу index.html
@app.post("/set/submit") і функція submit(): де логіка виконання – є можливість задати дату, вибрати корзину в InfluxDB, в яку будемо писати, бере список метрик і тегів з metrics.json, і через InfluxDBClient вносить дані в InfluxDB
в кінці виводиться ще одна форма з інформацією про те, що саме було записано, і малює кнопку “повернутись назад”
Файл templates/index.html:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Self Monitoring</title>
<style>
body { font-family: Arial; margin: 40px; }
.metric-block { margin-bottom: 30px; }
label { display: inline-block; width: 180px; }
input { width: 80px; }
</style>
<script>
// set date input to yesterday in local time (YYYY-MM-DD)
function setYesterday() {
const d = new Date();
d.setDate(d.getDate() - 1);
const y = d.getFullYear();
const m = String(d.getMonth() + 1).padStart(2, '0');
const day = String(d.getDate()).padStart(2, '0');
document.getElementById('date').value = `${y}-${m}-${day}`;
}
</script>
</head>
<body>
<h2>Self Monitoring</h2>
<form action="/set/submit" method="post">
<!-- Bucket selector -->
<div style="margin-bottom:16px;">
<label for="bucket">Bucket:</label>
<select id="bucket" name="bucket" required>
<option value="self-monitoring-1">self-monitoring-1</option>
<option value="self-monitoring-test">self-monitoring-test</option>
</select>
</div>
<!-- Date picker -->
<div style="margin-bottom:16px;">
<label for="date">Date:</label>
<input type="date" id="date" name="date" value="{{ today_date }}" required>
<button type="button" onclick="setYesterday()">Yesterday</button>
<small>UTC midnight will be used</small>
</div>
{% for measurement, fields in metrics.items() %}
<div class="metric-block">
<h3>{{ measurement }}</h3>
{% for field in fields %}
<div>
<label for="{{ field }}">{{ field }}:</label>
<input type="number" step="any" name="{{ field }}" id="{{ field }}">
</div>
{% endfor %}
</div>
{% endfor %}
<input type="submit" value="Submit">
</form>
</body>
</html>
Я трохи повозився з influx backup, але постійно ловив 401, не став заморачуватись, бо дані оновлюються рідко, тому просто навайбокодив простенький скрипт на bash:
#!/bin/bash
# backup InfluxDB data directory and upload to S3
# set vars
SRC_DIR="/opt/influx"
BACKUP_DIR="/backups/influx"
DATE=$(date +%Y-%m-%d)
ARCHIVE_NAME="${DATE}-influx.tar.gz"
ARCHIVE_PATH="${BACKUP_DIR}/${ARCHIVE_NAME}"
S3_BUCKET="s3://setevoy-influx-backups"
# create backup directory if not exists
mkdir -p "$BACKUP_DIR"
# create tar.gz archive
tar -czf "$ARCHIVE_PATH" -C "$SRC_DIR" .
# check that archive was created
if [ ! -f "$ARCHIVE_PATH" ]; then
echo "❌ Failed to create backup archive!"
exit 1
fi
# upload to S3
aws s3 cp "$ARCHIVE_PATH" "$S3_BUCKET/$ARCHIVE_NAME"
# check upload result
if [ $? -eq 0 ]; then
echo "✅ Uploaded to S3: $S3_BUCKET/$ARCHIVE_NAME"
# remove local archive after successful upload
rm -f "$ARCHIVE_PATH"
echo "🧹 Local archive removed: $ARCHIVE_PATH"
else
echo "⚠️ Upload to S3 failed, keeping local copy."
exit 1
fi
Запускаємо для перевірки:
root@setevoy-do-2023-09-02:~# chmod +x /opt/influx/backup_data.sh
root@setevoy-do-2023-09-02:~# /opt/influx/backup_data.sh
upload: ../backups/influx/2025-10-27-influx.tar.gz to s3://setevoy-influx-backups/2025-10-27-influx.tar.gz
✅ Uploaded to S3: s3://setevoy-influx-backups/2025-10-27-influx.tar.gz
🧹 Local archive removed: /backups/influx/2025-10-27-influx.tar.gz
Є в мене давня ідея self-monitoring, яку, сподіваюсь, я такі почну робити і про яку напишу окремо.
Але суть її така сама, як і в етіх ваших моніторингах – збирати метрики, і відображати графіки.
Почав під цю систему вибирати базу даних, і хоча там частота запису метрик невелика, 1 метрика на день, але хочу її робити у звичному мені time series форматі – як ми це робимо в VictoriaMetrics/Prometheus.
А в рамках написання іншого поста, про структуру TSDB та метрики (все ще в чернетках), я торкнувся InfluxDB, про яку згадав і цього разу.
Саму InfluxDB я трохи використовував ще років п’ять тому, але зовсім трохи – вона просто була одним з бекендів для Grafana, коли ми будували автоматичний load testing з JMeter в Kubernetes (колись до цього знов дійде, і напишу теж, бо там дуже класний сетап).
Але так, щоб самому використовувати InfluxDB – досвіду не було. І коли я зараз глянув на неї – то система прям дуже сподобалась, а тому для свого self-monitoring буду використовувати її.
Якщо дуже коротко – то для повноцінного моніторингу, для відносно великого проекту я все ж взяв би саме VictoriaMetrics, бо на великих об’ємах вона буде набагато краща в плані CPU/Memory.
Але для якогось pet project – InfluxDB можливо підійде краще за рахунок того, що в ній “з коробки” є можливість будувати дашборди з графіками, є власний alertmanager, є цікаві штуки для різних автоматизацій.
Втім, у InfluxDB є (відносний) недолік – це більш складна мова запитів, яких до того цілих дві – Flux та InfluxQL. Але можливості query builder для простого використання цілком достатньо.
InfluxDB overview
Власне InfluxDB – ще одна Time Series Database, як вже згадувані VictoriaMetrics або Prometheus.
Головна різниця – VictoriaMetrics та Prometheus працюють по pull-моделі (збирають дані з експортерів), а InfluxDB – це push-модель, коли експортери самі, власне, пушать дані в базу.
Різні і мови запитів – в VictoriaMetrics MetricsQL та PromQL в Prometheus маємо звичні нам функції типу rate() і sum by (), тоді як в InfluxDB це мова Flux (“functional data scripting language“), яка по суті являється повноцінною мовою програмування, та InfluxQL – яка більше схожа на SQL, але в InfluxDB v2 вмикається через костиль, і дефолтна мова саме Flux (але в InfluxDB v3 наче знову буде InfluxQL).
VictoriaMetrics/Prometheus – це частина CNCF-екосистеми і LGPT (Loki + Grafana + Prometheus + Tempo) або PLG (Prometheus + Loki + Grafana) стеків, а InfluxDB – це про TICK stack (Telegraf + InfluxDB + Chronograf + Kapacitor).
При цьому в InfluxDB v2 Chronograf та Kapacitor вже вбудовані в саму систему, окремо запускати не треба.
Ну і дані – VictoriaMetrics та Prometheus заточені під зберігання і роботу саме з “класичними” метриками, тоді як в InfluxDB можна збирати логи, дані з IoT девайсів, events, дані від Telegraf-плагінів тощо.
Крім того, InfluxDB наче краще підходить для довготривалого зберігання даних – і за рахунок самої моделі зберігання даних, і за рахунок вбудованих механізмів для data retention.
Ну і можливості візуалізації даних – якщо в VictoriaMetrics та Prometheus у нас “з коробки” є тільки базові графіки, бо це всеж більше бази даних, то в InfluxDB у нас є повноцінний інтерфейс, через який ми можемо робити всі потрібні налаштування і візуалізації
Запуск InfluxDB з Docker
Для “погратись” просто запустимо локально з Docker:
Але в v3 багато змін, вона не дуже сумісна з другою версією, а більшість гайдів будуть саме по другій, тому давайте працювати з нею.
Note: по ходу гуглінга знайшов цікавий матеріал – What InfluxDB Got Wrong, де як раз говориться про те, що команда InfluxData робить нові версії несумісні з попередніми, і це, звісно, не дуже гуд
Відкриваємо в браузері http://localhost:8086, налаштовуємо юзера, організацію, і дефолтний бакет (про бакети і інші концепти далі):
Відразу отримуємо пропозицію налаштування – “погратись”, advanced, або просто перейти в базу:
Клікаємо Quick start аби отримати якісь базові дані, де нам відразу автоматично налаштовується збір власних метрик InfluxDB і створюється дашборда:
Key concepts
Коротко пройдемось по основних поняттях.
Bucket: на відміну від VictoriaMetrics/Prometheus, в InfluxDB дані організовані в такі собі “корзини” або “бази даних”
Measurement: це по факту звичні нам з VictoriaMetrics/Prometheus метрики, і метрики (я їх буду назвати саме так, хоча, мабуть, це не дуже коректно з технічної точки зору) складаються з:
Tags: labels для метрик, індексуються для швидкого пошуку
Fields: поля зі значеннями, не індексуються
Timestamp: час додавання метрики
Point: конкретний запис (метрика + теги + значення + час), аналог Sample або data points в термінах VictoriaMetircs/Prometheus
Series: група записів (метрика + теги + значення), аналог Time Series в термінах VictoriaMetircs/Prometheus
Формат метрик відрізняється від VictoriaMetrics/Prometheus і записується в форматі line protocol.
Наприклад, у VictoriaMetircs запис може виглядати так:
Тут в InfluxDB метриці маємо власне ім’я метрики node_cpu_seconds_total, два теги зі значеннями – cpu=0,mode=user, поле value зі значенням, і timestamp.
Timestamp можна задавати в UNIX epoch, можна в ISO 8601, тобто 2025-10-25T12:00:00Z, але рекомендований і дефолтний формат – саме UNIX.
Доступ до InfluxDB
Тут маємо на вибір сам UI і Data Exporter, CLI-утиліту influx, та InfluxDB HTTP API для всякої автоматизації.
$ docker exec -ti influxdb influx --help
NAME:
influx - Influx Client
USAGE:
influx [command]
HINT: If you are looking for the InfluxQL shell from 1.x, run "influx v1 shell"
COMMANDS:
version Print the influx CLI version
write Write points to InfluxDB
bucket Bucket management commands
...
Дуже цікава фішка, аналог Jupyter Notebook – “жива” аналітика, експерименти із запитами, автоматизація запитів:
Дозволяє зберігати послідовності, які потім можна використати в InfluxDB Tasks.
Кожен Notebook розбитий на кілька cell, які можуть бути data source для отримання даних, visualization для графіків, і action – створити алерт або Task.
Насправді доволі потужний інструмент з купою плагінів, але для прикладу зберемо метрики CPU з хоста:
Зберігаємо:
І навіть отримуємо інструкції як запустити:
Прямо при запуску ми в Telegraf передаємо URL з конфігом – і він отримає саме ті налаштування, які ми робили на попередньому екрані, тобто нам взагалі не треба писати локальний telegraf.conf.
Прийшла задачка підняти для проекту цікавий сервіс Arize Phoenix для моніторингу і тюнингу використання LLM.
За сам сервіс багато не скажу, бо не користувався, але його запуск вийшов доволі цікавим.
Що будемо робити – спочатку з Helm запустимо тестовий варіант, подивитись як воно взагалі виглядає, потім зробимо повноцінну автоматизацію – Terraform для всяких сікретів, Helm для самого Phoenix.
Власне цей пост буде не стільки про сам Arize Phoenix, скільки просто приклад як з Terraform створити AWS Secrets, і як з Helm та External Secrets Operator ці сікрети отримати.
Тестовий запуск з Helm в Kubernetes
Phoenix підтримує різні варіанти запуску. нам цікавий Helm, документація тут – Kubernetes (helm).
Сам чарт є в Docker Hub (і далі це трохи вилізе боком), всі values є там жеж.
$ kk get all
NAME READY STATUS RESTARTS AGE
pod/phoenix-8677bcc44f-k8w2k 1/1 Running 1 (49s ago) 70s
pod/phoenix-postgresql-0 1/1 Running 0 70s
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/phoenix-postgresql ClusterIP 172.20.11.177 <none> 5432/TCP 70s
service/phoenix-svc NodePort 172.20.85.64 <none> 4317:31314/TCP,6006:31180/TCP,9090:31897/TCP 70s
NAME READY UP-TO-DATE AVAILABLE AGE
deployment.apps/phoenix 1/1 1 1 70s
NAME DESIRED CURRENT READY AGE
replicaset.apps/phoenix-8677bcc44f 1 1 1 70s
NAME READY AGE
statefulset.apps/phoenix-postgresql 1/1 71s
По дефолту використовує власний контейнер з PostgreSQL, для Production будемо робити в AWS RDS.
Відкриваємо доступ до порту для WebUI:
$ kk port-forward service/phoenix-svc 6006
Переходимо в браузері на http://localhost:6006, логінимось.
Документація по аутентифікація – тут>>>, і там є цікаві моменти. наприклад, змінити пошту для адміна (і для Member? тобто для звичайних юзерів? не пробував) не міжна:
Neither an Admin nor Member is permitted to change email addresses.
ОК, воно працює – давайте думати про продакшен сетап.
AWS та Terraform
Що нам треба буде з ресурсів в AWS:
запис Route 53 з доменом для доступу юзерів
TLS сертифікат в AWS Certificate Manager
AWS Secrets Manager:
пароль для доступу до Postgres
два паролі для самого Phoenix
пароль для SMTP – навіть якщо він не використовується
resource "aws_route53_record" "phoenix_dns" {
zone_id = data.aws_route53_zone.ops.zone_id
name = local.phoenix_domain_name
type = "CNAME"
ttl = 300
records = [
data.aws_lb.shared_alb.dns_name
]
}
Виконуємо terraform init та terraform plan, перевіряємо, що все ок:
...
Terraform will perform the following actions:
# aws_route53_record.phoenix_dns will be created
+ resource "aws_route53_record" "phoenix_dns" {
+ allow_overwrite = (known after apply)
+ fqdn = (known after apply)
+ id = (known after apply)
+ name = "phoenix.ops.example.co"
+ records = [
+ "k8s-ops133externalalb-***.us-east-1.elb.amazonaws.com",
]
+ ttl = 300
+ type = "CNAME"
+ zone_id = "Z02***OYY"
}
Plan: 1 to add, 0 to change, 0 to destroy.
Сертифікат в AWS ACM
Далі для Ingress та ALB нам потрібно створити сертифікат під цей DNS:
PHOENIX_DEFAULT_ADMIN_INITIAL_PASSWORD: пароль при сетапі
PHOENIX_ADMIN_SECRET: пароль після сетапу
чесно тут не дуже зрозумів, бо навіть якщо відразу створити і передати PHOENIX_ADMIN_SECRET – то перший логін все одно буде з PHOENIX_DEFAULT_ADMIN_INITIAL_PASSWORD
# auth.defaultAdminPassword or PHOENIX_DEFAULT_ADMIN_INITIAL_PASSWORD
# PHOENIX_ADMIN_SECRET
# PHOENIX_SECRET: A long string value that is used to sign JWTs for your deployment.
# PHOENIX_POSTGRES_PASSWORD
# PHOENIX_SMTP_PASSWORD
##############################################
### PHOENIX_DEFAULT_ADMIN_INITIAL_PASSWORD ###
##############################################
# generate a random password
ephemeral "random_password" "ops_phoenix_default_admin_initail_secret_random_password" {
length = 12
special = true
}
# create an AWS Secret resource
resource "aws_secretsmanager_secret" "ops_phoenix_default_admin_initial_secret" {
name = "/ops/phoenix/phoenix_default_admin_initial_secret"
description = "Default Phoenix admin username and password"
recovery_window_in_days = 0
}
# create an AWS Secret value
resource "aws_secretsmanager_secret_version" "ops_phoenix_default_admin_initial_secret_version" {
secret_id = aws_secretsmanager_secret.ops_phoenix_default_admin_initial_secret.id
secret_string_wo = ephemeral.random_password.ops_phoenix_default_admin_initail_secret_random_password.result
secret_string_wo_version = 1
}
Деплоїмо, перевіряємо Route 53, ACM та Secrets Manager:
Повторюємо для решти – вони всі більш-менш однакові, тільки в деяких просто пароль, в деяких логін:пароль в JSON, і різна довжина.
Бо, наприклад, для PHOENIX_ADMIN_SECRET є перевірка на кількість символів:
...
atlas-phoenix-6865f69ffc-k7hwl:phoenix File "/phoenix/env/phoenix/config.py", line 772, in get_env_phoenix_admin_secret
atlas-phoenix-6865f69ffc-k7hwl:phoenix REQUIREMENTS_FOR_PHOENIX_SECRET.validate(phoenix_admin_secret, "Phoenix secret")
atlas-phoenix-6865f69ffc-k7hwl:phoenix File "/phoenix/env/phoenix/auth.py", line 255, in validate
atlas-phoenix-6865f69ffc-k7hwl:phoenix raise ValueError(err_text)
atlas-phoenix-6865f69ffc-k7hwl:phoenix ValueError: Phoenix secret must be at least 32 characters long
....
Описуємо ресурси:
...
############################
### PHOENIX_ADMIN_SECRET ###
############################
# generate a random password
ephemeral "random_password" "ops_phoenix_admin_secret_random_password" {
length = 32
special = true
}
# create an AWS Secret resource
resource "aws_secretsmanager_secret" "ops_phoenix_admin_secret" {
name = "/ops/phoenix/phoenix_admin_secret"
description = "Phoenix admin username and password"
recovery_window_in_days = 0
}
# create an AWS Secret value
resource "aws_secretsmanager_secret_version" "ops_phoenix_admin_secret_version" {
secret_id = aws_secretsmanager_secret.ops_phoenix_admin_secret.id
secret_string_wo = jsonencode({
login = "admin@localhost"
password = ephemeral.random_password.ops_phoenix_admin_secret_random_password.result
})
secret_string_wo_version = 3
}
######################
### PHOENIX_SECRET ###
######################
# generate a random password
ephemeral "random_password" "ops_phoenix_secret_random_password" {
length = 65
special = false
}
# create an AWS Secret resource
resource "aws_secretsmanager_secret" "ops_phoenix_secret" {
name = "/ops/phoenix/phoenix_secret"
description = "Phoenix secret string used to sign JWTs"
recovery_window_in_days = 0
}
# create an AWS Secret value
resource "aws_secretsmanager_secret_version" "ops_phoenix_secret_version" {
secret_id = aws_secretsmanager_secret.ops_phoenix_secret.id
secret_string_wo = ephemeral.random_password.ops_phoenix_secret_random_password.result
secret_string_wo_version = 1
}
#################################
### PHOENIX_POSTGRES_PASSWORD ###
#################################
# generate a random password
ephemeral "random_password" "ops_phoenix_postgres_random_password" {
length = 12
special = false
}
# create an AWS Secret resource
resource "aws_secretsmanager_secret" "ops_phoenix_postgres_credentials" {
name = "/ops/phoenix/phoenix_postgres_credentials"
description = "Phoenix PostgreSQL username and password"
recovery_window_in_days = 0
}
# create an AWS Secret value
resource "aws_secretsmanager_secret_version" "ops_phoenix_postgres_credentials_version" {
secret_id = aws_secretsmanager_secret.ops_phoenix_postgres_credentials.id
secret_string_wo = ephemeral.random_password.ops_phoenix_postgres_random_password.result
secret_string_wo_version = 3
}
#############################
### PHOENIX_SMTP_PASSWORD ###
#############################
# generate a random password
ephemeral "random_password" "ops_phoenix_smtp_password_random_password" {
length = 12
special = false
}
# create an AWS Secret resource
resource "aws_secretsmanager_secret" "ops_phoenix_smtp_password" {
name = "/ops/phoenix/ops_phoenix_smtp_password"
description = "Phoenix secret string used to sign JWTs"
recovery_window_in_days = 0
}
# create an AWS Secret value
resource "aws_secretsmanager_secret_version" "ops_phoenix_smtp_password_version" {
secret_id = aws_secretsmanager_secret.ops_phoenix_smtp_password.id
secret_string_wo = ephemeral.random_password.ops_phoenix_smtp_password_random_password.result
secret_string_wo_version = 2
}
З Terraform все, можемо готувати базу Postgres.
PostgreSQL user and database
Сервер у нас вже є, тому зараз просто створити базу і юзера.
Створюємо юзера, базу, даємо повний доступ до цієї бази:
ops_grafana_db=> CREATE USER ops_phoenix_user WITH PASSWORD '***';
CREATE ROLE
ops_grafana_db=> CREATE DATABASE ops_phoenix_db OWNER ops_phoenix_user;
CREATE DATABASE
ops_grafana_db=> GRANT ALL PRIVILEGES ON DATABASE ops_phoenix_db TO ops_phoenix_user;
GRANT
Тепер робимо helm dependency update, і ловимо “response status code 401” від Docker Hub:
...
Update Complete. ⎈Happy Helming!⎈
Error: could not retrieve list of tags for repository oci://registry-1.docker.io/arizephoenix/phoenix-helm: GET "https://registry-1.docker.io/v2/arizephoenix/phoenix-helm/phoenix/tags/list": response status code 401: unauthorized: authentication required: [map[Action:pull Class: Name:arizephoenix/phoenix-helm/phoenix Type:repository]]
Тому що Helm при dependency update намагається отримати всі доступні теги з tags/list, а в Docker Hub для цього потрібно залогінитись.
Логінитись туди я і не хочу, і це зламає можилу майбутню автоматизацію, тому робимо костиль.
Пишемо Makefile в якому додаємо таргет на helm pull oci://:
...
phoenix-helm:
auth:
# Kubernetes Secret name
name: phoenix-secret
# use AWS RDS instead of deploying local
postgresql:
enabled: false
database:
postgres:
host: db.monitoring.ops.example.co
user: ops_phoenix_user
db: ops_phoenix_db
...
Деплоїмо, перевіряємо:
Налаштування Ingress
Сам Ingress enabled by default, тому нам треба тільки додати атрибути, через які він “замапиться” на наш загальний AWS Application Load Balancer через анотацію alb.ingress.kubernetes.io/group.name.
Але і тут є нюанс: в чарті нема можливості задати spec.ingressClassName="alb".
Тому робимо трохи deprecated way, теж через annotations:
Перший логін робимо з паролем PHOENIX_DEFAULT_ADMIN_INITIAL_PASSWORD, далі Phoenix запросить його змінити – задаємо наш із PHOENIX_ADMIN_SECRET, віддаємо девелоперам на погратись:
Дебажимо одну проблему з використанням пам’яті в Kubernetes Pods, і вирішили подивитись на пам’ять і кількість процесів на нодах.
Сама проблема полягає в тому, що зазвичай Kubernetes Pod з Livekit споживає близько 2 гігабайт пам’яті, але іноді бувають спайки до 10-11 гіг, через що под вбивається:
Що ми хочемо визначити: це один процес починає стільки пам’яті “їсти” – чи просто створюється багато процесів в контейнері?
Самий простий варіант тут – використати Prometheus Process Exporter, який запускається у вигляді DaemonSet, на кожній WorkerNode створює власний контейнер, і для всіх чи обраних процесів на EC2 збирає статистику з /proc.
root@backend-celery-workers-deployment-5bc64557c8-zbq2j:/app# ps aux
USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND
root 1 0.2 1.4 544832 236720 ? Ss 07:27 0:24 /usr/local/bin/python /usr/local/bin/celery -A celery_app.app worker [...]
...
Та Livekit:
root@backend-livekit-agent-deployment-7d9bf86564-qgjzb:/app# ps aux
USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND
root 1 0.4 1.8 2112944 294772 ? Ssl 07:06 0:46 python -m cortex.livekit_agent.main start
root 24 0.0 0.0 15788 12860 ? S 07:06 0:00 /usr/local/bin/python -c from multiprocessing.resource_tracker import main;main(34)
root 25 0.0 0.6 342976 102852 ? S 07:06 0:02 /usr/local/bin/python -c from multiprocessing.forkserver import main [...]
...
Додаємо конфіг для process-exporter – описуємо nameMatchers:
...
process-exporter:
enabled: true
tolerations:
operator: Exists
effect: NoSchedule
- key: CriticalAddonsOnly
config:
# metrics will be broken down by thread name as well as group name
threads: true
# any process that otherwise isn't part of its own group becomes part of the first group found (if any) when walking the process tree upwards
children: true
# means that on each scrape the process names are re-evaluated
recheck: false
# remove_empty_groups drop empty groups if no processes found
remove_empty_groups: true
nameMatchers:
# gunicorn (python + uvicorn workers)
- name: "gunicorn"
exe:
- /usr/local/bin/python
cmdline:
- ".*gunicorn.*"
# celery worker
- name: "celery-worker"
exe:
- /usr/local/bin/python
cmdline:
- ".*celery.*worker.*"
# livekit agent
- name: "livekit-agent"
exe:
- python
- /usr/local/bin/python
cmdline:
- ".*cortex.livekit_agent.main.*"
# livekit multiprocessing helpers
- name: "livekit-multiproc"
exe:
- /usr/local/bin/python
cmdline:
- ".*multiprocessing.*"
Тут в exe – список самого executable (можна кілька), а в cmdline – аргументи, з якими процес запущено.
Тобто для Livekit у нас exe – “/usr/local/bin/python“, а cmdline – це “-c from multiprocessing.resource_tracker [...]” або “-c from multiprocessing.forkserver [...]“.
Деплоїмо, і тепер залишилось тільки три групи:
Але є нюанси.
Перше – статистика збирається з кожної ноди по всій групі процесів.
Тобто, якщо ми зробимо:
sum(namedprocess_namegroup_memory_bytes{memtype="resident", groupname="celery-worker"}) by (groupname, instance, pod)
То отримаємо суму всіх RSS всіх Celery-воркерів на ноді, де запущений відповідний process-exporter Pod:
А друга проблема – що Process Exporter не має лейбли з іменем WorkerNode, з якої зібрані метрики.
Тому тут тільки шукати вручну – по Pod IP (лейбла instance) можемо знайти його Node:
$ kk get pod -o wide | grep 10.0.45.166
atlas-victoriametrics-process-exporter-4zdzl 1/1 Running 0 6m51s 10.0.45.166 ip-10-0-40-195.ec2.internal <none> <none>
Повертаючись до питання того, що немає інформації по кожному процесу: ми можемо отримати середнє значення по кожному, бо у нас є метрика namedprocess_namegroup_num_procs:
sum(namedprocess_namegroup_memory_bytes{memtype="resident", groupname="celery-worker", instance="10.0.45.166:9256"}) by (groupname, instance, pod)
/
sum(namedprocess_namegroup_num_procs{groupname="celery-worker", instance="10.0.45.166:9256"}) by (groupname, instance, pod)
Результат ~230 MB:
Як ми і бачили в ps -eo rss,cmd.
Name Group Template variables та інформація по кожному процесу
Або, якщо нам прям дуже хочеться бачити статистику по кожному процесу – ми можемо використати динамічні імена для groupname з {{.PID}} – тоді для кожного процесу буде формуватись окрема група, див. Using a config file: group name:
Але цей варіант ОК тільки для якщо вам треба щось подебажити, і відключити, бо призведе до High cardinality issue.
Результат нашого дебагу
Власне, що нам потрібно було дізнатись – пам’ять “утікає” в якомусь одному процесі, чи просто створюється багато процесів в одному Pod?
Для цього в Grafana зробили графік із запитом:
sum(namedprocess_namegroup_memory_bytes{memtype="resident", groupname=~"livekit-multiproc-.*"}) by (groupname, instance)
До нього додали графіки з метриками самого Livekit – lk_agents_active_job_count та lk_agents_child_process_count, і окремо – графік з VictoriaLogs, де виводимо кількість API-запитів кожного юзера по полю token_email:
Де і бачимо, що один і той же юзер починає робити пачку запитів для підключення до Livekit, через що в Livekit Pod створюється пачка процесів (по новій Livekit Job на кожен запит), і в результаті загальна кількість пам’яті в поді зашкалює, бо 40 процесів по ~380 MB це ~15 гігабайт пам’яті.
Але в кожному конкретному процесі пам’ять тримається на рівні 300-400 мегабайт.
Залишилось розібратись чому саме спавняться процеси, але то вже задачка девелоперам.
логи в CloudWatch Logs – можемо збирати в VictoriaLogs, і генерити якісь метрики і алерти, але я під час нашого production incent нічого цікавого в логах не побачив, див. Monitoring OpenSearch logs with Amazon CloudWatch Logs
і є Prometheus Exporter Plugin, який відкриває ендпоінт для збору метрик з Prometheus/VictoriaMetrics (але в AWS OpenSearch Managed його додати не можна, хоча сапорт обіцяє, ще feature request є – може колись додадуть)
CloudWatch метрики
Метрик досить багато, але з того, що може бути цікавим нам – враховуючи те, що у нас нема виділених master та coordinator nodes, і ми не використовуємо ultra-warm та cold інстанси.
Cluster metrics:
ClusterStatus: green/yellow/red – основний показник стану кластеру, контроль активності шардів даних
Shards: active/unassigned/delayedUnassigned/activePrimary/initializing/relocating – більш детальна інформація по стану шардів, але тут просто загальна кількість, без деталізації по конкретним індексам
Nodes: кількість нод в кластері – знаючи, скільки має бути живих нод – можемо алертити, коли якась нода відвалиться
SearchableDocuments: не те щоб саме для нас було дуже цікаво, але можливо буде корисним потім, аби бачити що взагалі твориться в індексах
CPUUtilization: відсоток використання CPU разом на всіх нодах, і це прям must-have
FreeStorageSpace: теж корисно моніторити
ClusterIndexWritesBlocked: чи все ОК із записами в індекс
JVMMemoryPressure та OldGenJVMMemoryPressure: відсоток використання пам’яті JVM heap – далі окремо копнемо в моніторинг JVM, бо це прям окремий геморой
AutomatedSnapshotFailure: мабуть, good to know, якщо бекап сфейлиться
CPUCreditBalance: нам корисно, бо ми на t3 інстансах (але у нас в CloudWatch її нема)
2xx, 3xx, 4xx, 5xx`: дані по HTTP-запитам і помилкам
тут треба буде дивитись на метрики з EBS volume metrics, але для початку можна просто додати алерти на Throttle взагалі
HighSwapUsage: аналогічно до попередніх метрик – колись мали біду в RDS, тому краще помоніторити і тут
EBS volume metrics – тут в принципі стандартні метрики EBS, як і для EC2 або RDS:
ReadLatency та WriteLatency: затримки читання/запису
іноді бувають спайки, тому можна додати
ReadThroughput та WriteThroughput: “пропускна здатність”? загальне навантаження на диск, давайте скажемо так
DiskQueueDepth: черга I/O операцій
у нас в CloudWatch пуста (поки що?), тому скіпаємо
ReadIOPS та WriteIOPS: кількість операцій читання/запису на секунду
Instance metrics – тут метрики по кожному OpenSearch інстансу (не серверу, EC2, а самого OpenSearch) на кожній ноді:
FetchLatency та FetchRate: як швидко отримуємо дані з шардів (але в CloudWatch теж не знайшов)
ThreadCount: кількість потоків в операційній системі, які були створені JVM (Garbadge Collector threads, search threads, write/index threads, etc)
в CloudWatch значення стабільне, але в Grafana для загальної картини поки можна додати, подивимось, чи буде там щось цікаве
ShardReactivateCount: як часто шарди зі станів cold/inactive переводяться в активні, що потребує ресурсів операційної системи і CPU та пам’яті; ну… може бути, треба глянути чи воно взагалі у нас має якісь значення
але в CloudWatch теж нічого – “did not match any metrics“
ConcurrentSearchRate та ConcurrentSearchLatency: кількість і швидкість одночасних запитів на пошук – може бути цікавим, якщо довго висять багато паралельних запитів
але у нас (поки що?) ці значення постійно на нулі, тому скіпаємо
SearchRate: кількість пошукових запитів на хвилину, корисно для загальної картини
SearchLatency: швидкість виконання пошукових запитів, мабуть, дуже корисно, можна навіть алерт прикрутити
IndexingRate та IndexingLatency: аналогічно, але для індексації нових документів
SysMemoryUtilization: відсоток використання пам’яті на дата-ноді, але це не дасть повноцінної картини, треба дивитись на пам’ять JVM
JVMGCYoungCollectionCount та JVMGCOldCollectionCount: кількість запусків Garbage Collectors, корисно разом з даними по JVM memory, поговоримо далі детальніше
SearchTaskCancelled та SearchShardTaskCancelled: про погані новини 🙂 якщо задачі канселяються – щось явно йде не так (або юзер сам перервав виконання запиту, або HTTP connection reset, або таймаути, чи навантаження на кластер)
але у нас завжди по нулях, навіть коли кластер падав, тому поки сенсу збору цих метрик не бачу
ThreadpoolIndexQueue та ThreadpoolSearchQueue: кількість задач на індексацію та пошук в черзі, коли їх забагато – маємо ThreadpoolIndexRejected та ThreadpoolSearchRejected
ThreadpoolIndexQueue в CloudWatch нема взагалі, а ThreadpoolSearchQueue є, але теж постійно в нулях, тому поки скіпаємо
ThreadpoolIndexRejected та ThreadpoolSearchRejected: власне, вище
в CloudWatch картина аналогічна – ThreadpoolIndexRejected нема взагалі, ThreadpoolSearchRejected в нулях
ThreadpoolIndexThreads та ThreadpoolSearchThreads: максимальна кількість потоків операційної системи для індексації та пошуку, якщо всі зайняті – то запити підуть в ThreadpoolIndexQueue/ThreadpoolSearchQueue
в OpenSearch є кілька типів пулів для потоків – search, index, write і т.д., і для кожного пулу є показник threads (скільки виділено), queue – черга, rejected – відхилено, бо черга переповнена, див. OpenSearch Threadpool
в Node Stats API (GET _nodes/stats/thread_pool) є показник active threads, але в CloudWatch такого не бачу
ThreadpoolIndexThreads у нас в CloudWatch взагалі нема, а ThreadpoolSearchThreads статична, поки, думаю, можна скіпнути їхній моніторинг
PrimaryWriteRejected: відхилені операції записи в primary-шарди через проблеми в thread pool write або index, чи навантаження на дата-ноді
в CloudWatch поки пусті, але додамо збір і алерт
ReplicaWriteRejected: відхилені операції записи в replica-шарди – в primary документ додано, але не може записати в репліку
в CloudWatch поки пусті, але додамо збір і алерт
k-NN metrics – нам корисно, бо у нас vector store з k-NN:
KNNCacheCapacityReached: коли кеш повністю зайнятий (див. далі)
KNNEvictionCount: як часто дані з кешу видаляються – ознака, що пам’яті не вистачає
KNNGraphMemoryUsage: використання off-heap пам’яті під графи самого вектору
KNNGraphQueryErrors: кількість помилок при пошуку в векторах
в CloudWatch поки пусті, але додамо збір і алерт
KNNGraphQueryRequests: загальна кількість запитів до k-NN graphs
KNNHitCount та KNNMissCount: скільки результатів було повернуто з кешу, а скільки довелось зчитувати з диску
KNNTotalLoadTime: швидкість завантаження з диску в кеш (великі графи або завантажений EBS – буде рости час)
Моніторинг Memory
Давайте подумаємо як нам основнім показники помоніторити, і першим – пам’ять, бо це ж Java.
Що у нас є по пам’яті з метрик?
SysMemoryUtilization: відсоток використання пам’яті на сервері (дата-ноді) взагалі
JVMMemoryPressure: загальний відсоток використання JVM Heap; JVM Heap по дефолту виділяється в 50% від пам’яті серверу, але не більше 32 гіг
в CloudWatch ще є метрика KNNGraphMemoryUsagePercentage – але в документації її нема
kNN Memory usage
Спершу коротенько про пам’ять під k-NN.
Отже, на EC2 у нас виділяється пам’ять під JVM Heap (50% доступної на сервері), і окремо – off-heap для OpenSearch vector store, де він тримає графи та кеш vectore store – див. Approximate k-NN search, плюс під саму операційну систему і її файловий кеш.
Якоїсь метрики типу “KNNGraphMemoryAvailable” у нас нема, але маючи KNNGraphMemoryUsagePercentage та KNNGraphMemoryUsage можемо її порахувати:
KNNGraphMemoryUsage: у нас зараз 662 мегабайти
KNNGraphMemoryUsagePercentage: 60%
Значить, під k-NN graphs виділяється 1 гігабайт поза JVM Heap memory (це на t3.medium.search).
OpenSearch Service uses half of an instance’s RAM for the Java heap (up to a heap size of 32 GiB). By default, k-NN uses up to 50% of the remaining half
Знаючи, що у нас зараз t3.medium.search, на яких видається 4 гігабайти пам’яті – 2 GB йде під JVM Heap, і 1 гігабайт – під k-NN графи.
Основну частину KNNGraphMemory використовує k-NN cache, тобто частина оперативної пам’яті системи, в якій OpenSearch тримає HNSW-графи з векторних індексів, аби не зчитувати їх кожного разу з диску (див. k-NN clear cache).
Тому корисно мати графіки по EBS IOPS та використанню k-NN cache.
Stack Memory: окрім JVM Heap маємо Stack, який виділяється кожному потоку, де він тримає свої змінні, посилання, параметри запуску
задається через -Xss, дефолтне значення від 256 кілобайт до 1 мегабайту, див. Understanding Threads and Locks (не знайшов, як подивитись в OpenSearch Service)
якщо маємо багато threads – буде багато пам’яті під їхні стеки
очищується, коли thread вмирає
Heap Space:
використовується для виділення пам’яті, яка доступна всім потокам
керується Garbage Collectors (GC)
в контексті OpenSearch у нас тут будуть кеши пошуку і індексацій
В Heap memory у нас є:
Young Generation: свіженькі дані, усі нові об’єкти
дані звідси або видаляються зовсім, або переміщаються в Old Generation
Old Generation: сам код процесу OpenSearch, кеші, індексні структури Lucene, великі масиви
Якщо OldGenJVMMemoryPressure забитий – значить, Garbage Collector не може його почистити, бо на дані є посилання, і тоді маємо проблему – бо в Heap нема місця для нових даних, і JVM може впасти з помилкою OutOfMemoryError.
Взагалі “heap pressure” – це коли в Young Gen і Old Gen мало вільної пам’яті, і нема де розмістити нові дані, аби відповісти клієнтам.
Це призводить до частого запуску Garbage Collector, що займає час та ресурси системи – замість обробки запитів від клієнтів.
В результаті latency зростає, індексація нових документів гальмує, або взагалі отримуємо ClusterIndexWritesBlocked – аби уникнути Java OutOfMemoryError, бо при індексації OpenSearch спочатку пише дані в Heap, а потім “скидається” на диск.
Отже – для картини використання пам’яті моніторимо:
SysMemoryUtilization – для загальної картини по стану EC2
в нашому випадку тут буде стабільно близько 90%, але це ОК
JVMMemoryPressure – для загальної картини по JVM
має регулярно чиститись з Garbage Collector (GC)
якщо постійно вище 80-90% – є проблеми з запуском GC
OldGenJVMMemoryPressure – для даних по Old Generation Heap
має бути на рівні 30-40%, якщо вище і не вичищається – то проблеми або з кодом, або з GC
KNNGraphMemoryUsage – в нашому випадку треба для загальної картини
І варто додати алерти на HighSwapUsage – у нас вже відбувався активний swapping, коли запустились на t3.small.search, і це показник того, що пам’яті недостатньо.
Збір метрик до VictoriaMetrics
Власне, як вибрати метрики?
Спершу шукаємо їх в CloudWatch Metrics, і дивимось чи взагалі метрика є, і чи вона повертає якісь цікаві дані.
Наприклад, SysMemoryUtilization дає інфу.
Отуто у нас на t3.small.search був спайк, після якого кластер впав:
А ось метрика HighSwapUsage – теж до переїзду на t3.medium.search:
ClusterStatus є:
Shards є, але це по всім індексам, і нема можливості фільтрувати по окремим:
Ну і треба мати на увазі, що збір метрик з CloudWatch теж коштує грошей за API-запити, тому все підряд збирати не варто.
Взагалі для збору метрик з CloudWatch ми користуємось YACE (Yet Another CloudWatch Exporter), але він не підтримує OpenSearch Managed cluser – див. Features.
Зверніть увагу, що для різних метрик можуть бути різні Dimenstions – перевіряємо їх в CloudWatch:
Деплоїмо, перевіряємо:
І навіть цифри вийшли такі, як ми рахували в першому пості – маємо ~130000 документів в production index, по формулі num_vectors * 1.1 * (4*1024 + 8*16) виходить 604032000 байт, або 604.032 мегабайт.
А на графіку маємо 662261 kilobytes – це 662 мегабайти, але по всім індексам разом.
Тепер у VictoriaMetrics у нас є метрики aws_es_knngraph_memory_usage_average, aws_es_sys_memory_utilization_average, aws_es_jvmmemory_pressure_average, aws_es_old_gen_jvmmemory_pressure_average.
Аналогічно додаємо решту.
Для пошуку того, як саме метрики називаються в VictoriaMetrics/Prometheus – відкриваємо порт до CloudWatch Exporter:
$ kk port-forward svc/atlas-victoriametrics-prometheus-cloudwatch-exporter 9106
І з curl та grep шукаємо метрики:
$ curl -s localhost:9106/metrics | grep aws_es
# HELP aws_es_cluster_status_green_maximum CloudWatch metric AWS/ES ClusterStatus.green Dimensions: [ClientId, DomainName] Statistic: Maximum Unit: Count
# TYPE aws_es_cluster_status_green_maximum gauge
aws_es_cluster_status_green_maximum{job="aws_es",instance="",domain_name="atlas-kb-prod-cluster",client_id="492***148",} 1.0 1758014700000
# HELP aws_es_cluster_status_yellow_maximum CloudWatch metric AWS/ES ClusterStatus.yellow Dimensions: [ClientId, DomainName] Statistic: Maximum Unit: Count
# TYPE aws_es_cluster_status_yellow_maximum gauge
aws_es_cluster_status_yellow_maximum{job="aws_es",instance="",domain_name="atlas-kb-prod-cluster",client_id="492***148",} 0.0 1758014700000
# HELP aws_es_cluster_status_red_maximum CloudWatch metric AWS/ES ClusterStatus.red Dimensions: [ClientId, DomainName] Statistic: Maximum Unit: Count
# TYPE aws_es_cluster_status_red_maximum gauge
aws_es_cluster_status_red_maximum{job="aws_es",instance="",domain_name="atlas-kb-prod-cluster",client_id="492***148",} 0.0 1758014700000
...
Створення Grafana dahsboard
ОК, метрики з CloudWatch маємо – їх поки вистачить.
Подумаємо, що ми хочемо бачити в Grafana.
Загальна ідея – така собі “overview” дашборда, де на одній борді будуть відображатись всі головні дані по кластеру.
Які метрики зараз є, і як ми їх можемо використати в Grafana – я їх тут собі виписував, аби не заплутатись, бо їх вийшло багатенько:
aws_es_cluster_status_green_maximum, aws_es_cluster_status_yellow_maximum, aws_es_cluster_status_red_maximum: можна зробити одну Stats панель
aws_es_nodes_maximum: теж якусь Stats панель – знаємо, скільки має бути, і будемо робити червоним, коли Data Nodes менше, ніж має бути
aws_es_searchable_documents_maximum: просто інтересу заради – графіком покажемо кількість документів разом в усіх індексах
aws_es_cpuutilization_average: одним графіком по кожній ноді, і якусь Stats з загальною інформацією і різними кольорами
aws_es_free_storage_space_maximum: просто Stats
aws_es_cluster_index_writes_blocked_maximum: не став додавати в Grafana, тільки алерт
aws_es_jvmmemory_pressure_average: графік і Stats
aws_es_old_gen_jvmmemory_pressure_average: десь поруч, теж графіком + Stats
aws_es_automated_snapshot_failure_maximum: це просто для алерта
aws_es_5xx_maximum: і графік, і Stats
aws_es_iops_throttle_maximum: графік, аби бачити в порівнянні з іншими даними типу CPU/Mem usage
aws_es_throughput_throttle_maximum: графік
aws_es_high_swap_usage_maximum: і графік, і Stats – графік, аби бачити в порівнянні з CPU/дисками
aws_es_read_latency_average: графік
aws_es_write_latency_average: графік
aws_es_read_throughput_average: не став додавати, бо забагато графіків
aws_es_write_throughput_average: не став додавати, бо забагато графіків
aws_es_read_iops_average: графік, корисно, аби розуміти роботу кешу k-NN – якщо його мало (а ми тестили на t3.small.search з 2 гігабайтами загальної пам’яті) – то читання з диску буде багато
aws_es_write_iops_average: аналогічно
aws_es_thread_count_average: не став додавати, бо воно доволі статичне і якось сильно корисної інформації не побачив
aws_es_search_rate_average: теж просто графік
aws_es_search_latency_average: аналогічно, десь поруч
aws_es_sys_memory_utilization_average: ну, воно постійно буде десь під 90%, поки прибрав з Grafana, але додав в алерти
aws_es_jvmgcyoung_collection_count_average: графік, бачити як часто викликається
aws_es_jvmgcold_collection_count_average: графік, бачити як часто викликається
aws_es_primary_write_rejected_average: графік, але поки не став додавати, бо забагато графіків – тільки алерт
aws_es_replica_write_rejected_average: графік, але поки не став додавати, бо забагато графіків – тільки алерт
k-NN:
aws_es_knncache_capacity_reached_maximum: тільки для warning-алерту
aws_es_knneviction_count_average: не став додавати, хоча може бути цікавим
aws_es_knngraph_memory_usage_average: не став додавати
aws_es_knngraph_memory_usage_percentage_maximum: графік, замість aws_es_knngraph_memory_usage_average
aws_es_knngraph_query_errors_maximum: тільки алерт
aws_es_knngraph_query_requests_sum: графік
aws_es_knnhit_count_maximum: графік
aws_es_knnmiss_count_maximum: графік
aws_es_knntotal_load_time_sum: було непогано мати графік, але нема місця на борді
VictoriaMetrics/Prometheus sum(), avg() та max()
Спершу давайте згадаємо які у нас є функції для агрегації даних.
З CloudWatch для OpenSearch ми будемо отримувати два основні типи – counter та gauge:
$ curl -s localhost:9106/metrics | grep cpuutil
# HELP aws_es_cpuutilization_average CloudWatch metric AWS/ES CPUUtilization Dimensions: [ClientId, DomainName, NodeId] Statistic: Average Unit: Percent
# TYPE aws_es_cpuutilization_average gauge
aws_es_cpuutilization_average{job="aws_es",instance="",domain_name="atlas-kb-prod-cluster",node_id="BzX51PLwSRCJ7GrbgB4VyA",client_id="492***148",} 10.0 1758099600000
...
Різниця між ними:
counter: значення може тільки збільшувати значення
gauge: значення може збільшуватись і зменшуватись
Тут у нас “TYPE aws_es_cpuutilization_average gauge“, бо використання CPU може і збільшуватись, і зменшуватись.
З sum() без лейбл ми просто отримаємо суму всіх значень:
Якщо зробимо sum by (node_id) – то отримаємо значення для конкретної тайм-серії, яка тут буде збігатись з вибіркою без sum by ():
(значення міняється, поки пишу і роблю запити)
З max() без фільтрів – отримаємо просто максимальне значення, вибране з усіх отриманих тайм-серій:
А з avg() – середнє значення всіх значень, тобто сума всіх значень поділена на кількість тайм-серій:
Порахуємо самі:
(41+46+12)/3
33
Власне, чому я про це став писати окремо – бо з sum() навіть із by (node_id) іноді можна отримати такі во спайки:
Хоча без sum() їх нема:
А траплялись вони через те, що в цей момент перестворювався Pod з CloudWatch Exporter:
І в цей момент ми отримували дані зі старого поду, і з нового.
Тому тут варіант або використовувати max(), або просто avg(). Хоча max() все ж, мабуть, краще, бо нам цікаві “найгірші” показники.
Окей – з цим розібрались, погнали робити дашборду.
Cluster status
Тут хочеться на одній Stats панелі бачити всі три значення – Green, Yellow, Red.
Але так як в Grafana у нас нема if/else, то зробимо “костиль”.
Збираємо всі три метрики, і результат кожної множимо на 1, 2, чи 3:
sum(aws_es_cluster_status_green_maximum) by (domain_name) * 1 +
sum(aws_es_cluster_status_yellow_maximum) by (domain_name) * 2 +
sum(aws_es_cluster_status_red_maximum) by (domain_name) * 3
Відповідно, якщо aws_es_cluster_status_green_maximum == 1, то 1 * 1 == 1, а aws_es_cluster_status_yellow_maximum == 0 і aws_es_cluster_status_red_maximum будуть == 0 – то і множення поверне 0.
А якщо aws_es_cluster_status_green_maximum стане 0, але aws_es_cluster_status_red_maximum буде 1 – то 1 * 2 отримаємо 3, і по значенню 3 будемо міняти показник в Stats-панелі
І додаємо Value mappings з текстом і кольорами:
Отримуємо такий результат:
Nodes status
Тут все просто – знаємо потрібну кількість, поточну отримуємо з aws_es_nodes_maximum:
sum(aws_es_nodes_maximum) by (domain_name)
І знов через Value mappings задаємо значення і кольори:
На випадок, якщо колись збільшимо кількість нод, і забудемо оновити тут значення для “ОК” – то додаємо третій статус, ERR:
CPUUtilization: Stats
Тут зробимо кросивенько – з типом візуалізації Gauge:
avg(aws_es_cpuutilization_average) by (domain_name)
Задаємо Text size та Unit:
І Thresholds:
Description непогано генерить ChatGPT – корисно і девелоперам, і нам самим через півроку, або просто беремо опис з документації AWS:
The percentage of CPU usage for data nodes in the cluster. Maximum shows the node with the highest CPU usage. Average represents all nodes in the cluster.
Додаємо решту Stats:
CPUUtilization: Graph
Тут виведемо графік по CPU кожної ноди – середнє за 5 хвилин:
max(avg_over_time(aws_es_cpuutilization_average[5m])) by (node_id)
І ось теж приклад того, як з sum() з’являлись спайки, яких не було насправді:
Тому робимо max().
Задамо Gradient mode == Opacity, і Unit == percent:
Задаємо Color scheme і Thresholds, включаємо Show thresholds:
В Data links можна задати лінку на сторінку DataNode Health в AWS Console:
Actions, мабуть, не так давно з’явилось, ще не використовував, але виглядає цікаво – можна щось пушнути:
JVMMemoryPressure: Graph
Тут нам цікаво бачити чи не “залипає” використання пам’яті, і як часто запускається Garbage Collector.
Запит простий – можна зробити max by (node_id), але я зробив просто загальну картину по кластеру:
max(aws_es_jvmmemory_pressure_average)
І графік аналогічно попередньому:
В Desription додаємо пояснення “коли хвилюватись”:
Represents the percentage of JVM heap in use (young + old generation).
Values below 75% are normal. Sustained pressure above 80% indicates frequent GC and potential performance degradation.
Values consistently > 85–90% mean heap exhaustion risk and may trigger ClusterIndexWritesBlocked – investigate immediately.
JVMGCYoungCollectionCount and JVMGCOldCollectionCount
Дуже корисний графік, аби бачити як часто зпускаються Garbage Collects.
В запиті використаємо increase[1m] – побачити як змінилось значення за хвилину:
max(increase(aws_es_jvmgcyoung_collection_count_average[1m])) by (domain_name)
І для Old Gen:
max(increase(aws_es_jvmgcold_collection_count_average[1m])) by (domain_name)
Unit – ops/sec, Decimals задаємо 0, аби мати тільки цілі значення:
KNNHitCount vs KNNMissCount
Тут зробимо дані на секунду – rate():
sum(rate(aws_es_knnhit_count_average[5m]))
І для Cache Miss:
sum(rate(aws_es_knnmiss_count_average[5m]))
Unit ops/s, кольори можемо задати через Overrides:
Статистика тут, до речі, дуже так собі – стабільно багато Cache missed, але чому – поки не розібрались.
Фінальний результат
Збираємо всі графіки, і отримуємо щось таке:
t3.small.search vs t3.medium.search на графіках
І приклад того, як нестача ресурсів, в першу чергу пам’яті, відображається на графіках: у нас були t3.medium.search, потім ми повернули t3.small.search, аби подивитись як воно на перформанс вплине.
t3.small.search – це лише 2 гігабайти пам’яті і 2 ядра CPU.
З цих 2 гіг пам’яті 1 гіг під JVM Heap, 500 мегабайт під k-NN memory, і 500 залишалось на решту процесів.
Ну і результати, цілком очікувані:
Garbage Collectors стали запускатись постійно, бо треба було чистити пам’ять, якої не вистачало
Read IOPS виріс, бо постійно з диска завантажувались дані до JVM Heap Young і k-NN
Search Latency виріс, бо не всі дані були в кеші, і чекали I/O-операцій з диску
і CPU utilization підскочив – бо CPU був завантажений і Garbage Collectors, і читанням з диску
OpenSearch ClusterStatus Yellow та OpenSearch ClusterStatus Red: тут просто якщо більше ніж 0:
...
- alert: OpenSearch ClusterStatus Yellow
expr: sum(aws_es_cluster_status_yellow_maximum) by (domain_name, node_id) > 0
for: 1s
labels:
severity: warning
component: backend
environment: prod
annotations:
summary: 'OpenSearch ClusterStatus Yellow status detected'
description: |-
The primary shards for all indexes are allocated to nodes in the cluster, but replica shards for at least one index are not
*OpenSearch Doman*: `{{ "{{" }} $labels.domain_name }}`
grafana_opensearch_overview_url: 'https://{{ .Values.monitoring.root_url }}/d/b2d2dabd-a6b4-4a8a-b795-270b3e200a2e/aws-opensearch-cluster-cloudwatch'
- alert: OpenSearch ClusterStatus Red
expr: sum(aws_es_cluster_status_red_maximum) by (domain_name, node_id) > 0
for: 1s
labels:
severity: critical
component: backend
environment: prod
annotations:
summary: 'OpenSearch ClusterStatus RED status detected!'
description: |-
The primary and replica shards for at least one index are not allocated to nodes in the cluster
*OpenSearch Doman*: `{{ "{{" }} $labels.domain_name }}`
grafana_opensearch_overview_url: 'https://{{ .Values.monitoring.root_url }}/d/b2d2dabd-a6b4-4a8a-b795-270b3e200a2e/aws-opensearch-cluster-cloudwatch'
...
Через labels у нас реалізований роутинг алертів в Opsgenie до потрібних каналів Slack, а анотація grafana_opensearch_overview_url використовуються для додавання лінки на Grafana в повідомленні в Slack:
OpenSearch CPUHigh – якщо більше 20% протягом 10 хвилин:
Тепер напишемо Terraform code для створення кластера, юзерів та індексів.
Створювати кластер будемо в VPC, для аутентифікації використаємо internal user database.
А в VPC не можна… Бо – suprize! – AWS Bedrock вимагає OpenSeach Managed кластер саме Public, а не в VPC.
The OpenSearch Managed Cluster you provided is not supported because it is VPC protected. Your cluster must be behind a public network.
Писав в сапорт, сказали, що:
However, there is an ongoing product feature request (PFR) to have Bedrock KnowledgeBases support provisioned Open Search clusters in VPC.
І пропонують використати Amazon OpenSearch Serverless, з якого ми власне і тікаємо, бо ціни дурні.
Друга проблема, яка виявилась, коли я почав писати ресурси bedrockagent_knowledge_base – це те, що він не підтримує storage_configuration з type == OPENSEARCH_MANAGED, тільки Serverless.
Отже, будемо робити OpenSearch Managed Service кластер, кластер буде один, з трьома індексами – Dev/Staging/Prod.
В кластері буде три маленькі дата-ноди, а в кожному індексі – 1 primary shard та 1 репліка, бо проект маленький, даних в нашому Production індексі на AWS OpenSearch Serverless, з якого ми хочемо переїхати на AWS OpenSearch Service – зараз всього 2 GiB, і навряд чи в майбутньому буде дуже багато.
Було б добре кластер зробити у власному Terraform модулі аби простіше створювати якісь тестові оточення, як в мене це зроблено для AWS EKS – але поки не дуже є на це час, тому робимо просто tf-файлами з окремим prod.tfvars для змінних.
Може, потім напишу окремо по переносу у власний модуль, бо це дійсно зручно.
І в наступній частині – поговоримо про моніторинг, бо наш Production вже разок падав 🙂
В data.tf збираємо дані AWS Account ID, Availability Zones, VPC та приватні subnets, в яких будемо створювати кластер в яких колись потім будемо створювати кластер:
data "aws_caller_identity" "current" {}
data "aws_availability_zones" "available" {
state = "available"
}
data "aws_vpc" "eks_vpc" {
id = var.vpc_id
}
data "aws_subnets" "private" {
filter {
name = "vpc-id"
values = [var.vpc_id]
}
tags = {
subnet-type = "private"
}
}
Файл variables.tf з нашими дефолтними змінними, потім будемо додавати нові:
variable "aws_region" {
type = string
}
variable "project_name" {
description = "A project name to be used in resources"
type = string
}
variable "component" {
description = "A team using this project (backend, web, ios, data, devops)"
type = string
}
variable "environment" {
description = "Dev/Prod, will be used in AWS resources Name tag, and resources names"
type = string
}
variable "vpc_id" {
type = string
description = "A VPC ID to be used to create OpenSearch cluster and its Nodes"
}
Значення змінних передаємо через окремий prod.tfvars, потім, при потребі, можна буде створити нове оточення через файл типу envs/test/test.tfvars:
У нас тут ще буде AWS Bedrock, якому треба буде налаштувати доступ – аде це зробимо через його IAM Role, і про Bedrock тут писати не буду – бо і тема окрема, і в Terraform поки що нема підтримки OPENSEARCH_MANAGED, тому ми зробили його руками, а потім виконаємо terraform import.
Індекси, юзерів для нашого Backend API та Bedrock IAM Role mappings будемо робити в internal database самого OpenSearch через Terraform OpenSearch Provider аби не морочитись з доступами до дашборди.
Більша частина locals буде саме тут, але деякі, які зовсім вже “локальні” до якогось коду – будуть у файлах з кодом ресурсів.
Додаємо файл opensearcth_users.tf – поки тут тільки рутовий юзер, пароль зберігаємо в AWS Parameter Store (замість AWS Secrets Manager – “так історично склалося”):
############
### ROOT ###
############
# generate root password
# waiting for write-only: https://github.com/hashicorp/terraform-provider-aws/pull/43621
# then will update it with the ephemeral type
resource "random_password" "os_master_password" {
length = 16
special = true
}
# store the root password in AWS Parameter Store
resource "aws_ssm_parameter" "os_master_password" {
name = "/${var.environment}/${local.env_name}-root-password"
description = "OpenSearch cluster master password"
type = "SecureString"
value = random_password.os_master_password.result
overwrite = true
tier = "Standard"
lifecycle {
ignore_changes = [value] # to prevent diff every time password is regenerated
}
}
data "aws_ssm_parameter" "os_master_password" {
name = "/${var.environment}/${local.env_name}-root-password"
with_decryption = true
depends_on = [aws_ssm_parameter.os_master_password]
}
Пишемо файл opensearch_cluster.tf.
Я тут залишив конфіг для VPC, і на майбутнє, і просто для прикладу, хоча перенести вже створений кластер у VPC не можна буде – доведеться створювати новий, див. Limitations в документації Launching your Amazon OpenSearch Service domains within a VPC:
module "opensearch" {
source = "terraform-aws-modules/opensearch/aws"
version = "~> 2.0.0"
# enable Fine-grained access control
# by using the internal user database, we'll simply access to the Dashboards
# for backend API Kubernetes Pods, will use Kubernetes Secrets with username:password from AWS Parameter Store
advanced_security_options = {
enabled = true
anonymous_auth_enabled = false
internal_user_database_enabled = true
master_user_options = {
master_user_name = "os_root"
master_user_password = data.aws_ssm_parameter.os_master_password.value
}
}
# can't be used with t3 instances
auto_tune_options = {
desired_state = "DISABLED"
}
# have three data nodes - t3.small.search nodes in two AZs
# will use 3 indexes - dev/stage/prod with 1 shard and 1 replica each
cluster_config = {
instance_count = var.cluser_options.instance_count
dedicated_master_enabled = false
instance_type = var.cluser_options.instance_type
# put both data-nodes in different AZs
zone_awareness_config = {
availability_zone_count = 2
}
zone_awareness_enabled = true
}
# the cluster's name
# 'atlas-kb-prod'
domain_name = "${local.env_name}-cluster"
# 50 GiB for each Data Node
ebs_options = {
ebs_enabled = true
volume_type = var.cluser_options.volume_type
volume_size = var.cluser_options.volume_size
}
encrypt_at_rest = {
enabled = true
}
# latest for today:
# https://docs.aws.amazon.com/opensearch-service/latest/developerguide/what-is.html#choosing-version
engine_version = var.cluser_options.engine_version
# enable CloudWatch logs for Index and Search slow logs
# TODO: collect to VictoriaLogs or Loki, and create metrics and alerts
log_publishing_options = [
{ log_type = "INDEX_SLOW_LOGS" },
{ log_type = "SEARCH_SLOW_LOGS" },
]
ip_address_type = "ipv4"
node_to_node_encryption = {
enabled = true
}
# allow minor version updates automatically
# will be performed during off-peak windows
software_update_options = {
auto_software_update_enabled = var.cluser_options.auto_software_update_enabled
}
# DO NOT use 'atlas-vpc-ops' VPC and its private subnets
# > "The OpenSearch Managed Cluster you provided is not supported because it is VPC protected. Your cluster must be behind a public network."
# vpc_options = {
# subnet_ids = data.aws_subnets.private.ids
# }
# # VPC endpoint to access from Kubernetes Pods
# vpc_endpoints = {
# one = {
# subnet_ids = data.aws_subnets.private.ids
# }
# }
# Security Group rules to allow access from the VPC only
# security_group_rules = {
# ingress_443 = {
# type = "ingress"
# description = "HTTPS access from VPC"
# from_port = 443
# to_port = 443
# ip_protocol = "tcp"
# cidr_ipv4 = data.aws_vpc.ops_vpc.cidr_block
# }
# }
# Access policy
# necessary to allow access for AWS user to the Dashboards
access_policy_statements = [
{
effect = "Allow"
principals = [{
type = "*"
identifiers = ["*"]
}]
actions = ["es:*"]
}
]
# 'atlas-kb-ops-os-cluster'
tags = {
Name = "${var.project_name}-${var.environment}-os-cluster"
}
}
В принципі, тут все в коментах описано, але кратко:
три дата-ноди, кожна з 50 гіг дисків, в різних Availability Zones
включаємо логи в CloudWatch
кластер робимо в приватних сабнетах
в Domain Access Policy дозволяємо доступ для всіх
ну – поки так… Security Groups ми використати не можемо, бо не в VPC, а створити IP-Based policy – як? ми ж не знаємо CIDR Bedrock
в принципі, тут в principals.identifiers можна додати ліміт на наших IAM Users + Bedrock AIM Role, бо вона буде одна
Запускаємо створення кластера і йдемо пити чай.
Налаштування Custom endpoint
Після створення кластеру перевіряємо доступ до дашборди, якщо все ОК – то додаємо Custom endpoint.
Note: з Custom endpoint свої приколи: в Terraform OpenSearch Provider треба використовувати саме Custom endpoint URL, але в AWS Bedrock Knowledge Base – дефолтний URL кластеру
Для цього нам треба зробити сертифікат в AWS Certificate Manager і додати новий запис в Route53.
Я тут очікував можливу проблему куриця і яйця, бо налаштування Custom Endpoint залежать від AWS ACM і запису в AWS Route53, а запис в AWS Route53 буде залежати від кластеру – бо використовує його ендпоінт.
Але ні, якщо робити новий кластер з налаштуваннями, які описав нижче – все нормально створюється: спочатку сертифікат в AWS ACM, потім кластер з Custom Endpoint, потім запис в Route53 з CNAME на cluster default URL.
output "vpc_id" {
value = var.vpc_id
}
output "cluster_arn" {
value = module.opensearch.domain_arn
}
output "opensearch_domain_endpoint_cluster" {
value = "https://${module.opensearch.domain_endpoint}"
}
output "opensearch_domain_endpoint_custom" {
value = "https://${local.os_custom_domain_name}"
}
output "opensearch_root_username" {
value = "os_root"
}
output "opensearch_root_user_password_secret_name" {
value = "/${var.environment}/${local.env_name}-root-password"
}
Створення OpenSearch Users
Власне, що нам залишилось – це користувачі і індекси.
Юзерів у нас буде два типи:
звичайні юзери з OpenSearch internal database – для нашого Backend API в Kubernetes (насправді, потім ми все ж перейшли на IAM Roles, які мапляться в поди Backend через EKS Pod Identities)
і юзери (IAM Role) для Bedrock – там буде три Knowledge Bases, кожна зі своєю IAM Role, для якої треба буде додати OpenSearch Role і зробити mapping на IAM-ролі
Почнемо зі звичайних юзерів.
Додаємо провайдера, в мене це у файлі versions.tf:
Якщо кластер всеж створений в VPC – то потрібен підключений VPN, аби провайдер зміг підключитись до кластеру.
До variables.tf додаємо list() зі списком оточень:
...
variable "app_environments" {
type = list(string)
description = "The Application's environments, to be used to created Dev/Staging/Prod DynamoDB tables, etc"
}
Спершу я планував просто використовувати локальних юзерів, і в цей пост записав такий варіант – нехай буде. Далі покажу, як все ж зробили потім – з IAM Users та IAM Roles.
У файлі opensearch_users.tf додаємо в циклах три паролі, трьох юзерів, і три ролі, на які мапимо юзерів – кожна роль з доступом до власного індексу:
...
##############
### KRAKEN ###
##############
resource "random_password" "os_kraken_password" {
for_each = toset(var.app_environments)
length = 16
special = true
}
# store the root password in AWS Parameter Store
resource "aws_ssm_parameter" "os_kraken_password" {
for_each = toset(var.app_environments)
name = "/${var.environment}/${local.env_name}-kraken-${each.key}-password"
description = "OpenSearch cluster Backend Dev password"
type = "SecureString"
value = random_password.os_kraken_password[each.key].result
overwrite = true
tier = "Standard"
lifecycle {
ignore_changes = [value] # to prevent diff every time password is regenerated
}
}
# Create a user
resource "opensearch_user" "os_kraken_user" {
for_each = toset(var.app_environments)
username = "os_kraken_${each.key}"
password = random_password.os_kraken_password[each.key].result
description = "Backend EKS ${each.key} user"
depends_on = [module.opensearch]
}
# And a full user, role and role mapping example:
resource "opensearch_role" "os_kraken_role" {
for_each = toset(var.app_environments)
role_name = "os_kraken_${each.key}_role"
description = "Backend EKS ${each.key} role"
cluster_permissions = [
"indices:data/read/msearch",
"indices:data/write/bulk*",
"indices:data/read/mget*"
]
index_permissions {
index_patterns = ["kraken-kb-index-${each.key}"]
allowed_actions = ["*"]
}
depends_on = [module.opensearch]
}
В cluster_permissions додаємо дозволи, які потрібні і для index level, і для cluster level, бо Bedrock без них не працював, див. Cluster wide index permissions.
Деплоїмо, перевіряємо в Dashboards:
Додавання IAM Users
Тут ідея така сама, просто замість звичайних юзерів з логіном:паролем для аутентифікації використовується IAM та його Users && Roles.
Про роль для Bedrock далі, а зараз додамо мапінг юзерів.
Що нам треба – це взяти список наших Backend team юзерів, дати їм IAM Policy з доступом до OpenSearch, а потім в OpnSearch internal users database додати мапінг на локальну роль.
Note: ChatGPT вперто казав додавати IAM Users в Backend Roles, але ні, і це явно вказано в документації – додавати треба в Users, див. Additional master users.
І всім IAM Users треба додати IAM-політику з доступом.
Створення AWS Bedrock IAM Roles та OpenSearch Role mappings
Bedrock у нас вже є, треба просто створити нові IAM Roles і замапити їх до OpenSeach Roles.
Додаємо файл iam.tf – описуємо IAM Role та IAM Policy (Identity-based Policy для доступу до OpenSearch), тут також в циклі по кожному з var.app_environmetns:
#####################################
### MAIN ROLE FOR KNOWLEDGE BASE ###
#####################################
# grants permissions for AWS Bedrock to interact with other AWS services
resource "aws_iam_role" "knowledge_base_role" {
for_each = toset(var.app_environments)
name = "${var.project_name}-role-${each.key}-managed"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Action = "sts:AssumeRole"
Effect = "Allow"
Principal = {
Service = "bedrock.amazonaws.com"
}
Condition = {
StringEquals = {
"aws:SourceAccount" = data.aws_caller_identity.current.account_id
}
ArnLike = {
# restricts the role to be assumed only by Bedrock knowledge base in the specified region
"aws:SourceArn" = "arn:aws:bedrock:${var.aws_region}:${data.aws_caller_identity.current.account_id}:knowledge-base/*"
}
}
}
]
})
}
# IAM policy for Knowledge Base to access OpenSearch Managed
resource "aws_iam_policy" "knowledge_base_opensearch_policy" {
for_each = toset(var.app_environments)
name = "${var.project_name}-kb-opensearch-policy-${each.key}-managed"
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Action = [
"es:*",
]
Resource = [
module.opensearch.domain_arn,
"${module.opensearch.domain_arn}/*"
]
}
]
})
}
resource "aws_iam_role_policy_attachment" "knowledge_base_opensearch" {
for_each = toset(var.app_environments)
role = aws_iam_role.knowledge_base_role[each.key].name
policy_arn = aws_iam_policy.knowledge_base_opensearch_policy[each.key].arn
}
Далі в opensearch_users.tf створимо:
opensearch_role: з cluster_permissions та index_permissions на кожен індекс
locals з усіма IAM Roles, які створили вище
і opensearch_roles_mapping для кожної opensearch_role.os_bedrock_roles, які через backend_roles додаємо до кожної opensearch_role
Власне, саме тут зіткнулись з помилками доступу Bedrock, через які довелось додавати cluster_permissions:
The knowledge base storage configuration provided is invalid… Request failed: [security_exception] no permissions for [indices:data/read/msearch] and User [name=arn:aws:iam::492***148:role/kraken-kb-role-dev, backend_roles=[arn:aws:iam::492***148:role/kraken-kb-role-dev], requestedTenant=null]
Створюємо файл opensearch_indexes.tf. І додаємо сам індекси – тут я все ж вирішив без циклу, прямо створити окремі Dev/Staging/Prod:
# Dev Index
resource "opensearch_index" "kb_vector_index_dev" {
name = "kraken-kb-index-dev"
# enable approximate nearest neighbor search by setting index_knn to true
index_knn = true
index_knn_algo_param_ef_search = "512"
number_of_shards = "1"
number_of_replicas = "1"
mappings = local.os_index_mappings
# When new documents are ingested into the Knowledge Base,
# OpenSearch automatically creates field mappings for new metadata fields under
# AMAZON_BEDROCK_METADATA. Since these fields are created outside of TF resource definitions,
# TF detects them as configuration drift and attempts to recreate the index to match its
# known state.
#
# This lifecycle rule prevents unnecessary index recreation by ignoring mapping changes
# that occur after initial deployment.
lifecycle {
ignore_changes = [mappings]
}
}
...