connection runs

This commit is contained in:
Will McGugan
2024-06-18 12:12:29 +01:00
parent 57588883c7
commit 4f4fa713e0
5 changed files with 186 additions and 94 deletions

View File

@@ -73,7 +73,5 @@ class DictionaryApp(App):
if __name__ == "__main__":
from textual_serve.server import Server
server = Server(DictionaryApp)
server.serve()
app = DictionaryApp()
app.run()

View File

@@ -0,0 +1,4 @@
from textual_serve.server import Server
server = Server("python dictionary.py")
server.serve()

View File

@@ -1,30 +1,43 @@
import asyncio
import base64
import io
import json
import os
import pickle
import sys
from typing import Callable
from typing import Awaitable, Callable, Literal, TypeAlias
from asyncio.subprocess import Process
from importlib.metadata import version
import rich.repr
from textual.app import App
from .packets import Handlers
from .json_codec import JSONCodec
from .packet_decoder import PacketDecoder
Meta: TypeAlias = "dict[str, str | None | int | bool]"
@rich.repr.auto
class AppService(Handlers):
def __init__(self, app_factory: Callable[[], App]) -> None:
self.app_factory = app_factory
self._pickled_app_factory: bytes = pickle.dumps(app_factory)
def __init__(
self,
command: str,
remote_write_bytes: Callable[[bytes], Awaitable],
remote_write_str: Callable[[str], Awaitable],
) -> None:
self.command = command
self.remote_write_bytes = remote_write_bytes
self.remote_write_str = remote_write_str
self.codec = JSONCodec()
self._packet_decoder = PacketDecoder(self.codec)
self._task: asyncio.Task | None = None
self._stdin: asyncio.StreamWriter | None = None
self._exit_event = asyncio.Event()
@property
def stdin(self) -> asyncio.StreamWriter:
assert self._stdin is not None
return self._stdin
def _build_environment(self, width: int = 80, height: int = 24) -> dict[str, str]:
"""Build an environment dict for the App subprocess.
@@ -54,44 +67,143 @@ class AppService(Handlers):
height: height of the terminal.
"""
environment = self._build_environment(width=width, height=height)
encoded_app_factory = base64.b64encode(self._pickled_app_factory)
process = await asyncio.create_subprocess_exec(
sys.executable,
"-m",
"textual_server.runner",
encoded_app_factory,
process = await asyncio.create_subprocess_shell(
self.command,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=environment,
)
assert process.stdin is not None
process.stdin.write(encoded_app_factory + b"\n")
await process.stdin.drain()
return process
@classmethod
def encode_packet(cls, packet_type: Literal[b"D", b"M"], payload: bytes) -> bytes:
"""Encode a packet.
Args:
packet_type: The packet type (b"D" for data or b"M" for meta)
payload: The payload.
Returns:
Data as bytes.
"""
return b"%s%s%s" % (packet_type, len(payload).to_bytes(4, "big"), payload)
async def send_bytes(self, data: bytes) -> bool:
"""Send bytes to process.
Args:
data: Data to send.
Returns:
True if the data was sent, otherwise False.
"""
stdin = self.stdin
try:
stdin.write(self.encode_packet(b"D", data))
except RuntimeError:
return False
await stdin.drain()
return True
async def send_meta(self, data: Meta) -> bool:
"""Send meta information to process.
Args:
data: Meta dict to send.
Returns:
True if the data was sent, otherwise False.
"""
stdin = self.stdin
data_bytes = json.dumps(data).encode("utf-8")
try:
stdin.write(self.encode_packet(b"M", data_bytes))
except RuntimeError:
return False
await stdin.drain()
return True
async def set_terminal_size(self, width: int, height: int) -> None:
await self.send_meta(
{
"type": "resize",
"width": width,
"height": height,
}
)
def start(self, width: int, height: int) -> None:
self._task = asyncio.create_task(self.run(width, height))
async def stop(self) -> None:
if self._task is not None:
self._task.cancel()
await self._task
self._task = None
async def run(self, width: int = 80, height: int = 24) -> None:
META = b"M"
DATA = b"D"
process = await self._open_app_process(width, height)
stdout = process.stdout
stderr = process.stderr
assert stdout is not None
while True:
initial = await stdout.readexactly(1)
assert stderr is not None
self._stdin = process.stdin
size_bytes = await stdout.readexactly(4)
size = int.from_bytes(size_bytes, "big")
payload = await stdout.readexactly(size)
stderr_data = io.BytesIO()
if initial == "D":
await self.on_data(payload)
elif initial == "M":
await self.on_meta(payload)
else:
raise RuntimeError("unknown packet")
async def read_stderr() -> None:
"""Task to read stderr."""
try:
while True:
data = await stderr.read(1024 * 4)
if not data:
break
stderr_data.write(data)
except asyncio.CancelledError:
pass
stderr_task = asyncio.create_task(read_stderr())
try:
ready = False
for _ in range(10):
line = await stdout.readline()
if not line:
break
if line == b"__GANGLION__\n":
ready = True
break
if ready:
while not self._exit_event.is_set():
type_bytes = await stdout.readexactly(1)
size_bytes = await stdout.readexactly(4)
size = int.from_bytes(size_bytes, "big")
payload = await stdout.readexactly(size)
if type_bytes == DATA:
await self.on_data(payload)
elif type_bytes == META:
await self.on_meta(payload)
else:
raise RuntimeError("unknown packet")
except asyncio.IncompleteReadError:
pass
except asyncio.CancelledError:
pass
finally:
stderr_task.cancel()
await stderr_task
async def on_data(self, payload: bytes) -> None:
packet = self._packet_decoder.decode(payload)
assert packet is not None
await self.dispatch_packet(packet)
await self.remote_write_bytes(payload)
async def on_meta(self, data: object) -> None:
pass
meta_data = json.loads(data)
print("on_meta", meta_data)

View File

@@ -1,11 +1,22 @@
if __name__ == "__main__":
print("RUNNER")
import base64
import sys
import pickle
if sys.__stdin__ is not None:
print(1)
app_pickle_base64 = sys.__stdin__.readline()
print(2)
app_pickle = base64.b64decode(app_pickle_base64)
app = pickle.loads(app_pickle)
print(3, app_pickle)
try:
app = pickle.loads(app_pickle)
except Exception as error:
print(error)
raise
print(4)
print("RUNNER", app)
assert hasattr(app, "run")
app.run()

View File

@@ -1,21 +1,19 @@
from __future__ import annotations
import asyncio
import base64
import io
import logging
import os
from pathlib import Path
import signal
import sys
from typing import Any, Callable
import pickle
from asyncio.subprocess import Process
from importlib.metadata import version
import aiohttp_jinja2
from aiohttp import web
from aiohttp import WSMsgType
from aiohttp.web_runner import GracefulExit
import jinja2
@@ -23,6 +21,9 @@ from rich import print
from textual.app import App
from .app_service import AppService
log = logging.getLogger("textual")
@@ -31,7 +32,7 @@ class Server:
def __init__(
self,
app_factory: Callable[[], App],
command: str,
host: str = "0.0.0.0",
port: int = 8000,
name: str = "Textual App",
@@ -48,7 +49,7 @@ class Server:
statics_path: Path to statics folder. May be absolute or relative to server.py.
templates_path" Path to templates folder. May be absolute or relative to server.py.
"""
self.app_factory = app_factory
self.command = command
self.host = host
self.port = port
self.name = name
@@ -65,8 +66,6 @@ class Server:
self.statics_path = base_path / statics_path
self.templates_path = base_path / templates_path
self._pickled_app_factory: bytes = pickle.dumps(app_factory)
def request_exit(self, reason: str | None = None) -> None:
"""Gracefully exit the application, optionally supplying a reason.
@@ -133,50 +132,6 @@ class Server:
print(context)
return context
def build_environment(self, width: int = 80, height: int = 24) -> dict[str, str]:
"""Build an environment dict for the App subprocess.
Args:
width: Initial width.
height: Initial height.
Returns:
A environment dict.
"""
environment = dict(os.environ.copy())
environment["TEXTUAL_DRIVER"] = "textual.drivers.web_driver:WebDriver"
environment["TEXTUAL_FPS"] = "60"
environment["TEXTUAL_COLOR_SYSTEM"] = "truecolor"
environment["TERM_PROGRAM"] = "textual"
environment["TERM_PROGRAM_VERSION"] = version("textual-serve")
environment["COLUMNS"] = str(width)
environment["ROWS"] = str(height)
return environment
async def open_app_process(self, width: int = 80, height: int = 24) -> Process:
"""Open a process to run the app.
Args:
width: Width of the terminal.
height: height of the terminal.
"""
environment = self.build_environment(width=width, height=height)
encoded_app_factory = base64.b64encode(self._pickled_app_factory)
process = await asyncio.create_subprocess_exec(
sys.executable,
"-m",
"textual_server.runner",
encoded_app_factory,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=environment,
)
assert process.stdin is not None
process.stdin.write(encoded_app_factory + b"\n")
await process.stdin.drain()
return process
async def handle_websocket(self, request: web.Request) -> web.WebSocketResponse:
websocket = web.WebSocketResponse(heartbeat=15)
@@ -189,19 +144,31 @@ class Server:
width = to_int(request.query.get("width", "80"), 80)
height = to_int(request.query.get("height", "24"), 24)
process = await self.open_app_process(width, height)
TEXT = WSMsgType.TEXT
BINARY = WSMsgType.BINARY
try:
await websocket.prepare(request)
app_service = AppService(
self.command, websocket.send_bytes, websocket.send_str
)
app_service.start(width, height)
async for message in websocket:
print(message)
pass
if message.type == TEXT:
match message.json():
case ["stdin", data]:
await app_service.send_bytes(data.encode("utf-8"))
case ["resize", {"width": width, "height": height}]:
await app_service.set_terminal_size(width, height)
elif message.type == BINARY:
pass
except asyncio.CancelledError:
await websocket.close()
finally:
pass
await app_service.stop()
return websocket