# mlair/helpers/filter.py

import gc
import warnings
from typing import Union, Callable, Tuple, Dict, Any
import logging

import datetime
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from scipy import signal
import xarray as xr
import dask.array as da

from mlair.helpers import to_list, TimeTrackingWrapper, TimeTracking


class FIRFilter:
    from mlair.plotting.data_insight_plotting import PlotFirFilter

    def __init__(self, data, fs, order, cutoff, window, var_dim, time_dim, display_name=None, minimum_length=None,
                 offset=0, plot_path=None, plot_dates=None):
        self._filtered = []
        self._h = []
        self.data = data
        self.fs = fs
        self.order = order
        self.cutoff = cutoff
        self.window = window
        self.var_dim = var_dim
        self.time_dim = time_dim
        self.display_name = display_name
        self.minimum_length = minimum_length
        self.offset = offset
        self.plot_path = plot_path
        self.plot_dates = plot_dates
        self.run()

    def run(self):
        logging.info(f"{self.display_name}: start {self.__class__.__name__}")
        filtered = []
        h = []
        input_data = self.data.__deepcopy__()

        # collect some data for visualization
        if self.plot_dates is None:
            plot_pos = np.array([0.25, 1.5, 2.75, 4]) * 365 * self.fs
            self.plot_dates = [input_data.isel({self.time_dim: int(pos)}).coords[self.time_dim].values
                               for pos in plot_pos if pos < len(input_data.coords[self.time_dim])]
        plot_data = []

        for i in range(len(self.order)):
            # apply filter
            fi, hi = self.fir_filter(input_data, self.fs, self.cutoff[i], self.order[i], time_dim=self.time_dim,
                                     var_dim=self.var_dim, window=self.window, display_name=self.display_name)
            filtered.append(fi)
            h.append(hi)

            # visualization
            plot_data.append(self.create_visualization(fi, input_data, self.plot_dates, self.time_dim, self.fs, hi,
                                                       self.minimum_length, self.order, i, self.offset, self.var_dim))
            # calculate residuum
            input_data = input_data - fi

        # add last residuum to filtered
        filtered.append(input_data)

        self._filtered = filtered
        self._h = h

        # visualize
        if self.plot_path is not None:
            try:
                self.PlotFirFilter(self.plot_path, plot_data, self.display_name)  # not working when t0 != 0
            except Exception as e:
                logging.info(f"Could not plot fir filter due to the following reason:\n{e}")

    def create_visualization(self, filtered, filter_input_data, plot_dates, time_dim, sampling,
                             h, minimum_length, order, i, offset, var_dim):  # pragma: no cover
        plot_data = []
        minimum_length = minimum_length or 0
        for viz_date in set(plot_dates).intersection(filtered.coords[time_dim].values):
            try:
                if i < len(order) - 1:
                    minimum_length += order[i + 1]
                td_type = {1: "D", 24: "h"}.get(sampling)
                length = len(h)
                extend_length_history = minimum_length + int((length + 1) / 2)
                extend_length_future = int((length + 1) / 2) + 1
                t_minus = viz_date + np.timedelta64(int(-extend_length_history), td_type)
                t_plus = viz_date + np.timedelta64(int(extend_length_future + offset), td_type)
                time_slice = slice(t_minus, t_plus - np.timedelta64(1, td_type))
                plot_data.append({"t0": viz_date, "filter_input": filter_input_data.sel({time_dim: time_slice}),
                                  "filtered": filtered.sel({time_dim: time_slice}), "h": h, "time_dim": time_dim,
                                  "var_dim": var_dim})
            except Exception:
                pass
        return plot_data

    @property
    def filter_coefficients(self):
        return self._h

    @property
    def filtered_data(self):
        return self._filtered

    @TimeTrackingWrapper
    def fir_filter(self, data, fs, cutoff_high, order, sampling="1d", time_dim="datetime", var_dim="variables",
                   window: Union[str, Tuple] = "hamming", minimum_length=None, new_dim="window", plot_dates=None,
                   display_name=None):

        # calculate FIR filter coefficients
        h = self._calculate_filter_coefficients(window, order, cutoff_high, fs)

        coll = []
        for var in data.coords[var_dim]:
            d = data.sel({var_dim: var})
            filt = xr.apply_ufunc(fir_filter_convolve, d,
                                  input_core_dims=[[time_dim]], output_core_dims=[[time_dim]],
                                  vectorize=True, kwargs={"h": h}, output_dtypes=[d.dtype])
            # trim data to valid range
            ext_len = int((len(h) + 1) / 2)
            valid_range = filt.coords[time_dim].values[ext_len:-ext_len]
            trimmed = filt.sel(**{time_dim: valid_range})
            coll.append(trimmed)
        filtered = xr.concat(coll, var_dim)

        # create result array with same shape as input data; gaps are filled with nans
        filtered = self._create_full_filter_result_array(data, filtered, time_dim, display_name)
        return filtered, h

    @staticmethod
    def _calculate_filter_coefficients(window: Union[str, tuple], order: Union[int, tuple], cutoff_high: float,
                                       fs: float) -> np.array:
        """
        Calculate filter coefficients for a moving window using scipy's signal package for common filter types and
        the local method firwin_kzf for the Kolmogorov-Zurbenko filter (kzf). The filter is a low-pass filter.

        :param window: name of the window type, either a string with the window's name or a tuple containing the
            name and some parameters (e.g. `("kaiser", 5)`)
        :param order: order of the filter to create as int, or parameters m and k of kzf
        :param cutoff_high: cutoff frequency to use for the low-pass filter in frequency of fs
        :param fs: sampling frequency of time series
        """
        if window == "kzf":
            h = firwin_kzf(*order)
        else:
            h = signal.firwin(order, cutoff_high, pass_zero="lowpass", fs=fs, window=window)
        return h
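
    # A hedged illustration of the two branches above (values are assumptions, not
    # defaults of this module): window="hamming" with an integer order delegates to
    # scipy.signal.firwin, e.g. _calculate_filter_coefficients("hamming", 21, 0.05, 1)
    # returns 21 taps, while window="kzf" expects order to be the tuple (m, k) that
    # is unpacked into firwin_kzf, e.g. _calculate_filter_coefficients("kzf", (3, 2),
    # 0.05, 1) returns the triangular kernel [1, 2, 3, 2, 1] / 9 (cutoff_high is
    # ignored on this branch).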

    @staticmethod
    def _create_full_filter_result_array(template_array: xr.DataArray, result_array: xr.DataArray, new_dim: str,
                                         display_name: str = None) -> xr.DataArray:
        """
        Create result filter array with the same shape as the given template data (should be the original input data
        before filtering). All gaps are filled with nans.

        :param template_array: this array is used as template for shape and ordering of dims
        :param result_array: array with data that is filled into the template
        :param new_dim: new dimension which is shifted/appended to/at the end (if present or not)
        :param display_name: string that is attached to logging (default None)
        """
        logging.debug(f"{display_name}: create res_full")
        new_coords = {**{k: template_array.coords[k].values for k in template_array.coords if k != new_dim},
                      new_dim: result_array.coords[new_dim]}
        dims = [*template_array.dims, new_dim] if new_dim not in template_array.dims else template_array.dims
        result_array = result_array.transpose(*dims)
        return result_array.broadcast_like(xr.DataArray(dims=dims, coords=new_coords))
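

# A minimal usage sketch, not part of the original module: run a single-stage FIR
# decomposition on synthetic daily data. The variable name "o3" and all parameter
# values below are illustrative assumptions only.
def _demo_fir_filter():  # pragma: no cover
    time_index = pd.date_range("2010-01-01", periods=4 * 365, freq="D")
    data = xr.DataArray(np.random.randn(len(time_index), 1),
                        dims=("datetime", "variables"),
                        coords={"datetime": time_index, "variables": ["o3"]})
    # one filter stage: 21-day hamming window, cutoff at 0.05 cycles per day
    fir = FIRFilter(data, fs=1, order=[21], cutoff=[0.05], window="hamming",
                    var_dim="variables", time_dim="datetime")
    # filtered_data holds one low-pass component per stage plus the final residuum
    assert len(fir.filtered_data) == len(fir.filter_coefficients) + 1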


class ClimateFIRFilter(FIRFilter):
    from mlair.plotting.data_insight_plotting import PlotClimateFirFilter

    def __init__(self, data, fs, order, cutoff, window, time_dim, var_dim, apriori=None, apriori_type=None,
                 apriori_diurnal=False, sel_opts=None, plot_path=None,
                 minimum_length=None, new_dim=None, display_name=None, extend_length_opts: int = 0,
                 offset: Union[dict, int] = 0, plot_dates=None):
178 """
179 :param data: data to filter
180 :param fs: sampling frequency in 1/days -> 1d: fs=1 -> 1H: fs=24
181 :param order: a tuple with the order of the filter in same ordering like cutoff
182 :param cutoff: a tuple with the cutoff frequencies (all are applied as low pass)
183 :param window: window type of the filter (e.g. hamming)
184 :param time_dim: name of time dimension to apply filter along
185 :param var_dim: name of variables dimension
186 :param apriori: apriori information to use for the first low pass. If None, climatology is calculated on the
187 provided data.
188 :param apriori_type: type of apriori information to use. Climatology will be used always for first low pass. For
189 the residuum either the value zero is used (apriori_type is None or "zeros") or a climatology on the
190 residua is used ("residuum_stats").
191 :param apriori_diurnal: Use diurnal cycle as additional apriori information (only applicable for hourly
192 resoluted data). The mean anomaly of each hour is added to the apriori_type information.
193 :param sel_opts: specify some parameters to select a subset of data before calculating the apriori information.
194 Use this parameter for example, if apriori shall only calculated on a shorter time period than available in
195 given data.
196 :param extend_length_opts: shift information switch between historical data and apriori estimation by the given
197 values (default None). Must either be a dictionary with keys available in var_dim or a single value that is
198 applied to all data. This parameter has only influence on which information is available at t0 for the
199 filter calculcation but has no influence on the shape of the returned filtered data.
200 :param offset: This parameter indicates the number of time steps with ti>t0 to return of the filtered data. In
201 case the offset parameter is larger than the extend_lenght_opts parameter, this leads to the case that not
202 only observational data but also climatological estimations are returned. Must either be a dictionary with
203 keys available in var_dim or a single value that is applied to all data. Default is 0.
204 """
        self._apriori = apriori
        self.apriori_type = apriori_type
        self.apriori_diurnal = apriori_diurnal
        self._apriori_list = []
        self.sel_opts = sel_opts
        self.new_dim = new_dim
        self.plot_data = []
        self.extend_length_opts = extend_length_opts
        super().__init__(data, fs, order, cutoff, window, var_dim, time_dim, display_name=display_name,
                         minimum_length=minimum_length, plot_path=plot_path, offset=offset, plot_dates=plot_dates)

    def run(self):
        filtered = []
        h = []
        if self.sel_opts is not None:
            self.sel_opts = self.sel_opts if isinstance(self.sel_opts, dict) else {self.time_dim: self.sel_opts}
            self._check_sel_opts()
        sampling = {1: "1d", 24: "1H"}.get(int(self.fs))
        logging.debug(f"{self.display_name}: create diurnal_anomalies")
        if self.apriori_diurnal is True and sampling == "1H":
            diurnal_anomalies = self.create_seasonal_hourly_mean(self.data, self.time_dim, sel_opts=self.sel_opts,
                                                                 sampling=sampling, as_anomaly=True)
        else:
            diurnal_anomalies = 0
        logging.debug(f"{self.display_name}: create monthly apriori")
        if self._apriori is None:
            self._apriori = self.create_monthly_mean(self.data, self.time_dim, sel_opts=self.sel_opts,
                                                     sampling=sampling) + diurnal_anomalies
            logging.debug(f"{self.display_name}: apriori shape = {self._apriori.shape}")
        apriori_list = to_list(self._apriori)
        input_data = self.data.__deepcopy__()

        # for visualization
        plot_dates = self.plot_dates

        # create tmp dimension to apply filter, search for unused name
        new_dim = self._create_tmp_dimension(input_data) if self.new_dim is None else self.new_dim

        for i in range(len(self.order)):
            logging.info(f"{self.display_name}: start filter for order {self.order[i]}")
            # calculate climatological filter
            next_order = self._next_order(self.order, 0, i, self.window)
            fi, input_data, hi, apriori, plot_data = self.clim_filter(input_data, self.fs, self.cutoff[i],
                                                                      self.order[i], apriori=apriori_list[i],
                                                                      sel_opts=self.sel_opts, sampling=sampling,
                                                                      time_dim=self.time_dim, window=self.window,
                                                                      var_dim=self.var_dim,
                                                                      minimum_length=self.minimum_length,
                                                                      new_dim=new_dim, plot_dates=plot_dates,
                                                                      display_name=self.display_name,
                                                                      extend_opts=self.extend_length_opts,
                                                                      offset=self.offset, next_order=next_order)

            logging.info(f"{self.display_name}: finished clim_filter calculation")
            if self.minimum_length is None:
                filtered.append(fi.sel({new_dim: slice(None, self.offset)}))
            else:
                filtered.append(fi.sel({new_dim: slice(self.offset - self.minimum_length, self.offset)}))
            h.append(hi)
            gc.collect()
            self.plot_data.append(plot_data)
            plot_dates = {e["t0"] for e in plot_data}

            # calculate residuum
            logging.info(f"{self.display_name}: calculate residuum")
            coord_range = range(fi.coords[new_dim].values.min(), fi.coords[new_dim].values.max() + 1)
            if new_dim in input_data.coords:
                input_data = input_data.sel({new_dim: coord_range}) - fi
            else:
                input_data = self._shift_data(input_data, coord_range, self.time_dim, new_dim) - fi

            # create new apriori information for next iteration if no further apriori is provided
            if len(apriori_list) < len(self.order):
                logging.info(f"{self.display_name}: create diurnal_anomalies")
                if self.apriori_diurnal is True and sampling == "1H":
                    diurnal_anomalies = self.create_seasonal_hourly_mean(input_data.sel({new_dim: 0}, drop=True),
                                                                         self.time_dim, sel_opts=self.sel_opts,
                                                                         sampling=sampling, as_anomaly=True)
                else:
                    diurnal_anomalies = 0
                logging.info(f"{self.display_name}: create monthly apriori")
                if self.apriori_type is None or self.apriori_type == "zeros":  # zero version
                    apriori_list.append(xr.zeros_like(apriori_list[i]) + diurnal_anomalies)
                elif self.apriori_type == "residuum_stats":  # calculate monthly statistic on residuum
                    apriori_list.append(
                        -self.create_monthly_mean(input_data.sel({new_dim: 0}, drop=True), self.time_dim,
                                                  sel_opts=self.sel_opts, sampling=sampling) + diurnal_anomalies)
                else:
                    raise ValueError(f"Cannot handle unknown apriori type: {self.apriori_type}. Please choose from"
                                     f" None, `zeros` or `residuum_stats`.")

        # add last residuum to filtered
        if self.minimum_length is None:
            filtered.append(input_data.sel({new_dim: slice(None, self.offset)}))
        else:
            filtered.append(input_data.sel({new_dim: slice(self.offset - self.minimum_length, self.offset)}))

        self._filtered = filtered
        self._h = h
        self._apriori_list = apriori_list

        # visualize
        if self.plot_path is not None:
            try:
                self.PlotClimateFirFilter(self.plot_path, self.plot_data, sampling, self.display_name)
            except Exception as e:
                logging.info(f"Could not plot climate fir filter due to the following reason:\n{e}")

    def _check_sel_opts(self):
        if len(self.data.sel(**self.sel_opts).coords[self.time_dim]) == 0:
            raise ValueError(f"Abort {self.__class__.__name__} as no data is available after applying sel_opts to data")

    @staticmethod
    def _next_order(order: list, minimum_length: Union[int, None], pos: int, window: Union[str, tuple]) -> int:
        next_order = 0
        if pos + 1 < len(order):
            next_order = order[pos + 1]
            if window == "kzf" and isinstance(next_order, tuple):
                next_order = filter_width_kzf(*next_order)
        if minimum_length is not None:
            next_order = next_order + minimum_length
        return next_order

    @staticmethod
    def create_monthly_unity_array(data: xr.DataArray, time_dim: str, extend_range: int = 366) -> xr.DataArray:
        """
        Create an xarray data array filled with ones in monthly resolution (set on the 16th of each month). Data is
        extended by extend_range days in future and past along time_dim.

        :param data: data to create monthly unity array from, must contain dimension time_dim
        :param time_dim: name of temporal dimension
        :param extend_range: number of days to extend data (default 366)
        :returns: xarray in monthly resolution (centered at 16th day of month) with all values equal to 1
        """
        coords = data.coords

        # extend time_dim by given extend_range days
        start = coords[time_dim][0].values.astype("datetime64[D]") - np.timedelta64(extend_range, "D")
        end = coords[time_dim][-1].values.astype("datetime64[D]") + np.timedelta64(extend_range, "D")
        new_time_axis = np.arange(start, end).astype("datetime64[ns]")

        # construct data array with updated coords
        new_coords = {k: data.coords[k].values if k != time_dim else new_time_axis for k in coords}
        new_array = xr.DataArray(1, coords=new_coords, dims=new_coords.keys()).transpose(*data.dims)

        # loffset is required because resampling uses the last day in month as resampling timestamp
        return new_array.resample({time_dim: "1m"}, loffset=datetime.timedelta(days=-15)).max()

    def create_monthly_mean(self, data: xr.DataArray, time_dim: str, sel_opts: dict = None, sampling: str = "1d") \
            -> xr.DataArray:
        """
        Calculate monthly means (12 values) and return a data array with the same resolution as the given data
        containing these monthly mean values. Sampling points are the 16th of each month (this value is equal to the
        true monthly mean) and all other values between two points are interpolated linearly. It is possible to apply
        some pre-selection to use only a subset of the given data using the sel_opts parameter. Only data from this
        subset are used to calculate the monthly statistic.

        :param data: data to apply statistical calculation on
        :param time_dim: name of temporal axis
        :param sel_opts: selection options as dict to select a subset of data (default None). A given sel_opts with
            `sel_opts={<time_dim>: "2006"}` forces the method e.g. to derive the monthly means only from data of the
            year 2006.
        :param sampling: sampling of the returned data (default 1d)
        :returns: array in desired resolution containing interpolated monthly values. Months with no valid data are
            returned as np.nan, which also affects data in the neighbouring months (before / after sampling points,
            which are the 16th of each month).
        """

        # create unity xarray in monthly resolution with sampling point in the middle of each month
        monthly = self.create_monthly_unity_array(data, time_dim) * np.nan

        # apply selection if given (only use subset for monthly means)
        if sel_opts is not None:
            data = data.sel(**sel_opts)

        # create monthly mean and replace entries in unity array
        monthly_mean = data.groupby(f"{time_dim}.month").mean()
        for month in monthly_mean.month.values:
            monthly = xr.where((monthly[f"{time_dim}.month"] == month),
                               monthly_mean.sel(month=month, drop=True),
                               monthly)
        # transform monthly information into original sampling rate
        return monthly.dropna(dim=time_dim).resample({time_dim: sampling}).interpolate()
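
    # Hedged illustration: for daily data and no sel_opts, the returned series
    # carries the January mean on every Jan 16th, the February mean on every
    # Feb 16th, and linearly interpolated values on all days in between; a month
    # without any valid data propagates np.nan into its neighbourhood.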

    @staticmethod
    def _compute_hourly_mean_per_month(data: xr.DataArray, time_dim: str, as_anomaly: bool) -> Dict[int, xr.DataArray]:
        """
        Calculate a separate mean value for each hour in each month (12 x 24 values in total). The average is either
        the anomaly of a monthly mean state or the raw mean value.

        :param data: data to calculate averages on
        :param time_dim: name of temporal dimension
        :param as_anomaly: indicates whether to calculate means as anomaly of a monthly mean or as raw mean values
        :returns: dictionary containing 12 months each with a 24-valued array (1 entry for each hour)
        """
        seasonal_hourly_means = {}
        for month in data.groupby(f"{time_dim}.month").groups.keys():
            single_month_data = data.sel({time_dim: (data[f"{time_dim}.month"] == month)})
            hourly_mean = single_month_data.groupby(f"{time_dim}.hour").mean()
            if as_anomaly is True:
                hourly_mean = hourly_mean - hourly_mean.mean("hour")
            seasonal_hourly_means[month] = hourly_mean
        return seasonal_hourly_means

    @staticmethod
    def _create_seasonal_cycle_of_single_hour_mean(result_arr: xr.DataArray, means: Dict[int, xr.DataArray], hour: int,
                                                   time_dim: str, sampling: str) -> xr.DataArray:
        """
        Use monthly means of a given hour to create an array with interpolated values at the indicated hour for each
        day of the full time span indicated by the given result_arr.

        :param result_arr: template array indicating the full time range and additional dimensions to keep
        :param means: dictionary containing 24 hourly averages for each month (12 x 24 values in total)
        :param hour: integer of hour of interest
        :param time_dim: name of temporal dimension
        :param sampling: sampling rate to interpolate
        :returns: array with interpolated averages in sampling resolution containing only values for hour of interest
        """
        h_coll = xr.ones_like(result_arr) * np.nan
        for month in means.keys():
            hourly_mean_single_month = means[month].sel(hour=hour, drop=True)
            h_coll = xr.where((h_coll[f"{time_dim}.month"] == month), hourly_mean_single_month, h_coll)
        h_coll = h_coll.dropna(time_dim).resample({time_dim: sampling}).interpolate()
        h_coll = h_coll.sel({time_dim: (h_coll[f"{time_dim}.hour"] == hour)})
        return h_coll

    def create_seasonal_hourly_mean(self, data: xr.DataArray, time_dim: str, sel_opts: Dict[str, Any] = None,
                                    sampling: str = "1H", as_anomaly: bool = True) -> xr.DataArray:
        """
        Compute climatological statistics on an hourly basis, either as raw data or anomalies. For each month, an
        overall mean value (only used if requiring anomalies) and the mean of each hour are calculated. The
        climatological diurnal cycle is positioned on the 16th of each month and interpolated in between by using a
        distinct interpolation for each hour of day. The returned array therefore contains data with a yearly cycle
        (if anomaly is not calculated) or data without a yearly cycle (if using anomalies). In both cases, the data
        have an amplitude that varies over the year.

        :param data: data to apply this method to
        :param time_dim: name of temporal axis
        :param sel_opts: specific selection options that are applied before calculation of climatological statistics
            (default None)
        :param sampling: temporal resolution of data (default "1H")
        :param as_anomaly: specify whether to use anomalies or raw data including a seasonal cycle of the mean value
            (default: True)
        :returns: climatological statistics for given data interpolated with given sampling rate
        """
        # can only be used for hourly sampling rate
        assert sampling == "1H"

        # apply selection if given (only use subset for seasonal hourly means)
        if sel_opts is not None:
            data = data.sel(**sel_opts)

        # create unity xarray in monthly resolution with sampling point in the middle of each month
        monthly = self.create_monthly_unity_array(data, time_dim) * np.nan

        # calculate a separate mean value for each hour in each month
        seasonal_hourly_means = self._compute_hourly_mean_per_month(data, time_dim, as_anomaly)

        # create seasonal cycles of these hourly averages
        seasonal_coll = []
        for hour in data.groupby(f"{time_dim}.hour").groups.keys():
            mean_single_hour = self._create_seasonal_cycle_of_single_hour_mean(monthly, seasonal_hourly_means, hour,
                                                                               time_dim, sampling)
            seasonal_coll.append(mean_single_hour)

        # combine all cycles in a common data array
        hourly = xr.concat(seasonal_coll, time_dim).sortby(time_dim).resample({time_dim: sampling}).interpolate()
        return hourly

    @staticmethod
    def extend_apriori(data: xr.DataArray, apriori: xr.DataArray, time_dim: str, sampling: str = "1d",
                       display_name: str = None) -> xr.DataArray:
        """
        Extend the time range of the apriori information so that it spans a longer period than data (or at least one
        of equal length). This method may not work properly if apriori contains less than one year of data.

        :param data: data whose time range the apriori should span at minimum
        :param apriori: data that is adjusted. It is assumed that this data varies over the course of the year but is
            the same for the same day in different years. Otherwise this method will introduce some unintended
            artefacts in the apriori data.
        :param time_dim: name of temporal dimension
        :param sampling: sampling of data (e.g. "1m", "1d", default "1d")
        :param display_name: name to use for logging message (default None)
        :returns: array with adjusted temporal coverage derived from apriori
        """
        dates = data.coords[time_dim].values
        td_type = {"1d": "D", "1H": "h"}.get(sampling)

        # apriori starts after data
        if dates[0] < apriori.coords[time_dim].values[0]:
            logging.debug(f"{display_name}: apriori starts after data")

            # add difference in full years
            date_diff = abs(dates[0] - apriori.coords[time_dim].values[0]).astype("timedelta64[D]")
            extend_range = np.ceil(date_diff / (np.timedelta64(1, "D") * 365)).astype(int) * 365
            factor = 1 if td_type == "D" else 24

            # get fill data range
            start = apriori.coords[time_dim][0].values.astype("datetime64[%s]" % td_type)
            end = apriori.coords[time_dim][0].values.astype("datetime64[%s]" % td_type) + np.timedelta64(
                366 * factor + 1, td_type)

            # fill year by year
            for i in range(365, extend_range + 365, 365):
                apriori_tmp = apriori.sel({time_dim: slice(start, end)})  # hint: slice includes end date
                new_time_axis = apriori_tmp.coords[time_dim] - np.timedelta64(i * factor, td_type)
                apriori_tmp.coords[time_dim] = new_time_axis
                apriori = apriori.combine_first(apriori_tmp)

        # apriori ends before data
        if dates[-1] + np.timedelta64(365, "D") > apriori.coords[time_dim].values[-1]:
            logging.debug(f"{display_name}: apriori ends before data")

            # add difference in full years + 1 year (because apriori is used as future estimate)
            date_diff = abs(dates[-1] - apriori.coords[time_dim].values[-1]).astype("timedelta64[D]")
            extend_range = np.ceil(date_diff / (np.timedelta64(1, "D") * 365)).astype(int) * 365 + 365
            factor = 1 if td_type == "D" else 24

            # get fill data range
            start = apriori.coords[time_dim][-1].values.astype("datetime64[%s]" % td_type) - np.timedelta64(
                366 * factor + 1, td_type)
            end = apriori.coords[time_dim][-1].values.astype("datetime64[%s]" % td_type)

            # fill year by year
            for i in range(365, extend_range + 365, 365):
                apriori_tmp = apriori.sel({time_dim: slice(start, end)})  # hint: slice includes end date
                new_time_axis = apriori_tmp.coords[time_dim] + np.timedelta64(i * factor, td_type)
                apriori_tmp.coords[time_dim] = new_time_axis
                apriori = apriori.combine_first(apriori_tmp)

        return apriori
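
    # Hedged illustration of the extension scheme above: if data covers 2010-2018
    # but apriori only covers 2015-2016, the first / last 366 days of apriori are
    # copied year by year into the past / future until the apriori range exceeds
    # the data range (plus one extra year on the future side).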

    def combine_observation_and_apriori(self, data: xr.DataArray, apriori: xr.DataArray, time_dim: str, new_dim: str,
                                        extend_length_history: int, extend_length_future: int,
                                        extend_length_separator: int = 0) -> xr.DataArray:
        """
        Combine historical data / observations ("data") and climatological statistics ("apriori"). Historical data
        are used on the interval [t0 - extend_length_history, t0] and apriori is used on [t0 + 1,
        t0 + extend_length_future]. If indicated by the extend_length_separator, it is possible to shift the end of
        the history interval and the start of the apriori interval by the given number of time steps.

        :param data: historical data for past values, must contain dimensions time_dim and var_dim and might also
            have a new_dim dimension
        :param apriori: climatological estimate for future values, must contain dimensions time_dim and var_dim, but
            can also have dimension new_dim
        :param time_dim: name of temporal dimension
        :param new_dim: name of new dim on which data is combined along
        :param extend_length_history: number of time steps to use from data
        :param extend_length_future: number of time steps to use from apriori (minus 1)
        :param extend_length_separator: position of the last history value to use (default 0); this position
            indicates the last value that is used from data (followed by values from apriori). In other words, the
            end of the history interval and the start of the apriori interval are shifted by this value from t0
            (positive or negative).
        :returns: combined data array
        """
        # prepare historical data / observation
        ext_sep = min(extend_length_separator, extend_length_future)
        if new_dim not in data.coords:
            history = self._shift_data(data, range(int(-extend_length_history), ext_sep + 1), time_dim, new_dim)
        else:
            history = data.sel({new_dim: slice(int(-extend_length_history), ext_sep)})

        if extend_length_future > ext_sep + 1:
            # prepare climatological statistics
            if new_dim not in apriori.coords:
                future = self._shift_data(apriori, range(ext_sep + 1, extend_length_future + 1), time_dim, new_dim)
            else:
                future = apriori.sel({new_dim: slice(ext_sep + 1, extend_length_future)})
            # combine historical data [t0-length, t0+sep] and climatological statistics [t0+sep+1, t0+length]
            filter_input_data = xr.concat([history.dropna(time_dim), future], dim=new_dim, join="left")
            return filter_input_data
        else:
            return history
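
    # Hedged sketch of the splice along new_dim: with extend_length_history=3,
    # extend_length_future=4 and separator=0, indices [-3 .. 0] come from the
    # observations and indices [1 .. 4] from the climatological apriori, so the
    # filter sees a gap-free window around each t0.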

    def create_visualization(self, filtered, data, filter_input_data, plot_dates, time_dim, new_dim, sampling,
                             extend_length_history, extend_length_future, minimum_length, h,
                             variable_name, extend_length_opts=None, offset=None):  # pragma: no cover
        plot_data = []
        offset = 0 if offset is None else offset
        extend_length_opts = 0 if extend_length_opts is None else extend_length_opts
        for t0 in set(plot_dates).intersection(filtered.coords[time_dim].values):
            try:
                td_type = {"1d": "D", "1H": "h"}.get(sampling)
                t_minus = t0 + np.timedelta64(int(-extend_length_history), td_type)
                t_plus = t0 + np.timedelta64(int(extend_length_future + 1), td_type)
                if new_dim not in data.coords:
                    tmp_filter_data = self._shift_data(data.sel({time_dim: slice(t_minus, t_plus)}),
                                                       range(int(-extend_length_history),
                                                             int(extend_length_future + 1)),
                                                       time_dim,
                                                       new_dim).sel({time_dim: t0})
                else:
                    tmp_filter_data = None
                valid_start = int(filtered[new_dim].min()) + int((len(h) + 1) / 2)
                valid_end = min(extend_length_opts + offset + 1, int(filtered[new_dim].max()) - int((len(h) + 1) / 2))
                valid_range = range(valid_start, valid_end)
                plot_data.append({"t0": t0,
                                  "var": variable_name,
                                  "filter_input": filter_input_data.sel({time_dim: t0}),
                                  "filter_input_nc": tmp_filter_data,
                                  "valid_range": valid_range,
                                  "time_range": data.sel(
                                      {time_dim: slice(t_minus, t_plus - np.timedelta64(1, td_type))}).coords[
                                      time_dim].values,
                                  "h": h,
                                  "new_dim": new_dim})
            except Exception:
                pass
        return plot_data

    @staticmethod
    def _get_year_interval(data: xr.DataArray, time_dim: str) -> Tuple[int, int]:
        """
        Get year of start and end date of given data.

        :param data: data to extract dates from
        :param time_dim: name of temporal axis
        :returns: two-element tuple with start and end year
        """
        start = pd.to_datetime(data.coords[time_dim].min().values).year
        end = pd.to_datetime(data.coords[time_dim].max().values).year
        return start, end

    @staticmethod
    def _calculate_filter_coefficients(window: Union[str, tuple], order: Union[int, tuple], cutoff_high: float,
                                       fs: float) -> np.array:
        """
        Calculate filter coefficients for a moving window using scipy's signal package for common filter types and
        the local method firwin_kzf for the Kolmogorov-Zurbenko filter (kzf). The filter is a low-pass filter.

        :param window: name of the window type, either a string with the window's name or a tuple containing the
            name and some parameters (e.g. `("kaiser", 5)`)
        :param order: order of the filter to create as int, or parameters m and k of kzf
        :param cutoff_high: cutoff frequency to use for the low-pass filter in frequency of fs
        :param fs: sampling frequency of time series
        """
        if window == "kzf":
            h = firwin_kzf(*order)
        else:
            h = signal.firwin(order, cutoff_high, pass_zero="lowpass", fs=fs, window=window)
        return h

    @staticmethod
    def _trim_data_to_minimum_length(data: xr.DataArray, extend_length_history: int, dim: str,
                                     extend_length_future: int = 0) -> xr.DataArray:
        """
        Trim data along the given axis between -extend_length_history and extend_length_future (default 0).

        :param data: data to trim
        :param extend_length_history: start of the trim range (transformed to negative)
        :param dim: dim to apply trim on
        :param extend_length_future: end of the trim range in the "future" (default 0)
        :returns: trimmed data
        """
        return data.sel({dim: slice(-extend_length_history, extend_length_future)}, drop=True)

    @staticmethod
    def _create_full_filter_result_array(template_array: xr.DataArray, result_array: xr.DataArray, new_dim: str,
                                         display_name: str = None) -> xr.DataArray:
        """
        Create result filter array with the same shape as the given template data (should be the original input data
        before filtering). All gaps are filled with nans.

        :param template_array: this array is used as template for shape and ordering of dims
        :param result_array: array with data that is filled into the template
        :param new_dim: new dimension which is shifted/appended to/at the end (if present or not)
        :param display_name: string that is attached to logging (default None)
        """
        logging.debug(f"{display_name}: create res_full")
        new_coords = {**{k: template_array.coords[k].values for k in template_array.coords if k != new_dim},
                      new_dim: result_array.coords[new_dim]}
        dims = [*template_array.dims, new_dim] if new_dim not in template_array.dims else template_array.dims
        result_array = result_array.transpose(*dims)
        return result_array.broadcast_like(xr.DataArray(dims=dims, coords=new_coords))

    @TimeTrackingWrapper
    def clim_filter(self, data, fs, cutoff_high, order, apriori=None, sel_opts=None,
                    sampling="1d", time_dim="datetime", var_dim="variables", window: Union[str, Tuple] = "hamming",
                    minimum_length=0, next_order=0, new_dim="window", plot_dates=None, display_name=None,
                    extend_opts: int = 0, offset: int = 0):

        logging.debug(f"{display_name}: extend apriori")

        # calculate apriori information from data if not given and extend its range if it is not long enough
        if apriori is None:
            apriori = self.create_monthly_mean(data, time_dim, sel_opts=sel_opts, sampling=sampling)
        apriori = apriori.astype(data.dtype)
        apriori = self.extend_apriori(data, apriori, time_dim, sampling, display_name=display_name)

        # calculate FIR filter coefficients
        h = self._calculate_filter_coefficients(window, order, cutoff_high, fs)
        length = len(h)

        # set data extension that is required for filtering
        extend_length_history = minimum_length + int((next_order + 1) / 2) + int((length + 1) / 2) - offset
        extend_length_future = offset + int((next_order + 1) / 2) + int((length + 1) / 2)

        # collect some data for visualization
        plot_pos = np.array([0.25, 1.5, 2.75, 4]) * 365 * fs
        if plot_dates is None:
            plot_dates = [data.isel({time_dim: int(pos)}).coords[time_dim].values for pos in plot_pos if
                          pos < len(data.coords[time_dim])]
        plot_data = []

        coll = []
        coll_input = []

        for var in reversed(data.coords[var_dim].values):
            logging.info(f"{display_name} ({var}): sel data")

            _start, _end = self._get_year_interval(data, time_dim)
            filt_coll = []
            filt_input_coll = []
            for _year in range(_start, _end + 1):
                logging.debug(f"{display_name} ({var}): year={_year}")

                # select observations and apriori data
                time_slice = self._create_time_range_extend(
                    _year, sampling, max(extend_length_history, extend_length_future))
                d = data.sel({var_dim: [var], time_dim: time_slice})
                a = apriori.sel({var_dim: [var], time_dim: time_slice})
                if len(d.coords[time_dim]) == 0:  # no data at all for this year
                    continue

                # combine historical data / observation [t0-length, t0] and climatological statistics [t0+1, t0+length]
                filter_input_data = self.combine_observation_and_apriori(d, a, time_dim, new_dim,
                                                                         extend_length_history, extend_length_future,
                                                                         extend_length_separator=extend_opts)

                # select only data for current year
                try:
                    filter_input_data = filter_input_data.sel({time_dim: str(_year)})
                except KeyError:  # no valid data for this year
                    continue
                if len(filter_input_data.coords[time_dim]) == 0:  # no valid data for this year
                    continue

                # apply filter
                logging.debug(f"{display_name} ({var}): start filter convolve")
                with TimeTracking(name=f"{display_name} ({var}): filter convolve", logging_level=logging.DEBUG):
                    filt = xr.apply_ufunc(fir_filter_convolve, filter_input_data,
                                          input_core_dims=[[new_dim]], output_core_dims=[[new_dim]],
                                          vectorize=True, kwargs={"h": h}, output_dtypes=[d.dtype])

                # trim data if required
                valid_range_end = int(filt.coords[new_dim].max() - (length + 1) / 2) + 1
                ext_len = min(extend_length_future, valid_range_end)
                trimmed = self._trim_data_to_minimum_length(filt, extend_length_history, new_dim,
                                                            extend_length_future=ext_len)
                filt_coll.append(trimmed)
                trimmed = self._trim_data_to_minimum_length(filter_input_data, extend_length_history, new_dim,
                                                            extend_length_future=ext_len)
                filt_input_coll.append(trimmed)

                # visualization
                plot_data.extend(self.create_visualization(filt, d, filter_input_data, plot_dates, time_dim, new_dim,
                                                           sampling, extend_length_history, extend_length_future,
                                                           minimum_length, h, var, extend_opts, offset))

            # collect all filter results
            coll.append(xr.concat(filt_coll, time_dim))
            coll_input.append(xr.concat(filt_input_coll, time_dim))
            gc.collect()

        # concat all variables
        logging.debug(f"{display_name}: concat all variables")
        res = xr.concat(coll, var_dim)
        res_input = xr.concat(coll_input, var_dim)

        # create result array with same shape as input data; gaps are filled with nans
        res_full = self._create_full_filter_result_array(data, res, new_dim, display_name)
        res_input_full = self._create_full_filter_result_array(data, res_input, new_dim, display_name)
        return res_full, res_input_full, h, apriori, plot_data

    @staticmethod
    def _create_time_range_extend(year: int, sampling: str, extend_length: int) -> slice:
        """
        Create a slice object for the given year plus extend_length in sampling resolution.

        :param year: year to create time range for
        :param sampling: sampling of time range
        :param extend_length: number of time steps to extend out of given year
        :returns: slice object with time range
        """
        td_type = {"1d": "D", "1H": "h"}.get(sampling)
        delta = np.timedelta64(extend_length + 1, td_type)
        start = np.datetime64(f"{year}-01-01") - delta
        end = np.datetime64(f"{year}-12-31") + delta
        return slice(start, end)
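
    # Hedged example: _create_time_range_extend(2017, "1d", 42) yields
    # slice(numpy.datetime64("2016-11-19"), numpy.datetime64("2018-02-12")),
    # i.e. the year 2017 padded by extend_length + 1 = 43 days on both sides.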

    @staticmethod
    def _create_tmp_dimension(data: xr.DataArray) -> str:
        """
        Create a tmp dimension, preferably with name 'window'. If this name is already one of the dimensions, the tmp
        dimension name is concatenated with itself until it is no longer present in dims. The method raises a
        ValueError after 10 tries.

        :param data: data array to create a new tmp dimension for with unique name
        :returns: valid name for a tmp dimension (preferably 'window')
        """
        new_dim = "window"
        count = 0
        while new_dim in data.dims:
            new_dim += new_dim
            count += 1
            if count > 10:
                raise ValueError("Could not create new dimension.")
        return new_dim

    def _shift_data(self, data: xr.DataArray, index_value: range, time_dim: str, new_dim: str) -> xr.DataArray:
        """
        Shift data multiple times to create history or future along dimension new_dim for each time step.

        :param data: data set to shift
        :param index_value: range of integers to span history and/or future
        :param time_dim: name of temporal dimension that should be shifted
        :param new_dim: name of dimension created by the data shift
        :return: shifted data
        """
        coll = []
        for i in index_value:
            coll.append(data.shift({time_dim: -i}))
        new_ind = self.create_index_array(new_dim, index_value)
        return xr.concat(coll, dim=new_ind)
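
    # Hedged illustration: _shift_data(data, range(-2, 1), time_dim, new_dim)
    # stacks three shifted copies along new_dim so that, at each time step t0,
    # the entries at new_dim = -2, -1, 0 hold the values observed at t0-2, t0-1
    # and t0, respectively.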

    @staticmethod
    def create_index_array(index_name: str, index_value: range):
        """
        Create an index array from a range object to use as index of a data array.

        :param index_name: name of the index dimension
        :param index_value: range of values to use as indexes
        :returns: index array for given range of values
        """
        ind = pd.DataFrame({'val': index_value}, index=index_value)
        tmp_dim = index_name + "tmp"
        res = xr.Dataset.from_dataframe(ind).to_array(tmp_dim).rename({'index': index_name})
        res = res.squeeze(dim=tmp_dim, drop=True)
        res.name = index_name
        return res

    @property
    def apriori_data(self):
        return self._apriori_list

    @property
    def initial_apriori_data(self):
        return self.apriori_data[0]


def fir_filter(data, fs, order=5, cutoff_low=None, cutoff_high=None, window="hamming", dim="variables", h=None,
               causal=True, padlen=None):
    """Expects xarray."""
    if h is None:
        cutoff = []
        if cutoff_low is not None:
            cutoff += [cutoff_low]
        if cutoff_high is not None:
            cutoff += [cutoff_high]
        if len(cutoff) == 2:
            filter_type = "bandpass"
        elif len(cutoff) == 1 and cutoff_low is not None:
            filter_type = "highpass"
        elif len(cutoff) == 1 and cutoff_high is not None:
            filter_type = "lowpass"
        else:
            raise ValueError("Please provide either cutoff_low or cutoff_high.")
        h = signal.firwin(order, cutoff, pass_zero=filter_type, fs=fs, window=window)
    filtered = xr.ones_like(data)
    for var in data.coords[dim]:
        d = data.sel({dim: var}).values.flatten()
        if causal:
            y = signal.lfilter(h, 1., d)
        else:
            padlen = padlen if padlen is not None else 3 * len(h)
            y = signal.filtfilt(h, 1., d, padlen=padlen)
        filtered.loc[{dim: var}] = y
    return filtered, h


def fir_filter_convolve(data, h):
    return signal.convolve(data, h, mode='same', method="direct") / sum(h)
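

# A minimal sanity-check sketch, not part of the original module: because the
# convolution result is normalised by sum(h), a constant input passes through
# unchanged wherever the kernel fully overlaps the data.
def _demo_fir_filter_convolve():  # pragma: no cover
    h = signal.firwin(21, 0.05, pass_zero="lowpass", fs=1, window="hamming")
    y = fir_filter_convolve(np.ones(100), h)
    edge = len(h) // 2  # mode="same" leaves partially-overlapped samples at both ends
    assert np.allclose(y[edge:-edge], 1.0)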


class KolmogorovZurbenkoBaseClass:

    def __init__(self, df, wl, itr, is_child=False, filter_dim="window"):
        """
        Initialize the variables associated with the Kolmogorov-Zurbenko filter.

        Args:
            df(pd.DataFrame, None): time series of a variable
            wl(list of int): window length
            itr(list of int): number of iterations
        """
        self.df = df
        self.filter_dim = filter_dim
        self.wl = to_list(wl)
        self.itr = to_list(itr)
        if abs(len(self.wl) - len(self.itr)) > 0:
            raise ValueError("Length of lists for wl and itr must agree!")
        self._isChild = is_child
        self.child = self.set_child()
        self.type = type(self).__name__

    def set_child(self):
        if len(self.wl) > 1:
            return KolmogorovZurbenkoBaseClass(None, self.wl[1:], self.itr[1:], True, self.filter_dim)
        else:
            return None

    def kz_filter(self, df, m, k):
        pass

    def spectral_calc(self):
        df_start = self.df
        kz = self.kz_filter(df_start, self.wl[0], self.itr[0])
        filtered = self.subtract(df_start, kz)
        # case I: no child avail -> return kz and remaining
        if self.child is None:
            return [kz, filtered]
        # case II: has child -> return current kz and all child results
        else:
            self.child.df = filtered
            kz_next = self.child.spectral_calc()
            return [kz] + kz_next

    @staticmethod
    def subtract(minuend, subtrahend):
        try:  # pandas implementation
            return minuend.sub(subtrahend, axis=0)
        except AttributeError:  # general implementation
            return minuend - subtrahend

    def run(self):
        return self.spectral_calc()

    def transfer_function(self):
        m = self.wl[0]
        k = self.itr[0]
        omega = np.linspace(0.00001, 0.15, 5000)
        return omega, (np.sin(m * np.pi * omega) / (m * np.sin(np.pi * omega))) ** (2 * k)

    def omega_null(self, alpha=0.5):
        a = np.sqrt(6) / np.pi
        b = 1 / (2 * np.array(self.itr))
        c = 1 - alpha ** b
        d = np.array(self.wl) ** 2 - alpha ** b
        return a * np.sqrt(c / d)

    def period_null(self, alpha=0.5):
        return 1. / self.omega_null(alpha)

    def period_null_days(self, alpha=0.5):
        return self.period_null(alpha) / 24.

    def plot_transfer_function(self, fig=None, name=None):
        if fig is None:
            fig = plt.figure()
        omega, transfer_function = self.transfer_function()
        if self.child is not None:
            transfer_function_child = self.child.plot_transfer_function(fig)
        else:
            transfer_function_child = transfer_function * 0
        plt.semilogx(omega, transfer_function - transfer_function_child,
                     label="m={:3.0f}, k={:3.0f}, T={:6.2f}d".format(self.wl[0],
                                                                     self.itr[0],
                                                                     self.period_null_days()))
        plt.axvline(x=self.omega_null())
        if not self._isChild:
            locs, labels = plt.xticks()
            plt.xticks(locs, np.round(1. / (locs * 24), 1))
            plt.xlim([0.00001, 0.15])
            plt.legend()
            if name is None:
                plt.show()
            else:
                plt.savefig(name)
        else:
            return transfer_function
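

# A hedged sketch, not part of the original module: the KZ cutoff estimate
# omega_null depends only on window length and iteration count, so it can be
# queried without attaching any data (df=None). Note that period_null_days
# assumes hourly sampling, since it divides by 24.
def _demo_kz_cutoff():  # pragma: no cover
    kz = KolmogorovZurbenkoBaseClass(None, wl=15, itr=5)
    print(kz.omega_null())        # cutoff frequency in 1 / time step
    print(kz.period_null_days())  # corresponding period in days for hourly data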


class KolmogorovZurbenkoFilterMovingWindow(KolmogorovZurbenkoBaseClass):

    def __init__(self, df, wl: Union[list, int], itr: Union[list, int], is_child=False, filter_dim="window",
                 method="mean", percentile=0.5):
        """
        Initialize the variables associated with the KolmogorovZurbenkoFilterMovingWindow class.

        Args:
            df(pd.DataFrame, xr.DataArray): time series of a variable
            wl: window length
            itr: number of iterations
        """
        self.valid_methods = ["mean", "percentile", "median", "max", "min"]
        if method not in self.valid_methods:
            raise ValueError("Method '{}' is not supported. Please select from [{}].".format(
                method, ", ".join(self.valid_methods)))
        else:
            self.method = method
        if percentile > 1 or percentile < 0:
            raise ValueError("Percentile must be in range [0, 1]. Given was {}!".format(percentile))
        else:
            self.percentile = percentile
        super().__init__(df, wl, itr, is_child, filter_dim)

    def set_child(self):
        if len(self.wl) > 1:
            return KolmogorovZurbenkoFilterMovingWindow(self.df, self.wl[1:], self.itr[1:], is_child=True,
                                                        filter_dim=self.filter_dim, method=self.method,
                                                        percentile=self.percentile)
        else:
            return None

    @TimeTrackingWrapper
    def kz_filter_new(self, df, wl, itr):
        """
        Compute the low-pass filtered (low frequency) time series.

        If the filter method is one of mean, max, min, this method calls construct and rechunk before the actual
        calculation to improve performance. If the filter method is either median or percentile, this approach is not
        applicable and, depending on the data and window size, this method can become slow.

        Args:
            wl(int): window length
            itr(int): number of iterations
        """
        warnings.filterwarnings("ignore")
        df_itr = df.__deepcopy__()
        try:
            kwargs = {"min_periods": int(0.7 * wl),
                      "center": True,
                      self.filter_dim: wl}
            for i in np.arange(0, itr):
                logging.debug(f"kz_filter_new: iteration {i}")
                rolling = df_itr.chunk().rolling(**kwargs)
                if self.method not in ["percentile", "median"]:
                    rolling = rolling.construct("construct").chunk("auto")
                if self.method == "median":
                    df_mv_avg_tmp = rolling.median()
                elif self.method == "percentile":
                    df_mv_avg_tmp = rolling.quantile(self.percentile)
                elif self.method == "max":
                    df_mv_avg_tmp = rolling.max("construct")
                elif self.method == "min":
                    df_mv_avg_tmp = rolling.min("construct")
                else:
                    df_mv_avg_tmp = rolling.mean("construct")
                df_itr = df_mv_avg_tmp.compute()
                del df_mv_avg_tmp, rolling
                gc.collect()
            return df_itr
        except ValueError:
            raise

    @TimeTrackingWrapper
    def kz_filter(self, df, wl, itr):
        """
        Compute the low-pass filtered (low frequency) time series.

        Args:
            wl(int): window length
            itr(int): number of iterations
        """
        warnings.filterwarnings("ignore")
        df_itr = df.__deepcopy__()
        try:
            kwargs = {"min_periods": int(0.7 * wl),
                      "center": True,
                      self.filter_dim: wl}
            iter_vars = df_itr.coords["variables"].values
            for var in iter_vars:
                df_itr_var = df_itr.sel(variables=[var])
                for _ in np.arange(0, itr):
                    df_itr_var = df_itr_var.chunk()
                    rolling = df_itr_var.rolling(**kwargs)
                    if self.method == "median":
                        df_mv_avg_tmp = rolling.median()
                    elif self.method == "percentile":
                        df_mv_avg_tmp = rolling.quantile(self.percentile)
                    elif self.method == "max":
                        df_mv_avg_tmp = rolling.max()
                    elif self.method == "min":
                        df_mv_avg_tmp = rolling.min()
                    else:
                        df_mv_avg_tmp = rolling.mean()
                    df_itr_var = df_mv_avg_tmp.compute()
                df_itr.loc[{"variables": [var]}] = df_itr_var
            return df_itr
        except ValueError:
            raise
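

# A hedged usage sketch, not part of the original module: decompose a synthetic
# hourly series into a low-frequency component and its residuum. The 169-hour
# window, 3 iterations and the variable name "o3" are illustrative assumptions.
def _demo_kz_moving_window():  # pragma: no cover
    time_index = pd.date_range("2010-01-01", periods=24 * 90, freq="h")
    data = xr.DataArray(np.random.randn(len(time_index), 1),
                        dims=("datetime", "variables"),
                        coords={"datetime": time_index, "variables": ["o3"]})
    kz = KolmogorovZurbenkoFilterMovingWindow(data, wl=[169], itr=[3], filter_dim="datetime")
    low_pass, residuum = kz.run()  # one component per window length plus the residuum
    assert (low_pass + residuum).shape == data.shape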


def firwin_kzf(m: int, k: int) -> np.array:
    """Calculate weights of window for Kolmogorov Zurbenko filter."""
    m, k = int(m), int(k)
    coef = np.ones(m)
    for i in range(1, k):
        t = np.zeros((m, m + i * (m - 1)))
        for km in range(m):
            t[km, km:km + coef.size] = coef
        coef = np.sum(t, axis=0)
    return coef / (m ** k)


def omega_null_kzf(m: int, k: int, alpha: float = 0.5) -> float:
    """Estimate the cutoff frequency of the Kolmogorov Zurbenko filter for window length m and k iterations."""
    a = np.sqrt(6) / np.pi
    b = 1 / (2 * np.array(k))
    c = 1 - alpha ** b
    d = np.array(m) ** 2 - alpha ** b
    return a * np.sqrt(c / d)


def filter_width_kzf(m: int, k: int) -> int:
    """Return the total window width of the Kolmogorov Zurbenko filter."""
    return k * (m - 1) + 1
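

# A hedged consistency check, not part of the original module: for m=3, k=2 the KZ
# weights form the triangular kernel [1, 2, 3, 2, 1] / 9 (a boxcar of length 3
# convolved with itself), and its length matches filter_width_kzf.
def _demo_firwin_kzf():  # pragma: no cover
    weights = firwin_kzf(3, 2)
    assert np.allclose(weights, np.array([1, 2, 3, 2, 1]) / 9)
    assert len(weights) == filter_width_kzf(3, 2)  # k * (m - 1) + 1 == 5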