From 2abf063ba248aba6acd9ae1ab5498b41e439860f Mon Sep 17 00:00:00 2001 From: Alexander Date: Wed, 2 Nov 2022 17:00:41 +0100 Subject: [PATCH] Sessions cache layer (#789) * feat(backend/db): cache layer for sessions with auto deleting logic --- backend/cmd/db/main.go | 3 +- backend/pkg/db/cache/cache.go | 73 +++++++++++++++++++++++++ backend/pkg/db/cache/messages-common.go | 1 - backend/pkg/db/cache/messages-ios.go | 14 +++-- backend/pkg/db/cache/messages-web.go | 14 +++-- backend/pkg/db/cache/pg-cache.go | 25 +++------ backend/pkg/db/cache/project.go | 8 +-- backend/pkg/db/cache/session.go | 46 +++++++++++----- backend/pkg/db/types/session.go | 2 +- 9 files changed, 136 insertions(+), 50 deletions(-) create mode 100644 backend/pkg/db/cache/cache.go diff --git a/backend/cmd/db/main.go b/backend/cmd/db/main.go index 2b99883d2..bbdf8f7b8 100644 --- a/backend/cmd/db/main.go +++ b/backend/cmd/db/main.go @@ -30,7 +30,8 @@ func main() { cfg := db.New() // Init database - pg := cache.NewPGCache(postgres.NewConn(cfg.Postgres, cfg.BatchQueueLimit, cfg.BatchSizeLimit, metrics), cfg.ProjectExpirationTimeoutMs) + pg := cache.NewPGCache( + postgres.NewConn(cfg.Postgres, cfg.BatchQueueLimit, cfg.BatchSizeLimit, metrics), cfg.ProjectExpirationTimeoutMs) defer pg.Close() // HandlersFabric returns the list of message handlers we want to be applied to each incoming message. diff --git a/backend/pkg/db/cache/cache.go b/backend/pkg/db/cache/cache.go new file mode 100644 index 000000000..9075ae73f --- /dev/null +++ b/backend/pkg/db/cache/cache.go @@ -0,0 +1,73 @@ +package cache + +import ( + "log" + "openreplay/backend/pkg/db/postgres" + "openreplay/backend/pkg/db/types" + "sync" + "time" +) + +type SessionMeta struct { + *types.Session + lastUse time.Time +} + +type ProjectMeta struct { + *types.Project + expirationTime time.Time +} + +type Cache interface { + SetSession(sess *types.Session) + HasSession(sessID uint64) bool + GetSession(sessionID uint64) (*types.Session, error) + GetProject(projectID uint32) (*types.Project, error) + GetProjectByKey(projectKey string) (*types.Project, error) +} + +type cacheImpl struct { + conn *postgres.Conn + mutex sync.RWMutex + sessions map[uint64]*SessionMeta + projects map[uint32]*ProjectMeta + projectsByKeys sync.Map + projectExpirationTimeout time.Duration +} + +func NewCache(conn *postgres.Conn, projectExpirationTimeoutMs int64) Cache { + newCache := &cacheImpl{ + conn: conn, + sessions: make(map[uint64]*SessionMeta), + projects: make(map[uint32]*ProjectMeta), + projectExpirationTimeout: time.Duration(1000 * projectExpirationTimeoutMs), + } + go newCache.cleaner() + return newCache +} + +func (c *cacheImpl) cleaner() { + cleanTick := time.Tick(time.Minute * 5) + for { + select { + case <-cleanTick: + c.clearCache() + } + } +} + +func (c *cacheImpl) clearCache() { + c.mutex.Lock() + defer c.mutex.Unlock() + + now := time.Now() + cacheSize := len(c.sessions) + deleted := 0 + for id, sess := range c.sessions { + if now.Sub(sess.lastUse).Minutes() > 3 { + deleted++ + delete(c.sessions, id) + } + } + log.Printf("cache cleaner: deleted %d/%d sessions", deleted, cacheSize) +} diff --git a/backend/pkg/db/cache/messages-common.go b/backend/pkg/db/cache/messages-common.go index a4df19cc3..46fc64790 100644 --- a/backend/pkg/db/cache/messages-common.go +++ b/backend/pkg/db/cache/messages-common.go @@ -19,7 +19,6 @@ func (c *PGCache) HandleSessionEnd(sessionID uint64) error { if err := c.Conn.HandleSessionEnd(sessionID); err != nil { log.Printf("can't handle session end: %s", err) } - c.DeleteSession(sessionID) return nil } diff --git a/backend/pkg/db/cache/messages-ios.go b/backend/pkg/db/cache/messages-ios.go index e0463c431..e65051f33 100644 --- a/backend/pkg/db/cache/messages-ios.go +++ b/backend/pkg/db/cache/messages-ios.go @@ -1,16 +1,16 @@ package cache import ( - "errors" + "fmt" . "openreplay/backend/pkg/db/types" . "openreplay/backend/pkg/messages" ) func (c *PGCache) InsertIOSSessionStart(sessionID uint64, s *IOSSessionStart) error { - if c.sessions[sessionID] != nil { - return errors.New("This session already in cache!") + if c.cache.HasSession(sessionID) { + return fmt.Errorf("session %d already in cache", sessionID) } - c.sessions[sessionID] = &Session{ + newSess := &Session{ SessionID: sessionID, Platform: "ios", Timestamp: s.Timestamp, @@ -24,8 +24,10 @@ func (c *PGCache) InsertIOSSessionStart(sessionID uint64, s *IOSSessionStart) er UserCountry: s.UserCountry, UserDeviceType: s.UserDeviceType, } - if err := c.Conn.InsertSessionStart(sessionID, c.sessions[sessionID]); err != nil { - c.sessions[sessionID] = nil + c.cache.SetSession(newSess) + if err := c.Conn.InsertSessionStart(sessionID, newSess); err != nil { + // don't know why? + c.cache.SetSession(nil) return err } return nil diff --git a/backend/pkg/db/cache/messages-web.go b/backend/pkg/db/cache/messages-web.go index 50b531d0a..de97ef42a 100644 --- a/backend/pkg/db/cache/messages-web.go +++ b/backend/pkg/db/cache/messages-web.go @@ -1,7 +1,7 @@ package cache import ( - "errors" + "fmt" . "openreplay/backend/pkg/db/types" . "openreplay/backend/pkg/messages" ) @@ -31,10 +31,10 @@ func (c *PGCache) InsertWebSessionStart(sessionID uint64, s *SessionStart) error } func (c *PGCache) HandleWebSessionStart(sessionID uint64, s *SessionStart) error { - if c.sessions[sessionID] != nil { - return errors.New("This session already in cache!") + if c.cache.HasSession(sessionID) { + return fmt.Errorf("session %d already in cache", sessionID) } - c.sessions[sessionID] = &Session{ + newSess := &Session{ SessionID: sessionID, Platform: "web", Timestamp: s.Timestamp, @@ -55,8 +55,10 @@ func (c *PGCache) HandleWebSessionStart(sessionID uint64, s *SessionStart) error UserDeviceHeapSize: s.UserDeviceHeapSize, UserID: &s.UserID, } - if err := c.Conn.HandleSessionStart(sessionID, c.sessions[sessionID]); err != nil { - c.sessions[sessionID] = nil + c.cache.SetSession(newSess) + if err := c.Conn.HandleSessionStart(sessionID, newSess); err != nil { + // don't know why? + c.cache.SetSession(nil) return err } return nil diff --git a/backend/pkg/db/cache/pg-cache.go b/backend/pkg/db/cache/pg-cache.go index bb87f9f4c..75e468d35 100644 --- a/backend/pkg/db/cache/pg-cache.go +++ b/backend/pkg/db/cache/pg-cache.go @@ -1,31 +1,20 @@ package cache import ( - "sync" - "time" - "openreplay/backend/pkg/db/postgres" - . "openreplay/backend/pkg/db/types" ) -type ProjectMeta struct { - *Project - expirationTime time.Time -} - type PGCache struct { *postgres.Conn - sessions map[uint64]*Session - projects map[uint32]*ProjectMeta - projectsByKeys sync.Map // map[string]*ProjectMeta - projectExpirationTimeout time.Duration + cache Cache } -func NewPGCache(pgConn *postgres.Conn, projectExpirationTimeoutMs int64) *PGCache { +func NewPGCache(conn *postgres.Conn, projectExpirationTimeoutMs int64) *PGCache { + // Create in-memory cache layer for sessions and projects + c := NewCache(conn, projectExpirationTimeoutMs) + // Return PG wrapper with integrated cache layer return &PGCache{ - Conn: pgConn, - sessions: make(map[uint64]*Session), - projects: make(map[uint32]*ProjectMeta), - projectExpirationTimeout: time.Duration(1000 * projectExpirationTimeoutMs), + Conn: conn, + cache: c, } } diff --git a/backend/pkg/db/cache/project.go b/backend/pkg/db/cache/project.go index 35d952302..60b868501 100644 --- a/backend/pkg/db/cache/project.go +++ b/backend/pkg/db/cache/project.go @@ -5,7 +5,7 @@ import ( "time" ) -func (c *PGCache) GetProjectByKey(projectKey string) (*Project, error) { +func (c *cacheImpl) GetProjectByKey(projectKey string) (*Project, error) { pmInterface, found := c.projectsByKeys.Load(projectKey) if found { if pm, ok := pmInterface.(*ProjectMeta); ok { @@ -15,7 +15,7 @@ func (c *PGCache) GetProjectByKey(projectKey string) (*Project, error) { } } - p, err := c.Conn.GetProjectByKey(projectKey) + p, err := c.conn.GetProjectByKey(projectKey) if err != nil { return nil, err } @@ -24,12 +24,12 @@ func (c *PGCache) GetProjectByKey(projectKey string) (*Project, error) { return p, nil } -func (c *PGCache) GetProject(projectID uint32) (*Project, error) { +func (c *cacheImpl) GetProject(projectID uint32) (*Project, error) { if c.projects[projectID] != nil && time.Now().Before(c.projects[projectID].expirationTime) { return c.projects[projectID].Project, nil } - p, err := c.Conn.GetProject(projectID) + p, err := c.conn.GetProject(projectID) if err != nil { return nil, err } diff --git a/backend/pkg/db/cache/session.go b/backend/pkg/db/cache/session.go index 89b8f89f8..f03f1e955 100644 --- a/backend/pkg/db/cache/session.go +++ b/backend/pkg/db/cache/session.go @@ -4,28 +4,48 @@ import ( "errors" "github.com/jackc/pgx/v4" . "openreplay/backend/pkg/db/types" + "time" ) var NilSessionInCacheError = errors.New("nil session in error") -func (c *PGCache) GetSession(sessionID uint64) (*Session, error) { - if s, inCache := c.sessions[sessionID]; inCache { - if s == nil { - return s, NilSessionInCacheError - } - return s, nil +func (c *cacheImpl) SetSession(sess *Session) { + c.mutex.Lock() + defer c.mutex.Unlock() + + if meta, ok := c.sessions[sess.SessionID]; ok { + meta.Session = sess + meta.lastUse = time.Now() + } else { + c.sessions[sess.SessionID] = &SessionMeta{sess, time.Now()} } - s, err := c.Conn.GetSession(sessionID) +} + +func (c *cacheImpl) HasSession(sessID uint64) bool { + c.mutex.RLock() + defer c.mutex.RUnlock() + + sess, ok := c.sessions[sessID] + return ok && sess.Session != nil +} + +func (c *cacheImpl) GetSession(sessionID uint64) (*Session, error) { + c.mutex.Lock() + defer c.mutex.Unlock() + + if s, inCache := c.sessions[sessionID]; inCache { + if s.Session == nil { + return nil, NilSessionInCacheError + } + return s.Session, nil + } + s, err := c.conn.GetSession(sessionID) if err == pgx.ErrNoRows { - c.sessions[sessionID] = nil + c.sessions[sessionID] = &SessionMeta{nil, time.Now()} } if err != nil { return nil, err } - c.sessions[sessionID] = s + c.sessions[sessionID] = &SessionMeta{s, time.Now()} return s, nil } - -func (c *PGCache) DeleteSession(sessionID uint64) { - delete(c.sessions, sessionID) -} diff --git a/backend/pkg/db/types/session.go b/backend/pkg/db/types/session.go index 53fef410c..202eb9966 100644 --- a/backend/pkg/db/types/session.go +++ b/backend/pkg/db/types/session.go @@ -20,7 +20,7 @@ type Session struct { IssueTypes []string IssueScore int - UserID *string // pointer?? + UserID *string UserAnonymousID *string Metadata1 *string Metadata2 *string