added auto pilot

This commit is contained in:
Will McGugan
2022-10-28 21:43:23 +01:00
parent bb80aeb7f9
commit 4370198bf2
4 changed files with 61 additions and 98 deletions

View File

@@ -70,13 +70,13 @@ class DictionaryApp(App):
if __name__ == "__main__": if __name__ == "__main__":
app = DictionaryApp() app = DictionaryApp()
async def run(): from textual.pilot import Pilot
async with app.run_managed() as pilot:
await pilot.press(*"Hello")
await pilot.pause(2)
await pilot.press(*" World!")
await pilot.pause(3)
import asyncio async def auto_pilot(pilot: Pilot) -> None:
await pilot.press(*"Hello")
await pilot.pause(2)
await pilot.press(*" World!")
await pilot.pause(3)
pilot.app.exit()
asyncio.run(run()) app.run(auto_pilot=auto_pilot)

View File

@@ -6,6 +6,7 @@ import shlex
from typing import Iterable from typing import Iterable
from textual.app import App from textual.app import App
from textual.pilot import Pilot
from textual._import_app import import_app from textual._import_app import import_app
@@ -78,23 +79,16 @@ def take_svg_screenshot(
if title is None: if title is None:
title = app.title title = app.title
svg: str = "" svg: str | None = ""
async def run_app(app: App) -> None: async def auto_pilot(pilot: Pilot) -> None:
nonlocal svg app = pilot.app
async with app.run_managed(headless=True) as pilot: await pilot.press(*press)
await pilot.press(*press) svg = app.export_screenshot(title=title)
svg = app.export_screenshot(title=title) app.exit(svg)
asyncio.run(run_app(app)) svg = app.run(headless=True, auto_pilot=auto_pilot)
assert svg is not None
# app.run(
# quit_after=5,
# press=press or ["ctrl+c"],
# headless=True,
# screenshot=True,
# screenshot_title=title,
# )
return svg return svg

View File

@@ -15,7 +15,9 @@ from pathlib import Path, PurePath
from time import perf_counter from time import perf_counter
from typing import ( from typing import (
Any, Any,
Awaitable,
Callable, Callable,
Coroutine,
Generic, Generic,
Iterable, Iterable,
Type, Type,
@@ -62,7 +64,12 @@ from .widget import AwaitMount, Widget
if TYPE_CHECKING: if TYPE_CHECKING:
from .devtools.client import DevtoolsClient from .devtools.client import DevtoolsClient
from .pilot import Pilot
if sys.version_info >= (3, 10):
from typing import TypeAlias
else: # pragma: no cover
from typing_extensions import TypeAlias
PLATFORM = platform.system() PLATFORM = platform.system()
WINDOWS = PLATFORM == "Windows" WINDOWS = PLATFORM == "Windows"
@@ -100,6 +107,10 @@ ComposeResult = Iterable[Widget]
RenderResult = RenderableType RenderResult = RenderableType
# AutopilotCallbackType: TypeAlias = "Callable[[Pilot], Awaitable[None]]"
AutopilotCallbackType: TypeAlias = "Callable[[Pilot], Coroutine[Any, Any, None]]"
class AppError(Exception): class AppError(Exception):
pass pass
@@ -582,13 +593,13 @@ class App(Generic[ReturnType], DOMNode):
keys, action, description, show=show, key_display=key_display keys, action, description, show=show, key_display=key_display
) )
async def _press_keys(self, press: Iterable[str]) -> None: async def _press_keys(self, keys: Iterable[str]) -> None:
"""A task to send key events.""" """A task to send key events."""
app = self app = self
driver = app._driver driver = app._driver
assert driver is not None assert driver is not None
await asyncio.sleep(0.02) await asyncio.sleep(0.02)
for key in press: for key in keys:
if key == "_": if key == "_":
print("(pause 50ms)") print("(pause 50ms)")
await asyncio.sleep(0.05) await asyncio.sleep(0.05)
@@ -622,70 +633,30 @@ class App(Generic[ReturnType], DOMNode):
await asyncio.sleep(0.02) await asyncio.sleep(0.02)
await app._animator.wait_for_idle() await app._animator.wait_for_idle()
@asynccontextmanager
async def run_managed(self, headless: bool = False):
"""Context manager to run the app.
Args:
headless (bool, optional): Enable headless mode. Defaults to False.
"""
from ._pilot import Pilot
ready_event = asyncio.Event()
async def on_ready():
ready_event.set()
async def run_app(app: App) -> None:
await app._process_messages(ready_callback=on_ready, headless=headless)
self._set_active()
asyncio.create_task(run_app(self))
# Wait for the app to be ready
await ready_event.wait()
# Yield a pilot object
yield Pilot(self)
await self._shutdown()
async def run_async( async def run_async(
self, self,
*, *,
headless: bool = False, headless: bool = False,
quit_after: float | None = None, auto_pilot: AutopilotCallbackType,
press: Iterable[str] | None = None, ) -> ReturnType | None:
ready_callback: Callable | None = None, """Run the app asynchronously.
):
"""Run the app asynchronously. This is an async context manager, which shuts down the app on exit.
Example:
async def run_app():
app = MyApp()
async with app.run_async() as result:
print(result)
Args: Args:
quit_after (float | None, optional): Quit after a given number of seconds, or None headless (bool, optional): Run in headless mode (no output). Defaults to False.
to run forever. Defaults to None. auto_pilot (AutopilotCallbackType): An auto pilot coroutine.
headless (bool, optional): Run in "headless" mode (don't write to stdout).
press (str, optional): An iterable of keys to simulate being pressed.
Returns:
ReturnType | None: App return value.
""" """
app = self from .pilot import Pilot
if quit_after is not None: app = self
app.set_timer(quit_after, app.exit)
async def app_ready() -> None: async def app_ready() -> None:
"""Called by the message loop when the app is ready.""" """Called by the message loop when the app is ready."""
if press: if auto_pilot is not None:
asyncio.create_task(self._press_keys(app, press)) pilot = Pilot(app)
if ready_callback is not None: asyncio.create_task(auto_pilot(pilot))
await invoke(ready_callback)
await app._process_messages(ready_callback=app_ready, headless=headless) await app._process_messages(ready_callback=app_ready, headless=headless)
await app._shutdown() await app._shutdown()
@@ -695,37 +666,23 @@ class App(Generic[ReturnType], DOMNode):
self, self,
*, *,
headless: bool = False, headless: bool = False,
quit_after: float | None = None, auto_pilot: AutopilotCallbackType,
press: Iterable[str] | None = None,
screenshot: bool = False,
screenshot_title: str | None = None,
) -> ReturnType | None: ) -> ReturnType | None:
"""The main entry point for apps. """Run the app.
Args: Args:
headless (bool, optional): Run in "headless" mode (don't write to stdout). headless (bool, optional): Run in headless mode (no output). Defaults to False.
quit_after (float | None, optional): Quit after a given number of seconds, or None auto_pilot (AutopilotCallbackType): An auto pilot coroutine.
to run forever. Defaults to None.
press (str, optional): An iterable of keys to simulate being pressed.
screenshot (bool, optional): Take a screenshot after pressing keys (svg data stored in self._screenshot). Defaults to False.
screenshot_title (str | None, optional): Title of screenshot, or None to use App title. Defaults to None.
Returns: Returns:
ReturnType | None: The return value specified in `App.exit` or None if exit wasn't called. ReturnType | None: App return value.
""" """
async def run_app() -> None: async def run_app() -> None:
"""Run the app.""" """Run the app."""
def take_screenshot() -> None:
if screenshot:
self._screenshot = self.export_screenshot(title=screenshot_title)
await self.run_async( await self.run_async(
quit_after=quit_after,
headless=headless, headless=headless,
press=press, auto_pilot=auto_pilot,
ready_callback=take_screenshot,
) )
if _ASYNCIO_GET_EVENT_LOOP_IS_DEPRECATED: if _ASYNCIO_GET_EVENT_LOOP_IS_DEPRECATED:

View File

@@ -1,16 +1,28 @@
from __future__ import annotations from __future__ import annotations
import rich.repr
import asyncio import asyncio
from typing import Iterable, TYPE_CHECKING from typing import TYPE_CHECKING
if TYPE_CHECKING: if TYPE_CHECKING:
from .app import App from .app import App
@rich.repr.auto(angular=True)
class Pilot: class Pilot:
"""Pilot object to drive an app."""
def __init__(self, app: App) -> None: def __init__(self, app: App) -> None:
self._app = app self._app = app
def __rich_repr__(self) -> rich.repr.Result:
yield "app", "self._app"
@property
def app(self) -> App:
return self._app
async def press(self, *keys: str) -> None: async def press(self, *keys: str) -> None:
"""Simulate key-presses. """Simulate key-presses.