Coverage for mlair/run_modules/pre_processing.py: 62%

278 statements  

coverage.py v6.4.2, created at 2022-12-02 15:24 +0000

1"""Pre-processing module.""" 

2 

3__author__ = "Lukas Leufen, Felix Kleinert" 

4__date__ = '2019-11-25' 

5 

6import logging 

7import os 

8import traceback 

9from typing import Tuple 

10import multiprocessing 

11import requests 

12import psutil 

13import random 

14import dill 

15 

16import pandas as pd 

17 

18from mlair.data_handler import DataCollection, AbstractDataHandler 

19from mlair.helpers import TimeTracking, to_list, tables, remove_items 

20from mlair.configuration import path_config 

21from mlair.helpers.data_sources.toar_data import EmptyQueryResult 

22from mlair.helpers.testing import check_nested_equality 

23from mlair.run_modules.run_environment import RunEnvironment 

24 

25 

26class PreProcessing(RunEnvironment): 

27 """ 

28 Pre-process your data by using this class. 

29 

30 Schedule of pre-processing: 

31 #. load and check valid stations (either download or load from disk) 

32 #. split subsets (train, val, test, train & val) 

33 #. create small report on data metrics 

34 

35 Required objects [scope] from data store: 

36 * all elements from `DEFAULT_ARGS_LIST` in scope preprocessing for general data loading 

37 * all elements from `DEFAULT_ARGS_LIST` in scopes [train, val, test, train_val] for custom subset settings 

38 * `fraction_of_training` [.] 

39 * `experiment_path` [.] 

40 * `use_all_stations_on_all_data_sets` [.] 

41 

42 Optional objects 

43 * all elements from `DEFAULT_KWARGS_LIST` in scope preprocessing for general data loading 

44 * all elements from `DEFAULT_KWARGS_LIST` in scopes [train, val, test, train_val] for custom subset settings 

45 

46 Sets 

47 * `stations` in [., train, val, test, train_val] 

48 * `generator` in [train, val, test, train_val] 

49 * `transformation` [.] 

50 

51 Creates 

52 * all input and output data in `data_path` 

53 * latex reports in `experiment_path/latex_report` 

54 

55 """ 

56 

57 def __init__(self): 

58 """Set up and run pre-processing.""" 

59 super().__init__() 

60 self._run() 

61 

62 def _run(self): 

63 snapshot_load_path = self.data_store.get_default("snapshot_load_path", default=None) 

64 if snapshot_load_path is None:  [64 ↛ 74] line 64 didn't jump to line 74, because the condition on line 64 was never false

65 stations = self.data_store.get("stations") 

66 data_handler = self.data_store.get("data_handler") 

67 _, valid_stations = self.validate_station(data_handler, stations, 

68 "preprocessing") # , store_processed_data=False) 

69 if len(valid_stations) == 0:  [69 ↛ 70] line 69 didn't jump to line 70, because the condition on line 69 was never true

70 raise ValueError("Couldn't find any valid data according to given parameters. Abort experiment run.") 

71 self.data_store.set("stations", valid_stations) 

72 self.split_train_val_test() 

73 else: 

74 self.load_snapshot(snapshot_load_path) 

75 self.report_pre_processing() 

76 self.prepare_competitors() 

77 if self.data_store.get_default("create_snapshot", False) is True:  [77 ↛ 78] line 77 didn't jump to line 78, because the condition on line 77 was never true

78 self.create_snapshot() 

79 

80 def report_pre_processing(self): 

81 """Log some metrics on data and create latex report.""" 

82 logging.debug(20 * '##') 

83 n_train = len(self.data_store.get('data_collection', 'train')) 

84 n_val = len(self.data_store.get('data_collection', 'val')) 

85 n_test = len(self.data_store.get('data_collection', 'test')) 

86 n_total = n_train + n_val + n_test 

87 logging.debug(f"Number of all stations: {n_total}") 

88 logging.debug(f"Number of training stations: {n_train}") 

89 logging.debug(f"Number of val stations: {n_val}") 

90 logging.debug(f"Number of test stations: {n_test}") 

91 self.create_latex_report() 

92 

93 def create_latex_report(self): 

94 """ 

95 Create tables with information on the station meta data and a summary on subset sample sizes. 

96 

97 * station_sample_size.md: see table below as markdown 

98 * station_sample_size.tex: same as table below as latex table 

99 * station_sample_size_short.tex: reduced size table without any meta data besides station ID, as latex table 

100 

101 All tables are stored inside experiment_path in the folder latex_report. The table format (e.g. which meta 

102 data is highlighted) is currently hardcoded to ensure a stable table style. If further styles are needed, it is 

103 better to add an additional style than to modify the existing table styles. 

104 

105 +------------+-------------------------------------------+---------------+---------------+---------------+---------+-------+--------+ 

106 | stat. ID | station_name | station_lon | station_lat | station_alt | train | val | test | 

107 +============+===========================================+===============+===============+===============+=========+=======+========+ 

108 | DEBW013 | Stuttgart Bad Cannstatt | 9.2297 | 48.8088 | 235 | 1434 | 712 | 1080 | 

109 +------------+-------------------------------------------+---------------+---------------+---------------+---------+-------+--------+ 

110 | DEBW076 | Baden-Baden | 8.2202 | 48.7731 | 148 | 3037 | 722 | 710 | 

111 +------------+-------------------------------------------+---------------+---------------+---------------+---------+-------+--------+ 

112 | DEBW087 | Schwäbische_Alb | 9.2076 | 48.3458 | 798 | 3044 | 714 | 1087 | 

113 +------------+-------------------------------------------+---------------+---------------+---------------+---------+-------+--------+ 

114 | DEBW107 | Tübingen | 9.0512 | 48.5077 | 325 | 1803 | 715 | 1087 | 

115 +------------+-------------------------------------------+---------------+---------------+---------------+---------+-------+--------+ 

116 | DEBY081 | Garmisch-Partenkirchen/Kreuzeckbahnstraße | 11.0631 | 47.4764 | 735 | 2935 | 525 | 714 | 

117 +------------+-------------------------------------------+---------------+---------------+---------------+---------+-------+--------+ 

118 | # Stations | nan | nan | nan | nan | 6 | 6 | 6 | 

119 +------------+-------------------------------------------+---------------+---------------+---------------+---------+-------+--------+ 

120 | # Samples | nan | nan | nan | nan | 12253 | 3388 | 4678 | 

121 +------------+-------------------------------------------+---------------+---------------+---------------+---------+-------+--------+ 

122 

123 """ 

124 meta_cols = ["name", "lat", "lon", "alt", "country", "state", "type", "type_of_area", "toar1_category"] 

125 meta_round = ["lat", "lon", "alt"] 

126 precision = 4 

127 path = os.path.join(self.data_store.get("experiment_path"), "latex_report") 

128 path_config.check_path_and_create(path) 

129 names_of_set = ["train", "val", "test"] 

130 df = self.create_info_df(meta_cols, meta_round, names_of_set, precision) 

131 column_format = tables.create_column_format_for_tex(df) 

132 tables.save_to_tex(path=path, filename="station_sample_size.tex", column_format=column_format, df=df) 

133 tables.save_to_md(path=path, filename="station_sample_size.md", df=df) 

134 df_nometa = df.drop(meta_cols, axis=1) 

135 column_format = tables.create_column_format_for_tex(df) 

136 tables.save_to_tex(path=path, filename="station_sample_size_short.tex", column_format=column_format, 

137 df=df_nometa) 

138 tables.save_to_md(path=path, filename="station_sample_size_short.md", df=df_nometa) 

139 df_descr = self.create_describe_df(df_nometa) 

140 column_format = tables.create_column_format_for_tex(df_descr) 

141 tables.save_to_tex(path=path, filename="station_describe_short.tex", column_format=column_format, df=df_descr) 

142 tables.save_to_md(path=path, filename="station_describe_short.md", df=df_descr) 

143 

144 @staticmethod 

145 def create_describe_df(df, percentiles=None, ignore_last_lines: int = 2): 

146 if percentiles is None:  [146 ↛ 148] line 146 didn't jump to line 148, because the condition on line 146 was never false

147 percentiles = [.05, .1, .25, .5, .75, .9, .95] 

148 df_descr = df.iloc[:-ignore_last_lines].astype('float32').describe( 

149 percentiles=percentiles).astype("int32", errors="ignore") 

150 df_descr = pd.concat([df.loc[['# Samples']], df_descr]).T 

151 df_descr.rename(columns={"# Samples": "no. samples", "count": "no. stations"}, inplace=True) 

152 df_descr_colnames = list(df_descr.columns) 

153 df_descr_colnames = [df_descr_colnames[1]] + [df_descr_colnames[0]] + df_descr_colnames[2:] 

154 df_descr = df_descr[df_descr_colnames] 

155 return df_descr 

156 

157 def create_info_df(self, meta_cols, meta_round, names_of_set, precision): 

158 use_multiprocessing = self.data_store.get("use_multiprocessing") 

159 max_process = self.data_store.get("max_number_multiprocessing") 

160 df = pd.DataFrame(columns=meta_cols + names_of_set) 

161 for set_name in names_of_set: 

162 data = self.data_store.get("data_collection", set_name) 

163 n_process = min([psutil.cpu_count(logical=False), len(data), max_process]) # use only physical cpus 

164 if n_process > 1 and use_multiprocessing is True:  # parallel solution  [164 ↛ 165] line 164 didn't jump to line 165, because the condition on line 164 was never true

165 logging.info(f"use parallel create_info_df ({set_name})") 

166 pool = multiprocessing.Pool(n_process) 

167 logging.info(f"running {getattr(pool, '_processes')} processes in parallel") 

168 output = [pool.apply_async(f_proc_create_info_df, args=(station, meta_cols)) for station in data] 

169 for i, p in enumerate(output): 

170 res = p.get() 

171 station_name, shape, meta = res["station_name"], res["Y_shape"], res["meta"] 

172 df.loc[station_name, set_name] = shape 

173 if df.loc[station_name, meta_cols].isnull().any(): 

174 df.loc[station_name, meta_cols] = meta 

175 logging.info(f"...finished: {station_name} ({int((i + 1.) / len(output) * 100)}%)") 

176 pool.close() 

177 pool.join() 

178 else: # serial solution 

179 logging.info(f"use serial create_info_df ({set_name})") 

180 for station in data: 

181 res = f_proc_create_info_df(station, meta_cols) 

182 station_name, shape, meta = res["station_name"], res["Y_shape"], res["meta"] 

183 df.loc[station_name, set_name] = shape 

184 if df.loc[station_name, meta_cols].isnull().any(): 

185 df.loc[station_name, meta_cols] = meta 

186 df.loc["# Samples", set_name] = df.loc[:, set_name].sum() 

187 assert len(data) == df.loc[:, set_name].count() - 1 

188 df.loc["# Stations", set_name] = len(data) 

189 df[meta_round] = df[meta_round].astype(float).round(precision) 

190 df.sort_index(inplace=True) 

191 df = df.reindex(df.index.drop(["# Stations", "# Samples"]).to_list() + ["# Stations", "# Samples"], ) 

192 df.index.name = 'stat. ID' 

193 return df 

194 

195 def split_train_val_test(self) -> None: 

196 """ 

197 Split data into subsets. 

198 

199 Currently: train, val, test and train_val (the latter is simply the merge of train and val, stored as a separate 

200 data_collection). IMPORTANT: Do not change the execution order of create_set_split. The train subset must always 

201 be processed first in order to set up a proper transformation. 

202 """ 

203 fraction_of_training = self.data_store.get("fraction_of_training") 

204 stations = self.data_store.get("stations") 

205 train_index, val_index, test_index, train_val_index = self.split_set_indices(len(stations), 

206 fraction_of_training) 

207 subset_names = ["train", "val", "test", "train_val"] 

208 if subset_names[0] != "train": # pragma: no cover 

209 raise AssertionError(f"Make sure, that the train subset is always at first execution position! Given subset" 

210 f"order was: {subset_names}.") 

211 for (ind, scope) in zip([train_index, val_index, test_index, train_val_index], subset_names): 

212 self.create_set_split(ind, scope) 

213 

214 @staticmethod 

215 def split_set_indices(total_length: int, fraction: float) -> Tuple[slice, slice, slice, slice]: 

216 """ 

217 Create the training, validation and test subset slice indices for given total_length. 

218 

219 The test data consists of the last (1-fraction) share of total_length (fraction*len:end). Train and validation 

220 data are therefore taken from the first fraction of total_length (0:fraction*len) and split further by a factor 

221 of 0.8 for train and 0.2 for validation. In addition, split_set_indices also returns the combination of the 

222 training and validation subsets. 

223 

224 :param total_length: total number of objects to split 

225 :param fraction: fraction of total_length assigned to the union of train and val data (test gets the remainder) 

226 

227 :return: slices for each subset in the order: train, val, test, train_val 

228 """ 

229 pos_test_split = int(total_length * fraction) 

230 train_index = slice(0, int(pos_test_split * 0.8)) 

231 val_index = slice(int(pos_test_split * 0.8), pos_test_split) 

232 test_index = slice(pos_test_split, total_length) 

233 train_val_index = slice(0, pos_test_split) 

234 return train_index, val_index, test_index, train_val_index 
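    # Worked example (added for illustration; not part of the original module): with total_length=100
    # and fraction=0.8, the last 20 stations form the test set, while the first 80 are shared between
    # train and val in a fixed 0.8/0.2 ratio.
    #
    #   >>> PreProcessing.split_set_indices(100, 0.8)
    #   (slice(0, 64, None), slice(64, 80, None), slice(80, 100, None), slice(0, 80, None))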

235 

236 def create_set_split(self, index_list: slice, set_name: str) -> None: 

237 # get set stations 

238 stations = self.data_store.get("stations", scope=set_name) 

239 if self.data_store.get("use_all_stations_on_all_data_sets"): 

240 set_stations = stations 

241 else: 

242 set_stations = stations[index_list] 

243 logging.debug(f"{set_name.capitalize()} stations (len={len(set_stations)}): {set_stations}") 

244 # create set data_collection and store 

245 data_handler = self.data_store.get("data_handler") 

246 collection, valid_stations = self.validate_station(data_handler, set_stations, set_name) 

247 self.data_store.set("stations", valid_stations, scope=set_name) 

248 self.data_store.set("data_collection", collection, scope=set_name) 

249 

250 def validate_station(self, data_handler: AbstractDataHandler, set_stations, set_name=None, 

251 store_processed_data=True): 

252 """ 

253 Check if all given stations in `set_stations` are valid. 

254 

255 Valid means that there is data available for the given time range (which is included in `kwargs`). The shape and 

256 the loading time are logged in debug mode. 

257 

258 :return: Data collection of all valid stations and the corrected list containing only valid station IDs. 

259 """ 

260 t_outer = TimeTracking() 

261 logging.info(f"check valid stations started{' (%s)' % (set_name if set_name is not None else 'all')}") 

262 # calculate transformation using train data 

263 if set_name == "train": 

264 logging.info("setup transformation using train data exclusively") 

265 self.transformation(data_handler, set_stations) 

266 # start station check 

267 collection = DataCollection(name=set_name) 

268 valid_stations = [] 

269 kwargs = self.data_store.create_args_dict(data_handler.requirements(skip_args="station"), scope=set_name) 

270 use_multiprocessing = self.data_store.get("use_multiprocessing") 

271 tmp_path = self.data_store.get("tmp_path") 

272 

273 max_process = self.data_store.get("max_number_multiprocessing") 

274 n_process = min([psutil.cpu_count(logical=False), len(set_stations), max_process]) # use only physical cpus 

275 if n_process > 1 and use_multiprocessing is True:  # parallel solution  [275 ↛ 276] line 275 didn't jump to line 276, because the condition on line 275 was never true

276 logging.info("use parallel validate station approach") 

277 pool = multiprocessing.Pool(n_process) 

278 logging.info(f"running {getattr(pool, '_processes')} processes in parallel") 

279 kwargs.update({"tmp_path": tmp_path, "return_strategy": "reference"}) 

280 output = [ 

281 pool.apply_async(f_proc, args=(data_handler, station, set_name, store_processed_data), kwds=kwargs) 

282 for station in set_stations] 

283 for i, p in enumerate(output): 

284 _res_file, s = p.get() 

285 logging.info(f"...finished: {s} ({int((i + 1.) / len(output) * 100)}%)") 

286 with open(_res_file, "rb") as f: 

287 dh = dill.load(f) 

288 os.remove(_res_file) 

289 if dh is not None: 

290 collection.add(dh) 

291 valid_stations.append(s) 

292 pool.close() 

293 pool.join() 

294 else: # serial solution 

295 logging.info("use serial validate station approach") 

296 kwargs.update({"return_strategy": "result"}) 

297 for station in set_stations: 

298 dh, s = f_proc(data_handler, station, set_name, store_processed_data, **kwargs) 

299 if dh is not None: 

300 collection.add(dh) 

301 valid_stations.append(s) 

302 

303 logging.info(f"run for {t_outer} to check {len(set_stations)} station(s). Found {len(collection)}/" 

304 f"{len(set_stations)} valid stations ({set_name}).") 

305 if set_name == "train": 

306 self.store_data_handler_attributes(data_handler, collection) 

307 return collection, valid_stations 
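    # Call sketch (added for illustration; the station IDs are placeholders): validate_station returns
    # both the DataCollection of successfully built data handlers and the corrected station list, so a
    # station that raises during build is silently dropped from the subset.
    #
    #   >>> collection, valid_stations = self.validate_station(data_handler, ["DEBW013", "DEBW999"], "train")
    #   >>> valid_stations        # only stations whose data handler could be built
    #   ['DEBW013']
    #   >>> len(collection)       # one data handler per valid station
    #   1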

308 

309 def store_data_handler_attributes(self, data_handler, collection): 

310 store_attributes = data_handler.store_attributes() 

311 if len(store_attributes) > 0:  [311 ↛ 312] line 311 didn't jump to line 312, because the condition on line 311 was never true

312 logging.info(f"store following parameters ({len(store_attributes)}) requested by the data handler: " 

313 f"{','.join(store_attributes)}") 

314 attrs = {} 

315 for dh in collection: 

316 station = str(dh) 

317 for k, v in dh.get_store_attributes().items(): 

318 attrs[k] = dict(attrs.get(k, {}), **{station: v}) 

319 for k, v in attrs.items(): 

320 self.data_store.set(k, v) 

321 

322 def transformation(self, data_handler: AbstractDataHandler, stations): 

323 calculate_fresh_transformation = self.data_store.get_default("calculate_fresh_transformation", True) 

324 if hasattr(data_handler, "transformation"): 

325 transformation_opts = None if calculate_fresh_transformation is True else self._load_transformation() 

326 if transformation_opts is None:  [326 ↛ 332] line 326 didn't jump to line 332, because the condition on line 326 was never false

327 logging.info(f"start to calculate transformation parameters.") 

328 kwargs = self.data_store.create_args_dict(data_handler.requirements(skip_args="station"), scope="train") 

329 tmp_path = self.data_store.get_default("tmp_path", default=None) 

330 transformation_opts = data_handler.transformation(stations, tmp_path=tmp_path, **kwargs) 

331 else: 

332 logging.info("In case no valid train data could be found due to problems with transformation, please " 

333 "check your provided transformation file for compability with your data.") 

334 self.data_store.set("transformation", transformation_opts) 

335 if transformation_opts is not None: 

336 self._store_transformation(transformation_opts) 

337 

338 def _load_transformation(self): 

339 """Try to load transformation options from file if transformation_file is provided.""" 

340 transformation_file = self.data_store.get_default("transformation_file", None) 

341 if transformation_file is not None: 

342 if os.path.exists(transformation_file): 

343 logging.info(f"use transformation from given transformation file: {transformation_file}") 

344 with open(transformation_file, "rb") as pickle_file: 

345 return dill.load(pickle_file) 

346 else: 

347 logging.info(f"cannot load transformation file: {transformation_file}. Use fresh calculation of " 

348 f"transformation from train data.") 

349 

350 def _store_transformation(self, transformation_opts): 

351 """Store transformation options locally inside experiment_path if not exists already.""" 

352 experiment_path = self.data_store.get("experiment_path") 

353 transformation_path = os.path.join(experiment_path, "data", "transformation") 

354 transformation_file = os.path.join(transformation_path, "transformation.pickle") 

355 calculate_fresh_transformation = self.data_store.get_default("calculate_fresh_transformation", True) 

356 if not os.path.exists(transformation_file) or calculate_fresh_transformation:  [356 ↛ exit] line 356 didn't return from function '_store_transformation', because the condition on line 356 was never false

357 path_config.check_path_and_create(transformation_path) 

358 with open(transformation_file, "wb") as f: 

359 dill.dump(transformation_opts, f, protocol=4) 

360 logging.info(f"Store transformation options locally for later use at: {transformation_file}") 

361 

362 def prepare_competitors(self): 

363 """ 

364 Prepare competitor models already in the pre-processing stage. This is done here because some models might need 

365 internet access, which, depending on the operating system, is not available during post-processing. Currently, 

366 this method only checks whether the IntelliO3-ts-v1 or CAMS competitor is requested and downloads the required 

367 data. 

368 """ 

369 logging.info("Searching for competitors to be prepared for use.") 

370 competitors = to_list(self.data_store.get_default("competitors", default=[])) 

371 if len(competitors) > 0:  [371 ↛ 392] line 371 didn't jump to line 392, because the condition on line 371 was never false

372 for competitor_name in competitors: 

373 if competitor_name.lower() == "IntelliO3-ts-v1".lower():  [373 ↛ 374] line 373 didn't jump to line 374, because the condition on line 373 was never true

374 logging.info("Prepare IntelliO3-ts-v1 model") 

375 from mlair.reference_models.reference_model_intellio3_v1 import IntelliO3_ts_v1 

376 path = os.path.join(self.data_store.get("competitor_path"), competitor_name) 

377 IntelliO3_ts_v1("IntelliO3-ts-v1", ref_store_path=path).make_reference_available_locally(remove_tmp_dir=False) 

378 elif competitor_name.lower() == "CAMS".lower():  [378 ↛ 379] line 378 didn't jump to line 379, because the condition on line 378 was never true

379 logging.info("Prepare CAMS forecasts") 

380 from mlair.reference_models.reference_model_cams import CAMSforecast 

381 data_path = self.data_store.get_default("cams_data_path", default=None) 

382 path = os.path.join(self.data_store.get("competitor_path"), competitor_name) 

383 stations = {} 

384 for subset in ["train", "val", "test"]: 

385 data_collection = self.data_store.get("data_collection", subset) 

386 stations.update({str(s): s.get_coordinates() for s in data_collection if s not in stations}) 

387 CAMSforecast("CAMS", ref_store_path=path, data_path=data_path).make_reference_available_locally(stations) 

388 else: 

389 logging.info(f"No preparation required for competitor {competitor_name} as no specific instruction " 

390 f"is provided.") 

391 else: 

392 logging.info("No preparation required because no competitor was provided to the workflow.") 

393 

394 def create_snapshot(self): 

395 logging.info("create snapshot for preprocessing") 

396 from mlair.configuration.snapshot_names import animals 

397 for i_try in range(10): 

398 snapshot_name = random.choice(animals).lower() 

399 snapshot_path = os.path.abspath(self.data_store.get("snapshot_path")) 

400 path_config.check_path_and_create(snapshot_path, remove_existing=False) 

401 _snapshot_file = os.path.join(snapshot_path, f"snapshot_preprocessing_{snapshot_name}.pickle") 

402 if not os.path.exists(_snapshot_file): 

403 logging.info(f"store snapshot at: {_snapshot_file}") 

404 with open(_snapshot_file, "wb") as f: 

405 dill.dump(self.data_store, f, protocol=4) 

406 print(_snapshot_file) 

407 return 

408 logging.info(f"Could not create snapshot at {_snapshot_file} as file is already existing ({i_try + 1}/10)") 

409 logging.info(f"Could not create any snapshot after 10/10 tries.") 

410 

411 def load_snapshot(self, file): 

412 logging.info(f"load snapshot for preprocessing from {file}") 

413 with open(file, "rb") as f: 

414 snapshot = dill.load(f) 

415 excluded_params = ["activation", "activation_output", "add_dense_layer", "batch_normalization", "batch_path", 

416 "batch_size", "block_length", "bootstrap_method", "bootstrap_path", "bootstrap_type", 

417 "competitor_path", "competitors", "create_new_bootstraps", "create_new_model", 

418 "create_snapshot", "data_collection", "debug_mode", "dense_layer_configuration", 

419 "do_uncertainty_estimate", "dropout", "dropout_rnn", "early_stopping_epochs", "epochs", 

420 "evaluate_competitors", "evaluate_feature_importance", "experiment_name", "experiment_path", 

421 "exponent_last_layer", "forecast_path", "fraction_of_training", "hostname", "hpc_hosts", 

422 "kernel_regularizer", "kernel_size", "layer_configuration", "log_level_stream", 

423 "logging_path", "login_nodes", "loss_type", "loss_weights", "max_number_multiprocessing", 

424 "model_class", "model_display_name", "model_path", "n_boots", "n_hidden", "n_layer", 

425 "neighbors", "plot_list", "plot_path", "regularizer", "restore_best_model_weights", 

426 "snapshot_load_path", "snapshot_path", "stations", "tmp_path", "train_model", 

427 "transformation", "use_multiprocessing", ] 

428 

429 data_handler = self.data_store.get("data_handler") 

430 model_class = self.data_store.get("model_class") 

431 excluded_params = list(set(excluded_params + data_handler.store_attributes() + model_class.requirements())) 

432 

433 if check_nested_equality(self.data_store._store, snapshot._store, skip_args=excluded_params) is True: 

434 self.update_datastore(snapshot, excluded_params=remove_items(excluded_params, ["transformation", 

435 "data_collection", 

436 "stations"])) 

437 else: 

438 raise ReferenceError("provided snapshot does not match with the current experiment setup. Abort this run!") 

439 

440 

441def f_proc(data_handler, station, name_affix, store, return_strategy="", tmp_path=None, **kwargs): 

442 """ 

443 Try to create a data handler for the given arguments. If the build fails, this station does not fulfil all 

444 requirements and f_proc therefore returns None as an indication. On a successful build, f_proc returns the built 

445 data handler and the station that was used. This function must be defined at module level to work with multiprocessing. 

446 """ 

447 assert return_strategy in ["result", "reference"] 

448 try: 

449 res = data_handler.build(station, name_affix=name_affix, store_processed_data=store, **kwargs) 

450 except (AttributeError, EmptyQueryResult, KeyError, requests.ConnectionError, ValueError, IndexError) as e: 

451 formatted_lines = traceback.format_exc().splitlines() 

452 logging.info(f"remove station {station} because it raised an error: {e} -> " 

453 f"{' | '.join(f_inspect_error(formatted_lines))}") 

454 logging.debug(f"detailed information for removal of station {station}: {traceback.format_exc()}") 

455 res = None 

456 if return_strategy == "result":  [456 ↛ 459] line 456 didn't jump to line 459, because the condition on line 456 was never false

457 return res, station 

458 else: 

459 if tmp_path is None: 

460 tmp_path = os.getcwd() 

461 _tmp_file = os.path.join(tmp_path, f"{station}_{'%032x' % random.getrandbits(128)}.pickle") 

462 with open(_tmp_file, "wb") as f: 

463 dill.dump(res, f, protocol=4) 

464 return _tmp_file, station 
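# Sketch of the two return strategies (added for illustration; data_handler and the station ID are
# placeholders). With "result" the built data handler is returned directly; with "reference" it is
# pickled to tmp_path and only the file path travels back through the multiprocessing pipe.
#
#   >>> dh, station = f_proc(data_handler, "DEBW013", "train", True, return_strategy="result")
#   >>> res_file, station = f_proc(data_handler, "DEBW013", "train", True,
#   ...                            return_strategy="reference", tmp_path="/tmp")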

465 

466 

467def f_proc_create_info_df(data, meta_cols): 

468 station_name = str(data.id_class) 

469 meta = data.id_class.meta 

470 res = {"station_name": station_name, "Y_shape": data.get_Y()[0].shape[0], 

471 "meta": meta.reindex(meta_cols).values.flatten()} 

472 return res 

473 

474 

475def f_inspect_error(formatted): 

476 for i in range(len(formatted) - 1, -1, -1):  [476 ↛ 479] line 476 didn't jump to line 479, because the loop on line 476 didn't complete

477 if "mlair/mlair" not in formatted[i]: 477 ↛ 476line 477 didn't jump to line 476, because the condition on line 477 was never false

478 return formatted[i - 3:i] 

479 return formatted[-3:]