Coverage for mlair/helpers/data_sources/toar_data_v2.py: 69% (144 statements)

1"""Functions to access https://toar-data.fz-juelich.de/api/v2/""" 

2__author__ = 'Lukas Leufen' 

3__date__ = '2022-06-30' 

4 

5 

6import logging 

7from typing import Union, List, Dict 

8from io import StringIO 

9 

10import pandas as pd 

11import pytz 

12from timezonefinder import TimezoneFinder 

13 

14from mlair.configuration.toar_data_v2_settings import toar_data_v2_settings 

15from mlair.helpers import to_list 

16from mlair.helpers.data_sources.toar_data import EmptyQueryResult, get_data, correct_stat_name 

17 

18 

19str_or_none = Union[str, None] 

20 

21 

def download_toar(station_name: Union[str, List[str]], stat_var: dict,
                  sampling: str = "daily", data_origin: Dict = None):
    """
    Download data from https://toar-data.fz-juelich.de/api/v2/

    Uses the station name to indicate the measurement site and the keys of stat_var to indicate the variable names. If
    a data origin is given, this method tries to load time series from this origin. In case no origin is provided,
    this method loads the data with the highest priority according to toar-data's order parameter.

    :param station_name: station code (or list of codes) identifying the measurement site
    :param stat_var: mapping of variable names to the statistics to load for each variable
    :param sampling: temporal resolution of the data, e.g. "daily" or "hourly"
    :param data_origin: optional mapping of variable names to preferred data origins (organisation / network names)
    :return: data and meta information, each as a pandas DataFrame
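
    Example call (illustrative only; the station code, statistic, and origin below are placeholder values and a
    network connection to toar-data is required)::

        data, meta = download_toar("DEBW107", {"o3": "dma8eu"}, sampling="daily", data_origin={"o3": "UBA"})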

36 """ 

    # make sure station_name parameter is a list
    station_name = to_list(station_name)

    # also ensure that the given data_origin dict is not used by reference (work on a copy)
    if data_origin is None or len(data_origin) == 0:
        data_origin = None
    else:
        data_origin = {k: v for (k, v) in data_origin.items()}

    # get data connection settings for meta
    meta_url_base, headers = toar_data_v2_settings("meta")

    # load variables
    var_meta = load_variables_information(stat_var, meta_url_base, headers)

    # load station meta
    station_meta = load_station_information(station_name, meta_url_base, headers)

    # load series information
    timeseries_meta = load_timeseries_information(station_meta, var_meta, meta_url_base, headers, data_origin)

    # # correct stat_var values if data is not aggregated (hourly)
    # if sampling == "hourly":
    #     stat_var = {key: "values" for key in stat_var.keys()}

    logging.info(f"load data for {station_meta['codes'][0]} from TOAR-DATA")
    # get data connection settings for data
    data_url_base, headers = toar_data_v2_settings(sampling)

    data_dict = {}
    for var, meta in timeseries_meta.items():
        logging.debug(f"load {var}")
        meta_and_opts = prepare_meta(meta, sampling, stat_var, var)
        data_var = []
        for var_meta, opts in meta_and_opts:
            data_var.extend(load_timeseries_data(var_meta, data_url_base, opts, headers, sampling))
        data_dict[var] = merge_data(*data_var, sampling=sampling)
    data = pd.DataFrame.from_dict(data_dict)
    data = correct_timezone(data, station_meta, sampling)

    meta = combine_meta_data(station_meta, {k: v[0] for k, v in timeseries_meta.items()})
    meta = pd.DataFrame.from_dict(meta, orient='index')
    meta.columns = station_name
    return data, meta


def merge_data(*args, sampling="hourly"):
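    """
    Merge all given time series onto one continuous date index.

    Builds a complete date range spanning the earliest and latest timestamp of all arguments, reindexes the first
    series onto it, and fills remaining gaps from the other series without overwriting existing values.
    """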

    start_date = min(map(lambda x: x.index.min(), args))
    end_date = max(map(lambda x: x.index.max(), args))
    freq = {"hourly": "1H", "daily": "1d"}.get(sampling)
    full_time = pd.date_range(start_date, end_date, freq=freq)
    full_data = args[0].reindex(full_time)
    if not isinstance(full_data, pd.DataFrame):
        full_data = full_data.to_frame()
    for d in args[1:]:
        full_data.update(d, overwrite=False)
    return full_data.squeeze()


def correct_timezone(data, meta, sampling):
    """
    Extract timezone information and convert data index to this timezone.

    Uses UTC if no information is provided. Note that this method only modifies data with sampling='hourly'. In all
    other cases, it returns the given data without any change. This method expects the date index of data to be in
    UTC. Timezone information is not added to the index to get rid of daylight saving time and ambiguous timestamps.
    """

    if sampling == "hourly":
        tz_info = meta.get("timezone", "UTC")
        try:
            tz = pytz.timezone(tz_info)
        except pytz.exceptions.UnknownTimeZoneError:
            lon, lat = meta["coordinates"]["lng"], meta["coordinates"]["lat"]
            tz = pytz.timezone(TimezoneFinder().timezone_at(lng=lon, lat=lat))
        index = data.index
        index = index.tz_localize(None)
        utc_offset = tz.utcoffset(index[0]) - tz.dst(index[0])
        data.index = index + utc_offset
    return data


def prepare_meta(meta, sampling, stat_var, var):
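    """
    Prepare the request options for each timeseries meta entry.

    Returns a list of ([meta], opts) tuples. For daily sampling, the timeseries id is moved from the meta entry into
    the request options, and the requested statistics name and sampling are added.
    """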

    out = []
    for m in meta:
        opts = {}
        if sampling == "daily":
            opts["timeseries_id"] = m.pop("id")
            m["id"] = None
            opts["names"] = stat_var[var]
            opts["sampling"] = sampling
        out.append(([m], opts))
    return out


def combine_meta_data(station_meta, timeseries_meta):
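    """
    Flatten station and timeseries meta data into a single dictionary.

    Coordinates are unpacked (lng is renamed to lon), bulky fields such as changelog or roles are skipped, and all
    timeseries entries are prefixed with the corresponding variable name.
    """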

    meta = {}
    for k, v in station_meta.items():
        if k == "codes":
            meta[k] = v[0]
        elif k in ["coordinates", "additional_metadata", "globalmeta"]:
            for _key, _val in v.items():
                if _key == "lng":
                    meta["lon"] = _val
                else:
                    meta[_key] = _val
        elif k in ["changelog", "roles", "annotations", "aux_images", "aux_docs", "aux_urls"]:
            continue
        else:
            meta[k] = v
    for var, var_meta in timeseries_meta.items():
        for k, v in var_meta.items():
            if k in ["additional_metadata", "station", "programme", "annotations", "changelog"]:
                continue
            elif k == "roles":
                for _key, _val in v[0]["contact"]["organisation"].items():
                    new_k = f"{var}_organisation_{_key}"
                    meta[new_k] = _val
            elif k == "variable":
                for _key, _val in v.items():
                    new_k = f"{var}_{_key}"
                    meta[new_k] = _val
            else:
                new_k = f"{var}_{k}"
                meta[new_k] = v
    return meta


def load_timeseries_data(timeseries_meta, url_base, opts, headers, sampling):
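    """
    Request the actual time series data for each meta entry and return a list of pandas objects.

    Data is requested in csv format; empty responses are skipped. The statistics column given in opts is selected and
    renamed to the variable name.
    """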

    coll = []
    for meta in timeseries_meta:
        series_id = meta["id"]
        # opts = {"base": url_base, "service": f"data/timeseries/{series_id}"}
        opts = {"base": url_base, "service": "data/timeseries", "param_id": series_id, "format": "csv", **opts}
        if sampling != "hourly":
            opts["service"] = None
        res = get_data(opts, headers, as_json=False)
        data = pd.read_csv(StringIO(res), comment="#", index_col="datetime", parse_dates=True,
                           infer_datetime_format=True)
        if len(data.index) > 0:
            data = data[correct_stat_name(opts.get("names", "value"))].rename(meta["variable"]["name"])
            coll.append(data)
    return coll


def load_station_information(station_name: List[str], url_base: str, headers: Dict):
    # opts = {"base": url_base, "service": f"stationmeta/{station_name[0]}"}
    opts = {"base": url_base, "service": "stationmeta", "param_id": station_name[0]}
    return get_data(opts, headers)


def load_timeseries_information(station_meta, var_meta, url_base: str, headers: Dict,
                                data_origin: Dict = None) -> Dict:
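    """
    Search all timeseries that are available for the given station and variables.

    If a data origin is given for a variable, matching timeseries are selected by origin; otherwise (or if the origin
    yields no match) they are selected by toar-data's order parameter. Raises EmptyQueryResult if no timeseries can be
    found for a variable at all.
    """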

    timeseries_id_dict = {}
    missing = []
    for var, meta in var_meta.items():
        timeseries_id_dict[var] = []
        opts = {"base": url_base, "service": "search", "station_id": station_meta["id"], "variable_id": meta["id"]}
        res = get_data(opts, headers)
        if len(res) == 0:
            missing.append((var, meta))
            # raise EmptyQueryResult(f"Cannot find any timeseries for station id {station_meta['id']} "
            #                        f"({station_meta['codes'][0]}) and variable id {meta['id']} ({var}).")
        if data_origin is not None:
            var_origin = data_origin[var]
            timeseries_id_dict[var] = select_timeseries_by_origin(res, var_origin)
            # if len(timeseries_id_dict[var]) == 0:
            #     raise EmptyQueryResult(f"Cannot find any timeseries for station id {station_meta['id']} "
            #                            f"({station_meta['codes'][0]}), variable id {meta['id']} ({var}) "
            #                            f"and timeseries origin {var_origin}.")
        if data_origin is None or len(timeseries_id_dict[var]) == 0:
            timeseries_id_dict[var] = select_timeseries_by_order(res)
    if len(missing) > 0:
        missing = ",".join([f"{m[0]} ({m[1]['id']})" for m in missing])
        raise EmptyQueryResult(f"Cannot find any timeseries for station id {station_meta['id']} "
                               f"({station_meta['codes'][0]}) and variables {missing}.")
    return timeseries_id_dict


def select_timeseries_by_order(toar_meta):
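    """Sort timeseries meta data ascending by toar-data's order parameter."""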

    order_dict = {meta["order"]: meta for meta in toar_meta}
    res = [order_dict[order] for order in sorted(order_dict.keys())]
    return res


def select_timeseries_by_origin(toar_meta, var_origin):
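    """
    Return all timeseries whose contributing organisation matches one of the requested origins (case-insensitive).
    """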

    res = []
    for origin in to_list(var_origin):
        for meta in toar_meta:
            for roles in meta["roles"]:
                if roles["contact"]["organisation"]["name"].lower() == origin.lower():
                    res.append(meta)
                    break
    return res


def load_variables_information(var_dict, url_base, headers):
    var_meta_dict = {}
    for var in var_dict.keys():
        opts = {"base": url_base, "service": "variables", "param_id": var}
        var_meta_dict[var] = get_data(opts, headers)
    return var_meta_dict