Merge pull request #1444 from Textualize/strip-optimization

adds Strip primitive
This commit is contained in:
Will McGugan
2022-12-30 01:40:28 -08:00
committed by GitHub
21 changed files with 2956 additions and 2204 deletions

View File

@@ -5,12 +5,19 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).
## [0.8.3] - Unreleased
## [0.9.0] - Unreleased
### Added
- Added textual.strip.Strip primitive
- Added textual._cache.FIFOCache
- Added an option to clear columns in DataTable.clear() https://github.com/Textualize/textual/pull/1427
### Changed
- Widget.render_line now returns a Strip
- Fix for slow updates on Windows
## [0.8.2] - 2022-12-28
### Fixed
@@ -308,6 +315,10 @@ https://textual.textualize.io/blog/2022/11/08/version-040/#version-040
- New handler system for messages that doesn't require inheritance
- Improved traceback handling
[0.9.0]: https://github.com/Textualize/textual/compare/v0.8.2...v0.9.0
[0.8.2]: https://github.com/Textualize/textual/compare/v0.8.1...v0.8.2
[0.8.1]: https://github.com/Textualize/textual/compare/v0.8.0...v0.8.1
[0.8.0]: https://github.com/Textualize/textual/compare/v0.7.0...v0.8.0
[0.7.0]: https://github.com/Textualize/textual/compare/v0.6.0...v0.7.0
[0.6.0]: https://github.com/Textualize/textual/compare/v0.5.0...v0.6.0
[0.5.0]: https://github.com/Textualize/textual/compare/v0.4.0...v0.5.0

View File

