Coverage for mlair/helpers/filter.py: 45%
574 statements
« prev ^ index » next coverage.py v6.4.2, created at 2023-06-30 10:40 +0000
1import gc
2import warnings
3from typing import Union, Callable, Tuple, Dict, Any
4import logging
6import datetime
7import numpy as np
8import pandas as pd
9from matplotlib import pyplot as plt
10from scipy import signal, stats
11import xarray as xr
12import dask.array as da
14from mlair.helpers import to_list, TimeTrackingWrapper, TimeTracking
class FIRFilter:
    """
    Apply a cascade of FIR (finite impulse response) low-pass filters along a temporal dimension.

    One filter is applied per entry of ``order``/``cutoff``; each subsequent filter works on the residuum
    (input minus filtered signal) of the previous step, and the final residuum is appended as last component.
    Results are available through :attr:`filtered_data`, the filter coefficients through
    :attr:`filter_coefficients`. Note that the whole computation runs already inside ``__init__``.
    """
    from mlair.plotting.data_insight_plotting import PlotFirFilter

    def __init__(self, data, fs, order, cutoff, window, var_dim, time_dim, display_name=None, minimum_length=None,
                 extend_end=0, plot_path=None, plot_dates=None, offset=0):
        """
        :param data: data to filter, must contain dimensions var_dim and time_dim
        :param fs: sampling frequency in 1/days (1 for daily, 24 for hourly resolution)
        :param order: sequence with the order of each filter, same ordering as cutoff
        :param cutoff: sequence with the cutoff frequencies (each applied as low-pass)
        :param window: window type of the filter (e.g. "hamming") or "kzf" for Kolmogorov-Zurbenko
        :param var_dim: name of variables dimension
        :param time_dim: name of time dimension to apply filter along
        :param display_name: name that is used in logging messages (default None)
        :param minimum_length: only used for sizing the visualization slices (default None)
        :param extend_end: only used for sizing the visualization slices (default 0)
        :param plot_path: if given, a PlotFirFilter plot is created at this location (default None)
        :param plot_dates: dates to use for visualization; if None, up to 4 dates are chosen automatically
        :param offset: stored for subclasses / plotting, not used by this class's filter logic (default 0)
        """
        self._filtered = []
        self._h = []
        self.data = data
        self.fs = fs
        self.order = order
        self.cutoff = cutoff
        self.window = window
        self.var_dim = var_dim
        self.time_dim = time_dim
        self.display_name = display_name
        self.minimum_length = minimum_length
        self.extend_end = extend_end
        self.offset = offset
        self.plot_path = plot_path
        self.plot_dates = plot_dates
        # the entire filter cascade is executed on construction
        self.run()

    def run(self):
        """Execute all filter iterations and store components and coefficients on private attributes."""
        logging.info(f"{self.display_name}: start {self.__class__.__name__}")
        filtered = []
        h = []
        input_data = self.data.__deepcopy__()

        # collect some data for visualization
        if self.plot_dates is None:
            # pick up to four sample positions spread over the first years of the time series
            plot_pos = np.array([0.25, 1.5, 2.75, 4]) * 365 * self.fs
            self.plot_dates = [input_data.isel({self.time_dim: int(pos)}).coords[self.time_dim].values
                               for pos in plot_pos if pos < len(input_data.coords[self.time_dim])]
        plot_data = []

        for i in range(len(self.order)):
            # apply filter
            fi, hi = self.fir_filter(input_data, self.fs, self.cutoff[i], self.order[i], time_dim=self.time_dim,
                                     var_dim=self.var_dim, window=self.window, display_name=self.display_name)
            filtered.append(fi)
            h.append(hi)

            # visualization
            plot_data.append(self.create_visualization(fi, input_data, self.plot_dates, self.time_dim, self.fs, hi,
                                                       self.minimum_length, self.order, i, self.extend_end, self.var_dim))
            # calculate residuum
            input_data = input_data - fi

        # add last residuum to filtered
        filtered.append(input_data)

        self._filtered = filtered
        self._h = h

        # visualize
        if self.plot_path is not None:
            try:
                self.PlotFirFilter(self.plot_path, plot_data, self.display_name)  # not working when t0 != 0
            except Exception as e:
                # plotting is best effort only and must never abort the filter run
                logging.info(f"Could not plot climate fir filter due to following reason:\n{e}")

    def create_visualization(self, filtered, filter_input_data, plot_dates, time_dim, sampling,
                             h, minimum_length, order, i, extend_end, var_dim):  # pragma: no cover
        """
        Collect slices of filter input and filtered data around each plot date for later plotting.

        Returns a list of dicts, one per plot date that is present in the filtered data.
        """
        plot_data = []
        minimum_length = minimum_length or 0
        for viz_date in set(plot_dates).intersection(filtered.coords[time_dim].values):
            try:
                if i < len(order) - 1:
                    # NOTE(review): this accumulates across the loop, so later viz_dates use a larger
                    # minimum_length than earlier ones — confirm this is intended
                    minimum_length += order[i+1]
                td_type = {1: "D", 24: "h"}.get(sampling)
                length = len(h)
                # half the filter width on each side of t0 defines the shown interval
                extend_length_history = minimum_length + int((length + 1) / 2)
                extend_length_future = int((length + 1) / 2) + 1
                t_minus = viz_date + np.timedelta64(int(-extend_length_history), td_type)
                t_plus = viz_date + np.timedelta64(int(extend_length_future + extend_end), td_type)
                time_slice = slice(t_minus, t_plus - np.timedelta64(1, td_type))
                plot_data.append({"t0": viz_date, "filter_input": filter_input_data.sel({time_dim: time_slice}),
                                  "filtered": filtered.sel({time_dim: time_slice}), "h": h, "time_dim": time_dim,
                                  "var_dim": var_dim})
            except:  # best effort: silently skip dates that cannot be sliced
                pass
        return plot_data

    @property
    def filter_coefficients(self):
        # list of coefficient arrays, one entry per filter iteration
        return self._h

    @property
    def filtered_data(self):
        # list of filtered components; the last entry is the remaining residuum
        return self._filtered

    @TimeTrackingWrapper
    def fir_filter(self, data, fs, cutoff_high, order, sampling="1d", time_dim="datetime", var_dim="variables", window: Union[str, Tuple] = "hamming",
                   minimum_length=None, new_dim="window", plot_dates=None, display_name=None):
        """
        Apply a single FIR low-pass filter along time_dim, separately for each variable.

        :param data: data to filter, must contain dimensions time_dim and var_dim
        :param fs: sampling frequency of time series
        :param cutoff_high: cutoff frequency of the low-pass filter
        :param order: order of the filter (or kzf parameters when window == "kzf")
        :param window: window type of the filter
        :param display_name: name used in logging (default None)
        :returns: tuple of (filtered data broadcast to the input's shape with nans at the trimmed edges,
            filter coefficients h)

        NOTE(review): the parameters sampling, minimum_length, new_dim and plot_dates are currently unused here.
        """
        # calculate FIR filter coefficients
        h = self._calculate_filter_coefficients(window, order, cutoff_high, fs)

        coll = []
        for var in data.coords[var_dim]:
            d = data.sel({var_dim: var})
            # convolve each 1-d series along time_dim with the coefficients
            filt = xr.apply_ufunc(fir_filter_convolve, d,
                                  input_core_dims=[[time_dim]], output_core_dims=[[time_dim]],
                                  vectorize=True, kwargs={"h": h}, output_dtypes=[d.dtype])
            # trim data to valid range
            ext_len = int((len(h) + 1) / 2)
            valid_range = filt.coords[time_dim].values[ext_len:-ext_len]
            trimmed = filt.sel(**{time_dim: valid_range})
            coll.append(trimmed)
        filtered = xr.concat(coll, var_dim)

        # create result array with same shape like input data, gaps are filled by nans
        filtered = self._create_full_filter_result_array(data, filtered, time_dim, display_name)
        return filtered, h

    @staticmethod
    def _calculate_filter_coefficients(window: Union[str, tuple], order: Union[int, tuple], cutoff_high: float,
                                       fs: float) -> np.array:
        """
        Calculate filter coefficients for moving window using scipy's signal package for common filter types and local
        method firwin_kzf for Kolmogorov Zurbenko filter (kzf). The filter is a low-pass filter.

        :param window: name of the window type which is either a string with the window's name or a tuple containing the
            name but also some parameters (e.g. `("kaiser", 5)`)
        :param order: order of the filter to create as int or parameters m and k of kzf
        :param cutoff_high: cutoff frequency to use for low-pass filter in frequency of fs
        :param fs: sampling frequency of time series
        """
        if window == "kzf":
            h = firwin_kzf(*order)
        else:
            h = signal.firwin(order, cutoff_high, pass_zero="lowpass", fs=fs, window=window)
        return h

    @staticmethod
    def _create_full_filter_result_array(template_array: xr.DataArray, result_array: xr.DataArray, new_dim: str,
                                         display_name: str = None) -> xr.DataArray:
        """
        Create result filter array with same shape like given template data (should be the original input data before
        filtering the data). All gaps are filled by nans.

        :param template_array: this array is used as template for shape and ordering of dims
        :param result_array: array with data that are filled into template
        :param new_dim: new dimension which is shifted/appended to/at the end (if present or not); note that
            fir_filter passes time_dim here, in which case the dim already exists and only broadcasting applies
        :param display_name: string that is attached to logging (default None)
        """
        logging.debug(f"{display_name}: create res_full")
        new_coords = {**{k: template_array.coords[k].values for k in template_array.coords if k != new_dim},
                      new_dim: result_array.coords[new_dim]}
        dims = [*template_array.dims, new_dim] if new_dim not in template_array.dims else template_array.dims
        result_array = result_array.transpose(*dims)
        # broadcasting against an empty template fills all positions missing in result_array with nan
        return result_array.broadcast_like(xr.DataArray(dims=dims, coords=new_coords))
