893 lines
43 KiB
Python
893 lines
43 KiB
Python
__version__ = "0.0.2 Funni update" + ". https://git.swee.codes/swee/MeowNex"
|
|
import socket
|
|
import subprocess
|
|
from time import sleep, time, ctime
|
|
from os import system as ossystem, path, environ, listdir, getcwd
|
|
import sqlite3
|
|
cwd = getcwd()
|
|
import re
|
|
from random import choice, randint as random, randrange
|
|
import traceback
|
|
import threading
|
|
from pathlib import Path
|
|
from urllib.parse import urlparse, parse_qs
|
|
from requests import get, exceptions as rex
|
|
from bs4 import BeautifulSoup
|
|
from googleapiclient.discovery import build
|
|
from characterai import pycai
|
|
import uuid
|
|
import ssl
|
|
CAItoken = environ["CAItoken"]
|
|
CAImodel = environ["CAIchara"]
|
|
# Character.AI client setup. A failure here is logged but not fatal, so the
# rest of the bot still starts. The names are bound up front so later code
# sees None instead of hitting a NameError when the login failed.
cai = None
me = None
chats = []
try:
    cai = pycai.Client(CAItoken)
    me = cai.get_me()
except Exception:
    print(traceback.format_exc())
|
|
run = 0
|
|
block = 0
|
|
parsed = 0
|
|
lastquery = "None yet."
|
|
# Dashboard thread
|
|
from flask import Flask, send_file, abort as abrt, redirect as redir
|
|
app = Flask(__name__)
|
|
responses_ai = {} # {uuid: response}
|
|
|
|
@app.route("/")
def index_html():
    """Serve the dashboard page."""
    return send_file("dashboard.html", mimetype="text/html")


@app.route("/run.txt")
def runs():
    """Return the `run` counter (commands run) as plain text."""
    return f"{run}"


@app.route("/block.txt")
def blocks():
    """Return the `block` counter (commands refused) as plain text."""
    return f"{block}"


@app.route("/link.txt")
def parses():
    """Return the `parsed` counter (links parsed) as plain text."""
    return f"{parsed}"


@app.route("/script.js")
def script_js():
    """Serve the dashboard's JavaScript file."""
    return send_file("script.js", mimetype="application/javascript")


@app.route("/lastquery.txt")
def lastquery_txt():
    """Return the most recent AI request/response snippet stored in `lastquery`."""
    return f"{lastquery}"


@app.route("/docs/config")
def config_docs():
    """Redirect to the channel-configuration documentation in the repository."""
    return redir("https://git.swee.codes/swee/MeowNex/src/branch/main/docs/config.md")
|
|
@app.route('/response/<uuidd>')
def ai_response_truncate(uuidd):
    """Render the full text of a stored AI response as a small HTML page.

    ``uuidd`` is the key under which the response was stored in
    ``responses_ai``. Unknown keys produce a 404.
    """
    # Local import keeps this block self-contained.
    from html import escape

    res = responses_ai.get(uuidd)
    if res is None:
        abrt(404)

    # The response text comes from an external model and must never be
    # interpreted as markup, so it is escaped before being embedded.
    safe_id = escape(uuidd)
    safe_body = escape(res)
    return (
        f'<title>AI response {safe_id}</title>\n'
        '<meta name="viewport" content="width=device-width, initial-scale=1.0">\n'
        '<link rel="stylesheet" href="https://swee.codes/style.css" type="text/css">\n'
        f'<body><center><h1>AI response {safe_id}</h1><br><br>'
        '<div class=code width="100%" style="word-wrap: break-word;white-space: pre;text-align: left;">'
        + safe_body
        + '</div></center></body>'
    )
|
|
threading.Thread(target=app.run, daemon=True, kwargs={"port": 2005}).start()
|
|
|
|
# YouTube API
|
|
DEVELOPER_KEY = environ["ytapi"]
|
|
headers = {
|
|
'User-Agent': 'MeowNexUS IRC ' + __version__
|
|
}
|
|
def get_yt_id(url):
    """Return the YouTube video ID contained in *url*, or None.

    Recognised forms:
      * ``youtu.be/<id>``
      * ``<youtube host>/watch?v=<id>``
      * ``<youtube host>/{watch,embed,v,shorts,live}/<id>``

    None is returned for any other host, and for YouTube URLs that carry no
    ID (for example a bare ``/watch`` or ``youtu.be/``).
    """
    query = urlparse(url)
    host = query.hostname

    # youtu.be already contains the ID as the first path segment.
    if host == 'youtu.be':
        return query.path.strip('/').split('/')[0] or None

    youtube_hosts = {
        'www.youtube.com', 'youtube.com', 'm.youtube.com', 'music.youtube.com',
        'www.youtube-nocookie.com', 'youtube-nocookie.com',
    }
    if host not in youtube_hosts:
        return None

    # The regular /watch path stores the ID in the query string.
    if query.path == '/watch':
        ids = parse_qs(query.query).get('v')
        return ids[0] if ids else None

    # Otherwise the ID is the path segment following one of these prefixes.
    integrated_in_url = {'watch', 'embed', 'v', 'shorts', 'live'}
    segments = query.path.split('/')
    if len(segments) > 2 and segments[1] in integrated_in_url and segments[2]:
        return segments[2]
    return None
|
|
class config:
    """Access to the bot's SQLite configuration database.

    The database path is read from the ``SBconfig`` environment variable.
    Two tables are used:
      * ``users`` - first column is a hostmask wildcard, second column is the
        permission string granted to matching hostmasks;
      * ``chans`` - ``chan`` (channel name) and ``flags`` (comma-separated
        flag list).
    """

    def __init__(self):
        self.conn = sqlite3.connect(environ["SBconfig"])
        self.database = self.conn.cursor()

    def perms(self, mask: str):
        """Return the permission string of the first user row matching *mask*.

        ``*`` in a stored wildcard matches one or more characters; every other
        character is matched literally and the whole mask must match.
        Returns ``''`` when nothing matches or the lookup fails.
        """
        try:
            self.database.execute("SELECT * FROM users;")
            for row in self.database.fetchall():
                # Escape the literal parts so characters such as "." or "["
                # in a hostmask are not interpreted as regex syntax, and use
                # fullmatch so a stored mask cannot match a longer hostname
                # that merely starts with it.
                wildcard = ".+".join(re.escape(part) for part in row[0].split("*"))
                if re.fullmatch(wildcard, mask):
                    return row[1]
        except Exception:
            print(traceback.format_exc())
        return ''

    def chansettings(self, chan: str):
        """Return the list of flags stored for *chan* ([] if none/unknown)."""
        try:
            self.database.execute("SELECT flags FROM chans WHERE chan = ?;", [chan])
            row = self.database.fetchone()
        except Exception:
            print(traceback.format_exc())
            return []
        # Unknown channel, NULL flags and an empty string all mean "no flags".
        if row is None or not row[0]:
            return []
        return row[0].split(",")

    def setchanconfig(self, chan: str, flags: list):
        """Store *flags* for *chan*, creating the row when it does not exist.

        A newly created row always carries the ``+config`` flag.
        Returns True on success and False when the database operation failed.
        """
        try:
            # Check for the row itself: a channel that exists with an empty
            # flag list must be updated, not inserted a second time.
            self.database.execute("SELECT 1 FROM chans WHERE chan = ?;", [chan])
            if self.database.fetchone() is None:
                print("[!!!] Channel doesn't exist in config")
                stored = list(flags) if "+config" in flags else ["+config"] + list(flags)
                self.database.execute(
                    "INSERT INTO chans (chan, flags) values(?, ?);",
                    [chan, ",".join(stored)],
                )
            else:
                self.database.execute(
                    "UPDATE chans SET flags = ? WHERE chan = ?;",
                    [",".join(flags), chan],
                )
            self.conn.commit()
            return True
        except Exception:
            print(traceback.format_exc())
            return False

    def cflagexist(self, chan: str, flag: str):
        """Return True when *flag* is set for *chan*."""
        return flag in self.chansettings(chan)
|
|
|
|
|
|
# Code snippet from my buddy Irish
# Map terminal (ANSI SGR) colour sequences to IRC colour codes.
# IRC's standard palette is 00-15; codes from 16 upward belong to the
# extended palette, so "bright white" must map to 00 (white), not 16.
color_map = {
    '\033[30m': '\x0301',  # Black
    '\033[31m': '\x0305',  # Red
    '\033[32m': '\x0303',  # Green
    '\033[33m': '\x0307',  # Yellow
    '\033[34m': '\x0302',  # Blue
    '\033[35m': '\x0306',  # Magenta
    '\033[36m': '\x0310',  # Cyan
    '\033[37m': '\x0315',  # White (rendered as IRC light grey)
    '\033[90m': '\x0314',  # Bright Black (Gray)
    '\033[91m': '\x0304',  # Bright Red
    '\033[92m': '\x0309',  # Bright Green
    '\033[93m': '\x0308',  # Bright Yellow
    '\033[94m': '\x0312',  # Bright Blue
    '\033[95m': '\x0313',  # Bright Magenta
    '\033[96m': '\x0311',  # Bright Cyan
    '\033[97m': '\x0300',  # Bright White
    '\033[0m': '\x03',     # Reset
}

# Matches the escape sequences handled when converting terminal output:
# SGR colour/style codes, cursor show/hide, line-wrap toggles and
# cursor movement (right, up, left, down).
pattern = re.compile(r'\033\[\d+(;\d+)*m|\033\[\?25[lh]|\033\[\?7[lh]|\033\[\d+C|\033\[\d+A|\033\[\d+D|\033\[\d+B')
|
|
def humanbytes(B):
    """Return the given bytes as a human friendly Byte(s), KB, MB, GB, or TB string.

    *B* may be anything ``float()`` accepts (an int, a float or a numeric
    string). Values below 1 KB are printed without decimals where possible
    ("512Bytes", "1Byte"); larger values use two decimals ("1.50KB").
    Raises TypeError/ValueError when *B* cannot be converted to a number.
    """
    B = float(B)
    KB = float(1024)
    MB = float(KB ** 2)  # 1,048,576
    GB = float(KB ** 3)  # 1,073,741,824
    TB = float(KB ** 4)  # 1,099,511,627,776

    if B < KB:
        # Only exactly one byte is singular; 0 and everything else is plural.
        return '{0:g}{1}'.format(B, 'Byte' if B == 1 else 'Bytes')
    # Try the largest unit first so each value picks the biggest unit it fills.
    for size, suffix in ((TB, 'TB'), (GB, 'GB'), (MB, 'MB')):
        if B >= size:
            return '{0:.2f}{1}'.format(B / size, suffix)
    return '{0:.2f}KB'.format(B / KB)
|
|
def rplce(text, old, new, n):
    """Replace only the *n*-th occurrence of *old* in *text* with *new*.

    *n* is 1-based. The text is returned unchanged when it contains fewer
    than *n* occurrences, or when *n* is smaller than 1.
    Raises ValueError when *old* is empty (``str.split`` rejects it).
    """
    if n < 1:
        # There is no "0th" occurrence; without this guard the slices below
        # would insert `new` at the wrong position.
        return text
    parts = text.split(old)
    if len(parts) <= n:
        return text  # Not enough occurrences to replace
    return old.join(parts[:n]) + new + old.join(parts[n:])
