Coverage for toardb/toardb.py: 88%

171 statements  

coverage.py v7.11.0, created at 2025-11-03 20:32 +0000

# SPDX-FileCopyrightText: 2021 Forschungszentrum Jülich GmbH
# SPDX-License-Identifier: MIT

"""
API for the TOAR-II database
"""

from collections import namedtuple
import functools
import io
import json
import time
from pathlib import Path

import pandas as pd
import yaml  # PyYAML; needed by read_openapi_yaml below

from pydantic import BaseSettings
from typing import List
from fastapi import FastAPI, Depends, HTTPException, APIRouter, Request
from fastapi.responses import Response, HTMLResponse, JSONResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from sqlalchemy.orm import Session
from starlette.responses import FileResponse

from pyinstrument import Profiler
from pyinstrument.renderers.html import HTMLRenderer
from pyinstrument.renderers.speedscope import SpeedscopeRenderer

from toardb.utils.database import ToarDbSession, engine, get_db
from toardb.utils.settings import base_url
from toardb.utils.utils import normalize_metadata
from toardb.auth_user import auth_user
from toardb.variables import variables
from toardb.contacts import contacts
from toardb.stationmeta import stationmeta, models
from toardb.timeseries import timeseries
from toardb.data import data

# placeholders for the controlled vocabularies; filled from the database in startup_event()
RC_vocabulary = 0
RS_vocabulary = 0
AK_vocabulary = 0
OK_vocabulary = 0
SF_vocabulary = 0
AT_vocabulary = 0
OT_vocabulary = 0
DO_vocabulary = 0
CZ_vocabulary = 0
CV_vocabulary = 0
CN_vocabulary = 0
TZ_vocabulary = 0
ST_vocabulary = 0
TA_vocabulary = 0
TC_vocabulary = 0
TR_vocabulary = 0
LC_vocabulary = 0
ER_vocabulary = 0
RT_vocabulary = 0
DF_vocabulary = 0
CL_vocabulary = 0
CS_vocabulary = 0
KS_vocabulary = 0
CT_vocabulary = 0
controlled_fields = 0

app = FastAPI()
app.mount("/static", StaticFiles(directory="static"), name="_static")
templates = Jinja2Templates(directory="templates")

# see https://pyinstrument.readthedocs.io/en/latest/guide.html#profile-a-web-request-in-fastapi
@app.middleware("http")
async def profile_request(request: Request, call_next):
    profile_type_to_ext = {"html": "html", "json": "json"}
    profile_type_to_renderer = {
        "html": HTMLRenderer,
        "json": SpeedscopeRenderer,
    }
    if request.query_params.get("profile", False):  # pragma: no cover
        current_dir = Path(__file__).parent
        profile_type = request.query_params.get("profile_format", "html")
        with Profiler(interval=0.001, async_mode="enabled") as profiler:
            response = await call_next(request)
        ext = profile_type_to_ext[profile_type]
        renderer = profile_type_to_renderer[profile_type]()
        with open(current_dir / f"../profile.{ext}", "a") as outfile:
            outfile.write(profiler.output(renderer=renderer))
        return response
    return await call_next(request)
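# usage sketch (assumption: a local test instance on port 8000; any endpoint accepts the flags):
#   curl "http://localhost:8000/variables/?profile=1"                      # appends an HTML report to ../profile.html
#   curl "http://localhost:8000/variables/?profile=1&profile_format=json"  # appends a speedscope JSON profile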

# see https://fastapi.tiangolo.com/tutorial/middleware/ for details
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    if request.query_params.get("timing", False):  # pragma: no cover
        current_dir = Path(__file__).parent
        start_time = time.time()
        response = await call_next(request)
        with open(current_dir / "../timing.txt", "a") as outfile:
            outfile.write("{} s: {}\n".format(time.time() - start_time, request.url))
        return response
    return await call_next(request)
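# usage sketch (assumption: local test instance): appending "?timing=1" to any request
# writes a line such as "0.0421 s: http://localhost:8000/stationmeta/" to ../timing.txt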

@app.middleware("http")
async def response_to_csv(request: Request, call_next):
    response = await call_next(request)
    # pass the response through unchanged unless it is a successful response
    # outside the excluded paths and CSV output was explicitly requested
    if ((response.status_code != 200) or
            (request['path'].startswith(('/data/timeseries_merged/', '/data/timeseries/',
                                         '/data/timeseries_with_staging/', '/ontology/',
                                         '/controlled_vocabulary/'))) or
            (request.query_params.get('format') != 'csv')):
        return response
    # from: https://stackoverflow.com/a/71883126
    response_body = b""
    async for chunk in response.body_iterator:
        response_body += chunk
    list_response = json.loads(response_body)
    if isinstance(list_response, dict):
        list_response = [list_response]
    metadata = pd.DataFrame([normalize_metadata(resp_object) for resp_object in list_response])
    return Response(content=metadata.to_csv(index=False), status_code=response.status_code,
                    media_type="text/csv")
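# usage sketch (hypothetical endpoint): GET /contacts/persons/?format=csv turns the
# usual JSON list into a CSV table whose columns are the flattened metadata keys
# produced by normalize_metadata()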

# add endpoints
# additional YAML version of openapi.json
@app.get('/openapi.yaml', include_in_schema=False)
@functools.lru_cache()
def read_openapi_yaml() -> Response:
    openapi_json = app.openapi()
    yaml_s = io.StringIO()
    yaml.dump(openapi_json, yaml_s)
    return Response(yaml_s.getvalue(), media_type='text/yaml')
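# usage sketch (assumption: local instance): "curl http://localhost:8000/openapi.yaml"
# returns the OpenAPI schema as YAML; the rendered result is cached via functools.lru_cache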

@app.get('/', response_class=HTMLResponse)
def show_api(request: Request):
    message = None
    return templates.TemplateResponse("TOARDB_FASTAPI_Rest.html", {"request": request,
                                                                   'message': message})

@app.get("/ontology")
def read_onto(format: str = 'xml'):
    if format == 'owldoc':
        return RedirectResponse(base_url + "/documentation/ontologies/v1.0/index.html")
    else:
        return FileResponse('static/ontology.xml')
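# GET /ontology serves static/ontology.xml; GET /ontology?format=owldoc redirects to the
# rendered OWL documentation underneath base_url (see toardb.utils.settings)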

@app.get("/controlled_vocabulary/")
def vocabulary_info():
    controlled_vocabulary = {
        "Role Code": RC_vocabulary,
        "Role Status": RS_vocabulary,
        "Kind Of Annotation": AK_vocabulary,
        "Kind Of Organization": OK_vocabulary,
        "Sampling Frequency": SF_vocabulary,
        "Aggregation Type": AT_vocabulary,
        "Data Origin Type": OT_vocabulary,
        "Data Origin": DO_vocabulary,
        "Climatic Zone 2019": CZ_vocabulary,
        "Country Code": CN_vocabulary,
        "Timezone": TZ_vocabulary,
        "Station Coordinate Validity": CV_vocabulary,
        "Station Type": ST_vocabulary,
        "Station Type Of Area": TA_vocabulary,
        "Station TOAR Category": TC_vocabulary,
        "Station HTAP Region": TR_vocabulary,
        "Station Landcover Type": LC_vocabulary,
        "Station ECO Region Type": ER_vocabulary,
        "Result Type": RT_vocabulary,
        "Data Flag": DF_vocabulary,
        "Type Of Change": CL_vocabulary,
        "Absorption Cross Section": CS_vocabulary,
        "Sampling Type": KS_vocabulary,
        "Calibration Type": CT_vocabulary,
    }
    return controlled_vocabulary
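# response sketch (values are illustrative assumptions): each vocabulary is a list of
# Enumdict named tuples, which FastAPI serializes as JSON arrays, e.g.
#   {"Role Code": [[0, "Unknown", "unknown"], ...], "Role Status": [...], ...}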

@app.get("/controlled_vocabulary/{name}")
def vocabulary_item_info(name: str):
    controlled_vocabulary = {
        "role code": RC_vocabulary,
        "role status": RS_vocabulary,
        "kind of annotation": AK_vocabulary,
        "kind of organization": OK_vocabulary,
        "sampling frequency": SF_vocabulary,
        "aggregation type": AT_vocabulary,
        "data origin": DO_vocabulary,
        "data origin type": OT_vocabulary,
        "climatic zone 2019": CZ_vocabulary,
        "coordinate validity": CV_vocabulary,
        "country code": CN_vocabulary,
        "timezone": TZ_vocabulary,
        "station coordinate validity": CV_vocabulary,
        "station type": ST_vocabulary,
        "station type of area": TA_vocabulary,
        "station toar category": TC_vocabulary,
        "station htap region": TR_vocabulary,
        "station landcover type": LC_vocabulary,
        "station eco region type": ER_vocabulary,
        "result type": RT_vocabulary,
        "data flag": DF_vocabulary,
        "type of change": CL_vocabulary,
        "absorption cross section": CS_vocabulary,
        "sampling type": KS_vocabulary,
        "calibration type": CT_vocabulary,
    }
    if name.lower() in controlled_vocabulary:
        return controlled_vocabulary[name.lower()]
    else:
        message = f"No controlled vocabulary found for '{name}'"
        return JSONResponse(status_code=404, content=message)
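# the lookup is case-insensitive, e.g. /controlled_vocabulary/Timezone and
# /controlled_vocabulary/timezone both return TZ_vocabulary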

@app.get("/database_statistics/")
def stats_info():
    with open("static/db_statistics.json") as f:
        db_stats = json.load(f)
    return db_stats

@app.get('/database_statistics/{name}')
def stats_info_item(name: str):
    with open("static/db_statistics.json") as f:
        db_stats = json.load(f)
    return db_stats[name]
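# usage sketch (assumption: "users_count" is a hypothetical key in static/db_statistics.json):
#   GET /database_statistics/users_count returns just that one pre-computed statistic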

@app.get("/geopeas_urls/")
def geo_info(db: Session = Depends(get_db)):
    res = db.query(models.StationmetaGlobalService).all()
    return res

# Dependency: provide a database session per request and close it afterwards.
# Note that this definition shadows the get_db imported from toardb.utils.database.
def get_db():
    try:
        db = ToarDbSession()
        yield db
    finally:
        db.close()
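# FastAPI drives this generator via Depends(get_db): the session is yielded to the
# endpoint (see geo_info above) and closed after the response has been produced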

app.include_router(auth_user.router)
app.include_router(variables.router)
app.include_router(contacts.router)
app.include_router(stationmeta.router)
app.include_router(timeseries.router)
app.include_router(data.router)

# read one controlled vocabulary from its database table
def __get_enum_dict(connection, table_name):
    res = connection.execute(f"select * from {table_name} order by enum_val")
    Enumdict = namedtuple("Dict", ["value", "string", "display_str"])
    enum_dict = []
    for entry in res:
        enum_dict.append(Enumdict(*entry))
    return enum_dict
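# each table row becomes one named tuple, e.g. (illustrative assumption):
#   Enumdict(value=0, string='Unknown', display_str='unknown')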

@app.on_event("startup")
# executed right before the application *starts* serving requests
# ==> at module import time no database connection was available
#     (and the tables were unknown), hence the placeholder globals above
def startup_event():

    global RC_vocabulary
    global RS_vocabulary
    global AK_vocabulary
    global OK_vocabulary
    global SF_vocabulary
    global AT_vocabulary
    global OT_vocabulary
    global DO_vocabulary
    global CZ_vocabulary
    global CV_vocabulary
    global CN_vocabulary
    global TZ_vocabulary
    global ST_vocabulary
    global TA_vocabulary
    global TC_vocabulary
    global TR_vocabulary
    global LC_vocabulary
    global ER_vocabulary
    global RT_vocabulary
    global DF_vocabulary
    global CL_vocabulary
    global CS_vocabulary
    global KS_vocabulary
    global CT_vocabulary
    global controlled_fields

    with engine.begin() as connection:
        RC_vocabulary = __get_enum_dict(connection, "rc_vocabulary")
        RS_vocabulary = __get_enum_dict(connection, "rs_vocabulary")
        AK_vocabulary = __get_enum_dict(connection, "ak_vocabulary")
        OK_vocabulary = __get_enum_dict(connection, "ok_vocabulary")
        SF_vocabulary = __get_enum_dict(connection, "sf_vocabulary")
        AT_vocabulary = __get_enum_dict(connection, "at_vocabulary")
        OT_vocabulary = __get_enum_dict(connection, "ot_vocabulary")
        DO_vocabulary = __get_enum_dict(connection, "do_vocabulary")
        CZ_vocabulary = __get_enum_dict(connection, "cz_vocabulary")
        CV_vocabulary = __get_enum_dict(connection, "cv_vocabulary")
        CN_vocabulary = __get_enum_dict(connection, "cn_vocabulary")
        TZ_vocabulary = __get_enum_dict(connection, "tz_vocabulary")
        ST_vocabulary = __get_enum_dict(connection, "st_vocabulary")
        TA_vocabulary = __get_enum_dict(connection, "ta_vocabulary")
        TC_vocabulary = __get_enum_dict(connection, "tc_vocabulary")
        TR_vocabulary = __get_enum_dict(connection, "tr_vocabulary")
        LC_vocabulary = __get_enum_dict(connection, "lc_vocabulary")
        ER_vocabulary = __get_enum_dict(connection, "er_vocabulary")
        RT_vocabulary = __get_enum_dict(connection, "rt_vocabulary")
        DF_vocabulary = __get_enum_dict(connection, "df_vocabulary")
        CL_vocabulary = __get_enum_dict(connection, "cl_vocabulary")
        CS_vocabulary = __get_enum_dict(connection, "cs_vocabulary")
        KS_vocabulary = __get_enum_dict(connection, "ks_vocabulary")
        CT_vocabulary = __get_enum_dict(connection, "ct_vocabulary")

        # also record which database field uses which controlled vocabulary
        controlled_fields = {}
        res = connection.execute("SELECT c.conname, c1.conname FROM pg_catalog.pg_constraint c " +
                                 "INNER JOIN pg_catalog.pg_constraint c1 ON c.confrelid=c1.conrelid " +
                                 "AND c1.conname LIKE '%%enum_val%%' AND c.conname LIKE '%%_fk_%%_voc%%'")
        for line in res:
            field, voc = line[0].split('_fk_')
            voc = voc.split('_')[0].upper()
            controlled_fields[field] = f"{voc}_vocabulary"
    engine.dispose()
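# result sketch (assumption: constraint names follow the "<field>_fk_<xx>_voc..." pattern
# matched above), e.g. controlled_fields == {"type": "ST_vocabulary", "role": "RC_vocabulary", ...}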