mirror of
https://github.com/Textualize/textual.git
synced 2025-10-17 02:38:12 +03:00
Merge pull request #420 from Textualize/esc-fix
fix for escape key processing
This commit is contained in:
@@ -47,7 +47,7 @@ class _ReadUntil(Awaitable):
|
||||
self.max_bytes = max_bytes
|
||||
|
||||
|
||||
class PeekBuffer(Awaitable):
|
||||
class _PeekBuffer(Awaitable):
|
||||
__slots__: list[str] = []
|
||||
|
||||
|
||||
@@ -61,7 +61,7 @@ class Parser(Generic[T]):
|
||||
read = _Read
|
||||
read1 = _Read1
|
||||
read_until = _ReadUntil
|
||||
peek_buffer = PeekBuffer
|
||||
peek_buffer = _PeekBuffer
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._buffer = io.StringIO()
|
||||
@@ -103,14 +103,14 @@ class Parser(Generic[T]):
|
||||
while tokens:
|
||||
yield popleft()
|
||||
|
||||
while pos < data_size or isinstance(self._awaiting, PeekBuffer):
|
||||
while pos < data_size or isinstance(self._awaiting, _PeekBuffer):
|
||||
|
||||
_awaiting = self._awaiting
|
||||
if isinstance(_awaiting, _Read1):
|
||||
self._awaiting = self._gen.send(data[pos : pos + 1])
|
||||
pos += 1
|
||||
|
||||
elif isinstance(_awaiting, PeekBuffer):
|
||||
elif isinstance(_awaiting, _PeekBuffer):
|
||||
self._awaiting = self._gen.send(data[pos:])
|
||||
|
||||
elif isinstance(_awaiting, _Read):
|
||||
|
||||
@@ -3,7 +3,7 @@ from __future__ import annotations
|
||||
|
||||
import os
|
||||
import re
|
||||
from typing import Any, Callable, Generator
|
||||
from typing import Any, Callable, Generator, Iterable
|
||||
|
||||
from . import log
|
||||
from . import events
|
||||
@@ -34,6 +34,11 @@ class XTermParser(Parser[events.Event]):
|
||||
def debug_log(self, *args: Any) -> None:
|
||||
if self._debug_log_file is not None:
|
||||
self._debug_log_file.write(" ".join(args) + "\n")
|
||||
self._debug_log_file.flush()
|
||||
|
||||
def feed(self, data: str) -> Iterable[events.Event]:
|
||||
self.debug_log(f"FEED {data!r}")
|
||||
return super().feed(data)
|
||||
|
||||
def parse_mouse_code(self, code: str, sender: MessageTarget) -> events.Event | None:
|
||||
sgr_match = self._re_sgr_mouse.match(code)
|
||||
@@ -83,10 +88,23 @@ class XTermParser(Parser[events.Event]):
|
||||
while not self.is_eof:
|
||||
character = yield read1()
|
||||
self.debug_log(f"character={character!r}")
|
||||
# The more_data is to allow the parse to distinguish between an escape sequence
|
||||
# and the escape key pressed
|
||||
if character == ESC and ((yield self.peek_buffer()) or more_data()):
|
||||
if character == ESC:
|
||||
# Could be the escape key was pressed OR the start of an escape sequence
|
||||
sequence: str = character
|
||||
peek_buffer = yield self.peek_buffer()
|
||||
if not peek_buffer:
|
||||
# An escape arrived without any following characters
|
||||
on_token(events.Key(self.sender, key=ESC))
|
||||
continue
|
||||
if peek_buffer and peek_buffer[0] == ESC:
|
||||
# There is an escape in the buffer, so ESC ESC has arrived
|
||||
yield read1()
|
||||
on_token(events.Key(self.sender, key=ESC))
|
||||
# If there is no further data, it is not part of a sequence,
|
||||
# So we don't need to go in to the loop
|
||||
if len(peek_buffer) == 1 and not more_data():
|
||||
continue
|
||||
|
||||
while True:
|
||||
sequence += yield read1()
|
||||
self.debug_log(f"sequence={sequence!r}")
|
||||
|
||||
@@ -190,18 +190,21 @@ class LinuxDriver(Driver):
|
||||
return False
|
||||
|
||||
parser = XTermParser(self._target, more_data)
|
||||
feed = parser.feed
|
||||
|
||||
utf8_decoder = getincrementaldecoder("utf-8")().decode
|
||||
decode = utf8_decoder
|
||||
read = os.read
|
||||
|
||||
EVENT_READ = selectors.EVENT_READ
|
||||
|
||||
try:
|
||||
while not self.exit_event.is_set():
|
||||
selector_events = selector.select(0.1)
|
||||
for _selector_key, mask in selector_events:
|
||||
if mask | selectors.EVENT_READ:
|
||||
if mask | EVENT_READ:
|
||||
unicode_data = decode(read(fileno, 1024))
|
||||
for event in parser.feed(unicode_data):
|
||||
for event in feed(unicode_data):
|
||||
self.process_event(event)
|
||||
except Exception as error:
|
||||
log(error)
|
||||
|
||||
@@ -285,5 +285,6 @@ class EventMonitor(threading.Thread):
|
||||
|
||||
def on_size_change(self, width: int, height: int) -> None:
|
||||
"""Called when terminal size changes."""
|
||||
event = Resize(self.target, Size(width, height))
|
||||
size = Size(width, height)
|
||||
event = Resize(self.target, size, size)
|
||||
run_coroutine_threadsafe(self.target.post_message(event), loop=self.loop)
|
||||
|
||||
55
tests/test_parser.py
Normal file
55
tests/test_parser.py
Normal file
@@ -0,0 +1,55 @@
|
||||
from textual._parser import Parser
|
||||
|
||||
|
||||
def test_read1():
|
||||
class TestParser(Parser[str]):
|
||||
"""A simple parser that reads a byte at a time from a stream."""
|
||||
|
||||
def parse(self, on_token):
|
||||
while True:
|
||||
data = yield self.read1()
|
||||
if not data:
|
||||
break
|
||||
on_token(data)
|
||||
|
||||
test_parser = TestParser()
|
||||
|
||||
test_data = "Where there is a Will there is a way!"
|
||||
|
||||
for size in range(1, len(test_data) + 1):
|
||||
# Feed the parser in pieces, first 1 character at a time, then 2, etc
|
||||
test_parser = TestParser()
|
||||
data = []
|
||||
for offset in range(0, len(test_data), size):
|
||||
for chunk in test_parser.feed(test_data[offset : offset + size]):
|
||||
data.append(chunk)
|
||||
# Check we have received all the data in characters, no matter the fee dsize
|
||||
assert len(data) == len(test_data)
|
||||
assert "".join(data) == test_data
|
||||
|
||||
|
||||
def test_read():
|
||||
class TestParser(Parser[str]):
|
||||
"""A parser that reads chunks of a given size from the stream."""
|
||||
|
||||
def __init__(self, size):
|
||||
self.size = size
|
||||
super().__init__()
|
||||
|
||||
def parse(self, on_token):
|
||||
while True:
|
||||
data = yield self.read1()
|
||||
if not data:
|
||||
break
|
||||
on_token(data)
|
||||
|
||||
test_data = "Where there is a Will there is a way!"
|
||||
|
||||
for read_size in range(1, len(test_data) + 1):
|
||||
for size in range(1, len(test_data) + 1):
|
||||
test_parser = TestParser(read_size)
|
||||
data = []
|
||||
for offset in range(0, len(test_data), size):
|
||||
for chunk in test_parser.feed(test_data[offset : offset + size]):
|
||||
data.append(chunk)
|
||||
assert "".join(data) == test_data
|
||||
Reference in New Issue
Block a user