Source code for eodag.plugins.download.http

# -*- coding: utf-8 -*-
# Copyright 2018, CS GROUP - France, https://www.csgroup.eu/
#
# This file is part of EODAG project
#     https://www.github.com/CS-SI/EODAG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations

import logging
import os
import shutil
import tarfile
import zipfile
from datetime import datetime
from email.message import Message
from itertools import chain
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    Iterator,
    List,
    Optional,
    Tuple,
    TypedDict,
    Union,
    cast,
)
from urllib.parse import parse_qs, urlparse

import geojson
import requests
from lxml import etree
from requests import RequestException
from requests.auth import AuthBase
from stream_zip import ZIP_AUTO, stream_zip

from eodag.api.product.metadata_mapping import (
    NOT_AVAILABLE,
    OFFLINE_STATUS,
    ONLINE_STATUS,
    STAGING_STATUS,
    mtd_cfg_as_conversion_and_querypath,
    properties_from_json,
    properties_from_xml,
)
from eodag.plugins.download.base import Download
from eodag.utils import (
    DEFAULT_DOWNLOAD_TIMEOUT,
    DEFAULT_DOWNLOAD_WAIT,
    DEFAULT_STREAM_REQUESTS_TIMEOUT,
    HTTP_REQ_TIMEOUT,
    USER_AGENT,
    ProgressCallback,
    StreamResponse,
    flatten_top_directories,
    guess_extension,
    guess_file_type,
    parse_header,
    path_to_uri,
    sanitize,
    string_to_jsonpath,
    uri_to_path,
)
from eodag.utils.exceptions import (
    AuthenticationError,
    DownloadError,
    MisconfiguredError,
    NotAvailableError,
    TimeOutError,
)

if TYPE_CHECKING:
    from requests import Response

    from eodag.api.product import Asset, EOProduct  # type: ignore
    from eodag.api.search_result import SearchResult
    from eodag.config import PluginConfig
    from eodag.types.download_args import DownloadConf
    from eodag.utils import DownloadedCallback, Unpack

logger = logging.getLogger("eodag.download.http")


[docs] class HTTPDownload(Download):
    """HTTPDownload plugin. Handles product download over HTTP protocol

    :param provider: provider name
    :type provider: str
    :param config: Download plugin configuration:

        * ``config.base_uri`` (str) - default endpoint url
        * ``config.extract`` (bool) - (optional) extract downloaded archive or not
        * ``config.auth_error_code`` (int) - (optional) authentication error code
        * ``config.dl_url_params`` (dict) - (optional) additional parameters to send in the request
        * ``config.archive_depth`` (int) - (optional) level in extracted path tree where to find data
        * ``config.flatten_top_dirs`` (bool) - (optional) flatten directory structure
        * ``config.ignore_assets`` (bool) - (optional) ignore assets and download using downloadLink
        * ``config.order_enabled`` (bool) - (optional) whether order is enabled or not if product is `OFFLINE`
        * ``config.order_method`` (str) - (optional) HTTP request method, GET (default) or POST
        * ``config.order_headers`` (dict) - (optional) order request headers
        * ``config.order_on_response`` (dict) - (optional) edit or add new product properties
        * ``config.order_status`` (:class:`~eodag.config.PluginConfig.OrderStatus`) - Order status handling

    :type config: :class:`~eodag.config.PluginConfig`
    """
def __init__(self, provider: str, config: PluginConfig) -> None:
    """Initialize the plugin and check that a ``base_uri`` is configured.

    :param provider: provider name
    :type provider: str
    :param config: download plugin configuration
    :type config: :class:`~eodag.config.PluginConfig`
    :raises: :class:`~eodag.utils.exceptions.MisconfiguredError` when
             ``self.config`` has no ``base_uri`` attribute
    """
    super(HTTPDownload, self).__init__(provider, config)

    # NOTE(review): the check reads self.config, presumably set by the parent
    # constructor from the `config` argument -- confirm in Download.__init__
    if hasattr(self.config, "base_uri"):
        return

    plugin_name = type(self).__name__
    raise MisconfiguredError(
        "{} plugin require a base_uri configuration key".format(plugin_name)
    )
