# Source code for eodag.plugins.search.csw

# -*- coding: utf-8 -*-
# Copyright 2018, CS GROUP - France, https://www.csgroup.eu/
#
# This file is part of EODAG project
#     https://www.github.com/CS-SI/EODAG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations

import logging
import re
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union

import pyproj
from owslib.csw import CatalogueServiceWeb
from owslib.fes import (
    BBox,
    PropertyIsEqualTo,
    PropertyIsGreaterThanOrEqualTo,
    PropertyIsLessThanOrEqualTo,
    PropertyIsLike,
)
from owslib.ows import ExceptionReport
from shapely import geometry, wkt

from eodag.api.product import EOProduct
from eodag.api.product.metadata_mapping import properties_from_xml
from eodag.plugins.search.base import Search
from eodag.utils import DEFAULT_ITEMS_PER_PAGE, DEFAULT_PAGE, DEFAULT_PROJ
from eodag.utils.import_system import patch_owslib_requests

if TYPE_CHECKING:
    from owslib.fes import OgcExpression

    from eodag.config import PluginConfig


logger = logging.getLogger("eodag.search.csw")

SUPPORTED_REFERENCE_SCHEMES = ["WWW:DOWNLOAD-1.0-http--download"]


class CSWSearch(Search):
    """A plugin for implementing search based on OGC CSW

    The plugin configuration is read as follows by the code below:

    * ``config.api_endpoint``: URL of the CSW service
    * ``config.version``: optional CSW version, ``"2.0.2"`` when absent
    * ``config.products[<product type>]["productType"]``: provider-side product
      type used to build the constraints
    * ``config.search_definition["product_type_tags"]``: list of mappings holding
      a ``name`` (the queried tag) and an optional ``matching`` mode (``prefix``,
      ``postfix``, ``exact``; anything else is treated as ``fuzzy``)
    * ``config.search_definition["date_tags"]``: optional mapping with the
      ``start`` and ``end`` tag names used for date constraints
    * ``config.search_definition["resource_location_filter"]``: optional regular
      expression applied to record reference urls
    * ``config.metadata_mapping``: mapping handed to ``properties_from_xml``
    """

    def __init__(self, provider: str, config: PluginConfig) -> None:
        super().__init__(provider, config)
        # Lazily created by __init_catalog() on the first query
        self.catalog: Optional[CatalogueServiceWeb] = None

    def clear(self) -> None:
        """Clear search context"""
        super().clear()
        # Drop the catalog so that the next query re-initialises it
        self.catalog = None

    def query(
        self,
        product_type: Optional[str] = None,
        items_per_page: int = DEFAULT_ITEMS_PER_PAGE,
        page: int = DEFAULT_PAGE,
        count: bool = True,
        **kwargs: Any,
    ) -> Tuple[List[EOProduct], Optional[int]]:
        """Perform a search on a OGC/CSW-like interface

        :param product_type: not used as passed: the product type is read from
                             ``kwargs["productType"]``
        :param items_per_page: not used by this plugin (each request is sent with
                               ``maxrecords=10``)
        :param page: not used by this plugin
        :param count: whether the number of results must be returned
        :param kwargs: search criteria; ``productType``, ``auth``, ``geometry``,
                       ``startTimeFromAscendingNode``,
                       ``completionTimeFromAscendingNode`` and ``footprints`` are
                       the keys read by this plugin
        :returns: the products found and their number, or ``None`` instead of the
                  number when ``count`` is ``False``
        """
        # NOTE(review): the `product_type` argument is overridden on purpose here;
        # confirm that callers always provide it through kwargs["productType"]
        product_type = kwargs.get("productType")
        if product_type is None:
            # Honour `count` exactly like the regular exit path below
            return [], (0 if count else None)

        auth = kwargs.get("auth")
        if auth:
            self.__init_catalog(**getattr(auth.config, "credentials", {}))
        else:
            self.__init_catalog()

        results: List[EOProduct] = []
        # The catalog stays None when its initialisation failed: the search then
        # silently returns no result
        if self.catalog:
            provider_product_type = self.config.products[product_type]["productType"]
            for product_type_def in self.config.search_definition["product_type_tags"]:
                product_type_search_tag = product_type_def["name"]
                logger.debug(
                    "Querying <%s> tag for product type %s",
                    product_type_search_tag,
                    provider_product_type,
                )
                constraints = self.__convert_query_params(
                    product_type_def, provider_product_type, kwargs
                )
                with patch_owslib_requests(verify=True):
                    try:
                        self.catalog.getrecords2(
                            constraints=constraints, esn="full", maxrecords=10
                        )
                    except ExceptionReport:
                        import traceback as tb

                        # A failure on one tag must not prevent querying the others
                        logger.warning(
                            "Failed to query %s for product type %s : %s",
                            product_type_search_tag,
                            product_type,
                            tb.format_exc(),
                        )
                        continue
                partial_results = [
                    self.__build_product(record, product_type, **kwargs)
                    for record in self.catalog.records.values()
                ]
                logger.info(
                    "Found %s results querying %s",
                    len(partial_results),
                    product_type_search_tag,
                )
                results.extend(partial_results)
            logger.info("Found %s overall results", len(results))
        total_results = len(results) if count else None
        return results, total_results

    def __init_catalog(
        self, username: Optional[str] = None, password: Optional[str] = None
    ) -> None:
        """Initializes a catalogue by performing a GetCapabilities request on the url

        Nothing is done when a catalog is already available. Any error raised
        while creating the catalog is logged as a warning and ``self.catalog`` is
        left unchanged.

        :param username: optional user name forwarded to the CSW client
        :param password: optional password forwarded to the CSW client
        """
        if not self.catalog:
            api_endpoint = self.config.api_endpoint
            version = getattr(self.config, "version", "2.0.2")
            logger.debug("Initialising CSW catalog at %s", api_endpoint)
            with patch_owslib_requests(verify=True):
                try:
                    self.catalog = CatalogueServiceWeb(
                        api_endpoint,
                        version=version,
                        username=username,
                        password=password,
                    )
                except Exception as e:
                    # Best effort: query() copes with a missing catalog
                    logger.warning(
                        "Initialization of catalog failed due to error: (%s: %s)",
                        type(e),
                        e,
                    )

    def __build_product(self, rec: Any, product_type: str, **kwargs: Any) -> EOProduct:
        """Enable search results to be handled by http download plugin

        :param rec: a record of the catalog, as found in ``self.catalog.records``
        :param product_type: the product type the record was searched for
        :param kwargs: the search criteria (only ``footprints`` is read)
        :returns: the product built from the record
        """
        # NOTE(review): `download_url` is computed but never used (see the TODO in
        # the EOProduct call below) and both branches of the filter test assign
        # the same value
        download_url = ""
        resource_filter = re.compile(
            self.config.search_definition.get("resource_location_filter", "")
        )
        for ref in rec.references:
            if ref["scheme"] in SUPPORTED_REFERENCE_SCHEMES:
                if resource_filter.pattern and resource_filter.search(ref["url"]):
                    download_url = ref["url"]
                else:
                    download_url = ref["url"]  # noqa
                break

        properties = properties_from_xml(rec.xml, self.config.metadata_mapping)
        if not properties["geometry"]:
            # No geometry in the metadata: fall back on the bounding box of the record
            bbox = rec.bbox_wgs84
            if not bbox:
                # Reproject the native bounding box of the record to DEFAULT_PROJ,
                # its CRS being assumed to be EPSG:4326 when not usable
                code = "EPSG:4326"
                if rec.bbox.crs and rec.bbox.crs.code and rec.bbox.crs.code > 0:
                    code = ":".join((str(rec.bbox.crs.id), str(rec.bbox.crs.code)))
                # NOTE(review): `pyproj.Proj(init=...)` and `pyproj.transform` are
                # documented as deprecated by pyproj 2.x; confirm they are still
                # available in the pyproj version in use
                rec_proj = pyproj.Proj(init=code)
                default_proj_as_pyproj = pyproj.Proj(DEFAULT_PROJ)
                maxx, maxy = pyproj.transform(
                    rec_proj, default_proj_as_pyproj, rec.bbox.maxx, rec.bbox.maxy
                )
                minx, miny = pyproj.transform(
                    rec_proj, default_proj_as_pyproj, rec.bbox.minx, rec.bbox.miny
                )
                bbox = (minx, miny, maxx, maxy)
            properties["geometry"] = geometry.box(*bbox)
        # Ensure the geometry property is shapely-compatible (the geometry is assumed
        # to be a wkt)
        else:
            properties["geometry"] = wkt.loads(properties["geometry"])
        # NOTE(review): three positional arguments are passed; given the TODO below,
        # confirm that this call still matches the signature of EOProduct
        return EOProduct(
            product_type,
            self.provider,
            # TODO: EOProduct has no more *args in its __init__ (search_args attribute removed)
            # Not sure why download_url was here in the first place, needs to be updated,
            # possibly by having instead 'downloadLink' in the properties
            # download_url,
            properties,
            searched_bbox=kwargs.get("footprints"),
        )

    def __convert_query_params(
        self,
        product_type_def: Dict[str, Any],
        product_type: str,
        params: Dict[str, Any],
    ) -> Union[List[OgcExpression], List[List[OgcExpression]]]:
        """Translates eodag search to CSW constraints using owslib constraint classes

        :param product_type_def: an item of the ``product_type_tags`` search
                                 definition (``name`` and optional ``matching``)
        :param product_type: the product type value to look for in the tag
        :param params: the search criteria (``geometry``,
                       ``startTimeFromAscendingNode`` and
                       ``completionTimeFromAscendingNode`` are read)
        :returns: the constraints, nested in a list when there are several of them
                  so that they are combined with a logical AND
        """
        constraints: List[OgcExpression] = []
        # How the match should be performed (fuzzy, prefix, postfix or exact).
        # defaults to fuzzy
        pt_tag, matching = (
            product_type_def["name"],
            product_type_def.get("matching", "fuzzy"),
        )
        if matching == "prefix":
            constraints.append(PropertyIsLike(pt_tag, "{}%".format(product_type)))
        elif matching == "postfix":
            constraints.append(PropertyIsLike(pt_tag, "%{}".format(product_type)))
        elif matching == "exact":
            constraints.append(PropertyIsEqualTo(pt_tag, product_type))
        else:  # unknown matching is considered to be equal to 'fuzzy'
            constraints.append(PropertyIsLike(pt_tag, "%{}%".format(product_type)))

        # `footprint`
        # assumes the geometry is a mapping with lonmin/latmin/lonmax/latmax keys,
        # as indexed below -- TODO confirm against the caller
        fp = params.get("geometry")
        if fp:
            constraints.append(
                BBox([fp["lonmin"], fp["latmin"], fp["lonmax"], fp["latmax"]])
            )

        # dates
        start, end = (
            params.get("startTimeFromAscendingNode"),
            params.get("completionTimeFromAscendingNode"),
        )
        if start and "date_tags" in self.config.search_definition:
            constraints.append(
                PropertyIsGreaterThanOrEqualTo(
                    self.config.search_definition["date_tags"]["start"], start
                )
            )
        if end and "date_tags" in self.config.search_definition:
            constraints.append(
                PropertyIsLessThanOrEqualTo(
                    self.config.search_definition["date_tags"]["end"], end
                )
            )
        # [[a, b]] is interpreted as a && b while [a, b] is interpreted as a || b
        return [constraints] if len(constraints) > 1 else constraints