# -*- coding: utf-8 -*-
# Copyright 2023, CS GROUP - France, https://www.csgroup.eu/
#
# This file is part of EODAG project
# https://www.github.com/CS-SI/EODAG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import logging
import time
from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, cast
import requests
from eodag import EOProduct
from eodag.api.product.metadata_mapping import (
format_query_params,
mtd_cfg_as_conversion_and_querypath,
properties_from_json,
)
from eodag.plugins.search.base import Search
from eodag.utils import (
DEFAULT_ITEMS_PER_PAGE,
DEFAULT_MISSION_START_DATE,
DEFAULT_PAGE,
GENERIC_PRODUCT_TYPE,
HTTP_REQ_TIMEOUT,
USER_AGENT,
deepcopy,
string_to_jsonpath,
)
from eodag.utils.exceptions import (
NotAvailableError,
RequestError,
TimeOutError,
ValidationError,
)
# PluginConfig is only needed for type annotations, so it is imported
# exclusively while type checking.
if TYPE_CHECKING:
    from eodag.config import PluginConfig

# module-level logger shared by all the methods of the plugin
logger = logging.getLogger("eodag.search.data_request_search")
[docs]
class DataRequestSearch(Search):
    """
    Plugin to execute search requests composed of several steps:

    - do a data request which defines which data shall be searched
      (see ``_create_data_request``)
    - check the status of the request job (see ``_check_request_status``)
    - if finished - fetch the result of the job (see ``_get_result_data``)
    """

    # identifier of the current search job: reset to None by __init__ and
    # clear(), set by query() once a data request has been created
    data_request_id: Optional[str]
[docs]
def __init__(self, provider: str, config: PluginConfig) -> None:
    """Apply configuration defaults and build product type metadata mappings.

    :param provider: forwarded to the parent plugin initializer
    :type provider: str
    :param config: plugin configuration, forwarded to the parent initializer
                   and then completed in place
    :type config: PluginConfig
    """
    super(DataRequestSearch, self).__init__(provider, config)

    # only fill in the options that the configuration does not define yet
    for option, fallback in (
        ("result_type", "json"),
        ("results_entry", "content"),
        ("pagination", {}),
        ("free_text_search_operations", {}),
    ):
        self.config.__dict__.setdefault(option, fallback)

    products = self.config.products
    for product_type in products.keys():
        # product types without their own metadata_mapping are left untouched
        if "metadata_mapping" not in products[product_type].keys():
            continue

        products[product_type][
            "metadata_mapping"
        ] = mtd_cfg_as_conversion_and_querypath(
            products[product_type]["metadata_mapping"]
        )

        # start from a copy of the provider-wide mapping, then layer on top of
        # it the mapping borrowed from another product type (if configured)
        # and finally the mapping of the current product type
        merged_mapping = deepcopy(self.config.metadata_mapping)

        borrowed_from = products[product_type].get(
            "metadata_mapping_from_product", ""
        )
        if borrowed_from:
            borrowed_def_params = self.get_product_type_def_params(
                borrowed_from,  # **kwargs
            )
            merged_mapping.update(borrowed_def_params.get("metadata_mapping", {}))

        merged_mapping.update(products[product_type]["metadata_mapping"])
        products[product_type]["metadata_mapping"] = merged_mapping

    pagination = self.config.pagination
    if self.config.result_type == "json" and "next_page_url_key_path" in pagination:
        pagination["next_page_url_key_path"] = string_to_jsonpath(
            pagination.get("next_page_url_key_path", None)
        )

    # filled by _convert_result_data for product types using "storeDownloadUrl"
    self.download_info: Dict[str, Any] = {}
    self.data_request_id = None
def discover_product_types(self, **kwargs: Any) -> Optional[Dict[str, Any]]:
    """Product types discovery is disabled for `DataRequestSearch`.

    :param kwargs: accepted but not used
    :returns: always ``None``
    :rtype: None
    """
    return None
def clear(self) -> None:
    """Clear the search context and forget the current data request job."""
    super().clear()
    # without a stored id, the next call to query() creates a new data request
    self.data_request_id = None
