# -*- coding: utf-8 -*-
# Copyright 2018, CS GROUP - France, https://www.csgroup.eu/
#
# This file is part of EODAG project
# https://www.github.com/CS-SI/EODAG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import datetime as dt
import itertools
import logging
import os
import re
import shutil
import tempfile
import warnings
from collections import deque
from copy import deepcopy
from importlib.metadata import version
from importlib.resources import files as res_files
from operator import attrgetter, itemgetter
from pathlib import Path
from typing import TYPE_CHECKING, Any, Iterator, Optional, Union, cast
import geojson
import yaml
from pydantic import AliasChoices
from eodag.api.collection import Collection, CollectionsDict, CollectionsList
from eodag.api.product import EOProduct
from eodag.api.product.metadata_mapping import mtd_cfg_as_conversion_and_querypath
from eodag.api.provider import Provider, ProvidersDict
from eodag.api.search_result import SearchResult
from eodag.config import (
PLUGINS_TOPICS_KEYS,
PluginConfig,
SimpleYamlProxyConfig,
credentials_in_auth,
get_ext_collections_conf,
load_default_config,
load_yml_config,
)
from eodag.plugins.manager import PluginManager
from eodag.plugins.search import PreparedSearch
from eodag.plugins.search.build_search_result import MeteoblueSearch
from eodag.plugins.search.qssearch import PostJsonSearch
from eodag.types import model_fields_to_annotated
from eodag.types.queryables import CommonQueryables, Queryables, QueryablesDict
from eodag.utils import (
DEFAULT_DOWNLOAD_TIMEOUT,
DEFAULT_DOWNLOAD_WAIT,
DEFAULT_LIMIT,
DEFAULT_MAX_LIMIT,
DEFAULT_PAGE,
GENERIC_COLLECTION,
GENERIC_STAC_PROVIDER,
_deprecated,
get_geometry_from_various,
makedirs,
sort_dict,
string_to_jsonpath,
uri_to_path,
)
from eodag.utils.dates import get_datetime, rfc3339_str_to_datetime
from eodag.utils.env import is_env_var_true
from eodag.utils.exceptions import (
AuthenticationError,
NoMatchingCollection,
PluginImplementationError,
RequestError,
UnsupportedProvider,
ValidationError,
)
from eodag.utils.free_text_search import compile_free_text_query
from eodag.utils.stac_reader import fetch_stac_items
if TYPE_CHECKING:
from concurrent.futures import ThreadPoolExecutor
from shapely.geometry.base import BaseGeometry
from eodag.plugins.apis.base import Api
from eodag.plugins.crunch.base import Crunch
from eodag.plugins.search.base import Search
from eodag.types import ProviderSortables
from eodag.types.download_args import DownloadConf
from eodag.utils import DownloadedCallback, ProgressCallback, Unpack
# Module logger: every message emitted by this module goes through the "eodag.core" logger
logger: logging.Logger = logging.getLogger("eodag.core")
[docs]
class EODataAccessGateway:
"""An API for downloading a wide variety of geospatial products originating
from different types of providers.
:param user_conf_file_path: (optional) Path to the user configuration file
:param locations_conf_path: (optional) Path to the locations configuration file
"""
def __init__(
    self,
    user_conf_file_path: Optional[str] = None,
    locations_conf_path: Optional[str] = None,
) -> None:
    """Load collections, providers, configuration overrides and locations.

    Configuration is layered: packaged defaults first, then the user
    configuration file, then environment variables. Providers that cannot be
    used (unavailable plugin, missing auth plugin or credentials) are pruned
    once every layer has been applied.

    :param user_conf_file_path: (optional) Path to the user configuration file.
        Falls back to the ``EODAG_CFG_FILE`` environment variable, then to
        ``<conf_dir>/eodag.yml`` (seeded from the packaged template when missing).
    :param locations_conf_path: (optional) Path to the locations configuration file
    """
    # collections catalog: environment override, else the file shipped with eodag
    collections_cfg_file = os.getenv("EODAG_COLLECTIONS_CFG_FILE")
    if not collections_cfg_file:
        collections_cfg_file = str(
            res_files("eodag") / "resources" / "collections.yml"
        )
    self.collections_config = self._collections_config_init(
        SimpleYamlProxyConfig(collections_cfg_file).source
    )
    self._providers = ProvidersDict.from_configs(load_default_config())

    # configuration directory, with a fallback in the temp dir if not creatable
    cfg_dir_env_var = "EODAG_CFG_DIR"
    self.conf_dir = os.getenv(
        cfg_dir_env_var,
        default=os.path.join(os.path.expanduser("~"), ".config", "eodag"),
    )
    try:
        makedirs(self.conf_dir)
    except OSError as err:
        logger.debug(err)
        fallback_conf_dir = os.path.join(tempfile.gettempdir(), ".config", "eodag")
        logger.warning(
            f"Cannot create configuration directory {self.conf_dir}. "
            + f"Falling back to temporary directory {fallback_conf_dir}."
        )
        if os.getenv(cfg_dir_env_var) is None:
            logger.warning(
                "You can set the path of the configuration directory "
                + f"with the environment variable {cfg_dir_env_var}"
            )
        self.conf_dir = fallback_conf_dir
        makedirs(self.conf_dir)

    self._plugins_manager = PluginManager(self._providers)
    # from now on, work on the providers object held by the plugins manager
    self._providers = self._plugins_manager.providers

    # First level override: user configuration file
    # (explicit argument, else EODAG_CFG_FILE, else <conf_dir>/eodag.yml)
    if user_conf_file_path is None:
        user_conf_file_path = os.getenv("EODAG_CFG_FILE")
    if user_conf_file_path is None:
        default_user_conf = os.path.join(self.conf_dir, "eodag.yml")
        user_conf_file_path = default_user_conf
        user_conf_template = str(
            res_files("eodag") / "resources" / "user_conf_template.yml"
        )
        # seed the user configuration from the template, never overwriting it
        if os.path.isfile(user_conf_template) and not os.path.isfile(
            default_user_conf
        ):
            shutil.copy(user_conf_template, default_user_conf)
    self._providers.update_from_config_file(user_conf_file_path)

    # Second level override: environment variables
    self._providers.update_from_env()

    # align each provider's collections with the (possibly updated) configuration
    strict_mode = is_env_var_true("EODAG_STRICT_COLLECTIONS")
    for prov in self._providers.values():
        prov.sync_collections(self, strict_mode)
    self._plugins_manager.rebuild(self._providers)

    # configs of pruned providers, kept so that update_providers_config() can restore them
    self._pruned_providers_config: dict[str, Any] = {}
    self._prune_providers_list()
    # priorities may have been changed by the overrides above
    self._plugins_manager.sort_providers()
    self.set_locations_conf(locations_conf_path)
def _collections_config_init(
    self, collections_config_dict: dict[str, Any]
) -> CollectionsDict:
    """Build the collections catalog from its raw configuration mapping.

    :param collections_config_dict: Mapping of collection id to the keyword
        arguments given to :meth:`Collection.create_with_dag`
    :returns: A :class:`CollectionsDict` of collections created with a
        reference to this gateway instance
    """
    collections: list[Collection] = []
    for collection_id, collection_fields in collections_config_dict.items():
        collections.append(
            Collection.create_with_dag(self, id=collection_id, **collection_fields)
        )
    return CollectionsDict(collections)
@property
def providers(self) -> ProvidersDict:
    """Providers of eodag configuration sorted by priority in descending order and by name in ascending order.

    The returned :class:`ProvidersDict` is a deep copy: modifying it does not
    alter the configuration used by this instance.
    """
    providers_copy = deepcopy(self._providers)

    def _ordering(entry: tuple[str, Provider]) -> tuple[int, str]:
        # highest priority first, ties broken alphabetically by provider name
        provider_name, provider_obj = entry
        return -provider_obj.priority, provider_name

    providers_copy.data = dict(sorted(providers_copy.data.items(), key=_ordering))
    return providers_copy
def get_version(self) -> str:
    """Return the installed version of the ``eodag`` package.

    :returns: The version string read from the installed package metadata
    """
    installed_version = version("eodag")
    return installed_version
[docs]
def set_preferred_provider(self, provider: str) -> None:
    """Set max priority for the given provider.

    After this call, ``provider`` has a priority strictly greater than every
    other provider's, including when it previously shared the highest
    priority with another provider.

    :param provider: The name of the provider that should be considered as the
        preferred provider to be used for this instance
    :raises: :class:`~eodag.utils.exceptions.UnsupportedProvider`
    """
    # membership is checked on the internal dict: the public ``providers``
    # property deep-copies every provider just to expose their names
    if provider not in self._providers.names:
        raise UnsupportedProvider(
            f"This provider is not recognised by eodag: {provider}"
        )
    other_priorities = dict(self._providers.priorities.items())
    current_priority = other_priorities.pop(provider, None)
    if not other_priorities:
        # single configured provider: it already is the preferred one
        return
    highest_other = max(other_priorities.values())
    # ``<=`` (not ``<``): on a tie, max() in get_preferred_provider() may report
    # the other provider, so the requested one must be moved strictly above it
    if current_priority is None or current_priority <= highest_other:
        self._plugins_manager.set_priority(provider, highest_other + 1)
[docs]
def get_preferred_provider(self) -> tuple[str, int]:
    """Get the provider currently set as the preferred one for searching
    products, along with its priority.

    When several providers share the highest priority, the first one met in
    the priorities mapping is returned.

    :returns: The provider with the maximum priority and its priority
    """
    name_priority_pairs = self._providers.priorities.items()
    return max(name_priority_pairs, key=lambda pair: pair[1])
[docs]
def update_providers_config(
    self,
    yaml_conf: Optional[str] = None,
    dict_conf: Optional[dict[str, Any]] = None,
) -> None:
    """Update providers configuration with given input.

    Can be used to add a provider to existing configuration or update
    an existing one. A provider previously removed by
    ``_prune_providers_list`` is restored first when the update mentions it.

    :param yaml_conf: YAML formatted provider configuration
    :param dict_conf: provider configuration as dictionary in place of ``yaml_conf``
        (used when both are given)
    """
    if dict_conf is None and yaml_conf is None:
        return None
    conf_update = dict_conf if dict_conf is not None else yaml.safe_load(yaml_conf)

    # restore the pruned configuration of providers targeted by this update
    for pruned_name in list(self._pruned_providers_config):
        if pruned_name not in conf_update:
            continue
        logger.info(
            "%s: provider restored from the pruned configurations", pruned_name
        )
        self._providers[pruned_name] = Provider(
            self._pruned_providers_config[pruned_name]
        )
        del self._pruned_providers_config[pruned_name]

    self._providers.update_from_configs(conf_update)
    # refresh the collection -> provider config mapping of the plugins manager
    self._plugins_manager.build_collection_to_provider_config_map()
