Source code for eodag.api.product.metadata_mapping

# -*- coding: utf-8 -*-
# Copyright 2018, CS GROUP - France, https://www.csgroup.eu/
#
# This file is part of EODAG project
#     https://www.github.com/CS-SI/EODAG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations

import ast
import datetime as dt
import json
import logging
import re
from string import Formatter
from typing import TYPE_CHECKING, Any, AnyStr, Callable, Iterator, Optional, Union, cast

import geojson
import orjson
import pyproj
import shapely
from dateutil.relativedelta import relativedelta
from dateutil.tz import tzutc
from jsonpath_ng.jsonpath import Child, JSONPath
from lxml import etree
from lxml.etree import XPathEvalError
from shapely import wkt
from shapely.geometry import LineString, MultiPolygon, Point, Polygon
from shapely.ops import transform

from eodag.api.product._assets import Asset
from eodag.types.queryables import Queryables
from eodag.utils import (
    DEFAULT_PROJ,
    DEFAULT_SHAPELY_GEOMETRY,
    deepcopy,
    dict_items_recursive_apply,
    format_string,
    get_geometry_from_various,
    items_recursive_apply,
    nested_pairs2dict,
    remove_str_array_quotes,
    sanitize,
    string_to_jsonpath,
    update_nested_dict,
)
from eodag.utils.dates import get_timestamp, parse_to_utc, to_iso_utc_string
from eodag.utils.exceptions import ValidationError

if TYPE_CHECKING:
    from collections.abc import Mapping, Sequence

    from shapely.geometry.base import BaseGeometry

    from eodag.config import PluginConfig

# Module-level logger, registered under the "eodag.product.metadata_mapping" name
logger = logging.getLogger("eodag.product.metadata_mapping")

# Separator between a metadata path (or field name) and its converter, as in
# ``{<path>#<converter>(<args>)}``
SEP = r"#"
# Matches a whole ``{<path>#<converter>(<args>)}`` mapping value and captures the
# ``path``, ``converter`` and (optional) ``args`` named groups
INGEST_CONVERSION_REGEX = re.compile(
    r"^{(?P<path>[^#]*)" + SEP + r"(?P<converter>[^\d\W]\w*)(\((?P<args>.*)\))*}$"
)
# Placeholder used when a metadata value could not be extracted or converted
NOT_AVAILABLE = "Not Available"
# NOTE(review): not used in the visible part of this file; presumably the placeholder
# for metadata that has no mapping at all -- confirm against callers
NOT_MAPPED = "Not Mapped"
# NOTE(review): status values, not used in the visible part of this file; presumably
# they describe product availability (online / being staged / offline) -- confirm
ONLINE_STATUS = "succeeded"
STAGING_STATUS = "ordered"
OFFLINE_STATUS = "orderable"
# Rounding precision passed to ``wkt.dumps`` when serializing geometries
COORDS_ROUNDING_PRECISION = 4
# WKT length above which ``convert_to_rounded_wkt`` tries to simplify the geometry
WKT_MAX_LEN = 1600
# NOTE(review): not used in the visible part of this file; judging by the name and the
# pattern it detects query-string-like values embedding ``{...}`` templates -- confirm
COMPLEX_QS_REGEX = re.compile(r"^(.+=)?([^=]*)({.+})+([^=&]*)$")
# WKT polygon spanning longitudes -180..180 and latitudes -90..90
DEFAULT_GEOMETRY = "POLYGON((180 -90, 180 90, -180 90, -180 -90, 180 -90))"


def get_metadata_path(
    map_value: Union[str, list[str]],
) -> tuple[Union[list[str], None], str]:
    """Split a ``metadata_mapping`` entry into its optional converter and its path.

    A provider search configuration maps each EO product metadata either to a
    plain string (the jsonpath / xpath of the value in the provider result) or,
    when the metadata is queryable, to a two-items list whose first item is the
    provider-side search parameter and whose second item is that path::

        provider:
            ...
            search:
                ...
                metadata_mapping:
                    platform:
                        - platform
                        - $.properties.platform
                    id: $.properties.id
                    ...
                ...
            ...

    With this configuration `platform` is queryable whereas `id` is not. The first
    item of `metadata_mapping.platform` tells the
    :class:`~eodag.plugins.search.base.Search` plugin of `provider` how to
    interpret the eodag search parameter `platform`, and is used when eodag
    delegates the search to that plugin.

    The path itself may embed a converter, using the
    ``{<path>#<converter>(<args>)}`` syntax.

    :param map_value: The raw value found in the `metadata_mapping` section of the
                      provider search config, e.g. the list
                      `['platform', '$.properties.platform']` or the string
                      `$.properties.id` with the sample above.
    :returns: ``(None, path)`` when the path embeds no converter, otherwise
              ``([converter, args], path)`` where ``args`` is None when the
              converter takes no arguments.
    :raises TypeError: If the extracted path cannot be matched against the
                       conversion regex (i.e. it is not a string)
    """
    raw_path = get_metadata_path_value(map_value)
    try:
        conversion_match = INGEST_CONVERSION_REGEX.match(raw_path)
    except TypeError as e:
        logger.error("Could not match regex on metadata path '%s'" % str(raw_path))
        raise e

    # No converter embedded: the raw path is returned untouched
    if not conversion_match:
        return None, raw_path

    groups = conversion_match.groupdict()
    return [groups["converter"], groups["args"]], groups["path"]


def get_metadata_path_value(map_value: Union[str, list[str]]) -> str:
    """Return the raw metadata path of a mapping entry, converter included.

    :param map_value: Either the path itself, or a list whose second item is the
                      path (queryable metadata)
    :returns: The second item when `map_value` is a list, `map_value` otherwise
    """
    if isinstance(map_value, list):
        return map_value[1]
    return map_value


def get_search_param(map_value: list[str]) -> str:
    """Return the provider-side search parameter of a queryable metadata.

    See :func:`~eodag.api.product.metadata_mapping.get_metadata_path` for the
    layout of a `metadata_mapping` entry.

    :param map_value: The value found in the `metadata_mapping` section of the
                      provider search config; callers are expected to pass a list
    :returns: The first item of `map_value`, i.e. the search parameter as defined
              in the provider config
    """
    search_param = map_value[0]
    return search_param


def format_metadata(search_param: str, *args: Any, **kwargs: Any) -> str:
    """Format a string of form ``{<field_name>#<conversion_function>}``

    The currently understood converters are:
        - ``ceda_collection_name``: generate a CEDA collection name from a string
        - ``wekeo_to_cop_collection``: converts the name of a collection from the WEkEO format to the Copernicus format
        - ``csv_list``: convert to a comma separated list
        - ``datetime_to_timestamp_milliseconds``: converts a utc date string to a timestamp in milliseconds
        - ``dict_filter_and_sub``: filter dict items using jsonpath and then apply recursive_sub_str
        - ``dict_with_roles``: keep only dict items with given roles in their "roles" list
        - ``fake_l2a_title_from_l1c``: used to generate SAFE format metadata for data from AWS
        - ``from_alternate``: update assets using given alternate
        - ``from_ewkt``: convert EWKT to shapely geometry / WKT in DEFAULT_PROJ
        - ``from_georss``: convert GeoRSS to shapely geometry / WKT in DEFAULT_PROJ
        - ``get_ecmwf_time``: get the time of a datetime string in the ECMWF format
        - ``get_group_name``: get the matching regex group name
        - ``literalize_unicode``: convert a string to its raw Unicode literal form
        - ``not_available``: replace value with "Not Available"
        - ``recursive_sub_str``: recursively substitute in the structure (e.g. dict) values matching a regex
        - ``remove_extension``: on a string that contains dots, only take the first part of the list obtained by
          splitting the string on dots
        - ``replace_str``: execute "string".replace(old, new)
        - ``replace_str_tuple``: apply multiple replacements on a string (parts or complete)
        - ``replace_tuple``: apply multiple replacements matching whole value
        - ``s2msil2a_title_to_aws_productinfo``: used to generate SAFE format metadata for data from AWS
        - ``sanitize``: sanitize string
        - ``slice_str``: slice a string (equivalent to s[start:end:step])
        - ``split``: split a string using given separator
        - ``split_cop_dem_id``: get the bbox by splitting the product id
        - ``split_corine_id``: get the collection by splitting the product id
        - ``to_bounds_lists``: convert to list(s) of bounds
        - ``to_datetime_dict``: convert a datetime string to a dictionary where values are either a string or a list
        - ``to_ewkt``: convert to EWKT (Extended Well-Known text)
        - ``to_geojson``: convert to a GeoJSON (via __geo_interface__ if exists)
        - ``to_geojson_polytope``: convert shapely Point/LineString/Polygon to ECMWF polytope feature dicts
        - ``to_iso_date``: remove the time part of a iso datetime string
        - ``to_iso_utc_datetime_from_milliseconds``: convert a utc timestamp in given milliseconds to a utc iso datetime
        - ``to_iso_utc_datetime``: convert a UTC datetime string to ISO UTC datetime string
        - ``to_lower``: Convert a string to lowercase
        - ``to_nwse_bounds_str``: convert to North,West,South,East bounds string with given separator
        - ``to_nwse_bounds``: convert to North,West,South,East bounds
        - ``to_rounded_wkt``: simplify the WKT of a geometry
        - ``to_title``: Convert a string to title case
        - ``to_upper``: Convert a string to uppercase

    :param search_param: The string to be formatted
    :param args: (optional) Additional arguments to use in the formatting process
    :param kwargs: (optional) Additional named-arguments to use when formatting
    :returns: The formatted string
    """

    class MetadataFormatter(Formatter):
        """``string.Formatter`` resolving ``{field#converter(args)}`` replacement fields.

        ``get_field`` detects the converter and stores it on the instance, then
        ``convert_field`` applies it once the field value is known.
        """

        CONVERSION_REGEX = re.compile(
            r"^(?P<field_name>.+)"
            + SEP
            + r"(?P<converter>[^\d\W]\w*)(\((?P<args>.*)\))*$"
        )

        def __init__(self) -> None:
            # converter (and its raw arguments string) registered by get_field and
            # consumed by convert_field
            self.custom_converter: Optional[Callable] = None
            self.custom_args: Optional[str] = None

        def parse(self, format_string: str):
            """
            Rewrite field names in the template before the base parser sees them.
            Replaces `{foo:bar}` with `{foo__bar}`.
            """
            pattern = re.compile(r"{([^{}]+)}")

            def rewrite_field(field: str) -> str:
                # If there's a format spec (e.g., {foo:bar:.2f}), preserve it
                if ":" in field and not field.lstrip().startswith(("!", ".", ":")):
                    after = field.split(":")[1:]

                    # Don't confuse format spec with field name colons
                    if len(after) == 1 and "." in after[0]:
                        # It's a format specifier, leave it
                        return field

                    return field.replace(":", "__", 1)
                return field

            # Replace in string (but not in format_spec itself)
            safe_template = pattern.sub(
                lambda m: "{" + rewrite_field(m.group(1)) + "}", format_string
            )

            # Yield from base class
            yield from super().parse(safe_template)

        def get_value(
            self, key: Any, args: "Sequence[Any]", kwargs: "Mapping[str, Any]"
        ) -> Any:
            """
            Look up rewritten field name in kwargs by converting __ back to :
            """
            if isinstance(key, str):
                original_key = key.replace("__", ":")
                result = kwargs.get(original_key)
                if result is not None:
                    return result
                # fall back on the "_COLON_" spelling of the key
                key_with_COLON = key.replace("__", "_COLON_")
                return kwargs.get(key_with_COLON)
            return super().get_value(key, args, kwargs)

        def get_field(self, field_name: str, args: Any, kwargs: Any) -> Any:
            """Strip the converter from `field_name` and register it for convert_field."""
            conversion_func_spec = self.CONVERSION_REGEX.match(field_name)
            # Register a custom converter if any for later use (see convert_field)
            # This is done because we don't have the value associated to field_name at
            # this stage
            if conversion_func_spec:
                spec = conversion_func_spec.groupdict()
                field_name = spec["field_name"]
                converter = spec["converter"]
                self.custom_args = spec["args"]
                # converts back "_COLON_" to ":"
                if self.custom_args is not None and "_COLON_" in self.custom_args:
                    self.custom_args = self.custom_args.replace("_COLON_", ":")
                self.custom_converter = getattr(self, "convert_{}".format(converter))

            return super(MetadataFormatter, self).get_field(field_name, args, kwargs)

        def convert_field(self, value: Any, conversion: Any) -> Any:
            """Apply the converter registered by get_field, if any, to `value`."""
            # Do custom conversion if any (see get_field)
            if self.custom_converter is not None:
                if self.custom_args is not None and value is not None:
                    converted = self.custom_converter(value, self.custom_args)
                elif value is not None:
                    converted = self.custom_converter(value)
                else:
                    converted = None
                # Clear this state variable in case the same converter is used to
                # resolve other named arguments
                self.custom_converter = None
                self.custom_args = None
                return converted
            return super(MetadataFormatter, self).convert_field(value, conversion)

        @staticmethod
        def convert_datetime_to_timestamp_milliseconds(date_time: str) -> int:
            """Convert a date_time (str) to a Unix timestamp in milliseconds

            "2021-04-21T18:27:19.123Z" => "1619029639123"
            "2021-04-21" => "1618963200000"
            "2021-04-21T00:00:00+02:00" => "1618956000000"
            """
            return int(1e3 * get_timestamp(date_time))

        @staticmethod
        def convert_to_iso_utc_datetime_from_milliseconds(
            timestamp: int,
        ) -> Union[str, int]:
            """Convert a timestamp in milliseconds (int) to its ISO8601 UTC format

            1619029639123 => "2021-04-21T18:27:19.123Z"

            The input is returned unchanged when it raises a TypeError.
            """
            try:
                return cast(
                    str,
                    to_iso_utc_string(
                        dt.datetime.fromtimestamp(timestamp / 1e3, tzutc())
                    ),
                )
            except TypeError:
                return timestamp

        @staticmethod
        def convert_to_iso_utc_datetime(
            date_time: str, timespec: str = "milliseconds"
        ) -> str:
            """Convert a date_time (str) to its ISO 8601 representation in UTC

            "2021-04-21" => "2021-04-21T00:00:00.000Z"
            "2021-04-21T00:00:00.000+02:00" => "2021-04-20T22:00:00.000Z"

            The optional argument timespec specifies the number of additional
            terms of the time to include. Valid options are 'auto', 'hours',
            'minutes', 'seconds', 'milliseconds' and 'microseconds'.

            The input is returned unchanged when it cannot be parsed.
            """
            try:
                parsed_dt = parse_to_utc(date_time)
            except ValidationError:
                return date_time
            return parsed_dt.isoformat(timespec=timespec).replace("+00:00", "Z")

        @staticmethod
        def convert_to_iso_date(
            datetime_string: str, time_delta_args_str: str = "0,0,0,0,0,0,0"
        ) -> str:
            """Convert an ISO8601 datetime (str) to its ISO8601 date format

            "2021-04-21T18:27:19.123Z" => "2021-04-21"
            "2021-04-21" => "2021-04-21"
            "2021-04-21T00:00:00+06:00" => "2021-04-20" !

            `time_delta_args_str` holds comma-separated positional arguments of a
            ``datetime.timedelta`` added to the date before truncating it.
            """
            parsed_dt = parse_to_utc(datetime_string)
            time_delta_args = ast.literal_eval(time_delta_args_str)
            parsed_dt += dt.timedelta(*time_delta_args)
            return parsed_dt.isoformat()[:10]

        @staticmethod
        def convert_to_non_separated_date(datetime_string: str) -> str:
            """Convert an ISO8601 datetime (str) to a date without separators (YYYYMMDD)"""
            iso_date = MetadataFormatter.convert_to_iso_date(datetime_string)
            return iso_date.replace("-", "")

        @staticmethod
        def convert_to_rounded_wkt(value: BaseGeometry) -> str:
            """Dump a geometry as rounded WKT, simplifying it while it is too long"""
            wkt_value = cast(
                str, wkt.dumps(value, rounding_precision=COORDS_ROUNDING_PRECISION)
            )
            # If needed, simplify WKT to prevent too long request failure
            tolerance = 0.1
            while len(wkt_value) > WKT_MAX_LEN and tolerance <= 1:
                logger.debug(
                    "Geometry WKT is too long (%s), trying to simplify it with tolerance %s",
                    len(wkt_value),
                    tolerance,
                )
                wkt_value = cast(
                    str,
                    wkt.dumps(
                        value.simplify(tolerance),
                        rounding_precision=COORDS_ROUNDING_PRECISION,
                    ),
                )
                tolerance += 0.1
            if len(wkt_value) > WKT_MAX_LEN and tolerance > 1:
                logger.warning("Failed to reduce WKT length lower than %s", WKT_MAX_LEN)
            return wkt_value

        @staticmethod
        def convert_to_bounds_lists(input_geom: BaseGeometry) -> list[list[float]]:
            """Convert a geometry to a list of bounds, one per polygon of a MultiPolygon"""
            if isinstance(input_geom, MultiPolygon):
                # sort with larger one at first (stac-browser only plots first one)
                geoms = sorted(input_geom.geoms, key=lambda x: x.area, reverse=True)
                return [list(x.bounds[0:4]) for x in geoms]
            else:
                return [list(input_geom.bounds[0:4])]

        @staticmethod
        def convert_to_bounds(input_geom_unformatted: Any) -> list[float]:
            """Convert a geometry, given in various formats, to its overall bounds"""
            input_geom = get_geometry_from_various(geometry=input_geom_unformatted)
            if input_geom is None:
                input_geom = DEFAULT_SHAPELY_GEOMETRY
            if isinstance(input_geom, MultiPolygon):
                # merge the bounds of every polygon (the result does not depend on
                # the order in which polygons are visited)
                min_lon = 180.0
                min_lat = 90.0
                max_lon = -180.0
                max_lat = -90.0
                for geom in input_geom.geoms:
                    min_lon = min(min_lon, geom.bounds[0])
                    min_lat = min(min_lat, geom.bounds[1])
                    max_lon = max(max_lon, geom.bounds[2])
                    max_lat = max(max_lat, geom.bounds[3])
                return [min_lon, min_lat, max_lon, max_lat]
            else:
                return list(input_geom.bounds[0:4])

        @staticmethod
        def convert_to_nwse_bounds(input_geom: BaseGeometry) -> list[float]:
            """Convert a geometry (or its WKT) to North,West,South,East bounds"""
            if isinstance(input_geom, str):
                input_geom = shapely.wkt.loads(input_geom)
            # bounds are (minx, miny, maxx, maxy): move maxy (North) to the front
            return list(input_geom.bounds[-1:] + input_geom.bounds[:-1])

        @staticmethod
        def convert_to_nwse_bounds_str(
            input_geom: BaseGeometry, separator: str = ","
        ) -> str:
            """Convert a geometry to a North,West,South,East bounds string"""
            return separator.join(
                str(x) for x in MetadataFormatter.convert_to_nwse_bounds(input_geom)
            )

        @staticmethod
        def convert_to_geojson(value: Any) -> str:
            """Dump a value as a GeoJSON string"""
            return geojson.dumps(value)

        @staticmethod
        def convert_to_geojson_polytope(
            value: BaseGeometry,
        ) -> Union[dict[Any, Any], str]:
            """Convert a shapely Point/LineString/Polygon to ECMWF polytope feature dicts"""
            # ECMWF Polytope uses non-geojson structure for features
            if isinstance(value, Polygon):
                return {
                    "type": "polygon",
                    "shape": [[y, x] for x, y in value.exterior.coords],
                }
            if isinstance(value, Point):
                return {"type": "position", "points": [[value.y, value.x]]}
            if isinstance(value, LineString):
                return {
                    "type": "trajectory",
                    "points": [[y, x] for x, y in value.coords],
                    "inflation": 0,
                }
            raise ValidationError(
                "to_geojson_polytope only accepts shapely Polygon, Point and LineString"
            )

        @staticmethod
        def convert_from_ewkt(ewkt_string: str) -> Union[BaseGeometry, str]:
            """Convert EWKT (Extended Well-Known text) to shapely geometry"""
            ewkt_regex = re.compile(
                r"^.*(?P<proj>SRID=[0-9]+);(?P<wkt>[A-Z0-9 \(\),\.-]+).*$"
            )
            ewkt_match = ewkt_regex.match(ewkt_string)
            if ewkt_match:
                g = ewkt_match.groupdict()
                from_proj = g["proj"].replace("SRID", "EPSG").replace("=", ":")
                input_geom = wkt.loads(g["wkt"])

                from_proj = pyproj.CRS(from_proj)
                to_proj = pyproj.CRS(DEFAULT_PROJ)

                if from_proj != to_proj:
                    # reproject
                    project = pyproj.Transformer.from_crs(
                        from_proj, to_proj, always_xy=True
                    ).transform
                    return transform(project, input_geom)
                else:
                    return input_geom
            else:
                logger.warning(f"Could not read {ewkt_string} as EWKT")
                return ewkt_string

        @staticmethod
        def convert_to_ewkt(input_geom: BaseGeometry) -> str:
            """Convert shapely geometry to EWKT (Extended Well-Known text)"""
            proj = DEFAULT_PROJ.upper().replace("EPSG", "SRID").replace(":", "=")
            wkt_geom = MetadataFormatter.convert_to_rounded_wkt(input_geom)
            return f"{proj};{wkt_geom}"

        @staticmethod
        def convert_from_georss(georss: Any) -> Union[BaseGeometry, Any]:
            """Convert GeoRSS to shapely geometry"""
            if "polygon" in georss.tag:
                # Polygon
                coords_list = georss.text.split()
                polygon_args = [
                    (float(coords_list[2 * i]), float(coords_list[2 * i + 1]))
                    for i in range(int(len(coords_list) / 2))
                ]
                return Polygon(polygon_args)
            elif len(georss) == 1 and "multisurface" in georss[0].tag.lower():
                # Multipolygon
                from_proj = getattr(georss[0], "attrib", {}).get("srsName")
                if from_proj:
                    from_proj = pyproj.CRS(from_proj)
                    to_proj = pyproj.CRS(DEFAULT_PROJ)
                    project = pyproj.Transformer.from_crs(
                        from_proj, to_proj, always_xy=True
                    ).transform

                # function to get deepest elements
                def flatten_elements(nested) -> Iterator[Any]:
                    for e in nested:
                        if len(e) > 0:
                            yield from flatten_elements(e)
                        else:
                            yield e

                polygons_list: list[Polygon] = []
                for elem in flatten_elements(georss[0]):
                    coords_list = elem.text.split()
                    polygon_args = [
                        (float(coords_list[2 * i]), float(coords_list[2 * i + 1]))
                        for i in range(int(len(coords_list) / 2))
                    ]
                    polygon = Polygon(polygon_args)
                    # reproject if needed
                    if from_proj and from_proj != to_proj:
                        polygons_list.append(transform(project, polygon))
                    else:
                        polygons_list.append(polygon)

                return MultiPolygon(polygons_list)
            else:
                logger.warning(
                    f"Incoming GeoRSS format not supported yet: {str(georss)}"
                )
                return georss

        @staticmethod
        def convert_to_longitude_latitude(
            input_geom_unformatted: Any,
        ) -> dict[str, float]:
            """Convert a geometry to the longitude / latitude of the center of its bounds"""
            bounds = MetadataFormatter.convert_to_bounds(input_geom_unformatted)
            lon = (bounds[0] + bounds[2]) / 2
            lat = (bounds[1] + bounds[3]) / 2
            return {"lon": lon, "lat": lat}

        @staticmethod
        def convert_csv_list(values_list: Any, separator=",") -> Any:
            """Join a list using given separator, other values are returned unchanged"""
            if isinstance(values_list, list):
                return separator.join([str(x) for x in values_list])
            else:
                return values_list

        @staticmethod
        def convert_remove_extension(string: str) -> str:
            """Keep only the part of a string located before its first dot"""
            # str.split always returns at least one item
            return string.split(".")[0]

        @staticmethod
        def convert_get_group_name(string: str, pattern: str) -> str:
            """Get the name of the last matched named group of `pattern` in `string`"""
            # NOTE(review): spaces are replaced in the whole pattern, presumably
            # because group names cannot contain spaces -- confirm
            sanitized_pattern = pattern.replace(" ", "_SPACE_")
            try:
                match = re.search(sanitized_pattern, str(string))
                if match:
                    if result := match.lastgroup:
                        return result.replace("_SPACE_", " ")
                    else:
                        return NOT_AVAILABLE
            except AttributeError:
                pass
            logger.warning(
                "Could not extract property from %s using %s", string, pattern
            )
            return NOT_AVAILABLE

        @staticmethod
        def convert_replace_str(value: Any, args: str) -> str:
            """Apply ``re.sub(old, new, value)``, `args` being the ``(old, new)`` literal"""
            if isinstance(value, dict):
                value = MetadataFormatter.convert_to_geojson(value)
            elif not isinstance(value, str):
                raise TypeError(
                    f"convert_replace_str expects a string or a dict (apply to_geojson). Got {type(value)}: {value}"
                )

            old, new = ast.literal_eval(args)
            return re.sub(old, new, value)

        @staticmethod
        def convert_replace_str_tuple(
            value: Union[str, dict[Any, Any]], args: str
        ) -> str:
            """
            Apply multiple replacements on a string (parts or complete).

            :param value: input string or dict.
            :param args: string representing a list/tuple of (old, new) pairs,
                         like ``'(("old1", "new1"), ("old2", "new2"))'``
            """
            if isinstance(value, dict):
                value = MetadataFormatter.convert_to_geojson(value)
            elif not isinstance(value, str):
                raise TypeError(
                    f"convert_replace_str_tuple expects a string or a dict (apply to_geojson). "
                    f"Got {type(value)}: {value}"
                )

            # args is a string representing a list/tuple of tuples
            replacements = ast.literal_eval(args)

            if not isinstance(replacements, (list, tuple)):
                raise TypeError(
                    f"convert_replace_str_tuple expects a list/tuple of (old,new) pairs. "
                    f"Got {type(replacements)}: {replacements}"
                )

            for old, new in replacements:
                value = re.sub(old, new, value)

            return value

        @staticmethod
        def convert_replace_tuple(value: Any, args: str) -> Any:
            """
            Apply multiple replacements matching whole value.

            :param value: input to replace
            :param args: string representing a list/tuple of (old, new) pairs,
                         like ``'((["old1"], "new1"), ("old2", ["new2"]))'``
            """
            # args is a string representing a list/tuple of tuples
            replacements = ast.literal_eval(args)

            if not isinstance(replacements, (list, tuple)):
                raise TypeError(
                    f"convert_replace_tuple expects a list/tuple of (old,new) pairs. "
                    f"Got {type(replacements)}: {replacements}"
                )

            for old, new in replacements:
                if old == value:
                    return new

            return value

        @staticmethod
        def convert_not_available(value: Any) -> str:
            """Convert any value to "Not Available".

            This is more useful than "$.null" to keep original jsonpath while
            parsing in metadata_mapping.
            """
            return NOT_AVAILABLE

        @staticmethod
        def convert_split(value: str, separator: str) -> list[str]:
            """Split a string using given separator"""
            if value == NOT_AVAILABLE:
                return [NOT_AVAILABLE]
            if not isinstance(value, str):
                logger.warning(
                    "Could not split non-string value %s (type %s)", value, type(value)
                )
                return [NOT_AVAILABLE]
            if not isinstance(separator, str):
                logger.warning(
                    "Could not split string using non-string separator %s (type %s)",
                    separator,
                    type(separator),
                )
                return [NOT_AVAILABLE]
            return value.split(separator)

        @staticmethod
        def convert_ceda_collection_name(value: str) -> str:
            """Build an upper-cased, "_"-joined name from what follows ``/data/`` in `value`"""
            data_regex = re.compile(r"/data/(?P<name>.+?)/?$")
            match = data_regex.search(value)
            if match:
                return match.group("name").replace("/", "_").upper()
            return NOT_AVAILABLE

        @staticmethod
        def convert_literalize_unicode(value: str) -> str:
            """Convert a string to its raw Unicode literal form"""
            if value == NOT_AVAILABLE:
                return value
            return value.encode("raw_unicode_escape").decode("utf-8")

        @staticmethod
        def convert_recursive_sub_str(
            input_obj: Union[dict[Any, Any], list[Any]], args: str
        ) -> Union[dict[Any, Any], list[Any]]:
            """Recursively apply ``re.sub(old, new, ...)`` on the string values of a structure"""
            old, new = ast.literal_eval(args)
            return items_recursive_apply(
                input_obj,
                lambda k, v, x, y: re.sub(x, y, v) if isinstance(v, str) else v,
                **{"x": old, "y": new},
            )

        @staticmethod
        def convert_dict_update(
            input_dict: dict[Any, Any], args: str
        ) -> dict[Any, Any]:
            """Return a copy of `input_dict` updated with the nested pairs given in `args`"""
            new_items_list = ast.literal_eval(args)

            new_items_dict = nested_pairs2dict(new_items_list)

            return dict(input_dict, **new_items_dict)

        @staticmethod
        def convert_dict_filter(
            input_dict: dict[Any, Any], jsonpath_filter_str: str
        ) -> dict[Any, Any]:
            """Filters dict items using jsonpath"""

            jsonpath_filter = string_to_jsonpath(jsonpath_filter_str, force=True)
            if isinstance(jsonpath_filter, str) or not isinstance(input_dict, dict):
                return {}

            keys_list = list(input_dict.keys())
            matches = jsonpath_filter.find(input_dict)
            result = {}
            for match in matches:
                # extract key index from matched jsonpath
                matched_jsonpath_str = str(match.full_path)
                matched_index = int(matched_jsonpath_str.split(".")[-1][1:-1])
                key = keys_list[matched_index]
                result[key] = match.value
            return result

        @staticmethod
        def convert_dict_filter_and_sub(
            input_dict: dict[Any, Any], args: str
        ) -> Union[dict[Any, Any], list[Any]]:
            """Filters dict items using jsonpath and then apply recursive_sub_str"""

            jsonpath_filter_str, old, new = ast.literal_eval(args)
            filtered = MetadataFormatter.convert_dict_filter(
                input_dict, jsonpath_filter_str
            )
            # repr() gives a literal that ast.literal_eval reads back unchanged, even
            # when the pattern contains quotes or backslashes
            args_str = repr((str(old), str(new)))
            return MetadataFormatter.convert_recursive_sub_str(filtered, args_str)

        @staticmethod
        def convert_dict_with_roles(
            input_dict: dict[Any, Any], roles_str: str
        ) -> dict[Any, Any]:
            """Keep only dict items with given roles in their "roles" list"""
            roles = ast.literal_eval(roles_str)
            if not isinstance(roles, (list, tuple)):
                raise TypeError(
                    f"convert_dict_with_roles expects a list/tuple of roles. Got {type(roles)}: {roles}"
                )

            result = {}
            for k, v in input_dict.items():
                if not isinstance(v, dict):
                    continue
                item_roles = v.get("roles", [])
                if any(role in item_roles for role in roles):
                    result[k] = v
            return result

        @staticmethod
        def convert_from_alternate(
            input_obj: dict[str, Any], value: str
        ) -> dict[str, Any]:
            """
            Update assets using given alternate.

            Only items having a dict under ``alternate[value]`` are kept; that dict
            is merged over the item and removed from its remaining alternates.
            """
            result: dict[str, Any] = {}

            for k, v in input_obj.items():
                if not isinstance(v, dict):
                    continue

                # work on a copy so that the input alternates are left untouched
                alt_dict = deepcopy(v).get("alternate")
                if not isinstance(alt_dict, dict):
                    continue

                value_entry = alt_dict.pop(value, None)
                if not isinstance(value_entry, dict):
                    continue

                result[k] = v | value_entry | {"alternate": alt_dict}
                if len(result[k]["alternate"]) == 0:
                    del result[k]["alternate"]

            return result

        @staticmethod
        def convert_slice_str(string: str, args: str) -> str:
            """Slice a string, `args` being the comma-separated ``start,end,step``

            Items of `args` that are not integers are read as None.
            """
            cmin, cmax, cstep = [
                int(x.strip()) if x.strip().lstrip("-").isdigit() else None
                for x in args.split(",")
            ]
            return string[cmin:cmax:cstep] or NOT_AVAILABLE

        @staticmethod
        def convert_to_lower(string: str) -> str:
            """Convert a string to lowercase."""
            if string == NOT_AVAILABLE:
                return string
            return string.lower()

        @staticmethod
        def convert_to_upper(string: str) -> str:
            """Convert a string to uppercase."""
            # keep the placeholder untouched, like to_lower and to_title do
            if string == NOT_AVAILABLE:
                return string
            return string.upper()

        @staticmethod
        def convert_to_title(string: str) -> str:
            """Convert a string to title case."""
            if string == NOT_AVAILABLE:
                return string
            return string.title()

        @staticmethod
        def convert_fake_l2a_title_from_l1c(string: str) -> str:
            """Build a fake L2A title from the parts of a "_"-separated L1C title"""
            id_regex = re.compile(
                r"^(?P<id1>\w+)_(?P<id2>\w+)_(?P<id3>\w+)_(?P<id4>\w+)_(?P<id5>\w+)_(?P<id6>\w+)_(?P<id7>\w+)$"
            )
            id_match = id_regex.match(string)
            if id_match:
                id_dict = id_match.groupdict()
                return "%s_MSIL2A_%s____________%s________________" % (
                    id_dict["id1"],
                    id_dict["id3"],
                    id_dict["id6"],
                )
            else:
                logger.error("Could not extract fake title from %s", string)
                return NOT_AVAILABLE

        @staticmethod
        def convert_s2msil2a_title_to_aws_productinfo(string: str) -> str:
            """Build the AWS productInfo URL template from a S2 MSI L2A title

            The returned URL still holds a ``{_collection}`` placeholder.
            """
            id_regex = re.compile(
                r"^(?P<id1>\w+)_(?P<id2>\w+)_(?P<year>[0-9]{4})(?P<month>[0-9]{2})(?P<day>[0-9]{2})T[0-9]+_"
                + r"(?P<id4>[A-Z0-9_]+)_(?P<id5>[A-Z0-9_]+)_T(?P<tile1>[0-9]{2})(?P<tile2>[A-Z])(?P<tile3>[A-Z]{2})_"
                + r"(?P<id7>[A-Z0-9_]+)$"
            )
            id_match = id_regex.match(string)
            if id_match:
                id_dict = id_match.groupdict()
                return (
                    "https://roda.sentinel-hub.com/sentinel-s2-l2a/tiles/%s/%s/%s/%s/%s/%s/0/{_collection}.json"
                    % (
                        id_dict["tile1"],
                        id_dict["tile2"],
                        id_dict["tile3"],
                        id_dict["year"],
                        int(id_dict["month"]),
                        int(id_dict["day"]),
                    )
                )
            else:
                logger.error("Could not extract title infos from %s", string)
                return NOT_AVAILABLE

        @staticmethod
        def convert_split_id_into_s3_params(product_id: str) -> dict[str, str]:
            """Extract collection, dates, timeliness and satellite from a product id"""
            parts: list[str] = re.split(r"_(?!_)", product_id)
            params = {"collection": product_id[4:15]}
            dates = re.findall("[0-9]{8}T[0-9]{6}", product_id)
            # the searched interval is widened by one second on each side
            start_date = dt.datetime.strptime(dates[0], "%Y%m%dT%H%M%S") - dt.timedelta(
                seconds=1
            )
            # cast to tell the type checker that value won't be None here
            params["startDate"] = cast(str, to_iso_utc_string(start_date))
            end_date = dt.datetime.strptime(dates[1], "%Y%m%dT%H%M%S") + dt.timedelta(
                seconds=1
            )
            params["endDate"] = cast(str, to_iso_utc_string(end_date))
            params["timeliness"] = parts[-2]
            params["sat"] = "Sentinel-" + parts[0][1:]
            return params

        @staticmethod
        def convert_dates_from_cmems_id(product_id: str):
            """Get a one-day interval from the first date found in a product id

            A 10 digits date (``%Y%m%d%H``) is searched first, then a 8 digits one
            (``%Y%m%d``).
            """
            date_format_1 = "[0-9]{10}"
            date_format_2 = "[0-9]{8}"
            dates = re.findall(date_format_1, product_id)
            if dates:
                date = dates[0]
            else:
                dates = re.findall(date_format_2, product_id)
                date = dates[0]
            if len(date) == 10:
                date_time = dt.datetime.strptime(date, "%Y%m%d%H")
            else:
                date_time = dt.datetime.strptime(date, "%Y%m%d")
            return {
                "min_date": to_iso_utc_string(date_time),
                "max_date": to_iso_utc_string(date_time + dt.timedelta(days=1)),
            }

        @staticmethod
        def convert_to_datetime_dict(
            date: str, format: str
        ) -> dict[str, Union[list[str], str]]:
            """Convert a date (str) to a dictionary where values are in the format given in argument

            date == "2021-04-21T18:27:19.123Z" and format == "list" => {
                "year": ["2021"],
                "month": ["04"],
                "day": ["21"],
                "hour": ["18"],
                "minute": ["27"],
                "second": ["19"],
            }
            date == "2021-04-21T18:27:19.123Z" and format == "string" => {
                "year": "2021",
                "month": "04",
                "day": "21",
                "hour": "18",
                "minute": "27",
                "second": "19",
            }
            date == "2021-04-21" and format == "list" => {
                "year": ["2021"],
                "month": ["04"],
                "day": ["21"],
                "hour": ["00"],
                "minute": ["00"],
                "second": ["00"],
            }
            """
            utc_date = MetadataFormatter.convert_to_iso_utc_datetime(date)
            date_object = parse_to_utc(utc_date)
            if format == "list":
                return {
                    "year": [date_object.strftime("%Y")],
                    "month": [date_object.strftime("%m")],
                    "day": [date_object.strftime("%d")],
                    "hour": [date_object.strftime("%H")],
                    "minute": [date_object.strftime("%M")],
                    "second": [date_object.strftime("%S")],
                }
            else:
                return {
                    "year": date_object.strftime("%Y"),
                    "month": date_object.strftime("%m"),
                    "day": date_object.strftime("%d"),
                    "hour": date_object.strftime("%H"),
                    "minute": date_object.strftime("%M"),
                    "second": date_object.strftime("%S"),
                }

        @staticmethod
        def convert_interval_to_datetime_dict(
            date: str, separator: str = "/"
        ) -> dict[str, list[str]]:
            """Convert a date interval ('/' separated str) to a dictionary where values are lists

            date == "2021-04-21/2021-04-22" => {
                "year": ["2021"],
                "month": ["04"],
                "day": ["21", "22"],
            }
            """
            if separator not in date:
                raise ValueError(
                    f"Could not format {date} using convert_interval_to_datetime_dict: {separator} separator missing"
                )
            start, end = date.split(separator)
            start_utc_date = MetadataFormatter.convert_to_iso_utc_datetime(start)
            end_utc_date = MetadataFormatter.convert_to_iso_utc_datetime(end)
            start_date_object = parse_to_utc(start_utc_date)
            # an open-ended interval ("None" end) is reduced to its start date
            if end_utc_date == "None":
                end_utc_date = start_utc_date
            end_date_object = parse_to_utc(end_utc_date)

            delta_utc_date = end_date_object - start_date_object

            years = set()
            months = set()
            days = set()
            for i in range(delta_utc_date.days + 1):
                date_object = start_date_object + dt.timedelta(days=i)
                years.add(date_object.strftime("%Y"))
                months.add(date_object.strftime("%m"))
                days.add(date_object.strftime("%d"))

            # sets have no stable order: sort to get a deterministic output
            return {
                "year": sorted(years),
                "month": sorted(months),
                "day": sorted(days),
            }

        @staticmethod
        def convert_get_ecmwf_time(date: str) -> list[str]:
            """Get the time of a date (str) in the ECMWF format (["HH:00"])

            "2021-04-21T18:27:19.123Z" => ["18:00"]
            "2021-04-21" => ["00:00"]
            """
            return [
                str(MetadataFormatter.convert_to_datetime_dict(date, "string")["hour"])
                + ":00"
            ]

        @staticmethod
        def convert_sanitize(text: str) -> str:
            """Sanitize string"""
            return sanitize(text)

        @staticmethod
        def convert_get_dates_from_string(text: str, split_param="-"):
            """Get start and end dates from a ``YYYYMMDD<split_param>YYYYMMDD`` pattern in `text`"""
            reg = "[0-9]{8}" + split_param + "[0-9]{8}"
            match = re.search(reg, text)

            if not match:
                return NOT_AVAILABLE
            dates_str = match.group()
            dates = dates_str.split(split_param)
            start_date = dt.datetime.strptime(dates[0], "%Y%m%d")
            end_date = dt.datetime.strptime(dates[1], "%Y%m%d")
            return {
                "startDate": to_iso_utc_string(start_date),
                "endDate": to_iso_utc_string(end_date),
            }

        @staticmethod
        def convert_get_hydrological_year(date: str):
            """Get the ``["YYYY_yy"]`` label made of the year of `date` and the next one"""
            utc_date = MetadataFormatter.convert_to_iso_utc_datetime(date)
            date_object = parse_to_utc(utc_date)
            date_object_second_year = date_object + relativedelta(years=1)
            return [
                f"{date_object.strftime('%Y')}_{date_object_second_year.strftime('%y')}"
            ]

        @staticmethod
        def convert_get_variables_from_path(path: str):
            """Get the comma-separated variables located after the first "?" of a path"""
            if "?" not in path:
                return []
            variables = path.split("?")[1]
            return variables.split(",")

        @staticmethod
        def convert_assets_list_to_dict(
            assets_list: list[dict[str, str]], asset_name_key: str = "title"
        ) -> dict[str, dict[str, str]]:
            """Convert a list of assets to a dictionary where keys represent
            name of assets and are found among values of asset dictionaries.

            assets_list == [
                {"href": "foo", "title": "asset1", "name": "foo-name"},
                {"href": "bar", "title": "path/to/asset1", "name": "bar-name"},
                {"href": "baz", "title": "path/to/asset2", "name": "baz-name"},
                {"href": "qux", "title": "asset3", "name": "qux-name"},
            ] and asset_name_key == "title" => {
                "asset1": {"href": "foo", "title": "asset1", "name": "foo-name"},
                "path/to/asset1": {"href": "bar", "title": "path/to/asset1", "name": "bar-name"},
                "asset2": {"href": "baz", "title": "path/to/asset2", "name": "baz-name"},
                "asset3": {"href": "qux", "title": "asset3", "name": "qux-name"},
            }
            assets_list == [
                {"href": "foo", "title": "foo-title", "name": "asset1"},
                {"href": "bar", "title": "bar-title", "name": "path/to/asset1"},
                {"href": "baz", "title": "baz-title", "name": "path/to/asset2"},
                {"href": "qux", "title": "qux-title", "name": "asset3"},
            ] and asset_name_key == "name" => {
                "asset1": {"href": "foo", "title": "foo-title", "name": "asset1"},
                "path/to/asset1": {"href": "bar", "title": "bar-title", "name": "path/to/asset1"},
                "asset2": {"href": "baz", "title": "baz-title", "name": "path/to/asset2"},
                "asset3": {"href": "qux", "title": "qux-title", "name": "asset3"},
            }
            """
            asset_names: list[str] = []
            assets_dict: dict[str, dict[str, str]] = {}

            for asset in assets_list:
                asset_name = asset[asset_name_key]
                asset_names.append(asset_name)
                assets_dict[asset_name] = asset

            # count how many assets share each basename
            basename_counts: dict[str, int] = {}
            for asset_name in asset_names:
                asset_basename = asset_name.split("/")[-1]
                basename_counts[asset_basename] = (
                    basename_counts.get(asset_basename, 0) + 1
                )

            # we only keep the equivalent of the path basename in the case where the
            # asset name has a path pattern and this basename is only found once
            # (renaming an asset whose basename is shared could overwrite another one)
            for asset_name in asset_names:
                asset_basename = asset_name.split("/")[-1]
                if basename_counts[asset_basename] == 1:
                    assets_dict[asset_basename] = assets_dict.pop(asset_name)

            return assets_dict

        @staticmethod
        def convert_wekeo_to_cop_collection(val: str, prefix: str) -> str:
            """Converts the name of a collection from the WEkEO format to the Copernicus format."""
            return val.removeprefix(prefix).lower().replace("_", "-")

    # if stac extension colon separator `:` is in search params, parse it to prevent issues with vformat
    if re.search(r"{[\w-]*:[\w#-]*\(?.*}", search_param):
        search_param = re.sub(
            r"{([\w-]*):([\w#-]*\(?.*)}",
            r"{\1_COLON_\2}",
            search_param,
        )
        kwargs = {k.replace(":", "_COLON_"): v for k, v in kwargs.items()}

    # convert colons `:` in the parameters passed to the converter (e.g. 'foo#boo(fun:with:colons)')
    if re.search(r"{[\w-]*#[\w-]*\([^)]*:.*}", search_param):
        search_param = re.sub(
            r"({[\w-]*#[\w-]*)\(([^)]*)(.*})",
            lambda m: m.group(1)
            + "("
            + m.group(2).replace(":", "_COLON_")
            + m.group(3),
            search_param,
        )

    return MetadataFormatter().vformat(search_param, args, kwargs)
