Coverage for toardb/data/data.py: 66%
131 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-03 20:32 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-03 20:32 +0000
1# SPDX-FileCopyrightText: 2021 Forschungszentrum Jülich GmbH
2# SPDX-License-Identifier: MIT
4"""
5Simple test API for data management
6"""
8import json
9from typing import List
10from fastapi import APIRouter, Depends, HTTPException, \
11 File, UploadFile, Request
12from sqlalchemy.orm import Session
13from sqlalchemy.engine import Engine
14from . import crud, schemas
15from toardb.utils.database import ToarDbSession, engine, get_engine, get_db
16from toardb.utils.utils import (
17 get_data_download_access_rights,
18 get_map_data_download_access_rights,
19 get_admin_access_rights,
20 get_data_change_access_rights
21)
22from toardb.utils.settings import request_limitations as limits
23import datetime as dt
25router = APIRouter()
27# CRUD: create, retrieve, update, delete
29# 1. retrieve
# Retrieve all data of one timeseries.
# Responds with 401 when access is denied (see below) and with 404 when
# crud.get_data returns None.
@router.get('/data/timeseries/{timeseries_id}', response_model=schemas.Composite, response_model_exclude_unset=True, response_model_exclude_none=True)
def get_data(timeseries_id: int, request: Request,
             access: dict = Depends(get_data_download_access_rights),
             db: Session = Depends(get_db)):
    # The access checks only apply when the request was not sent via the
    # dashboard (access['lfromdashboard'] is falsy).
    if not access['lfromdashboard']:
        if access['status_code'] != 200:
            raise HTTPException(status_code=401, detail="Unauthorized.")
        # Anonymous users may only read timeseries inside the configured ID range.
        if access['role'] == 'anonymous':
            tsid_allowed = limits['anonymous']['min_tsid'] <= timeseries_id <= limits['anonymous']['max_tsid']
            if not tsid_allowed:
                raise HTTPException(status_code=401, detail=f"Anonymous access only to timeseries within the ID range of {limits['anonymous']['min_tsid']} to {limits['anonymous']['max_tsid']}.")

    result = crud.get_data(db, timeseries_id=timeseries_id, path_params=request.path_params, query_params=request.query_params)
    if result is None:
        raise HTTPException(status_code=404, detail="Data not found.")
    return result
# Retrieve all data of one timeseries (route variant with '/id/' in the path).
# Unlike get_data, the access check here is applied to every request.
@router.get('/data/timeseries/id/{timeseries_id}', response_model=schemas.Composite, response_model_exclude_unset=True, response_model_exclude_none=True)
def get_data2(timeseries_id: int, request: Request,
              access: dict = Depends(get_data_download_access_rights),
              db: Session = Depends(get_db)):
    # Anything other than a 200 from the access dependency is rejected.
    if access['status_code'] != 200:
        raise HTTPException(status_code=401, detail="Unauthorized.")

    # Anonymous users may only read timeseries inside the configured ID range.
    if access['role'] == 'anonymous':
        tsid_allowed = limits['anonymous']['min_tsid'] <= timeseries_id <= limits['anonymous']['max_tsid']
        if not tsid_allowed:
            raise HTTPException(status_code=401, detail=f"Anonymous access only to timeseries within the ID range of {limits['anonymous']['min_tsid']} to {limits['anonymous']['max_tsid']}.")

    result = crud.get_data(db, timeseries_id=timeseries_id, path_params=request.path_params, query_params=request.query_params)
    if result is None:
        raise HTTPException(status_code=404, detail="Data not found.")
    return result
# Retrieve all data of one timeseries, including staging data.
# 'flags' and 'format' are passed through unchanged to crud.get_data_with_staging.
@router.get('/data/timeseries_with_staging/id/{timeseries_id}', response_model=schemas.Composite, response_model_exclude_unset=True, response_model_exclude_none=True)
def get_data_with_staging(timeseries_id: int, flags: str = None, format: str = 'json',
                          access: dict = Depends(get_data_download_access_rights),
                          db: Session = Depends(get_db)):
    # Anything other than a 200 from the access dependency is rejected.
    if access['status_code'] != 200:
        raise HTTPException(status_code=401, detail="Unauthorized.")

    # Anonymous users may only read timeseries inside the configured ID range.
    if access['role'] == 'anonymous':
        tsid_allowed = limits['anonymous']['min_tsid'] <= timeseries_id <= limits['anonymous']['max_tsid']
        if not tsid_allowed:
            raise HTTPException(status_code=401, detail=f"Anonymous access only to timeseries within the ID range of {limits['anonymous']['min_tsid']} to {limits['anonymous']['max_tsid']}.")

    result = crud.get_data_with_staging(db, timeseries_id=timeseries_id, flags=flags, format=format)
    if result is None:
        raise HTTPException(status_code=404, detail="Data not found.")
    return result
