diff --git a/backend/cmd/db/main.go b/backend/cmd/db/main.go index 82e96dae6..6dc4b09cb 100644 --- a/backend/cmd/db/main.go +++ b/backend/cmd/db/main.go @@ -145,7 +145,7 @@ func main() { pgDur := time.Now().Sub(start).Milliseconds() start = time.Now() - if err := saver.CommitStats(); err != nil { + if err := saver.CommitStats(consumer.HasFirstPartition()); err != nil { log.Printf("Error on stats commit: %v", err) } chDur := time.Now().Sub(start).Milliseconds() diff --git a/backend/internal/db/datasaver/stats.go b/backend/internal/db/datasaver/stats.go index 26efe51b5..17028ca5c 100644 --- a/backend/internal/db/datasaver/stats.go +++ b/backend/internal/db/datasaver/stats.go @@ -22,6 +22,6 @@ func (si *Saver) InsertStats(session *Session, msg Message) error { return nil } -func (si *Saver) CommitStats() error { +func (si *Saver) CommitStats(optimize bool) error { return nil } diff --git a/backend/pkg/queue/types/types.go b/backend/pkg/queue/types/types.go index aaf6f7afa..0f196c608 100644 --- a/backend/pkg/queue/types/types.go +++ b/backend/pkg/queue/types/types.go @@ -9,6 +9,7 @@ type Consumer interface { Commit() error CommitBack(gap int64) error Close() + HasFirstPartition() bool } type Producer interface { diff --git a/backend/pkg/redisstream/consumer.go b/backend/pkg/redisstream/consumer.go index d32972981..bae70120d 100644 --- a/backend/pkg/redisstream/consumer.go +++ b/backend/pkg/redisstream/consumer.go @@ -161,3 +161,7 @@ func (c *Consumer) CommitBack(gap int64) error { func (c *Consumer) Close() { // noop } + +func (c *Consumer) HasFirstPartition() bool { + return false +} diff --git a/ee/backend/internal/db/datasaver/stats.go b/ee/backend/internal/db/datasaver/stats.go index ecf418090..8d9175f04 100644 --- a/ee/backend/internal/db/datasaver/stats.go +++ b/ee/backend/internal/db/datasaver/stats.go @@ -43,7 +43,10 @@ func (si *Saver) InsertStats(session *types.Session, msg messages.Message) error return nil } -func (si *Saver) CommitStats() error { +func (si *Saver) CommitStats(optimize bool) error { + if !optimize { + return si.ch.Commit() + } select { case <-finalizeTicker: if err := si.ch.FinaliseSessionsTable(); err != nil { diff --git a/ee/backend/pkg/kafka/consumer.go b/ee/backend/pkg/kafka/consumer.go index eb9047831..ca37917f1 100644 --- a/ee/backend/pkg/kafka/consumer.go +++ b/ee/backend/pkg/kafka/consumer.go @@ -194,3 +194,16 @@ func (consumer *Consumer) Close() { log.Printf("Kafka consumer close error: %v", err) } } + +func (consumer *Consumer) HasFirstPartition() bool { + assigned, err := consumer.c.Assignment() + if err != nil { + return false + } + for _, p := range assigned { + if p.Partition == 1 { + return true + } + } + return false +}