[terminal buffering] Add support for the "mode 2026"

That task is definitely way more complicated that it seemed to be 😅
This commit is contained in:
Olivier Philippon
2022-05-20 12:51:40 +01:00
parent c212fd501f
commit d14659c1a3
10 changed files with 239 additions and 29 deletions

View File

@@ -39,7 +39,7 @@ class Introduction(Widget):
}
"""
def render(self) -> RenderableType:
def render(self, styles) -> RenderableType:
return Text(
"Press '-' and '+' to add or remove placeholders.", justify="center"
)

View File

@@ -1,9 +1,14 @@
from typing import Dict, Tuple
from ._terminal_modes import (
get__mode_report_sequence,
Mode,
ModeReportParameter,
)
from .keys import Keys
# Mapping of vt100 escape codes to Keys.
ANSI_SEQUENCES: Dict[str, Tuple[Keys, ...]] = {
ANSI_SEQUENCES_KEYS: Dict[str, Tuple[Keys, ...]] = {
# Control keys.
"\r": (Keys.Enter,),
"\x00": (Keys.ControlAt,), # Control-At (Also for Ctrl-Space)
@@ -299,3 +304,12 @@ ANSI_SEQUENCES: Dict[str, Tuple[Keys, ...]] = {
"\x1b[1;8x": (Keys.Escape, Keys.ControlShift8),
"\x1b[1;8y": (Keys.Escape, Keys.ControlShift9),
}
# Mapping of escape codes to report whether they support a "mode" we requested.
ANSI_SEQUENCES_MODE_REPORTS: Dict[str, Tuple[Mode, ModeReportParameter]] = {
get__mode_report_sequence(mode, parameter): (mode, parameter)
for mode, parameter in [
(mode, parameter) for parameter in ModeReportParameter for mode in Mode
]
}

View File

@@ -0,0 +1,84 @@
from __future__ import annotations
import os
import platform
from dataclasses import dataclass
@dataclass
class TerminalSupportedFeatures:
"""
Handles information about the features the current terminal emulator seems to support.
"""
iterm2_synchronized_update: bool = False
"""@link https://gitlab.com/gnachman/iterm2/-/wikis/synchronized-updates-spec"""
mode2026_synchronized_update: bool = False
"""@link https://gist.github.com/christianparpart/d8a62cc1ab659194337d73e399004036"""
@classmethod
def from_autodetect(cls) -> TerminalSupportedFeatures:
"""
Tries to autodetect various features we can work with the terminal emulator we're running in,
and returns an instance of `TerminalSupportedFeatures` on which matching property will be set to `True`
for features that seem to be usable.
Returns:
TerminalSupportedFeatures: a new TerminalSupportedFeatures
"""
# Using macOS, but not using the default terminal: let's assume we're on iTerm2
iterm2_synchronized_update = (
platform.system() == "Darwin"
and os.environ.get("TERM_PROGRAM", "") != "Apple_Terminal"
)
# Detecting "mode2026" is more complicated, as we have to use an async request/response
# machinery with the terminal emulator - for now we should just assume it's not supported.
# See the use of the Mode and ModeReportParameter classes in the Textual code to check this machinery.
mode2026_synchronized_update = False
return cls(
iterm2_synchronized_update=iterm2_synchronized_update,
mode2026_synchronized_update=mode2026_synchronized_update,
)
@property
def supports_synchronized_update(self) -> bool:
"""
Tells the caller if the current terminal emulator seems to support "synchronised updates"
(i.e. "buffered" updates).
At the moment we support the iTerm2 specific one, as wel las the more generic "mode 2026".
Returns:
bool: whether the terminal seems to support buffered mode or not
"""
return self.iterm2_synchronized_update or self.mode2026_synchronized_update
def synchronized_update_sequences(self) -> tuple[str, str]:
"""
Returns the ANSI sequence that we should send to the terminal to tell it that
it should buffer the content we're about to send, as well as the ANIS sequence to end the buffering.
If the terminal doesn't seem to support synchronised updates both strings will be empty.
Returns:
tuple[str, str]: the start and end ANSI sequences, respectively. They will both be empty strings
if the terminal emulator doesn't seem to support the "synchronised updates" mode.
"""
return (
self._synchronized_update_start_sequence(),
self._synchronized_update_end_sequence(),
)
def _synchronized_update_start_sequence(self) -> str:
if self.iterm2_synchronized_update:
return "\x1bP=1s\x1b\\"
if self.mode2026_synchronized_update:
return "\x1b[?2026h"
return ""
def _synchronized_update_end_sequence(self) -> str:
if self.iterm2_synchronized_update:
return "\x1bP=2s\x1b\\"
if self.mode2026_synchronized_update:
return "\x1b[?2026l"
return ""

View File

@@ -0,0 +1,41 @@
from __future__ import annotations
from enum import Enum, IntEnum, unique
@unique
class Mode(Enum):
# "Modes" are features that terminal emulators can optionally support
# We have to "request" to the emulator if they support a given feature, and they can respond with a "report"
# @link https://vt100.net/docs/vt510-rm/DECRQM.html
# @link https://vt100.net/docs/vt510-rm/DECRPM.html
SynchronizedOutput = "2026"
@unique
class ModeReportParameter(IntEnum):
# N.B. The values of this enum are not arbitrary: they match the ones of the spec
# @link https://vt100.net/docs/vt510-rm/DECRPM.html
NotRecognized = 0
Set = 1
Reset = 2
PermanentlySet = 3
PermanentlyReset = 4
MODE_REPORTS_PARAMETERS_INDICATING_SUPPORT = frozenset(
{
ModeReportParameter.Set,
ModeReportParameter.Reset,
ModeReportParameter.PermanentlySet,
ModeReportParameter.PermanentlyReset,
}
)
def get_mode_request_sequence(mode: Mode) -> str:
return "\033[?" + mode.value + "$p\n"
def get__mode_report_sequence(mode: Mode, parameter: ModeReportParameter) -> str:
return f"\x1b[?{mode.value};{parameter.value}$y"

View File

@@ -9,8 +9,7 @@ from . import log
from . import events
from ._types import MessageTarget
from ._parser import Awaitable, Parser, TokenCallback
from ._ansi_sequences import ANSI_SEQUENCES
from ._ansi_sequences import ANSI_SEQUENCES_KEYS, ANSI_SEQUENCES_MODE_REPORTS
_re_mouse_event = re.compile("^" + re.escape("\x1b[") + r"(<?[\d;]+[mM]|M...)\Z")
@@ -82,7 +81,8 @@ class XTermParser(Parser[events.Event]):
ESC = "\x1b"
read1 = self.read1
get_ansi_sequence = ANSI_SEQUENCES.get
get_key_ansi_sequence = ANSI_SEQUENCES_KEYS.get
get_mode_report_sequence = ANSI_SEQUENCES_MODE_REPORTS.get
more_data = self.more_data
while not self.is_eof:
@@ -108,21 +108,29 @@ class XTermParser(Parser[events.Event]):
while True:
sequence += yield read1()
self.debug_log(f"sequence={sequence!r}")
keys = get_ansi_sequence(sequence, None)
# Was it a pressed key event that we received?
keys = get_key_ansi_sequence(sequence, None)
if keys is not None:
for key in keys:
on_token(events.Key(self.sender, key=key))
break
else:
mouse_match = _re_mouse_event.match(sequence)
if mouse_match is not None:
mouse_code = mouse_match.group(0)
event = self.parse_mouse_code(mouse_code, self.sender)
if event:
on_token(event)
break
# Or a mouse event?
mouse_match = _re_mouse_event.match(sequence)
if mouse_match is not None:
mouse_code = mouse_match.group(0)
event = self.parse_mouse_code(mouse_code, self.sender)
if event:
on_token(event)
break
# Or a mode report? (i.e. the terminal telling us if it supports a mode we requested)
mode_report_match = get_mode_report_sequence(sequence, None)
if mode_report_match is not None:
mode_report, parameter = mode_report_match
event = events.ModeReport(self.sender, mode_report, parameter)
on_token(event)
break
else:
keys = get_ansi_sequence(character, None)
keys = get_key_ansi_sequence(character, None)
if keys is not None:
for key in keys:
on_token(events.Key(self.sender, key=key))

View File

