From 6c31c0850ecddb75a01a61d06242e24aa3d24193 Mon Sep 17 00:00:00 2001 From: Will McGugan Date: Mon, 2 Jan 2023 16:33:14 +0000 Subject: [PATCH] more defensive sleep --- src/textual/_win_sleep.py | 89 +++++++++++++++++++++++---------------- 1 file changed, 52 insertions(+), 37 deletions(-) diff --git a/src/textual/_win_sleep.py b/src/textual/_win_sleep.py index e90930afc..b53363fa3 100644 --- a/src/textual/_win_sleep.py +++ b/src/textual/_win_sleep.py @@ -1,54 +1,69 @@ -import ctypes -from ctypes.wintypes import LARGE_INTEGER +""" +A version of `time.sleep` that is more accurate than the standard library (even on Python 3.11). + +This should only be imported on Windows. + +""" + from time import sleep as time_sleep __all__ = ["sleep"] -kernel32 = ctypes.windll.kernel32 INFINITE = 0xFFFFFFFF WAIT_FAILED = 0xFFFFFFFF CREATE_WAITABLE_TIMER_HIGH_RESOLUTION = 0x00000002 +try: + import ctypes + from ctypes.wintypes import LARGE_INTEGER -def sleep(sleep_for: float) -> None: - """A replacement sleep for Windows. + kernel32 = ctypes.windll.kernel32 +except Exception: + sleep = time_sleep +else: - Python 3.11 added a more accurate sleep. This may be used on < Python 3.11 + def sleep(secs: float) -> None: + """A replacement sleep for Windows. - Args: - sleep_for (float): Seconds to sleep for. - """ + Note that unlike `time.sleep` this *may* sleep for slightly less than the + specified time. This is generally not an issue for Textual's use case. - # Subtract a millisecond to account for overhead - sleep_for = max(0, sleep_for - 0.001) - if sleep_for < 0.0005: - # Less than 0.5ms and its not worth doing the sleep - return + Args: + secs (float): Seconds to sleep for. + """ - handle = kernel32.CreateWaitableTimerExW( - None, - None, - CREATE_WAITABLE_TIMER_HIGH_RESOLUTION, - 0x1F0003, - ) - if not handle: - time_sleep(sleep_for) - return + # Subtract a millisecond to account for overhead + sleep_for = max(0, secs - 0.001) + if sleep_for < 0.0005: + # Less than 0.5ms and its not worth doing the sleep + return - if not kernel32.SetWaitableTimer( - handle, - ctypes.byref(LARGE_INTEGER(int(sleep_for * -10_000_000))), - 0, - None, - None, - 0, - ): - kernel32.CloseHandle(handle) - time_sleep(sleep_for) - return + handle = kernel32.CreateWaitableTimerExW( + None, + None, + CREATE_WAITABLE_TIMER_HIGH_RESOLUTION, + 0x1F0003, + ) + if not handle: + time_sleep(sleep_for) + return - if kernel32.WaitForSingleObject(handle, INFINITE) == WAIT_FAILED: - time_sleep(sleep_for) - kernel32.CloseHandle(handle) + try: + if not kernel32.SetWaitableTimer( + handle, + ctypes.byref(LARGE_INTEGER(int(sleep_for * -10_000_000))), + 0, + None, + None, + 0, + ): + kernel32.CloseHandle(handle) + time_sleep(sleep_for) + return + + if kernel32.WaitForSingleObject(handle, INFINITE) == WAIT_FAILED: + time_sleep(sleep_for) + finally: + kernel32.CloseHandle(handle)