def query(
    self,
    product_type: Optional[str] = None,
    items_per_page: int = DEFAULT_ITEMS_PER_PAGE,
    page: int = DEFAULT_PAGE,
    count: bool = True,
    **kwargs: Any,
) -> Tuple[List[EOProduct], Optional[int]]:
    """Perform the search for a provider where several steps are required to fetch the data.

    A data request is created unless one is already stored in
    ``self.data_request_id``, its status is polled until it is completed, then
    the requested page of results is fetched and converted to products.

    :param product_type: not used as given: the eodag product type is read from
                         the ``productType`` search argument
    :type product_type: str
    :param items_per_page: number of results per page, forwarded to
                           ``_get_result_data``
    :type items_per_page: int
    :param page: number of the page to fetch, forwarded to ``_get_result_data``
    :type page: int
    :param count: not used by this method
    :type count: bool
    :param kwargs: search arguments; ``productType`` is required, ``sortBy`` is
                   rejected, ``auth`` and ``None`` values are not sent
    :returns: the products and total number returned by ``_convert_result_data``
    :rtype: tuple(list, int)
    :raises ValidationError: if sorting is requested or ``productType`` is missing
    :raises NotAvailableError: if the search job is not completed before the
                               configured timeout
    """
    if kwargs.get("sortBy"):
        raise ValidationError(f"{self.provider} does not support sorting feature")

    product_type = kwargs.get("productType", None)
    if product_type is None:
        raise ValidationError("Required productType is missing")

    # replace "product_type" to "providerProductType" in search args if exists
    # for compatibility with DataRequestSearch method
    # NOTE(review): "product_type" is a named parameter of this method, so it can
    # never be present in kwargs and this branch is never taken - confirm whether
    # the product_type argument was meant to be used here
    if kwargs.get("product_type"):
        kwargs["providerProductType"] = kwargs.pop("product_type", None)

    provider_product_type = cast(str, self._map_product_type(product_type or ""))
    keywords = {k: v for k, v in kwargs.items() if k != "auth" and v is not None}

    if provider_product_type and provider_product_type != GENERIC_PRODUCT_TYPE:
        keywords["productType"] = provider_product_type
    else:
        keywords["productType"] = product_type

    # provider product type specific conf
    self.product_type_def_params = self.get_product_type_def_params(
        product_type, **kwargs
    )

    # update config using provider product type definition metadata_mapping
    # from another product
    other_product_for_mapping = self.product_type_def_params.get(
        "metadata_mapping_from_product", ""
    )
    if other_product_for_mapping:
        other_product_type_def_params = self.get_product_type_def_params(
            other_product_for_mapping, **kwargs
        )
        self.config.metadata_mapping.update(
            other_product_type_def_params.get("metadata_mapping", {})
        )
    # from current product
    self.config.metadata_mapping.update(
        self.product_type_def_params.get("metadata_mapping", {})
    )

    # if product_type_def_params is set, remove product_type as it may conflict with this conf
    if self.product_type_def_params:
        keywords.pop("productType", None)

    # add the product type definition params that are queryable (their
    # metadata mapping is a list) and not already given as search arguments
    keywords.update(
        {
            k: v
            for k, v in self.product_type_def_params.items()
            if k not in keywords.keys()
            and k in self.config.metadata_mapping.keys()
            and isinstance(self.config.metadata_mapping[k], list)
        }
    )

    # update dates if needed
    if getattr(self.config, "dates_required", True) and "id" not in keywords:
        product_type_config = getattr(self.config, "product_type_config", {})
        if not keywords.get("startTimeFromAscendingNode", None):
            keywords["startTimeFromAscendingNode"] = product_type_config.get(
                "missionStartDate", DEFAULT_MISSION_START_DATE
            )
        if not keywords.get("completionTimeFromAscendingNode", None):
            keywords["completionTimeFromAscendingNode"] = product_type_config.get(
                "missionEndDate", datetime.now(timezone.utc).isoformat()
            )

    # ask for data_request_id if not set (it must exist when iterating over pages)
    if not self.data_request_id:
        data_request_id = self._create_data_request(
            provider_product_type, product_type, **keywords
        )
        self.data_request_id = data_request_id
        request_finished = False
    else:
        # a stored job id is treated as an already finished job
        data_request_id = self.data_request_id
        request_finished = True

    # loop to check search job status
    search_timeout = int(getattr(self.config, "timeout", HTTP_REQ_TIMEOUT))
    logger.info(
        f"checking status of request job {data_request_id} (timeout={search_timeout}s)"
    )
    deadline = datetime.now() + timedelta(seconds=search_timeout)
    while not request_finished:
        request_finished = self._check_request_status(data_request_id)
        if request_finished:
            break
        if datetime.now() >= deadline:
            # do not leave the job running on the provider side
            self._cancel_request(data_request_id)
            raise NotAvailableError(
                f"Timeout reached when checking search job status for {self.provider}"
            )
        time.sleep(1)

    logger.info("search job for product_type %s finished", provider_product_type)

    # use the pagination arguments of this call: being named parameters, they
    # are never part of kwargs
    result = self._get_result_data(data_request_id, items_per_page, page)

    # if exists, add the geometry from search args in the content of the response for each product
    geometry = keywords.get("geometry")
    if geometry:
        # the results entry is missing when the result could not be retrieved
        for product_content in result.get(self.config.results_entry) or []:
            extra_information = product_content.get("extraInformation")
            if extra_information is None:
                product_content["extraInformation"] = {"footprint": geometry}
            elif not extra_information.get("footprint"):
                extra_information["footprint"] = geometry

    logger.info("result retrieved from search job")

    if self._check_uses_custom_filters(product_type):
        result = self._apply_additional_filters(
            result, self.config.products[product_type]["custom_filters"]
        )

    return self._convert_result_data(
        result, data_request_id, product_type or "", **kwargs
    )