172class ClimateFIRFilter(FIRFilter):
173 from mlair.plotting.data_insight_plotting import PlotClimateFirFilter
    def __init__(self, data, fs, order, cutoff, window, time_dim, var_dim, apriori=None, apriori_type=None,
                 apriori_diurnal=False, sel_opts=None, plot_path=None,
                 minimum_length=None, new_dim=None, display_name=None, extend_length_opts: int = 0,
                 extend_end: Union[dict, int] = 0, plot_dates=None, offset: int = 0):
        """
        :param data: data to filter
        :param fs: sampling frequency in 1/days -> 1d: fs=1 -> 1H: fs=24
        :param order: a tuple with the order of the filter in same ordering like cutoff
        :param cutoff: a tuple with the cutoff frequencies (all are applied as low pass)
        :param window: window type of the filter (e.g. hamming)
        :param time_dim: name of time dimension to apply filter along
        :param var_dim: name of variables dimension
        :param apriori: apriori information to use for the first low pass. If None, climatology is calculated on the
            provided data.
        :param apriori_type: type of apriori information to use. Climatology will be used always for first low pass. For
            the residuum either the value zero is used (apriori_type is None or "zeros") or a climatology on the
            residua is used ("residuum_stats").
        :param apriori_diurnal: Use diurnal cycle as additional apriori information (only applicable for hourly
            resoluted data). The mean anomaly of each hour is added to the apriori_type information.
        :param sel_opts: specify some parameters to select a subset of data before calculating the apriori information.
            Use this parameter for example, if apriori shall only calculated on a shorter time period than available in
            given data.
        :param plot_path: if given, a PlotClimateFirFilter plot is stored at this location (default None)
        :param minimum_length: minimum number of time steps (ti<t0) each filter component must provide (default None)
        :param new_dim: name of the dimension that holds the filter window positions; if None, a temporary unused
            name is created. If this dimension is already present in data, data is interpreted as forecasts.
        :param display_name: name that is used in logging messages (default None)
        :param extend_length_opts: shift information switch between historical data and apriori estimation by the given
            values (default None). Must either be a dictionary with keys available in var_dim or a single value that is
            applied to all data. This parameter has only influence on which information is available at t0 for the
            filter calculation but has no influence on the shape of the returned filtered data.
        :param extend_end: This parameter indicates the number of time steps with ti>t0 to return of the filtered data.
            In case the extend_end parameter is larger than the extend_length_opts parameter, this leads to the case
            that not only observational data but also climatological estimations are returned. Must either be a
            dictionary with keys available in var_dim or a single value that is applied to all data. Default is 0.
        :param plot_dates: dates to use for visualization; if None, dates are chosen automatically
        :param offset: Shift t0 by this timestep delta. Default is 0.
        """
        self._apriori = apriori
        self.apriori_type = apriori_type
        self.apriori_diurnal = apriori_diurnal
        self._apriori_list = []
        self.sel_opts = sel_opts
        self.new_dim = new_dim
        self.plot_data = []
        self.extend_length_opts = extend_length_opts
        # the parent constructor triggers self.run(), i.e. the full filter cascade is executed here
        super().__init__(data, fs, order, cutoff, window, var_dim, time_dim, display_name=display_name,
                         minimum_length=minimum_length, plot_path=plot_path, extend_end=extend_end, plot_dates=plot_dates, offset=offset)
    def run(self):
        """
        Execute the climate FIR filter cascade.

        For each filter iteration an apriori estimate (climatology and optionally diurnal anomalies) is prepared,
        the climatological filter is applied via clim_filter, and the residuum becomes the input of the next
        iteration. Components, coefficients and apriori arrays are stored on private attributes.
        """
        filtered = []
        h = []
        forecasts = None
        if self.sel_opts is not None:
            # normalize a plain selection value (e.g. "2006") into a dict keyed by the time dimension
            self.sel_opts = self.sel_opts if isinstance(self.sel_opts, dict) else {self.time_dim: self.sel_opts}
            self._check_sel_opts()
        sampling = {1: "1d", 24: "1H"}.get(int(self.fs))

        if self.new_dim in self.data.coords:
            # data already has the window dimension -> interpret it as forecasts and collapse it into a
            # continuous pseudo time series for the filter input
            pseudo_timeseries = self.create_pseudo_timeseries(self.data, self.time_dim, sampling, self.new_dim)
            forecasts = self.data.__deepcopy__()
            self.data = pseudo_timeseries

        logging.debug(f"{self.display_name}: create diurnal_anomalies")
        if self.apriori_diurnal is True and sampling == "1H":
            diurnal_anomalies = self.create_seasonal_hourly_mean(self.data, self.time_dim, sel_opts=self.sel_opts,
                                                                 sampling=sampling, as_anomaly=True)
        else:
            diurnal_anomalies = 0
        logging.debug(f"{self.display_name}: create monthly apriori")
        if self._apriori is None:
            self._apriori = self.create_monthly_mean(self.data, self.time_dim, sel_opts=self.sel_opts,
                                                     sampling=sampling) + diurnal_anomalies
            logging.debug(f"{self.display_name}: apriori shape = {self._apriori.shape}")
        apriori_list = to_list(self._apriori)
        input_data = self.data.__deepcopy__()

        # for visualization
        plot_dates = self.plot_dates

        # create tmp dimension to apply filter, search for unused name
        new_dim = self._create_tmp_dimension(input_data) if self.new_dim is None else self.new_dim

        for i in range(len(self.order)):
            logging.info(f"{self.display_name}: start filter for order {self.order[i]}")
            # calculate climatological filter
            next_order = self._next_order(self.order, 0, i, self.window)
            fi, input_data, hi, apriori, plot_data = self.clim_filter(input_data, self.fs, self.cutoff[i], self.order[i],
                                                                      apriori=apriori_list[i], sel_opts=self.sel_opts,
                                                                      sampling=sampling, time_dim=self.time_dim,
                                                                      window=self.window, var_dim=self.var_dim,
                                                                      minimum_length=self.minimum_length, new_dim=new_dim,
                                                                      plot_dates=plot_dates, display_name=self.display_name,
                                                                      extend_opts=self.extend_length_opts,
                                                                      extend_end=self.extend_end, next_order=next_order,
                                                                      forecasts=forecasts, offset=self.offset)

            logging.info(f"{self.display_name}: finished clim_filter calculation")
            # keep only the requested window range [extend_end - minimum_length, extend_end] of the component
            if self.minimum_length is None:
                filtered.append(fi.sel({new_dim: slice(None, self.extend_end)}))
            else:
                filtered.append(fi.sel({new_dim: slice(self.extend_end - self.minimum_length, self.extend_end)}))
            h.append(hi)
            gc.collect()
            forecasts = None  # reset/disable forecasts after first filter iteration
            self.plot_data.append(plot_data)
            # reuse only those dates that produced plot data in this iteration
            plot_dates = {e["t0"] for e in plot_data}

            # calculate residuum
            logging.info(f"{self.display_name}: calculate residuum")
            coord_range = range(int(fi.coords[new_dim].values.min()), int(fi.coords[new_dim].values.max() + 1))
            if new_dim in input_data.coords:
                input_data = input_data.sel({new_dim: coord_range}) - fi
            else:
                input_data = self._shift_data(input_data, coord_range, self.time_dim, new_dim) - fi

            # create new apriori information for next iteration if no further apriori is provided
            if len(apriori_list) < len(self.order):
                logging.info(f"{self.display_name}: create diurnal_anomalies")
                if self.apriori_diurnal is True and sampling == "1H":
                    # derive diurnal anomalies from the residuum at window position 0 (i.e. at t0)
                    diurnal_anomalies = self.create_seasonal_hourly_mean(input_data.sel({new_dim: 0}, drop=True),
                                                                         self.time_dim, sel_opts=self.sel_opts,
                                                                         sampling=sampling, as_anomaly=True)
                else:
                    diurnal_anomalies = 0
                logging.info(f"{self.display_name}: create monthly apriori")
                if self.apriori_type is None or self.apriori_type == "zeros":  # zero version
                    apriori_list.append(xr.zeros_like(apriori_list[i]) + diurnal_anomalies)
                elif self.apriori_type == "residuum_stats":  # calculate monthly statistic on residuum
                    apriori_list.append(
                        -self.create_monthly_mean(input_data.sel({new_dim: 0}, drop=True), self.time_dim,
                                                  sel_opts=self.sel_opts, sampling=sampling) + diurnal_anomalies)
                else:
                    raise ValueError(f"Cannot handle unkown apriori type: {self.apriori_type}. Please choose from None,"
                                     f" `zeros` or `residuum_stats`.")

        # add last residuum to filtered
        if self.minimum_length is None:
            filtered.append(input_data.sel({new_dim: slice(None, self.extend_end)}))
        else:
            filtered.append(input_data.sel({new_dim: slice(self.extend_end - self.minimum_length, self.extend_end)}))

        self._filtered = filtered
        self._h = h
        self._apriori_list = apriori_list

        # visualize
        if self.plot_path is not None:
            try:
                self.PlotClimateFirFilter(self.plot_path, self.plot_data, sampling, self.display_name)
            except Exception as e:
                # plotting is best effort only and must never abort the filter run
                logging.info(f"Could not plot climate fir filter due to following reason:\n{e}")