[docs]
def add_provider(
    self,
    name: str,
    url: Optional[str] = None,
    priority: Optional[int] = None,
    search: dict[str, Any] = {"type": "StacSearch"},
    products: dict[str, Any] = {
        GENERIC_COLLECTION: {"_collection": "{collection}"}
    },
    download: dict[str, Any] = {"type": "HTTPDownload", "auth_error_code": 401},
    **kwargs: Any,
):
    """Adds a new provider.

    ``search``, ``products`` & ``download`` already have default values that will be
    updated (not replaced), with user provided ones:

    * ``search`` : ``{"type": "StacSearch"}``
    * ``products`` : ``{"GENERIC_COLLECTION": {"_collection": "{collection}"}}``
    * ``download`` : ``{"type": "HTTPDownload", "auth_error_code": 401}``

    All given mappings are deep-copied, so neither the caller's dicts nor the
    default values are shared with the new provider configuration.

    :param name: Name of provider
    :param url: Provider url, also used as ``search["api_endpoint"]`` if not defined
    :param priority: Provider priority. If None, provider will be set as preferred (highest priority)
    :param search: Search :class:`~eodag.config.PluginConfig` mapping
    :param products: Provider collections mapping
    :param download: Download :class:`~eodag.config.PluginConfig` mapping
    :param kwargs: Additional :class:`~eodag.api.provider.ProviderConfig` mapping.
        When it contains an ``api`` entry, the other plugin topics listed in
        ``PLUGINS_TOPICS_KEYS`` (``search``, ``download``, ...) are dropped.
    """
    # Default values are evaluated once, at definition time. Without copies, the
    # default GENERIC_COLLECTION entry (and any nested dict given by the caller)
    # would be the very object handed over to the provider configuration, where
    # later processing could mutate it for every subsequent call.
    search = deepcopy(search)
    products = deepcopy(products)
    download = deepcopy(download)
    extra_conf = deepcopy(kwargs)

    conf_dict: dict[str, Any] = {
        name: {
            "name": name,
            "url": url,
            "search": {"type": "StacSearch", **search},
            "products": {
                GENERIC_COLLECTION: {"_collection": "{collection}"},
                **products,
            },
            "download": {
                "type": "HTTPDownload",
                "auth_error_code": 401,
                **download,
            },
            **extra_conf,
        }
    }
    if priority is not None:
        conf_dict[name]["priority"] = priority
    # if provided, use url as default search api_endpoint
    if (
        url
        and conf_dict[name].get("search", {})
        and not conf_dict[name]["search"].get("api_endpoint")
    ):
        conf_dict[name]["search"]["api_endpoint"] = url
    # api plugin usage: remove unneeded search/download/auth plugin conf
    if conf_dict[name].get("api"):
        for topic in PLUGINS_TOPICS_KEYS:
            if topic != "api":
                conf_dict[name].pop(topic, None)

    self.update_providers_config(dict_conf=conf_dict)

    if priority is None:
        self.set_preferred_provider(name)
def _prune_providers_list(self) -> None:
    """Removes from config providers needing auth that have no credentials set.

    Providers are taken out of ``self._providers`` when:

    * one of their plugin configs has a type listed in the plugins manager
      ``skipped_plugins`` (dropped for good, without plugins manager rebuild);
    * their ``api`` plugin needs auth but no credentials are configured;
    * their ``search`` plugin needs auth but there is no ``auth``/``search_auth``
      plugin, or it has no credentials (``search_auth`` is checked when present,
      ``auth`` otherwise);
    * they have neither an ``api`` nor a ``search`` plugin.

    Except for the first case, the provider config is kept in
    ``self._pruned_providers_config`` and the plugins manager is rebuilt.
    """
    rebuild_required = False

    def _set_aside(prov: Provider, prov_conf: Any, reason_msg: str) -> None:
        # keep the config so that update_providers_config() can restore it
        self._pruned_providers_config[prov.name] = prov_conf
        del self._providers[prov.name]
        logger.info(reason_msg, prov)

    # iterate over a snapshot, as providers get removed along the way
    for prov in list(self._providers.values()):
        conf = prov.config

        uses_skipped_plugin = any(
            isinstance(plugin_conf, PluginConfig)
            and getattr(plugin_conf, "type", None)
            in self._plugins_manager.skipped_plugins
            for plugin_conf in conf.__dict__.values()
        )
        if uses_skipped_plugin:
            del self._providers[prov.name]
            logger.debug(
                f"{prov}: provider needing unavailable plugin has been removed"
            )
            continue

        has_api = hasattr(conf, "api")
        has_search = hasattr(conf, "search")
        if has_api and getattr(conf.api, "need_auth", False):
            if not credentials_in_auth(conf.api):
                _set_aside(
                    prov,
                    conf,
                    "%s: provider needing auth for search has been pruned because no credentials could be found",
                )
                rebuild_required = True
        elif has_search and getattr(conf.search, "need_auth", False):
            has_search_auth = hasattr(conf, "search_auth")
            if not has_search_auth and not hasattr(conf, "auth"):
                _set_aside(
                    prov,
                    conf,
                    "%s: provider needing auth for search has been pruned because no auth plugin could be found",
                )
                rebuild_required = True
            else:
                # a dedicated search_auth plugin takes precedence over auth
                auth_conf = conf.search_auth if has_search_auth else conf.auth
                if not credentials_in_auth(auth_conf):
                    _set_aside(
                        prov,
                        conf,
                        "%s: provider needing auth for search has been pruned because no credentials could be found",
                    )
                    rebuild_required = True
        elif not has_api and not has_search:
            # provider should have at least an api or search plugin
            _set_aside(
                prov,
                conf,
                "%s: provider has been pruned because no api or search plugin could be found",
            )
            rebuild_required = True

    if rebuild_required:
        # rebuild _plugins_manager with updated providers list
        self._plugins_manager.rebuild(self._providers)
def set_locations_conf(self, locations_conf_path: Optional[str]) -> None:
    """Set locations configuration.

    This configuration (YML format) will contain a shapefile list associated
    to a name and attribute parameters needed to identify the needed geometry.
    You can also configure parent attributes, which can be used for creating
    a catalogs path when using eodag as a REST server.

    When no path is given, the ``EODAG_LOCS_CFG_FILE`` environment variable is
    used, then ``<conf_dir>/locations.yml``. That default file is created from
    the packaged template when missing, and the sample ``shp`` directory is
    copied to ``<conf_dir>/shp`` unless something already exists there.

    Example of locations configuration file content:

    .. code-block:: yaml

        shapefiles:
            - name: country
              path: /path/to/countries_list.shp
              attr: ISO3
            - name: department
              path: /path/to/FR_departments.shp
              attr: code_insee
              parent:
                name: country
                attr: FRA

    :param locations_conf_path: Path to the locations configuration file
    """
    if locations_conf_path is None:
        locations_conf_path = os.getenv("EODAG_LOCS_CFG_FILE")
        if locations_conf_path is None:
            locations_conf_path = os.path.join(self.conf_dir, "locations.yml")
            if not os.path.isfile(locations_conf_path):
                # Ensure the directory exists
                os.makedirs(os.path.dirname(locations_conf_path), exist_ok=True)
                shp_dir = os.path.join(self.conf_dir, "shp")
                # copy locations conf file and replace path example
                locations_conf_template = str(
                    res_files("eodag") / "resources" / "locations_conf_template.yml"
                )
                with (
                    open(locations_conf_template) as infile,
                    open(locations_conf_path, "w") as outfile,
                ):
                    # The template contains paths in the form of:
                    # /path/to/locations/file.shp
                    path_template = "/path/to/locations/"
                    for line in infile:
                        outfile.write(
                            line.replace(path_template, shp_dir + os.path.sep)
                        )
                # copy sample shapefile dir. copytree() raises FileExistsError on an
                # existing destination (e.g. locations.yml deleted but shp/ kept),
                # which used to make the whole gateway initialization fail; an
                # existing directory is also left as-is to preserve user files.
                if not os.path.exists(shp_dir):
                    shutil.copytree(
                        str(res_files("eodag") / "resources" / "shp"), shp_dir
                    )

    locations_config = (
        load_yml_config(locations_conf_path)
        if os.path.isfile(locations_conf_path)
        else None
    )
    # an empty file yields no top-level key: treat it as "could not load"
    # instead of letting next(iter(...)) raise StopIteration
    if locations_config:
        main_key = next(iter(locations_config))
        main_locations_config = locations_config[main_key]
        logger.info("Locations configuration loaded from %s", locations_conf_path)
        self.locations_config: list[dict[str, Any]] = main_locations_config
    else:
        logger.info(
            "Could not load locations configuration from %s", locations_conf_path
        )
        self.locations_config = []
[docs]
def list_collections(
    self, provider: Optional[str] = None, fetch_providers: bool = True
) -> CollectionsList:
    """Lists supported collections.

    :param provider: (optional) The name of a provider (or provider group) that must
        support the collections we are about to list
    :param fetch_providers: (optional) Whether to fetch providers for new
        collections or not
    :returns: The list of the collections that can be accessed using eodag,
        sorted by collection id.
    :raises: :class:`~eodag.utils.exceptions.UnsupportedProvider`
    """
    if fetch_providers:
        # refresh the collections list first, when possible
        self.fetch_collections_list(provider=provider)

    selected_providers = list(self._providers.filter_by_name_or_group(provider))
    if provider and not any(selected_providers):
        raise UnsupportedProvider(
            f"The requested provider is not (yet) supported: {provider}"
        )

    # unique collection ids configured on the selected providers
    collection_ids: set[str] = set()
    for selected in selected_providers:
        collection_ids.update(
            coll_id
            for coll_id in selected.collections_config
            if coll_id != GENERIC_COLLECTION
        )

    collections = CollectionsList(
        [self.collections_config[coll_id] for coll_id in collection_ids]
    )
    collections.sort(key=lambda coll: coll.id)
    return collections
[docs]
def fetch_collections_list(self, provider: Optional[str] = None) -> None:
    """Fetch collections list and update if needed.

    If strict mode is enabled (by setting the ``EODAG_STRICT_COLLECTIONS`` environment variable
    to a truthy value), this method will not fetch or update collections and will return immediately.

    It also returns without doing anything when every fetchable provider has
    already been fetched. Otherwise, the external collections configuration
    (read from ``EODAG_EXT_COLLECTIONS_CFG_FILE`` when set) is applied or, when
    it is empty, collections are discovered from the providers. Fetchable
    providers whose discovery configuration differs from the default one, or
    that are missing from the external configuration, are then discovered
    individually.

    :param provider: The name of a provider or provider-group for which collections
        list should be updated. Defaults to all providers (None value).
    """
    if is_env_var_true("EODAG_STRICT_COLLECTIONS"):
        return

    # discovery confs of the fetchable providers, by provider name
    fetchable_discovery_confs: dict[str, PluginConfig.DiscoverCollections] = {}
    # check if any provider has not already been fetched for collections
    already_fetched = True
    for provider_to_fetch in self._providers.filter_by_name_or_group(provider):
        if not (provider_to_fetch.fetchable and provider_to_fetch.search_config):
            continue
        fetchable_discovery_confs[
            provider_to_fetch.name
        ] = provider_to_fetch.search_config.discover_collections
        if not provider_to_fetch.collections_fetched:
            already_fetched = False

    if already_fetched:
        # Every update below only happens for not-yet-fetched lists: return now
        # instead of reloading and comparing the whole default configuration.
        return

    # get ext_collections conf
    ext_collections_cfg_file = os.getenv("EODAG_EXT_COLLECTIONS_CFG_FILE")
    if ext_collections_cfg_file is not None:
        ext_collections_conf = get_ext_collections_conf(ext_collections_cfg_file)
    else:
        ext_collections_conf = get_ext_collections_conf()
    if not ext_collections_conf:
        # empty ext_collections conf: discover collections from the providers
        ext_collections_conf = self.discover_collections(provider=provider) or {}
    # update eodag collections list with new conf
    self.update_collections_list(ext_collections_conf)

    # Compare each fetchable provider with its default version to see if it has
    # been modified and its collections list would need to be fetched
    default_providers = ProvidersDict.from_configs(load_default_config())
    for provider_name, user_discovery_conf in fetchable_discovery_confs.items():
        if provider_name in default_providers:
            default_provider = default_providers[provider_name]
            if not default_provider.search_config:
                continue
            default_discovery_conf = (
                default_provider.search_config.discover_collections
            )
            # compare confs (care, some providers do not have result_type property)
            if default_discovery_conf.get("result_type") == "json" and isinstance(
                default_discovery_conf["results_entry"], str
            ):
                default_discovery_conf_parsed = self._parse_json_discovery_conf(
                    default_discovery_conf
                )
            else:
                default_discovery_conf_parsed = default_discovery_conf
            unmodified = (
                user_discovery_conf == default_discovery_conf
                or user_discovery_conf == default_discovery_conf_parsed
            )
            # nothing more to fetch: not discoverable, already covered by
            # ext_collections_conf, or ext_collections_conf came out empty
            covered = (
                not default_discovery_conf.get("fetch_url")
                or provider_name in ext_collections_conf
                or not ext_collections_conf
            )
            if unmodified and covered:
                continue
        # providers not skipped here should be user-modified
        # or not in ext_collections_conf (if eodag system conf != eodag conf used for ext_collections_conf)
        provider_ext_collections_conf = (
            self.discover_collections(provider=provider_name) or {}
        )
        self.update_collections_list(provider_ext_collections_conf)

