# -*- coding: utf-8 -*-
# Copyright 2018, CS GROUP - France, https://www.csgroup.eu/
#
# This file is part of EODAG project
# https://www.github.com/CS-SI/EODAG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import hashlib
import logging
import os
import shutil
import tarfile
import tempfile
import zipfile
from datetime import datetime, timedelta
from time import sleep
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
List,
Optional,
Tuple,
TypeVar,
Union,
)
from eodag.plugins.base import PluginTopic
from eodag.utils import (
DEFAULT_DOWNLOAD_TIMEOUT,
DEFAULT_DOWNLOAD_WAIT,
ProgressCallback,
StreamResponse,
sanitize,
uri_to_path,
)
from eodag.utils.exceptions import (
AuthenticationError,
MisconfiguredError,
NotAvailableError,
)
from eodag.utils.notebook import NotebookWidgets
if TYPE_CHECKING:
from requests.auth import AuthBase
from eodag.api.product import EOProduct
from eodag.api.search_result import SearchResult
from eodag.config import PluginConfig
from eodag.types.download_args import DownloadConf
from eodag.utils import DownloadedCallback, Unpack
# Logger used by the base download plugin defined in this module
logger = logging.getLogger("eodag.download.base")
# Generic return type of the callables wrapped by the download retry decorator
T = TypeVar("T")
[docs]
class Download(PluginTopic):
    """Base Download Plugin.

    A Download plugin has two download methods that it must implement:

    - ``download``: download a single :class:`~eodag.api.product._product.EOProduct`
    - ``download_all``: download multiple products from a :class:`~eodag.api.search_result.SearchResult`

    They must:

    - download data in the ``outputs_prefix`` folder defined in the plugin's
      configuration or passed through kwargs
    - extract products from their archive (if relevant) if ``extract`` is set to True
      (True by default)
    - save a product in an archive/directory (in ``outputs_prefix``) whose name must be
      the product's ``title`` property
    - update the product's ``location`` attribute once its data is downloaded (and
      eventually after it's extracted) to the product's location given as a file URI
      (e.g. 'file:///tmp/product_folder' on Linux or
      'file:///C:/Users/username/AppData/Local/Temp' on Windows)
    - save a *record* file in the directory ``outputs_prefix/.downloaded`` whose name
      is built on the MD5 hash of the product's ``product_type`` and ``properties['id']``
      attributes (``hashlib.md5((product.product_type+"-"+product.properties['id']).encode("utf-8")).hexdigest()``)
      and whose content is the product's ``remote_location`` attribute itself.
    - not try to download a product whose ``location`` attribute already points to an
      existing file/directory
    - not try to download a product if its *record* file exists as well as the expected
      product's file/directory. If only the *record* file is found, it must be deleted
      (it certainly indicates that the download didn't complete)

    :param provider: The provider this plugin is instantiated for
    :type provider: str
    :param config: The plugin configuration
    :type config: :class:`~eodag.config.PluginConfig`
    """

    def __init__(self, provider: str, config: PluginConfig) -> None:
        super().__init__(provider, config)
        # Authentication is only required when the configuration asks for it
        requires_auth = getattr(self.config, "authenticate", False)
        self._authenticate = bool(requires_auth)
[docs]
def download(
    self,
    product: EOProduct,
    auth: Optional[Union[AuthBase, Dict[str, str]]] = None,
    progress_callback: Optional[ProgressCallback] = None,
    wait: int = DEFAULT_DOWNLOAD_WAIT,
    timeout: int = DEFAULT_DOWNLOAD_TIMEOUT,
    **kwargs: Unpack[DownloadConf],
) -> Optional[str]:
    r"""
    Download a single product. Abstract in this base class: it always raises
    and must be overridden by each plugin.

    :param product: The EO product to download
    :type product: :class:`~eodag.api.product._product.EOProduct`
    :param auth: (optional) authenticated object
    :type auth: Optional[Union[AuthBase, Dict[str, str]]]
    :param progress_callback: (optional) A progress callback
    :type progress_callback: :class:`~eodag.utils.ProgressCallback`
    :param wait: (optional) If download fails, wait time in minutes between two download tries
    :type wait: int
    :param timeout: (optional) If download fails, maximum time in minutes before stop retrying
                    to download
    :type timeout: int
    :param kwargs: `outputs_prefix` (str), `extract` (bool), `delete_archive` (bool)
                    and `dl_url_params` (dict) can be provided as additional kwargs
                    and will override any other values defined in a configuration
                    file or with environment variables.
    :type kwargs: Union[str, bool, dict]
    :returns: The absolute path to the downloaded product in the local filesystem
              (e.g. '/tmp/product.zip' on Linux or
              'C:\\Users\\username\\AppData\\Local\\Temp\\product.zip' on Windows)
    :rtype: Optional[str]
    :raises NotImplementedError: always, in this base implementation
    """
    not_implemented_msg = "A Download plugin must implement a method named download"
    raise NotImplementedError(not_implemented_msg)
def _stream_download_dict(
    self,
    product: EOProduct,
    auth: Optional[Union[AuthBase, Dict[str, str]]] = None,
    progress_callback: Optional[ProgressCallback] = None,
    wait: int = DEFAULT_DOWNLOAD_WAIT,
    timeout: int = DEFAULT_DOWNLOAD_TIMEOUT,
    **kwargs: Unpack[DownloadConf],
) -> StreamResponse:
    r"""
    Stream a single product. Abstract in this base class: it always raises
    and must be overridden by each plugin that supports streaming.

    :param product: The EO product to download
    :type product: :class:`~eodag.api.product._product.EOProduct`
    :param auth: (optional) authenticated object
    :type auth: Optional[Union[AuthBase, Dict[str, str]]]
    :param progress_callback: (optional) A progress callback
    :type progress_callback: :class:`~eodag.utils.ProgressCallback`
    :param wait: (optional) If download fails, wait time in minutes between two download tries
    :type wait: int
    :param timeout: (optional) If download fails, maximum time in minutes before stop retrying
                    to download
    :type timeout: int
    :param kwargs: `outputs_prefix` (str), `extract` (bool), `delete_archive` (bool)
                    and `dl_url_params` (dict) can be provided as additional kwargs
                    and will override any other values defined in a configuration
                    file or with environment variables.
    :type kwargs: Union[str, bool, dict]
    :returns: Dictionary of :class:`~fastapi.responses.StreamingResponse` keyword-arguments
    :rtype: dict
    :raises NotImplementedError: always, in this base implementation
    """
    not_implemented_msg = (
        "Download streaming must be implemented using a method named _stream_download_dict"
    )
    raise NotImplementedError(not_implemented_msg)
