Source code for eodag.plugins.search.build_search_result

# -*- coding: utf-8 -*-
# Copyright 2022, CS GROUP - France, https://www.csgroup.eu/
#
# This file is part of EODAG project
#     https://www.github.com/CS-SI/EODAG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations

import datetime as dt
import hashlib
import logging
import re
from collections import OrderedDict
from types import MethodType
from typing import TYPE_CHECKING, Annotated, Any, Optional
from urllib.parse import quote_plus, unquote_plus

import geojson
import orjson
from pydantic import AliasChoices
from pydantic.fields import FieldInfo
from requests.auth import AuthBase
from typing_extensions import get_args  # noqa: F401

from eodag.api.product import EOProduct
from eodag.api.product.metadata_mapping import (
    DEFAULT_GEOMETRY,
    NOT_AVAILABLE,
    OFFLINE_STATUS,
    STAGING_STATUS,
    format_metadata,
    mtd_cfg_as_conversion_and_querypath,
    properties_from_json,
)
from eodag.api.search_result import RawSearchResult, SearchResult
from eodag.plugins.search import PreparedSearch
from eodag.plugins.search.qssearch import PostJsonSearch, QueryStringSearch
from eodag.types import json_field_definition_to_python  # noqa: F401
from eodag.types.queryables import Queryables, QueryablesDict
from eodag.types.stac_extensions import EcmwfItemProperties
from eodag.utils import (
    DEFAULT_SEARCH_TIMEOUT,
    deepcopy,
    dict_items_recursive_sort,
    format_string,
    get_geometry_from_ecmwf_area,
    get_geometry_from_ecmwf_feature,
    get_geometry_from_ecmwf_location,
    get_geometry_from_various,
)
from eodag.utils.cache import instance_cached_method
from eodag.utils.dates import (
    COMPACT_DATE_RANGE_PATTERN,
    DATE_RANGE_PATTERN,
    compute_date_range_from_params,
    format_date,
    is_range_in_range,
    parse_date,
    parse_to_utc,
    parse_year_month_day,
    time_values_to_hhmm,
    to_iso_utc_string,
    validate_datetime_param,
)
from eodag.utils.exceptions import (
    DownloadError,
    MisconfiguredError,
    NotAvailableError,
    ValidationError,
)
from eodag.utils.requests import fetch_json

if TYPE_CHECKING:
    from eodag.config import PluginConfig

logger = logging.getLogger("eodag.search.build_search_result")

ECMWF_PREFIX = "ecmwf:"

ALLOWED_KEYWORDS = set(
    [k.replace("ecmwf_", "") for k in EcmwfItemProperties.model_fields.keys()]
)

END = "end_datetime"

START = "start_datetime"


def ecmwf_mtd() -> dict[str, Any]:
    """
    Build the metadata mapping covering every allowed ECMWF keyword.

    Each keyword of ``ALLOWED_KEYWORDS`` is mapped to a two-element list: the
    keyword itself, then a template applying the ``#to_geojson`` converter to the
    value stored under that keyword. The converter is added systematically to
    prevent modification of entries by eval() in the metadata mapping.

    keyword:
        - keyword
        - $."keyword"#to_geojson

    :return: metadata mapping dict
    """
    mapping: dict[str, Any] = {}
    for keyword in ALLOWED_KEYWORDS:
        # doubled braces produce literal "{" and "}" around the template
        mapping[keyword] = [keyword, f'{{$."{keyword}"#to_geojson}}']
    return mapping


def _update_properties_from_element(
    prop: dict[str, Any], element: dict[str, Any], values: list[str]
) -> None:
    """updates a property dict with the given values based on the information from the element dict
    e.g. the type is set based on the type of the element
    """
    # multichoice elements are transformed into array
    if element["type"] in ("StringListWidget", "StringListArrayWidget"):
        prop["type"] = "array"
        if values:
            prop["items"] = {"type": "string", "enum": sorted(values)}

    # single choice elements are transformed into string
    elif element["type"] in (
        "StringChoiceWidget",
        "DateRangeWidget",
        "FreeformInputWidget",
    ):
        prop["type"] = "string"
        if values:
            prop["enum"] = sorted(values)

    # a bbox element
    elif element["type"] in ["GeographicExtentWidget", "GeographicExtentMapWidget"]:
        prop.update(
            {
                "type": "array",
                "minItems": 4,
                "additionalItems": False,
                "items": [
                    {
                        "type": "number",
                        "maximum": 90,
                        "minimum": -90,
                        "description": "North border of the bounding box",
                    },
                    {
                        "type": "number",
                        "maximum": 180,
                        "minimum": -180,
                        "description": "West border of the bounding box",
                    },
                    {
                        "type": "number",
                        "maximum": 90,
                        "minimum": -90,
                        "description": "South border of the bounding box",
                    },
                    {
                        "type": "number",
                        "maximum": 180,
                        "minimum": -180,
                        "description": "East border of the bounding box",
                    },
                ],
            }
        )

    # DateRangeWidget is a calendar date picker
    if element["type"] == "DateRangeWidget":
        prop["description"] = "date formatted like yyyy-mm-dd/yyyy-mm-dd"

    # a single geographic location
    if element["type"] == "GeographicLocationWidget":
        prop.update(
            {
                "type": "object",
                "description": "Longitude and latitude of a single location",
                "properties": {
                    "longitude": {
                        "type": "number",
                        "maximum": 180,
                        "minimum": -180,
                    },
                    "latitude": {
                        "type": "number",
                        "maximum": 90,
                        "minimum": -90,
                    },
                },
            }
        )

    if description := element.get("help"):
        prop["description"] = description


def ecmwf_format(v: str, alias: bool = True) -> str:
    """Prefix ``v`` with the ECMWF namespace when ``v`` is an ECMWF keyword.

    Values that are not part of ``ALLOWED_KEYWORDS`` are returned unchanged.

    :param v: parameter to format
    :param alias: whether to format for alias (with ':') or for query param (False, with '_')
    :return: formatted parameter

    >>> ecmwf_format('dataset', alias=False)
    'ecmwf_dataset'
    >>> ecmwf_format('variable')
    'ecmwf:variable'
    >>> ecmwf_format('unknown_param')
    'unknown_param'
    """
    if v not in ALLOWED_KEYWORDS:
        return v

    separator = "_"
    if alias:
        separator = ":"
    # ECMWF_PREFIX ends with ":"; strip it so the chosen separator can be used instead
    return f"{ECMWF_PREFIX[:-1]}{separator}{v}"


