Coverage for toardb/timeseries/timeseries.py: 91%
98 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-03 20:32 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-03 20:32 +0000
1# SPDX-FileCopyrightText: 2021 Forschungszentrum Jülich GmbH
2# SPDX-License-Identifier: MIT
4"""
5Simple test API for timeseries management
6"""
8import re
9import datetime as dt
10from typing import List, Union
11from fastapi import APIRouter, Depends, HTTPException, Body, Request, File, UploadFile
12from fastapi.responses import JSONResponse
13from sqlalchemy.orm import Session
14from sqlalchemy.inspection import inspect
15from starlette.datastructures import QueryParams
16import urllib.parse
17from . import crud, schemas
18from toardb.utils.database import ToarDbSession, get_db
19from toardb.stationmeta.crud import get_stationmeta_by_id
20from toardb.variables.crud import get_variable
21from toardb.contacts.crud import get_contact
22from toardb.timeseries.models import Timeseries
23from toardb.stationmeta.models import StationmetaCore, StationmetaGlobal
24from toardb.data.models import Data
25from toardb.utils.utils import (
26 get_str_from_value,
27 get_admin_access_rights,
28 get_timeseries_md_change_access_rights,
29 get_register_contributors_access_rights
30)
33router = APIRouter()
36def get_query_params(raw_query_string):
37 updated_query_string = raw_query_string.replace('+', '%2B')
38 updated_params = dict(urllib.parse.parse_qsl(updated_query_string))
39 return QueryParams(updated_params)
41# CRUD: create, retrieve, update, delete
42# 1. retrieve
43#get all entries of table timeseries
44@router.get('/search/', response_model=List[schemas.Timeseries] | List[schemas.TimeseriesFields], response_model_exclude_none=True, response_model_exclude_unset=True)
45def search_all_timeseries(request: Request, db: Session = Depends(get_db)):
46 updated_query_params = get_query_params(request.url.query)
47 return crud.search_all(db, path_params=request.path_params, query_params=updated_query_params)
50# Does not make sense when only certain fields are selected in different requests needs different syntax
51# order of concatenated statements is also unclear
52@router.get('/search/a', response_model=List[schemas.Timeseries] | List[schemas.TimeseriesFields], response_model_exclude_none=True, response_model_exclude_unset=True)
53def search_all_timeseries_aggregations(request: Request, db: Session = Depends(get_db)):
54 urls = re.split(r"(?=[+-]\D)", "+" + request.url.query)[1:]
55 if urls:
56 signs = [url.startswith("+") for url in urls]
57 query_params = [get_query_params(url[1:]) for url in urls]
58 return crud.search_all_aggregation(
59 db, path_params=request.path_params, signs=signs, query_params_list=query_params
60 )
61 else:
62 return search_all_timeseries(request, db)
65#get all entries of table timeseries
66@router.get('/timeseries/', response_model=List[schemas.Timeseries], response_model_exclude_none=True)
67def get_all_timeseries(request: Request, db: Session = Depends(get_db)):
68 raw_query_string = request.url.query
69 updated_query_string = raw_query_string.replace('+', '%2B')
70 updated_params = dict(urllib.parse.parse_qsl(updated_query_string))
71 updated_query_params = QueryParams(updated_params)
72 return crud.search_all(db, path_params=request.path_params, query_params=updated_query_params,lts=True)
75#get all metadata of one timeseries (known its ID)
76@router.get('/timeseries/{timeseries_id}', response_model=schemas.Timeseries, response_model_exclude_none=True, response_model_exclude_unset=True)
77def get_timeseries(timeseries_id: int, fields: str = None, db: Session = Depends(get_db)):
78 db_timeseries = crud.get_timeseries(db, timeseries_id=timeseries_id, fields=fields)
79 if db_timeseries is None:
80 raise HTTPException(status_code=404, detail="Timeseries not found.")
81 return db_timeseries
84#get all metadata of one timeseries (known its ID)
85@router.get('/timeseries/id/{timeseries_id}', response_model=schemas.Timeseries, response_model_exclude_none=True, response_model_exclude_unset=True)
86def get_timeseries2(timeseries_id: int, db: Session = Depends(get_db)):
87 db_timeseries = crud.get_timeseries(db, timeseries_id=timeseries_id)
88 if db_timeseries is None:
89 raise HTTPException(status_code=404, detail="Timeseries not found.")
90 return db_timeseries
93#build citation text of one timeseries (known its ID)
94@router.get('/timeseries/citation/{timeseries_id}')
95def get_citation(timeseries_id: int, datetime: dt.datetime = None, db: Session = Depends(get_db)):
96 citation = crud.get_citation(db, timeseries_id=timeseries_id, datetime=datetime)
97 return citation
100#get all metadata of one timeseries (known its unique label)
101@router.get('/timeseries/unique/')
102def get_timeseries(station_id: int, variable_id: int, resource_provider: str = None,
103 sampling_frequency: str = None, provider_version: str = None, data_origin_type: str = None,
104 data_origin: str = None, sampling_height: float = None, label: str = None,
105 db: Session = Depends(get_db)):
106 db_timeseries = crud.get_timeseries_by_unique_constraints(db, station_id=station_id,
107 variable_id=variable_id, resource_provider=resource_provider,
108 sampling_frequency=sampling_frequency, provider_version=provider_version, data_origin_type=data_origin_type,
109 data_origin= data_origin, sampling_height=sampling_height, label=label)
110 if db_timeseries is None:
111 raise HTTPException(status_code=404, detail="Timeseries not found.")
112 return db_timeseries
115@router.get('/timeseries_changelog/{timeseries_id}', response_model=List[schemas.TimeseriesChangelog])
116def get_timeseries_changelog(timeseries_id: int, db: Session = Depends(get_db)):
117 db_changelog = crud.get_timeseries_changelog(db, timeseries_id=timeseries_id)
118 return db_changelog
121@router.get('/timeseries_programme/{name}', response_model=List[schemas.TimeseriesProgramme])
122def get_timeseries_programme(name: str, db: Session = Depends(get_db)):
123 db_programme = crud.get_timeseries_programme(db, name=name)
124 return db_programme
127@router.get('/timeseries/request_timeseries_list_of_contributors/{rid}', response_model=Union[str,List[schemas.Contributors]])
128def get_registered_request_contributors(rid: str, format: str = 'text', db: Session = Depends(get_db)):
129 contributors = crud.get_registered_request_contributors(db, rid=rid, format=format)
130 return contributors
133@router.post('/timeseries/register_timeseries_list_of_contributors/{rid}')
134def register_request_contributors(rid: str,
135 ids: List[int],
136 access: dict = Depends(get_register_contributors_access_rights),
137 db: Session = Depends(get_db)):
138 return crud.register_request_contributors(db, rid=rid, ids=ids)
141@router.post('/timeseries/request_contributors', response_model=Union[str,List[schemas.Contributors]])
142def get_request_contributors(format: str = 'text', file: UploadFile = File(...), db: Session = Depends(get_db)):
143 contributors = crud.get_request_contributors(db, format=format, input_handle=file)
144 return contributors
147# post and patch only via authorization by Helmholtz AAI
149# 2. create
151#problems with Roles!!! prelimarily return TimeseriesPatch (instead of Timeseries!)
152#@router.post('/timeseries/', response_model=schemas.TimeseriesPatch)
153#@router.post('/timeseries/', response_model=schemas.Timeseries)
154#@do not return object, just some information (see stationmeta)
155@router.post('/timeseries/')
156def create_timeseries(timeseries: schemas.TimeseriesCreate = Body(..., embed = True),
157 access: dict = Depends(get_admin_access_rights),
158 db: Session = Depends(get_db)):
159 # check whether the post is authorized (401: Unauthorized)
160 if access['status_code'] == 200:
161 return crud.create_timeseries(db=db, timeseries=timeseries,
162 author_id=access['auth_user_id'])
163 else:
164 raise HTTPException(status_code=401, detail="Unauthorized.")
167#@router.patch('/timeseries/id/{timeseries_id}', response_model=schemas.TimeseriesPatch)
168@router.patch('/timeseries/id/{timeseries_id}')
169def patch_timeseries(timeseries_id: int,
170 description: str = None,
171 timeseries: schemas.TimeseriesPatch = Body(..., embed = True),
172 access: dict = Depends(get_timeseries_md_change_access_rights),
173 db: Session = Depends(get_db)):
174 # check whether the patch is authorized (401: Unauthorized)
175 if access['status_code'] == 200:
176 # check, if description has been given in query_params!
177 if not description:
178 raise HTTPException(status_code=404, detail="description text ist missing.")
179 db_timeseries = crud.get_timeseries(db, timeseries_id=timeseries_id)
180 if db_timeseries is None:
181 raise HTTPException(status_code=404, detail="Time series for patching not found.")
182 return crud.patch_timeseries(db=db, description=description,
183 timeseries_id=db_timeseries.id, timeseries=timeseries,
184 author_id=access['auth_user_id'])
185 else:
186 raise HTTPException(status_code=401, detail="Unauthorized.")