def _prepare_download(
    self,
    product: EOProduct,
    progress_callback: Optional[ProgressCallback] = None,
    **kwargs: Unpack[DownloadConf],
) -> Tuple[Optional[str], Optional[str]]:
    """Check if the product has already been downloaded, and prepare its download.

    The returned tuple is one of:

    - ``(path, None)`` when the product is already available locally (either its
      ``location`` points to an existing file/directory, or a record file and the
      matching archive/directory were found); nothing has to be downloaded
    - ``(None, None)`` when the product has no ``remote_location``
    - ``(fs_path, record_filename)`` when the product has to be downloaded to
      ``fs_path`` and recorded in ``record_filename``

    :param product: The EO product to download
    :type product: :class:`~eodag.api.product._product.EOProduct`
    :param progress_callback: (optional) A progress callback
    :type progress_callback: :class:`~eodag.utils.ProgressCallback` or None
    :param kwargs: ``outputs_prefix`` and ``outputs_extension`` are read here, the
                   remaining download options are forwarded to ``_finalize``
    :returns: fs_path, record_filename
    :rtype: Tuple[Optional[str], Optional[str]]
    """
    if product.location != product.remote_location:
        fs_path = uri_to_path(product.location)
        # The fs path of a product is either a file (if 'extract' config is False) or a directory
        if os.path.isfile(fs_path) or os.path.isdir(fs_path):
            logger.info(
                f"Product already present on this platform. Identifier: {fs_path}",
            )
            # Do not download data if we are on site. Instead give back the absolute path to the data
            return fs_path, None

    url = product.remote_location
    if not url:
        logger.debug(
            f"Unable to get download url for {product}, skipping download",
        )
        return None, None
    logger.info(
        f"Download url: {url}",
    )

    outputs_prefix = (
        kwargs.pop("outputs_prefix", None)
        or getattr(self.config, "outputs_prefix", tempfile.gettempdir())
        or tempfile.gettempdir()
    )
    # Fall back to an empty extension so that a ``None`` value found in the
    # configuration is never formatted into the file name as the text "None"
    outputs_extension = (
        kwargs.get("outputs_extension", None)
        or getattr(self.config, "outputs_extension", ".zip")
        or ""
    )

    # Strong assumption made here: all products downloaded will be zip files
    # If they are not, the '.zip' extension will be removed when they are downloaded and returned as is
    prefix = os.path.abspath(outputs_prefix)
    title = product.properties["title"]
    sanitized_title = sanitize(title)
    # When sanitizing altered the title, two distinct titles could map to the
    # same name: disambiguate with the product id
    if sanitized_title == title:
        collision_avoidance_suffix = ""
    else:
        collision_avoidance_suffix = "-" + sanitize(product.properties["id"])
    fs_path = os.path.join(
        prefix,
        f"{sanitized_title}{collision_avoidance_suffix}{outputs_extension}",
    )
    # Only strip the trailing extension: ``str.replace`` would also remove any
    # other occurrence of it in the prefix or in the product title
    fs_dir_path = fs_path[: -len(outputs_extension)] if outputs_extension else fs_path

    download_records_dir = os.path.join(prefix, ".downloaded")
    try:
        os.makedirs(download_records_dir, exist_ok=True)
    except OSError:
        import traceback as tb

        logger.warning(
            f"Unable to create records directory. Got:\n{tb.format_exc()}",
        )
    record_filename = os.path.join(
        download_records_dir, self.generate_record_hash(product)
    )

    if os.path.isfile(record_filename):
        if os.path.isfile(fs_path):
            downloaded_path: Optional[str] = fs_path
        elif os.path.isdir(fs_dir_path):
            downloaded_path = fs_dir_path
        else:
            downloaded_path = None

        if downloaded_path is not None:
            logger.info(
                f"Product already downloaded: {downloaded_path}",
            )
            # Hand the extension resolved above over to _finalize, so that an
            # extension coming from the configuration is honoured there too
            finalize_kwargs: Dict[str, Any] = dict(kwargs)
            if outputs_extension:
                finalize_kwargs["outputs_extension"] = outputs_extension
            return (
                self._finalize(
                    downloaded_path,
                    progress_callback=progress_callback,
                    **finalize_kwargs,
                ),
                None,
            )

        # Remove the record file if fs_path is absent (e.g. it was deleted while record wasn't)
        logger.debug(
            f"Record file found ({record_filename}) but not the actual file",
        )
        logger.debug(
            f"Removing record file : {record_filename}",
        )
        os.remove(record_filename)

    return fs_path, record_filename
[docs]
def generate_record_hash(self, product: EOProduct) -> str:
    """Generate the record hash of the given product.

    The MD5 hash is built from the product's ``product_type`` and ``properties['id']`` attributes
    (``hashlib.md5((product.product_type+"-"+product.properties['id']).encode("utf-8")).hexdigest()``)

    :param product: The product to calculate the record hash
    :type product: :class:`~eodag.api.product._product.EOProduct`
    :returns: The MD5 hash
    :rtype: str
    """
    # Both parts are cast to `str`: in some unit tests, `product.product_type`
    # is `None` and `product.properties["id"]` is an `int`
    hash_parts = (str(product.product_type), str(product.properties["id"]))
    digest = hashlib.md5("-".join(hash_parts).encode("utf-8"))
    return digest.hexdigest()
