Coverage for toardb/toardb.py: 88% (171 statements)
coverage.py v7.11.0, created at 2025-11-03 20:32 +0000
# SPDX-FileCopyrightText: 2021 Forschungszentrum Jülich GmbH
# SPDX-License-Identifier: MIT

"""
API for TOAR-II database
"""

from collections import namedtuple
import io
import functools
import json
import yaml   # used by read_openapi_yaml() below (yaml.dump)
import pandas as pd

from pydantic import BaseSettings
from typing import List
from fastapi import FastAPI, Depends, HTTPException, APIRouter, Request
from fastapi.responses import Response, HTMLResponse, JSONResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from sqlalchemy.orm import Session
from starlette.responses import FileResponse
from fastapi.templating import Jinja2Templates

from pyinstrument import Profiler
from pyinstrument.renderers.html import HTMLRenderer
from pyinstrument.renderers.speedscope import SpeedscopeRenderer
from pathlib import Path
import time

from toardb.utils.database import ToarDbSession, engine, get_db
from toardb.utils.settings import base_url
from toardb.utils.utils import normalize_metadata
from toardb.auth_user import auth_user
from toardb.variables import variables
from toardb.contacts import contacts
from toardb.stationmeta import stationmeta, models
from toardb.timeseries import timeseries
from toardb.data import data

# controlled-vocabulary tables; placeholders here, filled from the database
# in startup_event() below
RC_vocabulary = 0
RS_vocabulary = 0
AK_vocabulary = 0
OK_vocabulary = 0
SF_vocabulary = 0
AT_vocabulary = 0
OT_vocabulary = 0
DO_vocabulary = 0
CZ_vocabulary = 0
CV_vocabulary = 0
CN_vocabulary = 0
TZ_vocabulary = 0
ST_vocabulary = 0
TA_vocabulary = 0
TC_vocabulary = 0
TR_vocabulary = 0
LC_vocabulary = 0
ER_vocabulary = 0
RT_vocabulary = 0
DF_vocabulary = 0
CL_vocabulary = 0
CS_vocabulary = 0
KS_vocabulary = 0
CT_vocabulary = 0
controlled_fields = 0

app = FastAPI()
app.mount("/static", StaticFiles(directory="static"), name="_static")
templates = Jinja2Templates(directory="templates")

# see https://pyinstrument.readthedocs.io/en/latest/guide.html#profile-a-web-request-in-fastapi
@app.middleware("http")
async def profile_request(request: Request, call_next):
    profile_type_to_ext = {"html": "html", "json": "json"}
    profile_type_to_renderer = {
        "html": HTMLRenderer,
        "json": SpeedscopeRenderer,
    }
    if request.query_params.get("profile", False):  # pragma: no cover
        current_dir = Path(__file__).parent
        profile_type = request.query_params.get("profile_format", "html")
        with Profiler(interval=0.001, async_mode="enabled") as profiler:
            response = await call_next(request)
        ext = profile_type_to_ext[profile_type]
        renderer = profile_type_to_renderer[profile_type]()
        with open(current_dir / f"../profile.{ext}", "a") as outfile:
            outfile.write(profiler.output(renderer=renderer))
        return response
    return await call_next(request)
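
# Usage sketch (endpoint name illustrative): any request can opt in via query
# parameters, e.g. GET /variables/?profile=true&profile_format=json, which
# appends a speedscope-format profile to ../profile.json.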

# see https://fastapi.tiangolo.com/tutorial/middleware/
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    if request.query_params.get("timing", False):  # pragma: no cover
        current_dir = Path(__file__).parent
        start_time = time.time()
        response = await call_next(request)
        with open(current_dir / "../timing.txt", "a") as outfile:
            outfile.write("{} s: {}\n".format(time.time() - start_time, request.url))
        return response
    return await call_next(request)
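
# Usage sketch (endpoint name illustrative): GET /variables/?timing=true appends
# a line such as "0.0421 s: http://.../variables/?timing=true" to ../timing.txt.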

@app.middleware("http")
async def response_to_csv(request: Request, call_next):
    response = await call_next(request)
    # only convert successful JSON responses that explicitly request CSV;
    # bulk data, ontology and controlled-vocabulary endpoints handle their
    # own output formats
    if ((response.status_code != 200) or
            (request.url.path.startswith(('/data/timeseries_merged/', '/data/timeseries/',
                                          '/data/timeseries_with_staging/', '/ontology/',
                                          '/controlled_vocabulary/'))) or
            (request.query_params.get('format') != 'csv')):
        return response
    # from: https://stackoverflow.com/a/71883126
    response_body = b""
    async for chunk in response.body_iterator:
        response_body += chunk
    list_response = json.loads(response_body)
    if isinstance(list_response, dict):
        list_response = [list_response]
    metadata = pd.DataFrame([normalize_metadata(resp_object) for resp_object in list_response])
    return Response(content=metadata.to_csv(index=False), status_code=response.status_code,
                    media_type="text/csv")
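
# Usage sketch (endpoint name illustrative): GET /stationmeta/?format=csv takes
# the JSON the endpoint would have returned, flattens each object with
# normalize_metadata(), and returns the result as text/csv.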

# add endpoints
# additional YAML version of openapi.json
@app.get('/openapi.yaml', include_in_schema=False)
@functools.lru_cache()
def read_openapi_yaml() -> Response:
    openapi_json = app.openapi()
    yaml_s = io.StringIO()
    yaml.dump(openapi_json, yaml_s)
    return Response(yaml_s.getvalue(), media_type='text/yaml')
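
# Usage sketch: GET /openapi.yaml returns the YAML rendering of app.openapi();
# functools.lru_cache() ensures the document is only generated once.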

@app.get('/', response_class=HTMLResponse)
def show_api(request: Request):
    message = None
    return templates.TemplateResponse("TOARDB_FASTAPI_Rest.html",
                                      {"request": request, 'message': message})

@app.get("/ontology")
def read_onto(format: str = 'xml'):
    if format == 'owldoc':
        return RedirectResponse(base_url + "/documentation/ontologies/v1.0/index.html")
    else:
        return FileResponse('static/ontology.xml')
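
# Usage sketch: GET /ontology returns static/ontology.xml, while
# GET /ontology?format=owldoc redirects to the rendered OWL documentation
# under base_url.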

@app.get("/controlled_vocabulary/")
def info():
    controlled_vocabulary = {
        "Role Code": RC_vocabulary,
        "Role Status": RS_vocabulary,
        "Kind Of Annotation": AK_vocabulary,
        "Kind Of Organization": OK_vocabulary,
        "Sampling Frequency": SF_vocabulary,
        "Aggregation Type": AT_vocabulary,
        "Data Origin Type": OT_vocabulary,
        "Data Origin": DO_vocabulary,
        "Climatic Zone 2019": CZ_vocabulary,
        "Country Code": CN_vocabulary,
        "Timezone": TZ_vocabulary,
        "Station Coordinate Validity": CV_vocabulary,
        "Station Type": ST_vocabulary,
        "Station Type Of Area": TA_vocabulary,
        "Station TOAR Category": TC_vocabulary,
        "Station HTAP Region": TR_vocabulary,
        "Station Landcover Type": LC_vocabulary,
        "Station ECO Region Type": ER_vocabulary,
        "Result Type": RT_vocabulary,
        "Data Flag": DF_vocabulary,
        "Type Of Change": CL_vocabulary,
        "Absorption Cross Section": CS_vocabulary,
        "Sampling Type": KS_vocabulary,
        "Calibration Type": CT_vocabulary,
    }
    return controlled_vocabulary

@app.get("/controlled_vocabulary/{name}")
def info_by_name(name: str):
    controlled_vocabulary = {
        "role code": RC_vocabulary,
        "role status": RS_vocabulary,
        "kind of annotation": AK_vocabulary,
        "kind of organization": OK_vocabulary,
        "sampling frequency": SF_vocabulary,
        "aggregation type": AT_vocabulary,
        "data origin": DO_vocabulary,
        "data origin type": OT_vocabulary,
        "climatic zone 2019": CZ_vocabulary,
        "coordinate validity": CV_vocabulary,
        "country code": CN_vocabulary,
        "timezone": TZ_vocabulary,
        "station coordinate validity": CV_vocabulary,
        "station type": ST_vocabulary,
        "station type of area": TA_vocabulary,
        "station toar category": TC_vocabulary,
        "station htap region": TR_vocabulary,
        "station landcover type": LC_vocabulary,
        "station eco region type": ER_vocabulary,
        "result type": RT_vocabulary,
        "data flag": DF_vocabulary,
        "type of change": CL_vocabulary,
        "absorption cross section": CS_vocabulary,
        "sampling type": KS_vocabulary,
        "calibration type": CT_vocabulary,
    }
    if name.lower() in controlled_vocabulary:
        return controlled_vocabulary[name.lower()]
    else:
        message = f"No controlled vocabulary found for '{name}'"
        return JSONResponse(status_code=200, content=message)
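
# Usage sketch: GET /controlled_vocabulary/Country%20Code returns the
# CN_vocabulary entries; the lookup is case-insensitive, so "country code"
# works as well.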

@app.get("/database_statistics/")
def stats_info():
    with open("static/db_statistics.json") as f:
        db_stats = json.load(f)
    return db_stats


@app.get('/database_statistics/{name}')
def stats_info_by_name(name: str):
    with open("static/db_statistics.json") as f:
        db_stats = json.load(f)
    return db_stats[name]
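
# Usage sketch (key name hypothetical): GET /database_statistics/stations_count
# would return just that entry of static/db_statistics.json; unknown keys raise
# a KeyError.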

@app.get("/geopeas_urls/")
def geo_info(db: Session = Depends(get_db)):
    res = db.query(models.StationmetaGlobalService).all()
    return res

# Dependency
# NOTE: this rebinds the name get_db that is also imported from
# toardb.utils.database above; routes decorated earlier keep the imported
# dependency object.
def get_db():
    try:
        db = ToarDbSession()
        yield db
    finally:
        db.close()

app.include_router(auth_user.router)
app.include_router(variables.router)
app.include_router(contacts.router)
app.include_router(stationmeta.router)
app.include_router(timeseries.router)
app.include_router(data.router)

# get the controlled vocabulary from a database table
def __get_enum_dict(connection, table_name):
    res = connection.execute("select * from " + table_name + " order by enum_val")
    Enumdict = namedtuple("Dict", ["value", "string", "display_str"])
    return [Enumdict(*entry) for entry in res]
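
# Sketch of the returned structure (entries illustrative, not actual database
# contents):
#   __get_enum_dict(connection, "rc_vocabulary")
#   -> [Dict(value=0, string='PointOfContact', display_str='point of contact'), ...]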

# executed before the application *starts*
# ==> at this point no database connection is available yet
#     (and all tables are unknown)
@app.on_event("startup")
def startup_event():

    global RC_vocabulary
    global RS_vocabulary
    global AK_vocabulary
    global OK_vocabulary
    global SF_vocabulary
    global AT_vocabulary
    global OT_vocabulary
    global DO_vocabulary
    global CZ_vocabulary
    global CV_vocabulary
    global CN_vocabulary
    global TZ_vocabulary
    global ST_vocabulary
    global TA_vocabulary
    global TC_vocabulary
    global TR_vocabulary
    global LC_vocabulary
    global ER_vocabulary
    global RT_vocabulary
    global DF_vocabulary
    global CL_vocabulary
    global CS_vocabulary
    global KS_vocabulary
    global CT_vocabulary
    global controlled_fields

    with engine.begin() as connection:
        RC_vocabulary = __get_enum_dict(connection, "rc_vocabulary")
        RS_vocabulary = __get_enum_dict(connection, "rs_vocabulary")
        AK_vocabulary = __get_enum_dict(connection, "ak_vocabulary")
        OK_vocabulary = __get_enum_dict(connection, "ok_vocabulary")
        SF_vocabulary = __get_enum_dict(connection, "sf_vocabulary")
        AT_vocabulary = __get_enum_dict(connection, "at_vocabulary")
        OT_vocabulary = __get_enum_dict(connection, "ot_vocabulary")
        DO_vocabulary = __get_enum_dict(connection, "do_vocabulary")
        CZ_vocabulary = __get_enum_dict(connection, "cz_vocabulary")
        CV_vocabulary = __get_enum_dict(connection, "cv_vocabulary")
        CN_vocabulary = __get_enum_dict(connection, "cn_vocabulary")
        TZ_vocabulary = __get_enum_dict(connection, "tz_vocabulary")
        ST_vocabulary = __get_enum_dict(connection, "st_vocabulary")
        TA_vocabulary = __get_enum_dict(connection, "ta_vocabulary")
        TC_vocabulary = __get_enum_dict(connection, "tc_vocabulary")
        TR_vocabulary = __get_enum_dict(connection, "tr_vocabulary")
        LC_vocabulary = __get_enum_dict(connection, "lc_vocabulary")
        ER_vocabulary = __get_enum_dict(connection, "er_vocabulary")
        RT_vocabulary = __get_enum_dict(connection, "rt_vocabulary")
        DF_vocabulary = __get_enum_dict(connection, "df_vocabulary")
        CL_vocabulary = __get_enum_dict(connection, "cl_vocabulary")
        CS_vocabulary = __get_enum_dict(connection, "cs_vocabulary")
        KS_vocabulary = __get_enum_dict(connection, "ks_vocabulary")
        CT_vocabulary = __get_enum_dict(connection, "ct_vocabulary")

        # also record which database field uses which controlled vocabulary
        controlled_fields = {}
        res = connection.execute("SELECT c.conname, c1.conname FROM pg_catalog.pg_constraint c " +
                                 "INNER JOIN pg_catalog.pg_constraint c1 ON c.confrelid=c1.conrelid " +
                                 "AND c1.conname LIKE '%%enum_val%%' AND c.conname LIKE '%%_fk_%%_voc%%'")
        for line in res:
            field, voc = line[0].split('_fk_')
            voc = voc.split('_')[0].upper()
            controlled_fields[field] = f"{voc}_vocabulary"
    engine.dispose()
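
# Sketch of the resulting mapping, assuming a foreign-key constraint named e.g.
# "station_type_fk_st_vocabulary" (constraint name hypothetical):
#   controlled_fields["station_type"] == "ST_vocabulary"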