240 lines
No EOL
12 KiB
Python
240 lines
No EOL
12 KiB
Python
#!/usr/bin/python3
|
|
import asyncio, traceback, socket, ssl, zipfile, random, string, io
|
|
from OpenSSL import crypto
|
|
from urllib.parse import urlparse, quote
|
|
from flask import Flask, request, redirect, send_file, Response, make_response
|
|
from hypercorn.config import Config
|
|
from hypercorn.asyncio import serve
|
|
from pathlib import Path
|
|
homefolder = str(Path.home())
|
|
app = Flask(__name__)
|
|
@app.route("/robots.txt")
|
|
def robots():
|
|
return Response("User-agent: *\nDisallow: /", mimetype="text/plain")
|
|
@app.route("/")
|
|
def root():
|
|
return send_file("home.html")
|
|
@app.route("/external.png")
|
|
def external():
|
|
return send_file("external.png")
|
|
@app.route("/cross-server.png")
|
|
def crosserver():
|
|
return send_file("cross-server.png")
|
|
@app.route("/loadcert")
|
|
def loadcert():
|
|
return send_file("loadcert.html")
|
|
def allowed_file(filename):
|
|
return '.' in filename and filename.rsplit('.', 1)[1].lower() == "pem"
|
|
@app.route("/certload", methods=['GET', 'POST'])
|
|
def loadcert():
|
|
if request.method == 'POST':
|
|
if 'cert' not in request.files:
|
|
return "Invalid request. (cert is missing!)"
|
|
if 'privkey' not in request.files:
|
|
return "Invalid request. (privkey is missing!)"
|
|
cert = request.files['cert']
|
|
privkey = request.files['privkey']
|
|
if cert.filename == '' or privkey.filename == '':
|
|
return "Please upload a certificate and private key."
|
|
if allowed_file(cert.filename) and allowed_file(privkey.filename):
|
|
random_name = ''.join(random.choice(string.ascii_lowercase+string.digits+string.ascii_uppercase) for i in range(8))
|
|
cert.save(homefolder + "/certs/" + randomname + "-chain.pem")
|
|
privkey.save(homefolder + "/certs/" + randomname + "-privkey.pem")
|
|
resp = make_response("<meta http-equiv=\"refresh\" content=\"0; url=//\" />Success!")
|
|
resp.set_cookie('certname',random_name)
|
|
return resp
|
|
else:
|
|
return "Both files must be a .pem file, you might want to generate a certificate via the home page."
|
|
else:
|
|
return "Cannot go to /certload with GET, perhaps you're looking for /loadcert"
|
|
@app.route("/gencert.zip")
|
|
def gencert():
|
|
random_name = ''.join(random.choice(string.ascii_lowercase+string.digits+string.ascii_uppercase) for i in range(8))
|
|
k = crypto.PKey()
|
|
k.generate_key(crypto.TYPE_RSA, 1024)
|
|
cert = crypto.X509()
|
|
cert.get_subject().C = "US"
|
|
cert.get_subject().ST = "Earth"
|
|
cert.get_subject().L = "Earth"
|
|
cert.get_subject().O = random_name
|
|
cert.get_subject().OU = random_name
|
|
cert.get_subject().CN = "g2b.swee.codes"
|
|
cert.set_serial_number(1000)
|
|
cert.gmtime_adj_notBefore(0)
|
|
cert.gmtime_adj_notAfter(10*365*24*60*60)
|
|
cert.set_issuer(cert.get_subject())
|
|
cert.set_pubkey(k)
|
|
cert.sign(k, 'sha1')
|
|
#open(homefolder + "/" + random_name + "-privkey.pem", "wb").write(crypto.dump_privatekey(crypto.FILETYPE_PEM, k))
|
|
#open(homefolder + "/" + random_name + "-cert.pem", "wb").write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
|
|
zip_buffer = io.BytesIO()
|
|
with zipfile.ZipFile(zip_buffer, "a", zipfile.ZIP_DEFLATED, False) as zip_file:
|
|
for file_name, data in [('cert.pem', io.BytesIO(crypto.dump_certificate(crypto.FILETYPE_PEM, cert))),
|
|
('privkey.pem', io.BytesIO(crypto.dump_privatekey(crypto.FILETYPE_PEM, k)))]:
|
|
zip_file.writestr(file_name, data.getvalue())
|
|
open(homefolder + "/certs/" + random_name + "-chain.pem", "wb").write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
|
|
open(homefolder + "/certs/" + random_name + "-privkey.pem", "wb").write(crypto.dump_privatekey(crypto.FILETYPE_PEM, k))
|
|
resp = make_response(zip_buffer.getvalue())
|
|
resp.set_cookie('certname',random_name)
|
|
resp.mimetype = "application/zip"
|
|
return resp
|
|
@app.route("/style.css")
|
|
def style():
|
|
return send_file("style.css")
|
|
@app.route("/logo.png")
|
|
def logo():
|
|
return send_file("gem2browser.png")
|
|
@app.route("/gem")
|
|
def relay():
|
|
print(request.headers.get('User-Agent'))
|
|
url = request.args.get('gemini')
|
|
queries = request.args.get('query')
|
|
certfile = request.cookies.get('certname')
|
|
if url == None:
|
|
return redirect("/")
|
|
code = "<h1>Something went wrong...</h1>\n"
|
|
title = "Something went wrong..."
|
|
print("GET " + "gemini://" + url + ("?" + queries if queries != None else ""))
|
|
try:
|
|
gsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
fulladdr = "gemini://" + url + ("?" + queries if queries != None else "")
|
|
contx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
|
|
if certfile:
|
|
contx.load_cert_chain(certfile=homefolder + "/certs/" + certfile + "-chain.pem", keyfile=homefolder + "/certs/" + certfile + "-privkey.pem");
|
|
context.load_verify_locations(cafile=ssl.CERT_AU);
|
|
context.verify_mode = ssl.CERT_OPTIONAL
|
|
contx.check_hostname = False;
|
|
gemsocket = contx.wrap_socket(gsocket)
|
|
gemsocket.connect((urlparse(fulladdr).hostname, 1965))
|
|
gemsocket.send(bytes(fulladdr + "\r\n", "UTF-8"))
|
|
received = ""
|
|
mimetype = ""
|
|
gemraw = bytearray()
|
|
num = 0
|
|
ok = False
|
|
binary = False
|
|
while True:
|
|
gemresponse = gemsocket.recv(2048)
|
|
if len(gemresponse) != 0:
|
|
for i in gemresponse:
|
|
gemraw.append(i)
|
|
else:
|
|
break
|
|
found = False
|
|
gemcontent_binary = bytearray()
|
|
mtype = bytearray()
|
|
for i in bytes(gemraw):
|
|
try:
|
|
if not found:
|
|
mtype.append(i)
|
|
else:
|
|
gemcontent_binary.append(i)
|
|
if "\n" in bytes(mtype).decode() and not found:
|
|
found = True
|
|
ok = bytes(mtype).decode().split(" ")[0][0] == "2"
|
|
mimetype = bytes(mtype).decode().replace("\r", "").split("\n")[0].split(" ")[1].split(";")[0]
|
|
except:
|
|
pass
|
|
try:
|
|
received = gemraw.decode().strip()
|
|
except:
|
|
binary = True
|
|
print(traceback.format_exc())
|
|
if binary:
|
|
return Response(bytes(gemcontent_binary), mimetype="text/gemini" if mimetype=="" else mimetype)
|
|
received = received.replace("\r", "")
|
|
firstline = True
|
|
redirected = False
|
|
gemtext = True
|
|
code = ""
|
|
escaped = False
|
|
for i in received.split("\n"):
|
|
i = i.replace("<", "<")
|
|
if firstline:
|
|
if i.split(" ")[0][0] == "3":
|
|
if i.split(" ")[1][0] == "/":
|
|
return redirect("/gem?gemini=" + urlparse(fulladdr).hostname + quote(i.split(" ")[1], safe=''))
|
|
return redirect("/gem?gemini=" + quote(i.split(" ")[1][9:], safe=''))
|
|
elif i.split(" ")[0][0] == "1":
|
|
return f'<!DOCTYPE html>\n<html><head><meta charset="UTF-8"><link rel="stylesheet" href="/style.css"><title>Input required</title></head><body><h1>Input required</h1><p>The specified Gemini server wants more data: <pre>{i}</pre></p><form action="/gem"><input hidden class="input" value="{url}" type="text" name="gemini"><input class="input" type="text" name="query"><br><input type="submit" class="go" value="Go!"><br><br></form></body></html>'
|
|
elif i.split(" ")[0][0] == "6":
|
|
return f'<!DOCTYPE html>\n<html><head><meta charset="UTF-8"><link rel="stylesheet" href="/style.css"><title>Certificate requested</title></head><body><h1>Certificate requested</h1><p>The specified Gemini server wants a client certificate, or the certificate is invalid. <pre>{i}</pre></p><br><br><p>You can load a certificate in <a href="/">home page.</a></p></body></html>'
|
|
elif i.split(" ")[0][0] != "2":
|
|
return f'<!DOCTYPE html>\n<html><head><meta charset="UTF-8"><link rel="stylesheet" href="/style.css"><title>Something went wrong...</title></head><body><h1>Something went wrong...</h1><p>The specified Gemini server returned a status of: {i}</p></body></html>'
|
|
else:
|
|
firstline = False
|
|
if i.split(" ")[1].split(";")[0] != "text/gemini":
|
|
print("Unrecognised type: " + i.split(" ")[1])
|
|
return Response(" ".join(received.split("\n")[1:]), mimetype=i.split(" ")[1])
|
|
else:
|
|
if escaped:
|
|
if i[0:3] == "```":
|
|
code += "</pre>\n"
|
|
escaped = False
|
|
else:
|
|
code += i + "\n"
|
|
else:
|
|
if i[0:2] == "# ":
|
|
if title == "Something went wrong...":
|
|
title = i[2:]
|
|
temp = i[2:]
|
|
code += f"<h1>{temp}</h1>\n"
|
|
elif i[0:3] == "## ":
|
|
temp = i[3:]
|
|
code += f"<h2>{temp}</h2>\n"
|
|
elif i[0:3] == "```":
|
|
code += "<pre>\n"
|
|
escaped = True
|
|
elif i[0:4] == "### ":
|
|
temp = i[4:]
|
|
code += f"<h3>{temp}</h3>\n"
|
|
elif i[0:2] == "* ":
|
|
temp = i[2:]
|
|
code += f"<ul><li>{temp}</li></ul>\n"
|
|
elif i[0:2] == "=>":
|
|
temp = " ".join(i[2:].strip().replace(" ", " ").split(" "))
|
|
goto = temp.split(" ")[0]
|
|
prse = urlparse(goto)
|
|
extra = ""
|
|
extracomment = goto
|
|
if prse.netloc == "" and prse.scheme == "":
|
|
isdir = url[len(url) - 1] == "/"
|
|
isabs = goto[0] == "/" or goto[0:2] == "//"
|
|
dubleslash = goto[0:2] == "//"
|
|
if isabs:
|
|
tempurl = urlparse(fulladdr).hostname + (goto[1:] if dubleslash else goto)
|
|
elif isdir:
|
|
tempurl = url + goto
|
|
else:
|
|
tempurl = "/".join(url.split("/")[:-1]) + "/" + goto
|
|
tempurl = quote(tempurl, safe='')
|
|
goto = f"/gem?gemini={tempurl}"
|
|
elif prse.scheme != "gemini":
|
|
extra = "<img height=\"19\" src=\"/external.png\">"
|
|
extracomment = f"This link points to an address that isn't gemini ({prse.scheme})"
|
|
else:
|
|
if prse.hostname != urlparse(fulladdr).hostname:
|
|
extra = "<img height=\"19\" src=\"/cross-server.png\">"
|
|
extracomment = f"This link points to an address that isn't from the server you're currently connecting to ({prse.hostname})"
|
|
goto = goto.replace("gemini://", "")
|
|
goto = quote(goto, safe='')
|
|
goto = f"/gem?gemini={goto}"
|
|
if temp.split(" ") == 1:
|
|
comment = goto
|
|
else:
|
|
comment = " ".join(temp.split(" ")[1:])
|
|
code += f"<p title=\"{extracomment}\"><a href=\"{goto}\">{comment} {extra}</a></p>\n"
|
|
elif i[0:2] == "> ":
|
|
code += f"<p class='greentext'>{i}</p>\n"
|
|
else:
|
|
code += f"<p>{i}</p>\n"
|
|
if title == "Something went wrong...":
|
|
title = "gemini://" + url
|
|
except:
|
|
code += "<pre>" + traceback.format_exc() + "</pre>"
|
|
return f'<!DOCTYPE html>\n<html><head><meta charset="UTF-8"><link rel="stylesheet" href="/style.css"><title>{title}</title></head><body>{code}</body></html>'
|
|
|
|
# Run the Hypercorn ASGI server
|
|
conf = Config()
|
|
conf.bind = "0.0.0.0:2009"
|
|
asyncio.run(serve(app, conf)) |