Compare commits

...

8 Commits

Author SHA1 Message Date
Mark Backman
10e31958fd Add flush_audio() to LMNTTTSService 2025-03-07 15:42:31 -05:00
Mark Backman
373ffc48b6 Add placeholders for Gladia's set_model and set_language methods 2025-03-07 15:38:23 -05:00
Mark Backman
5ae6229c03 Add flush_audio() to PlayHTTTSService 2025-03-07 15:26:47 -05:00
Mark Backman
615cbe966a Add set_model to AssemblySTTService 2025-03-07 15:18:10 -05:00
Mark Backman
3caccab608 Azure STT and TTS: Satisfy requirements for set_model, set_language, flush_audio 2025-03-07 15:07:50 -05:00
Mark Backman
78d3c77369 Added a flush_audio method to FishTTSService 2025-03-07 14:28:19 -05:00
Mark Backman
44b3e6cefa Fix: GoogleSTTService's set_languages method should be set_language to satisfy base class requirement 2025-03-07 14:28:19 -05:00
Mark Backman
20ea073398 Move flush_audio to WebsocketTTSService base class 2025-03-07 14:28:19 -05:00
10 changed files with 131 additions and 9 deletions

View File

@@ -9,6 +9,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Added a `flush_audio()` method to `AzureTTSService`, `FishAudioTTSService`,
`PlayHTTTSService`, and `LmntTTSService`.
- Added `set_language()` and `set_model()` to `AzureSTTService`,
`AssemblyAISTTService`, and `GladiaSTTService`.
- Added `on_user_turn_audio_data` and `on_bot_turn_audio_data` to
`AudioBufferProcessor`. This gives the ability to grab the audio of only that
turn for both the user and the bot.
@@ -65,6 +71,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added `AzureRealtimeBetaLLMService` to support Azure's OpenAI Realtime API. Added
foundational example `19a-azure-realtime-beta.py`.
### Changed
- Moved `flush_audio()` from the `TTSService` base class to
`WebsocketTTSService`.
### Fixed
- Fixed an issue in `GoogleSTTService`, where it didn't have a `set_language`
function. This required a name change from `set_languages` to `set_language`.
## [0.0.58] - 2025-02-26
### Added

View File

@@ -21,6 +21,7 @@ from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.gladia import GladiaSTTService
from pipecat.services.openai import OpenAILLMService
from pipecat.transcriptions.language import Language
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)

View File

@@ -270,10 +270,6 @@ class TTSService(AIService):
def set_voice(self, voice: str):
    """Remember the voice identifier for this service.

    Args:
        voice: Voice identifier; stored unchanged on ``self._voice_id``.
    """
    self._voice_id = voice
@abstractmethod
async def flush_audio(self):
pass
# Converts the text to audio.
@abstractmethod
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
@@ -539,6 +535,11 @@ class WebsocketTTSService(TTSService, WebsocketService):
TTSService.__init__(self, **kwargs)
WebsocketService.__init__(self)
@abstractmethod
async def flush_audio(self):
    """Flush any buffered audio in this websocket-based TTS service.

    Declared abstract: concrete websocket TTS services provide the actual
    implementation. This base version does nothing.
    """
class InterruptibleTTSService(WebsocketTTSService):
"""This is a base class for websocket-based TTS services that don't support

View File

@@ -57,6 +57,17 @@ class AssemblyAISTTService(STTService):
logger.info(f"Switching STT language to: [{language}]")
self._settings["language"] = language
async def set_model(self, model: str):
    """Record the model name used for metrics.

    The name is handed to ``set_model_name`` and then logged. Nothing else
    is changed here; the log message itself states that the AssemblyAI
    real-time API has no model selection.

    Args:
        model: Model name for metrics tracking.
    """
    self.set_model_name(model)
    notice = f"Set model name to '{model}' for metrics (AssemblyAI real-time API doesn't support model selection)"
    logger.info(notice)
async def start(self, frame: StartFrame):
await super().start(frame)
await self._connect()
@@ -90,7 +101,6 @@ class AssemblyAISTTService(STTService):
This method sets up the necessary callback functions and initializes the
AssemblyAI transcriber.
"""
if self._transcriber:
return

View File

@@ -563,6 +563,22 @@ class AzureTTSService(AzureBaseTTSService):
self._speech_synthesizer.synthesis_completed.connect(self._handle_completed)
self._speech_synthesizer.synthesis_canceled.connect(self._handle_canceled)
async def flush_audio(self):
    """Discard every item currently waiting in the audio queue.

    Each removed item is also marked done on the queue. Only the queue is
    touched here; the speech synthesizer itself is not stopped or reset.
    """
    logger.trace(f"{self}: Flushing audio queue")
    pending = self._audio_queue
    # NOTE(review): a `None` marker is put on this same queue by the
    # cancellation handler in this class; draining here drops it as well —
    # confirm no reader is left waiting for that marker.
    while not pending.empty():
        try:
            pending.get_nowait()
        except asyncio.QueueEmpty:
            # Queue emptied between the check and the get; nothing left to drop.
            break
        else:
            pending.task_done()
