Coverage for mlair/helpers/data_sources/join.py: 92%

166 statements  

« prev     ^ index     » next       coverage.py v6.4.2, created at 2023-06-30 10:22 +0000

1"""Functions to access join database.""" 

2__author__ = 'Felix Kleinert, Lukas Leufen' 

3__date__ = '2019-10-16' 

4 

5import datetime as dt 

6import logging 

7from typing import Iterator, Union, List, Dict, Tuple 

8 

9import pandas as pd 

10 

11from mlair import helpers 

12from mlair.configuration.join_settings import join_settings 

13from mlair.helpers.data_sources import toar_data, toar_data_v2, data_loader 

14 

15 

16# join_url_base = 'https://join.fz-juelich.de/services/rest/surfacedata/' 

17str_or_none = Union[str, None] 

18 

19 

def download_join(station_name: Union[str, List[str]], stat_var: dict, station_type: str = None,
                  sampling: str = "daily", data_origin: Dict = None) -> [pd.DataFrame, pd.DataFrame]:
    """
    Read data from JOIN/TOAR.

    :param station_name: Station name e.g. DEBY122
    :param stat_var: key as variable like 'O3', values as statistics on keys like 'mean'
    :param station_type: set the station type like "traffic" or "background", can be none
    :param sampling: sampling rate of the downloaded data, either set to daily or hourly (default daily)
    :param data_origin: additional dictionary to specify data origin as key (for variable) value (origin) pair. Valid
        origins are "REA" for reanalysis data and "" (empty string) for observational data.

    :returns: data frame with all variables and statistics and meta data frame with all meta information
    :raises data_loader.EmptyQueryResult: if a requested variable is not available at the station, or if no data
        could be downloaded at all
    """
    # make sure station_name parameter is a list
    station_name = helpers.to_list(station_name)

    # split network and origin information (TOAR-Data v2 "origin" -> JOIN/v1 "origin" + "network")
    data_origin, network_name = split_network_and_origin(data_origin)

    # get data connection settings
    join_url_base, headers = join_settings(sampling)

    # load series information
    vars_dict, data_origin = load_series_information(station_name, station_type, network_name, join_url_base, headers,
                                                     data_origin, stat_var)

    # check if all requested variables are available
    if set(stat_var).issubset(vars_dict) is False:
        missing_variables = set(stat_var).difference(vars_dict)
        origin = helpers.select_from_dict(data_origin, missing_variables)
        options = f"station={station_name}, type={station_type}, network={network_name}, origin={origin}"
        raise data_loader.EmptyQueryResult(
            f"No data found for variables {missing_variables} and options {options} in JOIN.")

    # correct stat_var values if data is not aggregated (hourly): hourly series are requested as raw "values"
    if sampling == "hourly":
        stat_var = {key: "values" for key in stat_var.keys()}

    # download all variables with given statistic
    data = None
    df = None
    meta = {}
    logging.info(f"load data for {station_name[0]} from JOIN")
    for var in _lower_list(sorted(vars_dict.keys())):
        if var in stat_var.keys():

            logging.debug('load: {}'.format(var))

            # create data link
            opts = {'base': join_url_base, 'service': 'stats', 'id': vars_dict[var], 'statistics': stat_var[var],
                    'sampling': sampling, 'capture': 0, 'format': 'json'}

            # load data
            data = data_loader.get_data(opts, headers)

            # adjust data format if given as list of list
            # NOTE: this only happens when downloading hourly data using a secret token, which is not available
            # for CI testing
            if isinstance(data, list):  # pragma: no branch
                data = correct_data_format(data)

            # correct namespace of statistics
            stat = data_loader.correct_stat_name(stat_var[var])

            # store data in pandas dataframe
            df = _save_to_pandas(df, data, stat, var)
            meta[var] = _correct_meta(data["metadata"])

            logging.debug('finished: {}'.format(var))

    # `data` still holds the response of the last processed variable; None means the loop never downloaded anything
    if data:
        # load station meta using toar-data v2 API and convert to local timezone
        meta_url_base, headers = toar_data_v2.toar_data_v2_settings("meta")
        station_meta = toar_data_v2.load_station_information(station_name, meta_url_base, headers)
        df = toar_data_v2.correct_timezone(df, station_meta, sampling)

        # create meta data
        meta = toar_data_v2.combine_meta_data(station_meta, meta)
        meta = pd.DataFrame.from_dict(meta, orient='index')
        meta.columns = station_name
        return df, meta
    else:
        raise data_loader.EmptyQueryResult("No data found in JOIN.")

