diff --git a/backend/cmd/integrations/main.go b/backend/cmd/integrations/main.go index f24dd4433..c3f90719d 100644 --- a/backend/cmd/integrations/main.go +++ b/backend/cmd/integrations/main.go @@ -1,17 +1,16 @@ package main import ( + "context" "log" - "openreplay/backend/pkg/db/postgres/pool" - "openreplay/backend/pkg/integrations" "os" "os/signal" "syscall" - "time" + + "github.com/jackc/pgx/v4" config "openreplay/backend/internal/config/integrations" - "openreplay/backend/internal/integrations/clientManager" - "openreplay/backend/pkg/intervals" + "openreplay/backend/pkg/integrations" "openreplay/backend/pkg/metrics" databaseMetrics "openreplay/backend/pkg/metrics/database" "openreplay/backend/pkg/queue" @@ -26,89 +25,38 @@ func main() { cfg := config.New() - // Init postgres connection - pgConn, err := pool.New(cfg.Postgres.String()) + pgConn, err := pgx.Connect(context.Background(), cfg.Postgres.String()) if err != nil { - log.Printf("can't init postgres connection: %s", err) - return + log.Fatalf("can't init postgres connection: %s", err) } - defer pgConn.Close() - - tokenizer := token.NewTokenizer(cfg.TokenSecret) - - manager := clientManager.NewManager() + defer pgConn.Close(context.Background()) producer := queue.NewProducer(cfg.MessageSizeLimit, true) defer producer.Close(15000) - // TODO: rework with integration manager - listener, err := integrations.New(pgConn, cfg.Postgres.String()) + storage := integrations.NewStorage(pgConn) + if err := storage.Listen(); err != nil { + log.Fatalf("Listener error: %v", err) + } + + listener, err := integrations.New(cfg, storage, producer, integrations.NewManager(), token.NewTokenizer(cfg.TokenSecret)) if err != nil { - log.Printf("Postgres listener error: %v\n", err) - log.Fatalf("Postgres listener error") + log.Fatalf("Listener error: %v", err) } defer listener.Close() - listener.IterateIntegrationsOrdered(func(i *integrations.Integration, err error) { - if err != nil { - log.Printf("Postgres error: %v\n", err) - 
return - } - log.Printf("Integration initialization: %v\n", *i) - err = manager.Update(i) - if err != nil { - log.Printf("Integration parse error: %v | Integration: %v\n", err, *i) - return - } - }) - sigchan := make(chan os.Signal, 1) signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) - tick := time.Tick(intervals.INTEGRATIONS_REQUEST_INTERVAL * time.Millisecond) - log.Printf("Integration service started\n") - manager.RequestAll() for { select { case sig := <-sigchan: log.Printf("Caught signal %v: terminating\n", sig) - listener.Close() - pgConn.Close() os.Exit(0) - case <-tick: - log.Printf("Requesting all...\n") - manager.RequestAll() - case event := <-manager.Events: - log.Printf("New integration event: %+v\n", *event.IntegrationEvent) - sessionID := event.SessionID - if sessionID == 0 { - sessData, err := tokenizer.Parse(event.Token) - if err != nil && err != token.EXPIRED { - log.Printf("Error on token parsing: %v; Token: %v", err, event.Token) - continue - } - sessionID = sessData.ID - } - producer.Produce(cfg.TopicAnalytics, sessionID, event.IntegrationEvent.Encode()) - case err := <-manager.Errors: - log.Printf("Integration error: %v\n", err) - case i := <-manager.RequestDataUpdates: - // log.Printf("Last request integration update: %v || %v\n", i, string(i.RequestData)) - if err := listener.UpdateIntegrationRequestData(&i); err != nil { - log.Printf("Postgres Update request_data error: %v\n", err) - } case err := <-listener.Errors: - log.Printf("Postgres listen error: %v\n", err) - listener.Close() - pgConn.Close() + log.Printf("Listener error: %v", err) os.Exit(0) - case iPointer := <-listener.Integrations: - log.Printf("Integration update: %v\n", *iPointer) - err := manager.Update(iPointer) - if err != nil { - log.Printf("Integration parse error: %v | Integration: %v\n", err, *iPointer) - } } } } diff --git a/backend/internal/integrations/clientManager/manager.go b/backend/internal/integrations/clientManager/manager.go deleted file mode 100644 
index 9b9328802..000000000 --- a/backend/internal/integrations/clientManager/manager.go +++ /dev/null @@ -1,48 +0,0 @@ -package clientManager - -import ( - "openreplay/backend/internal/integrations/integration" - "openreplay/backend/pkg/integrations" - "strconv" -) - -type manager struct { - clientMap integration.ClientMap - Events chan *integration.SessionErrorEvent - Errors chan error - RequestDataUpdates chan integrations.Integration // not pointer because it could change in other thread -} - -func NewManager() *manager { - return &manager{ - clientMap: make(integration.ClientMap), - RequestDataUpdates: make(chan integrations.Integration, 100), - Events: make(chan *integration.SessionErrorEvent, 100), - Errors: make(chan error, 100), - } - -} - -func (m *manager) Update(i *integrations.Integration) error { - key := strconv.Itoa(int(i.ProjectID)) + i.Provider - if i.Options == nil { - delete(m.clientMap, key) - return nil - } - c, exists := m.clientMap[key] - if !exists { - c, err := integration.NewClient(i, m.RequestDataUpdates, m.Events, m.Errors) - if err != nil { - return err - } - m.clientMap[key] = c - return nil - } - return c.Update(i) -} - -func (m *manager) RequestAll() { - for _, c := range m.clientMap { - go c.Request() - } -} diff --git a/backend/internal/integrations/integration/client.go b/backend/internal/integrations/integration/client.go deleted file mode 100644 index cab736294..000000000 --- a/backend/internal/integrations/integration/client.go +++ /dev/null @@ -1,153 +0,0 @@ -package integration - -import ( - "encoding/json" - "fmt" - "log" - "openreplay/backend/pkg/integrations" - "sync" - "time" - - "openreplay/backend/pkg/messages" -) - -const MAX_ATTEMPTS_IN_A_ROW = 4 -const MAX_ATTEMPTS = 40 -const ATTEMPTS_INTERVAL = 3 * 60 * 60 * 1000 - -type requester interface { - Request(*client) error -} - -type requestData struct { - LastMessageTimestamp uint64 // `json:"lastMessageTimestamp, string"` - LastMessageId string - 
UnsuccessfullAttemptsCount int - LastAttemptTimestamp int64 -} - -type client struct { - requestData - requester - integration *integrations.Integration - // TODO: timeout ? - mux sync.Mutex - updateChan chan<- integrations.Integration - evChan chan<- *SessionErrorEvent - errChan chan<- error -} - -type SessionErrorEvent struct { - SessionID uint64 - Token string - *messages.IntegrationEvent -} - -type ClientMap map[string]*client - -func NewClient(i *integrations.Integration, updateChan chan<- integrations.Integration, evChan chan<- *SessionErrorEvent, errChan chan<- error) (*client, error) { - c := new(client) - if err := c.Update(i); err != nil { - return nil, err - } - - if err := json.Unmarshal(i.RequestData, &c.requestData); err != nil { - return nil, err - } - c.evChan = evChan - c.errChan = errChan - c.updateChan = updateChan - // TODO: RequestData manager - if c.requestData.LastMessageTimestamp == 0 { - // ? - c.requestData.LastMessageTimestamp = uint64(time.Now().Add(-time.Hour * 24).UnixMilli()) - } - - return c, nil -} - -// from outside -func (c *client) Update(i *integrations.Integration) error { - c.mux.Lock() - defer c.mux.Unlock() - var r requester - switch i.Provider { - case "bugsnag": - r = new(bugsnag) - case "cloudwatch": - r = new(cloudwatch) - case "datadog": - r = new(datadog) - case "elasticsearch": - r = new(elasticsearch) - case "newrelic": - r = new(newrelic) - case "rollbar": - r = new(rollbar) - case "sentry": - r = new(sentry) - case "stackdriver": - r = new(stackdriver) - case "sumologic": - r = new(sumologic) - } - if err := json.Unmarshal(i.Options, r); err != nil { - return err - } - c.integration = i - c.requester = r - return nil -} - -// package scope -func (c *client) setLastMessageTimestamp(timestamp uint64) { - if timestamp > c.requestData.LastMessageTimestamp { - c.requestData.LastMessageTimestamp = timestamp - } -} -func (c *client) getLastMessageTimestamp() uint64 { - return c.requestData.LastMessageTimestamp -} -func (c 
*client) setLastMessageId(timestamp uint64, id string) { - //if timestamp >= c.requestData.LastMessageTimestamp { - c.requestData.LastMessageId = id - c.requestData.LastMessageTimestamp = timestamp - //} -} -func (c *client) getLastMessageId() string { - return c.requestData.LastMessageId -} - -func (c *client) handleError(err error) { - c.errChan <- fmt.Errorf("%v | Integration: %v", err, *c.integration) -} - -// Thread-safe -func (c *client) Request() { - c.mux.Lock() - defer c.mux.Unlock() - if c.requestData.UnsuccessfullAttemptsCount >= MAX_ATTEMPTS || - (c.requestData.UnsuccessfullAttemptsCount >= MAX_ATTEMPTS_IN_A_ROW && - time.Now().UnixMilli()-c.requestData.LastAttemptTimestamp < ATTEMPTS_INTERVAL) { - return - } - - c.requestData.LastAttemptTimestamp = time.Now().UnixMilli() - err := c.requester.Request(c) - if err != nil { - log.Println("ERRROR L139") - log.Println(err) - c.handleError(err) - c.requestData.UnsuccessfullAttemptsCount++ - } else { - c.requestData.UnsuccessfullAttemptsCount = 0 - } - rd, err := json.Marshal(c.requestData) - if err != nil { - c.handleError(err) - } - // RequestData is a byte array (pointer-like type), but it's replacement - // won't affect the previous value sent by channel - c.integration.RequestData = rd - c.updateChan <- *c.integration -} diff --git a/backend/internal/integrations/integration/bugsnag.go b/backend/pkg/integrations/clients/bugsnag.go similarity index 94% rename from backend/internal/integrations/integration/bugsnag.go rename to backend/pkg/integrations/clients/bugsnag.go index ae8657c39..eb636eb10 100644 --- a/backend/internal/integrations/integration/bugsnag.go +++ b/backend/pkg/integrations/clients/bugsnag.go @@ -1,4 +1,4 @@ -package integration +package clients import ( "encoding/json" @@ -34,8 +34,9 @@ type bugsnagEvent struct { } } +// need result chan and lastMessageTs func (b *bugsnag) Request(c *client) error { - sinceTs := c.getLastMessageTimestamp() + 1000 // From next second + sinceTs := 
c.requestData.GetLastMessageTimestamp() + 1000 // From next second sinceFormatted := time.UnixMilli(int64(sinceTs)).Format(time.RFC3339) requestURL := fmt.Sprintf("https://api.bugsnag.com/projects/%v/events", b.BugsnagProjectId) req, err := http.NewRequest("GET", requestURL, nil) @@ -93,7 +94,7 @@ func (b *bugsnag) Request(c *client) error { continue } timestamp := uint64(parsedTime.UnixMilli()) - c.setLastMessageTimestamp(timestamp) + c.requestData.SetLastMessageTimestamp(timestamp) c.evChan <- &SessionErrorEvent{ SessionID: sessionID, Token: token, diff --git a/backend/internal/integrations/integration/bugsnag.json b/backend/pkg/integrations/clients/bugsnag.json similarity index 100% rename from backend/internal/integrations/integration/bugsnag.json rename to backend/pkg/integrations/clients/bugsnag.json diff --git a/backend/pkg/integrations/clients/client.go b/backend/pkg/integrations/clients/client.go new file mode 100644 index 000000000..c9a677e7f --- /dev/null +++ b/backend/pkg/integrations/clients/client.go @@ -0,0 +1,110 @@ +package clients + +import ( + "encoding/json" + "fmt" + "openreplay/backend/pkg/integrations/model" + "openreplay/backend/pkg/messages" + "sync" +) + +type requester interface { + Request(*client) error +} + +type client struct { + requester + requestData *model.RequestInfo + integration *model.Integration + mux sync.Mutex + updateChan chan<- model.Integration + evChan chan<- *SessionErrorEvent + errChan chan<- error +} + +type SessionErrorEvent struct { + SessionID uint64 + Token string + *messages.IntegrationEvent +} + +type ClientMap map[string]*client + +func NewClient(i *model.Integration, updateChan chan<- model.Integration, evChan chan<- *SessionErrorEvent, errChan chan<- error) (*client, error) { + ri, err := i.GetRequestInfo() + if err != nil { + return nil, err + } + c := &client{ + evChan: evChan, + errChan: errChan, + updateChan: updateChan, + requestData: ri, + } + return c, nil +} + +func (c *client) Update(i 
*model.Integration) error { + var r requester + switch i.Provider { + case "bugsnag": + r = new(bugsnag) + case "cloudwatch": + r = new(cloudwatch) + case "datadog": + r = new(datadog) + case "elasticsearch": + r = new(elasticsearch) + case "newrelic": + r = new(newrelic) + case "rollbar": + r = new(rollbar) + case "sentry": + r = new(sentry) + case "stackdriver": + r = new(stackdriver) + case "sumologic": + r = new(sumologic) + } + if err := json.Unmarshal(i.Options, r); err != nil { + return err + } + + c.mux.Lock() + defer c.mux.Unlock() + + c.integration = i + c.requester = r + return nil +} + +func (c *client) handleError(err error) { + c.errChan <- fmt.Errorf("%v | Integration: %v", err, *c.integration) +} + +func (c *client) Request() { + c.mux.Lock() + defer c.mux.Unlock() + + if !c.requestData.CanAttempt() { + return + } + c.requestData.UpdateLastAttempt() + + err := c.requester.Request(c) + if err != nil { + c.requestData.Inc() + c.handleError(fmt.Errorf("ERRROR L139, err: %s", err)) + } else { + c.requestData.Reset() + } + + rd, err := c.requestData.Encode() + if err != nil { + c.handleError(err) + } + // RequestData is a byte array (pointer-like type), but it's replacement + // won't affect the previous value sent by channel + c.integration.RequestData = rd + c.updateChan <- *c.integration +} diff --git a/backend/internal/integrations/integration/cloudwatch.go b/backend/pkg/integrations/clients/cloudwatch.go similarity index 91% rename from backend/internal/integrations/integration/cloudwatch.go rename to backend/pkg/integrations/clients/cloudwatch.go index a069b18cb..e06a6c605 100644 --- a/backend/internal/integrations/integration/cloudwatch.go +++ b/backend/pkg/integrations/clients/cloudwatch.go @@ -1,11 +1,10 @@ -package integration +package clients import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/cloudwatchlogs" - 
"openreplay/backend/pkg/messages" "regexp" "strings" @@ -21,7 +20,7 @@ type cloudwatch struct { } func (cw *cloudwatch) Request(c *client) error { - startTs := int64(c.getLastMessageTimestamp() + 1) // From next millisecond + startTs := int64(c.requestData.GetLastMessageTimestamp() + 1) // From next millisecond //endTs := utils.CurrentTimestamp() sess, err := session.NewSession(aws.NewConfig(). WithRegion(cw.Region). @@ -40,7 +39,7 @@ func (cw *cloudwatch) Request(c *client) error { // SetLimit(10000). // Default 10000 SetLogGroupName(cw.LogGroupName). SetFilterPattern("openReplaySessionToken") - //SetFilterPattern("asayer_session_id") + //SetFilterPattern("asayer_session_id") for { output, err := svc.FilterLogEvents(filterOptions) @@ -64,7 +63,7 @@ func (cw *cloudwatch) Request(c *client) error { name = *e.LogStreamName } timestamp := uint64(*e.Timestamp) - c.setLastMessageTimestamp(timestamp) + c.requestData.SetLastMessageTimestamp(timestamp) c.evChan <- &SessionErrorEvent{ //SessionID: sessionID, Token: token, diff --git a/backend/internal/integrations/integration/datadog.go b/backend/pkg/integrations/clients/datadog.go similarity index 95% rename from backend/internal/integrations/integration/datadog.go rename to backend/pkg/integrations/clients/datadog.go index edbbd6d83..ca0ef166c 100644 --- a/backend/internal/integrations/integration/datadog.go +++ b/backend/pkg/integrations/clients/datadog.go @@ -1,4 +1,4 @@ -package integration +package clients import ( "bytes" @@ -71,7 +71,7 @@ func (d *datadog) makeRequest(nextLogId *string, fromTs uint64, toTs uint64) (*h } func (d *datadog) Request(c *client) error { - fromTs := c.getLastMessageTimestamp() + 1 // From next millisecond + fromTs := c.requestData.GetLastMessageTimestamp() + 1 // From next millisecond toTs := uint64(time.Now().UnixMilli()) var nextLogId *string for { @@ -111,7 +111,7 @@ func (d *datadog) Request(c *client) error { continue } timestamp := uint64(parsedTime.UnixMilli()) - 
c.setLastMessageTimestamp(timestamp) + c.requestData.SetLastMessageTimestamp(timestamp) c.evChan <- &SessionErrorEvent{ //SessionID: sessionID, Token: token, diff --git a/backend/internal/integrations/integration/datadog.json b/backend/pkg/integrations/clients/datadog.json similarity index 100% rename from backend/internal/integrations/integration/datadog.json rename to backend/pkg/integrations/clients/datadog.json diff --git a/backend/internal/integrations/integration/elasticsearch.go b/backend/pkg/integrations/clients/elasticsearch.go similarity index 96% rename from backend/internal/integrations/integration/elasticsearch.go rename to backend/pkg/integrations/clients/elasticsearch.go index 5331f850e..e0c898434 100644 --- a/backend/internal/integrations/integration/elasticsearch.go +++ b/backend/pkg/integrations/clients/elasticsearch.go @@ -1,4 +1,4 @@ -package integration +package clients import ( "bytes" @@ -34,8 +34,6 @@ func (es *elasticsearch) Request(c *client) error { Addresses: []string{ address, }, - //Username: es.ApiKeyId, - //Password: es.ApiKey, APIKey: apiKey, } esC, err := elasticlib.NewClient(cfg) @@ -46,7 +44,7 @@ func (es *elasticsearch) Request(c *client) error { return err } - gteTs := c.getLastMessageTimestamp() + 1000 // Sec or millisec to add ? 
+ gteTs := c.requestData.GetLastMessageTimestamp() + 1000 log.Printf("gteTs: %v ", gteTs) var buf bytes.Buffer query := map[string]interface{}{ @@ -164,7 +162,7 @@ func (es *elasticsearch) Request(c *client) error { continue } timestamp := uint64(esLog.Time.UnixMilli()) - c.setLastMessageTimestamp(timestamp) + c.requestData.SetLastMessageTimestamp(timestamp) var sessionID uint64 sessionID, err = strconv.ParseUint(token, 10, 64) diff --git a/backend/internal/integrations/integration/elasticsearch.json b/backend/pkg/integrations/clients/elasticsearch.json similarity index 100% rename from backend/internal/integrations/integration/elasticsearch.json rename to backend/pkg/integrations/clients/elasticsearch.json diff --git a/backend/internal/integrations/integration/newrelic.go b/backend/pkg/integrations/clients/newrelic.go similarity index 91% rename from backend/internal/integrations/integration/newrelic.go rename to backend/pkg/integrations/clients/newrelic.go index 97426de22..bc0a3e5ad 100644 --- a/backend/internal/integrations/integration/newrelic.go +++ b/backend/pkg/integrations/clients/newrelic.go @@ -1,4 +1,4 @@ -package integration +package clients import ( "encoding/json" @@ -27,18 +27,16 @@ type newrelicResponce struct { Results []struct { Events []json.RawMessage } - // Metadata } type newrelicEvent struct { - //AsayerSessionID uint64 `json:"asayer_session_id,string"` // string/int decoder? OpenReplaySessionToken string `json:"openReplaySessionToken"` ErrorClass string `json:"error.class"` Timestamp uint64 `json:"timestamp"` } func (nr *newrelic) Request(c *client) error { - sinceTs := c.getLastMessageTimestamp() + 1000 // From next second + sinceTs := c.requestData.GetLastMessageTimestamp() + 1000 // From next second // In docs - format "yyyy-mm-dd HH:MM:ss", but time.RFC3339 works fine too sinceFormatted := time.UnixMilli(int64(sinceTs)).Format(time.RFC3339) // US/EU endpoint ?? 
@@ -86,7 +84,7 @@ func (nr *newrelic) Request(c *client) error { continue } - c.setLastMessageTimestamp(e.Timestamp) + c.requestData.SetLastMessageTimestamp(e.Timestamp) c.evChan <- &SessionErrorEvent{ Token: e.OpenReplaySessionToken, IntegrationEvent: &messages.IntegrationEvent{ diff --git a/backend/internal/integrations/integration/newrelic.json b/backend/pkg/integrations/clients/newrelic.json similarity index 100% rename from backend/internal/integrations/integration/newrelic.json rename to backend/pkg/integrations/clients/newrelic.json diff --git a/backend/internal/integrations/integration/newrelic_empty.json b/backend/pkg/integrations/clients/newrelic_empty.json similarity index 100% rename from backend/internal/integrations/integration/newrelic_empty.json rename to backend/pkg/integrations/clients/newrelic_empty.json diff --git a/backend/internal/integrations/integration/rollbar.go b/backend/pkg/integrations/clients/rollbar.go similarity index 92% rename from backend/internal/integrations/integration/rollbar.go rename to backend/pkg/integrations/clients/rollbar.go index a683a687c..c7acaabe4 100644 --- a/backend/internal/integrations/integration/rollbar.go +++ b/backend/pkg/integrations/clients/rollbar.go @@ -1,4 +1,4 @@ -package integration +package clients import ( "encoding/json" @@ -60,12 +60,12 @@ type rollbarJobStatusResponce struct { type rollbarEvent map[string]string /* - It is possible to use /api/1/instances (20 per page) - Jobs for the identical requests are hashed + It is possible to use /api/1/instances (20 per page) + Jobs for the identical requests are hashed */ func (rb *rollbar) Request(c *client) error { - fromTs := c.getLastMessageTimestamp() + 1000 // From next second - c.setLastMessageTimestamp(fromTs) // anti-job-hashing + fromTs := c.requestData.GetLastMessageTimestamp() + 1000 // From next second + c.requestData.SetLastMessageTimestamp(fromTs) // anti-job-hashing fromTsSec := fromTs / 1e3 query := fmt.Sprintf(RB_QUERY, fromTsSec) 
jsonBody := fmt.Sprintf(`{ @@ -153,7 +153,7 @@ func (rb *rollbar) Request(c *client) error { continue } timestamp := timestampSec * 1000 - c.setLastMessageTimestamp(timestamp) + c.requestData.SetLastMessageTimestamp(timestamp) c.evChan <- &SessionErrorEvent{ Token: e["body.message.openReplaySessionToken"], IntegrationEvent: &messages.IntegrationEvent{ diff --git a/backend/internal/integrations/integration/rollbar_error.json b/backend/pkg/integrations/clients/rollbar_error.json similarity index 100% rename from backend/internal/integrations/integration/rollbar_error.json rename to backend/pkg/integrations/clients/rollbar_error.json diff --git a/backend/internal/integrations/integration/rollbar_job_result.json b/backend/pkg/integrations/clients/rollbar_job_result.json similarity index 100% rename from backend/internal/integrations/integration/rollbar_job_result.json rename to backend/pkg/integrations/clients/rollbar_job_result.json diff --git a/backend/internal/integrations/integration/rollbar_job_start.json b/backend/pkg/integrations/clients/rollbar_job_start.json similarity index 100% rename from backend/internal/integrations/integration/rollbar_job_start.json rename to backend/pkg/integrations/clients/rollbar_job_start.json diff --git a/backend/internal/integrations/integration/sentry.go b/backend/pkg/integrations/clients/sentry.go similarity index 96% rename from backend/internal/integrations/integration/sentry.go rename to backend/pkg/integrations/clients/sentry.go index ea3118449..bf62aa88b 100644 --- a/backend/internal/integrations/integration/sentry.go +++ b/backend/pkg/integrations/clients/sentry.go @@ -1,4 +1,4 @@ -package integration +package clients import ( "encoding/json" @@ -46,7 +46,7 @@ func (sn *sentry) Request(c *client) error { req.Header.Add("Authorization", authHeader) // by link ? 
- lastEventId := c.getLastMessageId() + lastEventId := c.requestData.GetLastMessageId() firstEvent := true PageLoop: @@ -88,7 +88,7 @@ PageLoop: timestamp := uint64(parsedTime.UnixMilli()) // TODO: not to receive all the messages (use default integration timestamp) if firstEvent { // TODO: reverse range? - c.setLastMessageId(timestamp, e.EventID) + c.requestData.SetLastMessageId(timestamp, e.EventID) firstEvent = false } diff --git a/backend/internal/integrations/integration/sentry.json b/backend/pkg/integrations/clients/sentry.json similarity index 100% rename from backend/internal/integrations/integration/sentry.json rename to backend/pkg/integrations/clients/sentry.json diff --git a/backend/internal/integrations/integration/stackdriver.go b/backend/pkg/integrations/clients/stackdriver.go similarity index 92% rename from backend/internal/integrations/integration/stackdriver.go rename to backend/pkg/integrations/clients/stackdriver.go index 45b769aa7..4f63ce954 100644 --- a/backend/internal/integrations/integration/stackdriver.go +++ b/backend/pkg/integrations/clients/stackdriver.go @@ -1,10 +1,9 @@ -package integration +package clients import ( "cloud.google.com/go/logging/logadmin" "google.golang.org/api/iterator" "google.golang.org/api/option" - //"strconv" "context" "encoding/json" @@ -33,7 +32,7 @@ type saCreds struct { } func (sd *stackdriver) Request(c *client) error { - fromTs := c.getLastMessageTimestamp() + 1 // Timestamp is RFC3339Nano, so we take the next millisecond + fromTs := c.requestData.GetLastMessageTimestamp() + 1 // Timestamp is RFC3339Nano, so we take the next millisecond fromFormatted := time.UnixMilli(int64(fromTs)).Format(time.RFC3339Nano) ctx := context.Background() @@ -85,7 +84,7 @@ func (sd *stackdriver) Request(c *client) error { continue } timestamp := uint64(e.Timestamp.UnixMilli()) - c.setLastMessageTimestamp(timestamp) + c.requestData.SetLastMessageTimestamp(timestamp) c.evChan <- &SessionErrorEvent{ //SessionID: sessionID, Token: 
token, diff --git a/backend/internal/integrations/integration/sumologic.go b/backend/pkg/integrations/clients/sumologic.go similarity index 93% rename from backend/internal/integrations/integration/sumologic.go rename to backend/pkg/integrations/clients/sumologic.go index c5ee63249..d2fa6eb12 100644 --- a/backend/internal/integrations/integration/sumologic.go +++ b/backend/pkg/integrations/clients/sumologic.go @@ -1,4 +1,4 @@ -package integration +package clients import ( "encoding/json" @@ -13,10 +13,10 @@ import ( ) /* - The maximum value for limit is 10,000 messages or 100 MB in total message size, - which means the query may return less than 10,000 messages if you exceed the size limit. +The maximum value for limit is 10,000 messages or 100 MB in total message size, +which means the query may return less than 10,000 messages if you exceed the size limit. - API Documentation: https://help.sumologic.com/APIs/Search-Job-API/About-the-Search-Job-API +API Documentation: https://help.sumologic.com/APIs/Search-Job-API/About-the-Search-Job-API */ const SL_LIMIT = 10000 @@ -67,7 +67,7 @@ func (sl *sumologic) deleteJob(jobId string, errChan chan<- error) { } func (sl *sumologic) Request(c *client) error { - fromTs := c.getLastMessageTimestamp() + 1 // From next millisecond + fromTs := c.requestData.GetLastMessageTimestamp() + 1 // From next millisecond toTs := time.Now().UnixMilli() requestURL := fmt.Sprintf("https://api.%vsumologic.com/api/v1/search/jobs", "eu.") // deployment server?? 
jsonBody := fmt.Sprintf(`{ @@ -189,7 +189,7 @@ func (sl *sumologic) Request(c *client) error { if len(name) > 20 { name = name[:20] // not sure about that } - c.setLastMessageTimestamp(e.Timestamp) + c.requestData.SetLastMessageTimestamp(e.Timestamp) c.evChan <- &SessionErrorEvent{ //SessionID: sessionID, Token: token, diff --git a/backend/internal/integrations/integration/sumologic_error.json b/backend/pkg/integrations/clients/sumologic_error.json similarity index 100% rename from backend/internal/integrations/integration/sumologic_error.json rename to backend/pkg/integrations/clients/sumologic_error.json diff --git a/backend/internal/integrations/integration/sumologic_job_result.json b/backend/pkg/integrations/clients/sumologic_job_result.json similarity index 100% rename from backend/internal/integrations/integration/sumologic_job_result.json rename to backend/pkg/integrations/clients/sumologic_job_result.json diff --git a/backend/internal/integrations/integration/sumologic_job_start.json b/backend/pkg/integrations/clients/sumologic_job_start.json similarity index 100% rename from backend/internal/integrations/integration/sumologic_job_start.json rename to backend/pkg/integrations/clients/sumologic_job_start.json diff --git a/backend/internal/integrations/integration/sumologic_job_status.json b/backend/pkg/integrations/clients/sumologic_job_status.json similarity index 100% rename from backend/internal/integrations/integration/sumologic_job_status.json rename to backend/pkg/integrations/clients/sumologic_job_status.json diff --git a/backend/internal/integrations/integration/utils.go b/backend/pkg/integrations/clients/utils.go similarity index 60% rename from backend/internal/integrations/integration/utils.go rename to backend/pkg/integrations/clients/utils.go index 36a473c02..e171e3e14 100644 --- a/backend/internal/integrations/integration/utils.go +++ b/backend/pkg/integrations/clients/utils.go @@ -1,22 +1,11 @@ -package integration +package clients import ( 
"fmt" "regexp" - "strconv" "strings" ) -var reSessionID = regexp.MustCompile(`(?i)asayer_session_id=([0-9]+)`) - -func GetAsayerSessionId(s string) (uint64, error) { - matches := reSessionID.FindStringSubmatch(s) - if len(matches) < 2 { - return 0, fmt.Errorf("'asayer_session_id' not found in '%v' ", s) - } - return strconv.ParseUint(matches[1], 10, 64) -} - func GetLinkFromAngularBrackets(s string) string { beg := strings.Index(s, "<") + 1 end := strings.Index(s, ">") diff --git a/backend/pkg/integrations/integrations.go b/backend/pkg/integrations/integrations.go index bea0fe65a..12d310933 100644 --- a/backend/pkg/integrations/integrations.go +++ b/backend/pkg/integrations/integrations.go @@ -1,100 +1,94 @@ package integrations import ( - "context" - "encoding/json" "fmt" + "log" + "strings" + "time" - "github.com/jackc/pgx/v4" - - "openreplay/backend/pkg/db/postgres/pool" + config "openreplay/backend/internal/config/integrations" + "openreplay/backend/pkg/intervals" + "openreplay/backend/pkg/queue/types" + "openreplay/backend/pkg/token" ) type Listener struct { - conn *pgx.Conn - db pool.Pool - Integrations chan *Integration - Errors chan error + cfg *config.Config + storage Storage + producer types.Producer + manager *Manager + tokenizer *token.Tokenizer + Errors chan error } -type Integration struct { - ProjectID uint32 `json:"project_id"` - Provider string `json:"provider"` - RequestData json.RawMessage `json:"request_data"` - Options json.RawMessage `json:"options"` -} - -func New(db pool.Pool, url string) (*Listener, error) { - conn, err := pgx.Connect(context.Background(), url) +func New(cfg *config.Config, storage Storage, producer types.Producer, manager *Manager, tokenizer *token.Tokenizer) (*Listener, error) { + listener := &Listener{ + cfg: cfg, + storage: storage, + Errors: make(chan error), + producer: producer, + manager: manager, + tokenizer: tokenizer, + } + ints, err := storage.GetAll() if err != nil { return nil, err } - listener := &Listener{ - 
conn: conn, - db: db, - Errors: make(chan error), + for _, i := range ints { + // Add new integration to manager + if err = manager.Update(i); err != nil { + log.Printf("Integration parse error: %v | Integration: %v\n", err, *i) + } } - listener.Integrations = make(chan *Integration, 50) - if _, err := conn.Exec(context.Background(), "LISTEN integration"); err != nil { - return nil, err - } - go listener.listen() + manager.RequestAll() + go listener.worker() return listener, nil } -func (listener *Listener) listen() { +func (l *Listener) worker() { + clientsCheckTick := time.Tick(intervals.INTEGRATIONS_REQUEST_INTERVAL * time.Millisecond) + for { - notification, err := listener.conn.WaitForNotification(context.Background()) - if err != nil { - listener.Errors <- err - continue - } - switch notification.Channel { - case "integration": - integrationP := new(Integration) - if err := json.Unmarshal([]byte(notification.Payload), integrationP); err != nil { - listener.Errors <- fmt.Errorf("%v | Payload: %v", err, notification.Payload) - } else { - listener.Integrations <- integrationP + select { + case <-clientsCheckTick: + l.manager.RequestAll() + case event := <-l.manager.Events: + log.Printf("New integration event: %+v\n", *event.IntegrationEvent) + sessionID := event.SessionID + if sessionID == 0 { + sessData, err := l.tokenizer.Parse(event.Token) + if err != nil && err != token.EXPIRED { + log.Printf("Error on token parsing: %v; Token: %v", err, event.Token) + continue + } + sessionID = sessData.ID + } + // Why do we produce integration events to analytics topic + l.producer.Produce(l.cfg.TopicAnalytics, sessionID, event.IntegrationEvent.Encode()) + case err := <-l.manager.Errors: + log.Printf("Integration error: %v\n", err) + case i := <-l.manager.RequestDataUpdates: + if err := l.storage.Update(&i); err != nil { + log.Printf("Postgres Update request_data error: %v\n", err) + } + default: + newNotification, err := l.storage.CheckNew() + if err != nil { + if 
strings.Contains(err.Error(), "context deadline exceeded") { + continue + } + l.Errors <- fmt.Errorf("Integration storage error: %v", err) + continue + } + log.Printf("Integration update: %v\n", *newNotification) + err = l.manager.Update(newNotification) + if err != nil { + log.Printf("Integration parse error: %v | Integration: %v\n", err, *newNotification) } } } } -func (listener *Listener) Close() error { - return listener.conn.Close(context.Background()) -} - -func (listener *Listener) IterateIntegrationsOrdered(iter func(integration *Integration, err error)) error { - rows, err := listener.db.Query(` - SELECT project_id, provider, options, request_data - FROM integrations - `) - if err != nil { - return err - } - defer rows.Close() - - for rows.Next() { - i := new(Integration) - if err := rows.Scan(&i.ProjectID, &i.Provider, &i.Options, &i.RequestData); err != nil { - iter(nil, err) - continue - } - iter(i, nil) - } - - if err = rows.Err(); err != nil { - return err - } - return nil -} - -func (listener *Listener) UpdateIntegrationRequestData(i *Integration) error { - return listener.db.Exec(` - UPDATE integrations - SET request_data = $1 - WHERE project_id=$2 AND provider=$3`, - i.RequestData, i.ProjectID, i.Provider, - ) +func (l *Listener) Close() error { + return l.storage.UnListen() } diff --git a/backend/pkg/integrations/manager.go b/backend/pkg/integrations/manager.go new file mode 100644 index 000000000..6280f0dae --- /dev/null +++ b/backend/pkg/integrations/manager.go @@ -0,0 +1,48 @@ +package integrations + +import ( + "log" + "openreplay/backend/pkg/integrations/clients" + "openreplay/backend/pkg/integrations/model" +) + +type Manager struct { + clientMap clients.ClientMap + Events chan *clients.SessionErrorEvent + Errors chan error + RequestDataUpdates chan model.Integration // not pointer because it could change in other thread +} + +func NewManager() *Manager { + return &Manager{ + clientMap: make(clients.ClientMap), + RequestDataUpdates: 
// Integration mirrors one row of the "integrations" table. RequestData and
// Options stay raw JSON: their schema depends on Provider and is interpreted
// by the matching client implementation.
type Integration struct {
	ProjectID   uint32          `json:"project_id"`
	Provider    string          `json:"provider"`
	RequestData json.RawMessage `json:"request_data"`
	Options     json.RawMessage `json:"options"`
}

// Encode marshals the integration to JSON. Marshalling this struct can only
// fail if a RawMessage field holds invalid JSON; in that case nil is
// returned, matching the previous behavior of discarding the error.
func (i *Integration) Encode() []byte {
	b, _ := json.Marshal(i)
	return b
}

// Decode fills the integration from its JSON representation.
func (i *Integration) Decode(data []byte) error {
	return json.Unmarshal(data, i)
}

// GetKey returns a map key unique to this (project, provider) pair.
func (i *Integration) GetKey() string {
	// FIX: use a separator between the two parts; "%d%s" would make
	// (12, "x") and (1, "2x") produce the same key. The key is only used
	// for the in-memory client map, so the format is safe to change.
	return fmt.Sprintf("%d|%s", i.ProjectID, i.Provider)
}

// GetRequestInfo decodes RequestData into a RequestInfo. When no message has
// been seen yet (zero timestamp) it backdates the cursor by 24h so the first
// fetch only picks up recent history.
func (i *Integration) GetRequestInfo() (*RequestInfo, error) {
	ri := new(RequestInfo)
	if err := json.Unmarshal(i.RequestData, ri); err != nil {
		return nil, err
	}
	if ri.LastMessageTimestamp == 0 {
		ri.LastMessageTimestamp = uint64(time.Now().Add(-time.Hour * 24).UnixMilli())
	}
	return ri, nil
}

// Retry policy for provider API requests. Names are kept ALL_CAPS for
// compatibility with existing callers, although MixedCaps is idiomatic Go.
const MAX_ATTEMPTS_IN_A_ROW = 4
const MAX_ATTEMPTS = 40
const ATTEMPTS_INTERVAL = 3 * 60 * 60 * 1000 // ms to wait after a failed burst

// RequestInfo is the per-integration fetch cursor plus retry bookkeeping.
// It is persisted as JSON inside integrations.request_data, so the field
// names — including the misspelled UnsuccessfullAttemptsCount — must stay
// stable for wire/DB compatibility.
type RequestInfo struct {
	LastMessageId              string
	LastMessageTimestamp       uint64 // unix ms of the newest message seen
	LastAttemptTimestamp       int64  // unix ms of the last fetch attempt
	UnsuccessfullAttemptsCount int
}

// SetLastMessageTimestamp advances the cursor, never moving it backwards.
func (c *RequestInfo) SetLastMessageTimestamp(timestamp uint64) {
	if timestamp > c.LastMessageTimestamp {
		c.LastMessageTimestamp = timestamp
	}
}

// GetLastMessageTimestamp returns the current cursor position in unix ms.
func (c *RequestInfo) GetLastMessageTimestamp() uint64 {
	return c.LastMessageTimestamp
}

// SetLastMessageId records the id of the last processed message and moves the
// cursor to its timestamp unconditionally (unlike SetLastMessageTimestamp).
func (c *RequestInfo) SetLastMessageId(timestamp uint64, id string) {
	c.LastMessageId = id
	c.LastMessageTimestamp = timestamp
}

// GetLastMessageId returns the id recorded by SetLastMessageId.
func (c *RequestInfo) GetLastMessageId() string {
	return c.LastMessageId
}

// CanAttempt reports whether another fetch is allowed: at most MAX_ATTEMPTS
// failures overall, and after MAX_ATTEMPTS_IN_A_ROW consecutive failures the
// next try must wait ATTEMPTS_INTERVAL ms since the last attempt.
func (c *RequestInfo) CanAttempt() bool {
	if c.UnsuccessfullAttemptsCount >= MAX_ATTEMPTS ||
		(c.UnsuccessfullAttemptsCount >= MAX_ATTEMPTS_IN_A_ROW &&
			time.Now().UnixMilli()-c.LastAttemptTimestamp < ATTEMPTS_INTERVAL) {
		return false
	}
	return true
}

// UpdateLastAttempt stamps the current time as the last attempt time.
func (c *RequestInfo) UpdateLastAttempt() {
	c.LastAttemptTimestamp = time.Now().UnixMilli()
}

// Inc counts one more unsuccessful attempt.
func (c *RequestInfo) Inc() {
	c.UnsuccessfullAttemptsCount++
}

// Reset clears the failure counter after a successful request.
func (c *RequestInfo) Reset() {
	c.UnsuccessfullAttemptsCount = 0
}

// Encode marshals the request info for persistence in request_data.
func (c *RequestInfo) Encode() ([]byte, error) {
	return json.Marshal(c)
}
Storage { + return &storageImpl{ + conn: conn, + } +} + +func (s *storageImpl) Listen() error { + _, err := s.conn.Exec(context.Background(), "LISTEN integration") + return err +} + +func (s *storageImpl) UnListen() error { + _, err := s.conn.Exec(context.Background(), "UNLISTEN integration") + return err +} + +func (s *storageImpl) CheckNew() (*model.Integration, error) { + ctx, _ := context.WithTimeout(context.Background(), 100*time.Millisecond) + notification, err := s.conn.WaitForNotification(ctx) + if err != nil { + return nil, err + } + if notification.Channel == "integration" { + integrationP := new(model.Integration) + if err := json.Unmarshal([]byte(notification.Payload), integrationP); err != nil { + return nil, err + } + return integrationP, nil + } + return nil, fmt.Errorf("unknown notification channel: %s", notification.Channel) +} + +func (s *storageImpl) GetAll() ([]*model.Integration, error) { + rows, err := s.conn.Query(context.Background(), ` + SELECT project_id, provider, options, request_data + FROM integrations + `) + if err != nil { + return nil, err + } + defer rows.Close() + + integrations := make([]*model.Integration, 0) + for rows.Next() { + i := new(model.Integration) + if err := rows.Scan(&i.ProjectID, &i.Provider, &i.Options, &i.RequestData); err != nil { + log.Printf("Postgres scan error: %v\n", err) + continue + } + integrations = append(integrations, i) + } + + if err = rows.Err(); err != nil { + return nil, err + } + return integrations, nil +} + +func (s *storageImpl) Update(i *model.Integration) error { + _, err := s.conn.Exec(context.Background(), ` + UPDATE integrations + SET request_data = $1 + WHERE project_id=$2 AND provider=$3`, + i.RequestData, i.ProjectID, i.Provider, + ) + return err +}