Textual dev (#2884)

* WiP: Move the devtools and related code to `textual-dev` (#2834)

* Remove the textual script from the project file

This is moving into the textual-dev package.

* Remove the textual CLI code from Textual

This has all gone to live in textual-dev.

* Remove the devtools testing from Textual's unit tests

They've moved over to textual-dev instead.

* Remove the devtools server from Textual itself

The start of the process to remove as much of the core devtools as possible
from Textual.

* Switch the console docs example screenshot over to textual_dev

* Remove rednerables.py from Textual

* Remove the last parts of devtools from Textual

This is the last step. It remains to be seen if this is sustainable, but for
testing purposes this is the extreme case we're aiming for. I *think* this
will work though.

Hereon we'll be needing to do an editable install of textual-dev into
textual, and more generally and once this is "live" we'll be needing to make
sure that textual[dev] is installed when doing development work on textual
apps.

The thing that remains to be seen however is how this all works
with *developing* Textual itself. Will I always need to do an editable
install? Still got to figure that one out.

* Start to whittle down the pyproject file

The next step is to try and work out what can come out of the pyproject
file.

* Remove aiohttp from Textual

* Remove some more development dependencies we don't need any more

* Relock

* Remove the pointer to the previews directory

* Reintroduce the border preview snapshot test

* Reintroduce the color preview snapshot test

* Reinstate the key press for the border preview snapshot test

* Reintroduce the easing preview snapshot test

* Reintroduce the keys tool snapshot test

* Add pytest-asyncio as a development dependency

* Relock

* Pin the textual-dev version to 0.1.0 or later

Mostly to try and get the tests kicked off properly.

* Relock dependencies

* Further `textual-dev` changes (#2850)

* Remove the textual script from the project file

This is moving into the textual-dev package.

* Remove the textual CLI code from Textual

This has all gone to live in textual-dev.

* Remove the devtools testing from Textual's unit tests

They've moved over to textual-dev instead.

* Remove the devtools server from Textual itself

The start of the process to remove as much of the core devtools as possible
from Textual.

* Switch the console docs example screenshot over to textual_dev

* Remove rednerables.py from Textual

* Remove the last parts of devtools from Textual

This is the last step. It remains to be seen if this is sustainable, but for
testing purposes this is the extreme case we're aiming for. I *think* this
will work though.

Hereon we'll be needing to do an editable install of textual-dev into
textual, and more generally and once this is "live" we'll be needing to make
sure that textual[dev] is installed when doing development work on textual
apps.

The thing that remains to be seen however is how this all works
with *developing* Textual itself. Will I always need to do an editable
install? Still got to figure that one out.

* Start to whittle down the pyproject file

The next step is to try and work out what can come out of the pyproject
file.

* Remove aiohttp from Textual

* Remove some more development dependencies we don't need any more

* Relock

* Remove the pointer to the previews directory

* Reintroduce the border preview snapshot test

* Reintroduce the color preview snapshot test

* Reinstate the key press for the border preview snapshot test

* Reintroduce the easing preview snapshot test

* Reintroduce the keys tool snapshot test

* Add pytest-asyncio as a development dependency

* Relock

* Pin the textual-dev version to 0.1.0 or later

Mostly to try and get the tests kicked off properly.

* Relock dependencies

* Whitespace cleaning

* Swap mentions of textual[dev] to textual-dev

* Remove the dev extra

* Tweak README.md in response to PR review

* Tweak animation.md in response to PR review

* Tweak getting_started.md in response to PR review

* bump version

* lock

* drop dev

* more

* version bump

---------

Co-authored-by: Dave Pearson <davep@davep.org>
This commit is contained in:
Will McGugan
2023-07-03 15:37:40 +01:00
committed by GitHub
parent 6b3b1ce67f
commit 65e81c58be
45 changed files with 2028 additions and 4556 deletions

View File

@@ -40,7 +40,7 @@ jobs:
path: .venv
key: venv-${{ runner.os }}-${{ matrix.python-version }}-${{ hashFiles('**/poetry.lock') }}
- name: Install dependencies
run: poetry install --extras "dev"
run: poetry install
if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
# - name: Typecheck with mypy
# run: |

View File

@@ -5,8 +5,7 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).
## Unreleased
## [0.29.0] - 2023-07-03
### Added
@@ -15,14 +14,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- Added `default` parameter to `DataTable.add_column` for populating existing rows https://github.com/Textualize/textual/pull/2836
- Added can-focus pseudo-class to target widgets that may receive focus
### Fixed
- Fixed crash when columns were added to populated `DataTable` https://github.com/Textualize/textual/pull/2836
- Fixed issues with opacity on Screens https://github.com/Textualize/textual/issues/2616
- Fixed style problem with selected selections in a non-focused selection list https://github.com/Textualize/textual/issues/2768
## [0.28.1] - 2023-06-20
### Fixed
@@ -1098,6 +1095,7 @@ https://textual.textualize.io/blog/2022/11/08/version-040/#version-040
- New handler system for messages that doesn't require inheritance
- Improved traceback handling
[0.29.0]: https://github.com/Textualize/textual/compare/v0.28.1...v0.29.0
[0.28.1]: https://github.com/Textualize/textual/compare/v0.28.0...v0.28.1
[0.28.0]: https://github.com/Textualize/textual/compare/v0.27.0...v0.28.0
[0.27.0]: https://github.com/Textualize/textual/compare/v0.26.0...v0.27.0

2
FAQ.md
View File

@@ -28,7 +28,7 @@ You likely have an older version of Textual. You can install the latest version
The following should do it:
```
pip install "textual[dev]" -U
pip install textual-dev -U
```
<a name="how-can-i-select-and-copy-text-in-a-textual-app"></a>

View File

@@ -13,12 +13,12 @@ Textual is a *Rapid Application Development* framework for Python.
Build sophisticated user interfaces with a simple Python API. Run your apps in the terminal and (coming soon) a web browser!
<details>
<details>
<summary> 🎬 Demonstration </summary>
<hr>
A quick run through of some Textual features.
https://user-images.githubusercontent.com/554369/197355913-65d3c125-493d-4c05-a590-5311f16c40ff.mov
@@ -32,7 +32,7 @@ https://user-images.githubusercontent.com/554369/197355913-65d3c125-493d-4c05-a5
Textual adds interactivity to [Rich](https://github.com/Textualize/rich) with an API inspired by modern web development.
On modern terminal software (installed by default on most systems), Textual apps can use **16.7 million** colors with mouse support and smooth flicker-free animation. A powerful layout engine and re-usable components makes it possible to build apps that rival the desktop and web experience.
On modern terminal software (installed by default on most systems), Textual apps can use **16.7 million** colors with mouse support and smooth flicker-free animation. A powerful layout engine and re-usable components makes it possible to build apps that rival the desktop and web experience.
## Compatibility
@@ -43,10 +43,16 @@ Textual runs on Linux, macOS, and Windows. Textual requires Python 3.7 or above.
Install Textual via pip:
```
pip install "textual[dev]"
pip install textual
```
The addition of `[dev]` installs Textual development tools. See the [docs](https://textual.textualize.io/getting_started/) if you need help getting started.
If you plan on developing Textual apps, you should also install the development tools with the following command:
```
pip install textual-dev
```
See the [docs](https://textual.textualize.io/getting_started/) if you need help getting started.
## Demo
@@ -82,12 +88,12 @@ https://user-images.githubusercontent.com/554369/197188237-88d3f7e4-4e5f-40b5-b9
</details>
<details>
<details>
<summary> 📷 Calculator </summary>
<hr>
This is [calculator.py](https://github.com/Textualize/textual/blob/main/examples/calculator.py) which demonstrates Textual grid layouts.
![calculator screenshot](https://raw.githubusercontent.com/Textualize/textual/main/imgs/calculator.png)
</details>
@@ -97,7 +103,7 @@ This is [calculator.py](https://github.com/Textualize/textual/blob/main/examples
<hr>
This is the Stopwatch example from the [tutorial](https://textual.textualize.io/tutorial/).
https://user-images.githubusercontent.com/554369/197360718-0c834ef5-6285-4d37-85cf-23eed4aa56c5.mov
@@ -112,12 +118,12 @@ https://user-images.githubusercontent.com/554369/197360718-0c834ef5-6285-4d37-85
The `textual` command has a few sub-commands to preview Textual styles.
<details>
<details>
<summary> 🎬 Easing reference </summary>
<hr>
This is the *easing* reference which demonstrates the easing parameter on animation, with both movement and opacity. You can run it with the following command:
```bash
textual easing
```
@@ -128,12 +134,12 @@ https://user-images.githubusercontent.com/554369/196157100-352852a6-2b09-4dc8-a8
</details>
<details>
<details>
<summary> 🎬 Borders reference </summary>
<hr>
This is the borders reference which demonstrates some of the borders styles in Textual. You can run it with the following command:
```bash
textual borders
```
@@ -141,16 +147,16 @@ textual borders
https://user-images.githubusercontent.com/554369/196158235-4b45fb78-053d-4fd5-b285-e09b4f1c67a8.mov
</details>
<details>
<details>
<summary> 🎬 Colors reference </summary>
<hr>
This is a reference for Textual's color design system.
```bash
textual colors
```
@@ -161,6 +167,5 @@ https://user-images.githubusercontent.com/554369/197357417-2d407aac-8969-44d3-82
</details>
</details>

View File

@@ -0,0 +1,31 @@
---
draft: false
date: 2023-07-03
categories:
- Release
title: "Textual 0.27.0 refactors dev tools"
authors:
- willmcgugan
---
# Textual 0.27.0 refactors dev tools
It's been a slow week or two at Textualize, with Textual devs taking well-earned annual leave, but we still managed to get a new version out.
<!-- more -->
Version 0.27.0 has shipped with a number of fixes (see the [release notes](https://github.com/Textualize/textual/releases/tag/v0.29.0) for details), but I'd like to use this post to explain a change we made to how Textual developer tools are distributed.
Previously if you installed `textual[dev]` you would get the Textual dev tools plus the library itself. If you were distributing Textual apps and didn't need the developer tools you could drop the `[dev]`.
We did this because the less dependencies a package has, the fewer installation issues you can expect to get in the future. And Textual is surprisingly lean if you only need to *run* apps, and not build them.
Alas, this wasn't quite as elegant solution as we hoped. The dependencies defined in extras wouldn't install commands, so `textual` was bundled with the core library. This meant that if you installed the Textual package *without* the `[dev]` you would still get the `textual` command on your path but it wouldn't run.
We solved this by creating two packages: `textual` contains the core library (with minimal dependencies) and `textual-dev` contains the developer tools. If you are building Textual apps, you should install both as follows:
```
pip install textual textual-dev
```
That's the only difference. If you run in to any issues feel free to ask on the [Discord server](https://discord.gg/Enf6Z3qhVr)!

View File

@@ -2,8 +2,9 @@
Simulates a screenshot of the Textual devtools
"""
from textual_dev.renderables import DevConsoleHeader
from textual.app import App
from textual.devtools.renderables import DevConsoleHeader
from textual.widgets import Static

View File

@@ -20,20 +20,28 @@ Textual requires Python 3.7 or later (if you have a choice, pick the most recent
## Installation
You can install Textual via PyPI.
If you plan on developing Textual apps, then you should install `textual[dev]`. The `[dev]` part installs a few extra dependencies for development.
```
pip install "textual[dev]"
```
If you only plan on _running_ Textual apps, then you can drop the `[dev]` part:
You can install Textual via PyPI, with the following command:
```
pip install textual
```
If you plan on developing Textual apps, you should also install textual developer tools:
```
pip install textual-dev
```
### Textual CLI
If you installed the developer tools you should have access to the `textual` command. There are a number of sub-commands available which will aid you in building Textual apps. Run the following for a list of the available commands:
```bash
textual --help
```
See [devtools](guide/devtools.md) for more about the `textual` command.
## Demo
Once you have Textual installed, run the following to get an impression of what it can do:
@@ -79,15 +87,7 @@ python code_browser.py ../
```
## Textual CLI
If you installed the dev dependencies you have access to the `textual` CLI command. There are a number of sub-commands which will aid you in building Textual apps.
```bash
textual --help
```
See [devtools](guide/devtools.md) for more about the `textual` command.
## Need help?

View File

@@ -72,7 +72,7 @@ You can specify which easing method to use via the `easing` parameter on the `an
!!! note
The `textual easing` preview requires the `dev` extras to be installed (using `pip install textual[dev]`).
The `textual easing` preview requires the `textual-dev` package to be installed (using `pip install textual-dev`).
## Completion callbacks

2438
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,6 @@
[tool.poetry]
name = "textual"
version = "0.28.1"
version = "0.29.0"
homepage = "https://github.com/Textualize/textual"
description = "Modern Text User Interface framework"
authors = ["Will McGugan <will@textualize.io>"]
@@ -33,9 +33,6 @@ include = [
{ path = "docs-offline/**/*", format = "sdist" }
]
[tool.poetry.scripts]
textual = "textual.cli.cli:run"
[tool.poetry.dependencies]
python = "^3.7"
rich = ">=13.3.3"
@@ -44,14 +41,6 @@ markdown-it-py = {extras = ["plugins", "linkify"], version = ">=2.1.0"}
importlib-metadata = ">=4.11.3"
typing-extensions = "^4.4.0"
# Dependencies below are required for devtools only
aiohttp = { version = ">=3.8.1", optional = true }
click = {version = ">=8.1.2", optional = true}
msgpack = { version = ">=1.0.3", optional = true }
[tool.poetry.extras]
dev = ["aiohttp", "click", "msgpack"]
[tool.poetry.group.dev.dependencies]
pytest = "^7.1.3"
black = "^23.1.0"
@@ -67,8 +56,9 @@ pytest-aiohttp = "^1.0.4"
time-machine = "^2.6.0"
mkdocs-rss-plugin = "^1.5.0"
httpx = "^0.23.1"
msgpack-types = "^0.2.0"
types-setuptools = "^67.2.0.1"
textual-dev = ">=1.0.0"
pytest-asyncio = "*"
pytest-textual-snapshot = ">=0.1.0"
[tool.black]

View File

@@ -10,5 +10,5 @@ You likely have an older version of Textual. You can install the latest version
The following should do it:
```
pip install "textual[dev]" -U
pip install textual-dev -U
```

View File

@@ -96,11 +96,11 @@ from .screen import Screen, ScreenResultCallbackType, ScreenResultType
from .widget import AwaitMount, Widget
if TYPE_CHECKING:
from textual_dev.client import DevtoolsClient
from typing_extensions import Coroutine, TypeAlias
# Unused & ignored imports are needed for the docs to link to these objects:
from .css.query import WrongType # type: ignore # noqa: F401
from .devtools.client import DevtoolsClient
from .message import Message
from .pilot import Pilot
from .widget import MountError # type: ignore # noqa: F401
@@ -432,7 +432,7 @@ class App(Generic[ReturnType], DOMNode):
self.devtools: DevtoolsClient | None = None
if "devtools" in self.features:
try:
from .devtools.client import DevtoolsClient
from textual_dev.client import DevtoolsClient
except ImportError:
# Dev dependencies not installed
pass
@@ -806,7 +806,7 @@ class App(Generic[ReturnType], DOMNode):
return
try:
from .devtools.client import DevtoolsLog
from textual_dev.client import DevtoolsLog
if len(objects) == 1 and not kwargs:
devtools.log(
@@ -1859,7 +1859,7 @@ class App(Generic[ReturnType], DOMNode):
active_message_pump.set(self)
if self.devtools is not None:
from .devtools.client import DevtoolsConnectionError
from textual_dev.client import DevtoolsConnectionError
try:
await self.devtools.connect()
@@ -1971,7 +1971,7 @@ class App(Generic[ReturnType], DOMNode):
if self.devtools is not None:
devtools = self.devtools
assert devtools is not None
from .devtools.redirect_output import StdoutRedirector
from textual_dev.redirect_output import StdoutRedirector
redirector = StdoutRedirector(devtools)
with redirect_stderr(redirector):

View File

@@ -1,3 +0,0 @@
from .cli import run
run()

View File

@@ -1,148 +0,0 @@
"""
Functions to run Textual apps with an updated environment.
Note that these methods will execute apps in a new process, and abandon the current process.
This means that (if they succeed) they will never return.
"""
from __future__ import annotations
import os
import platform
import subprocess
import sys
from string import Template
from typing import NoReturn, Sequence
WINDOWS = platform.system() == "Windows"
EXEC_SCRIPT = Template(
"""\
from textual.app import App
try:
from $MODULE import $APP as app;
except ImportError:
raise SystemExit("Unable to import '$APP' from module '$MODULE'") from None
if isinstance(app, App):
# If we imported an app, run it
app.run()
else:
# Otherwise it is assumed to be a class
app().run()
"""
)
"""A template script to import and run an app."""
class ExecImportError(Exception):
"""Raised if a Python import is invalid."""
def run_app(
import_name: str, args: Sequence[str], environment: dict[str, str] | None = None
) -> None:
"""Run a textual app.
Note:
The current process is abandoned.
Args:
command_args: Arguments to pass to the Textual app.
environment: Environment variables, or None to use current process.
"""
if environment is None:
app_environment = dict(os.environ)
else:
app_environment = environment
if _is_python_path(import_name):
# If it is a Python path we can exec it now
exec_python([import_name, *args], app_environment)
else:
# Otherwise it is assumed to be a Python import
try:
exec_import(import_name, args, app_environment)
except (SyntaxError, ExecImportError):
print(f"Unable to import Textual app from {import_name!r}")
def _is_python_path(path: str) -> bool:
"""Does the given file look like it's run with Python?
Args:
path: The path to check.
Returns:
True if the path references Python code, False it it doesn't.
"""
if path.endswith(".py"):
return True
try:
with open(path, "r") as source:
first_line = source.readline()
except IOError:
return False
return first_line.startswith("#!") and "py" in first_line
def _flush() -> None:
"""Flush output before exec."""
sys.stderr.flush()
sys.stdout.flush()
def exec_python(args: Sequence[str], environment: dict[str, str]) -> None:
"""Execute a Python script.
Args:
args: Additional arguments.
environment: Environment variables.
"""
_flush()
if WINDOWS:
subprocess.call([sys.executable, *args], env=environment)
else:
os.execvpe(sys.executable, ["python", *args], environment)
def exec_command(
command: str, args: Sequence[str], environment: dict[str, str]
) -> None:
"""Execute a command with the given environment.
Args:
command: Command to execute.
environment: Environment variables.
"""
_flush()
if WINDOWS:
subprocess.call([command, *args], env=environment)
else:
os.execvpe(command, [command, *args], environment)
def exec_import(
import_name: str, args: Sequence[str], environment: dict[str, str]
) -> None:
"""Import and execute an app.
Raises:
SyntaxError: If any imports are not valid Python symbols.
ExecImportError: If the module could not be imported.
Args:
import_name: The Python import.
args: Command line arguments.
environment: Environment variables.
"""
module, _colon, app = import_name.partition(":")
app = app or "app"
script = EXEC_SCRIPT.substitute(MODULE=module, APP=app)
# Compiling the script will raise a SyntaxError if there are any invalid symbols
compile(script, "textual-exec", "exec")
_flush()
exec_python(["-c", script, *args], environment)

View File

@@ -1,241 +0,0 @@
from __future__ import annotations
import platform
import shlex
import sys
from ..constants import DEVTOOLS_PORT
from ._run import exec_command, run_app
try:
import click
except ImportError:
print("Please install 'textual[dev]' to use the 'textual' command")
sys.exit(1)
from importlib_metadata import version
WINDOWS = platform.system() == "Windows"
"""True if we're running on Windows."""
@click.group()
@click.version_option(version("textual"))
def run():
pass
@run.command(help="Run the Textual Devtools console.")
@click.option(
"--port",
"port",
type=int,
default=None,
metavar="PORT",
help=f"Port to use for the development mode console. Defaults to {DEVTOOLS_PORT}.",
)
@click.option("-v", "verbose", help="Enable verbose logs.", is_flag=True)
@click.option("-x", "--exclude", "exclude", help="Exclude log group(s)", multiple=True)
def console(port: int | None, verbose: bool, exclude: list[str]) -> None:
"""Launch the textual console."""
from rich.console import Console
from textual.devtools.server import _run_devtools
console = Console()
console.clear()
console.show_cursor(False)
try:
_run_devtools(verbose=verbose, exclude=exclude, port=port)
finally:
console.show_cursor(True)
def _pre_run_warnings() -> None:
"""Look for and report any issues with the environment.
This is the right place to add code that looks at the terminal, or other
environmental issues, and if a problem is seen it should be printed so
the developer can see it easily.
"""
import os
from rich.console import Console
from rich.panel import Panel
console = Console()
# Add any test/warning pair here. The list contains a tuple where the
# first item is `True` if a problem situation is detected, and the
# second item is a message to show the user on exit from `textual run`.
warnings = [
(
(
platform.system() == "Darwin"
and os.environ.get("TERM_PROGRAM") == "Apple_Terminal"
),
"The default terminal app on macOS is limited to 256 colors. See our FAQ for more details:\n\n"
"https://github.com/Textualize/textual/blob/main/FAQ.md#why-doesn't-textual-look-good-on-macos",
)
]
for concerning, concern in warnings:
if concerning:
console.print(
Panel.fit(
f"⚠️ {concern}", style="yellow", border_style="red", padding=(1, 2)
)
)
@run.command(
"run",
context_settings={
"ignore_unknown_options": True,
},
)
@click.argument("import_name", metavar="FILE or FILE:APP")
@click.option("--dev", "dev", help="Enable development mode.", is_flag=True)
@click.option(
"--port",
"port",
type=int,
default=None,
metavar="PORT",
help=f"Port to use for the development mode console. Defaults to {DEVTOOLS_PORT}.",
)
@click.option(
"--press", "press", default=None, help="Comma separated keys to simulate press."
)
@click.option(
"--screenshot",
type=int,
default=None,
metavar="DELAY",
help="Take screenshot after DELAY seconds.",
)
@click.option(
"-c",
"--command",
"command",
type=bool,
default=False,
help="Run as command rather that a file / module.",
is_flag=True,
)
@click.option(
"-r",
"--show-return",
"show_return",
type=bool,
default=False,
help="Show any return value on exit.",
is_flag=True,
)
@click.argument("extra_args", nargs=-1, type=click.UNPROCESSED)
def _run_app(
import_name: str,
dev: bool,
port: int | None,
press: str | None,
screenshot: int | None,
extra_args: tuple[str],
command: bool = False,
show_return: bool = False,
) -> None:
"""Run a Textual app.
The app to run may be given as a path (ending with .py) which will be equivalent to running the
script with python, or as a Python import which will import and run an app called "app".
In the case of an import, you can import and run an alternative app by appending a colon followed
by the name of the app instance or class.
Here are some examples:
textual run foo.py
textual run module.foo
textual run module.foo:MyApp
Add the --dev switch to enable the textual console.
textual run --dev foo.py
Use the -c switch to run a command that launches a Textual app.
textual run -c textual colors
"""
import os
from textual.features import parse_features
environment = dict(os.environ)
features = set(parse_features(environment.get("TEXTUAL", "")))
if dev:
features.add("debug")
features.add("devtools")
environment["TEXTUAL"] = ",".join(sorted(features))
if port is not None:
environment["TEXTUAL_DEVTOOLS_PORT"] = str(port)
if press is not None:
environment["TEXTUAL_PRESS"] = str(press)
if screenshot is not None:
environment["TEXTUAL_SCREENSHOT"] = str(screenshot)
if show_return:
environment["TEXTUAL_SHOW_RETURN"] = "1"
_pre_run_warnings()
import_name, *args = [*shlex.split(import_name, posix=not WINDOWS), *extra_args]
if command:
exec_command(import_name, args, environment)
else:
run_app(import_name, args, environment)
@run.command("borders")
def borders():
"""Explore the border styles available in Textual."""
from textual.cli.previews import borders
borders.app.run()
@run.command("easing")
def easing():
"""Explore the animation easing functions available in Textual."""
from textual.cli.previews import easing
easing.app.run()
@run.command("colors")
def colors():
"""Explore the design system."""
from textual.cli.previews import colors
colors.app.run()
@run.command("keys")
def keys():
"""Show key events."""
from textual.cli.previews import keys
keys.app.run()
@run.command("diagnose")
def run_diagnose():
"""Print information about the Textual environment."""
from textual.cli.tools.diagnose import diagnose
diagnose()

View File

@@ -1,71 +0,0 @@
from textual.app import App, ComposeResult
from textual.containers import Vertical
from textual.css.constants import VALID_BORDER
from textual.widgets import Button, Label
TEXT = """I must not fear.
Fear is the mind-killer.
Fear is the little-death that brings total obliteration.
I will face my fear.
I will permit it to pass over me and through me.
And when it has gone past, I will turn the inner eye to see its path.
Where the fear has gone there will be nothing. Only I will remain."""
class BorderButtons(Vertical):
DEFAULT_CSS = """
BorderButtons {
dock: left;
width: 24;
overflow-y: scroll;
}
BorderButtons > Button {
width: 100%;
}
"""
def compose(self) -> ComposeResult:
for border in sorted(VALID_BORDER):
if border:
yield Button(border, id=border)
class BorderApp(App):
"""Demonstrates the border styles."""
CSS = """
Screen {
align: center middle;
overflow: auto;
}
#text {
margin: 2 4;
padding: 2 4;
border: solid $secondary;
height: auto;
background: $panel;
color: $text;
border-title-align: center;
}
"""
def compose(self):
yield BorderButtons()
self.text = Label(TEXT, id="text")
self.text.shrink = True
self.text.border_title = "solid"
yield self.text
def on_button_pressed(self, event: Button.Pressed) -> None:
self.text.border_title = event.button.id
self.text.styles.border = (
event.button.id,
self.stylesheet._variables["secondary"],
)
app = BorderApp()
if __name__ == "__main__":
app.run()

View File

@@ -1,332 +0,0 @@
Label {
width: 100%;
}
ColorButtons {
dock: left;
overflow-y: auto;
width: 30;
}
ColorButtons > Button {
width: 100%;
}
ColorsView {
width: 100%;
height: 100%;
align: center middle;
overflow-x: auto;
background: $background;
scrollbar-gutter: stable;
}
ColorItem {
layout: horizontal;
height: 3;
width: 1fr;
}
ColorBar {
height: auto;
width: 1fr;
content-align: center middle;
}
ColorBar.label {
width: 2fr;
}
ColorItem {
width: 100%;
padding: 1 2;
}
ColorGroup {
margin: 2 0;
width: 80;
height: auto;
padding: 1 4 2 4;
background: $surface;
border: wide $surface;
}
ColorGroup.-active {
border: wide $secondary;
}
.text {
color: $text;
}
.muted {
color: $text-muted;
}
.disabled {
color: $text-disabled;
}
Label {
padding: 0 0 1 0;
content-align: center middle;
color: $text;
text-style: bold;
}
.primary-darken-3 {
background: $primary-darken-3;
}
.primary-darken-2 {
background: $primary-darken-2;
}
.primary-darken-1 {
background: $primary-darken-1;
}
.primary {
background: $primary;
}
.primary-lighten-1 {
background: $primary-lighten-1;
}
.primary-lighten-2 {
background: $primary-lighten-2;
}
.primary-lighten-3 {
background: $primary-lighten-3;
}
.secondary-darken-3 {
background: $secondary-darken-3;
}
.secondary-darken-2 {
background: $secondary-darken-2;
}
.secondary-darken-1 {
background: $secondary-darken-1;
}
.secondary {
background: $secondary;
}
.secondary-lighten-1 {
background: $secondary-lighten-1;
}
.secondary-lighten-2 {
background: $secondary-lighten-2;
}
.secondary-lighten-3 {
background: $secondary-lighten-3;
}
.background-darken-3 {
background: $background-darken-3;
}
.background-darken-2 {
background: $background-darken-2;
}
.background-darken-1 {
background: $background-darken-1;
}
.background {
background: $background;
}
.background-lighten-1 {
background: $background-lighten-1;
}
.background-lighten-2 {
background: $background-lighten-2;
}
.background-lighten-3 {
background: $background-lighten-3;
}
.primary-background-darken-3 {
background: $primary-background-darken-3;
}
.primary-background-darken-2 {
background: $primary-background-darken-2;
}
.primary-background-darken-1 {
background: $primary-background-darken-1;
}
.primary-background {
background: $primary-background;
}
.primary-background-lighten-1 {
background: $primary-background-lighten-1;
}
.primary-background-lighten-2 {
background: $primary-background-lighten-2;
}
.primary-background-lighten-3 {
background: $primary-background-lighten-3;
}
.secondary-background-darken-3 {
background: $secondary-background-darken-3;
}
.secondary-background-darken-2 {
background: $secondary-background-darken-2;
}
.secondary-background-darken-1 {
background: $secondary-background-darken-1;
}
.secondary-background {
background: $secondary-background;
}
.secondary-background-lighten-1 {
background: $secondary-background-lighten-1;
}
.secondary-background-lighten-2 {
background: $secondary-background-lighten-2;
}
.secondary-background-lighten-3 {
background: $secondary-background-lighten-3;
}
.surface-darken-3 {
background: $surface-darken-3;
}
.surface-darken-2 {
background: $surface-darken-2;
}
.surface-darken-1 {
background: $surface-darken-1;
}
.surface {
background: $surface;
}
.surface-lighten-1 {
background: $surface-lighten-1;
}
.surface-lighten-2 {
background: $surface-lighten-2;
}
.surface-lighten-3 {
background: $surface-lighten-3;
}
.panel-darken-3 {
background: $panel-darken-3;
}
.panel-darken-2 {
background: $panel-darken-2;
}
.panel-darken-1 {
background: $panel-darken-1;
}
.panel {
background: $panel;
}
.panel-lighten-1 {
background: $panel-lighten-1;
}
.panel-lighten-2 {
background: $panel-lighten-2;
}
.panel-lighten-3 {
background: $panel-lighten-3;
}
.boost-darken-3 {
background: $boost-darken-3;
}
.boost-darken-2 {
background: $boost-darken-2;
}
.boost-darken-1 {
background: $boost-darken-1;
}
.boost {
background: $boost;
}
.boost-lighten-1 {
background: $boost-lighten-1;
}
.boost-lighten-2 {
background: $boost-lighten-2;
}
.boost-lighten-3 {
background: $boost-lighten-3;
}
.warning-darken-3 {
background: $warning-darken-3;
}
.warning-darken-2 {
background: $warning-darken-2;
}
.warning-darken-1 {
background: $warning-darken-1;
}
.warning {
background: $warning;
}
.warning-lighten-1 {
background: $warning-lighten-1;
}
.warning-lighten-2 {
background: $warning-lighten-2;
}
.warning-lighten-3 {
background: $warning-lighten-3;
}
.error-darken-3 {
background: $error-darken-3;
}
.error-darken-2 {
background: $error-darken-2;
}
.error-darken-1 {
background: $error-darken-1;
}
.error {
background: $error;
}
.error-lighten-1 {
background: $error-lighten-1;
}
.error-lighten-2 {
background: $error-lighten-2;
}
.error-lighten-3 {
background: $error-lighten-3;
}
.success-darken-3 {
background: $success-darken-3;
}
.success-darken-2 {
background: $success-darken-2;
}
.success-darken-1 {
background: $success-darken-1;
}
.success {
background: $success;
}
.success-lighten-1 {
background: $success-lighten-1;
}
.success-lighten-2 {
background: $success-lighten-2;
}
.success-lighten-3 {
background: $success-lighten-3;
}
.accent-darken-3 {
background: $accent-darken-3;
}
.accent-darken-2 {
background: $accent-darken-2;
}
.accent-darken-1 {
background: $accent-darken-1;
}
.accent {
background: $accent;
}
.accent-lighten-1 {
background: $accent-lighten-1;
}
.accent-lighten-2 {
background: $accent-lighten-2;
}
.accent-lighten-3 {
background: $accent-lighten-3;
}

View File

@@ -1,79 +0,0 @@
from textual.app import App, ComposeResult
from textual.containers import Horizontal, Vertical, VerticalScroll
from textual.design import ColorSystem
from textual.widgets import Button, Footer, Label, Static
class ColorButtons(VerticalScroll):
def compose(self) -> ComposeResult:
for border in ColorSystem.COLOR_NAMES:
if border:
yield Button(border, id=border)
class ColorBar(Static):
pass
class ColorItem(Horizontal):
pass
class ColorGroup(Vertical):
pass
class Content(Vertical):
pass
class ColorsView(VerticalScroll):
def compose(self) -> ComposeResult:
LEVELS = [
"darken-3",
"darken-2",
"darken-1",
"",
"lighten-1",
"lighten-2",
"lighten-3",
]
for color_name in ColorSystem.COLOR_NAMES:
with ColorGroup(id=f"group-{color_name}"):
yield Label(f'"{color_name}"')
for level in LEVELS:
color = f"{color_name}-{level}" if level else color_name
with ColorItem(classes=color):
yield ColorBar(f"${color}", classes="text label")
yield ColorBar("$text-muted", classes="muted")
yield ColorBar("$text-disabled", classes="disabled")
class ColorsApp(App):
CSS_PATH = "colors.css"
BINDINGS = [("d", "toggle_dark", "Toggle dark mode")]
def compose(self) -> ComposeResult:
yield Content(ColorButtons())
yield Footer()
def on_mount(self) -> None:
self.call_after_refresh(self.update_view)
def update_view(self) -> None:
content = self.query_one("Content", Content)
content.mount(ColorsView())
def on_button_pressed(self, event: Button.Pressed) -> None:
self.query(ColorGroup).remove_class("-active")
group = self.query_one(f"#group-{event.button.id}", ColorGroup)
group.add_class("-active")
group.scroll_visible(top=True, speed=150)
app = ColorsApp()
if __name__ == "__main__":
app.run()

View File

@@ -1,58 +0,0 @@
Label {
width: 100%;
}
EasingButtons > Button {
width: 100%;
}
EasingButtons {
dock: left;
overflow-y: scroll;
width: 20;
}
#bar-container {
content-align: center middle;
}
#duration-input {
width: 30;
background: $boost;
padding: 0 1;
border: tall transparent;
}
#duration-input:focus {
border: tall $accent;
}
#inputs {
padding: 1;
height: auto;
dock: top;
background: $boost;
}
Bar {
width: 1fr;
}
#other {
width: 1fr;
background: $panel;
padding: 1;
height: 100%;
border-left: vkey $background;
}
#opacity-widget {
padding: 1;
background: $warning;
color: $text;
border: wide $background;
}
#label {
width: auto;
padding: 1;
}

View File

@@ -1,126 +0,0 @@
from __future__ import annotations
from rich.console import RenderableType
from textual._easing import EASING
from textual.app import App, ComposeResult
from textual.cli.previews.borders import TEXT
from textual.containers import Horizontal, Vertical
from textual.reactive import reactive, var
from textual.scrollbar import ScrollBarRender
from textual.widget import Widget
from textual.widgets import Button, Footer, Input, Label
VIRTUAL_SIZE = 100
WINDOW_SIZE = 10
START_POSITION = 0.0
END_POSITION = float(VIRTUAL_SIZE - WINDOW_SIZE)
class EasingButtons(Widget):
def compose(self) -> ComposeResult:
for easing in sorted(EASING, reverse=True):
yield Button(easing, id=easing)
class Bar(Widget):
position = reactive(START_POSITION)
animation_running = reactive(False)
DEFAULT_CSS = """
Bar {
background: $surface;
color: $error;
}
Bar.-active {
background: $surface;
color: $success;
}
"""
def watch_animation_running(self, running: bool) -> None:
self.set_class(running, "-active")
def render(self) -> RenderableType:
return ScrollBarRender(
virtual_size=VIRTUAL_SIZE,
window_size=WINDOW_SIZE,
position=self.position,
style=self.rich_style,
)
class EasingApp(App):
position = reactive(START_POSITION)
duration = var(1.0)
def on_load(self):
self.bind(
"ctrl+p", "focus('duration-input')", description="Focus: Duration Input"
)
self.bind("ctrl+b", "toggle_dark", description="Toggle Dark")
def compose(self) -> ComposeResult:
self.animated_bar = Bar()
self.animated_bar.position = START_POSITION
duration_input = Input("1.0", placeholder="Duration", id="duration-input")
self.opacity_widget = Label(
f"[b]Welcome to Textual![/]\n\n{TEXT}", id="opacity-widget"
)
yield EasingButtons()
with Vertical():
with Horizontal(id="inputs"):
yield Label("Animation Duration:", id="label")
yield duration_input
with Horizontal():
yield self.animated_bar
yield Vertical(self.opacity_widget, id="other")
yield Footer()
def on_button_pressed(self, event: Button.Pressed) -> None:
self.animated_bar.animation_running = True
def _animation_complete():
self.animated_bar.animation_running = False
target_position = (
END_POSITION if self.position == START_POSITION else START_POSITION
)
assert event.button.id is not None # Should be set to an easing function str.
self.animate(
"position",
value=target_position,
final_value=target_position,
duration=self.duration,
easing=event.button.id,
on_complete=_animation_complete,
)
def watch_position(self, value: int):
self.animated_bar.position = value
self.opacity_widget.styles.opacity = 1 - value / END_POSITION
def on_input_changed(self, event: Input.Changed):
if event.input.id == "duration-input":
new_duration = _try_float(event.value)
if new_duration is not None:
self.duration = new_duration
def action_toggle_dark(self):
self.dark = not self.dark
def _try_float(string: str) -> float | None:
try:
return float(string)
except ValueError:
return None
app = EasingApp(css_path="easing.css")
if __name__ == "__main__":
app.run()

View File

@@ -1,72 +0,0 @@
from __future__ import annotations
from rich.panel import Panel
from rich.text import Text
from textual import events
from textual.app import App, ComposeResult
from textual.containers import Horizontal
from textual.reactive import Reactive, var
from textual.widgets import Button, Header, TextLog
INSTRUCTIONS = """\
[u]Press some keys![/]
To quit the app press [b]ctrl+c[/b] [i]twice[/i] or press the Quit button below.\
"""
class KeyLog(TextLog, inherit_bindings=False):
"""We don't want to handle scroll keys."""
class KeysApp(App, inherit_bindings=False):
"""Show key events in a text log."""
TITLE = "Textual Keys"
BINDINGS = [("c", "clear", "Clear")]
CSS = """
#buttons {
dock: bottom;
height: 3;
}
Button {
width: 1fr;
}
"""
last_key: Reactive[str | None] = var(None)
def compose(self) -> ComposeResult:
yield Header()
yield Horizontal(
Button("Clear", id="clear", variant="warning"),
Button("Quit", id="quit", variant="error"),
id="buttons",
)
yield KeyLog()
def on_ready(self) -> None:
self.query_one(KeyLog).write(Panel(Text.from_markup(INSTRUCTIONS)), expand=True)
def on_key(self, event: events.Key) -> None:
self.query_one(KeyLog).write(event)
if event.key == "ctrl+c":
if self.last_key == "ctrl+c":
self.exit()
else:
self.query_one(KeyLog).write("Press Ctrl+C again to quit")
self.last_key = event.key
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "quit":
self.exit()
elif event.button.id == "clear":
self.query_one(KeyLog).clear()
app = KeysApp()
if __name__ == "__main__":
app.run()

View File

@@ -1,5 +0,0 @@
import sys
from ._run import run
run(sys.argv[1], sys.argv[1:])

View File

@@ -1,161 +0,0 @@
"""Textual CLI command code to print diagnostic information."""
from __future__ import annotations
import os
import platform
import sys
from functools import singledispatch
from typing import Any
from importlib_metadata import version
from rich.console import Console, ConsoleDimensions
def _section(title: str, values: dict[str, str]) -> None:
"""Print a collection of named values within a titled section.
Args:
title: The title for the section.
values: The values to print out.
"""
max_name = max(map(len, values.keys()))
max_value = max(map(len, values.values()))
print(f"## {title}")
print()
print(f"| {'Name':{max_name}} | {'Value':{max_value}} |")
print(f"|-{'-' * max_name}-|-{'-'*max_value}-|")
for name, value in values.items():
print(f"| {name:{max_name}} | {value:{max_value}} |")
print()
def _versions() -> None:
"""Print useful version numbers."""
_section("Versions", {"Textual": version("textual"), "Rich": version("rich")})
def _python() -> None:
"""Print information about Python."""
_section(
"Python",
{
"Version": platform.python_version(),
"Implementation": platform.python_implementation(),
"Compiler": platform.python_compiler(),
"Executable": sys.executable,
},
)
def _os() -> None:
_section(
"Operating System",
{
"System": platform.system(),
"Release": platform.release(),
"Version": platform.version(),
},
)
def _guess_term() -> str:
"""Try and guess which terminal is being used.
Returns:
The best guess at the name of the terminal.
"""
# First obvious place to look is in $TERM_PROGRAM.
term_program = os.environ.get("TERM_PROGRAM")
if term_program is None:
# Seems we couldn't get it that way. Let's check for some of the
# more common terminal signatures.
if "ALACRITTY_WINDOW_ID" in os.environ:
term_program = "Alacritty"
elif "KITTY_PID" in os.environ:
term_program = "Kitty"
elif "WT_SESSION" in os.environ:
term_program = "Windows Terminal"
elif "INSIDE_EMACS" in os.environ and os.environ["INSIDE_EMACS"]:
term_program = (
f"GNU Emacs {' '.join(os.environ['INSIDE_EMACS'].split(','))}"
)
elif "JEDITERM_SOURCE_ARGS" in os.environ:
term_program = "PyCharm"
else:
# See if we can pull out some sort of version information too.
term_version = os.environ.get("TERM_PROGRAM_VERSION")
if term_version is not None:
term_program = f"{term_program} ({term_version})"
return "*Unknown*" if term_program is None else term_program
def _env(var_name: str) -> str:
"""Get a representation of an environment variable.
Args:
var_name: The name of the variable to get.
Returns:
The value, or an indication that it isn't set.
"""
return os.environ.get(var_name, "*Not set*")
def _term() -> None:
"""Print information about the terminal."""
_section(
"Terminal",
{
"Terminal Application": _guess_term(),
"TERM": _env("TERM"),
"COLORTERM": _env("COLORTERM"),
"FORCE_COLOR": _env("FORCE_COLOR"),
"NO_COLOR": _env("NO_COLOR"),
},
)
@singledispatch
def _str_rich(value: Any) -> str:
"""Convert a rich console option to a string.
Args:
value: The value to convert to a string.
Returns:
The string version of the value for output
"""
return str(value)
@_str_rich.register
def _(value: ConsoleDimensions) -> str:
return f"width={value.width}, height={value.height}"
def _console() -> None:
"""Print The Rich console options."""
_section(
"Rich Console options",
{k: _str_rich(v) for k, v in Console().options.__dict__.items()},
)
def diagnose() -> None:
"""Print information about Textual and its environment to help diagnose problems."""
print("<!-- This is valid Markdown, do not quote! -->")
print("# Textual Diagnostics")
print()
_versions()
_python()
_os()
_term()
_console()
# TODO: Recommended changes. Given all of the above, make any useful
# recommendations to the user (eg: don't use Windows console, use
# Windows Terminal; don't use macOS Terminal.app, etc).

View File

@@ -1,265 +0,0 @@
from __future__ import annotations
import asyncio
import inspect
import json
import pickle
from asyncio import Queue, QueueFull, Task
from io import StringIO
from time import time
from typing import Any, NamedTuple, Type
import aiohttp
import msgpack
from aiohttp import ClientConnectorError, ClientResponseError, ClientWebSocketResponse
from rich.console import Console
from rich.segment import Segment
from .._log import LogGroup, LogVerbosity
from ..constants import DEVTOOLS_PORT
WEBSOCKET_CONNECT_TIMEOUT = 3
LOG_QUEUE_MAXSIZE = 512
class DevtoolsLog(NamedTuple):
"""A devtools log message.
Attributes:
objects_or_string: Corresponds to the data that will
ultimately be passed to Console.print in order to generate the log
Segments.
caller: Information about where this log message was
created. In other words, where did the user call `print` or `App.log`
from. Used to display line number and file name in the devtools window.
"""
objects_or_string: tuple[Any, ...] | str
caller: inspect.Traceback
class DevtoolsConsole(Console):
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.record = True
def export_segments(self) -> list[Segment]:
"""Return the list of Segments that have be printed using this console
Returns:
The list of Segments that have been printed using this console
"""
with self._record_buffer_lock:
segments = self._record_buffer[:]
self._record_buffer.clear()
return segments
class DevtoolsConnectionError(Exception):
"""Raise when the devtools client is unable to connect to the server"""
class ClientShutdown:
"""Sentinel type sent to client queue(s) to indicate shutdown"""
class DevtoolsClient:
"""Client responsible for websocket communication with the devtools server.
Communicates using a simple JSON protocol.
Messages have the format `{"type": <str>, "payload": <json>}`.
Valid values for `"type"` (that can be sent from client -> server) are
`"client_log"` (for log messages) and `"client_spillover"` (for reporting
to the server that messages were discarded due to rate limiting).
A `"client_log"` message has a `"payload"` format as follows:
```
{"timestamp": <int, unix timestamp>,
"path": <str, path of file>,
"line_number": <int, line number log was made from>,
"encoded_segments": <str, pickled then b64 encoded Segments to log>}
```
A `"client_spillover"` message has a `"payload"` format as follows:
```
{"spillover": <int, the number of messages discarded by rate-limiting>}
```
Args:
host: The host the devtools server is running on, defaults to "127.0.0.1"
port: The port the devtools server is accessed via, `DEVTOOLS_PORT` by default.
"""
def __init__(self, host: str = "127.0.0.1", port: int | None = None) -> None:
if port is None:
port = DEVTOOLS_PORT
self.url: str = f"ws://{host}:{port}"
self.session: aiohttp.ClientSession | None = None
self.log_queue_task: Task | None = None
self.update_console_task: Task | None = None
self.console: DevtoolsConsole = DevtoolsConsole(file=StringIO())
self.websocket: ClientWebSocketResponse | None = None
self.log_queue: Queue[str | bytes | Type[ClientShutdown]] | None = None
self.spillover: int = 0
self.verbose: bool = False
async def connect(self) -> None:
"""Connect to the devtools server.
Raises:
DevtoolsConnectionError: If we're unable to establish
a connection to the server for any reason.
"""
self.session = aiohttp.ClientSession()
self.log_queue = Queue(maxsize=LOG_QUEUE_MAXSIZE)
try:
self.websocket = await self.session.ws_connect(
f"{self.url}/textual-devtools-websocket",
timeout=WEBSOCKET_CONNECT_TIMEOUT,
)
except (ClientConnectorError, ClientResponseError):
raise DevtoolsConnectionError()
log_queue = self.log_queue
websocket = self.websocket
async def update_console() -> None:
"""Coroutine function scheduled as a Task, which listens on
the websocket for updates from the server regarding any changes
in the server Console dimensions. When the client learns of this
change, it will update its own Console to ensure it renders at
the correct width for server-side display.
"""
assert self.websocket is not None
async for message in self.websocket:
if message.type == aiohttp.WSMsgType.TEXT:
message_json = json.loads(message.data)
if message_json["type"] == "server_info":
payload = message_json["payload"]
self.console.width = payload["width"]
self.console.height = payload["height"]
self.verbose = payload.get("verbose", False)
async def send_queued_logs():
"""Coroutine function which is scheduled as a Task, which consumes
messages from the log queue and sends them to the server via websocket.
"""
while True:
log = await log_queue.get()
if log is ClientShutdown:
log_queue.task_done()
break
if isinstance(log, str):
await websocket.send_str(log)
else:
assert isinstance(log, bytes)
await websocket.send_bytes(log)
log_queue.task_done()
self.log_queue_task = asyncio.create_task(send_queued_logs())
self.update_console_task = asyncio.create_task(update_console())
async def _stop_log_queue_processing(self) -> None:
"""Schedule end of processing of the log queue, meaning that any messages a
user logs will be added to the queue, but not consumed and sent to
the server.
"""
if self.log_queue is not None:
await self.log_queue.put(ClientShutdown)
if self.log_queue_task:
await self.log_queue_task
async def _stop_incoming_message_processing(self) -> None:
"""Schedule stop of the task which listens for incoming messages from the
server around changes in the server console size.
"""
if self.websocket:
await self.websocket.close()
if self.update_console_task:
await self.update_console_task
if self.session:
await self.session.close()
async def disconnect(self) -> None:
"""Disconnect from the devtools server by stopping tasks and
closing connections.
"""
await self._stop_log_queue_processing()
await self._stop_incoming_message_processing()
@property
def is_connected(self) -> bool:
"""Checks connection to devtools server.
Returns:
True if this host is connected to the server. False otherwise.
"""
if not self.session or not self.websocket:
return False
return not (self.session.closed or self.websocket.closed)
def log(
self,
log: DevtoolsLog,
group: LogGroup = LogGroup.UNDEFINED,
verbosity: LogVerbosity = LogVerbosity.NORMAL,
) -> None:
"""Queue a log to be sent to the devtools server for display.
Args:
log: The log to write to devtools
"""
if isinstance(log.objects_or_string, str):
self.console.print(log.objects_or_string, markup=False)
else:
self.console.print(*log.objects_or_string, markup=False)
segments = self.console.export_segments()
encoded_segments = self._encode_segments(segments)
message: bytes | None = msgpack.packb(
{
"type": "client_log",
"payload": {
"group": group.value,
"verbosity": verbosity.value,
"timestamp": int(time()),
"path": getattr(log.caller, "filename", ""),
"line_number": getattr(log.caller, "lineno", 0),
"segments": encoded_segments,
},
}
)
assert message is not None
try:
if self.log_queue:
self.log_queue.put_nowait(message)
if self.spillover > 0 and self.log_queue.qsize() < LOG_QUEUE_MAXSIZE:
# Tell the server how many messages we had to discard due
# to the log queue filling to capacity on the client.
spillover_message = json.dumps(
{
"type": "client_spillover",
"payload": {
"spillover": self.spillover,
},
}
)
self.log_queue.put_nowait(spillover_message)
self.spillover = 0
except QueueFull:
self.spillover += 1
@classmethod
def _encode_segments(cls, segments: list[Segment]) -> bytes:
"""Pickle a list of Segments
Args:
segments: A list of Segments to encode
Returns:
The Segment list pickled with the latest protocol.
"""
pickled = pickle.dumps(segments, protocol=4)
return pickled

View File

@@ -1,109 +0,0 @@
from __future__ import annotations
import inspect
from typing import TYPE_CHECKING, cast
from .._log import LogGroup, LogVerbosity
from .client import DevtoolsLog
if TYPE_CHECKING:
from .client import DevtoolsClient
class StdoutRedirector:
"""
A write-only file-like object which redirects anything written to it to the devtools
instance associated with the given Textual application. Used within Textual to redirect
data written using `print` (or any other stdout writes) to the devtools and/or to the
log file.
"""
def __init__(self, devtools: DevtoolsClient) -> None:
"""
Args:
devtools: The running Textual app instance.
log_file: The log file for the Textual App.
"""
self.devtools = devtools
self._buffer: list[DevtoolsLog] = []
def write(self, string: str) -> None:
"""Write the log string to the internal buffer. If the string contains
a newline character `\n`, the whole string will be buffered and then the
buffer will be flushed immediately after.
Args:
string: The string to write to the buffer.
"""
if not self.devtools.is_connected:
return
current_frame = inspect.currentframe()
assert current_frame is not None
previous_frame = current_frame.f_back
assert previous_frame is not None
caller = inspect.getframeinfo(previous_frame)
self._buffer.append(DevtoolsLog(string, caller=caller))
# By default, `print` adds a "\n" suffix which results in a buffer
# flush. You can choose a different suffix with the `end` parameter.
# If you modify the `end` parameter to something other than "\n",
# then `print` will no longer flush automatically. However, if a
# string you are printing contains a "\n", that will trigger
# a flush after that string has been buffered, regardless of the value
# of `end`.
if "\n" in string:
self.flush()
def flush(self) -> None:
"""Flush the buffer. This will send all buffered log messages to
the devtools server and the log file. In the case of the devtools,
where possible, log messages will be batched and sent as one.
"""
self._write_to_devtools()
self._buffer.clear()
def _write_to_devtools(self) -> None:
"""Send the contents of the buffer to the devtools."""
if not self.devtools.is_connected:
return
log_batch: list[DevtoolsLog] = []
for log in self._buffer:
end_of_batch = log_batch and (
log_batch[-1].caller.filename != log.caller.filename
or log_batch[-1].caller.lineno != log.caller.lineno
)
if end_of_batch:
self._log_devtools_batched(log_batch)
log_batch.clear()
log_batch.append(log)
if log_batch:
self._log_devtools_batched(log_batch)
def _log_devtools_batched(self, log_batch: list[DevtoolsLog]) -> None:
"""Write a single batch of logs to devtools. A batch means contiguous logs
which have been written from the same line number and file path.
A single `print` call may correspond to multiple writes.
e.g. `print("a", "b", "c")` is 3 calls to `write`, so we batch
up these 3 write calls since they come from the same location, so that
they appear inside the same log message in the devtools window
rather than a single `print` statement resulting in 3 separate
logs being displayed.
Args:
log_batch: A batch of logs to send to the
devtools server as one. Log content will be joined together.
"""
# This code is only called via stdout.write, and so by this point we know
# that the log message content is a string. The cast below tells mypy this.
batched_log = "".join(cast(str, log.objects_or_string) for log in log_batch)
batched_log = batched_log.rstrip()
self.devtools.log(
DevtoolsLog(batched_log, caller=log_batch[-1].caller),
LogGroup.PRINT,
LogVerbosity.NORMAL,
)

View File

@@ -1,134 +0,0 @@
from __future__ import annotations
from datetime import datetime
from pathlib import Path
from typing import Iterable
from importlib_metadata import version
from rich.align import Align
from rich.console import Console, ConsoleOptions, RenderResult
from rich.markup import escape
from rich.rule import Rule
from rich.segment import Segment, Segments
from rich.style import Style
from rich.styled import Styled
from rich.table import Table
from rich.text import Text
from typing_extensions import Literal
from textual._log import LogGroup
DevConsoleMessageLevel = Literal["info", "warning", "error"]
class DevConsoleHeader:
def __init__(self, verbose: bool = False) -> None:
self.verbose = verbose
def __rich_console__(
self, console: Console, options: ConsoleOptions
) -> RenderResult:
preamble = Text.from_markup(
f"[bold]Textual Development Console [magenta]v{version('textual')}\n"
"[magenta]Run a Textual app with [reverse]textual run --dev my_app.py[/] to connect.\n"
"[magenta]Press [reverse]Ctrl+C[/] to quit."
)
if self.verbose:
preamble.append(Text.from_markup("\n[cyan]Verbose logs enabled"))
render_options = options.update(width=options.max_width - 4)
lines = console.render_lines(preamble, render_options)
new_line = Segment.line()
padding = Segment("", Style.parse("bright_magenta"))
for line in lines:
yield padding
yield from line
yield new_line
class DevConsoleLog:
"""Renderable representing a single log message
Args:
segments: The segments to display
path: The path of the file on the client that the log call was made from
line_number: The line number of the file on the client the log call was made from
unix_timestamp: Seconds since January 1st 1970
"""
def __init__(
self,
segments: Iterable[Segment],
path: str,
line_number: int,
unix_timestamp: int,
group: int,
verbosity: int,
severity: int,
) -> None:
self.segments = segments
self.path = path
self.line_number = line_number
self.unix_timestamp = unix_timestamp
self.group = group
self.verbosity = verbosity
self.severity = severity
def __rich_console__(
self, console: Console, options: ConsoleOptions
) -> RenderResult:
local_time = datetime.fromtimestamp(self.unix_timestamp)
table = Table.grid(expand=True)
file_link = escape(f"file://{Path(self.path).absolute()}")
file_and_line = escape(f"{Path(self.path).name}:{self.line_number}")
group = LogGroup(self.group).name
time = local_time.time()
group_text = Text(group)
if group == "WARNING":
group_text.stylize("bold yellow reverse")
elif group == "ERROR":
group_text.stylize("bold red reverse")
else:
group_text.stylize("dim")
log_message = Text.assemble((f"[{time}]", "dim"), " ", group_text)
table.add_row(
log_message,
Align.right(
Text(f"{file_and_line}", style=Style(dim=True, link=file_link))
),
)
yield table
if group == "PRINT":
yield Styled(Segments(self.segments), "bold")
else:
yield from self.segments
class DevConsoleNotice:
"""Renderable for messages written by the devtools console itself
Args:
message: The message to display
level: The message level ("info", "warning", or "error").
Determines colors used to render the message and the perceived importance.
"""
def __init__(self, message: str, *, level: DevConsoleMessageLevel = "info") -> None:
self.message = message
self.level = level
def __rich_console__(
self, console: Console, options: ConsoleOptions
) -> RenderResult:
level_to_style = {
"info": "dim",
"warning": "yellow",
"error": "red",
}
yield Rule(self.message, style=level_to_style.get(self.level, "dim"))

View File

@@ -1,86 +0,0 @@
from __future__ import annotations
import asyncio
from aiohttp.web import run_app
from aiohttp.web_app import Application
from aiohttp.web_request import Request
from aiohttp.web_routedef import get
from aiohttp.web_ws import WebSocketResponse
from ..constants import DEVTOOLS_PORT
from .client import DEVTOOLS_PORT
from .service import DevtoolsService
DEFAULT_SIZE_CHANGE_POLL_DELAY_SECONDS = 2
async def websocket_handler(request: Request) -> WebSocketResponse:
"""aiohttp websocket handler for sending data between devtools client and server
Args:
request: The request to the websocket endpoint
Returns:
The websocket response
"""
service: DevtoolsService = request.app["service"]
return await service.handle(request)
async def _on_shutdown(app: Application) -> None:
"""aiohttp shutdown handler, called when the aiohttp server is stopped"""
service: DevtoolsService = app["service"]
await service.shutdown()
async def _on_startup(app: Application) -> None:
service: DevtoolsService = app["service"]
await service.start()
def _run_devtools(
verbose: bool, exclude: list[str] | None = None, port: int | None = None
) -> None:
app = _make_devtools_aiohttp_app(verbose=verbose, exclude=exclude)
def noop_print(_: str) -> None:
pass
try:
run_app(
app,
port=DEVTOOLS_PORT if port is None else port,
print=noop_print,
loop=asyncio.get_event_loop(),
)
except OSError:
from rich import print
print()
print("[bold red]Couldn't start server")
print("Is there another instance of [reverse]textual console[/] running?")
def _make_devtools_aiohttp_app(
size_change_poll_delay_secs: float = DEFAULT_SIZE_CHANGE_POLL_DELAY_SECONDS,
verbose: bool = False,
exclude: list[str] | None = None,
) -> Application:
app = Application()
app.on_shutdown.append(_on_shutdown)
app.on_startup.append(_on_startup)
app["verbose"] = verbose
app["service"] = DevtoolsService(
update_frequency=size_change_poll_delay_secs, verbose=verbose, exclude=exclude
)
app.add_routes(
[
get("/textual-devtools-websocket", websocket_handler),
]
)
return app

View File

@@ -1,289 +0,0 @@
"""Manages a running devtools instance"""
from __future__ import annotations
import asyncio
import json
import pickle
from json import JSONDecodeError
from typing import Any
import msgpack
from aiohttp import WSMsgType
from aiohttp.abc import Request
from aiohttp.web_ws import WebSocketResponse
from rich.console import Console
from rich.markup import escape
from textual._log import LogGroup
from textual._time import time
from textual.devtools.renderables import (
DevConsoleHeader,
DevConsoleLog,
DevConsoleNotice,
)
QUEUEABLE_TYPES = {"client_log", "client_spillover"}
class DevtoolsService:
"""A running instance of devtools has a single DevtoolsService which is
responsible for tracking connected client applications.
"""
def __init__(
self,
update_frequency: float,
verbose: bool = False,
exclude: list[str] | None = None,
) -> None:
"""
Args:
update_frequency: The number of seconds to wait between
sending updates of the console size to connected clients.
verbose: Enable verbose logging on client.
exclude: List of log groups to exclude from output.
"""
self.update_frequency = update_frequency
self.verbose = verbose
self.exclude = {name.upper() for name in exclude} if exclude else set()
self.console = Console()
self.shutdown_event = asyncio.Event()
self.clients: list[ClientHandler] = []
async def start(self):
"""Starts devtools tasks"""
self.size_poll_task = asyncio.create_task(self._console_size_poller())
self.console.print(DevConsoleHeader(verbose=self.verbose))
@property
def clients_connected(self) -> bool:
"""Returns True if there are connected clients, False otherwise."""
return len(self.clients) > 0
async def _console_size_poller(self) -> None:
"""Poll console dimensions, and add a `server_info` message to the Queue
any time a change occurs. We only poll if there are clients connected,
and if we're not shutting down the server.
"""
current_width = self.console.width
current_height = self.console.height
await self._send_server_info_to_all()
while not self.shutdown_event.is_set():
width = self.console.width
height = self.console.height
dimensions_changed = width != current_width or height != current_height
if dimensions_changed:
await self._send_server_info_to_all()
current_width = width
current_height = height
try:
await asyncio.wait_for(
self.shutdown_event.wait(), timeout=self.update_frequency
)
except asyncio.TimeoutError:
pass
async def _send_server_info_to_all(self) -> None:
"""Add `server_info` message to the queues of every client"""
for client_handler in self.clients:
await self.send_server_info(client_handler)
async def send_server_info(self, client_handler: ClientHandler) -> None:
"""Send information about the server e.g. width and height of Console to
a connected client.
Args:
client_handler: The client to send information to
"""
await client_handler.send_message(
{
"type": "server_info",
"payload": {
"width": self.console.width,
"height": self.console.height,
"verbose": self.verbose,
},
}
)
async def handle(self, request: Request) -> WebSocketResponse:
"""Handles a single client connection"""
client = ClientHandler(request, service=self)
self.clients.append(client)
websocket = await client.run()
self.clients.remove(client)
return websocket
async def shutdown(self) -> None:
"""Stop server async tasks and clean up all client handlers"""
# Stop polling/writing Console dimensions to clients
self.shutdown_event.set()
await self.size_poll_task
# We're shutting down the server, so inform all connected clients
for client in self.clients:
await client.close()
self.clients.clear()
class ClientHandler:
"""Handles a single client connection to the devtools.
A single DevtoolsService managers many ClientHandlers. A single ClientHandler
corresponds to a single running Textual application instance, and is responsible
for communication with that Textual app.
"""
def __init__(self, request: Request, service: DevtoolsService) -> None:
"""
Args:
request: The aiohttp.Request associated with this client
service: The parent DevtoolsService which is responsible
for the handling of this client.
"""
self.request = request
self.service = service
self.websocket = WebSocketResponse()
async def send_message(self, message: dict[str, object]) -> None:
"""Send a message to a client
Args:
message: The dict which will be sent
to the client.
"""
await self.outgoing_queue.put(message)
async def _consume_outgoing(self) -> None:
"""Consume messages from the outgoing (server -> client) Queue."""
while True:
message_json = await self.outgoing_queue.get()
if message_json is None:
self.outgoing_queue.task_done()
break
type = message_json["type"]
if type == "server_info":
await self.websocket.send_json(message_json)
self.outgoing_queue.task_done()
async def _consume_incoming(self) -> None:
"""Consume messages from the incoming (client -> server) Queue, and print
the corresponding renderables to the console for each message.
"""
last_message_time: float | None = None
while True:
message = await self.incoming_queue.get()
if message is None:
self.incoming_queue.task_done()
break
type = message["type"]
if type == "client_log":
payload = message["payload"]
if LogGroup(payload.get("group", 0)).name in self.service.exclude:
continue
encoded_segments = payload["segments"]
segments = pickle.loads(encoded_segments)
message_time = time()
if (
last_message_time is not None
and message_time - last_message_time > 0.5
):
# Print a rule if it has been longer than half a second since the last message
self.service.console.rule()
self.service.console.print(
DevConsoleLog(
segments=segments,
path=payload["path"],
line_number=payload["line_number"],
unix_timestamp=payload["timestamp"],
group=payload.get("group", 0),
verbosity=payload.get("verbosity", 0),
severity=payload.get("severity", 0),
)
)
last_message_time = message_time
elif type == "client_spillover":
spillover = int(message["payload"]["spillover"])
info_renderable = DevConsoleNotice(
f"Discarded {spillover} messages", level="warning"
)
self.service.console.print(info_renderable)
self.incoming_queue.task_done()
async def run(self) -> WebSocketResponse:
"""Prepare the websocket and communication queues, and continuously
read messages from the queues.
Returns:
The WebSocketResponse associated with this client.
"""
await self.websocket.prepare(self.request)
self.incoming_queue: asyncio.Queue[dict | None] = asyncio.Queue()
self.outgoing_queue: asyncio.Queue[dict | None] = asyncio.Queue()
self.outgoing_messages_task = asyncio.create_task(self._consume_outgoing())
self.incoming_messages_task = asyncio.create_task(self._consume_incoming())
if self.request.remote:
self.service.console.print(
DevConsoleNotice(f"Client '{escape(self.request.remote)}' connected")
)
try:
await self.service.send_server_info(client_handler=self)
async for websocket_message in self.websocket:
if websocket_message.type in (WSMsgType.TEXT, WSMsgType.BINARY):
message: dict[str, Any]
try:
if isinstance(websocket_message.data, bytes):
message = msgpack.unpackb(websocket_message.data)
else:
message = json.loads(websocket_message.data)
except JSONDecodeError:
self.service.console.print(escape(str(websocket_message.data)))
continue
type = message.get("type")
if not type:
continue
if (
type in QUEUEABLE_TYPES
and not self.service.shutdown_event.is_set()
):
await self.incoming_queue.put(message)
elif websocket_message.type == WSMsgType.ERROR:
self.service.console.print(websocket_message.data)
self.service.console.print(
DevConsoleNotice("Websocket error occurred", level="error")
)
break
except Exception as error:
self.service.console.print(DevConsoleNotice(str(error), level="error"))
finally:
if self.request.remote:
self.service.console.print(
"\n",
DevConsoleNotice(
f"Client '{escape(self.request.remote)}' disconnected"
),
)
await self.close()
return self.websocket
async def close(self) -> None:
"""Stop all incoming/outgoing message processing,
and shutdown the websocket connection associated with this
client.
"""
# Stop any writes to the websocket first
await self.outgoing_queue.put(None)
await self.outgoing_messages_task
# Now we can shut the socket down
await self.websocket.close()
# This task is independent of the websocket
await self.incoming_queue.put(None)
await self.incoming_messages_task

View File

@@ -1,10 +0,0 @@
from click.testing import CliRunner
from importlib_metadata import version
from textual.cli.cli import run
def test_cli_version():
runner = CliRunner()
result = runner.invoke(run, ["--version"])
assert version("textual") in result.output

View File

@@ -1,14 +0,0 @@
import os
import sys
import pytest
_MACOS_CI = sys.platform == "darwin" and os.getenv("CI", "0") != "0"
_WINDOWS = sys.platform == "win32"
# TODO - this needs to be revisited - perhaps when aiohttp 4.0 is released?
# We get occasional test failures relating to devtools. These *appear* to be limited to MacOS,
# and the error messages suggest the event loop is being shutdown before async fixture
# teardown code has finished running. These are very rare, but are much more of an issue on
# CI since they can delay builds that have passed locally.
pytestmark = pytest.mark.skipif(_MACOS_CI or _WINDOWS, reason="Issue #411")

View File

@@ -1,27 +0,0 @@
import pytest
from textual.devtools.client import DevtoolsClient
from textual.devtools.server import _make_devtools_aiohttp_app
from textual.devtools.service import DevtoolsService
@pytest.fixture
async def server(aiohttp_server, unused_tcp_port):
app = _make_devtools_aiohttp_app(
size_change_poll_delay_secs=0.001,
)
server = await aiohttp_server(app, port=unused_tcp_port)
service: DevtoolsService = app["service"]
yield server
await service.shutdown()
await server.close()
@pytest.fixture
async def devtools(aiohttp_client, server):
client = await aiohttp_client(server)
devtools = DevtoolsClient(host=client.host, port=client.port)
await devtools.connect()
yield devtools
await devtools.disconnect()
await client.close()

View File

@@ -1,96 +0,0 @@
from datetime import datetime
import msgpack
import pytest
import time_machine
from rich.align import Align
from rich.console import Console
from rich.segment import Segment
from tests.utilities.render import wait_for_predicate
from textual.devtools.renderables import DevConsoleLog, DevConsoleNotice
TIMESTAMP = 1649166819
WIDTH = 40
# The string "Hello, world!" is encoded in the payload below
_EXAMPLE_LOG = {
"type": "client_log",
"payload": {
"segments": b"\x80\x04\x955\x00\x00\x00\x00\x00\x00\x00]\x94\x8c\x0crich.segment\x94\x8c\x07Segment\x94\x93\x94\x8c\rHello, world!\x94NN\x87\x94\x81\x94a.",
"line_number": 123,
"path": "abc/hello.py",
"timestamp": TIMESTAMP,
},
}
EXAMPLE_LOG = msgpack.packb(_EXAMPLE_LOG)
@pytest.fixture(scope="module")
def console():
return Console(width=WIDTH)
@time_machine.travel(TIMESTAMP)
def test_log_message_render(console):
message = DevConsoleLog(
[Segment("content")],
path="abc/hello.py",
line_number=123,
unix_timestamp=TIMESTAMP,
group=0,
verbosity=0,
severity=0,
)
table = next(iter(message.__rich_console__(console, console.options)))
assert len(table.rows) == 1
columns = list(table.columns)
left_cells = list(columns[0].cells)
left = left_cells[0]
right_cells = list(columns[1].cells)
right: Align = right_cells[0]
# Since we can't guarantee the timezone the tests will run in...
local_time = datetime.fromtimestamp(TIMESTAMP)
string_timestamp = local_time.time()
assert left.plain == f"[{string_timestamp}] UNDEFINED"
assert right.align == "right"
assert "hello.py:123" in right.renderable
def test_internal_message_render(console):
message = DevConsoleNotice("hello")
rule = next(iter(message.__rich_console__(console, console.options)))
assert rule.title == "hello"
assert rule.characters == ""
async def test_devtools_valid_client_log(devtools):
await devtools.websocket.send_bytes(EXAMPLE_LOG)
assert devtools.is_connected
async def test_devtools_string_not_json_message(devtools):
await devtools.websocket.send_str("ABCDEFG")
assert devtools.is_connected
async def test_devtools_invalid_json_message(devtools):
await devtools.websocket.send_json({"invalid": "json"})
assert devtools.is_connected
async def test_devtools_spillover_message(devtools):
await devtools.websocket.send_json(
{"type": "client_spillover", "payload": {"spillover": 123}}
)
assert devtools.is_connected
async def test_devtools_console_size_change(server, devtools):
# Update the width of the console on the server-side
server.app["service"].console.width = 124
# Wait for the client side to update the console on their end
await wait_for_predicate(lambda: devtools.console.width == 124)

View File

@@ -1,107 +0,0 @@
import json
import types
from asyncio import Queue
from datetime import datetime
import msgpack
import time_machine
from aiohttp.web_ws import WebSocketResponse
from rich.console import ConsoleDimensions
from rich.panel import Panel
from tests.utilities.render import wait_for_predicate
from textual.constants import DEVTOOLS_PORT
from textual.devtools.client import DevtoolsClient
from textual.devtools.redirect_output import DevtoolsLog
CALLER_LINENO = 123
CALLER_PATH = "a/b/c.py"
CALLER = types.SimpleNamespace(filename=CALLER_PATH, lineno=CALLER_LINENO)
TIMESTAMP = 1649166819
def test_devtools_client_initialize_defaults():
devtools = DevtoolsClient()
assert devtools.url == f"ws://127.0.0.1:{DEVTOOLS_PORT}"
async def test_devtools_client_is_connected(devtools):
assert devtools.is_connected
@time_machine.travel(datetime.utcfromtimestamp(TIMESTAMP))
async def test_devtools_log_places_encodes_and_queues_message(devtools):
await devtools._stop_log_queue_processing()
devtools.log(DevtoolsLog("Hello, world!", CALLER))
queued_log = await devtools.log_queue.get()
queued_log_data = msgpack.unpackb(queued_log)
print(repr(queued_log_data))
@time_machine.travel(datetime.utcfromtimestamp(TIMESTAMP))
async def test_devtools_log_places_encodes_and_queues_many_logs_as_string(devtools):
await devtools._stop_log_queue_processing()
devtools.log(DevtoolsLog(("hello", "world"), CALLER))
queued_log = await devtools.log_queue.get()
queued_log_data = msgpack.unpackb(queued_log)
print(repr(queued_log_data))
assert queued_log_data == {
"type": "client_log",
"payload": {
"group": 0,
"verbosity": 0,
"timestamp": 1649166819,
"path": "a/b/c.py",
"line_number": 123,
"segments": b"\x80\x04\x95@\x00\x00\x00\x00\x00\x00\x00]\x94(\x8c\x0crich.segment\x94\x8c\x07Segment\x94\x93\x94\x8c\x0bhello world\x94NN\x87\x94\x81\x94h\x03\x8c\x01\n\x94NN\x87\x94\x81\x94e.",
},
}
async def test_devtools_log_spillover(devtools):
# Give the devtools an intentionally small max queue size
await devtools._stop_log_queue_processing()
devtools.log_queue = Queue(maxsize=2)
# Force spillover of 2
devtools.log(DevtoolsLog((Panel("hello, world"),), CALLER))
devtools.log(DevtoolsLog("second message", CALLER))
devtools.log(DevtoolsLog("third message", CALLER)) # Discarded by rate-limiting
devtools.log(DevtoolsLog("fourth message", CALLER)) # Discarded by rate-limiting
assert devtools.spillover == 2
# Consume log queue
while not devtools.log_queue.empty():
await devtools.log_queue.get()
# Add another message now that we're under spillover threshold
devtools.log(DevtoolsLog("another message", CALLER))
await devtools.log_queue.get()
# Ensure we're informing the server of spillover rate-limiting
spillover_message = await devtools.log_queue.get()
assert json.loads(spillover_message) == {
"type": "client_spillover",
"payload": {"spillover": 2},
}
async def test_devtools_client_update_console_dimensions(devtools, server):
"""Sending new server info through websocket from server to client should (eventually)
result in the dimensions of the devtools client console being updated to match.
"""
server_to_client: WebSocketResponse = next(
iter(server.app["service"].clients)
).websocket
server_info = {
"type": "server_info",
"payload": {
"width": 123,
"height": 456,
},
}
await server_to_client.send_json(server_info)
await wait_for_predicate(
lambda: devtools.console.size == ConsoleDimensions(123, 456)
)

View File

@@ -1,108 +0,0 @@
from contextlib import redirect_stdout
from datetime import datetime
import msgpack
import time_machine
from textual.devtools.redirect_output import StdoutRedirector
TIMESTAMP = 1649166819
@time_machine.travel(datetime.utcfromtimestamp(TIMESTAMP))
async def test_print_redirect_to_devtools_only(devtools):
await devtools._stop_log_queue_processing()
with redirect_stdout(StdoutRedirector(devtools)): # type: ignore
print("Hello, world!")
assert devtools.log_queue.qsize() == 1
queued_log = await devtools.log_queue.get()
queued_log_data = msgpack.unpackb(queued_log)
print(repr(queued_log_data))
payload = queued_log_data["payload"]
assert queued_log_data["type"] == "client_log"
assert payload["timestamp"] == TIMESTAMP
assert (
payload["segments"]
== b"\x80\x04\x95B\x00\x00\x00\x00\x00\x00\x00]\x94(\x8c\x0crich.segment\x94\x8c\x07Segment\x94\x93\x94\x8c\rHello, world!\x94NN\x87\x94\x81\x94h\x03\x8c\x01\n\x94NN\x87\x94\x81\x94e."
)
async def test_print_redirect_to_logfile_only(devtools):
await devtools.disconnect()
with redirect_stdout(StdoutRedirector(devtools)): # type: ignore
print("Hello, world!")
async def test_print_redirect_to_devtools_and_logfile(devtools):
await devtools._stop_log_queue_processing()
with redirect_stdout(StdoutRedirector(devtools)): # type: ignore
print("Hello, world!")
assert devtools.log_queue.qsize() == 1
@time_machine.travel(datetime.fromtimestamp(TIMESTAMP))
async def test_print_without_flush_not_sent_to_devtools(devtools):
await devtools._stop_log_queue_processing()
with redirect_stdout(StdoutRedirector(devtools)): # type: ignore
# End is no longer newline character, so print will no longer
# flush the output buffer by default.
print("Hello, world!", end="")
assert devtools.log_queue.qsize() == 0
@time_machine.travel(datetime.fromtimestamp(TIMESTAMP))
async def test_print_forced_flush_sent_to_devtools(devtools):
await devtools._stop_log_queue_processing()
with redirect_stdout(StdoutRedirector(devtools)): # type: ignore
print("Hello, world!", end="", flush=True)
assert devtools.log_queue.qsize() == 1
@time_machine.travel(datetime.fromtimestamp(TIMESTAMP))
async def test_print_multiple_args_batched_as_one_log(devtools):
await devtools._stop_log_queue_processing()
redirector = StdoutRedirector(devtools)
with redirect_stdout(redirector): # type: ignore
# This print adds 3 messages to the buffer that can be batched
print("The first", "batch", "of logs", end="")
# This message cannot be batched with the previous message,
# and so it will be the 2nd item added to the log queue.
print("I'm in the second batch")
assert devtools.log_queue.qsize() == 2
@time_machine.travel(datetime.fromtimestamp(TIMESTAMP))
async def test_print_strings_containing_newline_flushed(devtools):
await devtools._stop_log_queue_processing()
with redirect_stdout(StdoutRedirector(devtools)): # type: ignore
# Flushing is disabled since end="", but the first
# string will be flushed since it contains a newline
print("Hel\nlo", end="")
print("world", end="")
assert devtools.log_queue.qsize() == 1
@time_machine.travel(datetime.fromtimestamp(TIMESTAMP))
async def test_flush_flushes_buffered_logs(devtools):
await devtools._stop_log_queue_processing()
redirector = StdoutRedirector(devtools)
with redirect_stdout(redirector): # type: ignore
print("x", end="")
assert devtools.log_queue.qsize() == 0
redirector.flush()
assert devtools.log_queue.qsize() == 1

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,5 @@
from textual_dev.previews import BorderApp
app = BorderApp()
if __name__ == "__main__":
app.run()

View File

@@ -0,0 +1,5 @@
from textual_dev.previews import ColorsApp
app = ColorsApp()
if __name__ == "__main__":
app.run()

View File

@@ -0,0 +1,5 @@
from textual_dev.previews import EasingApp
app = EasingApp()
if __name__ == "__main__":
app.run()

View File

@@ -0,0 +1,5 @@
from textual_dev.previews import KeysApp
app = KeysApp()
if __name__ == "__main__":
app.run()

View File

@@ -7,7 +7,6 @@ WIDGET_EXAMPLES_DIR = Path("../../docs/examples/widgets")
LAYOUT_EXAMPLES_DIR = Path("../../docs/examples/guide/layout")
STYLES_EXAMPLES_DIR = Path("../../docs/examples/styles")
SNAPSHOT_APPS_DIR = Path("./snapshot_apps")
CLI_PREVIEWS_DIR = Path("../../src/textual/cli/previews")
# --- Layout related stuff ---
@@ -352,26 +351,6 @@ def test_programmatic_scrollbar_gutter_change(snap_compare):
)
# --- CLI Preview Apps ---
# For our CLI previews e.g. `textual easing`, `textual colors` etc, we have snapshots
def test_borders_preview(snap_compare):
assert snap_compare(CLI_PREVIEWS_DIR / "borders.py", press=["enter"])
def test_colors_preview(snap_compare):
assert snap_compare(CLI_PREVIEWS_DIR / "colors.py")
def test_easing_preview(snap_compare):
assert snap_compare(CLI_PREVIEWS_DIR / "easing.py")
def test_keys_preview(snap_compare):
assert snap_compare(CLI_PREVIEWS_DIR / "keys.py", press=["a", "b"])
# --- Other ---
@@ -579,3 +558,20 @@ def test_tooltips_in_compound_widgets(snap_compare):
await pilot.pause()
assert snap_compare(SNAPSHOT_APPS_DIR / "tooltips.py", run_before=run_before)
# --- textual-dev library preview tests ---
def test_textual_dev_border_preview(snap_compare):
assert snap_compare(SNAPSHOT_APPS_DIR / "dev_previews_border.py", press=["enter"])
def test_textual_dev_colors_preview(snap_compare):
assert snap_compare(SNAPSHOT_APPS_DIR / "dev_previews_color.py")
def test_textual_dev_easing_preview(snap_compare):
assert snap_compare(SNAPSHOT_APPS_DIR / "dev_previews_easing.py")
def test_textual_dev_keys_preview(snap_compare):
assert snap_compare(SNAPSHOT_APPS_DIR / "dev_previews_keys.py", press=["a", "b"])

View File

@@ -1,9 +1,6 @@
import asyncio
import io
import re
from typing import Callable
import pytest
from rich.console import Console, RenderableType
re_link_ids = re.compile(r"id=[\d\.\-]*?;.*?\x1b")
@@ -24,32 +21,3 @@ def render(renderable: RenderableType, no_wrap: bool = False) -> str:
console.print(renderable, no_wrap=no_wrap, end="")
output = replace_link_ids(capture.get())
return output
async def wait_for_predicate(
predicate: Callable[[], bool],
timeout_secs: float = 2,
poll_delay_secs: float = 0.001,
) -> None:
"""Wait for the given predicate to become True by evaluating it every `poll_delay_secs`
seconds. Fail the pytest test if the predicate does not become True after `timeout_secs`
seconds.
Args:
predicate (Callable[[], bool]): The predicate function which will be called repeatedly.
timeout_secs (float): If the predicate doesn't evaluate to True after this number of
seconds, the test will fail.
poll_delay_secs (float): The number of seconds to wait between each call to the
predicate function.
"""
time_taken = 0
while True:
result = predicate()
if result:
return
await asyncio.sleep(poll_delay_secs)
time_taken += poll_delay_secs
if time_taken > timeout_secs:
pytest.fail(
f"Predicate {predicate} did not return True after {timeout_secs} seconds."
)