|
|
def replace_color_codes(text):
    """Convert terminal escape sequences in *text* for output on IRC.

    Sequences listed in ``color_map`` become IRC colour codes, "cursor right
    N" becomes N spaces, and every other sequence matched by ``pattern`` is
    removed. Lines that end up blank, or that contain a bare ``ESC[m``, are
    dropped from the result.
    """
    def translate(found):
        sequence = found.group(0)
        mapped = color_map.get(sequence)
        if mapped is not None:
            return mapped
        if sequence.endswith('C'):
            # Cursor move right (e.g. ESC[30C): emulate with spaces. The
            # digits sit between the "ESC[" prefix and the final "C".
            return ' ' * int(sequence[2:-1])
        # Cursor up/down/left, cursor visibility, line wrapping and any
        # unmapped colour/style code have no IRC equivalent.
        return ''

    converted = pattern.sub(translate, text)
    kept = [
        row
        for row in converted.split('\n')
        if row.strip() and '\033[m' not in row
    ]
    return '\n'.join(kept)
|
|
|
|
class bot_irc:
    """A minimal IRC client connection over a plain TCP socket."""

    def __init__(self):
        # One socket per instance; nothing is created at class level.
        self.irc_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    def _send_line(self, line):
        """Send one protocol line, terminated with CRLF, in full."""
        self.irc_socket.sendall(bytes(line + "\r\n", "UTF-8"))

    def _answer_pings(self, data):
        """Reply with PONG to every PING command found in *data*.

        Only lines whose command is PING are answered, so chat messages that
        merely contain the word are ignored, and a PING that is not the first
        line of the chunk still gets the right token.
        """
        for raw_line in data.split("\n"):
            parts = raw_line.strip("\r").split(" ", 1)
            if parts[0] == "PING":
                token = parts[1] if len(parts) > 1 else ""
                self._send_line(("PONG " + token).rstrip())

    def send_irc(self, channel, msg):
        """Send *msg* to *channel* (or a nick) as a single PRIVMSG."""
        # CR/LF would end the PRIVMSG early and let the remainder be read as
        # a separate IRC command, so they are flattened to spaces.
        safe_msg = msg.replace("\r", " ").replace("\n", " ")
        self._send_line("PRIVMSG " + channel + " :" + safe_msg)

    def connect_irc(self, server, port, channel, bot_nick, bot_pass, bot_nickpass):
        """Connect, register and join every channel in *channel*.

        The password is sent as ``<bot_nickpass>:<bot_pass>``. PASS goes first
        because the protocol expects it before NICK/USER registration.
        """
        print("Server connection: " + server)
        self.irc_socket.connect((server, port))

        self._send_line("PASS " + bot_nickpass + ":" + bot_pass)
        self._send_line("NICK " + bot_nick)
        self._send_line(
            "USER " + bot_nick + " " + bot_nick + " " + bot_nick
            + " :MeowNexUS, a very cool bot made by Swee :3"
        )
        for i in channel:
            self._send_line("JOIN " + i)

    def response_irc(self):
        """Receive one chunk from the server, answer PINGs, return the text.

        Returns ``""`` when the socket operation fails. Undecodable bytes are
        replaced rather than discarding the whole chunk.
        NOTE(review): a chunk may end in the middle of a line; callers split
        on CRLF without buffering - confirm whether that matters.
        """
        try:
            r = self.irc_socket.recv(2040).decode("UTF-8", errors="replace")
            self._answer_pings(r)
            return r
        except OSError:
            return ""

    def ping(self):
        """Receive one chunk and answer any PING in it, discarding the text."""
        self.response_irc()
|
|
|
|
|
|
server_irc = "127.0.0.1" # Use 127.0.0.1 for local ZNC
|
|
port_irc = 5000 # NO SSL FOR YOU
|
|
channel_irc = ["##sweezero"]
|
|
botnick_irc = environ.get('SBnick')
|
|
botnickpass_irc =environ.get('SBuser')
|
|
botpass_irc = environ.get('SBpass')
|
|
allowedparse = ["text/html", "application/x-httpd-php", "application/xhtml", "application/xhtml+xml"]
|
|
irc = bot_irc()
|
|
irc2 = bot_irc()
|
|
irc3 = bot_irc()
|
|
irc.connect_irc(
|
|
server_irc, port_irc, channel_irc, botnick_irc, botpass_irc, botnickpass_irc + "/A"
|
|
)
|
|
sbconfig = config()
|
|
threes = [":3", ":3c", "uwu", "owo", "/ᐠ。ꞈ。ᐟ\\", "(✿◠‿◠)"]
|
|
snacks = ["PepperMintHTMLs", "Dice App candies", "SweeCrypt codes", "Windows 11 Bloomberry flavored icecream"]
|
|
baps = ["T-T", "Ow!", "Beep!"]
|
|
pats = ["-w-", "Meep...", "Prr!"]
|
|
meows_happy = ['Meow!', 'Nyaa~', 'Mrow.', 'Prr! :3', "Mrrp?", "Mreow.", "!woeM", "3: !rrP", "~aayN", "Mew!", "Moew!"]
|
|
meows_upset = ['Hiss!', "!ssiH", "Grrr..."]
|
|
my_self = ["MeowNexUS", "MeowNexU5", "MeowN3xUS"]
|
|
logs = {} # {channel: [line, line]} max 10 lines
|
|
happiness = 5
|
|
global times
|
|
times = 0
|
|
ai_storage = {}
|
|
emoticons = {";3": "I think you forgot to press shift", ":#": "Wait no, don't hold shift THAT long!"}
|
|
def irci2():
    """Thread body for the second connection: connect as "sweeB0t", then
    keep answering server PINGs forever."""
    irc2.connect_irc(
        server=server_irc,
        port=port_irc,
        channel=channel_irc,
        bot_nick="sweeB0t",
        bot_pass=botpass_irc,
        bot_nickpass=botnickpass_irc + "/B",
    )
    while True:
        irc2.ping()
|
|
def irci3():
    """Thread body for the third connection: connect as "swe3Bot", then
    keep answering server PINGs forever."""
    irc3.connect_irc(
        server=server_irc,
        port=port_irc,
        channel=channel_irc,
        bot_nick="swe3Bot",
        bot_pass=botpass_irc,
        bot_nickpass=botnickpass_irc + "/C",
    )
    while True:
        irc3.ping()
