Utils#
This section provides an overview of the utility functions and classes available in the eodag library. These utilities are designed to assist with various tasks such as logging, handling callbacks, performing free text searches, working with Jupyter notebooks, interacting with S3 storage, and processing xarray data. Each subsection below details the specific utilities and their usage.
Logging#
- eodag.utils.logging.get_logging_verbose()[source]#
Get logging verbose level
>>> from eodag import setup_logging >>> get_logging_verbose() >>> setup_logging(verbose=0) >>> get_logging_verbose() 0 >>> setup_logging(verbose=1) >>> get_logging_verbose() 1 >>> setup_logging(verbose=2) >>> get_logging_verbose() 2 >>> setup_logging(verbose=3) >>> get_logging_verbose() 3
Callbacks#
- class eodag.utils.DownloadedCallback[source]#
Example class for callback after each download in
download_all()
- eodag.utils.ProgressCallback(*args, **kwargs)[source]#
A callable used to render progress to users for long running processes.
It inherits from tqdm.auto.tqdm and accepts the same arguments on instantiation: iterable, desc, total, leave, file, ncols, mininterval, maxinterval, miniters, ascii, disable, unit, unit_scale, dynamic_ncols, smoothing, bar_format, initial, position, postfix, unit_divisor.
It can be globally disabled using eodag.utils.logging.setup_logging(0) or eodag.utils.logging.setup_logging(level, no_progress_bar=True), and individually disabled using disable=True.
Dates#
eodag.rest.dates methods that must be importable without eodag[server] installed
- eodag.utils.dates.append_time(input_date, time=None)[source]#
Appends a string-formatted time to a date.
- Parameters:
- Return type:
- Returns:
Datetime obtained by appending the time to the date
- Examples:
>>> from eodag.utils.dates import append_time >>> from datetime import date >>> append_time(date(2020, 12, 13)) datetime.datetime(2020, 12, 13, 0, 0) >>> append_time(date(2020, 12, 13), "") datetime.datetime(2020, 12, 13, 0, 0) >>> append_time(date(2020, 12, 13), "2400") datetime.datetime(2020, 12, 13, 0, 0) >>> append_time(date(2020, 12, 13), "14_31") datetime.datetime(2020, 12, 13, 14, 31)
- eodag.utils.dates.compute_date_range_from_params(date=None, time=None, year=None, month=None, day=None)[source]#
Compute start/end ISO UTC datetime strings from date parameters.
Handles two modes:
date + optional time: parse the date string and apply time bounds
year + optional month/day/time: compute bounds from year/month/day/time ranges
Time values are expected in HHMM format (see time_values_to_hhmm()). Returns (None, None) if neither date nor year is provided.
- Parameters:
date (str | None, default: None) – Date string (single date, or interval with / or /to/)
time (list[str] | None, default: None) – List of normalized time strings in HHMM format
year (list[str] | None, default: None) – List of year strings
month (list[str] | None, default: None) – List of month strings (zero-padded)
day (list[str] | None, default: None) – List of day strings (zero-padded)
- Return type:
- Returns:
Tuple of (start_datetime, end_datetime) as ISO UTC strings
- Raises:
ValidationError – If a date string cannot be parsed
- Examples:
>>> compute_date_range_from_params(date="2020-12-15") ('2020-12-15T00:00:00.000Z', '2020-12-15T00:00:00.000Z') >>> compute_date_range_from_params(date="2020-12-15", time=["0600", "1800"]) ('2020-12-15T06:00:00.000Z', '2020-12-15T18:00:00.000Z') >>> compute_date_range_from_params(year=["2020", "2021"]) ('2020-01-01T00:00:00.000Z', '2021-12-31T23:59:59.000Z') >>> compute_date_range_from_params(year=["2020"], month=["03"], day=["15"]) ('2020-03-15T00:00:00.000Z', '2020-03-15T23:59:59.000Z') >>> compute_date_range_from_params() (None, None)
- eodag.utils.dates.datetime_range(start, end)[source]#
Generator function for all dates in-between start and end date.
- Parameters:
- Return type:
- Returns:
Generator of dates
- Examples:
>>> from datetime import datetime >>> dtr = datetime_range(datetime(2020, 12, 31), datetime(2021, 1, 2)) >>> next(dtr) datetime.datetime(2020, 12, 31, 0, 0) >>> next(dtr) datetime.datetime(2021, 1, 1, 0, 0) >>> next(dtr) datetime.datetime(2021, 1, 2, 0, 0) >>> next(dtr) Traceback (most recent call last): ... StopIteration
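The behaviour shown in the example above can be approximated with the standard library alone. This is a minimal sketch, assuming a one-day step and inclusive bounds (both inferred from the example); datetime_range_sketch is a hypothetical name, not the eodag implementation.

```python
from datetime import datetime, timedelta


def datetime_range_sketch(start: datetime, end: datetime):
    """Yield one datetime per day from start to end, both included."""
    # Assumption: whole-day steps, as in the documented example.
    for offset in range((end - start).days + 1):
        yield start + timedelta(days=offset)


days = list(datetime_range_sketch(datetime(2020, 12, 31), datetime(2021, 1, 2)))
```

Writing it as a generator keeps memory use constant even for long ranges.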
- eodag.utils.dates.ensure_utc(value)[source]#
Ensure a datetime is UTC-aware.
If the datetime is naive, it is assumed to be UTC. If it already has a timezone, it is converted to UTC.
- Parameters:
value (datetime) – A datetime object
- Return type:
- Returns:
A timezone-aware datetime in UTC
- Examples:
>>> from datetime import datetime, timezone >>> ensure_utc(datetime(2020, 1, 1, 12, 0)) datetime.datetime(2020, 1, 1, 12, 0, tzinfo=tzutc()) >>> ensure_utc(datetime(2021, 4, 21, 0, 0, tzinfo=timezone.utc)) datetime.datetime(2021, 4, 21, 0, 0, tzinfo=tzutc())
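The naive-versus-aware rule described above can be sketched with the standard library only. ensure_utc_sketch is a hypothetical stand-in; it returns datetime.timezone.utc rather than the dateutil tzutc() object shown in the examples.

```python
from datetime import datetime, timedelta, timezone


def ensure_utc_sketch(value: datetime) -> datetime:
    """Return a timezone-aware datetime expressed in UTC."""
    if value.tzinfo is None:
        # Naive datetimes are assumed to already be in UTC: only attach the zone.
        return value.replace(tzinfo=timezone.utc)
    # Aware datetimes are converted, which shifts the wall-clock time.
    return value.astimezone(timezone.utc)


naive = ensure_utc_sketch(datetime(2020, 1, 1, 12, 0))
paris = ensure_utc_sketch(datetime(2021, 4, 21, 0, 0, tzinfo=timezone(timedelta(hours=2))))
```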
- eodag.utils.dates.format_date(date)[source]#
Format a datetime with the format ‘YYYY-MM-DD’.
- Parameters:
date (datetime) – Datetime to format
- Return type:
- Returns:
Date string in the format ‘YYYY-MM-DD’
- Examples:
>>> from datetime import datetime >>> format_date(datetime(2020, 12, 2)) '2020-12-02' >>> format_date(datetime(2020, 12, 2, 11, 22, 33)) '2020-12-02'
- eodag.utils.dates.format_date_range(start, end)[source]#
Format a range with the format ‘YYYY-MM-DD/YYYY-MM-DD’.
- Parameters:
- Return type:
- Returns:
Date range in the format ‘YYYY-MM-DD/YYYY-MM-DD’
- Examples:
>>> from datetime import datetime >>> format_date_range(datetime(2020, 12, 2, 11, 22, 33), datetime(2020, 12, 31)) '2020-12-02/2020-12-31'
- eodag.utils.dates.get_date(date)[source]#
Check if the input date can be parsed as a date
- Parameters:
- Return type:
- Returns:
The datetime represented with ISO 8601 UTC format
- Raises:
ValidationError – If the date string cannot be parsed
- Examples:
>>> from eodag.utils.exceptions import ValidationError >>> get_date("2023-09-23") '2023-09-23T00:00:00.000Z' >>> get_date(None) is None True >>> get_date("invalid-date") Traceback (most recent call last): ... ValidationError
- eodag.utils.dates.get_datetime(arguments)[source]#
Get start and end dates from a dict containing / separated dates in datetime item
- Parameters:
arguments (dict[str, Any]) – dict containing a single date or / separated dates in datetime item
- Return type:
- Returns:
Start date and end date from datetime string (duplicate value if only one date as input)
- Raises:
ValidationError – If a date string cannot be parsed
- Examples:
>>> get_datetime({"datetime": "2023-03-01/2023-03-31"}) ('2023-03-01T00:00:00.000Z', '2023-03-31T00:00:00.000Z') >>> get_datetime({"datetime": "2023-03-01"}) ('2023-03-01T00:00:00.000Z', '2023-03-01T00:00:00.000Z') >>> get_datetime({"datetime": "../2023-03-31"}) (None, '2023-03-31T00:00:00.000Z') >>> get_datetime({"datetime": "2023-03-01/.."}) ('2023-03-01T00:00:00.000Z', None) >>> get_datetime({"dtstart": "2023-03-01", "dtend": "2023-03-31"}) ('2023-03-01T00:00:00.000Z', '2023-03-31T00:00:00.000Z') >>> get_datetime({}) (None, None)
- eodag.utils.dates.get_min_max(value=None)[source]#
Returns the min and max from a list of strings or the same string if a single string is given.
- Parameters:
value (str | list[str] | None, default: None) – a single string or a list of strings
- Return type:
- Returns:
a tuple with the min and max values
- Examples:
>>> get_min_max(["a", "c", "b"]) ('a', 'c') >>> get_min_max(["a"]) ('a', 'a') >>> get_min_max("a") ('a', 'a')
- eodag.utils.dates.get_timestamp(date_time)[source]#
Return the Unix timestamp of an ISO8601 date/datetime in seconds.
If the datetime has no offset, it is assumed to be an UTC datetime.
- Parameters:
date_time (str) – The datetime string to return as timestamp
- Return type:
- Returns:
The timestamp corresponding to the date_time string in seconds
- Raises:
ValueError – If date_time cannot be parsed as ISO8601
- Examples:
>>> get_timestamp("2023-09-23T12:34:56Z") 1695472496.0 >>> get_timestamp("2023-09-23T12:34:56+02:00") 1695465296.0 >>> get_timestamp("2023-09-23") 1695427200.0
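The rule "no offset means UTC" can be illustrated with datetime.fromisoformat. This is a sketch under the assumption that the input is in a form fromisoformat accepts (it covers less of ISO 8601 than a dedicated parser, especially before Python 3.11); get_timestamp_sketch is a hypothetical name.

```python
from datetime import datetime, timezone


def get_timestamp_sketch(date_time: str) -> float:
    """Return the Unix timestamp in seconds of an ISO 8601 date/datetime string."""
    # A trailing "Z" is only accepted by fromisoformat() from Python 3.11 on.
    parsed = datetime.fromisoformat(date_time.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        # No offset given: interpret the value as UTC rather than local time.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.timestamp()


utc_ts = get_timestamp_sketch("2023-09-23T12:34:56Z")
offset_ts = get_timestamp_sketch("2023-09-23T12:34:56+02:00")
date_ts = get_timestamp_sketch("2023-09-23")
```

Without the explicit UTC assignment, timestamp() would interpret a naive datetime in the machine's local timezone.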
- eodag.utils.dates.is_range_in_range(valid_range, check_range)[source]#
Check if the check_range is completely within the valid_range.
This function checks if both the start and end dates of the check_range are within the start and end dates of the valid_range.
- Parameters:
- Return type:
- Returns:
True if check_range is within valid_range, otherwise False.
- Raises:
ValueError – If date parts cannot be parsed as ISO8601
- Examples:
>>> is_range_in_range("2023-01-01/2023-12-31", "2023-03-01/2023-03-31") True >>> is_range_in_range("2023-01-01/2023-12-31", "2022-12-01/2023-03-31") False >>> is_range_in_range("2023-01-01/2023-12-31", "2023-11-01/2024-01-01") False >>> is_range_in_range("2023-01-01/to/2023-12-31", "2023-11-01/to/2024-01-01") False >>> is_range_in_range("2023-01-01/to/2023-12-31", "2023-03-01/2023-03-31") True >>> is_range_in_range("2023-01-01/2023-12-31", "invalid-range") False >>> is_range_in_range("invalid-range", "2023-03-01/2023-03-31") False
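The containment check can be sketched as follows. The helper names are hypothetical, and returning False for unparseable input simply mirrors the examples above; no claim is made about when eodag itself raises ValueError.

```python
from datetime import datetime


def parse_range_sketch(value: str):
    """Split 'start/end' or 'start/to/end' into two datetimes, or return None."""
    parts = value.replace("/to/", "/").split("/")
    if len(parts) != 2:
        return None
    try:
        start, end = (datetime.fromisoformat(part) for part in parts)
    except ValueError:
        return None
    return start, end


def is_range_in_range_sketch(valid_range: str, check_range: str) -> bool:
    """True if both bounds of check_range lie inside valid_range."""
    valid = parse_range_sketch(valid_range)
    check = parse_range_sketch(check_range)
    if valid is None or check is None:
        return False
    return valid[0] <= check[0] and check[1] <= valid[1]
```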
- eodag.utils.dates.parse_date(date, time=None)[source]#
Parses a date string in formats YYYY-MM-DD or YYYYMMDD, either alone or in start/end or start/to/end intervals.
- Parameters:
date (str) – Single or interval date string
- Return type:
- Returns:
A tuple with the start and end datetime
- Raises:
ValidationError – If a date string cannot be parsed
- Examples:
>>> parse_date("2020-12-15") (datetime.datetime(2020, 12, 15, 0, 0, tzinfo=tzutc()), datetime.datetime(2020, 12, 15, 0, 0, tzinfo=tzutc())) >>> parse_date("2020-12-15/to/20201230") (datetime.datetime(2020, 12, 15, 0, 0, tzinfo=tzutc()), datetime.datetime(2020, 12, 30, 0, 0, tzinfo=tzutc()))
- eodag.utils.dates.parse_to_utc(raw)[source]#
Parse a date string to a UTC-aware datetime.
Uses dateutil.parser.isoparse for ISO strings. Falls back to dateutil.parser.parse for non-ISO formats. Always returns a timezone-aware datetime in UTC.
- Parameters:
raw (str) – A date string
- Return type:
- Returns:
A timezone-aware datetime in UTC
- Raises:
ValidationError – If the string cannot be parsed
- Examples:
>>> parse_to_utc("2020-01-01") datetime.datetime(2020, 1, 1, 0, 0, tzinfo=tzutc()) >>> parse_to_utc("2021-04-21T00:00:00+02:00") datetime.datetime(2021, 4, 20, 22, 0, tzinfo=tzutc()) >>> parse_to_utc("invalid") Traceback (most recent call last): ... ValidationError
- eodag.utils.dates.parse_year_month_day(year, month=None, day=None, time=None)[source]#
Returns minimum and maximum datetimes from given lists of years, months, days, times.
- Parameters:
- Return type:
- Returns:
A tuple with the start and end datetime
- Examples:
>>> parse_year_month_day(["2020", "2021", "2022"], ["01", "03", "05"], "01", ["0000", "1200"]) (datetime.datetime(2020, 1, 1, 0, 0), datetime.datetime(2022, 5, 1, 12, 0))
- eodag.utils.dates.rfc3339_str_to_datetime(s)[source]#
Convert a string conforming to RFC 3339 to a datetime.datetime.
- Parameters:
s (str) – The string to convert to datetime.datetime
- Return type:
- Returns:
The datetime represented by the ISO8601 (RFC 3339) formatted string
- Raises:
ValidationError – If the string does not conform to RFC 3339
- Examples:
>>> from eodag.utils.exceptions import ValidationError >>> rfc3339_str_to_datetime("2023-09-23T12:34:56Z") datetime.datetime(2023, 9, 23, 12, 34, 56, tzinfo=datetime.timezone.utc)
>>> rfc3339_str_to_datetime("invalid-date") Traceback (most recent call last): ... ValidationError
- eodag.utils.dates.time_values_to_hhmm(time_values)[source]#
Convert time values to 4-digit HHMM format.
Strips non-digit characters (e.g. "12:00" -> "1200", "06:00" -> "0600"), then right-pads with zeros to handle 2-digit hour-only values (e.g. "06" -> "0600"). Deduplicates while preserving order.
- Parameters:
time_values (list[str]) – List of time strings in various formats
- Return type:
- Returns:
List of unique time strings in HHMM format
- Examples:
>>> time_values_to_hhmm(["12:00", "06:00"]) ['1200', '0600'] >>> time_values_to_hhmm(["12:00", "12:00"]) ['1200'] >>> time_values_to_hhmm(["06"]) ['0600']
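The three steps described above (strip non-digits, right-pad to four digits, deduplicate in order) fit in a few lines. This is an illustrative sketch with a hypothetical name, not the library source.

```python
import re


def time_values_to_hhmm_sketch(time_values: list[str]) -> list[str]:
    """Normalize time strings to HHMM, keeping the first occurrence of each value."""
    # Drop every non-digit, then pad on the right so "06" becomes "0600".
    normalized = [re.sub(r"\D", "", value).ljust(4, "0") for value in time_values]
    # dict preserves insertion order, which gives order-preserving deduplication.
    return list(dict.fromkeys(normalized))
```

Using dict.fromkeys instead of set keeps the caller's ordering, which matters when the first and last values are later used as time bounds.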
- eodag.utils.dates.to_iso_utc_string(raw)[source]#
Convert a datetime or date string to an ISO 8601 UTC string with millisecond precision.
- Parameters:
raw (datetime | str | None) – A datetime object or date string to convert
- Return type:
- Returns:
ISO 8601 formatted UTC string (YYYY-MM-DDTHH:MM:SS.sssZ), or None
- Examples:
>>> from datetime import datetime >>> to_iso_utc_string(datetime(2020, 1, 1, 12, 0)) '2020-01-01T12:00:00.000Z' >>> to_iso_utc_string("2020-01-01") '2020-01-01T00:00:00.000Z' >>> to_iso_utc_string("2021-04-21T00:00:00+02:00") '2021-04-20T22:00:00.000Z' >>> to_iso_utc_string(None) is None True
- eodag.utils.dates.validate_datetime_param(value, param_name, formatters)[source]#
Validate and collect parameter values matching any of the given datetime formats.
Ensures each value can be parsed by at least one of the formatters (datetime.strptime patterns), and returns the sorted list of valid values.
- Parameters:
- Return type:
- Returns:
Sorted list of valid values, or None if value is None
- Raises:
ValidationError – If none of the values match any formatter
- Examples:
>>> validate_datetime_param(["2023", "2024"], "year", ["%Y"]) ['2023', '2024'] >>> validate_datetime_param("12:00", "time", ["%H:%M", "%H%M"]) ['12:00'] >>> validate_datetime_param(None, "year", ["%Y"]) is None True >>> validate_datetime_param("bad", "year", ["%Y"]) Traceback (most recent call last): ... eodag.utils.exceptions.ValidationError: Malformed parameter "year": ...
Free text search#
- eodag.utils.free_text_search.compile_free_text_query(query)[source]#
Compiles a free-text logical search query into a dictionary evaluator function.
The evaluator checks whether the concatenated string values of a dictionary (case-insensitive) satisfy the given logical expression.
- Processing steps:
Tokenize the query into words, quoted phrases, wildcards, and operators.
Convert infix tokens into postfix notation using the Shunting Yard algorithm.
Build an evaluator function that applies the expression to dictionary fields.
- Supported features:
Logical operators: AND, OR, NOT
Grouping with parentheses: (, )
Exact phrases in quotes: "foo bar" (case-insensitive substring match)
- Wildcards inside tokens:
* → matches zero or more characters
? → matches exactly one character
Plain tokens without wildcards → matched as whole words (word boundary aware)
Case-insensitive matching across all tokens and phrases
- Parameters:
query (str) – A logical search expression (e.g., '("foo bar" OR baz*) AND NOT qux').
- Return type:
- Returns:
A function that takes a dict[str, str] and returns True if it matches.
- Example:
>>> evaluator = compile_free_text_query('("FooAndBar" OR BAR) AND "FOOBAR collection"') >>> evaluator({ ... "title": "titleFOOBAR - Lorem FOOBAR collection", ... "description": "abstract FOOBAR - This is FOOBAR. FooAndBar" ... }) True >>> evaluator({ ... "title": "collection FOOBAR", ... "description": "abstract FOOBAR - This is FOOBAR. FooAndBar" ... }) False >>> evaluator({ ... "title": "titleFOOBAR - Lorem FOOBAR ", ... "description": "abstract FOOBAR - This is FOOBAR." ... }) False >>> evaluator({"title": "Only Bar here"}) False
Wildcard example:
>>> evaluator = compile_free_text_query('foo*') >>> evaluator({"title": "this is foobar"}) True >>> evaluator({"title": "something with fooo"}) True >>> evaluator({"title": "bar only"}) False
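The three processing steps listed above (tokenize, convert to postfix with the Shunting Yard algorithm, evaluate) can be sketched compactly. All names below are hypothetical, operators are assumed to be upper-case, and the wildcard-to-regex translation is one possible reading of the documented rules rather than the eodag implementation.

```python
import re

PRECEDENCE = {"NOT": 3, "AND": 2, "OR": 1}


def tokenize(query):
    """Split into quoted phrases, parentheses and bare words."""
    return re.findall(r'"[^"]*"|\(|\)|[^\s()"]+', query)


def to_postfix(tokens):
    """Shunting Yard: reorder infix tokens into postfix notation."""
    output, stack = [], []
    for tok in tokens:
        if tok in PRECEDENCE:
            # Pop operators that bind tighter; NOT is unary, so it never pops an equal one.
            while stack and stack[-1] in PRECEDENCE and (
                PRECEDENCE[stack[-1]] > PRECEDENCE[tok]
                or (PRECEDENCE[stack[-1]] == PRECEDENCE[tok] and tok != "NOT")
            ):
                output.append(stack.pop())
            stack.append(tok)
        elif tok == "(":
            stack.append(tok)
        elif tok == ")":
            while stack and stack[-1] != "(":
                output.append(stack.pop())
            stack.pop()  # discard the matching "("
        else:
            output.append(tok)
    while stack:
        output.append(stack.pop())
    return output


def term_matches(term, text):
    """Match one operand against the lower-cased text."""
    if term.startswith('"'):
        return term.strip('"').lower() in text  # phrase: substring match
    pattern = re.escape(term.lower()).replace(r"\*", ".*").replace(r"\?", ".")
    return re.search(r"\b" + pattern + r"\b", text) is not None


def compile_query_sketch(query):
    postfix = to_postfix(tokenize(query))

    def evaluator(fields):
        text = " ".join(str(value) for value in fields.values()).lower()
        stack = []
        for tok in postfix:
            if tok == "NOT":
                stack.append(not stack.pop())
            elif tok in ("AND", "OR"):
                right, left = stack.pop(), stack.pop()
                stack.append((left and right) if tok == "AND" else (left or right))
            else:
                stack.append(term_matches(tok, text))
        return stack.pop()

    return evaluator
```

Compiling once and returning a closure means the tokenizing and reordering cost is paid a single time, however many dictionaries are evaluated afterwards.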
Notebook#
S3#
- class eodag.utils.s3.S3FileInfo(size, key, bucket_name, zip_filepath=None, data_start_offset=0, data_type='application/octet-stream', rel_path=None, file_start_offset=0, futures=<factory>, buffers=<factory>, next_yield=0)[source]#
Describes an S3 object with basic file information and its download state.
- Parameters:
-
buffers:
dict[int,bytes]# Buffers for downloaded data chunks, mapping start byte offsets to the actual data. This allows for partial downloads and efficient memory usage. The key is the start byte offset, and the value is the bytes data for that offset. This is used to yield data in the correct order during streaming. It is updated as chunks are downloaded.
-
data_type:
str= 'application/octet-stream'# MIME type of the file, defaulting to application/octet-stream. It can be updated based on the file extension or content type.
-
futures:
dict# Mapping of futures to their start byte offsets, used to track download progress. Each future corresponds to a chunk of data being downloaded. The key is the future object, and the value is the start byte offset of that chunk in the logical file stream.
-
next_yield:
int= 0# The next offset to yield in the file, used to track progress during downloading and yielding chunks. It starts at 0 and is updated as data is yielded. This allows the streaming process to continue from where it left off, ensuring that all data is eventually yielded without duplication.
- eodag.utils.s3.fetch_range(bucket_name, key_name, start, end, client_s3)[source]#
Range-fetches a S3 key.
- eodag.utils.s3.file_position_from_s3_zip(s3_bucket, object_key, s3_client, target_filepath)[source]#
Get the start position and size of a specific file inside a ZIP archive stored in S3. This function assumes the file is uncompressed (ZIP_STORED).
The returned tuple contains:
file_data_start: The byte offset where the file data starts in the ZIP archive.
file_size: The size of the file in bytes.
- Parameters:
- Return type:
- Returns:
A tuple (file_data_start, file_size)
- Raises:
FileNotFoundError – If the target file is not found in the ZIP archive.
NotImplementedError – If the file is not uncompressed (ZIP_STORED)
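For an uncompressed (ZIP_STORED) member, the data offset follows from the fixed 30-byte local file header plus the variable-length name and extra fields. The sketch below works on an archive held fully in memory, unlike the S3 function, which relies on range requests; stored_file_position is a hypothetical name and raises KeyError rather than FileNotFoundError for a missing member.

```python
import io
import struct
import zipfile

LOCAL_HEADER_SIZE = 30  # fixed part of a ZIP local file header


def stored_file_position(zip_bytes: bytes, target: str):
    """Return (data_start, size) of an uncompressed member inside a ZIP archive."""
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as archive:
        info = archive.getinfo(target)  # raises KeyError if target is missing
    if info.compress_type != zipfile.ZIP_STORED:
        raise NotImplementedError("only ZIP_STORED members are supported")
    # File name length and extra field length sit at bytes 26-29 of the local header.
    name_len, extra_len = struct.unpack_from("<HH", zip_bytes, info.header_offset + 26)
    data_start = info.header_offset + LOCAL_HEADER_SIZE + name_len + extra_len
    return data_start, info.file_size


buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w", zipfile.ZIP_STORED) as zf:
    zf.writestr("a.txt", b"hello")
    zf.writestr("dir/b.bin", b"world!!")
sample = buffer.getvalue()
start, size = stored_file_position(sample, "dir/b.bin")
```

Because the member is stored without compression, slicing the archive at (start, start + size) yields the original bytes directly, which is what makes a single ranged read possible.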
- eodag.utils.s3.list_files_in_s3_zipped_object(bucket_name, key_name, s3_client)[source]#
List files in s3 zipped object, without downloading it.
See https://stackoverflow.com/questions/41789176/how-to-count-files-inside-zip-in-aws-s3-without-downloading-it; Based on https://stackoverflow.com/questions/51351000/read-zip-files-from-s3-without-downloading-the-entire-file
- eodag.utils.s3.open_s3_zipped_object(bucket_name, key_name, s3_client, zip_size=None, partial=True)[source]#
Fetches the central directory and EOCD (End Of Central Directory) from an S3 object and opens a ZipFile in memory.
This function retrieves the ZIP file’s central directory and EOCD by performing range requests on the S3 object. It supports partial fetching (only the central directory and EOCD) for efficiency, or full ZIP download if needed.
- Parameters:
bucket_name (str) – Name of the S3 bucket containing the ZIP file.
key_name (str) – Key (path) of the ZIP file in the S3 bucket.
s3_client (S3Client) – S3 client instance used to perform range requests.
zip_size (int | None, default: None) – Size of the ZIP file in bytes. If None, it will be determined via a HEAD request.
partial (bool, default: True) – If True, only fetch the central directory and EOCD. If False, fetch the entire ZIP file.
- Return type:
- Returns:
Tuple containing the opened ZipFile object and the central directory bytes.
- Raises:
InvalidDataError – If the EOCD signature is not found in the last 64KB of the file.
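Locating the central directory from the tail of an archive can be sketched as below. The sample archive is built in memory so that no S3 access is needed; locate_central_directory and build_sample_zip are hypothetical names, and ZIP64 archives and comments containing the signature are out of scope for this sketch.

```python
import io
import struct
import zipfile

EOCD_SIGNATURE = b"PK\x05\x06"
MAX_EOCD_SEARCH = 64 * 1024  # the EOCD is searched in the last 64 KB


def locate_central_directory(tail: bytes):
    """Return (cd_offset, cd_size) read from the EOCD record found in tail."""
    pos = tail.rfind(EOCD_SIGNATURE)
    if pos < 0:
        raise ValueError("EOCD signature not found")
    # EOCD layout: central directory size at byte 12, its offset at byte 16.
    cd_size, cd_offset = struct.unpack_from("<II", tail, pos + 12)
    return cd_offset, cd_size


def build_sample_zip() -> bytes:
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", zipfile.ZIP_STORED) as zf:
        zf.writestr("a.txt", b"hello")
        zf.writestr("dir/b.txt", b"world")
    return buffer.getvalue()


archive = build_sample_zip()
cd_offset, cd_size = locate_central_directory(archive[-MAX_EOCD_SEARCH:])
central_directory = archive[cd_offset:cd_offset + cd_size]
```

With a remote object, the tail and the central directory would each come from one ranged read, so listing the members never requires downloading the file data.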
- eodag.utils.s3.stream_download_from_s3(s3_client, files_info, byte_range=(None, None), compress='auto', zip_filename='archive', range_size=8388608, provider_max_workers=None)[source]#
Stream data from one or more S3 objects in chunks, with support for global byte ranges.
This function provides efficient streaming download of S3 objects with support for:
Single file streaming with direct MIME type detection
Multiple file streaming as ZIP archives
Byte range requests for partial content
Files within ZIP archives (using .zip! notation)
Concurrent chunk downloading for improved performance
Memory-efficient streaming without loading entire files
The response format depends on the compress parameter and number of files:
Single file + compress="raw" or "auto": streams file directly with detected MIME type
Multiple files + compress="zip" or "auto": creates ZIP archive containing all files
compress="zip": always creates ZIP archive regardless of file count
For files stored within ZIP archives, use the .zip! notation in the S3FileInfo.key: "path/to/archive.zip!internal/file.txt"
- Parameters:
s3_client (S3Client) – Boto3 S3 client instance for making requests
files_info (list[S3FileInfo]) – List of S3FileInfo objects describing files to download. Each object must contain at minimum: bucket_name, key, and size. Optional fields include: data_type, rel_path, zip_filepath.
byte_range (tuple[int | None, int | None], default: (None, None)) – Global byte range to download as (start, end) tuple. None values indicate open-ended ranges. Applied across the logical concatenation of all files.
compress (Literal['zip', 'raw', 'auto'], default: 'auto') – Output format control:
"zip": Always create ZIP archive
"raw": Stream files directly (single) or as multipart (multiple)
"auto": ZIP for multiple files, raw for single file
zip_filename (str, default: 'archive') – Base filename for ZIP archives (without .zip extension). Only used when creating ZIP archives.
range_size (int, default: 8388608) – Size of each download chunk in bytes. Larger chunks reduce request overhead but use more memory. Default: 8 MiB.
provider_max_workers (int | None, default: None) – (optional) Maximum number of concurrent download threads of the provider used. Higher values improve throughput for multiple ranges.
- Returns:
StreamResponse object containing:
content: Iterator of bytes for the streaming response
media_type: MIME type ("application/zip" for archives, detected type for single files)
headers: HTTP headers including Content-Disposition for downloads
- Return type:
- Raises:
InvalidDataError – If ZIP file structures are malformed
NotAvailableError – If S3 objects cannot be accessed
AuthenticationError – If S3 credentials are invalid
NotImplementedError – If compressed files within ZIP archives are encountered
Example usage:
import boto3
from eodag.utils.s3 import stream_download_from_s3, S3FileInfo

# Create S3 client
s3_client = boto3.client('s3')

# Single file download
files = [S3FileInfo(bucket_name="bucket", key="file.txt", size=1024)]
response = stream_download_from_s3(s3_client, files)

# Multiple files as ZIP archive
files = [
    S3FileInfo(bucket_name="bucket", key="file1.txt", size=1024),
    S3FileInfo(bucket_name="bucket", key="file2.txt", size=2048)
]
response = stream_download_from_s3(s3_client, files, compress="zip")

# File within ZIP archive
files = [S3FileInfo(
    bucket_name="bucket",
    key="archive.zip!internal.txt",
    size=512
)]
response = stream_download_from_s3(s3_client, files)

# Process streaming response
for chunk in response.content:
    # Handle chunk data
    pass
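How a byte range might be cut into range_size chunks can be sketched without any S3 dependency. split_byte_range and range_header are hypothetical helpers used only for illustration; how eodag schedules and orders its chunks internally is not shown here.

```python
def split_byte_range(start: int, end: int, range_size: int = 8 * 1024 * 1024):
    """Yield inclusive (first, last) byte offsets covering start..end."""
    position = start
    while position <= end:
        last = min(position + range_size - 1, end)
        yield position, last
        position = last + 1


def range_header(first: int, last: int) -> str:
    """Format an HTTP Range header value for an inclusive byte range."""
    return f"bytes={first}-{last}"


chunks = list(split_byte_range(0, 9, range_size=4))
headers = [range_header(first, last) for first, last in chunks]
```

Each pair can be fetched independently, which is what allows chunks to be downloaded concurrently and then yielded in offset order.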
xarray#
Warning
These functions will only be available with eodag-cube installed.
Xarray-related utilities
- eodag_cube.utils.xarray.guess_engines(file)[source]#
Guess matching xarray engines for an fsspec OpenFile (fsspec.core.OpenFile)
- eodag_cube.utils.xarray.try_open_dataset(file, **xarray_kwargs)[source]#
Try opening xarray dataset from fsspec OpenFile
- Parameters:
file (OpenFile) – fsspec https OpenFile
xarray_kwargs (Any) – (optional) keyword arguments passed to xarray.open_dataset()
- Return type:
- Returns:
opened xarray dataset
Misc#
Miscellaneous utilities to be used throughout eodag.
Everything that does not fit into one of the specialised categories of utilities in this package should go here
- class eodag.utils.MockResponse(json_data=None, status_code=200, headers=None)[source]#
Fake requests response
- Parameters:
- class eodag.utils.StreamResponse(content, filename=None, size=None, headers=None, media_type=None, status_code=None, arcname=None)[source]#
Represents a streaming response
- Parameters:
- class eodag.utils.StreamResponseContent(content)[source]#
Minimal BytesIO-compatible wrapper, used with boto3.upload_fileobj, which usually expects a BytesIO object rather than an Iterable[bytes]
- static install_signal_handlers()[source]#
Register SIGINT/SIGTERM handlers that interrupt any live stream.
This must be called explicitly from the main thread (typically during a server’s startup) because signal.signal() only works there. It is a no-op if the handlers are already installed or if it is called outside of the main thread.
- Return type:
- Returns:
True if the handlers were installed, False otherwise.
- eodag.utils.cached_parse(str_to_parse: str) JSONPath[source]#
Cached jsonpath_ng.ext.parse()
>>> cached_parse.cache_clear() >>> cached_parse("$.foo") Child(Root(), Fields('foo')) >>> cached_parse.cache_info() CacheInfo(hits=0, misses=1, maxsize=128, currsize=1) >>> cached_parse("$.foo") Child(Root(), Fields('foo')) >>> cached_parse.cache_info() CacheInfo(hits=1, misses=1, maxsize=128, currsize=1) >>> cached_parse("$.bar") Child(Root(), Fields('bar')) >>> cached_parse.cache_info() CacheInfo(hits=1, misses=2, maxsize=128, currsize=2)
- Parameters:
str_to_parse (str) – string to parse as jsonpath_ng.JSONPath
- Return type:
JSONPath
- Returns:
parsed jsonpath_ng.JSONPath
- eodag.utils.cached_yaml_load_all(config_path)[source]#
Cached yaml.load_all()
Load all configurations stored in the configuration file as separate YAML documents
- eodag.utils.cast_scalar_value(value, new_type)[source]#
Convert a scalar (not nested) value type to the given one
>>> cast_scalar_value('1', int) 1 >>> cast_scalar_value(1, str) '1' >>> cast_scalar_value('false', bool) False
- eodag.utils.deepcopy(sth)[source]#
Customized and faster deepcopy inspired by https://stackoverflow.com/a/45858907
_copy_list and _copy_dict dispatchers available for the moment
- eodag.utils.dict_items_recursive_apply(config_dict, apply_method, **apply_method_parameters)[source]#
Recursive apply method to dict elements
>>> dict_items_recursive_apply( ... {"foo": {"bar": "baz"}, "qux": ["a", "b"]}, ... lambda k, v, x: v.upper() + x, **{"x": "!"} ... ) == {'foo': {'bar': 'BAZ!'}, 'qux': ['A!', 'B!']} True
- eodag.utils.dict_items_recursive_sort(config_dict)[source]#
Recursive sort dict elements
>>> dict_items_recursive_sort( ... {"b": {"b": "c", "a": 0}, "a": ["b", {2: 0, 0: 1, 1: 2}]}, ... ) == {"a": ["b", {0: 1, 1: 2, 2: 0}], "b": {"a": 0, "b": "c"}} True
- eodag.utils.dict_md5sum(input_dict)[source]#
Hash nested dictionary
>>> hd = dict_md5sum({"b": {"c": 1, "a": 2, "b": 3}, "a": 4}) >>> hd 'a195bcef1bb3b419e9e74b7cc5db8098' >>> assert(dict_md5sum({"a": 4, "b": {"b": 3, "c": 1, "a": 2}}) == hd)
- eodag.utils.flatten_top_directories(nested_dir_root, common_subdirs_path=None)[source]#
Flatten directory structure, removing common empty sub-directories
- eodag.utils.format_dict_items(config_dict, **format_variables)[source]#
Recursively apply str.format() to **format_variables on config_dict values
>>> format_dict_items( ... {"foo": {"bar": "{a}"}, "baz": ["{b}?", "{b}!"]}, ... **{"a": "qux", "b": "quux"}, ... ) == {"foo": {"bar": "qux"}, "baz": ["quux?", "quux!"]} True
- eodag.utils.format_pydantic_error(e)[source]#
Format Pydantic ValidationError
- Parameters:
e (PydanticValidationError) – A Pydantic ValidationError object
- Return type:
- eodag.utils.format_string(key, str_to_format, **format_variables)[source]#
Format "{foo}"-like string
>>> format_string(None, "foo {bar}, {baz} ?", **{"bar": "qux", "baz": "quux"}) 'foo qux, quux ?'
- eodag.utils.get_bucket_name_and_prefix(url, bucket_path_level=None)[source]#
Extract bucket name and prefix from URL
- eodag.utils.get_collection_dates(collection_dict)[source]#
Extract mission start and end dates from collection configuration.
Extracts dates from the extent.temporal.interval structure.
- Parameters:
collection_dict (dict[str, Any]) – Collection configuration dictionary
- Return type:
- Returns:
Tuple of (mission_start_date, mission_end_date) as ISO strings or None
Example:
>>> get_collection_dates({ ... "extent": {"temporal": {"interval": [["2017-10-13T00:00:00Z", "2023-12-31T23:59:59Z"]]}} ... }) ('2017-10-13T00:00:00Z', '2023-12-31T23:59:59Z')
>>> get_collection_dates({ ... "extent": {"temporal": {"interval": [["2017-10-13T00:00:00Z", None]]}} ... }) ('2017-10-13T00:00:00Z', None)
>>> get_collection_dates({}) (None, None)
- eodag.utils.get_geometry_from_ecmwf_area(area)[source]#
Creates a shapely.geometry from bounding box in area format.
area format: [max_lat,min_lon,min_lat,max_lon] or "max_lat/min_lon/min_lat/max_lon"
- eodag.utils.get_geometry_from_ecmwf_feature(geom)[source]#
Creates a shapely.geometry from an ECMWF Polytope feature.
- Supported ECMWF feature types:
polygon: returns a Polygon
boundingbox: returns a Polygon via box()
position, timeseries, verticalprofile: return a Point
trajectory: returns a LineString
circle: no direct Shapely equivalent, returns None and lets default geometry take over
- eodag.utils.get_geometry_from_ecmwf_location(location)[source]#
Creates a shapely.geometry from a single location.
location format: {"latitude": float, "longitude": float}
- eodag.utils.get_geometry_from_various(locations_config=[], **query_args)[source]#
Creates a shapely.geometry using the given query keyword arguments
- Parameters:
- Return type:
BaseGeometry | None
- Returns:
shapely Geometry found
- Raises:
shapely.errors.ShapelyError – Error while creating shapely geometry
TypeError – Unexpected geometry type
ValueError – Location name is wrong or its value does not match
- eodag.utils.get_ssl_context(ssl_verify: bool) SSLContext[source]#
Returns an SSL context based on the ssl_verify argument.
- Parameters:
ssl_verify (bool) – ssl_verify parameter
- Return type:
- Returns:
An SSL context object.
- eodag.utils.guess_extension(type)[source]#
Guess extension from mime type, using eodag extended mimetypes definition
>>> guess_extension('image/tiff') '.tiff' >>> guess_extension('application/x-grib') '.grib'
- eodag.utils.guess_file_type(file)[source]#
Guess the mime type of a file or URL based on its extension, using eodag extended mimetypes definition
>>> guess_file_type('foo.tiff') 'image/tiff' >>> guess_file_type('foo.grib') 'application/x-grib'
- eodag.utils.items_recursive_apply(input_obj, apply_method, **apply_method_parameters)[source]#
Recursive apply method to items contained in input object (dict or list)
>>> items_recursive_apply( ... {"foo": {"bar":"baz"}, "qux": ["a","b"]}, ... lambda k,v,x: v.upper()+x, **{"x":"!"} ... ) == {'foo': {'bar': 'BAZ!'}, 'qux': ['A!', 'B!']} True >>> items_recursive_apply( ... [{"foo": {"bar":"baz"}}, "qux"], ... lambda k,v,x: v.upper()+x, ... **{"x":"!"}) [{'foo': {'bar': 'BAZ!'}}, 'QUX!'] >>> items_recursive_apply( ... "foo", ... lambda k,v,x: v.upper()+x, ... **{"x":"!"}) 'foo'
- eodag.utils.items_recursive_sort(input_obj)[source]#
Recursive sort dict items contained in input object (dict or list)
>>> items_recursive_sort( ... {"b": {"b": "c", "a": 0}, "a": ["b", {2: 0, 0: 1, 1: 2}]}, ... ) == {"a": ["b", {0: 1, 1: 2, 2: 0}], "b": {"a": 0, "b": "c"}} True >>> items_recursive_sort(["b", {2: 0, 0: 1, 1:2}]) ['b', {0: 1, 1: 2, 2: 0}] >>> items_recursive_sort("foo") 'foo'
- eodag.utils.jsonpath_parse_dict_items(jsonpath_dict, values_dict)[source]#
Recursively parse jsonpath_ng.JSONPath elements in dict
>>> import jsonpath_ng.ext as jsonpath >>> from jsonpath_ng.ext import parse >>> jsonpath_parse_dict_items( ... {"foo": {"bar": parse("$.a.b")}, "qux": [parse("$.c"), parse("$.c")]}, ... {"a":{"b":"baz"}, "c":"quux"} ... ) == {'foo': {'bar': 'baz'}, 'qux': ['quux', 'quux']} True
- eodag.utils.list_items_recursive_apply(config_list, apply_method, **apply_method_parameters)[source]#
Recursive apply method to list elements
>>> list_items_recursive_apply( ... [{"foo": {"bar": "baz"}}, "qux"], ... lambda k, v, x: v.upper() + x, ... **{"x": "!"}) [{'foo': {'bar': 'BAZ!'}}, 'QUX!']
- eodag.utils.list_items_recursive_sort(config_list)[source]#
Recursive sort dicts in list elements
>>> list_items_recursive_sort(["b", {2: 0, 0: 1, 1: 2}]) ['b', {0: 1, 1: 2, 2: 0}]
- eodag.utils.maybe_generator(obj)[source]#
Generator function that takes an arbitrary object and yields values from it if the object is a generator.
- eodag.utils.md5sum(file_path)[source]#
Get file MD5 checksum
>>> import os >>> md5sum(os.devnull) 'd41d8cd98f00b204e9800998ecf8427e'
- eodag.utils.merge_mappings(mapping1, mapping2)[source]#
Merge two mappings with string keys, values from mapping2 overriding values from mapping1.
Does its best to detect the key in mapping1 to override. For example:
>>> mapping2 = {"ext_keya": "new"} >>> mapping1 = {"ext:keyA": "obsolete"} >>> merge_mappings(mapping1, mapping2) >>> mapping1 {'ext:keyA': 'new'}
If mapping2 has a key that cannot be detected in mapping1, this new key is added to mapping1 as is.
- eodag.utils.mutate_dict_in_place(func, mapping)[source]#
Apply func to values of mapping.
The mapping object’s values are modified in-place. The function is recursive, so values of nested dicts that are themselves values of mapping are modified as well.
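The in-place, recursive behaviour can be pictured with the sketch below. It is an assumption-based illustration, not eodag's code: `mutate_dict_in_place_sketch` is a hypothetical name, and only plain `dict` values are treated as nested mappings here.

```python
def mutate_dict_in_place_sketch(func, mapping):
    """Apply func to every non-dict value of mapping, recursing into dicts."""
    for key, value in mapping.items():
        if isinstance(value, dict):
            # Nested dict: recurse instead of passing the dict to func.
            mutate_dict_in_place_sketch(func, value)
        else:
            # Leaf value: overwrite it in the original mapping.
            mapping[key] = func(value)


config = {"a": "x", "nested": {"b": "y"}}
result = mutate_dict_in_place_sketch(str.upper, config)
```

Because the mapping is changed in place, the sketch returns nothing; the caller keeps using the object it passed in.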
- eodag.utils.nested_pairs2dict(pairs)[source]#
Create a dict using nested pairs
>>> nested_pairs2dict([["foo", [["bar", "baz"]]]])
{'foo': {'bar': 'baz'}}
- eodag.utils.obj_md5sum(data)[source]#
Get MD5 checksum from JSON serializable object
>>> obj_md5sum(None)
'37a6259cc0c1dae299a7866489dff0bd'
- eodag.utils.parse_header(header)[source]#
Parse HTTP header
>>> parse_header(
...     'Content-Disposition: form-data; name="field2"; filename="example.txt"'
... ).get_param("filename")
'example.txt'
- eodag.utils.parse_jsonpath(key, jsonpath_obj, **values_dict)[source]#
Parse JSONPath in jsonpath_obj using values_dict
>>> import jsonpath_ng.ext as jsonpath
>>> from jsonpath_ng.ext import parse
>>> parse_jsonpath(None, parse("$.foo.bar"), **{"foo": {"bar": "baz"}})
'baz'
- eodag.utils.parse_le_uint16(data)[source]#
Parse little-endian unsigned 2-byte integer.
>>> parse_le_uint16(b'\x01\x00')
1
>>> parse_le_uint16(b'\xff\xff')
65535
- eodag.utils.parse_le_uint32(data)[source]#
Parse little-endian unsigned 4-byte integer.
>>> parse_le_uint32(b'\x01\x00\x00\x00')
1
>>> parse_le_uint32(b'\xff\xff\xff\xff')
4294967295
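For readers unfamiliar with little-endian parsing, the two helpers above behave like the standard-library `struct` calls in this sketch. The `*_sketch` names and the sample `header` bytes are hypothetical; how eodag implements its own functions is not stated here.

```python
import struct


def parse_le_uint16_sketch(data: bytes) -> int:
    # "<H": little-endian, unsigned 2-byte integer
    return struct.unpack("<H", data)[0]


def parse_le_uint32_sketch(data: bytes) -> int:
    # "<I": little-endian, unsigned 4-byte integer
    return struct.unpack("<I", data)[0]


# Hypothetical 6-byte header: a 2-byte field followed by a 4-byte field.
header = b"\x34\x12\x78\x56\x34\x12"
first_field = parse_le_uint16_sketch(header[:2])
second_field = parse_le_uint32_sketch(header[2:6])
```

In little-endian order the least significant byte comes first, so the bytes `34 12` decode to `0x1234`.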
- eodag.utils.remove_str_array_quotes(input_str)[source]#
Remove quotes around arrays to avoid json parsing errors
- Parameters:
input_str (str) – string to format
- Return type:
- Returns:
string without quotes surrounding array brackets
>>> remove_str_array_quotes('"a":"["a", "b"]"')
'"a":["a", "b"]'
>>> remove_str_array_quotes('{"a":"["a", "b"]", "b": ["c", "d"]}')
'{"a":["a", "b"], "b": ["c", "d"]}'
- eodag.utils.rename_subfolder(dirpath, name)[source]#
Rename first subfolder found in dirpath with given name, raise RuntimeError if no subfolder can be found
- Parameters:
- Raises:
- Return type:
Example:
>>> import os
>>> import tempfile
>>> with tempfile.TemporaryDirectory() as tmpdir:
...     somefolder = os.path.join(tmpdir, "somefolder")
...     otherfolder = os.path.join(tmpdir, "otherfolder")
...     os.makedirs(somefolder)
...     assert os.path.isdir(somefolder) and not os.path.isdir(otherfolder)
...     rename_subfolder(tmpdir, "otherfolder")
...     assert not os.path.isdir(somefolder) and os.path.isdir(otherfolder)
Before:
$ tree <tmp-folder>
<tmp-folder>
└── somefolder
    └── somefile
After:
$ tree <tmp-folder>
<tmp-folder>
└── otherfolder
    └── somefile
- eodag.utils.rename_with_version(file_path, suffix='old')[source]#
Renames a file by appending and incrementing a version number if a conflict exists.
- Parameters:
- Return type:
- Returns:
new file path with the version appended or incremented
Example:
>>> import tempfile
>>> from pathlib import Path
>>> with tempfile.TemporaryDirectory() as tmpdir:
...     file_path = (Path(tmpdir) / "foo.txt")
...     file_path.touch()
...     (Path(tmpdir) / "foo_old1.txt").touch()
...     expected = str(Path(tmpdir) / "foo_old2.txt")
...     assert expected == rename_with_version(str(file_path))
- eodag.utils.sanitize(value)[source]#
Sanitize string to be used as a name of a directory.
>>> sanitize('productName')
'productName'
>>> sanitize('name with multiple spaces')
'name_with_multiple_spaces'
>>> sanitize('âtre fête île alcôve bûche çà génèse où Noël ovoïde capharnaüm')
'atre_fete_ile_alcove_buche_ca_genese_ou_Noel_ovoide_capharnaum'
>>> sanitize('replace,ponctuation:;signs!?byunderscorekeeping-hyphen.dot_and_underscore') # noqa
'replace_ponctuation_signs_byunderscorekeeping-hyphen.dot_and_underscore'
- eodag.utils.slugify(value, allow_unicode=False)[source]#
Copied from Django Source code, only modifying last line (no need for safe strings).
source: django/django
Convert to ASCII if allow_unicode is False. Convert spaces to hyphens. Remove characters that aren’t alphanumerics, underscores, or hyphens. Convert to lowercase. Also strip leading and trailing whitespace.
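The steps listed above can be sketched with the standard library alone. This is an illustration of the described transformation under stated assumptions, not a copy of the eodag or Django source: `slugify_sketch` is a hypothetical name, and the exact order of the regular-expression steps is assumed.

```python
import re
import unicodedata


def slugify_sketch(value, allow_unicode=False):
    """Turn a string into a lowercase, hyphen-separated slug."""
    value = str(value)
    if allow_unicode:
        value = unicodedata.normalize("NFKC", value)
    else:
        # Decompose accented characters, then drop anything non-ASCII.
        value = (
            unicodedata.normalize("NFKD", value)
            .encode("ascii", "ignore")
            .decode("ascii")
        )
    # Keep only alphanumerics, underscores, whitespace and hyphens.
    value = re.sub(r"[^\w\s-]", "", value).strip().lower()
    # Collapse runs of whitespace and hyphens into a single hyphen.
    return re.sub(r"[-\s]+", "-", value)


slug = slugify_sketch("  Héllo, Wörld!  ")
```

Here the accents are stripped, the punctuation is removed, and the two words end up joined by one hyphen.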
- eodag.utils.sort_dict(input_dict)[source]#
Recursively sorts a dict by keys.
- Parameters:
- Return type:
- Returns:
sorted dict
>>> sort_dict({"b": {"c": 1, "a": 2, "b": 3}, "a": 4})
{'a': 4, 'b': {'a': 2, 'b': 3, 'c': 1}}
- eodag.utils.string_to_jsonpath(*args, force=False)[source]#
Get jsonpath_ng.JSONPath for $.foo.bar like string
>>> from jsonpath_ng.jsonpath import Child, Fields, Index, Root, Slice
>>> string_to_jsonpath(None, "$.foo.bar")
Child(Child(Root(), Fields('foo')), Fields('bar'))
>>> string_to_jsonpath("$.foo.bar")
Child(Child(Root(), Fields('foo')), Fields('bar'))
>>> string_to_jsonpath("foo")
'foo'
>>> string_to_jsonpath("foo", force=True)
Fields('foo')
>>> string_to_jsonpath('$.foo[0][*]') == Child(
...     Child(Child(Root(), Fields('foo')), Index(0)),
...     Slice(start=None, end=None, step=None),
... )
True
- eodag.utils.strip_accents(s)[source]#
Strip accents of a string.
>>> strip_accents('productName')
'productName'
>>> strip_accents('génèse')
'genese'
>>> strip_accents('preserve-punct-special-chars:;,?!§%$£œ')
'preserve-punct-special-chars:;,?!§%$£œ'
- eodag.utils.update_nested_dict(old_dict, new_dict, extend_list_values=False, allow_empty_values=False, allow_extend_duplicates=True)[source]#
Update recursively old_dict items with new_dict ones
>>> update_nested_dict(
...     {"a": {"a.a": 1, "a.b": 2}, "b": 3},
...     {"a": {"a.a": 10}}
... ) == {'a': {'a.a': 10, 'a.b': 2}, 'b': 3}
True
>>> update_nested_dict(
...     {"a": {"a.a": [1, 2]}},
...     {"a": {"a.a": [10, 2]}},
...     extend_list_values=True,
...     allow_extend_duplicates=True
... ) == {'a': {'a.a': [1, 2, 10, 2]}}
True
>>> update_nested_dict(
...     {"a": {"a.a": [1, 2]}},
...     {"a": {"a.a": [10, 2]}},
...     extend_list_values=True,
...     allow_extend_duplicates=False
... ) == {'a': {'a.a': [1, 2, 10]}}
True
>>> update_nested_dict(
...     {"a": {"a.a": 1, "a.b": 2}, "b": 3},
...     {"a": {"a.a": None}},
... ) == {'a': {'a.a': 1, 'a.b': 2}, 'b': 3}
True
>>> update_nested_dict(
...     {"a": {"a.a": 1, "a.b": 2}, "b": 3},
...     {"a": {"a.a": None}},
...     allow_empty_values=True
... ) == {'a': {'a.a': None, 'a.b': 2}, 'b': 3}
True
Constants: Core#
- eodag.utils.GENERIC_COLLECTION = 'GENERIC_COLLECTION'#
Used as template for unknown/custom collection usage
- eodag.utils.GENERIC_STAC_PROVIDER = 'generic_stac_provider'#
Generic provider used in import_stac_items() if no existing provider can be used
- eodag.utils.STAC_SEARCH_PLUGINS = ['GeodesSearch', 'StacSearch', 'StacListAssets', 'StaticStacSearch']#
List of known STAC search plugins. Required to complete plugin configuration with STAC plugins specific features.
- eodag.utils.STAC_VERSION = '1.1.0'#
STAC version currently used by EODAG
Constants: HTTP requests#
- eodag.utils.USER_AGENT = {'User-Agent': 'eodag/4.4.0'}#
EODAG user agent used in HTTP requests
- eodag.utils.HTTP_REQ_TIMEOUT = 5#
default timeout for HTTP requests (in seconds)
- eodag.utils.DEFAULT_SEARCH_TIMEOUT = 20#
default timeout for search requests (in seconds)
- eodag.utils.DEFAULT_STREAM_REQUESTS_TIMEOUT = 60#
default timeout for stream requests (in seconds)
- eodag.utils.REQ_RETRY_TOTAL = 3#
default count for HTTP requests retry strategy
- eodag.utils.REQ_RETRY_BACKOFF_FACTOR = 2#
default backoff factor for HTTP requests retry strategy
- eodag.utils.REQ_RETRY_STATUS_FORCELIST = [401, 429, 500, 502, 503, 504]#
default status codes for which HTTP requests retry strategy is applied
- eodag.utils.DEFAULT_DOWNLOAD_WAIT = 0.2#
default wait time (in minutes) between download attempts
- eodag.utils.DEFAULT_DOWNLOAD_TIMEOUT = 10#
default timeout (in minutes) for download attempts
- eodag.utils.DEFAULT_TOKEN_EXPIRATION_MARGIN = 60#
default token expiration margin (in seconds). Safety buffer to prevent token rejection from unexpected expiry between validity check and request. Default value of token_expiration_margin
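The idea of an expiration margin can be illustrated as follows. This is a hypothetical sketch of how such a safety buffer is typically applied; `token_needs_refresh` and its arguments are illustrative names, and the way eodag's authentication plugins actually use the margin is not described here.

```python
from datetime import datetime, timedelta, timezone

# Value copied from the constant documented above (seconds).
DEFAULT_TOKEN_EXPIRATION_MARGIN = 60


def token_needs_refresh(expires_at, now, margin=DEFAULT_TOKEN_EXPIRATION_MARGIN):
    """Treat a token as expired `margin` seconds before its real expiry."""
    return now >= expires_at - timedelta(seconds=margin)


now = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
expires_soon = now + timedelta(seconds=30)    # inside the 60 s margin
expires_later = now + timedelta(seconds=300)  # well outside the margin
```

A token that is still technically valid for 30 seconds is refreshed anyway, so it cannot expire between the validity check and the request that uses it.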
Constants: Pagination#
- eodag.utils.DEFAULT_PAGE = 1#
pagination default starting page number
- eodag.utils.DEFAULT_MAX_LIMIT = 50#
Default maximum number of items per page requested by search_all(). 50 instead of 20 (DEFAULT_LIMIT) to increase it to the known and current minimum value (mundi provider)
- eodag.utils.KNOWN_NEXT_PAGE_TOKEN_KEYS = ['token', 'next', 'page', 'skip']#
known next page token keys used to guess key in STAC providers next link responses
Constants: Metadata-mapping / default values#
- eodag.utils.DEFAULT_PROJ = 'EPSG:4326'#
default projection used in metadata-mapping converters and CSWSearch
- eodag.utils.DEFAULT_MISSION_START_DATE = '2015-01-01T00:00:00.000Z'#
default collections start date
- eodag.utils.DEFAULT_SHAPELY_GEOMETRY = <POLYGON ((180 -90, 180 90, -180 90, -180 -90, 180 -90))>#
A geometry type representing an area that is enclosed by a linear ring.
A polygon is a two-dimensional feature and has a non-zero area. It may have one or more negative-space “holes” which are also bounded by linear rings. If any rings cross each other, the feature is invalid and operations on it may fail.
Parameters#
- shell : sequence
A sequence of (x, y [,z]) numeric coordinate pairs or triples, or an array-like with shape (N, 2) or (N, 3). Also can be a sequence of Point objects.
- holes : sequence
A sequence of objects which satisfy the same requirements as the shell parameters above
Attributes#
- exterior : LinearRing
The ring which bounds the positive space of the polygon.
- interiors : sequence
A sequence of rings which bound all existing holes.
Examples#
Create a square polygon with no holes
>>> from shapely import Polygon
>>> coords = ((0., 0.), (0., 1.), (1., 1.), (1., 0.), (0., 0.))
>>> polygon = Polygon(coords)
>>> polygon.area
1.0
- eodag.utils.ONLINE_STATUS = 'succeeded'#
Online status value for order:status property
Constants: Metadata-mapping / JSONPath regex#
- eodag.utils.JSONPATH_MATCH = re.compile('^[\\{\\(]*\\$(\\..*)*$')#
regex to detect if a string is a JSONPath expression, used in eodag.utils.string_to_jsonpath()
- eodag.utils.WORKABLE_JSONPATH_MATCH = re.compile('^\\$(\\.[a-zA-Z0-9-_:\\.\\[\\]\\"\\(\\)=\\?\\*]+)*$')#
regex to detect if a string is a simple/workable JSONPath expression, that can be parsed using a simpler, optimized approach. Used in eodag.utils.string_to_jsonpath()
- eodag.utils.ARRAY_FIELD_MATCH = re.compile('^[a-zA-Z0-9-_:]+(\\[[0-9\\*]+\\])+$')#
regex to detect if a string is a JSONPath array field, used in eodag.utils.string_to_jsonpath()
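To see what each of the three patterns accepts, the sketch below recompiles them locally with the standard `re` module and applies them to a few sample strings. The patterns are copied from the values shown above; the `classify` helper and the sample expressions are hypothetical and only illustrate the regexes, not the logic of `string_to_jsonpath()`.

```python
import re

# Patterns copied from the documented constants (written as raw strings).
JSONPATH_MATCH = re.compile(r'^[\{\(]*\$(\..*)*$')
WORKABLE_JSONPATH_MATCH = re.compile(
    r'^\$(\.[a-zA-Z0-9-_:\.\[\]\"\(\)=\?\*]+)*$'
)
ARRAY_FIELD_MATCH = re.compile(r'^[a-zA-Z0-9-_:]+(\[[0-9\*]+\])+$')


def classify(expression):
    """Report which of the three patterns match the given string."""
    return {
        "jsonpath": bool(JSONPATH_MATCH.match(expression)),
        "workable": bool(WORKABLE_JSONPATH_MATCH.match(expression)),
        "array_field": bool(ARRAY_FIELD_MATCH.match(expression)),
    }


simple_path = classify("$.foo.bar")
filter_path = classify("$.foo[?(@.bar == 1)]")
array_field = classify("foo[0][*]")
plain_string = classify("foo")
```

A plain dotted path matches both JSONPath patterns, while an expression containing characters outside the "workable" character class (here `@` and spaces) only matches the general one. An array field such as `foo[0][*]` matches neither JSONPath pattern because it does not start with `$`.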