Coverage for toardb/data/data.py: 66%

131 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-03 20:32 +0000

1# SPDX-FileCopyrightText: 2021 Forschungszentrum Jülich GmbH 

2# SPDX-License-Identifier: MIT 

3 

4""" 

5Simple test API for data management 

6""" 

7 

8import json 

9from typing import List 

10from fastapi import APIRouter, Depends, HTTPException, \ 

11 File, UploadFile, Request 

12from sqlalchemy.orm import Session 

13from sqlalchemy.engine import Engine 

14from . import crud, schemas 

15from toardb.utils.database import ToarDbSession, engine, get_engine, get_db 

16from toardb.utils.utils import ( 

17 get_data_download_access_rights, 

18 get_map_data_download_access_rights, 

19 get_admin_access_rights, 

20 get_data_change_access_rights 

21) 

22from toardb.utils.settings import request_limitations as limits 

23import datetime as dt 

24 

25router = APIRouter() 

26 

27# CRUD: create, retrieve, update, delete 

28 

29# 1. retrieve 

30 

# Return all data of one timeseries.
#
# Requests that do not come from the dashboard must carry access rights with
# status code 200; anonymous users are additionally restricted to the
# timeseries ID range configured in the request limitations.
# Raises 401 on failed access checks and 404 if crud.get_data returns None.
@router.get('/data/timeseries/{timeseries_id}', response_model=schemas.Composite, response_model_exclude_unset=True, response_model_exclude_none=True)
def get_data(timeseries_id: int, request: Request,
             access: dict = Depends(get_data_download_access_rights),
             db: Session = Depends(get_db)):
    # requests sent via the dashboard skip the access checks entirely
    if not access['lfromdashboard']:
        if access['status_code'] != 200:
            raise HTTPException(status_code=401, detail="Unauthorized.")
        if access['role'] == 'anonymous':
            anonymous_limits = limits['anonymous']
            within_range = anonymous_limits['min_tsid'] <= timeseries_id <= anonymous_limits['max_tsid']
            if not within_range:
                raise HTTPException(status_code=401, detail=f"Anonymous access only to timeseries within the ID range of {limits['anonymous']['min_tsid']} to {limits['anonymous']['max_tsid']}.")

    result = crud.get_data(db, timeseries_id=timeseries_id,
                           path_params=request.path_params,
                           query_params=request.query_params)
    if result is not None:
        return result
    raise HTTPException(status_code=404, detail="Data not found.")

48 

49 

# Return all data of one timeseries (served under '/data/timeseries/id/').
#
# Unlike get_data, the access check here is applied to every request.
# Anonymous users are restricted to the configured timeseries ID range.
# Raises 401 on failed access checks and 404 if crud.get_data returns None.
@router.get('/data/timeseries/id/{timeseries_id}', response_model=schemas.Composite, response_model_exclude_unset=True, response_model_exclude_none=True)
def get_data2(timeseries_id: int, request: Request,
              access: dict = Depends(get_data_download_access_rights),
              db: Session = Depends(get_db)):
    if access['status_code'] != 200:
        raise HTTPException(status_code=401, detail="Unauthorized.")

    if access['role'] == 'anonymous':
        anonymous_limits = limits['anonymous']
        within_range = anonymous_limits['min_tsid'] <= timeseries_id <= anonymous_limits['max_tsid']
        if not within_range:
            raise HTTPException(status_code=401, detail=f"Anonymous access only to timeseries within the ID range of {limits['anonymous']['min_tsid']} to {limits['anonymous']['max_tsid']}.")

    result = crud.get_data(db, timeseries_id=timeseries_id,
                           path_params=request.path_params,
                           query_params=request.query_params)
    if result is not None:
        return result
    raise HTTPException(status_code=404, detail="Data not found.")

65 

66 