def _handle_synthesizing(self, evt):
"""Handle audio chunks as they arrive"""
if evt.result and evt.result.audio_data:
@@ -577,9 +593,6 @@ class AzureTTSService(AzureBaseTTSService):
logger.error(f"Speech synthesis canceled: {evt.result.cancellation_details.reason}")
self._audio_queue.put_nowait(None)
async def flush_audio(self):
logger.trace(f"{self}: flushing audio")
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"{self}: Generating TTS [{text}]")
@@ -692,6 +705,55 @@ class AzureSTTService(STTService):
self._audio_stream = None
self._speech_recognizer = None
async def set_language(self, language: Language):
"""Set the language for speech recognition.

The language is mapped with ``language_to_azure_language``. When no mapping
is returned, a warning is logged and nothing is changed. Otherwise the
speech config's recognition language is updated, an existing recognizer is
stopped and dropped, and a new recognizer is created on the current audio
stream and started.

Args:
language: The language to use for recognition
"""
azure_language = language_to_azure_language(language)
if not azure_language:
# No Azure equivalent for this language: leave the current setup as-is.
logger.warning(f"Unsupported language for Azure STT: {language}")
return
logger.info(f"Switching STT language to: [{language}] ({azure_language})")
# Update the speech config language
self._speech_config.speech_recognition_language = azure_language
# Disconnect and reconnect to apply the changes
# NOTE(review): indentation is not visible in this view, so it is unclear
# whether the reconnect step below is nested under this recognizer check —
# confirm against the real file.
if self._speech_recognizer:
# Stop the current recognizer
# NOTE(review): the value returned by this *_async call is not used or
# waited on — confirm the old recognizer has actually stopped before a new
# one is started on the same audio stream.
self._speech_recognizer.stop_continuous_recognition_async()
self._speech_recognizer = None
# Reconnect with new settings
if self._audio_stream:
# Reuse the existing audio stream for the new recognizer.
audio_config = AudioConfig(stream=self._audio_stream)
self._speech_recognizer = SpeechRecognizer(
speech_config=self._speech_config, audio_config=audio_config
)
# Only the `recognized` event is wired to a handler here.
self._speech_recognizer.recognized.connect(self._on_handle_recognized)
self._speech_recognizer.start_continuous_recognition_async()
logger.debug("Reconnected with new language settings")
async def set_model(self, model: str):
    """Record a model name for metrics only.

    Azure Speech Service has no direct concept of switching between named
    models, so this only stores the name via ``set_model_name`` and logs a
    message; recognition behavior is left unchanged. To customize
    recognition behavior, use speech_config properties instead.

    Args:
        model: Model name for metrics.
    """
    self.set_model_name(model)
    notice = f"Set model name to '{model}' for metrics (Azure STT doesn't support model switching)"
    logger.info(notice)
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
await self.start_processing_metrics()
if self._audio_stream:

View File

@@ -148,6 +148,14 @@ class FishAudioTTSService(InterruptibleTTSService):
except Exception as e:
logger.error(f"Error closing websocket: {e}")
async def flush_audio(self):
    """Ask Fish Audio to flush its buffered audio.

    Sends a msgpack-encoded ``{"event": "flush"}`` message over the
    websocket. When no websocket is set, only the trace message is logged.
    """
    logger.trace(f"{self}: Flushing audio buffers")
    if self._websocket:
        # Resolve the socket first, then encode and send the flush event.
        socket = self._get_websocket()
        await socket.send(ormsgpack.packb({"event": "flush"}))
def _get_websocket(self):
if self._websocket:
return self._websocket

View File

@@ -179,6 +179,17 @@ class GladiaSTTService(STTService):
def language_to_service_language(self, language: Language) -> Optional[str]:
    """Map a ``Language`` to the value Gladia uses for it.

    Delegates to ``language_to_gladia_language`` and returns its result
    unchanged.
    """
    gladia_language = language_to_gladia_language(language)
    return gladia_language
async def set_language(self, language: Language):
    """Placeholder: switching the recognition language is not implemented.

    The request is only logged; no state is changed. It is logged as a
    warning so that a caller expecting the language to change can see that
    it did not.

    Args:
        language: The requested language (currently ignored).
    """
    logger.warning(f"Setting language to '{language}' has not been implemented yet")
async def set_model(self, model: str):
    """Record the model name for metrics.

    The name is stored via ``set_model_name`` and logged; per the log
    message, Gladia does not support switching models.

    Args:
        model: Model name for metrics.
    """
    self.set_model_name(model)
    notice = f"Set model name to '{model}' for metrics (Gladia doesn't support model switching)"
    logger.info(notice)
async def start(self, frame: StartFrame):
await super().start(frame)
if self._websocket:

View File

@@ -1727,7 +1727,7 @@ class GoogleSTTService(STTService):
await self._disconnect()
await self._connect()
async def set_languages(self, languages: List[Language]):
async def set_language(self, languages: List[Language]):
"""Update the service's recognition languages.
Args:

View File

@@ -170,6 +170,11 @@ class LmntTTSService(InterruptibleTTSService):
return self._websocket
raise Exception("Websocket not connected")
async def flush_audio(self):
    """Send a flush request over the LMNT websocket.

    The request is the JSON message ``{"flush": true}``. Returns without
    sending anything when no websocket is set.
    """
    if self._websocket:
        # Resolve the socket first, then serialize and send the request.
        socket = self._get_websocket()
        await socket.send(json.dumps({"flush": True}))
async def _receive_messages(self):
"""Receive messages from LMNT websocket."""
async for message in self._get_websocket():

View File

@@ -241,6 +241,14 @@ class PlayHTTTSService(InterruptibleTTSService):
await self.stop_all_metrics()
self._request_id = None
async def flush_audio(self):
    """No-op flush for PlayHT.

    Per the original author's note, PlayHT's API doesn't provide a mechanism
    for flushing audio buffers, so this only logs a trace message.
    """
    logger.trace(f"{self}: flush_audio is a no-op for PlayHT")
async def _receive_messages(self):
async for message in self._get_websocket():
if isinstance(message, bytes):