Coverage for mlair/helpers/data_sources/toar_data_v2.py: 78%

158 statements  

« prev     ^ index     » next       coverage.py v6.4.2, created at 2023-06-30 10:40 +0000

1"""Functions to access https://toar-data.fz-juelich.de/api/v2/""" 

2__author__ = 'Lukas Leufen' 

3__date__ = '2022-06-30' 

4 

5 

6import logging 

7from typing import Union, List, Dict 

8from io import StringIO 

9 

10import pandas as pd 

11import pytz 

12from timezonefinder import TimezoneFinder 

13from io import BytesIO 

14import zipfile 

15 

16from mlair.configuration.toar_data_v2_settings import toar_data_v2_settings 

17from mlair.helpers import to_list 

18from mlair.helpers.data_sources.data_loader import EmptyQueryResult, get_data, correct_stat_name, get_data_with_query 

19 

str_or_none = Union[str, None]  # type alias for an optional string value

21 

22 

def download_toar(station_name: Union[str, List[str]], stat_var: dict,
                  sampling: str = "daily", data_origin: Dict = None):
    """
    Download data from https://toar-data.fz-juelich.de/api/v2/

    Uses station name to indicate measurement site and keys of stat_var to indicate variable name. If data origin is
    given, this method tries to load time series for this origin. In case no origin is provided, this method loads data
    with the highest priority according to toar-data's order parameter.

    :param station_name: station code or list of codes (used to label the columns of the returned meta frame)
    :param stat_var: mapping of variable name to the statistic to retrieve for that variable
    :param sampling: temporal resolution of the data, e.g. "daily" or "hourly"
    :param data_origin: optional mapping of variable name to preferred data origin(s)
    :return: tuple of (data frame with one column per variable, meta data frame)
    """
    # make sure station_name parameter is a list
    station_name = to_list(station_name)

    # treat an empty data_origin dict like None, otherwise work on a shallow copy so the caller's dict is untouched
    if data_origin is None or len(data_origin) == 0:
        data_origin = None
    else:
        data_origin = dict(data_origin)

    # get data connection settings for meta
    meta_url_base, headers = toar_data_v2_settings("meta")

    # load variables
    var_meta = load_variables_information(stat_var, meta_url_base, headers)

    # load station meta
    station_meta = load_station_information(station_name, meta_url_base, headers)

    # load series information
    timeseries_meta = load_timeseries_information(station_meta, var_meta, meta_url_base, headers, data_origin)

    logging.info(f"load data for {station_meta['codes'][0]} from TOAR-DATA")
    # get data connection settings for data
    data_url_base, headers = toar_data_v2_settings(sampling)

    data_dict = {}
    for var, meta in timeseries_meta.items():
        logging.debug(f"load {var}")
        meta_and_opts = prepare_meta(meta, sampling, stat_var, var)
        data_var = []
        # note: loop variable renamed from var_meta to series_meta to avoid shadowing the variable meta above
        for series_meta, opts in meta_and_opts:
            data_var.extend(load_timeseries_data(series_meta, data_url_base, opts, headers, sampling))
        data_dict[var] = merge_data(*data_var, sampling=sampling)
    data = pd.DataFrame.from_dict(data_dict)
    data = correct_timezone(data, station_meta, sampling)

    meta = combine_meta_data(station_meta, {k: v[0] for k, v in timeseries_meta.items()})
    meta = pd.DataFrame.from_dict(meta, orient='index')
    meta.columns = station_name
    return data, meta

83 

84 

def merge_data(*args, sampling="hourly"):
    """
    Combine several time series into a single one on a continuous date index.

    The first series provides the base values; all following series only fill
    gaps (existing values are never overwritten). The resulting index spans the
    earliest to the latest timestamp over all given series at the given sampling.
    """
    begin = min(series.index.min() for series in args)
    end = max(series.index.max() for series in args)
    freq = {"hourly": "1H", "daily": "1d"}.get(sampling)
    combined = args[0].reindex(pd.date_range(begin, end, freq=freq))
    if not isinstance(combined, pd.DataFrame):
        combined = combined.to_frame()
    for filler in args[1:]:
        combined.update(filler, overwrite=False)
    return combined.squeeze()

96 

97 

