Coverage for mlair/helpers/data_sources/toar_data.py: 86%
58 statements
« prev ^ index » next coverage.py v6.4.2, created at 2022-12-02 15:24 +0000
1__author__ = "Lukas Leufen"
2__date__ = "2022-07-05"
5from typing import Union, List, Dict
7from . import join, toar_data_v2
9import requests
10from requests.adapters import HTTPAdapter
11from requests.packages.urllib3.util.retry import Retry
12import pandas as pd
class EmptyQueryResult(Exception):
    """Raised when a query to JOIN returns an empty result."""

    pass
def create_url(base: str, service: str, param_id: Union[str, int, None] = None,
               **kwargs: Union[str, int, float, None]) -> str:
    """
    Create a request url with given base url, service type and arbitrarily many additional keyword arguments.

    :param base: basic url of the rest service
    :param service: service type, e.g. series, stats
    :param param_id: id for a distinct service, is added between ending / of service and ? of kwargs
    :param kwargs: keyword pairs for optional request specifications, e.g. 'statistics=maximum'

    :return: combined url as string
    """
    # make sure the base part always ends with a single slash
    url = base if base.endswith("/") else f"{base}/"
    if service is not None:
        url += str(service)
        if not url.endswith("/"):
            url += "/"
    if param_id is not None:
        url += str(param_id)
    # keyword arguments with value None are dropped from the query string
    query_parts = [f"{k}={v}" for k, v in kwargs.items() if v is not None]
    if len(kwargs) > 0:
        url += f"?{'&'.join(query_parts)}"
    return url
def get_data(opts: Dict, headers: Dict, as_json: bool = True) -> Union[Dict, List, str]:
    """
    Download join data using requests framework.

    Data is returned as json like structure. Depending on the response structure, this can lead to a list or
    dictionary.

    :param opts: options to create the request url
    :param headers: additional headers information like authorization, can be empty
    :param as_json: extract response as json if true (default True)

    :return: requested data (either as list or dictionary)

    :raises EmptyQueryResult: on any non-200 status code or when all retries are exhausted
    """
    url = create_url(**opts)
    try:
        # timeout=(open, read): allow 5s to establish the connection, unlimited read time
        response = retries_session().get(url, headers=headers, timeout=(5, None))
    except requests.exceptions.RetryError as e:
        raise EmptyQueryResult(f"There was an RetryError for request {url}: {e}")
    if response.status_code == 200:
        return response.json() if as_json is True else response.text
    raise EmptyQueryResult(f"There was an error (STATUS {response.status_code}) for request {url}")
def retries_session(max_retries=3):
    """
    Create a requests session with a retry strategy mounted for http and https.

    Retries are applied to idempotent request methods (HEAD, GET, OPTIONS) on typical
    transient status codes (429, 500, 502, 503, 504) with a small exponential backoff.

    :param max_retries: total number of retries before giving up (default 3)

    :return: a :class:`requests.Session` with the retry adapter mounted
    """
    retry_kwargs = dict(total=max_retries,
                        backoff_factor=0.1,
                        status_forcelist=[429, 500, 502, 503, 504])
    try:
        # urllib3 >= 1.26 renamed ``method_whitelist`` to ``allowed_methods``; the old
        # keyword was removed entirely in urllib3 2.0, so try the new name first.
        retry_strategy = Retry(allowed_methods=["HEAD", "GET", "OPTIONS"], **retry_kwargs)
    except TypeError:
        # fall back for urllib3 < 1.26 which only knows the old keyword
        retry_strategy = Retry(method_whitelist=["HEAD", "GET", "OPTIONS"], **retry_kwargs)
    adapter = HTTPAdapter(max_retries=retry_strategy)
    http = requests.Session()
    http.mount("https://", adapter)
    http.mount("http://", adapter)
    return http
def download_toar(station, toar_stats, sampling, data_origin):
    """
    Download station data, preferring toar-data v2 and falling back to join (toar-data v1).

    Both backends are queried independently; a failing backend yields ``None`` for its
    data and meta. When both succeed, the results are merged with priority on v2.

    :param station: station to download data for
    :param toar_stats: statistics to request
    :param sampling: temporal sampling, e.g. hourly or daily
    :param data_origin: origin information passed through to the backends

    :return: tuple of (data frame, meta data), both ``None`` if neither source succeeded
    """
    # load data from toar-data (v2); treat any known failure mode as "no data"
    try:
        df_toar, meta_toar = toar_data_v2.download_toar(station, toar_stats, sampling=sampling,
                                                        data_origin=data_origin)
    except (AttributeError, EmptyQueryResult, KeyError, requests.ConnectionError, ValueError, IndexError):
        df_toar, meta_toar = None, None

    # load join data (toar-data v1)
    try:
        df_join, meta_join = join.download_join(station_name=station, stat_var=toar_stats, sampling=sampling,
                                                data_origin=data_origin)
    except (AttributeError, EmptyQueryResult, KeyError, requests.ConnectionError, ValueError, IndexError):
        df_join, meta_join = None, None

    # merge both data sources with priority on toar-data v2
    if df_toar is not None and df_join is not None:
        return merge_toar_join(df_toar, df_join, sampling), meta_toar
    if df_toar is not None:
        return df_toar, meta_toar
    return df_join, meta_join
def merge_toar_join(df_toar, df_join, sampling):
    """
    Merge data frames from both toar sources with priority on toar-data v2.

    The result spans the union of both time ranges on a regular grid at the given
    sampling frequency; entries missing in ``df_toar`` are filled from ``df_join``.

    :param df_toar: data from toar-data v2 (takes precedence)
    :param df_join: data from join / toar-data v1 (fills gaps only)
    :param sampling: either "hourly" or "daily", selects the grid frequency

    :return: merged data frame on the full regular time index
    """
    freq = {"hourly": "1H", "daily": "1d"}.get(sampling)
    start = min(df_toar.index.min(), df_join.index.min())
    end = max(df_toar.index.max(), df_join.index.max())
    merged = df_toar.reindex(pd.date_range(start, end, freq=freq))
    # overwrite=False: only NaN positions are taken from the join data
    merged.update(df_join, overwrite=False)
    return merged
def correct_stat_name(stat: str) -> str:
    """
    Map given statistic name to new namespace defined by mapping dict.

    Return given name stat if not element of mapping namespace.

    :param stat: namespace from JOIN server

    :return: stat mapped to local namespace
    """
    try:
        return {'average_values': 'mean', 'maximum': 'max', 'minimum': 'min'}[stat]
    except KeyError:
        # unknown names pass through unchanged
        return stat