simplify idle events, add batching

This commit is contained in:
Will McGugan
2021-06-06 12:09:51 +01:00
parent 91b7efa4a9
commit d9fe76ac5a
5 changed files with 106 additions and 31 deletions

View File

@@ -1,6 +1,6 @@
from typing import Optional, NamedTuple, Set, Type, TYPE_CHECKING
import asyncio
from asyncio import PriorityQueue
from asyncio import PriorityQueue, QueueEmpty
import logging
@@ -56,6 +56,7 @@ class MessagePump:
self._closing: bool = False
self._closed: bool = False
self._disabled_messages: Set[Type[Message]] = set()
self._pending_message: Optional[MessageQueueItem] = None
def check_message_enabled(self, message: Message) -> bool:
return type(message) not in self._disabled_messages
@@ -74,6 +75,11 @@ class MessagePump:
Returns:
Optional[Event]: Event object or None.
"""
if self._pending_message is not None:
try:
return self._pending_message
finally:
self._pending_message = None
if self._closed:
raise MessagePumpClosed("The message pump is closed")
queue_item = await self._message_queue.get()
@@ -82,6 +88,20 @@ class MessagePump:
raise MessagePumpClosed("The message pump is now closed")
return queue_item
def peek_message(self) -> Optional[MessageQueueItem]:
"""Peek the message at the head of the queue (does not remove it from the queue),
or return None if the queue is empty.
Returns:
Optional[Message]: The message or None.
"""
if self._pending_message is None:
self._pending_message = self._message_queue.get_nowait()
if self._pending_message is not None:
return self._pending_message
return None
def set_timer(
self,
delay: float,
@@ -121,11 +141,20 @@ class MessagePump:
except Exception:
log.exception("error getting message")
break
# Combine any pending messages that may supersede this one
while True:
pending = self.peek_message()
if pending is None or not message.can_batch(pending.message):
break
priority, message = pending
try:
await self.dispatch_message(message, priority)
finally:
if self._message_queue.empty():
await self.dispatch_message(events.Idle(self))
idle_handler = getattr(self, "on_idle", None)
if idle_handler is not None:
await idle_handler(events.Idle(self))
async def dispatch_message(
self, message: Message, priority: int = 0
@@ -141,7 +170,7 @@ class MessagePump:
dispatch_function: MessageHandler = getattr(self, method_name, None)
if dispatch_function is not None:
await dispatch_function(event)
if event.bubble and self._parent:
if event.bubble and self._parent and not event._stop_propagaton:
if event.sender == self._parent:
log.debug("bubbled event abandoned; %r", event)
else: