Coverage for toardb/utils/utils.py: 83% (307 statements)


# SPDX-FileCopyrightText: 2021 Forschungszentrum Jülich GmbH
# SPDX-License-Identifier: MIT

"""
Helper functions for the TOAR database
"""
from sqlalchemy import Table, and_
from sqlalchemy.orm import Session
from sqlalchemy.inspection import inspect
from sqlalchemy.dialects import postgresql
from fastapi import HTTPException, Request, Header, Depends
from starlette.datastructures import QueryParams
from collections import namedtuple
from copy import copy
import requests
import datetime as dt
from typing import List

from toardb.utils.settings import base_geopeas_url, userinfo_endpoint
from toardb.utils.deployment_settings import dashboard_token

# the following imports only if not in testing (pytest) mode!
from toardb.utils.database import get_db
from toardb.contacts.models import Contact, Organisation, Person
from toardb.timeseries.models import Timeseries, TimeseriesRole, timeseries_timeseries_roles_table
from toardb.stationmeta.models import StationmetaCore, StationmetaGlobal
from toardb.data.models import Data
from toardb.variables.models import Variable
from toardb.auth_user.models import AuthUser
from toardb.auth_user.crud import get_eduperson_and_roles
import toardb

# all column names of TimeseriesRole except its primary key
roles_params = {column.name for column in inspect(TimeseriesRole).c} - {"id"}


def get_access_rights(request: Request, access_right: str = 'admin', incr: List[int] = [0, 1], db: Session = None):
    # Do not use underscores; they are not valid in header attributes!
    user_name = ''
    user_email = ''
    auth_user_id = -1
    role = 'anonymous'
    personinfo = get_eduperson_and_roles(request=request, db=db, DoIncr=incr)
    status_code = personinfo['status_code']
    if status_code != 401:
        ltoken = True
        userinfo = personinfo['userinfo']
        role = personinfo['role']
        if (("eduperson_entitlement" in userinfo
                and f"urn:geant:helmholtz.de:res:toar-data{access_right}#login.helmholtz.de"
                    in userinfo["eduperson_entitlement"])
                or access_right in [":data-download", ":map-data-download"]):
            user_name = userinfo["name"]
            user_email = userinfo["email"]
            db_user = db.query(AuthUser).filter(AuthUser.email == user_email).first()
            if db_user:
                auth_user_id = db_user.id
        else:
            status_code = 401
        if access_right == ":data-download" and (personinfo["max_timeseries"] is not None):
            if personinfo["num_timeseries"] > personinfo["max_timeseries"]:
                status_code = 401
        if access_right == ":map-data-download" and (personinfo["max_gridded"] is not None):
            if personinfo["num_gridded"] > personinfo["max_gridded"]:
                status_code = 401
    else:
        role = 'unauthorized'
    access_dict = {"status_code": status_code,
                   "user_name": user_name,
                   "user_email": user_email,
                   "auth_user_id": auth_user_id,
                   "role": role}
    return access_dict
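
# The dict returned above has a fixed shape; illustrative values for an
# authorized user (not taken from a real request):
#
#     {"status_code": 200, "user_name": "Jane Doe",
#      "user_email": "jane.doe@example.com", "auth_user_id": 7,
#      "role": "contributor"}
#
# A missing or invalid token keeps the defaults ('', -1) with role
# 'unauthorized'; a failed entitlement or quota check resets status_code to 401.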


def get_admin_access_rights(request: Request, db: Session = Depends(get_db)):
    return get_access_rights(request, ':admin', db=db)


def get_station_md_change_access_rights(request: Request, db: Session = Depends(get_db)):
    return get_access_rights(request, ':station-md-change', db=db)


def get_timeseries_md_change_access_rights(request: Request, db: Session = Depends(get_db)):
    return get_access_rights(request, ':timeseries-md-change', db=db)


def get_data_change_access_rights(request: Request, db: Session = Depends(get_db)):
    return get_access_rights(request, ':data-change', db=db)


def get_register_contributors_access_rights(request: Request, db: Session = Depends(get_db)):
    return get_access_rights(request, ':contributors-register', db=db)


def get_data_download_access_rights(request: Request, db: Session = Depends(get_db)):
    # (repeated) authorization via AAI is only needed if the request does not
    # come from the dashboard or any other service
    lfromdashboard = request.headers.get('DashboardToken') == dashboard_token
    if not lfromdashboard:
        access = get_access_rights(request, ':data-download', [1, 1], db=db)
    else:
        access = {}
    access['lfromdashboard'] = lfromdashboard
    return access


def get_map_data_download_access_rights(request: Request, db: Session = Depends(get_db)):
    # pass the session through; otherwise get_access_rights would run with db=None
    return get_access_rights(request, ':map-data-download', [2, 1], db=db)
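
# These wrappers let each router declare its access check as a FastAPI
# dependency. A minimal sketch (hypothetical route, not part of this module):
#
#     from fastapi import APIRouter
#
#     router = APIRouter()
#
#     @router.patch('/stationmeta/{station_code}')
#     def patch_station(station_code: str,
#                       access=Depends(get_station_md_change_access_rights)):
#         if access["status_code"] == 401:
#             raise HTTPException(status_code=401, detail="Not authenticated.")
#         ...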


# function to return code for given value
def get_str_from_value(enum_dict, value) -> str:
    return tuple(filter(lambda x: x.value == value, enum_dict))[0].string


def get_displaystr_from_value(enum_dict, value) -> str:
    return tuple(filter(lambda x: x.value == value, enum_dict))[0].display_str


# function to return value for given code
def get_value_from_str(enum_dict, string) -> int:
    try:
        return tuple(filter(lambda x: x.string == string, enum_dict))[0].value
    except IndexError:
        raise ValueError(f"value not known: {string}")


# function to return value for given display string
def get_value_from_display_str(enum_dict, string) -> int:
    return tuple(filter(lambda x: x.display_str == string, enum_dict))[0].value
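
# The helpers above assume vocabulary entries with .value, .string and
# .display_str attributes, as defined in toardb.toardb. A minimal sketch with
# a hypothetical vocabulary:
#
#     >>> Voc = namedtuple("Voc", ["value", "string", "display_str"])
#     >>> vocab = [Voc(0, "UTC", "UTC"), Voc(1, "CET", "Central European Time")]
#     >>> get_str_from_value(vocab, 1)
#     'CET'
#     >>> get_value_from_str(vocab, "CET")
#     1
#     >>> get_value_from_display_str(vocab, "Central European Time")
#     1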


# get human readable fields, if database field is controlled (by vocabulary)
def get_hr_value(table_str, field, value):
    index = None
    for mid in ['core', 'global', 'glob', 'annotations', 'roles', 'changelog']:
        if table_str + f'_{mid}_' + field in toardb.toardb.controlled_fields:
            index = table_str + f'_{mid}_' + field
    if index:
        vocabulary = getattr(toardb.toardb, toardb.toardb.controlled_fields[index])
        value = get_str_from_value(vocabulary, int(value))
    return value
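
# Illustrative call (hypothetical key; assumes toardb.toardb.controlled_fields
# contains 'stationmeta_core_country' mapped to the country vocabulary):
#
#     get_hr_value('stationmeta', 'country', 52)   # -> e.g. 'Germany'
#
# if no '<table>_<mid>_<field>' key matches, the value is returned unchanged.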


# translate filters that contain controlled vocabulary
def translate_convoc_list(values, table, display_name):
    try:
        return [get_value_from_str(table, v) for v in values]
    except ValueError:
        raise HTTPException(status_code=470, detail=f"{display_name} not known: {values}")
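
# Example with a hypothetical value: translate_convoc_list(['no-such-country'],
# toardb.toardb.CN_vocabulary, 'country') raises
# HTTPException(status_code=470, detail="country not known: ['no-such-country']");
# 470 is a non-standard status code this API uses for unknown vocabulary values.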


# expand subdicts except for additional_metadata
def normalize_metadata(metadata):
    normalized_metadata = {}
    for key, val in metadata.items():
        if isinstance(val, dict) and val and key != "additional_metadata":
            normalized_metadata.update(
                {
                    f"{key}_{sub_key}": sub_val
                    for sub_key, sub_val in normalize_metadata(val).items()
                }
            )
        else:
            normalized_metadata[key] = val
    return normalized_metadata
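
# A minimal sketch (hypothetical metadata):
#
#     >>> normalize_metadata({"station": {"name": "x", "coords": {"lat": 1}},
#     ...                     "additional_metadata": {"foo": "bar"}})
#     {'station_name': 'x', 'station_coords_lat': 1, 'additional_metadata': {'foo': 'bar'}}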


def pop_non_merged(query_params, to_remove=["daterange", "flags"]):
    # drop query parameters that are not valid for the merged timeseries endpoint
    items = list(query_params.multi_items())
    filtered_items = [(k, v) for k, v in items if k not in to_remove]
    return QueryParams(filtered_items)
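
# A minimal sketch:
#
#     >>> qp = QueryParams([("id", "42"), ("daterange", "2000-01-01,2001-01-01"),
#     ...                   ("flags", "OK")])
#     >>> pop_non_merged(qp).multi_items()
#     [('id', '42')]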


# build the SQL filter strings for a given endpoint from its query parameters
def create_filter(qps, endpoint):

    # for ideas on how to create a filter on special roles see:
    # https://gitlab.jsc.fz-juelich.de/esde/toar-data/toardb_fastapi/-/issues/95#note_144292

    # determine allowed query parameters (first *only* from Timeseries)
    timeseries_params = {column.name for column in inspect(Timeseries).c} | {"has_role"}
    timeseries_params = timeseries_params | {"additional_metadata-"}
    gis_params = {"bounding_box", "altitude_range"}
    if endpoint in ['search', 'timeseries']:
        core_params = {column.name for column in inspect(StationmetaCore).c if column.name not in ['id']}
    else:
        core_params = {column.name for column in inspect(StationmetaCore).c}
    core_params |= {"globalmeta", "station_additional_metadata-"}
    global_params = {column.name for column in inspect(StationmetaGlobal).c if column.name not in ['id', 'station_id']}
    data_params = {column.name for column in inspect(Data).c} | {"daterange", "format"}
    timeseries_merged_params = data_params | {"station_code", "variable_id", "id"}
    ambig_params = {"station_id", "station_changelog", "station_country", "station_additional_metadata"}
    variable_params = {column.name for column in inspect(Variable).c}
    person_params = {column.name for column in inspect(Person).c}
    allrel_params = {"limit", "offset", "fields", "format"}
    profiling_params = {"profile", "profile_format", "timing"}

    # pagination
    offset = int(qps.get("offset", 0))
    try:
        limit = int(qps.get("limit", 10))
    except ValueError:
        limit = qps.get("limit")
        if limit == "None":
            limit = None
        else:
            raise ValueError(f"Wrong value for limit given: {limit}")

    # fields and format are not filter options
    fields = qps.get("fields", "")
    format = qps.get("format", 'json')

    allowed_params = allrel_params.copy()
    allowed_params |= profiling_params
    if endpoint in {'stationmeta'}:
        allowed_params |= gis_params | core_params | global_params
    elif endpoint in {'timeseries'}:
        allowed_params |= timeseries_params | roles_params
    elif endpoint in {'search'}:
        allowed_params |= gis_params | core_params | global_params | timeseries_params | roles_params | ambig_params
    elif endpoint in {'data'}:
        allowed_params |= data_params | profiling_params
    elif endpoint in {'timeseries_merged'}:
        allowed_params |= timeseries_merged_params | profiling_params
    elif endpoint in {'variables'}:
        allowed_params |= variable_params
    elif endpoint in {'persons'}:
        allowed_params |= person_params
    else:
        raise ValueError(f"Wrong endpoint given: {endpoint}")

    query_params = qps
    if endpoint == 'timeseries_merged':
        query_params = pop_non_merged(qps)

    if fields:
        for field in fields.split(','):
            if field not in allowed_params:
                raise ValueError(f"Wrong field given: {field}")
        if fields.find("globalmeta") >= 0:
            fields = fields.replace("globalmeta", "")
            fields += ','.join(global_params)

    t_filter = []
    t_r_filter = []
    s_c_filter = []
    s_g_filter = []
    d_filter = []
    v_filter = []
    p_filter = []
    # query_params is a multi-dict!
    for param_long in query_params:
        param = param_long.split('>')[0]
        if param not in allowed_params:
            # inform the user that an unknown parameter name was used
            # (this could be a typo and would falsify the result!)
            raise KeyError(f"An unknown argument was received: {param}.")
        if param in allrel_params or param in profiling_params:
            continue
        if param == 'station_code':
            param = 'codes'
        values = [item.strip() for v in query_params.getlist(param_long) for item in v.split(',')]
        # make sure ids are ints
        if param.endswith("id"):
            try:
                int_values = [int(v) for v in values]
                values = int_values
            except ValueError:
                raise ValueError(f"Wrong value (not int) given: {param}")
        # allow '+' in datestring (for adding timezone information); validation only
        if param.endswith("date"):
            try:
                value = dt.datetime.fromisoformat(values[0])
            except ValueError:
                raise ValueError(f"Wrong value for time given: {values[0]}")
        # the requests package transforms a blank to '+' --> return it to a blank
        try:
            values = [value.replace('+', ' ') for value in values]
        except AttributeError:
            # values may already have been converted to ints
            pass
        if endpoint in ["stationmeta", "timeseries", "timeseries_merged", "search"] and param in core_params:
            # check for parameters of the controlled vocabulary
            if param == "timezone":
                values = translate_convoc_list(values, toardb.toardb.TZ_vocabulary, "timezone")
            elif param == "coordinate_validation_status":
                values = translate_convoc_list(values, toardb.toardb.CV_vocabulary, "coordinate validation status")
            elif param == "country":
                values = translate_convoc_list(values, toardb.toardb.CN_vocabulary, "country")
            elif param == "type":
                values = translate_convoc_list(values, toardb.toardb.ST_vocabulary, "type")
            elif param == "type_of_area":
                values = translate_convoc_list(values, toardb.toardb.TA_vocabulary, "type of area")
            elif param == "station_additional_metadata-":
                # strip the 'station_' prefix
                param = f"{param_long[8:]}"
            # exceptions for special fields (codes, name)
            if param == 'codes':
                tmp_filter = []
                for v in values:
                    tmp_filter.append(f"('{v}'=ANY(stationmeta_core.codes))")
                tmp_filter = " OR ".join(tmp_filter)
                s_c_filter.append(f"({tmp_filter})")
            elif param == 'name':
                s_c_filter.append(f"LOWER(stationmeta_core.name) LIKE '%{values[0].lower()}%'")
            elif param_long.split('>')[0] == "station_additional_metadata-":
                val_mod = [f"'\"{val}\"'::text" for val in values]
                values = ",".join(val_mod)
                s_c_filter.append(f"to_json(stationmeta_core.{param})::text IN ({values})")
            else:
                s_c_filter.append(f"stationmeta_core.{param} IN {values}")
        elif endpoint in ["stationmeta", "search"] and param in global_params:
            if param == "climatic_zone_year2016":
                values = translate_convoc_list(values, toardb.toardb.CZ_vocabulary, "climatic zone year2016")
            elif param == "toar1_category":
                values = translate_convoc_list(values, toardb.toardb.TC_vocabulary, "TOAR-I category")
            elif param == "toar2_category":
                values = translate_convoc_list(values, toardb.toardb.TA_vocabulary, "TOAR-II category")
            elif param == "htap_region_tier1_year2010":
                values = translate_convoc_list(values, toardb.toardb.TR_vocabulary, "HTAP region TIER1 year2010")
            elif param == "dominant_landcover_year2012":
                values = translate_convoc_list(values, toardb.toardb.LC_vocabulary, "landcover type")
            elif param == "dominant_ecoregion_year2017":
                values = translate_convoc_list(values, toardb.toardb.ER_vocabulary, "ECO region type")
            s_g_filter.append(f"stationmeta_global.{param} IN {values}")
        elif endpoint in ["stationmeta", "search"] and param in gis_params:
            if param == "bounding_box":
                min_lat, min_lon, max_lat, max_lon = values
                bbox = f'SRID=4326;POLYGON (({min_lon} {min_lat}, {min_lon} {max_lat}, {max_lon} {max_lat}, {max_lon} {min_lat}, {min_lon} {min_lat}))'
                s_c_filter.append(f"ST_CONTAINS(ST_GeomFromEWKT('{bbox}'), coordinates)")
            else:
                s_c_filter.append(f"ST_Z(coordinates) BETWEEN {values[0]} AND {values[1]}")
        elif endpoint in ["timeseries", "timeseries_merged", "search"]:
            # check for parameters of the controlled vocabulary
            if param == "sampling_frequency":
                values = translate_convoc_list(values, toardb.toardb.SF_vocabulary, "sampling_frequency")
            elif param == "aggregation":
                values = translate_convoc_list(values, toardb.toardb.AT_vocabulary, "aggregation")
            elif param == "data_origin_type":
                values = translate_convoc_list(values, toardb.toardb.OT_vocabulary, "data origin type")
            elif param == "data_origin":
                values = translate_convoc_list(values, toardb.toardb.DO_vocabulary, "data origin")
            elif param == "additional_metadata-":
                param = param_long
                if param == "additional_metadata->'absorption_cross_section'":
                    trlist = translate_convoc_list(values, toardb.toardb.CS_vocabulary, "absorption_cross_section")
                    values = [str(val) for val in trlist]
                    param = f"timeseries.{param}"
                elif param == "additional_metadata->'sampling_type'":
                    trlist = translate_convoc_list(values, toardb.toardb.KS_vocabulary, "sampling_type")
                    values = [str(val) for val in trlist]
                    param = f"timeseries.{param}"
                elif param == "additional_metadata->'calibration_type'":
                    trlist = translate_convoc_list(values, toardb.toardb.CT_vocabulary, "calibration_type")
                    values = [str(val) for val in trlist]
                    param = f"timeseries.{param}"
                else:
                    val_mod = [f"'\"{val}\"'::text" for val in values]
                    values = "(" + ",".join(val_mod) + ")"
                    param = f"to_json(timeseries.{param})::text"
            if param == "has_role":
                operator = "IN"
                join_operator = "OR"
                if values[0][0] == '~':
                    operator = "NOT IN"
                    join_operator = "AND"
                    values[0] = values[0][1:]
                t_r_filter.append(f"organisations.longname {operator} {values}")
                t_r_filter.append(f"organisations.name {operator} {values}")
                t_r_filter.append(f"organisations.city {operator} {values}")
                t_r_filter.append(f"organisations.homepage {operator} {values}")
                t_r_filter.append(f"organisations.contact_url {operator} {values}")
                t_r_filter.append(f"persons.email {operator} {values}")
                t_r_filter.append(f"persons.name {operator} {values}")
                t_r_filter.append(f"persons.orcid {operator} {values}")
                t_r_filter = f" {join_operator} ".join(t_r_filter)
            elif param_long.split('>')[0] == "additional_metadata-":
                t_filter.append(f"{param} IN {values}")
            else:
                t_filter.append(f"timeseries.{param} IN {values}")
        elif param in data_params:
            if param == "daterange":
                start_date = dt.datetime.fromisoformat(values[0])
                stop_date = dt.datetime.fromisoformat(values[1])
                d_filter.append(f"datetime BETWEEN '{start_date}' AND '{stop_date}'")
            elif param in ("flags", "format"):
                # translation of flags should be done in data.crud
                continue
            else:
                d_filter.append(f"data.{param} IN {values}")
        elif endpoint == "variables":
            v_filter.append(f"variables.{param} IN {values}")
        elif param in person_params:
            p_filter.append(f"persons.{param} IN {values}")

    # render the collected lists as SQL fragments; Python list reprs become SQL tuples
    t_filter = " AND ".join(t_filter).replace('[', '(').replace(']', ')')
    t_r_filter = '(' + "".join(t_r_filter).replace('[', '(').replace(']', ')') + ')'
    if t_r_filter == '()':
        t_r_filter = ''
    s_c_filter = " AND ".join(s_c_filter).replace('[', '(').replace(']', ')')
    s_g_filter = " AND ".join(s_g_filter).replace('[', '(').replace(']', ')')
    d_filter = " AND ".join(d_filter).replace('[', '(').replace(']', ')')
    v_filter = " AND ".join(v_filter).replace('[', '(').replace(']', ')')
    p_filter = " AND ".join(p_filter).replace('[', '(').replace(']', ')')
    filters = {
        "t_filter": t_filter,
        "t_r_filter": t_r_filter,
        "s_c_filter": s_c_filter,
        "s_g_filter": s_g_filter,
        "d_filter": d_filter,
        "v_filter": v_filter,
        "p_filter": p_filter,
    }
    return limit, offset, fields, format, filters
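
# A minimal usage sketch (hypothetical query; assumes the controlled
# vocabularies in toardb.toardb are initialized, as they are at application
# start-up):
#
#     >>> qps = QueryParams("codes=DENW053&limit=5&fields=name,country")
#     >>> limit, offset, fields, format, filters = create_filter(qps, "search")
#     >>> limit, offset
#     (5, 0)
#     >>> filters["s_c_filter"]
#     "(('DENW053'=ANY(stationmeta_core.codes)))"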


###
# Rasdaman does not run stably!
# --> since GEO PEAS runs in DEBUG mode, its messages flood the error log file!
#
# only activate the following lines if you want to update the pages!
###
# also get provenance information
geopeas_services = ['topography_srtm', 'ecoregion', 'stable_nightlights',
                    'climatic_zone', 'nox_emissions', 'landcover',
                    'major_road', 'population_density', 'htap_region_tier1']
provenance = {}
# for service in geopeas_services:
#     result = requests.get(f"{base_geopeas_url}/{service}/").json()
#     provenance[service] = result['provenance']
#     # major_road has different dict entries
#     tmp_provenance = copy(provenance[service])
#     # htap_region_tier1 does not provide a dict
#     try:
#         keys = [x for x in result['provenance'].keys()]
#         for key in keys:
#             if key not in ['units', 'data_source', 'citation', 'doi']:
#                 result['provenance'].pop(key)
#         if len(result['provenance']) == 0:
#             provenance[service] = tmp_provenance
#     except:
#         continue
for service in geopeas_services:
    provenance[service] = "dummy text"