# -*- coding: utf-8 -*-
# Copyright 2026, CS GROUP - France, https://www.csgroup.eu/
#
# This file is part of EODAG project
# https://www.github.com/CS-SI/EODAG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import datetime as dt
import logging
import math
from calendar import monthrange
from typing import Annotated, Any, Optional, cast
import requests
from pydantic.fields import FieldInfo
from pyproj import CRS, Transformer
from typing_extensions import get_args
from eodag.api.product._assets import AssetsDict
from eodag.api.product._product import EOProduct
from eodag.api.product.metadata_mapping import (
mtd_cfg_as_conversion_and_querypath,
properties_from_json,
)
from eodag.api.search_result import SearchResult
from eodag.plugins.search import PreparedSearch
from eodag.plugins.search.base import Search
from eodag.types import json_field_definition_to_python
from eodag.types.queryables import Queryables
from eodag.utils import DEFAULT_LIMIT, HTTP_REQ_TIMEOUT, USER_AGENT, deepcopy
from eodag.utils.cache import instance_cached_method
from eodag.utils.dates import parse_to_utc, to_iso_utc_string
from eodag.utils.exceptions import (
MisconfiguredError,
RequestError,
TimeOutError,
ValidationError,
)
# Module-level logger of this plugin, registered under a fixed name.
logger = logging.getLogger("eodag.search.cop_ghsl")
def _convert_bbox_to_lonlat_mollweide(bbox: list[str]) -> list[float]:
    """
    Convert a bbox from Mollweide coordinate system (metres)
    to WGS84 coordinate system (longitude and latitude)

    :param bbox: the two corners ``[x1, y1, x2, y2]`` given as strings; blanks inside
                 a value are removed before it is parsed as an integer
    :returns: the converted bbox as ``[lon1, lat1, lon2, lat2]``
    """
    x1, y1, x2, y2 = (int(value.replace(" ", "")) for value in bbox)
    crs_mollweide = CRS("ESRI:54009")
    crs_wgs84 = CRS("WGS84")
    transformer = Transformer.from_crs(crs_from=crs_mollweide, crs_to=crs_wgs84)

    def _corner_to_lonlat(x: int, y: int) -> tuple[float, float]:
        """Convert one corner, falling back to +/-180 longitude when it cannot be projected."""
        # the transformer is created without always_xy, the result is read as (lat, lon)
        lat, lon = transformer.transform(x, y)
        if math.isinf(lat):
            # corner is outside of the surface -> find latitude on the central
            # meridian and take the max longitude value on the side of this corner
            lat, _ = transformer.transform(0, y)
            lon = -180 if x < 0 else 180
        return lon, lat

    lon1, lat1 = _corner_to_lonlat(x1, y1)
    # each corner uses its own x for the fallback side (the second one used x1 before)
    lon2, lat2 = _corner_to_lonlat(x2, y2)
    return [lon1, lat1, lon2, lat2]
def _convert_bbox_to_lonlat_EPSG3035(bbox: list[str]) -> list[float]:
    """
    Convert a bbox from ETRS89/LAEA Europe (EPSG:3035) coordinate system (metres)
    to WGS84 coordinate system (longitude and latitude)

    :param bbox: the two corners ``[x1, y1, x2, y2]`` given as strings; commas inside
                 a value are removed before it is parsed as an integer
    :returns: the converted bbox as ``[lon1, lat1, lon2, lat2]``
    """
    coords = [int(raw.replace(",", "")) for raw in bbox]
    source_crs = CRS("3035")
    target_crs = CRS("WGS84")
    # always_xy=True: input and output are both handled in (x, y) / (lon, lat) order
    to_wgs84 = Transformer.from_crs(
        crs_from=source_crs, crs_to=target_crs, always_xy=True
    )
    first_x, first_y = coords[:2]
    first_lon, first_lat = to_wgs84.transform(first_x, first_y)
    second_x, second_y = coords[2:]
    second_lon, second_lat = to_wgs84.transform(second_x, second_y)
    return [first_lon, first_lat, second_lon, second_lat]
