# -*- coding: utf-8 -*-
# Copyright 2022, CS GROUP - France, https://www.csgroup.eu/
#
# This file is part of EODAG project
# https://www.github.com/CS-SI/EODAG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import datetime as dt
import hashlib
import logging
import re
from collections import OrderedDict
from types import MethodType
from typing import TYPE_CHECKING, Annotated, Any, Optional
from urllib.parse import quote_plus, unquote_plus
import geojson
import orjson
from pydantic import AliasChoices
from pydantic.fields import FieldInfo
from requests.auth import AuthBase
from typing_extensions import get_args # noqa: F401
from eodag.api.product import EOProduct
from eodag.api.product.metadata_mapping import (
DEFAULT_GEOMETRY,
NOT_AVAILABLE,
OFFLINE_STATUS,
STAGING_STATUS,
format_metadata,
mtd_cfg_as_conversion_and_querypath,
properties_from_json,
)
from eodag.api.search_result import RawSearchResult, SearchResult
from eodag.plugins.search import PreparedSearch
from eodag.plugins.search.qssearch import PostJsonSearch, QueryStringSearch
from eodag.types import json_field_definition_to_python # noqa: F401
from eodag.types.queryables import Queryables, QueryablesDict
from eodag.types.stac_extensions import EcmwfItemProperties
from eodag.utils import (
DEFAULT_SEARCH_TIMEOUT,
deepcopy,
dict_items_recursive_sort,
format_string,
get_geometry_from_ecmwf_area,
get_geometry_from_ecmwf_feature,
get_geometry_from_ecmwf_location,
get_geometry_from_various,
)
from eodag.utils.cache import instance_cached_method
from eodag.utils.dates import (
COMPACT_DATE_RANGE_PATTERN,
DATE_RANGE_PATTERN,
compute_date_range_from_params,
format_date,
is_range_in_range,
parse_date,
parse_to_utc,
parse_year_month_day,
time_values_to_hhmm,
to_iso_utc_string,
validate_datetime_param,
)
from eodag.utils.exceptions import (
DownloadError,
MisconfiguredError,
NotAvailableError,
ValidationError,
)
from eodag.utils.requests import fetch_json
if TYPE_CHECKING:
from eodag.config import PluginConfig
# Logger used by this search plugin module.
logger = logging.getLogger("eodag.search.build_search_result")

# Prefix of ECMWF specific property names in their alias form (e.g. "ecmwf:variable").
ECMWF_PREFIX = "ecmwf:"

# ECMWF keywords: field names of EcmwfItemProperties with "ecmwf_" removed.
ALLOWED_KEYWORDS = {
    field_name.replace("ecmwf_", "")
    for field_name in EcmwfItemProperties.model_fields
}

# Names of the properties holding the temporal extent of a search / product.
START = "start_datetime"
END = "end_datetime"
def ecmwf_mtd() -> dict[str, Any]:
    """
    Build the metadata mapping entries for every allowed ECMWF keyword.

    Each keyword ``kw`` of ``ALLOWED_KEYWORDS`` is mapped to a two-element list::

        kw:
          - kw
          - {$."kw"#to_geojson}

    The ``#to_geojson`` conversion is added automatically to prevent modification of
    entries by eval() in the metadata mapping.

    :return: metadata mapping dict
    """
    mapping: dict[str, Any] = {}
    for keyword in ALLOWED_KEYWORDS:
        # first item: query parameter name, second item: path used to read the value back
        mapping[keyword] = [keyword, f'{{$."{keyword}"#to_geojson}}']
    return mapping
def _update_properties_from_element(
    prop: dict[str, Any], element: dict[str, Any], values: list[str]
) -> None:
    """Update, in place, a JSON-schema like property dict from a form element.

    The resulting ``type`` (and related keys) depends on the widget type found in
    ``element["type"]``; unknown widget types leave the type untouched.

    :param prop: property dict to update in place
    :param element: form element; must contain ``type``, may contain ``help``
    :param values: allowed values for the element; ignored when empty or ``None``
    """
    widget = element["type"]

    if widget in ("StringListWidget", "StringListArrayWidget"):
        # multichoice elements are transformed into array
        prop["type"] = "array"
        if values:
            prop["items"] = {"type": "string", "enum": sorted(values)}
    elif widget in ("StringChoiceWidget", "DateRangeWidget", "FreeformInputWidget"):
        # single choice elements are transformed into string
        prop["type"] = "string"
        if values:
            prop["enum"] = sorted(values)
    elif widget in ("GeographicExtentWidget", "GeographicExtentMapWidget"):
        # a bbox element: four numbers ordered North, West, South, East
        borders = (("North", 90), ("West", 180), ("South", 90), ("East", 180))
        prop.update(
            {
                "type": "array",
                "minItems": 4,
                "additionalItems": False,
                "items": [
                    {
                        "type": "number",
                        "maximum": limit,
                        "minimum": -limit,
                        "description": f"{side} border of the bounding box",
                    }
                    for side, limit in borders
                ],
            }
        )

    # DateRangeWidget is a calendar date picker
    if widget == "DateRangeWidget":
        prop["description"] = "date formatted like yyyy-mm-dd/yyyy-mm-dd"

    # a single geographic location
    if widget == "GeographicLocationWidget":
        prop.update(
            {
                "type": "object",
                "description": "Longitude and latitude of a single location",
                "properties": {
                    "longitude": {"type": "number", "maximum": 180, "minimum": -180},
                    "latitude": {"type": "number", "maximum": 90, "minimum": -90},
                },
            }
        )

    # an explicit help text always wins over the generated descriptions
    help_text = element.get("help")
    if help_text:
        prop["description"] = help_text
def ecmwf_format(v: str, alias: bool = True) -> str:
    """Add the ECMWF prefix to ``v`` when ``v`` is an ECMWF keyword.

    Values that are not part of ``ALLOWED_KEYWORDS`` are returned unchanged.

    :param v: parameter to format
    :param alias: whether to format for alias (with ':') or for query param (False, with '_')
    :return: formatted parameter

    >>> ecmwf_format('dataset', alias=False)
    'ecmwf_dataset'
    >>> ecmwf_format('variable')
    'ecmwf:variable'
    >>> ecmwf_format('unknown_param')
    'unknown_param'
    """
    if v not in ALLOWED_KEYWORDS:
        return v
    # ECMWF_PREFIX ends with ':'; drop it to choose the separator explicitly
    prefix = ECMWF_PREFIX[:-1]
    return f"{prefix}:{v}" if alias else f"{prefix}_{v}"
