-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcrashnt.py
executable file
·130 lines (120 loc) · 5.1 KB
/
crashnt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#!/usr/bin/env python3
import discord
from discord.ext import commands
import asyncio
import requests
import logging
import subprocess
import os
import sys
import re
from uuid import uuid4
# Module-wide logger and the Discord bot instance; "~" is the command prefix.
log = logging.getLogger(__name__)
bot = commands.Bot(command_prefix='~')

# owner.txt: first line holds the numeric user ID that on_message compares
# against msg.author.id to unlock the debug "eval" mode.
with open("owner.txt", "r") as f:
    owner = int(f.readline())

# token.txt: first line holds the bot token. Strip it so the trailing newline
# left by readline() never becomes part of the credential.
with open("token.txt", "r") as f:
    token = f.readline().strip()

# channels.txt: one numeric channel ID per line; only these channels are
# scanned. Blank lines are skipped instead of crashing int() at start-up.
with open("channels.txt", "r") as f:
    whitelist = [int(line) for line in f.read().splitlines() if line.strip()]
@bot.event
async def on_ready():
    """Log the bot's own user (name and ID) at debug level.

    Registered on the bot as its ``on_ready`` event handler.
    """
    me = bot.user
    log.debug('Logged in as:\n{0} (ID: {0.id})'.format(me))
# Matches every http(s) link ending in ".mp4". "\S" keeps a match inside one
# whitespace-delimited token, so several links in one message are returned
# separately instead of being glued together by a greedy ".*".
_MP4_URL_RE = re.compile(r"https?:\S+\.mp4")

# Seconds to wait for a remote server before giving up on a download.
_DOWNLOAD_TIMEOUT = 30


def _download_mp4(url):
    """Download *url* to ``tmp/<uuid>.mp4`` and return the generated base name.

    This is a blocking call; coroutines must run it through an executor.

    Raises:
        requests.RequestException: on connection failure, timeout or an HTTP
            error status.
    """
    r = requests.get(url, timeout=_DOWNLOAD_TIMEOUT)
    r.raise_for_status()
    fname = str(uuid4())
    with open("tmp/{}.mp4".format(fname), "wb") as f:
        f.write(r.content)
    return fname


async def _fetch_and_parse(url, msg):
    """Download *url* off the event loop and hand the file to ``parse``.

    Returns:
        True when ``parse`` flagged the video, False otherwise (including when
        the download itself failed).
    """
    loop = asyncio.get_event_loop()
    try:
        fname = await loop.run_in_executor(None, _download_mp4, url)
    except requests.RequestException as e:
        # A broken link must not abort the handler: the remaining links,
        # embeds and attachments of the same message still need to be checked.
        log.warning("Could not download '{}': {}".format(url, e))
        return False
    return await parse(fname, msg)


@bot.event
async def on_message(msg):
    """Handle an incoming message: owner debug mode, then mp4 scanning.

    Messages from bots are ignored. If the owner sends a message starting
    with "~", the remainder is evaluated and the result (or traceback) is
    posted back. For messages in a whitelisted channel, every mp4 found in
    the text, the embeds and the attachments is saved under ``tmp/`` and
    passed to ``parse``; scanning stops at the first video ``parse`` flags.
    """
    if msg.author.bot:
        return

    if msg.author.id == owner and msg.content.startswith("~"):  # Debug mode
        # Imported here because the module's import block does not provide it.
        import traceback

        m = msg.content[1:]
        log.debug("Evaluate: {}".format(m))
        try:
            # SECURITY: eval() executes arbitrary code with the bot's
            # privileges. It is gated on the owner ID only; do not widen it.
            result = eval(m)  # Add "await" before eval(m) to access Discord async methods
            log.debug(result)
            await msg.channel.send("```\n{}```".format(result))
        except Exception:
            # Escape backticks so the traceback cannot close the code fence.
            await msg.channel.send("```\nError:\n{}```".format(
                traceback.format_exc().replace("```", "\\`\\`\\`")))

    if msg.channel.id not in whitelist:
        return

    log.debug("{}: {}".format(msg.author, msg.content))

    # If someone sends a message in our watched channels, check content first
    for url in _MP4_URL_RE.findall(msg.content):
        if await _fetch_and_parse(url, msg):  # If we found a crash vid, skip the rest.
            return

    # Check embeds too
    for embed in msg.embeds:
        if not (hasattr(embed, "video") and embed.video.url):
            continue
        if embed.video.url.endswith("mp4"):
            if await _fetch_and_parse(embed.video.url, msg):
                return

    # Finally, check attachments
    for attachment in msg.attachments:
        log.debug("Filetype: {}".format(attachment.content_type))
        if attachment.content_type == "video/mp4":
            fname = str(uuid4())
            await attachment.save("tmp/{}.mp4".format(fname))
            if await parse(fname, msg):
                return
