Coverage for mlair/helpers/data_sources/ifs.py: 10%
66 statements
« prev ^ index » next coverage.py v6.4.2, created at 2023-12-18 17:51 +0000
1"""Methods to load ifs data."""
2__author__ = "Lukas Leufen, Michael Langgut"
3__date__ = "2023-06-07"
5import logging
6import os
7import re
8import glob
9from functools import partial
11import numpy as np
12import pandas as pd
13import xarray as xr
15from mlair import helpers
16from mlair.configuration.ifs_settings import ifs_settings
17from mlair.configuration.toar_data_v2_settings import toar_data_v2_settings
18from mlair.helpers.data_sources.toar_data_v2 import load_station_information, combine_meta_data, correct_timezone
19from mlair.helpers.data_sources.data_loader import EmptyQueryResult
20from mlair.helpers.meteo import relative_humidity_from_dewpoint
def load_ifs(station_name, stat_var, sampling, data_origin, lead_time_dim, initial_time_dim, target_dim,
             ifs_data_path=None, ifs_file_names=None):
    """Load IFS forecast data for a single station.

    Station meta data is queried from the toar-data v2 API, the grid point nearest to the
    station's coordinates is selected from the local IFS netCDF files, and the result is
    returned together with an emulated meta data frame.

    :param station_name: station id (string or single-element list)
    :param stat_var: dict with requested variable names as keys and statistics as values
    :param sampling: temporal sampling; only "hourly" is supported
    :param data_origin: dict mapping variables to their data origin (only used for error reporting)
    :param lead_time_dim: MLAir's internal name for the forecast lead time dimension
    :param initial_time_dim: MLAir's internal name for the forecast initial time dimension
    :param target_dim: MLAir's internal name for the variable dimension
    :param ifs_data_path: optional path to the IFS data files (defaults via ifs_settings)
    :param ifs_file_names: optional file names of the IFS data files (defaults via ifs_settings)
    :return: tuple of (station data array, meta data frame), or (None, None) if the files
        cannot be opened
    :raises ValueError: if sampling is not "hourly"
    :raises EmptyQueryResult: if not all requested variables are available in the data
    """
    # make sure station_name parameter is a list
    station_name = helpers.to_list(station_name)

    # get data path
    data_path, file_names = ifs_settings(sampling, ifs_data_path=ifs_data_path, ifs_file_names=ifs_file_names)

    # correct stat_var values if data is not aggregated (hourly)
    if sampling == "hourly":
        stat_var = {key: "values" for key in stat_var.keys()}
    else:
        raise ValueError(f"Given sampling {sampling} is not supported, only hourly sampling can be used.")

    # load station meta using toar-data v2 API
    meta_url_base, headers = toar_data_v2_settings("meta")
    station_meta = load_station_information(station_name, meta_url_base, headers)

    # sel data for station using sel method nearest
    logging.info(f"load data for {station_meta['codes'][0]} from IFS")
    try:
        lon, lat = station_meta["coordinates"]["lng"], station_meta["coordinates"]["lat"]
        file_names = sort_ifs_files(data_path)
        # each file holds one forecast run; runs are concatenated along a new initial_time dim
        with xr.open_mfdataset(file_names, preprocess=partial(preprocess_ifs_single_file, lon, lat),
                               concat_dim="initial_time", combine="nested") as data:
            station_data = data.to_array().T.compute()
    except OSError as e:
        logging.info(f"Cannot load ifs data from path {data_path} and filenames {file_names} due to: {e}")
        return None, None

    # relative humidity is not stored in the files; derive it from dew point (d2m) and temperature (t2m)
    if "relhum" in stat_var:
        relhum = relative_humidity_from_dewpoint(station_data.sel(variable="d2m"), station_data.sel(variable="t2m"))
        station_data = xr.concat([station_data, relhum.expand_dims({"variable": ["rhw"]})], dim="variable")
    # translate IFS short names (sp, u10, ...) into join naming convention (press, u, ...)
    station_data.coords["variable"] = _rename_ifs_variables(station_data.coords["variable"].values)

    # check if all requested variables are available
    if set(stat_var).issubset(station_data.coords["variable"].values) is False:
        missing_variables = set(stat_var).difference(station_data.coords["variable"].values)
        origin = helpers.select_from_dict(data_origin, missing_variables)
        options = f"station={station_name}, origin={origin}"
        raise EmptyQueryResult(f"No data found for variables {missing_variables} and options {options} in JOIN.")
    else:
        station_data = station_data.sel(variable=list(stat_var.keys()))

    # convert to local timezone
    station_data.coords["initial_time"] = correct_timezone(station_data.sel(lead_time=0).to_pandas(), station_meta,
                                                           sampling).index

    # rename lead time and initial time to MLAir's internal dimension names
    station_data = station_data.rename({"lead_time": lead_time_dim, "initial_time": initial_time_dim,
                                        "variable": target_dim})

    variable_meta = _emulate_meta_data(station_data.coords[target_dim].values)
    meta = combine_meta_data(station_meta, variable_meta)
    meta = pd.DataFrame.from_dict(meta, orient='index')
    meta.columns = station_name
    return station_data, meta
