Source code for biosimdb_interface.login.login
#!/usr/bin/env python
"""OAuth2 authorization code flow for BioSimDB login."""
import secrets
from urllib.parse import urlencode
import requests
from flask import current_app, redirect, request, session, url_for
from . import bp
[docs]
def authz_url() -> str:
"""
Build the OAuth2 authorization URL with a secure random state parameter.
Stores the state in the session for later verification in the callback.
Returns:
str: The full authorization URL to redirect the user to.
"""
state = secrets.token_urlsafe(32) # secure random string
session["oauth_state"] = state # store in session
params = {
"response_type": "code",
"client_id": current_app.config["CLIENT_ID"],
"redirect_uri": current_app.config["REDIRECT_URI"],
"scope": current_app.config["SCOPES"],
"state": state,
}
return f"{current_app.config['AUTH_URL']}?{urlencode(params)}"
[docs]
@bp.route("/login")
def login():
"""
Redirect the user to the OAuth2 authorization endpoint.
Clears any previous error from the session before initiating the flow.
"""
session.pop("last_error", None)
url = authz_url()
return redirect(url)
[docs]
@bp.route("/callback")
def callback():
"""
Handle the OAuth2 callback from the authorization server.
Verifies the state parameter, exchanges the authorization code for an
access token, and redirects to the post-login URL or the webform.
Returns:
Response: Redirect to the next URL on success, or the webform on failure.
"""
# make sure state returned is the same that was originally provided
returned_state = request.args.get("state")
original_state = session.get("oauth_state")
if not returned_state or returned_state != original_state:
return "Invalid state parameter", 400
session.pop("oauth_state", None) # clear state once verified
code = request.args.get("code") # generated by the OAuth authorization server
if not code:
session["last_error"] = "No code returned"
return redirect(url_for("form.webform"))
# Exchange code for access token
token_response = requests.post(
current_app.config["TOKEN_URL"],
data={
"grant_type": "authorization_code",
"code": code,
"redirect_uri": current_app.config["REDIRECT_URI"],
"client_id": current_app.config["CLIENT_ID"],
"client_secret": current_app.config["CLIENT_SECRET"],
},
)
token_json = token_response.json()
if "access_token" not in token_json:
session["last_error"] = "Token exchange failed"
return redirect(url_for("form.webform"))
# save the access token in session so we know the user is logged in
session["access_token"] = token_json["access_token"]
next_url = session.pop("post_login_redirect", url_for("form.webform"))
return redirect(next_url)
[docs]
@bp.route("/logout")
def logout():
"""
Clear the access token from the session and redirect to the webform.
Returns:
Response: Redirect to the webform.
"""
session.pop("access_token", None)
return redirect(url_for("form.webform"))
[docs]
def is_logged_in():
"""
Check whether the user has an active access token in the session.
Returns:
bool: True if the user is logged in, False otherwise.
"""
return "access_token" in session