This repository has been archived on 2023-07-12. You can view files and clone it, but cannot push or open issues or pull requests.
kaizen-bot-old/kaizenbot/database.py

204 lines
8.6 KiB
Python
Raw Normal View History

2022-04-20 23:41:25 +02:00
import mariadb
import datetime
from typing import Any, Dict, List, Union
import discord
from .user import User
class Database:
    """Thin wrapper around the bot's MariaDB connection.

    All methods run their SQL on one shared cursor. The connection is opened
    with autocommit enabled, so every statement is committed on its own and
    multi-statement methods are not atomic.
    """

    def __init__(self, user: str, password: str):
        """Connect to the ``kaizen`` database on 127.0.0.1:3306.

        Args:
            user: database user name.
            password: password for ``user``.
        """
        self.database: mariadb._mariadb.connection = mariadb.connect(
            user=user,
            password=password,
            host='127.0.0.1',
            port=3306,
            database='kaizen',
        )
        self.database.autocommit = True
        self.cursor = self.database.cursor()

    def sync(self):
        """Create missing ``info`` and ``economy`` rows for every id in ``rules_accepted``."""
        self.cursor.execute(
            'INSERT INTO info (id) (SELECT id FROM rules_accepted WHERE id NOT IN (SELECT id FROM info))'
        )
        self.cursor.execute(
            'INSERT INTO economy (id) (SELECT id FROM rules_accepted WHERE id NOT IN (SELECT id FROM economy))'
        )

    # --- key/value storage ("stuff" table) --- #

    def _get_stuff_int(self, key: str) -> Union[int, None]:
        """Return the value stored under ``key`` as an int, or None if absent/non-numeric."""
        self.cursor.execute('SELECT v FROM stuff WHERE k=?', (key,))
        row = self.cursor.fetchone()
        try:
            # ``row`` is None when the key does not exist -> TypeError on indexing.
            return int(row[0])
        except (TypeError, ValueError):
            return None

    def _set_stuff(self, key: str, value: Union[int, None]):
        """Store ``value`` under ``key``; ``None`` removes the key."""
        # Always drop the previous row first so that overwriting an existing
        # key neither duplicates the row nor violates a unique constraint.
        self.cursor.execute('DELETE FROM stuff WHERE k=?', (key,))
        if value is not None:
            self.cursor.execute('INSERT INTO stuff (k, v) VALUES (?, ?)', (key, str(value)))

    def get_message_id(self) -> Union[int, None]:
        """Return the stored ``message_id`` or None if it is not set."""
        return self._get_stuff_int('message_id')

    def set_message_id(self, id: Union[int, None]):
        """Store ``id`` as ``message_id``; passing None deletes the entry."""
        self._set_stuff('message_id', id)

    def get_image_id(self) -> Union[int, None]:
        """Return the stored ``image_id`` or None if it is not set."""
        return self._get_stuff_int('image_id')

    def set_image_id(self, id: Union[int, None]):
        """Store ``id`` as ``image_id``; passing None deletes the entry."""
        self._set_stuff('image_id', id)

    # --- user specific --- #

    def add_user(self, user: discord.Member, join_message_id: int) -> User:
        """Insert a freshly joined member into ``normal_user``.

        Args:
            user: the member that joined.
            join_message_id: id stored in the ``join_message`` column.

        Returns:
            A ``User`` built from the same values that were written.
        """
        self.cursor.execute(
            'INSERT INTO normal_user (id, name, tag, joined, join_message) VALUES (?, ?, ?, ?, ?)',
            (user.id, user.display_name, str(user), user.joined_at, join_message_id),
        )
        return User(user.id, user.display_name, str(user), user.joined_at, join_message=join_message_id)

    def get_all_users(self) -> Dict[int, User]:
        """Load every row of ``normal_user`` and ``rules_accepted`` as ``User`` objects.

        Returns:
            Mapping of user id to ``User``. If an id is present in both
            tables the ``rules_accepted`` entry wins because it is read last.
        """
        users = {}
        # NOTE(review): both queries use SELECT * and index columns by
        # position, so they depend on the tables' column order.
        self.cursor.execute('SELECT * FROM normal_user')
        for row in self.cursor.fetchall():
            users[row[0]] = User(row[0], row[1], row[2], row[3], None, row[4], row[5])
        self.cursor.execute('SELECT * FROM rules_accepted')
        for row in self.cursor.fetchall():
            users[row[0]] = User(row[0], row[1], row[2], None, row[3])
        return users

    def set_user_warning(self, user: User, warning_time: datetime.datetime):
        """Write ``warning_time`` to ``normal_user.warned`` and mirror it on ``user``."""
        self.cursor.execute('UPDATE normal_user SET warned=? WHERE id=?', (warning_time, user.id))
        user.warning_time = warning_time

    def add_user_accepted_rules(self, user: User, accepted_rules_datetime: datetime.datetime):
        """Move ``user`` from ``normal_user`` to ``rules_accepted``.

        Also makes sure the user has an ``info`` and an ``economy`` row and
        updates the in-memory ``user`` object accordingly.

        Args:
            user: the user who accepted the rules.
            accepted_rules_datetime: value written to the ``accepted`` column.
        """
        self.cursor.execute('DELETE FROM normal_user WHERE id=?', (user.id,))
        self.cursor.execute(
            'INSERT INTO rules_accepted (id, name, tag, accepted) VALUES (?, ?, ?, ?)',
            (user.id, user.name, user.tag, accepted_rules_datetime),
        )
        # reset_user() keeps the info row and remove_user() keeps the economy
        # row, so both inserts must be skipped when the row already exists.
        # The guard has to look at the *target* table: checking rules_accepted
        # (which was just written above) would never insert anything.
        self.cursor.execute(
            'INSERT INTO info (id) SELECT ? FROM DUAL WHERE NOT EXISTS(SELECT id FROM info WHERE id=?)',
            (user.id, user.id),
        )
        self.cursor.execute(
            'INSERT INTO economy (id) SELECT ? FROM DUAL WHERE NOT EXISTS(SELECT id FROM economy WHERE id=?)',
            (user.id, user.id),
        )
        user.warning_time = None
        user.accepted_rules_date = accepted_rules_datetime
        user.join_message = None

    def reset_user(self, user: User):
        """Move ``user`` back from ``rules_accepted`` to ``normal_user``.

        The join time is reset to the current local time; ``info`` and
        ``economy`` rows are left in place.
        """
        self.cursor.execute('DELETE FROM rules_accepted WHERE id=?', (user.id,))
        now = datetime.datetime.now()
        self.cursor.execute(
            'INSERT INTO normal_user (id, name, tag, joined) VALUES (?, ?, ?, ?)',
            (user.id, user.name, user.tag, now),
        )
        user.joined = now
        user.warning_time = None
        user.accepted_rules_date = None

    def remove_user(self, user: Union[User, int]):
        """Delete a user from ``normal_user``, ``rules_accepted`` and ``info``.

        Args:
            user: a ``User`` object or a raw user id.

        The ``economy`` table is not touched by this method.
        """
        user_id = user.id if isinstance(user, User) else user
        for statement in (
            'DELETE FROM normal_user WHERE id=?',
            'DELETE FROM rules_accepted WHERE id=?',
            'DELETE FROM info WHERE id=?',
        ):
            self.cursor.execute(statement, (user_id,))

    def change_user_infos(self, user: User, new_name: str, new_tag: str):
        """Update name and tag of ``user`` in both user tables and on the object."""
        params = (new_name, new_tag, user.id)
        self.cursor.execute('UPDATE normal_user SET name=?, tag=? WHERE id=?', params)
        self.cursor.execute('UPDATE rules_accepted SET name=?, tag=? WHERE id=?', params)
        user.name = new_name
        user.tag = new_tag

    # --- user infos --- #

    def get_user_infos(self, id: int) -> Union[Dict[str, Any], None]:
        """Return the profile stored in ``info`` for ``id``.

        Returns:
            A dict keyed by the (German) display labels, filled from columns
            1-6 of the row, or None if there is no row for ``id``.
        """
        # NOTE(review): SELECT * is indexed by position; presumably the
        # columns are name, age, list, fav, waifu, husbando -- confirm schema.
        self.cursor.execute('SELECT * FROM info WHERE id=?', (id,))
        row = self.cursor.fetchone()
        if row is None:
            return None
        return {
            'Name': row[1],
            'Alter': row[2],
            'Anime Liste': row[3],
            'Lieblings Anime': row[4],
            'Waifu': row[5],
            'Husbando': row[6],
        }

    def set_user_name(self, id: int, name: str):
        """Set ``info.name`` for user ``id``."""
        self.cursor.execute('UPDATE info SET name=? WHERE id=?', (name, id))

    def set_user_age(self, id: int, age: Union[int, None]):
        """Set ``info.age`` for user ``id``."""
        self.cursor.execute('UPDATE info SET age=? WHERE id=?', (age, id))

    def set_user_list(self, id: int, list: Union[str, None]):
        """Set ``info.list`` for user ``id``."""
        self.cursor.execute('UPDATE info SET list=? WHERE id=?', (list, id))

    def set_user_fav(self, id: int, fav: str):
        """Set ``info.fav`` for user ``id``."""
        self.cursor.execute('UPDATE info SET fav=? WHERE id=?', (fav, id))

    def set_user_waifu(self, id: int, waifu: str):
        """Set ``info.waifu`` for user ``id``."""
        self.cursor.execute('UPDATE info SET waifu=? WHERE id=?', (waifu, id))

    def set_user_husbando(self, id: int, husbando: str):
        """Set ``info.husbando`` for user ``id``."""
        self.cursor.execute('UPDATE info SET husbando=? WHERE id=?', (husbando, id))

    # --- points --- #

    def add_user_gold(self, id: int, gold: int):
        """Add ``gold`` to the user's balance in ``economy``."""
        self.cursor.execute('UPDATE economy SET gold=gold + ? WHERE id=?', (gold, id))

    def has_remove_user_gold(self, id: int, gold: int) -> bool:
        """Withdraw ``gold`` from the user if the balance is sufficient.

        The check and the withdrawal happen in a single UPDATE.

        Returns:
            True if a row was updated (gold was removed), False otherwise.
        """
        self.cursor.execute(
            'UPDATE economy SET gold=gold - ? WHERE id=? AND gold >= ?',
            (gold, id, gold),
        )
        return self.cursor.rowcount > 0

    def get_user_items(self, id: int) -> Dict[str, int]:
        """Return the user's ``economy`` row as a dict.

        Returns:
            Dict with the keys ``'gold'``, ``'extra vote'`` and ``'color'``
            taken from columns 1-3 of the row.

        Raises:
            TypeError: if there is no ``economy`` row for ``id``.
        """
        self.cursor.execute('SELECT * FROM economy WHERE id=?', (id,))
        row = self.cursor.fetchone()
        return {
            'gold': row[1],
            'extra vote': row[2],
            'color': row[3],
        }

    def get_leaderboard(self) -> Dict[int, int]:
        """Return the ten richest users as ``{user id: gold}``, richest first."""
        self.cursor.execute('SELECT id, gold FROM economy ORDER BY gold DESC LIMIT 10')
        return {row[0]: row[1] for row in self.cursor.fetchall()}

    def set_user_extra_vote(self, id: int, count: int) -> bool:
        """Add ``count`` to the user's ``extra_vote`` counter.

        Returns:
            True if a row was updated, False otherwise.
        """
        # NOTE(review): the ``extra_vote > 0`` guard also blocks *increments*
        # for users that currently have 0 extra votes -- confirm whether that
        # is intended or the guard should only apply to negative ``count``.
        self.cursor.execute(
            'UPDATE economy SET extra_vote=extra_vote + ? WHERE id=? AND extra_vote > 0',
            (count, id),
        )
        return self.cursor.rowcount > 0

    def set_user_color_count(self, id: int, count: int):
        """Add ``count`` to the user's ``economy.color`` counter."""
        self.cursor.execute('UPDATE economy SET color=color + ? WHERE id=?', (count, id))

    def set_user_color(self, id: int, color: str):
        """Set ``info.color`` for user ``id``."""
        self.cursor.execute('UPDATE info SET color=? WHERE id=?', (color, id))

    # --- family --- #

    def add_user_parent(self, user: int, parent_id: int):
        """Record ``parent_id`` as a parent of ``user`` unless the pair already exists."""
        self.cursor.execute(
            'INSERT INTO family (id, parent) SELECT ?, ? WHERE NOT EXISTS(SELECT * FROM family WHERE id=? AND parent=?)',
            (user, parent_id, user, parent_id),
        )

    def remove_user_parent(self, user: int, parent_id: int):
        """Remove the ``user`` -> ``parent_id`` relation from ``family``."""
        self.cursor.execute('DELETE FROM family WHERE id=? AND parent=?', (user, parent_id))

    def get_user_parents(self, user: int) -> List[int]:
        """Return the ids of all parents recorded for ``user`` (empty list if none)."""
        self.cursor.execute('SELECT parent FROM family WHERE id=?', (user,))
        # fetchall must be *called*; iterating the bound method raises TypeError.
        return [row[0] for row in self.cursor.fetchall()]

    def get_user_children(self, user: int) -> List[int]:
        """Return the ids of all users that have ``user`` as a parent (empty list if none)."""
        self.cursor.execute('SELECT id FROM family WHERE parent=?', (user,))
        return [row[0] for row in self.cursor.fetchall()]