def _create_data_request(
    self, product_type: str, eodag_product_type: str, **kwargs: Any
) -> str:
    """Create a search job on the provider and return its identifier.

    :param product_type: product type name used in log and error messages
    :type product_type: str
    :param eodag_product_type: product type given to ``format_query_params``
                               to build the request body
    :type eodag_product_type: str
    :param kwargs: search arguments given to ``format_query_params``
    :returns: the ``jobId`` value of the JSON response
    :rtype: str
    :raises TimeOutError: if the request times out
    :raises RequestError: if the request fails for any other reason
    """
    headers = getattr(self.auth, "headers", USER_AGENT)
    # read the option from the configuration itself, as the other requests do
    ssl_verify = getattr(self.config, "ssl_verify", True)
    url = self.config.data_request_url
    request_body = format_query_params(eodag_product_type, self.config, **kwargs)
    logger.debug(f"Sending search job request to {url} with {str(request_body)}")
    try:
        request_job = requests.post(
            url,
            json=request_body,
            headers=headers,
            timeout=HTTP_REQ_TIMEOUT,
            verify=ssl_verify,
        )
        request_job.raise_for_status()
    except requests.exceptions.Timeout as exc:
        raise TimeOutError(exc, timeout=HTTP_REQ_TIMEOUT) from exc
    except requests.RequestException as e:
        # no response exists when the request itself failed (e.g. connection
        # error); compare with None as error responses evaluate to False
        response_text = e.response.text if e.response is not None else ""
        raise RequestError(
            f"search job for product_type {product_type} could not be created: {str(e)}, {response_text}"
        ) from e

    logger.info("search job for product_type %s created", product_type)
    return request_job.json()["jobId"]
def _cancel_request(self, data_request_id: str) -> None:
    """Delete a search job on the provider.

    :param data_request_id: identifier of the job, appended to the data
                            request url
    :type data_request_id: str
    :raises TimeOutError: if the request times out
    :raises RequestError: if the request fails for any other reason
    """
    logger.info("deleting request job %s", data_request_id)
    delete_url = f"{self.config.data_request_url}/{data_request_id}"
    headers = getattr(self.auth, "headers", USER_AGENT)
    # honour the ssl_verify option like the other requests of this plugin
    ssl_verify = getattr(self.config, "ssl_verify", True)
    try:
        delete_resp = requests.delete(
            delete_url,
            headers=headers,
            timeout=HTTP_REQ_TIMEOUT,
            verify=ssl_verify,
        )
        delete_resp.raise_for_status()
    except requests.exceptions.Timeout as exc:
        raise TimeOutError(exc, timeout=HTTP_REQ_TIMEOUT) from exc
    except requests.RequestException as e:
        raise RequestError(f"_cancel_request failed: {str(e)}") from e
