Coverage for toardb/data/crud.py: 80%

615 statements  

coverage.py v7.11.0, created at 2025-11-03 20:32 +0000

# SPDX-FileCopyrightText: 2021 Forschungszentrum Jülich GmbH
# SPDX-License-Identifier: MIT

"""
Create, Read, Update, Delete functionality

"""

import sys
from io import StringIO
from contextlib import closing
import requests
import json
from collections import defaultdict
import os

from sqlalchemy import insert, delete, select, and_, text, func
from sqlalchemy.orm import Session
from sqlalchemy.engine import Engine
from geoalchemy2.elements import WKBElement, WKTElement
from fastapi import File, UploadFile
from typing import List
from fastapi import HTTPException
from fastapi.responses import JSONResponse, Response
import datetime as dt
import pytz
import pandas as pd
import csv

from . import models, schemas
from toardb.variables import models as variables_models
from toardb.variables.crud import get_variable
from toardb.stationmeta import models as stationmeta_models
from toardb.stationmeta.crud import get_stationmeta
from toardb.timeseries.models import TimeseriesChangelog
from toardb.timeseries.schemas import TimeseriesWithCitation
from toardb.timeseries.crud import get_timeseries_by_unique_constraints, get_timeseries, get_citation, search_all
from toardb.utils.utils import get_value_from_str, get_str_from_value, create_filter
import toardb
from toarqc import get_toarqc_config, run_toarqc
from toarqc.tests import RangeTest

def create_filter_from_aggregated_flags(i: int, filter_string: str, agg_flag_num: int):
    # definition of aggregated_flags:
    # agg. flag | composition of unique flags
    # ----------|----------------------------
    #    100    | 0-6
    #    101    | 0-2
    #    102    | 3-5
    #    103    | 0, 1, 3, 4
    #    104    | 2, 5, 6
    #    110    | 10-16
    #    111    | 10-12
    #    112    | 13-16
    #    120    | 20-28
    #    121    | 20-23
    #    122    | 24-28
    #    130    | 10-28
    #    131    | 10-12, 20-23
    #    132    | 13-16, 24-28
    #    140    | 7, 16, 28
    filter_dict = { 100: [ 0, 1, 2, 3, 4, 5, 6],
                    101: [ 0, 1, 2],
                    102: [ 3, 4, 5],
                    103: [ 0, 1, 3, 4],
                    104: [ 2, 5, 6],
                    110: [10, 11, 12, 13, 14, 15, 16],
                    111: [10, 11, 12],
                    112: [13, 14, 15, 16],
                    120: [20, 21, 22, 23, 24, 25, 26, 27, 28],
                    121: [20, 21, 22, 23],
                    122: [24, 25, 26, 27, 28],
                    130: [10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
                          20, 21, 22, 23, 24, 25, 26, 27, 28],
                    131: [10, 11, 12,
                          20, 21, 22, 23],
                    132: [13, 14, 15, 16, 24, 25, 26, 27, 28],
                    140: [ 7, 16, 28] }
    for flag_num in filter_dict[agg_flag_num]:
        if i == 0:
            filter_string = f"(data.flags = {flag_num})"
        else:
            filter_string = filter_string + f" OR (data.flags = {flag_num})"
        i += 1
    return i, filter_string
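
# Example (illustrative; verified against the mapping above): for aggregated flag 101,
#   create_filter_from_aggregated_flags(0, '', 101)
# returns
#   (3, "(data.flags = 0) OR (data.flags = 1) OR (data.flags = 2)")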


def create_filter_from_flags(flags: str):
    i = 0
    filter_string = ''
    for flag in flags.split(','):
        flag_num = get_value_from_str(toardb.toardb.DF_vocabulary, flag)
        # check whether an aggregated flag was given
        if flag_num >= 100:
            i, filter_string = create_filter_from_aggregated_flags(i, filter_string, flag_num)
        else:
            if i == 0:
                filter_string = f"(data.flags = {flag_num})"
            else:
                filter_string = filter_string + f" OR (data.flags = {flag_num})"
            i += 1
    return f"({filter_string})"
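
# Example (illustrative; the numeric flag values depend on the controlled vocabulary,
# here assuming DF_vocabulary maps 'OK' -> 0 and 'Questionable' -> 1):
#   create_filter_from_flags("OK,Questionable")
#   --> "((data.flags = 0) OR (data.flags = 1))"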


def get_data(db: Session, timeseries_id: int, path_params, query_params, daterange = None):

    # BUT for some unknown reasons, get_timeseries does not return variable and programme!
    def get_timeseries_meta(timeseries_id, lmerge=False):
        record = get_timeseries(db, timeseries_id=timeseries_id)
        # for some unknown reasons, variable, changelog, and programme have to be 'loaded' into the structure?!
        if record:
            dummy = record.variable
            dummy = record.programme
            dummy = record.changelog
            attribution, citation, license_txt = get_citation(db, timeseries_id=timeseries_id).values()
            record_dict = record.__dict__
            record_dict['license'] = license_txt
            record_dict['citation'] = citation
            if attribution is not None:
                record_dict['attribution'] = attribution
            if lmerge:
                return record_dict
            else:
                return TimeseriesWithCitation(**record_dict)
        else:
            return None

    try:
        # translation of flags should be done here!
        lmerge = False
        flags = ",".join([item.strip() for v in query_params.getlist("flags") for item in v.split(',')])
        if not flags:
            flags = None
        station_code = query_params.get("station_code", None)
        if station_code:
            limit, offset, fields, format, filters = create_filter(query_params, "timeseries_merged")
            lmerge = True
        else:
            limit, offset, fields, format, filters = create_filter(query_params, "data")
        d_filter = filters["d_filter"]
        if lmerge:
            try:
                start_date = dt.datetime.fromisoformat(daterange[0])
                stop_date = dt.datetime.fromisoformat(daterange[1])
            except ValueError:
                # only years were given
                start_date = dt.datetime.fromisoformat(f"{daterange[0]}-01-01")
                stop_date = dt.datetime.fromisoformat(f"{daterange[1]}-12-31")
            d_filter += f"datetime BETWEEN '{start_date}' AND '{stop_date}'"
        fields_list = []
        if fields:
            fields_list = fields.split(',')
        columns = ( [getattr(models.Data, field) for field in fields_list]
                    if fields_list
                    else list(models.Data.__table__.columns) )
    except KeyError as e:
        status_code = 400
        return JSONResponse(status_code=status_code, content=str(e))
    if flags:
        filter_string = create_filter_from_flags(flags)
        data = db.query(*columns).filter(models.Data.timeseries_id == timeseries_id). \
                   filter(text(filter_string)). \
                   filter(text(d_filter)). \
                   order_by(models.Data.datetime).all()
    else:
        data = db.query(*columns).filter(models.Data.timeseries_id == timeseries_id). \
                   filter(text(d_filter)). \
                   order_by(models.Data.datetime).all()
    # get advantages from pydantic, but without having another call of the REST API
    # (especially needed for testing with pytest!)
    metadata = get_timeseries_meta(timeseries_id, lmerge)
# TOAR day 2022-08-09: We want the timezone information for the requested data
# metadata['timezone'] = ...
    if not metadata:
        return None
    if format == 'json':
        if lmerge:
            return columns, metadata, data
        else:
            composite = schemas.Composite(metadata=metadata, data=data)
            return composite
    elif format == 'csv':
        if not lmerge:
            # start with metadata
            content = '#' + metadata.json(indent=4, ensure_ascii=False).replace('\n', '\n#') + '\n'
            # add header
            content += ','.join(column.name for column in columns) + '\n'
            # now the data
            content += '\n'.join(','.join(f"{getattr(curr, column.name)}" for column in columns) for curr in data)
            return Response(content=content, media_type="text/csv")
        # take the response for csv from timeseries_merged
        else:
            return columns, metadata, data
    else:
        status_code = 400
        message = 'Invalid format!'
        return JSONResponse(status_code=status_code, content=message)


def get_next_version(db: Session, timeseries_id: int, path_params, query_params):
    data_version = db.query(func.max(models.Data.version)). \
                      filter(models.Data.timeseries_id == timeseries_id). \
                      distinct().first()
    if is_preliminary(data_version[0]):
        status_code = 407
        message = 'Preliminary data has no next version!'
        return JSONResponse(status_code=status_code, content=message)
    else:
        splitted = data_version[0].split('.')
        lmajor = query_params.get("major", "False").lower() == "true"
        major = int(splitted[0])
        if lmajor:
            major += 1
            minor = 0
        else:
            minor = int(splitted[1]) + 1
        return f"{major:06d}.{minor:06d}.00000000000000"
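
# Example (illustrative): with a stored maximum version of '000001.000002.00000000000000',
# a minor bump yields '000001.000003.00000000000000', while major=true yields
# '000002.000000.00000000000000'.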


def get_map_data(db: Session, variable_id, daterange):
    daterange = daterange.split(',')
    query_string = f"timeseries_id, value FROM timeseries t, data d WHERE d.timeseries_id=t.id AND t.variable_id={variable_id} AND d.datetime BETWEEN '{daterange[0]}' AND '{daterange[1]}'"
    data = db.execute(select(text(query_string))).all()
    return data


def get_data_with_staging(db: Session, timeseries_id: int, flags: str, format: str):

    # BUT for some unknown reasons, get_timeseries does not return variable and programme!
    def get_timeseries_meta(timeseries_id):
        record = get_timeseries(db, timeseries_id=timeseries_id)
        # for some unknown reasons, variable, changelog, and programme have to be 'loaded' into the structure?!
        if record:
            dummy = record.variable
            dummy = record.programme
            dummy = record.changelog
            attribution, citation, license_txt = get_citation(db, timeseries_id=timeseries_id).values()
            record_dict = record.__dict__
            record_dict['citation'] = citation
            record_dict['license'] = license_txt
            if attribution is not None:
                record_dict['attribution'] = attribution
            return TimeseriesWithCitation(**record_dict)
        else:
            return None

    if flags:
        filter_string = create_filter_from_flags(flags)
        data = db.query(models.Data).filter(models.Data.timeseries_id == timeseries_id).filter(text(filter_string)).order_by(models.Data.datetime).all()
    else:
        query_txt = f"SELECT distinct ON (datetime) * \
                      FROM ( SELECT *,1 AS db FROM staging.data WHERE timeseries_id={timeseries_id} \
                             UNION \
                             SELECT *,2 AS db FROM public.data WHERE timeseries_id={timeseries_id}) AS mix \
                      ORDER BY datetime, db"
        data = db.query(models.Data).from_statement(text(query_txt)).all()

    # get advantages from pydantic, but without having another call of the REST API
    # (especially needed for testing with pytest!)
    metadata = get_timeseries_meta(timeseries_id)
# TOAR day 2022-08-09: We want the timezone information for the requested data
# metadata['timezone'] = ...
    if not metadata:
        return None
    if format == 'json':
        composite = schemas.Composite(metadata=metadata, data=data)
        return composite
    elif format == 'csv':
        # start with metadata
        content = '#' + metadata.json(indent=4, ensure_ascii=False).replace('\n', '\n#') + '\n'
        # add header
        content += ','.join(column.name for column in models.Data.__mapper__.columns) + '\n'
        # now the data
        content += '\n'.join(','.join(f"{getattr(curr, column.name)}" for column in models.Data.__mapper__.columns) for curr in data)
        return Response(content=content, media_type="text/csv")
    else:
        status_code = 400
        message = 'Invalid format!'
        return JSONResponse(status_code=status_code, content=message)


def get_all_merged_timeseries_ids(data):
    ts_ids = []
    for _, _, ts_id in data:
        if ts_id not in ts_ids:
            ts_ids.append(ts_id)
    return ts_ids

def replace_v8_v9_elements(elements):
    # Initialize variables to store the elements with 'v8' and 'v9'
    v8_elements = []
    v9_elements = []

    # Iterate through the list of elements
    for i, element in enumerate(elements):
        # Check if the fourth element is 2012
        if element[3] == 2012:
            # Check if the seventh element is 'v8' or 'v9' -- there may be more than one element of v8 and v9
            if element[6] == 'v8':
                v8_elements.append(element)
            elif element[6] == 'v9':
                v9_elements.append(element)

    # Check if both 'v8' and 'v9' elements were found
    if v8_elements != [] and v9_elements != []:
        # Create a new element with the third item substituted with 'v89';
        # use the elements with the highest coverages

        v8_idx = max(range(len(v8_elements)), key=lambda i: v8_elements[i][4])
        v8_element = v8_elements[v8_idx]
        v9_idx = max(range(len(v9_elements)), key=lambda i: v9_elements[i][4])
        v9_element = v9_elements[v9_idx]
        new_element = (v8_element[0], v8_element[1], 'v89', v8_element[3], v8_element[4], v8_element[5], v8_element[6], v8_element[7], v8_element[8])

        # To edit: if there are other elements of the sequential years, edit their timeseries id to 'v89'
        # Add the new element to the list
        index_v8_element = elements.index(v8_element)
        elements[index_v8_element] = new_element
        # Remove the 'v8' and 'v9' elements from the list
        for i in range(len(elements) - 1, -1, -1):
            if elements[i][3] == 2012 and elements[i][6] in ['v8', 'v9'] and elements[i][2] != 'v89':
                del elements[i]

        return elements, v8_element, v9_element
    else:
        return elements, None, None
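
# Example (illustrative; rows are shaped like
# (_, _, ts_id, year, coverage, order, version, start, end)): two 2012 rows with
# version 'v8' (ts_id 101) and 'v9' (ts_id 102) are collapsed into a single row
# whose ts_id is the pseudo id 'v89' (metadata taken from the v8 row with the
# highest coverage); the remaining 2012 v8/v9 rows are removed.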


def select_provider(data):
    # Organize data by year. Extract the timeseries entry from the provider with the highest rank.
    # Input data example (truncated):
    #   [(46, 5, 18763, 1991, 0.3442922374429223, 1, 'N/A', dt.datetime(1993, 1, 1, 2, 0, tzinfo=dt.timezone.utc), dt.datetime(2025, 3, 5, 23, 0, tzinfo=dt.timezone.utc)),
    #    (46, 5, 18763, 1992, 0.345, 1, 'N/A', dt.datetime(1993, 1, 1, 2, 0, tzinfo=dt.timezone.utc), dt.datetime(2025, 3, 5, 23, 0, tzinfo=dt.timezone.utc)),
    #    (46, 5, 18763, 1993, 0.902054794520548, 1, 'N/A', dt.datetime(1993, 1, 1, 2, 0, tzinfo=dt.timezone.utc), dt.datetime(2025, 3, 5, 23, 0, tzinfo=dt.timezone.utc)),
    #    (46, 5, 18764, 1994, 0.56, 1, 'N/A', dt.datetime(2008, 1, 1, 1, 0, tzinfo=dt.timezone.utc), ...]
    year_data = defaultdict(list)
    for _, _, ts_id, year, coverage, order, version, start_date, end_date in data:
        year_data[year].append((ts_id, coverage, order, version, start_date, end_date))

    selected = []
    # Process each year
    for year, entries in year_data.items():
        # Sort by provider order (ascending, since lower is more relevant)
        entries.sort(key=lambda x: x[2])

        best_ts_id, best_coverage = entries[0][0], entries[0][1]

        if best_coverage >= 0.75:
            selected.append((year, best_ts_id))
            continue

        for ts_id, coverage, *_ in entries[1:]:
            # do not mislead the user with data of lowest quality, even if the preferred provider has no data at all
            if coverage < 0.2 and best_coverage == 0.:
                continue
            if coverage >= best_coverage * 1.2:
                best_ts_id, best_coverage = ts_id, coverage

            if best_coverage >= 0.75:
                break
        selected.append((year, best_ts_id))
    return selected
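
# Example (illustrative): for 1991 with the candidate entries
#   (ts_id 18763, coverage 0.34, order 1) and (ts_id 20000, coverage 0.50, order 2),
# the better-ranked provider is tried first, but since 0.50 >= 1.2 * 0.34 the
# second provider wins and (1991, 20000) is selected.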

def ensure_v8_before_v9_and_strip_tags(entries):
    # Handle the case when v9's TS ID is smaller than v8's TS ID:
    # v9 must always be located after v8, whatever TS IDs they contain.
    v8_index = None
    v9_index = None

    # Find the indices of v8 and v9
    for i, entry in enumerate(entries):
        if len(entry) == 3:
            if entry[2] == 'v8':
                v8_index = i
            elif entry[2] == 'v9':
                v9_index = i

    # Swap if v9 comes before v8
    if v9_index is not None and v8_index is not None and v9_index < v8_index:
        entries[v8_index], entries[v9_index] = entries[v9_index], entries[v8_index]

    # Strip the third element if present
    cleaned_entries = [(entry[0], entry[1]) if len(entry) == 3 else entry for entry in entries]

    return cleaned_entries
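
# Example (illustrative):
#   ensure_v8_before_v9_and_strip_tags([(2012, 102, 'v9'), (2012, 101, 'v8')])
#   --> [(2012, 101), (2012, 102)]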

def merge_sequences(selected):
    # Merge entries of the same timeseries id from sequential years together.
    merged = []
    if not selected:
        return merged

    selected.sort()
    eeasorted_selected = ensure_v8_before_v9_and_strip_tags(selected)

    start_year, ts_id = eeasorted_selected[0]
    end_year = start_year

    for year, current_ts_id in eeasorted_selected[1:]:
        if current_ts_id != ts_id:
            merged.append([start_year, end_year, ts_id])
            start_year = year
            end_year = start_year
            ts_id = current_ts_id
        else:
            end_year = year
    merged.append([start_year, end_year, ts_id])
    return merged
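
# Example (illustrative):
#   merge_sequences([(1991, 5), (1992, 5), (1993, 7)])
#   --> [[1991, 1992, 5], [1993, 1993, 7]]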

def format_timeseries(data, v8_element, v9_element):
    # Format the [start_year, end_year, ts_id] entries from the previous step into date strings.
    def format_start(start_year):
        formatted_date = f"{start_year}-01-01 00:00"
        return formatted_date

    def format_end(end_year):
        formatted_date = f"{end_year}-12-31 23:00"
        return formatted_date

    v8_end_time = None

    if v8_element and v9_element:
        # Treat the case when v8_element and v9_element exist.
        idelete = None
        for i, (start, end, ts_id) in enumerate(data):
            if ts_id == v8_element[2]:
                new_end = v8_element[8]  # Replace second element
                if new_end.year > end:
                    new_end = dt.datetime(end, 12, 31, 23, 0, tzinfo=dt.timezone.utc)
                v8_end_time = new_end  # Save for comparison
                data[i] = [format_start(start), new_end.strftime("%Y-%m-%d %H:%M"), ts_id]
            elif ts_id == v9_element[2]:
                new_start = v9_element[7]
                if new_start.year < start:
                    new_start = dt.datetime(start, 1, 1, 0, 0, tzinfo=dt.timezone.utc)
                if v8_end_time and new_start <= v8_end_time:
                    new_start = v8_end_time + dt.timedelta(hours=1)
                if v8_end_time and v8_end_time == dt.datetime(2012, 12, 31, 23, tzinfo=pytz.utc):
                    new_start = dt.datetime(2013, 1, 1, 0, 0, tzinfo=dt.timezone.utc)
                if new_start > v9_element[8]:
                    idelete = i
                data[i] = [new_start.strftime("%Y-%m-%d %H:%M"), format_end(end), ts_id]  # Replace first element
            else:
                # Format other dates
                data[i] = [format_start(start), format_end(end), ts_id]
        if idelete is not None:
            del data[idelete]
    else:
        for i, (start, end, ts_id) in enumerate(data):
            data[i] = [format_start(start), format_end(end), ts_id]
    return data
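
# Example (illustrative, without a v8/v9 pair): the merge_sequences output
#   [[1991, 1992, 5]]
# is formatted to
#   [['1991-01-01 00:00', '1992-12-31 23:00', 5]]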

def substitute_v89_entry(entries, v8_element, v9_element):
    new_elements = [(2012, v8_element[2], "v8"), (2012, v9_element[2], "v9")]
    modified_entries = [(year, tsid) for year, tsid in entries if tsid != 'v89'] + new_elements
    return modified_entries
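
# Example (illustrative, with v8_element/v9_element carrying ts_ids 101 and 102):
#   substitute_v89_entry([(2011, 101), (2012, 'v89')], v8_element, v9_element)
#   --> [(2011, 101), (2012, 101, 'v8'), (2012, 102, 'v9')]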

def get_merging_list(db, station_code: str, variable_id: str, daterange: str):

    query_string = f"y.*, t.order, t.provider_version, t.data_start_date, t.data_end_date FROM yearly_coverage y, timeseries t, stationmeta_core s WHERE t.id=y.timeseries_id AND s.id = y.station_id AND '{station_code}'=ANY(s.codes) AND y.variable_id={variable_id} ORDER BY year, t.order;"
    records = db.execute(select(text(query_string))).all()

    # Create fake timeseries IDs for the v8 and v9 timeseries to make them distinguishable in the next steps
    new_records, v8_element, v9_element = replace_v8_v9_elements(records)
    result = select_provider(new_records)  # Coverage check

    # Extract the ts_id of any time series of 2012
    eea_ts_id = next((ts_id for year, ts_id in result if year == 2012), None)
    modified_entries = None
    if eea_ts_id == 'v89':
        # a timeseries tagged 'v89' has passed the coverage check; the next steps will
        # process the split 2012 EEA data along with the general data
        modified_entries = substitute_v89_entry(result, v8_element, v9_element)

    if modified_entries:
        merged_results = merge_sequences(modified_entries)
        ts_list = format_timeseries(merged_results, v8_element, v9_element)
    else:
        # no special processing, since no split 2012 EEA data were found
        merged_results = merge_sequences(result)
        ts_list = format_timeseries(merged_results, None, None)

    if daterange:
        start_date, end_date = [dt.datetime.fromisoformat(date) for date in daterange.split(',')]
        ts_list_daterange = []
        for ts in ts_list:
            try:
                ts_start = dt.datetime.fromisoformat(ts[0])
                ts_end = dt.datetime.fromisoformat(ts[1])
            except (ValueError, TypeError):
                # only years were given
                ts_start = dt.datetime.fromisoformat(f"{ts[0]}-01-01")
                ts_end = dt.datetime.fromisoformat(f"{ts[1]}-12-31")
            if (start_date <= ts_start <= end_date) or (start_date <= ts_end <= end_date) or (ts_start <= start_date and ts_end >= end_date):
                intersection_start = max(start_date, ts_start)
                intersection_end = min(end_date, ts_end)
                ts_list_daterange.append([intersection_start.isoformat(), intersection_end.isoformat(), ts[2]])
        ts_list = ts_list_daterange
    timeseries_ids = []
    if ts_list == []:
        timeseries_ids = get_all_merged_timeseries_ids(merged_results)
    return ts_list, timeseries_ids


def get_merged_data(db: Session, variable_id: int, station_code: str, path_params, query_params):

    ## still to be done: the daterange given by the user should be taken into account to only load the data that is needed
    # do this in the first place (do not cut the data short later)
    # --> take the daterange as a separate argument, so that it is known even when it is ignored by utils.create_filter

    daterange = query_params.get("daterange", None)

    ## check whether station_code exists
    db_stationmeta = get_stationmeta(db, station_code=station_code, fields="id")
    if db_stationmeta is None:
        raise HTTPException(status_code=404, detail=f"Metadata for station '{station_code}' not found.")

    ts_list, timeseries_ids = get_merging_list(db, station_code, variable_id, daterange)
    data_merged = {'metadata': [],
                   'data': []}
    used_timeseries_ids = []
    if ts_list == []:
        # some time series do not have an entry in the yearly coverage table!
        if timeseries_ids == []:
            data_merged['metadata'] = search_all(db, path_params, query_params, endpoint='timeseries_merged')
            data_merged['data'] = []
            return data_merged
        for ts_id in timeseries_ids:
            ts_list.append(['1960-01-01', '1961-01-01', ts_id])
    for ts_part in ts_list:
        columns, metadata, data = get_data(db, ts_part[2], path_params, query_params, daterange=[ts_part[0], ts_part[1]])
        data_merged['data'] += data
        # do not show metadata twice
        if ts_part[2] not in used_timeseries_ids:
            data_merged['metadata'] += [ TimeseriesWithCitation(**metadata) ]
            used_timeseries_ids.append(ts_part[2])
    format = query_params.get("format", "json")
    if format == 'csv':
        content = ''
        # start with metadata
        for part_meta in data_merged['metadata']:
            content += '#' + part_meta.json(indent=4, ensure_ascii=False).replace('\n', '\n#') + '\n'
        # add header
        content += ','.join(column.name for column in columns) + '\n'
        # now the data
        content += '\n'.join(','.join(f"{getattr(curr, column.name)}" for column in columns) for curr in data_merged['data'])
        return Response(content=content, media_type="text/csv")
    else:
        return data_merged

def get_data_by_datetime_and_timeseriesid(db: Session, datetime: dt.datetime, timeseries_id: int):
    return db.query(models.Data).filter(models.Data.datetime == datetime, models.Data.timeseries_id == timeseries_id).first()

def get_all_data(db: Session, limit: int, offset: int = 0):
    return db.query(models.Data).order_by(models.Data.datetime).offset(offset).limit(limit).all()

def is_preliminary(version: str):
    return (version.split('.')[0] == '000000')
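
# Example: is_preliminary('000000.000003.00000000000000') --> True (major version 0),
#          is_preliminary('000001.000000.00000000000000') --> False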

def create_data_record(db: Session, engine: Engine,
                       series_id: int, datetime: dt.datetime,
                       value: float, flag: str, version: str,
                       author_id: int):
    toarqc_config_type: str = 'standard'
    timeseries = get_timeseries(db=db, timeseries_id=series_id)
    variable = get_variable(db=db, variable_id=timeseries.variable_id)
    parameter = variable.name
    data_dict = {"datetime": datetime,
                 "value": value,
                 "flags": flag,
                 "version": version,
                 "timeseries_id": series_id}
    df = pd.DataFrame([data_dict]).set_index("datetime")
    try:
        test_config = get_toarqc_config('static/../toardb/data/toarqc_config', parameter, toarqc_config_type)
    except FileNotFoundError:
        message = f'no toarqc configuration found for {parameter}, {toarqc_config_type}'
        status_code = 400
        return JSONResponse(status_code=status_code, content=message)
    result = run_toarqc(test_config, df, ok_limit=0.85, questionable_limit=0.6)
    combined_flag_matrix = {(          'OK', 'OK'): 'OKPreliminaryNotChecked',
                            ('Questionable', 'OK'): 'QuestionablePreliminaryNotChecked',
                            (   'Erroneous', 'OK'): 'ErroneousPreliminaryNotChecked',
                            (          'OK', 'Erroneous'): 'ErroneousPreliminaryFlagged1',
                            ('Questionable', 'Erroneous'): 'ErroneousPreliminaryFlagged2',
                            (   'Erroneous', 'Erroneous'): 'Erroneous_Preliminary_Confirmed'}
    combined_flag = combined_flag_matrix[(flag, result['flags'].iloc[0])]
    data_dict["flags"] = get_value_from_str(toardb.toardb.DF_vocabulary, combined_flag.strip())
    data = models.Data(**data_dict)
    db.rollback()
    db.add(data)
    # adjust data_start_date, data_end_date
    datetime = datetime.replace(tzinfo=timeseries.data_end_date.tzinfo)
    if datetime < timeseries.data_start_date:
        timeseries.data_start_date = datetime
    if datetime > timeseries.data_end_date:
        timeseries.data_end_date = datetime
    db.add(timeseries)
    db.commit()
    db.refresh(data)
    # if not preliminary data: create changelog entry
    if not is_preliminary(version):
        type_of_change = get_value_from_str(toardb.toardb.CL_vocabulary, "Created")
        description = "data record created"
        db_changelog = TimeseriesChangelog(description=description, timeseries_id=series_id, author_id=author_id, type_of_change=type_of_change,
                                           old_value='', new_value='', period_start=datetime, period_end=datetime, version=version)
        db.add(db_changelog)
        db.commit()
    status_code = 200
    message = 'Data successfully inserted.'
    return JSONResponse(status_code=status_code, content=message)


def insert_dataframe(db: Session, engine: Engine, df: pd.DataFrame, toarqc_config_type: str = 'standard', dry_run: bool = False, parameter: str = 'o3', preliminary: bool = False, force: bool = False):
    # df: pandas.DataFrame
    #   index:      datetime
    #   1st column: value
    #   2nd column: flags (0: 'OK', 1: 'Questionable', 2: 'Erroneous')
    #   3rd column: timeseries_id
    #   4th column: version
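    # e.g. (illustrative sketch of a conforming frame):
    #   pd.DataFrame({"value": [23.5], "flags": [0],
    #                 "timeseries_id": [46], "version": ["000001.000000.00000000000000"]},
    #                index=pd.DatetimeIndex(["2020-01-01 00:00"], tz="UTC", name="datetime"))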

    df['flags'] = df['flags'].replace([0], 'OK')
    df['flags'] = df['flags'].replace([1], 'Questionable')
    df['flags'] = df['flags'].replace([2], 'Erroneous')
    try:
        test_config = get_toarqc_config('static/../toardb/data/toarqc_config', parameter, toarqc_config_type)
    except FileNotFoundError:
        message = f'no toarqc configuration found for {parameter}, {toarqc_config_type}'
        status_code = 400
        return JSONResponse(status_code=status_code, content=message)
    # get flags and qc-statistics from toarqc
    # available keys in result (at the moment):
    #   'time_series', 'test_config', 'ok_limit', 'questionable_limit', 'metadata',
    #   'plot', 'outfile', 'plot_type', 'results', 'qc-score', 'flags',
    #   'number_OK', 'number_Questionable', 'number_Erroneous',
    #   'percentage_OK', 'percentage_Questionable', 'percentage_Erroneous',
    #   'qc-score_g0', 'flags_g0', 'qc-score_g1', 'flags_g1',
    #   'qc-score_g2', 'flags_g2', 'qc-score_g3', 'flags_g3',
    #   'qc-score_g0_range_test', 'flags_g0_range_test',
    #   'qc-score_g1_sigma_test', 'flags_g1_sigma_test',
    #   'qc-score_g2_constant_value_test', 'flags_g2_constant_value_test',
    #   'qc-score_g2_positive_spike_test', 'flags_g2_positive_spike_test',
    #   'qc-score_g2_negative_spike_test', 'flags_g2_negative_spike_test',
    #   'qc-score_g3_before_nan_test', 'flags_g3_before_nan_test',
    #   'qc-score_g3_after_nan_test', 'flags_g3_after_nan_test'
    # toarqc expects pandas.Series
    # --> if getting pandas.DataFrame, it will take index and first column
    #     (other columns will be ignored)
    if toarqc_config_type != 'realtime':
        try:
            result = run_toarqc(test_config, df, ok_limit=0.85, questionable_limit=0.6)
        except ValueError as ve:
            message = {"detail": {"message": f'Aborted by automatic quality control: {ve}'}}
            status_code = 445
            return JSONResponse(status_code=status_code, content=message)
        percentage_OK = result['percentage_OK']
        if percentage_OK >= 0.9 or force:
            target_schema = 'public'
            message = {"detail": {"message": "Data successfully inserted."}}
            status_code = 200
        else:
            target_schema = 'staging'
            message = f'Aborted by automatic quality control: percentage_OK = {percentage_OK}'
            status_code = 446
        toarqc_flags = result['flags']
        # combine toarqc flags with flags given by provider (see matrix: TOAR_UG_Vol03_Database_2021-05.docx, chapter 5.2)
        #                        preliminary  provider        toarqc
        combined_flag_matrix = {(False,           'OK',           'OK'): 'OKValidatedQCPassed',
                                (False,           'OK', 'Questionable'): 'QuestionableValidatedFlagged',
                                (False,           'OK',    'Erroneous'): 'ErroneousValidatedFlagged1',
                                (False, 'Questionable',           'OK'): 'QuestionableValidatedUnconfirmed',
                                (False, 'Questionable', 'Questionable'): 'QuestionableValidatedConfirmed',
                                (False, 'Questionable',    'Erroneous'): 'ErroneousValidatedFlagged2',
                                (False,    'Erroneous',           'OK'): 'ErroneousValidatedUnconfirmed',
                                (False,    'Erroneous', 'Questionable'): 'ErroneousValidatedConfirmed',
                                (False,    'Erroneous',    'Erroneous'): 'ErroneousValidatedConfirmed',
                                (True,            'OK',           'OK'): 'OKPreliminaryQCPassed',
                                (True,            'OK', 'Questionable'): 'QuestionablePreliminaryFlagged',
                                (True,            'OK',    'Erroneous'): 'ErroneousPreliminaryFlagged1',
                                (True,  'Questionable',           'OK'): 'QuestionablePreliminaryUnconfirmed',
                                (True,  'Questionable', 'Questionable'): 'QuestionablePreliminaryConfirmed',
                                (True,  'Questionable',    'Erroneous'): 'ErroneousPreliminaryFlagged2',
                                (True,     'Erroneous',           'OK'): 'ErroneousPreliminaryUnconfirmed',
                                (True,     'Erroneous', 'Questionable'): 'ErroneousPreliminaryConfirmed',
                                (True,     'Erroneous',    'Erroneous'): 'ErroneousPreliminaryConfirmed'}
        combined_flags = [combined_flag_matrix[(preliminary, provider_flag, toarqc_flag)] for provider_flag, toarqc_flag in zip(df['flags'], toarqc_flags)]
    else:
        range_test = RangeTest(**test_config[0]["range_test"])
        toarqc_flags = range_test.run(df['value'])
        toarqc_flags = toarqc_flags.replace([1], 'OK')
        toarqc_flags = toarqc_flags.replace([0], 'Erroneous')
        target_schema = 'public'
        status_code = 200
        message = {"detail": {"message": "Data successfully inserted."}}

        # combine toarqc flags with flags given by provider (see matrix: TOAR_UG_Vol03_Database_2021-05.docx, chapter 5.2)
        # the range test is not a full QC test ==> the range test only returns 'OK' and 'Erroneous'
        #                        provider        toarqc
        combined_flag_matrix = {(          'OK', 'OK'): 'OKPreliminaryNotChecked',
                                ('Questionable', 'OK'): 'QuestionablePreliminaryNotChecked',
                                (   'Erroneous', 'OK'): 'ErroneousPreliminaryNotChecked',
                                (          'OK', 'Erroneous'): 'ErroneousPreliminaryFlagged1',
                                ('Questionable', 'Erroneous'): 'ErroneousPreliminaryFlagged2',
                                (   'Erroneous', 'Erroneous'): 'Erroneous_Preliminary_Confirmed'}
        combined_flags = [combined_flag_matrix[(provider_flag, toarqc_flag)] for provider_flag, toarqc_flag in zip(df['flags'], toarqc_flags)]
    # exchange combined flags in dataframe
    flag_num = [get_value_from_str(toardb.toardb.DF_vocabulary, flag.strip()) for flag in combined_flags]
    del df['flags']
    df.insert(1, 'flags', flag_num)
    if dry_run:
        return JSONResponse(status_code=status_code, content=df.to_json(orient='table', date_format='iso'))
    else:
        buf = StringIO()
        df.to_csv(buf, header=False)
        buf.seek(0)
        with closing(engine.raw_connection()) as fake_conn:
            fake_cur = fake_conn.cursor()
            try:
                fake_cur.copy_expert(f"COPY {target_schema}.data (datetime, value, flags, timeseries_id, version) FROM STDIN WITH CSV DELIMITER ',';", buf)
                fake_conn.commit()
            except Exception as e:
                message = f"An error occurred in {target_schema}.data insertion: {e}"
                status_code = 400
            fake_cur.close()
    return JSONResponse(status_code=status_code, content=message)


def create_data(db: Session, engine: Engine, author_id: int, input_handle: UploadFile = File(...), toarqc_config_type: str = 'standard', dry_run: bool = False,
                force: bool = False):
    # a timeseries is defined by the unique_constraint of (station_id, variable_id, ...)
    # station_id: from header
    # variable_id: from database (with variable_name -- from filename)
    # get variable_name from filename
    variable_name = input_handle.filename.split('_')[0]
    variable = db.query(variables_models.Variable).filter(variables_models.Variable.name == variable_name).first()
    variable_id = variable.id
    # get header information (station_id, contributor_shortname, timeshift_from_utc)
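    # the '#'-prefixed header lines are expected to look like, e.g. (illustrative values):
    #   #station_id: XX001
    #   #timeshift_from_utc: 1.0
    #   #dataset_resourceprovider_organisation_longname: Example Institute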

    line = '#bla'
    f = input_handle.file
    prev = pos = 0
    while line.startswith('#'):
        line = f.readline().decode('utf-8')
        key = line.split(':')[0].lower().strip()
        if key == "#station_id":
            station_id = line.split(':')[1]
        if key == "#timeshift_from_utc":
            timeoffset = dt.timedelta(hours=float(line.split(':')[1]))
        if key == "#dataset_contributor_organisation_longname":
            contributor = line.split(':')[1].strip()
        if key == "#dataset_pi_organisation_longname":
            contributor = line.split(':')[1].strip()
        if key == "#dataset_resourceprovider_organisation_longname":
            contributor = line.split(':')[1].strip()
        prev, pos = pos, f.tell()
    f.seek(prev)
    station_code = station_id.strip()
    stationmeta_core = get_stationmeta(db=db, station_code=station_code, fields="id")
    station_id = stationmeta_core["id"]
    timeseries = get_timeseries_by_unique_constraints(db=db, station_id=station_id, variable_id=variable_id, resource_provider=contributor)
    # again problems with converted coordinates!
    db.rollback()
    version = '000001.000000.00000000000000'
    if timeseries:
        timeseries_id = timeseries.id
        # open SpooledTemporaryFile, skip header (and also try to insert timeseries_id!)
        # python3.8: bug in SpooledTemporaryFile --> https://bugs.python.org/issue26175
        # --> https://github.com/fedspendingtransparency/usaspending-api/pull/2963
        # ==> I have to check for a workaround!
        df = pd.read_csv(input_handle.file, comment='#', header=None, sep=';', names=["time", "value", "flags"], parse_dates=["time"], index_col="time")
        # subtract timeshift to convert data to UTC
        df.index = df.index - timeoffset
        # now insert the timeseries_id at the end of the data frame
        df.insert(2, 'timeseries_id', timeseries_id)
        # also insert version
        df.insert(3, 'version', version)
        # datetime needs timezone information
        df = df.tz_localize('UTC')
        result = insert_dataframe(db, engine, df, toarqc_config_type=toarqc_config_type, parameter=variable_name, dry_run=dry_run, force=force)
        # adjust data_start_date, data_end_date
        if result.status_code == 200:
            data_start_date = min(df.index)
            data_end_date = max(df.index)
            if data_start_date < timeseries.data_start_date:
                timeseries.data_start_date = data_start_date
            if data_end_date > timeseries.data_end_date:
                timeseries.data_end_date = data_end_date
            db.add(timeseries)
            db.commit()
        return result
    else:
        message = f'Timeseries not found for station {station_code}, variable {variable_name}'
        status_code = 400
        return JSONResponse(status_code=status_code, content=message)


def create_bulk_data(db: Session, engine: Engine, bulk: List[schemas.DataCreate], author_id: int,
                     toarqc_config_type: str = 'standard', dry_run: bool = False, force: bool = False):
    df = pd.DataFrame([x.dict() for x in bulk]).set_index("datetime")
    # bulk data: to be able to do at least a range test, all data needs to be from the same parameter;
    # variable_name is therefore determined from the first entry of the dataframe
    timeseries = get_timeseries(db=db, timeseries_id=int(df['timeseries_id'].iloc[0]))
    # again problems with converted coordinates!
    db.rollback()
    variable = get_variable(db=db, variable_id=timeseries.variable_id)
    variable_name = variable.name
    insert_result = insert_dataframe(db, engine, df, toarqc_config_type=toarqc_config_type, parameter=variable_name, dry_run=dry_run, force=force)
    # adjust data_start_date, data_end_date
    if insert_result.status_code == 200:
        data_start_date = min(df.index)
        data_end_date = max(df.index)
        if data_start_date < timeseries.data_start_date:
            timeseries.data_start_date = data_start_date
        if data_end_date > timeseries.data_end_date:
            timeseries.data_end_date = data_end_date
        db.add(timeseries)
        db.commit()
    return insert_result


def patch_data(db: Session, engine: Engine, description: str, version: str, toarqc_config_type: str,
               force: bool, author_id: int, input_handle: UploadFile = File(...)):
    # a timeseries is defined by the unique_constraint of (station_id, variable_id, ...)
    # station_id: from header
    # variable_id: from database (with variable_name -- from filename)
    # get variable_name from filename

    # versionlabel has to be unique for this timeseries ==> to be checked!

    variable_name = input_handle.filename.split('_')[0]
    variable = db.query(variables_models.Variable).filter(variables_models.Variable.name == variable_name).first()
    variable_id = variable.id
    # get header information (station_id, contributor_shortname, timeshift_from_utc)
    line = '#bla'
    f = input_handle.file
    prev = pos = 0
    while line.startswith('#'):
        line = f.readline().decode('utf-8')
        key = line.split(':')[0].lower().strip()
        if key == "#station_id":
            station_id = line.split(':')[1]
        if key == "#timeshift_from_utc":
            timeoffset = dt.timedelta(hours=float(line.split(':')[1]))
        if key == "#dataset_contributor_organisation_longname":
            contributor = line.split(':')[1].strip()
        if key == "#dataset_pi_organisation_longname":
            contributor = line.split(':')[1].strip()
        if key == "#dataset_resourceprovider_organisation_longname":
            contributor = line.split(':')[1].strip()
        prev, pos = pos, f.tell()
    f.seek(prev)
    station_code = station_id.strip()
    stationmeta_core = get_stationmeta(db=db, station_code=station_code, fields="id")
    station_id = stationmeta_core["id"]
    timeseries = get_timeseries_by_unique_constraints(db=db, station_id=station_id, variable_id=variable_id, resource_provider=contributor)
    # again problems with converted coordinates!
    db.rollback()
    if timeseries:
        timeseries_id = timeseries.id
        # open SpooledTemporaryFile, skip header (and also try to insert timeseries_id!)
        df = pd.read_csv(input_handle.file, comment='#', header=None, sep=';', names=["time", "value", "flags"], parse_dates=["time"], index_col="time")
        # subtract timeshift to convert data to UTC
        df.index = df.index - timeoffset
        # now insert the timeseries_id at the end of the data frame
        df.insert(2, 'timeseries_id', timeseries_id)
        # also insert version
        df.insert(3, 'version', version)
        # datetime needs timezone information
        df = df.tz_localize('UTC')
        # determine period_start and period_end of data
        period_start = min(df.index)
        period_end = max(df.index)
        # mv data from this period to data_archive
        db.execute(insert(models.DataArchive).from_select((models.Data.datetime, models.Data.value, models.Data.flags, models.Data.version, models.Data.timeseries_id),
                       select([models.Data]).where(
                           and_(and_(models.Data.timeseries_id == timeseries_id,
                                     models.Data.datetime >= period_start),
                                models.Data.datetime <= period_end))))
        db.execute(delete(models.Data).where(
                       and_(and_(models.Data.timeseries_id == timeseries_id,
                                 models.Data.datetime >= period_start),
                            models.Data.datetime <= period_end)))
        db.commit()
        # now insert the new data for this period from file
        buf = StringIO()
        df.to_csv(buf, header=False)
        buf.seek(0)
        with closing(engine.raw_connection()) as fake_conn:
            fake_cur = fake_conn.cursor()
            try:
                fake_cur.copy_from(buf, 'data', sep=',', columns=('datetime', 'value', 'flags', 'timeseries_id', 'version'))
                fake_conn.commit()
                # adjust data_start_date, data_end_date
                if period_start < timeseries.data_start_date:
                    timeseries.data_start_date = period_start
                if period_end > timeseries.data_end_date:
                    timeseries.data_end_date = period_end
                db.add(timeseries)
                db.commit()
                message = {"detail": {"message": "Data successfully inserted."}}
                status_code = 200
            except Exception as e:
                message = {"detail": {"message": f"An error occurred in data insertion: {e}"}}
                status_code = 400
                return JSONResponse(status_code=status_code, content=message)
            fake_cur.close()
        # create changelog entry
        # how to determine type_of_change?
        # 4 – unspecified data value corrections (this holds also, if there is only one single value to be corrected;
        #     the addition "unspecified" keeps all possibilities open to add "specified" corrections later (e.g. from QC))
        # 5 – replaced data with a new version
        type_of_change = get_value_from_str(toardb.toardb.CL_vocabulary, "UnspecifiedData")
        db_changelog = TimeseriesChangelog(description=description, timeseries_id=timeseries_id, author_id=author_id, type_of_change=type_of_change,
                                           old_value="", new_value="", period_start=period_start, period_end=period_end, version=version)
        db.add(db_changelog)
        db.commit()
        return JSONResponse(status_code=status_code, content=message)
    else:
        message = f'Timeseries not found for station {station_code}, variable {variable_name}'
        status_code = 400
        return JSONResponse(status_code=status_code, content=message)

def patch_bulk_data(db: Session, engine: Engine, description: str, version: str, bulk: List[schemas.DataPatch], author_id: int,
                    toarqc_config_type: str = 'standard', force: bool = False, no_archive: bool = False):
    df = pd.DataFrame([x.dict() for x in bulk]).set_index("datetime")
    # bulk data: to be able to do at least a range test, all data needs to be from the same parameter;
    # variable_name is therefore determined from the first entry of the dataframe
    timeseries_id = int(df['timeseries_id'].iloc[0])
    timeseries = get_timeseries(db=db, timeseries_id=timeseries_id)
    variable = get_variable(db=db, variable_id=timeseries.variable_id)
    variable_name = variable.name
    timeseries_list = pd.unique(df['timeseries_id'])
    # next command just to avoid problems with converted coordinates
    db.rollback()
    for t_id in timeseries_list:
        df2 = df[df['timeseries_id'] == t_id]
        timeseries_id = int(t_id)
        # determine period_start and period_end of data
        period_start = min(df2.index)
        period_end = max(df2.index)
        # mv data from this period to data_archive
        if not no_archive:
            db.execute(insert(models.DataArchive).from_select((models.Data.datetime, models.Data.value, models.Data.flags, models.Data.version, models.Data.timeseries_id),
                           select([models.Data]).where(
                               and_(and_(models.Data.timeseries_id == timeseries_id,
                                         models.Data.datetime >= period_start),
                                    models.Data.datetime <= period_end))))
        # delete all data in the period that is patched
        db.execute(delete(models.Data).where(
                       and_(and_(models.Data.timeseries_id == timeseries_id,
                                 models.Data.datetime >= period_start),
                            models.Data.datetime <= period_end)))
        db.commit()
        # adjust data_start_date, data_end_date
        timeseries = get_timeseries(db=db, timeseries_id=timeseries_id)
        # next command just to avoid problems with converted coordinates
        db.rollback()
        if period_start < timeseries.data_start_date:
            timeseries.data_start_date = period_start
        if period_end > timeseries.data_end_date:
            timeseries.data_end_date = period_end
        db.add(timeseries)
        db.commit()
    result = insert_dataframe(db, engine, df, toarqc_config_type=toarqc_config_type, parameter=variable_name, force=force)
    if not no_archive:
        # create changelog entry
        # how to determine type_of_change?
        # 4 – unspecified data value corrections (this holds also, if there is only one single value to be corrected;
        #     the addition "unspecified" keeps all possibilities open to add "specified" corrections later (e.g. from QC))
        # 5 – replaced data with a new version
        type_of_change = get_value_from_str(toardb.toardb.CL_vocabulary, "Replaced")
        db_changelog = TimeseriesChangelog(description=description, timeseries_id=timeseries_id, author_id=author_id, type_of_change=type_of_change,
                                           old_value="", new_value="", period_start=period_start, period_end=period_end, version=version)
        db.add(db_changelog)
        db.commit()
    return result