fix for escape key processing

This commit is contained in:
Will McGugan
2022-04-26 13:54:09 +01:00
parent 1d6e3fbe10
commit a1cbd0c700
3 changed files with 31 additions and 10 deletions

View File

@@ -47,7 +47,7 @@ class _ReadUntil(Awaitable):
self.max_bytes = max_bytes
class PeekBuffer(Awaitable):
class _PeekBuffer(Awaitable):
__slots__: list[str] = []
@@ -61,7 +61,7 @@ class Parser(Generic[T]):
read = _Read
read1 = _Read1
read_until = _ReadUntil
peek_buffer = PeekBuffer
peek_buffer = _PeekBuffer
def __init__(self) -> None:
self._buffer = io.StringIO()
@@ -103,14 +103,14 @@ class Parser(Generic[T]):
while tokens:
yield popleft()
while pos < data_size or isinstance(self._awaiting, PeekBuffer):
while pos < data_size or isinstance(self._awaiting, _PeekBuffer):
_awaiting = self._awaiting
if isinstance(_awaiting, _Read1):
self._awaiting = self._gen.send(data[pos : pos + 1])
pos += 1
elif isinstance(_awaiting, PeekBuffer):
elif isinstance(_awaiting, _PeekBuffer):
self._awaiting = self._gen.send(data[pos:])
elif isinstance(_awaiting, _Read):

View File

@@ -3,7 +3,7 @@ from __future__ import annotations
import os
import re
from typing import Any, Callable, Generator
from typing import Any, Callable, Generator, Iterable
from . import log
from . import events
@@ -34,6 +34,11 @@ class XTermParser(Parser[events.Event]):
def debug_log(self, *args: Any) -> None:
if self._debug_log_file is not None:
self._debug_log_file.write(" ".join(args) + "\n")
self._debug_log_file.flush()
def feed(self, data: str) -> Iterable[events.Event]:
self.debug_log(f"FEED {data!r}")
return super().feed(data)
def parse_mouse_code(self, code: str, sender: MessageTarget) -> events.Event | None:
sgr_match = self._re_sgr_mouse.match(code)
@@ -83,10 +88,23 @@ class XTermParser(Parser[events.Event]):
while not self.is_eof:
character = yield read1()
self.debug_log(f"character={character!r}")
# The more_data is to allow the parse to distinguish between an escape sequence
# and the escape key pressed
if character == ESC and ((yield self.peek_buffer()) or more_data()):
if character == ESC:
# Could be the escape key was pressed OR the start of an escape sequence
sequence: str = character
peek_buffer = yield self.peek_buffer()
if not peek_buffer:
# An escape arrived without any following characters
on_token(events.Key(self.sender, key=ESC))
continue
if peek_buffer and peek_buffer[0] == ESC:
# There is an escape in the buffer, so ESC ESC has arrived
yield read1()
on_token(events.Key(self.sender, key=ESC))
# If there is no further data, it is not part of a sequence,
# So we don't need to go in to the loop
if len(peek_buffer) == 1 and not more_data():
continue
while True:
sequence += yield read1()
self.debug_log(f"sequence={sequence!r}")

View File

@@ -190,18 +190,21 @@ class LinuxDriver(Driver):
return False
parser = XTermParser(self._target, more_data)
feed = parser.feed
utf8_decoder = getincrementaldecoder("utf-8")().decode
decode = utf8_decoder
read = os.read
EVENT_READ = selectors.EVENT_READ
try:
while not self.exit_event.is_set():
selector_events = selector.select(0.1)
for _selector_key, mask in selector_events:
if mask | selectors.EVENT_READ:
if mask | EVENT_READ:
unicode_data = decode(read(fileno, 1024))
for event in parser.feed(unicode_data):
for event in feed(unicode_data):
self.process_event(event)
except Exception as error:
log(error)