322 def _check_sel_opts(self):
323 if len(self.data.sel(**self.sel_opts).coords[self.time_dim]) == 0:
324 raise ValueError(f"Abort {self.__class__.__name__} as no data is available after applying sel_opts to data")
326 @staticmethod
327 def _next_order(order: list, minimum_length: Union[int, None], pos: int, window: Union[str, tuple]) -> int:
328 next_order = 0
329 if pos + 1 < len(order):
330 next_order = order[pos + 1]
331 if window == "kzf" and isinstance(next_order, tuple):
332 next_order = filter_width_kzf(*next_order)
333 if minimum_length is not None:
334 next_order = next_order + minimum_length
335 return next_order
    @staticmethod
    def create_monthly_unity_array(data: xr.DataArray, time_dim: str, extend_range: int = 366) -> xr.DataArray:
        """
        Create a xarray data array filled with ones with monthly resolution (set on 16th of month). Data is extended by
        extend_range days in future and past along time_dim.

        :param data: data to create monthly unity array from, must contain dimension time_dim
        :param time_dim: name of temporal dimension
        :param extend_range: number of days to extend data (default 366)
        :returns: xarray in monthly resolution (centered at 16th day of month) with all values equal to 1
        """
        coords = data.coords

        # extend time_dim by given extend_range days
        start = coords[time_dim][0].values.astype("datetime64[D]") - np.timedelta64(extend_range, "D")
        end = coords[time_dim][-1].values.astype("datetime64[D]") + np.timedelta64(extend_range, "D")
        new_time_axis = np.arange(start, end).astype("datetime64[ns]")

        # construct data array with updated coords
        new_coords = {k: data.coords[k].values if k != time_dim else new_time_axis for k in coords}
        new_array = xr.DataArray(1, coords=new_coords, dims=new_coords.keys()).transpose(*data.dims)

        # loffset is required because resampling uses last day in month as resampling timestamp
        # NOTE(review): `loffset` is deprecated in newer pandas/xarray releases and "1m" relies on pandas
        # interpreting lowercase "m" as month-end frequency — verify against the pinned library versions
        return new_array.resample({time_dim: "1m"}, loffset=datetime.timedelta(days=-15)).max()
    def create_monthly_mean(self, data: xr.DataArray, time_dim: str, sel_opts: dict = None, sampling: str = "1d") \
            -> xr.DataArray:
        """
        Calculate monthly means (12 values) and return a data array with same resolution as given data containing these
        monthly mean values. Sampling points are the 16th of each month (this value is equal to the true monthly mean)
        and all other values between two points are interpolated linearly. It is possible to apply some pre-selection
        to use only a subset of given data using the sel_opts parameter. Only data from this subset are used to
        calculate the monthly statistic.

        :param data: data to apply statistical calculation on
        :param time_dim: name of temporal axis
        :param sel_opts: selection options as dict to select a subset of data (default None). A given sel_opts with
            `sel_opts={<time_dim>: "2006"}` forces the method e.g. to derive the monthly means only from data of the
            year 2006.
        :param sampling: sampling of the returned data (default 1d)
        :returns: array in desired resolution containing interpolated monthly values. Months with no valid data are
            returned as np.nan which also effects data in the neighbouring months (before / after sampling points which
            are the 16th of each month).
        """
        # create unity xarray in monthly resolution with sampling point in mid of each month
        monthly = self.create_monthly_unity_array(data, time_dim) * np.nan

        # apply selection if given (only use subset for monthly means)
        if sel_opts is not None:
            data = data.sel(**sel_opts)

        # create monthly mean and replace entries in unity array
        monthly_mean = data.groupby(f"{time_dim}.month").mean()
        for month in monthly_mean.month.values:
            # overwrite the nan placeholder for all time steps that fall into this month
            monthly = xr.where((monthly[f"{time_dim}.month"] == month),
                               monthly_mean.sel(month=month, drop=True),
                               monthly)
        # transform monthly information into original sampling rate
        return monthly.dropna(dim=time_dim).resample({time_dim: sampling}).interpolate()