@staticmethod
def _parse_json_discovery_conf(
    discovery_conf: PluginConfig.DiscoverCollections,
) -> PluginConfig.DiscoverCollections:
    """Build a parsed version of a raw JSON ``discover_collections`` conf.

    ``results_entry`` is converted with :func:`string_to_jsonpath` and the
    ``generic_collection_*`` entries with
    :func:`mtd_cfg_as_conversion_and_querypath`, so that the result can be
    compared with a discovery conf that already went through this parsing
    (presumably done at plugin initialization — not visible from here).

    :param discovery_conf: raw discovery conf; must contain ``results_entry``,
        ``generic_collection_id``, ``generic_collection_parsable_properties``
        and ``generic_collection_parsable_metadata``
    :returns: a new dict with the parsed entries overriding the raw ones
    """
    return cast(
        PluginConfig.DiscoverCollections,
        {
            **discovery_conf,
            "results_entry": string_to_jsonpath(
                discovery_conf["results_entry"], force=True
            ),
            **mtd_cfg_as_conversion_and_querypath(
                dict(generic_collection_id=discovery_conf["generic_collection_id"])
            ),
            "generic_collection_parsable_properties": mtd_cfg_as_conversion_and_querypath(
                discovery_conf["generic_collection_parsable_properties"]
            ),
            "generic_collection_parsable_metadata": mtd_cfg_as_conversion_and_querypath(
                discovery_conf["generic_collection_parsable_metadata"]
            ),
        },
    )
[docs]
def discover_collections(
    self, provider: Optional[str] = None
) -> Optional[dict[str, Any]]:
    """Fetch providers for collections

    Only fetchable providers whose search plugin still has a
    ``discover_collections.fetch_url`` after initialization are queried. A
    provider needing auth that cannot be obtained is mapped to ``None``.

    :param provider: The name of a provider or provider-group to fetch. Defaults to
        all providers (None value).
    :returns: external collections configuration, keyed by provider name
    :raises: :class:`~eodag.utils.exceptions.UnsupportedProvider`
    """
    # read the internal providers: the public ``providers`` property would
    # deep-copy all of them just to be iterated here
    selected_providers = list(self._providers.filter_by_name_or_group(provider))
    if provider and not any(selected_providers):
        raise UnsupportedProvider(
            f"The requested provider is not (yet) supported: {provider}"
        )

    ext_collections_conf: dict[str, Any] = {}
    for p in selected_providers:
        # a provider without search config is skipped instead of discarding the
        # collections already discovered for the other providers
        if not p.search_config or not p.fetchable:
            continue
        search_plugin: Union[Search, Api] = next(
            self._plugins_manager.get_search_plugins(provider=p.name)
        )
        # check after plugin init if still fetchable
        if not getattr(search_plugin.config, "discover_collections", {}).get(
            "fetch_url"
        ):
            continue
        # kwargs are built per provider: sharing them across iterations would
        # send the auth of one provider to the discovery of the following ones
        discovery_kwargs: dict[str, Any] = {}
        if getattr(search_plugin.config, "need_auth", False):
            auth = self._plugins_manager.get_auth(
                search_plugin.provider,
                getattr(search_plugin.config, "api_endpoint", None),
                search_plugin.config,
            )
            if not auth:
                logger.debug(
                    f"Could not authenticate on {p} for collections discovery"
                )
                ext_collections_conf[p.name] = None
                continue
            discovery_kwargs["auth"] = auth
        ext_collections_conf[p.name] = search_plugin.discover_collections(
            **discovery_kwargs
        )
    return sort_dict(ext_collections_conf)
[docs]
def update_collections_list(
    self, ext_collections_conf: dict[str, Optional[dict[str, dict[str, Any]]]]
) -> None:
    """Update eodag collections list

    For each known provider of ``ext_collections_conf`` with a non-empty entry
    and still fetchable, every collection of ``entry["providers_config"]`` is
    added to the provider collections config and to :attr:`collections_config`
    (built from ``entry["collections_config"]``), unless the provider already
    has it, or already has a collection whose conf contains it (provider
    unparsable properties being ignored in that comparison). Collections whose
    id cannot be validated are skipped. Known providers are then flagged as
    ``collections_fetched``, except the ones skipped as not fetchable. Unknown
    providers are ignored.

    :param ext_collections_conf: external collections configuration
    """
    for provider, new_collections_conf in ext_collections_conf.items():
        if provider not in self._providers:
            # unknown provider
            continue
        if new_collections_conf:
            try:
                provider_obj = self._providers[provider]
                if not provider_obj.fetchable:
                    # conf has been updated and provider collections are no more discoverable
                    continue
                provider_products_config = provider_obj.collections_config or {}
            except UnsupportedProvider:
                logger.debug(
                    "Ignoring external collections for unknown provider %s",
                    provider,
                )
                continue
            # loop invariant, read once per provider
            unparsable_keys = provider_obj.unparsable_properties

            new_collections: list[str] = []
            bad_formatted_col_count = 0
            for new_collection, new_collection_conf in new_collections_conf[
                "providers_config"
            ].items():
                if new_collection in provider_products_config:
                    continue
                # compare parsed extracted conf (without unparsable entries); built
                # once per new collection instead of once per existing collection
                new_parsed_items = {
                    k: v
                    for k, v in new_collection_conf.items()
                    if k not in unparsable_keys
                }.items()
                if any(
                    new_parsed_items <= existing_conf.items()
                    for existing_conf in provider_products_config.values()
                ):
                    # new_collection_conf is a subset of an existing conf
                    continue
                new_collection_fields = new_collections_conf["collections_config"][
                    new_collection
                ]
                try:
                    # new_collection_conf does not already exist, append it
                    # to self.collections_config
                    new_coll_obj = Collection.create_with_dag(
                        self, id=new_collection, **new_collection_fields
                    )
                    self.collections_config[new_coll_obj._id] = new_coll_obj
                except ValidationError:
                    # skip collection if there is a problem with its id (missing or not a string)
                    logger.debug(
                        (
                            "Collection %s has been pruned on provider %s "
                            "because its id was incorrectly parsed for eodag"
                        ),
                        new_collection,
                        provider,
                    )
                    continue
                # and to provider_products_config
                provider_products_config[new_collection] = new_collection_conf
                new_collections.append(new_collection)
                # count collections whose external config had attributes that
                # the collection model dropped or altered
                dumped_collection = self.collections_config[
                    new_coll_obj._id
                ].model_dump()
                if {**dumped_collection, **new_collection_fields} != dumped_collection:
                    bad_formatted_col_count += 1

            if new_collections:
                logger.debug(
                    "Added %s collections for %s", len(new_collections), provider
                )
            if bad_formatted_col_count > 0:
                logger.debug(
                    "bad formatted attributes skipped for %s collection(s) on %s",
                    bad_formatted_col_count,
                    provider,
                )
        self._providers[provider].collections_fetched = True

    # re-create _plugins_manager using up-to-date providers_config
    self._plugins_manager.build_collection_to_provider_config_map()
[docs]
@_deprecated(
    reason="Please use 'EODataAccessGateway.providers' instead",
    version="4.0.0",
)
def available_providers(
    self, collection: Optional[str] = None, by_group: bool = False
) -> list[str]:
    """Gives the sorted list of the available providers or groups

    .. deprecated:: v4.0.0
        Please use :attr:`eodag.api.core.EODataAccessGateway.providers` instead.

    The providers or groups are sorted first by their priority level in descending order,
    and then alphabetically in ascending order for providers or groups with the same
    priority level. A group takes the highest priority among its members.

    :param collection: (optional) Only list providers configured for this collection
    :param by_group: (optional) If set to True, list groups when available instead
        of providers, mixed with other providers
    :returns: the sorted list of the available providers or groups
    """
    # highest priority seen for each listed name (provider or group)
    best_priorities: dict[str, int] = {}
    for key, provider in self._providers.items():
        if collection and collection not in provider.collections_config:
            continue
        group = getattr(provider.config, "group", None)
        name = group if by_group and group else key
        if name not in best_priorities or provider.priority > best_priorities[name]:
            best_priorities[name] = provider.priority
    # Sort only once groups are known: sorting providers beforehand ordered a
    # group by its top member's provider name instead of by the group name.
    ranked = sorted(best_priorities.items(), key=lambda item: (-item[1], item[0]))
    return [name for name, _ in ranked]
def get_collection_from_alias(self, alias_or_id: str) -> str:
    """Return the id of a collection by either its id or alias

    :param alias_or_id: Alias of the collection. If an existing id is given, this
        method will directly return the given value.
    :returns: Internal name of the collection.
    :raises: :class:`~eodag.utils.exceptions.NoMatchingCollection` when nothing
        matches, or when several collections match the alias
    """
    collections = [
        coll for coll in self.collections_config.values() if coll.id == alias_or_id
    ]
    if not collections:
        # no alias match: accept the value only if it is a known internal id
        if alias_or_id not in self.collections_config:
            raise NoMatchingCollection(
                f"Could not find collection from alias or id {alias_or_id}"
            )
        return alias_or_id
    if len(collections) > 1:
        raise NoMatchingCollection(
            f"Too many matching collections for alias {alias_or_id}: {collections}"
        )
    matched = collections[0]
    return matched._id or matched.id
def get_alias_from_collection(self, collection: str) -> str:
    """Return the alias of a collection by its id. If no alias was defined for the
    given collection, its id is returned instead.

    :param collection: collection id
    :returns: Alias of the collection or its id if no alias has been defined for it.
    :raises: :class:`~eodag.utils.exceptions.NoMatchingCollection` for an unknown id
    """
    if collection in self.collections_config:
        return self.collections_config[collection].alias or collection
    raise NoMatchingCollection(collection)
