Coverage for mlair/helpers/data_sources/toar_data_v2.py: 69%
144 statements
« prev ^ index » next coverage.py v6.4.2, created at 2022-12-02 15:24 +0000
1"""Functions to access https://toar-data.fz-juelich.de/api/v2/"""
2__author__ = 'Lukas Leufen'
3__date__ = '2022-06-30'
6import logging
7from typing import Union, List, Dict
8from io import StringIO
10import pandas as pd
11import pytz
12from timezonefinder import TimezoneFinder
14from mlair.configuration.toar_data_v2_settings import toar_data_v2_settings
15from mlair.helpers import to_list
16from mlair.helpers.data_sources.toar_data import EmptyQueryResult, get_data, correct_stat_name
# Type alias: a value that is either a str or None.
str_or_none = Union[str, None]
def download_toar(station_name: Union[str, List[str]], stat_var: dict,
                  sampling: str = "daily", data_origin: Dict = None):
    """
    Download data from https://toar-data.fz-juelich.de/api/v2/

    Uses station name to indicate measurement site and keys of stat_var to indicate variable name. If data origin is
    given, this method tries to load time series for this origin. In case no origin is provided, this method loads data
    with the highest priority according to toar-data's order parameter.

    :param station_name: station code(s); a single string is wrapped into a list, only the first entry is queried
    :param stat_var: mapping of variable name to the statistic to load (e.g. {"o3": "mean"})
    :param sampling: temporal resolution, "daily" or "hourly"
    :param data_origin: optional mapping of variable name to preferred data origin(s)
    :return: tuple of (data, meta) where data is a DataFrame indexed by datetime with one column per variable and
        meta is a DataFrame of combined station/timeseries metadata with the station name as column
    """
    # make sure station_name parameter is a list
    station_name = to_list(station_name)

    # copy the given data_origin dict so the caller's dict is never mutated; treat an empty dict as "not given"
    if data_origin is None or len(data_origin) == 0:
        data_origin = None
    else:
        data_origin = dict(data_origin)

    # get data connection settings for meta
    meta_url_base, headers = toar_data_v2_settings("meta")

    # load variables
    var_meta = load_variables_information(stat_var, meta_url_base, headers)

    # load station meta
    station_meta = load_station_information(station_name, meta_url_base, headers)

    # load series information
    timeseries_meta = load_timeseries_information(station_meta, var_meta, meta_url_base, headers, data_origin)

    logging.info(f"load data for {station_meta['codes'][0]} from TOAR-DATA")
    # get data connection settings for data
    data_url_base, headers = toar_data_v2_settings(sampling)

    data_dict = {}
    for var, meta in timeseries_meta.items():
        logging.debug(f"load {var}")
        meta_and_opts = prepare_meta(meta, sampling, stat_var, var)
        data_var = []
        # renamed loop variable (was var_meta) so it no longer shadows the variables-meta dict loaded above
        for series_meta, opts in meta_and_opts:
            data_var.extend(load_timeseries_data(series_meta, data_url_base, opts, headers, sampling))
        data_dict[var] = merge_data(*data_var, sampling=sampling)
    data = pd.DataFrame.from_dict(data_dict)
    data = correct_timezone(data, station_meta, sampling)

    meta = combine_meta_data(station_meta, {k: v[0] for k, v in timeseries_meta.items()})
    meta = pd.DataFrame.from_dict(meta, orient='index')
    meta.columns = station_name
    return data, meta
def merge_data(*args, sampling="hourly"):
    """Combine several series over their joint date range; earlier arguments win on overlapping timestamps."""
    freq = {"hourly": "1H", "daily": "1d"}.get(sampling)
    first = min(series.index.min() for series in args)
    last = max(series.index.max() for series in args)
    joint_index = pd.date_range(first, last, freq=freq)
    merged = args[0].reindex(joint_index)
    if not isinstance(merged, pd.DataFrame):
        merged = merged.to_frame()
    # overwrite=False: only fill gaps, never replace values already present
    for other in args[1:]:
        merged.update(other, overwrite=False)
    return merged.squeeze()
