Source code for eodag.api.product.metadata_mapping
# -*- coding: utf-8 -*-
# Copyright 2018, CS GROUP - France, https://www.csgroup.eu/
#
# This file is part of EODAG project
# https://www.github.com/CS-SI/EODAG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import ast
import datetime as dt
import json
import logging
import re
from string import Formatter
from typing import TYPE_CHECKING, Any, AnyStr, Callable, Iterator, Optional, Union, cast
import geojson
import orjson
import pyproj
import shapely
from dateutil.relativedelta import relativedelta
from dateutil.tz import tzutc
from jsonpath_ng.jsonpath import Child, JSONPath
from lxml import etree
from lxml.etree import XPathEvalError
from shapely import wkt
from shapely.geometry import LineString, MultiPolygon, Point, Polygon
from shapely.ops import transform
from eodag.api.product._assets import Asset
from eodag.types.queryables import Queryables
from eodag.utils import (
DEFAULT_PROJ,
DEFAULT_SHAPELY_GEOMETRY,
deepcopy,
dict_items_recursive_apply,
format_string,
get_geometry_from_various,
items_recursive_apply,
nested_pairs2dict,
remove_str_array_quotes,
sanitize,
string_to_jsonpath,
update_nested_dict,
)
from eodag.utils.dates import get_timestamp, parse_to_utc, to_iso_utc_string
from eodag.utils.exceptions import ValidationError
if TYPE_CHECKING:
from collections.abc import Mapping, Sequence
from shapely.geometry.base import BaseGeometry
from eodag.config import PluginConfig
# Module logger; its name is hard-coded rather than derived from __name__
logger = logging.getLogger("eodag.product.metadata_mapping")
# Separator between a metadata path (or field name) and its converter
SEP = r"#"
# Matches a whole mapping value of the form "{<path>#<converter>(<args>)}":
# the converter must look like an identifier, the parenthesised args are optional
INGEST_CONVERSION_REGEX = re.compile(
r"^{(?P<path>[^#]*)" + SEP + r"(?P<converter>[^\d\W]\w*)(\((?P<args>.*)\))*}$"
)
# Placeholder returned by several converters when no value can be produced
NOT_AVAILABLE = "Not Available"
# NOTE(review): presumably the placeholder for metadata without mapping; not used in this section
NOT_MAPPED = "Not Mapped"
# NOTE(review): status labels, presumably describing product availability; not used in this section
ONLINE_STATUS = "succeeded"
STAGING_STATUS = "ordered"
OFFLINE_STATUS = "orderable"
# Rounding precision passed to shapely's WKT dump in convert_to_rounded_wkt
COORDS_ROUNDING_PRECISION = 4
# WKT length above which convert_to_rounded_wkt tries to simplify the geometry
WKT_MAX_LEN = 1600
# NOTE(review): judging by its name, detects query strings embedding "{...}" templates; not used in this section
COMPLEX_QS_REGEX = re.compile(r"^(.+=)?([^=]*)({.+})+([^=&]*)$")
# WKT polygon spanning longitudes -180..180 and latitudes -90..90
DEFAULT_GEOMETRY = "POLYGON((180 -90, 180 90, -180 90, -180 -90, 180 -90))"
def get_metadata_path(
    map_value: Union[str, list[str]],
) -> tuple[Union[list[str], None], str]:
    """Split a metadata mapping entry into its optional converter and its path.

    A mapping entry is either a plain string (the jsonpath/xpath of a
    non-queryable metadata) or a two-item list for a queryable metadata, whose
    second item is the path. Given the following provider configuration::

        provider:
            ...
            search:
                ...
                metadata_mapping:
                    platform:
                        - platform
                        - $.properties.platform
                    id: $.properties.id
                    ...

    ``id`` is not queryable whereas ``platform`` is. The first item of
    ``metadata_mapping.platform`` tells how the eodag search parameter
    ``platform`` is interpreted by the
    :class:`~eodag.plugins.search.base.Search` plugin of the provider.

    When the path has the form ``{<path>#<converter>(<args>)}``, the converter
    name and its raw arguments are extracted from it.

    :param map_value: The value taken from ``metadata_mapping`` in the provider
                      search config, e.g. ``['platform', '$.properties.platform']``
                      or ``'$.properties.id'``
    :returns: ``(None, path)`` when the path carries no converter, otherwise
              ``([converter, args], path)``
    :raises TypeError: If the path cannot be matched by the regex; the error is
                       logged and re-raised
    """
    raw_path = get_metadata_path_value(map_value)
    try:
        conversion = INGEST_CONVERSION_REGEX.match(raw_path)
    except TypeError:
        logger.error("Could not match regex on metadata path '%s'", str(raw_path))
        raise
    if not conversion:
        return None, raw_path
    return [conversion["converter"], conversion["args"]], conversion["path"]
def get_metadata_path_value(map_value: Union[str, list[str]]) -> str:
    """Return the raw metadata path of a mapping entry, converter included.

    :param map_value: Either the path itself, or a list whose second item is the path
    :returns: The path as written in the mapping
    """
    if isinstance(map_value, list):
        return map_value[1]
    return map_value
def get_search_param(map_value: list[str]) -> str:
    """Return the provider search parameter of a queryable mapping entry.

    See :func:`~eodag.api.product.metadata_mapping.get_metadata_path`

    :param map_value: The value taken from ``metadata_mapping`` in the provider
                      search config; the caller is expected to pass a list
    :returns: The first item of ``map_value``, i.e. the search parameter as
              defined in the provider config
    """
    search_param = map_value[0]
    return search_param
[docs]
def format_metadata(search_param: str, *args: Any, **kwargs: Any) -> str:
"""Format a string of form ``{<field_name>#<conversion_function>}``
The currently understood converters are:
- ``ceda_collection_name``: generate a CEDA collection name from a string
- ``wekeo_to_cop_collection``: converts the name of a collection from the WEkEO format to the Copernicus format
- ``csv_list``: convert to a comma separated list
- ``datetime_to_timestamp_milliseconds``: converts a utc date string to a timestamp in milliseconds
- ``dict_filter_and_sub``: filter dict items using jsonpath and then apply recursive_sub_str
- ``dict_with_roles``: keep only dict items with given roles in their "roles" list
- ``fake_l2a_title_from_l1c``: used to generate SAFE format metadata for data from AWS
- ``from_alternate``: update assets using given alternate
- ``from_ewkt``: convert EWKT to shapely geometry / WKT in DEFAULT_PROJ
- ``from_georss``: convert GeoRSS to shapely geometry / WKT in DEFAULT_PROJ
- ``get_ecmwf_time``: get the time of a datetime string in the ECMWF format
- ``get_group_name``: get the matching regex group name
- ``literalize_unicode``: convert a string to its raw Unicode literal form
- ``not_available``: replace value with "Not Available"
- ``recursive_sub_str``: recursively substitute in the structure (e.g. dict) values matching a regex
- ``remove_extension``: on a string that contains dots, only take the first part of the list obtained by
splitting the string on dots
- ``replace_str``: execute "string".replace(old, new)
- ``replace_str_tuple``: apply multiple replacements on a string (parts or complete)
- ``replace_tuple``: apply multiple replacements matching whole value
- ``s2msil2a_title_to_aws_productinfo``: used to generate SAFE format metadata for data from AWS
- ``sanitize``: sanitize string
- ``slice_str``: slice a string (equivalent to s[start:end:step])
- ``split``: split a string using given separator
- ``split_cop_dem_id``: get the bbox by splitting the product id
- ``split_corine_id``: get the collection by splitting the product id
- ``to_bounds_lists``: convert to list(s) of bounds
- ``to_datetime_dict``: convert a datetime string to a dictionary where values are either a string or a list
- ``to_ewkt``: convert to EWKT (Extended Well-Known text)
- ``to_geojson``: convert to a GeoJSON (via __geo_interface__ if exists)
- ``to_geojson_polytope``: convert shapely Point/LineString/Polygon to ECMWF polytope feature dicts
- ``to_iso_date``: remove the time part of a iso datetime string
- ``to_iso_utc_datetime_from_milliseconds``: convert a utc timestamp in given milliseconds to a utc iso datetime
- ``to_iso_utc_datetime``: convert a UTC datetime string to ISO UTC datetime string
- ``to_lower``: Convert a string to lowercase
- ``to_nwse_bounds_str``: convert to North,West,South,East bounds string with given separator
- ``to_nwse_bounds``: convert to North,West,South,East bounds
- ``to_rounded_wkt``: simplify the WKT of a geometry
- ``to_title``: Convert a string to title case
- ``to_upper``: Convert a string to uppercase
:param search_param: The string to be formatted
:param args: (optional) Additional arguments to use in the formatting process
:param kwargs: (optional) Additional named-arguments to use when formatting
:returns: The formatted string
"""
class MetadataFormatter(Formatter):
CONVERSION_REGEX = re.compile(
r"^(?P<field_name>.+)"
+ SEP
+ r"(?P<converter>[^\d\W]\w*)(\((?P<args>.*)\))*$"
)
def __init__(self) -> None:
    """Start with no pending custom conversion.

    Both attributes are filled by ``get_field`` and consumed by ``convert_field``.
    """
    # raw argument string of the pending converter, if any
    self.custom_args: Optional[str] = None
    # converter callable selected for the field currently being formatted
    self.custom_converter: Optional[Callable] = None
def parse(self, format_string: str):
    """Rewrite colon-bearing field names before delegating to the base parser.

    Every ``{...}`` group of the template is inspected: ``{foo:bar}`` becomes
    ``{foo__bar}`` so that the base parser does not treat ``bar`` as a format
    spec. A group is left untouched when it has no colon, when it starts
    with ``!``, ``.`` or ``:``, or when it has a single colon followed by
    text containing a dot (taken as a real format spec such as ``:.2f``).

    :param format_string: The template to parse
    :returns: The items yielded by the base class ``parse`` for the rewritten template
    """

    def rewrite_field(field: str) -> str:
        if ":" not in field or field.lstrip().startswith(("!", ".", ":")):
            return field
        after_first_colon = field.split(":")[1:]
        # a lone suffix containing a dot is a format specifier, leave it
        if len(after_first_colon) == 1 and "." in after_first_colon[0]:
            return field
        # only the first colon belongs to the field name
        return field.replace(":", "__", 1)

    def substitute(found: "re.Match[str]") -> str:
        return "{" + rewrite_field(found.group(1)) + "}"

    safe_template = re.sub(r"{([^{}]+)}", substitute, format_string)
    yield from super().parse(safe_template)