def _get_available_values_from_constraints(
    constraints: list[dict[str, Any]], filters: dict[str, Any], collection: str
) -> dict[str, list[Any]]:
    """Get the available values for each parameter from the constraints

    Each constraint maps parameter names to the values allowed together. Constraints
    that do not match the given filters are discarded and the values of the remaining
    ones are collected per parameter.

    :param constraints: list of constraints (parameter name -> allowed values)
    :param filters: filter parameters of the request; a list-valued ``month`` entry
                    that is not known to any constraint is removed from this dict
    :param collection: name of the collection, only used in error messages
    :returns: available values per parameter for the constraints matching the filters
    :raises ValidationError: if a filter parameter is unknown or none of its values
                             is available
    """
    available_values: dict[str, list[Any]] = {}
    constraint_keys = {key for const in constraints for key in const}
    if "tile_size" in constraint_keys:
        # tiles are available -> filtering by geometry is possible
        constraint_keys.add("geometry")
    not_found_keys = set(filters) - constraint_keys
    if "month" in not_found_keys and isinstance(filters["month"], list):
        # month added from datetime but filter not available
        filters.pop("month")
        not_found_keys.remove("month")
    # an "id" filter alone is tolerated although it is never part of the constraints
    if not_found_keys and not_found_keys != {"id"}:
        raise ValidationError(
            f"Parameters {not_found_keys} do not exist for collection {collection}; "
            f"available parameters: {constraint_keys}"
        )
    filtered_constraints = deepcopy(constraints)
    for filter_key, value in filters.items():
        available_values_key = []
        values_in_constraints = []
        for constraint in constraints:
            if constraint not in filtered_constraints:
                # already discarded by a previous filter
                continue
            if filter_key in constraint and isinstance(value, list):
                # list filter: the constraint is kept if at least one value matches
                value_found = False
                for v in value:
                    if v in constraint[filter_key]:
                        value_found = True
                        values_in_constraints.append(v)
                if not value_found:
                    filtered_constraints.remove(constraint)
                continue
            elif filter_key in constraint:
                available_values_key.extend(constraint[filter_key])
                if (
                    value not in constraint[filter_key]
                    and str(value) not in constraint[filter_key]
                ):
                    filtered_constraints.remove(constraint)
            if len(filtered_constraints) == 0:
                raise ValidationError(
                    f"Value {value} is not available for parameter {filter_key} with the given parameter values; "
                    f"available values: {set(available_values_key)}"
                )
        if not values_in_constraints and isinstance(value, list):
            constraints_values = []
            for const in constraints:
                # not every constraint defines every parameter; a plain lookup raised
                # KeyError here and hid the ValidationError below
                constraints_values.extend(const.get(filter_key, []))
            raise ValidationError(
                f"No values for {filter_key} available in given range; available values "
                f"{sorted(set(constraints_values))}"
            )
    for constraint in filtered_constraints:
        for key, values in constraint.items():
            filter_values = values
            if key in filters and isinstance(filters[key], list):
                # only keep the requested values that are allowed by this constraint
                filter_values = list(set(filters[key]).intersection(set(values)))
            if key in available_values:
                available_values[key].extend(filter_values)
            else:
                available_values[key] = filter_values
    return available_values
