Coverage for toardb/timeseries/timeseries.py: 91%

98 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-03 20:32 +0000

1# SPDX-FileCopyrightText: 2021 Forschungszentrum Jülich GmbH 

2# SPDX-License-Identifier: MIT 

3 

4""" 

5Simple test API for timeseries management 

6""" 

7 

8import re 

9import datetime as dt 

10from typing import List, Union 

11from fastapi import APIRouter, Depends, HTTPException, Body, Request, File, UploadFile 

12from fastapi.responses import JSONResponse 

13from sqlalchemy.orm import Session 

14from sqlalchemy.inspection import inspect 

15from starlette.datastructures import QueryParams 

16import urllib.parse 

17from . import crud, schemas 

18from toardb.utils.database import ToarDbSession, get_db 

19from toardb.stationmeta.crud import get_stationmeta_by_id 

20from toardb.variables.crud import get_variable 

21from toardb.contacts.crud import get_contact 

22from toardb.timeseries.models import Timeseries 

23from toardb.stationmeta.models import StationmetaCore, StationmetaGlobal 

24from toardb.data.models import Data 

25from toardb.utils.utils import ( 

26 get_str_from_value, 

27 get_admin_access_rights, 

28 get_timeseries_md_change_access_rights, 

29 get_register_contributors_access_rights 

30) 

31 

32 

33router = APIRouter() 

34 

35 

36def get_query_params(raw_query_string): 

37 updated_query_string = raw_query_string.replace('+', '%2B') 

38 updated_params = dict(urllib.parse.parse_qsl(updated_query_string)) 

39 return QueryParams(updated_params) 

40 

41# CRUD: create, retrieve, update, delete 

42# 1. retrieve 

43#get all entries of table timeseries 

44@router.get('/search/', response_model=List[schemas.Timeseries] | List[schemas.TimeseriesFields], response_model_exclude_none=True, response_model_exclude_unset=True) 

45def search_all_timeseries(request: Request, db: Session = Depends(get_db)): 

46 updated_query_params = get_query_params(request.url.query) 

47 return crud.search_all(db, path_params=request.path_params, query_params=updated_query_params) 

48 

49 

50# Does not make sense when only certain fields are selected in different requests needs different syntax 

51# order of concatenated statements is also unclear 

52@router.get('/search/a', response_model=List[schemas.Timeseries] | List[schemas.TimeseriesFields], response_model_exclude_none=True, response_model_exclude_unset=True) 

53def search_all_timeseries_aggregations(request: Request, db: Session = Depends(get_db)): 

54 urls = re.split(r"(?=[+-]\D)", "+" + request.url.query)[1:] 

55 if urls: 

56 signs = [url.startswith("+") for url in urls] 

57 query_params = [get_query_params(url[1:]) for url in urls] 

58 return crud.search_all_aggregation( 

59 db, path_params=request.path_params, signs=signs, query_params_list=query_params 

60 ) 

61 else: 

62 return search_all_timeseries(request, db) 

63 

64 

65#get all entries of table timeseries 

66@router.get('/timeseries/', response_model=List[schemas.Timeseries], response_model_exclude_none=True) 

67def get_all_timeseries(request: Request, db: Session = Depends(get_db)): 

68 raw_query_string = request.url.query 

69 updated_query_string = raw_query_string.replace('+', '%2B') 

70 updated_params = dict(urllib.parse.parse_qsl(updated_query_string)) 

71 updated_query_params = QueryParams(updated_params) 

72 return crud.search_all(db, path_params=request.path_params, query_params=updated_query_params,lts=True) 

73 

74 

75#get all metadata of one timeseries (known its ID) 

76@router.get('/timeseries/{timeseries_id}', response_model=schemas.Timeseries, response_model_exclude_none=True, response_model_exclude_unset=True) 

77def get_timeseries(timeseries_id: int, fields: str = None, db: Session = Depends(get_db)): 

78 db_timeseries = crud.get_timeseries(db, timeseries_id=timeseries_id, fields=fields) 

79 if db_timeseries is None: 

80 raise HTTPException(status_code=404, detail="Timeseries not found.") 

81 return db_timeseries 

82 

83 

84#get all metadata of one timeseries (known its ID) 

85@router.get('/timeseries/id/{timeseries_id}', response_model=schemas.Timeseries, response_model_exclude_none=True, response_model_exclude_unset=True) 

86def get_timeseries2(timeseries_id: int, db: Session = Depends(get_db)): 

87 db_timeseries = crud.get_timeseries(db, timeseries_id=timeseries_id) 

88 if db_timeseries is None: 

89 raise HTTPException(status_code=404, detail="Timeseries not found.") 

90 return db_timeseries 

91 

92 

93#build citation text of one timeseries (known its ID) 

94@router.get('/timeseries/citation/{timeseries_id}') 

95def get_citation(timeseries_id: int, datetime: dt.datetime = None, db: Session = Depends(get_db)): 

