Coverage for mlair/data_handler/data_handler_mixed_sampling.py: 29%

303 statements  

« prev     ^ index     » next       coverage.py v6.4.2, created at 2023-06-30 10:40 +0000

1__author__ = 'Lukas Leufen' 

2__date__ = '2020-11-05' 

3 

4from mlair.data_handler.data_handler_single_station import DataHandlerSingleStation 

5from mlair.data_handler.data_handler_with_filter import DataHandlerFirFilterSingleStation, \ 

6 DataHandlerFilterSingleStation, DataHandlerClimateFirFilterSingleStation 

7from mlair.data_handler.data_handler_with_filter import DataHandlerClimateFirFilter, DataHandlerFirFilter 

8from mlair.data_handler import DefaultDataHandler 

9from mlair import helpers 

10from mlair.helpers import to_list, sort_like 

11from mlair.configuration.defaults import DEFAULT_SAMPLING, DEFAULT_INTERPOLATION_LIMIT, DEFAULT_INTERPOLATION_METHOD 

12from mlair.helpers.filter import filter_width_kzf 

13 

14import copy 

15import datetime as dt 

16from typing import Any 

17from functools import partial 

18 

19import pandas as pd 

20import xarray as xr 

21 

22 

23class DataHandlerMixedSamplingSingleStation(DataHandlerSingleStation): 

24 

25 def __init__(self, *args, **kwargs): 

26 """ 

27 This data handler requires the kwargs sampling, interpolation_limit, and interpolation_method to be a 2D tuple 

28 for input and target data. If one of these kwargs is only a single argument, it will be applied to inputs and 

29 targets with this value. If one of these kwargs is a 2-dim tuple, the first element is applied to inputs and the 

30 second to targets respectively. If one of these kwargs is not provided, it is filled up with the same default 

31 value for inputs and targets. 

32 """ 

33 self.update_kwargs("sampling", DEFAULT_SAMPLING, kwargs) 

34 self.update_kwargs("interpolation_limit", DEFAULT_INTERPOLATION_LIMIT, kwargs) 

35 self.update_kwargs("interpolation_method", DEFAULT_INTERPOLATION_METHOD, kwargs) 

36 super().__init__(*args, **kwargs) 

37 

38 @staticmethod 

39 def update_kwargs(parameter_name: str, default: Any, kwargs: dict): 

40 """ 

41 Update a single element of kwargs inplace to be usable for inputs and targets. 

42 

43 The updated value in the kwargs dictionary is a tuple consisting on the value applicable to the inputs as first 

44 element and the target's value as second element: (<value_input>, <value_target>). If the value for the given 

45 parameter_name is already a tuple, it is checked to have exact two entries. If the paramter_name is not 

46 included in kwargs, the given default value is used and applied to both elements of the update tuple. 

47 

48 :param parameter_name: name of the parameter that should be transformed to 2-dim 

49 :param default: the default value to fill if parameter is not in kwargs 

50 :param kwargs: the kwargs dictionary containing parameters 

51 """ 

52 parameter = kwargs.get(parameter_name, default) 

53 if not isinstance(parameter, tuple): 

54 parameter = (parameter, parameter) 

55 assert len(parameter) == 2 # (inputs, targets) 

56 kwargs.update({parameter_name: parameter}) 

57 

58 def make_input_target(self): 

59 self._data = tuple(map(self.load_and_interpolate, [0, 1])) # load input (0) and target (1) data 

60 self.set_inputs_and_targets() 

61 

62 def load_and_interpolate(self, ind) -> [xr.DataArray, pd.DataFrame]: 

63 vars = [self.variables, self.target_var] 

64 stats_per_var = helpers.select_from_dict(self.statistics_per_var, vars[ind]) 

65 data_origin = helpers.select_from_dict(self.data_origin, vars[ind]) 

66 data, self.meta = self.load_data(self.path[ind], self.station, stats_per_var, self.sampling[ind], 

67 self.store_data_locally, data_origin, self.start, self.end) 

68 data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method[ind], 

69 limit=self.interpolation_limit[ind], sampling=self.sampling[ind]) 

