Repository restructuring
This commit is contained in:
133
utils/packeter.py
Normal file
133
utils/packeter.py
Normal file
@@ -0,0 +1,133 @@
|
||||
import struct
|
||||
|
||||
class StructWrapper:
|
||||
@staticmethod
|
||||
def _endian(e):
|
||||
return ">" if e else "<"
|
||||
|
||||
@staticmethod
|
||||
def _format_size(f):
|
||||
format_sizes = {
|
||||
"c": 1,
|
||||
"h": 2, "H": 2,
|
||||
"i": 4, "I": 4,
|
||||
"q": 8, "Q": 8,
|
||||
}
|
||||
|
||||
assert f in format_sizes
|
||||
return format_sizes[f]
|
||||
|
||||
class Parser(StructWrapper):
|
||||
def __init__(self, data, big_endian=False):
|
||||
self.data = data
|
||||
self.offset = 0
|
||||
self.big_endian = big_endian
|
||||
|
||||
def eof(self):
|
||||
return self.offset >= len(self.data)
|
||||
|
||||
def remaining_size(self):
|
||||
return len(self.data[self.offset:])
|
||||
|
||||
def _struct_unpack(self, big_endian, f):
|
||||
size = self._format_size(f)
|
||||
assert self.remaining_size() >= size
|
||||
|
||||
# grab default endianess, when none is given
|
||||
if big_endian is None:
|
||||
big_endian = self.big_endian
|
||||
|
||||
value = struct.unpack(self._endian(big_endian) + f, self.data[self.offset:self.offset+size])[0]
|
||||
self.offset += size
|
||||
return value
|
||||
|
||||
def read_byte(self):
|
||||
return self._struct_unpack(big_endian, "c")
|
||||
|
||||
def read_char(self):
|
||||
return chr(self.read_byte())
|
||||
|
||||
def read_signed_short(self, big_endian=None):
|
||||
return self._struct_unpack(big_endian, "h")
|
||||
|
||||
def read_unsigned_short(self, big_endian=None):
|
||||
return self._struct_unpack(big_endian, "H")
|
||||
|
||||
def read_signed_int(self, big_endian=None):
|
||||
return self._struct_unpack(big_endian, "i")
|
||||
|
||||
def read_unsigned_int(self, big_endian=None):
|
||||
return self._struct_unpack(big_endian, "I")
|
||||
|
||||
def read_signed_long(self, big_endian=None):
|
||||
return self._struct_unpack(big_endian, "q")
|
||||
|
||||
def read_unsigned_long(self, big_endian=None):
|
||||
return self._struct_unpack(big_endian, "Q")
|
||||
|
||||
def read_bin(self, length):
|
||||
d = self.data[self.offset:self.offset+length]
|
||||
assert len(self.data[self.offset:]) >= length
|
||||
self.offset += length
|
||||
return d
|
||||
|
||||
def read_until(self, byte):
|
||||
data = b""
|
||||
while not self.eof():
|
||||
c = self.read_byte()
|
||||
if c == byte:
|
||||
break
|
||||
data += c
|
||||
return data
|
||||
|
||||
class Packer(StructWrapper):
|
||||
def __init__(self, big_endian=False):
|
||||
self.buffer = b""
|
||||
self.offset = 0
|
||||
self.big_endian = big_endian
|
||||
|
||||
def get(self):
|
||||
return self.buffer
|
||||
|
||||
def length(self):
|
||||
return len(self.buffer)
|
||||
|
||||
def _struct_pack(self, big_endian, f, value):
|
||||
# grab default endianess, when none is given
|
||||
if big_endian is None:
|
||||
big_endian = self.big_endian
|
||||
|
||||
size = self._format_size(f)
|
||||
self.buffer += struct.pack(self._endian(big_endian) + f, value)
|
||||
self.offset += size
|
||||
|
||||
def write_byte(self, value):
|
||||
self._struct_pack(big_endian, "c", value)
|
||||
|
||||
def write_char(self, value):
|
||||
self._struct_pack(big_endian, "c", value.encode())
|
||||
|
||||
def write_signed_short(self, value, big_endian=None):
|
||||
self._struct_pack(big_endian, "c", value)
|
||||
|
||||
def write_unsigned_short(self, value, big_endian=None):
|
||||
self._struct_pack(big_endian, "H", value)
|
||||
|
||||
def write_signed_int(self, value, big_endian=None):
|
||||
self._struct_unpack(big_endian, "i", value)
|
||||
|
||||
def write_unsigned_int(self, value, big_endian=None):
|
||||
self._struct_unpack(big_endian, "I", value)
|
||||
|
||||
def write_signed_long(self, value, big_endian=None):
|
||||
self._struct_unpack(big_endian, "q", value)
|
||||
|
||||
def rwrite_unsigned_long(self, value, big_endian=None):
|
||||
self._struct_unpack(big_endian, "Q", value)
|
||||
|
||||
def write_bin(self, value):
|
||||
self.buffer += value
|
||||
self.offset += len(value)
|
||||
|
||||
def write_string(self, value, encoding="UTF-8"):
|
||||
self.write_bin(value.encode(encoding))
|
||||
Reference in New Issue
Block a user