def sort_ifs_files(data_path, pattern="sfc_*.nc"):
    """Return all IFS files in ``data_path`` matching ``pattern``, sorted chronologically.

    File names are expected to contain a date/cycle stamp of the form ``YYYYMMDD_HH``
    (e.g. ``sfc_20230607_12.nc``); files are ordered by that stamp.

    :param data_path: directory to search for IFS files
    :param pattern: glob pattern selecting the IFS surface files
    :return: list of matching file paths sorted by (date, hour)
    """
    def sort_by_date(file_name):
        match = re.search(r'(\d{8})_(\d{2})', file_name)
        if match:
            return match.group(1), match.group(2)
        # files without a date stamp sort first; previously this returned None,
        # which made sorted() raise TypeError (tuple vs. NoneType comparison)
        return "", ""
    file_names = glob.glob(os.path.join(data_path, pattern))
    return sorted(file_names, key=sort_by_date)
def preprocess_ifs_single_file(lon, lat, ds):
    """Select the grid point nearest to (lon, lat) and transform valid time into lead time."""
    nearest_point = ds.sel(longitude=lon, latitude=lat, method="nearest", drop=True)
    return expand_dims_initial_time(nearest_point)
def expand_dims_initial_time(ds):
    """Create lead time from initial time and valid time.

    The first time stamp is taken as the initial (analysis) time, the time coordinate
    is replaced by the offset to it in whole hours, and a new leading ``initial_time``
    dimension of length one is added.
    """
    start = ds.time[0]
    hours_since_start = (ds.time - start) / np.timedelta64(1, "h")
    ds.coords["time"] = hours_since_start
    renamed = ds.rename({"time": "lead_time"})
    return renamed.expand_dims(dim={"initial_time": [start.values]}, axis=0)
109def _emulate_meta_data(variables):
110 general_meta = {"sampling_frequency": "hourly", "data_origin": "model", "data_origin_type": "model"}
111 roles_meta = {"roles": [{"contact": {"organisation": {"name": "IFS", "longname": "ECMWF"}}}]}
112 variable_meta = {var: {"variable": {"name": var}, **roles_meta, ** general_meta} for var in variables}
113 return variable_meta
116def _rename_ifs_variables(ifs_names):
117 mapper = {"sp": "press", "u10": "u", "v10": "v", "t2m": "temp", "d2m": "dew", "blh": "pblheight",
118 "tcc": "cloudcover", "rhw": "relhum"}
119 ifs_names = list(ifs_names)
120 try:
121 join_names = list(map(lambda x: mapper.get(x, x), ifs_names))
122 return join_names
123 except KeyError as e:
124 raise KeyError(f"Cannot map names from ifs to join naming convention: {e}")