Ensure we read buffer correctly in XTermParser

This commit is contained in:
Darren Burns
2022-06-14 11:00:20 +01:00
parent 13925b9bde
commit 1b8781f145
2 changed files with 31 additions and 7 deletions

View File

@@ -94,6 +94,7 @@ class XTermParser(Parser[events.Event]):
more_data = self.more_data
paste_buffer: list[str] = []
bracketed_paste = False
reuse_escape = False
while not self.is_eof:
if not bracketed_paste and paste_buffer:
@@ -107,7 +108,8 @@ class XTermParser(Parser[events.Event]):
on_token(events.Paste(self.sender, text=pasted_text))
paste_buffer.clear()
character = yield read1()
character = ESC if reuse_escape else (yield read1())
reuse_escape = False
if bracketed_paste:
paste_buffer.append(character)
@@ -137,22 +139,35 @@ class XTermParser(Parser[events.Event]):
# If we run into another ESC at this point, then we've failed
# to find a match, and should issue everything we've seen within
# the suspected sequence as Key events instead.
buffer = yield self.peek_buffer()
sequence_character = yield read1()
next_sequence = sequence + sequence_character
if (
buffer
and buffer[0] == ESC
sequence_character
and sequence_character == ESC
or len(sequence) > _MAX_SEQUENCE_SEARCH_THRESHOLD
):
for character in sequence:
if sequence_character == ESC:
# We've hit an escape, so we need to issue all the keys
# up to but not including it, since this escape could be
# part of an upcoming control sequence.
reuse_escape = True
reissue_sequence = sequence
else:
# We exceeded the sequence length threshold, so reissue all the
# characters in that sequence as key-presses.
reissue_sequence = next_sequence
for character in reissue_sequence:
key_events = sequence_to_key_events(character)
for event in key_events:
if event.key == "escape":
event = events.Key(event.sender, key="^")
on_token(event)
# We're done looking for escape sequences
break
sequence_character = yield read1()
sequence += sequence_character
sequence = next_sequence
self.debug_log(f"sequence={sequence!r}")

View File

@@ -15,6 +15,15 @@ from textual.events import (
from textual.messages import TerminalSupportsSynchronizedOutput
def chunks(data, size):
chunk_start = 0
chunk_end = size
while chunk_end <= len(data):
yield data[chunk_start:chunk_end]
chunk_start = chunk_end
chunk_end += size
@pytest.fixture
def parser():
return XTermParser(sender=mock.sentinel, more_data=lambda: False)