398 @staticmethod
399 def _compute_hourly_mean_per_month(data: xr.DataArray, time_dim: str, as_anomaly: bool) -> Dict[int, xr.DataArray]:
400 """
401 Calculate for each hour in each month a separate mean value (12 x 24 values in total). Average is either the
402 anomaly of a monthly mean state or the raw mean value.
404 :param data: data to calculate averages on
405 :param time_dim: name of temporal dimension
406 :param as_anomaly: indicates whether to calculate means as anomaly of a monthly mean or as raw mean values.
407 :returns: dictionary containing 12 months each with a 24-valued array (1 entry for each hour)
408 """
409 seasonal_hourly_means = {}
410 for month in data.groupby(f"{time_dim}.month").groups.keys():
411 single_month_data = data.sel({time_dim: (data[f"{time_dim}.month"] == month)})
412 hourly_mean = single_month_data.groupby(f"{time_dim}.hour").mean()
413 if as_anomaly is True:
414 hourly_mean = hourly_mean - hourly_mean.mean("hour")
415 seasonal_hourly_means[month] = hourly_mean
416 return seasonal_hourly_means
418 @staticmethod
419 def _create_seasonal_cycle_of_single_hour_mean(result_arr: xr.DataArray, means: Dict[int, xr.DataArray], hour: int,
420 time_dim: str, sampling: str) -> xr.DataArray:
421 """
422 Use monthly means of a given hour to create an array with interpolated values at the indicated hour for each day
423 of the full time span indicated by given result_arr.
425 :param result_arr: template array indicating the full time range and additional dimensions to keep
426 :param means: dictionary containing 24 hourly averages for each month (12 x 24 values in total)
427 :param hour: integer of hour of interest
428 :param time_dim: name of temporal dimension
429 :param sampling: sampling rate to interpolate
430 :returns: array with interpolated averages in sampling resolution containing only values for hour of interest
431 """
432 h_coll = xr.ones_like(result_arr) * np.nan
433 for month in means.keys():
434 hourly_mean_single_month = means[month].sel(hour=hour, drop=True)
435 h_coll = xr.where((h_coll[f"{time_dim}.month"] == month), hourly_mean_single_month, h_coll)
436 h_coll = h_coll.dropna(time_dim).resample({time_dim: sampling}).interpolate()
437 h_coll = h_coll.sel({time_dim: (h_coll[f"{time_dim}.hour"] == hour)})
438 return h_coll
    def create_seasonal_hourly_mean(self, data: xr.DataArray, time_dim: str, sel_opts: Dict[str, Any] = None,
                                    sampling: str = "1H", as_anomaly: bool = True) -> xr.DataArray:
        """
        Compute climatological statistics on hourly base either as raw data or anomalies. For each month, an overall
        mean value (only used if requiring anomalies) and the mean of each hour are calculated. The climatological
        diurnal cycle is positioned on the 16th of each month and interpolated in between by using a distinct
        interpolation for each hour of day. The returned array therefore contains data with a yearly cycle (if anomaly
        is not calculated) or data without a yearly cycle (if using anomalies). In both cases, the data have an
        amplitude that varies over the year.

        :param data: data to apply this method to
        :param time_dim: name of temporal axis
        :param sel_opts: specific selection options that are applied before calculation of climatological statistics
            (default None)
        :param sampling: temporal resolution of data (default "1H")
        :param as_anomaly: specify whether to use anomalies or raw data including a seasonal cycle of the mean value
            (default: True)
        :returns: climatological statistics for given data interpolated with given sampling rate
        """
        # NOTE(review): the string literal below is a leftover secondary docstring and a no-op — consider removing
        """Calculate hourly statistics. Either the absolute value or the anomaly (as_anomaly=True)."""
        # can only be used for hourly sampling rate
        assert sampling == "1H"

        # apply selection if given (only use subset for seasonal hourly means)
        if sel_opts is not None:
            data = data.sel(**sel_opts)

        # create unity xarray in monthly resolution with sampling point in mid of each month
        monthly = self.create_monthly_unity_array(data, time_dim) * np.nan

        # calculate for each hour in each month a separate mean value
        seasonal_hourly_means = self._compute_hourly_mean_per_month(data, time_dim, as_anomaly)

        # create seasonal cycles of these hourly averages
        seasonal_coll = []
        for hour in data.groupby(f"{time_dim}.hour").groups.keys():
            mean_single_hour = self._create_seasonal_cycle_of_single_hour_mean(monthly, seasonal_hourly_means, hour,
                                                                               time_dim, sampling)
            seasonal_coll.append(mean_single_hour)

        # combine all cycles in a common data array
        hourly = xr.concat(seasonal_coll, time_dim).sortby(time_dim).resample({time_dim: sampling}).interpolate()
        return hourly
    @staticmethod
    def extend_apriori(data: xr.DataArray, apriori: xr.DataArray, time_dim: str, sampling: str = "1d",
                       display_name: str = None) -> xr.DataArray:
        """
        Extend time range of apriori information to span a longer period as data (or at least of equal length). This
        method may not working properly if length of apriori contains data from less then one year.

        :param data: data to get time range of which apriori should span in minimum
        :param apriori: data that is adjusted. It is assumed that this data varies in the course of the year but is same
            for the same day in different years. Otherwise this method will introduce some unintended artefacts in the
            apriori data.
        :param time_dim: name of temporal dimension
        :param sampling: sampling of data (e.g. "1m", "1d", default "1d")
        :param display_name: name to use for logging message (default None)
        :returns: array which adjusted temporal coverage derived from apriori
        """
        dates = data.coords[time_dim].values
        td_type = {"1d": "D", "1H": "h"}.get(sampling)

        # apriori starts after data
        if dates[0] < apriori.coords[time_dim].values[0]:
            logging.debug(f"{display_name}: apriori starts after data")

            # add difference in full years
            # NOTE(review): the whole extension logic steps in 365-day "years"; leap days are not handled explicitly
            date_diff = abs(dates[0] - apriori.coords[time_dim].values[0]).astype("timedelta64[D]")
            extend_range = np.ceil(date_diff / (np.timedelta64(1, "D") * 365)).astype(int) * 365
            factor = 1 if td_type == "D" else 24

            # get fill data range (slightly more than one year taken from the start of apriori)
            start = apriori.coords[time_dim][0].values.astype("datetime64[%s]" % td_type)
            end = apriori.coords[time_dim][0].values.astype("datetime64[%s]" % td_type) + np.timedelta64(
                366 * factor + 1, td_type)

            # fill year by year
            for i in range(365, extend_range + 365, 365):
                apriori_tmp = apriori.sel({time_dim: slice(start, end)})  # hint: slice includes end date
                # shift the one-year template i days into the past and merge it in (existing values win)
                new_time_axis = apriori_tmp.coords[time_dim] - np.timedelta64(i * factor, td_type)
                apriori_tmp.coords[time_dim] = new_time_axis
                apriori = apriori.combine_first(apriori_tmp)

        # apriori ends before data
        if dates[-1] + np.timedelta64(365, "D") > apriori.coords[time_dim].values[-1]:
            logging.debug(f"{display_name}: apriori ends before data")

            # add difference in full years + 1 year (because apriori is used as future estimate)
            date_diff = abs(dates[-1] - apriori.coords[time_dim].values[-1]).astype("timedelta64[D]")
            extend_range = np.ceil(date_diff / (np.timedelta64(1, "D") * 365)).astype(int) * 365 + 365
            factor = 1 if td_type == "D" else 24

            # get fill data range (slightly more than one year taken from the end of apriori)
            start = apriori.coords[time_dim][-1].values.astype("datetime64[%s]" % td_type) - np.timedelta64(
                366 * factor + 1, td_type)
            end = apriori.coords[time_dim][-1].values.astype("datetime64[%s]" % td_type)

            # fill year by year
            for i in range(365, extend_range + 365, 365):
                apriori_tmp = apriori.sel({time_dim: slice(start, end)})  # hint: slice includes end date
                # shift the one-year template i days into the future and merge it in (existing values win)
                new_time_axis = apriori_tmp.coords[time_dim] + np.timedelta64(i * factor, td_type)
                apriori_tmp.coords[time_dim] = new_time_axis
                apriori = apriori.combine_first(apriori_tmp)

        return apriori
