services(tts): always send received TextFrame downstream

This commit is contained in:
Aleix Conchillo Flaqué
2024-05-17 17:11:11 -07:00
parent 8dc81042c3
commit 455ec4f1fd
4 changed files with 14 additions and 13 deletions

View File

@@ -72,8 +72,6 @@ class LLMResponseAggregator(FrameProcessor):
if isinstance(frame, self._start_frame):
self._seen_start_frame = True
self._aggregating = True
await self.push_frame(frame, direction)
elif isinstance(frame, self._end_frame):
self._seen_end_frame = True
@@ -85,8 +83,6 @@ class LLMResponseAggregator(FrameProcessor):
# Send the aggregation if we are not aggregating anymore (i.e. no
# more interim results received).
send_aggregation = not self._aggregating
await self.push_frame(frame, direction)
elif isinstance(frame, self._accumulator_frame):
if self._aggregating:
self._aggregation += f" {frame.text}"

View File

@@ -89,8 +89,6 @@ class ResponseAggregator(FrameProcessor):
if isinstance(frame, self._start_frame):
self._seen_start_frame = True
self._aggregating = True
await self.push_frame(frame, direction)
elif isinstance(frame, self._end_frame):
self._seen_end_frame = True
@@ -102,8 +100,6 @@ class ResponseAggregator(FrameProcessor):
# Send the aggregation if we are not aggregating anymore (i.e. no
# more interim results received).
send_aggregation = not self._aggregating
await self.push_frame(frame, direction)
elif isinstance(frame, self._accumulator_frame):
if self._aggregating:
self._aggregation += f" {frame.text}"

View File

@@ -18,6 +18,8 @@ from pipecat.frames.frames import (
EndFrame,
ErrorFrame,
Frame,
TTSStartedFrame,
TTSStoppedFrame,
TextFrame,
VisionImageRawFrame,
)
@@ -69,14 +71,20 @@ class TTSService(AIService):
self._current_sentence = ""
if text:
await self.process_generator(self.run_tts(text))
await self._push_tts_frames(text)
async def _push_tts_frames(self, text: str):
await self.push_frame(TextFrame(text))
await self.push_frame(TTSStartedFrame())
await self.process_generator(self.run_tts(text))
await self.push_frame(TTSStoppedFrame())
async def process_frame(self, frame: Frame, direction: FrameDirection):
if isinstance(frame, TextFrame):
await self._process_text_frame(frame)
elif isinstance(frame, EndFrame):
if self._current_sentence:
await self.process_generator(self.run_tts(self._current_sentence))
await self._push_tts_frames(self._current_sentence)
await self.push_frame(frame)
else:
await self.push_frame(frame, direction)
@@ -154,6 +162,8 @@ class STTService(AIService):
self._wave.close()
await self.push_frame(frame, direction)
elif isinstance(frame, AudioRawFrame):
# In this service we accumulate audio internally and at the end we
# push a TextFrame. We don't really want to push audio frames down.
await self._append_audio(frame)
else:
await self.push_frame(frame, direction)
@@ -171,6 +181,7 @@ class ImageGenService(AIService):
async def process_frame(self, frame: Frame, direction: FrameDirection):
if isinstance(frame, TextFrame):
await self.push_frame(frame, direction)
await self.process_generator(self.run_image_gen(frame.text))
else:
await self.push_frame(frame, direction)

View File

@@ -8,7 +8,7 @@ import aiohttp
from typing import AsyncGenerator
from pipecat.frames.frames import AudioRawFrame, ErrorFrame, Frame, TTSStartedFrame, TTSStoppedFrame
from pipecat.frames.frames import AudioRawFrame, ErrorFrame, Frame, TTSStartedFrame, TTSStoppedFrame, TextFrame
from pipecat.services.ai_services import TTSService
from loguru import logger
@@ -53,9 +53,7 @@ class ElevenLabsTTSService(TTSService):
yield ErrorFrame(f"Audio fetch status code: {r.status}, error: {r.text}")
return
yield TTSStartedFrame()
async for chunk in r.content:
if len(chunk) > 0:
frame = AudioRawFrame(chunk, 16000, 1)
yield frame
yield TTSStoppedFrame()