From 2144d8cd5b0088a2e5f7f7bc379919d0b5b2eef9 Mon Sep 17 00:00:00 2001 From: Swee Date: Tue, 10 Sep 2024 21:47:28 -0700 Subject: [PATCH] Create sweebot.py --- sweebot.py | 523 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 523 insertions(+) create mode 100644 sweebot.py diff --git a/sweebot.py b/sweebot.py new file mode 100644 index 0000000..05b5a19 --- /dev/null +++ b/sweebot.py @@ -0,0 +1,523 @@ +import socket +import subprocess +from time import sleep, time, ctime +from os import system as ossystem, path, environ, listdir +import re +from random import choice, randint as random +import traceback +import threading +import sqlite3 +conn = sqlite3.connect("/home/swee/sbirc.db") +database = conn.cursor() +# Code snippet from my buddy Irish +# Define a dictionary to map terminal color codes to IRC color codes +color_map = { + '\033[30m': '\x0301', # Black + '\033[31m': '\x0305', # Red + '\033[32m': '\x0303', # Green + '\033[33m': '\x0307', # Yellow + '\033[34m': '\x0302', # Blue + '\033[35m': '\x0306', # Magenta + '\033[36m': '\x0310', # Cyan + '\033[37m': '\x0315', # White + '\033[90m': '\x0314', # Bright Black (Gray) + '\033[91m': '\x0304', # Bright Red + '\033[92m': '\x0309', # Bright Green + '\033[93m': '\x0308', # Bright Yellow + '\033[94m': '\x0312', # Bright Blue + '\033[95m': '\x0313', # Bright Magenta + '\033[96m': '\x0311', # Bright Cyan + '\033[97m': '\x0316', # Bright White + '\033[0m': '\x03', # Reset +} +pattern = re.compile(r'\033\[\d+(;\d+)*m|\033\[\?25[lh]|\033\[\?7[lh]|\033\[\d+C|\033\[\d+A|\033\[\d+D|\033\[\d+B') + +def replace_color_codes(text): + def replacer(match): + code = match.group(0) + if code in color_map: + return color_map[code] + elif re.match(r'\033\[\d+C', code): + # Handle cursor move right (e.g., \033[30C) + spaces = int(re.findall(r'\d+', code)[0]) + return ' ' * spaces + elif re.match(r'\033\[\d+A', code) or re.match(r'\033\[\d+B', code): + # Handle cursor move up (e.g., \033[17A) and down (e.g., \033[17B) + return '' + elif re.match(r'\033\[\d+D', code): + # Handle cursor move to the start of the line (e.g., \033[9999999D) + return '' + elif re.match(r'\033\[\?25[lh]', code): + # Handle cursor visibility (e.g., \033[?25l or \033[?25h) + return '' + elif re.match(r'\033\[\?7[lh]', code): + # Handle line wrapping (e.g., \033[?7l or \033[?7h) + return '' + else: + return '' + + # Remove all control sequences + cleaned_text = pattern.sub(replacer, text) + + # Split the text into lines and remove lines that contain control sequences or are empty + lines = cleaned_text.split('\n') + lines = [line for line in lines if line.strip() and not re.search(r'\033\[m', line)] + + # Join the cleaned lines back into a single string + return '\n'.join(lines) + +class bot_irc: + + irc_socket = socket.socket() + + def __init__(self): + self.irc_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + def send_irc(self, channel, msg): + self.irc_socket.send(bytes("PRIVMSG " + channel + " :" + msg + " \n", "UTF-8")) + + def connect_irc(self, server, port, channel, bot_nick, bot_pass, bot_nickpass): + print("Server connection: " + server) + self.irc_socket.connect((server, port)) + + self.irc_socket.send( + bytes( + "USER " + bot_nick + " " + bot_nick + " " + bot_nick + " :SweeBot, a very cool bot made by Swee :3\n", + "UTF-8", + ) + ) + self.irc_socket.send(bytes("NICK " + bot_nick + "\n", "UTF-8")) + self.irc_socket.send( + bytes("PASS " + bot_nickpass + ":" + bot_pass + "\n", "UTF-8") + ) + for i in channel: + self.irc_socket.send(bytes("JOIN " + i + "\n", "UTF-8")) + + def response_irc(self): + try: + r = self.irc_socket.recv(2040).decode() + if r.find("PING") != -1: + self.irc_socket.send( + bytes("PONG " + r.split()[1] + "\r\n", "UTF-8") + ) + return r + except: + return "" + def ping(self): + try: + r = self.irc_socket.recv(2040).decode() + if r.find("PING") != -1: + self.irc_socket.send( + bytes("PONG " + r.split()[1] + "\r\n", "UTF-8") + ) + except: + pass + + +server_irc = "127.0.0.1" # Use 127.0.0.1 for local ZNC +port_irc = 6667 # NO SSL FOR YOU +channel_irc = ["##sweezero"] +botnick_irc = environ.get('SBnick') +botnickpass_irc =environ.get('SBuser') +botpass_irc = environ.get('SBpass') +irc = bot_irc() +irc2 = bot_irc() +irc3 = bot_irc() +irc.connect_irc( + server_irc, port_irc, channel_irc, botnick_irc, botpass_irc, botnickpass_irc +) +threes = [":3", ":3c", "uwu", "owo", "/ᐠ。ꞈ。ᐟ\\", "(✿◠‿◠)"] +snacks = ["PepperMintHTMLs", "Dice App candies", "SweeCrypt codes", "Windows 11 Bloomberry flavored icecream"] +run = 0 +block = 0 +global times +times = 0 +def update(): + open("/home/swee/run.txt", 'w').write(str(run)) + open("/home/swee/block.txt", 'w').write(str(block)) +def getperms(cloak: str): + try: + database.execute("SELECT * FROM users WHERE username = '"+ cloak + "';") + output = database.fetchall() + return output[0][1] + except: + return '' + +update() +def irci2(): + irc2.connect_irc( + server_irc, port_irc, channel_irc, "sweeB0t", botpass_irc, botnickpass_irc + "/secondary" + ) + while True: + irc2.ping() +def irci3(): + irc3.connect_irc( + server_irc, port_irc, channel_irc, "swe3Bot", botpass_irc, botnickpass_irc + "/tertiary" + ) + while True: + irc3.ping() +def multiline(text, channel): + global times + if text.__class__ == bytes: + text = text.decode() + for t in text.split("\n"): + if t != "" and t != " ": + if times > 8: + if times == 9: + sleep(1) + irc3.send_irc(channel, replace_color_codes(t)) + times +=1 + if times == 13: + times = 0 + elif times > 4: + if times == 5: + sleep(1) + irc2.send_irc(channel, replace_color_codes(t)) + times +=1 + else: + if times==0: + sleep(1) + irc.send_irc(channel, replace_color_codes(t)) + times += 1 +def system(cmd, chan): + try: + p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + for line in iter(p.stdout.readline, b''): + if text != "" and text != " ": + multiline(line.rstrip(), chan) + p.stdout.close() + p.wait() + except FileNotFoundError: + multiline(cmd[0] + " not found", chan) + except: + multiline(traceback.format_exc()) +irl2 = threading.Thread(target=irci2) +irl2.start() +irl3 = threading.Thread(target=irci3) +irl3.start() +while True: + teext = irc.response_irc().split("\r\n") + for text in teext: + if text != "": + try: + print(text) + except: + print("") + try: + command = text.split("PRIVMSG")[1].split(":")[1:] + command = ":".join(command).split(" ") + channel = text.split("PRIVMSG")[1].split(" ")[1] + nick = text.split(" ")[0][1:].split("!")[0] + username = text.split(" ")[0][1:].split("@")[1] + try: + perms = getperms(username) + except: + perms = "" + print(command) + print(channel) + print(nick) + print(username) + print(perms) + #open("log-text-"+channel, "a").write(" ".join(command) + "\n") + #open("log-name-"+channel, "a").write(nick + "\n") + except: + #print(traceback.format_exc()) + pass + try: + #if True: + if "PRIVMSG" in text: + if "PRIVMSG" in text and command[0][0] == "$": + run+=1 + update() + if command[0] == "$ping": + if random(1,2) == 1: + irc.send_irc(channel, nick + ": Pnog") + else: + irc.send_irc(channel, nick + ": Pong") + + elif command[0] == "$help": + if len(command) > 1: + if "." in " ".join(command[1:]): + irc.send_irc(channel, nick + ": Nice try") + block+=1 + run-=1 + update() + else: + if path.isfile("/home/swee/helps/" + " ".join(command[1:])): + output = open("/home/swee/helps/" + " ".join(command[1:])).read().split("\n") + multiline(output) + else: + irc.send_irc(channel, "Either the command isn't documented yet or it doesn't exist lol") + else: + irc.send_irc(channel, "tip, ping, whoami, perms, version, figlet, tdfiglet, cowsay, uptime, cc, joke. (restart, join, part, shell, config.) Use '$help (command)' for explanation.") + + elif command[0] == "$tip": + result = subprocess.run(['fortune'], stdout=subprocess.PIPE) + output = result.stdout.decode('utf-8').split("\n") + multiline(output) + + elif command[0] == "$whoami": + if perms != "": + irc.send_irc(channel, nick + ": " + username + " (Your cloak has permissions, use the 'perms' command to see)") + else: + irc.send_irc(channel, nick + ": " + username) + + elif command[0] == "$perms": + if len(command) == 2: + irc.send_irc(channel, nick + ": permissions of cloak " + command[1] + ": " + getperms(command[1])) + else: + if perms != "": + irc.send_irc(channel, nick + ": " + perms) + else: + irc.send_irc(channel, nick + ": none") + + elif command[0] == "$restart": + if perms == "full" or "reboot" in perms.split(",") or "restart" in perms.split(","): + irc.send_irc(channel, nick + ": Restarting bot...") + sleep(1) + ossystem("sudo systemctl restart sweebot") + else: + irc.send_irc(channel, nick + ": Permission denied.") + block+=1 + run-=1 + update() + + elif command[0] == "$join": + if perms == "full" or "join" in perms.split(","): + if len(command) == 2: + irc.send_irc(channel, nick + ": Joining " + command[1]) + irc.irc_socket.send(bytes("JOIN " + command[1] + "\n", "UTF-8")) + irc2.irc_socket.send(bytes("JOIN " + command[1] + "\n", "UTF-8")) + irc3.irc_socket.send(bytes("JOIN " + command[1] + "\n", "UTF-8")) + else: + if len(command) == 1: + irc.send_irc(channel, nick + ": I NEED A CHANNEL NAME TO JOIN!!!") + else: + irc.send_irc(channel, nick + ": Y'know this only uses one argument? JUST THE CHANNEL NAME!!!") + else: + irc.send_irc(channel, nick + ": Permission denied.") + block+=1 + run-=1 + update() + + elif command[0] == "$part": + if perms == "full" or "part" in perms.split(","): + if len(command) == 2: + irc.send_irc(channel, nick + ": Parting " + command[1]) + irc.irc_socket.send(bytes("PART " + command[1] + "\n", "UTF-8")) + irc2.irc_socket.send(bytes("PART " + command[1] + "\n", "UTF-8")) + irc3.irc_socket.send(bytes("PART " + command[1] + "\n", "UTF-8")) + else: + if len(command) == 1: + irc.send_irc(channel, "Bye bye!") + sleep(1) + irc.irc_socket.send(bytes("PART " + channel + "\n", "UTF-8")) + irc2.irc_socket.send(bytes("PART " + channel + "\n", "UTF-8")) + irc3.irc_socket.send(bytes("PART " + channel + "\n", "UTF-8")) + else: + irc.send_irc(channel, nick + ": Y'know this only uses one or zero arguments?") + else: + irc.send_irc(channel, nick + ": Permission denied.") + block+=1 + run-=1 + update() + + elif command[0] == "$shell": + if perms == "full": + #or "shell" in perms.split(",") + if len(command) > 1: + try: + system(command[1:], channel) + except Exception as ex: + multiline(traceback.format_exc(), channel) + else: + irc.send_irc(channel, nick + ": What do you want me to F-ing run in the shell?") + else: + irc.send_irc(channel, nick + ": Permission denied.") + block+=1 + run-=1 + update() + + elif command[0] == "$version": + irc.send_irc(channel, "This is SweeBot 0.0.1 patch 3") + + elif command[0] == "$figlet": + if len(command) > 1: + try: + system(['figlet'] + command[1:], channel) + except Exception as ex: + irc.send_irc(channel, nick + ": " + ex.__class__.__name__) + else: + irc.send_irc(channel, nick + ": What text should I enlarge?") + + elif command[0] == "$tdfiglet": + if len(command) > 1: + try: + system(['tdfiglet', '-cm', "-r"] + command[1:], channel) + except Exception as ex: + irc.send_irc(channel, nick + ": " + ex.__class__.__name__) + else: + irc.send_irc(channel, nick + ": What text should I enlarge?") + + elif command[0] == "$cowsay": + if len(command) > 1: + try: + system(['cowsay'] + command[1:], channel) + except Exception as ex: + irc.send_irc(channel, nick + ": " + ex.__class__.__name__) + else: + irc.send_irc(channel, nick + ": What should the cow say?") + + elif command[0] == "$uptime": + try: + system(['uptime -p'], channel) + except Exception as ex: + irc.send_irc(channel, nick + ": " + ex.__class__.__name__) + + elif command[0] == "$joke": + try: + system(['pyjoke'] + command[1:], channel) + except Exception as ex: + multiline(traceback.format_exc(), channel) + + elif command[0] == "$botsnack": + multiline(f"\x01ACTION eats some {choice(snacks)}\x01", channel) + + + #lif command[0] == "$sed": + # if len(command) > 1: + #try: + # if True: + # com = ['sed',"" + command[1] + ""] + # print(command[1]) + # texte = open("log-text-"+channel).read().split("\n") + # texte.reverse() + # texte.pop(0) + # texte.pop(0) + # print(texte) + # texttoreplace = "" + # j = 0 + # broken = False + # for i in texte: + # if command[1].split("/")[1] in i: + # texttoreplace = i + # broken = True + # break + # j+=1 + # if not broken: + # irc.send_irc(channel, "Unable to correct: '" + command[1].split("/")[1] + "' not found in any chat message.") + # continue + # namee = open("log-name-"+channel).read().split("\n") + # namee.reverse() + # namee.pop(0) + # namee.pop(0) + # namee = namee[j] + # open("temp","w").write(texttoreplace) + # com.append("temp") + # print(com) + # result = subprocess.run(com, stdout=subprocess.PIPE) + # output = result.stdout.decode('utf-8').split("\n") + # irc.send_irc(channel, "Correction using sed: <" + namee + "> " + output[0]) + # #except Exception as ex: + # #irc.send_irc(channel, nick + ": " + ex.__class__.__name__) + # else: + # irc.send_irc(channel, nick + ": What to correct?") + + + elif command[0] == "$config": + if perms == "full": + try: + #if True: + if len(command) > 2: + if command[1] == "colonthree": + file = open("/home/swee/chans").read().split("\n") + file2 = open("/home/swee/chanconfig").read().split("\n") + + if command[2] == "1": + if not channel in file: + file.append(channel) + print(file) + file2.append("100000000") + print(file2) + else: + file2[file.index(channel)]="1" + file2[file.index(channel)][1:] + print("\n".join(file)) + open("/home/swee/chans", 'w').write("\n".join(file)) + + open("/home/swee/chanconfig", 'w').write("\n".join(file2)) + irc.send_irc(channel, nick + ": Config complete, I will now use the colonthree function in " + channel) + elif command[2] == "0": + if not channel in file: + file.append(channel) + file2.append("000000000") + else: + file2[file.index(channel)]="0" + file2[file.index(channel)][1:] + open("/home/swee/chans", 'w').write("\n".join(file)) + open("/home/swee/chanconfig", 'w').write("\n".join(file2)) + irc.send_irc(channel, nick + ": Config complete, I will no longer use the colonthree function in " + channel) + else: + irc.send_irc(channel, nick + ": Values are: 1, 0") + else: + irc.send_irc(channel, nick + ": Not enough arguments") + except Exception as ex: + irc.send_irc(channel, nick + ": " + ex.__class__.__name__) + else: + irc.send_irc(channel, nick + ": Permission denied") + block+=1 + run-=1 + update() + + + + + else: + if command[0][:3] == "$.." or command[0][:2] == "$/": + irc.send_irc(channel, nick + ": Nice try.") + block+=1 + run-=1 + update() + else: + if path.isfile("/home/swee/cc/" + command[0][1:]): + try: + com = ['python3'] + comm = command + com.append("/home/swee/cc/" + comm[0][1:]) + com.append(nick) + com.append(username) + comm.pop(0) + for i in comm: + com.append(i) + system(com, channel) + except Exception as ex: + irc.send_irc(channel, nick + ": " + ex.__class__.__name__) + elif path.isdir("/home/swee/cc/" + command[0][1:]): + irc.send_irc(channel, nick + ": Command list under cc/debug:") + irc.send_irc(channel, ", ".join(listdir("/home/swee/cc/" + command[0][1:])) + ".") + + + else: + print(nick + ": Unrecognised command") + + + elif command[0] == ":3": + if channel in open("/home/swee/chans").read().split("\n"): + print() + if open("/home/swee/chanconfig").read().split("\n")[open("/home/swee/chans").read().split("\n").index(channel)][0] == "1": + irc.send_irc(channel, ":3") + else: + irc.send_irc(channel, ":3") + elif ":3c" in command: + if channel in open("/home/swee/chans").read().split("\n"): + print() + if open("/home/swee/chanconfig").read().split("\n")[open("/home/swee/chans").read().split("\n").index(channel)][0] == "1": + irc.send_irc(channel, choice(threes)) + else: + irc.send_irc(channel, choice(threes)) + elif "JOIN" in text and "#nixsanctuary" in text: + nick = text.split(":")[1].split("!")[0] + if nick != "sweeBot": + irc.send_irc("##hiya", "hiya: " + nick + " has joined #nixsanctuary") + pass + + except Exception as ex: + print(traceback.format_exc())