Coverage for mlair/helpers/data_sources/toar_data_v2.py: 69%

148 statements  

coverage.py v6.4.2, created at 2023-06-01 13:03 +0000

1"""Functions to access https://toar-data.fz-juelich.de/api/v2/""" 

2__author__ = 'Lukas Leufen' 

3__date__ = '2022-06-30' 

4 

5 

6import logging 

7from typing import Union, List, Dict 

8from io import StringIO 

9 

10import pandas as pd 

11import pytz 

12from timezonefinder import TimezoneFinder 

13 

14from mlair.configuration.toar_data_v2_settings import toar_data_v2_settings 

15from mlair.helpers import to_list 

16from mlair.helpers.data_sources.data_loader import EmptyQueryResult, get_data, correct_stat_name 

17 

18str_or_none = Union[str, None] 



def download_toar(station_name: Union[str, List[str]], stat_var: dict,
                  sampling: str = "daily", data_origin: Dict = None):
    """
    Download data from https://toar-data.fz-juelich.de/api/v2/

    Uses the station name to indicate the measurement site and the keys of stat_var to indicate the variable names. If
    a data origin is given, this method tries to load time series for this origin. If no origin is provided, this
    method loads the data with the highest priority according to toar-data's order parameter.

    :param station_name: station code(s) of the measurement site(s)
    :param stat_var: mapping of variable names to the statistics to load
    :param sampling: temporal resolution, either "daily" (default) or "hourly"
    :param data_origin: optional mapping of variable names to preferred data origins
    :return: downloaded data and combined meta data
    """


    # make sure station_name parameter is a list
    station_name = to_list(station_name)

    # also ensure that given data_origin dict is no reference
    if data_origin is None or len(data_origin) == 0:  # coverage: condition was never true in the recorded run
        data_origin = None
    else:
        data_origin = {k: v for (k, v) in data_origin.items()}

    # get data connection settings for meta
    meta_url_base, headers = toar_data_v2_settings("meta")

    # load variables
    var_meta = load_variables_information(stat_var, meta_url_base, headers)

    # load station meta
    station_meta = load_station_information(station_name, meta_url_base, headers)

    # load series information
    timeseries_meta = load_timeseries_information(station_meta, var_meta, meta_url_base, headers, data_origin)

    # # correct stat_var values if data is not aggregated (hourly)
    # if sampling == "hourly":
    #     stat_var = {key: "values" for key in stat_var.keys()}

    logging.info(f"load data for {station_meta['codes'][0]} from TOAR-DATA")
    # get data connection settings for data
    data_url_base, headers = toar_data_v2_settings(sampling)

    data_dict = {}
    for var, meta in timeseries_meta.items():  # coverage: loop never ran to completion in the recorded run
        logging.debug(f"load {var}")
        meta_and_opts = prepare_meta(meta, sampling, stat_var, var)
        data_var = []
        for var_meta, opts in meta_and_opts:  # coverage: loop never ran to completion in the recorded run
            data_var.extend(load_timeseries_data(var_meta, data_url_base, opts, headers, sampling))
        data_dict[var] = merge_data(*data_var, sampling=sampling)
    data = pd.DataFrame.from_dict(data_dict)
    data = correct_timezone(data, station_meta, sampling)

    meta = combine_meta_data(station_meta, {k: v[0] for k, v in timeseries_meta.items()})
    meta = pd.DataFrame.from_dict(meta, orient='index')
    meta.columns = station_name
    return data, meta
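

# Editor's sketch (not part of the covered module): a minimal usage example for download_toar. The station
# code "DEBW107", the statistics mapping {"o3": "dma8eu"} and the origin "UBA" are assumptions chosen only
# to illustrate the calling convention; any valid TOAR-DATA station/variable combination works the same way.
def _example_download_toar():  # pragma: no cover
    data, meta = download_toar("DEBW107", {"o3": "dma8eu"}, sampling="daily",
                               data_origin={"o3": "UBA"})
    print(data.head())  # DataFrame indexed by datetime, one column per variable
    print(meta)         # combined station and time series meta data, station code as column name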



def merge_data(*args, sampling="hourly"):
    """Merge time series from different sources onto a complete, regular date index; values from earlier
    arguments take precedence, gaps are filled from later arguments."""
    start_date = min(map(lambda x: x.index.min(), args))
    end_date = max(map(lambda x: x.index.max(), args))
    freq = {"hourly": "1H", "daily": "1d"}.get(sampling)
    full_time = pd.date_range(start_date, end_date, freq=freq)
    full_data = args[0].reindex(full_time)
    if not isinstance(full_data, pd.DataFrame):
        full_data = full_data.to_frame()
    for d in args[1:]:
        full_data.update(d, overwrite=False)
    return full_data.squeeze()
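

# Editor's sketch (not part of the covered module): a self-contained illustration of merge_data with two
# partly overlapping hourly series; gaps in the first series are filled from the second, existing values win.
def _example_merge_data():  # pragma: no cover
    s1 = pd.Series([1.0, 2.0], index=pd.date_range("2022-01-01 00:00", periods=2, freq="1H"), name="o3")
    s2 = pd.Series([9.0, 3.0], index=pd.date_range("2022-01-01 01:00", periods=2, freq="1H"), name="o3")
    merged = merge_data(s1, s2, sampling="hourly")
    print(merged)  # 00:00 -> 1.0 and 01:00 -> 2.0 from s1, 02:00 -> 3.0 filled from s2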



def correct_timezone(data, meta, sampling):
    """
    Extract timezone information and convert data index to this timezone.

    Uses UTC if no information is provided. Note that this method only modifies data with sampling='hourly'. In all
    other cases, it returns the given data without any change. This method expects the date index of data to be in
    UTC. Timezone information is not added to the index in order to get rid of daylight saving time and ambiguous
    timestamps.
    """
    if sampling == "hourly":  # coverage: condition was never true in the recorded run
        tz_info = meta.get("timezone", "UTC")
        try:
            tz = pytz.timezone(tz_info)
        except pytz.exceptions.UnknownTimeZoneError:
            # fall back to a lookup by station coordinates if the timezone name is unknown
            lon, lat = meta["coordinates"]["lng"], meta["coordinates"]["lat"]
            tz = pytz.timezone(TimezoneFinder().timezone_at(lng=lon, lat=lat))
        index = data.index
        index = index.tz_localize(None)
        utc_offset = tz.utcoffset(index[0]) - tz.dst(index[0])
        data.index = index + utc_offset
    return data
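

# Editor's sketch (not part of the covered module): shows the index shift performed by correct_timezone for
# hourly data. The metadata below is an assumption placing the station in Europe/Berlin; the UTC index is
# shifted by the fixed standard-time offset (+1 h), deliberately ignoring daylight saving time.
def _example_correct_timezone():  # pragma: no cover
    data = pd.DataFrame({"o3": [1.0, 2.0]},
                        index=pd.date_range("2022-01-01 00:00", periods=2, freq="1H", tz="UTC"))
    meta = {"timezone": "Europe/Berlin", "coordinates": {"lng": 6.4, "lat": 50.9}}
    print(correct_timezone(data, meta, sampling="hourly").index)  # 2022-01-01 01:00 and 02:00, tz-naive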



def prepare_meta(meta, sampling, stat_var, var):
    out = []
    for m in meta:
        opts = {}
        if sampling == "daily":  # coverage: condition was never false in the recorded run
            opts["timeseries_id"] = m.pop("id")
            m["id"] = None
        opts["names"] = stat_var[var]
        opts["sampling"] = sampling
        out.append(([m], opts))
    return out



def combine_meta_data(station_meta, timeseries_meta):
    meta = {}
    for k, v in station_meta.items():
        if k == "codes":
            meta[k] = v[0]
        elif k in ["coordinates", "additional_metadata", "globalmeta"]:
            for _key, _val in v.items():
                if _key == "lng":
                    meta["lon"] = _val
                else:
                    meta[_key] = _val
        elif k in ["changelog", "roles", "annotations", "aux_images", "aux_docs", "aux_urls"]:
            continue
        else:
            meta[k] = v
    for var, var_meta in timeseries_meta.items():
        for k, v in var_meta.items():
            if k in ["additional_metadata", "station", "programme", "annotations", "changelog"]:
                continue
            elif k == "roles":
                for _key, _val in v[0]["contact"]["organisation"].items():
                    new_k = f"{var}_organisation_{_key}"
                    meta[new_k] = _val
            elif k == "variable":
                for _key, _val in v.items():
                    new_k = f"{var}_{_key}"
                    meta[new_k] = _val
            else:
                new_k = f"{var}_{k}"
                meta[new_k] = v
    return meta
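

# Editor's sketch (not part of the covered module): combine_meta_data flattens station and per-variable time
# series metadata into a single dict; the minimal input dicts below are assumptions for illustration only.
def _example_combine_meta_data():  # pragma: no cover
    station_meta = {"codes": ["DEBW107"], "name": "some station",
                    "coordinates": {"lng": 9.2, "lat": 48.7, "alt": 380}}
    timeseries_meta = {"o3": {"sampling_frequency": "hourly", "variable": {"name": "o3", "units": "ppb"}}}
    print(combine_meta_data(station_meta, timeseries_meta))
    # {'codes': 'DEBW107', 'name': 'some station', 'lon': 9.2, 'lat': 48.7, 'alt': 380,
    #  'o3_sampling_frequency': 'hourly', 'o3_name': 'o3', 'o3_units': 'ppb'}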



def load_timeseries_data(timeseries_meta, url_base, opts, headers, sampling):
    coll = []
    for meta in timeseries_meta:  # coverage: loop never ran to completion in the recorded run
        series_id = meta["id"]
        # opts = {"base": url_base, "service": f"data/timeseries/{series_id}"}
        opts = {"base": url_base, "service": f"data/timeseries", "param_id": series_id, "format": "csv", **opts}
        if sampling != "hourly":  # coverage: condition was never false in the recorded run
            opts["service"] = None
        res = get_data(opts, headers, as_json=False)
        data = pd.read_csv(StringIO(res), comment="#", index_col="datetime", parse_dates=True,
                           infer_datetime_format=True)
        if len(data.index) > 0:
            data = data[correct_stat_name(opts.get("names", "value"))].rename(meta["variable"]["name"])
        coll.append(data)
    return coll



def load_station_information(station_name: List[str], url_base: str, headers: Dict):
    # opts = {"base": url_base, "service": f"stationmeta/{station_name[0]}"}
    opts = {"base": url_base, "service": f"stationmeta", "param_id": station_name[0]}
    return get_data(opts, headers)



def load_timeseries_information(station_meta, var_meta, url_base: str, headers: Dict,
                                data_origin: Dict = None) -> [Dict, Dict]:
    timeseries_id_dict = {}
    missing = []
    for var, meta in var_meta.items():
        timeseries_id_dict[var] = []
        opts = {"base": url_base, "service": "search", "station_id": station_meta["id"], "variable_id": meta["id"]}
        res = get_data(opts, headers)
        if len(res) == 0:  # coverage: condition was never true in the recorded run
            missing.append((var, meta))
            # raise EmptyQueryResult(f"Cannot find any timeseries for station id {station_meta['id']} "
            #                        f"({station_meta['codes'][0]}) and variable id {meta['id']} ({var}).")
        if data_origin is not None:  # coverage: condition was never false in the recorded run
            var_origin = data_origin[var]
            timeseries_id_dict[var] = select_timeseries_by_origin(res, var_origin)
            # if len(timeseries_id_dict[var]) == 0:
            #     raise EmptyQueryResult(f"Cannot find any timeseries for station id {station_meta['id']} "
            #                            f"({station_meta['codes'][0]}), variable id {meta['id']} ({var}) "
            #                            f"and timeseries origin {var_origin}.")
        if data_origin is None or len(timeseries_id_dict[var]) == 0:  # coverage: condition was never true in the recorded run
            timeseries_id_dict[var] = select_timeseries_by_order(res)
    if len(missing) > 0:  # coverage: condition was never true in the recorded run
        missing = ",".join([f"{m[0]} ({m[1]['id']})" for m in missing])
        raise EmptyQueryResult(f"Cannot find any timeseries for station id {station_meta['id']} "
                               f"({station_meta['codes'][0]}) and variables {missing}.")
    return timeseries_id_dict



def select_timeseries_by_order(toar_meta):
    order_dict = {meta["order"]: meta for meta in toar_meta}
    res = [order_dict[order] for order in sorted(order_dict.keys())]
    return res
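

# Editor's sketch (not part of the covered module): select_timeseries_by_order returns the time series
# metadata sorted by toar-data's "order" attribute; the minimal dicts below are assumptions for illustration.
def _example_select_timeseries_by_order():  # pragma: no cover
    toar_meta = [{"id": 7, "order": 2}, {"id": 3, "order": 1}]
    print(select_timeseries_by_order(toar_meta))  # [{'id': 3, 'order': 1}, {'id': 7, 'order': 2}]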



def select_timeseries_by_origin(toar_meta, var_origin):
    res = []
    for origin in to_list(var_origin):
        for meta in toar_meta:
            if meta["data_origin"] == "instrument":
                for roles in meta["roles"]:
                    if roles["contact"]["organisation"]["name"].lower() == origin.lower():
                        res.append(meta)
                        break
            elif meta["data_origin"].lower() == origin.lower():  # coverage: condition was never true in the recorded run
                res.append(meta)
                break
    return res
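

# Editor's sketch (not part of the covered module): illustrates the two matching paths of
# select_timeseries_by_origin with assumed metadata entries ("UBA", "CAMS" are placeholders); "instrument"
# series are matched via the contributing organisation, all other series via their data_origin field.
def _example_select_timeseries_by_origin():  # pragma: no cover
    toar_meta = [
        {"data_origin": "instrument", "roles": [{"contact": {"organisation": {"name": "UBA"}}}]},
        {"data_origin": "CAMS", "roles": []},
    ]
    print(select_timeseries_by_origin(toar_meta, "UBA"))   # first entry, matched by organisation name
    print(select_timeseries_by_origin(toar_meta, "CAMS"))  # second entry, matched by data_origin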



def load_variables_information(var_dict, url_base, headers):
    var_meta_dict = {}
    for var in var_dict.keys():
        opts = {"base": url_base, "service": f"variables", "param_id": var}
        var_meta_dict[var] = get_data(opts, headers)
    return var_meta_dict