def _resolve_archive_depth(self, product_path: str) -> str:
"""Update product_path using archive_depth from provider configuration.
Handle depth levels in the product archive. For example, if the downloaded archive was
extracted to: ``/top_level/product_base_dir`` and ``archive_depth`` was configured to 2, the product
location will be ``/top_level/product_base_dir``.
WARNING: A strong assumption is made here: there is only one subdirectory per level
:param product_path: The path to the extracted product
:type product_path: str
:returns: The path to the extracted product with the right depth
:rtype: str
"""
archive_depth = getattr(self.config, "archive_depth", 1)
count = 1
while count < archive_depth:
product_path = os.path.join(product_path, os.listdir(product_path)[0])
count += 1
return product_path
def _finalize(
    self,
    fs_path: str,
    progress_callback: Optional[ProgressCallback] = None,
    **kwargs: Unpack[DownloadConf],
) -> str:
    """Finalize the download process.

    Extracts the archive found at ``fs_path`` (``.zip``, ``.tar`` or ``.tar.gz``) next
    to it, unless extraction is disabled or a non-empty destination directory already
    exists, and optionally deletes the archive afterwards.

    :param fs_path: The path to the local zip archive downloaded or already present
    :type fs_path: str
    :param progress_callback: (optional) A progress callback. When omitted, one is
                              created here and closed before returning
    :type progress_callback: :class:`~eodag.utils.ProgressCallback` or None
    :param kwargs: ``extract`` (bool), ``delete_archive`` (bool), ``outputs_extension`` (str)
                   and ``outputs_prefix`` (str) override the values of the plugin configuration
    :returns: The absolute path to the product
    :rtype: str
    """
    # progress bar init
    if progress_callback is None:
        progress_callback = ProgressCallback(
            unit="file",
            unit_scale=False,
            position=2,
        )
        # one shot progress callback to close after download
        close_progress_callback = True
    else:
        close_progress_callback = False
        progress_callback.unit = "file"
        progress_callback.unit_scale = False
        progress_callback.refresh()

    # try/finally: the one shot progress callback must be closed on every exit
    # path, early returns and errors included
    try:
        extract = kwargs.pop("extract", None)
        if extract is None:
            extract = getattr(self.config, "extract", True)
        if not extract:
            logger.info("Extraction not activated. The product is available as is.")
            progress_callback(1, total=1)
            return fs_path

        delete_archive = kwargs.pop("delete_archive", None)
        if delete_archive is None:
            delete_archive = getattr(self.config, "delete_archive", True)

        # Same resolution order as in _prepare_download: kwargs, then plugin
        # configuration, then '.zip'
        outputs_extension = kwargs.pop("outputs_extension", None) or getattr(
            self.config, "outputs_extension", ".zip"
        )
        # Only a trailing extension is stripped: cutting at its first occurrence
        # would truncate paths that merely contain it (e.g. '/data.zip.d/title.zip')
        if outputs_extension and fs_path.endswith(outputs_extension):
            product_path = fs_path[: -len(outputs_extension)]
        else:
            product_path = fs_path

        if os.path.isfile(product_path) and product_path != fs_path:
            # When product_path is fs_path itself (no extension to strip), the
            # file is the downloaded data, not a leftover: it must not be removed
            logger.info(
                f"Remove existing partially downloaded file: {product_path}"
                f" ({os.stat(product_path).st_size}/{os.stat(fs_path).st_size})"
            )
            os.remove(product_path)
        elif os.path.isdir(product_path):
            if os.listdir(product_path):
                logger.info(
                    f"Extraction cancelled, destination directory already exists and is not empty: {product_path}"
                )
                progress_callback(1, total=1)
                return product_path
            logger.info(f"Remove existing empty destination directory: {product_path}")
            os.rmdir(product_path)

        if os.path.exists(product_path):
            # Nothing to extract to: the product is returned as is
            progress_callback(1, total=1)
            return product_path

        outputs_prefix = (
            kwargs.pop("outputs_prefix", None)
            or getattr(self.config, "outputs_prefix", None)
            or tempfile.gettempdir()
        )

        logger.info("Extraction activated")
        progress_callback.desc = f"Extracting files from {os.path.basename(fs_path)}"
        progress_callback.refresh()

        outputs_dir = os.path.join(outputs_prefix, product_path)
        # The context manager removes the temporary directory even if the
        # extraction fails
        with tempfile.TemporaryDirectory() as tmp_dir_name:
            extraction_dir = os.path.join(tmp_dir_name, os.path.basename(outputs_dir))
            extracted = True
            if fs_path.endswith(".zip"):
                with zipfile.ZipFile(fs_path, "r") as zfile:
                    fileinfos = zfile.infolist()
                    progress_callback.reset(total=len(fileinfos))
                    for fileinfo in fileinfos:
                        zfile.extract(
                            fileinfo,
                            path=extraction_dir,
                        )
                        progress_callback(1)
            elif fs_path.endswith((".tar", ".tar.gz")):
                # NOTE(review): extractall() trusts the member paths of the
                # archive; consider an extraction filter for untrusted archives
                with tarfile.open(fs_path, "r") as tfile:
                    progress_callback.reset(total=1)
                    tfile.extractall(path=extraction_dir)
                    progress_callback(1)
            else:
                extracted = False
                progress_callback(1, total=1)

            if extracted:
                # in some cases, only a lone file is extracted without being in a directory
                # then, we create a directory in which we place this file
                product_extraction_path = self._resolve_archive_depth(extraction_dir)
                if os.path.isfile(product_extraction_path) and not os.path.isdir(
                    outputs_dir
                ):
                    os.makedirs(outputs_dir)
                shutil.move(product_extraction_path, outputs_dir)

        # NOTE(review): as before, the archive is also deleted when its format
        # was not recognized and nothing was extracted - confirm this is intended
        if delete_archive:
            logger.info(f"Deleting archive {os.path.basename(fs_path)}")
            os.unlink(fs_path)
        else:
            logger.info(
                f"Archive deletion is deactivated, keeping {os.path.basename(fs_path)}"
            )
        return product_path
    finally:
        # close progress bar if needed
        if close_progress_callback:
            progress_callback.close()
