Source code for biosimdb_interface.login.login

#!/usr/bin/env python
"""OAuth2 authorization code flow for BioSimDB login."""

import secrets
from urllib.parse import urlencode

import requests
from flask import current_app, redirect, request, session, url_for

from . import bp


def authz_url() -> str:
    """
    Assemble the OAuth2 authorization URL, including a fresh random state.

    The generated state is written to the session under ``"oauth_state"``
    so that the callback can compare it with the value sent back by the
    authorization server.

    Returns:
        str: The authorization endpoint URL with its query string appended.
    """
    # Unpredictable per-request value used for CSRF protection of the flow.
    csrf_state = secrets.token_urlsafe(32)
    session["oauth_state"] = csrf_state

    config = current_app.config
    query = urlencode(
        {
            "response_type": "code",
            "client_id": config["CLIENT_ID"],
            "redirect_uri": config["REDIRECT_URI"],
            "scope": config["SCOPES"],
            "state": csrf_state,
        }
    )
    return "{}?{}".format(config["AUTH_URL"], query)
@bp.route("/login")
def login():
    """
    Start the OAuth2 flow by sending the user to the authorization endpoint.

    Any ``"last_error"`` left in the session by an earlier attempt is
    discarded before the redirect is issued.
    """
    # Drop a stale error so it is not shown after a new login attempt.
    session.pop("last_error", None)
    return redirect(authz_url())
# Seconds to wait for the token endpoint before giving up, so that a stalled
# authorization server cannot block the worker handling this request forever.
_TOKEN_REQUEST_TIMEOUT = 10


@bp.route("/callback")
def callback():
    """
    Handle the OAuth2 callback from the authorization server.

    Verifies the ``state`` query parameter against the value stored in the
    session, exchanges the authorization ``code`` for an access token, and
    stores that token in the session under ``"access_token"``.

    Returns:
        Response: A 400 response with the text ``"Invalid state parameter"``
        when the state is missing or does not match; otherwise a redirect to
        the URL stored in the session under ``"post_login_redirect"`` (or the
        webform when none is stored). On any failure to obtain a token, an
        error message is stored in the session under ``"last_error"`` and
        the user is redirected to the webform.
    """
    # Make sure the state returned is the same that was originally provided.
    returned_state = request.args.get("state")
    original_state = session.get("oauth_state")
    if (
        not returned_state
        or not isinstance(original_state, str)
        # Compare as bytes: compare_digest rejects non-ASCII str input, and
        # the returned state is attacker-controlled.
        or not secrets.compare_digest(
            returned_state.encode("utf-8"), original_state.encode("utf-8")
        )
    ):
        return "Invalid state parameter", 400
    session.pop("oauth_state", None)  # clear state once verified

    code = request.args.get("code")  # generated by the authorization server
    if not code:
        session["last_error"] = "No code returned"
        return redirect(url_for("form.webform"))

    # Exchange code for access token.
    token_request = {
        "grant_type": "authorization_code",
        "code": code,
        "redirect_uri": current_app.config["REDIRECT_URI"],
        "client_id": current_app.config["CLIENT_ID"],
        "client_secret": current_app.config["CLIENT_SECRET"],
    }
    try:
        token_response = requests.post(
            current_app.config["TOKEN_URL"],
            data=token_request,
            timeout=_TOKEN_REQUEST_TIMEOUT,
        )
        token_json = token_response.json()
    except (requests.RequestException, ValueError) as exc:
        # Network failure, timeout, or a body that is not valid JSON: report
        # it the same way as any other failed exchange instead of a 500.
        current_app.logger.warning("OAuth token exchange failed: %s", exc)
        session["last_error"] = "Token exchange failed"
        return redirect(url_for("form.webform"))

    if not isinstance(token_json, dict) or "access_token" not in token_json:
        session["last_error"] = "Token exchange failed"
        return redirect(url_for("form.webform"))

    # Save the access token in session so we know the user is logged in.
    session["access_token"] = token_json["access_token"]
    next_url = session.pop("post_login_redirect", url_for("form.webform"))
    return redirect(next_url)
@bp.route("/logout")
def logout():
    """
    Log the user out and send them back to the webform.

    Removes ``"access_token"`` from the session if it is present.

    Returns:
        Response: Redirect to the webform.
    """
    session.pop("access_token", None)
    destination = url_for("form.webform")
    return redirect(destination)
def is_logged_in():
    """
    Report whether the session currently holds an access token.

    Only the presence of the ``"access_token"`` key is checked; the stored
    value itself is not inspected.

    Returns:
        bool: True if ``"access_token"`` is in the session, False otherwise.
    """
    has_token = "access_token" in session
    return has_token