Coverage for mlair/helpers/filter.py: 45% (574 statements)
coverage.py v6.4.2, created at 2023-06-30 10:40 +0000

import gc
import warnings
from typing import Union, Callable, Tuple, Dict, Any
import logging

import datetime
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from scipy import signal, stats
import xarray as xr
import dask.array as da

from mlair.helpers import to_list, TimeTrackingWrapper, TimeTracking


class FIRFilter:
    from mlair.plotting.data_insight_plotting import PlotFirFilter

    def __init__(self, data, fs, order, cutoff, window, var_dim, time_dim, display_name=None, minimum_length=None,
                 extend_end=0, plot_path=None, plot_dates=None, offset=0):
        self._filtered = []
        self._h = []
        self.data = data
        self.fs = fs
        self.order = order
        self.cutoff = cutoff
        self.window = window
        self.var_dim = var_dim
        self.time_dim = time_dim
        self.display_name = display_name
        self.minimum_length = minimum_length
        self.extend_end = extend_end
        self.offset = offset
        self.plot_path = plot_path
        self.plot_dates = plot_dates
        self.run()

    def run(self):
        logging.info(f"{self.display_name}: start {self.__class__.__name__}")
        filtered = []
        h = []
        input_data = self.data.__deepcopy__()

        # collect some data for visualization
        if self.plot_dates is None:
            plot_pos = np.array([0.25, 1.5, 2.75, 4]) * 365 * self.fs
            self.plot_dates = [input_data.isel({self.time_dim: int(pos)}).coords[self.time_dim].values
                               for pos in plot_pos if pos < len(input_data.coords[self.time_dim])]
        plot_data = []

        for i in range(len(self.order)):
            # apply filter
            fi, hi = self.fir_filter(input_data, self.fs, self.cutoff[i], self.order[i], time_dim=self.time_dim,
                                     var_dim=self.var_dim, window=self.window, display_name=self.display_name)
            filtered.append(fi)
            h.append(hi)

            # visualization
            plot_data.append(self.create_visualization(fi, input_data, self.plot_dates, self.time_dim, self.fs, hi,
                                                       self.minimum_length, self.order, i, self.extend_end,
                                                       self.var_dim))
            # calculate residuum
            input_data = input_data - fi

        # add last residuum to filtered
        filtered.append(input_data)

        self._filtered = filtered
        self._h = h

        # visualize
        if self.plot_path is not None:
            try:
                self.PlotFirFilter(self.plot_path, plot_data, self.display_name)  # not working when t0 != 0
            except Exception as e:
                logging.info(f"Could not plot fir filter due to the following reason:\n{e}")

    def create_visualization(self, filtered, filter_input_data, plot_dates, time_dim, sampling,
                             h, minimum_length, order, i, extend_end, var_dim):  # pragma: no cover
        plot_data = []
        minimum_length = minimum_length or 0
        for viz_date in set(plot_dates).intersection(filtered.coords[time_dim].values):
            try:
                if i < len(order) - 1:
                    minimum_length += order[i + 1]

                td_type = {1: "D", 24: "h"}.get(sampling)
                length = len(h)
                extend_length_history = minimum_length + int((length + 1) / 2)
                extend_length_future = int((length + 1) / 2) + 1
                t_minus = viz_date + np.timedelta64(int(-extend_length_history), td_type)
                t_plus = viz_date + np.timedelta64(int(extend_length_future + extend_end), td_type)
                time_slice = slice(t_minus, t_plus - np.timedelta64(1, td_type))
                plot_data.append({"t0": viz_date, "filter_input": filter_input_data.sel({time_dim: time_slice}),
                                  "filtered": filtered.sel({time_dim: time_slice}), "h": h, "time_dim": time_dim,
                                  "var_dim": var_dim})
            except Exception:  # plotting data are optional, ignore any failure here
                pass
        return plot_data

    @property
    def filter_coefficients(self):
        return self._h

    @property
    def filtered_data(self):
        return self._filtered

    @TimeTrackingWrapper
    def fir_filter(self, data, fs, cutoff_high, order, sampling="1d", time_dim="datetime", var_dim="variables",
                   window: Union[str, Tuple] = "hamming", minimum_length=None, new_dim="window", plot_dates=None,
                   display_name=None):

        # calculate FIR filter coefficients
        h = self._calculate_filter_coefficients(window, order, cutoff_high, fs)

        coll = []
        for var in data.coords[var_dim]:
            d = data.sel({var_dim: var})
            filt = xr.apply_ufunc(fir_filter_convolve, d,
                                  input_core_dims=[[time_dim]], output_core_dims=[[time_dim]],
                                  vectorize=True, kwargs={"h": h}, output_dtypes=[d.dtype])
            # trim data to valid range
            ext_len = int((len(h) + 1) / 2)
            valid_range = filt.coords[time_dim].values[ext_len:-ext_len]
            trimmed = filt.sel(**{time_dim: valid_range})
            coll.append(trimmed)
        filtered = xr.concat(coll, var_dim)

        # create result array with same shape as input data, gaps are filled by nans
        filtered = self._create_full_filter_result_array(data, filtered, time_dim, display_name)
        return filtered, h

    @staticmethod
    def _calculate_filter_coefficients(window: Union[str, tuple], order: Union[int, tuple], cutoff_high: float,
                                       fs: float) -> np.array:
        """
        Calculate filter coefficients for a moving window using scipy's signal package for common filter types and
        the local method firwin_kzf for the Kolmogorov-Zurbenko filter (kzf). The filter is a low-pass filter.

        :param window: name of the window type, either a string with the window's name or a tuple containing the
            name and additional parameters (e.g. `("kaiser", 5)`)
        :param order: order of the filter to create as int, or parameters m and k of the kzf
        :param cutoff_high: cutoff frequency to use for the low-pass filter, in units of fs
        :param fs: sampling frequency of the time series
        """
        if window == "kzf":
            h = firwin_kzf(*order)
        else:
            h = signal.firwin(order, cutoff_high, pass_zero="lowpass", fs=fs, window=window)
        return h

    @staticmethod
    def _create_full_filter_result_array(template_array: xr.DataArray, result_array: xr.DataArray, new_dim: str,
                                         display_name: str = None) -> xr.DataArray:
        """
        Create a result filter array with the same shape as the given template data (which should be the original
        input data before filtering). All gaps are filled by nans.

        :param template_array: this array is used as template for shape and ordering of dims
        :param result_array: array with data that are filled into the template
        :param new_dim: new dimension which is shifted/appended to/at the end (if present or not)
        :param display_name: string that is attached to logging (default None)
        """
        logging.debug(f"{display_name}: create res_full")
        new_coords = {**{k: template_array.coords[k].values for k in template_array.coords if k != new_dim},
                      new_dim: result_array.coords[new_dim]}
        dims = [*template_array.dims, new_dim] if new_dim not in template_array.dims else template_array.dims
        result_array = result_array.transpose(*dims)
        return result_array.broadcast_like(xr.DataArray(dims=dims, coords=new_coords))
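
# A minimal usage sketch of FIRFilter on synthetic daily data. The variable name,
# filter orders and cutoff periods below are illustrative assumptions, not fixtures
# of this module, and the sketch presumes the xarray/scipy versions this module was
# written against.
def _example_fir_filter_usage():  # pragma: no cover
    time = pd.date_range("2010-01-01", periods=5 * 365, freq="1D")
    rng = np.random.default_rng(0)
    values = np.sin(2 * np.pi * np.arange(len(time)) / 365) + 0.3 * rng.standard_normal(len(time))
    data = xr.DataArray(values[:, None], dims=("datetime", "variables"),
                        coords={"datetime": time, "variables": ["o3"]})
    # two-stage decomposition: periods beyond ~1 year, then beyond ~1 month
    fir = FIRFilter(data, fs=1, order=[365, 61], cutoff=[1 / 365, 1 / 30], window="hamming",
                    var_dim="variables", time_dim="datetime", display_name="demo")
    return fir.filtered_data  # one entry per filter stage plus the final residuum
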

class ClimateFIRFilter(FIRFilter):
    from mlair.plotting.data_insight_plotting import PlotClimateFirFilter

    def __init__(self, data, fs, order, cutoff, window, time_dim, var_dim, apriori=None, apriori_type=None,
                 apriori_diurnal=False, sel_opts=None, plot_path=None,
                 minimum_length=None, new_dim=None, display_name=None, extend_length_opts: int = 0,
                 extend_end: Union[dict, int] = 0, plot_dates=None, offset: int = 0):
        """
        :param data: data to filter
        :param fs: sampling frequency in 1/days -> 1d: fs=1 -> 1H: fs=24
        :param order: a tuple with the order of the filter in the same ordering as cutoff
        :param cutoff: a tuple with the cutoff frequencies (all are applied as low-pass)
        :param window: window type of the filter (e.g. hamming)
        :param time_dim: name of the time dimension to apply the filter along
        :param var_dim: name of the variables dimension
        :param apriori: apriori information to use for the first low-pass. If None, the climatology is calculated
            from the provided data.
        :param apriori_type: type of apriori information to use. The climatology is always used for the first
            low-pass. For the residuum, either the value zero is used (apriori_type is None or "zeros") or a
            climatology of the residua is used ("residuum_stats").
        :param apriori_diurnal: use the diurnal cycle as additional apriori information (only applicable to data in
            hourly resolution). The mean anomaly of each hour is added to the apriori_type information.
        :param sel_opts: specify some parameters to select a subset of data before calculating the apriori
            information. Use this parameter, for example, if the apriori shall only be calculated on a shorter time
            period than is available in the given data.
        :param extend_length_opts: shift the switch between historical data and the apriori estimation by the given
            value (default 0). Must either be a dictionary with keys available in var_dim or a single value that is
            applied to all data. This parameter only influences which information is available at t0 for the filter
            calculation; it has no influence on the shape of the returned filtered data.
        :param extend_end: the number of time steps with ti > t0 to return from the filtered data. If extend_end is
            larger than extend_length_opts, not only observational data but also climatological estimations are
            returned. Must either be a dictionary with keys available in var_dim or a single value that is applied
            to all data. Default is 0.
        :param offset: shift t0 by this timestep delta. Default is 0.
        """
        self._apriori = apriori
        self.apriori_type = apriori_type
        self.apriori_diurnal = apriori_diurnal
        self._apriori_list = []
        self.sel_opts = sel_opts
        self.new_dim = new_dim
        self.plot_data = []
        self.extend_length_opts = extend_length_opts
        super().__init__(data, fs, order, cutoff, window, var_dim, time_dim, display_name=display_name,
                         minimum_length=minimum_length, plot_path=plot_path, extend_end=extend_end,
                         plot_dates=plot_dates, offset=offset)

    def run(self):
        filtered = []
        h = []
        forecasts = None
        if self.sel_opts is not None:
            self.sel_opts = self.sel_opts if isinstance(self.sel_opts, dict) else {self.time_dim: self.sel_opts}
            self._check_sel_opts()
        sampling = {1: "1d", 24: "1H"}.get(int(self.fs))

        if self.new_dim in self.data.coords:
            pseudo_timeseries = self.create_pseudo_timeseries(self.data, self.time_dim, sampling, self.new_dim)
            forecasts = self.data.__deepcopy__()
            self.data = pseudo_timeseries

        logging.debug(f"{self.display_name}: create diurnal_anomalies")
        if self.apriori_diurnal is True and sampling == "1H":
            diurnal_anomalies = self.create_seasonal_hourly_mean(self.data, self.time_dim, sel_opts=self.sel_opts,
                                                                 sampling=sampling, as_anomaly=True)
        else:
            diurnal_anomalies = 0
        logging.debug(f"{self.display_name}: create monthly apriori")
        if self._apriori is None:
            self._apriori = self.create_monthly_mean(self.data, self.time_dim, sel_opts=self.sel_opts,
                                                     sampling=sampling) + diurnal_anomalies
            logging.debug(f"{self.display_name}: apriori shape = {self._apriori.shape}")
        apriori_list = to_list(self._apriori)
        input_data = self.data.__deepcopy__()

        # for visualization
        plot_dates = self.plot_dates

        # create tmp dimension to apply filter, search for unused name
        new_dim = self._create_tmp_dimension(input_data) if self.new_dim is None else self.new_dim

        for i in range(len(self.order)):
            logging.info(f"{self.display_name}: start filter for order {self.order[i]}")
            # calculate climatological filter
            next_order = self._next_order(self.order, 0, i, self.window)
            fi, input_data, hi, apriori, plot_data = self.clim_filter(input_data, self.fs, self.cutoff[i],
                                                                      self.order[i], apriori=apriori_list[i],
                                                                      sel_opts=self.sel_opts, sampling=sampling,
                                                                      time_dim=self.time_dim, window=self.window,
                                                                      var_dim=self.var_dim,
                                                                      minimum_length=self.minimum_length,
                                                                      new_dim=new_dim, plot_dates=plot_dates,
                                                                      display_name=self.display_name,
                                                                      extend_opts=self.extend_length_opts,
                                                                      extend_end=self.extend_end,
                                                                      next_order=next_order,
                                                                      forecasts=forecasts, offset=self.offset)

            logging.info(f"{self.display_name}: finished clim_filter calculation")
            if self.minimum_length is None:
                filtered.append(fi.sel({new_dim: slice(None, self.extend_end)}))
            else:
                filtered.append(fi.sel({new_dim: slice(self.extend_end - self.minimum_length, self.extend_end)}))
            h.append(hi)
            gc.collect()
            forecasts = None  # reset/disable forecasts after first filter iteration
            self.plot_data.append(plot_data)
            plot_dates = {e["t0"] for e in plot_data}

            # calculate residuum
            logging.info(f"{self.display_name}: calculate residuum")
            coord_range = range(int(fi.coords[new_dim].values.min()), int(fi.coords[new_dim].values.max() + 1))
            if new_dim in input_data.coords:
                input_data = input_data.sel({new_dim: coord_range}) - fi
            else:
                input_data = self._shift_data(input_data, coord_range, self.time_dim, new_dim) - fi

            # create new apriori information for next iteration if no further apriori is provided
            if len(apriori_list) < len(self.order):
                logging.info(f"{self.display_name}: create diurnal_anomalies")
                if self.apriori_diurnal is True and sampling == "1H":
                    diurnal_anomalies = self.create_seasonal_hourly_mean(input_data.sel({new_dim: 0}, drop=True),
                                                                         self.time_dim, sel_opts=self.sel_opts,
                                                                         sampling=sampling, as_anomaly=True)
                else:
                    diurnal_anomalies = 0
                logging.info(f"{self.display_name}: create monthly apriori")
                if self.apriori_type is None or self.apriori_type == "zeros":  # zero version
                    apriori_list.append(xr.zeros_like(apriori_list[i]) + diurnal_anomalies)
                elif self.apriori_type == "residuum_stats":  # calculate monthly statistic on residuum
                    apriori_list.append(
                        -self.create_monthly_mean(input_data.sel({new_dim: 0}, drop=True), self.time_dim,
                                                  sel_opts=self.sel_opts, sampling=sampling) + diurnal_anomalies)
                else:
                    raise ValueError(f"Cannot handle unknown apriori type: {self.apriori_type}. Please choose from"
                                     f" None, `zeros` or `residuum_stats`.")

        # add last residuum to filtered
        if self.minimum_length is None:
            filtered.append(input_data.sel({new_dim: slice(None, self.extend_end)}))
        else:
            filtered.append(input_data.sel({new_dim: slice(self.extend_end - self.minimum_length,
                                                           self.extend_end)}))

        self._filtered = filtered
        self._h = h
        self._apriori_list = apriori_list

        # visualize
        if self.plot_path is not None:
            try:
                self.PlotClimateFirFilter(self.plot_path, self.plot_data, sampling, self.display_name)
            except Exception as e:
                logging.info(f"Could not plot climate fir filter due to the following reason:\n{e}")

    def _check_sel_opts(self):
        if len(self.data.sel(**self.sel_opts).coords[self.time_dim]) == 0:
            raise ValueError(f"Abort {self.__class__.__name__} as no data is available after applying sel_opts to"
                             f" data")

    @staticmethod
    def _next_order(order: list, minimum_length: Union[int, None], pos: int, window: Union[str, tuple]) -> int:
        next_order = 0
        if pos + 1 < len(order):
            next_order = order[pos + 1]
            if window == "kzf" and isinstance(next_order, tuple):
                next_order = filter_width_kzf(*next_order)
        if minimum_length is not None:
            next_order = next_order + minimum_length
        return next_order

    @staticmethod
    def create_monthly_unity_array(data: xr.DataArray, time_dim: str, extend_range: int = 366) -> xr.DataArray:
        """
        Create an xarray data array filled with ones in monthly resolution (set on the 16th of each month). Data is
        extended by extend_range days into the past and future along time_dim.

        :param data: data to create the monthly unity array from, must contain the dimension time_dim
        :param time_dim: name of temporal dimension
        :param extend_range: number of days to extend data (default 366)
        :returns: xarray in monthly resolution (centered on the 16th day of each month) with all values equal to 1
        """
        coords = data.coords

        # extend time_dim by given extend_range days
        start = coords[time_dim][0].values.astype("datetime64[D]") - np.timedelta64(extend_range, "D")
        end = coords[time_dim][-1].values.astype("datetime64[D]") + np.timedelta64(extend_range, "D")
        new_time_axis = np.arange(start, end).astype("datetime64[ns]")

        # construct data array with updated coords
        new_coords = {k: data.coords[k].values if k != time_dim else new_time_axis for k in coords}
        new_array = xr.DataArray(1, coords=new_coords, dims=new_coords.keys()).transpose(*data.dims)

        # loffset is required because resampling uses the last day of the month as the resampling timestamp
        return new_array.resample({time_dim: "1m"}, loffset=datetime.timedelta(days=-15)).max()

    def create_monthly_mean(self, data: xr.DataArray, time_dim: str, sel_opts: dict = None, sampling: str = "1d") \
            -> xr.DataArray:
        """
        Calculate monthly means (12 values) and return a data array with the same resolution as the given data
        containing these monthly mean values. Sampling points are the 16th of each month (this value is equal to the
        true monthly mean) and all other values between two points are interpolated linearly. It is possible to apply
        a pre-selection to use only a subset of the given data via the sel_opts parameter. Only data from this subset
        are used to calculate the monthly statistic.

        :param data: data to apply statistical calculation on
        :param time_dim: name of temporal axis
        :param sel_opts: selection options as dict to select a subset of data (default None). For example,
            `sel_opts={<time_dim>: "2006"}` forces the method to derive the monthly means only from data of the
            year 2006.
        :param sampling: sampling of the returned data (default 1d)
        :returns: array in desired resolution containing interpolated monthly values. Months with no valid data are
            returned as np.nan, which also affects data in the neighbouring months (before / after the sampling
            points, which are the 16th of each month).
        """

        # create unity xarray in monthly resolution with sampling point in the middle of each month
        monthly = self.create_monthly_unity_array(data, time_dim) * np.nan

        # apply selection if given (only use subset for monthly means)
        if sel_opts is not None:
            data = data.sel(**sel_opts)

        # create monthly mean and replace entries in unity array
        monthly_mean = data.groupby(f"{time_dim}.month").mean()
        for month in monthly_mean.month.values:
            monthly = xr.where((monthly[f"{time_dim}.month"] == month),
                               monthly_mean.sel(month=month, drop=True),
                               monthly)
        # transform monthly information into original sampling rate
        return monthly.dropna(dim=time_dim).resample({time_dim: sampling}).interpolate()

    @staticmethod
    def _compute_hourly_mean_per_month(data: xr.DataArray, time_dim: str, as_anomaly: bool) -> Dict[int, xr.DataArray]:
        """
        Calculate a separate mean value for each hour in each month (12 x 24 values in total). The average is either
        the anomaly of the monthly mean state or the raw mean value.

        :param data: data to calculate averages on
        :param time_dim: name of temporal dimension
        :param as_anomaly: indicates whether to calculate means as anomaly of the monthly mean or as raw mean values
        :returns: dictionary containing 12 months, each with a 24-valued array (one entry for each hour)
        """
        seasonal_hourly_means = {}
        for month in data.groupby(f"{time_dim}.month").groups.keys():
            single_month_data = data.sel({time_dim: (data[f"{time_dim}.month"] == month)})
            hourly_mean = single_month_data.groupby(f"{time_dim}.hour").mean()
            if as_anomaly is True:
                hourly_mean = hourly_mean - hourly_mean.mean("hour")
            seasonal_hourly_means[month] = hourly_mean
        return seasonal_hourly_means

    @staticmethod
    def _create_seasonal_cycle_of_single_hour_mean(result_arr: xr.DataArray, means: Dict[int, xr.DataArray], hour: int,
                                                   time_dim: str, sampling: str) -> xr.DataArray:
        """
        Use the monthly means of a given hour to create an array with interpolated values at the indicated hour for
        each day of the full time span indicated by the given result_arr.

        :param result_arr: template array indicating the full time range and additional dimensions to keep
        :param means: dictionary containing 24 hourly averages for each month (12 x 24 values in total)
        :param hour: hour of interest as integer
        :param time_dim: name of temporal dimension
        :param sampling: sampling rate to interpolate
        :returns: array with interpolated averages in sampling resolution containing only values for the hour of
            interest
        """
        h_coll = xr.ones_like(result_arr) * np.nan
        for month in means.keys():
            hourly_mean_single_month = means[month].sel(hour=hour, drop=True)
            h_coll = xr.where((h_coll[f"{time_dim}.month"] == month), hourly_mean_single_month, h_coll)
        h_coll = h_coll.dropna(time_dim).resample({time_dim: sampling}).interpolate()
        h_coll = h_coll.sel({time_dim: (h_coll[f"{time_dim}.hour"] == hour)})
        return h_coll

    def create_seasonal_hourly_mean(self, data: xr.DataArray, time_dim: str, sel_opts: Dict[str, Any] = None,
                                    sampling: str = "1H", as_anomaly: bool = True) -> xr.DataArray:
        """
        Compute climatological statistics on an hourly basis, either as raw data or as anomalies. For each month, an
        overall mean value (only used if anomalies are required) and the mean of each hour are calculated. The
        climatological diurnal cycle is positioned on the 16th of each month and interpolated in between by using a
        distinct interpolation for each hour of the day. The returned array therefore contains data with a yearly
        cycle (if the anomaly is not calculated) or data without a yearly cycle (if using anomalies). In both cases,
        the data have an amplitude that varies over the year.

        :param data: data to apply this method to
        :param time_dim: name of temporal axis
        :param sel_opts: specific selection options that are applied before calculation of climatological statistics
            (default None)
        :param sampling: temporal resolution of data (default "1H")
        :param as_anomaly: specify whether to use anomalies or raw data including a seasonal cycle of the mean value
            (default: True)
        :returns: climatological statistics for given data interpolated with given sampling rate
        """
        # can only be used for hourly sampling rate
        assert sampling == "1H"

        # apply selection if given (only use subset for seasonal hourly means)
        if sel_opts is not None:
            data = data.sel(**sel_opts)

        # create unity xarray in monthly resolution with sampling point in the middle of each month
        monthly = self.create_monthly_unity_array(data, time_dim) * np.nan

        # calculate a separate mean value for each hour in each month
        seasonal_hourly_means = self._compute_hourly_mean_per_month(data, time_dim, as_anomaly)

        # create seasonal cycles of these hourly averages
        seasonal_coll = []
        for hour in data.groupby(f"{time_dim}.hour").groups.keys():
            mean_single_hour = self._create_seasonal_cycle_of_single_hour_mean(monthly, seasonal_hourly_means, hour,
                                                                               time_dim, sampling)
            seasonal_coll.append(mean_single_hour)

        # combine all cycles in a common data array
        hourly = xr.concat(seasonal_coll, time_dim).sortby(time_dim).resample({time_dim: sampling}).interpolate()
        return hourly

    @staticmethod
    def extend_apriori(data: xr.DataArray, apriori: xr.DataArray, time_dim: str, sampling: str = "1d",
                       display_name: str = None) -> xr.DataArray:
        """
        Extend the time range of the apriori information so that it spans a longer period than the data (or at least
        one of equal length). This method may not work properly if the apriori data covers less than one year.

        :param data: data whose time range the apriori should span at minimum
        :param apriori: data that is adjusted. It is assumed that this data varies in the course of the year but is
            the same for the same day in different years. Otherwise this method will introduce some unintended
            artefacts in the apriori data.
        :param time_dim: name of temporal dimension
        :param sampling: sampling of data (e.g. "1m", "1d", default "1d")
        :param display_name: name to use for logging message (default None)
        :returns: apriori data with adjusted temporal coverage
        """
        dates = data.coords[time_dim].values
        td_type = {"1d": "D", "1H": "h"}.get(sampling)

        # apriori starts after data
        if dates[0] < apriori.coords[time_dim].values[0]:
            logging.debug(f"{display_name}: apriori starts after data")

            # add difference in full years
            date_diff = abs(dates[0] - apriori.coords[time_dim].values[0]).astype("timedelta64[D]")
            extend_range = np.ceil(date_diff / (np.timedelta64(1, "D") * 365)).astype(int) * 365
            factor = 1 if td_type == "D" else 24

            # get fill data range
            start = apriori.coords[time_dim][0].values.astype("datetime64[%s]" % td_type)
            end = apriori.coords[time_dim][0].values.astype("datetime64[%s]" % td_type) + np.timedelta64(
                366 * factor + 1, td_type)

            # fill year by year
            for i in range(365, extend_range + 365, 365):
                apriori_tmp = apriori.sel({time_dim: slice(start, end)})  # hint: slice includes end date
                new_time_axis = apriori_tmp.coords[time_dim] - np.timedelta64(i * factor, td_type)
                apriori_tmp.coords[time_dim] = new_time_axis
                apriori = apriori.combine_first(apriori_tmp)

        # apriori ends before data
        if dates[-1] + np.timedelta64(365, "D") > apriori.coords[time_dim].values[-1]:
            logging.debug(f"{display_name}: apriori ends before data")

            # add difference in full years + 1 year (because apriori is used as future estimate)
            date_diff = abs(dates[-1] - apriori.coords[time_dim].values[-1]).astype("timedelta64[D]")
            extend_range = np.ceil(date_diff / (np.timedelta64(1, "D") * 365)).astype(int) * 365 + 365
            factor = 1 if td_type == "D" else 24

            # get fill data range
            start = apriori.coords[time_dim][-1].values.astype("datetime64[%s]" % td_type) - np.timedelta64(
                366 * factor + 1, td_type)
            end = apriori.coords[time_dim][-1].values.astype("datetime64[%s]" % td_type)

            # fill year by year
            for i in range(365, extend_range + 365, 365):
                apriori_tmp = apriori.sel({time_dim: slice(start, end)})  # hint: slice includes end date
                new_time_axis = apriori_tmp.coords[time_dim] + np.timedelta64(i * factor, td_type)
                apriori_tmp.coords[time_dim] = new_time_axis
                apriori = apriori.combine_first(apriori_tmp)

        return apriori

    @staticmethod
    def get_forecast_run_delta(data, time_dim):
        time_data = data.coords[time_dim].values
        deltas = (time_data[1:] - time_data[:-1]) / np.timedelta64(1, "h")
        return stats.mode(deltas).mode[0]

    def combine_observation_and_apriori(self, data: xr.DataArray, apriori: xr.DataArray, time_dim: str, new_dim: str,
                                        extend_length_history: int, extend_length_future: int,
                                        extend_length_separator: int = 0, forecasts: xr.DataArray = None,
                                        sampling: str = "1H", extend_end: int = 0, offset: int = 0) -> xr.DataArray:
        """
        Combine historical data / observations ("data") and climatological statistics ("apriori"). Historical data
        are used on the interval [t0 - extend_length_history, t0] and apriori is used on
        [t0 + 1, t0 + extend_length_future]. If indicated by extend_length_separator, it is possible to shift the
        end of the history interval and the start of the apriori interval by a given number of time steps.

        :param data: historical data for past values, must contain dimensions time_dim and var_dim and might also
            have a new_dim dimension
        :param apriori: climatological estimate for future values, must contain dimensions time_dim and var_dim, but
            can also have dimension new_dim
        :param time_dim: name of temporal dimension
        :param new_dim: name of the new dimension along which data is combined
        :param extend_length_history: number of time steps to use from data
        :param extend_length_future: number of time steps to use from apriori (minus 1)
        :param extend_length_separator: position of the last history value to use (default 0); this position
            indicates the last value that is used from data (followed by values from apriori). In other words, the
            end of the history interval and the start of the apriori interval are shifted by this value from t0
            (positive or negative).
        :returns: combined data array
        """
        # prepare historical data / observation
        if forecasts is None:
            ext_sep = offset + min(extend_end, extend_length_future, extend_length_separator)
        else:
            ext_sep = min(offset, extend_length_future)
        if new_dim not in data.coords:
            history = self._shift_data(data, range(int(-extend_length_history), ext_sep + 1),
                                       time_dim, new_dim)
        else:
            history = data.sel({new_dim: slice(int(-extend_length_history), ext_sep)})

        if forecasts is not None:
            forecast_delta = self.get_forecast_run_delta(forecasts, time_dim)
            forecast_end = offset + min(extend_end, extend_length_separator)
            for lead_time in forecasts.coords[new_dim]:
                delta = np.timedelta64(int(lead_time - offset), {"1d": "D", "1H": "h"}.get(sampling))
                forecasts_tmp = forecasts.sel({new_dim: slice(None, forecast_end + lead_time)})
                forecasts_tmp.coords[time_dim] = forecasts_tmp.coords[time_dim] + delta
                forecasts_tmp.coords[new_dim] = forecasts_tmp.coords[new_dim] + offset - lead_time
                history = history.combine_first(forecasts_tmp)
                # history.plot()
                if lead_time >= forecast_delta - 1:  # stop when all gaps are filled
                    break

        if extend_length_future > ext_sep + 1:
            # prepare climatological statistics
            if new_dim not in apriori.coords:
                future = self._shift_data(apriori, range(ext_sep + 1,
                                                         offset + extend_length_future + 1),
                                          time_dim, new_dim)
            else:
                future = apriori.sel({new_dim: slice(ext_sep + 1, offset + extend_length_future)})
            # combine historical data [t0-length, t0+sep] and climatological statistics [t0+sep+1, t0+length]
            filter_input_data = history.combine_first(future)
            # filter_input_data = xr.concat([history.dropna(time_dim), future], dim=new_dim, join="left")
            return filter_input_data.dropna(time_dim)
        else:
            return history

    @staticmethod
    def create_full_time_dim(data, dim, freq):
        """Ensure the time dimension is equidistant. Dates may be missing if values have been dropped."""
        start = data.coords[dim].values[0]
        end = data.coords[dim].values[-1]
        datetime_index = pd.DataFrame(index=pd.date_range(start, end, freq=freq))
        t = data.sel({dim: start}, drop=True)
        res = xr.DataArray(coords=[datetime_index.index, *[t.coords[c] for c in t.coords]], dims=[dim, *t.coords])
        res = res.transpose(*data.dims)
        res.loc[data.coords] = data
        return res

    def create_pseudo_timeseries(self, data, time_dim, sampling, window_dim):
        empty_data = data.sel({window_dim: 0}, drop=True) * np.nan
        data_full_time_index = self.create_full_time_dim(empty_data, time_dim, sampling)
        for lead_time in data.coords[window_dim]:
            data_tmp = data.sel({window_dim: lead_time}, drop=True)
            delta = np.timedelta64(int(lead_time), {"1d": "D", "1H": "h"}.get(sampling))
            data_tmp.coords[time_dim] = data_tmp.coords[time_dim] + delta
            data_full_time_index = data_full_time_index.combine_first(data_tmp)
            if data_full_time_index.isnull().sum() == 0:  # stop when all gaps are filled
                break
        return data_full_time_index

    def create_visualization(self, filtered, data, filter_input_data, plot_dates, time_dim, new_dim, sampling,
                             extend_length_history, extend_length_future, minimum_length, h,
                             variable_name, extend_length_opts=None, extend_end=None, offset=None,
                             forecast=None):  # pragma: no cover

        plot_data = []
        extend_end = 0 if extend_end is None else extend_end
        extend_length_opts = 0 if extend_length_opts is None else extend_length_opts
        offset = 0 if offset is None else offset
        for t0 in set(plot_dates).intersection(filtered.coords[time_dim].values):
            try:
                td_type = {"1d": "D", "1H": "h"}.get(sampling)
                t_minus = t0 + np.timedelta64(int(-extend_length_history), td_type)
                t_plus = t0 + np.timedelta64(int(extend_length_future + 1), td_type)
                if new_dim not in data.coords:
                    tmp_filter_data = self._shift_data(data.sel({time_dim: slice(t_minus, t_plus)}),
                                                       range(int(-extend_length_history),
                                                             int(extend_length_future + 1)),
                                                       time_dim,
                                                       new_dim).sel({time_dim: t0})
                else:
                    tmp_filter_data = None
                filter_input = filter_input_data.sel({time_dim: t0, new_dim: slice(None, extend_length_future)})
                valid_start = int(filtered[new_dim].min()) + int((len(h) + 1) / 2)
                valid_end = min(extend_length_opts + extend_end + 1,
                                int(filtered[new_dim].max()) - int((len(h) + 1) / 2))
                valid_range = range(valid_start, valid_end)
                # if forecast is not None:
                #     forecast_deltas = (t0 - forecast.coords[time_dim]) / np.timedelta64(1, "h") + 12
                #     minimal_forecast_delta = int(forecast_deltas[forecast_deltas >= 0][-1])
                #     init_time = forecast.coords[time_dim][forecast_deltas >= 0][-1]
                #     forecast_end = min(extend_end, extend_length_opts)
                #     f = forecast.sel({time_dim: init_time, new_dim: slice(None, forecast_end)})
                #     f.coords[time_dim] = t0
                #     f.coords[new_dim] = f.coords[new_dim] - minimal_forecast_delta + offset
                # else:
                #     f = None

                # correct all data for offset
                def correct_data_for_offset(d):
                    d = d.__deepcopy__()
                    d.coords[time_dim] = d.coords[time_dim] + np.timedelta64(int(offset), td_type)
                    d.coords[new_dim] = d.coords[new_dim] - offset
                    return d

                plot_data.append({"t0": t0 + np.timedelta64(int(offset), td_type),
                                  "var": variable_name,
                                  "filter_input": correct_data_for_offset(filter_input),
                                  "filter_input_nc": correct_data_for_offset(tmp_filter_data),
                                  "valid_range": range(valid_range.start - offset, valid_range.stop - offset),
                                  # "forecast": f,
                                  "time_range": data.sel(
                                      {time_dim: slice(t_minus, t_plus - np.timedelta64(1, td_type))}).coords[
                                      time_dim].values,
                                  "h": h,
                                  "new_dim": new_dim})
            except Exception:  # plotting data are optional, ignore any failure here
                pass
        return plot_data

    @staticmethod
    def _get_year_interval(data: xr.DataArray, time_dim: str) -> Tuple[int, int]:
        """
        Get the year of the start and end date of the given data.

        :param data: data to extract dates from
        :param time_dim: name of temporal axis
        :returns: two-element tuple with start and end year
        """
        start = pd.to_datetime(data.coords[time_dim].min().values).year
        end = pd.to_datetime(data.coords[time_dim].max().values).year
        return start, end

    @staticmethod
    def _calculate_filter_coefficients(window: Union[str, tuple], order: Union[int, tuple], cutoff_high: float,
                                       fs: float) -> np.array:
        """
        Calculate filter coefficients for a moving window using scipy's signal package for common filter types and
        the local method firwin_kzf for the Kolmogorov-Zurbenko filter (kzf). The filter is a low-pass filter.

        :param window: name of the window type, either a string with the window's name or a tuple containing the
            name and additional parameters (e.g. `("kaiser", 5)`)
        :param order: order of the filter to create as int, or parameters m and k of the kzf
        :param cutoff_high: cutoff frequency to use for the low-pass filter, in units of fs
        :param fs: sampling frequency of the time series
        """
        if window == "kzf":
            h = firwin_kzf(*order)
        else:
            h = signal.firwin(order, cutoff_high, pass_zero="lowpass", fs=fs, window=window)
        return h

    @staticmethod
    def _trim_data_to_minimum_length(data: xr.DataArray, extend_length_history: int, dim: str,
                                     extend_length_future: int = 0, offset: int = 0) -> xr.DataArray:
        """
        Trim data along the given axis between extend_length_history and extend_length_future (default 0), both
        shifted by offset.

        :param data: data to trim
        :param extend_length_history: start position of the trim range
        :param dim: dim to apply trim on
        :param extend_length_future: end position of the trim range (default 0)
        :param offset: shift the trim range by this value (default 0)
        :returns: trimmed data
        """
        return data.sel({dim: slice(extend_length_history + offset, extend_length_future + offset)}, drop=True)

    @staticmethod
    def _create_full_filter_result_array(template_array: xr.DataArray, result_array: xr.DataArray, new_dim: str,
                                         display_name: str = None) -> xr.DataArray:
        """
        Create a result filter array with the same shape as the given template data (which should be the original
        input data before filtering). All gaps are filled by nans.

        :param template_array: this array is used as template for shape and ordering of dims
        :param result_array: array with data that are filled into the template
        :param new_dim: new dimension which is shifted/appended to/at the end (if present or not)
        :param display_name: string that is attached to logging (default None)
        """
        logging.debug(f"{display_name}: create res_full")
        new_coords = {**{k: template_array.coords[k].values for k in template_array.coords if k != new_dim},
                      new_dim: result_array.coords[new_dim]}
        dims = [*template_array.dims, new_dim] if new_dim not in template_array.dims else template_array.dims
        result_array = result_array.transpose(*dims)
        return result_array.broadcast_like(xr.DataArray(dims=dims, coords=new_coords))

    @TimeTrackingWrapper
    def clim_filter(self, data, fs, cutoff_high, order, apriori=None, sel_opts=None,
                    sampling="1d", time_dim="datetime", var_dim="variables", window: Union[str, Tuple] = "hamming",
                    minimum_length=0, next_order=0, new_dim="window", plot_dates=None, display_name=None,
                    extend_opts: int = 0, extend_end: int = 0, forecasts=None, offset: int = 0):

        logging.debug(f"{display_name}: extend apriori")

        # calculate apriori information from data if not given and extend its range if not sufficiently long
        if apriori is None:
            apriori = self.create_monthly_mean(data, time_dim, sel_opts=sel_opts, sampling=sampling)
        apriori = apriori.astype(data.dtype)
        apriori = self.extend_apriori(data, apriori, time_dim, sampling, display_name=display_name)

        # calculate FIR filter coefficients
        h = self._calculate_filter_coefficients(window, order, cutoff_high, fs)
        length = len(h)

        # set data extension that is required for filtering
        extend_length_history = minimum_length + int((next_order + 1) / 2) + int((length + 1) / 2) - extend_end
        extend_length_future = extend_end + int((next_order + 1) / 2) + int((length + 1) / 2)

        # collect some data for visualization
        plot_pos = np.array([0.25, 1.5, 2.75, 4]) * 365 * fs
        if plot_dates is None:
            plot_dates = [data.isel({time_dim: int(pos)}).coords[time_dim].values for pos in plot_pos if
                          pos < len(data.coords[time_dim])]
        plot_data = []

        coll = []
        coll_input = []

        for var in reversed(data.coords[var_dim].values):
            logging.info(f"{display_name} ({var}): sel data")

            _start, _end = self._get_year_interval(data, time_dim)
            filt_coll = []
            filt_input_coll = []
            for _year in range(_start, _end + 1):
                logging.debug(f"{display_name} ({var}): year={_year}")

                # select observations and apriori data
                time_slice = self._create_time_range_extend(
                    _year, sampling, max(extend_length_history, extend_length_future))
                d = data.sel({var_dim: [var], time_dim: time_slice})
                a = apriori.sel({var_dim: [var], time_dim: time_slice})
                f = forecasts.sel({var_dim: [var]}) if forecasts is not None else None
                if len(d.coords[time_dim]) == 0:  # no data at all for this year
                    continue

                # combine historical data / observation [t0-length, t0] and climatological statistics
                # [t0+1, t0+length]
                filter_input_data = self.combine_observation_and_apriori(d, a, time_dim, new_dim,
                                                                         extend_length_history,
                                                                         extend_length_future,
                                                                         extend_length_separator=extend_opts,
                                                                         forecasts=f, sampling=sampling,
                                                                         extend_end=extend_end, offset=offset)

                # select only data for current year
                try:
                    filter_input_data = filter_input_data.sel({time_dim: str(_year)})
                except KeyError:  # no valid data for this year
                    continue
                if len(filter_input_data.coords[time_dim]) == 0:  # no valid data for this year
                    continue

                # apply filter
                logging.debug(f"{display_name} ({var}): start filter convolve")
                with TimeTracking(name=f"{display_name} ({var}): filter convolve", logging_level=logging.DEBUG):
                    filt = xr.apply_ufunc(fir_filter_convolve, filter_input_data,
                                          input_core_dims=[[new_dim]], output_core_dims=[[new_dim]],
                                          vectorize=True, kwargs={"h": h}, output_dtypes=[d.dtype])

                # trim data if required
                valid_range_start = int(filt.coords[new_dim].min() + (length + 1) / 2)
                valid_range_end = int(filt.coords[new_dim].max() - (length + 1) / 2) + 1
                ext_len = min(extend_length_future, valid_range_end)
                trimmed = self._trim_data_to_minimum_length(filt, valid_range_start, new_dim,
                                                            extend_length_future=ext_len)
                filt_coll.append(trimmed)
                trimmed = self._trim_data_to_minimum_length(filter_input_data, valid_range_start, new_dim,
                                                            extend_length_future=ext_len)
                filt_input_coll.append(trimmed)

                # visualization
                plot_data.extend(self.create_visualization(filt, d, filter_input_data, plot_dates, time_dim,
                                                           new_dim, sampling, extend_length_history,
                                                           extend_length_future, minimum_length, h, var,
                                                           extend_opts, extend_end, offset, f))

            # collect all filter results
            coll.append(xr.concat(filt_coll, time_dim))
            coll_input.append(xr.concat(filt_input_coll, time_dim))
            gc.collect()

        # concat all variables
        logging.debug(f"{display_name}: concat all variables")
        res = xr.concat(coll, var_dim)
        res_input = xr.concat(coll_input, var_dim)

        # create result array with same shape as input data, gaps are filled by nans
        res_full = self._create_full_filter_result_array(data, res, new_dim, display_name)
        res_input_full = self._create_full_filter_result_array(data, res_input, new_dim, display_name)
        return res_full, res_input_full, h, apriori, plot_data

    @staticmethod
    def _create_time_range_extend(year: int, sampling: str, extend_length: int) -> slice:
        """
        Create a slice object for the given year plus extend_length in sampling resolution.

        :param year: year to create time range for
        :param sampling: sampling of time range
        :param extend_length: number of time steps to extend out of the given year
        :returns: slice object with time range
        """
        td_type = {"1d": "D", "1H": "h"}.get(sampling)
        delta = np.timedelta64(extend_length + 1, td_type)
        start = np.datetime64(f"{year}-01-01") - delta
        end = np.datetime64(f"{year}-12-31") + delta
        return slice(start, end)

    @staticmethod
    def _create_tmp_dimension(data: xr.DataArray) -> str:
        """
        Create a tmp dimension, named 'window' preferably. If this name is already used by one of the dimensions,
        the tmp dimension name is concatenated with itself until it is not present in dims. The method raises a
        ValueError after 10 tries.

        :param data: data array to create a new tmp dimension for with unique name
        :returns: valid name for a tmp dimension (preferably 'window')
        """
        new_dim = "window"
        count = 0
        while new_dim in data.dims:
            new_dim += new_dim
            count += 1
            if count > 10:
                raise ValueError("Could not create new dimension.")
        return new_dim

    def _shift_data(self, data: xr.DataArray, index_value: range, time_dim: str, new_dim: str) -> xr.DataArray:
        """
        Shift data multiple times to create history or future along dimension new_dim for each time step.

        :param data: data set to shift
        :param index_value: range of integers to span history and/or future
        :param time_dim: name of temporal dimension that should be shifted
        :param new_dim: name of dimension created by the data shift
        :return: shifted data
        """
        coll = []
        for i in index_value:
            coll.append(data.shift({time_dim: -i}))
        new_ind = self.create_index_array(new_dim, index_value)
        return xr.concat(coll, dim=new_ind)

    @staticmethod
    def create_index_array(index_name: str, index_value: range):
        """
        Create an index array from a range object to use as the index of a data array.

        :param index_name: name of the index dimension
        :param index_value: range of values to use as indexes
        :returns: index array for given range of values
        """
        ind = pd.DataFrame({'val': index_value}, index=index_value)
        tmp_dim = index_name + "tmp"
        res = xr.Dataset.from_dataframe(ind).to_array(tmp_dim).rename({'index': index_name})
        res = res.squeeze(dim=tmp_dim, drop=True)
        res.name = index_name
        return res

    @property
    def apriori_data(self):
        return self._apriori_list

    @property
    def initial_apriori_data(self):
        return self.apriori_data[0]
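
# A minimal end-to-end sketch of ClimateFIRFilter with a single decomposition stage
# on synthetic daily data. All names and parameter values are illustrative
# assumptions; an integer minimum_length is passed because clim_filter adds it to
# the data extension, and the sketch presumes the xarray/scipy versions this module
# was written against.
def _example_climate_fir_filter_usage():  # pragma: no cover
    time = pd.date_range("2008-01-01", periods=6 * 365, freq="1D")
    rng = np.random.default_rng(42)
    values = 20 + 10 * np.sin(2 * np.pi * np.arange(len(time)) / 365) + rng.standard_normal(len(time))
    data = xr.DataArray(values[:, None], dims=("datetime", "variables"),
                        coords={"datetime": time, "variables": ["temp"]})
    clim_fir = ClimateFIRFilter(data, fs=1, order=[365], cutoff=[1 / 365], window="hamming",
                                time_dim="datetime", var_dim="variables", minimum_length=0,
                                display_name="demo")
    low_pass, residuum = clim_fir.filtered_data  # one entry per filter stage plus the residuum
    return low_pass, residuum
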

def fir_filter(data, fs, order=5, cutoff_low=None, cutoff_high=None, window="hamming", dim="variables", h=None,
               causal=True, padlen=None):
    """Expects xarray."""
    if h is None:
        cutoff = []
        if cutoff_low is not None:
            cutoff += [cutoff_low]
        if cutoff_high is not None:
            cutoff += [cutoff_high]
        if len(cutoff) == 2:
            filter_type = "bandpass"
        elif len(cutoff) == 1 and cutoff_low is not None:
            filter_type = "highpass"
        elif len(cutoff) == 1 and cutoff_high is not None:
            filter_type = "lowpass"
        else:
            raise ValueError("Please provide either cutoff_low or cutoff_high.")
        h = signal.firwin(order, cutoff, pass_zero=filter_type, fs=fs, window=window)
    filtered = xr.ones_like(data)
    for var in data.coords[dim]:
        d = data.sel({dim: var}).values.flatten()
        if causal:
            y = signal.lfilter(h, 1., d)
        else:
            padlen = padlen if padlen is not None else 3 * len(h)
            y = signal.filtfilt(h, 1., d, padlen=padlen)
        filtered.loc[{dim: var}] = y
    return filtered, h
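
# A usage sketch of the functional fir_filter interface: a causal low-pass on
# synthetic hourly data. The variable name and cutoff are illustrative assumptions;
# cutoff values share the frequency unit of fs (here 1/hours).
def _example_functional_fir_filter():  # pragma: no cover
    time = pd.date_range("2020-01-01", periods=24 * 60, freq="1H")
    rng = np.random.default_rng(1)
    values = np.sin(2 * np.pi * np.arange(len(time)) / 24) + 0.5 * rng.standard_normal(len(time))
    data = xr.DataArray(values[:, None], dims=("datetime", "variables"),
                        coords={"datetime": time, "variables": ["no2"]})
    # keep periods longer than two days (cutoff 1/48 with fs=1 per hour)
    filtered, h = fir_filter(data, fs=1, order=121, cutoff_high=1 / 48, window="hamming")
    return filtered, h
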

def fir_filter_convolve(data, h):
    return signal.convolve(data, h, mode='same', method="direct") / sum(h)
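
# fir_filter_convolve normalises by sum(h), so the filter has unit gain at frequency
# zero: a constant series passes through unchanged, apart from edge effects of the
# 'same'-mode convolution. A quick self-contained check:
def _example_fir_filter_convolve():  # pragma: no cover
    h = signal.firwin(21, 0.1)
    out = fir_filter_convolve(np.full(100, 3.0), h)
    assert np.allclose(out[10:-10], 3.0)  # interior samples keep the constant value
    return out
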

class KolmogorovZurbenkoBaseClass:

    def __init__(self, df, wl, itr, is_child=False, filter_dim="window"):
        """
        Create the variables associated with the Kolmogorov-Zurbenko filter.

        Args:
            df(pd.DataFrame, None): time series of a variable
            wl(list of int): window length
            itr(list of int): number of iterations
        """
        self.df = df
        self.filter_dim = filter_dim
        self.wl = to_list(wl)
        self.itr = to_list(itr)
        if abs(len(self.wl) - len(self.itr)) > 0:
            raise ValueError("Length of lists for wl and itr must agree!")
        self._isChild = is_child
        self.child = self.set_child()
        self.type = type(self).__name__

    def set_child(self):
        if len(self.wl) > 1:
            return KolmogorovZurbenkoBaseClass(None, self.wl[1:], self.itr[1:], True, self.filter_dim)
        else:
            return None

    def kz_filter(self, df, m, k):
        pass

    def spectral_calc(self):
        df_start = self.df
        kz = self.kz_filter(df_start, self.wl[0], self.itr[0])
        filtered = self.subtract(df_start, kz)
        # case I: no child avail -> return kz and remaining
        if self.child is None:
            return [kz, filtered]
        # case II: has child -> return current kz and all child results
        else:
            self.child.df = filtered
            kz_next = self.child.spectral_calc()
            return [kz] + kz_next

    @staticmethod
    def subtract(minuend, subtrahend):
        try:  # pandas implementation
            return minuend.sub(subtrahend, axis=0)
        except AttributeError:  # general implementation
            return minuend - subtrahend

    def run(self):
        return self.spectral_calc()

    def transfer_function(self):
        m = self.wl[0]
        k = self.itr[0]
        omega = np.linspace(0.00001, 0.15, 5000)
        return omega, (np.sin(m * np.pi * omega) / (m * np.sin(np.pi * omega))) ** (2 * k)

    def omega_null(self, alpha=0.5):
        a = np.sqrt(6) / np.pi
        b = 1 / (2 * np.array(self.itr))
        c = 1 - alpha ** b
        d = np.array(self.wl) ** 2 - alpha ** b
        return a * np.sqrt(c / d)

    def period_null(self, alpha=0.5):
        return 1. / self.omega_null(alpha)

    def period_null_days(self, alpha=0.5):
        return self.period_null(alpha) / 24.

    def plot_transfer_function(self, fig=None, name=None):
        if fig is None:
            fig = plt.figure()
        omega, transfer_function = self.transfer_function()
        if self.child is not None:
            transfer_function_child = self.child.plot_transfer_function(fig)
        else:
            transfer_function_child = transfer_function * 0
        plt.semilogx(omega, transfer_function - transfer_function_child,
                     label="m={:3.0f}, k={:3.0f}, T={:6.2f}d".format(self.wl[0],
                                                                     self.itr[0],
                                                                     self.period_null_days()))
        plt.axvline(x=self.omega_null())
        if not self._isChild:
            locs, labels = plt.xticks()
            plt.xticks(locs, np.round(1. / (locs * 24), 1))
            plt.xlim([0.00001, 0.15])
            plt.legend()
            if name is None:
                plt.show()
            else:
                plt.savefig(name)
        else:
            return transfer_function
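
# A worked example for the transfer function
# |T(omega)| = (sin(m*pi*omega) / (m*sin(pi*omega)))**(2k) and its half-amplitude
# frequency omega_null: for m=15 and k=5, omega_null is about 0.0135, i.e. a period
# of about 74 time steps (about 3.1 days if the series is hourly, which
# period_null_days presumes). The m and k values here are illustrative assumptions.
def _example_kz_cutoff():  # pragma: no cover
    kz = KolmogorovZurbenkoBaseClass(None, wl=15, itr=5)
    omega0 = kz.omega_null()             # frequency where the transfer function drops to alpha=0.5
    period_days = kz.period_null_days()  # same point expressed in days, assuming hourly sampling
    return omega0, period_days
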

class KolmogorovZurbenkoFilterMovingWindow(KolmogorovZurbenkoBaseClass):

    def __init__(self, df, wl: Union[list, int], itr: Union[list, int], is_child=False, filter_dim="window",
                 method="mean", percentile=0.5):
        """
        Create the variables associated with the KolmogorovZurbenkoFilterMovingWindow class.

        Args:
            df(pd.DataFrame, xr.DataArray): time series of a variable
            wl: window length
            itr: number of iterations
        """
        self.valid_methods = ["mean", "percentile", "median", "max", "min"]
        if method not in self.valid_methods:
            raise ValueError("Method '{}' is not supported. Please select from [{}].".format(
                method, ", ".join(self.valid_methods)))
        else:
            self.method = method
            if percentile > 1 or percentile < 0:
                raise ValueError("Percentile must be in range [0, 1]. Given was {}!".format(percentile))
            else:
                self.percentile = percentile
        super().__init__(df, wl, itr, is_child, filter_dim)

    def set_child(self):
        if len(self.wl) > 1:
            return KolmogorovZurbenkoFilterMovingWindow(self.df, self.wl[1:], self.itr[1:], is_child=True,
                                                        filter_dim=self.filter_dim, method=self.method,
                                                        percentile=self.percentile)
        else:
            return None

    @TimeTrackingWrapper
    def kz_filter_new(self, df, wl, itr):
        """
        Apply the Kolmogorov-Zurbenko filter and return the low-frequency component of the time series.

        If the filter method is one of mean, max, min, this method calls construct and rechunk before the actual
        calculation to improve performance. If the filter method is either median or percentile, this approach is
        not applicable and, depending on the data and window size, this method can become slow.

        Args:
            wl(int): window length
            itr(int): number of iterations
        """
        warnings.filterwarnings("ignore")
        df_itr = df.__deepcopy__()
        try:
            kwargs = {"min_periods": int(0.7 * wl),
                      "center": True,
                      self.filter_dim: wl}
            for _ in np.arange(0, itr):
                rolling = df_itr.chunk().rolling(**kwargs)
                if self.method not in ["percentile", "median"]:
                    rolling = rolling.construct("construct").chunk("auto")
                if self.method == "median":
                    df_mv_avg_tmp = rolling.median()
                elif self.method == "percentile":
                    df_mv_avg_tmp = rolling.quantile(self.percentile)
                elif self.method == "max":
                    df_mv_avg_tmp = rolling.max("construct")
                elif self.method == "min":
                    df_mv_avg_tmp = rolling.min("construct")
                else:
                    df_mv_avg_tmp = rolling.mean("construct")
                df_itr = df_mv_avg_tmp.compute()
                del df_mv_avg_tmp, rolling
                gc.collect()
            return df_itr
        except ValueError:
            raise

    @TimeTrackingWrapper
    def kz_filter(self, df, wl, itr):
        """
        Apply the Kolmogorov-Zurbenko filter and return the low-frequency component of the time series.

        Args:
            wl(int): window length
            itr(int): number of iterations
        """
        warnings.filterwarnings("ignore")
        df_itr = df.__deepcopy__()
        try:
            kwargs = {"min_periods": int(0.7 * wl),
                      "center": True,
                      self.filter_dim: wl}
            iter_vars = df_itr.coords["variables"].values
            for var in iter_vars:
                df_itr_var = df_itr.sel(variables=[var])
                for _ in np.arange(0, itr):
                    df_itr_var = df_itr_var.chunk()
                    rolling = df_itr_var.rolling(**kwargs)
                    if self.method == "median":
                        df_mv_avg_tmp = rolling.median()
                    elif self.method == "percentile":
                        df_mv_avg_tmp = rolling.quantile(self.percentile)
                    elif self.method == "max":
                        df_mv_avg_tmp = rolling.max()
                    elif self.method == "min":
                        df_mv_avg_tmp = rolling.min()
                    else:
                        df_mv_avg_tmp = rolling.mean()
                    df_itr_var = df_mv_avg_tmp.compute()
                df_itr.loc[{"variables": [var]}] = df_itr_var
            return df_itr
        except ValueError:
            raise
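
# A usage sketch of the moving-window KZ filter on synthetic hourly data. The
# dimension and variable names are illustrative assumptions; filter_dim must name
# the dimension to roll over, and kz_filter expects a dimension called "variables".
def _example_kz_moving_window():  # pragma: no cover
    time = pd.date_range("2020-01-01", periods=24 * 90, freq="1H")
    rng = np.random.default_rng(7)
    values = np.sin(2 * np.pi * np.arange(len(time)) / (24 * 7)) + 0.5 * rng.standard_normal(len(time))
    data = xr.DataArray(values[:, None], dims=("datetime", "variables"),
                        coords={"datetime": time, "variables": ["o3"]})
    kz = KolmogorovZurbenkoFilterMovingWindow(data, wl=[25], itr=[3], filter_dim="datetime", method="mean")
    low_pass, residuum = kz.run()  # spectral decomposition: [kz component, remaining part]
    return low_pass, residuum
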

def firwin_kzf(m: int, k: int) -> np.array:
    """Calculate the window weights for the Kolmogorov-Zurbenko filter."""
    m, k = int(m), int(k)
    coef = np.ones(m)
    for i in range(1, k):
        t = np.zeros((m, m + i * (m - 1)))
        for km in range(m):
            t[km, km:km + coef.size] = coef
        coef = np.sum(t, axis=0)
    return coef / (m ** k)
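
# The kzf weights are an m-point moving average applied k times: they sum to one
# and span filter_width_kzf(m, k) = k*(m-1)+1 taps (defined further below). A quick
# self-contained check against an explicit repeated box-filter convolution:
def _example_firwin_kzf():  # pragma: no cover
    m, k = 5, 3
    h = firwin_kzf(m, k)
    assert len(h) == filter_width_kzf(m, k)
    assert np.isclose(h.sum(), 1.0)
    box = np.ones(m) / m
    ref = np.convolve(np.convolve(box, box), box)  # box filter applied k=3 times
    assert np.allclose(h, ref)
    return h
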

def omega_null_kzf(m: int, k: int, alpha: float = 0.5) -> float:
    """Calculate the frequency at which the KZ transfer function drops to the value alpha."""
    a = np.sqrt(6) / np.pi
    b = 1 / (2 * np.array(k))
    c = 1 - alpha ** b
    d = np.array(m) ** 2 - alpha ** b
    return a * np.sqrt(c / d)


def filter_width_kzf(m: int, k: int) -> int:
    """Return the window width of the Kolmogorov-Zurbenko filter."""
    return k * (m - 1) + 1