|
|
def multiline(text, channel):
    """Send *text* to *channel* one line per message, rotating connections.

    The global counter ``times`` selects the sender: messages 0-4 go out on
    ``irc``, 5-8 on ``irc2`` and 9-12 on ``irc3``; the counter then wraps to
    0. There is a one-second pause before the first message of each run
    (presumably to stay under the server's flood limits - TODO confirm).
    Lines that are empty or a single space are skipped.
    """
    global times
    if type(text) is bytes:
        text = text.decode()

    for piece in text.split("\n"):
        if piece in ("", " "):
            continue

        # Pick the connection for this message and the counter value at
        # which that connection's run starts.
        if times > 8:
            sender, run_start = irc3, 9
        elif times > 4:
            sender, run_start = irc2, 5
        else:
            sender, run_start = irc, 0

        if times == run_start:
            sleep(1)
        sender.send_irc(channel, replace_color_codes(piece))
        times += 1
        if times == 13:
            times = 0
|
|
from sys import exit
|
|
def restart():
    """Close the sockets of all three connections and exit with status 0.

    NOTE(review): nothing here starts a new process; presumably an external
    supervisor relaunches the bot - confirm.
    """
    for connection in (irc, irc2, irc3):
        connection.irc_socket.close()
    exit(0)
|
|
def system(cmd, chan, rstrip=False):
    """Run *cmd* and relay its combined stdout/stderr to *chan*.

    cmd    -- argument list passed to ``subprocess.Popen`` (no shell).
    chan   -- channel or nick that receives the output.
    rstrip -- when true, all non-blank output lines are joined with single
              spaces and sent as one message; otherwise each line is sent
              as it is produced.

    A missing executable is reported as "<cmd[0]> not found"; any other
    failure sends the traceback to *chan*.
    """
    try:
        # The context manager closes the pipe and waits for the process even
        # when relaying the output raises.
        with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) as p:
            collected = []
            for raw in p.stdout:
                line = raw.decode(errors="replace").rstrip()
                # Test the output line itself (not some unrelated global):
                # blank lines would otherwise go out as empty messages.
                if not line.strip():
                    continue
                if rstrip:
                    collected.append(line)
                else:
                    multiline(line, chan)
            if collected:
                multiline(" ".join(collected), chan)
    except FileNotFoundError:
        multiline(cmd[0] + " not found", chan)
    except Exception:
        multiline(traceback.format_exc(), chan)
|
|
irl2 = threading.Thread(target=irci2, daemon=True)
|
|
irl2.start()
|
|
irl3 = threading.Thread(target=irci3, daemon=True)
|
|
irl3.start()
|
|
while True:
|
|
teext = irc.response_irc().split("\r\n")
|
|
for text in teext:
|
|
if text != "":
|
|
try:
|
|
print(text)
|
|
except:
|
|
print("")
|
|
try:
|
|
command = text.split("PRIVMSG")[1].split(":")[1:]
|
|
command = ":".join(command).split(" ")
|
|
channel = text.split("PRIVMSG")[1].split(" ")[1]
|
|
nick = text.split(" ")[0][1:].split("!")[0]
|
|
username = text.split(" ")[0][1:].split("@")[1]
|
|
mask = text.split(" ")[0][1:]
|
|
try:
|
|
perms = sbconfig.perms(mask)
|
|
except:
|
|
perms = ""
|
|
print(command)
|
|
print(channel)
|
|
print(nick)
|
|
print(username)
|
|
print(perms)
|
|
cont = " ".join(command)
|
|
if not channel in logs:
|
|
logs[channel] = [{"nick": nick, "content": cont}]
|
|
else:
|
|
logs[channel].append({"nick": nick, "content": cont})
|
|
if len(logs[channel]) > 128: logs[channel] = logs[channel][::-1][:128][::-1]
|
|
#open("log-text-"+channel, "a").write(" ".join(command) + "\n")
|
|
#open("log-name-"+channel, "a").write(nick + "\n")
|
|
except:
|
|
print(traceback.format_exc())
|
|
def replased(expression):
    """Apply a sed-like ``s/find/replace[/flag...]`` to the channel log.

    Flags after the replacement, each separated by "/":
      * ``g``       - replace every occurrence;
      * a number    - which occurrence to replace (default 1);
      * other text  - only consider messages from that nick.
    The newest logged message in the current ``channel`` that contains
    *find* is rewritten in place and echoed with an "[RPL]" prefix. Problems
    are reported to the channel, addressed to the current ``nick``.
    """
    if channel not in logs:
        irc.send_irc(channel, nick + ": [!!!] No logs saved in " + channel)
        return

    try:
        fields = expression.split("/")
        if fields[0].lower() != "s":
            raise IndexError()
        needle = fields[1]
        substitute = fields[2]

        wanted_nick = None
        replace_all = False
        occurrence = 1
        try:
            for flag in fields[3:]:
                if flag.strip() == "":
                    continue
                if flag.lower() == "g":
                    replace_all = True
                elif flag.strip().isdigit():
                    number = int(flag)
                    print(f"index = {number}")
                    occurrence = number
                else:
                    lowered = flag.lower()
                    print(f"user = {lowered}")
                    wanted_nick = lowered
        except:
            # A flag that cannot be parsed ends flag processing; whatever
            # was understood so far still applies.
            pass

        # Newest message first.
        target = next(
            (
                entry
                for entry in reversed(logs[channel])
                if needle in entry["content"]
                and (wanted_nick is None or entry["nick"].lower() == wanted_nick)
            ),
            None,
        )
        if target is None:
            raise Exception("The specified text couldn't be found")

        if replace_all:
            target["content"] = target["content"].replace(needle, substitute)
        else:
            target["content"] = rplce(target["content"], needle, substitute, occurrence)

        who = target["nick"]
        what = target["content"]
        multiline(f"[RPL] <{who}> {what}", channel)
    except IndexError:
        irc.send_irc(channel, nick + ": [ERR] Sed expression might be incorrectly written!")
    except Exception as ex:
        irc.send_irc(channel, nick + ": [ERR] " + str(ex))
|
|
try:
|
|
#if True:
|
|
if "PRIVMSG" in text and not nick in my_self:
|
|
if "PRIVMSG" in text and command[0][0] == "$":
|
|
logs[channel] = logs[channel][:-1]
|
|
run+=1
|
|
if command[0] == "$ping":
|
|
if random(1,2) == 1:
|
|
irc.send_irc(channel, nick + ": Pnog")
|
|
else:
|
|
irc.send_irc(channel, nick + ": Pong")
|
|
|
|
elif command[0] == "$help":
|
|
if len(command) > 1:
|
|
if "." in " ".join(command[1:]):
|
|
irc.send_irc(channel, nick + ": Nice try")
|
|
block+=1
|
|
run-=1
|
|
|
|
else:
|
|
try:
|
|
if path.isfile(cwd + "/helps/" + " ".join(command[1:])):
|
|
output = open(cwd + "/helps/" + " ".join(command[1:])).read()
|
|
multiline(output, nick if sbconfig.cflagexist(channel, "-multiline") else channel)
|
|
else:
|
|
irc.send_irc(channel, "Either the command isn't documented yet or it doesn't exist lol")
|
|
except:
|
|
multiline(output, channel)
|
|
else:
|
|
irc.send_irc(channel, "tip, ping, whoami, perms, version, figlet, tdfiglet, cowsay, uptime, cc, joke, botsnack. (restart, join, part, shell, config, pull.) Use '$help (command)' for explanation.")
|
|
|
|
elif command[0] == "$tip":
|
|
system(["/usr/games/fortune", "debian-hints"], channel, True)
|
|
|
|
elif command[0] == "$whoami":
|
|
if perms != "":
|
|
irc.send_irc(channel, nick + ": " + mask + " (Your hostmask has permissions, use the 'perms' command to see)")
|
|
else:
|
|
irc.send_irc(channel, nick + ": " + mask)
|
|
|
|
elif command[0] == "$welcome":
|
|
name = nick if not len(command) > 1 else " ".join(command[1:])
|
|
if random(1,2) == 1:
|
|
irc.send_irc(channel, f"Heyo! Welcome to the channel, {name}")
|
|
else:
|
|
irc.send_irc(channel, f"Welcome, {name}")
|
|
|
|
elif command[0] == "$perms":
|
|
if len(command) == 2:
|
|
if sbconfig.perms(command[1]) != "":
|
|
irc.send_irc(channel, nick + ": permissions of hostmask " + command[1] + ": " + sbconfig.perms(command[1]))
|
|
else:
|
|
irc.send_irc(channel, nick + ": The hostmask wildcard " + command[1] + " Doesn't have permissions.")
|
|
else:
|
|
if perms != "":
|
|
irc.send_irc(channel, nick + ": " + perms)
|
|
else:
|
|
irc.send_irc(channel, nick + ": none")
|
|
|
|
elif command[0] == "$restart":
|
|
if perms == "full" or "reboot" in perms.split(",") or "restart" in perms.split(","):
|
|
irc.send_irc(channel, nick + ": Restarting bot...")
|
|
sleep(1)
|
|
restart()
|
|
else:
|
|
irc.send_irc(channel, nick + ": Permission denied.")
|
|
block+=1
|
|
run-=1
|
|
|
|
|
|
elif command[0] == "$pull":
|
|
if perms == "full" or "git" in perms.split(","):
|
|
ossystem("git fetch")
|
|
ossystem("git pull")
|
|
irc.send_irc(channel, nick + ": Pulled from Git, restarting bot...")
|
|
restart()
|
|
else:
|
|
irc.send_irc(channel, nick + ": Permission denied.")
|
|
block+=1
|
|
run-=1
|
|
|
|
|
|
elif command[0] == "$join":
|
|
if perms == "full" or "join" in perms.split(","):
|
|
if len(command) == 2:
|
|
irc.send_irc(channel, nick + ": Joining " + command[1])
|
|
irc.irc_socket.send(bytes("JOIN " + command[1] + "\n", "UTF-8"))
|
|
irc2.irc_socket.send(bytes("JOIN " + command[1] + "\n", "UTF-8"))
|
|
irc3.irc_socket.send(bytes("JOIN " + command[1] + "\n", "UTF-8"))
|
|
multiline(channel=command[1], text="A wild bot appears! ($join initiated by " + nick + ")")
|
|
else:
|
|
if len(command) == 1:
|
|
irc.send_irc(channel, nick + ": I NEED A CHANNEL NAME TO JOIN!!!")
|
|
else:
|
|
irc.send_irc(channel, nick + ": Y'know this only uses one argument? JUST THE CHANNEL NAME!!!")
|
|
else:
|
|
irc.send_irc(channel, nick + ": Permission denied.")
|
|
block+=1
|
|
run-=1
|
|
|
|
|
|
elif command[0] == "$part":
|
|
if perms == "full" or "part" in perms.split(","):
|
|
if len(command) == 2:
|
|
irc.send_irc(channel, nick + ": Parting " + command[1])
|
|
irc.irc_socket.send(bytes("PART " + command[1] + "\n", "UTF-8"))
|
|
irc2.irc_socket.send(bytes("PART " + command[1] + "\n", "UTF-8"))
|
|
irc3.irc_socket.send(bytes("PART " + command[1] + "\n", "UTF-8"))
|
|
else:
|
|
if len(command) == 1:
|
|
irc.send_irc(channel, "Bye bye!")
|
|
sleep(1)
|
|
irc.irc_socket.send(bytes("PART " + channel + "\n", "UTF-8"))
|
|
irc2.irc_socket.send(bytes("PART " + channel + "\n", "UTF-8"))
|
|
irc3.irc_socket.send(bytes("PART " + channel + "\n", "UTF-8"))
|
|
else:
|
|
irc.send_irc(channel, nick + ": Y'know this only uses one or zero arguments?")
|
|
else:
|
|
irc.send_irc(channel, nick + ": Permission denied.")
|
|
block+=1
|
|
run-=1
|
|
|
|
elif command[0] == "$ai-chat":
|
|
try:
|
|
aiconnector = cai.connect()
|
|
chatquery = f"--Listen Up--\nYour happiness level is {happiness} (Maximum 10), your emotion level will impact the conversation's tone.\n--END Listen Up--\n--IRC recieve--\n<{nick}> " + " ".join(command[1:])
|
|
if channel in ai_storage:
|
|
aimsg = aiconnector.send_message(CAImodel, ai_storage[channel], chatquery)
|
|
else:
|
|
new, answer = aiconnector.new_chat(
|
|
CAImodel, me.id
|
|
)
|
|
aimsg = aiconnector.send_message(CAImodel, new.chat_id, chatquery)
|
|
ai_storage[channel] = new.chat_id
|
|
randomname = str(uuid.uuid4())
|
|
responses_ai[randomname] = aimsg.text
|
|
mtline = ""
|
|
lastquery = f"<p>Channel: {channel}</p><p>Request:</p><br><br><textarea class=code width=100% spellcheck=\"false\" autocapitalize=\"off\" autocomplete=\"off\" autocorrect=\"off\" readonly=\"true\">{chatquery}</textarea><hr><p>Response</p><br><br><textarea class=code width=100% spellcheck=\"false\" autocapitalize=\"off\" autocomplete=\"off\" autocorrect=\"off\" readonly=\"true\">{aimsg.text}</textarea>"
|
|
broke = False
|
|
for num, i in enumerate(aimsg.text.split("\n")):
|
|
if num == 5:
|
|
broke = True
|
|
break
|
|
elif len(i) > 256:
|
|
mtline += "\n" + i[:256] + "..."
|
|
broke = True
|
|
else:
|
|
mtline += "\n" + i
|
|
if broke:
|
|
mtline += f"\nView full response: https://irc-bot.swee.codes/response/{randomname}"
|
|
multiline(mtline, channel)
|
|
except:
|
|
multiline("Something went wrong while connecting to Character.AI.", channel)
|
|
print(traceback.format_exc())
|
|
|
|
elif command[0] == "$ai-clear":
|
|
if channel in ai_storage:
|
|
del ai_storage[channel]
|
|
multiline("Done.", channel)
|
|
else:
|
|
multiline(f"There is no existing AI conversation in {channel}", channel)
|
|
elif command[0] == "$socket":
|
|
if perms == "full":
|
|
if len(command) > 1:
|
|
irc.send_irc(channel, nick + ": Sending to socket...")
|
|
irc.irc_socket.send(bytes(" ".join(command[1:]) + "\n", "UTF-8"))
|
|
irc2.irc_socket.send(bytes(" ".join(command[1:]) + "\n", "UTF-8"))
|
|
irc3.irc_socket.send(bytes(" ".join(command[1:]) + "\n", "UTF-8"))
|
|
else:
|
|
irc.send_irc(channel, nick + ": what to send to the server?")
|
|
else:
|
|
irc.send_irc(channel, nick + ": Permission denied.")
|
|
block+=1
|
|
run-=1
|
|
|
|
|
|
elif command[0] == "$patpat" or command[0] == "$headpat":
|
|
if len(command) == 2:
|
|
if int(command[1]) < 50:
|
|
for i in range(0,int(command[1])):
|
|
happiness += (1 if happiness != 10 else 0)
|
|
else:
|
|
happiness = 10
|
|
elif len(command) == 1:
|
|
happiness += (1 if happiness != 10 else 0)
|
|
if len(command) != 2 or int(command[1]) < 50:
|
|
multiline(choice(pats) + " (Emotion: Upset " +
|
|
("#" * int(happiness)) +
|
|
("-" * int(10-happiness)) + " Happy)",
|
|
channel)
|
|
else:
|
|
multiline("Okay that's enough!!" + " (Emotion: Upset " +
|
|
("#" * int(happiness)) +
|
|
("-" * int(10-happiness)) + " Happy)",
|
|
channel)
|
|
|
|
elif command[0] == "$emotion":
|
|
multiline("Emotion: Upset " +
|
|
("#" * int(happiness)) +
|
|
("-" * int(10-happiness)) + " Happy",
|
|
channel)
|
|
elif command[0] == "$bap":
|
|
if len(command) == 2:
|
|
if int(command[1]) < 50:
|
|
for i in range(0,int(command[1])):
|
|
happiness -= (1 if happiness != 0 else 0)
|
|
else:
|
|
happiness = 0
|
|
elif len(command) == 1:
|
|
happiness -= (1 if happiness != 0 else 0)
|
|
if len(command) != 2 or int(command[1]) < 50:
|
|
multiline(choice(baps) + " (Emotion: Upset " +
|
|
("#" * int(happiness)) +
|
|
("-" * int(10-happiness)) + " Happy)",
|
|
channel)
|
|
else:
|
|
multiline("Okay that's enough!!" + " (Emotion: Upset " +
|
|
("#" * int(happiness)) +
|
|
("-" * int(10-happiness)) + " Happy)",
|
|
channel)
|
|
elif command[0] == "$meow":
|
|
if random(1,100) > (happiness*10):
|
|
multiline(choice(meows_upset), channel)
|
|
else:
|
|
multiline(choice(meows_happy), channel)
|
|
elif command[0] == "$stats":
|
|
multiline(f"{nick}: {run} command(s) run, {block} refused, and {parsed} link(s) parsed, check out https://irc-bot.swee.codes", channel)
|
|
elif command[0] == "$shell":
|
|
if perms == "full":
|
|
#or "shell" in perms.split(",")
|
|
if len(command) > 1:
|
|
try:
|
|
system(command[1:], channel)
|
|
except Exception as ex:
|
|
multiline(traceback.format_exc(), channel)
|
|
else:
|
|
irc.send_irc(channel, nick + ": What do you want me to F-ing run in the shell?")
|
|
else:
|
|
irc.send_irc(channel, nick + ": Permission denied.")
|
|
block+=1
|
|
run-=1
|
|
|
|
|
|
elif command[0] == "$version":
|
|
irc.send_irc(channel, "This is MeowNexUS IRC " + __version__)
|
|
|
|
elif command[0] == "$figlet":
|
|
if not sbconfig.cflagexist(channel, "-multiline"):
|
|
if len(command) > 1:
|
|
try:
|
|
system(['figlet'] + command[1:], channel)
|
|
except Exception as ex:
|
|
irc.send_irc(channel, nick + ": " + ex.__class__.__name__)
|
|
else:
|
|
irc.send_irc(channel, nick + ": What text should I enlarge?")
|
|
else:
|
|
multiline("Multiline commands are disabled in this channel.", channel)
|
|
|
|
elif command[0] == "$tdfiglet":
|
|
if not sbconfig.cflagexist(channel, "-multiline"):
|
|
if len(command) > 1:
|
|
try:
|
|
system(['tdfiglet', '-cm', "-r"] + command[1:], channel)
|
|
except Exception as ex:
|
|
irc.send_irc(channel, nick + ": " + ex.__class__.__name__)
|
|
else:
|
|
irc.send_irc(channel, nick + ": What text should I enlarge?")
|
|
else:
|
|
multiline("Multiline commands are disabled in this channel.", channel)
|
|
|
|
elif command[0] == "$cowsay":
|
|
if not sbconfig.cflagexist(channel, "-multiline"):
|
|
if len(command) > 1:
|
|
try:
|
|
system(['/usr/games/cowsay'] + command[1:], channel)
|
|
except Exception as ex:
|
|
irc.send_irc(channel, nick + ": " + ex.__class__.__name__)
|
|
else:
|
|
irc.send_irc(channel, nick + ": What should the cow say?")
|
|
else:
|
|
multiline("Multiline commands are disabled in this channel.", channel)
|
|
|
|
elif command[0] == "$uptime":
|
|
try:
|
|
system(['uptime', '-p'], channel)
|
|
except Exception as ex:
|
|
irc.send_irc(channel, nick + ": " + ex.__class__.__name__)
|
|
|
|
elif command[0] == "$joke":
|
|
try:
|
|
system(['pyjoke'] + command[1:], channel)
|
|
except Exception as ex:
|
|
multiline(traceback.format_exc(), channel)
|
|
|
|
elif command[0] == "$botsnack":
|
|
multiline(f"\x01ACTION eats some {choice(snacks)}\x01", channel)
|
|
elif command[0] == "$hug":
|
|
name = nick if not len(command) > 1 else " ".join(command[1:])
|
|
multiline(f"\x01ACTION gives a hug to {name}\x01", channel)
|
|
|
|
|
|
elif command[0] == "$sed":
|
|
if len(command) == 2:
|
|
replased(" ".join(command[1:]))
|
|
else:
|
|
irc.send_irc(channel, nick + ": [???] This command takes only one argument.")
|
|
|
|
|
|
elif command[0] == "$config":
|
|
try:
|
|
if len(command) > 1:
|
|
if perms == "full":
|
|
abort = False
|
|
flag = sbconfig.chansettings(channel)
|
|
for i in command[1:]:
|
|
if i == "clear":
|
|
flag = ["+config"]
|
|
elif i[0] == "+":
|
|
if "-" + i[1:] in flag:
|
|
flag[flag.index("-" + i[1:])] = i
|
|
else:
|
|
flag.append(i)
|
|
elif i[0] == "-":
|
|
if "+" + i[1:] in flag:
|
|
flag[flag.index("+" + i[1:])] = i
|
|
else:
|
|
flag.append(i)
|
|
else:
|
|
multiline(nick + ": flag name should begin with + or -", channel)
|
|
abort = True
|
|
break
|
|
if not abort:
|
|
print(flag)
|
|
sbconfig.setchanconfig(channel, flag)
|
|
multiline(nick + ": Successfuly applied configuration: " + " ".join(flag), channel)
|
|
else:
|
|
irc.send_irc(channel, nick + ": Permission denied")
|
|
block+=1
|
|
run-=1
|
|
|
|
else:
|
|
multiline("Configuration for " + channel + ": " + " ".join(sbconfig.chansettings(channel)), channel)
|
|
except ex:
|
|
multiline(str(ex), channel)
|
|
|
|
else:
|
|
if ".." in command[0][:3]:
|
|
irc.send_irc(channel, nick + ": Nice try.")
|
|
block+=1
|
|
run-=1
|
|
|
|
else:
|
|
if path.isfile("cc/" + command[0][1:]):
|
|
try:
|
|
if sbconfig.cflagexist(channel, "-multiline") and "#require-multiline" in open("cc/" + command[0][1:]).read():
|
|
irc.send_irc(channel, "Multiline commands are disabled in this channel.")
|
|
else:
|
|
com = ['python3']
|
|
comm = command
|
|
com.append("cc/" + comm[0][1:])
|
|
com.append(nick)
|
|
com.append(perms)
|
|
comm.pop(0)
|
|
for i in comm:
|
|
com.append(i)
|
|
system(com, channel)
|
|
except Exception as ex:
|
|
irc.send_irc(channel, nick + ": " + ex.__class__.__name__)
|
|
elif path.isdir("cc/" + command[0][1:]):
|
|
irc.send_irc(channel, nick + ": Command list under cc/debug: " + ", ".join(listdir("cc/" + command[0][1:])) + ".")
|
|
else:
|
|
print(nick + ": Unrecognised command")
|
|
|
|
|
|
elif command[0] == ":3":
|
|
if not sbconfig.cflagexist(channel, "-colonthree"):
|
|
irc.send_irc(channel, ":3")
|
|
elif command[0] in emoticons:
|
|
if not sbconfig.cflagexist(channel, "-colonthree"):
|
|
irc.send_irc(channel, emoticons[command[0]])
|
|
elif ":3c" in command:
|
|
if sbconfig.cflagexist(channel, "+:3c") or not sbconfig.cflagexist(channel, "-colonthree"):
|
|
multiline(choice(threes), channel)
|
|
elif command[0][:2].lower() == "s/":
|
|
logs[channel] = logs[channel][:-1]
|
|
if sbconfig.cflagexist(channel, "+sed"):
|
|
replased(" ".join(command))
|
|
# Link previews: for every http(s) URL in the message, post either a YouTube
# summary, the page title, the HTTP status, or the size and content type.
if sbconfig.cflagexist(channel, "+links"):
    try:
        for i in command:
            parse = urlparse(i)
            if parse.scheme not in ["http", "https"]:
                continue
            try:
                video_id = get_yt_id(i)
                if video_id:
                    try:
                        youtube = build('youtube', 'v3', developerKey=DEVELOPER_KEY)
                        request = youtube.videos().list(part='snippet,statistics', id=video_id)
                        details = request.execute()
                        title = details['items'][0]['snippet']['title']
                        channel_yt = details['items'][0]['snippet']['channelTitle']
                        views = details['items'][0]['statistics']['viewCount']
                        multiline(f"({nick}) [▶️ YouTube] {title} | Author: {channel_yt} | {views} Views", channel)
                    except Exception:
                        multiline("(" + nick + ") [YouTube Error, is it a valid YouTube URL?]", channel)
                        print(traceback.format_exc())
                else:
                    # stream=True fetches only the headers up front, so a
                    # link to a huge file is not downloaded just to report
                    # its size; the connection is closed on leaving `with`.
                    with get(i, headers=headers, timeout=10, stream=True) as e:
                        header = e.headers
                        # Either header may be absent.
                        content_type = (header.get('content-type') or '').split(";")[0].strip()
                        content_len = header.get('Content-length')
                        if content_type in allowedparse:
                            if e.ok:
                                # Read a bounded prefix of the body; this
                                # assumes the <title> sits within it.
                                page_head = next(e.iter_content(chunk_size=512 * 1024), b"")
                                soup = BeautifulSoup(page_head, 'html.parser')
                                # .string is None for an empty or nested <title>.
                                page_title = soup.title.string if soup.title is not None else None
                                multiline("(" + nick + ") " + (" ".join(page_title.splitlines())[:100] if page_title else "[No title provided]"), channel)
                            else:
                                multiline("(" + nick + ") [HTTP " + str(e.status_code) + "]", channel)
                        else:
                            size_text = humanbytes(content_len) if content_len is not None and content_len.strip().isdigit() else "unknown size"
                            multiline("(" + nick + ") [" + size_text + " " + (content_type or "unknown type") + "]", channel)
                parsed += 1
            # Exceptions carry no `.message` attribute in Python 3; use
            # str(ex), truncated so the reply stays a short single message.
            except rex.SSLError as ex:
                multiline("(" + nick + ") [SSL Error: " + str(ex)[:200] + "]", channel)
            except Exception as ex:
                multiline("(" + nick + ") [Request error: " + str(ex)[:200] + "]", channel)
    except Exception:
        print(traceback.format_exc())
