mirror of
https://github.com/Textualize/textual.git
synced 2025-10-17 02:38:12 +03:00
Merge pull request #533 from Textualize/add-terminal-mode2026-support
[terminal buffering] Add support for the "mode 2026" - aka SynchronizedOutput
This commit is contained in:
@@ -39,7 +39,7 @@ class Introduction(Widget):
|
||||
}
|
||||
"""
|
||||
|
||||
def render(self) -> RenderableType:
|
||||
def render(self, styles) -> RenderableType:
|
||||
return Text(
|
||||
"Press '-' and '+' to add or remove placeholders.", justify="center"
|
||||
)
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
from typing import Dict, Tuple
|
||||
from typing import Mapping, Tuple
|
||||
|
||||
from .keys import Keys
|
||||
|
||||
# Mapping of vt100 escape codes to Keys.
|
||||
ANSI_SEQUENCES: Dict[str, Tuple[Keys, ...]] = {
|
||||
ANSI_SEQUENCES_KEYS: Mapping[str, Tuple[Keys, ...]] = {
|
||||
# Control keys.
|
||||
"\r": (Keys.Enter,),
|
||||
"\x00": (Keys.ControlAt,), # Control-At (Also for Ctrl-Space)
|
||||
@@ -299,3 +299,9 @@ ANSI_SEQUENCES: Dict[str, Tuple[Keys, ...]] = {
|
||||
"\x1b[1;8x": (Keys.Escape, Keys.ControlShift8),
|
||||
"\x1b[1;8y": (Keys.Escape, Keys.ControlShift9),
|
||||
}
|
||||
|
||||
|
||||
TERMINAL_MODES_ANSI_SEQUENCES: Mapping[str, str] = {
|
||||
"sync_start": "\x1b[?2026h",
|
||||
"sync_stop": "\x1b[?2026l",
|
||||
}
|
||||
|
||||
@@ -1,18 +1,19 @@
|
||||
from __future__ import annotations
|
||||
|
||||
|
||||
import os
|
||||
import re
|
||||
from typing import Any, Callable, Generator, Iterable
|
||||
|
||||
from . import log
|
||||
from . import log, messages
|
||||
from . import events
|
||||
from ._types import MessageTarget
|
||||
from ._parser import Awaitable, Parser, TokenCallback
|
||||
from ._ansi_sequences import ANSI_SEQUENCES
|
||||
|
||||
from ._ansi_sequences import ANSI_SEQUENCES_KEYS
|
||||
|
||||
_re_mouse_event = re.compile("^" + re.escape("\x1b[") + r"(<?[\d;]+[mM]|M...)\Z")
|
||||
_re_terminal_mode_response = re.compile(
|
||||
"^" + re.escape("\x1b[") + r"\?(?P<mode_id>\d+);(?P<setting_parameter>\d)\$y"
|
||||
)
|
||||
|
||||
|
||||
class XTermParser(Parser[events.Event]):
|
||||
@@ -82,7 +83,7 @@ class XTermParser(Parser[events.Event]):
|
||||
|
||||
ESC = "\x1b"
|
||||
read1 = self.read1
|
||||
get_ansi_sequence = ANSI_SEQUENCES.get
|
||||
get_key_ansi_sequence = ANSI_SEQUENCES_KEYS.get
|
||||
more_data = self.more_data
|
||||
|
||||
while not self.is_eof:
|
||||
@@ -108,21 +109,33 @@ class XTermParser(Parser[events.Event]):
|
||||
while True:
|
||||
sequence += yield read1()
|
||||
self.debug_log(f"sequence={sequence!r}")
|
||||
keys = get_ansi_sequence(sequence, None)
|
||||
# Was it a pressed key event that we received?
|
||||
keys = get_key_ansi_sequence(sequence, None)
|
||||
if keys is not None:
|
||||
for key in keys:
|
||||
on_token(events.Key(self.sender, key=key))
|
||||
break
|
||||
else:
|
||||
mouse_match = _re_mouse_event.match(sequence)
|
||||
if mouse_match is not None:
|
||||
mouse_code = mouse_match.group(0)
|
||||
event = self.parse_mouse_code(mouse_code, self.sender)
|
||||
if event:
|
||||
on_token(event)
|
||||
break
|
||||
# Or a mouse event?
|
||||
mouse_match = _re_mouse_event.match(sequence)
|
||||
if mouse_match is not None:
|
||||
mouse_code = mouse_match.group(0)
|
||||
event = self.parse_mouse_code(mouse_code, self.sender)
|
||||
if event:
|
||||
on_token(event)
|
||||
break
|
||||
# Or a mode report? (i.e. the terminal telling us if it supports a mode we requested)
|
||||
mode_report_match = _re_terminal_mode_response.match(sequence)
|
||||
if mode_report_match is not None:
|
||||
if (
|
||||
mode_report_match["mode_id"] == "2026"
|
||||
and int(mode_report_match["setting_parameter"]) > 0
|
||||
):
|
||||
on_token(
|
||||
messages.TerminalSupportsSynchronizedOutput(self.sender)
|
||||
)
|
||||
break
|
||||
else:
|
||||
keys = get_ansi_sequence(character, None)
|
||||
keys = get_key_ansi_sequence(character, None)
|
||||
if keys is not None:
|
||||
for key in keys:
|
||||
on_token(events.Key(self.sender, key=key))
|
||||
|
||||
@@ -22,6 +22,8 @@ from typing import (
|
||||
TYPE_CHECKING,
|
||||
)
|
||||
|
||||
from ._ansi_sequences import TERMINAL_MODES_ANSI_SEQUENCES
|
||||
|
||||
if sys.version_info >= (3, 8):
|
||||
from typing import Literal
|
||||
else:
|
||||
@@ -44,7 +46,6 @@ from ._animator import Animator
|
||||
from ._callback import invoke
|
||||
from ._context import active_app
|
||||
from ._event_broker import extract_handler_actions, NoHandler
|
||||
from ._timer import Timer
|
||||
from .binding import Bindings, NoBinding
|
||||
from .css.stylesheet import Stylesheet
|
||||
from .design import ColorSystem
|
||||
@@ -150,9 +151,7 @@ class App(Generic[ReturnType], DOMNode):
|
||||
self.driver_class = driver_class or self.get_driver_class()
|
||||
self._title = title
|
||||
self._screen_stack: list[Screen] = []
|
||||
self._sync_available = (
|
||||
os.environ.get("TERM_PROGRAM", "") != "Apple_Terminal" and not WINDOWS
|
||||
)
|
||||
self._sync_available = False
|
||||
|
||||
self.focused: Widget | None = None
|
||||
self.mouse_over: Widget | None = None
|
||||
@@ -893,14 +892,12 @@ class App(Generic[ReturnType], DOMNode):
|
||||
"""
|
||||
if self._running and not self._closed and not self.is_headless:
|
||||
console = self.console
|
||||
if self._sync_available:
|
||||
console.file.write("\x1bP=1s\x1b\\")
|
||||
self._begin_update()
|
||||
try:
|
||||
console.print(renderable)
|
||||
except Exception as error:
|
||||
self.on_exception(error)
|
||||
if self._sync_available:
|
||||
console.file.write("\x1bP=2s\x1b\\")
|
||||
self._end_update()
|
||||
console.file.flush()
|
||||
|
||||
def measure(self, renderable: RenderableType, max_width=100_000) -> int:
|
||||
@@ -974,6 +971,7 @@ class App(Generic[ReturnType], DOMNode):
|
||||
else:
|
||||
# Forward the event to the view
|
||||
await self.screen.forward_event(event)
|
||||
|
||||
else:
|
||||
await super().on_event(event)
|
||||
|
||||
@@ -1095,6 +1093,20 @@ class App(Generic[ReturnType], DOMNode):
|
||||
async def handle_styles_updated(self, message: messages.StylesUpdated) -> None:
|
||||
self.stylesheet.update(self, animate=True)
|
||||
|
||||
def handle_terminal_supports_synchronized_output(
|
||||
self, message: messages.TerminalSupportsSynchronizedOutput
|
||||
) -> None:
|
||||
log("SynchronizedOutput mode is supported by this terminal")
|
||||
self._sync_available = True
|
||||
|
||||
def _begin_update(self) -> None:
|
||||
if self._sync_available:
|
||||
self.console.file.write(TERMINAL_MODES_ANSI_SEQUENCES["sync_start"])
|
||||
|
||||
def _end_update(self) -> None:
|
||||
if self._sync_available:
|
||||
self.console.file.write(TERMINAL_MODES_ANSI_SEQUENCES["sync_stop"])
|
||||
|
||||
|
||||
_uvloop_init_done: bool = False
|
||||
|
||||
|
||||
@@ -2,12 +2,10 @@ from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from time import time
|
||||
import platform
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from . import events
|
||||
from . import log
|
||||
from ._types import MessageTarget
|
||||
|
||||
if TYPE_CHECKING:
|
||||
|
||||
@@ -130,6 +130,11 @@ class LinuxDriver(Driver):
|
||||
self._key_thread = Thread(target=self.run_input_thread, args=(loop,))
|
||||
send_size_event()
|
||||
self._key_thread.start()
|
||||
self._request_terminal_sync_mode_support()
|
||||
|
||||
def _request_terminal_sync_mode_support(self):
|
||||
self.console.file.write("\033[?2026$p")
|
||||
self.console.file.flush()
|
||||
|
||||
@classmethod
|
||||
def _patch_lflag(cls, attrs: int) -> int:
|
||||
@@ -221,7 +226,6 @@ class LinuxDriver(Driver):
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from time import sleep
|
||||
from rich.console import Console
|
||||
from .. import events
|
||||
|
||||
|
||||
@@ -58,3 +58,10 @@ class Prompt(Message, system=True):
|
||||
|
||||
def can_replace(self, message: Message) -> bool:
|
||||
return isinstance(message, StylesUpdated)
|
||||
|
||||
|
||||
class TerminalSupportsSynchronizedOutput(Message):
|
||||
"""
|
||||
Used to make the App aware that the terminal emulator supports synchronised output.
|
||||
@link https://gist.github.com/christianparpart/d8a62cc1ab659194337d73e399004036
|
||||
"""
|
||||
|
||||
@@ -13,16 +13,17 @@ from rich.console import Console
|
||||
|
||||
from textual import events, errors
|
||||
from textual._clock import _Clock
|
||||
from textual.app import App, ComposeResult, WINDOWS
|
||||
from textual.app import WINDOWS
|
||||
from textual._context import active_app
|
||||
from textual._ansi_sequences import TERMINAL_MODES_ANSI_SEQUENCES
|
||||
from textual.app import App, ComposeResult
|
||||
from textual.driver import Driver
|
||||
from textual.geometry import Size, Region
|
||||
|
||||
# N.B. These classes would better be named TestApp/TestConsole/TestDriver/etc,
|
||||
# but it makes pytest emit warning as it will try to collect them as classes containing test cases :-/
|
||||
|
||||
# This value is also hard-coded in Textual's `App` class:
|
||||
CLEAR_SCREEN_SEQUENCE = "\x1bP=1s\x1b\\"
|
||||
_SYNC_START_SEQUENCE = TERMINAL_MODES_ANSI_SEQUENCES["sync_start"]
|
||||
|
||||
|
||||
class AppTest(App):
|
||||
@@ -45,8 +46,9 @@ class AppTest(App):
|
||||
# Let's disable all features by default
|
||||
self.features = frozenset()
|
||||
|
||||
# We need this so the `CLEAR_SCREEN_SEQUENCE` is always sent for a screen refresh,
|
||||
# We need this so the "start buffeting"` is always sent for a screen refresh,
|
||||
# whatever the environment:
|
||||
# (we use it to slice the output into distinct full screens displays)
|
||||
self._sync_available = True
|
||||
|
||||
self._size = size
|
||||
@@ -190,7 +192,7 @@ class AppTest(App):
|
||||
total_capture = self.total_capture
|
||||
if not total_capture:
|
||||
return None
|
||||
screen_captures = total_capture.split(CLEAR_SCREEN_SEQUENCE)
|
||||
screen_captures = total_capture.split(_SYNC_START_SEQUENCE)
|
||||
for single_screen_capture in reversed(screen_captures):
|
||||
if len(single_screen_capture) > 30:
|
||||
# let's return the last occurrence of a screen that seem to be properly "fully-paint"
|
||||
@@ -341,9 +343,8 @@ class ClockMock(_Clock):
|
||||
# ...and let's mark it for removal:
|
||||
activated_events_times_to_clear.append(monotonic_time)
|
||||
|
||||
if activated_events_times_to_clear:
|
||||
for event_time_to_clear in activated_events_times_to_clear:
|
||||
del self._pending_sleep_events[event_time_to_clear]
|
||||
for event_time_to_clear in activated_events_times_to_clear:
|
||||
del self._pending_sleep_events[event_time_to_clear]
|
||||
|
||||
return activated_timers_count
|
||||
|
||||
|
||||
Reference in New Issue
Block a user