Coverage for mlair/helpers/data_sources/data_loader.py: 71%
114 statements
« prev ^ index » next coverage.py v6.4.2, created at 2023-12-18 17:51 +0000
« prev ^ index » next coverage.py v6.4.2, created at 2023-12-18 17:51 +0000
1__author__ = 'Lukas Leufen'
2__date__ = '2023-06-01'
4import logging
5from typing import Dict, Union, List
6import time
7import random
9import requests
10from requests.adapters import HTTPAdapter, Retry
11# from requests.packages.urllib3.util.retry import Retry
13from mlair.helpers import filter_dict_by_value, select_from_dict, data_sources, TimeTracking
14import pandas as pd
15import xarray as xr
# Default dimension names, used as keyword defaults by download_data below.
DEFAULT_TIME_DIM = "datetime"  # dimension along which loaded data is concatenated
DEFAULT_TARGET_DIM = "variables"  # dimension name handed to the loaders as target_dim
DEFAULT_ITER_DIM = "Stations"  # dimension added to the result for the station(s)
DEFAULT_WINDOW_DIM = "window"  # dimension that is squeezed out if it has at most one entry
def download_data(file_name: str, meta_file: str, station, statistics_per_var, sampling,
                  store_data_locally=True, data_origin: Dict = None, time_dim=DEFAULT_TIME_DIM,
                  target_dim=DEFAULT_TARGET_DIM, iter_dim=DEFAULT_ITER_DIM, window_dim=DEFAULT_WINDOW_DIM,
                  era5_data_path=None, era5_file_names=None, ifs_data_path=None, ifs_file_names=None) -> \
        [xr.DataArray, pd.DataFrame]:
    """
    Download data from the TOAR database or load local era5 / ifs data.

    The variables in statistics_per_var are split by their entry in data_origin into an era5_local, an ifs and a
    toar group. Each non-empty group is loaded by its loader from data_sources, the results are concatenated along
    time_dim and the meta data of all sources is merged. If parameter store_data_locally is true, data is
    additionally stored locally using given names for file and meta file.

    :param file_name: name of file to save data to (containing full path)
    :param meta_file: name of the meta data file (also containing full path)
    :param station: station(s) to load data for, is forwarded to all loaders and used as coordinate of iter_dim
    :param statistics_per_var: statistics to load per variable (presumably a dict variable -> statistic, TODO
        confirm against caller)
    :param sampling: sampling of the data, is forwarded to all loaders
    :param store_data_locally: write data as netcdf to file_name and meta data as csv to meta_file if True
    :param data_origin: origin per variable; entries with value "era5_local" or "ifs" are loaded from local files,
        all remaining entries from toar-data. If None, all variables are requested from toar-data.
    :param time_dim: name of the time dimension, data of all sources is concatenated along this dimension
    :param target_dim: name of the variables dimension, is forwarded to all loaders
    :param iter_dim: name of the dimension that is added for the station(s)
    :param window_dim: name of the window dimension, is dropped from the result if it has at most one entry
    :param era5_data_path: forwarded to the era5 loader
    :param era5_file_names: forwarded to the era5 loader
    :param ifs_data_path: forwarded to the ifs loader
    :param ifs_file_names: forwarded to the ifs loader

    :return: downloaded data and its meta data (meta data is None if no source returned meta data)

    :raises EmptyQueryResult: if none of the sources returned data
    """
    df_era5_local, df_toar, df_ifs_local = None, None, None
    meta_era5_local, meta_toar, meta_ifs_local = None, None, None
    if data_origin is not None:
        era5_local_origin = filter_dict_by_value(data_origin, "era5_local", True)
        era5_local_stats = select_from_dict(statistics_per_var, era5_local_origin.keys())
        ifs_local_origin = filter_dict_by_value(data_origin, "ifs", True)
        ifs_local_stats = select_from_dict(statistics_per_var, ifs_local_origin.keys())
        toar_origin = filter_dict_by_value(data_origin, ["era5_local", "ifs"], False)
        toar_stats = select_from_dict(statistics_per_var, toar_origin.keys())
        # the three groups must form a complete partition of origins and statistics
        assert len(era5_local_origin) + len(toar_origin) + len(ifs_local_origin) == len(data_origin)
        assert len(era5_local_stats) + len(toar_stats) + len(ifs_local_stats) == len(statistics_per_var)
    else:
        era5_local_origin, toar_origin, ifs_local_origin = None, None, None
        era5_local_stats, toar_stats, ifs_local_stats = statistics_per_var, statistics_per_var, statistics_per_var

    # load data
    if era5_local_origin is not None and len(era5_local_stats) > 0:
        # load era5 data
        df_era5_local, meta_era5_local = data_sources.era5.load_era5(
            station_name=station, stat_var=era5_local_stats, sampling=sampling, data_origin=era5_local_origin,
            window_dim=window_dim, time_dim=time_dim, target_dim=target_dim, era5_data_path=era5_data_path,
            era5_file_names=era5_file_names)
    if ifs_local_origin is not None and len(ifs_local_stats) > 0:
        # load ifs data
        df_ifs_local, meta_ifs_local = data_sources.ifs.load_ifs(
            station_name=station, stat_var=ifs_local_stats, sampling=sampling, data_origin=ifs_local_origin,
            lead_time_dim=window_dim, initial_time_dim=time_dim, target_dim=target_dim, ifs_data_path=ifs_data_path,
            ifs_file_names=ifs_file_names)
    if toar_origin is None or len(toar_stats) > 0:
        # load combined data from toar-data (v2 & v1); this is also the only source if no data_origin is given
        df_toar, meta_toar = data_sources.toar_data.download_toar(station=station, toar_stats=toar_stats,
                                                                  sampling=sampling, data_origin=toar_origin,
                                                                  window_dim=window_dim, time_dim=time_dim,
                                                                  target_dim=target_dim)

    valid_df = [e for e in [df_era5_local, df_toar, df_ifs_local] if e is not None]
    if len(valid_df) == 0:
        raise EmptyQueryResult("No data available for era5_local, toar-data and ifs_local")
    df = xr.concat(valid_df, dim=time_dim)

    valid_meta = [e for e in [meta_era5_local, meta_toar, meta_ifs_local] if e is not None]
    if len(valid_meta) > 0:
        # earlier sources take precedence, later ones only fill missing entries
        meta = valid_meta[0]
        for e in valid_meta[1:]:
            meta = meta.combine_first(e)
        meta.loc["data_origin"] = str(data_origin)
        meta.loc["statistics_per_var"] = str(statistics_per_var)
    else:
        # no source delivered meta data, there is nothing to annotate
        meta = None

    xarr = df.expand_dims({iter_dim: station})
    if len(xarr.coords[window_dim]) <= 1:  # keep window dim only if there is more than a single entry
        xarr = xarr.squeeze(window_dim, drop=True)
    if store_data_locally is True:
        # save locally as nc/csv file
        xarr.to_netcdf(path=file_name)
        if meta is not None:
            meta.to_csv(meta_file)
        else:
            logging.warning(f"No meta data available, skip writing meta file {meta_file}")
    return xarr, meta
