a little outputqueue refactoring

This commit is contained in:
Moishe Lettvin
2023-12-29 20:36:17 -05:00
parent 5c7e8eba61
commit f2218d21c3
6 changed files with 66 additions and 58 deletions

View File

@@ -4,14 +4,14 @@ import re
from collections import defaultdict
from dataclasses import dataclass, field
from enum import Enum
from queue import Queue, PriorityQueue, Empty
from threading import Event, Semaphore, Thread
from typing import Any, Generator, Iterator, Optional, Type, TypedDict
from typing import Any, Generator, Iterator, Optional, Type
from dailyai.services.ai_services import AIServiceConfig
from dailyai.output_queue import OutputQueueFrame, FrameType
from dailyai.message_handler.message_handler import MessageHandler
frame_idx = 0
from dailyai.services.ai_services import AIServiceConfig
class AsyncProcessorState:
# Setting class variables, other synchronous activities
@@ -211,6 +211,9 @@ class AsyncProcessor:
def do_finalization(self) -> None:
pass
# A common class for responses that use a message queue and
# an output queue.
class OrchestratorResponse(AsyncProcessor):
def __init__(
@@ -265,10 +268,10 @@ class LLMResponse(OrchestratorResponse):
if out.strip():
yield out.strip()
def get_frames_from_tts_response(self, audio_frame) -> list[dict[str, Any]]:
return [{"type": "audio_frame", "data": audio_frame}]
def get_frames_from_tts_response(self, audio_frame) -> list[OutputQueueFrame]:
return [OutputQueueFrame(FrameType.AUDIO_FRAME, audio_frame)]
def get_frames_from_chunk(self, chunk) -> Generator[list[dict[str, Any]], Any, None]:
def get_frames_from_chunk(self, chunk) -> Generator[list[OutputQueueFrame], Any, None]:
for audio_frame in self.services.tts.run_tts(chunk):
yield self.get_frames_from_tts_response(audio_frame)
@@ -299,14 +302,13 @@ class LLMResponse(OrchestratorResponse):
]:
break
prepared_chunk = self.chunks_in_preparation.get()
if prepared_chunk[0] is None:
if prepared_chunk[0] == None:
return
self.play_prepared_chunk(prepared_chunk)
def play_prepared_chunk(self, prepared_chunk) -> None:
chunk, tts_generator = prepared_chunk
global frame_idx
for frames in tts_generator:
if self.state not in [
AsyncProcessorState.READY,
@@ -315,14 +317,11 @@ class LLMResponse(OrchestratorResponse):
break
if not self.has_sent_first_frame:
self.output_queue.put({"type": "start_stream", "idx": frame_idx})
frame_idx += 1
self.output_queue.put(OutputQueueFrame(FrameType.START_STREAM, None))
self.has_sent_first_frame = True
for frame in frames:
frame["idx"] = frame_idx
self.output_queue.put(frame)
frame_idx += 1
self.output_queue.join()
self.llm_responses.append(chunk)