# Return all data of one timeseries, including staging data.
#
# 'flags' and 'format' are passed through unchanged to
# crud.get_data_with_staging. Anonymous users are restricted to the
# configured timeseries ID range.
# Raises 401 on failed access checks and 404 if the crud call returns None.
@router.get('/data/timeseries_with_staging/id/{timeseries_id}', response_model=schemas.Composite, response_model_exclude_unset=True, response_model_exclude_none=True)
def get_data_with_staging(timeseries_id: int, flags: str = None, format: str = 'json',
                          access: dict = Depends(get_data_download_access_rights),
                          db: Session = Depends(get_db)):
    if access['status_code'] != 200:
        raise HTTPException(status_code=401, detail="Unauthorized.")

    if access['role'] == 'anonymous':
        anonymous_limits = limits['anonymous']
        within_range = anonymous_limits['min_tsid'] <= timeseries_id <= anonymous_limits['max_tsid']
        if not within_range:
            raise HTTPException(status_code=401, detail=f"Anonymous access only to timeseries within the ID range of {limits['anonymous']['min_tsid']} to {limits['anonymous']['max_tsid']}.")

    result = crud.get_data_with_staging(db, timeseries_id=timeseries_id,
                                        flags=flags, format=format)
    if result is not None:
        return result
    raise HTTPException(status_code=404, detail="Data not found.")

82 

83 

# Return map data for one variable and date range.
#
# 'daterange' is handed to crud.get_map_data as given; its default is a
# comma-separated pair of timestamps.
# Raises 401 unless the map-data access rights report status code 200,
# and 404 if crud.get_map_data returns None.
@router.get('/data/map/')
def get_map_data(variable_id: int = 5, daterange: str = '2023-02-22 12:00,2023-02-22 12:00',
                 access: dict = Depends(get_map_data_download_access_rights),
                 db: Session = Depends(get_db)):
    if access['status_code'] != 200:
        raise HTTPException(status_code=401, detail="Unauthorized.")

    map_data = crud.get_map_data(db, variable_id=variable_id, daterange=daterange)
    if map_data is not None:
        return map_data
    raise HTTPException(status_code=404, detail="Data not found.")

96 

97 

# Return the next available version for one timeseries.
#
# Thin wrapper around crud.get_next_version; path and query parameters of the
# request are forwarded untouched. No access check is applied here.
@router.get('/data/timeseries/next_version/{timeseries_id}')
def get_version(timeseries_id: int, request: Request, db: Session = Depends(get_db)):
    return crud.get_next_version(db,
                                 timeseries_id=timeseries_id,
                                 path_params=request.path_params,
                                 query_params=request.query_params)

103 

104 

# Return the merged data for one variable at one station.
#
# Requests that do not come from the dashboard must carry access rights with
# status code 200. For anonymous users, every timeseries ID involved in the
# merge (taken from crud.get_merging_list) must lie within the configured
# anonymous ID range.
# Raises 401 on failed access checks and 404 if crud.get_merged_data
# returns None.
@router.get('/data/timeseries_merged/', response_model=schemas.Composite, response_model_exclude_unset=True, response_model_exclude_none=True)
def get_merged_data(variable_id: int, station_code: str, request: Request, daterange: str = None,
                    access: dict = Depends(get_data_download_access_rights),
                    db: Session = Depends(get_db)):
    # requests sent via the dashboard skip the access checks entirely
    if not access['lfromdashboard']:
        if access['status_code'] != 200:
            raise HTTPException(status_code=401, detail="Unauthorized.")
        if access['role'] == 'anonymous':
            merging_list = crud.get_merging_list(db, variable_id=variable_id, station_code=station_code, daterange=daterange)
            # gather all timeseries IDs involved in this request:
            # entries of merging_list[0] carry the ID at index 2,
            # entries of merging_list[1] are the IDs themselves
            ts_ids = {entry[2] for entry in merging_list[0]}
            ts_ids.update(merging_list[1])
            # NOTE(review): with no IDs at all both bounds fall back to -1, so
            # the outcome then depends on the configured minimum - confirm
            # that this is the intended behaviour.
            min_request_id = min(ts_ids, default=-1)
            max_request_id = max(ts_ids, default=-1)
            if (not limits['anonymous']['min_tsid'] <= min_request_id or
                    not max_request_id <= limits['anonymous']['max_tsid']):
                raise HTTPException(status_code=401, detail=f"Anonymous access only to timeseries within the ID range of {limits['anonymous']['min_tsid']} to {limits['anonymous']['max_tsid']}.")

    db_data = crud.get_merged_data(db, variable_id=variable_id, station_code=station_code,
                                   path_params=request.path_params, query_params=request.query_params)
    if db_data is None:
        raise HTTPException(status_code=404, detail="Data not found.")
    return db_data

