smooth scrolling

This commit is contained in:
Will McGugan
2021-06-24 20:11:51 +01:00
parent e555fc04f7
commit 70d18b8b27
15 changed files with 198 additions and 189 deletions

View File

@@ -4,7 +4,7 @@ from textual import events
from textual.app import App
from textual.widgets.header import Header
from textual.widgets.placeholder import Placeholder
from textual.widgets.window import Window
from textual.widgets.scroll_view import ScrollView
with open("richreadme.md", "rt") as fh:
readme = Markdown(fh.read(), hyperlinks=True, code_theme="fruity")
@@ -16,7 +16,7 @@ class MyApp(App):
async def on_startup(self, event: events.Startup) -> None:
await self.view.mount_all(
header=Header(self.title), left=Placeholder(), body=Window(readme)
header=Header(self.title), left=Placeholder(), body=ScrollView(readme)
)

View File

@@ -1,6 +1,6 @@
[tool.poetry]
name = "textual"
version = "0.1.1"
version = "0.1.2"
description = "Text User Interface using Rich"
authors = ["Will McGugan <willmcgugan@gmail.com>"]
license = "MIT"

View File

@@ -4,22 +4,26 @@ import logging
import asyncio
from time import time
from tracemalloc import start
from typing import Callable
from dataclasses import dataclass
from ._timer import Timer
from ._types import MessageTarget
from .message_pump import MessagePump
EasingFunction = Callable[[float], float]
LinearEasing = lambda value: value
def InOutCubitEasing(x: float) -> float:
return 4 * x * x * x if x < 0.5 else 1 - pow(-2 * x + 2, 3) / 2
# https://easings.net/
EASING = {
"none": lambda x: 1.0,
"round": lambda x: 0.0 if x < 0.5 else 1.0,
"linear": lambda x: x,
"in_cubic": lambda x: x * x * x,
"in_out_cubic": lambda x: 4 * x * x * x if x < 0.5 else 1 - pow(-2 * x + 2, 3) / 2,
"out_cubic": lambda x: 1 - pow(1 - x, 3),
}
log = logging.getLogger("rich")
@@ -35,11 +39,26 @@ class Animation:
end_value: float
easing_function: EasingFunction
def __call__(self, obj: object, time: float) -> bool:
def __call__(self, time: float) -> bool:
if self.duration == 0:
value = self.end_value
else:
progress = min(1.0, (time - self.start_time) / self.duration)
if self.end_value > self.start_value:
eased_progress = self.easing_function(progress)
value = self.start_value + (self.end_value - self.start_value) * eased_progress
setattr(obj, self.attribute, value)
value = (
self.start_value
+ (self.end_value - self.start_value) * eased_progress
)
else:
eased_progress = 1 - self.easing_function(progress)
value = (
self.end_value
+ (self.start_value - self.end_value) * eased_progress
)
setattr(self.obj, self.attribute, value)
return value == self.end_value
@@ -53,22 +72,25 @@ class BoundAnimator:
attribute: str,
value: float,
*,
duration: float = 1,
easing: EasingFunction = InOutCubitEasing,
duration: float | None = None,
speed: float | None = None,
easing: EasingFunction | str = "in_out_cubic",
) -> None:
easing_function = EASING[easing] if isinstance(easing, str) else easing
self._animator.animate(
self._obj,
attribute=attribute,
value=value,
duration=duration,
easing=easing,
speed=speed,
easing=easing_function,
)
class Animator:
def __init__(self, target: MessageTarget) -> None:
def __init__(self, target: MessageTarget, frames_per_second: int = 30) -> None:
self._animations: dict[tuple[object, str], Animation] = {}
self._timer: Timer = Timer(target, 1 / 30, target, callback=self)
self._timer = Timer(target, 1 / frames_per_second, target, callback=self)
async def start(self) -> None:
asyncio.get_event_loop().create_task(self._timer.run())
@@ -85,22 +107,33 @@ class Animator:
attribute: str,
value: float,
*,
duration: float = 1,
easing: EasingFunction = InOutCubitEasing,
duration: float | None = None,
speed: float | None = None,
easing: EasingFunction = EASING["in_out_cubic"],
) -> None:
start_value = getattr(obj, attribute)
start_time = time()
animation_key = (obj, attribute)
if animation_key in self._animations:
self._animations[animation_key](start_time)
start_value = getattr(obj, attribute)
if duration is not None:
animation_duration = duration
else:
animation_duration = abs(value - start_value) / (speed or 50)
animation = Animation(
obj,
attribute=attribute,
start_time=start_time,
duration=duration,
duration=animation_duration,
start_value=start_value,
end_value=value,
easing_function=easing,
)
self._animations[(obj, attribute)] = animation
self._animations[animation_key] = animation
self._timer.resume()
async def __call__(self) -> None:
@@ -111,6 +144,5 @@ class Animator:
animation_keys = list(self._animations.keys())
for animation_key in animation_keys:
animation = self._animations[animation_key]
obj, _attribute = animation_key
if animation(obj, animation_time):
if animation(animation_time):
del self._animations[animation_key]

