Make the main load worker into a asyncio task

Turns out, there's a maximum number of threads you can have going in the
underlying pool, that's tied to the number of CPUs. As such, there was a
limit on how many directory trees you could have up and running before it
would start to block all sorts of operations in the surrounding
application (in Parallels on macOS, with the Windows VM appearing to have
just the one CPU, it would give up after 8 directory trees).

So here we move to a slightly different approach: have the main loader still
run "forever", but be an async task; it then in turn farms the loading out
to threads which close once the loading is done.

So far tested on macOS and behaves as expected. Next to test on Windows.
This commit is contained in:
Dave Pearson
2023-05-17 11:34:05 +01:00
parent 64d9c60267
commit 82924c2d7c

View File

@@ -1,8 +1,8 @@
from __future__ import annotations from __future__ import annotations
from asyncio import Queue, QueueEmpty
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
from queue import Empty, Queue
from typing import ClassVar, Iterable, Iterator from typing import ClassVar, Iterable, Iterator
from rich.style import Style from rich.style import Style
@@ -239,9 +239,6 @@ class DirectoryTree(Tree[DirEntry]):
""" """
return paths return paths
def _tlog(self, message: str) -> None:
self.app.call_from_thread(self.log.debug, f"{self.id} - {message}")
def _populate_node(self, node: TreeNode[DirEntry], content: Iterable[Path]) -> None: def _populate_node(self, node: TreeNode[DirEntry], content: Iterable[Path]) -> None:
"""Populate the given tree node with the given directory content. """Populate the given tree node with the given directory content.
@@ -271,9 +268,9 @@ class DirectoryTree(Tree[DirEntry]):
if worker.is_cancelled: if worker.is_cancelled:
break break
yield entry yield entry
self._tlog(f"Loaded entry {entry} from {location}")
def _load_directory(self, node: TreeNode[DirEntry], worker: Worker) -> None: @work
def _load_directory(self, node: TreeNode[DirEntry]) -> None:
"""Load the directory contents for a given node. """Load the directory contents for a given node.
Args: Args:
@@ -281,6 +278,7 @@ class DirectoryTree(Tree[DirEntry]):
""" """
assert node.data is not None assert node.data is not None
node.data.loaded = True node.data.loaded = True
worker = get_current_worker()
self.app.call_from_thread( self.app.call_from_thread(
self._populate_node, self._populate_node,
node, node,
@@ -290,21 +288,14 @@ class DirectoryTree(Tree[DirEntry]):
), ),
) )
_LOADER_INTERVAL: Final[float] = 0.2
"""How long the loader should block while waiting for queue content."""
@work(exclusive=True) @work(exclusive=True)
def _loader(self) -> None: async def _loader(self) -> None:
"""Background loading queue processor.""" """Background loading queue processor."""
self._tlog("_loader started")
worker = get_current_worker() worker = get_current_worker()
while not worker.is_cancelled: while not worker.is_cancelled:
try: try:
next_node = self._to_load.get(timeout=self._LOADER_INTERVAL) self._load_directory(await self._to_load.get())
self._tlog(f"Received {next_node} for loading") except QueueEmpty:
self._load_directory(next_node, worker)
self._tlog(f"Loaded {next_node}")
except Empty:
pass pass
def _on_tree_node_expanded(self, event: Tree.NodeExpanded) -> None: def _on_tree_node_expanded(self, event: Tree.NodeExpanded) -> None: