Source code for eodag.api.product._product

# -*- coding: utf-8 -*-
# Copyright 2018, CS GROUP - France, https://www.csgroup.eu/
#
# This file is part of EODAG project
#     https://www.github.com/CS-SI/EODAG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations

import base64
import datetime as dt
import logging
import os
import re
import tempfile
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Iterable,
    Literal,
    Optional,
    Union,
    cast,
)

import geojson
import orjson
import requests
from pystac import Item
from requests import RequestException
from requests.auth import AuthBase
from shapely import geometry
from shapely.errors import ShapelyError

from eodag.types.queryables import CommonStacMetadata
from eodag.types.stac_metadata import create_stac_metadata_model

try:
    # import from eodag-cube if installed
    from eodag_cube.api.product import (  # pyright: ignore[reportMissingImports]
        AssetsDict,
    )
except ImportError:
    from ._assets import AssetsDict

from eodag.api.product.drivers import DRIVERS
from eodag.api.product.drivers.generic import GenericDriver
from eodag.api.product.metadata_mapping import (
    DEFAULT_GEOMETRY,
    NOT_AVAILABLE,
    NOT_MAPPED,
    ONLINE_STATUS,
    normalize_bands,
)
from eodag.utils import (
    DEFAULT_DOWNLOAD_TIMEOUT,
    DEFAULT_DOWNLOAD_WAIT,
    DEFAULT_SHAPELY_GEOMETRY,
    DEFAULT_STREAM_REQUESTS_TIMEOUT,
    GENERIC_STAC_PROVIDER,
    STAC_VERSION,
    USER_AGENT,
    ProgressCallback,
    StreamResponse,
    _deprecated,
    deepcopy,
    format_string,
    get_geometry_from_various,
)
from eodag.utils.deserialize import (
    _import_stac_item_from_eodag_server,
    _import_stac_item_from_known_provider,
    _import_stac_item_from_unknown_provider,
)
from eodag.utils.exceptions import DownloadError, MisconfiguredError, ValidationError
from eodag.utils.repr import dict_to_html_table

if TYPE_CHECKING:
    from concurrent.futures import ThreadPoolExecutor
    from requests.structures import CaseInsensitiveDict
    from shapely.geometry.base import BaseGeometry

    from eodag import EODataAccessGateway
    from eodag.api.product.drivers.base import DatasetDriver
    from eodag.plugins.apis.base import Api
    from eodag.plugins.authentication.base import Authentication
    from eodag.plugins.download.base import Download
    from eodag.plugins.manager import PluginManager
    from eodag.types.download_args import DownloadConf
    from eodag.utils import Unpack

logger = logging.getLogger("eodag.product")


