mirror of
https://github.com/Textualize/textual.git
synced 2025-10-17 02:38:12 +03:00
sleep
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
import platform
|
||||
import sys
|
||||
|
||||
from asyncio import sleep as asyncio_sleep, get_running_loop
|
||||
from time import monotonic, perf_counter, sleep as time_sleep
|
||||
@@ -14,17 +15,30 @@ else:
|
||||
time = monotonic
|
||||
|
||||
|
||||
import ctypes
|
||||
|
||||
winmm = ctypes.WinDLL("winmm")
|
||||
|
||||
|
||||
if WINDOWS:
|
||||
|
||||
async def sleep(sleep_for: float) -> None:
|
||||
"""An asyncio sleep.
|
||||
if sys.version_info >= (3, 11, 0):
|
||||
|
||||
On Windows this achieves a better granularity that asyncio.sleep
|
||||
async def sleep(sleep_for: float) -> None:
|
||||
"""An asyncio sleep.
|
||||
|
||||
Args:
|
||||
sleep_for (float): Seconds to sleep for.
|
||||
"""
|
||||
await get_running_loop().run_in_executor(None, time_sleep, sleep_for)
|
||||
On Windows this achieves a better granularity that asyncio.sleep
|
||||
|
||||
Args:
|
||||
sleep_for (float): Seconds to sleep for.
|
||||
"""
|
||||
await get_running_loop().run_in_executor(None, time_sleep, sleep_for)
|
||||
|
||||
else:
|
||||
from ._win_sleep import sleep as win_sleep
|
||||
|
||||
async def sleep(sleep_for: float) -> None:
|
||||
await get_running_loop().run_in_executor(None, win_sleep, sleep_for)
|
||||
|
||||
else:
|
||||
sleep = asyncio_sleep
|
||||
|
||||
34
src/textual/_win_sleep.py
Normal file
34
src/textual/_win_sleep.py
Normal file
@@ -0,0 +1,34 @@
|
||||
import ctypes
|
||||
from ctypes.wintypes import LARGE_INTEGER
|
||||
from time import sleep as time_sleep
|
||||
|
||||
__all__ = ["sleep"]
|
||||
|
||||
kernel32 = ctypes.windll.kernel32
|
||||
|
||||
INFINITE = 0xFFFFFFFF
|
||||
WAIT_FAILED = 0xFFFFFFFF
|
||||
CREATE_WAITABLE_TIMER_HIGH_RESOLUTION = 0x00000002
|
||||
|
||||
|
||||
def sleep(sleep_for: float) -> None:
|
||||
handle = kernel32.CreateWaitableTimerExW(
|
||||
None, None, CREATE_WAITABLE_TIMER_HIGH_RESOLUTION, 0x1F0003
|
||||
)
|
||||
if not handle:
|
||||
time_sleep(sleep_for)
|
||||
return
|
||||
|
||||
if not kernel32.SetWaitableTimer(
|
||||
handle,
|
||||
ctypes.byref(LARGE_INTEGER(int(sleep_for * -10000))),
|
||||
0,
|
||||
None,
|
||||
None,
|
||||
0,
|
||||
):
|
||||
time_sleep(sleep_for)
|
||||
return
|
||||
|
||||
kernel32.WaitForSingleObject(handle, INFINITE)
|
||||
kernel32.CancelWaitableTimer(handle)
|
||||
Reference in New Issue
Block a user