Use ORM for applicants

This commit is contained in:
Yuri Becker
2025-04-21 23:45:33 +02:00
parent 0ae153617b
commit 45f19d80d0
45 changed files with 388 additions and 354 deletions

View File

@@ -1,27 +1,26 @@
from .DbManager import DbManager
import mariadb
import json
import mariadb
from lib.domain.BaseModel import database
class AddressDAO:
__cur = None
def __init__(self):
#print(f"*** File: {__file__}, init()")
self.__con = DbManager().getConnection()
self.__con = database.connection()
if self.__con:
self.__cur = self.__con.cursor()
def __importPlz(self):
with open("pfad zur datei", "r") as plz:
postcodes = json.load(plz)
irgendwas = ""
try:
for i in postcodes:
test =i["plz_name"].split(",")
test = i["plz_name"].split(",")
for town in test:
if "u.a" in town:
town = town[:-4]
@@ -29,12 +28,12 @@ class AddressDAO:
if town:
print(f"PROCESSING {i['name']} {town}")
self.__cur.callproc("addZipCodes", (i["name"], town, irgendwas,))
#self.__cur.callproc("addZipCodes", ("56271", "Kleinmaischeid", irgendwas,))
# self.__cur.callproc("addZipCodes", ("56271", "Kleinmaischeid", irgendwas,))
except mariadb.OperationalError as e:
print(f"Database Error: {e}")
finally:
self.__con.commit()
print("FINISHED")#
print("FINISHED") #
def __importCountry(self):
with open("pfad zur datei", "r") as country:
@@ -55,18 +54,17 @@ class AddressDAO:
print(i[4], i[3], i[2], i[8], i[7])
self.__cur.execute("INSERT INTO country (country, countryshort, nationality, iso2, iso3) VALUES (%s, %s, %s, %s, %s)", (i[4], i[3], i[2], i[8], i[7]))
self.__cur.execute(
"INSERT INTO country (country, countryshort, nationality, iso2, iso3) VALUES (%s, %s, %s, %s, %s)",
(i[4], i[3], i[2], i[8], i[7]))
old = i[4]
except mariadb.OperationalError as e:
print(f"Database Error: {e}")
finally:
self.__con.commit()
print("FINISHED")#
print("FINISHED") #
def getAddressData(self, all = True, zipcode = None):
def getAddressData(self, all=True, zipcode=None):
try:
if self.__cur:
self.__cur.callproc("getAddress", (all, zipcode,))
@@ -76,5 +74,3 @@ class AddressDAO:
return None
except mariadb.Error as e:
print(str(e))

View File

@@ -2,18 +2,19 @@ from PySide6.QtCore import QAbstractListModel, Qt, Slot, QModelIndex
from .AddressDAO import AddressDAO
from ..PyqcrmDataRoles import PyqcrmDataRoles
class AddressModel(QAbstractListModel):
def __init__(self):
super().__init__()
self.__address_data = AddressDAO().getAddressData()
def rowCount(self, parent = QModelIndex()):
def rowCount(self, parent=QModelIndex()):
return len(self.__address_data)
def data(self, index, role = Qt.DisplayRole):
def data(self, index, role=Qt.ItemDataRole.DisplayRole):
row = index.row()
if role == Qt.DisplayRole:
if role == Qt.ItemDataRole.DisplayRole:
data = self.__address_data[row][2]
return data
elif role == PyqcrmDataRoles.CITY_ROLE:
@@ -23,7 +24,7 @@ class AddressModel(QAbstractListModel):
def roleNames(self):
return {
Qt.DisplayRole: b"display",
Qt.ItemDataRole.DisplayRole: b"display",
PyqcrmDataRoles.CITY_ROLE: b"city",
}
@@ -34,9 +35,3 @@ class AddressModel(QAbstractListModel):
def getAddresses(self, all, zipcode):
data = AddressDAO().getAddressData(all, zipcode)
return data

76
lib/DB/ApplicantModel.py Normal file
View File

