diff --git a/backend/pkg/messages/batch.go b/backend/pkg/messages/batch.go index fa40db7b2..9241672a3 100644 --- a/backend/pkg/messages/batch.go +++ b/backend/pkg/messages/batch.go @@ -1,14 +1,17 @@ package messages import ( - "io" "bytes" + "io" "github.com/pkg/errors" ) func ReadBatch(b []byte, callback func(Message)) error { - reader := bytes.NewReader(b) + return ReadBatchReader(bytes.NewReader(b), callback) +} + +func ReadBatchReader(reader io.Reader, callback func(Message)) error { var index uint64 var timestamp int64 for { @@ -21,12 +24,12 @@ func ReadBatch(b []byte, callback func(Message)) error { msg = transformDepricated(msg) isBatchMeta := false - switch m := msg.(type){ - case *BatchMeta: // Is not required to be present in batch since IOS doesn't have it (though we might change it) + switch m := msg.(type) { + case *BatchMeta: // Is not required to be present in batch since IOS doesn't have it (though we might change it) if index != 0 { // Might be several 0-0 BatchMeta in a row without a error though return errors.New("Batch Meta found at the end of the batch") } - index = m.PageNo << 32 + m.FirstIndex // 2^32 is the maximum count of messages per page (ha-ha) + index = m.PageNo<<32 + m.FirstIndex // 2^32 is the maximum count of messages per page (ha-ha) timestamp = m.Timestamp isBatchMeta = true // continue readLoop @@ -34,7 +37,7 @@ func ReadBatch(b []byte, callback func(Message)) error { if index != 0 { // Might be several 0-0 BatchMeta in a row without a error though return errors.New("Batch Meta found at the end of the batch") } - index = m.FirstIndex + index = m.FirstIndex timestamp = int64(m.Timestamp) isBatchMeta = true // continue readLoop @@ -46,23 +49,23 @@ func ReadBatch(b []byte, callback func(Message)) error { msg.Meta().Index = index msg.Meta().Timestamp = timestamp callback(msg) - if !isBatchMeta { // Without that indexes will be unique anyway, though shifted by 1 because BatchMeta is not counted in tracker + if !isBatchMeta { // Without that indexes will be unique anyway, though shifted by 1 because BatchMeta is not counted in tracker index++ } } return errors.New("Error of the codeflow. (Should return on EOF)") } -const AVG_MESSAGE_SIZE = 40 // TODO: calculate OR calculate dynamically +const AVG_MESSAGE_SIZE = 40 // TODO: calculate OR calculate dynamically func WriteBatch(mList []Message) []byte { - batch := make([]byte, AVG_MESSAGE_SIZE * len(mList)) + batch := make([]byte, AVG_MESSAGE_SIZE*len(mList)) p := 0 for _, msg := range mList { msgBytes := msg.Encode() - if len(batch) < p + len(msgBytes) { - newBatch := make([]byte, 2*len(batch) + len(msgBytes)) - copy(newBatch, batch) - batch = newBatch + if len(batch) < p+len(msgBytes) { + newBatch := make([]byte, 2*len(batch)+len(msgBytes)) + copy(newBatch, batch) + batch = newBatch } copy(batch[p:], msgBytes) p += len(msgBytes) @@ -70,12 +73,12 @@ func WriteBatch(mList []Message) []byte { return batch[:p] } -func RewriteBatch(b []byte, rewrite func(Message) Message) ([]byte, error) { - mList := make([]Message, 0, len(b)/AVG_MESSAGE_SIZE) - if err := ReadBatch(b, func(m Message) { +func RewriteBatch(reader io.Reader, rewrite func(Message) Message) ([]byte, error) { + mList := make([]Message, 0, 10) // 10? + if err := ReadBatchReader(reader, func(m Message) { mList = append(mList, rewrite(m)) }); err != nil { return nil, err } return WriteBatch(mList), nil -} \ No newline at end of file +} diff --git a/backend/services/http/handlers-web.go b/backend/services/http/handlers-web.go index dcbd33720..7aab5bfbc 100644 --- a/backend/services/http/handlers-web.go +++ b/backend/services/http/handlers-web.go @@ -3,7 +3,6 @@ package main import ( "encoding/json" "errors" - "io/ioutil" "log" "math/rand" "net/http" @@ -76,14 +75,14 @@ func startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) { responseWithError(w, http.StatusForbidden, errors.New("browser not recognized")) return } - sessionID, err := flaker.Compose(uint64(startTime.UnixMilli())) + sessionID, err := flaker.Compose(uint64(startTime.UnixNano() / 1e6)) if err != nil { responseWithError(w, http.StatusInternalServerError, err) return } // TODO: if EXPIRED => send message for two sessions association expTime := startTime.Add(time.Duration(p.MaxSessionDuration) * time.Millisecond) - tokenData = &token.TokenData{sessionID, expTime.UnixMilli()} + tokenData = &token.TokenData{sessionID, expTime.UnixNano() / 1e6} country := geoIP.ExtractISOCodeFromHTTPRequest(r) producer.Produce(TOPIC_RAW_WEB, tokenData.ID, Encode(&SessionStart{ @@ -108,8 +107,8 @@ func startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) { //delayDuration := time.Now().Sub(startTime) responseWithJSON(w, &response{ - //Timestamp: startTime.UnixMilli(), - //Delay: delayDuration.Milliseconds(), + //Timestamp: startTime.UnixNano() / 1e6, + //Delay: delayDuration.Nanoseconds() / 1e6, Token: tokenizer.Compose(*tokenData), UserUUID: userUUID, SessionID: strconv.FormatUint(tokenData.ID, 10), @@ -125,17 +124,8 @@ func pushMessagesHandlerWeb(w http.ResponseWriter, r *http.Request) { } body := http.MaxBytesReader(w, r.Body, BEACON_SIZE_LIMIT) defer body.Close() - buf, err := ioutil.ReadAll(body) - if err != nil { - responseWithError(w, http.StatusInternalServerError, err) // TODO: send error here only on staging - return - } - //log.Printf("Sending batch...") - //startTime := time.Now() - // analyticsMessages := make([]Message, 0, 200) - - rewritenBuf, err := RewriteBatch(buf, func(msg Message) Message { + rewritenBuf, err := RewriteBatch(body, func(msg Message) Message { switch m := msg.(type) { case *SetNodeAttributeURLBased: if m.Name == "src" || m.Name == "href" {