def correct_timezone(data, meta, sampling):
    """
    Extract timezone information and convert data index to this timezone.

    Uses UTC if no information is provided. Note that this method only modifies data with sampling='hourly'; in all
    other cases it returns the given data without any change. This method expects the date index of data to be in
    UTC. Timezone information is not added to the index to get rid of daylight saving time and ambiguous timestamps.
    """
    if sampling != "hourly":
        return data
    try:
        tz = pytz.timezone(meta.get("timezone", "UTC"))
    except pytz.exceptions.UnknownTimeZoneError:
        # fall back to looking up the timezone from the station coordinates
        coords = meta["coordinates"]
        tz = pytz.timezone(TimezoneFinder().timezone_at(lng=coords["lng"], lat=coords["lat"]))
    naive_index = data.index.tz_localize(None)
    # standard-time offset: subtract the DST component so the shift is constant year-round
    offset = tz.utcoffset(naive_index[0]) - tz.dst(naive_index[0])
    data.index = naive_index + offset
    return data
def prepare_meta(meta, sampling, stat_var, var):
    """
    Build a list of ([meta], opts) pairs for a variable's timeseries.

    For daily sampling, the timeseries id is moved from the meta entry into opts (each entry's "id" key is set to
    None, i.e. the given meta dicts are mutated in place).
    """
    prepared = []
    for entry in meta:
        options = {"names": stat_var[var], "sampling": sampling}
        if sampling == "daily":
            options["timeseries_id"] = entry.pop("id")
            entry["id"] = None
        prepared.append(([entry], options))
    return prepared
def combine_meta_data(station_meta, timeseries_meta):
    """
    Flatten station and per-variable timeseries metadata into a single one-level dict.

    From station metadata: "codes" is reduced to its first entry, the nested "coordinates" /
    "additional_metadata" / "globalmeta" dicts are flattened ("lng" is renamed to "lon"), and bookkeeping keys are
    dropped. Timeseries keys are prefixed with the variable name; the organisation of the first role is flattened
    as "<var>_organisation_<key>" and the nested "variable" dict as "<var>_<key>".
    """
    station_skip = ("changelog", "roles", "annotations", "aux_images", "aux_docs", "aux_urls")
    series_skip = ("additional_metadata", "station", "programme", "annotations", "changelog")
    combined = {}
    for key, value in station_meta.items():
        if key == "codes":
            combined[key] = value[0]
        elif key in ("coordinates", "additional_metadata", "globalmeta"):
            for inner_key, inner_val in value.items():
                combined["lon" if inner_key == "lng" else inner_key] = inner_val
        elif key not in station_skip:
            combined[key] = value
    for var, var_meta in timeseries_meta.items():
        for key, value in var_meta.items():
            if key in series_skip:
                continue
            if key == "roles":
                for inner_key, inner_val in value[0]["contact"]["organisation"].items():
                    combined[f"{var}_organisation_{inner_key}"] = inner_val
            elif key == "variable":
                for inner_key, inner_val in value.items():
                    combined[f"{var}_{inner_key}"] = inner_val
            else:
                combined[f"{var}_{key}"] = value
    return combined
