Source code for eodag.plugins.authentication.eoiam

# -*- coding: utf-8 -*-
# Copyright 2026, CS GROUP - France, https://www.csgroup.eu/
#
# This file is part of EODAG project
#     https://www.github.com/CS-SI/EODAG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations

from urllib.parse import parse_qs, urljoin, urlsplit

import requests
from lxml import html
from requests.auth import AuthBase

from eodag.plugins.authentication.base import Authentication
from eodag.utils.exceptions import AuthenticationError, MisconfiguredError


class EOIAMAuth(Authentication):
    """Authentication plugin for EOIAM.

    The plugin keeps a ``requests.Session`` and performs a form-based SAML
    login with it: credentials are posted to the form found in the login
    page, then the resulting SAML form is posted back to the service.
    """

    def __init__(self, provider, config) -> None:
        """Initialize the plugin with provider and config, and set up a requests.Session for SAML login."""
        super().__init__(provider, config)
        self.session = requests.Session()

    def validate_config_credentials(self) -> None:
        """Validate configured credentials.

        :raises MisconfiguredError: if ``username`` or ``password`` is absent
            from ``self.config.credentials`` (including when no credentials
            are configured at all).
        """
        # ``or {}`` also covers a ``credentials`` attribute explicitly set to
        # None, which would otherwise fail with TypeError on the ``in`` test.
        credentials = getattr(self.config, "credentials", None) or {}
        missing = [key for key in ("username", "password") if key not in credentials]
        if missing:
            msg = f"Missing credentials for {self.provider}: {', '.join(missing)}"
            raise MisconfiguredError(msg)

    def authenticate(self) -> AuthBase:
        """Return a requests.AuthBase object using the session with SAML login.

        :raises MisconfiguredError: if the credentials are not configured.
        """
        self.validate_config_credentials()
        return _EOIAMSessionAuth(self)

    def _extract_input_value(self, tree: html.HtmlElement, name: str) -> str:
        """Extract the value of an input field from the HTML tree.

        :param tree: parsed HTML document to search
        :param name: value of the ``name`` attribute of the wanted ``<input>``
        :returns: the ``value`` attribute of the first matching input
        :raises MisconfiguredError: if no such input exists or its value is
            missing or empty
        """
        inputs = tree.xpath(f"//input[@name='{name}']")
        if not inputs:
            msg = f"{name} input not found"
            raise MisconfiguredError(msg)
        value = inputs[0].get("value")
        if not value:
            msg = f"{name} has no value"
            raise MisconfiguredError(msg)
        return value

    def _extract_first_form(self, tree: html.HtmlElement) -> html.HtmlElement:
        """Extract the first form from the HTML tree.

        :raises MisconfiguredError: if the document contains no ``<form>``
        """
        forms = tree.xpath("//form")
        if not forms:
            raise MisconfiguredError("Form not found")
        return forms[0]

    def _resolve_action(self, form: html.HtmlElement, base_url: str) -> str:
        """Resolve the action URL of a form relative to the base URL.

        :raises MisconfiguredError: if the form has no (or an empty) ``action``
        """
        action = form.get("action")
        if not action:
            raise MisconfiguredError("Form action not found")
        return urljoin(base_url, action)

    @staticmethod
    def _service_name_from_url(url: str) -> str:
        """Return the first ``sp`` query parameter of ``url``, or ``""``."""
        # parse_qs expects a bare query string: fed a full URL it would fold
        # "scheme://host/path?" into the first key and miss a leading "sp".
        values = parse_qs(urlsplit(url).query).get("sp")
        return values[0] if values else ""

    def _login_from_html(self, html_content: str, req_url: str) -> requests.Response:
        """Perform SAML login from HTML page.

        :param html_content: HTML of the login page; it must hold a
            ``sessionDataKey`` input and a form whose action is the
            credentials endpoint
        :param req_url: URL originally requested, quoted in the consent error
        :returns: the (streamed) response of the GET on the final redirect URL
        :raises MisconfiguredError: if the expected form elements are missing
            or the login is rejected
        :raises AuthenticationError: if a consent or a data access request is
            needed, or if the SAML exchange ends unexpectedly
        :raises requests.HTTPError: if the credentials POST returns an error
            status
        """
        creds = self.config.credentials
        base_url = self.config.auth_uri

        # Parse the HTML to extract the session key and form action
        login_tree = html.fromstring(html_content)
        session_key = self._extract_input_value(login_tree, "sessionDataKey")
        idp_url = self._resolve_action(self._extract_first_form(login_tree), base_url)

        # Submit credentials
        payload = {
            "tocommonauth": "true",
            "username": creds["username"],
            "password": creds["password"],
            "sessionDataKey": session_key,
        }
        resp = self.session.post(idp_url, data=payload, allow_redirects=True)
        resp.raise_for_status()

        # The URL we end up on after redirects tells how the login went
        if "consent.do" in resp.url:
            service_name = self._service_name_from_url(resp.url)
            msg = (
                f"Consent required for service {service_name}, "
                f"please fill the following form and try again {req_url}"
            )
            raise AuthenticationError(msg)
        if "login.do" in resp.url:
            raise MisconfiguredError("Login failed: please check your credentials")
        if "Earth Observation Identity and Access Management System" in resp.text:
            msg = f"Login failed: {resp.url}"
            raise MisconfiguredError(msg)

        # Extract SAML form
        saml_form = self._extract_first_form(html.fromstring(resp.text))
        saml_url = self._resolve_action(saml_form, base_url)
        saml_data = {
            inp.get("name"): inp.get("value")
            for inp in saml_form.xpath(".//input[@name]")
            if inp.get("name") and inp.get("value")
        }

        # Submit SAML response; the redirect is followed by hand below
        resp_post = self.session.post(saml_url, data=saml_data, allow_redirects=False)
        if not resp_post.is_redirect:
            msg = f"Unexpected response after SAML login: {resp_post.status_code}"
            raise AuthenticationError(msg)
        location = resp_post.headers.get("Location")
        if not location:
            raise AuthenticationError("Final redirect URL not found after SAML login")
        # Location may be a relative reference; an absolute one is left as is
        final_url = urljoin(saml_url, location)

        resp_final = self.session.get(final_url, stream=True)
        content_type = resp_final.headers.get("Content-Type", "")
        if content_type.startswith(("text/html", "text/xml")):
            # A markup answer at this point is treated as an error page
            error_text = resp_final.text
            if "wants to access your account" in error_text:
                msg = (
                    "Consent required: please log in to the EOIAM portal "
                    f"and grant consent through this link {final_url}"
                )
                raise AuthenticationError(msg)
            if (
                "not yet performed the necessary steps in order to access this data."
                in error_text
            ):
                msg = (
                    f"Data access request required: please log in to the EOIAM portal "
                    f"and request access to the data through this link {final_url}"
                )
                raise AuthenticationError(msg)
            raise AuthenticationError("Unexpected HTML response after SAML login")
        return resp_final
