Source code for eodag.utils.streamresponse

import os
import signal
import threading
import weakref
from dataclasses import dataclass, field
from typing import Iterable, Iterator, Mapping, Optional, Union


class StreamResponseContent(Iterable[bytes]):
    """Minimal ``BytesIO``-like wrapper around an iterable of byte chunks.

    Used by ``boto3.upload_fileobj``, which usually expects a ``BytesIO``
    object rather than an ``Iterable[bytes]``. Only :meth:`read` is provided
    on top of plain iteration.

    Every live instance is tracked in a class-level weak registry so that the
    handlers installed by :meth:`install_signal_handlers` can interrupt all
    streams that are still in use. The registry holds weak references only, so
    a stream is released as soon as its owner drops it.
    """

    __initialized: bool = False
    # Weak references only: a strong list here would keep every stream (with
    # its buffer and source iterator) alive for the lifetime of the process.
    __instances: "weakref.WeakSet[StreamResponseContent]" = weakref.WeakSet()

    @staticmethod
    def install_signal_handlers() -> bool:
        """Register SIGINT/SIGTERM handlers that interrupt any live stream.

        This must be called explicitly from the main thread (typically during
        a server's startup) because :func:`signal.signal` only works there.
        It is a no-op if the handlers are already installed or if it is called
        outside of the main thread.

        Handlers previously registered for SIGINT and SIGTERM are replaced,
        not chained.

        :returns: ``True`` if the handlers were installed, ``False`` otherwise.
        """
        if StreamResponseContent.__initialized:
            return False
        if threading.current_thread() is not threading.main_thread():
            return False
        StreamResponseContent.__initialized = True

        def signal_handler(sig, frame):
            # Swap in a fresh registry *before* walking the old one, so that
            # streams created while the handler runs register in the new set
            # instead of mutating the one being iterated.
            streams = StreamResponseContent.__instances
            StreamResponseContent.__instances = weakref.WeakSet()
            for stream in list(streams):
                stream.interrupt()

        signal.signal(signal.SIGINT, signal_handler)
        signal.signal(signal.SIGTERM, signal_handler)
        return True

    def __init__(self, content: Union[Iterable[bytes], bytes]):
        """Wrap ``content`` and register the new stream for interruption.

        :param content: An iterable of byte chunks, or a single bytes-like
                        object (``bytes``, ``bytearray``, ``memoryview``) that
                        is treated as one chunk.
        """
        self.buffer: bytes = b""
        self.interrupted: bool = False
        StreamResponseContent.__instances.add(self)
        if isinstance(content, (bytes, bytearray, memoryview)):
            # Iterating a bytes-like object yields integers, not chunks, so
            # wrap it as a single chunk instead.
            content = [bytes(content)]
        self.iterator: Iterator[bytes] = iter(content)

    def __iter__(self) -> Iterator[bytes]:
        """Return the underlying chunk iterator.

        Bytes already pulled into the internal buffer by :meth:`read` are not
        replayed by this iterator.
        """
        return self.iterator

    def interrupt(self):
        """Flag the stream as interrupted; later :meth:`read` calls raise."""
        self.interrupted = True

    def read(self, size: Optional[int] = -1) -> bytes:
        """Read up to ``size`` bytes from the stream.

        :param size: Maximum number of bytes to return. A negative value or
                     ``None`` reads until the source is exhausted.
        :returns: The bytes read; ``b""`` once the source is exhausted.
        :raises InterruptedError: If the stream has been interrupted.
        """
        read_all = size is None or size < 0

        # Collect chunks and join once: re-concatenating the buffer for every
        # chunk copies it again and again (quadratic in the number of chunks).
        chunks = [self.buffer]
        buffered = len(self.buffer)
        try:
            while not self.interrupted and (read_all or buffered < size):
                try:
                    chunk = bytes(next(self.iterator))
                except StopIteration:
                    break
                chunks.append(chunk)
                buffered += len(chunk)
        finally:
            # Keep everything already pulled from the iterator, even if it
            # raised part-way through.
            self.buffer = b"".join(chunks)

        if self.interrupted:
            raise InterruptedError()

        if read_all:
            result, self.buffer = self.buffer, b""
        else:
            result, self.buffer = self.buffer[:size], self.buffer[size:]
        return result