def orderDownload( self, product: EOProduct, auth: Optional[AuthBase] = None, **kwargs: Unpack[DownloadConf], ) -> Optional[Dict[str, Any]]: """Send product order request. It will be executed once before the download retry loop, if the product is OFFLINE and has `orderLink` in its properties. Product ordering can be configured using the following download plugin parameters: - **order_enabled**: Wether order is enabled or not (may not use this method if no `orderLink` exists) - **order_method**: (optional) HTTP request method, GET (default) or POST - **order_on_response**: (optional) things to do with obtained order response: - *metadata_mapping*: edit or add new product propoerties properties Product properties used for order: - **orderLink**: order request URL :param product: The EO product to order :type product: :class:`~eodag.api.product._product.EOProduct` :param auth: (optional) authenticated object :type auth: Optional[AuthBase] :param kwargs: download additional kwargs :type kwargs: Union[str, bool, dict] :returns: the returned json status response :rtype: dict """ product.properties["storageStatus"] = STAGING_STATUS order_method = getattr(self.config, "order_method", "GET").upper() ssl_verify = getattr(self.config, "ssl_verify", True) OrderKwargs = TypedDict( "OrderKwargs", {"json": Dict[str, Union[Any, List[str]]]}, total=False ) order_kwargs: OrderKwargs = {} if order_method == "POST": # separate url & parameters parts = urlparse(str(product.properties["orderLink"])) query_dict = parse_qs(parts.query) if not query_dict and parts.query: query_dict = geojson.loads(parts.query) order_url = parts._replace(query=None).geturl() if query_dict: order_kwargs["json"] = query_dict else: order_url = product.properties["orderLink"] order_kwargs = {} headers = {**getattr(self.config, "order_headers", {}), **USER_AGENT} try: with requests.request( method=order_method, url=order_url, auth=auth, timeout=HTTP_REQ_TIMEOUT, headers=headers, verify=ssl_verify, 
**order_kwargs, ) as response: logger.debug(f"{order_method} {order_url} {headers} {order_kwargs}") try: response.raise_for_status() ordered_message = response.text logger.debug(ordered_message) product.properties["storageStatus"] = STAGING_STATUS except RequestException as e: if e.response and hasattr(e.response, "content"): error_message = f"{e.response.content.decode('utf-8')} - {e}" else: error_message = str(e) logger.warning( "%s could not be ordered, request returned %s", product.properties["title"], error_message, ) self._check_auth_exception(e) return self.order_response_process(response, product) except requests.exceptions.Timeout as exc: raise TimeOutError(exc, timeout=HTTP_REQ_TIMEOUT) from exc def order_response_process( self, response: Response, product: EOProduct ) -> Optional[Dict[str, Any]]: """Process order response :param response: The order response :type response: :class:`~requests.Response` :param product: The orderd EO product :type product: :class:`~eodag.api.product._product.EOProduct` :returns: the returned json status response :rtype: dict """ on_response_mm = getattr(self.config, "order_on_response", {}).get( "metadata_mapping", {} ) if not on_response_mm: return None logger.debug("Parsing order response to update product metada-mapping") on_response_mm_jsonpath = mtd_cfg_as_conversion_and_querypath( on_response_mm, ) json_response = response.json() properties_update = properties_from_json( {"json": json_response, "headers": {**response.headers}}, on_response_mm_jsonpath, ) product.properties.update( {k: v for k, v in properties_update.items() if v != NOT_AVAILABLE} ) if "downloadLink" in product.properties: product.remote_location = product.location = product.properties[ "downloadLink" ] logger.debug(f"Product location updated to {product.location}") return json_response def orderDownloadStatus( self, product: EOProduct, auth: Optional[AuthBase] = None, ) -> None: """Send product order status request. 
It will be executed before each download retry. Product order status request can be configured using the following download plugin parameters: - **order_status**: :class:`~eodag.config.PluginConfig.OrderStatus` Product properties used for order status: - **orderStatusLink**: order status request URL :param product: The ordered EO product :type product: :class:`~eodag.api.product._product.EOProduct` :param auth: (optional) authenticated object :type auth: Optional[AuthBase] :param kwargs: download additional kwargs :type kwargs: Union[str, bool, dict] """ status_config = getattr(self.config, "order_status", {}) success_code: Optional[int] = status_config.get("success", {}).get("http_code") def _request( url: str, method: str = "GET", headers: Optional[Dict[str, Any]] = None, json: Optional[Any] = None, timeout: int = HTTP_REQ_TIMEOUT, ) -> Response: """Send request and handle allow redirects""" logger.debug(f"{method} {url} {headers} {json}") try: response = requests.request( method=method, url=url, auth=auth, timeout=timeout, headers={**(headers or {}), **USER_AGENT}, allow_redirects=False, # Redirection is manually handled json=json, ) logger.debug( f"Order download status request responded with {response.status_code}" ) response.raise_for_status() # Raise an exception if status code indicates an error # Handle redirection (if needed) if ( 300 <= response.status_code < 400 and response.status_code != success_code ): # cf: https://www.rfc-editor.org/rfc/rfc9110.html#name-303-see-other if response.status_code == 303: method = "GET" if new_url := response.headers.get("Location"): return _request(new_url, method, headers, json, timeout) return response except requests.exceptions.Timeout as exc: raise TimeOutError(exc, timeout=timeout) from exc status_request: Dict[str, Any] = status_config.get("request", {}) status_request_method = str(status_request.get("method", "GET")).upper() if status_request_method == "POST": # separate url & parameters parts = 
urlparse(str(product.properties["orderStatusLink"])) status_url = parts._replace(query=None).geturl() query_dict = parse_qs(parts.query) if not query_dict and parts.query: query_dict: Any = geojson.loads(parts.query) json_data = query_dict if query_dict else None else: status_url = product.properties["orderStatusLink"] json_data = None try: response = _request( status_url, status_request_method, status_request.get("headers"), json_data, ) json_response = response.json() if not isinstance(json_response, dict): raise RequestException("response content is not a dict") status_dict: Dict[str, Any] = json_response except RequestException as e: raise DownloadError( "%s order status could not be checked, request returned %s" % ( product.properties["title"], e, ) ) from e status_mm = status_config.get("metadata_mapping", {}) status_mm_jsonpath = ( mtd_cfg_as_conversion_and_querypath( status_mm, ) if status_mm else {} ) logger.debug("Parsing order status response") status_dict = properties_from_json( {"json": response.json(), "headers": {**response.headers}}, status_mm_jsonpath, ) # display progress percentage if "percent" in status_dict: status_percent = str(status_dict["percent"]) if status_percent.isdigit(): status_percent += "%" logger.info(f"{product.properties['title']} order status: {status_percent}") status_message = status_dict.get("message") product.properties["orderStatus"] = status_dict.get("status") # handle status error errors: Dict[str, Any] = status_config.get("error", {}) if errors and errors.items() <= status_dict.items(): raise DownloadError( f"Provider {product.provider} returned: {status_dict.get('error_message', status_message)}" ) success_status: Dict[str, Any] = status_config.get("success", {}).get("status") # if not success if (success_status and success_status != status_dict.get("status")) or ( success_code and success_code != response.status_code ): error = NotAvailableError(status_message) raise error product.properties["storageStatus"] = 
ONLINE_STATUS config_on_success: Dict[str, Any] = status_config.get("on_success", {}) if not config_on_success: # Nothing left to do return None # need search on success ? if config_on_success.get("need_search"): logger.debug(f"Search for new location: {product.properties['searchLink']}") try: response = _request(product.properties["searchLink"]) except RequestException as e: logger.warning( "%s order status could not be checked, request returned %s", product.properties["title"], e, ) return None result_type = config_on_success.get("result_type", "json") result_entry = config_on_success.get("results_entry") on_success_mm = config_on_success.get("metadata_mapping", {}) on_success_mm_querypath = ( mtd_cfg_as_conversion_and_querypath( on_success_mm, ) if on_success_mm else {} ) try: if result_type == "xml": if not result_entry: raise MisconfiguredError( '"result_entry" is required with "result_type" "xml"' 'in "order_status.on_success"' ) root_node = etree.fromstring(response.content) namespaces = {k or "ns": v for k, v in root_node.nsmap.items()} results = [ etree.tostring(entry) for entry in root_node.xpath( result_entry, namespaces=namespaces, ) ] if len(results) != 1: raise DownloadError( "Could not get a single result after order success for " f"{product.properties['searchLink']} request. 
" f"Please search and download {product} again" ) assert isinstance(results, list), "results must be in a list" # single result result = results[0] if on_success_mm_querypath: properties_update = properties_from_xml( result, on_success_mm_querypath, ) else: properties_update = {} else: if result_entry: entry_jsonpath = string_to_jsonpath(result_entry, force=True) json_response = entry_jsonpath.find(response.json()) raise NotImplementedError( 'result_entry in config.on_success is not yet supported for result_type "json"' ) else: json_response = response.json() if on_success_mm_querypath: logger.debug( "Parsing on-success metadata-mapping using order status response" ) properties_update = properties_from_json( {"json": json_response, "headers": {**response.headers}}, on_success_mm_querypath, ) else: properties_update = {} except Exception as e: if isinstance(e, DownloadError): raise logger.debug(e) raise DownloadError( f"Could not parse result after order success for {product.properties['searchLink']}" f" request. Please search and download {product} again" ) from e # update product product.properties.update(properties_update) if "downloadLink" in properties_update: product.location = product.remote_location = product.properties[ "downloadLink" ] else: self.order_response_process(response, product) def download( self, product: EOProduct, auth: Optional[Union[AuthBase, Dict[str, str]]] = None, progress_callback: Optional[ProgressCallback] = None, wait: int = DEFAULT_DOWNLOAD_WAIT, timeout: int = DEFAULT_DOWNLOAD_TIMEOUT, **kwargs: Unpack[DownloadConf], ) -> Optional[str]: """Download a product using HTTP protocol. The downloaded product is assumed to be a Zip file. 
If it is not, the user is warned, it is renamed to remove the zip extension and no further treatment is done (no extraction) """ if auth is not None and not isinstance(auth, AuthBase): raise MisconfiguredError(f"Incompatible auth plugin: {type(auth)}") if progress_callback is None: logger.info( "Progress bar unavailable, please call product.download() instead of plugin.download()" ) progress_callback = ProgressCallback(disable=True) outputs_extension = getattr(self.config, "products", {}).get( product.product_type, {} ).get("outputs_extension", None) or getattr( self.config, "outputs_extension", ".zip" ) kwargs["outputs_extension"] = kwargs.get("outputs_extension", outputs_extension) fs_path, record_filename = self._prepare_download( product, progress_callback=progress_callback, **kwargs, ) if not fs_path or not record_filename: if fs_path: product.location = path_to_uri(fs_path) return fs_path # download assets if exist instead of remote_location if len(product.assets) > 0 and not getattr(self.config, "ignore_assets", False): try: fs_path = self._download_assets( product, fs_path.replace(".zip", ""), record_filename, auth, progress_callback, **kwargs, ) if kwargs.get("asset", None) is None: product.location = path_to_uri(fs_path) return fs_path except NotAvailableError as e: if kwargs.get("asset", None) is not None: raise NotAvailableError(e).with_traceback(e.__traceback__) else: pass url = product.remote_location @self._download_retry(product, wait, timeout) def download_request( product: EOProduct, auth: AuthBase, progress_callback: ProgressCallback, wait: int, timeout: int, **kwargs: Unpack[DownloadConf], ) -> None: chunks = self._stream_download(product, auth, progress_callback, **kwargs) with open(fs_path, "wb") as fhandle: for chunk in chunks: fhandle.write(chunk) download_request(product, auth, progress_callback, wait, timeout, **kwargs) with open(record_filename, "w") as fh: fh.write(url) logger.debug("Download recorded in %s", record_filename) # Check 
that the downloaded file is really a zip file if not zipfile.is_zipfile(fs_path) and outputs_extension == ".zip": logger.warning( "Downloaded product is not a Zip File. Please check its file type before using it" ) new_fs_path = os.path.join( os.path.dirname(fs_path), sanitize(product.properties["title"]), ) if os.path.isfile(fs_path) and not tarfile.is_tarfile(fs_path): if not os.path.isdir(new_fs_path): os.makedirs(new_fs_path) shutil.move(fs_path, new_fs_path) file_path = os.path.join(new_fs_path, os.path.basename(fs_path)) new_file_path = file_path[: file_path.index(".zip")] shutil.move(file_path, new_file_path) # in the case where the outputs extension has not been set # to ".tar" in the product type nor provider configuration elif tarfile.is_tarfile(fs_path): if not new_fs_path.endswith(".tar"): new_fs_path += ".tar" shutil.move(fs_path, new_fs_path) kwargs["outputs_extension"] = ".tar" product_path = self._finalize( new_fs_path, progress_callback=progress_callback, **kwargs, ) product.location = path_to_uri(product_path) return product_path else: # not a file (dir with zip extension) shutil.move(fs_path, new_fs_path) product.location = path_to_uri(new_fs_path) return new_fs_path if os.path.isfile(fs_path) and not ( zipfile.is_zipfile(fs_path) or tarfile.is_tarfile(fs_path) ): new_fs_path = os.path.join( os.path.dirname(fs_path), sanitize(product.properties["title"]), ) if not os.path.isdir(new_fs_path): os.makedirs(new_fs_path) shutil.move(fs_path, new_fs_path) product.location = path_to_uri(new_fs_path) return new_fs_path product_path = self._finalize( fs_path, progress_callback=progress_callback, **kwargs, ) product.location = path_to_uri(product_path) return product_path def _check_stream_size(self, product: EOProduct) -> int: stream_size = int(self.stream.headers.get("content-length", 0)) if ( stream_size == 0 and "storageStatus" in product.properties and product.properties["storageStatus"] != ONLINE_STATUS ): raise NotAvailableError( "%s(initially %s) 
ordered, got: %s" % ( product.properties["title"], product.properties["storageStatus"], self.stream.reason, ) ) return stream_size def _check_product_filename(self, product: EOProduct) -> str: filename = None asset_content_disposition = self.stream.headers.get("content-disposition", None) if asset_content_disposition: filename = cast( Optional[str], parse_header(asset_content_disposition).get_param("filename", None), ) if not filename: # default filename extracted from path filename = str(os.path.basename(self.stream.url)) filename_extension = os.path.splitext(filename)[1] if not filename_extension: if content_type := getattr(product, "headers", {}).get("Content-Type"): ext = guess_extension(content_type) if ext: filename += ext else: outputs_extension: Optional[str] = ( getattr(self.config, "products", {}) .get(product.product_type, {}) .get("outputs_extension") ) if outputs_extension: filename += outputs_extension return filename def _stream_download_dict( self, product: EOProduct, auth: Optional[Union[AuthBase, Dict[str, str]]] = None, progress_callback: Optional[ProgressCallback] = None, wait: int = DEFAULT_DOWNLOAD_WAIT, timeout: int = DEFAULT_DOWNLOAD_TIMEOUT, **kwargs: Unpack[DownloadConf], ) -> StreamResponse: r""" Returns dictionnary of :class:`~fastapi.responses.StreamingResponse` keyword-arguments. It contains a generator to streamed download chunks and the response headers. 
:param product: The EO product to download :type product: :class:`~eodag.api.product._product.EOProduct` :param auth: (optional) authenticated object :type auth: Optional[Union[AuthBase, Dict[str, str]]] :param progress_callback: (optional) A progress callback :type progress_callback: :class:`~eodag.utils.ProgressCallback` :param wait: (optional) If download fails, wait time in minutes between two download tries :type wait: int :param timeout: (optional) If download fails, maximum time in minutes before stop retrying to download :type timeout: int :param kwargs: `outputs_prefix` (str), `extract` (bool), `delete_archive` (bool) and `dl_url_params` (dict) can be provided as additional kwargs and will override any other values defined in a configuration file or with environment variables. :type kwargs: Union[str, bool, dict] :returns: Dictionnary of :class:`~fastapi.responses.StreamingResponse` keyword-arguments :rtype: dict """ if auth is not None and not isinstance(auth, AuthBase): raise MisconfiguredError(f"Incompatible auth plugin: {type(auth)}") # download assets if exist instead of remote_location if len(product.assets) > 0 and not getattr(self.config, "ignore_assets", False): try: assets_values = product.assets.get_values(kwargs.get("asset", None)) chunks_tuples = self._stream_download_assets( product, auth, progress_callback, assets_values=assets_values, **kwargs, ) if len(assets_values) == 1: # start reading chunks to set asset.headers first_chunks_tuple = next(chunks_tuples) # update headers assets_values[0].headers[ "content-disposition" ] = f"attachment; filename={assets_values[0].filename}" if assets_values[0].get("type", None): assets_values[0].headers["content-type"] = assets_values[0][ "type" ] return StreamResponse( content=chain(iter([first_chunks_tuple]), chunks_tuples), headers=assets_values[0].headers, ) else: outputs_filename = ( sanitize(product.properties["title"]) if "title" in product.properties else sanitize(product.properties.get("id", 
"download")) ) return StreamResponse( content=stream_zip(chunks_tuples), media_type="application/zip", headers={ "content-disposition": f"attachment; filename={outputs_filename}.zip", }, ) except NotAvailableError as e: if kwargs.get("asset", None) is not None: raise NotAvailableError(e).with_traceback(e.__traceback__) else: pass chunks = self._stream_download(product, auth, progress_callback, **kwargs) # start reading chunks to set product.headers try: first_chunk = next(chunks) except StopIteration: # product is empty file logger.warning("product %s is empty", product.properties["id"]) return StreamResponse(content=chain(iter([]))) return StreamResponse( content=chain(iter([first_chunk]), chunks), headers=product.headers, ) def _check_auth_exception(self, e: Optional[RequestException]) -> None: # check if error is identified as auth_error in provider conf auth_errors = getattr(self.config, "auth_error_code", [None]) if not isinstance(auth_errors, list): auth_errors = [auth_errors] response_text = ( e.response.text.strip() if e is not None and e.response is not None else "" ) if ( e is not None and e.response is not None and e.response.status_code in auth_errors ): raise AuthenticationError( "HTTP Error %s returned, %s\nPlease check your credentials for %s" % ( e.response.status_code, response_text, self.provider, ) ) def _process_exception( self, e: Optional[RequestException], product: EOProduct, ordered_message: str ) -> None: self._check_auth_exception(e) response_text = ( e.response.text.strip() if e is not None and e.response is not None else "" ) # product not available if product.properties.get("storageStatus", ONLINE_STATUS) != ONLINE_STATUS: msg = ( ordered_message if ordered_message and not response_text else response_text ) raise NotAvailableError( "%s(initially %s) requested, returned: %s" % ( product.properties["title"], product.properties["storageStatus"], msg, ) ) else: import traceback as tb if e: logger.error( "Error while getting resource 
:\n%s\n%s", tb.format_exc(), response_text, ) else: logger.error("Error while getting resource :\n%s", tb.format_exc()) def _stream_download( self, product: EOProduct, auth: Optional[AuthBase] = None, progress_callback: Optional[ProgressCallback] = None, **kwargs: Unpack[DownloadConf], ) -> Iterator[Any]: """ fetches a zip file containing the assets of a given product as a stream and returns a generator yielding the chunks of the file :param product: product for which the assets should be downloaded :type product: :class:`~eodag.api.product._product.EOProduct` :param auth: The configuration of a plugin of type Authentication :type auth: Optional[Union[AuthBase, Dict[str, str]]] :param progress_callback: A method or a callable object which takes a current size and a maximum size as inputs and handle progress bar creation and update to give the user a feedback on the download progress :type progress_callback: :class:`~eodag.utils.ProgressCallback` :param kwargs: additional arguments :type kwargs: dict """ if progress_callback is None: logger.info("Progress bar unavailable, please call product.download()") progress_callback = ProgressCallback(disable=True) ordered_message = "" if ( "orderLink" in product.properties and product.properties.get("storageStatus") == OFFLINE_STATUS and not product.properties.get("orderStatus") ): self.orderDownload(product=product, auth=auth) if ( product.properties.get("orderStatusLink", None) and product.properties.get("storageStatus") != ONLINE_STATUS ): self.orderDownloadStatus(product=product, auth=auth) params = kwargs.pop("dl_url_params", None) or getattr( self.config, "dl_url_params", {} ) req_method = ( product.properties.get("downloadMethod", "").lower() or getattr(self.config, "method", "GET").lower() ) url = product.remote_location if req_method == "post": # separate url & parameters parts = urlparse(url) query_dict = parse_qs(parts.query) if not query_dict and parts.query: query_dict = geojson.loads(parts.query) req_url = 
parts._replace(query=None).geturl() req_kwargs: Dict[str, Any] = {"json": query_dict} if query_dict else {} else: req_url = url req_kwargs = {} if req_url.startswith(NOT_AVAILABLE): raise NotAvailableError("Download link is not available") s = requests.Session() with s.request( req_method, req_url, stream=True, auth=auth, params=params, headers=USER_AGENT, timeout=DEFAULT_STREAM_REQUESTS_TIMEOUT, **req_kwargs, ) as self.stream: try: self.stream.raise_for_status() except requests.exceptions.Timeout as exc: raise TimeOutError( exc, timeout=DEFAULT_STREAM_REQUESTS_TIMEOUT ) from exc except RequestException as e: self._process_exception(e, product, ordered_message) else: # check if product was ordered if getattr( self.stream, "status_code", None ) is not None and self.stream.status_code == getattr( self.config, "order_status", {} ).get( "ordered", {} ).get( "http_code" ): product.properties["storageStatus"] = "ORDERED" self._process_exception(None, product, ordered_message) stream_size = self._check_stream_size(product) or None product.headers = self.stream.headers filename = self._check_product_filename(product) or None product.headers[ "content-disposition" ] = f"attachment; filename={filename}" content_type = product.headers.get("Content-Type") if filename and not content_type: product.headers["Content-Type"] = guess_file_type(filename) progress_callback.reset(total=stream_size) for chunk in self.stream.iter_content(chunk_size=64 * 1024): if chunk: progress_callback(len(chunk)) yield chunk def _stream_download_assets( self, product: EOProduct, auth: Optional[AuthBase] = None, progress_callback: Optional[ProgressCallback] = None, assets_values: List[Asset] = [], **kwargs: Unpack[DownloadConf], ) -> Iterator[Tuple[str, datetime, int, Any, Iterator[Any]]]: if progress_callback is None: logger.info("Progress bar unavailable, please call product.download()") progress_callback = ProgressCallback(disable=True) assets_urls = [ a["href"] for a in getattr(product, "assets", 
{}).values() if "href" in a ] if not assets_urls: raise NotAvailableError("No assets available for %s" % product) # get extra parameters to pass to the query params = kwargs.pop("dl_url_params", None) or getattr( self.config, "dl_url_params", {} ) total_size = self._get_asset_sizes(assets_values, auth, params) or None progress_callback.reset(total=total_size) def get_chunks(stream: Response) -> Any: for chunk in stream.iter_content(chunk_size=64 * 1024): if chunk: progress_callback(len(chunk)) yield chunk # zipped files properties modified_at = datetime.now() perms = 0o600 # loop for assets paths and get common_subdir asset_rel_paths_list = [] for asset in assets_values: asset_rel_path_parts = urlparse(asset["href"]).path.strip("/").split("/") asset_rel_path_parts_sanitized = [ sanitize(part) for part in asset_rel_path_parts ] asset.rel_path = os.path.join(*asset_rel_path_parts_sanitized) asset_rel_paths_list.append(asset.rel_path) if asset_rel_paths_list: assets_common_subdir = os.path.commonpath(asset_rel_paths_list) # product conf overrides provider conf for "flatten_top_dirs" product_conf = getattr(self.config, "products", {}).get( product.product_type, {} ) flatten_top_dirs = product_conf.get( "flatten_top_dirs", getattr(self.config, "flatten_top_dirs", False) ) ssl_verify = getattr(self.config, "ssl_verify", True) # loop for assets download for asset in assets_values: if asset["href"].startswith("file:"): logger.info( f"Local asset detected. 
Download skipped for {asset['href']}" ) continue with requests.get( asset["href"], stream=True, auth=auth, params=params, headers=USER_AGENT, timeout=DEFAULT_STREAM_REQUESTS_TIMEOUT, verify=ssl_verify, ) as stream: try: stream.raise_for_status() except requests.exceptions.Timeout as exc: raise TimeOutError( exc, timeout=DEFAULT_STREAM_REQUESTS_TIMEOUT ) from exc except RequestException as e: raise_errors = True if len(assets_values) == 1 else False self._handle_asset_exception(e, asset, raise_errors=raise_errors) else: asset_rel_path = ( asset.rel_path.replace(assets_common_subdir, "").strip(os.sep) if flatten_top_dirs else asset.rel_path ) asset_rel_dir = os.path.dirname(asset_rel_path) if not getattr(asset, "filename", None): # try getting filename in GET header if was not found in HEAD result asset_content_disposition = stream.headers.get( "content-disposition", None ) if asset_content_disposition: asset.filename = cast( Optional[str], parse_header(asset_content_disposition).get_param( "filename", None ), ) if not getattr(asset, "filename", None): # default filename extracted from path asset.filename = os.path.basename(asset.rel_path) asset.rel_path = os.path.join( asset_rel_dir, cast(str, asset.filename) ) if len(assets_values) == 1: # apply headers to asset product.assets[assets_values[0].key].headers = stream.headers yield from get_chunks(stream) else: # several assets to zip yield ( asset.rel_path, modified_at, perms, ZIP_AUTO(asset.size), get_chunks(stream), ) def _download_assets( self, product: EOProduct, fs_dir_path: str, record_filename: str, auth: Optional[AuthBase] = None, progress_callback: Optional[ProgressCallback] = None, **kwargs: Unpack[DownloadConf], ) -> str: """Download product assets if they exist""" if progress_callback is None: logger.info("Progress bar unavailable, please call product.download()") progress_callback = ProgressCallback(disable=True) assets_urls = [ a["href"] for a in getattr(product, "assets", {}).values() if "href" in a ] 
if not assets_urls: raise NotAvailableError("No assets available for %s" % product) assets_values = product.assets.get_values(kwargs.get("asset", None)) chunks_tuples = self._stream_download_assets( product, auth, progress_callback, assets_values=assets_values, **kwargs ) # remove existing incomplete file if os.path.isfile(fs_dir_path): os.remove(fs_dir_path) # create product dest dir if not os.path.isdir(fs_dir_path): os.makedirs(fs_dir_path) # product conf overrides provider conf for "flatten_top_dirs" product_conf = getattr(self.config, "products", {}).get( product.product_type, {} ) flatten_top_dirs = product_conf.get( "flatten_top_dirs", getattr(self.config, "flatten_top_dirs", False) ) # count local assets local_assets_count = 0 for asset in assets_values: if asset["href"].startswith("file:"): local_assets_count += 1 continue if len(assets_values) == 1 and local_assets_count == 0: # start reading chunks to set asset.rel_path first_chunks_tuple = next(chunks_tuples) chunks = chain(iter([first_chunks_tuple]), chunks_tuples) chunks_tuples = [(assets_values[0].rel_path, None, None, None, chunks)] for chunk_tuple in chunks_tuples: asset_path = chunk_tuple[0] asset_chunks = chunk_tuple[4] asset_abs_path = os.path.join(fs_dir_path, asset_path) asset_abs_path_temp = asset_abs_path + "~" # create asset subdir if not exist asset_abs_path_dir = os.path.dirname(asset_abs_path) if not os.path.isdir(asset_abs_path_dir): os.makedirs(asset_abs_path_dir) # remove temporary file if os.path.isfile(asset_abs_path_temp): os.remove(asset_abs_path_temp) if not os.path.isfile(asset_abs_path): logger.debug("Downloading to temporary file '%s'", asset_abs_path_temp) with open(asset_abs_path_temp, "wb") as fhandle: for chunk in asset_chunks: if chunk: fhandle.write(chunk) progress_callback(len(chunk)) logger.debug( "Download completed. 
Renaming temporary file '%s' to '%s'", os.path.basename(asset_abs_path_temp), os.path.basename(asset_abs_path), ) os.rename(asset_abs_path_temp, asset_abs_path) # only one local asset if local_assets_count == len(assets_urls) and local_assets_count == 1: # remove empty {fs_dir_path} shutil.rmtree(fs_dir_path) # and return assets_urls[0] path fs_dir_path = uri_to_path(assets_urls[0]) # several local assets elif local_assets_count == len(assets_urls) and local_assets_count > 0: common_path = os.path.commonpath([uri_to_path(uri) for uri in assets_urls]) # remove empty {fs_dir_path} shutil.rmtree(fs_dir_path) # and return assets_urls common path fs_dir_path = common_path # no assets downloaded but some should have been elif len(os.listdir(fs_dir_path)) == 0: raise NotAvailableError("No assets could be downloaded") # flatten directory structure if flatten_top_dirs: flatten_top_directories(fs_dir_path) if kwargs.get("asset", None) is None: # save hash/record file with open(record_filename, "w") as fh: fh.write(product.remote_location) logger.debug("Download recorded in %s", record_filename) return fs_dir_path def _handle_asset_exception( self, e: RequestException, asset: Asset, raise_errors: bool = False ) -> None: # check if error is identified as auth_error in provider conf auth_errors = getattr(self.config, "auth_error_code", [None]) if not isinstance(auth_errors, list): auth_errors = [auth_errors] if e.response is not None and e.response.status_code in auth_errors: raise AuthenticationError( "HTTP Error %s returned, %s\nPlease check your credentials for %s" % ( e.response.status_code, e.response.text.strip(), self.provider, ) ) elif raise_errors: raise DownloadError(e) else: logger.warning("Unexpected error: %s" % e) logger.warning("Skipping %s" % asset["href"]) def _get_asset_sizes( self, assets_values: List[Asset], auth: Optional[AuthBase], params: Optional[Dict[str, str]], zipped: bool = False, ) -> int: total_size = 0 ssl_verify = getattr(self.config, 
"ssl_verify", True) # loop for assets size & filename for asset in assets_values: if not asset["href"].startswith("file:"): # HEAD request for size & filename asset_headers = requests.head( asset["href"], auth=auth, headers=USER_AGENT, timeout=HTTP_REQ_TIMEOUT, ).headers if not getattr(asset, "size", 0): # size from HEAD header / Content-length asset.size = int(asset_headers.get("Content-length", 0)) header_content_disposition = Message() if not getattr(asset, "size", 0) or not getattr(asset, "filename", 0): # header content-disposition header_content_disposition = parse_header( asset_headers.get("content-disposition", "") ) if not getattr(asset, "size", 0): # size from HEAD header / content-disposition / size size_str = str(header_content_disposition.get_param("size", 0)) asset.size = int(size_str) if size_str.isdigit() else 0 if not getattr(asset, "filename", 0): # filename from HEAD header / content-disposition / size asset_filename = header_content_disposition.get_param( "filename", None ) asset.filename = str(asset_filename) if asset_filename else None if not getattr(asset, "size", 0): # GET request for size with requests.get( asset["href"], stream=True, auth=auth, params=params, headers=USER_AGENT, timeout=DEFAULT_STREAM_REQUESTS_TIMEOUT, verify=ssl_verify, ) as stream: # size from GET header / Content-length asset.size = int(stream.headers.get("Content-length", 0)) if not getattr(asset, "size", 0): # size from GET header / content-disposition / size size_str = str( parse_header( stream.headers.get("content-disposition", "") ).get_param("size", 0) ) asset.size = int(size_str) if size_str.isdigit() else 0 total_size += asset.size return total_size def download_all( self, products: SearchResult, auth: Optional[Union[AuthBase, Dict[str, str]]] = None, downloaded_callback: Optional[DownloadedCallback] = None, progress_callback: Optional[ProgressCallback] = None, wait: int = DEFAULT_DOWNLOAD_WAIT, timeout: int = DEFAULT_DOWNLOAD_TIMEOUT, **kwargs: 
Unpack[DownloadConf], ): """ Download all using parent (base plugin) method """ return super(HTTPDownload, self).download_all( products, auth=auth, downloaded_callback=downloaded_callback, progress_callback=progress_callback, wait=wait, timeout=timeout, **kwargs, )