70 

71 return data 

72 

73 def set_inputs_and_targets(self): 

74 self.input_data = self._data[0].sel({self.target_dim: helpers.to_list(self.variables)}) 

75 self.target_data = self._data[1].sel({self.target_dim: helpers.to_list(self.target_var)}) 

76 

77 def setup_data_path(self, data_path, sampling): 

78 """Sets two paths instead of single path. Expects sampling arg to be a list with two entries""" 

79 assert len(sampling) == 2 

80 return list(map(lambda x: super(__class__, self).setup_data_path(data_path, x), sampling)) 

81 

82 def _extract_lazy(self, lazy_data): 

83 _data, self.meta, _input_data, _target_data = lazy_data 

84 f_prep = partial(self._slice_prep, start=self.start, end=self.end) 

85 self._data = f_prep(_data[0]), f_prep(_data[1]) 

86 self.input_data, self.target_data = list(map(f_prep, [_input_data, _target_data])) 

87 

88 

89class DataHandlerMixedSampling(DefaultDataHandler): 

90 """Data handler using mixed sampling for input and target.""" 

91 

92 data_handler = DataHandlerMixedSamplingSingleStation 

93 data_handler_transformation = DataHandlerMixedSamplingSingleStation 

94 _requirements = data_handler.requirements() 

95 

96 

97class DataHandlerMixedSamplingWithFilterSingleStation(DataHandlerMixedSamplingSingleStation, 

98 DataHandlerFilterSingleStation): 

99 

100 def __init__(self, *args, **kwargs): 

101 super().__init__(*args, **kwargs) 

102 

103 def _check_sampling(self, **kwargs): 

104 assert kwargs.get("sampling") == ("hourly", "daily") 

105 

106 def apply_filter(self): 

107 raise NotImplementedError 

108 

109 def create_filter_index(self) -> pd.Index: 

110 """Create name for filter dimension.""" 

111 raise NotImplementedError 

112 

113 def _create_lazy_data(self): 

114 raise NotImplementedError 

115 

116 def make_input_target(self): 

117 """ 

118 A FIR filter is applied on the input data that has hourly resolution. Labels Y are provided as aggregated values 

119 with daily resolution. 

120 """ 

121 self._data = tuple(map(self.load_and_interpolate, [0, 1])) # load input (0) and target (1) data 

122 self.set_inputs_and_targets() 

123 self.apply_filter() 

124 

125 def estimate_filter_width(self): 

126 """Return maximum filter width.""" 

127 raise NotImplementedError 

128 

129 @staticmethod 

130 def _add_time_delta(date, delta): 

131 new_date = dt.datetime.strptime(date, "%Y-%m-%d") + dt.timedelta(hours=delta) 

132 return new_date.strftime("%Y-%m-%d") 

133 

134 def update_start_end(self, ind): 

135 if ind == 0: # for inputs 

136 estimated_filter_width = self.estimate_filter_width() 

137 start = self._add_time_delta(self.start, -estimated_filter_width) 

138 end = self._add_time_delta(self.end, estimated_filter_width) 

139 else: # target 

140 start, end = self.start, self.end 

141 return start, end 

142 

143 def load_and_interpolate(self, ind) -> [xr.DataArray, pd.DataFrame]: 

144 

145 start, end = self.update_start_end(ind) 

146 vars = [self.variables, self.target_var] 

147 stats_per_var = helpers.select_from_dict(self.statistics_per_var, vars[ind]) 

148 data_origin = helpers.select_from_dict(self.data_origin, vars[ind]) 

149 

150 data, self.meta = self.load_data(self.path[ind], self.station, stats_per_var, self.sampling[ind], 

151 self.store_data_locally, data_origin, start, end) 

152 data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method[ind], 

153 limit=self.interpolation_limit[ind], sampling=self.sampling[ind]) 

154 return data 

155 

156 def _extract_lazy(self, lazy_data): 

157 _data, self.meta, _input_data, _target_data = lazy_data 

158 start_inp, end_inp = self.update_start_end(0) 

