Source code for eodag.api.product._product

# -*- coding: utf-8 -*-
# Copyright 2018, CS GROUP - France, https://www.csgroup.eu/
#
# This file is part of EODAG project
#     https://www.github.com/CS-SI/EODAG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations

import base64
import logging
import os
import re
import urllib.parse
from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Union

import requests
from requests import RequestException
from shapely import geometry, wkb, wkt
from shapely.errors import ShapelyError

from eodag.api.product._assets import AssetsDict
from eodag.api.product.drivers import DRIVERS, NoDriver
from eodag.api.product.metadata_mapping import NOT_AVAILABLE, NOT_MAPPED
from eodag.utils import (
    DEFAULT_DOWNLOAD_TIMEOUT,
    DEFAULT_DOWNLOAD_WAIT,
    DEFAULT_STREAM_REQUESTS_TIMEOUT,
    USER_AGENT,
    ProgressCallback,
    get_geometry_from_various,
)
from eodag.utils.exceptions import DownloadError, MisconfiguredError

if TYPE_CHECKING:
    from shapely.geometry.base import BaseGeometry

    from eodag.api.product.drivers.base import DatasetDriver
    from eodag.plugins.apis.base import Api
    from eodag.plugins.authentication.base import Authentication
    from eodag.plugins.download.base import Download

try:
    from shapely.errors import GEOSException
except ImportError:
    # shapely < 2.0 compatibility
    from shapely.errors import TopologicalError as GEOSException


logger = logging.getLogger("eodag.product")