96 citation = crud.get_citation(db, timeseries_id=timeseries_id, datetime=datetime) 

97 return citation 

98 

99 

100#get all metadata of one timeseries (known its unique label) 

101@router.get('/timeseries/unique/') 

102def get_timeseries(station_id: int, variable_id: int, resource_provider: str = None, 

103 sampling_frequency: str = None, provider_version: str = None, data_origin_type: str = None, 

104 data_origin: str = None, sampling_height: float = None, label: str = None, 

105 db: Session = Depends(get_db)): 

106 db_timeseries = crud.get_timeseries_by_unique_constraints(db, station_id=station_id, 

107 variable_id=variable_id, resource_provider=resource_provider, 

108 sampling_frequency=sampling_frequency, provider_version=provider_version, data_origin_type=data_origin_type, 

109 data_origin= data_origin, sampling_height=sampling_height, label=label) 

110 if db_timeseries is None: 

111 raise HTTPException(status_code=404, detail="Timeseries not found.") 

112 return db_timeseries 

113 

114 

115@router.get('/timeseries_changelog/{timeseries_id}', response_model=List[schemas.TimeseriesChangelog]) 

116def get_timeseries_changelog(timeseries_id: int, db: Session = Depends(get_db)): 

117 db_changelog = crud.get_timeseries_changelog(db, timeseries_id=timeseries_id) 

118 return db_changelog 

119 

120 

121@router.get('/timeseries_programme/{name}', response_model=List[schemas.TimeseriesProgramme]) 

122def get_timeseries_programme(name: str, db: Session = Depends(get_db)): 

123 db_programme = crud.get_timeseries_programme(db, name=name) 

124 return db_programme 

125 

126 

127@router.get('/timeseries/request_timeseries_list_of_contributors/{rid}', response_model=Union[str,List[schemas.Contributors]]) 

128def get_registered_request_contributors(rid: str, format: str = 'text', db: Session = Depends(get_db)): 

129 contributors = crud.get_registered_request_contributors(db, rid=rid, format=format) 

130 return contributors 

131 

132 

133@router.post('/timeseries/register_timeseries_list_of_contributors/{rid}') 

134def register_request_contributors(rid: str, 

135 ids: List[int], 

136 access: dict = Depends(get_register_contributors_access_rights), 

137 db: Session = Depends(get_db)): 

138 return crud.register_request_contributors(db, rid=rid, ids=ids) 

139 

140 

141@router.post('/timeseries/request_contributors', response_model=Union[str,List[schemas.Contributors]]) 

142def get_request_contributors(format: str = 'text', file: UploadFile = File(...), db: Session = Depends(get_db)): 

143 contributors = crud.get_request_contributors(db, format=format, input_handle=file) 

144 return contributors 

145 

146 

147# post and patch only via authorization by Helmholtz AAI 

148 

149# 2. create 

150 

151#problems with Roles!!! prelimarily return TimeseriesPatch (instead of Timeseries!) 

152#@router.post('/timeseries/', response_model=schemas.TimeseriesPatch) 

153#@router.post('/timeseries/', response_model=schemas.Timeseries) 

154#@do not return object, just some information (see stationmeta) 

155@router.post('/timeseries/') 

156def create_timeseries(timeseries: schemas.TimeseriesCreate = Body(..., embed = True), 

157 access: dict = Depends(get_admin_access_rights), 

158 db: Session = Depends(get_db)): 

159 # check whether the post is authorized (401: Unauthorized) 

160 if access['status_code'] == 200: 

161 return crud.create_timeseries(db=db, timeseries=timeseries, 

162 author_id=access['auth_user_id']) 

163 else: 

164 raise HTTPException(status_code=401, detail="Unauthorized.") 

165 

166 

167#@router.patch('/timeseries/id/{timeseries_id}', response_model=schemas.TimeseriesPatch) 

168@router.patch('/timeseries/id/{timeseries_id}') 

169def patch_timeseries(timeseries_id: int, 

170 description: str = None, 

171 timeseries: schemas.TimeseriesPatch = Body(..., embed = True), 

172 access: dict = Depends(get_timeseries_md_change_access_rights), 

173 db: Session = Depends(get_db)): 

174 # check whether the patch is authorized (401: Unauthorized) 

175 if access['status_code'] == 200: 

176 # check, if description has been given in query_params! 

177 if not description: 

178 raise HTTPException(status_code=404, detail="description text ist missing.") 

179 db_timeseries = crud.get_timeseries(db, timeseries_id=timeseries_id) 

180 if db_timeseries is None: 

181 raise HTTPException(status_code=404, detail="Time series for patching not found.") 

182 return crud.patch_timeseries(db=db, description=description, 

183 timeseries_id=db_timeseries.id, timeseries=timeseries, 

184 author_id=access['auth_user_id']) 

185 else: 

186 raise HTTPException(status_code=401, detail="Unauthorized.") 

187