Source code for eodag.plugins.search.cop_marine

# -*- coding: utf-8 -*-
# Copyright 2024, CS GROUP - France, https://www.csgroup.eu/
#
# This file is part of EODAG project
#     https://www.github.com/CS-SI/EODAG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations

import copy
import datetime as dt
import logging
import os
import re
from typing import TYPE_CHECKING, Any, Optional, cast
from urllib.parse import urlsplit

import boto3
import botocore
import requests
from dateutil.tz import tzutc
from dateutil.utils import today

from eodag import EOProduct
from eodag.api.product import AssetsDict
from eodag.api.search_result import SearchResult
from eodag.config import PluginConfig
from eodag.plugins.search import PreparedSearch
from eodag.plugins.search.static_stac_search import StaticStacSearch
from eodag.utils import get_bucket_name_and_prefix, get_geometry_from_various
from eodag.utils.dates import parse_to_utc, to_iso_utc_string
from eodag.utils.exceptions import RequestError, UnsupportedCollection, ValidationError

if TYPE_CHECKING:
    from mypy_boto3_s3 import S3Client

logger = logging.getLogger("eodag.search.cop_marine")


def _get_date_from_yyyymmdd(date_str: str, item_key: str) -> Optional[dt.datetime]:
    """Convert a ``YYYYMM`` or ``YYYYMMDD`` string into a UTC datetime

    :param date_str: digits of the date; the first 4 characters are read as the
                     year, the next 2 as the month and anything after as the day
    :param item_key: key of the item the date was found in, only used in the
                     error message
    :returns: the timezone-aware datetime, or ``None`` if the digits do not form
              a valid date
    """
    # no day given (6 characters or fewer) -> use the first day of the month
    day_part = date_str[6:] if len(date_str) > 6 else "1"
    try:
        return dt.datetime(
            int(date_str[:4]),
            int(date_str[4:6]),
            int(day_part),
            tzinfo=tzutc(),
        )
    except ValueError:
        logger.error(f"{item_key}: {date_str} is not a valid date")
        return None


def _get_dates_from_dataset_data(
    dataset_item: dict[str, Any]
) -> Optional[dict[str, str]]:
    dates = {}
    if "start_datetime" in dataset_item["properties"]:
        dates["start"] = dataset_item["properties"]["start_datetime"]
        dates["end"] = dataset_item["properties"]["end_datetime"]
    elif "datetime" in dataset_item["properties"]:
        dates["start"] = dataset_item["properties"]["datetime"]
        dates["end"] = dataset_item["properties"]["datetime"]
    else:
        return None
    return dates


def _get_s3_client(endpoint_url: str) -> S3Client:
    """Create an S3 client for the given endpoint that sends unsigned requests

    :param endpoint_url: url of the S3 endpoint the client talks to
    :returns: boto3 S3 client configured with the virtual addressing style and
              without request signature
    """
    session = boto3.Session()
    client_config = botocore.config.Config(
        # Configures to use subdomain/virtual calling format.
        s3={"addressing_style": "virtual"},
        signature_version=botocore.UNSIGNED,
    )
    return session.client("s3", config=client_config, endpoint_url=endpoint_url)


def _check_int_values_properties(properties: dict[str, Any]):
    # remove int values with a bit length of more than 64 from the properties
    invalid = []
    for prop, prop_value in properties.items():
        if isinstance(prop_value, int) and prop_value.bit_length() > 64:
            invalid.append(prop)
        if isinstance(prop_value, dict):
            _check_int_values_properties(prop_value)

    for inv_key in invalid:
        properties.pop(inv_key)


