Source code for eodag.api.provider

# -*- coding: utf-8 -*-
# Copyright 2025, CS GROUP - France, https://www.csgroup.eu/
#
# This file is part of EODAG project
#     https://www.github.com/CS-SI/EODAG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations

import logging
import os
import tempfile
import traceback
from collections import UserDict
from inspect import isclass
from textwrap import shorten
from typing import (
    TYPE_CHECKING,
    Any,
    Iterator,
    Mapping,
    Optional,
    Union,
    get_type_hints,
)

import yaml

from eodag.api.collection import Collection
from eodag.api.product.metadata_mapping import (
    NOT_AVAILABLE,
    mtd_cfg_as_conversion_and_querypath,
)
from eodag.config import PluginConfig, credentials_in_auth, load_stac_provider_config
from eodag.utils import (
    GENERIC_COLLECTION,
    STAC_SEARCH_PLUGINS,
    cast_scalar_value,
    deepcopy,
    merge_mappings,
    slugify,
    update_nested_dict,
)
from eodag.utils.exceptions import (
    MisconfiguredError,
    UnsupportedCollection,
    UnsupportedProvider,
    ValidationError,
)
from eodag.utils.free_text_search import compile_free_text_query
from eodag.utils.repr import dict_to_html_table, str_as_href

if TYPE_CHECKING:
    from typing_extensions import Self

    from eodag.api.core import EODataAccessGateway

logger = logging.getLogger("eodag.provider")

# Provider configuration keys that hold an authentication plugin configuration
AUTH_TOPIC_KEYS = ("auth", "search_auth", "download_auth")
# Every provider configuration key that holds a plugin configuration
PLUGINS_TOPICS_KEYS = ("api", "search", "download", *AUTH_TOPIC_KEYS)