104 

105 

106def _correct_meta(meta): 

107 meta_out = {} 

108 for k, v in meta.items(): 

109 if k.startswith("station"): 

110 _k = k.split("_", 1)[1] 

111 _d = meta_out.get("station", {}) 

112 _d[_k] = v 

113 meta_out["station"] = _d 

114 elif k.startswith("parameter"): 

115 _k = k.split("_", 1)[1] 

116 _d = meta_out.get("variable", {}) 

117 _d[_k] = v 

118 meta_out["variable"] = _d 

119 elif k == "network_name": 

120 if v == "AIRBASE": 120 ↛ 121line 120 didn't jump to line 121, because the condition on line 120 was never true

121 _d = {"name": "EEA", "longname": "European Environment Agency", "kind": "government"} 

122 elif v == "UBA": 122 ↛ 125line 122 didn't jump to line 125, because the condition on line 122 was never false

123 _d = {"name": "UBA", "longname": "Umweltbundesamt", "kind": "government", "country": "Germany"} 

124 else: 

125 _d = {"name": v} 

126 meta_out["roles"] = [{"contact": {"organisation": _d}}] 

127 elif k in ["google_resolution", "numid"]: 

128 continue 

129 else: 

130 meta_out[k] = v 

131 return meta_out 

132 

133 

def split_network_and_origin(origin_network_dict: dict) -> Tuple[Union[None, dict], Union[None, dict]]:
    """
    Split given dict into network and data origin.

    Method is required to transform Toar-Data v2 structure (using only origin) into Toar-Data v1 (JOIN) structure
    (which uses origin and network parameter). Furthermore, EEA network (v2) is renamed to AIRBASE (v1).
    """
    if not origin_network_dict:
        # nothing provided: leave both selectors unset
        return None, None
    data_origin, network = {}, {}
    for var, origins in origin_network_dict.items():
        collected = []
        for entry in helpers.to_list(origins):
            if entry.lower() == "eea":
                # rename v2 network "EEA" to its v1 counterpart "AIRBASE"
                collected.append("AIRBASE")
            elif entry.lower() != "rea":
                collected.append(entry)
        # "REA" marks reanalysis origin; everything else counts as observation (empty string)
        data_origin[var] = "REA" if "REA" in origins else ""
        network[var] = filter_network(collected)
    return data_origin, network

159 

160 

def filter_network(network: list) -> Union[list, None]:
    """
    Filter given list of networks.

    :param network: list of various network names (can contain duplicates)
    :return: list with unique entries in first-seen order, or None if nothing remains
    """
    # drop empty names and deduplicate while preserving first-seen order
    unique = list(dict.fromkeys(name for name in network if name != ""))
    return unique if unique else None

175 

176 

def correct_data_format(data):
    """
    Transform to the standard data format.

    For some cases (e.g. hourly data), the data is returned as list instead of a dictionary with keys datetime,
    values and metadata. This function addresses this issue and transforms the data into the dictionary version.

    :param data: data in hourly format (list of [datetime, value] records, last element is the metadata dict)

    :return: the same data but formatted to fit with aggregated format
    """
    keys = ("datetime", "values")
    formatted = {"datetime": [], "values": [], "metadata": data[-1]}
    # all but the trailing metadata element are (timestamp, value) records
    for record in data[:-1]:
        for key, value in zip(keys, record):
            formatted[key].append(value)
    return formatted

195 

196 