@@ -0,0 +1,76 @@
import uuid
from typing import List, Callable, Any
from PySide6.QtCore import QModelIndex, Qt, QAbstractTableModel, Slot
from PySide6.QtQml import QJSValue
from peewee import Select
from lib.domain.Applicant import Applicant
COLUMNS: list[Callable[[Applicant], Any]] = [
lambda applicant: applicant.first_name,
lambda applicant: applicant.last_name,
lambda applicant: applicant.zip_code.zip_code or None,
lambda applicant: applicant.zip_code.town.town if applicant.zip_code.id is not None else None
]
COLUMN_NAMES = ["Vorname", "Nachname", "PLZ", "Ort"]
class ApplicantModel(QAbstractTableModel):
_applicants: Select
def __init__(self) -> None:
super().__init__()
self._applicants = Applicant.select_table_data()
def rowCount(self, /, parent=...):
return len(self._applicants)
def columnCount(self, /, parent=...):
return len(COLUMNS)
def data(self, index: QModelIndex, role: int = Qt.ItemDataRole.DisplayRole):
if role == Qt.ItemDataRole.DisplayRole:
applicant = self._applicants[index.row()]
return COLUMNS[index.column()](applicant)
return None
@Slot(int, result=dict)
def applicant(self, row) -> dict:
applicant = Applicant.get_by_id(self._applicants[row].id)
return {
'title': applicant.title,
"firstName": applicant.first_name,
"lastName": applicant.last_name,
"street": applicant.street,
"houseNumber": applicant.house_number,
"zipCode": applicant.zip_code_id,
"phoneNumber": applicant.phone_number,
"mobileNumber": applicant.mobile_number,
"emailAddress": applicant.email_address,
"salutation": applicant.salutation
}
@Slot(QJSValue)
def createApplicant(self, values: QJSValue):
applicant = Applicant()
applicant.id = uuid.uuid4()
applicant.title = values.property("title").toInt()
applicant.first_name = values.property("firstName").toString()
applicant.last_name = values.property("lastName").toString()
applicant.street = values.property("street").toString() or None
applicant.house_number = values.property("houseNumber").toString() or None
if values.property("zipCode").toInt() != -1:
applicant.zip_code = values.property("zipCode").toInt()
applicant.phone_number = values.property("phoneNumber").toString() or None
applicant.mobile_number = values.property("mobileNumber").toString() or None
applicant.email_address = values.property("emailAddress").toString() or None
applicant.salutation = values.property("salutation").toString() or None
applicant.save(force_insert=True)
self._applicants = Applicant.select_table_data()
def headerData(self, section: int, orientation: Qt.Orientation, role: int = Qt.ItemDataRole.DisplayRole):
if role == Qt.ItemDataRole.DisplayRole:
return COLUMN_NAMES[section]
return None

View File

@@ -1,20 +1,20 @@
from .DbManager import DbManager
from lib.domain.BaseModel import database
class BTypeDAO:
__cur = None
def __init__(self):
#print(f"*** File: {__file__}, init()")
self.__con = DbManager().getConnection()
self.__con = database.connection()
if self.__con:
self.__cur = self.__con.cursor()
def getBType(self):
try:
if self.__cur:
self.__cur.callproc("getBtype", (None, None, ))
self.__cur.callproc("getBtype", (None, None,))
data = self.__cur.fetchall()
return(data)
return data
else:
return None
except mariadb.Error as e:

View File

