From 481db19dbe5c3aeaf3bcf62a1ece2d2f55428880 Mon Sep 17 00:00:00 2001 From: Alexander Date: Tue, 5 Sep 2023 12:18:47 +0200 Subject: [PATCH] Go redshift connector (#1457) * feat(backend): redshift connector draft * fix(backend): fixed memory leak, empty string ddos * feat(backend): draft for sessions part * feat(backend): session handler * fix(backend): fixed wrong columns list in sessionToCSV convertor * feat(backend): load session info from db/cache if there is nothing in memory when sessionEnd event received * feat(backend): added filters for connector * feat(backend): memory leak fix + extra cache for sessions * feat(backend): moved table names as an env variable * fix(backend): added timeout for last session messages to avoid memory leak * fix(backend): fixed last memory leak * feat(backend): moved redshift connector to ee folder --- backend/internal/config/common/config.go | 11 + backend/internal/config/connector/config.go | 31 ++ backend/pkg/cache/cache.go | 10 + backend/pkg/sessions/cache.go | 20 + backend/pkg/sessions/redis.go | 8 + backend/pkg/sessions/sessions.go | 10 + ee/backend/cmd/connector/main.go | 94 ++++ ee/backend/internal/connector/service.go | 62 +++ ee/backend/pkg/connector/redshift.go | 72 +++ ee/backend/pkg/connector/saver.go | 556 ++++++++++++++++++++ ee/backend/pkg/sessions/redis.go | 44 ++ 11 files changed, 918 insertions(+) create mode 100644 backend/internal/config/connector/config.go create mode 100644 ee/backend/cmd/connector/main.go create mode 100644 ee/backend/internal/connector/service.go create mode 100644 ee/backend/pkg/connector/redshift.go create mode 100644 ee/backend/pkg/connector/saver.go diff --git a/backend/internal/config/common/config.go b/backend/internal/config/common/config.go index 5090c1fee..14401d5bf 100644 --- a/backend/internal/config/common/config.go +++ b/backend/internal/config/common/config.go @@ -38,3 +38,14 @@ func (cfg *Postgres) String() string { } return str } + +// Redshift config + +type Redshift
struct { + ConnectioString string `env:"REDSHIFT_STRING"` + Host string `env:"REDSHIFT_HOST"` + Port int `env:"REDSHIFT_PORT"` + User string `env:"REDSHIFT_USER"` + Password string `env:"REDSHIFT_PASSWORD"` + Database string `env:"REDSHIFT_DATABASE"` +} diff --git a/backend/internal/config/connector/config.go b/backend/internal/config/connector/config.go new file mode 100644 index 000000000..169bbb739 --- /dev/null +++ b/backend/internal/config/connector/config.go @@ -0,0 +1,31 @@ +package connector + +import ( + "openreplay/backend/internal/config/common" + "openreplay/backend/internal/config/configurator" + "openreplay/backend/internal/config/objectstorage" + "openreplay/backend/internal/config/redis" + "time" +) + +type Config struct { + common.Config + common.Postgres + redis.Redis + common.Redshift + objectstorage.ObjectsConfig + SessionsTableName string `env:"SESSIONS_TABLE_NAME,default=connector_user_sessions"` + EventsTableName string `env:"EVENTS_TABLE_NAME,default=connector_events"` + EventLevel string `env:"EVENT_LEVEL,default=normal"` + GroupConnector string `env:"GROUP_REDSHIFT_CONNECTOR,default=redshift-connector"` + TopicRawWeb string `env:"TOPIC_RAW_WEB,required"` + TopicAnalytics string `env:"TOPIC_ANALYTICS,required"` + CommitBatchTimeout time.Duration `env:"COMMIT_BATCH_TIMEOUT,default=5s"` + UseProfiler bool `env:"PROFILER_ENABLED,default=false"` +} + +func New() *Config { + cfg := &Config{} + configurator.Process(cfg) + return cfg +} diff --git a/backend/pkg/cache/cache.go b/backend/pkg/cache/cache.go index bcb92fd91..28e0822c9 100644 --- a/backend/pkg/cache/cache.go +++ b/backend/pkg/cache/cache.go @@ -9,6 +9,8 @@ type Cache interface { Set(key, value interface{}) Get(key interface{}) (interface{}, bool) GetAndRefresh(key interface{}) (interface{}, bool) + SetCache(sessID uint64, data map[string]string) error + GetCache(sessID uint64) (map[string]string, error) } type item struct { @@ -21,6 +23,14 @@ type cacheImpl struct { items 
map[interface{}]item } +func (c *cacheImpl) SetCache(sessID uint64, data map[string]string) error { + return nil +} + +func (c *cacheImpl) GetCache(sessID uint64) (map[string]string, error) { + return nil, nil +} + func New(cleaningInterval, itemDuration time.Duration) Cache { cache := &cacheImpl{items: make(map[interface{}]item)} go func() { diff --git a/backend/pkg/sessions/cache.go b/backend/pkg/sessions/cache.go index 64f87ca99..4ad20f2c7 100644 --- a/backend/pkg/sessions/cache.go +++ b/backend/pkg/sessions/cache.go @@ -11,6 +11,8 @@ import ( type Cache interface { Set(session *Session) error Get(sessionID uint64) (*Session, error) + SetCache(sessID uint64, data map[string]string) error + GetCache(sessID uint64) (map[string]string, error) } var ErrSessionNotFound = errors.New("session not found") @@ -20,6 +22,24 @@ type inMemoryCacheImpl struct { redis Cache } +func (i *inMemoryCacheImpl) SetCache(sessID uint64, data map[string]string) error { + if err := i.redis.SetCache(sessID, data); err != nil && !errors.Is(err, ErrDisabledCache) { + log.Printf("Failed to cache session: %v", err) + } + return nil +} + +func (i *inMemoryCacheImpl) GetCache(sessID uint64) (map[string]string, error) { + session, err := i.redis.GetCache(sessID) + if err == nil { + return session, nil + } + if !errors.Is(err, ErrDisabledCache) && err.Error() != "redis: nil" { + log.Printf("Failed to get session from cache: %v", err) + } + return nil, ErrSessionNotFound +} + func (i *inMemoryCacheImpl) Set(session *Session) error { i.sessions.Set(session.SessionID, session) if err := i.redis.Set(session); err != nil && !errors.Is(err, ErrDisabledCache) { diff --git a/backend/pkg/sessions/redis.go b/backend/pkg/sessions/redis.go index b7f37538d..9ab206a3e 100644 --- a/backend/pkg/sessions/redis.go +++ b/backend/pkg/sessions/redis.go @@ -7,6 +7,14 @@ import ( type cacheImpl struct{} +func (c *cacheImpl) SetCache(sessID uint64, data map[string]string) error { + return ErrDisabledCache +} + +func (c 
*cacheImpl) GetCache(sessID uint64) (map[string]string, error) { + return nil, ErrDisabledCache +} + func (c *cacheImpl) Set(session *Session) error { return ErrDisabledCache } diff --git a/backend/pkg/sessions/sessions.go b/backend/pkg/sessions/sessions.go index 3f1c77e30..9a14cac8c 100644 --- a/backend/pkg/sessions/sessions.go +++ b/backend/pkg/sessions/sessions.go @@ -12,8 +12,10 @@ import ( type Sessions interface { Add(session *Session) error AddUnStarted(session *UnStartedSession) error + AddCached(sessionID uint64, data map[string]string) error Get(sessionID uint64) (*Session, error) GetUpdated(sessionID uint64) (*Session, error) + GetCached(sessionID uint64) (map[string]string, error) GetDuration(sessionID uint64) (uint64, error) UpdateDuration(sessionID uint64, timestamp uint64) (uint64, error) UpdateEncryptionKey(sessionID uint64, key []byte) error @@ -108,6 +110,14 @@ func (s *sessionsImpl) GetUpdated(sessionID uint64) (*Session, error) { return session, nil } +func (s *sessionsImpl) AddCached(sessionID uint64, data map[string]string) error { + return s.cache.SetCache(sessionID, data) +} + +func (s *sessionsImpl) GetCached(sessionID uint64) (map[string]string, error) { + return s.cache.GetCache(sessionID) +} + // GetDuration usage: in ender to check current and new duration to avoid duplicates func (s *sessionsImpl) GetDuration(sessionID uint64) (uint64, error) { if sess, err := s.cache.Get(sessionID); err == nil { diff --git a/ee/backend/cmd/connector/main.go b/ee/backend/cmd/connector/main.go new file mode 100644 index 000000000..bbbad6a0f --- /dev/null +++ b/ee/backend/cmd/connector/main.go @@ -0,0 +1,94 @@ +package main + +import ( + "log" + "openreplay/backend/pkg/db/postgres" + "openreplay/backend/pkg/db/postgres/pool" + "openreplay/backend/pkg/db/redis" + "openreplay/backend/pkg/projects" + "openreplay/backend/pkg/sessions" + + config "openreplay/backend/internal/config/connector" + "openreplay/backend/internal/connector" + saver 
"openreplay/backend/pkg/connector" + "openreplay/backend/pkg/memory" + "openreplay/backend/pkg/messages" + "openreplay/backend/pkg/objectstorage/store" + "openreplay/backend/pkg/queue" + "openreplay/backend/pkg/terminator" +) + +func main() { + log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile) + + cfg := config.New() + + objStore, err := store.NewStore(&cfg.ObjectsConfig) + if err != nil { + log.Fatalf("can't init object storage: %s", err) + } + + db, err := saver.NewRedshift(cfg) + if err != nil { + log.Printf("can't init redshift connection: %s", err) + return + } + defer db.Close() + + // Init postgres connection + pgConn, err := pool.New(cfg.Postgres.String()) + if err != nil { + log.Printf("can't init postgres connection: %s", err) + return + } + defer pgConn.Close() + + // Init events module + pg := postgres.NewConn(pgConn) + defer pg.Close() + + // Init redis connection + redisClient, err := redis.New(&cfg.Redis) + if err != nil { + log.Printf("can't init redis connection: %s", err) + } + defer redisClient.Close() + + projManager := projects.New(pgConn, redisClient) + sessManager := sessions.New(pgConn, projManager, redisClient) + + // Saves messages to Redshift + dataSaver := saver.New(cfg, objStore, db, sessManager) + + // Message filter + msgFilter := []int{messages.MsgConsoleLog, messages.MsgCustomEvent, messages.MsgJSException, + messages.MsgNetworkRequest, messages.MsgIssueEvent, messages.MsgCustomIssue, + messages.MsgSessionStart, messages.MsgSessionEnd, messages.MsgConnectionInformation, + messages.MsgMetadata, messages.MsgPageEvent, messages.MsgPerformanceTrackAggr, messages.MsgUserID, + messages.MsgUserAnonymousID, messages.MsgJSException, messages.MsgJSExceptionDeprecated, + messages.MsgInputEvent, messages.MsgMouseClick, messages.MsgIssueEventDeprecated} + + // Init consumer + consumer := queue.NewConsumer( + cfg.GroupConnector, + []string{ + cfg.TopicRawWeb, + cfg.TopicAnalytics, + }, + messages.NewMessageIterator(dataSaver.Handle, 
msgFilter, true), + false, + cfg.MessageSizeLimit, + ) + + // Init memory manager + memoryManager, err := memory.NewManager(cfg.MemoryLimitMB, cfg.MaxMemoryUsage) + if err != nil { + log.Printf("can't init memory manager: %s", err) + return + } + + // Run service and wait for TERM signal + service := connector.New(cfg, consumer, dataSaver, memoryManager) + log.Printf("Connector service started\n") + terminator.Wait(service) +} diff --git a/ee/backend/internal/connector/service.go b/ee/backend/internal/connector/service.go new file mode 100644 index 000000000..fd4fc0528 --- /dev/null +++ b/ee/backend/internal/connector/service.go @@ -0,0 +1,62 @@ +package connector + +import ( + "log" + "openreplay/backend/internal/config/connector" + "time" + + "openreplay/backend/internal/service" + saver "openreplay/backend/pkg/connector" + "openreplay/backend/pkg/memory" + "openreplay/backend/pkg/queue/types" +) + +type dbImpl struct { + cfg *connector.Config + consumer types.Consumer + saver *saver.Saver + mm memory.Manager +} + +func New(cfg *connector.Config, consumer types.Consumer, saver *saver.Saver, mm memory.Manager) service.Interface { + s := &dbImpl{ + cfg: cfg, + consumer: consumer, + saver: saver, + mm: mm, + } + go s.run() + return s +} + +func (d *dbImpl) run() { + commitTick := time.Tick(d.cfg.CommitBatchTimeout) + for { + select { + case <-commitTick: + d.commit() + case msg := <-d.consumer.Rebalanced(): + log.Println(msg) + default: + if !d.mm.HasFreeMemory() { + continue + } + if err := d.consumer.ConsumeNext(); err != nil { + log.Fatalf("Error on consumption: %v", err) + } + } + } +} + +func (d *dbImpl) commit() { + d.saver.Commit() + d.consumer.Commit() +} + +func (d *dbImpl) Stop() { + d.commit() + if err := d.saver.Close(); err != nil { + log.Printf("saver.Close error: %s", err) + } + d.consumer.Close() +} diff --git a/ee/backend/pkg/connector/redshift.go b/ee/backend/pkg/connector/redshift.go new file mode 100644 index 000000000..36dc96b0e --- /dev/null 
+++ b/ee/backend/pkg/connector/redshift.go @@ -0,0 +1,72 @@ +package connector + +import ( + "context" + "database/sql" + "fmt" + "log" + + "openreplay/backend/internal/config/connector" + + _ "github.com/lib/pq" +) + +type Redshift struct { + cfg *connector.Config + ctx context.Context + db *sql.DB +} + +func NewRedshift(cfg *connector.Config) (*Redshift, error) { + var source string + if cfg.ConnectioString != "" { + source = cfg.ConnectioString + } else { + source = fmt.Sprintf("postgres://%s:%s@%s:%d/%s", cfg.User, cfg.Password, cfg.Host, cfg.Port, cfg.Database) + } + log.Println("Connecting to Redshift Source: ", source) + sqldb, err := sql.Open("postgres", source) + if err != nil { + return nil, err + } + if err := sqldb.Ping(); err != nil { + return nil, err + } + return &Redshift{ + cfg: cfg, + ctx: context.Background(), + db: sqldb, + }, nil +} + +func (r *Redshift) Copy(tableName, fileName, delimiter string, creds, gzip bool) error { + var ( + credentials string + gzipSQL string + ) + if creds { + credentials = fmt.Sprintf(`ACCESS_KEY_ID '%s' SECRET_ACCESS_KEY '%s'`, r.cfg.AWSAccessKeyID, r.cfg.AWSSecretAccessKey) + } + if gzip { + gzipSQL = "GZIP" + } + + bucketName := "rdshftbucket" + filePath := fmt.Sprintf("s3://%s/%s", bucketName, fileName) + + copySQL := fmt.Sprintf(`COPY "%s" FROM '%s' WITH %s TIMEFORMAT 'auto' DATEFORMAT 'auto' TRUNCATECOLUMNS + STATUPDATE ON %s DELIMITER AS '%s' IGNOREHEADER 1 REMOVEQUOTES ESCAPE TRIMBLANKS EMPTYASNULL ACCEPTANYDATE`, + tableName, filePath, gzipSQL, credentials, delimiter) + log.Printf("Running command: %s", copySQL) + + _, err := r.db.ExecContext(r.ctx, copySQL) + return err +} + +func (r *Redshift) ExecutionDuration(fileName string) (int, error) { + return 0, nil +} + +func (r *Redshift) Close() error { + return r.db.Close() +} diff --git a/ee/backend/pkg/connector/saver.go b/ee/backend/pkg/connector/saver.go new file mode 100644 index 000000000..b65f2f0ba --- /dev/null +++ b/ee/backend/pkg/connector/saver.go 
@@ -0,0 +1,556 @@ +package connector + +import ( + "bytes" + "fmt" + "github.com/google/uuid" + "log" + "openreplay/backend/internal/http/geoip" + "openreplay/backend/pkg/sessions" + "strconv" + "time" + + config "openreplay/backend/internal/config/connector" + "openreplay/backend/pkg/messages" + "openreplay/backend/pkg/objectstorage" +) + +// Saver collect sessions and events and saves them to Redshift +type Saver struct { + cfg *config.Config + objStorage objectstorage.ObjectStorage + db *Redshift + sessModule sessions.Sessions + sessions map[uint64]map[string]string + updatedSessions map[uint64]bool + lastUpdate map[uint64]time.Time + finishedSessions []uint64 + events []map[string]string +} + +func New(cfg *config.Config, objStorage objectstorage.ObjectStorage, db *Redshift, sessions sessions.Sessions) *Saver { + if cfg == nil { + log.Fatal("connector config is empty") + } + // Validate column names in sessions table + if err := validateColumnNames(sessionColumns); err != nil { + log.Printf("can't validate column names: %s", err) + } + // Validate column names in events table + if err := validateColumnNames(eventColumns); err != nil { + log.Printf("can't validate column names: %s", err) + } + return &Saver{ + cfg: cfg, + objStorage: objStorage, + db: db, + sessModule: sessions, + updatedSessions: make(map[uint64]bool, 0), + lastUpdate: make(map[uint64]time.Time, 0), + } +} + +var sessionColumns = []string{ + "sessionid", + "user_agent", + "user_browser", + "user_browser_version", + "user_country", + "user_device", + "user_device_heap_size", + "user_device_memory_size", + "user_device_type", + "user_os", + "user_os_version", + "user_uuid", + "connection_effective_bandwidth", + "connection_type", + "metadata_key", + "metadata_value", + "referrer", + "user_anonymous_id", + "user_id", + "session_start_timestamp", + "session_end_timestamp", + "session_duration", + "first_contentful_paint", + "speed_index", + "visually_complete", + "timing_time_to_interactive", + 
"avg_cpu", + "avg_fps", + "max_cpu", + "max_fps", + "max_total_js_heap_size", + "max_used_js_heap_size", + "js_exceptions_count", + "inputs_count", + "clicks_count", + "issues_count", + "urls_count", +} + +var sessionInts = []string{ + "user_device_heap_size", + "user_device_memory_size", + "connection_effective_bandwidth", + "first_contentful_paint", + "speed_index", + "visually_complete", + "timing_time_to_interactive", + "avg_cpu", + "avg_fps", + "max_cpu", + "max_fps", + "max_total_js_heap_size", + "max_used_js_heap_size", + "js_exceptions_count", + "inputs_count", + "clicks_count", + "issues_count", + "urls_count", +} + +var eventColumns = []string{ + "sessionid", + "consolelog_level", + "consolelog_value", + "customevent_name", + "customevent_payload", + "jsexception_message", + "jsexception_name", + "jsexception_payload", + "jsexception_metadata", + "networkrequest_type", + "networkrequest_method", + "networkrequest_url", + "networkrequest_request", + "networkrequest_response", + "networkrequest_status", + "networkrequest_timestamp", + "networkrequest_duration", + "issueevent_message_id", + "issueevent_timestamp", + "issueevent_type", + "issueevent_context_string", + "issueevent_context", + "issueevent_payload", + "issueevent_url", + "customissue_name", + "customissue_payload", + "received_at", + "batch_order_number", +} + +func QUOTES(s string) string { + return strconv.Quote(s) +} + +func handleEvent(msg messages.Message) map[string]string { + event := make(map[string]string) + + switch m := msg.(type) { + case *messages.ConsoleLog: + event["consolelog_level"] = QUOTES(m.Level) + event["consolelog_value"] = QUOTES(m.Value) + case *messages.CustomEvent: + event["customevent_name"] = QUOTES(m.Name) + event["customevent_payload"] = QUOTES(m.Payload) + case *messages.JSException: + event["jsexception_name"] = QUOTES(m.Name) + event["jsexception_message"] = QUOTES(m.Message) + event["jsexception_payload"] = QUOTES(m.Payload) + event["jsexception_metadata"] = 
QUOTES(m.Metadata) + case *messages.NetworkRequest: + event["networkrequest_type"] = QUOTES(m.Type) + event["networkrequest_method"] = QUOTES(m.Method) + event["networkrequest_url"] = QUOTES(m.URL) + event["networkrequest_request"] = QUOTES(m.Request) + event["networkrequest_response"] = QUOTES(m.Response) + event["networkrequest_status"] = fmt.Sprintf("%d", m.Status) + event["networkrequest_timestamp"] = fmt.Sprintf("%d", m.Timestamp) + event["networkrequest_duration"] = fmt.Sprintf("%d", m.Duration) + case *messages.IssueEvent: + event["issueevent_message_id"] = fmt.Sprintf("%d", m.MessageID) + event["issueevent_timestamp"] = fmt.Sprintf("%d", m.Timestamp) + event["issueevent_type"] = QUOTES(m.Type) + event["issueevent_context_string"] = QUOTES(m.ContextString) + event["issueevent_context"] = QUOTES(m.Context) + event["issueevent_payload"] = QUOTES(m.Payload) + event["issueevent_url"] = QUOTES(m.URL) + case *messages.CustomIssue: + event["customissue_name"] = QUOTES(m.Name) + event["customissue_payload"] = QUOTES(m.Payload) + } + + if len(event) == 0 { + return nil + } + event["sessionid"] = fmt.Sprintf("%d", msg.SessionID()) + event["received_at"] = fmt.Sprintf("%d", uint64(time.Now().UnixMilli())) + event["batch_order_number"] = fmt.Sprintf("%d", 0) + return event +} + +func (s *Saver) handleSession(msg messages.Message) { + // Filter out messages that are not related to session table + switch msg.(type) { + case *messages.SessionStart, *messages.SessionEnd, *messages.ConnectionInformation, *messages.Metadata, + *messages.PageEvent, *messages.PerformanceTrackAggr, *messages.UserID, *messages.UserAnonymousID, + *messages.JSException, *messages.JSExceptionDeprecated, *messages.InputEvent, *messages.MouseClick, + *messages.IssueEvent, *messages.IssueEventDeprecated: + default: + return + } + if s.sessions == nil { + s.sessions = make(map[uint64]map[string]string) + } + sess, ok := s.sessions[msg.SessionID()] + if !ok { + // Try to load session from cache + cached, 
err := s.sessModule.GetCached(msg.SessionID()) + if err != nil && err != sessions.ErrSessionNotFound { + log.Printf("Failed to get cached session: %v", err) + } + if cached != nil { + sess = cached + } else { + sess = make(map[string]string) + sess[`sessionid`] = fmt.Sprintf("%d", msg.SessionID()) + } + } + if s.sessions[msg.SessionID()] == nil { + s.sessions[msg.SessionID()] = make(map[string]string) + s.sessions[msg.SessionID()][`sessionid`] = fmt.Sprintf("%d", msg.SessionID()) + sess = s.sessions[msg.SessionID()] + } + + // Parse message and add to session + updated := true + switch m := msg.(type) { + case *messages.SessionStart: + sess["session_start_timestamp"] = fmt.Sprintf("%d", m.Timestamp) + sess["user_uuid"] = QUOTES(m.UserUUID) + sess["user_agent"] = QUOTES(m.UserAgent) + sess["user_os"] = QUOTES(m.UserOS) + sess["user_os_version"] = QUOTES(m.UserOSVersion) + sess["user_browser"] = QUOTES(m.UserBrowser) + sess["user_browser_version"] = QUOTES(m.UserBrowserVersion) + sess["user_device"] = QUOTES(m.UserDevice) + sess["user_device_type"] = QUOTES(m.UserDeviceType) + sess["user_device_memory_size"] = fmt.Sprintf("%d", m.UserDeviceMemorySize) + sess["user_device_heap_size"] = fmt.Sprintf("%d", m.UserDeviceHeapSize) + geoInfo := geoip.UnpackGeoRecord(m.UserCountry) + sess["user_country"] = QUOTES(geoInfo.Country) + case *messages.SessionEnd: + sess["session_end_timestamp"] = fmt.Sprintf("%d", m.Timestamp) + info, err := s.sessModule.Get(msg.SessionID()) + if err != nil { + log.Printf("Error getting session info: %v", err) + break + } + // Check all required fields are present + sess["session_duration"] = fmt.Sprintf("%d", *info.Duration) + if sess["user_agent"] == "" && info.UserAgent != "" { + sess["user_agent"] = QUOTES(info.UserAgent) + } + if sess["user_browser"] == "" && info.UserBrowser != "" { + sess["user_browser"] = QUOTES(info.UserBrowser) + } + if sess["user_browser_version"] == "" && info.UserBrowserVersion != "" { + sess["user_browser_version"] = 
QUOTES(info.UserBrowserVersion) + } + if sess["user_os"] == "" && info.UserOS != "" { + sess["user_os"] = QUOTES(info.UserOS) + } + if sess["user_os_version"] == "" && info.UserOSVersion != "" { + sess["user_os_version"] = QUOTES(info.UserOSVersion) + } + if sess["user_device"] == "" && info.UserDevice != "" { + sess["user_device"] = QUOTES(info.UserDevice) + } + if sess["user_device_type"] == "" && info.UserDeviceType != "" { + sess["user_device_type"] = QUOTES(info.UserDeviceType) + } + if sess["user_device_memory_size"] == "" && info.UserDeviceMemorySize != 0 { + sess["user_device_memory_size"] = fmt.Sprintf("%d", info.UserDeviceMemorySize) + } + if sess["user_device_heap_size"] == "" && info.UserDeviceHeapSize != 0 { + sess["user_device_heap_size"] = fmt.Sprintf("%d", info.UserDeviceHeapSize) + } + if sess["user_country"] == "" && info.UserCountry != "" { + sess["user_country"] = QUOTES(info.UserCountry) + } + if sess["user_uuid"] == "" && info.UserUUID != "" { + sess["user_uuid"] = QUOTES(info.UserUUID) + } + if sess["session_start_timestamp"] == "" && info.Timestamp != 0 { + sess["session_start_timestamp"] = fmt.Sprintf("%d", info.Timestamp) + } + if sess["user_anonymous_id"] == "" && info.UserAnonymousID != nil { + sess["user_anonymous_id"] = QUOTES(*info.UserAnonymousID) + } + if sess["user_id"] == "" && info.UserID != nil { + sess["user_id"] = QUOTES(*info.UserID) + } + if sess["urls_count"] == "" && info.PagesCount != 0 { + sess["urls_count"] = fmt.Sprintf("%d", info.PagesCount) + } + // Check int fields + for _, field := range sessionInts { + if sess[field] == "" { + sess[field] = fmt.Sprintf("%d", 0) + } + } + case *messages.ConnectionInformation: + sess["connection_effective_bandwidth"] = fmt.Sprintf("%d", m.Downlink) + sess["connection_type"] = QUOTES(m.Type) + case *messages.Metadata: + sess["metadata_key"] = QUOTES(m.Key) + sess["metadata_value"] = QUOTES(m.Value) + case *messages.PageEvent: + sess["referrer"] = QUOTES(m.Referrer) + 
sess["first_contentful_paint"] = fmt.Sprintf("%d", m.FirstContentfulPaint) + sess["speed_index"] = fmt.Sprintf("%d", m.SpeedIndex) + sess["timing_time_to_interactive"] = fmt.Sprintf("%d", m.TimeToInteractive) + sess["visually_complete"] = fmt.Sprintf("%d", m.VisuallyComplete) + currUrlsCount, err := strconv.Atoi(sess["urls_count"]) + if err != nil { + currUrlsCount = 0 + } + sess["urls_count"] = fmt.Sprintf("%d", currUrlsCount+1) + case *messages.PerformanceTrackAggr: + sess["avg_cpu"] = fmt.Sprintf("%d", m.AvgCPU) + sess["avg_fps"] = fmt.Sprintf("%d", m.AvgFPS) + sess["max_cpu"] = fmt.Sprintf("%d", m.MaxCPU) + sess["max_fps"] = fmt.Sprintf("%d", m.MaxFPS) + sess["max_total_js_heap_size"] = fmt.Sprintf("%d", m.MaxTotalJSHeapSize) + sess["max_used_js_heap_size"] = fmt.Sprintf("%d", m.MaxUsedJSHeapSize) + case *messages.UserID: + if m.ID != "" { + sess["user_id"] = QUOTES(m.ID) + } + case *messages.UserAnonymousID: + sess["user_anonymous_id"] = QUOTES(m.ID) + case *messages.JSException, *messages.JSExceptionDeprecated: + currExceptionsCount, err := strconv.Atoi(sess["js_exceptions_count"]) + if err != nil { + currExceptionsCount = 0 + } + sess["js_exceptions_count"] = fmt.Sprintf("%d", currExceptionsCount+1) + case *messages.InputEvent: + currInputsCount, err := strconv.Atoi(sess["inputs_count"]) + if err != nil { + currInputsCount = 0 + } + sess["inputs_count"] = fmt.Sprintf("%d", currInputsCount+1) + case *messages.MouseClick: + currMouseClicksCount, err := strconv.Atoi(sess["clicks_count"]) + if err != nil { + currMouseClicksCount = 0 + } + sess["clicks_count"] = fmt.Sprintf("%d", currMouseClicksCount+1) + case *messages.IssueEvent, *messages.IssueEventDeprecated: + currIssuesCount, err := strconv.Atoi(sess["issues_count"]) + if err != nil { + currIssuesCount = 0 + } + sess["issues_count"] = fmt.Sprintf("%d", currIssuesCount+1) + default: + updated = false + } + if updated { + if s.updatedSessions == nil { + s.updatedSessions = make(map[uint64]bool) + } + 
s.updatedSessions[msg.SessionID()] = true + } + s.sessions[msg.SessionID()] = sess + s.lastUpdate[msg.SessionID()] = time.Now() +} + +func (s *Saver) Handle(msg messages.Message) { + newEvent := handleEvent(msg) + if newEvent != nil { + if s.events == nil { + s.events = make([]map[string]string, 0, 2) + } + s.events = append(s.events, newEvent) + } + s.handleSession(msg) + if msg.TypeID() == messages.MsgSessionEnd { + if s.finishedSessions == nil { + s.finishedSessions = make([]uint64, 0) + } + s.finishedSessions = append(s.finishedSessions, msg.SessionID()) + } + return +} + +func eventsToBuffer(batch []map[string]string) *bytes.Buffer { + buf := bytes.NewBuffer(nil) + + // Write header + for _, column := range eventColumns { + buf.WriteString(column + "|") + } + buf.Truncate(buf.Len() - 1) + + // Write data + for _, event := range batch { + buf.WriteString("\n") + for _, column := range eventColumns { + buf.WriteString(event[column] + "|") + } + buf.Truncate(buf.Len() - 1) + } + return buf +} + +func (s *Saver) commitEvents() { + if len(s.events) == 0 { + log.Printf("empty events batch") + return + } + l := len(s.events) + + // Send data to S3 + fileName := fmt.Sprintf("connector_data/%s-%s.csv", s.cfg.EventsTableName, uuid.New().String()) + // Create csv file + buf := eventsToBuffer(s.events) + // Clear events batch + s.events = nil + + reader := bytes.NewReader(buf.Bytes()) + if err := s.objStorage.Upload(reader, fileName, "text/csv", objectstorage.NoCompression); err != nil { + log.Printf("can't upload file to s3: %s", err) + return + } + // Copy data from s3 bucket to redshift + if err := s.db.Copy(s.cfg.EventsTableName, fileName, "|", true, false); err != nil { + log.Printf("can't copy data from s3 to redshift: %s", err) + return + } + log.Printf("events batch of %d events is successfully saved", l) +} + +func sessionsToBuffer(batch []map[string]string) *bytes.Buffer { + buf := bytes.NewBuffer(nil) + + // Write header + for _, column := range sessionColumns 
{ + buf.WriteString(column + "|") + } + buf.Truncate(buf.Len() - 1) + + // Write data + for _, sess := range batch { + buf.WriteString("\n") + for _, column := range sessionColumns { + buf.WriteString(sess[column] + "|") + } + buf.Truncate(buf.Len() - 1) + } + return buf +} + +func (s *Saver) commitSessions() { + if len(s.finishedSessions) == 0 { + log.Printf("empty sessions batch") + return + } + l := len(s.finishedSessions) + sessions := make([]map[string]string, 0, len(s.finishedSessions)) + toKeep := make([]uint64, 0, len(s.finishedSessions)) + toSend := make([]uint64, 0, len(s.finishedSessions)) + for _, sessionID := range s.finishedSessions { + // ts, now, ts+1min + if s.lastUpdate[sessionID].Add(time.Minute * 1).After(time.Now()) { + toKeep = append(toKeep, sessionID) + } else { + sessions = append(sessions, s.sessions[sessionID]) + toSend = append(toSend, sessionID) + } + } + log.Printf("finished: %d, to keep: %d, to send: %d", l, len(toKeep), len(toSend)) + + // Send data to S3 + fileName := fmt.Sprintf("connector_data/%s-%s.csv", s.cfg.SessionsTableName, uuid.New().String()) + // Create csv file + buf := sessionsToBuffer(sessions) + + reader := bytes.NewReader(buf.Bytes()) + if err := s.objStorage.Upload(reader, fileName, "text/csv", objectstorage.NoCompression); err != nil { + log.Printf("can't upload file to s3: %s", err) + return + } + // Copy data from s3 bucket to redshift + if err := s.db.Copy(s.cfg.SessionsTableName, fileName, "|", true, false); err != nil { + log.Printf("can't copy data from s3 to redshift: %s", err) + return + } + // Clear current list of finished sessions + for _, sessionID := range toSend { + delete(s.sessions, sessionID) // delete session info + delete(s.lastUpdate, sessionID) // delete last session update timestamp + } + s.finishedSessions = toKeep + log.Printf("sessions batch of %d sessions is successfully saved", l) +} + +// Commit saves batch to Redshift +func (s *Saver) Commit() { + // Cache updated sessions + start := 
time.Now() + for sessionID, _ := range s.updatedSessions { + if err := s.sessModule.AddCached(sessionID, s.sessions[sessionID]); err != nil { + log.Printf("Error adding session to cache: %v", err) + } + } + log.Printf("Cached %d sessions in %s", len(s.updatedSessions), time.Since(start)) + s.updatedSessions = nil + // Commit events and sessions (send to Redshift) + s.commitEvents() + s.checkZombieSessions() + s.commitSessions() +} + +func (s *Saver) checkZombieSessions() { + // Check if there are old sessions that should be sent to Redshift + finished := make(map[uint64]bool, len(s.finishedSessions)) + for _, sessionID := range s.finishedSessions { + finished[sessionID] = true + } + now := time.Now() + zombieSessionsCount := 0 + for sessionID, _ := range s.sessions { + if finished[sessionID] { + continue + } + if s.lastUpdate[sessionID].Add(time.Minute * 5).Before(now) { + s.finishedSessions = append(s.finishedSessions, sessionID) + zombieSessionsCount++ + } + } + if zombieSessionsCount > 0 { + log.Printf("Found %d zombie sessions", zombieSessionsCount) + } +} + +func (s *Saver) Close() error { + // Close connection to Redshift + return nil +} + +var reservedWords = []string{"ALL", "ANALYSE", "ANALYZE", "AND", "ANY", "ARRAY", "AS", "ASC", "ASYMMETRIC", "BOTH", "CASE", "CAST", "CHECK", "COLLATE", "COLUMN", "CONSTRAINT", "CREATE", "CROSS", "CURRENT_CATALOG", "CURRENT_DATE", "CURRENT_ROLE", "CURRENT_SCHEMA", "CURRENT_TIME", "CURRENT_TIMESTAMP", "CURRENT_USER", "DEFAULT", "DEFERRABLE", "DESC", "DISTINCT", "DO", "ELSE", "END", "EXCEPT", "FALSE", "FOR", "FOREIGN", "FREEZE", "FROM", "FULL", "GRANT", "GROUP", "HAVING", "ILIKE", "IN", "INITIALLY", "INNER", "INTERSECT", "INTO", "IS", "ISNULL", "JOIN", "LEADING", "LEFT", "LIKE", "LIMIT", "LOCALTIME", "LOCALTIMESTAMP", "NATURAL", "NEW", "NOT", "NOTNULL", "NULL", "OFF", "OFFSET", "OLD", "ON", "ONLY", "OR", "ORDER", "OUTER", "OVERLAPS", "PLACING", "PRIMARY", "REFERENCES", "RETURNING", "RIGHT", "SELECT", "SESSION_USER", 
"SIMILAR", "SOME", "SYMMETRIC", "TABLE", "THEN", "TO", "TRAILING", "TRUE", "UNION", "UNIQUE", "USER", "USING", "VERBOSE", "WHEN", "WHERE", "WINDOW", "WITH"} + +func validateColumnNames(columns []string) error { + for _, column := range columns { + for _, reservedWord := range reservedWords { + if column == reservedWord { + return fmt.Errorf("column name %s is a reserved word", column) + } + } + } + return nil +} diff --git a/ee/backend/pkg/sessions/redis.go b/ee/backend/pkg/sessions/redis.go index 0da788533..5457c99a5 100644 --- a/ee/backend/pkg/sessions/redis.go +++ b/ee/backend/pkg/sessions/redis.go @@ -14,6 +14,50 @@ type cacheImpl struct { db *redis.Client } +func (c *cacheImpl) SetCache(sessID uint64, data map[string]string) error { + if c.db == nil { + return ErrDisabledCache + } + if data == nil { + return errors.New("session is nil") + } + if sessID == 0 { + return errors.New("session id is 0") + } + start := time.Now() + sessionBytes, err := json.Marshal(data) + if err != nil { + return err + } + if _, err = c.db.Redis.Set(fmt.Sprintf("session:cache:id:%d", sessID), sessionBytes, time.Minute*120).Result(); err != nil { + return err + } + database.RecordRedisRequestDuration(float64(time.Now().Sub(start).Milliseconds()), "setCache", "session") + database.IncreaseRedisRequests("setCache", "sessions") + return nil +} + +func (c *cacheImpl) GetCache(sessID uint64) (map[string]string, error) { + if c.db == nil { + return nil, ErrDisabledCache + } + if sessID == 0 { + return nil, errors.New("session id is 0") + } + start := time.Now() + result, err := c.db.Redis.Get(fmt.Sprintf("session:cache:id:%d", sessID)).Result() + if err != nil { + return nil, err + } + session := map[string]string{} + if err = json.Unmarshal([]byte(result), &session); err != nil { + return nil, err + } + database.RecordRedisRequestDuration(float64(time.Now().Sub(start).Milliseconds()), "getCache", "session") + database.IncreaseRedisRequests("getCache", "sessions") + return session, nil +} 
+ func (c *cacheImpl) Set(session *Session) error { if c.db == nil { return ErrDisabledCache