Coverage for toardb/timeseries/crud.py: 79%
635 statements
coverage.py v7.11.0, created at 2025-11-03 20:32 +0000

# SPDX-FileCopyrightText: 2021 Forschungszentrum Jülich GmbH
# SPDX-License-Identifier: MIT

"""
Create, Read, Update, Delete functionality

"""
from sqlalchemy import insert, select, and_, func, text
from sqlalchemy.orm import Session, load_only
from sqlalchemy.util import _collections
from sqlalchemy.exc import IntegrityError
from geoalchemy2.elements import WKBElement, WKTElement
from fastapi import File, UploadFile
from fastapi.responses import JSONResponse
import datetime as dt
import json
from . import models
from .models import TimeseriesChangelog, timeseries_timeseries_roles_table, \
     timeseries_timeseries_annotations_table, s1_contributors_table
from toardb.stationmeta.models import StationmetaCore, StationmetaGlobal
from toardb.stationmeta.schemas import get_coordinates_from_geom, get_geom_from_coordinates, get_coordinates_from_string
from toardb.stationmeta.crud import get_stationmeta_by_id, get_stationmeta_core, station_id_exists, get_stationmeta_changelog
from toardb.contacts.crud import get_organisation_by_name, get_contact
from toardb.contacts.models import Organisation, Person, Contact
from toardb.contacts.schemas import ContactBase
from toardb.variables.models import Variable
from toardb.variables.crud import get_variable
from .schemas import TimeseriesCreate, TimeseriesPatch, TimeseriesRoleNoCreate, TimeseriesRoleFields
from toardb.utils.utils import get_value_from_str, get_str_from_value, create_filter, roles_params
import toardb


def clean_additional_metadata(ad_met_dict):
    # all changes are permanent!
    if not isinstance(ad_met_dict, dict):
        tmp = ad_met_dict.replace('"', '\\"')
        return tmp.replace("'", '"')
    # there is a mismatch with additional_metadata
    additional_metadata = ad_met_dict
    for key, value in additional_metadata.items():
        if isinstance(value, dict):
            for key2, value2 in value.items():
                if isinstance(value2, str):
                    additional_metadata[key][key2] = value2.replace("'", "$apostroph$")
        elif isinstance(value, str):
            additional_metadata[key] = value.replace("'", "$apostroph$")
    additional_metadata = str(additional_metadata).replace('"', '\\"')
    additional_metadata = str(additional_metadata).replace("'", '"')
    additional_metadata = str(additional_metadata).replace("$apostroph$", "'")
    return additional_metadata


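# --- Illustrative sketch, not part of the original module -------------------
# Minimal usage example for clean_additional_metadata; the metadata content
# below is made up. Wrapped in a function so importing this module stays
# side-effect free.
def _example_clean_additional_metadata():
    meta = {"original_units": {"since_19740101000000": "nmol mol-1"},
            "comment": "PI's remark"}
    cleaned = clean_additional_metadata(meta)
    # apostrophes survive the round trip and the result parses as JSON
    return json.loads(cleaned)

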
def get_timeseries(db: Session, timeseries_id: int, fields: str = None):
    if fields:
        fields = ','.join('"{}"'.format(word) for word in fields.split(','))
        # this command returns a tuple (which is immutable --> problems with additional_metadata and coordinates!)
        # also this command could not deal with reserved words used as column names (like "order")
        resultproxy = db.execute(text(f'SELECT {fields} FROM timeseries WHERE id={timeseries_id} LIMIT 1'))
        db_object_dict = [rowproxy._asdict() for rowproxy in resultproxy][0]
        if fields.find('additional_metadata') != -1:
            db_object_dict['additional_metadata'] = clean_additional_metadata(db_object_dict['additional_metadata'])
        db_object = models.Timeseries(**db_object_dict)
    else:
        db_object = db.query(models.Timeseries).filter(models.Timeseries.id == timeseries_id).first()
        if db_object:
            # only for internal use!
            if db_object.data_license_accepted:
                del db_object.data_license_accepted
            if db_object.dataset_approved_by_provider:
                del db_object.dataset_approved_by_provider
    if db_object:
        try:
            # there is a mismatch with additional_metadata
            db_object.additional_metadata = clean_additional_metadata(db_object.additional_metadata)
        except Exception:
            pass
        try:
            # there is also a mismatch with coordinates and additional_metadata from the station object
            if isinstance(db_object.station.coordinates, (WKBElement, WKTElement)):
                db_object.station.coordinates = get_coordinates_from_geom(db_object.station.coordinates)
            # there is a mismatch with additional_metadata
            if isinstance(db_object.station.additional_metadata, dict):
                db_object.station.additional_metadata = json.dumps(db_object.station.additional_metadata)
        except Exception:
            pass
    return db_object


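# --- Illustrative sketch, not part of the original module -------------------
# Hedged example for get_timeseries: with fields=None the full ORM object is
# returned; with a comma-separated fields string only those columns are
# selected (the timeseries id below is made up).
def _example_get_timeseries(db: Session):
    full = get_timeseries(db, timeseries_id=1)
    approved_flags = get_timeseries(db, timeseries_id=1,
                                    fields="dataset_approved_by_provider,data_license_accepted")
    return full, approved_flags

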
# https://www.reddit.com/r/flask/comments/ypqk40/default_datetimenow_not_using_current_time/
# dt.datetime.now(dt.timezone.utc) cannot be used as default value, since it will already be
# evaluated at the start of the worker process and **not** when actually accessing the data
# def get_citation(db: Session, timeseries_id: int, datetime: dt.datetime = dt.datetime.now(dt.timezone.utc)):
def get_citation(db: Session, timeseries_id: int, datetime: dt.datetime = None):
    if not datetime:
        datetime = dt.datetime.now(dt.timezone.utc)
    db_object = db.query(models.Timeseries).filter(models.Timeseries.id == timeseries_id).first()
    # there is a mismatch with additional_metadata
    PI = "unknown"
    originators = False
    attribution = None
    citation = None
    if db_object:
        pi_role = get_value_from_str(toardb.toardb.RC_vocabulary, 'PrincipalInvestigator')
        originator_role = get_value_from_str(toardb.toardb.RC_vocabulary, 'Originator')
        contributor_role = get_value_from_str(toardb.toardb.RC_vocabulary, 'Contributor')
        list_of_originators = []
        for db_role in db_object.roles:
            if db_role.role == pi_role:
                db_contact = get_contact(db, contact_id=db_role.contact_id)
                PI = db_contact.name
            elif db_role.role == originator_role:
                originators = True
                db_contact = get_contact(db, contact_id=db_role.contact_id)
                list_of_originators.append(db_contact.name)
            elif db_role.role == contributor_role:
                db_contact = get_contact(db, contact_id=db_role.contact_id)
                list_of_originators.append(db_contact.name)
        list_of_data_originators = ", ".join(list_of_originators)
        # if no PI is given, the resource provider's longname should be named
        if PI == "unknown":
            role = get_value_from_str(toardb.toardb.RC_vocabulary, 'ResourceProvider')
            for db_role in db_object.roles:
                if db_role.role == role:
                    db_contact = get_contact(db, contact_id=db_role.contact_id)
                    PI = db_contact.longname
                    if db_contact.attribution != '':
                        attribution = db_contact.attribution
        var = get_variable(db, variable_id=db_object.variable_id).name
        station = get_stationmeta_by_id(db, station_id=db_object.station_id).name
        dataset_version = db_object.provider_version.strip()
        dataset_doi = db_object.doi.strip()
        citation = f"{PI}: time series of {var} at {station}, accessed from the TOAR database on {datetime}"
        if attribution and list_of_data_originators:
            attribution = attribution.format(orga_or_origs="data originators" if originators else "contributing organisations",
                                             list_of_data_originators=list_of_data_originators)
        if dataset_version != 'N/A':
            citation += f", original dataset version {dataset_version}"
        if dataset_doi != '':
            citation += f", original dataset doi: {dataset_doi}"
    license_txt = "This data is published under a Creative Commons Attribution 4.0 International (CC BY 4.0). https://creativecommons.org/licenses/by/4.0/"
    return {"attribution": attribution, "citation": citation, "license": license_txt}


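# --- Illustrative sketch, not part of the original module -------------------
# Hedged example for get_citation: pinning the timestamp makes the citation
# string reproducible (the timeseries id is made up).
def _example_get_citation(db: Session):
    when = dt.datetime(2025, 1, 1, tzinfo=dt.timezone.utc)
    record = get_citation(db, timeseries_id=1, datetime=when)
    # record["citation"] reads like:
    # "<PI>: time series of <variable> at <station>, accessed from the TOAR
    #  database on 2025-01-01 00:00:00+00:00"
    return record

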
def adapt_db_object(db_object, db, fields=False, lconstr_roles=False):
    if fields:
        db_object = dict(zip((field for field in fields if field not in {"station_changelog", "changelog"}), db_object))

        # there is a mismatch with coordinates and additional_metadata
        if "coordinates" in db_object:
            db_object["coordinates"] = get_coordinates_from_string(db_object["coordinates"])

        if "additional_metadata" in db_object:
            db_object["additional_metadata"] = clean_additional_metadata(db_object["additional_metadata"])

        if "station_id" in db_object:
            station_id = {"id": db_object["station_id"]}
            db_object["station"] = station_id
            del db_object["station_id"]

        if "variable_id" in db_object:
            variable_id = {"id": db_object["variable_id"]}
            db_object["variable"] = variable_id
            del db_object["variable_id"]

        if "changelog" in db_object:
            db_object["changelog"] = get_timeseries_changelog(db, db_object["id"])

        if "station_changelog" in db_object:
            try:
                db_object["station_changelog"] = get_stationmeta_changelog(db, db_object["station_id"])
            except Exception:
                pass

        if lconstr_roles:
            # example of how to put the roles explicitly (not needed at the moment):
            # organisation = get_contact(db, contact_id=39)
            # roles_atts["contact"] = {"id": 39, "organisation": organisation.__dict__}
            roles_atts = {key: value for key, value in db_object.items() if key in roles_params}
            db_object = {key: value for key, value in db_object.items() if key not in roles_params}
            db_object["roles"] = TimeseriesRoleFields(**roles_atts)
    else:
        if isinstance(db_object.station.coordinates, (WKBElement, WKTElement)):
            db_object.station.coordinates = get_coordinates_from_geom(db_object.station.coordinates)
        # there is a mismatch with additional_metadata
        if isinstance(db_object.station.additional_metadata, dict):
            db_object.station.additional_metadata = json.dumps(db_object.station.additional_metadata)
        db_object.additional_metadata = clean_additional_metadata(db_object.additional_metadata)

    # internal use only!
    try:
        del db_object.data_license_accepted
    except AttributeError:
        pass

    try:
        del db_object.dataset_approved_by_provider
    except AttributeError:
        pass

    return db_object


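# --- Illustrative sketch, not part of the original module -------------------
# Hedged example for adapt_db_object in fields mode: the raw result row (a
# tuple) is zipped with the requested field names; the row values are made up.
def _example_adapt_db_object(db: Session):
    row = (7, "POINT(6.4 50.9 100)")
    adapted = adapt_db_object(row, db, fields=["id", "coordinates"], lconstr_roles=False)
    # adapted is now a dict with parsed coordinates instead of a WKT string
    return adapted

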
class TimeseriesQuery:
    def __init__(self, sign, query, fields, lconstr_roles):
        self.sign = sign
        self.query = query
        self.fields = fields
        self.lconstr_roles = lconstr_roles

    @staticmethod
    def aggregate(queries):
        aggregated_query = next(queries)
        for query in queries:
            if aggregated_query.fields != query.fields:
                raise ValueError("Fields of subqueries are different")
            aggregated_query = TimeseriesQuery(
                True,
                aggregated_query.query.union(query.query)
                if query.sign
                else aggregated_query.query.except_(query.query),
                aggregated_query.fields,
                aggregated_query.lconstr_roles or query.lconstr_roles,
            )
        return aggregated_query

    @staticmethod
    def from_query_params(query_params, db, endpoint="timeseries", sign=True):
        limit, offset, fields, format, filters = create_filter(query_params, endpoint)
        t_filter = filters["t_filter"]
        t_r_filter = filters["t_r_filter"]
        s_c_filter = filters["s_c_filter"]
        s_g_filter = filters["s_g_filter"]

        if fields:
            # if only certain fields are selected, the return type is not an ORM object anymore but a dict
            # sort input fields to be sure to replace station_changelog before changelog
            fields = sorted(fields.split(","), reverse=True)
            if "role" in fields:
                fields.remove("role")
                fields.extend(roles_params)

            field_map = {
                "id": models.Timeseries.id,
                "order": models.Timeseries.order,
                "additional_metadata": models.Timeseries.additional_metadata,
                "station_id": StationmetaCore.id,
                "variable_id": Variable.id,
                "name": StationmetaCore.name,
                "coordinates": func.ST_AsText(StationmetaCore.coordinates),
                "station_country": StationmetaCore.country,
                "station_additional_metadata": StationmetaCore.additional_metadata
            }

            query_select = [field_map.get(field, text(field)) for field in fields]

        else:
            query_select = [models.Timeseries]

        query = (
            db.query(*query_select)
            .select_from(models.Timeseries)
            .distinct()
            .filter(text(t_filter), text(s_c_filter), text(s_g_filter), text(t_r_filter))
            .join(StationmetaCore)
            .join(StationmetaGlobal)
            .join(timeseries_timeseries_roles_table)
            .join(models.TimeseriesRole)
            .join(Contact)
            .join(Organisation)
            .join(Person)
            .join(Variable)
            .execution_options(stream_results=True)
        )
        if not (fields and "station_id" in fields and "id" not in fields):
            query = query.order_by(models.Timeseries.id)

        # Apply NOT filter with role logic
        if "NOT" in t_r_filter:
            role_ids = get_role_id_from_string(db, query_params.get("has_role")[1:])
            query = query.filter(
                ~models.Timeseries.id.in_(
                    select(timeseries_timeseries_roles_table.c.timeseries_id).where(
                        timeseries_timeseries_roles_table.c.role_id.in_(role_ids)
                    )
                )
            )

        if limit:
            query = query.limit(limit).offset(offset)

        return TimeseriesQuery(sign, query, fields,
                               lconstr_roles=bool(fields) and any(field in roles_params for field in fields))

    def adapt_objects(self, db):
        return [adapt_db_object(db_object, db, self.fields, self.lconstr_roles) for db_object in self.query]


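# --- Illustrative sketch, not part of the original module -------------------
# Hedged example for TimeseriesQuery: sign=True unions a subquery into the
# aggregate, sign=False subtracts it via EXCEPT. The query params are made up
# and must be dict-like, as delivered by the REST layer.
def _example_timeseries_query_aggregation(db: Session):
    params_list = [{"variable_id": "5"}, {"data_origin": "Model"}]
    signs = [True, False]
    combined = TimeseriesQuery.aggregate(
        TimeseriesQuery.from_query_params(params, db, "timeseries", sign)
        for sign, params in zip(signs, params_list)
    )
    return combined.adapt_objects(db)

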
def search_all(db, path_params, query_params, lts=False, endpoint=None):
    if endpoint is None:
        endpoint = "timeseries" if lts else "search"

    try:
        ts_list = TimeseriesQuery.from_query_params(query_params, db, endpoint).adapt_objects(db)
        # remove duplicates
        if ts_list and isinstance(ts_list[0], dict):
            try:
                ts_set = set(json.dumps(ts, sort_keys=True) for ts in ts_list)
                return [json.loads(ts) for ts in ts_set]
            except Exception:  # not correct, because duplicates are NOT removed!!!
                return ts_list
        else:
            return ts_list
    except (KeyError, ValueError) as e:
        status_code = 400
        return JSONResponse(status_code=status_code, content=str(e))


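# --- Illustrative sketch, not part of the original module -------------------
# Hedged example for search_all: query_params mirrors what the REST layer
# passes through (values made up); a filter error comes back as a 400
# JSONResponse instead of raising.
def _example_search_all(db: Session):
    return search_all(db, path_params={}, query_params={"limit": "10", "variable_id": "5"}, lts=False)

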
def search_all_aggregation(db, path_params, signs, query_params_list, lts=False):
    endpoint = "timeseries" if lts else "search"

    try:
        ts_list = TimeseriesQuery.aggregate(
            TimeseriesQuery.from_query_params(query_params, db, endpoint, sign)
            for sign, query_params in zip(signs, query_params_list)
        ).adapt_objects(db)
        # remove duplicates
        if ts_list and isinstance(ts_list[0], dict):
            try:
                ts_set = set(json.dumps(ts, sort_keys=True) for ts in ts_list)
                return [json.loads(ts) for ts in ts_set]
            except Exception:  # not correct, because duplicates are NOT removed!!!
                return ts_list
        else:
            return ts_list
    except (KeyError, ValueError) as e:
        status_code = 400
        return JSONResponse(status_code=status_code, content=str(e))


def get_timeseries_by_unique_constraints(db: Session, station_id: int, variable_id: int, resource_provider: str = None,
                                         sampling_frequency: str = None, provider_version: str = None, data_origin_type: str = None,
                                         data_origin: str = None, sampling_height: float = None, label: str = None):
    """
    Criteria taken from TOAR_TG_Vol02_Data_Processing, 'Step 14: Identify Time Series'
    Criterion 14.1: id of the corresponding station
    Criterion 14.2: variable id
    Criterion 14.3: role: resource_provider (organisation)
    Criterion 14.4: sampling_frequency
    Criterion 14.5: version number
    Criterion 14.6: data_origin_type (measurement or model)
    Criterion 14.7: data origin
    Criterion 14.8: sampling height
    Criterion 14.9: data filtering procedures or other special dataset identifiers (use database field 'label')
    """

#   print("in get_timeseries_by_unique_constraints")
#   print(f"station_id: {station_id}, variable_id: {variable_id}, resource_provider: {resource_provider}, ",
#         f"sampling_frequency: {sampling_frequency}, provider_version: {provider_version}, data_origin_type: {data_origin_type}, ",
#         f"data_origin: {data_origin}, sampling_height: {sampling_height}, label: {label}")

    # filter for criteria 14.1 and 14.2
    ret_db_object = db.query(models.Timeseries).filter(models.Timeseries.station_id == station_id) \
                                               .filter(models.Timeseries.variable_id == variable_id).all()
    # if nothing is found: return None
    # if only one single object is found, it still has to be checked whether all criteria are fulfilled
    if len(ret_db_object) == 0:
        return None

    # filter for criterion 14.3
    if resource_provider:
        # issue with '/' in organisation longname ==> only possible with double encoding!
        resource_provider = resource_provider.replace('%2F', '/')
        role_num = get_value_from_str(toardb.toardb.RC_vocabulary, 'ResourceProvider')
        iter_obj = ret_db_object.copy()
        counter = 0
        for db_object in iter_obj:
            found = False
            for role in db_object.roles:
                # resource provider is always an organisation!
                organisation = get_contact(db, contact_id=role.contact_id)
                if (role_num == role.role) and (organisation.longname == resource_provider):
                    found = True
            if not found:
                ret_db_object.pop(counter)
            else:
                counter += 1
    else:
        # time series that do not have a resource_provider are not identical to those that do!
        role_num = get_value_from_str(toardb.toardb.RC_vocabulary, 'ResourceProvider')
        iter_obj = ret_db_object.copy()
        counter = 0
        for db_object in iter_obj:
            found = False
            for role in db_object.roles:
                if role_num == role.role:
                    found = True
            if found:
                ret_db_object.pop(counter)
            else:
                counter += 1

    # if no object is left --> return
    # if only one single object is found, it still has to be checked whether all criteria are fulfilled
    if len(ret_db_object) == 0:
        return None

    # filter for criterion 14.4
    if sampling_frequency:
        iter_obj = ret_db_object.copy()
        counter = 0
        for db_object in iter_obj:
            if not (db_object.sampling_frequency == sampling_frequency):
                ret_db_object.pop(counter)
            else:
                counter += 1

    # if no object is left --> return
    # if only one single object is found, it still has to be checked whether all criteria are fulfilled
    if len(ret_db_object) == 0:
        return None

    # filter for criterion 14.5
    if provider_version:
        iter_obj = ret_db_object.copy()
        counter = 0
        for db_object in iter_obj:
            if not (db_object.provider_version == provider_version):
                ret_db_object.pop(counter)
            else:
                counter += 1

    # if no object is left --> return
    # if only one single object is found, it still has to be checked whether all criteria are fulfilled
    if len(ret_db_object) == 0:
        return None

    # filter for criterion 14.6
    if data_origin_type:
        data_origin_type_num = get_value_from_str(toardb.toardb.OT_vocabulary, data_origin_type)
        iter_obj = ret_db_object.copy()
        counter = 0
        for db_object in iter_obj:
            if not (db_object.data_origin_type == data_origin_type_num):
                ret_db_object.pop(counter)
            else:
                counter += 1

    # if no object is left --> return
    # if only one single object is found, it still has to be checked whether all criteria are fulfilled
    if len(ret_db_object) == 0:
        return None

    # filter for criterion 14.7
    if data_origin:
        data_origin_num = get_value_from_str(toardb.toardb.DO_vocabulary, data_origin)
        iter_obj = ret_db_object.copy()
        counter = 0
        for db_object in iter_obj:
            if not (db_object.data_origin == data_origin_num):
                ret_db_object.pop(counter)
            else:
                counter += 1

    # if no object is left --> return
    # if only one single object is found, it still has to be checked whether all criteria are fulfilled
    if len(ret_db_object) == 0:
        return None

    # filter for criterion 14.8
    if sampling_height:
        iter_obj = ret_db_object.copy()
        counter = 0
        for db_object in iter_obj:
            if not (db_object.sampling_height == sampling_height):
                ret_db_object.pop(counter)
            else:
                counter += 1

    # if no object is left --> return
    # if only one single object is found, it still has to be checked whether all criteria are fulfilled
    if len(ret_db_object) == 0:
        return None

    # filter for criterion 14.9
    if label:
        iter_obj = ret_db_object.copy()
        counter = 0
        for db_object in iter_obj:
            if not (db_object.label == label):
                ret_db_object.pop(counter)
            else:
                counter += 1

    # check that only one object is left!!!
    # adapt mismatches for return value
    if len(ret_db_object) == 0:
        ret_db_object = None
    else:
        if len(ret_db_object) == 1:
            ret_db_object = ret_db_object[0]
            # there is a mismatch with additional_metadata
            ret_db_object.additional_metadata = clean_additional_metadata(ret_db_object.additional_metadata)
            # there is also a mismatch with coordinates and additional_metadata from the station object
            if isinstance(ret_db_object.station.coordinates, (WKBElement, WKTElement)):
                ret_db_object.station.coordinates = get_coordinates_from_geom(ret_db_object.station.coordinates)
            # there is a mismatch with additional_metadata
            if isinstance(ret_db_object.station.additional_metadata, dict):
                ret_db_object.station.additional_metadata = json.dumps(ret_db_object.station.additional_metadata)
        else:
            status_code = 405
            message = "Timeseries not unique, more criteria need to be defined."
            return JSONResponse(status_code=status_code, content=message)

    return ret_db_object


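# --- Illustrative sketch, not part of the original module -------------------
# Hedged example for get_timeseries_by_unique_constraints, using only the
# criteria that create_timeseries also passes (all values made up). The
# result is None (no match), a JSONResponse with status 405 (still
# ambiguous), or a single models.Timeseries object.
def _example_unique_timeseries_lookup(db: Session):
    return get_timeseries_by_unique_constraints(
        db,
        station_id=42,
        variable_id=5,
        label="",
        provider_version="N/A",
        resource_provider="Forschungszentrum Jülich",
    )

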
def get_timeseries_changelog(db: Session, timeseries_id: int):
    return db.query(models.TimeseriesChangelog).filter(models.TimeseriesChangelog.timeseries_id == timeseries_id).all()


def get_timeseries_programme(db: Session, name: str):
    return db.query(models.TimeseriesProgramme).filter(models.TimeseriesProgramme.name == name).all()


# is this internal, or should this also go to public REST api?
# do we need this at all?
def get_role_ids_of_timeseries(db: Session, timeseries_id: int):
    db_objects = db.query(models.TimeseriesTimeseriesRoles) \
                   .filter(models.TimeseriesTimeseriesRoles.timeseries_id == timeseries_id) \
                   .all()
    return db_objects


# is this internal, or should this also go to public REST api?
def get_unique_timeseries_role(db: Session, role: int, contact_id: int, status: int):
    db_object = db.query(models.TimeseriesRole).filter(models.TimeseriesRole.role == role) \
                                               .filter(models.TimeseriesRole.contact_id == contact_id) \
                                               .filter(models.TimeseriesRole.status == status) \
                                               .first()
    return db_object


def get_role_id_from_string(db: Session, role_string: str):
    # use a bound parameter instead of string interpolation to avoid SQL injection
    sql_command = text("""
        SELECT DISTINCT(r.id) FROM timeseries_timeseries_roles tr,
                                   timeseries_roles r,
                                   contacts c,
                                   organisations o,
                                   persons p
        WHERE (o.longname IN (:role_string) OR
               o.name IN (:role_string) OR
               o.city IN (:role_string) OR
               o.homepage IN (:role_string) OR
               o.contact_url IN (:role_string) OR
               p.email IN (:role_string) OR
               p.name IN (:role_string) OR
               p.orcid IN (:role_string)) AND
              tr.role_id=r.id AND
              r.contact_id=c.id AND
              c.person_id=p.id AND
              c.organisation_id=o.id
        """)
    resultproxy = db.execute(sql_command, {"role_string": role_string})
    id_list = [rowproxy._asdict()['id'] for rowproxy in resultproxy]
    return id_list


# is this internal, or should this also go to public REST api?
def get_unique_timeseries_programme(db: Session, name: str, homepage: str):
    db_object = db.query(models.TimeseriesProgramme).filter(models.TimeseriesProgramme.name == name) \
                                                    .filter(models.TimeseriesProgramme.homepage == homepage) \
                                                    .first()
    return db_object


# is this internal, or should this also go to public REST api?
def get_unique_timeseries_annotation(db: Session, text: str, contributor_id: int):
    db_object = db.query(models.TimeseriesAnnotation).filter(models.TimeseriesAnnotation.text == text) \
                                                     .filter(models.TimeseriesAnnotation.contributor_id == contributor_id) \
                                                     .first()
    return db_object


def get_contributors_string(programmes, roles):
    # sort every section alphabetically and have sub-section titles
    # programmes are already unique, but they still need to be sorted
    resultp = "programmes: " + ";".join(sorted([programme.longname for programme in programmes])) if len(programmes) > 0 else ''
    # organisations might contain duplicates
    organisations = {role.contact.organisation.longname for role in roles}
    # eliminate dummy organisation
    organisations.discard('')
    resulto = "organisations: " + ";".join(sorted(organisations)) if len(organisations) > 0 else ''
    # persons might contain duplicates
    persons = {role.contact.person.name for role in roles}
    # eliminate dummy person
    persons.discard('')
    resultc = "persons: " + ";".join(sorted(persons)) if len(persons) > 0 else ''
    result = resultp if resultp != '' else ''
    if resulto != '':
        result = resulto if result == '' else result + '; ' + resulto
    if resultc != '':
        result = resultc if result == '' else result + '; ' + resultc
    return result


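# --- Illustrative sketch, not part of the original module -------------------
# Hedged example for get_contributors_string with stand-in objects; real
# callers pass ORM instances with the same attribute layout.
def _example_get_contributors_string():
    from types import SimpleNamespace as NS
    programmes = [NS(longname="GAW")]
    roles = [NS(contact=NS(organisation=NS(longname="FZJ"), person=NS(name="Jane Doe")))]
    # -> "programmes: GAW; organisations: FZJ; persons: Jane Doe"
    return get_contributors_string(programmes, roles)

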
def get_contributors_list(db: Session, timeseries_ids, format: str = 'text'):
    # get the time series' programmes
    # join(models.Timeseries, Timeseries.programme_id == TimeseriesProgramme.id) is implicitly given due to the foreign key
    programmes = db.query(models.TimeseriesProgramme) \
                   .join(models.Timeseries) \
                   .filter(models.Timeseries.id.in_(timeseries_ids), models.TimeseriesProgramme.id != 0).all()
    # get all time series roles (with duplicates)
    roles = db.execute(select([timeseries_timeseries_roles_table]).where(timeseries_timeseries_roles_table.c.timeseries_id.in_(timeseries_ids)))
    # eliminate duplicates
    role_ids = {role.role_id for role in roles}
    roles = db.query(models.TimeseriesRole).filter(models.TimeseriesRole.id.in_(role_ids)).all()
    # return both programmes and roles
    if format == 'text':
        result = get_contributors_string(programmes, roles)
    elif format == 'json':
        result = programmes + roles
    else:
        status_code = 400
        message = f"not a valid format: {format}"
        result = JSONResponse(status_code=status_code, content=message)
    return result


def get_request_contributors(db: Session, format: str = 'text', input_handle: UploadFile = File(...)):
    f = input_handle.file
    timeseries_ids = [int(line.strip()) for line in f.readlines()]
    return get_contributors_list(db, timeseries_ids, format)


def get_registered_request_contributors(db: Session, rid, format: str = 'text'):
    try:
        timeseries_ids = db.execute(select([s1_contributors_table])
                                    .where(s1_contributors_table.c.request_id == rid)).mappings().first()['timeseries_ids']
        return get_contributors_list(db, timeseries_ids, format)
    except Exception:
        status_code = 400
        message = f"not a registered request id: {rid}"
        return JSONResponse(status_code=status_code, content=message)


def register_request_contributors(db: Session, rid, ids):
    try:
        db.execute(insert(s1_contributors_table).values(request_id=rid, timeseries_ids=ids))
        db.commit()
        status_code = 200
        message = f'{rid} successfully registered.'
    except IntegrityError as e:
        error_code = e.orig.pgcode
        if error_code == "23505":
            status_code = 443
            message = f'{rid} already registered.'
        else:
            status_code = 442
            message = f'database error: {error_code}'
    result = JSONResponse(status_code=status_code, content=message)
    return result


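# --- Illustrative sketch, not part of the original module -------------------
# Hedged example for register_request_contributors (rid and ids are made up):
# registering the same rid twice hits the unique constraint (pgcode 23505)
# and yields the 443 response.
def _example_register_request(db: Session):
    return register_request_contributors(db, rid="req-2025-001", ids=[1, 2, 3])

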
# is this internal, or should this also go to public REST api?
def get_timeseries_roles(db: Session, timeseries_id: int):
    return db.execute(select([timeseries_timeseries_roles_table]).where(timeseries_timeseries_roles_table.c.timeseries_id == timeseries_id))


# is this internal, or should this also go to public REST api?
def get_timeseries_role_by_id(db: Session, role_id):
    return db.query(models.TimeseriesRole).filter(models.TimeseriesRole.id == role_id).first()


# is this internal, or should this also go to public REST api?
def get_timeseries_annotations(db: Session, timeseries_id: int):
    return db.execute(select([timeseries_timeseries_annotations_table]).where(timeseries_timeseries_annotations_table.c.timeseries_id == timeseries_id))


# is this internal, or should this also go to public REST api?
def get_timeseries_annotation_by_id(db: Session, annotation_id):
    return db.query(models.TimeseriesAnnotation).filter(models.TimeseriesAnnotation.id == annotation_id).first()


def create_timeseries(db: Session, timeseries: TimeseriesCreate, author_id: int):
    timeseries_dict = timeseries.dict()
    # no timeseries can be created if station_id or variable_id are not found in the database
    if not station_id_exists(db, timeseries.station_id):
        status_code = 440
        message = f"Station (station_id: {timeseries.station_id}) not found in database."
        return JSONResponse(status_code=status_code, content=message)
    if timeseries_dict['additional_metadata']:
        for key, value in timeseries_dict['additional_metadata'].items():
            if isinstance(value, dict):
                for key2, value2 in value.items():
                    timeseries_dict['additional_metadata'][key][key2] = value2.replace("''", "'")
            else:
                timeseries_dict['additional_metadata'][key] = value.replace("''", "'")
        if 'absorption_cross_section' in timeseries_dict['additional_metadata']:
            value = get_value_from_str(toardb.toardb.CS_vocabulary, timeseries_dict['additional_metadata']['absorption_cross_section'])
            timeseries_dict['additional_metadata']['absorption_cross_section'] = value
        if 'calibration_type' in timeseries_dict['additional_metadata']:
            value = get_value_from_str(toardb.toardb.CT_vocabulary, timeseries_dict['additional_metadata']['calibration_type'])
            timeseries_dict['additional_metadata']['calibration_type'] = value
        if 'sampling_type' in timeseries_dict['additional_metadata']:
            value = get_value_from_str(toardb.toardb.KS_vocabulary, timeseries_dict['additional_metadata']['sampling_type'])
            timeseries_dict['additional_metadata']['sampling_type'] = value
    db_variable = get_variable(db, timeseries.variable_id)
    if not db_variable:
        status_code = 441
        message = f"Variable (variable_id: {timeseries.variable_id}) not found in database."
        return JSONResponse(status_code=status_code, content=message)
    # for networks: we do not want data without a resource_provider (organisation)
    resource_provider = None
    if timeseries.roles:
        for role in timeseries.roles:
            if role.role == 'ResourceProvider':
                # resource provider is always an organisation!
                organisation = get_contact(db, contact_id=role.contact_id)
                if organisation:
                    resource_provider = organisation.longname
                else:
                    status_code = 442
                    message = f"Resource provider (contact_id: {role.contact_id}) not found in database."
                    return JSONResponse(status_code=status_code, content=message)
    db_timeseries = get_timeseries_by_unique_constraints(db, station_id=timeseries.station_id,
                                                         variable_id=timeseries.variable_id, label=timeseries.label,
                                                         provider_version=timeseries.provider_version, resource_provider=resource_provider)
    if db_timeseries:
        if isinstance(db_timeseries, list):
            status_code = 444
            message = {"detail": {"message": "Given constraints match more than one timeseries.",
                                  "timeseries_ids": [db_timeseries[i].id for i in range(len(db_timeseries))]}}
            return JSONResponse(status_code=status_code, content=message)
        else:
            status_code = 443
            message = {"detail": {"message": "Timeseries already registered.", "timeseries_id": db_timeseries.id}}
            return JSONResponse(status_code=status_code, content=message)
    roles_data = timeseries_dict.pop('roles', None)
    annotations_data = timeseries_dict.pop('annotations', None)
    db_timeseries = models.Timeseries(**timeseries_dict)
    # there is a mismatch with additional_metadata
    # in the upload command, we now have: "additional_metadata": "{}"
    # but the return from this method gives (=database): "additional_metadata": {}
#   print(db_timeseries.additional_metadata)
#   db_timeseries.additional_metadata = json.loads(str(db_timeseries.additional_metadata).replace("'",'"'))
    db_timeseries.sampling_frequency = get_value_from_str(toardb.toardb.SF_vocabulary, db_timeseries.sampling_frequency)
    db_timeseries.aggregation = get_value_from_str(toardb.toardb.AT_vocabulary, db_timeseries.aggregation)
    db_timeseries.data_origin_type = get_value_from_str(toardb.toardb.OT_vocabulary, db_timeseries.data_origin_type)
    db_timeseries.data_origin = get_value_from_str(toardb.toardb.DO_vocabulary, db_timeseries.data_origin)
    db.add(db_timeseries)
    db.commit()
    db.refresh(db_timeseries)
    # get timeseries_id
    timeseries_id = db_timeseries.id
    # store roles and update the association table
    if roles_data:
        for r in roles_data:
            db_role = models.TimeseriesRole(**r)
            db_role.role = get_value_from_str(toardb.toardb.RC_vocabulary, db_role.role)
            db_role.status = get_value_from_str(toardb.toardb.RS_vocabulary, db_role.status)
            db_object = get_unique_timeseries_role(db, db_role.role, db_role.contact_id, db_role.status)
            if db_object:
                role_id = db_object.id
            else:
                # Something is going wrong here!
                # Is the model TimeseriesRole correctly defined?!
                del db_role.contact
                db.add(db_role)
                db.commit()
                db.refresh(db_role)
                role_id = db_role.id
            db.execute(insert(timeseries_timeseries_roles_table).values(timeseries_id=timeseries_id, role_id=role_id))
            db.commit()
    # store annotations and update the association table
    if annotations_data:
        for a in annotations_data:
            db_annotation = models.TimeseriesAnnotation(**a)
            # check whether the annotation is already present in the database
            db_object = get_unique_timeseries_annotation(db, db_annotation.text, db_annotation.contributor_id)
            if db_object:
                annotation_id = db_object.id
            else:
                db.add(db_annotation)
                db.commit()
                db.refresh(db_annotation)
                annotation_id = db_annotation.id
            db.execute(insert(timeseries_timeseries_annotations_table).values(timeseries_id=timeseries_id, annotation_id=annotation_id))
            db.commit()
    # create changelog entry
    type_of_change = get_value_from_str(toardb.toardb.CL_vocabulary, "Created")
    description = "time series created"
    db_changelog = TimeseriesChangelog(description=description, timeseries_id=timeseries_id, author_id=author_id,
                                       type_of_change=type_of_change, old_value='', new_value='')
    db.add(db_changelog)
    db.commit()
    status_code = 200
    message = {"detail": {"message": "New timeseries created.", "timeseries_id": db_timeseries.id}}
    return JSONResponse(status_code=status_code, content=message)
#   return db_timeseries


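# --- Illustrative sketch, not part of the original module -------------------
# Hedged example for create_timeseries: the payload must satisfy the
# TimeseriesCreate schema (constructed by the caller here, so no field names
# are invented); duplicates are rejected with status 443/444 before anything
# is written.
def _example_create_timeseries(db: Session, payload: TimeseriesCreate):
    return create_timeseries(db, payload, author_id=1)

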
def patch_timeseries(db: Session, description: str, timeseries_id: int, timeseries: TimeseriesPatch,
                     author_id: int):
    timeseries_dict = timeseries.dict()
    # check for controlled vocabulary in additional metadata
    # do this in a dictionary that always contains an additional_metadata entry
    if timeseries_dict['additional_metadata']:
        for key, value in timeseries_dict['additional_metadata'].items():
            if isinstance(value, dict):
                for key2, value2 in value.items():
                    timeseries_dict['additional_metadata'][key][key2] = value2.replace("''", "'")
            else:
                timeseries_dict['additional_metadata'][key] = value.replace("''", "'")
        if 'absorption_cross_section' in timeseries_dict['additional_metadata']:
            value = get_value_from_str(toardb.toardb.CS_vocabulary, timeseries_dict['additional_metadata']['absorption_cross_section'])
            timeseries_dict['additional_metadata']['absorption_cross_section'] = value
        if 'calibration_type' in timeseries_dict['additional_metadata']:
            value = get_value_from_str(toardb.toardb.CT_vocabulary, timeseries_dict['additional_metadata']['calibration_type'])
            timeseries_dict['additional_metadata']['calibration_type'] = value
        if 'sampling_type' in timeseries_dict['additional_metadata']:
            value = get_value_from_str(toardb.toardb.KS_vocabulary, timeseries_dict['additional_metadata']['sampling_type'])
            timeseries_dict['additional_metadata']['sampling_type'] = value
    # delete empty fields from timeseries_dict already at this place, to be able to
    # distinguish between "single value correction in metadata" and "comprehensive metadata revision"
    # (see controlled vocabulary "CL_vocabulary")
    roles_data = timeseries_dict.pop('roles', None)
    annotations_data = timeseries_dict.pop('annotations', None)
    timeseries_dict2 = {k: v for k, v in timeseries_dict.items() if v is not None}
    number_of_elements = len(timeseries_dict2)
    if roles_data:
        number_of_elements += 1
    if annotations_data:
        number_of_elements += 1
    if number_of_elements == 1:
        type_of_change = get_value_from_str(toardb.toardb.CL_vocabulary, "SingleValue")
    else:
        type_of_change = get_value_from_str(toardb.toardb.CL_vocabulary, "Comprehensive")
    db_obj = models.Timeseries(**timeseries_dict2)
    # also the sqlalchemy get will call get_timeseries here!!!
    # --> therefore call it right away
    db_timeseries = get_timeseries(db, timeseries_id, fields="dataset_approved_by_provider,data_license_accepted")
    dataset_approved_by_provider = db_timeseries.dataset_approved_by_provider
    data_license_accepted = db_timeseries.data_license_accepted
    db_timeseries = get_timeseries(db, timeseries_id)
    # for some unknown reason, the format of coordinates and additional_metadata (for the timeseries, but also for its related station!) has changed!
    db.rollback()
    db_timeseries.dataset_approved_by_provider = dataset_approved_by_provider
    db_timeseries.data_license_accepted = data_license_accepted
    # still problems with coordinates and additional metadata (from the STATION!!!)...
    try:
        db_timeseries.station.coordinates = get_geom_from_coordinates(db_timeseries.station.coordinates)
    except Exception:
        pass
#   try:
#       db_timeseries.station.additional_metadata = json.loads(str(db_timeseries.station.additional_metadata).replace("'",'"'))
#   except:
#       pass
    # prepare changelog entry/entries
    no_log = (description == 'NOLOG')
    if not no_log:
        old_values = {}
        new_values = {}
        for k, v in timeseries_dict2.items():
            field = str(getattr(db_timeseries, k))
            if k == 'additional_metadata':
                old_values[k] = db_timeseries.additional_metadata
            else:
                if k == "sampling_frequency":
                    old_values[k] = get_str_from_value(toardb.toardb.SF_vocabulary, int(field))
                elif k == "aggregation":
                    old_values[k] = get_str_from_value(toardb.toardb.AT_vocabulary, int(field))
                elif k == "data_origin":
                    old_values[k] = get_str_from_value(toardb.toardb.DO_vocabulary, int(field))
                elif k == "data_origin_type":
                    old_values[k] = get_str_from_value(toardb.toardb.OT_vocabulary, int(field))
                else:
                    old_values[k] = field
    for k, v in timeseries_dict2.items():
        setattr(db_timeseries, k, timeseries_dict[k])
    # there is a mismatch with additional_metadata
    # in the upload command, we now have: "additional_metadata": "{}"
    # but the return from this method gives (=database): "additional_metadata": {}
#   if timeseries_dict['additional_metadata']:
#       db_timeseries.additional_metadata = clean_additional_metadata(db_timeseries.additional_metadata)
    # do the following for every entry that uses the controlled vocabulary!
    # this should be improved!
    if db_obj.sampling_frequency:
        db_timeseries.sampling_frequency = get_value_from_str(toardb.toardb.SF_vocabulary, db_obj.sampling_frequency)
    if db_obj.aggregation:
        db_timeseries.aggregation = get_value_from_str(toardb.toardb.AT_vocabulary, db_obj.aggregation)
    if db_obj.data_origin:
        db_timeseries.data_origin = get_value_from_str(toardb.toardb.DO_vocabulary, db_obj.data_origin)
    if db_obj.data_origin_type:
        db_timeseries.data_origin_type = get_value_from_str(toardb.toardb.OT_vocabulary, db_obj.data_origin_type)
    db.add(db_timeseries)
    db.commit()
    # store roles and update the association table
    if roles_data:
        if not no_log:
            # prepare changelog entry/entries
            description = description + "; add role"
            db_old_roles = get_timeseries_roles(db, timeseries_id)
            old_roles = []
            for oldr in db_old_roles:
                old_role = get_timeseries_role_by_id(db, oldr.role_id)
                old_value = {}
                old_value['role'] = get_str_from_value(toardb.toardb.RC_vocabulary, old_role.role)
                old_value['status'] = get_str_from_value(toardb.toardb.RS_vocabulary, old_role.status)
                old_value['contact_id'] = old_role.contact_id
                old_roles.append(old_value)
            old_values['roles'] = old_roles
            new_roles = old_roles.copy()
        for r in roles_data:
            db_role = models.TimeseriesRole(**r)
            if not no_log:
                new_roles.append(r)
            db_role.role = get_value_from_str(toardb.toardb.RC_vocabulary, db_role.role)
            db_role.status = get_value_from_str(toardb.toardb.RS_vocabulary, db_role.status)
            # check whether the role is already present in the database
            db_object = get_unique_timeseries_role(db, db_role.role, db_role.contact_id, db_role.status)
            if db_object:
                role_id = db_object.id
            else:
                db.add(db_role)
                db.commit()
                db.refresh(db_role)
                role_id = db_role.id
            db.execute(insert(timeseries_timeseries_roles_table).values(timeseries_id=timeseries_id, role_id=role_id))
            db.commit()
        if not no_log:
            new_values['roles'] = new_roles
    # store annotations and update the association table
    if annotations_data:
        if not no_log:
            # prepare changelog entry/entries
            description = description + "; add annotation"
            db_old_annotations = get_timeseries_annotations(db, timeseries_id)
            old_annotations = []
            for olda in db_old_annotations:
                old_annotation = get_timeseries_annotation_by_id(db, olda.annotation_id)
                old_value = {}
                old_value['kind'] = get_str_from_value(toardb.toardb.AK_vocabulary, old_annotation.kind)
                old_value['text'] = old_annotation.text
                old_value['date_added'] = str(old_annotation.date_added)
                old_value['approved'] = str(old_annotation.approved)
                old_value['contributor_id'] = str(old_annotation.contributor_id)
                old_annotations.append(old_value)
            old_values['annotations'] = old_annotations
            new_annotations = []
        for a in annotations_data:
            db_annotation = models.TimeseriesAnnotation(**a)
            # check whether the annotation is already present in the database
            if not no_log:
                new_annotations.append(a)
            db_annotation.kind = get_value_from_str(toardb.toardb.AK_vocabulary, db_annotation.kind)
            db_object = get_unique_timeseries_annotation(db, db_annotation.text, db_annotation.contributor_id)
            if db_object:
                annotation_id = db_object.id
            else:
                db.add(db_annotation)
                db.commit()
                db.refresh(db_annotation)
                annotation_id = db_annotation.id
            db.execute(insert(timeseries_timeseries_annotations_table).values(timeseries_id=timeseries_id, annotation_id=annotation_id))
            db.commit()
        if not no_log:
            new_values['annotations'] = new_annotations
    # add patch to the changelog table
    if not no_log:
        if new_values:
            timeseries_dict2.update(new_values)
        db_changelog = TimeseriesChangelog(description=description, timeseries_id=timeseries_id, author_id=author_id,
                                           type_of_change=type_of_change, old_value=str(old_values), new_value=str(timeseries_dict2))
        db.add(db_changelog)
        db.commit()
    status_code = 200
    message = {"detail": {"message": "timeseries patched.", "timeseries_id": db_timeseries.id}}
    return JSONResponse(status_code=status_code, content=message)
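

# --- Illustrative sketch, not part of the original module -------------------
# Hedged example for patch_timeseries: a one-field patch is logged as a
# "SingleValue" change, several fields as "Comprehensive"; passing the
# description 'NOLOG' suppresses the changelog entry (the patch object is
# constructed by the caller and must satisfy TimeseriesPatch).
def _example_patch_timeseries(db: Session, patch: TimeseriesPatch):
    return patch_timeseries(db, description="correct sampling height",
                            timeseries_id=1, timeseries=patch, author_id=1)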