From 988838a872d2c7af6a1113546ace4f15b74a3254 Mon Sep 17 00:00:00 2001 From: Will McGugan Date: Thu, 27 Jan 2022 16:54:10 +0000 Subject: [PATCH] working windows driver --- examples/animation.py | 4 +- src/textual/drivers/linux_driver.py | 22 +- src/textual/drivers/win32.py | 483 +++++++++++--------------- src/textual/drivers/windows_driver.py | 54 ++- 4 files changed, 254 insertions(+), 309 deletions(-) diff --git a/examples/animation.py b/examples/animation.py index b6b58e1be..2578834c2 100644 --- a/examples/animation.py +++ b/examples/animation.py @@ -32,5 +32,7 @@ class SmoothApp(App): self.bar.layout_offset_x = -40 + self.set_timer(10, lambda: self.action("quit")) -SmoothApp.run(log="textual.log", log_verbosity=3) + +SmoothApp.run(log="textual.log", log_verbosity=2) diff --git a/src/textual/drivers/linux_driver.py b/src/textual/drivers/linux_driver.py index 057688988..45db1925a 100644 --- a/src/textual/drivers/linux_driver.py +++ b/src/textual/drivers/linux_driver.py @@ -157,21 +157,17 @@ class LinuxDriver(Driver): pass def stop_application_mode(self) -> None: + self.disable_input() - with timer("disable_input"): - self.disable_input() + if self.attrs_before is not None: + try: + termios.tcsetattr(self.fileno, termios.TCSANOW, self.attrs_before) + except termios.error: + pass - with timer("tcsetattr"): - if self.attrs_before is not None: - try: - termios.tcsetattr(self.fileno, termios.TCSANOW, self.attrs_before) - except termios.error: - pass - - with timer("set_alt_screen False, show cursor"): - with self.console: - self.console.set_alt_screen(False) - self.console.show_cursor(True) + with self.console: + self.console.set_alt_screen(False) + self.console.show_cursor(True) def run_input_thread(self, loop) -> None: try: diff --git a/src/textual/drivers/win32.py b/src/textual/drivers/win32.py index a73a05463..3455b31bb 100644 --- a/src/textual/drivers/win32.py +++ b/src/textual/drivers/win32.py @@ -1,27 +1,23 @@ -# -*- coding: utf-8 -*- -# Copyright 2019 - 2021 Avram Lubkin, All Rights Reserved +from asyncio import AbstractEventLoop, run_coroutine_threadsafe +from codecs import getincrementaldecoder -# This Source Code Form is subject to the terms of the Mozilla Public -# License, v. 2.0. If a copy of the MPL was not distributed with this -# file, You can obtain one at http://mozilla.org/MPL/2.0/. - -""" -Support functions and wrappers for calls to the Windows API -""" - -import atexit -import codecs -from collections import namedtuple import ctypes -from ctypes import wintypes -import io -import msvcrt # pylint: disable=import-error +from ctypes import byref, Structure, Union, wintypes +from ctypes.wintypes import CHAR, HANDLE, WCHAR, BOOL, WORD, DWORD, SHORT, UINT +import msvcrt import os -import platform import sys +import threading -LPDWORD = ctypes.POINTER(wintypes.DWORD) -COORD = wintypes._COORD # pylint: disable=protected-access +from tkinter.tix import WINDOW +from typing import IO, Callable, List, Optional + +from ..geometry import Size +from ..events import Event, Key, Resize +from .._types import EventTarget +from .._xterm_parser import XTermParser + +KERNEL32 = ctypes.WinDLL("kernel32", use_last_error=True) # Console input modes ENABLE_ECHO_INPUT = 0x0004 @@ -41,315 +37,248 @@ ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004 DISABLE_NEWLINE_AUTO_RETURN = 0x0008 ENABLE_LVB_GRID_WORLDWIDE = 0x0010 -if tuple(int(num) for num in platform.version().split(".")) >= ( - 10, - 0, - 10586, -): - VTMODE_SUPPORTED = True - CBREAK_MODE = ENABLE_PROCESSED_INPUT | ENABLE_VIRTUAL_TERMINAL_INPUT - RAW_MODE = ENABLE_VIRTUAL_TERMINAL_INPUT -else: - VTMODE_SUPPORTED = False - CBREAK_MODE = ENABLE_PROCESSED_INPUT - RAW_MODE = 0 +STD_INPUT_HANDLE = -10 +STD_OUTPUT_HANDLE = -11 -GTS_SUPPORTED = hasattr(os, "get_terminal_size") -TerminalSize = namedtuple("TerminalSize", ("columns", "lines")) +WAIT_TIMEOUT = 0x00000102 + +GetStdHandle = KERNEL32.GetStdHandle +GetStdHandle.argtypes = [wintypes.DWORD] +GetStdHandle.restype = wintypes.HANDLE -class ConsoleScreenBufferInfo( - ctypes.Structure -): # pylint: disable=too-few-public-methods - """ - Python representation of CONSOLE_SCREEN_BUFFER_INFO structure - https://docs.microsoft.com/en-us/windows/console/console-screen-buffer-info-str - """ +class COORD(Structure): + """https://docs.microsoft.com/en-us/windows/console/coord-str""" _fields_ = [ - ("dwSize", COORD), - ("dwCursorPosition", COORD), - ("wAttributes", wintypes.WORD), - ("srWindow", wintypes.SMALL_RECT), - ("dwMaximumWindowSize", COORD), + ("X", SHORT), + ("Y", SHORT), ] -CSBIP = ctypes.POINTER(ConsoleScreenBufferInfo) +class uChar(Union): + """https://docs.microsoft.com/en-us/windows/console/key-event-record-str""" + + _fields_ = [ + ("AsciiChar", CHAR), + ("UnicodeChar", WCHAR), + ] -def _check_bool(result, func, args): # pylint: disable=unused-argument - """ - Used as an error handler for Windows calls - Gets last error if call is not successful - """ +class KEY_EVENT_RECORD(Structure): + """https://docs.microsoft.com/en-us/windows/console/key-event-record-str""" - if not result: - raise ctypes.WinError(ctypes.get_last_error()) - return args + _fields_ = [ + ("bKeyDown", BOOL), + ("wRepeatCount", WORD), + ("wVirtualKeyCode", WORD), + ("wVirtualScanCode", WORD), + ("uChar", uChar), + ("dwControlKeyState", DWORD), + ] -KERNEL32 = ctypes.WinDLL("kernel32", use_last_error=True) +class MOUSE_EVENT_RECORD(Structure): + """https://docs.microsoft.com/en-us/windows/console/mouse-event-record-str""" -KERNEL32.GetConsoleCP.errcheck = _check_bool -KERNEL32.GetConsoleCP.argtypes = tuple() - -KERNEL32.GetConsoleMode.errcheck = _check_bool -KERNEL32.GetConsoleMode.argtypes = (wintypes.HANDLE, LPDWORD) - -KERNEL32.SetConsoleMode.errcheck = _check_bool -KERNEL32.SetConsoleMode.argtypes = (wintypes.HANDLE, wintypes.DWORD) - -KERNEL32.GetConsoleScreenBufferInfo.errcheck = _check_bool -KERNEL32.GetConsoleScreenBufferInfo.argtypes = (wintypes.HANDLE, CSBIP) + _fields_ = [ + ("dwMousePosition", COORD), + ("dwButtonState", DWORD), + ("dwControlKeyState", DWORD), + ("dwEventFlags", DWORD), + ] -def get_csbi(filehandle=None): - """ +class WINDOW_BUFFER_SIZE_RECORD(Structure): + """https://docs.microsoft.com/en-us/windows/console/window-buffer-size-record-str""" + + _fields_ = [("dwSize", COORD)] + + +class MENU_EVENT_RECORD(Structure): + """https://docs.microsoft.com/en-us/windows/console/menu-event-record-str""" + + _fields_ = [("dwCommandId", UINT)] + + +class FOCUS_EVENT_RECORD(Structure): + """https://docs.microsoft.com/en-us/windows/console/focus-event-record-str""" + + _fields_ = [("bSetFocus", BOOL)] + + +class InputEvent(Union): + """https://docs.microsoft.com/en-us/windows/console/input-record-str""" + + _fields_ = [ + ("KeyEvent", KEY_EVENT_RECORD), + ("MouseEvent", MOUSE_EVENT_RECORD), + ("WindowBufferSizeEvent", WINDOW_BUFFER_SIZE_RECORD), + ("MenuEvent", MENU_EVENT_RECORD), + ("FocusEvent", FOCUS_EVENT_RECORD), + ] + + +class INPUT_RECORD(Structure): + """https://docs.microsoft.com/en-us/windows/console/input-record-str""" + + _fields_ = [("EventType", wintypes.WORD), ("Event", InputEvent)] + + +def _set_console_mode(file: IO, mode: int) -> bool: + """Set the console mode for a given file (stdout or stdin). + Args: - filehandle(int): Windows filehandle object as returned by :py:func:`msvcrt.get_osfhandle` + file (IO): A file like object. + mode (int): New mode. Returns: - :py:class:`ConsoleScreenBufferInfo`: CONSOLE_SCREEN_BUFFER_INFO_ structure - - Wrapper for GetConsoleScreenBufferInfo_ - - If ``filehandle`` is :py:data:`None`, uses the filehandle of :py:data:`sys.__stdout__`. - + bool: True on success, otherwise False. """ - - if filehandle is None: - filehandle = msvcrt.get_osfhandle(sys.__stdout__.fileno()) - - csbi = ConsoleScreenBufferInfo() - KERNEL32.GetConsoleScreenBufferInfo(filehandle, ctypes.byref(csbi)) - return csbi + windows_filehandle = msvcrt.get_osfhandle(file.fileno()) + success = KERNEL32.SetConsoleMode(windows_filehandle, mode) + return success -def get_console_input_encoding(): - """ - Returns: - int: Current console mode +def _get_console_mode(file: IO) -> int: + """Get the console mode for a given file (stdout or stdin) - Raises: - OSError: Error calling Windows API - - Query for the console input code page and provide an encoding - - If the code page can not be resolved to a Python encoding, :py:data:`None` is returned. - """ - - encoding = "cp%d" % KERNEL32.GetConsoleCP() - - try: - codecs.lookup(encoding) - except LookupError: - return None - - return encoding - - -def get_console_mode(filehandle): - """ Args: - filehandle(int): Windows filehandle object as returned by :py:func:`msvcrt.get_osfhandle` + file (IO): A file-like object. Returns: - int: Current console mode - - Raises: - OSError: Error calling Windows API - - Wrapper for GetConsoleMode_ + int: The current console mode. """ - + windows_filehandle = msvcrt.get_osfhandle(file.fileno()) mode = wintypes.DWORD() - KERNEL32.GetConsoleMode(filehandle, ctypes.byref(mode)) + KERNEL32.GetConsoleMode(windows_filehandle, ctypes.byref(mode)) return mode.value -def set_console_mode(filehandle, mode): - """ - Args: - filehandle(int): Windows filehandle object as returned by :py:func:`msvcrt.get_osfhandle` - mode(int): Desired console mode - - Raises: - OSError: Error calling Windows API - - Wrapper for SetConsoleMode_ - """ - - return bool(KERNEL32.SetConsoleMode(filehandle, mode)) - - -def setcbreak(filehandle): - """ - Args: - filehandle(int): Windows filehandle object as returned by :py:func:`msvcrt.get_osfhandle` - - Raises: - OSError: Error calling Windows API - - Convenience function which mimics :py:func:`tty.setcbreak` behavior - - All console input options are disabled except ``ENABLE_PROCESSED_INPUT`` - and, if supported, ``ENABLE_VIRTUAL_TERMINAL_INPUT`` - """ - - set_console_mode(filehandle, CBREAK_MODE) - - -def setraw(filehandle): - """ - Args: - filehandle(int): Windows filehandle object as returned by :py:func:`msvcrt.get_osfhandle` - - Raises: - OSError: Error calling Windows API - - Convenience function which mimics :py:func:`tty.setraw` behavior - - All console input options are disabled except, if supported, ``ENABLE_VIRTUAL_TERMINAL_INPUT`` - """ - - set_console_mode(filehandle, RAW_MODE) - - -def enable_vt_mode(filehandle=None): - """ - Args: - filehandle(int): Windows filehandle object as returned by :py:func:`msvcrt.get_osfhandle` - - Raises: - OSError: Error calling Windows API - - Enables virtual terminal processing mode for the given console - - If ``filehandle`` is :py:data:`None`, uses the filehandle of :py:data:`sys.__stdout__`. - """ - - if filehandle is None: - filehandle = msvcrt.get_osfhandle(sys.__stdout__.fileno()) - - mode = get_console_mode(filehandle) - mode |= ENABLE_VIRTUAL_TERMINAL_PROCESSING - set_console_mode(filehandle, mode) - - -def get_terminal_size(fd): # pylint: disable=invalid-name - """ - Args: - fd(int): Python file descriptor +def enable_application_mode() -> Callable[[], None]: + """Enable application mode. Returns: - :py:class:`os.terminal_size`: Named tuple representing terminal size - - Convenience function for getting terminal size - - In Python 3.3 and above, this is a wrapper for :py:func:`os.get_terminal_size`. - In older versions of Python, this function calls GetConsoleScreenBufferInfo_. + Callable[[], None]: A callable that will restore terminal to previous state. """ - # In Python 3.3+ we can let the standard library handle this - if GTS_SUPPORTED: - return os.get_terminal_size(fd) + terminal_in = sys.stdin + terminal_out = sys.stdout - handle = msvcrt.get_osfhandle(fd) - window = get_csbi(handle).srWindow - return TerminalSize(window.Right - window.Left + 1, window.Bottom - window.Top + 1) + current_console_mode_in = _get_console_mode(terminal_in) + current_console_mode_out = _get_console_mode(terminal_out) + + def restore() -> None: + """Restore console mode to previous settings""" + _set_console_mode(terminal_in, current_console_mode_in) + _set_console_mode(terminal_out, current_console_mode_out) + + _set_console_mode( + terminal_out, current_console_mode_out | ENABLE_VIRTUAL_TERMINAL_PROCESSING + ) + _set_console_mode(terminal_in, ENABLE_VIRTUAL_TERMINAL_INPUT) + return restore -def flush_and_set_console(fd, mode): # pylint: disable=invalid-name +def _wait_for_handles(handles: List[HANDLE], timeout: int = -1) -> Optional[HANDLE]: """ - Args: - filehandle(int): Windows filehandle object as returned by :py:func:`msvcrt.get_osfhandle` - mode(int): Desired console mode - - Attempts to set console to specified mode, but will not raise on failure - - If the file descriptor is STDOUT or STDERR, attempts to flush first + Waits for multiple handles. (Similar to 'select') Returns the handle which is ready. + Returns `None` on timeout. + http://msdn.microsoft.com/en-us/library/windows/desktop/ms687025(v=vs.85).aspx + Note that handles should be a list of `HANDLE` objects, not integers. See + this comment in the patch by @quark-zju for the reason why: + ''' Make sure HANDLE on Windows has a correct size + Previously, the type of various HANDLEs are native Python integer + types. The ctypes library will treat them as 4-byte integer when used + in function arguments. On 64-bit Windows, HANDLE is 8-byte and usually + a small integer. Depending on whether the extra 4 bytes are zero-ed out + or not, things can happen to work, or break. ''' + This function returns either `None` or one of the given `HANDLE` objects. + (The return value can be tested with the `is` operator.) """ + arrtype = HANDLE * len(handles) + handle_array = arrtype(*handles) - try: - if fd in (sys.__stdout__.fileno(), sys.__stderr__.fileno()): - sys.__stdout__.flush() - sys.__stderr__.flush() - except (AttributeError, TypeError, io.UnsupportedOperation): - pass + ret: int = KERNEL32.WaitForMultipleObjects( + len(handle_array), handle_array, BOOL(False), DWORD(timeout) + ) - try: - filehandle = msvcrt.get_osfhandle(fd) - set_console_mode(filehandle, mode) - except OSError: - pass + if ret == WAIT_TIMEOUT: + return None + else: + return handles[ret] -def get_term(fd, fallback=True): # pylint: disable=invalid-name - """ - Args: - fd(int): Python file descriptor - fallback(bool): Use fallback terminal type if type can not be determined - Returns: - str: Terminal type +class EventMonitor(threading.Thread): + """A thread to send key / window events to Textual loop.""" - Attempts to determine and enable the current terminal type + def __init__( + self, + loop: AbstractEventLoop, + app, + target: EventTarget, + exit_event: threading.Event, + process_event: Callable[[Event], None], + ) -> None: + self.loop = loop + self.app = app + self.target = target + self.exit_event = exit_event + self.process_event = process_event + self.app.log("event monitor constructed") + super().__init__() - The current logic is: + def run(self) -> None: + self.app.log("event monitor thread started") + exit_requested = self.exit_event.is_set + parser = XTermParser(self.target, lambda: False) - - If TERM is defined in the environment, the value is returned - - Else, if ANSICON is defined in the environment, ``'ansicon'`` is returned - - Else, if virtual terminal mode is natively supported, - it is enabled and ``'vtwin10'`` is returned - - Else, if ``fallback`` is ``True``, Ansicon is loaded, and ``'ansicon'`` is returned - - If no other conditions are satisfied, ``'unknown'`` is returned + try: + read_count = wintypes.DWORD(0) + hIn = GetStdHandle(STD_INPUT_HANDLE) - This logic may change in the future as additional terminal types are added. - """ + MAX_EVENTS = 1024 + KEY_EVENT = 0x0001 + WINDOW_BUFFER_SIZE_EVENT = 0x0004 - # First try TERM - term = os.environ.get("TERM", None) + arrtype = INPUT_RECORD * MAX_EVENTS + input_records = arrtype() + ReadConsoleInputW = KERNEL32.ReadConsoleInputW + keys: List[str] = [] - if term is None: + while not exit_requested(): + if _wait_for_handles([hIn], 100) is None: + continue + del keys[:] + ReadConsoleInputW( + hIn, byref(input_records), MAX_EVENTS, byref(read_count) + ) + read_input_records = input_records[: read_count.value] - # See if ansicon is enabled - if os.environ.get("ANSICON", None): - term = "ansicon" + apppend_key = keys.append + new_size: Optional[tuple[int, int]] = None + for input_record in read_input_records: + event_type = input_record.EventType + if event_type == KEY_EVENT: + key_event = input_record.Event.KeyEvent + key = key_event.uChar.UnicodeChar + if key_event.bKeyDown or key == "\x1b": + apppend_key(key) + elif event_type == WINDOW_BUFFER_SIZE_EVENT: + size = input_record.Event.WindowBufferSizeEvent.dwSize + new_size = (size.X, size.Y) - # See if Windows Terminal is being used - elif os.environ.get("WT_SESSION", None): - term = "vtwin10" + if keys: + for event in parser.feed("".join(keys)): + self.process_event(event) + if new_size is not None: + self.on_size_change(*new_size) - # See if the version of Windows supports VTMODE - elif VTMODE_SUPPORTED: - try: - filehandle = msvcrt.get_osfhandle(fd) - mode = get_console_mode(filehandle) - except OSError: - term = "unknown" - else: - atexit.register(flush_and_set_console, fd, mode) - # pylint: disable=unsupported-binary-operation - set_console_mode(filehandle, mode | ENABLE_VIRTUAL_TERMINAL_PROCESSING) - term = "vtwin10" + except Exception as error: + self.app.log("EVENT MONITOR ERROR", error) + self.app.log("event monitor thread finished") - # Currently falling back to Ansicon for older versions of Windows - elif fallback: - import ansicon # pylint: disable=import-error,import-outside-toplevel - - ansicon.load() - - try: - filehandle = msvcrt.get_osfhandle(fd) - mode = get_console_mode(filehandle) - except OSError: - term = "unknown" - else: - atexit.register(flush_and_set_console, fd, mode) - set_console_mode(filehandle, mode ^ ENABLE_WRAP_AT_EOL_OUTPUT) - term = "ansicon" - - else: - term = "unknown" - - return term + def on_size_change(self, width: int, height: int) -> None: + """Called when terminal size changes.""" + event = Resize(self.target, Size(width, height)) + run_coroutine_threadsafe(self.target.post_message(event), loop=self.loop) diff --git a/src/textual/drivers/windows_driver.py b/src/textual/drivers/windows_driver.py index f31ac5363..743d35b11 100644 --- a/src/textual/drivers/windows_driver.py +++ b/src/textual/drivers/windows_driver.py @@ -7,11 +7,9 @@ from codecs import getincrementaldecoder import msvcrt import os -import selectors -import signal import sys from threading import Event, Thread -from typing import List, Optional, TYPE_CHECKING +from typing import Callable, List, Optional, TYPE_CHECKING from ..driver import Driver from ..geometry import Size @@ -69,10 +67,8 @@ class WindowsDriver(Driver): self.exit_event = Event() self._key_thread: Thread | None = None - - def _get_terminal_size(self) -> tuple[int, int]: - width, height = win32.get_terminal_size(self.out_fileno) - return (width, height) + self._event_thread: Thread | None = None + self._restore_console: Callable[[], None] | None = None def _enable_mouse_support(self) -> None: write = self.console.file.write @@ -99,8 +95,7 @@ class WindowsDriver(Driver): loop = asyncio.get_event_loop() - win32.enable_vt_mode(msvcrt.get_osfhandle(self.out_fileno)) - win32.setraw(msvcrt.get_osfhandle(self.in_fileno)) + self._restore_console = win32.enable_application_mode() self.console.set_alt_screen(True) self._enable_mouse_support() @@ -111,11 +106,13 @@ class WindowsDriver(Driver): app = active_app.get() - self._key_thread = Thread( - target=self.run_input_thread, args=(asyncio.get_event_loop(), app) + # self._key_thread = Thread( + # target=self.run_input_thread, args=(asyncio.get_event_loop(), app) + # ) + self._event_thread = win32.EventMonitor( + loop, app, self._target, self.exit_event, self.process_event ) - - width, height = win32.get_terminal_size(self.out_fileno) + width, height = os.get_terminal_size(self.out_fileno) asyncio.run_coroutine_threadsafe( self._target.post_message(events.Resize(self._target, Size(width, height))), @@ -124,22 +121,26 @@ class WindowsDriver(Driver): from .._context import active_app - self._key_thread.start() + # self._key_thread.start() + + self._event_thread.start() def disable_input(self) -> None: try: if not self.exit_event.is_set(): self._disable_mouse_support() self.exit_event.set() - if self._key_thread is not None: - self._key_thread.join() + if self._event_thread is not None: + self._event_thread.join() + self._event_thread = None except Exception as error: # TODO: log this pass def stop_application_mode(self) -> None: self.disable_input() - + if self._restore_console: + self._restore_console() with self.console: self.console.set_alt_screen(False) self.console.show_cursor(True) @@ -162,12 +163,29 @@ class WindowsDriver(Driver): input_handle = msvcrt.get_osfhandle(self.in_fileno) app.log("input_handle", input_handle) app.log("starting thread") + import time + + terminal_size = os.get_terminal_size(self.out_fileno) + import shutil + try: while not self.exit_event.is_set(): + + new_terminal_size = os.get_terminal_size(self.out_fileno) + + if new_terminal_size != terminal_size: + app.log("SIZE CHANGE", new_terminal_size) + terminal_size = new_terminal_size + width, height = new_terminal_size + event = events.Resize(self._target, Size(width, height)) + app.log(event) + self.console.size = (width, height) + self.send_event(event) + if wait_for_handles([input_handle], 100) is None: continue unicode_data = decode(read(self.in_fileno, 1024)) - app.log("key", repr(unicode_data)) + # app.log("key", repr(unicode_data)) for event in parser.feed(unicode_data): self.process_event(event) except Exception as error: