fix for removing

This commit is contained in:
Will McGugan
2022-08-19 09:33:36 +01:00
parent e761e7ae8f
commit fd349aa658
7 changed files with 73 additions and 57 deletions

View File

@@ -87,7 +87,7 @@ class MessagePump(metaclass=MessagePumpMeta):
self._disabled_messages: set[type[Message]] = set()
self._pending_message: Message | None = None
self._task: Task | None = None
self._child_tasks: WeakSet[Task] = WeakSet()
self._timers: WeakSet[Timer] = WeakSet()
@property
def task(self) -> Task:
@@ -130,6 +130,10 @@ class MessagePump(metaclass=MessagePumpMeta):
"""
self._parent = parent
def _detach(self) -> None:
"""Set the parent to None to remove the node from the tree."""
self._parent = None
def check_message_enabled(self, message: Message) -> bool:
return type(message) not in self._disabled_messages
@@ -199,7 +203,8 @@ class MessagePump(metaclass=MessagePumpMeta):
repeat=0,
pause=pause,
)
self._child_tasks.add(timer.start())
timer.start()
self._timers.add(timer)
return timer
def set_interval(
@@ -220,7 +225,8 @@ class MessagePump(metaclass=MessagePumpMeta):
repeat=repeat or None,
pause=pause,
)
self._child_tasks.add(timer.start())
timer.start()
self._timers.add(timer)
return timer
def call_later(self, callback: Callable, *args, **kwargs) -> None:
@@ -248,13 +254,11 @@ class MessagePump(metaclass=MessagePumpMeta):
if self._closed or self._closing:
return
self._closing = True
for timer in self._timers:
await timer.stop()
self._timers.clear()
await self._message_queue.put(MessagePriority(None))
cancel_tasks = list(self._child_tasks)
for task in cancel_tasks:
task.cancel()
for task in cancel_tasks:
await task
self._child_tasks.clear()
if self._task is not None and asyncio.current_task() != self._task:
# Ensure everything is closed before returning
await self._task
@@ -265,11 +269,13 @@ class MessagePump(metaclass=MessagePumpMeta):
async def process_messages(self) -> None:
self._running = True
try:
return await self._process_messages()
await self._process_messages()
except CancelledError:
pass
finally:
self._running = False
for timer in self._timers:
await timer.stop()
async def _process_messages(self) -> None:
"""Process messages until the queue is closed."""