allow nested views

This commit is contained in:
Will McGugan
2021-06-22 08:06:07 +01:00
parent 1e8cf0ff06
commit fcc4d29bc7
14 changed files with 394 additions and 202 deletions

View File

@@ -1,5 +1,6 @@
from __future__ import annotations from __future__ import annotations
import logging import logging
from typing import Iterable from typing import Iterable
@@ -8,21 +9,28 @@ from rich.console import Console, ConsoleOptions, RenderableType, RenderResult
from rich.control import Control from rich.control import Control
from rich.segment import Segment from rich.segment import Segment
from ._loop import loop_last
log = logging.getLogger("rich") log = logging.getLogger("rich")
class LineCache: class LineCache:
def __init__(self) -> None: def __init__(self, lines: list[list[Segment]]) -> None:
self.lines: list[list[Segment]] = [] self.lines = lines
self._dirty: list[bool] = []
def update(
self, console: Console, options: ConsoleOptions, renderable: RenderableType
) -> None:
self.lines = console.render_lines(renderable, options, new_lines=True)
self._dirty = [True] * len(self.lines) self._dirty = [True] * len(self.lines)
@classmethod
def from_renderable(
cls,
console: Console,
renderable: RenderableType,
width: int,
height: int,
) -> "LineCache":
options = console.options.update_dimensions(width, height)
lines = console.render_lines(renderable, options)
return cls(lines)
@property @property
def dirty(self) -> bool: def dirty(self) -> bool:
return any(self._dirty) return any(self._dirty)
@@ -30,14 +38,22 @@ class LineCache:
def __rich_console__( def __rich_console__(
self, console: Console, options: ConsoleOptions self, console: Console, options: ConsoleOptions
) -> RenderResult: ) -> RenderResult:
new_line = Segment.line()
for line in self.lines: for line in self.lines:
yield from line yield from line
yield new_line
def render(self, x: int, y: int) -> Iterable[Segment]: def render(self, x: int, y: int, width: int, height: int) -> Iterable[Segment]:
move_to = Control.move_to move_to = Control.move_to
for offset_y, (line, dirty) in enumerate(zip(self.lines, self._dirty), y): lines = self.lines[:height]
new_line = Segment.line()
for last, (offset_y, (line, dirty)) in loop_last(
enumerate(zip(lines, self._dirty), y)
):
if dirty: if dirty:
yield move_to(x, offset_y).segment yield move_to(x, offset_y).segment
yield from line yield from Segment.adjust_line_length(line, width)
if not last:
yield new_line
self._dirty[:] = [False] * len(self.lines) self._dirty[:] = [False] * len(self.lines)

View File

@@ -65,12 +65,12 @@ class XTermParser(Parser[events.Event]):
while not self.is_eof: while not self.is_eof:
character = yield read1() character = yield read1()
log.debug("character=%r", character) # log.debug("character=%r", character)
if character == ESC and ((yield self.peek_buffer()) or more_data()): if character == ESC and ((yield self.peek_buffer()) or more_data()):
sequence: str = character sequence: str = character
while True: while True:
sequence += yield read1() sequence += yield read1()
log.debug(f"sequence=%r", sequence) # log.debug(f"sequence=%r", sequence)
keys = get_ansi_sequence(sequence, None) keys = get_ansi_sequence(sequence, None)
if keys is not None: if keys is not None:
for key in keys: for key in keys:

View File