@@ -1,7 +1,7 @@
from .DbManager import DbManager
import json
import mariadb
from PySide6.QtCore import QObject, Signal
from lib.domain.BaseModel import database
class BusinessDAO(QObject):
@@ -12,11 +12,11 @@ class BusinessDAO(QObject):
def __init__(self):
super().__init__()
self.__con = DbManager().getConnection()
self.__con = database.connection()
if self.__con:
self.__cur = self.__con.cursor()
def getBusiness(self, enc_key, criterion = "Alle"):
def getBusiness(self, enc_key, criterion="Alle"):
try:
if self.__cur:
self.__cur.callproc("getCustomerView", (enc_key, criterion,))
@@ -27,14 +27,14 @@ class BusinessDAO(QObject):
except mariadb.Error as e:
print(str(e))
def getOneBusiness(self, business_id, enc_key = None):
def getOneBusiness(self, business_id, enc_key=None):
try:
if self.__cur:
self.__cur.callproc("getCustomer", (business_id, enc_key,))
#self.__all_cols = [desc[0] for desc in self.__cur.description]
return self.__cur.fetchall() #, self.__all_cols
# self.__all_cols = [desc[0] for desc in self.__cur.description]
return self.__cur.fetchall() # , self.__all_cols
else:
return None #, None
return None # , None
except mariadb.Error as e:
print(str(e))
@@ -47,10 +47,3 @@ class BusinessDAO(QObject):
except mariadb.Error as e:
print(str(e))

View File

@@ -1,7 +1,9 @@
from .DbManager import DbManager
from PySide6.QtCore import QObject, Signal
import json
import mariadb
from PySide6.QtCore import QObject, Signal
from lib.domain.BaseModel import database
class ContactDAO(QObject):
@@ -9,8 +11,7 @@ class ContactDAO(QObject):
def __init__(self):
super().__init__()
#print(f"*** File: {__file__}, __init__()")
self.__con = DbManager().getConnection()
self.__con = database.connection()
if self.__con:
self.__cur = self.__con.cursor()

View File

@@ -1,46 +0,0 @@
import mariadb
class DbManager():
__connection = None
__con_param = None
__dbmanager = None
def __new__(cls, dbconf = None):
if cls.__dbmanager is None:
cls.__dbmanager = super(DbManager, cls).__new__(cls)
cls.__dbmanager.__initializeConfig(dbconf)
return cls.__dbmanager
def getConnection(cls):
#print(f"DB Manager: {cls.__dbmanager}")
#print(f"DB Connection: {cls.__connection}")
try:
if not cls.__connection or not cls.__connection.ping():
cls.__failure_notified = False
cls.__connection = mariadb.connect(**cls.__con_param)
except mariadb.InterfaceError as e:
cls.__connection = mariadb.connect(**cls.__con_param)
print(f"DbManager Connection (INTERFACE ERROR): {e}..reconnecting...")
except mariadb.Error as e:
if '(110)' in str(e):
print(f"File: {__file__}\n Database connection timed out (Check connection parameters or server running): {e}")
elif '(138)' in str(e):
print(f"File: {__file__}\n Database connection timed out (Check connection parameters or server running - initial handshake): {e}")
else:
print(f"File: {__file__}\n Database connection error: {e}")
cls.__connection = None
return cls.__connection
def __initializeConfig(cls, dbconf):
cls.__con_param = { 'user': dbconf['DB_USER'], 'password': dbconf['DB_PASS'],
'port': int (dbconf['DB_PORT']), 'host': dbconf['DB_HOST'],
'database': dbconf['DB_NAME'], 'connect_timeout': 5, 'autocommit': True,
}

View File

@@ -1,35 +1,41 @@
from .DbManager import DbManager
import json
import mariadb
from PySide6.QtCore import QObject, Signal
from lib.domain.BaseModel import database
class EmployeeDAO(QObject):
newEmployeeAdded = Signal(bool)
__all_cols = None
def __init__(self):
super().__init__()
self.__con = DbManager().getConnection()
self._connection = database.connection()
def getEmployees(self, enc_key, criterion="Alle", processed=False, fired=False, every_state=True):
cursor = self.__con.cursor()
cursor.callproc("getEmployeeTable", (criterion, processed, fired, every_state, enc_key,))
self.__all_cols = [desc[0] for desc in cursor.description]
result = cursor.fetchall(), self.__all_cols
cursor.close()
return result
cursor = self._connection.cursor()
try:
cursor.callproc("getEmployeeTable", (criterion, processed, fired, every_state, enc_key,))
all_cols = [desc[0] for desc in cursor.description]
result = cursor.fetchall(), all_cols
return result
finally:
cursor.close()
def fetchApplicant(self, employee_id, enc_key=None) -> dict:
cursor = self.__con.cursor(dictionary=True)
cursor.callproc("getApplicant", (employee_id, enc_key))
it = cursor.fetchone()
cursor.close()
return it
cursor = self._connection.cursor(dictionary=True)
try:
cursor.callproc("getApplicant", (employee_id, enc_key))
it = cursor.fetchone()
return it
finally:
cursor.close()
def addEmployee(self, data, enc_key, applicant=True):
cursor = self.__con.cursor()
cursor.callproc("addApplicant", (json.dumps(data), applicant, enc_key,))
self.__con.commit()
cursor.close()
self.newEmployeeAdded.emit(True)
def addApplicant(self, data, enc_key, applicant=True):
cursor = self._connection.cursor()
try:
cursor.callproc("addApplicant", (json.dumps(data), applicant, enc_key,))
self._connection.commit()
self.newEmployeeAdded.emit(True)
finally:
cursor.close()