def get_value(
    self, key: Any, args: "Sequence[Any]", kwargs: "Mapping[str, Any]"
) -> Any:
    """Resolve a (possibly rewritten) field name against the keyword arguments.

    ``parse`` rewrites ``:`` into ``__`` in field names, so string keys are
    looked up, in order, as:

    1. the key with ``__`` turned back into ``:``
    2. the key with ``__`` turned into ``_COLON_``
    3. the key itself, so that names genuinely containing ``__`` still resolve

    A candidate whose value is ``None`` is treated as missing.

    :param key: Field name (str) or positional index
    :param args: Positional arguments given to the formatter
    :param kwargs: Keyword arguments given to the formatter
    :returns: The first non-``None`` value found, ``None`` when no candidate
              matches; non-string keys are delegated to the base class
    """
    if not isinstance(key, str):
        return super().get_value(key, args, kwargs)
    candidates = (
        key.replace("__", ":"),
        key.replace("__", "_COLON_"),
        key,
    )
    for candidate in candidates:
        value = kwargs.get(candidate)
        if value is not None:
            return value
    return None
def get_field(self, field_name: str, args: Any, kwargs: Any) -> Any:
    """Strip the ``#converter(args)`` suffix from a field name and resolve the field.

    The value of the field is not known yet at this stage, so the converter
    and its raw arguments are only recorded on the instance; they are applied
    later by ``convert_field``.

    :param field_name: Field name, optionally suffixed with ``#<converter>(<args>)``
    :param args: Positional arguments given to the formatter
    :param kwargs: Keyword arguments given to the formatter
    :returns: The result of the base class ``get_field`` for the bare field name
    :raises AttributeError: If no ``convert_<converter>`` method exists
    """
    spec = self.CONVERSION_REGEX.match(field_name)
    if spec:
        groups = spec.groupdict()
        field_name = groups["field_name"]
        raw_args = groups["args"]
        # converts back "_COLON_" to ":"
        if raw_args is not None and "_COLON_" in raw_args:
            raw_args = raw_args.replace("_COLON_", ":")
        self.custom_args = raw_args
        self.custom_converter = getattr(
            self, "convert_{}".format(groups["converter"])
        )
    return super(MetadataFormatter, self).get_field(field_name, args, kwargs)
def convert_field(self, value: Any, conversion: Any) -> Any:
    """Apply the converter recorded by ``get_field``, if any.

    ``None`` values are never passed to the converter. The recorded converter
    and arguments are cleared once used, so they do not leak onto the next
    field of the template.

    :param value: The resolved field value
    :param conversion: The standard conversion flag, used only when no custom
                       converter is pending
    :returns: The converted value
    """
    converter = self.custom_converter
    if converter is None:
        return super(MetadataFormatter, self).convert_field(value, conversion)

    converter_args = self.custom_args
    if value is None:
        converted = None
    elif converter_args is None:
        converted = converter(value)
    else:
        converted = converter(value, converter_args)

    # reset the state for the next field
    self.custom_converter = None
    self.custom_args = None
    return converted
@staticmethod
def convert_datetime_to_timestamp_milliseconds(date_time: str) -> int:
    """Convert a date_time (str) to a Unix timestamp in milliseconds

    "2021-04-21T18:27:19.123Z" => "1619029639123"
    "2021-04-21" => "1618963200000"
    "2021-04-21T00:00:00+02:00" => "1618956000000"

    The timestamp returned by ``get_timestamp`` is scaled by 1000 and then
    truncated by ``int``.
    """
    timestamp = get_timestamp(date_time)
    return int(1e3 * timestamp)
@staticmethod
def convert_to_iso_utc_datetime_from_milliseconds(
    timestamp: int,
) -> Union[str, int]:
    """Convert a timestamp in milliseconds (int) to its ISO8601 UTC format

    1619029639123 => "2021-04-21T18:27:19.123Z"

    :param timestamp: Milliseconds elapsed since the Unix epoch
    :returns: The ISO UTC string, or ``timestamp`` unchanged when the
              conversion raises a ``TypeError``
    """
    try:
        moment = dt.datetime.fromtimestamp(timestamp / 1e3, tzutc())
        return cast(str, to_iso_utc_string(moment))
    except TypeError:
        return timestamp
@staticmethod
def convert_to_iso_utc_datetime(
    date_time: str, timespec: str = "milliseconds"
) -> str:
    """Convert a date_time (str) to its ISO 8601 representation in UTC

    "2021-04-21" => "2021-04-21T00:00:00.000Z"
    "2021-04-21T00:00:00.000+02:00" => "2021-04-20T22:00:00.000Z"

    :param date_time: The datetime string to convert
    :param timespec: Number of additional terms of the time to include, as
                     accepted by ``datetime.isoformat``: 'auto', 'hours',
                     'minutes', 'seconds', 'milliseconds' or 'microseconds'
    :returns: The ISO UTC string, or ``date_time`` unchanged when parsing
              raises a ``ValidationError``
    """
    try:
        utc_datetime = parse_to_utc(date_time)
    except ValidationError:
        return date_time
    iso_text = utc_datetime.isoformat(timespec=timespec)
    return iso_text.replace("+00:00", "Z")
@staticmethod
def convert_to_iso_date(
    datetime_string: str, time_delta_args_str: str = "0,0,0,0,0,0,0"
) -> str:
    """Convert an ISO8601 datetime (str) to its ISO8601 date format

    "2021-04-21T18:27:19.123Z" => "2021-04-21"
    "2021-04-21" => "2021-04-21"
    "2021-04-21T00:00:00+06:00" => "2021-04-20" !

    :param datetime_string: The datetime string to convert
    :param time_delta_args_str: Comma-separated literal evaluated into the
                                positional arguments of ``datetime.timedelta``;
                                the resulting offset is added before truncation
    :returns: The first 10 characters of the shifted datetime in ISO format
    """
    utc_datetime = parse_to_utc(datetime_string)
    offset = dt.timedelta(*ast.literal_eval(time_delta_args_str))
    return (utc_datetime + offset).isoformat()[:10]
@staticmethod
def convert_to_non_separated_date(datetime_string):
    """Convert a datetime string to its ISO date with the dashes removed.

    The date comes from ``convert_to_iso_date`` called with its default offset.
    """
    return MetadataFormatter.convert_to_iso_date(datetime_string).replace("-", "")
@staticmethod
def convert_to_rounded_wkt(value: BaseGeometry) -> str:
    """Dump a geometry as WKT with rounded coordinates, simplifying it if too long.

    While the WKT is longer than ``WKT_MAX_LEN``, the original geometry is
    simplified with a tolerance growing from 0.1 by steps of 0.1, for as long
    as the tolerance does not exceed 1. A warning is logged when the WKT is
    still too long afterwards.

    :param value: The geometry to dump
    :returns: The (possibly simplified) WKT string
    """

    def dump(geometry: Any) -> str:
        return cast(
            str, wkt.dumps(geometry, rounding_precision=COORDS_ROUNDING_PRECISION)
        )

    wkt_value = dump(value)
    # If needed, simplify WKT to prevent too long request failure
    tolerance = 0.1
    while len(wkt_value) > WKT_MAX_LEN and tolerance <= 1:
        logger.debug(
            "Geometry WKT is too long (%s), trying to simplify it with tolerance %s",
            len(wkt_value),
            tolerance,
        )
        wkt_value = dump(value.simplify(tolerance))
        tolerance += 0.1
    still_too_long = len(wkt_value) > WKT_MAX_LEN
    if still_too_long and tolerance > 1:
        logger.warning("Failed to reduce WKT length lower than %s", WKT_MAX_LEN)
    return wkt_value
@staticmethod
def convert_to_bounds_lists(input_geom: BaseGeometry) -> list[list[float]]:
    """Convert a geometry to a list of bounds lists.

    :param input_geom: The geometry to convert
    :returns: One bounds list per polygon of a ``MultiPolygon``, largest area
              first, or a single bounds list for any other geometry
    """
    if not isinstance(input_geom, MultiPolygon):
        return [list(input_geom.bounds[0:4])]
    # sort with larger one at first (stac-browser only plots first one)
    largest_first = sorted(
        input_geom.geoms, key=lambda part: part.area, reverse=True
    )
    return [list(part.bounds[0:4]) for part in largest_first]
@staticmethod
def convert_to_bounds(input_geom_unformatted: Any) -> list[float]:
    """Convert any supported geometry description to a single bounds list.

    The input goes through ``get_geometry_from_various``; when that yields
    ``None``, ``DEFAULT_SHAPELY_GEOMETRY`` is used instead. For a
    ``MultiPolygon`` the bounds of all parts are merged, starting from
    180/90/-180/-90, so the merged values never leave that range on the
    corresponding side.

    :param input_geom_unformatted: The geometry description
    :returns: ``[min_lon, min_lat, max_lon, max_lat]``
    """
    geometry = get_geometry_from_various(geometry=input_geom_unformatted)
    if geometry is None:
        geometry = DEFAULT_SHAPELY_GEOMETRY
    if not isinstance(geometry, MultiPolygon):
        return list(geometry.bounds[0:4])

    # sort with larger one at first (stac-browser only plots first one)
    parts = sorted(geometry.geoms, key=lambda part: part.area, reverse=True)
    min_lon, min_lat, max_lon, max_lat = 180.0, 90.0, -180.0, -90.0
    for part in parts:
        min_lon = min(min_lon, part.bounds[0])
        min_lat = min(min_lat, part.bounds[1])
        max_lon = max(max_lon, part.bounds[2])
        max_lat = max(max_lat, part.bounds[3])
    return [min_lon, min_lat, max_lon, max_lat]
@staticmethod
def convert_to_nwse_bounds(input_geom: BaseGeometry) -> list[float]:
if isinstance(input_geom, str):
input_geom = shapely.wkt.loads(input_geom)
return list(input_geom.bounds[-1:] + input_geom.bounds[:-1])
@staticmethod
def convert_to_nwse_bounds_str(
    input_geom: BaseGeometry, separator: str = ","
) -> str:
    """Convert a geometry to a North,West,South,East bounds string.

    :param input_geom: The geometry (or WKT string) to convert
    :param separator: The string inserted between the bounds
    :returns: The bounds from ``convert_to_nwse_bounds`` joined by ``separator``
    """
    nwse_bounds = MetadataFormatter.convert_to_nwse_bounds(input_geom)
    return separator.join(map(str, nwse_bounds))