[docs]class EOProduct: """A wrapper around an Earth Observation Product originating from a search. Every Search plugin instance must build an instance of this class for each of the result of its query method, and return a list of such instances. A EOProduct has a `location` attribute that initially points to its remote location, but is later changed to point to its path on the filesystem when the product has been downloaded. It also has a `remote_location` that always points to the remote location, so that the product can be downloaded at anytime if it is deleted from the filesystem. An EOProduct instance also has a reference to the search parameters that led to its creation. :param provider: The provider from which the product originates :type provider: str :param properties: The metadata of the product :type properties: dict :ivar product_type: The product type :vartype product_type: str :ivar location: The path to the product, either remote or local if downloaded :vartype location: str :ivar remote_location: The remote path to the product :vartype remote_location: str :ivar search_kwargs: The search kwargs used by eodag to search for the product :vartype search_kwargs: Any :ivar geometry: The geometry of the product :vartype geometry: :class:`shapely.geometry.base.BaseGeometry` :ivar search_intersection: The intersection between the product's geometry and the search area. :vartype search_intersection: :class:`shapely.geometry.base.BaseGeometry` or None .. note:: The geojson spec `enforces <https://github.com/geojson/draft-geojson/pull/6>`_ the expression of geometries as WGS84 CRS (EPSG:4326) coordinates and EOProduct is intended to be transmitted as geojson between applications. Therefore it stores geometries in the before mentioned CRS. """ provider: str properties: Dict[str, Any] product_type: Optional[str] location: str remote_location: str search_kwargs: Any geometry: BaseGeometry search_intersection: Optional[BaseGeometry] assets: AssetsDict def __init__( self, provider: str, properties: Dict[str, Any], **kwargs: Any ) -> None: self.provider = provider self.product_type = kwargs.get("productType") self.location = self.remote_location = properties.get("downloadLink", "") self.assets = AssetsDict(self) self.properties = { key: value for key, value in properties.items() if key != "geometry" and value != NOT_MAPPED and NOT_AVAILABLE not in str(value) } if "geometry" not in properties or ( ( properties["geometry"] == NOT_AVAILABLE or properties["geometry"] == NOT_MAPPED ) and "defaultGeometry" not in properties ): raise MisconfiguredError( f"No geometry available to build EOProduct(id={properties.get('id', None)}, provider={provider})" ) elif properties["geometry"] == NOT_AVAILABLE: product_geometry = properties.pop("defaultGeometry") else: product_geometry = properties["geometry"] # Let's try 'latmin lonmin latmax lonmax' if isinstance(product_geometry, str): bbox_pattern = re.compile( r"^(-?\d+\.?\d*) (-?\d+\.?\d*) (-?\d+\.?\d*) (-?\d+\.?\d*)$" ) found_bbox = bbox_pattern.match(product_geometry) if found_bbox: coords = found_bbox.groups() if len(coords) == 4: product_geometry = geometry.box( float(coords[1]), float(coords[0]), float(coords[3]), float(coords[2]), ) # Best effort to understand provider specific geometry (the default is to # assume an object implementing the Geo Interface: see # https://gist.github.com/2217756) if isinstance(product_geometry, str): try: product_geometry = wkt.loads(product_geometry) except (ShapelyError, GEOSException): try: product_geometry = wkb.loads(product_geometry) # Also catching TypeError because product_geometry can be a # string and not a bytes string except (ShapelyError, GEOSException, TypeError): # Giv up! raise self.geometry = self.search_intersection = geometry.shape(product_geometry) self.search_kwargs = kwargs if self.search_kwargs.get("geometry") is not None: searched_geom = get_geometry_from_various( **{"geometry": self.search_kwargs["geometry"]} ) try: self.search_intersection = self.geometry.intersection(searched_geom) except (GEOSException, ShapelyError): logger.warning( "Unable to intersect the requested extent: %s with the product " "geometry: %s", searched_geom, product_geometry, ) self.search_intersection = None self.driver = self.get_driver() self.downloader: Optional[Union[Api, Download]] = None self.downloader_auth: Optional[Authentication] = None
[docs] def as_dict(self) -> Dict[str, Any]: """Builds a representation of EOProduct as a dictionary to enable its geojson serialization :returns: The representation of a :class:`~eodag.api.product._product.EOProduct` as a Python dict :rtype: dict """ search_intersection = None if self.search_intersection is not None: search_intersection = geometry.mapping(self.search_intersection) geojson_repr: Dict[str, Any] = { "type": "Feature", "geometry": geometry.mapping(self.geometry), "id": self.properties["id"], "assets": self.assets.as_dict(), "properties": { "eodag_product_type": self.product_type, "eodag_provider": self.provider, "eodag_search_intersection": search_intersection, **{ key: value for key, value in self.properties.items() if key not in ("geometry", "id") }, }, } return geojson_repr
[docs] @classmethod def from_geojson(cls, feature: Dict[str, Any]) -> EOProduct: """Builds an :class:`~eodag.api.product._product.EOProduct` object from its representation as geojson :param feature: The representation of a :class:`~eodag.api.product._product.EOProduct` as a Python dict :type feature: dict :returns: An instance of :class:`~eodag.api.product._product.EOProduct` :rtype: :class:`~eodag.api.product._product.EOProduct` """ properties = feature["properties"] properties["geometry"] = feature["geometry"] properties["id"] = feature["id"] provider = feature["properties"]["eodag_provider"] product_type = feature["properties"]["eodag_product_type"] obj = cls(provider, properties, productType=product_type) obj.search_intersection = geometry.shape( feature["properties"]["eodag_search_intersection"] ) obj.assets = AssetsDict(obj, feature.get("assets", {})) return obj
# Implementation of geo-interface protocol (See # https://gist.github.com/sgillies/2217756) __geo_interface__ = property(as_dict) def __repr__(self) -> str: try: return "{}(id={}, provider={})".format( self.__class__.__name__, self.properties["id"], self.provider ) except KeyError as e: raise MisconfiguredError( f"Unable to get {e.args[0]} key from EOProduct.properties" ) def register_downloader( self, downloader: Union[Api, Download], authenticator: Optional[Authentication] ) -> None: """Give to the product the information needed to download itself. :param downloader: The download method that it can use :type downloader: Concrete subclass of :class:`~eodag.plugins.download.base.Download` or :class:`~eodag.plugins.api.base.Api` :param authenticator: The authentication method needed to perform the download :type authenticator: Concrete subclass of :class:`~eodag.plugins.authentication.base.Authentication` """ self.downloader = downloader self.downloader_auth = authenticator # resolve locations and properties if needed with downloader configuration location_attrs = ("location", "remote_location") for location_attr in location_attrs: try: setattr( self, location_attr, urllib.parse.unquote(getattr(self, location_attr)) % vars(self.downloader.config), ) except ValueError as e: logger.debug( f"Could not resolve product.{location_attr} ({getattr(self, location_attr)})" f" in register_downloader: {str(e)}" ) for k, v in self.properties.items(): if isinstance(v, str): try: if "%" in v: parsed = urllib.parse.urlparse(v) prop = urllib.parse.unquote(parsed.path) % vars( self.downloader.config ) parsed = parsed._replace(path=urllib.parse.quote(prop)) self.properties[k] = urllib.parse.urlunparse(parsed) else: self.properties[k] = v % vars(self.downloader.config) except (TypeError, ValueError) as e: logger.debug( f"Could not resolve {k} property ({v}) in register_downloader: {str(e)}" )
[docs] def download( self, progress_callback: Optional[ProgressCallback] = None, wait: int = DEFAULT_DOWNLOAD_WAIT, timeout: int = DEFAULT_DOWNLOAD_TIMEOUT, **kwargs: Any, ) -> str: """Download the EO product using the provided download plugin and the authenticator if necessary. The actual download of the product occurs only at the first call of this method. A side effect of this method is that it changes the ``location`` attribute of an EOProduct, from its remote address to the local address. :param progress_callback: (optional) A method or a callable object which takes a current size and a maximum size as inputs and handle progress bar creation and update to give the user a feedback on the download progress :type progress_callback: :class:`~eodag.utils.ProgressCallback` or None :param wait: (optional) If download fails, wait time in minutes between two download tries :type wait: int :param timeout: (optional) If download fails, maximum time in minutes before stop retrying to download :type timeout: int :param kwargs: `outputs_prefix` (str), `extract` (bool), `delete_archive` (bool) and `dl_url_params` (dict) can be provided as additional kwargs and will override any other values defined in a configuration file or with environment variables. :type kwargs: Union[str, bool, dict] :returns: The absolute path to the downloaded product on the local filesystem :rtype: str :raises: :class:`~eodag.utils.exceptions.PluginImplementationError` :raises: :class:`RuntimeError` """ if self.downloader is None: raise RuntimeError( "EO product is unable to download itself due to lacking of a " "download plugin" ) auth = ( self.downloader_auth.authenticate() if self.downloader_auth is not None else self.downloader_auth ) # resolve remote location if needed with downloader configuration self.remote_location = urllib.parse.unquote(self.remote_location) % vars( self.downloader.config ) if not self.location.startswith("file"): self.location = urllib.parse.unquote(self.location) progress_callback, close_progress_callback = self._init_progress_bar( progress_callback ) fs_path = self.downloader.download( self, auth=auth, progress_callback=progress_callback, wait=wait, timeout=timeout, **kwargs, ) # close progress bar if needed if close_progress_callback: progress_callback.close() if fs_path is None: raise DownloadError("Missing file location returned by download process") logger.debug( "Product location updated from '%s' to '%s'", self.remote_location, self.location, ) logger.info( "Remote location of the product is still available through its " "'remote_location' property: %s", self.remote_location, ) return fs_path
def _init_progress_bar( self, progress_callback: Optional[ProgressCallback] ) -> Tuple[ProgressCallback, bool]: # progress bar init if progress_callback is None: progress_callback = ProgressCallback(position=1) # one shot progress callback to close after download close_progress_callback = True else: close_progress_callback = False # update units as bar may have been previously used for extraction progress_callback.unit = "B" progress_callback.unit_scale = True progress_callback.desc = str(self.properties.get("id", "")) progress_callback.refresh() return (progress_callback, close_progress_callback)
[docs] def get_quicklook( self, filename: Optional[str] = None, base_dir: Optional[str] = None, progress_callback: Optional[ProgressCallback] = None, ) -> str: """Download the quicklook image of a given EOProduct from its provider if it exists. :param filename: (optional) The name to give to the downloaded quicklook. If not given, it defaults to the product's ID (without file extension). :type filename: str :param base_dir: (optional) The absolute path of the directory where to store the quicklooks in the filesystem. If not given, it defaults to the `quicklooks` directory under this EO product downloader's ``outputs_prefix`` config param (e.g. '/tmp/quicklooks/') :type base_dir: str :param progress_callback: (optional) A method or a callable object which takes a current size and a maximum size as inputs and handle progress bar creation and update to give the user a feedback on the download progress :type progress_callback: :class:`~eodag.utils.ProgressCallback` or None :returns: The absolute path of the downloaded quicklook :rtype: str """ def format_quicklook_address() -> None: """If the quicklook address is a Python format string, resolve the formatting with the properties of the product.""" fstrmatch = re.match(r".*{.+}*.*", self.properties["quicklook"]) if fstrmatch: self.properties["quicklook"].format( { prop_key: prop_val for prop_key, prop_val in self.properties.items() if prop_key != "quicklook" } ) # progress bar init if progress_callback is None: progress_callback = ProgressCallback() # one shot progress callback to close after download close_progress_callback = True else: close_progress_callback = False # update units as bar may have been previously used for extraction progress_callback.unit = "B" progress_callback.unit_scale = True progress_callback.desc = "quicklooks/%s" % self.properties.get("id", "") if self.properties.get("quicklook", None) is None: logger.warning( "Missing information to retrieve quicklook for EO product: %s", self.properties["id"], ) return "" format_quicklook_address() if base_dir is not None: quicklooks_base_dir = os.path.abspath(os.path.realpath(base_dir)) else: quicklooks_base_dir = os.path.join( self.downloader.config.outputs_prefix, "quicklooks" ) if not os.path.isdir(quicklooks_base_dir): os.makedirs(quicklooks_base_dir) quicklook_file = os.path.join( quicklooks_base_dir, filename if filename is not None else self.properties["id"], ) if not os.path.isfile(quicklook_file): # VERY SPECIAL CASE (introduced by the onda provider): first check if # it is a HTTP URL. If not, we assume it is a base64 string, in which case # we just decode the content, write it into the quicklook_file and return it. if not ( self.properties["quicklook"].startswith("http") or self.properties["quicklook"].startswith("https") ): with open(quicklook_file, "wb") as fd: img = self.properties["quicklook"].encode("ascii") fd.write(base64.b64decode(img)) return quicklook_file auth = ( self.downloader_auth.authenticate() if self.downloader_auth is not None else None ) with requests.get( self.properties["quicklook"], stream=True, auth=auth, headers=USER_AGENT, timeout=DEFAULT_STREAM_REQUESTS_TIMEOUT, ) as stream: try: stream.raise_for_status() except RequestException as e: import traceback as tb logger.error("Error while getting resource :\n%s", tb.format_exc()) return str(e) else: stream_size = int(stream.headers.get("content-length", 0)) progress_callback.reset(stream_size) with open(quicklook_file, "wb") as fhandle: for chunk in stream.iter_content(chunk_size=64 * 1024): if chunk: fhandle.write(chunk) progress_callback(len(chunk)) logger.info("Download recorded in %s", quicklook_file) # close progress bar if needed if close_progress_callback: progress_callback.close() return quicklook_file
def get_driver(self) -> DatasetDriver: """Get the most appropriate driver""" try: for driver_conf in DRIVERS: if all([criteria(self) for criteria in driver_conf["criteria"]]): return driver_conf["driver"] except TypeError: logger.warning( "Drivers definition seems out-of-date, please update eodag-cube" ) pass return NoDriver()