159 self._data = tuple(map(lambda x: self._slice_prep(_data[x], *self.update_start_end(x)), [0, 1])) 

160 self.input_data = self._slice_prep(_input_data, start_inp, end_inp) 

161 self.target_data = self._slice_prep(_target_data, self.start, self.end) 

162 

163 

164class DataHandlerMixedSamplingWithFirFilterSingleStation(DataHandlerMixedSamplingWithFilterSingleStation, 

165 DataHandlerFirFilterSingleStation): 

166 

167 def __init__(self, *args, **kwargs): 

168 super().__init__(*args, **kwargs) 

169 

170 def estimate_filter_width(self): 

171 """Filter width is determined by the filter with the highest order.""" 

172 if isinstance(self.filter_order[0], tuple): 

173 return max([filter_width_kzf(*e) for e in self.filter_order]) 

174 else: 

175 return max(self.filter_order) 

176 

177 def apply_filter(self): 

178 DataHandlerFirFilterSingleStation.apply_filter(self) 

179 

180 def create_filter_index(self, add_unfiltered_index=True) -> pd.Index: 

181 return DataHandlerFirFilterSingleStation.create_filter_index(self, add_unfiltered_index=add_unfiltered_index) 

182 

183 def _extract_lazy(self, lazy_data): 

184 _data, _meta, _input_data, _target_data, self.fir_coeff, self.filter_dim_order = lazy_data 

185 DataHandlerMixedSamplingWithFilterSingleStation._extract_lazy(self, (_data, _meta, _input_data, _target_data)) 

186 

187 def _create_lazy_data(self): 

188 return DataHandlerFirFilterSingleStation._create_lazy_data(self) 

189 

190 @staticmethod 

191 def _get_fs(**kwargs): 

192 """Return frequency in 1/day (not Hz)""" 

193 sampling = kwargs.get("sampling")[0] 

194 if sampling == "daily": 

195 return 1 

196 elif sampling == "hourly": 

197 return 24 

198 else: 

199 raise ValueError(f"Unknown sampling rate {sampling}. Only daily and hourly resolution is supported.") 

200 

201 

202class DataHandlerMixedSamplingWithFirFilter(DataHandlerFirFilter): 

203 """Data handler using mixed sampling for input and target. Inputs are temporal filtered.""" 

204 

205 data_handler = DataHandlerMixedSamplingWithFirFilterSingleStation 

206 data_handler_transformation = DataHandlerMixedSamplingWithFirFilterSingleStation 

207 _requirements = data_handler.requirements() 

208 

209 

210class DataHandlerMixedSamplingWithClimateFirFilterSingleStation(DataHandlerClimateFirFilterSingleStation, 

211 DataHandlerMixedSamplingWithFirFilterSingleStation): 

212 

213 def __init__(self, *args, **kwargs): 

214 super().__init__(*args, **kwargs) 

215 

216 def _extract_lazy(self, lazy_data): 

217 _data, _meta, _input_data, _target_data, self.climate_filter_coeff, self.apriori, self.all_apriori, \ 

218 self.filter_dim_order = lazy_data 

219 DataHandlerMixedSamplingWithFilterSingleStation._extract_lazy(self, (_data, _meta, _input_data, _target_data)) 

220 

221 

222class DataHandlerMixedSamplingWithClimateFirFilter(DataHandlerClimateFirFilter): 

223 """Data handler using mixed sampling for input and target. Inputs are temporal filtered.""" 

224 

225 data_handler = DataHandlerMixedSamplingWithClimateFirFilterSingleStation 

226 data_handler_transformation = DataHandlerMixedSamplingWithClimateFirFilterSingleStation 

227 data_handler_unfiltered = DataHandlerMixedSamplingSingleStation 

228 _requirements = list(set(data_handler.requirements() + data_handler_unfiltered.requirements())) 

229 DEFAULT_FILTER_ADD_UNFILTERED = False 

230 

231 def __init__(self, *args, data_handler_class_unfiltered: data_handler_unfiltered = None, 

232 filter_add_unfiltered: bool = DEFAULT_FILTER_ADD_UNFILTERED, **kwargs): 

