working windows driver

This commit is contained in:
Will McGugan
2022-01-27 16:54:10 +00:00
parent 4118d3eb45
commit 988838a872
4 changed files with 254 additions and 309 deletions

View File

@@ -32,5 +32,7 @@ class SmoothApp(App):
self.bar.layout_offset_x = -40 self.bar.layout_offset_x = -40
self.set_timer(10, lambda: self.action("quit"))
SmoothApp.run(log="textual.log", log_verbosity=3)
SmoothApp.run(log="textual.log", log_verbosity=2)

View File

@@ -157,21 +157,17 @@ class LinuxDriver(Driver):
pass pass
def stop_application_mode(self) -> None: def stop_application_mode(self) -> None:
self.disable_input()
with timer("disable_input"): if self.attrs_before is not None:
self.disable_input() try:
termios.tcsetattr(self.fileno, termios.TCSANOW, self.attrs_before)
except termios.error:
pass
with timer("tcsetattr"): with self.console:
if self.attrs_before is not None: self.console.set_alt_screen(False)
try: self.console.show_cursor(True)
termios.tcsetattr(self.fileno, termios.TCSANOW, self.attrs_before)
except termios.error:
pass
with timer("set_alt_screen False, show cursor"):
with self.console:
self.console.set_alt_screen(False)
self.console.show_cursor(True)
def run_input_thread(self, loop) -> None: def run_input_thread(self, loop) -> None:
try: try:

View File