def properties_from_json(
    json: dict[str, Any],
    mapping: dict[str, Any],
    discovery_config: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Extract properties from a provider json result.

    :param json: The representation of a provider result as a json object
    :param mapping: A mapping between :class:`~eodag.api.product._product.EOProduct`'s metadata
                    keys and the location of the values of these properties in the json
                    representation, expressed as a
                    `jsonpath <http://goessner.net/articles/JsonPath/>`_
    :param discovery_config: (optional) metadata discovery configuration dict, accepting among
                             other items `metadata_pattern` (Regex pattern for metadata key
                             discovery, e.g. "^[a-zA-Z]+$"), `metadata_path` (String
                             representation of jsonpath), `metadata_prefix`,
                             `metadata_path_id` and `metadata_path_value`
    :returns: The metadata of the :class:`~eodag.api.product._product.EOProduct`
    """
    extracted_value: Any
    properties: dict[str, Any] = {}
    templates = {}
    used_jsonpaths = []
    for metadata, value in mapping.items():
        # Treat the case when the value is from a queryable metadata
        if isinstance(value, list):
            conversion_or_none, path_or_text = value[1]
        else:
            conversion_or_none, path_or_text = value
        if isinstance(path_or_text, str):
            # plain text: either a template resolved later, or a literal passed as is
            if re.search(r"{[^{}]+}", path_or_text):
                templates[metadata] = path_or_text
            else:
                properties[metadata] = path_or_text
        else:
            try:
                match = path_or_text.find(json)
            except KeyError:
                match = []
            if len(match) == 0:
                extracted_value = NOT_AVAILABLE
            elif len(match) == 1:
                extracted_value = match[0].value
                # remember which paths were consumed so discovery does not duplicate them
                used_jsonpaths.append(match[0].full_path)
            else:
                extracted_value = [m.value for m in match]
            if extracted_value is None:
                properties[metadata] = None
            else:
                if conversion_or_none is None:
                    properties[metadata] = extracted_value
                else:
                    # reformat conversion_or_none as metadata#converter(args) or metadata#converter
                    if (
                        len(conversion_or_none) > 1
                        and isinstance(conversion_or_none, list)
                        and conversion_or_none[1] is not None
                    ):
                        conversion_or_none = "%s(%s)" % (
                            conversion_or_none[0],
                            conversion_or_none[1],
                        )
                    elif isinstance(conversion_or_none, list):
                        conversion_or_none = conversion_or_none[0]

                    # check if conversion uses variables to format
                    if re.search(r"({[^{}:]+})+", conversion_or_none):
                        conversion_or_none = conversion_or_none.format(**properties)

                    if extracted_value == NOT_AVAILABLE:
                        # try if value can be formatted even if it is not available
                        try:
                            properties[metadata] = format_metadata(
                                "{%s%s%s}" % (metadata, SEP, conversion_or_none),
                                **{metadata: extracted_value},
                            )
                        except ValueError:
                            logger.debug(
                                f"{metadata}: {extracted_value} could not be formatted with {conversion_or_none}"
                            )
                            continue
                    else:
                        # in this case formatting should work, otherwise something is wrong in the mapping
                        properties[metadata] = format_metadata(
                            "{%s%s%s}" % (metadata, SEP, conversion_or_none),
                            **{metadata: extracted_value},
                        )
        # properties as python objects when possible (format_metadata returns only strings)
        # Deliberately best-effort: values that are not Python literals, and keys that
        # are not set yet (templates), are left untouched.
        try:
            properties[metadata] = ast.literal_eval(properties[metadata])
        except Exception:
            pass

    # Resolve templates
    for metadata, template in templates.items():
        try:
            properties[metadata] = format_string(metadata, template, **properties)
        except ValueError:
            logger.warning(
                f"Could not parse {metadata} ({template}) using product properties"
            )
            logger.debug(f"available properties: {properties}")
            properties[metadata] = NOT_AVAILABLE

    # adds missing discovered properties
    if not discovery_config:
        discovery_config = {}

    discovery_pattern = discovery_config.get("metadata_pattern")
    discovery_path = discovery_config.get("metadata_path")
    if discovery_pattern and discovery_path:
        discovery_jsonpath = string_to_jsonpath(discovery_path)
        discovered_properties = (
            discovery_jsonpath.find(json)
            if isinstance(discovery_jsonpath, JSONPath)
            else []
        )
        mtd_prefix = discovery_config.get("metadata_prefix", "provider")
        for found_jsonpath in discovered_properties:
            if "metadata_path_id" in discovery_config.keys():
                # key and value are both looked up inside the discovered item
                found_key_paths = string_to_jsonpath(
                    discovery_config["metadata_path_id"], force=True
                ).find(found_jsonpath.value)
                if not found_key_paths or isinstance(found_key_paths, int):
                    continue
                found_key = found_key_paths[0].value
                used_jsonpath = Child(
                    found_jsonpath.full_path,
                    string_to_jsonpath(
                        discovery_config["metadata_path_value"], force=True
                    ),
                )
            else:
                # default key got from metadata_path
                found_key = found_jsonpath.path.fields[-1]
                used_jsonpath = found_jsonpath.full_path
            if (
                re.compile(discovery_pattern).match(found_key)
                and found_key not in properties.keys()
                and f"{mtd_prefix}:{found_key}" not in properties.keys()
                and used_jsonpath not in used_jsonpaths
            ):
                # prepend with default STAC prefix if none is already used
                if ":" not in found_key:
                    found_key = f"{mtd_prefix}:{found_key}"
                if "metadata_path_value" in discovery_config.keys():
                    found_value_path = string_to_jsonpath(
                        discovery_config["metadata_path_value"], force=True
                    ).find(found_jsonpath.value)
                    properties[found_key] = (
                        found_value_path[0].value
                        if found_value_path and not isinstance(found_value_path, int)
                        else NOT_AVAILABLE
                    )
                else:
                    # default value got from metadata_path
                    properties[found_key] = found_jsonpath.value

                # properties as python objects when possible (format_metadata returns only strings)
                try:
                    properties[found_key] = ast.literal_eval(properties[found_key])
                except Exception:
                    pass

    return properties


def properties_from_xml(
    xml_as_text: AnyStr,
    mapping: Any,
    empty_ns_prefix: str = "ns",
    discovery_config: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Extract properties from a provider xml result.

    :param xml_as_text: The representation of a provider result as xml
    :param mapping: A mapping between :class:`~eodag.api.product._product.EOProduct`'s metadata
                    keys and the location of the values of these properties in the xml
                    representation, expressed as a
                    `xpath <https://www.w3schools.com/xml/xml_xpath.asp>`_
    :param empty_ns_prefix: (optional) The name to give to the default namespace of `xml_as_text`.
                            This is a technical workaround for the limitation of lxml
                            not supporting empty namespace prefix. The
                            xpath in `mapping` must use this value to be able to correctly
                            reach empty-namespace prefixed elements
    :param discovery_config: (optional) metadata discovery configuration dict, accepting among
                             other items `metadata_pattern` (Regex pattern for metadata key
                             discovery, e.g. "^[a-zA-Z]+$"), `metadata_path` (String
                             representation of xpath)
    :returns: the metadata of the :class:`~eodag.api.product._product.EOProduct`
    """
    properties: dict[str, Any] = {}
    templates = {}
    used_xpaths = []
    root = etree.XML(xml_as_text)
    # lxml does not accept an empty namespace prefix: rename it once for all queries
    namespaces = {k or empty_ns_prefix: v for k, v in root.nsmap.items()}
    for metadata, value in mapping.items():
        # Treat the case when the value is from a queryable metadata
        if isinstance(value, list):
            conversion_or_none, path_or_text = value[1]
        else:
            conversion_or_none, path_or_text = value
        try:
            extracted_value = root.xpath(path_or_text, namespaces=namespaces)
            if len(extracted_value) <= 1:
                if len(extracted_value) < 1:
                    # If there is no matched value (empty list), mark the metadata as not
                    # available
                    extracted_value = [NOT_AVAILABLE]
                else:
                    # store element tag in used_xpaths
                    used_xpaths.append(
                        getattr(
                            root.xpath(
                                path_or_text.replace("/text()", ""),
                                namespaces=namespaces,
                            )[0],
                            "tag",
                            None,
                        )
                    )
                if conversion_or_none is None:
                    properties[metadata] = extracted_value[0]
                else:
                    # reformat conversion_or_none as metadata#converter(args) or metadata#converter
                    if (
                        len(conversion_or_none) > 1
                        and isinstance(conversion_or_none, list)
                        and conversion_or_none[1] is not None
                    ):
                        conversion_or_none = "%s(%s)" % (
                            conversion_or_none[0],
                            conversion_or_none[1],
                        )
                    elif isinstance(conversion_or_none, list):
                        conversion_or_none = conversion_or_none[0]
                    properties[metadata] = format_metadata(
                        "{%s%s%s}" % (metadata, SEP, conversion_or_none),
                        **{metadata: extracted_value[0]},
                    )
            # If there are multiple matches, consider the result as a list, doing a
            # formatting if any
            else:
                if conversion_or_none is None:
                    properties[metadata] = extracted_value
                else:
                    # reformat conversion_or_none as metadata#converter(args) or metadata#converter
                    if (
                        len(conversion_or_none) > 1
                        and isinstance(conversion_or_none, list)
                        and conversion_or_none[1] is not None
                    ):
                        conversion_or_none = "%s(%s)" % (
                            conversion_or_none[0],
                            conversion_or_none[1],
                        )
                    elif isinstance(conversion_or_none, list):
                        conversion_or_none = conversion_or_none[0]

                    # check if conversion uses variables to format
                    if re.search(r"({[^{}:]+})+", conversion_or_none):
                        conversion_or_none = conversion_or_none.format(**properties)

                    properties[metadata] = [
                        format_metadata(
                            "{%s%s%s}"
                            % (
                                metadata,
                                SEP,
                                conversion_or_none,
                            ),  # Re-build conversion format identifier
                            **{metadata: extracted_value_item},
                        )
                        for extracted_value_item in extracted_value
                    ]

        except XPathEvalError:
            # Assume the mapping is to be passed as is, in which case we readily
            # register it, or is a template, in which case we register it for later
            # formatting resolution using previously successfully resolved properties
            # Ignore any transformation specified. If a value is to be passed as is,
            # we don't want to transform it further
            if re.search(r"({[^{}:]+})+", path_or_text):
                templates[metadata] = path_or_text
            else:
                properties[metadata] = path_or_text

    # Resolve templates
    for metadata, template in templates.items():
        properties[metadata] = template.format(**properties)

    # adds missing discovered properties
    if not discovery_config:
        discovery_config = {}
    discovery_pattern = discovery_config.get("metadata_pattern")
    discovery_path = discovery_config.get("metadata_path")
    if discovery_pattern and discovery_path:
        discovered_properties = root.xpath(discovery_path, namespaces=namespaces)
        for found_xpath in discovered_properties:
            # strip the "{namespace}" part of the tag to get the bare element name
            found_key = found_xpath.tag.rpartition("}")[-1]
            if (
                re.compile(discovery_pattern).match(found_key)
                and found_key not in properties.keys()
                and found_xpath.tag not in used_xpaths
            ):
                properties[found_key] = found_xpath.text

    return properties


def mtd_cfg_as_conversion_and_querypath(
    src_dict: dict[str, Any],
    dest_dict: Optional[dict[str, Any]] = None,
    result_type: str = "json",
) -> dict[str, Any]:
    """Metadata configuration dictionary to querypath with conversion dictionary

    Transform every src_dict value from jsonpath_str to tuple `(conversion, jsonpath_object)`
    or from xpath_str to tuple `(conversion, xpath_str)`

    :param src_dict: Input dict containing jsonpath str as values
    :param dest_dict: (optional) Output dict containing jsonpath objects as values. When empty
                      or not given, a deep copy of `src_dict` is converted and returned
    :param result_type: (optional) `"json"` to parse paths as jsonpath; any other value keeps
                        the path string untouched
    :returns: dest_dict
    """
    # check if the configuration has already been converted
    # (`None` sentinel: empty configurations have nothing to inspect nor to convert)
    some_configured_value = next(
        iter(dest_dict.values() if dest_dict else src_dict.values()), None
    )
    if (
        isinstance(some_configured_value, list)
        and isinstance(some_configured_value[1], tuple)
        or isinstance(some_configured_value, tuple)
    ):
        return dest_dict or src_dict
    if not dest_dict:
        dest_dict = deepcopy(src_dict)
    for metadata in src_dict:
        if metadata not in dest_dict:
            dest_dict[metadata] = (None, NOT_MAPPED)
        else:
            conversion, path = get_metadata_path(dest_dict[metadata])
            if result_type == "json":
                parsed_path = string_to_jsonpath(path)
                if isinstance(parsed_path, str):
                    # not a jsonpath: assume the mapping is to be passed as is. Ignore any transformation specified.
                    # If a value is to be passed as is, we don't want to transform it further
                    conversion = None
            else:
                parsed_path = path

            if isinstance(dest_dict[metadata], list) and len(dest_dict[metadata]) == 2:
                dest_dict[metadata][1] = (conversion, parsed_path)
            else:
                dest_dict[metadata] = (conversion, parsed_path)

            # Put the updated mapping at the end
            dest_dict[metadata] = dest_dict.pop(metadata)

    return dest_dict


def format_query_params(
    collection: str,
    config: PluginConfig,
    query_dict: dict[str, Any],
    error_context: str = "",
) -> dict[str, Any]:
    """Format the search parameters to query parameters

    :param collection: collection used to select the collection-specific metadata mapping
                       and discovery settings in `config.products`
    :param config: plugin configuration (reads `metadata_mapping`, `products`,
                   `discover_metadata`, `literal_search_params`)
    :param query_dict: search parameters; a `raise_errors` entry, if any, is removed from it
    :param error_context: (optional) text appended to the non-queryable parameter error
    :returns: the query parameters to send to the provider
    :raises: :class:`~eodag.utils.exceptions.ValidationError` if a formatted JSON query
             parameter still contains a not-available value
    """
    query_dict.pop("raise_errors", None)
    # . not allowed in eodag_search_key, replaced with %2E
    query_dict = {k.replace(".", "%2E"): v for k, v in query_dict.items()}

    collection_metadata_mapping = dict(
        config.metadata_mapping,
        **config.products.get(collection, {}).get("metadata_mapping", {}),
    )

    # Raise error if non-queryables parameters are used and raise_mtd_discovery_error configured
    if (
        raise_mtd_discovery_error := config.products.get(collection, {})
        .get("discover_metadata", {})
        .get("raise_mtd_discovery_error")
    ) is None:
        raise_mtd_discovery_error = getattr(config, "discover_metadata", {}).get(
            "raise_mtd_discovery_error", False
        )

    query_params: dict[str, Any] = {}
    # Get all the search parameters that are recognised as queryables by the
    # provider (they appear in the queryables dictionary)
    queryables = _get_queryables(
        query_dict,
        config,
        collection_metadata_mapping,
        raise_mtd_discovery_error,
        error_context,
    )

    for eodag_search_key, provider_search_param in queryables.items():
        user_input = query_dict[eodag_search_key]

        if provider_search_param == user_input:
            # means the mapping is to be passed as is, in which case we
            # readily register it
            if (
                eodag_search_key in query_params
                and isinstance(query_params[eodag_search_key], dict)
                and isinstance(user_input, dict)
            ):
                query_params[eodag_search_key].update(user_input)
            else:
                query_params[eodag_search_key] = user_input
            continue

        if COMPLEX_QS_REGEX.match(provider_search_param):
            parts = provider_search_param.split("=")
            if len(parts) == 1:
                formatted_query_param = format_metadata(
                    provider_search_param, collection, **query_dict
                )
                formatted_query_param = formatted_query_param.replace("'", '"')
                if "{{" in provider_search_param:
                    # retrieve values from hashes where keys are given in the param
                    if "}[" in formatted_query_param:
                        formatted_query_param = _resolve_hashes(formatted_query_param)
                    # remove quotes around arrays
                    formatted_query_param = remove_str_array_quotes(
                        formatted_query_param
                    )
                    if NOT_AVAILABLE in formatted_query_param:
                        raise ValidationError(
                            "Could not parse %s query parameter, got %s"
                            % (eodag_search_key, formatted_query_param)
                        )
                    # json query string (for POST request)
                    update_nested_dict(
                        query_params,
                        orjson.loads(formatted_query_param),
                        extend_list_values=True,
                        allow_extend_duplicates=False,
                    )
                else:
                    query_params[eodag_search_key] = formatted_query_param
            else:
                provider_search_key, provider_value = parts
                query_params[provider_search_key] = format_metadata(
                    provider_value, collection, **query_dict
                )
        else:
            query_params[provider_search_param] = user_input

    # Now get all the literal search params (i.e params to be passed "as is"
    # in the search request)
    # ignore additional_params if it isn't a dictionary
    # Work on a copy: updating the configured dict in place would leak the free text
    # search parameters of this request into every later request using this config.
    configured_literals = getattr(config, "literal_search_params", {})
    literal_search_params: dict[str, Any] = (
        dict(configured_literals) if isinstance(configured_literals, dict) else {}
    )

    # Now add formatted free text search parameters (this is for cases where a
    # complex query through a free text search parameter is available for the
    # provider and needed for the consumer)
    # (mapping is rebuilt since queryables discovery may have updated the config)
    collection_metadata_mapping = dict(
        config.metadata_mapping,
        **config.products.get(collection, {}).get("metadata_mapping", {}),
    )
    literal_search_params.update(
        _format_free_text_search(config, collection_metadata_mapping, **query_dict)
    )
    for provider_search_key, provider_value in literal_search_params.items():
        if isinstance(provider_value, list):
            query_params.setdefault(provider_search_key, []).extend(provider_value)
        else:
            query_params.setdefault(provider_search_key, []).append(provider_value)
    return query_params


def _resolve_hashes(formatted_query_param: str) -> str:
    """Resolve hash lookups embedded in a formatted query parameter

    Structures of the format ``{"a": "abc", "b": "cde"}["a"]`` found in
    `formatted_query_param` are replaced by the value corresponding to the given key in the
    hash (in this case ``"abc"``). String values are re-quoted, other values are dumped
    as JSON.

    :param formatted_query_param: formatted query parameter possibly containing hash lookups
    :returns: the query parameter with every hash lookup replaced by its value
    """
    # check if there is still a hash to be resolved
    while '}["' in formatted_query_param:
        # find and parse code between {}
        ind_open = formatted_query_param.find('}["')
        ind_close = formatted_query_param.find('"]', ind_open)
        # the hash is expected to be the value of a key, hence the ': {' lookup
        hash_start = formatted_query_param[:ind_open].rfind(": {") + 2
        h = orjson.loads(formatted_query_param[hash_start : ind_open + 1])
        # find key and get value
        ind_key_start = formatted_query_param.find('"', ind_open) + 1
        key = formatted_query_param[ind_key_start:ind_close]
        value = h[key]
        # replace hash with value
        hash_lookup = formatted_query_param[hash_start : ind_close + 2]
        if isinstance(value, str):
            formatted_query_param = formatted_query_param.replace(
                hash_lookup, '"' + value + '"'
            )
        else:
            formatted_query_param = formatted_query_param.replace(
                hash_lookup, json.dumps(value)
            )
    return formatted_query_param


def _format_free_text_search(
    config: PluginConfig, metadata_mapping: dict[str, Any], **kwargs: Any
) -> dict[str, Any]:
    """Build the free text search parameter using the search parameters

    :param config: plugin configuration, read for `free_text_search_operations`
    :param metadata_mapping: metadata mapping used to keep only operands whose parameter is
                             mapped as a list
    :param kwargs: search parameters used to format the operands
    :returns: free text query parameters, empty if no operation is configured
    """
    query_params: dict[str, Any] = {}
    if not getattr(config, "free_text_search_operations", None):
        return query_params
    for param, operations_config in config.free_text_search_operations.items():
        union = operations_config["union"]
        wrapper = operations_config.get("wrapper", "{}")
        formatted_query = []
        for operator, operands in operations_config["operations"].items():
            # The Operator string is the operator wrapped with spaces
            operator = " {} ".format(operator)
            # Build the operation string by joining the formatted operands together
            # using the operation string
            operation_string = operator.join(
                format_metadata(operand, **kwargs)
                for operand in operands
                if any(
                    # operand references the parameter as `{kw}` or `{kw#converter}`
                    re.search(rf"{{{re.escape(kw)}[}}#]", operand)
                    and val is not None
                    and isinstance(metadata_mapping.get(kw, []), list)
                    for kw, val in kwargs.items()
                )
            )
            # Finally wrap the operation string as specified by the wrapper and add
            # it to the list of queries (only if the operation string is not empty)
            if operation_string:
                query = wrapper.format(operation_string)
                formatted_query.append(query)
        # Join the formatted query using the "union" config parameter, and then
        # wrap it with the Python format string specified in the "wrapper" config
        # parameter
        final_query = union.join(formatted_query)
        if len(operations_config["operations"]) > 1 and len(formatted_query) > 1:
            # wrap the joined query itself: `query_params[param]` is not set yet here
            final_query = wrapper.format(final_query)
        if final_query:
            query_params[param] = final_query
    return query_params


def _get_queryables(
    search_params: dict[str, Any],
    config: PluginConfig,
    metadata_mapping: dict[str, Any],
    raise_mtd_discovery_error: bool,
    error_context: str,
) -> dict[str, Any]:
    """Retrieve the metadata mappings that are query-able

    :param search_params: search parameters given by the user; `None` values are ignored
    :param config: plugin configuration, read for `discover_metadata`
    :param metadata_mapping: metadata mapping in which list values denote queryable parameters
    :param raise_mtd_discovery_error: whether a parameter that is not mapped as queryable
                                      must raise an error
    :param error_context: text appended to the error message
    :returns: provider search parameters by eodag search key
    :raises: :class:`~eodag.utils.exceptions.ValidationError` if a non-queryable parameter is
             used while `raise_mtd_discovery_error` is set
    """
    logger.debug("Retrieving queryable metadata from metadata_mapping")
    queryables: dict[str, Any] = {}
    for eodag_search_key, user_input in search_params.items():
        if user_input is None:
            continue
        md_mapping = metadata_mapping.get(eodag_search_key, (None, NOT_MAPPED))
        # raise an error when a query param not allowed by the provider is found
        if not isinstance(md_mapping, list) and raise_mtd_discovery_error:
            raise ValidationError(
                "Search parameters which are not queryable are disallowed for this collection on this provider: "
                f"please remove '{eodag_search_key}' from your search parameters. {error_context}",
                {eodag_search_key},
            )
        _, md_value = md_mapping
        # query param from defined metadata_mapping
        if md_mapping is not None and isinstance(md_mapping, list):
            search_param = get_search_param(md_mapping)
            if search_param is not None:
                queryables[eodag_search_key] = search_param
        # query param from metadata auto discovery
        elif md_value == NOT_MAPPED and getattr(config, "discover_metadata", {}).get(
            "auto_discovery", False
        ):
            pattern = re.compile(config.discover_metadata.get("metadata_pattern", ""))
            search_param_cfg = config.discover_metadata.get("search_param", "")
            search_param_unparsed_cfg = config.discover_metadata.get(
                "search_param_unparsed", []
            )
            if (
                search_param_unparsed_cfg
                and eodag_search_key in search_param_unparsed_cfg
            ):
                queryables[eodag_search_key] = user_input
            elif pattern.match(eodag_search_key) and isinstance(search_param_cfg, str):
                search_param = search_param_cfg.format(metadata=eodag_search_key)
                queryables[eodag_search_key] = search_param
            elif pattern.match(eodag_search_key) and isinstance(
                search_param_cfg, dict
            ):
                search_param_cfg_parsed = dict_items_recursive_apply(
                    search_param_cfg,
                    lambda k, v: v.format(metadata=eodag_search_key),
                )
                # the parsed configuration is merged into the matching plugin config entries
                for k, v in search_param_cfg_parsed.items():
                    if getattr(config, k, None):
                        update_nested_dict(
                            getattr(config, k),
                            v,
                            extend_list_values=True,
                            allow_extend_duplicates=False,
                        )
                    else:
                        logger.warning(
                            "Could not use discover_metadata[search_param]: no entry for %s in plugin config",
                            k,
                        )
    return queryables


def get_queryable_from_provider(
    provider_queryable: str, metadata_mapping: dict[str, Union[str, list[str]]]
) -> Optional[str]:
    """Get EODAG configured queryable parameter from provider queryable parameter

    :param provider_queryable: provider queryable parameter
    :param metadata_mapping: metadata-mapping configuration
    :returns: EODAG configured queryable parameter or None
    """
    # the provider name is matched literally, surrounded by double quotes
    pattern = '"' + re.escape(provider_queryable) + '"'
    # if 1:1 mapping exists privilege this one instead of other mapping
    # e.g. provider queryable = year -> use year and not date in which year also appears
    mapping_values = [
        v[0] if isinstance(v, list) else "" for v in metadata_mapping.values()
    ]
    StacQueryables = Queryables.from_stac_models()
    if provider_queryable in mapping_values:
        ind = mapping_values.index(provider_queryable)
        return StacQueryables.get_queryable_from_alias(
            list(metadata_mapping.keys())[ind]
        )
    for param, param_conf in metadata_mapping.items():
        if (
            isinstance(param_conf, list)
            and param_conf[0]
            and re.search(pattern, param_conf[0])
        ):
            return StacQueryables.get_queryable_from_alias(param)
    return None


def get_provider_queryable_path(
    queryable: str, metadata_mapping: dict[str, Union[str, list[str]]]
) -> Optional[str]:
    """Get EODAG configured queryable path from its parameter

    :param queryable: eodag queryable parameter
    :param metadata_mapping: metadata-mapping configuration
    :returns: EODAG configured queryable path (first item of the list mapping) or None if the
              parameter is missing or not mapped as a list
    """
    parameter_conf = metadata_mapping.get(queryable)
    if isinstance(parameter_conf, list):
        return parameter_conf[0]
    return None


def get_provider_queryable_key(
    eodag_key: str,
    provider_queryables: dict[str, Any],
    metadata_mapping: dict[str, Union[list[Any], str]],
) -> str:
    """Finds the provider queryable corresponding to the given eodag key based on the metadata mapping

    :param eodag_key: key in eodag
    :param provider_queryables: queryables returned from the provider
    :param metadata_mapping: metadata mapping from which the keys are retrieved
    :returns: provider queryable key: the first provider queryable found as a whole word in
              the list mapping, `eodag_key` if the mapping is not a list, or an empty string
              if nothing matches
    """
    if eodag_key not in metadata_mapping:
        return ""
    mapping_key = metadata_mapping[eodag_key]
    if not isinstance(mapping_key, list):
        return eodag_key
    for queryable in provider_queryables:
        # escape the name: provider keys are literals, not regular expressions
        pattern = rf"\b{re.escape(queryable)}\b"
        if re.search(pattern, mapping_key[0]):
            return queryable
    return ""


def normalize_bands(data: Union[dict, Asset]) -> Union[dict, Asset]:
    """Migrate ``eo:bands`` / ``raster:bands`` of ``data`` into a STAC 1.1 ``bands`` array, in place.

    Band fields sharing the same value on every band are moved to ``data`` itself, unless
    ``data`` already holds a different value for that field. Returns ``data`` for convenience.

    :param data: properties dict or Asset to migrate
    :returns: the same data with migrated bands
    """
    UNPREFIX_BAND_FIELDNAME = [
        "name",
        "description",
        "data_type",
        "nodata",
        "unit",
        "statistics",
    ]
    EXCLUDE_MOVE_TO_PARENT_BAND_FIELDNAME = ["name", "eo:common_name"]

    # https://github.com/radiantearth/stac-spec/blob/v1.1.0/best-practices.md#bands
    # Migrate band STAC 1.0 to 1.1
    if not (isinstance(data, dict) or isinstance(data, Asset)):
        return data

    # Gather eo:bands and raster:bands, removing them from data
    bands: dict[str, Any] = {"eo:bands": [], "raster:bands": []}
    has_data = False
    for fieldname in bands:
        if fieldname in data:
            if isinstance(data[fieldname], list):
                bands[fieldname] = data[fieldname]
            else:
                bands[fieldname] = [data[fieldname]]
            has_data = True
            del data[fieldname]
    if not has_data:
        return data

    processed_bands: list[dict[str, Any]] = []

    # migrate eo:bands -> bands
    for item in bands["eo:bands"]:
        band = {}
        for key in item:
            if key in UNPREFIX_BAND_FIELDNAME:
                band[key] = item[key]
            else:
                band["eo:{}".format(key)] = item[key]
        processed_bands.append(band)

    # migrate raster:bands -> bands, merging by position with the eo:bands already migrated
    for index, item in enumerate(bands["raster:bands"]):
        band = processed_bands[index] if index < len(processed_bands) else {}
        for key in item:
            if key in UNPREFIX_BAND_FIELDNAME:
                band[key] = item[key]
            else:
                band["raster:{}".format(key)] = item[key]
        if index < len(processed_bands):
            processed_bands[index] = band
        else:
            processed_bands.append(band)

    # When a property has the same value for each band, move it in parent scope
    if len(processed_bands) > 0:
        # Lists each distinct value for a field of the same name on each band
        # (lists, not sets: band values may be unhashable)
        field_values: dict[str, Any] = {}
        for band in processed_bands:
            for key in band:
                if key not in field_values:
                    field_values[key] = []
                if band[key] not in field_values[key]:
                    field_values[key].append(band[key])

        # Move band fields from asset to parent if all fields shared same value
        # (distinct values == 1)
        remove_band_fields = []
        for key in field_values:
            if (
                key in EXCLUDE_MOVE_TO_PARENT_BAND_FIELDNAME
                or len(field_values[key]) != 1
            ):
                continue
            # Do not overwrite a value already set on the parent
            # (e.g. an Asset's own `description`); keep the
            # per-band value on the `bands` array instead.
            if key in data and data[key] != field_values[key][0]:
                continue
            # All bands have same value
            data[key] = field_values[key][0]
            # Tag field "to remove" from bands
            remove_band_fields.append(key)

        # Remove from bands the fields moved to parent, dropping bands left empty
        cleaned_bands = []
        for band in processed_bands:
            cleaned_band = {
                key: band[key] for key in band if key not in remove_band_fields
            }
            if len(cleaned_band) > 0:
                cleaned_bands.append(cleaned_band)
        processed_bands = cleaned_bands

    # Remap band field if contains at least one value
    if len(processed_bands) > 0:
        data["bands"] = processed_bands

    return data