From 7f27e70440c177b2a047b7f74a78ed5cd5b4b596 Mon Sep 17 00:00:00 2001 From: Olivier Philippon Date: Tue, 24 May 2022 13:05:47 +0100 Subject: [PATCH] [terminal buffering] Address PR feedback --- src/textual/_ansi_sequences.py | 16 +------- src/textual/_terminal_features.py | 66 ++++++++----------------------- src/textual/_terminal_modes.py | 51 ++++++++++++++++++++++-- src/textual/_xterm_parser.py | 18 ++++++--- src/textual/app.py | 49 +++++++++++++---------- src/textual/events.py | 34 ---------------- tests/utilities/test_app.py | 4 +- 7 files changed, 107 insertions(+), 131 deletions(-) diff --git a/src/textual/_ansi_sequences.py b/src/textual/_ansi_sequences.py index 86ed71299..b467219b3 100644 --- a/src/textual/_ansi_sequences.py +++ b/src/textual/_ansi_sequences.py @@ -1,10 +1,6 @@ from typing import Dict, Tuple -from ._terminal_modes import ( - get_mode_report_sequence, - Mode, - ModeReportParameter, -) +from ._terminal_modes import Mode from .keys import Keys # Mapping of vt100 escape codes to Keys. @@ -306,14 +302,6 @@ ANSI_SEQUENCES_KEYS: Dict[str, Tuple[Keys, ...]] = { } -# Mapping of escape codes to report whether they support a "mode" we requested. -ANSI_SEQUENCES_MODE_REPORTS: Dict[str, Tuple[Mode, ModeReportParameter]] = { - get_mode_report_sequence(mode, parameter): (mode, parameter) - for mode, parameter in [ - (mode, parameter) for parameter in ModeReportParameter for mode in Mode - ] -} - -TERMINAL_MODES_ANSI_SEQUENCES: Dict[Mode, dict] = { +TERMINAL_MODES_ANSI_SEQUENCES: Dict[Mode, Dict[str, str]] = { Mode.SynchronizedOutput: {"start_sync": "\x1b[?2026h", "end_sync": "\x1b[?2026l"}, } diff --git a/src/textual/_terminal_features.py b/src/textual/_terminal_features.py index 14e4f1be8..4d5e8ddd2 100644 --- a/src/textual/_terminal_features.py +++ b/src/textual/_terminal_features.py @@ -10,65 +10,33 @@ class TerminalSupportedFeatures(NamedTuple): Handles information about the features the current terminal emulator seems to support. """ - mode2026_synchronized_update: bool = False + synchronised_output: bool = False """@link https://gist.github.com/christianparpart/d8a62cc1ab659194337d73e399004036""" - @classmethod - def from_autodetect(cls) -> TerminalSupportedFeatures: - """ - Tries to autodetect various features we can work with the terminal emulator we're running in, - and returns an instance of `TerminalSupportedFeatures` on which matching property will be set to `True` - for features that seem to be usable. - - Returns: - TerminalSupportedFeatures: a new TerminalSupportedFeatures - """ - - # Here we might detect some features later on, by checking the OS type, the env vars, etc. - # (the same way we were doing it to detect iTerm2 "synchronized update" mode) - - # Detecting "mode2026" is complicated, as we have to use an async request/response - # machinery with the terminal emulator - for now we should just assume it's not supported. - # See the use of the Mode and ModeReportParameter classes in the Textual code to check this machinery. - mode2026_synchronized_update = False - - return cls( - mode2026_synchronized_update=mode2026_synchronized_update, - ) - - @property - def supports_synchronized_update(self) -> bool: - """ - Tells the caller if the current terminal emulator seems to support "synchronised updates" - (i.e. "buffered" updates). - At the moment we only support the generic "mode 2026". - - Returns: - bool: whether the terminal seems to support buffered mode or not - """ - return self.mode2026_synchronized_update - - def synchronized_update_sequences(self) -> tuple[str, str]: + def synchronized_output_start_sequence(self) -> str: """ Returns the ANSI sequence that we should send to the terminal to tell it that - it should buffer the content we're about to send, as well as the ANIS sequence to end the buffering. - If the terminal doesn't seem to support synchronised updates both strings will be empty. + it should start buffering the content we're about to send. + If the terminal doesn't seem to support synchronised updates the string will be empty. Returns: - tuple[str, str]: the start and end ANSI sequences, respectively. They will both be empty strings + str: the "synchronised output start" ANSI sequence. It will be ab empty string if the terminal emulator doesn't seem to support the "synchronised updates" mode. """ - return ( - self._synchronized_update_start_sequence(), - self._synchronized_update_end_sequence(), - ) - - def _synchronized_update_start_sequence(self) -> str: - if self.mode2026_synchronized_update: + if self.synchronised_output: return TERMINAL_MODES_ANSI_SEQUENCES[Mode.SynchronizedOutput]["start_sync"] return "" - def _synchronized_update_end_sequence(self) -> str: - if self.mode2026_synchronized_update: + def synchronized_output_end_sequence(self) -> str: + """ + Returns the ANSI sequence that we should send to the terminal to tell it that + it should stop buffering the content we're about to send. + If the terminal doesn't seem to support synchronised updates the string will be empty. + + Returns: + str: the "synchronised output stop" ANSI sequence. It will be ab empty string + if the terminal emulator doesn't seem to support the "synchronised updates" mode. + """ + if self.synchronised_output: return TERMINAL_MODES_ANSI_SEQUENCES[Mode.SynchronizedOutput]["end_sync"] return "" diff --git a/src/textual/_terminal_modes.py b/src/textual/_terminal_modes.py index fa194a29d..601e7543f 100644 --- a/src/textual/_terminal_modes.py +++ b/src/textual/_terminal_modes.py @@ -2,6 +2,12 @@ from __future__ import annotations from enum import Enum, IntEnum, unique +import rich +import rich.repr + +from textual._types import MessageTarget +from textual.message import Message + @unique class Mode(Enum): @@ -23,6 +29,47 @@ class ModeReportParameter(IntEnum): PermanentlyReset = 4 +@rich.repr.auto +class ModeReportResponse(Message): + """Sent when the terminals responses to a "mode" (i.e. a supported feature) request""" + + __slots__ = ["mode", "report_parameter"] + + def __init__( + self, sender: MessageTarget, mode: Mode, report_parameter: ModeReportParameter + ) -> None: + """ + Args: + sender (MessageTarget): The sender of the event + mode (Mode): The mode the terminal emulator is giving us results about + report_parameter (ModeReportParameter): The status of this mode for the terminal emulator + """ + super().__init__(sender) + self.mode = mode + self.report_parameter = report_parameter + + @classmethod + def from_terminal_mode_response( + cls, sender: MessageTarget, mode_id: str, setting_parameter: str + ) -> ModeReportResponse: + mode = Mode(mode_id) + report_parameter = ModeReportParameter(int(setting_parameter)) + return ModeReportResponse(sender, mode, report_parameter) + + def __rich_repr__(self) -> rich.repr.Result: + yield "mode", self.mode + yield "report_parameter", self.report_parameter + + @property + def mode_is_supported(self) -> bool: + """Return True if the mode seems to be supported by the terminal emulator. + + Returns: + bool: True if it's supported. False otherwise. + """ + return self.report_parameter in MODE_REPORTS_PARAMETERS_INDICATING_SUPPORT + + MODE_REPORTS_PARAMETERS_INDICATING_SUPPORT = frozenset( { ModeReportParameter.Set, @@ -35,7 +82,3 @@ MODE_REPORTS_PARAMETERS_INDICATING_SUPPORT = frozenset( def get_mode_request_sequence(mode: Mode) -> str: return "\033[?" + mode.value + "$p\n" - - -def get_mode_report_sequence(mode: Mode, parameter: ModeReportParameter) -> str: - return f"\x1b[?{mode.value};{parameter.value}$y" diff --git a/src/textual/_xterm_parser.py b/src/textual/_xterm_parser.py index 9246d6711..b7742da00 100644 --- a/src/textual/_xterm_parser.py +++ b/src/textual/_xterm_parser.py @@ -7,11 +7,15 @@ from typing import Any, Callable, Generator, Iterable from . import log from . import events +from ._terminal_modes import ModeReportResponse from ._types import MessageTarget from ._parser import Awaitable, Parser, TokenCallback -from ._ansi_sequences import ANSI_SEQUENCES_KEYS, ANSI_SEQUENCES_MODE_REPORTS +from ._ansi_sequences import ANSI_SEQUENCES_KEYS _re_mouse_event = re.compile("^" + re.escape("\x1b[") + r"(\d+);(?P\d)\$y" +) class XTermParser(Parser[events.Event]): @@ -82,7 +86,6 @@ class XTermParser(Parser[events.Event]): ESC = "\x1b" read1 = self.read1 get_key_ansi_sequence = ANSI_SEQUENCES_KEYS.get - get_mode_report_sequence = ANSI_SEQUENCES_MODE_REPORTS.get more_data = self.more_data while not self.is_eof: @@ -123,11 +126,14 @@ class XTermParser(Parser[events.Event]): on_token(event) break # Or a mode report? (i.e. the terminal telling us if it supports a mode we requested) - mode_report_match = get_mode_report_sequence(sequence, None) + mode_report_match = _re_terminal_mode_response.match(sequence) if mode_report_match is not None: - mode_report, parameter = mode_report_match - event = events.ModeReport(self.sender, mode_report, parameter) - on_token(event) + message = ModeReportResponse.from_terminal_mode_response( + self.sender, + mode_report_match["mode_id"], + mode_report_match["setting_parameter"], + ) + on_token(message) break else: keys = get_key_ansi_sequence(character, None) diff --git a/src/textual/app.py b/src/textual/app.py index 53ede369c..64512c621 100644 --- a/src/textual/app.py +++ b/src/textual/app.py @@ -23,8 +23,7 @@ from typing import ( ) from ._terminal_features import TerminalSupportedFeatures -from ._terminal_modes import Mode -from .events import ModeReport +from ._terminal_modes import Mode, ModeReportResponse if sys.version_info >= (3, 8): from typing import Literal @@ -146,7 +145,7 @@ class App(Generic[ReturnType], DOMNode): self.driver_class = driver_class or self.get_driver_class() self._title = title self._screen_stack: list[Screen] = [] - self._terminal_features = TerminalSupportedFeatures.from_autodetect() + self._terminal_features = TerminalSupportedFeatures() self.focused: Widget | None = None self.mouse_over: Widget | None = None @@ -860,18 +859,12 @@ class App(Generic[ReturnType], DOMNode): """ if self._running and not self._closed: console = self.console - ( - sync_update_start, - sync_update_end, - ) = self._terminal_features.synchronized_update_sequences() - if sync_update_start: - console.file.write(sync_update_start) + self._begin_update() try: console.print(renderable) except Exception as error: self.on_exception(error) - if sync_update_end: - console.file.write(sync_update_end) + self._end_update() console.file.flush() def measure(self, renderable: RenderableType, max_width=100_000) -> int: @@ -946,16 +939,6 @@ class App(Generic[ReturnType], DOMNode): # Forward the event to the view await self.screen.forward_event(event) - elif isinstance(event, ModeReport): - if event.mode is Mode.SynchronizedOutput: - is_supported = event.mode_is_supported - log( - f"SynchronizedOutput (aka 'mode2026') {'is' if is_supported else ' is not'} supported" - ) - self._terminal_features = self._terminal_features._replace( - mode2026_synchronized_update=is_supported - ) - else: await super().on_event(event) @@ -1077,6 +1060,30 @@ class App(Generic[ReturnType], DOMNode): async def handle_styles_updated(self, message: messages.StylesUpdated) -> None: self.stylesheet.update(self, animate=True) + def handle_mode_report_response(self, message: ModeReportResponse) -> None: + if message.mode is Mode.SynchronizedOutput: + is_supported = message.mode_is_supported + log( + f"SynchronizedOutput mode {'is' if is_supported else 'is not'} supported by this terminal" + ) + self._terminal_features = self._terminal_features._replace( + synchronised_output=is_supported + ) + + def _begin_update(self) -> None: + synchronized_output_start_sequence = ( + self._terminal_features.synchronized_output_start_sequence() + ) + if synchronized_output_start_sequence: + self.console.file.write(synchronized_output_start_sequence) + + def _end_update(self) -> None: + synchronized_output_end_sequence = ( + self._terminal_features.synchronized_output_end_sequence() + ) + if synchronized_output_end_sequence: + self.console.file.write(synchronized_output_end_sequence) + _uvloop_init_done: bool = False diff --git a/src/textual/events.py b/src/textual/events.py index f3a792a8b..4ad4577cc 100644 --- a/src/textual/events.py +++ b/src/textual/events.py @@ -416,37 +416,3 @@ class DescendantFocus(Event, verbosity=2, bubble=True): class DescendantBlur(Event, verbosity=2, bubble=True): pass - - -@rich.repr.auto -class ModeReport(InputEvent): - """Sent when the terminals responses to a "mode "(i.e. a supported feature) request""" - - __slots__ = ["mode", "report_parameter"] - - def __init__( - self, sender: MessageTarget, mode: Mode, report_parameter: ModeReportParameter - ) -> None: - """ - - Args: - sender (MessageTarget): The sender of the event - mode (Mode): The mode the terminal emulator is giving us results about - report_parameter (ModeReportParameter): The status of this mode for the terminal emulator - """ - super().__init__(sender) - self.mode = mode - self.report_parameter = report_parameter - - def __rich_repr__(self) -> rich.repr.Result: - yield "mode", self.mode - yield "report_parameter", self.report_parameter - - @property - def mode_is_supported(self) -> bool: - """Return True if the mode seems to be supported by the terminal emulator. - - Returns: - bool: True if it's supported. False otherwise. - """ - return self.report_parameter in MODE_REPORTS_PARAMETERS_INDICATING_SUPPORT diff --git a/tests/utilities/test_app.py b/tests/utilities/test_app.py index 9b6eb88e9..d319fb693 100644 --- a/tests/utilities/test_app.py +++ b/tests/utilities/test_app.py @@ -53,9 +53,7 @@ class AppTest(App): # We need this so the `CLEAR_SCREEN_SEQUENCE` is always sent for a screen refresh, # whatever the environment: # (we use it to slice the output into distinct full screens displays) - self._terminal_features = TerminalSupportedFeatures( - mode2026_synchronized_update=True, - ) + self._terminal_features = TerminalSupportedFeatures(synchronised_output=True) self._size = size self._console = ConsoleTest(width=size.width, height=size.height)