191 lines
5.7 KiB
Python
191 lines
5.7 KiB
Python
"""Tests for event system."""
|
|
|
|
import pytest
|
|
from fastapi import APIRouter
|
|
|
|
from fastapi_route_loader.events import (
|
|
EventDispatcher,
|
|
RouterEvent,
|
|
RouterEventType,
|
|
RouterLoadedEvent,
|
|
RouterUnloadedEvent,
|
|
RouterUpdatedEvent,
|
|
)
|
|
|
|
|
|
class TestRouterEvents:
    """Checks on the concrete router event classes."""

    def test_router_loaded_event(self):
        """A loaded event exposes its type, name, router and metadata."""
        api_router = APIRouter()
        loaded = RouterLoadedEvent("test_router", api_router, {"key": "value"})

        assert loaded.event_type == RouterEventType.LOADED
        assert loaded.router_name == "test_router"
        assert loaded.router is api_router
        assert loaded.metadata == {"key": "value"}

    def test_router_loaded_event_no_metadata(self):
        """Omitting metadata is expected to yield an empty dict."""
        loaded = RouterLoadedEvent("test_router", APIRouter())

        assert loaded.event_type == RouterEventType.LOADED
        assert loaded.metadata == {}

    def test_router_unloaded_event(self):
        """An unloaded event exposes its type, name, router and metadata."""
        api_router = APIRouter()
        unloaded = RouterUnloadedEvent(
            "test_router", api_router, {"reason": "cleanup"}
        )

        assert unloaded.event_type == RouterEventType.UNLOADED
        assert unloaded.router_name == "test_router"
        assert unloaded.router is api_router
        assert unloaded.metadata == {"reason": "cleanup"}

    def test_router_updated_event(self):
        """An updated event exposes both the new and the previous router."""
        previous = APIRouter()
        replacement = APIRouter()
        updated = RouterUpdatedEvent(
            "test_router", replacement, previous, {"version": "2.0"}
        )

        assert updated.event_type == RouterEventType.UPDATED
        assert updated.router_name == "test_router"
        assert updated.router is replacement
        assert updated.old_router is previous
        assert updated.metadata == {"version": "2.0"}

    def test_event_immutability(self):
        """Assigning to an event attribute is expected to raise."""
        frozen = RouterLoadedEvent("test_router", APIRouter())

        with pytest.raises(AttributeError):
            frozen.router_name = "new_name"
|
|
|
|
|
|
class TestEventDispatcher:
    """Checks on EventDispatcher subscribe / unsubscribe / dispatch."""

    def test_subscribe_and_dispatch(self):
        """A handler subscribed to LOADED receives the dispatched event."""
        bus = EventDispatcher()
        seen = []

        def record(event: RouterEvent) -> None:
            seen.append(event)

        bus.subscribe(RouterEventType.LOADED, record)
        loaded = RouterLoadedEvent("test_router", APIRouter())
        bus.dispatch(loaded)

        assert len(seen) == 1
        assert seen[0] is loaded

    def test_subscribe_to_all_events(self):
        """Subscribing with ``None`` receives events of every type, in order."""
        bus = EventDispatcher()
        api_router = APIRouter()
        seen = []

        def record(event: RouterEvent) -> None:
            seen.append(event)

        bus.subscribe(None, record)

        first = RouterLoadedEvent("test_router", api_router)
        bus.dispatch(first)

        second = RouterUnloadedEvent("test_router", api_router)
        bus.dispatch(second)

        assert len(seen) == 2
        assert seen[0] is first
        assert seen[1] is second

    def test_multiple_handlers(self):
        """Two handlers on the same event type are each called once."""
        bus = EventDispatcher()
        first_seen = []
        second_seen = []

        def first_handler(event: RouterEvent) -> None:
            first_seen.append(event)

        def second_handler(event: RouterEvent) -> None:
            second_seen.append(event)

        bus.subscribe(RouterEventType.LOADED, first_handler)
        bus.subscribe(RouterEventType.LOADED, second_handler)

        bus.dispatch(RouterLoadedEvent("test_router", APIRouter()))

        assert len(first_seen) == 1
        assert len(second_seen) == 1

    def test_unsubscribe(self):
        """A handler removed from LOADED no longer receives LOADED events."""
        bus = EventDispatcher()
        seen = []

        def record(event: RouterEvent) -> None:
            seen.append(event)

        bus.subscribe(RouterEventType.LOADED, record)
        bus.unsubscribe(RouterEventType.LOADED, record)

        bus.dispatch(RouterLoadedEvent("test_router", APIRouter()))

        assert seen == []

    def test_unsubscribe_global_handler(self):
        """A handler removed from the ``None`` subscription gets nothing."""
        bus = EventDispatcher()
        seen = []

        def record(event: RouterEvent) -> None:
            seen.append(event)

        bus.subscribe(None, record)
        bus.unsubscribe(None, record)

        bus.dispatch(RouterLoadedEvent("test_router", APIRouter()))

        assert seen == []

    def test_clear_handlers(self):
        """``clear()`` drops both typed and ``None`` subscriptions."""
        bus = EventDispatcher()
        seen = []

        def record(event: RouterEvent) -> None:
            seen.append(event)

        bus.subscribe(RouterEventType.LOADED, record)
        bus.subscribe(None, record)
        bus.clear()

        bus.dispatch(RouterLoadedEvent("test_router", APIRouter()))

        assert seen == []

    def test_handler_exception_does_not_stop_other_handlers(self):
        """A handler's ``ValueError`` is expected to surface from dispatch."""
        bus = EventDispatcher()
        survivor_calls = []

        def failing(event: RouterEvent) -> None:
            raise ValueError("Handler error")

        def surviving(event: RouterEvent) -> None:
            survivor_calls.append(event)

        bus.subscribe(RouterEventType.LOADED, failing)
        bus.subscribe(RouterEventType.LOADED, surviving)

        loaded = RouterLoadedEvent("test_router", APIRouter())

        # NOTE(review): the recorded calls of the second handler are never
        # inspected, so this test only pins that the ValueError propagates
        # out of dispatch(). Whether the second handler still runs is not
        # asserted here, despite the test name -- confirm the intended
        # contract against EventDispatcher.dispatch.
        with pytest.raises(ValueError, match="Handler error"):
            bus.dispatch(loaded)
|