:py:mod:`mlair.data_handler`
============================

.. py:module:: mlair.data_handler

.. autoapi-nested-parse::

   Data Handling.

   The module data_handler contains all methods and classes that are related to data preprocessing,
   postprocessing, loading, and distribution for training.


Submodules
----------
.. toctree::
   :titlesonly:
   :maxdepth: 1

   abstract_data_handler/index.rst
   data_handler_mixed_sampling/index.rst
   data_handler_neighbors/index.rst
   data_handler_single_station/index.rst
   data_handler_with_filter/index.rst
   default_data_handler/index.rst
   input_bootstraps/index.rst
   iterator/index.rst


Package Contents
----------------

Classes
~~~~~~~

.. autoapisummary::

   mlair.data_handler.Bootstraps
   mlair.data_handler.KerasIterator
   mlair.data_handler.DataCollection
   mlair.data_handler.DefaultDataHandler
   mlair.data_handler.AbstractDataHandler
   mlair.data_handler.DataHandlerNeighbors


Attributes
~~~~~~~~~~

.. autoapisummary::

   mlair.data_handler.__author__
   mlair.data_handler.__date__


.. py:data:: __author__
   :annotation: = Lukas Leufen, Felix Kleinert

.. py:data:: __date__
   :annotation: = 2020-04-17

.. py:class:: Bootstraps(data: mlair.data_handler.abstract_data_handler.AbstractDataHandler, number_of_bootstraps: int = 10, bootstrap_dimension: str = 'variables', bootstrap_type='singleinput', bootstrap_method='shuffle')

   Bases: :py:obj:`collections.Iterable`

   Main class to perform bootstrap operations.

   This class requires a data handler following the definition of the AbstractDataHandler, the number of bootstraps
   to create, and the dimension along which the bootstrapping is performed (default dimension is `variables`). When
   iterating over this class, it returns the bootstrapped X, Y and a tuple with (position of variable in X, name of
   this variable). The tuple is of interest if X consists of multiple input streams X_i (e.g. two or more stations),
   because it shows which variable of which input X_i has been bootstrapped. All bootstrap combinations can be
   retrieved by calling the .bootstraps() method. Furthermore, by calling .get_orig_prediction() this class repeats
   the original prediction according to the set number of bootstraps.

   As bootstrap method, this class can currently make use of the ShuffleBootstraps class, which uses drawing with
   replacement to destroy the variable's information while keeping its statistical properties. Use
   `bootstrap_method="shuffle"` to call this method. Another method is the zero mean bootstrapping, triggered by
   `bootstrap_method="zero_mean"` and performed by the MeanBootstraps class. This method destroys the variable's
   information by a mode collapse to the constant value zero. In case the variable is normalised with a zero mean,
   this is equivalent to a mode collapse to the variable's mean value. Statistics in general are not conserved in
   this case, but the mean value of course is. A custom mean value for bootstrapping is currently not supported.

   .. py:method:: __iter__(self)

   .. py:method:: __len__(self)

   .. py:method:: bootstraps(self)

   .. py:method:: get_orig_prediction(self, path: str, file_name: str, prediction_name: str = 'CNN') -> numpy.ndarray

      Repeat predictions from given file (_name) in path by the number of bootstraps.

      :param path: path to file
      :param file_name: file name
      :param prediction_name: name of the prediction to select from loaded file (default CNN)

      :return: repeated predictions
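A minimal usage sketch for :py:class:`Bootstraps`, based on the behaviour described above. The handler object
``data_handler`` and the file path passed to ``get_orig_prediction`` are placeholders and not part of the documented
API.

.. code-block:: python

   from mlair.data_handler import Bootstraps

   # ``data_handler`` stands for any prepared handler following the
   # AbstractDataHandler interface (e.g. a DefaultDataHandler instance).
   boots = Bootstraps(data_handler, number_of_bootstraps=20, bootstrap_method="shuffle")

   # all bootstrap combinations (which variable of which input stream gets destroyed)
   combinations = boots.bootstraps()

   # iterate over the bootstrapped realisations of the inputs
   for boot_x, boot_y, (position, variable) in boots:
       # boot_x / boot_y are the bootstrapped inputs and targets,
       # (position, variable) identifies the bootstrapped variable in X
       pass

   # repeat a stored prediction to match the number of bootstraps
   # (path and file name are placeholders)
   orig = boots.get_orig_prediction("/path/to/forecasts", "forecast_file.nc", prediction_name="CNN")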
.. py:class:: KerasIterator(collection: DataCollection, batch_size: int, batch_path: str, shuffle_batches: bool = False, model=None, upsampling=False, name=None, use_multiprocessing=False, max_number_multiprocessing=1)

   Bases: :py:obj:`tensorflow.keras.utils.Sequence`

   Base object for fitting to a sequence of data, such as a dataset.

   Every `Sequence` must implement the `__getitem__` and the `__len__` methods.
   If you want to modify your dataset between epochs you may implement
   `on_epoch_end`. The method `__getitem__` should return a complete batch.

   Notes:

   `Sequence` is a safer way to do multiprocessing. This structure guarantees
   that the network will only train once on each sample per epoch, which is not
   the case with generators.

   Examples:

   ```python
   from tensorflow.keras.utils import Sequence
   from skimage.io import imread
   from skimage.transform import resize
   import numpy as np
   import math

   # Here, `x_set` is a list of paths to the images
   # and `y_set` are the associated classes.

   class CIFAR10Sequence(Sequence):

       def __init__(self, x_set, y_set, batch_size):
           self.x, self.y = x_set, y_set
           self.batch_size = batch_size

       def __len__(self):
           return math.ceil(len(self.x) / self.batch_size)

       def __getitem__(self, idx):
           batch_x = self.x[idx * self.batch_size:(idx + 1) * self.batch_size]
           batch_y = self.y[idx * self.batch_size:(idx + 1) * self.batch_size]

           return np.array([
               resize(imread(file_name), (200, 200))
               for file_name in batch_x]), np.array(batch_y)
   ```

   .. py:method:: __len__(self) -> int

      Number of batches in the Sequence.

      :returns: The number of batches in the Sequence.

   .. py:method:: __getitem__(self, index: int) -> Tuple[numpy.ndarray, numpy.ndarray]

      Get batch for given index.

   .. py:method:: _get_model_rank(self)

   .. py:method:: __data_generation(self, index: int) -> Tuple[numpy.ndarray, numpy.ndarray]

      Load pickle data from disk.

   .. py:method:: _concatenate(new: List[numpy.ndarray], old: List[numpy.ndarray]) -> List[numpy.ndarray]
      :staticmethod:

      Concatenate two lists of data along axis=0.

   .. py:method:: _concatenate_multi(*args: List[numpy.ndarray]) -> List[numpy.ndarray]
      :staticmethod:

      Concatenate an arbitrary number of lists of data along axis=0.

   .. py:method:: _prepare_batches(self, use_multiprocessing=False, max_process=1) -> None

      Prepare all batches as locally stored files.

      Walk through all elements of the collection and split (or merge) the data according to the batch size. Data sets
      that are too long are divided into multiple batches. Batches that are not fully filled are retained together
      with the remains from the next collection elements. These retained data are concatenated and also split into
      batches. If data are still remaining afterwards, they are saved as a final, smaller batch. All batches are
      enumerated by a running index starting at 0. A list with all batch numbers is stored in the class's parameter
      indexes.

      This method can either use a serial approach or use multiprocessing to decrease computational time.

   .. py:method:: _cleanup_path(path: str, create_new: bool = True) -> None
      :staticmethod:

      First remove existing path, second create empty path if enabled.

   .. py:method:: on_epoch_end(self) -> None

      Randomly shuffle indexes if enabled.


.. py:class:: DataCollection(collection: list = None, name: str = None)

   Bases: :py:obj:`collections.Iterable`

   .. py:method:: name(self)
      :property:

   .. py:method:: __len__(self)

   .. py:method:: __iter__(self) -> collections.Iterator

   .. py:method:: __getitem__(self, index)

   .. py:method:: add(self, element)

   .. py:method:: _set_mapping(self)

   .. py:method:: keys(self)
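A short, hypothetical sketch of how data handlers, :py:class:`DataCollection` and :py:class:`KerasIterator` fit
together. The handler objects ``handler_a`` and ``handler_b``, the batch path and the batch size are placeholders for
illustration only.

.. code-block:: python

   from mlair.data_handler import DataCollection, KerasIterator

   # ``handler_a`` and ``handler_b`` stand for prepared data handlers
   # (e.g. DefaultDataHandler instances built for two stations)
   collection = DataCollection([handler_a, handler_b], name="train")

   # the iterator prepares batches as locally stored files under ``batch_path``
   # and serves them to Keras as a Sequence
   iterator = KerasIterator(collection, batch_size=512, batch_path="/tmp/batches",
                            shuffle_batches=True)

   # model.fit(iterator, epochs=10)  # with any compiled Keras model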
.. py:class:: DefaultDataHandler(id_class: data_handler, experiment_path: str, min_length: int = 0, extreme_values: num_or_list = None, extremes_on_right_tail_only: bool = False, name_affix=None, store_processed_data=True, iter_dim=DEFAULT_ITER_DIM, time_dim=DEFAULT_TIME_DIM, use_multiprocessing=True, max_number_multiprocessing=MAX_NUMBER_MULTIPROCESSING)

   Bases: :py:obj:`mlair.data_handler.abstract_data_handler.AbstractDataHandler`

   .. py:attribute:: _requirements

   .. py:attribute:: _store_attributes

   .. py:attribute:: _skip_args

   .. py:attribute:: DEFAULT_ITER_DIM
      :annotation: = Stations

   .. py:attribute:: DEFAULT_TIME_DIM
      :annotation: = datetime

   .. py:attribute:: MAX_NUMBER_MULTIPROCESSING
      :annotation: = 16

   .. py:method:: build(cls, station: str, **kwargs)
      :classmethod:

      Return initialised class.

   .. py:method:: _create_collection(self)

   .. py:method:: _reset_data(self)

   .. py:method:: _cleanup(self)

   .. py:method:: _store(self, fresh_store=False, store_processed_data=True)

   .. py:method:: get_store_attributes(self)

      Returns all attribute names and values that are indicated by the store_attributes method.

   .. py:method:: _force_dask_computation(data)
      :staticmethod:

   .. py:method:: _load(self)

   .. py:method:: get_data(self, upsampling=False, as_numpy=True)

   .. py:method:: __repr__(self)

      Return repr(self).

   .. py:method:: __len__(self, upsampling=False)

   .. py:method:: get_X_original(self)

   .. py:method:: get_Y_original(self)

   .. py:method:: _to_numpy(d)
      :staticmethod:

   .. py:method:: get_X(self, upsampling=False, as_numpy=True)

   .. py:method:: get_Y(self, upsampling=False, as_numpy=True)

   .. py:method:: harmonise_X(self)

   .. py:method:: get_observation(self)

   .. py:method:: apply_transformation(self, data, base='target', dim=0, inverse=False)

      This method must return transformed data. The flag inverse can be used to trigger either the transformation or
      its inverse method.

   .. py:method:: multiply_extremes(self, extreme_values: num_or_list = 1.0, extremes_on_right_tail_only: bool = False, timedelta: Tuple[int, str] = (1, 'm'), dim=DEFAULT_TIME_DIM)

      Multiply extremes.

      This method extracts extreme values from self.labels which are defined in the argument extreme_values. One can
      also decide only to extract extremes on the right tail of the distribution. When extreme_values is a list of
      floats/ints, all values larger than extreme_values (and smaller than -extreme_values; extraction is performed
      in standardised space) are extracted iteratively. If, for example, extreme_values = [1., 2.], then a value of
      1.5 would be extracted once (for the 0th entry in the list), while a 2.5 would be extracted twice (once for
      each entry). Timedelta is used to mark those extracted values by adding one minute to each timestamp. As TOAR
      data are hourly, these "artificial" data points can easily be identified later. Extreme inputs and labels are
      stored in self.extremes_history and self.extreme_labels, respectively.

      :param extreme_values: user definition of extreme
      :param extremes_on_right_tail_only: if False, also multiply values which are smaller than -extreme_values; if
                                          True, only extract values larger than extreme_values
      :param timedelta: used as argument for np.timedelta64 in order to mark extreme values on datetime

   .. py:method:: _add_timedelta(data, dim, timedelta)
      :staticmethod:
   .. py:method:: transformation(cls, set_stations, tmp_path=None, dh_transformation=None, **kwargs)
      :classmethod:

      ### supported transformation methods

      Currently supported methods are:

      * standardise (default, if method is not given)
      * centre
      * min_max
      * log

      ### mean and std estimation

      Mean and std (depending on method) are estimated. For each station, mean and std are calculated and afterwards
      aggregated using the mean value over all station-wise metrics. This approach is not exactly accurate, especially
      regarding the std calculation, but is much faster. Furthermore, it is a weighted mean, weighted by the time
      series length / number of data points - a longer time series has more influence on the transformation settings
      than a short time series. The estimation of the std is less accurate, because the unweighted mean of all stds is
      not equal to the true std, but the mean of all station-wise stds is still a decent estimate. Finally, the real
      accuracy of mean and std is less important, because it is "just" a transformation / scaling.

      ### mean and std given

      If mean and std are not None, the default data handler expects these parameters to match the data and applies
      these values to the data. Make sure that all dimensions and/or coordinates are in agreement.

      ### min and max given

      If min and max are not None, the default data handler expects these parameters to match the data and applies
      these values to the data. Make sure that all dimensions and/or coordinates are in agreement.

   .. py:method:: aggregate_transformation(cls, transformation_dict, iter_dim)
      :classmethod:

   .. py:method:: update_transformation_dict(cls, dh, transformation_dict)
      :classmethod:

      Inner method that is performed in both the serial and the parallel approach.

   .. py:method:: get_coordinates(self)

      Return coordinates as dictionary with keys `lon` and `lat`.
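To illustrate the station-wise aggregation described in the transformation method above, here is a small,
self-contained sketch of a length-weighted mean of per-station statistics. The numbers and variable names are
invented for illustration and do not mirror MLAir's internal implementation.

.. code-block:: python

   import numpy as np

   # hypothetical per-station statistics:
   # each tuple holds (number of samples, station-wise mean, station-wise std)
   station_stats = [(8760, 42.1, 11.3), (4380, 38.7, 9.8), (17520, 45.0, 12.6)]

   n = np.array([s[0] for s in station_stats], dtype=float)
   means = np.array([s[1] for s in station_stats])
   stds = np.array([s[2] for s in station_stats])

   # weighted mean: longer time series have more influence on the result
   pooled_mean = np.average(means, weights=n)

   # std estimate: weighted mean of the station-wise stds; as noted above, this
   # is only an approximation of the true pooled std, but sufficient for scaling
   pooled_std = np.average(stds, weights=n)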
.. py:class:: AbstractDataHandler(*args, **kwargs)

   Bases: :py:obj:`object`

   .. py:attribute:: _requirements
      :annotation: = []

   .. py:attribute:: _store_attributes
      :annotation: = []

   .. py:attribute:: _skip_args
      :annotation: = ['self']

   .. py:method:: build(cls, *args, **kwargs)
      :classmethod:

      Return initialised class.

   .. py:method:: __len__(self, upsampling=False)
      :abstractmethod:

   .. py:method:: requirements(cls, skip_args=None)
      :classmethod:

      Return requirements and own arguments without duplicates.

   .. py:method:: own_args(cls, *args)
      :classmethod:

      Return all arguments (including kwonlyargs).

   .. py:method:: super_args(cls)
      :classmethod:

   .. py:method:: store_attributes(cls) -> list
      :classmethod:

      Let MLAir know that some data should be stored in the data store.

      This is used for calculations on the train subset that should be applied to the validation and test subsets. To
      work properly, add a class variable cls._store_attributes to your data handler. If your custom data handler is
      constructed from different data handlers (e.g. like the DefaultDataHandler), it is additionally required to
      overwrite the get_store_attributes method to return the attributes from the corresponding subclasses. This is
      not required if only attributes from the main class are to be returned.

      Note that MLAir will store these attributes with the data handler's identification. This depends on the custom
      data handler setting. When loading an attribute from the data handler, it is therefore required to extract the
      right information by using the class identification. In case of the DefaultDataHandler this can be achieved by
      converting all keys of the attribute to string and comparing these with the station parameter.

   .. py:method:: get_store_attributes(self)

      Returns all attribute names and values that are indicated by the store_attributes method.

   .. py:method:: transformation(cls, *args, **kwargs)
      :classmethod:

   .. py:method:: apply_transformation(self, data, inverse=False, **kwargs)
      :abstractmethod:

      This method must return transformed data. The flag inverse can be used to trigger either the transformation or
      its inverse method.

   .. py:method:: get_X(self, upsampling=False, as_numpy=False)
      :abstractmethod:

   .. py:method:: get_Y(self, upsampling=False, as_numpy=False)
      :abstractmethod:

   .. py:method:: get_data(self, upsampling=False, as_numpy=False)

   .. py:method:: get_coordinates(self) -> Union[None, Dict]

      Return coordinates as dictionary with keys `lon` and `lat`.

   .. py:method:: _hash_list(self)


.. py:class:: DataHandlerNeighbors(id_class, data_path, neighbors=None, min_length=0, extreme_values: num_or_list = None, extremes_on_right_tail_only: bool = False)

   Bases: :py:obj:`mlair.data_handler.DefaultDataHandler`

   Data handler including neighboring stations.

   .. py:method:: build(cls, station, **kwargs)
      :classmethod:

      Return initialised class.

   .. py:method:: _create_collection(self)

   .. py:method:: get_coordinates(self, include_neighbors=False)

      Return coordinates as dictionary with keys `lon` and `lat`.
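A minimal, hypothetical sketch of a custom data handler built on :py:class:`AbstractDataHandler`, illustrating the
``_requirements`` / ``_store_attributes`` class variables and the abstract methods listed above. The class
``MyDataHandler``, its random data, and the constructor arguments are invented for illustration and are not part of
MLAir.

.. code-block:: python

   import numpy as np
   from mlair.data_handler import AbstractDataHandler


   class MyDataHandler(AbstractDataHandler):
       """Toy handler returning random inputs and targets (illustration only)."""

       _requirements = []            # additional arguments requested from the data store
       _store_attributes = ["mean"]  # attributes MLAir should place into the data store

       def __init__(self, station, samples=100, window=7, variables=3):
           super().__init__()
           self.station = station
           self._x = np.random.randn(samples, window, variables)
           self._y = np.random.randn(samples, 1)
           self.mean = self._x.mean()

       def __len__(self, upsampling=False):
           return len(self._x)

       def get_X(self, upsampling=False, as_numpy=False):
           return self._x

       def get_Y(self, upsampling=False, as_numpy=False):
           return self._y

       def apply_transformation(self, data, inverse=False, **kwargs):
           # trivial centring around the stored mean, just to show the inverse flag
           return data + self.mean if inverse else data - self.mean


   handler = MyDataHandler("DEMO_STATION")
   # MLAir itself typically constructs handlers via the ``build`` classmethod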