75 lines
1.7 KiB
Python
75 lines
1.7 KiB
Python
import uuid
|
|
from services.password import get_password_hash
|
|
from database.auth.models import User, UserEdit
|
|
from sqlmodel import Session, select
|
|
|
|
def create_user_db(username: str, password: str, db: Session):
    """Insert a new ``User`` row and return it.

    A freshly generated ``uuid.uuid4()`` value is assigned as the user's
    ``clientId``.

    Args:
        username: Value stored in ``User.username``.
        password: Value stored verbatim in ``User.hashed_password``.
            NOTE(review): no hashing is performed in this function, so the
            caller is presumably expected to pass an already-hashed
            password -- confirm against the call sites.
        db: SQLModel session used for the insert.

    Returns:
        The newly added ``User``, refreshed from the database after commit.
    """
    new_user = User(
        username=username,
        hashed_password=password,
        clientId=uuid.uuid4(),
    )
    db.add(new_user)
    db.commit()
    # Reload the instance so it reflects the row as stored after the commit.
    db.refresh(new_user)
    return new_user
|
|
|
|
def get_user_from_username_db(username, db: Session):
    """Look up a user by username.

    Args:
        username: Value compared for equality against ``User.username``.
        db: SQLModel session the query is executed on.

    Returns:
        The first matching ``User``, or ``None`` when no row matches.
    """
    statement = select(User).where(User.username == username)
    return db.exec(statement).first()
|
|
|
|
def get_user_from_clientId_db(clientId: str, db: Session):
    """Look up a user by client identifier.

    Args:
        clientId: Value compared for equality against ``User.clientId``.
        db: SQLModel session the query is executed on.

    Returns:
        The first matching ``User``, or ``None`` when no row matches.
    """
    statement = select(User).where(User.clientId == clientId)
    return db.exec(statement).first()
|
|
|
|
def update_user_db(clientId: str, user: UserEdit, db: Session):
    """Apply a partial update to the user identified by ``clientId``.

    Args:
        clientId: Client identifier used to locate the stored user.
        user: Edit payload; only the fields that were explicitly set on it
            are copied onto the stored user.
        db: SQLModel session used for the lookup and the update.

    Returns:
        The updated ``User`` refreshed from the database, or ``None`` when
        no user with that ``clientId`` is found.
    """
    existing = get_user_from_clientId_db(clientId, db)
    if not existing:
        return None

    # exclude_unset=True keeps fields the caller never supplied out of the
    # update, so they are not overwritten on the stored row.
    changes = user.dict(exclude_unset=True)
    for field_name, new_value in changes.items():
        setattr(existing, field_name, new_value)

    db.add(existing)
    db.commit()
    db.refresh(existing)
    return existing
|
|
|
|
def update_password_db(id: int, password: str, db: Session):
    """Replace a user's stored password hash.

    Args:
        id: Primary key passed to ``db.get(User, id)`` to locate the user.
        password: New password; it is passed through ``get_password_hash``
            and the result is stored in ``hashed_password``.
        db: SQLModel session used for the lookup and the update.

    Returns:
        The updated ``User`` refreshed from the database, or ``None`` when
        no user with that primary key exists.
    """
    account = db.get(User, id)
    if not account:
        return None

    new_hash = get_password_hash(password)
    account.hashed_password = new_hash

    db.add(account)
    db.commit()
    db.refresh(account)
    return account
|
|
|
|
def delete_user_db(id: int, db: Session):
    """Delete the user with the given primary key.

    Args:
        id: Primary key passed to ``db.get(User, id)`` to locate the user.
        db: SQLModel session used for the lookup and the delete.

    Returns:
        ``True`` when a user was found and the delete was committed,
        ``False`` when no user with that primary key exists.
    """
    target = db.get(User, id)
    if target:
        db.delete(target)
        db.commit()
        return True
    return False
|
|
|
|
def check_unique_username(username: str, db: Session):
    """Check that no existing user already has ``username``.

    Args:
        username: Candidate username compared against ``User.username``.
        db: SQLModel session the query is executed on.

    Returns:
        ``username`` itself when no matching user is found, otherwise
        ``None``.
    """
    statement = select(User).where(User.username == username)
    already_taken = db.exec(statement).first()
    return None if already_taken else username
|
|
|
|
def change_user_uuid(id: int, db: Session):
    """Assign a newly generated ``clientId`` to a user.

    Args:
        id: Primary key passed to ``db.get(User, id)`` to locate the user.
        db: SQLModel session used for the lookup and the update.

    Returns:
        The user's ``clientId`` as read back after the commit, or ``None``
        when no user with that primary key exists.
    """
    account = db.get(User, id)
    if not account:
        return None

    # Replace the current identifier with a fresh uuid.uuid4() value.
    account.clientId = uuid.uuid4()

    db.add(account)
    db.commit()
    db.refresh(account)
    return account.clientId
|
|
|