Worker API (#2182)

* worker class

* worker API tests

* tidy

* Decorator and more tests

* type fix

* error order

* more tests

* remove active message

* move worker manager to app

* cancel nodes

* typing fix

* revert change

* typing fixes and cleanup

* revert typing

* test fix

* cancel group

* Added test for worker

* comment

* workers docs

* Added exit_on_error

* changelog

* svg

* refactor test

* remove debug tweaks

* docstrings

* worker test

* fix typing in run

* fix 3.7 tests

* blog post

* fix deadlock test

* words

* words

* words

* workers docs

* blog post

* Apply suggestions from code review

Co-authored-by: Dave Pearson <davep@davep.org>

* docstring

* fix and docstring

* Apply suggestions from code review

Co-authored-by: Rodrigo Girão Serrão <5621605+rodrigogiraoserrao@users.noreply.github.com>

* Update src/textual/widgets/_markdown.py

Co-authored-by: Rodrigo Girão Serrão <5621605+rodrigogiraoserrao@users.noreply.github.com>

* Apply suggestions from code review

Co-authored-by: Rodrigo Girão Serrão <5621605+rodrigogiraoserrao@users.noreply.github.com>

* Update src/textual/worker.py

Co-authored-by: Rodrigo Girão Serrão <5621605+rodrigogiraoserrao@users.noreply.github.com>

* Fix black

* docstring

* merge

* changelog

---------

Co-authored-by: Dave Pearson <davep@davep.org>
Co-authored-by: Rodrigo Girão Serrão <5621605+rodrigogiraoserrao@users.noreply.github.com>
This commit is contained in:
Will McGugan
2023-04-04 13:12:51 +01:00
committed by GitHub
parent c1ef3702fd
commit b5689b1f69
31 changed files with 1813 additions and 56 deletions

View File

@@ -5,7 +5,15 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).
## Unreleased
## [0.18.0] - 2023-04-04
### Added
- Added Worker API https://github.com/Textualize/textual/pull/2182
### Changed
- Markdown.update is no longer a coroutine https://github.com/Textualize/textual/pull/2182
### [Fixed]
@@ -705,6 +713,7 @@ https://textual.textualize.io/blog/2022/11/08/version-040/#version-040
- New handler system for messages that doesn't require inheritance
- Improved traceback handling
[0.18.0]: https://github.com/Textualize/textual/compare/v0.17.4...v0.18.0
[0.17.3]: https://github.com/Textualize/textual/compare/v0.17.2...v0.17.3
[0.17.2]: https://github.com/Textualize/textual/compare/v0.17.1...v0.17.2
[0.17.1]: https://github.com/Textualize/textual/compare/v0.17.0...v0.17.1

1
docs/api/worker.md Normal file
View File

@@ -0,0 +1 @@
::: textual.worker

View File

@@ -0,0 +1 @@
::: textual._worker_manager

View File

@@ -0,0 +1,41 @@
---
draft: false
date: 2023-04-04
categories:
- Release
title: "Textual 0.18.0 adds API for managing concurrent workers"
authors:
- willmcgugan
---
# Textual 0.18.0 adds API for managing concurrent workers
Less than a week since the last release, and we have a new API to show you.
<!-- more -->
This release adds a new [Worker API](../../guide/workers.md) designed to manage concurrency, both asyncio tasks and threads.
An API to manage concurrency may seem like a strange addition to a library for building user interfaces, but on reflection it makes a lot of sense.
People are building Textual apps to interface with REST APIs, websockets, and processes; and they are running into predictable issues.
These aren't specifically Textual problems, but rather general problems related to async tasks and threads.
It's not enough for us to point users at the asyncio docs, we needed a better answer.
The new `run_worker` method provides an easy way of launching "Workers" (a wrapper over async tasks and threads) which also manages their lifetime.
One of the challenges I've found with tasks and threads is ensuring that they are shut down in an orderly manner. Interestingly enough, Textual already implemented an orderly shutdown procedure to close the tasks that power widgets: children are shut down before parents, all the way up to the App (the root node).
The new API piggybacks on to that existing mechanism to ensure that worker tasks are also shut down in the same order.
!!! tip
You won't need to worry about this [gnarly issue](https://textual.textualize.io/blog/2023/02/11/the-heisenbug-lurking-in-your-async-code/) with the new Worker API.
I'm particularly pleased with the new `@work` decorator which can turn a coroutine OR a regular function into a Textual Worker object, by scheduling it as either an asyncio task or a thread.
I suspect this will solve 90% of the concurrency issues we see with Textual apps.
See the [Worker API](../../guide/workers.md) for the details.
## Join us
If you want to talk about this update or anything else Textual related, join us on our [Discord server](https://discord.gg/Enf6Z3qhVr).

View File

@@ -0,0 +1,16 @@
Input {
dock: top;
width: 100%;
}
#weather-container {
width: 100%;
height: 1fr;
align: center middle;
overflow: auto;
}
#weather {
width: auto;
height: auto;
}

View File

@@ -0,0 +1,40 @@
import httpx
from rich.text import Text
from textual.app import App, ComposeResult
from textual.containers import VerticalScroll
from textual.widgets import Input, Static
class WeatherApp(App):
"""App to display the current weather."""
CSS_PATH = "weather.css"
def compose(self) -> ComposeResult:
yield Input(placeholder="Enter a City")
with VerticalScroll(id="weather-container"):
yield Static(id="weather")
async def on_input_changed(self, message: Input.Changed) -> None:
"""Called when the input changes"""
await self.update_weather(message.value)
async def update_weather(self, city: str) -> None:
"""Update the weather for the given city."""
weather_widget = self.query_one("#weather", Static)
if city:
# Query the network API
url = f"https://wttr.in/{city}"
async with httpx.AsyncClient() as client:
response = await client.get(url)
weather = Text.from_ansi(response.text)
weather_widget.update(weather)
else:
# No city, so just blank out the weather
weather_widget.update("")
if __name__ == "__main__":
app = WeatherApp()
app.run()

View File

@@ -0,0 +1,40 @@
import httpx
from rich.text import Text
from textual.app import App, ComposeResult
from textual.containers import VerticalScroll
from textual.widgets import Input, Static
class WeatherApp(App):
"""App to display the current weather."""
CSS_PATH = "weather.css"
def compose(self) -> ComposeResult:
yield Input(placeholder="Enter a City")
with VerticalScroll(id="weather-container"):
yield Static(id="weather")
async def on_input_changed(self, message: Input.Changed) -> None:
"""Called when the input changes"""
self.run_worker(self.update_weather(message.value), exclusive=True)
async def update_weather(self, city: str) -> None:
"""Update the weather for the given city."""
weather_widget = self.query_one("#weather", Static)
if city:
# Query the network API
url = f"https://wttr.in/{city}"
async with httpx.AsyncClient() as client:
response = await client.get(url)
weather = Text.from_ansi(response.text)
weather_widget.update(weather)
else:
# No city, so just blank out the weather
weather_widget.update("")
if __name__ == "__main__":
app = WeatherApp()
app.run()