def correct_timezone(data, meta, sampling):
    """
    Extract timezone information and convert data index to this timezone.

    Uses UTC if no information is provided. Note that this method only modifies data with sampling='hourly'. In all
    other cases, it returns just the given data without any change. This method expects date index of data to be in
    UTC. Timezone information is not added to the index to get rid of daylight saving time and ambiguous timestamps.
    """
    if sampling != "hourly":
        return data
    tz_name = meta.get("timezone", "UTC")
    try:
        tz = pytz.timezone(tz_name)
    except pytz.exceptions.UnknownTimeZoneError:
        # stored timezone name is unknown -> fall back to a lookup by station coordinates
        coords = meta["coordinates"]
        tz = pytz.timezone(TimezoneFinder().timezone_at(lng=coords["lng"], lat=coords["lat"]))
    naive_index = data.index.tz_localize(None)
    # constant offset without daylight saving time, derived from the first timestamp
    offset = tz.utcoffset(naive_index[0]) - tz.dst(naive_index[0])
    data.index = naive_index + offset
    return data

118 

119 

def prepare_meta(meta, sampling, stat_var, var):
    """
    Build (meta list, request options) pairs for each timeseries meta entry.

    For daily sampling the timeseries id is moved from the meta dict into the request
    options (the meta entry keeps ``id=None``) and the requested statistic and sampling
    are added to the options. For any other sampling the options stay empty.
    Note: the given meta dicts are modified in place for daily sampling.
    """
    prepared = []
    for entry in meta:
        request_opts = {}
        if sampling == "daily":
            request_opts["id"] = entry.pop("id")
            entry["id"] = None
            request_opts["statistics"] = stat_var[var]
            request_opts["sampling"] = sampling
        prepared.append(([entry], request_opts))
    return prepared

131 

132 

def combine_meta_data(station_meta, timeseries_meta):
    """
    Flatten station meta and per-variable timeseries meta into a single flat dict.

    From the station meta, only the first station code is kept under "codes"; nested
    coordinate / additional metadata dicts are flattened (with "lng" renamed to "lon");
    bookkeeping keys (changelog, roles, annotations, aux_*) are dropped. Timeseries meta
    keys are prefixed with the variable name; the first role's organisation and the
    variable description are flattened, nested bookkeeping keys are dropped.
    """
    nested_station_keys = ("coordinates", "additional_metadata", "globalmeta")
    dropped_station_keys = ("changelog", "roles", "annotations", "aux_images", "aux_docs", "aux_urls")
    dropped_series_keys = ("additional_metadata", "station", "programme", "annotations", "changelog")

    flat = {}
    for key, value in station_meta.items():
        if key == "codes":
            flat[key] = value[0]
        elif key in nested_station_keys:
            for inner_key, inner_val in value.items():
                flat["lon" if inner_key == "lng" else inner_key] = inner_val
        elif key not in dropped_station_keys:
            flat[key] = value
    for var, var_meta in timeseries_meta.items():
        for key, value in var_meta.items():
            if key in dropped_series_keys:
                continue
            if key == "roles":
                for inner_key, inner_val in value[0]["contact"]["organisation"].items():
                    flat[f"{var}_organisation_{inner_key}"] = inner_val
            elif key == "variable":
                for inner_key, inner_val in value.items():
                    flat[f"{var}_{inner_key}"] = inner_val
            else:
                flat[f"{var}_{key}"] = value
    return flat

164 

165 

def load_timeseries_data(timeseries_meta, url_base, opts, headers, sampling):
    """
    Load the actual time series values for each timeseries meta entry.

    :param timeseries_meta: list of timeseries meta dicts (each provides "id" and "variable")
    :param url_base: base url of the data service
    :param opts: additional request options (e.g. id / statistics / sampling for daily data)
    :param headers: request headers to use
    :param sampling: "hourly" data is requested as plain csv, everything else via the query endpoint (zipped csv)
    :return: list of pandas objects, one per timeseries that returned data
    """
    coll = []
    for meta in timeseries_meta:
        series_id = meta["id"]
        # Build the request options from scratch for every series. The previous version re-bound the
        # ``opts`` parameter inside the loop with ``**opts`` applied last, so keys from one iteration
        # (including param_id, service and format) leaked into and overrode the next request.
        request_opts = {"base": url_base, "service": "data/timeseries", "param_id": series_id,
                        "format": "csv", **opts}
        if sampling == "hourly":
            res = get_data(request_opts, headers, as_json=False)
            data = extract_timeseries_data(res, "string")
        else:
            request_opts["service"] = None
            request_opts["format"] = None
            res = get_data_with_query(request_opts, headers, as_json=False)
            data = extract_timeseries_data(res, "bytes")
        if len(data.index) > 0:
            # keep only the requested statistic and label the series with its variable name
            data = data[correct_stat_name(request_opts.get("statistics", "value"))].rename(meta["variable"]["name"])
            coll.append(data)
    return coll

184 

185 

