from __future__ import annotations
import copy
from pathlib import Path
from typing import Union
import f90nml
from pylbo.automation.defaults import namelist_items
from pylbo.exceptions import ParfileGenerationError
from pylbo.utilities.logger import pylboLogger
from pylbo.utilities.toolbox import transform_to_list
[docs]def _ensure_nb_names_and_nb_runs_matches(
names: Union[str, list[str]], nb_runs: int
) -> list[str]:
"""
Ensures that the number of names matches the number of runs.
Parameters
----------
names : str, list[str]
The basename(s) for the parfile(s).
nb_runs : int
Number of runs for which the parfiles are generated.
Returns
-------
names : list[str]
The basename(s) for the parfile(s).
"""
names = transform_to_list(names)
if len(names) == 1:
names = names * nb_runs
elif len(names) != nb_runs:
raise ValueError(
f"Number of parfile names ({len(names)}) does not match number of runs "
f"({nb_runs})."
)
return names
[docs]def _validate_basenames(basenames: list[str]) -> list[str]:
"""
Validates the basenames for given parfiles.
Parameters
----------
basename : list[str]
The basenames for the parfiles. If not given, defaults to "parfile[i]".
Returns
-------
basename : list[str]
The basename for a parfile.
"""
if len(basenames) == 1:
return ["parfile"] if basenames[0] is None else basenames
for i, name in enumerate(basenames):
basenames[i] = f"parfile{i + 1:04d}" if name is None else name
return basenames
[docs]def _validate_output_dir(output_dir, subdir):
"""
Validates and returns the output directory for the parfiles.
Parameters
----------
output_dir : str, ~os.PathLike
The output directory to store the parfiles in. If not given, defaults to
the current working directory.
subdir : boolean
If `.true.`, creates a subdirectory called `parfiles`.
Raises
------
NotADirectoryError
If the output directory is not found.
Returns
-------
output : ~os.PathLike
The resolved path to the output directory with "parfiles" appended.
"""
if output_dir is None:
output_dir = Path.cwd()
output = Path(output_dir).resolve()
if not output.is_dir():
raise NotADirectoryError(output)
if subdir:
output = (output / "parfiles").resolve()
if not output.is_dir():
Path.mkdir(output)
return output
[docs]class ParfileGenerator:
"""
Handles parfile generation.
Parameters
----------
parfile_dict : dict
Dictionary containing the keys to be placed in the parfile.
basename : str, list[str]
The basename for the parfile, the `.par` suffix is added automatically and is
not needed. If not provided, the basename will default to `parfile`.
Can be a list of names as well if multiple parfiles are being generated.
output_dir : str, ~os.PathLike
Output directory where the parfiles are saved, defaults to the current
working directory if not specified.
subdir : boolean
If `True` (default), creates a subdirectory under `output_dir` called
`parfiles` in which the parfiles will be saved.
prefix_numbers : boolean
If `True` prepends the `basename` by a n-digit number (e.g. xxxxmyparfile.par).
The number of digits is specified by `nb_prefix_digits`.
nb_prefix_digits : int
Number of digits to prepend to the `basename` if `prefix_numbers` is `True`.
Defaults to 4.
"""
def __init__(
self,
parfile_dict,
basename=None,
output_dir=None,
subdir=True,
prefix_numbers=True,
nb_prefix_digits=4,
):
self.parfile_dict = copy.deepcopy(parfile_dict)
self.nb_runs = self.parfile_dict.pop("number_of_runs", 1)
names = _ensure_nb_names_and_nb_runs_matches(basename, self.nb_runs)
self.basenames = _validate_basenames(transform_to_list(names))
self.output_dir = _validate_output_dir(output_dir, subdir)
self._use_prefix = prefix_numbers
self._nb_prefix_digits = nb_prefix_digits
self.parfiles = []
self.container = {}
[docs] def _get_and_check_item(self, namelist, name, allowed_dtypes):
"""
Does typechecking on the various dictionary keys supplied to the parfile
generator. Pops the key from the dictionary.
Parameters
----------
namelist : str
One of the namelists ("gridlist", "savelist", etc.)
name : str
The key to check.
allowed_dtypes : class, tuple
Allowed types for that particular key. Either a single value or a tuple.
Raises
------
TypeError
If the types do not match, e.g. if "gridpoints" is specified as a float
value when it should be an integer.
Returns
-------
item : any
The item popped from the dictionary corresponding to the given key.
"""
item = self.parfile_dict.pop(name, None)
if item is not None:
item_aslist = transform_to_list(item)
for obj in item_aslist:
if not isinstance(obj, allowed_dtypes):
raise TypeError(
f"namelist '{namelist}' expected item '{name}' to be of "
f"type {allowed_dtypes} but got {type(obj)}. \n"
f"item value = {item}"
)
return item
[docs] def create_namelist_from_dict(self):
"""
Creates one major namelist from the given dictionary.
Raises
------
ParfileGenerationError
- If the original dictionary is not empty after everything should be popped
- If there is an inconsistency between array sizes of dictionary items
"""
for namelist, items in namelist_items.items():
# update container
self.container.update({namelist: {}})
# loop over names for this specific namelist
for name, dtypes in items:
obj = self._get_and_check_item(namelist, name, dtypes)
if obj is not None:
obj = transform_to_list(obj)
self.container[namelist].update({name: obj})
# if this namelist is still empty, remove it
if self.container[namelist] == {}:
self.container.pop(namelist)
# account for parameters separately
params = self.parfile_dict.pop("parameters", {})
if len(params) != 0:
self.container["equilibriumlist"].update({"use_defaults": [False]})
for name, param in params.items():
params.update({name: transform_to_list(param)})
self.container.update({"paramlist": params})
# here the original dictionary should be empty,
# something went wrong if it isn't
if len(self.parfile_dict) != 0:
raise ParfileGenerationError(self.parfile_dict)
# update container for number of runs, all items are lists
for namelist, items in self.container.items():
for key, values in items.items():
if len(values) == 1:
values_list = values * self.nb_runs
elif len(values) == self.nb_runs:
values_list = values
else:
raise ParfileGenerationError(items, self.nb_runs, key)
# make sure that sigma has complex values
if key == "sigma":
values_list = [complex(value) for value in values_list]
self.container.get(namelist).update({key: values_list})
[docs] def generate_parfiles(self):
"""
Creates separate parfiles from the main namelist container and writes
the individual parfiles to disk.
Returns
-------
parfiles : list of str
List containing the paths of the parfiles, can be passed to the legolas
runner.
"""
run_dict = {key: {} for key in self.container.keys()}
# savelist must be present
try:
run_dict["savelist"]
except KeyError:
run_dict.update({"savelist": {}})
for current_run in range(self.nb_runs):
prefix = (
f"{current_run + 1:0{self._nb_prefix_digits}d}"
if self._use_prefix and self.nb_runs > 1
else ""
)
# generate dictionary for this specific run
for namelist, items in self.container.items():
for key, values in items.items():
run_dict[namelist].update({key: values[current_run]})
basename = self.basenames[current_run]
parfile_name = f"{prefix}{basename}.par"
# datfile name (no extension .dat needed)
datfile_name = (
f"{prefix}{run_dict['savelist'].get('basename_datfile', basename)}"
)
run_dict["savelist"].update({"basename_datfile": datfile_name})
# set paths and write parfile
parfile_path = (self.output_dir / parfile_name).resolve()
self.parfiles.append(str(parfile_path))
f90nml.write(run_dict, parfile_path, force=True)
# clear dictionary but keep keys
run_dict.update({key: {} for key in run_dict})
pylboLogger.info(f"parfiles generated and saved to {self.output_dir}")
return self.parfiles