View File

@@ -41,6 +41,7 @@ class Timer:
self._repeat = repeat
self._stop_event = Event()
self._active = Event()
self._active.set()
def __rich_repr__(self) -> RichReprResult:
yield self._interval

View File

@@ -1,4 +1,5 @@
from __future__ import annotations
from argparse import Action
import asyncio
@@ -13,6 +14,7 @@ import rich.repr
from rich.screen import Screen
from rich import get_console
from rich.console import Console, RenderableType
from rich.traceback import Traceback
from . import events
from . import actions
@@ -45,6 +47,10 @@ LayoutDefinition = "dict[str, Any]"
# uvloop.install()
class ActionError(Exception):
pass
class ShutdownError(Exception):
pass
@@ -163,18 +169,10 @@ class App(MessagePump):
self.mouse_over = widget
async def process_messages(self) -> None:
try:
await self._process_messages()
except Exception:
self.console.print_exception(show_locals=True)
async def _process_messages(self) -> None:
log.debug("driver=%r", self.driver_class)
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, self.on_keyboard_interupt)
# loop = asyncio.get_event_loop()
# loop.add_signal_handler(signal.SIGINT, self.on_keyboard_interupt)
driver = self._driver = self.driver_class(self.console, self)
active_app.set(self)
self.view.set_parent(self)
await self.add(self.view)
@@ -184,14 +182,21 @@ class App(MessagePump):
try:
driver.start_application_mode()
except Exception:
self.console.print_exception()
log.exception("error starting application mode")
raise
else:
traceback: Traceback | None = None
await self.animator.start()
try:
await super().process_messages()
await self.animator.stop()
except Exception:
traceback = Traceback(show_locals=True)
await self.animator.stop()
await self.view.close_messages()
driver.stop_application_mode()
if traceback is not None:
self.console.print(traceback)
async def add(self, child: MessagePump) -> None:
self.children.add(child)
@@ -229,30 +234,33 @@ class App(MessagePump):
else:
await super().on_event(event)
async def action(self, action: str) -> None:
async def action(
self, action: str, default_namespace: object | None = None
) -> None:
"""Perform an action.
Args:
action (str): Action encoded in a string.
"""
default_target = default_namespace or self
target, params = actions.parse(action)
if "." in target:
destination, action_name = target.split(".", 1)
action_target = self._action_targets.get(destination, None)
if action_target is None:
raise ActionError("Action namespace {destination} is not known")
else:
destination = "app"
action_target = default_namespace or self
action_name = action
log.debug("ACTION %r %r", destination, action_name)
await self.dispatch_action(destination, action_name, params)
log.debug("ACTION %r %r", action_target, action_name)
await self.dispatch_action(action_target, action_name, params)
async def dispatch_action(
self, destination: str, action_name: str, params: Any
self, namespace: object, action_name: str, params: Any
) -> None:
action_target = self._action_targets.get(destination, None)
if action_target is not None:
method_name = f"action_{action_name}"
method = getattr(action_target, method_name, None)
method = getattr(namespace, method_name, None)
if method is not None:
await method(*params)

View File

@@ -62,6 +62,17 @@ class Idle(Event):
"""Sent when there are no more items in the message queue."""
class Action(Event):
__slots__ = ["action"]
def __init__(self, sender: MessageTarget, action: str) -> None:
super().__init__(sender)
self.action = action
def __rich_repr__(self) -> RichReprResult:
yield "action", self.action
class Resize(Event):
__slots__ = ["width", "height"]
width: int
@@ -168,7 +179,7 @@ class MouseUp(MouseEvent):
pass
class MouseScrollDown(InputEvent):
class MouseScrollDown(InputEvent, bubble=True):
__slots__ = ["x", "y"]
def __init__(self, sender: MessageTarget, x: int, y: int) -> None:
@@ -177,7 +188,7 @@ class MouseScrollDown(InputEvent):
self.y = y
class MouseScrollUp(MouseScrollDown):
class MouseScrollUp(MouseScrollDown, bubble=True):
pass

