diff --git a/src/pipecat/processors/aggregators/llm_response.py b/src/pipecat/processors/aggregators/llm_response.py index 82288a4e9..5f23a9ed1 100644 --- a/src/pipecat/processors/aggregators/llm_response.py +++ b/src/pipecat/processors/aggregators/llm_response.py @@ -72,8 +72,6 @@ class LLMResponseAggregator(FrameProcessor): if isinstance(frame, self._start_frame): self._seen_start_frame = True self._aggregating = True - - await self.push_frame(frame, direction) elif isinstance(frame, self._end_frame): self._seen_end_frame = True @@ -85,8 +83,6 @@ class LLMResponseAggregator(FrameProcessor): # Send the aggregation if we are not aggregating anymore (i.e. no # more interim results received). send_aggregation = not self._aggregating - - await self.push_frame(frame, direction) elif isinstance(frame, self._accumulator_frame): if self._aggregating: self._aggregation += f" {frame.text}" diff --git a/src/pipecat/processors/aggregators/user_response.py b/src/pipecat/processors/aggregators/user_response.py index ae063b3cd..5c6520f1f 100644 --- a/src/pipecat/processors/aggregators/user_response.py +++ b/src/pipecat/processors/aggregators/user_response.py @@ -89,8 +89,6 @@ class ResponseAggregator(FrameProcessor): if isinstance(frame, self._start_frame): self._seen_start_frame = True self._aggregating = True - - await self.push_frame(frame, direction) elif isinstance(frame, self._end_frame): self._seen_end_frame = True @@ -102,8 +100,6 @@ class ResponseAggregator(FrameProcessor): # Send the aggregation if we are not aggregating anymore (i.e. no # more interim results received). send_aggregation = not self._aggregating - - await self.push_frame(frame, direction) elif isinstance(frame, self._accumulator_frame): if self._aggregating: self._aggregation += f" {frame.text}" diff --git a/src/pipecat/services/ai_services.py b/src/pipecat/services/ai_services.py index a3cb5cde5..696c61a73 100644 --- a/src/pipecat/services/ai_services.py +++ b/src/pipecat/services/ai_services.py @@ -18,6 +18,8 @@ from pipecat.frames.frames import ( EndFrame, ErrorFrame, Frame, + TTSStartedFrame, + TTSStoppedFrame, TextFrame, VisionImageRawFrame, ) @@ -69,14 +71,20 @@ class TTSService(AIService): self._current_sentence = "" if text: - await self.process_generator(self.run_tts(text)) + await self._push_tts_frames(text) + + async def _push_tts_frames(self, text: str): + await self.push_frame(TextFrame(text)) + await self.push_frame(TTSStartedFrame()) + await self.process_generator(self.run_tts(text)) + await self.push_frame(TTSStoppedFrame()) async def process_frame(self, frame: Frame, direction: FrameDirection): if isinstance(frame, TextFrame): await self._process_text_frame(frame) elif isinstance(frame, EndFrame): if self._current_sentence: - await self.process_generator(self.run_tts(self._current_sentence)) + await self._push_tts_frames(self._current_sentence) await self.push_frame(frame) else: await self.push_frame(frame, direction) @@ -154,6 +162,8 @@ class STTService(AIService): self._wave.close() await self.push_frame(frame, direction) elif isinstance(frame, AudioRawFrame): + # In this service we accumulate audio internally and at the end we + # push a TextFrame. We don't really want to push audio frames down. await self._append_audio(frame) else: await self.push_frame(frame, direction) @@ -171,6 +181,7 @@ class ImageGenService(AIService): async def process_frame(self, frame: Frame, direction: FrameDirection): if isinstance(frame, TextFrame): + await self.push_frame(frame, direction) await self.process_generator(self.run_image_gen(frame.text)) else: await self.push_frame(frame, direction) diff --git a/src/pipecat/services/elevenlabs.py b/src/pipecat/services/elevenlabs.py index 0bc207aef..53660a80e 100644 --- a/src/pipecat/services/elevenlabs.py +++ b/src/pipecat/services/elevenlabs.py @@ -8,7 +8,7 @@ import aiohttp from typing import AsyncGenerator -from pipecat.frames.frames import AudioRawFrame, ErrorFrame, Frame, TTSStartedFrame, TTSStoppedFrame +from pipecat.frames.frames import AudioRawFrame, ErrorFrame, Frame, TTSStartedFrame, TTSStoppedFrame, TextFrame from pipecat.services.ai_services import TTSService from loguru import logger @@ -53,9 +53,7 @@ class ElevenLabsTTSService(TTSService): yield ErrorFrame(f"Audio fetch status code: {r.status}, error: {r.text}") return - yield TTSStartedFrame() async for chunk in r.content: if len(chunk) > 0: frame = AudioRawFrame(chunk, 16000, 1) yield frame - yield TTSStoppedFrame()