def ecmwf_temporal_to_eodag(
    params: dict[str, Any],
) -> tuple[Optional[str], Optional[str]]:
    """
    Convert ECMWF temporal parameters to EODAG temporal parameters.

    ECMWF temporal parameters read from ``params``:

    - **date**: a string (or list of strings, joined with ``/``) in one of the formats
      `YYYY-MM-DD`, `YYYY-MM-DD/YYYY-MM-DD`, `YYYY-MM-DD/to/YYYY-MM-DD`.
      When set, it takes precedence over year/month/day.
    - **year** or **hyear**: Union[str, list[str]] — Year(s) as a string or list of strings.
    - **month** or **hmonth**: Union[str, list[str]] — Month(s) as a string or list of strings.
    - **day** or **hday**: Union[str, list[str]] — Day(s) as a string or list of strings.
    - **time**: str — time in the format `HHMM` (e.g., `0200`, `0800`, `1400`).

    :param params: Dictionary containing ECMWF temporal parameters.
    :return: A ``(start, end)`` tuple of strings formatted by ``to_iso_utc_string``
        (`YYYY-MM-DDTHH:MM:SSZ`), or ``(None, None)`` when no date nor year is given
        or when the parsed range is incomplete.
    """
    date = params.get("date")
    if date:
        if isinstance(date, list):
            date = "/".join(date)
        start, end = parse_date(date, params.get("time"))
    else:
        year = params.get("year") or params.get("hyear")
        if not year:
            # neither a date nor a year: nothing to convert
            return None, None
        month = params.get("month") or params.get("hmonth")
        day = params.get("day") or params.get("hday")
        start, end = parse_year_month_day(year, month, day, params.get("time"))

    if not (start and end):
        return None, None
    return to_iso_utc_string(start), to_iso_utc_string(end)
class ECMWFSearch(PostJsonSearch):
"""ECMWF search plugin.
This plugin builds a :class:`~eodag.api.search_result.SearchResult` containing a single product
using given query parameters as product properties.
The available configuration parameters inherits from parent classes, with some particular parameters
for this plugin.
:param provider: An eodag providers configuration dictionary
:param config: Search plugin configuration:
* :attr:`~eodag.config.PluginConfig.remove_from_query` (``list[str]``): List of parameters
used to parse metadata but that must not be included to the query
* :attr:`~eodag.config.PluginConfig.end_date_excluded` (``bool``): Set to `False` if
provider does not include end date to search
* :attr:`~eodag.config.PluginConfig.discover_queryables`
(:class:`~eodag.config.PluginConfig.DiscoverQueryables`): configuration to fetch the queryables from a
provider queryables endpoint; It has the following keys:
* :attr:`~eodag.config.PluginConfig.DiscoverQueryables.fetch_url` (``str``): url to fetch the queryables valid
for all collections
* :attr:`~eodag.config.PluginConfig.DiscoverQueryables.collection_fetch_url` (``str``): url to fetch the
queryables for a specific collection
* :attr:`~eodag.config.PluginConfig.DiscoverQueryables.constraints_url` (``str``): url of the constraint file
used to build queryables
* :attr:`~eodag.config.PluginConfig.dynamic_discover_queryables`
(``list`` [:class:`~eodag.config.PluginConfig.DynamicDiscoverQueryables`]): list of configurations to fetch
the queryables from different provider queryables endpoints. A configuration is used based on the given
selection criteria. The first match is used. If no match is found, it falls back to standard behaviors
(e.g. discovery using :attr:`~eodag.config.PluginConfig.discover_queryables`).
Each element of the list has the following keys:
* :attr:`~eodag.config.PluginConfig.DynamicDiscoverQueryables.collection_selector`
(``list`` [:class:`~eodag.config.PluginConfig.CollectionSelector`]): list of collection selection
criteria. The configuration given in
:attr:`~eodag.config.PluginConfig.DynamicDiscoverQueryables.discover_queryables` is used if any collection
selector matches the search parameters. The selector matches if the field value starts with the given
prefix, i.e. it matches if ``parameters[field].startswith(prefix)==True``. It has the following keys:
* :attr:`~eodag.config.PluginConfig.CollectionSelector.field` (``str``) Field in the search parameters to
match
* :attr:`~eodag.config.PluginConfig.CollectionSelector.prefix` (``str``) Prefix to match in the field
* :attr:`~eodag.config.PluginConfig.DynamicDiscoverQueryables.discover_queryables`
(``list`` [:class:`~eodag.config.PluginConfig.DiscoverQueryables`]): same as
:attr:`~eodag.config.PluginConfig.discover_queryables` above.
"""
def __init__(self, provider: str, config: PluginConfig) -> None:
    """Initialize the plugin and complete ``config`` with ECMWF defaults.

    The metadata mapping is built, from lowest to highest precedence, from the
    generated ECMWF keyword mapping, the plugin defaults below and the mapping
    already present in ``config``.

    :param provider: provider name
    :param config: search plugin configuration (updated in place)
    """
    plugin_defaults: dict[str, Any] = {
        "id": "$.id",
        "title": "$.id",
        "order:status": OFFLINE_STATUS,
        "eodag:download_link": "$.null",
        "geometry": ["feature", "$.geometry"],
        "eodag:default_geometry": "POLYGON((180 -90, 180 90, -180 90, -180 -90, 180 -90))",
    }
    config.metadata_mapping = {
        **ecmwf_mtd(),
        **plugin_defaults,
        **config.metadata_mapping,
    }

    super().__init__(provider, config)

    # ECMWF providers do not feature any api_endpoint or next_page_query_obj.
    # The search is faked by EODAG.
    self.config.__dict__.setdefault("api_endpoint", "")
    self.config.pagination.setdefault("next_page_query_obj", "{{}}")

    # default configuration for accepting custom query params
    default_discover_metadata = {
        "auto_discovery": False,
        "search_param": "{metadata}",
        "metadata_pattern": "^[a-zA-Z0-9][a-zA-Z0-9_]*$",
    }
    self.config.__dict__.setdefault("discover_metadata", default_discover_metadata)
def do_search(
    self, prep: PreparedSearch = PreparedSearch(limit=None), **kwargs: Any
) -> RawSearchResult:
    """Fake the search request: no provider is contacted.

    :param prep: prepared search; its ``query_params`` and ``collection_def_params``
        attributes are copied to the result when present (empty dict otherwise)
    :param kwargs: keyword arguments of the search, stored as ``search_params``
    :return: raw search result containing a single empty dict
    """
    # no real search. We fake it all
    fake_results = RawSearchResult([{}])
    fake_results.search_params = kwargs
    fake_results.query_params = getattr(prep, "query_params", {})
    fake_results.collection_def_params = getattr(prep, "collection_def_params", {})
    return fake_results
def query(
    self,
    prep: PreparedSearch = PreparedSearch(),
    **kwargs: Any,
) -> SearchResult:
    """Build ready-to-download SearchResult

    The search parameters are preprocessed with ``_preprocess_search_params`` before
    being handed to the parent implementation.

    :param prep: :class:`~eodag.plugins.search.PreparedSearch` object containing information needed for the search
    :param kwargs: keyword arguments to be used in the search
    :returns: the search result; when a count was requested (``prep.count``) and the
        parent result reports no ``number_matched``, it is set to 1
    """
    kwargs = self._preprocess_search_params(kwargs)
    result = super().query(prep, **kwargs)
    # the search is faked and yields a single product: report it when counting
    if prep.count and not result.number_matched:
        result.number_matched = 1
    return result
def clear(self) -> None:
    """Clear the search context by delegating to the parent implementation."""
    super().clear()