View File

@@ -1,6 +1,23 @@
from __future__ import annotations
from typing import Any, NamedTuple
from typing import Any, NamedTuple, TypeVar
T = TypeVar("T", int, float)
def clamp(value: T, minimum: T, maximum: T) -> T:
"""Clamps a value between two other values.
Args:
value (T): A value
minimum (T): Minimum value
maximum (T): maximum value
Returns:
T: New value that is not less than the minimum or greater than the maximum.
"""
return min(max(value, minimum), maximum)
class Point(NamedTuple):

View File

@@ -135,7 +135,7 @@ class MessagePump:
log.exception("error in get_message()")
raise error from None
log.debug("%r -> %r", message, self)
# log.debug("%r -> %r", message, self)
# Combine any pending messages that may supersede this one
while not (self._closed or self._closing):
pending = self.peek_message()
@@ -174,7 +174,8 @@ class MessagePump:
await dispatch_function(event)
if event.bubble and self._parent and not event._stop_propagaton:
if event.sender == self._parent:
log.debug("bubbled event abandoned; %r", event)
pass
# log.debug("bubbled event abandoned; %r", event)
elif not self._parent._closed and not self._parent._closing:
await self._parent.post_message(event)
@@ -204,7 +205,6 @@ class MessagePump:
async def emit(self, message: Message) -> bool:
if self._parent:
log.debug("EMIT %r -> %r %r", self, self._parent, message)
await self._parent.post_message_from_child(message)
return True
else:

View File

@@ -32,14 +32,14 @@ class PageRender:
def move_to(self, x: int = 0, y: int = 0) -> None:
self.offset = Point(x, y)
def refresh(self) -> None:
def clear(self) -> None:
self._render_width = None
self._render_height = None
del self._lines[:]
def update(self, renderable: RenderableType) -> None:
self.renderable = renderable
self.refresh()
self.clear()
def render(self, console: Console, options: ConsoleOptions) -> None:
width = self.width or options.max_width or console.width
@@ -84,6 +84,10 @@ class Page(Widget):
x: Reactive[int] = Reactive(0)
y: Reactive[int] = Reactive(0)
@property
def contents_size(self) -> Dimensions:
return self._page.size
def validate_y(self, value: int) -> int:
return max(0, value)
@@ -91,6 +95,13 @@ class Page(Widget):
x, y = self._page.offset
self._page.offset = Point(x, new)
def update(self, renderable: RenderableType | None = None) -> None:
if renderable:
self._page.update(renderable)
else:
self._page.clear()
@property
def virtual_size(self) -> Dimensions:
return self._page.size

View File