def ecmwf_temporal_to_eodag(
    params: dict[str, Any],
) -> tuple[Optional[str], Optional[str]]:
    """
    Translate ECMWF temporal parameters into an EODAG start / end pair.

    ``date`` is looked at first; when it is missing or empty, ``year`` (or
    ``hyear``) is used together with the optional month, day and time values.

    ECMWF temporal parameters:
        - **year** or **hyear**: Union[str, list[str]] — Year(s) as a string or list of strings.
        - **month** or **hmonth**: Union[str, list[str]] — Month(s) as a string or list of strings.
        - **day** or **hday**: Union[str, list[str]] — Day(s) as a string or list of strings.
        - **time**: str — A string representing the time in the format `HHMM` (e.g., `0200`, `0800`, `1400`).
        - **date**: str — A string in one of the formats:
            - `YYYY-MM-DD`
            - `YYYY-MM-DD/YYYY-MM-DD`
            - `YYYY-MM-DD/to/YYYY-MM-DD`

    :param params: Dictionary containing ECMWF temporal parameters.
    :return: A tuple with:
        - **start**: A string in the format `YYYY-MM-DDTHH:MM:SSZ`.
        - **end**: A string in the format `YYYY-MM-DDTHH:MM:SSZ`.

        Both items are ``None`` when no complete range could be derived.
    """
    bounds: Any = (None, None)

    date_value = params.get("date")
    if date_value:
        # a list of dates is joined into a single "/"-separated string before parsing
        if isinstance(date_value, list):
            date_value = "/".join(date_value)
        bounds = parse_date(date_value, params.get("time"))
    else:
        years = params.get("year") or params.get("hyear")
        if years:
            bounds = parse_year_month_day(
                years,
                params.get("month") or params.get("hmonth"),
                params.get("day") or params.get("hday"),
                params.get("time"),
            )

    lower, upper = bounds
    if not (lower and upper):
        return None, None
    return to_iso_utc_string(lower), to_iso_utc_string(upper)


[docs] class ECMWFSearch(PostJsonSearch): """ECMWF search plugin. This plugin builds a :class:`~eodag.api.search_result.SearchResult` containing a single product using given query parameters as product properties. The available configuration parameters inherits from parent classes, with some particular parameters for this plugin. :param provider: An eodag providers configuration dictionary :param config: Search plugin configuration: * :attr:`~eodag.config.PluginConfig.remove_from_query` (``list[str]``): List of parameters used to parse metadata but that must not be included to the query * :attr:`~eodag.config.PluginConfig.end_date_excluded` (``bool``): Set to `False` if provider does not include end date to search * :attr:`~eodag.config.PluginConfig.discover_queryables` (:class:`~eodag.config.PluginConfig.DiscoverQueryables`): configuration to fetch the queryables from a provider queryables endpoint; It has the following keys: * :attr:`~eodag.config.PluginConfig.DiscoverQueryables.fetch_url` (``str``): url to fetch the queryables valid for all collections * :attr:`~eodag.config.PluginConfig.DiscoverQueryables.collection_fetch_url` (``str``): url to fetch the queryables for a specific collection * :attr:`~eodag.config.PluginConfig.DiscoverQueryables.constraints_url` (``str``): url of the constraint file used to build queryables * :attr:`~eodag.config.PluginConfig.dynamic_discover_queryables` (``list`` [:class:`~eodag.config.PluginConfig.DynamicDiscoverQueryables`]): list of configurations to fetch the queryables from different provider queryables endpoints. A configuration is used based on the given selection criterias. The first match is used. If no match is found, it falls back to standard behaviors (e.g. discovery using :attr:`~eodag.config.PluginConfig.discover_queryables`). 
Each element of the list has the following keys: * :attr:`~eodag.config.PluginConfig.DynamicDiscoverQueryables.collection_selector` (``list`` [:class:`~eodag.config.PluginConfig.CollectionSelector`]): list of collection selection criterias. The configuration given in :attr:`~eodag.config.PluginConfig.DynamicDiscoverQueryables.discover_queryables` is used if any collection selector matches the search parameters. The selector matches if the field value starts with the given prefix, i.e. it matches if ``parameters[field].startswith(prefix)==True``. It has the following keys: * :attr:`~eodag.config.PluginConfig.CollectionSelector.field` (``str``) Field in the search parameters to match * :attr:`~eodag.config.PluginConfig.CollectionSelector.prefix` (``str``) Prefix to match in the field * :attr:`~eodag.config.PluginConfig.DynamicDiscoverQueryables.discover_queryables` (``list`` [:class:`~eodag.config.PluginConfig.DiscoverQueryables`]): same as :attr:`~eodag.config.PluginConfig.discover_queryables` above. """
def __init__(self, provider: str, config: PluginConfig) -> None:
    """Complete the plugin configuration with ECMWF defaults, then initialize the parent.

    The metadata mapping is assembled from three layers, later layers overriding
    earlier ones: the mapping generated for ECMWF keywords, the entries specific
    to this plugin, and finally the mapping already present in ``config``.

    :param provider: provider name
    :param config: Search plugin configuration
    """
    plugin_mapping: dict[str, Any] = {
        "id": "$.id",
        "title": "$.id",
        "order:status": OFFLINE_STATUS,
        "eodag:download_link": "$.null",
        "geometry": ["feature", "$.geometry"],
        "eodag:default_geometry": "POLYGON((180 -90, 180 90, -180 90, -180 -90, 180 -90))",
    }
    config.metadata_mapping = {
        **ecmwf_mtd(),
        **plugin_mapping,
        **config.metadata_mapping,
    }

    super().__init__(provider, config)

    plugin_conf = self.config.__dict__

    # ECMWF providers do not feature any api_endpoint or next_page_query_obj.
    # Search is faked by EODAG.
    plugin_conf.setdefault("api_endpoint", "")
    self.config.pagination.setdefault("next_page_query_obj", "{{}}")

    # default conf for accepting custom query params
    plugin_conf.setdefault(
        "discover_metadata",
        {
            "auto_discovery": False,
            "search_param": "{metadata}",
            "metadata_pattern": "^[a-zA-Z0-9][a-zA-Z0-9_]*$",
        },
    )
