# -*- coding: utf-8 -*-
# Copyright 2018, CS GROUP - France, https://www.csgroup.eu/
#
# This file is part of EODAG project
# https://www.github.com/CS-SI/EODAG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import ast
import json
import logging
import re
from datetime import datetime, timedelta
from string import Formatter
from typing import (
TYPE_CHECKING,
Any,
AnyStr,
Callable,
Dict,
Iterator,
List,
Optional,
Tuple,
Union,
cast,
)
import geojson
import orjson
import pyproj
from dateutil.parser import isoparse
from dateutil.relativedelta import relativedelta
from dateutil.tz import UTC, tzutc
from jsonpath_ng.jsonpath import Child, JSONPath
from lxml import etree
from lxml.etree import XPathEvalError
from shapely import wkt
from shapely.geometry import MultiPolygon, Polygon
from shapely.ops import transform
from eodag.types.queryables import Queryables
from eodag.utils import (
DEFAULT_PROJ,
deepcopy,
dict_items_recursive_apply,
format_string,
get_geometry_from_various,
get_timestamp,
items_recursive_apply,
nested_pairs2dict,
string_to_jsonpath,
update_nested_dict,
)
if TYPE_CHECKING:
from shapely.geometry.base import BaseGeometry
from eodag.config import PluginConfig
logger = logging.getLogger("eodag.product.metadata_mapping")
SEP = r"#"
INGEST_CONVERSION_REGEX = re.compile(
r"^{(?P<path>[^#]*)" + SEP + r"(?P<converter>[^\d\W]\w*)(\((?P<args>.*)\))*}$"
)
NOT_AVAILABLE = "Not Available"
NOT_MAPPED = "Not Mapped"
ONLINE_STATUS = "ONLINE"
STAGING_STATUS = "STAGING"
OFFLINE_STATUS = "OFFLINE"
COORDS_ROUNDING_PRECISION = 4
WKT_MAX_LEN = 1600
COMPLEX_QS_REGEX = re.compile(r"^(.+=)?([^=]*)({.+})+([^=&]*)$")
DEFAULT_GEOMETRY = "POLYGON((180 -90, 180 90, -180 90, -180 -90, 180 -90))"
def get_metadata_path(
map_value: Union[str, List[str]],
) -> Tuple[Union[List[str], None], str]:
"""Return the jsonpath or xpath to the value of a EO product metadata in a provider
search result.
The path is retrieved depending on if the metadata is queryable (the value
associated to it in the provider search config metadata mapping is a list) or not
(the value is directly the string corresponding to the path).
Assume we have the following provider config::
provider:
...
search:
...
metadata_mapping:
productType:
- productType
- $.properties.productType
id: $.properties.id
...
...
...
Then the metadata `id` is not queryable for this provider whereas `productType`
is. The first value of the `metadata_mapping.productType` is how the
eodag search parameter `productType` is interpreted in the
:class:`~eodag.plugins.search.base.Search` plugin implemented by `provider`, and is
used when eodag delegates search process to the corresponding plugin.
:param map_value: The value originating from the definition of `metadata_mapping`
in the provider search config. For example, it is the list
`['productType', '$.properties.productType']` with the sample
above. Or the string `$.properties.id`.
:returns: Either ``None`` and the path to the metadata value, or a list of
converter and its args, and the path to the metadata value.
"""
path = get_metadata_path_value(map_value)
try:
match = INGEST_CONVERSION_REGEX.match(path)
except TypeError as e:
logger.error("Could not match regex on metadata path '%s'" % str(path))
raise e
if match:
g = match.groupdict()
return [g["converter"], g["args"]], g["path"]
return None, path
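# Illustrative examples (assumed mapping values, consistent with the docstring above):
# get_metadata_path("$.properties.id") -> (None, "$.properties.id")
# get_metadata_path("{$.properties.updated#to_iso_utc_datetime}")
#   -> (["to_iso_utc_datetime", None], "$.properties.updated")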
def get_metadata_path_value(map_value: Union[str, List[str]]) -> str:
"""Get raw metadata path without converter"""
return map_value[1] if isinstance(map_value, list) else map_value
def get_search_param(map_value: List[str]) -> str:
"""See :func:`~eodag.api.product.metadata_mapping.get_metadata_path`
:param map_value: The value originating from the definition of `metadata_mapping`
in the provider search config
:returns: The value of the search parameter as defined in the provider config
"""
# Assume that the caller will pass in the value as a list
return map_value[0]
def format_metadata(search_param: str, *args: Any, **kwargs: Any) -> str:
"""Format a string of form ``{<field_name>#<conversion_function>}``
The currently understood converters are:
- ``datetime_to_timestamp_milliseconds``: converts a UTC date string to a timestamp in
milliseconds
- ``to_rounded_wkt``: simplify the WKT of a geometry
- ``to_bounds_lists``: convert to list(s) of bounds
- ``to_nwse_bounds``: convert to North,West,South,East bounds
- ``to_nwse_bounds_str``: convert to North,West,South,East bounds string with given separator
- ``to_geojson``: convert to a GeoJSON (via __geo_interface__ if exists)
- ``from_ewkt``: convert EWKT to shapely geometry / WKT in DEFAULT_PROJ
- ``to_ewkt``: convert to EWKT (Extended Well-Known text)
- ``from_georss``: convert GeoRSS to shapely geometry / WKT in DEFAULT_PROJ
- ``csv_list``: convert to a comma separated list
- ``to_iso_utc_datetime_from_milliseconds``: convert a UTC timestamp in
milliseconds to a UTC ISO datetime
- ``to_iso_utc_datetime``: convert a UTC datetime string to ISO UTC datetime
string
- ``to_iso_date``: remove the time part of an ISO datetime string
- ``remove_extension``: on a string that contains dots, only take the first
part of the list obtained by splitting the string on dots
- ``get_group_name``: get the matching regex group name
- ``replace_str``: execute "string".replace(old, new)
- ``recursive_sub_str``: recursively substitute in the structure (e.g. dict)
values matching a regex
- ``slice_str``: slice a string (equivalent to s[start:end:step])
- ``fake_l2a_title_from_l1c``: used to generate SAFE format metadata for data from AWS
- ``s2msil2a_title_to_aws_productinfo``: used to generate SAFE format metadata for data from AWS
- ``split_cop_dem_id``: get the bbox by splitting the product id
- ``split_corine_id``: get the product type by splitting the product id
- ``to_datetime_dict``: convert a datetime string to a dictionary where values are either a string or a list
- ``get_ecmwf_time``: get the time of a datetime string in the ECMWF format
:param search_param: The string to be formatted
:param args: (optional) Additional arguments to use in the formatting process
:param kwargs: (optional) Additional named-arguments to use when formatting
:returns: The formatted string
"""
class MetadataFormatter(Formatter):
CONVERSION_REGEX = re.compile(
r"^(?P<field_name>.+)"
+ SEP
+ r"(?P<converter>[^\d\W]\w*)(\((?P<args>.*)\))*$"
)
def __init__(self) -> None:
self.custom_converter: Optional[Callable] = None
self.custom_args: Optional[str] = None
def get_field(self, field_name: str, args: Any, kwargs: Any) -> Any:
conversion_func_spec = self.CONVERSION_REGEX.match(field_name)
# Register a custom converter if any for later use (see convert_field)
# This is done because we don't have the value associated to field_name at
# this stage
if conversion_func_spec:
field_name = conversion_func_spec.groupdict()["field_name"]
converter = conversion_func_spec.groupdict()["converter"]
self.custom_args = conversion_func_spec.groupdict()["args"]
self.custom_converter = getattr(self, "convert_{}".format(converter))
return super(MetadataFormatter, self).get_field(field_name, args, kwargs)
def convert_field(self, value: Any, conversion: Any) -> Any:
# Do custom conversion if any (see get_field)
if self.custom_converter is not None:
if self.custom_args is not None and value is not None:
converted = self.custom_converter(value, self.custom_args)
elif value is not None:
converted = self.custom_converter(value)
else:
converted = ""
# Clear this state variable in case the same converter is used to
# resolve other named arguments
self.custom_converter = None
self.custom_args = None
return converted
return super(MetadataFormatter, self).convert_field(value, conversion)
@staticmethod
def convert_datetime_to_timestamp_milliseconds(date_time: str) -> int:
"""Convert a date_time (str) to a Unix timestamp in milliseconds
"2021-04-21T18:27:19.123Z" => "1619029639123"
"2021-04-21" => "1618963200000"
"2021-04-21T00:00:00+02:00" => "1618956000000"
"""
return int(1e3 * get_timestamp(date_time))
@staticmethod
def convert_to_iso_utc_datetime_from_milliseconds(
timestamp: int,
) -> Union[str, int]:
"""Convert a timestamp in milliseconds (int) to its ISO8601 UTC format
1619029639123 => "2021-04-21T18:27:19.123Z"
"""
try:
return (
datetime.fromtimestamp(timestamp / 1e3, tzutc())
.isoformat(timespec="milliseconds")
.replace("+00:00", "Z")
)
except TypeError:
return timestamp
@staticmethod
def convert_to_iso_utc_datetime(
date_time: str, timespec: str = "milliseconds"
) -> str:
"""Convert a date_time (str) to its ISO 8601 representation in UTC
"2021-04-21" => "2021-04-21T00:00:00.000Z"
"2021-04-21T00:00:00.000+02:00" => "2021-04-20T22:00:00.000Z"
The optional argument timespec specifies the number of additional
terms of the time to include. Valid options are 'auto', 'hours',
'minutes', 'seconds', 'milliseconds' and 'microseconds'.
"""
try:
dt = isoparse(date_time)
except ValueError:
return date_time
if not dt.tzinfo:
dt = dt.replace(tzinfo=UTC)
elif dt.tzinfo is not UTC:
dt = dt.astimezone(UTC)
return dt.isoformat(timespec=timespec).replace("+00:00", "Z")
@staticmethod
def convert_to_iso_date(
datetime_string: str, time_delta_args_str: str = "0,0,0,0,0,0,0"
) -> str:
"""Convert an ISO8601 datetime (str) to its ISO8601 date format
"2021-04-21T18:27:19.123Z" => "2021-04-21"
"2021-04-21" => "2021-04-21"
"2021-04-21T00:00:00+06:00" => "2021-04-20" !
"""
dt = isoparse(datetime_string)
if not dt.tzinfo:
dt = dt.replace(tzinfo=UTC)
elif dt.tzinfo is not UTC:
dt = dt.astimezone(UTC)
time_delta_args = ast.literal_eval(time_delta_args_str)
dt += timedelta(*time_delta_args)
return dt.isoformat()[:10]
@staticmethod
def convert_to_non_separated_date(datetime_string: str) -> str:
"""Convert an ISO8601 datetime (str) to a date without separators
"2021-04-21T18:27:19.123Z" => "20210421"
"""
iso_date = MetadataFormatter.convert_to_iso_date(datetime_string)
return iso_date.replace("-", "")
@staticmethod
def convert_to_rounded_wkt(value: BaseGeometry) -> str:
wkt_value = cast(
str, wkt.dumps(value, rounding_precision=COORDS_ROUNDING_PRECISION)
)
# If needed, simplify WKT to prevent too long request failure
tolerance = 0.1
while len(wkt_value) > WKT_MAX_LEN and tolerance <= 1:
logger.debug(
"Geometry WKT is too long (%s), trying to simplify it with tolerance %s",
len(wkt_value),
tolerance,
)
wkt_value = cast(
str,
wkt.dumps(
value.simplify(tolerance),
rounding_precision=COORDS_ROUNDING_PRECISION,
),
)
tolerance += 0.1
if len(wkt_value) > WKT_MAX_LEN and tolerance > 1:
logger.warning("Failed to reduce WKT length lower than %s", WKT_MAX_LEN)
return wkt_value
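# e.g. (sketch, small geometry so no simplification is triggered):
# convert_to_rounded_wkt(Polygon([(0, 0), (1, 0), (1, 1), (0, 0)]))
# -> "POLYGON ((0.0000 0.0000, 1.0000 0.0000, 1.0000 1.0000, 0.0000 0.0000))"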
@staticmethod
def convert_to_bounds_lists(input_geom: BaseGeometry) -> List[List[float]]:
if isinstance(input_geom, MultiPolygon):
geoms = [geom for geom in input_geom.geoms]
# sort with the largest one first (stac-browser only plots the first one)
geoms.sort(key=lambda x: x.area, reverse=True)
return [list(x.bounds[0:4]) for x in geoms]
else:
return [list(input_geom.bounds[0:4])]
@staticmethod
def convert_to_bounds(input_geom_unformatted: Any) -> List[float]:
input_geom = get_geometry_from_various(geometry=input_geom_unformatted)
if isinstance(input_geom, MultiPolygon):
geoms = [geom for geom in input_geom.geoms]
# sort with the largest one first (stac-browser only plots the first one)
geoms.sort(key=lambda x: x.area, reverse=True)
min_lon = 180
min_lat = 90
max_lon = -180
max_lat = -90
for geom in geoms:
min_lon = min(min_lon, geom.bounds[0])
min_lat = min(min_lat, geom.bounds[1])
max_lon = max(max_lon, geom.bounds[2])
max_lat = max(max_lat, geom.bounds[3])
return [min_lon, min_lat, max_lon, max_lat]
else:
return list(input_geom.bounds[0:4])
@staticmethod
def convert_to_nwse_bounds(input_geom: BaseGeometry) -> List[float]:
return list(input_geom.bounds[-1:] + input_geom.bounds[:-1])
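# shapely bounds are (minx, miny, maxx, maxy), i.e. (W, S, E, N): moving the last
# element first yields [N, W, S, E], e.g. box(1, 2, 3, 4) -> [4.0, 1.0, 2.0, 3.0]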
@staticmethod
def convert_to_nwse_bounds_str(
input_geom: BaseGeometry, separator: str = ","
) -> str:
return separator.join(
str(x) for x in MetadataFormatter.convert_to_nwse_bounds(input_geom)
)
@staticmethod
def convert_to_geojson(string: str) -> str:
return geojson.dumps(string)
@staticmethod
def convert_from_ewkt(ewkt_string: str) -> Union[BaseGeometry, str]:
"""Convert EWKT (Extended Well-Known text) to shapely geometry"""
ewkt_regex = re.compile(
r"^.*(?P<proj>SRID=[0-9]+);(?P<wkt>[A-Z0-9 \(\),\.-]+).*$"
)
ewkt_match = ewkt_regex.match(ewkt_string)
if ewkt_match:
g = ewkt_match.groupdict()
from_proj = g["proj"].replace("SRID", "EPSG").replace("=", ":")
input_geom = wkt.loads(g["wkt"])
from_proj = pyproj.CRS(from_proj)
to_proj = pyproj.CRS(DEFAULT_PROJ)
if from_proj != to_proj:
# reproject
project = pyproj.Transformer.from_crs(
from_proj, to_proj, always_xy=True
).transform
return transform(project, input_geom)
else:
return input_geom
else:
logger.warning(f"Could not read {ewkt_string} as EWKT")
return ewkt_string
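# e.g. (assuming DEFAULT_PROJ is "EPSG:4326"): convert_from_ewkt("SRID=4326;POINT (1 2)")
# returns the shapely point unchanged (already in the default projection),
# while a non-EWKT string is logged and returned as is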
@staticmethod
def convert_to_ewkt(input_geom: BaseGeometry) -> str:
"""Convert shapely geometry to EWKT (Extended Well-Known text)"""
proj = DEFAULT_PROJ.upper().replace("EPSG", "SRID").replace(":", "=")
wkt_geom = MetadataFormatter.convert_to_rounded_wkt(input_geom)
return f"{proj};{wkt_geom}"
@staticmethod
def convert_from_georss(georss: Any) -> Union[BaseGeometry, Any]:
"""Convert GeoRSS to shapely geometry"""
if "polygon" in georss.tag:
# Polygon
coords_list = georss.text.split()
polygon_args = [
(float(coords_list[2 * i]), float(coords_list[2 * i + 1]))
for i in range(int(len(coords_list) / 2))
]
return Polygon(polygon_args)
elif len(georss) == 1 and "multisurface" in georss[0].tag.lower():
# Multipolygon
from_proj = getattr(georss[0], "attrib", {}).get("srsName", None)
if from_proj:
from_proj = pyproj.CRS(from_proj)
to_proj = pyproj.CRS(DEFAULT_PROJ)
project = pyproj.Transformer.from_crs(
from_proj, to_proj, always_xy=True
).transform
# function to get deepest elements
def flatten_elements(nested) -> Iterator[Any]:
for e in nested:
if len(e) > 0:
yield from flatten_elements(e)
else:
yield e
polygons_list: List[Polygon] = []
for elem in flatten_elements(georss[0]):
coords_list = elem.text.split()
polygon_args = [
(float(coords_list[2 * i]), float(coords_list[2 * i + 1]))
for i in range(int(len(coords_list) / 2))
]
polygon = Polygon(polygon_args)
# reproject if needed
if from_proj and from_proj != to_proj:
polygons_list.append(transform(project, polygon))
else:
polygons_list.append(polygon)
return MultiPolygon(polygons_list)
else:
logger.warning(
f"Incoming GeoRSS format not supported yet: {str(georss)}"
)
return georss
@staticmethod
def convert_to_longitude_latitude(
input_geom_unformatted: Any,
) -> Dict[str, float]:
bounds = MetadataFormatter.convert_to_bounds(input_geom_unformatted)
lon = (bounds[0] + bounds[2]) / 2
lat = (bounds[1] + bounds[3]) / 2
return {"lon": lon, "lat": lat}
@staticmethod
def convert_csv_list(values_list: Any) -> Any:
if isinstance(values_list, list):
return ",".join([str(x) for x in values_list])
else:
return values_list
@staticmethod
def convert_remove_extension(string: str) -> str:
parts = string.split(".")
if parts:
return parts[0]
return ""
@staticmethod
def convert_get_group_name(string: str, pattern: str) -> str:
try:
match = re.search(pattern, str(string))
if match:
return match.lastgroup or NOT_AVAILABLE
except AttributeError:
pass
logger.warning(
"Could not extract property from %s using %s", string, pattern
)
return NOT_AVAILABLE
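# e.g. (assumed pattern): convert_get_group_name("S2A_MSIL1C", r"(?P<sentinel2>S2.*)|(?P<sentinel1>S1.*)")
# -> "sentinel2"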
@staticmethod
def convert_replace_str(string: str, args: str) -> str:
old, new = ast.literal_eval(args)
return re.sub(old, new, string)
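# e.g. convert_replace_str("2021-04-21", '"-", "/"') -> "2021/04/21"
# (args is parsed with ast.literal_eval into the (old, new) pair)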
@staticmethod
def convert_recursive_sub_str(
input_obj: Union[Dict[Any, Any], List[Any]], args: str
) -> Union[Dict[Any, Any], List[Any]]:
old, new = ast.literal_eval(args)
return items_recursive_apply(
input_obj,
lambda k, v, x, y: re.sub(x, y, v) if isinstance(v, str) else v,
**{"x": old, "y": new},
)
@staticmethod
def convert_dict_update(
input_dict: Dict[Any, Any], args: str
) -> Dict[Any, Any]:
"""Converts"""
new_items_list = ast.literal_eval(args)
new_items_dict = nested_pairs2dict(new_items_list)
return dict(input_dict, **new_items_dict)
@staticmethod
def convert_dict_filter(
input_dict: Dict[Any, Any], jsonpath_filter_str: str
) -> Dict[Any, Any]:
"""Fitlers dict items using jsonpath"""
jsonpath_filter = string_to_jsonpath(jsonpath_filter_str, force=True)
if isinstance(jsonpath_filter, str) or not isinstance(input_dict, dict):
return {}
keys_list = list(input_dict.keys())
matches = jsonpath_filter.find(input_dict)
result = {}
for match in matches:
# extract key index from matched jsonpath
matched_jsonpath_str = str(match.full_path)
matched_index = int(matched_jsonpath_str.split(".")[-1][1:-1])
key = keys_list[matched_index]
result[key] = match.value
return result
@staticmethod
def convert_slice_str(string: str, args: str) -> str:
cmin, cmax, cstep = [
int(x.strip()) if x.strip().lstrip("-").isdigit() else None
for x in args.split(",")
]
return string[cmin:cmax:cstep]
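# e.g. convert_slice_str("S2A_MSIL1C", "0,3,1") -> "S2A"; note that args must hold
# the three comma-separated values start,end,step (empty ones become None)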
@staticmethod
def convert_fake_l2a_title_from_l1c(string: str) -> str:
id_regex = re.compile(
r"^(?P<id1>\w+)_(?P<id2>\w+)_(?P<id3>\w+)_(?P<id4>\w+)_(?P<id5>\w+)_(?P<id6>\w+)_(?P<id7>\w+)$"
)
id_match = id_regex.match(string)
if id_match:
id_dict = id_match.groupdict()
return "%s_MSIL2A_%s____________%s________________" % (
id_dict["id1"],
id_dict["id3"],
id_dict["id6"],
)
else:
logger.error("Could not extract fake title from %s" % string)
return NOT_AVAILABLE
@staticmethod
def convert_s2msil2a_title_to_aws_productinfo(string: str) -> str:
id_regex = re.compile(
r"^(?P<id1>\w+)_(?P<id2>\w+)_(?P<year>[0-9]{4})(?P<month>[0-9]{2})(?P<day>[0-9]{2})T[0-9]+_"
+ r"(?P<id4>[A-Z0-9_]+)_(?P<id5>[A-Z0-9_]+)_T(?P<tile1>[0-9]{2})(?P<tile2>[A-Z])(?P<tile3>[A-Z]{2})_"
+ r"(?P<id7>[A-Z0-9_]+)$"
)
id_match = id_regex.match(string)
if id_match:
id_dict = id_match.groupdict()
return (
"https://roda.sentinel-hub.com/sentinel-s2-l2a/tiles/%s/%s/%s/%s/%s/%s/0/{collection}.json"
% (
id_dict["tile1"],
id_dict["tile2"],
id_dict["tile3"],
id_dict["year"],
int(id_dict["month"]),
int(id_dict["day"]),
)
)
else:
logger.error("Could not extract title infos from %s" % string)
return NOT_AVAILABLE
@staticmethod
def convert_split_id_into_s1_params(product_id: str) -> Dict[str, str]:
parts: List[str] = re.split(r"_(?!_)", product_id)
if len(parts) < 9:
logger.error(
"id %s does not match expected Sentinel-1 id format", product_id
)
raise ValueError
params = {"sensorMode": parts[1]}
level = "LEVEL" + parts[3][0]
params["processingLevel"] = level
start_date = datetime.strptime(parts[4], "%Y%m%dT%H%M%S") - timedelta(
seconds=1
)
params["startDate"] = start_date.strftime("%Y-%m-%dT%H:%M:%SZ")
end_date = datetime.strptime(parts[5], "%Y%m%dT%H%M%S") + timedelta(
seconds=1
)
params["endDate"] = end_date.strftime("%Y-%m-%dT%H:%M:%SZ")
product_type = parts[2][:3]
if product_type == "GRD" and parts[-1] == "COG":
product_type = "GRD-COG"
elif product_type == "GRD" and parts[-2] == "CARD" and parts[-1] == "BS":
product_type = "CARD-BS"
params["productType"] = product_type
polarisation_mapping = {
"SV": "VV",
"SH": "HH",
"DH": "HH+HV",
"DV": "VV+VH",
}
polarisation = polarisation_mapping[parts[3][2:]]
params["polarisation"] = polarisation
return params
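# Illustrative example with an assumed Sentinel-1 id:
# convert_split_id_into_s1_params("S1A_IW_GRDH_1SDV_20200225T033735_20200225T033800_031421_039DB9_C559")
# -> {"sensorMode": "IW", "processingLevel": "LEVEL1", "startDate": "2020-02-25T03:37:34Z",
#     "endDate": "2020-02-25T03:38:01Z", "productType": "GRD", "polarisation": "VV+VH"}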
@staticmethod
def convert_split_id_into_s3_params(product_id: str) -> Dict[str, str]:
parts: List[str] = re.split(r"_(?!_)", product_id)
params = {"productType": product_id[4:15]}
dates = re.findall("[0-9]{8}T[0-9]{6}", product_id)
start_date = datetime.strptime(dates[0], "%Y%m%dT%H%M%S") - timedelta(
seconds=1
)
params["startDate"] = start_date.strftime("%Y-%m-%dT%H:%M:%SZ")
end_date = datetime.strptime(dates[1], "%Y%m%dT%H%M%S") + timedelta(
seconds=1
)
params["endDate"] = end_date.strftime("%Y-%m-%dT%H:%M:%SZ")
params["timeliness"] = parts[-2]
params["sat"] = "Sentinel-" + parts[0][1:]
return params
@staticmethod
def convert_split_id_into_s5p_params(product_id: str) -> Dict[str, str]:
parts: List[str] = re.split(r"_(?!_)", product_id)
params = {
"productType": product_id[9:19],
"processingMode": parts[1],
"processingLevel": parts[2].replace("_", ""),
}
start_date = datetime.strptime(parts[-6], "%Y%m%dT%H%M%S") - timedelta(
seconds=10
)
params["startDate"] = start_date.strftime("%Y-%m-%dT%H:%M:%SZ")
end_date = datetime.strptime(parts[-5], "%Y%m%dT%H%M%S") + timedelta(
seconds=10
)
params["endDate"] = end_date.strftime("%Y-%m-%dT%H:%M:%SZ")
return params
@staticmethod
def convert_split_cop_dem_id(product_id: str) -> List[int]:
parts = product_id.split("_")
lattitude = parts[3]
longitude = parts[5]
if lattitude[0] == "N":
lat_num = int(lattitude[1:])
else:
lat_num = -1 * int(lattitude[1:])
if longitude[0] == "E":
long_num = int(longitude[1:])
else:
long_num = -1 * int(longitude[1:])
bbox = [long_num - 1, lat_num - 1, long_num + 1, lat_num + 1]
return bbox
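# Illustrative example (assuming an id layout like "Copernicus_DSM_10_N45_00_E006_00"):
# convert_split_cop_dem_id("Copernicus_DSM_10_N45_00_E006_00") -> [5, 44, 7, 46]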
@staticmethod
def convert_dates_from_cmems_id(product_id: str):
date_format_1 = "[0-9]{10}"
date_format_2 = "[0-9]{8}"
dates = re.findall(date_format_1, product_id)
if dates:
date = dates[0]
else:
dates = re.findall(date_format_2, product_id)
date = dates[0]
if len(date) == 10:
date_time = datetime.strptime(date, "%Y%m%d%H")
else:
date_time = datetime.strptime(date, "%Y%m%d")
return {
"min_date": date_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
"max_date": (date_time + timedelta(days=1)).strftime(
"%Y-%m-%dT%H:%M:%SZ"
),
}
@staticmethod
def convert_to_datetime_dict(
date: str, format: str
) -> Dict[str, Union[List[str], str]]:
"""Convert a date (str) to a dictionary where values are in the format given in argument
date == "2021-04-21T18:27:19.123Z" and format == "list" => {
"year": ["2021"],
"month": ["04"],
"day": ["21"],
"hour": ["18"],
"minute": ["27"],
"second": ["19"],
}
date == "2021-04-21T18:27:19.123Z" and format == "string" => {
"year": "2021",
"month": "04",
"day": "21",
"hour": "18",
"minute": "27",
"second": "19",
}
date == "2021-04-21" and format == "list" => {
"year": ["2021"],
"month": ["04"],
"day": ["21"],
"hour": ["00"],
"minute": ["00"],
"second": ["00"],
}
"""
utc_date = MetadataFormatter.convert_to_iso_utc_datetime(date)
date_object = datetime.strptime(utc_date, "%Y-%m-%dT%H:%M:%S.%fZ")
if format == "list":
return {
"year": [date_object.strftime("%Y")],
"month": [date_object.strftime("%m")],
"day": [date_object.strftime("%d")],
"hour": [date_object.strftime("%H")],
"minute": [date_object.strftime("%M")],
"second": [date_object.strftime("%S")],
}
else:
return {
"year": date_object.strftime("%Y"),
"month": date_object.strftime("%m"),
"day": date_object.strftime("%d"),
"hour": date_object.strftime("%H"),
"minute": date_object.strftime("%M"),
"second": date_object.strftime("%S"),
}
@staticmethod
def convert_interval_to_datetime_dict(
date: str, separator: str = "/"
) -> Dict[str, List[str]]:
"""Convert a date interval ('/' separated str) to a dictionary where values are lists
date == "2021-04-21/2021-04-22" => {
"year": ["2021"],
"month": ["04"],
"day": ["21", "22"],
}
"""
if separator not in date:
raise ValueError(
f"Could not format {date} using convert_interval_to_datetime_dict: {separator} separator missing"
)
start, end = date.split(separator)
start_utc_date = MetadataFormatter.convert_to_iso_utc_datetime(start)
end_utc_date = MetadataFormatter.convert_to_iso_utc_datetime(end)
start_date_object = datetime.strptime(
start_utc_date, "%Y-%m-%dT%H:%M:%S.%fZ"
)
end_date_object = datetime.strptime(end_utc_date, "%Y-%m-%dT%H:%M:%S.%fZ")
delta_utc_date = end_date_object - start_date_object
years = set()
months = set()
days = set()
for i in range(delta_utc_date.days + 1):
date_object = start_date_object + timedelta(days=i)
years.add(date_object.strftime("%Y"))
months.add(date_object.strftime("%m"))
days.add(date_object.strftime("%d"))
return {
"year": list(years),
"month": list(months),
"day": list(days),
}
@staticmethod
def convert_get_ecmwf_time(date: str) -> List[str]:
"""Get the time of a date (str) in the ECMWF format (["HH:00"])
"2021-04-21T18:27:19.123Z" => ["18:00"]
"2021-04-21" => ["00:00"]
"""
return [
str(MetadataFormatter.convert_to_datetime_dict(date, "str")["hour"])
+ ":00"
]
@staticmethod
def convert_get_dates_from_string(text: str, split_param="-"):
reg = "[0-9]{8}" + split_param + "[0-9]{8}"
match = re.search(reg, text)
if not match:
return NOT_AVAILABLE
dates_str = match.group()
dates = dates_str.split(split_param)
start_date = datetime.strptime(dates[0], "%Y%m%d")
end_date = datetime.strptime(dates[1], "%Y%m%d")
return {
"startDate": start_date.strftime("%Y-%m-%dT%H:%M:%SZ"),
"endDate": end_date.strftime("%Y-%m-%dT%H:%M:%SZ"),
}
@staticmethod
def convert_get_hydrological_year(date: str):
utc_date = MetadataFormatter.convert_to_iso_utc_datetime(date)
date_object = datetime.strptime(utc_date, "%Y-%m-%dT%H:%M:%S.%fZ")
date_object_second_year = date_object + relativedelta(years=1)
return [
f'{date_object.strftime("%Y")}_{date_object_second_year.strftime("%y")}'
]
@staticmethod
def convert_get_variables_from_path(path: str):
if "?" not in path:
return []
variables = path.split("?")[1]
return variables.split(",")
@staticmethod
def convert_assets_list_to_dict(
assets_list: List[Dict[str, str]], asset_name_key: str = "title"
) -> Dict[str, Dict[str, str]]:
"""Convert a list of assets to a dictionary where keys represent
name of assets and are found among values of asset dictionaries.
assets_list == [
{"href": "foo", "title": "asset1", "name": "foo-name"},
{"href": "bar", "title": "path/to/asset1", "name": "bar-name"},
{"href": "baz", "title": "path/to/asset2", "name": "baz-name"},
{"href": "qux", "title": "asset3", "name": "qux-name"},
] and asset_name_key == "title" => {
"asset1": {"href": "foo", "title": "asset1", "name": "foo-name"},
"path/to/asset1": {"href": "bar", "title": "path/to/asset1", "name": "bar-name"},
"asset2": {"href": "baz", "title": "path/to/asset2", "name": "baz-name"},
"asset3": {"href": "qux", "title": "asset3", "name": "qux-name"},
}
assets_list == [
{"href": "foo", "title": "foo-title", "name": "asset1"},
{"href": "bar", "title": "bar-title", "name": "path/to/asset1"},
{"href": "baz", "title": "baz-title", "name": "path/to/asset2"},
{"href": "qux", "title": "qux-title", "name": "asset3"},
] and asset_name_key == "name" => {
"asset1": {"href": "foo", "title": "foo-title", "name": "asset1"},
"path/to/asset1": {"href": "bar", "title": "bar-title", "name": "path/to/asset1"},
"asset2": {"href": "baz", "title": "baz-title", "name": "path/to/asset2"},
"asset3": {"href": "qux", "title": "qux-title", "name": "asset3"},
}
"""
asset_names: List[str] = []
assets_dict: Dict[str, Dict[str, str]] = {}
for asset in assets_list:
asset_name = asset[asset_name_key]
asset_names.append(asset_name)
assets_dict[asset_name] = asset
# keep only the path basename as the asset name when the name looks like a
# path and that basename is unique among all asset names
immutable_asset_indexes: List[int] = []
for i, asset_name in enumerate(asset_names):
if i in immutable_asset_indexes:
continue
change_asset_name = True
asset_basename = asset_name.split("/")[-1]
j = i + 1
while change_asset_name and j < len(asset_names):
asset_tmp_basename = asset_names[j].split("/")[-1]
if asset_basename == asset_tmp_basename:
change_asset_name = False
immutable_asset_indexes.extend([i, j])
j += 1
if change_asset_name:
assets_dict[asset_basename] = assets_dict.pop(asset_name)
return assets_dict
# if stac extension colon separator `:` is in search params, parse it to prevent issues with vformat
if re.search(r"{[a-zA-Z0-9_-]*:[a-zA-Z0-9_-]*}", search_param):
search_param = re.sub(
r"{([a-zA-Z0-9_-]*):([a-zA-Z0-9_-]*)}", r"{\1_COLON_\2}", search_param
)
kwargs = {k.replace(":", "_COLON_"): v for k, v in kwargs.items()}
return MetadataFormatter().vformat(search_param, args, kwargs)
def properties_from_json(
json: Dict[str, Any],
mapping: Dict[str, Any],
discovery_config: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
"""Extract properties from a provider json result.
:param json: The representation of a provider result as a json object
:param mapping: A mapping between :class:`~eodag.api.product._product.EOProduct`'s metadata
keys and the location of the values of these properties in the json
representation, expressed as a
`jsonpath <http://goessner.net/articles/JsonPath/>`_
:param discovery_config: (optional) metadata discovery configuration dict, accepting among other items
`metadata_pattern` (regex pattern for metadata key discovery, e.g. "^[a-zA-Z]+$")
and `metadata_path` (string representation of a jsonpath)
:returns: The metadata of the :class:`~eodag.api.product._product.EOProduct`
"""
properties: Dict[str, Any] = {}
templates = {}
used_jsonpaths = []
for metadata, value in mapping.items():
# Treat the case when the value is from a queryable metadata
if isinstance(value, list):
conversion_or_none, path_or_text = value[1]
else:
conversion_or_none, path_or_text = value
if isinstance(path_or_text, str):
if re.search(r"({[^{}:]+})+", path_or_text):
templates[metadata] = path_or_text
else:
properties[metadata] = path_or_text
else:
try:
match = path_or_text.find(json)
except KeyError:
match = []
if len(match) == 1:
extracted_value = match[0].value
used_jsonpaths.append(match[0].full_path)
else:
extracted_value = NOT_AVAILABLE
if extracted_value is None:
properties[metadata] = None
else:
if conversion_or_none is None:
properties[metadata] = extracted_value
else:
# reformat conversion_or_none as metadata#converter(args) or metadata#converter
if (
len(conversion_or_none) > 1
and isinstance(conversion_or_none, list)
and conversion_or_none[1] is not None
):
conversion_or_none = "%s(%s)" % (
conversion_or_none[0],
conversion_or_none[1],
)
elif isinstance(conversion_or_none, list):
conversion_or_none = conversion_or_none[0]
# check if conversion uses variables to format
if re.search(r"({[^{}:]+})+", conversion_or_none):
conversion_or_none = conversion_or_none.format(**properties)
properties[metadata] = format_metadata(
"{%s%s%s}" % (metadata, SEP, conversion_or_none),
**{metadata: extracted_value},
)
# properties as python objects when possible (format_metadata returns only strings)
try:
properties[metadata] = ast.literal_eval(properties[metadata])
except Exception:
pass
# Resolve templates
for metadata, template in templates.items():
try:
properties[metadata] = format_string(metadata, template, **properties)
except ValueError:
logger.warning(
f"Could not parse {metadata} ({template}) using product properties"
)
logger.debug(f"available properties: {properties}")
properties[metadata] = NOT_AVAILABLE
# adds missing discovered properties
if not discovery_config:
discovery_config = {}
discovery_pattern = discovery_config.get("metadata_pattern", None)
discovery_path = discovery_config.get("metadata_path", None)
if discovery_pattern and discovery_path:
discovery_jsonpath = string_to_jsonpath(discovery_path)
discovered_properties = (
discovery_jsonpath.find(json)
if isinstance(discovery_jsonpath, JSONPath)
else []
)
for found_jsonpath in discovered_properties:
if "metadata_path_id" in discovery_config.keys():
found_key_paths = string_to_jsonpath(
discovery_config["metadata_path_id"], force=True
).find(found_jsonpath.value)
if not found_key_paths or isinstance(found_key_paths, int):
continue
found_key = found_key_paths[0].value
used_jsonpath = Child(
found_jsonpath.full_path,
string_to_jsonpath(
discovery_config["metadata_path_value"], force=True
),
)
else:
# default key got from metadata_path
found_key = found_jsonpath.path.fields[-1]
used_jsonpath = found_jsonpath.full_path
if (
re.compile(discovery_pattern).match(found_key)
and found_key not in properties.keys()
and used_jsonpath not in used_jsonpaths
):
if "metadata_path_value" in discovery_config.keys():
found_value_path = string_to_jsonpath(
discovery_config["metadata_path_value"], force=True
).find(found_jsonpath.value)
properties[found_key] = (
found_value_path[0].value
if found_value_path and not isinstance(found_value_path, int)
else NOT_AVAILABLE
)
else:
# default value got from metadata_path
properties[found_key] = found_jsonpath.value
# properties as python objects when possible (format_metadata returns only strings)
try:
properties[found_key] = ast.literal_eval(properties[found_key])
except Exception:
pass
return properties
def properties_from_xml(
xml_as_text: AnyStr,
mapping: Any,
empty_ns_prefix: str = "ns",
discovery_config: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
"""Extract properties from a provider xml result.
:param xml_as_text: The representation of a provider result as xml
:param mapping: A mapping between :class:`~eodag.api.product._product.EOProduct`'s metadata
keys and the location of the values of these properties in the xml
representation, expressed as a
`xpath <https://www.w3schools.com/xml/xml_xpath.asp>`_
:param empty_ns_prefix: (optional) The name to give to the default namespace of `xml_as_text`.
This is a technical workaround for the limitation of lxml
not supporting empty namespace prefix. The
xpath in `mapping` must use this value to be able to
correctly reach empty-namespace prefixed elements
:param discovery_config: (optional) metadata discovery configuration dict, accepting among other items
`metadata_pattern` (regex pattern for metadata key discovery, e.g. "^[a-zA-Z]+$")
and `metadata_path` (string representation of an xpath)
:returns: the metadata of the :class:`~eodag.api.product._product.EOProduct`
"""
properties: Dict[str, Any] = {}
templates = {}
used_xpaths = []
root = etree.XML(xml_as_text)
for metadata, value in mapping.items():
# Treat the case when the value is from a queryable metadata
if isinstance(value, list):
conversion_or_none, path_or_text = value[1]
else:
conversion_or_none, path_or_text = value
try:
extracted_value = root.xpath(
path_or_text,
namespaces={k or empty_ns_prefix: v for k, v in root.nsmap.items()},
)
if len(extracted_value) <= 1:
if len(extracted_value) < 1:
# If there is no matched value (empty list), mark the metadata as not
# available
extracted_value = [NOT_AVAILABLE]
else:
# store element tag in used_xpaths
used_xpaths.append(
getattr(
root.xpath(
path_or_text.replace("/text()", ""),
namespaces={
k or empty_ns_prefix: v
for k, v in root.nsmap.items()
},
)[0],
"tag",
None,
)
)
if conversion_or_none is None:
properties[metadata] = extracted_value[0]
else:
# reformat conversion_or_none as metadata#converter(args) or metadata#converter
if (
len(conversion_or_none) > 1
and isinstance(conversion_or_none, list)
and conversion_or_none[1] is not None
):
conversion_or_none = "%s(%s)" % (
conversion_or_none[0],
conversion_or_none[1],
)
elif isinstance(conversion_or_none, list):
conversion_or_none = conversion_or_none[0]
properties[metadata] = format_metadata(
"{%s%s%s}" % (metadata, SEP, conversion_or_none),
**{metadata: extracted_value[0]},
)
# If there are multiple matches, consider the result as a list, doing a
# formatting if any
else:
if conversion_or_none is None:
properties[metadata] = extracted_value
else:
# reformat conversion_or_none as metadata#converter(args) or metadata#converter
if (
len(conversion_or_none) > 1
and isinstance(conversion_or_none, list)
and conversion_or_none[1] is not None
):
conversion_or_none = "%s(%s)" % (
conversion_or_none[0],
conversion_or_none[1],
)
elif isinstance(conversion_or_none, list):
conversion_or_none = conversion_or_none[0]
# check if conversion uses variables to format
if re.search(r"({[^{}:]+})+", conversion_or_none):
conversion_or_none = conversion_or_none.format(**properties)
properties[metadata] = [
format_metadata(
"{%s%s%s}"
% (
metadata,
SEP,
conversion_or_none,
), # Re-build conversion format identifier
**{metadata: extracted_value_item},
)
for extracted_value_item in extracted_value
]
except XPathEvalError:
# Assume the mapping is to be passed as is, in which case we readily
# register it, or that it is a template, in which case we register it for
# later formatting resolution using previously successfully resolved
# properties. Ignore any transformation specified: if a value is to be
# passed as is, we don't want to transform it further
if re.search(r"({[^{}:]+})+", path_or_text):
templates[metadata] = path_or_text
else:
properties[metadata] = path_or_text
# Resolve templates
for metadata, template in templates.items():
properties[metadata] = template.format(**properties)
# adds missing discovered properties
if not discovery_config:
discovery_config = {}
discovery_pattern = discovery_config.get("metadata_pattern", None)
discovery_path = discovery_config.get("metadata_path", None)
if discovery_pattern and discovery_path:
discovered_properties = root.xpath(
discovery_path,
namespaces={k or empty_ns_prefix: v for k, v in root.nsmap.items()},
)
for found_xpath in discovered_properties:
found_key = found_xpath.tag.rpartition("}")[-1]
if (
re.compile(discovery_pattern).match(found_key)
and found_key not in properties.keys()
and found_xpath.tag not in used_xpaths
):
properties[found_key] = found_xpath.text
return properties
def mtd_cfg_as_conversion_and_querypath(
src_dict: Dict[str, Any],
dest_dict: Dict[str, Any] = {},
result_type: str = "json",
) -> Dict[str, Any]:
"""Metadata configuration dictionary to querypath with conversion dictionary
Transform every src_dict value from jsonpath_str to tuple `(conversion, jsonpath_object)`
or from xpath_str to tuple `(conversion, xpath_str)`
:param src_dict: Input dict containing jsonpath str as values
:param dest_dict: (optional) Output dict containing jsonpath objects as values
:returns: dest_dict
"""
# check if the configuration has already been converted
some_configured_value = (
next(iter(dest_dict.values())) if dest_dict else next(iter(src_dict.values()))
)
if (
isinstance(some_configured_value, list)
and isinstance(some_configured_value[1], tuple)
or isinstance(some_configured_value, tuple)
):
return dest_dict or src_dict
if not dest_dict:
dest_dict = deepcopy(src_dict)
for metadata in src_dict:
if metadata not in dest_dict:
dest_dict[metadata] = (None, NOT_MAPPED)
else:
conversion, path = get_metadata_path(dest_dict[metadata])
if result_type == "json":
parsed_path = string_to_jsonpath(path)
if isinstance(parsed_path, str):
# not a jsonpath: assume the mapping is to be passed as is. Ignore any transformation specified.
# If a value is to be passed as is, we don't want to transform it further
conversion = None
else:
parsed_path = path
if isinstance(dest_dict[metadata], list) and len(dest_dict[metadata]) == 2:
dest_dict[metadata][1] = (conversion, parsed_path)
else:
dest_dict[metadata] = (conversion, parsed_path)
# Put the updated mapping at the end
dest_dict[metadata] = dest_dict.pop(metadata)
return dest_dict
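# Illustrative example (assumed mapping): {"id": "$.properties.id"} is turned into
# {"id": (None, <JSONPath for $.properties.id>)}; a queryable entry such as
# {"productType": ["productType", "$.properties.productType"]} keeps its search
# parameter and becomes {"productType": ["productType", (None, <JSONPath>)]}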
def format_query_params(
product_type: str, config: PluginConfig, query_dict: Dict[str, Any]
) -> Dict[str, Any]:
"""format the search parameters to query parameters"""
if "raise_errors" in query_dict.keys():
del query_dict["raise_errors"]
# . not allowed in eodag_search_key, replaced with %2E
query_dict = {k.replace(".", "%2E"): v for k, v in query_dict.items()}
product_type_metadata_mapping = dict(
config.metadata_mapping,
**config.products.get(product_type, {}).get("metadata_mapping", {}),
)
query_params: Dict[str, Any] = {}
# Get all the search parameters that are recognised as queryables by the
# provider (they appear in the queryables dictionary)
queryables = _get_queryables(query_dict, config, product_type_metadata_mapping)
for eodag_search_key, provider_search_key in queryables.items():
user_input = query_dict[eodag_search_key]
if COMPLEX_QS_REGEX.match(provider_search_key):
parts = provider_search_key.split("=")
if len(parts) == 1:
formatted_query_param = format_metadata(
provider_search_key, product_type, **query_dict
)
formatted_query_param = formatted_query_param.replace("'", '"')
if "{{" in provider_search_key:
# retrieve values from hashes where keys are given in the param
if "}[" in formatted_query_param:
formatted_query_param = _resolve_hashes(formatted_query_param)
# json query string (for POST request)
update_nested_dict(
query_params,
orjson.loads(formatted_query_param),
extend_list_values=True,
allow_extend_duplicates=False,
)
else:
query_params[eodag_search_key] = formatted_query_param
else:
provider_search_key, provider_value = parts
query_params.setdefault(provider_search_key, []).append(
format_metadata(provider_value, product_type, **query_dict)
)
else:
query_params[provider_search_key] = user_input
# Now get all the literal search params (i.e params to be passed "as is"
# in the search request)
# ignore literal_search_params if it isn't a dictionary
literal_search_params = getattr(config, "literal_search_params", {})
if not isinstance(literal_search_params, dict):
literal_search_params = {}
# Now add formatted free text search parameters (this is for cases where a
# complex query through a free text search parameter is available for the
# provider and needed for the consumer)
product_type_metadata_mapping = dict(
config.metadata_mapping,
**config.products.get(product_type, {}).get("metadata_mapping", {}),
)
literal_search_params.update(
_format_free_text_search(config, product_type_metadata_mapping, **query_dict)
)
for provider_search_key, provider_value in literal_search_params.items():
if isinstance(provider_value, list):
query_params.setdefault(provider_search_key, []).extend(provider_value)
else:
query_params.setdefault(provider_search_key, []).append(provider_value)
return query_params
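# Illustrative example (hypothetical plugin config): with a metadata_mapping entry
# {"cloudCover": ["cc={cloudCover}", "$.properties.cc"]}, calling
# format_query_params("S2_MSI_L1C", config, {"cloudCover": 20}) would yield
# {"cc": ["20"]}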
def _resolve_hashes(formatted_query_param: str) -> str:
"""
Resolve structures of the form {"a": "abc", "b": "cde"}["a"] found in
formatted_query_param: each structure is replaced by the value matching
the given key in the hash (in this case "abc")
"""
# check if there is still a hash to be resolved
while '}["' in formatted_query_param:
# find and parse code between {}
ind_open = formatted_query_param.find('}["')
ind_close = formatted_query_param.find('"]', ind_open)
hash_start = formatted_query_param[:ind_open].rfind(": {") + 2
h = orjson.loads(formatted_query_param[hash_start : ind_open + 1])
# find key and get value
ind_key_start = formatted_query_param.find('"', ind_open) + 1
key = formatted_query_param[ind_key_start:ind_close]
value = h[key]
# replace hash with value
if isinstance(value, str):
formatted_query_param = formatted_query_param.replace(
formatted_query_param[hash_start : ind_close + 2], '"' + value + '"'
)
else:
formatted_query_param = formatted_query_param.replace(
formatted_query_param[hash_start : ind_close + 2], json.dumps(value)
)
return formatted_query_param
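# e.g. _resolve_hashes('{"dataset": {"a": "abc", "b": "cde"}["a"]}')
# -> '{"dataset": "abc"}'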
def _format_free_text_search(
config: PluginConfig, metadata_mapping: Dict[str, Any], **kwargs: Any
) -> Dict[str, Any]:
"""Build the free text search parameter using the search parameters"""
query_params: Dict[str, Any] = {}
if not getattr(config, "free_text_search_operations", None):
return query_params
for param, operations_config in config.free_text_search_operations.items():
union = operations_config["union"]
wrapper = operations_config.get("wrapper", "{}")
formatted_query = []
for operator, operands in operations_config["operations"].items():
# The Operator string is the operator wrapped with spaces
operator = " {} ".format(operator)
# Build the operation string by joining the formatted operands together
# using the operation string
operation_string = operator.join(
format_metadata(operand, **kwargs)
for operand in operands
if any(
re.search(rf"{{{kw}[}}#]", operand)
and val is not None
and isinstance(metadata_mapping.get(kw, []), list)
for kw, val in kwargs.items()
)
)
# Finally wrap the operation string as specified by the wrapper and add
# it to the list of queries (only if the operation string is not empty)
if operation_string:
query = wrapper.format(operation_string)
formatted_query.append(query)
# Join the formatted query using the "union" config parameter, and then
# wrap it with the Python format string specified in the "wrapper" config
# parameter
final_query = union.join(formatted_query)
if len(operations_config["operations"]) > 1 and len(formatted_query) > 1:
final_query = wrapper.format(query_params[param])
if final_query:
query_params[param] = final_query
return query_params
def _get_queryables(
search_params: Dict[str, Any],
config: PluginConfig,
metadata_mapping: Dict[str, Any],
) -> Dict[str, Any]:
"""Retrieve the metadata mappings that are query-able"""
logger.debug("Retrieving queryable metadata from metadata_mapping")
queryables: Dict[str, Any] = {}
for eodag_search_key, user_input in search_params.items():
if user_input is not None:
md_mapping = metadata_mapping.get(eodag_search_key, (None, NOT_MAPPED))
_, md_value = md_mapping
# query param from defined metadata_mapping
if md_mapping is not None and isinstance(md_mapping, list):
search_param = get_search_param(md_mapping)
if search_param is not None:
queryables[eodag_search_key] = search_param
# query param from metadata auto discovery
elif md_value == NOT_MAPPED and getattr(
config, "discover_metadata", {}
).get("auto_discovery", False):
pattern = re.compile(
config.discover_metadata.get("metadata_pattern", "")
)
search_param_cfg = config.discover_metadata.get("search_param", "")
if pattern.match(eodag_search_key) and isinstance(
search_param_cfg, str
):
search_param = search_param_cfg.format(metadata=eodag_search_key)
queryables[eodag_search_key] = search_param
elif pattern.match(eodag_search_key) and isinstance(
search_param_cfg, dict
):
search_param_cfg_parsed = dict_items_recursive_apply(
search_param_cfg,
lambda k, v: v.format(metadata=eodag_search_key),
)
for k, v in search_param_cfg_parsed.items():
if getattr(config, k, None):
update_nested_dict(
getattr(config, k),
v,
extend_list_values=True,
allow_extend_duplicates=False,
)
else:
logger.warning(
"Could not use discover_metadata[search_param]: no entry for %s in plugin config",
k,
)
return queryables
def get_queryable_from_provider(
provider_queryable: str, metadata_mapping: Dict[str, Union[str, List[str]]]
) -> Optional[str]:
"""Get EODAG configured queryable parameter from provider queryable parameter
:param provider_queryable: provider queryable parameter
:param metadata_mapping: metadata-mapping configuration
:returns: EODAG configured queryable parameter or None
"""
pattern = rf"\b{provider_queryable}\b"
# if an exact 1:1 mapping exists, prefer it over partial matches
# e.g. for provider queryable "year", use "year" rather than "date", in which "year" also appears
mapping_values = [
v[0] if isinstance(v, list) else "" for v in metadata_mapping.values()
]
if provider_queryable in mapping_values:
ind = mapping_values.index(provider_queryable)
return Queryables.get_queryable_from_alias(list(metadata_mapping.keys())[ind])
for param, param_conf in metadata_mapping.items():
if isinstance(param_conf, list) and re.search(pattern, param_conf[0]):
return Queryables.get_queryable_from_alias(param)
return None
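# Illustrative example (assuming no alias is configured for the parameter):
# get_queryable_from_provider("cc", {"cloudCover": ["cc", "$.properties.cc"]})
# -> "cloudCover"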
def get_provider_queryable_path(
queryable: str, metadata_mapping: Dict[str, Union[str, List[str]]]
) -> Optional[str]:
"""Get EODAG configured queryable path from its parameter
:param queryable: eodag queryable parameter
:param metadata_mapping: metadata-mapping configuration
:returns: EODAG configured queryable path or None
"""
parameter_conf = metadata_mapping.get(queryable, None)
if isinstance(parameter_conf, list):
return parameter_conf[0]
else:
return None
def get_provider_queryable_key(
eodag_key: str,
provider_queryables: Dict[str, Any],
metadata_mapping: Dict[str, Union[List[Any], str]],
) -> str:
"""finds the provider queryable corresponding to the given eodag key based on the metadata mapping
:param eodag_key: key in eodag
:param provider_queryables: queryables returned from the provider
:param metadata_mapping: metadata mapping from which the keys are retrieved
:returns: provider queryable key
"""
if eodag_key not in metadata_mapping:
return ""
mapping_key = metadata_mapping[eodag_key]
if isinstance(mapping_key, list):
for queryable in provider_queryables:
pattern = rf"\b{queryable}\b"
if re.search(pattern, mapping_key[0]):
return queryable
return ""
else:
return eodag_key
# Keys taken from OpenSearch extension for Earth Observation http://docs.opengeospatial.org/is/13-026r9/13-026r9.html
# For a metadata to be queryable, the way to query it must be specified in the
# provider metadata_mapping configuration parameter. It will be automatically
# detected as queryable by eodag when this is done
OSEO_METADATA_MAPPING = {
# Opensearch resource identifier within the search engine context (in our case
# within the context of the data provider)
"uid": "$.uid",
# OpenSearch Parameters for Collection Search (Table 3)
"productType": "$.properties.productType",
"doi": "$.properties.doi",
"platform": "$.properties.platform",
"platformSerialIdentifier": "$.properties.platformSerialIdentifier",
"instrument": "$.properties.instrument",
"sensorType": "$.properties.sensorType",
"compositeType": "$.properties.compositeType",
"processingLevel": "$.properties.processingLevel",
"orbitType": "$.properties.orbitType",
"spectralRange": "$.properties.spectralRange",
"wavelengths": "$.properties.wavelengths",
"hasSecurityConstraints": "$.properties.hasSecurityConstraints",
"dissemination": "$.properties.dissemination",
# INSPIRE obligated OpenSearch Parameters for Collection Search (Table 4)
"title": "$.properties.title",
"topicCategory": "$.properties.topicCategory",
"keyword": "$.properties.keyword",
"abstract": "$.properties.abstract",
"resolution": "$.properties.resolution",
"organisationName": "$.properties.organisationName",
"organisationRole": "$.properties.organisationRole",
"publicationDate": "$.properties.publicationDate",
"lineage": "$.properties.lineage",
"useLimitation": "$.properties.useLimitation",
"accessConstraint": "$.properties.accessConstraint",
"otherConstraint": "$.properties.otherConstraint",
"classification": "$.properties.classification",
"language": "$.properties.language",
"specification": "$.properties.specification",
# OpenSearch Parameters for Product Search (Table 5)
"parentIdentifier": "$.properties.parentIdentifier",
"productionStatus": "$.properties.productionStatus",
"acquisitionType": "$.properties.acquisitionType",
"orbitNumber": "$.properties.orbitNumber",
"orbitDirection": "$.properties.orbitDirection",
"track": "$.properties.track",
"frame": "$.properties.frame",
"swathIdentifier": "$.properties.swathIdentifier",
"cloudCover": "$.properties.cloudCover",
"snowCover": "$.properties.snowCover",
"lowestLocation": "$.properties.lowestLocation",
"highestLocation": "$.properties.highestLocation",
"productVersion": "$.properties.productVersion",
"productQualityStatus": "$.properties.productQualityStatus",
"productQualityDegradationTag": "$.properties.productQualityDegradationTag",
"processorName": "$.properties.processorName",
"processingCenter": "$.properties.processingCenter",
"creationDate": "$.properties.creationDate",
"modificationDate": "$.properties.modificationDate",
"processingDate": "$.properties.processingDate",
"sensorMode": "$.properties.sensorMode",
"archivingCenter": "$.properties.archivingCenter",
"processingMode": "$.properties.processingMode",
# OpenSearch Parameters for Acquisition Parameters Search (Table 6)
"availabilityTime": "$.properties.availabilityTime",
"acquisitionStation": "$.properties.acquisitionStation",
"acquisitionSubType": "$.properties.acquisitionSubType",
"startTimeFromAscendingNode": "$.properties.startTimeFromAscendingNode",
"completionTimeFromAscendingNode": "$.properties.completionTimeFromAscendingNode",
"illuminationAzimuthAngle": "$.properties.illuminationAzimuthAngle",
"illuminationZenithAngle": "$.properties.illuminationZenithAngle",
"illuminationElevationAngle": "$.properties.illuminationElevationAngle",
"polarizationMode": "$.properties.polarizationMode",
"polarizationChannels": "$.properties.polarizationChannels",
"antennaLookDirection": "$.properties.antennaLookDirection",
"minimumIncidenceAngle": "$.properties.minimumIncidenceAngle",
"maximumIncidenceAngle": "$.properties.maximumIncidenceAngle",
"dopplerFrequency": "$.properties.dopplerFrequency",
"incidenceAngleVariation": "$.properties.incidenceAngleVariation",
}
DEFAULT_METADATA_MAPPING = dict(
OSEO_METADATA_MAPPING,
**{
# Custom parameters (not defined in the base document referenced above)
# id differs from uid: the id is an identifier shared across the catalogues of
# all the providers that distribute a given product, so that the product can be
# retrieved from any of them
"id": "$.id",
# The geographic extent of the product
"geometry": "$.geometry",
# The url of the quicklook
"quicklook": "$.properties.quicklook",
# The url to download the product "as is" (literal or as a template to be completed
# either after the search result is obtained from the provider or during the eodag
# download phase)
"downloadLink": "$.properties.downloadLink",
},
)