[docs] class EOProduct: """A wrapper around an Earth Observation Product originating from a search. Every Search plugin instance must build an instance of this class for each of the result of its query method, and return a list of such instances. A EOProduct has a `location` attribute that initially points to its remote location, but is later changed to point to its path on the filesystem when the product has been downloaded. It also has a `remote_location` that always points to the remote location, so that the product can be downloaded at anytime if it is deleted from the filesystem. An EOProduct instance also has a reference to the search parameters that led to its creation. :param provider: The provider from which the product originates :param properties: The metadata of the product .. note:: The geojson spec `enforces <https://github.com/geojson/draft-geojson/pull/6>`_ the expression of geometries as WGS84 CRS (EPSG:4326) coordinates and EOProduct is intended to be transmitted as geojson between applications. Therefore it stores geometries in the before mentioned CRS. """ #: The provider from which the product originates provider: str #: The metadata of the product properties: dict[str, Any] #: The collection collection: Optional[str] #: The geometry of the product geometry: BaseGeometry #: The intersection between the product's geometry and the search area. search_intersection: Optional[BaseGeometry] #: The path to the product, either remote or local if downloaded location: str #: The remote path to the product remote_location: str #: Assets of the product assets: AssetsDict #: Driver enables additional methods to be called on the EOProduct driver: DatasetDriver #: Product data filename, stored during download filename: str #: Product search keyword arguments, stored during search search_kwargs: Any #: Datetime for download next try next_try: dt.datetime #: Stream for requests _stream: requests.Response #: HTTP response headers, stored during streamed download headers: CaseInsensitiveDict[str] #: Product size in bytes, stored during streamed download size: Optional[int] #: Backup of the original ``register_downloader``, set by search plugins that #: patch ``register_downloader`` on the instance register_downloader_only: Callable[..., None] def __init__( self, provider: str, properties: dict[str, Any], **kwargs: Any ) -> None: self.provider = provider self.collection = ( kwargs.get("collection") or properties.pop("collection", None) or properties.get("_collection") ) self.location = self.remote_location = properties.get("eodag:download_link", "") self.assets = AssetsDict(self) self.properties = { key: value for key, value in properties.items() if key != "geometry" and value != NOT_MAPPED and NOT_AVAILABLE not in str(value) and not key.startswith("_") } self.properties.setdefault( "datetime", self.properties.get("start_datetime") or self.properties.get("end_datetime"), ) # sort properties to have common stac properties first common_stac_properties = { key: self.properties[key] for key in sorted(self.properties) if ":" not in key } extensions_stac_properties = { key: self.properties[key] for key in sorted(self.properties) if ":" in key } self.properties = common_stac_properties | extensions_stac_properties if "geometry" not in properties or ( ( properties["geometry"] == NOT_AVAILABLE or properties["geometry"] == NOT_MAPPED ) and "eodag:default_geometry" not in properties ): product_geometry = DEFAULT_SHAPELY_GEOMETRY elif not properties["geometry"] or properties["geometry"] == NOT_AVAILABLE: product_geometry = properties.pop( "eodag:default_geometry", DEFAULT_GEOMETRY ) else: product_geometry = properties["geometry"] geometry_obj = get_geometry_from_various(geometry=product_geometry) # whole world as default geometry if geometry_obj is None: geometry_obj = DEFAULT_SHAPELY_GEOMETRY self.geometry = self.search_intersection = geometry_obj self.search_kwargs = kwargs if self.search_kwargs.get("geometry") is not None: searched_geom = get_geometry_from_various( **{"geometry": self.search_kwargs["geometry"]} ) try: self.search_intersection = self.geometry.intersection(searched_geom) except ShapelyError: logger.warning( "Unable to intersect the requested extent: %s with the product " "geometry: %s", searched_geom, product_geometry, ) self.search_intersection = None self.driver = self.get_driver() self.downloader: Optional[Union[Api, Download]] = None self.downloader_auth: Optional[Authentication] = None
[docs] def as_dict(self, skip_invalid: bool = True) -> dict[str, Any]: """Builds a representation of EOProduct as a dictionary to enable its geojson serialization :param skip_invalid: Whether to skip properties whose values are not valid according to the STAC specification. :returns: The representation of a :class:`~eodag.api.product._product.EOProduct` as a Python dict """ search_intersection = None if self.search_intersection is not None: search_intersection = orjson.loads( orjson.dumps(self.search_intersection.__geo_interface__) ) # product properties stac_properties = { **{ key: value for key, value in self.properties.items() if key not in ("geometry", "id") }, "eodag:provider": self.provider, "eodag:search_intersection": search_intersection, "federation:backends": [self.provider], } stac_providers = self.properties.get("providers", []) if not any("host" in p.get("roles", []) for p in stac_providers): stac_providers.append({"name": self.provider, "roles": ["host"]}) stac_properties["providers"] = stac_providers props_model = cast(type[CommonStacMetadata], create_stac_metadata_model()) props_validated = props_model.safe_validate( stac_properties, skip_invalid=skip_invalid ) stac_extensions: set[str] = set(props_validated.get_conformance_classes()) # skip invalid properties if skip_invalid: props_validated_dict = props_validated.model_dump( by_alias=False, exclude_unset=False ) pythonic_fields_properties = { props_model.get_field_from_alias(k): v for k, v in stac_properties.items() } invalid_properties = { k for k, v in pythonic_fields_properties.items() # keep none values if props_model.has_field(k) and props_validated_dict[k] is None and v is not None } for key in invalid_properties: stac_key = props_model.model_fields[key].alias or key stac_properties.pop(stac_key, None) # get conformance classes for assets properties assets_dict = {**self.assets.as_dict()} for asset_key, asset_properties in self.assets.as_dict().items(): asset_props_validated = props_model.safe_validate( asset_properties, skip_invalid=skip_invalid ) stac_extensions.update(asset_props_validated.get_conformance_classes()) # skip invalid assets properties if skip_invalid: invalid_asset_properties = { k for k in asset_properties.keys() if k not in asset_props_validated.model_dump() and props_model.has_field(k) } for key in invalid_asset_properties: assets_dict[asset_key].pop(key, None) geojson_repr: dict[str, Any] = { "type": "Feature", "geometry": orjson.loads(orjson.dumps(self.geometry.__geo_interface__)), "bbox": list(self.geometry.bounds), "id": self.properties["id"], "assets": assets_dict, "properties": stac_properties, "links": [ { "rel": "collection", "href": f"{self.collection}.json", "type": "application/json", }, ], "stac_extensions": list(stac_extensions), "stac_version": STAC_VERSION, "collection": self.collection, } return geojson_repr
[docs] def as_pystac_object(self, skip_invalid: bool = True) -> Item: """Builds a representation of EOProduct as a pystac Item to enable its manipulation with pystac methods :param skip_invalid: Whether to skip properties whose values are not valid according to the STAC specification. :returns: The representation of a :class:`~eodag.api.product._product.EOProduct` as a :class:`pystac.Item` """ prod_dict = self.as_dict(skip_invalid=skip_invalid) return Item.from_dict(prod_dict)
[docs] @classmethod def from_dict( cls, feature: dict[str, Any], dag: Optional[EODataAccessGateway] = None, raise_errors: bool = False, ) -> EOProduct: """Builds an :class:`~eodag.api.product._product.EOProduct` object from its serialized representation as a Python dict. :param feature: The representation of a :class:`~eodag.api.product._product.EOProduct` as a Python dict :param dag: (optional) The EODataAccessGateway instance to use for registering the product downloader. If not provided, the downloader and authenticator will not be registered. :param raise_errors: (optional) Whether to raise exceptions in case of errors during the deserialize process. If False, and if ``dag`` is given, several import methods will be tried: from serialized, from eodag-server, from known provider, from unknown provider. :returns: An instance of :class:`~eodag.api.product._product.EOProduct` :raises: :class:`~eodag.utils.exceptions.ValidationError` """ if dag is not None: # add a generic STAC provider that might be needed to handle the items dag.add_provider(GENERIC_STAC_PROVIDER) plugin_manager = dag._plugins_manager product = cls._from_stac_item( feature, plugin_manager, provider=None, raise_errors=raise_errors ) if product is None: raise ValidationError( "Unable to build EOProduct from the provided dictionary, no import method succeeded" ) else: product = cls._import_stac_item_from_serialized(feature) return product
[docs] @classmethod def from_file( cls, filepath: str, dag: Optional[EODataAccessGateway] = None, raise_errors: bool = False, ) -> EOProduct: """Builds an :class:`~eodag.api.product._product.EOProduct` object from a file containing its serialized representation as geojson. :param filepath: The path to the file containing the serialized representation of a product :param dag: (optional) The EODataAccessGateway instance to use for registering the product downloader. If not provided, the downloader and authenticator will not be registered. :param raise_errors: (optional) Whether to raise exceptions in case of errors during the deserialize process. If False, several import methods will be tried: from serialized, from eodag-server, from known provider, from unknown provider. :returns: An instance of :class:`~eodag.api.product._product.EOProduct` :raises: :class:`~eodag.utils.exceptions.ValidationError` """ with open(filepath, "r") as fh: feature = geojson.load(fh) return cls.from_dict(feature, dag=dag, raise_errors=raise_errors)
[docs] @classmethod def from_pystac( cls, item: Item, dag: Optional[EODataAccessGateway] = None, raise_errors: bool = False, ) -> EOProduct: """Builds an :class:`~eodag.api.product._product.EOProduct` object from a pystac Item. :param item: The :class:`pystac.Item` containing the metadata of the product :param dag: (optional) The EODataAccessGateway instance to use for registering the product downloader. If not provided, the downloader and authenticator will not be registered. :param raise_errors: (optional) Whether to raise exceptions in case of errors during the deserialize process. If False, several import methods will be tried: from serialized, from eodag-server, from known provider, from unknown provider. :returns: An instance of :class:`~eodag.api.product._product.EOProduct` :raises: :class:`~eodag.utils.exceptions.ValidationError` """ feature = item.to_dict() return cls.from_dict(feature, dag=dag, raise_errors=raise_errors)
@classmethod @_deprecated( reason="Please use 'EOProduct.from_dict' instead", version="4.1.0", ) def from_geojson(cls, feature: dict[str, Any]) -> EOProduct: """Builds an :class:`~eodag.api.product._product.EOProduct` object from its representation as geojson :param feature: The representation of a :class:`~eodag.api.product._product.EOProduct` as a Python dict :returns: An instance of :class:`~eodag.api.product._product.EOProduct` :raises: :class:`~eodag.utils.exceptions.ValidationError` """ return cls.from_dict(feature, raise_errors=True) # Implementation of geo-interface protocol (See # https://gist.github.com/sgillies/2217756) __geo_interface__ = property(as_dict) def _normalize_bands(self) -> None: """Normalize bands in properties and each asset from STAC 1.0 (``eo:bands`` / ``raster:bands``) to STAC 1.1 (``bands``), in place. """ normalize_bands(self.properties) for key in self.assets: normalize_bands(self.assets[key]) def __repr__(self) -> str: try: return "{}(id={}, provider={})".format( self.__class__.__name__, self.properties.get("id", "?"), self.provider ) except KeyError as e: raise MisconfiguredError( f"Unable to get {e.args[0]} key from EOProduct.properties" ) def _register_downloader_from_manager(self, plugins_manager: PluginManager) -> None: """Register the downloader and authenticator for this EOProduct using the provided plugins manager. This method is typically called after the EOProduct has been created and before any download operation is performed. :param plugins_manager: The plugins manager instance to use for retrieving the download and authentication plugins. """ download_plugin = plugins_manager.get_download_plugin(self) if len(self.assets) > 0: matching_url = next(iter(self.assets.values()))["href"] elif self.properties.get("order:status") != ONLINE_STATUS: matching_url = self.properties.get( "eodag:order_link" ) or self.properties.get("eodag:download_link") else: matching_url = self.properties.get("eodag:download_link") try: auth_plugin = next( plugins_manager.get_auth_plugins( self.provider, matching_url=matching_url, matching_conf=download_plugin.config, ) ) except StopIteration: auth_plugin = None self.register_downloader(download_plugin, auth_plugin) def register_downloader( self, downloader: Union[Api, Download], authenticator: Optional[Authentication] ) -> None: """Give to the product the information needed to download itself. :param downloader: The download method that it can use :class:`~eodag.plugins.download.base.Download` or :class:`~eodag.plugins.api.base.Api` :param authenticator: The authentication method needed to perform the download :class:`~eodag.plugins.authentication.base.Authentication` """ self.downloader = downloader self.downloader_auth = authenticator # resolve locations and properties if needed with downloader configuration location_attrs = ("location", "remote_location") for location_attr in location_attrs: if "%(" in getattr(self, location_attr): try: setattr( self, location_attr, getattr(self, location_attr) % vars(self.downloader.config), ) except ValueError as e: logger.debug( f"Could not resolve product.{location_attr} ({getattr(self, location_attr)})" f" in register_downloader: {str(e)}" ) for k, v in self.properties.items(): if isinstance(v, str) and "%(" in v: try: self.properties[k] = v % vars(self.downloader.config) except (TypeError, ValueError) as e: logger.debug( f"Could not resolve {k} property ({v}) in register_downloader: {str(e)}" )
[docs] def download( self, progress_callback: Optional[ProgressCallback] = None, executor: Optional[ThreadPoolExecutor] = None, wait: float = DEFAULT_DOWNLOAD_WAIT, timeout: float = DEFAULT_DOWNLOAD_TIMEOUT, **kwargs: Unpack[DownloadConf], ) -> str: """Download the EO product using the provided download plugin and the authenticator if necessary. The actual download of the product occurs only at the first call of this method. A side effect of this method is that it changes the ``location`` attribute of an EOProduct, from its remote address to the local address. :param progress_callback: (optional) A method or a callable object which takes a current size and a maximum size as inputs and handle progress bar creation and update to give the user a feedback on the download progress :param executor: (optional) An executor to download assets of the product in parallel if it has any. If ``None`` , a default executor will be created :param wait: (optional) If download fails, wait time in minutes between two download tries :param timeout: (optional) If download fails, maximum time in minutes before stop retrying to download :param kwargs: `output_dir` (str), `extract` (bool), `delete_archive` (bool) and `dl_url_params` (dict) can be provided as additional kwargs and will override any other values defined in a configuration file or with environment variables. :returns: The absolute path to the downloaded product on the local filesystem :raises: :class:`~eodag.utils.exceptions.PluginImplementationError` :raises: :class:`RuntimeError` """ if self.downloader is None: raise RuntimeError( "EO product is unable to download itself due to lacking of a " "download plugin" ) auth = ( self.downloader_auth.authenticate() if self.downloader_auth is not None else self.downloader_auth ) progress_callback, close_progress_callback = self._init_progress_bar( progress_callback, executor ) fs_path = self.downloader.download( self, auth=auth, progress_callback=progress_callback, executor=executor, wait=wait, timeout=timeout, **kwargs, ) # shutdown executor if it was not created during parallel product downloads if ( executor is not None and executor._thread_name_prefix != "eodag-download-all" ): executor.shutdown(wait=True) # close progress bar if needed if close_progress_callback: progress_callback.close() if fs_path is None: raise DownloadError("Missing file location returned by download process") logger.debug( "Product location updated from '%s' to '%s'", self.remote_location, self.location, ) if self.remote_location != NOT_AVAILABLE: logger.info( "Remote location of the product is still available through its " "'remote_location' property: %s", self.remote_location, ) return fs_path
[docs] def stream_download( self, byte_range: tuple[Optional[int], Optional[int]] = (None, None), compress: Literal["zip", "raw", "auto"] = "auto", wait: float = DEFAULT_DOWNLOAD_WAIT, timeout: float = DEFAULT_DOWNLOAD_TIMEOUT, **kwargs: Unpack[DownloadConf], ) -> StreamResponse: """Download as StreamResponse the EO product using the provided download plugin and the authenticator if necessary. :param byte_range: (optional) Tuple of first index / last index byte to read :param compress: (optional) "zip", "raw", "auto" :param wait: (optional) If download fails, wait time in minutes between two download tries :param timeout: (optional) If download fails, maximum time in minutes before stop retrying to download :param kwargs: additional kwargs like `dl_url_params` (dict) can be provided and will override any other values defined in a configuration file or with environment variables. :returns: StreamResponse Stream representation of a file :raises: :class:`~eodag.utils.exceptions.PluginImplementationError` :raises: :class:`RuntimeError` """ if self.downloader is None: raise RuntimeError( "EO product is unable to stream_download itself due to lacking of a " "download plugin" ) auth = ( self.downloader_auth.authenticate() if self.downloader_auth is not None else self.downloader_auth ) return self.downloader.stream_download( self, auth, byte_range, compress, wait=wait, timeout=timeout, **kwargs, )
def _init_progress_bar( self, progress_callback: Optional[ProgressCallback], executor: Optional[ThreadPoolExecutor], ) -> tuple[ProgressCallback, bool]: # determine position of the progress bar with a counter of executor passings # to avoid bar overwriting in case of parallel downloads count = executor._counter() if executor is not None else 1 # type: ignore # progress bar init if progress_callback is None: progress_callback = ProgressCallback(position=count) # one shot progress callback to close after download close_progress_callback = True else: close_progress_callback = False progress_callback.pos = count # update units as bar may have been previously used for extraction progress_callback.unit = "B" progress_callback.unit_scale = True progress_callback.desc = str(self.properties.get("id", "")) progress_callback.refresh() return (progress_callback, close_progress_callback) def _download_quicklook( self, quicklook_file: str, progress_callback: ProgressCallback, ssl_verify: Optional[bool] = None, auth: Optional[AuthBase] = None, ): """Download the quicklook image from the EOProduct's quicklook URL. This method performs an HTTP GET request to retrieve the quicklook image and saves it locally at the specified path. It optionally verifies SSL certificates, uses HTTP authentication, and can display a download progress if a callback is provided. :param quicklook_file: The full path (including filename) where the quicklook will be saved. :param progress_callback: A callable that accepts the current and total download sizes to display or log the download progress. It must support `reset(total)` and be callable with downloaded chunk sizes. :param ssl_verify: (optional) Whether to verify SSL certificates. Defaults to True. :param auth: (optional) Authentication credentials (e.g., tuple or object) used for the HTTP request if the resource requires authentication. :raises HTTPError: If the HTTP request to the quicklook URL fails. """ with requests.get( self.properties["eodag:quicklook"], stream=True, auth=auth, headers=USER_AGENT, timeout=DEFAULT_STREAM_REQUESTS_TIMEOUT, verify=ssl_verify, ) as stream: stream.raise_for_status() stream_size = int(stream.headers.get("Content-Length", 0)) progress_callback.reset(stream_size) with open(quicklook_file, "wb") as fhandle: for chunk in stream.iter_content(chunk_size=64 * 1024): if chunk: fhandle.write(chunk) progress_callback(len(chunk)) logger.info("Download recorded in %s", quicklook_file)
[docs] def get_quicklook( self, filename: Optional[str] = None, output_dir: Optional[str] = None, progress_callback: Optional[ProgressCallback] = None, ) -> str: """Download the quicklook image of a given EOProduct from its provider if it exists. This method retrieves the quicklook URL from the EOProduct metadata and delegates the download to the internal `download_quicklook` method. :param filename: (optional) The name to give to the downloaded quicklook. If not given, it defaults to the product's ID (without file extension). :param output_dir: (optional) The absolute path of the directory where to store the quicklooks in the filesystem. If not given, it defaults to the `quicklooks` directory under this EO product downloader's ``output_dir`` config param (e.g. '/tmp/quicklooks/') :param progress_callback: (optional) A method or a callable object which takes a current size and a maximum size as inputs and handle progress bar creation and update to give the user a feedback on the download progress :returns: The absolute path of the downloaded quicklook """ def format_quicklook_address() -> None: """If the quicklook address is a Python format string, resolve the formatting with the properties of the product.""" fstrmatch = re.match(r".*{.+}*.*", self.properties["eodag:quicklook"]) if fstrmatch: self.properties["eodag:quicklook"] = format_string( None, self.properties["eodag:quicklook"], **{ prop_key: prop_val for prop_key, prop_val in self.properties.items() if prop_key != "eodag:quicklook" }, ) if self.properties.get("eodag:quicklook") is None: logger.warning( "Missing information to retrieve quicklook for EO product: %s", self.properties["id"], ) return "" format_quicklook_address() if output_dir is not None: quicklooks_output_dir = os.path.abspath(os.path.realpath(output_dir)) else: tempdir = tempfile.gettempdir() downloader_output_dir = ( getattr(self.downloader.config, "output_dir", tempdir) if self.downloader else tempdir ) quicklooks_output_dir = os.path.join(downloader_output_dir, "quicklooks") if not os.path.isdir(quicklooks_output_dir): os.makedirs(quicklooks_output_dir) quicklook_file = os.path.join( quicklooks_output_dir, filename if filename is not None else self.properties["id"], ) if not os.path.isfile(quicklook_file): # progress bar init if progress_callback is None: progress_callback = ProgressCallback() # one shot progress callback to close after download close_progress_callback = True else: close_progress_callback = False # update units as bar may have been previously used for extraction progress_callback.unit = "B" progress_callback.unit_scale = True progress_callback.desc = "quicklooks/%s" % self.properties.get("id", "") # VERY SPECIAL CASE (introduced by the onda provider): first check if # it is a HTTP URL. If not, we assume it is a base64 string, in which case # we just decode the content, write it into the quicklook_file and return it. if not ( self.properties["eodag:quicklook"].startswith("http") or self.properties["eodag:quicklook"].startswith("https") ): with open(quicklook_file, "wb") as fd: img = self.properties["eodag:quicklook"].encode("ascii") fd.write(base64.b64decode(img)) return quicklook_file auth = ( self.downloader_auth.authenticate() if self.downloader_auth is not None else None ) if not isinstance(auth, AuthBase): auth = None # Read the ssl_verify parameter used on the provider config # to ensure the same behavior for get_quicklook as other download functions ssl_verify = ( getattr(self.downloader.config, "ssl_verify", True) if self.downloader else True ) try: self._download_quicklook( quicklook_file, progress_callback, ssl_verify, auth ) except RequestException as e: logger.debug( f"Error while getting resource with authentication. {e} \nTrying without authentication..." ) try: self._download_quicklook( quicklook_file, progress_callback, ssl_verify, None ) except RequestException as e_no_auth: logger.error( f"Failed to get resource with authentication: {e} \n \ Failed to get resource even without authentication. {e_no_auth}" ) return "" # close progress bar if needed if close_progress_callback: progress_callback.close() return quicklook_file
def get_driver(self) -> DatasetDriver: """Get the most appropriate driver""" for driver_conf in DRIVERS: # Select a driver if all criterias match match = True for criteria in driver_conf["criteria"]: if not criteria(self): match = False break if match: return driver_conf["driver"] return GenericDriver() def _repr_html_(self): thumbnail = self.properties.get("eodag:thumbnail") or self.properties.get( "eodag:quicklook" ) thumbnail_html = ( f"<img src='{thumbnail}' width=100 alt='thumbnail'/>" if thumbnail and not thumbnail.startswith("s3") else "" ) geom_style = "style='color: grey; text-align: center; min-width:100px; vertical-align: top;'" thumbnail_style = ( "style='padding-top: 1.5em; min-width:100px; vertical-align: top;'" ) return f"""<table> <thead><tr style='background-color: transparent;'><td style='text-align: left; color: grey;'> {type(self).__name__} </td></tr></thead> <tr style='background-color: transparent;'> <td style='text-align: left; vertical-align: top;'> {dict_to_html_table({ "provider": self.provider, "collection": self.collection, "properties[&quot;id&quot;]": self.properties.get('id'), "properties[&quot;start_datetime&quot;]": self.properties.get( 'start_datetime' ), "properties[&quot;end_datetime&quot;]": self.properties.get( 'end_datetime' ), }, brackets=False)} <details><summary style='color: grey; margin-top: 10px;'>properties:&ensp;({len( self.properties)})</summary>{ dict_to_html_table(self.properties, depth=1)}</details> <details><summary style='color: grey; margin-top: 10px;'>assets:&ensp;({len( self.assets)})</summary>{self.assets._repr_html_(embeded=True)}</details> </td> <td {geom_style} title='geometry'>geometry<br />{self.geometry._repr_svg_()}</td> <td {thumbnail_style} title='properties[&quot;thumbnail&quot;]'>{thumbnail_html}</td> </tr> </table>""" def to_xarray( self, asset_key: Optional[str] = None, wait: float = DEFAULT_DOWNLOAD_WAIT, timeout: float = DEFAULT_DOWNLOAD_TIMEOUT, roles: Iterable[str] = {"data", "data-mask"}, **xarray_kwargs: Any, ): """ Return product data as a dictionary of :class:`xarray.Dataset`. :param asset_key: (optional) key of the asset. If not specified the whole product data will be retrieved :param wait: (optional) If order is needed, wait time in minutes between two order status check :param timeout: (optional) If order is needed, maximum time in minutes before stop checking order status :param roles: (optional) roles of assets that must be fetched :param xarray_kwargs: (optional) keyword arguments passed to :func:`xarray.open_dataset` :returns: a dictionary of :class:`xarray.Dataset` """ raise NotImplementedError("Install eodag-cube to make this method available.") def augment_from_xarray( self, roles: Iterable[str] = {"data", "data-mask"}, ) -> EOProduct: """ Annotate the product properties and assets with STAC metadata got by fetching its xarray representation. :param roles: (optional) roles of assets that must be fetched :returns: updated EOProduct """ raise NotImplementedError("Install eodag-cube to make this method available.") @classmethod def _import_stac_item_from_serialized( cls, feature: dict[str, Any], plugins_manager: Optional[PluginManager] = None ) -> EOProduct: """Import a STAC item from a EODAG serialized EOProduct. :param feature: A STAC item as a dictionary :param plugins_manager: The EODAG plugin manager instance :returns: An EOProduct created from the STAC item :raises: :class:`~eodag.utils.exceptions.ValidationError` """ try: collection = feature.get("collection") properties = deepcopy(feature["properties"]) properties["geometry"] = feature["geometry"] properties["id"] = feature["id"] provider = properties.pop("eodag:provider") search_intersection = properties.pop("eodag:search_intersection") except KeyError as e: raise ValidationError( "Key %s not found in geojson, make sure it comes from a serialized SearchResult or EOProduct" % e.args[0] ) from e obj = cls(provider, properties, collection=collection) obj.search_intersection = geometry.shape(search_intersection) obj.assets.update(feature.get("assets", {})) if plugins_manager is not None: # register downloader = plugins_manager.get_download_plugin(obj) auth = obj.downloader_auth if auth is None: auth = plugins_manager.get_auth_plugin(downloader, obj) obj.register_downloader(downloader, auth) return obj @classmethod def _from_stac_item( cls, feature: dict[str, Any], plugins_manager: PluginManager, provider: Optional[str] = None, raise_errors: bool = False, ) -> Optional[EOProduct]: """Create a SearchResult from a STAC item. :param feature: A STAC item as a dictionary :param plugins_manager: The EODAG plugin manager instance :provider: (optional) The provider to which the STAC item belongs, if known. If not provided, the method will try to determine it from the STAC item properties. :param raise_errors: (optional) Whether to raise exceptions in case of errors during the deserialize process. If False, several import methods will be tried: from serialized, from eodag-server, from known provider, from unknown provider. :returns: An EOProduct created from the STAC item """ result: Optional[EOProduct] = None try: # try importing from a serialized EODAG EOProduct if result := cls._import_stac_item_from_serialized( feature, plugins_manager ): return result except ValidationError: if raise_errors: raise # Try importing from EODAG Server if result := _import_stac_item_from_eodag_server(feature, plugins_manager): return result # try importing from a known STAC provider if result := _import_stac_item_from_known_provider( feature, plugins_manager, provider ): return result # try importing from an unknown STAC provider return _import_stac_item_from_unknown_provider(feature, plugins_manager)