Source code for eodag.api.core

# -*- coding: utf-8 -*-
# Copyright 2018, CS GROUP - France, https://www.csgroup.eu/
#
# This file is part of EODAG project
#     https://www.github.com/CS-SI/EODAG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations

import datetime as dt
import itertools
import logging
import os
import re
import shutil
import tempfile
import warnings
from collections import deque
from copy import deepcopy
from importlib.metadata import version
from importlib.resources import files as res_files
from operator import attrgetter, itemgetter
from pathlib import Path
from typing import TYPE_CHECKING, Any, Iterator, Optional, Union, cast

import geojson
import yaml
from pydantic import AliasChoices

from eodag.api.collection import Collection, CollectionsDict, CollectionsList
from eodag.api.product import EOProduct
from eodag.api.product.metadata_mapping import mtd_cfg_as_conversion_and_querypath
from eodag.api.provider import Provider, ProvidersDict
from eodag.api.search_result import SearchResult
from eodag.config import (
    PLUGINS_TOPICS_KEYS,
    PluginConfig,
    SimpleYamlProxyConfig,
    credentials_in_auth,
    get_ext_collections_conf,
    load_default_config,
    load_yml_config,
)
from eodag.plugins.manager import PluginManager
from eodag.plugins.search import PreparedSearch
from eodag.plugins.search.build_search_result import MeteoblueSearch
from eodag.plugins.search.qssearch import PostJsonSearch
from eodag.types import model_fields_to_annotated
from eodag.types.queryables import CommonQueryables, Queryables, QueryablesDict
from eodag.utils import (
    DEFAULT_DOWNLOAD_TIMEOUT,
    DEFAULT_DOWNLOAD_WAIT,
    DEFAULT_LIMIT,
    DEFAULT_MAX_LIMIT,
    DEFAULT_PAGE,
    GENERIC_COLLECTION,
    GENERIC_STAC_PROVIDER,
    _deprecated,
    get_geometry_from_various,
    makedirs,
    sort_dict,
    string_to_jsonpath,
    uri_to_path,
)
from eodag.utils.dates import get_datetime, rfc3339_str_to_datetime
from eodag.utils.env import is_env_var_true
from eodag.utils.exceptions import (
    AuthenticationError,
    NoMatchingCollection,
    PluginImplementationError,
    RequestError,
    UnsupportedProvider,
    ValidationError,
)
from eodag.utils.free_text_search import compile_free_text_query
from eodag.utils.stac_reader import fetch_stac_items

if TYPE_CHECKING:
    from concurrent.futures import ThreadPoolExecutor
    from shapely.geometry.base import BaseGeometry

    from eodag.plugins.apis.base import Api
    from eodag.plugins.crunch.base import Crunch
    from eodag.plugins.search.base import Search
    from eodag.types import ProviderSortables
    from eodag.types.download_args import DownloadConf
    from eodag.utils import DownloadedCallback, ProgressCallback, Unpack

logger = logging.getLogger("eodag.core")


class EODataAccessGateway:
    """Main entry point of eodag: search and download geospatial products
    served by many different kinds of providers.

    :param user_conf_file_path: (optional) Path to the user configuration file
    :param locations_conf_path: (optional) Path to the locations configuration file
    """

    def __init__(
        self,
        user_conf_file_path: Optional[str] = None,
        locations_conf_path: Optional[str] = None,
    ) -> None:
        # Collections catalog: the file named by EODAG_COLLECTIONS_CFG_FILE,
        # otherwise the one bundled with the package.
        collections_cfg_file = os.getenv("EODAG_COLLECTIONS_CFG_FILE") or str(
            res_files("eodag") / "resources" / "collections.yml"
        )
        self.collections_config = self._collections_config_init(
            SimpleYamlProxyConfig(collections_cfg_file).source
        )

        self._providers = ProvidersDict.from_configs(load_default_config())

        # Configuration directory: EODAG_CFG_DIR, otherwise ~/.config/eodag
        cfg_dir_env_var = "EODAG_CFG_DIR"
        home_cfg_dir = os.path.join(os.path.expanduser("~"), ".config", "eodag")
        self.conf_dir = os.getenv(cfg_dir_env_var, default=home_cfg_dir)
        try:
            makedirs(self.conf_dir)
        except OSError as err:
            logger.debug(err)
            fallback_dir = os.path.join(tempfile.gettempdir(), ".config", "eodag")
            logger.warning(
                f"Cannot create configuration directory {self.conf_dir}. "
                f"Falling back to temporary directory {fallback_dir}."
            )
            if os.getenv(cfg_dir_env_var) is None:
                logger.warning(
                    "You can set the path of the configuration directory "
                    f"with the environment variable {cfg_dir_env_var}"
                )
            self.conf_dir = fallback_dir
            makedirs(self.conf_dir)

        self._plugins_manager = PluginManager(self._providers)
        self._providers = self._plugins_manager.providers

        # First level override: a user configuration file
        if user_conf_file_path is None:
            default_user_conf = os.path.join(self.conf_dir, "eodag.yml")
            user_conf_file_path = os.getenv("EODAG_CFG_FILE")
            if user_conf_file_path is None:
                user_conf_file_path = default_user_conf
                template_file = str(
                    res_files("eodag") / "resources" / "user_conf_template.yml"
                )
                # seed the user configuration from the packaged template once
                if os.path.isfile(template_file) and not os.path.isfile(
                    default_user_conf
                ):
                    shutil.copy(template_file, default_user_conf)
        self._providers.update_from_config_file(user_conf_file_path)

        # Second level override: environment variables
        self._providers.update_from_env()

        # let every provider align its collections with the loaded catalog
        strict = is_env_var_true("EODAG_STRICT_COLLECTIONS")
        for prov in self._providers.values():
            prov.sync_collections(self, strict)

        # rebuild the plugins manager from the now up-to-date providers
        self._plugins_manager.rebuild(self._providers)

        # configs of providers pruned below, kept so they can be restored later
        self._pruned_providers_config: dict[str, Any] = {}
        self._prune_providers_list()

        # priorities may have changed through the overrides: sort again
        self._plugins_manager.sort_providers()

        self.set_locations_conf(locations_conf_path)

    def _collections_config_init(
        self, collections_config_dict: dict[str, Any]
    ) -> CollectionsDict:
        """Build the collections catalog from its raw mapping.

        :param collections_config_dict: The collections config as a dictionary,
            mapping each collection id to its fields
        :returns: a :class:`CollectionsDict` holding one
            :class:`Collection` per entry
        """
        return CollectionsDict(
            [
                Collection.create_with_dag(self, id=coll_id, **coll_fields)
                for coll_id, coll_fields in collections_config_dict.items()
            ]
        )

    @property
    def providers(self) -> ProvidersDict:
        """A deep copy of the configured providers, ordered by priority
        (descending) and then by name (ascending)."""
        snapshot = deepcopy(self._providers)

        def _ordering(entry: tuple[str, Any]) -> tuple[Any, str]:
            name, prov = entry
            return -prov.priority, name

        snapshot.data = dict(sorted(snapshot.data.items(), key=_ordering))
        return snapshot

    def get_version(self) -> str:
        """Return the version of the installed eodag package."""
        return version("eodag")
def set_preferred_provider(self, provider: str) -> None:
    """Set max priority for the given provider.

    :param provider: The name of the provider that should be considered as the
                     preferred provider to be used for this instance
    :raises: :class:`~eodag.utils.exceptions.UnsupportedProvider` if
             ``provider`` is not a configured provider
    """
    # Check membership on the internal providers dict: the public ``providers``
    # property deep-copies and sorts every provider, wasted work for a name test.
    if provider not in self._providers.names:
        raise UnsupportedProvider(
            f"This provider is not recognised by eodag: {provider}"
        )
    preferred_provider, max_priority = self.get_preferred_provider()
    if preferred_provider != provider:
        # one above the current maximum makes it the single top provider
        self._plugins_manager.set_priority(provider, max_priority + 1)
def get_preferred_provider(self) -> tuple[str, int]:
    """Get the provider currently set as the preferred one for searching
    products, along with its priority.

    When several providers share the highest priority, the alphabetically
    first one is returned. This matches the ordering of the ``providers``
    property (priority descending, then name ascending) instead of depending
    on the internal dict order.

    :returns: The provider with the maximum priority and its priority
    """
    return min(
        self._providers.priorities.items(),
        key=lambda name_priority: (-name_priority[1], name_priority[0]),
    )
def update_providers_config(
    self,
    yaml_conf: Optional[str] = None,
    dict_conf: Optional[dict[str, Any]] = None,
) -> None:
    """Update providers configuration with given input.

    Can be used to add a provider to existing configuration or update an
    existing one. Providers previously pruned (see ``_prune_providers_list``)
    that appear in the update are restored first. An update carrying nothing
    (no argument, an empty mapping, or a YAML document without content) is a
    no-op.

    :param yaml_conf: YAML formated provider configuration
    :param dict_conf: provider configuration as dictionary in place of ``yaml_conf``
    """
    if dict_conf is not None:
        conf_update = dict_conf
    elif yaml_conf is not None:
        conf_update = yaml.safe_load(yaml_conf)
    else:
        return None
    # ``yaml.safe_load`` returns None for an empty document: nothing to apply,
    # and the membership tests / update below would fail on it.
    if not conf_update:
        return None

    # restore the pruned configuration of providers targeted by this update
    for name in list(self._pruned_providers_config):
        if name not in conf_update:
            continue
        logger.info("%s: provider restored from the pruned configurations", name)
        self._providers[name] = Provider(self._pruned_providers_config[name])
        del self._pruned_providers_config[name]

    self._providers.update_from_configs(conf_update)
    # refresh the collection -> provider config map of the plugins manager
    self._plugins_manager.build_collection_to_provider_config_map()
