Coverage for toardb/utils/utils.py: 83%
307 statements
« prev ^ index » next — coverage.py v7.11.0, created at 2025-11-03 20:32 +0000
1# SPDX-FileCopyrightText: 2021 Forschungszentrum Jülich GmbH
2# SPDX-License-Identifier: MIT
4"""
5Helper functions for TOAR database
7"""
8from sqlalchemy import Table, and_
9from sqlalchemy.orm import Session
10from sqlalchemy.inspection import inspect
11from sqlalchemy.dialects import postgresql
12from fastapi import HTTPException, Request, Header, Depends
13from starlette.datastructures import QueryParams
14from collections import namedtuple
15from copy import copy
16import requests
17import datetime as dt
18from typing import List
20from toardb.utils.settings import base_geopeas_url, userinfo_endpoint
21from toardb.utils.deployment_settings import dashboard_token
23# the following statement only if not in testing (pytest) mode!
24from toardb.utils.database import get_db
25from toardb.contacts.models import Contact, Organisation, Person
26from toardb.timeseries.models import Timeseries, TimeseriesRole, timeseries_timeseries_roles_table
27from toardb.stationmeta.models import StationmetaCore, StationmetaGlobal
28from toardb.data.models import Data
29from toardb.variables.models import Variable
30from toardb.auth_user.models import AuthUser
31from toardb.auth_user.crud import get_eduperson_and_roles
32import toardb
# Allowed role-related query parameters: every TimeseriesRole column name
# except the surrogate primary key "id" (consumed by create_filter()).
roles_params = {column.name for column in inspect(TimeseriesRole).c} - {"id"}
def get_access_rights(request: Request, access_right: str = 'admin', incr: List[int] = None, db: Session = None):
    """Check the caller's AAI authorization for the given access right.

    Args:
        request: incoming request carrying the authorization headers.
        access_right: entitlement suffix to check, e.g. ':admin' or
            ':data-download'. NOTE(review): the default 'admin' lacks the
            leading colon every caller in this module uses — confirm intended.
        incr: counters forwarded to get_eduperson_and_roles as ``DoIncr``;
            defaults to [0, 1].
        db: database session used to look up the authenticated user.

    Returns:
        dict with keys 'status_code', 'user_name', 'user_email',
        'auth_user_id' and 'role'.
    """
    # Do not use underscores; they are not valid in header attributes!
    if incr is None:
        # avoid a mutable default argument; build the default per call
        incr = [0, 1]
    user_name = ''
    user_email = ''
    auth_user_id = -1
    role = 'anonymous'
    personinfo = get_eduperson_and_roles(request=request, db=db, DoIncr=incr)
    status_code = personinfo['status_code']
    if status_code != 401:
        userinfo = personinfo['userinfo']
        role = personinfo['role']
        # download rights are granted without an explicit entitlement;
        # all other rights require the matching Helmholtz AAI entitlement
        if ("eduperson_entitlement" in userinfo and \
            f"urn:geant:helmholtz.de:res:toar-data{access_right}#login.helmholtz.de" \
            in userinfo["eduperson_entitlement"]) or access_right in [":data-download", ":map-data-download"]:
            user_name = userinfo["name"]
            user_email = userinfo["email"]
            db_user = db.query(AuthUser).filter(AuthUser.email == user_email).first()
            if db_user:
                auth_user_id = db_user.id
        else:
            status_code = 401
        # enforce per-user download quotas, when configured
        if access_right == ":data-download" and (personinfo["max_timeseries"] is not None):
            if personinfo["num_timeseries"] > personinfo["max_timeseries"]:
                status_code = 401
        if access_right == ":map-data-download" and (personinfo["max_gridded"] is not None):
            if personinfo["num_gridded"] > personinfo["max_gridded"]:
                status_code = 401
    else:
        role = 'unauthorized'
    access_dict = { "status_code": status_code,
                    "user_name": user_name,
                    "user_email": user_email,
                    "auth_user_id": auth_user_id,
                    "role": role }
    return access_dict