233 self.dh_unfiltered = data_handler_class_unfiltered 

234 self.filter_add_unfiltered = filter_add_unfiltered 

235 super().__init__(*args, **kwargs) 

236 

237 def _create_collection(self): 

238 collection = super()._create_collection() 

239 if self.filter_add_unfiltered is True and self.dh_unfiltered is not None: 

240 collection.append(self.dh_unfiltered) 

241 return collection 

242 

243 @classmethod 

244 def build(cls, station: str, **kwargs): 

245 sp_keys = {k: copy.deepcopy(kwargs[k]) for k in cls.data_handler.requirements() if k in kwargs} 

246 filter_add_unfiltered = kwargs.get("filter_add_unfiltered", False) 

247 sp_keys = cls.build_update_transformation(sp_keys, dh_type="filtered") 

248 sp = cls.data_handler(station, **sp_keys) 

249 if filter_add_unfiltered is True: 

250 sp_keys = {k: copy.deepcopy(kwargs[k]) for k in cls.data_handler_unfiltered.requirements() if k in kwargs} 

251 sp_keys = cls.build_update_transformation(sp_keys, dh_type="unfiltered") 

252 sp_unfiltered = cls.data_handler_unfiltered(station, **sp_keys) 

253 else: 

254 sp_unfiltered = None 

255 dp_args = {k: copy.deepcopy(kwargs[k]) for k in cls.own_args("id_class") if k in kwargs} 

256 return cls(sp, data_handler_class_unfiltered=sp_unfiltered, **dp_args) 

257 

258 @classmethod 

259 def build_update_transformation(cls, kwargs_dict, dh_type="filtered"): 

260 if "transformation" in kwargs_dict: 

261 trafo_opts = kwargs_dict.get("transformation") 

262 if isinstance(trafo_opts, dict): 

263 kwargs_dict["transformation"] = trafo_opts.get(dh_type) 

264 return kwargs_dict 

265 

266 @classmethod 

267 def transformation(cls, set_stations, tmp_path=None, dh_transformation=None, **kwargs): 

268 

269 # sp_keys = {k: copy.deepcopy(kwargs[k]) for k in cls._requirements if k in kwargs} 

270 if "transformation" not in kwargs.keys(): 

271 return 

272 

273 if dh_transformation is None: 

274 dh_transformation = (cls.data_handler_transformation, cls.data_handler_unfiltered) 

275 elif not isinstance(dh_transformation, tuple): 

276 dh_transformation = (dh_transformation, dh_transformation) 

277 transformation_filtered = super().transformation(set_stations, tmp_path=tmp_path, 

278 dh_transformation=dh_transformation[0], **kwargs) 

279 if kwargs.get("filter_add_unfiltered", False) is False: 

280 return transformation_filtered 

281 else: 

282 transformation_unfiltered = super().transformation(set_stations, tmp_path=tmp_path, 

283 dh_transformation=dh_transformation[1], **kwargs) 

284 return {"filtered": transformation_filtered, "unfiltered": transformation_unfiltered} 

285 

286 

287class DataHandlerMixedSamplingWithClimateAndFirFilter(DataHandlerMixedSamplingWithClimateFirFilter): 

288 # data_handler = DataHandlerMixedSamplingWithClimateFirFilterSingleStation 

289 # data_handler_transformation = DataHandlerMixedSamplingWithClimateFirFilterSingleStation 

290 # data_handler_unfiltered = DataHandlerMixedSamplingSingleStation 

291 # _requirements = list(set(data_handler.requirements() + data_handler_unfiltered.requirements())) 

292 # DEFAULT_FILTER_ADD_UNFILTERED = False 

293 data_handler_climate_fir = DataHandlerMixedSamplingWithClimateFirFilterSingleStation 

294 data_handler_fir = (DataHandlerMixedSamplingWithFirFilterSingleStation, 

295 DataHandlerMixedSamplingWithClimateFirFilterSingleStation) 

296 data_handler_fir_pos = None 

297 data_handler = None 

298 data_handler_unfiltered = DataHandlerMixedSamplingSingleStation 

