-
Notifications
You must be signed in to change notification settings - Fork 2
feat: native notification hook with three backend options #304
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: native notification hook with three backend options #304
Conversation
|
Great work, @GangGreenTemperTatum . The current design requires users toi wrap every notification via
agent = Agent(name="scanner", hooks=[notify(ToolStart, ...), notify(ToolEnd, ...)])
Looking at We could consider leveraging the same pattern for notifications. Recommended Design: COnsider adding a top-level something like below class Agent(Model):
# ... existing fields ...
notifications: bool | NotificationBackend | None = None
"""
Enable notifications.
- True: Uses TerminalNotificationBackend (stderr output)
- NotificationBackend instance: Uses custom backend
- None/False: Disabled
"""
notification_events: list[type[AgentEvent]] | Literal["all"] = "all"
"""Which event types to notify on. Defaults to all events."""
notification_formatter: Callable[[AgentEvent], str] | None = None
"""Custom formatter for notification messages. If None, uses event's default representation."""
def model_post_init(self, context: t.Any) -> None:
super().model_post_init(context)
# Auto-inject notification hook if enabled
if self.notifications:
backend = (
self.notifications
if isinstance(self.notifications, NotificationBackend)
else TerminalNotificationBackend()
)
self.hooks.append(
self._create_notification_hook(
backend,
self.notification_events,
self.notification_formatter,
)
)In that way: 80-90% of users can consume like below agent = Agent(
name="scanner",
notifications=True,
)Could filter specific events: agent = Agent(
name="scanner",
notifications=True,
notification_events=[ToolStart, ToolEnd, AgentEnd],
)Power users can still customize: agent = Agent(
name="scanner",
notifications=WebhookNotificationBackend(
url="https:/....",
headers={"Authorization": "Bearer token"},
),
)Implementation: Delegating Formatting to resp[ective Events delegate formatting to the events themselves. This is scalable and maintainable: # In events.py - add a method to the base class
@dataclass
class AgentEvent:
# ... existing fields ...
def format_notification(self) -> str:
"""
Format this event as a human-readable notification message.
Override in subclasses for custom formatting.
"""
return f"{self.__class__.__name__}"
@dataclass
class ToolStart(AgentEventInStep):
tool_call: rg.tools.ToolCall
def format_notification(self) -> str:
return f"Starting tool: {self.tool_call.name}"
@dataclass
class ToolEnd(AgentEventInStep):
tool_call: rg.tools.ToolCall
message: rg.Message
stop: bool
def format_notification(self) -> str:
status = " (requesting stop)" if self.stop else ""
return f"Finished tool: {self.tool_call.name}{status}"
@dataclass
class GenerationEnd(AgentEventInStep):
message: rg.Message
usage: "rg.generator.Usage | None"
def format_notification(self) -> str:
tokens = self.usage.total_tokens if self.usage else "unknown"
return f"Generation complete ({tokens} tokens)"
@dataclass
class AgentEnd(AgentEvent):
stop_reason: "AgentStopReason"
result: "AgentResult"
def format_notification(self) -> str:
status = "❌ Failed" if self.result.failed else "✅ Finished"
return f"{status}: {self.stop_reason} (steps: {self.result.steps}, tokens: {self.result.usage.total_tokens})"
@dataclass
class AgentError(AgentEventInStep):
error: BaseException
def format_notification(self) -> str:
return f"Error: {self.error.__class__.__name__}: {str(self.error)}"
@dataclass
class AgentStalled(AgentEventInStep):
def format_notification(self) -> str:
return "Agent stalled: no tool calls and no stop conditions met"Then the noitifcation hook becomes simple something like below: def _create_notification_hook(
self,
backend: NotificationBackend,
events: list[type[AgentEvent]] | Literal["all"],
formatter: Callable[[AgentEvent], str] | None,
) -> Hook:
"""Create a notification hook that delegates formatting to events."""
async def notification_hook(event: AgentEvent) -> None:
# Filter events
if events != "all" and not any(isinstance(event, et) for et in events):
return None
# Use custom formatter if provided, otherwise delegate to event
message = formatter(event) if formatter else event.format_notification()
# Fire and forget - don't block agent execution
asyncio.create_task(_safe_send(backend, event, message))
return None
return notification_hook
async def _safe_send(backend: NotificationBackend, event: AgentEvent, message: str) -> None:
"""Send notification with error handling."""
try:
await backend.send(event, message)
except Exception:
logger.exception(f"Notification failed for {event.__class__.__name__}")With this:
|
| } | ||
|
|
||
| async with httpx.AsyncClient() as client: | ||
| await client.post(self.url, json=payload, headers=self.headers) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are creating a new client per notification, consider creating in the init once and leverage withc context managers something like below:
class WebhookNotificationBackend(NotificationBackend):
def __init__(self, url: str, headers: dict[str, str] | None = None, timeout: float = 5.0):
self.url = url
self.headers = headers or {}
self.timeout = timeout
self._client: httpx.AsyncClient | None = None
async def __aenter__(self):
self._client = httpx.AsyncClient(timeout=self.timeout)
return self
async def __aexit__(self, *args):
if self._client:
await self._client.aclose()
async def send(self, event: "AgentEvent", message: str) -> None:
if not self._client:
self._client = httpx.AsyncClient(timeout=self.timeout)
payload = self._build_payload(event, message)
await self._client.post(self.url, json=payload, headers=self.headers)
def _build_payload(self, event: "AgentEvent", message: str) -> dict:
"""Override this to customize webhook payload."""
return {
"agent": event.agent.name,
"event": event.__class__.__name__,
"message": message,
"timestamp": event.timestamp.isoformat(),
}| """ | ||
| notification_backend = backend or TerminalNotificationBackend() | ||
|
|
||
| async def notification_hook(event: "AgentEvent") -> "Reaction | None": |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are we returning Reaction, based on the function should be None?
|
ty so much for the review @rdheekonda ! i just refactored my approach and hoping you could take a look (no rush) when you get a second if OK pls? new changes reflect |
* feat: native notification hook with three backend options * chore: redesign notification hooks for better ergonomics pr feedback * chore: cleanup * fix: linting and formatting
[HOOKS]
notificationhook for local and remote backendsKey Changes:
notificationhook in the sdk with three backend optionsAdded:
notificationhook in the sdk with three backend options and testsUsgae example:
Generated Summary:
LogNotificationBackend,TerminalNotificationBackend, andWebhookNotificationBackendfor sending notifications.notifyfunction allows for flexible notification sending based on event types and messages.NotificationBackendclass is implemented as an abstract base class, enhancing extensibility.notifyfunction includes usage examples to assist developers.pyproject.tomlto include the new notification file for linting purposes.This summary was generated with ❤️ by rigging