def load_timeseries_data(timeseries_meta, url_base, opts, headers, sampling):
    """
    Load timeseries data for each series listed in timeseries_meta.

    :param timeseries_meta: list of timeseries meta dicts (each must provide "id" and "variable"->"name")
    :param url_base: base url of the data service
    :param opts: additional request options (e.g. "names", "sampling", "timeseries_id")
    :param headers: request headers
    :param sampling: temporal resolution; for non-hourly sampling the "service" entry is cleared
    :return: list of pandas objects, one per timeseries (column selected/renamed only for non-empty results)
    """
    coll = []
    for meta in timeseries_meta:
        series_id = meta["id"]
        # BUGFIX: build a fresh per-series dict instead of rebinding the opts parameter. The old code rebound
        # opts with `**opts` spread last, so from the second loop iteration on, the previous series' param_id
        # and service silently overwrote the fresh values.
        req_opts = {"base": url_base, "service": "data/timeseries", "param_id": series_id, "format": "csv", **opts}
        if sampling != "hourly":
            req_opts["service"] = None
        res = get_data(req_opts, headers, as_json=False)
        data = pd.read_csv(StringIO(res), comment="#", index_col="datetime", parse_dates=True,
                           infer_datetime_format=True)
        if len(data.index) > 0:
            data = data[correct_stat_name(req_opts.get("names", "value"))].rename(meta["variable"]["name"])
        coll.append(data)
    return coll
def load_station_information(station_name: List[str], url_base: str, headers: Dict):
    """Query station metadata from the /stationmeta endpoint for the first station code in station_name."""
    request_opts = {"base": url_base, "service": "stationmeta", "param_id": station_name[0]}
    return get_data(request_opts, headers)
def load_timeseries_information(station_meta, var_meta, url_base: str, headers: Dict,
                                data_origin: Dict = None) -> Dict:
    """
    Collect timeseries metadata for every variable of a station.

    For each variable, query the search endpoint for matching timeseries. If data_origin is given, prefer series
    from that origin; fall back to toar-data's order parameter when no origin is given or no series matches the
    origin. (Return annotation fixed: the function returns a single dict, not ``[Dict, Dict]``.)

    :param station_meta: station metadata providing "id" and "codes"
    :param var_meta: mapping of variable name to variable metadata (providing "id")
    :param url_base: base url of the meta service
    :param headers: request headers
    :param data_origin: optional mapping of variable name to preferred origin(s)
    :return: mapping of variable name to list of selected timeseries meta dicts
    :raises EmptyQueryResult: if no timeseries exists for at least one requested variable
    """
    timeseries_id_dict = {}
    missing = []
    for var, meta in var_meta.items():
        timeseries_id_dict[var] = []
        opts = {"base": url_base, "service": "search", "station_id": station_meta["id"], "variable_id": meta["id"]}
        res = get_data(opts, headers)
        if len(res) == 0:
            # collect all missing variables first so a single error can report them together
            missing.append((var, meta))
        if data_origin is not None:
            var_origin = data_origin[var]
            timeseries_id_dict[var] = select_timeseries_by_origin(res, var_origin)
        if data_origin is None or len(timeseries_id_dict[var]) == 0:
            timeseries_id_dict[var] = select_timeseries_by_order(res)
    if len(missing) > 0:
        missing = ",".join([f"{m[0]} ({m[1]['id']})" for m in missing])
        raise EmptyQueryResult(f"Cannot find any timeseries for station id {station_meta['id']} "
                               f"({station_meta['codes'][0]}) and variables {missing}.")
    return timeseries_id_dict
def select_timeseries_by_order(toar_meta):
    """Return timeseries metadata sorted by "order", keeping only the last entry per order value."""
    by_order = {}
    for entry in toar_meta:
        by_order[entry["order"]] = entry
    return [by_order[key] for key in sorted(by_order)]
def select_timeseries_by_origin(toar_meta, var_origin):
    """Return all timeseries whose contributing organisation matches one of the requested origins (case-insensitive)."""
    matches = []
    for origin in to_list(var_origin):
        wanted = origin.lower()
        for meta in toar_meta:
            for role in meta["roles"]:
                if role["contact"]["organisation"]["name"].lower() == wanted:
                    matches.append(meta)
                    break  # one matching role is enough for this meta entry
    return matches
def load_variables_information(var_dict, url_base, headers):
    """Fetch variable metadata from the /variables endpoint for every variable key of var_dict."""
    return {var: get_data({"base": url_base, "service": "variables", "param_id": var}, headers)
            for var in var_dict}