299 _requirements = list(set(data_handler_climate_fir.requirements() + data_handler_fir[0].requirements() + 

300 data_handler_fir[1].requirements() + data_handler_unfiltered.requirements())) 

301 chem_indicator = "chem" 

302 meteo_indicator = "meteo" 

303 

304 def __init__(self, data_handler_class_chem, data_handler_class_meteo, data_handler_class_chem_unfiltered, 

305 data_handler_class_meteo_unfiltered, chem_vars, meteo_vars, *args, **kwargs): 

306 

307 if len(chem_vars) > 0: 

308 id_class, id_class_unfiltered = data_handler_class_chem, data_handler_class_chem_unfiltered 

309 self.id_class_other = data_handler_class_meteo 

310 self.id_class_other_unfiltered = data_handler_class_meteo_unfiltered 

311 else: 

312 id_class, id_class_unfiltered = data_handler_class_meteo, data_handler_class_meteo_unfiltered 

313 self.id_class_other = data_handler_class_chem 

314 self.id_class_other_unfiltered = data_handler_class_chem_unfiltered 

315 super().__init__(id_class, *args, data_handler_class_unfiltered=id_class_unfiltered, **kwargs) 

316 

317 @classmethod 

318 def _split_chem_and_meteo_variables(cls, **kwargs): 

319 """ 

320 Select all used variables and split them into categories chem and other. 

321 

322 Chemical variables are indicated by `cls.data_handler_climate_fir.chem_vars`. To indicate used variables, this 

323 method uses 1) parameter `variables`, 2) keys from `statistics_per_var`, 3) keys from 

324 `cls.data_handler_climate_fir.DEFAULT_VAR_ALL_DICT`. Option 3) is also applied if 1) or 2) are given but None. 

325 """ 

326 if "variables" in kwargs: 

327 variables = kwargs.get("variables") 

328 elif "statistics_per_var" in kwargs: 

329 variables = kwargs.get("statistics_per_var").keys() 

330 else: 

331 variables = None 

332 if variables is None: 

333 variables = cls.data_handler_climate_fir.DEFAULT_VAR_ALL_DICT.keys() 

334 chem_vars = cls.data_handler_climate_fir.chem_vars 

335 chem = set(variables).intersection(chem_vars) 

336 meteo = set(variables).difference(chem_vars) 

337 return sort_like(to_list(chem), variables), sort_like(to_list(meteo), variables) 

338 

339 @classmethod 

340 def build(cls, station: str, **kwargs): 

341 chem_vars, meteo_vars = cls._split_chem_and_meteo_variables(**kwargs) 

342 filter_add_unfiltered = kwargs.get("filter_add_unfiltered", False) 

343 sp_chem, sp_chem_unfiltered = None, None 

344 sp_meteo, sp_meteo_unfiltered = None, None 

345 

346 if len(chem_vars) > 0: 

347 sp_keys = {k: copy.deepcopy(kwargs[k]) for k in cls.data_handler_climate_fir.requirements() if k in kwargs} 

348 sp_keys = cls.build_update_transformation(sp_keys, dh_type="filtered_chem") 

349 

350 cls.prepare_build(sp_keys, chem_vars, cls.chem_indicator) 

351 sp_chem = cls.data_handler_climate_fir(station, **sp_keys) 

352 if filter_add_unfiltered is True: 

353 sp_keys = {k: copy.deepcopy(kwargs[k]) for k in cls.data_handler_unfiltered.requirements() if k in kwargs} 

354 sp_keys = cls.build_update_transformation(sp_keys, dh_type="unfiltered_chem") 

355 cls.prepare_build(sp_keys, chem_vars, cls.chem_indicator) 

356 cls.correct_overwrite_option(sp_keys) 

357 sp_chem_unfiltered = cls.data_handler_unfiltered(station, **sp_keys) 

358 if len(meteo_vars) > 0: 

359 cls.set_data_handler_fir_pos(**kwargs) 

360 sp_keys = {k: copy.deepcopy(kwargs[k]) for k in cls.data_handler_fir[cls.data_handler_fir_pos].requirements() if k in kwargs} 

