Coverage for mlair/helpers/data_sources/toar_data.py: 86%

58 statements  

« prev     ^ index     » next       coverage.py v6.4.2, created at 2022-12-02 15:24 +0000

1__author__ = "Lukas Leufen" 

2__date__ = "2022-07-05" 

3 

4 

5from typing import Union, List, Dict 

6 

7from . import join, toar_data_v2 

8 

9import requests 

10from requests.adapters import HTTPAdapter 

11from requests.packages.urllib3.util.retry import Retry 

12import pandas as pd 

13 

14 

class EmptyQueryResult(Exception):
    """Signal that a data query did not deliver a usable (non-empty) result."""

19 

20 

def create_url(base: str, service: Union[str, None], param_id: Union[str, int, None] = None,
               **kwargs: Union[str, int, float, None]) -> str:
    """
    Create a request url with given base url, service type and arbitrarily many additional keyword arguments.

    The url is assembled as ``<base>/<service>/<param_id>?<key>=<value>&...``. Keyword arguments with value None are
    skipped. If no keyword argument with a value other than None remains, no query part (and no trailing ``?``) is
    added. Values are inserted as they are, without any url encoding.

    :param base: basic url of the rest service
    :param service: service type, e.g. series, stats (skipped if None)
    :param param_id: id for a distinct service, is added between ending / of service and ? of kwargs
    :param kwargs: keyword pairs for optional request specifications, e.g. 'statistics=maximum'

    :return: combined url as string
    """
    url = f"{base}"
    if not url.endswith("/"):
        url += "/"
    if service is not None:
        url = f"{url}{service}"
        if not url.endswith("/"):
            url += "/"
    if param_id is not None:
        url = f"{url}{param_id}"
    # build the query first so that an all-None kwargs set does not leave a dangling "?" at the end of the url
    query = "&".join(f"{k}={v}" for k, v in kwargs.items() if v is not None)
    if query:
        url = f"{url}?{query}"
    return url

45 

46 

def get_data(opts: Dict, headers: Dict, as_json: bool = True) -> Union[Dict, List, str]:
    """
    Download join data using requests framework.

    Data is returned as json like structure. Depending on the response structure, this can lead to a list or dictionary.
    If as_json is not True, the raw response text is returned instead.

    :param opts: options to create the request url (passed as keyword arguments to create_url)
    :param headers: additional headers information like authorization, can be empty
    :param as_json: extract response as json if true (default True)

    :return: requested data (either as list or dictionary, or as string if as_json is not True)

    :raises EmptyQueryResult: if the response status is not 200 or if the request ends with a RetryError
    """
    url = create_url(**opts)
    try:
        # use the session as context manager to release its pooled connections after the request
        with retries_session() as session:
            response = session.get(url, headers=headers, timeout=(5, None))  # timeout=(open, read)
    except requests.exceptions.RetryError as e:
        # keep the original error as cause for easier debugging
        raise EmptyQueryResult(f"There was an RetryError for request {url}: {e}") from e
    if response.status_code != 200:
        raise EmptyQueryResult(f"There was an error (STATUS {response.status_code}) for request {url}")
    return response.json() if as_json is True else response.text

68 

69 

def retries_session(max_retries=3):
    """
    Create a requests session that automatically retries failed requests.

    Retries are applied to HEAD, GET and OPTIONS requests that return one of the status codes 429, 500, 502, 503 or
    504, using a backoff factor of 0.1. The retry adapter is mounted for both http and https urls.

    :param max_retries: total number of retries (default 3)

    :return: session object with mounted retry adapter
    """
    retry_options = dict(total=max_retries,
                         backoff_factor=0.1,
                         status_forcelist=[429, 500, 502, 503, 504])
    methods = ["HEAD", "GET", "OPTIONS"]
    try:
        # urllib3 >= 1.26 uses allowed_methods, the former name method_whitelist is removed in urllib3 2.0
        retry_strategy = Retry(allowed_methods=methods, **retry_options)
    except TypeError:
        # fallback for urllib3 < 1.26 that only knows the former keyword
        retry_strategy = Retry(method_whitelist=methods, **retry_options)
    adapter = HTTPAdapter(max_retries=retry_strategy)
    http = requests.Session()
    http.mount("https://", adapter)
    http.mount("http://", adapter)
    return http

80 

81 

def download_toar(station, toar_stats, sampling, data_origin):
    """
    Load data from toar-data (v2) and from JOIN (toar-data v1) and combine both results.

    Both sources are queried one after the other. If a query fails with one of the tolerated errors, this source is
    treated as not available. If both sources deliver data, they are merged by merge_toar_join and the meta data of
    toar-data v2 is used. If only one source delivers data, its data and meta data are returned unchanged. If no
    source delivers data, data and meta data are None.

    :param station: station to load data for (forwarded to both download functions)
    :param toar_stats: statistics to load (forwarded to both download functions)
    :param sampling: temporal sampling (forwarded to both download functions and to the merge)
    :param data_origin: origin of data (forwarded to both download functions)

    :return: data and corresponding meta data
    """
    tolerated_errors = (AttributeError, EmptyQueryResult, KeyError, requests.ConnectionError, ValueError, IndexError)

    # first source: toar-data (v2)
    try:
        df_toar, meta_toar = toar_data_v2.download_toar(station, toar_stats, sampling=sampling,
                                                        data_origin=data_origin)
    except tolerated_errors:
        df_toar, meta_toar = None, None

    # second source: join data (toar-data v1)
    try:
        df_join, meta_join = join.download_join(station_name=station, stat_var=toar_stats, sampling=sampling,
                                                data_origin=data_origin)
    except tolerated_errors:
        df_join, meta_join = None, None

    # without toar-data v2 only join can contribute (this can be None, too)
    if df_toar is None:
        return df_join, meta_join
    # without join the toar-data v2 result is used as it is
    if df_join is None:
        return df_toar, meta_toar
    # both sources available: merge with priority on toar-data v2
    return merge_toar_join(df_toar, df_join, sampling), meta_toar

105 

106 

def merge_toar_join(df_toar, df_join, sampling):
    """
    Merge data from toar-data (v2) and JOIN on a common regular time axis with priority on toar-data.

    The time axis spans from the earliest to the latest time stamp found in either data set. Values from df_toar
    are kept, only missing values are filled with values from df_join. Only columns of df_toar are part of the
    result.

    :param df_toar: data from toar-data (v2) with a datetime index
    :param df_join: data from JOIN with a datetime index
    :param sampling: temporal resolution of the time axis, either "hourly" or "daily"

    :return: df_toar reindexed on the full time axis with gaps filled from df_join
    """
    start_date = min(df_toar.index.min(), df_join.index.min())
    end_date = max(df_toar.index.max(), df_join.index.max())
    # use offset objects instead of alias strings ("1H", "1d") as the alias "H" is deprecated since pandas 2.2
    freq = {"hourly": pd.offsets.Hour(1), "daily": pd.offsets.Day(1)}.get(sampling)
    # NOTE(review): for any other sampling freq is None and pandas decides on the frequency (presumably daily),
    # confirm if this fallback is intended
    full_time = pd.date_range(start_date, end_date, freq=freq)
    full_data = df_toar.reindex(full_time)
    # overwrite=False: only fill values that are missing in toar-data
    full_data.update(df_join, overwrite=False)
    return full_data

115 

116 

def correct_stat_name(stat: str) -> str:
    """
    Translate a statistic name into the local namespace.

    Known names are 'average_values', 'maximum' and 'minimum', which are translated to 'mean', 'max' and 'min'. Any
    other name is returned as it is.

    :param stat: namespace from JOIN server

    :return: stat mapped to local namespace
    """
    local_names = {'average_values': 'mean', 'maximum': 'max', 'minimum': 'min'}
    if stat in local_names:
        return local_names[stat]
    return stat