"""States component."""
import os
import glob
import numpy as np

from .decorators import apply_to_each_input, state_check, ndim_check
from .base_spatial import SpatialComponent
from .plot_utils import show_cube_static, show_cube_interactive
from .parse_utils import read_ecl_bin
from .utils import get_single_path


class States(SpatialComponent):
    """States component of geological model.

    Every attribute is handled as an array whose first axis enumerates
    timesteps; the remaining axes hold either raveled or spatial cell data.
    """

    @property
    def n_timesteps(self):
        """Effective number of timesteps.

        Returns
        -------
        n_timesteps : int
            The smallest first-axis length over all attributes, or 0 if the
            component has no attributes.
        """
        if not self.attributes:
            return 0
        return np.min([x.shape[0] for _, x in self.items()])

    @apply_to_each_input
    def apply(self, func, attr, *args, inplace=False, **kwargs):
        """Apply function to each timestep of states attributes.

        Parameters
        ----------
        func : callable
            A function to apply. Must accept data as its first argument.
        attr : str, array-like
            Attributes to get data from.
        args : misc
            Any additional positional arguments to ``func``.
        inplace : bool
            Modify component inplace.
        kwargs : misc
            Any additional named arguments to ``func``.

        Returns
        -------
        output : States or numpy.ndarray
            The component itself if ``inplace`` is True, otherwise the array
            of per-timestep results.
        """
        data = getattr(self, attr)
        res = np.array([func(x, *args, **kwargs) for x in data])
        if inplace:
            setattr(self, attr, res)
            return self
        return res

    @apply_to_each_input
    def _to_spatial(self, attr, dimens, inplace):
        """Spatial order 'F' transformations.

        Reshapes the attribute to ``(n_timesteps,) + dimens``.
        """
        return self.reshape(attr=attr, newshape=(self.n_timesteps,) + tuple(dimens),
                            order='F', inplace=inplace)

    @apply_to_each_input
    def _ravel(self, attr, inplace):
        """Ravel order 'F' transformations.

        Reshapes the attribute to ``(n_timesteps, -1)``.
        """
        return self.reshape(attr=attr, newshape=(self.n_timesteps, -1),
                            order='F', inplace=inplace)

    @apply_to_each_input
    def pad_na(self, attr, actnum, fill_na=0., inplace=True):
        """Add dummy cells into the state vector in the positions of
        non-active cells if necessary.

        Parameters
        ----------
        attr : str, array-like
            Attributes to be padded with non-active cells.
        actnum : numpy.ndarray
            Mask of active and non-active cells. It is raveled in 'F' order
            and cast to bool before use.
        fill_na : float
            Value to be used as filler.
        inplace : bool
            Modify component inplace.

        Returns
        -------
        output : component if inplace else padded attribute.

        Raises
        ------
        ValueError
            If the data needs padding but has more than two dimensions.
        """
        data = getattr(self, attr)
        # The number of cells per timestep already matches the full grid:
        # nothing to pad.
        if np.prod(data.shape[1:]) == actnum.size:
            return self if inplace else data
        if data.ndim > 2:
            raise ValueError('Data should be raveled before padding.')
        active = actnum.ravel(order='F').astype(bool)
        # Start from an array of fillers and overwrite the active positions.
        padded_data = np.full((data.shape[0], actnum.size), fill_na, dtype=float)
        padded_data[..., active] = data
        if inplace:
            setattr(self, attr, padded_data)
            return self
        return padded_data

    @apply_to_each_input
    def strip_na(self, attr, actnum, inplace=True):
        """Remove non-active cells from the state vector.

        Parameters
        ----------
        attr : str, array-like
            Attributes to be stripped.
        actnum : numpy.ndarray
            Mask of active and non-active cells. It is raveled in 'F' order
            and cast to bool before use.
        inplace : bool
            Modify component inplace.

        Returns
        -------
        output : component if inplace else stripped attribute.

        Raises
        ------
        ValueError
            If called inplace while the component is in spatial state.

        Notes
        -----
        Outputs 1d array for each timestep.
        """
        if self.state.spatial and inplace:
            raise ValueError('Inplace is not allowed in spatial state.')
        data = self.ravel(attr, inplace=False)
        # Cast to bool (as ``pad_na`` does) so that an integer 0/1 mask is
        # used as a mask and not as a list of column indices.
        active = actnum.ravel(order='F').astype(bool)
        # Only active cells are present already: nothing to strip.
        if data.shape[1] == np.count_nonzero(active):
            return self if inplace else data
        stripped_data = data[..., active]
        if inplace:
            setattr(self, attr, stripped_data)
            return self
        return stripped_data

    def __getitem__(self, keys):
        """Build a new component from ``keys``-indexed attributes.

        Each attribute array is indexed with ``keys`` and reshaped to
        ``(-1,) + data.shape[1:]``, so that a single selected timestep still
        has a leading timestep axis. The state of the current component is
        copied to the result.

        Parameters
        ----------
        keys : misc
            Index applied to every attribute array.

        Returns
        -------
        out : States
            New instance of the same class with the selected data.
        """
        out = self.__class__()
        for attr, data in self.items():
            selected = data[keys].reshape((-1,) + data.shape[1:])
            setattr(out, attr, selected)
        out.set_state(**self.state.as_dict())
        return out

    @state_check(lambda state: state.spatial)
    @ndim_check(4)
    def show_cube(self, attr, t=None, x=None, y=None, z=None,
                  actnum=None, figsize=None, **kwargs):
        """Visualize slices of 4D states arrays. If no slice is specified,
        spatial slices will be shown with interactive slider widgets.

        Parameters
        ----------
        attr : str
            Attribute to show.
        t : int or None, optional
            Timestep to show.
        x : int or None, optional
            Slice along x-axis to show.
        y : int or None, optional
            Slice along y-axis to show.
        z : int or None, optional
            Slice along z-axis to show.
        actnum : array, optional
            Actnum array. If given, data is multiplied by it before plotting.
            If None, all cells are treated as active.
        figsize : array-like, optional
            Output plot size.
        kwargs : dict, optional
            Additional keyword arguments for plot.

        Returns
        -------
        output : States
            The component itself.
        """
        data = getattr(self, attr)
        if actnum is not None:
            data = data * actnum
        if np.all([t is None, x is None, y is None, z is None]):
            show_cube_interactive(data, figsize=figsize, **kwargs)
        else:
            show_cube_static(data, t=t, x=x, y=y, z=z, figsize=figsize, **kwargs)
        return self

    def _read_buffer(self, path_or_buffer, attr, **kwargs):
        """Read an attribute via the parent reader and reshape it to ``(1, -1)``."""
        super()._read_buffer(path_or_buffer, attr, **kwargs)
        return self.reshape(attr=attr, newshape=(1, -1))

    def _load_ecl_binary(self, path_to_results, attrs, basename, logger=None, **kwargs):
        """Load states from binary ECLIPSE results files.

        A unified ``.UNRST`` file is preferred; if there is none, ``.X____``
        files found in the folder are used.

        Parameters
        ----------
        path_to_results : str
            Path to the folder with precomputed results of hydrodynamical
            simulation.
        attrs : list or str
            Keyword names to be loaded.
        basename : str
            Base file name; the unified file is looked up as
            ``basename + '.UNRST'``.
        logger : logger, optional
            Logger for messages. If given and no results are found, a warning
            is logged and the component is returned unchanged.
        **kwargs : dict, optional
            Any kwargs to be passed to load method.

        Returns
        -------
        states : States
            States with loaded attributes.

        Raises
        ------
        FileNotFoundError
            If no results are found and ``logger`` is None.
        """
        if attrs is None:
            # NOTE(review): FULL_STATE_KEYS is neither defined nor imported in
            # the visible part of this module - confirm where it comes from.
            attrs = FULL_STATE_KEYS
        unifout_path = get_single_path(path_to_results, basename + '.UNRST', logger)
        if unifout_path is not None:
            return self._load_ecl_bin_unifout(unifout_path, attrs=attrs,
                                              logger=logger, **kwargs)
        multout_paths = _get_multout_paths(path_to_results)
        if multout_paths is not None:
            return self._load_ecl_bin_multout(multout_paths, attrs=attrs,
                                              logger=logger, **kwargs)
        if logger is not None:
            logger.warning('The results in "%s" were not found!', path_to_results)
            return self
        raise FileNotFoundError('The results in "%s" were not found!' % path_to_results)

    def _load_ecl_bin_unifout(self, path, attrs, logger, subset=None, **kwargs):
        """Load states from .UNRST binary file.

        Parameters
        ----------
        path : str
            Path to the .UNRST file.
        attrs : list or str
            Keyword names to be loaded from the file.
        logger : logger
            Logger passed to the binary reader.
        subset : misc, optional
            Passed to the binary reader as is.
        kwargs : dict, optional
            Accepted for interface compatibility and ignored.

        Returns
        -------
        states : States
            States with loaded attributes.
        """
        _ = kwargs
        if isinstance(attrs, str):
            attrs = [attrs]
        states = read_ecl_bin(path, attrs, sequential=True, subset=subset, logger=logger)
        for attr, x in states.items():
            setattr(self, attr, np.array(x))
        return self

    def _load_ecl_bin_multout(self, paths, attrs, logger, subset=None, **kwargs):
        """Load states from .X____ binary files.

        Parameters
        ----------
        paths : list
            List of paths to .X____ files.
        attrs : list or str
            Keyword names to be loaded from the files.
        logger : logger or None
            Logger for progress messages; also passed to the binary reader.
        subset : container of int, optional
            If given, only files whose numeric extension is in ``subset``
            are read.
        kwargs : dict, optional
            Accepted for interface compatibility and ignored.

        Returns
        -------
        states : States
            States with loaded attributes.
        """
        _ = kwargs
        if isinstance(attrs, str):
            attrs = [attrs]
        states = {}
        if logger is not None:
            logger.info('Start reading X files.')

        def is_in_subset(x):
            fmt = os.path.splitext(x)[1]
            # '.X0012' -> 12: lstrip drops the leading '.' and 'X' characters.
            timestep = int(fmt.lstrip('.X'))
            return timestep in subset

        # With a None predicate ``filter`` only drops falsy (empty) paths.
        paths = filter(is_in_subset if subset is not None else None, paths)

        for path in paths:
            state = read_ecl_bin(path, attrs, logger=logger)
            for attr, x in state.items():
                states.setdefault(attr, []).append(x)
        if logger is not None:
            logger.info('Finish reading X files.')

        # One array per attribute, stacked along a new leading (file) axis.
        for attr, x in states.items():
            setattr(self, attr, np.stack(x))
        return self

    def _make_data_dump(self, attr, fmt=None, actnum=None, float_dtype=None, **kwargs):
        """Prepare data for dump.

        Parameters
        ----------
        attr : str
            Attribute to dump.
        fmt : str, optional
            Dump format, case-insensitive. 'ASCII' and 'HDF5' are handled
            here; anything else (including None) is delegated to the parent.
        actnum : numpy.ndarray, optional
            Used for 'HDF5' only: if given, non-active cells are stripped.
        float_dtype : dtype, optional
            Used for 'HDF5' only: if given, data is cast to this dtype.
        kwargs : dict, optional
            Passed to the parent implementation.

        Returns
        -------
        data : misc
            For 'ASCII', the raveled data of the first timestep only. For
            'HDF5', the raveled or stripped data. Otherwise whatever the
            parent implementation returns.
        """
        # ``fmt`` defaults to None, so guard before calling ``upper``.
        fmt_key = fmt.upper() if fmt is not None else None
        if fmt_key == 'ASCII':
            data = self.ravel(attr=attr, inplace=False)
            return data[0]
        if fmt_key == 'HDF5':
            if actnum is None:
                data = self.ravel(attr=attr, inplace=False)
            else:
                data = self.strip_na(attr=attr, actnum=actnum, inplace=False)
            return data if float_dtype is None else data.astype(float_dtype)
        # ``actnum`` and ``float_dtype`` are not forwarded to the parent.
        return super()._make_data_dump(attr, fmt=fmt, **kwargs)
def _get_multout_paths(path_to_results):
    """Search a results folder for ``.X____`` files.

    Only files whose extension is ``.X`` followed by digits (for example
    ``MODEL.X0001``) are returned; other matches of the ``*.X*`` pattern,
    such as ``MODEL.XML``, are ignored.

    Parameters
    ----------
    path_to_results : str
        Path to the folder with precomputed results of hydrodynamical
        simulation.

    Returns
    -------
    paths : list or None
        Sorted list of the paths found, or None if there are none.
    """
    # Escape the folder so that characters like '[' in its name are taken
    # literally instead of being interpreted as glob syntax.
    folder = glob.escape(os.fspath(path_to_results))
    candidates = glob.glob(os.path.join(folder, '*.X*'))

    def has_step_extension(path):
        ext = os.path.splitext(path)[1]
        # Case is ignored in the prefix because glob itself is
        # case-insensitive on some platforms.
        return ext[:2].upper() == '.X' and ext[2:].isdecimal()

    multout_paths = sorted(p for p in candidates if has_step_extension(p))
    return multout_paths or None