@@ -22,6 +22,10 @@ from typing import (
TYPE_CHECKING,
)
from ._terminal_features import TerminalSupportedFeatures
from ._terminal_modes import Mode
from .events import ModeReport
if sys.version_info >= (3, 8):
from typing import Literal
else:
@@ -44,7 +48,6 @@ from ._animator import Animator
from ._callback import invoke
from ._context import active_app
from ._event_broker import extract_handler_actions, NoHandler
from ._timer import Timer
from .binding import Bindings, NoBinding
from .css.stylesheet import Stylesheet
from .design import ColorSystem
@@ -143,9 +146,7 @@ class App(Generic[ReturnType], DOMNode):
self.driver_class = driver_class or self.get_driver_class()
self._title = title
self._screen_stack: list[Screen] = []
self._sync_available = (
os.environ.get("TERM_PROGRAM", "") != "Apple_Terminal" and not WINDOWS
)
self._terminal_features = TerminalSupportedFeatures.from_autodetect()
self.focused: Widget | None = None
self.mouse_over: Widget | None = None
@@ -698,6 +699,7 @@ class App(Generic[ReturnType], DOMNode):
self.log("---")
self.log(driver=self.driver_class)
self.log(loop=asyncio.get_running_loop())
self.log(terminal_features=self._terminal_features)
self.log(features=self.features)
try:
@@ -858,14 +860,18 @@ class App(Generic[ReturnType], DOMNode):
"""
if self._running and not self._closed:
console = self.console
if self._sync_available:
console.file.write("\x1bP=1s\x1b\\")
(
sync_update_start,
sync_update_end,
) = self._terminal_features.synchronized_update_sequences()
if sync_update_start:
console.file.write(sync_update_start)
try:
console.print(renderable)
except Exception as error:
self.on_exception(error)
if self._sync_available:
console.file.write("\x1bP=2s\x1b\\")
if sync_update_end:
console.file.write(sync_update_end)
console.file.flush()
def measure(self, renderable: RenderableType, max_width=100_000) -> int:
@@ -939,6 +945,15 @@ class App(Generic[ReturnType], DOMNode):
else:
# Forward the event to the view
await self.screen.forward_event(event)
elif isinstance(event, ModeReport):
if event.mode is Mode.SynchronizedOutput:
is_supported = event.mode_is_supported
log(
f"SynchronizedOutput (aka 'mode2026') {'is' if is_supported else ' is not'} supported"
)
self._terminal_features.mode2026_synchronized_update = is_supported
else:
await super().on_event(event)

View File

@@ -2,12 +2,10 @@ from __future__ import annotations
import asyncio
from time import time
import platform
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING
from . import events
from . import log
from ._types import MessageTarget
if TYPE_CHECKING:

View File

@@ -21,6 +21,7 @@ from ..driver import Driver
from ..geometry import Size
from .._types import MessageTarget
from .._xterm_parser import XTermParser
from .._terminal_modes import get_mode_request_sequence, Mode
from .._profile import timer
@@ -123,6 +124,11 @@ class LinuxDriver(Driver):
self._key_thread = Thread(target=self.run_input_thread, args=(loop,))
send_size_event()
self._key_thread.start()
self._request_terminal_mode_support(Mode.SynchronizedOutput)
def _request_terminal_mode_support(self, mode: Mode):
self.console.file.write(get_mode_request_sequence(mode) + "\n")
self.console.file.flush()
@classmethod
def _patch_lflag(cls, attrs: int) -> int:
@@ -214,7 +220,6 @@ class LinuxDriver(Driver):
if __name__ == "__main__":
from time import sleep
from rich.console import Console
from .. import events

View File

@@ -6,6 +6,11 @@ import rich.repr
from rich.style import Style
from . import log
from ._terminal_modes import (
Mode,
ModeReportParameter,
MODE_REPORTS_PARAMETERS_INDICATING_SUPPORT,
)
from .geometry import Offset, Size
from .message import Message
from ._types import MessageTarget
@@ -411,3 +416,37 @@ class DescendantFocus(Event, verbosity=2, bubble=True):
class DescendantBlur(Event, verbosity=2, bubble=True):
pass
@rich.repr.auto
class ModeReport(InputEvent):
"""Sent when the terminals responses to a "mode "(i.e. a supported feature) request"""
__slots__ = ["mode", "report_parameter"]
def __init__(
self, sender: MessageTarget, mode: Mode, report_parameter: ModeReportParameter
) -> None:
"""
Args:
sender (MessageTarget): The sender of the event
mode (Mode): The mode the terminal emulator is giving us results about
report_parameter (ModeReportParameter): The status of this mode for the terminal emulator
"""
super().__init__(sender)
self.mode = mode
self.report_parameter = report_parameter
def __rich_repr__(self) -> rich.repr.Result:
yield "mode", self.mode
yield "report_parameter", self.report_parameter
@property
def mode_is_supported(self) -> bool:
"""Return True if the mode seems to be supported by the terminal emulator.
Returns:
bool: True if it's supported. False otherwise.
"""
return self.report_parameter in MODE_REPORTS_PARAMETERS_INDICATING_SUPPORT

View File

@@ -13,8 +13,10 @@ from rich.console import Console
from textual import events, errors
from textual._clock import _Clock
from textual.app import App, ComposeResult, WINDOWS
from textual.app import WINDOWS
from textual._context import active_app
from textual._terminal_features import TerminalSupportedFeatures
from textual.app import App, ComposeResult
from textual.driver import Driver
from textual.geometry import Size, Region
@@ -45,9 +47,13 @@ class AppTest(App):
# Let's disable all features by default
self.features = frozenset()
# We need this so the `CLEAR_SCREEN_SEQUENCE` is always sent for a screen refresh,
# We need this so the iTerm2 `CLEAR_SCREEN_SEQUENCE` is always sent for a screen refresh,
# whatever the environment:
self._sync_available = True
# (we use it to slice the output into distinct full screens displays)
self._terminal_features = TerminalSupportedFeatures(
iterm2_synchronized_update=True,
mode2026_synchronized_update=False,
)
self._size = size
self._console = ConsoleTest(width=size.width, height=size.height)