mirror of
https://github.com/bytedream/guilddump.git
synced 2025-05-09 04:05:09 +02:00
247 lines
8.4 KiB
Python
247 lines
8.4 KiB
Python
import asyncio
|
|
import csv
|
|
import pathlib
|
|
import time
|
|
|
|
import discord
|
|
from simple_term_menu import TerminalMenu
|
|
|
|
|
|
async def input_prompt(question: str, default: str = '', checker=None) -> str:
    """Ask ``question`` on the terminal until an acceptable answer is given.

    Args:
        question: Prompt text handed to ``input``.
        default: Value used when the user submits an empty line.
        checker: Optional validator. May be a plain callable or a coroutine
            function; it receives the answer and returns a truthy value to
            accept it. ``None`` accepts every answer.

    Returns:
        The first answer (or ``default``) that the checker accepts.

    Raises:
        SystemExit: With exit code 1 when the prompt is aborted with Ctrl-C
            or when standard input is exhausted/closed.
    """
    # The kind of checker cannot change between attempts, so resolve it once.
    checker_is_async = checker is not None and asyncio.iscoroutinefunction(checker)

    while True:
        try:
            result = input(question) or default
        except (KeyboardInterrupt, EOFError):
            # EOF means no further answer can ever arrive, so it is handled
            # like an explicit abort instead of escaping as a traceback.
            raise SystemExit(1) from None

        if checker is None:
            return result

        accepted = await checker(result) if checker_is_async else checker(result)
        if accepted:
            return result

        print('\r')
|
|
|
|
|
|
async def option_prompt(question: str, options: [str]) -> int:
    """Print ``question`` and show ``options`` as a single-choice terminal menu.

    Returns:
        Whatever ``TerminalMenu.show()`` reports for the menu; callers in this
        file use it as the index of the chosen entry in ``options``.
    """
    print(question)
    menu = TerminalMenu(options, clear_menu_on_exit=False)
    chosen = menu.show()
    return chosen
|
|
|
|
|
|
async def select_prompt(question: str, options: [str]) -> [int]:
    """Print ``question`` and show ``options`` as a multi-select terminal menu.

    Returns:
        Whatever ``TerminalMenu.show()`` reports for a menu created with
        ``multi_select=True``; callers in this file iterate it as indices
        into ``options``.
    """
    print(question)
    menu = TerminalMenu(options, clear_menu_on_exit=False, multi_select=True)
    chosen = menu.show()
    return chosen
|
|
|
|
|
|
async def yesno_prompt(question: str) -> bool:
    """Ask a yes/no question and report whether 'Yes' was picked.

    Args:
        question: Text printed above the two-entry menu.

    Returns:
        ``True`` when the first entry ('Yes') was selected, ``False``
        otherwise.
    """
    # option_prompt is a coroutine function: it has to be awaited, otherwise
    # the coroutine object itself is compared with 0 and the answer is
    # always False.
    choice = await option_prompt(question, ['Yes', 'No'])
    return choice == 0
|
|
|
|
|
|
async def request_client() -> discord.Client:
    """Create a Discord client and log it in with a token typed by the user.

    The token prompt repeats until ``client.login`` stops raising
    ``discord.LoginFailure``.

    Returns:
        The client, created with the default intents plus ``members``.
    """
    wanted_intents = discord.Intents().default()
    wanted_intents.members = True
    client = discord.Client(intents=wanted_intents)

    async def token_is_valid(token: str) -> bool:
        # Validator for input_prompt: a token counts as valid when the
        # login attempt succeeds.
        try:
            await client.login(token)
        except discord.LoginFailure:
            await client.close()
            print('Invalid token')
            return False
        else:
            return True

    await input_prompt('Discord Bot Token: ', checker=token_is_valid)
    return client
|
|
|
|
|
|
async def request_guild(client: discord.Client) -> discord.Guild:
    """Let the user decide which guild should be scraped.

    Up to 51 guilds are fetched. With more than 50 the id has to be typed
    in, with at most 10 a menu is shown, and in between the user chooses
    one of the two ways.

    Args:
        client: Logged-in client used to list and fetch guilds.

    Returns:
        The guild picked from the menu, or the guild fetched for the id the
        user entered.
    """
    # One more than 50 is requested so that "more than 50" can be detected.
    guilds = [guild async for guild in client.fetch_guilds(limit=51)]

    if len(guilds) > 50:
        manual = True
    elif len(guilds) <= 10:
        manual = False
    else:
        option = await option_prompt(
            f'{len(guilds)} guilds are available. Choose from list or enter manually?: ',
            ['Choose', 'Manually'],
        )
        manual = option == 1

    if not manual:
        options = [f'{guild.id} ({guild.name})' for guild in guilds]
        return guilds[await option_prompt('Select guild to scrape: ', options)]

    # Filled by the validator so the guild is not fetched a second time
    # after the prompt has accepted the id.
    fetched_guild = None

    async def checker(guild_id: str) -> bool:
        nonlocal fetched_guild
        try:
            fetched_guild = await client.fetch_guild(int(guild_id))
            return fetched_guild is not None
        except ValueError:
            print('Enter a valid id (must only contain number)')
        except (discord.Forbidden, discord.NotFound):
            # Depending on the discord.py version an inaccessible or unknown
            # guild is reported as Forbidden or as NotFound.
            print('Bot not in guild')

        return False

    await input_prompt('Guild id to scrape: ', checker=checker)
    return fetched_guild
