Speechmatics STT + multi-speaker conversations (#2036)

* initial config

* skeleton

* Added a README (to be added to).

* Payloads coming from the ASR.

* doc update

* handle the partials and finals

* enable diarization in the example

* support sending messages to pipecat pipeline

* requirements fix in README

* updated example (with amusement)

* updated example to match master

* updated docs

* support for diarization tags

* logic fix for wrapper

* Use an internal SpeechFrame for speaker_id (not user_id).

* only include speaker tags on finalised transcript (as this may skew end of utterance detection)

* updated docs

* correction to docs and updated example

* updated requirement

* Fix for using default EU server.

* Updates from PR comments.

* Refactor based on comments in the original PR.

Primary focus on documentation, naming conventions and how `user_id` is used.

* Check for SMX installed when importing.

* Variable name change

* Comment correction.

* Support for Esporanto and Uyghur

* Impoved language support

* function name change

* Locale fix

* intercept

* interim changes

* pass the pipeline task to the module for adding events to the top of the pipeline

* logging for the pipeline

* Reduce timeout for content aggregator.

* staged update

* testing with Azure

* Updated context (Azure was dropping punctuation) and using better ElevenLabs model.

* Updated to RT 0.3.0 and use OpenAI (not Azure).

* Missing OpenAI import; parameter name change for output locale validation.

* Revert to `0.2.0` of RT SDK.

* fix for assignment of `output_locale_code`.

* update Speechmatics library to 0.3.1

* new transcription example

* updated asyncio task handling

* Updated doc strings

* enable OpenTelemetry logging

* removed import from stt for __init__

* updated examples and default values

* updated examples

* prevent lock up when closing the STT connection
This commit is contained in:
Sam Sykes
2025-07-03 21:25:13 +01:00
committed by GitHub
parent 8f6544efe2
commit 7596d71460
9 changed files with 1085 additions and 13 deletions

View File

@@ -51,19 +51,19 @@ You can connect to Pipecat from any platform using our official SDKs:
## 🧩 Available services
| Category | Services |
| ------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), [AWS](https://docs.pipecat.ai/server/services/stt/aws), [Azure](https://docs.pipecat.ai/server/services/stt/azure), [Cartesia](https://docs.pipecat.ai/server/services/stt/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [Fal Wizper](https://docs.pipecat.ai/server/services/stt/fal), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), [Google](https://docs.pipecat.ai/server/services/stt/google), [Groq (Whisper)](https://docs.pipecat.ai/server/services/stt/groq), [OpenAI (Whisper)](https://docs.pipecat.ai/server/services/stt/openai), [Parakeet (NVIDIA)](https://docs.pipecat.ai/server/services/stt/parakeet), [SambaNova (Whisper)](https://docs.pipecat.ai/server/services/stt/sambanova) [Ultravox](https://docs.pipecat.ai/server/services/stt/ultravox), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) |
| LLMs | [Anthropic](https://docs.pipecat.ai/server/services/llm/anthropic), [AWS](https://docs.pipecat.ai/server/services/llm/aws), [Azure](https://docs.pipecat.ai/server/services/llm/azure), [Cerebras](https://docs.pipecat.ai/server/services/llm/cerebras), [DeepSeek](https://docs.pipecat.ai/server/services/llm/deepseek), [Fireworks AI](https://docs.pipecat.ai/server/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/server/services/llm/gemini), [Grok](https://docs.pipecat.ai/server/services/llm/grok), [Groq](https://docs.pipecat.ai/server/services/llm/groq), [NVIDIA NIM](https://docs.pipecat.ai/server/services/llm/nim), [Ollama](https://docs.pipecat.ai/server/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/server/services/llm/openai), [OpenRouter](https://docs.pipecat.ai/server/services/llm/openrouter), [Perplexity](https://docs.pipecat.ai/server/services/llm/perplexity), [Qwen](https://docs.pipecat.ai/server/services/llm/qwen), [SambaNova](https://docs.pipecat.ai/server/services/llm/sambanova) [Together AI](https://docs.pipecat.ai/server/services/llm/together) |
| Text-to-Speech | [AWS](https://docs.pipecat.ai/server/services/tts/aws), [Azure](https://docs.pipecat.ai/server/services/tts/azure), [Cartesia](https://docs.pipecat.ai/server/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/tts/elevenlabs), [FastPitch (NVIDIA)](https://docs.pipecat.ai/server/services/tts/fastpitch), [Fish](https://docs.pipecat.ai/server/services/tts/fish), [Google](https://docs.pipecat.ai/server/services/tts/google), [LMNT](https://docs.pipecat.ai/server/services/tts/lmnt), [MiniMax](https://docs.pipecat.ai/server/services/tts/minimax), [Neuphonic](https://docs.pipecat.ai/server/services/tts/neuphonic), [OpenAI](https://docs.pipecat.ai/server/services/tts/openai), [Piper](https://docs.pipecat.ai/server/services/tts/piper), [PlayHT](https://docs.pipecat.ai/server/services/tts/playht), [Rime](https://docs.pipecat.ai/server/services/tts/rime), [Sarvam](https://docs.pipecat.ai/server/services/tts/sarvam), [XTTS](https://docs.pipecat.ai/server/services/tts/xtts) |
| Speech-to-Speech | [AWS Nova Sonic](https://docs.pipecat.ai/server/services/s2s/aws), [Gemini Multimodal Live](https://docs.pipecat.ai/server/services/s2s/gemini), [OpenAI Realtime](https://docs.pipecat.ai/server/services/s2s/openai) |
| Transport | [Daily (WebRTC)](https://docs.pipecat.ai/server/services/transport/daily), [FastAPI Websocket](https://docs.pipecat.ai/server/services/transport/fastapi-websocket), [SmallWebRTCTransport](https://docs.pipecat.ai/server/services/transport/small-webrtc), [WebSocket Server](https://docs.pipecat.ai/server/services/transport/websocket-server), Local |
| Serializers | [Plivo](https://docs.pipecat.ai/server/utilities/serializers/plivo), [Twilio](https://docs.pipecat.ai/server/utilities/serializers/twilio), [Telnyx](https://docs.pipecat.ai/server/utilities/serializers/telnyx) |
| Video | [Tavus](https://docs.pipecat.ai/server/services/video/tavus), [Simli](https://docs.pipecat.ai/server/services/video/simli) |
| Memory | [mem0](https://docs.pipecat.ai/server/services/memory/mem0) |
| Vision & Image | [fal](https://docs.pipecat.ai/server/services/image-generation/fal), [Google Imagen](https://docs.pipecat.ai/server/services/image-generation/fal), [Moondream](https://docs.pipecat.ai/server/services/vision/moondream) |
| Audio Processing | [Silero VAD](https://docs.pipecat.ai/server/utilities/audio/silero-vad-analyzer), [Krisp](https://docs.pipecat.ai/server/utilities/audio/krisp-filter), [Koala](https://docs.pipecat.ai/server/utilities/audio/koala-filter), [Noisereduce](https://docs.pipecat.ai/server/utilities/audio/noisereduce-filter) |
| Analytics & Metrics | [OpenTelemetry](https://docs.pipecat.ai/server/utilities/opentelemetry), [Sentry](https://docs.pipecat.ai/server/services/analytics/sentry) |
| Category | Services |
| ------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), [AWS](https://docs.pipecat.ai/server/services/stt/aws), [Azure](https://docs.pipecat.ai/server/services/stt/azure), [Cartesia](https://docs.pipecat.ai/server/services/stt/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [Fal Wizper](https://docs.pipecat.ai/server/services/stt/fal), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), [Google](https://docs.pipecat.ai/server/services/stt/google), [Groq (Whisper)](https://docs.pipecat.ai/server/services/stt/groq), [OpenAI (Whisper)](https://docs.pipecat.ai/server/services/stt/openai), [Parakeet (NVIDIA)](https://docs.pipecat.ai/server/services/stt/parakeet), [SambaNova (Whisper)](https://docs.pipecat.ai/server/services/stt/sambanova), [Speechmatics](https://docs.pipecat.ai/server/services/stt/speechmatics), [Ultravox](https://docs.pipecat.ai/server/services/stt/ultravox), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) |
| LLMs | [Anthropic](https://docs.pipecat.ai/server/services/llm/anthropic), [AWS](https://docs.pipecat.ai/server/services/llm/aws), [Azure](https://docs.pipecat.ai/server/services/llm/azure), [Cerebras](https://docs.pipecat.ai/server/services/llm/cerebras), [DeepSeek](https://docs.pipecat.ai/server/services/llm/deepseek), [Fireworks AI](https://docs.pipecat.ai/server/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/server/services/llm/gemini), [Grok](https://docs.pipecat.ai/server/services/llm/grok), [Groq](https://docs.pipecat.ai/server/services/llm/groq), [NVIDIA NIM](https://docs.pipecat.ai/server/services/llm/nim), [Ollama](https://docs.pipecat.ai/server/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/server/services/llm/openai), [OpenRouter](https://docs.pipecat.ai/server/services/llm/openrouter), [Perplexity](https://docs.pipecat.ai/server/services/llm/perplexity), [Qwen](https://docs.pipecat.ai/server/services/llm/qwen), [SambaNova](https://docs.pipecat.ai/server/services/llm/sambanova) [Together AI](https://docs.pipecat.ai/server/services/llm/together) |
| Text-to-Speech | [AWS](https://docs.pipecat.ai/server/services/tts/aws), [Azure](https://docs.pipecat.ai/server/services/tts/azure), [Cartesia](https://docs.pipecat.ai/server/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/tts/elevenlabs), [FastPitch (NVIDIA)](https://docs.pipecat.ai/server/services/tts/fastpitch), [Fish](https://docs.pipecat.ai/server/services/tts/fish), [Google](https://docs.pipecat.ai/server/services/tts/google), [LMNT](https://docs.pipecat.ai/server/services/tts/lmnt), [MiniMax](https://docs.pipecat.ai/server/services/tts/minimax), [Neuphonic](https://docs.pipecat.ai/server/services/tts/neuphonic), [OpenAI](https://docs.pipecat.ai/server/services/tts/openai), [Piper](https://docs.pipecat.ai/server/services/tts/piper), [PlayHT](https://docs.pipecat.ai/server/services/tts/playht), [Rime](https://docs.pipecat.ai/server/services/tts/rime), [Sarvam](https://docs.pipecat.ai/server/services/tts/sarvam), [XTTS](https://docs.pipecat.ai/server/services/tts/xtts) |
| Speech-to-Speech | [AWS Nova Sonic](https://docs.pipecat.ai/server/services/s2s/aws), [Gemini Multimodal Live](https://docs.pipecat.ai/server/services/s2s/gemini), [OpenAI Realtime](https://docs.pipecat.ai/server/services/s2s/openai) |
| Transport | [Daily (WebRTC)](https://docs.pipecat.ai/server/services/transport/daily), [FastAPI Websocket](https://docs.pipecat.ai/server/services/transport/fastapi-websocket), [SmallWebRTCTransport](https://docs.pipecat.ai/server/services/transport/small-webrtc), [WebSocket Server](https://docs.pipecat.ai/server/services/transport/websocket-server), Local |
| Serializers | [Plivo](https://docs.pipecat.ai/server/utilities/serializers/plivo), [Twilio](https://docs.pipecat.ai/server/utilities/serializers/twilio), [Telnyx](https://docs.pipecat.ai/server/utilities/serializers/telnyx) |
| Video | [Tavus](https://docs.pipecat.ai/server/services/video/tavus), [Simli](https://docs.pipecat.ai/server/services/video/simli) |
| Memory | [mem0](https://docs.pipecat.ai/server/services/memory/mem0) |
| Vision & Image | [fal](https://docs.pipecat.ai/server/services/image-generation/fal), [Google Imagen](https://docs.pipecat.ai/server/services/image-generation/fal), [Moondream](https://docs.pipecat.ai/server/services/vision/moondream) |
| Audio Processing | [Silero VAD](https://docs.pipecat.ai/server/utilities/audio/silero-vad-analyzer), [Krisp](https://docs.pipecat.ai/server/utilities/audio/krisp-filter), [Koala](https://docs.pipecat.ai/server/utilities/audio/koala-filter), [Noisereduce](https://docs.pipecat.ai/server/utilities/audio/noisereduce-filter) |
| Analytics & Metrics | [OpenTelemetry](https://docs.pipecat.ai/server/utilities/opentelemetry), [Sentry](https://docs.pipecat.ai/server/services/analytics/sentry) |
📚 [View full services documentation →](https://docs.pipecat.ai/server/services/supported-services)

View File

@@ -46,6 +46,7 @@ pipecat-ai[sambanova]
pipecat-ai[silero]
pipecat-ai[simli]
pipecat-ai[soundfile]
pipecat-ai[speechmatics]
pipecat-ai[tavus]
pipecat-ai[together]
# pipecat-ai[ultravox] # Mocked

View File

@@ -109,6 +109,10 @@ MINIMAX_GROUP_ID=...
# Sarvam AI
SARVAM_API_KEY=...
# Speechmatics
SPEECHMATICS_API_KEY=...
# SambaNova
SAMBANOVA_API_KEY=...

View File

@@ -0,0 +1,153 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMUserAggregatorParams,
)
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.elevenlabs.tts import ElevenLabsTTSService
from pipecat.services.openai.base_llm import BaseOpenAILLMService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.services.speechmatics.stt import SpeechmaticsSTTService
from pipecat.transcriptions.language import Language
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketParams
from pipecat.transports.services.daily import DailyParams
load_dotenv(override=True)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
}
async def run_example(transport: BaseTransport, _: argparse.Namespace, handle_sigint: bool):
"""Run example using Speechmatics STT.
This example will use diarization within our STT service and output the words spoken by
each individual speaker and wrap them with XML tags for the LLM to process. Note the
instructions in the system context for the LLM. This greatly improves the conversation
experience by allowing the LLM to understand who is speaking in a multi-party call.
If you do not wish to use diarization, then set the `enable_speaker_diarization` parameter
to `False` or omit it altogether. The `text_format` will only be used if diarization is enabled.
By default, this example will use our ENHANCED operating point, which is optimized for
high accuracy. You can change this by setting the `operating_point` parameter to a different
value.
For more information on operating points, see the Speechmatics documentation:
https://docs.speechmatics.com/rt-api-ref
"""
logger.info(f"Starting bot")
stt = SpeechmaticsSTTService(
api_key=os.getenv("SPEECHMATICS_API_KEY"),
language=Language.EN,
enable_speaker_diarization=True,
text_format="<{speaker_id}>{text}</{speaker_id}>",
)
tts = ElevenLabsTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY", ""),
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
model="eleven_turbo_v2_5",
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
params=BaseOpenAILLMService.InputParams(temperature=0.75),
)
messages = [
{
"role": "system",
"content": (
"You are a helpful British assistant called Alfred. "
"Your goal is to demonstrate your capabilities in a succinct way. "
"Your output will be converted to audio so don't include special characters in your answers. "
"Always include punctuation in your responses. "
"Give very short replies - do not give longer replies unless strictly necessary. "
"Respond to what the user said in a concise, funny, creative and helpful way. "
"Use `<Sn/>` tags to identify different speakers - do not use tags in your replies."
),
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(
context,
user_params=LLMUserAggregatorParams(aggregation_timeout=0.005),
)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Say a short hello to the user."})
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=handle_sigint)
await runner.run(task)
if __name__ == "__main__":
from pipecat.examples.run import main
main(run_example, transport_params=transport_params)

View File

@@ -0,0 +1,89 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.frames.frames import Frame, TranscriptionFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.speechmatics.stt import SpeechmaticsSTTService
from pipecat.transcriptions.language import Language
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketParams
from pipecat.transports.services.daily import DailyParams
load_dotenv(override=True)
class TranscriptionLogger(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TranscriptionFrame):
print(f"Transcription: {frame.text}")
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(audio_in_enabled=True),
"twilio": lambda: FastAPIWebsocketParams(audio_in_enabled=True),
"webrtc": lambda: TransportParams(audio_in_enabled=True),
}
async def run_example(transport: BaseTransport, _: argparse.Namespace, handle_sigint: bool):
"""Run example using Speechmatics STT.
This example will use diarization within our STT service and output the words spoken by
each individual speaker and wrap them with XML tags.
If you do not wish to use diarization, then set the `enable_speaker_diarization` parameter
to `False` or omit it altogether. The `text_format` will only be used if diarization is enabled.
By default, this example will use our ENHANCED operating point, which is optimized for
high accuracy. You can change this by setting the `operating_point` parameter to a different
value.
For more information on operating points, see the Speechmatics documentation:
https://docs.speechmatics.com/rt-api-ref
"""
logger.info(f"Starting bot")
stt = SpeechmaticsSTTService(
api_key=os.getenv("SPEECHMATICS_API_KEY"),
language=Language.EN,
enable_speaker_diarization=True,
text_format="<{speaker_id}>{text}</{speaker_id}>",
)
tl = TranscriptionLogger()
pipeline = Pipeline([transport.input(), stt, tl])
task = PipelineTask(pipeline)
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=handle_sigint)
await runner.run(task)
if __name__ == "__main__":
from pipecat.examples.run import main
main(run_example, transport_params=transport_params)

View File

@@ -87,6 +87,7 @@ remote-smart-turn = []
silero = [ "onnxruntime~=1.20.1" ]
simli = [ "simli-ai~=0.1.10"]
soundfile = [ "soundfile~=0.13.0" ]
speechmatics = [ "speechmatics-rt>=0.3.1" ]
tavus=[]
together = []
tracing = [ "opentelemetry-sdk>=1.33.0", "opentelemetry-api>=1.33.0", "opentelemetry-instrumentation>=0.54b0" ]

View File

@@ -0,0 +1,5 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

View File

@@ -0,0 +1,813 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Speechmatics STT service integration."""
import asyncio
import datetime
import re
from dataclasses import dataclass, field
from typing import Any, AsyncGenerator, Optional
from urllib.parse import urlencode
from loguru import logger
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
Frame,
InterimTranscriptionFrame,
StartFrame,
TranscriptionFrame,
)
from pipecat.services.stt_service import STTService
from pipecat.transcriptions.language import Language
from pipecat.utils.tracing.service_decorators import traced_stt
try:
from speechmatics.rt import (
AsyncClient,
AudioEncoding,
AudioFormat,
ConversationConfig,
OperatingPoint,
ServerMessageType,
SpeakerDiarizationConfig,
TranscriptionConfig,
__version__,
)
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
"In order to use Speechmatics, you need to `pip install pipecat-ai[speechmatics]`."
)
raise Exception(f"Missing module: {e}")
class AudioBuffer:
"""Audio buffer for STT clients.
The Python SDK expects audio in a pre-defined number of frames. This
buffer will accumulate the data from the pipeline and provide it to the
STT client in the correct lengths, waiting for the number of frames to
be available.
"""
def __init__(self, maxsize: int = 0):
"""Initialize the audio buffer.
Args:
maxsize: Maximum size of the buffer.
"""
self._queue = asyncio.Queue(maxsize=maxsize)
self._current_chunk = b""
self._position = 0
self._closed = False
def write_audio(self, data: bytes) -> None:
"""Write audio data to the buffer (thread-safe).
Args:
data: Audio data to write.
"""
if data:
try:
self._queue.put_nowait(data)
except asyncio.QueueFull:
pass
async def read(self, size: int) -> bytes:
"""Read exactly `size` bytes from the buffer (thread-safe).
This process will block until the required number of bytes are available
in the buffer. Audio is received from the pipeline in varying sizes, so
this buffer will accumulate the data and provide it to the STT client in
the correct lengths, waiting for the number of frames to be available.
Calling stop() will close the buffer and release the blocking read
process.
Args:
size: Number of bytes to read.
Returns:
bytes: Audio data read from the buffer.
"""
result = b""
bytes_needed = size
while bytes_needed > 0 and not self._closed:
# Use data from current chunk if available
if self._position < len(self._current_chunk):
available = len(self._current_chunk) - self._position
take = min(bytes_needed, available)
result += self._current_chunk[self._position : self._position + take]
self._position += take
bytes_needed -= take
continue
# Get next chunk
try:
chunk = await asyncio.wait_for(self._queue.get(), timeout=0.1)
if chunk is None:
continue
self._current_chunk = chunk
self._position = 0
except asyncio.TimeoutError:
await asyncio.sleep(0)
continue
return result
def stop(self) -> None:
"""Close the audio buffer."""
self._closed = True
@dataclass
class SpeechFragment:
"""Fragment of an utterance.
Parameters:
start_time: Start time of the fragment in seconds (from session start).
end_time: End time of the fragment in seconds (from session start).
language: Language of the fragment. Defaults to `Language.EN`.
is_eos: Whether the fragment is the end of a sentence. Defaults to `False`.
is_final: Whether the fragment is the final fragment. Defaults to `False`.
attaches_to: Whether the fragment attaches to the previous or next fragment (punctuation). Defaults to empty string.
content: Content of the fragment. Defaults to empty string.
speaker: Speaker of the fragment (if diarization is enabled). Defaults to `None`.
confidence: Confidence of the fragment (0.0 to 1.0). Defaults to `1.0`.
result: Raw result of the fragment from the TTS.
"""
start_time: float
end_time: float
language: Language = Language.EN
is_eos: bool = False
is_final: bool = False
attaches_to: str = ""
content: str = ""
speaker: Optional[str] = None
confidence: float = 1.0
result: Optional[Any] = None
@dataclass
class SpeakerFragments:
"""SpeechFragment items grouped by speaker_id.
Parameters:
speaker_id: The ID of the speaker.
timestamp: The timestamp of the frame.
language: The language of the frame.
fragments: The list of SpeechFragment items.
"""
speaker_id: Optional[str] = None
timestamp: Optional[str] = None
language: Optional[Language] = None
fragments: list[SpeechFragment] = field(default_factory=list)
def __str__(self):
"""Return a string representation of the object."""
return f"SpeakerFragments(speaker_id: {self.speaker_id}, timestamp: {self.timestamp}, language: {self.language}, text: {self._format_text()})"
def _format_text(self, format: Optional[str] = None) -> str:
"""Wrap text with speaker ID in an optional f-string format.
Args:
format: Format to wrap the text with.
Returns:
str: The wrapped text.
"""
# Cumulative contents
content = ""
# Assemble the text
for frag in self.fragments:
if content == "" or frag.attaches_to == "previous":
content += frag.content
else:
content += " " + frag.content
# Format the text, if format is provided
if format is None or self.speaker_id is None:
return content
return format.format(**{"speaker_id": self.speaker_id, "text": content})
def _as_frame_attributes(self, format: Optional[str] = None) -> dict[str, Any]:
"""Return a dictionary of attributes for a TranscriptionFrame.
Args:
format: Format to wrap the text with.
Returns:
dict[str, Any]: The dictionary of attributes.
"""
return {
"text": self._format_text(format),
"user_id": self.speaker_id,
"timestamp": self.timestamp,
"language": self.language,
"result": [frag.result for frag in self.fragments],
}
class SpeechmaticsSTTService(STTService):
"""Speechmatics STT service implementation.
This service provides real-time speech-to-text transcription using the Speechmatics API.
It supports partial and final transcriptions, multiple languages, various audio formats,
and speaker diarization.
"""
def __init__(
self,
*,
api_key: str,
language: Optional[Language] = None,
language_code: Optional[str] = None,
base_url: str = "wss://eu2.rt.speechmatics.com/v2",
domain: Optional[str] = None,
output_locale: Optional[Language] = None,
output_locale_code: Optional[str] = None,
enable_partials: bool = True,
max_delay: float = 1.5,
sample_rate: Optional[int] = 16000,
chunk_size: int = 256,
audio_encoding: AudioEncoding = AudioEncoding.PCM_S16LE,
end_of_utterance_silence_trigger: float = 0.5,
operating_point: OperatingPoint = OperatingPoint.ENHANCED,
enable_speaker_diarization: bool = False,
text_format: str = "<{speaker_id}>{text}</{speaker_id}>",
max_speakers: Optional[int] = None,
transcription_config: Optional[TranscriptionConfig] = None,
**kwargs,
):
"""Initialize the Speechmatics STT service.
Args:
api_key: Speechmatics API key for authentication.
language: Language code for transcription. Defaults to `None`.
language_code: Language code string for transcription. Defaults to `None`.
base_url: Base URL for Speechmatics API. Defaults to `wss://eu2.rt.speechmatics.com/v2`.
domain: Domain for Speechmatics API. Defaults to `None`.
output_locale: Output locale for transcription, e.g. `Language.EN_GB`. Defaults to `None`.
output_locale_code: Output locale code for transcription. Defaults to `None`.
enable_partials: Enable partial transcription results. Defaults to `True`.
max_delay: Maximum delay for transcription in seconds. Defaults to `1.5`.
sample_rate: Audio sample rate in Hz. Defaults to `16000`.
chunk_size: Audio chunk size for streaming. Defaults to `256`.
audio_encoding: Audio encoding format. Defaults to `pcm_s16le`.
end_of_utterance_silence_trigger: Silence duration in seconds to trigger end of utterance detection. Defaults to `0.5`.
operating_point: Operating point for transcription accuracy vs. latency tradeoff. Defaults to `enhanced`.
enable_speaker_diarization: Enable speaker diarization to identify different speakers. Defaults to `False`.
text_format: Wrapper for speaker ID. Defaults to `<{speaker_id}>{text}</{speaker_id}>`.
max_speakers: Maximum number of speakers to detect. Defaults to `None` (auto-detect).
transcription_config: Custom transcription configuration (other set parameters are merged). Defaults to `None`.
**kwargs: Additional arguments passed to STTService.
"""
super().__init__(sample_rate=sample_rate, **kwargs)
# Client configuration
self._api_key: str = api_key
self._language: Optional[Language] = language
self._language_code: Optional[str] = language_code
self._base_url: str = base_url
self._domain: Optional[str] = domain
self._output_locale: Optional[Language] = output_locale
self._output_locale_code: Optional[str] = output_locale_code
self._enable_partials: bool = enable_partials
self._max_delay: float = max_delay
self._sample_rate: int = sample_rate
self._chunk_size: int = chunk_size
self._audio_encoding: AudioEncoding = audio_encoding
self._end_of_utterance_silence_trigger: Optional[float] = end_of_utterance_silence_trigger
self._operating_point: OperatingPoint = operating_point
self._enable_speaker_diarization: bool = enable_speaker_diarization
self._text_format: str = text_format
self._max_speakers: Optional[int] = max_speakers
# Check we have required attributes
if not self._api_key:
raise ValueError("Missing Speechmatics API key")
if not self._base_url:
raise ValueError("Missing Speechmatics base URL")
# Validate the language code
if self._language and self._language_code:
raise ValueError("Language and language code cannot both be specified")
elif self._language:
self._language_code = _language_to_speechmatics_language(self._language)
# Validate the output locale code
if self._output_locale and self._output_locale_code:
raise ValueError("Output locale and output locale code cannot both be specified")
elif self._output_locale:
self._output_locale_code = _locale_to_speechmatics_locale(
self._language_code, self._output_locale
)
# Complete configuration objects
self._transcription_config: TranscriptionConfig = None
self._process_config(transcription_config)
# STT client
self._client: Optional[AsyncClient] = None
self._client_task: Optional[asyncio.Task] = None
self._audio_buffer: AudioBuffer = AudioBuffer(maxsize=10)
self._start_time: Optional[datetime.datetime] = None
# Current utterance speech data
self._speech_fragments: list[SpeechFragment] = []
async def start(self, frame: StartFrame):
"""Called when the new session starts."""
await super().start(frame)
await self._connect()
async def stop(self, frame: EndFrame):
"""Called when the session ends."""
await super().stop(frame)
await self._disconnect()
async def cancel(self, frame: CancelFrame):
"""Called when the session is cancelled."""
await super().cancel(frame)
await self._disconnect()
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
"""Adds audio to the audio buffer and yields None."""
self._audio_buffer.write_audio(audio)
yield None
async def _run_client(self) -> None:
"""Runs the Speechmatics client in a thread."""
await self._client.transcribe(
self._audio_buffer,
transcription_config=self._transcription_config,
audio_format=AudioFormat(
encoding=self._audio_encoding,
sample_rate=self.sample_rate,
chunk_size=self._chunk_size,
),
)
async def _connect(self) -> None:
"""Connect to the STT service."""
# Create new STT RT client
self._client = AsyncClient(
api_key=self._api_key,
url=_get_endpoint_url(self._base_url),
)
# Log the event
logger.debug("Connected to Speechmatics STT service")
# Recognition started event
@self._client.on(ServerMessageType.RECOGNITION_STARTED)
def _evt_on_recognition_started(message: dict[str, Any]):
logger.debug(f"Recognition started (session: {message.get('id')})")
self._start_time = datetime.datetime.now(datetime.timezone.utc)
# Partial transcript event
@self._client.on(ServerMessageType.ADD_PARTIAL_TRANSCRIPT)
def _evt_on_partial_transcript(message: dict[str, Any]):
self._handle_transcript(message, is_final=False)
# Final transcript event
@self._client.on(ServerMessageType.ADD_TRANSCRIPT)
def _evt_on_final_transcript(message: dict[str, Any]):
self._handle_transcript(message, is_final=True)
# End of Utterance
@self._client.on(ServerMessageType.END_OF_UTTERANCE)
def _evt_on_end_of_utterance(message: dict[str, Any]):
logger.debug("End of utterance received from STT")
asyncio.run_coroutine_threadsafe(
self._send_frames(finalized=True), self.get_event_loop()
)
# Start the client in a thread
self._client_task = self.create_task(self._run_client())
async def _disconnect(self) -> None:
"""Disconnect from the STT service."""
# Stop the audio buffer
self._audio_buffer.stop()
# Disconnect the client
try:
if self._client:
await asyncio.wait_for(self._client.close(), timeout=1.0)
except asyncio.TimeoutError:
logger.warning("Timeout while closing Speechmatics client connection")
except Exception as e:
logger.error(f"Error closing Speechmatics client: {e}")
finally:
self._client = None
# Cancel the client task
if self._client_task:
await self.cancel_task(self._client_task)
self._client_task = None
# Log the event
logger.debug("Disconnected from Speechmatics STT service")
def _process_config(self, transcription_config: Optional[TranscriptionConfig] = None) -> None:
"""Create a formatted STT transcription config.
This takes an optional TranscriptionConfig object and populates it with the
values from the STT service. Individual parameters take priority over those
within the config object.
Args:
transcription_config: Optional transcription config to use.
"""
# Transcription config
if not transcription_config:
transcription_config = TranscriptionConfig(
language=self._language_code or "en",
domain=self._domain,
output_locale=self._output_locale_code,
operating_point=self._operating_point,
diarization="speaker" if self._enable_speaker_diarization else None,
enable_partials=self._enable_partials,
max_delay=self._max_delay or 2.0,
)
else:
if self._language_code:
transcription_config.language = self._language_code
if self._domain:
transcription_config.domain = self._domain
if self._output_locale_code:
transcription_config.output_locale = self._output_locale_code
if self._operating_point:
transcription_config.operating_point = self._operating_point
if self._enable_speaker_diarization:
transcription_config.diarization = "speaker"
if self._enable_partials:
transcription_config.enable_partials = self._enable_partials
if self._max_delay:
transcription_config.max_delay = self._max_delay
# Diarization
if self._enable_speaker_diarization and self._max_speakers:
transcription_config.speaker_diarization_config = SpeakerDiarizationConfig(
max_speakers=self._max_speakers,
)
# End of Utterance
if self._end_of_utterance_silence_trigger:
transcription_config.conversation_config = ConversationConfig(
end_of_utterance_silence_trigger=self._end_of_utterance_silence_trigger,
)
# Set config
self._transcription_config = transcription_config
def _handle_transcript(self, message: dict[str, Any], is_final: bool) -> None:
"""Handle the partial and final transcript events.
Args:
message: The new Partial or Final from the STT engine.
is_final: Whether the data is final or partial.
"""
# Add the speech fragments
has_changed = self._add_speech_fragments(
message=message,
is_final=is_final,
)
# Skip if unchanged
if not has_changed:
return
# Send frames
asyncio.run_coroutine_threadsafe(self._send_frames(), self.get_event_loop())
@traced_stt
async def _handle_transcription(
self, transcript: str, is_final: bool, language: Optional[Language] = None
):
"""Handle a transcription result with tracing."""
pass
async def _send_frames(self, finalized: bool = False) -> None:
"""Send frames to the pipeline.
Send speech frames to the pipeline. If VAD is enabled, then this will
also send an interruption and user started speaking frames. When the
final transcript is received, then this will send a user stopped speaking
and stop interruption frames.
Args:
finalized: Whether the data is final or partial.
"""
# Get speech frames (InterimTranscriptionFrame)
speech_frames = self._get_frames_from_fragments()
# Skip if no frames
if not speech_frames:
return
# If final, then re=parse into TranscriptionFrame
if finalized:
# Reset the speech fragments
self._speech_fragments.clear()
# Transform frames
frames = [
TranscriptionFrame(**frame._as_frame_attributes(self._text_format))
for frame in speech_frames
]
# Log transcript(s)
logger.debug(f"Finalized transcript: {[f.text for f in frames]}")
# Return as interim results
else:
frames = [
InterimTranscriptionFrame(**frame._as_frame_attributes()) for frame in speech_frames
]
# Send the frames back to pipecat
for frame in frames:
await self._handle_transcription(
transcript=frame.text,
is_final=finalized,
language=frame.language,
)
await self.push_frame(frame)
def _add_speech_fragments(self, message: dict[str, Any], is_final: bool = False) -> bool:
"""Takes a new Partial or Final from the STT engine.
Accumulates it into the _speech_data list. As new final data is added, all
partials are removed from the list.
Note: If a known speaker is `__[A-Z0-9_]{2,}__`, then the words are skipped,
as this is used to protect against self-interruption by the assistant or to
block out specific known voices.
Args:
message: The new Partial or Final from the STT engine.
is_final: Whether the data is final or partial.
Returns:
bool: True if the speech data was updated, False otherwise.
"""
# Parsed new speech data from the STT engine
fragments: list[SpeechFragment] = []
# Current length of the speech data
current_length = len(self._speech_fragments)
# Iterate over the results in the payload
for result in message.get("results", []):
alt = result.get("alternatives", [{}])[0]
if alt.get("content", None):
# Create the new fragment
fragment = SpeechFragment(
start_time=result.get("start_time", 0),
end_time=result.get("end_time", 0),
language=alt.get("language", Language.EN),
is_eos=alt.get("is_eos", False),
is_final=is_final,
attaches_to=result.get("attaches_to", ""),
content=alt.get("content", ""),
speaker=alt.get("speaker", None),
confidence=alt.get("confidence", 1.0),
result=result,
)
# Drop `__XX__` speakers
if fragment.speaker and re.match(r"^__[A-Z0-9_]{2,}__$", fragment.speaker):
continue
# Add the fragment
fragments.append(fragment)
# Remove existing partials, as new partials and finals are provided
self._speech_fragments = [frag for frag in self._speech_fragments if frag.is_final]
# Return if no new fragments and length of the existing data is unchanged
if not fragments and len(self._speech_fragments) == current_length:
return False
# Add the fragments to the speech data
self._speech_fragments.extend(fragments)
# Data was updated
return True
def _get_frames_from_fragments(self) -> list[SpeakerFragments]:
"""Get speech data objects for the current fragment list.
Each speech fragments is grouped by contiguous speaker and then
returned as internal SpeakerFragments objects with the `speaker_id` field
set to the current speaker (string). An utterance may contain speech from
more than one speaker (e.g. S1, S2, S1, S3, ...), so they are kept
in strict order for the context of the conversation.
Returns:
list[SpeakerFragments]: The list of objects.
"""
# Speaker groups
current_speaker: str | None = None
speaker_groups: list[list[SpeechFragment]] = [[]]
# Group by speakers
for frag in self._speech_fragments:
if frag.speaker != current_speaker:
current_speaker = frag.speaker
if speaker_groups[-1]:
speaker_groups.append([])
speaker_groups[-1].append(frag)
# Create SpeakerFragments objects
speaker_fragments: list[SpeakerFragments] = []
for group in speaker_groups:
sd = self._get_speaker_fragments_from_fragment_group(group)
if sd:
speaker_fragments.append(sd)
# Return the grouped SpeakerFragments objects
return speaker_fragments
def _get_speaker_fragments_from_fragment_group(
self,
group: list[SpeechFragment],
) -> SpeakerFragments | None:
"""Take a group of fragments and piece together into SpeakerFragments.
Each fragment for a given speaker is assembled into a string,
taking into consideration whether words are attached to the
previous or next word (notably punctuation). This ensures that
the text does not have extra spaces. This will also check for
any straggling punctuation from earlier utterances that should
be removed.
Args:
group: List of SpeechFragment objects.
Returns:
SpeakerFragments: The object for the group.
"""
# Check for starting fragments that are attached to previous
if group and group[0].attaches_to == "previous":
group = group[1:]
# Check for trailing fragments that are attached to next
if group and group[-1].attaches_to == "next":
group = group[:-1]
# Check there are results
if not group:
return None
# Get the timing extremes
start_time = min(frag.start_time for frag in group)
# Timestamp
ts = (self._start_time + datetime.timedelta(seconds=start_time)).isoformat(
timespec="milliseconds"
)
# Return the SpeakerFragments object
return SpeakerFragments(
speaker_id=group[0].speaker,
timestamp=ts,
language=group[0].language,
fragments=group,
)
def _get_endpoint_url(url: str) -> str:
"""Format the endpoint URL with the SDK and app versions.
Args:
url: The base URL for the endpoint.
Returns:
str: The formatted endpoint URL.
"""
query_params = dict()
query_params["sm-app"] = f"pipecat/{__version__}"
query = urlencode(query_params)
return f"{url}?{query}"
def _language_to_speechmatics_language(language: Language) -> str:
"""Convert a Language enum to a Speechmatics language code.
Args:
language: The Language enum to convert.
Returns:
str: The Speechmatics language code, if found.
"""
# List of supported input languages
BASE_LANGUAGES = {
Language.AR: "ar",
Language.BA: "ba",
Language.EU: "eu",
Language.BE: "be",
Language.BG: "bg",
Language.BN: "bn",
Language.YUE: "yue",
Language.CA: "ca",
Language.HR: "hr",
Language.CS: "cs",
Language.DA: "da",
Language.NL: "nl",
Language.EN: "en",
Language.EO: "eo",
Language.ET: "et",
Language.FA: "fa",
Language.FI: "fi",
Language.FR: "fr",
Language.GL: "gl",
Language.DE: "de",
Language.EL: "el",
Language.HE: "he",
Language.HI: "hi",
Language.HU: "hu",
Language.IT: "it",
Language.ID: "id",
Language.GA: "ga",
Language.JA: "ja",
Language.KO: "ko",
Language.LV: "lv",
Language.LT: "lt",
Language.MS: "ms",
Language.MT: "mt",
Language.CMN: "cmn",
Language.MR: "mr",
Language.MN: "mn",
Language.NO: "no",
Language.PL: "pl",
Language.PT: "pt",
Language.RO: "ro",
Language.RU: "ru",
Language.SK: "sk",
Language.SL: "sl",
Language.ES: "es",
Language.SV: "sv",
Language.SW: "sw",
Language.TA: "ta",
Language.TH: "th",
Language.TR: "tr",
Language.UG: "ug",
Language.UK: "uk",
Language.UR: "ur",
Language.VI: "vi",
Language.CY: "cy",
}
# Get the language code
result = BASE_LANGUAGES.get(language)
# Fail if language is not supported
if not result:
raise ValueError(f"Unsupported language: {language}")
# Return the language code
return result
def _locale_to_speechmatics_locale(language_code: str, locale: Language) -> Optional[str]:
"""Convert a Language enum to a Speechmatics language code.
Args:
language_code: The language code.
locale: The Language enum to convert.
Returns:
str: The Speechmatics language code, if found.
"""
# Languages and output locales
LOCALES = {
"en": {
Language.EN_GB: "en-GB",
Language.EN_US: "en-US",
Language.EN_AU: "en-AU",
},
}
# Get the locale code
result = LOCALES.get(language_code, {}).get(locale)
# Fail if locale is not supported
if not result:
logger.warning(f"Unsupported output locale: {locale}, defaulting to {language_code}")
# Return the locale code
return result

View File

@@ -145,6 +145,9 @@ class Language(StrEnum):
EN_US = "en-US"
EN_ZA = "en-ZA"
# Esperanto
EO = "eo"
# Spanish
ES = "es"
ES_AR = "es-AR"
@@ -474,6 +477,9 @@ class Language(StrEnum):
# Tatar
TT = "tt"
# Uyghur
UG = "ug"
# Ukrainian
UK = "uk"
UK_UA = "uk-UA"