def build_query_string(
    self, collection: str, query_dict: dict[str, Any]
) -> tuple[dict[str, Any], str]:
    """Build the query string using the search parameters

    A ``_date`` entry formatted as ``<start>/<end>`` is added to ``query_dict``
    (in place) before delegating to the parent implementation.

    :param collection: collection id
    :param query_dict: keyword arguments to be used in the query string
    :return: formatted query params and encoded query string
    """
    start_value = query_dict.get(START)
    end_value = query_dict.get(END)
    query_dict["_date"] = f"{start_value}/{end_value}"

    # Put start/end first so that year/month/day/time, if set, are processed
    # afterwards and overwrite the default datetime.
    ordered_kwargs: dict[str, Any] = {}
    for key in (START, END):
        if key in query_dict:
            ordered_kwargs[key] = query_dict[key]
    ordered_kwargs.update(query_dict)

    return super().build_query_string(
        collection=collection, query_dict=ordered_kwargs
    )
def _preprocess_search_params(self, params: dict[str, Any]) -> dict[str, Any]:
    """Preprocess search parameters before making a request to the CDS API.

    Steps, in order:

    1. if ``_dc_qs`` (datacube query-string, URL-encoded twice) is given, start/end
       and geometry are read from its ``date`` and ``area`` entries (``params`` is
       updated in place);
    2. ``ecmwf:`` / ``ecmwf_`` prefixes are removed from the keys and ``None`` values
       are dropped (a new dict is built);
    3. ``date``, when present, overrides start and end datetimes;
    4. the end datetime is moved one day back when it is at midnight, later than
       start, and the provider includes the end date (``end_date_excluded`` is False);
    5. ``geometry`` is converted with ``get_geometry_from_various``; ``feature``,
       ``area`` and ``location`` are passed to their ECMWF geometry helpers, whose
       result is discarded (presumably only to validate the format — confirm).

    :param params: Search parameters to be preprocessed.
    :return: the preprocessed search parameters
    """
    dc_qs = params.get("_dc_qs")
    if dc_qs is not None:
        # if available, update search params using datacube query-string
        dc_qp = geojson.loads(unquote_plus(unquote_plus(dc_qs)))
        dc_date = dc_qp.get("date", "")
        if "/to/" in dc_date:
            range_start, range_end = dc_date.split("/to/")
            params[START], params[END] = range_start, range_end
        elif "/" in dc_date:
            range_start, range_end = dc_date.split("/")
            params[START], params[END] = range_start, range_end
        elif dc_date:
            params[START] = params[END] = dc_date

        if "/" in dc_qp.get("area", ""):
            params["geometry"] = dc_qp["area"].split("/")

    # drop unset values and strip both flavours of the ECMWF prefix from the keys
    underscore_prefix = f"{ECMWF_PREFIX[:-1]}_"
    stripped: dict[str, Any] = {}
    for key, value in params.items():
        if value is None:
            continue
        bare_key = key.removeprefix(ECMWF_PREFIX).removeprefix(underscore_prefix)
        stripped[bare_key] = value
    params = stripped

    # read 'start_datetime' and 'end_datetime' from 'date' range
    if "date" in params:
        range_start, range_end = parse_date(params["date"])
        params[START] = format_date(range_start)
        params[END] = format_date(range_end)

    # adapt end date if it is midnight
    if END in params:
        end_date_excluded = getattr(self.config, "end_date_excluded", True)
        is_datetime = "T" in str(params[END])
        end_date = parse_to_utc(params[END])
        # NOTE(review): assumes START is set whenever END is; a missing START
        # raises KeyError here.
        start_date = parse_to_utc(params[START])
        if not end_date_excluded and is_datetime and end_date > start_date:
            if end_date == end_date.replace(
                hour=0, minute=0, second=0, microsecond=0
            ):
                end_date += dt.timedelta(days=-1)
                params[END] = to_iso_utc_string(end_date)

    # geometry
    if "geometry" in params:
        params["geometry"] = get_geometry_from_various(geometry=params["geometry"])

    # check ecmwf geom format if given
    # ECMWF Polytope uses non-geojson structure for features
    if "feature" in params:
        get_geometry_from_ecmwf_feature(params["feature"])
    # bounding box in area format
    if "area" in params:
        get_geometry_from_ecmwf_area(params["area"])
    # single location
    if "location" in params:
        get_geometry_from_ecmwf_location(params["location"])

    return params
def _preprocess_indirect_date_parameters(self, params: dict[str, Any]) -> dict:
    """
    Compute start_datetime / end_datetime from "date", "time", "year", "month", "day"

    :param params: search parameters
    :return: dict holding the validated ``time``, ``year``, ``month`` and ``day``
        values that were given, plus ``start_datetime`` / ``end_datetime`` when they
        could be computed. ``date`` takes precedence over year/month/day.
    :raises ValidationError: if the date range cannot be computed from ``date``
    """
    # Validate the indirect date parameters
    time_values = validate_datetime_param(
        params.get("time"), "time", ["%H%M", "%H:%M", "%H%M%S", "%H:%M:%S"]
    )
    years = validate_datetime_param(params.get("year"), "year", ["%Y"])
    months = validate_datetime_param(params.get("month"), "month", ["%m"])
    days = validate_datetime_param(params.get("day"), "day", ["%d"])

    # Collect them, normalized
    indirects: dict[str, Any] = {}
    if time_values is not None:
        time_values = time_values_to_hhmm(time_values)
        indirects["time"] = time_values
    if years is not None:
        indirects["year"] = years
    if months is not None:
        # zero-padded on two digits
        indirects["month"] = [f"{int(m):02d}" for m in months]
    if days is not None:
        indirects["day"] = [f"{int(d):02d}" for d in days]

    # Compute date range from "date" param (takes precedence)
    date = params.get("date", None)
    if date is not None:
        try:
            start, end = compute_date_range_from_params(date=date, time=time_values)
        except Exception as e:
            raise ValidationError(
                'Malformed parameter "date" (date given "{}", time given: "{}"): {}'.format(
                    params.get("date"), params.get("time"), str(e)
                )
            )
        indirects["start_datetime"] = start
        indirects["end_datetime"] = end
        return indirects

    # Compute date range from year/month/day/time params
    start, end = compute_date_range_from_params(
        year=years,
        month=indirects.get("month"),
        day=indirects.get("day"),
        time=time_values,
    )
    if start is not None:
        indirects["start_datetime"] = start
    if end is not None:
        indirects["end_datetime"] = end
    return indirects
def _get_collection_queryables(
    self, collection: Optional[str], alias: Optional[str], filters: dict[str, Any]
) -> QueryablesDict:
    """Override to set additional_properties to false.

    :param collection: collection id; also stored in ``filters["collection"]`` (in place)
    :param alias: not used by this implementation
    :param filters: search filters, taking precedence over the collection defaults
    :return: queryables discovered for the collection, with additional properties
        disabled
    """
    products_conf = getattr(self.config, "products", {})
    default_values: dict[str, Any] = deepcopy(products_conf.get(collection, {}))
    # the mapping entries are configuration, not search defaults
    for excluded in ("metadata_mapping", "metadata_mapping_from_product"):
        default_values.pop(excluded, None)

    filters["collection"] = collection
    discovered = self.discover_queryables(**{**default_values, **filters})
    return QueryablesDict(additional_properties=False, **(discovered or {}))
