tests for auto refresh

This commit is contained in:
Will McGugan
2022-08-18 10:20:03 +01:00
parent f81f4484ca
commit c717dec982
8 changed files with 82 additions and 45 deletions

View File

@@ -7,7 +7,7 @@ from textual.widget import Widget
class Clock(Widget): class Clock(Widget):
def on_mount(self): def on_mount(self):
self.styles.content_align = ("center", "middle") self.styles.content_align = ("center", "middle")
self.set_interval(1, self.refresh) self.auto_refresh = 1.0
def render(self): def render(self):
return datetime.now().strftime("%c") return datetime.now().strftime("%c")

View File

@@ -7,15 +7,15 @@ from textual.widget import Widget
class Clock(Widget): class Clock(Widget):
def on_mount(self): def on_mount(self):
self.styles.content_align = ("center", "middle") self.styles.content_align = ("center", "middle")
self.set_interval(1, self.refresh) self.auto_refresh = 1.0
def render(self): def render(self):
return datetime.now().strftime("%c") return datetime.now().strftime("%X")
class ClockApp(App): class ClockApp(App):
def on_mount(self): def compose(self):
self.mount(Clock()) yield Clock()
app = ClockApp() app = ClockApp()

View File

@@ -47,6 +47,7 @@ from ._event_broker import NoHandler, extract_handler_actions
from .binding import Bindings, NoBinding from .binding import Bindings, NoBinding
from .css.query import NoMatchingNodesError from .css.query import NoMatchingNodesError
from .css.stylesheet import Stylesheet from .css.stylesheet import Stylesheet
from .drivers.headless_driver import HeadlessDriver
from .design import ColorSystem from .design import ColorSystem
from .devtools.client import DevtoolsClient, DevtoolsConnectionError, DevtoolsLog from .devtools.client import DevtoolsClient, DevtoolsConnectionError, DevtoolsLog
from .devtools.redirect_output import StdoutRedirector from .devtools.redirect_output import StdoutRedirector
@@ -162,7 +163,7 @@ class App(Generic[ReturnType], DOMNode):
_init_uvloop() _init_uvloop()
super().__init__() super().__init__()
self.features: frozenset[FeatureFlag] = parse_features(os.getenv("TEXTUAL", "")) self.features: set[FeatureFlag] = parse_features(os.getenv("TEXTUAL", ""))
self.console = Console( self.console = Console(
file=(open(os.devnull, "wt") if self.is_headless else sys.__stdout__), file=(open(os.devnull, "wt") if self.is_headless else sys.__stdout__),
@@ -539,17 +540,23 @@ class App(Generic[ReturnType], DOMNode):
keys, action, description, show=show, key_display=key_display keys, action, description, show=show, key_display=key_display
) )
def run(self, quit_after: float | None = None) -> ReturnType | None: def run(
self, quit_after: float | None = None, headless: bool = False
) -> ReturnType | None:
"""The main entry point for apps. """The main entry point for apps.
Args: Args:
quit_after (float | None, optional): Quit after a given number of seconds, or None quit_after (float | None, optional): Quit after a given number of seconds, or None
to run forever. Defaults to None. to run forever. Defaults to None.
headless (bool, optional): Run in "headless" mode (don't write to stdout).
Returns: Returns:
ReturnType | None: _description_ ReturnType | None: _description_
""" """
if headless:
self.features.add("headless")
async def run_app() -> None: async def run_app() -> None:
if quit_after is not None: if quit_after is not None:
self.set_timer(quit_after, self.shutdown) self.set_timer(quit_after, self.shutdown)
@@ -932,29 +939,40 @@ class App(Generic[ReturnType], DOMNode):
self.set_interval(0.5, self.css_monitor, name="css monitor") self.set_interval(0.5, self.css_monitor, name="css monitor")
self.log("[b green]STARTED[/]", self.css_monitor) self.log("[b green]STARTED[/]", self.css_monitor)
process_messages = super().process_messages
async def run_process_messages():
mount_event = events.Mount(sender=self)
await self.dispatch_message(mount_event)
self.title = self._title
self.stylesheet.update(self)
self.refresh()
await self.animator.start()
await self._ready()
await process_messages()
await self.animator.stop()
await self.close_all()
self._running = True self._running = True
try: try:
load_event = events.Load(sender=self) load_event = events.Load(sender=self)
await self.dispatch_message(load_event) await self.dispatch_message(load_event)
driver = self._driver = self.driver_class(self.console, self) driver: Driver
driver.start_application_mode() if self.is_headless:
driver.enable_bracketed_paste() driver = self._driver = HeadlessDriver(self.console, self)
try: else:
with redirect_stdout(StdoutRedirector(self.devtools, self._log_file)): # type: ignore driver = self._driver = self.driver_class(self.console, self)
mount_event = events.Mount(sender=self)
await self.dispatch_message(mount_event)
self.title = self._title driver.start_application_mode()
self.stylesheet.update(self) try:
self.refresh() if self.is_headless:
await self.animator.start() await run_process_messages()
await self._ready() else:
await super().process_messages() with redirect_stdout(StdoutRedirector(self.devtools, self._log_file)): # type: ignore
await self.animator.stop() await run_process_messages()
await self.close_all()
finally: finally:
driver.disable_bracketed_paste()
driver.stop_application_mode() driver.stop_application_mode()
except Exception as error: except Exception as error:
self.on_exception(error) self.on_exception(error)

View File

@@ -20,6 +20,7 @@ from .css.parse import parse_declarations
from .css.styles import Styles, RenderStyles from .css.styles import Styles, RenderStyles
from .css.query import NoMatchingNodesError from .css.query import NoMatchingNodesError
from .message_pump import MessagePump from .message_pump import MessagePump
from ._timer import Timer
if TYPE_CHECKING: if TYPE_CHECKING:
from .app import App from .app import App
@@ -71,8 +72,30 @@ class DOMNode(MessagePump):
# A mapping of class names to Styles set in COMPONENT_CLASSES # A mapping of class names to Styles set in COMPONENT_CLASSES
self._component_styles: dict[str, RenderStyles] = {} self._component_styles: dict[str, RenderStyles] = {}
self._auto_refresh: float | None = None
self._auto_refresh_timer: Timer | None = None
super().__init__() super().__init__()
@property
def auto_refresh(self) -> float | None:
return self._auto_refresh
@auto_refresh.setter
def auto_refresh(self, interval: float | None) -> None:
if self._auto_refresh_timer is not None:
self._auto_refresh_timer.stop_no_wait()
self._auto_refresh_timer = None
if interval is not None:
self._auto_refresh_timer = self.set_interval(
interval, self._automatic_refresh
)
self._auto_refresh = interval
def _automatic_refresh(self) -> None:
"""Perform an automatic refresh (set with auto_refresh property)."""
self.refresh()
def __init_subclass__(cls, inherit_css: bool = True) -> None: def __init_subclass__(cls, inherit_css: bool = True) -> None:
super().__init_subclass__() super().__init_subclass__()
cls._inherit_css = inherit_css cls._inherit_css = inherit_css

View File

@@ -41,18 +41,6 @@ class Driver(ABC):
click_event = events.Click.from_event(event) click_event = events.Click.from_event(event)
self.send_event(click_event) self.send_event(click_event)
def enable_bracketed_paste(self) -> None:
"""Write the ANSI escape code `ESC[?2004h`, which
enables bracketed paste mode."""
self.console.file.write("\x1b[?2004h")
self.console.file.flush()
def disable_bracketed_paste(self) -> None:
"""Write the ANSI escape code `ESC[?2004l`, which
disables bracketed paste mode."""
self.console.file.write("\x1b[?2004l")
self.console.file.flush()
@abstractmethod @abstractmethod
def start_application_mode(self) -> None: def start_application_mode(self) -> None:
... ...

View File

@@ -70,6 +70,14 @@ class LinuxDriver(Driver):
# Note: E.g. lxterminal understands 1000h, but not the urxvt or sgr # Note: E.g. lxterminal understands 1000h, but not the urxvt or sgr
# extensions. # extensions.
def _enable_bracketed_paste(self) -> None:
"""Enable bracketed paste mode."""
self.console.file.write("\x1b[?2004h")
def _disable_bracketed_paste(self) -> None:
"""Disable bracketed pasgte mode."""
self.console.file.write("\x1b[?2004l")
def _disable_mouse_support(self) -> None: def _disable_mouse_support(self) -> None:
write = self.console.file.write write = self.console.file.write
write("\x1b[?1000l") # write("\x1b[?1000l") #
@@ -130,6 +138,7 @@ class LinuxDriver(Driver):
send_size_event() send_size_event()
self._key_thread.start() self._key_thread.start()
self._request_terminal_sync_mode_support() self._request_terminal_sync_mode_support()
self._enable_bracketed_paste()
def _request_terminal_sync_mode_support(self): def _request_terminal_sync_mode_support(self):
self.console.file.write("\033[?2026$p") self.console.file.write("\033[?2026$p")
@@ -168,6 +177,7 @@ class LinuxDriver(Driver):
pass pass
def stop_application_mode(self) -> None: def stop_application_mode(self) -> None:
self._disable_bracketed_paste()
self.disable_input() self.disable_input()
if self.attrs_before is not None: if self.attrs_before is not None:

View File

@@ -14,7 +14,7 @@ FEATURES: Final = {"devtools", "debug", "headless"}
FeatureFlag = Literal["devtools", "debug", "headless"] FeatureFlag = Literal["devtools", "debug", "headless"]
def parse_features(features: str) -> frozenset[FeatureFlag]: def parse_features(features: str) -> set[FeatureFlag]:
"""Parse features env var """Parse features env var
Args: Args:
@@ -24,11 +24,8 @@ def parse_features(features: str) -> frozenset[FeatureFlag]:
frozenset[FeatureFlag]: A frozen set of known features. frozenset[FeatureFlag]: A frozen set of known features.
""" """
features_set = frozenset( features_set = set(
set( feature.strip().lower() for feature in features.split(",") if feature.strip()
feature.strip().lower() ).intersection(FEATURES)
for feature in features.split(",")
if feature.strip() return cast("set[FeatureFlag]", features_set)
).intersection(FEATURES)
)
return cast("frozenset[FeatureFlag]", features_set)

View File

@@ -36,7 +36,8 @@ from .dom import DOMNode
from .geometry import Offset, Region, Size, Spacing, clamp from .geometry import Offset, Region, Size, Spacing, clamp
from .layouts.vertical import VerticalLayout from .layouts.vertical import VerticalLayout
from .message import Message from .message import Message
from .reactive import Reactive, watch from .reactive import Reactive
from ._timer import Timer
if TYPE_CHECKING: if TYPE_CHECKING: