WiP: Throttle back the number of concurrent loads of a DirectoryTree

Having got the initial version of background loading of nodes in the
directory tree working, this moves to a gentler approach where only so many
loads run at once, and a queue of jobs that need to be completed is kept.

This is an end-of-coding-session WiP commit; there's more to come on this.
But at the moment I'm happy with the way it's going.
This commit is contained in:
Dave Pearson
2023-05-10 16:26:36 +01:00
parent cd05d6cad6
commit 39971876d0

View File

@@ -2,10 +2,12 @@ from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from queue import Empty, Queue
from typing import ClassVar, Iterable, Iterator
from rich.style import Style
from rich.text import Text, TextType
from typing_extensions import Final
from .. import work
from ..events import Mount
@@ -126,6 +128,8 @@ class DirectoryTree(Tree[DirEntry]):
disabled=disabled,
)
self.path = path
self._waiting_load_jobs: Queue[TreeNode[DirEntry]] = Queue()
self._running_load_jobs: set[int] = set()
def reload(self) -> None:
"""Reload the `DirectoryTree` contents."""
@@ -267,6 +271,13 @@ class DirectoryTree(Tree[DirEntry]):
)
node.expand()
@dataclass
class _LoadFinished(Message):
"""Internal message to mark when a load of a node is finished."""
node: TreeNode[DirEntry]
"""The node that has finished loading."""
@work
def _load_directory(self, node: TreeNode[DirEntry]) -> None:
"""Load the directory contents for a given node.
@@ -276,6 +287,7 @@ class DirectoryTree(Tree[DirEntry]):
"""
assert node.data is not None
node.data.loaded = True
# TODO: Perhaps move this out of here and...
self.app.call_from_thread(
self._populate_node,
node,
@@ -284,9 +296,60 @@ class DirectoryTree(Tree[DirEntry]):
key=lambda path: (not path.is_dir(), path.name.lower()),
),
)
# TODO: ...attach it to this and have the receiver update the tree?
self.post_message(self._LoadFinished(node))
_MAX_CONCURRENT_JOBS: Final[int] = 5
"""The maximum number of load jobs to run at the same time."""
# TODO: Remove debug logging before going to an actual PR.
def _process_load_jobs(self) -> None:
"""Process the incoming load request queue."""
# While we still have spare capacity...
self.log.debug(
f"LOAD QUEUE: Running job count is {len(self._running_load_jobs)}"
)
while len(self._running_load_jobs) <= self._MAX_CONCURRENT_JOBS:
try:
# ...pull a load job off the queue.
new_job = self._waiting_load_jobs.get(block=False)
except Empty:
# Queue is empty; our work here is done.
self.log.debug("LOAD QUEUE: New job queue is empty")
return
# We've got a new directory load job, add it to the collection
# of running jobs and kick off the load.
self.log.debug(f"LOAD QUEUE: Staring a new job running {new_job.id}")
self._running_load_jobs.add(new_job.id)
self._load_directory(new_job)
self.log.debug(f"LOAD QUEUE: Running job queue is full")
def _on_directory_tree__load_finished(
self, event: DirectoryTree._LoadFinished
) -> None:
"""Act on a signal that a node has finished loading.
Args:
event: The event to process.
"""
event.stop()
self.log.debug(f"LOAD QUEUE: Done {event.node.id} - REMOVING")
self._running_load_jobs.remove(event.node.id)
self._process_load_jobs()
def _add_load_job(self, node: TreeNode[DirEntry]) -> None:
"""Add a directory loading job to the queue.
Args:
node: The node that needs loading.
"""
self.log.debug(f"LOAD QUEUE: New {node.id} - ADDING")
self._waiting_load_jobs.put(node)
self._process_load_jobs()
def _on_mount(self, _: Mount) -> None:
self._load_directory(self.root)
self._add_load_job(self.root)
def _on_tree_node_expanded(self, event: Tree.NodeExpanded) -> None:
event.stop()
@@ -295,7 +358,7 @@ class DirectoryTree(Tree[DirEntry]):
return
if dir_entry.path.is_dir():
if not dir_entry.loaded:
self._load_directory(event.node)
self._add_load_job(event.node)
else:
self.post_message(self.FileSelected(self, event.node, dir_entry.path))