Coverage for mlair/helpers/data_sources/toar_data_v2.py: 78%
158 statements
« prev ^ index » next coverage.py v6.4.2, created at 2023-06-30 10:22 +0000
1"""Functions to access https://toar-data.fz-juelich.de/api/v2/"""
2__author__ = 'Lukas Leufen'
3__date__ = '2022-06-30'
6import logging
7from typing import Union, List, Dict
8from io import StringIO
10import pandas as pd
11import pytz
12from timezonefinder import TimezoneFinder
13from io import BytesIO
14import zipfile
16from mlair.configuration.toar_data_v2_settings import toar_data_v2_settings
17from mlair.helpers import to_list
18from mlair.helpers.data_sources.data_loader import EmptyQueryResult, get_data, correct_stat_name, get_data_with_query
# Type alias: a value that is either a string or None.
str_or_none = Union[str, None]
def download_toar(station_name: Union[str, List[str]], stat_var: dict,
                  sampling: str = "daily", data_origin: Dict = None):
    """
    Download data from https://toar-data.fz-juelich.de/api/v2/

    Uses station name to indicate measurement site and keys of stat_var to indicate variable name. If data origin is
    given, this method tries to load time series for this origin. In case no origin is provided, this method loads data
    with the highest priority according to toar-data's order parameter.

    :param station_name: station code or list of codes; note that only the first entry is used for the meta query
    :param stat_var: mapping from variable name to the statistics name that should be downloaded
    :param sampling: temporal resolution of the data, e.g. "daily" or "hourly"
    :param data_origin: optional mapping from variable name to preferred data origin(s)
    :return: tuple of (time-indexed data frame, meta data frame with the station code as column)
    """
    # make sure station_name parameter is a list
    station_name = to_list(station_name)

    # also ensure that given data_origin dict is no reference; an empty dict is treated like None
    if data_origin is None or len(data_origin) == 0:
        data_origin = None
    else:
        data_origin = {k: v for (k, v) in data_origin.items()}

    # get data connection settings for meta
    meta_url_base, headers = toar_data_v2_settings("meta")

    # load variables
    var_meta = load_variables_information(stat_var, meta_url_base, headers)

    # load station meta
    station_meta = load_station_information(station_name, meta_url_base, headers)

    # load series information
    timeseries_meta = load_timeseries_information(station_meta, var_meta, meta_url_base, headers, data_origin)

    logging.info(f"load data for {station_meta['codes'][0]} from TOAR-DATA")
    # get data connection settings for data
    data_url_base, headers = toar_data_v2_settings(sampling)

    # download each variable's time series (possibly from several series) and merge them
    data_dict = {}
    for var, meta in timeseries_meta.items():
        logging.debug(f"load {var}")
        meta_and_opts = prepare_meta(meta, sampling, stat_var, var)
        data_var = []
        for var_meta, opts in meta_and_opts:
            data_var.extend(load_timeseries_data(var_meta, data_url_base, opts, headers, sampling))
        data_dict[var] = merge_data(*data_var, sampling=sampling)
    data = pd.DataFrame.from_dict(data_dict)
    data = correct_timezone(data, station_meta, sampling)

    # combine station meta with the first (highest priority) series meta per variable
    meta = combine_meta_data(station_meta, {k: v[0] for k, v in timeseries_meta.items()})
    meta = pd.DataFrame.from_dict(meta, orient='index')
    meta.columns = station_name
    return data, meta
def merge_data(*args, sampling="hourly"):
    """Combine several time series into a single one on a gap-free time axis.

    A regular index with the frequency implied by ``sampling`` spans the union
    of all series' time ranges. The first series has priority; every following
    series only fills positions that are still missing (no overwrite).
    """
    first, *others = args
    start = min(series.index.min() for series in args)
    end = max(series.index.max() for series in args)
    step = {"hourly": "1H", "daily": "1d"}.get(sampling)
    complete_index = pd.date_range(start, end, freq=step)
    merged = first.reindex(complete_index)
    if not isinstance(merged, pd.DataFrame):
        merged = merged.to_frame()
    for series in others:
        merged.update(series, overwrite=False)
    return merged.squeeze()
def correct_timezone(data, meta, sampling):
    """
    Extract timezone information and convert data index to this timezone.

    Uses UTC if no information is provided. Note that this method only modifies data with sampling='hourly'. In all
    other cases, it returns just the given data without any change. This method expects date index of data to be in
    UTC. Timezone information is not added to the index to get rid of daylight saving time and ambiguous timestamps.
    """
    if sampling != "hourly":
        return data
    tz_info = meta.get("timezone", "UTC")
    try:
        tz = pytz.timezone(tz_info)
    except pytz.exceptions.UnknownTimeZoneError:
        # fall back to a coordinate-based lookup when the stored name is unknown
        lon, lat = meta["coordinates"]["lng"], meta["coordinates"]["lat"]
        tz = pytz.timezone(TimezoneFinder().timezone_at(lng=lon, lat=lat))
    naive_index = data.index.tz_localize(None)
    # use the standard-time offset only (subtract DST) to avoid ambiguous timestamps
    utc_offset = tz.utcoffset(naive_index[0]) - tz.dst(naive_index[0])
    data.index = naive_index + utc_offset
    return data
