Files
OpenPipe-llm/client-libs/python/openpipe/openai.py
Kyle Corbitt 8ed47eb4dd Add a python client library
We still don't have any documentation and things are in flux, but you can report your OpenAI API calls to OpenPipe now.
2023-08-11 16:54:50 -07:00

165 lines
5.3 KiB
Python

import openai as original_openai
import time
import inspect
from openpipe.merge_openai_chunks import merge_streamed_chunks
from .shared import report_async, report
class ChatCompletionWrapper:
def __getattr__(self, name):
return getattr(original_openai.ChatCompletion, name)
def __setattr__(self, name, value):
return setattr(original_openai.ChatCompletion, name, value)
@classmethod
def create(cls, *args, **kwargs):
openpipe_options = kwargs.pop("openpipe", {})
requested_at = int(time.time() * 1000)
try:
chat_completion = original_openai.ChatCompletion.create(*args, **kwargs)
if inspect.isgenerator(chat_completion):
def _gen():
assembled_completion = None
for chunk in chat_completion:
assembled_completion = merge_streamed_chunks(
assembled_completion, chunk
)
yield chunk
received_at = int(time.time() * 1000)
report(
openpipe_options=openpipe_options,
requested_at=requested_at,
received_at=received_at,
req_payload=kwargs,
resp_payload=assembled_completion,
status_code=200,
)
return _gen()
else:
received_at = int(time.time() * 1000)
report(
openpipe_options=openpipe_options,
requested_at=requested_at,
received_at=received_at,
req_payload=kwargs,
resp_payload=chat_completion,
status_code=200,
)
return chat_completion
except Exception as e:
received_at = int(time.time() * 1000)
if isinstance(e, original_openai.OpenAIError):
report(
openpipe_options=openpipe_options,
requested_at=requested_at,
received_at=received_at,
req_payload=kwargs,
resp_payload=e.json_body,
error_message=str(e),
status_code=e.http_status,
)
else:
report(
openpipe_options=openpipe_options,
requested_at=requested_at,
received_at=received_at,
req_payload=kwargs,
error_message=str(e),
)
raise e
@classmethod
async def acreate(cls, *args, **kwargs):
openpipe_options = kwargs.pop("openpipe", {})
requested_at = int(time.time() * 1000)
try:
chat_completion = original_openai.ChatCompletion.acreate(*args, **kwargs)
if inspect.isgenerator(chat_completion):
def _gen():
assembled_completion = None
for chunk in chat_completion:
assembled_completion = merge_streamed_chunks(
assembled_completion, chunk
)
yield chunk
received_at = int(time.time() * 1000)
report_async(
openpipe_options=openpipe_options,
requested_at=requested_at,
received_at=received_at,
req_payload=kwargs,
resp_payload=assembled_completion,
status_code=200,
)
return _gen()
else:
received_at = int(time.time() * 1000)
report_async(
openpipe_options=openpipe_options,
requested_at=requested_at,
received_at=received_at,
req_payload=kwargs,
resp_payload=chat_completion,
status_code=200,
)
return chat_completion
except Exception as e:
received_at = int(time.time() * 1000)
if isinstance(e, original_openai.OpenAIError):
report_async(
openpipe_options=openpipe_options,
requested_at=requested_at,
received_at=received_at,
req_payload=kwargs,
resp_payload=e.json_body,
error_message=str(e),
status_code=e.http_status,
)
else:
report_async(
openpipe_options=openpipe_options,
requested_at=requested_at,
received_at=received_at,
req_payload=kwargs,
error_message=str(e),
)
raise e
class OpenAIWrapper:
ChatCompletion = ChatCompletionWrapper()
def __getattr__(self, name):
return getattr(original_openai, name)
def __setattr__(self, name, value):
return setattr(original_openai, name, value)
def __dir__(self):
return dir(original_openai) + ["openpipe_base_url", "openpipe_api_key"]