class EmptyQueryResult(Exception):
    """
    Signal that a data request did not deliver any usable result.

    Raised in this module if none of the data sources returned data or if a request still failed after its
    last retry.
    """
def get_data_with_query(opts: Dict, headers: Dict, as_json: bool = True, max_retries=5, timeout_base=60) -> bytes:
    """
    Download data from statistics rest api.

    This API is based on three steps: (1) post query and retrieve job id, (2) read status of id until finished,
    (3) download data with job id. The status url is polled every 2 seconds until the response was reached via a
    redirect (non-empty response.history), which is taken as the finished result. If this does not happen within
    the time budget of the current attempt, the attempt counts as failed and the whole request is repeated with a
    doubled time budget.

    :param opts: options to create the request url
    :param headers: additional headers information like authorization, can be empty
    :param as_json: not used by this function, kept for a signature parallel to get_data
    :param max_retries: number of attempts before giving up (at least one attempt is made for max_retries=0)
    :param timeout_base: time budget in seconds for the first attempt, is doubled with each retry

    :return: raw content of the final response

    :raises EmptyQueryResult: if no result could be retrieved within the allowed number of attempts
    """
    url = create_url(**opts)
    response_error = None
    for retry in range(max_retries + 1):
        time.sleep(random.random())  # small random delay before each attempt
        try:
            timeout = timeout_base * (2 ** retry)
            logging.info(f"connect (retry={retry}, timeout={timeout}) {url}")
            start_time = time.time()
            with TimeTracking(name=url):
                # close the session again as soon as the initial request is answered
                with retries_session(max_retries=0) as session:
                    response = session.get(url, headers=headers, timeout=(5, 5))  # timeout=(open, read)
                while (time.time() - start_time) < timeout:
                    response = requests.get(response.json()["status"], timeout=(5, 5))
                    if response.history:  # request was redirected, result is available
                        break
                    time.sleep(2)
                else:
                    # budget exhausted without a result: do not hand back the content of a status response,
                    # fail this attempt instead so that it is repeated with a longer time budget
                    raise TimeoutError(f"no result within {timeout}s")
                return response.content
        except Exception as e:
            logging.debug(f"There was an error for request {url}: {e}")
            response_error = e
        # the check uses retry + 1 >= max_retries, so the last index of the range is only reached for max_retries=0
        if retry + 1 >= max_retries:
            raise EmptyQueryResult(f"There was an RetryError for request {url}: {response_error}") from response_error
        time.sleep(retry)  # linear back-off, only if another attempt follows
    # only reachable if the loop body never ran (negative max_retries)
    raise EmptyQueryResult(f"There was an RetryError for request {url}: {response_error}")