548 @staticmethod
549 def get_forecast_run_delta(data, time_dim):
550 time_data = data.coords[time_dim].values
551 deltas = (time_data[1:] - time_data[:-1]) / np.timedelta64(1, "h")
552 return stats.mode(deltas).mode[0]
    def combine_observation_and_apriori(self, data: xr.DataArray, apriori: xr.DataArray, time_dim: str, new_dim: str,
                                        extend_length_history: int, extend_length_future: int,
                                        extend_length_separator: int = 0, forecasts: xr.DataArray = None,
                                        sampling: str = "1H", extend_end: int = 0, offset: int = 0) -> xr.DataArray:
        """
        Combine historical data / observations ("data") and climatological statistics ("apriori"). Historical data are
        used on interval [t0 - extend_length_history, t0] and apriori is used on [t0 + 1, t0 + extend_length_future]. If
        indicated by the extend_length_seperator, it is possible to shift end of history interval and start of apriori
        interval by given number of time steps.

        :param data: historical data for past values, must contain dimensions time_dim and var_dim and might also have
            a new_dim dimension
        :param apriori: climatological estimate for future values, must contain dimensions time_dim and var_dim, but
            can also have dimension new_dim
        :param time_dim: name of temporal dimension
        :param new_dim: name of new dim on which data is combined along
        :param extend_length_history: number of time steps to use from data
        :param extend_length_future: number of time steps to use from apriori (minus 1)
        :param extend_length_separator: position of last history value to use (default 0), this position indicates the
            last value that is used from data (followed by values from apriori). In other words, end of history
            interval and start of apriori interval are shifted by this value from t0 (positive or negative).
        :param forecasts: optional forecast data with lead-time dimension new_dim that is merged into the history
            part before the apriori is attached (default None)
        :param sampling: temporal resolution ("1d" or "1H"), used to convert forecast lead times into time deltas
        :param extend_end: number of time steps with ti>t0 that limit the history/separator interval (default 0)
        :param offset: shift of t0 in time steps, applied to the separator position (default 0)
        :returns: combined data array
        """
        # prepare historical data / observation
        if forecasts is None:
            ext_sep = offset + min(extend_end, extend_length_future, extend_length_separator)
        else:
            ext_sep = min(offset, extend_length_future)
        if new_dim not in data.coords:
            history = self._shift_data(data, range(int(-extend_length_history), ext_sep + 1),
                                       time_dim, new_dim)
        else:
            history = data.sel({new_dim: slice(int(-extend_length_history), ext_sep)})

        if forecasts is not None:
            # fill the gap between observation and apriori with forecast data where available
            forecast_delta = self.get_forecast_run_delta(forecasts, time_dim)
            forecast_end = offset + min(extend_end, extend_length_separator)
            for lead_time in forecasts.coords[new_dim]:
                # realign each lead time so its values land on the correct window position relative to t0
                delta = np.timedelta64(int(lead_time - offset), {"1d": "D", "1H": "h"}.get(sampling))
                forecasts_tmp = forecasts.sel({new_dim: slice(None, forecast_end + lead_time)})
                forecasts_tmp.coords[time_dim] = forecasts_tmp.coords[time_dim] + delta
                forecasts_tmp.coords[new_dim] = forecasts_tmp.coords[new_dim] + offset - lead_time
                history = history.combine_first(forecasts_tmp)
                # history.plot()
                if lead_time >= forecast_delta - 1:  # stop when all gaps are filled
                    break

        if extend_length_future > ext_sep + 1:
            # prepare climatological statistics
            if new_dim not in apriori.coords:
                future = self._shift_data(apriori, range(ext_sep + 1,
                                                         offset + extend_length_future + 1),
                                          time_dim, new_dim)
            else:
                future = apriori.sel({new_dim: slice(ext_sep + 1, offset + extend_length_future)})
            # combine historical data [t0-length,t0+sep] and climatological statistics [t0+sep+1,t0+length]
            filter_input_data = history.combine_first(future)
            # filter_input_data = xr.concat([history.dropna(time_dim), future], dim=new_dim, join="left")
            return filter_input_data.dropna(time_dim)
        else:
            return history