132 

133 

# Return the merging list for one variable at one station in a date range.
#
# Thin wrapper around crud.get_merging_list; all three query parameters are
# required. No access check is applied here.
@router.get('/data/get_merging_list/')
def get_merging_list(variable_id: int, station_code: str, daterange: str, db: Session = Depends(get_db)):
    merging_list = crud.get_merging_list(db,
                                         variable_id=variable_id,
                                         station_code=station_code,
                                         daterange=daterange)
    return merging_list

137 

138 

# Return map data for one variable and date range.
#
# NOTE(review): the path '/data/map/' and the function name get_map_data are
# also used by an earlier handler in this file, which checks
# get_map_data_download_access_rights. This definition performs no access
# check and rebinds the module-level name. Presumably only one of the two is
# meant to stay - confirm and remove the other.
# Raises 404 if crud.get_map_data returns None.
@router.get('/data/map/')
def get_map_data(variable_id: int = 5, daterange: str = '2023-02-22 12:00,2023-02-22 12:00', db: Session = Depends(get_db)):
    map_data = crud.get_map_data(db, variable_id=variable_id, daterange=daterange)
    if map_data is not None:
        return map_data
    raise HTTPException(status_code=404, detail="Data not found.")

146 

147 

148# post and patch only via authorization by Helmholtz AAI 

149 

150# 2. create 

151 

# Create data for a timeseries from an uploaded file.
#
# Raises 401 unless the admin access rights report status code 200.
# Responses from crud.create_data with status 200 or 446 are returned
# unchanged; any other status is turned into a 400 carrying the decoded
# message from the response body.
@router.post('/data/timeseries/')
def create_data(file: UploadFile = File(...),
                toarqc_config_type: str = 'standard',
                dry_run: bool = False,
                force: bool = False,
                access: dict = Depends(get_admin_access_rights),
                db: Session = Depends(get_db),
                engine: Engine = Depends(get_engine)):
    # check whether the post is authorized (401: Unauthorized)
    if access['status_code'] != 200:
        raise HTTPException(status_code=401, detail="Unauthorized.")

    # No per-record "already registered" lookup is done up front because a
    # whole file is uploaded; uniqueness violations reported by the database
    # are translated into a helpful message below instead.
    response = crud.create_data(db, engine, author_id=access['auth_user_id'], input_handle=file,
                                toarqc_config_type=toarqc_config_type, dry_run=dry_run, force=force)
    if response.status_code not in (200, 446):
        msg = json.loads(response.body.decode('utf-8'))
        # try to parse error messages from DBS (to be more understandable)
        unique_violation = "An error occurred in public.data insertion: <class 'psycopg2.errors.UniqueViolation'>"
        if msg == unique_violation:
            msg = 'Data for timeseries already registered.'
        raise HTTPException(status_code=400, detail=msg)
    return response

181 else: 

182 raise HTTPException(status_code=401, detail="Unauthorized.") 

183 

184 

# Create data for timeseries from a list of records sent in the request body.
#
# Raises 401 unless the admin access rights report status code 200.
# A response from crud.create_bulk_data with status 200 is returned
# unchanged; any other status is turned into a 400 carrying the raw
# (undecoded JSON) text of the response body.
@router.post('/data/timeseries/bulk/')
def create_bulk_data(bulk: List[schemas.DataCreate],
                     toarqc_config_type: str = 'standard',
                     dry_run: bool = False,
                     force: bool = False,
                     access: dict = Depends(get_admin_access_rights),
                     db: Session = Depends(get_db),
                     engine: Engine = Depends(get_engine)):
    # check whether the post is authorized (401: Unauthorized)
    if access['status_code'] != 200:
        raise HTTPException(status_code=401, detail="Unauthorized.")

    response = crud.create_bulk_data(db, engine, bulk=bulk,
                                     author_id=access['auth_user_id'],
                                     toarqc_config_type=toarqc_config_type,
                                     dry_run=dry_run,
                                     force=force)
    if response.status_code == 200:
        return response

    msg = response.body.decode('utf-8')
    # try to parse error messages from DBS (to be more understandable);
    # the body is compared as raw text, hence the embedded double quotes
    unique_violation = '"An error occurred in public.data insertion: <class \'psycopg2.errors.UniqueViolation\'>"'
    if msg == unique_violation:
        msg = 'Data for timeseries already registered.'
    raise HTTPException(status_code=400, detail=msg)

