Source code for eodag.plugins.download.aws

# -*- coding: utf-8 -*-
# Copyright 2018, CS GROUP - France, https://www.csgroup.eu/
#
# This file is part of EODAG project
#     https://www.github.com/CS-SI/EODAG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import os
import re
from pathlib import Path

import boto3
import requests
from botocore.exceptions import ClientError, ProfileNotFound
from botocore.handlers import disable_signing
from lxml import etree

from eodag.api.product.metadata_mapping import (
    mtd_cfg_as_conversion_and_querypath,
    properties_from_json,
    properties_from_xml,
)
from eodag.plugins.download.base import Download
from eodag.utils import (
    USER_AGENT,
    ProgressCallback,
    flatten_top_directories,
    get_bucket_name_and_prefix,
    path_to_uri,
    rename_subfolder,
)
from eodag.utils.exceptions import AuthenticationError, DownloadError, NotAvailableError
from eodag.utils.stac_reader import HTTP_REQ_TIMEOUT

logger = logging.getLogger("eodag.plugins.download.aws")

# AWS chunk path identification patterns

# S2 L2A Tile files -----------------------------------------------------------
S2L2A_TILE_IMG_REGEX = re.compile(
    r"^tiles/(?P<tile1>[0-9]+)/(?P<tile2>[A-Z]+)/(?P<tile3>[A-Z]+)/(?P<year>[0-9]+)/(?P<month>[0-9]+)/"
    + r"(?P<day>[0-9]+)/(?P<num>[0-9]+)/R(?P<res>[0-9]+m)/(?P<file>[A-Z0-9_]+)\.jp2$"
)
S2L2A_TILE_AUX_DIR_REGEX = re.compile(
    r"^tiles/(?P<tile1>[0-9]+)/(?P<tile2>[A-Z]+)/(?P<tile3>[A-Z]+)/(?P<year>[0-9]+)/(?P<month>[0-9]+)/"
    + r"(?P<day>[0-9]+)/(?P<num>[0-9]+)/auxiliary/(?P<file>AUX_.+)$"
)
# S2 L2A QI Masks
S2_TILE_QI_MSK_REGEX = re.compile(
    r"^tiles/(?P<tile1>[0-9]+)/(?P<tile2>[A-Z]+)/(?P<tile3>[A-Z]+)/(?P<year>[0-9]+)/(?P<month>[0-9]+)/"
    + r"(?P<day>[0-9]+)/(?P<num>[0-9]+)/qi/(?P<file_base>.+)_(?P<file_suffix>[0-9]+m\.jp2)$"
)
# S2 L2A QI PVI
S2_TILE_QI_PVI_REGEX = re.compile(
    r"^tiles/(?P<tile1>[0-9]+)/(?P<tile2>[A-Z]+)/(?P<tile3>[A-Z]+)/(?P<year>[0-9]+)/(?P<month>[0-9]+)/"
    + r"(?P<day>[0-9]+)/(?P<num>[0-9]+)/qi/L2A_PVI\.jp2$"
)
# S2 Tile files ---------------------------------------------------------------
S2_TILE_IMG_REGEX = re.compile(
    r"^tiles/(?P<tile1>[0-9]+)/(?P<tile2>[A-Z]+)/(?P<tile3>[A-Z]+)/(?P<year>[0-9]+)/(?P<month>[0-9]+)/"
    + r"(?P<day>[0-9]+)/(?P<num>[0-9]+)/(?P<file>[A-Z0-9_]+\.jp2)$"
)
S2_TILE_PREVIEW_DIR_REGEX = re.compile(
    r"^tiles/(?P<tile1>[0-9]+)/(?P<tile2>[A-Z]+)/(?P<tile3>[A-Z]+)/(?P<year>[0-9]+)/(?P<month>[0-9]+)/"
    + r"(?P<day>[0-9]+)/(?P<num>[0-9]+)/preview/(?P<file>.+)$"
)
S2_TILE_AUX_DIR_REGEX = re.compile(
    r"^tiles/(?P<tile1>[0-9]+)/(?P<tile2>[A-Z]+)/(?P<tile3>[A-Z]+)/(?P<year>[0-9]+)/(?P<month>[0-9]+)/"
    + r"(?P<day>[0-9]+)/(?P<num>[0-9]+)/auxiliary/(?P<file>.+)$"
)
S2_TILE_QI_DIR_REGEX = re.compile(
    r"^tiles/(?P<tile1>[0-9]+)/(?P<tile2>[A-Z]+)/(?P<tile3>[A-Z]+)/(?P<year>[0-9]+)/(?P<month>[0-9]+)/"
    + r"(?P<day>[0-9]+)/(?P<num>[0-9]+)/qi/(?P<file>.+)$"
)
S2_TILE_THUMBNAIL_REGEX = re.compile(
    r"^tiles/(?P<tile1>[0-9]+)/(?P<tile2>[A-Z]+)/(?P<tile3>[A-Z]+)/(?P<year>[0-9]+)/(?P<month>[0-9]+)/"
    + r"(?P<day>[0-9]+)/(?P<num>[0-9]+)/(?P<file>preview\.\w+)$"
)
S2_TILE_MTD_REGEX = re.compile(
    r"^tiles/(?P<tile1>[0-9]+)/(?P<tile2>[A-Z]+)/(?P<tile3>[A-Z]+)/(?P<year>[0-9]+)/(?P<month>[0-9]+)/"
    + r"(?P<day>[0-9]+)/(?P<num>[0-9]+)/(?P<file>metadata\.xml)$"
)
# S2 Tile generic
S2_TILE_REGEX = re.compile(
    r"^tiles/(?P<tile1>[0-9]+)/(?P<tile2>[A-Z]+)/(?P<tile3>[A-Z]+)/(?P<year>[0-9]+)/(?P<month>[0-9]+)/"
    + r"(?P<day>[0-9]+)/(?P<num>[0-9]+)/(?P<file>.+)$"
)
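
# Illustrative example (added for clarity, not part of the original module):
# S2_TILE_REGEX splits a raw tile chunk key into its path components, e.g.:
#
#   >>> m = S2_TILE_REGEX.match("tiles/31/T/CJ/2020/1/15/0/B01.jp2")
#   >>> m.group("num"), m.group("file")
#   ('0', 'B01.jp2')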
# S2 Product files
S2_PROD_REGEX = re.compile(
    r"^products/(?P<year>[0-9]+)/(?P<month>[0-9]+)/(?P<day>[0-9]+)/(?P<title>[A-Z0-9_]+)/(?P<file>.+)$"
)
S2_PROD_DS_MTD_REGEX = re.compile(
    r"^products/(?P<year>[0-9]+)/(?P<month>[0-9]+)/(?P<day>[0-9]+)/(?P<title>[A-Z0-9_]+)/datastrip/"
    + r"(?P<num>.+)/(?P<file>metadata\.xml)$"
)
S2_PROD_DS_QI_REGEX = re.compile(
    r"^products/(?P<year>[0-9]+)/(?P<month>[0-9]+)/(?P<day>[0-9]+)/(?P<title>[A-Z0-9_]+)/datastrip/"
    + r"(?P<num>.+)/qi/(?P<file>.+)$"
)
S2_PROD_DS_QI_REPORT_REGEX = re.compile(
    r"^products/(?P<year>[0-9]+)/(?P<month>[0-9]+)/(?P<day>[0-9]+)/(?P<title>[A-Z0-9_]+)/datastrip/"
    + r"(?P<num>.+)/qi/(?P<filename>.+)_report\.xml$"
)
S2_PROD_INSPIRE_REGEX = re.compile(
    r"^products/(?P<year>[0-9]+)/(?P<month>[0-9]+)/(?P<day>[0-9]+)/(?P<title>[A-Z0-9_]+)/"
    + r"(?P<file>inspire\.xml)$"
)
# S2 Product generic
S2_PROD_MTD_REGEX = re.compile(
    r"^products/(?P<year>[0-9]+)/(?P<month>[0-9]+)/(?P<day>[0-9]+)/(?P<title>[A-Z0-9_]+)/"
    + r"(?P<file>metadata\.xml)$"
)
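
# Illustrative example (added for clarity, not part of the original module):
# product-level keys carry the acquisition date and product title, e.g.:
#
#   >>> key = ("products/2020/1/15/"
#   ...        "S2B_MSIL2A_20200115T105309_N0213_R051_T31TCJ_20200115T121117/"
#   ...        "productInfo.json")
#   >>> S2_PROD_REGEX.match(key).groupdict()["file"]
#   'productInfo.json'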
# S1 files --------------------------------------------------------------------
S1_CALIB_REGEX = re.compile(
    r"^GRD/(?P<year>[0-9]+)/(?P<month>[0-9]+)/(?P<day>[0-9]+)/(?P<beam>[A-Z0-9]+)/(?P<prod_pol>[A-Z0-9]+)/"
    + r"(?P<title>[A-Z0-9_]+)/annotation/calibration/"
    + r"(?P<file_prefix>[a-z]+)-(?P<file_beam>[a-z]+)-(?P<file_pol>.+)\.xml$"
)
S1_ANNOT_REGEX = re.compile(
    r"^GRD/(?P<year>[0-9]+)/(?P<month>[0-9]+)/(?P<day>[0-9]+)/(?P<beam>[A-Z0-9]+)/(?P<prod_pol>[A-Z0-9]+)/"
    + r"(?P<title>[A-Z0-9_]+)/annotation/"
    + r"(?P<file_beam>[a-z]+)-(?P<file_pol>.+)\.xml$"
)
S1_MEAS_REGEX = re.compile(
    r"^GRD/(?P<year>[0-9]+)/(?P<month>[0-9]+)/(?P<day>[0-9]+)/(?P<beam>[A-Z0-9]+)/(?P<prod_pol>[A-Z0-9]+)/"
    + r"(?P<title>[A-Z0-9_]+)/measurement/"
    + r"(?P<file_beam>[a-z]+)-(?P<file_pol>.+)\.(?P<file_ext>[a-z0-9]+)$"
)
S1_REPORT_REGEX = re.compile(
    r"^GRD/(?P<year>[0-9]+)/(?P<month>[0-9]+)/(?P<day>[0-9]+)/(?P<beam>[A-Z0-9]+)/(?P<prod_pol>[A-Z0-9]+)/"
    + r"(?P<title>[A-Z0-9_]+)/(?P<file>report-\w+\.pdf)$"
)
# S1 generic
S1_REGEX = re.compile(
    r"^GRD/[0-9]{4}/[0-9]+/[0-9]+/[A-Z0-9]+/[A-Z0-9]+/(?P<title>S1[A-Z0-9_]+)/(?P<file>.+)$"
)

# S1 image number conf per polarization ---------------------------------------
S1_IMG_NB_PER_POLAR = {
    "SH": {"HH": 1},
    "SV": {"VV": 1},
    "DH": {"HH": 1, "HV": 2},
    "DV": {"VV": 1, "VH": 2},
    "HH": {"HH": 1},
    "HV": {"HV": 1},
    "VV": {"VV": 1},
    "VH": {"VH": 1},
}
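
# Illustrative example (added for clarity, not part of the original module):
# combining S1_MEAS_REGEX with S1_IMG_NB_PER_POLAR yields the image number
# used when rebuilding SAFE measurement filenames, e.g. VH is image 2 of a
# dual-polarization "DV" product:
#
#   >>> key = ("GRD/2020/1/15/IW/DV/"
#   ...        "S1A_IW_GRDH_1SDV_20200115T044100_20200115T044125_030888_038B7B_5265/"
#   ...        "measurement/iw-vh.tiff")
#   >>> d = S1_MEAS_REGEX.match(key).groupdict()
#   >>> S1_IMG_NB_PER_POLAR[d["prod_pol"]][d["file_pol"].upper()]
#   2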


class AwsDownload(Download):
    """Download on AWS using S3 protocol.

    :param provider: provider name
    :type provider: str
    :param config: Download plugin configuration:

        * ``config.base_uri`` (str) - s3 endpoint url
        * ``config.requester_pays`` (bool) - whether download is done from a
          requester-pays bucket or not
        * ``config.flatten_top_dirs`` (bool) - flatten directory structure
        * ``config.products`` (dict) - product_type specific configuration

    :type config: :class:`~eodag.config.PluginConfig`
    """

    def __init__(self, provider, config):
        super(AwsDownload, self).__init__(provider, config)
        self.requester_pays = getattr(self.config, "requester_pays", False)
        self.s3_session = None

    def download(self, product, auth=None, progress_callback=None, **kwargs):
        """Download method for AWS S3 API.

        The product can be downloaded as it is, or as a SAFE-formatted product.
        SAFE-build is configured for a given provider and product type.
        If the product title is configured to be updated during download and
        SAFE-formatted, its destination path will be:
        `{outputs_prefix}/{title}/{updated_title}.SAFE`

        :param product: The EO product to download
        :type product: :class:`~eodag.api.product._product.EOProduct`
        :param auth: (optional) The configuration of a plugin of type Authentication
        :type auth: :class:`~eodag.config.PluginConfig`
        :param progress_callback: (optional) A method or a callable object which
                                  takes a current size and a maximum size as
                                  inputs, handles progress bar creation and
                                  update, and gives the user feedback on the
                                  download progress
        :type progress_callback: :class:`~eodag.utils.ProgressCallback` or None
        :param kwargs: `outputs_prefix` (str), `extract` (bool), `delete_archive` (bool)
                       and `dl_url_params` (dict) can be provided as additional
                       kwargs and will override any other values defined in a
                       configuration file or with environment variables.
        :type kwargs: Union[str, bool, dict]
        :returns: The absolute path to the downloaded product in the local filesystem
        :rtype: str
        """
        if progress_callback is None:
            logger.info(
                "Progress bar unavailable, please call product.download() instead of plugin.download()"
            )
            progress_callback = ProgressCallback(disable=True)

        # prepare download & create dirs (before updating metadata)
        product_local_path, record_filename = self._prepare_download(
            product, progress_callback=progress_callback, **kwargs
        )
        if not product_local_path or not record_filename:
            if product_local_path:
                product.location = path_to_uri(product_local_path)
            return product_local_path
        product_local_path = product_local_path.replace(".zip", "")
        # remove existing incomplete file
        if os.path.isfile(product_local_path):
            os.remove(product_local_path)
        # create product dest dir
        if not os.path.isdir(product_local_path):
            os.makedirs(product_local_path)

        product_conf = getattr(self.config, "products", {}).get(
            product.product_type, {}
        )

        build_safe = product_conf.get("build_safe", False)

        # product conf overrides provider conf for "flatten_top_dirs"
        flatten_top_dirs = product_conf.get(
            "flatten_top_dirs", getattr(self.config, "flatten_top_dirs", False)
        )

        # extra metadata needed for SAFE product
        if build_safe and "fetch_metadata" in product_conf.keys():
            fetch_format = product_conf["fetch_metadata"]["fetch_format"]
            update_metadata = product_conf["fetch_metadata"]["update_metadata"]
            fetch_url = product_conf["fetch_metadata"]["fetch_url"].format(
                **product.properties
            )
            logger.info("Fetching extra metadata from %s" % fetch_url)
            resp = requests.get(fetch_url, headers=USER_AGENT, timeout=HTTP_REQ_TIMEOUT)
            update_metadata = mtd_cfg_as_conversion_and_querypath(update_metadata)
            if fetch_format == "json":
                json_resp = resp.json()
                update_metadata = properties_from_json(json_resp, update_metadata)
                product.properties.update(update_metadata)
            elif fetch_format == "xml":
                update_metadata = properties_from_xml(resp.content, update_metadata)
                product.properties.update(update_metadata)
            else:
                logger.warning(
                    "SAFE metadata fetch format %s not implemented" % fetch_format
                )
        # if assets are defined, use them instead of scanning product.location
        if hasattr(product, "assets") and not getattr(
            self.config, "ignore_assets", False
        ):
            bucket_names_and_prefixes = []
            for complementary_url in getattr(product, "assets", {}).values():
                bucket_names_and_prefixes.append(
                    self.get_product_bucket_name_and_prefix(
                        product, complementary_url.get("href", "")
                    )
                )
        else:
            bucket_names_and_prefixes = [
                self.get_product_bucket_name_and_prefix(product)
            ]

        # add complementary urls
        try:
            for complementary_url_key in product_conf.get("complementary_url_key", []):
                bucket_names_and_prefixes.append(
                    self.get_product_bucket_name_and_prefix(
                        product, product.properties[complementary_url_key]
                    )
                )
        except KeyError:
            logger.warning(
                "complementary_url_key %s is missing in %s properties"
                % (complementary_url_key, product.properties["id"])
            )

        # authenticate
        authenticated_objects = {}
        auth_error_messages = set()
        for idx, pack in enumerate(bucket_names_and_prefixes):
            try:
                bucket_name, prefix = pack
                if bucket_name not in authenticated_objects:
                    # get Prefixes longest common base path
                    common_prefix = ""
                    prefix_split = prefix.split("/")
                    prefixes_in_bucket = len(
                        [p for b, p in bucket_names_and_prefixes if b == bucket_name]
                    )
                    for i in range(1, len(prefix_split)):
                        common_prefix = "/".join(prefix_split[0:i])
                        if (
                            len(
                                [
                                    p
                                    for b, p in bucket_names_and_prefixes
                                    if b == bucket_name and common_prefix in p
                                ]
                            )
                            < prefixes_in_bucket
                        ):
                            common_prefix = "/".join(prefix_split[0 : i - 1])
                            break
                    # connect to aws s3 and get bucket authenticated objects
                    s3_objects = self.get_authenticated_objects(
                        bucket_name, common_prefix, auth
                    )
                    authenticated_objects[bucket_name] = s3_objects
                else:
                    s3_objects = authenticated_objects[bucket_name]
            except AuthenticationError as e:
                logger.warning("Unexpected error: %s" % e)
                logger.warning("Skipping %s/%s" % (bucket_name, prefix))
                auth_error_messages.add(str(e))
            except ClientError as e:
                err = e.response["Error"]
                auth_messages = [
                    "AccessDenied",
                    "InvalidAccessKeyId",
                    "SignatureDoesNotMatch",
                ]
                if err["Code"] in auth_messages and "key" in err["Message"].lower():
                    raise AuthenticationError(
                        "HTTP error {} returned\n{}: {}\nPlease check your credentials for {}".format(
                            e.response["ResponseMetadata"]["HTTPStatusCode"],
                            err["Code"],
                            err["Message"],
                            self.provider,
                        )
                    )
                logger.warning("Unexpected error: %s" % e)
                logger.warning("Skipping %s/%s" % (bucket_name, prefix))
                auth_error_messages.add(str(e))

        # could not auth on any bucket
        if not authenticated_objects:
            raise AuthenticationError(", ".join(auth_error_messages))

        # downloadable files
        product_chunks = []
        for bucket_name, prefix in bucket_names_and_prefixes:
            # unauthenticated items filtered out
            if bucket_name in authenticated_objects.keys():
                product_chunks.extend(
                    authenticated_objects[bucket_name].filter(Prefix=prefix)
                )

        unique_product_chunks = set(product_chunks)

        total_size = sum([p.size for p in unique_product_chunks])

        # download
        progress_callback.reset(total=total_size)
        try:
            for product_chunk in unique_product_chunks:
                try:
                    chunk_rel_path = self.get_chunk_dest_path(
                        product,
                        product_chunk,
                        build_safe=build_safe,
                    )
                except NotAvailableError as e:
                    # out of SAFE format chunk
                    logger.warning(e)
                    continue
                chunk_abs_path = os.path.join(product_local_path, chunk_rel_path)
                chunk_abs_path_dir = os.path.dirname(chunk_abs_path)
                if not os.path.isdir(chunk_abs_path_dir):
                    os.makedirs(chunk_abs_path_dir)

                if not os.path.isfile(chunk_abs_path):
                    product_chunk.Bucket().download_file(
                        product_chunk.key,
                        chunk_abs_path,
                        ExtraArgs=getattr(s3_objects, "_params", {}),
                        Callback=progress_callback,
                    )
        except AuthenticationError as e:
            logger.warning("Unexpected error: %s" % e)
            logger.warning("Skipping %s/%s" % (bucket_name, prefix))
        except ClientError as e:
            err = e.response["Error"]
            auth_messages = [
                "AccessDenied",
"InvalidAccessKeyId", "SignatureDoesNotMatch", ] if err["Code"] in auth_messages and "key" in err["Message"].lower(): raise AuthenticationError( "HTTP error {} returned\n{}: {}\nPlease check your credentials for {}".format( e.response["ResponseMetadata"]["HTTPStatusCode"], err["Code"], err["Message"], self.provider, ) ) logger.warning("Unexpected error: %s" % e) logger.warning("Skipping %s/%s" % (bucket_name, prefix)) # finalize safe product if build_safe and "S2_MSI" in product.product_type: self.finalize_s2_safe_product(product_local_path) # flatten directory structure elif flatten_top_dirs: flatten_top_directories(product_local_path) if build_safe: self.check_manifest_file_list(product_local_path) # save hash/record file with open(record_filename, "w") as fh: fh.write(product.remote_location) logger.debug("Download recorded in %s", record_filename) product.location = path_to_uri(product_local_path) return product_local_path def get_rio_env(self, bucket_name, prefix, auth_dict): """Get rasterio environment variables needed for data access authentication. :param bucket_name: Bucket containg objects :type bucket_name: str :param prefix: Prefix used to try auth :type prefix: str :param auth_dict: Dictionnary containing authentication keys :type auth_dict: dict :returns: The rasterio environement variables :rtype: dict """ if self.s3_session is not None: if self.requester_pays: return {"session": self.s3_session, "requester_pays": True} else: return {"session": self.s3_session} _ = self.get_authenticated_objects(bucket_name, prefix, auth_dict) if self.s3_session is not None: if self.requester_pays: return {"session": self.s3_session, "requester_pays": True} else: return {"session": self.s3_session} else: return {"aws_unsigned": True} def get_authenticated_objects(self, bucket_name, prefix, auth_dict): """Get boto3 authenticated objects for the given bucket using the most adapted auth strategy. Also expose ``s3_session`` as class variable if available. 

        :param bucket_name: Bucket containing objects
        :type bucket_name: str
        :param prefix: Prefix used to filter objects on auth try
                       (not used to filter returned objects)
        :type prefix: str
        :param auth_dict: Dictionary containing authentication keys
        :type auth_dict: dict
        :returns: The boto3 authenticated objects
        :rtype: :class:`~boto3.resources.collection.s3.Bucket.objectsCollection`
        """
        auth_methods = [
            self._get_authenticated_objects_unsigned,
            self._get_authenticated_objects_from_auth_profile,
            self._get_authenticated_objects_from_auth_keys,
            self._get_authenticated_objects_from_env,
        ]
        # skip _get_authenticated_objects_from_env if credentials were filled in eodag conf
        if auth_dict:
            del auth_methods[-1]

        for try_auth_method in auth_methods:
            try:
                s3_objects = try_auth_method(bucket_name, prefix, auth_dict)
                if s3_objects:
                    logger.debug("Auth using %s succeeded", try_auth_method.__name__)
                    return s3_objects
            except ClientError as e:
                if e.response.get("Error", {}).get("Code", {}) in [
                    "AccessDenied",
                    "InvalidAccessKeyId",
                    "SignatureDoesNotMatch",
                ]:
                    pass
                else:
                    raise e
            except ProfileNotFound:
                pass
            logger.debug("Auth using %s failed", try_auth_method.__name__)

        raise AuthenticationError(
            "Unable to authenticate on s3://%s using any available credentials configuration"
            % bucket_name
        )

    def _get_authenticated_objects_unsigned(self, bucket_name, prefix, auth_dict):
        """Auth strategy using no-sign-request"""
        s3_resource = boto3.resource(
            service_name="s3", endpoint_url=getattr(self.config, "base_uri", None)
        )
        s3_resource.meta.client.meta.events.register(
            "choose-signer.s3.*", disable_signing
        )
        objects = s3_resource.Bucket(bucket_name).objects
        list(objects.filter(Prefix=prefix).limit(1))
        return objects

    def _get_authenticated_objects_from_auth_profile(
        self, bucket_name, prefix, auth_dict
    ):
        """Auth strategy using RequestPayer=requester and ``aws_profile`` from
        provided credentials"""
        if "profile_name" in auth_dict.keys():
            s3_session = boto3.session.Session(profile_name=auth_dict["profile_name"])
            s3_resource = s3_session.resource(
                service_name="s3",
                endpoint_url=getattr(self.config, "base_uri", None),
            )
            if self.requester_pays:
                objects = s3_resource.Bucket(bucket_name).objects.filter(
                    RequestPayer="requester"
                )
            else:
                objects = s3_resource.Bucket(bucket_name).objects
            list(objects.filter(Prefix=prefix).limit(1))
            self.s3_session = s3_session
            return objects
        else:
            return None

    def _get_authenticated_objects_from_auth_keys(self, bucket_name, prefix, auth_dict):
        """Auth strategy using RequestPayer=requester and
        ``aws_access_key_id``/``aws_secret_access_key`` from provided credentials"""
        if all(k in auth_dict for k in ("aws_access_key_id", "aws_secret_access_key")):
            s3_session = boto3.session.Session(
                aws_access_key_id=auth_dict["aws_access_key_id"],
                aws_secret_access_key=auth_dict["aws_secret_access_key"],
            )
            s3_resource = s3_session.resource(
                service_name="s3",
                endpoint_url=getattr(self.config, "base_uri", None),
            )
            if self.requester_pays:
                objects = s3_resource.Bucket(bucket_name).objects.filter(
                    RequestPayer="requester"
                )
            else:
                objects = s3_resource.Bucket(bucket_name).objects
            list(objects.filter(Prefix=prefix).limit(1))
            self.s3_session = s3_session
            return objects
        else:
            return None

    def _get_authenticated_objects_from_env(self, bucket_name, prefix, auth_dict):
        """Auth strategy using RequestPayer=requester and current environment"""
        s3_session = boto3.session.Session()
        s3_resource = s3_session.resource(
            service_name="s3", endpoint_url=getattr(self.config, "base_uri", None)
        )
        if self.requester_pays:
            objects = s3_resource.Bucket(bucket_name).objects.filter(
                RequestPayer="requester"
            )
        else:
            objects = s3_resource.Bucket(bucket_name).objects
        list(objects.filter(Prefix=prefix).limit(1))
        self.s3_session = s3_session
        return objects

    def get_product_bucket_name_and_prefix(self, product, url=None):
        """Extract bucket name and prefix from product URL

        :param product: The EO product to download
        :type product: :class:`~eodag.api.product._product.EOProduct`
        :param url: (optional) URL to use as product.location
        :type url: str
        :returns: bucket_name and prefix as str
        :rtype: tuple
        """
        if not url:
            url = product.location
        bucket_path_level = getattr(self.config, "bucket_path_level", None)

        bucket, prefix = get_bucket_name_and_prefix(
            url=url, bucket_path_level=bucket_path_level
        )

        if bucket is None:
            bucket = (
                getattr(self.config, "products", {})
                .get(product.product_type, {})
                .get("default_bucket", "")
            )

        return bucket, prefix

    def check_manifest_file_list(self, product_path):
        """Checks if products listed in manifest.safe exist"""
        manifest_path_list = [
            os.path.join(d, x)
            for d, _, f in os.walk(product_path)
            for x in f
            if x == "manifest.safe"
        ]
        if len(manifest_path_list) == 0:
            raise FileNotFoundError(
                f"No manifest.safe could be found in {product_path}"
            )
        else:
            safe_path = os.path.dirname(manifest_path_list[0])

        root = etree.parse(os.path.join(safe_path, "manifest.safe")).getroot()
        for safe_file in root.xpath("//fileLocation"):
            safe_file_path = os.path.join(safe_path, safe_file.get("href"))
            if not os.path.isfile(safe_file_path) and "HTML" in safe_file.get("href"):
                # add empty files for missing HTML/*
                Path(safe_file_path).touch()
            elif not os.path.isfile(safe_file_path):
                logger.warning("SAFE build: %s is missing" % safe_file.get("href"))

    def finalize_s2_safe_product(self, product_path):
        """Add missing dirs to downloaded product"""
        try:
            logger.debug("Finalize SAFE product")
            manifest_path_list = [
                os.path.join(d, x)
                for d, _, f in os.walk(product_path)
                for x in f
                if x == "manifest.safe"
            ]
            if len(manifest_path_list) == 0:
                raise FileNotFoundError(
                    f"No manifest.safe could be found in {product_path}"
                )
            else:
                safe_path = os.path.dirname(manifest_path_list[0])

            # create empty missing dirs
            auxdata_path = os.path.join(safe_path, "AUX_DATA")
            if not os.path.isdir(auxdata_path):
                os.makedirs(auxdata_path)
            html_path = os.path.join(safe_path, "HTML")
            if not os.path.isdir(html_path):
                os.makedirs(html_path)
            repinfo_path = os.path.join(safe_path, "rep_info")
            if not os.path.isdir(repinfo_path):
                os.makedirs(repinfo_path)

            # granule tile dirname
            root = etree.parse(os.path.join(safe_path, "manifest.safe")).getroot()
            tile_id = os.path.basename(
                os.path.dirname(
                    root.xpath("//fileLocation[contains(@href,'MTD_TL.xml')]")[0].get(
                        "href"
                    )
                )
            )
            granule_folder = os.path.join(safe_path, "GRANULE")
            rename_subfolder(granule_folder, tile_id)
            # datastrip scene dirname
            scene_id = os.path.basename(
                os.path.dirname(
                    root.xpath("//fileLocation[contains(@href,'MTD_DS.xml')]")[0].get(
                        "href"
                    )
                )
            )
            datastrip_folder = os.path.join(safe_path, "DATASTRIP")
            rename_subfolder(datastrip_folder, scene_id)
        except Exception as e:
            logger.exception("Could not finalize SAFE product from downloaded data")
            raise DownloadError(e)

    def get_chunk_dest_path(self, product, chunk, dir_prefix=None, build_safe=False):
        """Get chunk SAFE destination path"""
        if build_safe:
            # S2 common
            if "S2_MSI" in product.product_type:
                title_search = re.search(
                    r"^\w+_\w+_(\w+)_(\w+)_(\w+)_(\w+)_(\w+)$",
                    product.properties["title"],
                )
                title_date1 = title_search.group(1) if title_search else None
                title_part3 = title_search.group(4) if title_search else None
                ds_dir_search = re.search(
                    r"^.+_(DS_\w+_+\w+_\w+)_\w+.\w+$",
                    product.properties.get("originalSceneID", ""),
                )
                ds_dir = ds_dir_search.group(1) if ds_dir_search else 0
                s2_processing_level = product.product_type.split("_")[-1]
            # S1 common
            elif product.product_type == "S1_SAR_GRD":
                s1_title_suffix_search = re.search(
                    r"^.+_([A-Z0-9_]+_[A-Z0-9_]+_[A-Z0-9_]+_[A-Z0-9_]+)_\w+$",
                    product.properties["title"],
                )
                s1_title_suffix = (
                    s1_title_suffix_search.group(1).lower().replace("_", "-")
                    if s1_title_suffix_search
                    else None
                )

            # S2 L2A Tile files -----------------------------------------------
            if S2L2A_TILE_IMG_REGEX.match(chunk.key):
                found_dict = S2L2A_TILE_IMG_REGEX.match(chunk.key).groupdict()
                product_path = (
                    "%s.SAFE/GRANULE/%s/IMG_DATA/R%s/T%s%s%s_%s_%s_%s.jp2"
                    % (
                        product.properties["title"],
                        found_dict["num"],
                        found_dict["res"],
                        found_dict["tile1"],
                        found_dict["tile2"],
                        found_dict["tile3"],
                        title_date1,
                        found_dict["file"],
                        found_dict["res"],
                    )
                )
            elif S2L2A_TILE_AUX_DIR_REGEX.match(chunk.key):
                found_dict = S2L2A_TILE_AUX_DIR_REGEX.match(chunk.key).groupdict()
                product_path = "%s.SAFE/GRANULE/%s/AUX_DATA/%s" % (
                    product.properties["title"],
                    found_dict["num"],
                    found_dict["file"],
                )
            # S2 L2A QI Masks
            elif S2_TILE_QI_MSK_REGEX.match(chunk.key):
                found_dict = S2_TILE_QI_MSK_REGEX.match(chunk.key).groupdict()
                product_path = "%s.SAFE/GRANULE/%s/QI_DATA/MSK_%sPRB_%s" % (
                    product.properties["title"],
                    found_dict["num"],
                    found_dict["file_base"],
                    found_dict["file_suffix"],
                )
            # S2 L2A QI PVI
            elif S2_TILE_QI_PVI_REGEX.match(chunk.key):
                found_dict = S2_TILE_QI_PVI_REGEX.match(chunk.key).groupdict()
                product_path = "%s.SAFE/GRANULE/%s/QI_DATA/%s_%s_PVI.jp2" % (
                    product.properties["title"],
                    found_dict["num"],
                    title_part3,
                    title_date1,
                )
            # S2 Tile files ---------------------------------------------------
            elif S2_TILE_PREVIEW_DIR_REGEX.match(chunk.key):
                found_dict = S2_TILE_PREVIEW_DIR_REGEX.match(chunk.key).groupdict()
                product_path = "%s.SAFE/GRANULE/%s/preview/%s" % (
                    product.properties["title"],
                    found_dict["num"],
                    found_dict["file"],
                )
            elif S2_TILE_IMG_REGEX.match(chunk.key):
                found_dict = S2_TILE_IMG_REGEX.match(chunk.key).groupdict()
                product_path = "%s.SAFE/GRANULE/%s/IMG_DATA/T%s%s%s_%s_%s" % (
                    product.properties["title"],
                    found_dict["num"],
                    found_dict["tile1"],
                    found_dict["tile2"],
                    found_dict["tile3"],
                    title_date1,
                    found_dict["file"],
                )
            elif S2_TILE_THUMBNAIL_REGEX.match(chunk.key):
                found_dict = S2_TILE_THUMBNAIL_REGEX.match(chunk.key).groupdict()
                product_path = "%s.SAFE/GRANULE/%s/%s" % (
                    product.properties["title"],
                    found_dict["num"],
                    found_dict["file"],
                )
            elif S2_TILE_MTD_REGEX.match(chunk.key):
                found_dict = S2_TILE_MTD_REGEX.match(chunk.key).groupdict()
                product_path = "%s.SAFE/GRANULE/%s/MTD_TL.xml" % (
                    product.properties["title"],
                    found_dict["num"],
                )
            elif S2_TILE_AUX_DIR_REGEX.match(chunk.key):
                found_dict = S2_TILE_AUX_DIR_REGEX.match(chunk.key).groupdict()
                product_path = "%s.SAFE/GRANULE/%s/AUX_DATA/AUX_%s" % (
                    product.properties["title"],
                    found_dict["num"],
                    found_dict["file"],
                )
            elif S2_TILE_QI_DIR_REGEX.match(chunk.key):
                found_dict = S2_TILE_QI_DIR_REGEX.match(chunk.key).groupdict()
                product_path = "%s.SAFE/GRANULE/%s/QI_DATA/%s" % (
                    product.properties["title"],
                    found_dict["num"],
                    found_dict["file"],
                )
            # S2 Tiles generic
            elif S2_TILE_REGEX.match(chunk.key):
                found_dict = S2_TILE_REGEX.match(chunk.key).groupdict()
                product_path = "%s.SAFE/GRANULE/%s/%s" % (
                    product.properties["title"],
                    found_dict["num"],
                    found_dict["file"],
                )
            # S2 Product files
            elif S2_PROD_DS_MTD_REGEX.match(chunk.key):
                found_dict = S2_PROD_DS_MTD_REGEX.match(chunk.key).groupdict()
                product_path = "%s.SAFE/DATASTRIP/%s/MTD_DS.xml" % (
                    product.properties["title"],
                    ds_dir,
                )
            elif S2_PROD_DS_QI_REPORT_REGEX.match(chunk.key):
                found_dict = S2_PROD_DS_QI_REPORT_REGEX.match(chunk.key).groupdict()
                product_path = "%s.SAFE/DATASTRIP/%s/QI_DATA/%s.xml" % (
                    product.properties["title"],
                    ds_dir,
                    found_dict["filename"],
                )
            elif S2_PROD_DS_QI_REGEX.match(chunk.key):
                found_dict = S2_PROD_DS_QI_REGEX.match(chunk.key).groupdict()
                product_path = "%s.SAFE/DATASTRIP/%s/QI_DATA/%s" % (
                    product.properties["title"],
                    ds_dir,
                    found_dict["file"],
                )
            elif S2_PROD_INSPIRE_REGEX.match(chunk.key):
                found_dict = S2_PROD_INSPIRE_REGEX.match(chunk.key).groupdict()
                product_path = "%s.SAFE/INSPIRE.xml" % (product.properties["title"],)
            elif S2_PROD_MTD_REGEX.match(chunk.key):
                found_dict = S2_PROD_MTD_REGEX.match(chunk.key).groupdict()
                product_path = "%s.SAFE/MTD_MSI%s.xml" % (
                    product.properties["title"],
                    s2_processing_level,
                )
            # S2 Product generic
            elif S2_PROD_REGEX.match(chunk.key):
                found_dict = S2_PROD_REGEX.match(chunk.key).groupdict()
                product_path = "%s.SAFE/%s" % (
                    product.properties["title"],
                    found_dict["file"],
                )
            # S1 --------------------------------------------------------------
            elif S1_CALIB_REGEX.match(chunk.key):
                found_dict = S1_CALIB_REGEX.match(chunk.key).groupdict()
                product_path = (
                    "%s.SAFE/annotation/calibration/%s-%s-%s-grd-%s-%s-%03d.xml"
                    % (
                        product.properties["title"],
                        found_dict["file_prefix"],
                        product.properties["platformSerialIdentifier"].lower(),
                        found_dict["file_beam"],
                        found_dict["file_pol"],
                        s1_title_suffix,
                        S1_IMG_NB_PER_POLAR.get(
                            product.properties["polarizationMode"], {}
                        ).get(found_dict["file_pol"].upper(), 1),
                    )
                )
            elif S1_ANNOT_REGEX.match(chunk.key):
                found_dict = S1_ANNOT_REGEX.match(chunk.key).groupdict()
                product_path = "%s.SAFE/annotation/%s-%s-grd-%s-%s-%03d.xml" % (
                    product.properties["title"],
                    product.properties["platformSerialIdentifier"].lower(),
                    found_dict["file_beam"],
                    found_dict["file_pol"],
                    s1_title_suffix,
                    S1_IMG_NB_PER_POLAR.get(
                        product.properties["polarizationMode"], {}
                    ).get(found_dict["file_pol"].upper(), 1),
                )
            elif S1_MEAS_REGEX.match(chunk.key):
                found_dict = S1_MEAS_REGEX.match(chunk.key).groupdict()
                product_path = "%s.SAFE/measurement/%s-%s-grd-%s-%s-%03d.%s" % (
                    product.properties["title"],
                    product.properties["platformSerialIdentifier"].lower(),
                    found_dict["file_beam"],
                    found_dict["file_pol"],
                    s1_title_suffix,
                    S1_IMG_NB_PER_POLAR.get(
                        product.properties["polarizationMode"], {}
                    ).get(found_dict["file_pol"].upper(), 1),
                    found_dict["file_ext"],
                )
            elif S1_REPORT_REGEX.match(chunk.key):
                found_dict = S1_REPORT_REGEX.match(chunk.key).groupdict()
                product_path = "%s.SAFE/%s.SAFE-%s" % (
                    product.properties["title"],
                    product.properties["title"],
                    found_dict["file"],
                )
            # S1 generic
            elif S1_REGEX.match(chunk.key):
                found_dict = S1_REGEX.match(chunk.key).groupdict()
                product_path = "%s.SAFE/%s" % (
                    product.properties["title"],
                    found_dict["file"],
                )
            # out of SAFE format
            else:
                raise NotAvailableError(
                    f"Ignored {chunk.key} out of SAFE matching pattern"
                )
        # no SAFE format
        else:
            if not dir_prefix:
                dir_prefix = chunk.key
            product_path = chunk.key.split(dir_prefix.strip("/") + "/")[-1]

        logger.debug(f"Downloading {chunk.key} to {product_path}")
        return product_path

    def download_all(
        self,
        products,
        auth=None,
        downloaded_callback=None,
        progress_callback=None,
        **kwargs,
    ):
        """
        download_all using parent (base plugin) method
        """
        return super(AwsDownload, self).download_all(
            products,
            auth=auth,
            downloaded_callback=downloaded_callback,
            progress_callback=progress_callback,
            **kwargs,
        )
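
# Minimal usage sketch (illustrative, not part of the original module): this
# plugin is normally driven through the EODAG API rather than instantiated
# directly; provider selection and credentials come from the eodag
# configuration.
#
#   >>> from eodag import EODataAccessGateway
#   >>> dag = EODataAccessGateway()
#   >>> products, total = dag.search(productType="S2_MSI_L2A")
#   >>> product_path = dag.download(products[0])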