def load_series_information(station_name: List[str], station_type: str_or_none, network_name: str_or_none,
                            join_url_base: str, headers: Dict, data_origin: Dict = None, stat_var: Dict = None) -> [Dict, Dict]:
    """
    List all series ids that are available for given station id and network name.

    :param station_name: Station name e.g. DEBW107
    :param station_type: station type like "traffic" or "background"
    :param network_name: measurement network of the station like "UBA" or "AIRBASE"
    :param join_url_base: base url name to download data from
    :param headers: additional headers information like authorization, can be empty
    :param data_origin: additional information to select a distinct series e.g. from reanalysis (REA) or from
        observation ("", empty string). This dictionary should contain a key for each variable and the information
        as key
    :param stat_var: dictionary with variable names as keys; used to restrict the search to these parameters
    :return: all available series for requested station stored in an dictionary with parameter name (variable) as key
        and the series id as value.
    """
    opts = {
        "base": join_url_base,
        "service": "search",
        "station_id": station_name[0],
        "station_type": station_type,
        "network_name": _create_network_name_opts(network_name),
        "as_dict": "true",
        "parameter_name": _create_parameter_name_opts(stat_var),
        "columns": "id,network_name,station_id,parameter_name,parameter_label,parameter_attribute",
    }
    station_vars = data_loader.get_data(opts, headers)
    logging.debug(f"{station_name}: {station_vars}")
    # reduce candidates to one distinct series per variable
    return _select_distinct_series(station_vars, data_origin, network_name)

220 

221 

222def _create_parameter_name_opts(stat_var): 

223 if stat_var is None: 

224 parameter_name_opts = None 

225 else: 

226 parameter_name_opts = ",".join(stat_var.keys()) 

227 return parameter_name_opts 

228 

229 

230def _create_network_name_opts(network_name): 

231 if network_name is None: 

232 network_name_opts = network_name 

233 elif isinstance(network_name, list): 233 ↛ 234line 233 didn't jump to line 234, because the condition on line 233 was never true

234 network_name_opts = ",".join(helpers.to_list(network_name)) 

235 elif isinstance(network_name, dict): 235 ↛ 242line 235 didn't jump to line 242, because the condition on line 235 was never false

236 _network = [] 

237 for v in network_name.values(): 

238 _network.extend(helpers.to_list(v)) 

239 network_name_opts = ",".join(filter(lambda x: x is not None, set(_network))) 

240 network_name_opts = None if len(network_name_opts) == 0 else network_name_opts 

241 else: 

242 raise TypeError(f"network_name parameter must be of type None, list, or dict. Given is {type(network_name)}.") 

243 return network_name_opts 

244 

245 

def _select_distinct_series(vars: List[Dict], data_origin: Dict = None, network_name: Union[str, List[str]] = None) \
        -> [Dict, Dict]:
    """
    Select distinct series ids for all variables. Also check if a parameter is from REA or not.
    """
    if data_origin is None:
        data_origin = {}
    # group candidate series by variable, filling in default origins where none were requested
    candidates, data_origin = _select_distinct_data_origin(vars, data_origin)

    if network_name is None:
        network_name = []
    # pick exactly one series per variable according to network priority
    candidates = _select_distinct_network(candidates, network_name)

    # reduce each selected entry to its series id
    return {name: entry["id"] for name, entry in candidates.items()}, data_origin

260 

261 

def _select_distinct_network(vars: dict, network_name: Union[list, dict]) -> dict:
    """
    Select distinct series regarding network name. The order the network names are provided in parameter
    `network_name` indicates priority (from high to low). If no network name is provided, first entry is used and a
    logging info is issued. In case network names are given but no match can be found, this method raises a
    ValueError.

    :param vars: dictionary with all series candidates already grouped by variable name as key. Value should be a
        list of possible candidates to select from. Each candidate must be a dictionary with at least keys `id` and
        `network_name`.
    :param network_name: list of networks to use with increasing priority (1st element has priority). Can be empty
        list indicating to use always first candidate for each variable.
    :return: dictionary with single series reference for each variable
    """
    if isinstance(network_name, (list, str)):
        # expand a flat network specification into a per-variable mapping
        network_name = {var: helpers.to_list(network_name) for var in vars.keys()}
    selected = {}
    for var, series in vars.items():
        prioritized = helpers.to_list(network_name.get(var, []) or [])
        matches = []
        for net in prioritized:
            matches.extend([s for s in series if s["network_name"].upper() == net.upper()])
        if matches:
            # first match carries the highest priority
            selected[var] = matches[0]
        elif not prioritized:
            # no preference given: fall back to the first candidate and report which one was taken
            selected[var] = series[0]
            logging.info(f"Could not find a valid match for variable {var} and networks {network_name.get(var, [])}"
                         f"! Therefore, use first answer from JOIN: {series[0]}")
        else:
            # networks were requested but none of them is available for this variable
            raise ValueError(f"Cannot find a valid match for requested networks {network_name.get(var, [])} and "
                             f"variable {var} as only following networks are available in JOIN: "
                             f"{list(map(lambda x: x['network_name'], series))}")
    return selected