View File

@@ -0,0 +1,42 @@
import httpx
from rich.text import Text
from textual import work
from textual.app import App, ComposeResult
from textual.containers import VerticalScroll
from textual.widgets import Input, Static
class WeatherApp(App):
"""App to display the current weather."""
CSS_PATH = "weather.css"
def compose(self) -> ComposeResult:
yield Input(placeholder="Enter a City")
with VerticalScroll(id="weather-container"):
yield Static(id="weather")
async def on_input_changed(self, message: Input.Changed) -> None:
"""Called when the input changes"""
self.update_weather(message.value)
@work(exclusive=True)
async def update_weather(self, city: str) -> None:
"""Update the weather for the given city."""
weather_widget = self.query_one("#weather", Static)
if city:
# Query the network API
url = f"https://wttr.in/{city}"
async with httpx.AsyncClient() as client:
response = await client.get(url)
weather = Text.from_ansi(response.text)
weather_widget.update(weather)
else:
# No city, so just blank out the weather
weather_widget.update("")
if __name__ == "__main__":
app = WeatherApp()
app.run()

View File

@@ -0,0 +1,47 @@
import httpx
from rich.text import Text
from textual import work
from textual.app import App, ComposeResult
from textual.containers import VerticalScroll
from textual.widgets import Input, Static
from textual.worker import Worker
class WeatherApp(App):
"""App to display the current weather."""
CSS_PATH = "weather.css"
def compose(self) -> ComposeResult:
yield Input(placeholder="Enter a City")
with VerticalScroll(id="weather-container"):
yield Static(id="weather")
async def on_input_changed(self, message: Input.Changed) -> None:
"""Called when the input changes"""
self.update_weather(message.value)
@work(exclusive=True)
async def update_weather(self, city: str) -> None:
"""Update the weather for the given city."""
weather_widget = self.query_one("#weather", Static)
if city:
# Query the network API
url = f"https://wttr.in/{city}"
async with httpx.AsyncClient() as client:
response = await client.get(url)
weather = Text.from_ansi(response.text)
weather_widget.update(weather)
else:
# No city, so just blank out the weather
weather_widget.update("")
def on_worker_state_changed(self, event: Worker.StateChanged) -> None:
"""Called when the worker state changes."""
self.log(event)
if __name__ == "__main__":
app = WeatherApp()
app.run()

View File

@@ -0,0 +1,52 @@
from urllib.request import Request, urlopen
from rich.text import Text
from textual import work
from textual.app import App, ComposeResult
from textual.containers import VerticalScroll
from textual.widgets import Input, Static
from textual.worker import Worker, get_current_worker
class WeatherApp(App):
"""App to display the current weather."""
CSS_PATH = "weather.css"
def compose(self) -> ComposeResult:
yield Input(placeholder="Enter a City")
with VerticalScroll(id="weather-container"):
yield Static(id="weather")
async def on_input_changed(self, message: Input.Changed) -> None:
"""Called when the input changes"""
self.update_weather(message.value)
@work(exclusive=True)
def update_weather(self, city: str) -> None:
"""Update the weather for the given city."""
weather_widget = self.query_one("#weather", Static)
worker = get_current_worker()
if city:
# Query the network API
url = f"https://wttr.in/{city}"
request = Request(url)
request.add_header("User-agent", "CURL")
response_text = urlopen(request).read().decode("utf-8")
weather = Text.from_ansi(response_text)
if not worker.is_cancelled:
self.call_from_thread(weather_widget.update, weather)
else:
# No city, so just blank out the weather
if not worker.is_cancelled:
self.call_from_thread(weather_widget.update, "")
def on_worker_state_changed(self, event: Worker.StateChanged) -> None:
"""Called when the worker state changes."""
self.log(event)
if __name__ == "__main__":
app = WeatherApp()
app.run()

171
docs/guide/workers.md Normal file
View File

@@ -0,0 +1,171 @@
# Workers
In this chapter we will explore the topic of *concurrency* and how to use Textual's Worker API to make it easier.
!!! tip "The Worker API was added in version 0.18.0"
## Concurrency
There are many interesting uses for Textual which require reading data from an internet service.
When an app requests data from the network it is important that it doesn't prevent the user interface from updating.
In other words, the requests should be concurrent (happen at the same time) as the UI updates.
Managing this concurrency is a tricky topic, in any language or framework.
Even for experienced developers, there are gotchas which could make your app lock up or behave oddly.
Textual's Worker API makes concurrency far less error prone and easier to reason about.
## Workers
Before we go into detail, let's see an example that demonstrates a common pitfall for apps that make network requests.
The following app uses [httpx](https://www.python-httpx.org/) to get the current weather for any given city, by making a request to [wttr.in](https://wttr.in/).
=== "weather01.py"
```python title="weather01.py"
--8<-- "docs/examples/guide/workers/weather01.py"
```
=== "weather.css"
```sass title="weather.css"
--8<-- "docs/examples/guide/workers/weather.css"
```
=== "Output"
```{.textual path="docs/examples/guide/workers/weather01.py"}
```
If you were to run this app, you should see weather information update as you type.
But you may find that the input is not as responsive as usual, with a noticeable delay between pressing a key and seeing it echoed in screen.
This is because we are making a request to the weather API within a message handler, and the app will not be able to process other messages until the request has completed (which may be anything from a few hundred milliseconds to several seconds later).
To resolve this we can use the [run_worker][textual.dom.DOMNode.run_worker] method which runs the `update_weather` coroutine (`async def` function) in the background. Here's the code:
```python title="weather02.py" hl_lines="21"
--8<-- "docs/examples/guide/workers/weather02.py"
```
This one line change will make typing as responsive as you would expect from any app.
The `run_worker` method schedules a new *worker* to run `update_weather`, and returns a [Worker](textual.worker.Worker) object. This happens almost immediately, so it won't prevent other messages from being processed. The `update_weather` function is now running concurrently, and will finish a second or two later.
!!! tip
The [Worker][textual.worker.Worker] object has a few useful methods on it, but you can often ignore it as we did in `weather02.py`.
The call to `run_worker` also sets `exclusive=True` which solves an additional problem with concurrent network requests: when pulling data from the network, there is no guarantee that you will receive the responses in the same order as the requests.
For instance, if you start typing "Paris", you may get the response for "Pari" *after* the response for "Paris", which could show the wrong weather information.
The `exclusive` flag tells Textual to cancel all previous workers before starting the new one.
### Work decorator
An alternative to calling `run_worker` manually is the [work][textual.work] decorator, which automatically generates a worker from the decorated method.
Let's use this decorator in our weather app:
```python title="weather03.py" hl_lines="4 22 24"
--8<-- "docs/examples/guide/workers/weather03.py"
```
The addition of `@work(exclusive=True)` converts the `update_weather` coroutine into a regular function which when called will create and start a worker.
Note that even though `update_weather` is an `async def` function, the decorator means that we don't need to use the `await` keyword when calling it.
!!! tip
The decorator takes the same arguments as `run_worker`.
### Worker return values
When you run a worker, the return value of the function won't be available until the work has completed.
You can check the return value of a worker with the `worker.result` attribute which will initially be `None`, but will be replaced with the return value of the function when it completes.
If you need the return value you can call [worker.wait][textual.worker.Worker.wait] which is a coroutine that will wait for the work to complete.
But note that if you do this in a message handler it will also prevent the widget from updating until the worker returns.
Often a better approach is to handle [worker events](#worker-events) which will notify your app when a worker completes, and the return value is available without waiting.
### Cancelling workers
You can cancel a worker at any time before it is finished by calling [Worker.cancel][textual.worker.Worker.cancel].
This will raise a [CancelledError][asyncio.CancelledError] within the coroutine, and should cause it to exit prematurely.
### Worker errors
The default behavior when a worker encounters an exception is to exit the app and display the traceback in the terminal.
You can also create workers which will *not* immediately exit on exception, by setting `exit_on_error=False` on the call to `run_worker` or the `@work` decorator.
### Worker lifetime
Workers are managed by a single [WorkerManager][textual._worker_manager.WorkerManager] instance, which you can access via `app.workers`.
This is a container-like object which you iterate over to see your active workers.
Workers are tied to the DOM node (widget, screen, or app) where they are created.
This means that if you remove the widget or pop the screen where they are created, then the tasks will be cleaned up automatically.
Similarly if you exit the app, any running tasks will be cancelled.
Worker objects have a `state` attribute which will contain a [WorkerState][textual.worker.WorkerState] enumeration that indicates what the worker is doing at any given time.
The `state` attribute will contain one of the following values:
| Value | Description |
| --------- | ----------------------------------------------------------------------------------- |
| PENDING | The worker was created, but not yet started. |
| RUNNING | The worker is currently running. |
| CANCELLED | The worker was cancelled and is not longer running. |
| ERROR | The worker raised an exception, and `worker.error` will contain the exception. |
| SUCCESS | The worker completed successful, and `worker.result` will contain the return value. |
Wokers start with a `PENDING` state, then go to `RUNNING`. From there, they will go to `CANCELLED`, `ERROR` or `SUCCESS`.
<div class="excalidraw">
--8<-- "docs/images/workers/lifetime.excalidraw.svg"
</div>
### Worker events
When a worker changes state, it sends a [Worker.StateChanged][textual.worker.Worker.StateChanged] event to the widget where the worker was created.
You can handle this message by defining an `on_worker_state_changed` event handler.
For instance, here is how we might log the state of the worker that updates the weather:
```python title="weather04.py" hl_lines="4 40-42"
--8<-- "docs/examples/guide/workers/weather04.py"
```
If you run the above code with `textual` you should see the worker lifetime events logged in the Textual [console](./devtools.md#console).
```
textual run weather04.py --dev
```
### Thread workers
In previous examples we used `run_worker` or the `work` decorator in conjunction with coroutines.
This works well if you are using an async API like `httpx`, but if your API doesn't support async you may need to use *threads*.
!!! info "What are threads?"
Threads are a form of concurrency supplied by your Operating System. Threads allow your code to run more than a single function simultaneously.
You can create threads by applying `run_worker` or the `work` decorator to a plain (non async) method or function.
The API for thread workers is identical to async workers, but there are a few differences you need to be aware of when writing threaded code.
The first difference is that you should avoid calling methods on your UI directly, or setting reactive variables.
You can work around this with the [App.call_from_thread][textual.app.App.call_from_thread] method which schedules a call in the main thread.
The second difference is that you can't cancel threads in the same way as coroutines, but you *can* manually check if the worker was cancelled.
Let's demonstrate thread workers by replacing `httpx` with `urllib.request` (in the standard library). The `urllib` module is not async aware, so we will need to use threads:
```python title="weather05.py" hl_lines="1 26-43"
--8<-- "docs/examples/guide/workers/weather05.py"
```
The `update_weather` function doesn't have the `async` keyword, so the `@work` decorator will create a thread worker.
Note the use of [get_current_worker][textual.worker.get_current_worker] which the function uses to check if it has been cancelled or not.
#### Posting messages
Most Textual functions are not thread-safe which means you will need to use `call_from_thread` to run them from a thread worker.
An exception would be [post_message][textual.widget.Widget.post_message] which *is* thread-safe.
If your worker needs to make multiple updates to the UI, it is a good idea to send [custom messages](./events.md) and let the message handler update the state of the UI.

242
docs/images/weather.svg Normal file

File diff suppressed because one or more lines are too long

After

Width:  |  Height:  |  Size: 102 KiB

File diff suppressed because one or more lines are too long

After

Width:  |  Height:  |  Size: 39 KiB

View File

@@ -1,13 +1,12 @@
from __future__ import annotations
import asyncio
try:
import httpx
except ImportError:
raise ImportError("Please install httpx with 'pip install httpx' ")
from textual import work
from textual.app import App, ComposeResult
from textual.containers import Content
from textual.widgets import Input, Markdown
@@ -31,26 +30,26 @@ class DictionaryApp(App):
async def on_input_changed(self, message: Input.Changed) -> None:
"""A coroutine to handle a text changed message."""
if message.value:
# Look up the word in the background
asyncio.create_task(self.lookup_word(message.value))
self.lookup_word(message.value)
else:
# Clear the results
await self.query_one("#results", Markdown).update("")
self.query_one("#results", Markdown).update("")
@work(exclusive=True)
async def lookup_word(self, word: str) -> None:
"""Looks up a word."""
url = f"https://api.dictionaryapi.dev/api/v2/entries/en/{word}"
async with httpx.AsyncClient() as client:
response = await client.get(url)
try:
results = response.json()
except Exception:
self.query_one("#results", Static).update(response.text)
return
self.query_one("#results", Markdown).update(response.text)
if word == self.query_one(Input).value:
markdown = self.make_word_markdown(results)
await self.query_one("#results", Markdown).update(markdown)
self.query_one("#results", Markdown).update(markdown)
def make_word_markdown(self, results: object) -> str:
"""Convert the results in to markdown."""

View File

@@ -20,6 +20,7 @@ nav:
- "guide/widgets.md"
- "guide/animation.md"
- "guide/screens.md"
- "guide/workers.md"
- "widget_gallery.md"
- Reference:
- "reference/index.md"
@@ -196,6 +197,8 @@ nav:
- "api/walk.md"
- "api/welcome.md"
- "api/widget.md"
- "api/worker.md"
- "api/worker_manager.md"
- "roadmap.md"
- "Blog":
- blog/index.md

View File

@@ -1,6 +1,6 @@
[tool.poetry]
name = "textual"
version = "0.17.3"
version = "0.18.0"
homepage = "https://github.com/Textualize/textual"
description = "Modern Text User Interface framework"
authors = ["Will McGugan <will@textualize.io>"]

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
import inspect
from functools import partial, wraps
from typing import TYPE_CHECKING, Callable
import rich.repr
@@ -9,11 +10,12 @@ from rich.console import RenderableType
from . import constants
from ._context import active_app
from ._log import LogGroup, LogVerbosity
from ._work_decorator import work as work
if TYPE_CHECKING:
from typing_extensions import TypeAlias
__all__ = ["log", "panic", "__version__"] # type: ignore
__all__ = ["log", "panic", "__version__", "work"] # type: ignore
LogCallable: TypeAlias = "Callable"
@@ -147,6 +149,11 @@ class Logger:
"""Logs from stdlib logging module."""
return Logger(self._log, LogGroup.LOGGING)
@property
def worker(self) -> Logger:
"""Logs worker information."""
return Logger(self._log, LogGroup.WORKER)
log = Logger(None)

View File

@@ -27,12 +27,14 @@ class AnimationError(Exception):
"""An issue prevented animation from starting."""
T = TypeVar("T")
ReturnType = TypeVar("ReturnType")
@runtime_checkable
class Animatable(Protocol):
def blend(self: T, destination: T, factor: float) -> T: # pragma: no cover
def blend(
self: ReturnType, destination: ReturnType, factor: float
) -> ReturnType: # pragma: no cover
...

View File

@@ -13,6 +13,7 @@ class LogGroup(Enum):
PRINT = 6
SYSTEM = 7
LOGGING = 8
WORKER = 9
class LogVerbosity(Enum):

View File

@@ -0,0 +1,111 @@
from __future__ import annotations
from functools import partial, wraps
from typing import TYPE_CHECKING, Callable, Coroutine, TypeVar, Union, cast, overload
from typing_extensions import ParamSpec, TypeAlias
if TYPE_CHECKING:
from .worker import Worker
FactoryParamSpec = ParamSpec("FactoryParamSpec")
DecoratorParamSpec = ParamSpec("DecoratorParamSpec")
ReturnType = TypeVar("ReturnType")
Decorator: TypeAlias = Callable[
[
Union[
Callable[DecoratorParamSpec, ReturnType],
Callable[DecoratorParamSpec, Coroutine[None, None, ReturnType]],
]
],
Callable[DecoratorParamSpec, "Worker[ReturnType]"],
]
@overload
def work(
method: Callable[FactoryParamSpec, Coroutine[None, None, ReturnType]]
) -> Callable[FactoryParamSpec, "Worker[ReturnType]"]:
...
@overload
def work(
method: Callable[FactoryParamSpec, ReturnType]
) -> Callable[FactoryParamSpec, "Worker[ReturnType]"]:
...
@overload
def work(*, exclusive: bool = False) -> Decorator[..., ReturnType]:
...
def work(
method: Callable[FactoryParamSpec, ReturnType]
| Callable[FactoryParamSpec, Coroutine[None, None, ReturnType]]
| None = None,
*,
name: str = "",
group: str = "default",
exit_on_error: bool = True,
exclusive: bool = False,
) -> Callable[FactoryParamSpec, Worker[ReturnType]] | Decorator:
"""Worker decorator factory.
Args:
method: A function or coroutine.
name: A short string to identify the worker (in logs and debugging).
group: A short string to identify a group of workers.
exit_on_error: Exit the app if the worker raises an error. Set to `False` to suppress exceptions.
exclusive: Cancel all workers in the same group.
"""
def decorator(
method: (
Callable[DecoratorParamSpec, ReturnType]
| Callable[DecoratorParamSpec, Coroutine[None, None, ReturnType]]
)
) -> Callable[DecoratorParamSpec, Worker[ReturnType]]:
"""The decorator."""
@wraps(method)
def decorated(
*args: DecoratorParamSpec.args, **kwargs: DecoratorParamSpec.kwargs
) -> Worker[ReturnType]:
"""The replaced callable."""
from .dom import DOMNode
self = args[0]
assert isinstance(self, DOMNode)
positional_arguments = ", ".join(repr(arg) for arg in args[1:])
keyword_arguments = ", ".join(
f"{name}={value!r}" for name, value in kwargs.items()
)
tokens = [positional_arguments, keyword_arguments]
worker_description = (
f"{method.__name__}({', '.join(token for token in tokens if token)})"
)
worker = cast(
"Worker[ReturnType]",
self.run_worker(
partial(method, *args, **kwargs),
name=name or method.__name__,
group=group,
description=worker_description,
exclusive=exclusive,
exit_on_error=exit_on_error,
),
)
return worker
return decorated
if method is None:
return decorator
else:
return decorator(method)

View File

@@ -0,0 +1,172 @@
from __future__ import annotations
import asyncio
from collections import Counter
from operator import attrgetter
from typing import TYPE_CHECKING, Any, Iterable, Iterator
import rich.repr
from .worker import Worker, WorkerState, WorkType
if TYPE_CHECKING:
from .app import App
from .dom import DOMNode
@rich.repr.auto(angular=True)
class WorkerManager:
"""An object to manager a number of workers.
You will not have to construct this class manually, as widgets, screens, and apps
have a worker manager accessibly via a `workers` attribute.
"""
def __init__(self, app: App) -> None:
"""Initialize a worker manager.
Args:
app: An App instance.
"""
self._app = app
"""A reference to the app."""
self._workers: set[Worker] = set()
"""The workers being managed."""
def __rich_repr__(self) -> rich.repr.Result:
counter: Counter[WorkerState] = Counter()
counter.update(worker.state for worker in self._workers)
for state, count in sorted(counter.items()):
yield state.name, count
def __iter__(self) -> Iterator[Worker[Any]]:
return iter(sorted(self._workers, key=attrgetter("_created_time")))
def __reversed__(self) -> Iterator[Worker[Any]]:
return iter(
sorted(self._workers, key=attrgetter("_created_time"), reverse=True)
)
def __bool__(self) -> bool:
return bool(self._workers)
def __len__(self) -> int:
return len(self._workers)
def __contains__(self, worker: object) -> bool:
return worker in self._workers
def add_worker(
self, worker: Worker, start: bool = True, exclusive: bool = True
) -> None:
"""Add a new worker.
Args:
worker: A Worker instance.
start: Start the worker if True, otherwise the worker must be started manually.
exclusive: Cancel all workers in the same group as `worker`.
"""
if exclusive and worker.group:
self.cancel_group(worker.node, worker.group)
self._workers.add(worker)
if start:
worker._start(self._app, self._remove_worker)
def _new_worker(
self,
work: WorkType,
node: DOMNode,
*,
name: str | None = "",
group: str = "default",
description: str = "",
exit_on_error: bool = True,
start: bool = True,
exclusive: bool = False,
) -> Worker:
"""Create a worker from a function, coroutine, or awaitable.
Args:
work: A callable, a coroutine, or other awaitable.
name: A name to identify the worker.
group: The worker group.
description: A description of the worker.
exit_on_error: Exit the app if the worker raises an error. Set to `False` to suppress exceptions.
start: Automatically start the worker.
exclusive: Cancel all workers in the same group.
Returns:
A Worker instance.
"""
worker: Worker[Any] = Worker(
node,
work,
name=name or getattr(work, "__name__", "") or "",
group=group,
description=description or repr(work),
exit_on_error=exit_on_error,
)
self.add_worker(worker, start=start, exclusive=exclusive)
return worker
def _remove_worker(self, worker: Worker) -> None:
"""Remove a worker from the manager.
Args:
worker: A Worker instance.
"""
self._workers.discard(worker)
def start_all(self) -> None:
"""Start all the workers."""
for worker in self._workers:
worker._start(self._app, self._remove_worker)
def cancel_all(self) -> None:
"""Cancel all workers."""
for worker in self._workers:
worker.cancel()
def cancel_group(self, node: DOMNode, group: str) -> list[Worker]:
"""Cancel a single group.
Args:
node: Worker DOM node.
group: A group name.
Return:
A list of workers that were cancelled.
"""
workers = [
worker
for worker in self._workers
if (worker.group == group and worker.node == node)
]
for worker in workers:
worker.cancel()
return workers
def cancel_node(self, node: DOMNode) -> list[Worker]:
"""Cancel all workers associated with a given node
Args:
node: A DOM node (widget, screen, or App).
Returns:
List of cancelled workers.
."""
workers = [worker for worker in self._workers if worker.node == node]
for worker in workers:
worker.cancel()
return workers
async def wait_for_complete(self, workers: Iterable[Worker] | None = None) -> None:
"""Wait for workers to complete.
Args:
workers: An iterable of workers or None to wait for all workers in the manager.
"""
await asyncio.gather(*[worker.wait() for worker in (workers or self)])

View File

@@ -58,6 +58,7 @@ from ._context import active_app, active_message_pump
from ._event_broker import NoHandler, extract_handler_actions
from ._path import _make_path_object_relative
from ._wait import wait_for_idle
from ._worker_manager import WorkerManager
from .actions import ActionParseResult, SkipAction
from .await_remove import AwaitRemove
from .binding import Binding, Bindings
@@ -93,6 +94,7 @@ PLATFORM = platform.system()
WINDOWS = PLATFORM == "Windows"
# asyncio will warn against resources not being cleared
if constants.DEBUG:
warnings.simplefilter("always", ResourceWarning)
# `asyncio.get_event_loop()` is deprecated since Python 3.10:
@@ -316,6 +318,7 @@ class App(Generic[ReturnType], DOMNode):
legacy_windows=False,
_environ=environ,
)
self._workers = WorkerManager(self)
self.error_console = Console(markup=False, stderr=True)
self.driver_class = driver_class or self.get_driver_class()
self._screen_stack: list[Screen] = []
@@ -411,7 +414,6 @@ class App(Generic[ReturnType], DOMNode):
self.devtools = DevtoolsClient()
self._loop: asyncio.AbstractEventLoop | None = None
self._thread_id: int = 0
self._return_value: ReturnType | None = None
self._exit = False
@@ -427,6 +429,11 @@ class App(Generic[ReturnType], DOMNode):
self.set_class(self.dark, "-dark-mode")
self.set_class(not self.dark, "-light-mode")
@property
def workers(self) -> WorkerManager:
"""A worker manager."""
return self._workers
@property
def return_value(self) -> ReturnType | None:
"""The return type of the app."""
@@ -933,6 +940,8 @@ class App(Generic[ReturnType], DOMNode):
app_ready_event.set()
async def run_app(app) -> None:
app._loop = asyncio.get_running_loop()
app._thread_id = threading.get_ident()
await app._process_messages(
ready_callback=on_app_ready,
headless=headless,
@@ -1620,6 +1629,7 @@ class App(Generic[ReturnType], DOMNode):
except asyncio.CancelledError:
pass
finally:
self.workers.cancel_all()
self._running = False
try:
await self.animator.stop()

View File

@@ -24,6 +24,7 @@ from rich.tree import Tree
from ._context import NoActiveAppError
from ._node_list import NodeList
from ._types import WatchCallbackType
from ._worker_manager import WorkerManager
from .binding import Binding, Bindings, BindingType
from .color import BLACK, WHITE, Color
from .css._error_tools import friendly_list
@@ -42,6 +43,7 @@ if TYPE_CHECKING:
from .css.query import DOMQuery, QueryType
from .screen import Screen
from .widget import Widget
from .worker import Worker, WorkType, ResultType
from typing_extensions import Self, TypeAlias
from typing_extensions import Literal
@@ -208,6 +210,49 @@ class DOMNode(MessagePump):
)
self._auto_refresh = interval
@property
def workers(self) -> WorkerManager:
"""A worker manager."""
return self.app.workers
def run_worker(
self,
work: WorkType[ResultType],
name: str | None = "",
group: str = "default",
description: str = "",
exit_on_error: bool = True,
start: bool = True,
exclusive: bool = True,
) -> Worker[ResultType]:
"""Run work in a worker.
A worker runs a function, coroutine, or awaitable, in the *background* as an async task or as a thread.
Args:
work: A function, async function, or an awaitable object to run in a worker.
name: A short string to identify the worker (in logs and debugging).
group: A short string to identify a group of workers.
description: A longer string to store longer information on the worker.
exit_on_error: Exit the app if the worker raises an error. Set to `False` to suppress exceptions.
start: Start the worker immediately.
exclusive: Cancel all workers in the same group.
Returns:
New Worker instance.
"""
worker: Worker[ResultType] = self.workers._new_worker(
work,
self,
name=name,
group=group,
description=description,
exit_on_error=exit_on_error,
start=start,
exclusive=exclusive,
)
return worker
@property
def is_modal(self) -> bool:
"""Is the node a modal?"""
@@ -899,6 +944,7 @@ class DOMNode(MessagePump):
styles = self.styles
for key, value in update_styles.items():
setattr(styles, key, value)
return self
def has_class(self, *class_names: str) -> bool:
"""Check if the Node has all the given class names.

View File

@@ -9,6 +9,7 @@ from __future__ import annotations
import asyncio
import inspect
import threading
from asyncio import CancelledError, Queue, QueueEmpty, Task
from contextlib import contextmanager
from functools import partial
@@ -69,9 +70,10 @@ class MessagePumpMeta(type):
class MessagePump(metaclass=MessagePumpMeta):
"""Base class which supplies a message pump."""
def __init__(self, parent: MessagePump | None = None) -> None:
self._message_queue: Queue[Message | None] = Queue()
self._active_message: Message | None = None
self._parent = parent
self._running: bool = False
self._closing: bool = False
@@ -84,6 +86,7 @@ class MessagePump(metaclass=MessagePumpMeta):
self._max_idle: float | None = None
self._mounted_event = asyncio.Event()
self._next_callbacks: list[CallbackType] = []
self._thread_id: int = threading.get_ident()
@property
def _prevent_message_types_stack(self) -> list[set[type[Message]]]:
@@ -451,6 +454,7 @@ class MessagePump(metaclass=MessagePumpMeta):
async def _process_messages_loop(self) -> None:
"""Process messages until the queue is closed."""
_rich_traceback_guard = True
self._thread_id = threading.get_ident()
while not self._closed:
try:
message = await self._get_message()
@@ -474,9 +478,6 @@ class MessagePump(metaclass=MessagePumpMeta):
except MessagePumpClosed:
break
self._active_message = message
try:
try:
await self._dispatch_message(message)
except CancelledError:
@@ -506,8 +507,6 @@ class MessagePump(metaclass=MessagePumpMeta):
except Exception as error:
self.app._handle_exception(error)
break
finally:
self._active_message = None
async def _flush_next_callbacks(self) -> None:
"""Invoke pending callbacks in next callbacks queue."""
@@ -631,6 +630,11 @@ class MessagePump(metaclass=MessagePumpMeta):
# Add a copy of the prevented message types to the message
# This is so that prevented messages are honoured by the event's handler
message._prevent.update(self._get_prevented_messages())
if self._thread_id != threading.get_ident() and self.app._loop is not None:
# If we're not calling from the same thread, make it threadsafe
loop = self.app._loop
loop.call_soon_threadsafe(self._message_queue.put_nowait, message)
else:
self._message_queue.put_nowait(message)
return True

View File

@@ -2904,6 +2904,9 @@ class Widget(DOMNode):
def _on_scroll_to_region(self, message: messages.ScrollToRegion) -> None:
self.scroll_to_region(message.region, animate=True)
def _on_unmount(self) -> None:
self.workers.cancel_node(self)
def action_scroll_home(self) -> None:
if not self._allow_scroll:
raise SkipAction()

View File

@@ -587,9 +587,9 @@ class Markdown(Widget):
self.href: str = href
"""The link that was selected."""
async def on_mount(self) -> None:
def on_mount(self) -> None:
if self._markdown is not None:
await self.update(self._markdown)
self.update(self._markdown)
async def load(self, path: Path) -> bool:
"""Load a new Markdown document.
@@ -605,10 +605,10 @@ class Markdown(Widget):
except Exception:
return False
await self.update(markdown)
self.update(markdown)
return True
async def update(self, markdown: str) -> None:
def update(self, markdown: str) -> None:
"""Update the document with new Markdown.
Args:
@@ -750,8 +750,8 @@ class Markdown(Widget):
self.post_message(Markdown.TableOfContentsUpdated(table_of_contents))
with self.app.batch_update():
await self.query("MarkdownBlock").remove()
await self.mount_all(output)
self.query("MarkdownBlock").remove()
self.mount_all(output)
class MarkdownTableOfContents(Widget, can_focus_children=True):
@@ -874,7 +874,7 @@ class MarkdownViewer(VerticalScroll, can_focus=True, can_focus_children=True):
async def on_mount(self) -> None:
if self._markdown is not None:
await self.document.update(self._markdown)
self.document.update(self._markdown)
async def go(self, location: str | PurePath) -> bool:
"""Navigate to a new document path."""

393
src/textual/worker.py Normal file
View File

@@ -0,0 +1,393 @@
from __future__ import annotations
import asyncio
import enum
import inspect
from contextvars import ContextVar
from time import monotonic
from typing import (
TYPE_CHECKING,
Awaitable,
Callable,
Coroutine,
Generic,
TypeVar,
Union,
cast,
)
import rich.repr
from rich.traceback import Traceback
from typing_extensions import TypeAlias
from .message import Message
if TYPE_CHECKING:
from .app import App
from .dom import DOMNode
active_worker: ContextVar[Worker] = ContextVar("active_worker")
"""Currently active worker context var."""
class NoActiveWorker(Exception):
"""There is no active worker."""
class WorkerError(Exception):
"""A worker related error."""
class WorkerFailed(WorkerError):
"""The worker raised an exception and did not complete."""
def __init__(self, error: BaseException) -> None:
self.error = error
super().__init__(f"Worker raised exception: {error!r}")
class DeadlockError(WorkerError):
"""The operation would result in a deadlock."""
class WorkerCancelled(WorkerError):
"""The worker was cancelled and did not complete."""
def get_current_worker() -> Worker:
"""Get the currently active worker.
Raises:
NoActiveWorker: If there is no active worker.
Returns:
A Worker instance.
"""
try:
return active_worker.get()
except LookupError:
raise NoActiveWorker(
"There is no active worker in this task or thread."
) from None
class WorkerState(enum.Enum):
"""A description of the worker's current state."""
PENDING = 1
"""Worker is initialized, but not running."""
RUNNING = 2
"""Worker is running."""
CANCELLED = 3
"""Worker is not running, and was cancelled."""
ERROR = 4
"""Worker is not running, and exited with an error."""
SUCCESS = 5
"""Worker is not running, and completed successfully."""
ResultType = TypeVar("ResultType")
WorkType: TypeAlias = Union[
Callable[[], Coroutine[None, None, ResultType]],
Callable[[], ResultType],
Awaitable[ResultType],
]
class _ReprText:
"""Shim to insert a word into the Worker's repr."""
def __init__(self, text: str) -> None:
self.text = text
def __repr__(self) -> str:
return self.text
@rich.repr.auto(angular=True)
class Worker(Generic[ResultType]):
"""A class to manage concurrent work (either a task or a thread)."""
@rich.repr.auto
class StateChanged(Message, bubble=False):
"""The worker state changed."""
namespace = "worker"
def __init__(self, worker: Worker, state: WorkerState) -> None:
"""Initialize the StateChanged message.
Args:
worker: The worker object.
state: New state.
"""
self.worker = worker
self.state = state
super().__init__()
def __rich_repr__(self) -> rich.repr.Result:
yield self.worker
yield self.state
def __init__(
self,
node: DOMNode,
work: WorkType | None = None,
*,
name: str = "",
group: str = "default",
description: str = "",
exit_on_error: bool = True,
) -> None:
"""Initialize a Worker.
Args:
node: The widget, screen, or App that initiated the work.
work: A callable, coroutine, or other awaitable object to run in the worker.
name: Name of the worker (short string to help identify when debugging).
group: The worker group.
description: Description of the worker (longer string with more details).
exit_on_error: Exit the app if the worker raises an error. Set to `False` to suppress exceptions.
"""
self._node = node
self._work = work
self.name = name
self.group = group
self.description = description
self.exit_on_error = exit_on_error
self._state = WorkerState.PENDING
self.state = self._state
self._error: BaseException | None = None
self._completed_steps: int = 0
self._total_steps: int | None = None
self._cancelled: bool = False
self._created_time = monotonic()
self._result: ResultType | None = None
self._task: asyncio.Task | None = None
self._node.post_message(self.StateChanged(self, self._state))
def __rich_repr__(self) -> rich.repr.Result:
yield _ReprText(self.state.name)
yield "name", self.name, ""
yield "group", self.group, "default"
yield "description", self.description, ""
yield "progress", round(self.progress, 1), 0.0
@property
def node(self) -> DOMNode:
"""The node where this worker was run from."""
return self._node
@property
def state(self) -> WorkerState:
"""The current state of the worker."""
return self._state
@state.setter
def state(self, state: WorkerState) -> None:
"""Set the state, and send a message."""
changed = state != self._state
self._state = state
if changed:
self._node.post_message(self.StateChanged(self, state))
@property
def is_cancelled(self) -> bool:
"""Has the work been cancelled?
Note that cancelled work may still be running.
"""
return self._cancelled
@property
def is_running(self) -> bool:
"""Is the task running?"""
return self.state == WorkerState.RUNNING
@property
def is_finished(self) -> bool:
"""Has the task finished (cancelled, error, or success)?"""
return self.state in (
WorkerState.CANCELLED,
WorkerState.ERROR,
WorkerState.SUCCESS,
)
@property
def completed_steps(self) -> int:
"""The number of completed steps."""
return self._completed_steps
@property
def total_steps(self) -> int | None:
"""The number of total steps, or None if indeterminate."""
return self._total_steps
@property
def progress(self) -> float:
"""Progress as a percentage.
If the total steps is None, then this will return 0. The percentage will be clamped between 0 and 100.
"""
if not self._total_steps:
return 0.0
return max(0, min(100, (self._completed_steps / self._total_steps) * 100.0))
@property
def result(self) -> ResultType | None:
"""The result of the worker, or `None` if there is no result."""
return self._result
def update(
self, completed_steps: int | None = None, total_steps: int | None = -1
) -> None:
"""Update the number of completed steps.
Args:
completed_steps: The number of completed seps, or `None` to not change.
total_steps: The total number of steps, `None` for indeterminate, or -1 to leave unchanged.
"""
if completed_steps is not None:
self._completed_steps += completed_steps
if total_steps != -1:
self._total_steps = None if total_steps is None else min(0, total_steps)
def advance(self, steps: int = 1) -> None:
"""Advance the number of completed steps.
Args:
steps: Number of steps to advance.
"""
self._completed_steps += steps
async def run(self) -> ResultType:
"""Run the work.
Implement this method in a subclass, or pass a callable to the constructor.
Returns:
Return value of work.
"""
if (
inspect.iscoroutinefunction(self._work)
or hasattr(self._work, "func")
and inspect.iscoroutinefunction(self._work.func)
):
# Coroutine, we can await it.
result: ResultType = await self._work()
elif inspect.isawaitable(self._work):
result = await self._work
else:
assert callable(self._work)
loop = asyncio.get_running_loop()
def run_work(work: Callable[[], ResultType]) -> ResultType:
"""Set the active worker, and run the work."""
active_worker.set(self)
return work()
result = await loop.run_in_executor(None, run_work, self._work)
return result
async def _run(self, app: App) -> None:
"""Run the worker.
Args:
app: App instance.
"""
app._set_active()
active_worker.set(self)
self.state = WorkerState.RUNNING
app.log.worker(self)
try:
self._result = await self.run()
except asyncio.CancelledError as error:
self.state = WorkerState.CANCELLED
self._error = error
app.log.worker(self)
except Exception as error:
self.state = WorkerState.ERROR
self._error = error
app.log.worker(self, "failed", repr(error))
app.log.worker(Traceback())
if self.exit_on_error:
app.fatal_error()
else:
self.state = WorkerState.SUCCESS
app.log.worker(self)
def _start(
self, app: App, done_callback: Callable[[Worker], None] | None = None
) -> None:
"""Start the worker.
Args:
app: An app instance.
done_callback: A callback to call when the task is done.
"""
if self._task is not None:
return
self.state = WorkerState.RUNNING
self._task = asyncio.create_task(self._run(app))
def task_done_callback(_task: asyncio.Task) -> None:
"""Run the callback.
Called by `Task.add_done_callback`.
Args:
The worker's task.
"""
if done_callback is not None:
done_callback(self)
self._task.add_done_callback(task_done_callback)
def cancel(self) -> None:
"""Cancel the task."""
self._cancelled = True
if self._task is not None:
self._task.cancel()
async def wait(self) -> ResultType:
"""Wait for the work to complete.
Raises:
WorkerFailed: If the Worker raised an exception.
WorkerCancelled: If the Worker was cancelled before it completed.
Returns:
The return value of the work.
"""
try:
if active_worker.get() is self:
raise DeadlockError(
"Can't call worker.wait from within the worker function!"
)
except LookupError:
# Not in a worker
pass
if self.state == WorkerState.PENDING:
raise WorkerError("Worker must be started before calling this method.")
if self._task is not None:
try:
await self._task
except asyncio.CancelledError as error:
self.state = WorkerState.CANCELLED
self._error = error
if self.state == WorkerState.ERROR:
assert self._error is not None
raise WorkerFailed(self._error)
elif self.state == WorkerState.CANCELLED:
raise WorkerCancelled("Worker was cancelled, and did not complete.")
return cast("ResultType", self._result)

View File

@@ -1,4 +1,6 @@
import asyncio
import sys
import threading
import pytest
@@ -60,6 +62,8 @@ async def test_installed_screens():
async def test_screens():
app = App()
app._loop = asyncio.get_running_loop()
app._thread_id = threading.get_ident()
# There should be nothing in the children since the app hasn't run yet
assert not app._nodes
assert not app.children

View File

@@ -0,0 +1,33 @@
import asyncio
from textual import work
from textual.app import App
from textual.worker import Worker, WorkerState
async def test_work() -> None:
"""Test basic usage of the @work decorator."""
states: list[WorkerState] = []
class WorkApp(App):
worker: Worker
@work
async def foo(self) -> str:
await asyncio.sleep(0.1)
return "foo"
def on_mount(self) -> None:
self.worker = self.foo()
def on_worker_state_changed(self, event: Worker.StateChanged) -> None:
states.append(event.state)
app = WorkApp()
async with app.run_test() as pilot:
await app.workers.wait_for_complete()
result = await app.worker.wait()
assert result == "foo"
await pilot.pause()
assert states == [WorkerState.PENDING, WorkerState.RUNNING, WorkerState.SUCCESS]

155
tests/test_worker.py Normal file
View File

@@ -0,0 +1,155 @@
import asyncio
import pytest
from textual.app import App
from textual.worker import (
Worker,
WorkerCancelled,
WorkerFailed,
WorkerState,
get_current_worker,
)
async def test_initialize():
"""Test initial values."""
def foo() -> str:
return "foo"
app = App()
async with app.run_test():
worker = Worker(app, foo, name="foo", group="foo-group", description="Foo test")
repr(worker)
assert worker.state == WorkerState.PENDING
assert not worker.is_cancelled
assert not worker.is_running
assert not worker.is_finished
assert worker.completed_steps == 0
assert worker.total_steps is None
assert worker.progress == 0.0
assert worker.result is None
async def test_run_success() -> None:
"""Test successful runs."""
def foo() -> str:
"""Regular function."""
return "foo"
async def bar() -> str:
"""Coroutine."""
return "bar"
async def baz() -> str:
"""Coroutine."""
return "baz"
class RunApp(App):
pass
app = RunApp()
async with app.run_test():
# Call regular function
foo_worker: Worker[str] = Worker(
app, foo, name="foo", group="foo-group", description="Foo test"
)
# Call coroutine function
bar_worker: Worker[str] = Worker(
app, bar, name="bar", group="bar-group", description="Bar test"
)
# Call coroutine
baz_worker: Worker[str] = Worker(
app, baz(), name="baz", group="baz-group", description="Baz test"
)
assert foo_worker.result is None
assert bar_worker.result is None
assert baz_worker.result is None
foo_worker._start(app)
bar_worker._start(app)
baz_worker._start(app)
assert await foo_worker.wait() == "foo"
assert await bar_worker.wait() == "bar"
assert await baz_worker.wait() == "baz"
assert foo_worker.result == "foo"
assert bar_worker.result == "bar"
assert baz_worker.result == "baz"
async def test_run_error() -> None:
async def run_error() -> str:
await asyncio.sleep(0.1)
1 / 0
return "Never"
class ErrorApp(App):
pass
app = ErrorApp()
async with app.run_test():
worker: Worker[str] = Worker(app, run_error)
worker._start(app)
with pytest.raises(WorkerFailed):
await worker.wait()
async def test_run_cancel() -> None:
"""Test run may be cancelled."""
async def run_error() -> str:
await asyncio.sleep(0.1)
return "Never"
class ErrorApp(App):
pass
app = ErrorApp()
async with app.run_test():
worker: Worker[str] = Worker(app, run_error)
worker._start(app)
await asyncio.sleep(0)
worker.cancel()
assert worker.is_cancelled
with pytest.raises(WorkerCancelled):
await worker.wait()
async def test_run_cancel_immediately() -> None:
"""Edge case for cancelling immediately."""
async def run_error() -> str:
await asyncio.sleep(0.1)
return "Never"
class ErrorApp(App):
pass
app = ErrorApp()
async with app.run_test():
worker: Worker[str] = Worker(app, run_error)
worker._start(app)
worker.cancel()
assert worker.is_cancelled
with pytest.raises(WorkerCancelled):
await worker.wait()
async def test_get_worker() -> None:
"""Check get current worker."""
async def run_worker() -> Worker:
worker = get_current_worker()
return worker
class WorkerApp(App):
pass
app = WorkerApp()
async with app.run_test():
worker: Worker[Worker] = Worker(app, run_worker)
worker._start(app)
assert await worker.wait() is worker

View File

@@ -0,0 +1,96 @@
import asyncio
import time
from textual.app import App, ComposeResult
from textual.widget import Widget
from textual.worker import Worker, WorkerState
def test_worker_manager_init():
app = App()
assert isinstance(repr(app.workers), str)
assert not bool(app.workers)
assert len(app.workers) == 0
assert list(app.workers) == []
assert list(reversed(app.workers)) == []
async def test_run_worker_async() -> None:
"""Check self.run_worker"""
worker_events: list[Worker.StateChanged] = []
work_result: str = ""
new_worker: Worker
class WorkerWidget(Widget):
async def work(self) -> str:
nonlocal work_result
await asyncio.sleep(0.02)
work_result = "foo"
return "foo"
def on_mount(self):
nonlocal new_worker
new_worker = self.run_worker(self.work, start=False)
def on_worker_state_changed(self, event) -> None:
worker_events.append(event)
class WorkerApp(App):
def compose(self) -> ComposeResult:
yield WorkerWidget()
app = WorkerApp()
async with app.run_test():
assert new_worker in app.workers
assert len(app.workers) == 1
app.workers.start_all()
await app.workers.wait_for_complete()
assert len(app.workers) == 0
assert work_result == "foo"
assert isinstance(worker_events[0].worker.node, WorkerWidget)
states = [event.state for event in worker_events]
assert states == [
WorkerState.PENDING,
WorkerState.RUNNING,
WorkerState.SUCCESS,
]
async def test_run_worker_thread() -> None:
"""Check self.run_worker"""
worker_events: list[Worker.StateChanged] = []
work_result: str = ""
class WorkerWidget(Widget):
def work(self) -> str:
nonlocal work_result
time.sleep(0.02)
work_result = "foo"
return "foo"
def on_mount(self):
self.run_worker(self.work)
def on_worker_state_changed(self, event) -> None:
worker_events.append(event)
class WorkerApp(App):
def compose(self) -> ComposeResult:
yield WorkerWidget()
app = WorkerApp()
async with app.run_test():
await app.workers.wait_for_complete()
assert work_result == "foo"
assert isinstance(worker_events[0].worker.node, WorkerWidget)
states = [event.state for event in worker_events]
assert states == [
WorkerState.PENDING,
WorkerState.RUNNING,
WorkerState.SUCCESS,
]