@staticmethod
def convert_to_geojson(value: Any) -> str:
    """Serialize a value with ``geojson.dumps``.

    :param value: The object to serialize
    :returns: The serialized string
    """
    serialized = geojson.dumps(value)
    return serialized
@staticmethod
def convert_to_geojson_polytope(
    value: BaseGeometry,
) -> Union[dict[Any, Any], str]:
    """Convert a shapely Point/LineString/Polygon to ECMWF polytope feature dicts

    ECMWF Polytope uses a non-geojson structure for features, in which each
    coordinate pair is emitted as ``[y, x]``.

    :param value: The geometry to convert
    :returns: The polytope feature dict
    :raises ValidationError: If ``value`` is not a Polygon, Point or LineString
    """

    def swap_axes(coords: Any) -> list[list[float]]:
        return [[y, x] for x, y in coords]

    if isinstance(value, Polygon):
        return {"type": "polygon", "shape": swap_axes(value.exterior.coords)}
    if isinstance(value, Point):
        return {"type": "position", "points": [[value.y, value.x]]}
    if isinstance(value, LineString):
        return {
            "type": "trajectory",
            "points": swap_axes(value.coords),
            "inflation": 0,
        }
    raise ValidationError(
        "to_geojson_polytope only accepts shapely Polygon, Point and LineString"
    )
@staticmethod
def convert_from_ewkt(ewkt_string: str) -> Union[BaseGeometry, str]:
    """Convert EWKT (Extended Well-Known text) to shapely geometry

    The ``SRID=<code>`` prefix is read as the ``EPSG:<code>`` projection; the
    geometry is reprojected when that projection differs from ``DEFAULT_PROJ``.

    :param ewkt_string: The EWKT string to parse
    :returns: The geometry, or ``ewkt_string`` unchanged (with a warning
              logged) when it does not look like EWKT
    """
    parsed = re.match(
        r"^.*(?P<proj>SRID=[0-9]+);(?P<wkt>[A-Z0-9 \(\),\.-]+).*$", ewkt_string
    )
    if not parsed:
        logger.warning(f"Could not read {ewkt_string} as EWKT")
        return ewkt_string

    epsg_code = parsed["proj"].replace("SRID", "EPSG").replace("=", ":")
    geometry = wkt.loads(parsed["wkt"])
    source_crs = pyproj.CRS(epsg_code)
    target_crs = pyproj.CRS(DEFAULT_PROJ)
    if source_crs != target_crs:
        # reproject
        reproject = pyproj.Transformer.from_crs(
            source_crs, target_crs, always_xy=True
        ).transform
        return transform(reproject, geometry)
    return geometry
@staticmethod
def convert_to_ewkt(input_geom: BaseGeometry) -> str:
    """Convert shapely geometry to EWKT (Extended Well-Known text)

    The prefix is ``DEFAULT_PROJ`` upper-cased, with ``EPSG`` replaced by
    ``SRID`` and ``:`` by ``=``; the geometry part comes from
    ``convert_to_rounded_wkt``.
    """
    srid = DEFAULT_PROJ.upper().replace("EPSG", "SRID").replace(":", "=")
    rounded_wkt = MetadataFormatter.convert_to_rounded_wkt(input_geom)
    return f"{srid};{rounded_wkt}"
@staticmethod
def convert_from_georss(georss: Any) -> Union[BaseGeometry, Any]:
    """Convert GeoRSS to shapely geometry

    Two shapes of input are handled:

    - an element whose tag contains ``polygon``: its text is read as
      whitespace-separated coordinates, paired two by two
    - an element with a single child whose tag contains ``multisurface``:
      every leaf element below it gives one polygon, reprojected to
      ``DEFAULT_PROJ`` when the child has a different ``srsName``

    :param georss: The GeoRSS element
    :returns: The geometry, or ``georss`` unchanged (with a warning logged)
              for any other input
    """

    def polygon_from_text(text: str) -> Any:
        tokens = text.split()
        # pair consecutive values; a trailing unpaired value is ignored
        return Polygon(
            [
                (float(first), float(second))
                for first, second in zip(tokens[0::2], tokens[1::2])
            ]
        )

    def deepest_elements(nested: Any) -> Iterator[Any]:
        for element in nested:
            if len(element) > 0:
                yield from deepest_elements(element)
            else:
                yield element

    if "polygon" in georss.tag:
        # Polygon
        return polygon_from_text(georss.text)

    if len(georss) == 1 and "multisurface" in georss[0].tag.lower():
        # Multipolygon
        surface = georss[0]
        from_proj = getattr(surface, "attrib", {}).get("srsName")
        if from_proj:
            from_proj = pyproj.CRS(from_proj)
            to_proj = pyproj.CRS(DEFAULT_PROJ)
            project = pyproj.Transformer.from_crs(
                from_proj, to_proj, always_xy=True
            ).transform

        polygons_list: list[Any] = []
        for leaf in deepest_elements(surface):
            polygon = polygon_from_text(leaf.text)
            # reproject if needed (to_proj/project only exist when from_proj is set)
            if from_proj and from_proj != to_proj:
                polygon = transform(project, polygon)
            polygons_list.append(polygon)
        return MultiPolygon(polygons_list)

    logger.warning(f"Incoming GeoRSS format not supported yet: {str(georss)}")
    return georss
@staticmethod
def convert_to_longitude_latitude(
    input_geom_unformatted: Any,
) -> dict[str, float]:
    """Return the center of the bounds of a geometry.

    :param input_geom_unformatted: Any input accepted by ``convert_to_bounds``
    :returns: A dict with the mean of the two longitudes under ``lon`` and
              the mean of the two latitudes under ``lat``
    """
    bounds = MetadataFormatter.convert_to_bounds(input_geom_unformatted)
    return {
        "lon": (bounds[0] + bounds[2]) / 2,
        "lat": (bounds[1] + bounds[3]) / 2,
    }
@staticmethod
def convert_csv_list(values_list: Any, separator=",") -> Any:
if isinstance(values_list, list):
return separator.join([str(x) for x in values_list])
else:
return values_list
@staticmethod
def convert_remove_extension(string: str) -> str:
parts = string.split(".")
if parts:
return parts[0]
return ""
@staticmethod
def convert_get_group_name(string: str, pattern: str) -> str:
sanitized_pattern = pattern.replace(" ", "_SPACE_")
try:
match = re.search(sanitized_pattern, str(string))
if match:
if result := match.lastgroup:
return result.replace("_SPACE_", " ")
else:
return NOT_AVAILABLE
except AttributeError:
pass
logger.warning(
"Could not extract property from %s using %s", string, pattern
)
return NOT_AVAILABLE
@staticmethod
def convert_replace_str(value: Any, args: str) -> str:
if isinstance(value, dict):
value = MetadataFormatter.convert_to_geojson(value)
elif not isinstance(value, str):
raise TypeError(
f"convert_replace_str expects a string or a dict (apply to_geojson). Got {type(value)}: {value}"
)
old, new = ast.literal_eval(args)
return re.sub(old, new, value)
@staticmethod
def convert_replace_str_tuple(
value: Union[str, dict[Any, Any]], args: str
) -> str:
"""
Apply multiple replacements on a string (parts or complete).
:param value: input string or dict.
:param args: string representing a list/tuple of (old, new) pairs, like
``'(("old1", "new1"), ("old2", "new2"))'``
"""
if isinstance(value, dict):
value = MetadataFormatter.convert_to_geojson(value)
elif not isinstance(value, str):
raise TypeError(
f"convert_replace_str_tuple expects a string or a dict (apply to_geojson). "
f"Got {type(value)}: {value}"
)
# args sera une chaîne représentant une liste/tuple de tuples
replacements = ast.literal_eval(args)
if not isinstance(replacements, (list, tuple)):
raise TypeError(
f"convert_replace_str_tuple expects a list/tuple of (old,new) pairs. "
f"Got {type(replacements)}: {replacements}"
)
for old, new in replacements:
value = re.sub(old, new, value)
return value
@staticmethod
def convert_replace_tuple(value: Any, args: str) -> Any:
"""
Apply multiple replacements matching whole value.
:param value: input to replace
:param args: string representing a list/tuple of (old, new) pairs, like
``'((["old1"], "new1"), ("old2", ["new2"]))'``
"""
# args sera une chaîne représentant une liste/tuple de tuples
replacements = ast.literal_eval(args)
if not isinstance(replacements, (list, tuple)):
raise TypeError(
f"convert_replace_str_tuple expects a list/tuple of (old,new) pairs. "
f"Got {type(replacements)}: {replacements}"
)
for old, new in replacements:
if old == value:
return new
return value
@staticmethod
def convert_not_available(value: Any) -> str:
    """Convert any value to "Not Available".

    This is more useful than "$.null" to keep original jsonpath while parsing in metadata_mapping.

    :param value: Ignored
    :returns: ``NOT_AVAILABLE``
    """
    placeholder = NOT_AVAILABLE
    return placeholder
@staticmethod
def convert_split(value: str, separator: str) -> list[str]:
    """Split a string using given separator

    :param value: The string to split
    :param separator: The separator to split on
    :returns: The parts of ``value``, or ``[NOT_AVAILABLE]`` when ``value``
              is ``NOT_AVAILABLE`` or when ``value`` or ``separator`` is not
              a string (a warning is logged in the latter two cases)
    """
    unavailable = [NOT_AVAILABLE]
    if value == NOT_AVAILABLE:
        return unavailable
    if not isinstance(value, str):
        logger.warning(
            "Could not split non-string value %s (type %s)", value, type(value)
        )
        return unavailable
    if isinstance(separator, str):
        return value.split(separator)
    logger.warning(
        "Could not split string using non-string separator %s (type %s)",
        separator,
        type(separator),
    )
    return unavailable
