mirror of
https://github.com/Textualize/textual.git
synced 2025-10-17 02:38:12 +03:00
[clock] Add a centralised Clock, responsible for anything related to time
This makes time quite easier to mock during integration tests :-)
This commit is contained in:
@@ -3,7 +3,6 @@ from __future__ import annotations
|
||||
import asyncio
|
||||
import contextlib
|
||||
import io
|
||||
import sys
|
||||
from math import ceil
|
||||
from pathlib import Path
|
||||
from time import monotonic
|
||||
@@ -13,16 +12,11 @@ from unittest import mock
|
||||
from rich.console import Console
|
||||
|
||||
from textual import events, errors
|
||||
from textual._clock import _Clock
|
||||
from textual.app import App, ComposeResult, WINDOWS
|
||||
from textual._context import active_app
|
||||
from textual.driver import Driver
|
||||
from textual.geometry import Size
|
||||
|
||||
if sys.version_info >= (3, 8):
|
||||
from typing import Protocol
|
||||
else:
|
||||
from typing_extensions import Protocol
|
||||
|
||||
from textual.geometry import Size, Region
|
||||
|
||||
# N.B. These classes would better be named TestApp/TestConsole/TestDriver/etc,
|
||||
# but it makes pytest emit warning as it will try to collect them as classes containing test cases :-/
|
||||
@@ -31,12 +25,6 @@ else:
|
||||
CLEAR_SCREEN_SEQUENCE = "\x1bP=1s\x1b\\"
|
||||
|
||||
|
||||
class MockedTimeMoveClockForward(Protocol):
|
||||
async def __call__(self, *, seconds: float) -> tuple[float, int]:
|
||||
"""Returns the new current (mocked) monotonic time and the number of activated Timers"""
|
||||
...
|
||||
|
||||
|
||||
class AppTest(App):
|
||||
def __init__(
|
||||
self,
|
||||
@@ -80,7 +68,7 @@ class AppTest(App):
|
||||
time_mocking_ticks_granularity_fps: int = 60, # i.e. when moving forward by 1 second we'll do it though 60 ticks
|
||||
waiting_duration_after_initialisation: float = 1,
|
||||
waiting_duration_after_yield: float = 0,
|
||||
) -> AsyncContextManager[MockedTimeMoveClockForward]:
|
||||
) -> AsyncContextManager[ClockMock]:
|
||||
async def run_app() -> None:
|
||||
await self.process_messages()
|
||||
|
||||
@@ -88,29 +76,33 @@ class AppTest(App):
|
||||
async def get_running_state_context_manager():
|
||||
with mock_textual_timers(
|
||||
ticks_granularity_fps=time_mocking_ticks_granularity_fps
|
||||
) as move_clock_forward:
|
||||
) as clock_mock:
|
||||
run_task = asyncio.create_task(run_app())
|
||||
|
||||
# We have to do this because `run_app()` is running in its own async task, and our test is going to
|
||||
# run in this one - so the app must also be the active App in our current context:
|
||||
self._set_active()
|
||||
|
||||
await move_clock_forward(seconds=waiting_duration_after_initialisation)
|
||||
await clock_mock.move_clock_forward(
|
||||
seconds=waiting_duration_after_initialisation
|
||||
)
|
||||
# make sure the App has entered its main loop at this stage:
|
||||
assert self._driver is not None
|
||||
|
||||
await self.force_screen_update()
|
||||
await self.force_full_screen_update()
|
||||
|
||||
# And now it's time to pass the torch on to the test function!
|
||||
# We provide the `move_clock_forward` function to it,
|
||||
# so it can also do some time-based Textual stuff if it needs to:
|
||||
yield move_clock_forward
|
||||
yield clock_mock
|
||||
|
||||
await move_clock_forward(seconds=waiting_duration_after_yield)
|
||||
await clock_mock.move_clock_forward(
|
||||
seconds=waiting_duration_after_yield
|
||||
)
|
||||
|
||||
# Make sure our screen is up to date before exiting the context manager,
|
||||
# so tests using our `last_display_capture` for example can assert things on an up to date screen:
|
||||
await self.force_screen_update()
|
||||
await self.force_full_screen_update()
|
||||
|
||||
# End of simulated time: we just shut down ourselves:
|
||||
assert not run_task.done()
|
||||
@@ -161,14 +153,21 @@ class AppTest(App):
|
||||
return segment.text[0]
|
||||
return ""
|
||||
|
||||
async def force_screen_update(
|
||||
async def force_full_screen_update(
|
||||
self, *, repaint: bool = True, layout: bool = True
|
||||
) -> None:
|
||||
try:
|
||||
screen = self.screen
|
||||
except IndexError:
|
||||
return # the app may not have a screen yet
|
||||
|
||||
# We artificially tell the Compositor that the whole area should be refreshed
|
||||
screen._compositor._dirty_regions = {
|
||||
Region(0, 0, screen.size.width, screen.size.height),
|
||||
}
|
||||
screen.refresh(repaint=repaint, layout=layout)
|
||||
# We also have to make sure we have at least one dirty widget, or `screen._on_update()` will early return:
|
||||
screen._dirty_widgets.add(screen)
|
||||
screen._on_update()
|
||||
|
||||
await let_asyncio_process_some_events()
|
||||
@@ -254,6 +253,9 @@ class DriverTest(Driver):
|
||||
pass
|
||||
|
||||
|
||||
# It seems that we have to give _way more_ time to `asyncio` on Windows in order to see our different awaiters
|
||||
# properly triggered when we pause our own "move clock forward" loop.
|
||||
# It could be caused by the fact that the time resolution for `asyncio` on this platform seems rather low:
|
||||
# > The resolution of the monotonic clock on Windows is usually around 15.6 msec.
|
||||
# > The best resolution is 0.5 msec.
|
||||
# @link https://docs.python.org/3/library/asyncio-platforms.html:
|
||||
@@ -264,80 +266,92 @@ async def let_asyncio_process_some_events() -> None:
|
||||
await asyncio.sleep(ASYNCIO_EVENTS_PROCESSING_REQUIRED_PERIOD)
|
||||
|
||||
|
||||
class ClockMock(_Clock):
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
ticks_granularity_fps: int = 60,
|
||||
):
|
||||
self._ticks_granularity_fps = ticks_granularity_fps
|
||||
self._single_tick_duration = 1.0 / ticks_granularity_fps
|
||||
self._start_time = self._current_time = None
|
||||
self._pending_sleep_events: list[tuple[float, asyncio.Event]] = []
|
||||
|
||||
def get_time(self) -> float:
|
||||
if self._current_time is None:
|
||||
self._start_clock()
|
||||
|
||||
# let's make the time advance _very_ slightly between 2 consecutive calls of this function,
|
||||
# within the same order of magnitude than 2 consecutive calls to ` timer.monotonic()`:
|
||||
self._current_time += 1.1e-06
|
||||
|
||||
return self._current_time
|
||||
|
||||
async def sleep(self, seconds: float) -> None:
|
||||
event = asyncio.Event()
|
||||
target_event_monotonic_time = self._current_time + seconds
|
||||
self._pending_sleep_events.append((target_event_monotonic_time, event))
|
||||
# Ok, let's wait for this Event
|
||||
# (which can only be "unlocked" by calls to `move_clock_forward()`)
|
||||
await event.wait()
|
||||
|
||||
async def move_clock_forward(self, *, seconds: float) -> tuple[float, int]:
|
||||
"""
|
||||
Artificially moves the Textual clock forward.
|
||||
|
||||
Args:
|
||||
seconds: for each second we will artificially tick `ticks_granularity_fps` times
|
||||
|
||||
Returns:
|
||||
tuple[float, int]: a tuple giving the new mocked current time and the number of sleep awaiters
|
||||
that were unblocked by this call to `move_clock_forward`
|
||||
"""
|
||||
if self._current_time is None:
|
||||
self._start_clock()
|
||||
|
||||
ticks_count = ceil(seconds * self._ticks_granularity_fps)
|
||||
activated_timers_count_total = 0
|
||||
for tick_counter in range(ticks_count):
|
||||
self._current_time += self._single_tick_duration
|
||||
activated_timers_count = self._check_sleep_timers_to_activate()
|
||||
activated_timers_count_total += activated_timers_count
|
||||
# Let's give an opportunity to asyncio-related stuff to happen,
|
||||
# now that we likely unlocked some occurrences of `await sleep(duration)`:
|
||||
if activated_timers_count:
|
||||
await let_asyncio_process_some_events()
|
||||
|
||||
await let_asyncio_process_some_events()
|
||||
|
||||
return self._current_time, activated_timers_count_total
|
||||
|
||||
def _start_clock(self) -> None:
|
||||
# N.B. `start_time` is not used, but it is useful to have when we set breakpoints there :-)
|
||||
self._start_time = self._current_time = monotonic()
|
||||
|
||||
def _check_sleep_timers_to_activate(self) -> int:
|
||||
activated_timers_count = 0
|
||||
for i, (target_event_monotonic_time, event) in enumerate(
|
||||
self._pending_sleep_events
|
||||
):
|
||||
if self._current_time < target_event_monotonic_time:
|
||||
continue # not time for you yet, dear awaiter...
|
||||
# Right, let's release this waiting event!
|
||||
event.set()
|
||||
activated_timers_count += 1
|
||||
# ...and remove it from our pending sleep events list:
|
||||
del self._pending_sleep_events[i]
|
||||
|
||||
return activated_timers_count
|
||||
|
||||
|
||||
def mock_textual_timers(
|
||||
*,
|
||||
ticks_granularity_fps: int = 60,
|
||||
) -> ContextManager[MockedTimeMoveClockForward]:
|
||||
single_tick_duration = 1.0 / ticks_granularity_fps
|
||||
|
||||
pending_sleep_events: list[tuple[float, asyncio.Event]] = []
|
||||
|
||||
) -> ContextManager[ClockMock]:
|
||||
@contextlib.contextmanager
|
||||
def mock_textual_timers_context_manager():
|
||||
# N.B. `start_time` is not used, but it is useful to have when we set breakpoints there :-)
|
||||
start_time = current_time = monotonic()
|
||||
|
||||
# Our replacement for "textual._timer.Timer._sleep":
|
||||
async def sleep_mock(duration: float) -> None:
|
||||
event = asyncio.Event()
|
||||
target_event_monotonic_time = current_time + duration
|
||||
pending_sleep_events.append((target_event_monotonic_time, event))
|
||||
# Ok, let's wait for this Event
|
||||
# (which can only be "unlocked" by calls to `move_clock_forward()`)
|
||||
await event.wait()
|
||||
|
||||
# Our replacement for "textual._timer.Timer.get_time" and "textual.message.Message._get_time":
|
||||
def get_time_mock() -> float:
|
||||
nonlocal current_time
|
||||
|
||||
# let's make the time advance slightly between 2 consecutive calls of this function,
|
||||
# within the same order of magnitude than 2 consecutive calls to ` timer.monotonic()`:
|
||||
current_time += 1.1e-06
|
||||
|
||||
return current_time
|
||||
|
||||
async def move_clock_forward(*, seconds: float) -> tuple[float, int]:
|
||||
nonlocal current_time, start_time
|
||||
|
||||
ticks_count = ceil(seconds * ticks_granularity_fps)
|
||||
activated_timers_count_total = 0
|
||||
for tick_counter in range(ticks_count):
|
||||
current_time += single_tick_duration
|
||||
activated_timers_count = check_sleep_timers_to_activate()
|
||||
activated_timers_count_total += activated_timers_count
|
||||
# Let's give an opportunity to asyncio-related stuff to happen,
|
||||
# now that we likely unlocked some occurrences of `await sleep(duration)`:
|
||||
if activated_timers_count:
|
||||
await let_asyncio_process_some_events()
|
||||
|
||||
await let_asyncio_process_some_events()
|
||||
|
||||
return current_time, activated_timers_count_total
|
||||
|
||||
def check_sleep_timers_to_activate() -> int:
|
||||
nonlocal pending_sleep_events
|
||||
|
||||
activated_timers_count = 0
|
||||
for i, (target_event_monotonic_time, event) in enumerate(
|
||||
pending_sleep_events
|
||||
):
|
||||
if current_time < target_event_monotonic_time:
|
||||
continue # not time for you yet, dear awaiter...
|
||||
# Right, let's release this waiting event!
|
||||
event.set()
|
||||
activated_timers_count += 1
|
||||
# ...and remove it from our pending sleep events list:
|
||||
del pending_sleep_events[i]
|
||||
|
||||
return activated_timers_count
|
||||
|
||||
with mock.patch("textual._timer._TIMERS_CAN_SKIP", new=False), mock.patch(
|
||||
"textual._timer.Timer._sleep", side_effect=sleep_mock
|
||||
), mock.patch(
|
||||
"textual._timer.Timer.get_time", side_effect=get_time_mock
|
||||
), mock.patch(
|
||||
"textual.message.Message._get_time", side_effect=get_time_mock
|
||||
):
|
||||
yield move_clock_forward
|
||||
clock_mock = ClockMock(ticks_granularity_fps=ticks_granularity_fps)
|
||||
with mock.patch("textual._clock._clock", new=clock_mock):
|
||||
yield clock_mock
|
||||
|
||||
return mock_textual_timers_context_manager()
|
||||
|
||||
Reference in New Issue
Block a user