616 @staticmethod
617 def create_full_time_dim(data, dim, freq):
618 """Ensure time dimension to be equidistant. Sometimes dates if missing values have been dropped."""
619 start = data.coords[dim].values[0]
620 end = data.coords[dim].values[-1]
621 datetime_index = pd.DataFrame(index=pd.date_range(start, end, freq=freq))
622 t = data.sel({dim: start}, drop=True)
623 res = xr.DataArray(coords=[datetime_index.index, *[t.coords[c] for c in t.coords]], dims=[dim, *t.coords])
624 res = res.transpose(*data.dims)
625 res.loc[data.coords] = data
626 return res
628 def create_pseudo_timeseries(self, data, time_dim, sampling, window_dim):
629 empty_data = data.sel({window_dim: 0}, drop=True) * np.nan
630 data_full_time_index = self.create_full_time_dim(empty_data, time_dim, sampling)
631 for lead_time in data.coords[window_dim]:
632 data_tmp = data.sel({window_dim: lead_time}, drop=True)
633 delta = np.timedelta64(int(lead_time), {"1d": "D", "1H": "h"}.get(sampling))
634 data_tmp.coords[time_dim] = data_tmp.coords[time_dim] + delta
635 data_full_time_index = data_full_time_index.combine_first(data_tmp)
636 if data_full_time_index.isnull().sum() == 0: # stop when all gaps are filled
637 break
638 return data_full_time_index
640 def create_visualization(self, filtered, data, filter_input_data, plot_dates, time_dim, new_dim, sampling,
641 extend_length_history, extend_length_future, minimum_length, h,
642 variable_name, extend_length_opts=None, extend_end=None, offset=None, forecast=None): # pragma: no cover
644 plot_data = []
645 extend_end = 0 if extend_end is None else extend_end
646 extend_length_opts = 0 if extend_length_opts is None else extend_length_opts
647 offset = 0 if offset is None else offset
648 for t0 in set(plot_dates).intersection(filtered.coords[time_dim].values):
649 try:
650 td_type = {"1d": "D", "1H": "h"}.get(sampling)
651 t_minus = t0 + np.timedelta64(int(-extend_length_history), td_type)
652 t_plus = t0 + np.timedelta64(int(extend_length_future + 1), td_type)
653 if new_dim not in data.coords:
654 tmp_filter_data = self._shift_data(data.sel({time_dim: slice(t_minus, t_plus)}),
655 range(int(-extend_length_history),
656 int(extend_length_future + 1)),
657 time_dim,
658 new_dim).sel({time_dim: t0})
659 else:
660 tmp_filter_data = None
661 filter_input = filter_input_data.sel({time_dim: t0, new_dim: slice(None, extend_length_future)})
662 valid_start = int(filtered[new_dim].min()) + int((len(h) + 1) / 2)
663 valid_end = min(extend_length_opts + extend_end + 1, int(filtered[new_dim].max()) - int((len(h) + 1) / 2))
664 valid_range = range(valid_start, valid_end)
665 # if forecast is not None:
666 # forecast_deltas = (t0 - forecast.coords[time_dim]) / np.timedelta64(1, "h") + 12
667 # minimal_forecast_delta = int(forecast_deltas[forecast_deltas >= 0][-1])
668 # init_time = forecast.coords[time_dim][forecast_deltas >= 0][-1]
669 # forecast_end = min(extend_end, extend_length_opts)
670 # f = forecast.sel({time_dim: init_time, new_dim: slice(None, forecast_end)})
671 # f.coords[time_dim] = t0
672 # f.coords[new_dim] = f.coords[new_dim] - minimal_forecast_delta + offset
673 # else:
674 # f = None
675 # correct all data for offset
676 def correct_data_for_offset(d):
677 d = d.__deepcopy__()
678 d.coords[time_dim] = d.coords[time_dim] + np.timedelta64(int(offset), td_type)
679 d.coords[new_dim] = d.coords[new_dim] - offset
680 return d
681 plot_data.append({"t0": t0 + np.timedelta64(int(offset), td_type),
682 "var": variable_name,
683 "filter_input": correct_data_for_offset(filter_input),
684 "filter_input_nc": correct_data_for_offset(tmp_filter_data),
685 "valid_range": range(valid_range.start - offset, valid_range.stop - offset),
686 # "forecast": f,
687 "time_range": data.sel(
688 {time_dim: slice(t_minus, t_plus - np.timedelta64(1, td_type))}).coords[
689 time_dim].values,
690 "h": h,
691 "new_dim": new_dim})
692 except:
693 pass
694 return plot_data
696 @staticmethod
697 def _get_year_interval(data: xr.DataArray, time_dim: str) -> Tuple[int, int]:
698 """
699 Get year of start and end date of given data.
701 :param data: data to extract dates from
702 :param time_dim: name of temporal axis
703 :returns: two-element tuple with start and end
704 """
705 start = pd.to_datetime(data.coords[time_dim].min().values).year
706 end = pd.to_datetime(data.coords[time_dim].max().values).year
707 return start, end
709 @staticmethod
710 def _calculate_filter_coefficients(window: Union[str, tuple], order: Union[int, tuple], cutoff_high: float,
711 fs: float) -> np.array:
712 """
713 Calculate filter coefficients for moving window using scipy's signal package for common filter types and local
714 method firwin_kzf for Kolmogorov Zurbenko filter (kzf). The filter is a low-pass filter.
716 :param window: name of the window type which is either a string with the window's name or a tuple containing the
717 name but also some parameters (e.g. `("kaiser", 5)`)
718 :param order: order of the filter to create as int or parameters m and k of kzf
719 :param cutoff_high: cutoff frequency to use for low-pass filter in frequency of fs
720 :param fs: sampling frequency of time series
721 """
722 if window == "kzf":
723 h = firwin_kzf(*order)
724 else:
725 h = signal.firwin(order, cutoff_high, pass_zero="lowpass", fs=fs, window=window)
726 return h
728 @staticmethod
729 def _trim_data_to_minimum_length(data: xr.DataArray, extend_length_history: int, dim: str,
730 extend_length_future: int = 0, offset: int = 0) -> xr.DataArray:
731 """
732 Trim data along given axis between either -minimum_length (if given) or -extend_length_history and
733 extend_length_opts (which is default set to 0).
735 :param data: data to trim
736 :param extend_length_history: start number for trim range, only used if parameter minimum_length is not provided
737 :param dim: dim to apply trim on
738 :param extend_length_future: number to use in "future"
739 :returns: trimmed data
740 """
741 return data.sel({dim: slice(extend_length_history + offset, extend_length_future + offset)}, drop=True)
743 @staticmethod
744 def _create_full_filter_result_array(template_array: xr.DataArray, result_array: xr.DataArray, new_dim: str,
745 display_name: str = None) -> xr.DataArray:
746 """
747 Create result filter array with same shape line given template data (should be the original input data before
748 filtering the data). All gaps are filled by nans.
750 :param template_array: this array is used as template for shape and ordering of dims
751 :param result_array: array with data that are filled into template
752 :param new_dim: new dimension which is shifted/appended to/at the end (if present or not)
753 :param display_name: string that is attached to logging (default None)
754 """
755 logging.debug(f"{display_name}: create res_full")
756 new_coords = {**{k: template_array.coords[k].values for k in template_array.coords if k != new_dim},
757 new_dim: result_array.coords[new_dim]}
758 dims = [*template_array.dims, new_dim] if new_dim not in template_array.dims else template_array.dims
759 result_array = result_array.transpose(*dims)
760 return result_array.broadcast_like(xr.DataArray(dims=dims, coords=new_coords))
    @TimeTrackingWrapper
    def clim_filter(self, data, fs, cutoff_high, order, apriori=None, sel_opts=None,
                    sampling="1d", time_dim="datetime", var_dim="variables", window: Union[str, Tuple] = "hamming",
                    minimum_length=0, next_order=0, new_dim="window", plot_dates=None, display_name=None,
                    extend_opts: int = 0, extend_end: int = 0, forecasts=None, offset: int = 0):
        """
        Apply a climatological FIR filter to data.

        For each variable and each year, observations are combined with apriori (climatological)
        statistics to extend the series beyond t0, the FIR filter is applied by convolution along
        the new_dim axis, and results are trimmed to the valid range. Finally all pieces are
        concatenated and broadcast back to the full shape of the input data.

        :param data: input data to filter
        :param fs: sampling frequency of the time series
        :param cutoff_high: low-pass cutoff frequency
        :param order: filter order (or (m, k) parameters if window is "kzf")
        :param apriori: climatological statistics; computed from data via create_monthly_mean if None
        :param sel_opts: options forwarded to create_monthly_mean when apriori is computed here
        :param sampling: temporal resolution, "1d" or "1H"
        :param time_dim: name of the temporal dimension
        :param var_dim: name of the variables dimension
        :param window: window type for the FIR filter coefficients
        :param minimum_length: minimal history length required after filtering
        :param next_order: order of the subsequent filter level (influences the required data extent)
        :param new_dim: name of the dimension created by shifting data (default "window")
        :param plot_dates: t0 dates to collect visualization data for; auto-selected if None
        :param display_name: name used in log messages
        :param extend_opts: separator position between observation and apriori data
        :param extend_end: number of future time steps included from observations
        :param forecasts: optional forecast data used when combining observation and apriori
        :param offset: offset applied when combining and visualizing data
        :returns: tuple of (filtered data, filter input data, filter coefficients, apriori, plot data)
        """
        logging.debug(f"{display_name}: extend apriori")

        # calculate apriori information from data if not given and extend its range if not sufficient long enough
        if apriori is None:
            apriori = self.create_monthly_mean(data, time_dim, sel_opts=sel_opts, sampling=sampling)
        apriori = apriori.astype(data.dtype)
        apriori = self.extend_apriori(data, apriori, time_dim, sampling, display_name=display_name)

        # calculate FIR filter coefficients
        h = self._calculate_filter_coefficients(window, order, cutoff_high, fs)
        length = len(h)

        # set data extend that is required for filtering
        # half of this filter's width plus half of the next level's width, shifted by extend_end
        extend_length_history = minimum_length + int((next_order + 1) / 2) + int((length + 1) / 2) - extend_end
        extend_length_future = extend_end + int((next_order + 1) / 2) + int((length + 1) / 2)

        # collect some data for visualization
        plot_pos = np.array([0.25, 1.5, 2.75, 4]) * 365 * fs
        if plot_dates is None:
            plot_dates = [data.isel({time_dim: int(pos)}).coords[time_dim].values for pos in plot_pos if
                          pos < len(data.coords[time_dim])]
        plot_data = []

        coll = []
        coll_input = []

        for var in reversed(data.coords[var_dim].values):
            logging.info(f"{display_name} ({var}): sel data")

            _start, _end = self._get_year_interval(data, time_dim)
            filt_coll = []
            filt_input_coll = []
            for _year in range(_start, _end + 1):
                logging.debug(f"{display_name} ({var}): year={_year}")

                # select observations and apriori data (with margin around the year for the filter width)
                time_slice = self._create_time_range_extend(
                    _year, sampling, max(extend_length_history, extend_length_future))
                d = data.sel({var_dim: [var], time_dim: time_slice})
                a = apriori.sel({var_dim: [var], time_dim: time_slice})
                f = forecasts.sel({var_dim: [var]}) if forecasts is not None else None
                if len(d.coords[time_dim]) == 0:  # no data at all for this year
                    continue

                # combine historical data / observation [t0-length,t0] and climatological statistics [t0+1,t0+length]
                filter_input_data = self.combine_observation_and_apriori(d, a, time_dim, new_dim, extend_length_history,
                                                                         extend_length_future,
                                                                         extend_length_separator=extend_opts,
                                                                         forecasts=f, sampling=sampling,
                                                                         extend_end=extend_end, offset=offset)

                # select only data for current year
                try:
                    filter_input_data = filter_input_data.sel({time_dim: str(_year)})
                except KeyError:  # no valid data for this year
                    continue
                if len(filter_input_data.coords[time_dim]) == 0:  # no valid data for this year
                    continue

                # apply filter
                logging.debug(f"{display_name} ({var}): start filter convolve")
                with TimeTracking(name=f"{display_name} ({var}): filter convolve", logging_level=logging.DEBUG):
                    filt = xr.apply_ufunc(fir_filter_convolve, filter_input_data,
                                          input_core_dims=[[new_dim]], output_core_dims=[[new_dim]],
                                          vectorize=True, kwargs={"h": h}, output_dtypes=[d.dtype])

                # trim data if required: drop the half filter width at both ends of the response
                valid_range_start = int(filt.coords[new_dim].min() + (length + 1) / 2)
                valid_range_end = int(filt.coords[new_dim].max() - (length + 1) / 2) + 1
                ext_len = min(extend_length_future, valid_range_end)
                trimmed = self._trim_data_to_minimum_length(filt, valid_range_start, new_dim,
                                                            extend_length_future=ext_len)
                filt_coll.append(trimmed)
                trimmed = self._trim_data_to_minimum_length(filter_input_data, valid_range_start, new_dim,
                                                            extend_length_future=ext_len)
                filt_input_coll.append(trimmed)

                # visualization
                plot_data.extend(self.create_visualization(filt, d, filter_input_data, plot_dates, time_dim, new_dim,
                                                           sampling, extend_length_history, extend_length_future,
                                                           minimum_length, h, var, extend_opts, extend_end, offset, f))

            # collect all filter results
            coll.append(xr.concat(filt_coll, time_dim))
            coll_input.append(xr.concat(filt_input_coll, time_dim))
            gc.collect()

        # concat all variables
        logging.debug(f"{display_name}: concat all variables")
        res = xr.concat(coll, var_dim)
        res_input = xr.concat(coll_input, var_dim)

        # create result array with same shape like input data, gaps are filled by nans
        res_full = self._create_full_filter_result_array(data, res, new_dim, display_name)
        res_input_full = self._create_full_filter_result_array(data, res_input, new_dim, display_name)
        return res_full, res_input_full, h, apriori, plot_data