295 

296 

def _select_distinct_data_origin(vars: List[Dict], data_origin: Dict) -> (Dict[str, List], Dict):
    """
    Select distinct series regarding their data origin. Series are grouped as list according to their variable's
    name. As series can be reported with different network attribution, results might contain multiple entries for a
    variable. This method assumes the default data origin for chemical variables as `` (empty source) and for
    meteorological variables as `REA`.

    :param vars: list of all entries to check data origin for
    :param data_origin: data origin to match series with, if empty default values are used
    :return: dictionary with unique variable names as keys and list of respective series as values
    """
    # fallback origins: meteorological variables default to reanalysis, chemical ones to observation
    defaults = {"cloudcover": "REA", "humidity": "REA", "pblheight": "REA", "press": "REA", "relhum": "REA",
                "temp": "REA", "totprecip": "REA", "u": "REA", "v": "REA",
                "no": "", "no2": "", "o3": "", "pm10": "", "so2": ""}
    grouped = {}
    for entry in vars:
        name = entry["parameter_name"].lower()
        if name not in data_origin.keys():
            # remember the default so the caller learns which origin was actually used
            data_origin.update({name: defaults.get(name, "")})
        # keep only series whose attribute matches the requested (or default) origin
        if entry["parameter_attribute"].lower() == data_origin.get(name, "").lower():
            grouped[name] = grouped.get(name, []) + helpers.to_list(entry)
    return grouped, data_origin

320 

321 

322def _save_to_pandas(df: Union[pd.DataFrame, None], data: dict, stat: str, var: str) -> pd.DataFrame: 

323 """ 

324 Save given data in data frame. 

325 

326 If given data frame is not empty, the data is appened as new column. 

327 

328 :param df: data frame to append the new data, can be none 

329 :param data: new data to append or format as data frame containing the keys 'datetime' and '<stat>' 

330 :param stat: extracted statistic to get values from data (e.g. 'mean', 'dma8eu') 

331 :param var: variable the data is from (e.g. 'o3') 

332 

333 :return: new created or concatenated data frame 

334 """ 

335 if len(data["datetime"][0]) == 19: 

336 str_format = "%Y-%m-%d %H:%M:%S" 

337 else: 

338 str_format = "%Y-%m-%d %H:%M" 

339 index = map(lambda s: dt.datetime.strptime(s, str_format), data['datetime']) 

340 if df is None: 

341 df = pd.DataFrame(data[stat], index=index, columns=[var]) 

342 else: 

343 df = pd.concat([df, pd.DataFrame(data[stat], index=index, columns=[var])], axis=1) 

344 return df 

345 

346 

347def _lower_list(args: List[str]) -> Iterator[str]: 

348 """ 

349 Lower all elements of given list. 

350 

351 :param args: list with string entries to lower 

352 

353 :return: iterator that lowers all list entries 

354 """ 

355 for string in args: 

356 yield string.lower() 

357 

358 

if __name__ == "__main__":
    # ad-hoc smoke test: download data for a single station when the module is run directly
    # (performs real network requests against the JOIN service)
    logging.basicConfig(level=logging.DEBUG)
    var_all_dic = {'o3': 'dma8eu', 'relhum': 'average_values', 'temp': 'maximum', 'u': 'average_values',
                   'v': 'average_values', 'no': 'dma8eu', 'no2': 'dma8eu', 'cloudcover': 'average_values',
                   'pblheight': 'maximum'}
    station = 'DEBW107'
    # download_join(station, var_all_dic, sampling="daily")
    download_join(station, var_all_dic, sampling="hourly")