Coverage for mlair/run_modules/pre_processing.py: 62%
278 statements
coverage.py v6.4.2, created at 2022-12-02 15:24 +0000
1"""Pre-processing module."""
3__author__ = "Lukas Leufen, Felix Kleinert"
4__date__ = '2019-11-25'
6import logging
7import os
8import traceback
9from typing import Tuple
10import multiprocessing
11import requests
12import psutil
13import random
14import dill
16import pandas as pd
18from mlair.data_handler import DataCollection, AbstractDataHandler
19from mlair.helpers import TimeTracking, to_list, tables, remove_items
20from mlair.configuration import path_config
21from mlair.helpers.data_sources.toar_data import EmptyQueryResult
22from mlair.helpers.testing import check_nested_equality
23from mlair.run_modules.run_environment import RunEnvironment
26class PreProcessing(RunEnvironment):
27 """
28 Pre-process your data by using this class.
30 Schedule of pre-processing:
31 #. load and check valid stations (either download or load from disk)
32 #. split subsets (train, val, test, train & val)
33 #. create small report on data metrics
35 Required objects [scope] from data store:
36 * all elements from `DEFAULT_ARGS_LIST` in scope preprocessing for general data loading
37 * all elements from `DEFAULT_ARGS_LIST` in scopes [train, val, test, train_val] for custom subset settings
38 * `fraction_of_training` [.]
39 * `experiment_path` [.]
40 * `use_all_stations_on_all_data_sets` [.]
42 Optional objects
43 * all elements from `DEFAULT_KWARGS_LIST` in scope preprocessing for general data loading
44 * all elements from `DEFAULT_KWARGS_LIST` in scopes [train, val, test, train_val] for custom subset settings
46 Sets
47 * `stations` in [., train, val, test, train_val]
48 * `generator` in [train, val, test, train_val]
49 * `transformation` [.]
51 Creates
52 * all input and output data in `data_path`
53 * latex reports in `experiment_path/latex_report`
55 """
57 def __init__(self):
58 """Set up and run pre-processing."""
59 super().__init__()
60 self._run()
62 def _run(self):
63 snapshot_load_path = self.data_store.get_default("snapshot_load_path", default=None)
64 if snapshot_load_path is None:  [64 ↛ 74: condition on line 64 was never false]
65 stations = self.data_store.get("stations")
66 data_handler = self.data_store.get("data_handler")
67 _, valid_stations = self.validate_station(data_handler, stations,
68 "preprocessing") # , store_processed_data=False)
69 if len(valid_stations) == 0:  [69 ↛ 70: condition on line 69 was never true]
70 raise ValueError("Couldn't find any valid data according to given parameters. Abort experiment run.")
71 self.data_store.set("stations", valid_stations)
72 self.split_train_val_test()
73 else:
74 self.load_snapshot(snapshot_load_path)
75 self.report_pre_processing()
76 self.prepare_competitors()
77 if self.data_store.get_default("create_snapshot", False) is True:  [77 ↛ 78: condition on line 77 was never true]
78 self.create_snapshot()
80 def report_pre_processing(self):
81 """Log some metrics on data and create latex report."""
82 logging.debug(20 * '##')
83 n_train = len(self.data_store.get('data_collection', 'train'))
84 n_val = len(self.data_store.get('data_collection', 'val'))
85 n_test = len(self.data_store.get('data_collection', 'test'))
86 n_total = n_train + n_val + n_test
87 logging.debug(f"Number of all stations: {n_total}")
88 logging.debug(f"Number of training stations: {n_train}")
89 logging.debug(f"Number of val stations: {n_val}")
90 logging.debug(f"Number of test stations: {n_test}")
91 self.create_latex_report()
93 def create_latex_report(self):
94 """
95 Create tables with information on the station meta data and a summary of the subset sample sizes.
97 * station_sample_size.md: see table below as markdown
98 * station_sample_size.tex: same as table below as latex table
99 * station_sample_size_short.tex: reduced size table without any meta data besides station ID, as latex table
101 All tables are stored in the folder latex_report inside experiment_path. The table format (e.g. which meta
102 data is highlighted) is currently hardcoded to ensure a stable table style. If further styles are needed, it is
103 better to add an additional style than to modify the existing ones.
105 +------------+-------------------------------------------+---------------+---------------+---------------+---------+-------+--------+
106 | stat. ID | station_name | station_lon | station_lat | station_alt | train | val | test |
107 +============+===========================================+===============+===============+===============+=========+=======+========+
108 | DEBW013 | Stuttgart Bad Cannstatt | 9.2297 | 48.8088 | 235 | 1434 | 712 | 1080 |
109 +------------+-------------------------------------------+---------------+---------------+---------------+---------+-------+--------+
110 | DEBW076 | Baden-Baden | 8.2202 | 48.7731 | 148 | 3037 | 722 | 710 |
111 +------------+-------------------------------------------+---------------+---------------+---------------+---------+-------+--------+
112 | DEBW087 | Schwäbische_Alb | 9.2076 | 48.3458 | 798 | 3044 | 714 | 1087 |
113 +------------+-------------------------------------------+---------------+---------------+---------------+---------+-------+--------+
114 | DEBW107 | Tübingen | 9.0512 | 48.5077 | 325 | 1803 | 715 | 1087 |
115 +------------+-------------------------------------------+---------------+---------------+---------------+---------+-------+--------+
116 | DEBY081 | Garmisch-Partenkirchen/Kreuzeckbahnstraße | 11.0631 | 47.4764 | 735 | 2935 | 525 | 714 |
117 +------------+-------------------------------------------+---------------+---------------+---------------+---------+-------+--------+
118 | # Stations | nan | nan | nan | nan | 6 | 6 | 6 |
119 +------------+-------------------------------------------+---------------+---------------+---------------+---------+-------+--------+
120 | # Samples | nan | nan | nan | nan | 12253 | 3388 | 4678 |
121 +------------+-------------------------------------------+---------------+---------------+---------------+---------+-------+--------+
123 """
124 meta_cols = ["name", "lat", "lon", "alt", "country", "state", "type", "type_of_area", "toar1_category"]
125 meta_round = ["lat", "lon", "alt"]
126 precision = 4
127 path = os.path.join(self.data_store.get("experiment_path"), "latex_report")
128 path_config.check_path_and_create(path)
129 names_of_set = ["train", "val", "test"]
130 df = self.create_info_df(meta_cols, meta_round, names_of_set, precision)
131 column_format = tables.create_column_format_for_tex(df)
132 tables.save_to_tex(path=path, filename="station_sample_size.tex", column_format=column_format, df=df)
133 tables.save_to_md(path=path, filename="station_sample_size.md", df=df)
134 df_nometa = df.drop(meta_cols, axis=1)
135 column_format = tables.create_column_format_for_tex(df_nometa)
136 tables.save_to_tex(path=path, filename="station_sample_size_short.tex", column_format=column_format,
137 df=df_nometa)
138 tables.save_to_md(path=path, filename="station_sample_size_short.md", df=df_nometa)
139 df_descr = self.create_describe_df(df_nometa)
140 column_format = tables.create_column_format_for_tex(df_descr)
141 tables.save_to_tex(path=path, filename="station_describe_short.tex", column_format=column_format, df=df_descr)
142 tables.save_to_md(path=path, filename="station_describe_short.md", df=df_descr)
144 @staticmethod
145 def create_describe_df(df, percentiles=None, ignore_last_lines: int = 2):
146 if percentiles is None:  [146 ↛ 148: condition on line 146 was never false]
147 percentiles = [.05, .1, .25, .5, .75, .9, .95]
148 df_descr = df.iloc[:-ignore_last_lines].astype('float32').describe(
149 percentiles=percentiles).astype("int32", errors="ignore")
150 df_descr = pd.concat([df.loc[['# Samples']], df_descr]).T
151 df_descr.rename(columns={"# Samples": "no. samples", "count": "no. stations"}, inplace=True)
152 df_descr_colnames = list(df_descr.columns)
153 df_descr_colnames = [df_descr_colnames[1]] + [df_descr_colnames[0]] + df_descr_colnames[2:]
154 df_descr = df_descr[df_descr_colnames]
155 return df_descr
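    # Editor's note (illustrative, based on the code above): create_describe_df drops the last
    # `ignore_last_lines` summary rows ("# Stations", "# Samples"), describes the per-station sample
    # counts of each subset column and transposes the result. For the example table in
    # create_latex_report this yields one row per subset (train/val/test) with the columns
    # "no. stations", "no. samples", mean, std, min, the requested percentiles and max.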
157 def create_info_df(self, meta_cols, meta_round, names_of_set, precision):
158 use_multiprocessing = self.data_store.get("use_multiprocessing")
159 max_process = self.data_store.get("max_number_multiprocessing")
160 df = pd.DataFrame(columns=meta_cols + names_of_set)
161 for set_name in names_of_set:
162 data = self.data_store.get("data_collection", set_name)
163 n_process = min([psutil.cpu_count(logical=False), len(data), max_process]) # use only physical cpus
164 if n_process > 1 and use_multiprocessing is True:  # parallel solution  [164 ↛ 165: condition on line 164 was never true]
165 logging.info(f"use parallel create_info_df ({set_name})")
166 pool = multiprocessing.Pool(n_process)
167 logging.info(f"running {getattr(pool, '_processes')} processes in parallel")
168 output = [pool.apply_async(f_proc_create_info_df, args=(station, meta_cols)) for station in data]
169 for i, p in enumerate(output):
170 res = p.get()
171 station_name, shape, meta = res["station_name"], res["Y_shape"], res["meta"]
172 df.loc[station_name, set_name] = shape
173 if df.loc[station_name, meta_cols].isnull().any():
174 df.loc[station_name, meta_cols] = meta
175 logging.info(f"...finished: {station_name} ({int((i + 1.) / len(output) * 100)}%)")
176 pool.close()
177 pool.join()
178 else: # serial solution
179 logging.info(f"use serial create_info_df ({set_name})")
180 for station in data:
181 res = f_proc_create_info_df(station, meta_cols)
182 station_name, shape, meta = res["station_name"], res["Y_shape"], res["meta"]
183 df.loc[station_name, set_name] = shape
184 if df.loc[station_name, meta_cols].isnull().any():
185 df.loc[station_name, meta_cols] = meta
186 df.loc["# Samples", set_name] = df.loc[:, set_name].sum()
187 assert len(data) == df.loc[:, set_name].count() - 1
188 df.loc["# Stations", set_name] = len(data)
189 df[meta_round] = df[meta_round].astype(float).round(precision)
190 df.sort_index(inplace=True)
191 df = df.reindex(df.index.drop(["# Stations", "# Samples"]).to_list() + ["# Stations", "# Samples"], )
192 df.index.name = 'stat. ID'
193 return df
195 def split_train_val_test(self) -> None:
196 """
197 Split data into subsets.
199 Currently: train, val, test and train_val (the latter is simply the merge of train and val, but kept as a
200 separate data_collection). IMPORTANT: Do not change the execution order of create_set_split. The train subset
201 always has to be processed first in order to set a proper transformation.
202 """
203 fraction_of_training = self.data_store.get("fraction_of_training")
204 stations = self.data_store.get("stations")
205 train_index, val_index, test_index, train_val_index = self.split_set_indices(len(stations),
206 fraction_of_training)
207 subset_names = ["train", "val", "test", "train_val"]
208 if subset_names[0] != "train": # pragma: no cover
209 raise AssertionError(f"Make sure, that the train subset is always at first execution position! Given subset"
210 f"order was: {subset_names}.")
211 for (ind, scope) in zip([train_index, val_index, test_index, train_val_index], subset_names):
212 self.create_set_split(ind, scope)
214 @staticmethod
215 def split_set_indices(total_length: int, fraction: float) -> Tuple[slice, slice, slice, slice]:
216 """
217 Create the training, validation and test subset slice indices for given total_length.
219 The test data consists of the last (1 - fraction) share of total_length (fraction * len : end). Train and
220 validation data are therefore taken from the first fraction of total_length (0 : fraction * len) and are split
221 again with a factor of 0.8 for train and 0.2 for validation. In addition, split_set_indices also returns the
222 combination of the training and validation subset.
224 :param total_length: total number of objects to split
225 :param fraction: share of total_length that is used for the union of train and validation data
227 :return: slices for each subset in the order: train, val, test, train_val
228 """
229 pos_test_split = int(total_length * fraction)
230 train_index = slice(0, int(pos_test_split * 0.8))
231 val_index = slice(int(pos_test_split * 0.8), pos_test_split)
232 test_index = slice(pos_test_split, total_length)
233 train_val_index = slice(0, pos_test_split)
234 return train_index, val_index, test_index, train_val_index
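    # Editor's illustrative sketch (not part of the original module): worked example of the split
    # arithmetic for total_length = 100 and fraction = 0.8, i.e. 64/16/20 stations plus the
    # 80-station train_val union:
    #     train, val, test, train_val = PreProcessing.split_set_indices(100, 0.8)
    #     assert (train, val, test, train_val) == (slice(0, 64), slice(64, 80), slice(80, 100), slice(0, 80))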
236 def create_set_split(self, index_list: slice, set_name: str) -> None:
237 # get set stations
238 stations = self.data_store.get("stations", scope=set_name)
239 if self.data_store.get("use_all_stations_on_all_data_sets"):
240 set_stations = stations
241 else:
242 set_stations = stations[index_list]
243 logging.debug(f"{set_name.capitalize()} stations (len={len(set_stations)}): {set_stations}")
244 # create set data_collection and store
245 data_handler = self.data_store.get("data_handler")
246 collection, valid_stations = self.validate_station(data_handler, set_stations, set_name)
247 self.data_store.set("stations", valid_stations, scope=set_name)
248 self.data_store.set("data_collection", collection, scope=set_name)
250 def validate_station(self, data_handler: AbstractDataHandler, set_stations, set_name=None,
251 store_processed_data=True):
252 """
253 Check if all stations given in `set_stations` are valid.
255 Valid means that there is data available for the given time range (as defined in `kwargs`). The shape and the
256 loading time are logged in debug mode.
258 :return: data collection of all valid data handlers and the corrected list containing only valid station IDs.
259 """
260 t_outer = TimeTracking()
261 logging.info(f"check valid stations started{' (%s)' % (set_name if set_name is not None else 'all')}")
262 # calculate transformation using train data
263 if set_name == "train":
264 logging.info("setup transformation using train data exclusively")
265 self.transformation(data_handler, set_stations)
266 # start station check
267 collection = DataCollection(name=set_name)
268 valid_stations = []
269 kwargs = self.data_store.create_args_dict(data_handler.requirements(skip_args="station"), scope=set_name)
270 use_multiprocessing = self.data_store.get("use_multiprocessing")
271 tmp_path = self.data_store.get("tmp_path")
273 max_process = self.data_store.get("max_number_multiprocessing")
274 n_process = min([psutil.cpu_count(logical=False), len(set_stations), max_process]) # use only physical cpus
275 if n_process > 1 and use_multiprocessing is True:  # parallel solution  [275 ↛ 276: condition on line 275 was never true]
276 logging.info("use parallel validate station approach")
277 pool = multiprocessing.Pool(n_process)
278 logging.info(f"running {getattr(pool, '_processes')} processes in parallel")
279 kwargs.update({"tmp_path": tmp_path, "return_strategy": "reference"})
280 output = [
281 pool.apply_async(f_proc, args=(data_handler, station, set_name, store_processed_data), kwds=kwargs)
282 for station in set_stations]
283 for i, p in enumerate(output):
284 _res_file, s = p.get()
285 logging.info(f"...finished: {s} ({int((i + 1.) / len(output) * 100)}%)")
286 with open(_res_file, "rb") as f:
287 dh = dill.load(f)
288 os.remove(_res_file)
289 if dh is not None:
290 collection.add(dh)
291 valid_stations.append(s)
292 pool.close()
293 pool.join()
294 else: # serial solution
295 logging.info("use serial validate station approach")
296 kwargs.update({"return_strategy": "result"})
297 for station in set_stations:
298 dh, s = f_proc(data_handler, station, set_name, store_processed_data, **kwargs)
299 if dh is not None:
300 collection.add(dh)
301 valid_stations.append(s)
303 logging.info(f"run for {t_outer} to check {len(set_stations)} station(s). Found {len(collection)}/"
304 f"{len(set_stations)} valid stations ({set_name}).")
305 if set_name == "train":
306 self.store_data_handler_attributes(data_handler, collection)
307 return collection, valid_stations
309 def store_data_handler_attributes(self, data_handler, collection):
310 store_attributes = data_handler.store_attributes()
311 if len(store_attributes) > 0:  [311 ↛ 312: condition on line 311 was never true]
312 logging.info(f"store following parameters ({len(store_attributes)}) requested by the data handler: "
313 f"{','.join(store_attributes)}")
314 attrs = {}
315 for dh in collection:
316 station = str(dh)
317 for k, v in dh.get_store_attributes().items():
318 attrs[k] = dict(attrs.get(k, {}), **{station: v})
319 for k, v in attrs.items():
320 self.data_store.set(k, v)
322 def transformation(self, data_handler: AbstractDataHandler, stations):
323 calculate_fresh_transformation = self.data_store.get_default("calculate_fresh_transformation", True)
324 if hasattr(data_handler, "transformation"):
325 transformation_opts = None if calculate_fresh_transformation is True else self._load_transformation()
326 if transformation_opts is None:  [326 ↛ 332: condition on line 326 was never false]
327 logging.info(f"start to calculate transformation parameters.")
328 kwargs = self.data_store.create_args_dict(data_handler.requirements(skip_args="station"), scope="train")
329 tmp_path = self.data_store.get_default("tmp_path", default=None)
330 transformation_opts = data_handler.transformation(stations, tmp_path=tmp_path, **kwargs)
331 else:
332 logging.info("In case no valid train data could be found due to problems with transformation, please "
333 "check your provided transformation file for compability with your data.")
334 self.data_store.set("transformation", transformation_opts)
335 if transformation_opts is not None:
336 self._store_transformation(transformation_opts)
338 def _load_transformation(self):
339 """Try to load transformation options from file if transformation_file is provided."""
340 transformation_file = self.data_store.get_default("transformation_file", None)
341 if transformation_file is not None:
342 if os.path.exists(transformation_file):
343 logging.info(f"use transformation from given transformation file: {transformation_file}")
344 with open(transformation_file, "rb") as pickle_file:
345 return dill.load(pickle_file)
346 else:
347 logging.info(f"cannot load transformation file: {transformation_file}. Use fresh calculation of "
348 f"transformation from train data.")
350 def _store_transformation(self, transformation_opts):
351 """Store transformation options locally inside experiment_path if not exists already."""
352 experiment_path = self.data_store.get("experiment_path")
353 transformation_path = os.path.join(experiment_path, "data", "transformation")
354 transformation_file = os.path.join(transformation_path, "transformation.pickle")
355 calculate_fresh_transformation = self.data_store.get_default("calculate_fresh_transformation", True)
356 if not os.path.exists(transformation_file) or calculate_fresh_transformation:  [356 ↛ exit: '_store_transformation' never returned here because the condition on line 356 was never false]
357 path_config.check_path_and_create(transformation_path)
358 with open(transformation_file, "wb") as f:
359 dill.dump(transformation_opts, f, protocol=4)
360 logging.info(f"Store transformation options locally for later use at: {transformation_file}")
362 def prepare_competitors(self):
363 """
364 Prepare competitor models already in the preprocessing stage. This is done here because some models might
365 need internet access, which, depending on the operating system, may not be available during postprocessing.
366 Currently, this method only checks whether the IntelliO3-ts-v1 or CAMS model is requested as a competitor and
367 downloads the required data.
368 """
369 logging.info("Searching for competitors to be prepared for use.")
370 competitors = to_list(self.data_store.get_default("competitors", default=[]))
371 if len(competitors) > 0:  [371 ↛ 392: condition on line 371 was never false]
372 for competitor_name in competitors:
373 if competitor_name.lower() == "IntelliO3-ts-v1".lower():  [373 ↛ 374: condition on line 373 was never true]
374 logging.info("Prepare IntelliO3-ts-v1 model")
375 from mlair.reference_models.reference_model_intellio3_v1 import IntelliO3_ts_v1
376 path = os.path.join(self.data_store.get("competitor_path"), competitor_name)
377 IntelliO3_ts_v1("IntelliO3-ts-v1", ref_store_path=path).make_reference_available_locally(remove_tmp_dir=False)
378 elif competitor_name.lower() == "CAMS".lower():  [378 ↛ 379: condition on line 378 was never true]
379 logging.info("Prepare CAMS forecasts")
380 from mlair.reference_models.reference_model_cams import CAMSforecast
381 data_path = self.data_store.get_default("cams_data_path", default=None)
382 path = os.path.join(self.data_store.get("competitor_path"), competitor_name)
383 stations = {}
384 for subset in ["train", "val", "test"]:
385 data_collection = self.data_store.get("data_collection", subset)
386 stations.update({str(s): s.get_coordinates() for s in data_collection if s not in stations})
387 CAMSforecast("CAMS", ref_store_path=path, data_path=data_path).make_reference_available_locally(stations)
388 else:
389 logging.info(f"No preparation required for competitor {competitor_name} as no specific instruction "
390 f"is provided.")
391 else:
392 logging.info("No preparation required because no competitor was provided to the workflow.")
394 def create_snapshot(self):
395 logging.info("create snapshot for preprocessing")
396 from mlair.configuration.snapshot_names import animals
397 for i_try in range(10):
398 snapshot_name = random.choice(animals).lower()
399 snapshot_path = os.path.abspath(self.data_store.get("snapshot_path"))
400 path_config.check_path_and_create(snapshot_path, remove_existing=False)
401 _snapshot_file = os.path.join(snapshot_path, f"snapshot_preprocessing_{snapshot_name}.pickle")
402 if not os.path.exists(_snapshot_file):
403 logging.info(f"store snapshot at: {_snapshot_file}")
404 with open(_snapshot_file, "wb") as f:
405 dill.dump(self.data_store, f, protocol=4)
406 print(_snapshot_file)
407 return
408 logging.info(f"Could not create snapshot at {_snapshot_file} as file is already existing ({i_try + 1}/10)")
409 logging.info(f"Could not create any snapshot after 10/10 tries.")
411 def load_snapshot(self, file):
412 logging.info(f"load snapshot for preprocessing from {file}")
413 with open(file, "rb") as f:
414 snapshot = dill.load(f)
415 excluded_params = ["activation", "activation_output", "add_dense_layer", "batch_normalization", "batch_path",
416 "batch_size", "block_length", "bootstrap_method", "bootstrap_path", "bootstrap_type",
417 "competitor_path", "competitors", "create_new_bootstraps", "create_new_model",
418 "create_snapshot", "data_collection", "debug_mode", "dense_layer_configuration",
419 "do_uncertainty_estimate", "dropout", "dropout_rnn", "early_stopping_epochs", "epochs",
420 "evaluate_competitors", "evaluate_feature_importance", "experiment_name", "experiment_path",
421 "exponent_last_layer", "forecast_path", "fraction_of_training", "hostname", "hpc_hosts",
422 "kernel_regularizer", "kernel_size", "layer_configuration", "log_level_stream",
423 "logging_path", "login_nodes", "loss_type", "loss_weights", "max_number_multiprocessing",
424 "model_class", "model_display_name", "model_path", "n_boots", "n_hidden", "n_layer",
425 "neighbors", "plot_list", "plot_path", "regularizer", "restore_best_model_weights",
426 "snapshot_load_path", "snapshot_path", "stations", "tmp_path", "train_model",
427 "transformation", "use_multiprocessing", ]
429 data_handler = self.data_store.get("data_handler")
430 model_class = self.data_store.get("model_class")
431 excluded_params = list(set(excluded_params + data_handler.store_attributes() + model_class.requirements()))
433 if check_nested_equality(self.data_store._store, snapshot._store, skip_args=excluded_params) is True:
434 self.update_datastore(snapshot, excluded_params=remove_items(excluded_params, ["transformation",
435 "data_collection",
436 "stations"]))
437 else:
438 raise ReferenceError("provided snapshot does not match with the current experiment setup. Abort this run!")
441def f_proc(data_handler, station, name_affix, store, return_strategy="", tmp_path=None, **kwargs):
442 """
443 Try to create a data handler for the given arguments. If the build fails, the station does not fulfil all
444 requirements and f_proc returns None as an indication. On a successful build, f_proc returns the built data handler
445 and the station that was used. This function must be defined at module level to work with multiprocessing.
446 """
447 assert return_strategy in ["result", "reference"]
448 try:
449 res = data_handler.build(station, name_affix=name_affix, store_processed_data=store, **kwargs)
450 except (AttributeError, EmptyQueryResult, KeyError, requests.ConnectionError, ValueError, IndexError) as e:
451 formatted_lines = traceback.format_exc().splitlines()
452 logging.info(f"remove station {station} because it raised an error: {e} -> "
453 f"{' | '.join(f_inspect_error(formatted_lines))}")
454 logging.debug(f"detailed information for removal of station {station}: {traceback.format_exc()}")
455 res = None
456 if return_strategy == "result":  [456 ↛ 459: condition on line 456 was never false]
457 return res, station
458 else:
459 if tmp_path is None:
460 tmp_path = os.getcwd()
461 _tmp_file = os.path.join(tmp_path, f"{station}_{'%032x' % random.getrandbits(128)}.pickle")
462 with open(_tmp_file, "wb") as f:
463 dill.dump(res, f, protocol=4)
464 return _tmp_file, station
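# Editor's note (illustrative, based on the code above): with return_strategy="result" (used by the serial
# path of validate_station) f_proc hands the built data handler back directly, whereas with
# return_strategy="reference" (used by the parallel path) the handler is dumped to a dill pickle in
# tmp_path and only the file path is returned, presumably to avoid pushing large objects through the
# multiprocessing pipe.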
467def f_proc_create_info_df(data, meta_cols):
468 station_name = str(data.id_class)
469 meta = data.id_class.meta
470 res = {"station_name": station_name, "Y_shape": data.get_Y()[0].shape[0],
471 "meta": meta.reindex(meta_cols).values.flatten()}
472 return res
475def f_inspect_error(formatted):
476 for i in range(len(formatted) - 1, -1, -1):  [476 ↛ 479: the loop on line 476 didn't complete]
477 if "mlair/mlair" not in formatted[i]:  [477 ↛ 476: condition on line 477 was never false]
478 return formatted[i - 3:i]
479 return formatted[-3:]