Ender logic update (#1435)
* feat(backend): new session end detection logic + several fixes * feat(backend): support partitions managment in ender * feat(backend): added new consumer support to redis * feat(backend): added support for new consumer in kafka * feat(backend): added new consumer support to redis (ee) * feat(backend): small refactoring in ender
This commit is contained in:
parent
d7be57245b
commit
676041d90b
7 changed files with 193 additions and 45 deletions
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"openreplay/backend/pkg/db/redis"
|
"openreplay/backend/pkg/db/redis"
|
||||||
"openreplay/backend/pkg/memory"
|
"openreplay/backend/pkg/memory"
|
||||||
"openreplay/backend/pkg/projects"
|
"openreplay/backend/pkg/projects"
|
||||||
|
"openreplay/backend/pkg/queue/types"
|
||||||
"openreplay/backend/pkg/sessions"
|
"openreplay/backend/pkg/sessions"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
|
|
@ -95,32 +96,68 @@ func main() {
|
||||||
failedSessionEnds := make(map[uint64]uint64)
|
failedSessionEnds := make(map[uint64]uint64)
|
||||||
duplicatedSessionEnds := make(map[uint64]uint64)
|
duplicatedSessionEnds := make(map[uint64]uint64)
|
||||||
negativeDuration := make(map[uint64]uint64)
|
negativeDuration := make(map[uint64]uint64)
|
||||||
|
shorterDuration := make(map[uint64]int64)
|
||||||
|
diffDuration := make(map[uint64]int64)
|
||||||
|
noSessionInDB := make(map[uint64]uint64)
|
||||||
|
updatedDurations := 0
|
||||||
|
newSessionEnds := 0
|
||||||
|
|
||||||
|
type SessionEndType int
|
||||||
|
const (
|
||||||
|
FailedSessionEnd SessionEndType = iota + 1
|
||||||
|
DuplicatedSessionEnd
|
||||||
|
NegativeDuration
|
||||||
|
ShorterDuration
|
||||||
|
DiffDuration
|
||||||
|
NewSessionEnd
|
||||||
|
NoSessionInDB
|
||||||
|
)
|
||||||
|
|
||||||
// Find ended sessions and send notification to other services
|
// Find ended sessions and send notification to other services
|
||||||
sessionEndGenerator.HandleEndedSessions(func(sessionID uint64, timestamp uint64) bool {
|
sessionEndGenerator.HandleEndedSessions(func(sessionID uint64, timestamp uint64) (bool, int) {
|
||||||
msg := &messages.SessionEnd{Timestamp: timestamp}
|
msg := &messages.SessionEnd{Timestamp: timestamp}
|
||||||
currDuration, err := sessManager.GetDuration(sessionID)
|
currDuration, err := sessManager.GetDuration(sessionID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("getSessionDuration failed, sessID: %d, err: %s", sessionID, err)
|
log.Printf("getSessionDuration failed, sessID: %d, err: %s", sessionID, err)
|
||||||
}
|
}
|
||||||
|
sess, err := sessManager.Get(sessionID)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("can't get session from database to compare durations, sessID: %d, err: %s", sessionID, err)
|
||||||
|
} else {
|
||||||
|
newDur := timestamp - sess.Timestamp
|
||||||
|
// Skip if session was ended before with same duration
|
||||||
|
if currDuration == newDur {
|
||||||
|
duplicatedSessionEnds[sessionID] = currDuration
|
||||||
|
return true, int(DuplicatedSessionEnd)
|
||||||
|
}
|
||||||
|
// Skip if session was ended before with longer duration
|
||||||
|
if currDuration > newDur {
|
||||||
|
shorterDuration[sessionID] = int64(currDuration) - int64(newDur)
|
||||||
|
return true, int(ShorterDuration)
|
||||||
|
}
|
||||||
|
}
|
||||||
newDuration, err := sessManager.UpdateDuration(sessionID, msg.Timestamp)
|
newDuration, err := sessManager.UpdateDuration(sessionID, msg.Timestamp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if strings.Contains(err.Error(), "integer out of range") {
|
if strings.Contains(err.Error(), "integer out of range") {
|
||||||
// Skip session with broken duration
|
// Skip session with broken duration
|
||||||
failedSessionEnds[sessionID] = timestamp
|
failedSessionEnds[sessionID] = timestamp
|
||||||
return true
|
return true, int(FailedSessionEnd)
|
||||||
}
|
}
|
||||||
if strings.Contains(err.Error(), "is less than zero for uint64") {
|
if strings.Contains(err.Error(), "is less than zero for uint64") {
|
||||||
negativeDuration[sessionID] = timestamp
|
negativeDuration[sessionID] = timestamp
|
||||||
return true
|
return true, int(NegativeDuration)
|
||||||
|
}
|
||||||
|
if strings.Contains(err.Error(), "no rows in result set") {
|
||||||
|
noSessionInDB[sessionID] = timestamp
|
||||||
|
return true, int(NoSessionInDB)
|
||||||
}
|
}
|
||||||
log.Printf("can't save sessionEnd to database, sessID: %d, err: %s", sessionID, err)
|
log.Printf("can't save sessionEnd to database, sessID: %d, err: %s", sessionID, err)
|
||||||
return false
|
return false, 0
|
||||||
}
|
}
|
||||||
|
// Check one more time just in case
|
||||||
if currDuration == newDuration {
|
if currDuration == newDuration {
|
||||||
// Skip session end duplicate
|
|
||||||
duplicatedSessionEnds[sessionID] = currDuration
|
duplicatedSessionEnds[sessionID] = currDuration
|
||||||
return true
|
return true, int(DuplicatedSessionEnd)
|
||||||
}
|
}
|
||||||
if cfg.UseEncryption {
|
if cfg.UseEncryption {
|
||||||
if key := storage.GenerateEncryptionKey(); key != nil {
|
if key := storage.GenerateEncryptionKey(); key != nil {
|
||||||
|
|
@ -133,25 +170,40 @@ func main() {
|
||||||
}
|
}
|
||||||
if err := producer.Produce(cfg.TopicRawWeb, sessionID, msg.Encode()); err != nil {
|
if err := producer.Produce(cfg.TopicRawWeb, sessionID, msg.Encode()); err != nil {
|
||||||
log.Printf("can't send sessionEnd to topic: %s; sessID: %d", err, sessionID)
|
log.Printf("can't send sessionEnd to topic: %s; sessID: %d", err, sessionID)
|
||||||
return false
|
return false, 0
|
||||||
}
|
}
|
||||||
return true
|
if currDuration != 0 {
|
||||||
|
diffDuration[sessionID] = int64(newDuration) - int64(currDuration)
|
||||||
|
updatedDurations++
|
||||||
|
} else {
|
||||||
|
newSessionEnds++
|
||||||
|
}
|
||||||
|
return true, int(NewSessionEnd)
|
||||||
})
|
})
|
||||||
if len(failedSessionEnds) > 0 {
|
if n := len(failedSessionEnds); n > 0 {
|
||||||
log.Println("sessions with wrong duration:", failedSessionEnds)
|
log.Println("sessions with wrong duration:", n, failedSessionEnds)
|
||||||
}
|
}
|
||||||
if len(duplicatedSessionEnds) > 0 {
|
if n := len(negativeDuration); n > 0 {
|
||||||
log.Println("session end duplicates:", duplicatedSessionEnds)
|
log.Println("sessions with negative duration:", n, negativeDuration)
|
||||||
}
|
}
|
||||||
if len(negativeDuration) > 0 {
|
if n := len(noSessionInDB); n > 0 {
|
||||||
log.Println("sessions with negative duration:", negativeDuration)
|
log.Printf("sessions without info in DB: %d, %v", n, noSessionInDB)
|
||||||
}
|
}
|
||||||
|
log.Printf("[INFO] failed: %d, negative: %d, shorter: %d, same: %d, updated: %d, new: %d, not found: %d",
|
||||||
|
len(failedSessionEnds), len(negativeDuration), len(shorterDuration), len(duplicatedSessionEnds),
|
||||||
|
updatedDurations, newSessionEnds, len(noSessionInDB))
|
||||||
producer.Flush(cfg.ProducerTimeout)
|
producer.Flush(cfg.ProducerTimeout)
|
||||||
if err := consumer.CommitBack(intervals.EVENTS_BACK_COMMIT_GAP); err != nil {
|
if err := consumer.CommitBack(intervals.EVENTS_BACK_COMMIT_GAP); err != nil {
|
||||||
log.Printf("can't commit messages with offset: %s", err)
|
log.Printf("can't commit messages with offset: %s", err)
|
||||||
}
|
}
|
||||||
case msg := <-consumer.Rebalanced():
|
case msg := <-consumer.Rebalanced():
|
||||||
log.Println(msg)
|
log.Printf("Rebalanced event, type: %s, partitions: %+v", msg.Type, msg.Partitions)
|
||||||
|
if msg.Type == types.RebalanceTypeRevoke {
|
||||||
|
sessionEndGenerator.Disable()
|
||||||
|
} else {
|
||||||
|
sessionEndGenerator.ActivePartitions(msg.Partitions)
|
||||||
|
sessionEndGenerator.Enable()
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
if !memoryManager.HasFreeMemory() {
|
if !memoryManager.HasFreeMemory() {
|
||||||
continue
|
continue
|
||||||
|
|
|
||||||
|
|
@ -9,12 +9,12 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
// EndedSessionHandler handler for ended sessions
|
// EndedSessionHandler handler for ended sessions
|
||||||
type EndedSessionHandler func(sessionID uint64, timestamp uint64) bool
|
type EndedSessionHandler func(sessionID uint64, timestamp uint64) (bool, int)
|
||||||
|
|
||||||
// session holds information about user's session live status
|
// session holds information about user's session live status
|
||||||
type session struct {
|
type session struct {
|
||||||
lastTimestamp int64
|
lastTimestamp int64 // timestamp from message broker
|
||||||
lastUpdate int64
|
lastUpdate int64 // local timestamp
|
||||||
lastUserTime uint64
|
lastUserTime uint64
|
||||||
isEnded bool
|
isEnded bool
|
||||||
}
|
}
|
||||||
|
|
@ -24,6 +24,8 @@ type SessionEnder struct {
|
||||||
timeout int64
|
timeout int64
|
||||||
sessions map[uint64]*session // map[sessionID]session
|
sessions map[uint64]*session // map[sessionID]session
|
||||||
timeCtrl *timeController
|
timeCtrl *timeController
|
||||||
|
parts uint64
|
||||||
|
enabled bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(timeout int64, parts int) (*SessionEnder, error) {
|
func New(timeout int64, parts int) (*SessionEnder, error) {
|
||||||
|
|
@ -31,9 +33,38 @@ func New(timeout int64, parts int) (*SessionEnder, error) {
|
||||||
timeout: timeout,
|
timeout: timeout,
|
||||||
sessions: make(map[uint64]*session),
|
sessions: make(map[uint64]*session),
|
||||||
timeCtrl: NewTimeController(parts),
|
timeCtrl: NewTimeController(parts),
|
||||||
|
parts: uint64(parts), // ender uses all partitions by default
|
||||||
|
enabled: true,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (se *SessionEnder) Enable() {
|
||||||
|
se.enabled = true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (se *SessionEnder) Disable() {
|
||||||
|
se.enabled = false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (se *SessionEnder) ActivePartitions(parts []uint64) {
|
||||||
|
activeParts := make(map[uint64]bool, 0)
|
||||||
|
for _, p := range parts {
|
||||||
|
activeParts[p] = true
|
||||||
|
}
|
||||||
|
removedSessions := 0
|
||||||
|
activeSessions := 0
|
||||||
|
for sessID, _ := range se.sessions {
|
||||||
|
if !activeParts[sessID%se.parts] {
|
||||||
|
delete(se.sessions, sessID)
|
||||||
|
removedSessions++
|
||||||
|
} else {
|
||||||
|
activeSessions++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
log.Printf("SessionEnder: %d sessions left in active partitions: %+v, removed %d sessions",
|
||||||
|
activeSessions, parts, removedSessions)
|
||||||
|
}
|
||||||
|
|
||||||
// UpdateSession save timestamp for new sessions and update for existing sessions
|
// UpdateSession save timestamp for new sessions and update for existing sessions
|
||||||
func (se *SessionEnder) UpdateSession(msg messages.Message) {
|
func (se *SessionEnder) UpdateSession(msg messages.Message) {
|
||||||
var (
|
var (
|
||||||
|
|
@ -46,14 +77,14 @@ func (se *SessionEnder) UpdateSession(msg messages.Message) {
|
||||||
log.Printf("got empty timestamp for sessionID: %d", sessionID)
|
log.Printf("got empty timestamp for sessionID: %d", sessionID)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
se.timeCtrl.UpdateTime(sessionID, batchTimestamp)
|
se.timeCtrl.UpdateTime(sessionID, batchTimestamp, localTimestamp)
|
||||||
sess, ok := se.sessions[sessionID]
|
sess, ok := se.sessions[sessionID]
|
||||||
if !ok {
|
if !ok {
|
||||||
// Register new session
|
// Register new session
|
||||||
se.sessions[sessionID] = &session{
|
se.sessions[sessionID] = &session{
|
||||||
lastTimestamp: batchTimestamp, // timestamp from message broker
|
lastTimestamp: batchTimestamp,
|
||||||
lastUpdate: localTimestamp, // local timestamp
|
lastUpdate: localTimestamp,
|
||||||
lastUserTime: msgTimestamp, // last timestamp from user's machine
|
lastUserTime: msgTimestamp, // last timestamp from user's machine
|
||||||
isEnded: false,
|
isEnded: false,
|
||||||
}
|
}
|
||||||
ender.IncreaseActiveSessions()
|
ender.IncreaseActiveSessions()
|
||||||
|
|
@ -74,21 +105,53 @@ func (se *SessionEnder) UpdateSession(msg messages.Message) {
|
||||||
|
|
||||||
// HandleEndedSessions runs handler for each ended session and delete information about session in successful case
|
// HandleEndedSessions runs handler for each ended session and delete information about session in successful case
|
||||||
func (se *SessionEnder) HandleEndedSessions(handler EndedSessionHandler) {
|
func (se *SessionEnder) HandleEndedSessions(handler EndedSessionHandler) {
|
||||||
|
if !se.enabled {
|
||||||
|
log.Printf("SessionEnder is disabled")
|
||||||
|
return
|
||||||
|
}
|
||||||
currTime := time.Now().UnixMilli()
|
currTime := time.Now().UnixMilli()
|
||||||
allSessions, removedSessions := len(se.sessions), 0
|
allSessions, removedSessions := len(se.sessions), 0
|
||||||
|
brokerTime := make(map[int]int, 0)
|
||||||
|
serverTime := make(map[int]int, 0)
|
||||||
|
|
||||||
|
isSessionEnded := func(sessID uint64, sess *session) (bool, int) {
|
||||||
|
// Has been finished already
|
||||||
|
if sess.isEnded {
|
||||||
|
return true, 1
|
||||||
|
}
|
||||||
|
batchTimeDiff := se.timeCtrl.LastBatchTimestamp(sessID) - sess.lastTimestamp
|
||||||
|
|
||||||
|
// Has been finished according to batch timestamp and hasn't been updated for a long time
|
||||||
|
if (batchTimeDiff >= se.timeout) && (currTime-sess.lastUpdate >= se.timeout) {
|
||||||
|
return true, 2
|
||||||
|
}
|
||||||
|
|
||||||
|
// Hasn't been finished according to batch timestamp but hasn't been read from partition for a long time
|
||||||
|
if (batchTimeDiff < se.timeout) && (currTime-se.timeCtrl.LastUpdateTimestamp(sessID) >= se.timeout) {
|
||||||
|
return true, 3
|
||||||
|
}
|
||||||
|
return false, 0
|
||||||
|
}
|
||||||
|
|
||||||
for sessID, sess := range se.sessions {
|
for sessID, sess := range se.sessions {
|
||||||
if sess.isEnded || (se.timeCtrl.LastTimestamp(sessID)-sess.lastTimestamp > se.timeout) ||
|
if ended, endCase := isSessionEnded(sessID, sess); ended {
|
||||||
(currTime-sess.lastUpdate > se.timeout) {
|
|
||||||
sess.isEnded = true
|
sess.isEnded = true
|
||||||
if handler(sessID, sess.lastUserTime) {
|
if res, _ := handler(sessID, sess.lastUserTime); res {
|
||||||
delete(se.sessions, sessID)
|
delete(se.sessions, sessID)
|
||||||
ender.DecreaseActiveSessions()
|
ender.DecreaseActiveSessions()
|
||||||
ender.IncreaseClosedSessions()
|
ender.IncreaseClosedSessions()
|
||||||
removedSessions++
|
removedSessions++
|
||||||
|
if endCase == 2 {
|
||||||
|
brokerTime[1]++
|
||||||
|
}
|
||||||
|
if endCase == 3 {
|
||||||
|
serverTime[1]++
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
log.Printf("sessID: %d, userTime: %d", sessID, sess.lastUserTime)
|
log.Printf("sessID: %d, userTime: %d", sessID, sess.lastUserTime)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.Printf("Removed %d of %d sessions", removedSessions, allSessions)
|
log.Printf("Removed %d of %d sessions; brokerTime: %d, serverTime: %d",
|
||||||
|
removedSessions, allSessions, brokerTime, serverTime)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,21 +1,28 @@
|
||||||
package sessionender
|
package sessionender
|
||||||
|
|
||||||
type timeController struct {
|
type timeController struct {
|
||||||
parts uint64
|
parts uint64
|
||||||
lastTimestamp map[uint64]int64 // map[partition]consumerTimeOfLastMessage
|
lastBatchTimestamp map[uint64]int64 // map[partition]consumerTimeOfLastMessage
|
||||||
|
lastUpdateTimestamp map[uint64]int64 // map[partition]systemTimeOfLastMessage
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTimeController(parts int) *timeController {
|
func NewTimeController(parts int) *timeController {
|
||||||
return &timeController{
|
return &timeController{
|
||||||
parts: uint64(parts),
|
parts: uint64(parts),
|
||||||
lastTimestamp: make(map[uint64]int64),
|
lastBatchTimestamp: make(map[uint64]int64),
|
||||||
|
lastUpdateTimestamp: make(map[uint64]int64),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tc *timeController) UpdateTime(sessionID uint64, timestamp int64) {
|
func (tc *timeController) UpdateTime(sessionID uint64, batchTimestamp, updateTimestamp int64) {
|
||||||
tc.lastTimestamp[sessionID%tc.parts] = timestamp
|
tc.lastBatchTimestamp[sessionID%tc.parts] = batchTimestamp
|
||||||
|
tc.lastUpdateTimestamp[sessionID%tc.parts] = updateTimestamp
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tc *timeController) LastTimestamp(sessionID uint64) int64 {
|
func (tc *timeController) LastBatchTimestamp(sessionID uint64) int64 {
|
||||||
return tc.lastTimestamp[sessionID%tc.parts]
|
return tc.lastBatchTimestamp[sessionID%tc.parts]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tc *timeController) LastUpdateTimestamp(sessionID uint64) int64 {
|
||||||
|
return tc.lastUpdateTimestamp[sessionID%tc.parts]
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,12 +1,24 @@
|
||||||
package types
|
package types
|
||||||
|
|
||||||
|
type RebalanceType string
|
||||||
|
|
||||||
|
const (
|
||||||
|
RebalanceTypeAssign RebalanceType = "assign"
|
||||||
|
RebalanceTypeRevoke RebalanceType = "revoke"
|
||||||
|
)
|
||||||
|
|
||||||
|
type PartitionsRebalancedEvent struct {
|
||||||
|
Type RebalanceType
|
||||||
|
Partitions []uint64
|
||||||
|
}
|
||||||
|
|
||||||
// Consumer reads batches of session data from queue (redis or kafka)
|
// Consumer reads batches of session data from queue (redis or kafka)
|
||||||
type Consumer interface {
|
type Consumer interface {
|
||||||
ConsumeNext() error
|
ConsumeNext() error
|
||||||
CommitBack(gap int64) error
|
CommitBack(gap int64) error
|
||||||
Commit() error
|
Commit() error
|
||||||
Close()
|
Close()
|
||||||
Rebalanced() <-chan interface{}
|
Rebalanced() <-chan *PartitionsRebalancedEvent
|
||||||
}
|
}
|
||||||
|
|
||||||
// Producer sends batches of session data to queue (redis or kafka)
|
// Producer sends batches of session data to queue (redis or kafka)
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"log"
|
"log"
|
||||||
"net"
|
"net"
|
||||||
"openreplay/backend/pkg/messages"
|
"openreplay/backend/pkg/messages"
|
||||||
|
"openreplay/backend/pkg/queue/types"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
@ -27,7 +28,7 @@ type Consumer struct {
|
||||||
idsPending streamPendingIDsMap
|
idsPending streamPendingIDsMap
|
||||||
lastTs int64
|
lastTs int64
|
||||||
autoCommit bool
|
autoCommit bool
|
||||||
event chan interface{}
|
event chan *types.PartitionsRebalancedEvent
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewConsumer(group string, streams []string, messageIterator messages.MessageIterator) *Consumer {
|
func NewConsumer(group string, streams []string, messageIterator messages.MessageIterator) *Consumer {
|
||||||
|
|
@ -58,13 +59,13 @@ func NewConsumer(group string, streams []string, messageIterator messages.Messag
|
||||||
group: group,
|
group: group,
|
||||||
autoCommit: true,
|
autoCommit: true,
|
||||||
idsPending: idsPending,
|
idsPending: idsPending,
|
||||||
event: make(chan interface{}, 4),
|
event: make(chan *types.PartitionsRebalancedEvent, 4),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const READ_COUNT = 10
|
const READ_COUNT = 10
|
||||||
|
|
||||||
func (c *Consumer) Rebalanced() <-chan interface{} {
|
func (c *Consumer) Rebalanced() <-chan *types.PartitionsRebalancedEvent {
|
||||||
return c.event
|
return c.event
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -28,7 +28,7 @@ type consumerImpl struct {
|
||||||
idsPending streamPendingIDsMap
|
idsPending streamPendingIDsMap
|
||||||
lastTs int64
|
lastTs int64
|
||||||
autoCommit bool
|
autoCommit bool
|
||||||
event chan interface{}
|
event chan *types.PartitionsRebalancedEvent
|
||||||
}
|
}
|
||||||
|
|
||||||
type QueueMessage struct {
|
type QueueMessage struct {
|
||||||
|
|
@ -67,7 +67,7 @@ func NewConsumer(client *Client, group string, streams []string) types.Consumer
|
||||||
group: group,
|
group: group,
|
||||||
autoCommit: true,
|
autoCommit: true,
|
||||||
idsPending: idsPending,
|
idsPending: idsPending,
|
||||||
event: make(chan interface{}, 4),
|
event: make(chan *types.PartitionsRebalancedEvent, 4),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -169,6 +169,6 @@ func (c *consumerImpl) Commit() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *consumerImpl) Rebalanced() <-chan interface{} {
|
func (c *consumerImpl) Rebalanced() <-chan *types.PartitionsRebalancedEvent {
|
||||||
return c.event
|
return c.event
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,7 @@ import (
|
||||||
|
|
||||||
"openreplay/backend/pkg/env"
|
"openreplay/backend/pkg/env"
|
||||||
"openreplay/backend/pkg/messages"
|
"openreplay/backend/pkg/messages"
|
||||||
|
"openreplay/backend/pkg/queue/types"
|
||||||
|
|
||||||
"github.com/confluentinc/confluent-kafka-go/kafka"
|
"github.com/confluentinc/confluent-kafka-go/kafka"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
|
@ -20,6 +21,7 @@ type Consumer struct {
|
||||||
commitTicker *time.Ticker
|
commitTicker *time.Ticker
|
||||||
pollTimeout uint
|
pollTimeout uint
|
||||||
events chan interface{}
|
events chan interface{}
|
||||||
|
rebalanced chan *types.PartitionsRebalancedEvent
|
||||||
lastReceivedPrtTs map[int32]int64
|
lastReceivedPrtTs map[int32]int64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -72,7 +74,8 @@ func NewConsumer(
|
||||||
messageIterator: messageIterator,
|
messageIterator: messageIterator,
|
||||||
commitTicker: commitTicker,
|
commitTicker: commitTicker,
|
||||||
pollTimeout: 200,
|
pollTimeout: 200,
|
||||||
events: make(chan interface{}, 4),
|
events: make(chan interface{}, 32),
|
||||||
|
rebalanced: make(chan *types.PartitionsRebalancedEvent, 32),
|
||||||
lastReceivedPrtTs: make(map[int32]int64, 16),
|
lastReceivedPrtTs: make(map[int32]int64, 16),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -96,15 +99,25 @@ func (consumer *Consumer) reBalanceCallback(_ *kafka.Consumer, e kafka.Event) er
|
||||||
case kafka.RevokedPartitions:
|
case kafka.RevokedPartitions:
|
||||||
// receive before re-balancing partitions; stop consuming messages and commit current state
|
// receive before re-balancing partitions; stop consuming messages and commit current state
|
||||||
consumer.events <- evt.String()
|
consumer.events <- evt.String()
|
||||||
|
parts := make([]uint64, len(evt.Partitions))
|
||||||
|
for i, p := range evt.Partitions {
|
||||||
|
parts[i] = uint64(p.Partition)
|
||||||
|
}
|
||||||
|
consumer.rebalanced <- &types.PartitionsRebalancedEvent{Type: types.RebalanceTypeRevoke, Partitions: parts}
|
||||||
case kafka.AssignedPartitions:
|
case kafka.AssignedPartitions:
|
||||||
// receive after re-balancing partitions; continue consuming messages
|
// receive after re-balancing partitions; continue consuming messages
|
||||||
//consumer.events <- evt.String()
|
consumer.events <- evt.String()
|
||||||
|
parts := make([]uint64, len(evt.Partitions))
|
||||||
|
for i, p := range evt.Partitions {
|
||||||
|
parts[i] = uint64(p.Partition)
|
||||||
|
}
|
||||||
|
consumer.rebalanced <- &types.PartitionsRebalancedEvent{Type: types.RebalanceTypeAssign, Partitions: parts}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (consumer *Consumer) Rebalanced() <-chan interface{} {
|
func (consumer *Consumer) Rebalanced() <-chan *types.PartitionsRebalancedEvent {
|
||||||
return consumer.events
|
return consumer.rebalanced
|
||||||
}
|
}
|
||||||
|
|
||||||
func (consumer *Consumer) Commit() error {
|
func (consumer *Consumer) Commit() error {
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue