From 7fc8fab4d7d40248f7a22ca076ea9d0d12179c6e Mon Sep 17 00:00:00 2001 From: mauricio garcia suarez Date: Thu, 25 Aug 2022 14:34:33 +0200 Subject: [PATCH] Minor issues solved --- ee/connectors/handler.py | 10 ++++++++++ ee/connectors/msgcodec/msgcodec.py | 27 ++++++++++++++++++++++++++- 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/ee/connectors/handler.py b/ee/connectors/handler.py index 1442c9148..02f393091 100644 --- a/ee/connectors/handler.py +++ b/ee/connectors/handler.py @@ -401,6 +401,16 @@ def handle_message(message: Message) -> Optional[DetailedEvent]: n.batchmeta_timestamp = message.timestamp return n + if isinstance(message, BatchMetadata): + n.batchmeta_page_no = message.page_no + n.batchmeta_first_index = message.first_index + n.batchmeta_timestamp = message.timestamp + return n + + if isinstance(message, PartitionedMessage): + n.part_no = message.part_no + n.part_total = message.part_total + if isinstance(message, PerformanceTrack): n.performancetrack_frames = message.frames n.performancetrack_ticks = message.ticks diff --git a/ee/connectors/msgcodec/msgcodec.py b/ee/connectors/msgcodec/msgcodec.py index 03515316e..b390643ce 100644 --- a/ee/connectors/msgcodec/msgcodec.py +++ b/ee/connectors/msgcodec/msgcodec.py @@ -20,13 +20,37 @@ class MessageCodec(Codec): reader = io.BytesIO(b) return self.read_head_message(reader) + @staticmethod + def check_message_id(b: bytes) -> int: + """ + todo: make it static and without reader. It's just the first byte + Read and return the first byte where the message id is encoded + """ + reader = io.BytesIO(b) + id_ = Codec.read_uint(reader) + + return id_ + + @staticmethod + def decode_key(b) -> int: + """ + Decode the message key (encoded with little endian) + """ + try: + decoded = int.from_bytes(b, "little", signed=False) + except Exception as e: + raise UnicodeDecodeError(f"Error while decoding message key (SessionID) from {b}\n{e}") + return decoded + def decode_detailed(self, b: bytes): reader = io.BytesIO(b) messages_list = list() messages_list.append(self.handler(reader, 0)) if isinstance(messages_list[0], BatchMeta): + # Old BatchMeta mode = 0 elif isinstance(messages_list[0], BatchMetadata): + # New BatchMeta mode = 1 else: return messages_list @@ -40,15 +64,16 @@ class MessageCodec(Codec): def handler(self, reader: io.BytesIO, mode=0): message_id = self.read_message_id(reader) if mode == 1: + # We skip the three bytes representing the length of message. It can be used to skip unwanted messages reader.read(3) return self.read_head_message(reader, message_id) elif mode == 0: + # Old format with no bytes for message length return self.read_head_message(reader, message_id) else: raise IOError() def read_head_message(self, reader: io.BytesIO, message_id: int): - if message_id == 80: return BatchMeta( page_no=self.read_uint(reader),