import datetime
from typing import Any, Dict, List, Union

import discord
import mariadb

from .user import User


class Database:
    """Data-access helper around a single MariaDB connection.

    All queries go through one shared cursor on a connection to the
    ``kaizen`` database.  The connection runs in autocommit mode, so every
    statement is committed on its own; methods that issue several
    statements are therefore not atomic.

    Tables touched by this class: ``stuff`` (key/value pairs),
    ``normal_user``, ``rules_accepted``, ``info``, ``economy`` and
    ``family``.
    """

    def __init__(self, user: str, password: str):
        """Connect to the MariaDB server on 127.0.0.1:3306 and open a cursor.

        Args:
            user: Database account name.
            password: Password for that account.
        """
        self.database: mariadb._mariadb.connection = mariadb.connect(
            user=user,
            password=password,
            host='127.0.0.1',
            port=3306,
            database='kaizen',
        )
        self.database.autocommit = True
        self.cursor = self.database.cursor()

    def sync(self):
        """Create missing ``info`` and ``economy`` rows.

        Every id present in ``rules_accepted`` but absent from ``info``
        (respectively ``economy``) gets a new row in that table.
        """
        self.cursor.execute(
            'INSERT INTO info (id) '
            '(SELECT id FROM rules_accepted WHERE id NOT IN (SELECT id FROM info))'
        )
        self.cursor.execute(
            'INSERT INTO economy (id) '
            '(SELECT id FROM rules_accepted WHERE id NOT IN (SELECT id FROM economy))'
        )

    # --- key/value storage --- #

    def _get_stuff_int(self, key: str) -> Union[int, None]:
        """Return the value stored under ``key`` in ``stuff`` as an int.

        Returns ``None`` when there is no row for ``key`` or when the stored
        value cannot be converted to an int.
        """
        self.cursor.execute('SELECT v FROM stuff WHERE k=?', (key,))
        row = self.cursor.fetchone()
        if row is None:
            return None
        try:
            return int(row[0])
        except (TypeError, ValueError):
            return None

    def _set_stuff_int(self, key: str, value: Union[int, None]):
        """Store ``value`` under ``key`` in ``stuff``; ``None`` clears it.

        Any existing row for ``key`` is deleted first so that setting a value
        twice neither fails on a unique key nor leaves a stale row behind.
        """
        self.cursor.execute('DELETE FROM stuff WHERE k=?', (key,))
        if value is not None:
            self.cursor.execute(
                'INSERT INTO stuff (k, v) VALUES (?, ?)', (key, str(value))
            )

    def get_message_id(self) -> Union[int, None]:
        """Return the stored ``message_id`` value, or ``None`` if unset/invalid."""
        return self._get_stuff_int('message_id')

    def set_message_id(self, id: Union[int, None]):
        """Store the ``message_id`` value; passing ``None`` removes it."""
        self._set_stuff_int('message_id', id)

    def get_image_id(self) -> Union[int, None]:
        """Return the stored ``image_id`` value, or ``None`` if unset/invalid."""
        return self._get_stuff_int('image_id')

    def set_image_id(self, id: Union[int, None]):
        """Store the ``image_id`` value; passing ``None`` removes it."""
        self._set_stuff_int('image_id', id)

    # --- user specific --- #

    # NOTE: a per-member lookup (`user(member)`) previously existed here only
    # as disabled code inside a string literal; it has been removed.

    def add_user(self, user: discord.Member, join_message_id: int) -> User:
        """Insert a newly joined member into ``normal_user``.

        Args:
            user: The member; its ``id``, ``display_name``, ``str(user)`` and
                ``joined_at`` are stored.
            join_message_id: Stored in the ``join_message`` column.

        Returns:
            A ``User`` built from the same values that were inserted.
        """
        self.cursor.execute(
            'INSERT INTO normal_user (id, name, tag, joined, join_message) '
            'VALUES (?, ?, ?, ?, ?)',
            (user.id, user.display_name, str(user), user.joined_at, join_message_id),
        )
        return User(
            user.id,
            user.display_name,
            str(user),
            user.joined_at,
            join_message=join_message_id,
        )

    def get_all_users(self) -> Dict[int, User]:
        """Load every user from ``normal_user`` and ``rules_accepted``.

        Returns:
            A dict keyed by the first column of each row.  If an id occurs in
            both tables, the ``rules_accepted`` entry wins because it is
            written last.
        """
        users = {}

        # Columns are passed positionally; this relies on the table's column
        # order matching the User constructor (schema not visible here).
        self.cursor.execute('SELECT * FROM normal_user')
        for row in self.cursor.fetchall():
            users[row[0]] = User(row[0], row[1], row[2], row[3], None, row[4], row[5])

        self.cursor.execute('SELECT * FROM rules_accepted')
        for row in self.cursor.fetchall():
            users[row[0]] = User(row[0], row[1], row[2], None, row[3])

        return users

    def set_user_warning(self, user: User, warning_time: datetime.datetime):
        """Record ``warning_time`` in ``normal_user.warned`` and on ``user``."""
        self.cursor.execute(
            'UPDATE normal_user SET warned=? WHERE id=?', (warning_time, user.id)
        )
        user.warning_time = warning_time

    def add_user_accepted_rules(self, user: User, accepted_rules_datetime: datetime.datetime):
        """Move ``user`` from ``normal_user`` to ``rules_accepted``.

        Also makes sure the user has an ``info`` row and an ``economy`` row,
        then updates the in-memory ``user`` (clears ``warning_time`` and
        ``join_message``, sets ``accepted_rules_date``).

        Args:
            user: The user that accepted the rules.
            accepted_rules_datetime: Stored in ``rules_accepted.accepted``.
        """
        self.cursor.execute('DELETE FROM normal_user WHERE id=?', (user.id,))
        self.cursor.execute(
            'INSERT INTO rules_accepted (id, name, tag, accepted) VALUES (?, ?, ?, ?)',
            (user.id, user.name, user.tag, accepted_rules_datetime),
        )
        # Only create the side rows when they are missing: reset_user() keeps
        # the info row, and nothing in this class deletes economy rows, so a
        # returning user may already have both.  Same pattern as sync().
        self.cursor.execute(
            'INSERT INTO info (id) '
            'SELECT id FROM rules_accepted WHERE id=? AND id NOT IN (SELECT id FROM info)',
            (user.id,),
        )
        self.cursor.execute(
            'INSERT INTO economy (id) '
            'SELECT id FROM rules_accepted WHERE id=? AND id NOT IN (SELECT id FROM economy)',
            (user.id,),
        )
        user.warning_time = None
        user.accepted_rules_date = accepted_rules_datetime
        user.join_message = None

    def reset_user(self, user: User):
        """Move ``user`` back from ``rules_accepted`` to ``normal_user``.

        The new ``joined`` value is the current time.  The in-memory ``user``
        gets the same ``joined`` value and has ``warning_time`` and
        ``accepted_rules_date`` cleared.
        """
        self.cursor.execute('DELETE FROM rules_accepted WHERE id=?', (user.id,))
        # NOTE(review): naive local time, while add_user() stores
        # member.joined_at - confirm both use the same time base.
        now = datetime.datetime.now()
        self.cursor.execute(
            'INSERT INTO normal_user (id, name, tag, joined) VALUES (?, ?, ?, ?)',
            (user.id, user.name, user.tag, now),
        )
        user.joined = now
        user.warning_time = None
        user.accepted_rules_date = None

    def remove_user(self, user: Union[User, int]):
        """Delete a user from ``normal_user``, ``rules_accepted`` and ``info``.

        Args:
            user: Either a ``User`` or the raw user id.
        """
        user_id = user.id if isinstance(user, User) else user
        # NOTE(review): economy and family rows are left in place - confirm
        # that this is intended.
        self.cursor.execute('DELETE FROM normal_user WHERE id=?', (user_id,))
        self.cursor.execute('DELETE FROM rules_accepted WHERE id=?', (user_id,))
        self.cursor.execute('DELETE FROM info WHERE id=?', (user_id,))

    def change_user_infos(self, user: User, new_name: str, new_tag: str):
        """Update name and tag of ``user`` in both user tables and in memory."""
        self.cursor.execute(
            'UPDATE normal_user SET name=?, tag=? WHERE id=?',
            (new_name, new_tag, user.id),
        )
        self.cursor.execute(
            'UPDATE rules_accepted SET name=?, tag=? WHERE id=?',
            (new_name, new_tag, user.id),
        )
        user.name = new_name
        user.tag = new_tag

    # --- user infos --- #

    def get_user_infos(self, id: int) -> Union[Dict[str, Any], None]:
        """Return the ``info`` row of user ``id`` as a labelled dict.

        Returns:
            A dict mapping display labels to columns 1-6 of the row, or
            ``None`` when the user has no ``info`` row.
        """
        self.cursor.execute('SELECT * FROM info WHERE id=?', (id,))
        result = self.cursor.fetchone()
        if result is None:
            return None
        return {
            'Name': result[1],
            'Alter': result[2],
            'Anime Liste': result[3],
            'Lieblings Anime': result[4],
            'Waifu': result[5],
            'Husbando': result[6],
        }

    def set_user_name(self, id: int, name: str):
        """Set ``info.name`` for user ``id``."""
        self.cursor.execute('UPDATE info SET name=? WHERE id=?', (name, id))

    def set_user_age(self, id: int, age: Union[int, None]):
        """Set ``info.age`` for user ``id``."""
        self.cursor.execute('UPDATE info SET age=? WHERE id=?', (age, id))

    def set_user_list(self, id: int, list: Union[str, None]):
        """Set ``info.list`` for user ``id``."""
        self.cursor.execute('UPDATE info SET list=? WHERE id=?', (list, id))

    def set_user_fav(self, id: int, fav: str):
        """Set ``info.fav`` for user ``id``."""
        self.cursor.execute('UPDATE info SET fav=? WHERE id=?', (fav, id))

    def set_user_waifu(self, id: int, waifu: str):
        """Set ``info.waifu`` for user ``id``."""
        self.cursor.execute('UPDATE info SET waifu=? WHERE id=?', (waifu, id))

    def set_user_husbando(self, id: int, husbando: str):
        """Set ``info.husbando`` for user ``id``."""
        self.cursor.execute('UPDATE info SET husbando=? WHERE id=?', (husbando, id))

    # --- points --- #

    def add_user_gold(self, id: int, gold: int):
        """Add ``gold`` to the ``economy.gold`` balance of user ``id``."""
        self.cursor.execute('UPDATE economy SET gold=gold + ? WHERE id=?', (gold, id))

    def has_remove_user_gold(self, id: int, gold: int) -> bool:
        """Deduct ``gold`` from user ``id`` if the balance is sufficient.

        The balance check and the deduction happen in a single UPDATE.

        Returns:
            ``True`` when the cursor reports at least one affected row,
            ``False`` otherwise.
        """
        self.cursor.execute(
            'UPDATE economy SET gold=gold - ? WHERE id=? AND gold >= ?',
            (gold, id, gold),
        )
        return self.cursor.rowcount > 0

    def get_user_items(self, id: int) -> Dict[str, int]:
        """Return columns 1-3 of the ``economy`` row of user ``id``.

        Returns:
            A dict with the keys ``'gold'``, ``'extra vote'`` and ``'color'``.

        Raises:
            TypeError: If the user has no ``economy`` row.
        """
        self.cursor.execute('SELECT * FROM economy WHERE id=?', (id,))
        result = self.cursor.fetchone()
        # NOTE(review): result is None for an unknown id, so the subscripts
        # below raise TypeError; callers must be prepared for that.
        return {
            'gold': result[1],
            'extra vote': result[2],
            'color': result[3],
        }

    def get_leaderboard(self) -> Dict[int, int]:
        """Return the ten ``economy`` rows with the most gold.

        Returns:
            A dict mapping the ``id`` column to the ``gold`` column, inserted
            in descending gold order.
        """
        self.cursor.execute('SELECT id, gold FROM economy ORDER BY gold DESC LIMIT 10')
        return {row[0]: row[1] for row in self.cursor.fetchall()}

    def set_user_extra_vote(self, id: int, count: int) -> bool:
        """Add ``count`` to ``economy.extra_vote`` of user ``id``.

        The update only applies while the current ``extra_vote`` is above 0.

        Returns:
            ``True`` when the cursor reports at least one affected row.
        """
        # NOTE(review): the `extra_vote > 0` guard also blocks positive
        # deltas for users that currently have 0 - confirm this is intended.
        self.cursor.execute(
            'UPDATE economy SET extra_vote=extra_vote + ? WHERE id=? AND extra_vote > 0',
            (count, id),
        )
        return self.cursor.rowcount > 0

    def set_user_color_count(self, id: int, count: int):
        """Add ``count`` to ``economy.color`` of user ``id``."""
        self.cursor.execute('UPDATE economy SET color=color + ? WHERE id=?', (count, id))

    def set_user_color(self, id: int, color: str):
        """Set ``info.color`` for user ``id``."""
        self.cursor.execute('UPDATE info SET color=? WHERE id=?', (color, id))

    # --- family --- #

    def add_user_parent(self, user: int, parent_id: int):
        """Insert the (``user``, ``parent_id``) pair into ``family`` if missing."""
        self.cursor.execute(
            'INSERT INTO family (id, parent) SELECT ?, ? '
            'WHERE NOT EXISTS(SELECT * FROM family WHERE id=? AND parent=?)',
            (user, parent_id, user, parent_id),
        )

    def remove_user_parent(self, user: int, parent_id: int):
        """Delete the (``user``, ``parent_id``) pair from ``family``."""
        self.cursor.execute(
            'DELETE FROM family WHERE id=? AND parent=?', (user, parent_id)
        )

    def get_user_parents(self, user: int) -> List[int]:
        """Return the ``parent`` values of all ``family`` rows with id ``user``."""
        self.cursor.execute('SELECT parent FROM family WHERE id=?', (user,))
        return [row[0] for row in self.cursor.fetchall()]

    def get_user_children(self, user: int) -> List[int]:
        """Return the ``id`` values of all ``family`` rows with parent ``user``."""
        self.cursor.execute('SELECT id FROM family WHERE parent=?', (user,))
        return [row[0] for row in self.cursor.fetchall()]