|
|
|
|
|
|
async def request_directory() -> pathlib.Path:
    """Prompt for the output directory until a usable one is entered.

    An existing directory is accepted as is. For a path that does not exist
    the user is asked whether it should be created (including parents).

    Returns:
        The accepted directory as a ``pathlib.Path``.
    """
    async def directory_is_usable(directory: str) -> bool:
        path = pathlib.Path(directory)

        if not path.exists():
            if await yesno_prompt(f'{directory} does not exist. Do you want to create it?'):
                try:
                    path.mkdir(parents=True)
                    return True
                except OSError as e:
                    print(f'Failed to create directory: {e}')
            return False

        if path.is_dir():
            return True

        print(f'{directory} is not a directory')
        return False

    answer = await input_prompt('Directory to save content in: ', checker=directory_is_usable)
    return pathlib.Path(answer)
|
|
|
|
|
|
async def request_channels(guild: discord.Guild) -> [discord.TextChannel]:
    """Ask which text channels of ``guild`` should be scraped.

    Args:
        guild: Guild whose channels are fetched and offered for selection.

    Returns:
        The selected text channels; an empty list when the menu was left
        without a selection.
    """
    options = [
        channel
        for channel in await guild.fetch_channels()
        if isinstance(channel, discord.TextChannel)
    ]
    selected = await select_prompt(
        'Select channels to scrape: ',
        [f'{channel.id} (#{channel.name})' for channel in options],
    )
    # A cancelled menu yields None instead of a tuple of indices; treat that
    # as "nothing selected" rather than failing on the iteration.
    return [options[select] for select in selected or ()]
|
|
|
|
|
|
async def request_max(target: str) -> int:
    """Ask for the maximum number of ``target`` items to fetch.

    Args:
        target: Noun inserted into the prompt text (e.g. 'messages').

    Returns:
        A positive integer; 1000 when the user just presses enter.
    """
    def checker(number: str) -> bool:
        try:
            value = int(number)
        except ValueError:
            # Tell the user why the prompt repeats, like the other
            # validators in this file do.
            print('Enter a valid number')
            return False

        if value > 0:
            return True

        print('Value must be greater than 0')
        return False

    answer = await input_prompt(
        f'Max {target} to fetch per channel (default: 1000): ',
        default='1000',
        checker=checker,
    )
    return int(answer)
|
|
|
|
|
|
async def scrape_channels(guild: discord.Guild, channel_writer):
    """Write one CSV row per channel of ``guild``.

    Args:
        guild: Guild whose channels are fetched.
        channel_writer: Object with a ``writerow`` method (a ``csv.writer``).
            Receives the header row followed by
            ``id, created_at, name, type`` rows. ``created_at`` is a Unix
            timestamp and ``type`` is 'text', 'voice', 'category', 'stage'
            or '' for any other channel class.
    """
    # Imported locally so this function does not depend on a module-level
    # import being added.
    import calendar

    channel_writer.writerow(['id', 'created_at', 'name', 'type'])

    # Checked in this order; the first matching class wins.
    type_names = (
        (discord.TextChannel, 'text'),
        (discord.VoiceChannel, 'voice'),
        (discord.CategoryChannel, 'category'),
        (discord.StageChannel, 'stage'),
    )

    for channel in await guild.fetch_channels():
        channel_type = next(
            (name for channel_class, name in type_names if isinstance(channel, channel_class)),
            '',
        )

        channel_writer.writerow([
            channel.id,
            # discord.py documents these datetimes as UTC. time.mktime()
            # would read the fields as local time and shift the result by
            # the local UTC offset; timegm() on the UTC time tuple does not.
            calendar.timegm(channel.created_at.utctimetuple()),
            channel.name,
            channel_type,
        ])
|
|
|
|
|
|
async def scrape_members(guild: discord.Guild, limit: int, user_writer):
    """Write one CSV row per member of ``guild``.

    Args:
        guild: Guild whose members are fetched.
        limit: Maximum number of members requested from ``fetch_members``.
        user_writer: Object with a ``writerow`` method (a ``csv.writer``).
            Receives the header row followed by
            ``id, joined_at, name, nick, premium_since`` rows. Both
            timestamps are Unix timestamps, 0 when the value is missing.
    """
    # Imported locally so this function does not depend on a module-level
    # import being added.
    import calendar

    def to_epoch(moment) -> int:
        # discord.py documents these datetimes as UTC. time.mktime() would
        # read the fields as local time and shift the result by the local
        # UTC offset; timegm() on the UTC time tuple does not.
        if moment is None:
            return 0
        return calendar.timegm(moment.utctimetuple())

    user_writer.writerow(['id', 'joined_at', 'name', 'nick', 'premium_since'])

    async for member in guild.fetch_members(limit=limit):
        user_writer.writerow([
            member.id,
            # joined_at is optional as well, so it gets the same 0 fallback
            # as premium_since instead of raising on None.
            to_epoch(member.joined_at),
            str(member),
            member.nick,
            to_epoch(member.premium_since),
        ])
|
|
|
|
|
|
async def scrape_messages(channel: discord.TextChannel, limit: int, message_writer, attachment_writer, reaction_writer):
    """Write the history of ``channel`` to three CSV writers.

    Args:
        channel: Text channel whose history is read.
        limit: Maximum number of messages requested from ``history``.
        message_writer: Receives ``id, author_id, created_at, modified at,
            content`` rows; the edit timestamp is 0 for unedited messages.
        attachment_writer: Receives one row per attachment:
            ``message_id, type, size, filename, url, spoiler``.
        reaction_writer: Receives one row per reaction:
            ``message_id, name, reaction_count, animated``.

    Each writer only needs a ``writerow`` method (a ``csv.writer``) and gets
    its header row first. Timestamps are Unix timestamps.
    """
    # Imported locally so this function does not depend on a module-level
    # import being added.
    import calendar

    def to_epoch(moment) -> int:
        # discord.py documents these datetimes as UTC. time.mktime() would
        # read the fields as local time and shift the result by the local
        # UTC offset; timegm() on the UTC time tuple does not.
        if moment is None:
            return 0
        return calendar.timegm(moment.utctimetuple())

    message_writer.writerow(['id', 'author_id', 'created_at', 'modified at', 'content'])
    attachment_writer.writerow(['message_id', 'type', 'size', 'filename', 'url', 'spoiler'])
    reaction_writer.writerow(['message_id', 'name', 'reaction_count', 'animated'])

    async for message in channel.history(limit=limit):
        message_writer.writerow([
            message.id,
            message.author.id,
            to_epoch(message.created_at),
            to_epoch(message.edited_at),
            message.content,
        ])

        for attachment in message.attachments:
            attachment_writer.writerow([
                message.id,
                attachment.content_type,
                attachment.size,
                attachment.filename,
                attachment.url,
                attachment.is_spoiler(),
            ])

        for reaction in message.reactions:
            emoji = reaction.emoji
            # A plain str emoji has neither a name nor an animated flag.
            is_plain = isinstance(emoji, str)
            reaction_writer.writerow([
                message.id,
                emoji if is_plain else emoji.name,
                reaction.count,
                False if is_plain else emoji.animated,
            ])
|
|
|
|
|
|
async def main():
    """Run the interactive scrape: prompt for everything, then write CSVs.

    A sub-directory named after the guild id is created inside the directory
    the user chose. It receives ``channels.csv`` and ``members.csv`` plus
    one folder per selected text channel holding ``messages.csv``,
    ``attachments.csv`` and ``reactions.csv``. Nothing is scraped when the
    guild directory already exists.

    The client is closed on every way out of the function. Errors are no
    longer swallowed; they propagate to the caller after the client has
    been closed.
    """
    def open_csv(path: pathlib.Path):
        # newline='' is what the csv module asks for (prevents blank rows on
        # Windows); UTF-8 keeps emoji and other non-ASCII text writable
        # regardless of the platform's default encoding.
        return path.open('w', newline='', encoding='utf-8')

    client = await request_client()

    try:
        guild = await request_guild(client)
        directory = (await request_directory()).joinpath(str(guild.id))
        if directory.exists():
            print(f'Directory for guild already exists ({directory.absolute()})')
            return
        directory.mkdir()

        max_members = await request_max('users')
        max_messages = await request_max('messages')

        selected_channels = await request_channels(guild)

        print('Starting scraping, this make take a while...')

        with open_csv(directory.joinpath('channels.csv')) as channels:
            await scrape_channels(guild, csv.writer(channels))
        print('Scraped channels')

        with open_csv(directory.joinpath('members.csv')) as members:
            await scrape_members(guild, max_members, csv.writer(members))
        print('Scraped members')

        for channel in selected_channels:
            if not isinstance(channel, discord.TextChannel):
                continue

            channel_directory = directory.joinpath(str(channel.id))
            channel_directory.mkdir()
            with open_csv(channel_directory.joinpath('messages.csv')) as messages, \
                    open_csv(channel_directory.joinpath('attachments.csv')) as attachments, \
                    open_csv(channel_directory.joinpath('reactions.csv')) as reactions:
                await scrape_messages(
                    channel,
                    max_messages,
                    csv.writer(messages),
                    csv.writer(attachments),
                    csv.writer(reactions),
                )
            print(f'Scraped channel {channel.id} (#{channel.name})')

        print('Finished scraping')
    finally:
        # Also runs for the early return above and for any error, so the
        # client is never left open.
        await client.close()
|
|
|
|
|
|
if __name__ == '__main__':
    # Script entry point: drive main() on a fresh event loop.
    entry_point = main()
    try:
        asyncio.run(entry_point)
    except RuntimeError:
        # NOTE(review): RuntimeError is ignored on purpose here, presumably
        # to hide event-loop shutdown noise; confirm before narrowing.
        pass
|