Testing devtools print forwarding

This commit is contained in:
Darren Burns
2022-04-12 16:18:13 +01:00
parent c1b167f48a
commit b75d384d42
6 changed files with 124 additions and 9 deletions

View File

View File

@@ -0,0 +1,97 @@
from datetime import datetime, timezone
import pytest
import time_machine
from rich.align import Align
from rich.console import Console
from rich.segment import Segment
from tests.utilities.render import wait_for_predicate
from textual.devtools.renderables import DevtoolsLogMessage, DevtoolsInternalMessage
TIMESTAMP = 1649166819
WIDTH = 40
# The string "Hello, world!" is encoded in the payload below
EXAMPLE_LOG = {
"type": "client_log",
"payload": {
"encoded_segments": "gASVQgAAAAAAAABdlCiMDHJpY2guc2VnbWVudJSMB1NlZ"
"21lbnSUk5SMDUhlbGxvLCB3b3JsZCGUTk6HlIGUaAOMAQqUTk6HlIGUZS4=",
"line_number": 123,
"path": "abc/hello.py",
"timestamp": TIMESTAMP,
},
}
@pytest.fixture(scope="module")
def console():
return Console(width=WIDTH)
@time_machine.travel(TIMESTAMP)
def test_log_message_render(console):
message = DevtoolsLogMessage(
[Segment("content")],
path="abc/hello.py",
line_number=123,
unix_timestamp=TIMESTAMP,
)
table = next(iter(message.__rich_console__(console, console.options)))
assert len(table.rows) == 1
columns = list(table.columns)
left_cells = list(columns[0].cells)
left = left_cells[0]
right_cells = list(columns[1].cells)
right: Align = right_cells[0]
# Since we can't guarantee the timezone the tests will run in...
local_time = (
datetime.fromtimestamp(TIMESTAMP)
.replace(tzinfo=timezone.utc)
.astimezone(tz=datetime.now().astimezone().tzinfo)
)
timezone_name = local_time.tzname()
string_timestamp = local_time.time()
assert left == f" [#888177]{string_timestamp} [dim]{timezone_name}[/]"
assert right.align == "right"
assert "hello.py:123" in right.renderable
def test_internal_message_render(console):
message = DevtoolsInternalMessage("hello")
rule = next(iter(message.__rich_console__(console, console.options)))
assert rule.title == "hello"
assert rule.characters == ""
async def test_devtools_valid_client_log(devtools):
await devtools.websocket.send_json(EXAMPLE_LOG)
assert devtools.is_connected
async def test_devtools_string_not_json_message(devtools):
await devtools.websocket.send_str("ABCDEFG")
assert devtools.is_connected
async def test_devtools_invalid_json_message(devtools):
await devtools.websocket.send_json({"invalid": "json"})
assert devtools.is_connected
async def test_devtools_spillover_message(devtools):
await devtools.websocket.send_json(
{"type": "client_spillover", "payload": {"spillover": 123}}
)
assert devtools.is_connected
async def test_devtools_console_size_change(server, devtools):
# Update the width of the console on the server-side
server.app["service"].console.width = 124
# Wait for the client side to update the console on their end
await wait_for_predicate(lambda: devtools.console.width == 124)

View File

@@ -0,0 +1,106 @@
import json
from asyncio import Queue
from datetime import datetime
import time_machine
from aiohttp.web_ws import WebSocketResponse
from rich.console import ConsoleDimensions
from rich.panel import Panel
from tests.utilities.render import wait_for_predicate
from textual.devtools.client import DevtoolsClient
from textual.devtools.redirect_output import DevtoolsLog
TIMESTAMP = 1649166819
def test_devtools_client_initialize_defaults():
devtools = DevtoolsClient()
assert devtools.url == "ws://127.0.0.1:8081"
async def test_devtools_client_is_connected(devtools):
assert devtools.is_connected
@time_machine.travel(datetime.fromtimestamp(TIMESTAMP))
async def test_devtools_log_places_encodes_and_queues_message(devtools):
await devtools._stop_log_queue_processing()
devtools.log(DevtoolsLog("Hello, world!"))
queued_log = await devtools.log_queue.get()
queued_log_json = json.loads(queued_log)
assert queued_log_json == {
"type": "client_log",
"payload": {
"timestamp": TIMESTAMP,
"path": "",
"line_number": 0,
"encoded_segments": "gANdcQAoY3JpY2guc2VnbWVudApTZWdtZW50CnEBWA0AAABIZWxsbywgd29ybGQhcQJOTodxA4FxBGgBWAEAAAAKcQVOTodxBoFxB2Uu",
},
}
@time_machine.travel(datetime.fromtimestamp(TIMESTAMP))
async def test_devtools_log_places_encodes_and_queues_many_logs_as_string(devtools):
await devtools._stop_log_queue_processing()
devtools.log(DevtoolsLog(("hello", "world")))
queued_log = await devtools.log_queue.get()
queued_log_json = json.loads(queued_log)
assert queued_log_json == {
"type": "client_log",
"payload": {
"timestamp": TIMESTAMP,
"path": "",
"line_number": 0,
"encoded_segments": "gANdcQAoY3JpY2guc2VnbWVudApTZWdtZW50CnEBWAsAAABoZWxsbyB3b3JsZHECTk6HcQOBcQRoAVgBAAAACnEFTk6HcQaBcQdlLg==",
},
}
async def test_devtools_log_spillover(devtools):
# Give the devtools an intentionally small max queue size
await devtools._stop_log_queue_processing()
devtools.log_queue = Queue(maxsize=2)
# Force spillover of 2
devtools.log(DevtoolsLog((Panel("hello, world"),)))
devtools.log(DevtoolsLog("second message"))
devtools.log(DevtoolsLog("third message")) # Discarded by rate-limiting
devtools.log(DevtoolsLog("fourth message")) # Discarded by rate-limiting
assert devtools.spillover == 2
# Consume log queue
while not devtools.log_queue.empty():
await devtools.log_queue.get()
# Add another message now that we're under spillover threshold
devtools.log(DevtoolsLog("another message"))
await devtools.log_queue.get()
# Ensure we're informing the server of spillover rate-limiting
spillover_message = await devtools.log_queue.get()
assert json.loads(spillover_message) == {
"type": "client_spillover",
"payload": {"spillover": 2},
}
async def test_devtools_client_update_console_dimensions(devtools, server):
"""Sending new server info through websocket from server to client should (eventually)
result in the dimensions of the devtools client console being updated to match.
"""
server_to_client: WebSocketResponse = next(
iter(server.app["service"].clients)
).websocket
server_info = {
"type": "server_info",
"payload": {
"width": 123,
"height": 456,
},
}
await server_to_client.send_json(server_info)
await wait_for_predicate(
lambda: devtools.console.size == ConsoleDimensions(123, 456)
)

