Question about Message support for SSE and implementation of separator input #643

@amorfatianima

I have two questions and would appreciate some help:

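  1. As in Discussion #248: if the use case is simply streaming a Message over SSE instead of a Task (e.g. a simple chat demo), the design of EventConsumer does not work, because it treats the first Message as the final event and closes the queue. Is there another implementation for this case, or can developers simply not use EventConsumer here?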
# a2a/server/events/event_consumer.py
class EventConsumer:
    # ...
    async def consume_all(self) -> AsyncGenerator[Event]:
        # ...
        while True:
            # is_final_event will be true if Message
            is_final_event = (
                isinstance(event, Message)
            )
            if is_final_event:
                # queue will be closed if Message
                await self.queue.close(True)
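
To make the first question concrete, here is a minimal sketch using only asyncio and stand-in classes. It is not the a2a SDK code: `Message` is a simplified placeholder, and `StreamEnd`, `consume_message_is_final`, `consume_until_sentinel` and `collect` are hypothetical names invented for illustration. The first consumer mirrors the behavior in the excerpt above (the first Message ends the stream); the second shows one possible alternative, assuming the producer can emit an explicit end-of-stream marker.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Message:
    """Simplified stand-in for the SDK's Message type (assumption)."""
    text: str


@dataclass
class StreamEnd:
    """Hypothetical sentinel marking the end of a message stream."""


async def consume_message_is_final(queue: asyncio.Queue):
    """Mirror the excerpt: the first Message is treated as the final event."""
    while True:
        event = await queue.get()
        yield event
        if isinstance(event, Message):
            break  # corresponds to closing the queue on a Message


async def consume_until_sentinel(queue: asyncio.Queue):
    """Alternative sketch: keep yielding Messages until StreamEnd arrives."""
    while True:
        event = await queue.get()
        if isinstance(event, StreamEnd):
            break
        yield event


async def collect(consumer, chunks):
    """Fill a queue with Message chunks plus a sentinel, then drain it."""
    queue: asyncio.Queue = asyncio.Queue()
    for chunk in chunks:
        queue.put_nowait(Message(chunk))
    queue.put_nowait(StreamEnd())
    return [event.text async for event in consumer(queue)]
```

With the chunks `["Hel", "lo"]`, the first consumer stops after `"Hel"`, while the sentinel-based one delivers both chunks. Whether a sentinel like this fits the SDK's event model is an open question, not something the excerpt confirms.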
  2. EventSourceResponse in sse_starlette provides a sep parameter for choosing among the three official separators (\r, \n and \r\n), but jsonrpc_app constructs EventSourceResponse without passing sep, so the default separator \r\n is always used. Is there any other way to specify the separator?
# a2a/server/apps/jsonrpc/jsonrpc_app.py
class JSONRPCApplication(ABC):
    def _create_response(
        self,
        context: ServerCallContext,
        handler_result: (
            AsyncGenerator[SendStreamingMessageResponse]
            | JSONRPCErrorResponse
            | JSONRPCResponse
        ),
    ) -> Response:
        # ...
        if isinstance(handler_result, AsyncGenerator):

            # not fixing the sep param
            return EventSourceResponse(
                event_generator(handler_result), headers=headers
            )

# sse_starlette/sse.py
class EventSourceResponse(Response):

    DEFAULT_SEPARATOR = "\r\n"

    def __init__(
        self,
        content: ContentStream,
        status_code: int = 200,
        headers: Optional[Mapping[str, str]] = None,
        media_type: str = "text/event-stream",
        background: Optional[BackgroundTask] = None,
        ping: Optional[int] = None,
        sep: Optional[str] = None,
        ping_message_factory: Optional[Callable[[], ServerSentEvent]] = None,
        data_sender_callable: Optional[
            Callable[[], Coroutine[None, None, None]]
        ] = None,
        send_timeout: Optional[float] = None,
        client_close_handler_callable: Optional[
            Callable[[Message], Awaitable[None]]
        ] = None,
    ) -> None:
        
        if sep not in (None, "\r\n", "\r", "\n"):
            raise ValueError(f"sep must be one of: \\r\\n, \\r, \\n, got: {sep}")

        self.sep = sep or self.DEFAULT_SEPARATOR
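
To illustrate what the separator choice changes on the wire, here is a small standalone sketch. It does not use sse_starlette and is not its implementation: `format_sse_event` is a hypothetical helper that only reuses the same validation rule and default as the excerpt above, and it emits `data:` fields only.

```python
from typing import Optional

ALLOWED_SEPARATORS = ("\r\n", "\r", "\n")
DEFAULT_SEPARATOR = "\r\n"


def format_sse_event(data: str, sep: Optional[str] = None) -> str:
    """Serialize one SSE event, ending each line with the chosen separator.

    Hypothetical helper for illustration; the validation mirrors the check
    shown in the EventSourceResponse excerpt.
    """
    if sep not in (None, *ALLOWED_SEPARATORS):
        raise ValueError(f"sep must be one of: \\r\\n, \\r, \\n, got: {sep!r}")
    sep = sep or DEFAULT_SEPARATOR
    # Each line of the payload becomes its own "data:" field.
    lines = [f"data: {line}" for line in data.splitlines() or [""]]
    # A blank line (an extra separator) terminates the event.
    return sep.join(lines) + sep + sep
```

If jsonrpc_app accepted a separator option, it could presumably forward it as the `sep` argument of `EventSourceResponse`, which the signature above already supports. That forwarding is an assumption about a possible change, not existing behavior.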