def get_admin_access_rights(request: Request, db: Session = Depends(get_db)):
    """FastAPI dependency: resolve access rights for the ':admin' entitlement."""
    return get_access_rights(request, access_right=':admin', db=db)
def get_station_md_change_access_rights(request: Request, db: Session = Depends(get_db)):
    """FastAPI dependency: resolve access rights for station metadata changes."""
    right = ':station-md-change'
    return get_access_rights(request, right, db=db)
def get_timeseries_md_change_access_rights(request: Request, db: Session = Depends(get_db)):
    """FastAPI dependency: resolve access rights for time-series metadata changes."""
    right = ':timeseries-md-change'
    return get_access_rights(request, right, db=db)
def get_data_change_access_rights(request: Request, db: Session = Depends(get_db)):
    """FastAPI dependency: resolve access rights for data changes."""
    return get_access_rights(request, access_right=':data-change', db=db)
def get_register_contributors_access_rights(request: Request, db: Session = Depends(get_db)):
    """FastAPI dependency: resolve access rights for registering contributors."""
    return get_access_rights(request, access_right=':contributors-register', db=db)
def get_data_download_access_rights(request: Request, db: Session = Depends(get_db)):
    """FastAPI dependency: resolve access rights for downloading data.

    Requests carrying the shared dashboard token bypass the (repeated)
    AAI authorization; the returned dict always contains the flag
    'lfromdashboard'.
    """
    from_dashboard = request.headers.get('DashboardToken') == dashboard_token
    if from_dashboard:
        access = {}
    else:
        access = get_access_rights(request, ':data-download', [1, 1], db=db)
    access['lfromdashboard'] = from_dashboard
    return access
def get_map_data_download_access_rights(request: Request, db: Session = Depends(get_db)):
    """FastAPI dependency: resolve access rights for downloading map (gridded) data."""
    # BUGFIX: the db session was not forwarded before, so get_access_rights
    # received db=None and the user lookup / quota check could not work.
    return get_access_rights(request, ':map-data-download', [2, 1], db=db)
110# function to return code for given value
# function to return code for given value
def get_str_from_value(enum_dict, value) -> str:
    """Return the code string of the vocabulary entry whose value matches *value*."""
    matches = [entry for entry in enum_dict if entry.value == value]
    return matches[0].string
def get_displaystr_from_value(enum_dict, value) -> str:
    """Return the display string of the vocabulary entry whose value matches *value*."""
    matches = [entry for entry in enum_dict if entry.value == value]
    return matches[0].display_str
117# function to return value for given code
118def get_value_from_str(enum_dict, string) -> int:
119 try:
120 return tuple(filter(lambda x: x.string == string, enum_dict))[0].value
121 except:
122 raise ValueError(f"value not known: {string}")
124# function to return value for given display string
125def get_value_from_display_str(enum_dict, string) -> int:
126 return tuple(filter(lambda x: x.display_str == string, enum_dict))[0].value
129# get human readable fields, if database field is controlled (by vocabulary)
130def get_hr_value(table_str,field,value):
131 index = None
132 for mid in ['core', 'global', 'glob', 'annotations', 'roles', 'changelog']:
133 if table_str + f'_{mid}_' + field in toardb.toardb.controlled_fields:
134 index = table_str + f'_{mid}_' + field
135 if index:
136 vocabulary = getattr(toardb.toardb, toardb.toardb.controlled_fields[index])
137 value = get_str_from_value(vocabulary,int(value))
138 return value
141# translate filters that contain controlled vocabulary
142def translate_convoc_list(values, table, display_name):
143 try:
144 return [get_value_from_str(table,v) for v in values]
145 except ValueError:
146 raise HTTPException(status_code=470, detail=f"{display_name} not known: {values}")
149# expand subdicts except for additional_metadata
150def normalize_metadata(metadata):
151 normalized_metadata = {}
152 for key, val in metadata.items():
153 if isinstance(val, dict) and val and key != "additional_metadata":
154 normalized_metadata.update(
155 {
156 f"{key}_{sub_key}": sub_val
157 for sub_key, sub_val in normalize_metadata(val).items()
158 }
159 )
160 else:
161 normalized_metadata[key] = val
162 return normalized_metadata
def pop_non_merged(query_params, to_remove=("daterange", "flags")):
    """Return a copy of *query_params* without the keys listed in *to_remove*.

    Args:
        query_params: starlette QueryParams multi-dict.
        to_remove: parameter names to drop; changed from a mutable list
            default to a tuple (same membership semantics, no shared state).
    """
    items = list(query_params.multi_items())
    filtered_items = [(k, v) for k, v in items if k not in to_remove]
    return QueryParams(filtered_items)
170#
#
def create_filter(qps, endpoint):
    """Translate HTTP query parameters into per-table SQL filter strings.

    Args:
        qps: multi-dict of query parameters (starlette QueryParams).
        endpoint: name of the REST endpoint the query was issued against;
            one of 'stationmeta', 'timeseries', 'search', 'data',
            'timeseries_merged', 'variables', 'persons'.

    Returns:
        tuple ``(limit, offset, fields, format, filters)`` where *filters*
        is a dict of SQL condition strings keyed by target table:
        t_filter (timeseries), t_r_filter (timeseries roles),
        s_c_filter (stationmeta_core), s_g_filter (stationmeta_global),
        d_filter (data), v_filter (variables), p_filter (persons).

    Raises:
        ValueError: unknown endpoint/field, malformed limit, id or date value.
        KeyError: unknown query-parameter name.
        HTTPException: status 470 (via translate_convoc_list) for values
            outside a controlled vocabulary.
    """
    # for ideas on how to create filter on special roles see:
    # https://gitlab.jsc.fz-juelich.de/esde/toar-data/toardb_fastapi/-/issues/95#note_144292

    # determine allowed query parameters (first *only* from Timeseries)
    timeseries_params = {column.name for column in inspect(Timeseries).c} | {"has_role"}
    timeseries_params = timeseries_params | {"additional_metadata-"}
    gis_params = {"bounding_box", "altitude_range"}
    # 'search' and 'timeseries' exclude the ambiguous station 'id' column
    if endpoint in ['search', 'timeseries']:
        core_params = {column.name for column in inspect(StationmetaCore).c if column.name not in ['id']}
    else:
        core_params = {column.name for column in inspect(StationmetaCore).c}
    core_params |= {"globalmeta", "station_additional_metadata-"}
    global_params = {column.name for column in inspect(StationmetaGlobal).c if column.name not in ['id','station_id']}
    data_params = {column.name for column in inspect(Data).c} | {"daterange", "format"}
    timeseries_merged_params = data_params | {"station_code", "variable_id", "id"}
    ambig_params = {"station_id", "station_changelog", "station_country", "station_additional_metadata"}
    variable_params = {column.name for column in inspect(Variable).c}
    person_params = {column.name for column in inspect(Person).c}
    allrel_params = {"limit", "offset", "fields", "format"}
    profiling_params = {"profile", "profile_format", "timing"}

    # pagination
    offset= int(qps.get("offset", 0))
    try:
        limit = int(qps.get("limit", 10))
    except:
        # a non-numeric limit is only accepted as the literal string "None"
        limit = qps.get("limit")
        if limit == "None":
            limit = None
        else:
            raise ValueError(f"Wrong value for limit given: {limit}")

    # fields and format are no filter options
    fields = qps.get("fields", "")
    format = qps.get("format", 'json')

    # build the set of parameters the chosen endpoint accepts
    allowed_params = allrel_params.copy()
    allowed_params |= profiling_params
    if endpoint in {'stationmeta'}:
        allowed_params |= gis_params | core_params | global_params
    elif endpoint in {'timeseries'}:
        allowed_params |= timeseries_params | roles_params
    elif endpoint in {'search'}:
        allowed_params |= gis_params | core_params | global_params | timeseries_params | roles_params | ambig_params
    elif endpoint in {'data'}:
        allowed_params |= data_params | profiling_params
    elif endpoint in {'timeseries_merged'}:
        allowed_params |= timeseries_merged_params | profiling_params
    elif endpoint in {'variables'}:
        allowed_params |= variable_params
    elif endpoint in {'persons'}:
        allowed_params |= person_params
    else:
        raise ValueError(f"Wrong endpoint given: {endpoint}")

    query_params = qps
    if endpoint == 'timeseries_merged':
        query_params = pop_non_merged(qps)

    # validate the requested output fields; "globalmeta" expands to all
    # stationmeta_global columns
    if fields:
        for field in fields.split(','):
            if field not in allowed_params:
                raise ValueError(f"Wrong field given: {field}")
        if fields.find("globalmeta") >= 0:
            fields = fields.replace("globalmeta","")
            fields += ','.join(global_params)

    # one filter list per target table; joined into SQL strings at the end
    t_filter = []
    t_r_filter = []
    s_c_filter = []
    s_g_filter = []
    d_filter = []
    v_filter = []
    p_filter = []
    # query_params is a multi-dict!
    for param_long in query_params:
        # strip a possible JSON path suffix ("additional_metadata->...")
        param = param_long.split('>')[0]
        if param not in allowed_params: #inform user, that an unknown parameter name was used (this could be a typo and falsify the result!)
            raise KeyError(f"An unknown argument was received: {param}.")
        if param in allrel_params or param in profiling_params:
            continue
        if param == 'station_code':
            param = 'codes'
        # multiple occurrences and comma-separated values are merged
        values = [item.strip() for v in query_params.getlist(param_long) for item in v.split(',')]
        # make sure ids are ints
        if param.endswith("id"):
            try:
                int_values = [ int(v) for v in values ]
                values = int_values
            except:
                raise ValueError(f"Wrong value (not int) given: {param}")
        # allow '+' in datestring (for adding timezone information)
        if param.endswith("date"):
            try:
                value = dt.datetime.fromisoformat(values[0])
            except:
                raise ValueError(f"Wrong value for time given: {values[0]}")
            # request package transforms blank to '+' --> return to blank
            try:
                values = [ value.replace('+',' ') for value in values ]
            except:
                pass
        if endpoint in ["stationmeta", "timeseries", "timeseries_merged", "search"] and param in core_params:
            #check for parameters of the controlled vocabulary
            if param == "timezone":
                values = translate_convoc_list(values, toardb.toardb.TZ_vocabulary, "timezone")
            elif param == "coordinate_validation_status":
                values = translate_convoc_list(values, toardb.toardb.CV_vocabulary, "coordinate validation status")
            elif param == "country":
                values = translate_convoc_list(values, toardb.toardb.CN_vocabulary, "country")
            elif param == "type":
                values = translate_convoc_list(values, toardb.toardb.ST_vocabulary, "type")
            elif param == "type_of_area":
                values = translate_convoc_list(values, toardb.toardb.TA_vocabulary, "type of area")
            elif param == "station_additional_metadata-":
                # drop the "station_" prefix, keep the JSON path
                param = f"{param_long[8:]}"
            # exceptions for special fields (codes, name)
            if param == 'codes':
                tmp_filter = []
                for v in values:
                    tmp_filter.append(f"('{v}'=ANY(stationmeta_core.codes))")
                tmp_filter = " OR ".join(tmp_filter)
                s_c_filter.append(f"({tmp_filter})")
            elif param == 'name':
                s_c_filter.append(f"LOWER(stationmeta_core.name) LIKE '%{values[0].lower()}%'")
            elif param_long.split('>')[0] == "station_additional_metadata-":
                val_mod = [ f"'\"{val}\"'::text" for val in values ]
                values = ",".join(val_mod)
                s_c_filter.append(f"to_json(stationmeta_core.{param})::text IN ({values})")
            else:
                s_c_filter.append(f"stationmeta_core.{param} IN {values}")
        elif endpoint in ["stationmeta", "search"] and param in global_params:
            if param == "climatic_zone_year2016":
                values = translate_convoc_list(values, toardb.toardb.CZ_vocabulary, "climatic zone year2016")
            elif param == "toar1_category":
                values = translate_convoc_list(values, toardb.toardb.TC_vocabulary, "TOAR-I category")
            elif param == "toar2_category":
                values = translate_convoc_list(values, toardb.toardb.TA_vocabulary, "TOAR-II category")
            elif param == "htap_region_tier1_year2010":
                values = translate_convoc_list(values, toardb.toardb.TR_vocabulary, "HTAP region TIER1 year2010")
            elif param == "dominant_landcover_year2012":
                values = translate_convoc_list(values, toardb.toardb.LC_vocabulary, "landcover type")
            elif param == "dominant_ecoregion_year2017":
                values = translate_convoc_list(values, toardb.toardb.ER_vocabulary, "ECO region type")
            s_g_filter.append(f"stationmeta_global.{param} IN {values}")
        elif endpoint in ["stationmeta", "search"] and param in gis_params:
            if param == "bounding_box":
                min_lat, min_lon, max_lat, max_lon = values
                bbox= f'SRID=4326;POLYGON (({min_lon} {min_lat}, {min_lon} {max_lat}, {max_lon} {max_lat}, {max_lon} {min_lat}, {min_lon} {min_lat}))'
                s_c_filter.append(f"ST_CONTAINS(ST_GeomFromEWKT('{bbox}'), coordinates)")
            else:
                # altitude_range --> [min, max]
                s_c_filter.append(f"ST_Z(coordinates) BETWEEN {values[0]} AND {values[1]}")
        elif endpoint in ["timeseries", "timeseries_merged", "search"]:
            #check for parameters of the controlled vocabulary
            if param == "sampling_frequency":
                values = translate_convoc_list(values, toardb.toardb.SF_vocabulary, "sampling_frequency")
            elif param == "aggregation":
                values = translate_convoc_list(values, toardb.toardb.AT_vocabulary, "aggregation")
            elif param == "data_origin_type":
                values = translate_convoc_list(values, toardb.toardb.OT_vocabulary, "data origin type")
            elif param == "data_origin":
                values = translate_convoc_list(values, toardb.toardb.DO_vocabulary, "data origin")
            elif param == "additional_metadata-":
                param = param_long
                if param == "additional_metadata->'absorption_cross_section'":
                    trlist = translate_convoc_list(values, toardb.toardb.CS_vocabulary, "absorption_cross_section")
                    values = [ str(val) for val in trlist ]
                    param = f"timeseries.{param}"
                elif param == "additional_metadata->'sampling_type'":
                    trlist = translate_convoc_list(values, toardb.toardb.KS_vocabulary, "sampling_type")
                    values = [ str(val) for val in trlist ]
                    param = f"timeseries.{param}"
                elif param == "additional_metadata->'calibration_type'":
                    trlist = translate_convoc_list(values, toardb.toardb.CT_vocabulary, "calibration_type")
                    values = [ str(val) for val in trlist ]
                    param = f"timeseries.{param}"
                else:
                    val_mod = [ f"'\"{val}\"'::text" for val in values ]
                    values = "(" + ",".join(val_mod) + ")"
                    param = f"to_json(timeseries.{param})::text"
            if param == "has_role":
                # a leading '~' negates the role match
                operator = "IN"
                join_operator = "OR"
                if (values[0][0] == '~'):
                    operator = "NOT IN"
                    join_operator = "AND"
                    values[0] = values[0][1:]
                t_r_filter.append(f"organisations.longname {operator} {values}")
                t_r_filter.append(f"organisations.name {operator} {values}")
                t_r_filter.append(f"organisations.city {operator} {values}")
                t_r_filter.append(f"organisations.homepage {operator} {values}")
                t_r_filter.append(f"organisations.contact_url {operator} {values}")
                t_r_filter.append(f"persons.email {operator} {values}")
                t_r_filter.append(f"persons.name {operator} {values}")
                t_r_filter.append(f"persons.orcid {operator} {values}")
                # NOTE: t_r_filter turns from a list into a single string here
                t_r_filter = f" {join_operator} ".join(t_r_filter)
            elif param_long.split('>')[0] == "additional_metadata-":
                t_filter.append(f"{param} IN {values}")
            else:
                t_filter.append(f"timeseries.{param} IN {values}")
        elif param in data_params:
            if param == "daterange":
                start_date = dt.datetime.fromisoformat(values[0])
                stop_date = dt.datetime.fromisoformat(values[1])
                d_filter.append(f"datetime BETWEEN '{start_date}' AND '{stop_date}'")
            elif param in ("flags", "format"):
                # translation of flags should be done in data.crud
                continue
            else:
                d_filter.append(f"data.{param} IN {values}")
        elif endpoint == "variables":
            v_filter.append(f"variables.{param} IN {values}")
        elif param in person_params:
            p_filter.append(f"persons.{param} IN {values}")

    # join each list into one SQL condition string; Python list repr brackets
    # are turned into SQL parentheses
    t_filter = " AND ".join(t_filter).replace('[','(').replace(']',')')
    t_r_filter = '(' + "".join(t_r_filter).replace('[','(').replace(']',')') + ')'
    if t_r_filter == '()':
        t_r_filter = ''
    s_c_filter = " AND ".join(s_c_filter).replace('[','(').replace(']',')')
    s_g_filter = " AND ".join(s_g_filter).replace('[','(').replace(']',')')
    d_filter = " AND ".join(d_filter).replace('[','(').replace(']',')')
    v_filter = " AND ".join(v_filter).replace('[','(').replace(']',')')
    p_filter = " AND ".join(p_filter).replace('[','(').replace(']',')')
    filters = {
        "t_filter": t_filter,
        "t_r_filter":t_r_filter,
        "s_c_filter": s_c_filter,
        "s_g_filter": s_g_filter,
        "d_filter": d_filter,
        "v_filter": v_filter,
        "p_filter": p_filter,
        }
    return limit, offset, fields, format, filters
###
# Rasdaman does not run stable!
# --> since GEO PEAS runs in DEBUG mode, its messages flood the error log file!
#
# only activate the following lines if you want to update pages!
###
# also get provenance information
geopeas_services = [ 'topography_srtm', 'ecoregion', 'stable_nightlights',
                     'climatic_zone', 'nox_emissions', 'landcover',
                     'major_road', 'population_density', 'htap_region_tier1' ]
# The live lookup below is intentionally disabled (see note above); re-enable
# it to refresh the provenance entries from the GEO PEAS services.
#provenance = {}
#for service in geopeas_services:
#    result = requests.get(f"{base_geopeas_url}/{service}/").json()
#    provenance[service] = result['provenance']
#    # major_road has different dict entries
#    tmp_provenance = copy(provenance[service])
#    # htap_region_tier1 does not provide a dict
#    try:
#        keys = [x for x in result['provenance'].keys()]
#        for key in keys:
#            if key not in ['units', 'data_source', 'citation', 'doi']:
#                result['provenance'].pop(key)
#        if len(result['provenance']) == 0:
#            provenance[service] = tmp_provenance
#    except:
#        continue
provenance = {service: "dummy text" for service in geopeas_services}