def add_provider(
    self,
    name: str,
    url: Optional[str] = None,
    priority: Optional[int] = None,
    search: dict[str, Any] = {"type": "StacSearch"},
    products: dict[str, Any] = {
        GENERIC_COLLECTION: {"_collection": "{collection}"}
    },
    download: dict[str, Any] = {"type": "HTTPDownload", "auth_error_code": 401},
    **kwargs: Any,
):
    """Adds a new provider.

    ``search``, ``products`` & ``download`` already have default values that will be
    updated (not replaced), with user provided ones:

    * ``search`` : ``{"type": "StacSearch"}``
    * ``products`` : ``{"GENERIC_COLLECTION": {"_collection": "{collection}"}}``
    * ``download`` : ``{"type": "HTTPDownload", "auth_error_code": 401}``

    The given mappings are deep-copied into the new configuration, so neither
    the caller's dicts nor the shared default values can be modified through it.

    :param name: Name of provider
    :param url: Provider url, also used as ``search["api_endpoint"]`` if not defined
    :param priority: Provider priority. If None, provider will be set as preferred (highest priority)
    :param search: Search :class:`~eodag.config.PluginConfig` mapping
    :param products: Provider collections mapping
    :param download: Download :class:`~eodag.config.PluginConfig` mapping
    :param kwargs: Additional :class:`~eodag.api.provider.ProviderConfig` mapping
    """
    # The dict defaults are created once and shared by every call; ``**`` only
    # copies the top level, so nested dicts (e.g. the GENERIC_COLLECTION entry)
    # would otherwise be shared with the config passed downstream.
    conf_dict: dict[str, Any] = {
        name: {
            "name": name,
            "url": url,
            "search": {"type": "StacSearch", **deepcopy(search)},
            "products": {
                GENERIC_COLLECTION: {"_collection": "{collection}"},
                **deepcopy(products),
            },
            "download": {
                "type": "HTTPDownload",
                "auth_error_code": 401,
                **deepcopy(download),
            },
            **kwargs,
        }
    }
    provider_conf = conf_dict[name]
    if priority is not None:
        provider_conf["priority"] = priority
    # if provided, use url as default search api_endpoint
    if url and not provider_conf["search"].get("api_endpoint"):
        provider_conf["search"]["api_endpoint"] = url
    # api plugin usage: remove unneeded search/download/auth plugin conf
    if provider_conf.get("api"):
        for topic in PLUGINS_TOPICS_KEYS:
            if topic != "api":
                provider_conf.pop(topic, None)

    self.update_providers_config(dict_conf=conf_dict)

    if priority is None:
        self.set_preferred_provider(name)
def _prune_providers_list(self) -> None:
    """Removes from config providers that cannot be used.

    Providers relying on a plugin listed in the plugins manager
    ``skipped_plugins`` are deleted. The following providers are instead
    moved to ``self._pruned_providers_config``, from where
    ``update_providers_config`` can restore them:

    * providers whose ``api`` or ``search`` plugin needs auth and that have no
      credentials;
    * providers that need auth for search but have no auth plugin;
    * providers with neither an ``api`` nor a ``search`` plugin.

    The plugins manager is rebuilt whenever at least one provider was removed.
    """
    no_credentials = (
        "needing auth for search has been pruned because no credentials "
        "could be found"
    )
    no_auth_plugin = (
        "needing auth for search has been pruned because no auth plugin "
        "could be found"
    )
    no_plugin = "has been pruned because no api or search plugin could be found"

    update_needed = False
    skipped_plugins = self._plugins_manager.skipped_plugins
    # loop over a copy to allow removing items
    for provider in list(self._providers.values()):
        conf = provider.config

        # remove providers using skipped plugins
        if any(
            isinstance(plugin_conf, PluginConfig)
            and getattr(plugin_conf, "type", None) in skipped_plugins
            for plugin_conf in conf.__dict__.values()
        ):
            del self._providers[provider.name]
            # the plugins manager still references this provider: rebuild it
            update_needed = True
            logger.debug(
                f"{provider}: provider needing unavailable plugin has been removed"
            )
            continue

        prune_reason: Optional[str] = None
        if hasattr(conf, "api") and getattr(conf.api, "need_auth", False):
            if not credentials_in_auth(conf.api):
                prune_reason = no_credentials
        elif hasattr(conf, "search") and getattr(conf.search, "need_auth", False):
            if not hasattr(conf, "auth") and not hasattr(conf, "search_auth"):
                prune_reason = no_auth_plugin
            else:
                # search_auth takes precedence over auth when both exist
                credentials_exist = (
                    hasattr(conf, "search_auth")
                    and credentials_in_auth(conf.search_auth)
                ) or (
                    not hasattr(conf, "search_auth")
                    and hasattr(conf, "auth")
                    and credentials_in_auth(conf.auth)
                )
                if not credentials_exist:
                    prune_reason = no_credentials
        elif not hasattr(conf, "api") and not hasattr(conf, "search"):
            # provider should have at least an api or search plugin
            prune_reason = no_plugin

        if prune_reason is not None:
            self._pruned_providers_config[provider.name] = conf
            del self._providers[provider.name]
            update_needed = True
            logger.info("%s: provider %s", provider, prune_reason)

    if update_needed:
        # rebuild _plugins_manager with updated providers list
        self._plugins_manager.rebuild(self._providers)


def set_locations_conf(self, locations_conf_path: Optional[str]) -> None:
    """Set locations configuration.

    This configuration (YML format) will contain a shapefile list associated
    to a name and attribute parameters needed to identify the needed geometry.
    You can also configure parent attributes, which can be used for creating
    a catalogs path when using eodag as a REST server.
    Example of locations configuration file content:

    .. code-block:: yaml

        shapefiles:
            - name: country
              path: /path/to/countries_list.shp
              attr: ISO3
            - name: department
              path: /path/to/FR_departments.shp
              attr: code_insee
              parent:
                name: country
                attr: FRA

    When no path is given and ``EODAG_LOCS_CFG_FILE`` is unset,
    ``<conf_dir>/locations.yml`` is used. It is created from the packaged
    template, together with the sample shapefiles in ``<conf_dir>/shp``, if it
    does not exist yet. A missing or empty file results in an empty
    ``locations_config``.

    :param locations_conf_path: Path to the locations configuration file
    """
    if locations_conf_path is None:
        locations_conf_path = os.getenv("EODAG_LOCS_CFG_FILE")
        if locations_conf_path is None:
            locations_conf_path = os.path.join(self.conf_dir, "locations.yml")
            if not os.path.isfile(locations_conf_path):
                # Ensure the directory exists
                os.makedirs(os.path.dirname(locations_conf_path), exist_ok=True)
                shp_dir = os.path.join(self.conf_dir, "shp")
                # copy locations conf file and replace path example
                locations_conf_template = str(
                    res_files("eodag") / "resources" / "locations_conf_template.yml"
                )
                with (
                    open(locations_conf_template) as infile,
                    open(locations_conf_path, "w") as outfile,
                ):
                    # The template contains paths in the form of:
                    # /path/to/locations/file.shp
                    path_template = "/path/to/locations/"
                    for line in infile:
                        outfile.write(
                            line.replace(path_template, shp_dir + os.path.sep)
                        )
                # copy sample shapefile dir; a leftover shp dir (locations.yml
                # deleted) must not make the initialization fail
                shutil.copytree(
                    str(res_files("eodag") / "resources" / "shp"),
                    shp_dir,
                    dirs_exist_ok=True,
                )

    locations_config = (
        load_yml_config(locations_conf_path)
        if os.path.isfile(locations_conf_path)
        else None
    )
    if not locations_config:
        # missing or empty file: next(iter(...)) below would fail on it
        logger.info(
            "Could not load locations configuration from %s", locations_conf_path
        )
        self.locations_config: list[dict[str, Any]] = []
        return

    main_key = next(iter(locations_config))
    self.locations_config = locations_config[main_key]
    logger.info("Locations configuration loaded from %s", locations_conf_path)
def list_collections(
    self, provider: Optional[str] = None, fetch_providers: bool = True
) -> CollectionsList:
    """Return the collections reachable through eodag, sorted by id.

    :param provider: (optional) The name of a provider (or provider group) that
                     must support the collections we are about to list
    :param fetch_providers: (optional) Whether to fetch providers for new
                            collections first or not
    :returns: The list of the collections that can be accessed using eodag.
    :raises: :class:`~eodag.utils.exceptions.UnsupportedProvider`
    """
    if fetch_providers:
        # refresh the collections list first, when possible
        self.fetch_collections_list(provider=provider)

    matching_providers = list(self._providers.filter_by_name_or_group(provider))
    if provider and not any(matching_providers):
        raise UnsupportedProvider(
            f"The requested provider is not (yet) supported: {provider}"
        )

    # distinct collection ids configured on the matching providers
    seen_ids: set[str] = set()
    for prov in matching_providers:
        seen_ids.update(
            coll_id
            for coll_id in prov.collections_config
            if coll_id != GENERIC_COLLECTION
        )

    result = CollectionsList([self.collections_config[coll_id] for coll_id in seen_ids])
    result.sort(key=attrgetter("id"))
    return result
def fetch_collections_list(self, provider: Optional[str] = None) -> None:
    """Fetch collections list and update if needed.

    If strict mode is enabled (by setting the ``EODAG_STRICT_COLLECTIONS``
    environment variable to a truthy value), this method will not fetch or
    update collections and will return immediately. It also returns
    immediately when every fetchable provider has already been fetched.

    :param provider: The name of a provider or provider-group for which
                     collections list should be updated. Defaults to all
                     providers (None value).
    """
    if is_env_var_true("EODAG_STRICT_COLLECTIONS"):
        return

    def _parsed_discovery_conf(
        conf: PluginConfig.DiscoverCollections,
    ) -> PluginConfig.DiscoverCollections:
        """Return ``conf`` with its ``results_entry`` (if a string, for json
        results) and ``generic_collection_*`` entries converted with
        ``string_to_jsonpath`` / ``mtd_cfg_as_conversion_and_querypath``, so it
        can be compared with a user conf presumably already parsed that way."""
        if not (
            conf.get("result_type") == "json"
            and isinstance(conf["results_entry"], str)
        ):
            return conf
        return cast(
            PluginConfig.DiscoverCollections,
            {
                **conf,
                "results_entry": string_to_jsonpath(
                    conf["results_entry"], force=True
                ),
                **mtd_cfg_as_conversion_and_querypath(
                    dict(generic_collection_id=conf["generic_collection_id"])
                ),
                "generic_collection_parsable_properties": (
                    mtd_cfg_as_conversion_and_querypath(
                        conf["generic_collection_parsable_properties"]
                    )
                ),
                "generic_collection_parsable_metadata": (
                    mtd_cfg_as_conversion_and_querypath(
                        conf["generic_collection_parsable_metadata"]
                    )
                ),
            },
        )

    # discovery confs of the fetchable providers, and whether one of them has
    # not been fetched yet
    fetchable_discovery_confs: dict[str, PluginConfig.DiscoverCollections] = {}
    already_fetched = True
    for candidate in self._providers.filter_by_name_or_group(provider):
        if candidate.fetchable and candidate.search_config:
            fetchable_discovery_confs[
                candidate.name
            ] = candidate.search_config.discover_collections
            if not candidate.collections_fetched:
                already_fetched = False

    # Every step below only acts when something is left to fetch.
    if already_fetched:
        return

    # get ext_collections conf
    ext_collections_cfg_file = os.getenv("EODAG_EXT_COLLECTIONS_CFG_FILE")
    if ext_collections_cfg_file is not None:
        ext_collections_conf = get_ext_collections_conf(ext_collections_cfg_file)
    else:
        ext_collections_conf = get_ext_collections_conf()
        if not ext_collections_conf:
            # empty ext_collections conf
            ext_collections_conf = self.discover_collections(provider=provider) or {}
    # update eodag collections list with new conf
    self.update_collections_list(ext_collections_conf)

    # Compare each fetchable provider with its default configuration: providers
    # modified by the user, unknown to the default configuration, or missing
    # from ext_collections_conf are discovered individually.
    default_providers = ProvidersDict.from_configs(load_default_config())
    for provider_name, user_discovery_conf in fetchable_discovery_confs.items():
        if provider_name in default_providers:
            default_provider = default_providers[provider_name]
            if not default_provider.search_config:
                continue
            default_discovery_conf = default_provider.search_config.discover_collections
            default_discovery_conf_parsed = _parsed_discovery_conf(
                default_discovery_conf
            )
            if (
                user_discovery_conf == default_discovery_conf
                or user_discovery_conf == default_discovery_conf_parsed
            ) and (
                not default_discovery_conf.get("fetch_url")
                or provider_name in ext_collections_conf
                or not ext_collections_conf
            ):
                continue
        # discover collections for user configured provider
        self.update_collections_list(
            self.discover_collections(provider=provider_name) or {}
        )
