diff --git a/docs/examples/introduction/clock01.py b/docs/examples/introduction/clock01.py index 7a525c1ff..af5f33194 100644 --- a/docs/examples/introduction/clock01.py +++ b/docs/examples/introduction/clock01.py @@ -7,15 +7,15 @@ from textual.widget import Widget class Clock(Widget): def on_mount(self): self.styles.content_align = ("center", "middle") - self.set_interval(1, self.refresh) + self.auto_refresh = 1.0 def render(self): - return datetime.now().strftime("%c") + return datetime.now().strftime("%X") class ClockApp(App): - def on_mount(self): - self.mount(Clock()) + def compose(self): + yield Clock() app = ClockApp() diff --git a/docs/examples/introduction/clock02.py b/docs/examples/introduction/clock02.py index 8731d9d8c..3bfd17637 100644 --- a/docs/examples/introduction/clock02.py +++ b/docs/examples/introduction/clock02.py @@ -7,7 +7,7 @@ from textual.widget import Widget class Clock(Widget): def on_mount(self): self.styles.content_align = ("center", "middle") - self.set_interval(1, self.refresh) + self.auto_refresh = 1.0 def render(self): return datetime.now().strftime("%c") diff --git a/src/textual/app.py b/src/textual/app.py index 9913e54d6..5e5decfb9 100644 --- a/src/textual/app.py +++ b/src/textual/app.py @@ -20,6 +20,7 @@ from typing import ( TextIO, Type, TypeVar, + cast, ) from weakref import WeakSet, WeakValueDictionary @@ -52,6 +53,7 @@ from .devtools.client import DevtoolsClient, DevtoolsConnectionError, DevtoolsLo from .devtools.redirect_output import StdoutRedirector from .dom import DOMNode from .driver import Driver +from .drivers.headless_driver import HeadlessDriver from .features import FeatureFlag, parse_features from .file_monitor import FileMonitor from .geometry import Offset, Region, Size @@ -539,17 +541,25 @@ class App(Generic[ReturnType], DOMNode): keys, action, description, show=show, key_display=key_display ) - def run(self, quit_after: float | None = None) -> ReturnType | None: + def run( + self, quit_after: float | None = None, headless: bool = False + ) -> ReturnType | None: """The main entry point for apps. Args: quit_after (float | None, optional): Quit after a given number of seconds, or None to run forever. Defaults to None. + headless (bool, optional): Run in "headless" mode (don't write to stdout). Returns: - ReturnType | None: _description_ + ReturnType | None: The return value specified in `App.exit` or None if exit wasn't called. """ + if headless: + self.features = cast( + "frozenset[FeatureFlag]", self.features.union({"headless"}) + ) + async def run_app() -> None: if quit_after is not None: self.set_timer(quit_after, self.shutdown) @@ -932,29 +942,41 @@ class App(Generic[ReturnType], DOMNode): self.set_interval(0.5, self.css_monitor, name="css monitor") self.log("[b green]STARTED[/]", self.css_monitor) + process_messages = super().process_messages + + async def run_process_messages(): + mount_event = events.Mount(sender=self) + await self.dispatch_message(mount_event) + + self.title = self._title + self.stylesheet.update(self) + self.refresh() + await self.animator.start() + await self._ready() + await process_messages() + await self.animator.stop() + await self.close_all() + self._running = True try: load_event = events.Load(sender=self) await self.dispatch_message(load_event) - driver = self._driver = self.driver_class(self.console, self) - driver.start_application_mode() - driver.enable_bracketed_paste() - try: - with redirect_stdout(StdoutRedirector(self.devtools, self._log_file)): # type: ignore - mount_event = events.Mount(sender=self) - await self.dispatch_message(mount_event) + driver: Driver + driver_class = cast( + "type[Driver]", + HeadlessDriver if self.is_headless else self.driver_class, + ) + driver = self._driver = driver_class(self.console, self) - self.title = self._title - self.stylesheet.update(self) - self.refresh() - await self.animator.start() - await self._ready() - await super().process_messages() - await self.animator.stop() - await self.close_all() + driver.start_application_mode() + try: + if self.is_headless: + await run_process_messages() + else: + with redirect_stdout(StdoutRedirector(self.devtools, self._log_file)): # type: ignore + await run_process_messages() finally: - driver.disable_bracketed_paste() driver.stop_application_mode() except Exception as error: self.on_exception(error) diff --git a/src/textual/dom.py b/src/textual/dom.py index f1a0fc59f..68a14b28c 100644 --- a/src/textual/dom.py +++ b/src/textual/dom.py @@ -20,6 +20,7 @@ from .css.parse import parse_declarations from .css.styles import Styles, RenderStyles from .css.query import NoMatchingNodesError from .message_pump import MessagePump +from ._timer import Timer if TYPE_CHECKING: from .app import App @@ -71,8 +72,30 @@ class DOMNode(MessagePump): # A mapping of class names to Styles set in COMPONENT_CLASSES self._component_styles: dict[str, RenderStyles] = {} + self._auto_refresh: float | None = None + self._auto_refresh_timer: Timer | None = None + super().__init__() + @property + def auto_refresh(self) -> float | None: + return self._auto_refresh + + @auto_refresh.setter + def auto_refresh(self, interval: float | None) -> None: + if self._auto_refresh_timer is not None: + self._auto_refresh_timer.stop_no_wait() + self._auto_refresh_timer = None + if interval is not None: + self._auto_refresh_timer = self.set_interval( + interval, self._automatic_refresh + ) + self._auto_refresh = interval + + def _automatic_refresh(self) -> None: + """Perform an automatic refresh (set with auto_refresh property).""" + self.refresh() + def __init_subclass__(cls, inherit_css: bool = True) -> None: super().__init_subclass__() cls._inherit_css = inherit_css diff --git a/src/textual/driver.py b/src/textual/driver.py index 93e1fdd69..a349540f3 100644 --- a/src/textual/driver.py +++ b/src/textual/driver.py @@ -41,18 +41,6 @@ class Driver(ABC): click_event = events.Click.from_event(event) self.send_event(click_event) - def enable_bracketed_paste(self) -> None: - """Write the ANSI escape code `ESC[?2004h`, which - enables bracketed paste mode.""" - self.console.file.write("\x1b[?2004h") - self.console.file.flush() - - def disable_bracketed_paste(self) -> None: - """Write the ANSI escape code `ESC[?2004l`, which - disables bracketed paste mode.""" - self.console.file.write("\x1b[?2004l") - self.console.file.flush() - @abstractmethod def start_application_mode(self) -> None: ... diff --git a/src/textual/drivers/headless_driver.py b/src/textual/drivers/headless_driver.py new file mode 100644 index 000000000..ede6c4e8a --- /dev/null +++ b/src/textual/drivers/headless_driver.py @@ -0,0 +1,17 @@ +from __future__ import annotations + + +from ..driver import Driver + + +class HeadlessDriver(Driver): + """A do-nothing driver for testing.""" + + def start_application_mode(self) -> None: + pass + + def disable_input(self) -> None: + pass + + def stop_application_mode(self) -> None: + pass diff --git a/src/textual/drivers/linux_driver.py b/src/textual/drivers/linux_driver.py index 95a9dd1ed..11d6067de 100644 --- a/src/textual/drivers/linux_driver.py +++ b/src/textual/drivers/linux_driver.py @@ -70,6 +70,14 @@ class LinuxDriver(Driver): # Note: E.g. lxterminal understands 1000h, but not the urxvt or sgr # extensions. + def _enable_bracketed_paste(self) -> None: + """Enable bracketed paste mode.""" + self.console.file.write("\x1b[?2004h") + + def _disable_bracketed_paste(self) -> None: + """Disable bracketed paste mode.""" + self.console.file.write("\x1b[?2004l") + def _disable_mouse_support(self) -> None: write = self.console.file.write write("\x1b[?1000l") # @@ -130,6 +138,7 @@ class LinuxDriver(Driver): send_size_event() self._key_thread.start() self._request_terminal_sync_mode_support() + self._enable_bracketed_paste() def _request_terminal_sync_mode_support(self): self.console.file.write("\033[?2026$p") @@ -168,6 +177,7 @@ class LinuxDriver(Driver): pass def stop_application_mode(self) -> None: + self._disable_bracketed_paste() self.disable_input() if self.attrs_before is not None: diff --git a/src/textual/drivers/windows_driver.py b/src/textual/drivers/windows_driver.py index e63aad542..fb51973ea 100644 --- a/src/textual/drivers/windows_driver.py +++ b/src/textual/drivers/windows_driver.py @@ -44,6 +44,14 @@ class WindowsDriver(Driver): write("\x1b[?1006l") self.console.file.flush() + def _enable_bracketed_paste(self) -> None: + """Enable bracketed paste mode.""" + self.console.file.write("\x1b[?2004h") + + def _disable_bracketed_paste(self) -> None: + """Disable bracketed paste mode.""" + self.console.file.write("\x1b[?2004l") + def start_application_mode(self) -> None: loop = asyncio.get_running_loop() @@ -54,6 +62,7 @@ class WindowsDriver(Driver): self._enable_mouse_support() self.console.show_cursor(False) self.console.file.write("\033[?1003h\n") + self._enable_bracketed_paste() app = active_app.get() @@ -75,6 +84,7 @@ class WindowsDriver(Driver): pass def stop_application_mode(self) -> None: + self._disable_bracketed_paste() self.disable_input() if self._restore_console: self._restore_console() diff --git a/src/textual/features.py b/src/textual/features.py index bc9b984db..3d7aa6c0a 100644 --- a/src/textual/features.py +++ b/src/textual/features.py @@ -25,10 +25,7 @@ def parse_features(features: str) -> frozenset[FeatureFlag]: """ features_set = frozenset( - set( - feature.strip().lower() - for feature in features.split(",") - if feature.strip() - ).intersection(FEATURES) - ) + feature.strip().lower() for feature in features.split(",") if feature.strip() + ).intersection(FEATURES) + return cast("frozenset[FeatureFlag]", features_set) diff --git a/src/textual/widget.py b/src/textual/widget.py index 1922d9c41..92ef7018b 100644 --- a/src/textual/widget.py +++ b/src/textual/widget.py @@ -36,7 +36,8 @@ from .dom import DOMNode from .geometry import Offset, Region, Size, Spacing, clamp from .layouts.vertical import VerticalLayout from .message import Message -from .reactive import Reactive, watch +from .reactive import Reactive +from ._timer import Timer if TYPE_CHECKING: diff --git a/tests/test_auto_refresh.py b/tests/test_auto_refresh.py new file mode 100644 index 000000000..5d211224c --- /dev/null +++ b/tests/test_auto_refresh.py @@ -0,0 +1,28 @@ +from time import time + +from textual.app import App + + +class RefreshApp(App[float]): + def __init__(self): + self.count = 0 + super().__init__() + + def on_mount(self): + self.start = time() + self.auto_refresh = 0.1 + + def _automatic_refresh(self): + self.count += 1 + if self.count == 3: + self.exit(time() - self.start) + super()._automatic_refresh() + + +def test_auto_refresh(): + app = RefreshApp() + + elapsed = app.run(quit_after=1, headless=True) + assert elapsed is not None + # CI can run slower, so we need to give this a bit of margin + assert elapsed >= 0.3 and elapsed < 0.6