Compare commits

...

14 Commits

Author SHA1 Message Date
Kyle Corbitt
8fa7b691db make max pool size configurable 2023-08-18 22:56:24 -07:00
Kyle Corbitt
947eba3216 Catch unhandled rejections in background worker
Previously, an unhandled promise rejection in the background worker would crash the process. This way we log it and don't crash.
2023-08-18 19:03:54 -07:00
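For context, a minimal sketch of the fix this commit describes (the full change is in the sentry.server.config.ts diff below; the real code uses lodash-es's isError, swapped here for instanceof to keep the sketch dependency-free):

    // Log unhandled promise rejections instead of letting them kill the process.
    // Without a handler, modern Node.js terminates on an unhandled rejection.
    process.on("unhandledRejection", (reason) => {
      const details = reason instanceof Error ? reason.stack : reason;
      console.log("Unhandled Rejection at:", details);
    });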
arcticfly
ef1f9458f4 Add prompt ids (#177)
* Add prompt ids

* Add prompt ids
2023-08-18 16:56:17 -07:00
Kyle Corbitt
c6c7e746ee Merge pull request #180 from OpenPipe/priorities
Prioritize job execution
2023-08-18 13:46:31 -07:00
Kyle Corbitt
3be0a90960 Prioritize job execution
Makes it so our most critical jobs go through first. Priority order:

1. Force-refetched cells
2. Cells visible on the current page
3. All other cells
4. Retries
5. Evaluations
2023-08-18 13:44:33 -07:00
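A sketch of how this ordering maps onto graphile-worker priorities, assembled from the task diffs below (lower numbers run first in graphile-worker):

    // Priority values used across the diffs below; lower runs first.
    const JOB_PRIORITY = {
      hardRefetch: 0, // 1. force-refetched cells
      streamed: 1,    // 2. cells visible on the current page (streamed)
      unstreamed: 2,  // 3. all other cells
      retry: 3,       // 4. retries
      evaluation: 4,  // 5. evaluations
    } as const;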
Kyle Corbitt
9b1f2ac30a new script to run workers 2023-08-18 13:01:01 -07:00
Kyle Corbitt
1b394cc72b more resources 2023-08-18 12:14:28 -07:00
Kyle Corbitt
26b9731bab worker env 2023-08-18 11:45:54 -07:00
Kyle Corbitt
7c8ec8f6a7 Merge pull request #179 from OpenPipe/job-dedupe
Run workers in a separate Docker container
2023-08-18 11:26:32 -07:00
Kyle Corbitt
10dd53e7f6 Run workers in a separate Docker container
We've outgrown the run-everything-on-one-machine setup. This change moves background jobs to a different Docker image in production. It also adds a `jobKey` to certain jobs so if we try to process the same cell multiple times it'll only actually run the job once.
2023-08-18 11:16:00 -07:00
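A hedged sketch of the `jobKey` deduplication this message refers to, using graphile-worker's addJob (the real change passes the cell id as the key; `cellId` here is a hypothetical value):

    import { makeWorkerUtils } from "graphile-worker";

    const utils = await makeWorkerUtils({ connectionString: process.env.DATABASE_URL });
    const cellId = "cell_123"; // hypothetical id

    // Two enqueues with the same jobKey leave only one pending job, so
    // attempting to process the same cell twice runs the task once.
    await utils.addJob("queryModel", { cellId }, { jobKey: cellId });
    await utils.addJob("queryModel", { cellId }, { jobKey: cellId }); // replaces the first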
Kyle Corbitt
b1802fc04b Merge pull request #176 from OpenPipe/more-js
Streaming + logging works in TypeScript SDK
2023-08-18 08:56:56 -07:00
Kyle Corbitt
f2135ddc72 Streaming + logging works in Typescript SDK
Also added some high-level tests to minimize the chances that we're breaking anything.

The TypeScript SDK is mostly functional at this point, with the exception that we don't have a build process or a way to import it when deployed as an NPM package.
2023-08-18 08:53:08 -07:00
arcticfly
ca89eafb0b Create new uiId for forked variants and scenarios (#175)
* Create new uiIds for forked variants and scenarios

* Add replaceVariant.mutateAsync to onSave dependencies
2023-08-18 08:09:07 -07:00
arcticfly
b50d47beaf Square header border when scrolled down (#174)
* Square header border when scrolled down

* Remove unused import
2023-08-18 01:41:47 -07:00
52 changed files with 716 additions and 456 deletions

View File

@@ -45,4 +45,4 @@ EXPOSE 3000
ENV PORT 3000
# Run the "run-prod.sh" script
CMD /code/app/run-prod.sh
CMD /code/app/scripts/run-prod.sh

View File

@@ -12,8 +12,8 @@
"build": "next build",
"dev:next": "TZ=UTC next dev",
"dev:wss": "pnpm tsx --watch src/wss-server.ts",
"dev:worker": "NODE_ENV='development' pnpm tsx --watch src/server/tasks/worker.ts",
"dev": "concurrently --kill-others 'pnpm dev:next' 'pnpm dev:wss' 'pnpm dev:worker'",
"worker": "NODE_ENV='development' pnpm tsx --watch src/server/tasks/worker.ts",
"dev": "concurrently --kill-others 'pnpm dev:next' 'pnpm dev:wss' 'pnpm worker --watch'",
"postinstall": "prisma generate",
"lint": "next lint",
"start": "TZ=UTC next start",

View File

@@ -0,0 +1,6 @@
#! /bin/bash
set -e
cd "$(dirname "$0")/.."
apt-get update
apt-get install -y htop psql

View File

@@ -10,6 +10,4 @@ pnpm tsx src/promptConstructor/migrate.ts
echo "Starting the server"
pnpm concurrently --kill-others \
"pnpm start" \
"pnpm tsx src/server/tasks/worker.ts"
pnpm start

app/scripts/run-workers-prod.sh Executable file
View File

@@ -0,0 +1,10 @@
#! /bin/bash
set -e
echo "Migrating the database"
pnpm prisma migrate deploy
echo "Starting 4 workers"
pnpm concurrently "pnpm worker" "pnpm worker" "pnpm worker" "pnpm worker"

app/scripts/test-docker.sh Executable file
View File

@@ -0,0 +1,13 @@
#! /bin/bash
set -e
cd "$(dirname "$0")/../.."
echo "Env is"
echo $ENVIRONMENT
docker build . --file app/Dockerfile --tag "openpipe-prod"
# Run the image
docker run --env-file app/.env -it --entrypoint "/bin/bash" "openpipe-prod"

View File

@@ -3,6 +3,7 @@
// https://docs.sentry.io/platforms/javascript/guides/nextjs/
import * as Sentry from "@sentry/nextjs";
import { isError } from "lodash-es";
import { env } from "~/env.mjs";
if (env.NEXT_PUBLIC_SENTRY_DSN) {
@@ -15,4 +16,10 @@ if (env.NEXT_PUBLIC_SENTRY_DSN) {
// Setting this option to true will print useful information to the console while you're setting up Sentry.
debug: false,
});
} else {
// Install local debug exception handler for rejected promises
process.on("unhandledRejection", (reason) => {
const reasonDetails = isError(reason) ? reason?.stack : reason;
console.log("Unhandled Rejection at:", reasonDetails);
});
}

View File

@@ -8,7 +8,7 @@ import {
useHandledAsyncCallback,
useVisibleScenarioIds,
} from "~/utils/hooks";
import { cellPadding } from "../constants";
import { cellPadding } from "./constants";
import { ActionButton } from "./ScenariosHeader";
export default function AddVariantButton() {

View File

@@ -43,7 +43,7 @@ export default function OutputCell({
type OutputSchema = Parameters<typeof provider.normalizeOutput>[0];
const { mutateAsync: hardRefetchMutate } = api.scenarioVariantCells.forceRefetch.useMutation();
const { mutateAsync: hardRefetchMutate } = api.scenarioVariantCells.hardRefetch.useMutation();
const [hardRefetch, hardRefetching] = useHandledAsyncCallback(async () => {
await hardRefetchMutate({ scenarioId: scenario.id, variantId: variant.id });
await utils.scenarioVariantCells.get.invalidate({

View File

@@ -16,7 +16,7 @@ import {
VStack,
} from "@chakra-ui/react";
import { BsArrowsAngleExpand, BsX } from "react-icons/bs";
import { cellPadding } from "../constants";
import { cellPadding } from "./constants";
import { FloatingLabelInput } from "./FloatingLabelInput";
import { ScenarioEditorModal } from "./ScenarioEditorModal";

View File

@@ -11,7 +11,7 @@ import {
IconButton,
Spinner,
} from "@chakra-ui/react";
import { cellPadding } from "../constants";
import { cellPadding } from "./constants";
import {
useExperiment,
useExperimentAccess,

View File

@@ -110,7 +110,7 @@ export default function VariantEditor(props: { variant: PromptVariant }) {
setIsChanged(false);
await utils.promptVariants.list.invalidate();
}, [checkForChanges]);
}, [checkForChanges, replaceVariant.mutateAsync]);
useEffect(() => {
if (monaco) {

View File

@@ -1,11 +1,11 @@
import { useState, type DragEvent } from "react";
import { type PromptVariant } from "../OutputsTable/types";
import { type PromptVariant } from "../types";
import { api } from "~/utils/api";
import { RiDraggable } from "react-icons/ri";
import { useExperimentAccess, useHandledAsyncCallback } from "~/utils/hooks";
import { HStack, Icon, Text, GridItem, type GridItemProps } from "@chakra-ui/react"; // Changed here
import { cellPadding, headerMinHeight } from "../constants";
import AutoResizeTextArea from "../AutoResizeTextArea";
import AutoResizeTextArea from "../../AutoResizeTextArea";
import VariantHeaderMenuButton from "./VariantHeaderMenuButton";
export default function VariantHeader(
@@ -75,7 +75,7 @@ export default function VariantHeader(
padding={0}
sx={{
position: "sticky",
top: "-2",
top: "0",
// Ensure that the menu always appears above the sticky header of other variants
zIndex: menuOpen ? "dropdown" : 10,
}}

View File

@@ -1,6 +1,4 @@
import { type PromptVariant } from "../OutputsTable/types";
import { api } from "~/utils/api";
import { useHandledAsyncCallback, useVisibleScenarioIds } from "~/utils/hooks";
import { useState } from "react";
import {
Icon,
Menu,
@@ -14,10 +12,13 @@ import {
} from "@chakra-ui/react";
import { BsFillTrashFill, BsGear, BsStars } from "react-icons/bs";
import { FaRegClone } from "react-icons/fa";
import { useState } from "react";
import { RefinePromptModal } from "../RefinePromptModal/RefinePromptModal";
import { RiExchangeFundsFill } from "react-icons/ri";
import { ChangeModelModal } from "../ChangeModelModal/ChangeModelModal";
import { api } from "~/utils/api";
import { useHandledAsyncCallback, useVisibleScenarioIds } from "~/utils/hooks";
import { type PromptVariant } from "../types";
import { RefinePromptModal } from "../../RefinePromptModal/RefinePromptModal";
import { ChangeModelModal } from "../../ChangeModelModal/ChangeModelModal";
export default function VariantHeaderMenuButton({
variant,

View File

@@ -1,6 +1,6 @@
import { HStack, Icon, Text, useToken } from "@chakra-ui/react";
import { type PromptVariant } from "./types";
import { cellPadding } from "../constants";
import { cellPadding } from "./constants";
import { api } from "~/utils/api";
import chroma from "chroma-js";
import { BsCurrencyDollar } from "react-icons/bs";

View File

@@ -3,13 +3,14 @@ import { api } from "~/utils/api";
import AddVariantButton from "./AddVariantButton";
import ScenarioRow from "./ScenarioRow";
import VariantEditor from "./VariantEditor";
import VariantHeader from "../VariantHeader/VariantHeader";
import VariantHeader from "./VariantHeader/VariantHeader";
import VariantStats from "./VariantStats";
import { ScenariosHeader } from "./ScenariosHeader";
import { borders } from "./styles";
import { useScenarios } from "~/utils/hooks";
import ScenarioPaginator from "./ScenarioPaginator";
import { Fragment } from "react";
import useScrolledPast from "./useHasScrolledPast";
export default function OutputsTable({ experimentId }: { experimentId: string | undefined }) {
const variants = api.promptVariants.list.useQuery(
@@ -18,6 +19,7 @@ export default function OutputsTable({ experimentId }: { experimentId: string |
);
const scenarios = useScenarios();
const shouldFlattenHeader = useScrolledPast(50);
if (!variants.data || !scenarios.data) return null;
@@ -63,8 +65,8 @@ export default function OutputsTable({ experimentId }: { experimentId: string |
variant={variant}
canHide={variants.data.length > 1}
rowStart={1}
borderTopLeftRadius={isFirst ? 8 : 0}
borderTopRightRadius={isLast ? 8 : 0}
borderTopLeftRadius={isFirst && !shouldFlattenHeader ? 8 : 0}
borderTopRightRadius={isLast && !shouldFlattenHeader ? 8 : 0}
{...sharedProps}
/>
<GridItem rowStart={2} {...sharedProps}>

View File

@@ -0,0 +1,34 @@
import { useState, useEffect } from "react";
const useScrolledPast = (scrollThreshold: number) => {
const [hasScrolledPast, setHasScrolledPast] = useState(true);
useEffect(() => {
const container = document.getElementById("output-container");
if (!container) {
console.warn('Element with id "output-container" not found.');
return;
}
const checkScroll = () => {
const { scrollTop } = container;
// Check if scrollTop is greater than scrollThreshold
setHasScrolledPast(scrollTop > scrollThreshold);
};
checkScroll();
container.addEventListener("scroll", checkScroll);
// Cleanup
return () => {
container.removeEventListener("scroll", checkScroll);
};
}, []);
return hasScrolledPast;
};
export default useScrolledPast;

View File

@@ -67,7 +67,13 @@ export default function ProjectMenu() {
);
return (
<VStack w="full" alignItems="flex-start" spacing={0} py={1}>
<VStack
w="full"
alignItems="flex-start"
spacing={0}
py={1}
zIndex={popover.isOpen ? "dropdown" : undefined}
>
<Popover
placement="bottom"
isOpen={popover.isOpen}

View File

@@ -26,6 +26,14 @@ export const env = createEnv({
SMTP_PORT: z.string().default("placeholder"),
SMTP_LOGIN: z.string().default("placeholder"),
SMTP_PASSWORD: z.string().default("placeholder"),
WORKER_CONCURRENCY: z
.string()
.default("10")
.transform((val) => parseInt(val)),
WORKER_MAX_POOL_SIZE: z
.string()
.default("10")
.transform((val) => parseInt(val)),
},
/**
@@ -68,6 +76,8 @@ export const env = createEnv({
SMTP_PORT: process.env.SMTP_PORT,
SMTP_LOGIN: process.env.SMTP_LOGIN,
SMTP_PASSWORD: process.env.SMTP_PASSWORD,
WORKER_CONCURRENCY: process.env.WORKER_CONCURRENCY,
WORKER_MAX_POOL_SIZE: process.env.WORKER_MAX_POOL_SIZE,
},
/**
* Run `build` or `dev` with `SKIP_ENV_VALIDATION` to skip env validation.

View File

@@ -16,7 +16,16 @@ export async function getCompletion(
try {
if (onStream) {
const resp = await openai.chat.completions.create(
{ ...input, stream: true },
{
...input,
stream: true,
openpipe: {
tags: {
prompt_id: "getCompletion",
stream: "true",
},
},
},
{
maxRetries: 0,
},
@@ -34,7 +43,16 @@ export async function getCompletion(
}
} else {
const resp = await openai.chat.completions.create(
{ ...input, stream: false },
{
...input,
stream: false,
openpipe: {
tags: {
prompt_id: "getCompletion",
stream: "false",
},
},
},
{
maxRetries: 0,
},

View File

@@ -124,7 +124,7 @@ export default function Experiment() {
<ExperimentHeaderButtons />
</PageHeaderContainer>
<ExperimentSettingsDrawer />
<Box w="100%" overflowX="auto" flex={1}>
<Box w="100%" overflowX="auto" flex={1} id="output-container">
<OutputsTable experimentId={experiment.data?.id} />
</Box>
</VStack>

View File

@@ -89,6 +89,11 @@ export const autogenerateDatasetEntries = async (
function_call: { name: "add_list_of_data" },
temperature: 0.5,
openpipe: {
tags: {
prompt_id: "autogenerateDatasetEntries",
},
},
});
const completionCallbacks = batchSizes.map((batchSize) =>

View File

@@ -98,6 +98,11 @@ export const autogenerateScenarioValues = async (
function_call: { name: "add_scenario" },
temperature: 0.5,
openpipe: {
tags: {
prompt_id: "autogenerateScenarioValues",
},
},
});
const parsed = JSON.parse(

View File

@@ -66,7 +66,7 @@ export const v1ApiRouter = createOpenApiRouter({
if (!existingResponse) return { respPayload: null };
await prisma.loggedCall.create({
const newCall = await prisma.loggedCall.create({
data: {
projectId: ctx.key.projectId,
requestedAt: new Date(input.requestedAt),
@@ -75,11 +75,7 @@ export const v1ApiRouter = createOpenApiRouter({
},
});
await createTags(
existingResponse.originalLoggedCall.projectId,
existingResponse.originalLoggedCallId,
input.tags,
);
await createTags(newCall.projectId, newCall.id, input.tags);
return {
respPayload: existingResponse.respPayload,
};
@@ -111,7 +107,7 @@ export const v1ApiRouter = createOpenApiRouter({
.default({}),
}),
)
.output(z.object({ status: z.literal("ok") }))
.output(z.object({ status: z.union([z.literal("ok"), z.literal("error")]) }))
.mutation(async ({ input, ctx }) => {
const reqPayload = await reqValidator.spa(input.reqPayload);
const respPayload = await respValidator.spa(input.respPayload);
@@ -212,6 +208,7 @@ export const v1ApiRouter = createOpenApiRouter({
createdAt: true,
cacheHit: true,
tags: true,
id: true,
modelResponse: {
select: {
id: true,
@@ -237,7 +234,7 @@ async function createTags(projectId: string, loggedCallId: string, tags: Record<
const tagsToCreate = Object.entries(tags).map(([name, value]) => ({
projectId,
loggedCallId,
name: name.replaceAll(/[^a-zA-Z0-9_$]/g, "_"),
name: name.replaceAll(/[^a-zA-Z0-9_$.]/g, "_"),
value,
}));
await prisma.loggedCallTag.createMany({
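As a quick illustration of the regex change above, widening the character class to include `.` preserves dotted tag names that were previously flattened (hypothetical inputs):

    const before = (name: string) => name.replaceAll(/[^a-zA-Z0-9_$]/g, "_");
    const after = (name: string) => name.replaceAll(/[^a-zA-Z0-9_$.]/g, "_");

    before("prompt.id"); // "prompt_id"
    after("prompt.id");  // "prompt.id" -- dots now survive
    after("prompt id!"); // "prompt_id_" -- everything else is still sanitized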

View File

@@ -178,6 +178,7 @@ export const experimentsRouter = createTRPCRouter({
existingToNewVariantIds.set(variant.id, newVariantId);
variantsToCreate.push({
...variant,
uiId: uuidv4(),
id: newVariantId,
experimentId: newExperimentId,
});
@@ -191,6 +192,7 @@ export const experimentsRouter = createTRPCRouter({
scenariosToCreate.push({
...scenario,
id: newScenarioId,
uiId: uuidv4(),
experimentId: newExperimentId,
variableValues: scenario.variableValues as Prisma.InputJsonValue,
});

View File

@@ -61,7 +61,7 @@ export const scenarioVariantCellsRouter = createTRPCRouter({
evalsComplete,
};
}),
forceRefetch: protectedProcedure
hardRefetch: protectedProcedure
.input(
z.object({
scenarioId: z.string(),
@@ -85,7 +85,10 @@ export const scenarioVariantCellsRouter = createTRPCRouter({
});
if (!cell) {
await generateNewCell(input.variantId, input.scenarioId, { stream: true });
await generateNewCell(input.variantId, input.scenarioId, {
stream: true,
hardRefetch: true,
});
return;
}
@@ -96,7 +99,7 @@ export const scenarioVariantCellsRouter = createTRPCRouter({
},
});
await queueQueryModel(cell.id, true);
await queueQueryModel(cell.id, { stream: true, hardRefetch: true });
}),
getTemplatedPromptMessage: publicProcedure
.input(

View File

@@ -1,19 +0,0 @@
import "dotenv/config";
import { openai } from "../utils/openai";
const resp = await openai.chat.completions.create({
model: "gpt-3.5-turbo-0613",
stream: true,
messages: [
{
role: "user",
content: "count to 20",
},
],
});
for await (const part of resp) {
console.log("part", part);
}
console.log("final resp", resp);

View File

@@ -1,4 +1,4 @@
import { type Helpers, type Task, makeWorkerUtils } from "graphile-worker";
import { type Helpers, type Task, makeWorkerUtils, TaskSpec } from "graphile-worker";
import { env } from "~/env.mjs";
let workerUtilsPromise: ReturnType<typeof makeWorkerUtils> | null = null;
@@ -16,9 +16,11 @@ function defineTask<TPayload>(
taskIdentifier: string,
taskHandler: (payload: TPayload, helpers: Helpers) => Promise<void>,
) {
const enqueue = async (payload: TPayload, runAt?: Date) => {
const enqueue = async (payload: TPayload, spec?: TaskSpec) => {
console.log("Enqueuing task", taskIdentifier, payload);
await (await workerUtils()).addJob(taskIdentifier, payload, { runAt });
const utils = await workerUtils();
return await utils.addJob(taskIdentifier, payload, spec);
};
const handler = (payload: TPayload, helpers: Helpers) => {
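A usage sketch of the widened `enqueue` signature: instead of a bare `runAt` date, callers can now pass any graphile-worker TaskSpec, which is what lets the retry path in queryModel.task.ts below set a run time, dedupe key, and priority in one call (values here are hypothetical):

    import type { TaskSpec } from "graphile-worker";

    const spec: TaskSpec = {
      runAt: new Date(Date.now() + 30_000), // delayed retry (the old runAt argument)
      jobKey: "cell_123",                   // replaces any pending job with the same key
      priority: 3,                          // lower numbers run first
    };
    await queryModel.enqueue({ cellId: "cell_123", stream: false, numPreviousTries: 1 }, spec);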

View File

@@ -25,7 +25,6 @@ function calculateDelay(numPreviousTries: number): number {
}
export const queryModel = defineTask<QueryModelJob>("queryModel", async (task) => {
console.log("RUNNING TASK", task);
const { cellId, stream, numPreviousTries } = task;
const cell = await prisma.scenarioVariantCell.findUnique({
where: { id: cellId },
@@ -153,7 +152,7 @@ export const queryModel = defineTask<QueryModelJob>("queryModel", async (task) =
stream,
numPreviousTries: numPreviousTries + 1,
},
retryTime,
{ runAt: retryTime, jobKey: cellId, priority: 3 },
);
await prisma.scenarioVariantCell.update({
where: { id: cellId },
@@ -172,7 +171,13 @@ export const queryModel = defineTask<QueryModelJob>("queryModel", async (task) =
}
});
export const queueQueryModel = async (cellId: string, stream: boolean) => {
export const queueQueryModel = async (
cellId: string,
options: { stream?: boolean; hardRefetch?: boolean } = {},
) => {
// Hard refetches are higher priority than streamed queries, which are higher priority than non-streamed queries.
const jobPriority = options.hardRefetch ? 0 : options.stream ? 1 : 2;
await Promise.all([
prisma.scenarioVariantCell.update({
where: {
@@ -184,6 +189,13 @@ export const queueQueryModel = async (cellId: string, stream: boolean) => {
jobQueuedAt: new Date(),
},
}),
queryModel.enqueue({ cellId, stream, numPreviousTries: 0 }),
queryModel.enqueue(
{ cellId, stream: options.stream ?? false, numPreviousTries: 0 },
// Streamed queries are higher priority than non-streamed queries. Lower
// numbers are higher priority in graphile-worker.
{ jobKey: cellId, priority: jobPriority },
),
]);
};

View File

@@ -13,5 +13,6 @@ export const runNewEval = defineTask<RunNewEvalJob>("runNewEval", async (task) =
});
export const queueRunNewEval = async (experimentId: string) => {
await runNewEval.enqueue({ experimentId });
// Evals are lower priority than completions
await runNewEval.enqueue({ experimentId }, { priority: 4 });
};

View File

@@ -0,0 +1,47 @@
import "dotenv/config";
import defineTask from "./defineTask";
import { type TaskList, run } from "graphile-worker";
import { env } from "~/env.mjs";
import "../../../sentry.server.config";
export type TestTask = { i: number };
// When a new eval is created, we want to run it on all existing outputs, but return the new eval first
export const testTask = defineTask<TestTask>("testTask", (task) => {
console.log("ran task ", task.i);
void new Promise((_resolve, reject) => setTimeout(reject, 500));
return Promise.resolve();
});
const registeredTasks = [testTask];
const taskList = registeredTasks.reduce((acc, task) => {
acc[task.task.identifier] = task.task.handler;
return acc;
}, {} as TaskList);
// process.on("unhandledRejection", (reason, promise) => {
// console.log("Unhandled Rejection at:", reason?.stack || reason);
// });
// Run a worker to execute jobs:
const runner = await run({
connectionString: env.DATABASE_URL,
concurrency: 10,
// Install signal handlers for graceful shutdown on SIGINT, SIGTERM, etc
noHandleSignals: false,
pollInterval: 1000,
taskList,
});
console.log("Worker successfully started");
for (let i = 0; i < 10; i++) {
await testTask.enqueue({ i });
await new Promise((resolve) => setTimeout(resolve, 1000));
}
await runner.promise;

View File

@@ -1,5 +1,6 @@
import { type TaskList, run } from "graphile-worker";
import "dotenv/config";
import "../../../sentry.server.config";
import { env } from "~/env.mjs";
import { queryModel } from "./queryModel.task";
@@ -17,7 +18,8 @@ const taskList = registeredTasks.reduce((acc, task) => {
// Run a worker to execute jobs:
const runner = await run({
connectionString: env.DATABASE_URL,
concurrency: 10,
concurrency: env.WORKER_CONCURRENCY,
maxPoolSize: env.WORKER_MAX_POOL_SIZE,
// Install signal handlers for graceful shutdown on SIGINT, SIGTERM, etc
noHandleSignals: false,
pollInterval: 1000,

View File

@@ -109,6 +109,12 @@ const requestUpdatedPromptFunction = async (
function_call: {
name: "update_prompt_constructor_function",
},
openpipe: {
tags: {
prompt_id: "deriveNewConstructFn",
model_translation: (!!newModel).toString(),
},
},
});
const argString = completion.choices[0]?.message?.function_call?.arguments || "{}";

View File

@@ -9,10 +9,8 @@ import parsePromptConstructor from "~/promptConstructor/parse";
export const generateNewCell = async (
variantId: string,
scenarioId: string,
options?: { stream?: boolean },
options: { stream?: boolean; hardRefetch?: boolean } = {},
): Promise<void> => {
const stream = options?.stream ?? false;
const variant = await prisma.promptVariant.findUnique({
where: {
id: variantId,
@@ -121,6 +119,6 @@ export const generateNewCell = async (
}),
);
} else {
await queueQueryModel(cell.id, stream);
await queueQueryModel(cell.id, options);
}
};

View File

@@ -53,6 +53,11 @@ export const runGpt4Eval = async (
},
},
],
openpipe: {
tags: {
prompt_id: "runOneEval",
},
},
});
try {

View File

@@ -1,9 +0,0 @@
#! /bin/bash
set -e
cd "$(dirname "$0")/.."
source app/.env
docker build . --file app/Dockerfile

View File

@@ -141,9 +141,19 @@
"type": "object",
"properties": {
"status": {
"type": "string",
"enum": [
"ok"
"anyOf": [
{
"type": "string",
"enum": [
"ok"
]
},
{
"type": "string",
"enum": [
"error"
]
}
]
}
},

View File

@@ -13,7 +13,8 @@ from .local_testing_only_get_latest_logged_call_response_200_tags import (
from .report_json_body import ReportJsonBody
from .report_json_body_tags import ReportJsonBodyTags
from .report_response_200 import ReportResponse200
from .report_response_200_status import ReportResponse200Status
from .report_response_200_status_type_0 import ReportResponse200StatusType0
from .report_response_200_status_type_1 import ReportResponse200StatusType1
__all__ = (
"CheckCacheJsonBody",
@@ -25,5 +26,6 @@ __all__ = (
"ReportJsonBody",
"ReportJsonBodyTags",
"ReportResponse200",
"ReportResponse200Status",
"ReportResponse200StatusType0",
"ReportResponse200StatusType1",
)

View File

@@ -1,8 +1,9 @@
from typing import Any, Dict, Type, TypeVar
from typing import Any, Dict, Type, TypeVar, Union
from attrs import define
from ..models.report_response_200_status import ReportResponse200Status
from ..models.report_response_200_status_type_0 import ReportResponse200StatusType0
from ..models.report_response_200_status_type_1 import ReportResponse200StatusType1
T = TypeVar("T", bound="ReportResponse200")
@@ -11,13 +12,19 @@ T = TypeVar("T", bound="ReportResponse200")
class ReportResponse200:
"""
Attributes:
status (ReportResponse200Status):
status (Union[ReportResponse200StatusType0, ReportResponse200StatusType1]):
"""
status: ReportResponse200Status
status: Union[ReportResponse200StatusType0, ReportResponse200StatusType1]
def to_dict(self) -> Dict[str, Any]:
status = self.status.value
status: str
if isinstance(self.status, ReportResponse200StatusType0):
status = self.status.value
else:
status = self.status.value
field_dict: Dict[str, Any] = {}
field_dict.update(
@@ -31,7 +38,23 @@ class ReportResponse200:
@classmethod
def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T:
d = src_dict.copy()
status = ReportResponse200Status(d.pop("status"))
def _parse_status(data: object) -> Union[ReportResponse200StatusType0, ReportResponse200StatusType1]:
try:
if not isinstance(data, str):
raise TypeError()
status_type_0 = ReportResponse200StatusType0(data)
return status_type_0
except: # noqa: E722
pass
if not isinstance(data, str):
raise TypeError()
status_type_1 = ReportResponse200StatusType1(data)
return status_type_1
status = _parse_status(d.pop("status"))
report_response_200 = cls(
status=status,

View File

@@ -1,7 +1,7 @@
from enum import Enum
class ReportResponse200Status(str, Enum):
class ReportResponse200StatusType0(str, Enum):
OK = "ok"
def __str__(self) -> str:

View File

@@ -0,0 +1,8 @@
from enum import Enum
class ReportResponse200StatusType1(str, Enum):
ERROR = "error"
def __str__(self) -> str:
return str(self.value)

View File

@@ -24,10 +24,18 @@ def _get_tags(openpipe_options):
return ReportJsonBodyTags.from_dict(tags)
def _should_check_cache(openpipe_options):
def _should_check_cache(openpipe_options, req_payload):
if configured_client.token == "":
return False
return openpipe_options.get("cache", False)
cache_requested = openpipe_options.get("cache", False)
streaming = req_payload.get("stream", False)
if cache_requested and streaming:
print(
"Caching is not yet supported for streaming requests. Ignoring cache flag. Vote for this feature at https://github.com/OpenPipe/OpenPipe/issues/159"
)
return False
return cache_requested
def _process_cache_payload(
@@ -44,7 +52,7 @@ def maybe_check_cache(
openpipe_options={},
req_payload={},
):
if not _should_check_cache(openpipe_options):
if not _should_check_cache(openpipe_options, req_payload):
return None
try:
payload = check_cache.sync(
@@ -68,7 +76,7 @@ async def maybe_check_cache_async(
openpipe_options={},
req_payload={},
):
if not _should_check_cache(openpipe_options):
if not _should_check_cache(openpipe_options, req_payload):
return None
try:

View File

@@ -13,15 +13,17 @@
"author": "",
"license": "Apache-2.0",
"dependencies": {
"encoding": "^0.1.13",
"form-data": "^4.0.0",
"lodash-es": "^4.17.21",
"node-fetch": "^3.3.2",
"node-fetch": "^2.6.12",
"openai-beta": "npm:openai@4.0.0-beta.7",
"openai-legacy": "npm:openai@3.3.0"
},
"devDependencies": {
"@types/lodash-es": "^4.17.8",
"@types/node": "^20.4.8",
"@types/node-fetch": "^2.6.4",
"dotenv": "^16.3.1",
"tsx": "^3.12.7",
"typescript": "^5.0.4",

View File

@@ -2,301 +2,283 @@
/* istanbul ignore file */
/* tslint:disable */
/* eslint-disable */
import FormData from "form-data";
import fetch, { Headers } from "node-fetch";
import type { RequestInit, Response } from "node-fetch";
import FormData from 'form-data';
import fetch, { Headers } from 'node-fetch';
import type { RequestInit, Response } from 'node-fetch';
import type { AbortSignal } from 'node-fetch/externals';
// @ts-expect-error TODO maybe I need an older node-fetch or something?
import type { AbortSignal } from "node-fetch/externals";
import { ApiError } from './ApiError';
import type { ApiRequestOptions } from './ApiRequestOptions';
import type { ApiResult } from './ApiResult';
import { CancelablePromise } from './CancelablePromise';
import type { OnCancel } from './CancelablePromise';
import type { OpenAPIConfig } from './OpenAPI';
import { ApiError } from "./ApiError";
import type { ApiRequestOptions } from "./ApiRequestOptions";
import type { ApiResult } from "./ApiResult";
import { CancelablePromise } from "./CancelablePromise";
import type { OnCancel } from "./CancelablePromise";
import type { OpenAPIConfig } from "./OpenAPI";
export const isDefined = <T>(
value: T | null | undefined
): value is Exclude<T, null | undefined> => {
return value !== undefined && value !== null;
export const isDefined = <T>(value: T | null | undefined): value is Exclude<T, null | undefined> => {
return value !== undefined && value !== null;
};
export const isString = (value: any): value is string => {
return typeof value === "string";
return typeof value === 'string';
};
export const isStringWithValue = (value: any): value is string => {
return isString(value) && value !== "";
return isString(value) && value !== '';
};
export const isBlob = (value: any): value is Blob => {
return (
typeof value === "object" &&
typeof value.type === "string" &&
typeof value.stream === "function" &&
typeof value.arrayBuffer === "function" &&
typeof value.constructor === "function" &&
typeof value.constructor.name === "string" &&
/^(Blob|File)$/.test(value.constructor.name) &&
/^(Blob|File)$/.test(value[Symbol.toStringTag])
);
return (
typeof value === 'object' &&
typeof value.type === 'string' &&
typeof value.stream === 'function' &&
typeof value.arrayBuffer === 'function' &&
typeof value.constructor === 'function' &&
typeof value.constructor.name === 'string' &&
/^(Blob|File)$/.test(value.constructor.name) &&
/^(Blob|File)$/.test(value[Symbol.toStringTag])
);
};
export const isFormData = (value: any): value is FormData => {
return value instanceof FormData;
return value instanceof FormData;
};
export const base64 = (str: string): string => {
try {
return btoa(str);
} catch (err) {
// @ts-ignore
return Buffer.from(str).toString("base64");
}
try {
return btoa(str);
} catch (err) {
// @ts-ignore
return Buffer.from(str).toString('base64');
}
};
export const getQueryString = (params: Record<string, any>): string => {
const qs: string[] = [];
const qs: string[] = [];
const append = (key: string, value: any) => {
qs.push(`${encodeURIComponent(key)}=${encodeURIComponent(String(value))}`);
};
const append = (key: string, value: any) => {
qs.push(`${encodeURIComponent(key)}=${encodeURIComponent(String(value))}`);
};
const process = (key: string, value: any) => {
if (isDefined(value)) {
if (Array.isArray(value)) {
value.forEach((v) => {
process(key, v);
});
} else if (typeof value === "object") {
Object.entries(value).forEach(([k, v]) => {
process(`${key}[${k}]`, v);
});
} else {
append(key, value);
}
const process = (key: string, value: any) => {
if (isDefined(value)) {
if (Array.isArray(value)) {
value.forEach(v => {
process(key, v);
});
} else if (typeof value === 'object') {
Object.entries(value).forEach(([k, v]) => {
process(`${key}[${k}]`, v);
});
} else {
append(key, value);
}
}
};
Object.entries(params).forEach(([key, value]) => {
process(key, value);
});
if (qs.length > 0) {
return `?${qs.join('&')}`;
}
};
Object.entries(params).forEach(([key, value]) => {
process(key, value);
});
if (qs.length > 0) {
return `?${qs.join("&")}`;
}
return "";
return '';
};
const getUrl = (config: OpenAPIConfig, options: ApiRequestOptions): string => {
const encoder = config.ENCODE_PATH || encodeURI;
const encoder = config.ENCODE_PATH || encodeURI;
const path = options.url
.replace("{api-version}", config.VERSION)
.replace(/{(.*?)}/g, (substring: string, group: string) => {
if (options.path?.hasOwnProperty(group)) {
return encoder(String(options.path[group]));
}
return substring;
});
const path = options.url
.replace('{api-version}', config.VERSION)
.replace(/{(.*?)}/g, (substring: string, group: string) => {
if (options.path?.hasOwnProperty(group)) {
return encoder(String(options.path[group]));
}
return substring;
});
const url = `${config.BASE}${path}`;
if (options.query) {
return `${url}${getQueryString(options.query)}`;
}
return url;
const url = `${config.BASE}${path}`;
if (options.query) {
return `${url}${getQueryString(options.query)}`;
}
return url;
};
export const getFormData = (options: ApiRequestOptions): FormData | undefined => {
if (options.formData) {
const formData = new FormData();
if (options.formData) {
const formData = new FormData();
const process = (key: string, value: any) => {
if (isString(value) || isBlob(value)) {
formData.append(key, value);
} else {
formData.append(key, JSON.stringify(value));
}
};
const process = (key: string, value: any) => {
if (isString(value) || isBlob(value)) {
formData.append(key, value);
} else {
formData.append(key, JSON.stringify(value));
}
};
Object.entries(options.formData)
.filter(([_, value]) => isDefined(value))
.forEach(([key, value]) => {
if (Array.isArray(value)) {
value.forEach((v) => process(key, v));
} else {
process(key, value);
}
});
Object.entries(options.formData)
.filter(([_, value]) => isDefined(value))
.forEach(([key, value]) => {
if (Array.isArray(value)) {
value.forEach(v => process(key, v));
} else {
process(key, value);
}
});
return formData;
}
return undefined;
return formData;
}
return undefined;
};
type Resolver<T> = (options: ApiRequestOptions) => Promise<T>;
export const resolve = async <T>(
options: ApiRequestOptions,
resolver?: T | Resolver<T>
): Promise<T | undefined> => {
if (typeof resolver === "function") {
return (resolver as Resolver<T>)(options);
}
return resolver;
export const resolve = async <T>(options: ApiRequestOptions, resolver?: T | Resolver<T>): Promise<T | undefined> => {
if (typeof resolver === 'function') {
return (resolver as Resolver<T>)(options);
}
return resolver;
};
export const getHeaders = async (
config: OpenAPIConfig,
options: ApiRequestOptions
): Promise<Headers> => {
const token = await resolve(options, config.TOKEN);
const username = await resolve(options, config.USERNAME);
const password = await resolve(options, config.PASSWORD);
const additionalHeaders = await resolve(options, config.HEADERS);
export const getHeaders = async (config: OpenAPIConfig, options: ApiRequestOptions): Promise<Headers> => {
const token = await resolve(options, config.TOKEN);
const username = await resolve(options, config.USERNAME);
const password = await resolve(options, config.PASSWORD);
const additionalHeaders = await resolve(options, config.HEADERS);
const headers = Object.entries({
Accept: "application/json",
...additionalHeaders,
...options.headers,
})
.filter(([_, value]) => isDefined(value))
.reduce(
(headers, [key, value]) => ({
...headers,
[key]: String(value),
}),
{} as Record<string, string>
);
const headers = Object.entries({
Accept: 'application/json',
...additionalHeaders,
...options.headers,
})
.filter(([_, value]) => isDefined(value))
.reduce((headers, [key, value]) => ({
...headers,
[key]: String(value),
}), {} as Record<string, string>);
if (isStringWithValue(token)) {
headers["Authorization"] = `Bearer ${token}`;
}
if (isStringWithValue(username) && isStringWithValue(password)) {
const credentials = base64(`${username}:${password}`);
headers["Authorization"] = `Basic ${credentials}`;
}
if (options.body) {
if (options.mediaType) {
headers["Content-Type"] = options.mediaType;
} else if (isBlob(options.body)) {
headers["Content-Type"] = "application/octet-stream";
} else if (isString(options.body)) {
headers["Content-Type"] = "text/plain";
} else if (!isFormData(options.body)) {
headers["Content-Type"] = "application/json";
if (isStringWithValue(token)) {
headers['Authorization'] = `Bearer ${token}`;
}
}
return new Headers(headers);
if (isStringWithValue(username) && isStringWithValue(password)) {
const credentials = base64(`${username}:${password}`);
headers['Authorization'] = `Basic ${credentials}`;
}
if (options.body) {
if (options.mediaType) {
headers['Content-Type'] = options.mediaType;
} else if (isBlob(options.body)) {
headers['Content-Type'] = 'application/octet-stream';
} else if (isString(options.body)) {
headers['Content-Type'] = 'text/plain';
} else if (!isFormData(options.body)) {
headers['Content-Type'] = 'application/json';
}
}
return new Headers(headers);
};
export const getRequestBody = (options: ApiRequestOptions): any => {
if (options.body !== undefined) {
if (options.mediaType?.includes("/json")) {
return JSON.stringify(options.body);
} else if (isString(options.body) || isBlob(options.body) || isFormData(options.body)) {
return options.body as any;
} else {
return JSON.stringify(options.body);
if (options.body !== undefined) {
if (options.mediaType?.includes('/json')) {
return JSON.stringify(options.body)
} else if (isString(options.body) || isBlob(options.body) || isFormData(options.body)) {
return options.body as any;
} else {
return JSON.stringify(options.body);
}
}
}
return undefined;
return undefined;
};
export const sendRequest = async (
options: ApiRequestOptions,
url: string,
body: any,
formData: FormData | undefined,
headers: Headers,
onCancel: OnCancel
options: ApiRequestOptions,
url: string,
body: any,
formData: FormData | undefined,
headers: Headers,
onCancel: OnCancel
): Promise<Response> => {
const controller = new AbortController();
const controller = new AbortController();
const request: RequestInit = {
headers,
method: options.method,
body: body ?? formData,
signal: controller.signal as AbortSignal,
};
const request: RequestInit = {
headers,
method: options.method,
body: body ?? formData,
signal: controller.signal as AbortSignal,
};
onCancel(() => controller.abort());
onCancel(() => controller.abort());
return await fetch(url, request);
return await fetch(url, request);
};
export const getResponseHeader = (
response: Response,
responseHeader?: string
): string | undefined => {
if (responseHeader) {
const content = response.headers.get(responseHeader);
if (isString(content)) {
return content;
export const getResponseHeader = (response: Response, responseHeader?: string): string | undefined => {
if (responseHeader) {
const content = response.headers.get(responseHeader);
if (isString(content)) {
return content;
}
}
}
return undefined;
return undefined;
};
export const getResponseBody = async (response: Response): Promise<any> => {
if (response.status !== 204) {
try {
const contentType = response.headers.get("Content-Type");
if (contentType) {
const jsonTypes = ["application/json", "application/problem+json"];
const isJSON = jsonTypes.some((type) => contentType.toLowerCase().startsWith(type));
if (isJSON) {
return await response.json();
} else {
return await response.text();
if (response.status !== 204) {
try {
const contentType = response.headers.get('Content-Type');
if (contentType) {
const jsonTypes = ['application/json', 'application/problem+json']
const isJSON = jsonTypes.some(type => contentType.toLowerCase().startsWith(type));
if (isJSON) {
return await response.json();
} else {
return await response.text();
}
}
} catch (error) {
console.error(error);
}
}
} catch (error) {
console.error(error);
}
}
return undefined;
return undefined;
};
export const catchErrorCodes = (options: ApiRequestOptions, result: ApiResult): void => {
const errors: Record<number, string> = {
400: "Bad Request",
401: "Unauthorized",
403: "Forbidden",
404: "Not Found",
500: "Internal Server Error",
502: "Bad Gateway",
503: "Service Unavailable",
...options.errors,
};
const errors: Record<number, string> = {
400: 'Bad Request',
401: 'Unauthorized',
403: 'Forbidden',
404: 'Not Found',
500: 'Internal Server Error',
502: 'Bad Gateway',
503: 'Service Unavailable',
...options.errors,
}
const error = errors[result.status];
if (error) {
throw new ApiError(options, result, error);
}
const error = errors[result.status];
if (error) {
throw new ApiError(options, result, error);
}
if (!result.ok) {
const errorStatus = result.status ?? "unknown";
const errorStatusText = result.statusText ?? "unknown";
const errorBody = (() => {
try {
return JSON.stringify(result.body, null, 2);
} catch (e) {
return undefined;
}
})();
if (!result.ok) {
const errorStatus = result.status ?? 'unknown';
const errorStatusText = result.statusText ?? 'unknown';
const errorBody = (() => {
try {
return JSON.stringify(result.body, null, 2);
} catch (e) {
return undefined;
}
})();
throw new ApiError(
options,
result,
`Generic Error: status: ${errorStatus}; status text: ${errorStatusText}; body: ${errorBody}`
);
}
throw new ApiError(options, result,
`Generic Error: status: ${errorStatus}; status text: ${errorStatusText}; body: ${errorBody}`
);
}
};
/**
@@ -306,36 +288,33 @@ export const catchErrorCodes = (options: ApiRequestOptions, result: ApiResult):
* @returns CancelablePromise<T>
* @throws ApiError
*/
export const request = <T>(
config: OpenAPIConfig,
options: ApiRequestOptions
): CancelablePromise<T> => {
return new CancelablePromise(async (resolve, reject, onCancel) => {
try {
const url = getUrl(config, options);
const formData = getFormData(options);
const body = getRequestBody(options);
const headers = await getHeaders(config, options);
export const request = <T>(config: OpenAPIConfig, options: ApiRequestOptions): CancelablePromise<T> => {
return new CancelablePromise(async (resolve, reject, onCancel) => {
try {
const url = getUrl(config, options);
const formData = getFormData(options);
const body = getRequestBody(options);
const headers = await getHeaders(config, options);
if (!onCancel.isCancelled) {
const response = await sendRequest(options, url, body, formData, headers, onCancel);
const responseBody = await getResponseBody(response);
const responseHeader = getResponseHeader(response, options.responseHeader);
if (!onCancel.isCancelled) {
const response = await sendRequest(options, url, body, formData, headers, onCancel);
const responseBody = await getResponseBody(response);
const responseHeader = getResponseHeader(response, options.responseHeader);
const result: ApiResult = {
url,
ok: response.ok,
status: response.status,
statusText: response.statusText,
body: responseHeader ?? responseBody,
};
const result: ApiResult = {
url,
ok: response.ok,
status: response.status,
statusText: response.statusText,
body: responseHeader ?? responseBody,
};
catchErrorCodes(options, result);
catchErrorCodes(options, result);
resolve(result.body);
}
} catch (error) {
reject(error);
}
});
resolve(result.body);
}
} catch (error) {
reject(error);
}
});
};

View File

@@ -82,7 +82,7 @@ export class DefaultService {
tags?: Record<string, string>;
},
): CancelablePromise<{
status: 'ok';
status: ('ok' | 'error');
}> {
return this.httpRequest.request({
method: 'POST',

View File

@@ -2,10 +2,13 @@ import dotenv from "dotenv";
import { expect, test } from "vitest";
import OpenAI from ".";
import {
ChatCompletion,
CompletionCreateParams,
CreateChatCompletionRequestMessage,
} from "openai-beta/resources/chat/completions";
import { OPClient } from "../codegen";
import mergeChunks from "./mergeChunks";
import assert from "assert";
dotenv.config({ path: "../.env" });
@@ -31,9 +34,7 @@ test("basic call", async () => {
};
const completion = await oaiClient.chat.completions.create({
...payload,
openpipe: {
tags: { promptId: "test" },
},
openpipe: { tags: { promptId: "test" } },
});
await completion.openpipe.reportingFinished;
const lastLogged = await lastLoggedCall();
@@ -46,29 +47,32 @@ const randomString = (length: number) => {
const characters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
return Array.from(
{ length },
() => characters[Math.floor(Math.random() * characters.length)]
() => characters[Math.floor(Math.random() * characters.length)],
).join("");
};
test.skip("streaming", async () => {
test("streaming", async () => {
const completion = await oaiClient.chat.completions.create({
model: "gpt-3.5-turbo",
messages: [{ role: "system", content: "count to 4" }],
messages: [{ role: "system", content: "count to 3" }],
stream: true,
});
let merged = null;
let merged: ChatCompletion | null = null;
for await (const chunk of completion) {
merged = merge_openai_chunks(merged, chunk);
merged = mergeChunks(merged, chunk);
}
const lastLogged = await lastLoggedCall();
expect(lastLogged?.modelResponse?.respPayload.choices[0].message.content).toBe(
merged.choices[0].message.content
);
await completion.openpipe.reportingFinished;
expect(merged).toMatchObject(lastLogged?.modelResponse?.respPayload);
expect(lastLogged?.modelResponse?.reqPayload.messages).toMatchObject([
{ role: "system", content: "count to 3" },
]);
});
test.skip("bad call streaming", async () => {
test("bad call streaming", async () => {
try {
await oaiClient.chat.completions.create({
model: "gpt-3.5-turbo-blaster",
@@ -76,26 +80,29 @@ test.skip("bad call streaming", async () => {
stream: true,
});
} catch (e) {
await e.openpipe.reportingFinished;
const lastLogged = await lastLoggedCall();
expect(lastLogged?.modelResponse?.errorMessage).toBe(
"The model `gpt-3.5-turbo-blaster` does not exist"
expect(lastLogged?.modelResponse?.errorMessage).toEqual(
"The model `gpt-3.5-turbo-blaster` does not exist",
);
expect(lastLogged?.modelResponse?.statusCode).toBe(404);
expect(lastLogged?.modelResponse?.statusCode).toEqual(404);
}
});
test("bad call", async () => {
try {
await oaiClient.chat.completions.create({
model: "gpt-3.5-turbo-booster",
model: "gpt-3.5-turbo-buster",
messages: [{ role: "system", content: "count to 10" }],
});
} catch (e) {
assert("openpipe" in e);
await e.openpipe.reportingFinished;
const lastLogged = await lastLoggedCall();
expect(lastLogged?.modelResponse?.errorMessage).toBe(
"The model `gpt-3.5-turbo-booster` does not exist"
expect(lastLogged?.modelResponse?.errorMessage).toEqual(
"The model `gpt-3.5-turbo-buster` does not exist",
);
expect(lastLogged?.modelResponse?.statusCode).toBe(404);
expect(lastLogged?.modelResponse?.statusCode).toEqual(404);
}
});
@@ -109,12 +116,12 @@ test("caching", async () => {
messages: [message],
openpipe: { cache: true },
});
expect(completion.openpipe.cacheStatus).toBe("MISS");
expect(completion.openpipe.cacheStatus).toEqual("MISS");
await completion.openpipe.reportingFinished;
const firstLogged = await lastLoggedCall();
expect(completion.choices[0].message.content).toBe(
firstLogged?.modelResponse?.respPayload.choices[0].message.content
expect(completion.choices[0].message.content).toEqual(
firstLogged?.modelResponse?.respPayload.choices[0].message.content,
);
const completion2 = await oaiClient.chat.completions.create({
@@ -122,5 +129,5 @@ test("caching", async () => {
messages: [message],
openpipe: { cache: true },
});
expect(completion2.openpipe.cacheStatus).toBe("HIT");
expect(completion2.openpipe.cacheStatus).toEqual("HIT");
});

View File

@@ -5,9 +5,9 @@ import {
ChatCompletion,
ChatCompletionChunk,
CompletionCreateParams,
Completions,
} from "openai-beta/resources/chat/completions";
import { WrappedStream } from "./streaming";
import { DefaultService, OPClient } from "../codegen";
import { Stream } from "openai-beta/streaming";
import { OpenPipeArgs, OpenPipeMeta, type OpenPipeConfig, getTags } from "../shared";
@@ -27,11 +27,11 @@ export default class OpenAI extends openai.OpenAI {
BASE:
openpipe?.baseUrl ?? readEnv("OPENPIPE_BASE_URL") ?? "https://app.openpipe.ai/api/v1",
TOKEN: openPipeApiKey,
})
}),
);
} else {
console.warn(
"You're using the OpenPipe client without an API key. No completion requests will be logged."
"You're using the OpenPipe client without an API key. No completion requests will be logged.",
);
}
}
@@ -43,10 +43,10 @@ class WrappedChat extends openai.OpenAI.Chat {
this.completions.opClient = client;
}
completions: InstrumentedCompletions = new InstrumentedCompletions(this.client);
completions: WrappedCompletions = new WrappedCompletions(this.client);
}
class InstrumentedCompletions extends openai.OpenAI.Chat.Completions {
class WrappedCompletions extends openai.OpenAI.Chat.Completions {
opClient?: OPClient;
constructor(client: openai.OpenAI, opClient?: OPClient) {
@@ -54,32 +54,35 @@ class InstrumentedCompletions extends openai.OpenAI.Chat.Completions {
this.opClient = opClient;
}
_report(args: Parameters<DefaultService["report"]>[0]) {
async _report(args: Parameters<DefaultService["report"]>[0]) {
try {
return this.opClient ? this.opClient.default.report(args) : Promise.resolve();
this.opClient ? await this.opClient.default.report(args) : Promise.resolve();
} catch (e) {
console.error(e);
return Promise.resolve();
}
}
create(
body: CompletionCreateParams.CreateChatCompletionRequestNonStreaming & OpenPipeArgs,
options?: Core.RequestOptions
options?: Core.RequestOptions,
): Promise<Core.APIResponse<ChatCompletion & { openpipe: OpenPipeMeta }>>;
create(
body: CompletionCreateParams.CreateChatCompletionRequestStreaming & OpenPipeArgs,
options?: Core.RequestOptions
): Promise<Core.APIResponse<Stream<ChatCompletionChunk>>>;
options?: Core.RequestOptions,
): Promise<Core.APIResponse<WrappedStream>>;
async create(
{ openpipe, ...body }: CompletionCreateParams & OpenPipeArgs,
options?: Core.RequestOptions
): Promise<
Core.APIResponse<(ChatCompletion & { openpipe: OpenPipeMeta }) | Stream<ChatCompletionChunk>>
> {
console.log("LALALA REPORT", this.opClient);
options?: Core.RequestOptions,
): Promise<Core.APIResponse<(ChatCompletion & { openpipe: OpenPipeMeta }) | WrappedStream>> {
const requestedAt = Date.now();
const cacheRequested = openpipe?.cache ?? false;
let reportingFinished: OpenPipeMeta["reportingFinished"] = Promise.resolve();
let cacheRequested = openpipe?.cache ?? false;
if (cacheRequested && body.stream) {
console.warn(
`Caching is not yet supported for streaming requests. Ignoring cache flag. Vote for this feature at https://github.com/OpenPipe/OpenPipe/issues/159`,
);
cacheRequested = false;
}
if (cacheRequested) {
try {
@@ -92,12 +95,13 @@ class InstrumentedCompletions extends openai.OpenAI.Chat.Completions {
.then((res) => res.respPayload);
if (cached) {
const meta = {
cacheStatus: "HIT",
reportingFinished,
};
return {
...cached,
openpipe: {
cacheStatus: "HIT",
reportingFinished: Promise.resolve(),
},
openpipe: meta,
};
}
} catch (e) {
@@ -105,15 +109,23 @@ class InstrumentedCompletions extends openai.OpenAI.Chat.Completions {
}
}
let reportingFinished: OpenPipeMeta["reportingFinished"] = Promise.resolve();
try {
if (body.stream) {
const stream = await super.create(body, options);
const wrappedStream = new WrappedStream(stream, (response) =>
this._report({
requestedAt,
receivedAt: Date.now(),
reqPayload: body,
respPayload: response,
statusCode: 200,
tags: getTags(openpipe),
}),
);
// Do some logging of each chunk here
return stream;
return wrappedStream;
} else {
const response = await super.create(body, options);
@@ -147,6 +159,16 @@ class InstrumentedCompletions extends openai.OpenAI.Chat.Completions {
tags: getTags(openpipe),
});
}
// make sure error is an object we can add properties to
if (typeof error === "object" && error !== null) {
error = {
...error,
openpipe: {
cacheStatus: cacheRequested ? "MISS" : "SKIP",
reportingFinished,
},
};
}
throw error;
}

View File

@@ -0,0 +1,43 @@
import { ChatCompletion, ChatCompletionChunk } from "openai-beta/resources/chat";
import { Stream } from "openai-beta/streaming";
import { OpenPipeMeta } from "../shared";
import mergeChunks from "./mergeChunks";
export class WrappedStream extends Stream<ChatCompletionChunk> {
openpipe: OpenPipeMeta;
private resolveReportingFinished: () => void = () => {};
private report: (response: unknown) => Promise<void>;
constructor(stream: Stream<ChatCompletionChunk>, report: (response: unknown) => Promise<void>) {
super(stream.response, stream.controller);
this.report = report;
const reportingFinished = new Promise<void>((resolve) => {
this.resolveReportingFinished = resolve;
});
this.openpipe = {
cacheStatus: "MISS",
reportingFinished,
};
}
async *[Symbol.asyncIterator](): AsyncIterator<ChatCompletionChunk, any, undefined> {
const iterator = super[Symbol.asyncIterator]();
let combinedResponse: ChatCompletion | null = null;
while (true) {
const result = await iterator.next();
if (result.done) break;
combinedResponse = mergeChunks(combinedResponse, result.value);
yield result.value;
}
await this.report(combinedResponse);
// Resolve the promise here
this.resolveReportingFinished();
}
}
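A consumption sketch based on the streaming test above (oaiClient, mergeChunks, and ChatCompletion are the names that file already imports): iterating the wrapped stream is what drives the merged-chunk report, and reportingFinished resolves once that report lands.

    const completion = await oaiClient.chat.completions.create({
      model: "gpt-3.5-turbo",
      messages: [{ role: "system", content: "count to 3" }],
      stream: true, // returns a WrappedStream
    });

    let merged: ChatCompletion | null = null;
    for await (const chunk of completion) {
      merged = mergeChunks(merged, chunk); // accumulate chunks into one response
    }

    // Reporting starts once the iterator is exhausted; wait for it before asserting.
    await completion.openpipe.reportingFinished;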

View File

@@ -1,4 +1,5 @@
import pkg from "../package.json";
import { DefaultService } from "./codegen";
export type OpenPipeConfig = {
apiKey?: string;
@@ -15,9 +16,11 @@ export type OpenPipeMeta = {
// We report your call to OpenPipe asynchronously in the background. If you
// need to wait until the report is sent to take further action, you can await
// this promise.
reportingFinished: Promise<void | { status: "ok" }>;
reportingFinished: Promise<void>;
};
export type ReportFn = (...args: Parameters<DefaultService["report"]>) => Promise<void>;
export const getTags = (args: OpenPipeArgs["openpipe"]): Record<string, string> => ({
...args?.tags,
...(args?.cache ? { $cache: args.cache?.toString() } : {}),

pnpm-lock.yaml generated
View File

@@ -166,7 +166,7 @@ importers:
version: 6.9.4
openai:
specifier: 4.0.0-beta.7
version: 4.0.0-beta.7
version: 4.0.0-beta.7(encoding@0.1.13)
openpipe:
specifier: workspace:*
version: link:../client-libs/typescript
@@ -357,6 +357,9 @@ importers:
client-libs/typescript:
dependencies:
encoding:
specifier: ^0.1.13
version: 0.1.13
form-data:
specifier: ^4.0.0
version: 4.0.0
@@ -364,11 +367,11 @@ importers:
specifier: ^4.17.21
version: 4.17.21
node-fetch:
specifier: ^3.3.2
version: 3.3.2
specifier: ^2.6.12
version: 2.6.12(encoding@0.1.13)
openai-beta:
specifier: npm:openai@4.0.0-beta.7
version: /openai@4.0.0-beta.7
version: /openai@4.0.0-beta.7(encoding@0.1.13)
openai-legacy:
specifier: npm:openai@3.3.0
version: /openai@3.3.0
@@ -379,6 +382,9 @@ importers:
'@types/node':
specifier: ^20.4.8
version: 20.4.8
'@types/node-fetch':
specifier: ^2.6.4
version: 2.6.4
dotenv:
specifier: ^16.3.1
version: 16.3.1
@@ -416,7 +422,7 @@ packages:
digest-fetch: 1.3.0
form-data-encoder: 1.7.2
formdata-node: 4.4.1
node-fetch: 2.6.12
node-fetch: 2.6.12(encoding@0.1.13)
transitivePeerDependencies:
- encoding
dev: false
@@ -2690,7 +2696,7 @@ packages:
dependencies:
https-proxy-agent: 5.0.1
mkdirp: 0.5.6
node-fetch: 2.6.12
node-fetch: 2.6.12(encoding@0.1.13)
progress: 2.0.3
proxy-from-env: 1.1.0
which: 2.0.2
@@ -3180,7 +3186,6 @@ packages:
dependencies:
'@types/node': 20.4.10
form-data: 3.0.1
dev: false
/@types/node@18.16.0:
resolution: {integrity: sha512-BsAaKhB+7X+H4GnSjGhJG9Qi8Tw+inU9nJDwmD5CgOmBLEI6ArdhikpLX7DjbjDRDTbqZzU2LSQNZg8WGPiSZQ==}
@@ -3831,7 +3836,6 @@ packages:
/asynckit@0.4.0:
resolution: {integrity: sha512-Oei9OH4tRh0YqU3GxhX79dM/mwVgvbZJaSNaRk+bshkj0S5cfHcgYakreBjrHwatXKbz+IoIdYLxrKim2MjW0Q==}
dev: false
/available-typed-arrays@1.0.5:
resolution: {integrity: sha512-DMD0KiN46eipeziST1LPP/STfDU0sufISXmjSgvVsoU2tqxctQeASejWcfNtxYKqETM1UxQ8sp2OrSBWpHY6sw==}
@@ -4222,7 +4226,6 @@ packages:
engines: {node: '>= 0.8'}
dependencies:
delayed-stream: 1.0.0
dev: false
/comma-separated-tokens@1.0.8:
resolution: {integrity: sha512-GHuDRO12Sypu2cV70d1dkA2EUmXHgntrzbpvOB+Qy+49ypNfGgFQIC2fhhXbnyrJRynDCAARsT7Ou0M6hirpfw==}
@@ -4507,11 +4510,6 @@ packages:
assert-plus: 1.0.0
dev: false
/data-uri-to-buffer@4.0.1:
resolution: {integrity: sha512-0R9ikRb668HB7QDxT1vkpuUBtqc53YyAwMwGeUFKRojY/NWKvdZ+9UYtRfGmhqNbRkTSVpMbmyhXipFFv2cb/A==}
engines: {node: '>= 12'}
dev: false
/date-fns@2.30.0:
resolution: {integrity: sha512-fnULvOpxnC5/Vg3NCiWelDsLiUc9bRwAPs/+LfTLNvetFCtCTN+yQz15C/fs4AwX1R9K5GLtLfn8QW+dWisaAw==}
engines: {node: '>=0.11'}
@@ -4595,7 +4593,6 @@ packages:
/delayed-stream@1.0.0:
resolution: {integrity: sha512-ZySD7Nf91aLB0RxL4KGrKHBXl7Eds1DAmEdcoVawXnLD7SDhpNgtuII2aAkg7a7QS41jxPSZ17p4VdGnMHk3MQ==}
engines: {node: '>=0.4.0'}
dev: false
/depd@1.1.2:
resolution: {integrity: sha512-7emPTl6Dpo6JRXOXjLRxck+FlLRX5847cLKEn00PLAgc3g2hTZZgr+e4c2v6QpSmLeFP3n5yUo7ft6avBK/5jQ==}
@@ -4729,6 +4726,12 @@ packages:
engines: {node: '>= 0.8'}
dev: false
/encoding@0.1.13:
resolution: {integrity: sha512-ETBauow1T35Y/WZMkio9jiM0Z5xjHHmJ4XmjZOq1l/dXz3lr2sRn87nJy20RupqSh1F2m3HHPSp8ShIPQJrJ3A==}
dependencies:
iconv-lite: 0.6.3
dev: false
/engine.io-client@6.5.2:
resolution: {integrity: sha512-CQZqbrpEYnrpGqC07a9dJDz4gePZUgTPMU3NKJPSeQOyw27Tst4Pl3FemKoFGAlHzgZmKjoRmiJvbWfhCXUlIg==}
dependencies:
@@ -5399,14 +5402,6 @@ packages:
format: 0.2.2
dev: false
/fetch-blob@3.2.0:
resolution: {integrity: sha512-7yAQpD2UMJzLi1Dqv7qFYnPbaPx7ZfFK6PiIxQ4PfkGPyNyl2Ugx+a/umUonmKqjhM4DnfbMvdX6otXq83soQQ==}
engines: {node: ^12.20 || >= 14.13}
dependencies:
node-domexception: 1.0.0
web-streams-polyfill: 3.2.1
dev: false
/fflate@0.4.8:
resolution: {integrity: sha512-FJqqoDBR00Mdj9ppamLa/Y7vxm+PRmNWA67N846RvsoYVMKB4q3y/de5PA7gUmRMYK/8CMz2GDZQmCRN1wBcWA==}
dev: false
@@ -5522,7 +5517,6 @@ packages:
asynckit: 0.4.0
combined-stream: 1.0.8
mime-types: 2.1.35
dev: false
/form-data@4.0.0:
resolution: {integrity: sha512-ETEklSGi5t0QMZuiXoA/Q6vcnxcLQP5vdugSpuAyi6SVGi2clPPp+xgEhuMaHC+zGgn31Kd235W35f7Hykkaww==}
@@ -5546,13 +5540,6 @@ packages:
web-streams-polyfill: 4.0.0-beta.3
dev: false
/formdata-polyfill@4.0.10:
resolution: {integrity: sha512-buewHzMvYL29jdeQTVILecSaZKnt/RJWjoZCF5OW60Z67/GmSLBkOFM7qh1PI3zFNtJbaZL5eQu1vLfazOwj4g==}
engines: {node: '>=12.20.0'}
dependencies:
fetch-blob: 3.2.0
dev: false
/forwarded@0.2.0:
resolution: {integrity: sha512-buRG0fpBtRHSTCOASe6hD258tEubFoRLb4ZNA6NxMVHNw2gOcwHo9wyablzMzOA5z9xA9L1KNjk/Nt6MT9aYow==}
engines: {node: '>= 0.6'}
@@ -5968,6 +5955,13 @@ packages:
safer-buffer: 2.1.2
dev: false
/iconv-lite@0.6.3:
resolution: {integrity: sha512-4fCk79wshMdzMp2rH06qWrJE4iolqLhCUH+OiuIgU++RB0+94NlDL81atO7GX55uUKueo0txHNtvEyI6D7WdMw==}
engines: {node: '>=0.10.0'}
dependencies:
safer-buffer: 2.1.2
dev: false
/ignore@5.2.4:
resolution: {integrity: sha512-MAb38BcSbH0eHNBxn7ql2NH/kX33OkB3lZ1BNdh7ENeRChHTYsTvWrMubiIAMNS2llXEEgZ1MUOBtXChP3kaFQ==}
engines: {node: '>= 4'}
@@ -6259,7 +6253,7 @@ packages:
resolution: {integrity: sha512-7vuh85V5cdDofPyxn58nrPjBktZo0u9x1g8WtjQol+jZDaE+fhN+cIvTj11GndBnMnyfrUOG1sZQxCdjKh+DKg==}
engines: {node: '>= 10.13.0'}
dependencies:
'@types/node': 18.16.0
'@types/node': 20.4.10
merge-stream: 2.0.0
supports-color: 8.1.1
@@ -6859,7 +6853,7 @@ packages:
engines: {node: '>=10.5.0'}
dev: false
/node-fetch@2.6.12:
/node-fetch@2.6.12(encoding@0.1.13):
resolution: {integrity: sha512-C/fGU2E8ToujUivIO0H+tpQ6HWo4eEmchoPIoXtxCrVghxdKq+QOHqEZW7tuP3KlV3bC8FRMO5nMCC7Zm1VP6g==}
engines: {node: 4.x || >=6.0.0}
peerDependencies:
@@ -6868,18 +6862,10 @@ packages:
encoding:
optional: true
dependencies:
encoding: 0.1.13
whatwg-url: 5.0.0
dev: false
/node-fetch@3.3.2:
resolution: {integrity: sha512-dRB78srN/l6gqWulah9SrxeYnxeddIG30+GOqK/9OlLVyLg3HPnr6SqOWTWOXKRwC2eGYCkZ59NNuSgvSrpgOA==}
engines: {node: ^12.20.0 || ^14.13.1 || >=16.0.0}
dependencies:
data-uri-to-buffer: 4.0.1
fetch-blob: 3.2.0
formdata-polyfill: 4.0.10
dev: false
/node-mocks-http@1.12.2:
resolution: {integrity: sha512-xhWwC0dh35R9rf0j3bRZXuISXdHxxtMx0ywZQBwjrg3yl7KpRETzogfeCamUIjltpn0Fxvs/ZhGJul1vPLrdJQ==}
engines: {node: '>=0.6'}
@@ -7027,7 +7013,7 @@ packages:
- debug
dev: false
/openai@4.0.0-beta.7:
/openai@4.0.0-beta.7(encoding@0.1.13):
resolution: {integrity: sha512-jHjwvpMuGkNxiQ3erwLZsOvPEhcVrMtwtfNeYmGCjhbdB+oStVw/7pIhIPkualu8rlhLwgMR7awknIaN3IQcOA==}
dependencies:
'@types/node': 18.16.0
@@ -7037,7 +7023,7 @@ packages:
digest-fetch: 1.3.0
form-data-encoder: 1.7.2
formdata-node: 4.4.1
node-fetch: 2.6.12
node-fetch: 2.6.12(encoding@0.1.13)
transitivePeerDependencies:
- encoding
dev: false
@@ -9137,11 +9123,6 @@ packages:
glob-to-regexp: 0.4.1
graceful-fs: 4.2.11
/web-streams-polyfill@3.2.1:
resolution: {integrity: sha512-e0MO3wdXWKrLbL0DgGnUV7WHVuw9OUvL4hjgnPkIeEvESk74gAITi5G606JtZPp39cd8HA9VQzCIvA49LpPN5Q==}
engines: {node: '>= 8'}
dev: false
/web-streams-polyfill@4.0.0-beta.3:
resolution: {integrity: sha512-QW95TCTaHmsYfHDybGMwO5IJIM93I/6vTRk+daHTWFPhwh+C8Cg7j7XyKrwrj8Ib6vYXe0ocYNrmzY4xAAN6ug==}
engines: {node: '>= 14'}

View File

@@ -2,27 +2,23 @@ databases:
- name: querykey-prod
databaseName: querykey_prod
user: querykey
plan: starter
plan: standard
services:
- type: web
name: querykey-prod-web
env: docker
runtime: docker
dockerfilePath: ./app/Dockerfile
dockerContext: .
plan: standard
plan: pro
domains:
- app.openpipe.ai
envVars:
- key: NODE_ENV
value: production
- key: DATABASE_URL
fromDatabase:
name: querykey-prod
property: connectionString
- fromGroup: querykey-prod
- key: NEXT_PUBLIC_SOCKET_URL
value: https://querykey-prod-wss.onrender.com
# Render support says we need to manually set this because otherwise
# sometimes it checks a different random port that NextJS opens for
# liveness and the liveness check fails.
@@ -31,8 +27,22 @@ services:
- type: web
name: querykey-prod-wss
env: docker
runtime: docker
dockerfilePath: ./app/Dockerfile
dockerContext: .
plan: free
dockerCommand: pnpm tsx src/wss-server.ts
- type: worker
name: querykey-prod-worker
runtime: docker
dockerfilePath: ./app/Dockerfile
dockerContext: .
plan: pro
dockerCommand: /code/app/scripts/run-workers-prod.sh
envVars:
- key: DATABASE_URL
fromDatabase:
name: querykey-prod
property: connectionString
- fromGroup: querykey-prod