957 lines
48 KiB
Python
957 lines
48 KiB
Python
__version__ = "1.0.2 DOcker-SHELL update" + ". https://git.swee.codes/swee/MeowNex"
|
|
import socket
|
|
import subprocess
|
|
from time import sleep, time, ctime
|
|
from os import system as ossystem, path, environ, listdir, getcwd
|
|
import sqlite3
|
|
cwd = getcwd()
|
|
import re
|
|
from random import choice, randint as random, randrange
|
|
import traceback
|
|
import threading
|
|
from pathlib import Path
|
|
from urllib.parse import urlparse, parse_qs
|
|
from requests import get, exceptions as rex
|
|
from bs4 import BeautifulSoup
|
|
from googleapiclient.discovery import build
|
|
import uuid
|
|
import ssl
|
|
import requests
|
|
import docker
|
|
dock = docker.from_env()
|
|
global lastquery
|
|
AIurl = environ["AIurl"]
|
|
AIsession = requests.Session()
|
|
AIsession.auth = (environ["AIuser"], environ["AIpass"])
|
|
AImodel = environ["AImodel"] if "AImodel" in environ else "llama2"
|
|
run = 0
|
|
block = 0
|
|
parsed = 0
|
|
lastquery = "None yet."
|
|
# Dashboard thread
|
|
from flask import Flask, send_file, abort as abrt, redirect as redir
|
|
app = Flask(__name__)
|
|
responses_ai = {} # {uuid: response}
|
|
|
|
@app.route('/')
|
|
def index_html():
|
|
return send_file("dashboard.html", mimetype='text/html')
|
|
@app.route('/run.txt')
|
|
def runs():
|
|
return str(run)
|
|
@app.route('/block.txt')
|
|
def blocks():
|
|
return str(block)
|
|
@app.route('/link.txt')
|
|
def parses():
|
|
return str(parsed)
|
|
@app.route('/script.js')
|
|
def script_js():
|
|
return send_file("script.js", mimetype='application/javascript')
|
|
@app.route('/lastquery.txt')
|
|
def lastquery_txt():
|
|
return str(lastquery)
|
|
@app.route('/docs/config')
|
|
def config_docs():
|
|
return redir("https://git.swee.codes/swee/MeowNex/src/branch/main/docs/config.md")
|
|
@app.route('/response/<uuidd>')
|
|
def ai_response_truncate(uuidd):
|
|
if uuidd in responses_ai:
|
|
res = responses_ai[uuidd]
|
|
return f'<title>AI response {uuidd}</title>\n<meta name="viewport" content="width=device-width, initial-scale=1.0">\n<link rel="stylesheet" href="https://swee.codes/style.css" type="text/css">\n<body><center><h1>AI response {uuidd}</h1><br><br><div class=code width="100%" style="word-wrap: break-word;white-space: pre;text-align: left;">' + res
|
|
else:
|
|
abrt(404)
|
|
threading.Thread(target=app.run, daemon=True, kwargs={"port": 2005}).start()
|
|
cntr = None
|
|
# YouTube API
|
|
DEVELOPER_KEY = environ["ytapi"]
|
|
headers = {
|
|
'User-Agent': 'MeowNexUS IRC ' + __version__
|
|
}
|
|
def get_yt_id(url):
|
|
query = urlparse(url)
|
|
# youtu.be already contains the ID in the path
|
|
if query.hostname == 'youtu.be': return query.path[1:]
|
|
if query.hostname in {'www.youtube.com', 'youtube.com', 'music.youtube.com'}:
|
|
# URLs that have the ID in the path instead of the query.
|
|
integrated_in_url = ["watch", "embed", "v", "shorts"]
|
|
try:
|
|
# The regular /watch path, which stores the ID in the query.
|
|
if query.path == '/watch': return parse_qs(query.query)['v'][0]
|
|
# Alternatively, it will get the ID in the path if the path was in the list above.
|
|
elif query.path.split('/')[1] in integrated_in_url: return query.path.split('/')[2]
|
|
except:
|
|
return None
|
|
class config:
|
|
def __init__(self):
|
|
self.conn = sqlite3.connect(environ["SBconfig"])
|
|
self.database = self.conn.cursor()
|
|
def perms(self, mask: str):
|
|
try:
|
|
self.database.execute(f"SELECT * FROM users;")
|
|
output = self.database.fetchall()
|
|
for i in output:
|
|
if re.match(i[0].replace("*", ".+"), mask):
|
|
return i[1]
|
|
return ''
|
|
except:
|
|
print(traceback.format_exc())
|
|
return ''
|
|
def chansettings(self, chan: str):
|
|
try:
|
|
self.database.execute("SELECT * FROM chans WHERE chan = ?;", [chan])
|
|
output = self.database.fetchall()
|
|
print(output)
|
|
temp = output[0][1].split(",")
|
|
return temp if temp != [''] else []
|
|
except:
|
|
print(traceback.format_exc())
|
|
return []
|
|
def setchanconfig(self, chan: str, flags: list):
|
|
try:
|
|
print(self.chansettings(chan))
|
|
if self.chansettings(chan) == []:
|
|
print("[!!!] Channel doesn't exist in config")
|
|
self.database.execute("INSERT INTO chans (chan, flags) values(?, ?);", [chan, "+config," + ",".join(flags)])
|
|
else:
|
|
self.database.execute("UPDATE chans SET FLAGS = ? WHERE chan = ?", [",".join(flags),chan])
|
|
self.conn.commit()
|
|
except:
|
|
print(traceback.format_exc())
|
|
return False
|
|
def cflagexist(self, chan:str, flag:str):
|
|
return flag in self.chansettings(chan)
|
|
|
|
|
|
# Code snippet from my buddy Irish
|
|
# Define a dictionary to map terminal color codes to IRC color codes
|
|
color_map = {
|
|
'\033[30m': '\x0301', # Black
|
|
'\033[31m': '\x0305', # Red
|
|
'\033[32m': '\x0303', # Green
|
|
'\033[33m': '\x0307', # Yellow
|
|
'\033[34m': '\x0302', # Blue
|
|
'\033[35m': '\x0306', # Magenta
|
|
'\033[36m': '\x0310', # Cyan
|
|
'\033[37m': '\x0315', # White
|
|
'\033[90m': '\x0314', # Bright Black (Gray)
|
|
'\033[91m': '\x0304', # Bright Red
|
|
'\033[92m': '\x0309', # Bright Green
|
|
'\033[93m': '\x0308', # Bright Yellow
|
|
'\033[94m': '\x0312', # Bright Blue
|
|
'\033[95m': '\x0313', # Bright Magenta
|
|
'\033[96m': '\x0311', # Bright Cyan
|
|
'\033[97m': '\x0316', # Bright White
|
|
'\033[0m': '\x03', # Reset
|
|
}
|
|
pattern = re.compile(r'\033\[\d+(;\d+)*m|\033\[\?25[lh]|\033\[\?7[lh]|\033\[\d+C|\033\[\d+A|\033\[\d+D|\033\[\d+B')
|
|
def humanbytes(B):
|
|
"""Return the given bytes as a human friendly KB, MB, GB, or TB string."""
|
|
B = float(B)
|
|
KB = float(1024)
|
|
MB = float(KB ** 2) # 1,048,576
|
|
GB = float(KB ** 3) # 1,073,741,824
|
|
TB = float(KB ** 4) # 1,099,511,627,776
|
|
|
|
if B < KB:
|
|
return '{0}{1}'.format(B,'Bytes' if 0 == B > 1 else 'Byte')
|
|
elif KB <= B < MB:
|
|
return '{0:.2f}KB'.format(B / KB)
|
|
elif MB <= B < GB:
|
|
return '{0:.2f}MB'.format(B / MB)
|
|
elif GB <= B < TB:
|
|
return '{0:.2f}GB'.format(B / GB)
|
|
elif TB <= B:
|
|
return '{0:.2f}TB'.format(B / TB)
|
|
def rplce(text, old, new, n):
|
|
parts = text.split(old)
|
|
if len(parts) <= n:
|
|
return text # Not enough occurrences to replace
|
|
return old.join(parts[:n]) + new + old.join(parts[n:])
|
|
def replace_color_codes(text):
|
|
def replacer(match):
|
|
code = match.group(0)
|
|
if code in color_map:
|
|
return color_map[code]
|
|
elif re.match(r'\033\[\d+C', code):
|
|
# Handle cursor move right (e.g., \033[30C)
|
|
spaces = int(re.findall(r'\d+', code)[0])
|
|
return ' ' * spaces
|
|
elif re.match(r'\033\[\d+A', code) or re.match(r'\033\[\d+B', code):
|
|
# Handle cursor move up (e.g., \033[17A) and down (e.g., \033[17B)
|
|
return ''
|
|
elif re.match(r'\033\[\d+D', code):
|
|
# Handle cursor move to the start of the line (e.g., \033[9999999D)
|
|
return ''
|
|
elif re.match(r'\033\[\?25[lh]', code):
|
|
# Handle cursor visibility (e.g., \033[?25l or \033[?25h)
|
|
return ''
|
|
elif re.match(r'\033\[\?7[lh]', code):
|
|
# Handle line wrapping (e.g., \033[?7l or \033[?7h)
|
|
return ''
|
|
else:
|
|
return ''
|
|
|
|
# Remove all control sequences
|
|
cleaned_text = pattern.sub(replacer, text)
|
|
|
|
# Split the text into lines and remove lines that contain control sequences or are empty
|
|
lines = cleaned_text.split('\n')
|
|
lines = [line for line in lines if line.strip() and not re.search(r'\033\[m', line)]
|
|
|
|
# Join the cleaned lines back into a single string
|
|
return '\n'.join(lines)
|
|
|
|
class bot_irc:
|
|
|
|
irc_socket = socket.socket()
|
|
|
|
def __init__(self):
|
|
self.irc_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
|
|
def send_irc(self, channel, msg):
|
|
self.irc_socket.send(bytes("PRIVMSG " + channel + " :" + msg + "\n", "UTF-8"))
|
|
|
|
def connect_irc(self, server, port, channel, bot_nick, bot_pass, bot_nickpass):
|
|
print("Server connection: " + server)
|
|
self.irc_socket.connect((server, port))
|
|
|
|
self.irc_socket.send(
|
|
bytes(
|
|
"USER " + bot_nick + " " + bot_nick + " " + bot_nick + " :MeowNexUS, a very cool bot made by Swee :3\n",
|
|
"UTF-8",
|
|
)
|
|
)
|
|
self.irc_socket.send(bytes("NICK " + bot_nick + "\n", "UTF-8"))
|
|
self.irc_socket.send(
|
|
bytes("PASS " + bot_nickpass + ":" + bot_pass + "\n", "UTF-8")
|
|
)
|
|
for i in channel:
|
|
self.irc_socket.send(bytes("JOIN " + i + "\n", "UTF-8"))
|
|
|
|
def response_irc(self):
|
|
try:
|
|
r = self.irc_socket.recv(2040).decode()
|
|
if r.find("PING") != -1:
|
|
self.irc_socket.send(
|
|
bytes("PONG " + r.split()[1] + "\r\n", "UTF-8")
|
|
)
|
|
return r
|
|
except:
|
|
return ""
|
|
def ping(self):
|
|
try:
|
|
r = self.irc_socket.recv(2040).decode()
|
|
if r.find("PING") != -1:
|
|
self.irc_socket.send(
|
|
bytes("PONG " + r.split()[1] + "\r\n", "UTF-8")
|
|
)
|
|
except:
|
|
pass
|
|
|
|
|
|
server_irc = "127.0.0.1" # Use 127.0.0.1 for local ZNC
|
|
port_irc = 5000 # NO SSL FOR YOU
|
|
channel_irc = ["##sweezero"]
|
|
botnick_irc = environ.get('SBnick')
|
|
botnickpass_irc =environ.get('SBuser')
|
|
botpass_irc = environ.get('SBpass')
|
|
allowedparse = ["text/html", "application/x-httpd-php", "application/xhtml", "application/xhtml+xml"]
|
|
irc = bot_irc()
|
|
irc2 = bot_irc()
|
|
irc3 = bot_irc()
|
|
irc.connect_irc(
|
|
server_irc, port_irc, channel_irc, botnick_irc, botpass_irc, botnickpass_irc + "/A"
|
|
)
|
|
sbconfig = config()
|
|
threes = [":3", ":3c", "uwu", "owo", "/ᐠ。ꞈ。ᐟ\\", "(✿◠‿◠)"]
|
|
snacks = ["PepperMintHTMLs", "Dice App candies", "SweeCrypt codes", "Windows 11 Bloomberry flavored icecream"]
|
|
baps = ["T-T", "Ow!", "Beep!"]
|
|
pats = ["-w-", "Meep...", "Prr!"]
|
|
meows_happy = ['Meow!', 'Nyaa~', 'Mrow.', 'Prr! :3', "Mrrp?", "Mreow.", "!woeM", "3: !rrP", "~aayN", "Mew!", "Moew!"]
|
|
meows_upset = ['Hiss!', "!ssiH", "Grrr..."]
|
|
my_self = ["MeowNexUS", "MeowNexU5", "MeowN3xUS"]
|
|
logs = {} # {channel: [line, line]} max 10 lines
|
|
happiness = 5
|
|
global times
|
|
times = 0
|
|
ai_storage = {}
|
|
ai_quotes = ["Oki lemme think about this one...", "Oki gimme a sec...", "Processing...", "Stand by...", "Meow..."]
|
|
emoticons = {";3": "I think you forgot to press shift", ":#": "Wait no, don't hold shift THAT long!", ";#": "You are joking with me at this point."}
|
|
def irci2():
|
|
irc2.connect_irc(
|
|
server_irc, port_irc, channel_irc, "sweeB0t", botpass_irc, botnickpass_irc + "/B"
|
|
)
|
|
while True:
|
|
irc2.ping()
|
|
def irci3():
|
|
irc3.connect_irc(
|
|
server_irc, port_irc, channel_irc, "swe3Bot", botpass_irc, botnickpass_irc + "/C"
|
|
)
|
|
while True:
|
|
irc3.ping()
|
|
def multiline(text, channel):
|
|
global times
|
|
if text.__class__ == bytes:
|
|
text = text.decode()
|
|
for t in text.split("\n"):
|
|
if t != "" and t != " ":
|
|
if times > 8:
|
|
if times == 9:
|
|
sleep(1)
|
|
irc3.send_irc(channel, replace_color_codes(t))
|
|
times +=1
|
|
if times == 13:
|
|
times = 0
|
|
elif times > 4:
|
|
if times == 5:
|
|
sleep(1)
|
|
irc2.send_irc(channel, replace_color_codes(t))
|
|
times +=1
|
|
else:
|
|
if times==0:
|
|
sleep(1)
|
|
irc.send_irc(channel, replace_color_codes(t))
|
|
times += 1
|
|
from sys import exit
|
|
def restart():
|
|
irc.irc_socket.close()
|
|
irc2.irc_socket.close()
|
|
irc3.irc_socket.close()
|
|
exit(0)
|
|
def system(cmd, chan, rstrip=False):
|
|
try:
|
|
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
|
|
outpp = ""
|
|
for line in iter(p.stdout.readline, b''):
|
|
if text != "" and text != " ":
|
|
if not rstrip:
|
|
multiline(line.rstrip(), chan)
|
|
else:
|
|
outpp += line.decode().rstrip() + " "
|
|
if rstrip:
|
|
multiline(outpp[:-1], chan)
|
|
p.stdout.close()
|
|
p.wait()
|
|
except FileNotFoundError:
|
|
multiline(cmd[0] + " not found", chan)
|
|
except:
|
|
multiline(traceback.format_exc(), chan)
|
|
irl2 = threading.Thread(target=irci2, daemon=True)
|
|
irl2.start()
|
|
irl3 = threading.Thread(target=irci3, daemon=True)
|
|
irl3.start()
|
|
ticker=0
|
|
while True:
|
|
teext = irc.response_irc().split("\r\n")
|
|
for text in teext:
|
|
if text != "":
|
|
try:
|
|
print(text)
|
|
except:
|
|
print("")
|
|
try:
|
|
command = text.split("PRIVMSG")[1].split(":")[1:]
|
|
command = ":".join(command).split(" ")
|
|
channel = text.split("PRIVMSG")[1].split(" ")[1]
|
|
nick = text.split(" ")[0][1:].split("!")[0]
|
|
username = text.split(" ")[0][1:].split("@")[1]
|
|
mask = text.split(" ")[0][1:]
|
|
try:
|
|
perms = sbconfig.perms(mask)
|
|
except:
|
|
perms = ""
|
|
print(command)
|
|
print(channel)
|
|
print(nick)
|
|
print(username)
|
|
print(perms)
|
|
cont = " ".join(command)
|
|
if not channel in logs:
|
|
logs[channel] = [{"nick": nick, "content": cont}]
|
|
else:
|
|
logs[channel].append({"nick": nick, "content": cont})
|
|
if len(logs[channel]) > 128: logs[channel] = logs[channel][::-1][:128][::-1]
|
|
#open("log-text-"+channel, "a").write(" ".join(command) + "\n")
|
|
#open("log-name-"+channel, "a").write(nick + "\n")
|
|
except:
|
|
print(traceback.format_exc())
|
|
def replased(expression):
|
|
if channel in logs:
|
|
try:
|
|
if expression.split("/")[0].lower() != "s": raise IndexError()
|
|
find = expression.split("/")[1]
|
|
replace = expression.split("/")[2]
|
|
user = None
|
|
repeat = False
|
|
index = 1
|
|
try:
|
|
extraflags = expression.split("/")[3:]
|
|
for i in extraflags:
|
|
if i.strip() == "":
|
|
pass
|
|
elif i.lower() == "g":
|
|
repeat = True
|
|
elif i.strip().isdigit():
|
|
print(f"index = {int(i)}")
|
|
index = int(i)
|
|
else:
|
|
print(f"user = {i.lower()}")
|
|
user = i.lower()
|
|
except: pass
|
|
cache = None
|
|
for i in logs[channel][::-1]:
|
|
if find in i["content"] and (user == None or i["nick"].lower() == user) and not "ignore" in i:
|
|
cache = i
|
|
break
|
|
if cache == None:
|
|
raise Exception("The specified text couldn't be found")
|
|
else:
|
|
if repeat:
|
|
cache["content"] = cache["content"].replace(find, replace)
|
|
else:
|
|
cache["content"] = rplce(cache["content"], find, replace, index)
|
|
cachenick = cache["nick"]
|
|
cachecontent = cache["content"]
|
|
multiline(f"[RPL] <{cachenick}> {cachecontent}", channel)
|
|
except IndexError:
|
|
irc.send_irc(channel, nick + ": [ERR] Sed expression might be incorrectly written!")
|
|
except Exception as ex:
|
|
irc.send_irc(channel, nick + ": [ERR] " + str(ex))
|
|
else:
|
|
irc.send_irc(channel, nick + ": [!!!] No logs saved in " + channel)
|
|
try:
|
|
#if True:
|
|
if "PRIVMSG" in text and (not "ignore" in sbconfig.perms(mask)) and not nick in my_self:
|
|
if "PRIVMSG" in text and command[0][0] == "$":
|
|
logs[channel] = logs[channel][:-1]
|
|
run+=1
|
|
if command[0] == "$ping":
|
|
if random(1,2) == 1:
|
|
irc.send_irc(channel, nick + ": Pnog")
|
|
else:
|
|
irc.send_irc(channel, nick + ": Pong")
|
|
|
|
elif command[0] == "$help":
|
|
if len(command) > 1:
|
|
if "." in " ".join(command[1:]):
|
|
irc.send_irc(channel, nick + ": Nice try")
|
|
block+=1
|
|
run-=1
|
|
|
|
else:
|
|
try:
|
|
if path.isfile(cwd + "/helps/" + " ".join(command[1:])):
|
|
output = open(cwd + "/helps/" + " ".join(command[1:])).read()
|
|
multiline(output, nick if sbconfig.cflagexist(channel, "-multiline") else channel)
|
|
else:
|
|
irc.send_irc(channel, "Either the command isn't documented yet or it doesn't exist lol")
|
|
except:
|
|
multiline(output, channel)
|
|
else:
|
|
irc.send_irc(channel, "tip, ping, whoami, perms, version, figlet, tdfiglet, cowsay, uptime, cc, joke, botsnack. (restart, join, part, shell, config, pull.) Use '$help (command)' for explanation.")
|
|
|
|
elif command[0] == "$tip":
|
|
system(["/usr/games/fortune", "debian-hints"], channel, True)
|
|
|
|
elif command[0] == "$whoami":
|
|
if perms != "":
|
|
irc.send_irc(channel, nick + ": " + mask + " (Your hostmask has permissions, use the 'perms' command to see)")
|
|
else:
|
|
irc.send_irc(channel, nick + ": " + mask)
|
|
|
|
elif command[0] == "$welcome":
|
|
name = nick if not len(command) > 1 else " ".join(command[1:])
|
|
if random(1,2) == 1:
|
|
irc.send_irc(channel, f"Heyo! Welcome to the channel, {name}")
|
|
else:
|
|
irc.send_irc(channel, f"Welcome, {name}")
|
|
|
|
elif command[0] == "$perms":
|
|
if len(command) == 2:
|
|
if sbconfig.perms(command[1]) != "":
|
|
irc.send_irc(channel, nick + ": permissions of hostmask " + command[1] + ": " + sbconfig.perms(command[1]))
|
|
else:
|
|
irc.send_irc(channel, nick + ": The hostmask wildcard " + command[1] + " Doesn't have permissions.")
|
|
else:
|
|
if perms != "":
|
|
irc.send_irc(channel, nick + ": " + perms)
|
|
else:
|
|
irc.send_irc(channel, nick + ": none")
|
|
|
|
elif command[0] == "$restart":
|
|
if perms == "full" or "reboot" in perms.split(",") or "restart" in perms.split(","):
|
|
irc.send_irc(channel, nick + ": Restarting bot...")
|
|
sleep(1)
|
|
restart()
|
|
else:
|
|
irc.send_irc(channel, nick + ": Permission denied.")
|
|
block+=1
|
|
run-=1
|
|
|
|
|
|
elif command[0] == "$pull":
|
|
if perms == "full" or "git" in perms.split(","):
|
|
ossystem("git fetch")
|
|
ossystem("git pull")
|
|
irc.send_irc(channel, nick + ": Pulled from Git, restarting bot...")
|
|
restart()
|
|
else:
|
|
irc.send_irc(channel, nick + ": Permission denied.")
|
|
block+=1
|
|
run-=1
|
|
|
|
|
|
elif command[0] == "$join":
|
|
if perms == "full" or "join" in perms.split(","):
|
|
if len(command) == 2:
|
|
irc.send_irc(channel, nick + ": Joining " + command[1])
|
|
irc.irc_socket.send(bytes("JOIN " + command[1] + "\n", "UTF-8"))
|
|
irc2.irc_socket.send(bytes("JOIN " + command[1] + "\n", "UTF-8"))
|
|
irc3.irc_socket.send(bytes("JOIN " + command[1] + "\n", "UTF-8"))
|
|
multiline(channel=command[1], text="A wild bot appears! ($join initiated by " + nick + ")")
|
|
else:
|
|
if len(command) == 1:
|
|
irc.send_irc(channel, nick + ": I NEED A CHANNEL NAME TO JOIN!!!")
|
|
else:
|
|
irc.send_irc(channel, nick + ": Y'know this only uses one argument? JUST THE CHANNEL NAME!!!")
|
|
else:
|
|
irc.send_irc(channel, nick + ": Permission denied.")
|
|
block+=1
|
|
run-=1
|
|
|
|
|
|
elif command[0] == "$part":
|
|
if perms == "full" or "part" in perms.split(","):
|
|
if len(command) == 2:
|
|
irc.send_irc(channel, nick + ": Parting " + command[1])
|
|
irc.irc_socket.send(bytes("PART " + command[1] + "\n", "UTF-8"))
|
|
irc2.irc_socket.send(bytes("PART " + command[1] + "\n", "UTF-8"))
|
|
irc3.irc_socket.send(bytes("PART " + command[1] + "\n", "UTF-8"))
|
|
else:
|
|
if len(command) == 1:
|
|
irc.send_irc(channel, "Bye bye!")
|
|
sleep(1)
|
|
irc.irc_socket.send(bytes("PART " + channel + "\n", "UTF-8"))
|
|
irc2.irc_socket.send(bytes("PART " + channel + "\n", "UTF-8"))
|
|
irc3.irc_socket.send(bytes("PART " + channel + "\n", "UTF-8"))
|
|
else:
|
|
irc.send_irc(channel, nick + ": Y'know this only uses one or zero arguments?")
|
|
else:
|
|
irc.send_irc(channel, nick + ": Permission denied.")
|
|
block+=1
|
|
run-=1
|
|
|
|
elif command[0] == "$ai-chat":
|
|
try:
|
|
|
|
multiline(choice(ai_quotes), channel)
|
|
prettylogs = ""
|
|
for i in logs[channel][-15:]:
|
|
prettylogs += i["nick"] + ": " + i["content"] + "\n"
|
|
prettylogs = prettylogs.strip()
|
|
chatquery = f"--Details--\nYour happiness level is {happiness}/10, your emotion level will impact the conversation's tone.\nYour name is MeowNexUS, you're a bot who acts like an anthropomorphic cat/furry\nYour beloved creator is named Swee.\nDo not start your reply with your name, people already know who you are.\nYou should not use ANY emojis at all, instead, use the much cuter ASCII emoticons and kaomojis such as :3\nIf the user's message contains ^ that means the person wants more details from the last message in the provided history.\nYou will be provided with the last 15 lines of this channel's chat logs to improve the conversation's context, Don't mind if your own messages have mutliple lines, and do not repeat any of the messages listed, it will be considered annoying.\nThe bottom of the chat history in the prompt will be the main question\n--Chat on {channel}--\n{prettylogs}\n--main question--\n{nick}: " + " ".join(command)
|
|
postAttempt = AIsession.post(AIurl, json={"model": AImodel, "prompt": chatquery, "stream": False})
|
|
print(postAttempt.content)
|
|
if not postAttempt.ok:
|
|
raise Exception("API Returned non-okay status code: " + str(postAttempt.status_code))
|
|
aimsg = postAttempt.json()["response"]
|
|
randomname = str(uuid.uuid4())
|
|
responses_ai[randomname] = aimsg
|
|
mtline = ""
|
|
lastquery = f"<p>Channel: {channel}</p><p>Request:</p><br><br><textarea class=code width=100% spellcheck=\"false\" autocapitalize=\"off\" autocomplete=\"off\" autocorrect=\"off\" readonly=\"true\">{chatquery}</textarea><hr><p>Response</p><br><br><textarea class=code width=100% spellcheck=\"false\" autocapitalize=\"off\" autocomplete=\"off\" autocorrect=\"off\" readonly=\"true\">{aimsg}</textarea>"
|
|
broke = False
|
|
logs[channel].append({"nick": nick, "content": " ".join(command)})
|
|
logs[channel].append({"nick": "MeowNexUS", "content": aimsg, "ignore": True})
|
|
for num, i in enumerate(aimsg.split("\n")):
|
|
if num == 5:
|
|
broke = True
|
|
break
|
|
elif len(i) > 256:
|
|
mtline += "\n" + i[:256] + "..."
|
|
broke = True
|
|
else:
|
|
mtline += "\n" + i
|
|
if broke:
|
|
mtline += f"\nView full response: https://irc-bot.swee.codes/response/{randomname}"
|
|
multiline(mtline, channel)
|
|
except Exception as ex:
|
|
multiline(f"Something went wrong while connecting to the AI server: {ex}", channel)
|
|
print(traceback.format_exc())
|
|
|
|
#elif command[0] == "$ai-clear":
|
|
# if channel in ai_storage:
|
|
# del ai_storage[channel]
|
|
# multiline("Done.", channel)
|
|
# else:
|
|
# multiline(f"There is no existing AI conversation in {channel}", channel)
|
|
elif command[0] == "$socket":
|
|
if perms == "full":
|
|
if len(command) > 1:
|
|
irc.send_irc(channel, nick + ": Sending to socket...")
|
|
irc.irc_socket.send(bytes(" ".join(command[1:]) + "\n", "UTF-8"))
|
|
irc2.irc_socket.send(bytes(" ".join(command[1:]) + "\n", "UTF-8"))
|
|
irc3.irc_socket.send(bytes(" ".join(command[1:]) + "\n", "UTF-8"))
|
|
else:
|
|
irc.send_irc(channel, nick + ": what to send to the server?")
|
|
else:
|
|
irc.send_irc(channel, nick + ": Permission denied.")
|
|
block+=1
|
|
run-=1
|
|
|
|
|
|
elif command[0] == "$patpat" or command[0] == "$headpat":
|
|
if len(command) == 2:
|
|
if int(command[1]) < 50:
|
|
for i in range(0,int(command[1])):
|
|
happiness += (1 if happiness != 10 else 0)
|
|
else:
|
|
happiness = 10
|
|
elif len(command) == 1:
|
|
happiness += (1 if happiness != 10 else 0)
|
|
if len(command) != 2 or int(command[1]) < 50:
|
|
multiline(choice(pats) + " (Emotion: Upset " +
|
|
("#" * int(happiness)) +
|
|
("-" * int(10-happiness)) + " Happy)",
|
|
channel)
|
|
else:
|
|
multiline("Okay that's enough!!" + " (Emotion: Upset " +
|
|
("#" * int(happiness)) +
|
|
("-" * int(10-happiness)) + " Happy)",
|
|
channel)
|
|
|
|
elif command[0] == "$emotion":
|
|
multiline("Emotion: Upset " +
|
|
("#" * int(happiness)) +
|
|
("-" * int(10-happiness)) + " Happy",
|
|
channel)
|
|
elif command[0] == "$bap":
|
|
if len(command) == 2:
|
|
if int(command[1]) < 50:
|
|
for i in range(0,int(command[1])):
|
|
happiness -= (1 if happiness != 0 else 0)
|
|
else:
|
|
happiness = 0
|
|
elif len(command) == 1:
|
|
happiness -= (1 if happiness != 0 else 0)
|
|
if len(command) != 2 or int(command[1]) < 50:
|
|
multiline(choice(baps) + " (Emotion: Upset " +
|
|
("#" * int(happiness)) +
|
|
("-" * int(10-happiness)) + " Happy)",
|
|
channel)
|
|
else:
|
|
multiline("Okay that's enough!!" + " (Emotion: Upset " +
|
|
("#" * int(happiness)) +
|
|
("-" * int(10-happiness)) + " Happy)",
|
|
channel)
|
|
elif command[0] == "$meow":
|
|
if random(1,100) > (happiness*10):
|
|
multiline(choice(meows_upset), channel)
|
|
else:
|
|
multiline(choice(meows_happy), channel)
|
|
elif command[0] == "$stats":
|
|
multiline(f"{nick}: {run} command(s) run, {block} refused, and {parsed} link(s) parsed, check out https://irc-bot.swee.codes", channel)
|
|
elif command[0] == "$shell":
|
|
if perms == "full":
|
|
#or "shell" in perms.split(",")
|
|
if len(command) > 1:
|
|
try:
|
|
system(command[1:], channel)
|
|
except Exception as ex:
|
|
multiline(traceback.format_exc(), channel)
|
|
else:
|
|
irc.send_irc(channel, nick + ": What do you want me to F-ing run in the shell?")
|
|
else:
|
|
irc.send_irc(channel, nick + ": Permission denied.")
|
|
block+=1
|
|
run-=1
|
|
|
|
elif command[0] == "$dopull":
|
|
irc.send_irc(channel, nick + ":[...] Please wait, pulling docker image...")
|
|
image = dock.images.pull('alpine')
|
|
irc.send_irc(channel, nick + ":[OK] Pulled docker image, running command now.")
|
|
elif command[0] == "$doshell":
|
|
if len(command) > 1:
|
|
try:
|
|
print("Running container")
|
|
if cntr == None:
|
|
cntr = dock.containers.run('alpine', "/bin/ash", detach=True, tty=True)
|
|
print("Executing...")
|
|
_, stream = cntr.exec_run(cmd=" ".join(command[1:]))
|
|
multiline(stream, channel)
|
|
except Exception as ex:
|
|
irc.send_irc(channel, nick + ":[ERR] Traceback:")
|
|
multiline(traceback.format_exc(), channel)
|
|
else:
|
|
irc.send_irc(channel, nick + ":[ERR] ")
|
|
elif command[0] == "$doreset":
|
|
if cntr == None:
|
|
irc.send_irc(channel, nick + ":[OK] No container is loaded yet.")
|
|
else:
|
|
cntr.remove()
|
|
cntr = None
|
|
irc.send_irc(channel, nick + ":[OK] Done.")
|
|
|
|
|
|
elif command[0] == "$version":
|
|
irc.send_irc(channel, "This is MeowNexUS IRC " + __version__)
|
|
|
|
elif command[0] == "$figlet":
|
|
if not sbconfig.cflagexist(channel, "-multiline"):
|
|
if len(command) > 1:
|
|
try:
|
|
system(['figlet'] + command[1:], channel)
|
|
except Exception as ex:
|
|
irc.send_irc(channel, nick + ": " + ex.__class__.__name__)
|
|
else:
|
|
irc.send_irc(channel, nick + ": What text should I enlarge?")
|
|
else:
|
|
multiline("Multiline commands are disabled in this channel.", channel)
|
|
|
|
elif command[0] == "$tdfiglet":
|
|
if not sbconfig.cflagexist(channel, "-multiline"):
|
|
if len(command) > 1:
|
|
try:
|
|
system(['tdfiglet', '-cm', "-r"] + command[1:], channel)
|
|
except Exception as ex:
|
|
irc.send_irc(channel, nick + ": " + ex.__class__.__name__)
|
|
else:
|
|
irc.send_irc(channel, nick + ": What text should I enlarge?")
|
|
else:
|
|
multiline("Multiline commands are disabled in this channel.", channel)
|
|
|
|
elif command[0] == "$cowsay":
|
|
if not sbconfig.cflagexist(channel, "-multiline"):
|
|
if len(command) > 1:
|
|
try:
|
|
system(['/usr/games/cowsay'] + command[1:], channel)
|
|
except Exception as ex:
|
|
irc.send_irc(channel, nick + ": " + ex.__class__.__name__)
|
|
else:
|
|
irc.send_irc(channel, nick + ": What should the cow say?")
|
|
else:
|
|
multiline("Multiline commands are disabled in this channel.", channel)
|
|
|
|
elif command[0] == "$uptime":
|
|
try:
|
|
system(['uptime', '-p'], channel)
|
|
except Exception as ex:
|
|
irc.send_irc(channel, nick + ": " + ex.__class__.__name__)
|
|
|
|
elif command[0] == "$joke":
|
|
try:
|
|
system(['pyjoke'] + command[1:], channel)
|
|
except Exception as ex:
|
|
multiline(traceback.format_exc(), channel)
|
|
|
|
elif command[0] == "$botsnack":
|
|
multiline(f"\x01ACTION eats some {choice(snacks)}\x01", channel)
|
|
elif command[0] == "$hug":
|
|
name = nick if not len(command) > 1 else " ".join(command[1:])
|
|
multiline(f"\x01ACTION gives a hug to {name}\x01", channel)
|
|
|
|
|
|
elif command[0] == "$sed":
|
|
if len(command) == 2:
|
|
replased(" ".join(command[1:]))
|
|
else:
|
|
irc.send_irc(channel, nick + ": [???] This command takes only one argument.")
|
|
|
|
|
|
elif command[0] == "$config":
|
|
try:
|
|
if len(command) > 1:
|
|
if perms == "full":
|
|
abort = False
|
|
flag = sbconfig.chansettings(channel)
|
|
for i in command[1:]:
|
|
if i == "clear":
|
|
flag = ["+config"]
|
|
elif i[0] == "+":
|
|
if "-" + i[1:] in flag:
|
|
flag[flag.index("-" + i[1:])] = i
|
|
else:
|
|
flag.append(i)
|
|
elif i[0] == "-":
|
|
if "+" + i[1:] in flag:
|
|
flag[flag.index("+" + i[1:])] = i
|
|
else:
|
|
flag.append(i)
|
|
else:
|
|
multiline(nick + ": flag name should begin with + or -", channel)
|
|
abort = True
|
|
break
|
|
if not abort:
|
|
print(flag)
|
|
sbconfig.setchanconfig(channel, flag)
|
|
multiline(nick + ": Successfuly applied configuration: " + " ".join(flag), channel)
|
|
else:
|
|
irc.send_irc(channel, nick + ": Permission denied")
|
|
block+=1
|
|
run-=1
|
|
|
|
else:
|
|
multiline("Configuration for " + channel + ": " + " ".join(sbconfig.chansettings(channel)), channel)
|
|
except ex:
|
|
multiline(str(ex), channel)
|
|
|
|
else:
|
|
if ".." in command[0][:3]:
|
|
irc.send_irc(channel, nick + ": Nice try.")
|
|
block+=1
|
|
run-=1
|
|
|
|
else:
|
|
if path.isfile("cc/" + command[0][1:]):
|
|
try:
|
|
if sbconfig.cflagexist(channel, "-multiline") and "#require-multiline" in open("cc/" + command[0][1:]).read():
|
|
irc.send_irc(channel, "Multiline commands are disabled in this channel.")
|
|
else:
|
|
com = ['python3']
|
|
comm = command
|
|
com.append("cc/" + comm[0][1:])
|
|
com.append(nick)
|
|
com.append(perms)
|
|
comm.pop(0)
|
|
for i in comm:
|
|
com.append(i)
|
|
system(com, channel)
|
|
except Exception as ex:
|
|
irc.send_irc(channel, nick + ": " + ex.__class__.__name__)
|
|
elif path.isdir("cc/" + command[0][1:]):
|
|
irc.send_irc(channel, nick + ": Command list under cc/debug: " + ", ".join(listdir("cc/" + command[0][1:])) + ".")
|
|
else:
|
|
print(nick + ": Unrecognised command")
|
|
elif " ".join(command[0:3]).lower() == "i am meownexus":
|
|
irc.send_irc(channel, "No I am!")
|
|
elif " ".join(command[0:3]).lower() == "no i am!":
|
|
if ticker >= 5:
|
|
system(['figlet', 'NO I AM!'], channel)
|
|
ticker = 0
|
|
else:
|
|
irc.send_irc(channel, "No I am!")
|
|
ticker+=1
|
|
elif command[0] == ":3":
|
|
if not sbconfig.cflagexist(channel, "-colonthree"):
|
|
irc.send_irc(channel, ":3")
|
|
elif command[0] in emoticons:
|
|
if not sbconfig.cflagexist(channel, "-colonthree"):
|
|
irc.send_irc(channel, emoticons[command[0]])
|
|
elif ":3c" in command:
|
|
if sbconfig.cflagexist(channel, "+:3c") or not sbconfig.cflagexist(channel, "-colonthree"):
|
|
multiline(choice(threes), channel)
|
|
elif command[0][:2].lower() == "s/":
|
|
logs[channel] = logs[channel][:-1]
|
|
if sbconfig.cflagexist(channel, "+sed"):
|
|
replased(" ".join(command))
|
|
# try to parse and find a link
|
|
|
|
if sbconfig.cflagexist(channel, "+links"):
|
|
try:
|
|
for i in command:
|
|
parse = urlparse(i)
|
|
if parse.scheme in ["http", "https"]:
|
|
try:
|
|
if parse.hostname in ["youtube.com", "youtu.be", "www.youtube.com", "m.youtube.com", "youtube-nocookie.com"] and get_yt_id(i) != None:
|
|
try:
|
|
video_id = get_yt_id(i)
|
|
youtube = build('youtube', 'v3', developerKey=DEVELOPER_KEY)
|
|
request = youtube.videos().list(part='snippet,statistics', id=video_id)
|
|
details = request.execute()
|
|
title = details['items'][0]['snippet']['title']
|
|
channel_yt = details['items'][0]['snippet']['channelTitle']
|
|
views = details['items'][0]['statistics']['viewCount']
|
|
multiline(f"({nick}) [▶️ YouTube] {title} | Author: {channel_yt} | {views} Views", channel)
|
|
except Exception as ex:
|
|
multiline("(" + nick + ") [YouTube Error, is it a valid YouTube URL?]", channel)
|
|
print(traceback.format_exc())
|
|
else:
|
|
e = get(i, headers=headers, timeout=10)
|
|
header = e.headers
|
|
content_type = header.get('content-type').split(";")[0]
|
|
content_len = header.get('Content-length')
|
|
if content_type in allowedparse:
|
|
if e.ok:
|
|
soup = BeautifulSoup(e.text, 'html.parser')
|
|
multiline("(" + nick + ") " + (" ".join(soup.title.string.splitlines())[:256] if soup.title != None else "[No title provided]"), channel)
|
|
else:
|
|
multiline("(" + nick + ") [HTTP " + str(e.status_code) + "]", channel)
|
|
else:
|
|
multiline("(" + nick + ") [" + humanbytes(content_len) + " " + str(content_type) + "]", channel)
|
|
parsed += 1
|
|
except rex.SSLError as ex:
|
|
multiline("(" + nick + ") [SSL Error: " + str(ex.message) + "]", channel)
|
|
except Exception as ex:
|
|
multiline("(" + nick + ") [Request error: " + str(ex.message) + "]", channel)
|
|
except:
|
|
print(traceback.format_exc())
|
|
if sbconfig.cflagexist(channel, "+links") or sbconfig.cflagexist(channel, "+gemini"):
|
|
try:
|
|
for i in command:
|
|
parse = urlparse(i)
|
|
unparsed = i
|
|
while True:
|
|
if parse.scheme == "gemini":
|
|
contx = ssl.create_default_context()
|
|
contx.check_hostname = False
|
|
contx.verify_mode = ssl.CERT_NONE
|
|
gsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
gemsocket = contx.wrap_socket(gsocket, server_hostname=parse.hostname)
|
|
gemsocket.connect((parse.hostname, 1965))
|
|
gemsocket.send(bytes(unparsed + "\r\n", "UTF-8"))
|
|
received = ""
|
|
while True:
|
|
gemresponse = gemsocket.recv(2048)
|
|
if gemresponse:
|
|
received += gemresponse.decode()
|
|
else:
|
|
break
|
|
received = received.replace("\r", "")
|
|
firstline = True
|
|
title = False
|
|
redirected = False
|
|
for i in received.split("\n"):
|
|
if firstline:
|
|
if i.split(" ")[0][0] == "3":
|
|
redirected = True
|
|
parsedd = urlparse(i.split(" ")[1])
|
|
unparsed = i.split(" ")[1]
|
|
break
|
|
elif i.split(" ")[0][0] != "2":
|
|
title = True
|
|
multiline("(" + nick + f") [{i}]", channel)
|
|
break
|
|
else:
|
|
firstline = False
|
|
if i.split(" ")[1].split(";")[0] != "text/gemini":
|
|
typee = i.split(" ")[1]
|
|
multiline("(" + nick + f") [Non-Gemtext file: {typee}]", channel)
|
|
title = True
|
|
break
|
|
else:
|
|
try:
|
|
if i.strip()[0] == "#" and i.strip()[1] == " ":
|
|
print("Found title: " + i)
|
|
title = True
|
|
multiline("(" + nick + f") " + i.strip()[2:].strip(), channel)
|
|
break
|
|
except:
|
|
pass
|
|
if not title and not redirected:
|
|
multiline("(" + nick + ") [No title found]", channel)
|
|
if not redirected:
|
|
break
|
|
else:
|
|
break
|
|
except Exception as ex:
|
|
exc = str(ex)
|
|
print(traceback.format_exc())
|
|
multiline("(" + nick + f") [Request error: {exc}]", channel)
|
|
elif "JOIN" in text and "#nixsanctuary" in text:
|
|
nick = text.split(":")[1].split("!")[0]
|
|
if not "Meow" in nick:
|
|
irc.send_irc("##hiya", "hiya: " + nick + " has joined #nixsanctuary")
|
|
pass
|
|
|
|
except Exception as ex:
|
|
print(traceback.format_exc())
|