def discover_collections(
    self, provider: Optional[str] = None
) -> Optional[dict[str, Any]]:
    """Fetch providers for collections

    Providers without a search (or api) plugin config, not fetchable, or whose
    initialized search plugin has no ``discover_collections.fetch_url`` are
    skipped. Providers needing auth that cannot be authenticated get a
    ``None`` entry.

    :param provider: The name of a provider or provider-group to fetch. Defaults to
                     all providers (None value).
    :returns: external collections configuration, keyed by provider name
    :raises: :class:`~eodag.utils.exceptions.UnsupportedProvider` if
             ``provider`` matches no configured provider or group
    """
    providers_iter, providers_check = itertools.tee(
        self.providers.filter_by_name_or_group(provider)
    )
    if provider and not any(providers_check):
        raise UnsupportedProvider(
            f"The requested provider is not (yet) supported: {provider}"
        )

    ext_collections_conf: dict[str, Any] = {}
    for p in providers_iter:
        # skip (instead of aborting the whole discovery) providers with nothing
        # to discover
        if not p.search_config or not p.fetchable:
            continue
        search_plugin: Union[Search, Api] = next(
            self._plugins_manager.get_search_plugins(provider=p.name)
        )
        # check after plugin init if still fetchable
        if not getattr(search_plugin.config, "discover_collections", {}).get(
            "fetch_url"
        ):
            continue
        # built per provider: one provider's auth plugin must never be passed
        # to another provider's discovery request
        kwargs: dict[str, Any] = {}
        if getattr(search_plugin.config, "need_auth", False):
            auth = self._plugins_manager.get_auth(
                search_plugin.provider,
                getattr(search_plugin.config, "api_endpoint", None),
                search_plugin.config,
            )
            if not auth:
                logger.debug(
                    f"Could not authenticate on {p} for collections discovery"
                )
                ext_collections_conf[p.name] = None
                continue
            kwargs["auth"] = auth
        ext_collections_conf[p.name] = search_plugin.discover_collections(**kwargs)

    return sort_dict(ext_collections_conf)
def update_collections_list(
    self, ext_collections_conf: dict[str, Optional[dict[str, dict[str, Any]]]]
) -> None:
    """Update eodag collections list

    For each known, still fetchable provider of ``ext_collections_conf``, every
    discovered collection that is neither already configured on the provider
    nor a subset of one of its existing collection configs is added to
    ``self.collections_config`` and to the provider collections config. Known
    providers are then flagged as fetched, except those skipped because they
    are no longer fetchable. Finally the collection-to-provider map is rebuilt.

    :param ext_collections_conf: external collections configuration, keyed by
        provider name; each non-empty value holds a ``providers_config`` and a
        ``collections_config`` mapping, both keyed by collection id
    """
    for provider, new_collections_conf in ext_collections_conf.items():
        if provider not in self._providers:
            # unknown provider
            continue

        if new_collections_conf:
            try:
                if not self._providers[provider].fetchable:
                    # conf has been updated and provider collections are no more discoverable
                    continue
                provider_products_config = self._providers[provider].collections_config
            except UnsupportedProvider:
                logger.debug(
                    "Ignoring external collections for unknown provider %s",
                    provider,
                )
                continue
            if provider_products_config is None:
                # only fall back to a fresh mapping when there is none: an
                # existing (even empty) one must receive the added collections
                provider_products_config = {}

            # loop invariant: keys left out of the subset comparison below
            unparsable_keys = self._providers[provider].unparsable_properties
            new_collections: list[str] = []
            bad_formatted_col_count = 0

            for new_collection, new_collection_conf in new_collections_conf[
                "providers_config"
            ].items():
                if new_collection in provider_products_config:
                    continue
                # compare parsed extracted conf (without unparsable entries)
                new_parsed_collection_conf = {
                    k: v
                    for k, v in new_collection_conf.items()
                    if k not in unparsable_keys
                }
                if any(
                    new_parsed_collection_conf.items() <= existing_conf.items()
                    for existing_conf in provider_products_config.values()
                ):
                    # new_collection_conf is a subset of an existing conf
                    continue

                collection_fields = new_collections_conf["collections_config"][
                    new_collection
                ]
                try:
                    # new_collection_conf does not already exist, append it
                    # to self.collections_config
                    new_coll_obj = Collection.create_with_dag(
                        self, id=new_collection, **collection_fields
                    )
                    self.collections_config[new_coll_obj._id] = new_coll_obj
                except ValidationError:
                    # skip collection if there is a problem with its id (missing or not a string)
                    logger.debug(
                        (
                            "Collection %s has been pruned on provider %s "
                            "because its id was incorrectly parsed for eodag"
                        ),
                        new_collection,
                        provider,
                    )
                    continue

                provider_products_config[new_collection] = new_collection_conf
                new_collections.append(new_collection)
                # count collections whose external attributes were not all
                # kept by the Collection model
                dumped_collection = self.collections_config[
                    new_coll_obj._id
                ].model_dump()
                if {**dumped_collection, **collection_fields} != dumped_collection:
                    bad_formatted_col_count += 1

            if new_collections:
                logger.debug(
                    "Added %s collections for %s", len(new_collections), provider
                )
            if bad_formatted_col_count > 0:
                logger.debug(
                    "bad formatted attributes skipped for %s collection(s) on %s",
                    bad_formatted_col_count,
                    provider,
                )

        self._providers[provider].collections_fetched = True

    # refresh the collection -> provider config map of the plugins manager
    self._plugins_manager.build_collection_to_provider_config_map()
@_deprecated(
    reason="Please use 'EODataAccessGateway.providers' instead",
    version="4.0.0",
)
def available_providers(
    self, collection: Optional[str] = None, by_group: bool = False
) -> list[str]:
    """Gives the sorted list of the available providers or groups

    .. deprecated:: v4.0.0
        Please use :attr:`eodag.api.core.EODataAccessGateway.providers` instead.

    The providers or groups are sorted first by their priority level in descending order,
    and then alphabetically in ascending order for providers or groups with the same
    priority level. A group gets the highest priority of its providers.

    :param collection: (optional) Only list providers configured for this collection
    :param by_group: (optional) If set to True, list groups when available instead
                     of providers, mixed with other providers
    :returns: the sorted list of the available providers or groups
    """
    best_priority: dict[str, int] = {}
    for key, provider in self.providers.items():
        if collection and collection not in provider.collections_config:
            continue
        group = getattr(provider.config, "group", None)
        name = group if by_group and group else key
        if name not in best_priority or provider.priority > best_priority[name]:
            best_priority[name] = provider.priority

    # Sort on the displayed names: once providers are replaced by their group
    # name, the order inherited from provider names no longer matches it.
    return sorted(best_priority, key=lambda name: (-best_priority[name], name))
def get_collection_from_alias(self, alias_or_id: str) -> str:
    """Return the internal id of a collection given either its alias or its id.

    Configured collections whose public ``id`` equals ``alias_or_id`` are
    searched first. When none matches, ``alias_or_id`` is returned unchanged
    if it is already a key of ``collections_config``.

    :param alias_or_id: Alias of the collection. If an existing id is given,
                        this method will directly return the given value.
    :returns: Internal name of the collection (its ``_id``, or its ``id`` when
              ``_id`` is empty).
    :raises: :class:`~eodag.utils.exceptions.NoMatchingCollection` when several
             collections share this id, or when nothing matches at all.
    """
    # Only the configured collection objects are needed, not their keys
    collections = [
        coll for coll in self.collections_config.values() if coll.id == alias_or_id
    ]
    if len(collections) > 1:
        raise NoMatchingCollection(
            f"Too many matching collections for alias {alias_or_id}: {collections}"
        )
    if not collections:
        if alias_or_id in self.collections_config:
            return alias_or_id
        raise NoMatchingCollection(
            f"Could not find collection from alias or id {alias_or_id}"
        )
    return collections[0]._id or collections[0].id


def get_alias_from_collection(self, collection: str) -> str:
    """Return the alias of a collection by its id.

    If no alias was defined for the given collection, its id is returned instead.

    :param collection: collection id
    :returns: Alias of the collection or its id if no alias has been defined for it.
    :raises: :class:`~eodag.utils.exceptions.NoMatchingCollection` if
             ``collection`` is not a key of ``collections_config``.
    """
    if collection not in self.collections_config:
        raise NoMatchingCollection(collection)

    # An empty/None alias falls back to the id itself
    if alias := self.collections_config[collection].alias:
        return alias
    return collection
