# Source code for paperap.signals

"""



----------------------------------------------------------------------------

METADATA:

File:    signals.py
Project: paperap
Created: 2025-03-09
Version: 0.0.9
Author:  Jess Mann
Email:   jess@jmann.me
Copyright (c) 2025 Jess Mann

----------------------------------------------------------------------------

LAST MODIFIED:

2025-03-09     By Jess Mann

"""

from __future__ import annotations

import logging
from collections import defaultdict
from typing import (
    Any,
    Callable,
    Generic,
    Literal,
    Self,
    TypeAlias,
    TypedDict,
    TypeVar,
    final,
    overload,
)

logger = logging.getLogger(__name__)


class QueueType(TypedDict):
    """
    Shape of the pending-action queue kept by SignalRegistry.

    Each key is an action name. Its value maps a signal name to the set of
    handlers waiting on that signal. Entries under ``connect`` are
    ``(handler, priority)`` tuples; the other three actions store bare
    handlers.
    """

    connect: dict[str, set[tuple[Callable[..., Any], int]]]
    disconnect: dict[str, set[Callable[..., Any]]]
    disable: dict[str, set[Callable[..., Any]]]
    enable: dict[str, set[Callable[..., Any]]]
ActionType = Literal["connect", "disconnect", "disable", "enable"]
@final
class SignalPriority:
    """
    Recommended priority levels for signal handlers.

    Any int can be supplied as a priority; these constants are only the
    suggested anchor points, spaced so that custom values fit in between.
    """

    FIRST = 0
    HIGH = 25
    NORMAL = 50
    LOW = 75
    LAST = 100
class SignalParams(TypedDict):
    """
    Name/description pair describing a signal.

    NOTE(review): originally documented as "used by SignalRegistry for storing
    signal parameters"; no use of this type is visible in this module, so
    confirm against callers before relying on that.
    """

    name: str
    description: str
[docs] class Signal[_ReturnType]: """ A signal that can be connected to and emitted. Handlers can be registered with a priority to control execution order. Each handler receives the output of the previous handler as its first argument, enabling a filter/transformation chain. """ name: str description: str _handlers: dict[int, list[Callable[..., _ReturnType]]] _disabled_handlers: set[Callable[..., _ReturnType]]
[docs] def __init__(self, name: str, description: str = "") -> None: self.name = name self.description = description self._handlers = defaultdict(list) self._disabled_handlers = set() super().__init__()
[docs] def connect(self, handler: Callable[..., _ReturnType], priority: int = SignalPriority.NORMAL) -> None: """ Connect a handler to this signal. Args: handler: The handler function to be called when the signal is emitted. priority: The priority level for this handler (lower numbers execute first). """ self._handlers[priority].append(handler) # Check if the handler was temporarily disabled in the registry if SignalRegistry.get_instance().is_queued("disable", self.name, handler): self._disabled_handlers.add(handler)
[docs] def disconnect(self, handler: Callable[..., _ReturnType]) -> None: """ Disconnect a handler from this signal. Args: handler: The handler to disconnect. """ for priority in self._handlers: if handler in self._handlers[priority]: self._handlers[priority].remove(handler)
@overload def emit(self, value: _ReturnType | None, *args: Any, **kwargs: Any) -> _ReturnType | None: ... @overload def emit(self, **kwargs: Any) -> _ReturnType | None: ...
[docs] def emit(self, *args: Any, **kwargs: Any) -> _ReturnType | None: """ Emit the signal, calling all connected handlers in priority order. Each handler receives the output of the previous handler as its first argument. Other arguments are passed unchanged. Args: *args: Positional arguments to pass to handlers. **kwargs: Keyword arguments to pass to handlers. Returns: The final result after all handlers have processed the data. """ current_value: _ReturnType | None = None remaining_args = args if args: # Start with the first argument as the initial value current_value = args[0] remaining_args = args[1:] # Get all priorities in ascending order (lower numbers execute first) priorities = sorted(self._handlers.keys()) # Process handlers in priority order for priority in priorities: for handler in self._handlers[priority]: if handler not in self._disabled_handlers: # Pass the current value as the first argument, along with any other args current_value = handler(current_value, *remaining_args, **kwargs) return current_value
[docs] def disable(self, handler: Callable[..., _ReturnType]) -> None: """ Temporarily disable a handler without disconnecting it. Args: handler: The handler to disable. """ self._disabled_handlers.add(handler)
[docs] def enable(self, handler: Callable[..., _ReturnType]) -> None: """ Re-enable a temporarily disabled handler. Args: handler: The handler to enable. """ if handler in self._disabled_handlers: self._disabled_handlers.remove(handler)
[docs] class SignalRegistry: """ Registry of all signals in the application. Signals can be created, connected to, and emitted through the registry. Examples: >>> SignalRegistry.emit( ... "document.save:success", ... "Fired when a document has been saved successfully", ... kwargs = {"document": document} ... ) >>> filtered_data = SignalRegistry.emit( ... "document.save:before", ... "Fired before a document is saved. Optionally filters the data that will be saved.", ... args = (data,), ... kwargs = {"document": document} ... ) >>> SignalRegistry.connect("document.save:success", my_handler) """ _instance: Self _signals: dict[str, Signal[Any]] _queue: QueueType
[docs] def __init__(self) -> None: self._signals = {} self._queue = { "connect": {}, # {signal_name: {(handler, priority), ...}} "disconnect": {}, # {signal_name: {handler, ...}} "disable": {}, # {signal_name: {handler, ...}} "enable": {}, # {signal_name: {handler, ...}} } super().__init__()
[docs] def __new__(cls) -> Self: """ Ensure that only one instance of the class is created. Returns: The singleton instance of this class. """ if not hasattr(cls, "_instance"): cls._instance = super().__new__(cls) return cls._instance
[docs] @classmethod def get_instance(cls) -> Self: """ Get the singleton instance of this class. Returns: The singleton instance of this class. """ if not hasattr(cls, "_instance"): cls._instance = cls() return cls._instance # type: ignore # mypy issue with Self return type
[docs] def register(self, signal: Signal[Any]) -> None: """ Register a signal and process queued actions. Args: signal: The signal to register. """ self._signals[signal.name] = signal # Process queued connections for handler, priority in self._queue["connect"].pop(signal.name, set()): signal.connect(handler, priority) # Process queued disconnections for handler in self._queue["disconnect"].pop(signal.name, set()): signal.disconnect(handler) # Process queued disables for handler in self._queue["disable"].pop(signal.name, set()): signal.disable(handler) # Process queued enables for handler in self._queue["enable"].pop(signal.name, set()): signal.enable(handler)
[docs] def queue_action(self, action: ActionType, name: str, handler: Callable[..., Any], priority: int | None = None) -> None: """ Queue any signal-related action to be processed when the signal is registered. Args: action: The action to queue (connect, disconnect, disable, enable). name: The signal name. handler: The handler function to queue. priority: The priority level for this handler (only for connect action). Raises: ValueError: If the action is invalid. """ if action not in self._queue: raise ValueError(f"Invalid queue action: {action}") if action == "connect": # If it's in the disconnect queue, remove it priority = priority if priority is not None else SignalPriority.NORMAL self._queue[action].setdefault(name, set()).add((handler, priority)) else: # For non-connect actions, just add the handler without priority self._queue[action].setdefault(name, set()).add(handler)
[docs] def get(self, name: str) -> Signal[Any] | None: """ Get a signal by name. Args: name: The signal name. Returns: The signal instance, or None if not found. """ return self._signals.get(name)
[docs] def list_signals(self) -> list[str]: """ List all registered signal names. Returns: A list of signal names. """ return list(self._signals.keys())
[docs] def create[R](self, name: str, description: str = "", return_type: type[R] | None = None) -> Signal[R]: """ Create and register a new signal. Args: name: Signal name description: Optional description for new signals return_type: Optional return type for new signals Returns: The new signal instance. """ signal = Signal[R](name, description) self.register(signal) return signal
@overload def emit[_ReturnType]( self, name: str, description: str = "", *, return_type: type[_ReturnType], args: _ReturnType | None = None, kwargs: dict[str, Any] | None = None, ) -> _ReturnType: ... @overload def emit[_ReturnType]( self, name: str, description: str = "", *, return_type: None = None, args: _ReturnType, kwargs: dict[str, Any] | None = None, ) -> _ReturnType: ... @overload def emit( self, name: str, description: str = "", *, return_type: None = None, args: None = None, kwargs: dict[str, Any] | None = None, ) -> None: ...
[docs] def emit[_ReturnType]( self, name: str, description: str = "", *, return_type: type[_ReturnType] | None = None, args: _ReturnType | None = None, kwargs: dict[str, Any] | None = None, ) -> _ReturnType | None: """ Emit a signal, calling handlers in priority order. Each handler transforms the first argument and passes it to the next handler. Args: name: Signal name description: Optional description for new signals return_type: Optional return type for new signals args: List of positional arguments (first one is transformed through the chain) kwargs: Keyword arguments passed to all handlers Returns: The transformed first argument after all handlers have processed it """ if not (signal := self.get(name)): signal = self.create(name, description, return_type) arg_tuple = (args,) kwargs = kwargs or {} return signal.emit(*arg_tuple, **kwargs)
[docs] def connect(self, name: str, handler: Callable[..., Any], priority: int = SignalPriority.NORMAL) -> None: """ Connect a handler to a signal, or queue it if the signal is not yet registered. Args: name: The signal name. handler: The handler function to connect. priority: The priority level for this handler (lower numbers execute first """ if signal := self.get(name): signal.connect(handler, priority) else: self.queue_action("connect", name, handler, priority)
[docs] def disconnect(self, name: str, handler: Callable[..., Any]) -> None: """ Disconnect a handler from a signal, or queue it if the signal is not yet registered. Args: name: The signal name. handler: The handler function to disconnect. """ if signal := self.get(name): signal.disconnect(handler) else: self.queue_action("disconnect", name, handler)
[docs] def disable(self, name: str, handler: Callable[..., Any]) -> None: """ Temporarily disable a handler for a signal, or queue it if the signal is not yet registered. Args: name: The signal name. handler: The handler function to disable """ if signal := self.get(name): signal.disable(handler) else: self.queue_action("disable", name, handler)
[docs] def enable(self, name: str, handler: Callable[..., Any]) -> None: """ Enable a previously disabled handler, or queue it if the signal is not yet registered. Args: name: The signal name. handler: The handler function to enable. """ if signal := self.get(name): signal.enable(handler) else: self.queue_action("enable", name, handler)
[docs] def is_queued(self, action: ActionType, name: str, handler: Callable[..., Any]) -> bool: """ Check if a handler is queued for a signal action. Args: action: The action to check (connect, disconnect, disable, enable). name: The signal name. handler: The handler function to check. Returns: True if the handler is queued, False otherwise. """ for queued_handler in self._queue[action].get(name, set()): # Handle "connect" case where queued_handler is a tuple (handler, priority) if isinstance(queued_handler, tuple): if queued_handler[0] == handler: return True elif queued_handler == handler: return True return False
# Module-level handle on the shared SignalRegistry singleton.
registry = SignalRegistry.get_instance()