fix shutdown

This commit is contained in:
Will McGugan
2021-06-20 13:41:58 +01:00
parent 0dd46641e4
commit 442d270725
4 changed files with 20 additions and 82 deletions

View File

@@ -28,7 +28,6 @@ class MessagePump:
self._pending_message: Message | None = None
self._task: Task | None = None
self._child_tasks: set[Task] = set()
self._queue_empty_event = Event()
@property
def task(self) -> Task:
@@ -111,34 +110,16 @@ class MessagePump:
asyncio.get_event_loop().create_task(timer.run())
return timer
async def stop_messages(self) -> None:
if not self._closing:
await self.post_message(events.NoneEvent(self))
self._closing = True
return
if not (self._closing or self._closed):
self._queue_empty_event.clear()
await self.post_message(events.NoneEvent(self))
self._closing = True
await self._queue_empty_event.wait()
self._queue_empty_event.clear()
async def close_messages(self, wait: bool = True) -> None:
"""Close message queue, and optionally wait for queue to finish processing."""
if self._closed:
return
log.debug("close_messages %r wait=%r", self, wait)
self._closing = True
log.debug("close 1 %r", self)
await self._message_queue.put(None)
for task in self._child_tasks:
task.cancel()
log.debug("close 2 %r", self)
await self._message_queue.put(None)
log.debug("close 3 %r", self)
if wait and self._task is not None:
await self._task
self._task = None
log.debug("close 4 %r", self)
def start_messages(self) -> None:
self._task = asyncio.create_task(self.process_messages())
@@ -149,7 +130,6 @@ class MessagePump:
try:
message = await self.get_message()
except MessagePumpClosed:
log.debug("CLOSED %r", self)
break
except Exception as error:
log.exception("error in get_message()")
@@ -157,7 +137,7 @@ class MessagePump:
log.debug("%r -> %r", message, self)
# Combine any pending messages that may supersede this one
while True:
while not (self._closed or self._closing):
pending = self.peek_message()
if pending is None or not message.can_batch(pending):
break
@@ -172,21 +152,14 @@ class MessagePump:
log.exception("error in dispatch_message")
raise
finally:
log.debug("a")
if self._message_queue.empty():
log.debug("b")
self._queue_empty_event.set()
if not self._closed:
idle_handler = getattr(self, "on_idle", None)
log.debug("c %r", idle_handler)
if idle_handler is not None and not self._closed:
log.debug("d")
await idle_handler(events.Idle(self))
log.debug("e")
self._queue_empty_event.set()
log.debug("CLOSED %r", self)
async def dispatch_message(self, message: Message) -> bool | None:
log.debug("dispatch_message %r", message)
if isinstance(message, events.Event):
await self.on_event(message)
else:
@@ -196,7 +169,6 @@ class MessagePump:
async def on_event(self, event: events.Event) -> None:
method_name = f"on_{event.name}"
dispatch_function: MessageHandler = getattr(self, method_name, None)
log.debug("dispatching to %r", dispatch_function)
if dispatch_function is not None:
await dispatch_function(event)
if event.bubble and self._parent and not event._stop_propagaton:
@@ -217,15 +189,11 @@ class MessagePump:
return True
async def post_message(self, message: Message) -> bool:
log.debug("%r post_message 1", self)
if self._closing or self._closed:
return False
log.debug("%r post_message 2", self)
if not self.check_message_enabled(message):
return True
log.debug("%r post_message 3", self)
await self._message_queue.put(message)
log.debug("%r post_message 4", self)
return True
async def post_message_from_child(self, message: Message) -> bool: