Source code for penguin.llm
import os
import openai
from typing import List, Optional
GPT_MODEL = "gpt-4o"
KNOWLEDGE_DIR = '/docs/llm_knowledge_base'
openai.api_key = os.getenv("OPENAI_API_KEY")
PROMPTS = {
"config_graph": """Here is a configuration graph. Choose the best, unexplored config to run next. Simply return the UID string and nothing else. If no UID is present in the graph, return 'None'"""
}
[docs]
class AssistantManager:
"""
A class to manage OpenAI assistants, threads, and vector stores.
"""
def __init__(self):
"""
Initialize the AssistantManager with empty client, assistant, thread, and vector_store.
"""
self.client: Optional[openai.OpenAI] = None
self.assistant: Optional[openai.types.Assistant] = None
self.thread: Optional[openai.types.Thread] = None
self.vector_store: Optional[openai.types.VectorStore] = None
[docs]
def exists_client(self) -> bool:
"""
Check if the OpenAI client exists.
Returns:
bool: True if the client exists, False otherwise.
"""
return self.client is not None
[docs]
def exists_assistant(self) -> bool:
"""
Check if the assistant exists.
Returns:
bool: True if the assistant exists, False otherwise.
"""
return self.assistant is not None
[docs]
def create_assistant(self, name: str, instructions: str, tools: Optional[List[dict]] = None, model: str = GPT_MODEL):
"""
Create an assistant if it doesn't already exist.
Args:
name (str): The name of the assistant.
instructions (str): Instructions for the assistant.
tools (Optional[List[dict]]): List of tools for the assistant.
model (str): The model to use for the assistant.
"""
if self.exists_assistant():
return
self.assistant = self.client.beta.assistants.create(
name=name,
instructions=instructions,
tools=tools,
model=model,
)
[docs]
def create_run(self, prompt: str) -> str:
"""
Create a new thread, add a message, and start a run.
Args:
prompt (str): The prompt to send to the assistant.
Returns:
str: The ID of the created run.
"""
self.thread = self.client.beta.threads.create()
self.client.beta.threads.messages.create(thread_id=self.thread.id, role="user", content=prompt)
run = self.client.beta.threads.runs.create_and_poll(
thread_id=self.thread.id,
assistant_id=self.assistant.id,
)
len_msgs = len(list(self.client.beta.threads.messages.list(self.thread.id)))
msg = self.client.beta.threads.messages.list(self.thread.id, run_id=run.id).data[0].content[0].text.value
print(f'===== MESSAGES [{len_msgs}] =====\n{msg}\n')
return run.id
[docs]
def upload_knowledge_files(self):
"""
Upload knowledge files to the vector store and update the assistant.
"""
print('===== Knowledge Files =====')
file_paths = [os.path.join(self.KNOWLEDGE_DIR, fn) for fn in os.listdir(self.KNOWLEDGE_DIR)]
for fp in file_paths:
print(f'[FILE] {fp}')
file_streams = [open(path, 'rb') for path in file_paths]
self.vector_store = self.client.beta.vector_stores.create(name="Penguin Tool Documentation")
file_batch = self.client.beta.vector_stores.file_batches.upload_and_poll(
vector_store_id=self.vector_store.id,
files=file_streams
)
print(f'\tUPLOAD STATUS: {file_batch.status} => {file_batch.file_counts}\n')
# XXX API bug, status shows that upload failed, but openai playground displays files correctly
self.assistant = self.client.beta.assistants.update(
assistant_id=self.assistant.id,
tool_resources={"file_search": {"vector_store_ids": [self.vector_store.id]}},
)
[docs]
def select_best_config(self, graph: str, prompt: str) -> str:
"""
Select the best configuration based on the given graph and prompt.
Args:
graph (str): The graph to use for configuration selection.
prompt (str): The prompt to send to the assistant.
Returns:
str: The selected configuration as a string.
"""
if not self.exists_client():
self.client = openai.OpenAI()
if not self.exists_assistant():
llm_name = "llm_rehoster"
instructions = ""
self.create_assistant(
name=llm_name,
instructions=instructions,
tools=[{"type": "file_search"}]
)
self.upload_knowledge_files()
full_prompt = f'{prompt}\n\n{graph}'
print(f'===== PROMPT =====\n{full_prompt}\n')
run_id = self.create_run(full_prompt)
uid_str = self.client.beta.threads.messages.list(self.thread.id, run_id=run_id).data[0].content[0].text.value
return uid_str