206 else: 

207 raise HTTPException(status_code=401, detail="Unauthorized.") 

208 

209 

# Create a single data record for a timeseries.
#
# Raises 401 unless the admin access rights report status code 200.
# A non-200 response from crud.create_data_record is turned into a 400,
# except for a uniqueness violation when suppress_unique_violation is set:
# in that case the response is returned to the caller as it is.
@router.post('/data/timeseries/record/')
def create_data_record(series_id: int,
                       datetime: dt.datetime,
                       value: float,
                       flag: str,
                       version: str = None,
                       suppress_unique_violation: bool = False,
                       access: dict = Depends(get_admin_access_rights),
                       db: Session = Depends(get_db),
                       engine: Engine = Depends(get_engine)):
    # check whether the post is authorized (401: Unauthorized)
    if access['status_code'] != 200:
        raise HTTPException(status_code=401, detail="Unauthorized.")

    response = crud.create_data_record(db, engine, series_id=series_id, datetime=datetime,
                                       value=value, flag=flag, version=version,
                                       author_id=access['auth_user_id'])
    if response.status_code != 200:
        msg = response.body.decode('utf-8')
        # try to parse error messages from DBS (to be more understandable);
        # the body is compared as raw text, hence the embedded double quotes
        unique_violation = '"An error occurred in data insertion: <class \'psycopg2.errors.UniqueViolation\'>"'
        if msg != unique_violation:
            raise HTTPException(status_code=400, detail=msg)
        if not suppress_unique_violation:
            raise HTTPException(status_code=400, detail='Data for timeseries already registered.')
    return response

236 else: 

237 raise HTTPException(status_code=401, detail="Unauthorized.") 

238 

239# 3. update 

240 

# Patch data of a timeseries from an uploaded file.
#
# Raises 401 unless the data-change access rights report status code 200;
# otherwise the result of crud.patch_data is returned unchanged.
@router.patch('/data/timeseries/')
def patch_data(description: str,
               version: str,
               file: UploadFile = File(...),
               toarqc_config_type: str = 'standard',
               force: bool = False,
               access: dict = Depends(get_data_change_access_rights),
               db: Session = Depends(get_db),
               engine: Engine = Depends(get_engine)):
    # check whether the patch is authorized (401: Unauthorized)
    if access['status_code'] != 200:
        raise HTTPException(status_code=401, detail="Unauthorized.")

    return crud.patch_data(db, engine,
                           description=description,
                           version=version,
                           toarqc_config_type=toarqc_config_type,
                           force=force,
                           author_id=access['auth_user_id'],
                           input_handle=file)

254 else: 

255 raise HTTPException(status_code=401, detail="Unauthorized.") 

256 

# Patch data of timeseries from a list of records sent in the request body.
#
# Raises 401 unless the data-change access rights report status code 200;
# otherwise the result of crud.patch_bulk_data is returned unchanged.
@router.patch('/data/timeseries/bulk/')
def patch_bulk_data(description: str,
                    version: str,
                    bulk: List[schemas.DataPatch],
                    toarqc_config_type: str = 'standard',
                    force: bool = False,
                    no_archive: bool = False,
                    access: dict = Depends(get_data_change_access_rights),
                    db: Session = Depends(get_db),
                    engine: Engine = Depends(get_engine)):
    # check whether the patch is authorized (401: Unauthorized)
    if access['status_code'] != 200:
        raise HTTPException(status_code=401, detail="Unauthorized.")

    return crud.patch_bulk_data(db, engine,
                                description=description,
                                version=version,
                                bulk=bulk,
                                author_id=access['auth_user_id'],
                                toarqc_config_type=toarqc_config_type,
                                force=force,
                                no_archive=no_archive)

271 else: 

272 raise HTTPException(status_code=401, detail="Unauthorized.") 

273