Coverage for mlair/helpers/data_sources/ifs.py: 10%

66 statements  

« prev     ^ index     » next       coverage.py v6.4.2, created at 2023-12-18 17:51 +0000

1"""Methods to load ifs data.""" 

2__author__ = "Lukas Leufen, Michael Langgut" 

3__date__ = "2023-06-07" 

4 

5import logging 

6import os 

7import re 

8import glob 

9from functools import partial 

10 

11import numpy as np 

12import pandas as pd 

13import xarray as xr 

14 

15from mlair import helpers 

16from mlair.configuration.ifs_settings import ifs_settings 

17from mlair.configuration.toar_data_v2_settings import toar_data_v2_settings 

18from mlair.helpers.data_sources.toar_data_v2 import load_station_information, combine_meta_data, correct_timezone 

19from mlair.helpers.data_sources.data_loader import EmptyQueryResult 

20from mlair.helpers.meteo import relative_humidity_from_dewpoint 

21 

22 

def load_ifs(station_name, stat_var, sampling, data_origin, lead_time_dim, initial_time_dim, target_dim,
             ifs_data_path=None, ifs_file_names=None):
    """Load IFS forecast data for a single station.

    Station coordinates are resolved via the toar-data v2 meta API, the nearest IFS grid
    point is extracted from all matching netCDF files, and the result is returned as an
    xarray object with MLAir's internal dimension names together with an emulated meta
    data frame.

    :param station_name: single station id or list with one station id
    :param stat_var: dict of requested variables (keys) and statistics (values); statistics
        are forced to "values" because only non-aggregated hourly data is supported
    :param sampling: temporal resolution, only "hourly" is supported
    :param data_origin: dict mapping variables to their origin (used for error reporting)
    :param lead_time_dim: MLAir's name for the forecast lead time dimension
    :param initial_time_dim: MLAir's name for the forecast initial time dimension
    :param target_dim: MLAir's name for the variable dimension
    :param ifs_data_path: optional path to the IFS netCDF files (defaults from ifs_settings)
    :param ifs_file_names: optional file names (defaults from ifs_settings)
    :returns: tuple of (station data, meta data frame), or (None, None) if files cannot be read
    :raises ValueError: if sampling is not "hourly"
    :raises EmptyQueryResult: if a requested variable is not available in the IFS data
    """
    # make sure station_name parameter is a list
    station_name = helpers.to_list(station_name)

    # get data path
    data_path, file_names = ifs_settings(sampling, ifs_data_path=ifs_data_path, ifs_file_names=ifs_file_names)

    # only non-aggregated (hourly) data is supported; use raw "values" for every variable
    if sampling != "hourly":
        raise ValueError(f"Given sampling {sampling} is not supported, only hourly sampling can be used.")
    stat_var = {key: "values" for key in stat_var.keys()}

    # load station meta using toar-data v2 API
    meta_url_base, headers = toar_data_v2_settings("meta")
    station_meta = load_station_information(station_name, meta_url_base, headers)

    # select data for the station's coordinates using nearest-neighbour lookup
    logging.info(f"load data for {station_meta['codes'][0]} from IFS")
    try:
        lon, lat = station_meta["coordinates"]["lng"], station_meta["coordinates"]["lat"]
        file_names = sort_ifs_files(data_path)
        with xr.open_mfdataset(file_names, preprocess=partial(preprocess_ifs_single_file, lon, lat),
                               concat_dim="initial_time", combine="nested") as data:
            station_data = data.to_array().T.compute()
    except OSError as e:
        logging.info(f"Cannot load ifs data from path {data_path} and filenames {file_names} due to: {e}")
        return None, None

    if "relhum" in stat_var:
        # relative humidity is not stored in IFS; derive it from dew point and temperature
        relhum = relative_humidity_from_dewpoint(station_data.sel(variable="d2m"), station_data.sel(variable="t2m"))
        station_data = xr.concat([station_data, relhum.expand_dims({"variable": ["rhw"]})], dim="variable")
    station_data.coords["variable"] = _rename_ifs_variables(station_data.coords["variable"].values)

    # check if all requested variables are available
    if set(stat_var).issubset(station_data.coords["variable"].values) is False:
        missing_variables = set(stat_var).difference(station_data.coords["variable"].values)
        origin = helpers.select_from_dict(data_origin, missing_variables)
        options = f"station={station_name}, origin={origin}"
        # NOTE: message previously said "in JOIN" (copy-paste from the JOIN loader); this is IFS data
        raise EmptyQueryResult(f"No data found for variables {missing_variables} and options {options} in IFS.")
    else:
        station_data = station_data.sel(variable=list(stat_var.keys()))

    # convert to local timezone
    station_data.coords["initial_time"] = correct_timezone(station_data.sel(lead_time=0).to_pandas(), station_meta,
                                                           sampling).index

    # rename lead time and initial time to MLAir's internal dimension names
    station_data = station_data.rename({"lead_time": lead_time_dim, "initial_time": initial_time_dim,
                                        "variable": target_dim})

    variable_meta = _emulate_meta_data(station_data.coords[target_dim].values)
    meta = combine_meta_data(station_meta, variable_meta)
    meta = pd.DataFrame.from_dict(meta, orient='index')
    meta.columns = station_name
    return station_data, meta