@@ -1,27 +1,23 @@
# -*- coding: utf-8 -*- from asyncio import AbstractEventLoop, run_coroutine_threadsafe
# Copyright 2019 - 2021 Avram Lubkin, All Rights Reserved from codecs import getincrementaldecoder
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
"""
Support functions and wrappers for calls to the Windows API
"""
import atexit
import codecs
from collections import namedtuple
import ctypes import ctypes
from ctypes import wintypes from ctypes import byref, Structure, Union, wintypes
import io from ctypes.wintypes import CHAR, HANDLE, WCHAR, BOOL, WORD, DWORD, SHORT, UINT
import msvcrt # pylint: disable=import-error import msvcrt
import os import os
import platform
import sys import sys
import threading
LPDWORD = ctypes.POINTER(wintypes.DWORD) from tkinter.tix import WINDOW
COORD = wintypes._COORD # pylint: disable=protected-access from typing import IO, Callable, List, Optional
from ..geometry import Size
from ..events import Event, Key, Resize
from .._types import EventTarget
from .._xterm_parser import XTermParser
KERNEL32 = ctypes.WinDLL("kernel32", use_last_error=True)
# Console input modes # Console input modes
ENABLE_ECHO_INPUT = 0x0004 ENABLE_ECHO_INPUT = 0x0004
@@ -41,315 +37,248 @@ ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004
DISABLE_NEWLINE_AUTO_RETURN = 0x0008 DISABLE_NEWLINE_AUTO_RETURN = 0x0008
ENABLE_LVB_GRID_WORLDWIDE = 0x0010 ENABLE_LVB_GRID_WORLDWIDE = 0x0010
if tuple(int(num) for num in platform.version().split(".")) >= ( STD_INPUT_HANDLE = -10
10, STD_OUTPUT_HANDLE = -11
0,
10586,
):
VTMODE_SUPPORTED = True
CBREAK_MODE = ENABLE_PROCESSED_INPUT | ENABLE_VIRTUAL_TERMINAL_INPUT
RAW_MODE = ENABLE_VIRTUAL_TERMINAL_INPUT
else:
VTMODE_SUPPORTED = False
CBREAK_MODE = ENABLE_PROCESSED_INPUT
RAW_MODE = 0
GTS_SUPPORTED = hasattr(os, "get_terminal_size") WAIT_TIMEOUT = 0x00000102
TerminalSize = namedtuple("TerminalSize", ("columns", "lines"))
GetStdHandle = KERNEL32.GetStdHandle
GetStdHandle.argtypes = [wintypes.DWORD]
GetStdHandle.restype = wintypes.HANDLE
class ConsoleScreenBufferInfo( class COORD(Structure):
ctypes.Structure """https://docs.microsoft.com/en-us/windows/console/coord-str"""
): # pylint: disable=too-few-public-methods
"""
Python representation of CONSOLE_SCREEN_BUFFER_INFO structure
https://docs.microsoft.com/en-us/windows/console/console-screen-buffer-info-str
"""
_fields_ = [ _fields_ = [
("dwSize", COORD), ("X", SHORT),
("dwCursorPosition", COORD), ("Y", SHORT),
("wAttributes", wintypes.WORD),
("srWindow", wintypes.SMALL_RECT),
("dwMaximumWindowSize", COORD),
] ]
CSBIP = ctypes.POINTER(ConsoleScreenBufferInfo) class uChar(Union):
"""https://docs.microsoft.com/en-us/windows/console/key-event-record-str"""
_fields_ = [
("AsciiChar", CHAR),
("UnicodeChar", WCHAR),
]
def _check_bool(result, func, args): # pylint: disable=unused-argument class KEY_EVENT_RECORD(Structure):
""" """https://docs.microsoft.com/en-us/windows/console/key-event-record-str"""
Used as an error handler for Windows calls
Gets last error if call is not successful
"""
if not result: _fields_ = [
raise ctypes.WinError(ctypes.get_last_error()) ("bKeyDown", BOOL),
return args ("wRepeatCount", WORD),
("wVirtualKeyCode", WORD),
("wVirtualScanCode", WORD),
("uChar", uChar),
("dwControlKeyState", DWORD),
]
KERNEL32 = ctypes.WinDLL("kernel32", use_last_error=True) class MOUSE_EVENT_RECORD(Structure):
"""https://docs.microsoft.com/en-us/windows/console/mouse-event-record-str"""
KERNEL32.GetConsoleCP.errcheck = _check_bool _fields_ = [
KERNEL32.GetConsoleCP.argtypes = tuple() ("dwMousePosition", COORD),
("dwButtonState", DWORD),
KERNEL32.GetConsoleMode.errcheck = _check_bool ("dwControlKeyState", DWORD),
KERNEL32.GetConsoleMode.argtypes = (wintypes.HANDLE, LPDWORD) ("dwEventFlags", DWORD),
]
KERNEL32.SetConsoleMode.errcheck = _check_bool
KERNEL32.SetConsoleMode.argtypes = (wintypes.HANDLE, wintypes.DWORD)
KERNEL32.GetConsoleScreenBufferInfo.errcheck = _check_bool
KERNEL32.GetConsoleScreenBufferInfo.argtypes = (wintypes.HANDLE, CSBIP)
def get_csbi(filehandle=None): class WINDOW_BUFFER_SIZE_RECORD(Structure):
""" """https://docs.microsoft.com/en-us/windows/console/window-buffer-size-record-str"""
_fields_ = [("dwSize", COORD)]
class MENU_EVENT_RECORD(Structure):
"""https://docs.microsoft.com/en-us/windows/console/menu-event-record-str"""
_fields_ = [("dwCommandId", UINT)]
class FOCUS_EVENT_RECORD(Structure):
"""https://docs.microsoft.com/en-us/windows/console/focus-event-record-str"""
_fields_ = [("bSetFocus", BOOL)]
class InputEvent(Union):
"""https://docs.microsoft.com/en-us/windows/console/input-record-str"""
_fields_ = [
("KeyEvent", KEY_EVENT_RECORD),
("MouseEvent", MOUSE_EVENT_RECORD),
("WindowBufferSizeEvent", WINDOW_BUFFER_SIZE_RECORD),
("MenuEvent", MENU_EVENT_RECORD),
("FocusEvent", FOCUS_EVENT_RECORD),
]
class INPUT_RECORD(Structure):
"""https://docs.microsoft.com/en-us/windows/console/input-record-str"""
_fields_ = [("EventType", wintypes.WORD), ("Event", InputEvent)]
def _set_console_mode(file: IO, mode: int) -> bool:
"""Set the console mode for a given file (stdout or stdin).
Args: Args:
filehandle(int): Windows filehandle object as returned by :py:func:`msvcrt.get_osfhandle` file (IO): A file like object.
mode (int): New mode.
Returns: Returns:
:py:class:`ConsoleScreenBufferInfo`: CONSOLE_SCREEN_BUFFER_INFO_ structure bool: True on success, otherwise False.
Wrapper for GetConsoleScreenBufferInfo_
If ``filehandle`` is :py:data:`None`, uses the filehandle of :py:data:`sys.__stdout__`.
""" """
windows_filehandle = msvcrt.get_osfhandle(file.fileno())
if filehandle is None: success = KERNEL32.SetConsoleMode(windows_filehandle, mode)
filehandle = msvcrt.get_osfhandle(sys.__stdout__.fileno()) return success
csbi = ConsoleScreenBufferInfo()
KERNEL32.GetConsoleScreenBufferInfo(filehandle, ctypes.byref(csbi))
return csbi
def get_console_input_encoding(): def _get_console_mode(file: IO) -> int:
""" """Get the console mode for a given file (stdout or stdin)
Returns:
int: Current console mode
Raises:
OSError: Error calling Windows API
Query for the console input code page and provide an encoding
If the code page can not be resolved to a Python encoding, :py:data:`None` is returned.
"""
encoding = "cp%d" % KERNEL32.GetConsoleCP()
try:
codecs.lookup(encoding)
except LookupError:
return None
return encoding
def get_console_mode(filehandle):
"""
Args: Args:
filehandle(int): Windows filehandle object as returned by :py:func:`msvcrt.get_osfhandle` file (IO): A file-like object.
Returns: Returns:
int: Current console mode int: The current console mode.
Raises:
OSError: Error calling Windows API
Wrapper for GetConsoleMode_
""" """
windows_filehandle = msvcrt.get_osfhandle(file.fileno())
mode = wintypes.DWORD() mode = wintypes.DWORD()
KERNEL32.GetConsoleMode(filehandle, ctypes.byref(mode)) KERNEL32.GetConsoleMode(windows_filehandle, ctypes.byref(mode))
return mode.value return mode.value
def set_console_mode(filehandle, mode): def enable_application_mode() -> Callable[[], None]:
""" """Enable application mode.
Args:
filehandle(int): Windows filehandle object as returned by :py:func:`msvcrt.get_osfhandle`
mode(int): Desired console mode
Raises:
OSError: Error calling Windows API
Wrapper for SetConsoleMode_
"""
return bool(KERNEL32.SetConsoleMode(filehandle, mode))
def setcbreak(filehandle):
"""
Args:
filehandle(int): Windows filehandle object as returned by :py:func:`msvcrt.get_osfhandle`
Raises:
OSError: Error calling Windows API
Convenience function which mimics :py:func:`tty.setcbreak` behavior
All console input options are disabled except ``ENABLE_PROCESSED_INPUT``
and, if supported, ``ENABLE_VIRTUAL_TERMINAL_INPUT``
"""
set_console_mode(filehandle, CBREAK_MODE)
def setraw(filehandle):
"""
Args:
filehandle(int): Windows filehandle object as returned by :py:func:`msvcrt.get_osfhandle`
Raises:
OSError: Error calling Windows API
Convenience function which mimics :py:func:`tty.setraw` behavior
All console input options are disabled except, if supported, ``ENABLE_VIRTUAL_TERMINAL_INPUT``
"""
set_console_mode(filehandle, RAW_MODE)
def enable_vt_mode(filehandle=None):
"""
Args:
filehandle(int): Windows filehandle object as returned by :py:func:`msvcrt.get_osfhandle`
Raises:
OSError: Error calling Windows API
Enables virtual terminal processing mode for the given console
If ``filehandle`` is :py:data:`None`, uses the filehandle of :py:data:`sys.__stdout__`.
"""
if filehandle is None:
filehandle = msvcrt.get_osfhandle(sys.__stdout__.fileno())
mode = get_console_mode(filehandle)
mode |= ENABLE_VIRTUAL_TERMINAL_PROCESSING
set_console_mode(filehandle, mode)
def get_terminal_size(fd): # pylint: disable=invalid-name
"""
Args:
fd(int): Python file descriptor
Returns: Returns:
:py:class:`os.terminal_size`: Named tuple representing terminal size Callable[[], None]: A callable that will restore terminal to previous state.
Convenience function for getting terminal size
In Python 3.3 and above, this is a wrapper for :py:func:`os.get_terminal_size`.
In older versions of Python, this function calls GetConsoleScreenBufferInfo_.
""" """
# In Python 3.3+ we can let the standard library handle this terminal_in = sys.stdin
if GTS_SUPPORTED: terminal_out = sys.stdout
return os.get_terminal_size(fd)
handle = msvcrt.get_osfhandle(fd) current_console_mode_in = _get_console_mode(terminal_in)
window = get_csbi(handle).srWindow current_console_mode_out = _get_console_mode(terminal_out)
return TerminalSize(window.Right - window.Left + 1, window.Bottom - window.Top + 1)
def restore() -> None:
"""Restore console mode to previous settings"""
_set_console_mode(terminal_in, current_console_mode_in)
_set_console_mode(terminal_out, current_console_mode_out)
_set_console_mode(
terminal_out, current_console_mode_out | ENABLE_VIRTUAL_TERMINAL_PROCESSING
)
_set_console_mode(terminal_in, ENABLE_VIRTUAL_TERMINAL_INPUT)
return restore
def flush_and_set_console(fd, mode): # pylint: disable=invalid-name def _wait_for_handles(handles: List[HANDLE], timeout: int = -1) -> Optional[HANDLE]:
""" """
Args: Waits for multiple handles. (Similar to 'select') Returns the handle which is ready.
filehandle(int): Windows filehandle object as returned by :py:func:`msvcrt.get_osfhandle` Returns `None` on timeout.
mode(int): Desired console mode http://msdn.microsoft.com/en-us/library/windows/desktop/ms687025(v=vs.85).aspx
Note that handles should be a list of `HANDLE` objects, not integers. See
Attempts to set console to specified mode, but will not raise on failure this comment in the patch by @quark-zju for the reason why:
''' Make sure HANDLE on Windows has a correct size
If the file descriptor is STDOUT or STDERR, attempts to flush first Previously, the type of various HANDLEs are native Python integer
types. The ctypes library will treat them as 4-byte integer when used
in function arguments. On 64-bit Windows, HANDLE is 8-byte and usually
a small integer. Depending on whether the extra 4 bytes are zero-ed out
or not, things can happen to work, or break. '''
This function returns either `None` or one of the given `HANDLE` objects.
(The return value can be tested with the `is` operator.)
""" """
arrtype = HANDLE * len(handles)
handle_array = arrtype(*handles)
try: ret: int = KERNEL32.WaitForMultipleObjects(
if fd in (sys.__stdout__.fileno(), sys.__stderr__.fileno()): len(handle_array), handle_array, BOOL(False), DWORD(timeout)
sys.__stdout__.flush() )
sys.__stderr__.flush()
except (AttributeError, TypeError, io.UnsupportedOperation):
pass
try: if ret == WAIT_TIMEOUT:
filehandle = msvcrt.get_osfhandle(fd) return None
set_console_mode(filehandle, mode) else:
except OSError: return handles[ret]
pass
def get_term(fd, fallback=True): # pylint: disable=invalid-name class EventMonitor(threading.Thread):
""" """A thread to send key / window events to Textual loop."""
Args:
fd(int): Python file descriptor
fallback(bool): Use fallback terminal type if type can not be determined
Returns:
str: Terminal type
Attempts to determine and enable the current terminal type def __init__(
self,
loop: AbstractEventLoop,
app,
target: EventTarget,
exit_event: threading.Event,
process_event: Callable[[Event], None],
) -> None:
self.loop = loop
self.app = app
self.target = target
self.exit_event = exit_event
self.process_event = process_event
self.app.log("event monitor constructed")
super().__init__()
The current logic is: def run(self) -> None:
self.app.log("event monitor thread started")
exit_requested = self.exit_event.is_set
parser = XTermParser(self.target, lambda: False)
- If TERM is defined in the environment, the value is returned try:
- Else, if ANSICON is defined in the environment, ``'ansicon'`` is returned read_count = wintypes.DWORD(0)
- Else, if virtual terminal mode is natively supported, hIn = GetStdHandle(STD_INPUT_HANDLE)
it is enabled and ``'vtwin10'`` is returned
- Else, if ``fallback`` is ``True``, Ansicon is loaded, and ``'ansicon'`` is returned
- If no other conditions are satisfied, ``'unknown'`` is returned
This logic may change in the future as additional terminal types are added. MAX_EVENTS = 1024
""" KEY_EVENT = 0x0001
WINDOW_BUFFER_SIZE_EVENT = 0x0004
# First try TERM arrtype = INPUT_RECORD * MAX_EVENTS
term = os.environ.get("TERM", None) input_records = arrtype()
ReadConsoleInputW = KERNEL32.ReadConsoleInputW
keys: List[str] = []
if term is None: while not exit_requested():
if _wait_for_handles([hIn], 100) is None:
continue
del keys[:]
ReadConsoleInputW(
hIn, byref(input_records), MAX_EVENTS, byref(read_count)
)
read_input_records = input_records[: read_count.value]
# See if ansicon is enabled apppend_key = keys.append
if os.environ.get("ANSICON", None): new_size: Optional[tuple[int, int]] = None
term = "ansicon" for input_record in read_input_records:
event_type = input_record.EventType
if event_type == KEY_EVENT:
key_event = input_record.Event.KeyEvent
key = key_event.uChar.UnicodeChar
if key_event.bKeyDown or key == "\x1b":
apppend_key(key)
elif event_type == WINDOW_BUFFER_SIZE_EVENT:
size = input_record.Event.WindowBufferSizeEvent.dwSize
new_size = (size.X, size.Y)
# See if Windows Terminal is being used if keys:
elif os.environ.get("WT_SESSION", None): for event in parser.feed("".join(keys)):
term = "vtwin10" self.process_event(event)
if new_size is not None:
self.on_size_change(*new_size)
# See if the version of Windows supports VTMODE except Exception as error:
elif VTMODE_SUPPORTED: self.app.log("EVENT MONITOR ERROR", error)
try: self.app.log("event monitor thread finished")
filehandle = msvcrt.get_osfhandle(fd)
mode = get_console_mode(filehandle)
except OSError:
term = "unknown"
else:
atexit.register(flush_and_set_console, fd, mode)
# pylint: disable=unsupported-binary-operation
set_console_mode(filehandle, mode | ENABLE_VIRTUAL_TERMINAL_PROCESSING)
term = "vtwin10"
# Currently falling back to Ansicon for older versions of Windows def on_size_change(self, width: int, height: int) -> None:
elif fallback: """Called when terminal size changes."""
import ansicon # pylint: disable=import-error,import-outside-toplevel event = Resize(self.target, Size(width, height))
run_coroutine_threadsafe(self.target.post_message(event), loop=self.loop)
ansicon.load()
try:
filehandle = msvcrt.get_osfhandle(fd)
mode = get_console_mode(filehandle)
except OSError:
term = "unknown"
else:
atexit.register(flush_and_set_console, fd, mode)
set_console_mode(filehandle, mode ^ ENABLE_WRAP_AT_EOL_OUTPUT)
term = "ansicon"
else:
term = "unknown"
return term

View File

@@ -7,11 +7,9 @@ from codecs import getincrementaldecoder
import msvcrt import msvcrt
import os import os
import selectors
import signal
import sys import sys
from threading import Event, Thread from threading import Event, Thread
from typing import List, Optional, TYPE_CHECKING from typing import Callable, List, Optional, TYPE_CHECKING
from ..driver import Driver from ..driver import Driver
from ..geometry import Size from ..geometry import Size
@@ -69,10 +67,8 @@ class WindowsDriver(Driver):
self.exit_event = Event() self.exit_event = Event()
self._key_thread: Thread | None = None self._key_thread: Thread | None = None
self._event_thread: Thread | None = None
def _get_terminal_size(self) -> tuple[int, int]: self._restore_console: Callable[[], None] | None = None
width, height = win32.get_terminal_size(self.out_fileno)
return (width, height)
def _enable_mouse_support(self) -> None: def _enable_mouse_support(self) -> None:
write = self.console.file.write write = self.console.file.write
@@ -99,8 +95,7 @@ class WindowsDriver(Driver):
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
win32.enable_vt_mode(msvcrt.get_osfhandle(self.out_fileno)) self._restore_console = win32.enable_application_mode()
win32.setraw(msvcrt.get_osfhandle(self.in_fileno))
self.console.set_alt_screen(True) self.console.set_alt_screen(True)
self._enable_mouse_support() self._enable_mouse_support()
@@ -111,11 +106,13 @@ class WindowsDriver(Driver):
app = active_app.get() app = active_app.get()
self._key_thread = Thread( # self._key_thread = Thread(
target=self.run_input_thread, args=(asyncio.get_event_loop(), app) # target=self.run_input_thread, args=(asyncio.get_event_loop(), app)
# )
self._event_thread = win32.EventMonitor(
loop, app, self._target, self.exit_event, self.process_event
) )
width, height = os.get_terminal_size(self.out_fileno)
width, height = win32.get_terminal_size(self.out_fileno)
asyncio.run_coroutine_threadsafe( asyncio.run_coroutine_threadsafe(
self._target.post_message(events.Resize(self._target, Size(width, height))), self._target.post_message(events.Resize(self._target, Size(width, height))),
@@ -124,22 +121,26 @@ class WindowsDriver(Driver):
from .._context import active_app from .._context import active_app
self._key_thread.start() # self._key_thread.start()
self._event_thread.start()
def disable_input(self) -> None: def disable_input(self) -> None:
try: try:
if not self.exit_event.is_set(): if not self.exit_event.is_set():
self._disable_mouse_support() self._disable_mouse_support()
self.exit_event.set() self.exit_event.set()
if self._key_thread is not None: if self._event_thread is not None:
self._key_thread.join() self._event_thread.join()
self._event_thread = None
except Exception as error: except Exception as error:
# TODO: log this # TODO: log this
pass pass
def stop_application_mode(self) -> None: def stop_application_mode(self) -> None:
self.disable_input() self.disable_input()
if self._restore_console:
self._restore_console()
with self.console: with self.console:
self.console.set_alt_screen(False) self.console.set_alt_screen(False)
self.console.show_cursor(True) self.console.show_cursor(True)
@@ -162,12 +163,29 @@ class WindowsDriver(Driver):
input_handle = msvcrt.get_osfhandle(self.in_fileno) input_handle = msvcrt.get_osfhandle(self.in_fileno)
app.log("input_handle", input_handle) app.log("input_handle", input_handle)
app.log("starting thread") app.log("starting thread")
import time
terminal_size = os.get_terminal_size(self.out_fileno)
import shutil
try: try:
while not self.exit_event.is_set(): while not self.exit_event.is_set():
new_terminal_size = os.get_terminal_size(self.out_fileno)
if new_terminal_size != terminal_size:
app.log("SIZE CHANGE", new_terminal_size)
terminal_size = new_terminal_size
width, height = new_terminal_size
event = events.Resize(self._target, Size(width, height))
app.log(event)
self.console.size = (width, height)
self.send_event(event)
if wait_for_handles([input_handle], 100) is None: if wait_for_handles([input_handle], 100) is None:
continue continue
unicode_data = decode(read(self.in_fileno, 1024)) unicode_data = decode(read(self.in_fileno, 1024))
app.log("key", repr(unicode_data)) # app.log("key", repr(unicode_data))
for event in parser.feed(unicode_data): for event in parser.feed(unicode_data):
self.process_event(event) self.process_event(event)
except Exception as error: except Exception as error: