Coverage for toardb/contacts/crud.py: 84%

98 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-03 20:32 +0000

1# SPDX-FileCopyrightText: 2021 Forschungszentrum Jülich GmbH 

2# SPDX-License-Identifier: MIT 

3 

4""" 

5Create, Read, Update, Delete functionality 

6 

7""" 

8 

9from sqlalchemy import text 

10from sqlalchemy.orm import Session 

11from fastapi.responses import JSONResponse 

12from . import models 

13from .schemas import PersonCreate, OrganisationCreate 

14from toardb.utils.utils import get_value_from_str, create_filter 

15import toardb 

16 

17 

def get_organisation(db: Session, organisation_id: int):
    """Look up a single organisation by its primary key.

    Returns the first matching ``models.Organisation`` row, or whatever
    ``Query.first()`` yields when nothing matches.
    """
    by_id = models.Organisation.id == organisation_id
    organisations = db.query(models.Organisation)
    return organisations.filter(by_id).first()

20 

21 

def get_all_organisations(db: Session, limit: int, offset: int = 0):
    """Return one page of organisations, ordered by id.

    ``offset`` rows are skipped and at most ``limit`` rows are returned.
    """
    ordered = db.query(models.Organisation).order_by(models.Organisation.id)
    page = ordered.offset(offset).limit(limit)
    return page.all()

24 

25 

def get_organisation_by_name(db: Session, name: str):
    """Return the first organisation whose name equals the upper-cased *name*."""
    wanted = name.upper()
    organisations = db.query(models.Organisation)
    return organisations.filter(models.Organisation.name == wanted).first()

28 

29 

def get_organisation_by_longname(db: Session, longname: str):
    """Return the first organisation whose longname matches *longname* exactly."""
    by_longname = models.Organisation.longname == longname
    organisations = db.query(models.Organisation)
    return organisations.filter(by_longname).first()

32 

33 

def create_organisation(db: Session, organisation: OrganisationCreate):
    """Persist a new organisation together with its contact entry.

    The ``kind`` and ``country`` values of the payload are passed through
    ``get_value_from_str`` with ``OK_vocabulary`` and ``CN_vocabulary``
    respectively before the row is stored.  A ``Contact`` with
    ``person_id=0`` pointing at the new organisation is written in the same
    transaction, so a failure while inserting the contact does not leave an
    organisation behind without one.

    Returns the refreshed ``models.Organisation`` instance.
    """
    db_organisation = models.Organisation(**organisation.dict())
    db_organisation.kind = get_value_from_str(toardb.toardb.OK_vocabulary, db_organisation.kind)
    db_organisation.country = get_value_from_str(toardb.toardb.CN_vocabulary, db_organisation.country)
    db.add(db_organisation)
    # flush() emits the INSERT so the generated id is available for the
    # contact row without ending the transaction yet.
    db.flush()
    db.add(models.Contact(person_id=0, organisation_id=db_organisation.id))
    db.commit()
    db.refresh(db_organisation)
    return db_organisation

46 

47 

def get_person(db: Session, person_id: int):
    """Return the non-private person with the given id, if any."""
    persons = db.query(models.Person)
    persons = persons.filter(models.Person.id == person_id)
    # SQL comparison, not a Python identity test
    persons = persons.filter(models.Person.isprivate == False)  # noqa: E712
    return persons.first()

50 

51 

def get_all_persons(db: Session, path_params, query_params):
    """List non-private persons, filtered and paged according to *query_params*.

    ``create_filter`` turns the query parameters into a limit, an offset, an
    optional comma-separated field list and a dict of filter expressions.
    A ``KeyError`` or ``ValueError`` raised while doing so (including a
    missing ``"p_filter"`` entry) is answered with a 400 ``JSONResponse``
    whose content is the error text.

    With a field list the result is a list of dicts keyed by field name;
    without one it is a list of ``models.Person`` objects.  The row with
    id 0 is excluded in both cases.  ``path_params`` is not used here.
    """
    try:
        limit, offset, fields, _format, filters = create_filter(query_params, "persons")
        p_filter = filters["p_filter"]
    except (KeyError, ValueError) as err:
        return JSONResponse(status_code=400, content=str(err))

    if not fields:
        # No projection requested: hand back full Person objects.
        return (
            db.query(models.Person)
            .filter(models.Person.id != 0)
            .filter(models.Person.isprivate == False)  # noqa: E712
            .filter(text(p_filter))
            .order_by(models.Person.id)
            .offset(offset)
            .limit(limit)
            .all()
        )

    field_names = fields.split(',')
    rows = (
        db.query(*map(text, field_names))
        .select_from(models.Person)
        .filter(models.Person.id != 0)
        .filter(models.Person.isprivate == False)  # noqa: E712
        .filter(text(p_filter))
        .order_by(models.Person.id)
        .limit(limit)
        .offset(offset)
    )
    # Result rows are immutable tuples; pair each with the requested names.
    return [dict(zip(field_names, row)) for row in rows]

76 

77 

def get_person_by_name(db: Session, name: str):
    """Return the first non-private person whose name equals *name*."""
    persons = db.query(models.Person)
    persons = persons.filter(models.Person.name == name)
    persons = persons.filter(models.Person.isprivate == False)  # noqa: E712
    return persons.first()

80 

81 

def get_person_by_email(db: Session, email: str):
    """Return the first non-private person whose email equals *email*."""
    persons = db.query(models.Person)
    persons = persons.filter(models.Person.email == email)
    persons = persons.filter(models.Person.isprivate == False)  # noqa: E712
    return persons.first()

84 

85 

def _get_private_person_by_name(db: Session, name: str):
    """Return the first person named *name*, without any ``isprivate`` filter."""
    by_name = models.Person.name == name
    persons = db.query(models.Person)
    return persons.filter(by_name).first()

88 

89 

def create_person(db: Session, person: PersonCreate):
    """Persist a new person together with its contact entry.

    A ``Contact`` with ``organisation_id=0`` pointing at the new person is
    written in the same transaction, so a failure while inserting the
    contact does not leave a person behind without one.

    Returns the refreshed ``models.Person`` instance.
    """
    db_person = models.Person(**person.dict())
    db.add(db_person)
    # flush() emits the INSERT so the generated id is available for the
    # contact row without ending the transaction yet.
    db.flush()
    db.add(models.Contact(person_id=db_person.id, organisation_id=0))
    db.commit()
    db.refresh(db_person)
    return db_person

100 

101 

def get_all_contacts(db: Session, limit: int, offset: int = 0):
    """Return one page of contacts whose person is not private, ordered by id."""
    contacts = (
        db.query(models.Contact)
        .join(models.Person)
        .filter(models.Contact.person_id == models.Person.id)
        .filter(models.Person.isprivate == False)  # noqa: E712
    )
    page = contacts.order_by(models.Contact.id).offset(offset).limit(limit)
    return page.all()

104 

def get_contact(db: Session, contact_id: int):
    """Resolve a contact id to the organisation or person behind it.

    The contact is only considered when its joined person is not private.
    A contact with ``person_id == 0`` resolves to its organisation; any
    other contact resolves to its (non-private) person.  When no contact
    matches, the empty lookup result is returned unchanged.
    """
    contact = (
        db.query(models.Contact)
        .join(models.Person)
        .filter(models.Contact.person_id == models.Person.id)
        .filter(models.Contact.id == contact_id)
        .filter(models.Person.isprivate == False)  # noqa: E712
        .first()
    )
    if not contact:
        return contact

    if contact.person_id == 0:
        organisations = db.query(models.Organisation)
        return organisations.filter(models.Organisation.id == contact.organisation_id).first()

    persons = db.query(models.Person).filter(models.Person.id == contact.person_id)
    return persons.filter(models.Person.isprivate == False).first()  # noqa: E712

114 

115 

def get_contact_by_orga_name(db: Session, name: str):
    """Return the contact (with ``person_id == 0``) of the organisation whose
    longname is *name*, or None when no such organisation exists.
    """
    # A '/' in an organisation longname can only be transported double-encoded,
    # so one level of '%2F' is still present here.  Percent-encoding hex digits
    # are case-insensitive, hence the lower-case spelling is decoded as well.
    name = name.replace('%2F', '/').replace('%2f', '/')
    db_object = db.query(models.Organisation).filter(models.Organisation.longname == name).first()
    if not db_object:
        return None
    return (
        db.query(models.Contact)
        .filter(models.Contact.organisation_id == db_object.id)
        .filter(models.Contact.person_id == 0)
        .first()
    )

124 

125 

def get_contact_by_person_name(db: Session, name: str):
    """Return the first contact of the person named *name*.

    Returns None when no person with that name exists, instead of failing
    with an ``AttributeError`` on the missing person.
    """
    db_object = db.query(models.Person).filter(models.Person.name == name).first()
    if db_object is None:
        return None
    return db.query(models.Contact).filter(models.Contact.person_id == db_object.id).first()

129 

130 

def get_contact_by_person_email(db: Session, email: str):
    """Return the first contact of the person whose email is *email*.

    Returns None when no person with that email exists, instead of failing
    with an ``AttributeError`` on the missing person.
    """
    db_object = db.query(models.Person).filter(models.Person.email == email).first()
    if db_object is None:
        return None
    return db.query(models.Contact).filter(models.Contact.person_id == db_object.id).first()

134 

135 

def get_contact_by_unique_constraints(db: Session, person_id, organisation_id: int):
    """Return the contact matching both *organisation_id* and *person_id*, if any."""
    contacts = db.query(models.Contact)
    contacts = contacts.filter(models.Contact.organisation_id == organisation_id)
    contacts = contacts.filter(models.Contact.person_id == person_id)
    return contacts.first()

139 

140 

def create_contact(db: Session, person_name, organisation_longname: str):
    """Return the contact linking the given person and organisation, creating
    it when it does not exist yet.

    An empty *person_name* selects the person with id 0, an empty
    *organisation_longname* the organisation with id 0.  Persons are looked
    up by name without the ``isprivate`` filter.
    """
    # NOTE(review): both lookups end in ``.first()``; if nothing matches, the
    # ``.id`` accesses below raise AttributeError -- confirm callers validate
    # the names beforehand.
    db_person = _get_private_person_by_name(db, person_name) if person_name else get_person(db, 0)
    if organisation_longname:
        db_organisation = get_organisation_by_longname(db, organisation_longname)
    else:
        db_organisation = get_organisation(db, 0)

    existing = get_contact_by_unique_constraints(db, db_person.id, db_organisation.id)
    if existing:
        return existing

    new_contact = models.Contact(person_id=db_person.id, organisation_id=db_organisation.id)
    db.add(new_contact)
    db.commit()
    db.refresh(new_contact)
    return new_contact