From 69c8a2f5114d0d1ca4d39e7c9d2ca12dd0798241 Mon Sep 17 00:00:00 2001 From: Pradeepta Choudhury Date: Sat, 27 Dec 2025 23:35:23 +0530 Subject: [PATCH 1/5] Fixing async issue --- .../server/fastmcp/utilities/func_metadata.py | 5 +- tests/server/fastmcp/test_func_metadata.py | 118 ++++++++++++++++++ 2 files changed, 122 insertions(+), 1 deletion(-) diff --git a/src/mcp/server/fastmcp/utilities/func_metadata.py b/src/mcp/server/fastmcp/utilities/func_metadata.py index fa443d2fcb..a550df2159 100644 --- a/src/mcp/server/fastmcp/utilities/func_metadata.py +++ b/src/mcp/server/fastmcp/utilities/func_metadata.py @@ -5,6 +5,8 @@ from types import GenericAlias from typing import Annotated, Any, cast, get_args, get_origin, get_type_hints +import anyio +import anyio.to_thread import pydantic_core from pydantic import ( BaseModel, @@ -14,6 +16,7 @@ WithJsonSchema, create_model, ) +from functools import partial from pydantic.fields import FieldInfo from pydantic.json_schema import GenerateJsonSchema, JsonSchemaWarningKind from typing_extensions import is_typeddict @@ -92,7 +95,7 @@ async def call_fn_with_arg_validation( if fn_is_async: return await fn(**arguments_parsed_dict) else: - return fn(**arguments_parsed_dict) + await anyio.to_thread.run_sync(partial(fn, **arguments_parsed_dict)) def convert_result(self, result: Any) -> Any: """ diff --git a/tests/server/fastmcp/test_func_metadata.py b/tests/server/fastmcp/test_func_metadata.py index 61e524290e..a665b09b71 100644 --- a/tests/server/fastmcp/test_func_metadata.py +++ b/tests/server/fastmcp/test_func_metadata.py @@ -5,9 +5,12 @@ # pyright: reportUnknownLambdaType=false from collections.abc import Callable from dataclasses import dataclass +import threading +import time from typing import Annotated, Any, Final, TypedDict import annotated_types +import anyio import pytest from dirty_equals import IsPartialDict from pydantic import BaseModel, Field @@ -1202,3 +1205,118 @@ def func_with_metadata() -> Annotated[int, Field(gt=1)]: ... # pragma: no branc assert meta.output_schema is not None assert meta.output_schema["properties"]["result"] == {"exclusiveMinimum": 1, "title": "Result", "type": "integer"} + +@pytest.mark.anyio +async def test_sync_function_runs_in_worker_thread(): + """ + Ensure synchronous tools are executed in a worker thread via anyio.to_thread.run_sync, + instead of blocking the event loop thread. + """ + + def blocking_sync(delay: float) -> int: # pragma: no cover + # Sleep to simulate a blocking sync tool + time.sleep(delay) + # Return the thread ID we are running on + return threading.get_ident() + + meta = func_metadata(blocking_sync) + + # This is the event loop thread ID (where the test itself is running) + loop_thread_id = threading.get_ident() + + # Call the sync function through call_fn_with_arg_validation + result_thread_id = await meta.call_fn_with_arg_validation( + blocking_sync, + fn_is_async=False, + arguments_to_validate={"delay": 0.01}, + arguments_to_pass_directly=None, + ) + + # The tool should have executed in a different worker thread + assert result_thread_id != loop_thread_id + + +@pytest.mark.anyio +async def test_sync_blocking_tool_does_not_block_event_loop(): + """ + A blocking synchronous tool (time.sleep) should not prevent other tasks + on the event loop from running, because it is offloaded to a worker thread. + """ + + def blocking_tool(delay: float) -> str: # pragma: no cover + time.sleep(delay) + return "done" + + meta = func_metadata(blocking_tool) + + flag = {"ran": False} + + async def run_tool(): + result = await meta.call_fn_with_arg_validation( + blocking_tool, + fn_is_async=False, + arguments_to_validate={"delay": 0.2}, + arguments_to_pass_directly=None, + ) + assert result == "done" + + async def concurrent_task(): + # If the event loop is *not* blocked, this will run while the tool sleeps + await anyio.sleep(0.05) + flag["ran"] = True + + async with anyio.create_task_group() as tg: + tg.start_soon(run_tool) + tg.start_soon(concurrent_task) + + # If the sync tool had blocked the event loop, concurrent_task would never + # have executed and flag["ran"] would still be False. + assert flag["ran"] is True + +@pytest.mark.anyio +async def test_sync_tool_does_not_block_event_loop() -> None: + """ + Regression test: sync tools must not run inline on the event loop. + + If sync tools run inline, this test will fail because `fast_probe` + won't get scheduled until after `time.sleep`. + """ + + def slow_sync(x: int) -> int: + time.sleep(0.30) # intentionally blocks if run on event loop + return x + 1 + + md = func_metadata(slow_sync) + + start = anyio.current_time() + fast_probe_elapsed: float | None = None + slow_result: int | None = None + + async def run_slow() -> None: + nonlocal slow_result + # call_fn_with_arg_validation is the execution path used for tools + slow_result = await md.call_fn_with_arg_validation( + fn=slow_sync, + fn_is_async=False, + arguments_to_validate={"x": 1}, + arguments_to_pass_directly=None, + ) + + async def fast_probe() -> None: + nonlocal fast_probe_elapsed + # If event loop is not blocked, this should run "immediately" + await anyio.sleep(0) + fast_probe_elapsed = anyio.current_time() - start + + # Keep the whole test bounded even if something regresses badly + with anyio.fail_after(2): + async with anyio.create_task_group() as tg: + tg.start_soon(run_slow) + tg.start_soon(fast_probe) + + assert slow_result == 2 + + assert fast_probe_elapsed is not None + # If slow_sync blocks the loop, this will be ~0.30s and fail. + # If slow_sync is offloaded, this should typically be a few ms. + assert fast_probe_elapsed < 0.10 \ No newline at end of file From 1117e622dbdf9010cbfde92eef78e95ce7056a0e Mon Sep 17 00:00:00 2001 From: Pradeepta Choudhury Date: Mon, 29 Dec 2025 19:49:58 +0530 Subject: [PATCH 2/5] Fix: offload sync tool execution to AnyIO worker thread to prevent event-loop blocking --- .../server/fastmcp/utilities/func_metadata.py | 4 +- tests/server/fastmcp/test_func_metadata.py | 102 ++++++------------ 2 files changed, 35 insertions(+), 71 deletions(-) diff --git a/src/mcp/server/fastmcp/utilities/func_metadata.py b/src/mcp/server/fastmcp/utilities/func_metadata.py index a550df2159..dd6346a8ca 100644 --- a/src/mcp/server/fastmcp/utilities/func_metadata.py +++ b/src/mcp/server/fastmcp/utilities/func_metadata.py @@ -1,6 +1,7 @@ import inspect import json from collections.abc import Awaitable, Callable, Sequence +from functools import partial from itertools import chain from types import GenericAlias from typing import Annotated, Any, cast, get_args, get_origin, get_type_hints @@ -16,7 +17,6 @@ WithJsonSchema, create_model, ) -from functools import partial from pydantic.fields import FieldInfo from pydantic.json_schema import GenerateJsonSchema, JsonSchemaWarningKind from typing_extensions import is_typeddict @@ -95,7 +95,7 @@ async def call_fn_with_arg_validation( if fn_is_async: return await fn(**arguments_parsed_dict) else: - await anyio.to_thread.run_sync(partial(fn, **arguments_parsed_dict)) + return await anyio.to_thread.run_sync(partial(fn, **arguments_parsed_dict)) def convert_result(self, result: Any) -> Any: """ diff --git a/tests/server/fastmcp/test_func_metadata.py b/tests/server/fastmcp/test_func_metadata.py index a665b09b71..d35df4e4cb 100644 --- a/tests/server/fastmcp/test_func_metadata.py +++ b/tests/server/fastmcp/test_func_metadata.py @@ -3,10 +3,10 @@ # pyright: reportMissingParameterType=false # pyright: reportUnknownArgumentType=false # pyright: reportUnknownLambdaType=false -from collections.abc import Callable -from dataclasses import dataclass import threading import time +from collections.abc import Callable +from dataclasses import dataclass from typing import Annotated, Any, Final, TypedDict import annotated_types @@ -1206,72 +1206,6 @@ def func_with_metadata() -> Annotated[int, Field(gt=1)]: ... # pragma: no branc assert meta.output_schema is not None assert meta.output_schema["properties"]["result"] == {"exclusiveMinimum": 1, "title": "Result", "type": "integer"} -@pytest.mark.anyio -async def test_sync_function_runs_in_worker_thread(): - """ - Ensure synchronous tools are executed in a worker thread via anyio.to_thread.run_sync, - instead of blocking the event loop thread. - """ - - def blocking_sync(delay: float) -> int: # pragma: no cover - # Sleep to simulate a blocking sync tool - time.sleep(delay) - # Return the thread ID we are running on - return threading.get_ident() - - meta = func_metadata(blocking_sync) - - # This is the event loop thread ID (where the test itself is running) - loop_thread_id = threading.get_ident() - - # Call the sync function through call_fn_with_arg_validation - result_thread_id = await meta.call_fn_with_arg_validation( - blocking_sync, - fn_is_async=False, - arguments_to_validate={"delay": 0.01}, - arguments_to_pass_directly=None, - ) - - # The tool should have executed in a different worker thread - assert result_thread_id != loop_thread_id - - -@pytest.mark.anyio -async def test_sync_blocking_tool_does_not_block_event_loop(): - """ - A blocking synchronous tool (time.sleep) should not prevent other tasks - on the event loop from running, because it is offloaded to a worker thread. - """ - - def blocking_tool(delay: float) -> str: # pragma: no cover - time.sleep(delay) - return "done" - - meta = func_metadata(blocking_tool) - - flag = {"ran": False} - - async def run_tool(): - result = await meta.call_fn_with_arg_validation( - blocking_tool, - fn_is_async=False, - arguments_to_validate={"delay": 0.2}, - arguments_to_pass_directly=None, - ) - assert result == "done" - - async def concurrent_task(): - # If the event loop is *not* blocked, this will run while the tool sleeps - await anyio.sleep(0.05) - flag["ran"] = True - - async with anyio.create_task_group() as tg: - tg.start_soon(run_tool) - tg.start_soon(concurrent_task) - - # If the sync tool had blocked the event loop, concurrent_task would never - # have executed and flag["ran"] would still be False. - assert flag["ran"] is True @pytest.mark.anyio async def test_sync_tool_does_not_block_event_loop() -> None: @@ -1319,4 +1253,34 @@ async def fast_probe() -> None: assert fast_probe_elapsed is not None # If slow_sync blocks the loop, this will be ~0.30s and fail. # If slow_sync is offloaded, this should typically be a few ms. - assert fast_probe_elapsed < 0.10 \ No newline at end of file + assert fast_probe_elapsed < 0.10 + + +@pytest.mark.anyio +async def test_sync_function_runs_in_worker_thread(): + """ + Ensure synchronous tools are executed in a worker thread via anyio.to_thread.run_sync, + instead of blocking the event loop thread. + """ + + def blocking_sync(delay: float) -> int: # pragma: no cover + # Sleep to simulate a blocking sync tool + time.sleep(delay) + # Return the thread ID we are running on + return threading.get_ident() + + meta = func_metadata(blocking_sync) + + # This is the event loop thread ID (where the test itself is running) + loop_thread_id = threading.get_ident() + + # Call the sync function through call_fn_with_arg_validation + result_thread_id = await meta.call_fn_with_arg_validation( + blocking_sync, + fn_is_async=False, + arguments_to_validate={"delay": 0.01}, + arguments_to_pass_directly=None, + ) + + # The tool should have executed in a different worker thread + assert result_thread_id != loop_thread_id From 2eed9cece8be1983d7da14320e89949ab688e48a Mon Sep 17 00:00:00 2001 From: Pradeepta Choudhury Date: Sat, 17 Jan 2026 18:08:38 +0530 Subject: [PATCH 3/5] ruff formatter fix --- tests/server/fastmcp/test_concurrency.py | 29 ++++++++++++++++++++++ tests/server/fastmcp/test_func_metadata.py | 6 ++--- 2 files changed, 31 insertions(+), 4 deletions(-) create mode 100644 tests/server/fastmcp/test_concurrency.py diff --git a/tests/server/fastmcp/test_concurrency.py b/tests/server/fastmcp/test_concurrency.py new file mode 100644 index 0000000000..ea71606d17 --- /dev/null +++ b/tests/server/fastmcp/test_concurrency.py @@ -0,0 +1,29 @@ +import asyncio +import time + + +def fetch_sync(name: str, delay: float) -> str: + time.sleep(delay) + return f"{name} done" + +async def fetch_async(name: str, delay: float) -> str: + await asyncio.sleep(delay) # yields control to event loop + return f"{name} done" + +async def main(): + start = time.perf_counter() + + + result1 = await fetch_async("A", 1) + + result2 = fetch_sync("B", 1) + # result2 = await fetch_async("B", 1) + + result3 = await fetch_async("C", 1) + + # print(f"{result1} | {result2} | {result3} | {result4}") + print(f"{result1} | {result2} | {result3}") + print("elapsed:", round(time.perf_counter() - start, 2), "seconds") + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/tests/server/fastmcp/test_func_metadata.py b/tests/server/fastmcp/test_func_metadata.py index 4a53c32012..4485fbdc6e 100644 --- a/tests/server/fastmcp/test_func_metadata.py +++ b/tests/server/fastmcp/test_func_metadata.py @@ -1196,8 +1196,7 @@ def func_with_metadata() -> Annotated[int, Field(gt=1)]: ... # pragma: no branc @pytest.mark.anyio async def test_sync_tool_does_not_block_event_loop() -> None: - """ - Regression test: sync tools must not run inline on the event loop. + """Regression test: sync tools must not run inline on the event loop. If sync tools run inline, this test will fail because `fast_probe` won't get scheduled until after `time.sleep`. @@ -1245,8 +1244,7 @@ async def fast_probe() -> None: @pytest.mark.anyio async def test_sync_function_runs_in_worker_thread(): - """ - Ensure synchronous tools are executed in a worker thread via anyio.to_thread.run_sync, + """Ensure synchronous tools are executed in a worker thread via anyio.to_thread.run_sync, instead of blocking the event loop thread. """ From 3b0aa12e5411a22965654a59bd4aad90910cc1bd Mon Sep 17 00:00:00 2001 From: Pradeepta Choudhury Date: Sat, 17 Jan 2026 18:10:31 +0530 Subject: [PATCH 4/5] fixing EOL formatter --- tests/server/fastmcp/test_concurrency.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/server/fastmcp/test_concurrency.py b/tests/server/fastmcp/test_concurrency.py index ea71606d17..e20a7f2d41 100644 --- a/tests/server/fastmcp/test_concurrency.py +++ b/tests/server/fastmcp/test_concurrency.py @@ -26,4 +26,4 @@ async def main(): print("elapsed:", round(time.perf_counter() - start, 2), "seconds") if __name__ == "__main__": - asyncio.run(main()) \ No newline at end of file + asyncio.run(main()) From 6fd02ca0fff7c617a55ff26bbf8b392809a59b03 Mon Sep 17 00:00:00 2001 From: Pradeepta Choudhury Date: Sat, 17 Jan 2026 18:13:41 +0530 Subject: [PATCH 5/5] removing test concurrency file - not needed --- tests/server/fastmcp/test_concurrency.py | 29 ------------------------ 1 file changed, 29 deletions(-) delete mode 100644 tests/server/fastmcp/test_concurrency.py diff --git a/tests/server/fastmcp/test_concurrency.py b/tests/server/fastmcp/test_concurrency.py deleted file mode 100644 index e20a7f2d41..0000000000 --- a/tests/server/fastmcp/test_concurrency.py +++ /dev/null @@ -1,29 +0,0 @@ -import asyncio -import time - - -def fetch_sync(name: str, delay: float) -> str: - time.sleep(delay) - return f"{name} done" - -async def fetch_async(name: str, delay: float) -> str: - await asyncio.sleep(delay) # yields control to event loop - return f"{name} done" - -async def main(): - start = time.perf_counter() - - - result1 = await fetch_async("A", 1) - - result2 = fetch_sync("B", 1) - # result2 = await fetch_async("B", 1) - - result3 = await fetch_async("C", 1) - - # print(f"{result1} | {result2} | {result3} | {result4}") - print(f"{result1} | {result2} | {result3}") - print("elapsed:", round(time.perf_counter() - start, 2), "seconds") - -if __name__ == "__main__": - asyncio.run(main())