1
0
Fork 0
forked from swee/MeowNex
MeowNex/sweebot.py
2024-12-08 22:39:02 -08:00

694 lines
32 KiB
Python

__version__ = "0.0.2 Funni update" + ". https://git.swee.codes/MeowNex"
import socket
import subprocess
from time import sleep, time, ctime
from os import system as ossystem, path, environ, listdir, getcwd
import sqlite3
cwd = getcwd()
import re
from random import choice, randint as random, randrange
import traceback
import threading
from pathlib import Path
from requests import get, exceptions as rex
from bs4 import BeautifulSoup
from googleapiclient.discovery import build
run = 0
block = 0
parsed = 0
# Dashboard thread
from flask import Flask, send_file
app = Flask(__name__)
@app.route('/')
def index_html():
return send_file("dashboard.html", mimetype='text/html')
@app.route('/run.txt')
def runs():
return str(run)
@app.route('/block.txt')
def blocks():
return str(block)
@app.route('/link.txt')
def parses():
return str(parsed)
@app.route('/script.js')
def script_js():
return send_file("script.js", mimetype='application/javascript')
threading.Thread(target=app.run, daemon=True, kwargs={"port": 2005}).start()
DEVELOPER_KEY = environ["ytapi"]
headers = {
'User-Agent': 'SweeBot IRC ' + __version__
}
class config:
def __init__(self):
self.conn = sqlite3.connect(environ["SBconfig"])
self.database = self.conn.cursor()
def perms(self, cloak: str):
try:
self.database.execute(f"SELECT * FROM users WHERE username = ?;", [cloak])
output = self.database.fetchall()
return output[0][1]
except:
print(traceback.format_exc())
return ''
def chansettings(self, chan: str):
try:
self.database.execute("SELECT * FROM chans WHERE chan = ?;", [chan])
output = self.database.fetchall()
print(output)
temp = output[0][1].split(",")
return temp if temp != [''] else []
except:
print(traceback.format_exc())
return []
def setchanconfig(self, chan: str, flags: list):
try:
print(self.chansettings(chan))
if self.chansettings(chan) == []:
print("[!!!] Channel doesn't exist in config")
self.database.execute("INSERT INTO chans (chan, flags) values(?, ?);", [chan, "+config," + ",".join(flags)])
else:
self.database.execute("UPDATE chans SET FLAGS = ? WHERE chan = ?", [",".join(flags),chan])
self.conn.commit()
except:
print(traceback.format_exc())
return False
def cflagexist(self, chan:str, flag:str):
return flag in self.chansettings(chan)
# Code snippet from my buddy Irish
# Define a dictionary to map terminal color codes to IRC color codes
color_map = {
'\033[30m': '\x0301', # Black
'\033[31m': '\x0305', # Red
'\033[32m': '\x0303', # Green
'\033[33m': '\x0307', # Yellow
'\033[34m': '\x0302', # Blue
'\033[35m': '\x0306', # Magenta
'\033[36m': '\x0310', # Cyan
'\033[37m': '\x0315', # White
'\033[90m': '\x0314', # Bright Black (Gray)
'\033[91m': '\x0304', # Bright Red
'\033[92m': '\x0309', # Bright Green
'\033[93m': '\x0308', # Bright Yellow
'\033[94m': '\x0312', # Bright Blue
'\033[95m': '\x0313', # Bright Magenta
'\033[96m': '\x0311', # Bright Cyan
'\033[97m': '\x0316', # Bright White
'\033[0m': '\x03', # Reset
}
pattern = re.compile(r'\033\[\d+(;\d+)*m|\033\[\?25[lh]|\033\[\?7[lh]|\033\[\d+C|\033\[\d+A|\033\[\d+D|\033\[\d+B')
def humanbytes(B):
"""Return the given bytes as a human friendly KB, MB, GB, or TB string."""
B = float(B)
KB = float(1024)
MB = float(KB ** 2) # 1,048,576
GB = float(KB ** 3) # 1,073,741,824
TB = float(KB ** 4) # 1,099,511,627,776
if B < KB:
return '{0}{1}'.format(B,'Bytes' if 0 == B > 1 else 'Byte')
elif KB <= B < MB:
return '{0:.2f}KB'.format(B / KB)
elif MB <= B < GB:
return '{0:.2f}MB'.format(B / MB)
elif GB <= B < TB:
return '{0:.2f}GB'.format(B / GB)
elif TB <= B:
return '{0:.2f}TB'.format(B / TB)
def replace_color_codes(text):
def replacer(match):
code = match.group(0)
if code in color_map:
return color_map[code]
elif re.match(r'\033\[\d+C', code):
# Handle cursor move right (e.g., \033[30C)
spaces = int(re.findall(r'\d+', code)[0])
return ' ' * spaces
elif re.match(r'\033\[\d+A', code) or re.match(r'\033\[\d+B', code):
# Handle cursor move up (e.g., \033[17A) and down (e.g., \033[17B)
return ''
elif re.match(r'\033\[\d+D', code):
# Handle cursor move to the start of the line (e.g., \033[9999999D)
return ''
elif re.match(r'\033\[\?25[lh]', code):
# Handle cursor visibility (e.g., \033[?25l or \033[?25h)
return ''
elif re.match(r'\033\[\?7[lh]', code):
# Handle line wrapping (e.g., \033[?7l or \033[?7h)
return ''
else:
return ''
# Remove all control sequences
cleaned_text = pattern.sub(replacer, text)
# Split the text into lines and remove lines that contain control sequences or are empty
lines = cleaned_text.split('\n')
lines = [line for line in lines if line.strip() and not re.search(r'\033\[m', line)]
# Join the cleaned lines back into a single string
return '\n'.join(lines)
class bot_irc:
irc_socket = socket.socket()
def __init__(self):
self.irc_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
def send_irc(self, channel, msg):
self.irc_socket.send(bytes("PRIVMSG " + channel + " :" + msg + "\n", "UTF-8"))
def connect_irc(self, server, port, channel, bot_nick, bot_pass, bot_nickpass):
print("Server connection: " + server)
self.irc_socket.connect((server, port))
self.irc_socket.send(
bytes(
"USER " + bot_nick + " " + bot_nick + " " + bot_nick + " :SweeBot, a very cool bot made by Swee :3\n",
"UTF-8",
)
)
self.irc_socket.send(bytes("NICK " + bot_nick + "\n", "UTF-8"))
self.irc_socket.send(
bytes("PASS " + bot_nickpass + ":" + bot_pass + "\n", "UTF-8")
)
for i in channel:
self.irc_socket.send(bytes("JOIN " + i + "\n", "UTF-8"))
def response_irc(self):
try:
r = self.irc_socket.recv(2040).decode()
if r.find("PING") != -1:
self.irc_socket.send(
bytes("PONG " + r.split()[1] + "\r\n", "UTF-8")
)
return r
except:
return ""
def ping(self):
try:
r = self.irc_socket.recv(2040).decode()
if r.find("PING") != -1:
self.irc_socket.send(
bytes("PONG " + r.split()[1] + "\r\n", "UTF-8")
)
except:
pass
server_irc = "127.0.0.1" # Use 127.0.0.1 for local ZNC
port_irc = 5000 # NO SSL FOR YOU
channel_irc = ["##sweezero"]
botnick_irc = environ.get('SBnick')
botnickpass_irc =environ.get('SBuser')
botpass_irc = environ.get('SBpass')
allowedparse = ["text/html", "application/x-httpd-php", "application/xhtml", "application/xhtml+xml"]
irc = bot_irc()
irc2 = bot_irc()
irc3 = bot_irc()
irc.connect_irc(
server_irc, port_irc, channel_irc, botnick_irc, botpass_irc, botnickpass_irc + "/A"
)
sbconfig = config()
threes = [":3", ":3c", "uwu", "owo", "/ᐠ。ꞈ。ᐟ\\", "(✿◠‿◠)"]
snacks = ["PepperMintHTMLs", "Dice App candies", "SweeCrypt codes", "Windows 11 Bloomberry flavored icecream"]
baps = ["T-T", "Ow!", "Beep!"]
pats = ["-w-", "Meep...", "Prr!"]
meows_happy = ['Meow!', 'Nyaa~', 'Mrow.', 'Prr! :3', "Mrrp?", "Mreow.", "!woeM", "3: !rrP", "~aayN", "Mew!", "Moew!"]
meows_upset = ['Hiss!', "!ssiH", "Grrr..."]
my_self = ["MeowNexUS", "MeowNexU5", "MeowN3xUS"]
happiness = 5
global times
times = 0
def irci2():
irc2.connect_irc(
server_irc, port_irc, channel_irc, "sweeB0t", botpass_irc, botnickpass_irc + "/B"
)
while True:
irc2.ping()
def irci3():
irc3.connect_irc(
server_irc, port_irc, channel_irc, "swe3Bot", botpass_irc, botnickpass_irc + "/C"
)
while True:
irc3.ping()
def multiline(text, channel):
global times
if text.__class__ == bytes:
text = text.decode()
for t in text.split("\n"):
if t != "" and t != " ":
if times > 8:
if times == 9:
sleep(1)
irc3.send_irc(channel, replace_color_codes(t))
times +=1
if times == 13:
times = 0
elif times > 4:
if times == 5:
sleep(1)
irc2.send_irc(channel, replace_color_codes(t))
times +=1
else:
if times==0:
sleep(1)
irc.send_irc(channel, replace_color_codes(t))
times += 1
from sys import exit
def restart():
irc.irc_socket.close()
irc2.irc_socket.close()
irc3.irc_socket.close()
exit(0)
def system(cmd, chan, rstrip=False):
try:
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
outpp = ""
for line in iter(p.stdout.readline, b''):
if text != "" and text != " ":
if not rstrip:
multiline(line.rstrip(), chan)
else:
outpp += line.decode().rstrip() + " "
if rstrip:
multiline(outpp[:-1], chan)
p.stdout.close()
p.wait()
except FileNotFoundError:
multiline(cmd[0] + " not found", chan)
except:
multiline(traceback.format_exc(), chan)
irl2 = threading.Thread(target=irci2, daemon=True)
irl2.start()
irl3 = threading.Thread(target=irci3, daemon=True)
irl3.start()
while True:
teext = irc.response_irc().split("\r\n")
for text in teext:
if text != "":
try:
print(text)
except:
print("")
try:
command = text.split("PRIVMSG")[1].split(":")[1:]
command = ":".join(command).split(" ")
channel = text.split("PRIVMSG")[1].split(" ")[1]
nick = text.split(" ")[0][1:].split("!")[0]
username = text.split(" ")[0][1:].split("@")[1]
try:
perms = sbconfig.perms(username)
except:
perms = ""
print(command)
print(channel)
print(nick)
print(username)
print(perms)
#open("log-text-"+channel, "a").write(" ".join(command) + "\n")
#open("log-name-"+channel, "a").write(nick + "\n")
except:
#print(traceback.format_exc())
pass
try:
#if True:
if "PRIVMSG" in text and not nick in my_self:
if "PRIVMSG" in text and command[0][0] == "$":
run+=1
if command[0] == "$ping":
if random(1,2) == 1:
irc.send_irc(channel, nick + ": Pnog")
else:
irc.send_irc(channel, nick + ": Pong")
elif command[0] == "$help":
if len(command) > 1:
if "." in " ".join(command[1:]):
irc.send_irc(channel, nick + ": Nice try")
block+=1
run-=1
else:
try:
if path.isfile(cwd + "/helps/" + " ".join(command[1:])):
output = open(cwd + "/helps/" + " ".join(command[1:])).read()
multiline(output, nick if sbconfig.cflagexist(channel, "-multiline") else channel)
else:
irc.send_irc(channel, "Either the command isn't documented yet or it doesn't exist lol")
except:
multiline(output, channel)
else:
irc.send_irc(channel, "tip, ping, whoami, perms, version, figlet, tdfiglet, cowsay, uptime, cc, joke, botsnack. (restart, join, part, shell, config, pull.) Use '$help (command)' for explanation.")
elif command[0] == "$tip":
system(["/usr/games/fortune", "debian-hints"], channel, True)
elif command[0] == "$whoami":
if perms != "":
irc.send_irc(channel, nick + ": " + username + " (Your cloak has permissions, use the 'perms' command to see)")
else:
irc.send_irc(channel, nick + ": " + username)
elif command[0] == "$welcome":
name = nick if not len(command) > 1 else " ".join(command[1:])
if random(1,2) == 1:
irc.send_irc(channel, f"Heyo! Welcome to the channel, {name}")
else:
irc.send_irc(channel, f"Welcome, {name}")
elif command[0] == "$perms":
if len(command) == 2:
if sbconfig.perms(command[1]) != "":
irc.send_irc(channel, nick + ": permissions of cloak " + command[1] + ": " + sbconfig.perms(command[1]))
else:
irc.send_irc(channel, nick + ": The cloak " + command[1] + " Doesn't have permissions.")
else:
if perms != "":
irc.send_irc(channel, nick + ": " + perms)
else:
irc.send_irc(channel, nick + ": none")
elif command[0] == "$restart":
if perms == "full" or "reboot" in perms.split(",") or "restart" in perms.split(","):
irc.send_irc(channel, nick + ": Restarting bot...")
sleep(1)
restart()
else:
irc.send_irc(channel, nick + ": Permission denied.")
block+=1
run-=1
elif command[0] == "$pull":
if perms == "full" or "git" in perms.split(","):
ossystem("git fetch")
ossystem("git pull")
irc.send_irc(channel, nick + ": Pulled from Git, restarting bot...")
sleep(1)
ossystem("sudo systemctl restart sweebot")
else:
irc.send_irc(channel, nick + ": Permission denied.")
block+=1
run-=1
elif command[0] == "$join":
if perms == "full" or "join" in perms.split(","):
if len(command) == 2:
irc.send_irc(channel, nick + ": Joining " + command[1])
irc.irc_socket.send(bytes("JOIN " + command[1] + "\n", "UTF-8"))
irc2.irc_socket.send(bytes("JOIN " + command[1] + "\n", "UTF-8"))
irc3.irc_socket.send(bytes("JOIN " + command[1] + "\n", "UTF-8"))
multiline(channel=command[1], text="A wild bot appears! ($join initiated by " + nick + ")")
else:
if len(command) == 1:
irc.send_irc(channel, nick + ": I NEED A CHANNEL NAME TO JOIN!!!")
else:
irc.send_irc(channel, nick + ": Y'know this only uses one argument? JUST THE CHANNEL NAME!!!")
else:
irc.send_irc(channel, nick + ": Permission denied.")
block+=1
run-=1
elif command[0] == "$part":
if perms == "full" or "part" in perms.split(","):
if len(command) == 2:
irc.send_irc(channel, nick + ": Parting " + command[1])
irc.irc_socket.send(bytes("PART " + command[1] + "\n", "UTF-8"))
irc2.irc_socket.send(bytes("PART " + command[1] + "\n", "UTF-8"))
irc3.irc_socket.send(bytes("PART " + command[1] + "\n", "UTF-8"))
else:
if len(command) == 1:
irc.send_irc(channel, "Bye bye!")
sleep(1)
irc.irc_socket.send(bytes("PART " + channel + "\n", "UTF-8"))
irc2.irc_socket.send(bytes("PART " + channel + "\n", "UTF-8"))
irc3.irc_socket.send(bytes("PART " + channel + "\n", "UTF-8"))
else:
irc.send_irc(channel, nick + ": Y'know this only uses one or zero arguments?")
else:
irc.send_irc(channel, nick + ": Permission denied.")
block+=1
run-=1
elif command[0] == "$socket":
if perms == "full":
if len(command) > 1:
irc.send_irc(channel, nick + ": Sending to socket...")
irc.irc_socket.send(bytes(" ".join(command[1:]) + "\n", "UTF-8"))
irc2.irc_socket.send(bytes(" ".join(command[1:]) + "\n", "UTF-8"))
irc3.irc_socket.send(bytes(" ".join(command[1:]) + "\n", "UTF-8"))
else:
irc.send_irc(channel, nick + ": what to send to the server?")
else:
irc.send_irc(channel, nick + ": Permission denied.")
block+=1
run-=1
elif command[0] == "$patpat" or command[0] == "$headpat":
if len(command) == 2:
for i in range(0,int(command[1])):
happiness += (1 if happiness != 10 else 0)
elif len(command) == 1:
happiness += (1 if happiness != 10 else 0)
multiline(choice(pats) + " (Emotion: Upset " +
("#" * int(happiness)) +
("-" * int(10-happiness)) + " Happy)",
channel)
elif command[0] == "$emotion":
multiline("Emotion: Upset " +
("#" * int(happiness)) +
("-" * int(10-happiness)) + " Happy",
channel)
elif command[0] == "$bap":
if len(command) == 2:
for i in range(0,int(command[1])):
happiness -= (1 if happiness != 0 else 0)
elif len(command) == 1:
happiness -= (1 if happiness != 0 else 0)
multiline(choice(baps) + " (Emotion: Upset " +
("#" * int(happiness)) +
("-" * int(10-happiness)) + " Happy)",
channel)
elif command[0] == "$meow":
if random(1,100) > (happiness*10):
multiline(choice(meows_upset), channel)
else:
multiline(choice(meows_happy), channel)
elif command[0] == "$stats":
multiline(f"{nick}: {run} command(s) run, {block} refused, and {parsed} link(s) parsed, check out https://irc-bot.swee.codes", channel)
elif command[0] == "$shell":
if perms == "full":
#or "shell" in perms.split(",")
if len(command) > 1:
try:
system(command[1:], channel)
except Exception as ex:
multiline(traceback.format_exc(), channel)
else:
irc.send_irc(channel, nick + ": What do you want me to F-ing run in the shell?")
else:
irc.send_irc(channel, nick + ": Permission denied.")
block+=1
run-=1
elif command[0] == "$version":
irc.send_irc(channel, "This is SweeBot " + __version__)
elif command[0] == "$figlet":
if not sbconfig.cflagexist(channel, "-multiline"):
if len(command) > 1:
try:
system(['figlet'] + command[1:], channel)
except Exception as ex:
irc.send_irc(channel, nick + ": " + ex.__class__.__name__)
else:
irc.send_irc(channel, nick + ": What text should I enlarge?")
else:
multiline("Multiline commands are disabled in this channel.", channel)
elif command[0] == "$tdfiglet":
if not sbconfig.cflagexist(channel, "-multiline"):
if len(command) > 1:
try:
system(['tdfiglet', '-cm', "-r"] + command[1:], channel)
except Exception as ex:
irc.send_irc(channel, nick + ": " + ex.__class__.__name__)
else:
irc.send_irc(channel, nick + ": What text should I enlarge?")
else:
multiline("Multiline commands are disabled in this channel.", channel)
elif command[0] == "$cowsay":
if not sbconfig.cflagexist(channel, "-multiline"):
if len(command) > 1:
try:
system(['/usr/games/cowsay'] + command[1:], channel)
except Exception as ex:
irc.send_irc(channel, nick + ": " + ex.__class__.__name__)
else:
irc.send_irc(channel, nick + ": What should the cow say?")
else:
multiline("Multiline commands are disabled in this channel.", channel)
elif command[0] == "$uptime":
try:
system(['uptime', '-p'], channel)
except Exception as ex:
irc.send_irc(channel, nick + ": " + ex.__class__.__name__)
elif command[0] == "$joke":
try:
system(['pyjoke'] + command[1:], channel)
except Exception as ex:
multiline(traceback.format_exc(), channel)
elif command[0] == "$botsnack":
multiline(f"\x01ACTION eats some {choice(snacks)}\x01", channel)
elif command[0] == "$hug":
name = nick if not len(command) > 1 else " ".join(command[1:])
multiline(f"\x01ACTION gives a hug to {name}\x01", channel)
elif command[0] == "$sed":
if len(command) > 1:
try:
#if True:
raise Exception("WorkInProgress!!!")
except:
print(traceback.format_exc())
else:
irc.send_irc(channel, nick + ": What to correct?")
elif command[0] == "$config":
try:
if len(command) > 1:
if perms == "full":
abort = False
flag = sbconfig.chansettings(channel)
for i in command[1:]:
if i == "clear":
flag = ["+config"]
elif i[0] == "+":
if "-" + i[1:] in flag:
flag[flag.index("-" + i[1:])] = i
else:
flag.append(i)
elif i[0] == "-":
if "+" + i[1:] in flag:
flag[flag.index("+" + i[1:])] = i
else:
flag.append(i)
else:
multiline(nick + ": flag name should begin with + or -", channel)
abort = True
break
if not abort:
print(flag)
sbconfig.setchanconfig(channel, flag)
multiline(nick + ": Successfuly applied configuration: " + " ".join(flag), channel)
else:
irc.send_irc(channel, nick + ": Permission denied")
block+=1
run-=1
else:
multiline("Configuration for " + channel + ": " + " ".join(sbconfig.chansettings(channel)), channel)
except ex:
multiline(str(ex), channel)
else:
if ".." in command[0][:3]:
irc.send_irc(channel, nick + ": Nice try.")
block+=1
run-=1
else:
if path.isfile("cc/" + command[0][1:]):
try:
if sbconfig.cflagexist(channel, "-multiline") and "#require-multiline" in open("cc/" + command[0][1:]).read():
irc.send_irc(channel, "Multiline commands are disabled in this channel.")
else:
com = ['python3']
comm = command
com.append("cc/" + comm[0][1:])
com.append(nick)
com.append(username)
comm.pop(0)
for i in comm:
com.append(i)
system(com, channel)
except Exception as ex:
irc.send_irc(channel, nick + ": " + ex.__class__.__name__)
elif path.isdir("cc/" + command[0][1:]):
irc.send_irc(channel, nick + ": Command list under cc/debug: " + ", ".join(listdir("cc/" + command[0][1:])) + ".")
else:
print(nick + ": Unrecognised command")
elif command[0] == ":3":
if not sbconfig.cflagexist(channel, "-colonthree"):
irc.send_irc(channel, ":3")
elif ":3c" in command:
if not sbconfig.cflagexist(channel, "-colonthree"):
multiline(channel, choice(threes))
# try to parse and find a link
if sbconfig.cflagexist(channel, "+links"):
try:
for i in command:
if i[:8] == "https://":
try:
e = get(i, headers=headers, timeout=10)
header = e.headers
content_type = header.get('content-type').split(";")[0]
content_len = header.get('Content-length')
if content_type in allowedparse:
if e.ok:
soup = BeautifulSoup(e.text, 'html.parser')
multiline("(" + nick + ") " + (" ".join(soup.title.string.splitlines())[:100] if soup.title != None else "[No title provided]"), channel)
else:
multiline("(" + nick + ") [HTTP " + str(e.status_code) + "]", channel)
else:
multiline("(" + nick + ") [" + humanbytes(content_len) + " " + str(content_type) + "]", channel)
parsed += 1
except rex.SSLError as ex:
multiline("(" + nick + ") [SSL Error: " + str(ex.message) + "]", channel)
except Exception as ex:
multiline("(" + nick + ") [Request error: " + str(ex.message) + "]", channel)
elif i[:7] == "http://":
e = get(i, headers=headers, timeout=10)
header = e.headers
content_type = header.get('content-type').split(";")[0]
content_len = header.get('Content-length')
if content_type in allowedparse:
if e.ok:
soup = BeautifulSoup(e.text, 'html.parser')
multiline("(" + nick + ") " + (" ".join(soup.title.string.splitlines())[:100] if soup.title != None else "[No title provided]"), channel)
else:
multiline("(" + nick + ") [HTTP " + str(e.status_code) + "]", channel)
else:
multiline("(" + nick + ") [" + humanbytes(content_len) + " " + str(content_type) + "]", channel)
parsed += 1
except:
print(traceback.format_exc())
elif "JOIN" in text and "#nixsanctuary" in text:
nick = text.split(":")[1].split("!")[0]
if not "Meow" in nick:
irc.send_irc("##hiya", "hiya: " + nick + " has joined #nixsanctuary")
pass
except Exception as ex:
print(traceback.format_exc())