def _replace_datetimes(params: dict[str, Any]):
    """Replace datetimes by year/month

    ``start_datetime`` and ``end_datetime`` are removed from ``params``. If only one
    of them is given it is used for both ends of the interval. ``year`` (list of all
    years in the interval) and, if the interval lies within one year, ``month`` (list
    of zero-padded months) are added unless they are already present.

    :param params: search parameters, updated in place
    """
    start_raw = params.pop("start_datetime", None)
    end_raw = params.pop("end_datetime", None)
    # a single given bound is used for both ends
    start_raw = start_raw or end_raw
    end_raw = end_raw or start_raw
    if not start_raw:
        return

    first = parse_to_utc(start_raw)
    last = parse_to_utc(end_raw)
    same_year = first.year == last.year

    if "year" not in params:
        params["year"] = [str(y) for y in range(first.year, last.year + 1)]
    if same_year and "month" not in params:
        # month is only used for collection where only one year is available
        params["month"] = [f"{m:02}" for m in range(first.month, last.month + 1)]
class CopGhslSearch(Search):
    """
    Search plugin to fetch items from Copernicus Global Human Settlement Layer

    The products are created from the search parameters: the allowed parameter
    values are read from a constraints document (see ``_fetch_constraints``) and,
    if tiles are available for the requested parameters, one product per tile is
    created from the tile grid fetched from the provider (``api_endpoint``).
    """

    def _check_input_parameters_valid(self, collection: str, params: Any):
        """
        Check if all required parameters are given and if the values are valid
        raises a ValidationError if this is not the case

        ``params`` is updated in place: ``grouped_by`` is removed, the parameter it
        names is filled with all its available values if it is not given, and every
        list value is replaced by the sorted list of its available values.

        :param collection: collection for which the parameters are checked
        :param params: filter parameters of the request
        :raises ValidationError: if a parameter or value is not available or a
                                 parameter is missing
        """
        constraints_data = self._fetch_constraints(collection)
        constraints_values = constraints_data["constraints"]
        # get available values - will raise error if wrong parameters or wrong parameter values in request
        grouped_by = params.pop("grouped_by", None)
        available_values = _get_available_values_from_constraints(
            constraints_values, params, collection
        )
        if grouped_by and grouped_by not in params:
            params[grouped_by] = available_values[grouped_by]
        missing_params = set(available_values.keys()) - set(params.keys())
        if missing_params:
            raise ValidationError(
                f"parameter(s) {missing_params} missing in the request"
            )
        # update lists in params with available values
        for param in params:
            if isinstance(params[param], list):
                params[param] = sorted(available_values[param])

    def _get_start_and_end_from_properties(
        self, properties: dict[str, Any]
    ) -> dict[str, str]:
        """Get the start and end time from year/month in the properties or missionStart/EndDate

        :param properties: properties which may contain ``year`` and ``month``
        :returns: dict with the keys ``start_date`` and ``end_date``; without ``year``
                  and ``month`` the first temporal interval of the collection extent
                  is returned as configured
        """
        if "month" in properties:
            year = int(properties["year"])
            month = int(properties["month"])
            start_date = dt.datetime(
                year=year, month=month, day=1, hour=0, minute=0, second=0
            )
            # monthrange returns (weekday of first day, number of days in month)
            end_day = monthrange(year, month)[1]
            end_date = dt.datetime(
                year=year, month=month, day=end_day, hour=23, minute=59, second=59
            )
        elif "year" in properties:
            year = int(properties["year"])
            start_date = dt.datetime(
                year=year, month=1, day=1, hour=0, minute=0, second=0
            )
            end_date = dt.datetime(
                year=year, month=12, day=31, hour=23, minute=59, second=59
            )
        else:
            interval = self.get_collection_cfg_value("extent")["temporal"]["interval"]
            start_date_str = interval[0][0]
            end_date_str = interval[0][1]
            return {"start_date": start_date_str, "end_date": end_date_str}
        result: dict[str, str] = {}
        # cast to tell the type checker that value won't be None here
        result["start_date"] = cast(str, to_iso_utc_string(start_date))
        result["end_date"] = cast(str, to_iso_utc_string(end_date))
        return result

    def _create_products_from_tiles(
        self,
        tiles: dict[str, list[dict[str, Any]]],
        unit: str,
        collection: str,
        params: dict[str, Any],
        additional_filter: Optional[str] = None,
        need_count=False,
    ) -> tuple[list[EOProduct], int]:
        """
        Create EOProduct objects from the input parameters and the tiles containing bboxes
        if the bbox is given in metres, it is transformed to longitude and latitude

        :param tiles: tiles per year
        :param unit: unit of the tile bboxes (``lat/lon``, ``metres``; any other value
                     is handled as EPSG:3035 bboxes)
        :param collection: collection of the created products
        :param params: search parameters including ``metadata_mapping``, ``page`` and
                       ``per_page``; entries consumed here are removed from the dict
        :param additional_filter: name of a parameter whose value is passed to the
                                  dataset template as ``add_filter``
        :param need_count: if True all matching tiles are counted, otherwise the
                           iteration stops once the requested page is complete
        :returns: products of the requested page and the number of matching tiles
                  visited
        :raises MisconfiguredError: if no dataset mapping is configured
        """
        products: list[EOProduct] = []
        metadata_mapping = params.pop("metadata_mapping", {})
        parsed_metadata_mapping = mtd_cfg_as_conversion_and_querypath(metadata_mapping)
        filter_geometry = params.pop("geometry", None)
        page = params.pop("page")
        per_page = params.pop("per_page")
        start_index = per_page * (page - 1)
        end_index = start_index + per_page - 1

        # templates are kept untouched so that they can be formatted once per year
        dataset_template = metadata_mapping.get("dataset", None)
        if not dataset_template:
            raise MisconfiguredError(f"dataset mapping not available for {collection}")
        download_link_template = metadata_mapping.get("eodag:download_link")

        id_params = deepcopy(params)
        id_params["proj:code"] = id_params["proj:code"].replace("EPSG:", "")

        # parameters that need formatting
        if "tile_size" in parsed_metadata_mapping:
            # format tile_size
            tile_size = properties_from_json(
                {"tile_size": params["tile_size"]}, parsed_metadata_mapping
            )["tile_size"]
            params.update({"tile_size": tile_size})
        # additional filter
        if additional_filter:
            add_filter_value = params.pop(additional_filter)
            params["add_filter"] = (
                "" if add_filter_value == "TOTAL" else add_filter_value
            )

        if isinstance(params["year"], (int, str)):
            list_years = [str(params["year"])]
        else:
            list_years = params["year"]

        current_index = 0
        page_complete = False
        for year in list_years:
            # properties always start from the parameters with their original keys
            properties = deepcopy(params)
            properties["order:status"] = "succeeded"
            properties["start_datetime"] = to_iso_utc_string(
                dt.datetime(year=int(year), month=1, day=1)
            )
            properties["end_datetime"] = to_iso_utc_string(
                dt.datetime(
                    year=int(year), month=12, day=31, hour=23, minute=59, second=59
                )
            )
            properties["year"] = year

            # information for id and download path
            id_params["year"] = year
            id_values = [str(v) for v in id_params.values()]
            product_id_base = collection + "__" + "_".join(v for v in id_values if v)

            # ":" cannot be used in the field names of the dataset template
            format_params = {k.replace(":", "_"): v for k, v in params.items()}
            format_params["year"] = year
            # remove EPSG prefix for download url
            format_params["proj_code"] = format_params["proj_code"].replace(
                "EPSG:", ""
            )
            dataset = dataset_template.format(**format_params)
            # in case additional filter value is empty
            dataset = dataset.replace("__", "_")

            # create items from tiles
            for tile in tiles[year]:
                if not tile:  # empty grid position
                    continue
                # get geometry from tile
                if unit == "lat/lon":  # bbox is given as latitude/longitude
                    properties["geometry"] = tile["BBox"]
                elif unit == "metres" and "BBox" in tile:  # bbox is given in metres
                    properties["geometry"] = _convert_bbox_to_lonlat_mollweide(
                        tile["BBox"]
                    )
                else:  # ETRS89/LAEA Europe coordinate system
                    properties["geometry"] = _convert_bbox_to_lonlat_EPSG3035(
                        tile["BBox_3035"]
                    )
                # create id
                product_id = f"{product_id_base}__{tile['tileID']}"
                properties["id"] = properties["title"] = product_id
                properties["eodag:download_link"] = download_link_template.format(
                    dataset=dataset, tile_id=tile["tileID"]
                )
                product = EOProduct(
                    provider="cop_ghsl", properties=properties, collection=collection
                )
                if not filter_geometry or filter_geometry.intersects(product.geometry):
                    if start_index <= current_index <= end_index:
                        products.append(product)
                    elif current_index > end_index and not need_count:
                        page_complete = True
                        break
                    current_index += 1
            if page_complete:
                # nothing left to collect or count in the remaining years
                break
        return products, current_index

    def _create_products_without_tiles(
        self, collection: str, prep: PreparedSearch, filter_params: dict[str, Any]
    ) -> tuple[list[EOProduct], Optional[int]]:
        """
        Create EOProduct objects for a collection without tiles

        If ``grouped_by`` is set in the parameters, one product is created per value
        of the parameter it names (paginated), otherwise a single product for the
        whole collection is created.

        :param collection: collection of the created products
        :param prep: object containing search parameters (limit, page token, count)
        :param filter_params: search parameters merged with the collection config
        :returns: the products and, if ``prep.count`` is set, their total number
        :raises MisconfiguredError: if no download link is configured
        """
        filters = deepcopy(filter_params)
        default_geometry = getattr(self.config, "metadata_mapping")[
            "eodag:default_geometry"
        ]
        properties: dict[str, Any] = {}
        properties["geometry"] = default_geometry[1]
        properties["order:status"] = "succeeded"
        if "proj:code" in filters:
            filters["proj:code"] = filters["proj:code"].replace("EPSG:", "")
        collection_config = self.config.products.get(collection, {})
        download_link = collection_config.get("metadata_mapping", {}).get(
            "eodag:download_link", None
        )
        if not download_link:
            raise MisconfiguredError(
                f"Download link configuration missing for collection {collection}"
            )
        # collection with assets mapping
        assets_mapping = filters.pop("assets_mapping", None)
        products = []
        # same pagination inputs and defaults as in query()
        per_page = getattr(prep, "limit", None) or DEFAULT_LIMIT
        page = int(getattr(prep, "next_page_token", None) or 1)
        start_index = per_page * (page - 1)
        end_index = start_index + per_page - 1
        grouped_by = filters.pop("grouped_by", None)
        if grouped_by:  # dataset with several files differentiated by one parameter
            format_params = {k: str(v) for k, v in filters.items() if v}
            format_params.pop("metadata_mapping", None)
            grouped_by_values = filters[grouped_by]
            if isinstance(grouped_by_values, (str, int)):
                grouped_by_values = [grouped_by_values]
            num_products = len(grouped_by_values)
            for i, value in enumerate(grouped_by_values):
                if i < start_index:
                    continue
                filters[grouped_by] = format_params[grouped_by] = str(value)
                product_id = collection + "__" + "_".join(format_params.values())
                properties["id"] = properties["title"] = product_id
                properties.update(format_params)
                if "proj:code" in filter_params:
                    # keep the projection code as given, i.e. not stripped for the url
                    properties["proj:code"] = filter_params["proj:code"]
                properties["eodag:download_link"] = download_link.format(
                    **format_params
                )
                datetimes = self._get_start_and_end_from_properties(format_params)
                properties["start_datetime"] = datetimes["start_date"]
                properties["end_datetime"] = datetimes["end_date"]
                properties[grouped_by] = value
                product = EOProduct(
                    provider="cop_ghsl", properties=properties, collection=collection
                )
                if assets_mapping:  # item with several assets
                    assets = AssetsDict(product=product)
                    # ":" cannot be used in the field names of the href templates
                    href_params = {k.replace(":", "_"): v for k, v in filters.items()}
                    for key, mapping in assets_mapping.items():
                        # separate name: download_link is still needed as template
                        # for the following products
                        asset_href = mapping["href"].format(**href_params)
                        assets.update(
                            {
                                key: {
                                    "href": asset_href,
                                    "title": mapping["title"],
                                    "type": mapping["type"],
                                }
                            }
                        )
                    product.assets = assets
                products.append(product)
                if i == end_index:
                    break
        else:  # collection with only one file to download
            product_id = f"{collection}_ALL"
            properties["id"] = properties["title"] = product_id
            datetimes = self._get_start_and_end_from_properties(properties)
            properties["start_datetime"] = datetimes["start_date"]
            properties["end_datetime"] = datetimes["end_date"]
            properties["eodag:download_link"] = download_link
            product = EOProduct(
                provider="cop_ghsl", properties=properties, collection=collection
            )
            products.append(product)
            num_products = 1
        if prep.count:
            return products, num_products
        return products, None

    def _get_tile_from_product_id(
        self, query_params: dict[str, Any]
    ) -> Optional[tuple[dict[str, list[dict[str, Any]]], str]]:
        """Fetch the tile for a specific product id from the provider
        returns a dict with a list of length 1 to simplify further processing

        The id is split at ``__``: the second part is searched for the available
        parameter values, the last part is used as tile id.

        :param query_params: search parameters containing ``id`` and ``collection``;
                             ``id`` is removed and the parameters found in the id are
                             added
        :returns: the matching tile per year and the unit of its bbox or None if no
                  tiles are available
        :raises ValidationError: if the id does not contain the ``__`` separator
        """
        product_id = query_params.pop("id")
        collection = query_params["collection"]
        id_parts = product_id.split("__")
        if len(id_parts) < 2:
            raise ValidationError(
                f"invalid product id {product_id} for collection {collection}"
            )
        tile_id = id_parts[-1]
        filter_part = id_parts[1]
        constraints_values = self._fetch_constraints(collection)["constraints"]
        available_values = _get_available_values_from_constraints(
            constraints_values, {}, collection
        )
        collection_config = deepcopy(self.config.products.get(collection, {}))
        for key, values in available_values.items():
            for value in values:
                param_value = value
                if key == "proj:code":
                    # the id contains the projection code without EPSG prefix
                    param_value = value.replace("EPSG:", "")
                if param_value in filter_part:
                    query_params[key] = value
                    break
        tiles_or_none = self._get_tiles_for_filters(collection_config, query_params)
        if not tiles_or_none:
            return None
        tiles, unit = tiles_or_none
        matching_tile = [
            tile
            for tile in tiles[query_params["year"]]
            if tile and tile["tileID"] == tile_id
        ]
        return {query_params["year"]: matching_tile}, unit

    def _get_tiles_for_filters(
        self, collection_config: dict[str, Any], params: dict[str, Any]
    ) -> Optional[tuple[dict[str, list[dict[str, Any]]], str]]:
        """Fetch the tiles matching the given filters from the provider

        :param collection_config: configuration of the collection; ``_collection``
                                  is removed from it
        :param params: search parameters; ``collection`` and ``geometry`` are removed,
                       the collection config is merged in and the values are updated
                       during validation
        :returns: tiles per year and the unit of the bboxes or None if no tiles are
                  available for the parameters
        :raises MisconfiguredError: if the provider collection is not configured
        :raises ValidationError: if the parameters are not valid
        :raises TimeOutError: if a request to the provider times out
        :raises RequestError: if a request to the provider fails
        """
        logger.debug(f"get tiles for filter parameters {params}")
        collection = params.pop("collection")
        ssl_verify = getattr(self.config, "ssl_verify", True)
        timeout = getattr(self.config, "timeout", HTTP_REQ_TIMEOUT)
        # update filters with values from collection mapping
        provider_product_type = collection_config.pop("_collection", None)
        if not provider_product_type:
            raise MisconfiguredError(
                f"provider collection mapping not available for {collection}"
            )
        params.pop("geometry", None)
        params.update(collection_config)
        params.pop("metadata_mapping", None)
        params.pop("assets_mapping", None)
        # params is updated based on changes during validation
        self._check_input_parameters_valid(collection, params)
        # fetch available tiles based on filters
        if "year" not in params:
            logger.warning(f"no tiles available for {collection}")
            return None
        if isinstance(params["year"], (int, str)):
            list_years = [str(params["year"])]
        else:
            list_years = params["year"]
        all_tiles: dict[str, list[dict[str, Any]]] = {}
        # stays empty if there is no year to fetch
        unit = ""
        # remove EPSG prefix for request to provider
        proj_code = params["proj:code"].replace("EPSG:", "")
        for year in list_years:
            try:
                filter_str = (
                    f"{provider_product_type}_{year}_"
                    f"{params['tile_size']}_{proj_code}"
                )
            except KeyError:
                logger.warning(f"no tiles available for {collection}")
                return None
            tiles_url = self.config.api_endpoint + "/tilesDLD_" + filter_str + ".json"
            try:
                res = requests.get(
                    tiles_url, verify=ssl_verify, timeout=timeout, headers=USER_AGENT
                )
                if res.status_code == 404:
                    return None
                res.raise_for_status()
                # parse the response only once
                content = res.json()
            except requests.exceptions.Timeout as exc:
                raise TimeOutError(exc, timeout=timeout) from exc
            except requests.exceptions.RequestException as exc:
                raise RequestError.from_error(
                    exc, f"Unable to fetch {tiles_url}"
                ) from exc
            if params["proj:code"] == "EPSG:3035":
                # tiles are given as mapping of tile id to bbox
                tiles = [
                    {"tileID": t_id, "BBox_3035": bbox}
                    for t_id, bbox in content["BBoxes"].items()
                ]
            else:
                tiles = content["grid"]
            all_tiles[year] = tiles
            unit = content.get("unit", "")
        return all_tiles, unit

    def query(
        self,
        prep: PreparedSearch = PreparedSearch(),
        **kwargs: Any,
    ) -> SearchResult:
        """
        Implementation of search for the Copernicus GHSL provider

        :param prep: object containing search parameters
        :param kwargs: additional search arguments
        :returns: list of products and total number of products
        :raises MisconfiguredError: if no valid collection is given
        """
        page = int(getattr(prep, "next_page_token") or "1")
        limit = getattr(prep, "limit") or DEFAULT_LIMIT
        number_matched = kwargs.pop("number_matched", None)
        # get year/month from start/end time if not given separately
        _replace_datetimes(kwargs)
        # search params for SearchResult
        search_params = deepcopy(kwargs)
        search_params["limit"] = limit
        collection = kwargs.get("collection", None)
        if not collection:
            collection = kwargs["collection"] = prep.collection
        if not isinstance(collection, str):
            raise MisconfiguredError(f"invalid collection {collection}")
        collection_config = deepcopy(self.config.products.get(collection, {}))
        if "id" in kwargs and "ALL" not in kwargs["id"]:
            tiles_or_none = self._get_tile_from_product_id(kwargs)
        else:
            tiles_or_none = self._get_tiles_for_filters(collection_config, kwargs)

        if not tiles_or_none:
            # no tiles -> products are created from the parameters only
            kwargs.update(collection_config)
            products, count = self._create_products_without_tiles(
                collection, prep, kwargs
            )
            return SearchResult(
                products=products,
                search_params=search_params,
                number_matched=count,
                next_page_token_key="page",
                next_page_token=str(page + 1),
                raise_errors=True,
            )

        # create products from tiles
        tiles, unit = tiles_or_none
        kwargs.update(collection_config)
        kwargs["page"] = page
        kwargs["per_page"] = limit
        constraints_filters = self._fetch_constraints(collection)
        additional_filter = constraints_filters.get("additional_filter")
        # the tiles only have to be counted if the total is requested and not yet known
        need_count = prep.count and not number_matched
        products, count = self._create_products_from_tiles(
            tiles,
            unit,
            collection,
            kwargs,
            additional_filter=additional_filter,
            need_count=need_count,
        )
        if prep.count:
            total_items = number_matched if number_matched else count
        else:
            total_items = None
        return SearchResult(
            products=products,
            search_params=search_params,
            number_matched=total_items,
            next_page_token_key="page",
            next_page_token=str(page + 1),
            raise_errors=True,
        )

    @instance_cached_method()
    def _fetch_constraints(self, collection: str) -> dict[str, Any]:
        """Fetch the constraints of a collection from the configured constraints url

        :param collection: collection for which the constraints are fetched
        :returns: content of the constraints document; ``{"constraints": {}}`` if the
                  provider answers with 404
        :raises TimeOutError: if the request times out
        :raises RequestError: if the request fails
        """
        logger.debug(f"fetching constraints for {collection}")
        constraints_url = self.config.discover_queryables["constraints_url"].format(
            collection=collection
        )
        # same ssl and timeout settings as for the tiles requests
        ssl_verify = getattr(self.config, "ssl_verify", True)
        timeout = getattr(self.config, "timeout", HTTP_REQ_TIMEOUT)
        try:
            res = requests.get(
                constraints_url,
                verify=ssl_verify,
                timeout=timeout,
                headers=USER_AGENT,
            )
            if res.status_code == 404:
                logger.warning(f"no constraints found for {collection}")
                return {"constraints": {}}
            res.raise_for_status()
            return res.json()
        except requests.exceptions.Timeout as exc:
            raise TimeOutError(exc, timeout=timeout) from exc
        except requests.exceptions.RequestException as exc:
            raise RequestError.from_error(
                exc, f"Unable to fetch constraints from {constraints_url}"
            ) from exc

    def discover_queryables(
        self, **kwargs
    ) -> Optional[dict[str, Annotated[Any, FieldInfo]]]:
        """Create queryables list based on constraints

        :param kwargs: additional filters for queryables (`collection` and other search
                       arguments)
        :returns: queryable parameters dict
        :raises ValidationError: if a filter or its value is not available
        """
        collection = kwargs.pop("collection")
        kwargs.pop("_collection", None)
        grouped_by = kwargs.pop("grouped_by", None)
        # keep the datetimes for the defaults, they are removed by _replace_datetimes
        start_default = kwargs.get("start_datetime")
        end_default = kwargs.get("end_datetime")
        _replace_datetimes(kwargs)
        constraints_values = self._fetch_constraints(collection)["constraints"]
        available_values = _get_available_values_from_constraints(
            constraints_values, kwargs, collection
        )
        queryables: dict[str, Annotated[Any, FieldInfo]] = {}
        for name, values in available_values.items():
            # the parameter used for grouping does not have to be given
            required = name != grouped_by
            queryables[name] = Annotated[
                get_args(
                    json_field_definition_to_python(
                        {"type": "string", "title": name, "enum": values},
                        default_value=kwargs.get(name, None),
                        required=required,
                    )
                )
            ]
        # add datetimes queryables if year filter is available
        if "year" in available_values:
            queryables.update(
                {
                    "start": Queryables.get_with_default("start", start_default),
                    "end": Queryables.get_with_default("end", end_default),
                }
            )
        # add geometry queryable if there are tiles
        if "tile_size" in available_values and not grouped_by:
            queryables.update(
                {
                    "geom": Queryables.get_with_default("geom", None),
                }
            )
        return queryables