Source code for pylbo.utilities.datfiles.header
from __future__ import annotations
from typing import Any, BinaryIO
import numpy as np
from pylbo._version import VersionHandler
from pylbo.utilities.datfiles.istream_reader import (
SIZE_COMPLEX,
SIZE_DOUBLE,
SIZE_INT,
read_boolean_from_istream,
read_complex_from_istream,
read_float_from_istream,
read_int_from_istream,
read_string_from_istream,
)
from pylbo.utilities.toolbox import transform_to_numpy
from pylbo.visualisation.utils import ensure_attr_set
[docs]class LegolasHeader:
"""Baseclass for a Legolas header"""
def __init__(self, istream: BinaryIO, version: VersionHandler):
self.legolas_version = version
self.data = {}
self._str_len = None
self._str_len_array = None
self._set_str_lengths(istream)
[ensure_attr_set(self, attr) for attr in ("_str_len", "_str_len_array")]
self.read_header_data(istream)
self.read_data_offsets(istream)
def __str__(self) -> str:
keys_to_avoid = ["ef_written_flags", "ef_written_idxs", "offsets"]
return "".join(
[
f"{key}: {self.data.get(key)}\n"
for key in self.data.keys()
if key not in keys_to_avoid
]
)
def __getitem__(self, key: str) -> Any:
return self.data[key]
[docs] def _set_str_lengths(self, istream: BinaryIO) -> None:
self._str_len, self._str_len_array = read_int_from_istream(istream, amount=2)
[docs] def read_header_data(self, istream: BinaryIO) -> None:
data = {}
data.update(self._read_physics_type_info(istream))
data.update(self._read_grid_info(istream))
data.update(self._read_io_info(istream))
data.update(self._read_solver_info(istream))
data.update(self._read_equilibrium_info(istream))
data.update(self._read_units_info(istream))
data.update(self._read_physics_info(istream))
data.update(self._read_parameters(istream))
data.update(self._read_equilibrium_names(istream))
self.data.update(data)
[docs] def read_data_offsets(self, istream: BinaryIO) -> None:
offsets = {}
# eigenvalues
nb_eigenvals = read_int_from_istream(istream)
self.data["nb_eigenvalues"] = nb_eigenvals
offsets["eigenvalues"] = istream.tell()
bytesize = nb_eigenvals * SIZE_COMPLEX
istream.seek(istream.tell() + bytesize)
# grid
offsets["grid"] = istream.tell()
bytesize = self.data["gridpoints"] * SIZE_DOUBLE
istream.seek(istream.tell() + bytesize)
# Gaussian grid
offsets["grid_gauss"] = istream.tell()
bytesize = self.data["gauss_gridpoints"] * SIZE_DOUBLE
istream.seek(istream.tell() + bytesize)
# equilibrium arrays
offsets["equilibrium_arrays"] = istream.tell()
bytesize = (
self.data["gauss_gridpoints"]
* len(self.data["equilibrium_names"])
* SIZE_DOUBLE
)
istream.seek(istream.tell() + bytesize)
offsets.update(self._get_eigenfunction_offsets(istream))
offsets.update(self._get_derived_eigenfunction_offsets(istream))
offsets.update(self._get_eigenvector_offsets(istream))
offsets.update(self._get_residual_offsets(istream))
offsets.update(self._get_matrices_offsets(istream))
self.data["offsets"] = offsets
[docs] def _read_physics_type_info(self, istream: BinaryIO) -> dict:
data = {}
data["nb_eqs"] = read_int_from_istream(istream)
len_type = read_int_from_istream(istream)
data["physics_type"] = read_string_from_istream(istream, length=len_type)
len_name, size_vector = read_int_from_istream(istream, amount=2)
data["state_vector"] = np.asarray(
read_string_from_istream(istream, length=len_name, amount=size_vector),
dtype=str,
)
data["dims"] = {}
for key in ("integralblock", "subblock", "quadblock", "matrix"):
data["dims"][f"dim_{key}"] = read_int_from_istream(istream)
return data
[docs] def _read_grid_info(self, istream: BinaryIO) -> dict:
data = {}
len_geom = read_int_from_istream(istream)
data["geometry"] = read_string_from_istream(istream, length=len_geom)
for key in ("", "gauss_", "ef_"):
data[f"{key}gridpoints"] = read_int_from_istream(istream)
data["gauss"] = {}
n_gauss = read_int_from_istream(istream)
data["gauss"]["number_of_nodes"] = n_gauss
data["gauss"]["nodes"] = np.asarray(
read_float_from_istream(istream, amount=n_gauss), dtype=float
)
data["gauss"]["weights"] = np.asarray(
read_float_from_istream(istream, amount=n_gauss), dtype=float
)
data["x_start"], data["x_end"] = read_float_from_istream(istream, amount=2)
return data
[docs] def _read_io_info(self, istream: BinaryIO) -> dict:
data = {}
data["has_matrices"] = read_boolean_from_istream(istream)
data["has_eigenvectors"] = read_boolean_from_istream(istream)
data["has_residuals"] = read_boolean_from_istream(istream)
data["has_efs"] = read_boolean_from_istream(istream)
data["has_derived_efs"] = read_boolean_from_istream(istream)
data["ef_subset_used"] = read_boolean_from_istream(istream)
data["ef_subset_radius"] = read_float_from_istream(istream)
data["ef_subset_center"] = read_complex_from_istream(istream)
return data
[docs] def _read_solver_info(self, istream: BinaryIO) -> dict:
data = {}
len_solver = read_int_from_istream(istream)
data["solver"] = read_string_from_istream(istream, length=len_solver)
arnoldi_data = {}
len_mode = read_int_from_istream(istream)
arnoldi_data["arpack_mode"] = read_string_from_istream(istream, length=len_mode)
arnoldi_data["number_of_eigenvalues"] = read_int_from_istream(istream)
len_which = read_int_from_istream(istream)
arnoldi_data["which_eigenvalues"] = read_string_from_istream(
istream, length=len_which
)
arnoldi_data["ncv"] = read_int_from_istream(istream)
data["maxiter"] = read_int_from_istream(istream)
data["sigma"] = read_complex_from_istream(istream)
data["tolerance"] = read_float_from_istream(istream)
data["arnoldi"] = arnoldi_data if data["solver"] == "arnoldi" else None
return data
[docs] def _read_equilibrium_info(self, istream: BinaryIO) -> dict:
data = {}
len_equil_type = read_int_from_istream(istream)
data["eq_type"] = read_string_from_istream(istream, length=len_equil_type)
len_boundary_type = read_int_from_istream(istream)
data["boundary_type"] = read_string_from_istream(
istream, length=len_boundary_type
)
return data
[docs] def _read_units_info(self, istream: BinaryIO) -> dict:
n_units = read_int_from_istream(istream)
units = {"cgs": read_boolean_from_istream(istream)}
for _ in range(n_units):
len_name = read_int_from_istream(istream)
name = read_string_from_istream(istream, length=len_name)
value = read_float_from_istream(istream)
units[name] = value
return {"units": units}
[docs] def _read_physics_info(self, istream: BinaryIO) -> dict:
data = {}
data["gamma"] = read_float_from_istream(istream)
data["is_incompressible"] = read_boolean_from_istream(istream)
physics = {}
physics["flow"] = read_boolean_from_istream(istream)
physics["cooling"] = read_boolean_from_istream(istream)
len_curve = read_int_from_istream(istream)
physics["cooling_curve"] = read_string_from_istream(istream, length=len_curve)
physics["interpolation_points"] = read_int_from_istream(istream)
physics["external_gravity"] = read_boolean_from_istream(istream)
physics["resistivity"] = read_boolean_from_istream(istream)
physics["has_fixed_resistivity"] = read_boolean_from_istream(istream)
physics["viscosity"] = read_boolean_from_istream(istream)
physics["has_viscous_heating"] = read_boolean_from_istream(istream)
physics["conduction"] = read_boolean_from_istream(istream)
physics["has_parallel_conduction"] = read_boolean_from_istream(istream)
physics["has_fixed_tc_para"] = read_boolean_from_istream(istream)
physics["has_perpendicular_conduction"] = read_boolean_from_istream(istream)
physics["has_fixed_tc_perp"] = read_boolean_from_istream(istream)
physics["Hall"] = read_boolean_from_istream(istream)
physics["Hall_uses_substitution"] = read_boolean_from_istream(istream)
physics["has_electron_inertia"] = read_boolean_from_istream(istream)
data["physics"] = physics
return data
[docs] def _read_parameters(self, istream: BinaryIO) -> dict:
parameters = {}
nb_params, len_name = read_int_from_istream(istream, amount=2)
for _ in range(nb_params):
name = read_string_from_istream(istream, length=len_name)
parameters[name] = read_float_from_istream(istream)
parameters = {k: v for k, v in parameters.items() if not np.isnan(v)}
return {"parameters": parameters}
[docs] def _read_equilibrium_names(self, istream: BinaryIO) -> dict:
nb_names, len_name = read_int_from_istream(istream, amount=2)
self.data["has_background"] = nb_names > 0
names = (
read_string_from_istream(istream, length=len_name, amount=nb_names)
if self.data["has_background"]
else []
)
# due to a typo older datfiles contain 'db03' instead of 'dB03'
names = [name.replace("db03", "dB03") for name in names]
return {"equilibrium_names": names}
[docs] def _get_eigenfunction_offsets(self, istream: BinaryIO) -> dict:
if not self.data["has_efs"]:
return {}
self.data["ef_names"] = self.data["state_vector"]
# eigenfunction grid
ef_gridsize = read_int_from_istream(istream)
offsets = self._get_ef_grid_offset(ef_gridsize, istream)
# flags
self._get_ef_written_flags(istream)
# eigenfunction block offsets
offsets.update(self._get_ef_block_offsets(istream))
return offsets
[docs] def _get_ef_grid_offset(self, ef_gridsize: int, istream: BinaryIO) -> dict:
offsets = {"ef_grid": istream.tell()}
bytesize = ef_gridsize * SIZE_DOUBLE
istream.seek(istream.tell() + bytesize)
return offsets
[docs] def _get_ef_written_flags(self, istream: BinaryIO) -> None:
# eigenfunction flags
ef_flags_size = read_int_from_istream(istream)
self.data["ef_written_flags"] = np.asarray(
read_int_from_istream(istream, amount=ef_flags_size), dtype=bool
)
ef_idxs_size = read_int_from_istream(istream)
self.data["ef_written_idxs"] = transform_to_numpy(
np.asarray(read_int_from_istream(istream, amount=ef_idxs_size), dtype=int)
- 1
) # -1 corrects for Fortran 1-based indexing
# do a sanity check
assert np.all(
self.data["ef_written_idxs"] == np.where(self.data["ef_written_flags"])[0]
)
[docs] def _get_ef_block_offsets(self, istream: BinaryIO) -> dict:
# eigenfunction offsets
offsets = {"ef_arrays": istream.tell()}
# bytesize of a single eigenfunction block (all efs for 1 state vector variable)
bytesize_block = (
self.data["ef_gridpoints"]
* len(self.data["ef_written_idxs"])
* SIZE_COMPLEX
)
offsets["ef_block_bytesize"] = bytesize_block
offsets["ef_bytesize"] = self.data["ef_gridpoints"] * SIZE_COMPLEX
istream.seek(istream.tell() + bytesize_block * self.data["nb_eqs"])
return offsets
[docs] def _get_derived_eigenfunction_offsets(self, istream: BinaryIO) -> dict:
if not self.data["has_derived_efs"]:
return {}
nb_names, size_names = read_int_from_istream(istream, amount=2)
return self._get_derived_ef_names_and_offsets(nb_names, size_names, istream)
[docs] def _get_derived_ef_names_and_offsets(
self, nb_names, size_names, istream: BinaryIO
) -> dict:
self.data["derived_ef_names"] = np.asarray(
read_string_from_istream(istream, length=size_names, amount=nb_names),
dtype=str,
)
offsets = {"derived_ef_arrays": istream.tell()}
bytesize_block = (
self.data["ef_gridpoints"]
* len(self.data["ef_written_idxs"])
* nb_names
* SIZE_COMPLEX
)
istream.seek(istream.tell() + bytesize_block)
return offsets
[docs] def _get_eigenvector_offsets(self, istream: BinaryIO) -> dict:
if not self.data["has_eigenvectors"]:
return {}
len_eigvecs, nb_eigvecs = read_int_from_istream(istream, amount=2)
offsets = {
"eigenvectors": istream.tell(),
"eigenvector_length": len_eigvecs,
"nb_eigenvectors": nb_eigvecs,
}
bytesize = len_eigvecs * nb_eigvecs * SIZE_COMPLEX
istream.seek(istream.tell() + bytesize)
return offsets
[docs] def _get_residual_offsets(self, istream: BinaryIO) -> dict:
if not self.data["has_residuals"]:
return {}
nb_residuals = read_int_from_istream(istream)
offsets = {"residuals": istream.tell(), "nb_residuals": nb_residuals}
bytesize = nb_residuals * SIZE_DOUBLE
istream.seek(istream.tell() + bytesize)
return offsets
[docs] def _get_matrices_offsets(self, istream: BinaryIO) -> dict:
if not self.data["has_matrices"]:
return {}
nonzero_B_elements, nonzero_A_elements = read_int_from_istream(
istream, amount=2
)
# B matrix is written as (row, column, real value)
byte_size = (2 * SIZE_INT + SIZE_DOUBLE) * nonzero_B_elements
offsets = {"matrix_B": istream.tell()}
istream.seek(istream.tell() + byte_size)
self.data["nonzero_B_elements"] = nonzero_B_elements
# A matrix is written as (row, column, complex value)
offsets["matrix_A"] = istream.tell()
self.data["nonzero_A_elements"] = nonzero_A_elements
return offsets