229 changes: 153 additions & 76 deletions interpreter/core/async_core.py
@@ -103,65 +103,103 @@ async def output(self):
         return await self.output_queue.async_q.get()
 
     def respond(self, run_code=None):
-        try:
-            if run_code == None:
-                run_code = self.auto_run
-
-            for chunk_og in self._respond_and_store():
-                chunk = (
-                    chunk_og.copy()
-                )  # This fixes weird double token chunks. Probably a deeper problem?
-
-                if chunk["type"] == "confirmation":
-                    if run_code:
-                        run_code = False
-                        continue
-                    else:
-                        break
-
-                if self.stop_event.is_set():
-                    return
+        for attempt in range(5):  # 5 attempts
+            try:
+                if run_code == None:
+                    run_code = self.auto_run
 
-                if self.print:
-                    if "start" in chunk:
-                        print("\n")
-                    if chunk["type"] in ["code", "console"] and "format" in chunk:
-                        if "start" in chunk:
-                            print("\n------------\n\n```" + chunk["format"], flush=True)
-                        if "end" in chunk:
-                            print("\n```\n\n------------\n\n", flush=True)
-                    if chunk.get("format") != "active_line":
-                        if "format" in chunk and "base64" in chunk["format"]:
-                            print("\n[An image was produced]")
-                        else:
-                            content = chunk.get("content", "")
-                            content = (
-                                str(content).encode("ascii", "ignore").decode("ascii")
-                            )
-                            print(content, end="", flush=True)
+                sent_chunks = False
 
-                if self.debug:
-                    print("Interpreter produced this chunk:", chunk)
+                for chunk_og in self._respond_and_store():
+                    chunk = (
+                        chunk_og.copy()
+                    )  # This fixes weird double token chunks. Probably a deeper problem?
 
-                self.output_queue.sync_q.put(chunk)
+                    if chunk["type"] == "confirmation":
+                        if run_code:
+                            run_code = False
+                            continue
+                        else:
+                            break
 
-            self.output_queue.sync_q.put(complete_message)
+                    if self.stop_event.is_set():
+                        return
 
-            if self.print or self.debug:
-                print("\nServer response complete.\n")
+                    if self.print:
+                        if "start" in chunk:
+                            print("\n")
+                        if chunk["type"] in ["code", "console"] and "format" in chunk:
+                            if "start" in chunk:
+                                print(
+                                    "\n------------\n\n```" + chunk["format"],
+                                    flush=True,
+                                )
+                            if "end" in chunk:
+                                print("\n```\n\n------------\n\n", flush=True)
+                        if chunk.get("format") != "active_line":
+                            if "format" in chunk and "base64" in chunk["format"]:
+                                print("\n[An image was produced]")
+                            else:
+                                content = chunk.get("content", "")
+                                content = (
+                                    str(content)
+                                    .encode("ascii", "ignore")
+                                    .decode("ascii")
+                                )
+                                print(content, end="", flush=True)
 
+                    if self.debug:
+                        print("Interpreter produced this chunk:", chunk)
+
+                    self.output_queue.sync_q.put(chunk)
+                    sent_chunks = True
+
+                if not sent_chunks:
+                    print("ERROR. NO CHUNKS SENT. TRYING AGAIN.")
+                    print("Messages:", self.messages)
+                    messages = [
+                        "Hello? Answer please.",
+                        "Just say something, anything.",
+                        "Are you there?",
+                        "Can you respond?",
+                        "Please reply.",
+                    ]
+                    self.messages.append(
+                        {
+                            "role": "user",
+                            "type": "message",
+                            "content": messages[attempt % len(messages)],
+                        }
+                    )
+                    time.sleep(1)
+                else:
+                    self.output_queue.sync_q.put(complete_message)
+                    if self.debug:
+                        print("\nServer response complete.\n")
+                    return
 
-        except Exception as e:
-            error = traceback.format_exc() + "\n" + str(e)
-            error_message = {
-                "role": "server",
-                "type": "error",
-                "content": traceback.format_exc() + "\n" + str(e),
-            }
-            self.output_queue.sync_q.put(error_message)
-            self.output_queue.sync_q.put(complete_message)
-            print("\n\n--- SENT ERROR: ---\n\n")
-            print(error)
-            print("\n\n--- (ERROR ABOVE WAS SENT) ---\n\n")
+            except Exception as e:
+                error = traceback.format_exc() + "\n" + str(e)
+                error_message = {
+                    "role": "server",
+                    "type": "error",
+                    "content": traceback.format_exc() + "\n" + str(e),
+                }
+                self.output_queue.sync_q.put(error_message)
+                self.output_queue.sync_q.put(complete_message)
+                print("\n\n--- SENT ERROR: ---\n\n")
+                print(error)
+                print("\n\n--- (ERROR ABOVE WAS SENT) ---\n\n")
+                return
+
+        error_message = {
+            "role": "server",
+            "type": "error",
+            "content": "No chunks sent or unknown error.",
+        }
+        self.output_queue.sync_q.put(error_message)
+        self.output_queue.sync_q.put(complete_message)
+        raise Exception("No chunks sent or unknown error.")
 
     def accumulate(self, chunk):
         """
@@ -678,29 +716,51 @@ class ChatCompletionRequest(BaseModel):
     stream: Optional[bool] = False
 
 async def openai_compatible_generator():
-    for i, chunk in enumerate(async_interpreter._respond_and_store()):
-        output_content = None
-
-        if chunk["type"] == "message" and "content" in chunk:
-            output_content = chunk["content"]
-        if chunk["type"] == "code" and "start" in chunk:
-            output_content = " "
-
-        if output_content:
-            await asyncio.sleep(0)
-            output_chunk = {
-                "id": i,
-                "object": "chat.completion.chunk",
-                "created": time.time(),
-                "model": "open-interpreter",
-                "choices": [{"delta": {"content": output_content}}],
-            }
-            yield f"data: {json.dumps(output_chunk)}\n\n"
+    made_chunk = False
+
+    for message in [
+        ".",
+        "Just say something, anything.",
+        "Hello? Answer please.",
+        "Are you there?",
+        "Can you respond?",
+        "Please reply.",
+    ]:
+        for i, chunk in enumerate(
+            async_interpreter.chat(message=message, stream=True, display=True)
+        ):
+            made_chunk = True
+
+            if async_interpreter.stop_event.is_set():
+                break
+
+            output_content = None
+
+            if chunk["type"] == "message" and "content" in chunk:
+                output_content = chunk["content"]
+            if chunk["type"] == "code" and "start" in chunk:
+                output_content = " "
+
+            if output_content:
+                await asyncio.sleep(0)
+                output_chunk = {
+                    "id": i,
+                    "object": "chat.completion.chunk",
+                    "created": time.time(),
+                    "model": "open-interpreter",
+                    "choices": [{"delta": {"content": output_content}}],
+                }
+                yield f"data: {json.dumps(output_chunk)}\n\n"
+
+        if made_chunk:
+            break
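**Review note:** `openai_compatible_generator` now retries across a list of nudge prompts until at least one chunk is produced, checks `stop_event` so a newer request can cancel an in-flight run, and emits each chunk as an SSE-style `data: {...}` line in the `chat.completion.chunk` shape (note the stream is served with `media_type="application/x-ndjson"` below even though the payload is SSE-formatted). A sketch of a client consuming the stream, assuming the server is reachable at `http://localhost:8000` (hypothetical host/port) and that `requests` is available:

```python
import json
import requests

# Hypothetical host/port; point this at wherever the FastAPI app is mounted.
resp = requests.post(
    "http://localhost:8000/openai/chat/completions",
    json={
        "model": "open-interpreter",
        "stream": True,
        "messages": [{"role": "user", "content": "What's 2 + 2?"}],
    },
    stream=True,
)
for line in resp.iter_lines():
    if not line or not line.startswith(b"data: "):
        continue  # skip blank separators between events
    chunk = json.loads(line[len(b"data: "):])
    delta = chunk["choices"][0]["delta"]
    print(delta.get("content", ""), end="", flush=True)
```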

 @router.post("/openai/chat/completions")
 async def chat_completion(request: ChatCompletionRequest):
     # Convert to LMC
 
+    async_interpreter.stop_event.set()
+
     last_message = request.messages[-1]
 
     if last_message.role != "user":
@@ -711,18 +771,26 @@ async def chat_completion(request: ChatCompletionRequest):
         return
 
     if type(last_message.content) == str:
         async_interpreter.messages.append(last_message)
-    if type(last_message.content) == list:
-        async_interpreter.messages.append(
-            {
-                "role": "user",
-                "type": "message",
-                "content": last_message.content,
-            }
-        )
+        print(">", last_message.content)
+    elif type(last_message.content) == list:
         for content in last_message.content:
             if content["type"] == "text":
                 async_interpreter.messages.append(
-                    {"role": "user", "type": "message", "content": content}
+                    {"role": "user", "type": "message", "content": str(content)}
                 )
+                print(">", content)
             elif content["type"] == "image_url":
                 if "url" not in content["image_url"]:
                     raise Exception("`url` must be in `image_url`.")
                 url = content["image_url"]["url"]
-                print(url[:100])
+                print("> [user sent an image]", url[:100])
                 if "base64," not in url:
                     raise Exception(
                         '''Image must be in the format: "data:image/jpeg;base64,{base64_image}"'''
@@ -741,12 +809,21 @@
                 }
             )
 
+    if os.getenv("INTERPRETER_SERVER_REQUIRE_START", False):
+        if last_message.content != "{START}":
+            return
+        if async_interpreter.messages[-1]["content"] == "{START}":
+            # Remove that {START} message that would have just been added
+            async_interpreter.messages = async_interpreter.messages[:-1]
+
+    async_interpreter.stop_event.clear()
+
     if request.stream:
         return StreamingResponse(
             openai_compatible_generator(), media_type="application/x-ndjson"
         )
     else:
-        messages = async_interpreter.chat(message="", stream=False, display=True)
+        messages = async_interpreter.chat(message=".", stream=False, display=True)
         content = messages[-1]["content"]
         return {
             "id": "200",