Coverage for mlair/helpers/data_sources/join.py: 92% (166 statements), coverage.py v6.4.2, created at 2022-12-02 15:24 +0000
1"""Functions to access join database."""
2__author__ = 'Felix Kleinert, Lukas Leufen'
3__date__ = '2019-10-16'
5import datetime as dt
6import logging
7from typing import Iterator, Union, List, Dict, Tuple
9import pandas as pd
11from mlair import helpers
12from mlair.configuration.join_settings import join_settings
13from mlair.helpers.data_sources import toar_data, toar_data_v2
16# join_url_base = 'https://join.fz-juelich.de/services/rest/surfacedata/'
17str_or_none = Union[str, None]

def download_join(station_name: Union[str, List[str]], stat_var: dict, station_type: str = None,
                  sampling: str = "daily", data_origin: Dict = None) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Read data from JOIN/TOAR.

    :param station_name: station name, e.g. DEBY122
    :param stat_var: key as variable like 'O3', values as statistics on keys like 'mean'
    :param station_type: set the station type like "traffic" or "background", can be None
    :param sampling: sampling rate of the downloaded data, either set to daily or hourly (default daily)
    :param data_origin: additional dictionary to specify data origin as key (for variable) value (origin) pair. Valid
        origins are "REA" for reanalysis data and "" (empty string) for observational data.

    :returns: data frame with all variables and statistics, and meta data frame with all meta information
    """
    # make sure station_name parameter is a list
    station_name = helpers.to_list(station_name)

    # split network and origin information
    data_origin, network_name = split_network_and_origin(data_origin)

    # get data connection settings
    join_url_base, headers = join_settings(sampling)

    # load series information
    vars_dict, data_origin = load_series_information(station_name, station_type, network_name, join_url_base, headers,
                                                     data_origin, stat_var)

    # check if all requested variables are available
    if set(stat_var).issubset(vars_dict) is False:
        missing_variables = set(stat_var).difference(vars_dict)
        origin = helpers.select_from_dict(data_origin, missing_variables)
        options = f"station={station_name}, type={station_type}, network={network_name}, origin={origin}"
        raise toar_data.EmptyQueryResult(f"No data found for variables {missing_variables} and options {options} in "
                                         f"JOIN.")

    # correct stat_var values if data is not aggregated (hourly)
    if sampling == "hourly":
        stat_var = {key: "values" for key in stat_var.keys()}

    # download all variables with given statistic
    data = None
    df = None
    meta = {}
    logging.info(f"load data for {station_name[0]} from JOIN")
    for var in _lower_list(sorted(vars_dict.keys())):
        if var in stat_var.keys():

            logging.debug('load: {}'.format(var))

            # create data link
            opts = {'base': join_url_base, 'service': 'stats', 'id': vars_dict[var], 'statistics': stat_var[var],
                    'sampling': sampling, 'capture': 0, 'format': 'json'}

            # load data
            data = toar_data.get_data(opts, headers)

            # adjust data format if given as list of list
            # no branch cover because this just happens when downloading hourly data using a secret token, not
            # available for CI testing.
            if isinstance(data, list):  # pragma: no branch
                data = correct_data_format(data)

            # correct namespace of statistics
            stat = toar_data.correct_stat_name(stat_var[var])

            # store data in pandas dataframe
            df = _save_to_pandas(df, data, stat, var)
            meta[var] = _correct_meta(data["metadata"])

            logging.debug('finished: {}'.format(var))

    if data:
        # load station meta using toar-data v2 API and convert to local timezone
        meta_url_base, headers = toar_data_v2.toar_data_v2_settings("meta")
        station_meta = toar_data_v2.load_station_information(station_name, meta_url_base, headers)
        df = toar_data_v2.correct_timezone(df, station_meta, sampling)

        # create meta data
        meta = toar_data_v2.combine_meta_data(station_meta, meta)
        meta = pd.DataFrame.from_dict(meta, orient='index')
        meta.columns = station_name
        return df, meta
    else:
        raise toar_data.EmptyQueryResult("No data found in JOIN.")
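
# Example call (illustrative sketch; station id and variable/statistic pairs follow the __main__ block at the end of
# this module and are not the only valid choices):
#
# >>> df, meta = download_join("DEBW107", {"o3": "dma8eu", "temp": "maximum"},
# ...                          sampling="daily", data_origin={"temp": "REA"})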

def _correct_meta(meta):
    meta_out = {}
    for k, v in meta.items():
        if k.startswith("station"):
            _k = k.split("_", 1)[1]
            _d = meta_out.get("station", {})
            _d[_k] = v
            meta_out["station"] = _d
        elif k.startswith("parameter"):
            _k = k.split("_", 1)[1]
            _d = meta_out.get("variable", {})
            _d[_k] = v
            meta_out["variable"] = _d
        elif k == "network_name":
            if v == "AIRBASE":
                _d = {"name": "EEA", "longname": "European Environment Agency", "kind": "government"}
            elif v == "UBA":
                _d = {"name": "UBA", "longname": "Umweltbundesamt", "kind": "government", "country": "Germany"}
            else:
                _d = {"name": v}
            meta_out["roles"] = [{"contact": {"organisation": _d}}]
        elif k in ["google_resolution", "numid"]:
            continue
        else:
            meta_out[k] = v
    return meta_out
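
# Illustrative example (assumed input keys, following the JOIN v1 naming scheme handled above):
#
# >>> _correct_meta({"station_id": "DEBW107", "parameter_name": "o3", "network_name": "UBA", "numid": 42})
# {'station': {'id': 'DEBW107'},
#  'variable': {'name': 'o3'},
#  'roles': [{'contact': {'organisation': {'name': 'UBA', 'longname': 'Umweltbundesamt',
#                                          'kind': 'government', 'country': 'Germany'}}}]}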

def split_network_and_origin(origin_network_dict: dict) -> Tuple[Union[None, dict], Union[None, dict]]:
    """
    Split given dict into network and data origin.

    This method is required to transform the TOAR-Data v2 structure (using only origin) into the TOAR-Data v1 (JOIN)
    structure (which uses origin and network parameters). Furthermore, the EEA network (v2) is renamed to AIRBASE (v1).
    """
    if origin_network_dict is None or len(origin_network_dict) == 0:
        data_origin, network = None, None
    else:
        data_origin = {}
        network = {}
        for k, v in origin_network_dict.items():
            network[k] = []
            for _network in helpers.to_list(v):
                if _network.lower() == "EEA".lower():
                    network[k].append("AIRBASE")
                elif _network.lower() != "REA".lower():
                    network[k].append(_network)
            if "REA" in v:
                data_origin[k] = "REA"
            else:
                data_origin[k] = ""
            network[k] = filter_network(network[k])
    return data_origin, network
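
# Illustrative example (assumed origins; "REA" marks reanalysis data, any other entry is treated as a network name):
#
# >>> split_network_and_origin({"o3": "UBA", "temp": "REA"})
# ({'o3': '', 'temp': 'REA'}, {'o3': ['UBA'], 'temp': None})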

def filter_network(network: list) -> Union[list, None]:
    """
    Filter given list of networks.

    :param network: list of various network names (can contain duplicates and empty strings)

    :return: list with unique entries in order of their first occurrence, or None if no entry remains
    """
    sorted_network = []
    for v in list(filter(lambda x: x != "", network)):
        if v not in sorted_network:
            sorted_network.append(v)
    if len(sorted_network) == 0:
        sorted_network = None
    return sorted_network
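
# Illustrative example (hypothetical network names):
#
# >>> filter_network(["UBA", "", "UBA", "AIRBASE"])
# ['UBA', 'AIRBASE']
# >>> filter_network(["", ""]) is None
# True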

def correct_data_format(data):
    """
    Transform to the standard data format.

    For some cases (e.g. hourly data), the data is returned as a list instead of a dictionary with the keys datetime,
    values and metadata. This function addresses this issue and transforms the data into the dictionary version.

    :param data: data in hourly format

    :return: the same data but formatted to fit with the aggregated format
    """
    formatted = {"datetime": [],
                 "values": [],
                 "metadata": data[-1]}
    for d in data[:-1]:
        for k, v in zip(["datetime", "values"], d):
            formatted[k].append(v)
    return formatted
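
# Illustrative example (assumed hourly payload: [datetime, value] pairs with a trailing metadata element):
#
# >>> correct_data_format([["2020-01-01 00:00", 1.0], ["2020-01-01 01:00", 2.0], {"station_id": "DEBW107"}])
# {'datetime': ['2020-01-01 00:00', '2020-01-01 01:00'], 'values': [1.0, 2.0], 'metadata': {'station_id': 'DEBW107'}}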

def load_series_information(station_name: List[str], station_type: str_or_none, network_name: str_or_none,
                            join_url_base: str, headers: Dict, data_origin: Dict = None,
                            stat_var: Dict = None) -> Tuple[Dict, Dict]:
    """
    List all series ids that are available for given station id and network name.

    :param station_name: station name, e.g. DEBW107
    :param station_type: station type like "traffic" or "background"
    :param network_name: measurement network of the station like "UBA" or "AIRBASE"
    :param join_url_base: base url name to download data from
    :param headers: additional headers information like authorization, can be empty
    :param data_origin: additional information to select a distinct series, e.g. from reanalysis (REA) or from
        observation ("", empty string). This dictionary should contain a key for each variable and the origin as value.
    :return: all available series for the requested station stored in a dictionary with parameter name (variable) as
        key and the series id as value
    """
    network_name_opts = _create_network_name_opts(network_name)
    parameter_name_opts = _create_parameter_name_opts(stat_var)
    opts = {"base": join_url_base, "service": "search", "station_id": station_name[0], "station_type": station_type,
            "network_name": network_name_opts, "as_dict": "true", "parameter_name": parameter_name_opts,
            "columns": "id,network_name,station_id,parameter_name,parameter_label,parameter_attribute"}
    station_vars = toar_data.get_data(opts, headers)
    logging.debug(f"{station_name}: {station_vars}")
    return _select_distinct_series(station_vars, data_origin, network_name)
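
# Illustrative call and return value (hypothetical url and series ids; this performs a live request, hence it is not
# a runnable doctest): the first element maps each variable to its selected series id, the second element reports the
# resolved data origin per variable.
#
# >>> load_series_information(["DEBW107"], "background", None, join_url_base, {})  # doctest: +SKIP
# ({'o3': 12345, 'temp': 67890}, {'o3': '', 'temp': 'REA'})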

def _create_parameter_name_opts(stat_var):
    if stat_var is None:
        parameter_name_opts = None
    else:
        parameter_name_opts = ",".join(stat_var.keys())
    return parameter_name_opts

def _create_network_name_opts(network_name):
    if network_name is None:
        network_name_opts = network_name
    elif isinstance(network_name, list):
        network_name_opts = ",".join(helpers.to_list(network_name))
    elif isinstance(network_name, dict):
        _network = []
        for v in network_name.values():
            _network.extend(helpers.to_list(v))
        network_name_opts = ",".join(filter(lambda x: x is not None, set(_network)))
        network_name_opts = None if len(network_name_opts) == 0 else network_name_opts
    else:
        raise TypeError(f"network_name parameter must be of type None, list, or dict. Given is {type(network_name)}.")
    return network_name_opts
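
# Illustrative example (hypothetical mapping as produced by split_network_and_origin; note the dict branch
# deduplicates via a set, so the order of joined names is not guaranteed for multiple networks):
#
# >>> _create_network_name_opts({"o3": ["UBA"], "temp": None})
# 'UBA'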

def _select_distinct_series(vars: List[Dict], data_origin: Dict = None,
                            network_name: Union[str, List[str]] = None) -> Tuple[Dict, Dict]:
    """
    Select distinct series ids for all variables. Also check if a parameter is from REA or not.
    """
    data_origin = {} if data_origin is None else data_origin
    selected, data_origin = _select_distinct_data_origin(vars, data_origin)

    network_name = [] if network_name is None else network_name
    selected = _select_distinct_network(selected, network_name)

    # extract id
    selected = {k: v["id"] for k, v in selected.items()}
    return selected, data_origin

def _select_distinct_network(vars: dict, network_name: Union[list, dict]) -> dict:
    """
    Select distinct series regarding network name. The order the network names are provided in parameter
    `network_name` indicates priority (from high to low). If no network name is provided, the first entry is used and
    a logging info is issued. In case network names are given but no match can be found, this method raises a
    ValueError.

    :param vars: dictionary with all series candidates already grouped by variable name as key. Value should be a list
        of possible candidates to select from. Each candidate must be a dictionary with at least the keys `id` and
        `network_name`.
    :param network_name: list of networks to use with decreasing priority (1st element has highest priority). Can be
        an empty list indicating to always use the first candidate for each variable.
    :return: dictionary with a single series reference for each variable
    """
    if isinstance(network_name, (list, str)):
        network_name = {var: helpers.to_list(network_name) for var in vars.keys()}
    selected = {}
    for var, series in vars.items():
        res = []
        network_list = helpers.to_list(network_name.get(var, []) or [])
        for network in network_list:
            res.extend(list(filter(lambda x: x["network_name"].upper() == network.upper(), series)))
        if len(res) > 0:  # use first match which has the highest priority
            selected[var] = res[0]
        else:
            if len(network_list) == 0:  # just print message which network is used if none is provided
                selected[var] = series[0]
                logging.info(f"Could not find a valid match for variable {var} and networks "
                             f"{network_name.get(var, [])}! Therefore, use first answer from JOIN: {series[0]}")
            else:  # raise error if network name is provided but no match could be found
                raise ValueError(f"Cannot find a valid match for requested networks {network_name.get(var, [])} and "
                                 f"variable {var} as only following networks are available in JOIN: "
                                 f"{list(map(lambda x: x['network_name'], series))}")
    return selected
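
# Illustrative example (hypothetical candidates): "AIRBASE" outranks "UBA" here because it is listed first.
#
# >>> _select_distinct_network({"o3": [{"id": 1, "network_name": "UBA"}, {"id": 2, "network_name": "AIRBASE"}]},
# ...                          ["AIRBASE", "UBA"])
# {'o3': {'id': 2, 'network_name': 'AIRBASE'}}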

def _select_distinct_data_origin(vars: List[Dict], data_origin: Dict) -> Tuple[Dict[str, List], Dict]:
    """
    Select distinct series regarding their data origin. Series are grouped as a list according to their variable's
    name. As series can be reported with different network attributions, results might contain multiple entries for a
    variable. This method assumes the default data origin for chemical variables as `` (empty source) and for
    meteorological variables as `REA`.

    :param vars: list of all entries to check data origin for
    :param data_origin: data origin to match series with, if empty default values are used
    :return: dictionary with unique variable names as keys and list of respective series as values
    """
    data_origin_default = {"cloudcover": "REA", "humidity": "REA", "pblheight": "REA", "press": "REA", "relhum": "REA",
                           "temp": "REA", "totprecip": "REA", "u": "REA", "v": "REA",
                           "no": "", "no2": "", "o3": "", "pm10": "", "so2": ""}
    selected = {}
    for var in vars:
        name = var["parameter_name"].lower()
        var_attr = var["parameter_attribute"].lower()
        if name not in data_origin.keys():
            data_origin.update({name: data_origin_default.get(name, "")})
        attr = data_origin.get(name, "").lower()
        if var_attr == attr:
            selected[name] = selected.get(name, []) + helpers.to_list(var)
    return selected, data_origin
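
# Illustrative example (hypothetical series entries; temp falls back to its default origin "REA"):
#
# >>> _select_distinct_data_origin([{"parameter_name": "o3", "parameter_attribute": "", "id": 1},
# ...                               {"parameter_name": "temp", "parameter_attribute": "REA", "id": 2}], {})
# ({'o3': [{'parameter_name': 'o3', 'parameter_attribute': '', 'id': 1}],
#   'temp': [{'parameter_name': 'temp', 'parameter_attribute': 'REA', 'id': 2}]},
#  {'o3': '', 'temp': 'REA'})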

def _save_to_pandas(df: Union[pd.DataFrame, None], data: dict, stat: str, var: str) -> pd.DataFrame:
    """
    Save given data in data frame.

    If the given data frame is not empty, the data is appended as a new column.

    :param df: data frame to append the new data to, can be None
    :param data: new data to append or format as data frame containing the keys 'datetime' and '<stat>'
    :param stat: extracted statistic to get values from data (e.g. 'mean', 'dma8eu')
    :param var: variable the data is from (e.g. 'o3')

    :return: newly created or concatenated data frame
    """
    if len(data["datetime"][0]) == 19:
        str_format = "%Y-%m-%d %H:%M:%S"
    else:
        str_format = "%Y-%m-%d %H:%M"
    index = map(lambda s: dt.datetime.strptime(s, str_format), data['datetime'])
    if df is None:
        df = pd.DataFrame(data[stat], index=index, columns=[var])
    else:
        df = pd.concat([df, pd.DataFrame(data[stat], index=index, columns=[var])], axis=1)
    return df
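
# Illustrative example (assumed daily payload; a timestamp without seconds selects the "%Y-%m-%d %H:%M" format and
# the result is a single-column data frame indexed by datetime, roughly):
#
# >>> _save_to_pandas(None, {"datetime": ["2020-01-01 00:00"], "mean": [42.0]}, "mean", "o3")
#                        o3
# 2020-01-01 00:00:00  42.0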

def _lower_list(args: List[str]) -> Iterator[str]:
    """
    Lower all elements of given list.

    :param args: list with string entries to lower

    :return: iterator that lowers all list entries
    """
    for string in args:
        yield string.lower()

if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    var_all_dic = {'o3': 'dma8eu', 'relhum': 'average_values', 'temp': 'maximum', 'u': 'average_values',
                   'v': 'average_values', 'no': 'dma8eu', 'no2': 'dma8eu', 'cloudcover': 'average_values',
                   'pblheight': 'maximum'}
    station = 'DEBW107'
    # download_join(station, var_all_dic, sampling="daily")
    download_join(station, var_all_dic, sampling="hourly")