def _check_request_status(self, data_request_id: str) -> bool:
    """Tell whether a search job is completed.

    :param data_request_id: identifier of the job, appended to the status url
    :type data_request_id: str
    :returns: ``True`` if the reported status is ``completed``, else ``False``
    :rtype: bool
    :raises TimeOutError: if the request times out
    :raises RequestError: if the request fails, if the response body holds a
                          403 or 404 ``status_code``, or if the job has failed
    """
    logger.debug("checking status of request job %s", data_request_id)
    status_url = self.config.status_url + data_request_id
    headers = getattr(self.auth, "headers", USER_AGENT)
    ssl_verify = getattr(self.config, "ssl_verify", True)
    try:
        status_resp = requests.get(
            status_url,
            headers=headers,
            timeout=HTTP_REQ_TIMEOUT,
            verify=ssl_verify,
        )
        status_resp.raise_for_status()
    except requests.exceptions.Timeout as exc:
        raise TimeOutError(exc, timeout=HTTP_REQ_TIMEOUT) from exc
    except requests.RequestException as e:
        raise RequestError(f"_check_request_status failed: {str(e)}") from e

    status_data = status_resp.json()
    # an error can also be reported in the body of a successful response
    if status_data.get("status_code") in (403, 404):
        logger.error(f"_check_request_status failed: {status_data}")
        raise RequestError("authentication token expired during request")

    # a missing status is handled as "not completed yet" instead of failing
    # with a KeyError
    status = status_data.get("status")
    if status == "failed":
        logger.error(f"_check_request_status failed: {status_data}")
        # the message may be absent: do not hide the failure behind a KeyError
        raise RequestError(
            f"data request job has failed, message: {status_data.get('message')}"
        )
    return status == "completed"
def _get_result_data(
    self, data_request_id: str, items_per_page: int, page: int
) -> Dict[str, Any]:
    """Fetch one page of results of a search job.

    :param data_request_id: identifier of the job, used as ``jobId`` in the
                            result url
    :type data_request_id: str
    :param items_per_page: number of results per page
    :type items_per_page: int
    :param page: number of the page, shifted by the configured ``start_page``
                 (1 if not configured) before being used in the result url
    :type page: int
    :returns: the decoded JSON response, or an empty dict if it could not be
              retrieved
    :rtype: dict
    :raises TimeOutError: if the request times out
    """
    page = page - 1 + self.config.pagination.get("start_page", 1)
    url = self.config.result_url.format(
        jobId=data_request_id, items_per_page=items_per_page, page=page
    )
    ssl_verify = getattr(self.config, "ssl_verify", True)
    headers = getattr(self.auth, "headers", USER_AGENT)
    try:
        response = requests.get(
            url, headers=headers, timeout=HTTP_REQ_TIMEOUT, verify=ssl_verify
        )
        # do not handle the body of an HTTP error as search results
        response.raise_for_status()
        return response.json()
    except requests.exceptions.Timeout as exc:
        raise TimeOutError(exc, timeout=HTTP_REQ_TIMEOUT) from exc
    except (requests.RequestException, ValueError):
        # ValueError covers bodies that are not valid JSON
        logger.error(f"Result could not be retrieved for {url}")
    return {}