@@ -0,0 +1,54 @@
---
draft: false
date: 2022-12-30
categories:
- DevLog
authors:
- willmcgugan
---
# A better asyncio sleep for Windows to fix animation
I spent some time optimizing Textual on Windows recently, and discovered something which may be of interest to anyone working with async code on that platform.
<!-- more -->
Animation, scrolling, and fading had always been unsatisfactory on Windows. Textual was usable, but the lag when scrolling made it a little unpleasant to use. On macOS and Linux, scrolling is fast enough that it feels close to a native app, and not something running in a terminal. Yet the Windows experience never improved, even as Textual got faster with each release.
I had chalked this up to Windows Terminal being slow to render updates. After all, the classic Windows terminal was (and still is) glacially slow. Perhaps Microsoft just weren't focusing on performance.
In retrospect, that was highly improbable. Like all modern terminals, Windows Terminal uses the GPU to render updates. Even without focussing on performance, it should be fast.
I figured I'd give it once last attempt to speed up Textual on Windows. If I failed, Windows would forever be a third-class platform for Textual apps.
It turned out that it was nothing to do with performance, per se. The issue was with a single asyncio function: `asyncio.sleep`.
Textual has a `Timer` class which creates events at regular intervals. It powers the JS-like `set_interval` and `set_timer` functions. It is also used internally to do animation (such as smooth scrolling). This Timer class calls `asyncio.sleep` to wait the time between one event and the next.
On macOS and Linux, calling `asynco.sleep` is fairly accurate. If you call `sleep(3.14)`, it will return within 1% of 3.14 seconds. This is not the case for Windows, which for historical reasons uses a timer with a granularity of 15 milliseconds. The upshot is that sleep times will be rounded up to the nearest multiple of 15 milliseconds.
This limit appears holds true for all async primitives on Windows. If you wait for something with a timeout, it will return on a multiple of 15 milliseconds. Fortunately there is work in the CPython pipeline to make this more accurate. Thanks to [Steve Dower](https://twitter.com/zooba) for pointing this out.
This lack of accuracy in the timer meant that timer events were created at a far slower rate that intended. Animation was slower because Textual was waiting too long between updates.
Once I had figured that out, I needed an alternative to `asyncio.sleep` for Textual's Timer class. And I found one. The following version of `sleep` is accurate to well within 1%:
```python
from time import sleep
from asyncio import get_running_loop
async def sleep(sleep_for: float) -> None:
"""An asyncio sleep.
On Windows this achieves a better granularity that asyncio.sleep
Args:
sleep_for (float): Seconds to sleep for.
"""
await get_running_loop().run_in_executor(None, time_sleep, sleep_for)
```
That is a drop-in replacement for sleep on Windows. With it, Textual runs a *lot* smoother. Easily on par with macOS and Linux.
It's not quite perfect. There is a little *tearing* during full "screen" updates, but performance is decent all round. I suspect when [this bug]( https://bugs.python.org/issue37871) is fixed (big thanks to [Paul Moore](https://twitter.com/pf_moore) for looking in to that), and Microsoft implements [this protocol](https://gist.github.com/christianparpart/d8a62cc1ab659194337d73e399004036) then Textual on Windows will be A+.
This Windows improvement will be in v0.9.0 of [Textual](https://github.com/Textualize/textual), which will be released in a few days.

View File

@@ -14,13 +14,14 @@ where the overhead of the cache is a small fraction of the total processing time
from __future__ import annotations
from threading import Lock
from typing import Dict, Generic, KeysView, TypeVar, overload
CacheKey = TypeVar("CacheKey")
CacheValue = TypeVar("CacheValue")
DefaultValue = TypeVar("DefaultValue")
__all__ = ["LRUCache", "FIFOCache"]
class LRUCache(Generic[CacheKey, CacheValue]):
"""
@@ -37,12 +38,22 @@ class LRUCache(Generic[CacheKey, CacheValue]):
"""
__slots__ = [
"_maxsize",
"_cache",
"_full",
"_head",
"hits",
"misses",
]
def __init__(self, maxsize: int) -> None:
self._maxsize = maxsize
self._cache: Dict[CacheKey, list[object]] = {}
self._full = False
self._head: list[object] = []
self._lock = Lock()
self.hits = 0
self.misses = 0
super().__init__()
@property
@@ -60,6 +71,11 @@ class LRUCache(Generic[CacheKey, CacheValue]):
def __len__(self) -> int:
return len(self._cache)
def __repr__(self) -> str:
return (
f"<LRUCache maxsize={self._maxsize} hits={self.hits} misses={self.misses}>"
)
def grow(self, maxsize: int) -> None:
"""Grow the maximum size to at least `maxsize` elements.
@@ -70,7 +86,6 @@ class LRUCache(Generic[CacheKey, CacheValue]):
def clear(self) -> None:
"""Clear the cache."""
with self._lock:
self._cache.clear()
self._full = False
self._head = []
@@ -87,7 +102,6 @@ class LRUCache(Generic[CacheKey, CacheValue]):
key (CacheKey): Key.
value (CacheValue): Value.
"""
with self._lock:
link = self._cache.get(key)
if link is None:
head = self._head
@@ -135,8 +149,8 @@ class LRUCache(Generic[CacheKey, CacheValue]):
"""
link = self._cache.get(key)
if link is None:
self.misses += 1
return default
with self._lock:
if link is not self._head:
# Remove link from list
link[0][1] = link[1] # type: ignore[index]
@@ -146,12 +160,14 @@ class LRUCache(Generic[CacheKey, CacheValue]):
link[0] = head[0]
link[1] = head
self._head = head[0][1] = head[0] = link # type: ignore[index]
self.hits += 1
return link[3] # type: ignore[return-value]
def __getitem__(self, key: CacheKey) -> CacheValue:
link = self._cache[key]
with self._lock:
link = self._cache.get(key)
if link is None:
self.misses += 1
raise KeyError(key)
if link is not self._head:
link[0][1] = link[1] # type: ignore[index]
link[1][0] = link[0] # type: ignore[index]
@@ -159,7 +175,110 @@ class LRUCache(Generic[CacheKey, CacheValue]):
link[0] = head[0]
link[1] = head
self._head = head[0][1] = head[0] = link # type: ignore[index]
self.hits += 1
return link[3] # type: ignore[return-value]
def __contains__(self, key: CacheKey) -> bool:
return key in self._cache
class FIFOCache(Generic[CacheKey, CacheValue]):
"""A simple cache that discards the first added key when full (First In First Out).
This has a lower overhead than LRUCache, but won't manage a working set as efficiently.
It is most suitable for a cache with a relatively low maximum size that is not expected to
do many lookups.
Args:
maxsize (int): Maximum size of the cache.
"""
__slots__ = [
"_maxsize",
"_cache",
"hits",
"misses",
]
def __init__(self, maxsize: int) -> None:
self._maxsize = maxsize
self._cache: dict[CacheKey, CacheValue] = {}
self.hits = 0
self.misses = 0
def __bool__(self) -> bool:
return bool(self._cache)
def __len__(self) -> int:
return len(self._cache)
def __repr__(self) -> str:
return (
f"<FIFOCache maxsize={self._maxsize} hits={self.hits} misses={self.misses}>"
)
def clear(self) -> None:
"""Clear the cache."""
self._cache.clear()
def keys(self) -> KeysView[CacheKey]:
"""Get cache keys."""
# Mostly for tests
return self._cache.keys()
def set(self, key: CacheKey, value: CacheValue) -> None:
"""Set a value.
Args:
key (CacheKey): Key.
value (CacheValue): Value.
"""
if key not in self._cache and len(self._cache) >= self._maxsize:
for first_key in self._cache:
self._cache.pop(first_key)
break
self._cache[key] = value
__setitem__ = set
@overload
def get(self, key: CacheKey) -> CacheValue | None:
...
@overload
def get(self, key: CacheKey, default: DefaultValue) -> CacheValue | DefaultValue:
...
def get(
self, key: CacheKey, default: DefaultValue | None = None
) -> CacheValue | DefaultValue | None:
"""Get a value from the cache, or return a default if the key is not present.
Args:
key (CacheKey): Key
default (Optional[DefaultValue], optional): Default to return if key is not present. Defaults to None.
Returns:
Union[CacheValue, Optional[DefaultValue]]: Either the value or a default.
"""
try:
result = self._cache[key]
except KeyError:
self.misses += 1
return default
else:
self.hits += 1
return result
def __getitem__(self, key: CacheKey) -> CacheValue:
try:
result = self._cache[key]
except KeyError:
self.misses += 1
raise KeyError(key) from None
else:
self.hits += 1
return result
def __contains__(self, key: CacheKey) -> bool:
return key in self._cache

View File

@@ -13,7 +13,6 @@ without having to render the entire screen.
from __future__ import annotations
from itertools import chain
from operator import itemgetter
from typing import TYPE_CHECKING, Iterable, NamedTuple, cast
@@ -26,10 +25,11 @@ from rich.style import Style
from . import errors
from ._cells import cell_len
from ._loop import loop_last
from ._types import Lines
from .strip import Strip
from ._typing import TypeAlias
from .geometry import NULL_OFFSET, Offset, Region, Size
if TYPE_CHECKING:
from .widget import Widget
@@ -66,8 +66,8 @@ CompositorMap: TypeAlias = "dict[Widget, MapGeometry]"
class LayoutUpdate:
"""A renderable containing the result of a render for a given region."""
def __init__(self, lines: Lines, region: Region) -> None:
self.lines = lines
def __init__(self, strips: list[Strip], region: Region) -> None:
self.strips = strips
self.region = region
def __rich_console__(
@@ -76,7 +76,7 @@ class LayoutUpdate:
x = self.region.x
new_line = Segment.line()
move_to = Control.move_to
for last, (y, line) in loop_last(enumerate(self.lines, self.region.y)):
for last, (y, line) in loop_last(enumerate(self.strips, self.region.y)):
yield move_to(x, y)
yield from line
if not last:
@@ -92,7 +92,7 @@ class ChopsUpdate:
def __init__(
self,
chops: list[dict[int, list[Segment] | None]],
chops: list[dict[int, Strip | None]],
spans: list[tuple[int, int, int]],
chop_ends: list[list[int]],
) -> None:
@@ -117,13 +117,12 @@ class ChopsUpdate:
last_y = self.spans[-1][0]
_cell_len = cell_len
for y, x1, x2 in self.spans:
line = chops[y]
ends = chop_ends[y]
for end, (x, segments) in zip(ends, line.items()):
for end, (x, strip) in zip(ends, line.items()):
# TODO: crop to x extents
if segments is None:
if strip is None:
continue
if x > x2 or end <= x1:
@@ -131,10 +130,10 @@ class ChopsUpdate:
if x2 > x >= x1 and end <= x2:
yield move_to(x, y)
yield from segments
yield from strip
continue
iter_segments = iter(segments)
iter_segments = iter(strip)
if x < x1:
for segment in iter_segments:
next_x = x + _cell_len(segment.text)
@@ -280,12 +279,11 @@ class Compositor:
# i.e. if something is moved / deleted / added
if screen not in self._dirty_regions:
crop_screen = screen.intersection
changes = map.items() ^ old_map.items()
regions = {
region
for region in (
crop_screen(map_geometry.visible_region)
map_geometry.clip.intersection(map_geometry.region)
for _, map_geometry in changes
)
if region
@@ -635,11 +633,11 @@ class Compositor:
def _get_renders(
self, crop: Region | None = None
) -> Iterable[tuple[Region, Region, Lines]]:
) -> Iterable[tuple[Region, Region, list[Strip]]]:
"""Get rendered widgets (lists of segments) in the composition.
Returns:
Iterable[tuple[Region, Region, Lines]]: An iterable of <region>, <clip region>, and <lines>
Iterable[tuple[Region, Region, Strips]]: An iterable of <region>, <clip region>, and <strips>
"""
# If a renderable throws an error while rendering, the user likely doesn't care about the traceback
# up to this point.
@@ -685,18 +683,6 @@ class Compositor:
_Region(delta_x, delta_y, new_width, new_height)
)
@classmethod
def _assemble_chops(
cls, chops: list[dict[int, list[Segment] | None]]
) -> list[list[Segment]]:
"""Combine chops in to lines."""
from_iterable = chain.from_iterable
segment_lines: list[list[Segment]] = [
list(from_iterable(line for line in bucket.values() if line is not None))
for bucket in chops
]
return segment_lines
def render(self, full: bool = False) -> RenderableType | None:
"""Render a layout.
@@ -728,8 +714,6 @@ class Compositor:
else:
return None
divide = Segment.divide
# Maps each cut on to a list of segments
cuts = self.cuts
@@ -738,19 +722,19 @@ class Compositor:
"Callable[[list[int]], dict[int, list[Segment] | None]]", dict.fromkeys
)
# A mapping of cut index to a list of segments for each line
chops: list[dict[int, list[Segment] | None]]
chops: list[dict[int, Strip | None]]
chops = [fromkeys(cut_set[:-1]) for cut_set in cuts]
cut_segments: Iterable[list[Segment]]
cut_strips: Iterable[Strip]
# Go through all the renders in reverse order and fill buckets with no render
renders = self._get_renders(crop)
intersection = Region.intersection
for region, clip, lines in renders:
for region, clip, strips in renders:
render_region = intersection(region, clip)
for y, line in zip(render_region.line_range, lines):
for y, strip in zip(render_region.line_range, strips):
if not is_rendered_line(y):
continue
@@ -763,20 +747,20 @@ class Compositor:
]
if len(final_cuts) <= 2:
# Two cuts, which means the entire line
cut_segments = [line]
cut_strips = [strip]
else:
render_x = render_region.x
relative_cuts = [cut - render_x for cut in final_cuts[1:]]
cut_segments = divide(line, relative_cuts)
cut_strips = strip.divide(relative_cuts)
# Since we are painting front to back, the first segments for a cut "wins"
for cut, segments in zip(final_cuts, cut_segments):
for cut, strip in zip(final_cuts, cut_strips):
if chops_line[cut] is None:
chops_line[cut] = segments
chops_line[cut] = strip
if full:
render_lines = self._assemble_chops(chops)
return LayoutUpdate(render_lines, screen_region)
render_strips = [Strip.join(chop.values()) for chop in chops]
return LayoutUpdate(render_strips, screen_region)
else:
chop_ends = [cut_set[1:] for cut_set in cuts]
return ChopsUpdate(chops, spans, chop_ends)
@@ -785,7 +769,7 @@ class Compositor:
self, console: Console, options: ConsoleOptions
) -> RenderResult:
if self._dirty_regions:
yield self.render()
yield self.render() or ""
def update_widgets(self, widgets: set[Widget]) -> None:
"""Update a given widget in the composition.

View File

@@ -13,14 +13,14 @@ class LineFilter(ABC):
"""Base class for a line filter."""
@abstractmethod
def filter(self, segments: list[Segment]) -> list[Segment]:
def apply(self, segments: list[Segment]) -> list[Segment]:
"""Transform a list of segments."""
class Monochrome(LineFilter):
"""Convert all colors to monochrome."""
def filter(self, segments: list[Segment]) -> list[Segment]:
def apply(self, segments: list[Segment]) -> list[Segment]:
to_monochrome = self.to_monochrome
_Segment = Segment
return [

View File

@@ -10,7 +10,6 @@ from rich.segment import Segment
from rich.style import Style
from ._cells import cell_len
from ._types import Lines
from .css.types import AlignHorizontal, AlignVertical
from .geometry import Size
@@ -22,8 +21,8 @@ def line_crop(
Args:
segments (list[Segment]): A list of Segments for a line.
start (int): Start offset
end (int): End offset (exclusive)
start (int): Start offset (cells)
end (int): End offset (cells, exclusive)
total (int): Total cell length of segments.
Returns:
list[Segment]: A new shorter list of segments
@@ -130,7 +129,7 @@ def line_pad(
def align_lines(
lines: Lines,
lines: list[list[Segment]],
style: Style,
size: Size,
horizontal: AlignHorizontal,
@@ -153,7 +152,7 @@ def align_lines(
width, height = size
shape_width, shape_height = Segment.get_shape(lines)
def blank_lines(count: int) -> Lines:
def blank_lines(count: int) -> list[list[Segment]]:
return [[Segment(" " * width, style)]] * count
top_blank_lines = bottom_blank_lines = 0

65
src/textual/_sleep.py Normal file
View File

@@ -0,0 +1,65 @@
from __future__ import annotations
from time import sleep, perf_counter
from asyncio import get_running_loop
from threading import Thread, Event
class Sleeper(Thread):
def __init__(
self,
) -> None:
self._exit = False
self._sleep_time = 0.0
self._event = Event()
self.future = None
self._loop = get_running_loop()
super().__init__(daemon=True)
def run(self):
while True:
self._event.wait()
if self._exit:
break
sleep(self._sleep_time)
self._event.clear()
# self.future.set_result(None)
self._loop.call_soon_threadsafe(self.future.set_result, None)
async def sleep(self, sleep_time: float) -> None:
future = self.future = self._loop.create_future()
self._sleep_time = sleep_time
self._event.set()
await future
# await self._async_event.wait()
# self._async_event.clear()
async def check_sleeps() -> None:
sleeper = Sleeper()
sleeper.start()
async def profile_sleep(sleep_for: float) -> float:
start = perf_counter()
while perf_counter() - start < sleep_for:
sleep(0)
# await sleeper.sleep(sleep_for)
elapsed = perf_counter() - start
return elapsed
for t in range(15, 120, 5):
sleep_time = 1 / t
elapsed = await profile_sleep(sleep_time)
difference = (elapsed / sleep_time * 100) - 100
print(
f"sleep={sleep_time*1000:.01f}ms clock={elapsed*1000:.01f}ms diff={difference:.02f}%"
)
from asyncio import run
run(check_sleeps())

View File

@@ -11,12 +11,12 @@ from ._border import get_box, render_row
from ._filter import LineFilter
from ._opacity import _apply_opacity
from ._segment_tools import line_crop, line_pad, line_trim
from ._types import Lines
from ._typing import TypeAlias
from .color import Color
from .geometry import Region, Size, Spacing
from .renderables.text_opacity import TextOpacity
from .renderables.tint import Tint
from .strip import Strip
if TYPE_CHECKING:
from .css.styles import StylesBase
@@ -25,35 +25,6 @@ if TYPE_CHECKING:
RenderLineCallback: TypeAlias = Callable[[int], List[Segment]]
def style_links(
segments: Iterable[Segment], link_id: str, link_style: Style
) -> list[Segment]:
"""Apply a style to the given link id.
Args:
segments (Iterable[Segment]): Segments.
link_id (str): A link id.
link_style (Style): Style to apply.
Returns:
list[Segment]: A list of new segments.
"""
_Segment = Segment
segments = [
_Segment(
text,
(style + link_style if style is not None else None)
if (style and not style._null and style._link_id == link_id)
else style,
control,
)
for text, style, control in segments
]
return segments
@lru_cache(1024 * 8)
def make_blank(width, style: Style) -> Segment:
"""Make a blank segment.
@@ -95,7 +66,7 @@ class StylesCache:
"""
def __init__(self) -> None:
self._cache: dict[int, list[Segment]] = {}
self._cache: dict[int, Strip] = {}
self._dirty_lines: set[int] = set()
self._width = 1
@@ -123,7 +94,7 @@ class StylesCache:
self._cache.clear()
self._dirty_lines.clear()
def render_widget(self, widget: Widget, crop: Region) -> Lines:
def render_widget(self, widget: Widget, crop: Region) -> list[Strip]:
"""Render the content for a widget.
Args:
@@ -135,7 +106,7 @@ class StylesCache:
"""
base_background, background = widget.background_colors
styles = widget.styles
lines = self.render(
strips = self.render(
styles,
widget.region.size,
base_background,
@@ -147,7 +118,6 @@ class StylesCache:
filter=widget.app._filter,
)
if widget.auto_links:
_style_links = style_links
hover_style = widget.hover_style
link_hover_style = widget.link_hover_style
if (
@@ -157,12 +127,12 @@ class StylesCache:
and "@click" in hover_style.meta
):
if link_hover_style:
lines = [
_style_links(line, hover_style.link_id, link_hover_style)
for line in lines
strips = [
strip.style_links(hover_style.link_id, link_hover_style)
for strip in strips
]
return lines
return strips
def render(
self,
@@ -175,7 +145,7 @@ class StylesCache:
padding: Spacing | None = None,
crop: Region | None = None,
filter: LineFilter | None = None,
) -> Lines:
) -> list[Strip]:
"""Render a widget content plus CSS styles.
Args:
@@ -202,15 +172,14 @@ class StylesCache:
if width != self._width:
self.clear()
self._width = width
lines: Lines = []
add_line = lines.append
simplify = Segment.simplify
strips: list[Strip] = []
add_strip = strips.append
is_dirty = self._dirty_lines.__contains__
render_line = self.render_line
for y in crop.line_range:
if is_dirty(y) or y not in self._cache:
line = render_line(
strip = render_line(
styles,
y,
size,
@@ -220,21 +189,19 @@ class StylesCache:
background,
render_content_line,
)
line = list(simplify(line))
self._cache[y] = line
self._cache[y] = strip
else:
line = self._cache[y]
strip = self._cache[y]
if filter:
line = filter.filter(line)
add_line(line)
strip = strip.apply_filter(filter)
add_strip(strip)
self._dirty_lines.difference_update(crop.line_range)
if crop.column_span != (0, width):
_line_crop = line_crop
x1, x2 = crop.column_span
lines = [_line_crop(line, x1, x2, width) for line in lines]
strips = [strip.crop(x1, x2) for strip in strips]
return lines
return strips
def render_line(
self,
@@ -246,7 +213,7 @@ class StylesCache:
base_background: Color,
background: Color,
render_content_line: RenderLineCallback,
) -> list[Segment]:
) -> Strip:
"""Render a styled line.
Args:
@@ -288,7 +255,7 @@ class StylesCache:
inner = from_color(bgcolor=(base_background + background).rich_color)
outer = from_color(bgcolor=base_background.rich_color)
def post(segments: Iterable[Segment]) -> list[Segment]:
def post(segments: Iterable[Segment]) -> Iterable[Segment]:
"""Post process segments to apply opacity and tint.
Args:
@@ -303,8 +270,7 @@ class StylesCache:
segments = Tint.process_segments(segments, styles.tint)
if styles.opacity != 1.0:
segments = _apply_opacity(segments, base_background, styles.opacity)
segments = list(segments)
return segments if isinstance(segments, list) else list(segments)
return segments
line: Iterable[Segment]
# Draw top or bottom borders (A)
@@ -402,4 +368,5 @@ class StylesCache:
else:
line = [*line, right]
return post(line)
strip = Strip(post(line), width)
return strip

