From 59e6dbab4f4139b98ce8250399389c40ac12d5e5 Mon Sep 17 00:00:00 2001 From: GangGreenTemperTatum <104169244+GangGreenTemperTatum@users.noreply.github.com> Date: Tue, 20 Jan 2026 14:36:18 -0500 Subject: [PATCH 1/4] feat: native notification hook with three backend options --- dreadnode/agent/hooks/__init__.py | 12 ++ dreadnode/agent/hooks/notification.py | 106 +++++++++++++++++ pyproject.toml | 6 + tests/test_notification_hook.py | 161 ++++++++++++++++++++++++++ 4 files changed, 285 insertions(+) create mode 100644 dreadnode/agent/hooks/notification.py create mode 100644 tests/test_notification_hook.py diff --git a/dreadnode/agent/hooks/__init__.py b/dreadnode/agent/hooks/__init__.py index 227fdadb..24e5e9e5 100644 --- a/dreadnode/agent/hooks/__init__.py +++ b/dreadnode/agent/hooks/__init__.py @@ -4,12 +4,24 @@ retry_with_feedback, ) from dreadnode.agent.hooks.metrics import tool_metrics +from dreadnode.agent.hooks.notification import ( + LogNotificationBackend, + NotificationBackend, + TerminalNotificationBackend, + WebhookNotificationBackend, + notify, +) from dreadnode.agent.hooks.summarize import summarize_when_long __all__ = [ "Hook", + "LogNotificationBackend", + "NotificationBackend", + "TerminalNotificationBackend", + "WebhookNotificationBackend", "backoff_on_error", "backoff_on_ratelimit", + "notify", "retry_with_feedback", "summarize_when_long", "tool_metrics", diff --git a/dreadnode/agent/hooks/notification.py b/dreadnode/agent/hooks/notification.py new file mode 100644 index 00000000..26989fb7 --- /dev/null +++ b/dreadnode/agent/hooks/notification.py @@ -0,0 +1,106 @@ +import typing as t +from abc import ABC, abstractmethod + +from loguru import logger + +if t.TYPE_CHECKING: + from dreadnode.agent.events import AgentEvent + from dreadnode.agent.reactions import Reaction + + +class NotificationBackend(ABC): + @abstractmethod + async def send(self, event: "AgentEvent", message: str) -> None: + """Send a notification for the given event.""" + + +class LogNotificationBackend(NotificationBackend): + async def send(self, event: "AgentEvent", message: str) -> None: + logger.info(f"[{event.agent.name}] {message}") + + +class TerminalNotificationBackend(NotificationBackend): + async def send(self, event: "AgentEvent", message: str) -> None: + import sys + + print(f"[{event.agent.name}] {message}", file=sys.stderr) + + +class WebhookNotificationBackend(NotificationBackend): + def __init__(self, url: str, headers: dict[str, str] | None = None): + self.url = url + self.headers = headers or {} + + async def send(self, event: "AgentEvent", message: str) -> None: + import httpx + + payload = { + "agent": event.agent.name, + "event": event.__class__.__name__, + "message": message, + "timestamp": event.timestamp.isoformat(), + } + + async with httpx.AsyncClient() as client: + await client.post(self.url, json=payload, headers=self.headers) + + +def notify( + event_type: "type[AgentEvent] | t.Callable[[AgentEvent], bool]", + message: str | t.Callable[["AgentEvent"], str], + backend: NotificationBackend | None = None, +) -> t.Callable[["AgentEvent"], t.Awaitable["Reaction | None"]]: + """ + Create a notification hook that sends notifications when events occur. + + Unlike other hooks, notification hooks don't affect agent execution - they return + None (no reaction) and run asynchronously to deliver notifications. + + Args: + event_type: Event type to trigger on, or predicate function + message: Static message or callable that generates message from event + backend: Notification backend (defaults to terminal output) + + Returns: + Hook that sends notifications + + Example: + ```python + from dreadnode.agent import Agent + from dreadnode.agent.events import ToolStart + from dreadnode.agent.hooks.notification import notify + + agent = Agent( + name="analyzer", + hooks=[ + notify( + ToolStart, + lambda e: f"Starting tool: {e.tool_name}", + ), + ], + ) + ``` + """ + notification_backend = backend or TerminalNotificationBackend() + + async def notification_hook(event: "AgentEvent") -> "Reaction | None": + should_notify = False + + if isinstance(event_type, type): + should_notify = isinstance(event, event_type) + elif callable(event_type): + should_notify = event_type(event) + + if not should_notify: + return None + + msg = message(event) if callable(message) else message + + try: + await notification_backend.send(event, msg) + except Exception: # noqa: BLE001 + logger.exception("Notification hook failed") + + return None + + return notification_hook diff --git a/pyproject.toml b/pyproject.toml index 3f390777..d6881e72 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -192,3 +192,9 @@ skip-magic-trailing-comma = false "dreadnode/transforms/language.py" = [ "RUF001", # intentional use of ambiguous unicode characters for airt ] +"dreadnode/agent/tools/interaction.py" = [ + "T201", # print required for user interaction +] +"dreadnode/agent/hooks/notification.py" = [ + "T201", # print required for terminal notifications +] diff --git a/tests/test_notification_hook.py b/tests/test_notification_hook.py new file mode 100644 index 00000000..c5a34a39 --- /dev/null +++ b/tests/test_notification_hook.py @@ -0,0 +1,161 @@ +import typing as t +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from dreadnode.agent.events import AgentEvent, ToolStart +from dreadnode.agent.hooks.notification import ( + LogNotificationBackend, + NotificationBackend, + TerminalNotificationBackend, + WebhookNotificationBackend, + notify, +) + + +class MockEvent(AgentEvent): + pass + + +@pytest.fixture +def mock_event() -> AgentEvent: + agent = MagicMock() + agent.name = "test_agent" + thread = MagicMock() + messages: list[t.Any] = [] + events: list[AgentEvent] = [] + + return MockEvent( + session_id=MagicMock(), + agent=agent, + thread=thread, + messages=messages, + events=events, + ) + + +async def test_log_notification_backend(mock_event: AgentEvent) -> None: + from unittest.mock import patch + + backend = LogNotificationBackend() + + with patch("dreadnode.agent.hooks.notification.logger.info") as mock_logger: + await backend.send(mock_event, "Test notification") + + mock_logger.assert_called_once() + call_args = mock_logger.call_args[0][0] + assert "Test notification" in call_args + assert "test_agent" in call_args + + +async def test_terminal_notification_backend(mock_event: AgentEvent) -> None: + from io import StringIO + from unittest.mock import patch + + backend = TerminalNotificationBackend() + + stderr_capture = StringIO() + with patch("sys.stderr", stderr_capture): + await backend.send(mock_event, "Test notification") + + output = stderr_capture.getvalue() + assert "Test notification" in output + assert "test_agent" in output + + +async def test_webhook_notification_backend(mock_event: AgentEvent) -> None: + mock_client = AsyncMock() + mock_post = AsyncMock() + mock_client.__aenter__.return_value.post = mock_post + + backend = WebhookNotificationBackend("https://example.com/webhook") + + from unittest.mock import patch + + import httpx + + with patch.object(httpx, "AsyncClient", return_value=mock_client): + await backend.send(mock_event, "Test notification") + + mock_post.assert_called_once() + call_kwargs = mock_post.call_args.kwargs + assert call_kwargs["json"]["message"] == "Test notification" + assert call_kwargs["json"]["agent"] == "test_agent" + + +async def test_notify_hook_with_event_type(mock_event: AgentEvent) -> None: + backend = MagicMock(spec=NotificationBackend) + backend.send = AsyncMock() + + hook = notify(MockEvent, "Test message", backend=backend) + + reaction = await hook(mock_event) + + assert reaction is None + backend.send.assert_called_once_with(mock_event, "Test message") + + +async def test_notify_hook_uses_terminal_by_default(mock_event: AgentEvent) -> None: + from io import StringIO + from unittest.mock import patch + + hook = notify(MockEvent, "Default notification") + + stderr_capture = StringIO() + with patch("sys.stderr", stderr_capture): + reaction = await hook(mock_event) + + assert reaction is None + output = stderr_capture.getvalue() + assert "Default notification" in output + + +async def test_notify_hook_with_callable_message(mock_event: AgentEvent) -> None: + backend = MagicMock(spec=NotificationBackend) + backend.send = AsyncMock() + + hook = notify(MockEvent, lambda e: f"Event from {e.agent.name}", backend=backend) + + reaction = await hook(mock_event) + + assert reaction is None + backend.send.assert_called_once_with(mock_event, "Event from test_agent") + + +async def test_notify_hook_with_predicate(mock_event: AgentEvent) -> None: + backend = MagicMock(spec=NotificationBackend) + backend.send = AsyncMock() + + hook = notify(lambda e: e.agent.name == "test_agent", "Matched!", backend=backend) + + reaction = await hook(mock_event) + + assert reaction is None + backend.send.assert_called_once() + + +async def test_notify_hook_no_match(mock_event: AgentEvent) -> None: + backend = MagicMock(spec=NotificationBackend) + backend.send = AsyncMock() + + hook = notify(ToolStart, "Should not send", backend=backend) + + reaction = await hook(mock_event) + + assert reaction is None + backend.send.assert_not_called() + + +async def test_notify_hook_handles_backend_failure(mock_event: AgentEvent) -> None: + from unittest.mock import patch + + backend = MagicMock(spec=NotificationBackend) + backend.send = AsyncMock(side_effect=Exception("Backend failed")) + + hook = notify(MockEvent, "Test message", backend=backend) + + with patch("dreadnode.agent.hooks.notification.logger.exception") as mock_logger: + reaction = await hook(mock_event) + + assert reaction is None + mock_logger.assert_called_once_with("Notification hook failed") From 0dbb98ef784a57e023fef8d3154e9361d62a50bc Mon Sep 17 00:00:00 2001 From: GangGreenTemperTatum <104169244+GangGreenTemperTatum@users.noreply.github.com> Date: Tue, 20 Jan 2026 16:29:37 -0500 Subject: [PATCH 2/4] chore: redesign notification hooks for better ergonomics pr feedback --- dreadnode/agent/agent.py | 72 +++++++++++++++++++++++++++ dreadnode/agent/events.py | 31 ++++++++++++ dreadnode/agent/hooks/notification.py | 50 ++++++++++++++----- tests/test_notification_hook.py | 62 +++++++++++++++++++++-- 4 files changed, 197 insertions(+), 18 deletions(-) diff --git a/dreadnode/agent/agent.py b/dreadnode/agent/agent.py index 79fbb965..6d0124dd 100644 --- a/dreadnode/agent/agent.py +++ b/dreadnode/agent/agent.py @@ -30,6 +30,7 @@ _total_usage_from_events, ) from dreadnode.agent.hooks import Hook, retry_with_feedback +from dreadnode.agent.hooks.notification import NotificationBackend, TerminalNotificationBackend from dreadnode.agent.reactions import ( Continue, Fail, @@ -62,6 +63,16 @@ CommitBehavior = t.Literal["always", "on-success"] +async def _safe_send( + backend: NotificationBackend, event: AgentEvent, message: str +) -> None: + """Send notification with error handling.""" + try: + await backend.send(event, message) + except Exception: # noqa: BLE001 + logger.exception(f"Notification failed for {event.__class__.__name__}") + + class AgentWarning(UserWarning): """Warning raised when an agent is used in a way that may not be safe or intended.""" @@ -111,6 +122,24 @@ class Agent(Model): assert_scores: list[str] | t.Literal[True] = Field(default_factory=list) """Scores to ensure are truthy, otherwise the agent task is marked as failed.""" + notifications: t.Annotated[ + bool | NotificationBackend | None, SkipValidation + ] = Config(default=None, repr=False) + """ + Enable notifications. + - True: Uses TerminalNotificationBackend (stderr output) + - NotificationBackend instance: Uses custom backend + - None/False: Disabled + """ + notification_events: list[type[AgentEvent]] | t.Literal["all"] = Config( + default="all", repr=False + ) + """Which event types to notify on. Defaults to all events.""" + notification_formatter: t.Annotated[ + t.Callable[[AgentEvent], str] | None, SkipValidation + ] = Config(default=None, repr=False) + """Custom formatter for notification messages. If None, uses event's default representation.""" + _generator: rg.Generator | None = PrivateAttr(None, init=False) @field_validator("tools", mode="before") @@ -129,6 +158,49 @@ def validate_tools(cls, value: t.Any) -> t.Any: return tools + def model_post_init(self, context: t.Any) -> None: + super().model_post_init(context) + + # Auto-inject notification hook if enabled + if self.notifications: + backend = ( + self.notifications + if isinstance(self.notifications, NotificationBackend) + else TerminalNotificationBackend() + ) + + self.hooks.append( + self._create_notification_hook( + backend, + self.notification_events, + self.notification_formatter, + ) + ) + + def _create_notification_hook( + self, + backend: NotificationBackend, + events: list[type[AgentEvent]] | t.Literal["all"], + formatter: t.Callable[[AgentEvent], str] | None, + ) -> Hook: + """Create a notification hook that delegates formatting to events.""" + import asyncio + + async def notification_hook(event: AgentEvent) -> None: + # Filter events + if events != "all" and not any(isinstance(event, et) for et in events): + return + + # Use custom formatter if provided, otherwise delegate to event + message = formatter(event) if formatter else event.format_notification() + + # Fire and forget - don't block agent execution + _ = asyncio.create_task(_safe_send(backend, event, message)) # noqa: RUF006 + + return + + return notification_hook + def __repr__(self) -> str: description = shorten_string(self.description or "", 50) diff --git a/dreadnode/agent/events.py b/dreadnode/agent/events.py index 12b3d0a5..9c9928d8 100644 --- a/dreadnode/agent/events.py +++ b/dreadnode/agent/events.py @@ -115,12 +115,22 @@ def format_as_panel(self, *, truncate: bool = False) -> Panel: # noqa: ARG002 border_style="dim", ) + def format_notification(self) -> str: + """ + Format this event as a human-readable notification message. + Override in subclasses for custom formatting. + """ + return f"{self.__class__.__name__}" + def __rich_console__(self, console: Console, options: ConsoleOptions) -> RenderResult: yield self.format_as_panel() @dataclass class AgentStart(AgentEvent): + def format_notification(self) -> str: + return f"Starting agent: {self.agent.name}" + def format_as_panel(self, *, truncate: bool = False) -> Panel: return Panel( format_message(self.messages[0], truncate=truncate), @@ -158,6 +168,10 @@ def __repr__(self) -> str: message = f"Message(role={self.message.role}, content='{message_content}', tool_calls={tool_call_count})" return f"GenerationEnd(message={message})" + def format_notification(self) -> str: + tokens = self.usage.total_tokens if self.usage else "unknown" + return f"Generation complete ({tokens} tokens)" + def format_as_panel(self, *, truncate: bool = False) -> Panel: cost = round(self.estimated_cost, 6) if self.estimated_cost else "" usage = str(self.usage) or "" @@ -173,6 +187,9 @@ def format_as_panel(self, *, truncate: bool = False) -> Panel: @dataclass class AgentStalled(AgentEventInStep): + def format_notification(self) -> str: + return "Agent stalled: no tool calls and no stop conditions met" + def format_as_panel(self, *, truncate: bool = False) -> Panel: # noqa: ARG002 return Panel( Text( @@ -189,6 +206,9 @@ def format_as_panel(self, *, truncate: bool = False) -> Panel: # noqa: ARG002 class AgentError(AgentEventInStep): error: BaseException + def format_notification(self) -> str: + return f"Error: {self.error.__class__.__name__}: {self.error!s}" + def format_as_panel(self, *, truncate: bool = False) -> Panel: # noqa: ARG002 return Panel( repr(self), @@ -205,6 +225,9 @@ class ToolStart(AgentEventInStep): def __repr__(self) -> str: return f"ToolStart(tool_call={self.tool_call})" + def format_notification(self) -> str: + return f"Starting tool: {self.tool_call.name}" + def format_as_panel(self, *, truncate: bool = False) -> Panel: content: RenderableType try: @@ -245,6 +268,10 @@ def __repr__(self) -> str: message = f"Message(role={self.message.role}, content='{message_content}')" return f"ToolEnd(tool_call={self.tool_call}, message={message}, stop={self.stop})" + def format_notification(self) -> str: + status = " (requesting stop)" if self.stop else "" + return f"Finished tool: {self.tool_call.name}{status}" + def format_as_panel(self, *, truncate: bool = False) -> Panel: panel = format_message(self.message, truncate=truncate) subtitle = f"[dim]{self.tool_call.id}[/dim]" @@ -294,6 +321,10 @@ class AgentEnd(AgentEvent): stop_reason: "AgentStopReason" result: "AgentResult" + def format_notification(self) -> str: + status = "❌ Failed" if self.result.failed else "✅ Finished" + return f"{status}: {self.stop_reason} (steps: {self.result.steps}, tokens: {self.result.usage.total_tokens})" + def format_as_panel(self, *, truncate: bool = False) -> Panel: # noqa: ARG002 res = self.result status = "[bold red]Failed[/bold red]" if res.failed else "[bold green]Success[/bold green]" diff --git a/dreadnode/agent/hooks/notification.py b/dreadnode/agent/hooks/notification.py index 26989fb7..244d8182 100644 --- a/dreadnode/agent/hooks/notification.py +++ b/dreadnode/agent/hooks/notification.py @@ -4,8 +4,9 @@ from loguru import logger if t.TYPE_CHECKING: + import httpx + from dreadnode.agent.events import AgentEvent - from dreadnode.agent.reactions import Reaction class NotificationBackend(ABC): @@ -27,29 +28,46 @@ async def send(self, event: "AgentEvent", message: str) -> None: class WebhookNotificationBackend(NotificationBackend): - def __init__(self, url: str, headers: dict[str, str] | None = None): + def __init__(self, url: str, headers: dict[str, str] | None = None, timeout: float = 5.0): self.url = url self.headers = headers or {} + self.timeout = timeout + self._client: httpx.AsyncClient | None = None + + async def __aenter__(self) -> "WebhookNotificationBackend": + import httpx + + self._client = httpx.AsyncClient(timeout=self.timeout) + return self + + async def __aexit__(self, *args: t.Any) -> None: + if self._client: + await self._client.aclose() async def send(self, event: "AgentEvent", message: str) -> None: import httpx - payload = { + if not self._client: + self._client = httpx.AsyncClient(timeout=self.timeout) + + payload = self._build_payload(event, message) + await self._client.post(self.url, json=payload, headers=self.headers) + + def _build_payload(self, event: "AgentEvent", message: str) -> dict[str, str]: + """Override this to customize webhook payload.""" + return { "agent": event.agent.name, "event": event.__class__.__name__, "message": message, "timestamp": event.timestamp.isoformat(), } - async with httpx.AsyncClient() as client: - await client.post(self.url, json=payload, headers=self.headers) - def notify( event_type: "type[AgentEvent] | t.Callable[[AgentEvent], bool]", - message: str | t.Callable[["AgentEvent"], str], + message: str | t.Callable[["AgentEvent"], str] | None = None, backend: NotificationBackend | None = None, -) -> t.Callable[["AgentEvent"], t.Awaitable["Reaction | None"]]: +) -> t.Callable[["AgentEvent"], t.Awaitable[None]]: """ Create a notification hook that sends notifications when events occur. @@ -58,7 +76,8 @@ def notify( Args: event_type: Event type to trigger on, or predicate function - message: Static message or callable that generates message from event + message: Static message or callable that generates message from event. + If None, uses event.format_notification() backend: Notification backend (defaults to terminal output) Returns: @@ -73,6 +92,7 @@ def notify( agent = Agent( name="analyzer", hooks=[ + notify(ToolStart), # Uses default formatting notify( ToolStart, lambda e: f"Starting tool: {e.tool_name}", @@ -83,7 +103,7 @@ def notify( """ notification_backend = backend or TerminalNotificationBackend() - async def notification_hook(event: "AgentEvent") -> "Reaction | None": + async def notification_hook(event: "AgentEvent") -> None: should_notify = False if isinstance(event_type, type): @@ -92,15 +112,19 @@ async def notification_hook(event: "AgentEvent") -> "Reaction | None": should_notify = event_type(event) if not should_notify: - return None + return - msg = message(event) if callable(message) else message + # Use custom message if provided, otherwise delegate to event + if message is None: + msg = event.format_notification() + else: + msg = message(event) if callable(message) else message try: await notification_backend.send(event, msg) except Exception: # noqa: BLE001 logger.exception("Notification hook failed") - return None + return return notification_hook diff --git a/tests/test_notification_hook.py b/tests/test_notification_hook.py index c5a34a39..1948e9f0 100644 --- a/tests/test_notification_hook.py +++ b/tests/test_notification_hook.py @@ -64,16 +64,16 @@ async def test_terminal_notification_backend(mock_event: AgentEvent) -> None: async def test_webhook_notification_backend(mock_event: AgentEvent) -> None: + from unittest.mock import patch + + import httpx + mock_client = AsyncMock() mock_post = AsyncMock() - mock_client.__aenter__.return_value.post = mock_post + mock_client.post = mock_post backend = WebhookNotificationBackend("https://example.com/webhook") - from unittest.mock import patch - - import httpx - with patch.object(httpx, "AsyncClient", return_value=mock_client): await backend.send(mock_event, "Test notification") @@ -159,3 +159,55 @@ async def test_notify_hook_handles_backend_failure(mock_event: AgentEvent) -> No assert reaction is None mock_logger.assert_called_once_with("Notification hook failed") + + +async def test_notify_hook_uses_default_formatter(mock_event: AgentEvent) -> None: + backend = MagicMock(spec=NotificationBackend) + backend.send = AsyncMock() + + hook = notify(MockEvent, backend=backend) + + reaction = await hook(mock_event) + + assert reaction is None + backend.send.assert_called_once() + # Check that it used event.format_notification() + call_args = backend.send.call_args[0] + assert call_args[1] == "MockEvent" # Default format_notification returns class name + + +def test_agent_auto_inject_notifications_terminal() -> None: + from dreadnode.agent import Agent + + agent = Agent(name="test", notifications=True) + + # Should have auto-injected a notification hook + assert len(agent.hooks) == 1 + + +def test_agent_auto_inject_notifications_custom_backend() -> None: + from dreadnode.agent import Agent + + backend = LogNotificationBackend() + agent = Agent(name="test", notifications=backend) + + # Should have auto-injected a notification hook + assert len(agent.hooks) == 1 + + +def test_agent_no_notifications_by_default() -> None: + from dreadnode.agent import Agent + + agent = Agent(name="test") + + # Should not have any auto-injected hooks + assert len(agent.hooks) == 0 + + +def test_agent_notifications_disabled() -> None: + from dreadnode.agent import Agent + + agent = Agent(name="test", notifications=False) + + # Should not have any auto-injected hooks + assert len(agent.hooks) == 0 From b7a61f4189c4fe76213c79f7bf4d5675a3290837 Mon Sep 17 00:00:00 2001 From: GangGreenTemperTatum <104169244+GangGreenTemperTatum@users.noreply.github.com> Date: Tue, 20 Jan 2026 16:31:29 -0500 Subject: [PATCH 3/4] chore: cleanup --- dreadnode/agent/events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dreadnode/agent/events.py b/dreadnode/agent/events.py index 9c9928d8..859de0c5 100644 --- a/dreadnode/agent/events.py +++ b/dreadnode/agent/events.py @@ -322,7 +322,7 @@ class AgentEnd(AgentEvent): result: "AgentResult" def format_notification(self) -> str: - status = "❌ Failed" if self.result.failed else "✅ Finished" + status = "Failed" if self.result.failed else "Finished" return f"{status}: {self.stop_reason} (steps: {self.result.steps}, tokens: {self.result.usage.total_tokens})" def format_as_panel(self, *, truncate: bool = False) -> Panel: # noqa: ARG002 From 00e6771ce90ee8a71bf94a54b48e4c865db0e1f7 Mon Sep 17 00:00:00 2001 From: GangGreenTemperTatum <104169244+GangGreenTemperTatum@users.noreply.github.com> Date: Tue, 20 Jan 2026 16:33:48 -0500 Subject: [PATCH 4/4] fix: linting and formatting --- dreadnode/agent/agent.py | 16 +++++++--------- dreadnode/agent/hooks/notification.py | 2 +- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/dreadnode/agent/agent.py b/dreadnode/agent/agent.py index 6d0124dd..8daf6c36 100644 --- a/dreadnode/agent/agent.py +++ b/dreadnode/agent/agent.py @@ -63,9 +63,7 @@ CommitBehavior = t.Literal["always", "on-success"] -async def _safe_send( - backend: NotificationBackend, event: AgentEvent, message: str -) -> None: +async def _safe_send(backend: NotificationBackend, event: AgentEvent, message: str) -> None: """Send notification with error handling.""" try: await backend.send(event, message) @@ -122,9 +120,9 @@ class Agent(Model): assert_scores: list[str] | t.Literal[True] = Field(default_factory=list) """Scores to ensure are truthy, otherwise the agent task is marked as failed.""" - notifications: t.Annotated[ - bool | NotificationBackend | None, SkipValidation - ] = Config(default=None, repr=False) + notifications: t.Annotated[bool | NotificationBackend | None, SkipValidation] = Config( + default=None, repr=False + ) """ Enable notifications. - True: Uses TerminalNotificationBackend (stderr output) @@ -135,9 +133,9 @@ class Agent(Model): default="all", repr=False ) """Which event types to notify on. Defaults to all events.""" - notification_formatter: t.Annotated[ - t.Callable[[AgentEvent], str] | None, SkipValidation - ] = Config(default=None, repr=False) + notification_formatter: t.Annotated[t.Callable[[AgentEvent], str] | None, SkipValidation] = ( + Config(default=None, repr=False) + ) """Custom formatter for notification messages. If None, uses event's default representation.""" _generator: rg.Generator | None = PrivateAttr(None, init=False) diff --git a/dreadnode/agent/hooks/notification.py b/dreadnode/agent/hooks/notification.py index 244d8182..a9985b44 100644 --- a/dreadnode/agent/hooks/notification.py +++ b/dreadnode/agent/hooks/notification.py @@ -40,7 +40,7 @@ async def __aenter__(self) -> "WebhookNotificationBackend": self._client = httpx.AsyncClient(timeout=self.timeout) return self - async def __aexit__(self, *args: t.Any) -> None: + async def __aexit__(self, *args: object) -> None: if self._client: await self._client.aclose()