#!/usr/bin/python3 import asyncio, traceback, socket, ssl, zipfile, uuid, io, os from OpenSSL import crypto from urllib.parse import urlparse, quote from flask import Flask, request, redirect, send_file, Response, make_response from hypercorn.config import Config from hypercorn.asyncio import serve from pathlib import Path homefolder = str(Path.home()) try: os.mkdir(homefolder + "/certs") except FileExistsError: pass except: print(traceback.format_exc()) app = Flask(__name__) @app.route("/robots.txt") def robots(): return Response("User-agent: *\nDisallow: /gem\nDisallow: /loadcert\nDisallow: /gencert.zip\nDisallow: /certload", mimetype="text/plain") @app.route("/") def root(): return send_file("home.html") @app.route("/external.png") def external(): return send_file("external.png") @app.route("/cross-server.png") def crosserver(): return send_file("cross-server.png") @app.route("/loadcert") def loadcert(): return send_file("loadcert.html") def allowed_file(filename): return '.' in filename and filename.rsplit('.', 1)[1].lower() == "pem" @app.route("/certload", methods=['GET', 'POST']) def loadcert_backend(): if request.method == 'POST': if 'cert' not in request.files: return "Invalid request. (cert is missing!)" if 'privkey' not in request.files: return "Invalid request. (privkey is missing!)" cert = request.files['cert'] privkey = request.files['privkey'] certt = cert.read() privkeyy = privkey.read() if cert.filename == '' or privkey.filename == '': return "Please upload a certificate and private key." if allowed_file(cert.filename) and allowed_file(privkey.filename): for i in os.listdir(homefolder + "/certs/"): thisname = "-".join(i.split("-")[:-1]) if (open(homefolder + "/certs/" + thisname + "-privkey.pem").read() == privkeyy.decode()) and (open(homefolder + "/certs/" + thisname + "-chain.pem").read() == certt.decode()): resp = make_response('The certificate seems to already exist, loading the used file.

Go home

') resp.set_cookie('certname',thisname) return resp random_name = str(uuid.uuid4()) open(homefolder + "/certs/" + random_name + "-chain.pem", 'wb').write(certt) open(homefolder + "/certs/" + random_name + "-privkey.pem", 'wb').write(privkeyy) resp = make_response('Success!

Go home

') resp.set_cookie('certname',random_name) return resp else: return "Both files must be a .pem file, you might want to generate a certificate via the home page." else: return "Cannot go to /certload with GET, perhaps you're looking for /loadcert" @app.route("/gencert.zip") def gencert(): random_name = str(uuid.uuid4()) k = crypto.PKey() k.generate_key(crypto.TYPE_RSA, 2048) cert = crypto.X509() cert.get_subject().C = "US" cert.get_subject().ST = "Earth" cert.get_subject().L = "Earth" cert.get_subject().O = random_name cert.get_subject().OU = random_name cert.get_subject().CN = "Gem2Browser user " + random_name cert.set_serial_number(1000) cert.gmtime_adj_notBefore(0) cert.gmtime_adj_notAfter(10*365*24*60*60) cert.set_issuer(cert.get_subject()) cert.set_pubkey(k) cert.sign(k, 'sha1') #open(homefolder + "/" + random_name + "-privkey.pem", "wb").write(crypto.dump_privatekey(crypto.FILETYPE_PEM, k)) #open(homefolder + "/" + random_name + "-cert.pem", "wb").write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert)) zip_buffer = io.BytesIO() with zipfile.ZipFile(zip_buffer, "a", zipfile.ZIP_DEFLATED, False) as zip_file: for file_name, data in [('cert.pem', io.BytesIO(crypto.dump_certificate(crypto.FILETYPE_PEM, cert))), ('privkey.pem', io.BytesIO(crypto.dump_privatekey(crypto.FILETYPE_PEM, k)))]: zip_file.writestr(file_name, data.getvalue()) open(homefolder + "/certs/" + random_name + "-chain.pem", "wb").write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert)) open(homefolder + "/certs/" + random_name + "-privkey.pem", "wb").write(crypto.dump_privatekey(crypto.FILETYPE_PEM, k)) resp = make_response(zip_buffer.getvalue()) resp.set_cookie('certname',random_name) resp.mimetype = "application/zip" return resp @app.route("/style.css") def style(): return send_file("style.css") @app.route("/favicon.ico") def favicon(): return send_file("gem2browser.png") @app.route("/logo.png") def logo(): return send_file("gem2browser.png") @app.route("/gem") def relay(): print(request.headers.get('User-Agent')) url = request.args.get('gemini') queries = request.args.get('query') certfile = request.cookies.get('certname') if url == None: return redirect("/") code = "

Something went wrong...