def _convert_result_data(
    self,
    result_data: Dict[str, Any],
    data_request_id: str,
    product_type: str,
    **kwargs: Any,
) -> Tuple[List[EOProduct], int]:
    """Build EOProducts from provider results

    :param result_data: response of the provider, holding the results under
                        the configured ``results_entry``
    :type result_data: dict
    :param data_request_id: identifier of the search job, written in the
                            ``orderLink`` property of each product
    :type data_request_id: str
    :param product_type: key of ``config.products`` used to read the
                         ``storeDownloadUrl`` option
    :type product_type: str
    :param kwargs: search arguments, passed to each ``EOProduct``
    :returns: the products and the total number of results (0 if not found)
    :rtype: tuple(list, int)
    """
    # the results entry is missing when the result could not be retrieved
    results = result_data.get(self.config.results_entry) or []
    logger.debug(
        "Adapting %s plugin results to eodag product representation", len(results)
    )
    metadata_mapping = self.get_metadata_mapping(kwargs.get("productType"))
    discovery_config = getattr(self.config, "discover_metadata", {})
    default_properties = getattr(self.config, "product_type_config", {})

    products: List[EOProduct] = []
    for result in results:
        product = EOProduct(
            self.provider,
            properties_from_json(
                result, metadata_mapping, discovery_config=discovery_config
            ),
            **kwargs,
        )
        # use product_type_config as default properties
        product.properties = dict(default_properties, **product.properties)
        products.append(product)

    # postprocess filtering needed when provider does not natively offer filtering by id
    if "id" in kwargs:
        # each product is tested on its own id; all the products are kept if
        # none of them matches
        products = [
            p for p in products if p.properties["id"] == kwargs["id"]
        ] or products

    total_items_nb_key_path = string_to_jsonpath(
        self.config.pagination["total_items_nb_key_path"]
    )
    # NOTE(review): the path is searched in the list of results, not in the
    # whole response - confirm that this is where providers give the total
    total_items_matches = total_items_nb_key_path.find(results)
    total_items_nb = total_items_matches[0].value if total_items_matches else 0

    # product types missing from the configuration do not store download info
    store_download_url = self.config.products.get(product_type, {}).get(
        "storeDownloadUrl", False
    )
    for p in products:
        # add the request id to the order link property (required to create data order)
        p.properties["orderLink"] = p.properties["orderLink"].replace(
            "requestJobId", str(data_request_id)
        )
        if store_download_url:
            # store download information to retrieve it later in case search by id
            # is not possible
            self.download_info[p.properties["id"]] = {
                "requestJobId": data_request_id,
                "orderLink": p.properties["orderLink"],
                "downloadLink": p.properties["downloadLink"],
                "provider": self.provider,
            }
    return products, total_items_nb
def _check_uses_custom_filters(self, product_type: str) -> bool:
if (
product_type in self.config.products
and "custom_filters" in self.config.products[product_type]
):
return True
return False
def _apply_additional_filters(
    self, result: Dict[str, Any], custom_filters: Dict[str, str]
) -> Dict[str, Any]:
    """Keep only the results matching the custom filter of a product type.

    For each record, the value found at ``filter_attribute`` is sliced with
    the ``indexes`` bounds (``"<start>-<stop>"``) and the record is kept if
    ``<sliced value> <filter_clause>`` evaluates to a true value.

    :param result: response of the provider, updated in place
    :type result: dict
    :param custom_filters: filter definition with the ``filter_attribute``,
                           ``indexes`` and ``filter_clause`` keys
    :type custom_filters: dict
    :returns: ``result`` with its results entry replaced by the kept records
    :rtype: dict
    """
    results_entry = self.config.results_entry
    attribute_path = string_to_jsonpath(custom_filters["filter_attribute"])
    # the filter definition is the same for all the records: parse it once
    bounds = custom_filters["indexes"].split("-")
    start, stop = int(bounds[0]), int(bounds[1])
    filter_clause = custom_filters["filter_clause"]

    filtered_result = []
    # the results entry is missing when the result could not be retrieved
    for record in result.get(results_entry) or []:
        matches = attribute_path.find(record)
        if not matches:
            # a record without the filtered attribute cannot match the filter
            continue
        filter_value = matches[0].value[start:stop]
        # SECURITY: eval() runs the configured clause against data returned by
        # the provider. repr() makes the value a properly escaped literal, so
        # that quotes or backslashes in it cannot alter the expression, but
        # the clause itself must only come from a trusted configuration.
        # TODO(review): replace eval() with a restricted set of comparisons.
        if eval(repr(filter_value) + " " + filter_clause):
            filtered_result.append(record)

    result[results_entry] = filtered_result
    return result
def _map_product_type(self, product_type: Optional[str]) -> Optional[str]:
"""Map the eodag product type to the provider product type"""
if product_type is None:
return None
logger.debug("Mapping eodag product type to provider product type")
return self.config.products.get(product_type, {}).get(
"productType", GENERIC_PRODUCT_TYPE
)