Coverage for mlair/helpers/filter.py: 48%

526 statements  

coverage.py v6.4.2, created at 2023-06-01 13:03 +0000

1import gc 

2import warnings 

3from typing import Union, Callable, Tuple, Dict, Any 

4import logging 

5 

6import datetime 

7import numpy as np 

8import pandas as pd 

9from matplotlib import pyplot as plt 

10from scipy import signal 

11import xarray as xr 

12import dask.array as da 

13 

14from mlair.helpers import to_list, TimeTrackingWrapper, TimeTracking 

15 

16 

17class FIRFilter: 

18 from mlair.plotting.data_insight_plotting import PlotFirFilter 

19 

20 def __init__(self, data, fs, order, cutoff, window, var_dim, time_dim, display_name=None, minimum_length=None, 

21 offset=0, plot_path=None, plot_dates=None): 

22 self._filtered = [] 

23 self._h = [] 

24 self.data = data 

25 self.fs = fs 

26 self.order = order 

27 self.cutoff = cutoff 

28 self.window = window 

29 self.var_dim = var_dim 

30 self.time_dim = time_dim 

31 self.display_name = display_name 

32 self.minimum_length = minimum_length 

33 self.offset = offset 

34 self.plot_path = plot_path 

35 self.plot_dates = plot_dates 

36 self.run() 

37 

38 def run(self): 

39 logging.info(f"{self.display_name}: start {self.__class__.__name__}") 

40 filtered = [] 

41 h = [] 

42 input_data = self.data.__deepcopy__() 

43 

44 # collect some data for visualization 

45 if self.plot_dates is None: 

46 plot_pos = np.array([0.25, 1.5, 2.75, 4]) * 365 * self.fs 

47 self.plot_dates = [input_data.isel({self.time_dim: int(pos)}).coords[self.time_dim].values 

48 for pos in plot_pos if pos < len(input_data.coords[self.time_dim])] 

49 plot_data = [] 

50 

51 for i in range(len(self.order)): 

52 # apply filter 

53 fi, hi = self.fir_filter(input_data, self.fs, self.cutoff[i], self.order[i], time_dim=self.time_dim, 

54 var_dim=self.var_dim, window=self.window, display_name=self.display_name) 

55 filtered.append(fi) 

56 h.append(hi) 

57 

58 # visualization 

59 plot_data.append(self.create_visualization(fi, input_data, self.plot_dates, self.time_dim, self.fs, hi, 

60 self.minimum_length, self.order, i, self.offset, self.var_dim)) 

61 # calculate residuum 

62 input_data = input_data - fi 

63 

64 # add last residuum to filtered 

65 filtered.append(input_data) 

66 

67 self._filtered = filtered 

68 self._h = h 

69 

70 # visualize 

71 if self.plot_path is not None: 

72 try: 

73 self.PlotFirFilter(self.plot_path, plot_data, self.display_name) # not working when t0 != 0 

74 except Exception as e: 

75 logging.info(f"Could not plot fir filter due to the following reason:\n{e}")

76 

77 def create_visualization(self, filtered, filter_input_data, plot_dates, time_dim, sampling, 

78 h, minimum_length, order, i, offset, var_dim): # pragma: no cover 

79 plot_data = [] 

80 minimum_length = minimum_length or 0 

81 for viz_date in set(plot_dates).intersection(filtered.coords[time_dim].values): 

82 try: 

83 if i < len(order) - 1: 

84 minimum_length += order[i+1] 

85 

86 td_type = {1: "D", 24: "h"}.get(sampling) 

87 length = len(h) 

88 extend_length_history = minimum_length + int((length + 1) / 2) 

89 extend_length_future = int((length + 1) / 2) + 1 

90 t_minus = viz_date + np.timedelta64(int(-extend_length_history), td_type) 

91 t_plus = viz_date + np.timedelta64(int(extend_length_future + offset), td_type) 

92 time_slice = slice(t_minus, t_plus - np.timedelta64(1, td_type)) 

93 plot_data.append({"t0": viz_date, "filter_input": filter_input_data.sel({time_dim: time_slice}), 

94 "filtered": filtered.sel({time_dim: time_slice}), "h": h, "time_dim": time_dim, 

95 "var_dim": var_dim}) 

96 except Exception:

97 pass 

98 return plot_data 

99 

100 @property 

101 def filter_coefficients(self): 

102 return self._h 

103 

104 @property 

105 def filtered_data(self): 

106 return self._filtered 

107 

108 @TimeTrackingWrapper 

109 def fir_filter(self, data, fs, cutoff_high, order, sampling="1d", time_dim="datetime", var_dim="variables", window: Union[str, Tuple] = "hamming", 

110 minimum_length=None, new_dim="window", plot_dates=None, display_name=None): 

111 

112 # calculate FIR filter coefficients 

113 h = self._calculate_filter_coefficients(window, order, cutoff_high, fs) 

114 

115 coll = [] 

116 for var in data.coords[var_dim]: 

117 d = data.sel({var_dim: var}) 

118 filt = xr.apply_ufunc(fir_filter_convolve, d, 

119 input_core_dims=[[time_dim]], output_core_dims=[[time_dim]], 

120 vectorize=True, kwargs={"h": h}, output_dtypes=[d.dtype]) 

121 # trim data to valid range 

122 ext_len = int((len(h) + 1) / 2) 

123 valid_range = filt.coords[time_dim].values[ext_len:-ext_len] 

124 trimmed = filt.sel(**{time_dim: valid_range}) 

125 coll.append(trimmed) 

126 filtered = xr.concat(coll, var_dim) 

127 

128 # create result array with the same shape as the input data, gaps are filled with NaNs

129 filtered = self._create_full_filter_result_array(data, filtered, time_dim, display_name) 

130 return filtered, h 

131 

132 @staticmethod 

133 def _calculate_filter_coefficients(window: Union[str, tuple], order: Union[int, tuple], cutoff_high: float, 

134 fs: float) -> np.array: 

135 """ 

136 Calculate filter coefficients for moving window using scipy's signal package for common filter types and local 

137 method firwin_kzf for Kolmogorov Zurbenko filter (kzf). The filter is a low-pass filter. 

138 

139 :param window: name of the window type which is either a string with the window's name or a tuple containing the 

140 name but also some parameters (e.g. `("kaiser", 5)`) 

141 :param order: order of the filter to create as int or parameters m and k of kzf 

142 :param cutoff_high: cutoff frequency to use for low-pass filter in frequency of fs 

143 :param fs: sampling frequency of time series 

144 """ 

145 if window == "kzf": 

146 h = firwin_kzf(*order) 

147 else: 

148 h = signal.firwin(order, cutoff_high, pass_zero="lowpass", fs=fs, window=window) 

149 return h 

150 
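A brief usage sketch of the two branches above (editorial example, not part of filter.py; the parameter values are assumptions): a string window name goes through scipy.signal.firwin, while the special value "kzf" expects order as an (m, k) tuple for firwin_kzf.

# hedged sketch, assuming hourly data (fs=24) and a 21-day cutoff
h_hamming = FIRFilter._calculate_filter_coefficients("hamming", 721, cutoff_high=1 / 21, fs=24)
h_kzf = FIRFilter._calculate_filter_coefficients("kzf", (29, 5), cutoff_high=None, fs=24)  # cutoff/fs unused for kzf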

151 @staticmethod 

152 def _create_full_filter_result_array(template_array: xr.DataArray, result_array: xr.DataArray, new_dim: str, 

153 display_name: str = None) -> xr.DataArray: 

154 """ 

155 Create result filter array with the same shape as the given template data (should be the original input data

156 before filtering). All gaps are filled with NaNs.

157 

158 :param template_array: this array is used as template for shape and ordering of dims 

159 :param result_array: array with data that are filled into template 

160 :param new_dim: dimension that is appended at the end if not already present in the template's dims

161 :param display_name: string that is attached to logging (default None) 

162 """ 

163 logging.debug(f"{display_name}: create res_full") 

164 new_coords = {**{k: template_array.coords[k].values for k in template_array.coords if k != new_dim}, 

165 new_dim: result_array.coords[new_dim]} 

166 dims = [*template_array.dims, new_dim] if new_dim not in template_array.dims else template_array.dims 

167 result_array = result_array.transpose(*dims) 

168 return result_array.broadcast_like(xr.DataArray(dims=dims, coords=new_coords)) 

169 

170 
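A minimal end-to-end sketch of FIRFilter (editorial example; the synthetic DataArray and all filter settings are assumptions, not taken from filter.py):

import numpy as np
import pandas as pd
import xarray as xr

# one year of synthetic hourly data with the dimension names expected below
time = pd.date_range("2020-01-01", periods=24 * 365, freq="h")
arr = xr.DataArray(np.random.rand(len(time), 2),
                   coords={"datetime": time, "variables": ["o3", "temp"]},
                   dims=["datetime", "variables"])

# single low-pass with a 721-tap (~30 day) hamming kernel and a 21-day cutoff; the residuum is appended automatically
fir = FIRFilter(arr, fs=24, order=[721], cutoff=[1 / 21], window="hamming",
                var_dim="variables", time_dim="datetime")
low_pass, residuum = fir.filtered_data
h = fir.filter_coefficients[0]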

171class ClimateFIRFilter(FIRFilter): 

172 from mlair.plotting.data_insight_plotting import PlotClimateFirFilter 

173 

174 def __init__(self, data, fs, order, cutoff, window, time_dim, var_dim, apriori=None, apriori_type=None, 

175 apriori_diurnal=False, sel_opts=None, plot_path=None, 

176 minimum_length=None, new_dim=None, display_name=None, extend_length_opts: int = 0, 

177 offset: Union[dict, int] = 0, plot_dates=None): 

178 """ 

179 :param data: data to filter 

180 :param fs: sampling frequency in 1/days (daily data: fs=1, hourly data: fs=24)

181 :param order: a tuple with the orders of the filter in the same ordering as cutoff

182 :param cutoff: a tuple with the cutoff frequencies (all are applied as low pass) 

183 :param window: window type of the filter (e.g. hamming) 

184 :param time_dim: name of time dimension to apply filter along 

185 :param var_dim: name of variables dimension 

186 :param apriori: apriori information to use for the first low pass. If None, climatology is calculated on the 

187 provided data. 

188 :param apriori_type: type of apriori information to use. Climatology will be used always for first low pass. For 

189 the residuum either the value zero is used (apriori_type is None or "zeros") or a climatology on the 

190 residua is used ("residuum_stats"). 

191 :param apriori_diurnal: Use diurnal cycle as additional apriori information (only applicable for hourly 

192 resolved data). The mean anomaly of each hour is added to the apriori_type information.

193 :param sel_opts: specify some parameters to select a subset of data before calculating the apriori information. 

194 Use this parameter, for example, if the apriori information shall only be calculated on a shorter time period

195 than is available in the given data.

196 :param extend_length_opts: shift the switching point between historical data and apriori estimation by the given

197 values (default 0). Must either be a dictionary with keys available in var_dim or a single value that is

198 applied to all data. This parameter only influences which information is available at t0 for the filter

199 calculation but has no influence on the shape of the returned filtered data.

200 :param offset: number of time steps with ti > t0 to return in the filtered data. If the offset parameter is

201 larger than the extend_length_opts parameter, not only observational data but also climatological

202 estimations are returned. Must either be a dictionary with keys available in var_dim or a single value

203 that is applied to all data. Default is 0.

204 """ 

205 self._apriori = apriori 

206 self.apriori_type = apriori_type 

207 self.apriori_diurnal = apriori_diurnal 

208 self._apriori_list = [] 

209 self.sel_opts = sel_opts 

210 self.new_dim = new_dim 

211 self.plot_data = [] 

212 self.extend_length_opts = extend_length_opts 

213 super().__init__(data, fs, order, cutoff, window, var_dim, time_dim, display_name=display_name, 

214 minimum_length=minimum_length, plot_path=plot_path, offset=offset, plot_dates=plot_dates) 

215 

216 def run(self): 

217 filtered = [] 

218 h = [] 

219 if self.sel_opts is not None: 

220 self.sel_opts = self.sel_opts if isinstance(self.sel_opts, dict) else {self.time_dim: self.sel_opts} 

221 self._check_sel_opts() 

222 sampling = {1: "1d", 24: "1H"}.get(int(self.fs)) 

223 logging.debug(f"{self.display_name}: create diurnal_anomalies") 

224 if self.apriori_diurnal is True and sampling == "1H": 

225 diurnal_anomalies = self.create_seasonal_hourly_mean(self.data, self.time_dim, sel_opts=self.sel_opts, 

226 sampling=sampling, as_anomaly=True) 

227 else: 

228 diurnal_anomalies = 0 

229 logging.debug(f"{self.display_name}: create monthly apriori") 

230 if self._apriori is None: 

231 self._apriori = self.create_monthly_mean(self.data, self.time_dim, sel_opts=self.sel_opts, 

232 sampling=sampling) + diurnal_anomalies 

233 logging.debug(f"{self.display_name}: apriori shape = {self._apriori.shape}") 

234 apriori_list = to_list(self._apriori) 

235 input_data = self.data.__deepcopy__() 

236 

237 # for visualization 

238 plot_dates = self.plot_dates 

239 

240 # create tmp dimension to apply filter, search for unused name 

241 new_dim = self._create_tmp_dimension(input_data) if self.new_dim is None else self.new_dim 

242 

243 for i in range(len(self.order)): 

244 logging.info(f"{self.display_name}: start filter for order {self.order[i]}") 

245 # calculate climatological filter 

246 next_order = self._next_order(self.order, 0, i, self.window) 

247 fi, input_data, hi, apriori, plot_data = self.clim_filter(input_data, self.fs, self.cutoff[i], self.order[i], 

248 apriori=apriori_list[i], sel_opts=self.sel_opts, 

249 sampling=sampling, time_dim=self.time_dim, 

250 window=self.window, var_dim=self.var_dim, 

251 minimum_length=self.minimum_length, new_dim=new_dim, 

252 plot_dates=plot_dates, display_name=self.display_name, 

253 extend_opts=self.extend_length_opts, 

254 offset=self.offset, next_order=next_order) 

255 

256 logging.info(f"{self.display_name}: finished clim_filter calculation") 

257 if self.minimum_length is None: 

258 filtered.append(fi.sel({new_dim: slice(None, self.offset)})) 

259 else: 

260 filtered.append(fi.sel({new_dim: slice(self.offset - self.minimum_length, self.offset)})) 

261 h.append(hi) 

262 gc.collect() 

263 self.plot_data.append(plot_data) 

264 plot_dates = {e["t0"] for e in plot_data} 

265 

266 # calculate residuum 

267 logging.info(f"{self.display_name}: calculate residuum") 

268 coord_range = range(fi.coords[new_dim].values.min(), fi.coords[new_dim].values.max() + 1) 

269 if new_dim in input_data.coords: 

270 input_data = input_data.sel({new_dim: coord_range}) - fi 

271 else: 

272 input_data = self._shift_data(input_data, coord_range, self.time_dim, new_dim) - fi 

273 

274 # create new apriori information for next iteration if no further apriori is provided 

275 if len(apriori_list) < len(self.order): 

276 logging.info(f"{self.display_name}: create diurnal_anomalies") 

277 if self.apriori_diurnal is True and sampling == "1H": 

278 diurnal_anomalies = self.create_seasonal_hourly_mean(input_data.sel({new_dim: 0}, drop=True), 

279 self.time_dim, sel_opts=self.sel_opts, 

280 sampling=sampling, as_anomaly=True) 

281 else: 

282 diurnal_anomalies = 0 

283 logging.info(f"{self.display_name}: create monthly apriori") 

284 if self.apriori_type is None or self.apriori_type == "zeros": # zero version 

285 apriori_list.append(xr.zeros_like(apriori_list[i]) + diurnal_anomalies) 

286 elif self.apriori_type == "residuum_stats": # calculate monthly statistic on residuum 

287 apriori_list.append( 

288 -self.create_monthly_mean(input_data.sel({new_dim: 0}, drop=True), self.time_dim, 

289 sel_opts=self.sel_opts, sampling=sampling) + diurnal_anomalies) 

290 else: 

291 raise ValueError(f"Cannot handle unknown apriori type: {self.apriori_type}. Please choose from None,"

292 f" `zeros` or `residuum_stats`.")

293 

294 # add last residuum to filtered 

295 if self.minimum_length is None: 

296 filtered.append(input_data.sel({new_dim: slice(None, self.offset)})) 

297 else: 

298 filtered.append(input_data.sel({new_dim: slice(self.offset - self.minimum_length, self.offset)})) 

299 

300 self._filtered = filtered 

301 self._h = h 

302 self._apriori_list = apriori_list 

303 

304 # visualize 

305 if self.plot_path is not None: 

306 try: 

307 self.PlotClimateFirFilter(self.plot_path, self.plot_data, sampling, self.display_name) 

308 except Exception as e: 

309 logging.info(f"Could not plot climate fir filter due to the following reason:\n{e}")

310 

311 def _check_sel_opts(self): 

312 if len(self.data.sel(**self.sel_opts).coords[self.time_dim]) == 0: 

313 raise ValueError(f"Abort {self.__class__.__name__} as no data is available after applying sel_opts to data") 

314 

315 @staticmethod 

316 def _next_order(order: list, minimum_length: Union[int, None], pos: int, window: Union[str, tuple]) -> int: 

317 next_order = 0 

318 if pos + 1 < len(order): 

319 next_order = order[pos + 1] 

320 if window == "kzf" and isinstance(next_order, tuple): 

321 next_order = filter_width_kzf(*next_order) 

322 if minimum_length is not None: 

323 next_order = next_order + minimum_length 

324 return next_order 

325 

326 @staticmethod 

327 def create_monthly_unity_array(data: xr.DataArray, time_dim: str, extend_range: int = 366) -> xr.DataArray: 

328 """ 

329 Create an xarray data array filled with ones at monthly resolution (set on the 16th of each month). Data is extended

330 by extend_range days into the future and past along time_dim.

331 

332 :param data: data to create monthly unity array from, must contain dimension time_dim 

333 :param time_dim: name of temporal dimension 

334 :param extend_range: number of days to extend data (default 366) 

335 :returns: xarray in monthly resolution (centered at 16th day of month) with all values equal to 1 

336 """ 

337 coords = data.coords 

338 

339 # extend time_dim by given extend_range days 

340 start = coords[time_dim][0].values.astype("datetime64[D]") - np.timedelta64(extend_range, "D") 

341 end = coords[time_dim][-1].values.astype("datetime64[D]") + np.timedelta64(extend_range, "D") 

342 new_time_axis = np.arange(start, end).astype("datetime64[ns]") 

343 

344 # construct data array with updated coords 

345 new_coords = {k: data.coords[k].values if k != time_dim else new_time_axis for k in coords} 

346 new_array = xr.DataArray(1, coords=new_coords, dims=new_coords.keys()).transpose(*data.dims) 

347 

348 # loffset is required because resampling uses last day in month as resampling timestamp 

349 return new_array.resample({time_dim: "1m"}, loffset=datetime.timedelta(days=-15)).max() 

350 

351 def create_monthly_mean(self, data: xr.DataArray, time_dim: str, sel_opts: dict = None, sampling: str = "1d") \ 

352 -> xr.DataArray: 

353 """ 

354 Calculate monthly means (12 values) and return a data array with same resolution as given data containing these 

355 monthly mean values. Sampling points are the 16th of each month (this value is equal to the true monthly mean) 

356 and all other values between two points are interpolated linearly. It is possible to apply some pre-selection 

357 to use only a subset of given data using the sel_opts parameter. Only data from this subset are used to 

358 calculate the monthly statistic. 

359 

360 :param data: data to apply statistical calculation on 

361 :param time_dim: name of temporal axis 

362 :param sel_opts: selection options as dict to select a subset of data (default None). A given sel_opts with 

363 `sel_opts={<time_dim>: "2006"}` forces the method e.g. to derive the monthly means only from data of the 

364 year 2006. 

365 :param sampling: sampling of the returned data (default 1d) 

366 :returns: array in desired resolution containing interpolated monthly values. Months with no valid data are 

367 returned as np.nan which also affects data in the neighbouring months (before / after sampling points which

368 are the 16th of each month). 

369 """ 

370 

371 # create unity xarray in monthly resolution with sampling point in mid of each month 

372 monthly = self.create_monthly_unity_array(data, time_dim) * np.nan 

373 

374 # apply selection if given (only use subset for monthly means) 

375 if sel_opts is not None: 

376 data = data.sel(**sel_opts) 

377 

378 # create monthly mean and replace entries in unity array 

379 monthly_mean = data.groupby(f"{time_dim}.month").mean() 

380 for month in monthly_mean.month.values: 

381 monthly = xr.where((monthly[f"{time_dim}.month"] == month), 

382 monthly_mean.sel(month=month, drop=True), 

383 monthly) 

384 # transform monthly information into original sampling rate 

385 return monthly.dropna(dim=time_dim).resample({time_dim: sampling}).interpolate() 

386 

387 @staticmethod 

388 def _compute_hourly_mean_per_month(data: xr.DataArray, time_dim: str, as_anomaly: bool) -> Dict[int, xr.DataArray]: 

389 """ 

390 Calculate for each hour in each month a separate mean value (12 x 24 values in total). Average is either the 

391 anomaly of a monthly mean state or the raw mean value. 

392 

393 :param data: data to calculate averages on 

394 :param time_dim: name of temporal dimension 

395 :param as_anomaly: indicates whether to calculate means as anomaly of a monthly mean or as raw mean values. 

396 :returns: dictionary containing 12 months each with a 24-valued array (1 entry for each hour) 

397 """ 

398 seasonal_hourly_means = {} 

399 for month in data.groupby(f"{time_dim}.month").groups.keys(): 

400 single_month_data = data.sel({time_dim: (data[f"{time_dim}.month"] == month)}) 

401 hourly_mean = single_month_data.groupby(f"{time_dim}.hour").mean() 

402 if as_anomaly is True: 

403 hourly_mean = hourly_mean - hourly_mean.mean("hour") 

404 seasonal_hourly_means[month] = hourly_mean 

405 return seasonal_hourly_means 

406 

407 @staticmethod 

408 def _create_seasonal_cycle_of_single_hour_mean(result_arr: xr.DataArray, means: Dict[int, xr.DataArray], hour: int, 

409 time_dim: str, sampling: str) -> xr.DataArray: 

410 """ 

411 Use monthly means of a given hour to create an array with interpolated values at the indicated hour for each day 

412 of the full time span indicated by given result_arr. 

413 

414 :param result_arr: template array indicating the full time range and additional dimensions to keep 

415 :param means: dictionary containing 24 hourly averages for each month (12 x 24 values in total) 

416 :param hour: integer of hour of interest 

417 :param time_dim: name of temporal dimension 

418 :param sampling: sampling rate to interpolate 

419 :returns: array with interpolated averages in sampling resolution containing only values for hour of interest 

420 """ 

421 h_coll = xr.ones_like(result_arr) * np.nan 

422 for month in means.keys(): 

423 hourly_mean_single_month = means[month].sel(hour=hour, drop=True) 

424 h_coll = xr.where((h_coll[f"{time_dim}.month"] == month), hourly_mean_single_month, h_coll) 

425 h_coll = h_coll.dropna(time_dim).resample({time_dim: sampling}).interpolate() 

426 h_coll = h_coll.sel({time_dim: (h_coll[f"{time_dim}.hour"] == hour)}) 

427 return h_coll 

428 

429 def create_seasonal_hourly_mean(self, data: xr.DataArray, time_dim: str, sel_opts: Dict[str, Any] = None, 

430 sampling: str = "1H", as_anomaly: bool = True) -> xr.DataArray: 

431 """ 

432 Compute climatological statistics on an hourly basis, either as raw data or anomalies. For each month, an overall

433 mean value (only used if requiring anomalies) and the mean of each hour are calculated. The climatological 

434 diurnal cycle is positioned on the 16th of each month and interpolated in between by using a distinct 

435 interpolation for each hour of day. The returned array therefore contains data with a yearly cycle (if anomaly 

436 is not calculated) or data without a yearly cycle (if using anomalies). In both cases, the data have an 

437 amplitude that varies over the year. 

438 

439 :param data: data to apply this method to 

440 :param time_dim: name of temporal axis 

441 :param sel_opts: specific selection options that are applied before calculation of climatological statistics 

442 (default None) 

443 :param sampling: temporal resolution of data (default "1H") 

444 :param as_anomaly: specify whether to use anomalies or raw data including a seasonal cycle of the mean value 

445 (default: True) 

446 :returns: climatological statistics for given data interpolated with given sampling rate 

447 """ 

448 

449 # calculate hourly statistics: either the absolute value or the anomaly (as_anomaly=True)

450 # can only be used for hourly sampling rate 

451 assert sampling == "1H" 

452 

453 # apply selection if given (only use subset for seasonal hourly means) 

454 if sel_opts is not None: 

455 data = data.sel(**sel_opts) 

456 

457 # create unity xarray in monthly resolution with sampling point in mid of each month 

458 monthly = self.create_monthly_unity_array(data, time_dim) * np.nan 

459 

460 # calculate for each hour in each month a separate mean value 

461 seasonal_hourly_means = self._compute_hourly_mean_per_month(data, time_dim, as_anomaly) 

462 

463 # create seasonal cycles of these hourly averages 

464 seasonal_coll = [] 

465 for hour in data.groupby(f"{time_dim}.hour").groups.keys(): 

466 mean_single_hour = self._create_seasonal_cycle_of_single_hour_mean(monthly, seasonal_hourly_means, hour, 

467 time_dim, sampling) 

468 seasonal_coll.append(mean_single_hour) 

469 

470 # combine all cycles in a common data array 

471 hourly = xr.concat(seasonal_coll, time_dim).sortby(time_dim).resample({time_dim: sampling}).interpolate() 

472 return hourly 

473 

474 @staticmethod 

475 def extend_apriori(data: xr.DataArray, apriori: xr.DataArray, time_dim: str, sampling: str = "1d", 

476 display_name: str = None) -> xr.DataArray: 

477 """ 

478 Extend the time range of the apriori information to span a longer period than data (or at least of equal length).

479 This method may not work properly if the apriori data covers less than one year.

480 

481 :param data: data to get time range of which apriori should span in minimum 

482 :param apriori: data that is adjusted. It is assumed that this data varies in the course of the year but is the

483 same for the same day in different years. Otherwise this method will introduce some unintended artefacts in

484 the apriori data.

485 :param time_dim: name of temporal dimension 

486 :param sampling: sampling of data (e.g. "1m", "1d", default "1d") 

487 :param display_name: name to use for logging message (default None) 

488 :returns: array with adjusted temporal coverage derived from apriori

489 """ 

490 dates = data.coords[time_dim].values 

491 td_type = {"1d": "D", "1H": "h"}.get(sampling) 

492 

493 # apriori starts after data 

494 if dates[0] < apriori.coords[time_dim].values[0]: 

495 logging.debug(f"{display_name}: apriori starts after data") 

496 

497 # add difference in full years 

498 date_diff = abs(dates[0] - apriori.coords[time_dim].values[0]).astype("timedelta64[D]") 

499 extend_range = np.ceil(date_diff / (np.timedelta64(1, "D") * 365)).astype(int) * 365 

500 factor = 1 if td_type == "D" else 24 

501 

502 # get fill data range 

503 start = apriori.coords[time_dim][0].values.astype("datetime64[%s]" % td_type) 

504 end = apriori.coords[time_dim][0].values.astype("datetime64[%s]" % td_type) + np.timedelta64( 

505 366 * factor + 1, td_type) 

506 

507 # fill year by year 

508 for i in range(365, extend_range + 365, 365): 

509 apriori_tmp = apriori.sel({time_dim: slice(start, end)}) # hint: slice includes end date 

510 new_time_axis = apriori_tmp.coords[time_dim] - np.timedelta64(i * factor, td_type) 

511 apriori_tmp.coords[time_dim] = new_time_axis 

512 apriori = apriori.combine_first(apriori_tmp) 

513 

514 # apriori ends before data 

515 if dates[-1] + np.timedelta64(365, "D") > apriori.coords[time_dim].values[-1]: 

516 logging.debug(f"{display_name}: apriori ends before data") 

517 

518 # add difference in full years + 1 year (because apriori is used as future estimate) 

519 date_diff = abs(dates[-1] - apriori.coords[time_dim].values[-1]).astype("timedelta64[D]") 

520 extend_range = np.ceil(date_diff / (np.timedelta64(1, "D") * 365)).astype(int) * 365 + 365 

521 factor = 1 if td_type == "D" else 24 

522 

523 # get fill data range 

524 start = apriori.coords[time_dim][-1].values.astype("datetime64[%s]" % td_type) - np.timedelta64( 

525 366 * factor + 1, td_type) 

526 end = apriori.coords[time_dim][-1].values.astype("datetime64[%s]" % td_type) 

527 

528 # fill year by year 

529 for i in range(365, extend_range + 365, 365): 

530 apriori_tmp = apriori.sel({time_dim: slice(start, end)}) # hint: slice includes end date 

531 new_time_axis = apriori_tmp.coords[time_dim] + np.timedelta64(i * factor, td_type) 

532 apriori_tmp.coords[time_dim] = new_time_axis 

533 apriori = apriori.combine_first(apriori_tmp) 

534 

535 return apriori 

536 

537 def combine_observation_and_apriori(self, data: xr.DataArray, apriori: xr.DataArray, time_dim: str, new_dim: str, 

538 extend_length_history: int, extend_length_future: int, 

539 extend_length_separator: int = 0) -> xr.DataArray: 

540 """ 

541 Combine historical data / observations ("data") and climatological statistics ("apriori"). Historical data are 

542 used on interval [t0 - extend_length_history, t0] and apriori is used on [t0 + 1, t0 + extend_length_future]. If 

543 indicated by the extend_length_separator, it is possible to shift the end of the history interval and the start

544 of the apriori interval by a given number of time steps.

545 

546 :param data: historical data for past values, must contain dimensions time_dim and var_dim and might also have 

547 a new_dim dimension 

548 :param apriori: climatological estimate for future values, must contain dimensions time_dim and var_dim, but 

549 can also have dimension new_dim 

550 :param time_dim: name of temporal dimension 

551 :param new_dim: name of new dim on which data is combined along 

552 :param extend_length_history: number of time steps to use from data 

553 :param extend_length_future: number of time steps to use from apriori (minus 1) 

554 :param extend_length_separator: position of last history value to use (default 0), this position indicates the 

555 last value that is used from data (followed by values from apriori). In other words, end of history 

556 interval and start of apriori interval are shifted by this value from t0 (positive or negative). 

557 :returns: combined data array 

558 """ 

559 # prepare historical data / observation 

560 ext_sep = min(extend_length_separator, extend_length_future) 

561 if new_dim not in data.coords: 

562 history = self._shift_data(data, range(int(-extend_length_history), ext_sep + 1), 

563 time_dim, new_dim) 

564 else: 

565 history = data.sel({new_dim: slice(int(-extend_length_history), ext_sep)}) 

566 

567 if extend_length_future > ext_sep + 1:

568 # prepare climatological statistics 

569 if new_dim not in apriori.coords: 

570 future = self._shift_data(apriori, range(ext_sep + 1, 

571 extend_length_future + 1), 

572 time_dim, new_dim) 

573 else: 

574 future = apriori.sel({new_dim: slice(ext_sep + 1, 

575 extend_length_future)}) 

576 # combine historical data [t0-length,t0+sep] and climatological statistics [t0+sep+1,t0+length] 

577 filter_input_data = xr.concat([history.dropna(time_dim), future], dim=new_dim, join="left") 

578 return filter_input_data 

579 else: 

580 return history 

581 

582 def create_visualization(self, filtered, data, filter_input_data, plot_dates, time_dim, new_dim, sampling, 

583 extend_length_history, extend_length_future, minimum_length, h, 

584 variable_name, extend_length_opts=None, offset=None): # pragma: no cover 

585 plot_data = [] 

586 offset = 0 if offset is None else offset 

587 extend_length_opts = 0 if extend_length_opts is None else extend_length_opts 

588 for t0 in set(plot_dates).intersection(filtered.coords[time_dim].values): 

589 try: 

590 td_type = {"1d": "D", "1H": "h"}.get(sampling) 

591 t_minus = t0 + np.timedelta64(int(-extend_length_history), td_type) 

592 t_plus = t0 + np.timedelta64(int(extend_length_future + 1), td_type) 

593 if new_dim not in data.coords: 

594 tmp_filter_data = self._shift_data(data.sel({time_dim: slice(t_minus, t_plus)}), 

595 range(int(-extend_length_history), 

596 int(extend_length_future + 1)), 

597 time_dim, 

598 new_dim).sel({time_dim: t0}) 

599 else: 

600 tmp_filter_data = None 

601 valid_start = int(filtered[new_dim].min()) + int((len(h) + 1) / 2) 

602 valid_end = min(extend_length_opts + offset + 1, int(filtered[new_dim].max()) - int((len(h) + 1) / 2)) 

603 valid_range = range(valid_start, valid_end) 

604 plot_data.append({"t0": t0, 

605 "var": variable_name, 

606 "filter_input": filter_input_data.sel({time_dim: t0}), 

607 "filter_input_nc": tmp_filter_data, 

608 "valid_range": valid_range, 

609 "time_range": data.sel( 

610 {time_dim: slice(t_minus, t_plus - np.timedelta64(1, td_type))}).coords[ 

611 time_dim].values, 

612 "h": h, 

613 "new_dim": new_dim}) 

614 except Exception:

615 pass 

616 return plot_data 

617 

618 @staticmethod 

619 def _get_year_interval(data: xr.DataArray, time_dim: str) -> Tuple[int, int]: 

620 """ 

621 Get year of start and end date of given data. 

622 

623 :param data: data to extract dates from 

624 :param time_dim: name of temporal axis 

625 :returns: two-element tuple with start and end 

626 """ 

627 start = pd.to_datetime(data.coords[time_dim].min().values).year 

628 end = pd.to_datetime(data.coords[time_dim].max().values).year 

629 return start, end 

630 

631 @staticmethod 

632 def _calculate_filter_coefficients(window: Union[str, tuple], order: Union[int, tuple], cutoff_high: float, 

633 fs: float) -> np.array: 

634 """ 

635 Calculate filter coefficients for moving window using scipy's signal package for common filter types and local 

636 method firwin_kzf for Kolmogorov Zurbenko filter (kzf). The filter is a low-pass filter. 

637 

638 :param window: name of the window type which is either a string with the window's name or a tuple containing the 

639 name but also some parameters (e.g. `("kaiser", 5)`) 

640 :param order: order of the filter to create as int or parameters m and k of kzf 

641 :param cutoff_high: cutoff frequency to use for low-pass filter in frequency of fs 

642 :param fs: sampling frequency of time series 

643 """ 

644 if window == "kzf": 

645 h = firwin_kzf(*order) 

646 else: 

647 h = signal.firwin(order, cutoff_high, pass_zero="lowpass", fs=fs, window=window) 

648 return h 

649 

650 @staticmethod 

651 def _trim_data_to_minimum_length(data: xr.DataArray, extend_length_history: int, dim: str, 

652 extend_length_future: int = 0) -> xr.DataArray: 

653 """ 

654 Trim data along the given axis to the interval from -extend_length_history to extend_length_future (the latter

655 defaults to 0).

656 

657 :param data: data to trim 

658 :param extend_length_history: start number for the trim range; this value is transformed to a negative number

659 and marks the history end of the trim interval

660 :param dim: dim to apply trim on 

661 :param extend_length_future: number to use in "future" 

662 :returns: trimmed data 

663 """ 

664 return data.sel({dim: slice(-extend_length_history, extend_length_future)}, drop=True) 

665 

666 @staticmethod 

667 def _create_full_filter_result_array(template_array: xr.DataArray, result_array: xr.DataArray, new_dim: str, 

668 display_name: str = None) -> xr.DataArray: 

669 """ 

670 Create result filter array with the same shape as the given template data (should be the original input data

671 before filtering). All gaps are filled with NaNs.

672 

673 :param template_array: this array is used as template for shape and ordering of dims 

674 :param result_array: array with data that are filled into template 

675 :param new_dim: dimension that is appended at the end if not already present in the template's dims

676 :param display_name: string that is attached to logging (default None) 

677 """ 

678 logging.debug(f"{display_name}: create res_full") 

679 new_coords = {**{k: template_array.coords[k].values for k in template_array.coords if k != new_dim}, 

680 new_dim: result_array.coords[new_dim]} 

681 dims = [*template_array.dims, new_dim] if new_dim not in template_array.dims else template_array.dims 

682 result_array = result_array.transpose(*dims) 

683 return result_array.broadcast_like(xr.DataArray(dims=dims, coords=new_coords)) 

684 

685 @TimeTrackingWrapper 

686 def clim_filter(self, data, fs, cutoff_high, order, apriori=None, sel_opts=None, 

687 sampling="1d", time_dim="datetime", var_dim="variables", window: Union[str, Tuple] = "hamming", 

688 minimum_length=0, next_order=0, new_dim="window", plot_dates=None, display_name=None, 

689 extend_opts: int = 0, offset: int = 0): 

690 

691 logging.debug(f"{display_name}: extend apriori") 

692 

693 # calculate apriori information from data if not given and extend its range if not sufficiently long

694 if apriori is None: 

695 apriori = self.create_monthly_mean(data, time_dim, sel_opts=sel_opts, sampling=sampling) 

696 apriori = apriori.astype(data.dtype) 

697 apriori = self.extend_apriori(data, apriori, time_dim, sampling, display_name=display_name) 

698 

699 # calculate FIR filter coefficients 

700 h = self._calculate_filter_coefficients(window, order, cutoff_high, fs) 

701 length = len(h) 

702 

703 # set data extent that is required for filtering

704 extend_length_history = minimum_length + int((next_order + 1) / 2) + int((length + 1) / 2) - offset 

705 extend_length_future = offset + int((next_order + 1) / 2) + int((length + 1) / 2) 

706 

707 # collect some data for visualization 

708 plot_pos = np.array([0.25, 1.5, 2.75, 4]) * 365 * fs 

709 if plot_dates is None: 

710 plot_dates = [data.isel({time_dim: int(pos)}).coords[time_dim].values for pos in plot_pos if 

711 pos < len(data.coords[time_dim])] 

712 plot_data = [] 

713 

714 coll = [] 

715 coll_input = [] 

716 

717 for var in reversed(data.coords[var_dim].values): 

718 logging.info(f"{display_name} ({var}): sel data") 

719 

720 _start, _end = self._get_year_interval(data, time_dim) 

721 filt_coll = [] 

722 filt_input_coll = [] 

723 for _year in range(_start, _end + 1): 

724 logging.debug(f"{display_name} ({var}): year={_year}") 

725 

726 # select observations and apriori data 

727 time_slice = self._create_time_range_extend( 

728 _year, sampling, max(extend_length_history, extend_length_future)) 

729 d = data.sel({var_dim: [var], time_dim: time_slice}) 

730 a = apriori.sel({var_dim: [var], time_dim: time_slice}) 

731 if len(d.coords[time_dim]) == 0: # no data at all for this year

732 continue 

733 

734 # combine historical data / observation [t0-length,t0] and climatological statistics [t0+1,t0+length] 

735 filter_input_data = self.combine_observation_and_apriori(d, a, time_dim, new_dim, extend_length_history, 

736 extend_length_future, extend_length_separator=extend_opts) 

737 

738 # select only data for current year 

739 try: 

740 filter_input_data = filter_input_data.sel({time_dim: str(_year)}) 

741 except KeyError: # no valid data for this year 

742 continue 

743 if len(filter_input_data.coords[time_dim]) == 0: # no valid data for this year

744 continue 

745 

746 # apply filter 

747 logging.debug(f"{display_name} ({var}): start filter convolve") 

748 with TimeTracking(name=f"{display_name} ({var}): filter convolve", logging_level=logging.DEBUG): 

749 filt = xr.apply_ufunc(fir_filter_convolve, filter_input_data, 

750 input_core_dims=[[new_dim]], output_core_dims=[[new_dim]], 

751 vectorize=True, kwargs={"h": h}, output_dtypes=[d.dtype]) 

752 

753 # trim data if required 

754 valid_range_end = int(filt.coords[new_dim].max() - (length + 1) / 2) + 1 

755 ext_len = min(extend_length_future, valid_range_end) 

756 trimmed = self._trim_data_to_minimum_length(filt, extend_length_history, new_dim, 

757 extend_length_future=ext_len) 

758 filt_coll.append(trimmed) 

759 trimmed = self._trim_data_to_minimum_length(filter_input_data, extend_length_history, new_dim, 

760 extend_length_future=ext_len) 

761 filt_input_coll.append(trimmed) 

762 

763 # visualization 

764 plot_data.extend(self.create_visualization(filt, d, filter_input_data, plot_dates, time_dim, new_dim, 

765 sampling, extend_length_history, extend_length_future, 

766 minimum_length, h, var, extend_opts, offset)) 

767 

768 # collect all filter results 

769 coll.append(xr.concat(filt_coll, time_dim)) 

770 coll_input.append(xr.concat(filt_input_coll, time_dim)) 

771 gc.collect() 

772 

773 # concat all variables 

774 logging.debug(f"{display_name}: concat all variables") 

775 res = xr.concat(coll, var_dim) 

776 res_input = xr.concat(coll_input, var_dim) 

777 

778 # create result array with the same shape as the input data, gaps are filled with NaNs

779 res_full = self._create_full_filter_result_array(data, res, new_dim, display_name) 

780 res_input_full = self._create_full_filter_result_array(data, res_input, new_dim, display_name) 

781 return res_full, res_input_full, h, apriori, plot_data 

782 

783 @staticmethod 

784 def _create_time_range_extend(year: int, sampling: str, extend_length: int) -> slice: 

785 """ 

786 Create a slice object for given year plus extend_length in sampling resolution. 

787 

788 :param year: year to create time range for 

789 :param sampling: sampling of time range 

790 :param extend_length: number of time steps to extend out of given year 

791 :returns: slice object with time range 

792 """ 

793 td_type = {"1d": "D", "1H": "h"}.get(sampling) 

794 delta = np.timedelta64(extend_length + 1, td_type) 

795 start = np.datetime64(f"{year}-01-01") - delta 

796 end = np.datetime64(f"{year}-12-31") + delta 

797 return slice(start, end) 

798 

799 @staticmethod 

800 def _create_tmp_dimension(data: xr.DataArray) -> str: 

801 """ 

802 Create a tmp dimension preferably named 'window'. If this name is already one of the dimensions, the tmp dimension

803 name is concatenated with itself until it is no longer present in dims. The method raises a ValueError after 10 tries.

804 

805 :param data: data array to create a new tmp dimension for with unique name 

806 :returns: valid name for a tmp dimension (preferably 'window') 

807 """ 

808 new_dim = "window" 

809 count = 0 

810 while new_dim in data.dims: 

811 new_dim += new_dim 

812 count += 1 

813 if count > 10: 

814 raise ValueError("Could not create new dimension.") 

815 return new_dim 

816 

817 def _shift_data(self, data: xr.DataArray, index_value: range, time_dim: str, new_dim: str) -> xr.DataArray: 

818 """ 

819 Shift data multiple times to create history or future along dimension new_dim for each time step. 

820 

821 :param data: data set to shift 

822 :param index_value: range of integers to span history and/or future 

823 :param time_dim: name of temporal dimension that should be shifted 

824 :param new_dim: name of the dimension created by the data shift

825 :return: shifted data 

826 """ 

827 coll = [] 

828 for i in index_value: 

829 coll.append(data.shift({time_dim: -i})) 

830 new_ind = self.create_index_array(new_dim, index_value) 

831 return xr.concat(coll, dim=new_ind) 

832 

833 @staticmethod 

834 def create_index_array(index_name: str, index_value: range): 

835 """ 

836 Create index array from a range object to use as index of a data array. 

837 

838 :param index_name: name of the index dimension 

839 :param index_value: range of values to use as indexes 

840 :returns: index array for given range of values 

841 """ 

842 ind = pd.DataFrame({'val': index_value}, index=index_value) 

843 tmp_dim = index_name + "tmp" 

844 res = xr.Dataset.from_dataframe(ind).to_array(tmp_dim).rename({'index': index_name}) 

845 res = res.squeeze(dim=tmp_dim, drop=True) 

846 res.name = index_name 

847 return res 

848 
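A small illustration of create_index_array (editorial example): the returned DataArray is its own coordinate and serves as the concat dimension in _shift_data.

window_index = ClimateFIRFilter.create_index_array("window", range(-720, 1))
# window_index.name == "window"; its values run from -720 (history) up to 0 (t0)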

849 @property 

850 def apriori_data(self): 

851 return self._apriori_list 

852 

853 @property 

854 def initial_apriori_data(self): 

855 return self.apriori_data[0] 

856 

857 
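A minimal construction sketch for ClimateFIRFilter (editorial example; arr is the synthetic hourly DataArray from the FIRFilter sketch above and all settings are assumptions). Note that instantiation runs the full climatological decomposition and can take a while:

clim_fir = ClimateFIRFilter(arr, fs=24, order=[721], cutoff=[1 / 21], window="hamming",
                            time_dim="datetime", var_dim="variables",
                            apriori_type="residuum_stats", apriori_diurnal=True,
                            minimum_length=24, new_dim="window")
components = clim_fir.filtered_data       # low-pass component(s) plus residuum, each carrying a "window" dimension
climatology = clim_fir.initial_apriori_data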

858def fir_filter(data, fs, order=5, cutoff_low=None, cutoff_high=None, window="hamming", dim="variables", h=None, 

859 causal=True, padlen=None): 

860 """Expects xarray.""" 

861 if h is None: 

862 cutoff = [] 

863 if cutoff_low is not None: 

864 cutoff += [cutoff_low] 

865 if cutoff_high is not None: 

866 cutoff += [cutoff_high] 

867 if len(cutoff) == 2: 

868 filter_type = "bandpass" 

869 elif len(cutoff) == 1 and cutoff_low is not None: 

870 filter_type = "highpass" 

871 elif len(cutoff) == 1 and cutoff_high is not None: 

872 filter_type = "lowpass" 

873 else: 

874 raise ValueError("Please provide either cutoff_low or cutoff_high.") 

875 h = signal.firwin(order, cutoff, pass_zero=filter_type, fs=fs, window=window) 

876 filtered = xr.ones_like(data) 

877 for var in data.coords[dim]: 

878 d = data.sel({dim: var}).values.flatten() 

879 if causal: 

880 y = signal.lfilter(h, 1., d) 

881 else: 

882 padlen = padlen if padlen is not None else 3 * len(h) 

883 y = signal.filtfilt(h, 1., d, padlen=padlen) 

884 filtered.loc[{dim: var}] = y 

885 return filtered, h 

886 
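A short sketch of the standalone fir_filter helper (editorial example, reusing the synthetic arr from above): the causal variant (lfilter) delays the signal by roughly half the kernel length, the filtfilt variant is zero-phase.

low_causal, h = fir_filter(arr, fs=24, order=241, cutoff_high=1 / 10, window="hamming", dim="variables", causal=True)
low_zero_phase, _ = fir_filter(arr, fs=24, order=241, cutoff_high=1 / 10, window="hamming", dim="variables",
                               causal=False)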

887 

888def fir_filter_convolve(data, h): 

889 return signal.convolve(data, h, mode='same', method="direct") / sum(h) 

890 
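Because the convolution result is divided by sum(h), the DC gain is exactly one: a constant series passes through unchanged away from the boundaries. A quick numeric check (editorial example):

h = signal.firwin(21, 0.1)
out = fir_filter_convolve(np.ones(100), h)
# interior values of out stay at ~1.0; only the edges are affected by the 'same' padding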

891 

892class KolmogorovZurbenkoBaseClass: 

893 

894 def __init__(self, df, wl, itr, is_child=False, filter_dim="window"): 

895 """ 

896 It creates the variables associated with the Kolmogorov-Zurbenko filter.

897 

898 Args: 

899 df(pd.DataFrame, None): time series of a variable 

900 wl(list of int): window length 

901 itr(list of int): number of iterations

902 """ 

903 self.df = df 

904 self.filter_dim = filter_dim 

905 self.wl = to_list(wl) 

906 self.itr = to_list(itr) 

907 if len(self.wl) != len(self.itr):

908 raise ValueError("Length of lists for wl and itr must agree!") 

909 self._isChild = is_child 

910 self.child = self.set_child() 

911 self.type = type(self).__name__ 

912 

913 def set_child(self): 

914 if len(self.wl) > 1: 

915 return KolmogorovZurbenkoBaseClass(None, self.wl[1:], self.itr[1:], True, self.filter_dim) 

916 else: 

917 return None 

918 

919 def kz_filter(self, df, m, k): 

920 pass 

921 

922 def spectral_calc(self): 

923 df_start = self.df 

924 kz = self.kz_filter(df_start, self.wl[0], self.itr[0]) 

925 filtered = self.subtract(df_start, kz) 

926 # case I: no child avail -> return kz and remaining 

927 if self.child is None: 

928 return [kz, filtered] 

929 # case II: has child -> return current kz and all child results 

930 else: 

931 self.child.df = filtered 

932 kz_next = self.child.spectral_calc() 

933 return [kz] + kz_next 

934 

935 @staticmethod 

936 def subtract(minuend, subtrahend): 

937 try: # pandas implementation 

938 return minuend.sub(subtrahend, axis=0) 

939 except AttributeError: # general implementation 

940 return minuend - subtrahend 

941 

942 def run(self): 

943 return self.spectral_calc() 

944 

945 def transfer_function(self): 

946 m = self.wl[0] 

947 k = self.itr[0] 

948 omega = np.linspace(0.00001, 0.15, 5000) 

949 return omega, (np.sin(m * np.pi * omega) / (m * np.sin(np.pi * omega))) ** (2 * k) 

950 

951 def omega_null(self, alpha=0.5): 

952 a = np.sqrt(6) / np.pi 

953 b = 1 / (2 * np.array(self.itr)) 

954 c = 1 - alpha ** b 

955 d = np.array(self.wl) ** 2 - alpha ** b 

956 return a * np.sqrt(c / d) 

957 

958 def period_null(self, alpha=0.5): 

959 return 1. / self.omega_null(alpha) 

960 

961 def period_null_days(self, alpha=0.5): 

962 return self.period_null(alpha) / 24. 

963 

964 def plot_transfer_function(self, fig=None, name=None): 

965 if fig is None: 

966 fig = plt.figure() 

967 omega, transfer_function = self.transfer_function() 

968 if self.child is not None: 

969 transfer_function_child = self.child.plot_transfer_function(fig) 

970 else: 

971 transfer_function_child = transfer_function * 0 

972 plt.semilogx(omega, transfer_function - transfer_function_child, 

973 label="m={:3.0f}, k={:3.0f}, T={:6.2f}d".format(self.wl[0], 

974 self.itr[0], 

975 self.period_null_days())) 

976 plt.axvline(x=self.omega_null()) 

977 if not self._isChild: 

978 locs, labels = plt.xticks() 

979 plt.xticks(locs, np.round(1. / (locs * 24), 1)) 

980 plt.xlim([0.00001, 0.15]) 

981 plt.legend() 

982 if name is None: 

983 plt.show() 

984 else: 

985 plt.savefig(name) 

986 else: 

987 return transfer_function 

988 

989 
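A small numeric check of the transfer-function helpers above (editorial example; m=13 and k=3 are arbitrary): at omega_null the squared transfer function is close to the chosen alpha.

kz = KolmogorovZurbenkoBaseClass(None, wl=13, itr=3)
omega0 = kz.omega_null(alpha=0.5)[0]
m, k = kz.wl[0], kz.itr[0]
attenuation = (np.sin(m * np.pi * omega0) / (m * np.sin(np.pi * omega0))) ** (2 * k)  # roughly 0.5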

990class KolmogorovZurbenkoFilterMovingWindow(KolmogorovZurbenkoBaseClass): 

991 

992 def __init__(self, df, wl: Union[list, int], itr: Union[list, int], is_child=False, filter_dim="window", 

993 method="mean", percentile=0.5): 

994 """ 

995 It creates the variables associated with the KolmogorovZurbenkoFilterMovingWindow class.

996 

997 Args: 

998 df(pd.DataFrame, xr.DataArray): time series of a variable 

999 wl: window length 

1000 itr: number of iterations

1001 """ 

1002 self.valid_methods = ["mean", "percentile", "median", "max", "min"] 

1003 if method not in self.valid_methods: 

1004 raise ValueError("Method '{}' is not supported. Please select from [{}].".format( 

1005 method, ", ".join(self.valid_methods))) 

1006 else: 

1007 self.method = method 

1008 if percentile > 1 or percentile < 0: 

1009 raise ValueError("Percentile must be in range [0, 1]. Given was {}!".format(percentile)) 

1010 else: 

1011 self.percentile = percentile 

1012 super().__init__(df, wl, itr, is_child, filter_dim) 

1013 

1014 def set_child(self): 

1015 if len(self.wl) > 1: 

1016 return KolmogorovZurbenkoFilterMovingWindow(self.df, self.wl[1:], self.itr[1:], is_child=True, 

1017 filter_dim=self.filter_dim, method=self.method, 

1018 percentile=self.percentile) 

1019 else: 

1020 return None 

1021 

1022 @TimeTrackingWrapper 

1023 def kz_filter_new(self, df, wl, itr): 

1024 """ 

1025 It low-pass filters the time series and returns the low-frequency component.

1026 

1027 If filter method is from mean, max, min this method will call construct and rechunk before the actual 

1028 calculation to improve performance. If filter method is either median or percentile this approach is not 

1029 applicable and depending on the data and window size, this method can become slow. 

1030 

1031 Args: 

1032 wl(int): a window length 

1033 itr(int): number of iterations

1034 """ 

1035 warnings.filterwarnings("ignore") 

1036 df_itr = df.__deepcopy__() 

1037 try: 

1038 kwargs = {"min_periods": int(0.7 * wl), 

1039 "center": True, 

1040 self.filter_dim: wl} 

1041 for i in np.arange(0, itr): 

1042 logging.debug(f"kz_filter_new iteration {i}")

1043 rolling = df_itr.chunk().rolling(**kwargs) 

1044 if self.method not in ["percentile", "median"]: 

1045 rolling = rolling.construct("construct").chunk("auto") 

1046 if self.method == "median": 

1047 df_mv_avg_tmp = rolling.median() 

1048 elif self.method == "percentile": 

1049 df_mv_avg_tmp = rolling.quantile(self.percentile) 

1050 elif self.method == "max": 

1051 df_mv_avg_tmp = rolling.max("construct") 

1052 elif self.method == "min": 

1053 df_mv_avg_tmp = rolling.min("construct") 

1054 else: 

1055 df_mv_avg_tmp = rolling.mean("construct") 

1056 df_itr = df_mv_avg_tmp.compute() 

1057 del df_mv_avg_tmp, rolling 

1058 gc.collect() 

1059 return df_itr 

1060 except ValueError: 

1061 raise

1062 

1063 @TimeTrackingWrapper 

1064 def kz_filter(self, df, wl, itr): 

1065 """ 

1066 It low-pass filters the time series and returns the low-frequency component.

1067 

1068 Args: 

1069 wl(int): a window length 

1070 itr(int): number of iterations

1071 """ 

1072 import warnings 

1073 warnings.filterwarnings("ignore") 

1074 df_itr = df.__deepcopy__() 

1075 try: 

1076 kwargs = {"min_periods": int(0.7 * wl), 

1077 "center": True, 

1078 self.filter_dim: wl} 

1079 iter_vars = df_itr.coords["variables"].values 

1080 for var in iter_vars: 

1081 df_itr_var = df_itr.sel(variables=[var]) 

1082 for _ in np.arange(0, itr): 

1083 df_itr_var = df_itr_var.chunk() 

1084 rolling = df_itr_var.rolling(**kwargs) 

1085 if self.method == "median": 

1086 df_mv_avg_tmp = rolling.median() 

1087 elif self.method == "percentile": 

1088 df_mv_avg_tmp = rolling.quantile(self.percentile) 

1089 elif self.method == "max": 

1090 df_mv_avg_tmp = rolling.max() 

1091 elif self.method == "min": 

1092 df_mv_avg_tmp = rolling.min() 

1093 else: 

1094 df_mv_avg_tmp = rolling.mean() 

1095 df_itr_var = df_mv_avg_tmp.compute() 

1096 df_itr.loc[{"variables": [var]}] = df_itr_var 

1097 return df_itr 

1098 except ValueError: 

1099 raise

1100 

1101 

1102def firwin_kzf(m: int, k: int) -> np.array: 

1103 """Calculate weights of window for Kolmogorov Zurbenko filter.""" 

1104 m, k = int(m), int(k) 

1105 coef = np.ones(m) 

1106 for i in range(1, k): 

1107 t = np.zeros((m, m + i * (m - 1))) 

1108 for km in range(m): 

1109 t[km, km:km + coef.size] = coef 

1110 coef = np.sum(t, axis=0) 

1111 return coef / (m ** k) 

1112 
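A quick check of the weights (editorial example): k-fold convolution of a uniform window of length m yields k * (m - 1) + 1 coefficients that sum to one.

h_kz = firwin_kzf(11, 3)
assert np.isclose(h_kz.sum(), 1.0)
assert len(h_kz) == 3 * (11 - 1) + 1  # 31 coefficients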

1113 

1114def omega_null_kzf(m: int, k: int, alpha: float = 0.5) -> float: 

1115 a = np.sqrt(6) / np.pi 

1116 b = 1 / (2 * np.array(k)) 

1117 c = 1 - alpha ** b 

1118 d = np.array(m) ** 2 - alpha ** b 

1119 return a * np.sqrt(c / d) 

1120 
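For orientation (editorial example, approximate numbers): with m=13 and k=3 on hourly data the cut-off frequency is about 0.02 cycles per time step, i.e. a period of roughly 50 hours.

omega0 = omega_null_kzf(13, 3)     # ~0.02 cycles per time step
period_days = 1. / omega0 / 24.    # ~2 days for hourly data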

1121 

1122def filter_width_kzf(m: int, k: int) -> int: 

1123 """Returns window width of the Kolmogorov Zurbenko filter."""

1124 return k * (m - 1) + 1
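filter_width_kzf mirrors the kernel length produced by firwin_kzf; this is how _next_order translates an (m, k) tuple into a width when window == "kzf" (editorial example):

assert filter_width_kzf(11, 3) == len(firwin_kzf(11, 3)) == 31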