[docs]
def guess_collection(
    self,
    free_text: Optional[str] = None,
    intersect: bool = False,
    instruments: Optional[str] = None,
    platform: Optional[str] = None,
    constellation: Optional[str] = None,
    processing_level: Optional[str] = None,
    sensor_type: Optional[str] = None,
    keywords: Optional[str] = None,
    description: Optional[str] = None,
    title: Optional[str] = None,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    **kwargs: Any,
) -> CollectionsList:
    """
    Find EODAG collection IDs that best match a set of search parameters.

    When using several filters, collections that match most of them will be returned at first.

    :param free_text: Free text search filter used to search across all the following parameters. Handles logical
        operators with parenthesis (``AND``/``OR``/``NOT``), quoted phrases (``"exact phrase"``),
        ``*`` and ``?`` wildcards.
    :param intersect: Join results for each parameter using INTERSECT instead of UNION.
    :param instruments: Instruments parameter.
    :param platform: Platform parameter.
    :param constellation: Constellation parameter.
    :param processing_level: Processing level parameter.
    :param sensor_type: Sensor type parameter.
    :param keywords: Keywords parameter.
    :param description: description parameter.
    :param title: Title parameter.
    :param start_date: start date for datetime filtering. Not used by free_text
    :param end_date: end date for datetime filtering. Not used by free_text
    :param kwargs: when a ``collection`` entry is given, it is resolved as an id,
        then as an alias, and returned alone (wrapped in a new :class:`Collection`
        when unknown); all other filters are then ignored.
    :returns: The best match for the given parameters.
    :raises: :class:`~eodag.utils.exceptions.NoMatchingCollection`
    """
    if collection := kwargs.get("collection"):
        if collection in self.collections_config:
            return CollectionsList([self.collections_config[collection]])
        try:
            collection = self.get_collection_from_alias(collection)
            return CollectionsList([self.collections_config[collection]])
        except NoMatchingCollection:
            return CollectionsList([Collection.create_with_dag(self, id=collection)])

    filters: dict[str, str] = {
        k: v
        for k, v in {
            "instruments": instruments,
            "constellation": constellation,
            "platform": platform,
            "processing:level": processing_level,
            "eodag:sensor_type": sensor_type,
            "keywords": keywords,
            "description": description,
            "title": title,
        }.items()
        if v is not None
    }
    only_dates = not free_text and not filters and bool(start_date or end_date)

    # Everything below does not depend on the collection being scored: build it
    # once instead of recompiling queries / reparsing dates for each collection.
    free_text_evaluator = (
        compile_free_text_query(free_text) if free_text else lambda _: True
    )
    filters_evaluators = {
        filter_name: compile_free_text_query(value)
        for filter_name, value in filters.items()
    }
    # collection attribute targeted by each filter alias
    filters_attributes = {
        filter_name: Collection.get_collection_mtd_from_alias(filter_name)
        for filter_name in filters
    }
    filters_matching_method = all if intersect else any
    min_aware = dt.datetime.min.replace(tzinfo=dt.timezone.utc)
    max_aware = dt.datetime.max.replace(tzinfo=dt.timezone.utc)
    requested_start = rfc3339_str_to_datetime(start_date) if start_date else min_aware
    requested_end = rfc3339_str_to_datetime(end_date) if end_date else max_aware

    def _interval_bound(value: Any, fallback: dt.datetime) -> dt.datetime:
        # strings are parsed as RFC 3339; other truthy values are used as-is
        # (presumably datetimes); empty bounds are open, hence the fallback
        if value and isinstance(value, str):
            return rfc3339_str_to_datetime(value)
        return value or fallback

    guesses_with_score: list[tuple[str, int]] = []
    for col, col_f in self.collections_config.items():
        if (
            col == GENERIC_COLLECTION
            or col not in self._plugins_manager.collection_to_provider_config_map
        ):
            continue
        score = 0  # how many filters matched

        # free text search
        if free_text:
            if free_text_evaluator(col_f.model_dump()):
                score += 1
            elif intersect:
                continue  # must match all filters

        # individual filters
        if filters:
            col_attributes = col_f.__dict__
            filter_matches = [
                filters_evaluators[filter_name](
                    {filter_name: col_attributes[attribute]}
                )
                for filter_name, attribute in filters_attributes.items()
                if attribute in col_attributes
            ]
            if filters_matching_method(filter_matches):
                # add number of True matches to score
                score += sum(filter_matches)
            elif intersect:
                continue  # must match all filters

        if score == 0 and not only_dates:
            continue

        # datetime filtering: keep collections whose extent overlaps the request
        if start_date or end_date:
            col_interval = col_f.extent.temporal.interval[0]
            col_start = _interval_bound(col_interval[0], min_aware)
            col_end = _interval_bound(col_interval[1], max_aware)
            if not (max(requested_start, col_start) <= min(requested_end, col_end)):
                continue

        guesses_with_score.append((col_f._id, score))

    if guesses_with_score:
        # sort by score descending, then col for stability
        guesses_with_score.sort(key=lambda x: (-x[1], x[0]))
        return CollectionsList(
            [self.collections_config[col] for col, _ in guesses_with_score]
        )

    raise NoMatchingCollection()
[docs]
def search(
    self,
    page: int = DEFAULT_PAGE,
    limit: Optional[int] = DEFAULT_LIMIT,
    items_per_page: Optional[int] = DEFAULT_LIMIT,
    raise_errors: bool = False,
    start: Optional[str] = None,
    end: Optional[str] = None,
    geom: Optional[Union[str, dict[str, float], BaseGeometry]] = None,
    locations: Optional[dict[str, str]] = None,
    provider: Optional[str] = None,
    count: bool = False,
    validate: Optional[bool] = True,
    **kwargs: Any,
) -> SearchResult:
    """Look for products matching criteria on known providers.

    The default behaviour is to look for products on the provider with the
    highest priority supporting the requested collection. These priorities
    are configurable through user configuration file or individual environment variable.
    If the request to the provider with the highest priority fails or is empty, the data
    will be requested from the provider with the next highest priority.
    Only if the request fails for all available providers, an error will be thrown.

    :param page: (optional) The page number to return (**deprecated**, use
                 :meth:`eodag.api.search_result.SearchResult.next_page` instead)
    :param limit: (optional) The number of results that must appear in one single
                  page. If ``None``, the maximum number possible will be used.
    :param items_per_page: (optional) The number of results that must appear in one single
                           page. If ``None``, the maximum number possible will be used.
                           (**deprecated**, use ``limit`` instead)
    :param raise_errors: (optional) When an error occurs when searching, if this is set to
                         True, the error is raised
    :param start: (optional) Start sensing time in ISO 8601 format (e.g. "1990-11-26",
                  "1990-11-26T14:30:10.153Z", "1990-11-26T14:30:10+02:00", ...).
                  If no time offset is given, the time is assumed to be given in UTC.
    :param end: (optional) End sensing time in ISO 8601 format (e.g. "1990-11-26",
                "1990-11-26T14:30:10.153Z", "1990-11-26T14:30:10+02:00", ...).
                If no time offset is given, the time is assumed to be given in UTC.
    :param geom: (optional) Search area that can be defined in different ways:

                 * with a Shapely geometry object:
                   :class:`shapely.geometry.base.BaseGeometry`
                 * with a bounding box (dict with keys: "lonmin", "latmin", "lonmax", "latmax"):
                   ``dict.fromkeys(["lonmin", "latmin", "lonmax", "latmax"])``
                 * with a bounding box as list of float:
                   ``[lonmin, latmin, lonmax, latmax]``
                 * with a WKT str
    :param locations: (optional) Location filtering by name using locations configuration
                      ``{"<location_name>"="<attr_regex>"}``. For example, ``{"country"="PA."}`` will use
                      the geometry of the features having the property ISO3 starting with
                      'PA' such as Panama and Pakistan in the shapefile configured with
                      name=country and attr=ISO3
    :param provider: (optional) the provider to be used. If set, search fallback will be disabled.
                     If not set, the configured preferred provider will be used at first
                     before trying others until finding results.
    :param count: (optional) Whether to run a query with a count request or not
    :param validate: (optional) Set to True to validate search parameters
                     before sending the query to the provider
    :param kwargs: Some other criteria that will be used to do the search,
                   using parameters compatible with the provider
    :returns: A set of EO products matching the criteria. When no provider returns
              anything, an empty result carrying the collected ``errors``.

    .. versionchanged:: v3.0.0b1
        ``search()`` method now returns only a single :class:`~eodag.api.search_result.SearchResult`
        instead of a 2 values tuple.

    .. note::
        The search interfaces, which are implemented as plugins, are required to
        return a list as a result of their processing. This requirement is
        enforced here.
    """
    if page != DEFAULT_PAGE:
        warnings.warn(
            "Usage of deprecated search parameter 'page' "
            "(Please use 'SearchResult.next_page()' instead)"
            " -- Deprecated since v4.0.0",
            DeprecationWarning,
            stacklevel=2,
        )
    search_plugins, search_kwargs = self._prepare_search(
        start=start,
        end=end,
        geom=geom,
        locations=locations,
        provider=provider,
        **kwargs,
    )
    if search_kwargs.get("id"):
        # Don't validate requests by ID. "id" is not queryable.
        return self._search_by_id(
            search_kwargs.pop("id"),
            provider=provider,
            raise_errors=raise_errors,
            validate=False,
            **search_kwargs,
        )
    # remove datacube query string from kwargs which was only needed for search-by-id
    search_kwargs.pop("_dc_qs", None)
    # add page parameter
    search_kwargs["page"] = page

    # use deprecated items_per_page if no limit given; this does not depend on the
    # provider, so it is resolved (and warned about) once, before the provider loop
    if (not limit or limit == DEFAULT_LIMIT) and (
        items_per_page and items_per_page != DEFAULT_LIMIT
    ):
        limit = items_per_page
        warnings.warn(
            "Usage of deprecated search parameter 'items_per_page' "
            "(Please use 'limit' instead)"
            " -- Deprecated since v4.0.0",
            DeprecationWarning,
            stacklevel=2,
        )

    errors: list[tuple[str, Exception]] = []
    last_index = len(search_plugins) - 1
    # Loop over available providers and return the first non-empty results
    for i, search_plugin in enumerate(search_plugins):
        search_plugin.clear()
        # a None limit means "as many as the provider allows"
        search_kwargs["limit"] = (
            limit
            if limit is not None
            else getattr(search_plugin.config, "pagination", {}).get(
                "max_limit", DEFAULT_MAX_LIMIT
            )
        )
        search_results = self._do_search(
            search_plugin,
            count=count,
            raise_errors=raise_errors,
            validate=validate,
            **search_kwargs,
        )
        errors.extend(search_results.errors)
        if len(search_results) > 0:
            search_results.errors = errors
            if count and search_results.number_matched:
                logger.info(
                    "Found %s result(s) on provider '%s'",
                    search_results.number_matched,
                    search_results[0].provider,
                )
            return search_results
        if i < last_index:
            logger.warning(
                f"No result could be obtained from provider {search_plugin.provider}, "
                "we will try to get the data from another provider",
            )

    # Only meaningful when a fallback actually happened (more than one provider tried).
    # Using len() instead of the loop index also avoids an unbound name when no
    # search plugin is available at all.
    if len(search_plugins) > 1:
        logger.error("No result could be obtained from any available provider")
    return SearchResult([], 0, errors) if count else SearchResult([], errors=errors)
