mirror of
https://github.com/steelbrain/ffmpeg-over-ip.git
synced 2024-10-12 01:34:56 +03:00
🎉 Server works!
This commit is contained in:
@@ -17,6 +17,7 @@
|
||||
"typescript": "^5.3.3"
|
||||
},
|
||||
"dependencies": {
|
||||
"sb-stream-promise": "^2.0.0",
|
||||
"strip-json-comments": "^5.0.1",
|
||||
"zod": "^3.22.4"
|
||||
}
|
||||
|
||||
@@ -30,7 +30,7 @@ const configSchemaClient = z.object({
|
||||
})
|
||||
|
||||
export async function loadConfig<T extends Runtime.Client | Runtime.Server>(
|
||||
runtime: Runtime
|
||||
runtime: T
|
||||
): Promise<
|
||||
T extends Runtime.Server ? z.infer<typeof configSchemaServer> : z.infer<typeof configSchemaClient>
|
||||
> {
|
||||
@@ -92,3 +92,11 @@ export async function loadConfig<T extends Runtime.Client | Runtime.Server>(
|
||||
? z.infer<typeof configSchemaServer>
|
||||
: z.infer<typeof configSchemaClient>
|
||||
}
|
||||
|
||||
export function rewriteArgsInServer(
|
||||
args: string[],
|
||||
config: z.infer<typeof configSchemaServer>
|
||||
): string[] {
|
||||
// TODO: Do paths rewriting
|
||||
return args.slice()
|
||||
}
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
import os from 'node:os'
|
||||
import path from 'node:path'
|
||||
|
||||
export const SERVER_MAX_PAYLOAD_SIZE_BYTES = 102400 // 100kb
|
||||
export const SERVER_CLUSTER_RESTART_DELAY_MS = 5000
|
||||
|
||||
export enum Runtime {
|
||||
Server = 'server',
|
||||
Client = 'client',
|
||||
|
||||
123
src/server.ts
123
src/server.ts
@@ -1,5 +1,18 @@
|
||||
import { loadConfig } from './config.js'
|
||||
import { CONFIG_FILE_SEARCH_PATHS_SERVER, Runtime } from './constants.js'
|
||||
import childProcess from 'node:child_process'
|
||||
import cluster from 'node:cluster'
|
||||
import http from 'node:http'
|
||||
import os from 'node:os'
|
||||
|
||||
import streamToPromise from 'sb-stream-promise'
|
||||
|
||||
import { loadConfig, rewriteArgsInServer } from './config.js'
|
||||
import {
|
||||
CONFIG_FILE_SEARCH_PATHS_SERVER,
|
||||
Runtime,
|
||||
SERVER_CLUSTER_RESTART_DELAY_MS,
|
||||
SERVER_MAX_PAYLOAD_SIZE_BYTES,
|
||||
} from './constants.js'
|
||||
import createLogger from './logger.js'
|
||||
|
||||
async function main() {
|
||||
if (process.argv.includes('--debug-print-search-paths')) {
|
||||
@@ -13,6 +26,112 @@ async function main() {
|
||||
console.log(config)
|
||||
process.exit(1)
|
||||
}
|
||||
|
||||
const logger = createLogger(config.log)
|
||||
|
||||
if (cluster.isPrimary) {
|
||||
const numForks = os.availableParallelism()
|
||||
for (let i = 0; i < numForks; i += 1) {
|
||||
cluster.fork()
|
||||
}
|
||||
|
||||
cluster.on('exit', (worker, code, signal) => {
|
||||
logger.log(
|
||||
`Worker process died pid=${worker.process.pid} code=${code} signal=${signal} -- respawning`
|
||||
)
|
||||
setTimeout(cluster.fork.bind(cluster), SERVER_CLUSTER_RESTART_DELAY_MS)
|
||||
})
|
||||
|
||||
logger.log(
|
||||
`Server started with ${numForks} workers at ${config.listenAddress}:${config.listenPort}`
|
||||
)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
const activeRequests = new Map<http.ServerResponse, null | childProcess.ChildProcess>()
|
||||
http
|
||||
.createServer((req, res) => {
|
||||
if (req.headers.authorization !== `Bearer ${config.authSecret}`) {
|
||||
res.setHeader('Content-Type', 'application/json')
|
||||
res.writeHead(401)
|
||||
res.end(JSON.stringify({ error: 'Unauthorized' }))
|
||||
logger.error('Rejected request: Unauthorized')
|
||||
return
|
||||
}
|
||||
if (req.method !== 'POST') {
|
||||
res.setHeader('Content-Type', 'application/json')
|
||||
res.writeHead(405)
|
||||
res.end(JSON.stringify({ error: 'Method Not Allowed' }))
|
||||
logger.error('Rejected request: Method not allowed')
|
||||
return
|
||||
}
|
||||
|
||||
let activeRequest: null | childProcess.ChildProcess = null
|
||||
|
||||
res.on('close', () => {
|
||||
activeRequest?.kill()
|
||||
activeRequests.delete(res)
|
||||
})
|
||||
activeRequests.set(res, null)
|
||||
|
||||
streamToPromise(req, SERVER_MAX_PAYLOAD_SIZE_BYTES)
|
||||
.then(requestBody => {
|
||||
let parsed: unknown
|
||||
try {
|
||||
parsed = JSON.parse(requestBody)
|
||||
} catch (err) {
|
||||
throw new Error('Malformed JSON in request body')
|
||||
}
|
||||
if (!Array.isArray(parsed) || parsed.some(item => typeof item !== 'string')) {
|
||||
throw new Error('Request body MUST have a type of string[]')
|
||||
}
|
||||
return parsed as string[]
|
||||
})
|
||||
.then(
|
||||
args => {
|
||||
res.setHeader('Content-Type', 'application/json')
|
||||
res.writeHead(200)
|
||||
|
||||
activeRequest = childProcess.spawn(
|
||||
config.ffmpegPath,
|
||||
rewriteArgsInServer(args, config),
|
||||
{
|
||||
stdio: ['ignore', 'pipe', 'pipe'],
|
||||
}
|
||||
)
|
||||
activeRequest.stdout?.on('data', chunk => {
|
||||
res.write(`${JSON.stringify({ stream: 'stdout', data: chunk.toString() })}\n`)
|
||||
})
|
||||
activeRequest.stdout?.on('end', () => {
|
||||
res.write(`${JSON.stringify({ stream: 'stdout', data: null })}\n`)
|
||||
})
|
||||
activeRequest.stderr?.on('data', chunk => {
|
||||
res.write(`${JSON.stringify({ stream: 'stderr', data: chunk.toString() })}\n`)
|
||||
})
|
||||
activeRequest.stderr?.on('end', () => {
|
||||
res.write(`${JSON.stringify({ stream: 'stderr', data: null })}\n`)
|
||||
})
|
||||
activeRequest.on('error', err => {
|
||||
res.end(`${JSON.stringify({ stream: 'stderr', data: err.message })}\n`)
|
||||
activeRequests.delete(res)
|
||||
activeRequest = null
|
||||
})
|
||||
activeRequest.on('exit', code => {
|
||||
res.end(`${JSON.stringify({ exitCode: code })}\n`)
|
||||
activeRequests.delete(res)
|
||||
activeRequest = null
|
||||
})
|
||||
},
|
||||
err => {
|
||||
res.setHeader('Content-Type', 'application/json')
|
||||
res.writeHead(400)
|
||||
res.end(JSON.stringify({ error: err.message }))
|
||||
logger.error('Rejected request: Bad Request')
|
||||
}
|
||||
)
|
||||
})
|
||||
.listen(config.listenPort, config.listenAddress)
|
||||
}
|
||||
|
||||
main().catch(err => {
|
||||
|
||||
@@ -40,5 +40,5 @@
|
||||
"skipDefaultLibCheck": true, /* Skip type checking .d.ts files that are included with TypeScript. */
|
||||
"skipLibCheck": true /* Skip type checking all .d.ts files. */
|
||||
},
|
||||
"include": ["src/"]
|
||||
"include": ["src/", "typings/"],
|
||||
}
|
||||
|
||||
6
typings/sb-stream-promise.d.ts
vendored
Normal file
6
typings/sb-stream-promise.d.ts
vendored
Normal file
@@ -0,0 +1,6 @@
|
||||
declare module 'sb-stream-promise' {
|
||||
export default function streamToPromise(
|
||||
stream: NodeJS.ReadableStream,
|
||||
bytesLimit?: number
|
||||
): Promise<string>
|
||||
}
|
||||
@@ -207,6 +207,11 @@ esbuild@^0.20.0:
|
||||
"@esbuild/win32-ia32" "0.20.0"
|
||||
"@esbuild/win32-x64" "0.20.0"
|
||||
|
||||
sb-stream-promise@^2.0.0:
|
||||
version "2.0.0"
|
||||
resolved "https://registry.yarnpkg.com/sb-stream-promise/-/sb-stream-promise-2.0.0.tgz#f822b4d9b8e9739a2de30069e865904b1fbddcf5"
|
||||
integrity sha512-Aw3zZ1icG9nVU5JvhjukyFftYqqVBOny9qsL6PPDwS+Qvj/78SVU+sJ+jgE7wmoCBcgkWe+3gjo6oKL8/FuOcA==
|
||||
|
||||
strip-json-comments@^5.0.1:
|
||||
version "5.0.1"
|
||||
resolved "https://registry.yarnpkg.com/strip-json-comments/-/strip-json-comments-5.0.1.tgz#0d8b7d01b23848ed7dbdf4baaaa31a8250d8cfa0"
|
||||
|
||||
Reference in New Issue
Block a user