Merge pull request #181 from OpenPipe/catch-rejections
Catch unhandled rejections in background worker
This commit is contained in:
@@ -3,6 +3,7 @@
|
||||
// https://docs.sentry.io/platforms/javascript/guides/nextjs/
|
||||
|
||||
import * as Sentry from "@sentry/nextjs";
|
||||
import { isError } from "lodash-es";
|
||||
import { env } from "~/env.mjs";
|
||||
|
||||
if (env.NEXT_PUBLIC_SENTRY_DSN) {
|
||||
@@ -15,4 +16,10 @@ if (env.NEXT_PUBLIC_SENTRY_DSN) {
|
||||
// Setting this option to true will print useful information to the console while you're setting up Sentry.
|
||||
debug: false,
|
||||
});
|
||||
} else {
|
||||
// Install local debug exception handler for rejected promises
|
||||
process.on("unhandledRejection", (reason) => {
|
||||
const reasonDetails = isError(reason) ? reason?.stack : reason;
|
||||
console.log("Unhandled Rejection at:", reasonDetails);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -30,6 +30,10 @@ export const env = createEnv({
|
||||
.string()
|
||||
.default("10")
|
||||
.transform((val) => parseInt(val)),
|
||||
WORKER_MAX_POOL_SIZE: z
|
||||
.string()
|
||||
.default("10")
|
||||
.transform((val) => parseInt(val)),
|
||||
},
|
||||
|
||||
/**
|
||||
@@ -73,6 +77,7 @@ export const env = createEnv({
|
||||
SMTP_LOGIN: process.env.SMTP_LOGIN,
|
||||
SMTP_PASSWORD: process.env.SMTP_PASSWORD,
|
||||
WORKER_CONCURRENCY: process.env.WORKER_CONCURRENCY,
|
||||
WORKER_MAX_POOL_SIZE: process.env.WORKER_MAX_POOL_SIZE,
|
||||
},
|
||||
/**
|
||||
* Run `build` or `dev` with `SKIP_ENV_VALIDATION` to skip env validation.
|
||||
|
||||
47
app/src/server/tasks/test-tasks.ts
Normal file
47
app/src/server/tasks/test-tasks.ts
Normal file
@@ -0,0 +1,47 @@
|
||||
import "dotenv/config";
|
||||
|
||||
import defineTask from "./defineTask";
|
||||
import { type TaskList, run } from "graphile-worker";
|
||||
import { env } from "~/env.mjs";
|
||||
|
||||
import "../../../sentry.server.config";
|
||||
|
||||
export type TestTask = { i: number };
|
||||
|
||||
// When a new eval is created, we want to run it on all existing outputs, but return the new eval first
|
||||
export const testTask = defineTask<TestTask>("testTask", (task) => {
|
||||
console.log("ran task ", task.i);
|
||||
|
||||
void new Promise((_resolve, reject) => setTimeout(reject, 500));
|
||||
return Promise.resolve();
|
||||
});
|
||||
|
||||
const registeredTasks = [testTask];
|
||||
|
||||
const taskList = registeredTasks.reduce((acc, task) => {
|
||||
acc[task.task.identifier] = task.task.handler;
|
||||
return acc;
|
||||
}, {} as TaskList);
|
||||
|
||||
// process.on("unhandledRejection", (reason, promise) => {
|
||||
// console.log("Unhandled Rejection at:", reason?.stack || reason);
|
||||
// });
|
||||
|
||||
// Run a worker to execute jobs:
|
||||
const runner = await run({
|
||||
connectionString: env.DATABASE_URL,
|
||||
concurrency: 10,
|
||||
// Install signal handlers for graceful shutdown on SIGINT, SIGTERM, etc
|
||||
noHandleSignals: false,
|
||||
pollInterval: 1000,
|
||||
taskList,
|
||||
});
|
||||
|
||||
console.log("Worker successfully started");
|
||||
|
||||
for (let i = 0; i < 10; i++) {
|
||||
await testTask.enqueue({ i });
|
||||
await new Promise((resolve) => setTimeout(resolve, 1000));
|
||||
}
|
||||
|
||||
await runner.promise;
|
||||
@@ -1,5 +1,6 @@
|
||||
import { type TaskList, run } from "graphile-worker";
|
||||
import "dotenv/config";
|
||||
import "../../../sentry.server.config";
|
||||
|
||||
import { env } from "~/env.mjs";
|
||||
import { queryModel } from "./queryModel.task";
|
||||
@@ -18,6 +19,7 @@ const taskList = registeredTasks.reduce((acc, task) => {
|
||||
const runner = await run({
|
||||
connectionString: env.DATABASE_URL,
|
||||
concurrency: env.WORKER_CONCURRENCY,
|
||||
maxPoolSize: env.WORKER_MAX_POOL_SIZE,
|
||||
// Install signal handlers for graceful shutdown on SIGINT, SIGTERM, etc
|
||||
noHandleSignals: false,
|
||||
pollInterval: 1000,
|
||||
|
||||
Reference in New Issue
Block a user