@@ -13,9 +13,18 @@ from rich.style import Style, StyleType
log = logging.getLogger("rich")
from . import events
from .message import Message
from .widget import Reactive, Widget
class ScrollUp(Message, bubble=True):
pass
class ScrollDown(Message, bubble=True):
pass
class ScrollBarRender:
def __init__(
self,

View File

@@ -50,7 +50,7 @@ class View(ABC, WidgetBase):
) -> None:
...
async def mount_all(self, **widgets: Widget) -> None:
async def mount_all(self, **widgets: WidgetBase) -> None:
for slot, widget in widgets.items():
await self.mount(widget, slot=slot)
self.require_repaint()
@@ -129,9 +129,7 @@ class LayoutView(View):
if not isinstance(widget, WidgetBase):
continue
log.debug("%r is_root %r", self, self.is_root_view)
if self.is_root_view:
log.debug("RENDERING %r %r %r", widget, message, region)
try:
update = widget.render_update(
region.x + message.offset_x, region.y + message.offset_y
@@ -151,7 +149,8 @@ class LayoutView(View):
)
break
else:
log.warning("Update widget not found")
pass
# log.warning("Update widget not found")
# async def on_create(self, event: events.Created) -> None:
# await self.mount(Header(self.title))
@@ -188,14 +187,11 @@ class LayoutView(View):
async def _on_mouse_move(self, event: events.MouseMove) -> None:
try:
widget, region = self.get_widget_at(event.x, event.y, deep=True)
log.debug("MOVE =%r %r", widget, region)
log.debug("mouse over %r %r", widget, region)
except NoWidget:
await self.app.set_mouse_over(None)
else:
await self.app.set_mouse_over(widget)
log.debug("posting mouse move to %r", widget)
await widget.forward_event(
events.MouseMove(
self,
@@ -209,7 +205,7 @@ class LayoutView(View):
)
async def forward_event(self, event: events.Event) -> None:
log.debug("FORWARD %r %r", self, event)
if isinstance(event, (events.Enter, events.Leave)):
await self.post_message(event)
@@ -217,7 +213,6 @@ class LayoutView(View):
await self._on_mouse_move(event)
elif isinstance(event, events.MouseEvent):
log.debug("MOUSE %r", event)
try:
widget, region = self.get_widget_at(event.x, event.y, deep=True)
except NoWidget:

View File

@@ -82,7 +82,6 @@ class Reactive(Generic[ReactiveType]):
def __set__(self, obj: "WidgetBase", value: ReactiveType) -> None:
if getattr(obj, self.internal_name) != value:
log.debug("%s -> %s", self.internal_name, value)
current_value = getattr(obj, self.internal_name, None)
validate_function = getattr(obj, f"validate_{self.name}", None)
@@ -205,10 +204,12 @@ class WidgetBase(MessagePump):
Align.center(Pretty(self), vertical="middle"), title=self.__class__.__name__
)
async def action(self, action: str, *params) -> None:
await self.app.action(action, self)
async def post_message(self, message: Message) -> bool:
if not self.check_message_enabled(message):
return True
return await super().post_message(message)
async def on_event(self, event: events.Event) -> None:
@@ -221,7 +222,6 @@ class WidgetBase(MessagePump):
async def on_idle(self, event: events.Idle) -> None:
if self.check_repaint():
log.debug("REPAINTING")
await self.repaint()
@@ -244,13 +244,15 @@ class Widget(WidgetBase):
self._line_cache = LineCache.from_renderable(
self.console, renderable, width, height
)
log.debug("%.1fms %r render elapsed", (time() - start) * 1000, self)
assert self._line_cache is not None
return self._line_cache
# def __rich__(self) -> LineCache:
# return self.line_cache
async def focus(self) -> None:
await self.app.set_focus(self)
def get_style_at(self, x: int, y: int) -> Style:
return self.line_cache.get_style_at(x, y)
@@ -279,7 +281,9 @@ class Widget(WidgetBase):
yield from self.line_cache.render(x, y, width, height)
async def on_mouse_move(self, event: events.MouseMove) -> None:
log.debug("%r", self.get_style_at(event.x, event.y))
style_under_cursor = self.get_style_at(event.x, event.y)
if style_under_cursor:
log.debug("%r", style_under_cursor)
async def on_mouse_up(self, event: events.MouseUp) -> None:
style = self.get_style_at(event.x, event.y)

View File

@@ -51,5 +51,4 @@ class Header(Widget):
return header
async def on_mount(self, event: events.Mount) -> None:
return
self.set_interval(1.0, callback=self.refresh)

View File

