diff --git a/src/textual/widgets/_directory_tree.py b/src/textual/widgets/_directory_tree.py index 9ba82d6df..fb6d7c5c5 100644 --- a/src/textual/widgets/_directory_tree.py +++ b/src/textual/widgets/_directory_tree.py @@ -2,18 +2,14 @@ from __future__ import annotations from dataclasses import dataclass from pathlib import Path -from queue import Empty, Queue -from typing import ClassVar, Iterable, Iterator +from typing import ClassVar, Iterable from rich.style import Style from rich.text import Text, TextType -from typing_extensions import Final -from .. import work from ..events import Mount from ..message import Message from ..reactive import var -from ..worker import get_current_worker from ._tree import TOGGLE_STYLE, Tree, TreeNode @@ -128,21 +124,12 @@ class DirectoryTree(Tree[DirEntry]): classes=classes, disabled=disabled, ) - self._waiting_load_jobs: Queue[TreeNode[DirEntry]] = Queue() - self._running_load_jobs: set[int] = set() self.path = path def reload(self) -> None: """Reload the `DirectoryTree` contents.""" - # We're about to nuke the whole tree and start over, so we don't - # want any dangling load jobs. Before we do anything else, ensure - # they're all marked as cancelled and that the queue of pending jobs - # has been emptied. - self._cancel_all_jobs() - # That out of the way, we can reset the tree and start loading the - # root's content. self.reset(str(self.path), DirEntry(Path(self.path))) - self._add_load_job(self.root) + self._load_directory(self.root) def validate_path(self, path: str | Path) -> Path: """Ensure that the path is of the `Path` type. @@ -242,30 +229,18 @@ class DirectoryTree(Tree[DirEntry]): """ return paths - def _directory_content(self, directory: Path) -> Iterator[Path]: - """Get the entries within a given directory. + def _load_directory(self, node: TreeNode[DirEntry]) -> None: + """Load the directory contents for a given node. Args: - directory: The directory to get the content of. - - Returns: - An iterator of `Path` objects. - """ - worker = get_current_worker() - for entry in directory.iterdir(): - if worker.is_cancelled: - return - yield entry - - def _populate_node( - self, node: TreeNode[DirEntry], directory: Iterable[Path] - ) -> None: - """Populate the given node with the contents of a directory. - - Args: - node: The node to populate. - directory: The directory contents to populate it with. + node: The node to load the directory contents for. """ + assert node.data is not None + node.data.loaded = True + directory = sorted( + self.filter_paths(node.data.path.iterdir()), + key=lambda path: (not path.is_dir(), path.name.lower()), + ) for path in directory: node.add( path.name, @@ -274,112 +249,8 @@ class DirectoryTree(Tree[DirEntry]): ) node.expand() - @dataclass - class _LoadFinished(Message): - """Internal message to mark when a load of a node is finished.""" - - node: TreeNode[DirEntry] - """The node that has finished loading.""" - - @work - def _load_directory(self, node: TreeNode[DirEntry]) -> None: - """Load the directory contents for a given node. - - Args: - node: The node to load the directory contents for. - """ - - # We should not ever be asked to load a directory for a node that - # has no directory information. - assert node.data is not None - - # Mark the node as loaded; we do this as soon as possible. - node.data.loaded = True - - # From now on we get the directory content and populate the node in - # simple steps, checking that the worker hasn't been cancelled at - # every step of the way. We /could/ just run to the end, but we - # might as well drop out of here as soon as we can tell we've been - # asked to stop. - worker = get_current_worker() - - # Load up the content of the directory. - content = self.filter_paths(self._directory_content(node.data.path)) - if worker.is_cancelled: - return - - # We're still going, sort the content, case-insensitive, placing - # directory entries up front. - content = sorted( - content, - key=lambda path: (not path.is_dir(), path.name.lower()), - ) - if worker.is_cancelled: - return - - # We have directory content, it's filtered, it's sorted, we're still - # working, so now let's update the actual node in the tree. - self.app.call_from_thread(self._populate_node, node, content) - - # Finally, if we're 100% sure we've not been cancelled, post a - # message to say the load has finished. Our caller should not be - # told we finished fine if they've cancelled us. - if not worker.is_cancelled: - self.post_message(self._LoadFinished(node)) - - _MAX_CONCURRENT_JOBS: Final[int] = 5 - """The maximum number of load jobs to run at the same time.""" - - def _cancel_all_jobs(self) -> None: - """Cancel all running load jobs.""" - self._waiting_load_jobs = Queue() - self._running_load_jobs = set() - # TODO: Check if there's an Textual-API-way to say "get all workers - # in this DOM node", or "cancel all of the works I made", or - # something. This seems fine, but I want to be 100% sure. - for job in (worker for worker in self.app.workers if worker.node == self): - job.cancel() - - def _process_load_jobs(self) -> None: - """Process the incoming load request queue.""" - # While we still have spare capacity... - while len(self._running_load_jobs) <= self._MAX_CONCURRENT_JOBS: - try: - # ...pull a load job off the queue. - new_job = self._waiting_load_jobs.get(block=False) - except Empty: - # Queue is empty; our work here is done. - return - # At this point we've got a new directory load job; add it to - # the collection of running jobs and kick off the load, but only - # if there isn't already a job for it. - if not new_job.id in self._running_load_jobs: - self._running_load_jobs.add(new_job.id) - self._load_directory(new_job) - - def _on_directory_tree__load_finished( - self, event: DirectoryTree._LoadFinished - ) -> None: - """Act on a signal that a node has finished loading. - - Args: - event: The event to process. - """ - event.stop() - self._running_load_jobs.remove(event.node.id) - self._process_load_jobs() - - def _add_load_job(self, node: TreeNode[DirEntry]) -> None: - """Add a directory loading job to the queue. - - Args: - node: The node that needs loading. - """ - self._waiting_load_jobs.put(node) - self._process_load_jobs() - def _on_mount(self, _: Mount) -> None: - self._add_load_job(self.root) + self._load_directory(self.root) def _on_tree_node_expanded(self, event: Tree.NodeExpanded) -> None: event.stop() @@ -388,7 +259,7 @@ class DirectoryTree(Tree[DirEntry]): return if dir_entry.path.is_dir(): if not dir_entry.loaded: - self._add_load_job(event.node) + self._load_directory(event.node) else: self.post_message(self.FileSelected(self, event.node, dir_entry.path))