Files
pyqcrm/lib/ConfigLoader.py
2025-04-24 01:37:09 +02:00

268 lines
8.9 KiB
Python

# This Python file uses the following encoding: utf-8
import os
from base64 import b64encode
from pathlib import Path
from typing import Optional
from urllib.parse import urlparse
import toml
from Crypto.Random import get_random_bytes
from PySide6.QtCore import QObject, Slot, Signal
from peewee import OperationalError
from platformdirs import user_config_dir
from .Config import Config, DatabaseConfig
from .DB.UserManager import UserManager
from .PyqcrmFlags import PyqcrmFlags
from .Vermasseln import Vermasseln
from .domain.BaseModel import init_database_from_config
class ConfigLoader(QObject):
    """Load, store and back up the application configuration.

    The configuration is a TOML document kept in ``pyqcrm.toml`` below
    ``user_config_dir() + '/pyqcrm'``.  Its text is passed through
    ``Vermasseln`` on every write and read.  Progress and failures are
    reported through the Qt signals declared on this class.
    """

    # Parsed configuration, or None while nothing has been loaded/created.
    __config: Optional[Config] = None
    # Written as VERSION into the [pyqcrm] section of a new configuration.
    __version = "0.1-alpha"
    __check_enc_key = True

    # Emitted by _is_db_connectable() with (message, success).
    dbConnectionError = Signal(str, bool)
    # Emitted with (message, success) by the admin check and by addAdminUser().
    adminUserError = Signal(str, bool)
    # Declared but not emitted anywhere in this class.  The misspelling is
    # kept because the signal name is part of the public interface.
    adminNotAsvailable = Signal()
    # Emitted once a configuration has been loaded, stored or backed up.
    configurationReady = Signal()
    # Emitted after the admin user has been created.
    backupEncryptionKey = Signal()
    # Emitted when the key is flagged invalid or an import is rejected.
    invalidEncryptionKey = Signal()

    def __init__(self):
        """Load the stored configuration, or create the config directory."""
        super().__init__()
        self.config_dir = user_config_dir() + '/pyqcrm'
        config_dir = Path(self.config_dir)
        if config_dir.exists():
            self.__configLoad()
            if self.__config:
                self.checkEncryptionKey()
        else:
            config_dir.mkdir(mode=0o750, parents=True, exist_ok=True)

    @Slot(dict, result=bool)
    def setConfig(self, app_config: Config):
        """Create the initial configuration from ``app_config``.

        Nothing happens when a configuration is already present.  Otherwise
        the database settings under ``app_config['database']`` are probed;
        on success a fresh ``[pyqcrm]`` section is prepended, the result is
        stored, and ``configurationReady`` is emitted if an admin user
        exists.

        Returns:
            True when the configuration was accepted and saved, else False.
        """
        if self.__config:
            return False
        base_conf = self.__initializeConfig()
        if not self._is_db_connectable(app_config['database']):
            return False
        # Both parts are TOML text; parse the concatenation into one mapping.
        self.__config = toml.loads(base_conf + toml.dumps(app_config))
        self.__saveConfig()
        if self.__checkAdminUser():
            self.configurationReady.emit()
        return True

    def __initializeConfig(self):
        """Return the TOML text of a new ``[pyqcrm]`` section.

        A random 32-byte key is generated, base64-encoded and remembered in
        ``self.__encrypt_key``; the key starts out flagged as not valid.
        """
        self.__encrypt_key = b64encode(get_random_bytes(32)).decode("utf-8")
        return (
            f"[pyqcrm]\nVERSION = \"{self.__version}\"\n"
            "ENCRYPTION_KEY_VALID = \"No\"\n"
            f"ENCRYPTION_KEY = \"{self.__encrypt_key}\"\n\n"
        )

    def _is_db_connectable(self, config: DatabaseConfig):
        """Initialise the database from ``config`` and report the outcome.

        Emits ``dbConnectionError`` with a message and a success flag.

        Returns:
            True on success, False when an ``OperationalError`` was raised.
        """
        try:
            init_database_from_config(config)
        except OperationalError:
            self.dbConnectionError.emit("Connection fehlgeschlagen", False)
            return False
        self.dbConnectionError.emit("Connection OK", True)
        return True

    def __saveConfig(self):
        """Serialise the configuration and write it to ``pyqcrm.toml``."""
        try:
            # Build the payload first: opening with 'w' truncates the file,
            # so a failure while serialising must not wipe the stored data.
            payload = Vermasseln().oscarVermasseln(toml.dumps(self.__config))
            with open(self.config_dir + '/pyqcrm.toml', 'w') as f:
                f.write(payload)
        except FileNotFoundError:
            print("Konnte die Konfiguration nicht speichern.")

    def __checkAdminUser(self):
        """Emit ``adminUserError`` according to ``UserManager().checkAdmin()``.

        Returns:
            True when the check result is truthy, otherwise False.
        """
        if UserManager().checkAdmin():
            self.adminUserError.emit("Admin vorhanden", True)
            return True
        self.adminUserError.emit("Kein Admin vorhanden", False)
        return False

    @Slot(dict, result=bool)
    def addAdminUser(self, user_config):
        """Create the admin user described by ``user_config["user"]``.

        On success the encryption key is flagged valid, the configuration is
        saved and ``backupEncryptionKey`` is emitted; on failure
        ``adminUserError`` is emitted.

        Returns:
            The result of ``UserManager.createUser()``.
        """
        admin = UserManager(user_config["user"], PyqcrmFlags.ADMIN).createUser()
        if admin:
            self.__config['pyqcrm']['ENCRYPTION_KEY_VALID'] = 'Yes'
            self.__saveConfig()
            self.backupEncryptionKey.emit()
        else:
            self.adminUserError.emit("Benutzername nich verfügbar", False)
        return admin

    @staticmethod
    def __urlToLocalPath(url):
        """Return the path component of ``url`` as a local file path."""
        path = urlparse(url).path
        if os.name == "nt":
            # For "file:///C:/dir" the path component is "/C:/dir"; drop the
            # leading slash so the drive letter comes first.
            path = path[1:]
        return path

    @Slot(str, str, str)
    def __saveData(self, recovery_file, recovery_password, data):
        """Write ``data`` to a password-protected recovery file.

        The stored text is ``rp[1] + data + rp[0]``, where ``rp`` holds the
        two ``$``-separated parts returned for the recovery password, passed
        through ``Vermasseln().oscarVermasseln(text, False)``.  Emits
        ``configurationReady`` after a successful write; any error is
        printed instead of raised.

        Args:
            recovery_file: URL of the target file.
            recovery_password: Password protecting the recovery file.
            data: Text to store.
        """
        local = False
        rp = self.__setRecoveryPassword(recovery_password)
        rf = Vermasseln().oscarVermasseln(rp[1] + data + rp[0], local)
        rec_file = self.__urlToLocalPath(recovery_file)
        if os.name != "nt":
            # NOTE(review): the suffix has only ever been appended off
            # Windows; confirm whether Windows targets need it as well.
            rec_file = rec_file + ".pyqrec"
        try:
            with open(rec_file, "w") as f:
                f.write(rf)
            self.configurationReady.emit()
        except Exception as e:
            print(str(e))

    @Slot(str, str)
    def getRecoveryKey(self, recovery_file, recovery_password):
        """Restore the encryption key from a recovery file.

        Emits ``configurationReady`` when the key was accepted; otherwise
        the stored key is flagged invalid and ``invalidEncryptionKey`` is
        emitted.  Any error is printed instead of raised.
        """
        rec_file = self.__urlToLocalPath(recovery_file)
        try:
            ek = self.__parseImport(rec_file, recovery_password)
            if ek:
                self.__setEncryptionKey(ek)
                self.configurationReady.emit()
            else:
                self.__invalidateEncryptionKey()
                self.invalidEncryptionKey.emit()
        except Exception as e:
            print(str(e))

    def __parseImport(self, rec_file, recovery_password):
        """Read a recovery file and return its payload.

        After ``Vermasseln().entschluesseln(text, False)`` the first 128
        characters are taken as the password hash, the last 32 as the salt
        and everything in between as the payload.

        Returns:
            The payload when ``recovery_password`` matches, otherwise None.
        """
        local = False
        with open(rec_file, "r") as f:
            rf = Vermasseln().entschluesseln(f.read(), local)
        password, ek, salt = rf[:128], rf[128:-32], rf[-32:]
        if self.__checkRecoveryPassword(recovery_password, password, salt):
            return ek
        return None

    def __invalidateEncryptionKey(self):
        """Flag the stored encryption key as invalid and save."""
        self.__config['pyqcrm']['ENCRYPTION_KEY_VALID'] = 'No'
        self.__saveConfig()

    @Slot()
    def checkEncryptionKey(self):
        """Emit ``invalidEncryptionKey`` when the key is flagged ``'No'``."""
        if self.__config['pyqcrm']['ENCRYPTION_KEY_VALID'] == 'No':
            self.invalidEncryptionKey.emit()

    def __checkRecoveryPassword(self, recovery_password, password, salt):
        """Return True when hashing the password with ``salt`` gives ``password``."""
        rp = self.__setRecoveryPassword(recovery_password, salt)
        return rp[1] == password

    @Slot(str, str)  # todo: non local encryption
    def importConfig(self, confile, password):
        """Replace the configuration with the one stored in ``confile``.

        On success the imported TOML is parsed, saved and loaded again;
        otherwise ``invalidEncryptionKey`` is emitted.  Any error is printed
        instead of raised.
        """
        confile = self.__urlToLocalPath(confile)
        try:
            ek = self.__parseImport(confile, password)
            if ek:
                self.__config = toml.loads(ek)
                self.__saveConfig()
                self.__configLoad()
            else:
                self.invalidEncryptionKey.emit()
        except Exception as e:
            print(str(e))

    def __configLoad(self):
        """Read ``pyqcrm.toml`` and emit ``configurationReady`` on success."""
        try:
            with open(self.config_dir + '/pyqcrm.toml', 'r') as f:
                config = f.read()
            self.__config = toml.loads(Vermasseln().entschluesseln(config))
            self.configurationReady.emit()
        except FileNotFoundError:
            print("Konnte die Konfiguration nicht laden.")
        except TypeError:
            print(f"Invalid Configuration: {__file__}")

    def get_config(self) -> Optional[Config]:
        """Return the current configuration, or None if none is loaded."""
        return self.__config

    def __setRecoveryPassword(self, key, salt=None):
        """Hash ``key`` and return the ``$``-separated parts of the result."""
        return Vermasseln.userPasswordHash(key, salt).split("$")

    @Slot(str)
    def __setEncryptionKey(self, enc_key):
        """Store ``enc_key``, flag it valid and save the configuration."""
        self.__config['pyqcrm']['ENCRYPTION_KEY_VALID'] = 'Yes'
        self.__config['pyqcrm']['ENCRYPTION_KEY'] = enc_key
        self.__saveConfig()

    def __mergeIntoConfig(self, values):
        """Merge ``values`` into the configuration and save it.

        ``None`` (the default of the public save slots) is ignored, because
        ``dict.update(None)`` would raise ``TypeError``.
        """
        if values is None:
            return
        self.__config.update(values)
        self.__saveConfig()

    @Slot(str, str)
    def backupConfig(self, filename, password):
        """Write the whole configuration as TOML to a recovery file."""
        conf_file = toml.dumps(self.get_config())
        self.__saveData(filename, password, conf_file)

    @Slot(dict)
    def saveDbConf(self, db=None):
        """Merge ``db`` into the configuration and save it."""
        self.__mergeIntoConfig(db)

    @Slot(result=dict)
    def getDbConf(self):
        """Return the ``'database'`` section, or None when unavailable."""
        if self.__config is None:
            return None
        try:
            return self.__config['database']
        except KeyError as ex:
            print(f"Missing database configuration: {ex}")
            return None

    @Slot(dict)
    def saveCompanyInfo(self, company=None):
        """Merge ``company`` into the configuration and save it."""
        self.__mergeIntoConfig(company)

    @Slot(result=dict)
    def getCompanyInfo(self):
        """Return the ``'company'`` section, or None when unavailable."""
        # Same handling as getDbConf(): no configuration means no section.
        if self.__config is None:
            return None
        try:
            return self.__config['company']
        except KeyError as ex:
            print(f"Missing company info: {ex}")
            return None

    @Slot(dict)
    def saveMiscConf(self, misc_conf=None):
        """Merge ``misc_conf`` into the configuration and save it."""
        self.__mergeIntoConfig(misc_conf)

    @Slot(result=bool)
    def systray(self):
        """Return the ``SYSTRAY`` value of the ``'misc'`` section.

        Returns False when no configuration is loaded or the key is missing.
        """
        if self.__config is None:
            return False
        try:
            return self.__config['misc']['SYSTRAY']
        except KeyError as ex:
            print(f"Missing configuration: {ex}")
            return False

    @Slot(str, str)
    def backupEncryptkey(self, filename, password):
        """Write the stored encryption key to a recovery file."""
        encrypt_key = self.__config['pyqcrm']['ENCRYPTION_KEY']
        self.__saveData(filename, password, encrypt_key)