243 lines
6.6 KiB
Python
243 lines
6.6 KiB
Python
|
from flask import Flask, g, request, render_template, redirect, current_app
|
||
|
from uuid import uuid4
|
||
|
from flask_argon2 import Argon2
|
||
|
from Crypto.Cipher import ARC4
|
||
|
from binascii import hexlify, unhexlify
|
||
|
import sqlite3
|
||
|
import json
|
||
|
import secrets
|
||
|
app = Flask(__name__)
|
||
|
argon2 = Argon2(app)
|
||
|
|
||
|
flag = 'HM{h3r35_y0ur_2time_p4d}'
|
||
|
|
||
|
database = 'wep.sqlite3'
|
||
|
|
||
|
def get_db():
|
||
|
db = getattr(g, '_database', None)
|
||
|
if db is None:
|
||
|
db = g._database = sqlite3.connect(database)
|
||
|
return db
|
||
|
|
||
|
@app.teardown_appcontext
|
||
|
def close_connection(exception):
|
||
|
db = getattr(g, '_database', None)
|
||
|
if db is not None:
|
||
|
db.close()
|
||
|
|
||
|
def check_session(cookies):
|
||
|
|
||
|
if 'session' not in cookies and 'iv' not in cookies or 'uid' not in cookies:
|
||
|
return False
|
||
|
|
||
|
try:
|
||
|
cur = get_db().execute('SELECT uid, username, iv, key FROM users WHERE uid = ? AND iv = ? LIMIT 1', (cookies['uid'], cookies['iv']))
|
||
|
cur_user = cur.fetchone()
|
||
|
except sqlite3.Error as e:
|
||
|
print(e)
|
||
|
return 'Something went wrong, ping the admins'
|
||
|
|
||
|
if not cur_user:
|
||
|
return False
|
||
|
|
||
|
uid = cur_user[0]
|
||
|
username = cur_user[1]
|
||
|
iv = cur_user[2]
|
||
|
key = cur_user[3]
|
||
|
cipher = ARC4.new(bytes(iv) + key)
|
||
|
|
||
|
try:
|
||
|
plaintext = cipher.decrypt(unhexlify(cookies['session']))
|
||
|
session = json.loads(plaintext)
|
||
|
except:
|
||
|
return False
|
||
|
|
||
|
session['key'] = key
|
||
|
session['iv'] = iv
|
||
|
return session
|
||
|
|
||
|
|
||
|
@app.route('/', methods=['GET'])
|
||
|
def index():
|
||
|
return render_template('index.html')
|
||
|
|
||
|
|
||
|
@app.route('/register', methods=['GET'])
|
||
|
def show_register():
|
||
|
return render_template('register.html')
|
||
|
|
||
|
@app.route('/register', methods=['POST'])
|
||
|
def register():
|
||
|
|
||
|
if 'username' not in request.form or 'password' not in request.form or 'password2' not in request.form:
|
||
|
return 'All fields are required!'
|
||
|
username = request.form['username']
|
||
|
password = request.form['password']
|
||
|
password2 = request.form['password2']
|
||
|
|
||
|
if not username.isalnum():
|
||
|
return 'Username must be alphanumeric'
|
||
|
|
||
|
if password != password2:
|
||
|
return 'Password confirmation does not match'
|
||
|
|
||
|
if len(password) < 6:
|
||
|
return 'Password must be at least 6 chars!'
|
||
|
|
||
|
try:
|
||
|
cur = get_db().execute('SELECT username FROM users WHERE username= ? LIMIT 1', (username,))
|
||
|
cur_user = cur.fetchone()
|
||
|
except:
|
||
|
return 'Something went wrong, ping the admins'
|
||
|
|
||
|
if cur_user:
|
||
|
return 'Username already exist'
|
||
|
|
||
|
try:
|
||
|
cur.execute('INSERT INTO users (uid, username, password, iv, key) VALUES(?, ?, ?, 0, ?)', (str(uuid4()), username, argon2.generate_password_hash(password), secrets.token_bytes(16),))
|
||
|
get_db().commit()
|
||
|
except sqlite3.Error as e:
|
||
|
print(e)
|
||
|
return 'Something went wrong, ping the admins'
|
||
|
|
||
|
return redirect('/login', 302)
|
||
|
|
||
|
|
||
|
@app.route('/login', methods=['GET'])
|
||
|
def show_login():
|
||
|
return render_template('login.html')
|
||
|
|
||
|
|
||
|
@app.route('/login', methods=['POST'])
|
||
|
def login():
|
||
|
|
||
|
if 'username' not in request.form or 'password' not in request.form:
|
||
|
return 'All fields are required!'
|
||
|
username = request.form['username']
|
||
|
password = request.form['password']
|
||
|
|
||
|
if not username.isalnum():
|
||
|
return 'Login failed'
|
||
|
|
||
|
if len(password) < 6:
|
||
|
return 'Login failed'
|
||
|
|
||
|
try:
|
||
|
cur = get_db().execute('SELECT uid, username, password, iv, key FROM users WHERE username = ? LIMIT 1', (username,))
|
||
|
cur_user = cur.fetchone()
|
||
|
except sqlite3.Error as e:
|
||
|
print(e)
|
||
|
return 'Something went wrong, ping the admins'
|
||
|
|
||
|
if not cur_user:
|
||
|
return 'Login failed'
|
||
|
|
||
|
if not argon2.check_password_hash(cur_user[2], password):
|
||
|
return 'Login failed'
|
||
|
|
||
|
uid = cur_user[0]
|
||
|
username = cur_user[1]
|
||
|
iv = cur_user[3]+1
|
||
|
key = cur_user[4]
|
||
|
|
||
|
if iv >= 255:
|
||
|
iv = 0
|
||
|
|
||
|
data = {
|
||
|
'username': username,
|
||
|
'description': 'No description yet',
|
||
|
'show_flag': False
|
||
|
}
|
||
|
|
||
|
cipher = ARC4.new(bytes(iv) + key)
|
||
|
|
||
|
try:
|
||
|
ciphertext = cipher.encrypt(json.dumps(data).encode('ascii')).hex()
|
||
|
except:
|
||
|
return 'Something went wrong, ping the admins'
|
||
|
|
||
|
try:
|
||
|
get_db().execute('UPDATE users SET iv = ? WHERE username = ?', (iv, username,))
|
||
|
get_db().commit()
|
||
|
except sqlite3.Error as e:
|
||
|
print(e)
|
||
|
return 'Something went wrong, ping the admins'
|
||
|
|
||
|
response = current_app.make_response(redirect('/user', 302))
|
||
|
response.set_cookie('session', value=str(ciphertext), path='/', httponly=True)
|
||
|
response.set_cookie('uid', value=uid, path='/', httponly=True)
|
||
|
response.set_cookie('iv', value=str(iv), path='/', httponly=True)
|
||
|
return response
|
||
|
|
||
|
@app.route('/user', methods=['GET'])
|
||
|
def show_user():
|
||
|
session = check_session(request.cookies)
|
||
|
if not session:
|
||
|
return redirect('/login', 302)
|
||
|
if session['show_flag']:
|
||
|
return flag
|
||
|
|
||
|
return render_template('user.html', authenticated=True, username=session['username'], description=session['description'])
|
||
|
|
||
|
@app.route('/user', methods=['POST'])
|
||
|
def user():
|
||
|
session = check_session(request.cookies)
|
||
|
if not session:
|
||
|
return redirect('/login', 302)
|
||
|
if session['show_flag']:
|
||
|
return flag
|
||
|
|
||
|
if 'description' not in request.form:
|
||
|
return 'Description field is mandatory!'
|
||
|
|
||
|
description = request.form['description']
|
||
|
|
||
|
if len(description) < 10 or len(description) > 200:
|
||
|
return 'Description either too short (<10) or too long (>200)'
|
||
|
|
||
|
username = session['username']
|
||
|
key = session['key']
|
||
|
iv = session['iv']+1
|
||
|
|
||
|
if iv >= 255:
|
||
|
iv = 0
|
||
|
|
||
|
data = {
|
||
|
'username': username,
|
||
|
'description': description,
|
||
|
'show_flag': False
|
||
|
}
|
||
|
|
||
|
cipher = ARC4.new(bytes(iv) + key)
|
||
|
|
||
|
try:
|
||
|
ciphertext = cipher.encrypt(json.dumps(data).encode('ascii')).hex()
|
||
|
except:
|
||
|
return 'Something went wrong, ping the admins'
|
||
|
|
||
|
try:
|
||
|
get_db().execute('UPDATE users SET iv = ? WHERE username = ?', (iv, username,))
|
||
|
get_db().commit()
|
||
|
except sqlite3.Error as e:
|
||
|
print(e)
|
||
|
return 'Something went wrong, ping the admins'
|
||
|
|
||
|
response = current_app.make_response(redirect('/user', 302))
|
||
|
response.set_cookie('session', value=str(ciphertext), path='/', httponly=True)
|
||
|
response.set_cookie('iv', value=str(iv), path='/', httponly=True)
|
||
|
return response
|
||
|
|
||
|
|
||
|
return render_template('user.html', authenticated=True, username=session['username'], description=session['description'])
|
||
|
|
||
|
@app.route('/logout', methods=['GET'])
|
||
|
def logout():
|
||
|
response = current_app.make_response(redirect('/', 302))
|
||
|
for cookie in request.cookies:
|
||
|
response.set_cookie(cookie, value='', expires=0)
|
||
|
return response
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
app.run(host='0.0.0.0')
|