class ProviderConfig(yaml.YAMLObject):
    """EODAG configuration for a provider.

    :param name: The name of the provider
    :param group: (optional) The name of the group the provider belongs to
    :param priority: (optional) The priority of the provider while searching a product.
                     Lower value means lower priority. (Default: 0)
    :param roles: The roles of the provider (e.g. "host", "producer", "licensor", "processor")
    :param description: (optional) A short description of the provider
    :param url: URL to the webpage representing the provider
    :param api: (optional) The configuration of a plugin of type Api
    :param search: (optional) The configuration of a plugin of type Search
    :param products: (optional) The collections supported by the provider
    :param download: (optional) The configuration of a plugin of type Download
    :param auth: (optional) The configuration of a plugin of type Authentication
    :param search_auth: (optional) The configuration of a plugin of type Authentication for search
    :param download_auth: (optional) The configuration of a plugin of type Authentication for download
    :param kwargs: Additional configuration variables for this provider
    """

    # Only ``priority`` has a class-level default: every other attribute may be
    # missing on an instance and must be read with ``getattr(..., default)``.
    name: str
    group: str
    priority: int = 0
    roles: list[str]
    description: str
    url: str
    api: PluginConfig
    search: PluginConfig
    products: dict[str, Any]
    download: PluginConfig
    auth: PluginConfig
    search_auth: PluginConfig
    download_auth: PluginConfig

    # NOTE(review): ``yaml.Loader`` is the full (unsafe) PyYAML loader; confirm that
    # "!provider" documents are only ever loaded from trusted sources.
    yaml_loader = yaml.Loader
    yaml_dumper = yaml.SafeDumper
    yaml_tag = "!provider"

    def __setstate__(self, state: dict[str, Any]) -> None:
        """Apply defaults when building from yaml."""
        self.__dict__.update(state)
        self._apply_defaults()

    def __contains__(self, key):
        """Check if a key is in the ProviderConfig."""
        return key in self.__dict__

    @classmethod
    def from_yaml(cls, loader: yaml.Loader, node: Any) -> Iterator[Self]:
        """Build a :class:`~eodag.api.provider.ProviderConfig` from Yaml

        The provider name is slugified (dashes replaced by underscores) and every
        plugin topic is checked to be tagged as a plugin configuration.

        :param loader: The yaml loader constructing the object
        :param node: The yaml mapping node describing the provider
        :raises ValidationError: If the keys of the node do not form a valid provider config
        :raises MisconfiguredError: If a plugin topic is not tagged with the plugin yaml tag
        """
        cls.validate(tuple(node_key.value for node_key, _ in node.value))
        for node_key, node_value in node.value:
            if node_key.value == "name":
                node_value.value = slugify(node_value.value).replace("-", "_")
            elif node_key.value in PLUGINS_TOPICS_KEYS:
                if node_value.tag != PluginConfig.yaml_tag:
                    msg = "Provider plugin topic '%s' must be tagged with '%s'" % (
                        node_key.value,
                        PluginConfig.yaml_tag,
                    )
                    raise MisconfiguredError(msg)
        return loader.construct_yaml_object(node, cls)

    @classmethod
    def from_mapping(cls, mapping: dict[str, Any]) -> Self:
        """Build a :class:`~eodag.api.provider.ProviderConfig` from a mapping

        :param mapping: The provider configuration as a dictionary (left unmodified)
        :return: The new provider configuration, with defaults applied
        :raises ValidationError: If the mapping keys do not form a valid provider config
        """
        cls.validate(mapping)

        # Create a deep copy to avoid modifying the input dict or its nested structures
        mapping_copy = deepcopy(mapping)
        for key in PLUGINS_TOPICS_KEYS:
            if not (_mapping := mapping_copy.get(key)):
                continue
            # plugin topics may already be config objects: rebuild them from their attributes
            if not isinstance(_mapping, dict):
                _mapping = _mapping.__dict__
            mapping_copy[key] = PluginConfig.from_mapping(_mapping)

        c = cls()
        c.__dict__.update(mapping_copy)
        c._apply_defaults()
        return c

    @staticmethod
    def validate(config_keys: Union[tuple[str, ...], dict[str, Any]]) -> None:
        """Validate a :class:`~eodag.api.provider.ProviderConfig`

        :param config_keys: The configurations keys to validate
        :raises ValidationError: If ``name`` is missing, if no plugin topic is present, or
                                 if ``api`` is combined with any other plugin topic
        """
        if "name" not in config_keys:
            raise ValidationError("Provider config must have name key")
        if not any(k in config_keys for k in PLUGINS_TOPICS_KEYS):
            raise ValidationError("A provider must implement at least one plugin")
        non_api_keys = [k for k in PLUGINS_TOPICS_KEYS if k != "api"]
        if "api" in config_keys and any(k in config_keys for k in non_api_keys):
            raise ValidationError(
                "A provider implementing an Api plugin must not implement any other "
                "type of plugin"
            )

    def update(self, config: Union[Self, dict[str, Any]]) -> None:
        """Update the configuration parameters with values from ``config``

        Non-plugin parameters are merged (``None`` values are ignored). Plugin topics are
        merged into the existing plugin configuration when there is one, and created
        otherwise. Defaults are re-applied afterwards.

        :param config: The config from which to override configuration parameters
        """
        source = config if isinstance(config, dict) else config.__dict__

        merge_mappings(
            self.__dict__,
            {
                key: value
                for key, value in source.items()
                if key not in PLUGINS_TOPICS_KEYS and value is not None
            },
        )

        for key in PLUGINS_TOPICS_KEYS:
            current_value: Optional[PluginConfig] = getattr(self, key, None)
            config_value = source.get(key, {})

            if current_value is not None:
                current_value |= config_value
            elif isinstance(config_value, PluginConfig):
                setattr(self, key, config_value)
            elif config_value:
                try:
                    setattr(self, key, PluginConfig.from_mapping(config_value))
                except ValidationError as e:
                    logger.warning(
                        (
                            "Could not add %s Plugin config to %s configuration: %s. "
                            "Try updating existing %s Plugin configs instead."
                        ),
                        key,
                        self.name,
                        str(e),
                        ", ".join([k for k in PLUGINS_TOPICS_KEYS if hasattr(self, k)]),
                    )

        self._apply_defaults()

    def with_name(self, new_name: str) -> Self:
        """Create a copy of this :class:`~eodag.api.provider.ProviderConfig` with a different name.

        :param new_name: The new name for the provider config.
        :return: A new ProviderConfig instance with the updated name.
        """
        config_dict = self.__dict__.copy()
        config_dict["name"] = new_name

        # from_mapping() deep-copies plain mappings: hand it the plugin attributes as dicts
        for key in PLUGINS_TOPICS_KEYS:
            if key in config_dict and isinstance(config_dict[key], PluginConfig):
                config_dict[key] = config_dict[key].__dict__

        return self.__class__.from_mapping(config_dict)

    def _apply_defaults(self: Self) -> None:
        """Applies some default values to provider config.

        * ``output_dir`` of the ``download`` / ``api`` plugin defaults to the system
          temporary directory when unset or empty.
        * ``delete_archive`` of the ``download`` / ``api`` plugin defaults to ``True``
          only when unset, so that an explicit ``False`` is preserved.
        * the configuration of a STAC search plugin is completed with the default STAC
          provider configuration, the provider own values taking precedence.
        """
        stac_search_default_conf = load_stac_provider_config()

        # For the provider, set the default output_dir of its download plugin
        # as tempdir in a portable way
        for download_topic_key in ("download", "api"):
            if download_topic_key not in vars(self):
                continue
            download_conf = getattr(self, download_topic_key)
            if not getattr(download_conf, "output_dir", None):
                download_conf.output_dir = tempfile.gettempdir()
            # compare to None: a falsy check would silently turn an explicit
            # ``delete_archive: False`` back into True each time defaults are applied
            if getattr(download_conf, "delete_archive", None) is None:
                download_conf.delete_archive = True

        # ``search`` (and its ``type``) may be missing, e.g. for Api-only providers
        search_conf = getattr(self, "search", None)
        if (
            stac_search_default_conf is not None
            and search_conf
            and getattr(search_conf, "type", None) in STAC_SEARCH_PLUGINS
        ):
            # search config set to stac defaults overridden with provider config
            per_provider_stac_provider_config = deepcopy(stac_search_default_conf)
            search_conf.__dict__ = update_nested_dict(
                per_provider_stac_provider_config["search"],
                search_conf.__dict__,
                allow_empty_values=True,
            )
