Source code for eodag.plugins.download.http

# -*- coding: utf-8 -*-
# Copyright 2018, CS GROUP - France, https://www.csgroup.eu/
#
# This file is part of EODAG project
#     https://www.github.com/CS-SI/EODAG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations

import logging
import os
import shutil
import zipfile
from datetime import datetime
from email.message import Message
from itertools import chain
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Tuple, Union
from urllib.parse import parse_qs, urlparse

import geojson
import requests
import requests_ftp
from lxml import etree
from requests import RequestException
from stream_zip import ZIP_AUTO, stream_zip

from eodag.api.product.metadata_mapping import (
    OFFLINE_STATUS,
    ONLINE_STATUS,
    mtd_cfg_as_conversion_and_querypath,
    properties_from_json,
    properties_from_xml,
)
from eodag.plugins.download.base import Download
from eodag.utils import (
    DEFAULT_DOWNLOAD_TIMEOUT,
    DEFAULT_DOWNLOAD_WAIT,
    DEFAULT_STREAM_REQUESTS_TIMEOUT,
    HTTP_REQ_TIMEOUT,
    USER_AGENT,
    ProgressCallback,
    flatten_top_directories,
    parse_header,
    path_to_uri,
    sanitize,
    uri_to_path,
)
from eodag.utils.exceptions import (
    AuthenticationError,
    DownloadError,
    MisconfiguredError,
    NotAvailableError,
    TimeOutError,
)

if TYPE_CHECKING:
    from requests import Response

    from eodag.api.product import EOProduct
    from eodag.api.search_result import SearchResult
    from eodag.config import PluginConfig
    from eodag.utils import DownloadedCallback

logger = logging.getLogger("eodag.download.http")