@@ -9,7 +9,7 @@ from rich.text import Text
from .. import events
from ..scrollbar import ScrollBar
from ..geometry import clamp
from ..page import Page
from ..view import LayoutView
from ..widget import Reactive, StaticWidget
@@ -20,8 +20,13 @@ log = logging.getLogger("rich")
class ScrollView(LayoutView):
def __init__(
self, renderable: RenderableType, name: str | None = None, style: StyleType = ""
self,
renderable: RenderableType,
name: str | None = None,
style: StyleType = "",
fluid: bool = True,
) -> None:
self.fluid = fluid
layout = Layout(name="outer")
layout.split_row(
Layout(name="content", ratio=1), Layout(name="vertical_scrollbar", size=1)
@@ -37,12 +42,17 @@ class ScrollView(LayoutView):
x: Reactive[float] = Reactive(0)
y: Reactive[float] = Reactive(0)
def validate_y(self, value: float) -> int:
return max(0, round(value))
target_y: Reactive[float] = Reactive(0)
def validate_y(self, value: float) -> float:
return clamp(value, 0, self._page.contents_size.height - self.size.height)
def validate_target_y(self, value: float) -> float:
return clamp(value, 0, self._page.contents_size.height - self.size.height)
def update_y(self, old_value: float, new_value: float) -> None:
self._page.y = int(new_value)
self._vertical_scrollbar.position = int(new_value)
self._page.y = round(new_value)
self._vertical_scrollbar.position = round(new_value)
async def on_mount(self, event: events.Mount) -> None:
await self.mount_all(
@@ -54,22 +64,32 @@ class ScrollView(LayoutView):
async def on_idle(self, event: events.Idle) -> None:
self._vertical_scrollbar.virtual_size = self._page.virtual_size.height
self._vertical_scrollbar.window_size = self.size.height
# self._vertical_scrollbar.position = self.position_y
await super().on_idle(event)
async def on_mouse_scroll_up(self, event: events.MouseScrollUp) -> None:
self._vertical_scrollbar.position += 1
self.target_y += 1.5
self.animate("y", self.target_y, easing="out_cubic", speed=80)
async def on_mouse_scroll_down(self, event: events.MouseScrollUp) -> None:
self._vertical_scrollbar.position -= 1
self.target_y -= 1.5
self.animate("y", self.target_y, easing="out_cubic", speed=80)
async def on_key(self, event: events.Key) -> None:
key = event.key
if key == "down":
self.y += 1
self.target_y += 2
self.animate("y", self.target_y, easing="linear", speed=100)
elif key == "up":
self.y -= 1
self.target_y -= 2
self.animate("y", self.target_y, easing="linear", speed=100)
elif key == "pagedown":
self.animate("y", self.y + self.size.height)
self.target_y += self.size.height
self.animate("y", self.target_y, easing="out_cubic")
elif key == "pageup":
self.animate("y", self.y - self.size.height)
self.target_y -= self.size.height
self.animate("y", self.target_y, easing="out_cubic")
async def on_resize(self, event: events.Resize) -> None:
if self.fluid:
self._page.update()
await super().on_resize(event)

View File

@@ -1,98 +0,0 @@
from __future__ import annotations
import logging
import logging
import sys
if sys.version_info >= (3, 8):
from typing import Literal
else:
from typing_extensions import Literal
from rich.console import Console, ConsoleOptions, RenderableType
from rich.padding import Padding
from rich.segment import Segment
from .. import events
from ..widget import Widget, Reactive
from ..geometry import Point, Dimensions
from ..scrollbar import VerticalBar
log = logging.getLogger("rich")
ScrollMethod = Literal["always", "never", "auto", "overlay"]
class Window(Widget):
def __init__(
self, renderable: RenderableType, y_scroll: ScrollMethod = "always"
) -> None:
self.renderable = renderable
self.y_scroll = y_scroll
self._virtual_size: Dimensions = Dimensions(0, 0)
self._renderable_updated = True
self._lines: list[list[Segment]] = []
super().__init__()
def _validate_position(self, position: float) -> float:
_position = position
validated_pos = min(
max(0, position), self._virtual_size.height - self.size.height
)
log.debug("virtual_size=%r size=%r", self._virtual_size, self.size)
log.debug("%r %r", _position, validated_pos)
return validated_pos
position: Reactive[float] = Reactive(60, validator=_validate_position)
show_vertical_bar: Reactive[bool] = Reactive(True)
@property
def virtual_size(self) -> Dimensions:
return self._virtual_size or self.size
def get_lines(
self, console: Console, options: ConsoleOptions
) -> list[list[Segment]]:
if not self._lines:
width = self.size.width
if self.show_vertical_bar and self.y_scroll != "overlay":
width -= 1
self._lines = console.render_lines(
Padding(self.renderable, 1), options.update_width(width)
)
self._virtual_size = Dimensions(0, len(self._lines))
return self._lines
def update(self, renderable: RenderableType) -> None:
self.renderable = renderable
del self._lines[:]
def render(self) -> RenderableType:
height = self.size.height
lines = self.get_lines(console, options)
position = int(self.position)
log.debug("%r, %r, %r", height, self._virtual_size, self.position)
return VerticalBar(
lines[position : position + height],
height,
self._virtual_size.height,
self.position,
overlay=self.y_scroll == "overlay",
)
async def on_key(self, event: events.Key) -> None:
if event.key == "down":
self.position += 1
elif event.key == "up":
self.position -= 1
async def on_mouse_scroll_up(self, event: events.MouseScrollUp) -> None:
self.position += 1
async def on_mouse_scroll_down(self, event: events.MouseScrollUp) -> None:
self.position -= 1
async def on_resize(self, event: events.Resize) -> None:
del self._lines[:]
self.position = self.position
self.require_repaint()