863 @staticmethod
864 def _create_time_range_extend(year: int, sampling: str, extend_length: int) -> slice:
865 """
866 Create a slice object for given year plus extend_length in sampling resolution.
868 :param year: year to create time range for
869 :param sampling: sampling of time range
870 :param extend_length: number of time steps to extend out of given year
871 :returns: slice object with time range
872 """
873 td_type = {"1d": "D", "1H": "h"}.get(sampling)
874 delta = np.timedelta64(extend_length + 1, td_type)
875 start = np.datetime64(f"{year}-01-01") - delta
876 end = np.datetime64(f"{year}-12-31") + delta
877 return slice(start, end)
879 @staticmethod
880 def _create_tmp_dimension(data: xr.DataArray) -> str:
881 """
882 Create a tmp dimension with name 'window' preferably. If name is already part of one dimensions, tmp dimension
883 name is multiplied by itself until not present in dims. Method will raise ValueError after 10 tries.
885 :param data: data array to create a new tmp dimension for with unique name
886 :returns: valid name for a tmp dimension (preferably 'window')
887 """
888 new_dim = "window"
889 count = 0
890 while new_dim in data.dims:
891 new_dim += new_dim
892 count += 1
893 if count > 10:
894 raise ValueError("Could not create new dimension.")
895 return new_dim
897 def _shift_data(self, data: xr.DataArray, index_value: range, time_dim: str, new_dim: str) -> xr.DataArray:
898 """
899 Shift data multiple times to create history or future along dimension new_dim for each time step.
901 :param data: data set to shift
902 :param index_value: range of integers to span history and/or future
903 :param time_dim: name of temporal dimension that should be shifted
904 :param new_dim: name of dimension create by data shift
905 :return: shifted data
906 """
907 coll = []
908 for i in index_value:
909 coll.append(data.shift({time_dim: -i}))
910 new_ind = self.create_index_array(new_dim, index_value)
911 return xr.concat(coll, dim=new_ind)
913 @staticmethod
914 def create_index_array(index_name: str, index_value: range):
915 """
916 Create index array from a range object to use as index of a data array.
918 :param index_name: name of the index dimension
919 :param index_value: range of values to use as indexes
920 :returns: index array for given range of values
921 """
922 ind = pd.DataFrame({'val': index_value}, index=index_value)
923 tmp_dim = index_name + "tmp"
924 res = xr.Dataset.from_dataframe(ind).to_array(tmp_dim).rename({'index': index_name})
925 res = res.squeeze(dim=tmp_dim, drop=True)
926 res.name = index_name
927 return res
    @property
    def apriori_data(self):
        # List of apriori data used by the filter.
        # NOTE(review): `_apriori_list` is not assigned anywhere in this part of the file —
        # presumably set during filter application; confirm against the rest of the class.
        return self._apriori_list
    @property
    def initial_apriori_data(self):
        # First entry of apriori_data.
        return self.apriori_data[0]
def fir_filter(data, fs, order=5, cutoff_low=None, cutoff_high=None, window="hamming", dim="variables", h=None,
               causal=True, padlen=None):
    """Apply a FIR filter to each variable of an xarray object.

    Filter coefficients are computed from the given cutoff frequencies unless h is provided
    directly. With causal=True a one-sided lfilter is used, otherwise a zero-phase filtfilt.

    :returns: tuple of (filtered data, filter coefficients)
    """
    if h is None:
        cutoff = [c for c in (cutoff_low, cutoff_high) if c is not None]
        if len(cutoff) == 2:
            filter_type = "bandpass"
        elif cutoff_low is not None:
            filter_type = "highpass"
        elif cutoff_high is not None:
            filter_type = "lowpass"
        else:
            raise ValueError("Please provide either cutoff_low or cutoff_high.")
        h = signal.firwin(order, cutoff, pass_zero=filter_type, fs=fs, window=window)
    filtered = xr.ones_like(data)
    for var in data.coords[dim]:
        series = data.sel({dim: var}).values.flatten()
        if causal:
            response = signal.lfilter(h, 1., series)
        else:
            pad = 3 * len(h) if padlen is None else padlen
            response = signal.filtfilt(h, 1., series, padlen=pad)
        filtered.loc[{dim: var}] = response
    return filtered, h
def fir_filter_convolve(data, h):
    """Apply FIR filter h to data by direct same-length convolution, normalized by the coefficient sum."""
    normalization = sum(h)
    return signal.convolve(data, h, mode='same', method="direct") / normalization
class KolmogorovZurbenkoBaseClass:

    def __init__(self, df, wl, itr, is_child=False, filter_dim="window"):
        """
        Initialize a Kolmogorov-Zurbenko (KZ) filter cascade.

        Each (wl, itr) pair defines one filter component; components beyond the first are
        created recursively as child filters.

        Args:
            df(pd.DataFrame, None): time series of a variable
            wl(list of int): window lengths, one per filter component
            itr(list of int): number of iterations, one per filter component
            is_child(bool): internal flag, True for recursively created child filters
            filter_dim(str): name of the dimension to filter along
        """
        self.df = df
        self.filter_dim = filter_dim
        self.wl = to_list(wl)
        self.itr = to_list(itr)
        # each window length needs exactly one matching iteration count
        if abs(len(self.wl) - len(self.itr)) > 0:
            raise ValueError("Length of lists for wl and itr must agree!")
        self._isChild = is_child
        self.child = self.set_child()
        self.type = type(self).__name__

    def set_child(self):
        """Create a child filter from the remaining (wl, itr) pairs, or return None if exhausted."""
        if len(self.wl) > 1:
            return KolmogorovZurbenkoBaseClass(None, self.wl[1:], self.itr[1:], True, self.filter_dim)
        else:
            return None

    def kz_filter(self, df, m, k):
        """Apply the KZ filter; to be implemented by subclasses."""
        pass

    def spectral_calc(self):
        """Decompose the series into one low-pass component per filter stage plus the residuum."""
        df_start = self.df
        kz = self.kz_filter(df_start, self.wl[0], self.itr[0])
        filtered = self.subtract(df_start, kz)
        # case I: no child avail -> return kz and remaining
        if self.child is None:
            return [kz, filtered]
        # case II: has child -> return current kz and all child results
        else:
            self.child.df = filtered
            kz_next = self.child.spectral_calc()
            return [kz] + kz_next

    @staticmethod
    def subtract(minuend, subtrahend):
        """Return minuend - subtrahend, aligned on the index for pandas objects."""
        try:  # pandas implementation
            return minuend.sub(subtrahend, axis=0)
        except AttributeError:  # general implementation
            return minuend - subtrahend

    def run(self):
        """Run the filter cascade and return all spectral components."""
        return self.spectral_calc()

    def transfer_function(self):
        """Return (omega, transfer function) of the first filter component on a fixed frequency grid."""
        m = self.wl[0]
        k = self.itr[0]
        omega = np.linspace(0.00001, 0.15, 5000)
        return omega, (np.sin(m * np.pi * omega) / (m * np.sin(np.pi * omega))) ** (2 * k)

    def omega_null(self, alpha=0.5):
        """Return the cutoff frequency estimate for all components (energy drops to alpha)."""
        a = np.sqrt(6) / np.pi
        b = 1 / (2 * np.array(self.itr))
        c = 1 - alpha ** b
        d = np.array(self.wl) ** 2 - alpha ** b
        return a * np.sqrt(c / d)

    def period_null(self, alpha=0.5):
        """Return the cutoff period, i.e. the inverse of omega_null."""
        return 1. / self.omega_null(alpha)

    def period_null_days(self, alpha=0.5):
        """Return the cutoff period in days — presumably assumes hourly data (divides by 24); confirm."""
        return self.period_null(alpha) / 24.

    def plot_transfer_function(self, fig=None, name=None):
        """Plot the transfer functions of all filter components.

        Child filters return their transfer function instead of finishing the plot; the root
        filter adds labels/limits and shows or saves the figure depending on name.
        """
        if fig is None:
            fig = plt.figure()
        omega, transfer_function = self.transfer_function()
        if self.child is not None:
            transfer_function_child = self.child.plot_transfer_function(fig)
        else:
            transfer_function_child = transfer_function * 0
        plt.semilogx(omega, transfer_function - transfer_function_child,
                     label="m={:3.0f}, k={:3.0f}, T={:6.2f}d".format(self.wl[0],
                                                                     self.itr[0],
                                                                     self.period_null_days()))
        plt.axvline(x=self.omega_null())
        if not self._isChild:
            locs, labels = plt.xticks()
            plt.xticks(locs, np.round(1. / (locs * 24), 1))
            plt.xlim([0.00001, 0.15])
            plt.legend()
            if name is None:
                plt.show()
            else:
                plt.savefig(name)
        else:
            return transfer_function