class CopMarineSearch(StaticStacSearch):
    """class that implements search for the Copernicus Marine provider

    It calls :meth:`~eodag.plugins.search.static_stac_search.StaticStacSearch.discover_collections`
    inherited from :class:`~eodag.plugins.search.static_stac_search.StaticStacSearch`
    but for the actual search a special method which fetches the urls of the available products from an S3 storage and
    filters them has been written.

    The configuration parameters are inherited from the parent and grand-parent classes. The
    :attr:`~eodag.config.PluginConfig.DiscoverMetadata.auto_discovery` parameter in the
    :attr:`~eodag.config.PluginConfig.discover_metadata` section has to be set to ``false`` and the
    :attr:`~eodag.config.PluginConfig.DiscoverQueryables.fetch_url` in the
    :attr:`~eodag.config.PluginConfig.discover_queryables` queryables section has to be set to ``null`` to
    overwrite the default config from the stac provider configuration because those functionalities
    are not available.
    """

    def __init__(self, provider: str, config: PluginConfig):
        """Initialize the plugin, keeping the metadata mapping found in ``config``

        :param provider: name of the provider
        :param config: plugin configuration
        """
        original_metadata_mapping = copy.deepcopy(config.metadata_mapping)
        super().__init__(provider, config)
        # reset to original metadata mapping from config (changed in super class init)
        self.config.metadata_mapping = original_metadata_mapping

    @staticmethod
    def _find_dates_in_id(item_id: str) -> list[str]:
        """Return the ``YYYYMMDD`` (or, failing that, ``_YYYYMM``) digit groups found in an item id"""
        matches = re.findall(r"(\d{4})(0[1-9]|1[0-2])([0-3]\d)", item_id)
        if not matches:
            matches = re.findall(r"_(\d{4})(0[1-9]|1[0-2])", item_id)
        # findall returns one tuple of groups per match -> join them back together
        return ["".join(groups) for groups in matches]

    @staticmethod
    def _empty_result(count: bool) -> SearchResult:
        """Return an empty search result, with ``number_matched`` set to 0 when a count was requested"""
        result = SearchResult([])
        if count:
            result.number_matched = 0
        return result

    @staticmethod
    def _asset_properties_from_head(
        s3_client: S3Client, bucket: str, key: str
    ) -> dict[str, Any]:
        """Collect size, checksum and update time of one S3 object through ``head_object``

        This is best effort: any failure is logged at debug level and whatever
        was collected so far is returned.
        """
        asset_properties: dict[str, Any] = {}
        try:
            response_metadata = s3_client.head_object(Bucket=bucket, Key=key).get(
                "ResponseMetadata", {}
            )
            if response_metadata.get("HTTPStatusCode") != 200:
                return asset_properties
            headers = response_metadata.get("HTTPHeaders", {})
            # do not use 'content-type' header, always at value binary/octet-stream
            if (content_length := headers.get("content-length")) is not None:
                asset_properties["file:size"] = int(content_length)
            # ETags containing "-" are not used as checksum
            if (etag := headers.get("etag")) is not None and "-" not in etag:
                asset_properties["file:checksum"] = etag.strip('"')
            if (last_modified := headers.get("last-modified")) is not None:
                try:
                    asset_properties["updated"] = to_iso_utc_string(last_modified)
                except Exception as exc:
                    logger.debug(
                        "could not convert last-modified header %s: %s",
                        last_modified,
                        exc,
                    )
        except Exception as exc:
            # object metadata are optional, they must never make the search fail
            logger.debug(
                "could not retrieve metadata of %s in bucket %s: %s", key, bucket, exc
            )
        return asset_properties

    @staticmethod
    def _asset_properties_from_listing(obj: Any) -> dict[str, Any]:
        """Collect update time, checksum, size and owner from one entry of a ``list_objects`` response"""
        asset_properties: dict[str, Any] = {}
        last_modified_date = obj.get("LastModified")
        if isinstance(last_modified_date, dt.datetime):
            asset_properties["updated"] = to_iso_utc_string(last_modified_date)
        etag_obj: Any = obj.get("ETag")
        # ETags containing "-" are not used as checksum
        if isinstance(etag_obj, str) and "-" not in etag_obj:
            asset_properties["file:checksum"] = etag_obj.strip('"')
        size = obj.get("Size")
        if size is not None:
            asset_properties["file:size"] = int(size)
        owner = obj.get("Owner", {})
        owner_id = owner.get("ID")
        if owner_id is not None:
            asset_properties["cop_marine:owner_id"] = owner_id
        owner_displayname = owner.get("DisplayName")
        if owner_displayname is not None:
            asset_properties["cop_marine:owner_name"] = owner_displayname
        return asset_properties

    def _get_collection_info(
        self, collection: str
    ) -> tuple[dict[str, Any], list[dict[str, Any]]]:
        """Fetch collection and associated datasets info

        :param collection: collection to fetch
        :returns: the collection document and the list of dataset documents it
                  links to (datasets that cannot be fetched are logged and skipped)
        :raises UnsupportedCollection: if the provider answers 404 for the collection
        :raises RequestError: if the collection document cannot be fetched for
                              any other reason
        """
        fetch_url = cast(str, self.config.discover_collections["fetch_url"]).format(
            **self.config.__dict__
        )

        logger.debug("fetch data for collection %s", collection)
        provider_collection = self.config.products.get(collection, {}).get(
            "_collection"
        )
        if not provider_collection:
            provider_collection = collection
        base_url = fetch_url.replace("catalog.stac.json", provider_collection)
        collection_url = base_url + "/product.stac.json"
        try:
            response = requests.get(collection_url)
            # requests does not raise on HTTP errors by itself
            response.raise_for_status()
            collection_data = response.json()
        except requests.RequestException as exc:
            # the HTTP status is carried by the attached response, not by errno
            if getattr(exc.response, "status_code", None) == 404:
                logger.error("product %s not found", collection)
                raise UnsupportedCollection(collection) from exc
            logger.error("data for product %s could not be fetched", collection)
            raise RequestError.from_error(
                exc, f"data for product {collection} could not be fetched"
            ) from exc

        datasets = []
        for link in [li for li in collection_data["links"] if li["rel"] == "item"]:
            dataset_url = base_url + "/" + link["href"]
            try:
                dataset_item = requests.get(dataset_url).json()
                datasets.append(dataset_item)
            except requests.RequestException:
                logger.error("data for dataset %s could not be fetched", link["title"])

        return collection_data, datasets

    def _create_product(
        self,
        collection: str,
        item_key: str,
        s3_url: str,
        dataset_item: dict[str, Any],
        collection_dict: dict[str, Any],
        use_dataset_dates: bool = False,
        product_id: Optional[str] = None,
        asset_properties: Optional[dict[str, Any]] = None,
    ) -> Optional[EOProduct]:
        """Build the product describing one object of the S3 storage

        :param collection: collection the product belongs to
        :param item_key: key of the object; its file name without extension
                         becomes the product id
        :param s3_url: url the download link is built from
        :param dataset_item: dataset document the object belongs to
        :param collection_dict: collection document
        :param use_dataset_dates: take the dates from ``dataset_item`` instead of
                                  parsing them from the product id
        :param product_id: if given, only an object with exactly this id is accepted
        :param asset_properties: extra properties merged into the ``native`` asset
        :returns: the product, or ``None`` if the id does not match ``product_id``
                  or no valid dates could be determined
        """
        item_id = os.path.splitext(item_key.split("/")[-1])[0]
        if product_id and product_id != item_id:
            return None
        s3_url_parts = s3_url.split("/")
        # create download url from s3_url and item_key,
        # there are situations where the 2 overlap -> remove parts included in s3_url from item_key
        item_key_parts = [
            part for part in item_key.split("/") if part not in s3_url_parts
        ]
        download_url = s3_url + "/" + "/".join(item_key_parts)
        geometry = (
            get_geometry_from_various(**dataset_item)
            or self.config.metadata_mapping["eodag:default_geometry"]
        )
        properties = {
            "id": item_id,
            "title": item_id,
            "geometry": geometry,
            "eodag:download_link": download_url,
            "dataset": dataset_item["id"],
            # order:status set to succeeded for consistency between providers
            "order:status": "succeeded",
        }
        if use_dataset_dates:
            dates = _get_dates_from_dataset_data(dataset_item)
            if not dates:
                return None
            properties["start_datetime"] = dates["start"]
            properties["end_datetime"] = dates["end"]
        else:
            item_dates = self._find_dates_in_id(item_id)
            if not item_dates:
                # no date pattern at all in the id
                return None
            item_start = _get_date_from_yyyymmdd(item_dates[0], item_key)
            if not item_start:  # identified pattern was not a valid datetime
                return None
            if len(item_dates) > 2:  # start, end and created_at timestamps
                item_end = _get_date_from_yyyymmdd(item_dates[1], item_key)
            else:  # only date and created_at timestamps
                item_end = item_start
            properties["start_datetime"] = to_iso_utc_string(item_start)
            properties["end_datetime"] = to_iso_utc_string(item_end or item_start)

        # identity and dates were set above and must not be overwritten;
        # dataset properties take precedence over collection properties
        reserved = ("id", "title", "start_datetime", "end_datetime", "datetime")
        for source in (collection_dict["properties"], dataset_item["properties"]):
            for key, value in source.items():
                if key not in reserved:
                    properties[key] = value

        code_mapping = self.config.products.get(collection, {}).get("code_mapping")
        if code_mapping:
            id_parts = item_id.split("_")
            if len(id_parts) > code_mapping["index"]:
                code = id_parts[code_mapping["index"]]
                if "pattern" not in code_mapping:
                    properties[code_mapping["param"]] = code
                else:
                    matches = re.findall(code_mapping["pattern"], code)
                    if matches:
                        properties[code_mapping["param"]] = matches[0]

        _check_int_values_properties(properties)

        properties["eodag:thumbnail"] = collection_dict["assets"]["thumbnail"]["href"]
        if "omiFigure" in collection_dict["assets"]:
            properties["eodag:quicklook"] = collection_dict["assets"]["omiFigure"][
                "href"
            ]
        asset_native = {
            "title": "native",
            "href": download_url,
            "type": "application/x-netcdf",
        }
        asset_native.update(asset_properties or {})
        assets = {"native": asset_native}
        additional_assets = self.get_assets_from_mapping(dataset_item)
        assets.update(additional_assets)
        product = EOProduct(self.provider, properties, collection=collection)
        product.assets = AssetsDict(product, assets)
        product._normalize_bands()
        return product

    def query(
        self,
        prep: PreparedSearch = PreparedSearch(),
        **kwargs: Any,
    ) -> SearchResult:
        """
        Implementation of search for the Copernicus Marine provider

        :param prep: object containing search parameters
        :param kwargs: additional search arguments
        :returns: search result holding the products of the requested page and,
                  if ``prep.count`` is set, the number of matching products
        :raises ValidationError: if no collection is given
        """
        limit = prep.limit
        token_value = getattr(prep, "next_page_token", None) or prep.page

        # only return 1 page if pagination is disabled
        if (
            token_value is None
            or limit is None
            or (int(token_value) > 1 and limit <= 0)
        ):
            return self._empty_result(prep.count)

        token = int(token_value)

        collection = kwargs.get("collection", prep.collection)
        if not collection:
            raise ValidationError(
                "parameter collection is required for search with cop_marine provider"
            )
        collection_dict, datasets_items_list = self._get_collection_info(collection)
        geometry = kwargs.pop("geometry", None)
        products: list[EOProduct] = []
        # the token is a 1-based page number
        start_index = limit * (token - 1) + 1
        num_total = 0
        for i, dataset_item in enumerate(datasets_items_list):
            if len(products) >= limit and not prep.count and limit > 0:
                break
            # Filter by geometry
            if "id" not in kwargs and geometry:
                dataset_geom = get_geometry_from_various(**dataset_item)
                if dataset_geom and not dataset_geom.intersects(geometry):
                    continue
            try:
                logger.debug("searching data for dataset %s", dataset_item["id"])

                # date bounds
                dataset_props = dataset_item["properties"]
                if "start_datetime" in kwargs:
                    start_date = parse_to_utc(kwargs["start_datetime"])
                elif "start_datetime" in dataset_props:
                    start_date = parse_to_utc(dataset_props["start_datetime"])
                else:
                    start_date = parse_to_utc(dataset_props["datetime"])
                if "end_datetime" in kwargs:
                    end_date = parse_to_utc(kwargs["end_datetime"])
                elif "end_datetime" in dataset_props:
                    end_date = parse_to_utc(dataset_props["end_datetime"])
                else:
                    end_date = today(tzinfo=tzutc())

                # retrieve information about s3 from collection data
                s3_url = dataset_item["assets"]["native"]["href"]
            except KeyError as e:
                logger.warning(
                    f"Unable to extract info from {collection} item #{i}: {str(e)}"
                )
                continue

            url_parts = urlsplit(s3_url)
            if not url_parts.hostname:
                # without a host there is no endpoint to query
                logger.warning(f"Unable to get endpoint from {s3_url}")
                continue
            endpoint_url = url_parts.scheme + "://" + url_parts.hostname
            bucket, collection_path = get_bucket_name_and_prefix(s3_url, 0)
            if bucket is None or collection_path is None:
                logger.warning(
                    f"Unable to get bucket and prefix from {s3_url}, got {(bucket, collection_path)}"
                )
                continue
            s3_client = _get_s3_client(endpoint_url)

            if ".nc" in collection_path:
                # the dataset points to a single file instead of a prefix
                num_total += 1
                if num_total < start_index:
                    continue
                if len(products) < limit or limit < 0:
                    product = self._create_product(
                        collection,
                        collection_path,
                        endpoint_url + "/" + bucket,
                        dataset_item,
                        collection_dict,
                        True,
                        kwargs.get("id"),
                        asset_properties=self._asset_properties_from_head(
                            s3_client, bucket, collection_path
                        ),
                    )
                    if product:
                        products.append(product)
                    if product and kwargs.get("id"):
                        break
                continue

            stop_search = False
            current_object = None
            while not stop_search:
                # list_objects returns max 1000 objects -> use marker to get next objects
                list_kwargs: dict[str, Any] = {
                    "Bucket": bucket,
                    "Prefix": collection_path,
                }
                if current_object:
                    list_kwargs["Marker"] = current_object
                s3_objects = s3_client.list_objects(**list_kwargs)
                if "Contents" not in s3_objects:
                    if len(products) == 0 and i == len(datasets_items_list) - 1:
                        return self._empty_result(prep.count)
                    break
                if len(s3_objects["Contents"]) == 0:
                    break

                for obj in s3_objects["Contents"]:
                    item_key = obj["Key"]
                    # record the marker before any `continue` below, otherwise a
                    # skipped object at the end of a listing would make the next
                    # listing start again from a stale position
                    current_object = item_key
                    item_id = os.path.splitext(item_key.split("/")[-1])[0]
                    # filter according to date(s) in item id
                    item_dates = self._find_dates_in_id(item_id)
                    if "id" in kwargs:
                        if kwargs["id"] in item_key:
                            product = self._create_product(
                                collection,
                                item_key,
                                s3_url,
                                dataset_item,
                                collection_dict,
                                not bool(item_dates),
                            )
                            if product:
                                return SearchResult([product], 1)
                        continue

                    item_start = None
                    item_end = None
                    use_dataset_dates = False
                    if item_dates:
                        item_start = _get_date_from_yyyymmdd(item_dates[0], item_key)
                        if len(item_dates) > 2:  # start, end and created_at timestamps
                            item_end = _get_date_from_yyyymmdd(item_dates[1], item_key)
                    if not item_start:
                        # no valid datetime given in id
                        use_dataset_dates = True
                        dates = _get_dates_from_dataset_data(dataset_item)
                        if dates:
                            item_start = parse_to_utc(dates["start"])
                            item_end = parse_to_utc(dates["end"])
                    if not item_start:
                        # no valid datetime in id and dataset data
                        continue
                    if item_start > end_date:
                        # finish the current listing but do not request another one
                        stop_search = True
                    if (
                        (start_date <= item_start <= end_date)
                        or (item_end and start_date <= item_end <= end_date)
                        or (
                            item_end and item_start < start_date and item_end > end_date
                        )
                    ):
                        num_total += 1
                        if num_total < start_index:
                            continue
                        if len(products) < limit or limit < 0:
                            product = self._create_product(
                                collection,
                                item_key,
                                endpoint_url + "/" + bucket,
                                dataset_item,
                                collection_dict,
                                use_dataset_dates,
                                asset_properties=self._asset_properties_from_listing(
                                    obj
                                ),
                            )
                            if product:
                                products.append(product)
                    # a negative limit means "no limit" (see the checks above),
                    # so it must never end the search
                    if not prep.count and 0 <= limit <= len(products):
                        stop_search = True
                        break

        # the geometry entry alone is conditional; the parentheses keep the
        # conditional expression from swallowing the whole union
        search_params = (
            kwargs
            | {"limit": prep.limit}
            | {"collection": collection}
            | {"provider": self.provider}
            | ({"geometry": geometry} if geometry else {})
        )
        number_matched = num_total if prep.count else None
        return SearchResult(
            products,
            number_matched,
            search_params=search_params,
            # the token is read back as a page number at the top of this method
            next_page_token=str(token + 1),
        )