Prioritize job execution

Makes it so our most critical jobs go through first. Priority order:

1. Force-refetched cells
2. Cells visible on the current page
3. All other cells
4. Retries
5. Evaluations
This commit is contained in:
Kyle Corbitt
2023-08-18 13:44:33 -07:00
parent 9b1f2ac30a
commit 3be0a90960
5 changed files with 27 additions and 13 deletions

View File

@@ -43,7 +43,7 @@ export default function OutputCell({
type OutputSchema = Parameters<typeof provider.normalizeOutput>[0]; type OutputSchema = Parameters<typeof provider.normalizeOutput>[0];
const { mutateAsync: hardRefetchMutate } = api.scenarioVariantCells.forceRefetch.useMutation(); const { mutateAsync: hardRefetchMutate } = api.scenarioVariantCells.hardRefetch.useMutation();
const [hardRefetch, hardRefetching] = useHandledAsyncCallback(async () => { const [hardRefetch, hardRefetching] = useHandledAsyncCallback(async () => {
await hardRefetchMutate({ scenarioId: scenario.id, variantId: variant.id }); await hardRefetchMutate({ scenarioId: scenario.id, variantId: variant.id });
await utils.scenarioVariantCells.get.invalidate({ await utils.scenarioVariantCells.get.invalidate({

View File

@@ -61,7 +61,7 @@ export const scenarioVariantCellsRouter = createTRPCRouter({
evalsComplete, evalsComplete,
}; };
}), }),
forceRefetch: protectedProcedure hardRefetch: protectedProcedure
.input( .input(
z.object({ z.object({
scenarioId: z.string(), scenarioId: z.string(),
@@ -85,7 +85,10 @@ export const scenarioVariantCellsRouter = createTRPCRouter({
}); });
if (!cell) { if (!cell) {
await generateNewCell(input.variantId, input.scenarioId, { stream: true }); await generateNewCell(input.variantId, input.scenarioId, {
stream: true,
hardRefetch: true,
});
return; return;
} }
@@ -96,7 +99,7 @@ export const scenarioVariantCellsRouter = createTRPCRouter({
}, },
}); });
await queueQueryModel(cell.id, true); await queueQueryModel(cell.id, { stream: true, hardRefetch: true });
}), }),
getTemplatedPromptMessage: publicProcedure getTemplatedPromptMessage: publicProcedure
.input( .input(

View File

@@ -25,7 +25,6 @@ function calculateDelay(numPreviousTries: number): number {
} }
export const queryModel = defineTask<QueryModelJob>("queryModel", async (task) => { export const queryModel = defineTask<QueryModelJob>("queryModel", async (task) => {
console.log("RUNNING TASK", task);
const { cellId, stream, numPreviousTries } = task; const { cellId, stream, numPreviousTries } = task;
const cell = await prisma.scenarioVariantCell.findUnique({ const cell = await prisma.scenarioVariantCell.findUnique({
where: { id: cellId }, where: { id: cellId },
@@ -153,7 +152,7 @@ export const queryModel = defineTask<QueryModelJob>("queryModel", async (task) =
stream, stream,
numPreviousTries: numPreviousTries + 1, numPreviousTries: numPreviousTries + 1,
}, },
{ runAt: retryTime, jobKey: cellId }, { runAt: retryTime, jobKey: cellId, priority: 3 },
); );
await prisma.scenarioVariantCell.update({ await prisma.scenarioVariantCell.update({
where: { id: cellId }, where: { id: cellId },
@@ -172,7 +171,13 @@ export const queryModel = defineTask<QueryModelJob>("queryModel", async (task) =
} }
}); });
export const queueQueryModel = async (cellId: string, stream: boolean) => { export const queueQueryModel = async (
cellId: string,
options: { stream?: boolean; hardRefetch?: boolean } = {},
) => {
// Hard refetches are higher priority than streamed queries, which are higher priority than non-streamed queries.
const jobPriority = options.hardRefetch ? 0 : options.stream ? 1 : 2;
await Promise.all([ await Promise.all([
prisma.scenarioVariantCell.update({ prisma.scenarioVariantCell.update({
where: { where: {
@@ -184,6 +189,13 @@ export const queueQueryModel = async (cellId: string, stream: boolean) => {
jobQueuedAt: new Date(), jobQueuedAt: new Date(),
}, },
}), }),
queryModel.enqueue({ cellId, stream, numPreviousTries: 0 }, { jobKey: cellId }),
queryModel.enqueue(
{ cellId, stream: options.stream ?? false, numPreviousTries: 0 },
// Streamed queries are higher priority than non-streamed queries. Lower
// numbers are higher priority in graphile-worker.
{ jobKey: cellId, priority: jobPriority },
),
]); ]);
}; };

View File

@@ -13,5 +13,6 @@ export const runNewEval = defineTask<RunNewEvalJob>("runNewEval", async (task) =
}); });
export const queueRunNewEval = async (experimentId: string) => { export const queueRunNewEval = async (experimentId: string) => {
await runNewEval.enqueue({ experimentId }); // Evals are lower priority than completions
await runNewEval.enqueue({ experimentId }, { priority: 4 });
}; };

View File

@@ -9,10 +9,8 @@ import parsePromptConstructor from "~/promptConstructor/parse";
export const generateNewCell = async ( export const generateNewCell = async (
variantId: string, variantId: string,
scenarioId: string, scenarioId: string,
options?: { stream?: boolean }, options: { stream?: boolean; hardRefetch?: boolean } = {},
): Promise<void> => { ): Promise<void> => {
const stream = options?.stream ?? false;
const variant = await prisma.promptVariant.findUnique({ const variant = await prisma.promptVariant.findUnique({
where: { where: {
id: variantId, id: variantId,
@@ -121,6 +119,6 @@ export const generateNewCell = async (
}), }),
); );
} else { } else {
await queueQueryModel(cell.id, stream); await queueQueryModel(cell.id, options);
} }
}; };