def _find_dynamic_queryables_config(
    self, kwargs: dict[str, Any], dynamic_config: list
) -> dict[str, Any]:
    """Find the appropriate queryables configuration from dynamic configuration.

    The configurations are tried in order and the first one having a matching
    collection selector is used. A selector matches when the search parameter named
    by its ``field`` is a string starting with its ``prefix``; a missing or
    non-string parameter does not match.

    :param kwargs: Search parameters
    :param dynamic_config: List of dynamic discover queryables configurations
    :return: Found queryables configuration or empty dict
    """
    for candidate in dynamic_config:
        for selector in candidate["collection_selector"]:
            value = kwargs.get(selector["field"])
            # an absent (or non-string) field is "no match", not an error, so that
            # the caller can fall back to the standard configuration
            if isinstance(value, str) and value.startswith(selector["prefix"]):
                return candidate["discover_queryables"]
    return {}
def _is_discoverable_metadata_key(self, key: str) -> bool:
    """Check if a key can bypass strict queryables validation via discover_metadata.

    :param key: parameter name to check
    :return: True when ``discover_metadata.auto_discovery`` is enabled and ``key``
        matches ``discover_metadata.metadata_pattern``
    :raises MisconfiguredError: if the configured pattern is not a valid regex
    """
    settings = getattr(self.config, "discover_metadata", None) or {}
    if not settings.get("auto_discovery"):
        return False

    pattern = settings.get("metadata_pattern") or ""
    if not pattern:
        # no pattern configured: nothing can be discovered
        return False

    try:
        matched = re.match(pattern, key)
    except re.error:
        msg = f"Invalid discover_metadata.metadata_pattern for provider {self.provider}: {pattern}"
        raise MisconfiguredError(msg)
    return matched is not None
def discover_queryables(
    self,
    **kwargs: Any,
) -> Optional[dict[str, Annotated[Any, FieldInfo]]]:
    """Fetch queryables list from provider using its constraints file

    The available values are computed either from the constraints (and optional
    form) endpoints configured in ``discover_queryables``, or from the
    ``available_values_url`` endpoint. When neither is available, the queryables are
    built from the metadata mapping.

    :param kwargs: additional filters for queryables (`collection` and other search
                   arguments)
    :returns: fetched queryable parameters dict
    :raises ValidationError: if the filters cannot be preprocessed or if one of them
        is not a queryable parameter
    """
    collection = kwargs.pop("collection")

    col_config = self.get_collection_def_params(collection)
    default_values = deepcopy(col_config)
    default_values.pop("metadata_mapping", None)
    default_values.pop("metadata_mapping_from_product", None)
    default_values.pop("discover_queryables", None)
    kwargs.pop("discover_queryables", None)

    filters = {**default_values, **kwargs}
    if "start" in filters:
        filters[START] = filters.pop("start")
    if "end" in filters:
        filters[END] = filters.pop("end")

    # extract default datetime and convert geometry
    try:
        processed_filters = self._preprocess_search_params(deepcopy(filters))
    except Exception as e:
        # an exception may carry no args: do not hide it behind an IndexError
        raise ValidationError(e.args[0] if e.args else str(e)) from e

    # dynamic_discover_queryables for WekeoECMWFSearch
    queryables_config = {}
    if dynamic_config := getattr(self.config, "dynamic_discover_queryables", []):
        queryables_config = self._find_dynamic_queryables_config(
            kwargs, dynamic_config
        )

    # precedence (lowest to highest): provider, collection, dynamic configuration
    provider_dq = getattr(self.config, "discover_queryables", {}) or {}
    product_dq = col_config.get("discover_queryables", {}) or {}
    dq_conf = {**provider_dq, **product_dq, **queryables_config}

    constraints_url = format_metadata(dq_conf.get("constraints_url", ""), **filters)
    constraints: list[dict[str, Any]] = self._fetch_data(constraints_url)

    form_url = format_metadata(dq_conf.get("form_url", ""), **filters)
    form: list[dict[str, Any]] = self._fetch_data(form_url)
    # names of the form elements, computed once and reused below
    form_keywords = [f["name"] for f in form]

    formated_filters = self.format_as_provider_keyword(
        collection, deepcopy(processed_filters)
    )

    # we re-apply kwargs input to consider override of year, month, day and time.
    underscore_prefix = f"{ECMWF_PREFIX[:-1]}_"
    strict_keys = ALLOWED_KEYWORDS | {START, END, "geometry"}
    for k, v in {**default_values, **kwargs}.items():
        key = k.removeprefix(ECMWF_PREFIX).removeprefix(underscore_prefix)
        if key not in strict_keys and not self._is_discoverable_metadata_key(key):
            raise ValidationError(
                f"'{key}' is not a queryable parameter for {self.provider}", {key}
            )
        formated_filters[key] = v

    # we use non empty filters as default to integrate user inputs
    # it is needed because pydantic json schema does not represent "value"
    # but only "default"
    non_empty_formated: dict[str, Any] = {
        k: v
        for k, v in formated_filters.items()
        if v and (not isinstance(v, list) or all(v))
    }

    required_keywords: set[str] = set()

    # calculate available values
    if constraints:
        # Apply constraints filtering
        available_values = self.available_values_from_constraints(
            constraints,
            non_empty_formated,
            form_keywords=form_keywords,
        )

        # Pre-compute the required keywords (present in all constraint dicts)
        # when form, required keywords are extracted directly from form
        if not form:
            required_keywords = set.intersection(
                *(set(entry.keys()) for entry in constraints)
            )
    else:
        values_url = getattr(self.config, "available_values_url", "")
        if not values_url:
            return self.queryables_from_metadata_mapping(collection)
        if "{" in values_url:
            values_url = format_string(None, values_url, **filters)
        data = self._fetch_data(values_url)
        available_values = data["constraints"]
        required_keywords = data.get("required", [])

    # To check if all keywords are queryable parameters, we check if they are in the
    # available values, the form or the collection config (available values
    # calculated from the constraints might not include all queryables)
    known_keywords = (
        available_values.keys()
        | col_config.keys()
        | {START, END, "geometry"}
        | set(form_keywords)
    )
    for keyword in processed_filters:
        if (
            keyword not in known_keywords
            and not self._is_discoverable_metadata_key(keyword)
        ):
            raise ValidationError("'%s' is not a queryable parameter" % keyword)

    # generate queryables
    if form:
        queryables = self.queryables_by_form(
            form,
            available_values,
            non_empty_formated,
        )
    else:
        queryables = self.queryables_by_values(
            available_values, list(required_keywords), non_empty_formated
        )

    # ecmwf:date is replaced by start and end.
    # start and end filters are supported whenever combinations of "year", "month", "day" filters exist
    if (
        f"{underscore_prefix}date" in queryables
        or f"{underscore_prefix}year" in queryables
        or f"{underscore_prefix}hyear" in queryables
    ):
        queryables.update(
            {
                "start": Queryables.get_with_default(
                    "start", processed_filters.get(START)
                ),
                "end": Queryables.get_with_default(
                    "end",
                    processed_filters.get(END),
                ),
            }
        )

    # area is geom in EODAG.
    if queryables.pop("area", None):
        queryables["geom"] = Queryables.get_with_default("geom", None)

    return queryables
def available_values_from_constraints(
    self,
    constraints: list[dict[str, Any]],
    input_keywords: dict[str, Any],
    form_keywords: list[str],
) -> dict[str, list[str]]:
    """
    Filter constraints using input_keywords. Return list of available queryables.

    Keywords are processed in order (order of ``form_keywords`` when given, otherwise
    order of first appearance in ``constraints``). For every keyword having an input
    value, only the constraint entries compatible with this value are kept before
    the next keyword is processed. The values of the entries left are then
    aggregated per keyword.

    :param constraints: list of constraints received from the provider
    :param input_keywords: dict of input parameters given by the user
    :param form_keywords: list of keyword names from the provider form endpoint
    :return: dict with available values for each parameter
    :raises ValidationError: if an input value is an object (dict), or if an input
        value is not available in the constraints left by the previous keywords
    """
    # get ordered constraint keywords
    constraints_keywords = list(
        OrderedDict.fromkeys(k for c in constraints for k in c.keys())
    )

    # prepare ordered input keywords formatted as provider's keywords
    # required to filter with constraints
    ordered_keywords = (
        [kw for kw in form_keywords if kw in constraints_keywords]
        if form_keywords
        else constraints_keywords
    )

    # filter constraint entries matching input keyword values
    filtered_constraints: list[dict[str, Any]]
    parsed_keywords: list[str] = []
    for keyword in ordered_keywords:
        values = input_keywords.get(keyword)

        if values is None:
            parsed_keywords.append(keyword)
            continue

        # we only compare list of strings.
        if isinstance(values, dict):
            raise ValidationError(
                f"Parameter value as object is not supported: {keyword}={values}",
                {keyword},
            )

        # We convert every single value to a list of string
        filter_v = list(values) if isinstance(values, tuple) else values
        filter_v = filter_v if isinstance(filter_v, list) else [filter_v]
        filter_v = [str(v) for v in filter_v]

        # ECMWF accept date ranges with /to/ or in format val1/val2.
        # We need to split them to an array
        date_regex = [
            re.compile(p) for p in (DATE_RANGE_PATTERN, COMPACT_DATE_RANGE_PATTERN)
        ]
        is_date = any(
            any(r.match(v) is not None for r in date_regex) for v in filter_v
        )
        if is_date:
            sep = re.compile(r"/to/|/")
            filter_v = [i for v in filter_v for i in sep.split(str(v))]

        # special handling for time 0000 converted to 0 by pre-formating with metadata_mapping
        if keyword.split(":")[-1] == "time":
            filter_v = ["0000" if str(v) == "0" else v for v in filter_v]

        # Collect missing values to report errors
        missing_values = set(filter_v)

        # Filter constraints and check for missing values
        filtered_constraints = []

        # True if some constraint is defined for this keyword.
        # In other words: if no constraint defines a list of values
        # then any value is allowed for this keyword
        keyword_constrained = False

        for entry in constraints:
            # Filter based on the presence of any value in filter_v
            entry_values = entry.get(keyword, [])
            if entry_values:
                keyword_constrained = True

            # date constraint may be intervals. We identify intervals with a "/" in the value.
            # date constraint can be a mixed list of single values (e.g "2023-06-27")
            # and intervals (e.g. "2024-11-12/2025-11-20").
            # collections with mixed values: CAMS_GAC_FORECAST, CAMS_EU_AIR_QUALITY_FORECAST
            present_values = []
            for entry_value in entry_values:
                if keyword == "date" and "/" in entry_value:
                    input_range = values
                    if isinstance(values, list):
                        input_range = values[0]
                    if "/" not in input_range:
                        # single date: compare it as a one-day range
                        input_range = f"{input_range}/{input_range}"
                    if is_range_in_range(entry_value, input_range):
                        present_values.extend(filter_v)
                else:
                    new_values = [
                        value for value in filter_v if value == entry_value
                    ]
                    present_values.extend(new_values)

            # Remove present values from the missing_values set
            missing_values -= set(present_values)

            if present_values:
                filtered_constraints.append(entry)

        # raise an error as no constraint entry matched the input keywords
        # raise an error if one value from input is not allowed
        if keyword_constrained and (not filtered_constraints or missing_values):
            allowed_values = list(
                {value for c in constraints for value in c.get(keyword, [])}
            )
            # restore ecmwf: prefix before raising error
            keyword = ECMWF_PREFIX + keyword

            all_keywords_str = ""
            if len(parsed_keywords) > 1:
                keywords = [
                    f"{ECMWF_PREFIX + k}={pk}"
                    for k in parsed_keywords
                    if (pk := input_keywords.get(k))
                ]
                all_keywords_str = f" with {', '.join(keywords)}"

            raise ValidationError(
                f"{keyword}={values} is not available"
                f"{all_keywords_str}."
                f" Allowed values are {', '.join(allowed_values)}.",
                set(
                    [keyword] + [k for k in parsed_keywords if k in input_keywords]
                ),
            )

        parsed_keywords.append(keyword)
        # if the keyword is not constrained then any value is allowed
        if keyword_constrained:
            constraints = filtered_constraints

    available_values: dict[str, Any] = {k: set() for k in ordered_keywords}

    # we aggregate the constraint entries left
    for entry in constraints:
        for key, value in entry.items():
            # a constraint key may be absent from the form keywords, hence from
            # ordered_keywords: create its entry instead of failing with KeyError
            available_values.setdefault(key, set()).update(value)

    return {k: list(v) for k, v in available_values.items()}
def queryables_by_form(
    self,
    form: list[dict[str, Any]],
    available_values: dict[str, list[str]],
    defaults: dict[str, Any],
) -> dict[str, Annotated[Any, FieldInfo]]:
    """
    Generate Annotated field definitions from form entries and available values
    Used by Copernicus services like cop_cds, cop_ads, cop_ewds.

    Form elements without an ``id`` get ``id = 100`` (the element is updated in place).

    :param form: data fetched from the form endpoint of the provider
    :param available_values: available values for each parameter
    :param defaults: default values for the parameters
    :return: dict of annotated queryables
    """
    # those are not parameter elements.
    non_parameter_names = ("area_group", "global", "warning", "licences")

    queryables: dict[str, Annotated[Any, FieldInfo]] = {}
    for element in form:
        name: str = element["name"]
        if name in non_parameter_names:
            continue
        # FreeEditionWidget used to select the whole available region
        # and to provide comments for the dataset
        if "type" not in element or element["type"] == "FreeEditionWidget":
            continue

        # ordering done by id -> set id to high value if not present -> element will be last
        element.setdefault("id", 100)

        prop = {"title": element.get("label", name)}
        details = element.get("details", {})

        # add values from form if keyword was not in constraints
        if name in available_values:
            values = available_values[name]
        else:
            values = details.get("values")

        # updates the properties with the values given based on the information from the element
        _update_properties_from_element(prop, element, values)

        default = defaults.get(name)

        if details:
            fields = details.get("fields")
            if fields and (comment := fields[0].get("comment")):
                prop["description"] = comment

        if name == "area" and isinstance(default, dict):
            default = list(default.values())

        # sometimes form returns default as array instead of string
        if default and prop.get("type") == "string" and isinstance(default, list):
            default = ",".join(default)

        is_required: bool
        if available_values.get(name):
            # required by the filtered constraints (available_values[name] is a not empty list)
            is_required = True
        else:
            # required by the form only, unless the filtered constraints list the
            # keyword with no value left (available_values[name] is an empty list)
            is_required = bool(element.get("required")) and name not in available_values

        formatted_param = ecmwf_format(name, alias=False)
        formatted_alias = ecmwf_format(name)

        field_definition = json_field_definition_to_python(
            prop,
            default_value=default,
            required=is_required,
            validation_alias=AliasChoices(formatted_alias, name),
            serialization_alias=formatted_alias,
        )
        queryables[formatted_param] = Annotated[get_args(field_definition)]

    return queryables
