Source code for bot_api.internal.event_handler

from threading import Lock
from typing import Any, Generic, TypeVar, Callable, List
import heapq
from weakref import WeakSet
from ..events import EventABC

# Type variable for the event payload; bounded so that only EventABC
# (or a subclass of it) can parameterize a generic class using T.
T = TypeVar('T', bound=EventABC)


class EventHandler(Generic[T]):
    """Generic event handler that dispatches events of type T to subscribers.

    Subscribers are invoked in descending priority order. Subscribers that
    share a priority are invoked in the order in which they subscribed.

    The subscriber registry is guarded by a lock. ``publish`` takes a
    snapshot of the registry under the lock and invokes the callbacks
    outside of it, so a callback may itself subscribe or unsubscribe.

    Exceptions raised by a subscriber are NOT caught: they propagate out of
    ``publish`` and the remaining subscribers are not invoked.

    Type Parameters:
        T: The type of event that this handler processes.
    """

    _DEFAULT_PRIORITY = 1

    def __init__(self):
        """Initialize a new EventHandler instance with no subscribers."""
        self._lock = Lock()
        # Kept in subscription order; publish() sorts a snapshot by priority.
        # Because that sort is stable, equal priorities keep this order.
        self._subscriber_entries: List[EventHandler.EntryWithPriority] = []
        # Mirror of the registered callbacks for O(1) duplicate detection.
        # A plain set is sufficient: the entries above already hold strong
        # references, so weak references would not change any lifetime.
        self._subscriber_set: set = set()

    def subscribe(
        self,
        subscriber: Callable[[T], None],
        priority: int = _DEFAULT_PRIORITY,
    ) -> None:
        """Subscribe a new event handler with a given priority.

        Args:
            subscriber: The callback that will receive published events.
                It must be hashable, since it is tracked in a set.
            priority: The priority of the subscriber; higher values are
                invoked first. Must be non-negative. No upper bound is
                enforced.

        Raises:
            TypeError: If subscriber is None or priority is negative.
            ValueError: If subscriber is already registered.
        """
        if subscriber is None:
            raise TypeError("Subscriber cannot be None")
        if priority < 0:
            # TypeError (rather than ValueError) is retained so that
            # existing callers' ``except`` clauses keep matching.
            raise TypeError("Priority must be a non-negative value")

        with self._lock:
            if subscriber in self._subscriber_set:
                raise ValueError("Subscriber is already registered")
            self._subscriber_set.add(subscriber)
            self._subscriber_entries.append(
                EventHandler.EntryWithPriority(subscriber, priority)
            )

    def unsubscribe(self, subscriber: Callable[[T], None]) -> bool:
        """Unsubscribe a subscriber from this event handler.

        Args:
            subscriber: The subscriber to be removed from subscriptions.

        Returns:
            bool: True if the subscriber was found and removed, False
            otherwise (including when subscriber is None).
        """
        if subscriber is None:
            return False

        with self._lock:
            for index, entry in enumerate(self._subscriber_entries):
                if entry.subscriber == subscriber:
                    # Returning right after the deletion makes mutating the
                    # list during iteration safe.
                    del self._subscriber_entries[index]
                    self._subscriber_set.discard(subscriber)
                    return True
        return False

    def clear(self) -> None:
        """Remove all subscribers from this event handler."""
        with self._lock:
            self._subscriber_entries.clear()
            self._subscriber_set.clear()

    def publish(self, event_data: T) -> None:
        """Publish an event to all subscribers in priority order.

        Higher-priority subscribers are invoked first; subscribers with the
        same priority are invoked in subscription order.

        Args:
            event_data: The event data passed to each subscriber. It is
                forwarded as-is; no type check is performed here.

        Raises:
            Exception: Whatever a subscriber raises propagates to the
                caller, and the remaining subscribers are skipped.
        """
        # Snapshot under the lock so callbacks run without holding it and
        # may safely modify the subscriptions. sorted() is stable, so ties
        # keep the subscription order of the underlying list.
        with self._lock:
            ordered_entries = sorted(self._subscriber_entries)

        for entry in ordered_entries:
            entry.subscriber(event_data)

    def get_subscriber_count(self) -> int:
        """Return the number of subscribers currently registered.

        Returns:
            int: The count of subscribers.
        """
        with self._lock:
            return len(self._subscriber_entries)

    class EntryWithPriority:
        """Private class to store a subscriber together with its priority."""

        def __init__(
            self,
            subscriber: Callable[[T], None],
            priority: int,
        ):
            """Construct a new entry with the given subscriber and priority.

            Args:
                subscriber: The subscriber callback function.
                priority: The priority of the subscriber.
            """
            self.priority = priority
            self.subscriber = subscriber

        def __lt__(self, other: 'EventHandler.EntryWithPriority') -> bool:
            """Order entries so that a HIGHER priority sorts first."""
            if not isinstance(other, EventHandler.EntryWithPriority):
                return NotImplemented
            # Reversed on purpose: ascending sorts then yield descending
            # priority.
            return other.priority < self.priority

        def __eq__(self, other: Any):
            """Entries are equal when subscriber and priority both match."""
            if not isinstance(other, EventHandler.EntryWithPriority):
                return NotImplemented
            return (
                self.subscriber == other.subscriber
                and self.priority == other.priority
            )