Coverage for toardb/contacts/crud.py: 84%
98 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-03 20:32 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-03 20:32 +0000
1# SPDX-FileCopyrightText: 2021 Forschungszentrum Jülich GmbH
2# SPDX-License-Identifier: MIT
4"""
5Create, Read, Update, Delete functionality
7"""
9from sqlalchemy import text
10from sqlalchemy.orm import Session
11from fastapi.responses import JSONResponse
12from . import models
13from .schemas import PersonCreate, OrganisationCreate
14from toardb.utils.utils import get_value_from_str, create_filter
15import toardb
18def get_organisation(db: Session, organisation_id: int):
19 return db.query(models.Organisation).filter(models.Organisation.id == organisation_id).first()
22def get_all_organisations(db: Session, limit: int, offset: int = 0):
23 return db.query(models.Organisation).order_by(models.Organisation.id).offset(offset).limit(limit).all()
26def get_organisation_by_name(db: Session, name: str):
27 return db.query(models.Organisation).filter(models.Organisation.name == name.upper()).first()
30def get_organisation_by_longname(db: Session, longname: str):
31 return db.query(models.Organisation).filter(models.Organisation.longname == longname).first()
34def create_organisation(db: Session, organisation: OrganisationCreate):
35 db_organisation = models.Organisation(**organisation.dict())
36 db_organisation.kind = get_value_from_str(toardb.toardb.OK_vocabulary,db_organisation.kind)
37 db_organisation.country = get_value_from_str(toardb.toardb.CN_vocabulary,db_organisation.country)
38 db.add(db_organisation)
39 result = db.commit()
40 db.refresh(db_organisation)
41 db_contact = models.Contact(person_id=0,organisation_id=db_organisation.id)
42 db.add(db_contact)
43 result = db.commit()
44 db.refresh(db_contact)
45 return db_organisation
48def get_person(db: Session, person_id: int):
49 return db.query(models.Person).filter(models.Person.id == person_id).filter(models.Person.isprivate == False).first()
52def get_all_persons(db: Session, path_params, query_params):
53 try:
54 limit, offset, fields, format, filters = create_filter(query_params, "persons")
55 p_filter = filters["p_filter"]
56 except (KeyError, ValueError) as e:
57 status_code=400
58 return JSONResponse(status_code=status_code, content=str(e))
60 if fields:
61 fields2 = fields.split(',')
62 db_objects_l = db.query(*map(text,fields2)).select_from(models.Person). \
63 filter(models.Person.id != 0).filter(models.Person.isprivate == False). \
64 filter(text(p_filter)). \
65 order_by(models.Person.id). \
66 limit(limit).offset(offset)
67 db_objects = []
68 for db_object_immut in db_objects_l:
69 db_object = dict(zip(fields.split(','),db_object_immut))
70 db_objects.append(db_object)
71 else:
72 db_objects = db.query(models.Person).filter(models.Person.id != 0).filter(models.Person.isprivate == False). \
73 filter(text(p_filter)). \
74 order_by(models.Person.id).offset(offset).limit(limit).all()
75 return db_objects
78def get_person_by_name(db: Session, name: str):
79 return db.query(models.Person).filter(models.Person.name == name).filter(models.Person.isprivate == False).first()
82def get_person_by_email(db: Session, email: str):
83 return db.query(models.Person).filter(models.Person.email == email).filter(models.Person.isprivate == False).first()
86def _get_private_person_by_name(db: Session, name: str):
87 return db.query(models.Person).filter(models.Person.name == name).first()
90def create_person(db: Session, person: PersonCreate):
91 db_person = models.Person(**person.dict())
92 db.add(db_person)
93 result = db.commit()
94 db.refresh(db_person)
95 db_contact = models.Contact(person_id=db_person.id,organisation_id=0)
96 db.add(db_contact)
97 result = db.commit()
98 db.refresh(db_contact)
99 return db_person
102def get_all_contacts(db: Session, limit: int, offset: int = 0):
103 return db.query(models.Contact).join(models.Person).filter(models.Contact.person_id == models.Person.id).filter(models.Person.isprivate == False).order_by(models.Contact.id).offset(offset).limit(limit).all()
105def get_contact(db: Session, contact_id: int):
106 db_object = db.query(models.Contact).join(models.Person).filter(models.Contact.person_id == models.Person.id).filter(models.Contact.id == contact_id).filter(models.Person.isprivate == False).first()
107 if db_object:
108 if db_object.person_id == 0:
109 return db.query(models.Organisation).filter(models.Organisation.id == db_object.organisation_id).first()
110 else:
111 return db.query(models.Person).filter(models.Person.id == db_object.person_id).filter(models.Person.isprivate == False).first()
112 else:
113 return db_object
116def get_contact_by_orga_name(db: Session, name: str):
117 # issue with '/' in organisation longname ==> only possible with double encoding!
118 name = name.replace('%2F', '/')
119 db_object = db.query(models.Organisation).filter(models.Organisation.longname == name).first()
120 if db_object:
121 return db.query(models.Contact).filter(models.Contact.organisation_id == db_object.id).filter(models.Contact.person_id == 0).first()
122 else:
123 return None
126def get_contact_by_person_name(db: Session, name: str):
127 db_object = db.query(models.Person).filter(models.Person.name == name).first()
128 return db.query(models.Contact).filter(models.Contact.person_id == db_object.id).first()
131def get_contact_by_person_email(db: Session, email: str):
132 db_object = db.query(models.Person).filter(models.Person.email == email).first()
133 return db.query(models.Contact).filter(models.Contact.person_id == db_object.id).first()
136def get_contact_by_unique_constraints(db: Session, person_id, organisation_id: int):
137 return db.query(models.Contact).filter(models.Contact.organisation_id == organisation_id). \
138 filter(models.Contact.person_id == person_id).first()
141def create_contact(db: Session, person_name, organisation_longname: str):
142 if person_name:
143 db_person = _get_private_person_by_name(db,person_name)
144 else:
145 db_person = get_person(db,0)
146 if organisation_longname:
147 db_organisation = get_organisation_by_longname(db,organisation_longname)
148 else:
149 db_organisation = get_organisation(db,0)
150 db_contact = get_contact_by_unique_constraints(db,db_person.id,db_organisation.id)
151 if not db_contact:
152 db_contact = models.Contact(person_id=db_person.id,organisation_id=db_organisation.id)
153 db.add(db_contact)
154 result = db.commit()
155 db.refresh(db_contact)
156 return db_contact