class KolmogorovZurbenkoFilterMovingWindow(KolmogorovZurbenkoBaseClass):

    def __init__(self, df, wl: Union[list, int], itr: Union[list, int], is_child=False, filter_dim="window",
                 method="mean", percentile=0.5):
        """
        Initialize a Kolmogorov-Zurbenko filter based on a moving window statistic.

        Args:
            df(pd.DataFrame, xr.DataArray): time series of a variable
            wl: window length(s)
            itr: number(s) of iteration
            is_child: internal flag, True for recursively created child filters
            filter_dim: name of the dimension to filter along
            method: moving window statistic, one of mean, percentile, median, max, min
            percentile: percentile in [0, 1], only used when method is "percentile"
        """
        self.valid_methods = ["mean", "percentile", "median", "max", "min"]
        if method not in self.valid_methods:
            raise ValueError("Method '{}' is not supported. Please select from [{}].".format(
                method, ", ".join(self.valid_methods)))
        else:
            self.method = method
        if percentile > 1 or percentile < 0:
            raise ValueError("Percentile must be in range [0, 1]. Given was {}!".format(percentile))
        else:
            self.percentile = percentile
        super().__init__(df, wl, itr, is_child, filter_dim)

    def set_child(self):
        """Create a child filter of the same type for the remaining (wl, itr) pairs, or None if exhausted."""
        if len(self.wl) > 1:
            return KolmogorovZurbenkoFilterMovingWindow(self.df, self.wl[1:], self.itr[1:], is_child=True,
                                                        filter_dim=self.filter_dim, method=self.method,
                                                        percentile=self.percentile)
        else:
            return None

    @TimeTrackingWrapper
    def kz_filter_new(self, df, wl, itr):
        """
        Extract the low frequency component of the time series.

        If the filter method is one of mean, max, min, this method calls construct and rechunk
        before the actual calculation to improve performance. For median or percentile this
        approach is not applicable, and depending on the data and window size this method can
        become slow.

        Args:
            df: data to filter
            wl(int): window length
            itr(int): number of iterations
        """
        warnings.filterwarnings("ignore")
        df_itr = df.__deepcopy__()
        kwargs = {"min_periods": int(0.7 * wl),  # allow at most 30% missing values per window
                  "center": True,
                  self.filter_dim: wl}
        for i in np.arange(0, itr):
            logging.debug(f"kz_filter_new: iteration {i}")  # replaces a stray debug print(i)
            rolling = df_itr.chunk().rolling(**kwargs)
            if self.method not in ["percentile", "median"]:
                rolling = rolling.construct("construct").chunk("auto")
            if self.method == "median":
                df_mv_avg_tmp = rolling.median()
            elif self.method == "percentile":
                df_mv_avg_tmp = rolling.quantile(self.percentile)
            elif self.method == "max":
                df_mv_avg_tmp = rolling.max("construct")
            elif self.method == "min":
                df_mv_avg_tmp = rolling.min("construct")
            else:
                df_mv_avg_tmp = rolling.mean("construct")
            df_itr = df_mv_avg_tmp.compute()
            del df_mv_avg_tmp, rolling
            gc.collect()
        return df_itr

    @TimeTrackingWrapper
    def kz_filter(self, df, wl, itr):
        """
        Extract the low frequency component of the time series, filtering each variable separately.

        Args:
            df: data to filter (must provide a "variables" coordinate)
            wl(int): window length
            itr(int): number of iterations
        """
        # note: a redundant local `import warnings` was removed; the module imports it at top level
        warnings.filterwarnings("ignore")
        df_itr = df.__deepcopy__()
        kwargs = {"min_periods": int(0.7 * wl),  # allow at most 30% missing values per window
                  "center": True,
                  self.filter_dim: wl}
        iter_vars = df_itr.coords["variables"].values
        for var in iter_vars:
            df_itr_var = df_itr.sel(variables=[var])
            for _ in np.arange(0, itr):
                df_itr_var = df_itr_var.chunk()
                rolling = df_itr_var.rolling(**kwargs)
                if self.method == "median":
                    df_mv_avg_tmp = rolling.median()
                elif self.method == "percentile":
                    df_mv_avg_tmp = rolling.quantile(self.percentile)
                elif self.method == "max":
                    df_mv_avg_tmp = rolling.max()
                elif self.method == "min":
                    df_mv_avg_tmp = rolling.min()
                else:
                    df_mv_avg_tmp = rolling.mean()
                df_itr_var = df_mv_avg_tmp.compute()
            df_itr.loc[{"variables": [var]}] = df_itr_var
        return df_itr
def firwin_kzf(m: int, k: int) -> np.array:
    """Calculate the normalized window weights of the Kolmogorov-Zurbenko filter.

    The KZ filter of parameters (m, k) is k successive applications of an m-point moving
    average; its impulse response is the (k-1)-fold self-convolution of a boxcar of length m,
    normalized by m**k. The original implementation built shifted-row matrices and summed
    them, which is exactly this convolution — np.convolve is simpler and faster.

    :param m: window length of a single moving average pass
    :param k: number of filter iterations
    :returns: array of window weights with length k * (m - 1) + 1
    """
    m, k = int(m), int(k)
    box = np.ones(m)
    coef = box
    for _ in range(1, k):
        coef = np.convolve(coef, box)
    return coef / (m ** k)
def omega_null_kzf(m: int, k: int, alpha: float = 0.5) -> float:
    """Return the cutoff frequency estimate of a KZ filter with window length m and k iterations."""
    exponent = 1 / (2 * np.array(k))
    alpha_term = alpha ** exponent
    numerator = 1 - alpha_term
    denominator = np.array(m) ** 2 - alpha_term
    return np.sqrt(6) / np.pi * np.sqrt(numerator / denominator)
def filter_width_kzf(m: int, k: int) -> int:
    """Return the total window width of the Kolmogorov-Zurbenko filter with parameters (m, k)."""
    return (m - 1) * k + 1