@staticmethod
def convert_ceda_collection_name(value: str) -> str:
data_regex = re.compile(r"/data/(?P<name>.+?)/?$")
match = data_regex.search(value)
if match:
return match.group("name").replace("/", "_").upper()
return NOT_AVAILABLE
@staticmethod
def convert_literalize_unicode(value: str) -> str:
    """Convert a string to its raw Unicode literal form.

    The string is encoded with the ``raw_unicode_escape`` codec and the
    resulting bytes are decoded as UTF-8.

    :param value: The string to convert
    :returns: The converted string, or ``value`` unchanged when it is
              ``NOT_AVAILABLE``
    """
    # NOTE(review): raw_unicode_escape emits characters U+0080..U+00FF as single
    # Latin-1 bytes, so the UTF-8 decode can raise UnicodeDecodeError for them —
    # confirm this is intended
    if value == NOT_AVAILABLE:
        return value
    escaped = value.encode("raw_unicode_escape")
    return escaped.decode("utf-8")
@staticmethod
def convert_recursive_sub_str(
    input_obj: Union[dict[Any, Any], list[Any]], args: str
) -> Union[dict[Any, Any], list[Any]]:
    """Recursively substitute a regex in the string values of a structure.

    :param input_obj: The dict or list handed to ``items_recursive_apply``
    :param args: String representing an ``(old, new)`` pair, where ``old`` is
                 used as a regex by ``re.sub``
    :returns: The result of ``items_recursive_apply``
    """
    old, new = ast.literal_eval(args)

    def substitute(k: Any, v: Any, x: str, y: str) -> Any:
        # non-string values are left untouched
        if isinstance(v, str):
            return re.sub(x, y, v)
        return v

    return items_recursive_apply(input_obj, substitute, x=old, y=new)
@staticmethod
def convert_dict_update(
    input_dict: dict[Any, Any], args: str
) -> dict[Any, Any]:
    """Return a copy of a dict updated with the given items.

    :param input_dict: The dict to start from (left unmodified)
    :param args: String literal evaluated and handed to ``nested_pairs2dict``
                 to build the items to add; they are passed as keyword
                 arguments to ``dict``, so they take precedence over
                 existing keys
    :returns: The new dict
    """
    extra_items = nested_pairs2dict(ast.literal_eval(args))
    return dict(input_dict, **extra_items)
@staticmethod
def convert_dict_filter(
    input_dict: dict[Any, Any], jsonpath_filter_str: str
) -> dict[Any, Any]:
    """Filters dict items using jsonpath

    :param input_dict: The dict to filter
    :param jsonpath_filter_str: The jsonpath filter, parsed with
                                ``string_to_jsonpath(..., force=True)``
    :returns: The matched items, keyed like in ``input_dict``; an empty dict
              when the filter could not be parsed (it is still a string) or
              when ``input_dict`` is not a dict
    """
    jsonpath_filter = string_to_jsonpath(jsonpath_filter_str, force=True)
    if isinstance(jsonpath_filter, str) or not isinstance(input_dict, dict):
        return {}

    ordered_keys = list(input_dict.keys())
    filtered: dict[Any, Any] = {}
    for found in jsonpath_filter.find(input_dict):
        # the last path segment is read as "[<index>]", the position of the
        # matched item among the keys of the dict
        last_segment = str(found.full_path).split(".")[-1]
        position = int(last_segment[1:-1])
        filtered[ordered_keys[position]] = found.value
    return filtered
@staticmethod
def convert_dict_filter_and_sub(
    input_dict: dict[Any, Any], args: str
) -> Union[dict[Any, Any], list[Any]]:
    """Filters dict items using jsonpath and then apply recursive_sub_str

    :param input_dict: The dict to filter
    :param args: String representing a ``(jsonpath_filter, old, new)`` triple
    :returns: The filtered items, with ``old`` substituted by ``new`` in
              their string values
    """
    jsonpath_filter_str, old, new = ast.literal_eval(args)
    filtered = MetadataFormatter.convert_dict_filter(
        input_dict, jsonpath_filter_str
    )
    # convert_recursive_sub_str evaluates its args again: repr() makes the pair
    # round-trip exactly, whereas hand-written quoting fails on values holding
    # a quote and decodes backslash escapes (e.g. "\b") a second time
    args_str = repr((old, new))
    return MetadataFormatter.convert_recursive_sub_str(filtered, args_str)
@staticmethod
def convert_dict_with_roles(
input_dict: dict[Any, Any], roles_str: str
) -> dict[Any, Any]:
"""Keep only dict items with given roles in their "roles" list"""
roles = ast.literal_eval(roles_str)
if not isinstance(roles, (list, tuple)):
raise TypeError(
f"convert_keep_dict_with_roles expects a list/tuple of roles. Got {type(roles)}: {roles}"
)
result = {}
for k, v in input_dict.items():
if not isinstance(v, dict):
continue
item_roles = v.get("roles", [])
if any(role in item_roles for role in roles):
result[k] = v
return result
@staticmethod
def convert_from_alternate(
input_obj: dict[str, Any], value: str
) -> dict[str, Any]:
"""
Update assets using given alternate.
"""
result: dict[str, Any] = {}
for k, v in input_obj.items():
if not isinstance(v, dict):
continue
alt_dict = deepcopy(v).get("alternate")
if not isinstance(alt_dict, dict):
continue
value_entry = alt_dict.pop(value, None)
if not isinstance(value_entry, dict):
continue
result[k] = v | value_entry | {"alternate": alt_dict}
if len(result[k]["alternate"]) == 0:
del result[k]["alternate"]
return result
@staticmethod
def convert_slice_str(string: str, args: str) -> str:
cmin, cmax, cstep = [
int(x.strip()) if x.strip().lstrip("-").isdigit() else None
for x in args.split(",")
]
return string[cmin:cmax:cstep] or NOT_AVAILABLE
@staticmethod
def convert_to_lower(string: str) -> str:
    """Convert a string to lowercase.

    :param string: The string to convert
    :returns: The lower-cased string, or ``string`` unchanged when it is
              ``NOT_AVAILABLE``
    """
    return string if string == NOT_AVAILABLE else string.lower()
@staticmethod
def convert_to_upper(string: str) -> str:
    """Convert a string to uppercase.

    :param string: The string to convert
    :returns: The upper-cased string, or ``string`` unchanged when it is
              ``NOT_AVAILABLE``
    """
    # keep the placeholder intact, like convert_to_lower and convert_to_title do:
    # an upper-cased placeholder would no longer compare equal to NOT_AVAILABLE
    if string == NOT_AVAILABLE:
        return string
    return string.upper()
@staticmethod
def convert_to_title(string: str) -> str:
    """Convert a string to title case.

    :param string: The string to convert
    :returns: The title-cased string, or ``string`` unchanged when it is
              ``NOT_AVAILABLE``
    """
    return string if string == NOT_AVAILABLE else string.title()
@staticmethod
def convert_fake_l2a_title_from_l1c(string: str) -> str:
id_regex = re.compile(
r"^(?P<id1>\w+)_(?P<id2>\w+)_(?P<id3>\w+)_(?P<id4>\w+)_(?P<id5>\w+)_(?P<id6>\w+)_(?P<id7>\w+)$"
)
id_match = id_regex.match(string)
if id_match:
id_dict = id_match.groupdict()
return "%s_MSIL2A_%s____________%s________________" % (
id_dict["id1"],
id_dict["id3"],
id_dict["id6"],
)
else:
logger.error("Could not extract fake title from %s" % string)
return NOT_AVAILABLE
@staticmethod
def convert_s2msil2a_title_to_aws_productinfo(string: str) -> str:
id_regex = re.compile(
r"^(?P<id1>\w+)_(?P<id2>\w+)_(?P<year>[0-9]{4})(?P<month>[0-9]{2})(?P<day>[0-9]{2})T[0-9]+_"
+ r"(?P<id4>[A-Z0-9_]+)_(?P<id5>[A-Z0-9_]+)_T(?P<tile1>[0-9]{2})(?P<tile2>[A-Z])(?P<tile3>[A-Z]{2})_"
+ r"(?P<id7>[A-Z0-9_]+)$"
)
id_match = id_regex.match(string)
if id_match:
id_dict = id_match.groupdict()
return (
"https://roda.sentinel-hub.com/sentinel-s2-l2a/tiles/%s/%s/%s/%s/%s/%s/0/{_collection}.json"
% (
id_dict["tile1"],
id_dict["tile2"],
id_dict["tile3"],
id_dict["year"],
int(id_dict["month"]),
int(id_dict["day"]),
)
)
else:
logger.error("Could not extract title infos from %s" % string)
return NOT_AVAILABLE
@staticmethod
def convert_split_id_into_s3_params(product_id: str) -> dict[str, str]:
    """Derive search parameters from the parts of a product id.

    :param product_id: The product id; it must hold at least two datetimes
                       written as ``YYYYMMDDTHHMMSS``
    :returns: A dict with:

              - ``collection``: characters 4 to 14 of the id
              - ``startDate``: the first datetime minus one second
              - ``endDate``: the second datetime plus one second
              - ``timeliness``: the last but one part of the id, parts being
                separated by underscores not followed by another underscore
              - ``sat``: ``Sentinel-`` followed by the first part of the id
                without its first character
    """
    datetime_format = "%Y%m%dT%H%M%S"
    one_second = dt.timedelta(seconds=1)
    parts: list[str] = re.split(r"_(?!_)", product_id)
    dates = re.findall("[0-9]{8}T[0-9]{6}", product_id)
    start_date = dt.datetime.strptime(dates[0], datetime_format) - one_second
    end_date = dt.datetime.strptime(dates[1], datetime_format) + one_second
    return {
        "collection": product_id[4:15],
        # cast to tell the type checker that value won't be None here
        "startDate": cast(str, to_iso_utc_string(start_date)),
        "endDate": cast(str, to_iso_utc_string(end_date)),
        "timeliness": parts[-2],
        "sat": "Sentinel-" + parts[0][1:],
    }
@staticmethod
def convert_dates_from_cmems_id(product_id: str):
    """Extract a one-day interval from the first date found in a product id.

    A run of 10 digits (read as ``YYYYMMDDHH``) is looked for first, then a
    run of 8 digits (read as ``YYYYMMDD``).

    :param product_id: The product id
    :returns: A dict with the date found under ``min_date`` and the same date
              plus one day under ``max_date``, both as ISO UTC strings
    :raises IndexError: If no run of 8 digits is found in ``product_id``
    """
    found = re.findall("[0-9]{10}", product_id) or re.findall(
        "[0-9]{8}", product_id
    )
    date_text = found[0]
    date_format = "%Y%m%d%H" if len(date_text) == 10 else "%Y%m%d"
    date_time = dt.datetime.strptime(date_text, date_format)
    return {
        "min_date": to_iso_utc_string(date_time),
        "max_date": to_iso_utc_string(date_time + dt.timedelta(days=1)),
    }
