Compare commits

..

1 Commits

Author SHA1 Message Date
James Hush
77c82c64c0 Simple content filter demo 2025-12-04 09:52:41 +01:00
33 changed files with 173 additions and 1320 deletions

View File

@@ -1,174 +0,0 @@
name: Generate Changelog for Release
on:
workflow_dispatch:
inputs:
version:
description: "Release version (e.g., 0.0.97)"
required: true
type: string
date:
description: "Release date (YYYY-MM-DD format, defaults to today)"
required: false
type: string
default: ""
permissions:
contents: write
pull-requests: write
jobs:
generate-changelog:
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.12"
- name: Install uv
uses: astral-sh/setup-uv@v4
with:
enable-cache: true
- name: Install dependencies
run: |
uv sync --group dev
- name: Set release date
id: set_date
run: |
if [ -z "${{ inputs.date }}" ]; then
RELEASE_DATE=$(date +%Y-%m-%d)
echo "Using today's date: $RELEASE_DATE"
else
RELEASE_DATE="${{ inputs.date }}"
echo "Using provided date: $RELEASE_DATE"
fi
echo "release_date=$RELEASE_DATE" >> $GITHUB_OUTPUT
- name: Validate inputs
run: |
# Validate version format (basic check)
if ! [[ "${{ inputs.version }}" =~ ^[0-9]+\.[0-9]+\.[0-9]+.*$ ]]; then
echo "Error: Version must be in format X.Y.Z (e.g., 0.0.97)"
exit 1
fi
# Validate date format if provided
if [ -n "${{ inputs.date }}" ]; then
if ! date -d "${{ inputs.date }}" >/dev/null 2>&1; then
# Try macOS date format
if ! date -j -f "%Y-%m-%d" "${{ inputs.date }}" >/dev/null 2>&1; then
echo "Error: Date must be in YYYY-MM-DD format (e.g., 2025-12-04)"
exit 1
fi
fi
fi
- name: Check for changelog fragments
id: check_fragments
run: |
FRAGMENT_COUNT=$(find changelog -name "*.md" ! -name "_template.md.j2" | wc -l | tr -d ' ')
echo "fragment_count=$FRAGMENT_COUNT" >> $GITHUB_OUTPUT
if [ "$FRAGMENT_COUNT" -eq "0" ]; then
echo "❌ Error: No changelog fragments found in changelog/"
echo ""
echo "Cannot create a release without changelog entries."
echo "Add changelog fragments to the changelog/ directory (e.g., 1234.added.md) and try again."
exit 1
fi
# Validate fragment types
VALID_TYPES="added changed deprecated removed fixed security"
INVALID_FRAGMENTS=""
for file in changelog/*.md; do
# Skip template
if [[ "$file" == "changelog/_template.md.j2" ]]; then
continue
fi
# Extract type from filename (e.g., 1234.added.md -> added)
filename=$(basename "$file")
# Handle both 1234.added.md and 1234.added.2.md patterns
type=$(echo "$filename" | sed -E 's/^[0-9]+\.([a-z]+)(\.[0-9]+)?\.md$/\1/')
# Check if type is valid
if ! echo "$VALID_TYPES" | grep -wq "$type"; then
INVALID_FRAGMENTS="$INVALID_FRAGMENTS\n - $filename (type: '$type')"
fi
done
if [ -n "$INVALID_FRAGMENTS" ]; then
echo "❌ Error: Invalid changelog fragment types found:"
echo -e "$INVALID_FRAGMENTS"
echo ""
echo "Valid types are: $VALID_TYPES"
echo "Example: 1234.added.md, 5678.fixed.md"
exit 1
fi
echo "✓ Found $FRAGMENT_COUNT changelog fragment(s)"
echo "has_fragments=true" >> $GITHUB_OUTPUT
- name: Preview changelog
run: |
echo "## Preview of changelog for version ${{ inputs.version }}"
echo ""
uv run towncrier build --draft --version "${{ inputs.version }}" --date "${{ steps.set_date.outputs.release_date }}"
- name: Build changelog
run: |
uv run towncrier build --version "${{ inputs.version }}" --date "${{ steps.set_date.outputs.release_date }}" --yes
- name: Create Pull Request
uses: peter-evans/create-pull-request@v7
with:
token: ${{ secrets.GITHUB_TOKEN }}
commit-message: "Update changelog for version ${{ inputs.version }}"
title: "Release ${{ inputs.version }} - Changelog Update"
body: |
## Changelog Update for Release ${{ inputs.version }}
This PR updates the CHANGELOG.md with all changes for version **${{ inputs.version }}**.
### Summary
- **Version:** ${{ inputs.version }}
- **Date:** ${{ steps.set_date.outputs.release_date }}
- **Fragments processed:** ${{ steps.check_fragments.outputs.fragment_count }}
### What this PR does
- ✅ Adds new release section to CHANGELOG.md
- ✅ Removes processed changelog fragments
- ✅ Ready to merge for release
### Next Steps
1. Review the changelog entries below
2. Make any necessary edits to CHANGELOG.md if needed
3. Merge this PR
4. Continue with your release process
---
<details>
<summary>📋 Preview of changes</summary>
The changelog has been updated with entries from the following fragments:
```bash
${{ steps.check_fragments.outputs.fragment_count }} fragments processed
```
</details>
branch: changelog-${{ inputs.version }}
delete-branch: true
labels: |
changelog
release

View File

@@ -5,28 +5,21 @@ All notable changes to **Pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
<!-- towncrier release notes start -->
## [0.0.97] - 2025-12-05
## [Unreleased]
### Added
- Added new Gradium services, `GradiumSTTService` and `GradiumTTSService`, for
speech-to-text and text-to-speech functionality using Gradium's API.
- Additions for `AsyncAITTSService` and `AsyncAIHttpTTSService`:
- Added new `languages`: `pt`, `nl`, `ar`, `ru`, `ro`, `ja`, `he`, `hy`,
`tr`, `hi`, `zh`.
- Updated the default model to `asyncflow_multilingual_v1.0` for improved
accuracy and broader language coverage.
- Added optional tool and tool output filters for MCP services.
- Added `wait_for_all` argument to the base `LLMService`. When enabled, this
ensures all function calls complete before returning results to the LLM (i.e.,
before running a new inference with those results).
### Changed
- Updated Deepgram logging to include Deepgram request IDs for improved
debugging.
- Improved interruption handling to prevent bots from repeating themselves.
LLM services that return multiple sentences in a single response (e.g.,
`GoogleLLMService`) are now split into individual sentences before being sent
to TTS. This ensures interruptions occur at sentence boundaries, preventing
the bot from repeating content after being interrupted during long responses.
- Text Aggregation Improvements:
@@ -38,40 +31,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
`PatternPairAggregator` now inherit from `SimpleTextAggregator`, reusing
the base class's sentence detection logic.
- Improved interruption handling to prevent bots from repeating themselves. LLM
services that return multiple sentences in a single response (e.g.,
`GoogleLLMService`) are now split into individual sentences before being sent
to TTS. This ensures interruptions occur at sentence boundaries, preventing
the bot from repeating content after being interrupted during long responses.
- Updated `AICFilter` to use Quail STT as the default model
(`AICModelType.QUAIL_STT`). Quail STT is optimized for human-to-machine
interaction (e.g., voice agents, speech-to-text) and operates at a native
sample rate of 16 kHz with fixed enhancement parameters.
- If an unexpected exception is caught, or if `FrameProcessor.push_error()` is
called with an exception, the file name and line number where the exception
occured are now logged.
- Updated Smart Turn model weights to v3.1.
- Smart Turn analyzer now uses the full context of the turn rather than just
the audio since VAD last triggered.
- Updated `CartesiaSTTService` to return the full transcription `result` in the
`TranscriptionFrame` and `InterimTranscriptionFrame`. This provides access to
word timestamp data.
- `HumeTTSService` changes:
- Added tracking headers (`X-Hume-Client-Name` and `X-Hume-Client-Version`)
to all requests made by `HumeTTSService` to the Hume API for better usage
tracking and analytics.
- Added `stop()` and `cancel()` cleanup methods to `HumeTTSService` to
properly close the HTTP client and prevent resource leaks.
- Updated Deepgram logging to include Deepgram request IDs for improved debugging.
### Deprecated
- Package `pipecat.sync` is deprecated, use `pipecat.utils.sync` instead.
- The `noise_gate_enable` parameter in `AICFilter` is deprecated and no longer
has any effect. Noise gating is now handled automatically by the AIC VAD
system. Use `AICFilter.create_vad_analyzer()` for VAD functionality instead.
- NVIDIA Services name changes (all functionality is unchanged):
- `NimLLMService` is now deprecated, use `NvidiaLLMService` instead.
@@ -80,41 +54,29 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Use `uv pip install pipecat-ai[nvidia]` instead of
`uv pip install pipecat-ai[riva]`
- The `noise_gate_enable` parameter in `AICFilter` is deprecated and no longer
has any effect. Noise gating is now handled automatically by the AIC VAD
system. Use `AICFilter.create_vad_analyzer()` for VAD functionality instead.
- Package `pipecat.sync` is deprecated, use `pipecat.utils.sync` instead.
### Fixed
- Fixed bug in `PatternPairAggregator` where pattern handlers could be called
multiple times for `KEEP` or `AGGREGATE` patterns.
- Fixed an issue where `LLMTextFrame.skip_tts` was being overwritten by LLM
services.
- Fixed sentence aggregation to correctly handle ambiguous punctuation in
streaming text, such as currency ("$29.95") and abbreviations ("Mr. Smith").
- Fixed an issue in `AWSTranscribeSTTService` where the `region` arg was always
set to `us-east-1` when providing an AWS_REGION env var.
- Fixed bug in `PatternPairAggregator` where pattern handlers could be called
multiple times for `KEEP` or `AGGREGATE` patterns.
- Fixed an issue in `SarvamTTSService` where the last sentence was not being
spoken. Now, audio is flushed when the TTS services receives the
`LLMFullResponseEndFrame` or `EndFrame`.
- Fixed an issue in `AWSTranscribeSTTService` where the `region` arg was
always set to `us-east-1` when providing an AWS_REGION env var.
- Fixed an issue in `DeepgramTTSService` where a `TTSStoppedFrame` was
incorrectly pushed after a functional call. This caused an issue with the
voice-ui-kit's conversational panel rending of the LLM output after a
function call.
- Fixed an issue where `LLMTextFrame.skip_tts` was being overwritten by LLM
services.
- Fixed an issue that caused `WebsocketService` instances to attempt
reconnection during shutdown.
- Fixed an issue in `ElevenLabsTTSService` where character usage metrics were
only reported on the first TTS generation per turn.
## [0.0.96] - 2025-11-26 🦃 "Happy Thanksgiving!" 🦃
### Added

View File

@@ -17,121 +17,24 @@ We welcome contributions of all kinds! Your help is appreciated. Follow these st
git checkout -b your-branch-name
```
4. **Make your changes**: Edit or add files as necessary.
5. **Add a changelog entry**: Create a changelog fragment file (see [Changelog Entries](#changelog-entries) below).
6. **Test your changes**: Ensure that your changes look correct and follow the style set in the codebase.
7. **Commit your changes**: Once you're satisfied with your changes, commit them with a meaningful message.
5. **Test your changes**: Ensure that your changes look correct and follow the style set in the codebase.
6. **Commit your changes**: Once you're satisfied with your changes, commit them with a meaningful message.
```bash
git commit -m "Description of your changes"
```
8. **Push your changes**: Push your branch to your forked repository.
7. **Push your changes**: Push your branch to your forked repository.
```bash
git push origin your-branch-name
```
9. **Submit a Pull Request (PR)**: Open a PR from your forked repository to the main branch of this repo.
8. **Submit a Pull Request (PR)**: Open a PR from your forked repository to the main branch of this repo.
> Important: Describe the changes you've made clearly!
Our maintainers will review your PR, and once everything is good, your contributions will be merged!
## Changelog Entries
Every pull request that makes a user-facing change should include a changelog entry. We use a changelog fragment system to avoid merge conflicts.
### Creating a Changelog Fragment
1. Create a new file in the `changelog/` directory with this naming pattern:
```
<PR_number>.<type>.md
```
2. Choose the appropriate type:
- `added.md` - New features
- `changed.md` - Changes in existing functionality
- `deprecated.md` - Soon-to-be removed features
- `removed.md` - Removed features
- `fixed.md` - Bug fixes
- `security.md` - Security fixes
3. Write your changelog entry as a Markdown bullet point. Include the `-` at the start:
**Example files:**
`changelog/1234.added.md`:
```markdown
- Added support for Anthropic Claude 3.5 Sonnet with improved streaming performance.
```
`changelog/5678.fixed.md`:
```markdown
- Fixed an issue where audio frames were dropped during high-load scenarios.
```
**For entries with nested bullets:**
`changelog/1234.changed.md`:
```markdown
- Updated service configuration:
- Changed default timeout to 30 seconds
- Added retry logic for failed connections
```
### Multiple Changes in One PR
**Different types of changes:** Create separate fragment files for each type:
```
changelog/1234.added.md
changelog/1234.fixed.md
```
**Multiple changes of the same type:** Create numbered fragment files:
```
changelog/1234.changed.md
changelog/1234.changed.2.md
```
**Related changes:** Use nested bullets in a single fragment:
```markdown
- Updated service configuration:
- Changed default timeout to 30 seconds
- Added retry logic for failed connections
```
**Rule of thumb:** One logical change per fragment file. If changes are unrelated, use separate files.
### Preview Your Changes
To see what your changelog entry will look like:
```bash
towncrier build --draft --version Unreleased
```
This won't modify any files, just show you a preview.
### When to Skip Changelog Entries
You can skip adding a changelog entry for:
- Documentation-only changes
- Internal refactoring with no user-facing impact
- Test-only changes
- CI/build configuration changes
If you're unsure whether your change needs a changelog entry, ask in your PR!
## Dependency Management
This project uses [uv](https://docs.astral.sh/uv/) for dependency management. The `uv.lock` file is committed to ensure reproducible builds.

View File

@@ -74,9 +74,9 @@ Catch new features, interviews, and how-tos on our [Pipecat TV](https://www.yout
| Category | Services |
| ------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), [AWS](https://docs.pipecat.ai/server/services/stt/aws), [Azure](https://docs.pipecat.ai/server/services/stt/azure), [Cartesia](https://docs.pipecat.ai/server/services/stt/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/stt/elevenlabs), [Fal Wizper](https://docs.pipecat.ai/server/services/stt/fal), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), [Google](https://docs.pipecat.ai/server/services/stt/google), [Gradium](https://docs.pipecat.ai/server/services/stt/gradium), [Groq (Whisper)](https://docs.pipecat.ai/server/services/stt/groq), [NVIDIA Riva](https://docs.pipecat.ai/server/services/stt/riva), [OpenAI (Whisper)](https://docs.pipecat.ai/server/services/stt/openai), [SambaNova (Whisper)](https://docs.pipecat.ai/server/services/stt/sambanova), [Sarvam](https://docs.pipecat.ai/server/services/stt/sarvam), [Soniox](https://docs.pipecat.ai/server/services/stt/soniox), [Speechmatics](https://docs.pipecat.ai/server/services/stt/speechmatics), [Ultravox](https://docs.pipecat.ai/server/services/stt/ultravox), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) |
| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), [AWS](https://docs.pipecat.ai/server/services/stt/aws), [Azure](https://docs.pipecat.ai/server/services/stt/azure), [Cartesia](https://docs.pipecat.ai/server/services/stt/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/stt/elevenlabs), [Fal Wizper](https://docs.pipecat.ai/server/services/stt/fal), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), [Google](https://docs.pipecat.ai/server/services/stt/google), [Groq (Whisper)](https://docs.pipecat.ai/server/services/stt/groq), [NVIDIA Riva](https://docs.pipecat.ai/server/services/stt/riva), [OpenAI (Whisper)](https://docs.pipecat.ai/server/services/stt/openai), [SambaNova (Whisper)](https://docs.pipecat.ai/server/services/stt/sambanova), [Sarvam](https://docs.pipecat.ai/server/services/stt/sarvam), [Soniox](https://docs.pipecat.ai/server/services/stt/soniox), [Speechmatics](https://docs.pipecat.ai/server/services/stt/speechmatics), [Ultravox](https://docs.pipecat.ai/server/services/stt/ultravox), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) |
| LLMs | [Anthropic](https://docs.pipecat.ai/server/services/llm/anthropic), [AWS](https://docs.pipecat.ai/server/services/llm/aws), [Azure](https://docs.pipecat.ai/server/services/llm/azure), [Cerebras](https://docs.pipecat.ai/server/services/llm/cerebras), [DeepSeek](https://docs.pipecat.ai/server/services/llm/deepseek), [Fireworks AI](https://docs.pipecat.ai/server/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/server/services/llm/gemini), [Grok](https://docs.pipecat.ai/server/services/llm/grok), [Groq](https://docs.pipecat.ai/server/services/llm/groq), [Mistral](https://docs.pipecat.ai/server/services/llm/mistral), [NVIDIA NIM](https://docs.pipecat.ai/server/services/llm/nim), [Ollama](https://docs.pipecat.ai/server/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/server/services/llm/openai), [OpenRouter](https://docs.pipecat.ai/server/services/llm/openrouter), [Perplexity](https://docs.pipecat.ai/server/services/llm/perplexity), [Qwen](https://docs.pipecat.ai/server/services/llm/qwen), [SambaNova](https://docs.pipecat.ai/server/services/llm/sambanova) [Together AI](https://docs.pipecat.ai/server/services/llm/together) |
| Text-to-Speech | [Async](https://docs.pipecat.ai/server/services/tts/asyncai), [AWS](https://docs.pipecat.ai/server/services/tts/aws), [Azure](https://docs.pipecat.ai/server/services/tts/azure), [Cartesia](https://docs.pipecat.ai/server/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/tts/elevenlabs), [Fish](https://docs.pipecat.ai/server/services/tts/fish), [Google](https://docs.pipecat.ai/server/services/tts/google), [Gradium](https://docs.pipecat.ai/server/services/tts/gradium), [Groq](https://docs.pipecat.ai/server/services/tts/groq), [Hume](https://docs.pipecat.ai/server/services/tts/hume), [Inworld](https://docs.pipecat.ai/server/services/tts/inworld), [LMNT](https://docs.pipecat.ai/server/services/tts/lmnt), [MiniMax](https://docs.pipecat.ai/server/services/tts/minimax), [Neuphonic](https://docs.pipecat.ai/server/services/tts/neuphonic), [NVIDIA Riva](https://docs.pipecat.ai/server/services/tts/riva), [OpenAI](https://docs.pipecat.ai/server/services/tts/openai), [Piper](https://docs.pipecat.ai/server/services/tts/piper), [PlayHT](https://docs.pipecat.ai/server/services/tts/playht), [Rime](https://docs.pipecat.ai/server/services/tts/rime), [Sarvam](https://docs.pipecat.ai/server/services/tts/sarvam), [Speechmatics](https://docs.pipecat.ai/server/services/tts/speechmatics), [XTTS](https://docs.pipecat.ai/server/services/tts/xtts) |
| Text-to-Speech | [Async](https://docs.pipecat.ai/server/services/tts/asyncai), [AWS](https://docs.pipecat.ai/server/services/tts/aws), [Azure](https://docs.pipecat.ai/server/services/tts/azure), [Cartesia](https://docs.pipecat.ai/server/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/tts/elevenlabs), [Fish](https://docs.pipecat.ai/server/services/tts/fish), [Google](https://docs.pipecat.ai/server/services/tts/google), [Groq](https://docs.pipecat.ai/server/services/tts/groq), [Hume](https://docs.pipecat.ai/server/services/tts/hume), [Inworld](https://docs.pipecat.ai/server/services/tts/inworld), [LMNT](https://docs.pipecat.ai/server/services/tts/lmnt), [MiniMax](https://docs.pipecat.ai/server/services/tts/minimax), [Neuphonic](https://docs.pipecat.ai/server/services/tts/neuphonic), [NVIDIA Riva](https://docs.pipecat.ai/server/services/tts/riva), [OpenAI](https://docs.pipecat.ai/server/services/tts/openai), [Piper](https://docs.pipecat.ai/server/services/tts/piper), [PlayHT](https://docs.pipecat.ai/server/services/tts/playht), [Rime](https://docs.pipecat.ai/server/services/tts/rime), [Sarvam](https://docs.pipecat.ai/server/services/tts/sarvam), [Speechmatics](https://docs.pipecat.ai/server/services/tts/speechmatics), [XTTS](https://docs.pipecat.ai/server/services/tts/xtts) |
| Speech-to-Speech | [AWS Nova Sonic](https://docs.pipecat.ai/server/services/s2s/aws), [Gemini Multimodal Live](https://docs.pipecat.ai/server/services/s2s/gemini), [OpenAI Realtime](https://docs.pipecat.ai/server/services/s2s/openai) |
| Transport | [Daily (WebRTC)](https://docs.pipecat.ai/server/services/transport/daily), [FastAPI Websocket](https://docs.pipecat.ai/server/services/transport/fastapi-websocket), [SmallWebRTCTransport](https://docs.pipecat.ai/server/services/transport/small-webrtc), [WebSocket Server](https://docs.pipecat.ai/server/services/transport/websocket-server), Local |
| Serializers | [Plivo](https://docs.pipecat.ai/server/utilities/serializers/plivo), [Twilio](https://docs.pipecat.ai/server/utilities/serializers/twilio), [Telnyx](https://docs.pipecat.ai/server/utilities/serializers/telnyx) |

View File

@@ -1,16 +0,0 @@
{% for section, _ in sections.items() %}
{% if sections[section] %}
{% for category, val in definitions.items() if category in sections[section]%}
### {{ definitions[category]['name'] }}
{% for text, values in sections[section][category].items() %}
{{ text }}
{% endfor %}
{% endfor %}
{% else %}
No significant changes.
{% endif %}
{% endfor %}

View File

@@ -73,9 +73,6 @@ GOOGLE_CLOUD_PROJECT_ID=...
GOOGLE_CLOUD_LOCATION=...
GOOGLE_TEST_CREDENTIALS=...
# Gradium
GRAPDIUM_API_KEY=...
# Grok
GROK_API_KEY=...
@@ -194,4 +191,4 @@ TWILIO_AUTH_TOKEN=...
WHATSAPP_TOKEN=...
WHATSAPP_WEBHOOK_VERIFICATION_TOKEN=...
WHATSAPP_PHONE_NUMBER_ID=...
WHATSAPP_APP_SECRET=...
WHATSAPP_APP_SECRET=...

View File

@@ -13,12 +13,13 @@ from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.frames.frames import Frame, LLMContextFrame, LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
@@ -30,6 +31,44 @@ from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
FILTERED_WORDS = ["apple", "banana", "car"]
class ContentFilterProcessor(FrameProcessor):
"""Processor that filters LLMContextFrames containing specific words.
If the user's message contains any of the filtered words, the context
is replaced with a message indicating the assistant cannot respond.
"""
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, LLMContextFrame):
# Check the last user message for filtered words
messages = frame.context.messages
if messages:
last_message = messages[-1]
content = last_message.get("content", "")
if isinstance(content, str):
content_lower = content.lower()
if any(word in content_lower for word in FILTERED_WORDS):
logger.info(f"Filtered content detected: {content}")
# Create a new context with a filtered response instruction
filtered_context = LLMContext(
messages=[
{
"role": "system",
"content": "The user is asking about something you cannot give an answer about. Tell them you don't know how to respond.",
}
]
)
await self.push_frame(LLMContextFrame(filtered_context), direction)
return
await self.push_frame(frame, direction)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
@@ -76,12 +115,14 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(context)
content_filter = ContentFilterProcessor()
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
context_aggregator.user(), # User responses
content_filter, # Content filter
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output

View File

@@ -1,127 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.gradium.stt import GradiumSTTService
from pipecat.services.gradium.tts import GradiumTTSService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = GradiumSTTService(api_key=os.getenv("GRADIUM_API_KEY"))
tts = GradiumTTSService(
api_key=os.getenv("GRADIUM_API_KEY"),
voice_id="YTpq7expH9539ERJ",
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points. Respond to what the user said in a creative and helpful way.",
},
]
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -76,7 +76,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
llm = FireworksLLMService(
api_key=os.getenv("FIREWORKS_API_KEY"),
model="accounts/fireworks/models/gpt-oss-20b",
model="accounts/fireworks/models/llama-v3p1-405b-instruct",
)
# You can also register a function_name of None to get all functions
# sent to the same callback with an additional function_name parameter.

View File

@@ -64,14 +64,11 @@ class UrlToImageProcessor(FrameProcessor):
await self.push_frame(frame, direction)
def extract_url(self, text: str):
try:
data = json.loads(text)
if "artObject" in data:
return data["artObject"]["webImage"]["url"]
if "artworks" in data and len(data["artworks"]):
return data["artworks"][0]["webImage"]["url"]
except:
pass
data = json.loads(text)
if "artObject" in data:
return data["artObject"]["webImage"]["url"]
if "artworks" in data and len(data["artworks"]):
return data["artworks"][0]["webImage"]["url"]
return None
@@ -91,23 +88,6 @@ class UrlToImageProcessor(FrameProcessor):
logger.error(error_msg)
# full list of tools available from rijksmuseum MCP:
# - get_artwork_details
# - get_artwork_image
# - get_user_sets
# - get_user_set_details
# - open_image_in_browser
# - get_artist_timeline
mcp_tools_filter = ["get_artwork_details", "get_artwork_image", "open_image_in_browser"]
def open_image_output_filter(output: str):
pattern = r"Successfully opened image in browser: "
text_to_print = re.sub(pattern, "", output)
print(f"🖼️ link to high resolution artwork: {text_to_print}")
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
@@ -156,10 +136,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
# https://github.com/r-huijts/rijksmuseum-mcp
args=["-y", "mcp-server-rijksmuseum"],
env={"RIJKSMUSEUM_API_KEY": os.getenv("RIJKSMUSEUM_API_KEY")},
),
# Optional
tools_filter=mcp_tools_filter, # Optional
tools_output_filters={"open_image_in_browser": open_image_output_filter},
)
)
except Exception as e:
logger.error(f"error setting up mcp")

View File

@@ -67,14 +67,13 @@ class UrlToImageProcessor(FrameProcessor):
await self.push_frame(frame, direction)
def extract_url(self, text: str):
try:
data = json.loads(text)
if "artObject" in data:
return data["artObject"]["webImage"]["url"]
if "artworks" in data and len(data["artworks"]):
return data["artworks"][0]["webImage"]["url"]
except:
pass
data = json.loads(text)
if "artObject" in data:
return data["artObject"]["webImage"]["url"]
if "artworks" in data and len(data["artworks"]):
return data["artworks"][0]["webImage"]["url"]
return None
async def run_image_process(self, image_url: str):
try:

View File

@@ -63,7 +63,6 @@ fireworks = []
fish = [ "ormsgpack~=1.7.0", "pipecat-ai[websockets-base]" ]
gladia = [ "pipecat-ai[websockets-base]" ]
google = [ "google-cloud-speech>=2.33.0,<3", "google-cloud-texttospeech>=2.31.0,<3", "google-genai>=1.41.0,<2", "pipecat-ai[websockets-base]" ]
gradium = [ "pipecat-ai[websockets-base]" ]
grok = []
groq = [ "groq~=0.23.0" ]
gstreamer = [ "pygobject~=3.50.0" ]
@@ -130,7 +129,6 @@ dev = [
"setuptools~=78.1.1",
"setuptools_scm~=8.3.1",
"python-dotenv>=1.0.1,<2.0.0",
"towncrier~=25.8.0",
]
docs = [
@@ -161,7 +159,7 @@ where = ["src"]
"src/pipecat/audio/dtmf/dtmf-star.wav",
]
"pipecat.services.aws_nova_sonic" = ["src/pipecat/services/aws_nova_sonic/ready.wav"]
"pipecat.audio.turn.smart_turn.data" = ["src/pipecat/audio/turn/smart_turn/data/smart-turn-v3.1-cpu.onnx"]
"pipecat.audio.turn.smart_turn.data" = ["src/pipecat/audio/turn/smart_turn/data/smart-turn-v3.0.onnx"]
[tool.pytest.ini_options]
addopts = "--verbose"
@@ -208,44 +206,3 @@ convention = "google"
command_line = "--module pytest"
source = ["src"]
omit = ["*/tests/*"]
[tool.towncrier]
package = "pipecat"
package_dir = "src"
filename = "CHANGELOG.md"
directory = "changelog"
start_string = "<!-- towncrier release notes start -->\n"
template = "changelog/_template.md.j2"
title_format = "## [{version}] - {project_date}"
underlines = ["", "", ""]
wrap = true
[[tool.towncrier.type]]
directory = "added"
name = "Added"
showcontent = true
[[tool.towncrier.type]]
directory = "changed"
name = "Changed"
showcontent = true
[[tool.towncrier.type]]
directory = "deprecated"
name = "Deprecated"
showcontent = true
[[tool.towncrier.type]]
directory = "removed"
name = "Removed"
showcontent = true
[[tool.towncrier.type]]
directory = "fixed"
name = "Fixed"
showcontent = true
[[tool.towncrier.type]]
directory = "security"
name = "Security"
showcontent = true

View File

@@ -28,6 +28,7 @@ from pipecat.metrics.metrics import MetricsData, SmartTurnMetricsData
STOP_SECS = 3
PRE_SPEECH_MS = 0
MAX_DURATION_SECONDS = 8 # Max allowed segment duration
USE_ONLY_LAST_VAD_SEGMENT = True
class SmartTurnParams(BaseTurnParams):
@@ -42,6 +43,8 @@ class SmartTurnParams(BaseTurnParams):
stop_secs: float = STOP_SECS
pre_speech_ms: float = PRE_SPEECH_MS
max_duration_secs: float = MAX_DURATION_SECONDS
# not exposing this for now yet until the model can handle it.
# use_only_last_vad_segment: bool = USE_ONLY_LAST_VAD_SEGMENT
class SmartTurnTimeoutException(Exception):
@@ -157,7 +160,7 @@ class BaseSmartTurn(BaseTurnAnalyzer):
state, result = await loop.run_in_executor(
self._executor, self._process_speech_segment, self._audio_buffer
)
if state == EndOfTurnState.COMPLETE:
if state == EndOfTurnState.COMPLETE or USE_ONLY_LAST_VAD_SEGMENT:
self._clear(state)
logger.debug(f"End of Turn result: {state}")
return state, result

View File

@@ -42,15 +42,17 @@ class LocalSmartTurnAnalyzerV3(BaseSmartTurn):
Args:
smart_turn_model_path: Path to the ONNX model file. If this is not
set, the bundled smart-turn-v3.1-cpu model will be used.
set, the bundled smart-turn-v3.0 model will be used.
cpu_count: The number of CPUs to use for inference. Defaults to 1.
**kwargs: Additional arguments passed to BaseSmartTurn.
"""
super().__init__(**kwargs)
logger.debug("Loading Local Smart Turn v3 model...")
if not smart_turn_model_path:
# Load bundled model
model_name = "smart-turn-v3.1-cpu.onnx"
model_name = "smart-turn-v3.0.onnx"
package_path = "pipecat.audio.turn.smart_turn.data"
try:
@@ -68,8 +70,6 @@ class LocalSmartTurnAnalyzerV3(BaseSmartTurn):
impresources.files(package_path).joinpath(model_name)
)
logger.debug(f"Loading Local Smart Turn v3.x model from {smart_turn_model_path}...")
so = ort.SessionOptions()
so.execution_mode = ort.ExecutionMode.ORT_SEQUENTIAL
so.inter_op_num_threads = 1
@@ -79,7 +79,7 @@ class LocalSmartTurnAnalyzerV3(BaseSmartTurn):
self._feature_extractor = WhisperFeatureExtractor(chunk_length=8)
self._session = ort.InferenceSession(smart_turn_model_path, sess_options=so)
logger.debug("Loaded Local Smart Turn v3.x")
logger.debug("Loaded Local Smart Turn v3")
def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, Any]:
"""Predict end-of-turn using local ONNX model."""

View File

@@ -12,7 +12,6 @@ management, and frame flow control mechanisms.
"""
import asyncio
import traceback
from dataclasses import dataclass
from enum import Enum
from typing import Any, Awaitable, Callable, Coroutine, List, Optional, Sequence, Tuple, Type
@@ -678,17 +677,7 @@ class FrameProcessor(BaseObject):
if not error.processor:
error.processor = self
await self._call_event_handler("on_error", error)
if error.exception:
tb = traceback.extract_tb(error.exception.__traceback__)
last = tb[-1]
error_message = (
f"{error.processor} exception ({last.filename}:{last.lineno}): {error.error}"
)
else:
error_message = f"{error.processor} error: {error.error}"
logger.error(error_message)
logger.error(f"{error.processor} error: {error.error}")
await self.push_frame(error, FrameDirection.UPSTREAM)
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):

View File

@@ -935,8 +935,8 @@ class RTVIObserverParams:
system_logs_enabled: Indicates if system logs should be sent.
errors_enabled: [Deprecated] Indicates if errors messages should be sent.
skip_aggregator_types: List of aggregation types to skip sending as tts/output messages.
Note: if using this to avoid sending secure information, be sure to also disable
bot_llm_enabled to avoid leaking through LLM messages.
Note: if using this to avoid sending secure information, be sure to also disable
bot_llm_enabled to avoid leaking through LLM messages.
bot_output_transforms: A list of callables to transform text before just before sending it
to TTS. Each callable takes the aggregated text and its type, and returns the
transformed text. To register, provide a list of tuples of

View File

@@ -56,17 +56,6 @@ def language_to_async_language(language: Language) -> Optional[str]:
Language.ES: "es",
Language.DE: "de",
Language.IT: "it",
Language.PT: "pt",
Language.NL: "nl",
Language.AR: "ar",
Language.RU: "ru",
Language.RO: "ro",
Language.JA: "ja",
Language.HE: "he",
Language.HY: "hy",
Language.TR: "tr",
Language.HI: "hi",
Language.ZH: "zh",
}
return resolve_language(language, LANGUAGE_MAP, use_base_code=True)
@@ -85,7 +74,7 @@ class AsyncAITTSService(InterruptibleTTSService):
language: Language to use for synthesis.
"""
language: Optional[Language] = None
language: Optional[Language] = Language.EN
def __init__(
self,
@@ -94,7 +83,7 @@ class AsyncAITTSService(InterruptibleTTSService):
voice_id: str,
version: str = "v1",
url: str = "wss://api.async.ai/text_to_speech/websocket/ws",
model: str = "asyncflow_multilingual_v1.0",
model: str = "asyncflow_v2.0",
sample_rate: Optional[int] = None,
encoding: str = "pcm_s16le",
container: str = "raw",
@@ -110,7 +99,7 @@ class AsyncAITTSService(InterruptibleTTSService):
https://docs.async.ai/list-voices-16699698e0
version: Async API version.
url: WebSocket URL for Async TTS API.
model: TTS model to use (e.g., "asyncflow_multilingual_v1.0").
model: TTS model to use (e.g., "asyncflow_v2.0").
sample_rate: Audio sample rate.
encoding: Audio encoding format.
container: Audio container format.
@@ -139,7 +128,7 @@ class AsyncAITTSService(InterruptibleTTSService):
},
"language": self.language_to_service_language(params.language)
if params.language
else None,
else "en",
}
self.set_model_name(model)
@@ -368,7 +357,7 @@ class AsyncAIHttpTTSService(TTSService):
language: Language to use for synthesis.
"""
language: Optional[Language] = None
language: Optional[Language] = Language.EN
def __init__(
self,
@@ -376,7 +365,7 @@ class AsyncAIHttpTTSService(TTSService):
api_key: str,
voice_id: str,
aiohttp_session: aiohttp.ClientSession,
model: str = "asyncflow_multilingual_v1.0",
model: str = "asyncflow_v2.0",
url: str = "https://api.async.ai",
version: str = "v1",
sample_rate: Optional[int] = None,
@@ -391,7 +380,7 @@ class AsyncAIHttpTTSService(TTSService):
api_key: Async API key.
voice_id: ID of the voice to use for synthesis.
aiohttp_session: An aiohttp session for making HTTP requests.
model: TTS model to use (e.g., "asyncflow_multilingual_v1.0").
model: TTS model to use (e.g., "asyncflow_v2.0").
url: Base URL for Async API.
version: API version string for Async API.
sample_rate: Audio sample rate.
@@ -415,7 +404,7 @@ class AsyncAIHttpTTSService(TTSService):
},
"language": self.language_to_service_language(params.language)
if params.language
else None,
else "en",
}
self.set_voice(voice_id)
self.set_model_name(model)

View File

@@ -20,6 +20,7 @@ from loguru import logger
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
InterimTranscriptionFrame,
StartFrame,
@@ -348,7 +349,6 @@ class CartesiaSTTService(WebsocketSTTService):
self._user_id,
time_now_iso8601(),
language,
result=data,
)
)
await self._handle_transcription(transcript, is_final, language)
@@ -361,6 +361,5 @@ class CartesiaSTTService(WebsocketSTTService):
self._user_id,
time_now_iso8601(),
language,
result=data,
)
)

View File

@@ -160,7 +160,7 @@ def build_elevenlabs_voice_settings(
class PronunciationDictionaryLocator(BaseModel):
"""Locator for a pronunciation dictionary.
Parameters:
Attributes:
pronunciation_dictionary_id: The ID of the pronunciation dictionary.
version_id: The version ID of the pronunciation dictionary.
"""
@@ -731,8 +731,10 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
await self._websocket.send(json.dumps(msg))
logger.trace(f"Created new context {self._context_id}")
await self._send_text(text)
await self.start_tts_usage_metrics(text)
await self._send_text(text)
await self.start_tts_usage_metrics(text)
else:
await self._send_text(text)
except Exception as e:
yield TTSStoppedFrame()
yield ErrorFrame(error=f"Unknown error occurred: {e}")

View File

@@ -1,5 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

View File

@@ -1,239 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Gradium's speech-to-text service implementation.
This module provides integration with Gradium's real-time speech-to-text
WebSocket API for streaming audio transcription.
"""
import base64
import json
from typing import AsyncGenerator
from loguru import logger
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
Frame,
StartFrame,
TranscriptionFrame,
)
from pipecat.services.stt_service import WebsocketSTTService
from pipecat.transcriptions.language import Language
from pipecat.utils.time import time_now_iso8601
from pipecat.utils.tracing.service_decorators import traced_stt
try:
from websockets.asyncio.client import connect as websocket_connect
from websockets.protocol import State
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error('In order to use Gradium, you need to `pip install "pipecat-ai[gradium]"`.')
raise Exception(f"Missing module: {e}")
SAMPLE_RATE = 24000
class GradiumSTTService(WebsocketSTTService):
"""Gradium real-time speech-to-text service.
Provides real-time speech transcription using Gradium's WebSocket API.
Supports both interim and final transcriptions with configurable parameters
for audio processing and connection management.
"""
def __init__(
self,
*,
api_key: str,
api_endpoint_base_url: str = "wss://eu.api.gradium.ai/api/speech/asr",
json_config: str | None = None,
**kwargs,
):
"""Initialize the Gradium STT service.
Args:
api_key: Gradium API key for authentication.
api_endpoint_base_url: WebSocket endpoint URL. Defaults to Gradium's streaming endpoint.
json_config: Optional JSON configuration string for additional model settings.
**kwargs: Additional arguments passed to parent STTService class.
"""
super().__init__(sample_rate=SAMPLE_RATE, **kwargs)
self._api_key = api_key
self._api_endpoint_base_url = api_endpoint_base_url
self._websocket = None
self._json_config = json_config
self._receive_task = None
self._audio_buffer = bytearray()
self._chunk_size_ms = 80
self._chunk_size_bytes = 0
def can_generate_metrics(self) -> bool:
"""Check if the service can generate metrics.
Returns:
True if metrics generation is supported.
"""
return True
async def start(self, frame: StartFrame):
"""Start the speech-to-text service.
Args:
frame: Start frame to begin processing.
"""
await super().start(frame)
self._chunk_size_bytes = int(self._chunk_size_ms * self.sample_rate * 2 / 1000)
await self._connect()
async def stop(self, frame: EndFrame):
"""Stop the speech-to-text service.
Args:
frame: End frame to stop processing.
"""
await super().stop(frame)
await self._disconnect()
async def cancel(self, frame: CancelFrame):
"""Cancel the speech-to-text service.
Args:
frame: Cancel frame to abort processing.
"""
await super().cancel(frame)
await self._disconnect()
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
"""Process audio data for speech-to-text conversion.
Args:
audio: Raw audio bytes to process.
Yields:
None (processing handled via WebSocket messages).
"""
self._audio_buffer.extend(audio)
await self.start_ttfb_metrics()
await self.start_processing_metrics()
while len(self._audio_buffer) >= self._chunk_size_bytes:
chunk = bytes(self._audio_buffer[: self._chunk_size_bytes])
self._audio_buffer = self._audio_buffer[self._chunk_size_bytes :]
chunk = base64.b64encode(chunk).decode("utf-8")
msg = {"type": "audio", "audio": chunk}
if self._websocket and self._websocket.state is State.OPEN:
await self._websocket.send(json.dumps(msg))
yield None
@traced_stt
async def _trace_transcription(self, transcript: str, is_final: bool, language: Language):
"""Record transcription event for tracing."""
pass
async def _connect(self):
await self._connect_websocket()
if self._websocket and not self._receive_task:
self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
async def _connect_websocket(self):
try:
if self._websocket and self._websocket.state is State.OPEN:
return
ws_url = self._api_endpoint_base_url
headers = {
"x-api-key": self._api_key,
"x-api-source": "pipecat",
}
self._websocket = await websocket_connect(
ws_url,
additional_headers=headers,
)
await self._call_event_handler("on_connected")
setup_msg = {
"type": "setup",
"input_format": "pcm",
}
if self._json_config is not None:
setup_msg["json_config"] = self._json_config
await self._websocket.send(json.dumps(setup_msg))
ready_msg = await self._websocket.recv()
ready_msg = json.loads(ready_msg)
if ready_msg["type"] == "error":
raise Exception(f"received error {ready_msg['message']}")
if ready_msg["type"] != "ready":
raise Exception(f"unexpected first message type {ready_msg['type']}")
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
raise
async def _disconnect(self):
if self._receive_task:
await self.cancel_task(self._receive_task)
self._receive_task = None
await self._disconnect_websocket()
async def _disconnect_websocket(self):
try:
if self._websocket and self._websocket.state is State.OPEN:
logger.debug("Disconnecting from Gradium STT")
await self._websocket.close()
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
finally:
self._websocket = None
await self._call_event_handler("on_disconnected")
def _get_websocket(self):
if self._websocket:
return self._websocket
raise Exception("Websocket not connected")
async def _process_messages(self):
async for message in self._get_websocket():
try:
data = json.loads(message)
await self._process_response(data)
except json.JSONDecodeError:
logger.warning(f"Received non-JSON message: {message}")
async def _receive_messages(self):
while True:
await self._process_messages()
logger.debug(f"{self} Gradium connection was disconnected (timeout?), reconnecting")
await self._connect_websocket()
async def _process_response(self, msg):
type_ = msg.get("type", "")
if type_ == "text":
await self._handle_text(msg["text"])
elif type_ == "end_of_stream":
await self._handle_end_of_stream()
elif type_ == "error":
await self.push_error(error_msg=f"Error: {msg}")
async def _handle_end_of_stream(self):
"""Handle termination message."""
logger.debug("Received end_of_stream message from server")
async def _handle_text(self, text: str):
"""Handle transcription results."""
await self.push_frame(
TranscriptionFrame(
text,
self._user_id,
time_now_iso8601(),
)
)

View File

@@ -1,315 +0,0 @@
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
"""Gradium Text-to-Speech service implementation."""
import base64
import json
import uuid
from typing import Any, AsyncGenerator, Mapping, Optional
from loguru import logger
from pydantic import BaseModel
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
StartFrame,
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.tts_service import InterruptibleWordTTSService
from pipecat.utils.tracing.service_decorators import traced_tts
try:
from websockets import ConnectionClosedOK
from websockets.asyncio.client import connect as websocket_connect
from websockets.protocol import State
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Gradium, you need to `pip install pipecat-ai[gradium]`.")
raise Exception(f"Missing module: {e}")
SAMPLE_RATE = 48000
class GradiumTTSService(InterruptibleWordTTSService):
"""Text-to-Speech service using Gradium's websocket API."""
class InputParams(BaseModel):
"""Configuration parameters for Gradium TTS service.
Parameters:
temp: Temperature to be used for generation, defaults to 0.6.
"""
temp: Optional[float] = 0.6
def __init__(
self,
*,
api_key: str,
voice_id: str = "YTpq7expH9539ERJ",
url: str = "wss://eu.api.gradium.ai/api/speech/tts",
model: str = "default",
json_config: Optional[str] = None,
params: Optional[InputParams] = None,
**kwargs,
):
"""Initialize the Gradium TTS service.
Args:
api_key: Gradium API key for authentication.
voice_id: the voice identifier.
url: Gradium websocket API endpoint.
model: Model ID to use for synthesis.
json_config: Optional JSON configuration string for additional model settings.
params: Additional configuration parameters.
**kwargs: Additional arguments passed to parent class.
"""
# Initialize with parent class settings for proper frame handling
super().__init__(
push_stop_frames=True,
pause_frame_processing=True,
sample_rate=SAMPLE_RATE,
**kwargs,
)
params = params or GradiumTTSService.InputParams()
# Store service configuration
self._api_key = api_key
self._url = url
self._voice_id = voice_id
self._json_config = json_config
self._model = model
self._settings = {
"voice_id": voice_id,
"model_name": model,
"output_format": "pcm",
}
# State tracking
self._receive_task = None
def can_generate_metrics(self) -> bool:
"""Check if this service can generate processing metrics.
Returns:
True, as Gradium service supports metrics generation.
"""
return True
async def set_model(self, model: str):
"""Update the TTS model.
Args:
model: The model name to use for synthesis.
"""
self._model = model
await super().set_model(model)
async def _update_settings(self, settings: Mapping[str, Any]):
"""Update service settings and reconnect if voice changed."""
prev_voice = self._voice_id
await super()._update_settings(settings)
if not prev_voice == self._voice_id:
self._settings["voice_id"] = self._voice_id
logger.info(f"Switching TTS voice to: [{self._voice_id}]")
await self._disconnect()
await self._connect()
def _build_msg(self, text: str = "") -> dict:
"""Build JSON message for Gradium API."""
return {"text": text, "type": "text"}
async def start(self, frame: StartFrame):
"""Start the service and establish websocket connection.
Args:
frame: The start frame containing initialization parameters.
"""
await super().start(frame)
await self._connect()
async def stop(self, frame: EndFrame):
"""Stop the service and close connection.
Args:
frame: The end frame.
"""
await super().stop(frame)
await self._disconnect()
async def cancel(self, frame: CancelFrame):
"""Cancel current operation and clean up.
Args:
frame: The cancel frame.
"""
await super().cancel(frame)
await self._disconnect()
async def _connect(self):
"""Establish websocket connection and start receive task."""
logger.debug(f"{self}: connecting")
# If the server disconnected, cancel the receive-task so that it can be reset below.
if self._websocket is None or self._websocket.state is not State.OPEN:
if self._receive_task:
await self.cancel_task(self._receive_task)
self._receive_task = None
await self._connect_websocket()
if self._websocket and not self._receive_task:
logger.debug(f"{self}: setting receive task")
self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
async def _disconnect(self):
"""Close websocket connection and clean up tasks."""
logger.debug(f"{self}: disconnecting")
if self._receive_task:
await self.cancel_task(self._receive_task)
self._receive_task = None
await self._disconnect_websocket()
async def _connect_websocket(self):
"""Connect to Gradium websocket API with configured settings."""
try:
if self._websocket and self._websocket.state is State.OPEN:
return
headers = {"x-api-key": self._api_key, "x-api-source": "pipecat"}
self._websocket = await websocket_connect(self._url, additional_headers=headers)
setup_msg = {
"type": "setup",
"output_format": "pcm",
"voice_id": self._voice_id,
}
if self._json_config is not None:
setup_msg["json_config"] = self._json_config
await self._websocket.send(json.dumps(setup_msg))
ready_msg = await self._websocket.recv()
ready_msg = json.loads(ready_msg)
if ready_msg["type"] == "error":
raise Exception(f"received error {ready_msg['message']}")
if ready_msg["type"] != "ready":
raise Exception(f"unexpected first message type {ready_msg['type']}")
await self._call_event_handler("on_connected")
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
async def _disconnect_websocket(self):
"""Close websocket connection and reset state."""
try:
await self.stop_all_metrics()
if self._websocket:
await self._websocket.close()
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
finally:
self._websocket = None
await self._call_event_handler("on_disconnected")
def _get_websocket(self):
"""Get active websocket connection or raise exception."""
if self._websocket:
return self._websocket
raise Exception("Websocket not connected")
async def flush_audio(self):
"""Flush any pending audio synthesis."""
if not self._websocket:
return
try:
msg = {"type": "end_of_stream"}
await self._websocket.send(json.dumps(msg))
except ConnectionClosedOK:
logger.debug(f"{self}: connection closed normally during flush")
except Exception as e:
logger.error(f"{self} exception: {e}")
async def _receive_messages(self):
"""Process incoming websocket messages."""
# TODO(laurent): This should not be necessary as it should happen when
# receiving the messages but this does not seem to always be the case
# and that may lead to a busy polling loop.
if self._websocket and self._websocket.state is State.CLOSED:
raise ConnectionClosedOK(None, None)
async for message in self._get_websocket():
msg = json.loads(message)
if msg["type"] == "audio":
# Process audio chunk
await self.stop_ttfb_metrics()
self.start_word_timestamps()
frame = TTSAudioRawFrame(
audio=base64.b64decode(msg["audio"]),
sample_rate=self.sample_rate,
num_channels=1,
)
await self.push_frame(frame)
elif msg["type"] == "text":
await self.add_word_timestamps([(msg["text"], msg["start_s"])])
elif msg["type"] == "end_of_stream":
await self.push_frame(TTSStoppedFrame())
await self.stop_all_metrics()
elif msg["type"] == "error":
await self.push_frame(TTSStoppedFrame())
await self.stop_all_metrics()
await self.push_error(error_msg=f"Error: {msg['message']}")
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
"""Push frame and handle end-of-turn conditions.
Args:
frame: The frame to push.
direction: The direction to push the frame.
"""
await super().push_frame(frame, direction)
@traced_tts
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
"""Generate speech from text using Gradium's streaming API.
Args:
text: The text to convert to speech.
Yields:
Frame: Audio frames containing the synthesized speech.
"""
_state = self._websocket.state if self._websocket is not None else None
logger.debug(f"{self}: Generating TTS [{text}] {_state}")
try:
if not self._websocket or self._websocket.state is State.CLOSED:
self._websocket = None
await self._connect()
try:
yield TTSStartedFrame()
msg = self._build_msg(text=text)
await self._get_websocket().send(json.dumps(msg))
await self.start_tts_usage_metrics(text)
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
yield TTSStoppedFrame()
await self._disconnect()
await self._connect()
return
yield None
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")

View File

@@ -8,14 +8,10 @@ import base64
import os
from typing import Any, AsyncGenerator, Optional
import httpx
from loguru import logger
from pydantic import BaseModel
from pipecat import __version__
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
InterruptionFrame,
@@ -30,7 +26,11 @@ from pipecat.utils.tracing.service_decorators import traced_tts
try:
from hume import AsyncHumeClient
from hume.tts import FormatPcm, PostedUtterance, PostedUtteranceVoiceWithId
from hume.tts import (
FormatPcm,
PostedUtterance,
PostedUtteranceVoiceWithId,
)
from hume.tts.types import TimestampMessage
except ModuleNotFoundError as e: # pragma: no cover - import-time guidance
logger.error(f"Exception: {e}")
@@ -40,12 +40,6 @@ except ModuleNotFoundError as e: # pragma: no cover - import-time guidance
HUME_SAMPLE_RATE = 48_000 # Hume TTS streams at 48 kHz
# Tracking headers for Hume API requests
DEFAULT_HEADERS = {
"X-Hume-Client-Name": "pipecat",
"X-Hume-Client-Version": __version__,
}
class HumeTTSService(WordTTSService):
"""Hume Octave Text-to-Speech service.
@@ -110,11 +104,7 @@ class HumeTTSService(WordTTSService):
**kwargs,
)
# Create a custom httpx.AsyncClient with tracking headers
# Headers are included in all requests made by the Hume SDK
self._http_client = httpx.AsyncClient(headers=DEFAULT_HEADERS)
self._client = AsyncHumeClient(api_key=api_key, httpx_client=self._http_client)
self._client = AsyncHumeClient(api_key=api_key)
self._params = params or HumeTTSService.InputParams()
# Store voice in the base class (mirrors other services)
@@ -148,26 +138,6 @@ class HumeTTSService(WordTTSService):
self._cumulative_time = 0.0
self._started = False
async def stop(self, frame: EndFrame) -> None:
"""Stop the service and cleanup resources.
Args:
frame: The end frame.
"""
await super().stop(frame)
if hasattr(self, "_http_client") and self._http_client:
await self._http_client.aclose()
async def cancel(self, frame: CancelFrame) -> None:
"""Cancel the service and cleanup resources.
Args:
frame: The cancel frame.
"""
await super().cancel(frame)
if hasattr(self, "_http_client") and self._http_client:
await self._http_client.aclose()
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
"""Push a frame and handle state changes.

View File

@@ -166,17 +166,20 @@ class LLMService(AIService):
# However, subclasses should override this with a more specific adapter when necessary.
adapter_class: Type[BaseLLMAdapter] = OpenAILLMAdapter
def __init__(self, run_in_parallel: bool = True, **kwargs):
def __init__(self, run_in_parallel: bool = True, wait_for_all: bool = False, **kwargs):
"""Initialize the LLM service.
Args:
run_in_parallel: Whether to run function calls in parallel or sequentially.
Defaults to True.
wait_for_all: Whether to wait for all function calls (parallel or
sequential) to complete. Defaults to False.
**kwargs: Additional arguments passed to the parent AIService.
"""
super().__init__(**kwargs)
self._run_in_parallel = run_in_parallel
self._wait_for_all = wait_for_all
self._start_callbacks = {}
self._adapter = self.adapter_class()
self._functions: Dict[Optional[str], FunctionCallRegistryItem] = {}
@@ -543,10 +546,29 @@ class LLMService(AIService):
self._function_call_tasks[task] = runner_item
task.add_done_callback(self._function_call_task_finished)
if self._wait_for_all:
# Protect gather from being cancelled. This will protect all tasks
# form being cancelled. That is fine, because we cancel them
# explicitly when handling the interruption (InterruptionFrame). We
# need to set `return_exceptions=True` because `asyncio.shield()`
# will get cancelled (from FrameProcessor process task), then
# `asyncio.gather()` will keep running (because it was protected by
# the shield). Then, individiaul function call tasks will be
# cancelled by us and we don't need to propagate those
# CancelledErrors at that point.
await asyncio.shield(asyncio.gather(*tasks, return_exceptions=True))
async def _run_sequential_function_calls(self, runner_items: Sequence[FunctionCallRunnerItem]):
# Enqueue all function calls for background execution.
for runner_item in runner_items:
await self._sequential_runner_queue.put(runner_item)
if self._wait_for_all:
# Run each function call sequentially, waiting for each to complete.
for runner_item in runner_items:
self._function_call_tasks[None] = runner_item
await self._run_function_call(runner_item)
del self._function_call_tasks[None]
else:
# Enqueue all function calls for background execution.
for runner_item in runner_items:
await self._sequential_runner_queue.put(runner_item)
async def _call_start_function(
self, context: OpenAILLMContext | LLMContext, function_name: str

View File

@@ -7,7 +7,7 @@
"""MCP (Model Context Protocol) client for integrating external tools with LLMs."""
import json
from typing import Any, Callable, Dict, List, Optional, TypeAlias
from typing import Any, Dict, List, TypeAlias
from loguru import logger
@@ -46,24 +46,17 @@ class MCPClient(BaseObject):
def __init__(
self,
server_params: ServerParameters,
tools_filter: Optional[List[str]] = None,
tools_output_filters: Optional[Dict[str, Callable[[Any], Any]]] = None,
**kwargs,
):
"""Initialize the MCP client with server parameters.
Args:
server_params: Server connection parameters (stdio or SSE).
tools_filter: Optional list of tool names to register. If None, all tools are registered.
tools_output_filters: Optional dict mapping tool names to filter functions that process tool outputs.
Each filter function receives the raw tool output (any type) and returns the processed output (any type).
**kwargs: Additional arguments passed to the parent BaseObject.
"""
super().__init__(**kwargs)
self._server_params = server_params
self._session = ClientSession
self._tools_filter = tools_filter
self._tools_output_filters = tools_output_filters or {}
if isinstance(server_params, StdioServerParameters):
self._client = stdio_client
@@ -271,26 +264,13 @@ class MCPClient(BaseObject):
else:
# logger.debug(f"Non-text result content: '{content}'")
pass
logger.info(f"Tool '{function_name}' completed successfully")
logger.debug(f"Final response: {response}")
else:
logger.error(f"Error getting content from {function_name} results.")
# Apply output filter if configured for this tool
if function_name in self._tools_output_filters:
try:
response = self._tools_output_filters[function_name](response)
logger.debug(f"Final response (after filter): {response}")
except Exception:
logger.error(f"Error applying output filter for {function_name}")
response = ""
if response and len(response) and isinstance(response, str):
logger.info(f"Tool '{function_name}' completed successfully")
logger.debug(f"Final response: {response}")
else:
response = "Sorry, could not call the mcp tool"
await result_callback(response)
final_response = response if len(response) else "Sorry, could not call the mcp tool"
await result_callback(final_response)
async def _list_tools_helper(self, session):
available_tools = await session.list_tools()
@@ -303,12 +283,6 @@ class MCPClient(BaseObject):
for tool in available_tools.tools:
tool_name = tool.name
# Apply tools filter if configured
if self._tools_filter and tool_name not in self._tools_filter:
logger.debug(f"Skipping tool '{tool_name}' - not in allowed tools list")
continue
logger.debug(f"Processing tool: {tool_name}")
logger.debug(f"Tool description: {tool.description}")

View File

@@ -133,7 +133,6 @@ class BaseOpenAILLMService(LLMService):
self._retry_timeout_secs = retry_timeout_secs
self._retry_on_timeout = retry_on_timeout
self.set_model_name(model)
self._full_model_name: str = ""
self._client = self.create_client(
api_key=api_key,
base_url=base_url,
@@ -186,22 +185,6 @@ class BaseOpenAILLMService(LLMService):
"""
return True
def set_full_model_name(self, full_model_name: str):
"""Set the full AI model name.
Args:
full_model_name: The full name of the AI model to use.
"""
self._full_model_name = full_model_name
def get_full_model_name(self):
"""Get the current full model name.
Returns:
The full name of the AI model being used.
"""
return self._full_model_name
async def get_chat_completions(
self, params_from_context: OpenAILLMInvocationParams
) -> AsyncStream[ChatCompletionChunk]:
@@ -377,9 +360,6 @@ class BaseOpenAILLMService(LLMService):
)
await self.start_llm_usage_metrics(tokens)
if chunk.model and self.get_full_model_name() != chunk.model:
self.set_full_model_name(chunk.model)
if chunk.choices is None or len(chunk.choices) == 0:
continue

View File

@@ -12,7 +12,7 @@ from typing import Awaitable, Callable, Optional
import websockets
from loguru import logger
from websockets.exceptions import ConnectionClosedError, ConnectionClosedOK
from websockets.exceptions import ConnectionClosedOK
from websockets.protocol import State
from pipecat.frames.frames import ErrorFrame
@@ -137,10 +137,6 @@ class WebsocketService(ABC):
# Normal closure, don't retry
logger.debug(f"{self} connection closed normally: {e}")
break
except ConnectionClosedError as e:
# Error closure, don't retry
logger.warning(f"{self} connection closed, but with an error: {e}")
break
except Exception as e:
message = f"{self} error receiving messages: {e}"
logger.error(message)

View File

@@ -12,7 +12,6 @@ comprehensive monitoring and cleanup capabilities.
"""
import asyncio
import traceback
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Coroutine, Dict, Optional, Sequence
@@ -163,9 +162,7 @@ class TaskManager(BaseTaskManager):
# Re-raise the exception to ensure the task is cancelled.
raise
except Exception as e:
tb = traceback.extract_tb(e.__traceback__)
last = tb[-1]
logger.error(f"{name} unexpected exception ({last.filename}:{last.lineno}): {e}")
logger.error(f"{name}: unexpected exception: {e}")
if not self._params:
raise Exception("TaskManager is not setup: unable to get event loop")
@@ -200,17 +197,9 @@ class TaskManager(BaseTaskManager):
# Here are sure the task is cancelled properly.
pass
except Exception as e:
tb = traceback.extract_tb(e.__traceback__)
last = tb[-1]
logger.error(
f"{name} unexpected exception while cancelling task ({last.filename}:{last.lineno}): {e}"
)
logger.error(f"{name}: unexpected exception while cancelling task: {e}")
except BaseException as e:
tb = traceback.extract_tb(e.__traceback__)
last = tb[-1]
logger.critical(
f"{name} fatal base exception while cancelling task ({last.filename}:{last.lineno}): {e}"
)
logger.critical(f"{name}: fatal base exception while cancelling task: {e}")
raise
def current_tasks(self) -> Sequence[asyncio.Task]:

View File

@@ -203,7 +203,7 @@ def parse_start_end_tags(
class TextPartForConcatenation:
"""Class representing a part of text for concatenation with concatenate_aggregated_text.
Parameters:
Attributes:
text: The text content.
includes_inter_part_spaces: Whether any necessary inter-frame
(leading/trailing) spaces are already included in the text.

View File

@@ -26,15 +26,15 @@ class MatchAction(Enum):
Parameters:
REMOVE: The text along with its delimiters will be removed from the streaming text.
Sentence aggregation will continue on as if this text did not exist.
Sentence aggregation will continue on as if this text did not exist.
KEEP: The delimiters will be removed, but the content between them will be kept.
Sentence aggregation will continue on with the internal text included.
Sentence aggregation will continue on with the internal text included.
AGGREGATE: The delimiters will be removed and the content between will be treated
as a separate aggregation. Any text before the start of the pattern will be
returned early, whether or not a complete sentence was found. Then the pattern
will be returned. Then the aggregation will continue on sentence matching after
the closing delimiter is found. The content between the delimiters is not
aggregated by sentence. It is aggregated as one single block of text.
as a separate aggregation. Any text before the start of the pattern will be
returned early, whether or not a complete sentence was found. Then the pattern
will be returned. Then the aggregation will continue on sentence matching after
the closing delimiter is found. The content between the delimiters is not
aggregated by sentence. It is aggregated as one single block of text.
"""
REMOVE = "remove"
@@ -133,15 +133,17 @@ class PatternPairAggregator(SimpleTextAggregator):
Args:
type: Identifier for this pattern pair. Should be unique and ideally descriptive.
(e.g., 'code', 'speaker', 'custom'). type can not be 'sentence' or 'word' as
those are reserved for the default behavior.
(e.g., 'code', 'speaker', 'custom'). type can not be 'sentence' or 'word' as
those are reserved for the default behavior.
start_pattern: Pattern that marks the beginning of content.
end_pattern: Pattern that marks the end of content.
action: What to do when a complete pattern is matched.
- MatchAction.REMOVE: Remove the matched pattern from the text.
- MatchAction.KEEP: Keep the matched pattern in the text and treat it as normal text. This allows you to register handlers for the pattern without affecting the aggregation logic.
- MatchAction.AGGREGATE: Return the matched pattern as a separate aggregation object.
action: What to do when a complete pattern is matched:
- MatchAction.REMOVE: Remove the matched pattern from the text.
- MatchAction.KEEP: Keep the matched pattern in the text and treat it as
normal text. This allows you to register handlers for
the pattern without affecting the aggregation logic.
- MatchAction.AGGREGATE: Return the matched pattern as a separate
aggregation object.
Returns:
Self for method chaining.

View File

@@ -483,9 +483,7 @@ def traced_llm(func: Optional[Callable] = None, *, name: Optional[str] = None) -
# Add all available attributes to the span
attribute_kwargs = {
"service_name": service_class_name,
"model": getattr(
self, getattr(self, "_full_model_name", "model_name"), "unknown"
),
"model": getattr(self, "model_name", "unknown"),
"stream": True, # Most LLM services use streaming
"parameters": params,
}

22
uv.lock generated
View File

@@ -4496,9 +4496,6 @@ google = [
{ name = "google-genai" },
{ name = "websockets" },
]
gradium = [
{ name = "websockets" },
]
groq = [
{ name = "groq" },
]
@@ -4658,7 +4655,6 @@ dev = [
{ name = "ruff" },
{ name = "setuptools" },
{ name = "setuptools-scm" },
{ name = "towncrier" },
]
docs = [
{ name = "sphinx", version = "8.1.3", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.11'" },
@@ -4734,7 +4730,6 @@ requires-dist = [
{ name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'fish'" },
{ name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'gladia'" },
{ name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'google'" },
{ name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'gradium'" },
{ name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'heygen'" },
{ name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'lmnt'" },
{ name = "pipecat-ai", extras = ["websockets-base"], marker = "extra == 'neuphonic'" },
@@ -4776,7 +4771,7 @@ requires-dist = [
{ name = "wait-for2", marker = "python_full_version < '3.12'", specifier = ">=0.4.1" },
{ name = "websockets", marker = "extra == 'websockets-base'", specifier = ">=13.1,<16.0" },
]
provides-extras = ["aic", "anthropic", "assemblyai", "asyncai", "aws", "aws-nova-sonic", "azure", "cartesia", "cerebras", "daily", "deepgram", "deepseek", "elevenlabs", "fal", "fireworks", "fish", "gladia", "google", "gradium", "grok", "groq", "gstreamer", "heygen", "hume", "inworld", "koala", "krisp", "langchain", "livekit", "lmnt", "local", "local-smart-turn", "local-smart-turn-v3", "mcp", "mem0", "mistral", "mlx-whisper", "moondream", "neuphonic", "noisereduce", "nvidia", "openai", "openpipe", "openrouter", "perplexity", "playht", "qwen", "remote-smart-turn", "rime", "riva", "runner", "sagemaker", "sambanova", "sarvam", "sentry", "silero", "simli", "soniox", "soundfile", "speechmatics", "strands", "tavus", "together", "tracing", "ultravox", "webrtc", "websocket", "websockets-base", "whisper"]
provides-extras = ["aic", "anthropic", "assemblyai", "asyncai", "aws", "aws-nova-sonic", "azure", "cartesia", "cerebras", "daily", "deepgram", "deepseek", "elevenlabs", "fal", "fireworks", "fish", "gladia", "google", "grok", "groq", "gstreamer", "heygen", "hume", "inworld", "koala", "krisp", "langchain", "livekit", "lmnt", "local", "local-smart-turn", "local-smart-turn-v3", "mcp", "mem0", "mistral", "mlx-whisper", "moondream", "neuphonic", "noisereduce", "nvidia", "openai", "openpipe", "openrouter", "perplexity", "playht", "qwen", "remote-smart-turn", "rime", "riva", "runner", "sagemaker", "sambanova", "sarvam", "sentry", "silero", "simli", "soniox", "soundfile", "speechmatics", "strands", "tavus", "together", "tracing", "ultravox", "webrtc", "websocket", "websockets-base", "whisper"]
[package.metadata.requires-dev]
dev = [
@@ -4793,7 +4788,6 @@ dev = [
{ name = "ruff", specifier = ">=0.12.11,<1" },
{ name = "setuptools", specifier = "~=78.1.1" },
{ name = "setuptools-scm", specifier = "~=8.3.1" },
{ name = "towncrier", specifier = "~=25.8.0" },
]
docs = [
{ name = "sphinx", specifier = ">=8.1.3" },
@@ -7253,20 +7247,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/c1/7b/30d423bdb2546250d719d7821aaf9058cc093d165565b245b159c788a9dd/torchvision-0.22.0-cp313-cp313t-win_amd64.whl", hash = "sha256:e5d680162694fac4c8a374954e261ddfb4eb0ce103287b0f693e4e9c579ef957", size = 1638621, upload-time = "2025-04-23T14:41:46.06Z" },
]
[[package]]
name = "towncrier"
version = "25.8.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "click" },
{ name = "jinja2" },
{ name = "tomli", marker = "python_full_version < '3.11'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/c2/eb/5bf25a34123698d3bbab39c5bc5375f8f8bcbcc5a136964ade66935b8b9d/towncrier-25.8.0.tar.gz", hash = "sha256:eef16d29f831ad57abb3ae32a0565739866219f1ebfbdd297d32894eb9940eb1", size = 76322, upload-time = "2025-08-30T11:41:55.393Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/42/06/8ba22ec32c74ac1be3baa26116e3c28bc0e76a5387476921d20b6fdade11/towncrier-25.8.0-py3-none-any.whl", hash = "sha256:b953d133d98f9aeae9084b56a3563fd2519dfc6ec33f61c9cd2c61ff243fb513", size = 65101, upload-time = "2025-08-30T11:41:53.644Z" },
]
[[package]]
name = "tqdm"
version = "4.67.1"