Files
pipecat/message_handler/message_handler.py
Moishe Lettvin e724720e76 Getting started
2023-12-25 19:09:11 -05:00

145 lines
4.7 KiB
Python

import logging
import time
from dataclasses import dataclass
from queue import Queue, Empty
from threading import Thread
from storage.search import SearchIndexer
from services.ai_services import AIServiceConfig
@dataclass
class Message:
    """One entry in the conversation transcript."""

    # Speaker role; this file creates "system", "user" and "assistant" entries.
    type: str
    # Creation time; every call site in this file passes time.time().
    timestamp: float
    # Text content of the entry.
    message: str
class MessageHandler:
    """Accumulate a conversation as an ordered list of ``Message`` objects.

    The transcript always starts with a single "system" message. User text
    that arrives before the current user message has been finalized is merged
    into that message, and anything appended after it is discarded.
    """

    def __init__(self, intro):
        """Create the transcript with ``intro`` as the system message."""
        self.messages = [Message("system", time.time(), intro)]
        # Index in self.messages of the most recent user message, if any.
        self.last_user_message_idx = None
        # Index of the user message most recently finalized. It must exist on
        # the base class because add_user_message() reads it; without it the
        # second add_user_message() call raised AttributeError.
        self.finalized_user_message_idx = None

    def add_user_message(self, message):
        """Record user text, merging it into an unfinalized user message.

        If the latest user message has not been finalized, ``message`` is
        appended to it (space-separated), its timestamp is refreshed, and
        every entry after it is dropped. Otherwise a new user message is
        appended.
        """
        has_open_user_message = (
            self.last_user_message_idx is not None
            and self.last_user_message_idx != self.finalized_user_message_idx
        )
        if has_open_user_message:
            previous_message = self.messages[self.last_user_message_idx].message
            self.messages[self.last_user_message_idx] = Message(
                "user", time.time(), " ".join([previous_message, message])
            )
            # Discard whatever was added after the still-open user message.
            self.messages = self.messages[: self.last_user_message_idx + 1]
        else:
            self.messages.append(Message("user", time.time(), message))
            self.last_user_message_idx = len(self.messages) - 1

    def add_assistant_message(self, message):
        """Append assistant text, extending a trailing assistant message."""
        if self.messages[-1].type == "assistant":
            self.messages[-1].message += " " + message
        else:
            self.messages.append(Message("assistant", time.time(), message))

    def add_assistant_messages(self, messages):
        """Append one new assistant message made of ``messages`` joined by spaces."""
        self.messages.append(Message("assistant", time.time(), " ".join(messages)))

    def get_llm_messages(self):
        """Return the transcript as a list of ``{"role", "content"}`` dicts."""
        return [{"role": m.type, "content": m.message} for m in self.messages]

    def finalize_user_message(self):
        """Mark the latest user message as complete.

        After this call the next add_user_message() starts a new message
        instead of merging into (and truncating after) the previous one.
        """
        self.finalized_user_message_idx = self.last_user_message_idx

    def shutdown(self):
        """Release resources; the base handler holds none, so this is a no-op."""
        pass
class IndexingMessageHandler(MessageHandler):
    """MessageHandler that also queues finalized messages for a search index.

    Message indices are placed on a queue by write_messages_to_index() and
    consumed by a daemon thread running indexer_writer(), which passes each
    message's text to ``indexer.index_text``. User messages are first sent
    through ``services.llm.run_llm`` for punctuation/capitalization cleanup.
    """

    def __init__(
        self, intro, services: AIServiceConfig, indexer: SearchIndexer
    ) -> None:
        """Set up state and start the background index-writer thread.

        Args:
            intro: Text of the initial system message.
            services: Object whose ``llm.run_llm(messages)`` is used to clean
                up user transcriptions.
            indexer: Object whose ``index_text(text)`` receives each message.
        """
        super().__init__(intro)
        self.services = services
        self.search_indexer = indexer
        # Highest message index the writer thread has picked up so far.
        # Index 0 is the system message, which is never queued.
        self.last_written_idx = 0
        self.finalized_user_message_idx = None
        self.logger = logging.getLogger("bot-instance")
        self.index_message_queue = Queue()
        # Start the worker last so every attribute it reads already exists.
        self.index_writer_thread = Thread(target=self.indexer_writer, daemon=True)
        self.index_writer_thread.start()

    def shutdown(self):
        """Queue any remaining messages, then stop and join the writer thread."""
        self.finalize_user_message()
        # None is the sentinel that tells indexer_writer() to exit.
        self.index_message_queue.put(None)
        self.index_writer_thread.join()

    def indexer_writer(self):
        """Worker loop: index queued messages until a ``None`` sentinel arrives.

        Indices at or below ``last_written_idx`` are skipped so a message is
        handed to the indexer at most once. A failure while processing one
        message is logged and the loop continues, so one bad message or a
        failing service call does not stop all later indexing.
        """
        while True:
            # Blocking get() with no timeout; it never raises queue.Empty.
            message_idx = self.index_message_queue.get()
            try:
                if message_idx is None:
                    return
                if message_idx <= self.last_written_idx:
                    continue
                # Recorded before processing: a message that fails is not retried.
                self.last_written_idx = message_idx
                message = self.messages[message_idx]
                content = message.message
                if message.type == "user":
                    content = self.cleanup_user_message(content)
                # Sometimes the LLM returns a string wrapped in quotes and
                # sometimes it doesn't; if it didn't, wrap it in quotes.
                # startswith() is used so empty content cannot raise IndexError.
                if not content.startswith('"'):
                    content = '"' + content + '"'
                self.search_indexer.index_text(content)
            except Exception:
                self.logger.exception("failed to index message %s", message_idx)
            finally:
                # Mark the item done only after it has actually been handled.
                self.index_message_queue.task_done()

    def cleanup_user_message(self, user_message) -> str:
        """Ask the LLM to punctuate and capitalize a transcription.

        Returns the LLM result when it is truthy, otherwise ``user_message``
        unchanged.
        """
        messages = [
            {
                "role": "system",
                "content": """
You are an assistant who is very good at making transcriptions
of human speech into well-capitalized and punctuated text, without
changing any words or the order of the words. Please change this
transcription to something suitable for the printed page.
""",
            },
            {"role": "user", "content": user_message},
        ]
        result = self.services.llm.run_llm(messages)
        if result:
            user_message = result
        return user_message

    def finalize_user_message(self):
        """Mark the latest user message as complete and queue pending messages."""
        self.finalized_user_message_idx = self.last_user_message_idx
        self.write_messages_to_index()

    def write_messages_to_index(self):
        """Queue every not-yet-written message up to the finalized user message.

        Does nothing until a user message has been finalized. Stops at the
        first user message past the finalized one, and never queues system
        messages.
        """
        if self.finalized_user_message_idx is None:
            return

        # The writer skips indices <= last_written_idx, so start just past it.
        for idx in range(self.last_written_idx + 1, len(self.messages)):
            message = self.messages[idx]
            if message.type == "user" and idx > self.finalized_user_message_idx:
                break
            if message.type == "system":
                continue
            # Log only the messages that are actually queued.
            self.logger.info(
                "writing to index: %s %s", message.type, message.message
            )
            self.index_message_queue.put(idx)