Compare commits
8 Commits
mb/dependa
...
mb/flush-a
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
10e31958fd | ||
|
|
373ffc48b6 | ||
|
|
5ae6229c03 | ||
|
|
615cbe966a | ||
|
|
3caccab608 | ||
|
|
78d3c77369 | ||
|
|
44b3e6cefa | ||
|
|
20ea073398 |
16
CHANGELOG.md
16
CHANGELOG.md
@@ -9,6 +9,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Added
|
||||
|
||||
- Added a `flush_audio()` method to `AzureTTSService`, `FishAudioTTSService`,
|
||||
`PlayHTTTSService`, and `LmntTTSService`.
|
||||
|
||||
- Added `set_language()` and `set_model()` to `AzureSTTService`,
|
||||
`AssemblyAISTTService`, and `GladiaSTTService`.
|
||||
|
||||
- Added `on_user_turn_audio_data` and `on_bot_turn_audio_data` to
|
||||
`AudioBufferProcessor`. This gives the ability to grab the audio of only that
|
||||
turn for both the user and the bot.
|
||||
@@ -65,6 +71,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
- Added `AzureRealtimeBetaLLMService` to support Azure's OpenAI Realtime API. Added
|
||||
foundational example `19a-azure-realtime-beta.py`.
|
||||
|
||||
### Changed
|
||||
|
||||
- Moved `flush_audio()` from the `TTSService` base class to
|
||||
`WebsocketTTSService`.
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed an issue in `GoogleSTTService`, where it didn't have a `set_language`
|
||||
function. This required a name change from `set_languages` to `set_language`.
|
||||
|
||||
## [0.0.58] - 2025-02-26
|
||||
|
||||
### Added
|
||||
|
||||
@@ -21,6 +21,7 @@ from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.services.cartesia import CartesiaTTSService
|
||||
from pipecat.services.gladia import GladiaSTTService
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
from pipecat.transcriptions.language import Language
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
@@ -270,10 +270,6 @@ class TTSService(AIService):
|
||||
def set_voice(self, voice: str):
|
||||
self._voice_id = voice
|
||||
|
||||
@abstractmethod
|
||||
async def flush_audio(self):
|
||||
pass
|
||||
|
||||
# Converts the text to audio.
|
||||
@abstractmethod
|
||||
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
|
||||
@@ -539,6 +535,11 @@ class WebsocketTTSService(TTSService, WebsocketService):
|
||||
TTSService.__init__(self, **kwargs)
|
||||
WebsocketService.__init__(self)
|
||||
|
||||
@abstractmethod
|
||||
async def flush_audio(self):
|
||||
"""Flush any buffered audio in this websocket-based TTS service."""
|
||||
pass
|
||||
|
||||
|
||||
class InterruptibleTTSService(WebsocketTTSService):
|
||||
"""This is a base class for websocket-based TTS services that don't support
|
||||
|
||||
@@ -57,6 +57,17 @@ class AssemblyAISTTService(STTService):
|
||||
logger.info(f"Switching STT language to: [{language}]")
|
||||
self._settings["language"] = language
|
||||
|
||||
async def set_model(self, model: str):
|
||||
"""Set the speech recognition model name for metrics.
|
||||
|
||||
Args:
|
||||
model: Model name for metrics tracking
|
||||
"""
|
||||
self.set_model_name(model)
|
||||
logger.info(
|
||||
f"Set model name to '{model}' for metrics (AssemblyAI real-time API doesn't support model selection)"
|
||||
)
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
await self._connect()
|
||||
@@ -90,7 +101,6 @@ class AssemblyAISTTService(STTService):
|
||||
This method sets up the necessary callback functions and initializes the
|
||||
AssemblyAI transcriber.
|
||||
"""
|
||||
|
||||
if self._transcriber:
|
||||
return
|
||||
|
||||
|
||||
@@ -563,6 +563,22 @@ class AzureTTSService(AzureBaseTTSService):
|
||||
self._speech_synthesizer.synthesis_completed.connect(self._handle_completed)
|
||||
self._speech_synthesizer.synthesis_canceled.connect(self._handle_canceled)
|
||||
|
||||
async def flush_audio(self):
|
||||
"""Flush any pending audio in the queue.
|
||||
|
||||
This method clears the audio queue and ensures any pending synthesis
|
||||
is properly cleaned up.
|
||||
"""
|
||||
logger.trace(f"{self}: Flushing audio queue")
|
||||
|
||||
# Clear the queue
|
||||
while not self._audio_queue.empty():
|
||||
try:
|
||||
self._audio_queue.get_nowait()
|
||||
self._audio_queue.task_done()
|
||||
except asyncio.QueueEmpty:
|
||||
break
|
||||
|
||||
def _handle_synthesizing(self, evt):
|
||||
"""Handle audio chunks as they arrive"""
|
||||
if evt.result and evt.result.audio_data:
|
||||
@@ -577,9 +593,6 @@ class AzureTTSService(AzureBaseTTSService):
|
||||
logger.error(f"Speech synthesis canceled: {evt.result.cancellation_details.reason}")
|
||||
self._audio_queue.put_nowait(None)
|
||||
|
||||
async def flush_audio(self):
|
||||
logger.trace(f"{self}: flushing audio")
|
||||
|
||||
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
|
||||
logger.debug(f"{self}: Generating TTS [{text}]")
|
||||
|
||||
@@ -692,6 +705,55 @@ class AzureSTTService(STTService):
|
||||
self._audio_stream = None
|
||||
self._speech_recognizer = None
|
||||
|
||||
async def set_language(self, language: Language):
|
||||
"""Set the language for speech recognition.
|
||||
|
||||
Args:
|
||||
language: The language to use for recognition
|
||||
"""
|
||||
azure_language = language_to_azure_language(language)
|
||||
if not azure_language:
|
||||
logger.warning(f"Unsupported language for Azure STT: {language}")
|
||||
return
|
||||
|
||||
logger.info(f"Switching STT language to: [{language}] ({azure_language})")
|
||||
|
||||
# Update the speech config language
|
||||
self._speech_config.speech_recognition_language = azure_language
|
||||
|
||||
# Disconnect and reconnect to apply the changes
|
||||
if self._speech_recognizer:
|
||||
# Stop the current recognizer
|
||||
self._speech_recognizer.stop_continuous_recognition_async()
|
||||
self._speech_recognizer = None
|
||||
|
||||
# Reconnect with new settings
|
||||
if self._audio_stream:
|
||||
audio_config = AudioConfig(stream=self._audio_stream)
|
||||
self._speech_recognizer = SpeechRecognizer(
|
||||
speech_config=self._speech_config, audio_config=audio_config
|
||||
)
|
||||
self._speech_recognizer.recognized.connect(self._on_handle_recognized)
|
||||
self._speech_recognizer.start_continuous_recognition_async()
|
||||
logger.debug("Reconnected with new language settings")
|
||||
|
||||
async def set_model(self, model: str):
|
||||
"""Set the speech recognition model.
|
||||
|
||||
In Azure Speech Service, there isn't a direct concept of switching between
|
||||
named models. This method sets the model name for metrics purposes and
|
||||
logs a message, but doesn't change the actual recognition behavior.
|
||||
|
||||
To customize recognition behavior, use speech_config properties instead.
|
||||
|
||||
Args:
|
||||
model: Model name for metrics
|
||||
"""
|
||||
self.set_model_name(model)
|
||||
logger.info(
|
||||
f"Set model name to '{model}' for metrics (Azure STT doesn't support model switching)"
|
||||
)
|
||||
|
||||
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
|
||||
await self.start_processing_metrics()
|
||||
if self._audio_stream:
|
||||
|
||||
@@ -148,6 +148,14 @@ class FishAudioTTSService(InterruptibleTTSService):
|
||||
except Exception as e:
|
||||
logger.error(f"Error closing websocket: {e}")
|
||||
|
||||
async def flush_audio(self):
|
||||
"""Flush any buffered audio by sending a flush event to Fish Audio."""
|
||||
logger.trace(f"{self}: Flushing audio buffers")
|
||||
if not self._websocket:
|
||||
return
|
||||
flush_message = {"event": "flush"}
|
||||
await self._get_websocket().send(ormsgpack.packb(flush_message))
|
||||
|
||||
def _get_websocket(self):
|
||||
if self._websocket:
|
||||
return self._websocket
|
||||
|
||||
@@ -179,6 +179,17 @@ class GladiaSTTService(STTService):
|
||||
def language_to_service_language(self, language: Language) -> Optional[str]:
|
||||
return language_to_gladia_language(language)
|
||||
|
||||
async def set_language(self, language: Language):
|
||||
"""Placeholder: Set the speech recognition language."""
|
||||
logger.info(f"Set language name to '{language}' has not been implemented yet")
|
||||
|
||||
async def set_model(self, model: str):
|
||||
"""Set the speech recognition model."""
|
||||
self.set_model_name(model)
|
||||
logger.info(
|
||||
f"Set model name to '{model}' for metrics (Gladia doesn't support model switching)"
|
||||
)
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
if self._websocket:
|
||||
|
||||
@@ -1727,7 +1727,7 @@ class GoogleSTTService(STTService):
|
||||
await self._disconnect()
|
||||
await self._connect()
|
||||
|
||||
async def set_languages(self, languages: List[Language]):
|
||||
async def set_language(self, languages: List[Language]):
|
||||
"""Update the service's recognition languages.
|
||||
|
||||
Args:
|
||||
|
||||
@@ -170,6 +170,11 @@ class LmntTTSService(InterruptibleTTSService):
|
||||
return self._websocket
|
||||
raise Exception("Websocket not connected")
|
||||
|
||||
async def flush_audio(self):
|
||||
if not self._websocket:
|
||||
return
|
||||
await self._get_websocket().send(json.dumps({"flush": True}))
|
||||
|
||||
async def _receive_messages(self):
|
||||
"""Receive messages from LMNT websocket."""
|
||||
async for message in self._get_websocket():
|
||||
|
||||
@@ -241,6 +241,14 @@ class PlayHTTTSService(InterruptibleTTSService):
|
||||
await self.stop_all_metrics()
|
||||
self._request_id = None
|
||||
|
||||
async def flush_audio(self):
|
||||
"""Flush any pending audio in the buffer.
|
||||
|
||||
PlayHT's API doesn't provide a mechanism for flushing audio buffers,
|
||||
so this method is a no-op.
|
||||
"""
|
||||
logger.trace(f"{self}: flush_audio is a no-op for PlayHT")
|
||||
|
||||
async def _receive_messages(self):
|
||||
async for message in self._get_websocket():
|
||||
if isinstance(message, bytes):
|
||||
|
||||
Reference in New Issue
Block a user