View File

@@ -1,6 +1,8 @@
import platform
from time import monotonic, perf_counter
from asyncio import sleep as asyncio_sleep, get_running_loop
from time import monotonic, perf_counter, sleep as time_sleep
PLATFORM = platform.system()
WINDOWS = PLATFORM == "Windows"
@@ -10,3 +12,19 @@ if WINDOWS:
time = perf_counter
else:
time = monotonic
if WINDOWS:
async def sleep(sleep_for: float) -> None:
"""An asyncio sleep.
On Windows this achieves a better granularity that asyncio.sleep
Args:
sleep_for (float): Seconds to sleep for.
"""
await get_running_loop().run_in_executor(None, time_sleep, sleep_for)
else:
sleep = asyncio_sleep

View File

@@ -2,10 +2,12 @@ from typing import Awaitable, Callable, List, TYPE_CHECKING, Union
from rich.segment import Segment
from textual._typing import Protocol
from ._typing import Protocol
if TYPE_CHECKING:
from .message import Message
from .strip import Strip
class MessageTarget(Protocol):
@@ -27,5 +29,5 @@ class EventTarget(Protocol):
...
Lines = List[List[Segment]]
SegmentLines = List[List["Segment"]]
CallbackType = Union[Callable[[], Awaitable[None]], Callable[[], None]]