class Provider:
    """
    Represents a data provider with its configuration and utility methods.

    :param config: Provider configuration as :meth:`~eodag.api.provider.ProviderConfig` instance or :class:`dict`
    :param collections_fetched: Flag indicating whether collections have been fetched

    Example
    -------

    >>> from eodag.api.provider import Provider
    >>> config = {
    ...     'name': 'example_provider',
    ...     'description': 'Example provider for testing',
    ...     'search': {'type': 'StacSearch'},
    ...     'products': {'S2_MSI_L1C': {'_collection': 'S2_MSI_L1C'}}
    ... }
    >>> provider = Provider(config)
    >>> provider.name
    'example_provider'
    >>> 'S2_MSI_L1C' in provider.collections_config
    True
    >>> provider.priority  # Default priority
    0
    """

    _name: str
    _config: ProviderConfig
    collections_fetched: bool

    def __init__(self, config: Union[ProviderConfig, dict[str, Any]]):
        """Initialize provider with configuration.

        :param config: The provider configuration, as a dict or a ProviderConfig
        :raises ValidationError: If ``config`` is neither a dict nor a ProviderConfig
        """
        if isinstance(config, dict):
            self._config = ProviderConfig.from_mapping(config)
        elif isinstance(config, ProviderConfig):
            self._config = config
        else:
            msg = f"Unsupported config type: {type(config)}. Expected ProviderConfig or dict."
            raise ValidationError(msg)

        self._name = self._config.name
        self.collections_fetched = False

    def __str__(self) -> str:
        """Return the provider's name as string."""
        return self.name

    def __repr__(self) -> str:
        """Return a string representation of the Provider."""
        return f"Provider('{self.name}')"

    def __eq__(self, other: object):
        """Compare providers by name or with a string."""
        if isinstance(other, Provider):
            return self.name == other.name
        elif isinstance(other, str):
            return self.name == other
        return False

    def __hash__(self):
        """Hash based on provider name, for use in sets/dicts."""
        return hash(self.name)

    def _repr_html_(self, embedded: bool = False) -> str:
        """HTML representation for Jupyter/IPython display.

        :param embedded: If ``True``, omit the table header and render rows with a
                         transparent background
        :return: HTML string representation of the provider
        """
        group_display = f" ({self.group})" if self.group else ""
        thead = (
            f"""<thead><tr><td style='text-align: left; color: grey;'>
                {type(self).__name__}("<span style='color: black'>{self.name}{group_display}</span>")
                </td></tr></thead>
            """
            if not embedded
            else ""
        )
        tr_style = "style='background-color: transparent;'" if embedded else ""

        # description and url are optional and have no class-level default: go through
        # the ``title`` / ``url`` properties, which fall back to None when unset
        summaries = {
            "name": self.name,
            "title": self.title or "",
            "url": self.url or "",
            "priority": self.priority,
        }
        if self.group:
            summaries["group"] = self.group

        col_html_table = dict_to_html_table(summaries, depth=1, brackets=False)

        return (
            f"<table>{thead}<tbody>"
            f"<tr {tr_style}><td style='text-align: left;'>"
            f"{col_html_table}</td></tr>"
            "</tbody></table>"
        )

    @property
    def config(self) -> ProviderConfig:
        """
        Provider configuration (read-only assignment).

        To update configuration safely, use :meth:`~eodag.api.provider.Provider.update_from_config`
        which handles metadata mapping and other provider-specific logic.

        Note: Direct config modification (``config.update()``, ``config.name = ...``) bypasses
        important provider validation.
        """
        return self._config

    @property
    def name(self) -> str:
        """The name of the provider."""
        return self._name

    @property
    def title(self) -> Optional[str]:
        """The title of the provider."""
        return getattr(self.config, "description", None)

    @property
    def url(self) -> Optional[str]:
        """The url of the provider."""
        return getattr(self.config, "url", None)

    @property
    def collections_config(self) -> dict[str, Any]:
        """Return the collections configuration dictionary for this provider."""
        return getattr(self.config, "products", {})

    @property
    def priority(self) -> int:
        """Return the provider's priority (default: 0)."""
        return self.config.priority

    @property
    def group(self) -> Optional[str]:
        """Return the provider's group, if any."""
        return getattr(self.config, "group", None)

    @property
    def search_config(self) -> Optional[PluginConfig]:
        """Return the search plugin config, if any."""
        return getattr(self.config, "search", None) or getattr(self.config, "api", None)

    @property
    def fetchable(self) -> bool:
        """Return True if the provider can fetch collections."""
        return bool(
            getattr(self.search_config, "discover_collections", {}).get("fetch_url")
        )

    @property
    def unparsable_properties(self) -> set[str]:
        """Return set of unparsable properties from
        :attr:`~eodag.config.PluginConfig.DiscoverCollections.generic_collection_unparsable_properties`,
        if any.
        """
        if not self.fetchable or self.search_config is None:
            return set()

        key = "generic_collection_unparsable_properties"
        discover_conf = getattr(self.search_config, "discover_collections", None)
        # ``fetchable`` reads discover_collections as a mapping (``.get``); an attribute
        # lookup on a mapping never finds its keys, so read it the same way here and
        # keep attribute access only for non-mapping objects
        if isinstance(discover_conf, Mapping):
            props = discover_conf.get(key, {})
        else:
            props = getattr(discover_conf, key, {})

        return set(props.keys()) if isinstance(props, dict) else set()

    @property
    def api_config(self) -> Optional[PluginConfig]:
        """Return the api plugin config, if any."""
        return getattr(self.config, "api", None)

    @property
    def download_config(self) -> Optional[PluginConfig]:
        """Return the download plugin config, if any."""
        return getattr(self.config, "download", None)

    def _get_auth_confs_with_credentials(self) -> list[PluginConfig]:
        """
        Collect all auth configs from the provider that have credentials.

        :return: List of auth plugin configs with credentials.
        """
        return [
            getattr(self.config, auth_key)
            for auth_key in AUTH_TOPIC_KEYS
            if hasattr(self.config, auth_key)
            and credentials_in_auth(getattr(self.config, auth_key))
        ]

    def _copy_matching_credentials(
        self,
        auth_confs_with_creds: list[PluginConfig],
    ) -> None:
        """
        Copy credentials from matching auth configs to this provider's auth configs
        that have none.

        Each credential-less auth topic receives the credentials of the first matching
        config of ``auth_confs_with_creds``.

        :param auth_confs_with_creds: Auth configs with credentials.
        """
        for key in AUTH_TOPIC_KEYS:
            provider_auth_config = getattr(self.config, key, None)
            if not provider_auth_config or credentials_in_auth(provider_auth_config):
                continue
            for conf_with_creds in auth_confs_with_creds:
                if conf_with_creds.matches_target_auth(provider_auth_config):
                    getattr(self.config, key).credentials = conf_with_creds.credentials
                    # first match wins for this topic; keep going with the remaining
                    # topics (returning here would leave e.g. download_auth unfilled
                    # once search_auth has been served)
                    break

    def delete_collection(self, name: str) -> None:
        """Remove a collection from this provider.

        :param name: The collection name.
        :raises UnsupportedCollection: If the collection is not found.
        """
        try:
            del self.collections_config[name]
        except KeyError as err:
            msg = f"Collection '{name}' not found in provider '{self.name}'."
            raise UnsupportedCollection(msg) from err

    def sync_collections(
        self,
        dag: EODataAccessGateway,
        strict_mode: bool,
    ) -> None:
        """
        Synchronize collections for a provider based on strict or permissive mode.

        In strict mode, removes collections not in
        :attr:`~eodag.api.core.EODataAccessGateway.collections_config`.
        In permissive mode, adds empty collection to config for missing types.

        :param dag: The gateway instance to use to list existing collections and to create new collection instances.
        :param strict_mode: If ``True``, remove unknown collections; if ``False``, add empty configs for them.
        """
        products_to_remove: list[str] = []
        products_to_add: list[str] = []

        for product_id in self.collections_config:
            if product_id == GENERIC_COLLECTION:
                continue

            if product_id not in dag.collections_config:
                if strict_mode:
                    # removal is deferred: the dict is being iterated
                    products_to_remove.append(product_id)
                    continue

                empty_product = Collection.create_with_dag(
                    dag, id=product_id, title=product_id, description=NOT_AVAILABLE
                )
                dag.collections_config[product_id] = empty_product
                products_to_add.append(product_id)

        if products_to_add:
            logger.debug(
                "Collections permissive mode, %s added (provider %s)",
                ", ".join(products_to_add),
                self,
            )

        if products_to_remove:
            logger.debug(
                "Collections strict mode, ignoring %s (provider %s)",
                ", ".join(products_to_remove),
                self,
            )
            for collection_id in products_to_remove:
                self.delete_collection(collection_id)

    def _mm_already_built(self) -> bool:
        """Check if metadata mapping is already built (converted to querypaths/conversion).

        Only the first entry of the mapping is inspected.
        """
        mm = getattr(self.search_config, "metadata_mapping", None)
        if not mm:
            return False

        try:
            first = next(iter(mm.values()))
        except StopIteration:
            return False

        # Consider it built if it's a tuple, or a list with second element as tuple
        if isinstance(first, tuple):
            return True
        if isinstance(first, list) and len(first) > 1 and isinstance(first[1], tuple):
            return True

        return False

    def update_from_config(self, config: Union[ProviderConfig, dict[str, Any]]) -> None:
        """Update the provider's configuration from a given config.

        :param config: The new configuration to update from.
        :raises ValidationError: If the config attempts to change the provider name.
        """
        # Prevent name changes to maintain provider identity
        source = config if isinstance(config, dict) else config.__dict__
        if (new_name := source.get("name")) and new_name != self._name:
            raise ValidationError(
                f"Cannot change provider name from '{self._name}' to '{new_name}'. "
                "Provider names are immutable after creation."
            )

        # check if metadata mapping is already built for that provider
        # this happens when the provider search plugin has already been used
        search_key = "search" if "search" in self.config else "api"
        new_conf_search = source.get(search_key, {}) or {}
        if "metadata_mapping" in new_conf_search and self._mm_already_built():
            # convert the incoming mapping in place so that it can be merged with the
            # already built one
            mtd_cfg_as_conversion_and_querypath(
                deepcopy(new_conf_search["metadata_mapping"]),
                new_conf_search["metadata_mapping"],
            )

        self.config.update(config)
