# -*- coding: utf-8 -*-
# Copyright 2022, CS GROUP - France, https://www.csgroup.eu/
#
# This file is part of EODAG project
# https://www.github.com/CS-SI/EODAG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import logging
from datetime import datetime
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
import geojson
from ecmwfapi import ECMWFDataServer, ECMWFService
from ecmwfapi.api import APIException, Connection, get_apikey_values
from eodag.plugins.apis.base import Api
from eodag.plugins.download.base import Download
from eodag.plugins.search.base import Search
from eodag.plugins.search.build_search_result import BuildPostSearchResult
from eodag.rest.stac import DEFAULT_MISSION_START_DATE
from eodag.utils import (
DEFAULT_DOWNLOAD_TIMEOUT,
DEFAULT_DOWNLOAD_WAIT,
DEFAULT_ITEMS_PER_PAGE,
DEFAULT_PAGE,
get_geometry_from_various,
path_to_uri,
urlsplit,
)
from eodag.utils.exceptions import AuthenticationError, DownloadError
from eodag.utils.logging import get_logging_verbose
if TYPE_CHECKING:
from eodag.api.product import EOProduct
from eodag.api.search_result import SearchResult
from eodag.config import PluginConfig
from eodag.utils import DownloadedCallback, ProgressCallback
logger = logging.getLogger("eodag.apis.ecmwf")
ECMWF_MARS_KNOWN_FORMATS = {"grib": "grib", "netcdf": "nc"}
[docs]class EcmwfApi(Download, Api, BuildPostSearchResult):
"""A plugin that enables to build download-request and download data on ECMWF MARS.
Builds a single ready-to-download :class:`~eodag.api.product._product.EOProduct`
during the search stage.
Download will then be performed on ECMWF Public Datasets (if ``dataset`` parameter
is in query), or on MARS Operational Archive (if ``dataset`` parameter is not in
query).
This class inherits from :class:`~eodag.plugins.apis.base.Api` for compatibility,
:class:`~eodag.plugins.download.base.Download` for download methods, and
:class:`~eodag.plugins.search.qssearch.QueryStringSearch` for metadata-mapping and
query build methods.
"""
[docs] def __init__(self, provider: str, config: PluginConfig) -> None:
# init self.config.metadata_mapping using Search Base plugin
Search.__init__(self, provider, config)
# needed by QueryStringSearch.build_query_string / format_free_text_search
self.config.__dict__.setdefault("free_text_search_operations", {})
# needed for compatibility
self.config.__dict__.setdefault("pagination", {"next_page_query_obj": "{{}}"})
def do_search(self, *args: Any, **kwargs: Any) -> List[Dict[str, Any]]:
"""Should perform the actual search request."""
return [{}]
def query(
self,
product_type: Optional[str] = None,
items_per_page: int = DEFAULT_ITEMS_PER_PAGE,
page: int = DEFAULT_PAGE,
count: bool = True,
**kwargs: Any,
) -> Tuple[List[EOProduct], Optional[int]]:
"""Build ready-to-download SearchResult"""
# check productType, dates, geometry, use defaults if not specified
# productType
if not kwargs.get("productType"):
kwargs["productType"] = "%s_%s_%s" % (
kwargs.get("dataset", "mars"),
kwargs.get("type", ""),
kwargs.get("levtype", ""),
)
# start date
if "startTimeFromAscendingNode" not in kwargs:
kwargs["startTimeFromAscendingNode"] = (
getattr(self.config, "product_type_config", {}).get(
"missionStartDate", None
)
or DEFAULT_MISSION_START_DATE
)
# end date
if "completionTimeFromAscendingNode" not in kwargs:
kwargs["completionTimeFromAscendingNode"] = getattr(
self.config, "product_type_config", {}
).get("missionEndDate", None) or datetime.utcnow().isoformat(
timespec="seconds"
)
# geometry
if "geometry" in kwargs:
kwargs["geometry"] = get_geometry_from_various(geometry=kwargs["geometry"])
return BuildPostSearchResult.query(
self, items_per_page=items_per_page, page=page, count=count, **kwargs
)
def authenticate(self) -> Dict[str, Optional[str]]:
"""Check credentials and returns information needed for auth
:returns: {key, url, email} dictionary
:rtype: dict
:raises: :class:`~eodag.utils.exceptions.AuthenticationError`
"""
# Get credentials from eodag or using ecmwf conf
email = getattr(self.config, "credentials", {}).get("username", None)
key = getattr(self.config, "credentials", {}).get("password", None)
url = getattr(self.config, "api_endpoint", None)
if not all([email, key, url]):
key, url, email = get_apikey_values()
# APIRequest to check credentials
ecmwf_connection = Connection(
url=url,
email=email,
key=key,
)
try:
ecmwf_connection.call("{}/{}".format(url, "who-am-i"))
logger.debug("Credentials checked on ECMWF")
except APIException as e:
logger.error(e)
raise AuthenticationError("Please check your ECMWF credentials.")
return {"key": key, "url": url, "email": email}
def download(
self,
product: EOProduct,
auth: Optional[PluginConfig] = None,
progress_callback: Optional[ProgressCallback] = None,
wait: int = DEFAULT_DOWNLOAD_WAIT,
timeout: int = DEFAULT_DOWNLOAD_TIMEOUT,
**kwargs: Any,
) -> Optional[str]:
"""Download data from ECMWF MARS"""
product_format = product.properties.get("format", "grib")
product_extension = ECMWF_MARS_KNOWN_FORMATS.get(product_format, product_format)
# Prepare download
fs_path, record_filename = self._prepare_download(
product,
progress_callback=progress_callback,
outputs_extension=f".{product_extension}",
**kwargs,
)
if not fs_path or not record_filename:
if fs_path:
product.location = path_to_uri(fs_path)
return fs_path
# get download request dict from product.location/downloadLink url query string
# separate url & parameters
download_request = geojson.loads(urlsplit(product.location).query)
# Set verbosity
eodag_verbosity = get_logging_verbose()
if eodag_verbosity is not None and eodag_verbosity >= 3:
# debug verbosity
ecmwf_verbose = True
ecmwf_log = logger.debug
else:
# default verbosity
ecmwf_verbose = False
ecmwf_log = logger.info
auth_dict = self.authenticate()
# Send download request to ECMWF web API
logger.info("Request download on ECMWF: %s" % download_request)
try:
if "dataset" in download_request and not download_request[
"dataset"
].startswith("mars_"):
# Public dataset
ecmwf_server = ECMWFDataServer(
verbose=ecmwf_verbose, log=ecmwf_log, **auth_dict
)
ecmwf_server.retrieve(dict(download_request, **{"target": fs_path}))
else:
# Operational Archive
ecmwf_server = ECMWFService(
service="mars", verbose=ecmwf_verbose, log=ecmwf_log, **auth_dict
)
download_request.pop("dataset", None)
ecmwf_server.execute(download_request, fs_path)
except APIException as e:
logger.error(e)
raise DownloadError(e)
with open(record_filename, "w") as fh:
fh.write(product.properties["downloadLink"])
logger.debug("Download recorded in %s", record_filename)
# do not try to extract or delete grib/netcdf
kwargs["extract"] = False
product_path = self._finalize(
fs_path,
progress_callback=progress_callback,
outputs_extension=f".{product_extension}",
**kwargs,
)
product.location = path_to_uri(product_path)
return product_path
def download_all(
self,
products: SearchResult,
auth: Optional[PluginConfig] = None,
downloaded_callback: Optional[DownloadedCallback] = None,
progress_callback: Optional[ProgressCallback] = None,
wait: int = DEFAULT_DOWNLOAD_WAIT,
timeout: int = DEFAULT_DOWNLOAD_TIMEOUT,
**kwargs: Any,
) -> List[str]:
"""
Download all using parent (base plugin) method
"""
return super(EcmwfApi, self).download_all(
products,
auth=auth,
downloaded_callback=downloaded_callback,
progress_callback=progress_callback,
wait=wait,
timeout=timeout,
**kwargs,
)