Coverage for toardb/data/crud.py: 80%
615 statements
# SPDX-FileCopyrightText: 2021 Forschungszentrum Jülich GmbH
# SPDX-License-Identifier: MIT

"""
Create, Read, Update, Delete functionality
"""

import sys
from io import StringIO
from contextlib import closing
import requests
import json
from collections import defaultdict
import os

from sqlalchemy import insert, delete, select, and_, text, func
from sqlalchemy.orm import Session
from sqlalchemy.engine import Engine
from geoalchemy2.elements import WKBElement, WKTElement
from fastapi import File, HTTPException, UploadFile
from fastapi.responses import JSONResponse, Response
from typing import List
import datetime as dt
import pytz
import pandas as pd
import csv

from . import models, schemas
from toardb.variables import models as variables_models
from toardb.variables.crud import get_variable
from toardb.stationmeta import models as stationmeta_models
from toardb.stationmeta.crud import get_stationmeta
from toardb.timeseries.models import TimeseriesChangelog
from toardb.timeseries.schemas import TimeseriesWithCitation
from toardb.timeseries.crud import get_timeseries_by_unique_constraints, get_timeseries, get_citation, search_all
from toardb.utils.utils import get_value_from_str, get_str_from_value, create_filter
import toardb
from toarqc import get_toarqc_config, run_toarqc
from toarqc.tests import RangeTest

def create_filter_from_aggreated_flags(i: int, filter_string: str, agg_flag_num: int):
    # definition of aggregated_flags:
    # agg. flag | composition of unique flags
    # ----------|----------------------------
    #    100    | 0-6
    #    101    | 0-2
    #    102    | 3-5
    #    103    | 0, 1, 3, 4
    #    104    | 2, 5, 6
    #    110    | 10-16
    #    111    | 10-12
    #    112    | 13-16
    #    120    | 20-28
    #    121    | 20-23
    #    122    | 24-28
    #    130    | 10-28
    #    131    | 10-12, 20-23
    #    132    | 13-16, 24-28
    #    140    | 7, 16, 28
    filter_dict = {100: [ 0,  1,  2,  3,  4,  5,  6],
                   101: [ 0,  1,  2],
                   102: [ 3,  4,  5],
                   103: [ 0,  1,  3,  4],
                   104: [ 2,  5,  6],
                   110: [10, 11, 12, 13, 14, 15, 16],
                   111: [10, 11, 12],
                   112: [13, 14, 15, 16],
                   120: [20, 21, 22, 23, 24, 25, 26, 27, 28],
                   121: [20, 21, 22, 23],
                   122: [24, 25, 26, 27, 28],
                   130: [10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
                         20, 21, 22, 23, 24, 25, 26, 27, 28],
                   131: [10, 11, 12,
                         20, 21, 22, 23],
                   132: [13, 14, 15, 16, 24, 25, 26, 27, 28],
                   140: [ 7, 16, 28]}
    for flag_num in filter_dict[agg_flag_num]:
        if i == 0:
            filter_string = f"(data.flags = {flag_num})"
        else:
            filter_string = filter_string + f" OR (data.flags = {flag_num})"
        i += 1
    return i, filter_string
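
# A minimal usage sketch (illustrative only): expanding aggregated flag 101,
# which groups the unique flags 0-2, starting from an empty filter string:
#     i, fs = create_filter_from_aggreated_flags(0, '', 101)
#     # i  -> 3
#     # fs -> "(data.flags = 0) OR (data.flags = 1) OR (data.flags = 2)"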

def create_filter_from_flags(flags: str):
    i = 0
    filter_string = ''
    for flag in flags.split(','):
        flag_num = get_value_from_str(toardb.toardb.DF_vocabulary, flag)
        # check whether an aggregated flag was given
        if flag_num >= 100:
            i, filter_string = create_filter_from_aggreated_flags(i, filter_string, flag_num)
        else:
            if i == 0:
                filter_string = f"(data.flags = {flag_num})"
            else:
                filter_string = filter_string + f" OR (data.flags = {flag_num})"
            i += 1
    return f"({filter_string})"
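
# A hedged usage sketch: flag names are resolved through the controlled DF_vocabulary,
# whose numeric values are not reproduced here. Assuming, purely hypothetically, that
# 'SomeFlag' resolves to 3 and 'SomeAggFlag' to the aggregated flag 101, then
#     create_filter_from_flags('SomeFlag,SomeAggFlag')
# would return
#     "((data.flags = 3) OR (data.flags = 0) OR (data.flags = 1) OR (data.flags = 2))"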

def get_data(db: Session, timeseries_id: int, path_params, query_params, daterange=None):
    # NOTE: for some unknown reason, get_timeseries does not return variable and programme!
    def get_timeseries_meta(timeseries_id, lmerge=False):
        record = get_timeseries(db, timeseries_id=timeseries_id)
        # for some unknown reason, variable, changelog, and programme have to be 'loaded' into the structure
        # (touching the lazy relationships makes them appear in record.__dict__)
        if record:
            dummy = record.variable
            dummy = record.programme
            dummy = record.changelog
            attribution, citation, license_txt = get_citation(db, timeseries_id=timeseries_id).values()
            record_dict = record.__dict__
            record_dict['license'] = license_txt
            record_dict['citation'] = citation
            if attribution is not None:
                record_dict['attribution'] = attribution
            if lmerge:
                return record_dict
            else:
                return TimeseriesWithCitation(**record_dict)
        else:
            return None

    try:
        # translation of flags should be done here!
        lmerge = False
        flags = ",".join([item.strip() for v in query_params.getlist("flags") for item in v.split(',')])
        if not flags:
            flags = None
        station_code = query_params.get("station_code", None)
        if station_code:
            limit, offset, fields, format, filters = create_filter(query_params, "timeseries_merged")
            lmerge = True
        else:
            limit, offset, fields, format, filters = create_filter(query_params, "data")
        d_filter = filters["d_filter"]
        if lmerge:
            try:
                start_date = dt.datetime.fromisoformat(daterange[0])
                stop_date = dt.datetime.fromisoformat(daterange[1])
            except ValueError:
                # daterange may contain bare years instead of full ISO dates
                start_date = dt.datetime.fromisoformat(f"{daterange[0]}-01-01")
                stop_date = dt.datetime.fromisoformat(f"{daterange[1]}-12-31")
            d_filter += f"datetime BETWEEN '{start_date}' AND '{stop_date}'"
        fields_list = []
        if fields:
            fields_list = fields.split(',')
        columns = ([getattr(models.Data, field) for field in fields_list]
                   if fields_list
                   else list(models.Data.__table__.columns))
    except KeyError as e:
        status_code = 400
        return JSONResponse(status_code=status_code, content=str(e))
    if flags:
        filter_string = create_filter_from_flags(flags)
        data = db.query(*columns).filter(models.Data.timeseries_id == timeseries_id). \
                   filter(text(filter_string)). \
                   filter(text(d_filter)). \
                   order_by(models.Data.datetime).all()
    else:
        data = db.query(*columns).filter(models.Data.timeseries_id == timeseries_id). \
                   filter(text(d_filter)). \
                   order_by(models.Data.datetime).all()
    # get the advantages of pydantic, but without another call of the REST API
    # (especially needed for testing with pytest!)
    metadata = get_timeseries_meta(timeseries_id, lmerge)
    # TOAR day 2022-08-09: We want the timezone information for the requested data
    # metadata['timezone'] = ...
    if not metadata:
        return None
    if format == 'json':
        if lmerge:
            return columns, metadata, data
        else:
            composite = schemas.Composite(metadata=metadata, data=data)
            return composite
    elif format == 'csv':
        if not lmerge:
            # start with metadata
            content = '#' + metadata.json(indent=4, ensure_ascii=False).replace('\n', '\n#') + '\n'
            # add header
            content += ','.join(column.name for column in columns) + '\n'
            # now the data
            content += '\n'.join(','.join(f"{getattr(curr, column.name)}" for column in columns) for curr in data)
            return Response(content=content, media_type="text/csv")
        else:
            # the csv response for merged timeseries is assembled by the caller (timeseries_merged)
            return columns, metadata, data
    else:
        status_code = 400
        message = 'Invalid format!'
        return JSONResponse(status_code=status_code, content=message)

def get_next_version(db: Session, timeseries_id: int, path_params, query_params):
    data_version = db.query(func.max(models.Data.version)). \
                       filter(models.Data.timeseries_id == timeseries_id). \
                       distinct().first()
    if is_preliminary(data_version[0]):
        status_code = 407
        message = 'Preliminary data has no next version!'
        return JSONResponse(status_code=status_code, content=message)
    else:
        splitted = data_version[0].split('.')
        lmajor = query_params.get("major", "False").lower() == "true"
        major = int(splitted[0])
        if lmajor:
            major += 1
            minor = 0
        else:
            minor = int(splitted[1]) + 1
        return f"{major:06d}.{minor:06d}.00000000000000"
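
# Version strings follow the pattern '<major:06d>.<minor:06d>.<14-digit tail>'.
# Illustrative examples, assuming the stored maximum version is '000001.000002.00000000000000':
#     get_next_version(...)                              ->  '000001.000003.00000000000000'  (minor bump, default)
#     get_next_version(...)  with query parameter major=true  ->  '000002.000000.00000000000000'  (major bump)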

def get_map_data(db: Session, variable_id, daterange):
    daterange = daterange.split(',')
    query_string = f"timeseries_id, value FROM timeseries t, data d WHERE d.timeseries_id=t.id AND t.variable_id={variable_id} AND d.datetime BETWEEN '{daterange[0]}' AND '{daterange[1]}'"
    data = db.execute(select(text(query_string))).all()
    return data

def get_data_with_staging(db: Session, timeseries_id: int, flags: str, format: str):
    # NOTE: for some unknown reason, get_timeseries does not return variable and programme!
    def get_timeseries_meta(timeseries_id):
        record = get_timeseries(db, timeseries_id=timeseries_id)
        # for some unknown reason, variable, changelog, and programme have to be 'loaded' into the structure
        if record:
            dummy = record.variable
            dummy = record.programme
            dummy = record.changelog
            attribution, citation, license_txt = get_citation(db, timeseries_id=timeseries_id).values()
            record_dict = record.__dict__
            record_dict['citation'] = citation
            record_dict['license'] = license_txt
            if attribution is not None:
                record_dict['attribution'] = attribution
            return TimeseriesWithCitation(**record_dict)
        else:
            return None

    if flags:
        filter_string = create_filter_from_flags(flags)
        data = db.query(models.Data).filter(models.Data.timeseries_id == timeseries_id).filter(text(filter_string)).order_by(models.Data.datetime).all()
    else:
        # prefer staging data (db=1) over public data (db=2) for identical timestamps
        query_txt = f"SELECT DISTINCT ON (datetime) * \
                      FROM ( SELECT *, 1 AS db FROM staging.data WHERE timeseries_id={timeseries_id} \
                             UNION \
                             SELECT *, 2 AS db FROM public.data WHERE timeseries_id={timeseries_id}) AS mix \
                      ORDER BY datetime, db"
        data = db.query(models.Data).from_statement(text(query_txt)).all()

    # get the advantages of pydantic, but without another call of the REST API
    # (especially needed for testing with pytest!)
    metadata = get_timeseries_meta(timeseries_id)
    # TOAR day 2022-08-09: We want the timezone information for the requested data
    # metadata['timezone'] = ...
    if not metadata:
        return None
    if format == 'json':
        composite = schemas.Composite(metadata=metadata, data=data)
        return composite
    elif format == 'csv':
        # start with metadata
        content = '#' + metadata.json(indent=4, ensure_ascii=False).replace('\n', '\n#') + '\n'
        # add header
        content += ','.join(column.name for column in models.Data.__mapper__.columns) + '\n'
        # now the data
        content += '\n'.join(','.join(f"{getattr(curr, column.name)}" for column in models.Data.__mapper__.columns) for curr in data)
        return Response(content=content, media_type="text/csv")
    else:
        status_code = 400
        message = 'Invalid format!'
        return JSONResponse(status_code=status_code, content=message)

def get_all_merged_timeseries_ids(data):
    ts_ids = []
    for _, _, ts_id in data:
        if ts_id not in ts_ids:
            ts_ids.append(ts_id)
    return ts_ids

def replace_v8_v9_elements(elements):
    # Initialize variables to store the elements with 'v8' and 'v9'
    v8_elements = []
    v9_elements = []

    # Iterate through the list of elements
    for i, element in enumerate(elements):
        # Check if the fourth element (the year) is 2012
        if element[3] == 2012:
            # Check if the seventh element (the provider version) is 'v8' or 'v9'
            # -- there may be more than one element for v8 and for v9
            if element[6] == 'v8':
                v8_elements.append(element)
            elif element[6] == 'v9':
                v9_elements.append(element)

    # Check if both 'v8' and 'v9' elements were found
    if v8_elements != [] and v9_elements != []:
        # Create a new element with the third item (the timeseries id) substituted with 'v89';
        # use the elements with the highest coverages
        v8_idx = max(range(len(v8_elements)), key=lambda i: v8_elements[i][4])
        v8_element = v8_elements[v8_idx]
        v9_idx = max(range(len(v9_elements)), key=lambda i: v9_elements[i][4])
        v9_element = v9_elements[v9_idx]
        new_element = (v8_element[0], v8_element[1], 'v89', v8_element[3], v8_element[4], v8_element[5], v8_element[6], v8_element[7], v8_element[8])

        # To edit: if there are other elements for the sequential years, change their timeseries id to 'v89' as well
        # Add the new element to the list
        index_v8_element = elements.index(v8_element)
        elements[index_v8_element] = new_element
        # Remove the remaining 'v8' and 'v9' elements from the list
        for i in range(len(elements) - 1, -1, -1):
            if elements[i][3] == 2012 and elements[i][6] in ['v8', 'v9'] and elements[i][2] != 'v89':
                del elements[i]
        return elements, v8_element, v9_element
    else:
        return elements, None, None

def select_provider(data):
    # Organize the data by year and pick, for each year, the timeseries entry from the provider with the highest rank.
    # Each input record looks like
    #   (_, _, ts_id, year, coverage, order, provider_version, data_start_date, data_end_date),
    # e.g. (46, 5, 18763, 1991, 0.3442922374429223, 1, 'N/A',
    #       dt.datetime(1993, 1, 1, 2, 0, tzinfo=dt.timezone.utc), dt.datetime(2025, 3, 5, 23, 0, tzinfo=dt.timezone.utc))
    year_data = defaultdict(list)
    for _, _, ts_id, year, coverage, order, version, start_date, end_date in data:
        year_data[year].append((ts_id, coverage, order, version, start_date, end_date))

    selected = []
    # Process each year
    for year, entries in year_data.items():
        # Sort by provider order (ascending, since a lower order means a higher-ranked provider)
        entries.sort(key=lambda x: x[2])

        best_ts_id, best_coverage = entries[0][0], entries[0][1]

        if best_coverage >= 0.75:
            selected.append((year, best_ts_id))
            continue

        for ts_id, coverage, *_ in entries[1:]:
            # do not switch to data of very low coverage, even if the higher-ranked provider has no data at all
            if coverage < 0.2 and best_coverage == 0.:
                continue
            if coverage >= best_coverage * 1.2:
                best_ts_id, best_coverage = ts_id, coverage

            if best_coverage >= 0.75:
                break
        selected.append((year, best_ts_id))
    return selected
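
# A worked sketch of the selection rule above (values are illustrative), for one year
# with entries already sorted by provider order:
#     order 1: ts_id 10, coverage 0.50   (best so far, but below 0.75)
#     order 2: ts_id 11, coverage 0.65   -> 0.65 >= 0.50 * 1.2, so ts_id 11 replaces ts_id 10
#     order 3: ts_id 12, coverage 0.70   -> 0.70 <  0.65 * 1.2, so ts_id 11 is kept
# the year is finally attributed to ts_id 11.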

def ensure_v8_before_v9_and_strip_tags(entries):
    # Handle the case where v9's timeseries ID is smaller than v8's:
    # v9 always has to come after v8, whatever timeseries IDs they carry.
    v8_index = None
    v9_index = None

    # Find the indices of v8 and v9
    for i, entry in enumerate(entries):
        if len(entry) == 3:
            if entry[2] == 'v8':
                v8_index = i
            elif entry[2] == 'v9':
                v9_index = i

    # Swap if v9 comes before v8
    if v9_index is not None and v8_index is not None and v9_index < v8_index:
        entries[v8_index], entries[v9_index] = entries[v9_index], entries[v8_index]

    # Strip the third element (the 'v8'/'v9' tag) if present
    cleaned_entries = [(entry[0], entry[1]) if len(entry) == 3 else entry for entry in entries]

    return cleaned_entries

def merge_sequences(selected):
    # Merge entries with the same timeseries id from sequential years into [start_year, end_year, ts_id] ranges.
    merged = []
    if not selected:
        return merged

    selected.sort()
    eeasorted_selected = ensure_v8_before_v9_and_strip_tags(selected)

    start_year, ts_id = eeasorted_selected[0]
    end_year = start_year

    for year, current_ts_id in eeasorted_selected[1:]:
        if current_ts_id != ts_id:
            merged.append([start_year, end_year, ts_id])
            start_year = year
            end_year = start_year
            ts_id = current_ts_id
        else:
            end_year = year
    merged.append([start_year, end_year, ts_id])
    return merged
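
# Illustrative example of the merging step (plain (year, ts_id) tuples, no v8/v9 tags):
#     merge_sequences([(2010, 5), (2011, 5), (2012, 7), (2013, 7), (2014, 5)])
#     -> [[2010, 2011, 5], [2012, 2013, 7], [2014, 2014, 5]]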

def format_timeseries(data, v8_element, v9_element):
    # Format the [start_year, end_year, ts_id] ranges produced by merge_sequences into
    # [start_timestamp, end_timestamp, ts_id] entries.
    def format_start(start_year):
        formatted_date = f"{start_year}-01-01 00:00"
        return formatted_date

    def format_end(end_year):
        formatted_date = f"{end_year}-12-31 23:00"
        return formatted_date

    v8_end_time = None

    if v8_element and v9_element:
        # Treat the case when both a v8 and a v9 element exist.
        idelete = None
        for i, (start, end, ts_id) in enumerate(data):
            if ts_id == v8_element[2]:
                new_end = v8_element[8]  # replace the end of the v8 range with the actual data end date
                if new_end.year > end:
                    new_end = dt.datetime(end, 12, 31, 23, 0, tzinfo=dt.timezone.utc)
                v8_end_time = new_end  # save for comparison
                data[i] = [format_start(start), new_end.strftime("%Y-%m-%d %H:%M"), ts_id]
            elif ts_id == v9_element[2]:
                new_start = v9_element[7]  # replace the start of the v9 range with the actual data start date
                if new_start.year < start:
                    new_start = dt.datetime(start, 1, 1, 0, 0, tzinfo=dt.timezone.utc)
                if v8_end_time and new_start <= v8_end_time:
                    new_start = v8_end_time + dt.timedelta(hours=1)
                if v8_end_time and v8_end_time == dt.datetime(2012, 12, 31, 23, tzinfo=pytz.utc):
                    new_start = dt.datetime(2013, 1, 1, 0, 0, tzinfo=dt.timezone.utc)
                if new_start > v9_element[8]:
                    idelete = i
                data[i] = [new_start.strftime("%Y-%m-%d %H:%M"), format_end(end), ts_id]
            else:
                # Format all other entries
                data[i] = [format_start(start), format_end(end), ts_id]
        if idelete is not None:
            del data[idelete]
    else:
        for i, (start, end, ts_id) in enumerate(data):
            data[i] = [format_start(start), format_end(end), ts_id]
    return data
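
# Illustrative example of the plain branch (no v8/v9 special case):
#     format_timeseries([[2010, 2011, 5], [2012, 2013, 7]], None, None)
#     -> [['2010-01-01 00:00', '2011-12-31 23:00', 5], ['2012-01-01 00:00', '2013-12-31 23:00', 7]]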

def substitute_v89_entry(entries, v8_element, v9_element):
    # Replace the synthetic 'v89' entry with separate, tagged 2012 entries for v8 and v9.
    new_elements = [(2012, v8_element[2], "v8"), (2012, v9_element[2], "v9")]
    modified_entries = [(year, tsid) for year, tsid in entries if tsid != 'v89'] + new_elements
    return modified_entries

def get_merging_list(db, station_code: str, variable_id: str, daterange: str):
    query_string = f"y.*, t.order, t.provider_version, t.data_start_date, t.data_end_date FROM yearly_coverage y, timeseries t, stationmeta_core s WHERE t.id=y.timeseries_id AND s.id = y.station_id AND '{station_code}'=ANY(s.codes) AND y.variable_id={variable_id} ORDER BY year, t.order;"
    records = db.execute(select(text(query_string))).all()

    # Create fake timeseries IDs for v8 and v9 timeseries to make them distinguishable in the next steps
    new_records, v8_element, v9_element = replace_v8_v9_elements(records)
    result = select_provider(new_records)  # coverage check

    eea_ts_id = next((ts_id for year, ts_id in result if year == 2012), None)  # extract the ts_id selected for 2012 (if any)
    modified_entries = None
    if eea_ts_id == 'v89':
        # If a timeseries tagged 'v89' passed the coverage check, the next steps process the
        # split 2012 EEA data together with the general data.
        modified_entries = substitute_v89_entry(result, v8_element, v9_element)

    if modified_entries:
        merged_results = merge_sequences(modified_entries)
        ts_list = format_timeseries(merged_results, v8_element, v9_element)
    else:
        merged_results = merge_sequences(result)  # no special processing, since no split 2012 EEA data were found
        ts_list = format_timeseries(merged_results, None, None)

    if daterange:
        start_date, end_date = [dt.datetime.fromisoformat(date) for date in daterange.split(',')]
        ts_list_daterange = []
        for ts in ts_list:
            try:
                ts_start = dt.datetime.fromisoformat(ts[0])
                ts_end = dt.datetime.fromisoformat(ts[1])
            except ValueError:
                ts_start = dt.datetime.fromisoformat(f"{ts[0]}-01-01")
                ts_end = dt.datetime.fromisoformat(f"{ts[1]}-12-31")
            if (start_date <= ts_start <= end_date) or (start_date <= ts_end <= end_date) or (ts_start <= start_date and ts_end >= end_date):
                intersection_start = max(start_date, ts_start)
                intersection_end = min(end_date, ts_end)
                ts_list_daterange.append([intersection_start.isoformat(), intersection_end.isoformat(), ts[2]])
        ts_list = ts_list_daterange
    timeseries_ids = []
    if ts_list == []:
        timeseries_ids = get_all_merged_timeseries_ids(merged_results)
    return ts_list, timeseries_ids
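
# A hedged sketch of the return value (the timeseries ids are hypothetical): for a station whose
# record is covered by timeseries 123 until 2012 and by timeseries 456 afterwards,
# get_merging_list could return something like
#     ([['2005-01-01 00:00', '2012-12-31 23:00', 123],
#       ['2013-01-01 00:00', '2020-12-31 23:00', 456]], [])
# the second element is only filled (with all merged timeseries ids) when the first list is empty.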

def get_merged_data(db: Session, variable_id: int, station_code: str, path_params, query_params):
    ## still to be done: the daterange given by the user should be taken into account to only load the data that is needed
    ## do this in the first place (do not cut the data short later)
    ## --> take the daterange as a separate argument, so it is known even when it is ignored by utils.create_filter

    daterange = query_params.get("daterange", None)

    ## check whether station_code exists
    db_stationmeta = get_stationmeta(db, station_code=station_code, fields="id")
    if db_stationmeta is None:
        raise HTTPException(status_code=404, detail=f"Metadata for station '{station_code}' not found.")

    ts_list, timeseries_ids = get_merging_list(db, station_code, variable_id, daterange)
    data_merged = {'metadata': [],
                   'data': []}
    used_timeseries_ids = []
    if ts_list == []:
        # some time series do not have an entry in the yearly coverage table!
        if timeseries_ids == []:
            data_merged['metadata'] = search_all(db, path_params, query_params, endpoint='timeseries_merged')
            data_merged['data'] = []
            return data_merged
        for ts_id in timeseries_ids:
            ts_list.append(['1960-01-01', '1961-01-01', ts_id])
    for ts_part in ts_list:
        columns, metadata, data = get_data(db, ts_part[2], path_params, query_params, daterange=[ts_part[0], ts_part[1]])
        data_merged['data'] += data
        # do not show metadata twice
        if ts_part[2] not in used_timeseries_ids:
            data_merged['metadata'] += [TimeseriesWithCitation(**metadata)]
            used_timeseries_ids.append(ts_part[2])
    format = query_params.get("format", "json")
    if format == 'csv':
        content = ''
        # start with metadata
        for part_meta in data_merged['metadata']:
            content += '#' + part_meta.json(indent=4, ensure_ascii=False).replace('\n', '\n#') + '\n'
        # add header
        content += ','.join(column.name for column in columns) + '\n'
        # now the data
        content += '\n'.join(','.join(f"{getattr(curr, column.name)}" for column in columns) for curr in data_merged['data'])
        return Response(content=content, media_type="text/csv")
    else:
        return data_merged

def get_data_by_datetime_and_timeseriesid(db: Session, datetime: dt.datetime, timeseries_id: int):
    return db.query(models.Data).filter(models.Data.datetime == datetime,
                                        models.Data.timeseries_id == timeseries_id).first()


def get_all_data(db: Session, limit: int, offset: int = 0):
    return db.query(models.Data).order_by(models.Data.datetime).offset(offset).limit(limit).all()


def is_preliminary(version: str):
    # a version whose major part is '000000' marks preliminary data
    return version.split('.')[0] == '000000'
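
# Examples:
#     is_preliminary('000000.000001.00000000000000')  -> True   (major part '000000' = preliminary)
#     is_preliminary('000001.000000.00000000000000')  -> False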

def create_data_record(db: Session, engine: Engine,
                       series_id: int, datetime: dt.datetime,
                       value: float, flag: str, version: str,
                       author_id: int):
    toarqc_config_type: str = 'standard'
    timeseries = get_timeseries(db=db, timeseries_id=series_id)
    variable = get_variable(db=db, variable_id=timeseries.variable_id)
    parameter = variable.name
    data_dict = {"datetime": datetime,
                 "value": value,
                 "flags": flag,
                 "version": version,
                 "timeseries_id": series_id}
    df = pd.DataFrame([data_dict]).set_index("datetime")
    try:
        test_config = get_toarqc_config('static/../toardb/data/toarqc_config', parameter, toarqc_config_type)
    except FileNotFoundError:
        message = f'no toarqc configuration found for {parameter}, {toarqc_config_type}'
        status_code = 400
        return JSONResponse(status_code=status_code, content=message)
    result = run_toarqc(test_config, df, ok_limit=0.85, questionable_limit=0.6)
    # combine the provider flag with the toarqc flag for this single (preliminary) record
    #                             provider       toarqc          combined
    combined_flag_matrix = {(          'OK',        'OK'): 'OKPreliminaryNotChecked',
                            ('Questionable',        'OK'): 'QuestionablePreliminaryNotChecked',
                            (   'Erroneous',        'OK'): 'ErroneousPreliminaryNotChecked',
                            (          'OK', 'Erroneous'): 'ErroneousPreliminaryFlagged1',
                            ('Questionable', 'Erroneous'): 'ErroneousPreliminaryFlagged2',
                            (   'Erroneous', 'Erroneous'): 'Erroneous_Preliminary_Confirmed'}
    combined_flag = combined_flag_matrix[(flag, result['flags'].iloc[0])]
    data_dict["flags"] = get_value_from_str(toardb.toardb.DF_vocabulary, combined_flag.strip())
    data = models.Data(**data_dict)
    db.rollback()
    db.add(data)
    # adjust data_start_date, data_end_date
    datetime = datetime.replace(tzinfo=timeseries.data_end_date.tzinfo)
    if datetime < timeseries.data_start_date:
        timeseries.data_start_date = datetime
    if datetime > timeseries.data_end_date:
        timeseries.data_end_date = datetime
    db.add(timeseries)
    db.commit()
    db.refresh(data)
    # if not preliminary data: create a changelog entry
    if not is_preliminary(version):
        type_of_change = get_value_from_str(toardb.toardb.CL_vocabulary, "Created")
        description = "data record created"
        db_changelog = TimeseriesChangelog(description=description, timeseries_id=series_id, author_id=author_id,
                                           type_of_change=type_of_change, old_value='', new_value='',
                                           period_start=datetime, period_end=datetime, version=version)
        db.add(db_changelog)
        db.commit()
    status_code = 200
    message = 'Data successfully inserted.'
    return JSONResponse(status_code=status_code, content=message)

def insert_dataframe(db: Session, engine: Engine, df: pd.DataFrame, toarqc_config_type: str = 'standard',
                     dry_run: bool = False, parameter: str = 'o3', preliminary: bool = False, force: bool = False):
    # df: pandas.DataFrame
    #     index:      datetime
    #     1st column: value
    #     2nd column: flags (0: 'OK', 1: 'Questionable', 2: 'Erroneous')
    #     3rd column: timeseries_id
    #     4th column: version
    df['flags'] = df['flags'].replace([0], 'OK')
    df['flags'] = df['flags'].replace([1], 'Questionable')
    df['flags'] = df['flags'].replace([2], 'Erroneous')
    try:
        test_config = get_toarqc_config('static/../toardb/data/toarqc_config', parameter, toarqc_config_type)
    except FileNotFoundError:
        message = f'no toarqc configuration found for {parameter}, {toarqc_config_type}'
        status_code = 400
        return JSONResponse(status_code=status_code, content=message)
    # get flags and qc-statistics from toarqc
    # available keys in result (at the moment):
    #     'time_series'
    #     'test_config'
    #     'ok_limit'
    #     'questionable_limit'
    #     'metadata'
    #     'plot'
    #     'outfile'
    #     'plot_type'
    #     'results'
    #     'qc-score'
    #     'flags'
    #     'number_OK'
    #     'number_Questionable'
    #     'number_Erroneous'
    #     'percentage_OK'
    #     'percentage_Questionable'
    #     'percentage_Erroneous'
    #     'qc-score_g0'
    #     'flags_g0'
    #     'qc-score_g1'
    #     'flags_g1'
    #     'qc-score_g2'
    #     'flags_g2'
    #     'qc-score_g3'
    #     'flags_g3'
    #     'qc-score_g0_range_test'
    #     'flags_g0_range_test'
    #     'qc-score_g1_sigma_test'
    #     'flags_g1_sigma_test'
    #     'qc-score_g2_constant_value_test'
    #     'flags_g2_constant_value_test'
    #     'qc-score_g2_positive_spike_test'
    #     'flags_g2_positive_spike_test'
    #     'qc-score_g2_negative_spike_test'
    #     'flags_g2_negative_spike_test'
    #     'qc-score_g3_before_nan_test'
    #     'flags_g3_before_nan_test'
    #     'qc-score_g3_after_nan_test'
    #     'flags_g3_after_nan_test'
    # toarqc expects a pandas.Series
    # --> if it gets a pandas.DataFrame, it will take the index and the first column
    #     (other columns will be ignored)
    if toarqc_config_type != 'realtime':
        try:
            result = run_toarqc(test_config, df, ok_limit=0.85, questionable_limit=0.6)
        except ValueError as ve:
            message = {"detail": {"message": f'Aborted by automatic quality control: {ve}'}}
            status_code = 445
            return JSONResponse(status_code=status_code, content=message)
        percentage_OK = result['percentage_OK']
        if percentage_OK >= 0.9 or force:
            target_schema = 'public'
            message = {"detail": {"message": "Data successfully inserted."}}
            status_code = 200
        else:
            target_schema = 'staging'
            message = f'Aborted by automatic quality control: percentage_OK = {percentage_OK}'
            status_code = 446
        toarqc_flags = result['flags']
        # combine toarqc flags with the flags given by the provider
        # (see matrix: TOAR_UG_Vol03_Database_2021-05.docx, chapter 5.2)
        #                        preliminary    provider        toarqc           combined
        combined_flag_matrix = {(False,           'OK',           'OK'): 'OKValidatedQCPassed',
                                (False,           'OK', 'Questionable'): 'QuestionableValidatedFlagged',
                                (False,           'OK',    'Erroneous'): 'ErroneousValidatedFlagged1',
                                (False, 'Questionable',           'OK'): 'QuestionableValidatedUnconfirmed',
                                (False, 'Questionable', 'Questionable'): 'QuestionableValidatedConfirmed',
                                (False, 'Questionable',    'Erroneous'): 'ErroneousValidatedFlagged2',
                                (False,    'Erroneous',           'OK'): 'ErroneousValidatedUnconfirmed',
                                (False,    'Erroneous', 'Questionable'): 'ErroneousValidatedConfirmed',
                                (False,    'Erroneous',    'Erroneous'): 'ErroneousValidatedConfirmed',
                                (True,            'OK',           'OK'): 'OKPreliminaryQCPassed',
                                (True,            'OK', 'Questionable'): 'QuestionablePreliminaryFlagged',
                                (True,            'OK',    'Erroneous'): 'ErroneousPreliminaryFlagged1',
                                (True,  'Questionable',           'OK'): 'QuestionablePreliminaryUnconfirmed',
                                (True,  'Questionable', 'Questionable'): 'QuestionablePreliminaryConfirmed',
                                (True,  'Questionable',    'Erroneous'): 'ErroneousPreliminaryFlagged2',
                                (True,     'Erroneous',           'OK'): 'ErroneousPreliminaryUnconfirmed',
                                (True,     'Erroneous', 'Questionable'): 'ErroneousPreliminaryConfirmed',
                                (True,     'Erroneous',    'Erroneous'): 'ErroneousPreliminaryConfirmed'}
        combined_flags = [combined_flag_matrix[(preliminary, provider_flag, toarqc_flag)]
                          for provider_flag, toarqc_flag in zip(df['flags'], toarqc_flags)]
    else:
        range_test = RangeTest(**test_config[0]["range_test"])
        toarqc_flags = range_test.run(df['value'])
        toarqc_flags = toarqc_flags.replace([1], 'OK')
        toarqc_flags = toarqc_flags.replace([0], 'Erroneous')
        target_schema = 'public'
        status_code = 200
        message = {"detail": {"message": "Data successfully inserted."}}

        # combine toarqc flags with the flags given by the provider
        # (see matrix: TOAR_UG_Vol03_Database_2021-05.docx, chapter 5.2)
        # the range test is not a full QC test ==> it only returns 'OK' and 'Erroneous'
        #                             provider       toarqc          combined
        combined_flag_matrix = {(          'OK',        'OK'): 'OKPreliminaryNotChecked',
                                ('Questionable',        'OK'): 'QuestionablePreliminaryNotChecked',
                                (   'Erroneous',        'OK'): 'ErroneousPreliminaryNotChecked',
                                (          'OK', 'Erroneous'): 'ErroneousPreliminaryFlagged1',
                                ('Questionable', 'Erroneous'): 'ErroneousPreliminaryFlagged2',
                                (   'Erroneous', 'Erroneous'): 'Erroneous_Preliminary_Confirmed'}
        combined_flags = [combined_flag_matrix[(provider_flag, toarqc_flag)]
                          for provider_flag, toarqc_flag in zip(df['flags'], toarqc_flags)]
    # exchange the combined flags in the dataframe
    flag_num = [get_value_from_str(toardb.toardb.DF_vocabulary, flag.strip()) for flag in combined_flags]
    del df['flags']
    df.insert(1, 'flags', flag_num)
    if dry_run:
        return JSONResponse(status_code=status_code, content=df.to_json(orient='table', date_format='iso'))
    else:
        buf = StringIO()
        df.to_csv(buf, header=False)
        buf.seek(0)
        with closing(engine.raw_connection()) as fake_conn:
            fake_cur = fake_conn.cursor()
            try:
                fake_cur.copy_expert(f"COPY {target_schema}.data (datetime, value, flags, timeseries_id, version) FROM STDIN WITH CSV DELIMITER ',';", buf)
                fake_conn.commit()
            except Exception as e:
                message = f"An error occurred in {target_schema}.data insertion: {e}"
                status_code = 400
            fake_cur.close()
        return JSONResponse(status_code=status_code, content=message)
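
# A minimal, hedged sketch of building a dataframe in the shape insert_dataframe expects
# (the db session, the engine and the timeseries id 42 are placeholders, not real objects):
#     df = pd.DataFrame(
#         {"datetime": pd.to_datetime(["2021-06-01 00:00", "2021-06-01 01:00"], utc=True),
#          "value": [23.4, 25.1],
#          "flags": [0, 0],                                  # 0: OK, 1: Questionable, 2: Erroneous
#          "timeseries_id": [42, 42],
#          "version": ["000001.000000.00000000000000"] * 2},
#     ).set_index("datetime")
#     insert_dataframe(db, engine, df, parameter="o3", dry_run=True)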

def create_data(db: Session, engine: Engine, author_id: int, input_handle: UploadFile = File(...),
                toarqc_config_type: str = 'standard', dry_run: bool = False, force: bool = False):
    # a timeseries is defined by the unique_constraint of (station_id, variable_id, ...)
    # station_id:  from the file header
    # variable_id: from the database (looked up via the variable name taken from the filename)
    # get variable_name from the filename
    variable_name = input_handle.filename.split('_')[0]
    variable = db.query(variables_models.Variable).filter(variables_models.Variable.name == variable_name).first()
    variable_id = variable.id
    # get header information (station_id, contributor, timeshift_from_utc)
    line = '#bla'
    f = input_handle.file
    prev = pos = 0
    while line[0] == '#':
        line = f.readline().decode('utf-8')
        key = line.split(':')[0].lower().strip()
        if key == "#station_id":
            station_id = line.split(':')[1]
        if key == "#timeshift_from_utc":
            timeoffset = dt.timedelta(hours=float(line.split(':')[1]))
        if key == "#dataset_contributor_organisation_longname":
            contributor = line.split(':')[1].strip()
        if key == "#dataset_pi_organisation_longname":
            contributor = line.split(':')[1].strip()
        if key == "#dataset_resourceprovider_organisation_longname":
            contributor = line.split(':')[1].strip()
        prev, pos = pos, f.tell()
    f.seek(prev)
    station_code = station_id.strip()
    stationmeta_core = get_stationmeta(db=db, station_code=station_code, fields="id")
    station_id = stationmeta_core["id"]
    timeseries = get_timeseries_by_unique_constraints(db=db, station_id=station_id, variable_id=variable_id, resource_provider=contributor)
    # again problems with converted coordinates!
    db.rollback()
    version = '000001.000000.00000000000000'
    if timeseries:
        timeseries_id = timeseries.id
        # open SpooledTemporaryFile, skip header (and also try to insert timeseries_id!)
        # python3.8: bug in SpooledTemporaryFile --> https://bugs.python.org/issue26175
        #            --> https://github.com/fedspendingtransparency/usaspending-api/pull/2963
        # ==> I still have to check for a workaround!
        df = pd.read_csv(input_handle.file, comment='#', header=None, sep=';',
                         names=["time", "value", "flags"], parse_dates=["time"], index_col="time")
        # subtract the timeshift to convert the data to UTC
        df.index = df.index - timeoffset
        # now insert the timeseries_id at the end of the data frame
        df.insert(2, 'timeseries_id', timeseries_id)
        # also insert the version
        df.insert(3, 'version', version)
        # datetime needs timezone information
        df = df.tz_localize('UTC')
        result = insert_dataframe(db, engine, df, toarqc_config_type=toarqc_config_type, parameter=variable_name, dry_run=dry_run, force=force)
        # adjust data_start_date, data_end_date
        if result.status_code == 200:
            data_start_date = min(df.index)
            data_end_date = max(df.index)
            if data_start_date < timeseries.data_start_date:
                timeseries.data_start_date = data_start_date
            if data_end_date > timeseries.data_end_date:
                timeseries.data_end_date = data_end_date
            db.add(timeseries)
            db.commit()
        return result
    else:
        message = f'Timeseries not found for station {station_code.strip()}, variable {variable_name}'
        status_code = 400
        return JSONResponse(status_code=status_code, content=message)
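
# A hedged sketch of the upload format create_data expects (all values below are invented):
# the file is named '<variable>_*' (e.g. 'o3_mystation.csv'), starts with '#'-prefixed header
# lines and continues with ';'-separated rows of time, value and provider flag:
#     #station_id: ABC123
#     #timeshift_from_utc: 1.0
#     #dataset_contributor_organisation_longname: Some Institute
#     2021-06-01 00:00;23.4;0
#     2021-06-01 01:00;25.1;0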

def create_bulk_data(db: Session, engine: Engine, bulk: List[schemas.DataCreate], author_id: int,
                     toarqc_config_type: str = 'standard', dry_run: bool = False, force: bool = False):
    df = pd.DataFrame([x.dict() for x in bulk]).set_index("datetime")
    # bulk data: to be able to do at least a range test, all data needs to belong to the same parameter;
    # the variable_name is therefore determined from the first entry of the dataframe
    timeseries = get_timeseries(db=db, timeseries_id=int(df['timeseries_id'].iloc[0]))
    # again problems with converted coordinates!
    db.rollback()
    variable = get_variable(db=db, variable_id=timeseries.variable_id)
    variable_name = variable.name
    insert_result = insert_dataframe(db, engine, df, toarqc_config_type=toarqc_config_type, parameter=variable_name, dry_run=dry_run, force=force)
    # adjust data_start_date, data_end_date
    if insert_result.status_code == 200:
        data_start_date = min(df.index)
        data_end_date = max(df.index)
        if data_start_date < timeseries.data_start_date:
            timeseries.data_start_date = data_start_date
        if data_end_date > timeseries.data_end_date:
            timeseries.data_end_date = data_end_date
        db.add(timeseries)
        db.commit()
    return insert_result

def patch_data(db: Session, engine: Engine, description: str, version: str, toarqc_config_type: str,
               force: bool, author_id: int, input_handle: UploadFile = File(...)):
    # a timeseries is defined by the unique_constraint of (station_id, variable_id, ...)
    # station_id:  from the file header
    # variable_id: from the database (looked up via the variable name taken from the filename)
    # get variable_name from the filename

    # the version label has to be unique for this timeseries ==> still to be checked!

    variable_name = input_handle.filename.split('_')[0]
    variable = db.query(variables_models.Variable).filter(variables_models.Variable.name == variable_name).first()
    variable_id = variable.id
    # get header information (station_id, contributor, timeshift_from_utc)
    line = '#bla'
    f = input_handle.file
    prev = pos = 0
    while line[0] == '#':
        line = f.readline().decode('utf-8')
        key = line.split(':')[0].lower().strip()
        if key == "#station_id":
            station_id = line.split(':')[1]
        if key == "#timeshift_from_utc":
            timeoffset = dt.timedelta(hours=float(line.split(':')[1]))
        if key == "#dataset_contributor_organisation_longname":
            contributor = line.split(':')[1].strip()
        if key == "#dataset_pi_organisation_longname":
            contributor = line.split(':')[1].strip()
        if key == "#dataset_resourceprovider_organisation_longname":
            contributor = line.split(':')[1].strip()
        prev, pos = pos, f.tell()
    f.seek(prev)
    station_code = station_id.strip()
    stationmeta_core = get_stationmeta(db=db, station_code=station_code, fields="id")
    station_id = stationmeta_core["id"]
    timeseries = get_timeseries_by_unique_constraints(db=db, station_id=station_id, variable_id=variable_id, resource_provider=contributor)
    # again problems with converted coordinates!
    db.rollback()
    if timeseries:
        timeseries_id = timeseries.id
        # open SpooledTemporaryFile, skip header (and also try to insert timeseries_id!)
        df = pd.read_csv(input_handle.file, comment='#', header=None, sep=';',
                         names=["time", "value", "flags"], parse_dates=["time"], index_col="time")
        # subtract the timeshift to convert the data to UTC
        df.index = df.index - timeoffset
        # now insert the timeseries_id at the end of the data frame
        df.insert(2, 'timeseries_id', timeseries_id)
        # also insert the version
        df.insert(3, 'version', version)
        # datetime needs timezone information
        df = df.tz_localize('UTC')
        # determine period_start and period_end of the data
        period_start = min(df.index)
        period_end = max(df.index)
        # move data from this period to data_archive
        db.execute(insert(models.DataArchive).from_select((models.Data.datetime, models.Data.value, models.Data.flags,
                                                           models.Data.version, models.Data.timeseries_id),
                                                          select([models.Data]).where(
                                                              and_(and_(models.Data.timeseries_id == timeseries_id,
                                                                        models.Data.datetime >= period_start),
                                                                   models.Data.datetime <= period_end))))
        db.execute(delete(models.Data).where(
                       and_(and_(models.Data.timeseries_id == timeseries_id,
                                 models.Data.datetime >= period_start),
                            models.Data.datetime <= period_end)))
        db.commit()
        # now insert the new data for this period from the file
        buf = StringIO()
        df.to_csv(buf, header=False)
        buf.seek(0)
        with closing(engine.raw_connection()) as fake_conn:
            fake_cur = fake_conn.cursor()
            try:
                fake_cur.copy_from(buf, 'data', sep=',', columns=('datetime', 'value', 'flags', 'timeseries_id', 'version'))
                fake_conn.commit()
                # adjust data_start_date, data_end_date
                if period_start < timeseries.data_start_date:
                    timeseries.data_start_date = period_start
                if period_end > timeseries.data_end_date:
                    timeseries.data_end_date = period_end
                db.add(timeseries)
                db.commit()
                message = {"detail": {"message": "Data successfully inserted."}}
                status_code = 200
            except Exception as e:
                message = {"detail": {"message": f"An error occurred in data insertion: {e}"}}
                status_code = 400
                return JSONResponse(status_code=status_code, content=message)
            fake_cur.close()
        # create a changelog entry
        # how to determine type_of_change?
        #     4 – unspecified data value corrections (this also holds if there is only a single value to be
        #         corrected; the addition "unspecified" keeps all possibilities open to add "specified"
        #         corrections later, e.g. from QC)
        #     5 – replaced data with a new version
        type_of_change = get_value_from_str(toardb.toardb.CL_vocabulary, "UnspecifiedData")
        db_changelog = TimeseriesChangelog(description=description, timeseries_id=timeseries_id, author_id=author_id,
                                           type_of_change=type_of_change, old_value="", new_value="",
                                           period_start=period_start, period_end=period_end, version=version)
        db.add(db_changelog)
        db.commit()
        return JSONResponse(status_code=status_code, content=message)
    else:
        message = f'Timeseries not found for station {station_code.strip()}, variable {variable_name}'
        status_code = 400
        return JSONResponse(status_code=status_code, content=message)

def patch_bulk_data(db: Session, engine: Engine, description: str, version: str, bulk: List[schemas.DataPatch], author_id: int,
                    toarqc_config_type: str = 'standard', force: bool = False, no_archive: bool = False):
    df = pd.DataFrame([x.dict() for x in bulk]).set_index("datetime")
    # bulk data: to be able to do at least a range test, all data needs to belong to the same parameter;
    # the variable_name is therefore determined from the first entry of the dataframe
    timeseries_id = int(df['timeseries_id'].iloc[0])
    timeseries = get_timeseries(db=db, timeseries_id=timeseries_id)
    variable = get_variable(db=db, variable_id=timeseries.variable_id)
    variable_name = variable.name
    timeseries_list = pd.unique(df['timeseries_id'])
    # next command just to avoid problems with converted coordinates
    db.rollback()
    for t_id in timeseries_list:
        df2 = df[df['timeseries_id'] == t_id]
        timeseries_id = int(t_id)
        # determine period_start and period_end of the data
        period_start = min(df2.index)
        period_end = max(df2.index)
        # move data from this period to data_archive
        if not no_archive:
            db.execute(insert(models.DataArchive).from_select((models.Data.datetime, models.Data.value, models.Data.flags,
                                                               models.Data.version, models.Data.timeseries_id),
                                                              select([models.Data]).where(
                                                                  and_(and_(models.Data.timeseries_id == timeseries_id,
                                                                            models.Data.datetime >= period_start),
                                                                       models.Data.datetime <= period_end))))
        # delete all data in the period that is patched
        db.execute(delete(models.Data).where(
                       and_(and_(models.Data.timeseries_id == timeseries_id,
                                 models.Data.datetime >= period_start),
                            models.Data.datetime <= period_end)))
        db.commit()
        # adjust data_start_date, data_end_date
        timeseries = get_timeseries(db=db, timeseries_id=timeseries_id)
        # next command just to avoid problems with converted coordinates
        db.rollback()
        if period_start < timeseries.data_start_date:
            timeseries.data_start_date = period_start
        if period_end > timeseries.data_end_date:
            timeseries.data_end_date = period_end
        db.add(timeseries)
        db.commit()
    result = insert_dataframe(db, engine, df, toarqc_config_type=toarqc_config_type, parameter=variable_name, force=force)
    if not no_archive:
        # create a changelog entry
        # how to determine type_of_change?
        #     4 – unspecified data value corrections (this also holds if there is only a single value to be
        #         corrected; the addition "unspecified" keeps all possibilities open to add "specified"
        #         corrections later, e.g. from QC)
        #     5 – replaced data with a new version
        type_of_change = get_value_from_str(toardb.toardb.CL_vocabulary, "Replaced")
        db_changelog = TimeseriesChangelog(description=description, timeseries_id=timeseries_id, author_id=author_id,
                                           type_of_change=type_of_change, old_value="", new_value="",
                                           period_start=period_start, period_end=period_end, version=version)
        db.add(db_changelog)
        db.commit()
    return result