...
 
Commits (9)
......@@ -34,9 +34,8 @@ unittest:
only: [master, develop]
script:
- echo "$ENV_FILE" > .env
- export $(grep -v '^#' .env | xargs)
- rm .env
- python-unittest
- rm .env
type_check:
stage: test
......
V 0.3.0:
- Now uses puffotter flask module
V 0.2.0:
- Fixed issues with mysql timeouts
- Added support for background tasks
......
......@@ -5,14 +5,10 @@ ENV DEBIAN_FRONTEND=noninteractive
RUN apt update && \
apt install -y python3 python3-pip python3-mysqldb && \
pip3 install Flask cherrypy
pip3 install flask
ADD . flask-app
RUN cd flask-app && python3 setup.py install
WORKDIR flask-app
EXPOSE 8000
CMD ["/usr/bin/python3", "server.py"]
......@@ -19,11 +19,15 @@ services:
- DB_MODE=mysql
- MYSQL_HOST=db
- MYSQL_PORT=3306
- LOGGING_PATH=/var/logs
- MYSQL_DATABASE=appdata
- LOGGING_PATH=/var/logs/fat_ffipd.log
- FLASK_PORT=8000
restart: always
db:
image: "mariadb:10.1"
env_file: .env
environment:
- MYSQL_DATABASE=appdata
networks:
- net
volumes:
......
......@@ -17,18 +17,23 @@ You should have received a copy of the GNU General Public License
along with fat-ffipd. If not, see <http://www.gnu.org/licenses/>.
LICENSE"""
from typing import Type
from puffotter.flask.Config import Config as BaseConfig
class ApiException(Exception):
class Config(BaseConfig):
"""
Api raised when an API-related exception occurs
Configuration for the flask application
"""
def __init__(self, reason: str, status_code: int):
@classmethod
def _load_extras(cls, parent: Type[BaseConfig]):
"""
Initializes the exception
:param reason: The reason the API Exception was raised
:param status_code: The status code associated with the exception
Loads non-standard configuration variables
:param parent: The base configuration
:return: None
"""
super().__init__(reason)
self.reason = reason
self.status_code = status_code
parent.API_VERSION = "0"
parent.STRINGS.update({
"password_changed": "PaSsWoRd ChAnGeD SuCeSsFuLlY"
})
......@@ -16,3 +16,16 @@ GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with fat-ffipd. If not, see <http://www.gnu.org/licenses/>.
LICENSE"""
import os
sentry_dsn = "https://4f6db305797046f6985fb0c2dc023423@sentry.namibsun.net/17"
"""
The sentry DSN used for error logging
"""
root_path: str = os.path.join(os.path.dirname(os.path.abspath(__file__)))
"""
The root path of the application
"""
"""LICENSE
Copyright 2020 Hermann Krumrey <hermann@krumreyh.com>
This file is part of fat-ffipd.
fat-ffipd is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
fat-ffipd is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with fat-ffipd. If not, see <http://www.gnu.org/licenses/>.
LICENSE"""
import os
import pkg_resources
from typing import Optional
from fat_ffipd.flask import app
class Config:
"""
Class that stores configuration data
"""
@property
def version(self) -> str:
"""
:return: The version of the program
"""
return pkg_resources.get_distribution("fat_ffipd").version
@property
def recaptcha_site_key(self) -> Optional[str]:
"""
:return: The (public) recaptcha site key
"""
return os.environ.get("RECAPTCHA_SITE_KEY")
@property
def recaptcha_secret_key(self) -> Optional[str]:
"""
:return: The secret recaptcha key used to validate the recaptcha result
"""
return os.environ.get("RECAPTCHA_SECRET_KEY")
@property
def db_uri(self) -> str:
"""
:return: The database URI to use in this application
"""
db_mode = os.environ.get("DB_MODE", "sqlite")
if os.environ.get("FLASK_TESTING") or os.environ.get("TESTING"):
db_mode = "sqlite"
if db_mode == "sqlite":
app.logger.warning("Using SQLite database")
return "sqlite:///" + Config.sqlite_path
else:
prefix = db_mode.upper()
default_port = 3306
password = os.environ[prefix + "_PASSWORD"]
uri = "{}://{}:{}@{}:{}/{}".format(
db_mode,
os.environ[prefix + "_USER"],
password,
os.environ.get(prefix + "_HOST", "localhost"),
os.environ.get(prefix + "_PORT", default_port),
os.environ[prefix + "_DATABASE"],
)
app.logger.info("Using DB URI " + uri.replace(password, "?"))
return uri
sqlite_path = "/tmp/fat-ffipd.db"
"""
The path to the SQLite database file
"""
@property
def smtp_host(self) -> str:
"""
:return: The SMTP host used for outbound emails
"""
return os.environ["SMTP_HOST"]
@property
def smtp_port(self) -> int:
"""
:return: The SMTP host used for outbound emails
"""
return int(os.environ["SMTP_PORT"])
@property
def smtp_address(self) -> str:
"""
:return: The SMTP host used for outbound emails
"""
return os.environ["SMTP_ADDRESS"]
@property
def smtp_password(self) -> str:
"""
:return: The SMTP host used for outbound emails
"""
return os.environ["SMTP_PASSWORD"]
@property
def logging_path(self) -> str:
"""
:return: The file in which to store logging data
"""
return os.path.join(
os.environ.get("LOGGING_PATH", default="/tmp"),
"fat-ffipd.log"
)
MIN_USERNAME_LENGTH = 1
"""
The minimum length of a username
"""
MAX_USERNAME_LENGTH = 12
"""
The maximum length of a username
"""
MAX_API_KEY_AGE = 2592000 # 30 days
"""
The maximum age of an API key in seconds
"""
"""LICENSE
Copyright 2020 Hermann Krumrey <hermann@krumreyh.com>
This file is part of fat-ffipd.
fat-ffipd is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
fat-ffipd is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with fat-ffipd. If not, see <http://www.gnu.org/licenses/>.
LICENSE"""
import time
from typing import Dict, Any
from fat_ffipd.flask import db
from fat_ffipd.config import Config
from fat_ffipd.db.ModelMixin import ModelMixin
from puffotter.crypto import verify_password
class ApiKey(ModelMixin, db.Model):
"""
Model that describes the 'api_keys' SQL table
An ApiKey is used for API access using HTTP basic auth
"""
def __init__(self, *args, **kwargs):
"""
Initializes the Model
:param args: The constructor arguments
:param kwargs: The constructor keyword arguments
"""
super().__init__(*args, **kwargs)
__tablename__ = "api_keys"
"""
The name of the table
"""
user_id = db.Column(
db.Integer, db.ForeignKey(
"users.id", onupdate="CASCADE", ondelete="CASCADE"
),
nullable=False
)
"""
The ID of the user associated with this API key
"""
user = db.relationship(
"User", backref=db.backref("api_keys", lazy=True, cascade="all,delete")
)
"""
The user associated with this API key
"""
key_hash = db.Column(db.String(255), nullable=False)
"""
The hash of the API key
"""
creation_time = db.Column(db.Integer, nullable=False, default=time.time)
"""
The time at which this API key was created as a UNIX timestamp
"""
def has_expired(self) -> bool:
"""
Checks if the API key has expired.
API Keys expire after 30 days
:return: True if the key has expired, False otherwise
"""
return time.time() - self.creation_time > Config.MAX_API_KEY_AGE
def verify_key(self, key: str) -> bool:
"""
Checks if a given key is valid
:param key: The key to check
:return: True if the key is valid, False otherwise
"""
try:
_id, api_key = key.split(":", 1)
if int(_id) != self.id:
return False
else:
return verify_password(api_key, self.key_hash)
except ValueError:
return False
def __json__(self, include_children: bool = False) -> Dict[str, Any]:
"""
Generates a dictionary containing the information of this model
:param include_children: Specifies if children data models will be
included or if they're limited to IDs
:return: A dictionary representing the model's values
"""
data = {
"id": self.id,
"user_id": self.user_id,
"creation_time": self.creation_time
}
if include_children:
data["user"] = self.user.__json__(include_children)
return data
"""LICENSE
Copyright 2020 Hermann Krumrey <hermann@krumreyh.com>
This file is part of fat-ffipd.
fat-ffipd is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
fat-ffipd is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with fat-ffipd. If not, see <http://www.gnu.org/licenses/>.
LICENSE"""
from typing import Dict, Any
from fat_ffipd.flask import db
class ModelMixin:
"""
A mixin class that specifies a couple of methods all database
models should implement
"""
id = db.Column(
db.Integer, primary_key=True, nullable=False, autoincrement=True
)
"""
The ID is the primary key of the table and increments automatically
"""
def __json__(self, include_children: bool = False) -> Dict[str, Any]:
"""
Generates a dictionary containing the information of this model
:param include_children: Specifies if children data models will be
included or if they're limited to IDs
:return: A dictionary representing the model's values
"""
raise NotImplementedError() # pragma: no cover
def __str__(self) -> str:
"""
:return: The string representation of this object
"""
data = self.__json__()
_id = data.pop("id")
return "{}:{} <{}>".format(self.__class__.__name__, _id, str(data))
def __repr__(self) -> str:
"""
:return: A string with which the object may be generated
"""
params = ""
for key, val in self.__json__().items():
params += "{}={}, ".format(key, repr(val))
params = params.rsplit(",", 1)[0]
return "{}({})".format(self.__class__.__name__, params)
def __eq__(self, other: Any) -> bool:
"""
Checks the model object for equality with another object
:param other: The other object
:return: True if the objects are equal, False otherwise
"""
if "__json__" in dir(other):
return other.__json__() == self.__json__()
else:
return False
def __hash__(self) -> int:
"""
Creates a hash so that the model objects can be used as keys
:return: None
"""
return hash(repr(self))
"""LICENSE
Copyright 2020 Hermann Krumrey <hermann@krumreyh.com>
This file is part of fat-ffipd.
fat-ffipd is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
fat-ffipd is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with fat-ffipd. If not, see <http://www.gnu.org/licenses/>.
LICENSE"""
from typing import Dict, Any
from fat_ffipd.flask import db
from fat_ffipd.db.ModelMixin import ModelMixin
from puffotter.crypto import verify_password
class User(ModelMixin, db.Model):
"""
Model that describes the 'users' SQL table
A User stores a user's information, including their email address, username
and password hash
"""
def __init__(self, *args, **kwargs):
"""
Initializes the Model
:param args: The constructor arguments
:param kwargs: The constructor keyword arguments
"""
super().__init__(*args, **kwargs)
__tablename__ = "users"
"""
The name of the table
"""
username = db.Column(db.String(12), nullable=False, unique=True)
"""
The user's username
"""
email = db.Column(db.String(150), nullable=False, unique=True)
"""
The user's email address
"""
password_hash = db.Column(db.String(255), nullable=False)
"""
The user's hashed password, salted and hashed.
"""
confirmed = db.Column(db.Boolean, nullable=False, default=False)
"""
The account's confirmation status. Logins should be impossible as long as
this value is False.
"""
confirmation_hash = db.Column(db.String(255), nullable=False)
"""
The account's confirmation hash. This is the hash of a key emailed to
the user. Only once the user follows the link in the email containing the
key will their account be activated
"""
@property
def is_authenticated(self) -> bool:
"""
Property required by flask-login
:return: True if the user is confirmed, False otherwise
"""
return True
@property
def is_anonymous(self) -> bool:
"""
Property required by flask-login
:return: True if the user is not confirmed, False otherwise
"""
return not self.is_authenticated # pragma: no cover
@property
def is_active(self) -> bool:
"""
Property required by flask-login
:return: True
"""
return self.confirmed
def get_id(self) -> str:
"""
Method required by flask-login
:return: The user's ID as a unicode string
"""
return str(self.id)
def verify_password(self, password: str) -> bool:
"""
Verifies a password against the password hash
:param password: The password to check
:return: True if the password matches, False otherwise
"""
return verify_password(password, self.password_hash)
def verify_confirmation(self, confirmation_key: str) -> bool:
"""
Verifies a confirmation key against the confirmation hash
:param confirmation_key: The key to check
:return: True if the key matches, False otherwise
"""
return verify_password(confirmation_key, self.confirmation_hash)
def __json__(self, include_children: bool = False) -> Dict[str, Any]:
"""
Generates a dictionary containing the information of this model
:param include_children: Specifies if children data models will be
included or if they're limited to IDs
:return: A dictionary representing the model's values
"""
data = {
"id": self.id,
"username": self.username,
"email": self.email,
"confirmed": self.confirmed
}
if include_children:
pass
return data
"""LICENSE
Copyright 2020 Hermann Krumrey <hermann@krumreyh.com>
This file is part of fat-ffipd.
fat-ffipd is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
fat-ffipd is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with fat-ffipd. If not, see <http://www.gnu.org/licenses/>.
LICENSE"""
"""LICENSE
Copyright 2020 Hermann Krumrey <hermann@krumreyh.com>
This file is part of fat-ffipd.
fat-ffipd is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
fat-ffipd is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with fat-ffipd. If not, see <http://www.gnu.org/licenses/>.
LICENSE"""
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager
app = Flask(__name__)
"""
The Flask App
"""
db = SQLAlchemy()
"""
The SQLAlchemy database connection
"""
login_manager = LoginManager(app)
"""
The Flask-Login Login Manager
"""
......@@ -17,21 +17,26 @@ You should have received a copy of the GNU General Public License
along with fat-ffipd. If not, see <http://www.gnu.org/licenses/>.
LICENSE"""
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from puffotter.env import load_env_file
from puffotter.flask.initialize import init_flask
from puffotter.flask.wsgi import start_server
from fat_ffipd import sentry_dsn, root_path
from fat_ffipd.bg_tasks import bg_tasks
from fat_ffipd.Config import Config
def create_tables(app: Flask, db: SQLAlchemy):
def main():
"""
Creates all tables in the database if they don't exist yet
:param app: The flask application
:param db: The database
Starts the flask application
:return: None
"""
# noinspection PyUnresolvedReferences
from fat_ffipd.db.User import User
# noinspection PyUnresolvedReferences
from fat_ffipd.db.ApiKey import ApiKey
with app.app_context():
db.create_all()
load_env_file()
init_flask(
"fat_ffipd",
sentry_dsn,
root_path,
Config,
[],
[]
)
start_server(Config, bg_tasks)
"""LICENSE
Copyright 2020 Hermann Krumrey <hermann@krumreyh.com>
This file is part of fat-ffipd.
fat-ffipd is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
fat-ffipd is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with fat-ffipd. If not, see <http://www.gnu.org/licenses/>.
LICENSE"""
"""LICENSE
Copyright 2020 Hermann Krumrey <hermann@krumreyh.com>
This file is part of fat-ffipd.
fat-ffipd is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
fat-ffipd is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with fat-ffipd. If not, see <http://www.gnu.org/licenses/>.
LICENSE"""
"""LICENSE
Copyright 2020 Hermann Krumrey <hermann@krumreyh.com>
This file is part of fat-ffipd.
fat-ffipd is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
fat-ffipd is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with fat-ffipd. If not, see <http://www.gnu.org/licenses/>.
LICENSE"""
from functools import wraps
from typing import Callable
from flask import jsonify, make_response, request
from werkzeug.exceptions import Unauthorized
from fat_ffipd.exceptions import ApiException
def api(func: Callable) -> Callable:
"""
Decorator that handles common API patterns and ensures that
the JSON response will always follow a certain pattern
:param func: The function to wrap
:return: The wrapper function
"""
@wraps(func)
def wrapper(*args, **kwargs):
"""
Tries running the function and checks for errors
:param args: args
:param kwargs: kwargs
:return: The JSON response including an appropriate HTTP status code
"""
code = 200
response = {"status": "ok"}
try:
is_json = request.content_type.startswith("application/json") \
and request.is_json \
and isinstance(request.get_json(silent=True), dict)
if request.method in ["POST", "PUT", "DELETE"] and not is_json:
raise ApiException(
"Not in JSON format", 400
)
response["data"] = func(*args, **kwargs)
except (KeyError, TypeError, ValueError, ApiException) as e:
response["status"] = "error"
if isinstance(e, ApiException):
code = e.status_code
response["reason"] = e.reason
else:
code = 400
response["reason"] = "Bad Request: {}".format(type(e).__name__)
return make_response(jsonify(response), code)
return wrapper
def api_login_required(func: Callable) -> Callable:
"""
Decorator to make unauthorized API calls respond with JSON properly
:param func: The function to wrap
:return: The wrapped function
"""
@wraps(func)
def wrapper(*args, **kwargs):
"""
Checks if flask-login throws an Unauthorized exception. If so,
re-wrap the response in JSON
:param args: The function arguments
:param kwargs: The function keyword arguments
:return: The newly wrapped response,
or just the plain response if authorized
"""
try:
resp = func(*args, **kwargs)
return resp
except Unauthorized:
return make_response(
jsonify({"status": "error", "reason": "Unauthorized"}), 401
)
return wrapper
"""LICENSE
Copyright 2020 Hermann Krumrey <hermann@krumreyh.com>
This file is part of fat-ffipd.
fat-ffipd is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
fat-ffipd is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with fat-ffipd. If not, see <http://www.gnu.org/licenses/>.
LICENSE"""
from typing import Dict, Any
from flask import Blueprint, request
from flask_login import login_required
from puffotter.crypto import generate_random, generate_hash
from fat_ffipd.flask import db
from fat_ffipd.config import Config
from fat_ffipd.db.User import User
from fat_ffipd.db.ApiKey import ApiKey
from fat_ffipd.exceptions import ApiException
from fat_ffipd.routes.api.decorators import api, api_login_required
user_management_api_blueprint = Blueprint("user_management_api", __name__)
@user_management_api_blueprint.route(
"/api/v1/key", methods=["POST", "DELETE"]
)
@api
def api_key() -> Dict[str, Any]:
"""
Allows users to request a new API key or revoke an existing API key
:return: The JSON response
"""
data = request.get_json()
if request.method == "POST":
username = data["username"]
password = data["password"]
user = User.query.filter_by(username=username).first()
if user is None:
raise ApiException("user does not exist", 401)
elif not user.confirmed:
raise ApiException("user is not confirmed", 401)
elif not user.verify_password(password):
raise ApiException("password is incorrect", 401)
else:
key = generate_random(32)
hashed = generate_hash(key)
_api_key = ApiKey(user=user, key_hash=hashed)
db.session.add(_api_key)
db.session.commit()
return {
"api_key": "{}:{}".format(_api_key.id, key),
"expiration": (
int(_api_key.creation_time)
+ Config.MAX_API_KEY_AGE
),
"user": user.__json__(True)
}
else: # request.method == "DELETE"
key = data["api_key"]
_api_key = ApiKey.query.get(key.split(":", 1)[0])
if _api_key is None:
raise ApiException("API key does not exist", 401)
elif not _api_key.verify_key(key):
raise ApiException("API key not valid", 401)
else:
db.session.delete(_api_key)
db.session.commit()
return {}
@user_management_api_blueprint.route("/api/v1/authorize", methods=["GET"])
@api_login_required
@login_required
@api
def api_authorize() -> Dict[str, Any]:
"""
Allows a user to check if an API key is authorized or not
:return: None
"""
return {} # Checks done by @login_required
"""LICENSE
Copyright 2020 Hermann Krumrey <hermann@krumreyh.com>
This file is part of fat-ffipd.
fat-ffipd is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
fat-ffipd is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with fat-ffipd. If not, see <http://www.gnu.org/licenses/>.
LICENSE"""
from flask import Flask
from fat_ffipd.routes.static import static_blueprint
from fat_ffipd.routes.user_management import user_management_blueprint
from fat_ffipd.routes.api.user_management import user_management_api_blueprint
def register_blueprints(app: Flask):
"""
Registers all route blueprints in the flask app
:param app: The flask application
:return: None
"""
for blueprint in [
static_blueprint,
user_management_blueprint,
user_management_api_blueprint
]:
app.register_blueprint(blueprint)
"""LICENSE
Copyright 2020 Hermann Krumrey <hermann@krumreyh.com>
This file is part of fat-ffipd.
fat-ffipd is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
fat-ffipd is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with fat-ffipd. If not, see <http://www.gnu.org/licenses/>.
LICENSE"""
from typing import Union
from flask import render_template, Blueprint
from werkzeug import Response
static_blueprint = Blueprint("static", __name__)
@static_blueprint.route("/", methods=["GET"])
def index() -> Union[Response, str]:
"""
The index page
:return: The response
"""
return render_template("static/index.html")
@static_blueprint.route("/about", methods=["GET"])
def about() -> Union[Response, str]:
"""
The about page/"Impressum" for the website
:return: The response
"""
return render_template("static/about.html")
@static_blueprint.route("/privacy", methods=["GET"])
def privacy() -> Union[Response, str]:
"""
Page containing a privacy disclaimer
:return: The response
"""
return render_template("static/privacy.html")
import os
from typing import Union
from werkzeug import Response
from flask import Blueprint, redirect, url_for, request, render_template, flash
from flask_login import login_required, current_user, logout_user, login_user
from puffotter.crypto import generate_hash, generate_random
from puffotter.recaptcha import verify_recaptcha
from puffotter.smtp import send_email
from fat_ffipd.flask import app, db
from fat_ffipd.config import Config
from fat_ffipd.db.User import User
user_management_blueprint = Blueprint("user_management", __name__)
@user_management_blueprint.route("/login", methods=["GET", "POST"])
def login() -> Union[Response, str]:
"""
Page that allows the user to log in
:return: The response
"""
if request.method == "POST":
username = request.form["username"]
password = request.form["password"]
remember_me = request.form.get("remember_me") in ["on", True]
user: User = User.query.filter_by(username=username).first()
if user is None:
flash("User does not exist", "danger")
elif current_user.is_authenticated:
flash("User already logged in", "info")
elif not user.confirmed:
flash("User is not confirmed", "danger")
elif not user.verify_password(password):
flash("Invalid Password", "danger")
else:
login_user(user, remember=remember_me)
flash("Logged in successfully", "success")
app.logger.info("User {} logged in.".format(current_user.username))
return redirect(url_for("static.index"))
return redirect(url_for("user_management.login"))
else:
return render_template("user_management/login.html")
@user_management_blueprint.route("/logout", methods=["GET"])
@login_required
def logout() -> Union[Response, str]:
"""
Logs out the user
:return: The response
"""
app.logger.info("User {} logged out.".format(current_user.username))
logout_user()
flash("Logged out", "info")
return redirect(url_for("static.index"))
@user_management_blueprint.route("/register", methods=["GET", "POST"])
def register() -> Union[Response, str]:
"""
Page that allows a new user to register
:return: The response
"""
if request.method == "POST":
username = request.form["username"]
email = request.form["email"]
password = request.form["password"]
password_repeat = request.form["password-repeat"]
recaptcha_result = verify_recaptcha(
request.remote_addr,
request.form["g-recaptcha-response"],
Config().recaptcha_secret_key
)
all_users = User.query.all()
usernames = [user.username for user in all_users]
emails = [user.email for user in all_users]
_min, _max = Config.MIN_USERNAME_LENGTH, Config.MAX_USERNAME_LENGTH
if len(username) < _min or len(username) > _max:
flash("Username must be between {} and {} characters long"
.format(_min, _max), "danger")
elif password != password_repeat:
flash("Passwords do not match", "danger")
elif username in usernames:
flash("Username already exists", "danger")
elif email in emails:
flash("Email already in use", "danger")
elif not recaptcha_result:
flash("ReCaptcha not solved correctly", "danger")
else:
confirmation_key = generate_random(32)
confirmation_hash = generate_hash(confirmation_key)
user = User(
username=username,
email=email,
password_hash=generate_hash(password),
confirmation_hash=confirmation_hash
)
db.session.add(user)
db.session.commit()
email_msg = render_template(
"email/registration.html",
host=request.host,
target=os.path.join(request.host, "confirm"),
username=username,
user_id=user.id,
confirm_key=confirmation_key
)
send_email(
email,
"Registration",
email_msg,
Config().smtp_host,
Config().smtp_address,
Config().smtp_password,
Config().smtp_port
)
app.logger.info("User {} registered.".format(user.username))
flash("Registered Successfully. Check your email inbox for "
"confirmation", "info")
return redirect(url_for("static.index"))
return redirect(url_for("user_management.register"))
else:
return render_template("user_management/register.html")
@user_management_blueprint.route("/confirm", methods=["GET"])
def confirm() -> Union[Response, str]:
"""
Confirms a user
:return: The response
"""
user_id = int(request.args["user_id"])
confirm_key = request.args["confirm_key"]
user: User = User.query.get(user_id)
if user is None:
flash("User does not exist", "danger")
elif user.confirmed:
flash("User already confirmed", "warning")
elif not user.verify_confirmation(confirm_key):
flash("Confirmation key invalid", "warning")
else:
print("D")
user.confirmed = True
db.session.commit()
flash("User confirmed successfully", "success")
return redirect(url_for("static.index"))
@user_management_blueprint.route("/forgot", methods=["POST", "GET"])
def forgot() -> Union[Response, str]:
"""
Allows a user to reset their password
:return: The response
"""
if request.method == "POST":
email = request.form["email"]
recaptcha_result = verify_recaptcha(
request.remote_addr,
request.form["g-recaptcha-response"],
Config().recaptcha_secret_key
)
user: User = User.query.filter_by(email=email).first()
if not recaptcha_result:
flash("Invalid ReCaptcha Response", "danger")
return redirect(url_for("user_management.forgot"))
else:
if user is None:
# Fail silently to ensure that a potential attacker can't use
# the response to figure out information on registered users
pass
else:
new_pass = generate_random(20)
user.password_hash = generate_hash(new_pass)
db.session.commit()
email_msg = render_template(
"email/forgot_password_email.html",
host=request.host,
target=os.path.join(request.host, "login"),
password=new_pass,
username=user.username
)
send_email(
email,
"Password Reset",
email_msg,
Config().smtp_host,
Config().smtp_address,
Config().smtp_password,
Config().smtp_port
)
flash("Password was reset successfully", "success")
return redirect(url_for("static.index"))
else:
return render_template("user_management/forgot.html")
@user_management_blueprint.route("/profile", methods=["GET"])
@login_required
def profile() -> Union[Response, str]:
"""
Allows a user to edit their profile details
:return: The response
"""
return render_template("user_management/profile.html")
@user_management_blueprint.route("/change_password", methods=["POST"])
@login_required
def change_password() -> Union[Response, str]:
"""
Allows the user to change their password
:return: The response
"""
old_password = request.form["old_password"]
new_password = request.form["new_password"]
password_repeat = request.form["password_repeat"]
user: User = current_user
if new_password != password_repeat:
flash("Passwords do not match", "danger")
elif not user.verify_password(old_password):
flash("Invalid Password", "danger")
else:
user.password_hash = generate_hash(new_password)
db.session.commit()
flash("Password changed successfully", "success")
return redirect(url_for("user_management.profile"))
@user_management_blueprint.route("/delete_user", methods=["POST"])
@login_required
def delete_user() -> Union[Response, str]:
"""
Allows a user to delete their account
:return: The response
"""
password = request.form["password"]
user: User = current_user
if not user.verify_password(password):
flash("Invalid Password", "danger")
else:
app.logger.info("Deleting user {}".format(user))
db.session.delete(user)
db.session.commit()
logout_user()
flash("User was deleted", "success")
return redirect(url_for("static.index"))
return redirect(url_for("user_management.profile"))
"""LICENSE
Copyright 2020 Hermann Krumrey <hermann@krumreyh.com>
This file is part of fat-ffipd.
fat-ffipd is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
fat-ffipd is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with fat-ffipd. If not, see <http://www.gnu.org/licenses/>.
LICENSE"""
import os
import base64
import logging
import random
import string
from binascii import Error
from typing import Optional
from flask import render_template, flash, redirect, url_for
from flask.logging import default_handler
from werkzeug.exceptions import HTTPException
from fat_ffipd.config import Config
from fat_ffipd.db.User import User
from fat_ffipd.db.ApiKey import ApiKey
from fat_ffipd.db.models import create_tables
from fat_ffipd.flask import app, db, login_manager
from fat_ffipd.routes.blueprints import register_blueprints
def init_logging():
"""
Sets up logging
:return: None
"""
app.logger.removeHandler(default_handler)
logging.basicConfig(
filename=Config().logging_path,
level=logging.DEBUG,
format="[%(asctime)s] %(levelname)s in %(module)s: %(message)s"
)
app.logger.info("STARTING FLASK")
def init_app():
"""
Initializes the flask app
:return: None
"""
app.testing = os.environ.get("FLASK_TESTING") == "1"
app.config["TRAP_HTTP_EXCEPTIONS"] = True
try:
app.secret_key = os.environ["FLASK_SECRET"]
except KeyError:
app.secret_key = "".join(random.choice(string.ascii_letters)
for _ in range(0, 32))
app.logger.warning("No secret key provided")
register_blueprints(app)
@app.context_processor
def inject_template_variables():
"""
Injects the project's version string so that it will be available
in templates
:return: The dictionary to inject
"""
return {
"version": Config().version,
"env": app.env,
"config": Config()
}
@app.errorhandler(HTTPException)
def error_handling(error: HTTPException):
"""
Custom redirect for 401 errors
:param error: The error that caused the error handler to be called
:return: A redirect to the login page
"""
if error.code == 401:
flash("You are not logged in", "danger")
return redirect(url_for("user_management.login"))
else:
return render_template("static/error_page.html", error=error)
@app.errorhandler(Exception)
def exception_handling(e: Exception):
"""
Handles any uncaught exceptions and shows an error 500 page
:param e: The caught exception
:return: None
"""
error = HTTPException("The server encountered an internal error and "
"was unable to complete your request. "
"Either the server is overloaded or there "
"is an error in the application.")
error.code = 500
app.logger.error("Caught exception: {}".format(e))
return render_template("static/error_page.html", error=error)
def init_db():
"""
Initializes the database
:return: None
"""
app.config["SQLALCHEMY_DATABASE_URI"] = Config().db_uri
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
# Makes sure that we don't get errors because
# of an idle database connection
app.config["SQLALCHEMY_ENGINE_OPTIONS"] = {"pool_pre_ping": True}
db.init_app(app)
create_tables(app, db)
def init_login_manager():
"""
Initializes the login manager
:return: None
"""
login_manager.session_protection = "strong"
# Set up login manager
@login_manager.user_loader
def load_user(user_id: str) -> Optional[User]:
"""
Loads a user from an ID
:param user_id: The ID
:return: The User
"""
return User.query.get(int(user_id))
@login_manager.request_loader
def load_user_from_request(request) -> Optional[User]:
"""
Loads a user pased on a provided API key
:param request: The request containing the API key in the headers
:return: The user or None if no valid API key was provided
"""
if "Authorization" not in request.headers:
return None
api_key = request.headers["Authorization"].replace("Basic ", "", 1)
try:
api_key = base64.b64decode(
api_key.encode("utf-8")
).decode("utf-8")
except (TypeError, Error):
return None
db_api_key = ApiKey.query.get(api_key.split(":", 1)[0])
# Check for validity of API key
if db_api_key is None or not db_api_key.verify_key(api_key):
return None
elif db_api_key.has_expired():
db.session.delete(db_api_key)
db.session.commit()
return None
return User.query.get(db_api_key.user_id)
def init():
"""
Initializes the Flask application
:return: None
"""
init_logging()
init_app()
init_db()
init_login_manager()
......@@ -37,7 +37,7 @@ along with fat-ffipd. If not, see <http://www.gnu.org/licenses/>.
</div>
<br>
<div data-sitekey="{{ config.recaptcha_site_key }}"
<div data-sitekey="{{ config.RECAPTCHA_SITE_KEY }}"
class="g-recaptcha">
</div>
<br>
......
......@@ -67,7 +67,7 @@ along with fat-ffipd. If not, see <http://www.gnu.org/licenses/>.
name="password-repeat">
</div>
<br>
<div data-sitekey="{{ config.recaptcha_site_key }}"
<div data-sitekey="{{ config.RECAPTCHA_SITE_KEY }}"
class="g-recaptcha">
</div>
<br>
......
......@@ -17,118 +17,16 @@ You should have received a copy of the GNU General Public License
along with fat-ffipd. If not, see <http://www.gnu.org/licenses/>.
LICENSE"""
import os
from base64 import b64encode
from typing import Tuple, Dict
from unittest import TestCase
from puffotter.crypto import generate_random, generate_hash
from fat_ffipd.run import app, db, init
from fat_ffipd.config import Config
from fat_ffipd.db.User import User
from fat_ffipd.db.ApiKey import ApiKey
from fat_ffipd.main import root_path
from puffotter.flask.test.TestFramework import \
_TestFramework as __TestFrameWork
from fat_ffipd.Config import Config
class _TestFramework(TestCase):
class _TestFramework(__TestFrameWork):
"""
Class that models a testing framework for the flask application
"""
def setUp(self):
"""
Sets up the flask application and a temporary database to test with
:return: None
"""
os.environ["FLASK_TESTING"] = "1"
self.app = app
self.db = db
self.app.config["TESTING"] = True
self.app.secret_key = generate_random(20)
self.db_path = Config.sqlite_path
self.cleanup()
init()
self.app.app_context().push()
self.client = self.app.test_client()
self.context = self.app.test_request_context()
def tearDown(self):
"""
Removes any generated files from the filesystem and handles other
post-test tasks
:return: None
"""
self.cleanup()
def cleanup(self):
"""
Cleans up the filesystem after/before tests
:return: None
"""
try:
os.remove(self.db_path)
except FileNotFoundError:
pass
def generate_sample_user(self, confirmed: bool = True) \
-> Tuple[User, str, str]:
"""
Generates a random user for use in tests
:param confirmed: Whether or not the user should be confirmed
:return: The User object, the password and the confirmation key
"""
password = generate_random(20)
confirm_key = generate_random(20)
user = User(
username=generate_random(12),
password_hash=generate_hash(password),
email=generate_random(8) + "@example.com",
confirmed=confirmed,
confirmation_hash=generate_hash(confirm_key)
)
self.db.session.add(user)
self.db.session.commit()
return user, password, confirm_key
def login_user(self, user: User, password: str):
"""
Logs in a user
:param user: The user to log in
:param password: The password to use
:return: None
"""
self.client.post("/login", follow_redirects=True, data={
"username": user.username,
"password": password
})
def generate_api_key(self, user: User) \
-> Tuple[ApiKey, str, Dict[str, str]]:
"""
Generates an API key and base64 encoded headers for requests
:param user: The user for which to create the key
:return: The API key object, the actual API key, the headers
"""
key = generate_random(20)
hashed = generate_hash(key)
api_key_obj = ApiKey(user=user, key_hash=hashed)
self.db.session.add(api_key_obj)
self.db.session.commit()
api_key =