@dataclass
class StreamResponse:
    """Represents a streaming response.

    The ``filename`` and ``size`` properties are kept in sync with the
    ``Content-Disposition`` and ``Content-Length`` entries of ``headers``:
    assigning a value writes the header, assigning ``None`` removes it.
    """

    content: "StreamResponseContent"
    _filename: Optional[str] = field(default=None, repr=False, init=False)
    _size: Optional[int] = field(default=None, repr=False, init=False)
    headers: dict[str, str] = field(default_factory=dict)
    media_type: Optional[str] = None
    status_code: Optional[int] = None
    arcname: Optional[str] = None

    def __init__(
        self,
        content: Union[Iterable[bytes], bytes],
        filename: Optional[str] = None,
        size: Optional[int] = None,
        headers: Optional[Mapping[str, str]] = None,
        media_type: Optional[str] = None,
        status_code: Optional[int] = None,
        arcname: Optional[str] = None,
    ):
        """Build a streaming response.

        :param content: The payload, as an iterable of byte chunks or a single
                        ``bytes`` object; it is wrapped in a
                        ``StreamResponseContent``
        :param filename: Optional filename, reflected in ``Content-Disposition``
        :param size: Optional size in bytes, reflected in ``Content-Length``
        :param headers: Optional initial headers; they are copied, so the
                        caller's mapping is never modified
        :param media_type: Optional media type of the content
        :param status_code: Optional HTTP status code
        :param arcname: Optional archive name, stored as given
        """
        self.content = StreamResponseContent(content)
        self.headers = dict(headers) if headers else {}
        self.media_type = media_type
        self.status_code = status_code
        self.arcname = arcname
        # use property setters to update headers
        self.filename = filename
        self.size = size

    @staticmethod
    def _header_safe(name: str) -> str:
        """Return ``name`` made safe for use inside a quoted header parameter.

        Control characters (including CR/LF) are dropped so the value cannot
        terminate the header line, and backslashes and double quotes are
        backslash-escaped so the value cannot close the quoted string early.
        """
        printable = "".join(ch for ch in name if ch >= " " and ch != "\x7f")
        return printable.replace("\\", "\\\\").replace('"', '\\"')

    # filename handling
    @property
    def filename(self) -> Optional[str]:
        """Get the filename for the streaming response.

        :returns: The filename, or None if not set
        """
        return self._filename

    @filename.setter
    def filename(self, value: Optional[str]) -> None:
        """Set the filename and update the Content-Disposition header accordingly.

        Only the base name of ``value`` is written to the header, after being
        sanitized for use in a quoted string; the stored filename itself is
        kept exactly as given.

        :param value: The filename to set, or None to clear it
        """
        self._filename = value
        if value:
            outputs_filename = self._header_safe(os.path.basename(value))
            self.headers[
                "Content-Disposition"
            ] = f'attachment; filename="{outputs_filename}"'
        else:
            self.headers.pop("Content-Disposition", None)

    # size handling
    @property
    def size(self) -> Optional[int]:
        """Get the content size for the streaming response.

        :returns: The content size in bytes, or None if not set
        """
        return self._size

    @size.setter
    def size(self, value: Optional[int]) -> None:
        """Set the content size and update the Content-Length header accordingly.

        :param value: The content size in bytes, or None to clear it
        """
        self._size = value
        if value is not None:
            # ``0`` is a valid size, hence the explicit ``None`` check.
            self.headers["Content-Length"] = str(value)
        else:
            self.headers.pop("Content-Length", None)
# Public API: a star-import of this module exposes only ``StreamResponse``.
__all__ = ["StreamResponse"]