View File

@@ -10,7 +10,6 @@ class EmployeeModel(QAbstractTableModel):
addedNewEmployee = Signal(bool)
__data = None
__employee_dao = None
__visible_index = None
__visible_columns = None
__col_name = ""
__col_skip = 2
@@ -18,7 +17,7 @@ class EmployeeModel(QAbstractTableModel):
def __init__(self):
super().__init__()
self.__employee_dao = EmployeeDAO()
self.__employee_dao = EmployeeDAO()
self.__employee_dao.newEmployeeAdded.connect(self.__refreshView)
self.__conf = ConfigLoader().getConfig()
self.__key = self.__conf['pyqcrm']['ENCRYPTION_KEY']
@@ -29,11 +28,11 @@ class EmployeeModel(QAbstractTableModel):
if 'worklicense' in new_employee:
new_employee['worklicense'] = int(new_employee['worklicense'])
new_employee['residencetype'] = int(new_employee['residencetype'])
self.__employee_dao.addEmployee(new_employee, self.__key, False)
self.__employee_dao.addApplicant(new_employee, self.__key, False)
@Slot(QJSValue)
def addApplicant(self, applicant: QJSValue):
self.__employee_dao.addEmployee({
self.__employee_dao.addApplicant({
"city": applicant.property("city").toString(),
"email": applicant.property("email").toString(),
"firstname": applicant.property("firstname").toString(),
@@ -72,29 +71,21 @@ class EmployeeModel(QAbstractTableModel):
self.__col_skip = 2
self.__getData(criterion, criterion == 'Erledigt', False, criterion == 'Alle')
def data(self, index, role=Qt.DisplayRole):
if role == Qt.DisplayRole:
def data(self, index, role=Qt.ItemDataRole.DisplayRole):
if role == Qt.ItemDataRole.DisplayRole:
row = self.__data[index.row()]
applicant_col = index.column() + self.__col_skip
tr = row[
applicant_col] # if type(row[index.column() + 2]) is str else str(row[index.column() + 2], "utf-8")
tr = row[applicant_col]
if applicant_col == 2 and self.__everyone:
tr = 'Ja' if tr == 1 else 'Nein'
else:
if tr:
tr = re.sub("Keine Angabe ", "", tr)
# print(f"Data: {tr}")
# return row[index.column() + 2]
return tr
return None
def headerData(self, section, orientation, role=Qt.DisplayRole):
if orientation == Qt.Horizontal and role == Qt.DisplayRole:
def headerData(self, section, orientation, role=Qt.ItemDataRole.DisplayRole):
if orientation == Qt.Orientation.Horizontal and role == Qt.ItemDataRole.DisplayRole:
self.__col_name = self.__visible_columns[section + self.__col_skip]
return self.__col_name
return super().headerData(section, orientation, role)
@Slot(int, result=dict)
def fetchApplicant(self, row) -> dict:
employee_id = self.__data[row][0]
return self.__employee_dao.fetchApplicant(employee_id, self.__key)

View File

@@ -1,17 +1,14 @@
from .DbManager import DbManager
import json
import mariadb
from PySide6.QtCore import QObject, Signal
# from ..PyqcrmFlags import PyqcrmAppliEmpyFlags
from lib.domain.BaseModel import database
class ObjectDAO(QObject):
newObjectAdded = Signal(bool, int)
def __init__(self):
super().__init__()
#print(f"*** File: {__file__}, __init__()")
self.__con = DbManager().getConnection()
self.__con = database.connection()
if self.__con:
self.__cur = self.__con.cursor()

View File

@@ -1,20 +1,21 @@
# This Python file uses the following encoding: utf-8
from .DbManager import DbManager
from ..PyqcrmFlags import PyqcrmFlags
import mariadb
from PySide6.QtCore import QObject, Signal
from lib.domain.BaseModel import database
class UserDAO(QObject):
noDbConnection = Signal(str)
__cursor = None
def __init__(self):
#print(f"*** File: {__file__}, init()")
super().__init__()
self.__con = DbManager().getConnection()
self.__con = database.connection()
if self.__con:
self.__cur = self.__con.cursor()
def createUser(self, username, password, info, role= PyqcrmFlags.USER):
def createUser(self, username, password, info, role=PyqcrmFlags.USER):
user_created = False
try:
if self.__cur:
@@ -39,6 +40,3 @@ class UserDAO(QObject):
except mariadb.Error as e:
print(str(e))
self.noDbConnection.emit(str(e))

View File

@@ -1,35 +1,28 @@
from .DbManager import DbManager
from PySide6.QtCore import Slot, QObject, Signal
from lib.domain.BaseModel import database
from .UserDAO import UserDAO
from ..PyqcrmFlags import PyqcrmFlags
from ..Vermasseln import Vermasseln
#from PySide6.QtMultimedia import QMediaPlayer, QAudioOutput : Not working well with Nuitka
import soundfile as sf
import sounddevice as sd
from .UserDAO import UserDAO
from PySide6.QtCore import Slot, QObject, Signal, QUrl, QFile
import tempfile
class UserManager(QObject):
loginOkay = Signal()
noDbConnection = Signal(str)
def __init__(self, user_config = None, role = None):
def __init__(self, user_config=None, role=None):
super().__init__()
self.__con = DbManager().getConnection()
self.__con = database.connection()
self.__user_dao = UserDAO()
self.__user_dao.noDbConnection.connect(self.noDbConnection)
if self.__con:
self.__cur = self.__con.cursor()
if user_config and role:
self.__username = user_config["PYQCRM_USER"]
self.__password = user_config["PYQCRM_USER_PASS"]
self.__info = user_config["PYQCRM_USER_INFO"]
self.__role = role if role == PyqcrmFlags.ADMIN else 0
def createUser(self):
self.__hashPassword()
user_created = self.__user_dao.createUser(self.__username, self.__password, self.__info, self.__role)
@@ -64,25 +57,6 @@ class UserManager(QObject):
user = self.__user_dao.getUser(username)
if user:
self.__checkPassword(password, user[2])
else:
fail_src = ":/sounds/fail2c.ogg"
with tempfile.NamedTemporaryFile(suffix='.ogg') as ogg_file:
failure_sound = QFile(fail_src)
if not failure_sound.open(QFile.ReadOnly):
print(f"Failed to open resource file: {fail_src}")
else:
ogg_file.write(failure_sound.readAll())
ogg_path = ogg_file.name
fail, samplerate = sf.read(ogg_path)
sd.play(fail, samplerate)
### Not working with Nuitka
# player = QMediaPlayer(self)
# audioOutput = QAudioOutput(self)
# player.setAudioOutput(audioOutput)
# player.setSource(QUrl("qrc:/sounds/fail2c.ogg"))
# audioOutput.setVolume(150)
# player.play()
def __checkPassword(self, password, hash_password):
pw_list = hash_password.split("$")
@@ -90,6 +64,3 @@ class UserManager(QObject):
hash_pw = Vermasseln.userPasswordHash(password, pw_list[0])
if hash_password == hash_pw:
self.loginOkay.emit()