mirror of
https://github.com/Textualize/textual.git
synced 2025-10-17 02:38:12 +03:00
signals
This commit is contained in:
@@ -1505,7 +1505,7 @@ class DOMNode(MessagePump):
|
||||
See [actions](/guide/actions#dynamic-actions) for how to use this method.
|
||||
|
||||
"""
|
||||
self.call_later(self.screen.refresh_bindings)
|
||||
self.screen.refresh_bindings()
|
||||
|
||||
async def action_toggle(self, attribute_name: str) -> None:
|
||||
"""Toggle an attribute on the node.
|
||||
|
||||
@@ -245,7 +245,7 @@ class MessagePump(metaclass=_MessagePumpMeta):
|
||||
return node
|
||||
|
||||
@property
|
||||
def _is_linked_to_app(self) -> bool:
|
||||
def is_attached(self) -> bool:
|
||||
"""Is this node linked to the app through the DOM?"""
|
||||
node: MessagePump | None = self
|
||||
|
||||
@@ -275,19 +275,6 @@ class MessagePump(metaclass=_MessagePumpMeta):
|
||||
"""
|
||||
return self.app._logger
|
||||
|
||||
@property
|
||||
def is_attached(self) -> bool:
|
||||
"""Is the node attached to the app via the DOM?"""
|
||||
from .app import App
|
||||
|
||||
node = self
|
||||
|
||||
while not isinstance(node, App):
|
||||
if node._parent is None:
|
||||
return False
|
||||
node = node._parent
|
||||
return True
|
||||
|
||||
def _attach(self, parent: MessagePump) -> None:
|
||||
"""Set the parent, and therefore attach this node to the tree.
|
||||
|
||||
|
||||
@@ -52,7 +52,12 @@ class Signal(Generic[SignalT]):
|
||||
yield "name", self._name
|
||||
yield "subscriptions", list(self._subscriptions.keys())
|
||||
|
||||
def subscribe(self, node: MessagePump, callback: SignalCallbackType) -> None:
|
||||
def subscribe(
|
||||
self,
|
||||
node: MessagePump,
|
||||
callback: SignalCallbackType,
|
||||
immediate: bool = False,
|
||||
) -> None:
|
||||
"""Subscribe a node to this signal.
|
||||
|
||||
When the signal is published, the callback will be invoked.
|
||||
@@ -60,17 +65,29 @@ class Signal(Generic[SignalT]):
|
||||
Args:
|
||||
node: Node to subscribe.
|
||||
callback: A callback function which takes a single argument and returns anything (return type ignored).
|
||||
immediate: Invoke the callback immediately on publish if `True`, otherwise post it to the DOM node.
|
||||
|
||||
Raises:
|
||||
SignalError: Raised when subscribing a non-mounted widget.
|
||||
"""
|
||||
if immediate:
|
||||
|
||||
def signal_callback(data: object):
|
||||
"""Invoke the callback immediately."""
|
||||
callback(data)
|
||||
|
||||
else:
|
||||
|
||||
def signal_callback(data: object):
|
||||
"""Post the callback to the node, to call at the next opertunity."""
|
||||
node.call_next(callback, data)
|
||||
|
||||
if not node.is_running:
|
||||
raise SignalError(
|
||||
f"Node must be running to subscribe to a signal (has {node} been mounted)?"
|
||||
)
|
||||
callbacks = self._subscriptions.setdefault(node, [])
|
||||
if callback not in callbacks:
|
||||
callbacks.append(callback)
|
||||
callbacks.append(signal_callback)
|
||||
|
||||
def unsubscribe(self, node: MessagePump) -> None:
|
||||
"""Unsubscribe a node from this signal.
|
||||
|
||||
@@ -934,7 +934,7 @@ class Widget(DOMNode):
|
||||
Only one of ``before`` or ``after`` can be provided. If both are
|
||||
provided a ``MountError`` will be raised.
|
||||
"""
|
||||
if not self._is_linked_to_app:
|
||||
if not self.is_attached:
|
||||
raise MountError(f"Can't mount widget(s) before {self!r} is mounted")
|
||||
# Check for duplicate IDs in the incoming widgets
|
||||
ids_to_mount = [widget.id for widget in widgets if widget.id is not None]
|
||||
@@ -1126,7 +1126,7 @@ class Widget(DOMNode):
|
||||
if self._parent is not None:
|
||||
async with self.batch():
|
||||
await self.query("*").exclude(".-textual-system").remove()
|
||||
if self._is_linked_to_app:
|
||||
if self.is_attached:
|
||||
await self.mount_all(compose(self))
|
||||
|
||||
def _post_register(self, app: App) -> None:
|
||||
|
||||
@@ -48,8 +48,8 @@ async def test_suspend_supported(capfd: pytest.CaptureFixture[str]) -> None:
|
||||
calls.add("resume signal")
|
||||
|
||||
def on_mount(self) -> None:
|
||||
self.app_suspend_signal.subscribe(self, self.on_suspend)
|
||||
self.app_resume_signal.subscribe(self, self.on_resume)
|
||||
self.app_suspend_signal.subscribe(self, self.on_suspend, immediate=True)
|
||||
self.app_resume_signal.subscribe(self, self.on_resume, immediate=True)
|
||||
|
||||
async with SuspendApp(driver_class=HeadlessSuspendDriver).run_test(
|
||||
headless=False
|
||||
|
||||
Reference in New Issue
Block a user