def guess_collection(
    self,
    free_text: Optional[str] = None,
    intersect: bool = False,
    instruments: Optional[str] = None,
    platform: Optional[str] = None,
    constellation: Optional[str] = None,
    processing_level: Optional[str] = None,
    sensor_type: Optional[str] = None,
    keywords: Optional[str] = None,
    description: Optional[str] = None,
    title: Optional[str] = None,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    **kwargs: Any,
) -> CollectionsList:
    """
    Find EODAG collection IDs that best match a set of search parameters.

    When using several filters, collections that match most of them will be returned
    at first.

    If a ``collection`` keyword argument is given, the search is short-circuited: the
    configured collection with that id (or alias) is returned, or a new collection
    created with that id when none is configured.

    :param free_text: Free text search filter used to search across all the following
                      parameters. Handles logical operators with parenthesis (``AND``/``OR``/``NOT``),
                      quoted phrases (``"exact phrase"``), ``*`` and ``?`` wildcards.
    :param intersect: Join results for each parameter using INTERSECT instead of UNION.
    :param instruments: Instruments parameter.
    :param platform: Platform parameter.
    :param constellation: Constellation parameter.
    :param processing_level: Processing level parameter.
    :param sensor_type: Sensor type parameter.
    :param keywords: Keywords parameter.
    :param description: description parameter.
    :param title: Title parameter.
    :param start_date: start date for datetime filtering. Not used by free_text
    :param end_date: end date for datetime filtering. Not used by free_text
    :param kwargs: Extra parameters; only ``collection`` is used.
    :returns: The best match for the given parameters, highest score first.
    :raises: :class:`~eodag.utils.exceptions.NoMatchingCollection`
    """
    if collection := kwargs.get("collection"):
        if collection in self.collections_config:
            return CollectionsList([self.collections_config[collection]])
        try:
            collection = self.get_collection_from_alias(collection)
            return CollectionsList([self.collections_config[collection]])
        except NoMatchingCollection:
            return CollectionsList([Collection.create_with_dag(self, id=collection)])

    filters: dict[str, str] = {
        k: v
        for k, v in {
            "instruments": instruments,
            "constellation": constellation,
            "platform": platform,
            "processing:level": processing_level,
            "eodag:sensor_type": sensor_type,
            "keywords": keywords,
            "description": description,
            "title": title,
        }.items()
        if v is not None
    }

    # dates alone are enough to select collections when no other filter is given
    only_dates = bool(not free_text and not filters and (start_date or end_date))

    free_text_evaluator = (
        compile_free_text_query(free_text) if free_text else lambda _: True
    )

    # Everything below is independent of the collection being examined: compile
    # each filter query and resolve its collection attribute name only once.
    filters_matching_method = all if intersect else any
    filters_evaluators = {
        filter_name: compile_free_text_query(value)
        for filter_name, value in filters.items()
    }
    filters_attributes = {
        filter_name: Collection.get_collection_mtd_from_alias(filter_name)
        for filter_name in filters
    }

    # Unbounded ends are represented by aware extreme datetimes
    min_aware = dt.datetime.min.replace(tzinfo=dt.timezone.utc)
    max_aware = dt.datetime.max.replace(tzinfo=dt.timezone.utc)
    search_start = rfc3339_str_to_datetime(start_date) if start_date else min_aware
    search_end = rfc3339_str_to_datetime(end_date) if end_date else max_aware

    guesses_with_score: list[tuple[str, int]] = []

    for col, col_f in self.collections_config.items():
        if (
            col == GENERIC_COLLECTION
            or col not in self._plugins_manager.collection_to_provider_config_map
        ):
            continue

        score = 0  # how many filters matched

        # free text search
        if free_text:
            if free_text_evaluator(col_f.model_dump()):
                score += 1
            elif intersect:
                continue  # must match all filters

        # individual filters (attributes missing from the collection are skipped)
        if filters:
            filter_matches = [
                filters_evaluators[filter_name]({filter_name: col_f.__dict__[attr]})
                for filter_name, attr in filters_attributes.items()
                if attr in col_f.__dict__
            ]
            if filters_matching_method(filter_matches):
                # add number of True matches to score
                score += sum(filter_matches)
            elif intersect:
                continue  # must match all filters

        if score == 0 and not only_dates:
            continue

        # datetime filtering: keep collections whose extent overlaps the search range
        if start_date or end_date:
            col_start = col_f.extent.temporal.interval[0][0]
            if col_start and isinstance(col_start, str):
                col_start = rfc3339_str_to_datetime(col_start)
            else:
                col_start = col_start or min_aware

            col_end = col_f.extent.temporal.interval[0][1]
            if col_end and isinstance(col_end, str):
                col_end = rfc3339_str_to_datetime(col_end)
            else:
                col_end = col_end or max_aware

            if max(search_start, col_start) > min(search_end, col_end):
                continue

        guesses_with_score.append((col_f._id, score))

    if guesses_with_score:
        # sort by score descending, then collection id for stability
        guesses_with_score.sort(key=lambda x: (-x[1], x[0]))
        return CollectionsList(
            [self.collections_config[col] for col, _ in guesses_with_score]
        )

    raise NoMatchingCollection()
def search(
    self,
    page: int = DEFAULT_PAGE,
    limit: Optional[int] = DEFAULT_LIMIT,
    items_per_page: Optional[int] = DEFAULT_LIMIT,
    raise_errors: bool = False,
    start: Optional[str] = None,
    end: Optional[str] = None,
    geom: Optional[Union[str, dict[str, float], BaseGeometry]] = None,
    locations: Optional[dict[str, str]] = None,
    provider: Optional[str] = None,
    count: bool = False,
    validate: Optional[bool] = True,
    **kwargs: Any,
) -> SearchResult:
    """Look for products matching criteria on known providers.

    The default behaviour is to look for products on the provider with the
    highest priority supporting the requested collection. These priorities
    are configurable through user configuration file or individual environment variable.
    If the request to the provider with the highest priority fails or is empty, the data
    will be requested from the provider with the next highest priority.
    If no provider returns any product, an empty
    :class:`~eodag.api.search_result.SearchResult` carrying the collected errors
    is returned.

    :param page: (optional) The page number to return (**deprecated**, use
                 :meth:`eodag.api.search_result.SearchResult.next_page` instead)
    :param limit: (optional) The number of results that must appear in one single
                  page. If ``None``, the maximum number possible will be used.
    :param items_per_page: (optional) The number of results that must appear in one single
                           page. If ``None``, the maximum number possible will be used.
                           (**deprecated**, use ``limit`` instead)
    :param raise_errors: (optional) When an error occurs when searching, if this is set to
                         True, the error is raised
    :param start: (optional) Start sensing time in ISO 8601 format (e.g. "1990-11-26",
                  "1990-11-26T14:30:10.153Z", "1990-11-26T14:30:10+02:00", ...).
                  If no time offset is given, the time is assumed to be given in UTC.
    :param end: (optional) End sensing time in ISO 8601 format (e.g. "1990-11-26",
                "1990-11-26T14:30:10.153Z", "1990-11-26T14:30:10+02:00", ...).
                If no time offset is given, the time is assumed to be given in UTC.
    :param geom: (optional) Search area that can be defined in different ways:

                 * with a Shapely geometry object:
                   :class:`shapely.geometry.base.BaseGeometry`
                 * with a bounding box (dict with keys: "lonmin", "latmin", "lonmax", "latmax"):
                   ``dict.fromkeys(["lonmin", "latmin", "lonmax", "latmax"])``
                 * with a bounding box as list of float:
                   ``[lonmin, latmin, lonmax, latmax]``
                 * with a WKT str
    :param locations: (optional) Location filtering by name using locations configuration
                      ``{"<location_name>"="<attr_regex>"}``. For example, ``{"country"="PA."}`` will use
                      the geometry of the features having the property ISO3 starting with
                      'PA' such as Panama and Pakistan in the shapefile configured with
                      name=country and attr=ISO3
    :param provider: (optional) the provider to be used. If set, search fallback will be disabled.
                     If not set, the configured preferred provider will be used at first
                     before trying others until finding results.
    :param count: (optional) Whether to run a query with a count request or not
    :param validate: (optional) Set to True to validate search parameters
                     before sending the query to the provider
    :param kwargs: Some other criteria that will be used to do the search,
                   using parameters compatible with the provider
    :returns: A set of EO products matching the criteria

    .. versionchanged:: v3.0.0b1
        ``search()`` method now returns only a single :class:`~eodag.api.search_result.SearchResult`
        instead of a 2 values tuple.

    .. note::
        The search interfaces, which are implemented as plugins, are required to
        return a list as a result of their processing. This requirement is
        enforced here.
    """
    if page != DEFAULT_PAGE:
        warnings.warn(
            "Usage of deprecated search parameter 'page' "
            "(Please use 'SearchResult.next_page()' instead)"
            " -- Deprecated since v4.0.0",
            DeprecationWarning,
            stacklevel=2,
        )
    search_plugins, search_kwargs = self._prepare_search(
        start=start,
        end=end,
        geom=geom,
        locations=locations,
        provider=provider,
        **kwargs,
    )
    if search_kwargs.get("id"):
        # Don't validate requests by ID. "id" is not queryable.
        return self._search_by_id(
            search_kwargs.pop("id"),
            provider=provider,
            raise_errors=raise_errors,
            validate=False,
            **search_kwargs,
        )
    # remove datacube query string from kwargs which was only needed for search-by-id
    search_kwargs.pop("_dc_qs", None)
    # add page parameter
    search_kwargs["page"] = page

    # use deprecated items_per_page if no limit given; this does not depend on
    # the provider, so it is resolved (and warned about) once, before the loop
    if (not limit or limit == DEFAULT_LIMIT) and (
        items_per_page and items_per_page != DEFAULT_LIMIT
    ):
        limit = items_per_page
        warnings.warn(
            "Usage of deprecated search parameter 'items_per_page' "
            "(Please use 'limit' instead)"
            " -- Deprecated since v4.0.0",
            DeprecationWarning,
            stacklevel=2,
        )

    errors: list[tuple[str, Exception]] = []
    last_index = len(search_plugins) - 1
    # Loop over available providers and return the first non-empty results
    for i, search_plugin in enumerate(search_plugins):
        search_plugin.clear()
        # None means "as many as the provider allows"
        search_kwargs["limit"] = (
            limit
            if limit is not None
            else getattr(search_plugin.config, "pagination", {}).get(
                "max_limit", DEFAULT_MAX_LIMIT
            )
        )
        search_results = self._do_search(
            search_plugin,
            count=count,
            raise_errors=raise_errors,
            validate=validate,
            **search_kwargs,
        )
        errors.extend(search_results.errors)
        if len(search_results) > 0:
            # expose errors from previously tried providers too
            search_results.errors = errors
            if count and search_results.number_matched:
                logger.info(
                    "Found %s result(s) on provider '%s'",
                    search_results.number_matched,
                    search_results[0].provider,
                )
            return search_results
        if i < last_index:
            logger.warning(
                "No result could be obtained from provider %s, "
                "we will try to get the data from another provider",
                search_plugin.provider,
            )

    # Only meaningful when a fallback actually happened. Checking the list
    # length (rather than the loop index) also works when no plugin was
    # returned, where the loop variable would be unbound.
    if len(search_plugins) > 1:
        logger.error("No result could be obtained from any available provider")
    return SearchResult([], 0, errors) if count else SearchResult([], errors=errors)
