From f1d852abb4392901db8159768dfb319ff83ed6cc Mon Sep 17 00:00:00 2001 From: Alexander Date: Mon, 26 Dec 2022 15:59:32 +0100 Subject: [PATCH] [DB] moved click events to bulks (#892) * feat(backend): moved click events to bulks * feat(backend): insert click event with host + path --- backend/pkg/db/postgres/connector.go | 72 ++++++++++++++++-------- backend/pkg/db/postgres/messages-web.go | 41 +++----------- backend/pkg/handlers/web/networkIssue.go | 14 ----- backend/pkg/messages/cache.go | 22 ++++++++ backend/pkg/messages/iterator.go | 17 +++++- 5 files changed, 93 insertions(+), 73 deletions(-) create mode 100644 backend/pkg/messages/cache.go diff --git a/backend/pkg/db/postgres/connector.go b/backend/pkg/db/postgres/connector.go index 07314ae68..51773c1e2 100644 --- a/backend/pkg/db/postgres/connector.go +++ b/backend/pkg/db/postgres/connector.go @@ -41,6 +41,8 @@ type Conn struct { webIssues Bulk webIssueEvents Bulk webCustomEvents Bulk + webClickEvents Bulk + webNetworkRequest Bulk sessionUpdates map[uint64]*sessionUpdates batchQueueLimit int batchSizeLimit int @@ -111,25 +113,25 @@ func (conn *Conn) initBulks() { "autocomplete", "(value, type, project_id)", "($%d, $%d, $%d)", - 3, 100) + 3, 200) if err != nil { - log.Fatalf("can't create autocomplete bulk") + log.Fatalf("can't create autocomplete bulk: %s", err) } conn.requests, err = NewBulk(conn.c, "events_common.requests", "(session_id, timestamp, seq_index, url, duration, success)", "($%d, $%d, $%d, left($%d, 2700), $%d, $%d)", - 6, 100) + 6, 200) if err != nil { - log.Fatalf("can't create requests bulk") + log.Fatalf("can't create requests bulk: %s", err) } conn.customEvents, err = NewBulk(conn.c, "events_common.customs", "(session_id, timestamp, seq_index, name, payload)", "($%d, $%d, $%d, left($%d, 2700), $%d)", - 5, 100) + 5, 200) if err != nil { - log.Fatalf("can't create customEvents bulk") + log.Fatalf("can't create customEvents bulk: %s", err) } conn.webPageEvents, err = NewBulk(conn.c, "events.pages", @@ -138,73 +140,89 @@ func (conn *Conn) initBulks() { "time_to_interactive, response_time, dom_building_time)", "($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0),"+ " NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0), NULLIF($%d, 0))", - 18, 100) + 18, 200) if err != nil { - log.Fatalf("can't create webPageEvents bulk") + log.Fatalf("can't create webPageEvents bulk: %s", err) } conn.webInputEvents, err = NewBulk(conn.c, "events.inputs", "(session_id, message_id, timestamp, value, label)", "($%d, $%d, $%d, $%d, NULLIF($%d,''))", - 5, 100) + 5, 200) if err != nil { - log.Fatalf("can't create webPageEvents bulk") + log.Fatalf("can't create webPageEvents bulk: %s", err) } conn.webGraphQL, err = NewBulk(conn.c, "events.graphql", "(session_id, timestamp, message_id, name, request_body, response_body)", "($%d, $%d, $%d, left($%d, 2700), $%d, $%d)", - 6, 100) + 6, 200) if err != nil { - log.Fatalf("can't create webPageEvents bulk") + log.Fatalf("can't create webPageEvents bulk: %s", err) } conn.webErrors, err = NewBulk(conn.c, "errors", "(error_id, project_id, source, name, message, payload)", "($%d, $%d, $%d, $%d, $%d, $%d::jsonb)", - 6, 100) + 6, 200) if err != nil { - log.Fatalf("can't create webErrors bulk") + log.Fatalf("can't create webErrors bulk: %s", err) } conn.webErrorEvents, err = NewBulk(conn.c, "events.errors", "(session_id, message_id, timestamp, error_id)", "($%d, $%d, $%d, $%d)", - 4, 100) + 4, 200) if err != nil { - log.Fatalf("can't create webErrorEvents bulk") + log.Fatalf("can't create webErrorEvents bulk: %s", err) } conn.webErrorTags, err = NewBulk(conn.c, "public.errors_tags", "(session_id, message_id, error_id, key, value)", "($%d, $%d, $%d, $%d, $%d)", - 5, 100) + 5, 200) if err != nil { - log.Fatalf("can't create webErrorEvents bulk") + log.Fatalf("can't create webErrorEvents bulk: %s", err) } conn.webIssues, err = NewBulk(conn.c, "issues", "(project_id, issue_id, type, context_string)", "($%d, $%d, $%d, $%d)", - 4, 100) + 4, 200) if err != nil { - log.Fatalf("can't create webIssues bulk") + log.Fatalf("can't create webIssues bulk: %s", err) } conn.webIssueEvents, err = NewBulk(conn.c, "events_common.issues", "(session_id, issue_id, timestamp, seq_index, payload)", "($%d, $%d, $%d, $%d, CAST($%d AS jsonb))", - 5, 100) + 5, 200) if err != nil { - log.Fatalf("can't create webIssueEvents bulk") + log.Fatalf("can't create webIssueEvents bulk: %s", err) } conn.webCustomEvents, err = NewBulk(conn.c, "events_common.customs", "(session_id, seq_index, timestamp, name, payload, level)", "($%d, $%d, $%d, left($%d, 2700), $%d, $%d)", - 6, 100) + 6, 200) if err != nil { - log.Fatalf("can't create webCustomEvents bulk") + log.Fatalf("can't create webCustomEvents bulk: %s", err) + } + conn.webClickEvents, err = NewBulk(conn.c, + "events.clicks", + "(session_id, message_id, timestamp, label, selector, url, path)", + "($%d, $%d, $%d, NULLIF($%d, ''), $%d, $%d, $%d)", + 7, 200) + if err != nil { + log.Fatalf("can't create webClickEvents bulk: %s", err) + } + conn.webNetworkRequest, err = NewBulk(conn.c, + "events_common.requests", + "(session_id, timestamp, seq_index, url, host, path, query, request_body, response_body, status_code, method, duration, success)", + "($%d, $%d, $%d, left($%d, 2700), $%d, $%d, $%d, $%d, $%d, $%d::smallint, NULLIF($%d, '')::http_method, $%d, $%d)", + 13, 200) + if err != nil { + log.Fatalf("can't create webNetworkRequest bulk: %s", err) } } @@ -296,6 +314,12 @@ func (conn *Conn) sendBulks() { if err := conn.webCustomEvents.Send(); err != nil { log.Printf("webCustomEvents bulk send err: %s", err) } + if err := conn.webClickEvents.Send(); err != nil { + log.Printf("webClickEvents bulk send err: %s", err) + } + if err := conn.webNetworkRequest.Send(); err != nil { + log.Printf("webNetworkRequest bulk send err: %s", err) + } } func (conn *Conn) CommitBatches() { diff --git a/backend/pkg/db/postgres/messages-web.go b/backend/pkg/db/postgres/messages-web.go index ad0210694..63669ecb7 100644 --- a/backend/pkg/db/postgres/messages-web.go +++ b/backend/pkg/db/postgres/messages-web.go @@ -58,16 +58,12 @@ func (conn *Conn) InsertWebPageEvent(sessionID uint64, projectID uint32, e *Page } func (conn *Conn) InsertWebClickEvent(sessionID uint64, projectID uint32, e *ClickEvent) error { - sqlRequest := ` - INSERT INTO events.clicks - (session_id, message_id, timestamp, label, selector, url) - (SELECT - $1, $2, $3, NULLIF($4, ''), $5, host || path - FROM events.pages - WHERE session_id = $1 AND timestamp <= $3 ORDER BY timestamp DESC LIMIT 1 - ) - ` - conn.batchQueue(sessionID, sqlRequest, sessionID, truncSqIdx(e.MessageID), e.Timestamp, e.Label, e.Selector) + var host, path string + host, path, _, _ = url.GetURLParts(e.Url) + log.Println("insert web click:", host, path) + if err := conn.webClickEvents.Append(sessionID, truncSqIdx(e.MessageID), e.Timestamp, e.Label, e.Selector, host+path, path); err != nil { + log.Printf("insert web click err: %s", err) + } // Accumulate session updates and exec inside batch with another sql commands conn.updateSessionEvents(sessionID, 1, 0) // Add new value set to autocomplete bulk @@ -119,29 +115,8 @@ func (conn *Conn) InsertWebNetworkRequest(sessionID uint64, projectID uint32, sa if err != nil { return err } - - sqlRequest := ` - INSERT INTO events_common.requests ( - session_id, timestamp, seq_index, - url, host, path, query, - request_body, response_body, status_code, method, - duration, success - ) VALUES ( - $1, $2, $3, - left($4, 2700), $5, $6, $7, - $8, $9, $10::smallint, NULLIF($11, '')::http_method, - $12, $13 - ) ON CONFLICT DO NOTHING` - conn.batchQueue(sessionID, sqlRequest, - sessionID, e.Meta().Timestamp, truncSqIdx(e.Meta().Index), - e.URL, host, path, query, - request, response, e.Status, url.EnsureMethod(e.Method), - e.Duration, e.Status < 400, - ) - - // Record approximate message size - conn.updateBatchSize(sessionID, len(sqlRequest)+len(e.URL)+len(host)+len(path)+len(query)+ - len(e.Request)+len(e.Response)+len(url.EnsureMethod(e.Method))+8*5+1) + conn.webNetworkRequest.Append(sessionID, e.Meta().Timestamp, truncSqIdx(e.Meta().Index), e.URL, host, path, query, + request, response, e.Status, url.EnsureMethod(e.Method), e.Duration, e.Status < 400) return nil } diff --git a/backend/pkg/handlers/web/networkIssue.go b/backend/pkg/handlers/web/networkIssue.go index 5e7119c20..20ef412dd 100644 --- a/backend/pkg/handlers/web/networkIssue.go +++ b/backend/pkg/handlers/web/networkIssue.go @@ -19,20 +19,6 @@ func (f *NetworkIssueDetector) Build() Message { func (f *NetworkIssueDetector) Handle(message Message, messageID uint64, timestamp uint64) Message { switch msg := message.(type) { - // case *ResourceTiming: - // success := msg.Duration != 0 // The only available way here - // if !success { - // issueType := "missing_resource" - // if msg.Initiator == "fetch" || msg.Initiator == "xmlhttprequest" { - // issueType = "bad_request" - // } - // return &IssueEvent{ - // Type: issueType, - // MessageID: messageID, - // Timestamp: msg.Timestamp, - // ContextString: msg.URL, - // } - // } case *NetworkRequest: if msg.Status >= 400 { return &IssueEvent{ diff --git a/backend/pkg/messages/cache.go b/backend/pkg/messages/cache.go new file mode 100644 index 000000000..b3c83a0e3 --- /dev/null +++ b/backend/pkg/messages/cache.go @@ -0,0 +1,22 @@ +package messages + +type pageLocations struct { + urls map[uint64]string +} + +func NewPageLocations() *pageLocations { + return &pageLocations{urls: make(map[uint64]string)} +} + +func (p *pageLocations) Set(sessID uint64, url string) { + p.urls[sessID] = url +} + +func (p *pageLocations) Get(sessID uint64) string { + url := p.urls[sessID] + return url +} + +func (p *pageLocations) Delete(sessID uint64) { + delete(p.urls, sessID) +} diff --git a/backend/pkg/messages/iterator.go b/backend/pkg/messages/iterator.go index 86e3e6cc8..69e1f02bc 100644 --- a/backend/pkg/messages/iterator.go +++ b/backend/pkg/messages/iterator.go @@ -24,10 +24,15 @@ type messageIteratorImpl struct { broken bool messageInfo *message batchInfo *BatchInfo + urls *pageLocations } func NewMessageIterator(messageHandler MessageHandler, messageFilter []int, autoDecode bool) MessageIterator { - iter := &messageIteratorImpl{handler: messageHandler, autoDecode: autoDecode} + iter := &messageIteratorImpl{ + handler: messageHandler, + autoDecode: autoDecode, + urls: NewPageLocations(), + } if len(messageFilter) != 0 { filter := make(map[int]struct{}, len(messageFilter)) for _, msgType := range messageFilter { @@ -125,7 +130,7 @@ func (i *messageIteratorImpl) preprocessing(msg Message) error { if m.Timestamp == 0 { i.zeroTsLog("BatchMetadata") } - i.messageInfo.Url = m.Url + i.messageInfo.Url = m.Location i.version = m.Version i.batchInfo.version = m.Version @@ -138,6 +143,10 @@ func (i *messageIteratorImpl) preprocessing(msg Message) error { if m.Timestamp == 0 { i.zeroTsLog("BatchMeta") } + // Try to get saved session's page url + if savedURL := i.urls.Get(i.messageInfo.batch.sessionID); savedURL != "" { + i.messageInfo.Url = savedURL + } case *Timestamp: i.messageInfo.Timestamp = int64(m.Timestamp) @@ -158,9 +167,13 @@ func (i *messageIteratorImpl) preprocessing(msg Message) error { if m.Timestamp == 0 { i.zeroTsLog("SessionEnd") } + // Delete session from urls cache layer + i.urls.Delete(i.messageInfo.batch.sessionID) case *SetPageLocation: i.messageInfo.Url = m.URL + // Save session page url in cache for using in next batches + i.urls.Set(i.messageInfo.batch.sessionID, m.URL) } return nil }