[docs]
def download_all(
    self,
    products: SearchResult,
    auth: Optional[Union[AuthBase, Dict[str, str]]] = None,
    downloaded_callback: Optional[DownloadedCallback] = None,
    progress_callback: Optional[ProgressCallback] = None,
    wait: int = DEFAULT_DOWNLOAD_WAIT,
    timeout: int = DEFAULT_DOWNLOAD_TIMEOUT,
    **kwargs: Unpack[DownloadConf],
) -> List[str]:
    """
    Base download_all method.

    This specific implementation uses the :meth:`eodag.plugins.download.base.Download.download` method
    implemented by the plugin to **sequentially** attempt to download products.
    Products raising :class:`~eodag.utils.exceptions.NotAvailableError` are tried again
    every ``wait`` minutes, until ``timeout`` minutes have elapsed since the last
    successful download.

    :param products: Products to download
    :type products: :class:`~eodag.api.search_result.SearchResult`
    :param auth: (optional) authenticated object
    :type auth: Optional[Union[AuthBase, Dict[str, str]]]
    :param downloaded_callback: (optional) A method or a callable object which takes
                                as parameter the ``product``. You can use the base class
                                :class:`~eodag.api.product.DownloadedCallback` and override
                                its ``__call__`` method. Will be called each time a product
                                finishes downloading
    :type downloaded_callback: Callable[[:class:`~eodag.api.product._product.EOProduct`], None]
                               or None
    :param progress_callback: (optional) A progress callback
    :type progress_callback: :class:`~eodag.utils.ProgressCallback`
    :param wait: (optional) If download fails, wait time in minutes between two download tries
    :type wait: int
    :param timeout: (optional) If download fails, maximum time in minutes before stop retrying
                    to download
    :type timeout: int
    :param kwargs: `outputs_prefix` (str), `extract` (bool), `delete_archive` (bool)
                    and `dl_url_params` (dict) can be provided as additional kwargs
                    and will override any other values defined in a configuration
                    file or with environment variables.
    :type kwargs: Union[str, bool, dict]
    :returns: List of absolute paths to the downloaded products in the local
              filesystem (e.g. ``['/tmp/product.zip']`` on Linux or
              ``['C:\\Users\\username\\AppData\\Local\\Temp\\product.zip']`` on Windows)
    :rtype: list
    :raises AuthenticationError: re-raised as is, it stops the whole process
    :raises MisconfiguredError: re-raised as is, it stops the whole process
    """
    import traceback as tb

    # Products are going to be removed one by one from this sequence once
    # downloaded.
    products = products[:]
    paths: List[str] = []
    # initiate retry loop
    start_time = datetime.now()
    stop_time = start_time + timedelta(minutes=timeout)
    nb_products = len(products)
    retry_count = 0
    # another output for notebooks
    nb_info = NotebookWidgets()

    for product in products:
        product.next_try = start_time

    # progress bar init
    if progress_callback is None:
        progress_callback = ProgressCallback(
            total=nb_products,
            unit="product",
            desc="Downloaded products",
            unit_scale=False,
        )
        product_progress_callback = None
    else:
        product_progress_callback = progress_callback.copy()
        progress_callback.reset(total=nb_products)
        progress_callback.unit = "product"
        progress_callback.desc = "Downloaded products"
        progress_callback.unit_scale = False
        progress_callback.refresh()

    with progress_callback as bar:
        # Loop until all products are downloaded or timeout is reached
        while products:
            # try downloading each product before retry; iterate over a
            # snapshot because downloaded products are removed from `products`
            # along the way (removing from the sequence being iterated would
            # skip the next product and shuffle the order of returned paths)
            for product in list(products):
                if datetime.now() < product.next_try:
                    continue
                product.next_try += timedelta(minutes=wait)
                try:
                    paths.append(
                        product.download(
                            progress_callback=product_progress_callback,
                            wait=wait,
                            # a single try per pass: retries are handled here
                            timeout=-1,
                            **kwargs,
                        )
                    )

                    if downloaded_callback:
                        downloaded_callback(product)

                    # product downloaded, to not retry it
                    products.remove(product)
                    bar(1)

                    # reset stop time for next product
                    stop_time = datetime.now() + timedelta(minutes=timeout)

                except NotAvailableError as e:
                    logger.info(e)
                    continue

                except (AuthenticationError, MisconfiguredError):
                    logger.exception(
                        f"Stopped because of credentials problems with provider {self.provider}"
                    )
                    raise

                except RuntimeError:
                    logger.error(
                        f"A problem occurred during download of product: {product}. "
                        "Skipping it"
                    )
                    logger.debug(f"\n{tb.format_exc()}")
                    # no more retry once this pass is over
                    stop_time = datetime.now()

                except Exception:
                    logger.warning(
                        f"A problem occurred during download of product: {product}. "
                        "Skipping it",
                    )
                    logger.debug(f"\n{tb.format_exc()}")

            if not products:
                break

            # a single timestamp for the checks below, so that the computed
            # delay can never become negative between check and computation
            now = datetime.now()
            if now >= stop_time:
                logger.warning(
                    f"{len(products)} products could not be downloaded: "
                    + str([prod.properties["title"] for prod in products])
                )
                break

            # wake up for the earliest product due for a new try
            next_try = min(prod.next_try for prod in products)
            if now < next_try:
                # total_seconds() also accounts for delays of one day or more
                wait_seconds = int((next_try - now).total_seconds())
                retry_count += 1
                info_message = (
                    f"[Retry #{retry_count}, {nb_products - len(products)}/{nb_products} D/L] "
                    f"Waiting {wait_seconds}s until next download try (retry every {wait}' for {timeout}')"
                )
                logger.info(info_message)
                nb_info.display_html(info_message)
                sleep(wait_seconds + 1)

    return paths
def _download_retry(
    self, product: EOProduct, wait: int, timeout: int
) -> Callable[[Callable[..., T]], Callable[..., T]]:
    """
    Download retry decorator.

    Retries the wrapped download method after `wait` minutes if a NotAvailableError
    exception is thrown until `timeout` minutes. When the plugin configuration does
    not enable ordering (``order_enabled``), the error is raised at the first failure.
    With a negative or null `timeout`, a single try is made.

    :param product: The EO product to download
    :type product: :class:`~eodag.api.product._product.EOProduct`
    :param wait: If download fails, wait time in minutes between two download tries
    :type wait: int
    :param timeout: If download fails, maximum time in minutes before stop retrying
                    to download
    :type timeout: int
    :returns: decorator
    :rtype: Callable[[Callable[..., T]], Callable[..., T]]
    """
    from functools import wraps

    def log_retry_after() -> None:
        # Retry-After info from Response header
        if hasattr(self, "stream"):
            retry_server_info = self.stream.headers.get("Retry-After", "")
            if retry_server_info:
                logger.debug(
                    f"[{self.provider} response] Retry-After: {retry_server_info}"
                )

    def decorator(download: Callable[..., T]) -> Callable[..., T]:
        @wraps(download)
        def download_and_retry(*args: Any, **kwargs: Unpack[DownloadConf]) -> T:
            # initiate retry loop
            start_time = datetime.now()
            stop_time = start_time + timedelta(minutes=timeout)
            product.next_try = start_time
            retry_count = 0
            not_available_info = "The product could not be downloaded"
            # another output for notebooks
            nb_info = NotebookWidgets()

            # Loop until product download succeeds or timeout is reached: every
            # iteration either returns, raises, or waits for the next try
            while True:
                datetime_now = datetime.now()

                if datetime_now >= product.next_try:
                    product.next_try += timedelta(minutes=wait)
                    try:
                        return download(*args, **kwargs)

                    except NotAvailableError as e:
                        if not getattr(self.config, "order_enabled", False):
                            raise NotAvailableError(
                                f"Product is not available for download and order is not supported for"
                                f" {self.provider}, {e}"
                            ) from e
                        not_available_info = str(e)

                if datetime_now >= product.next_try and datetime_now < stop_time:
                    # the try itself lasted longer than `wait`: retry right away
                    wait_seconds = int(
                        (
                            datetime_now - product.next_try + timedelta(minutes=wait)
                        ).total_seconds()
                    )
                    retry_count += 1
                    retry_info = (
                        f"[Retry #{retry_count}] Waited {wait_seconds}s, trying again to download ordered product"
                        f" (retry every {wait}' for {timeout}')"
                    )
                    logger.debug(not_available_info)
                    log_retry_after()
                    logger.info(retry_info)
                    nb_info.display_html(retry_info)
                    product.next_try = datetime_now
                elif datetime_now < product.next_try and datetime_now < stop_time:
                    # total_seconds() also accounts for delays of one day or more
                    wait_seconds = (product.next_try - datetime_now).total_seconds()
                    retry_count += 1
                    retry_info = (
                        f"[Retry #{retry_count}] Waiting {wait_seconds}s until next download try"
                        f" for ordered product (retry every {wait}' for {timeout}')"
                    )
                    logger.debug(not_available_info)
                    log_retry_after()
                    logger.info(retry_info)
                    nb_info.display_html(retry_info)
                    sleep(wait_seconds)
                elif datetime_now >= stop_time and timeout > 0:
                    if "storageStatus" not in product.properties:
                        product.properties["storageStatus"] = "N/A status"
                    logger.info(not_available_info)
                    raise NotAvailableError(
                        f"{product.properties['title']} is not available ({product.properties['storageStatus']})"
                        f" and could not be downloaded, timeout reached"
                    )
                elif datetime_now >= stop_time:
                    raise NotAvailableError(not_available_info)

        return download_and_retry

    return decorator