class ProvidersDict(UserDict[str, Provider]):
    """
    A dictionary-like collection of :class:`~eodag.api.provider.Provider` objects, keyed by provider name.

    :param providers: Initial providers to populate the dictionary.
    """

    def __contains__(self, item: object) -> bool:
        """
        Check if a provider is in the dictionary by name or :class:`~eodag.api.provider.Provider` instance.

        :param item: Provider name or Provider instance to check.
        :return: True if the provider is in the dictionary, False otherwise.
        """
        if isinstance(item, Provider):
            return item.name in self.data
        return item in self.data

    def __setitem__(self, key: str, value: Provider) -> None:
        """
        Add a :class:`~eodag.api.provider.Provider` to the dictionary.

        :param key: The name of the provider.
        :param value: The Provider instance to add.
        :raises ValueError: If the provider key already exists.
        """
        if key in self.data:
            msg = f"Provider '{key}' already exists."
            raise ValueError(msg)
        super().__setitem__(key, value)

    def __delitem__(self, key: str) -> None:
        """
        Delete a provider by name.

        :param key: The name of the provider to delete.
        :raises UnsupportedProvider: If the provider key is not found.
        """
        if key not in self.data:
            msg = f"Provider '{key}' not found."
            raise UnsupportedProvider(msg)
        super().__delitem__(key)

    def __repr__(self) -> str:
        """
        String representation of :class:`~eodag.api.provider.ProvidersDict`.

        :return: String listing provider names.
        """
        return f"ProvidersDict({list(self.data.keys())})"

    def _repr_html_(self, embeded=False) -> str:
        """
        HTML representation for Jupyter/IPython display.

        :param embeded: If ``True``, omit the table header and render rows with a
                        transparent background.
        :return: HTML string representation of the :class:`~eodag.api.provider.ProvidersDict`.
        """
        # default=0 keeps an empty dictionary (e.g. an empty filter() result) displayable
        longest_name = max((len(k) for k in self.keys()), default=0)
        thead = (
            f"""<thead><tr><td style='text-align: left; color: grey;'>
                {type(self).__name__}&ensp;({len(self)})
                </td></tr></thead>
            """
            if not embeded
            else ""
        )
        tr_style = "style='background-color: transparent;'" if embeded else ""
        return (
            f"<table>{thead}"
            + "".join(
                [
                    f"""<tr {tr_style}><td style='text-align: left;'>
                        <details><summary style='color: grey;'>
                        <span style='color: black; font-family: monospace;'>{k}:{'&nbsp;' * (longest_name - len(k))}</span>
                        Provider(
                        {"'priority': '<span style='color: black'>" + str(v.priority) + "</span>',&ensp;" if v.priority is not None else ""}
                        {"'title': '<span style='color: black'>" + shorten(v.title, width=70, placeholder="[...]") + "</span>',&ensp;" if v.title else ""}
                        {"'url': '" + str_as_href(v.url) + "'" if v.url else ""}
                        )
                        </summary>
                        {v._repr_html_(embedded=True)}
                        </details>
                        </td></tr>
                    """
                    for k, v in self.items()
                ]
            )
            + "</table>"
        )

    @property
    def names(self) -> list[str]:
        """
        List of provider names.

        :return: List of provider names.
        """
        return [provider.name for provider in self.data.values()]

    @property
    def groups(self) -> list[str]:
        """
        List of provider groups if exist or names.

        :return: List of provider groups if exist or names.
        """
        return list(
            set(provider.group or provider.name for provider in self.data.values())
        )

    @property
    def configs(self) -> dict[str, ProviderConfig]:
        """
        Dictionary of provider configs keyed by provider name.

        :return: Dictionary mapping provider name to :class:`~eodag.api.provider.ProviderConfig`.
        """
        return {provider.name: provider.config for provider in self.data.values()}

    @property
    def priorities(self) -> dict[str, int]:
        """
        Dictionary of provider priorities keyed by provider name.

        :return: Dictionary mapping provider name to priority integer.
        """
        return {
            provider.name: provider.config.priority for provider in self.data.values()
        }

    def get_config(self, provider: str) -> Optional[ProviderConfig]:
        """
        Get a :class:`~eodag.api.provider.ProviderConfig` from provider name.

        :param provider: The provider name.
        :return: The :class:`~eodag.api.provider.ProviderConfig` if found, otherwise None.
        """
        prov = self.get(provider)
        return prov.config if prov else None

    def filter(self, q: Optional[str] = None) -> ProvidersDict:
        """
        Return providers whose name, group, description or collections match the free-text query.

        Supports logical operators with parenthesis (``AND``/``OR``/``NOT``), quoted phrases
        (``"exact phrase"``), ``*`` and ``?`` wildcards. If no query is provided, this same
        dictionary is returned.

        :param q: Free-text parameter to filter providers. If None or empty, returns all providers.
        :return: matching Provider objects in a :class:`~eodag.api.provider.ProvidersDict`.

        Example
        -------

        >>> from eodag.api.provider import ProvidersDict, Provider
        >>> providers = ProvidersDict()
        >>> providers['test1'] = Provider({
        ...     'name': 'test1',
        ...     'description': 'Satellite data',
        ...     'search': {'type': 'StacSearch'}
        ... })
        >>> providers['test2'] = Provider({
        ...     'name': 'test2',
        ...     'description': 'Weather data',
        ...     'search': {'type': 'StacSearch'}
        ... })
        >>> # Filter by description content
        >>> providers.filter('Satellite')
        ProvidersDict(['test1'])
        >>> # Filter with logical operators
        >>> providers['test3'] = Provider({
        ...     'name': 'test3',
        ...     'description': 'Satellite weather data',
        ...     'search': {'type': 'StacSearch'}
        ... })
        >>> providers.filter('Satellite AND weather')
        ProvidersDict(['test3'])
        >>> # Get all providers when no filter
        >>> len(providers.filter())
        3
        """
        if not q:
            return self

        free_text_query = compile_free_text_query(q)
        searchable_attributes = {"name", "group", "description", "products"}

        filtered = ProvidersDict()
        for p in self.data.values():
            searchables = {
                k: v for k, v in p.config.__dict__.items() if k in searchable_attributes
            }
            if free_text_query(searchables):
                filtered[p.name] = p
        return filtered

    def filter_by_name_or_group(
        self, name_or_group: Optional[str] = None
    ) -> Iterator[Provider]:
        """
        Yield providers whose name or group matches the given ``name_or_group``.

        The comparison is case-insensitive. If ``name_or_group`` is ``None``, yields all providers.

        :param name_or_group: The provider name or group to filter by. If None, yields all providers.
        :return: Iterator of matching :class:`~eodag.api.provider.Provider` objects.

        Example
        -------

        >>> from eodag.api.provider import ProvidersDict, Provider
        >>> providers = ProvidersDict()
        >>> providers['sentinel'] = Provider({'name': 'sentinel', 'group': 'esa', 'search': {'type': 'StacSearch'}})
        >>> providers['landsat'] = Provider({'name': 'landsat', 'group': 'usgs', 'search': {'type': 'StacSearch'}})
        >>> providers['modis'] = Provider({'name': 'modis', 'group': 'nasa', 'search': {'type': 'StacSearch'}})
        >>>
        >>> # Filter by exact provider name
        >>> list(p.name for p in providers.filter_by_name_or_group('sentinel'))
        ['sentinel']
        >>>
        >>> # Filter by group (case-insensitive)
        >>> list(p.name for p in providers.filter_by_name_or_group('ESA'))
        ['sentinel']
        >>>
        >>> # Get all providers when no filter
        >>> len(list(providers.filter_by_name_or_group()))
        3
        """
        if name_or_group is None:
            yield from self.data.values()
            return

        name_or_group_lower = name_or_group.lower()
        for provider in self.data.values():
            if provider.name.lower() == name_or_group_lower or (
                provider.group and provider.group.lower() == name_or_group_lower
            ):
                yield provider

    def delete_collection(self, provider: str, collection: str) -> None:
        """
        Delete a collection from a provider.

        :param provider: The provider's name.
        :param collection: The collection to delete.
        :raises UnsupportedProvider: If the provider is not found.
        :raises UnsupportedCollection: If the collection is not found for this provider.
        """
        if provider_obj := self.get(provider):
            if collection in provider_obj.collections_config:
                provider_obj.delete_collection(collection)
            else:
                msg = f"Collection '{collection}' not found for provider '{provider}'."
                raise UnsupportedCollection(msg)
        else:
            msg = f"Provider '{provider}' not found."
            raise UnsupportedProvider(msg)

    def _share_credentials(self) -> None:
        """
        Share credentials between plugins with matching criteria
        across all providers in this dictionary.
        """
        auth_confs_with_creds: list[PluginConfig] = []
        for provider in self.values():
            auth_confs_with_creds.extend(provider._get_auth_confs_with_credentials())

        if not auth_confs_with_creds:
            return

        for provider in self.values():
            provider._copy_matching_credentials(auth_confs_with_creds)

    @staticmethod
    def _get_whitelisted_configs(
        configs: Mapping[str, Union[ProviderConfig, dict[str, Any]]],
    ) -> Mapping[str, Union[ProviderConfig, dict[str, Any]]]:
        """
        Filter configs according to the EODAG_PROVIDERS_WHITELIST environment variable, if set.

        The variable is a comma-separated list of provider names; whitespace around the
        names and empty entries are ignored.

        :param configs: The dictionary of provider configurations.
        :return: Filtered configurations.
        """
        raw_whitelist = os.getenv("EODAG_PROVIDERS_WHITELIST", "")
        # tolerate "a, b" style values and empty entries
        whitelist = {name.strip() for name in raw_whitelist.split(",")} - {""}
        if not whitelist:
            return configs
        return {name: conf for name, conf in configs.items() if name in whitelist}

    def update_from_configs(
        self,
        configs: Mapping[str, Union[ProviderConfig, dict[str, Any]]],
    ) -> None:
        """
        Update providers from a dictionary of configurations.

        Existing providers are updated, unknown ones are created. A provider whose
        configuration cannot be applied is skipped with a warning. Credentials are then
        shared between matching auth plugins.

        :param configs: A dictionary mapping provider names to configurations.
        :raises ValidationError: If a :class:`~eodag.api.provider.ProviderConfig` name does
                                 not match its key.
        """
        configs = self._get_whitelisted_configs(configs)
        for name, conf in configs.items():
            if isinstance(conf, dict) and conf.get("name") != name:
                if "name" in conf:
                    logger.debug(
                        "%s: config name '%s' overridden by dict key",
                        name,
                        conf["name"],
                    )
                # the dict key is authoritative; do not mutate the caller's mapping
                conf = {**conf, "name": name}
            elif isinstance(conf, ProviderConfig) and conf.name != name:
                raise ValidationError(
                    f"ProviderConfig name '{conf.name}' must match dict key '{name}'"
                )

            try:
                if name in self.data:
                    self.data[name].update_from_config(conf)
                else:
                    self.data[name] = Provider(conf)
                self.data[name].collections_fetched = False
            except Exception:
                # best effort: an invalid provider config must not prevent loading the others
                if name in self.data:
                    logger.warning(
                        "%s: skipped updating provider due to invalid config", name
                    )
                else:
                    logger.warning(
                        "%s: could not create provider from scratch using config", name
                    )
                logger.debug("Traceback:\n%s", traceback.format_exc())

        self._share_credentials()

    def update_from_config_file(self, file_path: str) -> None:
        """
        Override provider configurations with values loaded from a YAML file.

        An empty file is ignored.

        :param file_path: The path to the configuration file.
        :raises yaml.parser.ParserError: If the YAML file cannot be parsed.
        """
        logger.info("Loading user configuration from: %s", os.path.abspath(file_path))
        resolved_path = os.path.abspath(os.path.realpath(file_path))
        # explicit encoding: do not depend on the platform default to read YAML
        with open(resolved_path, "r", encoding="utf-8") as fh:
            try:
                config_in_file = yaml.safe_load(fh)
                if config_in_file is None:
                    return
            except yaml.parser.ParserError:
                logger.error("Unable to load configuration file %s", file_path)
                raise

        self.update_from_configs(config_in_file)

    def update_from_env(self) -> None:
        """
        Override provider configurations with environment variables values.

        Environment variables must start with ``EODAG__`` and follow a nested key
        pattern separated by double underscores ``__``.
        """

        def build_mapping_from_env(
            env_var: str, env_value: str, mapping: dict[str, Any]
        ) -> None:
            """
            Recursively build a dictionary from an environment variable.

            The environment variable must respect the pattern: ``KEY1__KEY2__[...]__KEYN``.
            It will be transformed into a nested dictionary.

            :param env_var: The environment variable key (nested keys separated by ``__``).
            :param env_value: The value from environment variable.
            :param mapping: The dictionary where the nested mapping is built.
            """
            parts = env_var.split("__")
            iter_parts = iter(parts)
            # expected types of the first key and, when available, of its child key,
            # looked up in the PluginConfig type hints
            env_type = get_type_hints(PluginConfig).get(next(iter_parts, ""), str)
            child_env_type = (
                get_type_hints(env_type).get(next(iter_parts, ""))
                if isclass(env_type)
                else None
            )

            if len(parts) == 2 and child_env_type:
                try:
                    env_value = cast_scalar_value(env_value, child_env_type)
                except TypeError:
                    logger.warning(
                        f"Could not convert {parts} value {env_value} to {child_env_type}"
                    )
                mapping.setdefault(parts[0], {})
                mapping[parts[0]][parts[1]] = env_value
            elif len(parts) == 1:
                try:
                    env_value = cast_scalar_value(env_value, env_type)
                except TypeError:
                    logger.warning(
                        f"Could not convert {parts[0]} value {env_value} to {env_type}"
                    )
                mapping[parts[0]] = env_value
            else:
                new_map = mapping.setdefault(parts[0], {})
                build_mapping_from_env("__".join(parts[1:]), env_value, new_map)

        logger.debug("Loading configuration from environment variables")

        mapping_from_env: dict[str, dict[str, Any]] = {}
        for env_var in os.environ:
            if env_var.startswith("EODAG__"):
                build_mapping_from_env(
                    env_var[len("EODAG__") :].lower(),  # noqa
                    os.environ[env_var],
                    mapping_from_env,
                )

        self.update_from_configs(mapping_from_env)

    @classmethod
    def from_configs(
        cls, configs: Mapping[str, Union[ProviderConfig, dict[str, Any]]]
    ) -> Self:
        """
        Build a ProvidersDict from a configuration mapping.

        :param configs: A dictionary mapping provider names to configuration dicts or
                        :class:`~eodag.api.provider.ProviderConfig` instances.
        :return: An instance of :class:`~eodag.api.provider.ProvidersDict` populated with the given configurations.
        """
        providers = cls()
        providers.update_from_configs(configs)
        return providers