def queryables_by_values(
    self,
    available_values: dict[str, list[str]],
    required_keywords: list[str],
    defaults: dict[str, Any],
) -> dict[str, Annotated[Any, FieldInfo]]:
    """
    Generate Annotated field definitions from available values.
    Used by ECMWF data providers like dedt_lumi.

    Every parameter is described as a string restricted to its available values.

    :param available_values: available values for each parameter
    :param required_keywords: list of required parameters
    :param defaults: default values for the parameters
    :return: dict of annotated queryables
    """
    # Rename keywords from form with metadata mapping.
    # Needed to map constraints like "xxxx" to eodag parameter "ecmwf:xxxx"
    required_aliases = {ecmwf_format(keyword) for keyword in required_keywords}

    queryables: dict[str, Annotated[Any, FieldInfo]] = {}
    for name, values in available_values.items():
        # same renaming for the parameter itself: "ecmwf_xxxx" key, "ecmwf:xxxx" alias
        formatted_param = ecmwf_format(name, alias=False)
        formatted_alias = ecmwf_format(name)

        field_definition = json_field_definition_to_python(
            {"type": "string", "title": name, "enum": values},
            default_value=defaults.get(name),
            required=formatted_alias in required_aliases,
            validation_alias=AliasChoices(formatted_alias, name),
            serialization_alias=formatted_alias,
        )
        queryables[formatted_param] = Annotated[get_args(field_definition)]

    return queryables
def format_as_provider_keyword(
    self, collection: str, properties: dict[str, Any]
) -> dict[str, Any]:
    """Translate EODAG keywords into the provider equivalent keyword names.

    :param collection: collection id
    :param properties: dict of properties to be formatted (updated in place with the
                       collection and the selected collection definition parameters)
    :return: dict of formatted properties
    """
    properties["collection"] = collection

    # provider collection specific conf
    collection_defaults = self.get_collection_def_params(
        collection, format_variables=properties
    )

    # Complete the query with the queryable parameters set in the provider collection
    # definition: only those missing from the given properties and whose metadata
    # mapping entry is a list are kept
    additions: dict[str, Any] = {}
    for key, value in collection_defaults.items():
        if key in properties:
            continue
        mapping = self.config.metadata_mapping
        if key in mapping and isinstance(mapping[key], list):
            additions[key] = value
    properties.update(additions)

    query_params, _ = self.build_query_string(collection, properties)
    return query_params
@instance_cached_method()
def _fetch_data(self, url: str) -> Any:
    """
    Fetch provider elements such as constraints or forms.

    :param url: url from which the constraints can be fetched
    :returns: json file content fetched from the provider, or an empty list if no url is given
    """
    if not url:
        return []

    # only forward the plugin auth when it is a usable requests auth object
    plugin_auth = getattr(self, "auth", None)
    auth = plugin_auth if isinstance(plugin_auth, AuthBase) else None

    return fetch_json(
        url,
        auth=auth,
        timeout=getattr(self.config, "timeout", DEFAULT_SEARCH_TIMEOUT),
    )
def normalize_results(
    self, results: RawSearchResult, **kwargs: Any
) -> list[EOProduct]:
    """Build :class:`~eodag.api.product._product.EOProduct` from provider result

    When the provider result is empty, the product properties are parsed from the query
    parameters, the collection definition parameters and the search arguments, and the
    product gets a ``<COLLECTION>_ORDERABLE_<query hash>`` id and title. Otherwise the
    properties are taken from the provider result itself.

    :param results: Raw provider result as single dict in list
    :param kwargs: Search arguments
    :returns: list of single :class:`~eodag.api.product._product.EOProduct`
    """
    collection = kwargs.get("collection")
    result = results[0]

    # datacube query string got from previous search
    _dc_qs = kwargs.pop("_dc_qs", None)
    if _dc_qs is not None:
        qs = unquote_plus(unquote_plus(_dc_qs))
        sorted_unpaginated_qp = geojson.loads(qs)
    else:
        sorted_unpaginated_qp = dict_items_recursive_sort(results.query_params)

    # remove unwanted query params
    for param in getattr(self.config, "remove_from_query", []):
        sorted_unpaginated_qp.pop(param, None)

    if result:
        # the provider returned data: use it as properties, completed with the request
        # parameters stored under "eodag:request_params" (result is updated in place)
        properties = result
        properties.update(result.pop("eodag:request_params", None) or {})
        # drop keys starting with a double underscore
        properties = {k: v for k, v in properties.items() if not k.startswith("__")}
        properties["geometry"] = properties.get("area") or DEFAULT_GEOMETRY

        start, end = ecmwf_temporal_to_eodag(properties)
        properties["start_datetime"] = start
        properties["end_datetime"] = end
    else:
        # use all available query_params to parse properties
        # (key insertion order matters: str(result_data) is hashed below)
        result_data: dict[str, Any] = {
            **results.collection_def_params,
            **sorted_unpaginated_qp,
            **{"qs": sorted_unpaginated_qp},
        }

        # update result with collection_def_params and search args if not None (and not auth)
        kwargs.pop("auth", None)
        result_data.update(results.collection_def_params)
        result_data = {
            **result_data,
            **{k: v for k, v in kwargs.items() if v is not None},
        }

        # start_datetime / computed from "date", "time", "year", "month", "day"
        indirects = self._preprocess_indirect_date_parameters(result_data)
        for key in (START, END):
            if result_data.get(key) is None and indirects.get(key) is not None:
                result_data[key] = indirects[key]

        properties = properties_from_json(
            result_data,
            self.config.metadata_mapping,
            discovery_config=getattr(self.config, "discover_metadata", {}),
        )

        query_hash = hashlib.sha1(str(result_data).encode("UTF-8")).hexdigest()

        # `or` chain rather than a dict.get() default: a "dataset" search argument
        # explicitly set to None must still fall back to the provider name
        id_prefix = collection or kwargs.get("dataset") or self.provider
        properties["title"] = properties["id"] = (
            id_prefix.upper() + "_ORDERABLE_" + query_hash
        )

    # collection alias (required by opentelemetry-instrumentation-eodag)
    if alias := getattr(self.config, "collection_config", {}).get("alias"):
        kwargs["collection"] = alias

    # Convert ecmwf geometries for properties but keep original in qs
    # ECMWF Polytope uses non-geojson structure for features
    if "feature" in sorted_unpaginated_qp:
        properties["geometry"] = get_geometry_from_ecmwf_feature(
            sorted_unpaginated_qp["feature"]
        )
        properties.pop("feature", None)

    # bounding box in area format
    if "area" in sorted_unpaginated_qp:
        properties["geometry"] = get_geometry_from_ecmwf_area(
            sorted_unpaginated_qp["area"]
        )
        properties.pop("area", None)

    # single location
    if "location" in sorted_unpaginated_qp:
        properties["geometry"] = get_geometry_from_ecmwf_location(
            sorted_unpaginated_qp["location"]
        )
        properties.pop("location", None)

    qs = geojson.dumps(sorted_unpaginated_qp)

    # used by server mode to generate eodag:download_link href
    # TODO: to remove once the legacy server is removed
    properties["_dc_qs"] = quote_plus(qs)

    product = EOProduct(
        provider=self.provider,
        properties={ecmwf_format(k): v for k, v in properties.items()},
        **kwargs,
    )

    # backup original register_downloader to register_downloader_only
    product.register_downloader_only = product.register_downloader
    # patched register_downloader that will also update properties
    product.register_downloader = MethodType(patched_register_downloader, product)  # type: ignore[method-assign]

    return [product]
def count_hits(
    self, count_url: Optional[str] = None, result_type: Optional[str] = None
) -> int:
    """Return a constant count of 1, whatever the arguments.

    :param count_url: unused, kept because this method overrides count_hits from the parent class
    :param result_type: unused, kept because this method overrides count_hits from the parent class
    :return: always 1
    """
    hits = 1
    return hits
def _check_id(product: EOProduct) -> EOProduct:
"""Check if the id is the one of an existing job.
If the job exists, poll it, otherwise, raise an error.
:param product: The product to check the id for
:raises: :class:`~eodag.utils.exceptions.ValidationError`
"""
if not (product_id := product.search_kwargs.get("id")):
return product
if "ORDERABLE" in product_id:
return product
if product.downloader is None:
return product
on_response_mm = getattr(product.downloader.config, "order_on_response", {}).get(
"metadata_mapping", {}
)
if not on_response_mm:
return product
logger.debug(f"Update product properties using given eodag:order_id {product_id}")
on_response_mm_jsonpath = mtd_cfg_as_conversion_and_querypath(
on_response_mm,
)
properties_update = properties_from_json(
{}, {**on_response_mm_jsonpath, **{"eodag:order_id": (None, product_id)}}
)
product.properties.update(
{k: v for k, v in properties_update.items() if v != NOT_AVAILABLE}
)
auth = product.downloader_auth.authenticate() if product.downloader_auth else None
# try to poll the job corresponding to the given id
try:
product.downloader._order_status(product=product, auth=auth) # type: ignore
# when a NotAvailableError is catched, it means the product is not ready and still needs to be polled
except NotAvailableError:
product.properties["order:status"] = STAGING_STATUS
except Exception as e:
if (
isinstance(e, DownloadError) or isinstance(e, ValidationError)
) and "order status could not be checked" in e.args[0]:
raise ValidationError(
f"Requested data is not available on {product.provider} ({product_id})."
) from e
raise ValidationError(e.args[0]) from e
# update product id
product.properties["id"] = product_id
# update collection if needed
if product.collection is None:
product.collection = product.properties.get("ecmwf:dataset")
# update product title
product.properties["title"] = (
(product.collection or product.provider).upper() + "_" + product_id
)
# use NOT_AVAILABLE as fallback collection to avoid using guess_collection
if product.collection is None:
product.collection = NOT_AVAILABLE
return product
def patched_register_downloader(self, downloader, authenticator):
    """Register the product downloader, then refresh the properties if searched by id.

    Meant to be bound to a product in place of its ``register_downloader`` method, the
    original method being kept as ``register_downloader_only``.

    :param self: product to which information should be added
    :param downloader: The download method that it can use
                      :class:`~eodag.plugins.download.base.Download` or
                      :class:`~eodag.plugins.api.base.Api`
    :param authenticator: The authentication method needed to perform the download
                         :class:`~eodag.plugins.authentication.base.Authentication`
    """
    # delegate the actual registration to the original (backed up) method
    register = self.register_downloader_only
    register(downloader, authenticator)

    # the downloader is now available: update the properties from the given id
    _check_id(self)
[docs]
class MeteoblueSearch(ECMWFSearch):
    """MeteoblueSearch search plugin.

    This plugin, which inherits from :class:`~eodag.plugins.search.build_search_result.ECMWFSearch`,
    performs a POST request and uses its result to build a single :class:`~eodag.api.search_result.SearchResult`
    object.

    The available configuration parameters are inherited from parent classes, with a particularity
    for pagination in this plugin.

    :param provider: An eodag providers configuration dictionary
    :param config: Search plugin configuration:

        * :attr:`~eodag.config.PluginConfig.pagination` (:class:`~eodag.config.PluginConfig.Pagination`)
          (**mandatory**): The configuration of how the pagination is done on the provider. For
          this plugin it has the node:

          * :attr:`~eodag.config.PluginConfig.Pagination.next_page_query_obj` (``str``): The
            additional parameters needed to perform search. These parameters won't be included in
            the result. This must be a json dict formatted like ``{{"foo":"bar"}}`` because it
            will be passed to a :meth:`str.format` method before being loaded as json.

    """

    def collect_search_urls(
        self,
        prep: PreparedSearch = PreparedSearch(),
        **kwargs: Any,
    ) -> tuple[list[str], int]:
        """Wraps PostJsonSearch.collect_search_urls to force product count to 1

        :param prep: :class:`~eodag.plugins.search.PreparedSearch` object containing information for the search
        :param kwargs: keyword arguments used in the search
        :return: list of search url and number of results
        """
        urls, _ = super().collect_search_urls(prep, **kwargs)
        return urls, 1

    def do_search(
        self, prep: PreparedSearch = PreparedSearch(limit=None), **kwargs: Any
    ) -> RawSearchResult:
        """Perform the actual search request, and return result in a single element.

        :param prep: :class:`~eodag.plugins.search.PreparedSearch` object containing information for the search
        :param kwargs: keyword arguments to be used in the search
        :return: list containing the results from the provider in json format
        """
        # NOTE(review): `prep` is modified below; when the caller omits it, the default
        # instance created at definition time is the one being modified
        prep.url = prep.search_urls[0]
        prep.info_message = f"Sending search request: {prep.url}"
        prep.exception_message = (
            f"Skipping error while searching for {self.provider}"
            f" {self.__class__.__name__} instance"
        )
        response = self._request(prep)

        # the whole json response is wrapped as the single raw result
        raw_search_results = RawSearchResult([response.json()])
        raw_search_results.search_params = kwargs
        raw_search_results.query_params = prep.query_params
        raw_search_results.collection_def_params = prep.collection_def_params
        return raw_search_results

    def build_query_string(
        self, collection: str, query_dict: dict[str, Any]
    ) -> tuple[dict[str, Any], str]:
        """Build The query string using the search parameters

        :param collection: collection id
        :param query_dict: keyword arguments to be used in the query string
        :return: formatted query params and encode query string
        """
        # explicitly use the QueryStringSearch implementation
        return QueryStringSearch.build_query_string(self, collection, query_dict)

    def normalize_results(
        self, results: RawSearchResult, **kwargs: Any
    ) -> list[EOProduct]:
        """Build :class:`~eodag.api.product._product.EOProduct` from provider result

        :param results: Raw provider result as single dict in list
        :param kwargs: Search arguments
        :returns: list of single :class:`~eodag.api.product._product.EOProduct`
        """

        def slugify(date_str: str) -> str:
            # keep the date part only, without dashes: 2020-01-02T03:04:05 -> 20200102
            return date_str.split("T")[0].replace("-", "")

        collection = kwargs.get("collection")
        result = results[0]

        # datacube query string got from previous search
        _dc_qs = kwargs.pop("_dc_qs", None)
        if _dc_qs is not None:
            qs = unquote_plus(unquote_plus(_dc_qs))
            sorted_unpaginated_query_params = geojson.loads(qs)
        else:
            # drop the pagination-only parameters from the query params
            next_page_query_obj = orjson.loads(
                self.config.pagination["next_page_query_obj"].format()
            )
            unpaginated_query_params = {
                k: v
                for k, v in results.query_params.items()
                if (k, v) not in next_page_query_obj.items()
            }
            # query hash, will be used to build a product id
            sorted_unpaginated_query_params = dict_items_recursive_sort(
                unpaginated_query_params
            )

        # use all available query_params to parse properties
        # (plain updates instead of dict(..., **params, qs=...), which fails when the
        # query params hold a "qs" key or a non-string key)
        result = dict(result)
        result.update(sorted_unpaginated_query_params)
        result["qs"] = sorted_unpaginated_query_params

        qs = geojson.dumps(sorted_unpaginated_query_params)
        query_hash = hashlib.sha1(qs.encode("UTF-8")).hexdigest()

        # update result with collection_def_params and search args if not None (and not auth)
        kwargs.pop("auth", None)
        result.update(results.collection_def_params)
        result.update({k: v for k, v in kwargs.items() if v is not None})

        # parse properties
        parsed_properties = properties_from_json(
            result,
            self.config.metadata_mapping,
            discovery_config=getattr(self.config, "discover_metadata", {}),
        )
        properties = {ecmwf_format(k): v for k, v in parsed_properties.items()}

        # collection alias (required by opentelemetry-instrumentation-eodag)
        if alias := getattr(self.config, "collection_config", {}).get("alias"):
            collection = alias

        # build product id: <COLLECTION or PROVIDER>[_<start>][_<end>]_<query hash>
        product_id = (collection or self.provider).upper()
        start = properties.get(START, NOT_AVAILABLE)
        end = properties.get(END, NOT_AVAILABLE)
        if start != NOT_AVAILABLE:
            product_id += f"_{slugify(start)}"
        if end != NOT_AVAILABLE:
            product_id += f"_{slugify(end)}"
        product_id += f"_{query_hash}"
        properties["id"] = properties["title"] = product_id

        # used by server mode to generate eodag:download_link href
        properties["_dc_qs"] = quote_plus(qs)

        product = EOProduct(
            provider=self.provider,
            collection=collection,
            properties=properties,
        )
        return [product]
