Kilton937342 4941b5a154 tests
2023-02-22 12:43:39 +01:00

96 lines
2.4 KiB
Python

import uuid
from jose import jwt, exceptions
from sqlmodel import Session, select
from config import SECRET_KEY, ALGORITHM
from database.auth.models import User, UserEdit
from database.room.models import Member
from services.password import get_password_hash
def create_user_db(username:str , password: str, db: Session):
user = User(username=username, hashed_password=password, clientId=uuid.uuid4())
db.add(user)
db.commit()
db.refresh(user)
return user
def get_user_from_username_db(username, db: Session):
user = db.exec(select(User).where(User.username == username)).first()
return user
def get_user_from_clientId_db(clientId: str , db: Session):
user = db.exec(select(User).where(User.clientId == clientId)).first()
return user
def get_user_from_token(token: str, db: Session):
try:
decoded = jwt.decode(token=token, key=SECRET_KEY,
algorithms=[ALGORITHM])
except exceptions.ExpiredSignatureError:
return False
clientId = decoded.get('sub')
return get_user_from_clientId_db(clientId=clientId, db=db)
def update_user_db(clientId: str, user: UserEdit, db: Session):
db_user = get_user_from_clientId_db(clientId, db)
if not db_user:
return None
user_data = user.dict(exclude_unset=True)
for key, value in user_data.items():
setattr(db_user, key, value)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
def update_password_db(id: int, password: str, db: Session):
user = db.get(User, id)
if not user:
return None
user.hashed_password = get_password_hash(password)
db.add(user)
db.commit()
db.refresh(user)
return user
def delete_user_db(id: int, db: Session):
user = db.get(User, id)
if not user:
return False
db.delete(user)
db.commit()
return True
def check_unique_username(username: str, db: Session):
user = db.exec(select(User).where(User.username == username)).first()
if not user:
return username
return None
def change_user_uuid(id: int, db: Session):
user = db.get(User, id)
if not user:
return None
user.clientId = uuid.uuid4()
db.add(user)
db.commit()
db.refresh(user)
return user.clientId
def parse_user_rooms(user: User, db: Session):
members = db.exec(select(Member).where(Member.user_id == user.id)).all()
return [{"name": m.room.name, "id_code": m.room.id_code, "admin": m.is_admin} for m in members]