@_deprecated(
    reason="Please use 'SearchResult.next_page()' instead",
    version="4.0.0",
)
def search_iter_page(
    self,
    limit: int = DEFAULT_LIMIT,
    items_per_page: Optional[int] = DEFAULT_LIMIT,
    start: Optional[str] = None,
    end: Optional[str] = None,
    geom: Optional[Union[str, dict[str, float], BaseGeometry]] = None,
    locations: Optional[dict[str, str]] = None,
    **kwargs: Any,
) -> Iterator[SearchResult]:
    """Iterate over the pages of a products search.

    .. deprecated:: v4.0.0
        Please use :meth:`eodag.api.search_result.SearchResult.next_page` instead.

    Providers are tried in priority order. When retrieving the first page from a
    provider raises a :class:`~eodag.utils.exceptions.RequestError`, the next
    provider is tried. Once a provider returned its first page, all the following
    pages come from that provider. If the first provider returns no result, the
    iterator is empty.

    :param limit: (optional) The number of results requested per page
    :param items_per_page: (optional) The number of results requested per page
                           (**deprecated**, use ``limit`` instead)
    :param start: (optional) Start sensing time in ISO 8601 format (e.g. "1990-11-26",
                  "1990-11-26T14:30:10.153Z", "1990-11-26T14:30:10+02:00", ...).
                  If no time offset is given, the time is assumed to be given in UTC.
    :param end: (optional) End sensing time in ISO 8601 format (e.g. "1990-11-26",
                "1990-11-26T14:30:10.153Z", "1990-11-26T14:30:10+02:00", ...).
                If no time offset is given, the time is assumed to be given in UTC.
    :param geom: (optional) Search area that can be defined in different ways:

                 * with a Shapely geometry object:
                   :class:`shapely.geometry.base.BaseGeometry`
                 * with a bounding box (dict with keys: "lonmin", "latmin", "lonmax", "latmax"):
                   ``dict.fromkeys(["lonmin", "latmin", "lonmax", "latmax"])``
                 * with a bounding box as list of float:
                   ``[lonmin, latmin, lonmax, latmax]``
                 * with a WKT str
    :param locations: (optional) Location filtering by name using locations configuration
                      ``{"<location_name>"="<attr_regex>"}``. For example, ``{"country"="PA."}`` will use
                      the geometry of the features having the property ISO3 starting with
                      'PA' such as Panama and Pakistan in the shapefile configured with
                      name=country and attr=ISO3
    :param kwargs: Some other criteria that will be used to do the search,
                   using parameters compatible with the provider
    :returns: An iterator that yields page per page a set of EO products
              matching the criteria
    :raises: :class:`~eodag.utils.exceptions.RequestError` immediately if no
             provider is available, or while iterating if every provider failed
    """
    search_plugins, search_kwargs = self._prepare_search(
        start=start, end=end, geom=geom, locations=locations, **kwargs
    )
    # use deprecated items_per_page if limit is not given
    if (not limit or limit == DEFAULT_LIMIT) and (
        items_per_page and items_per_page != DEFAULT_LIMIT
    ):
        limit = items_per_page
        warnings.warn(
            "Usage of deprecated search parameter 'items_per_page' "
            "(Please use 'limit' instead)"
            " -- Deprecated since v4.0.0",
            DeprecationWarning,
            stacklevel=2,
        )
    if not search_plugins:
        raise RequestError("No result could be obtained from any available provider")

    def _iter_pages() -> Iterator[SearchResult]:
        # search_iter_page_plugin is a generator: calling it executes nothing, so
        # a provider error only surfaces when its first page is requested. The
        # first page is pulled here so that a RequestError can trigger the
        # fallback to the next provider (previously this except branch was dead).
        for i, search_plugin in enumerate(search_plugins):
            pages = self.search_iter_page_plugin(
                limit=limit,
                search_plugin=search_plugin,
                **search_kwargs,
            )
            try:
                first_page = next(pages)
            except StopIteration:
                # no result: the iterator is empty, as before (no fallback)
                return
            except RequestError:
                if i < len(search_plugins) - 1:
                    logger.warning(
                        "No result could be obtained from provider %s, "
                        "we will try to get the data from another provider",
                        search_plugin.provider,
                    )
                    continue
                logger.error(
                    "No result could be obtained from any available provider"
                )
                raise
            yield first_page
            yield from pages
            return

    return _iter_pages()
@_deprecated(
    reason="Please use 'SearchResult.next_page()' instead",
    version="4.0.0",
)
def search_iter_page_plugin(
    self,
    search_plugin: Union[Search, Api],
    limit: int = DEFAULT_LIMIT,
    items_per_page: Optional[int] = DEFAULT_LIMIT,
    **kwargs: Any,
) -> Iterator[SearchResult]:
    """Yield, page after page, the products found by one given search plugin.

    .. deprecated:: v4.0.0
        Please use :meth:`eodag.api.search_result.SearchResult.next_page` instead.

    Nothing is requested before the first page is consumed. The first page is
    fetched with errors raised; iteration stops at the first empty page.

    :param search_plugin: search plugin to be used
    :param limit: (optional) The number of results requested per page
    :param items_per_page: (optional) The number of results requested per page
                           (**deprecated**, use ``limit`` instead)
    :param kwargs: Some other criteria that will be used to do the search,
                   using parameters compatible with the provider
    :returns: An iterator that yields page per page a set of EO products
              matching the criteria
    """
    limit_is_default = not limit or limit == DEFAULT_LIMIT
    legacy_size_given = bool(items_per_page) and items_per_page != DEFAULT_LIMIT
    if limit_is_default and legacy_size_given:
        limit = items_per_page
        warnings.warn(
            "Usage of deprecated search parameter 'items_per_page' "
            "(Please use 'limit' instead)"
            " -- Deprecated since v4.0.0",
            DeprecationWarning,
            stacklevel=2,
        )

    kwargs.update(page=1, limit=limit)
    # raise_errors is forced to True below; drop any caller-supplied value
    kwargs.pop("raise_errors", None)

    try:
        first_page = self._do_search(search_plugin, raise_errors=True, **kwargs)
        first_page.raise_errors = True
    except Exception:
        logger.warning(
            "error at retrieval of data from %s, for params: %s",
            search_plugin.provider,
            str(kwargs),
        )
        raise

    if not len(first_page):
        return

    # Parameters reused by next_page(): never count again, never force a page
    if kwargs.get("count") is True:
        kwargs["count"] = False
    kwargs.pop("page", None)
    first_page.search_params = kwargs
    if first_page._dag is None:
        first_page._dag = self

    yield first_page
    # following pages, stopping (without yielding) at the first empty one
    yield from itertools.takewhile(len, first_page.next_page())
def search_all(
    self,
    limit: Optional[int] = None,
    items_per_page: Optional[int] = None,
    start: Optional[str] = None,
    end: Optional[str] = None,
    geom: Optional[Union[str, dict[str, float], BaseGeometry]] = None,
    locations: Optional[dict[str, str]] = None,
    **kwargs: Any,
) -> SearchResult:
    """Search and return all the products matching the search criteria.

    It iterates over the pages of a search query and collects all the returned
    products into a single :class:`~eodag.api.search_result.SearchResult` instance.
    The first page is obtained with :meth:`search`, which tries the providers of the
    product ordered by descending priority.

    :param limit: (optional) The number of results requested internally per
                  page. The maximum number of items that can be requested
                  at once to a provider has been configured in EODAG for
                  some of them. If limit is None and this number is
                  available for the searched provider, it is used to limit
                  the number of requests made. This should also reduce the
                  time required to collect all the products matching the
                  search criteria. If this number is not available, the
                  default maximum limit configured in EODAG is used instead.
                  limit can also be set to any arbitrary value.
    :param items_per_page: (optional) The number of results requested internally per
                           page (**deprecated**, use ``limit`` instead)
    :param start: (optional) Start sensing time in ISO 8601 format (e.g. "1990-11-26",
                  "1990-11-26T14:30:10.153Z", "1990-11-26T14:30:10+02:00", ...).
                  If no time offset is given, the time is assumed to be given in UTC.
    :param end: (optional) End sensing time in ISO 8601 format (e.g. "1990-11-26",
                "1990-11-26T14:30:10.153Z", "1990-11-26T14:30:10+02:00", ...).
                If no time offset is given, the time is assumed to be given in UTC.
    :param geom: (optional) Search area that can be defined in different ways:

                 * with a Shapely geometry object:
                   :class:`shapely.geometry.base.BaseGeometry`
                 * with a bounding box (dict with keys: "lonmin", "latmin", "lonmax", "latmax"):
                   ``dict.fromkeys(["lonmin", "latmin", "lonmax", "latmax"])``
                 * with a bounding box as list of float:
                   ``[lonmin, latmin, lonmax, latmax]``
                 * with a WKT str
    :param locations: (optional) Location filtering by name using locations configuration
                      ``{"<location_name>"="<attr_regex>"}``. For example, ``{"country"="PA."}`` will use
                      the geometry of the features having the property ISO3 starting with
                      'PA' such as Panama and Pakistan in the shapefile configured with
                      name=country and attr=ISO3
    :param kwargs: Some other criteria that will be used to do the search,
                   using parameters compatible with the provider
    :returns: A single search result holding all the collected products. When
              every page was retrieved without error, its ``number_matched`` is
              set to the number of collected products; if a page request failed,
              the products gathered so far are returned and may be incomplete.
    """
    # remove unwanted count
    kwargs.pop("count", None)

    # use deprecated items_per_page if limit is not given
    if not limit and items_per_page:
        limit = items_per_page
        warnings.warn(
            "Usage of deprecated search parameter 'items_per_page' "
            "(Please use 'limit' instead)"
            " -- Deprecated since v4.0.0",
            DeprecationWarning,
            stacklevel=2,
        )

    # First search
    search_results = self.search(
        limit=limit,
        start=start,
        end=end,
        geom=geom,
        locations=locations,
        **kwargs,
    )
    if len(search_results) == 0:
        return search_results

    try:
        search_results.raise_errors = True
        # Exhaust the page iterator only for its side effect: with update=True
        # the pages are presumably merged into search_results (its length is
        # used as the total below). maxlen=0 discards each page instead of
        # keeping all of them alive in the deque.
        deque(search_results.next_page(update=True), maxlen=0)
        logger.info(
            "Found %s result(s) on provider '%s'",
            len(search_results),
            search_results[0].provider,
        )
        search_results.number_matched = len(search_results)
    except RequestError:
        logger.warning(
            "Found %s result(s) on provider '%s', but it may be incomplete "
            "as it ended with an error",
            len(search_results),
            search_results[0].provider,
        )

    return search_results
def _search_by_id( self, uid: str, provider: Optional[str] = None, **kwargs: Any ) -> SearchResult: """Internal method that enables searching a product by its id. Keeps requesting providers until a result matching the id is supplied. The search plugins should be developed in the way that enable them to handle the support of a search by id by the providers. The providers are requested one by one, in the order defined by their priorities. Be aware that because of that, the search can be slow, if the priority order is such that the provider that contains the requested product has the lowest priority. However, you can always speed up a little the search by passing the name of the provider on which to perform the search, if this information is available :param uid: The uid of the EO product :param provider: (optional) The provider on which to search the product. This may be useful for performance reasons when the user knows this product is available on the given provider :param kwargs: Search criteria to help finding the right product :returns: A search result with one EO product or None at all """ collection = kwargs.get("collection") if collection is not None: try: collection = self.get_collection_from_alias(collection) except NoMatchingCollection: logger.debug("collection %s not found", collection) get_search_plugins_kwargs = dict(provider=provider, collection=collection) search_plugins = self._plugins_manager.get_search_plugins( **get_search_plugins_kwargs ) # datacube query string _dc_qs = kwargs.pop("_dc_qs", None) results = SearchResult([]) for plugin in search_plugins: logger.info( "Searching product with id '%s' on provider: %s", uid, plugin.provider ) logger.debug("Using plugin class for search: %s", plugin.__class__.__name__) plugin.clear() # adds maximal pagination to be able to do a search-all + crunch if more # than one result are returned limit = plugin.config.pagination.get("max_limit", DEFAULT_MAX_LIMIT) kwargs.update(limit=limit) if isinstance(plugin, 
PostJsonSearch): kwargs.update( limit=limit, _dc_qs=_dc_qs, ) else: kwargs.update( limit=limit, ) try: # if more than one results are found, try getting them all and then filter using crunch for page_results in self.search_iter_page_plugin( search_plugin=plugin, id=uid, **kwargs, ): results.data.extend(page_results.data) except Exception as e: if kwargs.get("raise_errors"): raise logger.warning(e) results.errors.append((plugin.provider, e)) continue # try using crunch to get unique result if ( len(results) > 1 and len(filtered := results.filter_property(id=uid)) == 1 ): results = filtered if len(results) == 1: if not results[0].collection: # guess collection from properties guesses = self.guess_collection(**results[0].properties) results[0].collection = guesses[0].id # reset driver results[0].driver = results[0].get_driver() results.number_matched = 1 return results elif len(results) > 1: logger.info( "Several products found for this id (%s). You may try searching using more selective criteria.", results, ) return SearchResult([], 0, results.errors) def _fetch_external_collection(self, provider: str, collection: str): plugins = self._plugins_manager.get_search_plugins(provider=provider) plugin = next(plugins) # check after plugin init if still fetchable if not getattr(plugin.config, "discover_collections", {}).get("fetch_url"): return None kwargs: dict[str, Any] = {"collection": collection} # append auth if needed if getattr(plugin.config, "need_auth", False): if auth := self._plugins_manager.get_auth( plugin.provider, getattr(plugin.config, "api_endpoint", None), plugin.config, ): kwargs["auth"] = auth collection_config = plugin.discover_collections(**kwargs) self.update_collections_list({provider: collection_config}) def _prepare_search( self, start: Optional[str] = None, end: Optional[str] = None, geom: Optional[Union[str, dict[str, float], BaseGeometry]] = None, locations: Optional[dict[str, str]] = None, provider: Optional[str] = None, **kwargs: Any, ) -> 
tuple[list[Union[Search, Api]], dict[str, Any]]: """Internal method to prepare the search kwargs and get the search plugins. Product query: * By id (plus optional 'provider') * By search params: * collection query: * By collection (e.g. 'S2_MSI_L1C') * By params (e.g. 'platform'), see guess_collection * dates: 'start' and/or 'end' * geometry: 'geom' or 'bbox' or 'box' * search locations * TODO: better expose cloudCover * other search params are passed to Searchplugin.query() :param start: (optional) Start sensing time in ISO 8601 format (e.g. "1990-11-26", "1990-11-26T14:30:10.153Z", "1990-11-26T14:30:10+02:00", ...). If no time offset is given, the time is assumed to be given in UTC. :param end: (optional) End sensing time in ISO 8601 format (e.g. "1990-11-26", "1990-11-26T14:30:10.153Z", "1990-11-26T14:30:10+02:00", ...). If no time offset is given, the time is assumed to be given in UTC. :param geom: (optional) Search area that can be defined in different ways (see search) :param locations: (optional) Location filtering by name using locations configuration :param provider: provider to be used, if no provider is given or the collection is not available for the provider, the preferred provider is used :param kwargs: Some other criteria * id and/or a provider for a search by * search criteria to guess the collection * other criteria compatible with the provider :returns: Search plugins list and the prepared kwargs to make a query. """ collection: Optional[str] = kwargs.get("collection") if collection is None: try: guesses = self.guess_collection(**kwargs) # guess_collection raises a NoMatchingCollection error if no product # is found. Here, the supported search params are removed from the # kwargs if present, not to propagate them to the query itself. 
for param in ( "instruments", "constellation", "platform", "processing:level", "eodag:sensor_type", ): kwargs.pop(param, None) # By now, only use the best bet collection = guesses[0].id except NoMatchingCollection: queried_id = kwargs.get("id") if queried_id is None: logger.info( "No collection could be guessed with provided arguments" ) else: return [], kwargs if collection is not None: try: collection = self.get_collection_from_alias(collection) except NoMatchingCollection: logger.info("unknown collection " + collection) kwargs["collection"] = collection if start is not None: if kwargs.pop("datetime", None): logger.warning("datetime filter is overwritten by start") kwargs["start_datetime"] = start if end is not None: if kwargs.pop("datetime", None): logger.warning("datetime filter is overwritten by end") kwargs["end_datetime"] = end if not start and not end and "datetime" in kwargs: datetimes = get_datetime(kwargs) kwargs["start_datetime"] = datetimes[0] kwargs["end_datetime"] = datetimes[1] if "sort_by" in kwargs: new_sort_by = [] for param_tuple in kwargs["sort_by"]: if param_tuple[0] == "datetime": new_sort_by.append(("start_datetime", param_tuple[1])) else: new_sort_by.append(param_tuple) kwargs["sort_by"] = new_sort_by if geom is not None: kwargs["geometry"] = geom elif "intersects" in kwargs: kwargs["geometry"] = kwargs.pop("intersects") box = kwargs.pop("box", None) box = kwargs.pop("bbox", box) if geom is None and box is not None: kwargs["geometry"] = box kwargs["locations"] = locations kwargs["geometry"] = get_geometry_from_various(self.locations_config, **kwargs) # remove locations_args from kwargs now that they have been used locations_dict = {loc["name"]: loc for loc in self.locations_config} for arg in locations_dict.keys(): kwargs.pop(arg, None) del kwargs["locations"] # fetch collections list if collection is unknown if ( collection not in self._plugins_manager.collection_to_provider_config_map.keys() ): if provider and collection: # fetch ref for 
given provider and collection logger.debug( f"Fetching external collections sources to find {provider} {collection} collection" ) self.fetch_collections_list(provider) if ( collection not in self._plugins_manager.collection_to_provider_config_map.keys() ): # Try to get specific collection from external provider logger.debug( "Fetching %s to find %s collection", provider, collection ) self._fetch_external_collection(provider, collection) if not provider: # no provider or still not found -> fetch all external collections logger.debug( f"Fetching external collections sources to find {collection} collection" ) self.fetch_collections_list() preferred_provider = self.get_preferred_provider()[0] search_plugins: list[Union[Search, Api]] = [] for plugin in self._plugins_manager.get_search_plugins( collection=collection, provider=provider ): # exclude MeteoblueSearch plugins from search fallback for unknown collection if ( provider != plugin.provider and preferred_provider != plugin.provider and collection not in self.collections_config and isinstance(plugin, MeteoblueSearch) ): continue search_plugins.append(plugin) if not provider: provider = preferred_provider providers = [plugin.provider for plugin in search_plugins] if provider not in providers: logger.debug( "Collection '%s' is not available with preferred provider '%s'.", collection, provider, ) else: provider_plugin = list( filter(lambda p: p.provider == provider, search_plugins) )[0] search_plugins.remove(provider_plugin) search_plugins.insert(0, provider_plugin) # Add collections_config to plugin config. This dict contains product # type metadata that will also be stored in each product's properties. 
for search_plugin in search_plugins: if collection is not None: self._attach_collection_config(search_plugin, collection) return search_plugins, kwargs def _do_search( self, search_plugin: Union[Search, Api], count: bool = False, raise_errors: bool = False, validate: Optional[bool] = True, **kwargs: Any, ) -> SearchResult: """Internal method that performs a search on a given provider. :param search_plugin: A search plugin :param count: (optional) Whether to run a query with a count request or not :param raise_errors: (optional) When an error occurs when searching, if this is set to True, the error is raised :param kwargs: Some other criteria that will be used to do the search :param validate: (optional) Set to True to validate search parameters before sending the query to the provider :returns: A collection of EO products matching the criteria """ logger.info("Searching on provider %s", search_plugin.provider) max_limit = getattr(search_plugin.config, "pagination", {}).get( "max_limit", DEFAULT_MAX_LIMIT ) if kwargs.get("limit", DEFAULT_LIMIT) > max_limit and max_limit > 0: logger.warning( "EODAG believes that you might have asked for more products/items " "than the maximum allowed by '%s': %s > %s. Try to lower " "the value of 'limit' and get the next page (e.g. 
'page=2'), " "or directly use the 'search_all' method.", search_plugin.provider, kwargs["limit"], max_limit, ) errors: list[tuple[str, Exception]] = [] try: prep = PreparedSearch(count=count) prep.raise_errors = raise_errors # append auth if needed if getattr(search_plugin.config, "need_auth", False): if auth := self._plugins_manager.get_auth( search_plugin.provider, getattr(search_plugin.config, "api_endpoint", None), search_plugin.config, ): prep.auth = auth prep.limit = kwargs.pop("limit", None) prep.next_page_token = kwargs.pop("next_page_token", None) prep.next_page_token_key = kwargs.pop( "next_page_token_key", None ) or search_plugin.config.pagination.get("next_page_token_key", "page") prep.page = kwargs.pop("page", None) if ( prep.next_page_token_key == "page" and prep.limit is not None and prep.next_page_token is None and prep.page is not None ): prep.next_page_token = str( prep.page - 1 + search_plugin.config.pagination.get("start_page", DEFAULT_PAGE) ) # remove None values and convert param names to their pydantic alias if any search_params = {} queryables_fields = Queryables.from_stac_models().model_fields for param, value in kwargs.items(): if value is None: continue if param in queryables_fields: param_alias = queryables_fields[param].alias or param search_params[param_alias] = value else: # remove `provider:` or `provider_` prefix if any search_params[ re.sub(r"^" + search_plugin.provider + r"[_:]", "", param) ] = value if validate: search_plugin.validate(search_params, prep.auth) search_result = search_plugin.query(prep, **search_params) if not isinstance(search_result, SearchResult): raise PluginImplementationError( "The query function of a Search plugin must return a SearchResult " "results, got {} instead".format(type(search_result)) ) # Filter and attach to each eoproduct in the result the plugin capable of # downloading it (this is done to enable the eo_product to download itself # doing: eo_product.download()). 
The filtering is done by keeping only # those eo_products that intersects the search extent (if there was no # search extent, search_intersection contains the geometry of the # eo_product) # WARNING: this means an eo_product that has an invalid geometry can still # be returned as a search result if there was no search extent (because we # will not try to do an intersection) for eo_product in search_result: # if collection is not defined, try to guess using properties if eo_product.collection is None: pattern = re.compile(r"[^\w,]+") try: guesses = self.guess_collection( intersect=False, **{ k: pattern.sub("", str(v).upper()) for k, v in eo_product.properties.items() if k in [ "instruments", "constellation", "platform", "processing:level", "eodag:sensor_type", "keywords", ] and v is not None }, ) except NoMatchingCollection: pass else: eo_product.collection = guesses[0].id if eo_product.search_intersection is not None: eo_product._register_downloader_from_manager(self._plugins_manager) # Make next_page not available if the current one returned less than the maximum number of items asked for. if not prep.limit or len(search_result) < prep.limit: search_result.next_page_token = None search_result._dag = self return search_result except Exception as e: if raise_errors: # Raise the error, letting the application wrapping eodag know that # something went bad. This way it will be able to decide what to do next raise else: logger.exception( "Error while searching on provider %s (ignored):", search_plugin.provider, ) errors.append((search_plugin.provider, e)) return SearchResult([], 0, errors)
def crunch(self, results: SearchResult, **kwargs: Any) -> SearchResult:
    """Apply the filters given through the keyword arguments to the results.

    Every keyword argument except ``search_criteria`` names a crunch plugin and
    maps to the options used to build it. The plugins are applied one after
    the other, in keyword order, each one on the output of the previous one.

    :param results: The results of a eodag search request
    :param kwargs: ``<cruncher_name>=<cruncher_options>`` pairs, plus an
                   optional ``search_criteria`` dict forwarded to every
                   ``crunch`` call on the results
    :returns: The result of successively applying all the filters to the results
    """
    criteria = kwargs.pop("search_criteria", {})
    crunched = results
    for name, options in kwargs.items():
        plugin = self._plugins_manager.get_crunch_plugin(name, **options)
        crunched = crunched.crunch(plugin, **criteria)
    return crunched
