tui module moved from Rich repo

This commit is contained in:
Will McGugan
2021-05-31 11:08:09 +01:00
parent db5e202e26
commit 8f6b84c14f
22 changed files with 1447 additions and 26 deletions

37
.gitignore vendored
View File

@@ -1,3 +1,13 @@
*.ipynb
.pytype
.DS_Store
.vscode
mypy_report
docs/build
docs/source/_build
tools/*.txt
playground/
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
@@ -20,8 +30,6 @@ parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
@@ -40,14 +48,12 @@ pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
@@ -59,7 +65,6 @@ coverage.xml
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
@@ -77,26 +82,11 @@ target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
# celery beat schedule file
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
@@ -122,8 +112,3 @@ venv.bak/
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/

83
poetry.lock generated Normal file
View File

@@ -0,0 +1,83 @@
[[package]]
name = "colorama"
version = "0.4.4"
description = "Cross-platform colored terminal text."
category = "main"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
[[package]]
name = "commonmark"
version = "0.9.1"
description = "Python parser for the CommonMark Markdown spec"
category = "main"
optional = false
python-versions = "*"
[package.extras]
test = ["flake8 (==3.7.8)", "hypothesis (==3.55.3)"]
[[package]]
name = "pygments"
version = "2.9.0"
description = "Pygments is a syntax highlighting package written in Python."
category = "main"
optional = false
python-versions = ">=3.5"
[[package]]
name = "rich"
version = "10.2.2"
description = "Render rich text, tables, progress bars, syntax highlighting, markdown and more to the terminal"
category = "main"
optional = false
python-versions = "^3.6"
develop = false
[package.dependencies]
typing-extensions = {version = "^3.7.4", markers = "python_version < \"3.8\""}
pygments = "^2.6.0"
colorama = "^0.4.0"
commonmark = "^0.9.0"
[package.extras]
jupyter = ["ipywidgets (>=7.5.1,<8.0.0)"]
[package.source]
type = "git"
url = "git@github.com:willmcgugan/rich"
reference = "tui"
resolved_reference = "34630b02b60c1c1cce4b072410cd93f6a6385039"
[[package]]
name = "typing-extensions"
version = "3.10.0.0"
description = "Backported and Experimental Type Hints for Python 3.5+"
category = "main"
optional = false
python-versions = "*"
[metadata]
lock-version = "1.1"
python-versions = "^3.7"
content-hash = "b5400274e471bd51f46cb352194316d479f45e3affa0d9aba926a7bbf2e0c551"
[metadata.files]
colorama = [
{file = "colorama-0.4.4-py2.py3-none-any.whl", hash = "sha256:9f47eda37229f68eee03b24b9748937c7dc3868f906e8ba69fbcbdd3bc5dc3e2"},
{file = "colorama-0.4.4.tar.gz", hash = "sha256:5941b2b48a20143d2267e95b1c2a7603ce057ee39fd88e7329b0c292aa16869b"},
]
commonmark = [
{file = "commonmark-0.9.1-py2.py3-none-any.whl", hash = "sha256:da2f38c92590f83de410ba1a3cbceafbc74fee9def35f9251ba9a971d6d66fd9"},
{file = "commonmark-0.9.1.tar.gz", hash = "sha256:452f9dc859be7f06631ddcb328b6919c67984aca654e5fefb3914d54691aed60"},
]
pygments = [
{file = "Pygments-2.9.0-py3-none-any.whl", hash = "sha256:d66e804411278594d764fc69ec36ec13d9ae9147193a1740cd34d272ca383b8e"},
{file = "Pygments-2.9.0.tar.gz", hash = "sha256:a18f47b506a429f6f4b9df81bb02beab9ca21d0a5fee38ed15aef65f0545519f"},
]
rich = []
typing-extensions = [
{file = "typing_extensions-3.10.0.0-py2-none-any.whl", hash = "sha256:0ac0f89795dd19de6b97debb0c6af1c70987fd80a2d62d1958f7e56fcc31b497"},
{file = "typing_extensions-3.10.0.0-py3-none-any.whl", hash = "sha256:779383f6086d90c99ae41cf0ff39aac8a7937a9283ce0a414e5dd782f4c94a84"},
{file = "typing_extensions-3.10.0.0.tar.gz", hash = "sha256:50b6f157849174217d0656f99dc82fe932884fb250826c18350e159ec6cdf342"},
]

19
pyproject.toml Normal file
View File

@@ -0,0 +1,19 @@
[tool.poetry]
name = "rich.tui"
version = "0.1.0"
description = "Build Text User Interfaces with Rich"
authors = ["Will McGugan <willmcgugan@gmail.com>"]
license = "MIT"
packages = [
{ include = "rich" }
]
[tool.poetry.dependencies]
python = "^3.7"
[tool.poetry.dev-dependencies]
rich = {git = "git@github.com:willmcgugan/rich", rev = "tui"}
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"

7
rich-tui.code-workspace Normal file
View File

@@ -0,0 +1,7 @@
{
"folders": [
{
"path": "."
}
]
}

8
rich/tui/_context.py Normal file
View File

@@ -0,0 +1,8 @@
from typing import TYPE_CHECKING
from contextvars import ContextVar
if TYPE_CHECKING:
from .app import App
active_app: ContextVar["App"] = ContextVar("active_app")

79
rich/tui/_timer.py Normal file
View File

@@ -0,0 +1,79 @@
from time import monotonic
from typing import Awaitable, Optional, Callable
from asyncio import Event, wait_for, TimeoutError
import weakref
from rich.repr import rich_repr, RichReprResult
from . import events
from .types import MessageTarget
TimerCallback = Callable[[], Awaitable[None]]
class EventTargetGone(Exception):
pass
@rich_repr
class Timer:
_timer_count: int = 1
def __init__(
self,
event_target: MessageTarget,
interval: float,
sender: MessageTarget,
*,
name: Optional[str] = None,
callback: Optional[TimerCallback] = None,
repeat: int = None,
) -> None:
self._target_repr = repr(event_target)
self._target = weakref.ref(event_target)
self._interval = interval
self.sender = sender
self.name = f"Timer#{self._timer_count}" if name is None else name
self._timer_count += 1
self._callback = callback
self._repeat = repeat
self._stop_event = Event()
def __rich_repr__(self) -> RichReprResult:
yield self._interval
yield "name", self.name
yield "repeat", self._repeat, None
@property
def target(self) -> MessageTarget:
target = self._target()
if target is None:
raise EventTargetGone()
return target
def stop(self) -> None:
self._stop_event.set()
async def run(self) -> None:
count = 0
_repeat = self._repeat
_interval = self._interval
_wait = self._stop_event.wait
start = monotonic()
while _repeat is None or count <= _repeat:
next_timer = start + (count * _interval)
try:
if await wait_for(_wait(), max(0, next_timer - monotonic())):
break
except TimeoutError:
pass
event = events.Timer(
self.sender, timer=self, count=count, callback=self._callback
)
try:
await self.target.post_message(event)
except EventTargetGone:
break
count += 1

34
rich/tui/actions.py Normal file
View File

@@ -0,0 +1,34 @@
from time import time
from typing import ClassVar
from enum import auto, Enum
from .case import camel_to_snake
class ActionType(Enum):
CUSTOM = auto()
QUIT = auto()
class Action:
type: ClassVar[ActionType]
def __init__(self) -> None:
self.time = time()
def __init_subclass__(cls, type: ActionType) -> None:
super().__init_subclass__()
cls.type = type
@property
def name(self) -> str:
if not hasattr(self, "_name"):
_name = camel_to_snake(self.__class__.__name__)
if _name.endswith("_event"):
_name = _name[:-6]
self._name = _name
return self._name
class QuitAction(Action, type=ActionType.QUIT):
pass

131
rich/tui/app.py Normal file
View File

@@ -0,0 +1,131 @@
import asyncio
import logging
import signal
from typing import Any, Dict, Set
from rich.control import Control
from rich.repr import rich_repr, RichReprResult
from rich.screen import Screen
from . import events
from ._context import active_app
from .. import get_console
from ..console import Console
from .driver import Driver, CursesDriver
from .message_pump import MessagePump
from .view import View, LayoutView
log = logging.getLogger("rich")
LayoutDefinition = Dict[str, Any]
@rich_repr
class App(MessagePump):
view: View
def __init__(
self,
console: Console = None,
view: View = None,
screen: bool = True,
title: str = "Megasoma Application",
):
super().__init__()
self.console = console or get_console()
self._screen = screen
self.title = title
self.view = view or LayoutView()
self.children: Set[MessagePump] = set()
def __rich_repr__(self) -> RichReprResult:
yield "title", self.title
@classmethod
def run(cls, console: Console = None, screen: bool = True):
async def run_app() -> None:
app = cls(console=console, screen=screen)
await app.process_messages()
asyncio.run(run_app())
def on_keyboard_interupt(self) -> None:
loop = asyncio.get_event_loop()
event = events.ShutdownRequest(sender=self)
asyncio.run_coroutine_threadsafe(self.post_message(event), loop=loop)
async def process_messages(self) -> None:
loop = asyncio.get_event_loop()
driver = CursesDriver(self.console, self)
driver.start_application_mode()
loop.add_signal_handler(signal.SIGINT, self.on_keyboard_interupt)
active_app.set(self)
await self.add(self.view)
await self.post_message(events.Startup(sender=self))
self.refresh()
try:
await super().process_messages()
finally:
loop.remove_signal_handler(signal.SIGINT)
driver.stop_application_mode()
await asyncio.gather(*(child.close_messages() for child in self.children))
self.children.clear()
async def add(self, child: MessagePump) -> None:
self.children.add(child)
asyncio.create_task(child.process_messages())
await child.post_message(events.Created(sender=self))
def refresh(self) -> None:
console = self.console
with console:
console.print(
Screen(Control.home(), self.view, Control.home(), application_mode=True)
)
async def on_startup(self, event: events.Startup) -> None:
pass
async def on_shutdown_request(self, event: events.ShutdownRequest) -> None:
await self.close_messages()
async def on_resize(self, event: events.Resize) -> None:
await self.view.post_message(event)
if not event.suppressed:
self.refresh()
async def on_mouse_move(self, event: events.MouseMove) -> None:
await self.view.post_message(event)
if __name__ == "__main__":
import asyncio
from logging import FileHandler
from .widgets.header import Header
from .widgets.placeholder import Placeholder
logging.basicConfig(
level="NOTSET",
format="%(message)s",
datefmt="[%X]",
handlers=[FileHandler("richtui.log")],
)
class MyApp(App):
async def on_key(self, event: events.Key) -> None:
log.debug("on_key %r", event)
if event.key == "q":
await self.close_messages()
async def on_startup(self, event: events.Startup) -> None:
await self.view.mount(Header(self.title), slot="header")
await self.view.mount(Placeholder(), slot="body")
self.refresh()
MyApp.run()

22
rich/tui/case.py Normal file
View File

@@ -0,0 +1,22 @@
import re
def camel_to_snake(name: str, _re_snake=re.compile("[a-z][A-Z]")) -> str:
"""Convert name from CamelCase to snake_case.
Args:
name (str): A symbol name, such as a class name.
Returns:
str: Name in camel case.
"""
def repl(match) -> str:
lower, upper = match.group()
return f"{lower}_{upper.lower()}"
return _re_snake.sub(repl, name).lower()
if __name__ == "__main__":
print(camel_to_snake("HelloWorldEvent"))

186
rich/tui/driver.py Normal file
View File

@@ -0,0 +1,186 @@
from abc import ABC, abstractmethod
import asyncio
import logging
import os
import signal
import curses
import platform
import sys
import shutil
from threading import Event, Thread
from typing import Optional, Tuple, TYPE_CHECKING
from . import events
from .types import MessageTarget
if TYPE_CHECKING:
from ..console import Console
log = logging.getLogger("rich")
WINDOWS = platform.system() == "Windows"
class Driver(ABC):
def __init__(self, console: "Console", target: "MessageTarget") -> None:
self.console = console
self._target = target
@abstractmethod
def start_application_mode(self):
...
@abstractmethod
def stop_application_mode(self):
...
class CursesDriver(Driver):
_MOUSE_PRESSED = [
curses.BUTTON1_PRESSED,
curses.BUTTON2_PRESSED,
curses.BUTTON3_PRESSED,
curses.BUTTON4_PRESSED,
]
_MOUSE_RELEASED = [
curses.BUTTON1_RELEASED,
curses.BUTTON2_RELEASED,
curses.BUTTON3_RELEASED,
curses.BUTTON4_RELEASED,
]
_MOUSE_CLICKED = [
curses.BUTTON1_CLICKED,
curses.BUTTON2_CLICKED,
curses.BUTTON3_CLICKED,
curses.BUTTON4_CLICKED,
]
_MOUSE_DOUBLE_CLICKED = [
curses.BUTTON1_DOUBLE_CLICKED,
curses.BUTTON2_DOUBLE_CLICKED,
curses.BUTTON3_DOUBLE_CLICKED,
curses.BUTTON4_DOUBLE_CLICKED,
]
_MOUSE = [
(events.MousePressed, _MOUSE_PRESSED),
(events.MouseReleased, _MOUSE_RELEASED),
(events.MouseClicked, _MOUSE_CLICKED),
(events.MouseDoubleClicked, _MOUSE_DOUBLE_CLICKED),
]
def __init__(self, console: "Console", target: "MessageTarget") -> None:
super().__init__(console, target)
self._stdscr = None
self._exit_event = Event()
self._key_thread: Optional[Thread] = None
def _get_terminal_size(self) -> Tuple[int, int]:
width: Optional[int] = 80
height: Optional[int] = 25
if WINDOWS: # pragma: no cover
width, height = shutil.get_terminal_size()
else:
try:
width, height = os.get_terminal_size(sys.stdin.fileno())
except (AttributeError, ValueError, OSError):
try:
width, height = os.get_terminal_size(sys.stdout.fileno())
except (AttributeError, ValueError, OSError):
pass
width = width or 80
height = height or 25
return width, height
def start_application_mode(self):
loop = asyncio.get_event_loop()
def on_terminal_resize(signum, stack) -> None:
terminal_size = self.console.size = self._get_terminal_size()
width, height = terminal_size
event = events.Resize(self._target, width, height)
self.console.size = terminal_size
asyncio.run_coroutine_threadsafe(
self._target.post_message(event),
loop=loop,
)
signal.signal(signal.SIGWINCH, on_terminal_resize)
self._stdscr = curses.initscr()
curses.noecho()
curses.cbreak()
curses.halfdelay(1)
curses.mousemask(curses.REPORT_MOUSE_POSITION | curses.ALL_MOUSE_EVENTS)
# curses.mousemask(-1)
self._stdscr.keypad(True)
self.console.show_cursor(False)
self.console.file.write("\033[?1003h\n")
self._key_thread = Thread(
target=self.run_key_thread, args=(asyncio.get_event_loop(),)
)
self.console.size = self._get_terminal_size()
self._key_thread.start()
def stop_application_mode(self):
signal.signal(signal.SIGWINCH, signal.SIG_DFL)
self._exit_event.set()
self._key_thread.join()
curses.nocbreak()
self._stdscr.keypad(False)
curses.echo()
curses.endwin()
self.console.show_cursor(True)
def run_key_thread(self, loop) -> None:
stdscr = self._stdscr
assert stdscr is not None
exit_event = self._exit_event
def send_event(event: events.Event) -> None:
asyncio.run_coroutine_threadsafe(
self._target.post_message(event),
loop=loop,
)
while not exit_event.is_set():
code = stdscr.getch()
if code == -1:
continue
if code == curses.KEY_MOUSE:
try:
_id, x, y, _z, button_state = curses.getmouse()
except Exception:
log.exception("error in curses.getmouse")
else:
if button_state & curses.REPORT_MOUSE_POSITION:
send_event(events.MouseMove(self._target, x, y))
alt = bool(button_state & curses.BUTTON_ALT)
ctrl = bool(button_state & curses.BUTTON_CTRL)
shift = bool(button_state & curses.BUTTON_SHIFT)
for event_type, masks in self._MOUSE:
for button, mask in enumerate(masks, 1):
if button_state & mask:
send_event(
event_type(
self._target,
x,
y,
button,
alt=alt,
ctrl=ctrl,
shift=shift,
)
)
else:
send_event(events.Key(self._target, code=code))

228
rich/tui/events.py Normal file
View File

@@ -0,0 +1,228 @@
from dataclasses import dataclass, field
import re
from enum import auto, Enum
from time import monotonic
from typing import ClassVar, Optional, Set, TYPE_CHECKING
from ..repr import rich_repr, RichReprResult
from .message import Message
from .types import Callback, MessageTarget
if TYPE_CHECKING:
from ._timer import Timer as TimerClass
from ._timer import TimerCallback
class EventType(Enum):
"""Event type enumeration."""
LOAD = auto()
STARTUP = auto()
CREATED = auto()
IDLE = auto()
RESIZE = auto()
MOUNT = auto()
UNMOUNT = auto()
SHUTDOWN_REQUEST = auto()
SHUTDOWN = auto()
EXIT = auto()
REFRESH = auto()
TIMER = auto()
FOCUS = auto()
BLUR = auto()
KEY = auto()
MOUSE_MOVE = auto()
MOUSE_PRESSED = auto()
MOUSE_RELEASED = auto()
MOUSE_CLICKED = auto()
MOUSE_DOUBLE_CLICKED = auto()
MOUSE_ENTER = auto()
MOUSE_LEAVE = auto()
CUSTOM = 1000
@rich_repr
class Event(Message):
type: ClassVar[EventType]
def __rich_repr__(self) -> RichReprResult:
return
yield
def __init_subclass__(
cls, type: EventType, priority: int = 0, bubble: bool = False
) -> None:
super().__init_subclass__(priority=priority, bubble=bubble)
def __enter__(self) -> "Event":
return self
def __exit__(self, exc_type, exc_value, exc_tb) -> Optional[bool]:
if exc_type is not None:
# Log and suppress exception
return True
class ShutdownRequest(Event, type=EventType.SHUTDOWN_REQUEST):
pass
class Load(Event, type=EventType.SHUTDOWN_REQUEST):
pass
class Startup(Event, type=EventType.SHUTDOWN_REQUEST):
pass
class Created(Event, type=EventType.CREATED):
pass
class Idle(Event, type=EventType.IDLE):
"""Sent when there are no more items in the message queue."""
class Resize(Event, type=EventType.RESIZE):
width: int
height: int
def __init__(self, sender: MessageTarget, width: int, height: int) -> None:
self.width = width
self.height = height
super().__init__(sender)
def __rich_repr__(self) -> RichReprResult:
yield self.width
yield self.height
class Mount(Event, type=EventType.MOUNT):
pass
class Unmount(Event, type=EventType.UNMOUNT):
pass
class Shutdown(Event, type=EventType.SHUTDOWN):
pass
class Refresh(Event, type=EventType.REFRESH):
pass
@rich_repr
class Key(Event, type=EventType.KEY, bubble=True):
code: int = 0
def __init__(self, sender: MessageTarget, code: int) -> None:
super().__init__(sender)
self.code = code
def __rich_repr__(self) -> RichReprResult:
yield "code", self.code
yield "key", self.key
@property
def key(self) -> str:
return chr(self.code)
@rich_repr
class MouseMove(Event, type=EventType.MOUSE_MOVE):
def __init__(self, sender: MessageTarget, x: int, y: int) -> None:
super().__init__(sender)
self.x = x
self.y = y
def __rich_repr__(self) -> RichReprResult:
yield "x", self.x
yield "y", self.y
@rich_repr
class MousePressed(Event, type=EventType.MOUSE_PRESSED):
def __init__(
self,
sender: MessageTarget,
x: int,
y: int,
button: int,
alt: bool = False,
ctrl: bool = False,
shift: bool = False,
) -> None:
super().__init__(sender)
self.x = x
self.y = y
self.button = button
self.alt = alt
self.ctrl = ctrl
self.shift = shift
def __rich_repr__(self) -> RichReprResult:
yield "x", self.x
yield "y", self.y
yield "button", self.button,
yield "alt", self.alt, False
yield "ctrl", self.ctrl, False
yield "shift", self.shift, False
class MouseReleased(MousePressed, type=EventType.MOUSE_RELEASED):
pass
class MouseClicked(MousePressed, type=EventType.MOUSE_CLICKED):
pass
class MouseDoubleClicked(MousePressed, type=EventType.MOUSE_DOUBLE_CLICKED):
pass
@rich_repr
class Timer(Event, type=EventType.TIMER, priority=10):
def __init__(
self,
sender: MessageTarget,
timer: "TimerClass",
count: int = 0,
callback: Optional["TimerCallback"] = None,
) -> None:
super().__init__(sender)
self.timer = timer
self.count = count
self.callback = callback
def __rich_repr__(self) -> RichReprResult:
yield self.timer.name
@rich_repr
class MouseEnter(Event, type=EventType.MOUSE_ENTER):
def __init__(self, sender: MessageTarget, x: int, y: int) -> None:
super().__init__(sender)
self.x = x
self.y = y
def __rich_repr__(self) -> RichReprResult:
yield "x", self.x
yield "y", self.y
@rich_repr
class MouseLeave(Event, type=EventType.MOUSE_LEAVE):
pass
class Focus(Event, type=EventType.FOCUS):
pass
class Blur(Event, type=EventType.BLUR):
pass

View File

@@ -0,0 +1,28 @@
from rich.layout import Layout
from rich.table import Table
from rich.tui.app import App
from rich.widgets.color_changer import ColorChanger
class SimpleApp(App):
table: Table
def __init__(self):
super().__init__()
self.table = table = Table("foo", "bar", "baz")
table.add_row("1", "2", "3")
def visualize(self):
layout = Layout()
layout.split_column(
Layout(self.table, name="top"), Layout(ColorChanger(), name="bottom")
)
layout["bottom"].split_row(Layout(name="left"), Layout(name="right"))
return layout
if __name__ == "__main__":
app = SimpleApp()
app.run_message_loop()

34
rich/tui/message.py Normal file
View File

@@ -0,0 +1,34 @@
from time import monotonic
from typing import ClassVar
from .case import camel_to_snake
from .types import MessageTarget
class Message:
"""Base class for a message."""
sender: MessageTarget
bubble: ClassVar[bool] = False
default_priority: ClassVar[int] = 0
suppressed: bool = False
def __init__(self, sender: MessageTarget) -> None:
self.sender = sender
self.name = camel_to_snake(self.__class__.__name__)
self.time = monotonic()
def __init_subclass__(cls, bubble: bool = False, priority: int = 0) -> None:
super().__init_subclass__()
cls.bubble = bubble
cls.default_priority = priority
def suppress_default(self, suppress: bool = True) -> None:
"""Suppress the default action.
Args:
suppress (bool, optional): True if the default action should be suppressed,
or False if the default actions should be performed. Defaults to True.
"""
self.suppress = suppress

173
rich/tui/message_pump.py Normal file
View File

@@ -0,0 +1,173 @@
from functools import total_ordering
from typing import AsyncIterable, Optional, NamedTuple, Set, Type, Tuple, TYPE_CHECKING
import asyncio
from asyncio import PriorityQueue
from functools import total_ordering
import logging
from . import events
from .message import Message
from ._timer import Timer, TimerCallback
from .types import MessageHandler
log = logging.getLogger("rich")
class MessageQueueItem(NamedTuple):
priority: int
message: Message
def __lt__(self, other: "MessageQueueItem") -> bool:
return self.priority < other.priority
def __le__(self, other: "MessageQueueItem") -> bool:
return self.priority <= other.priority
def __gt__(self, other: "MessageQueueItem") -> bool:
return self.priority > other.priority
def __ge__(self, other: "MessageQueueItem") -> bool:
return self.priority >= other.priority
def __eq__(self, other: "MessageQueueItem") -> bool:
return self.priority == other.priority
def __ne__(self, other: "MessageQueueItem") -> bool:
return self.priority != other.priority
class MessagePumpClosed(Exception):
pass
class MessagePump:
def __init__(
self, queue_size: int = 10, parent: Optional["MessagePump"] = None
) -> None:
self._message_queue: "PriorityQueue[Optional[MessageQueueItem]]" = (
PriorityQueue(queue_size)
)
self._parent = parent
self._closing: bool = False
self._closed: bool = False
self._disabled_messages: Set[Message] = set()
def check_message_enabled(self, message: Message) -> bool:
return type(message) not in self._disabled_messages
def disable_messages(self, *messages: Type[Message]) -> None:
"""Disable message types from being proccessed."""
self._disabled_messages.intersection_update(messages)
def enable_messages(self, *messages: Type[Message]) -> None:
"""Enable processing of messages types."""
self._disabled_messages.difference_update(messages)
async def get_message(self) -> MessageQueueItem:
"""Get the next event on the queue, or None if queue is closed.
Returns:
Optional[Event]: Event object or None.
"""
if self._closed:
raise MessagePumpClosed("The message pump is closed")
queue_item = await self._message_queue.get()
if queue_item is None:
self._closed = True
raise MessagePumpClosed("The message pump is now closed")
return queue_item
def set_timer(
self,
delay: float,
*,
name: Optional[str] = None,
callback: TimerCallback = None,
) -> Timer:
timer = Timer(self, delay, self, name=name, callback=callback, repeat=0)
asyncio.get_event_loop().create_task(timer.run())
return timer
def set_interval(
self,
interval: float,
*,
name: Optional[str] = None,
callback: TimerCallback = None,
repeat: int = 0,
):
timer = Timer(
self, interval, self, name=name, callback=callback, repeat=repeat or None
)
asyncio.get_event_loop().create_task(timer.run())
return timer
async def close_messages(self) -> None:
self._closing = True
await self._message_queue.put(None)
async def process_messages(self) -> None:
"""Process messages until the queue is closed."""
while not self._closed:
try:
priority, message = await self.get_message()
except MessagePumpClosed:
break
except Exception:
log.exception("error getting message")
break
log.debug("%r -> %r", message, self)
await self.dispatch_message(message, priority)
if self._message_queue.empty():
await self.dispatch_message(events.Idle(self))
async def dispatch_message(
self, message: Message, priority: int = 0
) -> Optional[bool]:
if isinstance(message, events.Event):
await self.on_event(message, priority)
else:
return await self.on_message(message)
return False
async def on_event(self, event: events.Event, priority: int) -> None:
method_name = f"on_{event.name}"
dispatch_function: MessageHandler = getattr(self, method_name, None)
if dispatch_function is not None:
await dispatch_function(event)
if event.bubble and self._parent:
await self._parent.post_message(event, priority)
async def on_message(self, message: Message) -> None:
pass
async def post_message(
self,
message: Message,
priority: Optional[int] = None,
) -> bool:
if self._closing or self._closed:
return False
if not self.check_message_enabled(message):
return True
event_priority = priority if priority is not None else message.default_priority
item = MessageQueueItem(event_priority, message)
await self._message_queue.put(item)
return True
async def post_message_from_child(
self, message: Message, priority: Optional[int] = None
) -> None:
await self.post_message(message, priority=priority)
async def emit(self, message: Message, priority: Optional[int] = None) -> bool:
if self._parent:
await self._parent.post_message_from_child(message, priority=priority)
return True
else:
return False
async def on_timer(self, event: events.Timer) -> None:
if event.callback is not None:
await event.callback()

85
rich/tui/scrollbar.py Normal file
View File

@@ -0,0 +1,85 @@
from typing import List, Optional
from rich.segment import Segment
from rich.style import Style
def render_bar(
height: int = 25,
size: float = 100,
window_size: float = 25,
position: float = 0,
bar_style: Optional[Style] = None,
back_style: Optional[Style] = None,
ascii_only: bool = False,
vertical: bool = True,
) -> List[Segment]:
if vertical:
if ascii_only:
solid = "|"
half_start = "|"
half_end = "|"
else:
solid = ""
half_start = ""
half_end = ""
else:
if ascii_only:
solid = "-"
half_start = "-"
half_end = "-"
else:
solid = ""
half_start = ""
half_end = ""
_bar_style = bar_style or Style.parse("bright_magenta")
_back_style = back_style or Style.parse("#555555")
_Segment = Segment
start_bar_segment = _Segment(half_start, _bar_style)
end_bar_segment = _Segment(half_end, _bar_style)
bar_segment = _Segment(solid, _bar_style)
start_back_segment = _Segment(half_end, _back_style)
end_back_segment = _Segment(half_end, _back_style)
back_segment = _Segment(solid, _back_style)
segments = [back_segment] * height
step_size = size / height
start = position / step_size
end = (position + window_size) / step_size
start_index = int(start)
end_index = int(end)
bar_height = (end_index - start_index) + 1
segments[start_index:end_index] = [bar_segment] * bar_height
sub_position = start % 1.0
if sub_position >= 0.5:
segments[start_index] = start_bar_segment
elif start_index:
segments[start_index - 1] = end_back_segment
sub_position = end % 1.0
if sub_position < 0.5:
segments[end_index] = end_bar_segment
elif end_index + 1 < len(segments):
segments[end_index + 1] = start_back_segment
return segments
if __name__ == "__main__":
from rich.console import Console
from rich.segment import Segments
console = Console()
bar = render_bar(height=20, position=10, vertical=False, ascii_only=False)
console.print(Segments(bar, new_lines=False))

29
rich/tui/types.py Normal file
View File

@@ -0,0 +1,29 @@
from typing import Awaitable, Callable, Optional, Protocol, TYPE_CHECKING
if TYPE_CHECKING:
from .events import Event
from .message import Message
Callback = Callable[[], None]
# IntervalID = int
class MessageTarget(Protocol):
async def post_message(
self,
message: "Message",
priority: Optional[int] = None,
) -> bool:
...
class EventTarget(Protocol):
async def post_message(
self,
message: "Message",
priority: Optional[int] = None,
) -> bool:
...
MessageHandler = Callable[["Message"], Awaitable]

131
rich/tui/view.py Normal file
View File

@@ -0,0 +1,131 @@
from abc import ABC, abstractmethod
import logging
from typing import Optional, Tuple, TYPE_CHECKING
from rich.console import Console, ConsoleOptions, RenderResult, RenderableType
from rich.layout import Layout
from rich.region import Region
from rich.repr import rich_repr, RichReprResult
from . import events
from ._context import active_app
from .message_pump import MessagePump
from .widget import Widget
from .widgets.header import Header
if TYPE_CHECKING:
from .app import App
log = logging.getLogger("rich")
class NoWidget(Exception):
pass
@rich_repr
class View(ABC, MessagePump):
@property
def app(self) -> "App":
return active_app.get()
@property
def console(self) -> Console:
return active_app.get().console
def __rich_console__(
self, console: Console, options: ConsoleOptions
) -> RenderResult:
return
yield
def __rich_repr__(self) -> RichReprResult:
return
yield
async def on_resize(self, event: events.Resize) -> None:
pass
@abstractmethod
async def mount(self, widget: Widget, *, slot: str = "main") -> None:
...
class LayoutView(View):
layout: Layout
def __init__(
self,
layout: Layout = None,
name: str = "default",
title: str = "Layout Application",
) -> None:
self.name = name
self.title = title
if layout is None:
layout = Layout()
layout.split_column(
Layout(name="header", size=3, ratio=0),
Layout(name="main", ratio=1),
Layout(name="footer", size=1, ratio=0),
)
layout["main"].split_row(
Layout(name="left", size=30, visible=True),
Layout(name="body", ratio=1),
Layout(name="right", size=30, visible=False),
)
self.layout = layout
self.mouse_over: Optional[MessagePump] = None
super().__init__()
def __rich_repr__(self) -> RichReprResult:
yield "name", self.name
def __rich__(self) -> RenderableType:
return self.layout
def get_widget_at(self, x: int, y: int) -> Tuple[MessagePump, Region]:
for layout, (region, render) in self.layout.map.items():
if region.contains(x, y):
if isinstance(layout.renderable, MessagePump):
return layout.renderable, region
else:
break
raise NoWidget(f"No widget at ${x}, ${y}")
async def on_create(self, event: events.Created) -> None:
await self.mount(Header(self.title))
async def mount(self, widget: Widget, *, slot: str = "main") -> None:
self.layout[slot].update(widget)
await self.app.add(widget)
await widget.post_message(events.Mount(sender=self))
async def on_startup(self, event: events.Startup) -> None:
await self.mount(Header(self.title), slot="header")
async def on_mouse_move(self, event: events.MouseMove) -> None:
try:
widget, region = self.get_widget_at(event.x, event.y)
except NoWidget:
if self.mouse_over is not None:
try:
await self.mouse_over.post_message(events.MouseLeave(self))
finally:
self.mouse_over = None
else:
if self.mouse_over != widget:
try:
if self.mouse_over is not None:
await self.mouse_over.post_message(events.MouseLeave(self))
if widget is not None:
await widget.post_message(
events.MouseEnter(
self, event.x - region.x, event.y - region.x
)
)
finally:
self.mouse_over = widget
await widget.post_message(
events.MouseMove(self, event.x - region.x, event.y - region.y)
)

83
rich/tui/widget.py Normal file
View File

@@ -0,0 +1,83 @@
from logging import getLogger
from typing import ClassVar, NamedTuple, Optional, TYPE_CHECKING
from rich.align import Align
from rich import box
from rich.console import Console, ConsoleOptions, RenderResult, RenderableType
from rich.pretty import Pretty
from rich.panel import Panel
from rich.repr import rich_repr, RichReprResult
from . import events
from ._context import active_app
from .message_pump import MessagePump
if TYPE_CHECKING:
from .app import App
log = getLogger("rich")
class WidgetDimensions(NamedTuple):
width: int
height: int
@rich_repr
class Widget(MessagePump):
_count: ClassVar[int] = 0
can_focus: bool = False
mouse_events: bool = False
idle_events: bool = False
def __init__(self, name: Optional[str] = None) -> None:
self.name = name or f"Widget#{self._count}"
Widget._count += 1
self.size = WidgetDimensions(0, 0)
self.size_changed = False
self.mouse_over = False
super().__init__()
if not self.mouse_events:
self.disable_messages(events.MouseMove)
if not self.idle_events:
self.disable_messages(events.Idle)
@property
def app(self) -> "App":
return active_app.get()
@property
def console(self) -> Console:
return active_app.get().console
async def refresh(self) -> None:
self.app.refresh()
def __rich_repr__(self) -> RichReprResult:
yield "name", self.name
def render(
self, console: Console, options: ConsoleOptions, new_size: WidgetDimensions
) -> RenderableType:
return Panel(
Align.center(Pretty(self), vertical="middle"),
title=self.__class__.__name__,
border_style="green" if self.mouse_over else "blue",
box=box.HEAVY if self.mouse_over else box.ROUNDED,
)
def __rich_console__(
self, console: Console, options: ConsoleOptions
) -> RenderResult:
new_size = WidgetDimensions(options.max_width, options.height or console.height)
renderable = self.render(console, options, new_size)
self.size = new_size
yield renderable
async def on_event(self, event: events.Event, priority: int) -> None:
if isinstance(event, (events.MouseEnter, events.MouseLeave)):
self.mouse_over = isinstance(event, events.MouseEnter)
log.debug("%r", self.mouse_over)
await self.refresh()
await super().on_event(event, priority)

View File

View File

@@ -0,0 +1,53 @@
from datetime import datetime
from rich.console import RenderableType
from rich.panel import Panel
from rich.repr import rich_repr, RichReprResult
from rich.style import StyleType
from rich.table import Table
from rich.text import TextType
from .. import events
from ..widget import Widget
class Header(Widget):
def __init__(
self,
title: TextType,
*,
panel: bool = True,
style: StyleType = "white on blue",
clock: bool = True
) -> None:
self.title = title
self.panel = panel
self.style = style
self.clock = clock
super().__init__()
def __rich_repr__(self) -> RichReprResult:
yield self.title
def get_clock(self) -> str:
return datetime.now().time().strftime("%X")
def __rich__(self) -> RenderableType:
header_table = Table.grid(padding=(0, 1), expand=True)
header_table.style = self.style
header_table.add_column(justify="left", ratio=0)
header_table.add_column("title", justify="center", ratio=1)
if self.clock:
header_table.add_column("clock", justify="right")
header_table.add_row("🐞", self.title, self.get_clock())
else:
header_table.add_row("🐞", self.title)
if self.panel:
header = Panel(header_table, style=self.style)
else:
header = header_table
return header
async def on_mount(self, event: events.Mount) -> None:
pass
# self.set_interval(1.0, callback=self.refresh)

View File

@@ -0,0 +1,9 @@
from ..widget import Widget
from rich.repr import RichReprResult
class Placeholder(Widget):
def __rich_repr__(self) -> RichReprResult:
yield "name", self.name
yield "mouse_over", self.mouse_over

View File

@@ -0,0 +1,14 @@
from typing import Optional
from rich.console import RenderableType
from ..widget import Widget
class Window(Widget):
renderable: Optional[RenderableType]
def __init__(self, renderable: RenderableType):
self.renderable = renderable
def update(self, renderable: RenderableType) -> None:
self.renderable = renderable