Coverage for mlair/helpers/data_sources/toar_data_v2.py: 69% (148 statements)
1"""Functions to access https://toar-data.fz-juelich.de/api/v2/"""
2__author__ = 'Lukas Leufen'
3__date__ = '2022-06-30'
6import logging
7from typing import Union, List, Dict
8from io import StringIO
10import pandas as pd
11import pytz
12from timezonefinder import TimezoneFinder
14from mlair.configuration.toar_data_v2_settings import toar_data_v2_settings
15from mlair.helpers import to_list
16from mlair.helpers.data_sources.data_loader import EmptyQueryResult, get_data, correct_stat_name
18str_or_none = Union[str, None]

def download_toar(station_name: Union[str, List[str]], stat_var: dict,
                  sampling: str = "daily", data_origin: Dict = None):
23 """
24 Download data from https://toar-data.fz-juelich.de/api/v2/
26 Uses station name to indicate measurement site and keys of stat_var to indicate variable name. If data origin is
27 given, this method tries to load time series for this origin. In case no origin is provided, this method loads data
28 with the highest priority according to toar-data's order parameter.
30 :param station_name:
31 :param stat_var:
32 :param sampling:
33 :param data_origin:
34 :return:
35 """
    # make sure station_name parameter is a list
    station_name = to_list(station_name)

    # also ensure that given data_origin dict is no reference
    if data_origin is None or len(data_origin) == 0:
        data_origin = None
    else:
        data_origin = {k: v for (k, v) in data_origin.items()}

    # get data connection settings for meta
    meta_url_base, headers = toar_data_v2_settings("meta")

    # load variables
    var_meta = load_variables_information(stat_var, meta_url_base, headers)

    # load station meta
    station_meta = load_station_information(station_name, meta_url_base, headers)

    # load series information
    timeseries_meta = load_timeseries_information(station_meta, var_meta, meta_url_base, headers, data_origin)

    # # correct stat_var values if data is not aggregated (hourly)
    # if sampling == "hourly":
    #     stat_var = {key: "values" for key in stat_var.keys()}

    logging.info(f"load data for {station_meta['codes'][0]} from TOAR-DATA")
    # get data connection settings for data
    data_url_base, headers = toar_data_v2_settings(sampling)

    data_dict = {}
    for var, meta in timeseries_meta.items():
        logging.debug(f"load {var}")
        meta_and_opts = prepare_meta(meta, sampling, stat_var, var)
        data_var = []
        for var_meta, opts in meta_and_opts:
            data_var.extend(load_timeseries_data(var_meta, data_url_base, opts, headers, sampling))
        data_dict[var] = merge_data(*data_var, sampling=sampling)
    data = pd.DataFrame.from_dict(data_dict)
    data = correct_timezone(data, station_meta, sampling)

    meta = combine_meta_data(station_meta, {k: v[0] for k, v in timeseries_meta.items()})
    meta = pd.DataFrame.from_dict(meta, orient='index')
    meta.columns = station_name
    return data, meta
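
# Minimal usage sketch (defined only, never executed on import): shows how download_toar might be called. The
# station code "DENW067", the variable/statistic mapping and the data origins below are illustrative assumptions
# and depend on what the TOAR-DATA v2 API actually provides.
def _example_download_toar():
    stat_var = {"o3": "dma8eu", "temp": "maximum"}  # hypothetical variable -> statistic mapping
    data_origin = {"o3": "ORG", "temp": "REA"}  # hypothetical organisation name / data origin per variable
    data, meta = download_toar("DENW067", stat_var, sampling="daily", data_origin=data_origin)
    return data, meta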

def merge_data(*args, sampling="hourly"):
    """Merge multiple time series into a single series on a common, regular time index.

    The first series has the highest priority; the remaining series only fill positions that are still missing
    (overwrite=False).
    """
    start_date = min(map(lambda x: x.index.min(), args))
    end_date = max(map(lambda x: x.index.max(), args))
    freq = {"hourly": "1H", "daily": "1d"}.get(sampling)
    full_time = pd.date_range(start_date, end_date, freq=freq)
    full_data = args[0].reindex(full_time)
    if not isinstance(full_data, pd.DataFrame):
        full_data = full_data.to_frame()
    for d in args[1:]:
        full_data.update(d, overwrite=False)
    return full_data.squeeze()
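
# Illustrative sketch of the merge behaviour with two hypothetical hourly series: values of the first series are
# kept, the second series only fills gaps on the combined time axis.
def _example_merge_data():
    idx1 = pd.to_datetime(["2020-01-01 00:00", "2020-01-01 01:00"])
    idx2 = pd.to_datetime(["2020-01-01 01:00", "2020-01-01 02:00"])
    s1 = pd.Series([10., 20.], index=idx1, name="o3")
    s2 = pd.Series([99., 30.], index=idx2, name="o3")
    merged = merge_data(s1, s2, sampling="hourly")
    # merged holds [10., 20., 30.]: the overlapping timestamp keeps the value of the higher-priority series s1
    return merged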

def correct_timezone(data, meta, sampling):
    """
    Extract timezone information and convert data index to this timezone.

    Uses UTC if no information is provided. Note that this method only modifies data with sampling='hourly'. In all
    other cases, it returns the given data without any change. This method expects the date index of data to be in
    UTC. Timezone information is not added to the index to get rid of daylight saving time and ambiguous timestamps.
    """
    if sampling == "hourly":
        tz_info = meta.get("timezone", "UTC")
        try:
            tz = pytz.timezone(tz_info)
        except pytz.exceptions.UnknownTimeZoneError as e:
            lon, lat = meta["coordinates"]["lng"], meta["coordinates"]["lat"]
            tz = pytz.timezone(TimezoneFinder().timezone_at(lng=lon, lat=lat))
        index = data.index
        index = index.tz_localize(None)
        utc_offset = tz.utcoffset(index[0]) - tz.dst(index[0])
        data.index = index + utc_offset
    return data
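
# Worked example of the offset logic above, assuming the station metadata reports the timezone "Europe/Berlin":
# subtracting the DST component from the UTC offset always yields the standard offset, so the UTC-based index is
# shifted by a constant amount regardless of the season.
def _example_standard_utc_offset():
    import datetime as dt
    tz = pytz.timezone("Europe/Berlin")
    summer = dt.datetime(2022, 7, 1)
    # utcoffset is +02:00 in July, dst is +01:00, so the applied shift is the standard offset of +01:00
    return tz.utcoffset(summer) - tz.dst(summer)  # datetime.timedelta(seconds=3600)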

def prepare_meta(meta, sampling, stat_var, var):
    out = []
    for m in meta:
        opts = {}
        if sampling == "daily":
            opts["timeseries_id"] = m.pop("id")
            m["id"] = None
        opts["names"] = stat_var[var]
        opts["sampling"] = sampling
        out.append(([m], opts))
    return out
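
# Illustrative sketch of prepare_meta for daily sampling with one hypothetical timeseries entry: the timeseries id
# is moved from the metadata into the request options, and the requested statistic name is attached.
def _example_prepare_meta():
    meta = [{"id": 12345, "data_origin": "instrument"}]  # hypothetical timeseries metadata
    out = prepare_meta(meta, "daily", {"o3": "dma8eu"}, "o3")
    # out == [([{"id": None, "data_origin": "instrument"}],
    #          {"timeseries_id": 12345, "names": "dma8eu", "sampling": "daily"})]
    return out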

def combine_meta_data(station_meta, timeseries_meta):
    meta = {}
    for k, v in station_meta.items():
        if k == "codes":
            meta[k] = v[0]
        elif k in ["coordinates", "additional_metadata", "globalmeta"]:
            for _key, _val in v.items():
                if _key == "lng":
                    meta["lon"] = _val
                else:
                    meta[_key] = _val
        elif k in ["changelog", "roles", "annotations", "aux_images", "aux_docs", "aux_urls"]:
            continue
        else:
            meta[k] = v
    for var, var_meta in timeseries_meta.items():
        for k, v in var_meta.items():
            if k in ["additional_metadata", "station", "programme", "annotations", "changelog"]:
                continue
            elif k == "roles":
                for _key, _val in v[0]["contact"]["organisation"].items():
                    new_k = f"{var}_organisation_{_key}"
                    meta[new_k] = _val
            elif k == "variable":
                for _key, _val in v.items():
                    new_k = f"{var}_{_key}"
                    meta[new_k] = _val
            else:
                new_k = f"{var}_{k}"
                meta[new_k] = v
    return meta
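
# Illustrative sketch of the flattening performed by combine_meta_data, using minimal, hypothetical station and
# timeseries metadata; real TOAR-DATA responses contain many more fields.
def _example_combine_meta_data():
    station_meta = {"codes": ["XYZ123"], "name": "example station",
                    "coordinates": {"lat": 50.9, "lng": 6.4, "alt": 80.0}, "changelog": []}
    timeseries_meta = {"o3": {"sampling_frequency": "hourly",
                              "variable": {"name": "o3", "units": "ppb"},
                              "roles": [{"contact": {"organisation": {"name": "ORG", "longname": "Example Org"}}}]}}
    out = combine_meta_data(station_meta, timeseries_meta)
    # out contains flat keys such as "codes", "lat", "lon", "alt", "o3_sampling_frequency", "o3_name", "o3_units",
    # "o3_organisation_name" and "o3_organisation_longname"
    return out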

def load_timeseries_data(timeseries_meta, url_base, opts, headers, sampling):
    coll = []
    for meta in timeseries_meta:
        series_id = meta["id"]
        # opts = {"base": url_base, "service": f"data/timeseries/{series_id}"}
        opts = {"base": url_base, "service": f"data/timeseries", "param_id": series_id, "format": "csv", **opts}
        if sampling != "hourly":
            opts["service"] = None
        res = get_data(opts, headers, as_json=False)
        data = pd.read_csv(StringIO(res), comment="#", index_col="datetime", parse_dates=True,
                           infer_datetime_format=True)
        if len(data.index) > 0:
            data = data[correct_stat_name(opts.get("names", "value"))].rename(meta["variable"]["name"])
        coll.append(data)
    return coll
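
# Sketch of how a CSV payload is turned into a pandas object above; the payload below is a fabricated, minimal
# stand-in for a real TOAR-DATA response, which starts with "#"-prefixed metadata lines followed by the data table.
def _example_parse_csv_payload():
    payload = ("# this comment block mimics the metadata header of the response\n"
               "datetime,mean\n"
               "2020-01-01,25.0\n"
               "2020-01-02,27.5\n")
    data = pd.read_csv(StringIO(payload), comment="#", index_col="datetime", parse_dates=True)
    return data["mean"]  # column selection analogous to correct_stat_name(opts.get("names", "value"))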

def load_station_information(station_name: List[str], url_base: str, headers: Dict):
    # opts = {"base": url_base, "service": f"stationmeta/{station_name[0]}"}
    opts = {"base": url_base, "service": f"stationmeta", "param_id": station_name[0]}
    return get_data(opts, headers)

def load_timeseries_information(station_meta, var_meta, url_base: str, headers: Dict,
                                data_origin: Dict = None) -> [Dict, Dict]:
    timeseries_id_dict = {}
    missing = []
    for var, meta in var_meta.items():
        timeseries_id_dict[var] = []
        opts = {"base": url_base, "service": "search", "station_id": station_meta["id"], "variable_id": meta["id"]}
        res = get_data(opts, headers)
        if len(res) == 0:
            missing.append((var, meta))
            # raise EmptyQueryResult(f"Cannot find any timeseries for station id {station_meta['id']} "
            #                        f"({station_meta['codes'][0]}) and variable id {meta['id']} ({var}).")
        if data_origin is not None:
            var_origin = data_origin[var]
            timeseries_id_dict[var] = select_timeseries_by_origin(res, var_origin)
            # if len(timeseries_id_dict[var]) == 0:
            #     raise EmptyQueryResult(f"Cannot find any timeseries for station id {station_meta['id']} "
            #                            f"({station_meta['codes'][0]}), variable id {meta['id']} ({var}) "
            #                            f"and timeseries origin {var_origin}.")
        if data_origin is None or len(timeseries_id_dict[var]) == 0:
            timeseries_id_dict[var] = select_timeseries_by_order(res)
    if len(missing) > 0:
        missing = ",".join([f"{m[0]} ({m[1]['id']})" for m in missing])
        raise EmptyQueryResult(f"Cannot find any timeseries for station id {station_meta['id']} "
                               f"({station_meta['codes'][0]}) and variables {missing}.")
    return timeseries_id_dict

def select_timeseries_by_order(toar_meta):
    order_dict = {meta["order"]: meta for meta in toar_meta}
    res = [order_dict[order] for order in sorted(order_dict.keys())]
    return res
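
# Tiny sketch of the order-based selection: the timeseries metadata are simply sorted by their "order" attribute,
# so the entry with the smallest order comes first; the entries below are hypothetical.
def _example_select_by_order():
    toar_meta = [{"order": 2, "id": 222}, {"order": 1, "id": 111}]
    return select_timeseries_by_order(toar_meta)  # [{"order": 1, "id": 111}, {"order": 2, "id": 222}]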

def select_timeseries_by_origin(toar_meta, var_origin):
    res = []
    for origin in to_list(var_origin):
        for meta in toar_meta:
            if meta["data_origin"] == "instrument":
                for roles in meta["roles"]:
                    if roles["contact"]["organisation"]["name"].lower() == origin.lower():
                        res.append(meta)
                        break
            elif meta["data_origin"].lower() == origin.lower():
                res.append(meta)
                break
    return res
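
# Illustrative selection by origin with two hypothetical metadata entries: instrument series are matched via the
# operating organisation's name, while non-instrument series are matched via their data_origin attribute.
def _example_select_by_origin():
    toar_meta = [{"data_origin": "instrument",
                  "roles": [{"contact": {"organisation": {"name": "ORG"}}}], "id": 1},
                 {"data_origin": "REA", "roles": [], "id": 2}]
    measured = select_timeseries_by_origin(toar_meta, "ORG")  # selects id 1 via the organisation name
    modelled = select_timeseries_by_origin(toar_meta, "REA")  # selects id 2 via the data_origin field
    return measured, modelled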

def load_variables_information(var_dict, url_base, headers):
    var_meta_dict = {}
    for var in var_dict.keys():
        opts = {"base": url_base, "service": f"variables", "param_id": var}
        var_meta_dict[var] = get_data(opts, headers)
    return var_meta_dict