# -*- coding: utf-8 -*-
# Copyright 2023, CS GROUP - France, https://www.csgroup.eu/
#
# This file is part of EODAG project
# https://www.github.com/CS-SI/EODAG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import logging
import time
from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, cast
import requests
from eodag import EOProduct
from eodag.api.product.metadata_mapping import (
format_query_params,
mtd_cfg_as_conversion_and_querypath,
properties_from_json,
)
from eodag.plugins.search import PreparedSearch
from eodag.plugins.search.base import Search
from eodag.utils import (
DEFAULT_ITEMS_PER_PAGE,
DEFAULT_MISSION_START_DATE,
DEFAULT_PAGE,
GENERIC_PRODUCT_TYPE,
HTTP_REQ_TIMEOUT,
USER_AGENT,
deepcopy,
string_to_jsonpath,
)
from eodag.utils.exceptions import (
NotAvailableError,
RequestError,
TimeOutError,
ValidationError,
)
if TYPE_CHECKING:
from eodag.config import PluginConfig
logger = logging.getLogger("eodag.search.data_request_search")
[docs]
class DataRequestSearch(Search):
"""
Plugin to execute search requests composed of several steps:
#. do a data request which defines which data shall be searched
#. check the status of the request job
#. if finished - fetch the result of the job
:param provider: provider name
:param config: Search plugin configuration:
* :attr:`~eodag.config.PluginConfig.api_endpoint` (``str``) (**mandatory**): The endpoint of the
provider's search interface
* :attr:`~eodag.config.PluginConfig.results_entry` (``str``) (**mandatory**): The name of
the key in the provider search result that gives access to the result entries
* :attr:`~eodag.config.PluginConfig.data_request_url` (``str``) (**mandatory**): url
to which the data request shall be sent
* :attr:`~eodag.config.PluginConfig.status_url` (``str``) (**mandatory**): url to fetch
the status of the data request
* :attr:`~eodag.config.PluginConfig.result_url` (``str``) (**mandatory**): url to fetch
the search result when the data request is done
* :attr:`~eodag.config.PluginConfig.need_auth` (``bool``): if authentication is needed for
the search request; default: ``False``
* :attr:`~eodag.config.PluginConfig.auth_error_code` (``int``): which error code is returned in case of an
authentication error; only used if ``need_auth=true``
* :attr:`~eodag.config.PluginConfig.ssl_verify` (``bool``): if the ssl certificates should be
verified in requests; default: ``True``
* :attr:`~eodag.config.PluginConfig.timeout` (``int``): time to wait until request timeout in seconds;
default: ``5``
* :attr:`~eodag.config.PluginConfig.dates_required` (``bool``): if date parameters are mandatory
in the request; default: ``True``
* :attr:`~eodag.config.PluginConfig.pagination` (:class:`~eodag.config.PluginConfig.Pagination`)
(**mandatory**): The configuration of how the pagination is done on the provider. It is a tree with the
following nodes:
* :attr:`~eodag.config.PluginConfig.Pagination.total_items_nb_key_path` (``str``): An XPath or JsonPath
leading to the total number of results satisfying a request. This is used for providers which provides the
total results metadata along with the result of the query and don't have an endpoint for querying
the number of items satisfying a request, or for providers for which the count endpoint
returns a json or xml document
* :attr:`~eodag.config.PluginConfig.Pagination.max_items_per_page` (``int``): The maximum
number of items per page that the provider can handle; default: ``50``
* :attr:`~eodag.config.PluginConfig.Pagination.start_page` (``int``): number of the
first page; default: ``1``
* :attr:`~eodag.config.PluginConfig.discover_product_types`
(:class:`~eodag.config.PluginConfig.DiscoverProductTypes`): configuration for product type discovery based on
information from the provider; It contains the keys:
* :attr:`~eodag.config.PluginConfig.DiscoverProductTypes.fetch_url` (``str``) (**mandatory**): url from which
the product types can be fetched
* :attr:`~eodag.config.PluginConfig.DiscoverProductTypes.result_type` (``str``): type of the provider result;
currently only ``json`` is supported (other types could be used in an extension of this plugin)
* :attr:`~eodag.config.PluginConfig.DiscoverProductTypes.results_entry` (``str``) (**mandatory**): json path
to the list of product types
* :attr:`~eodag.config.PluginConfig.DiscoverProductTypes.generic_product_type_id` (``str``): mapping for the
product type id
* :attr:`~eodag.config.PluginConfig.DiscoverProductTypes.generic_product_type_parsable_metadata`
(``Dict[str, str]``): mapping for product type metadata (e.g. ``abstract``, ``licence``) which can be parsed
from the provider result
* :attr:`~eodag.config.PluginConfig.DiscoverProductTypes.generic_product_type_parsable_properties`
(``Dict[str, str]``): mapping for product type properties which can be parsed from the result that are not
product type metadata
* :attr:`~eodag.config.PluginConfig.DiscoverProductTypes.single_collection_fetch_url` (``str``): url to fetch
data for a single collection; used if product type metadata is not available from the endpoint given in
:attr:`~eodag.config.PluginConfig.DiscoverProductTypes.fetch_url`
* :attr:`~eodag.config.PluginConfig.DiscoverProductTypes.single_collection_fetch_qs` (``str``): query string
to be added to the :attr:`~eodag.config.PluginConfig.DiscoverProductTypes.fetch_url` to filter for a
collection
* :attr:`~eodag.config.PluginConfig.DiscoverProductTypes.single_product_type_parsable_metadata`
(``Dict[str, str]``): mapping for product type metadata returned by the endpoint given in
:attr:`~eodag.config.PluginConfig.DiscoverProductTypes.single_collection_fetch_url`.
* :attr:`~eodag.config.PluginConfig.constraints_file_url` (``str``): url to fetch the constraints for a specific
product type, can be an http url or a path to a file; the constraints are used to build queryables
* :attr:`~eodag.config.PluginConfig.constraints_entry` (``str``): key in the json result where the constraints
can be found; if not given, it is assumed that the constraints are on top level of the result, i.e.
the result is an array of constraints
* :attr:`~eodag.config.PluginConfig.metadata_mapping` (``Dict[str, Any]``): The search plugins of this kind can
detect when a metadata mapping is "query-able", and get the semantics of how to format the query string
parameter that enables to make a query on the corresponding metadata. To make a metadata query-able,
just configure it in the metadata mapping to be a list of 2 items, the first one being the
specification of the query string search formatting. The later is a string following the
specification of Python string formatting, with a special behaviour added to it. For example,
an entry in the metadata mapping of this kind::
completionTimeFromAscendingNode:
- 'f=acquisition.endViewingDate:lte:{completionTimeFromAscendingNode#timestamp}'
- '$.properties.acquisition.endViewingDate'
means that the search url will have a query string parameter named ``f`` with a value of
``acquisition.endViewingDate:lte:1543922280.0`` if the search was done with the value
of ``completionTimeFromAscendingNode`` being ``2018-12-04T12:18:00``. What happened is that
``{completionTimeFromAscendingNode#timestamp}`` was replaced with the timestamp of the value
of ``completionTimeFromAscendingNode``. This example shows all there is to know about the
semantics of the query string formatting introduced by this plugin: any eodag search parameter
can be referenced in the query string with an additional optional conversion function that
is separated from it by a ``#`` (see :func:`~eodag.api.product.metadata_mapping.format_metadata` for further
details on the available converters). Note that for the values in the
:attr:`~eodag.config.PluginConfig.free_text_search_operations` configuration parameter follow the same rule.
If the metadata_mapping is not a list but only a string, this means that the parameters is not queryable but
it is included in the result obtained from the provider. The string indicates how the provider result should
be mapped to the eodag parameter.
"""
data_request_id: Optional[str]
[docs]
def __init__(self, provider: str, config: PluginConfig) -> None:
super(DataRequestSearch, self).__init__(provider, config)
self.config.__dict__.setdefault("result_type", "json")
self.config.__dict__.setdefault("results_entry", "content")
self.config.__dict__.setdefault("pagination", {})
self.config.__dict__.setdefault("free_text_search_operations", {})
for product_type in self.config.products.keys():
if "metadata_mapping" in self.config.products[product_type].keys():
self.config.products[product_type][
"metadata_mapping"
] = mtd_cfg_as_conversion_and_querypath(
self.config.products[product_type]["metadata_mapping"]
)
# Complete and ready to use product type specific metadata-mapping
product_type_metadata_mapping = deepcopy(self.config.metadata_mapping)
# update config using provider product type definition metadata_mapping
# from another product
other_product_for_mapping = self.config.products[product_type].get(
"metadata_mapping_from_product", ""
)
if other_product_for_mapping:
other_product_type_def_params = self.get_product_type_def_params(
other_product_for_mapping, # **kwargs
)
product_type_metadata_mapping.update(
other_product_type_def_params.get("metadata_mapping", {})
)
# from current product
product_type_metadata_mapping.update(
self.config.products[product_type]["metadata_mapping"]
)
self.config.products[product_type][
"metadata_mapping"
] = product_type_metadata_mapping
if (
self.config.result_type == "json"
and "next_page_url_key_path" in self.config.pagination
):
self.config.pagination["next_page_url_key_path"] = string_to_jsonpath(
self.config.pagination.get("next_page_url_key_path", None)
)
self.download_info: Dict[str, Any] = {}
self.data_request_id = None
def discover_product_types(self, **kwargs: Any) -> Optional[Dict[str, Any]]:
"""Fetch product types is disabled for `DataRequestSearch`
:returns: empty dict
"""
return None
def clear(self) -> None:
"""Clear search context"""
super().clear()
self.data_request_id = None
def query(
self,
prep: PreparedSearch = PreparedSearch(),
**kwargs: Any,
) -> Tuple[List[EOProduct], Optional[int]]:
"""
performs the search for a provider where several steps are required to fetch the data
"""
if kwargs.get("sort_by"):
raise ValidationError(f"{self.provider} does not support sorting feature")
product_type = kwargs.get("productType", None)
if product_type is None:
raise ValidationError("Required productType is missing")
# replace "product_type" to "providerProductType" in search args if exists
# for compatibility with DataRequestSearch method
if kwargs.get("product_type"):
kwargs["providerProductType"] = kwargs.pop("product_type", None)
provider_product_type = cast(str, self._map_product_type(product_type or ""))
keywords = {k: v for k, v in kwargs.items() if k != "auth" and v is not None}
if provider_product_type and provider_product_type != GENERIC_PRODUCT_TYPE:
keywords["productType"] = provider_product_type
else:
keywords["productType"] = product_type
# provider product type specific conf
self.product_type_def_params = self.get_product_type_def_params(
product_type, **kwargs
)
# update config using provider product type definition metadata_mapping
# from another product
other_product_for_mapping = self.product_type_def_params.get(
"metadata_mapping_from_product", ""
)
if other_product_for_mapping:
other_product_type_def_params = self.get_product_type_def_params(
other_product_for_mapping, **kwargs
)
self.config.metadata_mapping.update(
other_product_type_def_params.get("metadata_mapping", {})
)
# from current product
self.config.metadata_mapping.update(
self.product_type_def_params.get("metadata_mapping", {})
)
# if product_type_def_params is set, remove product_type as it may conflict with this conf
if self.product_type_def_params:
keywords.pop("productType", None)
keywords.update(
{
k: v
for k, v in self.product_type_def_params.items()
if k not in keywords.keys()
and k in self.config.metadata_mapping.keys()
and isinstance(self.config.metadata_mapping[k], list)
}
)
# update dates if needed
if getattr(self.config, "dates_required", True) and "id" not in keywords:
if not keywords.get("startTimeFromAscendingNode", None):
keywords["startTimeFromAscendingNode"] = getattr(
self.config, "product_type_config", {}
).get("missionStartDate", DEFAULT_MISSION_START_DATE)
if not keywords.get("completionTimeFromAscendingNode", None):
keywords["completionTimeFromAscendingNode"] = getattr(
self.config, "product_type_config", {}
).get("missionEndDate", datetime.now(timezone.utc).isoformat())
# ask for data_request_id if not set (it must exist when iterating over pages)
if not self.data_request_id:
data_request_id = self._create_data_request(
provider_product_type, product_type, **keywords
)
self.data_request_id = data_request_id
request_finished = False
else:
data_request_id = self.data_request_id
request_finished = True
# loop to check search job status
search_timeout = int(getattr(self.config, "timeout", HTTP_REQ_TIMEOUT))
logger.info(
f"checking status of request job {data_request_id} (timeout={search_timeout}s)"
)
check_beginning = datetime.now()
while not request_finished:
request_finished = self._check_request_status(data_request_id)
if not request_finished and datetime.now() >= check_beginning + timedelta(
seconds=search_timeout
):
self._cancel_request(data_request_id)
raise NotAvailableError(
f"Timeout reached when checking search job status for {self.provider}"
)
elif not request_finished:
time.sleep(1)
logger.info("search job for product_type %s finished", provider_product_type)
result = self._get_result_data(
data_request_id,
kwargs.get("items_per_page", DEFAULT_ITEMS_PER_PAGE),
kwargs.get("page", DEFAULT_PAGE),
)
# if exists, add the geometry from search args in the content of the response for each product
if keywords.get("geometry"):
for product_content in result["content"]:
if product_content["extraInformation"] is None:
product_content["extraInformation"] = {
"footprint": keywords["geometry"]
}
elif not product_content["extraInformation"].get("footprint"):
product_content["extraInformation"]["footprint"] = keywords[
"geometry"
]
logger.info("result retrieved from search job")
if self._check_uses_custom_filters(product_type):
result = self._apply_additional_filters(
result, self.config.products[product_type]["custom_filters"]
)
return self._convert_result_data(
result, data_request_id, product_type or "", **kwargs
)
def _create_data_request(
self, product_type: str, eodag_product_type: str, **kwargs: Any
) -> str:
headers = getattr(self.auth, "headers", USER_AGENT)
ssl_verify = getattr(self.config.ssl_verify, "ssl_verify", True)
try:
url = self.config.data_request_url
request_body = format_query_params(eodag_product_type, self.config, kwargs)
logger.debug(
f"Sending search job request to {url} with {str(request_body)}"
)
request_job = requests.post(
url,
json=request_body,
headers=headers,
timeout=HTTP_REQ_TIMEOUT,
verify=ssl_verify,
)
request_job.raise_for_status()
except requests.exceptions.Timeout as exc:
raise TimeOutError(exc, timeout=HTTP_REQ_TIMEOUT) from exc
except requests.RequestException as e:
raise RequestError.from_error(
e, f"search job for product_type {product_type} could not be created"
) from e
else:
logger.info("search job for product_type %s created", product_type)
return request_job.json()["jobId"]
def _cancel_request(self, data_request_id: str) -> None:
logger.info("deleting request job %s", data_request_id)
delete_url = f"{self.config.data_request_url}/{data_request_id}"
headers = getattr(self.auth, "headers", USER_AGENT)
try:
delete_resp = requests.delete(
delete_url, headers=headers, timeout=HTTP_REQ_TIMEOUT
)
delete_resp.raise_for_status()
except requests.exceptions.Timeout as exc:
raise TimeOutError(exc, timeout=HTTP_REQ_TIMEOUT) from exc
except requests.RequestException as e:
raise RequestError.from_error(e, "_cancel_request failed") from e
def _check_request_status(self, data_request_id: str) -> bool:
logger.debug("checking status of request job %s", data_request_id)
status_url = self.config.status_url + data_request_id
headers = getattr(self.auth, "headers", USER_AGENT)
ssl_verify = getattr(self.config, "ssl_verify", True)
try:
status_resp = requests.get(
status_url,
headers=headers,
timeout=HTTP_REQ_TIMEOUT,
verify=ssl_verify,
)
status_resp.raise_for_status()
except requests.exceptions.Timeout as exc:
raise TimeOutError(exc, timeout=HTTP_REQ_TIMEOUT) from exc
except requests.RequestException as e:
raise RequestError.from_error(e, "_check_request_status failed") from e
else:
status_data = status_resp.json()
if "status_code" in status_data and status_data["status_code"] in [
403,
404,
]:
logger.error(f"_check_request_status failed: {status_data}")
error = RequestError("authentication token expired during request")
error.status_code = status_data["status_code"]
raise error
if status_data["status"] == "failed":
logger.error(f"_check_request_status failed: {status_data}")
raise RequestError(
f"data request job has failed, message: {status_data['message']}"
)
return status_data["status"] == "completed"
def _get_result_data(
self, data_request_id: str, items_per_page: int, page: int
) -> Dict[str, Any]:
page = page - 1 + self.config.pagination.get("start_page", 1)
url = self.config.result_url.format(
jobId=data_request_id, items_per_page=items_per_page, page=page
)
ssl_verify = getattr(self.config, "ssl_verify", True)
headers = getattr(self.auth, "headers", USER_AGENT)
try:
return requests.get(
url, headers=headers, timeout=HTTP_REQ_TIMEOUT, verify=ssl_verify
).json()
except requests.exceptions.Timeout as exc:
raise TimeOutError(exc, timeout=HTTP_REQ_TIMEOUT) from exc
except requests.RequestException:
logger.error(f"Result could not be retrieved for {url}")
return {}
def _convert_result_data(
self,
result_data: Dict[str, Any],
data_request_id: str,
product_type: str,
**kwargs: Any,
) -> Tuple[List[EOProduct], int]:
"""Build EOProducts from provider results"""
results_entry = self.config.results_entry
results = result_data[results_entry]
logger.debug(
"Adapting %s plugin results to eodag product representation" % len(results)
)
products: List[EOProduct] = []
for result in results:
product = EOProduct(
self.provider,
properties_from_json(
result,
self.get_metadata_mapping(kwargs.get("productType")),
discovery_config=getattr(self.config, "discover_metadata", {}),
),
**kwargs,
)
# use product_type_config as default properties
product.properties = dict(
getattr(self.config, "product_type_config", {}), **product.properties
)
products.append(product)
# postprocess filtering needed when provider does not natively offer filtering by id
if "id" in kwargs:
products = [
p for p in products if product.properties["id"] == kwargs["id"]
] or products
total_items_nb_key_path = string_to_jsonpath(
self.config.pagination["total_items_nb_key_path"]
)
found_total_items_nb_paths = total_items_nb_key_path.find(results)
if found_total_items_nb_paths and not isinstance(
found_total_items_nb_paths, int
):
total_items_nb = found_total_items_nb_paths[0].value
else:
total_items_nb = 0
for p in products:
# add the request id to the order link property (required to create data order)
p.properties["orderLink"] = p.properties["orderLink"].replace(
"requestJobId", str(data_request_id)
)
if self.config.products[product_type].get("storeDownloadUrl", False):
# store download information to retrieve it later in case search by id
# is not possible
self.download_info[p.properties["id"]] = {
"requestJobId": data_request_id,
"orderLink": p.properties["orderLink"],
"downloadLink": p.properties["downloadLink"],
"provider": self.provider,
}
return products, total_items_nb
def _check_uses_custom_filters(self, product_type: str) -> bool:
if (
product_type in self.config.products
and "custom_filters" in self.config.products[product_type]
):
return True
return False
def _apply_additional_filters(
self, result: Dict[str, Any], custom_filters: Dict[str, str]
) -> Dict[str, Any]:
filtered_result = []
results_entry = self.config.results_entry
results = result[results_entry]
path = string_to_jsonpath(custom_filters["filter_attribute"])
indexes = custom_filters["indexes"].split("-")
for record in results:
found_paths = path.find(record)
if not found_paths or isinstance(found_paths, int):
continue
filter_param = found_paths[0].value
filter_value = filter_param[int(indexes[0]) : int(indexes[1])]
filter_clause = "'" + filter_value + "' " + custom_filters["filter_clause"]
if eval(filter_clause):
filtered_result.append(record)
result[results_entry] = filtered_result
return result
def _map_product_type(self, product_type: Optional[str]) -> Optional[str]:
"""Map the eodag product type to the provider product type"""
if product_type is None:
return None
logger.debug("Mapping eodag product type to provider product type")
return self.config.products.get(product_type, {}).get(
"productType", GENERIC_PRODUCT_TYPE
)