View File

@@ -0,0 +1,116 @@
import json
import pprint
from contextlib import redirect_stdout
from datetime import datetime
import time_machine
from textual.devtools.redirect_output import DevtoolsRedirector
TIMESTAMP = 1649166819
@time_machine.travel(datetime.fromtimestamp(TIMESTAMP))
async def test_print_is_redirected_to_devtools(devtools):
await devtools._stop_log_queue_processing()
with redirect_stdout(DevtoolsRedirector(devtools)):
print("Hello, world!")
assert devtools.log_queue.qsize() == 1
queued_log = await devtools.log_queue.get()
queued_log_json = json.loads(queued_log)
payload = queued_log_json["payload"]
assert queued_log_json["type"] == "client_log"
assert payload["timestamp"] == TIMESTAMP
assert (
payload["encoded_segments"]
== "gANdcQAoY3JpY2guc2VnbWVudApTZWdtZW50CnEBWA0AAABIZWxsbywgd29ybGQhcQJOTodxA4FxBGgBWAEAAAAKcQVOTodxBoFxB2Uu"
)
@time_machine.travel(datetime.fromtimestamp(TIMESTAMP))
async def test_print_without_flush_not_sent_to_devtools(devtools):
await devtools._stop_log_queue_processing()
with redirect_stdout(DevtoolsRedirector(devtools)):
# End is no longer newline character, so print will no longer
# flush the output buffer by default.
print("Hello, world!", end="")
assert devtools.log_queue.qsize() == 0
@time_machine.travel(datetime.fromtimestamp(TIMESTAMP))
async def test_print_forced_flush_sent_to_devtools(devtools):
await devtools._stop_log_queue_processing()
with redirect_stdout(DevtoolsRedirector(devtools)):
print("Hello, world!", end="", flush=True)
assert devtools.log_queue.qsize() == 1
@time_machine.travel(datetime.fromtimestamp(TIMESTAMP))
async def test_print_multiple_args_batched_as_one_log(devtools):
await devtools._stop_log_queue_processing()
with redirect_stdout(DevtoolsRedirector(devtools)):
# We call print with multiple arguments here, but it
# results in a single log added to the log queue.
print("Hello", "world", "multiple")
assert devtools.log_queue.qsize() == 1
queued_log = await devtools.log_queue.get()
queued_log_json = json.loads(queued_log)
payload = queued_log_json["payload"]
assert queued_log_json["type"] == "client_log"
assert payload["timestamp"] == TIMESTAMP
assert payload[
"encoded_segments"] == "gANdcQAoY3JpY2guc2VnbWVudApTZWdtZW50CnEBWBQAAABIZWxsbyB3b3JsZCBtdWx0aXBsZXECTk6HcQOBcQRoAVgBAAAACnEFTk6HcQaBcQdlLg=="
assert len(payload["path"]) > 0
assert payload["line_number"] != 0
@time_machine.travel(datetime.fromtimestamp(TIMESTAMP))
async def test_print_multiple_args_batched_as_one_log(devtools):
await devtools._stop_log_queue_processing()
redirector = DevtoolsRedirector(devtools)
with redirect_stdout(redirector):
# This print adds 3 messages to the buffer that can be batched
print("The first", "batch", "of logs", end="")
# This message cannot be batched with the previous message,
# and so it will be the 2nd item added to the log queue.
print("I'm in the second batch")
assert devtools.log_queue.qsize() == 2
@time_machine.travel(datetime.fromtimestamp(TIMESTAMP))
async def test_print_strings_containing_newline_flushed(devtools):
await devtools._stop_log_queue_processing()
with redirect_stdout(DevtoolsRedirector(devtools)):
# Flushing is disabled since end="", but the first
# string will be flushed since it contains a newline
print("Hel\nlo", end="")
print("world", end="")
assert devtools.log_queue.qsize() == 1
@time_machine.travel(datetime.fromtimestamp(TIMESTAMP))
async def test_flush_flushes_buffered_logs(devtools):
await devtools._stop_log_queue_processing()
redirector = DevtoolsRedirector(devtools)
with redirect_stdout(redirector):
print("x", end="")
assert devtools.log_queue.qsize() == 0
redirector.flush()
assert devtools.log_queue.qsize() == 1