Compare commits
35 Commits
aleix/pipe
...
hush/hidde
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d175e5e5fc | ||
|
|
6eed6ff779 | ||
|
|
1375211610 | ||
|
|
4e9369a702 | ||
|
|
f9e8748a96 | ||
|
|
20d6bf267a | ||
|
|
b573f9dab2 | ||
|
|
dbc76389d8 | ||
|
|
c27f838444 | ||
|
|
ce84485e26 | ||
|
|
6cf254e2f9 | ||
|
|
02b63c28a5 | ||
|
|
57c6ce7ffa | ||
|
|
2f3272ea2f | ||
|
|
f5c2d57e4b | ||
|
|
baa878272d | ||
|
|
093285868e | ||
|
|
6c9d058ec2 | ||
|
|
5df7be6892 | ||
|
|
2deca816ae | ||
|
|
b8d2fceced | ||
|
|
7596d71460 | ||
|
|
096067b097 | ||
|
|
ec09505f6b | ||
|
|
251ea756c8 | ||
|
|
8f6544efe2 | ||
|
|
6045a8ad8c | ||
|
|
b184d62634 | ||
|
|
1a8d512abb | ||
|
|
a62be8ea32 | ||
|
|
c230d94ff0 | ||
|
|
e7b02773f5 | ||
|
|
af8b4901d4 | ||
|
|
bf664534cc | ||
|
|
4ae045d704 |
2
.github/workflows/publish.yaml
vendored
2
.github/workflows/publish.yaml
vendored
@@ -5,7 +5,7 @@ on:
|
||||
inputs:
|
||||
gitref:
|
||||
type: string
|
||||
description: "what git ref to build"
|
||||
description: "what git tag to build (e.g. v0.0.74)"
|
||||
required: true
|
||||
|
||||
jobs:
|
||||
|
||||
46
CHANGELOG.md
46
CHANGELOG.md
@@ -9,6 +9,34 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Added
|
||||
|
||||
- Added call hang-up error handling in `TwilioFrameSerializer`, which handles
|
||||
the case where the user has hung up before the `TwilioFrameSerializer` hangs
|
||||
up the call.
|
||||
|
||||
### Changed
|
||||
|
||||
- The `UserIdleProcessor` now handles the scenario where function calls take
|
||||
longer than the idle timeout duration. This allows you to use the
|
||||
`UserIdleProcessor` in conjunction with function calls that take a while to
|
||||
return a result.
|
||||
|
||||
### Performance
|
||||
|
||||
- Remove unncessary push task in each `FrameProcessor`.
|
||||
|
||||
## [0.0.74] - 2025-07-03
|
||||
|
||||
### Added
|
||||
|
||||
- Added a new STT service, `SpeechmaticsSTTService`. This service provides
|
||||
real-time speech-to-text transcription using the Speechmatics API. It supports
|
||||
partial and final transcriptions, multiple languages, various audio formats,
|
||||
and speaker diarization.
|
||||
|
||||
- Added `normalize` and `model_id` to `FishAudioTTSService`.
|
||||
|
||||
- Added `http_options` argument to `GoogleLLMService`.
|
||||
|
||||
- Added `run_llm` field to `LLMMessagesAppendFrame` and `LLMMessagesUpdateFrame`
|
||||
frames. If true, a context frame will be pushed triggering the LLM to respond.
|
||||
|
||||
@@ -50,9 +78,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
tools = ToolsSchema(standard_tools=[do_something])
|
||||
```
|
||||
|
||||
- `user_id` is now populated in the `TranscriptionFrame` and
|
||||
`InterimTranscriptionFrame` when using a transport that provides a
|
||||
`user_id`, like `DailyTransport` or `LiveKitTransport`.
|
||||
- `user_id` is now populated in the `TranscriptionFrame` and
|
||||
`InterimTranscriptionFrame` when using a transport that provides a `user_id`,
|
||||
like `DailyTransport` or `LiveKitTransport`.
|
||||
|
||||
- Added `watchdog_coroutine()`. This is a watchdog helper for couroutines. So,
|
||||
if you have a coroutine that is waiting for a result and that takes a long
|
||||
@@ -61,6 +89,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
- Added `session_token` parameter to `AWSNovaSonicLLMService`.
|
||||
|
||||
- Added Gemini Multimodal Live File API for uploading, fetching, listing, and
|
||||
deleting files. See `26f-gemini-multimodal-live-files-api.py` for example usage.
|
||||
|
||||
### Changed
|
||||
|
||||
- Updated all the services to use the new `SOXRStreamAudioResampler`, ensuring smooth
|
||||
@@ -72,7 +103,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed an issue where audio would get stuck in the queue when an interrupt occurs
|
||||
- Fixed an issue where audio would get stuck in the queue when an interrupt occurs
|
||||
during Azure TTS synthesis.
|
||||
|
||||
- Fixed a race condition that occurs in Python 3.10+ where the task could miss
|
||||
@@ -80,6 +111,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
- Fixed a `AWSNovaSonicLLMService` issue introduced in 0.0.72.
|
||||
|
||||
### Deprecated
|
||||
|
||||
- In `FishAudioTTSService`, deprecated `model` and replaced with
|
||||
`reference_id`. This change is to better align with Fish Audio's variable
|
||||
naming and to reduce confusion about what functionality the variable
|
||||
controls.
|
||||
|
||||
## [0.0.73] - 2025-06-26
|
||||
|
||||
### Fixed
|
||||
|
||||
26
README.md
26
README.md
@@ -51,19 +51,19 @@ You can connect to Pipecat from any platform using our official SDKs:
|
||||
|
||||
## 🧩 Available services
|
||||
|
||||
| Category | Services |
|
||||
| ------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
|
||||
| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), [AWS](https://docs.pipecat.ai/server/services/stt/aws), [Azure](https://docs.pipecat.ai/server/services/stt/azure), [Cartesia](https://docs.pipecat.ai/server/services/stt/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [Fal Wizper](https://docs.pipecat.ai/server/services/stt/fal), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), [Google](https://docs.pipecat.ai/server/services/stt/google), [Groq (Whisper)](https://docs.pipecat.ai/server/services/stt/groq), [OpenAI (Whisper)](https://docs.pipecat.ai/server/services/stt/openai), [Parakeet (NVIDIA)](https://docs.pipecat.ai/server/services/stt/parakeet), [SambaNova (Whisper)](https://docs.pipecat.ai/server/services/stt/sambanova) [Ultravox](https://docs.pipecat.ai/server/services/stt/ultravox), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) |
|
||||
| LLMs | [Anthropic](https://docs.pipecat.ai/server/services/llm/anthropic), [AWS](https://docs.pipecat.ai/server/services/llm/aws), [Azure](https://docs.pipecat.ai/server/services/llm/azure), [Cerebras](https://docs.pipecat.ai/server/services/llm/cerebras), [DeepSeek](https://docs.pipecat.ai/server/services/llm/deepseek), [Fireworks AI](https://docs.pipecat.ai/server/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/server/services/llm/gemini), [Grok](https://docs.pipecat.ai/server/services/llm/grok), [Groq](https://docs.pipecat.ai/server/services/llm/groq), [NVIDIA NIM](https://docs.pipecat.ai/server/services/llm/nim), [Ollama](https://docs.pipecat.ai/server/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/server/services/llm/openai), [OpenRouter](https://docs.pipecat.ai/server/services/llm/openrouter), [Perplexity](https://docs.pipecat.ai/server/services/llm/perplexity), [Qwen](https://docs.pipecat.ai/server/services/llm/qwen), [SambaNova](https://docs.pipecat.ai/server/services/llm/sambanova) [Together AI](https://docs.pipecat.ai/server/services/llm/together) |
|
||||
| Text-to-Speech | [AWS](https://docs.pipecat.ai/server/services/tts/aws), [Azure](https://docs.pipecat.ai/server/services/tts/azure), [Cartesia](https://docs.pipecat.ai/server/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/tts/elevenlabs), [FastPitch (NVIDIA)](https://docs.pipecat.ai/server/services/tts/fastpitch), [Fish](https://docs.pipecat.ai/server/services/tts/fish), [Google](https://docs.pipecat.ai/server/services/tts/google), [LMNT](https://docs.pipecat.ai/server/services/tts/lmnt), [MiniMax](https://docs.pipecat.ai/server/services/tts/minimax), [Neuphonic](https://docs.pipecat.ai/server/services/tts/neuphonic), [OpenAI](https://docs.pipecat.ai/server/services/tts/openai), [Piper](https://docs.pipecat.ai/server/services/tts/piper), [PlayHT](https://docs.pipecat.ai/server/services/tts/playht), [Rime](https://docs.pipecat.ai/server/services/tts/rime), [Sarvam](https://docs.pipecat.ai/server/services/tts/sarvam), [XTTS](https://docs.pipecat.ai/server/services/tts/xtts) |
|
||||
| Speech-to-Speech | [AWS Nova Sonic](https://docs.pipecat.ai/server/services/s2s/aws), [Gemini Multimodal Live](https://docs.pipecat.ai/server/services/s2s/gemini), [OpenAI Realtime](https://docs.pipecat.ai/server/services/s2s/openai) |
|
||||
| Transport | [Daily (WebRTC)](https://docs.pipecat.ai/server/services/transport/daily), [FastAPI Websocket](https://docs.pipecat.ai/server/services/transport/fastapi-websocket), [SmallWebRTCTransport](https://docs.pipecat.ai/server/services/transport/small-webrtc), [WebSocket Server](https://docs.pipecat.ai/server/services/transport/websocket-server), Local |
|
||||
| Serializers | [Plivo](https://docs.pipecat.ai/server/utilities/serializers/plivo), [Twilio](https://docs.pipecat.ai/server/utilities/serializers/twilio), [Telnyx](https://docs.pipecat.ai/server/utilities/serializers/telnyx) |
|
||||
| Video | [Tavus](https://docs.pipecat.ai/server/services/video/tavus), [Simli](https://docs.pipecat.ai/server/services/video/simli) |
|
||||
| Memory | [mem0](https://docs.pipecat.ai/server/services/memory/mem0) |
|
||||
| Vision & Image | [fal](https://docs.pipecat.ai/server/services/image-generation/fal), [Google Imagen](https://docs.pipecat.ai/server/services/image-generation/fal), [Moondream](https://docs.pipecat.ai/server/services/vision/moondream) |
|
||||
| Audio Processing | [Silero VAD](https://docs.pipecat.ai/server/utilities/audio/silero-vad-analyzer), [Krisp](https://docs.pipecat.ai/server/utilities/audio/krisp-filter), [Koala](https://docs.pipecat.ai/server/utilities/audio/koala-filter), [Noisereduce](https://docs.pipecat.ai/server/utilities/audio/noisereduce-filter) |
|
||||
| Analytics & Metrics | [OpenTelemetry](https://docs.pipecat.ai/server/utilities/opentelemetry), [Sentry](https://docs.pipecat.ai/server/services/analytics/sentry) |
|
||||
| Category | Services |
|
||||
| ------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
|
||||
| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), [AWS](https://docs.pipecat.ai/server/services/stt/aws), [Azure](https://docs.pipecat.ai/server/services/stt/azure), [Cartesia](https://docs.pipecat.ai/server/services/stt/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [Fal Wizper](https://docs.pipecat.ai/server/services/stt/fal), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), [Google](https://docs.pipecat.ai/server/services/stt/google), [Groq (Whisper)](https://docs.pipecat.ai/server/services/stt/groq), [OpenAI (Whisper)](https://docs.pipecat.ai/server/services/stt/openai), [Parakeet (NVIDIA)](https://docs.pipecat.ai/server/services/stt/parakeet), [SambaNova (Whisper)](https://docs.pipecat.ai/server/services/stt/sambanova), [Speechmatics](https://docs.pipecat.ai/server/services/stt/speechmatics), [Ultravox](https://docs.pipecat.ai/server/services/stt/ultravox), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) |
|
||||
| LLMs | [Anthropic](https://docs.pipecat.ai/server/services/llm/anthropic), [AWS](https://docs.pipecat.ai/server/services/llm/aws), [Azure](https://docs.pipecat.ai/server/services/llm/azure), [Cerebras](https://docs.pipecat.ai/server/services/llm/cerebras), [DeepSeek](https://docs.pipecat.ai/server/services/llm/deepseek), [Fireworks AI](https://docs.pipecat.ai/server/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/server/services/llm/gemini), [Grok](https://docs.pipecat.ai/server/services/llm/grok), [Groq](https://docs.pipecat.ai/server/services/llm/groq), [NVIDIA NIM](https://docs.pipecat.ai/server/services/llm/nim), [Ollama](https://docs.pipecat.ai/server/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/server/services/llm/openai), [OpenRouter](https://docs.pipecat.ai/server/services/llm/openrouter), [Perplexity](https://docs.pipecat.ai/server/services/llm/perplexity), [Qwen](https://docs.pipecat.ai/server/services/llm/qwen), [SambaNova](https://docs.pipecat.ai/server/services/llm/sambanova) [Together AI](https://docs.pipecat.ai/server/services/llm/together) |
|
||||
| Text-to-Speech | [AWS](https://docs.pipecat.ai/server/services/tts/aws), [Azure](https://docs.pipecat.ai/server/services/tts/azure), [Cartesia](https://docs.pipecat.ai/server/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/tts/elevenlabs), [FastPitch (NVIDIA)](https://docs.pipecat.ai/server/services/tts/fastpitch), [Fish](https://docs.pipecat.ai/server/services/tts/fish), [Google](https://docs.pipecat.ai/server/services/tts/google), [LMNT](https://docs.pipecat.ai/server/services/tts/lmnt), [MiniMax](https://docs.pipecat.ai/server/services/tts/minimax), [Neuphonic](https://docs.pipecat.ai/server/services/tts/neuphonic), [OpenAI](https://docs.pipecat.ai/server/services/tts/openai), [Piper](https://docs.pipecat.ai/server/services/tts/piper), [PlayHT](https://docs.pipecat.ai/server/services/tts/playht), [Rime](https://docs.pipecat.ai/server/services/tts/rime), [Sarvam](https://docs.pipecat.ai/server/services/tts/sarvam), [XTTS](https://docs.pipecat.ai/server/services/tts/xtts) |
|
||||
| Speech-to-Speech | [AWS Nova Sonic](https://docs.pipecat.ai/server/services/s2s/aws), [Gemini Multimodal Live](https://docs.pipecat.ai/server/services/s2s/gemini), [OpenAI Realtime](https://docs.pipecat.ai/server/services/s2s/openai) |
|
||||
| Transport | [Daily (WebRTC)](https://docs.pipecat.ai/server/services/transport/daily), [FastAPI Websocket](https://docs.pipecat.ai/server/services/transport/fastapi-websocket), [SmallWebRTCTransport](https://docs.pipecat.ai/server/services/transport/small-webrtc), [WebSocket Server](https://docs.pipecat.ai/server/services/transport/websocket-server), Local |
|
||||
| Serializers | [Plivo](https://docs.pipecat.ai/server/utilities/serializers/plivo), [Twilio](https://docs.pipecat.ai/server/utilities/serializers/twilio), [Telnyx](https://docs.pipecat.ai/server/utilities/serializers/telnyx) |
|
||||
| Video | [Tavus](https://docs.pipecat.ai/server/services/video/tavus), [Simli](https://docs.pipecat.ai/server/services/video/simli) |
|
||||
| Memory | [mem0](https://docs.pipecat.ai/server/services/memory/mem0) |
|
||||
| Vision & Image | [fal](https://docs.pipecat.ai/server/services/image-generation/fal), [Google Imagen](https://docs.pipecat.ai/server/services/image-generation/fal), [Moondream](https://docs.pipecat.ai/server/services/vision/moondream) |
|
||||
| Audio Processing | [Silero VAD](https://docs.pipecat.ai/server/utilities/audio/silero-vad-analyzer), [Krisp](https://docs.pipecat.ai/server/utilities/audio/krisp-filter), [Koala](https://docs.pipecat.ai/server/utilities/audio/koala-filter), [Noisereduce](https://docs.pipecat.ai/server/utilities/audio/noisereduce-filter) |
|
||||
| Analytics & Metrics | [OpenTelemetry](https://docs.pipecat.ai/server/utilities/opentelemetry), [Sentry](https://docs.pipecat.ai/server/services/analytics/sentry) |
|
||||
|
||||
📚 [View full services documentation →](https://docs.pipecat.ai/server/services/supported-services)
|
||||
|
||||
|
||||
@@ -46,6 +46,7 @@ pipecat-ai[sambanova]
|
||||
pipecat-ai[silero]
|
||||
pipecat-ai[simli]
|
||||
pipecat-ai[soundfile]
|
||||
pipecat-ai[speechmatics]
|
||||
pipecat-ai[tavus]
|
||||
pipecat-ai[together]
|
||||
# pipecat-ai[ultravox] # Mocked
|
||||
|
||||
@@ -109,6 +109,10 @@ MINIMAX_GROUP_ID=...
|
||||
# Sarvam AI
|
||||
SARVAM_API_KEY=...
|
||||
|
||||
# Speechmatics
|
||||
SPEECHMATICS_API_KEY=...
|
||||
|
||||
|
||||
# SambaNova
|
||||
SAMBANOVA_API_KEY=...
|
||||
|
||||
|
||||
153
examples/foundational/07a-interruptible-speechmatics.py
Normal file
153
examples/foundational/07a-interruptible-speechmatics.py
Normal file
@@ -0,0 +1,153 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import argparse
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_response import (
|
||||
LLMUserAggregatorParams,
|
||||
)
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.services.elevenlabs.tts import ElevenLabsTTSService
|
||||
from pipecat.services.openai.base_llm import BaseOpenAILLMService
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.services.speechmatics.stt import SpeechmaticsSTTService
|
||||
from pipecat.transcriptions.language import Language
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketParams
|
||||
from pipecat.transports.services.daily import DailyParams
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
|
||||
# instantiated. The function will be called when the desired transport gets
|
||||
# selected.
|
||||
transport_params = {
|
||||
"daily": lambda: DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
),
|
||||
"twilio": lambda: FastAPIWebsocketParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
),
|
||||
"webrtc": lambda: TransportParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
async def run_example(transport: BaseTransport, _: argparse.Namespace, handle_sigint: bool):
|
||||
"""Run example using Speechmatics STT.
|
||||
|
||||
This example will use diarization within our STT service and output the words spoken by
|
||||
each individual speaker and wrap them with XML tags for the LLM to process. Note the
|
||||
instructions in the system context for the LLM. This greatly improves the conversation
|
||||
experience by allowing the LLM to understand who is speaking in a multi-party call.
|
||||
|
||||
If you do not wish to use diarization, then set the `enable_speaker_diarization` parameter
|
||||
to `False` or omit it altogether. The `text_format` will only be used if diarization is enabled.
|
||||
|
||||
By default, this example will use our ENHANCED operating point, which is optimized for
|
||||
high accuracy. You can change this by setting the `operating_point` parameter to a different
|
||||
value.
|
||||
|
||||
For more information on operating points, see the Speechmatics documentation:
|
||||
https://docs.speechmatics.com/rt-api-ref
|
||||
"""
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
stt = SpeechmaticsSTTService(
|
||||
api_key=os.getenv("SPEECHMATICS_API_KEY"),
|
||||
language=Language.EN,
|
||||
enable_speaker_diarization=True,
|
||||
text_format="<{speaker_id}>{text}</{speaker_id}>",
|
||||
)
|
||||
|
||||
tts = ElevenLabsTTSService(
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY", ""),
|
||||
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
|
||||
model="eleven_turbo_v2_5",
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
params=BaseOpenAILLMService.InputParams(temperature=0.75),
|
||||
)
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": (
|
||||
"You are a helpful British assistant called Alfred. "
|
||||
"Your goal is to demonstrate your capabilities in a succinct way. "
|
||||
"Your output will be converted to audio so don't include special characters in your answers. "
|
||||
"Always include punctuation in your responses. "
|
||||
"Give very short replies - do not give longer replies unless strictly necessary. "
|
||||
"Respond to what the user said in a concise, funny, creative and helpful way. "
|
||||
"Use `<Sn/>` tags to identify different speakers - do not use tags in your replies."
|
||||
),
|
||||
},
|
||||
]
|
||||
|
||||
context = OpenAILLMContext(messages)
|
||||
context_aggregator = llm.create_context_aggregator(
|
||||
context,
|
||||
user_params=LLMUserAggregatorParams(aggregation_timeout=0.005),
|
||||
)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
stt, # STT
|
||||
context_aggregator.user(), # User responses
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
transport.output(), # Transport bot output
|
||||
context_aggregator.assistant(), # Assistant spoken responses
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Say a short hello to the user."})
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info(f"Client disconnected")
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner(handle_sigint=handle_sigint)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from pipecat.examples.run import main
|
||||
|
||||
main(run_example, transport_params=transport_params)
|
||||
@@ -35,7 +35,7 @@ transport_params = {
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
),
|
||||
"twilio": lambda: TransportParams(
|
||||
"twilio": lambda: FastAPIWebsocketParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
|
||||
89
examples/foundational/13h-speechmatics-transcription.py
Normal file
89
examples/foundational/13h-speechmatics-transcription.py
Normal file
@@ -0,0 +1,89 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import argparse
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.frames.frames import Frame, TranscriptionFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineTask
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.services.speechmatics.stt import SpeechmaticsSTTService
|
||||
from pipecat.transcriptions.language import Language
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketParams
|
||||
from pipecat.transports.services.daily import DailyParams
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
class TranscriptionLogger(FrameProcessor):
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, TranscriptionFrame):
|
||||
print(f"Transcription: {frame.text}")
|
||||
|
||||
|
||||
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
|
||||
# instantiated. The function will be called when the desired transport gets
|
||||
# selected.
|
||||
transport_params = {
|
||||
"daily": lambda: DailyParams(audio_in_enabled=True),
|
||||
"twilio": lambda: FastAPIWebsocketParams(audio_in_enabled=True),
|
||||
"webrtc": lambda: TransportParams(audio_in_enabled=True),
|
||||
}
|
||||
|
||||
|
||||
async def run_example(transport: BaseTransport, _: argparse.Namespace, handle_sigint: bool):
|
||||
"""Run example using Speechmatics STT.
|
||||
|
||||
This example will use diarization within our STT service and output the words spoken by
|
||||
each individual speaker and wrap them with XML tags.
|
||||
|
||||
If you do not wish to use diarization, then set the `enable_speaker_diarization` parameter
|
||||
to `False` or omit it altogether. The `text_format` will only be used if diarization is enabled.
|
||||
|
||||
By default, this example will use our ENHANCED operating point, which is optimized for
|
||||
high accuracy. You can change this by setting the `operating_point` parameter to a different
|
||||
value.
|
||||
|
||||
For more information on operating points, see the Speechmatics documentation:
|
||||
https://docs.speechmatics.com/rt-api-ref
|
||||
"""
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
stt = SpeechmaticsSTTService(
|
||||
api_key=os.getenv("SPEECHMATICS_API_KEY"),
|
||||
language=Language.EN,
|
||||
enable_speaker_diarization=True,
|
||||
text_format="<{speaker_id}>{text}</{speaker_id}>",
|
||||
)
|
||||
|
||||
tl = TranscriptionLogger()
|
||||
|
||||
pipeline = Pipeline([transport.input(), stt, tl])
|
||||
|
||||
task = PipelineTask(pipeline)
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info(f"Client disconnected")
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner(handle_sigint=handle_sigint)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from pipecat.examples.run import main
|
||||
|
||||
main(run_example, transport_params=transport_params)
|
||||
@@ -42,7 +42,7 @@ transport_params = {
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
),
|
||||
"twilio": lambda: TransportParams(
|
||||
"twilio": lambda: FastAPIWebsocketParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
|
||||
@@ -33,7 +33,7 @@ transport_params = {
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
),
|
||||
"twilio": lambda: TransportParams(
|
||||
"twilio": lambda: FastAPIWebsocketParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
|
||||
@@ -55,7 +55,7 @@ transport_params = {
|
||||
# endpointing, for now.
|
||||
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.5)),
|
||||
),
|
||||
"twilio": lambda: TransportParams(
|
||||
"twilio": lambda: FastAPIWebsocketParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
# set stop_secs to something roughly similar to the internal setting
|
||||
|
||||
@@ -18,10 +18,10 @@ from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.services.gemini_multimodal_live.gemini import (
|
||||
GeminiMultimodalLiveContext,
|
||||
GeminiMultimodalLiveLLMService,
|
||||
)
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketParams
|
||||
from pipecat.transports.services.daily import DailyParams
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
@@ -24,6 +24,7 @@ from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||
from pipecat.services.llm_service import FunctionCallParams
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketParams
|
||||
from pipecat.transports.services.daily import DailyParams
|
||||
from pipecat.utils.tracing.setup import setup_tracing
|
||||
|
||||
@@ -61,7 +62,7 @@ transport_params = {
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
),
|
||||
"twilio": lambda: TransportParams(
|
||||
"twilio": lambda: FastAPIWebsocketParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
|
||||
@@ -24,6 +24,7 @@ from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||
from pipecat.services.llm_service import FunctionCallParams
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketParams
|
||||
from pipecat.transports.services.daily import DailyParams
|
||||
from pipecat.utils.tracing.setup import setup_tracing
|
||||
|
||||
@@ -58,7 +59,7 @@ transport_params = {
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
),
|
||||
"twilio": lambda: TransportParams(
|
||||
"twilio": lambda: FastAPIWebsocketParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
|
||||
@@ -4,18 +4,6 @@
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""OpenAI Bot Implementation.
|
||||
|
||||
This module implements a chatbot using OpenAI's GPT-4 model for natural language
|
||||
processing. It includes:
|
||||
- Real-time audio/video interaction through Daily
|
||||
- Animated robot avatar
|
||||
- Text-to-speech using ElevenLabs
|
||||
- Support for both English and Spanish
|
||||
|
||||
The bot runs as part of a pipeline that processes audio/video frames and manages
|
||||
the conversation flow.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
@@ -24,150 +12,72 @@ import sys
|
||||
import aiohttp
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
from PIL import Image
|
||||
from runner import configure
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import (
|
||||
BotStartedSpeakingFrame,
|
||||
BotStoppedSpeakingFrame,
|
||||
Frame,
|
||||
OutputImageRawFrame,
|
||||
SpriteFrame,
|
||||
)
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
|
||||
from pipecat.services.elevenlabs.tts import ElevenLabsTTSService
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
from pipecat.transports.services.helpers.daily_rest import (
|
||||
DailyMeetingTokenParams,
|
||||
DailyMeetingTokenProperties,
|
||||
DailyRESTHelper,
|
||||
DailyRoomParams,
|
||||
)
|
||||
|
||||
load_dotenv(override=True)
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
sprites = []
|
||||
script_dir = os.path.dirname(__file__)
|
||||
|
||||
# Load sequential animation frames
|
||||
for i in range(1, 26):
|
||||
# Build the full path to the image file
|
||||
full_path = os.path.join(script_dir, f"assets/robot0{i}.png")
|
||||
# Get the filename without the extension to use as the dictionary key
|
||||
# Open the image and convert it to bytes
|
||||
with Image.open(full_path) as img:
|
||||
sprites.append(OutputImageRawFrame(image=img.tobytes(), size=img.size, format=img.format))
|
||||
|
||||
# Create a smooth animation by adding reversed frames
|
||||
flipped = sprites[::-1]
|
||||
sprites.extend(flipped)
|
||||
|
||||
# Define static and animated states
|
||||
quiet_frame = sprites[0] # Static frame for when bot is listening
|
||||
talking_frame = SpriteFrame(images=sprites) # Animation sequence for when bot is talking
|
||||
|
||||
|
||||
class TalkingAnimation(FrameProcessor):
|
||||
"""Manages the bot's visual animation states.
|
||||
|
||||
Switches between static (listening) and animated (talking) states based on
|
||||
the bot's current speaking status.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self._is_talking = False
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
"""Process incoming frames and update animation state.
|
||||
|
||||
Args:
|
||||
frame: The incoming frame to process
|
||||
direction: The direction of frame flow in the pipeline
|
||||
"""
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
# Switch to talking animation when bot starts speaking
|
||||
if isinstance(frame, BotStartedSpeakingFrame):
|
||||
if not self._is_talking:
|
||||
await self.push_frame(talking_frame)
|
||||
self._is_talking = True
|
||||
# Return to static frame when bot stops speaking
|
||||
elif isinstance(frame, BotStoppedSpeakingFrame):
|
||||
await self.push_frame(quiet_frame)
|
||||
self._is_talking = False
|
||||
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
|
||||
async def main():
|
||||
"""Main bot execution function.
|
||||
|
||||
Sets up and runs the bot pipeline including:
|
||||
- Daily video transport
|
||||
- Speech-to-text and text-to-speech services
|
||||
- Language model integration
|
||||
- Animation processing
|
||||
- RTVI event handling
|
||||
"""
|
||||
"""Main bot execution function."""
|
||||
async with aiohttp.ClientSession() as session:
|
||||
(room_url, token) = await configure(session)
|
||||
daily_rest_helper = DailyRESTHelper(
|
||||
daily_api_key=os.getenv("DAILY_API_KEY"),
|
||||
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
|
||||
aiohttp_session=session,
|
||||
)
|
||||
|
||||
room = await daily_rest_helper.create_room(
|
||||
DailyRoomParams(properties={"enable_prejoin_ui": False})
|
||||
)
|
||||
|
||||
token_params = DailyMeetingTokenParams(
|
||||
properties=DailyMeetingTokenProperties(
|
||||
is_owner=True,
|
||||
permissions={
|
||||
"hasPresence": False, # Example: join as a hidden participant
|
||||
},
|
||||
start_video_off=True,
|
||||
start_audio_off=True,
|
||||
)
|
||||
)
|
||||
|
||||
token = await daily_rest_helper.get_token(room_url=room.url, params=token_params)
|
||||
|
||||
# Set up Daily transport with video/audio parameters
|
||||
transport = DailyTransport(
|
||||
room_url,
|
||||
room.url,
|
||||
token,
|
||||
"Chatbot",
|
||||
DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
video_out_enabled=True,
|
||||
video_out_width=1024,
|
||||
video_out_height=576,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
transcription_enabled=True,
|
||||
#
|
||||
# Spanish
|
||||
#
|
||||
# transcription_settings=DailyTranscriptionSettings(
|
||||
# language="es",
|
||||
# tier="nova",
|
||||
# model="2-general"
|
||||
# )
|
||||
),
|
||||
)
|
||||
|
||||
# Initialize text-to-speech service
|
||||
tts = ElevenLabsTTSService(
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY"),
|
||||
#
|
||||
# English
|
||||
#
|
||||
voice_id="pNInz6obpgDQGcFmaJgB",
|
||||
#
|
||||
# Spanish
|
||||
#
|
||||
# model="eleven_multilingual_v2",
|
||||
# voice_id="gD1IexrzCvsXPHUuT0s3",
|
||||
)
|
||||
|
||||
# Initialize LLM service
|
||||
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
#
|
||||
# English
|
||||
#
|
||||
"content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by introducing yourself.",
|
||||
#
|
||||
# Spanish
|
||||
#
|
||||
# "content": "Eres Chatbot, un amigable y útil robot. Tu objetivo es demostrar tus capacidades de una manera breve. Tus respuestas se convertiran a audio así que nunca no debes incluir caracteres especiales. Contesta a lo que el usuario pregunte de una manera creativa, útil y breve. Empieza por presentarte a ti mismo.",
|
||||
"content": "Summerize the conversation so far in a single sentence.",
|
||||
},
|
||||
]
|
||||
|
||||
@@ -176,8 +86,6 @@ async def main():
|
||||
context = OpenAILLMContext(messages)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
ta = TalkingAnimation()
|
||||
|
||||
#
|
||||
# RTVI events for Pipecat client UI
|
||||
#
|
||||
@@ -189,8 +97,6 @@ async def main():
|
||||
rtvi,
|
||||
context_aggregator.user(),
|
||||
llm,
|
||||
tts,
|
||||
ta,
|
||||
transport.output(),
|
||||
context_aggregator.assistant(),
|
||||
]
|
||||
@@ -204,7 +110,6 @@ async def main():
|
||||
),
|
||||
observers=[RTVIObserver(rtvi)],
|
||||
)
|
||||
await task.queue_frame(quiet_frame)
|
||||
|
||||
@rtvi.event_handler("on_client_ready")
|
||||
async def on_client_ready(rtvi):
|
||||
|
||||
@@ -87,6 +87,7 @@ remote-smart-turn = []
|
||||
silero = [ "onnxruntime~=1.20.1" ]
|
||||
simli = [ "simli-ai~=0.1.10"]
|
||||
soundfile = [ "soundfile~=0.13.0" ]
|
||||
speechmatics = [ "speechmatics-rt>=0.3.1" ]
|
||||
tavus=[]
|
||||
together = []
|
||||
tracing = [ "opentelemetry-sdk>=1.33.0", "opentelemetry-api>=1.33.0", "opentelemetry-instrumentation>=0.54b0" ]
|
||||
|
||||
@@ -49,7 +49,7 @@ python run-release-evals.py -p 07 -a -v
|
||||
You can also run evals for a single example (not part of the release set):
|
||||
|
||||
```sh
|
||||
python run-eval.py YOUR_EXAMPLE_SCRIPT -a -v
|
||||
python run-eval.py -p "A simple math addition" -a -v YOUR_EXAMPLE_SCRIPT
|
||||
```
|
||||
|
||||
Your script needs to follow any of the foundation examples pattern.
|
||||
|
||||
@@ -100,17 +100,18 @@ class EvalRunner:
|
||||
start_time = time.time()
|
||||
|
||||
try:
|
||||
await asyncio.wait(
|
||||
[
|
||||
asyncio.create_task(run_example_pipeline(script_path)),
|
||||
asyncio.create_task(run_eval_pipeline(self, example_file, prompt, eval)),
|
||||
],
|
||||
timeout=90,
|
||||
)
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
tasks = [
|
||||
asyncio.create_task(run_example_pipeline(script_path)),
|
||||
asyncio.create_task(run_eval_pipeline(self, example_file, prompt, eval)),
|
||||
]
|
||||
_, pending = await asyncio.wait(tasks, timeout=90)
|
||||
if pending:
|
||||
logger.error(f"ERROR: Eval timeout expired, cancelling pending tasks...")
|
||||
for task in pending:
|
||||
task.cancel()
|
||||
await asyncio.gather(*pending, return_exceptions=True)
|
||||
except Exception as e:
|
||||
print(f"ERROR: Unable to run {example_file}: {e}")
|
||||
logger.error(f"ERROR: Unable to run {example_file}: {e}")
|
||||
|
||||
try:
|
||||
result = await asyncio.wait_for(self._queue.get(), timeout=1.0)
|
||||
@@ -134,6 +135,7 @@ class EvalRunner:
|
||||
async def save_audio(self, name: str, audio: bytes, sample_rate: int, num_channels: int):
|
||||
if len(audio) > 0:
|
||||
filename = self._recording_file_name(name)
|
||||
logger.debug(f"Saving {name} audio to {filename}")
|
||||
with io.BytesIO() as buffer:
|
||||
with wave.open(buffer, "wb") as wf:
|
||||
wf.setsampwidth(2)
|
||||
@@ -142,7 +144,6 @@ class EvalRunner:
|
||||
wf.writeframes(audio)
|
||||
async with aiofiles.open(filename, "wb") as file:
|
||||
await file.write(buffer.getvalue())
|
||||
logger.debug(f"Saving {name} audio to {filename}")
|
||||
else:
|
||||
logger.warning(f"There's no audio to save for {name}")
|
||||
|
||||
|
||||
@@ -39,6 +39,7 @@ TESTS_07 = [
|
||||
# 07 series
|
||||
("07-interruptible.py", PROMPT_SIMPLE_MATH, None),
|
||||
("07-interruptible-cartesia-http.py", PROMPT_SIMPLE_MATH, None),
|
||||
("07a-interruptible-speechmatics.py", PROMPT_SIMPLE_MATH, None),
|
||||
("07b-interruptible-langchain.py", PROMPT_SIMPLE_MATH, None),
|
||||
("07c-interruptible-deepgram.py", PROMPT_SIMPLE_MATH, None),
|
||||
("07d-interruptible-elevenlabs.py", PROMPT_SIMPLE_MATH, None),
|
||||
|
||||
@@ -64,6 +64,7 @@ class DTMFAggregator(FrameProcessor):
|
||||
|
||||
self._digit_event = asyncio.Event()
|
||||
self._aggregation_task: Optional[asyncio.Task] = None
|
||||
self._interruption_task: Optional[asyncio.Task] = None
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
|
||||
"""Process incoming frames and handle DTMF aggregation.
|
||||
@@ -81,6 +82,7 @@ class DTMFAggregator(FrameProcessor):
|
||||
if self._aggregation:
|
||||
await self._flush_aggregation()
|
||||
await self._stop_aggregation_task()
|
||||
await self._stop_interruption_task()
|
||||
await self.push_frame(frame, direction)
|
||||
elif isinstance(frame, InputDTMFFrame):
|
||||
# Push the DTMF frame downstream first
|
||||
@@ -100,7 +102,7 @@ class DTMFAggregator(FrameProcessor):
|
||||
|
||||
# For first digit, schedule interruption in separate task
|
||||
if is_first_digit:
|
||||
asyncio.create_task(self._send_interruption_task())
|
||||
self._interruption_task = self.create_task(self._send_interruption_task())
|
||||
|
||||
# Check for immediate flush conditions
|
||||
if frame.button == self._termination_digit:
|
||||
@@ -111,12 +113,13 @@ class DTMFAggregator(FrameProcessor):
|
||||
|
||||
async def _send_interruption_task(self):
|
||||
"""Send interruption frame safely in a separate task."""
|
||||
try:
|
||||
# Send the interruption frame
|
||||
await self.push_frame(BotInterruptionFrame(), FrameDirection.UPSTREAM)
|
||||
except Exception as e:
|
||||
# Log error but don't propagate
|
||||
print(f"Error sending interruption: {e}")
|
||||
await self.push_frame(BotInterruptionFrame(), FrameDirection.UPSTREAM)
|
||||
|
||||
async def _stop_interruption_task(self) -> None:
|
||||
"""Stops the interruption task."""
|
||||
if self._interruption_task:
|
||||
await self.cancel_task(self._interruption_task)
|
||||
self._interruption_task = None
|
||||
|
||||
def _create_aggregation_task(self) -> None:
|
||||
"""Creates the aggregation task if it hasn't been created yet."""
|
||||
|
||||
@@ -152,11 +152,6 @@ class FrameProcessor(BaseObject):
|
||||
self.__input_event = None
|
||||
self.__input_frame_task: Optional[asyncio.Task] = None
|
||||
|
||||
# Every processor in Pipecat should only output frames from a single
|
||||
# task. This avoid problems like audio overlapping. System frames are the
|
||||
# exception to this rule. This create this task.
|
||||
self.__push_frame_task: Optional[asyncio.Task] = None
|
||||
|
||||
@property
|
||||
def id(self) -> int:
|
||||
"""Get the unique identifier for this processor.
|
||||
@@ -385,7 +380,6 @@ class FrameProcessor(BaseObject):
|
||||
"""Clean up processor resources."""
|
||||
await super().cleanup()
|
||||
await self.__cancel_input_task()
|
||||
await self.__cancel_push_task()
|
||||
if self._metrics is not None:
|
||||
await self._metrics.cleanup()
|
||||
|
||||
@@ -512,10 +506,7 @@ class FrameProcessor(BaseObject):
|
||||
if not self._check_started(frame):
|
||||
return
|
||||
|
||||
if isinstance(frame, SystemFrame):
|
||||
await self.__internal_push_frame(frame, direction)
|
||||
else:
|
||||
await self.__push_queue.put((frame, direction))
|
||||
await self.__internal_push_frame(frame, direction)
|
||||
|
||||
async def __start(self, frame: StartFrame):
|
||||
"""Handle the start frame to initialize processor state.
|
||||
@@ -530,7 +521,6 @@ class FrameProcessor(BaseObject):
|
||||
self._interruption_strategies = frame.interruption_strategies
|
||||
self._report_only_initial_ttfb = frame.report_only_initial_ttfb
|
||||
self.__create_input_task()
|
||||
self.__create_push_task()
|
||||
|
||||
async def __cancel(self, frame: CancelFrame):
|
||||
"""Handle the cancel frame to stop processor operation.
|
||||
@@ -540,7 +530,6 @@ class FrameProcessor(BaseObject):
|
||||
"""
|
||||
self._cancelling = True
|
||||
await self.__cancel_input_task()
|
||||
await self.__cancel_push_task()
|
||||
|
||||
async def __pause(self, frame: FrameProcessorPauseFrame | FrameProcessorPauseUrgentFrame):
|
||||
"""Handle pause frame to pause processor operation.
|
||||
@@ -567,9 +556,6 @@ class FrameProcessor(BaseObject):
|
||||
async def _start_interruption(self):
|
||||
"""Start handling an interruption by canceling current tasks."""
|
||||
try:
|
||||
# Cancel the push frame task. This will stop pushing frames downstream.
|
||||
await self.__cancel_push_task()
|
||||
|
||||
# Cancel the input task. This will stop processing queued frames.
|
||||
await self.__cancel_input_task()
|
||||
except Exception as e:
|
||||
@@ -579,9 +565,6 @@ class FrameProcessor(BaseObject):
|
||||
# Create a new input queue and task.
|
||||
self.__create_input_task()
|
||||
|
||||
# Create a new output queue and task.
|
||||
self.__create_push_task()
|
||||
|
||||
async def _stop_interruption(self):
|
||||
"""Stop handling an interruption."""
|
||||
# Nothing to do right now.
|
||||
@@ -677,23 +660,3 @@ class FrameProcessor(BaseObject):
|
||||
await self.push_error(ErrorFrame(str(e)))
|
||||
finally:
|
||||
self.__input_queue.task_done()
|
||||
|
||||
def __create_push_task(self):
|
||||
"""Create the frame pushing task."""
|
||||
if not self.__push_frame_task:
|
||||
self.__push_queue = WatchdogQueue(self.task_manager)
|
||||
self.__push_frame_task = self.create_task(self.__push_frame_task_handler())
|
||||
|
||||
async def __cancel_push_task(self):
|
||||
"""Cancel the frame pushing task."""
|
||||
if self.__push_frame_task:
|
||||
self.__push_queue.cancel()
|
||||
await self.cancel_task(self.__push_frame_task)
|
||||
self.__push_frame_task = None
|
||||
|
||||
async def __push_frame_task_handler(self):
|
||||
"""Handle frames from the push queue."""
|
||||
while True:
|
||||
(frame, direction) = await self.__push_queue.get()
|
||||
await self.__internal_push_frame(frame, direction)
|
||||
self.__push_queue.task_done()
|
||||
|
||||
@@ -1005,6 +1005,10 @@ class RTVIProcessor(FrameProcessor):
|
||||
):
|
||||
"""Handle the start of a function call from the LLM.
|
||||
|
||||
.. deprecated:: 0.0.66
|
||||
This method is deprecated and will be removed in a future version.
|
||||
Use `RTVIProcessor.handle_function_call()` instead.
|
||||
|
||||
Args:
|
||||
function_name: Name of the function being called.
|
||||
llm: The LLM processor making the call.
|
||||
|
||||
@@ -15,6 +15,8 @@ from pipecat.frames.frames import (
|
||||
CancelFrame,
|
||||
EndFrame,
|
||||
Frame,
|
||||
FunctionCallInProgressFrame,
|
||||
FunctionCallResultFrame,
|
||||
StartFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
@@ -168,6 +170,13 @@ class UserIdleProcessor(FrameProcessor):
|
||||
self._idle_event.set()
|
||||
elif isinstance(frame, BotSpeakingFrame):
|
||||
self._idle_event.set()
|
||||
elif isinstance(frame, FunctionCallInProgressFrame):
|
||||
# Function calls can take longer than the timeout, so we want to prevent idle callbacks
|
||||
self._interrupted = True
|
||||
self._idle_event.set()
|
||||
elif isinstance(frame, FunctionCallResultFrame):
|
||||
self._interrupted = False
|
||||
self._idle_event.set()
|
||||
|
||||
async def cleanup(self) -> None:
|
||||
"""Cleans up resources when processor is shutting down."""
|
||||
|
||||
@@ -185,8 +185,26 @@ class TwilioFrameSerializer(FrameSerializer):
|
||||
async with session.post(endpoint, auth=auth, data=params) as response:
|
||||
if response.status == 200:
|
||||
logger.info(f"Successfully terminated Twilio call {call_sid}")
|
||||
elif response.status == 404:
|
||||
# Handle the case where the call has already ended
|
||||
# Error code 20404: "The requested resource was not found"
|
||||
# Source: https://www.twilio.com/docs/errors/20404
|
||||
try:
|
||||
error_data = await response.json()
|
||||
if error_data.get("code") == 20404:
|
||||
logger.debug(f"Twilio call {call_sid} was already terminated")
|
||||
return
|
||||
except:
|
||||
pass # Fall through to log the raw error
|
||||
|
||||
# Log other 404 errors
|
||||
error_text = await response.text()
|
||||
logger.error(
|
||||
f"Failed to terminate Twilio call {call_sid}: "
|
||||
f"Status {response.status}, Response: {error_text}"
|
||||
)
|
||||
else:
|
||||
# Get the error details for better debugging
|
||||
# Log other errors
|
||||
error_text = await response.text()
|
||||
logger.error(
|
||||
f"Failed to terminate Twilio call {call_sid}: "
|
||||
|
||||
@@ -58,12 +58,14 @@ class FishAudioTTSService(InterruptibleTTSService):
|
||||
Parameters:
|
||||
language: Language for synthesis. Defaults to English.
|
||||
latency: Latency mode ("normal" or "balanced"). Defaults to "normal".
|
||||
normalize: Whether to normalize audio output. Defaults to True.
|
||||
prosody_speed: Speech speed multiplier (0.5-2.0). Defaults to 1.0.
|
||||
prosody_volume: Volume adjustment in dB. Defaults to 0.
|
||||
"""
|
||||
|
||||
language: Optional[Language] = Language.EN
|
||||
latency: Optional[str] = "normal" # "normal" or "balanced"
|
||||
normalize: Optional[bool] = True
|
||||
prosody_speed: Optional[float] = 1.0 # Speech speed (0.5-2.0)
|
||||
prosody_volume: Optional[int] = 0 # Volume adjustment in dB
|
||||
|
||||
@@ -71,7 +73,9 @@ class FishAudioTTSService(InterruptibleTTSService):
|
||||
self,
|
||||
*,
|
||||
api_key: str,
|
||||
model: str, # This is the reference_id
|
||||
reference_id: Optional[str] = None, # This is the voice ID
|
||||
model: Optional[str] = None, # Deprecated
|
||||
model_id: str = "speech-1.5",
|
||||
output_format: FishAudioOutputFormat = "pcm",
|
||||
sample_rate: Optional[int] = None,
|
||||
params: Optional[InputParams] = None,
|
||||
@@ -81,7 +85,14 @@ class FishAudioTTSService(InterruptibleTTSService):
|
||||
|
||||
Args:
|
||||
api_key: Fish Audio API key for authentication.
|
||||
model: Reference ID of the voice model to use for synthesis.
|
||||
reference_id: Reference ID of the voice model to use for synthesis.
|
||||
model: Deprecated. Reference ID of the voice model to use for synthesis.
|
||||
|
||||
.. deprecated:: 0.0.74
|
||||
The `model` parameter is deprecated and will be removed in version 0.1.0.
|
||||
Use `reference_id` instead to specify the voice model.
|
||||
|
||||
model_id: Specify which Fish Audio TTS model to use (e.g. "speech-1.5")
|
||||
output_format: Audio output format. Defaults to "pcm".
|
||||
sample_rate: Audio sample rate. If None, uses default.
|
||||
params: Additional input parameters for voice customization.
|
||||
@@ -96,6 +107,26 @@ class FishAudioTTSService(InterruptibleTTSService):
|
||||
|
||||
params = params or FishAudioTTSService.InputParams()
|
||||
|
||||
# Validation for model and reference_id parameters
|
||||
if model and reference_id:
|
||||
raise ValueError(
|
||||
"Cannot specify both 'model' and 'reference_id'. Use 'reference_id' only."
|
||||
)
|
||||
|
||||
if model is None and reference_id is None:
|
||||
raise ValueError("Must specify 'reference_id' (or deprecated 'model') parameter.")
|
||||
|
||||
if model:
|
||||
import warnings
|
||||
|
||||
warnings.warn(
|
||||
"Parameter 'model' is deprecated and will be removed in a future version. "
|
||||
"Use 'reference_id' instead.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
reference_id = model
|
||||
|
||||
self._api_key = api_key
|
||||
self._base_url = "wss://api.fish.audio/v1/tts/live"
|
||||
self._websocket = None
|
||||
@@ -107,14 +138,15 @@ class FishAudioTTSService(InterruptibleTTSService):
|
||||
"sample_rate": 0,
|
||||
"latency": params.latency,
|
||||
"format": output_format,
|
||||
"normalize": params.normalize,
|
||||
"prosody": {
|
||||
"speed": params.prosody_speed,
|
||||
"volume": params.prosody_volume,
|
||||
},
|
||||
"reference_id": model,
|
||||
"reference_id": reference_id,
|
||||
}
|
||||
|
||||
self.set_model_name(model)
|
||||
self.set_model_name(model_id)
|
||||
|
||||
def can_generate_metrics(self) -> bool:
|
||||
"""Check if this service can generate processing metrics.
|
||||
@@ -125,14 +157,15 @@ class FishAudioTTSService(InterruptibleTTSService):
|
||||
return True
|
||||
|
||||
async def set_model(self, model: str):
|
||||
"""Set the TTS model (reference ID).
|
||||
"""Set the TTS model and reconnect.
|
||||
|
||||
Args:
|
||||
model: The reference ID of the voice model to use.
|
||||
model: The model name to use for synthesis.
|
||||
"""
|
||||
self._settings["reference_id"] = model
|
||||
await super().set_model(model)
|
||||
logger.info(f"Switching TTS model to: [{model}]")
|
||||
await self._disconnect()
|
||||
await self._connect()
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
"""Start the Fish Audio TTS service.
|
||||
@@ -182,6 +215,7 @@ class FishAudioTTSService(InterruptibleTTSService):
|
||||
|
||||
logger.debug("Connecting to Fish Audio")
|
||||
headers = {"Authorization": f"Bearer {self._api_key}"}
|
||||
headers["model"] = self.model_name
|
||||
self._websocket = await websockets.connect(self._base_url, extra_headers=headers)
|
||||
|
||||
# Send initial start message with ormsgpack
|
||||
|
||||
@@ -572,9 +572,6 @@ class GeminiMultimodalLiveLLMService(LLMService):
|
||||
# Initialize the File API client
|
||||
self.file_api = GeminiFileAPI(api_key=api_key, base_url=file_api_base_url)
|
||||
|
||||
# Initialize the File API client
|
||||
self.file_api = GeminiFileAPI(api_key=api_key, base_url=file_api_base_url)
|
||||
|
||||
def can_generate_metrics(self) -> bool:
|
||||
"""Check if the service can generate usage metrics.
|
||||
|
||||
|
||||
@@ -68,6 +68,7 @@ try:
|
||||
FunctionCall,
|
||||
FunctionResponse,
|
||||
GenerateContentConfig,
|
||||
HttpOptions,
|
||||
Part,
|
||||
)
|
||||
except ModuleNotFoundError as e:
|
||||
@@ -678,6 +679,7 @@ class GoogleLLMService(LLMService):
|
||||
system_instruction: Optional[str] = None,
|
||||
tools: Optional[List[Dict[str, Any]]] = None,
|
||||
tool_config: Optional[Dict[str, Any]] = None,
|
||||
http_options: Optional[HttpOptions] = None,
|
||||
**kwargs,
|
||||
):
|
||||
"""Initialize the Google LLM service.
|
||||
@@ -689,6 +691,7 @@ class GoogleLLMService(LLMService):
|
||||
system_instruction: System instruction/prompt for the model.
|
||||
tools: List of available tools/functions.
|
||||
tool_config: Configuration for tool usage.
|
||||
http_options: HTTP options for the client.
|
||||
**kwargs: Additional arguments passed to parent class.
|
||||
"""
|
||||
super().__init__(**kwargs)
|
||||
@@ -698,7 +701,8 @@ class GoogleLLMService(LLMService):
|
||||
self.set_model_name(model)
|
||||
self._api_key = api_key
|
||||
self._system_instruction = system_instruction
|
||||
self._create_client(api_key)
|
||||
self._http_options = http_options
|
||||
self._create_client(api_key, http_options)
|
||||
self._settings = {
|
||||
"max_tokens": params.max_tokens,
|
||||
"temperature": params.temperature,
|
||||
@@ -717,6 +721,9 @@ class GoogleLLMService(LLMService):
|
||||
"""
|
||||
return True
|
||||
|
||||
def _create_client(self, api_key: str, http_options: Optional[HttpOptions] = None):
|
||||
self._client = genai.Client(api_key=api_key, http_options=http_options)
|
||||
|
||||
def needs_mcp_alternate_schema(self) -> bool:
|
||||
"""Check if this LLM service requires alternate MCP schema.
|
||||
|
||||
@@ -728,9 +735,6 @@ class GoogleLLMService(LLMService):
|
||||
"""
|
||||
return True
|
||||
|
||||
def _create_client(self, api_key: str):
|
||||
self._client = genai.Client(api_key=api_key)
|
||||
|
||||
def _maybe_unset_thinking_budget(self, generation_params: Dict[str, Any]):
|
||||
try:
|
||||
# There's no way to introspect on model capabilities, so
|
||||
|
||||
5
src/pipecat/services/speechmatics/__init__.py
Normal file
5
src/pipecat/services/speechmatics/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
813
src/pipecat/services/speechmatics/stt.py
Normal file
813
src/pipecat/services/speechmatics/stt.py
Normal file
@@ -0,0 +1,813 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""Speechmatics STT service integration."""
|
||||
|
||||
import asyncio
|
||||
import datetime
|
||||
import re
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any, AsyncGenerator, Optional
|
||||
from urllib.parse import urlencode
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
CancelFrame,
|
||||
EndFrame,
|
||||
Frame,
|
||||
InterimTranscriptionFrame,
|
||||
StartFrame,
|
||||
TranscriptionFrame,
|
||||
)
|
||||
from pipecat.services.stt_service import STTService
|
||||
from pipecat.transcriptions.language import Language
|
||||
from pipecat.utils.tracing.service_decorators import traced_stt
|
||||
|
||||
try:
|
||||
from speechmatics.rt import (
|
||||
AsyncClient,
|
||||
AudioEncoding,
|
||||
AudioFormat,
|
||||
ConversationConfig,
|
||||
OperatingPoint,
|
||||
ServerMessageType,
|
||||
SpeakerDiarizationConfig,
|
||||
TranscriptionConfig,
|
||||
__version__,
|
||||
)
|
||||
except ModuleNotFoundError as e:
|
||||
logger.error(f"Exception: {e}")
|
||||
logger.error(
|
||||
"In order to use Speechmatics, you need to `pip install pipecat-ai[speechmatics]`."
|
||||
)
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
|
||||
class AudioBuffer:
|
||||
"""Audio buffer for STT clients.
|
||||
|
||||
The Python SDK expects audio in a pre-defined number of frames. This
|
||||
buffer will accumulate the data from the pipeline and provide it to the
|
||||
STT client in the correct lengths, waiting for the number of frames to
|
||||
be available.
|
||||
"""
|
||||
|
||||
def __init__(self, maxsize: int = 0):
|
||||
"""Initialize the audio buffer.
|
||||
|
||||
Args:
|
||||
maxsize: Maximum size of the buffer.
|
||||
"""
|
||||
self._queue = asyncio.Queue(maxsize=maxsize)
|
||||
self._current_chunk = b""
|
||||
self._position = 0
|
||||
self._closed = False
|
||||
|
||||
def write_audio(self, data: bytes) -> None:
|
||||
"""Write audio data to the buffer (thread-safe).
|
||||
|
||||
Args:
|
||||
data: Audio data to write.
|
||||
"""
|
||||
if data:
|
||||
try:
|
||||
self._queue.put_nowait(data)
|
||||
except asyncio.QueueFull:
|
||||
pass
|
||||
|
||||
async def read(self, size: int) -> bytes:
|
||||
"""Read exactly `size` bytes from the buffer (thread-safe).
|
||||
|
||||
This process will block until the required number of bytes are available
|
||||
in the buffer. Audio is received from the pipeline in varying sizes, so
|
||||
this buffer will accumulate the data and provide it to the STT client in
|
||||
the correct lengths, waiting for the number of frames to be available.
|
||||
|
||||
Calling stop() will close the buffer and release the blocking read
|
||||
process.
|
||||
|
||||
Args:
|
||||
size: Number of bytes to read.
|
||||
|
||||
Returns:
|
||||
bytes: Audio data read from the buffer.
|
||||
"""
|
||||
result = b""
|
||||
bytes_needed = size
|
||||
|
||||
while bytes_needed > 0 and not self._closed:
|
||||
# Use data from current chunk if available
|
||||
if self._position < len(self._current_chunk):
|
||||
available = len(self._current_chunk) - self._position
|
||||
take = min(bytes_needed, available)
|
||||
result += self._current_chunk[self._position : self._position + take]
|
||||
self._position += take
|
||||
bytes_needed -= take
|
||||
continue
|
||||
|
||||
# Get next chunk
|
||||
try:
|
||||
chunk = await asyncio.wait_for(self._queue.get(), timeout=0.1)
|
||||
if chunk is None:
|
||||
continue
|
||||
self._current_chunk = chunk
|
||||
self._position = 0
|
||||
except asyncio.TimeoutError:
|
||||
await asyncio.sleep(0)
|
||||
continue
|
||||
|
||||
return result
|
||||
|
||||
def stop(self) -> None:
|
||||
"""Close the audio buffer."""
|
||||
self._closed = True
|
||||
|
||||
|
||||
@dataclass
|
||||
class SpeechFragment:
|
||||
"""Fragment of an utterance.
|
||||
|
||||
Parameters:
|
||||
start_time: Start time of the fragment in seconds (from session start).
|
||||
end_time: End time of the fragment in seconds (from session start).
|
||||
language: Language of the fragment. Defaults to `Language.EN`.
|
||||
is_eos: Whether the fragment is the end of a sentence. Defaults to `False`.
|
||||
is_final: Whether the fragment is the final fragment. Defaults to `False`.
|
||||
attaches_to: Whether the fragment attaches to the previous or next fragment (punctuation). Defaults to empty string.
|
||||
content: Content of the fragment. Defaults to empty string.
|
||||
speaker: Speaker of the fragment (if diarization is enabled). Defaults to `None`.
|
||||
confidence: Confidence of the fragment (0.0 to 1.0). Defaults to `1.0`.
|
||||
result: Raw result of the fragment from the TTS.
|
||||
"""
|
||||
|
||||
start_time: float
|
||||
end_time: float
|
||||
language: Language = Language.EN
|
||||
is_eos: bool = False
|
||||
is_final: bool = False
|
||||
attaches_to: str = ""
|
||||
content: str = ""
|
||||
speaker: Optional[str] = None
|
||||
confidence: float = 1.0
|
||||
result: Optional[Any] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class SpeakerFragments:
|
||||
"""SpeechFragment items grouped by speaker_id.
|
||||
|
||||
Parameters:
|
||||
speaker_id: The ID of the speaker.
|
||||
timestamp: The timestamp of the frame.
|
||||
language: The language of the frame.
|
||||
fragments: The list of SpeechFragment items.
|
||||
"""
|
||||
|
||||
speaker_id: Optional[str] = None
|
||||
timestamp: Optional[str] = None
|
||||
language: Optional[Language] = None
|
||||
fragments: list[SpeechFragment] = field(default_factory=list)
|
||||
|
||||
def __str__(self):
|
||||
"""Return a string representation of the object."""
|
||||
return f"SpeakerFragments(speaker_id: {self.speaker_id}, timestamp: {self.timestamp}, language: {self.language}, text: {self._format_text()})"
|
||||
|
||||
def _format_text(self, format: Optional[str] = None) -> str:
|
||||
"""Wrap text with speaker ID in an optional f-string format.
|
||||
|
||||
Args:
|
||||
format: Format to wrap the text with.
|
||||
|
||||
Returns:
|
||||
str: The wrapped text.
|
||||
"""
|
||||
# Cumulative contents
|
||||
content = ""
|
||||
|
||||
# Assemble the text
|
||||
for frag in self.fragments:
|
||||
if content == "" or frag.attaches_to == "previous":
|
||||
content += frag.content
|
||||
else:
|
||||
content += " " + frag.content
|
||||
|
||||
# Format the text, if format is provided
|
||||
if format is None or self.speaker_id is None:
|
||||
return content
|
||||
return format.format(**{"speaker_id": self.speaker_id, "text": content})
|
||||
|
||||
def _as_frame_attributes(self, format: Optional[str] = None) -> dict[str, Any]:
|
||||
"""Return a dictionary of attributes for a TranscriptionFrame.
|
||||
|
||||
Args:
|
||||
format: Format to wrap the text with.
|
||||
|
||||
Returns:
|
||||
dict[str, Any]: The dictionary of attributes.
|
||||
"""
|
||||
return {
|
||||
"text": self._format_text(format),
|
||||
"user_id": self.speaker_id,
|
||||
"timestamp": self.timestamp,
|
||||
"language": self.language,
|
||||
"result": [frag.result for frag in self.fragments],
|
||||
}
|
||||
|
||||
|
||||
class SpeechmaticsSTTService(STTService):
|
||||
"""Speechmatics STT service implementation.
|
||||
|
||||
This service provides real-time speech-to-text transcription using the Speechmatics API.
|
||||
It supports partial and final transcriptions, multiple languages, various audio formats,
|
||||
and speaker diarization.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
api_key: str,
|
||||
language: Optional[Language] = None,
|
||||
language_code: Optional[str] = None,
|
||||
base_url: str = "wss://eu2.rt.speechmatics.com/v2",
|
||||
domain: Optional[str] = None,
|
||||
output_locale: Optional[Language] = None,
|
||||
output_locale_code: Optional[str] = None,
|
||||
enable_partials: bool = True,
|
||||
max_delay: float = 1.5,
|
||||
sample_rate: Optional[int] = 16000,
|
||||
chunk_size: int = 256,
|
||||
audio_encoding: AudioEncoding = AudioEncoding.PCM_S16LE,
|
||||
end_of_utterance_silence_trigger: float = 0.5,
|
||||
operating_point: OperatingPoint = OperatingPoint.ENHANCED,
|
||||
enable_speaker_diarization: bool = False,
|
||||
text_format: str = "<{speaker_id}>{text}</{speaker_id}>",
|
||||
max_speakers: Optional[int] = None,
|
||||
transcription_config: Optional[TranscriptionConfig] = None,
|
||||
**kwargs,
|
||||
):
|
||||
"""Initialize the Speechmatics STT service.
|
||||
|
||||
Args:
|
||||
api_key: Speechmatics API key for authentication.
|
||||
language: Language code for transcription. Defaults to `None`.
|
||||
language_code: Language code string for transcription. Defaults to `None`.
|
||||
base_url: Base URL for Speechmatics API. Defaults to `wss://eu2.rt.speechmatics.com/v2`.
|
||||
domain: Domain for Speechmatics API. Defaults to `None`.
|
||||
output_locale: Output locale for transcription, e.g. `Language.EN_GB`. Defaults to `None`.
|
||||
output_locale_code: Output locale code for transcription. Defaults to `None`.
|
||||
enable_partials: Enable partial transcription results. Defaults to `True`.
|
||||
max_delay: Maximum delay for transcription in seconds. Defaults to `1.5`.
|
||||
sample_rate: Audio sample rate in Hz. Defaults to `16000`.
|
||||
chunk_size: Audio chunk size for streaming. Defaults to `256`.
|
||||
audio_encoding: Audio encoding format. Defaults to `pcm_s16le`.
|
||||
end_of_utterance_silence_trigger: Silence duration in seconds to trigger end of utterance detection. Defaults to `0.5`.
|
||||
operating_point: Operating point for transcription accuracy vs. latency tradeoff. Defaults to `enhanced`.
|
||||
enable_speaker_diarization: Enable speaker diarization to identify different speakers. Defaults to `False`.
|
||||
text_format: Wrapper for speaker ID. Defaults to `<{speaker_id}>{text}</{speaker_id}>`.
|
||||
max_speakers: Maximum number of speakers to detect. Defaults to `None` (auto-detect).
|
||||
transcription_config: Custom transcription configuration (other set parameters are merged). Defaults to `None`.
|
||||
**kwargs: Additional arguments passed to STTService.
|
||||
"""
|
||||
super().__init__(sample_rate=sample_rate, **kwargs)
|
||||
|
||||
# Client configuration
|
||||
self._api_key: str = api_key
|
||||
self._language: Optional[Language] = language
|
||||
self._language_code: Optional[str] = language_code
|
||||
self._base_url: str = base_url
|
||||
self._domain: Optional[str] = domain
|
||||
self._output_locale: Optional[Language] = output_locale
|
||||
self._output_locale_code: Optional[str] = output_locale_code
|
||||
self._enable_partials: bool = enable_partials
|
||||
self._max_delay: float = max_delay
|
||||
self._sample_rate: int = sample_rate
|
||||
self._chunk_size: int = chunk_size
|
||||
self._audio_encoding: AudioEncoding = audio_encoding
|
||||
self._end_of_utterance_silence_trigger: Optional[float] = end_of_utterance_silence_trigger
|
||||
self._operating_point: OperatingPoint = operating_point
|
||||
self._enable_speaker_diarization: bool = enable_speaker_diarization
|
||||
self._text_format: str = text_format
|
||||
self._max_speakers: Optional[int] = max_speakers
|
||||
|
||||
# Check we have required attributes
|
||||
if not self._api_key:
|
||||
raise ValueError("Missing Speechmatics API key")
|
||||
if not self._base_url:
|
||||
raise ValueError("Missing Speechmatics base URL")
|
||||
|
||||
# Validate the language code
|
||||
if self._language and self._language_code:
|
||||
raise ValueError("Language and language code cannot both be specified")
|
||||
elif self._language:
|
||||
self._language_code = _language_to_speechmatics_language(self._language)
|
||||
|
||||
# Validate the output locale code
|
||||
if self._output_locale and self._output_locale_code:
|
||||
raise ValueError("Output locale and output locale code cannot both be specified")
|
||||
elif self._output_locale:
|
||||
self._output_locale_code = _locale_to_speechmatics_locale(
|
||||
self._language_code, self._output_locale
|
||||
)
|
||||
|
||||
# Complete configuration objects
|
||||
self._transcription_config: TranscriptionConfig = None
|
||||
self._process_config(transcription_config)
|
||||
|
||||
# STT client
|
||||
self._client: Optional[AsyncClient] = None
|
||||
self._client_task: Optional[asyncio.Task] = None
|
||||
self._audio_buffer: AudioBuffer = AudioBuffer(maxsize=10)
|
||||
self._start_time: Optional[datetime.datetime] = None
|
||||
|
||||
# Current utterance speech data
|
||||
self._speech_fragments: list[SpeechFragment] = []
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
"""Called when the new session starts."""
|
||||
await super().start(frame)
|
||||
await self._connect()
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
"""Called when the session ends."""
|
||||
await super().stop(frame)
|
||||
await self._disconnect()
|
||||
|
||||
async def cancel(self, frame: CancelFrame):
|
||||
"""Called when the session is cancelled."""
|
||||
await super().cancel(frame)
|
||||
await self._disconnect()
|
||||
|
||||
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
|
||||
"""Adds audio to the audio buffer and yields None."""
|
||||
self._audio_buffer.write_audio(audio)
|
||||
yield None
|
||||
|
||||
async def _run_client(self) -> None:
|
||||
"""Runs the Speechmatics client in a thread."""
|
||||
await self._client.transcribe(
|
||||
self._audio_buffer,
|
||||
transcription_config=self._transcription_config,
|
||||
audio_format=AudioFormat(
|
||||
encoding=self._audio_encoding,
|
||||
sample_rate=self.sample_rate,
|
||||
chunk_size=self._chunk_size,
|
||||
),
|
||||
)
|
||||
|
||||
async def _connect(self) -> None:
|
||||
"""Connect to the STT service."""
|
||||
# Create new STT RT client
|
||||
self._client = AsyncClient(
|
||||
api_key=self._api_key,
|
||||
url=_get_endpoint_url(self._base_url),
|
||||
)
|
||||
|
||||
# Log the event
|
||||
logger.debug("Connected to Speechmatics STT service")
|
||||
|
||||
# Recognition started event
|
||||
@self._client.on(ServerMessageType.RECOGNITION_STARTED)
|
||||
def _evt_on_recognition_started(message: dict[str, Any]):
|
||||
logger.debug(f"Recognition started (session: {message.get('id')})")
|
||||
self._start_time = datetime.datetime.now(datetime.timezone.utc)
|
||||
|
||||
# Partial transcript event
|
||||
@self._client.on(ServerMessageType.ADD_PARTIAL_TRANSCRIPT)
|
||||
def _evt_on_partial_transcript(message: dict[str, Any]):
|
||||
self._handle_transcript(message, is_final=False)
|
||||
|
||||
# Final transcript event
|
||||
@self._client.on(ServerMessageType.ADD_TRANSCRIPT)
|
||||
def _evt_on_final_transcript(message: dict[str, Any]):
|
||||
self._handle_transcript(message, is_final=True)
|
||||
|
||||
# End of Utterance
|
||||
@self._client.on(ServerMessageType.END_OF_UTTERANCE)
|
||||
def _evt_on_end_of_utterance(message: dict[str, Any]):
|
||||
logger.debug("End of utterance received from STT")
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
self._send_frames(finalized=True), self.get_event_loop()
|
||||
)
|
||||
|
||||
# Start the client in a thread
|
||||
self._client_task = self.create_task(self._run_client())
|
||||
|
||||
async def _disconnect(self) -> None:
|
||||
"""Disconnect from the STT service."""
|
||||
# Stop the audio buffer
|
||||
self._audio_buffer.stop()
|
||||
|
||||
# Disconnect the client
|
||||
try:
|
||||
if self._client:
|
||||
await asyncio.wait_for(self._client.close(), timeout=1.0)
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning("Timeout while closing Speechmatics client connection")
|
||||
except Exception as e:
|
||||
logger.error(f"Error closing Speechmatics client: {e}")
|
||||
finally:
|
||||
self._client = None
|
||||
|
||||
# Cancel the client task
|
||||
if self._client_task:
|
||||
await self.cancel_task(self._client_task)
|
||||
self._client_task = None
|
||||
|
||||
# Log the event
|
||||
logger.debug("Disconnected from Speechmatics STT service")
|
||||
|
||||
def _process_config(self, transcription_config: Optional[TranscriptionConfig] = None) -> None:
|
||||
"""Create a formatted STT transcription config.
|
||||
|
||||
This takes an optional TranscriptionConfig object and populates it with the
|
||||
values from the STT service. Individual parameters take priority over those
|
||||
within the config object.
|
||||
|
||||
Args:
|
||||
transcription_config: Optional transcription config to use.
|
||||
"""
|
||||
# Transcription config
|
||||
if not transcription_config:
|
||||
transcription_config = TranscriptionConfig(
|
||||
language=self._language_code or "en",
|
||||
domain=self._domain,
|
||||
output_locale=self._output_locale_code,
|
||||
operating_point=self._operating_point,
|
||||
diarization="speaker" if self._enable_speaker_diarization else None,
|
||||
enable_partials=self._enable_partials,
|
||||
max_delay=self._max_delay or 2.0,
|
||||
)
|
||||
else:
|
||||
if self._language_code:
|
||||
transcription_config.language = self._language_code
|
||||
if self._domain:
|
||||
transcription_config.domain = self._domain
|
||||
if self._output_locale_code:
|
||||
transcription_config.output_locale = self._output_locale_code
|
||||
if self._operating_point:
|
||||
transcription_config.operating_point = self._operating_point
|
||||
if self._enable_speaker_diarization:
|
||||
transcription_config.diarization = "speaker"
|
||||
if self._enable_partials:
|
||||
transcription_config.enable_partials = self._enable_partials
|
||||
if self._max_delay:
|
||||
transcription_config.max_delay = self._max_delay
|
||||
|
||||
# Diarization
|
||||
if self._enable_speaker_diarization and self._max_speakers:
|
||||
transcription_config.speaker_diarization_config = SpeakerDiarizationConfig(
|
||||
max_speakers=self._max_speakers,
|
||||
)
|
||||
|
||||
# End of Utterance
|
||||
if self._end_of_utterance_silence_trigger:
|
||||
transcription_config.conversation_config = ConversationConfig(
|
||||
end_of_utterance_silence_trigger=self._end_of_utterance_silence_trigger,
|
||||
)
|
||||
|
||||
# Set config
|
||||
self._transcription_config = transcription_config
|
||||
|
||||
def _handle_transcript(self, message: dict[str, Any], is_final: bool) -> None:
|
||||
"""Handle the partial and final transcript events.
|
||||
|
||||
Args:
|
||||
message: The new Partial or Final from the STT engine.
|
||||
is_final: Whether the data is final or partial.
|
||||
"""
|
||||
# Add the speech fragments
|
||||
has_changed = self._add_speech_fragments(
|
||||
message=message,
|
||||
is_final=is_final,
|
||||
)
|
||||
|
||||
# Skip if unchanged
|
||||
if not has_changed:
|
||||
return
|
||||
|
||||
# Send frames
|
||||
asyncio.run_coroutine_threadsafe(self._send_frames(), self.get_event_loop())
|
||||
|
||||
@traced_stt
|
||||
async def _handle_transcription(
|
||||
self, transcript: str, is_final: bool, language: Optional[Language] = None
|
||||
):
|
||||
"""Handle a transcription result with tracing."""
|
||||
pass
|
||||
|
||||
async def _send_frames(self, finalized: bool = False) -> None:
|
||||
"""Send frames to the pipeline.
|
||||
|
||||
Send speech frames to the pipeline. If VAD is enabled, then this will
|
||||
also send an interruption and user started speaking frames. When the
|
||||
final transcript is received, then this will send a user stopped speaking
|
||||
and stop interruption frames.
|
||||
|
||||
Args:
|
||||
finalized: Whether the data is final or partial.
|
||||
"""
|
||||
# Get speech frames (InterimTranscriptionFrame)
|
||||
speech_frames = self._get_frames_from_fragments()
|
||||
|
||||
# Skip if no frames
|
||||
if not speech_frames:
|
||||
return
|
||||
|
||||
# If final, then re=parse into TranscriptionFrame
|
||||
if finalized:
|
||||
# Reset the speech fragments
|
||||
self._speech_fragments.clear()
|
||||
|
||||
# Transform frames
|
||||
frames = [
|
||||
TranscriptionFrame(**frame._as_frame_attributes(self._text_format))
|
||||
for frame in speech_frames
|
||||
]
|
||||
|
||||
# Log transcript(s)
|
||||
logger.debug(f"Finalized transcript: {[f.text for f in frames]}")
|
||||
|
||||
# Return as interim results
|
||||
else:
|
||||
frames = [
|
||||
InterimTranscriptionFrame(**frame._as_frame_attributes()) for frame in speech_frames
|
||||
]
|
||||
|
||||
# Send the frames back to pipecat
|
||||
for frame in frames:
|
||||
await self._handle_transcription(
|
||||
transcript=frame.text,
|
||||
is_final=finalized,
|
||||
language=frame.language,
|
||||
)
|
||||
await self.push_frame(frame)
|
||||
|
||||
def _add_speech_fragments(self, message: dict[str, Any], is_final: bool = False) -> bool:
|
||||
"""Takes a new Partial or Final from the STT engine.
|
||||
|
||||
Accumulates it into the _speech_data list. As new final data is added, all
|
||||
partials are removed from the list.
|
||||
|
||||
Note: If a known speaker is `__[A-Z0-9_]{2,}__`, then the words are skipped,
|
||||
as this is used to protect against self-interruption by the assistant or to
|
||||
block out specific known voices.
|
||||
|
||||
Args:
|
||||
message: The new Partial or Final from the STT engine.
|
||||
is_final: Whether the data is final or partial.
|
||||
|
||||
Returns:
|
||||
bool: True if the speech data was updated, False otherwise.
|
||||
"""
|
||||
# Parsed new speech data from the STT engine
|
||||
fragments: list[SpeechFragment] = []
|
||||
|
||||
# Current length of the speech data
|
||||
current_length = len(self._speech_fragments)
|
||||
|
||||
# Iterate over the results in the payload
|
||||
for result in message.get("results", []):
|
||||
alt = result.get("alternatives", [{}])[0]
|
||||
if alt.get("content", None):
|
||||
# Create the new fragment
|
||||
fragment = SpeechFragment(
|
||||
start_time=result.get("start_time", 0),
|
||||
end_time=result.get("end_time", 0),
|
||||
language=alt.get("language", Language.EN),
|
||||
is_eos=alt.get("is_eos", False),
|
||||
is_final=is_final,
|
||||
attaches_to=result.get("attaches_to", ""),
|
||||
content=alt.get("content", ""),
|
||||
speaker=alt.get("speaker", None),
|
||||
confidence=alt.get("confidence", 1.0),
|
||||
result=result,
|
||||
)
|
||||
|
||||
# Drop `__XX__` speakers
|
||||
if fragment.speaker and re.match(r"^__[A-Z0-9_]{2,}__$", fragment.speaker):
|
||||
continue
|
||||
|
||||
# Add the fragment
|
||||
fragments.append(fragment)
|
||||
|
||||
# Remove existing partials, as new partials and finals are provided
|
||||
self._speech_fragments = [frag for frag in self._speech_fragments if frag.is_final]
|
||||
|
||||
# Return if no new fragments and length of the existing data is unchanged
|
||||
if not fragments and len(self._speech_fragments) == current_length:
|
||||
return False
|
||||
|
||||
# Add the fragments to the speech data
|
||||
self._speech_fragments.extend(fragments)
|
||||
|
||||
# Data was updated
|
||||
return True
|
||||
|
||||
def _get_frames_from_fragments(self) -> list[SpeakerFragments]:
|
||||
"""Get speech data objects for the current fragment list.
|
||||
|
||||
Each speech fragments is grouped by contiguous speaker and then
|
||||
returned as internal SpeakerFragments objects with the `speaker_id` field
|
||||
set to the current speaker (string). An utterance may contain speech from
|
||||
more than one speaker (e.g. S1, S2, S1, S3, ...), so they are kept
|
||||
in strict order for the context of the conversation.
|
||||
|
||||
Returns:
|
||||
list[SpeakerFragments]: The list of objects.
|
||||
"""
|
||||
# Speaker groups
|
||||
current_speaker: str | None = None
|
||||
speaker_groups: list[list[SpeechFragment]] = [[]]
|
||||
|
||||
# Group by speakers
|
||||
for frag in self._speech_fragments:
|
||||
if frag.speaker != current_speaker:
|
||||
current_speaker = frag.speaker
|
||||
if speaker_groups[-1]:
|
||||
speaker_groups.append([])
|
||||
speaker_groups[-1].append(frag)
|
||||
|
||||
# Create SpeakerFragments objects
|
||||
speaker_fragments: list[SpeakerFragments] = []
|
||||
for group in speaker_groups:
|
||||
sd = self._get_speaker_fragments_from_fragment_group(group)
|
||||
if sd:
|
||||
speaker_fragments.append(sd)
|
||||
|
||||
# Return the grouped SpeakerFragments objects
|
||||
return speaker_fragments
|
||||
|
||||
def _get_speaker_fragments_from_fragment_group(
|
||||
self,
|
||||
group: list[SpeechFragment],
|
||||
) -> SpeakerFragments | None:
|
||||
"""Take a group of fragments and piece together into SpeakerFragments.
|
||||
|
||||
Each fragment for a given speaker is assembled into a string,
|
||||
taking into consideration whether words are attached to the
|
||||
previous or next word (notably punctuation). This ensures that
|
||||
the text does not have extra spaces. This will also check for
|
||||
any straggling punctuation from earlier utterances that should
|
||||
be removed.
|
||||
|
||||
Args:
|
||||
group: List of SpeechFragment objects.
|
||||
|
||||
Returns:
|
||||
SpeakerFragments: The object for the group.
|
||||
"""
|
||||
# Check for starting fragments that are attached to previous
|
||||
if group and group[0].attaches_to == "previous":
|
||||
group = group[1:]
|
||||
|
||||
# Check for trailing fragments that are attached to next
|
||||
if group and group[-1].attaches_to == "next":
|
||||
group = group[:-1]
|
||||
|
||||
# Check there are results
|
||||
if not group:
|
||||
return None
|
||||
|
||||
# Get the timing extremes
|
||||
start_time = min(frag.start_time for frag in group)
|
||||
|
||||
# Timestamp
|
||||
ts = (self._start_time + datetime.timedelta(seconds=start_time)).isoformat(
|
||||
timespec="milliseconds"
|
||||
)
|
||||
|
||||
# Return the SpeakerFragments object
|
||||
return SpeakerFragments(
|
||||
speaker_id=group[0].speaker,
|
||||
timestamp=ts,
|
||||
language=group[0].language,
|
||||
fragments=group,
|
||||
)
|
||||
|
||||
|
||||
def _get_endpoint_url(url: str) -> str:
|
||||
"""Format the endpoint URL with the SDK and app versions.
|
||||
|
||||
Args:
|
||||
url: The base URL for the endpoint.
|
||||
|
||||
Returns:
|
||||
str: The formatted endpoint URL.
|
||||
"""
|
||||
query_params = dict()
|
||||
query_params["sm-app"] = f"pipecat/{__version__}"
|
||||
query = urlencode(query_params)
|
||||
|
||||
return f"{url}?{query}"
|
||||
|
||||
|
||||
def _language_to_speechmatics_language(language: Language) -> str:
|
||||
"""Convert a Language enum to a Speechmatics language code.
|
||||
|
||||
Args:
|
||||
language: The Language enum to convert.
|
||||
|
||||
Returns:
|
||||
str: The Speechmatics language code, if found.
|
||||
"""
|
||||
# List of supported input languages
|
||||
BASE_LANGUAGES = {
|
||||
Language.AR: "ar",
|
||||
Language.BA: "ba",
|
||||
Language.EU: "eu",
|
||||
Language.BE: "be",
|
||||
Language.BG: "bg",
|
||||
Language.BN: "bn",
|
||||
Language.YUE: "yue",
|
||||
Language.CA: "ca",
|
||||
Language.HR: "hr",
|
||||
Language.CS: "cs",
|
||||
Language.DA: "da",
|
||||
Language.NL: "nl",
|
||||
Language.EN: "en",
|
||||
Language.EO: "eo",
|
||||
Language.ET: "et",
|
||||
Language.FA: "fa",
|
||||
Language.FI: "fi",
|
||||
Language.FR: "fr",
|
||||
Language.GL: "gl",
|
||||
Language.DE: "de",
|
||||
Language.EL: "el",
|
||||
Language.HE: "he",
|
||||
Language.HI: "hi",
|
||||
Language.HU: "hu",
|
||||
Language.IT: "it",
|
||||
Language.ID: "id",
|
||||
Language.GA: "ga",
|
||||
Language.JA: "ja",
|
||||
Language.KO: "ko",
|
||||
Language.LV: "lv",
|
||||
Language.LT: "lt",
|
||||
Language.MS: "ms",
|
||||
Language.MT: "mt",
|
||||
Language.CMN: "cmn",
|
||||
Language.MR: "mr",
|
||||
Language.MN: "mn",
|
||||
Language.NO: "no",
|
||||
Language.PL: "pl",
|
||||
Language.PT: "pt",
|
||||
Language.RO: "ro",
|
||||
Language.RU: "ru",
|
||||
Language.SK: "sk",
|
||||
Language.SL: "sl",
|
||||
Language.ES: "es",
|
||||
Language.SV: "sv",
|
||||
Language.SW: "sw",
|
||||
Language.TA: "ta",
|
||||
Language.TH: "th",
|
||||
Language.TR: "tr",
|
||||
Language.UG: "ug",
|
||||
Language.UK: "uk",
|
||||
Language.UR: "ur",
|
||||
Language.VI: "vi",
|
||||
Language.CY: "cy",
|
||||
}
|
||||
|
||||
# Get the language code
|
||||
result = BASE_LANGUAGES.get(language)
|
||||
|
||||
# Fail if language is not supported
|
||||
if not result:
|
||||
raise ValueError(f"Unsupported language: {language}")
|
||||
|
||||
# Return the language code
|
||||
return result
|
||||
|
||||
|
||||
def _locale_to_speechmatics_locale(language_code: str, locale: Language) -> Optional[str]:
|
||||
"""Convert a Language enum to a Speechmatics language code.
|
||||
|
||||
Args:
|
||||
language_code: The language code.
|
||||
locale: The Language enum to convert.
|
||||
|
||||
Returns:
|
||||
str: The Speechmatics language code, if found.
|
||||
"""
|
||||
# Languages and output locales
|
||||
LOCALES = {
|
||||
"en": {
|
||||
Language.EN_GB: "en-GB",
|
||||
Language.EN_US: "en-US",
|
||||
Language.EN_AU: "en-AU",
|
||||
},
|
||||
}
|
||||
|
||||
# Get the locale code
|
||||
result = LOCALES.get(language_code, {}).get(locale)
|
||||
|
||||
# Fail if locale is not supported
|
||||
if not result:
|
||||
logger.warning(f"Unsupported output locale: {locale}, defaulting to {language_code}")
|
||||
|
||||
# Return the locale code
|
||||
return result
|
||||
@@ -145,6 +145,9 @@ class Language(StrEnum):
|
||||
EN_US = "en-US"
|
||||
EN_ZA = "en-ZA"
|
||||
|
||||
# Esperanto
|
||||
EO = "eo"
|
||||
|
||||
# Spanish
|
||||
ES = "es"
|
||||
ES_AR = "es-AR"
|
||||
@@ -474,6 +477,9 @@ class Language(StrEnum):
|
||||
# Tatar
|
||||
TT = "tt"
|
||||
|
||||
# Uyghur
|
||||
UG = "ug"
|
||||
|
||||
# Ukrainian
|
||||
UK = "uk"
|
||||
UK_UA = "uk-UA"
|
||||
|
||||
@@ -214,7 +214,7 @@ class TestDTMFAggregator(unittest.IsolatedAsyncioTestCase):
|
||||
]
|
||||
|
||||
# All the InputDTMFFrames plus one TranscriptionFrame
|
||||
expected_down_frames = [InputDTMFFrame] * 12 + [TranscriptionFrame]
|
||||
expected_down_frames = [InputDTMFFrame] * len(frames_to_send) + [TranscriptionFrame]
|
||||
|
||||
received_down_frames, _ = await run_test(
|
||||
aggregator,
|
||||
|
||||
Reference in New Issue
Block a user