@staticmethod
def group_by_extent(searches: list[SearchResult]) -> list[SearchResult]:
    """Combines multiple SearchResults and return a list of SearchResults
    grouped by extent (i.e. bounding box).

    Two products belong to the same group when every coordinate of their
    bounding box (``product.geometry.bounds``) is equal once rounded to 2
    decimals.

    :param searches: List of eodag SearchResult
    :returns: list of :class:`~eodag.api.search_result.SearchResult`, one per
              extent, in order of first appearance
    """
    # Keys are the rounded bounds as a tuple. The previous separator-less
    # string concatenation could not tell e.g. (1.1, 23.4, ...) from
    # (1.12, 3.4, ...) apart (both "1.123.4...") and merged distinct extents.
    products_grouped_by_extent: dict[tuple[float, ...], list[EOProduct]] = {}
    for search in searches:
        for product in search:
            extent = tuple(round(coord, 2) for coord in product.geometry.bounds)
            products_grouped_by_extent.setdefault(extent, []).append(product)
    return [
        SearchResult(products) for products in products_grouped_by_extent.values()
    ]
def download_all(
    self,
    search_result: SearchResult,
    downloaded_callback: Optional[DownloadedCallback] = None,
    progress_callback: Optional[ProgressCallback] = None,
    executor: Optional[ThreadPoolExecutor] = None,
    wait: float = DEFAULT_DOWNLOAD_WAIT,
    timeout: float = DEFAULT_DOWNLOAD_TIMEOUT,
    **kwargs: Unpack[DownloadConf],
) -> list[str]:
    """Download all products resulting from a search.

    The download plugin of the first product is used for the whole batch.

    :param search_result: A set of EO products resulting from a search
    :param downloaded_callback: (optional) A method or a callable object which
                                takes as parameter the ``product``. You can use
                                the base class
                                :class:`~eodag.utils.DownloadedCallback` and
                                override its ``__call__`` method. Will be
                                called each time a product finishes downloading
    :param progress_callback: (optional) A method or a callable object which
                              takes a current size and a maximum size as inputs
                              and handle progress bar creation and update to
                              give the user a feedback on the download progress
    :param executor: (optional) An executor to download EO products of
                     ``search_result`` in parallel which will also be reused
                     to download assets of these products in parallel.
    :param wait: (optional) If download fails, wait time in minutes between
                 two download tries of the same product
    :param timeout: (optional) If download fails, maximum time in minutes
                    before stop retrying to download
    :param kwargs: Additional keyword arguments from the download plugin
                   configuration class that can be provided to override any
                   other values defined in a configuration file or with
                   environment variables:

                   * ``output_dir`` - where to store downloaded products, as
                     an absolute file path (Default: local temporary directory)
                   * ``output_extension`` - downloaded file extension
                   * ``extract`` - whether to extract the downloaded products,
                     only applies to archived products
                   * ``dl_url_params`` - additional parameters to pass over to
                     the download url as an url parameter
                   * ``delete_archive`` - whether to delete the downloaded
                     archives
                   * ``asset`` - regex filter to identify assets to download
    :returns: A collection of the absolute paths to the downloaded products
              (an empty list if ``search_result`` is empty)
    """
    if not search_result:
        logger.info("Empty search result, nothing to be downloaded !")
        return []

    logger.info("Downloading %s products", len(search_result))
    # All products are assumed to use base.Download.download_all, so the
    # plugin resolved for the first one drives the whole batch
    first_product = search_result[0]
    batch_plugin = self._plugins_manager.get_download_plugin(first_product)
    return batch_plugin.download_all(
        search_result,
        downloaded_callback=downloaded_callback,
        progress_callback=progress_callback,
        executor=executor,
        wait=wait,
        timeout=timeout,
        **kwargs,
    )
@staticmethod
def serialize(
    search_result: SearchResult,
    filename: str = "search_results.geojson",
    skip_invalid: bool = True,
) -> str:
    """Registers results of a search into a geojson file.

    The output is a FeatureCollection containing the EO products as features,
    with additional metadata such as ``number_matched``, ``next_page_token``,
    and ``search_params`` stored in the properties.

    When the search result is attached to a gateway (``_dag``), one
    ``<collection>.json`` file is also written next to ``filename`` for each
    collection of the products. Products without a collection are skipped for
    this step.

    :param search_result: A set of EO products resulting from a search
    :param filename: (optional) The name of the file to generate
    :param skip_invalid: Whether to skip properties whose values are not valid
                         according to the STAC specification.
    :returns: The name of the created file
    """
    search_result_dict = search_result.as_dict(skip_invalid=skip_invalid)
    # add self link
    search_result_dict.setdefault("links", [])
    search_result_dict["links"].append(
        {
            "rel": "self",
            "href": f"{filename}",
            "type": "application/json",
        },
    )
    # write search results
    with open(filename, "w", encoding="utf-8") as fh:
        geojson.dump(search_result_dict, fh)
    logger.debug("Search results saved to %s", filename)

    # write collection(s): the gateway holds their configuration
    if search_result._dag is None:
        return filename

    # a product without collection has nothing to describe (and would produce
    # a "None.json" file); sorted for a deterministic write order
    collections = sorted({p.collection for p in search_result if p.collection})
    output_dir = Path(filename).parent
    for collection in collections:
        collection_obj = search_result._dag.collections_config.get(
            collection, Collection(id=collection)
        )
        collection_dict = collection_obj.serialize()
        # add links
        collection_dict.setdefault("links", [])
        collection_dict["links"].append(
            {
                "rel": "self",
                "href": f"{collection}.json",
                "type": "application/json",
            },
        )
        with open(output_dir / f"{collection}.json", "w", encoding="utf-8") as fh:
            geojson.dump(collection_dict, fh)
        logger.debug("Collection '%s' saved to %s", collection, fh.name)

    return filename
@staticmethod
def deserialize(filename: str) -> SearchResult:
    """Load a search result previously saved as a geojson file.

    The products are not registered on any gateway; see
    ``deserialize_and_register`` for that.

    :param filename: Path of the geojson file holding the encoded search result
    :returns: The search results read from ``filename``
    """
    loaded_results = SearchResult.from_file(filename)
    return loaded_results
