From 1b8781f1456a654473b260916d4f5a7da6fed2a0 Mon Sep 17 00:00:00 2001 From: Darren Burns Date: Tue, 14 Jun 2022 11:00:20 +0100 Subject: [PATCH] Ensure we read buffer correctly in XTermParser --- src/textual/_xterm_parser.py | 29 ++++++++++++++++++++++------- tests/test_xterm_parser.py | 9 +++++++++ 2 files changed, 31 insertions(+), 7 deletions(-) diff --git a/src/textual/_xterm_parser.py b/src/textual/_xterm_parser.py index 38a994c14..767e9ebcf 100644 --- a/src/textual/_xterm_parser.py +++ b/src/textual/_xterm_parser.py @@ -94,6 +94,7 @@ class XTermParser(Parser[events.Event]): more_data = self.more_data paste_buffer: list[str] = [] bracketed_paste = False + reuse_escape = False while not self.is_eof: if not bracketed_paste and paste_buffer: @@ -107,7 +108,8 @@ class XTermParser(Parser[events.Event]): on_token(events.Paste(self.sender, text=pasted_text)) paste_buffer.clear() - character = yield read1() + character = ESC if reuse_escape else (yield read1()) + reuse_escape = False if bracketed_paste: paste_buffer.append(character) @@ -137,22 +139,35 @@ class XTermParser(Parser[events.Event]): # If we run into another ESC at this point, then we've failed # to find a match, and should issue everything we've seen within # the suspected sequence as Key events instead. - buffer = yield self.peek_buffer() + sequence_character = yield read1() + next_sequence = sequence + sequence_character if ( - buffer - and buffer[0] == ESC + sequence_character + and sequence_character == ESC or len(sequence) > _MAX_SEQUENCE_SEARCH_THRESHOLD ): - for character in sequence: + if sequence_character == ESC: + # We've hit an escape, so we need to issue all the keys + # up to but not including it, since this escape could be + # part of an upcoming control sequence. + reuse_escape = True + reissue_sequence = sequence + else: + # We exceeded the sequence length threshold, so reissue all the + # characters in that sequence as key-presses. + reissue_sequence = next_sequence + + for character in reissue_sequence: key_events = sequence_to_key_events(character) for event in key_events: if event.key == "escape": event = events.Key(event.sender, key="^") on_token(event) + + # We're done looking for escape sequences break - sequence_character = yield read1() - sequence += sequence_character + sequence = next_sequence self.debug_log(f"sequence={sequence!r}") diff --git a/tests/test_xterm_parser.py b/tests/test_xterm_parser.py index dc201d12b..b32f8147b 100644 --- a/tests/test_xterm_parser.py +++ b/tests/test_xterm_parser.py @@ -15,6 +15,15 @@ from textual.events import ( from textual.messages import TerminalSupportsSynchronizedOutput +def chunks(data, size): + chunk_start = 0 + chunk_end = size + while chunk_end <= len(data): + yield data[chunk_start:chunk_end] + chunk_start = chunk_end + chunk_end += size + + @pytest.fixture def parser(): return XTermParser(sender=mock.sentinel, more_data=lambda: False)