[docs]
@_deprecated(
    reason="Please use 'SearchResult.next_page()' instead",
    version="4.0.0",
)
def search_iter_page(
    self,
    limit: int = DEFAULT_LIMIT,
    items_per_page: Optional[int] = DEFAULT_LIMIT,
    start: Optional[str] = None,
    end: Optional[str] = None,
    geom: Optional[Union[str, dict[str, float], BaseGeometry]] = None,
    locations: Optional[dict[str, str]] = None,
    **kwargs: Any,
) -> Iterator[SearchResult]:
    """Iterate over the pages of a products search.

    .. deprecated:: v4.0.0
        Please use :meth:`eodag.api.search_result.SearchResult.next_page` instead.

    Search parameters are prepared eagerly (so invalid parameters raise at call
    time), pages are then fetched lazily. If a provider fails with a
    :class:`RequestError` before delivering any page, the next provider is tried.
    Once pages have been delivered, an error is propagated instead of restarting
    from the first page on another provider (which would duplicate results).

    :param limit: (optional) The number of results requested per page
    :param items_per_page: (optional) The number of results requested per page
                           (**deprecated**, use ``limit`` instead)
    :param start: (optional) Start sensing time in ISO 8601 format (e.g. "1990-11-26",
                  "1990-11-26T14:30:10.153Z", "1990-11-26T14:30:10+02:00", ...).
                  If no time offset is given, the time is assumed to be given in UTC.
    :param end: (optional) End sensing time in ISO 8601 format (e.g. "1990-11-26",
                "1990-11-26T14:30:10.153Z", "1990-11-26T14:30:10+02:00", ...).
                If no time offset is given, the time is assumed to be given in UTC.
    :param geom: (optional) Search area that can be defined in different ways:

                 * with a Shapely geometry object:
                   :class:`shapely.geometry.base.BaseGeometry`
                 * with a bounding box (dict with keys: "lonmin", "latmin", "lonmax", "latmax"):
                   ``dict.fromkeys(["lonmin", "latmin", "lonmax", "latmax"])``
                 * with a bounding box as list of float:
                   ``[lonmin, latmin, lonmax, latmax]``
                 * with a WKT str
    :param locations: (optional) Location filtering by name using locations configuration
                      ``{"<location_name>"="<attr_regex>"}``. For example, ``{"country"="PA."}`` will use
                      the geometry of the features having the property ISO3 starting with
                      'PA' such as Panama and Pakistan in the shapefile configured with
                      name=country and attr=ISO3
    :param kwargs: Some other criteria that will be used to do the search,
                   using parameters compatible with the provider
    :returns: An iterator that yields page per page a set of EO products
              matching the criteria
    :raises RequestError: if no search plugin is available (at call time), or
                          during iteration if every provider failed
    """
    search_plugins, search_kwargs = self._prepare_search(
        start=start, end=end, geom=geom, locations=locations, **kwargs
    )
    # use deprecated items_per_page if limit is not given
    if (not limit or limit == DEFAULT_LIMIT) and (
        items_per_page and items_per_page != DEFAULT_LIMIT
    ):
        limit = items_per_page
        warnings.warn(
            "Usage of deprecated search parameter 'items_per_page' "
            "(Please use 'limit' instead)"
            " -- Deprecated since v4.0.0",
            DeprecationWarning,
            stacklevel=2,
        )
    if not search_plugins:
        raise RequestError("No result could be obtained from any available provider")

    def _iter_pages_with_fallback() -> Iterator[SearchResult]:
        # search_iter_page_plugin is a generator: provider errors only surface
        # while iterating it, so the fallback must wrap the iteration itself
        last_index = len(search_plugins) - 1
        for i, search_plugin in enumerate(search_plugins):
            delivered_pages = False
            try:
                for page_result in self.search_iter_page_plugin(
                    limit=limit,
                    search_plugin=search_plugin,
                    **search_kwargs,
                ):
                    delivered_pages = True
                    yield page_result
            except RequestError:
                if delivered_pages:
                    # restarting on another provider would yield duplicated pages
                    raise
                if i < last_index:
                    logger.warning(
                        "No result could be obtained from provider %s, "
                        "we will try to get the data from another provider",
                        search_plugin.provider,
                    )
                    continue
                logger.error(
                    "No result could be obtained from any available provider"
                )
                raise
            # provider answered without error: no fallback
            return

    return _iter_pages_with_fallback()
@_deprecated(
    reason="Please use 'SearchResult.next_page()' instead",
    version="4.0.0",
)
def search_iter_page_plugin(
    self,
    search_plugin: Union[Search, Api],
    limit: int = DEFAULT_LIMIT,
    items_per_page: Optional[int] = DEFAULT_LIMIT,
    **kwargs: Any,
) -> Iterator[SearchResult]:
    """Yield the successive result pages produced by one search plugin.

    .. deprecated:: v4.0.0
        Please use :meth:`eodag.api.search_result.SearchResult.next_page` instead.

    The first page is requested with ``raise_errors=True`` (any caller-supplied
    ``raise_errors`` is discarded); following pages come from
    :meth:`~eodag.api.search_result.SearchResult.next_page` and iteration stops
    at the first empty page.

    :param search_plugin: search plugin to be used
    :param limit: (optional) The number of results requested per page
    :param items_per_page: (optional) The number of results requested per page
                           (**deprecated**, use ``limit`` instead)
    :param kwargs: Some other criteria that will be used to do the search,
                   using parameters compatible with the provider
    :returns: An iterator that yields page per page a set of EO products
              matching the criteria
    :raises Exception: whatever the first page request raised, after logging it
    """
    limit_left_default = not limit or limit == DEFAULT_LIMIT
    custom_items_per_page = bool(items_per_page) and items_per_page != DEFAULT_LIMIT
    if limit_left_default and custom_items_per_page:
        limit = items_per_page
        warnings.warn(
            "Usage of deprecated search parameter 'items_per_page' "
            "(Please use 'limit' instead)"
            " -- Deprecated since v4.0.0",
            DeprecationWarning,
            stacklevel=2,
        )
    kwargs["page"] = 1
    kwargs["limit"] = limit
    # raise_errors is forced to True for the first request
    kwargs.pop("raise_errors", None)
    try:
        first_page = self._do_search(search_plugin, raise_errors=True, **kwargs)
        first_page.raise_errors = True
    except Exception:
        logger.warning(
            "error at retrieval of data from %s, for params: %s",
            search_plugin.provider,
            str(kwargs),
        )
        raise
    if len(first_page) == 0:
        return
    # params stored for next_page(): never count again, and let it manage pages
    if kwargs.get("count") is True:
        kwargs["count"] = False
    kwargs.pop("page", None)
    first_page.search_params = kwargs
    if first_page._dag is None:
        first_page._dag = self
    yield first_page
    yield from itertools.takewhile(
        lambda following_page: len(following_page) > 0, first_page.next_page()
    )
[docs]
def search_all(
    self,
    limit: Optional[int] = None,
    items_per_page: Optional[int] = None,
    start: Optional[str] = None,
    end: Optional[str] = None,
    geom: Optional[Union[str, dict[str, float], BaseGeometry]] = None,
    locations: Optional[dict[str, str]] = None,
    **kwargs: Any,
) -> SearchResult:
    """Search and return all the products matching the search criteria.

    It iterates over the pages of a search query and collects all the returned
    products into a single :class:`~eodag.api.search_result.SearchResult` instance.
    Requests are attempted to all providers of the product ordered by descending priority.

    :param limit: (optional) The number of results requested internally per
                  page. The maximum number of items than can be requested
                  at once to a provider has been configured in EODAG for
                  some of them. If limit is None and this number
                  is available for the searched provider, it is used to
                  limit the number of requests made. This should also
                  reduce the time required to collect all the products
                  matching the search criteria. If this number is not
                  available, the default maximum limit (``DEFAULT_MAX_LIMIT``)
                  is used instead. limit can also be set to any arbitrary value.
    :param items_per_page: (optional) The number of results requested internally per page
                           (**deprecated**, use ``limit`` instead)
    :param start: (optional) Start sensing time in ISO 8601 format (e.g. "1990-11-26",
                  "1990-11-26T14:30:10.153Z", "1990-11-26T14:30:10+02:00", ...).
                  If no time offset is given, the time is assumed to be given in UTC.
    :param end: (optional) End sensing time in ISO 8601 format (e.g. "1990-11-26",
                "1990-11-26T14:30:10.153Z", "1990-11-26T14:30:10+02:00", ...).
                If no time offset is given, the time is assumed to be given in UTC.
    :param geom: (optional) Search area that can be defined in different ways:

                 * with a Shapely geometry object:
                   :class:`shapely.geometry.base.BaseGeometry`
                 * with a bounding box (dict with keys: "lonmin", "latmin", "lonmax", "latmax"):
                   ``dict.fromkeys(["lonmin", "latmin", "lonmax", "latmax"])``
                 * with a bounding box as list of float:
                   ``[lonmin, latmin, lonmax, latmax]``
                 * with a WKT str
    :param locations: (optional) Location filtering by name using locations configuration
                      ``{"<location_name>"="<attr_regex>"}``. For example, ``{"country"="PA."}`` will use
                      the geometry of the features having the property ISO3 starting with
                      'PA' such as Panama and Pakistan in the shapefile configured with
                      name=country and attr=ISO3
    :param kwargs: Some other criteria that will be used to do the search,
                   using parameters compatible with the provider. ``count`` is ignored.
    :returns: A single :class:`~eodag.api.search_result.SearchResult` gathering the
              products of all pages. If fetching a following page fails with a
              :class:`RequestError`, the products collected so far are returned and
              the error is appended to the result's ``errors`` list.
    """
    # remove unwanted count
    kwargs.pop("count", None)
    # use deprecated items_per_page if limit is not given
    if not limit and items_per_page:
        limit = items_per_page
        warnings.warn(
            "Usage of deprecated search parameter 'items_per_page' "
            "(Please use 'limit' instead)"
            " -- Deprecated since v4.0.0",
            DeprecationWarning,
            stacklevel=2,
        )
    # First search
    search_results = self.search(
        limit=limit,
        start=start,
        end=end,
        geom=geom,
        locations=locations,
        **kwargs,
    )
    if len(search_results) == 0:
        return search_results
    try:
        search_results.raise_errors = True
        # consume iterator: next_page(update=True) appends every page to search_results
        deque(search_results.next_page(update=True))
        logger.info(
            "Found %s result(s) on provider '%s'",
            len(search_results),
            search_results[0].provider,
        )
        search_results.number_matched = len(search_results)
    except RequestError as e:
        logger.warning(
            "Found %s result(s) on provider '%s', but it may be incomplete "
            "as it ended with an error",
            len(search_results),
            search_results[0].provider,
        )
        # expose the error to the caller, as search() does for provider errors
        search_results.errors.append((search_results[0].provider, e))
    return search_results
def _search_by_id(
    self, uid: str, provider: Optional[str] = None, **kwargs: Any
) -> SearchResult:
    """Internal method that enables searching a product by its id.

    Keeps requesting providers until a result matching the id is supplied. The
    search plugins should be developed in the way that enable them to handle the
    support of a search by id by the providers. The providers are requested one by
    one, in the order defined by their priorities. Be aware that because of that,
    the search can be slow, if the priority order is such that the provider that
    contains the requested product has the lowest priority. However, you can always
    speed up a little the search by passing the name of the provider on which to
    perform the search, if this information is available.

    Each provider is evaluated on its own results only: products returned by one
    provider are never mixed with those of another when checking uniqueness.

    :param uid: The uid of the EO product
    :param provider: (optional) The provider on which to search the product.
                     This may be useful for performance reasons when the user
                     knows this product is available on the given provider
    :param kwargs: Search criteria to help finding the right product. If
                   ``raise_errors`` is truthy, provider errors are raised instead of
                   being collected. ``_dc_qs`` (datacube query string) is only
                   forwarded to :class:`PostJsonSearch` plugins.
    :returns: A search result with exactly one EO product, or an empty search
              result carrying the errors met on the way
    """
    collection = kwargs.get("collection")
    if collection is not None:
        try:
            collection = self.get_collection_from_alias(collection)
        except NoMatchingCollection:
            logger.debug("collection %s not found", collection)
    search_plugins = self._plugins_manager.get_search_plugins(
        provider=provider, collection=collection
    )
    # datacube query string
    _dc_qs = kwargs.pop("_dc_qs", None)
    errors: list[tuple[str, Exception]] = []
    for plugin in search_plugins:
        logger.info(
            "Searching product with id '%s' on provider: %s", uid, plugin.provider
        )
        logger.debug("Using plugin class for search: %s", plugin.__class__.__name__)
        plugin.clear()
        # adds maximal pagination to be able to do a search-all + crunch if more
        # than one result are returned
        limit = plugin.config.pagination.get("max_limit", DEFAULT_MAX_LIMIT)
        # per-plugin copy: _dc_qs set for a PostJsonSearch plugin must not leak
        # into the query sent by a following plugin of another kind
        plugin_kwargs = dict(kwargs, limit=limit)
        if isinstance(plugin, PostJsonSearch):
            plugin_kwargs["_dc_qs"] = _dc_qs
        results = SearchResult([])
        try:
            # if more than one results are found, try getting them all and then filter using crunch
            for page_results in self.search_iter_page_plugin(
                search_plugin=plugin,
                id=uid,
                **plugin_kwargs,
            ):
                results.data.extend(page_results.data)
        except Exception as e:
            if kwargs.get("raise_errors"):
                raise
            logger.warning(e)
            errors.append((plugin.provider, e))
            continue
        # try using crunch to get unique result
        if (
            len(results) > 1
            and len(filtered := results.filter_property(id=uid)) == 1
        ):
            results = filtered
        if len(results) == 1:
            product = results[0]
            if not product.collection:
                # guess collection from properties; a failed guess must not
                # discard a product that was actually found
                try:
                    guesses = self.guess_collection(**product.properties)
                except NoMatchingCollection:
                    logger.debug("Could not guess collection of product %s", uid)
                else:
                    product.collection = guesses[0].id
                    # reset driver
                    product.driver = product.get_driver()
            results.number_matched = 1
            results.errors = errors
            return results
        if len(results) > 1:
            logger.info(
                "Several products found for this id (%s). You may try searching using more selective criteria.",
                results,
            )
    return SearchResult([], 0, errors)
def _fetch_external_collection(self, provider: str, collection: str) -> None:
    """Discover a single collection on a provider and register it.

    The first search plugin of ``provider`` is asked to discover ``collection``,
    and the discovered configuration is passed to
    ``update_collections_list``. Nothing is done when the provider has no search
    plugin, or when its initialized configuration has no
    ``discover_collections.fetch_url``.

    :param provider: name of the provider to query
    :param collection: id of the collection to look for
    """
    # iter() so that both an iterator and a plain sequence are accepted; a
    # default avoids leaking a bare StopIteration when no plugin is available
    plugin = next(
        iter(self._plugins_manager.get_search_plugins(provider=provider)), None
    )
    if plugin is None:
        logger.debug("No search plugin available for provider %s", provider)
        return None
    # check after plugin init if still fetchable
    discover_conf = getattr(plugin.config, "discover_collections", None) or {}
    if not discover_conf.get("fetch_url"):
        return None
    kwargs: dict[str, Any] = {"collection": collection}
    # append auth if needed
    if getattr(plugin.config, "need_auth", False):
        if auth := self._plugins_manager.get_auth(
            plugin.provider,
            getattr(plugin.config, "api_endpoint", None),
            plugin.config,
        ):
            kwargs["auth"] = auth
    collection_config = plugin.discover_collections(**kwargs)
    self.update_collections_list({provider: collection_config})
    return None
def _prepare_search(
    self,
    start: Optional[str] = None,
    end: Optional[str] = None,
    geom: Optional[Union[str, dict[str, float], BaseGeometry]] = None,
    locations: Optional[dict[str, str]] = None,
    provider: Optional[str] = None,
    **kwargs: Any,
) -> tuple[list[Union[Search, Api]], dict[str, Any]]:
    """Internal method to prepare the search kwargs and get the search plugins.

    Product query:

    * By id (plus optional 'provider')
    * By search params:

      * collection query:

        * By collection (e.g. 'S2_MSI_L1C')
        * By params (e.g. 'platform'), see guess_collection

      * dates: 'start' and/or 'end'
      * geometry: 'geom' or 'bbox' or 'box'
      * search locations
      * TODO: better expose cloudCover
      * other search params are passed to Searchplugin.query()

    :param start: (optional) Start sensing time in ISO 8601 format (e.g. "1990-11-26",
                  "1990-11-26T14:30:10.153Z", "1990-11-26T14:30:10+02:00", ...).
                  If no time offset is given, the time is assumed to be given in UTC.
    :param end: (optional) End sensing time in ISO 8601 format (e.g. "1990-11-26",
                "1990-11-26T14:30:10.153Z", "1990-11-26T14:30:10+02:00", ...).
                If no time offset is given, the time is assumed to be given in UTC.
    :param geom: (optional) Search area that can be defined in different ways (see search)
    :param locations: (optional) Location filtering by name using locations configuration
    :param provider: provider to be used, if no provider is given or the collection
                     is not available for the provider, the preferred provider is used
    :param kwargs: Some other criteria

                   * id and/or a provider for a search by
                   * search criteria to guess the collection
                   * other criteria compatible with the provider
    :returns: Search plugins list (requested or preferred provider first) and the
              prepared kwargs to make a query. When an ``id`` is queried and no
              collection can be guessed, an empty plugins list and the untouched
              kwargs are returned.
    """
    collection: Optional[str] = kwargs.get("collection")
    if collection is None:
        try:
            guesses = self.guess_collection(**kwargs)
        except NoMatchingCollection:
            if kwargs.get("id") is not None:
                # search by id: the caller handles it without a collection
                return [], kwargs
            logger.info("No collection could be guessed with provided arguments")
        else:
            # The supported search params used for the guess are removed from the
            # kwargs, not to propagate them to the query itself.
            for param in (
                "instruments",
                "constellation",
                "platform",
                "processing:level",
                "eodag:sensor_type",
            ):
                kwargs.pop(param, None)
            # By now, only use the best bet
            collection = guesses[0].id
    if collection is not None:
        try:
            collection = self.get_collection_from_alias(collection)
        except NoMatchingCollection:
            logger.info("unknown collection %s", collection)
    kwargs["collection"] = collection

    # dates: explicit start/end take precedence over a datetime filter
    if start is not None:
        if kwargs.pop("datetime", None):
            logger.warning("datetime filter is overwritten by start")
        kwargs["start_datetime"] = start
    if end is not None:
        if kwargs.pop("datetime", None):
            logger.warning("datetime filter is overwritten by end")
        kwargs["end_datetime"] = end
    if not start and not end and "datetime" in kwargs:
        datetimes = get_datetime(kwargs)
        kwargs["start_datetime"] = datetimes[0]
        kwargs["end_datetime"] = datetimes[1]
    # sorting on "datetime" is mapped onto start_datetime; a None sort_by is left
    # as-is (dropped later with other None params) instead of failing to iterate
    if kwargs.get("sort_by"):
        kwargs["sort_by"] = [
            ("start_datetime", param_tuple[1])
            if param_tuple[0] == "datetime"
            else param_tuple
            for param_tuple in kwargs["sort_by"]
        ]

    # geometry
    if geom is not None:
        kwargs["geometry"] = geom
    elif "intersects" in kwargs:
        kwargs["geometry"] = kwargs.pop("intersects")
    box = kwargs.pop("box", None)
    box = kwargs.pop("bbox", box)
    if geom is None and box is not None:
        kwargs["geometry"] = box
    kwargs["locations"] = locations
    kwargs["geometry"] = get_geometry_from_various(self.locations_config, **kwargs)
    # remove locations_args from kwargs now that they have been used
    for location_conf in self.locations_config:
        kwargs.pop(location_conf["name"], None)
    del kwargs["locations"]

    # fetch collections list if collection is unknown (the map attribute is
    # re-read after each fetch as fetching may update it)
    if collection not in self._plugins_manager.collection_to_provider_config_map:
        if provider and collection:
            # fetch ref for given provider and collection
            logger.debug(
                "Fetching external collections sources to find %s %s collection",
                provider,
                collection,
            )
            self.fetch_collections_list(provider)
            if (
                collection
                not in self._plugins_manager.collection_to_provider_config_map
            ):
                # Try to get specific collection from external provider
                logger.debug(
                    "Fetching %s to find %s collection", provider, collection
                )
                self._fetch_external_collection(provider, collection)
        if not provider:
            # no provider -> fetch all external collections
            logger.debug(
                "Fetching external collections sources to find %s collection",
                collection,
            )
            self.fetch_collections_list()

    preferred_provider = self.get_preferred_provider()[0]
    search_plugins: list[Union[Search, Api]] = []
    for plugin in self._plugins_manager.get_search_plugins(
        collection=collection, provider=provider
    ):
        # exclude MeteoblueSearch plugins from search fallback for unknown collection
        if (
            provider != plugin.provider
            and preferred_provider != plugin.provider
            and collection not in self.collections_config
            and isinstance(plugin, MeteoblueSearch)
        ):
            continue
        search_plugins.append(plugin)

    if not provider:
        provider = preferred_provider
    provider_plugin = next(
        (p for p in search_plugins if p.provider == provider), None
    )
    if provider_plugin is None:
        logger.debug(
            "Collection '%s' is not available with preferred provider '%s'.",
            collection,
            provider,
        )
    else:
        # requested (or preferred) provider is tried first
        search_plugins.remove(provider_plugin)
        search_plugins.insert(0, provider_plugin)

    # Add collections_config to plugin config. This dict contains product
    # type metadata that will also be stored in each product's properties.
    if collection is not None:
        for search_plugin in search_plugins:
            self._attach_collection_config(search_plugin, collection)
    return search_plugins, kwargs
