Merge branch 'auto-refresh' into docs-intro

This commit is contained in:
Will McGugan
2022-08-18 13:35:25 +01:00
11 changed files with 138 additions and 42 deletions

View File

@@ -7,15 +7,15 @@ from textual.widget import Widget
class Clock(Widget): class Clock(Widget):
def on_mount(self): def on_mount(self):
self.styles.content_align = ("center", "middle") self.styles.content_align = ("center", "middle")
self.set_interval(1, self.refresh) self.auto_refresh = 1.0
def render(self): def render(self):
return datetime.now().strftime("%c") return datetime.now().strftime("%X")
class ClockApp(App): class ClockApp(App):
def on_mount(self): def compose(self):
self.mount(Clock()) yield Clock()
app = ClockApp() app = ClockApp()

View File

@@ -7,7 +7,7 @@ from textual.widget import Widget
class Clock(Widget): class Clock(Widget):
def on_mount(self): def on_mount(self):
self.styles.content_align = ("center", "middle") self.styles.content_align = ("center", "middle")
self.set_interval(1, self.refresh) self.auto_refresh = 1.0
def render(self): def render(self):
return datetime.now().strftime("%c") return datetime.now().strftime("%c")

View File

@@ -20,6 +20,7 @@ from typing import (
TextIO, TextIO,
Type, Type,
TypeVar, TypeVar,
cast,
) )
from weakref import WeakSet, WeakValueDictionary from weakref import WeakSet, WeakValueDictionary
@@ -52,6 +53,7 @@ from .devtools.client import DevtoolsClient, DevtoolsConnectionError, DevtoolsLo
from .devtools.redirect_output import StdoutRedirector from .devtools.redirect_output import StdoutRedirector
from .dom import DOMNode from .dom import DOMNode
from .driver import Driver from .driver import Driver
from .drivers.headless_driver import HeadlessDriver
from .features import FeatureFlag, parse_features from .features import FeatureFlag, parse_features
from .file_monitor import FileMonitor from .file_monitor import FileMonitor
from .geometry import Offset, Region, Size from .geometry import Offset, Region, Size
@@ -539,17 +541,25 @@ class App(Generic[ReturnType], DOMNode):
keys, action, description, show=show, key_display=key_display keys, action, description, show=show, key_display=key_display
) )
def run(self, quit_after: float | None = None) -> ReturnType | None: def run(
self, quit_after: float | None = None, headless: bool = False
) -> ReturnType | None:
"""The main entry point for apps. """The main entry point for apps.
Args: Args:
quit_after (float | None, optional): Quit after a given number of seconds, or None quit_after (float | None, optional): Quit after a given number of seconds, or None
to run forever. Defaults to None. to run forever. Defaults to None.
headless (bool, optional): Run in "headless" mode (don't write to stdout).
Returns: Returns:
ReturnType | None: _description_ ReturnType | None: The return value specified in `App.exit` or None if exit wasn't called.
""" """
if headless:
self.features = cast(
"frozenset[FeatureFlag]", self.features.union({"headless"})
)
async def run_app() -> None: async def run_app() -> None:
if quit_after is not None: if quit_after is not None:
self.set_timer(quit_after, self.shutdown) self.set_timer(quit_after, self.shutdown)
@@ -932,29 +942,41 @@ class App(Generic[ReturnType], DOMNode):
self.set_interval(0.5, self.css_monitor, name="css monitor") self.set_interval(0.5, self.css_monitor, name="css monitor")
self.log("[b green]STARTED[/]", self.css_monitor) self.log("[b green]STARTED[/]", self.css_monitor)
process_messages = super().process_messages
async def run_process_messages():
mount_event = events.Mount(sender=self)
await self.dispatch_message(mount_event)
self.title = self._title
self.stylesheet.update(self)
self.refresh()
await self.animator.start()
await self._ready()
await process_messages()
await self.animator.stop()
await self.close_all()
self._running = True self._running = True
try: try:
load_event = events.Load(sender=self) load_event = events.Load(sender=self)
await self.dispatch_message(load_event) await self.dispatch_message(load_event)
driver = self._driver = self.driver_class(self.console, self) driver: Driver
driver.start_application_mode() driver_class = cast(
driver.enable_bracketed_paste() "type[Driver]",
try: HeadlessDriver if self.is_headless else self.driver_class,
with redirect_stdout(StdoutRedirector(self.devtools, self._log_file)): # type: ignore )
mount_event = events.Mount(sender=self) driver = self._driver = driver_class(self.console, self)
await self.dispatch_message(mount_event)
self.title = self._title driver.start_application_mode()
self.stylesheet.update(self) try:
self.refresh() if self.is_headless:
await self.animator.start() await run_process_messages()
await self._ready() else:
await super().process_messages() with redirect_stdout(StdoutRedirector(self.devtools, self._log_file)): # type: ignore
await self.animator.stop() await run_process_messages()
await self.close_all()
finally: finally:
driver.disable_bracketed_paste()
driver.stop_application_mode() driver.stop_application_mode()
except Exception as error: except Exception as error:
self.on_exception(error) self.on_exception(error)

View File

@@ -20,6 +20,7 @@ from .css.parse import parse_declarations
from .css.styles import Styles, RenderStyles from .css.styles import Styles, RenderStyles
from .css.query import NoMatchingNodesError from .css.query import NoMatchingNodesError
from .message_pump import MessagePump from .message_pump import MessagePump
from ._timer import Timer
if TYPE_CHECKING: if TYPE_CHECKING:
from .app import App from .app import App
@@ -71,8 +72,30 @@ class DOMNode(MessagePump):
# A mapping of class names to Styles set in COMPONENT_CLASSES # A mapping of class names to Styles set in COMPONENT_CLASSES
self._component_styles: dict[str, RenderStyles] = {} self._component_styles: dict[str, RenderStyles] = {}
self._auto_refresh: float | None = None
self._auto_refresh_timer: Timer | None = None
super().__init__() super().__init__()
@property
def auto_refresh(self) -> float | None:
return self._auto_refresh
@auto_refresh.setter
def auto_refresh(self, interval: float | None) -> None:
if self._auto_refresh_timer is not None:
self._auto_refresh_timer.stop_no_wait()
self._auto_refresh_timer = None
if interval is not None:
self._auto_refresh_timer = self.set_interval(
interval, self._automatic_refresh
)
self._auto_refresh = interval
def _automatic_refresh(self) -> None:
"""Perform an automatic refresh (set with auto_refresh property)."""
self.refresh()
def __init_subclass__(cls, inherit_css: bool = True) -> None: def __init_subclass__(cls, inherit_css: bool = True) -> None:
super().__init_subclass__() super().__init_subclass__()
cls._inherit_css = inherit_css cls._inherit_css = inherit_css

View File

@@ -41,18 +41,6 @@ class Driver(ABC):
click_event = events.Click.from_event(event) click_event = events.Click.from_event(event)
self.send_event(click_event) self.send_event(click_event)
def enable_bracketed_paste(self) -> None:
"""Write the ANSI escape code `ESC[?2004h`, which
enables bracketed paste mode."""
self.console.file.write("\x1b[?2004h")
self.console.file.flush()
def disable_bracketed_paste(self) -> None:
"""Write the ANSI escape code `ESC[?2004l`, which
disables bracketed paste mode."""
self.console.file.write("\x1b[?2004l")
self.console.file.flush()
@abstractmethod @abstractmethod
def start_application_mode(self) -> None: def start_application_mode(self) -> None:
... ...

View File

@@ -0,0 +1,17 @@
from __future__ import annotations
from ..driver import Driver
class HeadlessDriver(Driver):
"""A do-nothing driver for testing."""
def start_application_mode(self) -> None:
pass
def disable_input(self) -> None:
pass
def stop_application_mode(self) -> None:
pass

View File

@@ -70,6 +70,14 @@ class LinuxDriver(Driver):
# Note: E.g. lxterminal understands 1000h, but not the urxvt or sgr # Note: E.g. lxterminal understands 1000h, but not the urxvt or sgr
# extensions. # extensions.
def _enable_bracketed_paste(self) -> None:
"""Enable bracketed paste mode."""
self.console.file.write("\x1b[?2004h")
def _disable_bracketed_paste(self) -> None:
"""Disable bracketed paste mode."""
self.console.file.write("\x1b[?2004l")
def _disable_mouse_support(self) -> None: def _disable_mouse_support(self) -> None:
write = self.console.file.write write = self.console.file.write
write("\x1b[?1000l") # write("\x1b[?1000l") #
@@ -130,6 +138,7 @@ class LinuxDriver(Driver):
send_size_event() send_size_event()
self._key_thread.start() self._key_thread.start()
self._request_terminal_sync_mode_support() self._request_terminal_sync_mode_support()
self._enable_bracketed_paste()
def _request_terminal_sync_mode_support(self): def _request_terminal_sync_mode_support(self):
self.console.file.write("\033[?2026$p") self.console.file.write("\033[?2026$p")
@@ -168,6 +177,7 @@ class LinuxDriver(Driver):
pass pass
def stop_application_mode(self) -> None: def stop_application_mode(self) -> None:
self._disable_bracketed_paste()
self.disable_input() self.disable_input()
if self.attrs_before is not None: if self.attrs_before is not None:

View File

@@ -44,6 +44,14 @@ class WindowsDriver(Driver):
write("\x1b[?1006l") write("\x1b[?1006l")
self.console.file.flush() self.console.file.flush()
def _enable_bracketed_paste(self) -> None:
"""Enable bracketed paste mode."""
self.console.file.write("\x1b[?2004h")
def _disable_bracketed_paste(self) -> None:
"""Disable bracketed paste mode."""
self.console.file.write("\x1b[?2004l")
def start_application_mode(self) -> None: def start_application_mode(self) -> None:
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
@@ -54,6 +62,7 @@ class WindowsDriver(Driver):
self._enable_mouse_support() self._enable_mouse_support()
self.console.show_cursor(False) self.console.show_cursor(False)
self.console.file.write("\033[?1003h\n") self.console.file.write("\033[?1003h\n")
self._enable_bracketed_paste()
app = active_app.get() app = active_app.get()
@@ -75,6 +84,7 @@ class WindowsDriver(Driver):
pass pass
def stop_application_mode(self) -> None: def stop_application_mode(self) -> None:
self._disable_bracketed_paste()
self.disable_input() self.disable_input()
if self._restore_console: if self._restore_console:
self._restore_console() self._restore_console()

View File

@@ -25,10 +25,7 @@ def parse_features(features: str) -> frozenset[FeatureFlag]:
""" """
features_set = frozenset( features_set = frozenset(
set( feature.strip().lower() for feature in features.split(",") if feature.strip()
feature.strip().lower() ).intersection(FEATURES)
for feature in features.split(",")
if feature.strip()
).intersection(FEATURES)
)
return cast("frozenset[FeatureFlag]", features_set) return cast("frozenset[FeatureFlag]", features_set)

View File

@@ -36,7 +36,8 @@ from .dom import DOMNode
from .geometry import Offset, Region, Size, Spacing, clamp from .geometry import Offset, Region, Size, Spacing, clamp
from .layouts.vertical import VerticalLayout from .layouts.vertical import VerticalLayout
from .message import Message from .message import Message
from .reactive import Reactive, watch from .reactive import Reactive
from ._timer import Timer
if TYPE_CHECKING: if TYPE_CHECKING:

View File

@@ -0,0 +1,28 @@
from time import time
from textual.app import App
class RefreshApp(App[float]):
def __init__(self):
self.count = 0
super().__init__()
def on_mount(self):
self.start = time()
self.auto_refresh = 0.1
def _automatic_refresh(self):
self.count += 1
if self.count == 3:
self.exit(time() - self.start)
super()._automatic_refresh()
def test_auto_refresh():
app = RefreshApp()
elapsed = app.run(quit_after=1, headless=True)
assert elapsed is not None
# CI can run slower, so we need to give this a bit of margin
assert elapsed >= 0.3 and elapsed < 0.6