Mirror of https://github.com/Textualize/textual.git (synced 2025-10-17 02:38:12 +03:00)
Merge branch 'css' of github.com:Textualize/textual into style-error-improvements
@@ -391,7 +391,9 @@ class Compositor:
         self._cuts = [sorted(set(line_cuts)) for line_cuts in cuts]
         return self._cuts

-    def _get_renders(self) -> Iterable[tuple[Region, Region, Lines]]:
+    def _get_renders(
+        self, crop: Region | None = None
+    ) -> Iterable[tuple[Region, Region, Lines]]:
         """Get rendered widgets (lists of segments) in the composition.

         Returns:
@@ -402,15 +404,21 @@ class Compositor:
         _rich_traceback_guard = True

         if self.map:
-            widget_regions = sorted(
-                [
+            if crop:
+                overlaps = crop.overlaps
+                mapped_regions = [
+                    (widget, region, order, clip)
+                    for widget, (region, order, clip, _, _) in self.map.items()
+                    if widget.visible and not widget.is_transparent and overlaps(region)
+                ]
+            else:
+                mapped_regions = [
                     (widget, region, order, clip)
                     for widget, (region, order, clip, _, _) in self.map.items()
                     if widget.visible and not widget.is_transparent
-                ],
-                key=itemgetter(2),
-                reverse=True,
-            )
+                ]
+
+            widget_regions = sorted(mapped_regions, key=itemgetter(2), reverse=True)
         else:
             widget_regions = []

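The new crop parameter lets the compositor drop widgets whose region falls outside the visible area before sorting by paint order. A rough standalone sketch of that filter, using a simplified stand-in for Textual's Region class (not the real one):

```python
from operator import itemgetter
from typing import NamedTuple


class Region(NamedTuple):
    """A simplified stand-in for Textual's Region."""

    x: int
    y: int
    width: int
    height: int

    def overlaps(self, other: "Region") -> bool:
        # Two rectangles overlap if they intersect on both axes.
        return (
            self.x < other.x + other.width
            and other.x < self.x + self.width
            and self.y < other.y + other.height
            and other.y < self.y + self.height
        )


widgets = [
    ("header", Region(0, 0, 80, 3), 1),
    ("body", Region(0, 3, 80, 20), 0),
    ("offscreen", Region(0, 100, 80, 5), 2),
]
crop = Region(0, 0, 80, 24)

# Keep only regions that intersect the crop, then paint highest order first.
visible = [w for w in widgets if crop.overlaps(w[1])]
visible.sort(key=itemgetter(2), reverse=True)
print([name for name, _, _ in visible])  # ['header', 'body']
```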
@@ -480,23 +488,20 @@ class Compositor:
             ]

         # Go through all the renders in reverse order and fill buckets with no render
-        renders = self._get_renders()
+        renders = self._get_renders(crop)
         intersection = Region.intersection

         for region, clip, lines in renders:
             render_region = intersection(region, clip)

             for y, line in zip(render_region.y_range, lines):
                 first_cut, last_cut = render_region.x_extents
                 final_cuts = [cut for cut in cuts[y] if (last_cut >= cut >= first_cut)]

                 # TODO: Suspect this may break for region not on cut boundaries
                 if len(final_cuts) == 2:
                     # Two cuts, which means the entire line
                     cut_segments = [line]
                 else:
                     # More than one cut, which means we need to divide the line
                     # if not final_cuts:
                     #     continue
                     render_x = render_region.x
                     relative_cuts = [cut - render_x for cut in final_cuts]
                     _, *cut_segments = divide(line, relative_cuts)

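The cut logic takes the vertical cut positions that fall inside a widget's rendered region, converts them to offsets relative to the start of the render, and divides the line at those offsets. A rough illustration with plain strings standing in for Rich segment lists (so this is not the actual divide function used above):

```python
# A plain string stands in for a rendered line of segments.
line = "Hello, Textual!"          # the rendered line for one widget row
render_x = 10                     # where the widget starts on screen
cuts = [0, 10, 15, 20, 25, 80]    # vertical cut positions for this row

first_cut, last_cut = render_x, render_x + len(line)
final_cuts = [cut for cut in cuts if last_cut >= cut >= first_cut]

# Make the cuts relative to the start of the render, then slice between them.
relative_cuts = [cut - render_x for cut in final_cuts]
pieces = [line[start:end] for start, end in zip(relative_cuts, relative_cuts[1:])]
print(pieces)  # ['Hello', ', Tex', 'tual!']
```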
@@ -47,7 +47,7 @@ class _ReadUntil(Awaitable):
         self.max_bytes = max_bytes


-class PeekBuffer(Awaitable):
+class _PeekBuffer(Awaitable):
     __slots__: list[str] = []

@@ -61,7 +61,7 @@ class Parser(Generic[T]):
     read = _Read
     read1 = _Read1
     read_until = _ReadUntil
-    peek_buffer = PeekBuffer
+    peek_buffer = _PeekBuffer

     def __init__(self) -> None:
         self._buffer = io.StringIO()
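In the Parser class the read helpers are just the sentinel classes bound as class attributes, so self.peek_buffer() constructs a _PeekBuffer instance; renaming the class therefore only touches this one assignment plus the isinstance checks in the hunk below. A tiny illustration of that binding pattern, with hypothetical names:

```python
class Token:
    """Hypothetical sentinel class."""


class Reader:
    # Binding the class itself as an attribute: calling self.make_token()
    # constructs a Token, and renaming Token only requires updating this line.
    make_token = Token


reader = Reader()
print(type(reader.make_token()).__name__)  # Token
```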
@@ -103,14 +103,14 @@ class Parser(Generic[T]):
         while tokens:
             yield popleft()

-        while pos < data_size or isinstance(self._awaiting, PeekBuffer):
+        while pos < data_size or isinstance(self._awaiting, _PeekBuffer):

             _awaiting = self._awaiting
             if isinstance(_awaiting, _Read1):
                 self._awaiting = self._gen.send(data[pos : pos + 1])
                 pos += 1

-            elif isinstance(_awaiting, PeekBuffer):
+            elif isinstance(_awaiting, _PeekBuffer):
                 self._awaiting = self._gen.send(data[pos:])

             elif isinstance(_awaiting, _Read):
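The _Read1/_PeekBuffer objects act as protocol sentinels: the parse coroutine yields an instance to say what it wants next, and feed() checks the instance's type to decide how much buffered data to send back, which is why the rename has to be mirrored in the isinstance checks above. A minimal sketch of the pattern, with hypothetical names rather than Textual's actual classes:

```python
from typing import Callable, Generator


class Read1:
    """Sentinel: the parse coroutine wants the next single character."""


class PeekBuffer:
    """Sentinel: the parse coroutine wants to see what is buffered, without consuming it."""


def parse(on_token: Callable[[str], None]) -> Generator[object, str, None]:
    while True:
        character = yield Read1()
        rest = yield PeekBuffer()
        # Mark the character if nothing else is buffered behind it.
        on_token(character if rest else character + "!")


def feed(gen: Generator[object, str, None], awaiting: object, data: str) -> object:
    """Drive the coroutine over `data`, answering whichever sentinel it yielded last."""
    pos = 0
    while pos < len(data) or isinstance(awaiting, PeekBuffer):
        if isinstance(awaiting, Read1):
            awaiting = gen.send(data[pos : pos + 1])
            pos += 1
        elif isinstance(awaiting, PeekBuffer):
            awaiting = gen.send(data[pos:])
    return awaiting


tokens: list[str] = []
gen = parse(tokens.append)
awaiting = next(gen)                  # prime the coroutine; it yields its first sentinel
awaiting = feed(gen, awaiting, "ab")
awaiting = feed(gen, awaiting, "cd")
print(tokens)  # ['a', 'b!', 'c', 'd!']
```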
@@ -3,7 +3,7 @@ from __future__ import annotations

 import os
 import re
-from typing import Any, Callable, Generator
+from typing import Any, Callable, Generator, Iterable

 from . import log
 from . import events
@@ -34,6 +34,11 @@ class XTermParser(Parser[events.Event]):
     def debug_log(self, *args: Any) -> None:
         if self._debug_log_file is not None:
             self._debug_log_file.write(" ".join(args) + "\n")
             self._debug_log_file.flush()

+    def feed(self, data: str) -> Iterable[events.Event]:
+        self.debug_log(f"FEED {data!r}")
+        return super().feed(data)
+
     def parse_mouse_code(self, code: str, sender: MessageTarget) -> events.Event | None:
         sgr_match = self._re_sgr_mouse.match(code)
@@ -83,10 +88,23 @@ class XTermParser(Parser[events.Event]):
         while not self.is_eof:
             character = yield read1()
             self.debug_log(f"character={character!r}")
-            # The more_data is to allow the parse to distinguish between an escape sequence
-            # and the escape key pressed
-            if character == ESC and ((yield self.peek_buffer()) or more_data()):
+            if character == ESC:
+                # Could be the escape key was pressed OR the start of an escape sequence
                 sequence: str = character
+                peek_buffer = yield self.peek_buffer()
+                if not peek_buffer:
+                    # An escape arrived without any following characters
+                    on_token(events.Key(self.sender, key=ESC))
+                    continue
+                if peek_buffer and peek_buffer[0] == ESC:
+                    # There is an escape in the buffer, so ESC ESC has arrived
+                    yield read1()
+                    on_token(events.Key(self.sender, key=ESC))
+                    # If there is no further data, it is not part of a sequence,
+                    # So we don't need to go in to the loop
+                    if len(peek_buffer) == 1 and not more_data():
+                        continue
+
                 while True:
                     sequence += yield read1()
                     self.debug_log(f"sequence={sequence!r}")
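The new branches amount to a small decision rule: an ESC with nothing behind it is the Escape key, an ESC immediately followed by another ESC is (at least) a second Escape key press, and anything else starts an escape sequence. A rough standalone version of that rule, using a hypothetical helper rather than the parser's real API:

```python
ESC = "\x1b"


def classify_escape(peek_buffer: str, more_data: bool) -> str:
    """Decide what an ESC character most likely means, given what follows it."""
    if not peek_buffer:
        # Nothing buffered after the ESC: treat it as the Escape key.
        return "escape key"
    if peek_buffer[0] == ESC and len(peek_buffer) == 1 and not more_data:
        # ESC ESC with nothing further on the way: two Escape key presses.
        return "escape key, twice"
    # Otherwise assume the ESC introduces a control sequence (e.g. arrow keys).
    return "escape sequence"


print(classify_escape("", more_data=False))     # escape key
print(classify_escape(ESC, more_data=False))    # escape key, twice
print(classify_escape("[A", more_data=False))   # escape sequence
```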
@@ -253,7 +253,7 @@ class App(DOMNode):
                 key_values = " ".join(
                     f"{key}={value}" for key, value in kwargs.items()
                 )
-                output = " ".join((output, key_values))
+                output = f"{output} {key_values}" if output else key_values
             if self._log_console is not None:
                 self._log_console.print(output, soft_wrap=True)
             if self.devtools.is_connected:
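The replaced line avoids a stray leading space when output starts out empty: joining with a space always inserts the separator, while the conditional f-string only does so when there is something to separate. For example:

```python
output = ""
key_values = "verbosity=2"

print(repr(" ".join((output, key_values))))                      # ' verbosity=2' (leading space)
print(repr(f"{output} {key_values}" if output else key_values))  # 'verbosity=2'
```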
@@ -131,12 +131,12 @@ class ScalarProperty:
         ):
             raise StyleValueError("'auto' not allowed here")

-        if new_value.unit != Unit.AUTO:
-            if new_value is not None and new_value.unit not in self.units:
+        if new_value is not None and new_value.unit != Unit.AUTO:
+            if new_value.unit not in self.units:
                 raise StyleValueError(
                     f"{self.name} units must be one of {friendly_list(get_symbols(self.units))}"
                 )
-            if new_value is not None and new_value.is_percent:
+            if new_value.is_percent:
                 new_value = Scalar(
                     float(new_value.value), self.percent_unit, Unit.WIDTH
                 )
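The reordering matters because the None check has to happen before any attribute access on the value; otherwise clearing a rule by assigning None fails with AttributeError instead of being accepted. A standalone sketch of that ordering, with illustrative types rather than Textual's actual API:

```python
from enum import Enum
from typing import NamedTuple


class Unit(Enum):
    CELLS = 1
    PERCENT = 2
    AUTO = 3


class Scalar(NamedTuple):
    value: float
    unit: Unit


def validate(new_value: "Scalar | None", allowed: "set[Unit]") -> "Scalar | None":
    # Check for None first; only then is it safe to look at .unit.
    if new_value is not None and new_value.unit != Unit.AUTO:
        if new_value.unit not in allowed:
            raise ValueError(f"unit must be one of {allowed}")
    return new_value


print(validate(None, {Unit.CELLS}))                    # None is accepted (rule cleared)
print(validate(Scalar(10, Unit.CELLS), {Unit.CELLS}))  # Scalar(value=10, unit=Unit.CELLS)
```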
@@ -715,7 +715,7 @@ class NameListProperty:
     def __get__(
         self, obj: StylesBase, objtype: type[StylesBase] | None = None
     ) -> tuple[str, ...]:
-        return cast(tuple[str, ...], obj.get_rule(self.name, ()))
+        return cast("tuple[str, ...]", obj.get_rule(self.name, ()))

     def __set__(self, obj: StylesBase, names: str | tuple[str] | None = None):

@@ -190,18 +190,21 @@ class LinuxDriver(Driver):
             return False

         parser = XTermParser(self._target, more_data)
+        feed = parser.feed

         utf8_decoder = getincrementaldecoder("utf-8")().decode
+        decode = utf8_decoder
+        read = os.read

+        EVENT_READ = selectors.EVENT_READ
+
         try:
             while not self.exit_event.is_set():
                 selector_events = selector.select(0.1)
                 for _selector_key, mask in selector_events:
-                    if mask | selectors.EVENT_READ:
+                    if mask | EVENT_READ:
                         unicode_data = decode(read(fileno, 1024))
-                        for event in parser.feed(unicode_data):
+                        for event in feed(unicode_data):
                             self.process_event(event)
         except Exception as error:
             log(error)
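For reference, a minimal standalone selectors read loop over a POSIX file descriptor looks roughly like the following; note that readiness masks are conventionally tested with a bitwise AND (mask & selectors.EVENT_READ), since OR-ing against a non-zero constant is always truthy:

```python
import os
import selectors
from codecs import getincrementaldecoder

# A pipe stands in for the terminal's file descriptor.
read_fd, write_fd = os.pipe()
os.write(write_fd, "hello from the other side\n".encode("utf-8"))

selector = selectors.DefaultSelector()
selector.register(read_fd, selectors.EVENT_READ)
decode = getincrementaldecoder("utf-8")().decode

for _key, mask in selector.select(timeout=0.1):
    if mask & selectors.EVENT_READ:
        unicode_data = decode(os.read(read_fd, 1024))
        print(unicode_data, end="")

selector.unregister(read_fd)
os.close(read_fd)
os.close(write_fd)
```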
@@ -285,5 +285,6 @@ class EventMonitor(threading.Thread):

     def on_size_change(self, width: int, height: int) -> None:
         """Called when terminal size changes."""
-        event = Resize(self.target, Size(width, height))
+        size = Size(width, height)
+        event = Resize(self.target, size, size)
         run_coroutine_threadsafe(self.target.post_message(event), loop=self.loop)
@@ -524,6 +524,7 @@ class Widget(DOMNode):
         self._container_size = container_size

         if self.is_container:
+            self._refresh_scrollbars()
             width, height = self.container_size
             if self.show_vertical_scrollbar:
                 self.vertical_scrollbar.window_virtual_size = virtual_size.height
@@ -534,7 +535,6 @@ class Widget(DOMNode):

             self.refresh(layout=True)
             self.call_later(self.scroll_to, self.scroll_x, self.scroll_y)
-            self._refresh_scrollbars()
         else:
             self.refresh()

@@ -1,11 +1,15 @@
+from decimal import Decimal
+
 import pytest

 from rich.style import Style

 from textual.color import Color
 from textual.css.errors import StyleValueError
+from textual.css.scalar import Scalar, Unit
 from textual.css.styles import Styles, RenderStyles
 from textual.dom import DOMNode
+from textual.widget import Widget


 def test_styles_reset():
@@ -131,3 +135,53 @@ def test_opacity_set_invalid_type_error():
     styles = RenderStyles(DOMNode(), Styles(), Styles())
     with pytest.raises(StyleValueError):
         styles.opacity = "invalid value"
+
+
+@pytest.mark.parametrize(
+    "size_dimension_input,size_dimension_expected_output",
+    [
+        # fmt: off
+        [None, None],
+        [1, Scalar(1, Unit.CELLS, Unit.WIDTH)],
+        [1.0, Scalar(1.0, Unit.CELLS, Unit.WIDTH)],
+        [1.2, Scalar(1.2, Unit.CELLS, Unit.WIDTH)],
+        [1.2e3, Scalar(1200.0, Unit.CELLS, Unit.WIDTH)],
+        ["20", Scalar(20, Unit.CELLS, Unit.WIDTH)],
+        ["1.4", Scalar(1.4, Unit.CELLS, Unit.WIDTH)],
+        [Scalar(100, Unit.CELLS, Unit.WIDTH), Scalar(100, Unit.CELLS, Unit.WIDTH)],
+        [Scalar(10.3, Unit.CELLS, Unit.WIDTH), Scalar(10.3, Unit.CELLS, Unit.WIDTH)],
+        [Scalar(10.4, Unit.CELLS, Unit.HEIGHT), Scalar(10.4, Unit.CELLS, Unit.HEIGHT)],
+        [Scalar(10.5, Unit.PERCENT, Unit.WIDTH), Scalar(10.5, Unit.WIDTH, Unit.WIDTH)],
+        [Scalar(10.6, Unit.PERCENT, Unit.PERCENT), Scalar(10.6, Unit.WIDTH, Unit.WIDTH)],
+        [Scalar(10.7, Unit.HEIGHT, Unit.PERCENT), Scalar(10.7, Unit.HEIGHT, Unit.PERCENT)],
+        # percentage values are normalised to floats and get the WIDTH "percent_unit":
+        [Scalar(11, Unit.PERCENT, Unit.HEIGHT), Scalar(11.0, Unit.WIDTH, Unit.WIDTH)],
+        # fmt: on
+    ],
+)
+def test_widget_style_size_can_accept_various_data_types_and_normalize_them(
+    size_dimension_input, size_dimension_expected_output
+):
+    widget = Widget()
+
+    widget.styles.width = size_dimension_input
+    assert widget.styles.width == size_dimension_expected_output
+
+
+@pytest.mark.parametrize(
+    "size_dimension_input",
+    [
+        "a",
+        "1.4e3",
+        3.14j,
+        Decimal("3.14"),
+        list(),
+        tuple(),
+        dict(),
+    ],
+)
+def test_widget_style_size_fails_if_data_type_is_not_supported(size_dimension_input):
+    widget = Widget()
+
+    with pytest.raises(StyleValueError):
+        widget.styles.width = size_dimension_input
tests/test_parser.py (new file, 55 lines)
@@ -0,0 +1,55 @@
+from textual._parser import Parser
+
+
+def test_read1():
+    class TestParser(Parser[str]):
+        """A simple parser that reads a character at a time from a stream."""
+
+        def parse(self, on_token):
+            while True:
+                data = yield self.read1()
+                if not data:
+                    break
+                on_token(data)
+
+    test_parser = TestParser()
+
+    test_data = "Where there is a Will there is a way!"
+
+    for size in range(1, len(test_data) + 1):
+        # Feed the parser in pieces, first 1 character at a time, then 2, etc
+        test_parser = TestParser()
+        data = []
+        for offset in range(0, len(test_data), size):
+            for chunk in test_parser.feed(test_data[offset : offset + size]):
+                data.append(chunk)
+        # Check we have received all the data in characters, no matter the feed size
+        assert len(data) == len(test_data)
+        assert "".join(data) == test_data
+
+
+def test_read():
+    class TestParser(Parser[str]):
+        """A parser that reads chunks of a given size from the stream."""
+
+        def __init__(self, size):
+            self.size = size
+            super().__init__()
+
+        def parse(self, on_token):
+            while True:
+                data = yield self.read(self.size)
+                if not data:
+                    break
+                on_token(data)
+
+    test_data = "Where there is a Will there is a way!"
+
+    for read_size in range(1, len(test_data) + 1):
+        for size in range(1, len(test_data) + 1):
+            test_parser = TestParser(read_size)
+            data = []
+            for offset in range(0, len(test_data), size):
+                for chunk in test_parser.feed(test_data[offset : offset + size]):
+                    data.append(chunk)
+            assert "".join(data) == test_data
@@ -1,16 +1,22 @@
 from contextlib import nullcontext as does_not_raise
 from decimal import Decimal

 import pytest

 from textual.css.errors import StyleValueError
 from textual.css.scalar import Scalar, Unit
 from textual.widget import Widget


 @pytest.mark.parametrize(
-    "set_val, get_val, style_str", [
+    "set_val, get_val, style_str",
+    [
         [True, True, "visible"],
         [False, False, "hidden"],
         ["hidden", False, "hidden"],
         ["visible", True, "visible"],
-    ])
+    ],
+)
 def test_widget_set_visible_true(set_val, get_val, style_str):
     widget = Widget()
     widget.visible = set_val
