mirror of
https://github.com/Textualize/textual.git
synced 2025-10-17 02:38:12 +03:00
more defensive sleep
This commit is contained in:
@@ -1,54 +1,69 @@
|
||||
import ctypes
|
||||
from ctypes.wintypes import LARGE_INTEGER
|
||||
"""
|
||||
A version of `time.sleep` that is more accurate than the standard library (even on Python 3.11).
|
||||
|
||||
This should only be imported on Windows.
|
||||
|
||||
"""
|
||||
|
||||
from time import sleep as time_sleep
|
||||
|
||||
|
||||
__all__ = ["sleep"]
|
||||
|
||||
kernel32 = ctypes.windll.kernel32
|
||||
|
||||
INFINITE = 0xFFFFFFFF
|
||||
WAIT_FAILED = 0xFFFFFFFF
|
||||
CREATE_WAITABLE_TIMER_HIGH_RESOLUTION = 0x00000002
|
||||
|
||||
try:
|
||||
import ctypes
|
||||
from ctypes.wintypes import LARGE_INTEGER
|
||||
|
||||
def sleep(sleep_for: float) -> None:
|
||||
"""A replacement sleep for Windows.
|
||||
kernel32 = ctypes.windll.kernel32
|
||||
except Exception:
|
||||
sleep = time_sleep
|
||||
else:
|
||||
|
||||
Python 3.11 added a more accurate sleep. This may be used on < Python 3.11
|
||||
def sleep(secs: float) -> None:
|
||||
"""A replacement sleep for Windows.
|
||||
|
||||
Args:
|
||||
sleep_for (float): Seconds to sleep for.
|
||||
"""
|
||||
Note that unlike `time.sleep` this *may* sleep for slightly less than the
|
||||
specified time. This is generally not an issue for Textual's use case.
|
||||
|
||||
# Subtract a millisecond to account for overhead
|
||||
sleep_for = max(0, sleep_for - 0.001)
|
||||
if sleep_for < 0.0005:
|
||||
# Less than 0.5ms and its not worth doing the sleep
|
||||
return
|
||||
Args:
|
||||
secs (float): Seconds to sleep for.
|
||||
"""
|
||||
|
||||
handle = kernel32.CreateWaitableTimerExW(
|
||||
None,
|
||||
None,
|
||||
CREATE_WAITABLE_TIMER_HIGH_RESOLUTION,
|
||||
0x1F0003,
|
||||
)
|
||||
if not handle:
|
||||
time_sleep(sleep_for)
|
||||
return
|
||||
# Subtract a millisecond to account for overhead
|
||||
sleep_for = max(0, secs - 0.001)
|
||||
if sleep_for < 0.0005:
|
||||
# Less than 0.5ms and its not worth doing the sleep
|
||||
return
|
||||
|
||||
if not kernel32.SetWaitableTimer(
|
||||
handle,
|
||||
ctypes.byref(LARGE_INTEGER(int(sleep_for * -10_000_000))),
|
||||
0,
|
||||
None,
|
||||
None,
|
||||
0,
|
||||
):
|
||||
kernel32.CloseHandle(handle)
|
||||
time_sleep(sleep_for)
|
||||
return
|
||||
handle = kernel32.CreateWaitableTimerExW(
|
||||
None,
|
||||
None,
|
||||
CREATE_WAITABLE_TIMER_HIGH_RESOLUTION,
|
||||
0x1F0003,
|
||||
)
|
||||
if not handle:
|
||||
time_sleep(sleep_for)
|
||||
return
|
||||
|
||||
if kernel32.WaitForSingleObject(handle, INFINITE) == WAIT_FAILED:
|
||||
time_sleep(sleep_for)
|
||||
kernel32.CloseHandle(handle)
|
||||
try:
|
||||
if not kernel32.SetWaitableTimer(
|
||||
handle,
|
||||
ctypes.byref(LARGE_INTEGER(int(sleep_for * -10_000_000))),
|
||||
0,
|
||||
None,
|
||||
None,
|
||||
0,
|
||||
):
|
||||
kernel32.CloseHandle(handle)
|
||||
time_sleep(sleep_for)
|
||||
return
|
||||
|
||||
if kernel32.WaitForSingleObject(handle, INFINITE) == WAIT_FAILED:
|
||||
time_sleep(sleep_for)
|
||||
finally:
|
||||
kernel32.CloseHandle(handle)
|
||||
|
||||
Reference in New Issue
Block a user