[App] Finally, time mocking in tests seems to be working! 😅

I had to add a flag in the `_timer` module that allows us to completely disable the  "skip" feature of Timers, though - but it shouldn't cause too much trouble 🤞
This commit is contained in:
Olivier Philippon
2022-05-13 10:35:06 +01:00
parent 74ad6f73fa
commit 15df759197
6 changed files with 211 additions and 138 deletions

View File

@@ -3,19 +3,25 @@ from __future__ import annotations
import asyncio
import contextlib
import io
import sys
from math import ceil
from pathlib import Path
from time import monotonic
from typing import AsyncContextManager, cast, ContextManager, Callable
from typing import AsyncContextManager, cast, ContextManager
from unittest import mock
from rich.console import Console
from textual import events, errors
from textual._timer import Timer
from textual.app import App, ComposeResult
from textual.driver import Driver
from textual.geometry import Size
if sys.version_info >= (3, 8):
from typing import Protocol
else:
from typing_extensions import Protocol
# N.B. These classes would better be named TestApp/TestConsole/TestDriver/etc,
# but it makes pytest emit warning as it will try to collect them as classes containing test cases :-/
@@ -24,6 +30,12 @@ from textual.geometry import Size
CLEAR_SCREEN_SEQUENCE = "\x1bP=1s\x1b\\"
class MockedTimeMoveClockForward(Protocol):
async def __call__(self, *, seconds: float) -> tuple[float, int]:
"""Returns the new current (mocked) monotonic time and the number of activated Timers"""
...
class AppTest(App):
def __init__(
self,
@@ -64,52 +76,57 @@ class AppTest(App):
def in_running_state(
self,
*,
time_mocking_ticks_granularity_fps: int = 60, # i.e. when moving forward by 1 second we'll do it though 60 ticks
waiting_duration_after_initialisation: float = 0.1,
waiting_duration_post_yield: float = 0,
time_acceleration: bool = True,
time_acceleration_factor: float = 10,
# force_timers_tick_after_yield: bool = True,
) -> AsyncContextManager:
waiting_duration_after_yield: float = 0,
) -> AsyncContextManager[MockedTimeMoveClockForward]:
async def run_app() -> None:
await self.process_messages()
if time_acceleration:
waiting_duration_after_initialisation /= time_acceleration_factor
waiting_duration_post_yield /= time_acceleration_factor
time_acceleration_context: ContextManager = (
textual_timers_accelerate_time(acceleration_factor=time_acceleration_factor)
if time_acceleration
else contextlib.nullcontext()
)
@contextlib.asynccontextmanager
async def get_running_state_context_manager():
self._set_active()
with time_acceleration_context:
with mock_textual_timers(
ticks_granularity_fps=time_mocking_ticks_granularity_fps
) as move_time_forward:
run_task = asyncio.create_task(run_app())
timeout_before_yielding_task = asyncio.create_task(
asyncio.sleep(waiting_duration_after_initialisation)
)
done, pending = await asyncio.wait(
(
run_task,
timeout_before_yielding_task,
),
return_when=asyncio.FIRST_COMPLETED,
)
if run_task in done or run_task not in pending:
raise RuntimeError(
"TestApp is no longer running after its initialization period"
)
yield
waiting_duration = max(
waiting_duration_post_yield or 0,
self.screen._update_timer._interval,
)
await asyncio.sleep(waiting_duration)
await asyncio.sleep(0.001)
# timeout_before_yielding_task = asyncio.create_task(
# asyncio.sleep(waiting_duration_after_initialisation)
# )
# done, pending = await asyncio.wait(
# (
# run_task,
# timeout_before_yielding_task,
# ),
# return_when=asyncio.FIRST_COMPLETED,
# )
# if run_task in done or run_task not in pending:
# raise RuntimeError(
# "TestApp is no longer running after its initialization period"
# )
await move_time_forward(seconds=waiting_duration_after_initialisation)
assert self._driver is not None
self.force_screen_update()
yield move_time_forward
await move_time_forward(seconds=waiting_duration_after_yield)
self.force_screen_update()
# waiting_duration = max(
# waiting_duration_post_yield or 0,
# self.screen._update_timer._interval,
# )
# await asyncio.sleep(waiting_duration)
# if force_timers_tick_after_yield:
# await textual_timers_force_tick()
assert not run_task.done()
await self.shutdown()
@@ -124,27 +141,10 @@ class AppTest(App):
"""Just a commodity shortcut for `async with app.in_running_state(): pass`, for simple cases"""
async with self.in_running_state(
waiting_duration_after_initialisation=waiting_duration_after_initialisation,
waiting_duration_post_yield=waiting_duration_before_shutdown,
waiting_duration_after_yield=waiting_duration_before_shutdown,
):
pass
def run(self):
raise NotImplementedError(
"Use `async with my_test_app.in_running_state()` rather than `my_test_app.run()`"
)
@property
def total_capture(self) -> str | None:
return self.console.file.getvalue()
@property
def last_display_capture(self) -> str | None:
total_capture = self.total_capture
if not total_capture:
return None
last_display_start_index = total_capture.rindex(CLEAR_SCREEN_SEQUENCE)
return total_capture[last_display_start_index:]
def get_char_at(self, x: int, y: int) -> str:
"""Get the character at the given cell or empty string
@@ -175,6 +175,34 @@ class AppTest(App):
return segment.text[0]
return ""
def force_screen_update(self, *, repaint: bool = True, layout: bool = True) -> None:
try:
self.screen.refresh(repaint=repaint, layout=layout)
self.screen._on_update()
except IndexError:
pass # the app may not have a screen yet
def on_exception(self, error: Exception) -> None:
# In tests we want the errors to be raised, rather than printed to a Console
raise error
def run(self):
raise NotImplementedError(
"Use `async with my_test_app.in_running_state()` rather than `my_test_app.run()`"
)
@property
def total_capture(self) -> str | None:
return self.console.file.getvalue()
@property
def last_display_capture(self) -> str | None:
total_capture = self.total_capture
if not total_capture:
return None
last_display_start_index = total_capture.rindex(CLEAR_SCREEN_SEQUENCE)
return total_capture[last_display_start_index:]
@property
def console(self) -> ConsoleTest:
return self._console
@@ -231,43 +259,71 @@ class DriverTest(Driver):
pass
async def textual_timers_force_tick() -> None:
timer_instances_tick_tasks: list[asyncio.Task] = []
for timer in Timer._instances:
task = asyncio.create_task(timer._tick(next_timer=0, count=0))
timer_instances_tick_tasks.append(task)
await asyncio.wait(timer_instances_tick_tasks)
def mock_textual_timers(
*,
ticks_granularity_fps: int = 60,
) -> ContextManager[MockedTimeMoveClockForward]:
single_tick_duration = 1.0 / ticks_granularity_fps
pending_sleep_events: list[tuple[float, asyncio.Event]] = []
def textual_timers_accelerate_time(
*, acceleration_factor: float = 10
) -> ContextManager:
@contextlib.contextmanager
def accelerate_time_for_timer_context_manager():
starting_time = monotonic()
def mock_textual_timers_context_manager():
# N.B. `start_time` is not used, but it is useful to have when we set breakpoints there :-)
start_time = current_time = monotonic()
# Our replacement for "textual._timer.Timer._sleep":
async def timer_sleep(duration: float) -> None:
await asyncio.sleep(duration / acceleration_factor)
async def sleep_mock(duration: float) -> None:
event = asyncio.Event()
target_event_monotonic_time = current_time + duration
pending_sleep_events.append((target_event_monotonic_time, event))
# Ok, let's wait for this Event
# - which can only be "unlocked" by calls to `move_clock_forward()`
await event.wait()
# Our replacement for "textual._timer.Timer.get_time":
def timer_get_time() -> float:
real_now = monotonic()
real_elapsed_time = real_now - starting_time
accelerated_elapsed_time = real_elapsed_time * acceleration_factor
print(
f"timer_get_time:: accelerated_elapsed_time={accelerated_elapsed_time}"
)
return starting_time + accelerated_elapsed_time
# Our replacement for "textual._timer.Timer.get_time" and "textual.message.Message._get_time":
def get_time_mock() -> float:
return current_time
with mock.patch("textual._timer.Timer._sleep") as timer_sleep_mock, mock.patch(
"textual._timer.Timer.get_time"
) as timer_get_time_mock, mock.patch(
"textual.message.Message._get_time"
) as message_get_time_mock:
timer_sleep_mock.side_effect = timer_sleep
timer_get_time_mock.side_effect = timer_get_time
message_get_time_mock.side_effect = timer_get_time
yield
async def move_clock_forward(*, seconds: float) -> tuple[float, int]:
nonlocal current_time, start_time
return accelerate_time_for_timer_context_manager()
ticks_count = ceil(seconds * ticks_granularity_fps)
activated_timers_count_total = 0
for tick_counter in range(ticks_count):
current_time += single_tick_duration
activated_timers_count_total += check_sleep_timers_to_activate()
# Let's give an opportunity to asyncio-related stuff to happen,
# now that we unlocked some occurrences of `await sleep(duration)`:
await asyncio.sleep(0.0001)
return current_time, activated_timers_count_total
def check_sleep_timers_to_activate() -> int:
nonlocal pending_sleep_events
activated_timers_count = 0
for i, (target_event_monotonic_time, event) in enumerate(
pending_sleep_events
):
if target_event_monotonic_time < current_time:
continue
# Right, let's release this waiting event!
event.set()
activated_timers_count += 1
# ...and remove it from our pending sleep events list:
del pending_sleep_events[i]
return activated_timers_count
with mock.patch("textual._timer._TIMERS_CAN_SKIP", new=False), mock.patch(
"textual._timer.Timer._sleep", side_effect=sleep_mock
), mock.patch(
"textual._timer.Timer.get_time", side_effect=get_time_mock
), mock.patch(
"textual.message.Message._get_time", side_effect=get_time_mock
):
yield move_clock_forward
return mock_textual_timers_context_manager()