def _do_search(
    self,
    search_plugin: Union[Search, Api],
    count: bool = False,
    raise_errors: bool = False,
    validate: Optional[bool] = True,
    **kwargs: Any,
) -> SearchResult:
    """Internal method that performs a search on a given provider.

    :param search_plugin: A search plugin
    :param count: (optional) Whether to run a query with a count request or not
    :param raise_errors: (optional) When an error occurs when searching, if this is set to
                         True, the error is raised
    :param validate: (optional) Set to True to validate search parameters
                     before sending the query to the provider
    :param kwargs: Some other criteria that will be used to do the search.
                   ``limit``, ``page``, ``next_page_token`` and
                   ``next_page_token_key`` drive pagination; params with a
                   ``None`` value are dropped.
    :returns: A collection of EO products matching the criteria, or an empty
              result carrying the error when searching failed and
              ``raise_errors`` is False
    :raises PluginImplementationError: (only if ``raise_errors``) when the plugin
                                       does not return a SearchResult
    """
    logger.info("Searching on provider %s", search_plugin.provider)
    pagination = getattr(search_plugin.config, "pagination", None) or {}
    max_limit = pagination.get("max_limit", DEFAULT_MAX_LIMIT)
    requested_limit = kwargs.get("limit", DEFAULT_LIMIT)
    # limit may legitimately be None; comparing it with an int would raise a
    # TypeError here, outside of the error handling below
    if (
        requested_limit is not None
        and max_limit is not None
        and max_limit > 0
        and requested_limit > max_limit
    ):
        logger.warning(
            "EODAG believes that you might have asked for more products/items "
            "than the maximum allowed by '%s': %s > %s. Try to lower "
            "the value of 'limit' and get the next page (e.g. 'page=2'), "
            "or directly use the 'search_all' method.",
            search_plugin.provider,
            requested_limit,
            max_limit,
        )
    errors: list[tuple[str, Exception]] = []
    try:
        prep = PreparedSearch(count=count)
        prep.raise_errors = raise_errors
        # append auth if needed
        if getattr(search_plugin.config, "need_auth", False):
            if auth := self._plugins_manager.get_auth(
                search_plugin.provider,
                getattr(search_plugin.config, "api_endpoint", None),
                search_plugin.config,
            ):
                prep.auth = auth
        prep.limit = kwargs.pop("limit", None)
        prep.next_page_token = kwargs.pop("next_page_token", None)
        prep.next_page_token_key = kwargs.pop(
            "next_page_token_key", None
        ) or pagination.get("next_page_token_key", "page")
        prep.page = kwargs.pop("page", None)
        # page-based pagination: translate the 1-based page into the provider's token
        if (
            prep.next_page_token_key == "page"
            and prep.limit is not None
            and prep.next_page_token is None
            and prep.page is not None
        ):
            prep.next_page_token = str(
                prep.page - 1 + pagination.get("start_page", DEFAULT_PAGE)
            )

        # remove None values and convert param names to their pydantic alias if any
        search_params = {}
        queryables_fields = Queryables.from_stac_models().model_fields
        # `provider:` or `provider_` prefix; the provider name is escaped as it is
        # embedded in a regular expression
        provider_prefix = re.compile(
            r"^" + re.escape(search_plugin.provider) + r"[_:]"
        )
        for param, value in kwargs.items():
            if value is None:
                continue
            if param in queryables_fields:
                search_params[queryables_fields[param].alias or param] = value
            else:
                search_params[provider_prefix.sub("", param)] = value

        if validate:
            search_plugin.validate(search_params, prep.auth)

        search_result = search_plugin.query(prep, **search_params)
        if not isinstance(search_result, SearchResult):
            raise PluginImplementationError(
                "The query function of a Search plugin must return a SearchResult "
                "results, got {} instead".format(type(search_result))
            )

        # For each product: guess its collection if missing, and attach the plugin
        # capable of downloading it (enables eo_product.download()) when it has a
        # search intersection. Products are not filtered out here.
        non_alnum = re.compile(r"[^\w,]+")
        guess_keys = (
            "instruments",
            "constellation",
            "platform",
            "processing:level",
            "eodag:sensor_type",
            "keywords",
        )
        for eo_product in search_result:
            # if collection is not defined, try to guess using properties
            if eo_product.collection is None:
                try:
                    guesses = self.guess_collection(
                        intersect=False,
                        **{
                            k: non_alnum.sub("", str(v).upper())
                            for k, v in eo_product.properties.items()
                            if k in guess_keys and v is not None
                        },
                    )
                except NoMatchingCollection:
                    pass
                else:
                    eo_product.collection = guesses[0].id
            if eo_product.search_intersection is not None:
                eo_product._register_downloader_from_manager(self._plugins_manager)

        # Make next_page not available if the current one returned less than the maximum number of items asked for.
        if not prep.limit or len(search_result) < prep.limit:
            search_result.next_page_token = None
        search_result._dag = self
        return search_result
    except Exception as e:
        if raise_errors:
            # Raise the error, letting the application wrapping eodag know that
            # something went bad. This way it will be able to decide what to do next
            raise
        logger.exception(
            "Error while searching on provider %s (ignored):",
            search_plugin.provider,
        )
        errors.append((search_plugin.provider, e))
        return SearchResult([], 0, errors)
[docs]
def crunch(self, results: SearchResult, **kwargs: Any) -> SearchResult:
    """Successively apply crunch plugins to a search result.

    Each keyword argument other than ``search_criteria`` names a crunch plugin,
    its value being the dict of arguments used to build that plugin. The
    optional ``search_criteria`` dict is forwarded to every crunch call.

    :param results: The results of a eodag search request
    :returns: The result of successively applying all the filters to the results
    """
    shared_criteria = kwargs.pop("search_criteria", {})
    crunched = results
    for plugin_name, plugin_args in kwargs.items():
        plugin = self._plugins_manager.get_crunch_plugin(plugin_name, **plugin_args)
        crunched = crunched.crunch(plugin, **shared_criteria)
    return crunched
[docs]
@staticmethod
def group_by_extent(searches: list[SearchResult]) -> list[SearchResult]:
    """Merge several search results and regroup their products by footprint.

    Two products share a group when the bounds of their geometries, each
    rounded to 2 decimals and concatenated as strings
    ("{minx}{miny}{maxx}{maxy}"), are identical.

    :param searches: List of eodag SearchResult
    :returns: list of :class:`~eodag.api.search_result.SearchResult`, one per
              distinct extent, in order of first appearance
    """
    groups: dict[str, Any] = {}
    every_product = (product for search in searches for product in search)
    for product in every_product:
        extent_key = "".join(
            str(round(coord, 2)) for coord in product.geometry.bounds
        )
        if extent_key not in groups:
            groups[extent_key] = []
        groups[extent_key].append(product)
    return [SearchResult(same_extent) for same_extent in groups.values()]
[docs]
def download_all(
    self,
    search_result: SearchResult,
    downloaded_callback: Optional[DownloadedCallback] = None,
    progress_callback: Optional[ProgressCallback] = None,
    executor: Optional[ThreadPoolExecutor] = None,
    wait: float = DEFAULT_DOWNLOAD_WAIT,
    timeout: float = DEFAULT_DOWNLOAD_TIMEOUT,
    **kwargs: Unpack[DownloadConf],
) -> list[str]:
    """Download every product of a search result.

    The download plugin resolved for the first product is used for the whole
    batch (all plugins are assumed to rely on ``base.Download.download_all``).

    :param search_result: A set of EO products resulting from a search
    :param downloaded_callback: (optional) A method or a callable object which takes
                                as parameter the ``product``. You can use the base class
                                :class:`~eodag.utils.DownloadedCallback` and override
                                its ``__call__`` method. Will be called each time a product
                                finishes downloading
    :param progress_callback: (optional) A method or a callable object
                              which takes a current size and a maximum
                              size as inputs and handle progress bar
                              creation and update to give the user a
                              feedback on the download progress
    :param executor: (optional) An executor to download EO products of ``search_result`` in parallel
                     which will also be reused to download assets of these products in parallel.
    :param wait: (optional) If download fails, wait time in minutes between
                 two download tries of the same product
    :param timeout: (optional) If download fails, maximum time in minutes
                    before stop retrying to download
    :param kwargs: Additional keyword arguments from the download plugin configuration class that can
                   be provided to override any other values defined in a configuration file
                   or with environment variables:

                   * ``output_dir`` - where to store downloaded products, as an absolute file path
                     (Default: local temporary directory)
                   * ``output_extension`` - downloaded file extension
                   * ``extract`` - whether to extract the downloaded products, only applies to archived products
                   * ``dl_url_params`` - additional parameters to pass over to the download url as an url parameter
                   * ``delete_archive`` - whether to delete the downloaded archives
                   * ``asset`` - regex filter to identify assets to download
    :returns: A collection of the absolute paths to the downloaded products
              (empty when ``search_result`` is empty)
    """
    if not search_result:
        logger.info("Empty search result, nothing to be downloaded !")
        return []
    logger.info("Downloading %s products", len(search_result))
    first_product = search_result[0]
    batch_plugin = self._plugins_manager.get_download_plugin(first_product)
    return batch_plugin.download_all(
        search_result,
        downloaded_callback=downloaded_callback,
        progress_callback=progress_callback,
        executor=executor,
        wait=wait,
        timeout=timeout,
        **kwargs,
    )
[docs]
@staticmethod
def serialize(
    search_result: SearchResult,
    filename: str = "search_results.geojson",
    skip_invalid: bool = True,
) -> str:
    """Registers results of a search into a geojson file.

    The output is a FeatureCollection containing the EO products as features,
    with additional metadata such as ``number_matched``, ``next_page_token``,
    and ``search_params`` stored in the properties. A ``self`` link pointing to
    ``filename`` is appended to its ``links``.

    When the search result is attached to a gateway (its ``_dag`` is not
    ``None``), one ``<collection>.json`` file is also written in the directory
    of ``filename`` for each distinct product collection. Its content comes from
    the gateway's ``collections_config`` (or a bare ``Collection`` built from the
    id when it is not configured), with a ``self`` link appended.

    :param search_result: A set of EO products resulting from a search
    :param filename: (optional) The name of the file to generate
    :param skip_invalid: Whether to skip properties whose values are not valid according to the STAC specification.
    :returns: The name of the created file
    """

    def self_link(href: str) -> dict[str, str]:
        # "self" link appended to every JSON document written below
        return {"rel": "self", "href": href, "type": "application/json"}

    results_dict = search_result.as_dict(skip_invalid=skip_invalid)
    results_dict.setdefault("links", []).append(self_link(f"{filename}"))
    with open(filename, "w") as results_file:
        geojson.dump(results_dict, results_file)
    logger.debug("Search results saved to %s", filename)

    gateway = search_result._dag
    if gateway is None:
        # no gateway: no collections configuration to export
        return filename

    output_dir = Path(filename).parent
    for collection_id in {product.collection for product in search_result}:
        collection_obj = gateway.collections_config.get(
            collection_id, Collection(id=collection_id)
        )
        collection_dict = collection_obj.serialize()
        collection_dict.setdefault("links", []).append(
            self_link(f"{collection_id}.json")
        )
        with open(output_dir / f"{collection_id}.json", "w") as collection_file:
            geojson.dump(collection_dict, collection_file)
            logger.debug(
                "Collection '%s' saved to %s", collection_id, collection_file.name
            )
    return filename
[docs]
@staticmethod
def deserialize(filename: str) -> SearchResult:
    """Loads results of a search from a geojson file.

    No gateway instance is handed to :meth:`SearchResult.from_file` here; use
    :meth:`deserialize_and_register` to also pass this gateway.

    :param filename: A filename containing a search result encoded as a geojson
    :returns: The search results encoded in `filename`
    """
    loaded_results = SearchResult.from_file(filename)
    return loaded_results
