diff --git a/backend/pkg/db/postgres/bulks.go b/backend/pkg/db/postgres/bulks.go index e58299acd..c0901a1a5 100644 --- a/backend/pkg/db/postgres/bulks.go +++ b/backend/pkg/db/postgres/bulks.go @@ -95,17 +95,17 @@ func (conn *BulkSet) initBulks() { } conn.requests, err = NewBulk(conn.c, "events_common.requests", - "(session_id, timestamp, seq_index, url, duration, success)", - "($%d, $%d, $%d, LEFT($%d, 8000), $%d, $%d)", - 6, 200) + "(session_id, timestamp, seq_index, url, duration, success, tab_id)", + "($%d, $%d, $%d, LEFT($%d, 8000), $%d, $%d, $%d)", + 7, 200) if err != nil { log.Fatalf("can't create requests bulk: %s", err) } conn.customEvents, err = NewBulk(conn.c, "events_common.customs", - "(session_id, timestamp, seq_index, name, payload)", - "($%d, $%d, $%d, LEFT($%d, 2000), $%d)", - 5, 200) + "(session_id, timestamp, seq_index, name, payload, tab_id)", + "($%d, $%d, $%d, LEFT($%d, 2000), $%d, $%d)", + 6, 200) if err != nil { log.Fatalf("can't create customEvents bulk: %s", err) } @@ -113,21 +113,21 @@ func (conn *BulkSet) initBulks() { "events.pages", "(session_id, message_id, timestamp, referrer, base_referrer, host, path, query, dom_content_loaded_time, "+ "load_time, response_end, first_paint_time, first_contentful_paint_time, speed_index, visually_complete, "+ - "time_to_interactive, response_time, dom_building_time)", + "time_to_interactive, response_time, dom_building_time, tab_id)", "($%d, $%d, $%d, LEFT($%d, 8000), LEFT($%d, 8000), LEFT($%d, 300), LEFT($%d, 2000), LEFT($%d, 8000), "+ "NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0),"+ - " NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0))", - 18, 200) + " NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0), $%d)", + 19, 200) if err != nil { log.Fatalf("can't create webPageEvents bulk: %s", err) } conn.webInputEvents, err = NewBulk(conn.c, "events.inputs", - "(session_id, message_id, timestamp, label)", - "($%d, $%d, $%d, NULLIF(LEFT($%d, 2000),''))", - 4, 200) + "(session_id, message_id, timestamp, label, tab_id)", + "($%d, $%d, $%d, NULLIF(LEFT($%d, 2000),''), $%d)", + 5, 200) if err != nil { - log.Fatalf("can't create webPageEvents bulk: %s", err) + log.Fatalf("can't create webInputEvents bulk: %s", err) } conn.webInputDurations, err = NewBulk(conn.c, "events.inputs", @@ -135,7 +135,7 @@ func (conn *BulkSet) initBulks() { "($%d, $%d, $%d, NULLIF(LEFT($%d, 2000),''), $%d, $%d)", 6, 200) if err != nil { - log.Fatalf("can't create webPageEvents bulk: %s", err) + log.Fatalf("can't create webInputDurations bulk: %s", err) } conn.webGraphQL, err = NewBulk(conn.c, "events.graphql", @@ -143,7 +143,7 @@ func (conn *BulkSet) initBulks() { "($%d, $%d, $%d, LEFT($%d, 2000), $%d, $%d)", 6, 200) if err != nil { - log.Fatalf("can't create webPageEvents bulk: %s", err) + log.Fatalf("can't create webGraphQL bulk: %s", err) } conn.webErrors, err = NewBulk(conn.c, "errors", @@ -179,33 +179,35 @@ func (conn *BulkSet) initBulks() { } conn.webIssueEvents, err = NewBulk(conn.c, "events_common.issues", - "(session_id, issue_id, timestamp, seq_index, payload)", - "($%d, $%d, $%d, $%d, CAST($%d AS jsonb))", - 5, 200) + "(session_id, issue_id, timestamp, seq_index, payload, tab_id)", + "($%d, $%d, $%d, $%d, CAST($%d AS jsonb), $%d)", + 6, 200) if err != nil { log.Fatalf("can't create webIssueEvents bulk: %s", err) } conn.webCustomEvents, err = NewBulk(conn.c, "events_common.customs", - "(session_id, seq_index, timestamp, name, payload, level)", - "($%d, $%d, $%d, LEFT($%d, 2000), $%d, $%d)", - 6, 200) + "(session_id, seq_index, timestamp, name, payload, level, tab_id)", + "($%d, $%d, $%d, LEFT($%d, 2000), $%d, $%d, $%d)", + 7, 200) if err != nil { log.Fatalf("can't create webCustomEvents bulk: %s", err) } conn.webClickEvents, err = NewBulk(conn.c, "events.clicks", - "(session_id, message_id, timestamp, label, selector, url, path, hesitation)", - "($%d, $%d, $%d, NULLIF(LEFT($%d, 2000), ''), LEFT($%d, 8000), LEFT($%d, 2000), LEFT($%d, 2000), $%d)", - 8, 200) + "(session_id, message_id, timestamp, label, selector, url, path, hesitation, tab_id)", + "($%d, $%d, $%d, NULLIF(LEFT($%d, 2000), ''), LEFT($%d, 8000), LEFT($%d, 2000), LEFT($%d, 2000), $%d, $%d)", + 9, 200) if err != nil { log.Fatalf("can't create webClickEvents bulk: %s", err) } conn.webNetworkRequest, err = NewBulk(conn.c, "events_common.requests", - "(session_id, timestamp, seq_index, url, host, path, query, request_body, response_body, status_code, method, duration, success)", - "($%d, $%d, $%d, LEFT($%d, 8000), LEFT($%d, 300), LEFT($%d, 2000), LEFT($%d, 8000), $%d, $%d, $%d::smallint, NULLIF($%d, '')::http_method, $%d, $%d)", - 13, 200) + "(session_id, timestamp, seq_index, url, host, path, query, request_body, response_body, status_code, "+ + "method, duration, success, tab_id)", + "($%d, $%d, $%d, LEFT($%d, 8000), LEFT($%d, 300), LEFT($%d, 2000), LEFT($%d, 8000), $%d, $%d, "+ + "$%d::smallint, NULLIF($%d, '')::http_method, $%d, $%d, $%d)", + 14, 200) if err != nil { log.Fatalf("can't create webNetworkRequest bulk: %s", err) } diff --git a/backend/pkg/db/postgres/messages-common.go b/backend/pkg/db/postgres/messages-common.go index 1ffdb8ddf..0d368ab47 100644 --- a/backend/pkg/db/postgres/messages-common.go +++ b/backend/pkg/db/postgres/messages-common.go @@ -104,15 +104,15 @@ func (conn *Conn) HandleSessionEnd(sessionID uint64) error { return conn.c.Exec(sqlRequest, sessionID) } -func (conn *Conn) InsertRequest(sessionID uint64, timestamp uint64, index uint32, url string, duration uint64, success bool) error { - if err := conn.bulks.Get("requests").Append(sessionID, timestamp, index, url, duration, success); err != nil { +func (conn *Conn) InsertRequest(sessionID uint64, timestamp uint64, index uint32, url string, duration uint64, success bool, tabID string) error { + if err := conn.bulks.Get("requests").Append(sessionID, timestamp, index, url, duration, success, tabID); err != nil { return fmt.Errorf("insert request in bulk err: %s", err) } return nil } -func (conn *Conn) InsertCustomEvent(sessionID uint64, timestamp uint64, index uint32, name string, payload string) error { - if err := conn.bulks.Get("customEvents").Append(sessionID, timestamp, index, name, payload); err != nil { +func (conn *Conn) InsertCustomEvent(sessionID uint64, timestamp uint64, index uint32, name, payload, tabID string) error { + if err := conn.bulks.Get("customEvents").Append(sessionID, timestamp, index, name, payload, tabID); err != nil { return fmt.Errorf("insert custom event in bulk err: %s", err) } return nil @@ -157,12 +157,12 @@ func (conn *Conn) InsertIssueEvent(sessionID uint64, projectID uint32, e *messag if err := conn.bulks.Get("webIssues").Append(projectID, issueID, e.Type, e.ContextString); err != nil { log.Printf("insert web issue err: %s", err) } - if err := conn.bulks.Get("webIssueEvents").Append(sessionID, issueID, e.Timestamp, truncSqIdx(e.MessageID), payload); err != nil { + if err := conn.bulks.Get("webIssueEvents").Append(sessionID, issueID, e.Timestamp, truncSqIdx(e.MessageID), payload, e.TabID()); err != nil { log.Printf("insert web issue event err: %s", err) } conn.updateSessionIssues(sessionID, 0, getIssueScore(e)) if e.Type == "custom" { - if err := conn.bulks.Get("webCustomEvents").Append(sessionID, truncSqIdx(e.MessageID), e.Timestamp, e.ContextString, e.Payload, "error"); err != nil { + if err := conn.bulks.Get("webCustomEvents").Append(sessionID, truncSqIdx(e.MessageID), e.Timestamp, e.ContextString, e.Payload, "error", e.TabID()); err != nil { log.Printf("insert web custom event err: %s", err) } } diff --git a/backend/pkg/db/postgres/messages-ios.go b/backend/pkg/db/postgres/messages-ios.go index ace1955f5..6fb5c4878 100644 --- a/backend/pkg/db/postgres/messages-ios.go +++ b/backend/pkg/db/postgres/messages-ios.go @@ -8,7 +8,7 @@ import ( func (conn *Conn) InsertIOSCustomEvent(e *messages.IOSCustomEvent) error { sessionID := e.SessionID() - err := conn.InsertCustomEvent(sessionID, e.Timestamp, truncSqIdx(e.Index), e.Name, e.Payload) + err := conn.InsertCustomEvent(sessionID, e.Timestamp, truncSqIdx(e.Index), e.Name, e.Payload, e.TabID()) if err == nil { conn.insertAutocompleteValue(sessionID, 0, "CUSTOM_IOS", e.Name) } @@ -35,7 +35,7 @@ func (conn *Conn) InsertIOSUserAnonymousID(userAnonymousID *messages.IOSUserAnon func (conn *Conn) InsertIOSNetworkCall(e *messages.IOSNetworkCall) error { sessionID := e.SessionID() - err := conn.InsertRequest(sessionID, e.Timestamp, truncSqIdx(e.Index), e.URL, e.Duration, e.Success) + err := conn.InsertRequest(sessionID, e.Timestamp, truncSqIdx(e.Index), e.URL, e.Duration, e.Success, e.TabID()) if err == nil { conn.insertAutocompleteValue(sessionID, 0, "REQUEST_IOS", url.DiscardURLQuery(e.URL)) } diff --git a/backend/pkg/db/postgres/messages-web.go b/backend/pkg/db/postgres/messages-web.go index 3f93ae35d..36d5f7463 100644 --- a/backend/pkg/db/postgres/messages-web.go +++ b/backend/pkg/db/postgres/messages-web.go @@ -15,6 +15,7 @@ func (conn *Conn) InsertWebCustomEvent(sessionID uint64, projectID uint32, e *Cu truncSqIdx(e.Meta().Index), e.Name, e.Payload, + e.TabID(), ) if err == nil { conn.insertAutocompleteValue(sessionID, projectID, "CUSTOM", e.Name) @@ -46,7 +47,7 @@ func (conn *Conn) InsertWebPageEvent(sessionID uint64, projectID uint32, e *Page // base_path is deprecated if err = conn.bulks.Get("webPageEvents").Append(sessionID, truncSqIdx(e.MessageID), e.Timestamp, e.Referrer, url.DiscardURLQuery(e.Referrer), host, path, query, e.DomContentLoadedEventEnd, e.LoadEventEnd, e.ResponseEnd, e.FirstPaint, e.FirstContentfulPaint, - e.SpeedIndex, e.VisuallyComplete, e.TimeToInteractive, calcResponseTime(e), calcDomBuildingTime(e)); err != nil { + e.SpeedIndex, e.VisuallyComplete, e.TimeToInteractive, calcResponseTime(e), calcDomBuildingTime(e), e.TabID()); err != nil { log.Printf("insert web page event in bulk err: %s", err) } if err = conn.InsertReferrer(sessionID, e.Referrer); err != nil { @@ -66,7 +67,7 @@ func (conn *Conn) InsertWebClickEvent(sessionID uint64, projectID uint32, e *Mou } var host, path string host, path, _, _ = url.GetURLParts(e.Url) - if err := conn.bulks.Get("webClickEvents").Append(sessionID, truncSqIdx(e.MsgID()), e.Timestamp, e.Label, e.Selector, host+path, path, e.HesitationTime); err != nil { + if err := conn.bulks.Get("webClickEvents").Append(sessionID, truncSqIdx(e.MsgID()), e.Timestamp, e.Label, e.Selector, host+path, path, e.HesitationTime, e.TabID()); err != nil { log.Printf("insert web click err: %s", err) } // Accumulate session updates and exec inside batch with another sql commands @@ -80,7 +81,7 @@ func (conn *Conn) InsertWebInputEvent(sessionID uint64, projectID uint32, e *Inp if e.Label == "" { return nil } - if err := conn.bulks.Get("webInputEvents").Append(sessionID, truncSqIdx(e.MessageID), e.Timestamp, e.Label); err != nil { + if err := conn.bulks.Get("webInputEvents").Append(sessionID, truncSqIdx(e.MessageID), e.Timestamp, e.Label, e.TabID()); err != nil { log.Printf("insert web input event err: %s", err) } conn.updateSessionEvents(sessionID, 1, 0) @@ -129,7 +130,7 @@ func (conn *Conn) InsertWebNetworkRequest(sessionID uint64, projectID uint32, sa return err } conn.bulks.Get("webNetworkRequest").Append(sessionID, e.Meta().Timestamp, truncSqIdx(e.Meta().Index), e.URL, host, path, query, - request, response, e.Status, url.EnsureMethod(e.Method), e.Duration, e.Status < 400) + request, response, e.Status, url.EnsureMethod(e.Method), e.Duration, e.Status < 400, e.TabID()) return nil } @@ -162,7 +163,7 @@ func (conn *Conn) InsertMouseThrashing(sessionID uint64, projectID uint32, e *Mo if err := conn.bulks.Get("webIssues").Append(projectID, issueID, "mouse_thrashing", e.Url); err != nil { log.Printf("insert web issue err: %s", err) } - if err := conn.bulks.Get("webIssueEvents").Append(sessionID, issueID, e.Timestamp, truncSqIdx(e.MsgID()), nil); err != nil { + if err := conn.bulks.Get("webIssueEvents").Append(sessionID, issueID, e.Timestamp, truncSqIdx(e.MsgID()), nil, e.TabID()); err != nil { log.Printf("insert web issue event err: %s", err) } conn.updateSessionIssues(sessionID, 0, 50) diff --git a/backend/pkg/db/types/error-event.go b/backend/pkg/db/types/error-event.go index 9f2f1a886..5b2dc3130 100644 --- a/backend/pkg/db/types/error-event.go +++ b/backend/pkg/db/types/error-event.go @@ -63,7 +63,7 @@ func WrapJSException(m *JSException) *ErrorEvent { if err != nil { log.Printf("Error on parsing Exception metadata: %v", err) } - return &ErrorEvent{ + wrapper := &ErrorEvent{ MessageID: m.Meta().Index, Timestamp: uint64(m.Meta().Timestamp), Source: SOURCE_JS, @@ -72,6 +72,7 @@ func WrapJSException(m *JSException) *ErrorEvent { Payload: m.Payload, Tags: meta, } + return wrapper } func WrapIntegrationEvent(m *IntegrationEvent) *ErrorEvent { diff --git a/backend/pkg/messages/iterator.go b/backend/pkg/messages/iterator.go index 4a39a7fce..a67277f50 100644 --- a/backend/pkg/messages/iterator.go +++ b/backend/pkg/messages/iterator.go @@ -43,6 +43,7 @@ func NewMessageIterator(messageHandler MessageHandler, messageFilter []int, auto iter.preFilter = map[int]struct{}{ MsgBatchMetadata: {}, MsgBatchMeta: {}, MsgTimestamp: {}, MsgSessionStart: {}, MsgSessionEnd: {}, MsgSetPageLocation: {}, + MsgTabData: {}, } return iter } @@ -184,6 +185,10 @@ func (i *messageIteratorImpl) preprocessing(msg Message) error { i.messageInfo.Url = m.URL // Save session page url in cache for using in next batches i.urls.Set(i.messageInfo.batch.sessionID, m.URL) + + case *TabData: + // Save tabID for using in all messages in current batch + i.batchInfo.tabID = m.TabId } return nil } diff --git a/backend/pkg/messages/message.go b/backend/pkg/messages/message.go index 6ae02e6c5..3318a696a 100644 --- a/backend/pkg/messages/message.go +++ b/backend/pkg/messages/message.go @@ -20,6 +20,7 @@ type BatchInfo struct { partition uint64 timestamp int64 version uint64 + tabID string } func NewBatchInfo(sessID uint64, topic string, id, partition uint64, ts int64) *BatchInfo { @@ -82,6 +83,13 @@ func (m *message) Time() uint64 { return m.Meta().Timestamp } +func (m *message) TabID() string { + if m.batch == nil { + return "" + } + return m.batch.tabID +} + func (m *message) SetSessionID(sessID uint64) { if m.batch == nil { m.batch = &BatchInfo{}