packeter.py: checksums, class structure
This commit is contained in:
@@ -1,6 +1,46 @@
|
|||||||
import struct
|
import struct
|
||||||
|
import zlib
|
||||||
|
import hashlib
|
||||||
|
import hmac
|
||||||
|
|
||||||
|
class Buffer:
|
||||||
|
def __init__(self, buffer):
|
||||||
|
self._offset = 0
|
||||||
|
self._buffer = buffer
|
||||||
|
|
||||||
|
def _write(self, data):
|
||||||
|
size = len(data)
|
||||||
|
if self._offset == len(self._buffer):
|
||||||
|
self._buffer += data
|
||||||
|
else:
|
||||||
|
self._buffer = self._buffer[:self._offset] + data + self._buffer[self._offset+size:]
|
||||||
|
self._offset += size
|
||||||
|
|
||||||
|
def _read(self, n):
|
||||||
|
data = self._buffer[self._offset:self._offset+n]
|
||||||
|
self._offset = min(len(self._buffer), self._offset+n)
|
||||||
|
|
||||||
|
def eof(self):
|
||||||
|
return self._offset >= len(self._buffer)
|
||||||
|
|
||||||
|
def remaining_size(self):
|
||||||
|
return len(self._buffer[self._offset:])
|
||||||
|
|
||||||
|
def get(self):
|
||||||
|
return self._buffer
|
||||||
|
|
||||||
|
def length(self):
|
||||||
|
return len(self._buffer)
|
||||||
|
|
||||||
|
def seek(self, pos):
|
||||||
|
if pos < 0:
|
||||||
|
self._offset = max(0, self._offset - pos)
|
||||||
|
else:
|
||||||
|
self._offset = min(len(self._buffer, pos))
|
||||||
|
|
||||||
|
def tell(self):
|
||||||
|
return self._offset
|
||||||
|
|
||||||
class StructWrapper:
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _endian(e):
|
def _endian(e):
|
||||||
return ">" if e else "<"
|
return ">" if e else "<"
|
||||||
@@ -17,18 +57,11 @@ class StructWrapper:
|
|||||||
assert f in format_sizes
|
assert f in format_sizes
|
||||||
return format_sizes[f]
|
return format_sizes[f]
|
||||||
|
|
||||||
class Parser(StructWrapper):
|
class Parser(Buffer):
|
||||||
def __init__(self, data, big_endian=False):
|
def __init__(self, buffer, big_endian=False):
|
||||||
self.data = data
|
super().__init__(buffer)
|
||||||
self.offset = 0
|
|
||||||
self.big_endian = big_endian
|
self.big_endian = big_endian
|
||||||
|
|
||||||
def eof(self):
|
|
||||||
return self.offset >= len(self.data)
|
|
||||||
|
|
||||||
def remaining_size(self):
|
|
||||||
return len(self.data[self.offset:])
|
|
||||||
|
|
||||||
def _struct_unpack(self, big_endian, f):
|
def _struct_unpack(self, big_endian, f):
|
||||||
size = self._format_size(f)
|
size = self._format_size(f)
|
||||||
assert self.remaining_size() >= size
|
assert self.remaining_size() >= size
|
||||||
@@ -37,8 +70,7 @@ class Parser(StructWrapper):
|
|||||||
if big_endian is None:
|
if big_endian is None:
|
||||||
big_endian = self.big_endian
|
big_endian = self.big_endian
|
||||||
|
|
||||||
value = struct.unpack(self._endian(big_endian) + f, self.data[self.offset:self.offset+size])[0]
|
value = struct.unpack(self._endian(big_endian) + f, self._read(size))[0]
|
||||||
self.offset += size
|
|
||||||
return value
|
return value
|
||||||
|
|
||||||
def read_byte(self):
|
def read_byte(self):
|
||||||
@@ -66,10 +98,7 @@ class Parser(StructWrapper):
|
|||||||
return self._struct_unpack(big_endian, "Q")
|
return self._struct_unpack(big_endian, "Q")
|
||||||
|
|
||||||
def read_bin(self, length):
|
def read_bin(self, length):
|
||||||
d = self.data[self.offset:self.offset+length]
|
return self._read(length)
|
||||||
assert len(self.data[self.offset:]) >= length
|
|
||||||
self.offset += length
|
|
||||||
return d
|
|
||||||
|
|
||||||
def read_until(self, byte):
|
def read_until(self, byte):
|
||||||
data = b""
|
data = b""
|
||||||
@@ -82,24 +111,16 @@ class Parser(StructWrapper):
|
|||||||
|
|
||||||
class Packer(StructWrapper):
|
class Packer(StructWrapper):
|
||||||
def __init__(self, big_endian=False):
|
def __init__(self, big_endian=False):
|
||||||
self.buffer = b""
|
super().__init__(b"")
|
||||||
self.offset = 0
|
|
||||||
self.big_endian = big_endian
|
self.big_endian = big_endian
|
||||||
|
|
||||||
def get(self):
|
|
||||||
return self.buffer
|
|
||||||
|
|
||||||
def length(self):
|
|
||||||
return len(self.buffer)
|
|
||||||
|
|
||||||
def _struct_pack(self, big_endian, f, value):
|
def _struct_pack(self, big_endian, f, value):
|
||||||
# grab default endianess, when none is given
|
# grab default endianess, when none is given
|
||||||
if big_endian is None:
|
if big_endian is None:
|
||||||
big_endian = self.big_endian
|
big_endian = self.big_endian
|
||||||
|
|
||||||
size = self._format_size(f)
|
size = self._format_size(f)
|
||||||
self.buffer += struct.pack(self._endian(big_endian) + f, value)
|
self.write(struct.pack(self._endian(big_endian) + f, value))
|
||||||
self.offset += size
|
|
||||||
|
|
||||||
def write_byte(self, value):
|
def write_byte(self, value):
|
||||||
self._struct_pack(big_endian, "c", value)
|
self._struct_pack(big_endian, "c", value)
|
||||||
@@ -126,8 +147,25 @@ class Packer(StructWrapper):
|
|||||||
self._struct_unpack(big_endian, "Q", value)
|
self._struct_unpack(big_endian, "Q", value)
|
||||||
|
|
||||||
def write_bin(self, value):
|
def write_bin(self, value):
|
||||||
self.buffer += value
|
self.write(value)
|
||||||
self.offset += len(value)
|
|
||||||
|
|
||||||
def write_string(self, value, encoding="UTF-8"):
|
def write_string(self, value, encoding="UTF-8"):
|
||||||
self.write_bin(value.encode(encoding))
|
self.write_bin(value.encode(encoding))
|
||||||
|
|
||||||
|
def fill(self, b: bytes, n: int):
|
||||||
|
self.write_bin(b * n)
|
||||||
|
|
||||||
|
def crc32(self) -> int:
|
||||||
|
return zlib.crc32(self.get()) & 0xFFFFFFFF
|
||||||
|
|
||||||
|
def md5(self) -> bytes:
|
||||||
|
return hashlib.md5(self.get()).digest()
|
||||||
|
|
||||||
|
def sha1(self) -> bytes:
|
||||||
|
return hashlib.sha1(self.get()).digest()
|
||||||
|
|
||||||
|
def sha256(self) -> bytes:
|
||||||
|
return hashlib.sha256(self.get()).digest()
|
||||||
|
|
||||||
|
def hmac(self, algorithm: str = "sha256"):
|
||||||
|
return hmac.new(key, self._data, getattr(hashlib, algorithm)).digest()
|
||||||
|
|||||||
Reference in New Issue
Block a user