361 sp_keys = cls.build_update_transformation(sp_keys, dh_type="filtered_meteo") 

362 cls.prepare_build(sp_keys, meteo_vars, cls.meteo_indicator) 

363 sp_meteo = cls.data_handler_fir[cls.data_handler_fir_pos](station, **sp_keys) 

364 if filter_add_unfiltered is True: 

365 sp_keys = {k: copy.deepcopy(kwargs[k]) for k in cls.data_handler_unfiltered.requirements() if k in kwargs} 

366 sp_keys = cls.build_update_transformation(sp_keys, dh_type="unfiltered_meteo") 

367 cls.prepare_build(sp_keys, meteo_vars, cls.meteo_indicator) 

368 cls.correct_overwrite_option(sp_keys) 

369 sp_meteo_unfiltered = cls.data_handler_unfiltered(station, **sp_keys) 

370 

371 dp_args = {k: copy.deepcopy(kwargs[k]) for k in cls.own_args("id_class") if k in kwargs} 

372 return cls(sp_chem, sp_meteo, sp_chem_unfiltered, sp_meteo_unfiltered, chem_vars, meteo_vars, **dp_args) 

373 

374 @classmethod 

375 def correct_overwrite_option(cls, kwargs): 

376 """Set `overwrite_local_data=False`.""" 

377 if "overwrite_local_data" in kwargs: 

378 kwargs["overwrite_local_data"] = False 

379 

380 @classmethod 

381 def set_data_handler_fir_pos(cls, **kwargs): 

382 """ 

383 Set position of fir data handler to use either faster FIR version or slower climate FIR. 

384 

385 This method will set data handler indicator to 0 if either no parameter "extend_length_opts" is given or the 

386 parameter is of type dict but has no entry for the meteo_indicator. In all other cases, indicator is set to 1. 

387 """ 

388 p_name = "extend_length_opts" 

389 if cls.data_handler_fir_pos is None: 

390 if p_name in kwargs: 

391 if isinstance(kwargs[p_name], dict) and cls.meteo_indicator not in kwargs[p_name].keys(): 

392 cls.data_handler_fir_pos = 0 # use faster fir version without climate estimate 

393 else: 

394 cls.data_handler_fir_pos = 1 # use slower fir version with climate estimate 

395 else: 

396 cls.data_handler_fir_pos = 0 # use faster fir version without climate estimate 

397 

398 @classmethod 

399 def prepare_build(cls, kwargs, var_list, var_type): 

400 """ 

401 Prepares for build of class. 

402 

403 `variables` parameter is updated by `var_list`, which should only include variables of a specific type (e.g. 

404 only chemical variables) indicated by `var_type`. Furthermore, this method cleans the `kwargs` dictionary as 

405 follows: For all parameters provided as dict to separate between chem and meteo options (dict must have keys 

406 from `cls.chem_indicator` and/or `cls.meteo_indicator`), this parameter is removed from kwargs and its value 

407 related to `var_type` added again. In case there is no value for given `var_type`, the parameter is not added 

408 at all (as this parameter is assumed to affect only other types of variables). 

409 """ 

410 kwargs.update({"variables": var_list}) 

411 for k in list(kwargs.keys()): 

412 v = kwargs[k] 

413 if isinstance(v, dict): 

414 if len(set(v.keys()).intersection({cls.chem_indicator, cls.meteo_indicator})) > 0: 

415 try: 

416 new_v = kwargs.pop(k) 

417 kwargs[k] = new_v[var_type] 

418 except KeyError: 

419 pass 

420 

421 def _create_collection(self): 

422 collection = super()._create_collection() 

423 if self.id_class_other is not None: 

424 collection.append(self.id_class_other) 

425 if self.filter_add_unfiltered is True and self.id_class_other_unfiltered is not None: 

426 collection.append(self.id_class_other_unfiltered) 

427 return collection 

428 

429 @classmethod 

430 def transformation(cls, set_stations, tmp_path=None, **kwargs): 

431 

432 if "transformation" not in kwargs.keys(): 

433 return 

434 

