diff --git a/backend/cmd/db/main.go b/backend/cmd/db/main.go index 82ece145f..3da8a71ae 100644 --- a/backend/cmd/db/main.go +++ b/backend/cmd/db/main.go @@ -2,6 +2,7 @@ package main import ( "log" + "openreplay/backend/pkg/memory" config "openreplay/backend/internal/config/db" "openreplay/backend/internal/db" @@ -53,8 +54,15 @@ func main() { cfg.MessageSizeLimit, ) + // Init memory manager + memoryManager, err := memory.NewManager(cfg.MemoryLimitMB, cfg.MaxMemoryUsage) + if err != nil { + log.Printf("can't init memory manager: %s", err) + return + } + // Run service and wait for TERM signal - service := db.New(cfg, consumer, saver) + service := db.New(cfg, consumer, saver, memoryManager) log.Printf("Db service started\n") terminator.Wait(service) } diff --git a/backend/cmd/ender/main.go b/backend/cmd/ender/main.go index 84d816a33..0bd175020 100644 --- a/backend/cmd/ender/main.go +++ b/backend/cmd/ender/main.go @@ -2,6 +2,7 @@ package main import ( "log" + "openreplay/backend/pkg/memory" "os" "os/signal" "strings" @@ -51,6 +52,12 @@ func main() { cfg.MessageSizeLimit, ) + memoryManager, err := memory.NewManager(cfg.MemoryLimitMB, cfg.MaxMemoryUsage) + if err != nil { + log.Printf("can't init memory manager: %s", err) + return + } + log.Printf("Ender service started\n") sigchan := make(chan os.Signal, 1) @@ -121,6 +128,9 @@ func main() { case msg := <-consumer.Rebalanced(): log.Println(msg) default: + if !memoryManager.HasFreeMemory() { + continue + } if err := consumer.ConsumeNext(); err != nil { log.Fatalf("Error on consuming: %v", err) } diff --git a/backend/cmd/heuristics/main.go b/backend/cmd/heuristics/main.go index 3a7abb7a0..2b5f7ca8c 100644 --- a/backend/cmd/heuristics/main.go +++ b/backend/cmd/heuristics/main.go @@ -7,6 +7,7 @@ import ( "openreplay/backend/pkg/handlers" "openreplay/backend/pkg/handlers/custom" "openreplay/backend/pkg/handlers/web" + "openreplay/backend/pkg/memory" "openreplay/backend/pkg/messages" "openreplay/backend/pkg/metrics" heuristicsMetrics "openreplay/backend/pkg/metrics/heuristics" @@ -48,8 +49,15 @@ func main() { cfg.MessageSizeLimit, ) + // Init memory manager + memoryManager, err := memory.NewManager(cfg.MemoryLimitMB, cfg.MaxMemoryUsage) + if err != nil { + log.Printf("can't init memory manager: %s", err) + return + } + // Run service and wait for TERM signal - service := heuristics.New(cfg, producer, consumer, eventBuilder) + service := heuristics.New(cfg, producer, consumer, eventBuilder, memoryManager) log.Printf("Heuristics service started\n") terminator.Wait(service) } diff --git a/backend/internal/config/common/config.go b/backend/internal/config/common/config.go index c3246c10c..97d4f6c0f 100644 --- a/backend/internal/config/common/config.go +++ b/backend/internal/config/common/config.go @@ -5,6 +5,8 @@ import "strings" type Config struct { ConfigFilePath string `env:"CONFIG_FILE_PATH"` MessageSizeLimit int `env:"QUEUE_MESSAGE_SIZE_LIMIT,default=1048576"` + MaxMemoryUsage uint64 `env:"MAX_MEMORY_USAGE,default=80"` + MemoryLimitMB uint64 `env:"MEMORY_LIMIT_MB,default=0"` // 0 means take limit from OS (cgroup) } type Configer interface { diff --git a/backend/internal/db/service.go b/backend/internal/db/service.go index 69b5cb1cb..f6f982739 100644 --- a/backend/internal/db/service.go +++ b/backend/internal/db/service.go @@ -2,6 +2,7 @@ package db import ( "log" + "openreplay/backend/pkg/memory" "time" "openreplay/backend/internal/config/db" @@ -14,13 +15,15 @@ type dbImpl struct { cfg *db.Config consumer types.Consumer saver datasaver.Saver + mm memory.Manager } -func New(cfg *db.Config, consumer types.Consumer, saver datasaver.Saver) service.Interface { +func New(cfg *db.Config, consumer types.Consumer, saver datasaver.Saver, mm memory.Manager) service.Interface { s := &dbImpl{ cfg: cfg, consumer: consumer, saver: saver, + mm: mm, } go s.run() return s @@ -35,6 +38,9 @@ func (d *dbImpl) run() { case msg := <-d.consumer.Rebalanced(): log.Println(msg) default: + if !d.mm.HasFreeMemory() { + continue + } if err := d.consumer.ConsumeNext(); err != nil { log.Fatalf("Error on consumption: %v", err) } diff --git a/backend/internal/heuristics/service.go b/backend/internal/heuristics/service.go index 44b4034e2..9d2f8313b 100644 --- a/backend/internal/heuristics/service.go +++ b/backend/internal/heuristics/service.go @@ -3,6 +3,7 @@ package heuristics import ( "fmt" "log" + "openreplay/backend/pkg/memory" "openreplay/backend/pkg/messages" metrics "openreplay/backend/pkg/metrics/heuristics" "time" @@ -18,14 +19,16 @@ type heuristicsImpl struct { producer types.Producer consumer types.Consumer events sessions.EventBuilder + mm memory.Manager } -func New(cfg *heuristics.Config, p types.Producer, c types.Consumer, e sessions.EventBuilder) service.Interface { +func New(cfg *heuristics.Config, p types.Producer, c types.Consumer, e sessions.EventBuilder, mm memory.Manager) service.Interface { s := &heuristicsImpl{ cfg: cfg, producer: p, consumer: c, events: e, + mm: mm, } go s.run() return s @@ -47,6 +50,9 @@ func (h *heuristicsImpl) run() { case msg := <-h.consumer.Rebalanced(): log.Println(msg) default: + if !h.mm.HasFreeMemory() { + continue + } if err := h.consumer.ConsumeNext(); err != nil { log.Fatalf("Error on consuming: %v", err) } diff --git a/backend/pkg/memory/limit.go b/backend/pkg/memory/limit.go new file mode 100644 index 000000000..53e227ce0 --- /dev/null +++ b/backend/pkg/memory/limit.go @@ -0,0 +1,44 @@ +package memory + +import ( + "errors" + "fmt" + "log" + "os" + "strconv" + "strings" +) + +const ( + Limit = "hierarchical_memory_limit" + Maximum = 9223372036854771712 +) + +func parseMemoryLimit() (int, error) { + data, err := os.ReadFile("/sys/fs/cgroup/memory/memory.stat") + if err != nil { + return 0, err + } + lines := strings.Split(string(data), "\n") + for _, line := range lines { + if strings.Contains(line, Limit) { + lineParts := strings.Split(line, " ") + if len(lineParts) != 2 { + log.Println("can't parse memory limit") + return 0, fmt.Errorf("can't split string with memory limit, str: %s", line) + } + value, err := strconv.Atoi(lineParts[1]) + if err != nil { + return 0, fmt.Errorf("can't convert memory limit to int, str: %s", lineParts[1]) + } + if value == Maximum { + return 0, errors.New("memory limit is not defined") + } + // DEBUG_LOG + value /= 1024 * 1024 + log.Printf("memory limit is defined: %d MiB", value) + return value, nil + } + } + return 0, errors.New("memory limit is not defined") +} diff --git a/backend/pkg/memory/manager.go b/backend/pkg/memory/manager.go new file mode 100644 index 000000000..88336e979 --- /dev/null +++ b/backend/pkg/memory/manager.go @@ -0,0 +1,103 @@ +package memory + +import ( + "errors" + "log" + "runtime" + "sync" + "time" +) + +type Manager interface { + HasFreeMemory() bool +} + +type managerImpl struct { + mutex *sync.RWMutex + current uint64 + maximum uint64 + threshold uint64 + allocated uint64 + total uint64 +} + +func NewManager(maximumMemory, thresholdValue uint64) (Manager, error) { + if maximumMemory < 1 { + log.Println("maximumMemory is not defined, try to parse memory limit from system") + memLimit, err := parseMemoryLimit() + if err != nil { + log.Println("can't parse system memory limit, err: ", err) + } + if memLimit > 0 { + maximumMemory = uint64(memLimit) + } + } + if thresholdValue > 100 { + return nil, errors.New("threshold must be less than 100") + } + m := &managerImpl{ + mutex: &sync.RWMutex{}, + threshold: thresholdValue, + maximum: maximumMemory, + current: 0, + } + go m.worker() + return m, nil +} + +func (m *managerImpl) currentUsage() uint64 { + m.mutex.RLock() + defer m.mutex.RUnlock() + return m.current +} + +func (m *managerImpl) calcMemoryUsage() { + var rtm runtime.MemStats + runtime.ReadMemStats(&rtm) + allocated := rtm.Alloc / 1024 / 1024 + total := rtm.Sys / 1024 / 1024 + if allocated > m.maximum && m.HasFreeMemory() { + log.Println("memory consumption is greater than maximum memory, current: ", allocated, "maximum: ", m.maximum) + } + current := uint64(float64(allocated*100) / float64(m.maximum)) + m.mutex.Lock() + m.current = current + m.allocated = allocated + m.total = total + m.mutex.Unlock() +} + +func (m *managerImpl) printStat() { + log.Printf("current memory consumption: %d, allocated: %d, maximum: %d, current usage: %d, threshold: %d", + m.total, m.allocated, m.maximum, m.current, m.threshold) +} + +func (m *managerImpl) worker() { + // If maximum memory is not defined, then we don't need to check memory usage + if m.maximum == 0 { + m.current = m.threshold + return + } + + // First memory usage calculation + m.calcMemoryUsage() + m.printStat() + + // Logs and update ticks + updateTick := time.Tick(time.Second) + logsTick := time.Tick(time.Minute) + + // Start worker + for { + select { + case <-updateTick: + m.calcMemoryUsage() + case <-logsTick: + m.printStat() + } + } +} + +func (m *managerImpl) HasFreeMemory() bool { + return m.currentUsage() <= m.threshold +}