import inspect
import time

import openai as original_openai
from openai.openai_object import OpenAIObject

from openpipe.merge_openai_chunks import merge_openai_chunks
from openpipe.openpipe_meta import openpipe_meta

from .shared import (
    _should_check_cache,
    maybe_check_cache,
    maybe_check_cache_async,
    report_async,
    report,
)

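
# openai.ChatCompletion subclass that adds OpenPipe behavior around each call:
# an optional cache lookup before the request, and reporting of the request
# payload, response, timing, and any error afterwards.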
class WrappedChatCompletion(original_openai.ChatCompletion):
    @classmethod
    def create(cls, *args, **kwargs):
        # "openpipe" is wrapper-only configuration; pop it so it is never
        # forwarded to the OpenAI API.
        openpipe_options = kwargs.pop("openpipe", {})

        cached_response = maybe_check_cache(
            openpipe_options=openpipe_options, req_payload=kwargs
        )
        if cached_response:
            return OpenAIObject.construct_from(cached_response, api_key=None)

        # Timestamps are reported in milliseconds since the epoch.
        requested_at = int(time.time() * 1000)

        try:
            chat_completion = original_openai.ChatCompletion.create(*args, **kwargs)

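            # With stream=True the SDK returns a generator of chunks; wrap it
            # so chunks pass through to the caller unchanged while a full
            # completion is assembled for reporting once the stream ends.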
            if inspect.isgenerator(chat_completion):

                def _gen():
                    assembled_completion = None
                    for chunk in chat_completion:
                        # Fold each streamed chunk into a single completion.
                        assembled_completion = merge_openai_chunks(
                            assembled_completion, chunk
                        )

                        cache_status = (
                            "MISS"
                            if _should_check_cache(openpipe_options, kwargs)
                            else "SKIP"
                        )
                        chunk.openpipe = openpipe_meta(cache_status=cache_status)

                        yield chunk

                    # The stream is exhausted; report the assembled completion.
                    received_at = int(time.time() * 1000)

                    report(
                        openpipe_options=openpipe_options,
                        requested_at=requested_at,
                        received_at=received_at,
                        req_payload=kwargs,
                        resp_payload=assembled_completion,
                        status_code=200,
                    )

                return _gen()
            else:
                received_at = int(time.time() * 1000)

                report(
                    openpipe_options=openpipe_options,
                    requested_at=requested_at,
                    received_at=received_at,
                    req_payload=kwargs,
                    resp_payload=chat_completion,
                    status_code=200,
                )

                # "MISS" means caching was requested but no entry existed yet
                # (a hit would have returned above); "SKIP" means caching was
                # not enabled for this call.
                cache_status = (
                    "MISS" if _should_check_cache(openpipe_options, kwargs) else "SKIP"
                )
                chat_completion["openpipe"] = openpipe_meta(cache_status=cache_status)
                return chat_completion
        except Exception as e:
            received_at = int(time.time() * 1000)

            # OpenAI errors carry a response body and HTTP status worth
            # reporting; other exceptions are reported with the message only.
            if isinstance(e, original_openai.OpenAIError):
                report(
                    openpipe_options=openpipe_options,
                    requested_at=requested_at,
                    received_at=received_at,
                    req_payload=kwargs,
                    resp_payload=e.json_body,
                    error_message=str(e),
                    status_code=e.http_status,
                )
            else:
                report(
                    openpipe_options=openpipe_options,
                    requested_at=requested_at,
                    received_at=received_at,
                    req_payload=kwargs,
                    error_message=str(e),
                )

            raise e

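    # Async counterpart of create(): the same cache-check / report flow, using
    # the async helpers and openai's acreate().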
    @classmethod
    async def acreate(cls, *args, **kwargs):
        openpipe_options = kwargs.pop("openpipe", {})

        cached_response = await maybe_check_cache_async(
            openpipe_options=openpipe_options, req_payload=kwargs
        )
        if cached_response:
            return OpenAIObject.construct_from(cached_response, api_key=None)

        requested_at = int(time.time() * 1000)

        try:
            chat_completion = await original_openai.ChatCompletion.acreate(
                *args, **kwargs
            )

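            # With stream=True the async SDK returns an async generator; wrap
            # it the same way as the synchronous path above.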
            if inspect.isasyncgen(chat_completion):

                async def _gen():
                    assembled_completion = None
                    async for chunk in chat_completion:
                        assembled_completion = merge_openai_chunks(
                            assembled_completion, chunk
                        )
                        cache_status = (
                            "MISS"
                            if _should_check_cache(openpipe_options, kwargs)
                            else "SKIP"
                        )
                        chunk.openpipe = openpipe_meta(cache_status=cache_status)

                        yield chunk

                    received_at = int(time.time() * 1000)

                    await report_async(
                        openpipe_options=openpipe_options,
                        requested_at=requested_at,
                        received_at=received_at,
                        req_payload=kwargs,
                        resp_payload=assembled_completion,
                        status_code=200,
                    )

                return _gen()
            else:
                received_at = int(time.time() * 1000)

                await report_async(
                    openpipe_options=openpipe_options,
                    requested_at=requested_at,
                    received_at=received_at,
                    req_payload=kwargs,
                    resp_payload=chat_completion,
                    status_code=200,
                )

                cache_status = (
                    "MISS" if _should_check_cache(openpipe_options, kwargs) else "SKIP"
                )
                chat_completion["openpipe"] = openpipe_meta(cache_status=cache_status)

                return chat_completion
        except Exception as e:
            received_at = int(time.time() * 1000)

            if isinstance(e, original_openai.OpenAIError):
                await report_async(
                    openpipe_options=openpipe_options,
                    requested_at=requested_at,
                    received_at=received_at,
                    req_payload=kwargs,
                    resp_payload=e.json_body,
                    error_message=str(e),
                    status_code=e.http_status,
                )
            else:
                await report_async(
                    openpipe_options=openpipe_options,
                    requested_at=requested_at,
                    received_at=received_at,
                    req_payload=kwargs,
                    error_message=str(e),
                )

            raise e

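
# Drop-in module proxy: an OpenAIWrapper instance can be used wherever the
# openai module was used. ChatCompletion is replaced with the wrapped version;
# every other attribute read or write (api_key, Completion, ...) is forwarded
# to the real openai module.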
class OpenAIWrapper:
    ChatCompletion = WrappedChatCompletion()

    def __getattr__(self, name):
        return getattr(original_openai, name)

    def __setattr__(self, name, value):
        return setattr(original_openai, name, value)
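
# Example usage (a minimal sketch, not part of the module; assumes the package
# exposes an OpenAIWrapper instance as `openpipe.openai`, and the option names
# inside the "openpipe" dict are illustrative):
#
#     from openpipe import openai
#
#     completion = openai.ChatCompletion.create(
#         model="gpt-3.5-turbo",
#         messages=[{"role": "user", "content": "Count to 10"}],
#         openpipe={"tags": {"prompt_id": "demo"}, "cache": True},
#     )
#     # The wrapper attaches cache_status ("MISS" when caching is enabled,
#     # "SKIP" when it is not) to the response and to each streamed chunk.
#     print(completion["openpipe"])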