|
|
# Gemini link previews: fetch every gemini:// URL in the message and post
# its first heading line (or the status line when the request did not
# succeed).
if sbconfig.cflagexist(channel, "+links") or sbconfig.cflagexist(channel, "+gemini"):
    try:
        for i in command:
            parse = urlparse(i)
            if parse.scheme != "gemini" or not parse.hostname:
                continue

            # ssl.wrap_socket() no longer exists in current Python; use an
            # SSLContext. Certificate checks are off, as they were with the
            # old call's defaults (Gemini servers commonly use self-signed
            # certificates).
            gemcontext = ssl.create_default_context()
            gemcontext.check_hostname = False
            gemcontext.verify_mode = ssl.CERT_NONE

            received = b""
            with socket.create_connection((parse.hostname, parse.port or 1965), timeout=10) as gsocket:
                with gemcontext.wrap_socket(gsocket, server_hostname=parse.hostname) as gemsocket:
                    # The request is the URL itself followed by CRLF, as bytes.
                    gemsocket.sendall((i + "\r\n").encode("utf-8"))
                    # Cap the read: only the status line and first heading are used.
                    while len(received) < 65536:
                        gemresponse = gemsocket.recv(2048)
                        if not gemresponse:
                            break
                        received += gemresponse

            gemlines = received.decode("utf-8", errors="replace").replace("\r", "").split("\n")
            statusline = gemlines[0]
            if not statusline.startswith("2"):
                # Anything other than a 2x status: show the status line only.
                multiline("(" + nick + f") [{statusline}]", channel)
                continue

            # The first heading line serves as the title; blank lines are
            # simply skipped by the startswith test.
            heading = next((gemline for gemline in gemlines[1:] if gemline.startswith("#")), None)
            if heading is None:
                multiline("(" + nick + ") [No title found]", channel)
            else:
                multiline("(" + nick + ") " + heading.lstrip("#").strip(), channel)
    except Exception as ex:
        exc = str(ex)
        multiline("(" + nick + f") [Request error: {exc}]", channel)
|
|
elif "JOIN" in text and "#nixsanctuary" in text:
|
|
nick = text.split(":")[1].split("!")[0]
|
|
if not "Meow" in nick:
|
|
irc.send_irc("##hiya", "hiya: " + nick + " has joined #nixsanctuary")
|
|
pass
|
|
|
|
except Exception as ex:
|
|
print(traceback.format_exc())
|