def extract_timeseries_data(result, result_format):
    """
    Parse a raw toar-data response into a DataFrame indexed by "datetime".

    :param result: raw response payload, either a csv string or zipped csv bytes
    :param result_format: "string" for plain csv text, "bytes" for a zip archive containing a csv file
    :return: parsed DataFrame with a parsed datetime index
    :raises ValueError: if result_format is neither "string" nor "bytes"
    """
    if result_format == "string":
        # infer_datetime_format was removed here: it is deprecated since pandas 2.0 and redundant
        # with parse_dates=True
        return pd.read_csv(StringIO(result), comment="#", index_col="datetime", parse_dates=True)
    elif result_format == "bytes":
        with zipfile.ZipFile(BytesIO(result)) as file:
            # the archive's first member is read; responses are expected to carry the csv there
            return pd.read_csv(BytesIO(file.read(file.filelist[0].filename)), comment="#", index_col="datetime",
                               parse_dates=True)
    else:
        raise ValueError(f"Unknown result format given: {result_format}")

196 

197 

def load_station_information(station_name: List[str], url_base: str, headers: Dict):
    """Fetch station metadata for the first station code in the given list."""
    query = {"base": url_base, "service": "stationmeta", "param_id": station_name[0]}
    return get_data(query, headers)

201 

202 

def load_timeseries_information(station_meta, var_meta, url_base: str, headers: Dict,
                                data_origin: Dict = None) -> Dict:
    """
    Search all timeseries for the given station and variables.

    For each variable, matching timeseries are looked up via the search service. If a data
    origin is given, timeseries are filtered by the variable's origin first; if that filter
    yields nothing (or no origin is given at all), timeseries are selected by toar-data's
    order parameter instead.

    :param station_meta: station meta dict (provides "id" and "codes")
    :param var_meta: mapping of variable name to its variable meta dict (provides "id")
    :param url_base: base url of the meta service
    :param headers: request headers to use
    :param data_origin: optional mapping of variable name to preferred origin(s)
    :return: mapping of variable name to the list of selected timeseries meta dicts
    :raises EmptyQueryResult: if no timeseries at all can be found for one or more variables
    """
    timeseries_id_dict = {}
    missing = []
    for var, meta in var_meta.items():
        timeseries_id_dict[var] = []
        opts = {"base": url_base, "service": "search", "station_id": station_meta["id"], "variable_id": meta["id"]}
        res = get_data(opts, headers)
        if len(res) == 0:
            # collect all variables without any timeseries to report them in a single error below
            missing.append((var, meta))
        if data_origin is not None:
            var_origin = data_origin[var]
            timeseries_id_dict[var] = select_timeseries_by_origin(res, var_origin)
        if data_origin is None or len(timeseries_id_dict[var]) == 0:
            # fall back to the order parameter when no origin is given or nothing matched the origin
            timeseries_id_dict[var] = select_timeseries_by_order(res)
    if len(missing) > 0:
        missing = ",".join([f"{m[0]} ({m[1]['id']})" for m in missing])
        raise EmptyQueryResult(f"Cannot find any timeseries for station id {station_meta['id']} "
                               f"({station_meta['codes'][0]}) and variables {missing}.")
    return timeseries_id_dict

229 

230 

def select_timeseries_by_order(toar_meta):
    """
    Sort timeseries meta entries by their "order" value, ascending.

    If several entries share the same order, only the last one seen is kept
    (entries are deduplicated by order before sorting).
    """
    by_order = {entry["order"]: entry for entry in toar_meta}
    return [by_order[order] for order in sorted(by_order)]

235 

236 

def select_timeseries_by_origin(toar_meta, var_origin):
    """
    Filter timeseries meta entries by requested data origin(s), case-insensitively.

    For each requested origin: entries with data_origin "instrument" are matched
    against their roles' organisation names and every matching entry is collected;
    any other data_origin is compared directly, and only the first direct match
    per origin is taken (the scan stops for that origin once it is found).
    """
    selected = []
    for origin in to_list(var_origin):
        wanted = origin.lower()
        for meta in toar_meta:
            if meta["data_origin"] == "instrument":
                # append the entry once if any of its organisations matches the requested origin
                names = (role["contact"]["organisation"]["name"] for role in meta["roles"])
                if any(name.lower() == wanted for name in names):
                    selected.append(meta)
            elif meta["data_origin"].lower() == wanted:
                selected.append(meta)
                break
    return selected

250 

251 

def load_variables_information(var_dict, url_base, headers):
    """Fetch toar-data variable metadata for every variable name in var_dict."""
    return {var: get_data({"base": url_base, "service": "variables", "param_id": var}, headers)
            for var in var_dict.keys()}