# -*- coding: utf-8 -*-
# Copyright 2018, CS GROUP - France, https://www.csgroup.eu/
#
# This file is part of EODAG project
# https://www.github.com/CS-SI/EODAG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import base64
import datetime as dt
import logging
import os
import re
import tempfile
from typing import (
TYPE_CHECKING,
Any,
Callable,
Iterable,
Literal,
Optional,
Union,
cast,
)
import geojson
import orjson
import requests
from pystac import Item
from requests import RequestException
from requests.auth import AuthBase
from shapely import geometry
from shapely.errors import ShapelyError
from eodag.types.queryables import CommonStacMetadata
from eodag.types.stac_metadata import create_stac_metadata_model
try:
# import from eodag-cube if installed
from eodag_cube.api.product import ( # pyright: ignore[reportMissingImports]
AssetsDict,
)
except ImportError:
from ._assets import AssetsDict
from eodag.api.product.drivers import DRIVERS
from eodag.api.product.drivers.generic import GenericDriver
from eodag.api.product.metadata_mapping import (
DEFAULT_GEOMETRY,
NOT_AVAILABLE,
NOT_MAPPED,
ONLINE_STATUS,
normalize_bands,
)
from eodag.utils import (
DEFAULT_DOWNLOAD_TIMEOUT,
DEFAULT_DOWNLOAD_WAIT,
DEFAULT_SHAPELY_GEOMETRY,
DEFAULT_STREAM_REQUESTS_TIMEOUT,
GENERIC_STAC_PROVIDER,
STAC_VERSION,
USER_AGENT,
ProgressCallback,
StreamResponse,
_deprecated,
deepcopy,
format_string,
get_geometry_from_various,
)
from eodag.utils.deserialize import (
_import_stac_item_from_eodag_server,
_import_stac_item_from_known_provider,
_import_stac_item_from_unknown_provider,
)
from eodag.utils.exceptions import DownloadError, MisconfiguredError, ValidationError
from eodag.utils.repr import dict_to_html_table
if TYPE_CHECKING:
from concurrent.futures import ThreadPoolExecutor
from requests.structures import CaseInsensitiveDict
from shapely.geometry.base import BaseGeometry
from eodag import EODataAccessGateway
from eodag.api.product.drivers.base import DatasetDriver
from eodag.plugins.apis.base import Api
from eodag.plugins.authentication.base import Authentication
from eodag.plugins.download.base import Download
from eodag.plugins.manager import PluginManager
from eodag.types.download_args import DownloadConf
from eodag.utils import Unpack
logger = logging.getLogger("eodag.product")
[docs]
class EOProduct:
"""A wrapper around an Earth Observation Product originating from a search.
Every Search plugin instance must build an instance of this class for each of
the result of its query method, and return a list of such instances. A EOProduct
has a `location` attribute that initially points to its remote location, but is
later changed to point to its path on the filesystem when the product has been
downloaded. It also has a `remote_location` that always points to the remote
location, so that the product can be downloaded at anytime if it is deleted from
the filesystem. An EOProduct instance also has a reference to the search
parameters that led to its creation.
:param provider: The provider from which the product originates
:param properties: The metadata of the product
.. note::
The geojson spec `enforces <https://github.com/geojson/draft-geojson/pull/6>`_
the expression of geometries as
WGS84 CRS (EPSG:4326) coordinates and EOProduct is intended to be transmitted
as geojson between applications. Therefore it stores geometries in the before
mentioned CRS.
"""
#: The provider from which the product originates
provider: str
#: The metadata of the product
properties: dict[str, Any]
#: The collection
collection: Optional[str]
#: The geometry of the product
geometry: BaseGeometry
#: The intersection between the product's geometry and the search area.
search_intersection: Optional[BaseGeometry]
#: The path to the product, either remote or local if downloaded
location: str
#: The remote path to the product
remote_location: str
#: Assets of the product
assets: AssetsDict
#: Driver enables additional methods to be called on the EOProduct
driver: DatasetDriver
#: Product data filename, stored during download
filename: str
#: Product search keyword arguments, stored during search
search_kwargs: Any
#: Datetime for download next try
next_try: dt.datetime
#: Stream for requests
_stream: requests.Response
#: HTTP response headers, stored during streamed download
headers: CaseInsensitiveDict[str]
#: Product size in bytes, stored during streamed download
size: Optional[int]
#: Backup of the original ``register_downloader``, set by search plugins that
#: patch ``register_downloader`` on the instance
register_downloader_only: Callable[..., None]
def __init__(
self, provider: str, properties: dict[str, Any], **kwargs: Any
) -> None:
self.provider = provider
self.collection = (
kwargs.get("collection")
or properties.pop("collection", None)
or properties.get("_collection")
)
self.location = self.remote_location = properties.get("eodag:download_link", "")
self.assets = AssetsDict(self)
self.properties = {
key: value
for key, value in properties.items()
if key != "geometry"
and value != NOT_MAPPED
and NOT_AVAILABLE not in str(value)
and not key.startswith("_")
}
self.properties.setdefault(
"datetime",
self.properties.get("start_datetime")
or self.properties.get("end_datetime"),
)
# sort properties to have common stac properties first
common_stac_properties = {
key: self.properties[key]
for key in sorted(self.properties)
if ":" not in key
}
extensions_stac_properties = {
key: self.properties[key] for key in sorted(self.properties) if ":" in key
}
self.properties = common_stac_properties | extensions_stac_properties
if "geometry" not in properties or (
(
properties["geometry"] == NOT_AVAILABLE
or properties["geometry"] == NOT_MAPPED
)
and "eodag:default_geometry" not in properties
):
product_geometry = DEFAULT_SHAPELY_GEOMETRY
elif not properties["geometry"] or properties["geometry"] == NOT_AVAILABLE:
product_geometry = properties.pop(
"eodag:default_geometry", DEFAULT_GEOMETRY
)
else:
product_geometry = properties["geometry"]
geometry_obj = get_geometry_from_various(geometry=product_geometry)
# whole world as default geometry
if geometry_obj is None:
geometry_obj = DEFAULT_SHAPELY_GEOMETRY
self.geometry = self.search_intersection = geometry_obj
self.search_kwargs = kwargs
if self.search_kwargs.get("geometry") is not None:
searched_geom = get_geometry_from_various(
**{"geometry": self.search_kwargs["geometry"]}
)
try:
self.search_intersection = self.geometry.intersection(searched_geom)
except ShapelyError:
logger.warning(
"Unable to intersect the requested extent: %s with the product "
"geometry: %s",
searched_geom,
product_geometry,
)
self.search_intersection = None
self.driver = self.get_driver()
self.downloader: Optional[Union[Api, Download]] = None
self.downloader_auth: Optional[Authentication] = None
[docs]
def as_dict(self, skip_invalid: bool = True) -> dict[str, Any]:
"""Builds a representation of EOProduct as a dictionary to enable its geojson
serialization
:param skip_invalid: Whether to skip properties whose values are not valid according to the STAC specification.
:returns: The representation of a :class:`~eodag.api.product._product.EOProduct` as a
Python dict
"""
search_intersection = None
if self.search_intersection is not None:
search_intersection = orjson.loads(
orjson.dumps(self.search_intersection.__geo_interface__)
)
# product properties
stac_properties = {
**{
key: value
for key, value in self.properties.items()
if key not in ("geometry", "id")
},
"eodag:provider": self.provider,
"eodag:search_intersection": search_intersection,
"federation:backends": [self.provider],
}
stac_providers = self.properties.get("providers", [])
if not any("host" in p.get("roles", []) for p in stac_providers):
stac_providers.append({"name": self.provider, "roles": ["host"]})
stac_properties["providers"] = stac_providers
props_model = cast(type[CommonStacMetadata], create_stac_metadata_model())
props_validated = props_model.safe_validate(
stac_properties, skip_invalid=skip_invalid
)
stac_extensions: set[str] = set(props_validated.get_conformance_classes())
# skip invalid properties
if skip_invalid:
props_validated_dict = props_validated.model_dump(
by_alias=False, exclude_unset=False
)
pythonic_fields_properties = {
props_model.get_field_from_alias(k): v
for k, v in stac_properties.items()
}
invalid_properties = {
k
for k, v in pythonic_fields_properties.items()
# keep none values
if props_model.has_field(k)
and props_validated_dict[k] is None
and v is not None
}
for key in invalid_properties:
stac_key = props_model.model_fields[key].alias or key
stac_properties.pop(stac_key, None)
# get conformance classes for assets properties
assets_dict = {**self.assets.as_dict()}
for asset_key, asset_properties in self.assets.as_dict().items():
asset_props_validated = props_model.safe_validate(
asset_properties, skip_invalid=skip_invalid
)
stac_extensions.update(asset_props_validated.get_conformance_classes())
# skip invalid assets properties
if skip_invalid:
invalid_asset_properties = {
k
for k in asset_properties.keys()
if k not in asset_props_validated.model_dump()
and props_model.has_field(k)
}
for key in invalid_asset_properties:
assets_dict[asset_key].pop(key, None)
geojson_repr: dict[str, Any] = {
"type": "Feature",
"geometry": orjson.loads(orjson.dumps(self.geometry.__geo_interface__)),
"bbox": list(self.geometry.bounds),
"id": self.properties["id"],
"assets": assets_dict,
"properties": stac_properties,
"links": [
{
"rel": "collection",
"href": f"{self.collection}.json",
"type": "application/json",
},
],
"stac_extensions": list(stac_extensions),
"stac_version": STAC_VERSION,
"collection": self.collection,
}
return geojson_repr
[docs]
def as_pystac_object(self, skip_invalid: bool = True) -> Item:
"""Builds a representation of EOProduct as a pystac Item to enable its manipulation with pystac methods
:param skip_invalid: Whether to skip properties whose values are not valid according to the STAC specification.
:returns: The representation of a :class:`~eodag.api.product._product.EOProduct` as a :class:`pystac.Item`
"""
prod_dict = self.as_dict(skip_invalid=skip_invalid)
return Item.from_dict(prod_dict)
[docs]
@classmethod
def from_dict(
cls,
feature: dict[str, Any],
dag: Optional[EODataAccessGateway] = None,
raise_errors: bool = False,
) -> EOProduct:
"""Builds an :class:`~eodag.api.product._product.EOProduct` object from its
serialized representation as a Python dict.
:param feature: The representation of a :class:`~eodag.api.product._product.EOProduct`
as a Python dict
:param dag: (optional) The EODataAccessGateway instance to use for registering the product downloader. If not
provided, the downloader and authenticator will not be registered.
:param raise_errors: (optional) Whether to raise exceptions in case of errors during the deserialize process.
If False, and if ``dag`` is given, several import methods will be tried: from serialized,
from eodag-server, from known provider, from unknown provider.
:returns: An instance of :class:`~eodag.api.product._product.EOProduct`
:raises: :class:`~eodag.utils.exceptions.ValidationError`
"""
if dag is not None:
# add a generic STAC provider that might be needed to handle the items
dag.add_provider(GENERIC_STAC_PROVIDER)
plugin_manager = dag._plugins_manager
product = cls._from_stac_item(
feature, plugin_manager, provider=None, raise_errors=raise_errors
)
if product is None:
raise ValidationError(
"Unable to build EOProduct from the provided dictionary, no import method succeeded"
)
else:
product = cls._import_stac_item_from_serialized(feature)
return product
[docs]
@classmethod
def from_file(
cls,
filepath: str,
dag: Optional[EODataAccessGateway] = None,
raise_errors: bool = False,
) -> EOProduct:
"""Builds an :class:`~eodag.api.product._product.EOProduct` object from a file containing its serialized
representation as geojson.
:param filepath: The path to the file containing the serialized representation of a product
:param dag: (optional) The EODataAccessGateway instance to use for registering the product downloader. If not
provided, the downloader and authenticator will not be registered.
:param raise_errors: (optional) Whether to raise exceptions in case of errors during the deserialize process.
If False, several import methods will be tried: from serialized, from eodag-server, from
known provider, from unknown provider.
:returns: An instance of :class:`~eodag.api.product._product.EOProduct`
:raises: :class:`~eodag.utils.exceptions.ValidationError`
"""
with open(filepath, "r") as fh:
feature = geojson.load(fh)
return cls.from_dict(feature, dag=dag, raise_errors=raise_errors)
[docs]
@classmethod
def from_pystac(
cls,
item: Item,
dag: Optional[EODataAccessGateway] = None,
raise_errors: bool = False,
) -> EOProduct:
"""Builds an :class:`~eodag.api.product._product.EOProduct` object from a pystac Item.
:param item: The :class:`pystac.Item` containing the metadata of the product
:param dag: (optional) The EODataAccessGateway instance to use for registering the product downloader. If not
provided, the downloader and authenticator will not be registered.
:param raise_errors: (optional) Whether to raise exceptions in case of errors during the deserialize process.
If False, several import methods will be tried: from serialized, from eodag-server, from
known provider, from unknown provider.
:returns: An instance of :class:`~eodag.api.product._product.EOProduct`
:raises: :class:`~eodag.utils.exceptions.ValidationError`
"""
feature = item.to_dict()
return cls.from_dict(feature, dag=dag, raise_errors=raise_errors)
@classmethod
@_deprecated(
reason="Please use 'EOProduct.from_dict' instead",
version="4.1.0",
)
def from_geojson(cls, feature: dict[str, Any]) -> EOProduct:
"""Builds an :class:`~eodag.api.product._product.EOProduct` object from its
representation as geojson
:param feature: The representation of a :class:`~eodag.api.product._product.EOProduct`
as a Python dict
:returns: An instance of :class:`~eodag.api.product._product.EOProduct`
:raises: :class:`~eodag.utils.exceptions.ValidationError`
"""
return cls.from_dict(feature, raise_errors=True)
# Implementation of geo-interface protocol (See
# https://gist.github.com/sgillies/2217756)
__geo_interface__ = property(as_dict)
def _normalize_bands(self) -> None:
"""Normalize bands in properties and each asset
from STAC 1.0 (``eo:bands`` / ``raster:bands``) to STAC 1.1 (``bands``), in place.
"""
normalize_bands(self.properties)
for key in self.assets:
normalize_bands(self.assets[key])
def __repr__(self) -> str:
try:
return "{}(id={}, provider={})".format(
self.__class__.__name__, self.properties.get("id", "?"), self.provider
)
except KeyError as e:
raise MisconfiguredError(
f"Unable to get {e.args[0]} key from EOProduct.properties"
)
def _register_downloader_from_manager(self, plugins_manager: PluginManager) -> None:
"""Register the downloader and authenticator for this EOProduct using the
provided plugins manager.
This method is typically called after the EOProduct has been created and
before any download operation is performed.
:param plugins_manager: The plugins manager instance to use for retrieving
the download and authentication plugins.
"""
download_plugin = plugins_manager.get_download_plugin(self)
if len(self.assets) > 0:
matching_url = next(iter(self.assets.values()))["href"]
elif self.properties.get("order:status") != ONLINE_STATUS:
matching_url = self.properties.get(
"eodag:order_link"
) or self.properties.get("eodag:download_link")
else:
matching_url = self.properties.get("eodag:download_link")
try:
auth_plugin = next(
plugins_manager.get_auth_plugins(
self.provider,
matching_url=matching_url,
matching_conf=download_plugin.config,
)
)
except StopIteration:
auth_plugin = None
self.register_downloader(download_plugin, auth_plugin)
def register_downloader(
self, downloader: Union[Api, Download], authenticator: Optional[Authentication]
) -> None:
"""Give to the product the information needed to download itself.
:param downloader: The download method that it can use
:class:`~eodag.plugins.download.base.Download` or
:class:`~eodag.plugins.api.base.Api`
:param authenticator: The authentication method needed to perform the download
:class:`~eodag.plugins.authentication.base.Authentication`
"""
self.downloader = downloader
self.downloader_auth = authenticator
# resolve locations and properties if needed with downloader configuration
location_attrs = ("location", "remote_location")
for location_attr in location_attrs:
if "%(" in getattr(self, location_attr):
try:
setattr(
self,
location_attr,
getattr(self, location_attr) % vars(self.downloader.config),
)
except ValueError as e:
logger.debug(
f"Could not resolve product.{location_attr} ({getattr(self, location_attr)})"
f" in register_downloader: {str(e)}"
)
for k, v in self.properties.items():
if isinstance(v, str) and "%(" in v:
try:
self.properties[k] = v % vars(self.downloader.config)
except (TypeError, ValueError) as e:
logger.debug(
f"Could not resolve {k} property ({v}) in register_downloader: {str(e)}"
)
[docs]
def download(
self,
progress_callback: Optional[ProgressCallback] = None,
executor: Optional[ThreadPoolExecutor] = None,
wait: float = DEFAULT_DOWNLOAD_WAIT,
timeout: float = DEFAULT_DOWNLOAD_TIMEOUT,
**kwargs: Unpack[DownloadConf],
) -> str:
"""Download the EO product using the provided download plugin and the
authenticator if necessary.
The actual download of the product occurs only at the first call of this
method. A side effect of this method is that it changes the ``location``
attribute of an EOProduct, from its remote address to the local address.
:param progress_callback: (optional) A method or a callable object
which takes a current size and a maximum
size as inputs and handle progress bar
creation and update to give the user a
feedback on the download progress
:param executor: (optional) An executor to download assets of the product in parallel if it has any. If ``None``
, a default executor will be created
:param wait: (optional) If download fails, wait time in minutes between
two download tries
:param timeout: (optional) If download fails, maximum time in minutes
before stop retrying to download
:param kwargs: `output_dir` (str), `extract` (bool), `delete_archive` (bool)
and `dl_url_params` (dict) can be provided as additional kwargs
and will override any other values defined in a configuration
file or with environment variables.
:returns: The absolute path to the downloaded product on the local filesystem
:raises: :class:`~eodag.utils.exceptions.PluginImplementationError`
:raises: :class:`RuntimeError`
"""
if self.downloader is None:
raise RuntimeError(
"EO product is unable to download itself due to lacking of a "
"download plugin"
)
auth = (
self.downloader_auth.authenticate()
if self.downloader_auth is not None
else self.downloader_auth
)
progress_callback, close_progress_callback = self._init_progress_bar(
progress_callback, executor
)
fs_path = self.downloader.download(
self,
auth=auth,
progress_callback=progress_callback,
executor=executor,
wait=wait,
timeout=timeout,
**kwargs,
)
# shutdown executor if it was not created during parallel product downloads
if (
executor is not None
and executor._thread_name_prefix != "eodag-download-all"
):
executor.shutdown(wait=True)
# close progress bar if needed
if close_progress_callback:
progress_callback.close()
if fs_path is None:
raise DownloadError("Missing file location returned by download process")
logger.debug(
"Product location updated from '%s' to '%s'",
self.remote_location,
self.location,
)
if self.remote_location != NOT_AVAILABLE:
logger.info(
"Remote location of the product is still available through its "
"'remote_location' property: %s",
self.remote_location,
)
return fs_path
[docs]
def stream_download(
self,
byte_range: tuple[Optional[int], Optional[int]] = (None, None),
compress: Literal["zip", "raw", "auto"] = "auto",
wait: float = DEFAULT_DOWNLOAD_WAIT,
timeout: float = DEFAULT_DOWNLOAD_TIMEOUT,
**kwargs: Unpack[DownloadConf],
) -> StreamResponse:
"""Download as StreamResponse the EO product using the provided download plugin and the
authenticator if necessary.
:param byte_range: (optional) Tuple of first index / last index byte to read
:param compress: (optional) "zip", "raw", "auto"
:param wait: (optional) If download fails, wait time in minutes between
two download tries
:param timeout: (optional) If download fails, maximum time in minutes
before stop retrying to download
:param kwargs: additional kwargs like `dl_url_params` (dict) can be provided
and will override any other values defined in a configuration
file or with environment variables.
:returns: StreamResponse Stream representation of a file
:raises: :class:`~eodag.utils.exceptions.PluginImplementationError`
:raises: :class:`RuntimeError`
"""
if self.downloader is None:
raise RuntimeError(
"EO product is unable to stream_download itself due to lacking of a "
"download plugin"
)
auth = (
self.downloader_auth.authenticate()
if self.downloader_auth is not None
else self.downloader_auth
)
return self.downloader.stream_download(
self,
auth,
byte_range,
compress,
wait=wait,
timeout=timeout,
**kwargs,
)
def _init_progress_bar(
self,
progress_callback: Optional[ProgressCallback],
executor: Optional[ThreadPoolExecutor],
) -> tuple[ProgressCallback, bool]:
# determine position of the progress bar with a counter of executor passings
# to avoid bar overwriting in case of parallel downloads
count = executor._counter() if executor is not None else 1 # type: ignore
# progress bar init
if progress_callback is None:
progress_callback = ProgressCallback(position=count)
# one shot progress callback to close after download
close_progress_callback = True
else:
close_progress_callback = False
progress_callback.pos = count
# update units as bar may have been previously used for extraction
progress_callback.unit = "B"
progress_callback.unit_scale = True
progress_callback.desc = str(self.properties.get("id", ""))
progress_callback.refresh()
return (progress_callback, close_progress_callback)
def _download_quicklook(
self,
quicklook_file: str,
progress_callback: ProgressCallback,
ssl_verify: Optional[bool] = None,
auth: Optional[AuthBase] = None,
):
"""Download the quicklook image from the EOProduct's quicklook URL.
This method performs an HTTP GET request to retrieve the quicklook image and saves it
locally at the specified path. It optionally verifies SSL certificates, uses HTTP
authentication, and can display a download progress if a callback is provided.
:param quicklook_file: The full path (including filename) where the quicklook will be saved.
:param progress_callback: A callable that accepts the current and total download sizes
to display or log the download progress. It must support `reset(total)`
and be callable with downloaded chunk sizes.
:param ssl_verify: (optional) Whether to verify SSL certificates. Defaults to True.
:param auth: (optional) Authentication credentials (e.g., tuple or object) used for the
HTTP request if the resource requires authentication.
:raises HTTPError: If the HTTP request to the quicklook URL fails.
"""
with requests.get(
self.properties["eodag:quicklook"],
stream=True,
auth=auth,
headers=USER_AGENT,
timeout=DEFAULT_STREAM_REQUESTS_TIMEOUT,
verify=ssl_verify,
) as stream:
stream.raise_for_status()
stream_size = int(stream.headers.get("Content-Length", 0))
progress_callback.reset(stream_size)
with open(quicklook_file, "wb") as fhandle:
for chunk in stream.iter_content(chunk_size=64 * 1024):
if chunk:
fhandle.write(chunk)
progress_callback(len(chunk))
logger.info("Download recorded in %s", quicklook_file)
[docs]
def get_quicklook(
self,
filename: Optional[str] = None,
output_dir: Optional[str] = None,
progress_callback: Optional[ProgressCallback] = None,
) -> str:
"""Download the quicklook image of a given EOProduct from its provider if it
exists.
This method retrieves the quicklook URL from the EOProduct metadata and delegates
the download to the internal `download_quicklook` method.
:param filename: (optional) The name to give to the downloaded quicklook. If not
given, it defaults to the product's ID (without file extension).
:param output_dir: (optional) The absolute path of the directory where to store
the quicklooks in the filesystem. If not given, it defaults to the
`quicklooks` directory under this EO product downloader's ``output_dir``
config param (e.g. '/tmp/quicklooks/')
:param progress_callback: (optional) A method or a callable object which takes
a current size and a maximum size as inputs and handle progress bar
creation and update to give the user a feedback on the download progress
:returns: The absolute path of the downloaded quicklook
"""
def format_quicklook_address() -> None:
"""If the quicklook address is a Python format string, resolve the
formatting with the properties of the product."""
fstrmatch = re.match(r".*{.+}*.*", self.properties["eodag:quicklook"])
if fstrmatch:
self.properties["eodag:quicklook"] = format_string(
None,
self.properties["eodag:quicklook"],
**{
prop_key: prop_val
for prop_key, prop_val in self.properties.items()
if prop_key != "eodag:quicklook"
},
)
if self.properties.get("eodag:quicklook") is None:
logger.warning(
"Missing information to retrieve quicklook for EO product: %s",
self.properties["id"],
)
return ""
format_quicklook_address()
if output_dir is not None:
quicklooks_output_dir = os.path.abspath(os.path.realpath(output_dir))
else:
tempdir = tempfile.gettempdir()
downloader_output_dir = (
getattr(self.downloader.config, "output_dir", tempdir)
if self.downloader
else tempdir
)
quicklooks_output_dir = os.path.join(downloader_output_dir, "quicklooks")
if not os.path.isdir(quicklooks_output_dir):
os.makedirs(quicklooks_output_dir)
quicklook_file = os.path.join(
quicklooks_output_dir,
filename if filename is not None else self.properties["id"],
)
if not os.path.isfile(quicklook_file):
# progress bar init
if progress_callback is None:
progress_callback = ProgressCallback()
# one shot progress callback to close after download
close_progress_callback = True
else:
close_progress_callback = False
# update units as bar may have been previously used for extraction
progress_callback.unit = "B"
progress_callback.unit_scale = True
progress_callback.desc = "quicklooks/%s" % self.properties.get("id", "")
# VERY SPECIAL CASE (introduced by the onda provider): first check if
# it is a HTTP URL. If not, we assume it is a base64 string, in which case
# we just decode the content, write it into the quicklook_file and return it.
if not (
self.properties["eodag:quicklook"].startswith("http")
or self.properties["eodag:quicklook"].startswith("https")
):
with open(quicklook_file, "wb") as fd:
img = self.properties["eodag:quicklook"].encode("ascii")
fd.write(base64.b64decode(img))
return quicklook_file
auth = (
self.downloader_auth.authenticate()
if self.downloader_auth is not None
else None
)
if not isinstance(auth, AuthBase):
auth = None
# Read the ssl_verify parameter used on the provider config
# to ensure the same behavior for get_quicklook as other download functions
ssl_verify = (
getattr(self.downloader.config, "ssl_verify", True)
if self.downloader
else True
)
try:
self._download_quicklook(
quicklook_file, progress_callback, ssl_verify, auth
)
except RequestException as e:
logger.debug(
f"Error while getting resource with authentication. {e} \nTrying without authentication..."
)
try:
self._download_quicklook(
quicklook_file, progress_callback, ssl_verify, None
)
except RequestException as e_no_auth:
logger.error(
f"Failed to get resource with authentication: {e} \n \
Failed to get resource even without authentication. {e_no_auth}"
)
return ""
# close progress bar if needed
if close_progress_callback:
progress_callback.close()
return quicklook_file
def get_driver(self) -> DatasetDriver:
"""Get the most appropriate driver"""
for driver_conf in DRIVERS:
# Select a driver if all criterias match
match = True
for criteria in driver_conf["criteria"]:
if not criteria(self):
match = False
break
if match:
return driver_conf["driver"]
return GenericDriver()
def _repr_html_(self):
thumbnail = self.properties.get("eodag:thumbnail") or self.properties.get(
"eodag:quicklook"
)
thumbnail_html = (
f"<img src='{thumbnail}' width=100 alt='thumbnail'/>"
if thumbnail and not thumbnail.startswith("s3")
else ""
)
geom_style = "style='color: grey; text-align: center; min-width:100px; vertical-align: top;'"
thumbnail_style = (
"style='padding-top: 1.5em; min-width:100px; vertical-align: top;'"
)
return f"""<table>
<thead><tr style='background-color: transparent;'><td style='text-align: left; color: grey;'>
{type(self).__name__}
</td></tr></thead>
<tr style='background-color: transparent;'>
<td style='text-align: left; vertical-align: top;'>
{dict_to_html_table({
"provider": self.provider,
"collection": self.collection,
"properties["id"]": self.properties.get('id'),
"properties["start_datetime"]": self.properties.get(
'start_datetime'
),
"properties["end_datetime"]": self.properties.get(
'end_datetime'
),
}, brackets=False)}
<details><summary style='color: grey; margin-top: 10px;'>properties: ({len(
self.properties)})</summary>{
dict_to_html_table(self.properties, depth=1)}</details>
<details><summary style='color: grey; margin-top: 10px;'>assets: ({len(
self.assets)})</summary>{self.assets._repr_html_(embeded=True)}</details>
</td>
<td {geom_style} title='geometry'>geometry<br />{self.geometry._repr_svg_()}</td>
<td {thumbnail_style} title='properties["thumbnail"]'>{thumbnail_html}</td>
</tr>
</table>"""
def to_xarray(
self,
asset_key: Optional[str] = None,
wait: float = DEFAULT_DOWNLOAD_WAIT,
timeout: float = DEFAULT_DOWNLOAD_TIMEOUT,
roles: Iterable[str] = {"data", "data-mask"},
**xarray_kwargs: Any,
):
"""
Return product data as a dictionary of :class:`xarray.Dataset`.
:param asset_key: (optional) key of the asset. If not specified the whole
product data will be retrieved
:param wait: (optional) If order is needed, wait time in minutes between two
order status check
:param timeout: (optional) If order is needed, maximum time in minutes before
stop checking order status
:param roles: (optional) roles of assets that must be fetched
:param xarray_kwargs: (optional) keyword arguments passed to :func:`xarray.open_dataset`
:returns: a dictionary of :class:`xarray.Dataset`
"""
raise NotImplementedError("Install eodag-cube to make this method available.")
def augment_from_xarray(
self,
roles: Iterable[str] = {"data", "data-mask"},
) -> EOProduct:
"""
Annotate the product properties and assets with STAC metadata got by fetching its xarray representation.
:param roles: (optional) roles of assets that must be fetched
:returns: updated EOProduct
"""
raise NotImplementedError("Install eodag-cube to make this method available.")
@classmethod
def _import_stac_item_from_serialized(
cls, feature: dict[str, Any], plugins_manager: Optional[PluginManager] = None
) -> EOProduct:
"""Import a STAC item from a EODAG serialized EOProduct.
:param feature: A STAC item as a dictionary
:param plugins_manager: The EODAG plugin manager instance
:returns: An EOProduct created from the STAC item
:raises: :class:`~eodag.utils.exceptions.ValidationError`
"""
try:
collection = feature.get("collection")
properties = deepcopy(feature["properties"])
properties["geometry"] = feature["geometry"]
properties["id"] = feature["id"]
provider = properties.pop("eodag:provider")
search_intersection = properties.pop("eodag:search_intersection")
except KeyError as e:
raise ValidationError(
"Key %s not found in geojson, make sure it comes from a serialized SearchResult or EOProduct"
% e.args[0]
) from e
obj = cls(provider, properties, collection=collection)
obj.search_intersection = geometry.shape(search_intersection)
obj.assets.update(feature.get("assets", {}))
if plugins_manager is not None:
# register
downloader = plugins_manager.get_download_plugin(obj)
auth = obj.downloader_auth
if auth is None:
auth = plugins_manager.get_auth_plugin(downloader, obj)
obj.register_downloader(downloader, auth)
return obj
@classmethod
def _from_stac_item(
cls,
feature: dict[str, Any],
plugins_manager: PluginManager,
provider: Optional[str] = None,
raise_errors: bool = False,
) -> Optional[EOProduct]:
"""Create a SearchResult from a STAC item.
:param feature: A STAC item as a dictionary
:param plugins_manager: The EODAG plugin manager instance
:provider: (optional) The provider to which the STAC item belongs, if known. If not provided, the method will
try to determine it from the STAC item properties.
:param raise_errors: (optional) Whether to raise exceptions in case of errors during the deserialize process.
If False, several import methods will be tried: from serialized, from eodag-server, from
known provider, from unknown provider.
:returns: An EOProduct created from the STAC item
"""
result: Optional[EOProduct] = None
try:
# try importing from a serialized EODAG EOProduct
if result := cls._import_stac_item_from_serialized(
feature, plugins_manager
):
return result
except ValidationError:
if raise_errors:
raise
# Try importing from EODAG Server
if result := _import_stac_item_from_eodag_server(feature, plugins_manager):
return result
# try importing from a known STAC provider
if result := _import_stac_item_from_known_provider(
feature, plugins_manager, provider
):
return result
# try importing from an unknown STAC provider
return _import_stac_item_from_unknown_provider(feature, plugins_manager)