# Retrieve map data for one variable and one date range
# (access is checked via get_map_data_download_access_rights).
@router.get('/data/map/')
def get_map_data(variable_id: int = 5, daterange: str = '2023-02-22 12:00,2023-02-22 12:00',
                 access: dict = Depends(get_map_data_download_access_rights),
                 db: Session = Depends(get_db)):
    # Anything other than a 200 from the access dependency is rejected.
    if access['status_code'] != 200:
        raise HTTPException(status_code=401, detail="Unauthorized.")

    map_data = crud.get_map_data(db, variable_id=variable_id, daterange=daterange)
    if map_data is None:
        raise HTTPException(status_code=404, detail="Data not found.")
    return map_data
# Retrieve the next available version for one timeseries.
# No access dependency is attached to this route; the result of
# crud.get_next_version is returned as is.
@router.get('/data/timeseries/next_version/{timeseries_id}')
def get_version(timeseries_id: int, request: Request, db: Session = Depends(get_db)):
    return crud.get_next_version(
        db,
        timeseries_id=timeseries_id,
        path_params=request.path_params,
        query_params=request.query_params,
    )
# Retrieve the merged data for one variable at one station.
# Responds with 401 when access is denied (see below) and with 404 when
# crud.get_merged_data returns None.
@router.get('/data/timeseries_merged/', response_model=schemas.Composite, response_model_exclude_unset=True, response_model_exclude_none=True)
def get_merged_data(variable_id: int, station_code: str, request: Request, daterange: str = None,
                    access: dict = Depends(get_data_download_access_rights),
                    db: Session = Depends(get_db)):
    # The access checks only apply when the request was not sent via the
    # dashboard (access['lfromdashboard'] is falsy).
    if not access['lfromdashboard']:
        if access['status_code'] != 200:
            raise HTTPException(status_code=401, detail="Unauthorized.")
        if access['role'] == 'anonymous':
            merging_list = crud.get_merging_list(db, variable_id=variable_id, station_code=station_code, daterange=daterange)
            # Gather all timeseries ids involved in this request:
            # entries of merging_list[0] carry the id at index 2,
            # entries of merging_list[1] are the ids themselves.
            ts_ids = set()
            ts_ids.update(entry[2] for entry in merging_list[0])
            ts_ids.update(merging_list[1])
            # -1 stands in for "no timeseries involved"; whether that passes
            # the range check depends on the configured limits
            # (NOTE(review): with a non-negative min_tsid an empty merging
            # list is rejected for anonymous users -- confirm this is intended).
            min_request_id = min(ts_ids, default=-1)
            max_request_id = max(ts_ids, default=-1)
            ids_allowed = (limits['anonymous']['min_tsid'] <= min_request_id
                           and max_request_id <= limits['anonymous']['max_tsid'])
            if not ids_allowed:
                raise HTTPException(status_code=401, detail=f"Anonymous access only to timeseries within the ID range of {limits['anonymous']['min_tsid']} to {limits['anonymous']['max_tsid']}.")

    db_data = crud.get_merged_data(db, variable_id=variable_id, station_code=station_code, path_params=request.path_params, query_params=request.query_params)
    if db_data is None:
        raise HTTPException(status_code=404, detail="Data not found.")
    return db_data
# Return the merging list for one variable at one station within a date range.
# No access dependency is attached to this route; the result of
# crud.get_merging_list is returned as is.
@router.get('/data/get_merging_list/')
def get_merging_list(variable_id: int, station_code: str, daterange: str, db: Session = Depends(get_db)):
    merging_list = crud.get_merging_list(
        db,
        variable_id=variable_id,
        station_code=station_code,
        daterange=daterange,
    )
    return merging_list
# Retrieve map data for one variable and one date range.
# NOTE(review): the path '/data/map/' and the name get_map_data are also
# declared earlier in this module, there with an access check via
# get_map_data_download_access_rights. This later definition rebinds the
# module-level name and performs no access check -- confirm which of the
# two handlers is intended and remove the other.
@router.get('/data/map/')
def get_map_data(variable_id: int = 5, daterange: str = '2023-02-22 12:00,2023-02-22 12:00', db: Session = Depends(get_db)):
    map_data = crud.get_map_data(db, variable_id=variable_id, daterange=daterange)
    if map_data is not None:
        return map_data
    raise HTTPException(status_code=404, detail="Data not found.")
148# post and patch only via authorization by Helmholtz AAI
150# 2. create
# Upload a whole data file for a timeseries.
@router.post('/data/timeseries/')
def create_data(file: UploadFile = File(...),
                toarqc_config_type: str = 'standard',
                dry_run: bool = False,
                force: bool = False,
                access: dict = Depends(get_admin_access_rights),
                db: Session = Depends(get_db),
                engine: Engine = Depends(get_engine)):
    # No per-record "already registered" lookup is done up front: duplicates
    # are rejected by the database management itself, and a whole file is
    # uploaded here. The database error is translated into a helpful message
    # further down instead.

    # check whether the post is authorized (401: Unauthorized)
    if access['status_code'] != 200:
        raise HTTPException(status_code=401, detail="Unauthorized.")

    response = crud.create_data(db, engine, author_id=access['auth_user_id'], input_handle=file,
                                toarqc_config_type=toarqc_config_type, dry_run=dry_run, force=force)
    # Responses with status 200 or 446 are handed back unchanged.
    if response.status_code in (200, 446):
        return response

    msg = json.loads(response.body.decode('utf-8'))
    # try to parse error messages from DBS (to be more understandable)
    unique_violation = "An error occurred in public.data insertion: <class 'psycopg2.errors.UniqueViolation'>"
    if msg == unique_violation:
        msg = 'Data for timeseries already registered.'
    raise HTTPException(status_code=400, detail=msg)
