mirror of
https://github.com/Textualize/textual.git
synced 2025-10-17 02:38:12 +03:00
* worker class * worker API tests * tidy * Decorator and more tests * type fix * error order * more tests * remove active message * move worker manager to app * cancel nodes * typing fix * revert change * typing fixes and cleanup * revert typing * test fix * cancel group * Added test for worker * comment * workers docs * Added exit_on_error * changelog * svg * refactor test * remove debug tweaks * docstrings * worker test * fix typing in run * fix 3.7 tests * blog post * fix deadlock test * words * words * words * workers docs * blog post * Apply suggestions from code review Co-authored-by: Dave Pearson <davep@davep.org> * docstring * fix and docstring * Apply suggestions from code review Co-authored-by: Rodrigo Girão Serrão <5621605+rodrigogiraoserrao@users.noreply.github.com> * Update src/textual/widgets/_markdown.py Co-authored-by: Rodrigo Girão Serrão <5621605+rodrigogiraoserrao@users.noreply.github.com> * Apply suggestions from code review Co-authored-by: Rodrigo Girão Serrão <5621605+rodrigogiraoserrao@users.noreply.github.com> * Update src/textual/worker.py Co-authored-by: Rodrigo Girão Serrão <5621605+rodrigogiraoserrao@users.noreply.github.com> * Fix black * docstring * merge * changelog --------- Co-authored-by: Dave Pearson <davep@davep.org> Co-authored-by: Rodrigo Girão Serrão <5621605+rodrigogiraoserrao@users.noreply.github.com>
34 lines
882 B
Python
34 lines
882 B
Python
import asyncio
|
|
|
|
from textual import work
|
|
from textual.app import App
|
|
from textual.worker import Worker, WorkerState
|
|
|
|
|
|
async def test_work() -> None:
|
|
"""Test basic usage of the @work decorator."""
|
|
states: list[WorkerState] = []
|
|
|
|
class WorkApp(App):
|
|
worker: Worker
|
|
|
|
@work
|
|
async def foo(self) -> str:
|
|
await asyncio.sleep(0.1)
|
|
return "foo"
|
|
|
|
def on_mount(self) -> None:
|
|
self.worker = self.foo()
|
|
|
|
def on_worker_state_changed(self, event: Worker.StateChanged) -> None:
|
|
states.append(event.state)
|
|
|
|
app = WorkApp()
|
|
|
|
async with app.run_test() as pilot:
|
|
await app.workers.wait_for_complete()
|
|
result = await app.worker.wait()
|
|
assert result == "foo"
|
|
await pilot.pause()
|
|
assert states == [WorkerState.PENDING, WorkerState.RUNNING, WorkerState.SUCCESS]
|