def prepare_meta(meta, sampling, stat_var, var):
    """Split series meta data into ([meta], request options) pairs, one per entry.

    For daily sampling the series id is moved out of the meta dict into the
    request options (the analysis service expects it as a query parameter), and
    the requested statistics plus the sampling are added to the options as well.
    For other samplings the options stay empty.
    """
    prepared = []
    for entry in meta:
        request_opts = {}
        if sampling == "daily":
            request_opts["id"] = entry.pop("id")
            entry["id"] = None
            request_opts["statistics"] = stat_var[var]
            request_opts["sampling"] = sampling
        prepared.append(([entry], request_opts))
    return prepared
def combine_meta_data(station_meta, timeseries_meta):
    """Flatten station meta and per-variable series meta into a single dict.

    Station entries: "codes" keeps only the first code, nested coordinate and
    metadata dicts are expanded to the top level (renaming "lng" to "lon"), and
    list-like bookkeeping keys are dropped. Series entries are prefixed with the
    variable name; the first role's organisation and the variable description
    are expanded, while nested bookkeeping keys are skipped.
    """
    flat = {}
    station_skip = ("changelog", "roles", "annotations", "aux_images", "aux_docs", "aux_urls")
    station_expand = ("coordinates", "additional_metadata", "globalmeta")
    for key, value in station_meta.items():
        if key == "codes":
            flat[key] = value[0]
        elif key in station_expand:
            for inner_key, inner_value in value.items():
                flat["lon" if inner_key == "lng" else inner_key] = inner_value
        elif key not in station_skip:
            flat[key] = value
    series_skip = ("additional_metadata", "station", "programme", "annotations", "changelog")
    for var, var_meta in timeseries_meta.items():
        for key, value in var_meta.items():
            if key in series_skip:
                continue
            if key == "roles":
                organisation = value[0]["contact"]["organisation"]
                for inner_key, inner_value in organisation.items():
                    flat[f"{var}_organisation_{inner_key}"] = inner_value
            elif key == "variable":
                for inner_key, inner_value in value.items():
                    flat[f"{var}_{inner_key}"] = inner_value
            else:
                flat[f"{var}_{key}"] = value
    return flat
def load_timeseries_data(timeseries_meta, url_base, opts, headers, sampling):
    """Download the actual time series values for each series meta entry.

    For hourly sampling the csv endpoint is queried directly; otherwise the
    (zipped) analysis service is used via a query request. Series that return
    an empty result are skipped.

    :param timeseries_meta: list of series meta dicts (each must provide "id" and "variable")
    :param url_base: base url of the data service
    :param opts: additional request options (e.g. "statistics", "sampling", "id")
    :param headers: request headers (e.g. authorisation)
    :param sampling: temporal resolution, "hourly" uses csv, everything else the query endpoint
    :return: list of pandas Series, one per non-empty series, named after the variable
    """
    coll = []
    for meta in timeseries_meta:
        series_id = meta["id"]
        # build a fresh request dict per series; the old code rebound ``opts`` each
        # iteration, so a second series inherited the previous iteration's mutated
        # service/format/param_id values instead of its own
        req_opts = {"base": url_base, "service": "data/timeseries", "param_id": series_id, "format": "csv", **opts}
        if sampling == "hourly":
            res = get_data(req_opts, headers, as_json=False)
            data = extract_timeseries_data(res, "string")
        else:
            # the query endpoint builds its own path, so service/format must be unset
            req_opts["service"] = None
            req_opts["format"] = None
            res = get_data_with_query(req_opts, headers, as_json=False)
            data = extract_timeseries_data(res, "bytes")
        if len(data.index) > 0:
            data = data[correct_stat_name(req_opts.get("statistics", "value"))].rename(meta["variable"]["name"])
            coll.append(data)
    return coll
