75 lines
1.7 KiB
Python
75 lines
1.7 KiB
Python
import asyncio, time
|
|
|
|
from fastapi.websockets import WebSocket
|
|
from fastapi.responses import HTMLResponse
|
|
from fastapi import APIRouter
|
|
from djitellopy import Tello
|
|
|
|
|
|
# Router on which every endpoint in this module is registered.
router = APIRouter()

# Single Tello client created at import time and used by the /flight endpoint.
tello = Tello()
|
|
|
|
|
|
@router.get("/health")
def test():
    """Health check: always answer with the fixed payload ``{"msg": "ok"}``."""
    payload = {"msg": "ok"}
    return payload
|
|
|
|
@router.get("/flight")
def test_flight():
    """Run a short demo flight: take off, rotate 180 degrees and back, land.

    Returns:
        A ``{"msg": ...}`` dict describing the outcome: connection failure,
        take-off failure, successful landing, or a landing that left the
        drone still reporting ``is_flying``.

    Raises:
        Whatever ``tello`` raises from take-off, a rotate command or landing.
        When a rotate command fails, a landing is still attempted before the
        error propagates.
    """
    try:
        tello.connect()
    except Exception:
        return {"msg": "failed to connect"}

    tello.takeoff()
    if not tello.is_flying:
        return {"msg": "failed to take off"}

    try:
        tello.rotate_counter_clockwise(180)
        time.sleep(2)
        tello.rotate_clockwise(180)
        time.sleep(5)
    finally:
        # Attempt to land even when a manoeuvre raised, so a failed command
        # does not leave the drone airborne with nobody controlling it.
        tello.land()

    if tello.is_flying:
        return {"msg": "landing failed, still flying!!!"}
    return {"msg": "succesfully landed"}
|
|
|
|
@router.get("/continuous-response")
async def test_continuous_response():
    """Serve the demo page stored in ``html_template`` as an HTML response."""
    page = HTMLResponse(content=html_template)
    return page
|
|
|
|
@router.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Accept the socket, then send each ``generate_output()`` item as text."""
    await websocket.accept()
    progress_stream = generate_output()
    async for line in progress_stream:
        await websocket.send_text(line)
|
|
|
|
|
|
async def generate_output(steps: int = 10, delay: float = 1.0):
    """Asynchronously yield numbered progress lines, then a final "Done!" line.

    Args:
        steps: How many "Progress: <k>" messages to emit, numbered from 1.
            Defaults to 10.
        delay: Seconds to await before each progress message. Defaults to 1.

    Yields:
        One newline-terminated "Progress: <k>" string per step, followed by a
        newline-terminated "Done!" string.
    """
    for step in range(1, steps + 1):
        await asyncio.sleep(delay)
        yield f"Progress: {step}\n"
    yield "Done!\n"
|
|
|
|
|
|
# Demo page returned by the /continuous-response endpoint. Its script opens a
# WebSocket and appends every received message to the "progress-log" element.
# NOTE(review): the "/test/ws" path assumes this router is mounted under the
# "/test" prefix; confirm against the application's include_router call.
html_template = """
<!DOCTYPE html>
<html>
    <head>
        <title>Websocket Example</title>
        <script>
            // Follow the page's scheme: browsers block ws:// from https:// pages.
            var scheme = window.location.protocol === "https:" ? "wss://" : "ws://";
            var ws = new WebSocket(scheme + window.location.host + "/test/ws");

            ws.onmessage = function(event) {
                var progress_log = document.getElementById("progress-log");
                progress_log.innerHTML += event.data + "<br>";
            }
        </script>
    </head>
    <body>
        <h1>Websocket Example</h1>
        <p>Progress:</p>
        <div id="progress-log"></div>
    </body>
</html>
"""
|