win32 support

This commit is contained in:
Will McGugan
2022-01-20 16:02:56 +00:00
parent 54e6342864
commit 4118d3eb45

View File

@@ -1,14 +1,17 @@
from __future__ import annotations
import asyncio
from ctypes import windll
from ctypes.wintypes import BOOL, DWORD, HANDLE
from codecs import getincrementaldecoder
import msvcrt
import os
import selectors
import signal
import sys
from threading import Event, Thread
from typing import TYPE_CHECKING
from typing import List, Optional, TYPE_CHECKING
from ..driver import Driver
from ..geometry import Size
@@ -22,6 +25,38 @@ from .._xterm_parser import XTermParser
if TYPE_CHECKING:
from rich.console import Console
from textual.app import App
WAIT_TIMEOUT = 0x00000102
def wait_for_handles(handles: List[HANDLE], timeout: int = -1) -> Optional[HANDLE]:
"""
Waits for multiple handles. (Similar to 'select') Returns the handle which is ready.
Returns `None` on timeout.
http://msdn.microsoft.com/en-us/library/windows/desktop/ms687025(v=vs.85).aspx
Note that handles should be a list of `HANDLE` objects, not integers. See
this comment in the patch by @quark-zju for the reason why:
''' Make sure HANDLE on Windows has a correct size
Previously, the type of various HANDLEs are native Python integer
types. The ctypes library will treat them as 4-byte integer when used
in function arguments. On 64-bit Windows, HANDLE is 8-byte and usually
a small integer. Depending on whether the extra 4 bytes are zero-ed out
or not, things can happen to work, or break. '''
This function returns either `None` or one of the given `HANDLE` objects.
(The return value can be tested with the `is` operator.)
"""
arrtype = HANDLE * len(handles)
handle_array = arrtype(*handles)
ret: int = windll.kernel32.WaitForMultipleObjects(
len(handle_array), handle_array, BOOL(False), DWORD(timeout)
)
if ret == WAIT_TIMEOUT:
return None
else:
return handles[ret]
class WindowsDriver(Driver):
@@ -64,17 +99,20 @@ class WindowsDriver(Driver):
loop = asyncio.get_event_loop()
filehandle = msvcrt.get_osfhandle(self.out_fileno)
win32.enable_vt_mode(filehandle)
win32.enable_vt_mode(msvcrt.get_osfhandle(self.out_fileno))
win32.setraw(msvcrt.get_osfhandle(self.in_fileno))
self.console.set_alt_screen(True)
self._enable_mouse_support()
self.console.show_cursor(False)
self.console.file.write("\033[?1003h\n")
win32.setraw(msvcrt.get_osfhandle(self.in_fileno))
from .._context import active_app
app = active_app.get()
self._key_thread = Thread(
target=self.run_input_thread, args=(asyncio.get_event_loop(),)
target=self.run_input_thread, args=(asyncio.get_event_loop(), app)
)
width, height = win32.get_terminal_size(self.out_fileno)
@@ -83,7 +121,9 @@ class WindowsDriver(Driver):
self._target.post_message(events.Resize(self._target, Size(width, height))),
loop=loop,
)
log("starting key thread")
from .._context import active_app
self._key_thread.start()
def disable_input(self) -> None:
@@ -104,45 +144,33 @@ class WindowsDriver(Driver):
self.console.set_alt_screen(False)
self.console.show_cursor(True)
def run_input_thread(self, loop) -> None:
def run_input_thread(self, loop, app: App) -> None:
try:
self._run_input_thread(loop)
except Exception:
pass # TODO: log
self._run_input_thread(loop, app)
except Exception as error:
app.log(error)
def _run_input_thread(self, loop) -> None:
log("input thread")
def _run_input_thread(self, loop, app: App) -> None:
app.log("input thread")
selector = selectors.DefaultSelector()
selector.register(self.in_fileno, selectors.EVENT_READ)
fileno = self.in_fileno
def more_data() -> bool:
"""Check if there is more data to parse."""
for key, events in selector.select(0.01):
if events:
return True
return False
parser = XTermParser(self._target, more_data)
parser = XTermParser(self._target, lambda: False)
utf8_decoder = getincrementaldecoder("utf-8")().decode
decode = utf8_decoder
read = os.read
log("starting thread")
input_handle = msvcrt.get_osfhandle(self.in_fileno)
app.log("input_handle", input_handle)
app.log("starting thread")
try:
while not self.exit_event.is_set():
selector_events = selector.select(0.1)
for _selector_key, mask in selector_events:
log(mask)
if mask | selectors.EVENT_READ:
unicode_data = decode(read(fileno, 1024))
log("ket", unicode_data)
if wait_for_handles([input_handle], 100) is None:
continue
unicode_data = decode(read(self.in_fileno, 1024))
app.log("key", repr(unicode_data))
for event in parser.feed(unicode_data):
self.process_event(event)
except Exception as error:
log(error)
app.log(error)
finally:
selector.close()
app.log("input thread finished")