81 

82 

def sort_ifs_files(data_path, pattern="sfc_*.nc"):
    """Return all files in *data_path* matching *pattern*, sorted chronologically.

    File names are expected to contain a ``YYYYMMDD_HH`` timestamp which is used as the
    sort key. Files without such a timestamp sort first; previously the key function
    returned ``None`` for them, which made ``sorted`` raise a ``TypeError`` when
    comparing keys.

    :param data_path: directory that contains the IFS files
    :param pattern: glob pattern to select files (default "sfc_*.nc")
    :returns: chronologically sorted list of matching file paths
    """
    def sort_by_date(file_name):
        match = re.search(r'(\d{8})_(\d{2})', file_name)
        if match:
            return match.group(1), match.group(2)
        # fallback for files without a timestamp so that sorted() never compares None
        return "", ""
    file_names = glob.glob(os.path.join(data_path, pattern))
    return sorted(file_names, key=sort_by_date)

90 

91 

def preprocess_ifs_single_file(lon, lat, ds):
    """Reduce one IFS file to the grid point nearest to (lon, lat).

    The spatially reduced dataset is then reshaped so that its absolute time axis
    is expressed as an initial time plus lead times.
    """
    nearest_point = ds.sel(longitude=lon, latitude=lat, method="nearest", drop=True)
    return expand_dims_initial_time(nearest_point)

96 

97 

def expand_dims_initial_time(ds):
    """Create lead time from initial time and valid time.

    The first time step of *ds* is taken as the forecast's initial time; all time
    stamps are converted into lead times in full hours relative to it. The initial
    time becomes a new leading dimension of size one, so multiple forecast files can
    later be concatenated along "initial_time".

    :param ds: single-forecast dataset with an absolute "time" coordinate
    :returns: dataset with dimensions ("initial_time", "lead_time", ...)
    """
    initial_time = ds.time[0]
    # lead time in hours relative to the first time step of this file
    lead_time = (ds.time - initial_time) / np.timedelta64(1, "h")
    ds.coords["time"] = lead_time
    ds = ds.rename({"time": "lead_time"})
    ds = ds.expand_dims(dim={"initial_time": [initial_time.values]}, axis=0)
    return ds

107 

108 

109def _emulate_meta_data(variables): 

110 general_meta = {"sampling_frequency": "hourly", "data_origin": "model", "data_origin_type": "model"} 

111 roles_meta = {"roles": [{"contact": {"organisation": {"name": "IFS", "longname": "ECMWF"}}}]} 

112 variable_meta = {var: {"variable": {"name": var}, **roles_meta, ** general_meta} for var in variables} 

113 return variable_meta 

114 

115 

116def _rename_ifs_variables(ifs_names): 

117 mapper = {"sp": "press", "u10": "u", "v10": "v", "t2m": "temp", "d2m": "dew", "blh": "pblheight", 

118 "tcc": "cloudcover", "rhw": "relhum"} 

119 ifs_names = list(ifs_names) 

120 try: 

121 join_names = list(map(lambda x: mapper.get(x, x), ifs_names)) 

122 return join_names 

123 except KeyError as e: 

124 raise KeyError(f"Cannot map names from ifs to join naming convention: {e}")