View File

@@ -30,6 +30,16 @@ class ScrollView(Widget):
"""Not transparent, i.e. renders something."""
return False
def watch_scroll_x(self, new_value: float) -> None:
if self.show_horizontal_scrollbar:
self.horizontal_scrollbar.position = int(new_value)
self.refresh()
def watch_scroll_y(self, new_value: float) -> None:
if self.show_vertical_scrollbar:
self.vertical_scrollbar.position = int(new_value)
self.refresh()
def on_mount(self):
self._refresh_scrollbars()
@@ -68,6 +78,8 @@ class ScrollView(Widget):
virtual_size (Size): New virtual size.
container_size (Size): New container size.
"""
if self._size != size or container_size != container_size:
self.refresh()
if (
self._size != size
or virtual_size != self.virtual_size
@@ -77,9 +89,7 @@ class ScrollView(Widget):
virtual_size = self.virtual_size
self._container_size = size - self.styles.gutter.totals
self._scroll_update(virtual_size)
self.scroll_to(self.scroll_x, self.scroll_y, animate=False)
self.refresh()
def render(self) -> RenderableType:
"""Render the scrollable region (if `render_lines` is not implemented).

289
src/textual/strip.py Normal file
View File

@@ -0,0 +1,289 @@
from __future__ import annotations
from itertools import chain
from typing import Iterable, Iterator
import rich.repr
from rich.cells import cell_len, set_cell_size
from rich.segment import Segment
from rich.style import Style
from ._cache import FIFOCache
from ._filter import LineFilter
@rich.repr.auto
class Strip:
"""Represents a 'strip' (horizontal line) of a Textual Widget.
A Strip is like an immutable list of Segments. The immutability allows for effective caching.
Args:
segments (Iterable[Segment]): An iterable of segments.
cell_length (int | None, optional): The cell length if known, or None to calculate on demand. Defaults to None.
"""
__slots__ = [
"_segments",
"_cell_length",
"_divide_cache",
"_crop_cache",
]
def __init__(
self, segments: Iterable[Segment], cell_length: int | None = None
) -> None:
self._segments = list(segments)
self._cell_length = cell_length
self._divide_cache: FIFOCache[tuple[int, ...], list[Strip]] = FIFOCache(4)
self._crop_cache: FIFOCache[tuple[int, int], Strip] = FIFOCache(4)
def __rich_repr__(self) -> rich.repr.Result:
yield self._segments
yield self.cell_length
@classmethod
def blank(cls, cell_length: int, style: Style | None) -> Strip:
"""Create a blank strip.
Args:
cell_length (int): Desired cell length.
style (Style | None): Style of blank.
Returns:
Strip: New strip.
"""
return cls([Segment(" " * cell_length, style)], cell_length)
@classmethod
def from_lines(cls, lines: list[list[Segment]], cell_length: int) -> list[Strip]:
"""Convert lines (lists of segments) to a list of Strips.
Args:
lines (list[list[Segment]]): List of lines, where a line is a list of segments.
cell_length (int): Cell length of lines (must be same).
Returns:
list[Strip]: List of strips.
"""
return [cls(segments, cell_length) for segments in lines]
@property
def cell_length(self) -> int:
"""Get the number of cells required to render this object."""
# Done on demand and cached, as this is an O(n) operation
if self._cell_length is None:
self._cell_length = Segment.get_line_length(self._segments)
return self._cell_length
@classmethod
def join(cls, strips: Iterable[Strip | None]) -> Strip:
"""Join a number of strips in to one.
Args:
strips (Iterable[Strip]): An iterable of Strips.
Returns:
Strip: A new combined strip.
"""
segments: list[list[Segment]] = []
add_segments = segments.append
total_cell_length = 0
for strip in strips:
if strip is not None:
total_cell_length += strip.cell_length
add_segments(strip._segments)
strip = cls(chain.from_iterable(segments), total_cell_length)
return strip
def __bool__(self) -> bool:
return bool(self._segments)
def __iter__(self) -> Iterator[Segment]:
return iter(self._segments)
def __reversed__(self) -> Iterator[Segment]:
return reversed(self._segments)
def __len__(self) -> int:
return len(self._segments)
def __eq__(self, strip: Strip) -> bool:
return (
self._segments == strip._segments and self.cell_length == strip.cell_length
)
def adjust_cell_length(self, cell_length: int, style: Style | None = None) -> Strip:
"""Adjust the cell length, possibly truncating or extending.
Args:
cell_length (int): New desired cell length.
style (Style | None): Style when extending, or `None`. Defaults to `None`.
Returns:
Strip: A new strip with the supplied cell length.
"""
new_line: list[Segment]
line = self._segments
current_cell_length = self.cell_length
_Segment = Segment
if current_cell_length < cell_length:
# Cell length is larger, so pad with spaces.
new_line = line + [
_Segment(" " * (cell_length - current_cell_length), style)
]
elif current_cell_length > cell_length:
# Cell length is shorter so we need to truncate.
new_line = []
append = new_line.append
line_length = 0
for segment in line:
segment_length = segment.cell_length
if line_length + segment_length < cell_length:
append(segment)
line_length += segment_length
else:
text, segment_style, _ = segment
text = set_cell_size(text, cell_length - line_length)
append(_Segment(text, segment_style))
break
else:
# Strip is already the required cell length, so return self.
return self
return Strip(new_line, cell_length)
def simplify(self) -> Strip:
"""Simplify the segments (join segments with same style)
Returns:
Strip: New strip.
"""
line = Strip(
Segment.simplify(self._segments),
self._cell_length,
)
return line
def apply_filter(self, filter: LineFilter) -> Strip:
"""Apply a filter to all segments in the strip.
Args:
filter (LineFilter): A line filter object.
Returns:
Strip: A new Strip.
"""
return Strip(filter.apply(self._segments), self._cell_length)
def style_links(self, link_id: str, link_style: Style) -> Strip:
"""Apply a style to Segments with the given link_id.
Args:
link_id (str): A link id.
link_style (Style): Style to apply.
Returns:
Strip: New strip (or same Strip if no changes).
"""
_Segment = Segment
if not any(
segment.style._link_id == link_id
for segment in self._segments
if segment.style
):
return self
segments = [
_Segment(
text,
(style + link_style if style is not None else None)
if (style and not style._null and style._link_id == link_id)
else style,
control,
)
for text, style, control in self._segments
]
return Strip(segments, self._cell_length)
def crop(self, start: int, end: int) -> Strip:
"""Crop a strip between two cell positions.
Args:
start (int): The start cell position (inclusive).
end (int): The end cell position (exclusive).
Returns:
Strip: A new Strip.
"""
if start == 0 and end == self.cell_length:
return self
cache_key = (start, end)
cached = self._crop_cache.get(cache_key)
if cached is not None:
return cached
_cell_len = cell_len
pos = 0
output_segments: list[Segment] = []
add_segment = output_segments.append
iter_segments = iter(self._segments)
segment: Segment | None = None
if start > self.cell_length:
strip = Strip([], 0)
else:
for segment in iter_segments:
end_pos = pos + _cell_len(segment.text)
if end_pos > start:
segment = segment.split_cells(start - pos)[1]
break
pos = end_pos
if end >= self.cell_length:
# The end crop is the end of the segments, so we can collect all remaining segments
if segment:
add_segment(segment)
output_segments.extend(iter_segments)
strip = Strip(output_segments, self.cell_length - start)
else:
pos = start
while segment is not None:
end_pos = pos + _cell_len(segment.text)
if end_pos < end:
add_segment(segment)
else:
add_segment(segment.split_cells(end - pos)[0])
break
pos = end_pos
segment = next(iter_segments, None)
strip = Strip(output_segments, end - start)
self._crop_cache[cache_key] = strip
return strip
def divide(self, cuts: Iterable[int]) -> list[Strip]:
"""Divide the strip in to multiple smaller strips by cutting at given (cell) indices.
Args:
cuts (Iterable[int]): An iterable of cell positions as ints.
Returns:
list[Strip]: A new list of strips.
"""
pos = 0
cache_key = tuple(cuts)
cached = self._divide_cache.get(cache_key)
if cached is not None:
return cached
strips: list[Strip] = []
add_strip = strips.append
for segments, cut in zip(Segment.divide(self._segments, cuts), cuts):
add_strip(Strip(segments, cut - pos))
pos += cut
self._divide_cache[cache_key] = strips
return strips

