diff --git a/package.json b/package.json index 6a9c02c..1c4b8b9 100644 --- a/package.json +++ b/package.json @@ -17,6 +17,7 @@ "typescript": "^5.3.3" }, "dependencies": { + "sb-stream-promise": "^2.0.0", "strip-json-comments": "^5.0.1", "zod": "^3.22.4" } diff --git a/src/config.ts b/src/config.ts index c4ad47a..cba69e7 100644 --- a/src/config.ts +++ b/src/config.ts @@ -30,7 +30,7 @@ const configSchemaClient = z.object({ }) export async function loadConfig( - runtime: Runtime + runtime: T ): Promise< T extends Runtime.Server ? z.infer : z.infer > { @@ -92,3 +92,11 @@ export async function loadConfig( ? z.infer : z.infer } + +export function rewriteArgsInServer( + args: string[], + config: z.infer +): string[] { + // TODO: Do paths rewriting + return args.slice() +} diff --git a/src/constants.ts b/src/constants.ts index 0005ee4..376594a 100644 --- a/src/constants.ts +++ b/src/constants.ts @@ -1,6 +1,9 @@ import os from 'node:os' import path from 'node:path' +export const SERVER_MAX_PAYLOAD_SIZE_BYTES = 102400 // 100kb +export const SERVER_CLUSTER_RESTART_DELAY_MS = 5000 + export enum Runtime { Server = 'server', Client = 'client', diff --git a/src/server.ts b/src/server.ts index cd56887..c956366 100644 --- a/src/server.ts +++ b/src/server.ts @@ -1,5 +1,18 @@ -import { loadConfig } from './config.js' -import { CONFIG_FILE_SEARCH_PATHS_SERVER, Runtime } from './constants.js' +import childProcess from 'node:child_process' +import cluster from 'node:cluster' +import http from 'node:http' +import os from 'node:os' + +import streamToPromise from 'sb-stream-promise' + +import { loadConfig, rewriteArgsInServer } from './config.js' +import { + CONFIG_FILE_SEARCH_PATHS_SERVER, + Runtime, + SERVER_CLUSTER_RESTART_DELAY_MS, + SERVER_MAX_PAYLOAD_SIZE_BYTES, +} from './constants.js' +import createLogger from './logger.js' async function main() { if (process.argv.includes('--debug-print-search-paths')) { @@ -13,6 +26,112 @@ async function main() { console.log(config) process.exit(1) } + + const logger = createLogger(config.log) + + if (cluster.isPrimary) { + const numForks = os.availableParallelism() + for (let i = 0; i < numForks; i += 1) { + cluster.fork() + } + + cluster.on('exit', (worker, code, signal) => { + logger.log( + `Worker process died pid=${worker.process.pid} code=${code} signal=${signal} -- respawning` + ) + setTimeout(cluster.fork.bind(cluster), SERVER_CLUSTER_RESTART_DELAY_MS) + }) + + logger.log( + `Server started with ${numForks} workers at ${config.listenAddress}:${config.listenPort}` + ) + + return + } + + const activeRequests = new Map() + http + .createServer((req, res) => { + if (req.headers.authorization !== `Bearer ${config.authSecret}`) { + res.setHeader('Content-Type', 'application/json') + res.writeHead(401) + res.end(JSON.stringify({ error: 'Unauthorized' })) + logger.error('Rejected request: Unauthorized') + return + } + if (req.method !== 'POST') { + res.setHeader('Content-Type', 'application/json') + res.writeHead(405) + res.end(JSON.stringify({ error: 'Method Not Allowed' })) + logger.error('Rejected request: Method not allowed') + return + } + + let activeRequest: null | childProcess.ChildProcess = null + + res.on('close', () => { + activeRequest?.kill() + activeRequests.delete(res) + }) + activeRequests.set(res, null) + + streamToPromise(req, SERVER_MAX_PAYLOAD_SIZE_BYTES) + .then(requestBody => { + let parsed: unknown + try { + parsed = JSON.parse(requestBody) + } catch (err) { + throw new Error('Malformed JSON in request body') + } + if (!Array.isArray(parsed) || parsed.some(item => typeof item !== 'string')) { + throw new Error('Request body MUST have a type of string[]') + } + return parsed as string[] + }) + .then( + args => { + res.setHeader('Content-Type', 'application/json') + res.writeHead(200) + + activeRequest = childProcess.spawn( + config.ffmpegPath, + rewriteArgsInServer(args, config), + { + stdio: ['ignore', 'pipe', 'pipe'], + } + ) + activeRequest.stdout?.on('data', chunk => { + res.write(`${JSON.stringify({ stream: 'stdout', data: chunk.toString() })}\n`) + }) + activeRequest.stdout?.on('end', () => { + res.write(`${JSON.stringify({ stream: 'stdout', data: null })}\n`) + }) + activeRequest.stderr?.on('data', chunk => { + res.write(`${JSON.stringify({ stream: 'stderr', data: chunk.toString() })}\n`) + }) + activeRequest.stderr?.on('end', () => { + res.write(`${JSON.stringify({ stream: 'stderr', data: null })}\n`) + }) + activeRequest.on('error', err => { + res.end(`${JSON.stringify({ stream: 'stderr', data: err.message })}\n`) + activeRequests.delete(res) + activeRequest = null + }) + activeRequest.on('exit', code => { + res.end(`${JSON.stringify({ exitCode: code })}\n`) + activeRequests.delete(res) + activeRequest = null + }) + }, + err => { + res.setHeader('Content-Type', 'application/json') + res.writeHead(400) + res.end(JSON.stringify({ error: err.message })) + logger.error('Rejected request: Bad Request') + } + ) + }) + .listen(config.listenPort, config.listenAddress) } main().catch(err => { diff --git a/tsconfig.json b/tsconfig.json index 5073ab4..d84eadd 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -40,5 +40,5 @@ "skipDefaultLibCheck": true, /* Skip type checking .d.ts files that are included with TypeScript. */ "skipLibCheck": true /* Skip type checking all .d.ts files. */ }, - "include": ["src/"] + "include": ["src/", "typings/"], } diff --git a/typings/sb-stream-promise.d.ts b/typings/sb-stream-promise.d.ts new file mode 100644 index 0000000..0e4aeb3 --- /dev/null +++ b/typings/sb-stream-promise.d.ts @@ -0,0 +1,6 @@ +declare module 'sb-stream-promise' { + export default function streamToPromise( + stream: NodeJS.ReadableStream, + bytesLimit?: number + ): Promise +} diff --git a/yarn.lock b/yarn.lock index 515839a..4e63725 100644 --- a/yarn.lock +++ b/yarn.lock @@ -207,6 +207,11 @@ esbuild@^0.20.0: "@esbuild/win32-ia32" "0.20.0" "@esbuild/win32-x64" "0.20.0" +sb-stream-promise@^2.0.0: + version "2.0.0" + resolved "https://registry.yarnpkg.com/sb-stream-promise/-/sb-stream-promise-2.0.0.tgz#f822b4d9b8e9739a2de30069e865904b1fbddcf5" + integrity sha512-Aw3zZ1icG9nVU5JvhjukyFftYqqVBOny9qsL6PPDwS+Qvj/78SVU+sJ+jgE7wmoCBcgkWe+3gjo6oKL8/FuOcA== + strip-json-comments@^5.0.1: version "5.0.1" resolved "https://registry.yarnpkg.com/strip-json-comments/-/strip-json-comments-5.0.1.tgz#0d8b7d01b23848ed7dbdf4baaaa31a8250d8cfa0"