def do_search( self, prep: PreparedSearch = PreparedSearch(limit=None), **kwargs: Any ) -> RawSearchResult: """Should perform the actual search request. :param args: arguments to be used in the search :param kwargs: keyword arguments to be used in the search :return: list containing the results from the provider in json format """ # no real search. We fake it all raw_search_results = RawSearchResult([{}]) raw_search_results.search_params = kwargs raw_search_results.query_params = ( prep.query_params if hasattr(prep, "query_params") else {} ) raw_search_results.collection_def_params = ( prep.collection_def_params if hasattr(prep, "collection_def_params") else {} ) return raw_search_results def query( self, prep: PreparedSearch = PreparedSearch(), **kwargs: Any, ) -> SearchResult: """Build ready-to-download SearchResult :param prep: :class:`~eodag.plugins.search.PreparedSearch` object containing information needed for the search :param kwargs: keyword arguments to be used in the search :returns: list of products and number of products (optional) """ collection = prep.collection if not collection: collection = kwargs.get("collection") kwargs = self._preprocess_search_params(kwargs) result = super().query(prep, **kwargs) if prep.count and not result.number_matched: result.number_matched = 1 return result def clear(self) -> None: """Clear search context""" super().clear() def build_query_string( self, collection: str, query_dict: dict[str, Any] ) -> tuple[dict[str, Any], str]: """Build The query string using the search parameters :param collection: collection id :param query_dict: keyword arguments to be used in the query string :return: formatted query params and encode query string """ query_dict["_date"] = f"{query_dict.get(START)}/{query_dict.get(END)}" # Reorder kwargs to make sure year/month/day/time if set overwrite default datetime. 
priority_keys = [ START, END, ] ordered_kwargs = {k: query_dict[k] for k in priority_keys if k in query_dict} ordered_kwargs.update(query_dict) return super().build_query_string( collection=collection, query_dict=ordered_kwargs ) def _preprocess_search_params(self, params: dict[str, Any]) -> dict[str, Any]: """Preprocess search parameters before making a request to the CDS API. This method is responsible for checking and updating the provided search parameters to ensure that required parameters like 'collection', 'start_datetime', 'end_datetime', and 'geometry' are properly set. If not specified in the input parameters, default values or values from the configuration are used. :param params: Search parameters to be preprocessed. """ _dc_qs = params.get("_dc_qs") if _dc_qs is not None: # if available, update search params using datacube query-string _dc_qp = geojson.loads(unquote_plus(unquote_plus(_dc_qs))) if "/to/" in _dc_qp.get("date", ""): params[START], params[END] = _dc_qp["date"].split("/to/") elif "/" in _dc_qp.get("date", ""): (params[START], params[END],) = _dc_qp[ "date" ].split("/") elif _dc_qp.get("date"): params[START] = params[END] = _dc_qp["date"] if "/" in _dc_qp.get("area", ""): params["geometry"] = _dc_qp["area"].split("/") params = { k.removeprefix(ECMWF_PREFIX).removeprefix(f"{ECMWF_PREFIX[:-1]}_"): v for k, v in params.items() if v is not None } # read 'start_datetime' and 'end_datetime' from 'date' range if "date" in params: start_date, end_date = parse_date(params["date"]) params[START] = format_date(start_date) params[END] = format_date(end_date) # adapt end date if it is midnight if END in params: end_date_excluded = getattr(self.config, "end_date_excluded", True) is_datetime = "T" in str(params[END]) end_date = parse_to_utc(params[END]) start_date = parse_to_utc(params[START]) if ( not end_date_excluded and is_datetime and end_date > start_date and end_date == end_date.replace(hour=0, minute=0, second=0, microsecond=0) ): end_date += 
dt.timedelta(days=-1) params[END] = to_iso_utc_string(end_date) # geometry if "geometry" in params: params["geometry"] = get_geometry_from_various(geometry=params["geometry"]) # check ecmwf geom format if given # ECMWF Polytope uses non-geojson structure for features if "feature" in params: get_geometry_from_ecmwf_feature(params["feature"]) # bounding box in area format if "area" in params: get_geometry_from_ecmwf_area(params["area"]) # single location if "location" in params: get_geometry_from_ecmwf_location(params["location"]) return params def _preprocess_indirect_date_parameters(self, params: dict[str, Any]) -> dict: """ Compute start_datetime / end_datetime from "date", "time", "year", "month", "day" """ indirects: dict[str, Any] = {} # Validate and collect indirect date parameters time = validate_datetime_param( params.get("time"), "time", ["%H%M", "%H:%M", "%H%M%S", "%H:%M:%S"] ) year = validate_datetime_param(params.get("year"), "year", ["%Y"]) month = validate_datetime_param(params.get("month"), "month", ["%m"]) day = validate_datetime_param(params.get("day"), "day", ["%d"]) if time is not None: time = time_values_to_hhmm(time) indirects["time"] = time if year is not None: indirects["year"] = year if month is not None: indirects["month"] = [f"{int(m):02d}" for m in month] if day is not None: indirects["day"] = [f"{int(d):02d}" for d in day] # Compute date range from "date" param (takes precedence) date = params.get("date", None) if date is not None: try: start, end = compute_date_range_from_params(date=date, time=time) indirects["start_datetime"] = start indirects["end_datetime"] = end return indirects except Exception as e: raise ValidationError( 'Malformed parameter "date" (date given "{}", time given: "{}"): {}'.format( params.get("date"), params.get("time"), str(e) ) ) # Compute date range from year/month/day/time params start, end = compute_date_range_from_params( year=year, month=indirects.get("month"), day=indirects.get("day"), time=time, ) if start 
is not None: indirects["start_datetime"] = start if end is not None: indirects["end_datetime"] = end return indirects def _get_collection_queryables( self, collection: Optional[str], alias: Optional[str], filters: dict[str, Any] ) -> QueryablesDict: """Override to set additional_properties to false.""" default_values: dict[str, Any] = deepcopy( getattr(self.config, "products", {}).get(collection, {}) ) default_values.pop("metadata_mapping", None) default_values.pop("metadata_mapping_from_product", None) filters["collection"] = collection queryables = self.discover_queryables(**{**default_values, **filters}) or {} return QueryablesDict(additional_properties=False, **queryables) def _find_dynamic_queryables_config( self, kwargs: dict[str, Any], dynamic_config: list ) -> dict[str, Any]: """Find the appropriate queryables configuration from dynamic configuration. :param kwargs: Search parameters :param dynamic_config: List of dynamic discover queryables configurations :return: Found queryables configuration or empty dict """ for dc in dynamic_config: for cs in dc["collection_selector"]: field = cs["field"] if kwargs[field].startswith(cs["prefix"]): return dc["discover_queryables"] return {} def _is_discoverable_metadata_key(self, key: str) -> bool: """Check if a key can bypass strict queryables validation via discover_metadata.""" discover_metadata = getattr(self.config, "discover_metadata", None) or {} if not discover_metadata.get("auto_discovery"): return False pattern = discover_metadata.get("metadata_pattern") or "" try: return bool(pattern and re.match(pattern, key)) except re.error: msg = f"Invalid discover_metadata.metadata_pattern for provider {self.provider}: {pattern}" raise MisconfiguredError(msg) def discover_queryables( self, **kwargs: Any, ) -> Optional[dict[str, Annotated[Any, FieldInfo]]]: """Fetch queryables list from provider using its constraints file :param kwargs: additional filters for queryables (`collection` and other search arguments) :returns: 
fetched queryable parameters dict """ collection = kwargs.pop("collection") col_config = self.get_collection_def_params(collection) default_values = deepcopy(col_config) default_values.pop("metadata_mapping", None) default_values.pop("metadata_mapping_from_product", None) default_values.pop("discover_queryables", None) kwargs.pop("discover_queryables", None) filters = {**default_values, **kwargs} if "start" in filters: filters[START] = filters.pop("start") if "end" in filters: filters[END] = filters.pop("end") # extract default datetime and convert geometry try: processed_filters = self._preprocess_search_params(deepcopy(filters)) except Exception as e: raise ValidationError(e.args[0]) from e # dynamic_discover_queryables for WekeoECMWFSearch queryables_config = {} if dynamic_config := getattr(self.config, "dynamic_discover_queryables", []): queryables_config = self._find_dynamic_queryables_config( kwargs, dynamic_config ) provider_dq = getattr(self.config, "discover_queryables", {}) or {} product_dq = col_config.get("discover_queryables", {}) or {} dq_conf = {**provider_dq, **product_dq, **queryables_config} constraints_url = format_metadata(dq_conf.get("constraints_url", ""), **filters) constraints: list[dict[str, Any]] = self._fetch_data(constraints_url) form_url = format_metadata(dq_conf.get("form_url", ""), **filters) form: list[dict[str, Any]] = self._fetch_data(form_url) formated_filters = self.format_as_provider_keyword( collection, deepcopy(processed_filters) ) # we re-apply kwargs input to consider override of year, month, day and time. 
for k, v in {**default_values, **kwargs}.items(): key = k.removeprefix(ECMWF_PREFIX).removeprefix(f"{ECMWF_PREFIX[:-1]}_") if key not in ALLOWED_KEYWORDS | { START, END, "geometry", } and not self._is_discoverable_metadata_key(key): raise ValidationError( f"'{key}' is not a queryable parameter for {self.provider}", {key} ) formated_filters[key] = v # we use non empty filters as default to integrate user inputs # it is needed because pydantic json schema does not represent "value" # but only "default" non_empty_formated: dict[str, Any] = { k: v for k, v in formated_filters.items() if v and (not isinstance(v, list) or all(v)) } required_keywords: set[str] = set() # calculate available values if constraints: # Apply constraints filtering available_values = self.available_values_from_constraints( constraints, non_empty_formated, form_keywords=[f["name"] for f in form], ) # Pre-compute the required keywords (present in all constraint dicts) # when form, required keywords are extracted directly from form if not form: required_keywords = set.intersection( *(map(lambda d: set(d.keys()), constraints)) ) else: values_url = getattr(self.config, "available_values_url", "") if not values_url: return self.queryables_from_metadata_mapping(collection) if "{" in values_url: values_url = format_string(None, values_url, **filters) data = self._fetch_data(values_url) available_values = data["constraints"] required_keywords = data.get("required", []) # To check if all keywords are queryable parameters, we check if they are in the # available values or the collection config (available values calculated from the # constraints might not include all queryables) for keyword in processed_filters: if ( keyword not in available_values.keys() | col_config.keys() | { START, END, "geometry", } and keyword not in [f["name"] for f in form] and keyword not in set(list(available_values.keys()) + [f["name"] for f in form]) and not self._is_discoverable_metadata_key(keyword) ): raise 
ValidationError("'%s' is not a queryable parameter" % keyword) # generate queryables if form: queryables = self.queryables_by_form( form, available_values, non_empty_formated, ) else: queryables = self.queryables_by_values( available_values, list(required_keywords), non_empty_formated ) # ecmwf:date is replaced by start and end. # start and end filters are supported whenever combinations of "year", "month", "day" filters exist queryable_prefix = f"{ECMWF_PREFIX[:-1]}_" if ( f"{queryable_prefix}date" in queryables or f"{queryable_prefix}year" in queryables or f"{queryable_prefix}hyear" in queryables ): queryables.update( { "start": Queryables.get_with_default( "start", processed_filters.get(START) ), "end": Queryables.get_with_default( "end", processed_filters.get(END), ), } ) # area is geom in EODAG. if queryables.pop("area", None): queryables["geom"] = Queryables.get_with_default("geom", None) return queryables def available_values_from_constraints( self, constraints: list[dict[str, Any]], input_keywords: dict[str, Any], form_keywords: list[str], ) -> dict[str, list[str]]: """ Filter constraints using input_keywords. Return list of available queryables. All constraint entries must have the same parameters. 
:param constraints: list of constraints received from the provider :param input_keywords: dict of input parameters given by the user :param form_keywords: list of keyword names from the provider form endpoint :return: dict with available values for each parameter """ # get ordered constraint keywords constraints_keywords = list( OrderedDict.fromkeys(k for c in constraints for k in c.keys()) ) # prepare ordered input keywords formatted as provider's keywords # required to filter with constraints ordered_keywords = ( [kw for kw in form_keywords if kw in constraints_keywords] if form_keywords else constraints_keywords ) # filter constraint entries matching input keyword values filtered_constraints: list[dict[str, Any]] parsed_keywords: list[str] = [] for keyword in ordered_keywords: values = input_keywords.get(keyword) if values is None: parsed_keywords.append(keyword) continue # we only compare list of strings. if isinstance(values, dict): raise ValidationError( f"Parameter value as object is not supported: {keyword}={values}", {keyword}, ) # We convert every single value to a list of string filter_v = list(values) if isinstance(values, tuple) else values filter_v = filter_v if isinstance(filter_v, list) else [filter_v] filter_v = [str(v) for v in filter_v] # We strip values of superfluous quotes (added by mapping converter to_geojson). # ECMWF accept date ranges with /to/. We need to split it to an array # ECMWF accept date ranges in format val1/val2. 
We need to split it to an array date_regex = [ re.compile(p) for p in (DATE_RANGE_PATTERN, COMPACT_DATE_RANGE_PATTERN) ] is_date = any( any(r.match(v) is not None for r in date_regex) for v in filter_v ) if is_date: sep = re.compile(r"/to/|/") filter_v = [i for v in filter_v for i in sep.split(str(v))] # special handling for time 0000 converted to 0 by pre-formating with metadata_mapping if keyword.split(":")[-1] == "time": filter_v = ["0000" if str(v) == "0" else v for v in filter_v] # Collect missing values to report errors missing_values = set(filter_v) # Filter constraints and check for missing values filtered_constraints = [] # True if some constraint is defined for this keyword. # In other words: if no constraint defines a list of values # then any value is allowed for this keyword keyword_constrained = False for entry in constraints: # Filter based on the presence of any value in filter_v entry_values = entry.get(keyword, []) if entry_values: keyword_constrained = True # date constraint may be intervals. We identify intervals with a "/" in the value. # date constraint can be a mixed list of single values (e.g "2023-06-27") # and intervals (e.g. "2024-11-12/2025-11-20"). 
# collections with mixed values: CAMS_GAC_FORECAST, CAMS_EU_AIR_QUALITY_FORECAST present_values = [] for entry_value in entry_values: if keyword == "date" and "/" in entry_value: input_range = values if isinstance(values, list): input_range = values[0] if "/" not in input_range: input_range = f"{input_range}/{input_range}" if is_range_in_range(entry_value, input_range): present_values.extend(filter_v) else: new_values = [ value for value in filter_v if value == entry_value ] present_values.extend(new_values) # Remove present values from the missing_values set missing_values -= set(present_values) if present_values: filtered_constraints.append(entry) # raise an error as no constraint entry matched the input keywords # raise an error if one value from input is not allowed if keyword_constrained and (not filtered_constraints or missing_values): allowed_values = list( {value for c in constraints for value in c.get(keyword, [])} ) # restore ecmwf: prefix before raising error keyword = ECMWF_PREFIX + keyword all_keywords_str = "" if len(parsed_keywords) > 1: keywords = [ f"{ECMWF_PREFIX + k}={pk}" for k in parsed_keywords if (pk := input_keywords.get(k)) ] all_keywords_str = f" with {', '.join(keywords)}" raise ValidationError( f"{keyword}={values} is not available" f"{all_keywords_str}." 
f" Allowed values are {', '.join(allowed_values)}.",
                    set(
                        [keyword]
                        + [k for k in parsed_keywords if k in input_keywords]
                    ),
                )

            # NOTE(review): the head of this method lies before this chunk; the
            # nesting of the lines below is reconstructed from the statements
            # themselves - confirm against the full file.
            parsed_keywords.append(keyword)
            # if the keyword is not constrained then any value is allowed
            if keyword_constrained:
                constraints = filtered_constraints

        available_values: dict[str, Any] = {k: set() for k in ordered_keywords}

        # we aggregate the constraint entries left
        for entry in constraints:
            for key, value in entry.items():
                available_values[key].update(value)

        # sets are converted to lists before being returned
        return {k: list(v) for k, v in available_values.items()}

    def queryables_by_form(
        self,
        form: list[dict[str, Any]],
        available_values: dict[str, list[str]],
        defaults: dict[str, Any],
    ) -> dict[str, Annotated[Any, FieldInfo]]:
        """
        Generate Annotated field definitions from form entries and available values.

        Used by Copernicus services like cop_cds, cop_ads, cop_ewds.

        Form elements named ``area_group``, ``global``, ``warning`` or ``licences``,
        elements without a ``type`` and ``FreeEditionWidget`` elements are skipped.
        Elements without an ``id`` get ``id = 100`` written into them, so the given
        ``form`` entries may be modified in place.

        :param form: data fetched from the form endpoint of the provider
        :param available_values: available values for each parameter
        :param defaults: default values for the parameters
        :return: dict of annotated queryables, keyed by the ECMWF-formatted parameter name
        """
        queryables: dict[str, Annotated[Any, FieldInfo]] = {}

        # NOTE(review): required_list is only appended to in this method; it is
        # neither read nor returned in the visible code.
        required_list: list[str] = []
        for element in form:
            name: str = element["name"]

            # those are not parameter elements.
            if name in ("area_group", "global", "warning", "licences"):
                continue
            if "type" not in element or element["type"] == "FreeEditionWidget":
                # FreeEditionWidget used to select the whole available region
                # and to provide comments for the dataset
                continue

            # ordering done by id -> set id to high value if not present -> element will be last
            if "id" not in element:
                element["id"] = 100

            prop = {"title": element.get("label", name)}

            details = element.get("details", {})

            # add values from form if keyword was not in constraints
            values = (
                available_values[name]
                if name in available_values
                else details.get("values")
            )

            # updates the properties with the values given based on the information from the element
            _update_properties_from_element(prop, element, values)

            default = defaults.get(name)

            # use the comment of the first field, when there is one, as description
            if details:
                fields = details.get("fields")
                if fields and (comment := fields[0].get("comment")):
                    prop["description"] = comment

            # a dict default for "area" is reduced to the list of its values
            if name == "area" and isinstance(default, dict):
                default = list(default.values())

            # sometimes form returns default as array instead of string
            if default and prop.get("type") == "string" and isinstance(default, list):
                default = ",".join(default)

            is_required: bool
            if available_values.get(name):
                # required by the filtered constraints (available_values[name] is a not empty list)
                is_required = True
            elif bool(element.get("required")):
                if name in available_values and not available_values[name]:
                    # not required by the filtered constraints (available_values[name] is an empty list)
                    is_required = False
                else:
                    # required only by form
                    is_required = True
            else:
                # not required by form
                is_required = False
            if is_required:
                required_list.append(name)

            # the queryable is stored under the non-alias name and accepts both
            # the alias and the raw form name as validation aliases
            formatted_param = ecmwf_format(name, alias=False)
            formatted_alias = ecmwf_format(name)
            queryables[formatted_param] = Annotated[
                get_args(
                    json_field_definition_to_python(
                        prop,
                        default_value=default,
                        required=is_required,
                        validation_alias=AliasChoices(formatted_alias, name),
                        serialization_alias=formatted_alias,
                    )
                )
            ]

        return queryables

    def queryables_by_values(
        self,
        available_values: dict[str, list[str]],
        required_keywords: list[str],
        defaults: dict[str, Any],
    ) -> dict[str, Annotated[Any, FieldInfo]]:
        """
        Generate Annotated field definitions from available values.

        Used by ECMWF data providers like dedt_lumi.

        Every parameter is declared as a string whose ``enum`` is the list of
        its available values.

        :param available_values: available values for each parameter
        :param required_keywords: list of required parameters
        :param defaults: default values for the parameters
        :return: dict of annotated queryables, keyed by the ECMWF-formatted parameter name
        """
        # Rename keywords from form with metadata mapping.
        # Needed to map constraints like "xxxx" to eodag parameter "ecmwf:xxxx"
        required = [ecmwf_format(k) for k in required_keywords]  # noqa: F841

        queryables: dict[str, Annotated[Any, FieldInfo]] = {}
        for name, values in available_values.items():
            # Rename keywords from form with metadata mapping.
            # Needed to map constraints like "xxxx" to eodag parameter "ecmwf:xxxx"
            formatted_param = ecmwf_format(name, alias=False)
            formatted_alias = ecmwf_format(name)

            # a parameter is required when its formatted alias is one of the
            # formatted required keywords
            queryables[formatted_param] = Annotated[
                get_args(
                    json_field_definition_to_python(
                        {"type": "string", "title": name, "enum": values},
                        default_value=defaults.get(name),
                        required=bool(formatted_alias in required),
                        validation_alias=AliasChoices(formatted_alias, name),
                        serialization_alias=formatted_alias,
                    )
                )
            ]

        return queryables

    def format_as_provider_keyword(
        self, collection: str, properties: dict[str, Any]
    ) -> dict[str, Any]:
        """Return provider equivalent keyword names from EODAG keywords.

        The given ``properties`` dict is modified in place: ``collection`` is
        written into it and missing collection-level parameters are added.

        :param collection: collection id
        :param properties: dict of properties to be formatted
        :return: dict of formatted properties
        """
        properties["collection"] = collection

        # provider collection specific conf
        collection_def_params = self.get_collection_def_params(
            collection, format_variables=properties
        )

        # Add to the query, the queryable parameters set in the provider collection definition
        # (only those not already given and whose metadata mapping entry is a list)
        properties.update(
            {
                k: v
                for k, v in collection_def_params.items()
                if k not in properties.keys()
                and k in self.config.metadata_mapping.keys()
                and isinstance(self.config.metadata_mapping[k], list)
            }
        )
        # only the query params are kept, the second returned element is ignored
        qp, _ = self.build_query_string(collection, properties)

        return qp

    @instance_cached_method()
    def _fetch_data(self, url: str) -> Any:
        """
        Fetch from a provider elements like constraints or forms.

        :param url: url from which the constraints can be fetched
        :returns: json file content fetched from the provider, or an empty list
                  if ``url`` is empty
        """
        if not url:
            return []

        # authentication is only passed on when the plugin holds an AuthBase instance
        auth = (
            self.auth
            if hasattr(self, "auth") and isinstance(self.auth, AuthBase)
            else None
        )
        timeout = getattr(self.config, "timeout", DEFAULT_SEARCH_TIMEOUT)
        return fetch_json(url, auth=auth, timeout=timeout)

    def normalize_results(
        self, results: RawSearchResult, **kwargs: Any
    ) -> list[EOProduct]:
        """Build :class:`~eodag.api.product._product.EOProduct` from provider result

        When the first raw result is not empty, the product properties are taken
        from it. Otherwise they are parsed from the query parameters, the
        collection definition parameters and the search arguments, and an
        ``_ORDERABLE_`` id built from a hash of these data is assigned.

        :param results: Raw provider result as single dict in list
        :param kwargs: Search arguments
        :returns: list of single :class:`~eodag.api.product._product.EOProduct`
        """

        collection = kwargs.get("collection")

        result = results[0]

        # datacube query string got from previous search
        _dc_qs = kwargs.pop("_dc_qs", None)
        if _dc_qs is not None:
            # the query string is unquoted twice before being loaded
            qs = unquote_plus(unquote_plus(_dc_qs))
            sorted_unpaginated_qp = geojson.loads(qs)
        else:
            sorted_unpaginated_qp = dict_items_recursive_sort(results.query_params)

        # remove unwanted query params
        for param in getattr(self.config, "remove_from_query", []):
            sorted_unpaginated_qp.pop(param, None)

        if result:
            # properties come from the provider result, merged with the request
            # params it may carry (result is modified in place here)
            properties = result
            properties.update(result.pop("eodag:request_params", None) or {})

            # drop keys starting with a double underscore
            properties = {k: v for k, v in properties.items() if not k.startswith("__")}

            properties["geometry"] = properties.get("area") or DEFAULT_GEOMETRY

            start, end = ecmwf_temporal_to_eodag(properties)
            properties["start_datetime"] = start
            properties["end_datetime"] = end
        else:
            # use all available query_params to parse properties
            result_data: dict[str, Any] = {
                **results.collection_def_params,
                **sorted_unpaginated_qp,
                **{"qs": sorted_unpaginated_qp},
            }

            # update result with collection_def_params and search args if not None (and not auth)
            kwargs.pop("auth", None)
            result_data.update(results.collection_def_params)
            result_data = {
                **result_data,
                **{k: v for k, v in kwargs.items() if v is not None},
            }

            # start_datetime / computed from "date", "time", "year", "month", "day"
            # (only used for the keys that are not already set)
            indirects = self._preprocess_indirect_date_parameters(result_data)
            for key in (START, END):
                if result_data.get(key) is None and indirects.get(key) is not None:
                    result_data[key] = indirects[key]

            properties = properties_from_json(
                result_data,
                self.config.metadata_mapping,
                discovery_config=getattr(self.config, "discover_metadata", {}),
            )

            # the hash of the parsed data is the last part of the product id
            query_hash = hashlib.sha1(str(result_data).encode("UTF-8")).hexdigest()

            properties["title"] = properties["id"] = (
                (collection or kwargs.get("dataset", self.provider)).upper()
                + "_ORDERABLE_"
                + query_hash
            )

        # collection alias (required by opentelemetry-instrumentation-eodag)
        if alias := getattr(self.config, "collection_config", {}).get("alias"):
            kwargs["collection"] = alias

        # Convert ecmwf geometries for properties but keep original in qs
        # ECMWF Polytope uses non-geojson structure for features
        if "feature" in sorted_unpaginated_qp:
            properties["geometry"] = get_geometry_from_ecmwf_feature(
                sorted_unpaginated_qp["feature"]
            )
            properties.pop("feature", None)

        # bounding box in area format
        if "area" in sorted_unpaginated_qp:
            properties["geometry"] = get_geometry_from_ecmwf_area(
                sorted_unpaginated_qp["area"]
            )
            properties.pop("area", None)

        # single location
        if "location" in sorted_unpaginated_qp:
            properties["geometry"] = get_geometry_from_ecmwf_location(
                sorted_unpaginated_qp["location"]
            )
            properties.pop("location", None)

        qs = geojson.dumps(sorted_unpaginated_qp)

        # used by server mode to generate eodag:download_link href
        # TODO: to remove once the legacy server is removed
        properties["_dc_qs"] = quote_plus(qs)

        product = EOProduct(
            provider=self.provider,
            properties={ecmwf_format(k): v for k, v in properties.items()},
            **kwargs,
        )
        # backup original register_downloader to register_downloader_only
        product.register_downloader_only = product.register_downloader
        # patched register_downloader that will also update properties
        product.register_downloader = MethodType(patched_register_downloader, product)  # type: ignore[method-assign]

        return [product]

    def count_hits(
        self, count_url: Optional[str] = None, result_type: Optional[str] = None
    ) -> int:
        """Count method that will always return 1.

        :param count_url: not used, only here because this method overwrites count_hits from the parent class
        :param result_type: not used, only here because this method overwrites count_hits from the parent class
        :return: always 1
        """
        return 1
def _check_id(product: EOProduct) -> EOProduct:
    """Check if the id is the one of an existing job.

    If the job exists, poll it, otherwise, raise an error.

    The product is returned untouched when it was not searched by id, when the
    id contains ``ORDERABLE``, when no downloader is registered, or when the
    downloader configuration has no ``order_on_response`` metadata mapping.
    Otherwise the id is used as ``eodag:order_id``, the order status is checked
    through the downloader, and the product id, title and collection are updated.

    :param product: The product to check the id for
    :returns: The given product, updated in place when the id is the one of a job
    :raises: :class:`~eodag.utils.exceptions.ValidationError`
    """
    if not (product_id := product.search_kwargs.get("id")):
        return product

    if "ORDERABLE" in product_id:
        return product

    if product.downloader is None:
        return product

    on_response_mm = getattr(product.downloader.config, "order_on_response", {}).get(
        "metadata_mapping", {}
    )
    if not on_response_mm:
        return product

    logger.debug(
        "Update product properties using given eodag:order_id %s", product_id
    )
    on_response_mm_jsonpath = mtd_cfg_as_conversion_and_querypath(
        on_response_mm,
    )
    properties_update = properties_from_json(
        {}, {**on_response_mm_jsonpath, **{"eodag:order_id": (None, product_id)}}
    )
    # only the properties that could be resolved are applied
    product.properties.update(
        {k: v for k, v in properties_update.items() if v != NOT_AVAILABLE}
    )

    auth = product.downloader_auth.authenticate() if product.downloader_auth else None

    # try to poll the job corresponding to the given id
    try:
        product.downloader._order_status(product=product, auth=auth)  # type: ignore
    # when a NotAvailableError is caught, it means the product is not ready and still needs to be polled
    except NotAvailableError:
        product.properties["order:status"] = STAGING_STATUS
    except Exception as e:
        # an exception may carry no argument, or a non-string one: never let the
        # inspection of its message hide the original error
        message = e.args[0] if e.args else str(e)
        if (
            isinstance(e, (DownloadError, ValidationError))
            and "order status could not be checked" in str(message)
        ):
            raise ValidationError(
                f"Requested data is not available on {product.provider} ({product_id})."
            ) from e
        raise ValidationError(message) from e

    # update product id
    product.properties["id"] = product_id
    # update collection if needed
    if product.collection is None:
        product.collection = product.properties.get("ecmwf:dataset")
    # update product title
    product.properties["title"] = (
        (product.collection or product.provider).upper() + "_" + product_id
    )
    # use NOT_AVAILABLE as fallback collection to avoid using guess_collection
    if product.collection is None:
        product.collection = NOT_AVAILABLE

    return product


def patched_register_downloader(self, downloader, authenticator):
    """Register product downloader and update properties if searched by id.

    :param self: product to which information should be added
    :param downloader: The download method that it can use
                       :class:`~eodag.plugins.download.base.Download` or
                       :class:`~eodag.plugins.api.base.Api`
    :param authenticator: The authentication method needed to perform the download
                          :class:`~eodag.plugins.authentication.base.Authentication`
    """
    # register downloader
    self.register_downloader_only(downloader, authenticator)
    # and also update properties
    _check_id(self)
class MeteoblueSearch(ECMWFSearch):
    """MeteoblueSearch search plugin.

    This plugin, which inherits from :class:`~eodag.plugins.search.build_search_result.ECMWFSearch`,
    performs a POST request and uses its result to build a single :class:`~eodag.api.search_result.SearchResult`
    object.

    The available configuration parameters are inherited from parent classes, with a particularity
    regarding pagination for this plugin.

    :param provider: An eodag providers configuration dictionary
    :param config: Search plugin configuration:

        * :attr:`~eodag.config.PluginConfig.pagination` (:class:`~eodag.config.PluginConfig.Pagination`)
          (**mandatory**): The configuration of how the pagination is done on the provider. For
          this plugin it has the node:

          * :attr:`~eodag.config.PluginConfig.Pagination.next_page_query_obj` (``str``): The
            additional parameters needed to perform search. These parameters won't be included in
            the result. This must be a json dict formatted like ``{{"foo":"bar"}}`` because it
            will be passed to a :meth:`str.format` method before being loaded as json.
    """

    def collect_search_urls(
        self,
        prep: PreparedSearch = PreparedSearch(),
        **kwargs: Any,
    ) -> tuple[list[str], int]:
        """Collect the search urls through the parent class and force the product count to 1

        :param prep: :class:`~eodag.plugins.search.PreparedSearch` object containing information for the search
        :param kwargs: keyword arguments used in the search
        :return: list of search url and number of results
        """
        search_urls, _parent_count = super().collect_search_urls(prep, **kwargs)
        return search_urls, 1

    def do_search(
        self, prep: PreparedSearch = PreparedSearch(limit=None), **kwargs: Any
    ) -> RawSearchResult:
        """Send the search request to the first search url and wrap its json response.

        :param prep: :class:`~eodag.plugins.search.PreparedSearch` object containing information for the search
        :param kwargs: keyword arguments to be used in the search
        :return: list containing the results from the provider in json format
        """
        # only the first collected url is requested
        prep.url = prep.search_urls[0]
        prep.info_message = f"Sending search request: {prep.url}"
        prep.exception_message = f"Skipping error while searching for {self.provider} {self.__class__.__name__} instance"

        provider_json = self._request(prep).json()

        # single-element result carrying the parameters used to get it
        wrapped = RawSearchResult([provider_json])
        wrapped.search_params = kwargs
        wrapped.query_params = prep.query_params
        wrapped.collection_def_params = prep.collection_def_params
        return wrapped

    def build_query_string(
        self, collection: str, query_dict: dict[str, Any]
    ) -> tuple[dict[str, Any], str]:
        """Build the query string using the search parameters

        Delegates to ``QueryStringSearch.build_query_string``.

        :param collection: collection id
        :param query_dict: keyword arguments to be used in the query string
        :return: formatted query params and encoded query string
        """
        return QueryStringSearch.build_query_string(self, collection, query_dict)

    def normalize_results(self, results, **kwargs):
        """Build :class:`~eodag.api.product._product.EOProduct` from provider result

        The product id is made of the upper-cased collection (or provider) name,
        the start and end dates when available, and a hash of the query parameters.

        :param results: Raw provider result as single dict in list
        :param kwargs: Search arguments
        :returns: list of single :class:`~eodag.api.product._product.EOProduct`
        """
        collection = kwargs.get("collection")
        provider_result = results[0]

        # datacube query string got from previous search
        datacube_qs = kwargs.pop("_dc_qs", None)
        if datacube_qs is None:
            # drop the (key, value) pairs that only exist for pagination
            pagination_items = orjson.loads(
                self.config.pagination["next_page_query_obj"].format()
            ).items()
            without_pagination = {
                key: value
                for key, value in results.query_params.items()
                if (key, value) not in pagination_items
            }
            # sorted so that the hash used in the product id is stable
            sorted_params = dict_items_recursive_sort(without_pagination)
        else:
            sorted_params = geojson.loads(unquote_plus(unquote_plus(datacube_qs)))

        # use all available query_params to parse properties
        merged = dict(provider_result, **sorted_params, qs=sorted_params)

        qs = geojson.dumps(sorted_params)
        query_hash = hashlib.sha1(str(qs).encode("UTF-8")).hexdigest()

        # update result with collection_def_params and search args if not None (and not auth)
        kwargs.pop("auth", None)
        merged.update(results.collection_def_params)
        given_kwargs = {key: value for key, value in kwargs.items() if value is not None}
        merged = dict(merged, **given_kwargs)

        # parse properties
        discovery_config = getattr(self.config, "discover_metadata", {})
        parsed = properties_from_json(
            merged,
            self.config.metadata_mapping,
            discovery_config=discovery_config,
        )
        properties = {ecmwf_format(key): value for key, value in parsed.items()}

        # collection alias (required by opentelemetry-instrumentation-eodag)
        if alias := getattr(self.config, "collection_config", {}).get("alias"):
            collection = alias

        # build product id: NAME[_STARTDATE][_ENDDATE]_HASH
        id_parts = [(collection or self.provider).upper()]
        for bound in (START, END):
            bound_value = properties.get(bound, NOT_AVAILABLE)
            if bound_value != NOT_AVAILABLE:
                # keep the date part only, without dashes
                id_parts.append(bound_value.split("T")[0].replace("-", ""))
        id_parts.append(query_hash)
        product_id = "_".join(id_parts)

        properties["id"] = properties["title"] = product_id

        # used by server mode to generate eodag:download_link href
        properties["_dc_qs"] = quote_plus(qs)

        return [
            EOProduct(
                provider=self.provider,
                collection=collection,
                properties=properties,
            )
        ]
class WekeoECMWFSearch(ECMWFSearch):
    """
    WekeoECMWFSearch search plugin.

    This plugin, which inherits from :class:`~eodag.plugins.search.build_search_result.ECMWFSearch`,
    performs a POST request and uses its result to build a single :class:`~eodag.api.search_result.SearchResult`
    object. In contrast to ECMWFSearch or MeteoblueSearch, the products are only built with information
    returned by the provider.

    The available configuration parameters are inherited from parent classes, with a particularity
    regarding pagination for this plugin.

    :param provider: An eodag providers configuration dictionary
    :param config: Search plugin configuration:

        * :attr:`~eodag.config.PluginConfig.pagination` (:class:`~eodag.config.PluginConfig.Pagination`)
          (**mandatory**): The configuration of how the pagination is done on the provider. For
          this plugin it has the node:

          * :attr:`~eodag.config.PluginConfig.Pagination.next_page_query_obj` (``str``): The
            additional parameters needed to perform search. These parameters won't be included in
            the result. This must be a json dict formatted like ``{{"foo":"bar"}}`` because it
            will be passed to a :meth:`str.format` method before being loaded as json.
    """

    def normalize_results(
        self, results: RawSearchResult, **kwargs: Any
    ) -> list[EOProduct]:
        """Build :class:`~eodag.api.product._product.EOProduct` from provider result

        When a non-empty id that does not contain ``ORDERABLE`` is given, the
        parent class normalization is used. Otherwise products are normalized
        with ``QueryStringSearch.normalize_results`` and then updated with the
        query parameters and an ``_ORDERABLE_`` id.

        :param results: Raw provider result as single dict in list
        :param kwargs: Search arguments
        :returns: list of single :class:`~eodag.api.product._product.EOProduct`
        """
        search_id = kwargs.get("id")
        if search_id and "ORDERABLE" not in search_id:
            # id is order id (only letters and numbers) -> use parent normalize results
            return super().normalize_results(results, **kwargs)

        # formatting of eodag:order_link requires access to the collection value.
        results.data = [
            {**result, **results.collection_def_params} for result in results
        ]

        normalized = QueryStringSearch.normalize_results(self, results, **kwargs)

        if not normalized:
            return normalized

        # remove unwanted query params
        excluded_query_params = getattr(self.config, "remove_from_query", [])
        filtered_query_params = {
            k: v
            for k, v in results.query_params.items()
            if k not in excluded_query_params
        }
        # the encoded query string is the same for every product: compute it once
        dc_qs = quote_plus(orjson.dumps(filtered_query_params))

        for product in normalized:
            properties = {**product.properties, **results.query_params}
            properties["_dc_qs"] = dc_qs
            product.properties = {ecmwf_format(k): v for k, v in properties.items()}

            # update product id and title the same way as in parent class:
            # the part of the title after the last "-" is used as query hash,
            # what comes before as dataset name
            splitted_id = product.properties.get("title", "").split("-")
            dataset = "_".join(splitted_id[:-1])
            query_hash = splitted_id[-1]
            product.properties["title"] = product.properties["id"] = (
                (product.collection or dataset or self.provider).upper()
                + "_ORDERABLE_"
                + query_hash
            )

        return normalized

    def do_search(
        self, prep: PreparedSearch = PreparedSearch(limit=None), **kwargs: Any
    ) -> RawSearchResult:
        """Perform the actual search request, unless an order id is given.

        When a non-empty id that does not contain ``ORDERABLE`` is given, no
        request is sent: a single empty result is returned, and the order status
        is checked afterwards using the given id. This is the same condition as
        the one used by :meth:`normalize_results`.

        :param prep: :class:`~eodag.plugins.search.PreparedSearch` object containing information for the search
        :param kwargs: keyword arguments to be used in the search
        :return: list containing the results from the provider in json format
        """
        search_id = kwargs.get("id")
        if search_id and "ORDERABLE" not in search_id:
            # id is order id (only letters and numbers).
            # No real search. We fake it all, then check order status using given id
            raw_search_results = RawSearchResult([{}])
            raw_search_results.search_params = kwargs
            raw_search_results.query_params = getattr(prep, "query_params", {})
            raw_search_results.collection_def_params = getattr(
                prep, "collection_def_params", {}
            )
            return raw_search_results

        return QueryStringSearch.do_search(self, prep, **kwargs)