Source code for maverick.eventbus
from __future__ import annotations
from typing import TYPE_CHECKING, Any, Callable, Optional
import uuid
import logging
from pydantic import BaseModel, ConfigDict
from .protocol import EventHandler
from .enums import GameEventType
if TYPE_CHECKING: # pragma: no cover
from .game import Game
from .events import GameEvent
__all__ = ["EventBus", "Subscription"]
log = logging.getLogger("maverick.events")
class Subscription(BaseModel):
"""Immutable subscription record."""
token: str
event_type: GameEventType
handler: EventHandler
priority: int = 0
once: bool = False
mask: Optional[Callable[[Any], bool]] = None
model_config = ConfigDict(
frozen=True, # Makes the model immutable
extra="forbid", # Prevents accidental fields
arbitrary_types_allowed=True,
)
[docs]
class EventBus:
"""Simple event bus for managing event subscriptions and emissions.
Parameters
----------
strict : bool
If True, exceptions in event handlers will propagate. Otherwise, they will be logged.
"""
[docs]
def __init__(self, *, strict: bool = False):
self._subs: list[Subscription] = []
self._strict = strict
[docs]
def subscribe(
self,
event_type: GameEventType,
handler: EventHandler,
*,
priority: int = 0,
once: bool = False,
mask: Optional[Callable[[Any], bool]] = None,
) -> str:
"""Subscribe to an event type with a handler.
Parameters
----------
event_type : str
The type of event to subscribe to.
handler : EventHandler
The function to call when the event is emitted.
priority : int
Priority of the handler; higher priority handlers are called first.
once : bool
If True, the handler will be removed after the first call.
mask : Optional[Callable[[Any], bool]]
Optional filter function to determine if the handler should be called.
"""
token = uuid.uuid4().hex
self._subs.append(
Subscription(
token=token,
event_type=event_type,
handler=handler,
priority=priority,
once=once,
mask=mask,
)
)
# higher priority first
self._subs.sort(key=lambda s: s.priority, reverse=True)
return token
[docs]
def unsubscribe(self, token: str) -> None:
"""Unsubscribe a handler using its token."""
self._subs = [s for s in self._subs if s.token != token]
[docs]
def emit(self, event: "GameEvent", game: "Game") -> None:
"""Emit an event to all subscribed handlers."""
# iterate over a snapshot so handlers can subscribe/unsubscribe safely
subs = list(self._subs)
to_remove: list[str] = []
for s in subs:
if getattr(event, "type", None) != s.event_type:
continue
if s.mask is not None and not s.mask(event):
continue
try:
s.handler(event, game)
except Exception:
if self._strict:
raise
log.exception("Event handler failed: %s", s, exc_info=True)
if s.once:
to_remove.append(s.token)
for token in to_remove:
self.unsubscribe(token)