Create sweebot.py
This commit is contained in:
parent
8ac8dcb0f3
commit
2144d8cd5b
1 changed files with 523 additions and 0 deletions
523
sweebot.py
Normal file
523
sweebot.py
Normal file
|
@ -0,0 +1,523 @@
|
|||
import socket
|
||||
import subprocess
|
||||
from time import sleep, time, ctime
|
||||
from os import system as ossystem, path, environ, listdir
|
||||
import re
|
||||
from random import choice, randint as random
|
||||
import traceback
|
||||
import threading
|
||||
import sqlite3
|
||||
conn = sqlite3.connect("/home/swee/sbirc.db")
|
||||
database = conn.cursor()
|
||||
# Code snippet from my buddy Irish
|
||||
# Define a dictionary to map terminal color codes to IRC color codes
|
||||
color_map = {
|
||||
'\033[30m': '\x0301', # Black
|
||||
'\033[31m': '\x0305', # Red
|
||||
'\033[32m': '\x0303', # Green
|
||||
'\033[33m': '\x0307', # Yellow
|
||||
'\033[34m': '\x0302', # Blue
|
||||
'\033[35m': '\x0306', # Magenta
|
||||
'\033[36m': '\x0310', # Cyan
|
||||
'\033[37m': '\x0315', # White
|
||||
'\033[90m': '\x0314', # Bright Black (Gray)
|
||||
'\033[91m': '\x0304', # Bright Red
|
||||
'\033[92m': '\x0309', # Bright Green
|
||||
'\033[93m': '\x0308', # Bright Yellow
|
||||
'\033[94m': '\x0312', # Bright Blue
|
||||
'\033[95m': '\x0313', # Bright Magenta
|
||||
'\033[96m': '\x0311', # Bright Cyan
|
||||
'\033[97m': '\x0316', # Bright White
|
||||
'\033[0m': '\x03', # Reset
|
||||
}
|
||||
pattern = re.compile(r'\033\[\d+(;\d+)*m|\033\[\?25[lh]|\033\[\?7[lh]|\033\[\d+C|\033\[\d+A|\033\[\d+D|\033\[\d+B')
|
||||
|
||||
def replace_color_codes(text):
|
||||
def replacer(match):
|
||||
code = match.group(0)
|
||||
if code in color_map:
|
||||
return color_map[code]
|
||||
elif re.match(r'\033\[\d+C', code):
|
||||
# Handle cursor move right (e.g., \033[30C)
|
||||
spaces = int(re.findall(r'\d+', code)[0])
|
||||
return ' ' * spaces
|
||||
elif re.match(r'\033\[\d+A', code) or re.match(r'\033\[\d+B', code):
|
||||
# Handle cursor move up (e.g., \033[17A) and down (e.g., \033[17B)
|
||||
return ''
|
||||
elif re.match(r'\033\[\d+D', code):
|
||||
# Handle cursor move to the start of the line (e.g., \033[9999999D)
|
||||
return ''
|
||||
elif re.match(r'\033\[\?25[lh]', code):
|
||||
# Handle cursor visibility (e.g., \033[?25l or \033[?25h)
|
||||
return ''
|
||||
elif re.match(r'\033\[\?7[lh]', code):
|
||||
# Handle line wrapping (e.g., \033[?7l or \033[?7h)
|
||||
return ''
|
||||
else:
|
||||
return ''
|
||||
|
||||
# Remove all control sequences
|
||||
cleaned_text = pattern.sub(replacer, text)
|
||||
|
||||
# Split the text into lines and remove lines that contain control sequences or are empty
|
||||
lines = cleaned_text.split('\n')
|
||||
lines = [line for line in lines if line.strip() and not re.search(r'\033\[m', line)]
|
||||
|
||||
# Join the cleaned lines back into a single string
|
||||
return '\n'.join(lines)
|
||||
|
||||
class bot_irc:
|
||||
|
||||
irc_socket = socket.socket()
|
||||
|
||||
def __init__(self):
|
||||
self.irc_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
|
||||
def send_irc(self, channel, msg):
|
||||
self.irc_socket.send(bytes("PRIVMSG " + channel + " :" + msg + " \n", "UTF-8"))
|
||||
|
||||
def connect_irc(self, server, port, channel, bot_nick, bot_pass, bot_nickpass):
|
||||
print("Server connection: " + server)
|
||||
self.irc_socket.connect((server, port))
|
||||
|
||||
self.irc_socket.send(
|
||||
bytes(
|
||||
"USER " + bot_nick + " " + bot_nick + " " + bot_nick + " :SweeBot, a very cool bot made by Swee :3\n",
|
||||
"UTF-8",
|
||||
)
|
||||
)
|
||||
self.irc_socket.send(bytes("NICK " + bot_nick + "\n", "UTF-8"))
|
||||
self.irc_socket.send(
|
||||
bytes("PASS " + bot_nickpass + ":" + bot_pass + "\n", "UTF-8")
|
||||
)
|
||||
for i in channel:
|
||||
self.irc_socket.send(bytes("JOIN " + i + "\n", "UTF-8"))
|
||||
|
||||
def response_irc(self):
|
||||
try:
|
||||
r = self.irc_socket.recv(2040).decode()
|
||||
if r.find("PING") != -1:
|
||||
self.irc_socket.send(
|
||||
bytes("PONG " + r.split()[1] + "\r\n", "UTF-8")
|
||||
)
|
||||
return r
|
||||
except:
|
||||
return ""
|
||||
def ping(self):
|
||||
try:
|
||||
r = self.irc_socket.recv(2040).decode()
|
||||
if r.find("PING") != -1:
|
||||
self.irc_socket.send(
|
||||
bytes("PONG " + r.split()[1] + "\r\n", "UTF-8")
|
||||
)
|
||||
except:
|
||||
pass
|
||||
|
||||
|
||||
server_irc = "127.0.0.1" # Use 127.0.0.1 for local ZNC
|
||||
port_irc = 6667 # NO SSL FOR YOU
|
||||
channel_irc = ["##sweezero"]
|
||||
botnick_irc = environ.get('SBnick')
|
||||
botnickpass_irc =environ.get('SBuser')
|
||||
botpass_irc = environ.get('SBpass')
|
||||
irc = bot_irc()
|
||||
irc2 = bot_irc()
|
||||
irc3 = bot_irc()
|
||||
irc.connect_irc(
|
||||
server_irc, port_irc, channel_irc, botnick_irc, botpass_irc, botnickpass_irc
|
||||
)
|
||||
threes = [":3", ":3c", "uwu", "owo", "/ᐠ。ꞈ。ᐟ\\", "(✿◠‿◠)"]
|
||||
snacks = ["PepperMintHTMLs", "Dice App candies", "SweeCrypt codes", "Windows 11 Bloomberry flavored icecream"]
|
||||
run = 0
|
||||
block = 0
|
||||
global times
|
||||
times = 0
|
||||
def update():
|
||||
open("/home/swee/run.txt", 'w').write(str(run))
|
||||
open("/home/swee/block.txt", 'w').write(str(block))
|
||||
def getperms(cloak: str):
|
||||
try:
|
||||
database.execute("SELECT * FROM users WHERE username = '"+ cloak + "';")
|
||||
output = database.fetchall()
|
||||
return output[0][1]
|
||||
except:
|
||||
return ''
|
||||
|
||||
update()
|
||||
def irci2():
|
||||
irc2.connect_irc(
|
||||
server_irc, port_irc, channel_irc, "sweeB0t", botpass_irc, botnickpass_irc + "/secondary"
|
||||
)
|
||||
while True:
|
||||
irc2.ping()
|
||||
def irci3():
|
||||
irc3.connect_irc(
|
||||
server_irc, port_irc, channel_irc, "swe3Bot", botpass_irc, botnickpass_irc + "/tertiary"
|
||||
)
|
||||
while True:
|
||||
irc3.ping()
|
||||
def multiline(text, channel):
|
||||
global times
|
||||
if text.__class__ == bytes:
|
||||
text = text.decode()
|
||||
for t in text.split("\n"):
|
||||
if t != "" and t != " ":
|
||||
if times > 8:
|
||||
if times == 9:
|
||||
sleep(1)
|
||||
irc3.send_irc(channel, replace_color_codes(t))
|
||||
times +=1
|
||||
if times == 13:
|
||||
times = 0
|
||||
elif times > 4:
|
||||
if times == 5:
|
||||
sleep(1)
|
||||
irc2.send_irc(channel, replace_color_codes(t))
|
||||
times +=1
|
||||
else:
|
||||
if times==0:
|
||||
sleep(1)
|
||||
irc.send_irc(channel, replace_color_codes(t))
|
||||
times += 1
|
||||
def system(cmd, chan):
|
||||
try:
|
||||
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
|
||||
for line in iter(p.stdout.readline, b''):
|
||||
if text != "" and text != " ":
|
||||
multiline(line.rstrip(), chan)
|
||||
p.stdout.close()
|
||||
p.wait()
|
||||
except FileNotFoundError:
|
||||
multiline(cmd[0] + " not found", chan)
|
||||
except:
|
||||
multiline(traceback.format_exc())
|
||||
irl2 = threading.Thread(target=irci2)
|
||||
irl2.start()
|
||||
irl3 = threading.Thread(target=irci3)
|
||||
irl3.start()
|
||||
while True:
|
||||
teext = irc.response_irc().split("\r\n")
|
||||
for text in teext:
|
||||
if text != "":
|
||||
try:
|
||||
print(text)
|
||||
except:
|
||||
print("")
|
||||
try:
|
||||
command = text.split("PRIVMSG")[1].split(":")[1:]
|
||||
command = ":".join(command).split(" ")
|
||||
channel = text.split("PRIVMSG")[1].split(" ")[1]
|
||||
nick = text.split(" ")[0][1:].split("!")[0]
|
||||
username = text.split(" ")[0][1:].split("@")[1]
|
||||
try:
|
||||
perms = getperms(username)
|
||||
except:
|
||||
perms = ""
|
||||
print(command)
|
||||
print(channel)
|
||||
print(nick)
|
||||
print(username)
|
||||
print(perms)
|
||||
#open("log-text-"+channel, "a").write(" ".join(command) + "\n")
|
||||
#open("log-name-"+channel, "a").write(nick + "\n")
|
||||
except:
|
||||
#print(traceback.format_exc())
|
||||
pass
|
||||
try:
|
||||
#if True:
|
||||
if "PRIVMSG" in text:
|
||||
if "PRIVMSG" in text and command[0][0] == "$":
|
||||
run+=1
|
||||
update()
|
||||
if command[0] == "$ping":
|
||||
if random(1,2) == 1:
|
||||
irc.send_irc(channel, nick + ": Pnog")
|
||||
else:
|
||||
irc.send_irc(channel, nick + ": Pong")
|
||||
|
||||
elif command[0] == "$help":
|
||||
if len(command) > 1:
|
||||
if "." in " ".join(command[1:]):
|
||||
irc.send_irc(channel, nick + ": Nice try")
|
||||
block+=1
|
||||
run-=1
|
||||
update()
|
||||
else:
|
||||
if path.isfile("/home/swee/helps/" + " ".join(command[1:])):
|
||||
output = open("/home/swee/helps/" + " ".join(command[1:])).read().split("\n")
|
||||
multiline(output)
|
||||
else:
|
||||
irc.send_irc(channel, "Either the command isn't documented yet or it doesn't exist lol")
|
||||
else:
|
||||
irc.send_irc(channel, "tip, ping, whoami, perms, version, figlet, tdfiglet, cowsay, uptime, cc, joke. (restart, join, part, shell, config.) Use '$help (command)' for explanation.")
|
||||
|
||||
elif command[0] == "$tip":
|
||||
result = subprocess.run(['fortune'], stdout=subprocess.PIPE)
|
||||
output = result.stdout.decode('utf-8').split("\n")
|
||||
multiline(output)
|
||||
|
||||
elif command[0] == "$whoami":
|
||||
if perms != "":
|
||||
irc.send_irc(channel, nick + ": " + username + " (Your cloak has permissions, use the 'perms' command to see)")
|
||||
else:
|
||||
irc.send_irc(channel, nick + ": " + username)
|
||||
|
||||
elif command[0] == "$perms":
|
||||
if len(command) == 2:
|
||||
irc.send_irc(channel, nick + ": permissions of cloak " + command[1] + ": " + getperms(command[1]))
|
||||
else:
|
||||
if perms != "":
|
||||
irc.send_irc(channel, nick + ": " + perms)
|
||||
else:
|
||||
irc.send_irc(channel, nick + ": none")
|
||||
|
||||
elif command[0] == "$restart":
|
||||
if perms == "full" or "reboot" in perms.split(",") or "restart" in perms.split(","):
|
||||
irc.send_irc(channel, nick + ": Restarting bot...")
|
||||
sleep(1)
|
||||
ossystem("sudo systemctl restart sweebot")
|
||||
else:
|
||||
irc.send_irc(channel, nick + ": Permission denied.")
|
||||
block+=1
|
||||
run-=1
|
||||
update()
|
||||
|
||||
elif command[0] == "$join":
|
||||
if perms == "full" or "join" in perms.split(","):
|
||||
if len(command) == 2:
|
||||
irc.send_irc(channel, nick + ": Joining " + command[1])
|
||||
irc.irc_socket.send(bytes("JOIN " + command[1] + "\n", "UTF-8"))
|
||||
irc2.irc_socket.send(bytes("JOIN " + command[1] + "\n", "UTF-8"))
|
||||
irc3.irc_socket.send(bytes("JOIN " + command[1] + "\n", "UTF-8"))
|
||||
else:
|
||||
if len(command) == 1:
|
||||
irc.send_irc(channel, nick + ": I NEED A CHANNEL NAME TO JOIN!!!")
|
||||
else:
|
||||
irc.send_irc(channel, nick + ": Y'know this only uses one argument? JUST THE CHANNEL NAME!!!")
|
||||
else:
|
||||
irc.send_irc(channel, nick + ": Permission denied.")
|
||||
block+=1
|
||||
run-=1
|
||||
update()
|
||||
|
||||
elif command[0] == "$part":
|
||||
if perms == "full" or "part" in perms.split(","):
|
||||
if len(command) == 2:
|
||||
irc.send_irc(channel, nick + ": Parting " + command[1])
|
||||
irc.irc_socket.send(bytes("PART " + command[1] + "\n", "UTF-8"))
|
||||
irc2.irc_socket.send(bytes("PART " + command[1] + "\n", "UTF-8"))
|
||||
irc3.irc_socket.send(bytes("PART " + command[1] + "\n", "UTF-8"))
|
||||
else:
|
||||
if len(command) == 1:
|
||||
irc.send_irc(channel, "Bye bye!")
|
||||
sleep(1)
|
||||
irc.irc_socket.send(bytes("PART " + channel + "\n", "UTF-8"))
|
||||
irc2.irc_socket.send(bytes("PART " + channel + "\n", "UTF-8"))
|
||||
irc3.irc_socket.send(bytes("PART " + channel + "\n", "UTF-8"))
|
||||
else:
|
||||
irc.send_irc(channel, nick + ": Y'know this only uses one or zero arguments?")
|
||||
else:
|
||||
irc.send_irc(channel, nick + ": Permission denied.")
|
||||
block+=1
|
||||
run-=1
|
||||
update()
|
||||
|
||||
elif command[0] == "$shell":
|
||||
if perms == "full":
|
||||
#or "shell" in perms.split(",")
|
||||
if len(command) > 1:
|
||||
try:
|
||||
system(command[1:], channel)
|
||||
except Exception as ex:
|
||||
multiline(traceback.format_exc(), channel)
|
||||
else:
|
||||
irc.send_irc(channel, nick + ": What do you want me to F-ing run in the shell?")
|
||||
else:
|
||||
irc.send_irc(channel, nick + ": Permission denied.")
|
||||
block+=1
|
||||
run-=1
|
||||
update()
|
||||
|
||||
elif command[0] == "$version":
|
||||
irc.send_irc(channel, "This is SweeBot 0.0.1 patch 3")
|
||||
|
||||
elif command[0] == "$figlet":
|
||||
if len(command) > 1:
|
||||
try:
|
||||
system(['figlet'] + command[1:], channel)
|
||||
except Exception as ex:
|
||||
irc.send_irc(channel, nick + ": " + ex.__class__.__name__)
|
||||
else:
|
||||
irc.send_irc(channel, nick + ": What text should I enlarge?")
|
||||
|
||||
elif command[0] == "$tdfiglet":
|
||||
if len(command) > 1:
|
||||
try:
|
||||
system(['tdfiglet', '-cm', "-r"] + command[1:], channel)
|
||||
except Exception as ex:
|
||||
irc.send_irc(channel, nick + ": " + ex.__class__.__name__)
|
||||
else:
|
||||
irc.send_irc(channel, nick + ": What text should I enlarge?")
|
||||
|
||||
elif command[0] == "$cowsay":
|
||||
if len(command) > 1:
|
||||
try:
|
||||
system(['cowsay'] + command[1:], channel)
|
||||
except Exception as ex:
|
||||
irc.send_irc(channel, nick + ": " + ex.__class__.__name__)
|
||||
else:
|
||||
irc.send_irc(channel, nick + ": What should the cow say?")
|
||||
|
||||
elif command[0] == "$uptime":
|
||||
try:
|
||||
system(['uptime -p'], channel)
|
||||
except Exception as ex:
|
||||
irc.send_irc(channel, nick + ": " + ex.__class__.__name__)
|
||||
|
||||
elif command[0] == "$joke":
|
||||
try:
|
||||
system(['pyjoke'] + command[1:], channel)
|
||||
except Exception as ex:
|
||||
multiline(traceback.format_exc(), channel)
|
||||
|
||||
elif command[0] == "$botsnack":
|
||||
multiline(f"\x01ACTION eats some {choice(snacks)}\x01", channel)
|
||||
|
||||
|
||||
#lif command[0] == "$sed":
|
||||
# if len(command) > 1:
|
||||
#try:
|
||||
# if True:
|
||||
# com = ['sed',"" + command[1] + ""]
|
||||
# print(command[1])
|
||||
# texte = open("log-text-"+channel).read().split("\n")
|
||||
# texte.reverse()
|
||||
# texte.pop(0)
|
||||
# texte.pop(0)
|
||||
# print(texte)
|
||||
# texttoreplace = ""
|
||||
# j = 0
|
||||
# broken = False
|
||||
# for i in texte:
|
||||
# if command[1].split("/")[1] in i:
|
||||
# texttoreplace = i
|
||||
# broken = True
|
||||
# break
|
||||
# j+=1
|
||||
# if not broken:
|
||||
# irc.send_irc(channel, "Unable to correct: '" + command[1].split("/")[1] + "' not found in any chat message.")
|
||||
# continue
|
||||
# namee = open("log-name-"+channel).read().split("\n")
|
||||
# namee.reverse()
|
||||
# namee.pop(0)
|
||||
# namee.pop(0)
|
||||
# namee = namee[j]
|
||||
# open("temp","w").write(texttoreplace)
|
||||
# com.append("temp")
|
||||
# print(com)
|
||||
# result = subprocess.run(com, stdout=subprocess.PIPE)
|
||||
# output = result.stdout.decode('utf-8').split("\n")
|
||||
# irc.send_irc(channel, "Correction using sed: <" + namee + "> " + output[0])
|
||||
# #except Exception as ex:
|
||||
# #irc.send_irc(channel, nick + ": " + ex.__class__.__name__)
|
||||
# else:
|
||||
# irc.send_irc(channel, nick + ": What to correct?")
|
||||
|
||||
|
||||
elif command[0] == "$config":
|
||||
if perms == "full":
|
||||
try:
|
||||
#if True:
|
||||
if len(command) > 2:
|
||||
if command[1] == "colonthree":
|
||||
file = open("/home/swee/chans").read().split("\n")
|
||||
file2 = open("/home/swee/chanconfig").read().split("\n")
|
||||
|
||||
if command[2] == "1":
|
||||
if not channel in file:
|
||||
file.append(channel)
|
||||
print(file)
|
||||
file2.append("100000000")
|
||||
print(file2)
|
||||
else:
|
||||
file2[file.index(channel)]="1" + file2[file.index(channel)][1:]
|
||||
print("\n".join(file))
|
||||
open("/home/swee/chans", 'w').write("\n".join(file))
|
||||
|
||||
open("/home/swee/chanconfig", 'w').write("\n".join(file2))
|
||||
irc.send_irc(channel, nick + ": Config complete, I will now use the colonthree function in " + channel)
|
||||
elif command[2] == "0":
|
||||
if not channel in file:
|
||||
file.append(channel)
|
||||
file2.append("000000000")
|
||||
else:
|
||||
file2[file.index(channel)]="0" + file2[file.index(channel)][1:]
|
||||
open("/home/swee/chans", 'w').write("\n".join(file))
|
||||
open("/home/swee/chanconfig", 'w').write("\n".join(file2))
|
||||
irc.send_irc(channel, nick + ": Config complete, I will no longer use the colonthree function in " + channel)
|
||||
else:
|
||||
irc.send_irc(channel, nick + ": Values are: 1, 0")
|
||||
else:
|
||||
irc.send_irc(channel, nick + ": Not enough arguments")
|
||||
except Exception as ex:
|
||||
irc.send_irc(channel, nick + ": " + ex.__class__.__name__)
|
||||
else:
|
||||
irc.send_irc(channel, nick + ": Permission denied")
|
||||
block+=1
|
||||
run-=1
|
||||
update()
|
||||
|
||||
|
||||
|
||||
|
||||
else:
|
||||
if command[0][:3] == "$.." or command[0][:2] == "$/":
|
||||
irc.send_irc(channel, nick + ": Nice try.")
|
||||
block+=1
|
||||
run-=1
|
||||
update()
|
||||
else:
|
||||
if path.isfile("/home/swee/cc/" + command[0][1:]):
|
||||
try:
|
||||
com = ['python3']
|
||||
comm = command
|
||||
com.append("/home/swee/cc/" + comm[0][1:])
|
||||
com.append(nick)
|
||||
com.append(username)
|
||||
comm.pop(0)
|
||||
for i in comm:
|
||||
com.append(i)
|
||||
system(com, channel)
|
||||
except Exception as ex:
|
||||
irc.send_irc(channel, nick + ": " + ex.__class__.__name__)
|
||||
elif path.isdir("/home/swee/cc/" + command[0][1:]):
|
||||
irc.send_irc(channel, nick + ": Command list under cc/debug:")
|
||||
irc.send_irc(channel, ", ".join(listdir("/home/swee/cc/" + command[0][1:])) + ".")
|
||||
|
||||
|
||||
else:
|
||||
print(nick + ": Unrecognised command")
|
||||
|
||||
|
||||
elif command[0] == ":3":
|
||||
if channel in open("/home/swee/chans").read().split("\n"):
|
||||
print()
|
||||
if open("/home/swee/chanconfig").read().split("\n")[open("/home/swee/chans").read().split("\n").index(channel)][0] == "1":
|
||||
irc.send_irc(channel, ":3")
|
||||
else:
|
||||
irc.send_irc(channel, ":3")
|
||||
elif ":3c" in command:
|
||||
if channel in open("/home/swee/chans").read().split("\n"):
|
||||
print()
|
||||
if open("/home/swee/chanconfig").read().split("\n")[open("/home/swee/chans").read().split("\n").index(channel)][0] == "1":
|
||||
irc.send_irc(channel, choice(threes))
|
||||
else:
|
||||
irc.send_irc(channel, choice(threes))
|
||||
elif "JOIN" in text and "#nixsanctuary" in text:
|
||||
nick = text.split(":")[1].split("!")[0]
|
||||
if nick != "sweeBot":
|
||||
irc.send_irc("##hiya", "hiya: " + nick + " has joined #nixsanctuary")
|
||||
pass
|
||||
|
||||
except Exception as ex:
|
||||
print(traceback.format_exc())
|
Loading…
Reference in a new issue