diff --git a/src/mcp/server/fastmcp/utilities/func_metadata.py b/src/mcp/server/fastmcp/utilities/func_metadata.py index be2296594..aae65ca5c 100644 --- a/src/mcp/server/fastmcp/utilities/func_metadata.py +++ b/src/mcp/server/fastmcp/utilities/func_metadata.py @@ -1,10 +1,13 @@ import inspect import json from collections.abc import Awaitable, Callable, Sequence +from functools import partial from itertools import chain from types import GenericAlias from typing import Annotated, Any, cast, get_args, get_origin, get_type_hints +import anyio +import anyio.to_thread import pydantic_core from pydantic import ( BaseModel, @@ -92,7 +95,7 @@ async def call_fn_with_arg_validation( if fn_is_async: return await fn(**arguments_parsed_dict) else: - return fn(**arguments_parsed_dict) + return await anyio.to_thread.run_sync(partial(fn, **arguments_parsed_dict)) def convert_result(self, result: Any) -> Any: """Convert the result of a function call to the appropriate format for diff --git a/tests/server/fastmcp/test_func_metadata.py b/tests/server/fastmcp/test_func_metadata.py index 8d3ac6ec5..4485fbdc6 100644 --- a/tests/server/fastmcp/test_func_metadata.py +++ b/tests/server/fastmcp/test_func_metadata.py @@ -3,11 +3,14 @@ # pyright: reportMissingParameterType=false # pyright: reportUnknownArgumentType=false # pyright: reportUnknownLambdaType=false +import threading +import time from collections.abc import Callable from dataclasses import dataclass from typing import Annotated, Any, Final, NamedTuple, TypedDict import annotated_types +import anyio import pytest from dirty_equals import IsPartialDict from pydantic import BaseModel, Field @@ -1189,3 +1192,80 @@ def func_with_metadata() -> Annotated[int, Field(gt=1)]: ... # pragma: no branc assert meta.output_schema is not None assert meta.output_schema["properties"]["result"] == {"exclusiveMinimum": 1, "title": "Result", "type": "integer"} + + +@pytest.mark.anyio +async def test_sync_tool_does_not_block_event_loop() -> None: + """Regression test: sync tools must not run inline on the event loop. + + If sync tools run inline, this test will fail because `fast_probe` + won't get scheduled until after `time.sleep`. + """ + + def slow_sync(x: int) -> int: + time.sleep(0.30) # intentionally blocks if run on event loop + return x + 1 + + md = func_metadata(slow_sync) + + start = anyio.current_time() + fast_probe_elapsed: float | None = None + slow_result: int | None = None + + async def run_slow() -> None: + nonlocal slow_result + # call_fn_with_arg_validation is the execution path used for tools + slow_result = await md.call_fn_with_arg_validation( + fn=slow_sync, + fn_is_async=False, + arguments_to_validate={"x": 1}, + arguments_to_pass_directly=None, + ) + + async def fast_probe() -> None: + nonlocal fast_probe_elapsed + # If event loop is not blocked, this should run "immediately" + await anyio.sleep(0) + fast_probe_elapsed = anyio.current_time() - start + + # Keep the whole test bounded even if something regresses badly + with anyio.fail_after(2): + async with anyio.create_task_group() as tg: + tg.start_soon(run_slow) + tg.start_soon(fast_probe) + + assert slow_result == 2 + + assert fast_probe_elapsed is not None + # If slow_sync blocks the loop, this will be ~0.30s and fail. + # If slow_sync is offloaded, this should typically be a few ms. + assert fast_probe_elapsed < 0.10 + + +@pytest.mark.anyio +async def test_sync_function_runs_in_worker_thread(): + """Ensure synchronous tools are executed in a worker thread via anyio.to_thread.run_sync, + instead of blocking the event loop thread. + """ + + def blocking_sync(delay: float) -> int: # pragma: no cover + # Sleep to simulate a blocking sync tool + time.sleep(delay) + # Return the thread ID we are running on + return threading.get_ident() + + meta = func_metadata(blocking_sync) + + # This is the event loop thread ID (where the test itself is running) + loop_thread_id = threading.get_ident() + + # Call the sync function through call_fn_with_arg_validation + result_thread_id = await meta.call_fn_with_arg_validation( + blocking_sync, + fn_is_async=False, + arguments_to_validate={"delay": 0.01}, + arguments_to_pass_directly=None, + ) + + # The tool should have executed in a different worker thread + assert result_thread_id != loop_thread_id