View File

@@ -9,19 +9,15 @@ from __future__ import annotations
import asyncio
import weakref
from asyncio import (
CancelledError,
Event,
Task,
)
from asyncio import CancelledError, Event, Task
from typing import Awaitable, Callable, Union
from rich.repr import Result, rich_repr
from . import events
from . import _clock, events
from ._callback import invoke
from ._context import active_app
from . import _clock
from ._time import sleep
from ._types import MessageTarget
TimerCallback = Union[Callable[[], Awaitable[None]], Callable[[], None]]
@@ -140,6 +136,7 @@ class Timer:
_interval = self._interval
await self._active.wait()
start = _clock.get_time_no_wait()
while _repeat is None or count <= _repeat:
next_timer = start + ((count + 1) * _interval)
now = await _clock.get_time()
@@ -148,8 +145,8 @@ class Timer:
continue
now = await _clock.get_time()
wait_time = max(0, next_timer - now)
if wait_time:
await _clock.sleep(wait_time)
if wait_time > 1 / 1000:
await sleep(wait_time)
count += 1
await self._active.wait()

View File

@@ -37,13 +37,11 @@ from rich.text import Text
from . import errors, events, messages
from ._animator import DEFAULT_EASING, Animatable, BoundAnimator, EasingFunction
from ._arrange import DockArrangeResult, arrange
from ._cache import LRUCache
from ._context import active_app
from ._easing import DEFAULT_SCROLL_EASING
from ._layout import Layout
from ._segment_tools import align_lines
from ._styles_cache import StylesCache
from ._types import Lines
from .actions import SkipAction
from .await_remove import AwaitRemove
from .binding import Binding
@@ -57,6 +55,7 @@ from .message import Message
from .messages import CallbackType
from .reactive import Reactive
from .render import measure
from .strip import Strip
from .walk import walk_depth_first
if TYPE_CHECKING:
@@ -156,7 +155,7 @@ class RenderCache(NamedTuple):
"""Stores results of a previous render."""
size: Size
lines: Lines
lines: list[Strip]
class WidgetError(Exception):
@@ -250,8 +249,8 @@ class Widget(DOMNode):
self._content_width_cache: tuple[object, int] = (None, 0)
self._content_height_cache: tuple[object, int] = (None, 0)
self._arrangement_cache_updates: int = -1
self._arrangement_cache: LRUCache[Size, DockArrangeResult] = LRUCache(4)
self._arrangement_cache_key: tuple[Size, int] = (Size(), -1)
self._cached_arrangement: DockArrangeResult | None = None
self._styles_cache = StylesCache()
self._rich_style_cache: dict[str, tuple[Style, Style]] = {}
@@ -463,22 +462,23 @@ class Widget(DOMNode):
"""
assert self.is_container
if self._arrangement_cache_updates != self.children._updates:
self._arrangement_cache_updates = self.children._updates
self._arrangement_cache.clear()
cache_key = (size, self.children._updates)
if (
self._arrangement_cache_key == cache_key
and self._cached_arrangement is not None
):
return self._cached_arrangement
cached_arrangement = self._arrangement_cache.get(size, None)
if cached_arrangement is not None:
return cached_arrangement
arrangement = self._arrangement_cache[size] = arrange(
self._arrangement_cache_key = cache_key
arrangement = self._cached_arrangement = arrange(
self, self.children, size, self.screen.size
)
return arrangement
def _clear_arrangement_cache(self) -> None:
"""Clear arrangement cache, forcing a new arrange operation."""
self._arrangement_cache.clear()
self._cached_arrangement = None
def _get_virtual_dom(self) -> Iterable[Widget]:
"""Get widgets not part of the DOM.
@@ -2097,11 +2097,11 @@ class Widget(DOMNode):
align_vertical,
)
)
self._render_cache = RenderCache(self.size, lines)
strips = [Strip(line, width) for line in lines]
self._render_cache = RenderCache(self.size, strips)
self._dirty_regions.clear()
def render_line(self, y: int) -> list[Segment]:
def render_line(self, y: int) -> Strip:
"""Render a line of content.
Args:
@@ -2115,10 +2115,10 @@ class Widget(DOMNode):
try:
line = self._render_cache.lines[y]
except IndexError:
line = [Segment(" " * self.size.width, self.rich_style)]
line = Strip.blank(self.size.width, self.rich_style)
return line
def render_lines(self, crop: Region) -> Lines:
def render_lines(self, crop: Region) -> list[Strip]:
"""Render the widget in to lines.
Args:
@@ -2127,8 +2127,8 @@ class Widget(DOMNode):
Returns:
Lines: A list of list of segments.
"""
lines = self._styles_cache.render_widget(self, crop)
return lines
strips = self._styles_cache.render_widget(self, crop)
return strips
def get_style_at(self, x: int, y: int) -> Style:
"""Get the Rich style in a widget at a given relative offset.