def get_data(opts: Dict, headers: Dict, as_json: bool = True, max_retries=5, timeout_base=60) -> Union[Dict, List, str]:
    """
    Download join data using requests framework.

    Data is returned as json like structure. Depending on the response structure, this can lead to a list or dictionary.
    A failed attempt (exception or status code other than 200) is repeated with a doubled read timeout. After an
    exception, the next attempt is delayed by an exponentially growing back-off.

    :param opts: options to create the request url
    :param headers: additional headers information like authorization, can be empty
    :param as_json: extract response as json if true (default True), return the response text otherwise
    :param max_retries: number of attempts before giving up (at least one attempt is made for max_retries=0)
    :param timeout_base: read timeout in seconds for the first attempt, is doubled with each retry

    :return: requested data (either as list or dictionary)

    :raises EmptyQueryResult: if the request did not succeed within the allowed number of attempts
    """
    url = create_url(**opts)
    response_error = None
    for retry in range(max_retries + 1):
        time.sleep(random.random())  # small random delay before each attempt
        backoff = 0  # a plain non-200 answer is repeated without additional delay
        try:
            timeout = timeout_base * (2 ** retry)
            logging.info(f"connect (retry={retry}, timeout={timeout}) {url}")
            with TimeTracking(name=url):
                # use the session as context manager to release its connections after the attempt
                with retries_session(max_retries=0) as session:
                    response = session.get(url, headers=headers, timeout=(5, timeout))  # timeout=(open, read)
                    if response.status_code == 200:
                        return response.json() if as_json is True else response.text
                    logging.debug(f"There was an error (STATUS {response.status_code}) for request {url}")
                    response_error = f"STATUS {response.status_code}"
        except Exception as e:
            backoff = 2 * (2 ** retry)
            logging.debug(f"There was an error for request {url}: {e}")
            response_error = e
        # the check uses retry + 1 >= max_retries, so the last index of the range is only reached for max_retries=0
        if retry + 1 >= max_retries:
            # response_error is either an exception or a status string, only an exception can be the cause
            cause = response_error if isinstance(response_error, BaseException) else None
            raise EmptyQueryResult(f"There was an RetryError for request {url}: {response_error}") from cause
        time.sleep(backoff)  # wait only if another attempt follows
    # only reachable if the loop body never ran (negative max_retries)
    raise EmptyQueryResult(f"There was an RetryError for request {url}: {response_error}")
def correct_stat_name(stat: str) -> str:
    """
    Translate a statistic name into the local namespace.

    Names without a translation are passed through unchanged.

    :param stat: statistic name to translate

    :return: translated name if a translation is defined, else the given name
    """
    local_names = {'average_values': 'mean', 'maximum': 'max', 'minimum': 'min'}
    if stat in local_names:
        return local_names[stat]
    return stat
def create_url(base: str, service: str, param_id: Union[str, int, None] = None,
               **kwargs: Union[str, int, float, None]) -> str:
    """
    Create a request url with given base url, service type and arbitrarily many additional keyword arguments.

    Keyword arguments with value None are skipped. The query separator ? is only added if at least one keyword
    argument remains, so that the url never ends with a dangling ?. Values are inserted as they are, no url
    encoding is applied.

    :param base: basic url of the rest service
    :param service: service type, e.g. series, stats (skipped if None)
    :param param_id: id for a distinct service, is added between ending / of service and ? of kwargs
    :param kwargs: keyword pairs for optional request specifications, e.g. 'statistics=maximum'

    :return: combined url as string
    """
    url = f"{base}"
    if not url.endswith("/"):
        url += "/"
    if service is not None:
        url = f"{url}{service}"
        if not url.endswith("/"):
            url += "/"
    if param_id is not None:
        url = f"{url}{param_id}"
    # build the query first to see whether anything is left after dropping None values
    query = "&".join(f"{key}={value}" for key, value in kwargs.items() if value is not None)
    if query:
        url = f"{url}?{query}"
    return url
def retries_session(max_retries=5):
    """
    Create a requests session that automatically retries failed requests.

    HEAD, GET and OPTIONS requests are retried with back-off if the server answers with one of the status codes
    429, 500, 502, 503 or 504. The retry strategy is mounted for http and https urls.

    :param max_retries: total number of retries that are allowed per request

    :return: session with mounted retry strategy
    """
    retry_options = dict(total=max_retries, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504])
    retry_methods = ["HEAD", "GET", "OPTIONS"]
    try:
        retry_strategy = Retry(allowed_methods=retry_methods, **retry_options)
    except TypeError:
        # urllib3 < 1.26 does not know allowed_methods yet and requires the former name of this keyword
        retry_strategy = Retry(method_whitelist=retry_methods, **retry_options)
    adapter = HTTPAdapter(max_retries=retry_strategy)
    http = requests.Session()
    http.mount("https://", adapter)
    http.mount("http://", adapter)
    return http