\n" title = "Something went wrong..." print("GET " + "gemini://" + url + ("?" + queries if queries != None else "")) try: gsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) fulladdr = "gemini://" + url + ("?" + queries if queries != None else "") contx = ssl.create_default_context() contx.check_hostname = False contx.verify_mode = ssl.CERT_NONE if certfile: contx.load_cert_chain(certfile=homefolder + "/certs/" + certfile + "-chain.pem", keyfile=homefolder + "/certs/" + certfile + "-privkey.pem"); gemsocket = contx.wrap_socket(gsocket, server_hostname=urlparse(fulladdr).hostname) gemsocket.connect((urlparse(fulladdr).hostname, 1965)) gemsocket.send(bytes(fulladdr + "\r\n", "UTF-8")) received = "" mimetype = "" gemraw = bytearray() num = 0 ok = False binary = False while True: gemresponse = gemsocket.recv(2048) if len(gemresponse) != 0: for i in gemresponse: gemraw.append(i) else: break found = False gemcontent_binary = bytearray() mtype = bytearray() for i in bytes(gemraw): try: if not found: mtype.append(i) else: gemcontent_binary.append(i) if "\n" in bytes(mtype).decode() and not found: found = True ok = bytes(mtype).decode().split(" ")[0][0] == "2" mimetype = bytes(mtype).decode().replace("\r", "").split("\n")[0].split(" ")[1].split(";")[0] except: pass try: received = gemraw.decode().strip() except: binary = True print(traceback.format_exc()) if binary: return Response(bytes(gemcontent_binary), mimetype="text/gemini" if mimetype=="" else mimetype) received = received.replace("\r", "") firstline = True redirected = False gemtext = True code = "" escaped = False for i in received.split("\n"): i = i.replace("<", "<") if firstline: if i.split(" ")[0][0] == "3": if i.split(" ")[1][0] == "/": return redirect("/gem?gemini=" + urlparse(fulladdr).hostname + quote(i.split(" ")[1], safe='')) return redirect("/gem?gemini=" + quote(i.split(" ")[1][9:], safe='')) elif i.split(" ")[0][0] == "1": return f'\nInput required

Input required

The specified Gemini server wants more data:

{i}




' elif i.split(" ")[0][0] == "6": return f'\nCertificate requested

Certificate requested

The specified Gemini server wants a client certificate, or the certificate is invalid.

{i}



You can load a certificate in home page.

' elif i.split(" ")[0][0] != "2": return f'\nSomething went wrong...

Something went wrong...

The specified Gemini server returned a status of: {i}

' else: firstline = False if i.split(" ")[1].split(";")[0] != "text/gemini": print("Unrecognised type: " + i.split(" ")[1]) return Response(" ".join(received.split("\n")[1:]), mimetype=i.split(" ")[1]) else: if escaped: if i[0:3] == "```": code += "\n" escaped = False else: code += i + "\n" else: if i[0:2] == "# ": if title == "Something went wrong...": title = i[2:] temp = i[2:] code += f"

{temp}

\n" elif i[0:3] == "## ": temp = i[3:] code += f"

{temp}

\n" elif i[0:3] == "```": code += "
\n"
                        escaped = True
                    elif i[0:4] == "### ":
                        temp = i[4:]
                        code += f"

{temp}

\n" elif i[0:2] == "* ": temp = i[2:] code += f"\n" elif i[0:2] == "=>": temp = " ".join(i[2:].strip().replace(" ", " ").split(" ")) goto = temp.split(" ")[0] prse = urlparse(goto) extra = "" qury = "" extracomment = goto if prse.netloc == "" and prse.scheme == "": isdir = url[len(url) - 1] == "/" isabs = goto[0] == "/" or goto[0:2] == "//" isquery = goto[0] == "?" dubleslash = goto[0:2] == "//" if url == urlparse(fulladdr).hostname: isquery=False isabs=False isdir=True goto = f"/{goto}" if goto[0] != "/" else goto if isquery: tempurl = url qury = goto[1:] elif isabs: tempurl = urlparse(fulladdr).hostname + (goto[1:] if dubleslash else goto) elif isdir: tempurl = url + goto else: tempurl = "/".join(url.split("/")[:-1]) + "/" + goto tempurl = quote(tempurl, safe='') goto = f"/gem?gemini={tempurl}" elif prse.scheme != "gemini": extra = "" extracomment = f"This link points to an address that isn't gemini ({prse.scheme})" else: if prse.hostname != urlparse(fulladdr).hostname: extra = "" extracomment = f"This link points to an address that isn't from the server you're currently connecting to ({prse.hostname})" goto = goto.replace("gemini://", "") goto = quote(goto, safe='') goto = f"/gem?gemini={goto}" if temp.split(" ") == 1: comment = goto else: comment = " ".join(temp.split(" ")[1:]) if qury != "": qury = "&query=" + quote(qury, safe='') code += f"

{comment} {extra}

\n" elif i[0:2] == "> ": code += f"

{i}

\n" else: code += f"

{i}

\n" if title == "Something went wrong...": title = "gemini://" + url except: code += "
" + traceback.format_exc() + "
" return f'\n{title}{code}' # Run the Hypercorn ASGI server conf = Config() conf.bind = ["0.0.0.0:2009", "[::]:2009"] asyncio.run(serve(app, conf))