diff --git a/pyproject.toml b/pyproject.toml index fb8460d6b..896c16292 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -49,7 +49,7 @@ includes = "src" asyncio_mode = "auto" testpaths = ["tests"] markers = [ - "integration_test: marks tests as slow integration tests(deselect with '-m \"not integration_test\"')", + "integration_test: marks tests as slow integration tests (deselect with '-m \"not integration_test\"')", ] [build-system] diff --git a/sandbox/basic.css b/sandbox/basic.css index 3f78e4bca..cd32cce1b 100644 --- a/sandbox/basic.css +++ b/sandbox/basic.css @@ -84,7 +84,7 @@ Tweet { /* border: outer $primary; */ padding: 1; border: wide $panel-darken-2; - overflow-y: auto; + overflow: auto; /* scrollbar-gutter: stable; */ align-horizontal: center; box-sizing: border-box; @@ -114,10 +114,10 @@ TweetHeader { } TweetBody { - width: 100w; + width: 130%; background: $panel; color: $text-panel; - height: auto; + height: auto; padding: 0 1 0 0; } diff --git a/sandbox/scroll_to_widget.py b/sandbox/scroll_to_widget.py index 81b0bf83c..5e75847cd 100644 --- a/sandbox/scroll_to_widget.py +++ b/sandbox/scroll_to_widget.py @@ -35,7 +35,7 @@ class Introduction(Widget): } """ - def render(self) -> RenderableType: + def render(self, styles) -> RenderableType: return Text( "Press keys 0 to 9 to scroll to the Placeholder with that ID.", justify="center", diff --git a/src/textual/_animator.py b/src/textual/_animator.py index c03378953..dbc5d9670 100644 --- a/src/textual/_animator.py +++ b/src/textual/_animator.py @@ -3,14 +3,12 @@ from __future__ import annotations from abc import ABC, abstractmethod import asyncio import sys -from time import monotonic from typing import Any, Callable, TypeVar from dataclasses import dataclass -from . import log +from . import _clock from ._easing import DEFAULT_EASING, EASING -from ._profile import timer from ._timer import Timer from ._types import MessageTarget @@ -139,10 +137,6 @@ class Animator: pause=True, ) - def get_time(self) -> float: - """Get the current wall clock time.""" - return monotonic() - async def start(self) -> None: """Start the animator task.""" @@ -186,10 +180,13 @@ class Animator: raise AttributeError( f"Can't animate attribute {attribute!r} on {obj!r}; attribute does not exist" ) + assert (duration is not None and speed is None) or ( + duration is None and speed is not None + ), "An Animation should have a duration OR a speed" if final_value is ...: final_value = value - start_time = self.get_time() + start_time = self._get_time() animation_key = (id(obj), attribute) @@ -240,9 +237,15 @@ class Animator: if not self._animations: self._timer.pause() else: - animation_time = self.get_time() + animation_time = self._get_time() animation_keys = list(self._animations.keys()) for animation_key in animation_keys: animation = self._animations[animation_key] if animation(animation_time): del self._animations[animation_key] + + def _get_time(self) -> float: + """Get the current wall clock time, via the internal Timer.""" + # N.B. We could remove this method and always call `self._timer.get_time()` internally, + # but it's handy to have in mocking situations + return _clock.get_time_no_wait() diff --git a/src/textual/_clock.py b/src/textual/_clock.py new file mode 100644 index 000000000..4e1ac2224 --- /dev/null +++ b/src/textual/_clock.py @@ -0,0 +1,58 @@ +import asyncio +from time import monotonic + + +""" +A module that serves as the single source of truth for everything time-related in a Textual app. +Having this logic centralised makes it easier to simulate time in integration tests, +by mocking the few functions exposed by this module. +""" + + +# N.B. This class and its singleton instance have to be hidden APIs because we want to be able to mock time, +# even for Python modules that imported functions such as `get_time` *before* we mocked this internal _Clock. +# (so mocking public APIs such as `get_time` wouldn't affect direct references to then that were done during imports) +class _Clock: + async def get_time(self) -> float: + return self.get_time_no_wait() + + def get_time_no_wait(self) -> float: + return monotonic() + + async def sleep(self, seconds: float) -> None: + await asyncio.sleep(seconds) + + +# That's our target for mocking time! :-) +_clock = _Clock() + + +def get_time_no_wait() -> float: + """ + Get the current wall clock time. + + Returns: + float: the value (in fractional seconds) of a monotonic clock, i.e. a clock that cannot go backwards. + """ + return _clock.get_time_no_wait() + + +async def get_time() -> float: + """ + Asynchronous version of `get_time`. Useful in situations where we want asyncio to be + able to "do things" elsewhere right before we fetch the time. + + Returns: + float: the value (in fractional seconds) of a monotonic clock, i.e. a clock that cannot go backwards. + """ + return await _clock.get_time() + + +async def sleep(seconds: float) -> None: + """ + Coroutine that completes after a given time (in seconds). + + Args: + seconds (float): the duration we should wait for before unblocking the awaiter + """ + return await _clock.sleep(seconds) diff --git a/src/textual/_context.py b/src/textual/_context.py index e16817631..04b264d33 100644 --- a/src/textual/_context.py +++ b/src/textual/_context.py @@ -5,4 +5,9 @@ from contextvars import ContextVar if TYPE_CHECKING: from .app import App + +class NoActiveAppError(RuntimeError): + pass + + active_app: ContextVar["App"] = ContextVar("active_app") diff --git a/src/textual/_timer.py b/src/textual/_timer.py index ef4d41a2b..3231fe016 100644 --- a/src/textual/_timer.py +++ b/src/textual/_timer.py @@ -5,17 +5,15 @@ import weakref from asyncio import ( CancelledError, Event, - sleep, Task, ) -from functools import partial -from time import monotonic from typing import Awaitable, Callable, Union from rich.repr import Result, rich_repr from . import events from ._callback import invoke +from . import _clock from ._types import MessageTarget TimerCallback = Union[Callable[[], Awaitable[None]], Callable[[], None]] @@ -109,32 +107,38 @@ class Timer: count = 0 _repeat = self._repeat _interval = self._interval - start = monotonic() + start = _clock.get_time_no_wait() try: while _repeat is None or count <= _repeat: next_timer = start + ((count + 1) * _interval) - if self._skip and next_timer < monotonic(): + now = await _clock.get_time() + if self._skip and next_timer < now: count += 1 continue - wait_time = max(0, next_timer - monotonic()) + now = await _clock.get_time() + wait_time = max(0, next_timer - now) if wait_time: - await sleep(wait_time) - event = events.Timer( - self.sender, - timer=self, - time=next_timer, - count=count, - callback=self._callback, - ) + await _clock.sleep(wait_time) count += 1 try: - if self._callback is not None: - await invoke(self._callback) - else: - await self.target.post_priority_message(event) - + await self._tick(next_timer=next_timer, count=count) except EventTargetGone: break await self._active.wait() except CancelledError: pass + + async def _tick(self, *, next_timer: float, count: int) -> None: + """Triggers the Timer's action: either call its callback, or sends an event to its target""" + if self._callback is not None: + await invoke(self._callback) + else: + event = events.Timer( + self.sender, + timer=self, + time=next_timer, + count=count, + callback=self._callback, + ) + + await self.target.post_priority_message(event) diff --git a/src/textual/box_model.py b/src/textual/box_model.py index f239123fa..c501ff680 100644 --- a/src/textual/box_model.py +++ b/src/textual/box_model.py @@ -96,8 +96,8 @@ def get_box_model( content_height = max(1, content_height) # Get box dimensions by adding gutter - width = content_width + gutter.width - height = content_height + gutter.height - model = BoxModel(Size(width, height), margin) + size = Size(content_width, content_height) + gutter.totals + + model = BoxModel(size, margin) return model diff --git a/src/textual/dom.py b/src/textual/dom.py index 760bc3def..700534398 100644 --- a/src/textual/dom.py +++ b/src/textual/dom.py @@ -9,6 +9,7 @@ from rich.style import Style from rich.text import Text from rich.tree import Tree +from ._context import NoActiveAppError from ._node_list import NodeList from .color import Color from .css._error_tools import friendly_list @@ -463,7 +464,7 @@ class DOMNode(MessagePump): try: self.app.stylesheet.update(self.app, animate=True) self.refresh() - except LookupError: + except NoActiveAppError: pass def remove_class(self, *class_names: str) -> None: @@ -477,7 +478,7 @@ class DOMNode(MessagePump): try: self.app.stylesheet.update(self.app, animate=True) self.refresh() - except LookupError: + except NoActiveAppError: pass def toggle_class(self, *class_names: str) -> None: @@ -491,7 +492,7 @@ class DOMNode(MessagePump): try: self.app.stylesheet.update(self.app, animate=True) self.refresh() - except LookupError: + except NoActiveAppError: pass def has_pseudo_class(self, *class_names: str) -> bool: diff --git a/src/textual/message.py b/src/textual/message.py index 21a35c735..13bd9c537 100644 --- a/src/textual/message.py +++ b/src/textual/message.py @@ -1,10 +1,10 @@ from __future__ import annotations -from time import monotonic from typing import ClassVar import rich.repr +from . import _clock from .case import camel_to_snake from ._types import MessageTarget @@ -39,7 +39,7 @@ class Message: self.sender = sender self.name = camel_to_snake(self.__class__.__name__.replace("Message", "")) - self.time = monotonic() + self.time = _clock.get_time_no_wait() self._forwarded = False self._no_default_action = False self._stop_propagation = False diff --git a/src/textual/message_pump.py b/src/textual/message_pump.py index 02d533e4c..0c04dbbcd 100644 --- a/src/textual/message_pump.py +++ b/src/textual/message_pump.py @@ -12,7 +12,7 @@ from . import events from . import log from ._timer import Timer, TimerCallback from ._callback import invoke -from ._context import active_app +from ._context import active_app, NoActiveAppError from .message import Message from . import messages @@ -74,8 +74,16 @@ class MessagePump: @property def app(self) -> "App": - """Get the current app.""" - return active_app.get() + """ + Get the current app. + + Raises: + NoActiveAppError: if no active app could be found for the current asyncio context + """ + try: + return active_app.get() + except LookupError: + raise NoActiveAppError() @property def is_parent_active(self): @@ -152,7 +160,13 @@ class MessagePump: pause: bool = False, ) -> Timer: timer = Timer( - self, delay, self, name=name, callback=callback, repeat=0, pause=pause + self, + delay, + self, + name=name or f"set_timer#{Timer._timer_count}", + callback=callback, + repeat=0, + pause=pause, ) self._child_tasks.add(timer.start()) return timer @@ -170,7 +184,7 @@ class MessagePump: self, interval, self, - name=name, + name=name or f"set_interval#{Timer._timer_count}", callback=callback, repeat=repeat or None, pause=pause, diff --git a/src/textual/screen.py b/src/textual/screen.py index e784d3a9b..6f9725647 100644 --- a/src/textual/screen.py +++ b/src/textual/screen.py @@ -1,5 +1,7 @@ from __future__ import annotations +import sys + from rich.console import RenderableType import rich.repr from rich.style import Style @@ -12,6 +14,14 @@ from ._compositor import Compositor, MapGeometry from .reactive import Reactive from .widget import Widget +if sys.version_info >= (3, 8): + from typing import Final +else: + from typing_extensions import Final + +# Screen updates will be batched so that they don't happen more often than 20 times per second: +UPDATE_PERIOD: Final = 1 / 20 + @rich.repr.auto class Screen(Widget): @@ -158,7 +168,9 @@ class Screen(Widget): self.check_idle() def on_mount(self, event: events.Mount) -> None: - self._update_timer = self.set_interval(1 / 20, self._on_update, pause=True) + self._update_timer = self.set_interval( + UPDATE_PERIOD, self._on_update, name="screen_update", pause=True + ) async def on_resize(self, event: events.Resize) -> None: self.size_updated(event.size, event.virtual_size, event.container_size) diff --git a/src/textual/widget.py b/src/textual/widget.py index 9afb55831..11d17666c 100644 --- a/src/textual/widget.py +++ b/src/textual/widget.py @@ -184,7 +184,10 @@ class Widget(DOMNode): int: The optimal width of the content. """ if self.is_container: - return self.layout.get_content_width(self, container, viewport) + return ( + self.layout.get_content_width(self, container, viewport) + + self.scrollbar_width + ) cache_key = container.width if self._content_width_cache[0] == cache_key: @@ -214,11 +217,14 @@ class Widget(DOMNode): """ if self.is_container: assert self.layout is not None - height = self.layout.get_content_height( - self, - container, - viewport, - width, + height = ( + self.layout.get_content_height( + self, + container, + viewport, + width, + ) + + self.scrollbar_height ) else: cache_key = width @@ -255,11 +261,21 @@ class Widget(DOMNode): @property def max_scroll_x(self) -> float: - return max(0, self.virtual_size.width - self.container_size.width) + """The maximum value of `scroll_x`.""" + return max( + 0, + self.virtual_size.width - self.container_size.width + self.scrollbar_width, + ) @property def max_scroll_y(self) -> float: - return max(0, self.virtual_size.height - self.container_size.height) + """The maximum value of `scroll_y`.""" + return max( + 0, + self.virtual_size.height + - self.container_size.height + + self.scrollbar_height, + ) @property def vertical_scrollbar(self) -> ScrollBar: @@ -335,12 +351,27 @@ class Widget(DOMNode): tuple[bool, bool]: A tuple of (, ) """ - if self.layout is None: + if not self.is_container: return False, False enabled = self.show_vertical_scrollbar, self.show_horizontal_scrollbar return enabled + @property + def scrollbar_dimensions(self) -> tuple[int, int]: + """Get the size of any scrollbars on the widget""" + return (int(self.show_horizontal_scrollbar), int(self.show_vertical_scrollbar)) + + @property + def scrollbar_width(self) -> int: + """Get the width used by the *vertical* scrollbar.""" + return int(self.show_vertical_scrollbar) + + @property + def scrollbar_height(self) -> int: + """Get the height used by the *horizontal* scrollbar.""" + return int(self.show_horizontal_scrollbar) + def set_dirty(self) -> None: """Set the Widget as 'dirty' (requiring re-render).""" self._dirty_regions.clear() @@ -366,7 +397,6 @@ class Widget(DOMNode): bool: True if the scroll position changed, otherwise False. """ scrolled_x = scrolled_y = False - if animate: # TODO: configure animation speed if x is not None: @@ -915,8 +945,6 @@ class Widget(DOMNode): def on_descendant_focus(self, event: events.DescendantFocus) -> None: self.descendant_has_focus = True - if self.is_container and isinstance(event.sender, Widget): - self.scroll_to_widget(event.sender, animate=True) def on_descendant_blur(self, event: events.DescendantBlur) -> None: self.descendant_has_focus = False diff --git a/tests/test_animator.py b/tests/test_animator.py index 76caf31dd..fd3f7c038 100644 --- a/tests/test_animator.py +++ b/tests/test_animator.py @@ -177,12 +177,12 @@ class MockAnimator(Animator): self._time = 0.0 self._on_animation_frame_called = False - def get_time(self): - return self._time - def on_animation_frame(self): self._on_animation_frame_called = True + def _get_time(self): + return self._time + def test_animator(): @@ -245,10 +245,3 @@ def test_bound_animator(): easing=EASING[DEFAULT_EASING], ) assert animator._animations[(id(animate_test), "foo")] == expected - - -def test_animator_get_time(): - target = Mock() - animator = Animator(target) - assert isinstance(animator.get_time(), float) - assert animator.get_time() <= animator.get_time() diff --git a/tests/test_integration_layout.py b/tests/test_integration_layout.py index 4dc5d01c0..7fde168fa 100644 --- a/tests/test_integration_layout.py +++ b/tests/test_integration_layout.py @@ -1,5 +1,4 @@ from __future__ import annotations -import asyncio from typing import cast, List import pytest @@ -107,7 +106,6 @@ async def test_composition_of_vertical_container_with_children( expected_placeholders_size: tuple[int, int], expected_root_widget_virtual_size: tuple[int, int], expected_placeholders_offset_x: int, - event_loop: asyncio.AbstractEventLoop, ): class VerticalContainer(Widget): CSS = ( diff --git a/tests/test_integration_scrolling.py b/tests/test_integration_scrolling.py index 3ee0cab31..038270954 100644 --- a/tests/test_integration_scrolling.py +++ b/tests/test_integration_scrolling.py @@ -1,17 +1,8 @@ from __future__ import annotations - -import sys from typing import Sequence, cast -if sys.version_info >= (3, 8): - from typing import Literal -else: - from typing_extensions import Literal # pragma: no cover - - import pytest -from sandbox.vertical_container import VerticalContainer from tests.utilities.test_app import AppTest from textual.app import ComposeResult from textual.geometry import Size @@ -21,7 +12,6 @@ from textual.widgets import Placeholder SCREEN_SIZE = Size(100, 30) -@pytest.mark.skip("flaky test") @pytest.mark.asyncio @pytest.mark.integration_test # this is a slow test, we may want to skip them in some contexts @pytest.mark.parametrize( @@ -32,23 +22,19 @@ SCREEN_SIZE = Size(100, 30) "scroll_to_animate", "waiting_duration", "last_screen_expected_placeholder_ids", - "last_screen_expected_out_of_viewport_placeholder_ids", ), ( - [SCREEN_SIZE, 10, None, None, 0.01, (0, 1, 2, 3, 4), "others"], - [SCREEN_SIZE, 10, "placeholder_3", False, 0.01, (0, 1, 2, 3, 4), "others"], - [SCREEN_SIZE, 10, "placeholder_5", False, 0.01, (1, 2, 3, 4, 5), "others"], - [SCREEN_SIZE, 10, "placeholder_7", False, 0.01, (3, 4, 5, 6, 7), "others"], - [SCREEN_SIZE, 10, "placeholder_9", False, 0.01, (5, 6, 7, 8, 9), "others"], + [SCREEN_SIZE, 10, None, None, 0.01, (0, 1, 2, 3, 4)], + [SCREEN_SIZE, 10, "placeholder_3", False, 0.01, (0, 1, 2, 3, 4)], + [SCREEN_SIZE, 10, "placeholder_5", False, 0.01, (1, 2, 3, 4, 5)], + [SCREEN_SIZE, 10, "placeholder_7", False, 0.01, (3, 4, 5, 6, 7)], + [SCREEN_SIZE, 10, "placeholder_9", False, 0.01, (5, 6, 7, 8, 9)], # N.B. Scroll duration is hard-coded to 0.2 in the `scroll_to_widget` method atm # Waiting for this duration should allow us to see the scroll finished: - [SCREEN_SIZE, 10, "placeholder_9", True, 0.21, (5, 6, 7, 8, 9), "others"], + [SCREEN_SIZE, 10, "placeholder_9", True, 0.21, (5, 6, 7, 8, 9)], # After having waited for approximately half of the scrolling duration, we should # see the middle Placeholders as we're scrolling towards the last of them. - # The state of the screen at this "halfway there" timing looks to not be deterministic though, - # depending on the environment - so let's only assert stuff for the middle placeholders - # and the first and last ones, but without being too specific about the others: - [SCREEN_SIZE, 10, "placeholder_9", True, 0.1, (5, 6, 7), (1, 2, 9)], + [SCREEN_SIZE, 10, "placeholder_9", True, 0.1, (4, 5, 6, 7, 8)], ), ) async def test_scroll_to_widget( @@ -58,9 +44,19 @@ async def test_scroll_to_widget( scroll_to_placeholder_id: str | None, waiting_duration: float | None, last_screen_expected_placeholder_ids: Sequence[int], - last_screen_expected_out_of_viewport_placeholder_ids: Sequence[int] - | Literal["others"], ): + class VerticalContainer(Widget): + CSS = """ + VerticalContainer { + layout: vertical; + overflow: hidden auto; + } + VerticalContainer Placeholder { + margin: 1 0; + height: 5; + } + """ + class MyTestApp(AppTest): CSS = """ Placeholder { @@ -78,7 +74,7 @@ async def test_scroll_to_widget( app = MyTestApp(size=screen_size, test_name="scroll_to_widget") - async with app.in_running_state(waiting_duration_post_yield=waiting_duration or 0): + async with app.in_running_state(waiting_duration_after_yield=waiting_duration or 0): if scroll_to_placeholder_id: target_widget_container = cast(Widget, app.query("#root").first()) target_widget = cast( @@ -97,21 +93,21 @@ async def test_scroll_to_widget( # Let's start by checking placeholders that should be visible: for placeholder_id in last_screen_expected_placeholder_ids: - assert ( - placeholders_visibility_by_id[placeholder_id] is True - ), f"Placeholder '{placeholder_id}' should be visible but isn't" + assert placeholders_visibility_by_id[placeholder_id] is True, ( + f"Placeholder '{placeholder_id}' should be visible but isn't" + f" :: placeholders_visibility_by_id={placeholders_visibility_by_id}" + ) # Ok, now for placeholders that should *not* be visible: - if last_screen_expected_out_of_viewport_placeholder_ids == "others": - # We're simply going to check that all the placeholders that are not in - # `last_screen_expected_placeholder_ids` are not on the screen: - last_screen_expected_out_of_viewport_placeholder_ids = sorted( - tuple( - set(range(placeholders_count)) - - set(last_screen_expected_placeholder_ids) - ) + # We're simply going to check that all the placeholders that are not in + # `last_screen_expected_placeholder_ids` are not on the screen: + last_screen_expected_out_of_viewport_placeholder_ids = sorted( + tuple( + set(range(placeholders_count)) - set(last_screen_expected_placeholder_ids) ) + ) for placeholder_id in last_screen_expected_out_of_viewport_placeholder_ids: - assert ( - placeholders_visibility_by_id[placeholder_id] is False - ), f"Placeholder '{placeholder_id}' should not be visible but is" + assert placeholders_visibility_by_id[placeholder_id] is False, ( + f"Placeholder '{placeholder_id}' should not be visible but is" + f" :: placeholders_visibility_by_id={placeholders_visibility_by_id}" + ) diff --git a/tests/utilities/test_app.py b/tests/utilities/test_app.py index 802578842..ec4c0bdfc 100644 --- a/tests/utilities/test_app.py +++ b/tests/utilities/test_app.py @@ -3,16 +3,20 @@ from __future__ import annotations import asyncio import contextlib import io +from math import ceil from pathlib import Path -from typing import AsyncContextManager, cast +from time import monotonic +from typing import AsyncContextManager, cast, ContextManager +from unittest import mock from rich.console import Console from textual import events, errors -from textual.app import App, ReturnType, ComposeResult +from textual._clock import _Clock +from textual.app import App, ComposeResult, WINDOWS +from textual._context import active_app from textual.driver import Driver -from textual.geometry import Size - +from textual.geometry import Size, Region # N.B. These classes would better be named TestApp/TestConsole/TestDriver/etc, # but it makes pytest emit warning as it will try to collect them as classes containing test cases :-/ @@ -61,33 +65,42 @@ class AppTest(App): def in_running_state( self, *, - waiting_duration_after_initialisation: float = 0.1, - waiting_duration_post_yield: float = 0, - ) -> AsyncContextManager: + time_mocking_ticks_granularity_fps: int = 60, # i.e. when moving forward by 1 second we'll do it though 60 ticks + waiting_duration_after_initialisation: float = 1, + waiting_duration_after_yield: float = 0, + ) -> AsyncContextManager[ClockMock]: async def run_app() -> None: await self.process_messages() @contextlib.asynccontextmanager async def get_running_state_context_manager(): - self._set_active() - run_task = asyncio.create_task(run_app()) - timeout_before_yielding_task = asyncio.create_task( - asyncio.sleep(waiting_duration_after_initialisation) - ) - done, pending = await asyncio.wait( - ( - run_task, - timeout_before_yielding_task, - ), - return_when=asyncio.FIRST_COMPLETED, - ) - if run_task in done or run_task not in pending: - raise RuntimeError( - "TestApp is no longer running after its initialization period" - ) - yield - if waiting_duration_post_yield: - await asyncio.sleep(waiting_duration_post_yield) + with mock_textual_timers( + ticks_granularity_fps=time_mocking_ticks_granularity_fps + ) as clock_mock: + run_task = asyncio.create_task(run_app()) + + # We have to do this because `run_app()` is running in its own async task, and our test is going to + # run in this one - so the app must also be the active App in our current context: + self._set_active() + + await clock_mock.advance_clock(waiting_duration_after_initialisation) + # make sure the App has entered its main loop at this stage: + assert self._driver is not None + + await self.force_full_screen_update() + + # And now it's time to pass the torch on to the test function! + # We provide the `move_clock_forward` function to it, + # so it can also do some time-based Textual stuff if it needs to: + yield clock_mock + + await clock_mock.advance_clock(waiting_duration_after_yield) + + # Make sure our screen is up-to-date before exiting the context manager, + # so tests using our `last_display_capture` for example can assert things on a fully refreshed screen: + await self.force_full_screen_update() + + # End of simulated time: we just shut down ourselves: assert not run_task.done() await self.shutdown() @@ -102,27 +115,10 @@ class AppTest(App): """Just a commodity shortcut for `async with app.in_running_state(): pass`, for simple cases""" async with self.in_running_state( waiting_duration_after_initialisation=waiting_duration_after_initialisation, - waiting_duration_post_yield=waiting_duration_before_shutdown, + waiting_duration_after_yield=waiting_duration_before_shutdown, ): pass - def run(self): - raise NotImplementedError( - "Use `async with my_test_app.in_running_state()` rather than `my_test_app.run()`" - ) - - @property - def total_capture(self) -> str | None: - return self.console.file.getvalue() - - @property - def last_display_capture(self) -> str | None: - total_capture = self.total_capture - if not total_capture: - return None - last_display_start_index = total_capture.rindex(CLEAR_SCREEN_SEQUENCE) - return total_capture[last_display_start_index:] - def get_char_at(self, x: int, y: int) -> str: """Get the character at the given cell or empty string @@ -153,6 +149,54 @@ class AppTest(App): return segment.text[0] return "" + async def force_full_screen_update( + self, *, repaint: bool = True, layout: bool = True + ) -> None: + try: + screen = self.screen + except IndexError: + return # the app may not have a screen yet + + # We artificially tell the Compositor that the whole area should be refreshed + screen._compositor._dirty_regions = { + Region(0, 0, screen.size.width, screen.size.height), + } + screen.refresh(repaint=repaint, layout=layout) + # We also have to make sure we have at least one dirty widget, or `screen._on_update()` will early return: + screen._dirty_widgets.add(screen) + screen._on_update() + + await let_asyncio_process_some_events() + + def on_exception(self, error: Exception) -> None: + # In tests we want the errors to be raised, rather than printed to a Console + raise error + + def run(self): + raise NotImplementedError( + "Use `async with my_test_app.in_running_state()` rather than `my_test_app.run()`" + ) + + @property + def active_app(self) -> App | None: + return active_app.get() + + @property + def total_capture(self) -> str | None: + return self.console.file.getvalue() + + @property + def last_display_capture(self) -> str | None: + total_capture = self.total_capture + if not total_capture: + return None + screen_captures = total_capture.split(CLEAR_SCREEN_SEQUENCE) + for single_screen_capture in reversed(screen_captures): + if len(single_screen_capture) > 30: + # let's return the last occurrence of a screen that seem to be properly "fully-paint" + return single_screen_capture + return None + @property def console(self) -> ConsoleTest: return self._console @@ -207,3 +251,111 @@ class DriverTest(Driver): def stop_application_mode(self) -> None: pass + + +# It seems that we have to give _way more_ time to `asyncio` on Windows in order to see our different awaiters +# properly triggered when we pause our own "move clock forward" loop. +# It could be caused by the fact that the time resolution for `asyncio` on this platform seems rather low: +# > The resolution of the monotonic clock on Windows is usually around 15.6 msec. +# > The best resolution is 0.5 msec. +# @link https://docs.python.org/3/library/asyncio-platforms.html: +ASYNCIO_EVENTS_PROCESSING_REQUIRED_PERIOD = 0.025 if WINDOWS else 0.005 + + +async def let_asyncio_process_some_events() -> None: + await asyncio.sleep(ASYNCIO_EVENTS_PROCESSING_REQUIRED_PERIOD) + + +class ClockMock(_Clock): + # To avoid issues with floats we will store the current time as an integer internally. + # Tenths of microseconds should be a good enough granularity: + TIME_RESOLUTION = 10_000_000 + + def __init__( + self, + *, + ticks_granularity_fps: int = 60, + ): + self._ticks_granularity_fps = ticks_granularity_fps + self._single_tick_duration = int(self.TIME_RESOLUTION / ticks_granularity_fps) + self._start_time: int = -1 + self._current_time: int = -1 + # For each call to our `sleep` method we will store an asyncio.Event + # and the time at which we should trigger it: + self._pending_sleep_events: dict[int, list[asyncio.Event]] = {} + + def get_time_no_wait(self) -> float: + if self._current_time == -1: + self._start_clock() + + return self._current_time / self.TIME_RESOLUTION + + async def sleep(self, seconds: float) -> None: + event = asyncio.Event() + internal_waiting_duration = int(seconds * self.TIME_RESOLUTION) + target_event_monotonic_time = self._current_time + internal_waiting_duration + self._pending_sleep_events.setdefault(target_event_monotonic_time, []).append( + event + ) + # Ok, let's wait for this Event + # (which can only be "unlocked" by calls to `advance_clock()`) + await event.wait() + + async def advance_clock(self, seconds: float) -> None: + """ + Artificially advance the Textual clock forward. + + Args: + seconds: for each second we will artificially tick `ticks_granularity_fps` times + """ + if self._current_time == -1: + self._start_clock() + + ticks_count = ceil(seconds * self._ticks_granularity_fps) + activated_timers_count_total = 0 # useful when debugging this code :-) + for tick_counter in range(ticks_count): + self._current_time += self._single_tick_duration + activated_timers_count = self._check_sleep_timers_to_activate() + activated_timers_count_total += activated_timers_count + # Now that we likely unlocked some occurrences of `await sleep(duration)`, + # let's give an opportunity to asyncio-related stuff to happen: + if activated_timers_count: + await let_asyncio_process_some_events() + + await let_asyncio_process_some_events() + + def _start_clock(self) -> None: + # N.B. `start_time` is not actually used, but it is useful to have when we set breakpoints there :-) + self._start_time = self._current_time = int(monotonic() * self.TIME_RESOLUTION) + + def _check_sleep_timers_to_activate(self) -> int: + activated_timers_count = 0 + activated_events_times_to_clear: list[int] = [] + for (monotonic_time, target_events) in self._pending_sleep_events.items(): + if self._current_time < monotonic_time: + continue # not time for you yet, dear awaiter... + # Right, let's release these waiting events! + for event in target_events: + event.set() + activated_timers_count += len(target_events) + # ...and let's mark it for removal: + activated_events_times_to_clear.append(monotonic_time) + + if activated_events_times_to_clear: + for event_time_to_clear in activated_events_times_to_clear: + del self._pending_sleep_events[event_time_to_clear] + + return activated_timers_count + + +def mock_textual_timers( + *, + ticks_granularity_fps: int = 60, +) -> ContextManager[ClockMock]: + @contextlib.contextmanager + def mock_textual_timers_context_manager(): + clock_mock = ClockMock(ticks_granularity_fps=ticks_granularity_fps) + with mock.patch("textual._clock._clock", new=clock_mock): + yield clock_mock + + return mock_textual_timers_context_manager()