@staticmethod
def convert_to_datetime_dict(
    date: str, format: str
) -> dict[str, Union[list[str], str]]:
    """Convert a date (str) to a dictionary where values are in the format given in argument

    date == "2021-04-21T18:27:19.123Z" and format == "list" => {
        "year": ["2021"],
        "month": ["04"],
        "day": ["21"],
        "hour": ["18"],
        "minute": ["27"],
        "second": ["19"],
    }
    date == "2021-04-21T18:27:19.123Z" and format == "string" => {
        "year": "2021",
        "month": "04",
        "day": "21",
        "hour": "18",
        "minute": "27",
        "second": "19",
    }
    date == "2021-04-21" and format == "list" => {
        "year": ["2021"],
        "month": ["04"],
        "day": ["21"],
        "hour": ["00"],
        "minute": ["00"],
        "second": ["00"],
    }

    :param date: The datetime string to convert
    :param format: ``"list"`` to wrap every value in a list; any other value
                   gives plain strings
    :returns: The dictionary of date components
    """
    utc_date = MetadataFormatter.convert_to_iso_utc_datetime(date)
    date_object = parse_to_utc(utc_date)
    directives = {
        "year": "%Y",
        "month": "%m",
        "day": "%d",
        "hour": "%H",
        "minute": "%M",
        "second": "%S",
    }
    components = {
        name: date_object.strftime(directive)
        for name, directive in directives.items()
    }
    if format == "list":
        return {name: [text] for name, text in components.items()}
    return components
@staticmethod
def convert_interval_to_datetime_dict(
    date: str, separator: str = "/"
) -> dict[str, list[str]]:
    """Convert a date interval ('/' separated str) to a dictionary where values are lists

    date == "2021-04-21/2021-04-22" => {
        "year": ["2021"],
        "month": ["04"],
        "day": ["21", "22"],
    }

    Every day from the start to the end of the interval contributes its
    year, month and day; each list holds distinct values in ascending order.

    :param date: The interval, as two dates joined by ``separator``
    :param separator: The string separating the two dates
    :returns: The years, months and days covered by the interval
    :raises ValueError: If ``separator`` is not found in ``date``
    """
    if separator not in date:
        raise ValueError(
            f"Could not format {date} using convert_interval_to_datetime_dict: {separator} separator missing"
        )
    start, end = date.split(separator)
    start_utc_date = MetadataFormatter.convert_to_iso_utc_datetime(start)
    end_utc_date = MetadataFormatter.convert_to_iso_utc_datetime(end)
    start_date_object = parse_to_utc(start_utc_date)
    # a "None" end yields a single-day interval
    if end_utc_date == "None":
        end_utc_date = start_utc_date
    end_date_object = parse_to_utc(end_utc_date)

    delta_utc_date = end_date_object - start_date_object
    years: set[str] = set()
    months: set[str] = set()
    days: set[str] = set()
    for i in range(delta_utc_date.days + 1):
        date_object = start_date_object + dt.timedelta(days=i)
        years.add(date_object.strftime("%Y"))
        months.add(date_object.strftime("%m"))
        days.add(date_object.strftime("%d"))

    # sets of strings have no stable iteration order across interpreter runs:
    # sort to get a reproducible result
    return {
        "year": sorted(years),
        "month": sorted(months),
        "day": sorted(days),
    }
@staticmethod
def convert_get_ecmwf_time(date: str) -> list[str]:
    """Get the time of a date (str) in the ECMWF format (["HH:00"])

    "2021-04-21T18:27:19.123Z" => ["18:00"]
    "2021-04-21" => ["00:00"]

    :param date: The datetime string
    :returns: A single-item list holding the hour followed by ``:00``
    """
    components = MetadataFormatter.convert_to_datetime_dict(date, "str")
    hour = str(components["hour"])
    return [hour + ":00"]
@staticmethod
def convert_sanitize(text: str) -> str:
    """Sanitize string

    :param text: The string handed to ``sanitize``
    :returns: The sanitized string
    """
    sanitized = sanitize(text)
    return sanitized
@staticmethod
def convert_get_dates_from_string(text: str, split_param="-"):
    """Extract a date interval written as ``YYYYMMDD<split_param>YYYYMMDD``.

    :param text: The string to search in
    :param split_param: The literal string separating the two dates
    :returns: A dict with the two dates as ISO UTC strings under
              ``startDate`` and ``endDate``, or ``NOT_AVAILABLE`` when no
              interval is found
    """
    # split_param is used literally by str.split below, so it must be matched
    # literally too: escape regex metacharacters such as "." or "+"
    reg = "[0-9]{8}" + re.escape(split_param) + "[0-9]{8}"
    match = re.search(reg, text)
    if not match:
        return NOT_AVAILABLE
    start_text, end_text = match.group().split(split_param)
    start_date = dt.datetime.strptime(start_text, "%Y%m%d")
    end_date = dt.datetime.strptime(end_text, "%Y%m%d")
    return {
        "startDate": to_iso_utc_string(start_date),
        "endDate": to_iso_utc_string(end_date),
    }
@staticmethod
def convert_get_hydrological_year(date: str):
    """Build a ``YYYY_YY`` label from the year of a date and the following year.

    :param date: The datetime string
    :returns: A single-item list holding the 4-digit year of ``date``, an
              underscore, and the 2-digit year of the same date one year later
    """
    utc_date = MetadataFormatter.convert_to_iso_utc_datetime(date)
    first_year = parse_to_utc(utc_date)
    second_year = first_year + relativedelta(years=1)
    label = f"{first_year.strftime('%Y')}_{second_year.strftime('%y')}"
    return [label]
