# mlair/helpers/data_sources/data_loader.py

__author__ = 'Lukas Leufen'
__date__ = '2023-06-01'

import logging
from typing import Dict, Union, List, Tuple
import time
import random

import requests
from requests.adapters import HTTPAdapter, Retry
# from requests.packages.urllib3.util.retry import Retry

from mlair.helpers import filter_dict_by_value, select_from_dict, data_sources, TimeTracking
import pandas as pd
import xarray as xr

DEFAULT_TIME_DIM = "datetime"
DEFAULT_TARGET_DIM = "variables"
DEFAULT_ITER_DIM = "Stations"
DEFAULT_WINDOW_DIM = "window"


def download_data(file_name: str, meta_file: str, station, statistics_per_var, sampling,
                  store_data_locally=True, data_origin: Dict = None, time_dim=DEFAULT_TIME_DIM,
                  target_dim=DEFAULT_TARGET_DIM, iter_dim=DEFAULT_ITER_DIM, window_dim=DEFAULT_WINDOW_DIM,
                  era5_data_path=None, era5_file_names=None, ifs_data_path=None, ifs_file_names=None) \
        -> Tuple[xr.DataArray, pd.DataFrame]:

    """
    Download data from the TOAR database using the JOIN interface, or load local era5 / ifs data.

    Data is transformed into an xarray data array. If store_data_locally is True, data is additionally
    stored locally using the given names for the data file and the meta file.

    :param file_name: name of file to save data to (containing full path)
    :param meta_file: name of the meta data file (also containing full path)

    :return: downloaded data and its meta data
    """

    df_era5_local, df_toar, df_ifs_local = None, None, None
    meta_era5_local, meta_toar, meta_ifs_local = None, None, None
    if data_origin is not None:
        era5_local_origin = filter_dict_by_value(data_origin, "era5_local", True)
        era5_local_stats = select_from_dict(statistics_per_var, era5_local_origin.keys())
        ifs_local_origin = filter_dict_by_value(data_origin, "ifs", True)
        ifs_local_stats = select_from_dict(statistics_per_var, ifs_local_origin.keys())
        toar_origin = filter_dict_by_value(data_origin, ["era5_local", "ifs"], False)
        toar_stats = select_from_dict(statistics_per_var, toar_origin.keys())
        assert len(era5_local_origin) + len(toar_origin) + len(ifs_local_origin) == len(data_origin)
        assert len(era5_local_stats) + len(toar_stats) + len(ifs_local_stats) == len(statistics_per_var)
    else:
        era5_local_origin, toar_origin, ifs_local_origin = None, None, None
        era5_local_stats, toar_stats, ifs_local_stats = statistics_per_var, statistics_per_var, statistics_per_var


    # load data
    if era5_local_origin is not None and len(era5_local_stats) > 0:
        # load era5 data
        df_era5_local, meta_era5_local = data_sources.era5.load_era5(
            station_name=station, stat_var=era5_local_stats, sampling=sampling, data_origin=era5_local_origin,
            window_dim=window_dim, time_dim=time_dim, target_dim=target_dim, era5_data_path=era5_data_path,
            era5_file_names=era5_file_names)
    if ifs_local_origin is not None and len(ifs_local_stats) > 0:
        # load ifs data
        df_ifs_local, meta_ifs_local = data_sources.ifs.load_ifs(
            station_name=station, stat_var=ifs_local_stats, sampling=sampling, data_origin=ifs_local_origin,
            lead_time_dim=window_dim, initial_time_dim=time_dim, target_dim=target_dim, ifs_data_path=ifs_data_path,
            ifs_file_names=ifs_file_names)
    if toar_origin is None or len(toar_stats) > 0:
        # load combined data from toar-data (v2 & v1)
        df_toar, meta_toar = data_sources.toar_data.download_toar(station=station, toar_stats=toar_stats,
                                                                  sampling=sampling, data_origin=toar_origin,
                                                                  window_dim=window_dim, time_dim=time_dim,
                                                                  target_dim=target_dim)


    valid_df = [e for e in [df_era5_local, df_toar, df_ifs_local] if e is not None]
    if len(valid_df) == 0:
        raise EmptyQueryResult("No data available from era5_local, toar-data and ifs_local")
    df = xr.concat(valid_df, dim=time_dim)
    valid_meta = [e for e in [meta_era5_local, meta_toar, meta_ifs_local] if e is not None]
    if len(valid_meta) > 0:
        meta = valid_meta[0]
        for e in valid_meta[1:]:
            meta = meta.combine_first(e)
        meta.loc["data_origin"] = str(data_origin)
        meta.loc["statistics_per_var"] = str(statistics_per_var)
    else:
        meta = None

    xarr = df.expand_dims({iter_dim: station})
    if len(xarr.coords[window_dim]) <= 1:  # keep window dim only if there is more than a single entry
        xarr = xarr.squeeze(window_dim, drop=True)
    if store_data_locally is True:
        # save locally as nc/csv file
        xarr.to_netcdf(path=file_name)
        if meta is not None:
            meta.to_csv(meta_file)
    return xarr, meta
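

# Example (illustrative sketch): a typical call to download_data. The file paths,
# station id, statistics mapping and data_origin below are assumptions for
# demonstration, not values defined in this module.
# xarr, meta = download_data(file_name="data/DEBW107.nc", meta_file="data/DEBW107_meta.csv",
#                            station=["DEBW107"], statistics_per_var={"o3": "dma8eu", "temp": "maximum"},
#                            sampling="daily", data_origin={"o3": "", "temp": "era5_local"})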


class EmptyQueryResult(Exception):
    """Exception that is raised if a query to JOIN returns empty results."""

    pass


def get_data_with_query(opts: Dict, headers: Dict, as_json: bool = True, max_retries=5, timeout_base=60) -> bytes:
    """
    Download data from the statistics REST API. This API is based on three steps: (1) post the query and retrieve a
    job id, (2) poll the status of that id until the job is finished, (3) download the data for the job id.
    """

    url = create_url(**opts)
    response_error = None
    for retry in range(max_retries + 1):
        time.sleep(random.random())
        try:
            timeout = timeout_base * (2 ** retry)
            logging.info(f"connect (retry={retry}, timeout={timeout}) {url}")
            start_time = time.time()
            with TimeTracking(name=url):
                session = retries_session(max_retries=0)
                response = session.get(url, headers=headers, timeout=(5, 5))  # timeout=(open, read)
                while (time.time() - start_time) < timeout:
                    response = requests.get(response.json()["status"], timeout=(5, 5))
                    if response.history:
                        break
                    time.sleep(2)
                return response.content
        except Exception as e:
            time.sleep(retry)
            logging.debug(f"There was an error for request {url}: {e}")
            response_error = e
        if retry + 1 >= max_retries:
            raise EmptyQueryResult(f"There was a RetryError for request {url}: {response_error}")
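

# Example (illustrative sketch): polling the statistics REST API. The base url,
# service name and query parameters below are assumptions for demonstration only.
# raw = get_data_with_query(opts={"base": "https://toar-data.fz-juelich.de/api/v2/",
#                                 "service": "analysis/statistics",
#                                 "statistics": "dma8eu", "sampling": "daily"},
#                           headers={})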


def get_data(opts: Dict, headers: Dict, as_json: bool = True, max_retries=5, timeout_base=60) -> Union[Dict, List, str]:
    """
    Download JOIN data using the requests framework.

    Data is returned as a json-like structure. Depending on the response structure, this can be a list or a
    dictionary.

    :param opts: options to create the request url
    :param headers: additional header information like authorization, can be empty
    :param as_json: extract response as json if true (default True)

    :return: requested data (either as list or dictionary)
    """

    url = create_url(**opts)
    response_error = None
    for retry in range(max_retries + 1):
        time.sleep(random.random())
        try:
            timeout = timeout_base * (2 ** retry)
            logging.info(f"connect (retry={retry}, timeout={timeout}) {url}")
            with TimeTracking(name=url):
                session = retries_session(max_retries=0)
                response = session.get(url, headers=headers, timeout=(5, timeout))  # timeout=(open, read)
            if response.status_code == 200:
                return response.json() if as_json is True else response.text
            else:
                logging.debug(f"There was an error (STATUS {response.status_code}) for request {url}")
                response_error = f"STATUS {response.status_code}"
        except Exception as e:
            time.sleep(2 * (2 ** retry))
            logging.debug(f"There was an error for request {url}: {e}")
            response_error = e
        if retry + 1 >= max_retries:
            raise EmptyQueryResult(f"There was a RetryError for request {url}: {response_error}")
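

# Example (illustrative sketch): a single GET request with retries. The base url and
# query parameters below are assumptions for demonstration only.
# data = get_data(opts={"base": "https://join.fz-juelich.de/services/rest/surfacedata/",
#                       "service": "search", "network_name": "UBA"},
#                 headers={}, as_json=True)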


def correct_stat_name(stat: str) -> str:
    """
    Map a given statistic name to the local namespace defined by the mapping dict.

    Return the given name stat unchanged if it is not an element of the mapping namespace.

    :param stat: statistic name from the JOIN server's namespace

    :return: stat mapped to the local namespace
    """
    mapping = {'average_values': 'mean', 'maximum': 'max', 'minimum': 'min'}
    return mapping.get(stat, stat)
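

# Example: names known to the mapping are translated, all other names pass through.
# correct_stat_name("average_values")  # -> "mean"
# correct_stat_name("dma8eu")          # -> "dma8eu"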


def create_url(base: str, service: str, param_id: Union[str, int, None] = None,
               **kwargs: Union[str, int, float, None]) -> str:
    """
    Create a request url from a given base url, service type and arbitrarily many additional keyword arguments.

    :param base: base url of the rest service
    :param service: service type, e.g. series, stats
    :param param_id: id for a distinct service, is added between the trailing / of service and the ? of kwargs
    :param kwargs: keyword pairs for optional request specifications, e.g. 'statistics=maximum'

    :return: combined url as string
    """
    url = f"{base}"
    if not url.endswith("/"):
        url += "/"
    if service is not None:
        url = f"{url}{service}"
    if not url.endswith("/"):
        url += "/"
    if param_id is not None:
        url = f"{url}{param_id}"
    if len(kwargs) > 0:
        url = f"{url}?{'&'.join(f'{k}={v}' for k, v in kwargs.items() if v is not None)}"
    return url
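

# Example: create_url builds ".../service/param_id?key=value" style urls and skips
# keyword arguments whose value is None (the base url below is hypothetical).
# create_url("https://example.com/api", "stats", param_id=42, statistics="maximum", network=None)
# -> "https://example.com/api/stats/42?statistics=maximum"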


def retries_session(max_retries=5):
    # urllib3 >= 1.26 renamed `method_whitelist` to `allowed_methods` and removed the
    # old name in v2.0; try the new keyword first and fall back for older versions
    retry_kwargs = dict(total=max_retries,
                        backoff_factor=1,
                        status_forcelist=[429, 500, 502, 503, 504])
    try:
        retry_strategy = Retry(allowed_methods=["HEAD", "GET", "OPTIONS"], **retry_kwargs)
    except TypeError:
        retry_strategy = Retry(method_whitelist=["HEAD", "GET", "OPTIONS"], **retry_kwargs)
    adapter = HTTPAdapter(max_retries=retry_strategy)
    http = requests.Session()
    http.mount("https://", adapter)
    http.mount("http://", adapter)
    return http
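

# Example: a session returned by retries_session retries idempotent requests on
# transient server errors (429, 5xx) with exponential backoff (url is hypothetical).
# session = retries_session(max_retries=3)
# response = session.get("https://example.com/api/stats", timeout=(5, 60))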