# Create data records in bulk from a list of schemas.DataCreate entries.
@router.post('/data/timeseries/bulk/')
def create_bulk_data(bulk: List[schemas.DataCreate],
                     toarqc_config_type: str = 'standard',
                     dry_run: bool = False,
                     force: bool = False,
                     access: dict = Depends(get_admin_access_rights),
                     db: Session = Depends(get_db),
                     engine: Engine = Depends(get_engine)):
    # check whether the post is authorized (401: Unauthorized)
    if access['status_code'] != 200:
        raise HTTPException(status_code=401, detail="Unauthorized.")

    response = crud.create_bulk_data(db, engine, bulk=bulk,
                                     author_id=access['auth_user_id'], toarqc_config_type=toarqc_config_type,
                                     dry_run=dry_run, force=force)
    if response.status_code == 200:
        return response

    # The raw (undecoded-JSON) body is compared, hence the surrounding
    # double quotes inside the expected text.
    msg = response.body.decode('utf-8')
    # try to parse error messages from DBS (to be more understandable)
    unique_violation = '"An error occurred in public.data insertion: <class \'psycopg2.errors.UniqueViolation\'>"'
    if msg == unique_violation:
        msg = 'Data for timeseries already registered.'
    raise HTTPException(status_code=400, detail=msg)
# Create a single data record for one timeseries.
# With suppress_unique_violation=True a unique-violation failure does not
# raise; the (non-200) response from crud.create_data_record is returned instead.
@router.post('/data/timeseries/record/')
def create_data_record(series_id: int,
                       datetime: dt.datetime,
                       value: float,
                       flag: str,
                       version: str = None,
                       suppress_unique_violation: bool = False,
                       access: dict = Depends(get_admin_access_rights),
                       db: Session = Depends(get_db),
                       engine: Engine = Depends(get_engine)):
    # check whether the post is authorized (401: Unauthorized)
    if access['status_code'] != 200:
        raise HTTPException(status_code=401, detail="Unauthorized.")

    response = crud.create_data_record(db, engine, series_id=series_id, datetime=datetime,
                                       value=value, flag=flag, version=version,
                                       author_id=access['auth_user_id'])
    if response.status_code != 200:
        msg = response.body.decode('utf-8')
        # try to parse error messages from DBS (to be more understandable)
        unique_violation = '"An error occurred in data insertion: <class \'psycopg2.errors.UniqueViolation\'>"'
        # Any other failure is always reported with its original message.
        if msg != unique_violation:
            raise HTTPException(status_code=400, detail=msg)
        # A unique violation is reported unless the caller asked to suppress it.
        if not suppress_unique_violation:
            msg = 'Data for timeseries already registered.'
            raise HTTPException(status_code=400, detail=msg)
    return response
239# 3. update
# Patch timeseries data from an uploaded file.
# The result of crud.patch_data is returned as is.
@router.patch('/data/timeseries/')
def patch_data(description: str,
               version: str,
               file: UploadFile = File(...),
               toarqc_config_type: str = 'standard',
               force: bool = False,
               access: dict = Depends(get_data_change_access_rights),
               db: Session = Depends(get_db),
               engine: Engine = Depends(get_engine)):
    # check whether the patch is authorized (401: Unauthorized)
    if access['status_code'] != 200:
        raise HTTPException(status_code=401, detail="Unauthorized.")

    return crud.patch_data(db, engine, description=description, version=version,
                           toarqc_config_type=toarqc_config_type, force=force,
                           author_id=access['auth_user_id'], input_handle=file)
# Patch timeseries data in bulk from a list of schemas.DataPatch entries.
# The result of crud.patch_bulk_data is returned as is.
@router.patch('/data/timeseries/bulk/')
def patch_bulk_data(description: str,
                    version: str,
                    bulk: List[schemas.DataPatch],
                    toarqc_config_type: str = 'standard',
                    force: bool = False,
                    no_archive: bool = False,
                    access: dict = Depends(get_data_change_access_rights),
                    db: Session = Depends(get_db),
                    engine: Engine = Depends(get_engine)):
    # check whether the patch is authorized (401: Unauthorized)
    if access['status_code'] != 200:
        raise HTTPException(status_code=401, detail="Unauthorized.")

    return crud.patch_bulk_data(db, engine, description=description, version=version, bulk=bulk,
                                author_id=access['auth_user_id'], toarqc_config_type=toarqc_config_type,
                                force=force, no_archive=no_archive)