Newer
Older
#!/usr/bin/env python3
from waitress import serve #Used as webserver (Production)
from flask import Flask, request, render_template, redirect, abort, Markup, session, make_response #Used to prepare the dynamic pages (The main site)
import sqlite3 #Used to store the Data
import os #Used for getting the enviorement variables
import qrcode #Used to generate the QR
import base64 #Used to encode the generated QR as base64, to directly insert it into the HTML
from requests import post, get #Used to validate recaptcha / oauth
from io import BytesIO #Needed for base64 encoding of the image
from PIL import Image #Needed for QR generation
from flask_github import GitHub #github oauth library
import json #used for github oauth
domain_to_index = {}
try:
domain = os.environ["domains"].split(";") #Get the domains from the enviorement variable. If no enviorement variable is set set it to 127.0.0.1:5000 (for testing)
except:
domain = ["127.0.0.1:5000"]
try:
if(os.environ["show_build_date"] == "1"): #If you want to see the builddate you can enable this enviorement variable
builddate = open("builddate.txt", "r").read()
builddate = "" #If the enviorement Variable is not set also skip the builddate
try:
if(os.environ["show_version"] == "1"): #If you want to see the builddate you can enable this enviorement variable
version = open("VERSION", "r").read()
version = "" #If the enviorement Variable is not set also skip the version
recaptchaPrivateKey = os.environ["recaptcha_private"] #Get the recaptcha keys, if not set set skipRecaptcha to true to skip the check. If the publicKey is not set the user also will not get the code needed for recaptcha delivered in the page.
if(recaptchaPrivateKey != "") and (recaptchaPublicKey != ""): #If the variables are empty also skip the captcha
skipCaptcha = False
else:
skipCaptcha = True
try:
if(os.environ["production"] == "1"): #If you use this in production, please set this to 1, because the Flask Testserver is not very secure (e.g. shows error on Website)
production = True
else:
production = False
except:
production = False
try: #If you use https with a proxy afterwards you can set this to https and internal redirects will be https not http. This is to prevent bugs with modern browsers, bacause they block http content if the main page is loaded via https.
url_scheme = os.environ["url_scheme"]
except:
url_scheme = "http"
try:
host=os.environ["host"]
except:
host="127.0.0.1"
try:
app.config['GITHUB_CLIENT_ID'] = os.environ['GITHUB_CLIENT_ID']
app.config['GITHUB_CLIENT_SECRET'] = os.environ['GITHUB_CLIENT_SECRET']
except:
print("github client id sor client secret is not set, please set these and run again.")
exit()
github = GitHub(app)
for domains in domain: #Make from every domnain a entry for the select box later
domain_prepared = domain_prepared + '<option value="' + str(domains) + '">' + str(domains) + '</option>'
domain_to_index[domains] = str(index)
index = index + 1
if(index > 1):
showDomainSelect=True #Show only domain select, if there are more than one available
else:
domain_prepared = domain[0]
def table_check(): #This function is used on start to make a new Database if not already exists.
create_table = """
CREATE TABLE WEB_URL(
LONG_URL TEXT NOT NULL,
SHORT_URL TEXT NOT NULL,
USERNAME TEXT
);
"""
with sqlite3.connect('db/urls.db') as conn:
cursor = conn.cursor()
try: #Try making the database structure, if fails Database was already created.
cursor.execute(create_table)
except sqlite3.OperationalError:
pass
def makeQR(text): #This function is used to create a QR code and encode it base64, if you make a new shortlink
qr = qrcode.QRCode( #QR generation variables
version=1,
error_correction=qrcode.constants.ERROR_CORRECT_L,
box_size=10,
border=1,
)
qr.add_data(text) #The URL is in the text variable
qr.make(fit=True) #Generate the QR
img = qr.make_image(fill_color="black", back_color="white") #Encode the WR as base 64
with BytesIO() as buffer:
img.save(buffer, 'jpeg')
return base64.b64encode(buffer.getvalue()).decode()
def grecaptcha_verify(request): #This function is used to verify the google recaptcha code, that is send to the server after submitting a new link
captcha_rs = request.form.get('g-recaptcha-response')
url = "https://www.google.com/recaptcha/api/siteverify" #The baseurl
headers = {'User-Agent': 'DebuguearApi-Browser',} #Useragent doesn't matters, but is set here
params = {'secret': recaptchaPrivateKey, 'response': captcha_rs} #As paramtere we send to google our private Key and the key from the user
verify_rs = post(url,params, headers=headers) #Send a post request with the parameters from before to googlde
response = verify_rs.get("success", False) #Verify that the response includes the success, if so return True, if not return False
try:
userID = request.cookies.get('userID')
loginbar = "Hello " + request.cookies.get('username') + ' (<a href="/user/logout" style="color:white">logout</a>)'
except:
userID = "null"
loginbar = '<a href="/user/login" style="color:white">login</a>'
return render_template('home.html', builddate=builddate, version=version, domain=domain_prepared, recaptchaPublicKey=recaptchaPublicKey, showDomainSelect=showDomainSelect, loginbar=loginbar) #return the default site to create a new shorten link
@app.route('/', methods=['POST']) #This function is used to create a new url
def home_post():
if not grecaptcha_verify(request) and not skipCaptcha:
return render_template('home.html', builddate=builddate, version=version, domain=domain_prepared, snackbar="There was an error validating, that you are a human, please try again.", long_url_prefilled=request.form.get('url'), short_url_prefilled=request.form.get('short').lower(), domain_prefilled=domain_to_index[request.form.get('domain')], recaptchaPublicKey=recaptchaPublicKey, showDomainSelect=showDomainSelect, loginbar=loginbar) #return the user the prefilled form with an error message, because no url to short was provided
try:
userID = request.cookies.get('userID')
loginbar = "Hello " + request.cookies.get('username') + ' (<a href="/user/logout" style="color:white">logout</a>)'
loginbar = '<a href="/user/login" style="color:white">login</a>'
return render_template('home.html', builddate=builddate, version=version, domain=domain_prepared, snackbar="Please enter a url to short, before submitting this form", long_url_prefilled=request.form.get('url'), short_url_prefilled=request.form.get('short').lower(), domain_prefilled=domain_to_index[request.form.get('domain')], recaptchaPublicKey=recaptchaPublicKey, showDomainSelect=showDomainSelect, loginbar=loginbar) #return the user the prefilled form with an error message, because no url to short was provided
return render_template('home.html', builddate=builddate, version=version, domain=domain_prepared, snackbar="Please enter a short name, before submitting this form", long_url_prefilled=request.form.get('url'), short_url_prefilled=request.form.get('short').lower(), domain_prefilled=domain_to_index[request.form.get('domain')], recaptchaPublicKey=recaptchaPublicKey, showDomainSelect=showDomainSelect, loginbar=loginbar) #return the user the prefilled form with an error message, because no short link was provided
shorturl = (request.form.get('domain') + "/" + request.form.get('short')).lower()
url = request.form.get('url')
with sqlite3.connect('db/urls.db') as conn: #Check if another user already used the short link
cursor = conn.cursor()
res = cursor.execute('SELECT LONG_URL FROM WEB_URL WHERE SHORT_URL=?', [shorturl])
try:
short = res.fetchone()
already_used = False
if short is not None:
already_used = True
except:
pass
if not already_used: #If short link wasn't used before, insert the link in the Database.
res = cursor.execute(
'INSERT INTO WEB_URL (LONG_URL, SHORT_URL, USERNAME) VALUES (?, ?, ?)',
[url, shorturl, userID]
return render_template('home.html', short_url=shorturl, recaptchaPublicKey=recaptchaPublicKey, builddate=builddate, version=version, domain=domain_prepared, qrcode=makeQR("http://" + shorturl), loginbar=loginbar) #return the shorten link to the user
return render_template('home.html', builddate=builddate, version=version, recaptchaPublicKey=recaptchaPublicKey, domain=domain_prepared, snackbar="URL already used, please try another one", long_url_prefilled=request.form.get('url'), short_url_prefilled=request.form.get('short').lower(), domain_prefilled=domain_to_index[request.form.get('domain')], showDomainSelect=showDomainSelect, loginbar=loginbar) #return the user the prefilled form with an error message, because the url was already used
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
@app.route('/favicon.ico') #Redirect to the static url of the favicon
def favicon():
return redirect("/static/favicon.ico")
@app.route('/<short_url>')
def redirect_short_url(short_url):
host = request.headers['Host']
url = ""
with sqlite3.connect('db/urls.db') as conn: #Get the original URL from the database
cursor = conn.cursor()
res = cursor.execute('SELECT LONG_URL FROM WEB_URL WHERE SHORT_URL=?', [host + "/" + short_url.lower()])
try:
short = res.fetchone()
if short is not None: #If a long url is found
url = short[0]
error_404 = False
else:
error_404 = True #If no url is found throw a 404. If you throw a error in a try / catch block it will be catched by this, so set a variable to true and throw the error later
except Exception as e: #If there happens an error, print the exception to the console and throw a 500 error
print(e) #Print a debug Message to the console
abort(500) #Throw a 500 error. This means internal Server error.
if not error_404: #If there was no 404 error before, redirect the user. If not throw a 404 error
return redirect(url)
else:
abort(404)
@app.route('/user/login')
def login():
return github.authorize(scope="user")
@app.route('/user/github-callback')
@github.authorized_handler
def authorized(oauth_token):
if oauth_token is None:
return "oauth failed, please try again"
headers = {'Authorization': 'token ' + oauth_token,} #Useragent doesn't matters, but is set here
githubResponse = get("https://api.github.com/user", headers=headers).text
userID = str(json.loads(githubResponse)['id'])
username = str(json.loads(githubResponse)['login'])
resp = make_response(redirect('/'))
resp.set_cookie('userID', userID)
resp.set_cookie('username', username)
return resp
@app.route('/user/logout')
resp.set_cookie('userID', "", max_age=0)
resp.set_cookie('username', "", max_age=0)
return resp
if __name__ == '__main__':
table_check()# This code checks whether database table is created or not
if production: #Check if production variable is set to true use the waitress webserver, else use the buildin flask webserver, with more debug output
serve(app, host=host, port= 5000, url_scheme=url_scheme) #Start the Webserver for all users on port 5000
app.run(host=host, port=5000, debug=True) #Start the Webserver in Debug mode. This means, if the script runs in an error, it will show the error message in Browser.