@staticmethod
def convert_get_variables_from_path(path: str):
if "?" not in path:
return []
variables = path.split("?")[1]
return variables.split(",")
@staticmethod
def convert_assets_list_to_dict(
assets_list: list[dict[str, str]], asset_name_key: str = "title"
) -> dict[str, dict[str, str]]:
"""Convert a list of assets to a dictionary where keys represent
name of assets and are found among values of asset dictionaries.
assets_list == [
{"href": "foo", "title": "asset1", "name": "foo-name"},
{"href": "bar", "title": "path/to/asset1", "name": "bar-name"},
{"href": "baz", "title": "path/to/asset2", "name": "baz-name"},
{"href": "qux", "title": "asset3", "name": "qux-name"},
] and asset_name_key == "title" => {
"asset1": {"href": "foo", "title": "asset1", "name": "foo-name"},
"path/to/asset1": {"href": "bar", "title": "path/to/asset1", "name": "bar-name"},
"asset2": {"href": "baz", "title": "path/to/asset2", "name": "baz-name"},
"asset3": {"href": "qux", "title": "asset3", "name": "qux-name"},
}
assets_list == [
{"href": "foo", "title": "foo-title", "name": "asset1"},
{"href": "bar", "title": "bar-title", "name": "path/to/asset1"},
{"href": "baz", "title": "baz-title", "name": "path/to/asset2"},
{"href": "qux", "title": "qux-title", "name": "asset3"},
] and asset_name_key == "name" => {
"asset1": {"href": "foo", "title": "foo-title", "name": "asset1"},
"path/to/asset1": {"href": "bar", "title": "bar-title", "name": "path/to/asset1"},
"asset2": {"href": "baz", "title": "baz-title", "name": "path/to/asset2"},
"asset3": {"href": "qux", "title": "qux-title", "name": "asset3"},
}
"""
asset_names: list[str] = []
assets_dict: dict[str, dict[str, str]] = {}
for asset in assets_list:
asset_name = asset[asset_name_key]
asset_names.append(asset_name)
assets_dict[asset_name] = asset
# we only keep the equivalent of the path basename in the case where the
# asset name has a path pattern and this basename is only found once
immutable_asset_indexes: list[int] = []
for i, asset_name in enumerate(asset_names):
if i in immutable_asset_indexes:
continue
change_asset_name = True
asset_basename = asset_name.split("/")[-1]
j = i + 1
while change_asset_name and j < len(asset_names):
asset_tmp_basename = asset_names[j].split("/")[-1]
if asset_basename == asset_tmp_basename:
change_asset_name = False
immutable_asset_indexes.extend([i, j])
j += 1
if change_asset_name:
assets_dict[asset_basename] = assets_dict.pop(asset_name)
return assets_dict
@staticmethod
def convert_wekeo_to_cop_collection(val: str, prefix: str) -> str:
"""Converts the name of a collection from the WEkEO format to the Copernicus format."""
return val.removeprefix(prefix).lower().replace("_", "-")
# if stac extension colon separator `:` is in search params, parse it to prevent issues with vformat
if re.search(r"{[\w-]*:[\w#-]*\(?.*}", search_param):
search_param = re.sub(
r"{([\w-]*):([\w#-]*\(?.*)}",
r"{\1_COLON_\2}",
search_param,
)
kwargs = {k.replace(":", "_COLON_"): v for k, v in kwargs.items()}
# convert colons `:` in the parameters passed to the converter (e.g. 'foo#boo(fun:with:colons)')
if re.search(r"{[\w-]*#[\w-]*\([^)]*:.*}", search_param):
search_param = re.sub(
r"({[\w-]*#[\w-]*)\(([^)]*)(.*})",
lambda m: m.group(1)
+ "("
+ m.group(2).replace(":", "_COLON_")
+ m.group(3),
search_param,
)
return MetadataFormatter().vformat(search_param, args, kwargs)
def properties_from_json(
    json: dict[str, Any],
    mapping: dict[str, Any],
    discovery_config: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Extract properties from a provider json result.

    Each mapping entry is handled in one of three ways: a plain string is kept
    as is, a string holding ``{...}`` placeholders is a template resolved once
    all other properties are known, and anything else is used as a jsonpath
    (through its ``find`` method) whose match may go through a conversion.

    :param json: The representation of a provider result as a json object
    :param mapping: A mapping between :class:`~eodag.api.product._product.EOProduct`'s metadata
                    keys and the location of the values of these properties in the json
                    representation, expressed as a
                    `jsonpath <http://goessner.net/articles/JsonPath/>`_
    :param discovery_config: (optional) metadata discovery configuration dict, accepting among other items
                             `metadata_pattern` (Regex pattern for metadata key discovery, e.g. "^[a-zA-Z]+$"),
                             `metadata_path` (String representation of jsonpath),
                             `metadata_prefix` (prefix added to discovered keys having none, "provider"
                             by default), `metadata_path_id` and `metadata_path_value` (jsonpaths of the
                             key and of the value, relative to each discovered item)
    :returns: The metadata of the :class:`~eodag.api.product._product.EOProduct`
    """
    extracted_value: Any
    properties: dict[str, Any] = {}
    # templates to resolve once the other properties are extracted
    templates = {}
    # full paths of single-match values, so that discovery skips them
    used_jsonpaths = []
    for metadata, value in mapping.items():
        # Treat the case when the value is from a queryable metadata
        if isinstance(value, list):
            conversion_or_none, path_or_text = value[1]
        else:
            conversion_or_none, path_or_text = value
        if isinstance(path_or_text, str):
            # not a jsonpath: either a template or a value to pass as is
            if re.search(r"{[^{}]+}", path_or_text):
                templates[metadata] = path_or_text
            else:
                properties[metadata] = path_or_text
        else:
            try:
                match = path_or_text.find(json)
            except KeyError:
                # a KeyError while searching is handled as "no match"
                match = []
            if len(match) == 0:
                extracted_value = NOT_AVAILABLE
            elif len(match) == 1:
                extracted_value = match[0].value
                used_jsonpaths.append(match[0].full_path)
            else:
                # several matches: keep all the values as a list
                extracted_value = [m.value for m in match]
            if extracted_value is None:
                # a matched null value is kept as None, without conversion
                properties[metadata] = None
            else:
                if conversion_or_none is None:
                    properties[metadata] = extracted_value
                else:
                    # reformat conversion_or_none as metadata#converter(args) or metadata#converter
                    if (
                        len(conversion_or_none) > 1
                        and isinstance(conversion_or_none, list)
                        and conversion_or_none[1] is not None
                    ):
                        conversion_or_none = "%s(%s)" % (
                            conversion_or_none[0],
                            conversion_or_none[1],
                        )
                    elif isinstance(conversion_or_none, list):
                        conversion_or_none = conversion_or_none[0]
                    # check if conversion uses variables to format
                    if re.search(r"({[^{}:]+})+", conversion_or_none):
                        # only the properties extracted so far are available here
                        conversion_or_none = conversion_or_none.format(**properties)
                    if extracted_value == NOT_AVAILABLE:
                        # try if value can be formatted even if it is not available
                        try:
                            properties[metadata] = format_metadata(
                                "{%s%s%s}" % (metadata, SEP, conversion_or_none),
                                **{metadata: extracted_value},
                            )
                        except ValueError:
                            logger.debug(
                                f"{metadata}: {extracted_value} could not be formatted with {conversion_or_none}"
                            )
                            # the property is left unset and the literal evaluation is skipped
                            continue
                    else:
                        # in this case formatting should work, otherwise something is wrong in the mapping
                        properties[metadata] = format_metadata(
                            "{%s%s%s}" % (metadata, SEP, conversion_or_none),
                            **{metadata: extracted_value},
                        )
        # properties as python objects when possible (format_metadata returns only strings)
        # best effort: on any failure the value is kept as it is
        try:
            properties[metadata] = ast.literal_eval(properties[metadata])
        except Exception:
            pass
    # Resolve templates
    for metadata, template in templates.items():
        try:
            properties[metadata] = format_string(metadata, template, **properties)
        except ValueError:
            logger.warning(
                f"Could not parse {metadata} ({template}) using product properties"
            )
            logger.debug(f"available properties: {properties}")
            properties[metadata] = NOT_AVAILABLE
    # adds missing discovered properties
    if not discovery_config:
        discovery_config = {}
    discovery_pattern = discovery_config.get("metadata_pattern")
    discovery_path = discovery_config.get("metadata_path")
    # discovery needs both the key pattern and the path to explore
    if discovery_pattern and discovery_path:
        discovery_jsonpath = string_to_jsonpath(discovery_path)
        discovered_properties = (
            discovery_jsonpath.find(json)
            if isinstance(discovery_jsonpath, JSONPath)
            else []
        )
        mtd_prefix = discovery_config.get("metadata_prefix", "provider")
        for found_jsonpath in discovered_properties:
            if "metadata_path_id" in discovery_config.keys():
                # key read inside the discovered item, using metadata_path_id
                found_key_paths = string_to_jsonpath(
                    discovery_config["metadata_path_id"], force=True
                ).find(found_jsonpath.value)
                if not found_key_paths or isinstance(found_key_paths, int):
                    continue
                found_key = found_key_paths[0].value
                # NOTE(review): metadata_path_value is read unconditionally here,
                # it looks required as soon as metadata_path_id is set - confirm
                used_jsonpath = Child(
                    found_jsonpath.full_path,
                    string_to_jsonpath(
                        discovery_config["metadata_path_value"], force=True
                    ),
                )
            else:
                # default key got from metadata_path
                found_key = found_jsonpath.path.fields[-1]
                used_jsonpath = found_jsonpath.full_path
            # keep the key only if it matches the pattern, is not already a
            # property (with or without prefix) and was not read by the mapping
            if (
                re.compile(discovery_pattern).match(found_key)
                and found_key not in properties.keys()
                and f"{mtd_prefix}:{found_key}" not in properties.keys()
                and used_jsonpath not in used_jsonpaths
            ):
                # prepend with default STAC prefix if none is already used
                if ":" not in found_key:
                    found_key = f"{mtd_prefix}:{found_key}"
                if "metadata_path_value" in discovery_config.keys():
                    found_value_path = string_to_jsonpath(
                        discovery_config["metadata_path_value"], force=True
                    ).find(found_jsonpath.value)
                    properties[found_key] = (
                        found_value_path[0].value
                        if found_value_path and not isinstance(found_value_path, int)
                        else NOT_AVAILABLE
                    )
                else:
                    # default value got from metadata_path
                    properties[found_key] = found_jsonpath.value
                # properties as python objects when possible (format_metadata returns only strings)
                # best effort: on any failure the value is kept as it is
                try:
                    properties[found_key] = ast.literal_eval(properties[found_key])
                except Exception:
                    pass
    return properties
def properties_from_xml(
    xml_as_text: AnyStr,
    mapping: Any,
    empty_ns_prefix: str = "ns",
    discovery_config: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Extract properties from a provider xml result.

    Each mapping value is first evaluated as an xpath. When this evaluation
    raises ``XPathEvalError``, the value is handled as a text: either a template
    (``{...}`` placeholders, resolved once the other properties are known) or a
    value passed as is.

    :param xml_as_text: The representation of a provider result as xml
    :param mapping: A mapping between :class:`~eodag.api.product._product.EOProduct`'s metadata
                    keys and the location of the values of these properties in the xml
                    representation, expressed as a
                    `xpath <https://www.w3schools.com/xml/xml_xpath.asp>`_
    :param empty_ns_prefix: (optional) The name to give to the default namespace of `xml_as_text`.
                            This is a technical workaround for the limitation of lxml
                            not supporting empty namespace prefix. The
                            xpath in `mapping` must use this value to be able to
                            correctly reach empty-namespace prefixed elements
    :param discovery_config: (optional) metadata discovery configuration dict, accepting among other items
                             `metadata_pattern` (Regex pattern for metadata key discovery, e.g. "^[a-zA-Z]+$"),
                             `metadata_path` (String representation of xpath)
    :returns: the metadata of the :class:`~eodag.api.product._product.EOProduct`
    """
    properties: dict[str, Any] = {}
    # templates to resolve once the other properties are extracted
    templates = {}
    # tags of the elements read by the mapping, so that discovery skips them
    used_xpaths = []
    root = etree.XML(xml_as_text)
    for metadata, value in mapping.items():
        # Treat the case when the value is from a queryable metadata
        if isinstance(value, list):
            conversion_or_none, path_or_text = value[1]
        else:
            conversion_or_none, path_or_text = value
        try:
            # the default (None) namespace prefix is exposed as empty_ns_prefix
            extracted_value = root.xpath(
                path_or_text,
                namespaces={k or empty_ns_prefix: v for k, v in root.nsmap.items()},
            )
            if len(extracted_value) <= 1:
                if len(extracted_value) < 1:
                    # If there is no matched value (empty list), mark the metadata as not
                    # available
                    extracted_value = [NOT_AVAILABLE]
                else:
                    # store element tag in used_xpaths
                    # (the xpath is evaluated again without its "/text()" step to
                    # reach the element; None is stored if the result has no tag)
                    used_xpaths.append(
                        getattr(
                            root.xpath(
                                path_or_text.replace("/text()", ""),
                                namespaces={
                                    k or empty_ns_prefix: v
                                    for k, v in root.nsmap.items()
                                },
                            )[0],
                            "tag",
                            None,
                        )
                    )
                if conversion_or_none is None:
                    properties[metadata] = extracted_value[0]
                else:
                    # reformat conversion_or_none as metadata#converter(args) or metadata#converter
                    if (
                        len(conversion_or_none) > 1
                        and isinstance(conversion_or_none, list)
                        and conversion_or_none[1] is not None
                    ):
                        conversion_or_none = "%s(%s)" % (
                            conversion_or_none[0],
                            conversion_or_none[1],
                        )
                    elif isinstance(conversion_or_none, list):
                        conversion_or_none = conversion_or_none[0]
                    # NOTE(review): unlike the multiple matches branch below, the
                    # conversion is not formatted with the properties here - confirm
                    # this is intended
                    properties[metadata] = format_metadata(
                        "{%s%s%s}" % (metadata, SEP, conversion_or_none),
                        **{metadata: extracted_value[0]},
                    )
            # If there are multiple matches, consider the result as a list, doing a
            # formatting if any
            else:
                if conversion_or_none is None:
                    properties[metadata] = extracted_value
                else:
                    # reformat conversion_or_none as metadata#converter(args) or metadata#converter
                    if (
                        len(conversion_or_none) > 1
                        and isinstance(conversion_or_none, list)
                        and conversion_or_none[1] is not None
                    ):
                        conversion_or_none = "%s(%s)" % (
                            conversion_or_none[0],
                            conversion_or_none[1],
                        )
                    elif isinstance(conversion_or_none, list):
                        conversion_or_none = conversion_or_none[0]
                    # check if conversion uses variables to format
                    if re.search(r"({[^{}:]+})+", conversion_or_none):
                        conversion_or_none = conversion_or_none.format(**properties)
                    # the same conversion is applied to every matched item
                    properties[metadata] = [
                        format_metadata(
                            "{%s%s%s}"
                            % (
                                metadata,
                                SEP,
                                conversion_or_none,
                            ),  # Re-build conversion format identifier
                            **{metadata: extracted_value_item},
                        )
                        for extracted_value_item in extracted_value
                    ]
        except XPathEvalError:
            # Assume the mapping is to be passed as is, in which case we readily
            # register it, or is a template, in which case we register it for later
            # formatting resolution using previously successfully resolved properties
            # Ignore any transformation specified. If a value is to be passed as is,
            # we don't want to transform it further
            if re.search(r"({[^{}:]+})+", path_or_text):
                templates[metadata] = path_or_text
            else:
                properties[metadata] = path_or_text
    # Resolve templates
    # (str.format raises KeyError if a template uses a property that is missing)
    for metadata, template in templates.items():
        properties[metadata] = template.format(**properties)
    # adds missing discovered properties
    if not discovery_config:
        discovery_config = {}
    discovery_pattern = discovery_config.get("metadata_pattern")
    discovery_path = discovery_config.get("metadata_path")
    # discovery needs both the key pattern and the path to explore
    if discovery_pattern and discovery_path:
        discovered_properties = root.xpath(
            discovery_path,
            namespaces={k or empty_ns_prefix: v for k, v in root.nsmap.items()},
        )
        for found_xpath in discovered_properties:
            # tag without its "{namespace}" part
            found_key = found_xpath.tag.rpartition("}")[-1]
            # keep the element only if its name matches the pattern, is not
            # already a property and was not read by the mapping
            if (
                re.compile(discovery_pattern).match(found_key)
                and found_key not in properties.keys()
                and found_xpath.tag not in used_xpaths
            ):
                properties[found_key] = found_xpath.text
    return properties
def mtd_cfg_as_conversion_and_querypath(
    src_dict: dict[str, Any],
    dest_dict: Optional[dict[str, Any]] = None,
    result_type: str = "json",
) -> dict[str, Any]:
    """Metadata configuration dictionary to querypath with conversion dictionary

    Transform every src_dict value from jsonpath_str to tuple `(conversion, jsonpath_object)`
    or from xpath_str to tuple `(conversion, xpath_str)`

    A configuration that is already converted is returned untouched. Otherwise,
    if ``dest_dict`` is empty or not given, a deep copy of ``src_dict`` is
    converted and returned, and ``src_dict`` is left unchanged.

    :param src_dict: Input dict containing jsonpath str as values
    :param dest_dict: (optional) Output dict containing jsonpath objects as values
    :param result_type: (optional) ``"json"`` to parse the paths as jsonpath, any
                        other value to keep them as strings (xpath)
    :returns: dest_dict
    """
    # nothing to convert: return early instead of letting next() raise
    # StopIteration on an empty iterator
    if not src_dict and not dest_dict:
        return {}

    # check if the configuration has already been converted
    some_configured_value = next(iter((dest_dict or src_dict).values()))
    if (
        isinstance(some_configured_value, list)
        and isinstance(some_configured_value[1], tuple)
        or isinstance(some_configured_value, tuple)
    ):
        return dest_dict or src_dict

    if not dest_dict:
        dest_dict = deepcopy(src_dict)

    for metadata in src_dict:
        if metadata not in dest_dict:
            dest_dict[metadata] = (None, NOT_MAPPED)
        else:
            conversion, path = get_metadata_path(dest_dict[metadata])
            if result_type == "json":
                parsed_path = string_to_jsonpath(path)
                if isinstance(parsed_path, str):
                    # not a jsonpath: assume the mapping is to be passed as is. Ignore any transformation specified.
                    # If a value is to be passed as is, we don't want to transform it further
                    conversion = None
            else:
                parsed_path = path

            # queryable metadata ([search_param, path]): only the path is replaced
            if isinstance(dest_dict[metadata], list) and len(dest_dict[metadata]) == 2:
                dest_dict[metadata][1] = (conversion, parsed_path)
            else:
                dest_dict[metadata] = (conversion, parsed_path)

            # Put the updated mapping at the end
            dest_dict[metadata] = dest_dict.pop(metadata)

    return dest_dict
def format_query_params(
    collection: str,
    config: PluginConfig,
    query_dict: dict[str, Any],
    error_context: str = "",
) -> dict[str, Any]:
    """format the search parameters to query parameters

    :param collection: collection whose specific ``metadata_mapping`` and
                       ``discover_metadata`` settings (from ``config.products``)
                       take precedence over the provider-level ones
    :param config: plugin configuration
    :param query_dict: search parameters; its ``raise_errors`` entry, if any, is
                       removed in place
    :param error_context: text appended to the error raised for non-queryable parameters
    :returns: the parameters of the request to send to the provider
    :raises ValidationError: if a formatted json query parameter still holds
                             ``NOT_AVAILABLE``, or if a non-queryable parameter is
                             used while ``raise_mtd_discovery_error`` is configured
    """
    if "raise_errors" in query_dict.keys():
        del query_dict["raise_errors"]
    # . not allowed in eodag_search_key, replaced with %2E
    query_dict = {k.replace(".", "%2E"): v for k, v in query_dict.items()}

    collection_metadata_mapping = dict(
        config.metadata_mapping,
        **config.products.get(collection, {}).get("metadata_mapping", {}),
    )

    # Raise error if non-queryables parameters are used and raise_mtd_discovery_error configured
    if (
        raise_mtd_discovery_error := config.products.get(collection, {})
        .get("discover_metadata", {})
        .get("raise_mtd_discovery_error")
    ) is None:
        raise_mtd_discovery_error = getattr(config, "discover_metadata", {}).get(
            "raise_mtd_discovery_error", False
        )

    query_params: dict[str, Any] = {}
    # Get all the search parameters that are recognised as queryables by the
    # provider (they appear in the queryables dictionary)
    queryables = _get_queryables(
        query_dict,
        config,
        collection_metadata_mapping,
        raise_mtd_discovery_error,
        error_context,
    )

    for eodag_search_key, provider_search_param in queryables.items():
        user_input = query_dict[eodag_search_key]

        if provider_search_param == user_input:
            # means the mapping is to be passed as is, in which case we
            # readily register it
            if (
                eodag_search_key in query_params
                and isinstance(query_params[eodag_search_key], dict)
                and isinstance(user_input, dict)
            ):
                query_params[eodag_search_key].update(user_input)
            else:
                query_params[eodag_search_key] = user_input
            continue

        if COMPLEX_QS_REGEX.match(provider_search_param):
            parts = provider_search_param.split("=")
            if len(parts) == 1:
                formatted_query_param = format_metadata(
                    provider_search_param, collection, **query_dict
                )
                formatted_query_param = formatted_query_param.replace("'", '"')
                if "{{" in provider_search_param:
                    # retrieve values from hashes where keys are given in the param
                    if "}[" in formatted_query_param:
                        formatted_query_param = _resolve_hashes(formatted_query_param)
                    # remove quotes around arrays
                    formatted_query_param = remove_str_array_quotes(
                        formatted_query_param
                    )
                    if NOT_AVAILABLE in formatted_query_param:
                        raise ValidationError(
                            "Could not parse %s query parameter, got %s"
                            % (eodag_search_key, formatted_query_param)
                        )
                    # json query string (for POST request)
                    update_nested_dict(
                        query_params,
                        orjson.loads(formatted_query_param),
                        extend_list_values=True,
                        allow_extend_duplicates=False,
                    )
                else:
                    query_params[eodag_search_key] = formatted_query_param
            else:
                # "key=value" search param: only the value is formatted
                provider_search_key, provider_value = parts
                query_params[provider_search_key] = format_metadata(
                    provider_value, collection, **query_dict
                )
        else:
            query_params[provider_search_param] = user_input

    # Now get all the literal search params (i.e params to be passed "as is"
    # in the search request)
    # ignore additional_params if it isn't a dictionary
    configured_literal_params = getattr(config, "literal_search_params", {})
    # Work on a copy: updating the configured dict in place would store the
    # free text search of this request in the plugin configuration and leak it
    # into the following searches
    literal_search_params: dict[str, Any] = (
        dict(configured_literal_params)
        if isinstance(configured_literal_params, dict)
        else {}
    )

    # Now add formatted free text search parameters (this is for cases where a
    # complex query through a free text search parameter is available for the
    # provider and needed for the consumer)
    # The mapping is built again because _get_queryables may have updated the
    # plugin configuration (metadata auto discovery)
    collection_metadata_mapping = dict(
        config.metadata_mapping,
        **config.products.get(collection, {}).get("metadata_mapping", {}),
    )
    literal_search_params.update(
        _format_free_text_search(config, collection_metadata_mapping, **query_dict)
    )
    for provider_search_key, provider_value in literal_search_params.items():
        if isinstance(provider_value, list):
            query_params.setdefault(provider_search_key, []).extend(provider_value)
        else:
            query_params.setdefault(provider_search_key, []).append(provider_value)
    return query_params
def _resolve_hashes(formatted_query_param: str) -> str:
"""
resolves structures of the format {"a": "abc", "b": "cde"}["a"] given in the formatted_query_param
the structure is replaced by the value corresponding to the given key in the hash
(in this case "abc")
"""
# check if there is still a hash to be resolved
while '}["' in formatted_query_param:
# find and parse code between {}
ind_open = formatted_query_param.find('}["')
ind_close = formatted_query_param.find('"]', ind_open)
hash_start = formatted_query_param[:ind_open].rfind(": {") + 2
h = orjson.loads(formatted_query_param[hash_start : ind_open + 1])
# find key and get value
ind_key_start = formatted_query_param.find('"', ind_open) + 1
key = formatted_query_param[ind_key_start:ind_close]
value = h[key]
# replace hash with value
if isinstance(value, str):
formatted_query_param = formatted_query_param.replace(
formatted_query_param[hash_start : ind_close + 2], '"' + value + '"'
)
else:
formatted_query_param = formatted_query_param.replace(
formatted_query_param[hash_start : ind_close + 2], json.dumps(value)
)
return formatted_query_param
def _format_free_text_search(
config: PluginConfig, metadata_mapping: dict[str, Any], **kwargs: Any
) -> dict[str, Any]:
"""Build the free text search parameter using the search parameters"""
query_params: dict[str, Any] = {}
if not getattr(config, "free_text_search_operations", None):
return query_params
for param, operations_config in config.free_text_search_operations.items():
union = operations_config["union"]
wrapper = operations_config.get("wrapper", "{}")
formatted_query = []
for operator, operands in operations_config["operations"].items():
# The Operator string is the operator wrapped with spaces
operator = " {} ".format(operator)
# Build the operation string by joining the formatted operands together
# using the operation string
operation_string = operator.join(
format_metadata(operand, **kwargs)
for operand in operands
if any(
re.search(rf"{{{kw}[}}#]", operand)
and val is not None
and isinstance(metadata_mapping.get(kw, []), list)
for kw, val in kwargs.items()
)
)
# Finally wrap the operation string as specified by the wrapper and add
# it to the list of queries (only if the operation string is not empty)
if operation_string:
query = wrapper.format(operation_string)
formatted_query.append(query)
# Join the formatted query using the "union" config parameter, and then
# wrap it with the Python format string specified in the "wrapper" config
# parameter
final_query = union.join(formatted_query)
if len(operations_config["operations"]) > 1 and len(formatted_query) > 1:
final_query = wrapper.format(query_params[param])
if final_query:
query_params[param] = final_query
return query_params
def _get_queryables(
    search_params: dict[str, Any],
    config: PluginConfig,
    metadata_mapping: dict[str, Any],
    raise_mtd_discovery_error: bool,
    error_context: str,
) -> dict[str, Any]:
    """Collect, for each search parameter that is set, its provider search
    parameter: taken from the metadata mapping when the parameter is queryable
    there, built from the ``discover_metadata`` configuration otherwise"""
    logger.debug("Retrieving queryable metadata from metadata_mapping")
    queryables: dict[str, Any] = {}

    for eodag_search_key, user_input in search_params.items():
        # parameters without value are ignored
        if user_input is None:
            continue

        md_mapping = metadata_mapping.get(eodag_search_key, (None, NOT_MAPPED))
        # only list mappings hold a provider search parameter
        is_queryable = isinstance(md_mapping, list)

        # raise an error when a query param not allowed by the provider is found
        if raise_mtd_discovery_error and not is_queryable:
            raise ValidationError(
                "Search parameters which are not queryable are disallowed for this collection on this provider: "
                f"please remove '{eodag_search_key}' from your search parameters. {error_context}",
                {eodag_search_key},
            )

        _, md_value = md_mapping

        # query param from defined metadata_mapping
        if is_queryable:
            search_param = get_search_param(md_mapping)
            if search_param is not None:
                queryables[eodag_search_key] = search_param
            continue

        # query param from metadata auto discovery
        if not (md_value == NOT_MAPPED):
            continue
        if not getattr(config, "discover_metadata", {}).get("auto_discovery", False):
            continue

        discovery_cfg = config.discover_metadata
        pattern = re.compile(discovery_cfg.get("metadata_pattern", ""))
        search_param_cfg = discovery_cfg.get("search_param", "")
        search_param_unparsed_cfg = discovery_cfg.get("search_param_unparsed", [])

        # parameters listed as unparsed are passed with the user value as is
        if search_param_unparsed_cfg and eodag_search_key in search_param_unparsed_cfg:
            queryables[eodag_search_key] = user_input
            continue

        if not pattern.match(eodag_search_key):
            continue

        if isinstance(search_param_cfg, str):
            queryables[eodag_search_key] = search_param_cfg.format(
                metadata=eodag_search_key
            )
        elif isinstance(search_param_cfg, dict):
            # the formatted configuration is merged into the plugin config
            # entries having the same names
            search_param_cfg_parsed = dict_items_recursive_apply(
                search_param_cfg,
                lambda k, v: v.format(metadata=eodag_search_key),
            )
            for config_key, config_update in search_param_cfg_parsed.items():
                if not getattr(config, config_key, None):
                    logger.warning(
                        "Could not use discover_metadata[search_param]: no entry for %s in plugin config",
                        config_key,
                    )
                    continue
                update_nested_dict(
                    getattr(config, config_key),
                    config_update,
                    extend_list_values=True,
                    allow_extend_duplicates=False,
                )

    return queryables
def get_queryable_from_provider(
    provider_queryable: str, metadata_mapping: dict[str, Union[str, list[str]]]
) -> Optional[str]:
    """Get EODAG configured queryable parameter from provider queryable parameter

    A mapping whose search parameter is exactly ``provider_queryable`` wins.
    Otherwise the first mapping whose search parameter contains
    ``provider_queryable`` between double quotes is used.

    :param provider_queryable: provider queryable parameter
    :param metadata_mapping: metadata-mapping configuration
    :returns: EODAG configured queryable parameter or None
    """
    # the provider parameter is a literal name: escape it so that characters
    # such as "." or "(" are not interpreted as regex syntax
    pattern = '"' + re.escape(provider_queryable) + '"'
    # if 1:1 mapping exists privilege this one instead of other mapping
    # e.g. provider queryable = year -> use year and not date in which year also appears
    mapping_values = [
        v[0] if isinstance(v, list) else "" for v in metadata_mapping.values()
    ]
    StacQueryables = Queryables.from_stac_models()
    if provider_queryable in mapping_values:
        ind = mapping_values.index(provider_queryable)
        return StacQueryables.get_queryable_from_alias(
            list(metadata_mapping.keys())[ind]
        )
    for param, param_conf in metadata_mapping.items():
        if (
            isinstance(param_conf, list)
            and param_conf[0]
            and re.search(pattern, param_conf[0])
        ):
            return StacQueryables.get_queryable_from_alias(param)
    return None
def get_provider_queryable_path(
    queryable: str, metadata_mapping: dict[str, Union[str, list[str]]]
) -> Optional[str]:
    """Look up the provider search parameter configured for an EODAG queryable

    :param queryable: eodag queryable parameter
    :param metadata_mapping: metadata-mapping configuration
    :returns: first item of the mapping configured for ``queryable`` when this
              mapping is a list, None otherwise
    """
    parameter_conf = metadata_mapping.get(queryable)
    return parameter_conf[0] if isinstance(parameter_conf, list) else None
def get_provider_queryable_key(
    eodag_key: str,
    provider_queryables: dict[str, Any],
    metadata_mapping: dict[str, Union[list[Any], str]],
) -> str:
    """Finds the provider queryable corresponding to the given eodag key based on the metadata mapping

    :param eodag_key: key in eodag
    :param provider_queryables: queryables returned from the provider
    :param metadata_mapping: metadata mapping from which the keys are retrieved
    :returns: provider queryable key: the first provider queryable found as a whole
              word in the search parameter mapped to ``eodag_key``, ``eodag_key`` itself
              when its mapping is not a list, an empty string when nothing is found
    """
    if eodag_key not in metadata_mapping:
        return ""
    mapping_key = metadata_mapping[eodag_key]
    if not isinstance(mapping_key, list):
        return eodag_key
    for queryable in provider_queryables:
        # provider queryables are literal names: escape them so that characters
        # such as "." or "(" are not interpreted as regex syntax
        pattern = rf"\b{re.escape(queryable)}\b"
        if re.search(pattern, mapping_key[0]):
            return queryable
    return ""
def normalize_bands(data: Union[dict, Asset]) -> Union[dict, Asset]:
    """Convert the ``eo:bands`` / ``raster:bands`` entries of ``data`` into a
    single STAC 1.1 ``bands`` array. ``data`` is modified in place and returned.

    Band fields sharing one value among the bands are moved to ``data`` itself,
    unless ``data`` already holds another value for them.

    :param data: properties dict or Asset to migrate
    :returns: the same data with migrated bands
    """
    # band fields keeping their name; the other ones get the extension prefix
    unprefixed_fields = [
        "name",
        "description",
        "data_type",
        "nodata",
        "unit",
        "statistics",
    ]
    # band fields that always stay on the bands
    kept_on_band_fields = ["name", "eo:common_name"]

    # https://github.com/radiantearth/stac-spec/blob/v1.1.0/best-practices.md#bands
    # Migrate band STAC 1.0 to 1.1
    if not (isinstance(data, dict) or isinstance(data, Asset)):
        return data

    # Take the legacy fields out of data, always as lists
    legacy_bands: dict[str, Any] = {"eo:bands": [], "raster:bands": []}
    found_legacy = False
    for legacy_field in legacy_bands:
        if legacy_field not in data:
            continue
        legacy_value = data[legacy_field]
        legacy_bands[legacy_field] = (
            legacy_value if isinstance(legacy_value, list) else [legacy_value]
        )
        found_legacy = True
        del data[legacy_field]
    if not found_legacy:
        return data

    # eo:bands -> bands
    merged_bands = [
        {
            (
                field if field in unprefixed_fields else "eo:{}".format(field)
            ): eo_item[field]
            for field in eo_item
        }
        for eo_item in legacy_bands["eo:bands"]
    ]

    # raster:bands -> bands, merged by position with the eo ones
    for position, raster_item in enumerate(legacy_bands["raster:bands"]):
        is_new_band = position >= len(merged_bands)
        target_band = {} if is_new_band else merged_bands[position]
        for field in raster_item:
            if field in unprefixed_fields:
                target_band[field] = raster_item[field]
            else:
                target_band["raster:{}".format(field)] = raster_item[field]
        if is_new_band:
            merged_bands.append(target_band)

    if merged_bands:
        # distinct values of each field among the bands
        distinct_values: dict[str, Any] = {}
        for band in merged_bands:
            for field in band:
                seen_values = distinct_values.setdefault(field, [])
                if band[field] not in seen_values:
                    seen_values.append(band[field])

        # a field with a single distinct value goes to the parent scope...
        promoted_fields = []
        for field, values in distinct_values.items():
            if field in kept_on_band_fields or len(values) != 1:
                continue
            # ...except when the parent already holds another value for it:
            # the per-band value then stays in the bands array
            if field in data and data[field] != values[0]:
                continue
            data[field] = values[0]
            promoted_fields.append(field)

        # drop the promoted fields from the bands, and the bands left empty
        stripped_bands = (
            {field: band[field] for field in band if field not in promoted_fields}
            for band in merged_bands
        )
        merged_bands = [band for band in stripped_bands if band]

    # Set bands only if at least one of them still has content
    if merged_bands:
        data["bands"] = merged_bands
    return data