mirror of
https://github.com/Textualize/textual.git
synced 2025-10-17 02:38:12 +03:00
[tests][e2e] Add a test for Widget#scroll_to_widget()
This commit is contained in:
@@ -47,6 +47,9 @@ includes = "src"
|
||||
[tool.pytest.ini_options]
|
||||
asyncio_mode = "auto"
|
||||
testpaths = ["tests"]
|
||||
markers = [
|
||||
"integration_test: marks tests as slow integration tests(deselect with '-m \"not integration_test\"')",
|
||||
]
|
||||
|
||||
[build-system]
|
||||
requires = ["poetry-core>=1.0.0"]
|
||||
|
||||
@@ -20,7 +20,7 @@ class VerticalContainer(Widget):
|
||||
|
||||
VerticalContainer Placeholder {
|
||||
margin: 1 0;
|
||||
height: 3;
|
||||
height: 5;
|
||||
border: solid lime;
|
||||
align: center top;
|
||||
}
|
||||
@@ -79,10 +79,10 @@ class MyTestApp(App):
|
||||
placeholders = self.query("Placeholder")
|
||||
placeholders_count = len(placeholders)
|
||||
placeholder = Placeholder(
|
||||
id=f"placeholder_{placeholders_count+1}",
|
||||
name=f"Placeholder #{placeholders_count+1}",
|
||||
id=f"placeholder_{placeholders_count}",
|
||||
name=f"Placeholder #{placeholders_count}",
|
||||
)
|
||||
root = self.query_one("#root")
|
||||
root = self.get_child("root")
|
||||
root.mount(placeholder)
|
||||
self.refresh(repaint=True, layout=True)
|
||||
self.refresh_css()
|
||||
|
||||
@@ -143,6 +143,9 @@ class App(Generic[ReturnType], DOMNode):
|
||||
self.driver_class = driver_class or self.get_driver_class()
|
||||
self._title = title
|
||||
self._screen_stack: list[Screen] = []
|
||||
self._sync_available = (
|
||||
os.environ.get("TERM_PROGRAM", "") != "Apple_Terminal" and not WINDOWS
|
||||
)
|
||||
|
||||
self.focused: Widget | None = None
|
||||
self.mouse_over: Widget | None = None
|
||||
@@ -806,20 +809,19 @@ class App(Generic[ReturnType], DOMNode):
|
||||
def refresh(self, *, repaint: bool = True, layout: bool = False) -> None:
|
||||
if not self._running:
|
||||
return
|
||||
sync_available = (
|
||||
os.environ.get("TERM_PROGRAM", "") != "Apple_Terminal" and not WINDOWS
|
||||
)
|
||||
if not self._closed:
|
||||
console = self.console
|
||||
try:
|
||||
if sync_available:
|
||||
if self._sync_available:
|
||||
console.file.write("\x1bP=1s\x1b\\")
|
||||
console.print(
|
||||
ScreenRenderable(
|
||||
Control.home(), self.screen._compositor, Control.home()
|
||||
Control.home(),
|
||||
self.screen._compositor,
|
||||
Control.home(),
|
||||
)
|
||||
)
|
||||
if sync_available:
|
||||
if self._sync_available:
|
||||
console.file.write("\x1bP=2s\x1b\\")
|
||||
console.file.flush()
|
||||
except Exception as error:
|
||||
|
||||
@@ -150,8 +150,6 @@ async def test_composition_of_vertical_container_with_children(
|
||||
expected_screen_size = Size(*screen_size)
|
||||
|
||||
async with app.in_running_state():
|
||||
app.log_tree()
|
||||
|
||||
# root widget checks:
|
||||
root_widget = cast(Widget, app.get_child("root"))
|
||||
assert root_widget.size == expected_screen_size
|
||||
|
||||
116
tests/test_integration_scrolling.py
Normal file
116
tests/test_integration_scrolling.py
Normal file
@@ -0,0 +1,116 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
from typing import Sequence, cast
|
||||
|
||||
if sys.version_info >= (3, 8):
|
||||
from typing import Literal
|
||||
else:
|
||||
from typing_extensions import Literal # pragma: no cover
|
||||
|
||||
|
||||
import pytest
|
||||
|
||||
from sandbox.vertical_container import VerticalContainer
|
||||
from tests.utilities.test_app import AppTest
|
||||
from textual.app import ComposeResult
|
||||
from textual.geometry import Size
|
||||
from textual.widget import Widget
|
||||
from textual.widgets import Placeholder
|
||||
|
||||
SCREEN_SIZE = Size(100, 30)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.integration_test # this is a slow test, we may want to skip them in some contexts
|
||||
@pytest.mark.parametrize(
|
||||
(
|
||||
"screen_size",
|
||||
"placeholders_count",
|
||||
"scroll_to_placeholder_id",
|
||||
"scroll_to_animate",
|
||||
"waiting_duration",
|
||||
"last_screen_expected_placeholder_ids",
|
||||
"last_screen_expected_out_of_viewport_placeholder_ids",
|
||||
),
|
||||
(
|
||||
[SCREEN_SIZE, 10, None, None, 0.01, (0, 1, 2, 3, 4), "others"],
|
||||
[SCREEN_SIZE, 10, "placeholder_3", False, 0.01, (0, 1, 2, 3, 4), "others"],
|
||||
[SCREEN_SIZE, 10, "placeholder_5", False, 0.01, (1, 2, 3, 4, 5), "others"],
|
||||
[SCREEN_SIZE, 10, "placeholder_7", False, 0.01, (3, 4, 5, 6, 7), "others"],
|
||||
[SCREEN_SIZE, 10, "placeholder_9", False, 0.01, (5, 6, 7, 8, 9), "others"],
|
||||
# N.B. Scroll duration is hard-coded to 0.2 in the `scroll_to_widget` method atm
|
||||
# Waiting for this duration should allow us to see the scroll finished:
|
||||
[SCREEN_SIZE, 10, "placeholder_9", True, 0.21, (5, 6, 7, 8, 9), "others"],
|
||||
# After having waited for approximately half of the scrolling duration, we should
|
||||
# see the middle Placeholders as we're scrolling towards the last of them.
|
||||
# The state of the screen at this "halfway there" timing looks to not be deterministic though,
|
||||
# depending on the environment - so let's only assert stuff for the middle placeholders
|
||||
# and the first and last ones, but without being too specific about the others:
|
||||
[SCREEN_SIZE, 10, "placeholder_9", True, 0.1, (5, 6, 7), (1, 2, 9)],
|
||||
),
|
||||
)
|
||||
async def test_scroll_to_widget(
|
||||
screen_size: Size,
|
||||
placeholders_count: int,
|
||||
scroll_to_animate: bool | None,
|
||||
scroll_to_placeholder_id: str | None,
|
||||
waiting_duration: float | None,
|
||||
last_screen_expected_placeholder_ids: Sequence[int],
|
||||
last_screen_expected_out_of_viewport_placeholder_ids: Sequence[int]
|
||||
| Literal["others"],
|
||||
):
|
||||
class MyTestApp(AppTest):
|
||||
CSS = """
|
||||
Placeholder {
|
||||
height: 5; /* minimal height to see the name of a Placeholder */
|
||||
}
|
||||
"""
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
placeholders = [
|
||||
Placeholder(id=f"placeholder_{i}", name=f"Placeholder #{i}")
|
||||
for i in range(placeholders_count)
|
||||
]
|
||||
|
||||
yield VerticalContainer(*placeholders, id="root")
|
||||
|
||||
app = MyTestApp(size=screen_size, test_name="scroll_to_widget")
|
||||
|
||||
async with app.in_running_state(waiting_duration_post_yield=waiting_duration or 0):
|
||||
if scroll_to_placeholder_id:
|
||||
target_widget_container = cast(Widget, app.query("#root").first())
|
||||
target_widget = cast(
|
||||
Widget, app.query(f"#{scroll_to_placeholder_id}").first()
|
||||
)
|
||||
target_widget_container.scroll_to_widget(
|
||||
target_widget, animate=scroll_to_animate
|
||||
)
|
||||
|
||||
last_display_capture = app.last_display_capture
|
||||
|
||||
placeholders_visibility_by_id = {
|
||||
id_: f"placeholder_{id_}" in last_display_capture
|
||||
for id_ in range(placeholders_count)
|
||||
}
|
||||
|
||||
# Let's start by checking placeholders that should be visible:
|
||||
for placeholder_id in last_screen_expected_placeholder_ids:
|
||||
assert (
|
||||
placeholders_visibility_by_id[placeholder_id] is True
|
||||
), f"Placeholder '{placeholder_id}' should be visible but isn't"
|
||||
|
||||
# Ok, now for placeholders that should *not* be visible:
|
||||
if last_screen_expected_out_of_viewport_placeholder_ids == "others":
|
||||
# We're simply going to check that all the placeholders that are not in
|
||||
# `last_screen_expected_placeholder_ids` are not on the screen:
|
||||
last_screen_expected_out_of_viewport_placeholder_ids = sorted(
|
||||
tuple(
|
||||
set(range(placeholders_count))
|
||||
- set(last_screen_expected_placeholder_ids)
|
||||
)
|
||||
)
|
||||
for placeholder_id in last_screen_expected_out_of_viewport_placeholder_ids:
|
||||
assert (
|
||||
placeholders_visibility_by_id[placeholder_id] is False
|
||||
), f"Placeholder '{placeholder_id}' should not be visible but is"
|
||||
@@ -4,9 +4,10 @@ import asyncio
|
||||
import contextlib
|
||||
import io
|
||||
from pathlib import Path
|
||||
from typing import AsyncContextManager
|
||||
from typing import AsyncContextManager, cast
|
||||
|
||||
from rich.console import Console
|
||||
|
||||
from rich.console import Console, Capture
|
||||
from textual import events
|
||||
from textual.app import App, ReturnType, ComposeResult
|
||||
from textual.driver import Driver
|
||||
@@ -16,6 +17,9 @@ from textual.geometry import Size
|
||||
# N.B. These classes would better be named TestApp/TestConsole/TestDriver/etc,
|
||||
# but it makes pytest emit warning as it will try to collect them as classes containing test cases :-/
|
||||
|
||||
# This value is also hard-coded in Textual's `App` class:
|
||||
CLEAR_SCREEN_SEQUENCE = "\x1bP=1s\x1b\\"
|
||||
|
||||
|
||||
class AppTest(App):
|
||||
def __init__(
|
||||
@@ -25,7 +29,7 @@ class AppTest(App):
|
||||
size: Size,
|
||||
log_verbosity: int = 2,
|
||||
):
|
||||
# will log in "/tests/test.[test name].log":
|
||||
# Tests will log in "/tests/test.[test name].log":
|
||||
log_path = Path(__file__).parent.parent / f"test.{test_name}.log"
|
||||
super().__init__(
|
||||
driver_class=DriverTest,
|
||||
@@ -33,6 +37,11 @@ class AppTest(App):
|
||||
log_verbosity=log_verbosity,
|
||||
log_color_system="256",
|
||||
)
|
||||
|
||||
# We need this so the `CLEAR_SCREEN_SEQUENCE` is always sent for a screen refresh,
|
||||
# whatever the environment:
|
||||
self._sync_available = True
|
||||
|
||||
self._size = size
|
||||
self._console = ConsoleTest(width=size.width, height=size.height)
|
||||
self._error_console = ConsoleTest(width=size.width, height=size.height)
|
||||
@@ -49,16 +58,18 @@ class AppTest(App):
|
||||
def in_running_state(
|
||||
self,
|
||||
*,
|
||||
initialisation_timeout: float = 0.1,
|
||||
) -> AsyncContextManager[Capture]:
|
||||
waiting_duration_after_initialisation: float = 0.001,
|
||||
waiting_duration_post_yield: float = 0,
|
||||
) -> AsyncContextManager:
|
||||
async def run_app() -> None:
|
||||
await self.process_messages()
|
||||
|
||||
@contextlib.asynccontextmanager
|
||||
async def get_running_state_context_manager():
|
||||
self._set_active()
|
||||
run_task = asyncio.create_task(run_app())
|
||||
timeout_before_yielding_task = asyncio.create_task(
|
||||
asyncio.sleep(initialisation_timeout)
|
||||
asyncio.sleep(waiting_duration_after_initialisation)
|
||||
)
|
||||
done, pending = await asyncio.wait(
|
||||
(
|
||||
@@ -69,10 +80,11 @@ class AppTest(App):
|
||||
)
|
||||
if run_task in done or run_task not in pending:
|
||||
raise RuntimeError(
|
||||
"TestApp is no longer return after its initialization period"
|
||||
"TestApp is no longer running after its initialization period"
|
||||
)
|
||||
with self.console.capture() as capture:
|
||||
yield capture
|
||||
yield
|
||||
if waiting_duration_post_yield:
|
||||
await asyncio.sleep(waiting_duration_post_yield)
|
||||
assert not run_task.done()
|
||||
await self.shutdown()
|
||||
|
||||
@@ -83,6 +95,18 @@ class AppTest(App):
|
||||
"Use `async with my_test_app.in_running_state()` rather than `my_test_app.run()`"
|
||||
)
|
||||
|
||||
@property
|
||||
def total_capture(self) -> str | None:
|
||||
return self.console.file.getvalue()
|
||||
|
||||
@property
|
||||
def last_display_capture(self) -> str | None:
|
||||
total_capture = self.total_capture
|
||||
if not total_capture:
|
||||
return None
|
||||
last_display_start_index = total_capture.rindex(CLEAR_SCREEN_SEQUENCE)
|
||||
return total_capture[last_display_start_index:]
|
||||
|
||||
@property
|
||||
def console(self) -> ConsoleTest:
|
||||
return self._console
|
||||
@@ -110,10 +134,18 @@ class ConsoleTest(Console):
|
||||
file=file,
|
||||
width=width,
|
||||
height=height,
|
||||
force_terminal=True,
|
||||
force_terminal=False,
|
||||
legacy_windows=False,
|
||||
)
|
||||
|
||||
@property
|
||||
def file(self) -> io.StringIO:
|
||||
return cast(io.StringIO, self._file)
|
||||
|
||||
@property
|
||||
def is_dumb_terminal(self) -> bool:
|
||||
return False
|
||||
|
||||
|
||||
class DriverTest(Driver):
|
||||
def start_application_mode(self) -> None:
|
||||
|
||||
Reference in New Issue
Block a user