From 10dd53e7f639ff801bc154f52b0110f7141f12cd Mon Sep 17 00:00:00 2001 From: Kyle Corbitt Date: Fri, 18 Aug 2023 11:16:00 -0700 Subject: [PATCH] Run workers in a separate Docker container We've outgrown the run-everything-on-one-machine setup. This change moves background jobs to a different Docker image in production. It also adds a `jobKey` to certain jobs so if we try to process the same cell multiple times it'll only actually run the job once. --- app/run-prod.sh | 4 +--- app/src/env.mjs | 5 +++++ app/src/server/tasks/defineTask.ts | 8 +++++--- app/src/server/tasks/queryModel.task.ts | 4 ++-- app/src/server/tasks/worker.ts | 2 +- render.yaml | 14 ++++++++++---- 6 files changed, 24 insertions(+), 13 deletions(-) diff --git a/app/run-prod.sh b/app/run-prod.sh index 4390479..0a6839e 100755 --- a/app/run-prod.sh +++ b/app/run-prod.sh @@ -10,6 +10,4 @@ pnpm tsx src/promptConstructor/migrate.ts echo "Starting the server" -pnpm concurrently --kill-others \ - "pnpm start" \ - "pnpm tsx src/server/tasks/worker.ts" \ No newline at end of file +pnpm start diff --git a/app/src/env.mjs b/app/src/env.mjs index 3624c47..8ce2d48 100644 --- a/app/src/env.mjs +++ b/app/src/env.mjs @@ -26,6 +26,10 @@ export const env = createEnv({ SMTP_PORT: z.string().default("placeholder"), SMTP_LOGIN: z.string().default("placeholder"), SMTP_PASSWORD: z.string().default("placeholder"), + WORKER_CONCURRENCY: z + .string() + .default("10") + .transform((val) => parseInt(val)), }, /** @@ -68,6 +72,7 @@ export const env = createEnv({ SMTP_PORT: process.env.SMTP_PORT, SMTP_LOGIN: process.env.SMTP_LOGIN, SMTP_PASSWORD: process.env.SMTP_PASSWORD, + WORKER_CONCURRENCY: process.env.WORKER_CONCURRENCY, }, /** * Run `build` or `dev` with `SKIP_ENV_VALIDATION` to skip env validation. diff --git a/app/src/server/tasks/defineTask.ts b/app/src/server/tasks/defineTask.ts index 08b3ade..39daafa 100644 --- a/app/src/server/tasks/defineTask.ts +++ b/app/src/server/tasks/defineTask.ts @@ -1,4 +1,4 @@ -import { type Helpers, type Task, makeWorkerUtils } from "graphile-worker"; +import { type Helpers, type Task, makeWorkerUtils, TaskSpec } from "graphile-worker"; import { env } from "~/env.mjs"; let workerUtilsPromise: ReturnType | null = null; @@ -16,9 +16,11 @@ function defineTask( taskIdentifier: string, taskHandler: (payload: TPayload, helpers: Helpers) => Promise, ) { - const enqueue = async (payload: TPayload, runAt?: Date) => { + const enqueue = async (payload: TPayload, spec?: TaskSpec) => { console.log("Enqueuing task", taskIdentifier, payload); - await (await workerUtils()).addJob(taskIdentifier, payload, { runAt }); + + const utils = await workerUtils(); + return await utils.addJob(taskIdentifier, payload, spec); }; const handler = (payload: TPayload, helpers: Helpers) => { diff --git a/app/src/server/tasks/queryModel.task.ts b/app/src/server/tasks/queryModel.task.ts index 4580f2c..081bc2c 100644 --- a/app/src/server/tasks/queryModel.task.ts +++ b/app/src/server/tasks/queryModel.task.ts @@ -153,7 +153,7 @@ export const queryModel = defineTask("queryModel", async (task) = stream, numPreviousTries: numPreviousTries + 1, }, - retryTime, + { runAt: retryTime, jobKey: cellId }, ); await prisma.scenarioVariantCell.update({ where: { id: cellId }, @@ -184,6 +184,6 @@ export const queueQueryModel = async (cellId: string, stream: boolean) => { jobQueuedAt: new Date(), }, }), - queryModel.enqueue({ cellId, stream, numPreviousTries: 0 }), + queryModel.enqueue({ cellId, stream, numPreviousTries: 0 }, { jobKey: cellId }), ]); }; diff --git a/app/src/server/tasks/worker.ts b/app/src/server/tasks/worker.ts index c49b960..b74097c 100644 --- a/app/src/server/tasks/worker.ts +++ b/app/src/server/tasks/worker.ts @@ -17,7 +17,7 @@ const taskList = registeredTasks.reduce((acc, task) => { // Run a worker to execute jobs: const runner = await run({ connectionString: env.DATABASE_URL, - concurrency: 10, + concurrency: env.WORKER_CONCURRENCY, // Install signal handlers for graceful shutdown on SIGINT, SIGTERM, etc noHandleSignals: false, pollInterval: 1000, diff --git a/render.yaml b/render.yaml index 7fdd631..179223e 100644 --- a/render.yaml +++ b/render.yaml @@ -7,7 +7,7 @@ databases: services: - type: web name: querykey-prod-web - env: docker + runtime: docker dockerfilePath: ./app/Dockerfile dockerContext: . plan: standard @@ -21,8 +21,6 @@ services: name: querykey-prod property: connectionString - fromGroup: querykey-prod - - key: NEXT_PUBLIC_SOCKET_URL - value: https://querykey-prod-wss.onrender.com # Render support says we need to manually set this because otherwise # sometimes it checks a different random port that NextJS opens for # liveness and the liveness check fails. @@ -31,8 +29,16 @@ services: - type: web name: querykey-prod-wss - env: docker + runtime: docker dockerfilePath: ./app/Dockerfile dockerContext: . plan: free dockerCommand: pnpm tsx src/wss-server.ts + + - type: worker + name: querykey-prod-worker + runtime: docker + dockerfilePath: ./app/Dockerfile + dockerContext: . + plan: starter + dockerCommand: pnpm tsx src/server/tasks/worker.ts