# Frames wider or taller than this (in pixels) are treated as malicious when
# they differ from the first frame.
_MAX_FRAME_DIMENSION = 8000


def _probe_frames(path):
    """Run ffprobe on *path* and return its output split into lines.

    Each well-formed line is ``width,height,pix_fmt`` for one video frame.
    Because stderr is merged into stdout, diagnostic lines may be mixed in.
    This is a blocking call; coroutines must run it through an executor.

    Raises:
        subprocess.CalledProcessError: if ffprobe exits with a non-zero code
            or is killed by a signal.
    """
    output = subprocess.check_output(
        ["ffprobe", "-v", "error", "-show_entries", "frame=width,height,pix_fmt",
         "-select_streams", "v", "-of", "csv=p=0", path],
        stderr=subprocess.STDOUT, universal_newlines=True)
    return output.strip().split("\n")


async def _try_delete(msg):
    """Best-effort deletion of *msg*; a failure is logged, never raised."""
    try:
        await msg.delete()
    except Exception as e:
        log.warning("Could not delete message: {}".format(e))


async def parse(fname, msg):
    """Inspect ``tmp/<fname>.mp4`` frame by frame and act on crash videos.

    The first frame's (width, height, pixel format) is the baseline. A later
    frame that differs from it and either changes pixel format or exceeds
    8000 pixels in width or height gets the message deleted and its author
    kicked. If ffprobe itself dies with return code -9, the message is
    deleted and the owner is pinged, but nobody is kicked. The temporary
    file is removed in every case, even when a Discord call raises.

    Args:
        fname: Base name (without directory or extension) of the saved video.
        msg: The Discord message the video came from.

    Returns:
        True if the video was flagged (either case above), False otherwise.
    """
    path = "tmp/{}.mp4".format(fname)
    log.debug("Analyzing '{}' <{}>".format(fname, msg))
    was_bad = False
    try:
        try:
            # ffprobe can take a while; keep it off the event loop.
            probe_lines = await asyncio.get_event_loop().run_in_executor(
                None, _probe_frames, path)
        except subprocess.CalledProcessError as e:
            log.error("ffprobe has crashed with return code {}".format(e.returncode))
            if e.returncode == -9:  # If ffprobe runs out of memory, it's likely bad, but just to be safe, we won't kick
                await _try_delete(msg)
                await msg.channel.send(":hammer: Possible crash gif detected. Notifying <@{}>\nUser: <@{}>".format(owner, msg.author.id))
                was_bad = True
        else:
            first_frame_vals = None
            for line in probe_lines:
                fields = line.split(",")
                # Skip empty output and stray diagnostics: a frame line starts
                # with two integers followed by the pixel format.
                if len(fields) < 3 or not (fields[0].isdigit() and fields[1].isdigit()):
                    if line:
                        log.debug("Skipping unparseable ffprobe line: {!r}".format(line))
                    continue
                width, height, fmt = fields[:3]
                if first_frame_vals is None:
                    first_frame_vals = (width, height, fmt)
                    log.debug("First frame vals: {}".format(first_frame_vals))
                    continue
                if (width, height, fmt) == first_frame_vals:
                    continue
                log.debug("Anomaly detected: ({}, {}, {}) has deviated from {}".format(width, height, fmt, first_frame_vals))
                if (fmt != first_frame_vals[2]
                        or int(width) > _MAX_FRAME_DIMENSION
                        or int(height) > _MAX_FRAME_DIMENSION):
                    await _try_delete(msg)
                    await msg.channel.send(":hammer: Crash gif detected. Kicking user... <@{}>".format(msg.author.id))
                    await msg.author.kick(reason="Crash gif")
                    was_bad = True
                    break
        log.debug("Done analyzing.")
    finally:
        # Always clean up, otherwise a failed send/kick leaks the download.
        os.remove(path)
    return was_bad
if __name__ == "__main__":
    # Send all records at DEBUG and above to stdout with a timestamped format.
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    ch = logging.StreamHandler(sys.stdout)
    ch.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    ch.setFormatter(formatter)
    root.addHandler(ch)

    # Quieten the chattier library loggers to warnings and above.
    logging.getLogger("discord").setLevel(logging.WARNING)
    logging.getLogger("websockets").setLevel(logging.WARNING)

    # Scratch directory for downloaded videos; exist_ok avoids the
    # check-then-create race of a separate os.path.exists() test.
    os.makedirs('tmp', exist_ok=True)

    try:
        bot.run(token, bot=True)
    except RuntimeError:
        # NOTE(review): presumably raised while the event loop is torn down
        # on shutdown; treated as a clean exit. Confirm against discord.py.
        sys.exit(0)