Reset all DirectoryTree worker changes

After deciding
https://github.com/Textualize/textual/pull/2545#issuecomment-1547544057 it
makes more sense to roll back to the state of `main` than to try and get to
where I want to be from where we've decided we didn't want to be.

Can't get there from here, so let's go rogue-like on this PR...
This commit is contained in:
Dave Pearson
2023-05-16 15:29:36 +01:00
parent 804d85a2c9
commit 926c0a2b4f

View File

@@ -2,18 +2,14 @@ from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from queue import Empty, Queue
from typing import ClassVar, Iterable, Iterator
from typing import ClassVar, Iterable
from rich.style import Style
from rich.text import Text, TextType
from typing_extensions import Final
from .. import work
from ..events import Mount
from ..message import Message
from ..reactive import var
from ..worker import get_current_worker
from ._tree import TOGGLE_STYLE, Tree, TreeNode
@@ -128,21 +124,12 @@ class DirectoryTree(Tree[DirEntry]):
classes=classes,
disabled=disabled,
)
self._waiting_load_jobs: Queue[TreeNode[DirEntry]] = Queue()
self._running_load_jobs: set[int] = set()
self.path = path
def reload(self) -> None:
"""Reload the `DirectoryTree` contents."""
# We're about to nuke the whole tree and start over, so we don't
# want any dangling load jobs. Before we do anything else, ensure
# they're all marked as cancelled and that the queue of pending jobs
# has been emptied.
self._cancel_all_jobs()
# That out of the way, we can reset the tree and start loading the
# root's content.
self.reset(str(self.path), DirEntry(Path(self.path)))
self._add_load_job(self.root)
self._load_directory(self.root)
def validate_path(self, path: str | Path) -> Path:
"""Ensure that the path is of the `Path` type.
@@ -242,30 +229,18 @@ class DirectoryTree(Tree[DirEntry]):
"""
return paths
def _directory_content(self, directory: Path) -> Iterator[Path]:
"""Get the entries within a given directory.
def _load_directory(self, node: TreeNode[DirEntry]) -> None:
"""Load the directory contents for a given node.
Args:
directory: The directory to get the content of.
Returns:
An iterator of `Path` objects.
"""
worker = get_current_worker()
for entry in directory.iterdir():
if worker.is_cancelled:
return
yield entry
def _populate_node(
self, node: TreeNode[DirEntry], directory: Iterable[Path]
) -> None:
"""Populate the given node with the contents of a directory.
Args:
node: The node to populate.
directory: The directory contents to populate it with.
node: The node to load the directory contents for.
"""
assert node.data is not None
node.data.loaded = True
directory = sorted(
self.filter_paths(node.data.path.iterdir()),
key=lambda path: (not path.is_dir(), path.name.lower()),
)
for path in directory:
node.add(
path.name,
@@ -274,112 +249,8 @@ class DirectoryTree(Tree[DirEntry]):
)
node.expand()
@dataclass
class _LoadFinished(Message):
"""Internal message to mark when a load of a node is finished."""
node: TreeNode[DirEntry]
"""The node that has finished loading."""
@work
def _load_directory(self, node: TreeNode[DirEntry]) -> None:
"""Load the directory contents for a given node.
Args:
node: The node to load the directory contents for.
"""
# We should not ever be asked to load a directory for a node that
# has no directory information.
assert node.data is not None
# Mark the node as loaded; we do this as soon as possible.
node.data.loaded = True
# From now on we get the directory content and populate the node in
# simple steps, checking that the worker hasn't been cancelled at
# every step of the way. We /could/ just run to the end, but we
# might as well drop out of here as soon as we can tell we've been
# asked to stop.
worker = get_current_worker()
# Load up the content of the directory.
content = self.filter_paths(self._directory_content(node.data.path))
if worker.is_cancelled:
return
# We're still going, sort the content, case-insensitive, placing
# directory entries up front.
content = sorted(
content,
key=lambda path: (not path.is_dir(), path.name.lower()),
)
if worker.is_cancelled:
return
# We have directory content, it's filtered, it's sorted, we're still
# working, so now let's update the actual node in the tree.
self.app.call_from_thread(self._populate_node, node, content)
# Finally, if we're 100% sure we've not been cancelled, post a
# message to say the load has finished. Our caller should not be
# told we finished fine if they've cancelled us.
if not worker.is_cancelled:
self.post_message(self._LoadFinished(node))
_MAX_CONCURRENT_JOBS: Final[int] = 5
"""The maximum number of load jobs to run at the same time."""
def _cancel_all_jobs(self) -> None:
"""Cancel all running load jobs."""
self._waiting_load_jobs = Queue()
self._running_load_jobs = set()
# TODO: Check if there's an Textual-API-way to say "get all workers
# in this DOM node", or "cancel all of the works I made", or
# something. This seems fine, but I want to be 100% sure.
for job in (worker for worker in self.app.workers if worker.node == self):
job.cancel()
def _process_load_jobs(self) -> None:
"""Process the incoming load request queue."""
# While we still have spare capacity...
while len(self._running_load_jobs) <= self._MAX_CONCURRENT_JOBS:
try:
# ...pull a load job off the queue.
new_job = self._waiting_load_jobs.get(block=False)
except Empty:
# Queue is empty; our work here is done.
return
# At this point we've got a new directory load job; add it to
# the collection of running jobs and kick off the load, but only
# if there isn't already a job for it.
if not new_job.id in self._running_load_jobs:
self._running_load_jobs.add(new_job.id)
self._load_directory(new_job)
def _on_directory_tree__load_finished(
self, event: DirectoryTree._LoadFinished
) -> None:
"""Act on a signal that a node has finished loading.
Args:
event: The event to process.
"""
event.stop()
self._running_load_jobs.remove(event.node.id)
self._process_load_jobs()
def _add_load_job(self, node: TreeNode[DirEntry]) -> None:
"""Add a directory loading job to the queue.
Args:
node: The node that needs loading.
"""
self._waiting_load_jobs.put(node)
self._process_load_jobs()
def _on_mount(self, _: Mount) -> None:
self._add_load_job(self.root)
self._load_directory(self.root)
def _on_tree_node_expanded(self, event: Tree.NodeExpanded) -> None:
event.stop()
@@ -388,7 +259,7 @@ class DirectoryTree(Tree[DirEntry]):
return
if dir_entry.path.is_dir():
if not dir_entry.loaded:
self._add_load_job(event.node)
self._load_directory(event.node)
else:
self.post_message(self.FileSelected(self, event.node, dir_entry.path))