Compare commits

35 Commits

Author SHA1 Message Date
James Hush
d175e5e5fc Hidden assistant demo 2025-07-07 11:58:03 +08:00
Mark Backman
6eed6ff779 Merge pull request #2147 from pipecat-ai/mb/user-idle-long-function-call
UserIdleProcessor: Account for function calls in progress
2025-07-04 14:11:16 -07:00
Mark Backman
1375211610 UserIdleProcessor: Account for function calls in progress 2025-07-04 14:05:05 -07:00
Mark Backman
4e9369a702 Merge pull request #2149 from pipecat-ai/mb/twilio-hang-up-handling 2025-07-04 12:44:17 -07:00
Mark Backman
f9e8748a96 TwilioFrameSerializer: Handle user hanging up before the serializer 2025-07-04 09:42:16 -07:00
Filipi da Silva Fuchter
20d6bf267a Merge pull request #2146 from pipecat-ai/remove_gemini_duplicated_code
Removing duplicated code inside Gemini.
2025-07-04 11:59:10 -03:00
Filipi Fuchter
b573f9dab2 Removing duplicated code inside Gemini. 2025-07-04 10:57:53 -03:00
Mark Backman
dbc76389d8 Merge pull request #2140 from pipecat-ai/mb/fix-26-imports
Fix: missing import in 26f foundational example
2025-07-03 14:12:54 -07:00
Aleix Conchillo Flaqué
c27f838444 Merge pull request #2124 from pipecat-ai/aleix/frame-processor-no-push-queue
FrameProcessor: remove unnecessary push task
2025-07-03 14:03:05 -07:00
Aleix Conchillo Flaqué
ce84485e26 Merge pull request #2142 from pipecat-ai/aleix/publish-workflow-message
github: update publish message to make it clear
2025-07-03 14:02:51 -07:00
Mark Backman
6cf254e2f9 Fix: missing import in 26f foundational example, update twilio transport_params to FastAPIWebsocketParams 2025-07-03 13:58:18 -07:00
Aleix Conchillo Flaqué
02b63c28a5 FrameProcessor: remove unnecessary push task
When we call `FrameProcessor.push_frame()` we end up calling
`FrameProcessor.queue_frame()` on the next or previous processor which already
uses the input queue and guarantees frame ordering. So, there's no need to have
two queues next to each other.
2025-07-03 13:57:32 -07:00
Aleix Conchillo Flaqué
57c6ce7ffa github: update publish message to make it clear 2025-07-03 13:55:02 -07:00
Aleix Conchillo Flaqué
2f3272ea2f Merge pull request #2135 from pipecat-ai/aleix/pipecat-0.0.74
update CHANGELOG for 0.0.74
2025-07-03 13:46:00 -07:00
Aleix Conchillo Flaqué
f5c2d57e4b update CHANGELOG for 0.0.74 2025-07-03 13:44:21 -07:00
Aleix Conchillo Flaqué
baa878272d scripts(evals): added 07a-interruptible-speechmatics.py 2025-07-03 13:44:21 -07:00
Aleix Conchillo Flaqué
093285868e scripts(evals): update timeout back to 90 seconds 2025-07-03 13:37:17 -07:00
Filipi da Silva Fuchter
6c9d058ec2 Merge pull request #2139 from pipecat-ai/filipi/changelog_improvements
Mentioning the SpeechmaticsSTTService in the changelog.
2025-07-03 17:36:55 -03:00
Filipi Fuchter
5df7be6892 Mentioning the SpeechmaticsSTTService in the changelog. 2025-07-03 17:35:30 -03:00
Mark Backman
2deca816ae Merge pull request #2137 from pipecat-ai/mb/fish-audio-normalize
FishAudioTTSService: arg cleanup, add new InputParam and arg
2025-07-03 13:29:14 -07:00
Mark Backman
b8d2fceced Merge pull request #2138 from pipecat-ai/mb/fix-google-llm-import-order
GoogleLLMService: Linting fixes
2025-07-03 13:26:32 -07:00
Sam Sykes
7596d71460 Speechmatics STT + multi-speaker conversations (#2036)
* initial config

* skeleton

* Added a README (to be added to).

* Payloads coming from the ASR.

* doc update

* handle the partials and finals

* enable diarization in the example

* support sending messages to pipecat pipeline

* requirements fix in README

* updated example (with amusement)

* updated example to match master

* updated docs

* support for diarization tags

* logic fix for wrapper

* Use an internal SpeechFrame for speaker_id (not user_id).

* only include speaker tags on finalised transcript (as this may skew end of utterance detection)

* updated docs

* correction to docs and updated example

* updated requirement

* Fix for using default EU server.

* Updates from PR comments.

* Refactor based on comments in the original PR.

Primary focus on documentation, naming conventions and how `user_id` is used.

* Check for SMX installed when importing.

* Variable name change

* Comment correction.

* Support for Esperanto and Uyghur

* Improved language support

* function name change

* Locale fix

* intercept

* interim changes

* pass the pipeline task to the module for adding events to the top of the pipeline

* logging for the pipeline

* Reduce timeout for content aggregator.

* staged update

* testing with Azure

* Updated context (Azure was dropping punctuation) and using better ElevenLabs model.

* Updated to RT 0.3.0 and use OpenAI (not Azure).

* Missing OpenAI import; parameter name change for output locale validation.

* Revert to `0.2.0` of RT SDK.

* fix for assignment of `output_locale_code`.

* update Speechmatics library to 0.3.1

* new transcription example

* updated asyncio task handling

* Updated doc strings

* enable OpenTelemetry logging

* removed import from stt for __init__

* updated examples and default values

* updated examples

* prevent lock up when closing the STT connection
2025-07-03 17:25:13 -03:00
Mark Backman
096067b097 GoogleLLMService: Linting fixes 2025-07-03 13:23:13 -07:00
Mark Backman
ec09505f6b FishAudioTTSService: Add normalize as InputParam, model_id as arg 2025-07-03 13:14:15 -07:00
Mark Backman
251ea756c8 FishTTSService: deprecate model, add reference_id 2025-07-03 12:56:24 -07:00
Aleix Conchillo Flaqué
8f6544efe2 Merge pull request #2133 from pipecat-ai/vp-changelog-fileapi
docs: add changelog line for gemini files api
2025-07-03 11:13:02 -07:00
otaqwawi
6045a8ad8c Add option to change the base URL for Google Generative AI. (#2113)
* Add option to change the base URL for Google Generative AI.
This would be useful to support private instance or gateway of the API

* fix: add proper type hints for http_options in Google LLM service
2025-07-03 11:12:35 -07:00
Aleix Conchillo Flaqué
b184d62634 Merge pull request #2134 from pipecat-ai/aleix/evals-cancel-expired-tasks
cancel expire evals tasks
2025-07-03 10:07:27 -07:00
Aleix Conchillo Flaqué
1a8d512abb scripts(evals): make sure we cancel pending tasks after timeout 2025-07-03 10:01:42 -07:00
vipyne
a62be8ea32 docs: add changelog line for gemini files api 2025-07-03 11:44:34 -05:00
Mark Backman
c230d94ff0 Merge pull request #2125 from pipecat-ai/mb/deprecate-handle-function-call-start
Add docs deprecation for handle_function_call_start
2025-07-03 12:27:17 -04:00
Aleix Conchillo Flaqué
e7b02773f5 Merge pull request #2131 from pipecat-ai/aleix/dtmf-aggregator-dangling-tasks
DtmfAggregator: cancel interruption task to avoid a dangling task
2025-07-03 08:34:50 -07:00
Aleix Conchillo Flaqué
af8b4901d4 DtmfAggregator: cancel interruption task to avoid a dangling task 2025-07-03 08:18:48 -07:00
Aleix Conchillo Flaqué
bf664534cc PipelineTask: cancel idle queue before cancelling task 2025-07-03 08:15:31 -07:00
Mark Backman
4ae045d704 Add docs deprecation for handle_function_call_start 2025-07-02 19:53:48 -07:00
31 changed files with 1275 additions and 224 deletions
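
The commit message for 02b63c28a5 above argues that a separate push task is redundant because each processor's input queue already preserves frame order. A minimal sketch of that idea in plain `asyncio` follows. `MiniProcessor` and its methods are hypothetical stand-ins that only borrow the names `queue_frame` and `push_frame` from the message; this is not the pipecat implementation.

```python
import asyncio


class MiniProcessor:
    """Hypothetical stand-in for a frame processor with a single input queue."""

    def __init__(self, name):
        self.name = name
        self.next = None
        self.received = []
        self._queue = asyncio.Queue()

    async def queue_frame(self, frame):
        # Every incoming frame passes through the one input queue.
        await self._queue.put(frame)

    async def push_frame(self, frame):
        # No second "push" queue: hand the frame straight to the neighbour,
        # whose own input queue keeps the ordering.
        if self.next is not None:
            await self.next.queue_frame(frame)

    async def run(self):
        # Drain the input queue in FIFO order until a None sentinel arrives.
        while True:
            frame = await self._queue.get()
            if frame is None:
                await self.push_frame(None)
                break
            self.received.append(frame)
            await self.push_frame(frame)


async def demo():
    first, second = MiniProcessor("first"), MiniProcessor("second")
    first.next = second
    tasks = [asyncio.create_task(p.run()) for p in (first, second)]
    for frame in ["a", "b", "c", None]:
        await first.queue_frame(frame)
    await asyncio.gather(*tasks)
    return second.received


result = asyncio.run(demo())
print(result)
```

Because `push_frame` only forwards to the neighbour's `queue_frame`, the second processor sees the frames in the order they were queued on the first.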

View File

@@ -5,7 +5,7 @@ on:
inputs:
gitref:
type: string
description: "what git ref to build"
description: "what git tag to build (e.g. v0.0.74)"
required: true
jobs:

View File

@@ -9,6 +9,34 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Added call hang-up error handling in `TwilioFrameSerializer`, which handles
the case where the user has hung up before the `TwilioFrameSerializer` hangs
up the call.
### Changed
- The `UserIdleProcessor` now handles the scenario where function calls take
longer than the idle timeout duration. This allows you to use the
`UserIdleProcessor` in conjunction with function calls that take a while to
return a result.
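
The `UserIdleProcessor` entry above can be pictured as an idle timer that stays quiet while a function call is still running. The sketch below is an assumption about the general technique, not the code from this change: `IdleMonitor`, `function_calls_in_progress`, and `idle_events` are hypothetical names.

```python
import asyncio


class IdleMonitor:
    """Hypothetical idle watcher; not the pipecat implementation."""

    def __init__(self, timeout):
        self.timeout = timeout
        self.function_calls_in_progress = 0
        self.idle_events = 0
        self._activity = asyncio.Event()

    def notify_activity(self):
        # Called whenever the user does something; resets the idle wait.
        self._activity.set()

    async def run(self, rounds):
        for _ in range(rounds):
            try:
                await asyncio.wait_for(self._activity.wait(), self.timeout)
                self._activity.clear()
            except asyncio.TimeoutError:
                # Only report idleness when no function call is pending.
                if self.function_calls_in_progress == 0:
                    self.idle_events += 1


async def demo():
    monitor = IdleMonitor(timeout=0.01)
    monitor.function_calls_in_progress = 1  # a slow function call is running
    await monitor.run(rounds=2)
    during_call = monitor.idle_events
    monitor.function_calls_in_progress = 0  # the call has returned
    await monitor.run(rounds=2)
    return during_call, monitor.idle_events


during_call, after_call = asyncio.run(demo())
print(during_call, after_call)
```

Timeouts that expire during the simulated function call are ignored. Once the call count drops back to zero, each expired timeout counts as an idle event again.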
### Performance
- Remove unnecessary push task in each `FrameProcessor`.
## [0.0.74] - 2025-07-03
### Added
- Added a new STT service, `SpeechmaticsSTTService`. This service provides
real-time speech-to-text transcription using the Speechmatics API. It supports
partial and final transcriptions, multiple languages, various audio formats,
and speaker diarization.
- Added `normalize` and `model_id` to `FishAudioTTSService`.
- Added `http_options` argument to `GoogleLLMService`.
- Added `run_llm` field to `LLMMessagesAppendFrame` and `LLMMessagesUpdateFrame`
frames. If true, a context frame will be pushed triggering the LLM to respond.
@@ -50,9 +78,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
tools = ToolsSchema(standard_tools=[do_something])
```
- `user_id` is now populated in the `TranscriptionFrame` and
`InterimTranscriptionFrame` when using a transport that provides a
`user_id`, like `DailyTransport` or `LiveKitTransport`.
- `user_id` is now populated in the `TranscriptionFrame` and
`InterimTranscriptionFrame` when using a transport that provides a `user_id`,
like `DailyTransport` or `LiveKitTransport`.
- Added `watchdog_coroutine()`. This is a watchdog helper for coroutines. So,
if you have a coroutine that is waiting for a result and that takes a long
@@ -61,6 +89,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added `session_token` parameter to `AWSNovaSonicLLMService`.
- Added Gemini Multimodal Live File API for uploading, fetching, listing, and
deleting files. See `26f-gemini-multimodal-live-files-api.py` for example usage.
### Changed
- Updated all the services to use the new `SOXRStreamAudioResampler`, ensuring smooth
@@ -72,7 +103,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Fixed an issue where audio would get stuck in the queue when an interrupt occurs
- Fixed an issue where audio would get stuck in the queue when an interrupt occurs
during Azure TTS synthesis.
- Fixed a race condition that occurs in Python 3.10+ where the task could miss
@@ -80,6 +111,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Fixed an `AWSNovaSonicLLMService` issue introduced in 0.0.72.
### Deprecated
- In `FishAudioTTSService`, deprecated `model` and replaced with
`reference_id`. This change is to better align with Fish Audio's variable
naming and to reduce confusion about what functionality the variable
controls.
## [0.0.73] - 2025-06-26
### Fixed

View File

@@ -51,19 +51,19 @@ You can connect to Pipecat from any platform using our official SDKs:
## 🧩 Available services
| Category | Services |
| ------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), [AWS](https://docs.pipecat.ai/server/services/stt/aws), [Azure](https://docs.pipecat.ai/server/services/stt/azure), [Cartesia](https://docs.pipecat.ai/server/services/stt/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [Fal Wizper](https://docs.pipecat.ai/server/services/stt/fal), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), [Google](https://docs.pipecat.ai/server/services/stt/google), [Groq (Whisper)](https://docs.pipecat.ai/server/services/stt/groq), [OpenAI (Whisper)](https://docs.pipecat.ai/server/services/stt/openai), [Parakeet (NVIDIA)](https://docs.pipecat.ai/server/services/stt/parakeet), [SambaNova (Whisper)](https://docs.pipecat.ai/server/services/stt/sambanova) [Ultravox](https://docs.pipecat.ai/server/services/stt/ultravox), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) |
| LLMs | [Anthropic](https://docs.pipecat.ai/server/services/llm/anthropic), [AWS](https://docs.pipecat.ai/server/services/llm/aws), [Azure](https://docs.pipecat.ai/server/services/llm/azure), [Cerebras](https://docs.pipecat.ai/server/services/llm/cerebras), [DeepSeek](https://docs.pipecat.ai/server/services/llm/deepseek), [Fireworks AI](https://docs.pipecat.ai/server/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/server/services/llm/gemini), [Grok](https://docs.pipecat.ai/server/services/llm/grok), [Groq](https://docs.pipecat.ai/server/services/llm/groq), [NVIDIA NIM](https://docs.pipecat.ai/server/services/llm/nim), [Ollama](https://docs.pipecat.ai/server/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/server/services/llm/openai), [OpenRouter](https://docs.pipecat.ai/server/services/llm/openrouter), [Perplexity](https://docs.pipecat.ai/server/services/llm/perplexity), [Qwen](https://docs.pipecat.ai/server/services/llm/qwen), [SambaNova](https://docs.pipecat.ai/server/services/llm/sambanova) [Together AI](https://docs.pipecat.ai/server/services/llm/together) |
| Text-to-Speech | [AWS](https://docs.pipecat.ai/server/services/tts/aws), [Azure](https://docs.pipecat.ai/server/services/tts/azure), [Cartesia](https://docs.pipecat.ai/server/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/tts/elevenlabs), [FastPitch (NVIDIA)](https://docs.pipecat.ai/server/services/tts/fastpitch), [Fish](https://docs.pipecat.ai/server/services/tts/fish), [Google](https://docs.pipecat.ai/server/services/tts/google), [LMNT](https://docs.pipecat.ai/server/services/tts/lmnt), [MiniMax](https://docs.pipecat.ai/server/services/tts/minimax), [Neuphonic](https://docs.pipecat.ai/server/services/tts/neuphonic), [OpenAI](https://docs.pipecat.ai/server/services/tts/openai), [Piper](https://docs.pipecat.ai/server/services/tts/piper), [PlayHT](https://docs.pipecat.ai/server/services/tts/playht), [Rime](https://docs.pipecat.ai/server/services/tts/rime), [Sarvam](https://docs.pipecat.ai/server/services/tts/sarvam), [XTTS](https://docs.pipecat.ai/server/services/tts/xtts) |
| Speech-to-Speech | [AWS Nova Sonic](https://docs.pipecat.ai/server/services/s2s/aws), [Gemini Multimodal Live](https://docs.pipecat.ai/server/services/s2s/gemini), [OpenAI Realtime](https://docs.pipecat.ai/server/services/s2s/openai) |
| Transport | [Daily (WebRTC)](https://docs.pipecat.ai/server/services/transport/daily), [FastAPI Websocket](https://docs.pipecat.ai/server/services/transport/fastapi-websocket), [SmallWebRTCTransport](https://docs.pipecat.ai/server/services/transport/small-webrtc), [WebSocket Server](https://docs.pipecat.ai/server/services/transport/websocket-server), Local |
| Serializers | [Plivo](https://docs.pipecat.ai/server/utilities/serializers/plivo), [Twilio](https://docs.pipecat.ai/server/utilities/serializers/twilio), [Telnyx](https://docs.pipecat.ai/server/utilities/serializers/telnyx) |
| Video | [Tavus](https://docs.pipecat.ai/server/services/video/tavus), [Simli](https://docs.pipecat.ai/server/services/video/simli) |
| Memory | [mem0](https://docs.pipecat.ai/server/services/memory/mem0) |
| Vision & Image | [fal](https://docs.pipecat.ai/server/services/image-generation/fal), [Google Imagen](https://docs.pipecat.ai/server/services/image-generation/fal), [Moondream](https://docs.pipecat.ai/server/services/vision/moondream) |
| Audio Processing | [Silero VAD](https://docs.pipecat.ai/server/utilities/audio/silero-vad-analyzer), [Krisp](https://docs.pipecat.ai/server/utilities/audio/krisp-filter), [Koala](https://docs.pipecat.ai/server/utilities/audio/koala-filter), [Noisereduce](https://docs.pipecat.ai/server/utilities/audio/noisereduce-filter) |
| Analytics & Metrics | [OpenTelemetry](https://docs.pipecat.ai/server/utilities/opentelemetry), [Sentry](https://docs.pipecat.ai/server/services/analytics/sentry) |
| Category | Services |
| ------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), [AWS](https://docs.pipecat.ai/server/services/stt/aws), [Azure](https://docs.pipecat.ai/server/services/stt/azure), [Cartesia](https://docs.pipecat.ai/server/services/stt/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [Fal Wizper](https://docs.pipecat.ai/server/services/stt/fal), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), [Google](https://docs.pipecat.ai/server/services/stt/google), [Groq (Whisper)](https://docs.pipecat.ai/server/services/stt/groq), [OpenAI (Whisper)](https://docs.pipecat.ai/server/services/stt/openai), [Parakeet (NVIDIA)](https://docs.pipecat.ai/server/services/stt/parakeet), [SambaNova (Whisper)](https://docs.pipecat.ai/server/services/stt/sambanova), [Speechmatics](https://docs.pipecat.ai/server/services/stt/speechmatics), [Ultravox](https://docs.pipecat.ai/server/services/stt/ultravox), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) |
| LLMs | [Anthropic](https://docs.pipecat.ai/server/services/llm/anthropic), [AWS](https://docs.pipecat.ai/server/services/llm/aws), [Azure](https://docs.pipecat.ai/server/services/llm/azure), [Cerebras](https://docs.pipecat.ai/server/services/llm/cerebras), [DeepSeek](https://docs.pipecat.ai/server/services/llm/deepseek), [Fireworks AI](https://docs.pipecat.ai/server/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/server/services/llm/gemini), [Grok](https://docs.pipecat.ai/server/services/llm/grok), [Groq](https://docs.pipecat.ai/server/services/llm/groq), [NVIDIA NIM](https://docs.pipecat.ai/server/services/llm/nim), [Ollama](https://docs.pipecat.ai/server/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/server/services/llm/openai), [OpenRouter](https://docs.pipecat.ai/server/services/llm/openrouter), [Perplexity](https://docs.pipecat.ai/server/services/llm/perplexity), [Qwen](https://docs.pipecat.ai/server/services/llm/qwen), [SambaNova](https://docs.pipecat.ai/server/services/llm/sambanova) [Together AI](https://docs.pipecat.ai/server/services/llm/together) |
| Text-to-Speech | [AWS](https://docs.pipecat.ai/server/services/tts/aws), [Azure](https://docs.pipecat.ai/server/services/tts/azure), [Cartesia](https://docs.pipecat.ai/server/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/tts/elevenlabs), [FastPitch (NVIDIA)](https://docs.pipecat.ai/server/services/tts/fastpitch), [Fish](https://docs.pipecat.ai/server/services/tts/fish), [Google](https://docs.pipecat.ai/server/services/tts/google), [LMNT](https://docs.pipecat.ai/server/services/tts/lmnt), [MiniMax](https://docs.pipecat.ai/server/services/tts/minimax), [Neuphonic](https://docs.pipecat.ai/server/services/tts/neuphonic), [OpenAI](https://docs.pipecat.ai/server/services/tts/openai), [Piper](https://docs.pipecat.ai/server/services/tts/piper), [PlayHT](https://docs.pipecat.ai/server/services/tts/playht), [Rime](https://docs.pipecat.ai/server/services/tts/rime), [Sarvam](https://docs.pipecat.ai/server/services/tts/sarvam), [XTTS](https://docs.pipecat.ai/server/services/tts/xtts) |
| Speech-to-Speech | [AWS Nova Sonic](https://docs.pipecat.ai/server/services/s2s/aws), [Gemini Multimodal Live](https://docs.pipecat.ai/server/services/s2s/gemini), [OpenAI Realtime](https://docs.pipecat.ai/server/services/s2s/openai) |
| Transport | [Daily (WebRTC)](https://docs.pipecat.ai/server/services/transport/daily), [FastAPI Websocket](https://docs.pipecat.ai/server/services/transport/fastapi-websocket), [SmallWebRTCTransport](https://docs.pipecat.ai/server/services/transport/small-webrtc), [WebSocket Server](https://docs.pipecat.ai/server/services/transport/websocket-server), Local |
| Serializers | [Plivo](https://docs.pipecat.ai/server/utilities/serializers/plivo), [Twilio](https://docs.pipecat.ai/server/utilities/serializers/twilio), [Telnyx](https://docs.pipecat.ai/server/utilities/serializers/telnyx) |
| Video | [Tavus](https://docs.pipecat.ai/server/services/video/tavus), [Simli](https://docs.pipecat.ai/server/services/video/simli) |
| Memory | [mem0](https://docs.pipecat.ai/server/services/memory/mem0) |
| Vision & Image | [fal](https://docs.pipecat.ai/server/services/image-generation/fal), [Google Imagen](https://docs.pipecat.ai/server/services/image-generation/fal), [Moondream](https://docs.pipecat.ai/server/services/vision/moondream) |
| Audio Processing | [Silero VAD](https://docs.pipecat.ai/server/utilities/audio/silero-vad-analyzer), [Krisp](https://docs.pipecat.ai/server/utilities/audio/krisp-filter), [Koala](https://docs.pipecat.ai/server/utilities/audio/koala-filter), [Noisereduce](https://docs.pipecat.ai/server/utilities/audio/noisereduce-filter) |
| Analytics & Metrics | [OpenTelemetry](https://docs.pipecat.ai/server/utilities/opentelemetry), [Sentry](https://docs.pipecat.ai/server/services/analytics/sentry) |
📚 [View full services documentation →](https://docs.pipecat.ai/server/services/supported-services)

View File

@@ -46,6 +46,7 @@ pipecat-ai[sambanova]
pipecat-ai[silero]
pipecat-ai[simli]
pipecat-ai[soundfile]
pipecat-ai[speechmatics]
pipecat-ai[tavus]
pipecat-ai[together]
# pipecat-ai[ultravox] # Mocked

View File

@@ -109,6 +109,10 @@ MINIMAX_GROUP_ID=...
# Sarvam AI
SARVAM_API_KEY=...
# Speechmatics
SPEECHMATICS_API_KEY=...
# SambaNova
SAMBANOVA_API_KEY=...

View File

@@ -0,0 +1,153 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import argparse
import os

from dotenv import load_dotenv
from loguru import logger

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
    LLMUserAggregatorParams,
)
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.elevenlabs.tts import ElevenLabsTTSService
from pipecat.services.openai.base_llm import BaseOpenAILLMService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.services.speechmatics.stt import SpeechmaticsSTTService
from pipecat.transcriptions.language import Language
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketParams
from pipecat.transports.services.daily import DailyParams

load_dotenv(override=True)

# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
    "daily": lambda: DailyParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(),
    ),
    "twilio": lambda: FastAPIWebsocketParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(),
    ),
    "webrtc": lambda: TransportParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(),
    ),
}


async def run_example(transport: BaseTransport, _: argparse.Namespace, handle_sigint: bool):
    """Run example using Speechmatics STT.

    This example will use diarization within our STT service and output the words spoken by
    each individual speaker and wrap them with XML tags for the LLM to process. Note the
    instructions in the system context for the LLM. This greatly improves the conversation
    experience by allowing the LLM to understand who is speaking in a multi-party call.

    If you do not wish to use diarization, then set the `enable_speaker_diarization` parameter
    to `False` or omit it altogether. The `text_format` will only be used if diarization is enabled.

    By default, this example will use our ENHANCED operating point, which is optimized for
    high accuracy. You can change this by setting the `operating_point` parameter to a different
    value.

    For more information on operating points, see the Speechmatics documentation:
    https://docs.speechmatics.com/rt-api-ref
    """
    logger.info(f"Starting bot")

    stt = SpeechmaticsSTTService(
        api_key=os.getenv("SPEECHMATICS_API_KEY"),
        language=Language.EN,
        enable_speaker_diarization=True,
        text_format="<{speaker_id}>{text}</{speaker_id}>",
    )

    tts = ElevenLabsTTSService(
        api_key=os.getenv("ELEVENLABS_API_KEY", ""),
        voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
        model="eleven_turbo_v2_5",
    )

    llm = OpenAILLMService(
        api_key=os.getenv("OPENAI_API_KEY"),
        params=BaseOpenAILLMService.InputParams(temperature=0.75),
    )

    messages = [
        {
            "role": "system",
            "content": (
                "You are a helpful British assistant called Alfred. "
                "Your goal is to demonstrate your capabilities in a succinct way. "
                "Your output will be converted to audio so don't include special characters in your answers. "
                "Always include punctuation in your responses. "
                "Give very short replies - do not give longer replies unless strictly necessary. "
                "Respond to what the user said in a concise, funny, creative and helpful way. "
                "Use `<Sn/>` tags to identify different speakers - do not use tags in your replies."
            ),
        },
    ]

    context = OpenAILLMContext(messages)
    context_aggregator = llm.create_context_aggregator(
        context,
        user_params=LLMUserAggregatorParams(aggregation_timeout=0.005),
    )

    pipeline = Pipeline(
        [
            transport.input(),  # Transport user input
            stt,  # STT
            context_aggregator.user(),  # User responses
            llm,  # LLM
            tts,  # TTS
            transport.output(),  # Transport bot output
            context_aggregator.assistant(),  # Assistant spoken responses
        ]
    )

    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            enable_metrics=True,
            enable_usage_metrics=True,
        ),
    )

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info(f"Client connected")
        # Kick off the conversation.
        messages.append({"role": "system", "content": "Say a short hello to the user."})
        await task.queue_frames([context_aggregator.user().get_context_frame()])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info(f"Client disconnected")
        await task.cancel()

    runner = PipelineRunner(handle_sigint=handle_sigint)

    await runner.run(task)


if __name__ == "__main__":
    from pipecat.examples.run import main

    main(run_example, transport_params=transport_params)
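
The docstring in the file above says each speaker's words are wrapped in XML tags through `text_format`. A minimal sketch of what that template would produce, assuming it is expanded with Python's `str.format` and that speaker identifiers look like `S1` and `S2`. Both points are assumptions, since the diff does not show how the service applies the template, and `format_segment` is a hypothetical helper.

```python
# Template copied from the example; how the service expands it is assumed.
TEXT_FORMAT = "<{speaker_id}>{text}</{speaker_id}>"


def format_segment(speaker_id, text, text_format=TEXT_FORMAT):
    """Hypothetical helper: wrap one speaker's text using the template."""
    return text_format.format(speaker_id=speaker_id, text=text)


# Illustrative segments; the speaker labels are assumed, not taken from the diff.
segments = [("S1", "Hello there."), ("S2", "Hi, how are you?")]
formatted = [format_segment(speaker, text) for speaker, text in segments]
print(formatted)
```

Tagged text of this shape is what the system prompt in the example tells the LLM to expect when it mentions `<Sn/>` tags.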

View File

@@ -35,7 +35,7 @@ transport_params = {
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
"twilio": lambda: TransportParams(
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),

View File

@@ -0,0 +1,89 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import argparse
import os

from dotenv import load_dotenv
from loguru import logger

from pipecat.frames.frames import Frame, TranscriptionFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.speechmatics.stt import SpeechmaticsSTTService
from pipecat.transcriptions.language import Language
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketParams
from pipecat.transports.services.daily import DailyParams

load_dotenv(override=True)


class TranscriptionLogger(FrameProcessor):
    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        if isinstance(frame, TranscriptionFrame):
            print(f"Transcription: {frame.text}")


# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
    "daily": lambda: DailyParams(audio_in_enabled=True),
    "twilio": lambda: FastAPIWebsocketParams(audio_in_enabled=True),
    "webrtc": lambda: TransportParams(audio_in_enabled=True),
}


async def run_example(transport: BaseTransport, _: argparse.Namespace, handle_sigint: bool):
    """Run example using Speechmatics STT.

    This example will use diarization within our STT service and output the words spoken by
    each individual speaker and wrap them with XML tags.

    If you do not wish to use diarization, then set the `enable_speaker_diarization` parameter
    to `False` or omit it altogether. The `text_format` will only be used if diarization is enabled.

    By default, this example will use our ENHANCED operating point, which is optimized for
    high accuracy. You can change this by setting the `operating_point` parameter to a different
    value.

    For more information on operating points, see the Speechmatics documentation:
    https://docs.speechmatics.com/rt-api-ref
    """
    logger.info(f"Starting bot")

    stt = SpeechmaticsSTTService(
        api_key=os.getenv("SPEECHMATICS_API_KEY"),
        language=Language.EN,
        enable_speaker_diarization=True,
        text_format="<{speaker_id}>{text}</{speaker_id}>",
    )

    tl = TranscriptionLogger()

    pipeline = Pipeline([transport.input(), stt, tl])

    task = PipelineTask(pipeline)

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info(f"Client disconnected")
        await task.cancel()

    runner = PipelineRunner(handle_sigint=handle_sigint)

    await runner.run(task)


if __name__ == "__main__":
    from pipecat.examples.run import main

    main(run_example, transport_params=transport_params)
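
The comment above `transport_params` explains that the dictionary stores functions so that nothing is instantiated until a transport is selected. The sketch below shows that lazy-factory pattern with stand-in classes only. `FakeParams`, `created`, and `transport_factories` are hypothetical names used for illustration and do not exist in pipecat.

```python
created = []  # records which parameter objects were actually built


class FakeParams:
    """Hypothetical stand-in for a transport parameter class."""

    def __init__(self, name):
        created.append(name)
        self.name = name


# Each value is a zero-argument function, so building the dict creates nothing.
transport_factories = {
    "daily": lambda: FakeParams("daily"),
    "twilio": lambda: FakeParams("twilio"),
    "webrtc": lambda: FakeParams("webrtc"),
}

before = list(created)  # snapshot taken before any factory is called

# Only the selected transport's factory runs.
params = transport_factories["twilio"]()
print(before, created, params.name)
```

With this pattern, costly objects such as a VAD analyzer would only be constructed for the transport that is actually used.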

View File

@@ -42,7 +42,7 @@ transport_params = {
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
"twilio": lambda: TransportParams(
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),

View File

@@ -33,7 +33,7 @@ transport_params = {
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
"twilio": lambda: TransportParams(
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),

View File

@@ -55,7 +55,7 @@ transport_params = {
# endpointing, for now.
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.5)),
),
"twilio": lambda: TransportParams(
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
# set stop_secs to something roughly similar to the internal setting

View File

@@ -18,10 +18,10 @@ from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.gemini_multimodal_live.gemini import (
GeminiMultimodalLiveContext,
GeminiMultimodalLiveLLMService,
)
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketParams
from pipecat.transports.services.daily import DailyParams
load_dotenv(override=True)

View File

@@ -24,6 +24,7 @@ from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketParams
from pipecat.transports.services.daily import DailyParams
from pipecat.utils.tracing.setup import setup_tracing
@@ -61,7 +62,7 @@ transport_params = {
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
"twilio": lambda: TransportParams(
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),

View File

@@ -24,6 +24,7 @@ from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketParams
from pipecat.transports.services.daily import DailyParams
from pipecat.utils.tracing.setup import setup_tracing
@@ -58,7 +59,7 @@ transport_params = {
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
"twilio": lambda: TransportParams(
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),

View File

@@ -4,18 +4,6 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
"""OpenAI Bot Implementation.
This module implements a chatbot using OpenAI's GPT-4 model for natural language
processing. It includes:
- Real-time audio/video interaction through Daily
- Animated robot avatar
- Text-to-speech using ElevenLabs
- Support for both English and Spanish
The bot runs as part of a pipeline that processes audio/video frames and manages
the conversation flow.
"""
import asyncio
import os
@@ -24,150 +12,72 @@ import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from PIL import Image
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
Frame,
OutputImageRawFrame,
SpriteFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
from pipecat.services.elevenlabs.tts import ElevenLabsTTSService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.transports.services.helpers.daily_rest import (
DailyMeetingTokenParams,
DailyMeetingTokenProperties,
DailyRESTHelper,
DailyRoomParams,
)
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
sprites = []
script_dir = os.path.dirname(__file__)
# Load sequential animation frames
for i in range(1, 26):
# Build the full path to the image file
full_path = os.path.join(script_dir, f"assets/robot0{i}.png")
# Get the filename without the extension to use as the dictionary key
# Open the image and convert it to bytes
with Image.open(full_path) as img:
sprites.append(OutputImageRawFrame(image=img.tobytes(), size=img.size, format=img.format))
# Create a smooth animation by adding reversed frames
flipped = sprites[::-1]
sprites.extend(flipped)
# Define static and animated states
quiet_frame = sprites[0] # Static frame for when bot is listening
talking_frame = SpriteFrame(images=sprites) # Animation sequence for when bot is talking
class TalkingAnimation(FrameProcessor):
"""Manages the bot's visual animation states.
Switches between static (listening) and animated (talking) states based on
the bot's current speaking status.
"""
def __init__(self):
super().__init__()
self._is_talking = False
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process incoming frames and update animation state.
Args:
frame: The incoming frame to process
direction: The direction of frame flow in the pipeline
"""
await super().process_frame(frame, direction)
# Switch to talking animation when bot starts speaking
if isinstance(frame, BotStartedSpeakingFrame):
if not self._is_talking:
await self.push_frame(talking_frame)
self._is_talking = True
# Return to static frame when bot stops speaking
elif isinstance(frame, BotStoppedSpeakingFrame):
await self.push_frame(quiet_frame)
self._is_talking = False
await self.push_frame(frame, direction)
async def main():
"""Main bot execution function.
Sets up and runs the bot pipeline including:
- Daily video transport
- Speech-to-text and text-to-speech services
- Language model integration
- Animation processing
- RTVI event handling
"""
"""Main bot execution function."""
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
daily_rest_helper = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY"),
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=session,
)
room = await daily_rest_helper.create_room(
DailyRoomParams(properties={"enable_prejoin_ui": False})
)
token_params = DailyMeetingTokenParams(
properties=DailyMeetingTokenProperties(
is_owner=True,
permissions={
"hasPresence": False, # Example: join as a hidden participant
},
start_video_off=True,
start_audio_off=True,
)
)
token = await daily_rest_helper.get_token(room_url=room.url, params=token_params)
# Set up Daily transport with video/audio parameters
transport = DailyTransport(
room_url,
room.url,
token,
"Chatbot",
DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
video_out_enabled=True,
video_out_width=1024,
video_out_height=576,
vad_analyzer=SileroVADAnalyzer(),
transcription_enabled=True,
#
# Spanish
#
# transcription_settings=DailyTranscriptionSettings(
# language="es",
# tier="nova",
# model="2-general"
# )
),
)
# Initialize text-to-speech service
tts = ElevenLabsTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY"),
#
# English
#
voice_id="pNInz6obpgDQGcFmaJgB",
#
# Spanish
#
# model="eleven_multilingual_v2",
# voice_id="gD1IexrzCvsXPHUuT0s3",
)
# Initialize LLM service
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
#
# English
#
"content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by introducing yourself.",
#
# Spanish
#
# "content": "Eres Chatbot, un amigable y útil robot. Tu objetivo es demostrar tus capacidades de una manera breve. Tus respuestas se convertiran a audio así que nunca no debes incluir caracteres especiales. Contesta a lo que el usuario pregunte de una manera creativa, útil y breve. Empieza por presentarte a ti mismo.",
"content": "Summerize the conversation so far in a single sentence.",
},
]
@@ -176,8 +86,6 @@ async def main():
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
ta = TalkingAnimation()
#
# RTVI events for Pipecat client UI
#
@@ -189,8 +97,6 @@ async def main():
rtvi,
context_aggregator.user(),
llm,
tts,
ta,
transport.output(),
context_aggregator.assistant(),
]
@@ -204,7 +110,6 @@ async def main():
),
observers=[RTVIObserver(rtvi)],
)
await task.queue_frame(quiet_frame)
@rtvi.event_handler("on_client_ready")
async def on_client_ready(rtvi):

View File

@@ -87,6 +87,7 @@ remote-smart-turn = []
silero = [ "onnxruntime~=1.20.1" ]
simli = [ "simli-ai~=0.1.10"]
soundfile = [ "soundfile~=0.13.0" ]
speechmatics = [ "speechmatics-rt>=0.3.1" ]
tavus=[]
together = []
tracing = [ "opentelemetry-sdk>=1.33.0", "opentelemetry-api>=1.33.0", "opentelemetry-instrumentation>=0.54b0" ]

View File

@@ -49,7 +49,7 @@ python run-release-evals.py -p 07 -a -v
You can also run evals for a single example (not part of the release set):
```sh
python run-eval.py YOUR_EXAMPLE_SCRIPT -a -v
python run-eval.py -p "A simple math addition" -a -v YOUR_EXAMPLE_SCRIPT
```
Your script needs to follow the pattern of one of the foundational examples.

View File

@@ -100,17 +100,18 @@ class EvalRunner:
start_time = time.time()
try:
await asyncio.wait(
[
asyncio.create_task(run_example_pipeline(script_path)),
asyncio.create_task(run_eval_pipeline(self, example_file, prompt, eval)),
],
timeout=90,
)
except asyncio.CancelledError:
pass
tasks = [
asyncio.create_task(run_example_pipeline(script_path)),
asyncio.create_task(run_eval_pipeline(self, example_file, prompt, eval)),
]
_, pending = await asyncio.wait(tasks, timeout=90)
if pending:
logger.error(f"ERROR: Eval timeout expired, cancelling pending tasks...")
for task in pending:
task.cancel()
await asyncio.gather(*pending, return_exceptions=True)
except Exception as e:
print(f"ERROR: Unable to run {example_file}: {e}")
logger.error(f"ERROR: Unable to run {example_file}: {e}")
try:
result = await asyncio.wait_for(self._queue.get(), timeout=1.0)
@@ -134,6 +135,7 @@ class EvalRunner:
async def save_audio(self, name: str, audio: bytes, sample_rate: int, num_channels: int):
if len(audio) > 0:
filename = self._recording_file_name(name)
logger.debug(f"Saving {name} audio to {filename}")
with io.BytesIO() as buffer:
with wave.open(buffer, "wb") as wf:
wf.setsampwidth(2)
@@ -142,7 +144,6 @@ class EvalRunner:
wf.writeframes(audio)
async with aiofiles.open(filename, "wb") as file:
await file.write(buffer.getvalue())
logger.debug(f"Saving {name} audio to {filename}")
else:
logger.warning(f"There's no audio to save for {name}")
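The timeout handling in this hunk relies on the fact that `asyncio.wait()` neither raises nor cancels anything when its timeout expires; it simply returns the unfinished tasks in the second set. The following is a minimal standalone sketch of that pattern, not the eval runner itself: `run_with_timeout`, `fast` and `slow` are hypothetical names used only for illustration.

```python
import asyncio


async def run_with_timeout(coros, timeout):
    """Run coroutines concurrently and cancel whatever is still pending after `timeout`."""
    tasks = [asyncio.create_task(c) for c in coros]
    # asyncio.wait() does not raise on timeout; it returns (done, pending).
    done, pending = await asyncio.wait(tasks, timeout=timeout)
    for task in pending:
        task.cancel()
    # Await the cancelled tasks so the cancellation is actually processed.
    await asyncio.gather(*pending, return_exceptions=True)
    return done, pending


async def demo():
    async def fast():
        await asyncio.sleep(0.01)
        return "fast"

    async def slow():
        await asyncio.sleep(10)
        return "slow"

    done, pending = await run_with_timeout([fast(), slow()], timeout=0.2)
    return sorted(t.result() for t in done), [t.cancelled() for t in pending]


finished, cancelled = asyncio.run(demo())
```

Gathering the pending tasks with `return_exceptions=True` keeps the `CancelledError` of each cancelled task from propagating into the caller.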

View File

@@ -39,6 +39,7 @@ TESTS_07 = [
# 07 series
("07-interruptible.py", PROMPT_SIMPLE_MATH, None),
("07-interruptible-cartesia-http.py", PROMPT_SIMPLE_MATH, None),
("07a-interruptible-speechmatics.py", PROMPT_SIMPLE_MATH, None),
("07b-interruptible-langchain.py", PROMPT_SIMPLE_MATH, None),
("07c-interruptible-deepgram.py", PROMPT_SIMPLE_MATH, None),
("07d-interruptible-elevenlabs.py", PROMPT_SIMPLE_MATH, None),

View File

@@ -64,6 +64,7 @@ class DTMFAggregator(FrameProcessor):
self._digit_event = asyncio.Event()
self._aggregation_task: Optional[asyncio.Task] = None
self._interruption_task: Optional[asyncio.Task] = None
async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
"""Process incoming frames and handle DTMF aggregation.
@@ -81,6 +82,7 @@ class DTMFAggregator(FrameProcessor):
if self._aggregation:
await self._flush_aggregation()
await self._stop_aggregation_task()
await self._stop_interruption_task()
await self.push_frame(frame, direction)
elif isinstance(frame, InputDTMFFrame):
# Push the DTMF frame downstream first
@@ -100,7 +102,7 @@ class DTMFAggregator(FrameProcessor):
# For first digit, schedule interruption in separate task
if is_first_digit:
asyncio.create_task(self._send_interruption_task())
self._interruption_task = self.create_task(self._send_interruption_task())
# Check for immediate flush conditions
if frame.button == self._termination_digit:
@@ -111,12 +113,13 @@ class DTMFAggregator(FrameProcessor):
async def _send_interruption_task(self):
"""Send interruption frame safely in a separate task."""
try:
# Send the interruption frame
await self.push_frame(BotInterruptionFrame(), FrameDirection.UPSTREAM)
except Exception as e:
# Log error but don't propagate
print(f"Error sending interruption: {e}")
await self.push_frame(BotInterruptionFrame(), FrameDirection.UPSTREAM)
async def _stop_interruption_task(self) -> None:
"""Stops the interruption task."""
if self._interruption_task:
await self.cancel_task(self._interruption_task)
self._interruption_task = None
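Keeping a handle to the interruption task, as this hunk does, is what makes it possible to cancel the task when the processor ends. The sketch below shows the same idea with plain `asyncio` only; `InterruptionSender` and its methods are hypothetical stand-ins, and the real aggregator uses the processor's own `create_task()` and `cancel_task()` helpers instead.

```python
import asyncio


class InterruptionSender:
    """Hypothetical stand-in that tracks one background task so it can be stopped."""

    def __init__(self):
        self._task = None
        self.sent = []

    async def _send(self):
        # Simulates work that takes a moment before the interruption goes out.
        await asyncio.sleep(0.05)
        self.sent.append("interruption")

    def schedule(self):
        # Store the handle instead of using a fire-and-forget create_task().
        self._task = asyncio.create_task(self._send())

    async def stop(self):
        if self._task:
            self._task.cancel()
            await asyncio.gather(self._task, return_exceptions=True)
            self._task = None


async def demo():
    sender = InterruptionSender()
    sender.schedule()
    await sender.stop()  # cancelled before the simulated send completes
    return sender.sent, sender._task


sent, leftover_task = asyncio.run(demo())
```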
def _create_aggregation_task(self) -> None:
"""Creates the aggregation task if it hasn't been created yet."""

View File

@@ -152,11 +152,6 @@ class FrameProcessor(BaseObject):
self.__input_event = None
self.__input_frame_task: Optional[asyncio.Task] = None
# Every processor in Pipecat should only output frames from a single
# task. This avoids problems like audio overlapping. System frames are the
# exception to this rule. This creates this task.
self.__push_frame_task: Optional[asyncio.Task] = None
@property
def id(self) -> int:
"""Get the unique identifier for this processor.
@@ -385,7 +380,6 @@ class FrameProcessor(BaseObject):
"""Clean up processor resources."""
await super().cleanup()
await self.__cancel_input_task()
await self.__cancel_push_task()
if self._metrics is not None:
await self._metrics.cleanup()
@@ -512,10 +506,7 @@ class FrameProcessor(BaseObject):
if not self._check_started(frame):
return
if isinstance(frame, SystemFrame):
await self.__internal_push_frame(frame, direction)
else:
await self.__push_queue.put((frame, direction))
await self.__internal_push_frame(frame, direction)
async def __start(self, frame: StartFrame):
"""Handle the start frame to initialize processor state.
@@ -530,7 +521,6 @@ class FrameProcessor(BaseObject):
self._interruption_strategies = frame.interruption_strategies
self._report_only_initial_ttfb = frame.report_only_initial_ttfb
self.__create_input_task()
self.__create_push_task()
async def __cancel(self, frame: CancelFrame):
"""Handle the cancel frame to stop processor operation.
@@ -540,7 +530,6 @@ class FrameProcessor(BaseObject):
"""
self._cancelling = True
await self.__cancel_input_task()
await self.__cancel_push_task()
async def __pause(self, frame: FrameProcessorPauseFrame | FrameProcessorPauseUrgentFrame):
"""Handle pause frame to pause processor operation.
@@ -567,9 +556,6 @@ class FrameProcessor(BaseObject):
async def _start_interruption(self):
"""Start handling an interruption by canceling current tasks."""
try:
# Cancel the push frame task. This will stop pushing frames downstream.
await self.__cancel_push_task()
# Cancel the input task. This will stop processing queued frames.
await self.__cancel_input_task()
except Exception as e:
@@ -579,9 +565,6 @@ class FrameProcessor(BaseObject):
# Create a new input queue and task.
self.__create_input_task()
# Create a new output queue and task.
self.__create_push_task()
async def _stop_interruption(self):
"""Stop handling an interruption."""
# Nothing to do right now.
@@ -677,23 +660,3 @@ class FrameProcessor(BaseObject):
await self.push_error(ErrorFrame(str(e)))
finally:
self.__input_queue.task_done()
def __create_push_task(self):
"""Create the frame pushing task."""
if not self.__push_frame_task:
self.__push_queue = WatchdogQueue(self.task_manager)
self.__push_frame_task = self.create_task(self.__push_frame_task_handler())
async def __cancel_push_task(self):
"""Cancel the frame pushing task."""
if self.__push_frame_task:
self.__push_queue.cancel()
await self.cancel_task(self.__push_frame_task)
self.__push_frame_task = None
async def __push_frame_task_handler(self):
"""Handle frames from the push queue."""
while True:
(frame, direction) = await self.__push_queue.get()
await self.__internal_push_frame(frame, direction)
self.__push_queue.task_done()
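The commit message argues that the push queue was redundant because the next processor's input queue already guarantees ordering. A minimal sketch of that argument follows, using plain `asyncio.Queue` objects; `Stage` is a hypothetical stand-in for a processor and does not reproduce the real `FrameProcessor` API.

```python
import asyncio


class Stage:
    """Hypothetical processor with a single input queue and no push queue."""

    def __init__(self, next_stage=None):
        self.next_stage = next_stage
        self.queue = asyncio.Queue()
        self.seen = []

    async def queue_frame(self, frame):
        await self.queue.put(frame)

    async def push_frame(self, frame):
        # Hand the frame straight to the next stage's input queue.
        if self.next_stage:
            await self.next_stage.queue_frame(frame)

    async def run(self):
        while True:
            frame = await self.queue.get()
            if frame is None:  # sentinel: forward it and stop
                await self.push_frame(None)
                return
            self.seen.append(frame)
            await self.push_frame(frame)


async def demo():
    last = Stage()
    first = Stage(next_stage=last)
    workers = [asyncio.create_task(s.run()) for s in (first, last)]
    for frame in ["a", "b", "c", None]:
        await first.queue_frame(frame)
    await asyncio.gather(*workers)
    return last.seen


order = asyncio.run(demo())
```

Each stage consumes its queue from a single task, so frames reach the last stage in the order they were queued on the first one.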

View File

@@ -1005,6 +1005,10 @@ class RTVIProcessor(FrameProcessor):
):
"""Handle the start of a function call from the LLM.
.. deprecated:: 0.0.66
This method is deprecated and will be removed in a future version.
Use `RTVIProcessor.handle_function_call()` instead.
Args:
function_name: Name of the function being called.
llm: The LLM processor making the call.

View File

@@ -15,6 +15,8 @@ from pipecat.frames.frames import (
CancelFrame,
EndFrame,
Frame,
FunctionCallInProgressFrame,
FunctionCallResultFrame,
StartFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
@@ -168,6 +170,13 @@ class UserIdleProcessor(FrameProcessor):
self._idle_event.set()
elif isinstance(frame, BotSpeakingFrame):
self._idle_event.set()
elif isinstance(frame, FunctionCallInProgressFrame):
# Function calls can take longer than the timeout, so we want to prevent idle callbacks
self._interrupted = True
self._idle_event.set()
elif isinstance(frame, FunctionCallResultFrame):
self._interrupted = False
self._idle_event.set()
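The effect of the two new branches can be sketched with a small idle watcher: while a function call is in progress, timeouts still elapse but no idle callback fires. This is an illustration under assumptions, not the processor's actual monitoring loop; `IdleWatcher`, `function_call_started` and `function_call_finished` are hypothetical names.

```python
import asyncio


class IdleWatcher:
    """Hypothetical idle monitor that can be suppressed during long operations."""

    def __init__(self, timeout):
        self._timeout = timeout
        self._event = asyncio.Event()
        self._suppressed = False
        self.idle_count = 0

    def function_call_started(self):
        self._suppressed = True
        self._event.set()  # restart the idle timer

    def function_call_finished(self):
        self._suppressed = False
        self._event.set()  # restart the idle timer

    async def run(self):
        while True:
            try:
                await asyncio.wait_for(self._event.wait(), timeout=self._timeout)
            except asyncio.TimeoutError:
                if not self._suppressed:
                    self.idle_count += 1  # stands in for the idle callback
            finally:
                self._event.clear()


async def demo():
    watcher = IdleWatcher(timeout=0.05)
    task = asyncio.create_task(watcher.run())
    watcher.function_call_started()
    await asyncio.sleep(0.2)  # several timeouts elapse, all suppressed
    during_call = watcher.idle_count
    watcher.function_call_finished()
    await asyncio.sleep(0.2)  # idle detection is active again
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return during_call, watcher.idle_count


during_call, after_call = asyncio.run(demo())
```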
async def cleanup(self) -> None:
"""Cleans up resources when processor is shutting down."""

View File

@@ -185,8 +185,26 @@ class TwilioFrameSerializer(FrameSerializer):
async with session.post(endpoint, auth=auth, data=params) as response:
if response.status == 200:
logger.info(f"Successfully terminated Twilio call {call_sid}")
elif response.status == 404:
# Handle the case where the call has already ended
# Error code 20404: "The requested resource was not found"
# Source: https://www.twilio.com/docs/errors/20404
try:
error_data = await response.json()
if error_data.get("code") == 20404:
logger.debug(f"Twilio call {call_sid} was already terminated")
return
except:
pass # Fall through to log the raw error
# Log other 404 errors
error_text = await response.text()
logger.error(
f"Failed to terminate Twilio call {call_sid}: "
f"Status {response.status}, Response: {error_text}"
)
else:
# Get the error details for better debugging
# Log other errors
error_text = await response.text()
logger.error(
f"Failed to terminate Twilio call {call_sid}: "
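The branching added in this hunk can be summarized as a pure function, which makes the three outcomes easy to check without a network call. This is a sketch only: `classify_hangup_response` and its return labels are hypothetical and not part of the serializer; the status codes and the Twilio error code 20404 come from the diff above.

```python
import json


def classify_hangup_response(status, body):
    """Map an HTTP status and raw response body to a hang-up outcome label."""
    if status == 200:
        return "terminated"
    if status == 404:
        try:
            # 20404 means the call resource no longer exists, i.e. the
            # user already hung up.
            if json.loads(body).get("code") == 20404:
                return "already_ended"
        except (ValueError, TypeError, AttributeError):
            pass  # body is not a JSON object; treat it as a generic error
        return "error"
    return "error"


outcomes = [
    classify_hangup_response(200, ""),
    classify_hangup_response(404, '{"code": 20404}'),
    classify_hangup_response(404, "not json"),
    classify_hangup_response(500, "{}"),
]
```

Catching specific exception types rather than using a bare `except` keeps unrelated failures, such as task cancellation, from being swallowed.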

View File

@@ -58,12 +58,14 @@ class FishAudioTTSService(InterruptibleTTSService):
Parameters:
language: Language for synthesis. Defaults to English.
latency: Latency mode ("normal" or "balanced"). Defaults to "normal".
normalize: Whether to normalize audio output. Defaults to True.
prosody_speed: Speech speed multiplier (0.5-2.0). Defaults to 1.0.
prosody_volume: Volume adjustment in dB. Defaults to 0.
"""
language: Optional[Language] = Language.EN
latency: Optional[str] = "normal" # "normal" or "balanced"
normalize: Optional[bool] = True
prosody_speed: Optional[float] = 1.0 # Speech speed (0.5-2.0)
prosody_volume: Optional[int] = 0 # Volume adjustment in dB
@@ -71,7 +73,9 @@ class FishAudioTTSService(InterruptibleTTSService):
self,
*,
api_key: str,
model: str, # This is the reference_id
reference_id: Optional[str] = None, # This is the voice ID
model: Optional[str] = None, # Deprecated
model_id: str = "speech-1.5",
output_format: FishAudioOutputFormat = "pcm",
sample_rate: Optional[int] = None,
params: Optional[InputParams] = None,
@@ -81,7 +85,14 @@ class FishAudioTTSService(InterruptibleTTSService):
Args:
api_key: Fish Audio API key for authentication.
model: Reference ID of the voice model to use for synthesis.
reference_id: Reference ID of the voice model to use for synthesis.
model: Deprecated. Reference ID of the voice model to use for synthesis.
.. deprecated:: 0.0.74
The `model` parameter is deprecated and will be removed in version 0.1.0.
Use `reference_id` instead to specify the voice model.
model_id: Specify which Fish Audio TTS model to use (e.g. "speech-1.5")
output_format: Audio output format. Defaults to "pcm".
sample_rate: Audio sample rate. If None, uses default.
params: Additional input parameters for voice customization.
@@ -96,6 +107,26 @@ class FishAudioTTSService(InterruptibleTTSService):
params = params or FishAudioTTSService.InputParams()
# Validation for model and reference_id parameters
if model and reference_id:
raise ValueError(
"Cannot specify both 'model' and 'reference_id'. Use 'reference_id' only."
)
if model is None and reference_id is None:
raise ValueError("Must specify 'reference_id' (or deprecated 'model') parameter.")
if model:
import warnings
warnings.warn(
"Parameter 'model' is deprecated and will be removed in a future version. "
"Use 'reference_id' instead.",
DeprecationWarning,
stacklevel=2,
)
reference_id = model
self._api_key = api_key
self._base_url = "wss://api.fish.audio/v1/tts/live"
self._websocket = None
@@ -107,14 +138,15 @@ class FishAudioTTSService(InterruptibleTTSService):
"sample_rate": 0,
"latency": params.latency,
"format": output_format,
"normalize": params.normalize,
"prosody": {
"speed": params.prosody_speed,
"volume": params.prosody_volume,
},
"reference_id": model,
"reference_id": reference_id,
}
self.set_model_name(model)
self.set_model_name(model_id)
def can_generate_metrics(self) -> bool:
"""Check if this service can generate processing metrics.
@@ -125,14 +157,15 @@ class FishAudioTTSService(InterruptibleTTSService):
return True
async def set_model(self, model: str):
"""Set the TTS model (reference ID).
"""Set the TTS model and reconnect.
Args:
model: The reference ID of the voice model to use.
model: The model name to use for synthesis.
"""
self._settings["reference_id"] = model
await super().set_model(model)
logger.info(f"Switching TTS model to: [{model}]")
await self._disconnect()
await self._connect()
async def start(self, frame: StartFrame):
"""Start the Fish Audio TTS service.
@@ -182,6 +215,7 @@ class FishAudioTTSService(InterruptibleTTSService):
logger.debug("Connecting to Fish Audio")
headers = {"Authorization": f"Bearer {self._api_key}"}
headers["model"] = self.model_name
self._websocket = await websockets.connect(self._base_url, extra_headers=headers)
# Send initial start message with ormsgpack
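The `model` to `reference_id` migration above follows a common deprecation pattern: reject ambiguous input, accept the old name with a warning, and normalize to the new name. A minimal standalone sketch of that validation is shown below; `resolve_reference_id` is a hypothetical helper that mirrors the checks in the diff and is not part of the service.

```python
import warnings


def resolve_reference_id(reference_id=None, model=None):
    """Return the voice reference ID, accepting the deprecated `model` alias."""
    if model and reference_id:
        raise ValueError(
            "Cannot specify both 'model' and 'reference_id'. Use 'reference_id' only."
        )
    if model is None and reference_id is None:
        raise ValueError("Must specify 'reference_id' (or deprecated 'model') parameter.")
    if model:
        # stacklevel=2 points the warning at the caller, not at this helper.
        warnings.warn(
            "Parameter 'model' is deprecated. Use 'reference_id' instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return model
    return reference_id


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    from_new = resolve_reference_id(reference_id="voice-123")
    from_old = resolve_reference_id(model="voice-123")
```

Here `"voice-123"` is a placeholder value, not a real Fish Audio voice ID.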

View File

@@ -572,9 +572,6 @@ class GeminiMultimodalLiveLLMService(LLMService):
# Initialize the File API client
self.file_api = GeminiFileAPI(api_key=api_key, base_url=file_api_base_url)
# Initialize the File API client
self.file_api = GeminiFileAPI(api_key=api_key, base_url=file_api_base_url)
def can_generate_metrics(self) -> bool:
"""Check if the service can generate usage metrics.

View File

@@ -68,6 +68,7 @@ try:
FunctionCall,
FunctionResponse,
GenerateContentConfig,
HttpOptions,
Part,
)
except ModuleNotFoundError as e:
@@ -678,6 +679,7 @@ class GoogleLLMService(LLMService):
system_instruction: Optional[str] = None,
tools: Optional[List[Dict[str, Any]]] = None,
tool_config: Optional[Dict[str, Any]] = None,
http_options: Optional[HttpOptions] = None,
**kwargs,
):
"""Initialize the Google LLM service.
@@ -689,6 +691,7 @@ class GoogleLLMService(LLMService):
system_instruction: System instruction/prompt for the model.
tools: List of available tools/functions.
tool_config: Configuration for tool usage.
http_options: HTTP options for the client.
**kwargs: Additional arguments passed to parent class.
"""
super().__init__(**kwargs)
@@ -698,7 +701,8 @@ class GoogleLLMService(LLMService):
self.set_model_name(model)
self._api_key = api_key
self._system_instruction = system_instruction
self._create_client(api_key)
self._http_options = http_options
self._create_client(api_key, http_options)
self._settings = {
"max_tokens": params.max_tokens,
"temperature": params.temperature,
@@ -717,6 +721,9 @@ class GoogleLLMService(LLMService):
"""
return True
def _create_client(self, api_key: str, http_options: Optional[HttpOptions] = None):
self._client = genai.Client(api_key=api_key, http_options=http_options)
def needs_mcp_alternate_schema(self) -> bool:
"""Check if this LLM service requires alternate MCP schema.
@@ -728,9 +735,6 @@ class GoogleLLMService(LLMService):
"""
return True
def _create_client(self, api_key: str):
self._client = genai.Client(api_key=api_key)
def _maybe_unset_thinking_budget(self, generation_params: Dict[str, Any]):
try:
# There's no way to introspect on model capabilities, so

View File

@@ -0,0 +1,5 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

View File

@@ -0,0 +1,813 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Speechmatics STT service integration."""
import asyncio
import datetime
import re
from dataclasses import dataclass, field
from typing import Any, AsyncGenerator, Optional
from urllib.parse import urlencode
from loguru import logger
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
Frame,
InterimTranscriptionFrame,
StartFrame,
TranscriptionFrame,
)
from pipecat.services.stt_service import STTService
from pipecat.transcriptions.language import Language
from pipecat.utils.tracing.service_decorators import traced_stt
try:
from speechmatics.rt import (
AsyncClient,
AudioEncoding,
AudioFormat,
ConversationConfig,
OperatingPoint,
ServerMessageType,
SpeakerDiarizationConfig,
TranscriptionConfig,
__version__,
)
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
"In order to use Speechmatics, you need to `pip install pipecat-ai[speechmatics]`."
)
raise Exception(f"Missing module: {e}")
class AudioBuffer:
"""Audio buffer for STT clients.
The Python SDK expects audio in a pre-defined number of frames. This
buffer will accumulate the data from the pipeline and provide it to the
STT client in the correct lengths, waiting for the number of frames to
be available.
"""
def __init__(self, maxsize: int = 0):
"""Initialize the audio buffer.
Args:
maxsize: Maximum size of the buffer.
"""
self._queue = asyncio.Queue(maxsize=maxsize)
self._current_chunk = b""
self._position = 0
self._closed = False
def write_audio(self, data: bytes) -> None:
"""Write audio data to the buffer (thread-safe).
Args:
data: Audio data to write.
"""
if data:
try:
self._queue.put_nowait(data)
except asyncio.QueueFull:
pass
async def read(self, size: int) -> bytes:
"""Read exactly `size` bytes from the buffer (thread-safe).
This process will block until the required number of bytes are available
in the buffer. Audio is received from the pipeline in varying sizes, so
this buffer will accumulate the data and provide it to the STT client in
the correct lengths, waiting for the number of frames to be available.
Calling stop() will close the buffer and release the blocking read
process.
Args:
size: Number of bytes to read.
Returns:
bytes: Audio data read from the buffer.
"""
result = b""
bytes_needed = size
while bytes_needed > 0 and not self._closed:
# Use data from current chunk if available
if self._position < len(self._current_chunk):
available = len(self._current_chunk) - self._position
take = min(bytes_needed, available)
result += self._current_chunk[self._position : self._position + take]
self._position += take
bytes_needed -= take
continue
# Get next chunk
try:
chunk = await asyncio.wait_for(self._queue.get(), timeout=0.1)
if chunk is None:
continue
self._current_chunk = chunk
self._position = 0
except asyncio.TimeoutError:
await asyncio.sleep(0)
continue
return result
def stop(self) -> None:
"""Close the audio buffer."""
self._closed = True
@dataclass
class SpeechFragment:
"""Fragment of an utterance.
Parameters:
start_time: Start time of the fragment in seconds (from session start).
end_time: End time of the fragment in seconds (from session start).
language: Language of the fragment. Defaults to `Language.EN`.
is_eos: Whether the fragment is the end of a sentence. Defaults to `False`.
is_final: Whether the fragment is the final fragment. Defaults to `False`.
attaches_to: Whether the fragment attaches to the previous or next fragment (punctuation). Defaults to empty string.
content: Content of the fragment. Defaults to empty string.
speaker: Speaker of the fragment (if diarization is enabled). Defaults to `None`.
confidence: Confidence of the fragment (0.0 to 1.0). Defaults to `1.0`.
result: Raw result of the fragment from the STT.
"""
start_time: float
end_time: float
language: Language = Language.EN
is_eos: bool = False
is_final: bool = False
attaches_to: str = ""
content: str = ""
speaker: Optional[str] = None
confidence: float = 1.0
result: Optional[Any] = None
@dataclass
class SpeakerFragments:
"""SpeechFragment items grouped by speaker_id.
Parameters:
speaker_id: The ID of the speaker.
timestamp: The timestamp of the frame.
language: The language of the frame.
fragments: The list of SpeechFragment items.
"""
speaker_id: Optional[str] = None
timestamp: Optional[str] = None
language: Optional[Language] = None
fragments: list[SpeechFragment] = field(default_factory=list)
def __str__(self):
"""Return a string representation of the object."""
return f"SpeakerFragments(speaker_id: {self.speaker_id}, timestamp: {self.timestamp}, language: {self.language}, text: {self._format_text()})"
def _format_text(self, format: Optional[str] = None) -> str:
"""Wrap text with speaker ID in an optional f-string format.
Args:
format: Format to wrap the text with.
Returns:
str: The wrapped text.
"""
# Cumulative contents
content = ""
# Assemble the text
for frag in self.fragments:
if content == "" or frag.attaches_to == "previous":
content += frag.content
else:
content += " " + frag.content
# Format the text, if format is provided
if format is None or self.speaker_id is None:
return content
return format.format(**{"speaker_id": self.speaker_id, "text": content})
def _as_frame_attributes(self, format: Optional[str] = None) -> dict[str, Any]:
"""Return a dictionary of attributes for a TranscriptionFrame.
Args:
format: Format to wrap the text with.
Returns:
dict[str, Any]: The dictionary of attributes.
"""
return {
"text": self._format_text(format),
"user_id": self.speaker_id,
"timestamp": self.timestamp,
"language": self.language,
"result": [frag.result for frag in self.fragments],
}
class SpeechmaticsSTTService(STTService):
"""Speechmatics STT service implementation.
This service provides real-time speech-to-text transcription using the Speechmatics API.
It supports partial and final transcriptions, multiple languages, various audio formats,
and speaker diarization.
"""
def __init__(
self,
*,
api_key: str,
language: Optional[Language] = None,
language_code: Optional[str] = None,
base_url: str = "wss://eu2.rt.speechmatics.com/v2",
domain: Optional[str] = None,
output_locale: Optional[Language] = None,
output_locale_code: Optional[str] = None,
enable_partials: bool = True,
max_delay: float = 1.5,
sample_rate: Optional[int] = 16000,
chunk_size: int = 256,
audio_encoding: AudioEncoding = AudioEncoding.PCM_S16LE,
end_of_utterance_silence_trigger: float = 0.5,
operating_point: OperatingPoint = OperatingPoint.ENHANCED,
enable_speaker_diarization: bool = False,
text_format: str = "<{speaker_id}>{text}</{speaker_id}>",
max_speakers: Optional[int] = None,
transcription_config: Optional[TranscriptionConfig] = None,
**kwargs,
):
"""Initialize the Speechmatics STT service.
Args:
api_key: Speechmatics API key for authentication.
language: Language code for transcription. Defaults to `None`.
language_code: Language code string for transcription. Defaults to `None`.
base_url: Base URL for Speechmatics API. Defaults to `wss://eu2.rt.speechmatics.com/v2`.
domain: Domain for Speechmatics API. Defaults to `None`.
output_locale: Output locale for transcription, e.g. `Language.EN_GB`. Defaults to `None`.
output_locale_code: Output locale code for transcription. Defaults to `None`.
enable_partials: Enable partial transcription results. Defaults to `True`.
max_delay: Maximum delay for transcription in seconds. Defaults to `1.5`.
sample_rate: Audio sample rate in Hz. Defaults to `16000`.
chunk_size: Audio chunk size for streaming. Defaults to `256`.
audio_encoding: Audio encoding format. Defaults to `pcm_s16le`.
end_of_utterance_silence_trigger: Silence duration in seconds to trigger end of utterance detection. Defaults to `0.5`.
operating_point: Operating point for transcription accuracy vs. latency tradeoff. Defaults to `enhanced`.
enable_speaker_diarization: Enable speaker diarization to identify different speakers. Defaults to `False`.
text_format: Wrapper for speaker ID. Defaults to `<{speaker_id}>{text}</{speaker_id}>`.
max_speakers: Maximum number of speakers to detect. Defaults to `None` (auto-detect).
transcription_config: Custom transcription configuration (other set parameters are merged). Defaults to `None`.
**kwargs: Additional arguments passed to STTService.
"""
super().__init__(sample_rate=sample_rate, **kwargs)
# Client configuration
self._api_key: str = api_key
self._language: Optional[Language] = language
self._language_code: Optional[str] = language_code
self._base_url: str = base_url
self._domain: Optional[str] = domain
self._output_locale: Optional[Language] = output_locale
self._output_locale_code: Optional[str] = output_locale_code
self._enable_partials: bool = enable_partials
self._max_delay: float = max_delay
self._sample_rate: int = sample_rate
self._chunk_size: int = chunk_size
self._audio_encoding: AudioEncoding = audio_encoding
self._end_of_utterance_silence_trigger: Optional[float] = end_of_utterance_silence_trigger
self._operating_point: OperatingPoint = operating_point
self._enable_speaker_diarization: bool = enable_speaker_diarization
self._text_format: str = text_format
self._max_speakers: Optional[int] = max_speakers
# Check we have required attributes
if not self._api_key:
raise ValueError("Missing Speechmatics API key")
if not self._base_url:
raise ValueError("Missing Speechmatics base URL")
# Validate the language code
if self._language and self._language_code:
raise ValueError("Language and language code cannot both be specified")
elif self._language:
self._language_code = _language_to_speechmatics_language(self._language)
# Validate the output locale code
if self._output_locale and self._output_locale_code:
raise ValueError("Output locale and output locale code cannot both be specified")
elif self._output_locale:
self._output_locale_code = _locale_to_speechmatics_locale(
self._language_code, self._output_locale
)
# Complete configuration objects
self._transcription_config: Optional[TranscriptionConfig] = None
self._process_config(transcription_config)
# STT client
self._client: Optional[AsyncClient] = None
self._client_task: Optional[asyncio.Task] = None
self._audio_buffer: AudioBuffer = AudioBuffer(maxsize=10)
self._start_time: Optional[datetime.datetime] = None
# Current utterance speech data
self._speech_fragments: list[SpeechFragment] = []
async def start(self, frame: StartFrame):
"""Called when a new session starts."""
await super().start(frame)
await self._connect()
async def stop(self, frame: EndFrame):
"""Called when the session ends."""
await super().stop(frame)
await self._disconnect()
async def cancel(self, frame: CancelFrame):
"""Called when the session is cancelled."""
await super().cancel(frame)
await self._disconnect()
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
"""Adds audio to the audio buffer and yields None."""
self._audio_buffer.write_audio(audio)
yield None
async def _run_client(self) -> None:
"""Runs the Speechmatics client as a background task."""
await self._client.transcribe(
self._audio_buffer,
transcription_config=self._transcription_config,
audio_format=AudioFormat(
encoding=self._audio_encoding,
sample_rate=self.sample_rate,
chunk_size=self._chunk_size,
),
)
async def _connect(self) -> None:
"""Connect to the STT service."""
# Create new STT RT client
self._client = AsyncClient(
api_key=self._api_key,
url=_get_endpoint_url(self._base_url),
)
# Log the event
logger.debug("Connected to Speechmatics STT service")
# Recognition started event
@self._client.on(ServerMessageType.RECOGNITION_STARTED)
def _evt_on_recognition_started(message: dict[str, Any]):
logger.debug(f"Recognition started (session: {message.get('id')})")
self._start_time = datetime.datetime.now(datetime.timezone.utc)
# Partial transcript event
@self._client.on(ServerMessageType.ADD_PARTIAL_TRANSCRIPT)
def _evt_on_partial_transcript(message: dict[str, Any]):
self._handle_transcript(message, is_final=False)
# Final transcript event
@self._client.on(ServerMessageType.ADD_TRANSCRIPT)
def _evt_on_final_transcript(message: dict[str, Any]):
self._handle_transcript(message, is_final=True)
# End of Utterance
@self._client.on(ServerMessageType.END_OF_UTTERANCE)
def _evt_on_end_of_utterance(message: dict[str, Any]):
logger.debug("End of utterance received from STT")
asyncio.run_coroutine_threadsafe(
self._send_frames(finalized=True), self.get_event_loop()
)
# Start the client as a background task
self._client_task = self.create_task(self._run_client())
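The event handlers above hand work back to the pipeline with `asyncio.run_coroutine_threadsafe`, which is the standard way to schedule a coroutine on a loop from code that is not running on that loop. The following is a minimal, standalone sketch of that pattern. The names `run_demo`, `send_frames`, and `on_message` are hypothetical stand-ins, and whether the Speechmatics SDK actually invokes its callbacks from another thread is an assumption not confirmed by this diff.

```python
import asyncio


def run_demo() -> list[str]:
    """Deliver one message from a worker thread to a coroutine on the loop."""
    received: list[str] = []

    async def send_frames(text: str) -> None:
        # Runs on the event loop thread, playing the role of `_send_frames`.
        received.append(text)

    async def main() -> None:
        loop = asyncio.get_running_loop()

        def on_message(text: str) -> None:
            # Hypothetical SDK callback running outside the event loop thread.
            future = asyncio.run_coroutine_threadsafe(send_frames(text), loop)
            # Block this worker thread (not the loop) until the coroutine is done.
            future.result(timeout=5)

        # Simulate the SDK invoking the callback from another thread.
        await asyncio.to_thread(on_message, "hello")

    asyncio.run(main())
    return received
```

The service code does not wait on the returned future, so exceptions raised inside `_send_frames` would go unnoticed unless the future is inspected.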
async def _disconnect(self) -> None:
"""Disconnect from the STT service."""
# Stop the audio buffer
self._audio_buffer.stop()
# Disconnect the client
try:
if self._client:
await asyncio.wait_for(self._client.close(), timeout=1.0)
except asyncio.TimeoutError:
logger.warning("Timeout while closing Speechmatics client connection")
except Exception as e:
logger.error(f"Error closing Speechmatics client: {e}")
finally:
self._client = None
# Cancel the client task
if self._client_task:
await self.cancel_task(self._client_task)
self._client_task = None
# Log the event
logger.debug("Disconnected from Speechmatics STT service")
def _process_config(self, transcription_config: Optional[TranscriptionConfig] = None) -> None:
"""Create a formatted STT transcription config.
This takes an optional TranscriptionConfig object and populates it with the
values from the STT service. Individual parameters take priority over those
within the config object.
Args:
transcription_config: Optional transcription config to use.
"""
# Transcription config
if not transcription_config:
transcription_config = TranscriptionConfig(
language=self._language_code or "en",
domain=self._domain,
output_locale=self._output_locale_code,
operating_point=self._operating_point,
diarization="speaker" if self._enable_speaker_diarization else None,
enable_partials=self._enable_partials,
max_delay=self._max_delay or 2.0,
)
else:
if self._language_code:
transcription_config.language = self._language_code
if self._domain:
transcription_config.domain = self._domain
if self._output_locale_code:
transcription_config.output_locale = self._output_locale_code
if self._operating_point:
transcription_config.operating_point = self._operating_point
if self._enable_speaker_diarization:
transcription_config.diarization = "speaker"
if self._enable_partials:
transcription_config.enable_partials = self._enable_partials
if self._max_delay:
transcription_config.max_delay = self._max_delay
# Diarization
if self._enable_speaker_diarization and self._max_speakers:
transcription_config.speaker_diarization_config = SpeakerDiarizationConfig(
max_speakers=self._max_speakers,
)
# End of Utterance
if self._end_of_utterance_silence_trigger:
transcription_config.conversation_config = ConversationConfig(
end_of_utterance_silence_trigger=self._end_of_utterance_silence_trigger,
)
# Set config
self._transcription_config = transcription_config
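When a `TranscriptionConfig` is supplied, `_process_config` copies each individual parameter over it only if that parameter is truthy. The sketch below reproduces that rule with a hypothetical `DemoConfig` dataclass (not the Speechmatics type) to make one consequence visible: a falsy argument such as `enable_partials=False` does not override the value already in the config.

```python
from dataclasses import dataclass


@dataclass
class DemoConfig:
    """Hypothetical stand-in for a transcription config object."""

    language: str = "en"
    enable_partials: bool = False
    max_delay: float = 2.0


def apply_overrides(config: DemoConfig, **overrides) -> DemoConfig:
    """Copy only truthy overrides onto the config, as the source does."""
    for name, value in overrides.items():
        if value:  # falsy values (None, False, 0) leave the config untouched
            setattr(config, name, value)
    return config
```

For example, `apply_overrides(DemoConfig(enable_partials=True), enable_partials=False)` keeps partials enabled, so under this rule partials have to be switched off in the supplied config itself.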
def _handle_transcript(self, message: dict[str, Any], is_final: bool) -> None:
"""Handle the partial and final transcript events.
Args:
message: The new Partial or Final from the STT engine.
is_final: Whether the data is final or partial.
"""
# Add the speech fragments
has_changed = self._add_speech_fragments(
message=message,
is_final=is_final,
)
# Skip if unchanged
if not has_changed:
return
# Send frames
asyncio.run_coroutine_threadsafe(self._send_frames(), self.get_event_loop())
@traced_stt
async def _handle_transcription(
self, transcript: str, is_final: bool, language: Optional[Language] = None
):
"""Handle a transcription result with tracing."""
pass
async def _send_frames(self, finalized: bool = False) -> None:
"""Send frames to the pipeline.
Send speech frames to the pipeline. If VAD is enabled, then this will
also send an interruption and user started speaking frames. When the
final transcript is received, then this will send a user stopped speaking
and stop interruption frames.
Args:
finalized: Whether the data is final or partial.
"""
# Get speech frames (InterimTranscriptionFrame)
speech_frames = self._get_frames_from_fragments()
# Skip if no frames
if not speech_frames:
return
# If final, then re-parse into TranscriptionFrame
if finalized:
# Reset the speech fragments
self._speech_fragments.clear()
# Transform frames
frames = [
TranscriptionFrame(**frame._as_frame_attributes(self._text_format))
for frame in speech_frames
]
# Log transcript(s)
logger.debug(f"Finalized transcript: {[f.text for f in frames]}")
# Return as interim results
else:
frames = [
InterimTranscriptionFrame(**frame._as_frame_attributes()) for frame in speech_frames
]
# Send the frames back to pipecat
for frame in frames:
await self._handle_transcription(
transcript=frame.text,
is_final=finalized,
language=frame.language,
)
await self.push_frame(frame)
def _add_speech_fragments(self, message: dict[str, Any], is_final: bool = False) -> bool:
"""Takes a new Partial or Final from the STT engine.
Accumulates it into the `_speech_fragments` list. As new final data is added, all
partials are removed from the list.
Note: If a known speaker is `__[A-Z0-9_]{2,}__`, then the words are skipped,
as this is used to protect against self-interruption by the assistant or to
block out specific known voices.
Args:
message: The new Partial or Final from the STT engine.
is_final: Whether the data is final or partial.
Returns:
bool: True if the speech data was updated, False otherwise.
"""
# Parsed new speech data from the STT engine
fragments: list[SpeechFragment] = []
# Current length of the speech data
current_length = len(self._speech_fragments)
# Iterate over the results in the payload
for result in message.get("results", []):
alt = result.get("alternatives", [{}])[0]
if alt.get("content", None):
# Create the new fragment
fragment = SpeechFragment(
start_time=result.get("start_time", 0),
end_time=result.get("end_time", 0),
language=alt.get("language", Language.EN),
is_eos=alt.get("is_eos", False),
is_final=is_final,
attaches_to=result.get("attaches_to", ""),
content=alt.get("content", ""),
speaker=alt.get("speaker", None),
confidence=alt.get("confidence", 1.0),
result=result,
)
# Drop `__XX__` speakers
if fragment.speaker and re.match(r"^__[A-Z0-9_]{2,}__$", fragment.speaker):
continue
# Add the fragment
fragments.append(fragment)
# Remove existing partials, as new partials and finals are provided
self._speech_fragments = [frag for frag in self._speech_fragments if frag.is_final]
# Return if no new fragments and length of the existing data is unchanged
if not fragments and len(self._speech_fragments) == current_length:
return False
# Add the fragments to the speech data
self._speech_fragments.extend(fragments)
# Data was updated
return True
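The accumulation rule in `_add_speech_fragments` can be sketched in isolation: finals are kept, earlier partials are discarded on every message, fragments from `__XX__` speakers are dropped, and the caller learns whether anything changed. `DemoFragment` and `merge_fragments` are hypothetical simplifications written as a pure function; the real method mutates `self._speech_fragments` and carries more fields.

```python
import re
from dataclasses import dataclass
from typing import Optional

# Same pattern as the source: speakers such as "__ASSISTANT__" are ignored.
IGNORED_SPEAKER = re.compile(r"^__[A-Z0-9_]{2,}__$")


@dataclass
class DemoFragment:
    """Hypothetical, reduced version of SpeechFragment."""

    content: str
    is_final: bool
    speaker: Optional[str] = None


def merge_fragments(
    existing: list[DemoFragment], incoming: list[DemoFragment]
) -> tuple[list[DemoFragment], bool]:
    """Return the updated fragment list and whether it changed."""
    before = len(existing)
    # Drop fragments from ignored speakers.
    accepted = [
        f for f in incoming if not (f.speaker and IGNORED_SPEAKER.match(f.speaker))
    ]
    # Old partials are always superseded by the newest message.
    kept = [f for f in existing if f.is_final]
    if not accepted and len(kept) == before:
        return kept, False
    return kept + accepted, True
```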
def _get_frames_from_fragments(self) -> list[SpeakerFragments]:
"""Get speech data objects for the current fragment list.
Speech fragments are grouped by contiguous speaker and then
returned as internal SpeakerFragments objects with the `speaker_id` field
set to the current speaker (string). An utterance may contain speech from
more than one speaker (e.g. S1, S2, S1, S3, ...), so they are kept
in strict order for the context of the conversation.
Returns:
list[SpeakerFragments]: The list of objects.
"""
# Speaker groups
current_speaker: str | None = None
speaker_groups: list[list[SpeechFragment]] = [[]]
# Group by speakers
for frag in self._speech_fragments:
if frag.speaker != current_speaker:
current_speaker = frag.speaker
if speaker_groups[-1]:
speaker_groups.append([])
speaker_groups[-1].append(frag)
# Create SpeakerFragments objects
speaker_fragments: list[SpeakerFragments] = []
for group in speaker_groups:
sd = self._get_speaker_fragments_from_fragment_group(group)
if sd:
speaker_fragments.append(sd)
# Return the grouped SpeakerFragments objects
return speaker_fragments
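Grouping by contiguous speaker means a speaker who talks twice with someone else in between produces two separate groups, in order. The sketch below shows that behaviour using `itertools.groupby` rather than the explicit loop in the source. Fragments are reduced to hypothetical `(speaker, text)` tuples.

```python
from itertools import groupby


def group_by_contiguous_speaker(
    fragments: list[tuple[str, str]],
) -> list[tuple[str, list[str]]]:
    """Group (speaker, text) pairs into runs of the same speaker, in order."""
    return [
        (speaker, [text for _, text in run])
        # groupby only merges adjacent items, which is exactly "contiguous".
        for speaker, run in groupby(fragments, key=lambda fragment: fragment[0])
    ]
```

Given the sequence S1, S1, S2, S1, this yields three groups (S1, S2, S1) rather than merging both S1 runs, which preserves conversational order.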
def _get_speaker_fragments_from_fragment_group(
self,
group: list[SpeechFragment],
) -> SpeakerFragments | None:
"""Take a group of fragments and piece them together into a SpeakerFragments object.
Each fragment for a given speaker is assembled into a string,
taking into consideration whether words are attached to the
previous or next word (notably punctuation). This ensures that
the text does not have extra spaces. This will also check for
any straggling punctuation from earlier utterances that should
be removed.
Args:
group: List of SpeechFragment objects.
Returns:
SpeakerFragments | None: The object for the group, or None if no fragments remain.
"""
# Check for starting fragments that are attached to previous
if group and group[0].attaches_to == "previous":
group = group[1:]
# Check for trailing fragments that are attached to next
if group and group[-1].attaches_to == "next":
group = group[:-1]
# Check there are results
if not group:
return None
# Get the timing extremes
start_time = min(frag.start_time for frag in group)
# Timestamp
ts = (self._start_time + datetime.timedelta(seconds=start_time)).isoformat(
timespec="milliseconds"
)
# Return the SpeakerFragments object
return SpeakerFragments(
speaker_id=group[0].speaker,
timestamp=ts,
language=group[0].language,
fragments=group,
)
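Two steps in this method are easy to check in isolation: trimming straggling fragments at the group edges, and turning a relative `start_time` into an absolute ISO 8601 timestamp. The following sketch uses plain dictionaries and hypothetical helper names (`trim_stragglers`, `utterance_timestamp`) instead of the real fragment classes.

```python
import datetime


def trim_stragglers(group: list[dict]) -> list[dict]:
    """Drop a leading fragment attached to the previous word and a
    trailing fragment attached to the next word."""
    if group and group[0]["attaches_to"] == "previous":
        group = group[1:]
    if group and group[-1]["attaches_to"] == "next":
        group = group[:-1]
    return group


def utterance_timestamp(session_start: datetime.datetime, offset_seconds: float) -> str:
    """Convert an offset from the session start into an ISO 8601 string."""
    moment = session_start + datetime.timedelta(seconds=offset_seconds)
    return moment.isoformat(timespec="milliseconds")
```

With a session start of 2025-07-04 12:00:00 UTC and an offset of 1.5 seconds, `utterance_timestamp` returns `2025-07-04T12:00:01.500+00:00`.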
def _get_endpoint_url(url: str) -> str:
"""Format the endpoint URL with the SDK and app versions.
Args:
url: The base URL for the endpoint.
Returns:
str: The formatted endpoint URL.
"""
query_params = dict()
query_params["sm-app"] = f"pipecat/{__version__}"
query = urlencode(query_params)
return f"{url}?{query}"
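To see what the formatted endpoint looks like, here is a standalone sketch of the same construction. The version string is passed in as a parameter (an assumption for the example; the source reads `__version__` from the package). Note that `urlencode` percent-encodes the `/` in the value.

```python
from urllib.parse import urlencode


def build_endpoint_url(url: str, version: str) -> str:
    """Append the `sm-app` query parameter to the base URL."""
    query = urlencode({"sm-app": f"pipecat/{version}"})
    # Like the source, this assumes `url` has no query string of its own.
    return f"{url}?{query}"


print(build_endpoint_url("wss://eu2.rt.speechmatics.com/v2", "0.0.0"))
# prints wss://eu2.rt.speechmatics.com/v2?sm-app=pipecat%2F0.0.0
```

If a caller passes a `base_url` that already contains `?`, the result would have two query separators, so such URLs would need separate handling.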
def _language_to_speechmatics_language(language: Language) -> str:
"""Convert a Language enum to a Speechmatics language code.
Args:
language: The Language enum to convert.
Returns:
str: The Speechmatics language code, if found.
"""
# List of supported input languages
BASE_LANGUAGES = {
Language.AR: "ar",
Language.BA: "ba",
Language.EU: "eu",
Language.BE: "be",
Language.BG: "bg",
Language.BN: "bn",
Language.YUE: "yue",
Language.CA: "ca",
Language.HR: "hr",
Language.CS: "cs",
Language.DA: "da",
Language.NL: "nl",
Language.EN: "en",
Language.EO: "eo",
Language.ET: "et",
Language.FA: "fa",
Language.FI: "fi",
Language.FR: "fr",
Language.GL: "gl",
Language.DE: "de",
Language.EL: "el",
Language.HE: "he",
Language.HI: "hi",
Language.HU: "hu",
Language.IT: "it",
Language.ID: "id",
Language.GA: "ga",
Language.JA: "ja",
Language.KO: "ko",
Language.LV: "lv",
Language.LT: "lt",
Language.MS: "ms",
Language.MT: "mt",
Language.CMN: "cmn",
Language.MR: "mr",
Language.MN: "mn",
Language.NO: "no",
Language.PL: "pl",
Language.PT: "pt",
Language.RO: "ro",
Language.RU: "ru",
Language.SK: "sk",
Language.SL: "sl",
Language.ES: "es",
Language.SV: "sv",
Language.SW: "sw",
Language.TA: "ta",
Language.TH: "th",
Language.TR: "tr",
Language.UG: "ug",
Language.UK: "uk",
Language.UR: "ur",
Language.VI: "vi",
Language.CY: "cy",
}
# Get the language code
result = BASE_LANGUAGES.get(language)
# Fail if language is not supported
if not result:
raise ValueError(f"Unsupported language: {language}")
# Return the language code
return result
def _locale_to_speechmatics_locale(language_code: str, locale: Language) -> Optional[str]:
"""Convert a Language enum to a Speechmatics output locale code.
Args:
language_code: The language code.
locale: The Language enum to convert.
Returns:
Optional[str]: The Speechmatics output locale code, or None if the locale is not supported.
"""
"""
# Languages and output locales
LOCALES = {
"en": {
Language.EN_GB: "en-GB",
Language.EN_US: "en-US",
Language.EN_AU: "en-AU",
},
}
# Get the locale code
result = LOCALES.get(language_code, {}).get(locale)
# Fail if locale is not supported
if not result:
logger.warning(f"Unsupported output locale: {locale}, defaulting to {language_code}")
# Return the locale code
return result
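The locale lookup is a two-level dictionary access that yields `None` for any unsupported language or locale instead of raising. A minimal sketch of that behaviour follows. Plain strings stand in for the `Language` enum members, which is an assumption made to keep the example self-contained.

```python
from typing import Optional

# Hypothetical string keys in place of Language.EN_GB, Language.EN_US, Language.EN_AU.
LOCALES: dict[str, dict[str, str]] = {
    "en": {
        "en-GB": "en-GB",
        "en-US": "en-US",
        "en-AU": "en-AU",
    },
}


def lookup_locale(language_code: str, locale: str) -> Optional[str]:
    """Return the output locale code, or None when the pair is unsupported."""
    # The empty-dict default keeps the second .get() safe for unknown languages.
    return LOCALES.get(language_code, {}).get(locale)
```

Because the function returns `None` on a miss, the caller ends up with no output locale set rather than with the language code as a fallback.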


@@ -145,6 +145,9 @@ class Language(StrEnum):
EN_US = "en-US"
EN_ZA = "en-ZA"
# Esperanto
EO = "eo"
# Spanish
ES = "es"
ES_AR = "es-AR"
@@ -474,6 +477,9 @@ class Language(StrEnum):
# Tatar
TT = "tt"
# Uyghur
UG = "ug"
# Ukrainian
UK = "uk"
UK_UA = "uk-UA"


@@ -214,7 +214,7 @@ class TestDTMFAggregator(unittest.IsolatedAsyncioTestCase):
]
# All the InputDTMFFrames plus one TranscriptionFrame
-expected_down_frames = [InputDTMFFrame] * 12 + [TranscriptionFrame]
+expected_down_frames = [InputDTMFFrame] * len(frames_to_send) + [TranscriptionFrame]
received_down_frames, _ = await run_test(
aggregator,