class _EOIAMSessionAuth(AuthBase):
    """AuthBase wrapper using a requests.Session with lazy SAML login."""

    def __init__(self, auth_plugin: EOIAMAuth):
        """Initialize with the EOIAMAuth plugin to access its session and login method."""
        self.auth_plugin = auth_plugin

    def __call__(self, request):
        """Attach the cookies needed to access ``request.url`` to ``request``.

        This is called by requests before sending a request. The URL is first
        fetched with the plugin's session; if the answer is the EOIAM login
        page, the SAML login is performed. The session cookies are then copied
        onto the request, and the plugin gets a fresh session for the next
        call.

        :param request: the prepared request about to be sent
        :returns: the same request, with cookies set
        :raises MisconfiguredError: propagated from the login
        :raises AuthenticationError: propagated from the login
        """
        plugin = self.auth_plugin
        session = plugin.session
        try:
            probe = session.get(request.url, allow_redirects=True)
            if "Earth Observation Identity and Access Management System" in probe.text:
                login_resp = plugin._login_from_html(probe.text, request.url)
                # Only the cookies are of interest: the login response is
                # streamed, so close it to hand its connection back.
                login_resp.close()

            # Copy cookies from session to the request
            request.prepare_cookies(plugin.session.cookies)
            return request
        finally:
            # Release the pooled connections of the session being discarded
            # instead of leaving them open until garbage collection.
            session.close()
            plugin.session = requests.Session()