Merge branch 'css' into docs-update

This commit is contained in:
Will McGugan
2022-05-23 18:09:43 +01:00
committed by GitHub
17 changed files with 417 additions and 153 deletions

View File

@@ -49,7 +49,7 @@ includes = "src"
asyncio_mode = "auto"
testpaths = ["tests"]
markers = [
"integration_test: marks tests as slow integration tests(deselect with '-m \"not integration_test\"')",
"integration_test: marks tests as slow integration tests (deselect with '-m \"not integration_test\"')",
]
[build-system]

View File

@@ -84,7 +84,7 @@ Tweet {
/* border: outer $primary; */
padding: 1;
border: wide $panel-darken-2;
overflow-y: auto;
overflow: auto;
/* scrollbar-gutter: stable; */
align-horizontal: center;
box-sizing: border-box;
@@ -114,7 +114,7 @@ TweetHeader {
}
TweetBody {
width: 100w;
width: 130%;
background: $panel;
color: $text-panel;
height: auto;

View File

@@ -35,7 +35,7 @@ class Introduction(Widget):
}
"""
def render(self) -> RenderableType:
def render(self, styles) -> RenderableType:
return Text(
"Press keys 0 to 9 to scroll to the Placeholder with that ID.",
justify="center",

View File

@@ -3,14 +3,12 @@ from __future__ import annotations
from abc import ABC, abstractmethod
import asyncio
import sys
from time import monotonic
from typing import Any, Callable, TypeVar
from dataclasses import dataclass
from . import log
from . import _clock
from ._easing import DEFAULT_EASING, EASING
from ._profile import timer
from ._timer import Timer
from ._types import MessageTarget
@@ -139,10 +137,6 @@ class Animator:
pause=True,
)
def get_time(self) -> float:
"""Get the current wall clock time."""
return monotonic()
async def start(self) -> None:
"""Start the animator task."""
@@ -186,10 +180,13 @@ class Animator:
raise AttributeError(
f"Can't animate attribute {attribute!r} on {obj!r}; attribute does not exist"
)
assert (duration is not None and speed is None) or (
duration is None and speed is not None
), "An Animation should have a duration OR a speed"
if final_value is ...:
final_value = value
start_time = self.get_time()
start_time = self._get_time()
animation_key = (id(obj), attribute)
@@ -240,9 +237,15 @@ class Animator:
if not self._animations:
self._timer.pause()
else:
animation_time = self.get_time()
animation_time = self._get_time()
animation_keys = list(self._animations.keys())
for animation_key in animation_keys:
animation = self._animations[animation_key]
if animation(animation_time):
del self._animations[animation_key]
def _get_time(self) -> float:
"""Get the current wall clock time, via the internal Timer."""
# N.B. We could remove this method and always call `self._timer.get_time()` internally,
# but it's handy to have in mocking situations
return _clock.get_time_no_wait()

58
src/textual/_clock.py Normal file
View File

@@ -0,0 +1,58 @@
import asyncio
from time import monotonic
"""
A module that serves as the single source of truth for everything time-related in a Textual app.
Having this logic centralised makes it easier to simulate time in integration tests,
by mocking the few functions exposed by this module.
"""
# N.B. This class and its singleton instance have to be hidden APIs because we want to be able to mock time,
# even for Python modules that imported functions such as `get_time` *before* we mocked this internal _Clock.
# (so mocking public APIs such as `get_time` wouldn't affect direct references to then that were done during imports)
class _Clock:
async def get_time(self) -> float:
return self.get_time_no_wait()
def get_time_no_wait(self) -> float:
return monotonic()
async def sleep(self, seconds: float) -> None:
await asyncio.sleep(seconds)
# That's our target for mocking time! :-)
_clock = _Clock()
def get_time_no_wait() -> float:
"""
Get the current wall clock time.
Returns:
float: the value (in fractional seconds) of a monotonic clock, i.e. a clock that cannot go backwards.
"""
return _clock.get_time_no_wait()
async def get_time() -> float:
"""
Asynchronous version of `get_time`. Useful in situations where we want asyncio to be
able to "do things" elsewhere right before we fetch the time.
Returns:
float: the value (in fractional seconds) of a monotonic clock, i.e. a clock that cannot go backwards.
"""
return await _clock.get_time()
async def sleep(seconds: float) -> None:
"""
Coroutine that completes after a given time (in seconds).
Args:
seconds (float): the duration we should wait for before unblocking the awaiter
"""
return await _clock.sleep(seconds)

View File

@@ -5,4 +5,9 @@ from contextvars import ContextVar
if TYPE_CHECKING:
from .app import App
class NoActiveAppError(RuntimeError):
pass
active_app: ContextVar["App"] = ContextVar("active_app")

View File

@@ -5,17 +5,15 @@ import weakref
from asyncio import (
CancelledError,
Event,
sleep,
Task,
)
from functools import partial
from time import monotonic
from typing import Awaitable, Callable, Union
from rich.repr import Result, rich_repr
from . import events
from ._callback import invoke
from . import _clock
from ._types import MessageTarget
TimerCallback = Union[Callable[[], Awaitable[None]], Callable[[], None]]
@@ -109,32 +107,38 @@ class Timer:
count = 0
_repeat = self._repeat
_interval = self._interval
start = monotonic()
start = _clock.get_time_no_wait()
try:
while _repeat is None or count <= _repeat:
next_timer = start + ((count + 1) * _interval)
if self._skip and next_timer < monotonic():
now = await _clock.get_time()
if self._skip and next_timer < now:
count += 1
continue
wait_time = max(0, next_timer - monotonic())
now = await _clock.get_time()
wait_time = max(0, next_timer - now)
if wait_time:
await sleep(wait_time)
event = events.Timer(
self.sender,
timer=self,
time=next_timer,
count=count,
callback=self._callback,
)
await _clock.sleep(wait_time)
count += 1
try:
if self._callback is not None:
await invoke(self._callback)
else:
await self.target.post_priority_message(event)
await self._tick(next_timer=next_timer, count=count)
except EventTargetGone:
break
await self._active.wait()
except CancelledError:
pass
async def _tick(self, *, next_timer: float, count: int) -> None:
"""Triggers the Timer's action: either call its callback, or sends an event to its target"""
if self._callback is not None:
await invoke(self._callback)
else:
event = events.Timer(
self.sender,
timer=self,
time=next_timer,
count=count,
callback=self._callback,
)
await self.target.post_priority_message(event)

View File

@@ -96,8 +96,8 @@ def get_box_model(
content_height = max(1, content_height)
# Get box dimensions by adding gutter
width = content_width + gutter.width
height = content_height + gutter.height
model = BoxModel(Size(width, height), margin)
size = Size(content_width, content_height) + gutter.totals
model = BoxModel(size, margin)
return model

View File

@@ -9,6 +9,7 @@ from rich.style import Style
from rich.text import Text
from rich.tree import Tree
from ._context import NoActiveAppError
from ._node_list import NodeList
from .color import Color
from .css._error_tools import friendly_list
@@ -463,7 +464,7 @@ class DOMNode(MessagePump):
try:
self.app.stylesheet.update(self.app, animate=True)
self.refresh()
except LookupError:
except NoActiveAppError:
pass
def remove_class(self, *class_names: str) -> None:
@@ -477,7 +478,7 @@ class DOMNode(MessagePump):
try:
self.app.stylesheet.update(self.app, animate=True)
self.refresh()
except LookupError:
except NoActiveAppError:
pass
def toggle_class(self, *class_names: str) -> None:
@@ -491,7 +492,7 @@ class DOMNode(MessagePump):
try:
self.app.stylesheet.update(self.app, animate=True)
self.refresh()
except LookupError:
except NoActiveAppError:
pass
def has_pseudo_class(self, *class_names: str) -> bool:

View File

@@ -1,10 +1,10 @@
from __future__ import annotations
from time import monotonic
from typing import ClassVar
import rich.repr
from . import _clock
from .case import camel_to_snake
from ._types import MessageTarget
@@ -39,7 +39,7 @@ class Message:
self.sender = sender
self.name = camel_to_snake(self.__class__.__name__.replace("Message", ""))
self.time = monotonic()
self.time = _clock.get_time_no_wait()
self._forwarded = False
self._no_default_action = False
self._stop_propagation = False

View File

@@ -12,7 +12,7 @@ from . import events
from . import log
from ._timer import Timer, TimerCallback
from ._callback import invoke
from ._context import active_app
from ._context import active_app, NoActiveAppError
from .message import Message
from . import messages
@@ -74,8 +74,16 @@ class MessagePump:
@property
def app(self) -> "App":
"""Get the current app."""
return active_app.get()
"""
Get the current app.
Raises:
NoActiveAppError: if no active app could be found for the current asyncio context
"""
try:
return active_app.get()
except LookupError:
raise NoActiveAppError()
@property
def is_parent_active(self):
@@ -152,7 +160,13 @@ class MessagePump:
pause: bool = False,
) -> Timer:
timer = Timer(
self, delay, self, name=name, callback=callback, repeat=0, pause=pause
self,
delay,
self,
name=name or f"set_timer#{Timer._timer_count}",
callback=callback,
repeat=0,
pause=pause,
)
self._child_tasks.add(timer.start())
return timer
@@ -170,7 +184,7 @@ class MessagePump:
self,
interval,
self,
name=name,
name=name or f"set_interval#{Timer._timer_count}",
callback=callback,
repeat=repeat or None,
pause=pause,

View File

@@ -1,5 +1,7 @@
from __future__ import annotations
import sys
from rich.console import RenderableType
import rich.repr
from rich.style import Style
@@ -12,6 +14,14 @@ from ._compositor import Compositor, MapGeometry
from .reactive import Reactive
from .widget import Widget
if sys.version_info >= (3, 8):
from typing import Final
else:
from typing_extensions import Final
# Screen updates will be batched so that they don't happen more often than 20 times per second:
UPDATE_PERIOD: Final = 1 / 20
@rich.repr.auto
class Screen(Widget):
@@ -158,7 +168,9 @@ class Screen(Widget):
self.check_idle()
def on_mount(self, event: events.Mount) -> None:
self._update_timer = self.set_interval(1 / 20, self._on_update, pause=True)
self._update_timer = self.set_interval(
UPDATE_PERIOD, self._on_update, name="screen_update", pause=True
)
async def on_resize(self, event: events.Resize) -> None:
self.size_updated(event.size, event.virtual_size, event.container_size)

View File

@@ -184,7 +184,10 @@ class Widget(DOMNode):
int: The optimal width of the content.
"""
if self.is_container:
return self.layout.get_content_width(self, container, viewport)
return (
self.layout.get_content_width(self, container, viewport)
+ self.scrollbar_width
)
cache_key = container.width
if self._content_width_cache[0] == cache_key:
@@ -214,11 +217,14 @@ class Widget(DOMNode):
"""
if self.is_container:
assert self.layout is not None
height = self.layout.get_content_height(
self,
container,
viewport,
width,
height = (
self.layout.get_content_height(
self,
container,
viewport,
width,
)
+ self.scrollbar_height
)
else:
cache_key = width
@@ -255,11 +261,21 @@ class Widget(DOMNode):
@property
def max_scroll_x(self) -> float:
return max(0, self.virtual_size.width - self.container_size.width)
"""The maximum value of `scroll_x`."""
return max(
0,
self.virtual_size.width - self.container_size.width + self.scrollbar_width,
)
@property
def max_scroll_y(self) -> float:
return max(0, self.virtual_size.height - self.container_size.height)
"""The maximum value of `scroll_y`."""
return max(
0,
self.virtual_size.height
- self.container_size.height
+ self.scrollbar_height,
)
@property
def vertical_scrollbar(self) -> ScrollBar:
@@ -335,12 +351,27 @@ class Widget(DOMNode):
tuple[bool, bool]: A tuple of (<vertical scrollbar enabled>, <horizontal scrollbar enabled>)
"""
if self.layout is None:
if not self.is_container:
return False, False
enabled = self.show_vertical_scrollbar, self.show_horizontal_scrollbar
return enabled
@property
def scrollbar_dimensions(self) -> tuple[int, int]:
"""Get the size of any scrollbars on the widget"""
return (int(self.show_horizontal_scrollbar), int(self.show_vertical_scrollbar))
@property
def scrollbar_width(self) -> int:
"""Get the width used by the *vertical* scrollbar."""
return int(self.show_vertical_scrollbar)
@property
def scrollbar_height(self) -> int:
"""Get the height used by the *horizontal* scrollbar."""
return int(self.show_horizontal_scrollbar)
def set_dirty(self) -> None:
"""Set the Widget as 'dirty' (requiring re-render)."""
self._dirty_regions.clear()
@@ -366,7 +397,6 @@ class Widget(DOMNode):
bool: True if the scroll position changed, otherwise False.
"""
scrolled_x = scrolled_y = False
if animate:
# TODO: configure animation speed
if x is not None:
@@ -915,8 +945,6 @@ class Widget(DOMNode):
def on_descendant_focus(self, event: events.DescendantFocus) -> None:
self.descendant_has_focus = True
if self.is_container and isinstance(event.sender, Widget):
self.scroll_to_widget(event.sender, animate=True)
def on_descendant_blur(self, event: events.DescendantBlur) -> None:
self.descendant_has_focus = False

View File

@@ -177,12 +177,12 @@ class MockAnimator(Animator):
self._time = 0.0
self._on_animation_frame_called = False
def get_time(self):
return self._time
def on_animation_frame(self):
self._on_animation_frame_called = True
def _get_time(self):
return self._time
def test_animator():
@@ -245,10 +245,3 @@ def test_bound_animator():
easing=EASING[DEFAULT_EASING],
)
assert animator._animations[(id(animate_test), "foo")] == expected
def test_animator_get_time():
target = Mock()
animator = Animator(target)
assert isinstance(animator.get_time(), float)
assert animator.get_time() <= animator.get_time()

View File

@@ -1,5 +1,4 @@
from __future__ import annotations
import asyncio
from typing import cast, List
import pytest
@@ -107,7 +106,6 @@ async def test_composition_of_vertical_container_with_children(
expected_placeholders_size: tuple[int, int],
expected_root_widget_virtual_size: tuple[int, int],
expected_placeholders_offset_x: int,
event_loop: asyncio.AbstractEventLoop,
):
class VerticalContainer(Widget):
CSS = (

View File

@@ -1,17 +1,8 @@
from __future__ import annotations
import sys
from typing import Sequence, cast
if sys.version_info >= (3, 8):
from typing import Literal
else:
from typing_extensions import Literal # pragma: no cover
import pytest
from sandbox.vertical_container import VerticalContainer
from tests.utilities.test_app import AppTest
from textual.app import ComposeResult
from textual.geometry import Size
@@ -21,7 +12,6 @@ from textual.widgets import Placeholder
SCREEN_SIZE = Size(100, 30)
@pytest.mark.skip("flaky test")
@pytest.mark.asyncio
@pytest.mark.integration_test # this is a slow test, we may want to skip them in some contexts
@pytest.mark.parametrize(
@@ -32,23 +22,19 @@ SCREEN_SIZE = Size(100, 30)
"scroll_to_animate",
"waiting_duration",
"last_screen_expected_placeholder_ids",
"last_screen_expected_out_of_viewport_placeholder_ids",
),
(
[SCREEN_SIZE, 10, None, None, 0.01, (0, 1, 2, 3, 4), "others"],
[SCREEN_SIZE, 10, "placeholder_3", False, 0.01, (0, 1, 2, 3, 4), "others"],
[SCREEN_SIZE, 10, "placeholder_5", False, 0.01, (1, 2, 3, 4, 5), "others"],
[SCREEN_SIZE, 10, "placeholder_7", False, 0.01, (3, 4, 5, 6, 7), "others"],
[SCREEN_SIZE, 10, "placeholder_9", False, 0.01, (5, 6, 7, 8, 9), "others"],
[SCREEN_SIZE, 10, None, None, 0.01, (0, 1, 2, 3, 4)],
[SCREEN_SIZE, 10, "placeholder_3", False, 0.01, (0, 1, 2, 3, 4)],
[SCREEN_SIZE, 10, "placeholder_5", False, 0.01, (1, 2, 3, 4, 5)],
[SCREEN_SIZE, 10, "placeholder_7", False, 0.01, (3, 4, 5, 6, 7)],
[SCREEN_SIZE, 10, "placeholder_9", False, 0.01, (5, 6, 7, 8, 9)],
# N.B. Scroll duration is hard-coded to 0.2 in the `scroll_to_widget` method atm
# Waiting for this duration should allow us to see the scroll finished:
[SCREEN_SIZE, 10, "placeholder_9", True, 0.21, (5, 6, 7, 8, 9), "others"],
[SCREEN_SIZE, 10, "placeholder_9", True, 0.21, (5, 6, 7, 8, 9)],
# After having waited for approximately half of the scrolling duration, we should
# see the middle Placeholders as we're scrolling towards the last of them.
# The state of the screen at this "halfway there" timing looks to not be deterministic though,
# depending on the environment - so let's only assert stuff for the middle placeholders
# and the first and last ones, but without being too specific about the others:
[SCREEN_SIZE, 10, "placeholder_9", True, 0.1, (5, 6, 7), (1, 2, 9)],
[SCREEN_SIZE, 10, "placeholder_9", True, 0.1, (4, 5, 6, 7, 8)],
),
)
async def test_scroll_to_widget(
@@ -58,9 +44,19 @@ async def test_scroll_to_widget(
scroll_to_placeholder_id: str | None,
waiting_duration: float | None,
last_screen_expected_placeholder_ids: Sequence[int],
last_screen_expected_out_of_viewport_placeholder_ids: Sequence[int]
| Literal["others"],
):
class VerticalContainer(Widget):
CSS = """
VerticalContainer {
layout: vertical;
overflow: hidden auto;
}
VerticalContainer Placeholder {
margin: 1 0;
height: 5;
}
"""
class MyTestApp(AppTest):
CSS = """
Placeholder {
@@ -78,7 +74,7 @@ async def test_scroll_to_widget(
app = MyTestApp(size=screen_size, test_name="scroll_to_widget")
async with app.in_running_state(waiting_duration_post_yield=waiting_duration or 0):
async with app.in_running_state(waiting_duration_after_yield=waiting_duration or 0):
if scroll_to_placeholder_id:
target_widget_container = cast(Widget, app.query("#root").first())
target_widget = cast(
@@ -97,21 +93,21 @@ async def test_scroll_to_widget(
# Let's start by checking placeholders that should be visible:
for placeholder_id in last_screen_expected_placeholder_ids:
assert (
placeholders_visibility_by_id[placeholder_id] is True
), f"Placeholder '{placeholder_id}' should be visible but isn't"
assert placeholders_visibility_by_id[placeholder_id] is True, (
f"Placeholder '{placeholder_id}' should be visible but isn't"
f" :: placeholders_visibility_by_id={placeholders_visibility_by_id}"
)
# Ok, now for placeholders that should *not* be visible:
if last_screen_expected_out_of_viewport_placeholder_ids == "others":
# We're simply going to check that all the placeholders that are not in
# `last_screen_expected_placeholder_ids` are not on the screen:
last_screen_expected_out_of_viewport_placeholder_ids = sorted(
tuple(
set(range(placeholders_count))
- set(last_screen_expected_placeholder_ids)
)
# We're simply going to check that all the placeholders that are not in
# `last_screen_expected_placeholder_ids` are not on the screen:
last_screen_expected_out_of_viewport_placeholder_ids = sorted(
tuple(
set(range(placeholders_count)) - set(last_screen_expected_placeholder_ids)
)
)
for placeholder_id in last_screen_expected_out_of_viewport_placeholder_ids:
assert (
placeholders_visibility_by_id[placeholder_id] is False
), f"Placeholder '{placeholder_id}' should not be visible but is"
assert placeholders_visibility_by_id[placeholder_id] is False, (
f"Placeholder '{placeholder_id}' should not be visible but is"
f" :: placeholders_visibility_by_id={placeholders_visibility_by_id}"
)

View File

@@ -3,16 +3,20 @@ from __future__ import annotations
import asyncio
import contextlib
import io
from math import ceil
from pathlib import Path
from typing import AsyncContextManager, cast
from time import monotonic
from typing import AsyncContextManager, cast, ContextManager
from unittest import mock
from rich.console import Console
from textual import events, errors
from textual.app import App, ReturnType, ComposeResult
from textual._clock import _Clock
from textual.app import App, ComposeResult, WINDOWS
from textual._context import active_app
from textual.driver import Driver
from textual.geometry import Size
from textual.geometry import Size, Region
# N.B. These classes would better be named TestApp/TestConsole/TestDriver/etc,
# but it makes pytest emit warning as it will try to collect them as classes containing test cases :-/
@@ -61,33 +65,42 @@ class AppTest(App):
def in_running_state(
self,
*,
waiting_duration_after_initialisation: float = 0.1,
waiting_duration_post_yield: float = 0,
) -> AsyncContextManager:
time_mocking_ticks_granularity_fps: int = 60, # i.e. when moving forward by 1 second we'll do it though 60 ticks
waiting_duration_after_initialisation: float = 1,
waiting_duration_after_yield: float = 0,
) -> AsyncContextManager[ClockMock]:
async def run_app() -> None:
await self.process_messages()
@contextlib.asynccontextmanager
async def get_running_state_context_manager():
self._set_active()
run_task = asyncio.create_task(run_app())
timeout_before_yielding_task = asyncio.create_task(
asyncio.sleep(waiting_duration_after_initialisation)
)
done, pending = await asyncio.wait(
(
run_task,
timeout_before_yielding_task,
),
return_when=asyncio.FIRST_COMPLETED,
)
if run_task in done or run_task not in pending:
raise RuntimeError(
"TestApp is no longer running after its initialization period"
)
yield
if waiting_duration_post_yield:
await asyncio.sleep(waiting_duration_post_yield)
with mock_textual_timers(
ticks_granularity_fps=time_mocking_ticks_granularity_fps
) as clock_mock:
run_task = asyncio.create_task(run_app())
# We have to do this because `run_app()` is running in its own async task, and our test is going to
# run in this one - so the app must also be the active App in our current context:
self._set_active()
await clock_mock.advance_clock(waiting_duration_after_initialisation)
# make sure the App has entered its main loop at this stage:
assert self._driver is not None
await self.force_full_screen_update()
# And now it's time to pass the torch on to the test function!
# We provide the `move_clock_forward` function to it,
# so it can also do some time-based Textual stuff if it needs to:
yield clock_mock
await clock_mock.advance_clock(waiting_duration_after_yield)
# Make sure our screen is up-to-date before exiting the context manager,
# so tests using our `last_display_capture` for example can assert things on a fully refreshed screen:
await self.force_full_screen_update()
# End of simulated time: we just shut down ourselves:
assert not run_task.done()
await self.shutdown()
@@ -102,27 +115,10 @@ class AppTest(App):
"""Just a commodity shortcut for `async with app.in_running_state(): pass`, for simple cases"""
async with self.in_running_state(
waiting_duration_after_initialisation=waiting_duration_after_initialisation,
waiting_duration_post_yield=waiting_duration_before_shutdown,
waiting_duration_after_yield=waiting_duration_before_shutdown,
):
pass
def run(self):
raise NotImplementedError(
"Use `async with my_test_app.in_running_state()` rather than `my_test_app.run()`"
)
@property
def total_capture(self) -> str | None:
return self.console.file.getvalue()
@property
def last_display_capture(self) -> str | None:
total_capture = self.total_capture
if not total_capture:
return None
last_display_start_index = total_capture.rindex(CLEAR_SCREEN_SEQUENCE)
return total_capture[last_display_start_index:]
def get_char_at(self, x: int, y: int) -> str:
"""Get the character at the given cell or empty string
@@ -153,6 +149,54 @@ class AppTest(App):
return segment.text[0]
return ""
async def force_full_screen_update(
self, *, repaint: bool = True, layout: bool = True
) -> None:
try:
screen = self.screen
except IndexError:
return # the app may not have a screen yet
# We artificially tell the Compositor that the whole area should be refreshed
screen._compositor._dirty_regions = {
Region(0, 0, screen.size.width, screen.size.height),
}
screen.refresh(repaint=repaint, layout=layout)
# We also have to make sure we have at least one dirty widget, or `screen._on_update()` will early return:
screen._dirty_widgets.add(screen)
screen._on_update()
await let_asyncio_process_some_events()
def on_exception(self, error: Exception) -> None:
# In tests we want the errors to be raised, rather than printed to a Console
raise error
def run(self):
raise NotImplementedError(
"Use `async with my_test_app.in_running_state()` rather than `my_test_app.run()`"
)
@property
def active_app(self) -> App | None:
return active_app.get()
@property
def total_capture(self) -> str | None:
return self.console.file.getvalue()
@property
def last_display_capture(self) -> str | None:
total_capture = self.total_capture
if not total_capture:
return None
screen_captures = total_capture.split(CLEAR_SCREEN_SEQUENCE)
for single_screen_capture in reversed(screen_captures):
if len(single_screen_capture) > 30:
# let's return the last occurrence of a screen that seem to be properly "fully-paint"
return single_screen_capture
return None
@property
def console(self) -> ConsoleTest:
return self._console
@@ -207,3 +251,111 @@ class DriverTest(Driver):
def stop_application_mode(self) -> None:
pass
# It seems that we have to give _way more_ time to `asyncio` on Windows in order to see our different awaiters
# properly triggered when we pause our own "move clock forward" loop.
# It could be caused by the fact that the time resolution for `asyncio` on this platform seems rather low:
# > The resolution of the monotonic clock on Windows is usually around 15.6 msec.
# > The best resolution is 0.5 msec.
# @link https://docs.python.org/3/library/asyncio-platforms.html:
ASYNCIO_EVENTS_PROCESSING_REQUIRED_PERIOD = 0.025 if WINDOWS else 0.005
async def let_asyncio_process_some_events() -> None:
await asyncio.sleep(ASYNCIO_EVENTS_PROCESSING_REQUIRED_PERIOD)
class ClockMock(_Clock):
# To avoid issues with floats we will store the current time as an integer internally.
# Tenths of microseconds should be a good enough granularity:
TIME_RESOLUTION = 10_000_000
def __init__(
self,
*,
ticks_granularity_fps: int = 60,
):
self._ticks_granularity_fps = ticks_granularity_fps
self._single_tick_duration = int(self.TIME_RESOLUTION / ticks_granularity_fps)
self._start_time: int = -1
self._current_time: int = -1
# For each call to our `sleep` method we will store an asyncio.Event
# and the time at which we should trigger it:
self._pending_sleep_events: dict[int, list[asyncio.Event]] = {}
def get_time_no_wait(self) -> float:
if self._current_time == -1:
self._start_clock()
return self._current_time / self.TIME_RESOLUTION
async def sleep(self, seconds: float) -> None:
event = asyncio.Event()
internal_waiting_duration = int(seconds * self.TIME_RESOLUTION)
target_event_monotonic_time = self._current_time + internal_waiting_duration
self._pending_sleep_events.setdefault(target_event_monotonic_time, []).append(
event
)
# Ok, let's wait for this Event
# (which can only be "unlocked" by calls to `advance_clock()`)
await event.wait()
async def advance_clock(self, seconds: float) -> None:
"""
Artificially advance the Textual clock forward.
Args:
seconds: for each second we will artificially tick `ticks_granularity_fps` times
"""
if self._current_time == -1:
self._start_clock()
ticks_count = ceil(seconds * self._ticks_granularity_fps)
activated_timers_count_total = 0 # useful when debugging this code :-)
for tick_counter in range(ticks_count):
self._current_time += self._single_tick_duration
activated_timers_count = self._check_sleep_timers_to_activate()
activated_timers_count_total += activated_timers_count
# Now that we likely unlocked some occurrences of `await sleep(duration)`,
# let's give an opportunity to asyncio-related stuff to happen:
if activated_timers_count:
await let_asyncio_process_some_events()
await let_asyncio_process_some_events()
def _start_clock(self) -> None:
# N.B. `start_time` is not actually used, but it is useful to have when we set breakpoints there :-)
self._start_time = self._current_time = int(monotonic() * self.TIME_RESOLUTION)
def _check_sleep_timers_to_activate(self) -> int:
activated_timers_count = 0
activated_events_times_to_clear: list[int] = []
for (monotonic_time, target_events) in self._pending_sleep_events.items():
if self._current_time < monotonic_time:
continue # not time for you yet, dear awaiter...
# Right, let's release these waiting events!
for event in target_events:
event.set()
activated_timers_count += len(target_events)
# ...and let's mark it for removal:
activated_events_times_to_clear.append(monotonic_time)
if activated_events_times_to_clear:
for event_time_to_clear in activated_events_times_to_clear:
del self._pending_sleep_events[event_time_to_clear]
return activated_timers_count
def mock_textual_timers(
*,
ticks_granularity_fps: int = 60,
) -> ContextManager[ClockMock]:
@contextlib.contextmanager
def mock_textual_timers_context_manager():
clock_mock = ClockMock(ticks_granularity_fps=ticks_granularity_fps)
with mock.patch("textual._clock._clock", new=clock_mock):
yield clock_mock
return mock_textual_timers_context_manager()