@@ -9,10 +9,10 @@ import warnings
from rich.control import Control from rich.control import Control
from rich.layout import Layout from rich.layout import Layout
from rich.repr import rich_repr, RichReprResult import rich.repr
from rich.screen import Screen from rich.screen import Screen
from rich import get_console from rich import get_console
from rich.console import Console from rich.console import Console, RenderableType
from . import events from . import events
from . import actions from . import actions
@@ -21,6 +21,7 @@ from .driver import Driver
from ._linux_driver import LinuxDriver from ._linux_driver import LinuxDriver
from .message_pump import MessagePump from .message_pump import MessagePump
from .view import View, LayoutView from .view import View, LayoutView
from .widget import Widget, WidgetBase
log = logging.getLogger("rich") log = logging.getLogger("rich")
@@ -28,9 +29,9 @@ log = logging.getLogger("rich")
# asyncio will warn against resources not being cleared # asyncio will warn against resources not being cleared
warnings.simplefilter("always", ResourceWarning) warnings.simplefilter("always", ResourceWarning)
# https://github.com/boto/boto3/issues/454 # https://github.com/boto/boto3/issues/454
warnings.filterwarnings( # warnings.filterwarnings(
"ignore", category=ResourceWarning, message="unclosed.*<ssl.SSLSocket.*>" # "ignore", category=ResourceWarning, message="unclosed.*<ssl.SSLSocket.*>"
) # )
LayoutDefinition = "dict[str, Any]" LayoutDefinition = "dict[str, Any]"
@@ -47,7 +48,7 @@ class ShutdownError(Exception):
pass pass
@rich_repr @rich.repr.auto
class App(MessagePump): class App(MessagePump):
view: View view: View
@@ -68,24 +69,40 @@ class App(MessagePump):
self.title = title self.title = title
self.view = view or self.create_default_view() self.view = view or self.create_default_view()
self.children: set[MessagePump] = set() self.children: set[MessagePump] = set()
self.focused: WidgetBase | None = None
self.mouse_over: WidgetBase | None = None
self._driver: Driver | None = None self._driver: Driver | None = None
self._action_targets = {"app": self, "view": self.view} self._action_targets = {"app": self, "view": self.view}
def __rich_repr__(self) -> RichReprResult: def __rich_repr__(self) -> rich.repr.RichReprResult:
yield "title", self.title yield "title", self.title
def __rich__(self) -> RenderableType:
return self.view
@classmethod @classmethod
def run( def run(
cls, console: Console = None, screen: bool = True, driver: Type[Driver] = None cls, console: Console = None, screen: bool = True, driver: Type[Driver] = None
): ):
"""Run the app.
Args:
console (Console, optional): Console object. Defaults to None.
screen (bool, optional): Enable application mode. Defaults to True.
driver (Type[Driver], optional): Driver class or None for default. Defaults to None.
"""
async def run_app() -> None: async def run_app() -> None:
app = cls(console=console, screen=screen, driver_class=driver) app = cls(console=console, screen=screen, driver_class=driver)
await app.process_messages() await app.process_messages()
asyncio.run(run_app()) asyncio.run(run_app())
def create_default_view(self) -> View: def create_default_view(self) -> View:
"""Create the default view."""
layout = Layout() layout = Layout()
layout.split_column( layout.split_column(
Layout(name="header", size=3, ratio=0), Layout(name="header", size=3, ratio=0),
@@ -105,6 +122,40 @@ class App(MessagePump):
event = events.ShutdownRequest(sender=self) event = events.ShutdownRequest(sender=self)
asyncio.run_coroutine_threadsafe(self.post_message(event), loop=loop) asyncio.run_coroutine_threadsafe(self.post_message(event), loop=loop)
async def set_focus(self, widget: Widget | None) -> None:
log.debug("set_focus %r", widget)
if widget == self.focused:
return
if widget is None:
if self.focused is not None:
focused = self.focused
self.focused = None
await focused.post_message(events.Blur(self))
elif widget.can_focus:
if self.focused is not None:
await self.focused.post_message(events.Blur(self))
if widget is not None and self.focused != widget:
self.focused = widget
await widget.post_message(events.Focus(self))
async def set_mouse_over(self, widget: WidgetBase | None) -> None:
if widget is None:
if self.mouse_over is not None:
try:
await self.mouse_over.post_message(events.Leave(self))
finally:
self.mouse_over = None
else:
if self.mouse_over != widget:
try:
if self.mouse_over is not None:
await self.mouse_over.forward_event(events.Leave(self))
if widget is not None:
await widget.forward_event(events.Enter(self))
finally:
self.mouse_over = widget
async def process_messages(self) -> None: async def process_messages(self) -> None:
try: try:
await self._process_messages() await self._process_messages()
@@ -119,7 +170,7 @@ class App(MessagePump):
driver = self._driver = self.driver_class(self.console, self) driver = self._driver = self.driver_class(self.console, self)
active_app.set(self) active_app.set(self)
self.view.set_parent(self)
await self.add(self.view) await self.add(self.view)
await self.post_message(events.Startup(sender=self)) await self.post_message(events.Startup(sender=self))
@@ -164,13 +215,12 @@ class App(MessagePump):
return return
if isinstance(event, events.InputEvent): if isinstance(event, events.InputEvent):
if isinstance(event, events.Key) and self.focused is not None:
await self.focused.forward_event(event)
await self.view.forward_event(event) await self.view.forward_event(event)
else: else:
await super().on_event(event) await super().on_event(event)
async def on_idle(self, event: events.Idle) -> None:
await self.view.post_message(event)
async def action(self, action: str) -> None: async def action(self, action: str) -> None:
"""Perform an action. """Perform an action.
@@ -235,7 +285,7 @@ if __name__ == "__main__":
from .widgets.header import Header from .widgets.header import Header
from .widgets.footer import Footer from .widgets.footer import Footer
from .widgets.window import Window
from .widgets.placeholder import Placeholder from .widgets.placeholder import Placeholder
from .scrollbar import ScrollBar from .scrollbar import ScrollBar
@@ -243,17 +293,6 @@ if __name__ == "__main__":
import os import os
readme_path = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "richreadme.md"
)
scroll_view = LayoutView()
scroll_bar = ScrollBar()
with open(readme_path, "rt") as fh:
readme = Markdown(fh.read(), hyperlinks=True, code_theme="fruity")
scroll_view.layout.split_row(
Layout(readme, ratio=1), Layout(scroll_bar, ratio=2, size=2)
)
# from rich.console import Console # from rich.console import Console
# console = Console() # console = Console()
@@ -280,22 +319,31 @@ if __name__ == "__main__":
footer.add_key("b", "Toggle sidebar") footer.add_key("b", "Toggle sidebar")
footer.add_key("q", "Quit") footer.add_key("q", "Quit")
readme_path = os.path.join( # readme_path = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "richreadme.md" # os.path.dirname(os.path.abspath(__file__)), "richreadme.md"
) # )
scroll_view = LayoutView() # scroll_view = LayoutView()
scroll_bar = ScrollBar() # scroll_bar = ScrollBar()
with open(readme_path, "rt") as fh: # with open(readme_path, "rt") as fh:
readme = Markdown(fh.read(), hyperlinks=True, code_theme="fruity") # readme = Markdown(fh.read(), hyperlinks=True, code_theme="fruity")
scroll_view.layout.split_column( # scroll_view.layout.split_column(
Layout(readme, ratio=1), Layout(scroll_bar, ratio=2, size=2) # Layout(readme, ratio=1), Layout(scroll_bar, ratio=2, size=2)
) # )
layout = Layout()
layout.split_column(Layout(name="l1"), Layout(name="l2"))
sub_view = LayoutView(name="Sub view", layout=layout)
await sub_view.mount_all(l1=Placeholder(), l2=Placeholder())
await self.view.mount_all( await self.view.mount_all(
header=Header(self.title), header=Header(self.title),
left=Placeholder(), left=Placeholder(),
body=scroll_view, body=sub_view,
footer=footer, footer=footer,
) )
# app = MyApp()
# from rich.console import Console
# console = Console()
# console.print(app, height=30)
MyApp.run() MyApp.run()

View File

@@ -226,18 +226,8 @@ class Timer(Event, type=EventType.TIMER):
yield self.timer.name yield self.timer.name
@rich_repr
class Enter(Event, type=EventType.ENTER): class Enter(Event, type=EventType.ENTER):
__slots__ = ["x", "y"] pass
def __init__(self, sender: MessageTarget, x: int, y: int) -> None:
super().__init__(sender)
self.x = x
self.y = y
def __rich_repr__(self) -> RichReprResult:
yield "x", self.x
yield "y", self.y
class Leave(Event, type=EventType.LEAVE): class Leave(Event, type=EventType.LEAVE):

View File

@@ -49,6 +49,10 @@ class Region(NamedTuple):
x, y = point x, y = point
return ((self_x + width) > x >= self_x) and (((self_y + height) > y >= self_y)) return ((self_x + width) > x >= self_x) and (((self_y + height) > y >= self_y))
def translate(self, x: int, y: int) -> Region:
_x, _y, width, height = self
return Region(self.x + _x, self.y + _y, width, height)
def __contains__(self, other: Any) -> bool: def __contains__(self, other: Any) -> bool:
try: try:
x, y = other x, y = other

View File

@@ -1,7 +1,7 @@
from time import monotonic from time import monotonic
from typing import ClassVar from typing import ClassVar
from rich.repr import rich_repr import rich.repr
from .case import camel_to_snake from .case import camel_to_snake
from ._types import MessageTarget from ._types import MessageTarget
@@ -29,9 +29,8 @@ class Message:
self._stop_propagaton = False self._stop_propagaton = False
super().__init__() super().__init__()
def __rich_repr__(self): def __rich_repr__(self) -> rich.repr.RichReprResult:
return yield self.sender
yield
def __init_subclass__(cls, bubble: bool = False) -> None: def __init_subclass__(cls, bubble: bool = False) -> None:
super().__init_subclass__() super().__init_subclass__()

View File

@@ -152,7 +152,7 @@ class MessagePump:
log.exception("error in dispatch_message") log.exception("error in dispatch_message")
raise raise
finally: finally:
if self._message_queue.empty(): if isinstance(message, events.Event) and self._message_queue.empty():
if not self._closed: if not self._closed:
idle_handler = getattr(self, "on_idle", None) idle_handler = getattr(self, "on_idle", None)
if idle_handler is not None and not self._closed: if idle_handler is not None and not self._closed:
@@ -203,9 +203,11 @@ class MessagePump:
async def emit(self, message: Message) -> bool: async def emit(self, message: Message) -> bool:
if self._parent: if self._parent:
log.debug("EMIT %r -> %r %r", self, self._parent, message)
await self._parent.post_message_from_child(message) await self._parent.post_message_from_child(message)
return True return True
else: else:
log.warning("NO PARENT %r %r", self, message)
return False return False
async def on_timer(self, event: events.Timer) -> None: async def on_timer(self, event: events.Timer) -> None:

View File

@@ -0,0 +1,35 @@
from __future__ import annotations
from typing import Iterable
from rich.console import Console, RenderableType
from rich.control import Control
from rich.segment import Segment, Segments
from .geometry import Point
from ._loop import loop_last
class ScreenUpdate:
def __init__(
self, console: Console, renderable: RenderableType, width: int, height: int
) -> None:
self.lines = console.render_lines(
renderable, console.options.update_dimensions(width, height)
)
self.offset = Point(0, 0)
def render(self, x: int, y: int) -> Iterable[Segment]:
move_to = Control.move_to
new_line = Segment.line()
for last, (offset_y, line) in loop_last(enumerate(self.lines, y)):
yield move_to(x, offset_y).segment
yield from line
if not last:
yield new_line
def __rich__(self) -> RenderableType:
x, y = self.offset
update = self.render(x, y)
return Segments(update)

View File

@@ -5,10 +5,10 @@ from time import time
import logging import logging
from typing import Optional, Tuple, TYPE_CHECKING from typing import Optional, Tuple, TYPE_CHECKING
from rich.console import Console, ConsoleOptions, RenderResult from rich.console import Console, ConsoleOptions, RenderResult, RenderableType
from rich.layout import Layout from rich.layout import Layout
from rich.region import Region as LayoutRegion from rich.region import Region as LayoutRegion
from rich.repr import rich_repr, RichReprResult import rich.repr
from rich.segment import Segments from rich.segment import Segments
from . import events from . import events
@@ -16,7 +16,7 @@ from ._context import active_app
from .geometry import Dimensions, Region from .geometry import Dimensions, Region
from .message import Message from .message import Message
from .message_pump import MessagePump from .message_pump import MessagePump
from .widget import Widget, UpdateMessage from .widget import Widget, WidgetBase, UpdateMessage
from .widgets.header import Header from .widgets.header import Header
if TYPE_CHECKING: if TYPE_CHECKING:
@@ -29,7 +29,7 @@ class NoWidget(Exception):
pass pass
class View(ABC, MessagePump): class View(ABC, WidgetBase):
@property @property
def app(self) -> "App": def app(self) -> "App":
return active_app.get() return active_app.get()
@@ -45,44 +45,46 @@ class View(ABC, MessagePump):
yield yield
@abstractmethod @abstractmethod
async def mount(self, widget: MessagePump, *, slot: str = "main") -> None: async def mount(self, widget: Widget, *, slot: str = "main") -> None:
... ...
@abstractmethod async def mount_all(self, **widgets: Widget) -> None:
def get_widget_at(self, x: int, y: int) -> Tuple[Widget, Region]:
...
async def mount_all(self, **widgets: MessagePump) -> None:
for slot, widget in widgets.items(): for slot, widget in widgets.items():
await self.mount(widget, slot=slot) await self.mount(widget, slot=slot)
self.require_repaint()
async def forward_event(self, event: events.Event) -> None: async def forward_event(self, event: events.Event) -> None:
pass pass
@rich_repr @rich.repr.auto
class LayoutView(View): class LayoutView(View):
layout: Layout def __init__(self, layout: Layout = None, name: str = "default") -> None:
def __init__(
self,
layout: Layout = None,
name: str = "default",
title: str = "Layout Application",
) -> None:
self.name = name self.name = name
self.title = title
self.layout = layout or Layout() self.layout = layout or Layout()
self.mouse_over: MessagePump | None = None self.mouse_over: WidgetBase | None = None
self.focused: Widget | None = None self.focused: WidgetBase | None = None
self.size = Dimensions(0, 0) self.size = Dimensions(0, 0)
self._widgets: set[MessagePump] = set() self._widgets: set[WidgetBase] = set()
super().__init__() super().__init__(name)
self.enable_messages(events.Idle) self.enable_messages(events.Idle)
def __rich_repr__(self) -> RichReprResult: def __rich_repr__(self) -> rich.repr.RichReprResult:
yield "name", self.name yield "name", self.name
@property
def is_root_view(self) -> bool:
return self._parent is self.app
# def check_repaint(self) -> bool:
# return True
def render(self) -> RenderableType:
return self.layout
# def __rich__(self) -> Layout:
# return self.render()
# def __rich_console__( # def __rich_console__(
# self, console: Console, options: ConsoleOptions # self, console: Console, options: ConsoleOptions
# ) -> RenderResult: # ) -> RenderResult:
@@ -90,77 +92,84 @@ class LayoutView(View):
# segments = console.render(self.layout, options.update_dimensions(width, height)) # segments = console.render(self.layout, options.update_dimensions(width, height))
# yield from segments # yield from segments
def __rich__(self) -> Layout: def get_widget_at(
return self.layout self, x: int, y: int, offset_x: int = 0, offset_y: int = 0, deep: bool = False
) -> Tuple[Widget, Region]:
def get_widget_at(self, x: int, y: int) -> Tuple[Widget, Region]:
for layout, (layout_region, render) in self.layout.map.items(): for layout, (layout_region, render) in self.layout.map.items():
region = Region(*layout_region) region = Region(*layout_region)
if region.contains(x, y): if region.contains(x, y):
if isinstance(layout.renderable, Widget): widget = layout.renderable
return layout.renderable, region if deep and isinstance(layout.renderable, WidgetBase):
else: widget = layout.renderable
break if isinstance(widget, View):
raise NoWidget(f"No widget at ${x}, ${y}") translate_x = region.x
translate_y = region.y
widget, region = widget.get_widget_at(
x - region.x, y - region.y, deep=True
)
region = region.translate(translate_x, translate_y)
return widget, region
async def repaint(self) -> None: raise NoWidget(f"No widget at {x}, {y}")
await self.emit(UpdateMessage(self))
async def on_event(self, event: events.Event) -> None:
if isinstance(event, events.Resize):
new_size = Dimensions(event.width, event.height)
if self.size != new_size:
self.size = new_size
await self.repaint()
await super().on_event(event)
async def on_message(self, message: Message) -> None: async def on_message(self, message: Message) -> None:
log.debug("on_message %r", repr(message))
if isinstance(message, UpdateMessage):
widget = message.sender
if widget in self._widgets:
for layout, (region, render) in self.layout.map.items():
if layout.renderable is widget:
assert isinstance(widget, Widget)
update = widget.render_update(region.x, region.y)
segments = Segments(update)
self.console.print(segments, end="")
async def mount(self, widget: MessagePump, *, slot: str = "main") -> None: if isinstance(message, UpdateMessage):
widget = message.widget
# if widget in self._widgets:
for layout, (region, render) in self.layout.map.items():
if layout.renderable is message.sender:
if not isinstance(widget, WidgetBase):
continue
log.debug("%r is_root %r", self, self.is_root_view)
if self.is_root_view:
log.debug("RENDERING %r %r %r", widget, message, region)
try:
update = widget.render_update(
region.x + message.offset_x, region.y + message.offset_y
)
except Exception:
log.exception("update error")
raise
self.console.print(Segments(update), end="")
else:
await self._parent.on_message(
UpdateMessage(
self,
widget,
offset_x=message.offset_x + region.x,
offset_y=message.offset_y + region.y,
)
)
break
else:
log.warning("Update widget not found")
# async def on_create(self, event: events.Created) -> None:
# await self.mount(Header(self.title))
async def mount(self, widget: Widget, *, slot: str = "main") -> None:
self.layout[slot].update(widget) self.layout[slot].update(widget)
await self.app.add(widget) await self.app.add(widget)
widget.set_parent(self) widget.set_parent(self)
await widget.post_message(events.Mount(sender=self)) await widget.post_message(events.Mount(sender=self))
self._widgets.add(widget) self._widgets.add(widget)
async def set_focus(self, widget: Optional[Widget]) -> None:
log.debug("set_focus %r", widget)
if widget == self.focused:
return
if widget is None:
if self.focused is not None:
focused = self.focused
self.focused = None
await focused.post_message(events.Blur(self))
elif widget.can_focus:
if self.focused is not None:
await self.focused.post_message(events.Blur(self))
if widget is not None and self.focused != widget:
self.focused = widget
await widget.post_message(events.Focus(self))
async def layout_update(self) -> None: async def layout_update(self) -> None:
if not self.size: if not self.size:
return return
width, height = self.size width, height = self.size
region_map = self.layout._make_region_map(width, height) region_map = self.layout._make_region_map(width, height)
for layout, region in region_map.items(): for layout, region in region_map.items():
if isinstance(layout.renderable, Widget): if isinstance(layout.renderable, WidgetBase):
await layout.renderable.post_message( await layout.renderable.post_message(
events.Resize(self, region.width, region.height) events.Resize(self, region.width, region.height)
) )
await self.repaint() self.app.refresh()
# await self.repaint()
async def on_resize(self, event: events.Resize) -> None: async def on_resize(self, event: events.Resize) -> None:
self.size = Dimensions(event.width, event.height) self.size = Dimensions(event.width, event.height)
@@ -168,25 +177,15 @@ class LayoutView(View):
async def _on_mouse_move(self, event: events.MouseMove) -> None: async def _on_mouse_move(self, event: events.MouseMove) -> None:
try: try:
widget, region = self.get_widget_at(event.x, event.y) widget, region = self.get_widget_at(event.x, event.y, deep=True)
log.debug("mouse over %r %r", widget, region)
except NoWidget: except NoWidget:
if self.mouse_over is not None: await self.app.set_mouse_over(None)
try:
await self.mouse_over.post_message(events.Leave(self))
finally:
self.mouse_over = None
else: else:
if self.mouse_over != widget: await self.app.set_mouse_over(widget)
try:
if self.mouse_over is not None: log.debug("posting mouse move to %r", widget)
await self.mouse_over.post_message(events.Leave(self)) await widget.forward_event(
if widget is not None:
await widget.post_message(
events.Enter(self, event.x - region.x, event.y - region.y)
)
finally:
self.mouse_over = widget
await widget.post_message(
events.MouseMove( events.MouseMove(
self, self,
event.x - region.x, event.x - region.x,
@@ -195,19 +194,21 @@ class LayoutView(View):
event.shift, event.shift,
event.meta, event.meta,
event.ctrl, event.ctrl,
screen_x=event.screen_x,
screen_y=event.screen_y,
) )
) )
async def forward_event(self, event: events.Event) -> None: async def forward_event(self, event: events.Event) -> None:
if isinstance(event, (events.MouseDown)): log.debug("FORWARD %r %r", self, event)
if isinstance(event, (events.Enter, events.Leave)):
await self.post_message(event)
elif isinstance(event, (events.MouseDown)):
try: try:
widget, _region = self.get_widget_at(event.x, event.y) widget, _region = self.get_widget_at(event.x, event.y, deep=True)
except NoWidget: except NoWidget:
await self.set_focus(None) await self.app.set_focus(None)
else: else:
await self.set_focus(widget) await self.app.set_focus(widget)
elif isinstance(event, events.MouseMove): elif isinstance(event, events.MouseMove):
await self._on_mouse_move(event) await self._on_mouse_move(event)
@@ -219,6 +220,7 @@ class LayoutView(View):
pass pass
else: else:
await widget.forward_event(event) await widget.forward_event(event)
elif isinstance(event, (events.MouseScrollDown, events.MouseScrollUp)): elif isinstance(event, (events.MouseScrollDown, events.MouseScrollUp)):
widget, _region = self.get_widget_at(event.x, event.y) widget, _region = self.get_widget_at(event.x, event.y)
scroll_widget = widget or self.focused scroll_widget = widget or self.focused
@@ -232,6 +234,3 @@ class LayoutView(View):
visible = self.layout[layout_name].visible visible = self.layout[layout_name].visible
self.layout[layout_name].visible = not visible self.layout[layout_name].visible = not visible
await self.layout_update() await self.layout_update()
async def on_idle(self, event: events.Idle) -> None:
await self.repaint()

View File

@@ -13,14 +13,15 @@ from typing import (
from rich.align import Align from rich.align import Align
from rich.console import Console, ConsoleOptions, RenderableType, RenderResult from rich.console import Console, ConsoleOptions, RenderableType
from rich.pretty import Pretty from rich.pretty import Pretty
from rich.panel import Panel from rich.panel import Panel
from rich.repr import rich_repr, RichReprResult import rich.repr
from rich.segment import Segment from rich.segment import Segment
from . import events from . import events
from ._context import active_app from ._context import active_app
from ._loop import loop_last
from ._line_cache import LineCache from ._line_cache import LineCache
from .message import Message from .message import Message
from .message_pump import MessagePump from .message_pump import MessagePump
@@ -33,17 +34,39 @@ if TYPE_CHECKING:
log = getLogger("rich") log = getLogger("rich")
T = TypeVar("T")
@rich.repr.auto
class UpdateMessage(Message): class UpdateMessage(Message):
def __init__(
self,
sender: MessagePump,
widget: Widget,
offset_x: int = 0,
offset_y: int = 0,
):
super().__init__(sender)
self.widget = widget
self.offset_x = offset_x
self.offset_y = offset_y
def __rich_repr__(self) -> rich.repr.RichReprResult:
yield self.sender
yield "widget"
yield "offset_x", self.offset_x, 0
yield "offset_y", self.offset_y, 0
def can_batch(self, message: Message) -> bool: def can_batch(self, message: Message) -> bool:
return isinstance(message, UpdateMessage) and message.sender == self.sender return isinstance(message, UpdateMessage) and message.sender == self.sender
class Reactive(Generic[T]): ReactiveType = TypeVar("ReactiveType")
class Reactive(Generic[ReactiveType]):
def __init__( def __init__(
self, default: T, validator: Callable[[object, T], T] | None = None self,
default: ReactiveType,
validator: Callable[[object, ReactiveType], ReactiveType] | None = None,
) -> None: ) -> None:
self._default = default self._default = default
self.validator = validator self.validator = validator
@@ -52,10 +75,10 @@ class Reactive(Generic[T]):
self.internal_name = f"_{name}" self.internal_name = f"_{name}"
setattr(owner, self.internal_name, self._default) setattr(owner, self.internal_name, self._default)
def __get__(self, obj: "Widget", obj_type: type[object]) -> T: def __get__(self, obj: "Widget", obj_type: type[object]) -> ReactiveType:
return getattr(obj, self.internal_name) return getattr(obj, self.internal_name)
def __set__(self, obj: "Widget", value: T) -> None: def __set__(self, obj: "Widget", value: ReactiveType) -> None:
if getattr(obj, self.internal_name) != value: if getattr(obj, self.internal_name) != value:
log.debug("%s -> %s", self.internal_name, value) log.debug("%s -> %s", self.internal_name, value)
if self.validator: if self.validator:
@@ -64,7 +87,8 @@ class Reactive(Generic[T]):
obj.require_repaint() obj.require_repaint()
class Widget(MessagePump): @rich.repr.auto
class WidgetBase(MessagePump):
_count: ClassVar[int] = 0 _count: ClassVar[int] = 0
can_focus: bool = False can_focus: bool = False
@@ -74,10 +98,9 @@ class Widget(MessagePump):
self.size = Dimensions(0, 0) self.size = Dimensions(0, 0)
self.size_changed = False self.size_changed = False
self._repaint_required = False self._repaint_required = False
self._line_cache: LineCache = LineCache()
super().__init__() super().__init__()
self.disable_messages(events.MouseMove) # self.disable_messages(events.MouseMove)
def __init_subclass__( def __init_subclass__(
cls, cls,
@@ -86,9 +109,12 @@ class Widget(MessagePump):
super().__init_subclass__() super().__init_subclass__()
cls.can_focus = can_focus cls.can_focus = can_focus
def __rich_repr__(self) -> RichReprResult: def __rich_repr__(self) -> rich.repr.RichReprResult:
yield "name", self.name yield "name", self.name
def __rich__(self) -> RenderableType:
return self.render()
@property @property
def app(self) -> "App": def app(self) -> "App":
"""Get the current app.""" """Get the current app."""
@@ -97,39 +123,60 @@ class Widget(MessagePump):
@property @property
def console(self) -> Console: def console(self) -> Console:
"""Get the current console.""" """Get the current console."""
try: return active_app.get().console
return active_app.get().console
except LookupError:
return Console()
# def __rich__(self) -> LineCache:
# return self.line_cache
def __rich_console__(
self, console: Console, options: ConsoleOptions
) -> RenderResult:
renderable = self.render(console, options)
self._line_cache.update(console, options, renderable)
yield self._line_cache
def require_repaint(self) -> None: def require_repaint(self) -> None:
"""Mark widget as requiring a repaint.
Actual repaint is done by parent on idle.
"""
self._repaint_required = True self._repaint_required = True
def check_repaint(self) -> bool:
return True
return self._repaint_required
async def forward_event(self, event: events.Event) -> None: async def forward_event(self, event: events.Event) -> None:
await self.post_message(event) await self.post_message(event)
async def refresh(self) -> None: async def refresh(self) -> None:
self._repaint_required = True """Re-render the window and repaint it."""
self.require_repaint()
await self.repaint() await self.repaint()
async def repaint(self) -> None: async def repaint(self) -> None:
await self.emit(UpdateMessage(self)) """Instructs parent to repaint this widget."""
await self.emit(UpdateMessage(self, self))
def render_update(self, x: int, y: int) -> Iterable[Segment]: def render_update(self, x: int, y: int) -> Iterable[Segment]:
width, height = self.size """Render an update to a portion of the screen.
yield from self.line_cache.render(x, y, width, height)
def render(self, console: Console, options: ConsoleOptions) -> RenderableType: Args:
x (int): X offset from origin.
y (int): Y offset form origin.
Returns:
Iterable[Segment]: Partial update.
"""
return
width, height = self.size
lines = self.console.render_lines(
self.render(), self.console.options.update_dimensions(width, height)
)
new_line = Segment.line()
for last, line in loop_last(lines):
yield from line
if not last:
yield new_line
def render(self) -> RenderableType:
"""Get renderable for widget.
Returns:
RenderableType: Any renderable
"""
return Panel( return Panel(
Align.center(Pretty(self), vertical="middle"), title=self.__class__.__name__ Align.center(Pretty(self), vertical="middle"), title=self.__class__.__name__
) )
@@ -149,5 +196,56 @@ class Widget(MessagePump):
await super().on_event(event) await super().on_event(event)
async def on_idle(self, event: events.Idle) -> None: async def on_idle(self, event: events.Idle) -> None:
if self.line_cache is None or self.line_cache.dirty: if self.check_repaint():
log.debug("REPAINTING")
await self.repaint() await self.repaint()
class Widget(WidgetBase):
def __init__(self, name: str | None = None) -> None:
super().__init__(name)
self._line_cache: LineCache | None = None
@property
def line_cache(self) -> LineCache:
if self._line_cache is None:
width, height = self.size
start = time()
try:
renderable = self.render()
except Exception:
log.exception("error in render")
raise
self._line_cache = LineCache.from_renderable(
self.console, renderable, width, height
)
log.debug("%.1fms %r render elapsed", (time() - start) * 1000, self)
assert self._line_cache is not None
return self._line_cache
# def __rich__(self) -> LineCache:
# return self.line_cache
def render(self) -> RenderableType:
return self.line_cache
def require_repaint(self) -> None:
self._line_cache = None
super().require_repaint()
def check_repaint(self) -> bool:
return self._line_cache is None or self.line_cache.dirty
def render_update(self, x: int, y: int) -> Iterable[Segment]:
"""Render an update to a portion of the screen.
Args:
x (int): X offset from origin.
y (int): Y offset form origin.
Returns:
Iterable[Segment]: Partial update.
"""
width, height = self.size
yield from self.line_cache.render(x, y, width, height)

View File

@@ -17,7 +17,7 @@ class Footer(Widget):
def add_key(self, key: str, label: str) -> None: def add_key(self, key: str, label: str) -> None:
self.keys.append((key, label)) self.keys.append((key, label))
def render(self, console: Console, options: ConsoleOptions) -> RenderableType: def render(self) -> RenderableType:
text = Text( text = Text(
style="white on dark_green", style="white on dark_green",

View File

@@ -32,7 +32,7 @@ class Header(Widget):
def get_clock(self) -> str: def get_clock(self) -> str:
return datetime.now().time().strftime("%X") return datetime.now().time().strftime("%X")
def render(self, console: Console, options: ConsoleOptions) -> RenderableType: def render(self) -> RenderableType:
header_table = Table.grid(padding=(0, 1), expand=True) header_table = Table.grid(padding=(0, 1), expand=True)
header_table.style = self.style header_table.style = self.style

View File

@@ -3,23 +3,24 @@ from rich.align import Align
from rich.console import Console, ConsoleOptions, RenderableType from rich.console import Console, ConsoleOptions, RenderableType
from rich.panel import Panel from rich.panel import Panel
from rich.pretty import Pretty from rich.pretty import Pretty
from rich.repr import RichReprResult import rich.repr
from .. import events from .. import events
from ..widget import Reactive, Widget from ..widget import Reactive, Widget
@rich.repr.auto
class Placeholder(Widget, can_focus=True): class Placeholder(Widget, can_focus=True):
has_focus: Reactive[bool] = Reactive(False) has_focus: Reactive[bool] = Reactive(False)
mouse_over: Reactive[bool] = Reactive(False) mouse_over: Reactive[bool] = Reactive(False)
def __rich_repr__(self) -> RichReprResult: def __rich_repr__(self) -> rich.repr.RichReprResult:
yield "name", self.name yield "name", self.name
yield "has_focus", self.has_focus yield "has_focus", self.has_focus
yield "mouse_over", self.mouse_over yield "mouse_over", self.mouse_over
def render(self, console: Console, options: ConsoleOptions) -> RenderableType: def render(self) -> RenderableType:
return Panel( return Panel(
Align.center(Pretty(self), vertical="middle"), Align.center(Pretty(self), vertical="middle"),
title=self.__class__.__name__, title=self.__class__.__name__,

View File

@@ -67,7 +67,7 @@ class Window(Widget):
self.renderable = renderable self.renderable = renderable
del self._lines[:] del self._lines[:]
def render(self, console: Console, options: ConsoleOptions) -> RenderableType: def render(self) -> RenderableType:
height = self.size.height height = self.size.height
lines = self.get_lines(console, options) lines = self.get_lines(console, options)
position = int(self.position) position = int(self.position)