def extract_timeseries_data(result, result_format):
    """Parse a TOAR-DATA time series response into a DataFrame.

    :param result: raw response payload, either a csv string or zipped csv bytes
    :param result_format: "string" for plain csv text, "bytes" for a zip archive
        containing a single csv file
    :return: DataFrame indexed by the parsed "datetime" column
    :raises ValueError: if result_format is neither "string" nor "bytes"
    """
    # note: the deprecated infer_datetime_format kwarg was dropped; since pandas 2.0
    # it is a no-op (consistent format inference is the default) and only warns
    if result_format == "string":
        return pd.read_csv(StringIO(result), comment="#", index_col="datetime", parse_dates=True)
    elif result_format == "bytes":
        with zipfile.ZipFile(BytesIO(result)) as file:
            # the archive is expected to contain exactly one csv member
            return pd.read_csv(BytesIO(file.read(file.filelist[0].filename)), comment="#", index_col="datetime",
                               parse_dates=True)
    else:
        raise ValueError(f"Unknown result format given: {result_format}")
def load_station_information(station_name: List[str], url_base: str, headers: Dict):
    """Query station meta data from the stationmeta endpoint.

    Only the first entry of ``station_name`` is used as path parameter; the
    remaining entries are ignored by this query.

    :param station_name: list of station codes
    :param url_base: base url of the meta data service
    :param headers: request headers (e.g. authorisation)
    :return: the service's meta data response for the first station
    """
    # removed the spurious f-string prefix on the constant service name
    opts = {"base": url_base, "service": "stationmeta", "param_id": station_name[0]}
    return get_data(opts, headers)
def load_timeseries_information(station_meta, var_meta, url_base: str, headers: Dict,
                                data_origin: Dict = None) -> Dict:
    """Search all available time series for the given station and variables.

    For each variable, candidate series are selected either by the requested
    data origin (falling back to the service's order if no origin matches) or
    directly by the service's order parameter.

    :param station_meta: station meta data providing "id" and "codes"
    :param var_meta: mapping from variable name to variable meta (providing "id")
    :param url_base: base url of the meta data service
    :param headers: request headers (e.g. authorisation)
    :param data_origin: optional mapping from variable name to preferred origin(s)
    :return: mapping from variable name to a list of series meta dicts
    :raises EmptyQueryResult: if no time series exists for at least one variable
    """
    timeseries_id_dict = {}
    missing = []
    for var, meta in var_meta.items():
        timeseries_id_dict[var] = []
        opts = {"base": url_base, "service": "search", "station_id": station_meta["id"], "variable_id": meta["id"]}
        res = get_data(opts, headers)
        if len(res) == 0:
            # collect all missing variables first so the error can report them together
            missing.append((var, meta))
        if data_origin is not None:
            var_origin = data_origin[var]
            timeseries_id_dict[var] = select_timeseries_by_origin(res, var_origin)
        if data_origin is None or len(timeseries_id_dict[var]) == 0:
            # fall back to the service's priority order
            timeseries_id_dict[var] = select_timeseries_by_order(res)
    if len(missing) > 0:
        missing = ",".join([f"{m[0]} ({m[1]['id']})" for m in missing])
        raise EmptyQueryResult(f"Cannot find any timeseries for station id {station_meta['id']} "
                               f"({station_meta['codes'][0]}) and variables {missing}.")
    return timeseries_id_dict
def select_timeseries_by_order(toar_meta):
    """Sort series meta by their "order" value, ascending.

    If several entries share the same order, only the last one survives
    (dict semantics), matching the service's priority interpretation.
    """
    by_order = {}
    for entry in toar_meta:
        by_order[entry["order"]] = entry
    return [by_order[order] for order in sorted(by_order)]
def select_timeseries_by_origin(toar_meta, var_origin):
    """Filter series meta by the requested data origin(s).

    Measured data ("instrument" origin) is matched against the name of any
    contributing organisation; model or reanalysis data is matched against the
    origin string itself. For non-instrument matches only the first hit per
    requested origin is kept, while every matching instrument series is kept.
    """
    selected = []
    for origin in to_list(var_origin):
        wanted = origin.lower()
        for meta in toar_meta:
            if meta["data_origin"] == "instrument":
                organisations = (role["contact"]["organisation"]["name"] for role in meta["roles"])
                if any(name.lower() == wanted for name in organisations):
                    selected.append(meta)
            elif meta["data_origin"].lower() == wanted:
                selected.append(meta)
                break
    return selected
def load_variables_information(var_dict, url_base, headers):
    """Query variable meta data from the variables endpoint for each requested variable.

    :param var_dict: mapping whose keys are the variable names to look up
    :param url_base: base url of the meta data service
    :param headers: request headers (e.g. authorisation)
    :return: mapping from variable name to the service's meta data response
    """
    var_meta_dict = {}
    for var in var_dict.keys():
        # the variable name itself is the path parameter; removed the spurious
        # f-string prefix on the constant service name
        opts = {"base": url_base, "service": "variables", "param_id": var}
        var_meta_dict[var] = get_data(opts, headers)
    return var_meta_dict