[docs]
class WekeoECMWFSearch(ECMWFSearch):
"""
WekeoECMWFSearch search plugin.
This plugin, which inherits from :class:`~eodag.plugins.search.build_search_result.ECMWFSearch`,
performs a POST request and uses its result to build a single :class:`~eodag.api.search_result.SearchResult`
object. In contrast to ECMWFSearch or MeteoblueSearch, the products are only built with information
returned by the provider.
The available configuration parameters are inherited from parent classes, with a particularity
for pagination in this plugin.
:param provider: An eodag providers configuration dictionary
:param config: Search plugin configuration:
* :attr:`~eodag.config.PluginConfig.pagination` (:class:`~eodag.config.PluginConfig.Pagination`)
(**mandatory**): The configuration of how the pagination is done on the provider. For
this plugin it has the node:
* :attr:`~eodag.config.PluginConfig.Pagination.next_page_query_obj` (``str``): The
additional parameters needed to perform search. These parameters won't be included in
the result. This must be a json dict formatted like ``{{"foo":"bar"}}`` because it
will be passed to a :meth:`str.format` method before being loaded as json.
"""
def normalize_results(
    self, results: RawSearchResult, **kwargs: Any
) -> list[EOProduct]:
    """Build :class:`~eodag.api.product._product.EOProduct` from provider result

    When searching by order id (an ``id`` search argument not containing ``ORDERABLE``),
    the parent class implementation is used. Otherwise the products are normalized with
    the :class:`~eodag.plugins.search.qssearch.QueryStringSearch` implementation, then
    completed with the query parameters and given an ``ORDERABLE`` id and title.

    :param results: Raw provider result as single dict in list
    :param kwargs: Search arguments
    :returns: list of single :class:`~eodag.api.product._product.EOProduct`
    """
    if kwargs.get("id") and "ORDERABLE" not in kwargs["id"]:
        # id is order id (only letters and numbers) -> use parent normalize results
        return super().normalize_results(results, **kwargs)

    # formatting of eodag:order_link requires access to the collection value.
    results.data = [
        {**result, **results.collection_def_params} for result in results
    ]
    normalized = QueryStringSearch.normalize_results(self, results, **kwargs)
    if not normalized:
        return normalized

    # remove unwanted query params
    excluded_query_params = getattr(self.config, "remove_from_query", [])
    filtered_query_params = {
        k: v
        for k, v in results.query_params.items()
        if k not in excluded_query_params
    }
    # the encoded query string is the same for every product: compute it once
    dc_qs = quote_plus(orjson.dumps(filtered_query_params))

    for product in normalized:
        # properties get the whole (unfiltered) query params, _dc_qs the filtered ones
        properties = {**product.properties, **results.query_params}
        properties["_dc_qs"] = dc_qs
        product.properties = {ecmwf_format(k): v for k, v in properties.items()}

        # update product and title the same way as in parent class:
        # the part after the last "-" of the title is used as query hash
        splitted_id = product.properties.get("title", "").split("-")
        dataset = "_".join(splitted_id[:-1])
        query_hash = splitted_id[-1]
        product.properties["title"] = product.properties["id"] = (
            (product.collection or dataset or self.provider).upper()
            + "_ORDERABLE_"
            + query_hash
        )

    return normalized
def do_search(
    self, prep: PreparedSearch = PreparedSearch(limit=None), **kwargs: Any
) -> RawSearchResult:
    """Perform the actual search request, unless searching by order id.

    When an order id is given (an ``id`` search argument not containing ``ORDERABLE``),
    no request is sent: a single empty result is returned instead.

    :param prep: :class:`~eodag.plugins.search.PreparedSearch` object containing information for the search
    :param kwargs: keyword arguments to be used in the search
    :return: list containing the results from the provider in json format
    """
    # same guard as in normalize_results: a missing, None or empty id is not an order id
    if kwargs.get("id") and "ORDERABLE" not in kwargs["id"]:
        # id is order id (only letters and numbers) -> use parent normalize results.
        # No real search. We fake it all, then check order status using given id
        raw_search_results = RawSearchResult([{}])
        raw_search_results.search_params = kwargs
        raw_search_results.query_params = getattr(prep, "query_params", {})
        raw_search_results.collection_def_params = getattr(
            prep, "collection_def_params", {}
        )
        return raw_search_results

    return QueryStringSearch.do_search(self, prep, **kwargs)