def deserialize_and_register(self, filename: str) -> SearchResult:
    """Load a search result from a geojson file and bind it to this gateway.

    The current EODataAccessGateway instance is handed to the loader, so the
    products get the information needed to download themselves and the result
    can be paginated (e.g. access to next pages) when possible.

    :param filename: Path of the geojson file holding the encoded search result
    :returns: The search results read from ``filename``, ready for download
              and pagination
    """
    gateway = self
    registered_results = SearchResult.from_file(filename, gateway)
    return registered_results
def download(
    self,
    product: EOProduct,
    progress_callback: Optional[ProgressCallback] = None,
    executor: Optional[ThreadPoolExecutor] = None,
    wait: float = DEFAULT_DOWNLOAD_WAIT,
    timeout: float = DEFAULT_DOWNLOAD_TIMEOUT,
    **kwargs: Unpack[DownloadConf],
) -> str:
    """Download a single product.

    This is an alias to the method of the same name on
    :class:`~eodag.api.product._product.EOProduct`, but it first makes sure
    that a downloader and an authenticator are registered on the product.

    If the product location starts with ``file:/`` (e.g. because the
    ``eodag:download_link`` metadata mapping was overridden, for instance with
    ``EODAG__CREODIAS__SEARCH__METADATA_MAPPING__EODAG_DOWNLOAD_LINK="file:///{id}"``),
    the download is skipped and the matching local path is returned directly.

    :param product: The EO product to download
    :param progress_callback: (optional) A method or a callable object which
                              takes a current size and a maximum size as inputs
                              and handle progress bar creation and update to
                              give the user a feedback on the download progress
    :param executor: (optional) An executor to download assets of ``product``
                     in parallel if it has any. If ``None`` , a default
                     executor will be created
    :param wait: (optional) If download fails, wait time in minutes between
                 two download tries
    :param timeout: (optional) If download fails, maximum time in minutes
                    before stop retrying to download
    :param kwargs: Additional keyword arguments from the download plugin
                   configuration class that can be provided to override any
                   other values defined in a configuration file or with
                   environment variables:

                   * ``output_dir`` - where to store downloaded products, as
                     an absolute file path (Default: local temporary directory)
                   * ``output_extension`` - downloaded file extension
                   * ``extract`` - whether to extract the downloaded products,
                     only applies to archived products
                   * ``dl_url_params`` - additional parameters to pass over to
                     the download url as an url parameter
                   * ``delete_archive`` - whether to delete the downloaded
                     archives
                   * ``asset`` - regex filter to identify assets to download
    :returns: The absolute path to the downloaded product in the local
              filesystem
    :raises: :class:`~eodag.utils.exceptions.PluginImplementationError`
    :raises: :class:`RuntimeError`
    """
    is_local = product.location.startswith("file:/")
    if not is_local:
        self._setup_downloader(product)
        downloaded_path = product.download(
            progress_callback=progress_callback,
            executor=executor,
            wait=wait,
            timeout=timeout,
            **kwargs,
        )
        return downloaded_path

    logger.info("Local product detected. Download skipped")
    return uri_to_path(product.location)
def _setup_downloader(self, product: EOProduct) -> None: if product.downloader is None: downloader = self._plugins_manager.get_download_plugin(product) auth = product.downloader_auth if auth is None: auth = self._plugins_manager.get_auth_plugin(downloader, product) product.register_downloader(downloader, auth)
def get_cruncher(self, name: str, **options: Any) -> Crunch:
    """Build a crunch plugin from a configuration.

    Option names have their dashes replaced by underscores (``a-b`` becomes
    ``a_b``) and are merged into a configuration that starts with the cruncher
    ``name`` (an option called ``name`` would override it).

    :param name: The name of the cruncher to build
    :param options: The configuration options of the cruncher
    :returns: The cruncher named ``name``
    """
    # NOTE(review): ``name`` is passed both positionally and inside the
    # configuration; confirm get_crunch_plugin does not bind a parameter
    # called ``name`` (that would raise "got multiple values" TypeError).
    normalized_options = {
        option.replace("-", "_"): value for option, value in options.items()
    }
    plugin_conf = {"name": name, **normalized_options}
    return self._plugins_manager.get_crunch_plugin(name, **plugin_conf)
def list_queryables(
    self,
    provider: Optional[str] = None,
    fetch_providers: bool = True,
    **kwargs: Any,
) -> QueryablesDict:
    """Fetch the queryable properties for a given collection and/or provider.

    Without provider nor collection, the common queryables are returned with
    additional properties allowed. Otherwise the queryables of every matching
    search plugin are merged, the first plugins taking precedence.

    :param provider: (optional) The provider.
    :param fetch_providers: If new collections should be fetched from the
                            providers when the requested collection is not
                            known; default: True
    :param kwargs: additional filters for queryables (`collection` or other
                   search arguments)
    :returns: A :class:`~eodag.api.product.queryables.QueryablesDict`
              containing the EODAG queryable properties, associating
              parameters to their annotated type, and an
              additional_properties attribute
    """

    def _collection_ids(fetch: bool) -> list[str]:
        # ids of the collections listed for ``provider``
        return [
            col.id
            for col in self.list_collections(provider=provider, fetch_providers=fetch)
        ]

    # providers are only fetched if the requested collection is not found
    available_collections: list[str] = _collection_ids(False)
    collection: Optional[str] = kwargs.get("collection")
    # collection name as given by the caller, before alias resolution
    coll_alias: Optional[str] = collection

    if collection:
        if collection not in available_collections and fetch_providers:
            # fetch providers and try again
            available_collections = _collection_ids(True)
        try:
            resolved = self.get_collection_from_alias(collection)
        except NoMatchingCollection:
            # try fetching queryables for custom collection even if not known
            pass
        else:
            kwargs["collection"] = collection = resolved

    if not (provider or collection):
        return QueryablesDict(
            additional_properties=True,
            **model_fields_to_annotated(CommonQueryables.model_fields),
        )

    additional_properties = False
    info_parts = []
    merged_queryables: dict[str, Any] = {}

    for plugin in self._plugins_manager.get_search_plugins(collection, provider):
        # attach collection config(s) to the plugin, keeping each one
        targets = [collection] if collection else available_collections
        collection_configs: dict[str, Any] = {}
        for target in targets:
            self._attach_collection_config(plugin, target)
            collection_configs[target] = plugin.config.collection_config

        # authenticate if required
        if getattr(plugin.config, "need_auth", False):
            auth = self._plugins_manager.get_auth_plugin(plugin)
            if auth:
                try:
                    plugin.auth = auth.authenticate()
                except AuthenticationError:
                    logger.debug(
                        "queryables from provider %s could not be fetched due to an authentication error",
                        plugin.provider,
                    )

        # rename search arguments to their queryables alias
        kwargs_alias = dict(kwargs)
        fields = Queryables.from_stac_models().model_fields
        for search_param, field_info in fields.items():
            alias = field_info.alias
            if search_param not in kwargs or not alias:
                continue
            if isinstance(alias, AliasChoices):
                alias_key = str(alias.choices[0])
            else:
                alias_key = alias
            kwargs_alias[alias_key] = kwargs_alias.pop(search_param)

        plugin_queryables = plugin.list_queryables(
            kwargs_alias,
            available_collections,
            collection_configs,
            collection,
            coll_alias,
        )

        extra_info = plugin_queryables.additional_information
        if extra_info:
            info_parts.append(f"{plugin.provider}: {extra_info}")
        # queryables from the plugins already seen win over the new ones
        merged_queryables = {**plugin_queryables, **merged_queryables}
        additional_properties = (
            additional_properties or plugin_queryables.additional_properties
        )

    return QueryablesDict(
        additional_properties=additional_properties,
        additional_information=" | ".join(info_parts),
        **merged_queryables,
    )
def available_sortables(self) -> dict[str, Optional[ProviderSortables]]:
    """List, per provider, the sortable parameters and how many can be combined.

    Providers whose search plugin configuration has no ``sort`` section are
    mapped to None. The others get a dict with the ``sortables`` keys of their
    ``sort_param_mapping`` and their ``max_sort_params`` (None when not set).

    :returns: A dictionary with providers as keys and dictionary of sortable
              parameter(s) and its (their) maximum number as value(s).
    """
    sortables: dict[str, Optional[ProviderSortables]] = {}
    for search_plugin in self._plugins_manager.get_search_plugins():
        provider_name = search_plugin.provider
        if not hasattr(search_plugin.config, "sort"):
            sortables[provider_name] = None
            continue
        sort_config = search_plugin.config.sort
        params = list(sort_config.get("sort_param_mapping", {}))
        max_params = sort_config.get("max_sort_params")
        sortables[provider_name] = {
            "sortables": params,
            "max_sort_params": max_params if max_params else None,
        }
    return sortables
def _attach_collection_config(self, plugin: Search, collection: str) -> None: """ Attach collections_config to plugin config. This dict contains product type metadata that will also be stored in each product's properties. """ try: plugin.config.collection_config = dict( [ c.model_dump(mode="json", exclude={"id"}) for c in self.list_collections( plugin.provider, fetch_providers=False ) if c._id == collection ][0], **{"collection": collection}, ) # If the product isn't in the catalog, it's a generic collection. except IndexError: # Construct the GENERIC_COLLECTION metadata plugin.config.collection_config = dict( **self.collections_config[GENERIC_COLLECTION].model_dump( mode="json", exclude={"id"} ), collection=collection, )
def import_stac_items(
    self, items_urls: list[str], provider: Optional[str] = None
) -> SearchResult:
    """Import STAC items from a list of URLs and convert them to SearchResult.

    - Origin provider and download links will be set if item comes from an
      EODAG server.
    - If item comes from a known EODAG provider, result will be registered to
      it, ready to download and its metadata normalized.
    - If item comes from an unknown provider, a generic STAC provider will be
      used.

    Items that cannot be converted to a product are left out.

    :param items_urls: A list of STAC items URLs to import
    :param provider: (optional) The EODAG provider to which the STAC items
                     belong, if known.
    :returns: A SearchResult containing the imported STAC items
    """
    # gather every item of every URL first, in URL order
    stac_items = [
        stac_item
        for item_url in items_urls
        for stac_item in fetch_stac_items(item_url)
    ]

    # a generic STAC provider might be needed to handle the items
    self.add_provider(GENERIC_STAC_PROVIDER)

    imported = SearchResult([])
    for stac_item in stac_items:
        product = EOProduct._from_stac_item(
            stac_item, self._plugins_manager, provider
        )
        if product:
            imported.append(product)
    return imported