Source code for pylbo.utilities.datfiles.istream_reader

from __future__ import annotations

import struct
from functools import wraps
from typing import BinaryIO, Union

[docs]DTYPES = { "str": "c", "int": "i", "bool": "i", "float": "d", "complex": 2 * "d", }
[docs]NP_DTYPES = { "str": str, "int": int, "bool": bool, "float": float, "complex": complex, }
[docs]BYTE_ORDERS = { "little": "<", "big": ">", "native": "=", }
[docs]SIZE_CHAR = struct.calcsize(DTYPES["str"])
[docs]SIZE_INT = struct.calcsize(DTYPES["int"])
[docs]SIZE_BOOL = struct.calcsize(DTYPES["bool"])
[docs]SIZE_DOUBLE = struct.calcsize(DTYPES["float"])
[docs]SIZE_COMPLEX = struct.calcsize(DTYPES["complex"])
[docs]def requires_version(version_needed, default=None): def check_version(func): @wraps(func) def wrapper(*args, **kwargs): if args[0].legolas_version < version_needed: return default return func(*args, **kwargs) return wrapper return check_version
[docs]def read_string_from_istream( istream: BinaryIO, length: int, amount: int = 1, offset: int = None, byte_order: str = "native", ) -> Union[str, list[str]]: """ Reads a string from the input stream. Parameters ---------- istream : BinaryIO The input stream to read from. length : int The length of the string to read. amount : int, optional The amount of strings to read, by default 1. offset : int, optional The offset to seek to before reading, by default `None`. byte_order : str, optional The byte order to use, by default "native". Returns ------- str, list of str The string(s) read from the input stream. """ if offset is not None: istream.seek(offset) fmt = BYTE_ORDERS[byte_order] + amount * length * DTYPES["str"] hdr = struct.unpack(fmt, istream.read(struct.calcsize(fmt))) if amount == 1: return b"".join(hdr).strip().decode() return [ b"".join(hdr[i : i + length]).strip().decode() for i in range(0, amount * length, length) ]
[docs]def read_int_from_istream( istream: BinaryIO, amount: int = 1, offset: int = None, byte_order: str = "native", ) -> Union[int, tuple[int, ...]]: """ Reads an integer from the input stream. Parameters ---------- istream : BinaryIO The input stream to read from. amount : int, optional The amount of integers to read, by default 1. offset : int, optional The offset to seek to before reading, by default `None`. byte_order : str, optional The byte order to use, by default "native". Returns ------- int, tuple of int The integer(s) read from the input stream. """ if offset is not None: istream.seek(offset) fmt = BYTE_ORDERS[byte_order] + amount * DTYPES["int"] hdr = struct.unpack(fmt, istream.read(struct.calcsize(fmt))) if amount == 1: (hdr,) = hdr # unpack for single values return hdr
[docs]def read_boolean_from_istream( istream: BinaryIO, offset: int = None, byte_order: str = "native", ) -> bool: """ Reads a boolean from the input stream. Parameters ---------- istream : BinaryIO The input stream to read from. offset : int, optional The offset to seek to before reading, by default `None`. byte_order : str, optional The byte order to use, by default "native". Returns ------- bool The boolean read from the input stream. """ return bool(read_int_from_istream(istream, offset=offset, byte_order=byte_order))
[docs]def read_float_from_istream( istream: BinaryIO, amount: int = 1, offset: int = None, byte_order: str = "native", ) -> Union[float, tuple[float, ...]]: """ Reads a float from the input stream. Parameters ---------- istream : BinaryIO The input stream to read from. amount : int, optional The amount of floats to read, by default 1. offset : int, optional The offset to seek to before reading, by default `None`. byte_order : str, optional The byte order to use, by default "native". Returns ------- float, tuple of float The float(s) read from the input stream. """ if offset is not None: istream.seek(offset) fmt = BYTE_ORDERS[byte_order] + amount * DTYPES["float"] hdr = struct.unpack(fmt, istream.read(struct.calcsize(fmt))) if amount == 1: (hdr,) = hdr # unpack for single values return hdr
[docs]def read_complex_from_istream( istream: BinaryIO, amount: int = 1, offset: int = None, byte_order: str = "native", ) -> Union[complex, tuple[complex, ...]]: """ Reads a complex from the input stream. Parameters ---------- istream : BinaryIO The input stream to read from. amount : int, optional The amount of complex numbers to read, by default 1. offset : int, optional The offset to seek to before reading, by default `None`. byte_order : str, optional The byte order to use, by default "native". Returns ------- complex, tuple of complex The complex number(s) read from the input stream. """ if offset is not None: istream.seek(offset) fmt = BYTE_ORDERS[byte_order] + amount * DTYPES["complex"] hdr = struct.unpack(fmt, istream.read(struct.calcsize(fmt))) if amount == 1: return complex(*hdr) # unpack for single values reals, imags = hdr[::2], hdr[1::2] return tuple([complex(x, y) for x, y in zip(reals, imags)])
[docs]def read_mixed_from_istream( istream: BinaryIO, fmt: str, amount: int = 1, offset: int = None, byte_order: str = "native", ) -> tuple(complex, ...): """ Reads a number of mixed types from the input stream. Parameters ---------- istream : BinaryIO The input stream to read from. fmt : str The format string to use. amount : int, optional The amount of mixed types to read, by default 1. offset : int, optional The offset to seek to before reading, by default `None`. byte_order : str, optional The byte order to use, by default "native". Returns ------- tuple of mixed The mixed types read from the input stream. """ for char in fmt: if char not in DTYPES.values(): raise ValueError( f"Invalid format character {char}, expected one of {DTYPES.values()}" ) if offset is not None: istream.seek(offset) fmt = BYTE_ORDERS[byte_order] + amount * fmt return struct.unpack(fmt, istream.read(struct.calcsize(fmt)))