class HTTPDownload(Download):
    """HTTPDownload plugin. Handles product download over HTTP protocol

    :param provider: provider name
    :type provider: str
    :param config: Download plugin configuration:

        * ``config.base_uri`` (str) - default endpoint url
        * ``config.extract`` (bool) - (optional) extract downloaded archive or not
        * ``config.auth_error_code`` (int) - (optional) authentication error code
        * ``config.dl_url_params`` (dict) - (optional) additional parameters to send in the request
        * ``config.archive_depth`` (int) - (optional) level in extracted path tree where to find data
        * ``config.flatten_top_dirs`` (bool) - (optional) flatten directory structure
        * ``config.ignore_assets`` (bool) - (optional) ignore assets and download using downloadLink
        * ``config.order_enabled`` (bool) - (optional) whether order is enabled or not if product is `OFFLINE`
        * ``config.order_method`` (str) - (optional) HTTP request method, GET (default) or POST
        * ``config.order_headers`` (dict) - (optional) order request headers
        * ``config.order_on_response`` (dict) - (optional) edit or add new product properties
        * ``config.order_status_method`` (str) - (optional) status HTTP request method, GET (default) or POST
        * ``config.order_status_percent`` (str) - (optional) progress percentage key in obtained status response
        * ``config.order_status_error`` (dict) - (optional) key/value identifying an error status

    :type config: :class:`~eodag.config.PluginConfig`
    """

    def __init__(self, provider: str, config: PluginConfig) -> None:
        """Initialize the plugin and check that ``base_uri`` is configured.

        :raises: :class:`~eodag.utils.exceptions.MisconfiguredError` if the
                 configuration has no ``base_uri`` attribute
        """
        super(HTTPDownload, self).__init__(provider, config)
        if not hasattr(self.config, "base_uri"):
            raise MisconfiguredError(
                "{} plugin require a base_uri configuration key".format(
                    type(self).__name__
                )
            )

    def orderDownload(
        self,
        product: EOProduct,
        auth: Optional[PluginConfig] = None,
        **kwargs: Union[str, bool, Dict[str, Any]],
    ) -> None:
        """Send product order request.

        It will be executed once before the download retry loop, if the product is OFFLINE
        and has `orderLink` in its properties.
        Product ordering can be configured using the following download plugin parameters:

            - **order_enabled**: Whether order is enabled or not (may not use this method
              if no `orderLink` exists)

            - **order_method**: (optional) HTTP request method, GET (default) or POST

            - **order_on_response**: (optional) things to do with obtained order response:

              - *metadata_mapping*: edit or add new product properties

        Product properties used for order:

            - **orderLink**: order request URL

        :param product: The EO product to order
        :type product: :class:`~eodag.api.product._product.EOProduct`
        :param auth: (optional) The configuration of a plugin of type Authentication
        :type auth: :class:`~eodag.config.PluginConfig`
        :param kwargs: download additional kwargs
        :type kwargs: Union[str, bool, dict]
        """
        order_method = getattr(self.config, "order_method", "GET").lower()
        if order_method == "post":
            # separate url & parameters
            parts = urlparse(str(product.properties["orderLink"]))
            query_dict = parse_qs(parts.query)
            if not query_dict and parts.query:
                # query is not key=value pairs: try to read it as a JSON payload
                query_dict = geojson.loads(parts.query)
            order_url = parts._replace(query=None).geturl()
            order_kwargs = {"json": query_dict} if query_dict else {}
        else:
            order_url = product.properties["orderLink"]
            order_kwargs = {}

        with requests.request(
            method=order_method,
            url=order_url,
            auth=auth,
            timeout=HTTP_REQ_TIMEOUT,
            headers=dict(getattr(self.config, "order_headers", {}), **USER_AGENT),
            **order_kwargs,
        ) as response:
            try:
                response.raise_for_status()
                ordered_message = response.text
                logger.debug(ordered_message)
                logger.info("%s was ordered", product.properties["title"])
            except requests.exceptions.Timeout as exc:
                raise TimeOutError(exc, timeout=HTTP_REQ_TIMEOUT) from exc
            except RequestException as e:
                # NOTE: a requests.Response is falsy for 4xx/5xx statuses, so the
                # response must be compared to None instead of truth-tested
                if e.response is not None and hasattr(e.response, "content"):
                    error_content = e.response.content.decode(
                        "utf-8", errors="replace"
                    )
                    error_message = f"{error_content} - {e}"
                else:
                    error_message = str(e)
                logger.warning(
                    "%s could not be ordered, request returned %s",
                    product.properties["title"],
                    error_message,
                )

            order_metadata_mapping = getattr(
                self.config, "order_on_response", {}
            ).get("metadata_mapping", {})
            if order_metadata_mapping:
                logger.debug("Parsing order response to update product metada-mapping")
                order_metadata_mapping_jsonpath = mtd_cfg_as_conversion_and_querypath(
                    order_metadata_mapping,
                )
                properties_update = properties_from_json(
                    response.json(),
                    order_metadata_mapping_jsonpath,
                )
                product.properties.update(properties_update)
                if "downloadLink" in properties_update:
                    product.remote_location = product.location = product.properties[
                        "downloadLink"
                    ]
                    logger.debug(f"Product location updated to {product.location}")

    def orderDownloadStatus(
        self,
        product: EOProduct,
        auth: Optional[PluginConfig] = None,
        **kwargs: Union[str, bool, Dict[str, Any]],
    ) -> None:
        """Send product order status request.

        It will be executed before each download retry.
        Product order status request can be configured using the following download plugin parameters:

            - **order_status_method**: (optional) HTTP request method, GET (default) or POST

            - **order_status_percent**: (optional) progress percentage key in obtained response

            - **order_status_error**: (optional) key/value identifying an error status

        Product properties used for order status:

            - **orderStatusLink**: order status request URL

        :param product: The ordered EO product
        :type product: :class:`~eodag.api.product._product.EOProduct`
        :param auth: (optional) The configuration of a plugin of type Authentication
        :type auth: :class:`~eodag.config.PluginConfig`
        :param kwargs: download additional kwargs
        :type kwargs: Union[str, bool, dict]
        :raises: :class:`~eodag.utils.exceptions.DownloadError` if the search performed
                 after a successful order does not return a single parsable result
        """
        status_method = getattr(self.config, "order_status_method", "GET").lower()
        if status_method == "post":
            # separate url & parameters
            parts = urlparse(str(product.properties["orderStatusLink"]))
            query_dict = parse_qs(parts.query)
            if not query_dict and parts.query:
                query_dict = geojson.loads(parts.query)
            status_url = parts._replace(query=None).geturl()
            status_kwargs = {"json": query_dict} if query_dict else {}
        else:
            status_url = product.properties["orderStatusLink"]
            status_kwargs = {}

        with requests.request(
            method=status_method,
            url=status_url,
            auth=auth,
            timeout=HTTP_REQ_TIMEOUT,
            headers=dict(
                getattr(self.config, "order_status_headers", {}), **USER_AGENT
            ),
            **status_kwargs,
        ) as response:
            try:
                response.raise_for_status()
                status_message = response.text
                status_dict = response.json()

                # display progress percentage
                order_status_percent_key = getattr(
                    self.config, "order_status_percent", None
                )
                if order_status_percent_key and order_status_percent_key in status_dict:
                    order_status_value = str(status_dict[order_status_percent_key])
                    if order_status_value.isdigit():
                        order_status_value += "%"
                    logger.info(
                        f"{product.properties['title']} order status: {order_status_value}"
                    )

                # display error if any
                order_status_error_dict = getattr(self.config, "order_status_error", {})
                if (
                    order_status_error_dict
                    and order_status_error_dict.items() <= status_dict.items()
                ):
                    # order_status_error_dict is a subset of status_dict : error
                    logger.warning(status_message)
                else:
                    logger.debug(status_message)

                # check if succeeds and need search again
                order_status_success_dict = getattr(
                    self.config, "order_status_success", {}
                )
                # both keys must exist on both sides: "order_status_success" is
                # optional and must not trigger a KeyError when it is missing
                if all(
                    key in status_dict
                    and key in order_status_success_dict
                    and status_dict[key] == order_status_success_dict[key]
                    for key in ("status", "message")
                ):
                    product.properties["storageStatus"] = ONLINE_STATUS

                if (
                    order_status_success_dict
                    and order_status_success_dict.items() <= status_dict.items()
                    and getattr(self.config, "order_status_on_success", {}).get(
                        "need_search"
                    )
                ):
                    logger.debug(
                        f"Search for new location: {product.properties['searchLink']}"
                    )
                    # search again, using a dedicated response that gets closed
                    with requests.get(
                        product.properties["searchLink"],
                        timeout=HTTP_REQ_TIMEOUT,
                        headers=USER_AGENT,
                    ) as search_response:
                        search_response.raise_for_status()
                        search_content = search_response.content

                    if (
                        self.config.order_status_on_success.get("result_type", "json")
                        == "xml"
                    ):
                        root_node = etree.fromstring(search_content)
                        # the default (None) namespace prefix is not usable in xpath
                        namespaces = {k or "ns": v for k, v in root_node.nsmap.items()}
                        results = [
                            etree.tostring(entry)
                            for entry in root_node.xpath(
                                self.config.order_status_on_success["results_entry"],
                                namespaces=namespaces,
                            )
                        ]
                        if len(results) != 1:
                            raise DownloadError(
                                "Could not get a single result after order success for "
                                f"{product.properties['searchLink']} request. "
                                f"Please search and download {product} again"
                            )
                        try:
                            # single result
                            result = results[0]
                            # parse result
                            new_search_metadata_mapping = (
                                self.config.order_status_on_success["metadata_mapping"]
                            )
                            order_metadata_mapping_jsonpath = {}
                            order_metadata_mapping_jsonpath = (
                                mtd_cfg_as_conversion_and_querypath(
                                    new_search_metadata_mapping,
                                    order_metadata_mapping_jsonpath,
                                )
                            )
                            properties_update = properties_from_xml(
                                result,
                                order_metadata_mapping_jsonpath,
                            )
                        except Exception as e:
                            logger.debug(e)
                            raise DownloadError(
                                f"Could not parse result after order success for {product.properties['searchLink']} "
                                f"request. Please search and download {product} again"
                            ) from e
                        # update product
                        product.properties.update(properties_update)
                        product.location = product.remote_location = product.properties[
                            "downloadLink"
                        ]
                    else:
                        logger.warning(
                            "JSON response parsing is not implemented yet for new searches "
                            f"after order success. Please search and download {product} again"
                        )

            except requests.exceptions.Timeout as exc:
                raise TimeOutError(exc, timeout=HTTP_REQ_TIMEOUT) from exc
            except RequestException as e:
                logger.warning(
                    "%s order status could not be checked, request returned %s",
                    product.properties["title"],
                    e,
                )

    def download(
        self,
        product: EOProduct,
        auth: Optional[PluginConfig] = None,
        progress_callback: Optional[ProgressCallback] = None,
        wait: int = DEFAULT_DOWNLOAD_WAIT,
        timeout: int = DEFAULT_DOWNLOAD_TIMEOUT,
        **kwargs: Union[str, bool, Dict[str, Any]],
    ) -> Optional[str]:
        """Download a product using HTTP protocol.

        The downloaded product is assumed to be a Zip file. If it is not,
        the user is warned, it is renamed to remove the zip extension and
        no further treatment is done (no extraction)

        :param product: The EO product to download
        :type product: :class:`~eodag.api.product._product.EOProduct`
        :param auth: (optional) The configuration of a plugin of type Authentication
        :type auth: :class:`~eodag.config.PluginConfig`
        :param progress_callback: (optional) A progress callback
        :type progress_callback: :class:`~eodag.utils.ProgressCallback`
        :param wait: (optional) wait time passed to the download retry decorator
        :type wait: int
        :param timeout: (optional) timeout passed to the download retry decorator
        :type timeout: int
        :param kwargs: download additional kwargs (``asset`` and ``outputs_extension``
                       are read here, all are forwarded to the helper methods)
        :type kwargs: Union[str, bool, dict]
        :returns: The path of the downloaded product, or the falsy value returned by
                  ``_prepare_download`` when nothing has to be downloaded
        :rtype: Optional[str]
        """
        if progress_callback is None:
            logger.info(
                "Progress bar unavailable, please call product.download() instead of plugin.download()"
            )
            progress_callback = ProgressCallback(disable=True)

        fs_path, record_filename = self._prepare_download(
            product, progress_callback=progress_callback, **kwargs
        )
        if not fs_path or not record_filename:
            if fs_path:
                product.location = path_to_uri(fs_path)
            return fs_path

        # download assets if exist instead of remote_location
        if len(product.assets) > 0 and not getattr(self.config, "ignore_assets", False):
            try:
                fs_path = self._download_assets(
                    product,
                    fs_path.replace(".zip", ""),
                    record_filename,
                    auth,
                    progress_callback,
                    **kwargs,
                )
                if kwargs.get("asset", None) is None:
                    product.location = path_to_uri(fs_path)
                return fs_path
            except NotAvailableError as e:
                if kwargs.get("asset", None) is not None:
                    # a specific asset was requested: do not fall back to remote_location
                    raise NotAvailableError(e).with_traceback(e.__traceback__)
                # otherwise fall back to the download of product.remote_location

        url = product.remote_location

        @self._download_retry(product, wait, timeout)
        def download_request(
            product: EOProduct,
            auth: PluginConfig,
            progress_callback: ProgressCallback,
            wait: int,
            timeout: int,
            **kwargs: Dict[str, Any],
        ) -> None:
            chunks = self._stream_download(product, auth, progress_callback, **kwargs)

            with open(fs_path, "wb") as fhandle:
                for chunk in chunks:
                    fhandle.write(chunk)

        download_request(product, auth, progress_callback, wait, timeout, **kwargs)

        with open(record_filename, "w") as fh:
            fh.write(url)
        logger.debug("Download recorded in %s", record_filename)

        # Check that the downloaded file is really a zip file
        outputs_extension = kwargs.get("outputs_extension", None) or getattr(
            self.config, "outputs_extension", ".zip"
        )
        if not zipfile.is_zipfile(fs_path) and outputs_extension == ".zip":
            logger.warning(
                "Downloaded product is not a Zip File. Please check its file type before using it"
            )
            new_fs_path = fs_path[: fs_path.index(".zip")]
            shutil.move(fs_path, new_fs_path)
            product.location = path_to_uri(new_fs_path)
            return new_fs_path
        product_path = self._finalize(
            fs_path, progress_callback=progress_callback, **kwargs
        )
        product.location = path_to_uri(product_path)
        return product_path

    def _check_stream_size(self, product: EOProduct) -> int:
        """Return the ``content-length`` of ``self.stream`` (0 if the header is missing).

        :raises: :class:`~eodag.utils.exceptions.NotAvailableError` if the size is 0
                 and the product has a ``storageStatus`` other than ONLINE
        """
        stream_size = int(self.stream.headers.get("content-length", 0))
        if (
            stream_size == 0
            and "storageStatus" in product.properties
            and product.properties["storageStatus"] != ONLINE_STATUS
        ):
            raise NotAvailableError(
                "%s(initially %s) ordered, got: %s"
                % (
                    product.properties["title"],
                    product.properties["storageStatus"],
                    self.stream.reason,
                )
            )
        return stream_size

    def _stream_download_dict(
        self,
        product: EOProduct,
        auth: Optional[PluginConfig] = None,
        progress_callback: Optional[ProgressCallback] = None,
        wait: int = DEFAULT_DOWNLOAD_WAIT,
        timeout: int = DEFAULT_DOWNLOAD_TIMEOUT,
        **kwargs: Union[str, bool, Dict[str, Any]],
    ) -> Dict[str, Any]:
        r"""
        Returns dictionary of :class:`~fastapi.responses.StreamingResponse` keyword-arguments.
        It contains a generator to streamed download chunks and the response headers.

        :param product: The EO product to download
        :type product: :class:`~eodag.api.product._product.EOProduct`
        :param auth: (optional) The configuration of a plugin of type Authentication
        :type auth: :class:`~eodag.config.PluginConfig`
        :param progress_callback: (optional) A progress callback
        :type progress_callback: :class:`~eodag.utils.ProgressCallback`
        :param wait: (optional) If download fails, wait time in minutes between two download tries
        :type wait: int
        :param timeout: (optional) If download fails, maximum time in minutes before stop retrying
                        to download
        :type timeout: int
        :param kwargs: `outputs_prefix` (str), `extract` (bool), `delete_archive` (bool)
                        and `dl_url_params` (dict) can be provided as additional kwargs
                        and will override any other values defined in a configuration
                        file or with environment variables.
        :type kwargs: Union[str, bool, dict]
        :returns: Dictionary of :class:`~fastapi.responses.StreamingResponse` keyword-arguments
        :rtype: dict
        """
        # download assets if exist instead of remote_location
        if len(product.assets) > 0 and not getattr(self.config, "ignore_assets", False):
            try:
                assets_values = product.assets.get_values(kwargs.get("asset", None))
                chunks_tuples = self._stream_download_assets(
                    product,
                    auth,
                    progress_callback,
                    assets_values=assets_values,
                    **kwargs,
                )

                if len(assets_values) == 1:
                    # start reading chunks to set asset.headers
                    first_chunks_tuple = next(chunks_tuples)

                    # update headers
                    assets_values[0].headers[
                        "content-disposition"
                    ] = f"attachment; filename={assets_values[0].filename}"
                    if assets_values[0].get("type", None):
                        assets_values[0].headers["content-type"] = assets_values[0][
                            "type"
                        ]

                    return dict(
                        content=chain(iter([first_chunks_tuple]), chunks_tuples),
                        headers=assets_values[0].headers,
                    )

                else:
                    outputs_filename = (
                        sanitize(product.properties["title"])
                        if "title" in product.properties
                        else sanitize(product.properties.get("id", "download"))
                    )
                    return dict(
                        content=stream_zip(chunks_tuples),
                        media_type="application/zip",
                        headers={
                            "content-disposition": f"attachment; filename={outputs_filename}.zip",
                        },
                    )
            except NotAvailableError as e:
                if kwargs.get("asset", None) is not None:
                    # a specific asset was requested: do not fall back to remote_location
                    raise NotAvailableError(e).with_traceback(e.__traceback__)
                # otherwise fall back to the download of product.remote_location

        chunks = self._stream_download(product, auth, progress_callback, **kwargs)
        # start reading chunks to set product.headers
        first_chunk = next(chunks)

        return dict(
            content=chain(iter([first_chunk]), chunks),
            headers=product.headers,
        )

    def _process_exception(
        self, e: RequestException, product: EOProduct, ordered_message: str
    ) -> None:
        """Convert a failed download request into the matching eodag error.

        :param e: The exception raised by the download request
        :param product: The EO product being downloaded
        :param ordered_message: Message used when the product is not ONLINE and
                                the error response has no text
        :raises: :class:`~eodag.utils.exceptions.AuthenticationError` if the response
                 status code is one of the configured ``auth_error_code``
        :raises: :class:`~eodag.utils.exceptions.NotAvailableError` if the product
                 ``storageStatus`` is not ONLINE
        """
        # check if error is identified as auth_error in provider conf
        auth_errors = getattr(self.config, "auth_error_code", [None])
        if not isinstance(auth_errors, list):
            auth_errors = [auth_errors]

        # NOTE: a requests.Response is falsy for 4xx/5xx statuses, so the response
        # must be compared to None instead of truth-tested; it may also really be
        # None (e.g. connection errors), in which case there is no text to read
        response = e.response
        response_text = response.text.strip() if response is not None else ""

        if response is not None and response.status_code in auth_errors:
            raise AuthenticationError(
                "HTTP Error %s returned, %s\nPlease check your credentials for %s"
                % (
                    response.status_code,
                    response_text,
                    self.provider,
                )
            )
        # product not available
        elif product.properties.get("storageStatus", ONLINE_STATUS) != ONLINE_STATUS:
            if ordered_message and not response_text:
                msg = ordered_message
            else:
                # without a response, fall back to the exception message
                msg = response_text if response is not None else str(e)
            raise NotAvailableError(
                "%s(initially %s) requested, returned: %s"
                % (
                    product.properties["title"],
                    product.properties["storageStatus"],
                    msg,
                )
            )
        else:
            import traceback as tb

            logger.error(
                "Error while getting resource :\n%s\n%s",
                tb.format_exc(),
                response_text,
            )

    def _stream_download(
        self,
        product: EOProduct,
        auth: Optional[PluginConfig] = None,
        progress_callback: Optional[ProgressCallback] = None,
        **kwargs: Dict[str, Any],
    ) -> Iterator[Any]:
        """
        Requests ``product.remote_location`` as a stream and returns a generator
        yielding the chunks of the response. The product is ordered first if it is
        OFFLINE and has an `orderLink`, and its order status is checked if it has
        an `orderStatusLink`.

        :param product: product for which the assets should be downloaded
        :type product: :class:`~eodag.api.product._product.EOProduct`
        :param auth: The configuration of a plugin of type Authentication
        :type auth: :class:`~eodag.config.PluginConfig`
        :param progress_callback: A method or a callable object
                                  which takes a current size and a maximum
                                  size as inputs and handle progress bar
                                  creation and update to give the user a
                                  feedback on the download progress
        :type progress_callback: :class:`~eodag.utils.ProgressCallback`
        :param kwargs: additional arguments
        :type kwargs: dict
        """
        if progress_callback is None:
            logger.info("Progress bar unavailable, please call product.download()")
            progress_callback = ProgressCallback(disable=True)

        ordered_message = ""
        if (
            "orderLink" in product.properties
            and "storageStatus" in product.properties
            and product.properties["storageStatus"] == OFFLINE_STATUS
        ):
            self.orderDownload(product=product, auth=auth)

        if product.properties.get("orderStatusLink", None):
            self.orderDownloadStatus(product=product, auth=auth)

        params = kwargs.pop("dl_url_params", None) or getattr(
            self.config, "dl_url_params", {}
        )

        req_method = (
            product.properties.get("downloadMethod", "").lower()
            or getattr(self.config, "method", "GET").lower()
        )
        url = product.remote_location
        if req_method == "post":
            # separate url & parameters
            parts = urlparse(url)
            query_dict = parse_qs(parts.query)
            if not query_dict and parts.query:
                query_dict = geojson.loads(parts.query)
            req_url = parts._replace(query=None).geturl()
            req_kwargs: Dict[str, Any] = {"json": query_dict} if query_dict else {}
        else:
            req_url = url
            req_kwargs = {}

        # url where data is downloaded from can be ftp -> add ftp adapter
        requests_ftp.monkeypatch_session()
        # the session is used as a context manager so that it gets closed once
        # the generator is exhausted or closed
        with requests.Session() as session, session.request(
            req_method,
            req_url,
            stream=True,
            auth=auth,
            params=params,
            headers=USER_AGENT,
            timeout=DEFAULT_STREAM_REQUESTS_TIMEOUT,
            **req_kwargs,
        ) as self.stream:
            try:
                self.stream.raise_for_status()
            except requests.exceptions.Timeout as exc:
                raise TimeOutError(
                    exc, timeout=DEFAULT_STREAM_REQUESTS_TIMEOUT
                ) from exc
            except RequestException as e:
                self._process_exception(e, product, ordered_message)
            else:
                stream_size = self._check_stream_size(product)
                product.headers = self.stream.headers
                progress_callback.reset(total=stream_size)
                for chunk in self.stream.iter_content(chunk_size=64 * 1024):
                    if chunk:
                        progress_callback(len(chunk))
                        yield chunk

    def _stream_download_assets(
        self,
        product: EOProduct,
        auth: Optional[PluginConfig] = None,
        progress_callback: Optional[ProgressCallback] = None,
        **kwargs: Union[str, bool, Dict[str, Any]],
    ) -> Iterator[Tuple[str, datetime, int, Any, Iterator[Any]]]:
        """Stream the assets given through the ``assets_values`` kwarg.

        With a single asset, the raw chunks of this asset are yielded. With several
        assets, one ``(path, modified_at, perms, zip_method, chunks)`` tuple is
        yielded per asset, which is the members format passed to ``stream_zip``.
        Assets whose ``href`` starts with ``file:`` are skipped.

        :param product: The EO product owning the assets
        :param auth: (optional) The configuration of a plugin of type Authentication
        :param progress_callback: (optional) A progress callback
        :param kwargs: ``assets_values`` (list of assets to stream) and
                       ``dl_url_params`` (dict) are read here
        :raises: :class:`~eodag.utils.exceptions.NotAvailableError` if the product
                 has no asset with an ``href``
        """
        if progress_callback is None:
            logger.info("Progress bar unavailable, please call product.download()")
            progress_callback = ProgressCallback(disable=True)

        assets_urls = [
            a["href"] for a in getattr(product, "assets", {}).values() if "href" in a
        ]

        if not assets_urls:
            raise NotAvailableError("No assets available for %s" % product)

        assets_values = kwargs.get("assets_values", [])

        # get extra parameters to pass to the query
        params = kwargs.pop("dl_url_params", None) or getattr(
            self.config, "dl_url_params", {}
        )

        total_size = self._get_asset_sizes(assets_values, auth, params)

        progress_callback.reset(total=total_size)

        def get_chunks(stream: Response) -> Any:
            for chunk in stream.iter_content(chunk_size=64 * 1024):
                if chunk:
                    progress_callback(len(chunk))
                    yield chunk

        # zipped files properties
        modified_at = datetime.now()
        perms = 0o600

        # loop for assets paths and get common_subdir
        asset_rel_paths_list = []
        for asset in assets_values:
            asset_rel_path_parts = urlparse(asset["href"]).path.strip("/").split("/")
            asset_rel_path_parts_sanitized = [
                sanitize(part) for part in asset_rel_path_parts
            ]
            asset.rel_path = os.path.join(*asset_rel_path_parts_sanitized)
            asset_rel_paths_list.append(asset.rel_path)
        # only read in the download loop, which is empty when there is no asset
        assets_common_subdir = ""
        if asset_rel_paths_list:
            assets_common_subdir = os.path.commonpath(asset_rel_paths_list)

        # product conf overrides provider conf for "flatten_top_dirs"
        product_conf = getattr(self.config, "products", {}).get(
            product.product_type, {}
        )
        flatten_top_dirs = product_conf.get(
            "flatten_top_dirs", getattr(self.config, "flatten_top_dirs", False)
        )

        # loop for assets download
        for asset in assets_values:
            if asset["href"].startswith("file:"):
                logger.info(
                    f"Local asset detected. Download skipped for {asset['href']}"
                )
                continue

            with requests.get(
                asset["href"],
                stream=True,
                auth=auth,
                params=params,
                headers=USER_AGENT,
                timeout=DEFAULT_STREAM_REQUESTS_TIMEOUT,
            ) as stream:
                try:
                    stream.raise_for_status()
                except requests.exceptions.Timeout as exc:
                    raise TimeOutError(
                        exc, timeout=DEFAULT_STREAM_REQUESTS_TIMEOUT
                    ) from exc
                except RequestException as e:
                    # a failure is only fatal when a single asset is streamed
                    raise_errors = len(assets_values) == 1
                    self._handle_asset_exception(e, asset, raise_errors=raise_errors)
                else:
                    asset_rel_path = (
                        asset.rel_path.replace(assets_common_subdir, "").strip(os.sep)
                        if flatten_top_dirs
                        else asset.rel_path
                    )
                    asset_rel_dir = os.path.dirname(asset_rel_path)

                    if not getattr(asset, "filename", None):
                        # try getting filename in GET header if was not found in HEAD result
                        asset_content_disposition = stream.headers.get(
                            "content-disposition", None
                        )
                        if asset_content_disposition:
                            asset.filename = parse_header(
                                asset_content_disposition
                            ).get_param("filename", None)

                    if not getattr(asset, "filename", None):
                        # default filename extracted from path
                        asset.filename = os.path.basename(asset.rel_path)

                    asset.rel_path = os.path.join(asset_rel_dir, asset.filename)

                    if len(assets_values) == 1:
                        # apply headers to asset
                        product.assets[assets_values[0].key].headers = stream.headers
                        yield from get_chunks(stream)
                    else:
                        # several assets to zip
                        yield (
                            asset.rel_path,
                            modified_at,
                            perms,
                            ZIP_AUTO(asset.size),
                            get_chunks(stream),
                        )

    def _download_assets(
        self,
        product: EOProduct,
        fs_dir_path: str,
        record_filename: str,
        auth: Optional[PluginConfig] = None,
        progress_callback: Optional[ProgressCallback] = None,
        **kwargs: Union[str, bool, Dict[str, Any]],
    ) -> str:
        """Download product assets if they exist

        :param product: The EO product owning the assets
        :param fs_dir_path: Directory in which the assets are written
        :param record_filename: File in which the download is recorded
        :param auth: (optional) The configuration of a plugin of type Authentication
        :param progress_callback: (optional) A progress callback
        :param kwargs: ``asset`` (assets filter) is read here, all kwargs are
                       forwarded to ``_stream_download_assets``
        :returns: The directory containing the assets; when all the assets are
                  local files, their path (single asset) or common path instead
        :raises: :class:`~eodag.utils.exceptions.NotAvailableError` if the product
                 has no asset with an ``href`` or if nothing could be downloaded
        """
        if progress_callback is None:
            logger.info("Progress bar unavailable, please call product.download()")
            progress_callback = ProgressCallback(disable=True)

        assets_urls = [
            a["href"] for a in getattr(product, "assets", {}).values() if "href" in a
        ]

        if not assets_urls:
            raise NotAvailableError("No assets available for %s" % product)

        assets_values = product.assets.get_values(kwargs.get("asset", None))

        chunks_tuples = self._stream_download_assets(
            product, auth, progress_callback, assets_values=assets_values, **kwargs
        )

        # remove existing incomplete file
        if os.path.isfile(fs_dir_path):
            os.remove(fs_dir_path)
        # create product dest dir
        if not os.path.isdir(fs_dir_path):
            os.makedirs(fs_dir_path)

        # product conf overrides provider conf for "flatten_top_dirs"
        product_conf = getattr(self.config, "products", {}).get(
            product.product_type, {}
        )
        flatten_top_dirs = product_conf.get(
            "flatten_top_dirs", getattr(self.config, "flatten_top_dirs", False)
        )

        # count local assets
        local_assets_count = sum(
            1 for asset in assets_values if asset["href"].startswith("file:")
        )

        if len(assets_values) == 1 and local_assets_count == 0:
            # start reading chunks to set asset.rel_path
            first_chunks_tuple = next(chunks_tuples)
            chunks = chain(iter([first_chunks_tuple]), chunks_tuples)
            chunks_tuples = [(assets_values[0].rel_path, None, None, None, chunks)]

        for chunk_tuple in chunks_tuples:
            asset_path = chunk_tuple[0]
            asset_chunks = chunk_tuple[4]
            asset_abs_path = os.path.join(fs_dir_path, asset_path)
            # create asset subdir if not exist
            asset_abs_path_dir = os.path.dirname(asset_abs_path)
            if not os.path.isdir(asset_abs_path_dir):
                os.makedirs(asset_abs_path_dir)

            if not os.path.isfile(asset_abs_path):
                with open(asset_abs_path, "wb") as fhandle:
                    for chunk in asset_chunks:
                        if chunk:
                            # progress is already reported by the chunks generator
                            # of _stream_download_assets, which received the same
                            # progress_callback: do not count the chunk twice
                            fhandle.write(chunk)

        # only one local asset
        if local_assets_count == len(assets_urls) and local_assets_count == 1:
            # remove empty {fs_dir_path}
            shutil.rmtree(fs_dir_path)
            # and return assets_urls[0] path
            fs_dir_path = uri_to_path(assets_urls[0])
        # several local assets
        elif local_assets_count == len(assets_urls) and local_assets_count > 0:
            common_path = os.path.commonpath([uri_to_path(uri) for uri in assets_urls])
            # remove empty {fs_dir_path}
            shutil.rmtree(fs_dir_path)
            # and return assets_urls common path
            fs_dir_path = common_path
        # no assets downloaded but some should have been
        elif len(os.listdir(fs_dir_path)) == 0:
            raise NotAvailableError("No assets could be downloaded")

        # flatten directory structure
        if flatten_top_dirs:
            flatten_top_directories(fs_dir_path)

        if kwargs.get("asset", None) is None:
            # save hash/record file
            with open(record_filename, "w") as fh:
                fh.write(product.remote_location)
            logger.debug("Download recorded in %s", record_filename)

        return fs_dir_path

    def _handle_asset_exception(
        self, e: RequestException, asset: Dict[str, Any], raise_errors: bool = False
    ) -> None:
        """Raise or log the error met while requesting an asset.

        :param e: The exception raised by the asset request
        :param asset: The asset that could not be requested
        :param raise_errors: Whether errors other than authentication ones must be
                             raised as ``DownloadError`` instead of being logged
        :raises: :class:`~eodag.utils.exceptions.AuthenticationError` if the response
                 status code is one of the configured ``auth_error_code``
        :raises: :class:`~eodag.utils.exceptions.DownloadError` if ``raise_errors``
        """
        # check if error is identified as auth_error in provider conf
        auth_errors = getattr(self.config, "auth_error_code", [None])
        if not isinstance(auth_errors, list):
            auth_errors = [auth_errors]
        # NOTE: a requests.Response is falsy for 4xx/5xx statuses, so the response
        # must be compared to None instead of truth-tested
        if e.response is not None and e.response.status_code in auth_errors:
            raise AuthenticationError(
                "HTTP Error %s returned, %s\nPlease check your credentials for %s"
                % (
                    e.response.status_code,
                    e.response.text.strip(),
                    self.provider,
                )
            )
        elif raise_errors:
            raise DownloadError(e)
        else:
            logger.warning("Unexpected error: %s" % e)
            logger.warning("Skipping %s" % asset["href"])

    def _get_asset_sizes(
        self,
        assets_values: List[Dict[str, Any]],
        auth: Optional[PluginConfig],
        params: Optional[Dict[str, str]],
        zipped: bool = False,
    ) -> int:
        """Set the missing ``size`` and ``filename`` of the assets and sum the sizes.

        For each asset whose ``href`` does not start with ``file:``, the values are
        looked for in the headers of a HEAD request, then in the headers of a GET
        request if the size is still unknown.

        :param assets_values: The assets to inspect
        :param auth: The configuration of a plugin of type Authentication
        :param params: Extra parameters sent with the GET request
        :param zipped: Not used by this implementation
        :returns: The sum of the sizes of the non-local assets
        """
        total_size = 0

        # loop for assets size & filename
        for asset in assets_values:
            if not asset["href"].startswith("file:"):
                # HEAD request for size & filename
                asset_headers = requests.head(
                    asset["href"],
                    auth=auth,
                    headers=USER_AGENT,
                    timeout=HTTP_REQ_TIMEOUT,
                ).headers

                if not getattr(asset, "size", 0):
                    # size from HEAD header / Content-length
                    asset.size = int(asset_headers.get("Content-length", 0))

                header_content_disposition = Message()
                if not getattr(asset, "size", 0) or not getattr(asset, "filename", 0):
                    # header content-disposition
                    header_content_disposition = parse_header(
                        asset_headers.get("content-disposition", "")
                    )
                if not getattr(asset, "size", 0):
                    # size from HEAD header / content-disposition / size
                    asset.size = int(header_content_disposition.get_param("size", 0))
                if not getattr(asset, "filename", 0):
                    # filename from HEAD header / content-disposition / filename
                    asset.filename = header_content_disposition.get_param(
                        "filename", None
                    )

                if not getattr(asset, "size", 0):
                    # GET request for size
                    with requests.get(
                        asset["href"],
                        stream=True,
                        auth=auth,
                        params=params,
                        headers=USER_AGENT,
                        timeout=DEFAULT_STREAM_REQUESTS_TIMEOUT,
                    ) as stream:
                        # size from GET header / Content-length
                        asset.size = int(stream.headers.get("Content-length", 0))
                        if not getattr(asset, "size", 0):
                            # size from GET header / content-disposition / size
                            asset.size = int(
                                parse_header(
                                    stream.headers.get("content-disposition", "")
                                ).get_param("size", 0)
                            )

                total_size += asset.size
        return total_size

    def download_all(
        self,
        products: SearchResult,
        auth: Optional[PluginConfig] = None,
        downloaded_callback: Optional[DownloadedCallback] = None,
        progress_callback: Optional[ProgressCallback] = None,
        wait: int = DEFAULT_DOWNLOAD_WAIT,
        timeout: int = DEFAULT_DOWNLOAD_TIMEOUT,
        **kwargs: Union[str, bool, Dict[str, Any]],
    ):
        """
        Download all using parent (base plugin) method
        """
        return super(HTTPDownload, self).download_all(
            products,
            auth=auth,
            downloaded_callback=downloaded_callback,
            progress_callback=progress_callback,
            wait=wait,
            timeout=timeout,
            **kwargs,
        )