# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2015-2018 CERN.
# Copyright (C) 2021 TU Wien.
#
# Invenio is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
"""Invenio user management and authentication."""
from warnings import warn
import pkg_resources
from flask import Blueprint, abort, current_app, request_finished, session
from flask_kvsession import KVSessionExtension
from flask_login import LoginManager, user_logged_in, user_logged_out
from flask_principal import AnonymousIdentity
from flask_security import Security
from invenio_db import db
from passlib.registry import register_crypt_handler
from werkzeug.utils import cached_property
from invenio_accounts.forms import (
confirm_register_form_factory,
login_form_factory,
register_form_factory,
)
from . import config
from .datastore import SessionAwareSQLAlchemyUserDatastore
from .hash import InvenioAesEncryptedEmail
from .models import Role, User
from .sessions import csrf_token_reset, login_listener, logout_listener
from .utils import obj_or_import_string, set_session_info
[docs]class InvenioAccounts(object):
"""Invenio-Accounts extension."""
def __init__(self, app=None, sessionstore=None):
"""Extension initialization.
:param app: The Flask application.
:param sessionstore: store for sessions. Passed to
``flask-kvsession``. Defaults to redis.
"""
self.security = Security()
self.datastore = None
if app:
self.init_app(app, sessionstore=sessionstore)
[docs] @staticmethod
def monkey_patch_flask_security():
"""Monkey-patch Flask-Security."""
# Disable remember me cookie generation as it does not work with
# session activity tracking (remember me token will bypass revoking
# of a session).
def patch_do_nothing(*args, **kwargs):
pass
LoginManager._set_cookie = patch_do_nothing
[docs] @cached_property
def jwt_decode_factory(self):
"""Load default JWT veryfication factory."""
return obj_or_import_string(
current_app.config.get("ACCOUNTS_JWT_DECODE_FACTORY")
)
[docs] @cached_property
def jwt_creation_factory(self):
"""Load default JWT creation factory."""
return obj_or_import_string(
current_app.config.get("ACCOUNTS_JWT_CREATION_FACTORY")
)
[docs] def register_anonymous_identity_loader(self, state):
"""Registers a loader for AnonymousIdentity.
Additional loader is necessary for applying a need 'any-user' to
AnonymousUser in the invenio-access module
"""
# Attention: the order of the loaders is important
# append is used here instead of decorator to enforce the order
state.principal.identity_loaders.append(AnonymousIdentity)
[docs] def check_configuration_consistency(self, app):
"""Check if the config is consistent and issue a warning if not."""
# Warn if inconsistent configuration is detected
sec = app.extensions["security"]
local_login = app.config.get("ACCOUNTS_LOCAL_LOGIN_ENABLED", True)
local_account_editable = sec.registerable or sec.changeable or sec.recoverable
if local_account_editable and not local_login:
warning_message = (
"ACCOUNTS_LOCAL_LOGIN_ENABLED is False, while at least one "
"of SECURITY_REGISTERABLE, SECURITY_RECOVERABLE and "
"SECURITY_CHANGEABLE is True"
)
warn(warning_message, Warning, stacklevel=2)
[docs] def init_app(self, app, sessionstore=None, register_blueprint=True):
"""Flask application initialization.
The following actions are executed:
#. Initialize the configuration.
#. Monkey-patch Flask-Security.
#. Create the user datastore.
#. Create the sessionstore.
#. Initialize the extension, the forms to register users and
confirms their emails, the CLI and, if ``ACCOUNTS_USE_CELERY`` is
``True``, register a celery task to send emails.
#. Override Flask-Security's default login view function.
#. Warn if inconsistent configuration is detected
:param app: The Flask application.
:param sessionstore: store for sessions. Passed to
``flask-kvsession``. If ``None`` then Redis is configured.
(Default: ``None``)
:param register_blueprint: If ``True``, the application registers the
blueprints. (Default: ``True``)
"""
self.init_config(app)
# Monkey-patch Flask-Security
InvenioAccounts.monkey_patch_flask_security()
# Create user datastore
if not self.datastore:
self.datastore = SessionAwareSQLAlchemyUserDatastore(db, User, Role)
if app.config["ACCOUNTS_SESSION_ACTIVITY_ENABLED"]:
self._enable_session_activity(app=app)
# Initialize extension.
_register_blueprint = app.config.get("ACCOUNTS_REGISTER_BLUEPRINT")
if _register_blueprint is not None:
register_blueprint = _register_blueprint
state = self.security.init_app(
app, datastore=self.datastore, register_blueprint=register_blueprint
)
# Override Flask-Security's default login view function
new_login_view = obj_or_import_string(
app.config.get("ACCOUNTS_LOGIN_VIEW_FUNCTION")
)
if new_login_view is not None:
app.view_functions["security.login"] = new_login_view
self.register_anonymous_identity_loader(state)
app.extensions["security"].register_form = register_form_factory(
app.extensions["security"].register_form, app
)
app.extensions[
"security"
].confirm_register_form = confirm_register_form_factory(
app.extensions["security"].confirm_register_form, app
)
app.extensions["security"].login_form = login_form_factory(
app.extensions["security"].login_form, app
)
if app.config["ACCOUNTS_USE_CELERY"]:
from invenio_accounts.tasks import send_security_email
@state.send_mail_task
def delay_security_email(msg):
send_security_email.delay(msg.__dict__)
# Register context processor
if app.config["ACCOUNTS_JWT_DOM_TOKEN"]:
from invenio_accounts.context_processors.jwt import jwt_proccessor
app.context_processor(jwt_proccessor)
# Register signal receiver
if app.config.get("ACCOUNTS_USERINFO_HEADERS"):
request_finished.connect(set_session_info, app)
# Set Session KV store
session_kvstore_factory = obj_or_import_string(
app.config["ACCOUNTS_SESSION_STORE_FACTORY"]
)
session_kvstore = session_kvstore_factory(app)
self.kvsession_extension = KVSessionExtension(session_kvstore, app)
self.check_configuration_consistency(app)
app.extensions["invenio-accounts"] = self
[docs] def init_config(self, app):
"""Initialize configuration.
:param app: The Flask application.
"""
try:
pkg_resources.get_distribution("celery")
app.config.setdefault("ACCOUNTS_USE_CELERY", not (app.debug or app.testing))
except pkg_resources.DistributionNotFound: # pragma: no cover
app.config.setdefault("ACCOUNTS_USE_CELERY", False)
# Register Invenio legacy password hashing
register_crypt_handler(InvenioAesEncryptedEmail)
# Change Flask defaults
app.config.setdefault("SESSION_COOKIE_SECURE", not app.debug)
# Change Flask-Security defaults
app.config.setdefault("SECURITY_PASSWORD_SALT", app.config["SECRET_KEY"])
# Set JWT secret key
app.config.setdefault(
"ACCOUNTS_JWT_SECRET_KEY",
app.config.get("ACCOUNTS_JWT_SECRET_KEY", app.config.get("SECRET_KEY")),
)
config_apps = ["ACCOUNTS", "SECURITY_"]
for k in dir(config):
if any([k.startswith(prefix) for prefix in config_apps]):
app.config.setdefault(k, getattr(config, k))
def _enable_session_activity(self, app):
"""Enable session activity."""
user_logged_in.connect(login_listener, app)
user_logged_in.connect(csrf_token_reset, app)
user_logged_out.connect(logout_listener, app)
user_logged_out.connect(csrf_token_reset, app)
from .views.security import revoke_session, security
from .views.settings import blueprint
blueprint.route("/security/", methods=["GET"])(security)
blueprint.route("/sessions/revoke/", methods=["POST"])(revoke_session)
[docs]class InvenioAccountsREST(InvenioAccounts):
"""Invenio-Accounts REST extension."""
[docs] def init_app(self, app, sessionstore=None, register_blueprint=False):
"""Flask application initialization.
:param app: The Flask application.
:param sessionstore: store for sessions. Passed to
``flask-kvsession``. If ``None`` then Redis is configured.
(Default: ``None``)
:param register_blueprint: If ``True``, the application registers the
blueprints. (Default: ``True``)
"""
# Register the Flask-Security blueprint for the email templates
if not register_blueprint:
security_bp = Blueprint(
"security_email_templates", # name differently to avoid misuse
"flask_security.core",
template_folder="templates",
)
security_rest_overrides = Blueprint(
"security_email_overrides", # overrides
__name__,
template_folder="templates",
)
app.register_blueprint(security_bp)
app.register_blueprint(security_rest_overrides)
super(InvenioAccountsREST, self).init_app(
app,
sessionstore=sessionstore,
register_blueprint=register_blueprint,
)
app.config["ACCOUNTS_CONFIRM_EMAIL_ENDPOINT"] = app.config[
"ACCOUNTS_REST_CONFIRM_EMAIL_ENDPOINT"
]
app.config["ACCOUNTS_RESET_PASSWORD_ENDPOINT"] = app.config[
"ACCOUNTS_REST_RESET_PASSWORD_ENDPOINT"
]
if app.config.get("ACCOUNTS_REGISTER_UNAUTHORIZED_CALLBACK", True):
def _unauthorized_callback():
"""Callback to abort when user is unauthorized."""
abort(401)
app.login_manager.unauthorized_handler(_unauthorized_callback)
[docs]class InvenioAccountsUI(InvenioAccounts):
"""Invenio-Accounts UI extension."""
[docs] def init_app(self, app, sessionstore=None, register_blueprint=True):
"""Flask application initialization.
:param app: The Flask application.
:param sessionstore: store for sessions. Passed to
``flask-kvsession``. If ``None`` then Redis is configured.
(Default: ``None``)
:param register_blueprint: If ``True``, the application registers the
blueprints. (Default: ``True``)
"""
self.make_session_permanent(app)
return super(InvenioAccountsUI, self).init_app(
app, sessionstore=sessionstore, register_blueprint=register_blueprint
)
[docs] def make_session_permanent(self, app):
"""Make session permanent by default.
Set `PERMANENT_SESSION_LIFETIME` to specify time-to-live
"""
@app.before_request
def make_session_permanent():
session.permanent = True