435 chem_vars, meteo_vars = cls._split_chem_and_meteo_variables(**kwargs) 

436 transformation_chem, transformation_meteo = None, None 

437 # chem transformation 

438 if len(chem_vars) > 0: 

439 kwargs_chem = copy.deepcopy(kwargs) 

440 cls.prepare_build(kwargs_chem, chem_vars, cls.chem_indicator) 

441 dh_transformation = (cls.data_handler_climate_fir, cls.data_handler_unfiltered) 

442 transformation_chem = super().transformation(set_stations, tmp_path=tmp_path, 

443 dh_transformation=dh_transformation, **kwargs_chem) 

444 

445 # meteo transformation 

446 if len(meteo_vars) > 0: 

447 cls.set_data_handler_fir_pos(**kwargs) 

448 kwargs_meteo = copy.deepcopy(kwargs) 

449 cls.prepare_build(kwargs_meteo, meteo_vars, cls.meteo_indicator) 

450 dh_transformation = (cls.data_handler_fir[cls.data_handler_fir_pos], cls.data_handler_unfiltered) 

451 transformation_meteo = super().transformation(set_stations, tmp_path=tmp_path, 

452 dh_transformation=dh_transformation, **kwargs_meteo) 

453 

454 # combine all transformations 

455 transformation_res = {} 

456 if transformation_chem is not None: 

457 if isinstance(transformation_chem, dict): 

458 if len(transformation_chem) > 0: 

459 transformation_res["filtered_chem"] = transformation_chem.pop("filtered") 

460 transformation_res["unfiltered_chem"] = transformation_chem.pop("unfiltered") 

461 else: # if no unfiltered chem branch 

462 transformation_res["filtered_chem"] = transformation_chem 

463 if transformation_meteo is not None: 

464 if isinstance(transformation_meteo, dict): 

465 if len(transformation_meteo) > 0: 

466 transformation_res["filtered_meteo"] = transformation_meteo.pop("filtered") 

467 transformation_res["unfiltered_meteo"] = transformation_meteo.pop("unfiltered") 

468 else: # if no unfiltered meteo branch 

469 transformation_res["filtered_meteo"] = transformation_meteo 

470 return transformation_res if len(transformation_res) > 0 else None 

471 

472 

473class DataHandlerIFSSingleStation(DataHandlerMixedSamplingWithClimateFirFilterSingleStation): 

474 

475 def load_and_interpolate(self, ind) -> [xr.DataArray, pd.DataFrame]: 

476 start, end = self.update_start_end(ind) 

477 vars = [self.variables, self.target_var] 

478 stats_per_var = helpers.select_from_dict(self.statistics_per_var, vars[ind]) 

479 data_origin = helpers.select_from_dict(self.data_origin, vars[ind]) 

480 

481 data, self.meta = self.load_data(self.path[ind], self.station, stats_per_var, self.sampling[ind], 

482 self.store_data_locally, data_origin, start, end) 

483 if ind == 1: # only for target 

484 data = self.interpolate(data, dim=self.time_dim, method=self.interpolation_method[ind], 

485 limit=self.interpolation_limit[ind], sampling=self.sampling[ind]) 

486 return data 

487 

488 def make_input_target(self): 

489 """ 

490 A FIR filter is applied on the input data that has hourly resolution. Labels Y are provided as aggregated values 

491 with daily resolution. 

492 """ 

493 self._data = tuple(map(self.load_and_interpolate, [0, 1])) # load input (0) and target (1) data 

494 self.set_inputs_and_targets() 

495 self.apply_filter() 

496 

497 

498class DataHandlerIFS(DataHandlerMixedSamplingWithClimateAndFirFilter): 

499 

500 data_handler_fir = (DataHandlerMixedSamplingWithFirFilterSingleStation, 

501 DataHandlerIFSSingleStation) 

502 

503 @classmethod 

504 def set_data_handler_fir_pos(cls, **kwargs): 

505 """ 

506 Set position of fir data handler to always use climate FIR. 

507 """ 

508 cls.data_handler_fir_pos = 1 # use slower fir version with climate estimate