diff --git a/backend/cmd/assets/main.go b/backend/cmd/assets/main.go index 6f428034c..04dc5e634 100644 --- a/backend/cmd/assets/main.go +++ b/backend/cmd/assets/main.go @@ -68,6 +68,7 @@ func main() { select { case sig := <-sigchan: log.Printf("Caught signal %v: terminating\n", sig) + // TODO: wait assets workers here msgConsumer.Close() os.Exit(0) case err := <-cacher.Errors: diff --git a/backend/cmd/db/main.go b/backend/cmd/db/main.go index 89fb7ce33..00714c5cb 100644 --- a/backend/cmd/db/main.go +++ b/backend/cmd/db/main.go @@ -67,7 +67,7 @@ func main() { // Handler logic msgHandler := func(msg messages.Message) { - statsLogger.Collect(msg) // TODO: carefully check message meta and batch meta confusion situation + statsLogger.Collect(msg) // Just save session data into db without additional checks if err := saver.InsertMessage(msg); err != nil { @@ -127,33 +127,36 @@ func main() { signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) commitTick := time.Tick(cfg.CommitBatchTimeout) + + // Send collected batches to db + commitDBUpdates := func() { + start := time.Now() + pg.CommitBatches() + pgDur := time.Now().Sub(start).Milliseconds() + + start = time.Now() + if err := saver.CommitStats(); err != nil { + log.Printf("Error on stats commit: %v", err) + } + chDur := time.Now().Sub(start).Milliseconds() + log.Printf("commit duration(ms), pg: %d, ch: %d", pgDur, chDur) + + if err := consumer.Commit(); err != nil { + log.Printf("Error on consumer commit: %v", err) + } + } for { select { case sig := <-sigchan: - log.Printf("Caught signal %v: terminating\n", sig) + log.Printf("Caught signal %s: terminating\n", sig.String()) + commitDBUpdates() consumer.Close() os.Exit(0) case <-commitTick: - // Send collected batches to db - start := time.Now() - pg.CommitBatches() - pgDur := time.Now().Sub(start).Milliseconds() - - start = time.Now() - if err := saver.CommitStats(); err != nil { - log.Printf("Error on stats commit: %v", err) - } - chDur := time.Now().Sub(start).Milliseconds() - log.Printf("commit duration(ms), pg: %d, ch: %d", pgDur, chDur) - - // TODO: use commit worker to save time each tick - if err := consumer.Commit(); err != nil { - log.Printf("Error on consumer commit: %v", err) - } + commitDBUpdates() default: // Handle new message from queue - err := consumer.ConsumeNext() - if err != nil { + if err := consumer.ConsumeNext(); err != nil { log.Fatalf("Error on consumption: %v", err) } } diff --git a/backend/cmd/ender/main.go b/backend/cmd/ender/main.go index b698bb13b..5952aa7cc 100644 --- a/backend/cmd/ender/main.go +++ b/backend/cmd/ender/main.go @@ -46,7 +46,7 @@ func main() { log.Printf("ZERO TS, sessID: %d, msgType: %d", msg.Meta().SessionID(), msg.TypeID()) } statsLogger.Collect(msg) - sessions.UpdateSession(msg) //TODO: recheck timestamps(sessionID, meta.Timestamp, iter.Message().Meta().Timestamp) + sessions.UpdateSession(msg) } consumer := queue.NewConsumer( diff --git a/backend/cmd/heuristics/main.go b/backend/cmd/heuristics/main.go index 82510fdb7..9e4804089 100644 --- a/backend/cmd/heuristics/main.go +++ b/backend/cmd/heuristics/main.go @@ -49,7 +49,7 @@ func main() { msgHandler := func(msg messages.Message) { statsLogger.Collect(msg) - builderMap.HandleMessage(msg) //(sessionID, msg, iter.Message().Meta().Index) + builderMap.HandleMessage(msg) } consumer := queue.NewConsumer( diff --git a/backend/cmd/integrations/main.go b/backend/cmd/integrations/main.go index 9bfa4d7a2..4f5a30dcf 100644 --- a/backend/cmd/integrations/main.go +++ b/backend/cmd/integrations/main.go @@ -17,7 +17,6 @@ import ( "openreplay/backend/pkg/token" ) -// func main() { metrics := monitoring.New("integrations") diff --git a/backend/cmd/sink/main.go b/backend/cmd/sink/main.go index 0b82c3130..71f455275 100644 --- a/backend/cmd/sink/main.go +++ b/backend/cmd/sink/main.go @@ -115,6 +115,9 @@ func main() { select { case sig := <-sigchan: log.Printf("Caught signal %v: terminating\n", sig) + if err := writer.SyncAll(); err != nil { + log.Printf("sync error: %v\n", err) + } if err := consumer.Commit(); err != nil { log.Printf("can't commit messages: %s", err) } @@ -122,7 +125,7 @@ func main() { os.Exit(0) case <-tick: if err := writer.SyncAll(); err != nil { - log.Fatalf("Sync error: %v\n", err) + log.Fatalf("sync error: %v\n", err) } counter.Print() if err := consumer.Commit(); err != nil {