[docs]
def deserialize_and_register(self, filename: str) -> SearchResult:
    """Loads results of a search from a geojson file and register
    products with the information needed to download itself.

    This gateway instance is passed to :meth:`SearchResult.from_file`, which sets
    it on the products, enabling pagination (e.g. access to next pages) if
    available.

    :param filename: A filename containing a search result encoded as a geojson
    :returns: The search results encoded in `filename`, ready for download and pagination
    """
    registered_results = SearchResult.from_file(filename, self)
    return registered_results
[docs]
def download(
    self,
    product: EOProduct,
    progress_callback: Optional[ProgressCallback] = None,
    executor: Optional[ThreadPoolExecutor] = None,
    wait: float = DEFAULT_DOWNLOAD_WAIT,
    timeout: float = DEFAULT_DOWNLOAD_TIMEOUT,
    **kwargs: Unpack[DownloadConf],
) -> str:
    """Download a single product.

    This is an alias to the method of the same name on
    :class:`~eodag.api.product._product.EOProduct`, but it first makes sure a
    downloader and an authenticator are registered for the product before
    trying to download it.

    If the product location (mapped from ``eodag:download_link``) starts with
    ``file:/``, the product is considered local and the download is skipped:
    the corresponding local filesystem path is returned directly. Therefore,
    any user that knows how to extract product location from product metadata
    on a provider can override the ``eodag:download_link`` metadata mapping in
    the right way. For example, using the environment variable:
    ``EODAG__CREODIAS__SEARCH__METADATA_MAPPING__EODAG_DOWNLOAD_LINK="file:///{id}"``
    will lead to all :class:`~eodag.api.product._product.EOProduct`'s originating
    from the provider ``creodias`` to have their ``eodag:download_link`` metadata
    point to something like ``file:///12345-678``, making this method
    immediately return the path converted from that URI (``/12345-678``) without
    trying to download the product.

    :param product: The EO product to download
    :param progress_callback: (optional) A method or a callable object
                              which takes a current size and a maximum
                              size as inputs and handle progress bar
                              creation and update to give the user a
                              feedback on the download progress
    :param executor: (optional) An executor to download assets of ``product`` in parallel if it has any. If ``None``
                     , a default executor will be created
    :param wait: (optional) If download fails, wait time in minutes between
                 two download tries
    :param timeout: (optional) If download fails, maximum time in minutes
                    before stop retrying to download
    :param kwargs: Additional keyword arguments from the download plugin configuration class that can
                   be provided to override any other values defined in a configuration file
                   or with environment variables:

                   * ``output_dir`` - where to store downloaded products, as an absolute file path
                     (Default: local temporary directory)
                   * ``output_extension`` - downloaded file extension
                   * ``extract`` - whether to extract the downloaded products, only applies to archived products
                   * ``dl_url_params`` - additional parameters to pass over to the download url as an url parameter
                   * ``delete_archive`` - whether to delete the downloaded archives
                   * ``asset`` - regex filter to identify assets to download
    :returns: The absolute path to the downloaded product in the local filesystem
    :raises: :class:`~eodag.utils.exceptions.PluginImplementationError`
    :raises: :class:`RuntimeError`
    """
    if not product.location.startswith("file:/"):
        # remote product: make sure it can be downloaded, then delegate
        self._setup_downloader(product)
        return product.download(
            progress_callback=progress_callback,
            executor=executor,
            wait=wait,
            timeout=timeout,
            **kwargs,
        )
    logger.info("Local product detected. Download skipped")
    return uri_to_path(product.location)
def _setup_downloader(self, product: EOProduct) -> None:
if product.downloader is None:
downloader = self._plugins_manager.get_download_plugin(product)
auth = product.downloader_auth
if auth is None:
auth = self._plugins_manager.get_auth_plugin(downloader, product)
product.register_downloader(downloader, auth)
[docs]
def get_cruncher(self, name: str, **options: Any) -> Crunch:
    """Build a crunch plugin from a configuration.

    Dashes in option names are replaced by underscores (``some-option`` becomes
    ``some_option``). The options are merged with ``name`` into one
    configuration dict, and that dict is passed to the plugins manager as
    keyword arguments together with ``name``.

    :param name: The name of the cruncher to build
    :param options: The configuration options of the cruncher
    :returns: The cruncher named ``name``
    """
    crunch_conf = {
        "name": name,
        **{option.replace("-", "_"): value for option, value in options.items()},
    }
    return self._plugins_manager.get_crunch_plugin(name, **crunch_conf)
[docs]
def list_queryables(
    self,
    provider: Optional[str] = None,
    fetch_providers: bool = True,
    **kwargs: Any,
) -> QueryablesDict:
    """Fetch the queryable properties for a given collection and/or provider.

    Without ``provider`` and without ``collection``, the common EODAG queryables
    are returned, with additional properties allowed. Otherwise, the queryables
    of every matching search plugin are merged. When several plugins define the
    same queryable, the definition from the first plugin is kept.

    :param provider: (optional) The provider.
    :param fetch_providers: If new collections should be fetched from the providers; default: True
    :param kwargs: additional filters for queryables (`collection` or other search arguments)
    :returns: A :class:`~eodag.api.product.queryables.QueryablesDict` containing the EODAG queryable
              properties, associating parameters to their annotated type, an ``additional_properties``
              attribute (``True`` if any plugin accepts additional properties) and an
              ``additional_information`` attribute gathering the plugins' messages, prefixed
              by their provider and separated by ``" | "``
    """
    # only fetch providers if collection is not found
    available_collections: list[str] = [
        col.id
        for col in self.list_collections(provider=provider, fetch_providers=False)
    ]
    collection: Optional[str] = kwargs.get("collection")
    # collection as given by the caller, before alias resolution
    coll_alias: Optional[str] = collection

    if collection:
        if collection not in available_collections and fetch_providers:
            # fetch providers and try again
            available_collections = [
                col.id
                for col in self.list_collections(
                    provider=provider, fetch_providers=True
                )
            ]
        try:
            kwargs["collection"] = collection = self.get_collection_from_alias(
                collection
            )
        except NoMatchingCollection:
            # try fetching queryables for custom collection even if not known
            pass

    if not provider and not collection:
        return QueryablesDict(
            additional_properties=True,
            **model_fields_to_annotated(CommonQueryables.model_fields),
        )

    # Rename search arguments to their queryable alias. This only depends on
    # ``kwargs``, so ``Queryables.from_stac_models()`` is called once here rather
    # than once per plugin; each plugin still receives its own copy below.
    aliased_kwargs = {**kwargs}
    queryables_fields = Queryables.from_stac_models().model_fields
    for search_param, field_info in queryables_fields.items():
        if search_param in kwargs and field_info.alias:
            alias = (
                str(field_info.alias.choices[0])
                if isinstance(field_info.alias, AliasChoices)
                else field_info.alias
            )
            aliased_kwargs[alias] = aliased_kwargs.pop(search_param)

    additional_properties = False
    additional_information: list[str] = []
    queryable_properties: dict[str, Any] = {}
    for plugin in self._plugins_manager.get_search_plugins(collection, provider):
        # attach collection config
        collection_configs: dict[str, Any] = {}
        target_collections = [collection] if collection else available_collections
        for col in target_collections:
            self._attach_collection_config(plugin, col)
            collection_configs[col] = plugin.config.collection_config

        # authenticate if required
        if getattr(plugin.config, "need_auth", False) and (
            auth := self._plugins_manager.get_auth_plugin(plugin)
        ):
            try:
                plugin.auth = auth.authenticate()
            except AuthenticationError:
                # best effort: queryables are still requested without auth
                logger.debug(
                    "queryables from provider %s could not be fetched due to an authentication error",
                    plugin.provider,
                )

        plugin_queryables = plugin.list_queryables(
            dict(aliased_kwargs),
            available_collections,
            collection_configs,
            collection,
            coll_alias,
        )
        if plugin_queryables.additional_information:
            additional_information.append(
                f"{plugin.provider}: {plugin_queryables.additional_information}"
            )
        # queryables already collected from previous plugins take precedence
        queryable_properties = {**plugin_queryables, **queryable_properties}
        additional_properties = (
            additional_properties or plugin_queryables.additional_properties
        )

    return QueryablesDict(
        additional_properties=additional_properties,
        additional_information=" | ".join(additional_information),
        **queryable_properties,
    )
[docs]
def available_sortables(self) -> dict[str, Optional[ProviderSortables]]:
    """For each provider, gives its available sortable parameter(s) and its maximum
    number of them if it supports the sorting feature, otherwise gives None.

    A provider is considered to support sorting when its search plugin
    configuration has a ``sort`` attribute. Its sortable parameters are the keys
    of ``sort["sort_param_mapping"]``. ``max_sort_params`` is ``None`` when that
    setting is missing or falsy.

    :returns: A dictionary with providers as keys and dictionary of sortable parameter(s) and
              its (their) maximum number as value(s).
    """
    result: dict[str, Optional[ProviderSortables]] = {}
    for search_plugin in self._plugins_manager.get_search_plugins():
        plugin_config = search_plugin.config
        if not hasattr(plugin_config, "sort"):
            result[search_plugin.provider] = None
            continue
        sort_config = plugin_config.sort
        result[search_plugin.provider] = {
            "sortables": list(sort_config.get("sort_param_mapping", {}).keys()),
            "max_sort_params": sort_config.get("max_sort_params") or None,
        }
    return result
def _attach_collection_config(self, plugin: Search, collection: str) -> None:
"""
Attach collections_config to plugin config. This dict contains product
type metadata that will also be stored in each product's properties.
"""
try:
plugin.config.collection_config = dict(
[
c.model_dump(mode="json", exclude={"id"})
for c in self.list_collections(
plugin.provider, fetch_providers=False
)
if c._id == collection
][0],
**{"collection": collection},
)
# If the product isn't in the catalog, it's a generic collection.
except IndexError:
# Construct the GENERIC_COLLECTION metadata
plugin.config.collection_config = dict(
**self.collections_config[GENERIC_COLLECTION].model_dump(
mode="json", exclude={"id"}
),
collection=collection,
)
[docs]
def import_stac_items(
    self, items_urls: list[str], provider: Optional[str] = None
) -> SearchResult:
    """Import STAC items from a list of URLs and convert them to SearchResult.

    All URLs are fetched first. The generic STAC provider is then registered on
    this gateway, and each fetched item is converted with
    ``EOProduct._from_stac_item``. Items for which no product is obtained are
    left out.

    - Origin provider and download links will be set if item comes from an EODAG
      server.
    - If item comes from a known EODAG provider, result will be registered to it,
      ready to download and its metadata normalized.
    - If item comes from an unknown provider, a generic STAC provider will be used.

    :param items_urls: A list of STAC items URLs to import
    :param provider: (optional) The EODAG provider to which the STAC items belong, if known.
    :returns: A SearchResult containing the imported STAC items
    """
    stac_items = [
        stac_item
        for url in items_urls
        for stac_item in fetch_stac_items(url)
    ]

    # add a generic STAC provider that might be needed to handle the items
    self.add_provider(GENERIC_STAC_PROVIDER)

    imported = SearchResult([])
    for stac_item in stac_items:
        product = EOProduct._from_stac_item(
            stac_item, self._plugins_manager, provider
        )
        if product:
            imported.append(product)
    return imported