View File

@@ -14,11 +14,12 @@ from rich.text import Text, TextType
from .. import events, messages
from .._cache import LRUCache
from .._segment_tools import line_crop
from .._types import Lines
from .._types import SegmentLines
from ..geometry import Region, Size, Spacing, clamp
from ..reactive import Reactive
from ..render import measure
from ..scroll_view import ScrollView
from ..strip import Strip
from .._typing import Literal
CursorType = Literal["cell", "row", "column"]
@@ -207,14 +208,14 @@ class DataTable(ScrollView, Generic[CellType], can_focus=True):
self.row_count = 0
self._y_offsets: list[tuple[int, int]] = []
self._row_render_cache: LRUCache[
tuple[int, int, Style, int, int], tuple[Lines, Lines]
tuple[int, int, Style, int, int], tuple[SegmentLines, SegmentLines]
]
self._row_render_cache = LRUCache(1000)
self._cell_render_cache: LRUCache[tuple[int, int, Style, bool, bool], Lines]
self._cell_render_cache = LRUCache(10000)
self._line_cache: LRUCache[
tuple[int, int, int, int, int, int, Style], list[Segment]
self._cell_render_cache: LRUCache[
tuple[int, int, Style, bool, bool], SegmentLines
]
self._cell_render_cache = LRUCache(10000)
self._line_cache: LRUCache[tuple[int, int, int, int, int, int, Style], Strip]
self._line_cache = LRUCache(1000)
self._line_no = 0
@@ -452,7 +453,7 @@ class DataTable(ScrollView, Generic[CellType], can_focus=True):
width: int,
cursor: bool = False,
hover: bool = False,
) -> Lines:
) -> SegmentLines:
"""Render the given cell.
Args:
@@ -490,7 +491,7 @@ class DataTable(ScrollView, Generic[CellType], can_focus=True):
base_style: Style,
cursor_column: int = -1,
hover_column: int = -1,
) -> tuple[Lines, Lines]:
) -> tuple[SegmentLines, SegmentLines]:
"""Render a row in to lines for each cell.
Args:
@@ -565,9 +566,7 @@ class DataTable(ScrollView, Generic[CellType], can_focus=True):
raise LookupError("Y coord {y!r} is greater than total height")
return self._y_offsets[y]
def _render_line(
self, y: int, x1: int, x2: int, base_style: Style
) -> list[Segment]:
def _render_line(self, y: int, x1: int, x2: int, base_style: Style) -> Strip:
"""Render a line in to a list of segments.
Args:
@@ -585,7 +584,7 @@ class DataTable(ScrollView, Generic[CellType], can_focus=True):
try:
row_index, line_no = self._get_offsets(y)
except LookupError:
return [Segment(" " * width, base_style)]
return Strip.blank(width, base_style)
cursor_column = (
self.cursor_column
if (self.show_cursor and self.cursor_row == row_index)
@@ -612,13 +611,12 @@ class DataTable(ScrollView, Generic[CellType], can_focus=True):
scrollable_line: list[Segment] = list(chain.from_iterable(scrollable))
segments = fixed_line + line_crop(scrollable_line, x1 + fixed_width, x2, width)
segments = Segment.adjust_line_length(segments, width, style=base_style)
simplified_segments = list(Segment.simplify(segments))
strip = Strip(segments).adjust_cell_length(width, base_style).simplify()
self._line_cache[cache_key] = simplified_segments
return segments
self._line_cache[cache_key] = strip
return strip
def render_line(self, y: int) -> list[Segment]:
def render_line(self, y: int) -> Strip:
width, height = self.size
scroll_x, scroll_y = self.scroll_offset
fixed_top_row_count = sum(

View File

@@ -15,7 +15,7 @@ from ..geometry import Size, Region
from ..scroll_view import ScrollView
from .._cache import LRUCache
from .._segment_tools import line_crop
from .._types import Lines
from ..strip import Strip
class TextLog(ScrollView, can_focus=True):
@@ -48,8 +48,8 @@ class TextLog(ScrollView, can_focus=True):
super().__init__(name=name, id=id, classes=classes)
self.max_lines = max_lines
self._start_line: int = 0
self.lines: list[list[Segment]] = []
self._line_cache: LRUCache[tuple[int, int, int, int], list[Segment]]
self.lines: list[Strip] = []
self._line_cache: LRUCache[tuple[int, int, int, int], Strip]
self._line_cache = LRUCache(1024)
self.max_width: int = 0
self.min_width = min_width
@@ -120,7 +120,8 @@ class TextLog(ScrollView, can_focus=True):
self.max_width,
max(sum(segment.cell_length for segment in _line) for _line in lines),
)
self.lines.extend(lines)
strips = Strip.from_lines(lines, render_width)
self.lines.extend(strips)
if self.max_lines is not None and len(self.lines) > self.max_lines:
self._start_line += len(self.lines) - self.max_lines
@@ -138,13 +139,13 @@ class TextLog(ScrollView, can_focus=True):
self.virtual_size = Size(self.max_width, len(self.lines))
self.refresh()
def render_line(self, y: int) -> list[Segment]:
def render_line(self, y: int) -> Strip:
scroll_x, scroll_y = self.scroll_offset
line = self._render_line(scroll_y + y, scroll_x, self.size.width)
line = list(Segment.apply_style(line, self.rich_style))
return line
strip = Strip(Segment.apply_style(line, self.rich_style), self.size.width)
return strip
def render_lines(self, crop: Region) -> Lines:
def render_lines(self, crop: Region) -> list[Strip]:
"""Render the widget in to lines.
Args:
@@ -156,19 +157,20 @@ class TextLog(ScrollView, can_focus=True):
lines = self._styles_cache.render_widget(self, crop)
return lines
def _render_line(self, y: int, scroll_x: int, width: int) -> list[Segment]:
def _render_line(self, y: int, scroll_x: int, width: int) -> Strip:
if y >= len(self.lines):
return [Segment(" " * width, self.rich_style)]
return Strip.blank(width, self.rich_style)
key = (y + self._start_line, scroll_x, width, self.max_width)
if key in self._line_cache:
return self._line_cache[key]
line = self.lines[y]
line = Segment.adjust_line_length(
line, max(self.max_width, width), self.rich_style
line = (
self.lines[y]
.adjust_cell_length(max(self.max_width, width), self.rich_style)
.crop(scroll_x, scroll_x + width)
)
line = line_crop(line, scroll_x, scroll_x + width, self.max_width)
self._line_cache[key] = line
return line

View File

@@ -5,22 +5,21 @@ from typing import ClassVar, Generic, NewType, TypeVar
import rich.repr
from rich.segment import Segment
from rich.style import Style, NULL_STYLE
from rich.style import NULL_STYLE, Style
from rich.text import Text, TextType
from ..binding import Binding
from ..geometry import clamp, Region, Size
from .._loop import loop_last
from .. import events
from .._cache import LRUCache
from ..message import Message
from ..reactive import reactive, var
from .._loop import loop_last
from .._segment_tools import line_crop, line_pad
from .._types import MessageTarget
from .._typing import TypeAlias
from ..binding import Binding
from ..geometry import Region, Size, clamp
from ..message import Message
from ..reactive import reactive, var
from ..scroll_view import ScrollView
from .. import events
from ..strip import Strip
NodeID = NewType("NodeID", int)
TreeDataType = TypeVar("TreeDataType")
@@ -365,7 +364,7 @@ class Tree(Generic[TreeDataType], ScrollView, can_focus=True):
self._current_id = 0
self.root = self._add_node(None, text_label, data)
self._line_cache: LRUCache[LineCacheKey, list[Segment]] = LRUCache(1024)
self._line_cache: LRUCache[LineCacheKey, Strip] = LRUCache(1024)
self._tree_lines_cached: list[_TreeLine] | None = None
self._cursor_node: TreeNode[TreeDataType] | None = None
@@ -666,7 +665,7 @@ class Tree(Generic[TreeDataType], ScrollView, can_focus=True):
self.cursor_line = -1
self.refresh()
def render_line(self, y: int) -> list[Segment]:
def render_line(self, y: int) -> Strip:
width = self.size.width
scroll_x, scroll_y = self.scroll_offset
style = self.rich_style
@@ -677,14 +676,12 @@ class Tree(Generic[TreeDataType], ScrollView, can_focus=True):
style,
)
def _render_line(
self, y: int, x1: int, x2: int, base_style: Style
) -> list[Segment]:
def _render_line(self, y: int, x1: int, x2: int, base_style: Style) -> Strip:
tree_lines = self._tree_lines
width = self.size.width
if y >= len(tree_lines):
return [Segment(" " * width, base_style)]
return Strip.blank(width, base_style)
line = tree_lines[y]
@@ -699,7 +696,7 @@ class Tree(Generic[TreeDataType], ScrollView, can_focus=True):
tuple(node._updates for node in line.path),
)
if cache_key in self._line_cache:
segments = self._line_cache[cache_key]
strip = self._line_cache[cache_key]
else:
base_guide_style = self.get_component_rich_style(
"tree--guides", partial=True
@@ -785,11 +782,10 @@ class Tree(Generic[TreeDataType], ScrollView, can_focus=True):
segments = list(guides.render(self.app.console))
pad_width = max(self.virtual_size.width, width)
segments = line_pad(segments, 0, pad_width - guides.cell_len, line_style)
self._line_cache[cache_key] = segments
strip = self._line_cache[cache_key] = Strip(segments)
segments = line_crop(segments, x1, x2, Segment.get_line_length(segments))
return segments
strip = strip.crop(x1, x2)
return strip
def _on_resize(self, event: events.Resize) -> None:
self._line_cache.grow(event.size.height)

File diff suppressed because one or more lines are too long

View File

@@ -3,12 +3,14 @@ from __future__ import unicode_literals
import pytest
from textual._cache import LRUCache
from textual._cache import FIFOCache, LRUCache
def test_lru_cache():
cache = LRUCache(3)
assert str(cache) == "<LRUCache maxsize=3 hits=0 misses=0>"
# insert some values
cache["foo"] = 1
cache["bar"] = 2
@@ -35,6 +37,38 @@ def test_lru_cache():
assert "eggegg" in cache
def test_lru_cache_hits():
cache = LRUCache(4)
assert cache.hits == 0
assert cache.misses == 0
try:
cache["foo"]
except KeyError:
assert cache.hits == 0
assert cache.misses == 1
cache["foo"] = 1
assert cache.hits == 0
assert cache.misses == 1
cache["foo"]
cache["foo"]
assert cache.hits == 2
assert cache.misses == 1
cache.get("bar")
assert cache.hits == 2
assert cache.misses == 2
cache.get("foo")
assert cache.hits == 3
assert cache.misses == 2
assert str(cache) == "<LRUCache maxsize=4 hits=3 misses=2>"
def test_lru_cache_get():
cache = LRUCache(3)
@@ -61,6 +95,7 @@ def test_lru_cache_get():
assert "egg" not in cache
assert "eggegg" in cache
def test_lru_cache_maxsize():
cache = LRUCache(3)
@@ -146,3 +181,63 @@ def test_lru_cache_len(keys: list[str], expected_len: int):
for value, key in enumerate(keys):
cache[key] = value
assert len(cache) == expected_len
def test_fifo_cache():
cache = FIFOCache(4)
assert len(cache) == 0
assert not cache
assert "foo" not in cache
cache["foo"] = 1
assert "foo" in cache
assert len(cache) == 1
assert cache
cache["bar"] = 2
cache["baz"] = 3
cache["egg"] = 4
# Cache is full
assert list(cache.keys()) == ["foo", "bar", "baz", "egg"]
assert len(cache) == 4
cache["Paul"] = 100
assert list(cache.keys()) == ["bar", "baz", "egg", "Paul"]
assert len(cache) == 4
assert cache["baz"] == 3
assert cache["bar"] == 2
cache["Chani"] = 101
assert list(cache.keys()) == ["baz", "egg", "Paul", "Chani"]
assert len(cache) == 4
cache.clear()
assert len(cache) == 0
assert list(cache.keys()) == []
def test_fifo_cache_hits():
cache = FIFOCache(4)
assert cache.hits == 0
assert cache.misses == 0
try:
cache["foo"]
except KeyError:
assert cache.hits == 0
assert cache.misses == 1
cache["foo"] = 1
assert cache.hits == 0
assert cache.misses == 1
cache["foo"]
cache["foo"]
assert cache.hits == 2
assert cache.misses == 1
cache.get("bar")
assert cache.hits == 2
assert cache.misses == 2
cache.get("foo")
assert cache.hits == 3
assert cache.misses == 2
assert str(cache) == "<FIFOCache maxsize=4 hits=3 misses=2>"

145
tests/test_strip.py Normal file
View File

@@ -0,0 +1,145 @@
from rich.segment import Segment
from rich.style import Style
from textual.strip import Strip
from textual._filter import Monochrome
def test_cell_length() -> None:
strip = Strip([Segment("foo"), Segment("💩"), Segment("bar")])
assert strip._cell_length is None
assert strip.cell_length == 8
assert strip._cell_length == 8
def test_repr() -> None:
strip = Strip([Segment("foo")])
assert repr(strip) == "Strip([Segment('foo')], 3)"
def test_join() -> None:
strip1 = Strip([Segment("foo")])
strip2 = Strip([Segment("bar")])
strip = Strip.join([strip1, strip2])
assert len(strip) == 2
assert strip.cell_length == 6
assert list(strip) == [Segment("foo"), Segment("bar")]
def test_bool() -> None:
assert not Strip([])
assert Strip([Segment("foo")])
def test_iter() -> None:
assert list(Strip([])) == []
assert list(Strip([Segment("foo")])) == [Segment("foo")]
assert list(Strip([Segment("foo"), Segment("bar")])) == [
Segment("foo"),
Segment("bar"),
]
def test_len():
assert len(Strip([])) == 0
assert len(Strip([Segment("foo")])) == 1
assert len(Strip([Segment("foo"), Segment("bar")])) == 2
def test_reversed():
assert list(reversed(Strip([]))) == []
assert list(reversed(Strip([Segment("foo")]))) == [Segment("foo")]
assert list(reversed(Strip([Segment("foo"), Segment("bar")]))) == [
Segment("bar"),
Segment("foo"),
]
def test_eq():
assert Strip([]) == Strip([])
assert Strip([Segment("foo")]) == Strip([Segment("foo")])
assert Strip([Segment("foo")]) != Strip([Segment("bar")])
def test_adjust_cell_length():
for repeat in range(3):
assert Strip([]).adjust_cell_length(3) == Strip([Segment(" ")])
assert Strip([Segment("f")]).adjust_cell_length(3) == Strip(
[Segment("f"), Segment(" ")]
)
assert Strip([Segment("💩")]).adjust_cell_length(3) == Strip(
[Segment("💩"), Segment(" ")]
)
assert Strip([Segment("💩💩")]).adjust_cell_length(3) == Strip([Segment("💩 ")])
assert Strip([Segment("💩💩")]).adjust_cell_length(4) == Strip([Segment("💩💩")])
assert Strip([Segment("💩"), Segment("💩💩")]).adjust_cell_length(2) == Strip(
[Segment("💩")]
)
assert Strip([Segment("💩"), Segment("💩💩")]).adjust_cell_length(4) == Strip(
[Segment("💩"), Segment("💩")]
)
def test_simplify():
assert Strip([Segment("foo"), Segment("bar")]).simplify() == Strip(
[Segment("foobar")]
)
def test_apply_filter():
strip = Strip([Segment("foo", Style.parse("red"))])
expected = Strip([Segment("foo", Style.parse("#1b1b1b"))])
print(repr(strip))
print(repr(expected))
assert strip.apply_filter(Monochrome()) == expected
def test_style_links():
link_style = Style.on(click="clicked")
strip = Strip(
[
Segment("foo"),
Segment("bar", link_style),
Segment("baz"),
]
)
hover_style = Style(underline=True)
new_strip = strip.style_links(link_style._link_id, hover_style)
expected = Strip(
[
Segment("foo"),
Segment("bar", link_style + hover_style),
Segment("baz"),
]
)
assert new_strip == expected
def test_crop():
for repeat in range(3):
assert Strip([Segment("foo")]).crop(0, 3) == Strip([Segment("foo")])
assert Strip([Segment("foo")]).crop(0, 2) == Strip([Segment("fo")])
assert Strip([Segment("foo")]).crop(0, 1) == Strip([Segment("f")])
assert Strip([Segment("foo")]).crop(1, 3) == Strip([Segment("oo")])
assert Strip([Segment("foo")]).crop(1, 2) == Strip([Segment("o")])
assert Strip([Segment("foo")]).crop(1, 1) == Strip([Segment("")])
assert Strip([Segment("foo💩"), Segment("b💩ar"), Segment("ba💩z")]).crop(
1, 6
) == Strip([Segment("oo💩"), Segment("b")])
def test_divide():
for repeat in range(3):
assert Strip([Segment("foo")]).divide([1, 2]) == [
Strip([Segment("f")]),
Strip([Segment("o")]),
]

View File

@@ -4,13 +4,13 @@ from rich.segment import Segment
from rich.style import Style
from textual._styles_cache import StylesCache
from textual._types import Lines
from textual.color import Color
from textual.css.styles import Styles
from textual.geometry import Region, Size
from textual.strip import Strip
def _extract_content(lines: Lines):
def _extract_content(lines: list[list[Segment]]):
"""Extract the text content from lines."""
content = ["".join(segment.text for segment in line) for line in lines]
return content
@@ -44,10 +44,11 @@ def test_no_styles():
)
style = Style.from_color(bgcolor=Color.parse("green").rich_color)
expected = [
[Segment("foo", style)],
[Segment("bar", style)],
[Segment("baz", style)],
Strip([Segment("foo", style)], 3),
Strip([Segment("bar", style)], 3),
Strip([Segment("baz", style)], 3),
]
assert lines == expected