diff --git a/sweebot.py b/sweebot.py index b28a759..5ebeeb2 100644 --- a/sweebot.py +++ b/sweebot.py @@ -10,6 +10,7 @@ from random import choice, randint as random, randrange import traceback import threading from pathlib import Path +from urllib.parse import urlparse, parse_qs from requests import get, exceptions as rex from bs4 import BeautifulSoup from googleapiclient.discovery import build @@ -38,11 +39,25 @@ def script_js(): return send_file("script.js", mimetype='application/javascript') threading.Thread(target=app.run, daemon=True, kwargs={"port": 2005}).start() - +# YouTube API DEVELOPER_KEY = environ["ytapi"] headers = { 'User-Agent': 'SweeBot IRC ' + __version__ } +def get_yt_id(url): + query = urlparse(url) + # youtu.be already contains the ID in the path + if query.hostname == 'youtu.be': return query.path[1:] + if query.hostname in {'www.youtube.com', 'youtube.com', 'music.youtube.com'}: + # URLs that have the ID in the path instead of the query. + integrated_in_url = ["watch", "embed", "v", "shorts"] + try: + # The regular /watch path, which stores the ID in the query. + if query.path == '/watch': return parse_qs(query.query)['v'][0] + # Alternatively, it will get the ID in the path if the path was in the list above. + elif query.path.split('/')[1] in integrated_in_url: return query.path.split('/')[2] + except: + return None class config: def __init__(self): self.conn = sqlite3.connect(environ["SBconfig"]) @@ -649,39 +664,39 @@ while True: if sbconfig.cflagexist(channel, "+links"): try: for i in command: - if i[:8] == "https://": + parse = urlparse(i) + if parse.scheme in ["http", "https"]: try: - e = get(i, headers=headers, timeout=10) - header = e.headers - content_type = header.get('content-type').split(";")[0] - content_len = header.get('Content-length') - if content_type in allowedparse: - if e.ok: - soup = BeautifulSoup(e.text, 'html.parser') - multiline("(" + nick + ") " + (" ".join(soup.title.string.splitlines())[:100] if soup.title != None else "[No title provided]"), channel) + try: + if parse.hostname in ["youtube.com", "youtu.be", "www.youtube.com", "m.youtube.com", "youtube-nocookie.com"]: + video_id = get_yt_id(i) + youtube = build('youtube', 'v3', developerKey=DEVELOPER_KEY) + request = youtube.videos().list(part='snippet,statistics', id=video_id) + details = request.execute() + title = details['items'][0]['snippet']['title'] + channel = details['items'][0]['snippet']['channelTitle'] + views = details['items'][0]['statistics']['viewCount'] + multiline("(" + nick + ") [▶️ YouTube] {title} | Author: {channel} | {views} views", channel) else: - multiline("(" + nick + ") [HTTP " + str(e.status_code) + "]", channel) - else: - multiline("(" + nick + ") [" + humanbytes(content_len) + " " + str(content_type) + "]", channel) - parsed += 1 + raise Exception("No special URL, go ahead and parse the normal title...") + except: + e = get(i, headers=headers, timeout=10) + header = e.headers + content_type = header.get('content-type').split(";")[0] + content_len = header.get('Content-length') + if content_type in allowedparse: + if e.ok: + soup = BeautifulSoup(e.text, 'html.parser') + multiline("(" + nick + ") " + (" ".join(soup.title.string.splitlines())[:100] if soup.title != None else "[No title provided]"), channel) + else: + multiline("(" + nick + ") [HTTP " + str(e.status_code) + "]", channel) + else: + multiline("(" + nick + ") [" + humanbytes(content_len) + " " + str(content_type) + "]", channel) + parsed += 1 except rex.SSLError as ex: multiline("(" + nick + ") [SSL Error: " + str(ex.message) + "]", channel) except Exception as ex: multiline("(" + nick + ") [Request error: " + str(ex.message) + "]", channel) - elif i[:7] == "http://": - e = get(i, headers=headers, timeout=10) - header = e.headers - content_type = header.get('content-type').split(";")[0] - content_len = header.get('Content-length') - if content_type in allowedparse: - if e.ok: - soup = BeautifulSoup(e.text, 'html.parser') - multiline("(" + nick + ") " + (" ".join(soup.title.string.splitlines())[:100] if soup.title != None else "[No title provided]"), channel) - else: - multiline("(" + nick + ") [HTTP " + str(e.status_code) + "]", channel) - else: - multiline("(" + nick + ") [" + humanbytes(content_len) + " " + str(content_type) + "]", channel) - parsed += 1 except: print(traceback.format_exc()) elif "JOIN" in text and "#nixsanctuary" in text: