diff --git a/backend/cmd/db/main.go b/backend/cmd/db/main.go index 74e6149bc..55a6d87fd 100644 --- a/backend/cmd/db/main.go +++ b/backend/cmd/db/main.go @@ -5,7 +5,6 @@ import ( "log" types2 "openreplay/backend/pkg/db/types" "openreplay/backend/pkg/pprof" - "openreplay/backend/pkg/queue/types" "os" "os/signal" "syscall" @@ -49,14 +48,8 @@ func main() { // Create handler's aggregator builderMap := sessions.NewBuilderMap(handlersFabric) - var producer types.Producer = nil - if cfg.UseQuickwit { - producer = queue.NewProducer(cfg.MessageSizeLimit, true) - defer producer.Close(15000) - } - // Init modules - saver := datasaver.New(pg, producer) + saver := datasaver.New(pg, cfg) saver.InitStats() msgFilter := []int{messages.MsgMetadata, messages.MsgIssueEvent, messages.MsgSessionStart, messages.MsgSessionEnd, diff --git a/backend/internal/config/db/config.go b/backend/internal/config/db/config.go index 6ec25ab01..a58cd97e0 100644 --- a/backend/internal/config/db/config.go +++ b/backend/internal/config/db/config.go @@ -18,6 +18,7 @@ type Config struct { BatchQueueLimit int `env:"DB_BATCH_QUEUE_LIMIT,required"` BatchSizeLimit int `env:"DB_BATCH_SIZE_LIMIT,required"` UseQuickwit bool `env:"QUICKWIT_ENABLED,default=false"` + QuickwitTopic string `env:"QUICKWIT_TOPIC,default=saas-quickwit"` UseProfiler bool `env:"PROFILER_ENABLED,default=false"` } diff --git a/backend/internal/db/datasaver/messages.go b/backend/internal/db/datasaver/messages.go index 834db33ed..12e7152b4 100644 --- a/backend/internal/db/datasaver/messages.go +++ b/backend/internal/db/datasaver/messages.go @@ -36,13 +36,10 @@ func (mi *Saver) InsertMessage(msg Message) error { // Unique Web messages case *PageEvent: - mi.sendToFTS(msg, sessionID) return mi.pg.InsertWebPageEvent(sessionID, m) case *NetworkRequest: - mi.sendToFTS(msg, sessionID) return mi.pg.InsertWebNetworkRequest(sessionID, m) case *GraphQL: - mi.sendToFTS(msg, sessionID) return mi.pg.InsertWebGraphQL(sessionID, m) case *JSException: return mi.pg.InsertWebJSException(m) diff --git a/backend/internal/db/datasaver/saver.go b/backend/internal/db/datasaver/saver.go index d41756a4d..2a356d120 100644 --- a/backend/internal/db/datasaver/saver.go +++ b/backend/internal/db/datasaver/saver.go @@ -1,6 +1,7 @@ package datasaver import ( + "openreplay/backend/internal/config/db" "openreplay/backend/pkg/db/cache" "openreplay/backend/pkg/queue/types" ) @@ -10,6 +11,6 @@ type Saver struct { producer types.Producer } -func New(pg *cache.PGCache, producer types.Producer) *Saver { - return &Saver{pg: pg, producer: producer} +func New(pg *cache.PGCache, _ *db.Config) *Saver { + return &Saver{pg: pg, producer: nil} } diff --git a/backend/pkg/db/postgres/messages-web.go b/backend/pkg/db/postgres/messages-web.go index 63669ecb7..541331c96 100644 --- a/backend/pkg/db/postgres/messages-web.go +++ b/backend/pkg/db/postgres/messages-web.go @@ -60,7 +60,6 @@ func (conn *Conn) InsertWebPageEvent(sessionID uint64, projectID uint32, e *Page func (conn *Conn) InsertWebClickEvent(sessionID uint64, projectID uint32, e *ClickEvent) error { var host, path string host, path, _, _ = url.GetURLParts(e.Url) - log.Println("insert web click:", host, path) if err := conn.webClickEvents.Append(sessionID, truncSqIdx(e.MessageID), e.Timestamp, e.Label, e.Selector, host+path, path); err != nil { log.Printf("insert web click err: %s", err) } diff --git a/backend/internal/db/datasaver/fts.go b/ee/backend/internal/db/datasaver/fts.go similarity index 84% rename from backend/internal/db/datasaver/fts.go rename to ee/backend/internal/db/datasaver/fts.go index 6b6bdbbae..1ff546d27 100644 --- a/backend/internal/db/datasaver/fts.go +++ b/ee/backend/internal/db/datasaver/fts.go @@ -7,6 +7,8 @@ import ( ) type NetworkRequestFTS struct { + SessionID uint64 `json:"session_id"` + ProjectID uint32 `json:"project_id"` Method string `json:"method"` URL string `json:"url"` Request string `json:"request"` @@ -17,6 +19,8 @@ type NetworkRequestFTS struct { } type PageEventFTS struct { + SessionID uint64 `json:"session_id"` + ProjectID uint32 `json:"project_id"` MessageID uint64 `json:"message_id"` Timestamp uint64 `json:"timestamp"` URL string `json:"url"` @@ -37,13 +41,15 @@ type PageEventFTS struct { } type GraphQLFTS struct { + SessionID uint64 `json:"session_id"` + ProjectID uint32 `json:"project_id"` OperationKind string `json:"operation_kind"` OperationName string `json:"operation_name"` Variables string `json:"variables"` Response string `json:"response"` } -func (s *Saver) sendToFTS(msg messages.Message, sessionID uint64) { +func (s *Saver) SendToFTS(msg messages.Message, projID uint32) { // Skip, if FTS is disabled if s.producer == nil { return @@ -58,6 +64,8 @@ func (s *Saver) sendToFTS(msg messages.Message, sessionID uint64) { // Common case *messages.NetworkRequest: event, err = json.Marshal(NetworkRequestFTS{ + SessionID: msg.SessionID(), + ProjectID: projID, Method: m.Method, URL: m.URL, Request: m.Request, @@ -68,6 +76,8 @@ func (s *Saver) sendToFTS(msg messages.Message, sessionID uint64) { }) case *messages.PageEvent: event, err = json.Marshal(PageEventFTS{ + SessionID: msg.SessionID(), + ProjectID: projID, MessageID: m.MessageID, Timestamp: m.Timestamp, URL: m.URL, @@ -88,6 +98,8 @@ func (s *Saver) sendToFTS(msg messages.Message, sessionID uint64) { }) case *messages.GraphQL: event, err = json.Marshal(GraphQLFTS{ + SessionID: msg.SessionID(), + ProjectID: projID, OperationKind: m.OperationKind, OperationName: m.OperationName, Variables: m.Variables, @@ -98,7 +110,7 @@ func (s *Saver) sendToFTS(msg messages.Message, sessionID uint64) { log.Printf("can't marshal json for quickwit: %s", err) } else { if len(event) > 0 { - if err := s.producer.Produce("quickwit", sessionID, event); err != nil { + if err := s.producer.Produce(s.topic, msg.SessionID(), event); err != nil { log.Printf("can't send event to quickwit: %s", err) } } diff --git a/ee/backend/internal/db/datasaver/saver.go b/ee/backend/internal/db/datasaver/saver.go index 7b48fbaa0..76057309d 100644 --- a/ee/backend/internal/db/datasaver/saver.go +++ b/ee/backend/internal/db/datasaver/saver.go @@ -1,8 +1,10 @@ package datasaver import ( + "openreplay/backend/internal/config/db" "openreplay/backend/pkg/db/cache" "openreplay/backend/pkg/db/clickhouse" + "openreplay/backend/pkg/queue" "openreplay/backend/pkg/queue/types" ) @@ -10,8 +12,14 @@ type Saver struct { pg *cache.PGCache ch clickhouse.Connector producer types.Producer + topic string } -func New(pg *cache.PGCache, producer types.Producer) *Saver { - return &Saver{pg: pg, producer: producer} +func New(pg *cache.PGCache, cfg *db.Config) *Saver { + var producer types.Producer = nil + if cfg.UseQuickwit { + producer = queue.NewProducer(cfg.MessageSizeLimit, true) + defer producer.Close(15000) + } + return &Saver{pg: pg, producer: producer, topic: cfg.QuickwitTopic} } diff --git a/ee/backend/internal/db/datasaver/stats.go b/ee/backend/internal/db/datasaver/stats.go index c18918c63..69ecf3a7d 100644 --- a/ee/backend/internal/db/datasaver/stats.go +++ b/ee/backend/internal/db/datasaver/stats.go @@ -17,6 +17,13 @@ func (si *Saver) InitStats() { } func (si *Saver) InsertStats(session *types.Session, msg messages.Message) error { + // Send data to quickwit + if sess, err := si.pg.Cache.GetSession(msg.SessionID()); err != nil { + si.SendToFTS(msg, 0) + } else { + si.SendToFTS(msg, sess.ProjectID) + } + switch m := msg.(type) { // Web case *messages.SessionEnd: