# SPDX-FileCopyrightText: 2021 Forschungszentrum Jülich GmbH
# SPDX-License-Identifier: MIT

"""
Create, Read, Update, Delete functionality
"""
from sqlalchemy import insert, select, and_, func, text
from sqlalchemy.orm import Session, load_only
from sqlalchemy.util import _collections
from sqlalchemy.exc import IntegrityError
from geoalchemy2.elements import WKBElement, WKTElement
from fastapi import File, UploadFile
from fastapi.responses import JSONResponse
import datetime as dt
import json
from . import models
from .models import TimeseriesChangelog, timeseries_timeseries_roles_table, \
                    timeseries_timeseries_annotations_table, s1_contributors_table
from toardb.stationmeta.models import StationmetaCore, StationmetaGlobal
from toardb.stationmeta.schemas import get_coordinates_from_geom, get_geom_from_coordinates, get_coordinates_from_string
from toardb.stationmeta.crud import get_stationmeta_by_id, get_stationmeta_core, station_id_exists, get_stationmeta_changelog
from toardb.contacts.crud import get_organisation_by_name, get_contact
from toardb.contacts.models import Organisation, Person, Contact
from toardb.contacts.schemas import ContactBase
from toardb.variables.models import Variable
from toardb.variables.crud import get_variable
from .schemas import TimeseriesCreate, TimeseriesPatch, TimeseriesRoleNoCreate, TimeseriesRoleFields
from toardb.utils.utils import get_value_from_str, get_str_from_value, create_filter, roles_params
import toardb


def clean_additional_metadata(ad_met_dict):
    # all changes are permanent (the input dict is modified in place)!
    if not isinstance(ad_met_dict, dict):
        tmp = ad_met_dict.replace('"', '\\"')
        return tmp.replace("'", '"')
    # there is a mismatch with additional_metadata
    additional_metadata = ad_met_dict
    for key, value in additional_metadata.items():
        if isinstance(value, dict):
            for key2, value2 in value.items():
                if isinstance(value2, str):
                    additional_metadata[key][key2] = value2.replace("'", "$apostroph$")
        elif isinstance(value, str):
            additional_metadata[key] = value.replace("'", "$apostroph$")
    additional_metadata = str(additional_metadata).replace('"', '\\"')
    additional_metadata = str(additional_metadata).replace("'", '"')
    additional_metadata = str(additional_metadata).replace("$apostroph$", "'")
    return additional_metadata
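
# Illustrative round trip (hypothetical metadata):
#   clean_additional_metadata({'method': "O'Brien"})
# masks apostrophes with $apostroph$, rewrites the dict repr with double quotes,
# then restores them, returning the JSON-style string {"method": "O'Brien"}.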


def get_timeseries(db: Session, timeseries_id: int, fields: str = None):
    if fields:
        fields = ','.join('"{}"'.format(word) for word in fields.split(','))
        # this command returns a tuple (which is immutable --> problems with additional_metadata and coordinates!)
        # it also could not deal with reserved words used as column names (like "order")
        resultproxy = db.execute(text(f'SELECT {fields} FROM timeseries WHERE id={timeseries_id} LIMIT 1'))
        db_object_dict = [rowproxy._asdict() for rowproxy in resultproxy][0]
        if 'additional_metadata' in fields:
            db_object_dict['additional_metadata'] = clean_additional_metadata(db_object_dict['additional_metadata'])
        db_object = models.Timeseries(**db_object_dict)
    else:
        db_object = db.query(models.Timeseries).filter(models.Timeseries.id == timeseries_id).first()
        if db_object:
            # only for internal use!
            if db_object.data_license_accepted:
                del db_object.data_license_accepted
            if db_object.dataset_approved_by_provider:
                del db_object.dataset_approved_by_provider
    if db_object:
        try:
            # there is a mismatch with additional_metadata
            db_object.additional_metadata = clean_additional_metadata(db_object.additional_metadata)
        except Exception:
            pass
        try:
            # there is also a mismatch with coordinates and additional_metadata from the station object
            if isinstance(db_object.station.coordinates, (WKBElement, WKTElement)):
                db_object.station.coordinates = get_coordinates_from_geom(db_object.station.coordinates)
            # there is a mismatch with additional_metadata
            if isinstance(db_object.station.additional_metadata, dict):
                db_object.station.additional_metadata = json.dumps(db_object.station.additional_metadata)
        except Exception:
            pass
    return db_object
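
# Illustrative calls (hypothetical id):
#   get_timeseries(db, 42)                          -> ORM object; internal flags are stripped
#   get_timeseries(db, 42, fields="id,label,order") -> object built from the selected columns only
# The fields variant is also the way to read the internal columns
# data_license_accepted and dataset_approved_by_provider explicitly
# (see patch_timeseries below).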


# https://www.reddit.com/r/flask/comments/ypqk40/default_datetimenow_not_using_current_time/
# dt.datetime.now(dt.timezone.utc) cannot be used as default value, since it would already be
# evaluated at the start of the worker process and **not** when actually accessing the data
# def get_citation(db: Session, timeseries_id: int, datetime: dt.datetime = dt.datetime.now(dt.timezone.utc)):
def get_citation(db: Session, timeseries_id: int, datetime: dt.datetime = None):
    if not datetime:
        datetime = dt.datetime.now(dt.timezone.utc)
    db_object = db.query(models.Timeseries).filter(models.Timeseries.id == timeseries_id).first()
    # there is a mismatch with additional_metadata
    PI = "unknown"
    originators = False
    attribution = None
    citation = None
    if db_object:
        pi_role = get_value_from_str(toardb.toardb.RC_vocabulary, 'PrincipalInvestigator')
        originator_role = get_value_from_str(toardb.toardb.RC_vocabulary, 'Originator')
        contributor_role = get_value_from_str(toardb.toardb.RC_vocabulary, 'Contributor')
        list_of_originators = []
        for db_role in db_object.roles:
            if (db_role.role == pi_role):
                db_contact = get_contact(db, contact_id=db_role.contact_id)
                PI = db_contact.name
            elif (db_role.role == originator_role):
                originators = True
                db_contact = get_contact(db, contact_id=db_role.contact_id)
                list_of_originators.append(db_contact.name)
            elif (db_role.role == contributor_role):
                db_contact = get_contact(db, contact_id=db_role.contact_id)
                list_of_originators.append(db_contact.name)
        list_of_data_originators = ", ".join(list_of_originators)
        # if no PI is given, the resource provider's longname should be used
        if PI == "unknown":
            role = get_value_from_str(toardb.toardb.RC_vocabulary, 'ResourceProvider')
            for db_role in db_object.roles:
                if (db_role.role == role):
                    db_contact = get_contact(db, contact_id=db_role.contact_id)
                    PI = db_contact.longname
                    if db_contact.attribution != '':
                        attribution = db_contact.attribution
        var = get_variable(db, variable_id=db_object.variable_id).name
        station = get_stationmeta_by_id(db, station_id=db_object.station_id).name
        dataset_version = db_object.provider_version.strip()
        dataset_doi = db_object.doi.strip()
        citation = f"{PI}: time series of {var} at {station}, accessed from the TOAR database on {datetime}"
        if attribution and list_of_data_originators:
            attribution = attribution.format(orga_or_origs="data originators" if originators else "contributing organisations", list_of_data_originators=list_of_data_originators)
        if dataset_version != 'N/A':
            citation += f", original dataset version {dataset_version}"
        if dataset_doi != '':
            citation += f", original dataset doi: {dataset_doi}"
    license_txt = "This data is published under a Creative Commons Attribution 4.0 International (CC BY 4.0). https://creativecommons.org/licenses/by/4.0/"
    return {"attribution": attribution, "citation": citation, "license": license_txt}
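
# Shape of the returned mapping (illustrative values):
#   {"attribution": None,
#    "citation": "Doe, Jane: time series of o3 at Example Station, accessed from the TOAR database on 2021-06-01 ...",
#    "license": "This data is published under a Creative Commons Attribution 4.0 International (CC BY 4.0). ..."}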


def adapt_db_object(db_object, db, fields=False, lconstr_roles=False):
    if fields:
        db_object = dict(zip((field for field in fields if field not in {"station_changelog", "changelog"}), db_object))

        # there is a mismatch with coordinates and additional_metadata
        if "coordinates" in db_object:
            db_object["coordinates"] = get_coordinates_from_string(db_object["coordinates"])

        if "additional_metadata" in db_object:
            db_object["additional_metadata"] = clean_additional_metadata(db_object["additional_metadata"])

        if "station_id" in db_object:
            station_id = {"id": db_object["station_id"]}
            db_object["station"] = station_id
            del db_object["station_id"]

        if "variable_id" in db_object:
            variable_id = {"id": db_object["variable_id"]}
            db_object["variable"] = variable_id
            del db_object["variable_id"]

        if "changelog" in db_object:
            db_object["changelog"] = get_timeseries_changelog(db, db_object["id"])

        if "station_changelog" in db_object:
            try:
                db_object["station_changelog"] = get_stationmeta_changelog(db, db_object["station_id"])
            except Exception:
                pass

        if lconstr_roles:
            # example of how to put the roles explicitly (not needed at the moment):
            # organisation = get_contact(db, contact_id=39)
            # roles_atts["contact"] = {"id": 39, "organisation": organisation.__dict__}
            roles_atts = {key: value for key, value in db_object.items() if key in roles_params}
            db_object = {key: value for key, value in db_object.items() if key not in roles_params}
            db_object["roles"] = TimeseriesRoleFields(**roles_atts)
    else:
        if isinstance(db_object.station.coordinates, (WKBElement, WKTElement)):
            db_object.station.coordinates = get_coordinates_from_geom(db_object.station.coordinates)
        # there is a mismatch with additional_metadata
        if isinstance(db_object.station.additional_metadata, dict):
            db_object.station.additional_metadata = json.dumps(db_object.station.additional_metadata)
        db_object.additional_metadata = clean_additional_metadata(db_object.additional_metadata)

    # internal use only
    try:
        del db_object.data_license_accepted
    except AttributeError:
        pass

    try:
        del db_object.dataset_approved_by_provider
    except AttributeError:
        pass

    return db_object


class TimeseriesQuery:
    def __init__(self, sign, query, fields, lconstr_roles):
        self.sign = sign
        self.query = query
        self.fields = fields
        self.lconstr_roles = lconstr_roles

    @staticmethod
    def aggregate(queries):
        aggregated_query = next(queries)
        for query in queries:
            if aggregated_query.fields != query.fields:
                raise ValueError("Fields of subqueries are different")
            aggregated_query = TimeseriesQuery(
                True,
                aggregated_query.query.union(query.query)
                if query.sign
                else aggregated_query.query.except_(query.query),
                aggregated_query.fields,
                aggregated_query.lconstr_roles or query.lconstr_roles,
            )
        return aggregated_query
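
    # Aggregation semantics (illustrative): for sub-queries q1 (sign=True) and
    # q2 (sign=False), aggregate(iter([q1, q2])) wraps q1.query.except_(q2.query),
    # i.e. positive signs contribute via UNION, negative signs subtract via EXCEPT.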

    @staticmethod
    def from_query_params(query_params, db, endpoint="timeseries", sign=True):
        limit, offset, fields, format, filters = create_filter(query_params, endpoint)
        t_filter = filters["t_filter"]
        t_r_filter = filters["t_r_filter"]
        s_c_filter = filters["s_c_filter"]
        s_g_filter = filters["s_g_filter"]

        if fields:
            # if only certain fields are selected, the return type is no longer an ORM object but a dict
            # sort the input fields in reverse to be sure to replace station_changelog before changelog
            fields = sorted(fields.split(","), reverse=True)
            if "role" in fields:
                fields.remove("role")
                fields.extend(roles_params)

            field_map = {
                "id": models.Timeseries.id,
                "order": models.Timeseries.order,
                "additional_metadata": models.Timeseries.additional_metadata,
                "station_id": StationmetaCore.id,
                "variable_id": Variable.id,
                "name": StationmetaCore.name,
                "coordinates": func.ST_AsText(StationmetaCore.coordinates),
                "station_country": StationmetaCore.country,
                "station_additional_metadata": StationmetaCore.additional_metadata
            }

            query_select = [field_map.get(field, text(field)) for field in fields]

        else:
            query_select = [models.Timeseries]

        if 'station_id' in fields and 'id' not in fields:
            query = (
                db.query(*query_select)
                .select_from(models.Timeseries)
                .distinct()
                .filter(text(t_filter), text(s_c_filter), text(s_g_filter), text(t_r_filter))
                .join(StationmetaCore)
                .join(StationmetaGlobal)
                .join(timeseries_timeseries_roles_table)
                .join(models.TimeseriesRole)
                .join(Contact)
                .join(Organisation)
                .join(Person)
                .join(Variable)
                .execution_options(stream_results=True)
            )
        else:
            query = (
                db.query(*query_select)
                .select_from(models.Timeseries)
                .distinct()
                .filter(text(t_filter), text(s_c_filter), text(s_g_filter), text(t_r_filter))
                .join(StationmetaCore)
                .join(StationmetaGlobal)
                .join(timeseries_timeseries_roles_table)
                .join(models.TimeseriesRole)
                .join(Contact)
                .join(Organisation)
                .join(Person)
                .join(Variable)
                .execution_options(stream_results=True)
                .order_by(models.Timeseries.id)
            )

        # apply the NOT filter with role logic
        if "NOT" in t_r_filter:
            role_ids = get_role_id_from_string(db, query_params.get("has_role")[1:])
            query = query.filter(
                ~models.Timeseries.id.in_(
                    select(timeseries_timeseries_roles_table.c.timeseries_id).where(
                        timeseries_timeseries_roles_table.c.role_id.in_(role_ids)
                    )
                )
            )

        if limit:
            query = query.limit(limit).offset(offset)

        return TimeseriesQuery(sign, query, fields, lconstr_roles=any(field in roles_params for field in fields))

    def adapt_objects(self, db):
        return [adapt_db_object(db_object, db, self.fields, self.lconstr_roles) for db_object in self.query]
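
# Typical construction (illustrative; create_filter defines which query
# parameters are understood):
#   ts_query = TimeseriesQuery.from_query_params(query_params, db, endpoint="search")
#   results = ts_query.adapt_objects(db)
# from_query_params builds the (possibly field-restricted) SELECT statement and
# adapt_objects materialises it into ORM objects or dicts via adapt_db_object.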


def search_all(db, path_params, query_params, lts=False, endpoint=None):
    if endpoint is None:
        endpoint = "timeseries" if lts else "search"

    try:
        ts_list = TimeseriesQuery.from_query_params(query_params, db, endpoint).adapt_objects(db)
        # remove duplicates
        if ts_list and isinstance(ts_list[0], dict):
            try:
                ts_set = set(json.dumps(ts, sort_keys=True) for ts in ts_list)
                return [json.loads(ts) for ts in ts_set]
            except (TypeError, ValueError):  # not correct, because duplicates are NOT removed here!
                return ts_list
        else:
            return ts_list
    except (KeyError, ValueError) as e:
        status_code = 400
        return JSONResponse(status_code=status_code, content=str(e))


def search_all_aggregation(db, path_params, signs, query_params_list, lts=False):
    endpoint = "timeseries" if lts else "search"

    try:
        ts_list = TimeseriesQuery.aggregate(
            TimeseriesQuery.from_query_params(query_params, db, endpoint, sign)
            for sign, query_params in zip(signs, query_params_list)
        ).adapt_objects(db)
        # remove duplicates
        if ts_list and isinstance(ts_list[0], dict):
            try:
                ts_set = set(json.dumps(ts, sort_keys=True) for ts in ts_list)
                return [json.loads(ts) for ts in ts_set]
            except (TypeError, ValueError):  # not correct, because duplicates are NOT removed here!
                return ts_list
        else:
            return ts_list
    except (KeyError, ValueError) as e:
        status_code = 400
        return JSONResponse(status_code=status_code, content=str(e))


def get_timeseries_by_unique_constraints(db: Session, station_id: int, variable_id: int, resource_provider: str = None,
                                         sampling_frequency: str = None, provider_version: str = None, data_origin_type: str = None,
                                         data_origin: str = None, sampling_height: float = None, label: str = None):
    """
    Criteria taken from TOAR_TG_Vol02_Data_Processing, 'Step 14: Identify Time Series'
    Criterion 14.1: id of the corresponding station
    Criterion 14.2: variable id
    Criterion 14.3: role: resource_provider (organisation)
    Criterion 14.4: sampling_frequency
    Criterion 14.5: version number
    Criterion 14.6: data_origin_type (measurement or model)
    Criterion 14.7: data origin
    Criterion 14.8: sampling height
    Criterion 14.9: data filtering procedures or other special dataset identifiers (use database field 'label')
    """
#   print("in get_timeseries_by_unique_constraints")
#   print(f"station_id: {station_id}, variable_id: {variable_id}, resource_provider: {resource_provider}, ", \
#         f"sampling_frequency: {sampling_frequency}, provider_version: {provider_version}, data_origin_type: {data_origin_type}, ", \
#         f"data_origin: {data_origin}, sampling_height: {sampling_height}, label: {label}")

    # filter for criteria 14.1 and 14.2
    ret_db_object = db.query(models.Timeseries).filter(models.Timeseries.station_id == station_id) \
                                               .filter(models.Timeseries.variable_id == variable_id).all()
    # if nothing is found: return None
    # if only a single object is found, it still has to be checked whether all criteria are fulfilled
    if len(ret_db_object) == 0:
        return None

    # filter for criterion 14.3
    if resource_provider:
        # issue with '/' in organisation longname ==> only possible with double encoding!
        resource_provider = resource_provider.replace('%2F', '/')
        role_num = get_value_from_str(toardb.toardb.RC_vocabulary, 'ResourceProvider')
        iter_obj = ret_db_object.copy()
        counter = 0
        for db_object in iter_obj:
            found = False
            for role in db_object.roles:
                # the resource provider is always an organisation!
                organisation = get_contact(db, contact_id=role.contact_id)
                if ((role_num == role.role) and (organisation.longname == resource_provider)):
                    found = True
            if not found:
                ret_db_object.pop(counter)
            else:
                counter += 1
    else:
        # time series that do have a resource_provider are not identical to those that do not!
        role_num = get_value_from_str(toardb.toardb.RC_vocabulary, 'ResourceProvider')
        iter_obj = ret_db_object.copy()
        counter = 0
        for db_object in iter_obj:
            found = False
            for role in db_object.roles:
                if (role_num == role.role):
                    found = True
            if found:
                ret_db_object.pop(counter)
            else:
                counter += 1

    # if nothing is left --> return
    # if only a single object is found, it still has to be checked whether all criteria are fulfilled
    if len(ret_db_object) == 0:
        return None

    # filter for criterion 14.4
    if sampling_frequency:
        iter_obj = ret_db_object.copy()
        counter = 0
        for db_object in iter_obj:
            if not (db_object.sampling_frequency == sampling_frequency):
                ret_db_object.pop(counter)
            else:
                counter += 1

    # if nothing is left --> return
    # if only a single object is found, it still has to be checked whether all criteria are fulfilled
    if len(ret_db_object) == 0:
        return None

    # filter for criterion 14.5
    if provider_version:
        iter_obj = ret_db_object.copy()
        counter = 0
        for db_object in iter_obj:
            if not (db_object.provider_version == provider_version):
                ret_db_object.pop(counter)
            else:
                counter += 1

    # if nothing is left --> return
    # if only a single object is found, it still has to be checked whether all criteria are fulfilled
    if len(ret_db_object) == 0:
        return None

    # filter for criterion 14.6
    if data_origin_type:
        data_origin_type_num = get_value_from_str(toardb.toardb.OT_vocabulary, data_origin_type)
        iter_obj = ret_db_object.copy()
        counter = 0
        for db_object in iter_obj:
            if not (db_object.data_origin_type == data_origin_type_num):
                ret_db_object.pop(counter)
            else:
                counter += 1

    # if nothing is left --> return
    # if only a single object is found, it still has to be checked whether all criteria are fulfilled
    if len(ret_db_object) == 0:
        return None

    # filter for criterion 14.7
    if data_origin:
        data_origin_num = get_value_from_str(toardb.toardb.DO_vocabulary, data_origin)
        iter_obj = ret_db_object.copy()
        counter = 0
        for db_object in iter_obj:
            if not (db_object.data_origin == data_origin_num):
                ret_db_object.pop(counter)
            else:
                counter += 1

    # if nothing is left --> return
    # if only a single object is found, it still has to be checked whether all criteria are fulfilled
    if len(ret_db_object) == 0:
        return None

    # filter for criterion 14.8
    if sampling_height:
        iter_obj = ret_db_object.copy()
        counter = 0
        for db_object in iter_obj:
            if not (db_object.sampling_height == sampling_height):
                ret_db_object.pop(counter)
            else:
                counter += 1

    # if nothing is left --> return
    # if only a single object is found, it still has to be checked whether all criteria are fulfilled
    if len(ret_db_object) == 0:
        return None

    # filter for criterion 14.9
    if label:
        iter_obj = ret_db_object.copy()
        counter = 0
        for db_object in iter_obj:
            if not (db_object.label == label):
                ret_db_object.pop(counter)
            else:
                counter += 1

    # check that only one object is left!
    # adapt mismatches for the return value
    if len(ret_db_object) == 0:
        ret_db_object = None
    else:
        if len(ret_db_object) == 1:
            ret_db_object = ret_db_object[0]
            # there is a mismatch with additional_metadata
            ret_db_object.additional_metadata = clean_additional_metadata(ret_db_object.additional_metadata)
            # there is also a mismatch with coordinates and additional_metadata from the station object
            if isinstance(ret_db_object.station.coordinates, (WKBElement, WKTElement)):
                ret_db_object.station.coordinates = get_coordinates_from_geom(ret_db_object.station.coordinates)
            # there is a mismatch with additional_metadata
            if isinstance(ret_db_object.station.additional_metadata, dict):
                ret_db_object.station.additional_metadata = json.dumps(ret_db_object.station.additional_metadata)
        else:
            status_code = 405
            message = "Timeseries not unique, more criteria need to be defined."
            return JSONResponse(status_code=status_code, content=message)

    return ret_db_object
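
# Illustrative lookup (hypothetical values):
#   get_timeseries_by_unique_constraints(db, station_id=17, variable_id=5,
#                                        resource_provider="Example Organisation",
#                                        sampling_frequency="Hourly")
# returns the single matching Timeseries, None if nothing matches, or a
# 405 JSONResponse if the given criteria still match more than one series.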


def get_timeseries_changelog(db: Session, timeseries_id: int):
    return db.query(models.TimeseriesChangelog).filter(models.TimeseriesChangelog.timeseries_id == timeseries_id).all()


def get_timeseries_programme(db: Session, name: str):
    return db.query(models.TimeseriesProgramme).filter(models.TimeseriesProgramme.name == name).all()


# is this internal, or should this also go to the public REST api?
# do we need this at all?
def get_role_ids_of_timeseries(db: Session, timeseries_id: int):
    db_objects = db.query(models.TimeseriesTimeseriesRoles) \
                   .filter(models.TimeseriesTimeseriesRoles.timeseries_id == timeseries_id) \
                   .all()
    return db_objects


# is this internal, or should this also go to the public REST api?
def get_unique_timeseries_role(db: Session, role: int, contact_id: int, status: int):
    db_object = db.query(models.TimeseriesRole).filter(models.TimeseriesRole.role == role) \
                                               .filter(models.TimeseriesRole.contact_id == contact_id) \
                                               .filter(models.TimeseriesRole.status == status) \
                                               .first()
    return db_object


def get_role_id_from_string(db: Session, role_string: str):
    # note: role_string is interpolated directly into the SQL statement;
    # callers must make sure it is a trusted/sanitised value
    sql_command = f"""
        SELECT distinct(r.id) FROM timeseries_timeseries_roles tr,
                                   timeseries_roles r,
                                   contacts c,
                                   organisations o,
                                   persons p
        WHERE (o.longname IN ('{role_string}') OR
               o.name IN ('{role_string}') OR
               o.city IN ('{role_string}') OR
               o.homepage IN ('{role_string}') OR
               o.contact_url IN ('{role_string}') OR
               p.email IN ('{role_string}') OR
               p.name IN ('{role_string}') OR
               p.orcid IN ('{role_string}')) AND
              tr.role_id=r.id AND
              r.contact_id=c.id AND
              c.person_id=p.id AND
              c.organisation_id=o.id
        """
    resultproxy = db.execute(text(sql_command))
    id_list = [rowproxy._asdict()['id'] for rowproxy in resultproxy]
    return id_list


# is this internal, or should this also go to the public REST api?
def get_unique_timeseries_programme(db: Session, name: str, homepage: str):
    db_object = db.query(models.TimeseriesProgramme).filter(models.TimeseriesProgramme.name == name) \
                                                    .filter(models.TimeseriesProgramme.homepage == homepage) \
                                                    .first()
    return db_object


# is this internal, or should this also go to the public REST api?
def get_unique_timeseries_annotation(db: Session, text: str, contributor_id: int):
    db_object = db.query(models.TimeseriesAnnotation).filter(models.TimeseriesAnnotation.text == text) \
                                                     .filter(models.TimeseriesAnnotation.contributor_id == contributor_id) \
                                                     .first()
    return db_object


def get_contributors_string(programmes, roles):
    # sort every section alphabetically and give each section a title
    # programmes are already unique, but they still need to be sorted
    resultp = "programmes: " + ";".join(sorted([programme.longname for programme in programmes])) if len(programmes) > 0 else ''
    # organisations might contain duplicates
    organisations = {role.contact.organisation.longname for role in roles}
    # eliminate the dummy organisation
    organisations.discard('')
    resulto = "organisations: " + ";".join(sorted(organisations)) if len(organisations) > 0 else ''
    # persons might contain duplicates
    persons = {role.contact.person.name for role in roles}
    # eliminate the dummy person
    persons.discard('')
    resultc = "persons: " + ";".join(sorted(persons)) if len(persons) > 0 else ''
    result = resultp if resultp != '' else ''
    if resulto != '':
        result = resulto if result == '' else result + '; ' + resulto
    if resultc != '':
        result = resultc if result == '' else result + '; ' + resultc
    return result
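
# Example of the assembled string (hypothetical contributors):
#   "programmes: GAW; organisations: FZJ;UBA; persons: Doe, Jane"
# Sections are separated by '; ', entries within a section by ';'.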


def get_contributors_list(db: Session, timeseries_ids, format: str = 'text'):
    # get the time series' programmes
    # join(models.Timeseries, Timeseries.programme_id == TimeseriesProgramme.id) is implicitly given due to the foreign key
    programmes = db.query(models.TimeseriesProgramme) \
                   .join(models.Timeseries) \
                   .filter(models.Timeseries.id.in_(timeseries_ids), models.TimeseriesProgramme.id != 0).all()
    # get all time series roles (with duplicates)
    roles = db.execute(select([timeseries_timeseries_roles_table]).where(timeseries_timeseries_roles_table.c.timeseries_id.in_(timeseries_ids)))
    # eliminate duplicates
    role_ids = {role.role_id for role in roles}
    roles = db.query(models.TimeseriesRole).filter(models.TimeseriesRole.id.in_(role_ids)).all()
    # return both programmes and roles
    if format == 'text':
        result = get_contributors_string(programmes, roles)
    elif format == 'json':
        result = programmes + roles
    else:
        status_code = 400
        message = f"not a valid format: {format}"
        result = JSONResponse(status_code=status_code, content=message)
    return result


def get_request_contributors(db: Session, format: str = 'text', input_handle: UploadFile = File(...)):
    f = input_handle.file
    timeseries_ids = [int(line.strip()) for line in f.readlines()]
    return get_contributors_list(db, timeseries_ids, format)


def get_registered_request_contributors(db: Session, rid, format: str = 'text'):
    try:
        timeseries_ids = db.execute(select([s1_contributors_table])
                                    .where(s1_contributors_table.c.request_id == rid)).mappings().first()['timeseries_ids']
        return get_contributors_list(db, timeseries_ids, format)
    except Exception:
        status_code = 400
        message = f"not a registered request id: {rid}"
        return JSONResponse(status_code=status_code, content=message)


def register_request_contributors(db: Session, rid, ids):
    try:
        db.execute(insert(s1_contributors_table).values(request_id=rid, timeseries_ids=ids))
        db.commit()
        status_code = 200
        message = f'{rid} successfully registered.'
    except IntegrityError as e:
        error_code = e.orig.pgcode
        if error_code == "23505":
            # unique_violation: this request id has already been registered
            status_code = 443
            message = f'{rid} already registered.'
        else:
            status_code = 442
            message = f'database error: {error_code}'
    result = JSONResponse(status_code=status_code, content=message)
    return result
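
# Illustrative round trip (hypothetical request id):
#   register_request_contributors(db, "req-2021-0001", [23, 42, 57])
# answers with status 200 on success, 443 if the request id is already
# registered, and 442 for any other database error
# (443/442 are application-specific status codes).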


# is this internal, or should this also go to the public REST api?
def get_timeseries_roles(db: Session, timeseries_id: int):
    return db.execute(select([timeseries_timeseries_roles_table]).where(timeseries_timeseries_roles_table.c.timeseries_id == timeseries_id))


# is this internal, or should this also go to the public REST api?
def get_timeseries_role_by_id(db: Session, role_id):
    return db.query(models.TimeseriesRole).filter(models.TimeseriesRole.id == role_id).first()


# is this internal, or should this also go to the public REST api?
def get_timeseries_annotations(db: Session, timeseries_id: int):
    return db.execute(select([timeseries_timeseries_annotations_table]).where(timeseries_timeseries_annotations_table.c.timeseries_id == timeseries_id))


# is this internal, or should this also go to the public REST api?
def get_timeseries_annotation_by_id(db: Session, annotation_id):
    return db.query(models.TimeseriesAnnotation).filter(models.TimeseriesAnnotation.id == annotation_id).first()


def create_timeseries(db: Session, timeseries: TimeseriesCreate, author_id: int):
    timeseries_dict = timeseries.dict()
    # no timeseries can be created if station_id or variable_id are not found in the database
    if not station_id_exists(db, timeseries.station_id):
        status_code = 440
        message = f"Station (station_id: {timeseries.station_id}) not found in database."
        return JSONResponse(status_code=status_code, content=message)
    if timeseries_dict['additional_metadata']:
        for key, value in timeseries_dict['additional_metadata'].items():
            if isinstance(value, dict):
                for key2, value2 in value.items():
                    timeseries_dict['additional_metadata'][key][key2] = value2.replace("''", "'")
            else:
                timeseries_dict['additional_metadata'][key] = value.replace("''", "'")
        if 'absorption_cross_section' in timeseries_dict['additional_metadata']:
            value = get_value_from_str(toardb.toardb.CS_vocabulary, timeseries_dict['additional_metadata']['absorption_cross_section'])
            timeseries_dict['additional_metadata']['absorption_cross_section'] = value
        if 'calibration_type' in timeseries_dict['additional_metadata']:
            value = get_value_from_str(toardb.toardb.CT_vocabulary, timeseries_dict['additional_metadata']['calibration_type'])
            timeseries_dict['additional_metadata']['calibration_type'] = value
        if 'sampling_type' in timeseries_dict['additional_metadata']:
            value = get_value_from_str(toardb.toardb.KS_vocabulary, timeseries_dict['additional_metadata']['sampling_type'])
            timeseries_dict['additional_metadata']['sampling_type'] = value
    db_variable = get_variable(db, timeseries.variable_id)
    if not db_variable:
        status_code = 441
        message = f"Variable (variable_id: {timeseries.variable_id}) not found in database."
        return JSONResponse(status_code=status_code, content=message)
    # for networks: we do not want data without a resource_provider (organisation)
    resource_provider = None
    if timeseries.roles:
        for role in timeseries.roles:
            if role.role == 'ResourceProvider':
                # the resource provider is always an organisation!
                organisation = get_contact(db, contact_id=role.contact_id)
                if organisation:
                    resource_provider = organisation.longname
                else:
                    status_code = 442
                    message = f"Resource provider (contact_id: {role.contact_id}) not found in database."
                    return JSONResponse(status_code=status_code, content=message)
    db_timeseries = get_timeseries_by_unique_constraints(db, station_id=timeseries.station_id,
                                                         variable_id=timeseries.variable_id, label=timeseries.label,
                                                         provider_version=timeseries.provider_version, resource_provider=resource_provider)
    if db_timeseries:
        if isinstance(db_timeseries, list):
            status_code = 444
            message = {"detail": {"message": "Given constraints match more than one timeseries.",
                                  "timeseries_ids": [ts.id for ts in db_timeseries]}}
            return JSONResponse(status_code=status_code, content=message)
        else:
            status_code = 443
            message = {"detail": {"message": "Timeseries already registered.", "timeseries_id": db_timeseries.id}}
            return JSONResponse(status_code=status_code, content=message)
    roles_data = timeseries_dict.pop('roles', None)
    annotations_data = timeseries_dict.pop('annotations', None)
    db_timeseries = models.Timeseries(**timeseries_dict)
    # there is a mismatch with additional_metadata
    # in the upload command, we now have: "additional_metadata": "{}"
    # but the return from this method gives (=database): "additional_metadata": {}
#   print(db_timeseries.additional_metadata)
#   db_timeseries.additional_metadata = json.loads(str(db_timeseries.additional_metadata).replace("'", '"'))
    db_timeseries.sampling_frequency = get_value_from_str(toardb.toardb.SF_vocabulary, db_timeseries.sampling_frequency)
    db_timeseries.aggregation = get_value_from_str(toardb.toardb.AT_vocabulary, db_timeseries.aggregation)
    db_timeseries.data_origin_type = get_value_from_str(toardb.toardb.OT_vocabulary, db_timeseries.data_origin_type)
    db_timeseries.data_origin = get_value_from_str(toardb.toardb.DO_vocabulary, db_timeseries.data_origin)
    db.add(db_timeseries)
    db.commit()
    db.refresh(db_timeseries)
    # get the timeseries_id
    timeseries_id = db_timeseries.id
    # store roles and update the association table
    if roles_data:
        for r in roles_data:
            db_role = models.TimeseriesRole(**r)
            db_role.role = get_value_from_str(toardb.toardb.RC_vocabulary, db_role.role)
            db_role.status = get_value_from_str(toardb.toardb.RS_vocabulary, db_role.status)
            db_object = get_unique_timeseries_role(db, db_role.role, db_role.contact_id, db_role.status)
            if db_object:
                role_id = db_object.id
            else:
                # something is going wrong here!
                # is the model TimeseriesRole correctly defined?!
                del db_role.contact
                db.add(db_role)
                db.commit()
                db.refresh(db_role)
                role_id = db_role.id
            db.execute(insert(timeseries_timeseries_roles_table).values(timeseries_id=timeseries_id, role_id=role_id))
            db.commit()
    # store annotations and update the association table
    if annotations_data:
        for a in annotations_data:
            db_annotation = models.TimeseriesAnnotation(**a)
            # check whether the annotation is already present in the database
            db_object = get_unique_timeseries_annotation(db, db_annotation.text, db_annotation.contributor_id)
            if db_object:
                annotation_id = db_object.id
            else:
                db.add(db_annotation)
                db.commit()
                db.refresh(db_annotation)
                annotation_id = db_annotation.id
            db.execute(insert(timeseries_timeseries_annotations_table).values(timeseries_id=timeseries_id, annotation_id=annotation_id))
            db.commit()
    # create a changelog entry
    type_of_change = get_value_from_str(toardb.toardb.CL_vocabulary, "Created")
    description = "time series created"
    db_changelog = TimeseriesChangelog(description=description, timeseries_id=timeseries_id, author_id=author_id, type_of_change=type_of_change,
                                       old_value='', new_value='')
    db.add(db_changelog)
    db.commit()
    status_code = 200
    message = {"detail": {"message": "New timeseries created.", "timeseries_id": db_timeseries.id}}
    return JSONResponse(status_code=status_code, content=message)
#   return db_timeseries
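
# Minimal creation sketch (hypothetical payload; the exact set of fields is
# defined by TimeseriesCreate in .schemas):
#   ts = TimeseriesCreate(station_id=17, variable_id=5, label='', ...)
#   create_timeseries(db, ts, author_id=1)
# responds with 200 and the new timeseries_id, or with 440-444 for the
# lookup/conflict failures handled above.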


def patch_timeseries(db: Session, description: str, timeseries_id: int, timeseries: TimeseriesPatch,
                     author_id: int):
    timeseries_dict = timeseries.dict()
    # check for controlled vocabulary in additional metadata
    # do this in a dictionary that always contains an additional_metadata entry
    if timeseries_dict['additional_metadata']:
        for key, value in timeseries_dict['additional_metadata'].items():
            if isinstance(value, dict):
                for key2, value2 in value.items():
                    timeseries_dict['additional_metadata'][key][key2] = value2.replace("''", "'")
            else:
                timeseries_dict['additional_metadata'][key] = value.replace("''", "'")
        if 'absorption_cross_section' in timeseries_dict['additional_metadata']:
            value = get_value_from_str(toardb.toardb.CS_vocabulary, timeseries_dict['additional_metadata']['absorption_cross_section'])
            timeseries_dict['additional_metadata']['absorption_cross_section'] = value
        if 'calibration_type' in timeseries_dict['additional_metadata']:
            value = get_value_from_str(toardb.toardb.CT_vocabulary, timeseries_dict['additional_metadata']['calibration_type'])
            timeseries_dict['additional_metadata']['calibration_type'] = value
        if 'sampling_type' in timeseries_dict['additional_metadata']:
            value = get_value_from_str(toardb.toardb.KS_vocabulary, timeseries_dict['additional_metadata']['sampling_type'])
            timeseries_dict['additional_metadata']['sampling_type'] = value
    # delete empty fields from timeseries_dict already at this point, to be able to
    # distinguish between "single value correction in metadata" and "comprehensive metadata revision"
    # (see controlled vocabulary "CL_vocabulary")
    roles_data = timeseries_dict.pop('roles', None)
    annotations_data = timeseries_dict.pop('annotations', None)
    timeseries_dict2 = {k: v for k, v in timeseries_dict.items() if v is not None}
    number_of_elements = len(timeseries_dict2)
    if roles_data:
        number_of_elements += 1
    if annotations_data:
        number_of_elements += 1
    if (number_of_elements == 1):
        type_of_change = get_value_from_str(toardb.toardb.CL_vocabulary, "SingleValue")
    else:
        type_of_change = get_value_from_str(toardb.toardb.CL_vocabulary, "Comprehensive")
    db_obj = models.Timeseries(**timeseries_dict2)
    # the sqlalchemy get would also call get_timeseries here!
    # --> therefore call it right away
    db_timeseries = get_timeseries(db, timeseries_id, fields="dataset_approved_by_provider,data_license_accepted")
    dataset_approved_by_provider = db_timeseries.dataset_approved_by_provider
    data_license_accepted = db_timeseries.data_license_accepted
    db_timeseries = get_timeseries(db, timeseries_id)
    # for some unknown reason, the format of coordinates and additional_metadata (for the timeseries, but also for its related station!) has changed!
    db.rollback()
    db_timeseries.dataset_approved_by_provider = dataset_approved_by_provider
    db_timeseries.data_license_accepted = data_license_accepted
    # still problems with coordinates and additional metadata (from the STATION!!!)...
    try:
        db_timeseries.station.coordinates = get_geom_from_coordinates(db_timeseries.station.coordinates)
    except Exception:
        pass
#   try:
#       db_timeseries.station.additional_metadata = json.loads(str(db_timeseries.station.additional_metadata).replace("'", '"'))
#   except Exception:
#       pass
    # prepare changelog entry/entries
    no_log = (description == 'NOLOG')
    if not no_log:
        old_values = {}
        new_values = {}
        for k, v in timeseries_dict2.items():
            field = str(getattr(db_timeseries, k))
            if k == 'additional_metadata':
                old_values[k] = db_timeseries.additional_metadata
            else:
                if k == "sampling_frequency":
                    old_values[k] = get_str_from_value(toardb.toardb.SF_vocabulary, int(field))
                elif k == "aggregation":
                    old_values[k] = get_str_from_value(toardb.toardb.AT_vocabulary, int(field))
                elif k == "data_origin":
                    old_values[k] = get_str_from_value(toardb.toardb.DO_vocabulary, int(field))
                elif k == "data_origin_type":
                    old_values[k] = get_str_from_value(toardb.toardb.OT_vocabulary, int(field))
                else:
                    old_values[k] = field
    for k, v in timeseries_dict2.items():
        setattr(db_timeseries, k, v)
    # there is a mismatch with additional_metadata
    # in the upload command, we now have: "additional_metadata": "{}"
    # but the return from this method gives (=database): "additional_metadata": {}
#   if timeseries_dict['additional_metadata']:
#       db_timeseries.additional_metadata = clean_additional_metadata(db_timeseries.additional_metadata)
    # do the following for every entry that uses the controlled vocabulary!
    # this should be improved!
    if db_obj.sampling_frequency:
        db_timeseries.sampling_frequency = get_value_from_str(toardb.toardb.SF_vocabulary, db_obj.sampling_frequency)
    if db_obj.aggregation:
        db_timeseries.aggregation = get_value_from_str(toardb.toardb.AT_vocabulary, db_obj.aggregation)
    if db_obj.data_origin:
        db_timeseries.data_origin = get_value_from_str(toardb.toardb.DO_vocabulary, db_obj.data_origin)
    if db_obj.data_origin_type:
        db_timeseries.data_origin_type = get_value_from_str(toardb.toardb.OT_vocabulary, db_obj.data_origin_type)
    db.add(db_timeseries)
    db.commit()
    # store roles and update the association table
    if roles_data:
        if not no_log:
            # prepare changelog entry/entries
            description = description + "; add role"
            db_old_roles = get_timeseries_roles(db, timeseries_id)
            old_roles = []
            for oldr in db_old_roles:
                old_role = get_timeseries_role_by_id(db, oldr.role_id)
                old_value = {}
                old_value['role'] = get_str_from_value(toardb.toardb.RC_vocabulary, old_role.role)
                old_value['status'] = get_str_from_value(toardb.toardb.RS_vocabulary, old_role.status)
                old_value['contact_id'] = old_role.contact_id
                old_roles.append(old_value)
            old_values['roles'] = old_roles
            new_roles = old_roles.copy()
        for r in roles_data:
            db_role = models.TimeseriesRole(**r)
            if not no_log:
                new_roles.append(r)
            db_role.role = get_value_from_str(toardb.toardb.RC_vocabulary, db_role.role)
            db_role.status = get_value_from_str(toardb.toardb.RS_vocabulary, db_role.status)
            # check whether the role is already present in the database
            db_object = get_unique_timeseries_role(db, db_role.role, db_role.contact_id, db_role.status)
            if db_object:
                role_id = db_object.id
            else:
                db.add(db_role)
                db.commit()
                db.refresh(db_role)
                role_id = db_role.id
            db.execute(insert(timeseries_timeseries_roles_table).values(timeseries_id=timeseries_id, role_id=role_id))
            db.commit()
        if not no_log:
            new_values['roles'] = new_roles
    # store annotations and update the association table
    if annotations_data:
        if not no_log:
            # prepare changelog entry/entries
            description = description + "; add annotation"
            db_old_annotations = get_timeseries_annotations(db, timeseries_id)
            old_annotations = []
            for olda in db_old_annotations:
                old_annotation = get_timeseries_annotation_by_id(db, olda.annotation_id)
                old_value = {}
                # kind uses the AK controlled vocabulary; the remaining fields
                # are assumed to be read directly from the annotation object
                old_value['kind'] = get_str_from_value(toardb.toardb.AK_vocabulary, old_annotation.kind)
                old_value['text'] = old_annotation.text
                old_value['date_added'] = str(old_annotation.date_added)
                old_value['approved'] = str(old_annotation.approved)
                old_value['contributor_id'] = str(old_annotation.contributor_id)
                old_annotations.append(old_value)
            old_values['annotations'] = old_annotations
            new_annotations = []
        for a in annotations_data:
            db_annotation = models.TimeseriesAnnotation(**a)
            # check whether the annotation is already present in the database
            if not no_log:
                new_annotations.append(a)
            db_annotation.kind = get_value_from_str(toardb.toardb.AK_vocabulary, db_annotation.kind)
            db_object = get_unique_timeseries_annotation(db, db_annotation.text, db_annotation.contributor_id)
            if db_object:
                annotation_id = db_object.id
            else:
                db.add(db_annotation)
                db.commit()
                db.refresh(db_annotation)
                annotation_id = db_annotation.id
            db.execute(insert(timeseries_timeseries_annotations_table).values(timeseries_id=timeseries_id, annotation_id=annotation_id))
            db.commit()
        if not no_log:
            new_values['annotations'] = new_annotations
    # add the patch to the changelog table
    if not no_log:
        if new_values:
            timeseries_dict2.update(new_values)
        db_changelog = TimeseriesChangelog(description=description, timeseries_id=timeseries_id, author_id=author_id, type_of_change=type_of_change,
                                           old_value=str(old_values), new_value=str(timeseries_dict2))
        db.add(db_changelog)
        db.commit()
    status_code = 200
    message = {"detail": {"message": "timeseries patched.", "timeseries_id": db_timeseries.id}}
    return JSONResponse(status_code=status_code, content=message)