diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index b453036..25c9e30 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -20,7 +20,7 @@ jobs: fail-fast: false matrix: os: [ubuntu-latest, macos-latest, windows-latest] - python-version: [ '3.9', '3.10', '3.11', '3.12', '3.13' ] + python-version: [ '3.10', '3.11', '3.12', '3.13', '3.14' ] steps: - name: Checkout diff --git a/plugins/akernel_task/fps_akernel_task/akernel_task.py b/plugins/akernel_task/fps_akernel_task/akernel_task.py index 4e88019..a0349dd 100644 --- a/plugins/akernel_task/fps_akernel_task/akernel_task.py +++ b/plugins/akernel_task/fps_akernel_task/akernel_task.py @@ -8,8 +8,9 @@ class AKernelTask(_Kernel): - def __init__(self, *args, **kwargs): + def __init__(self, *args, execute_in_thread: bool = False, **kwargs): super().__init__() + self.execute_in_thread = execute_in_thread async def start(self, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED) -> None: async with ( @@ -37,6 +38,7 @@ async def start(self, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED) -> self._to_stdin_receive_stream, self._from_stdin_send_stream, self._from_iopub_send_stream, + execute_in_thread=self.execute_in_thread, ) self.task_group.start_soon(self.kernel.start) task_status.started() diff --git a/plugins/akernel_task/fps_akernel_task/main.py b/plugins/akernel_task/fps_akernel_task/main.py index 7f99c85..bb7aa73 100644 --- a/plugins/akernel_task/fps_akernel_task/main.py +++ b/plugins/akernel_task/fps_akernel_task/main.py @@ -1,5 +1,7 @@ from __future__ import annotations +from functools import partial + from fps import Module from jupyverse_api.kernel import KernelFactory @@ -9,6 +11,20 @@ class AKernelTaskModule(Module): + def __init__(self, *args, execute_in_thread: bool = False, **kwargs) -> None: + super().__init__(*args, **kwargs) + self.execute_in_thread = execute_in_thread + + async def prepare(self) -> None: + kernels = await self.get(Kernels) + kernels.register_kernel_factory( + "akernel", KernelFactory(partial(AKernelTask, execute_in_thread=self.execute_in_thread)) + ) + + +class AKernelThreadTaskModule(Module): async def prepare(self) -> None: kernels = await self.get(Kernels) - kernels.register_kernel_factory("akernel", KernelFactory(AKernelTask)) + kernels.register_kernel_factory( + "akernel-thread", KernelFactory(partial(AKernelTask, execute_in_thread=True)) + ) diff --git a/plugins/akernel_task/pyproject.toml b/plugins/akernel_task/pyproject.toml index f13a5e7..bf5299a 100644 --- a/plugins/akernel_task/pyproject.toml +++ b/plugins/akernel_task/pyproject.toml @@ -29,5 +29,5 @@ text = "MIT" Homepage = "https://github.com/davidbrochart/akernel" [project.entry-points] -"fps.modules" = {akernel_task = "fps_akernel_task.main:AKernelTaskModule"} -"jupyverse.modules" = {akernel_task = "fps_akernel_task.main:AKernelTaskModule"} +"fps.modules" = {akernel_task = "fps_akernel_task.main:AKernelTaskModule", akernelthread_task = "fps_akernel_task.main:AKernelThreadTaskModule"} +"jupyverse.modules" = {akernel_task = "fps_akernel_task.main:AKernelTaskModule", akernelthread_task = "fps_akernel_task.main:AKernelThreadTaskModule"} diff --git a/pyproject.toml b/pyproject.toml index 08437e6..1ccb52a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,16 +8,16 @@ version = "0.3.4" description = "An asynchronous Python Jupyter kernel" readme = "README.md" license = {text = "MIT"} -requires-python = ">=3.9" +requires-python = ">=3.10" authors = [{name = "David Brochart", email = "david.brochart@gmail.com"}] classifiers = [ "Development Status :: 4 - Beta", "Programming Language :: Python", - "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", "Programming Language :: Python :: 3.13", + "Programming Language :: Python :: 3.14", "Programming Language :: Python :: Implementation :: CPython", "Programming Language :: Python :: Implementation :: PyPy", ] @@ -44,8 +44,8 @@ test = [ [project.optional-dependencies] subprocess = [ - "zmq-anyio >=0.3.9,<0.4.0", - "typer >=0.4.0", + "zmq-anyio >=0.3.13,<0.4.0", + "cyclopts >=4.3.0,<5.0.0", ] react = [ @@ -57,7 +57,7 @@ cache = [ ] [project.scripts] -akernel = "akernel.akernel:cli" +akernel = "akernel.akernel:app" [tool.hatch.build.targets.wheel] ignore-vcs = true @@ -65,6 +65,7 @@ packages = ["src/akernel"] [tool.hatch.build.targets.wheel.shared-data] "share/jupyter/kernels/akernel/kernel.json" = "share/jupyter/kernels/akernel/kernel.json" +"share/jupyter/kernels/akernel-thread/kernel.json" = "share/jupyter/kernels/akernel-thread/kernel.json" [project.urls] Homepage = "https://github.com/davidbrochart/akernel" diff --git a/share/jupyter/kernels/akernel-thread/kernel.json b/share/jupyter/kernels/akernel-thread/kernel.json new file mode 100644 index 0000000..6569676 --- /dev/null +++ b/share/jupyter/kernels/akernel-thread/kernel.json @@ -0,0 +1,11 @@ +{ + "argv": [ + "akernel", + "launch", + "--execute-in-thread", + "-f", + "{connection_file}" + ], + "display_name": "Python 3 (akernel-thread)", + "language": "python" +} diff --git a/src/akernel/akernel.py b/src/akernel/akernel.py index 9f8955e..27733ea 100644 --- a/src/akernel/akernel.py +++ b/src/akernel/akernel.py @@ -1,26 +1,30 @@ from __future__ import annotations import json -from typing import Optional, cast +from typing import Annotated, cast -import typer from anyio import create_memory_object_stream, create_task_group, run, sleep_forever +from cyclopts import App, Parameter from .connect import connect_channel from .kernel import Kernel from .kernelspec import write_kernelspec -cli = typer.Typer() +app = App() -@cli.command() +@app.command() def install( - mode: str = typer.Argument("", help="Mode of the kernel to install."), - cache_dir: Optional[str] = typer.Option( - None, "-c", help="Path to the cache directory, if mode is 'cache'." - ), -): + mode: str = "", + cache_dir: str | None = None, +) -> None: + """Install the kernel. + + Args: + mode: Mode of the kernel to install. + cache_dir: Path to the cache directory, if mode is 'cache'. + """ kernel_name = "akernel" if mode: modes = mode.split("-") @@ -31,27 +35,48 @@ def install( write_kernelspec(kernel_name, mode, display_name, cache_dir) -@cli.command() +@app.command() def launch( - mode: str = typer.Argument("", help="Mode of the kernel to launch."), - cache_dir: Optional[str] = typer.Option( - None, "-c", help="Path to the cache directory, if mode is 'cache'." - ), - connection_file: str = typer.Option(..., "-f", help="Path to the connection file."), + connection_file: Annotated[str, Parameter(alias=["-f"])], + mode: str = "", + cache_dir: str | None = None, + execute_in_thread: bool = False, ): - akernel = AKernel(mode, cache_dir, connection_file) + """Launch the kernel. + + Args: + mode: Mode of the kernel to launch. + cache_dir: Path to the cache directory, if mode is 'cache'. + connection_file: Path to the connection file. + execute_in_thread: Whether to run user code in a thread. + """ + akernel = AKernel(mode, cache_dir, connection_file, execute_in_thread) run(akernel.start) class AKernel: - def __init__(self, mode, cache_dir, connection_file): - self._to_shell_send_stream, self._to_shell_receive_stream = create_memory_object_stream[list[bytes]]() - self._from_shell_send_stream, self._from_shell_receive_stream = create_memory_object_stream[list[bytes]]() - self._to_control_send_stream, self._to_control_receive_stream = create_memory_object_stream[list[bytes]]() - self._from_control_send_stream, self._from_control_receive_stream = create_memory_object_stream[list[bytes]]() - self._to_stdin_send_stream, self._to_stdin_receive_stream = create_memory_object_stream[list[bytes]]() - self._from_stdin_send_stream, self._from_stdin_receive_stream = create_memory_object_stream[list[bytes]]() - self._from_iopub_send_stream, self._from_iopub_receive_stream = create_memory_object_stream[list[bytes]](max_buffer_size=float("inf")) + def __init__(self, mode, cache_dir, connection_file, execute_in_thread): + self._to_shell_send_stream, self._to_shell_receive_stream = create_memory_object_stream[ + list[bytes] + ]() + self._from_shell_send_stream, self._from_shell_receive_stream = create_memory_object_stream[ + list[bytes] + ]() + self._to_control_send_stream, self._to_control_receive_stream = create_memory_object_stream[ + list[bytes] + ]() + self._from_control_send_stream, self._from_control_receive_stream = ( + create_memory_object_stream[list[bytes]]() + ) + self._to_stdin_send_stream, self._to_stdin_receive_stream = create_memory_object_stream[ + list[bytes] + ]() + self._from_stdin_send_stream, self._from_stdin_receive_stream = create_memory_object_stream[ + list[bytes] + ]() + self._from_iopub_send_stream, self._from_iopub_receive_stream = create_memory_object_stream[ + list[bytes] + ](max_buffer_size=float("inf")) self.kernel = Kernel( self._to_shell_receive_stream, self._from_shell_send_stream, @@ -62,6 +87,7 @@ def __init__(self, mode, cache_dir, connection_file): self._from_iopub_send_stream, mode, cache_dir, + execute_in_thread, ) with open(connection_file) as f: connection_cfg = json.load(f) @@ -136,4 +162,4 @@ async def from_iopub(self) -> None: if __name__ == "__main__": - cli() + app() diff --git a/src/akernel/code.py b/src/akernel/code.py index 7831682..3e6c30b 100644 --- a/src/akernel/code.py +++ b/src/akernel/code.py @@ -151,7 +151,7 @@ def get_async_code(self) -> str: def get_async_bytecode(self) -> CodeType: tree = self.get_async_ast() - #tree = gast.gast_to_ast(gtree) + # tree = gast.gast_to_ast(gtree) bytecode = compile(tree, filename="", mode="exec") return bytecode diff --git a/src/akernel/kernel.py b/src/akernel/kernel.py index 6be41bd..afb13a2 100644 --- a/src/akernel/kernel.py +++ b/src/akernel/kernel.py @@ -1,14 +1,22 @@ from __future__ import annotations import asyncio -import sys import platform -import json +import sys from io import StringIO from contextvars import ContextVar -from typing import Dict, Any, List, Union, Awaitable, cast - -from anyio import Event, create_task_group, sleep +from typing import Dict, Any, List, Awaitable + +from anyio import ( + Event, + create_memory_object_stream, + create_task_group, + from_thread, + get_cancelled_exc_class, + run, + sleep, + to_thread, +) import comm # type: ignore from akernel.comm.manager import CommManager from akernel.display import display @@ -61,6 +69,7 @@ def __init__( from_iopub_send_stream, kernel_mode: str = "", cache_dir: str | None = None, + execute_in_thread: bool = False, ): global KERNEL KERNEL = self @@ -77,6 +86,7 @@ def __init__( self.kernel_mode = kernel_mode self.cache_dir = cache_dir + self.execute_in_thread = execute_in_thread self._concurrent_kernel = None self._multi_kernel = None self._cache_kernel = None @@ -100,6 +110,10 @@ def __init__( else: self.cache = None self.stop_event = Event() + if execute_in_thread: + import threading + + self._stop_event = threading.Event() self.key = "0" def chain_execution(self) -> None: @@ -137,7 +151,6 @@ def init_kernel(self, namespace): return self.globals[namespace] = { - "ainput": self.ainput, "asyncio": asyncio, "print": self.print, "__task__": self.task, @@ -145,6 +158,10 @@ def init_kernel(self, namespace): "__unchain_execution__": self.unchain_execution, "_": None, } + if self.execute_in_thread: + self.globals[namespace]["input"] = self.input + else: + self.globals[namespace]["ainput"] = self.ainput self.locals[namespace] = {} if self.react_kernel: code = ( @@ -166,8 +183,47 @@ def interrupt(self): task.cancel() self.running_cells = {} + async def thread_execute(self): + while True: + message = await to_thread.run_sync(self.to_thread_queue.get) + if message is None: + return + + parent, idents, async_cell = message + PARENT_VAR.set(parent) + IDENTS_VAR.set(idents) + result = None + exception = None + traceback = None + try: + result = await async_cell() + except get_cancelled_exc_class(): + return + except Exception: + exc_type, exception, traceback = sys.exc_info() + from_thread.run_sync( + self.from_thread_send_stream.send_nowait, (result, exception, traceback) + ) + + async def thread_main(self) -> None: + async with create_task_group() as tg: + tg.start_soon(self.thread_execute) + await to_thread.run_sync(self._stop_event.wait) + tg.cancel_scope.cancel() + + def thread(self) -> None: + run(self.thread_main) + async def start(self) -> None: async with create_task_group() as self.task_group: + if self.execute_in_thread: + from queue import Queue + + self.to_thread_queue = Queue() + self.from_thread_send_stream, self.from_thread_receive_stream = ( + create_memory_object_stream(max_buffer_size=1) + ) + self.task_group.start_soon(to_thread.run_sync, self.thread) msg = self.create_message("status", content={"execution_state": self.execution_state}) to_send = serialize(msg, self.key) await self.from_iopub_send_stream.send(to_send) @@ -186,15 +242,21 @@ async def start(self) -> None: async def _start(self) -> None: self.task_group.start_soon(self.listen_shell) self.task_group.start_soon(self.listen_control) - while True: - # run until shutdown request - await self.stop_event.wait() - if self.restart: - # kernel restart - self.stop_event = Event() - else: - # kernel shutdown - break + try: + while True: + # run until shutdown request + await self.stop_event.wait() + if self.restart: + # kernel restart + self.stop_event = Event() + else: + # kernel shutdown + break + except get_cancelled_exc_class(): + if self.execute_in_thread: + self._stop_event.set() + self.to_thread_queue.put(None) + raise async def listen_shell(self) -> None: while True: @@ -203,7 +265,10 @@ async def listen_shell(self) -> None: # if there was a blocking cell execution, and it was interrupted, # let's ignore all the following execution requests until the pipe # is empty - if self.interrupted and self.to_shell_receive_stream.statistics().tasks_waiting_send == 0: + if ( + self.interrupted + and self.to_shell_receive_stream.statistics().tasks_waiting_send == 0 + ): self.interrupted = False msg_list = await self.to_shell_receive_stream.receive() idents, msg_list = feed_identities(msg_list) @@ -369,21 +434,37 @@ async def execute_and_finish( if self._chain_execution and prev_task_i in self.cell_done: await self.cell_done[prev_task_i].wait() del self.cell_done[prev_task_i] - PARENT_VAR.set(parent) - IDENTS_VAR.set(idents) parent_header = parent["header"] traceback, exception = [], None namespace = self.get_namespace(parent_header) try: - result = await self.locals[namespace][f"__async_cell{task_i}__"]() + if self.execute_in_thread: + self.to_thread_queue.put( + (parent, idents, self.locals[namespace][f"__async_cell{task_i}__"]) + ) + result, exception, traceback = await self.from_thread_receive_stream.receive() + else: + PARENT_VAR.set(parent) + IDENTS_VAR.set(idents) + result = await self.locals[namespace][f"__async_cell{task_i}__"]() except KeyboardInterrupt: self.interrupt() - except Exception as e: - exception = e - traceback = get_traceback(code, e, execution_count) + except Exception: + if self.execute_in_thread: + raise + else: + exc_type, exception, traceback = sys.exc_info() + traceback = get_traceback(code, exception, traceback, execution_count) else: - await self.show_result(result, self.globals[namespace], parent_header) - cache_execution(self.cache, cache_info, self.globals[namespace], result) + if self.execute_in_thread: + if exception is not None: + traceback = get_traceback(code, exception, traceback, execution_count) + else: + await self.show_result(result, self.globals[namespace], parent_header) + cache_execution(self.cache, cache_info, self.globals[namespace], result) + else: + await self.show_result(result, self.globals[namespace], parent_header) + cache_execution(self.cache, cache_info, self.globals[namespace], result) finally: self.cell_done[task_i].set() del self.locals[namespace][f"__async_cell{task_i}__"] @@ -455,6 +536,24 @@ def task(self, cell_i: int = -1) -> Awaitable: return self.running_cells[i] return asyncio.sleep(0) + def input(self, prompt: str = "") -> Any: + parent = PARENT_VAR.get() + idents = IDENTS_VAR.get() + if parent["content"]["allow_stdin"]: + msg = self.create_message( + "input_request", + parent_header=parent["header"], + content={"prompt": prompt, "password": False}, + address=idents[0], + ) + to_send = serialize(msg, self.key) + from_thread.run_sync(self.from_stdin_send_stream.send_nowait, to_send) + msg_list = from_thread.run(self.to_stdin_receive_stream.receive) + idents, msg_list = feed_identities(msg_list) + msg = deserialize(msg_list) + if msg["content"]["status"] == "ok": + return msg["content"]["value"] + async def ainput(self, prompt: str = "") -> Any: parent = PARENT_VAR.get() idents = IDENTS_VAR.get() @@ -466,11 +565,10 @@ async def ainput(self, prompt: str = "") -> Any: address=idents[0], ) to_send = serialize(msg, self.key) - await self.from_stdin_send_stream.send(to_send) + self.from_stdin_send_stream.send_nowait(to_send) msg_list = await self.to_stdin_receive_stream.receive() idents, msg_list = feed_identities(msg_list) msg = deserialize(msg_list) - idents, msg = res if msg["content"]["status"] == "ok": return msg["content"]["value"] @@ -499,7 +597,10 @@ def print( content={"name": name, "text": text}, ) to_send = serialize(msg, self.key) - self.from_iopub_send_stream.send_nowait(to_send) + if self.execute_in_thread: + from_thread.run_sync(self.from_iopub_send_stream.send_nowait, to_send) + else: + self.from_iopub_send_stream.send_nowait(to_send) def create_message( self, diff --git a/src/akernel/message.py b/src/akernel/message.py index 0ca14dc..2078ab9 100644 --- a/src/akernel/message.py +++ b/src/akernel/message.py @@ -29,7 +29,7 @@ def utcnow() -> datetime: def feed_identities(msg_list: list[bytes]) -> tuple[list[bytes], list[bytes]]: idx = msg_list.index(DELIM) idents = msg_list[:idx] or [b"foo"] - return idents , msg_list[idx + 1 :] # noqa + return idents, msg_list[idx + 1 :] # noqa def create_message_header(msg_type: str, session_id: str, msg_cnt: int) -> dict[str, Any]: diff --git a/src/akernel/traceback.py b/src/akernel/traceback.py index 74d43de..407279a 100644 --- a/src/akernel/traceback.py +++ b/src/akernel/traceback.py @@ -1,15 +1,19 @@ from __future__ import annotations -import sys import types +from traceback import extract_tb, format_list from typing import cast from colorama import Fore, Style # type: ignore -def get_traceback(code: str, exception, execution_count: int = 0): - exc_info = sys.exc_info() - tb = cast(types.TracebackType, exc_info[2]) +def get_traceback(code: str, exception, traceback, execution_count: int = 0): + tb_list = extract_tb(traceback) + tb = format_list(tb_list) + # tb += [f"{Fore.RED}{type(exception).__name__}{Style.RESET_ALL}: {exception.args[0]}"] + # return tb + + tb = cast(types.TracebackType, traceback) while True: if tb.tb_next is None: break diff --git a/tests/test_kernel.py b/tests/test_kernel.py index 08929c8..2b2f024 100644 --- a/tests/test_kernel.py +++ b/tests/test_kernel.py @@ -11,7 +11,9 @@ TIMEOUT = 5 -KERNELSPEC_PATH = str(Path(sys.prefix) / "share" / "jupyter" / "kernels" / "akernel" / "kernel.json") +KERNELSPEC_PATH = str( + Path(sys.prefix) / "share" / "jupyter" / "kernels" / "akernel" / "kernel.json" +) ANSI_ESCAPE = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")