Compare commits

...

76 Commits

Author SHA1 Message Date
kompfner
c861beb066 Fix await usage in transcription timeout task 2026-01-23 11:15:16 -05:00
Aleix Conchillo Flaqué
8951442b8e Merge pull request #3534 from pipecat-ai/aleix/claude-skills-pr-description
claude: add pr-description skill
2026-01-22 17:34:46 -08:00
Aleix Conchillo Flaqué
7e6e3031e7 claude: add pr-description skill 2026-01-22 13:41:50 -08:00
Aleix Conchillo Flaqué
308829f92b Merge pull request #3533 from pipecat-ai/aleix/claude-skills-docstring
claude: add docstring skill
2026-01-22 12:58:38 -08:00
Aleix Conchillo Flaqué
82a799e63e claude: add docstring skill 2026-01-22 12:53:38 -08:00
Cale Shapera
6b5bcae86f change default Inworld TTS model to inworld-tts-1.5-max (#3531) 2026-01-22 14:21:15 -05:00
Mark Backman
836073849c Merge pull request #3527 from weakcamel/patch-1
Update README.md - fix Google Imagen URL
2026-01-22 10:46:10 -05:00
Waldek Maleska
b13b65d6e2 Update README.md - fix Google Imagen URL 2026-01-22 15:17:41 +00:00
Mark Backman
3d545b718d Merge pull request #3344 from omChauhanDev/fix/stt-dynamic-language-update
fix: treat language as first-class STT setting
2026-01-22 09:21:56 -05:00
marcus-daily
f2fa5d9733 Updating changelog 2026-01-22 14:17:59 +00:00
marcus-daily
76b774072c Formatting fixes 2026-01-22 14:17:59 +00:00
marcus-daily
b6341ffaa5 Save Smart Turn input data if SMART_TURN_LOG_DATA is set 2026-01-22 14:17:59 +00:00
Mark Backman
29fae67c9e Merge pull request #3523 from omChauhanDev/add-location-support-google-tts
feat(google): add location parameter to TTS services
2026-01-22 09:12:16 -05:00
Mark Backman
718ea1c15e Merge pull request #3526 from pipecat-ai/mb/remove-logs
Remove application logs
2026-01-22 08:48:07 -05:00
Mark Backman
8e09d94614 Remove application logs 2026-01-22 08:28:52 -05:00
Aleix Conchillo Flaqué
de73e28563 Merge pull request #3510 from omChauhanDev/feat/add-reached-filter-methods
feat(task): add additive filter methods for frame monitoring
2026-01-21 21:05:33 -08:00
Aleix Conchillo Flaqué
55250b4f7e Merge pull request #3521 from pipecat-ai/aleix/claude-changelog-skill
claude: initial /changelog skill
2026-01-21 20:50:47 -08:00
Om Chauhan
281145a991 added changelog 2026-01-22 09:55:57 +05:30
Om Chauhan
7bd32e2fe5 feat(google): add location parameter to TTS services 2026-01-22 09:49:19 +05:30
James Hush
8f05d95f50 feat: add video_out_codec parameter for DailyTransport (#3520)
* feat: add video_out_codec parameter for DailyTransport

Add video_out_codec parameter to TransportParams allowing configuration
of the preferred video codec (VP8, H264, H265) for video output.

When set, this passes the preferredCodec option to Daily's
VideoPublishingSettings during the join operation.

* chore: move video_out_codec parameter to changelog folder (#3522)

* Initial plan

* Move video_out_codec parameter to changelog/3520.added.md

Co-authored-by: jamsea <614910+jamsea@users.noreply.github.com>

* Revert all CHANGELOG.md changes, keep only changelog/3520.added.md

Co-authored-by: jamsea <614910+jamsea@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: jamsea <614910+jamsea@users.noreply.github.com>

---------

Co-authored-by: Copilot <198982749+Copilot@users.noreply.github.com>
Co-authored-by: jamsea <614910+jamsea@users.noreply.github.com>
2026-01-22 11:31:07 +08:00
Om Chauhan
87c12f3098 changed frame filter storage type from tuples to sets 2026-01-22 08:43:46 +05:30
Om Chauhan
9c0bf89247 added changelog 2026-01-22 08:43:46 +05:30
Om Chauhan
6e44a2ab49 feat(task): add additive filter methods for frame monitoring 2026-01-22 08:43:46 +05:30
Aleix Conchillo Flaqué
7aa7b86aed claude: initial /changelog skill 2026-01-21 18:43:04 -08:00
Aleix Conchillo Flaqué
5ad9faeb4c Merge pull request #3519 from pipecat-ai/aleix/embedded-rtvi-processor
automatically add RTVI to the pipeline
2026-01-21 18:17:26 -08:00
Aleix Conchillo Flaqué
9e8f8b45c6 added changelog files for #3519 2026-01-21 18:14:17 -08:00
Aleix Conchillo Flaqué
0ee11ad333 tests: disable RTVI in tests by default 2026-01-21 18:14:17 -08:00
Aleix Conchillo Flaqué
124a3c35af RTVIObserver: don't handle some frames direction 2026-01-21 18:14:17 -08:00
Aleix Conchillo Flaqué
054e504868 examples(foundational): remove RTVI (automatically added by PipelineTask) 2026-01-21 18:14:17 -08:00
Aleix Conchillo Flaqué
e85a00cc0e PipelineTask: automatically add RTVI processor and RTVI observer
If `enable_rtvi` is enabled (enabled by default) and RTVI processor will be
added automatically to the pipeline. Also, and RTVI observer will be
registered.
2026-01-21 18:14:17 -08:00
Aleix Conchillo Flaqué
cc61cdbba3 RTVIProcessor: add create_rtvi_observer() 2026-01-21 18:14:17 -08:00
Aleix Conchillo Flaqué
62f4708d43 transports: broadcast InputTransportMessageFrame frames 2026-01-21 18:14:17 -08:00
Aleix Conchillo Flaqué
ba0ddb1832 FrameProcessor: copy kwargs when broadcasting frame 2026-01-21 18:14:17 -08:00
Aleix Conchillo Flaqué
eacd2a4b71 FrameProcessor: add broadcast_frame_instance() 2026-01-21 18:14:17 -08:00
Mark Backman
7ed110650d Merge pull request #3516 from okue/minorpatch1
refactor(user_mute): remove unnecessary _bot_speaking assignment in _handle_bot_stopped_speaking
2026-01-21 10:33:59 -05:00
okue
4a724379fc refactor(user_mute): remove unnecessary _bot_speaking assignment in _handle_bot_stopped_speaking
The _bot_speaking flag does not need to be set in this method,
so the redundant assignment has been removed.
2026-01-21 23:59:15 +09:00
Aleix Conchillo Flaqué
768d3958dd Merge pull request #3512 from pipecat-ai/changelog-0.0.100
Release 0.0.100 - Changelog Update
2026-01-20 19:32:56 -08:00
aconchillo
5f9ff8bd58 Update changelog for version 0.0.100 2026-01-20 19:21:19 -08:00
Aleix Conchillo Flaqué
59ed422052 Merge pull request #3511 from pipecat-ai/aleix/camb-tts-client-on-start
CambTTSService: initialize client during StartFrame
2026-01-20 19:17:45 -08:00
Aleix Conchillo Flaqué
7e0ca113af CambTTSService: initialize client during StartFrame 2026-01-20 19:07:12 -08:00
Aleix Conchillo Flaqué
13c52e0e6d Merge pull request #3509 from pipecat-ai/aleix/nvidia-stt-tts-improvements
NVIDIA STT/TTS performance improvements
2026-01-20 16:39:12 -08:00
Aleix Conchillo Flaqué
a787fd9cd8 NVIDIATTSService: process incoming audio frame right away
Process audio as soon as we receive it from the generator. Previously, we were
reading from the generator and adding elements into a queue until there was no
more data, then we would process the queue.
2026-01-20 15:41:05 -08:00
Aleix Conchillo Flaqué
14495c425a NVIDIASTTService: no need for additional queue and task 2026-01-20 13:50:17 -08:00
Aleix Conchillo Flaqué
461bd0a2e0 update changelog for #3494 and #3499 2026-01-20 13:26:40 -08:00
Aleix Conchillo Flaqué
bd45ce2b4e Merge pull request #3499 from lukepayyapilli/fix/livekit-video-queue-memory-leak
fix(livekit): prevent memory leak when video_in_enabled is False
2026-01-20 13:21:21 -08:00
Aleix Conchillo Flaqué
a266644b06 Merge pull request #3494 from omChauhanDev/fix/uninterruptible-frame-handling
fix: preserve UninterruptibleFrames in __reset_process_queue
2026-01-20 13:19:40 -08:00
Mark Backman
03faadd7f9 Merge pull request #3508 from pipecat-ai/ss/log-daily-ids
Log Daily participant and meeting session IDs upon successful join in…
2026-01-20 15:43:48 -05:00
Aleix Conchillo Flaqué
bf43032652 Merge pull request #3504 from pipecat-ai/aleix/nvidia-stt-tts-error-handling
NVIDIA STT/TTS error handling
2026-01-20 09:41:08 -08:00
Sunah Suh
fa6f924b31 Log Daily participant and meeting session IDs upon successful join in Daily Transport 2026-01-20 11:31:17 -06:00
Aleix Conchillo Flaqué
a010a020fd add changelog fo 3504 2026-01-20 09:03:30 -08:00
Aleix Conchillo Flaqué
655006aff5 NvidiaSegmentedSTTService: simplify exception handling 2026-01-20 08:58:14 -08:00
Aleix Conchillo Flaqué
671dc8cd9b NvidiaSTTService: initialize client on StartFrame
Initialize client on StartFrame so errrors are reported within the pipeline.
2026-01-20 08:58:14 -08:00
Aleix Conchillo Flaqué
9a718ded1e NvidiaTTSService: initialize client on StartFrame
Initialize client on StartFrame so errrors are reported within the pipeline.
2026-01-20 08:58:14 -08:00
Aleix Conchillo Flaqué
024809b39a Merge pull request #3503 from pipecat-ai/aleix/ai-service-start-end-cancel
AIService: handle StartFrame/EndFrame/CancelFrame exceptions
2026-01-20 08:56:39 -08:00
Aleix Conchillo Flaqué
6cf0d53d00 AIService: handle StartFrame/EndFrame/CancelFrame exceptions
If AIService subclasses implement start()/stop()/cancel() and exception are not
handled, execution will not continue and therefore the originator frames will
not be pushed. This would cause the pipeline to not be started (i.e. StartFrame
would not be pushed downstream) or stopped properly.
2026-01-20 08:54:22 -08:00
kompfner
778dacc9a8 Merge pull request #3486 from pipecat-ai/pk/fix-nova-sonic-reset-conversation
Fix `AWSNovaSonicLLMService.reset_conversation()`
2026-01-20 10:07:38 -05:00
Paul Kompfner
06b3ecd2d6 In AWS Nova Sonic service, send the "interactive" user message (which triggers the bot response) only after sending the audio input start event, per the AWS team's recommendation 2026-01-20 09:56:25 -05:00
Paul Kompfner
b4d143e39b Add CHANGELOG for fixing AWSNovaSonicLLMService.reset_conversation() 2026-01-20 09:56:25 -05:00
Paul Kompfner
c89083e72e Improve 20e example to ask the bot to give a recap when loading a previous conversation from disk 2026-01-20 09:56:25 -05:00
Luke Payyapilli
1ac811ab32 chore: revert unrelated uv.lock changes 2026-01-20 09:19:43 -05:00
Luke Payyapilli
f6359d460e chore: install livekit as optional extra in CI instead of dev dep 2026-01-20 09:16:16 -05:00
Aleix Conchillo Flaqué
f03a7175c7 Merge pull request #3501 from pipecat-ai/aleix/improve-eval-numerical-word-prompt
scripts(eval): give examples to numerical word answers
2026-01-19 20:22:06 -08:00
Aleix Conchillo Flaqué
aed44c863a scripts(eval): give examples to numerical word answers
Some models need extra help.
2026-01-19 14:37:00 -08:00
Mark Backman
cddd6d5b0a Merge pull request #3492 from pipecat-ai/mb/remove-unused-imports
Remove unused imports
2026-01-19 14:07:16 -05:00
Mark Backman
11cf891ac8 Manual updates for unused imports 2026-01-19 14:03:22 -05:00
Luke Payyapilli
c89ae717fe style: fix ruff formatting 2026-01-19 11:13:41 -05:00
Luke Payyapilli
562bdd3084 test: add livekit to dev deps and improve test clarity 2026-01-19 11:11:54 -05:00
Mark Backman
cc4c3650e1 Merge pull request #3491 from pipecat-ai/mb/update-release-evals
Add Camb TTS to release evals
2026-01-19 11:04:05 -05:00
Luke Payyapilli
dfc1f09b77 fix(livekit): prevent memory leak when video_in_enabled is False 2026-01-19 11:00:23 -05:00
Filipi da Silva Fuchter
5fc46cc450 Merge pull request #3493 from omChauhanDev/fix/globally-unique-pc-id
fix: make SmallWebRTCConnection pc_id globally unique
2026-01-19 09:04:48 -05:00
Om Chauhan
4a9eb82f92 fix: preserve UninterruptibleFrames in __reset_process_queue 2026-01-18 20:39:13 +05:30
Om Chauhan
990d8386e4 fix: make SmallWebRTCConnection pc_id globally unique 2026-01-18 19:41:51 +05:30
Mark Backman
ce7d823770 Remove unused imports 2026-01-18 08:22:22 -05:00
Mark Backman
0b93c3f900 Add Camb TTS to release evals 2026-01-17 16:27:16 -05:00
Paul Kompfner
6fa797c8e4 Fix AWS Nova Sonic reset_conversation(), which would previously error out.
Issues:
- After disconnecting, we were prematurely sending audio messages using the new prompt and content names, before the new prompt and content were created
- We weren't properly sending system instruction and conversation history messages to Nova Sonic with `"interactive": false`
2026-01-16 22:31:54 -05:00
Om Chauhan
1ceb01665f fix: treat language as first-class STT setting 2026-01-04 11:04:30 +05:30
124 changed files with 1492 additions and 345 deletions

View File

@@ -0,0 +1,40 @@
---
name: changelog
description: Create changelog files for important commits in a PR
---
Create changelog files for the important commits in this PR. The PR number is provided as an argument.
## Instructions
1. First, check what commits are on the current branch compared to main:
```
git log main..HEAD --oneline
```
2. For each significant change, create a changelog file in the `changelog/` folder using the format:
- `{PR_NUMBER}.added.md` - for new features
- `{PR_NUMBER}.added.2.md`, `{PR_NUMBER}.added.3.md` - for additional new features
- `{PR_NUMBER}.changed.md` - for changes to existing functionality
- `{PR_NUMBER}.fixed.md` - for bug fixes
- `{PR_NUMBER}.deprecated.md` - for deprecations
3. Each changelog file should at least contain a main single line starting with `- ` followed by a clear description of the change.
4. If the change is complicated, changelog files can have indented lines after the main line with additional details or code samples.
5. Use ⚠️ emoji prefix for breaking changes.
## Example
For PR #3519 with a new feature and a bug fix:
`changelog/3519.added.md`:
```
- Added `SomeNewFeature` for doing something useful.
```
`changelog/3519.fixed.md`:
```
- Fixed an issue where something was not working correctly.
```

View File

@@ -0,0 +1,257 @@
---
name: docstring
description: Document a Python module and its classes using Google style
---
Document a Python module and its classes using Google-style docstrings following project conventions. The class name is provided as an argument.
## Instructions
1. First, find the class in the codebase:
```
Search for "class ClassName" in src/pipecat/
```
2. If multiple files contain that class name:
- List all matches with their file paths
- Ask the user which one they want to document
- Wait for confirmation before proceeding
3. Once the file is identified, read the module to understand its structure:
- Identify all classes, functions, and important type aliases
- Understand the purpose of each component
4. Apply documentation in this order:
- Module docstring (at top, after imports)
- Class docstrings
- `__init__` methods (always document constructor parameters)
- Public methods (not starting with `_`)
- Dataclass/config classes with field descriptions
5. Skip documentation for:
- Private methods (starting with `_`)
- Simple dunder methods (`__str__`, `__repr__`, `__post_init__`)
- Very simple pass-through properties
- **Already documented code** - If a class, method, or function already has a complete docstring that follows the project style, do not modify it. A docstring is complete if it has:
- A one-line summary
- Args section (if it has parameters)
- Returns section (if it returns something meaningful)
- Only add or improve documentation where it is missing or incomplete
## Module Docstring Format
```python
"""[One-line description of module purpose].
[Optional: Longer explanation of functionality, key classes, or use cases.]
"""
```
Example:
```python
"""Neuphonic text-to-speech service implementations.
This module provides WebSocket and HTTP-based integrations with Neuphonic's
text-to-speech API for real-time audio synthesis.
"""
```
## Class Docstring Format
```python
class ClassName:
"""One-line summary describing what the class does.
[Longer description explaining purpose, behavior, and key features.
Use action-oriented language.]
[Optional: Event handlers, usage notes, or important caveats.]
"""
```
Example:
```python
class FrameProcessor(BaseObject):
"""Base class for all frame processors in the pipeline.
Frame processors are the building blocks of Pipecat pipelines, they can be
linked to form complex processing pipelines. They receive frames, process
them, and pass them to the next or previous processor in the chain.
Event handlers available:
- on_before_process_frame: Called before a frame is processed
- on_after_process_frame: Called after a frame is processed
Example::
@processor.event_handler("on_before_process_frame")
async def on_before_process_frame(processor, frame):
...
@processor.event_handler("on_after_process_frame")
async def on_after_process_frame(processor, frame):
...
"""
```
Note: When listing event handlers, do NOT use backticks. Include an `Example::` section (with double colon for Sphinx) showing the decorator pattern and function signature for each event.
## Constructor (`__init__`) Format
```python
def __init__(self, *, param1: Type, param2: Type = default, **kwargs):
"""Initialize the [ClassName].
Args:
param1: Description of param1 and its purpose.
param2: Description of param2. Defaults to [default].
**kwargs: Additional arguments passed to parent class.
"""
```
Example:
```python
def __init__(
self,
*,
api_key: str,
voice_id: Optional[str] = None,
sample_rate: Optional[int] = 22050,
**kwargs,
):
"""Initialize the Neuphonic TTS service.
Args:
api_key: Neuphonic API key for authentication.
voice_id: ID of the voice to use for synthesis.
sample_rate: Audio sample rate in Hz. Defaults to 22050.
**kwargs: Additional arguments passed to parent InterruptibleTTSService.
"""
```
## Method Docstring Format
```python
async def method_name(self, param1: Type) -> ReturnType:
"""One-line summary of what method does.
[Longer description if behavior isn't obvious.]
Args:
param1: Description of param1.
Returns:
Description of return value.
Raises:
ExceptionType: When this exception is raised.
"""
```
Example:
```python
async def put(self, item: Tuple[Frame, FrameDirection, FrameCallback]):
"""Put an item into the priority queue.
System frames (`SystemFrame`) have higher priority than any other
frames. If a non-frame item is provided it will have the highest priority.
Args:
item: The item to enqueue.
"""
```
## Dataclass/Config Format
```python
@dataclass
class ConfigName:
"""One-line description of configuration.
[Explanation of when/how to use this config.]
Parameters:
field1: Description of field1.
field2: Description of field2. Defaults to [default].
"""
field1: Type
field2: Type = default_value
```
Example:
```python
@dataclass
class FrameProcessorSetup:
"""Configuration parameters for frame processor initialization.
Parameters:
clock: The clock instance for timing operations.
task_manager: The task manager for handling async operations.
observer: Optional observer for monitoring frame processing events.
"""
clock: BaseClock
task_manager: BaseTaskManager
observer: Optional[BaseObserver] = None
```
## Enum Documentation Format
```python
class EnumName(Enum):
"""One-line description of the enum purpose.
[Longer description of how the enum is used.]
Parameters:
VALUE1: Description of VALUE1.
VALUE2: Description of VALUE2.
"""
VALUE1 = 1
VALUE2 = 2
```
## Writing Style Guidelines
- **Concise and professional** - No casual language or filler words
- **Action-oriented** - Start with verbs: "Processes...", "Manages...", "Converts..."
- **Purpose before implementation** - Explain WHY before HOW
- **Clear parameter descriptions** - Include type hints, defaults, and purpose
- **No redundant type info** - Type hints are in the signature, don't repeat in description
- **Use backticks for code references** - Wrap class names, method names, event names, parameter names, and code snippets in backticks
Good: "Neuphonic API key for authentication."
Bad: "str: The API key (string) that is used for authenticating with Neuphonic."
Good: "Triggers `on_speech_started` when the `VADAnalyzer` detects speech."
Bad: "Triggers on_speech_started when the VADAnalyzer detects speech."
## Deprecation Notice Format
When documenting deprecated code:
```python
"""[Description].
.. deprecated:: X.X.X
`ClassName` is deprecated and will be removed in a future version.
Use `NewClassName` instead.
"""
```
## Checklist
Before finishing, verify:
- [ ] Module has a docstring at the top (after copyright header and imports)
- [ ] All public classes have docstrings
- [ ] All `__init__` methods document their parameters
- [ ] All public methods have docstrings with Args/Returns/Raises as needed
- [ ] Dataclasses use "Parameters:" section for field descriptions
- [ ] Enums document each value in "Parameters:" section
- [ ] Writing is concise and action-oriented
- [ ] No documentation added to private methods (starting with `_`)
- [ ] Existing complete docstrings were left unchanged

View File

@@ -0,0 +1,128 @@
---
name: pr-description
description: Update a GitHub PR description with a summary of changes
---
Update a GitHub pull request description based on the changes in the PR.
## Arguments
```
/pr-description <PR_NUMBER> [--fixes <ISSUE_NUMBERS>]
```
- `PR_NUMBER` (required): The pull request number to update
- `--fixes` (optional): Comma-separated issue numbers that this PR fixes (e.g., `--fixes 123,456`)
Examples:
- `/pr-description 3534`
- `/pr-description 3534 --fixes 123`
- `/pr-description 3534 --fixes 123,456,789`
## Instructions
1. First, gather information about the PR:
- Use GitHub plugin to get PR details (title, current description, base branch)
- Use local git to get commits: `git log main..HEAD --oneline`
- Use local git to get the diff: `git diff main..HEAD`
- Parse any `--fixes` argument for issue numbers
2. Check the existing PR description:
- If it already has a complete, accurate description that reflects the changes, do nothing
- If it's missing sections, incomplete, or outdated compared to the actual changes, proceed to update
- If it only has the template placeholder text, generate a full description
3. Analyze the changes:
- Understand the purpose of each commit
- Identify any breaking changes (API changes, removed features, behavior changes)
- Look for new features, bug fixes, refactoring, or documentation changes
- Collect issue numbers from:
- The `--fixes` argument (if provided)
- Commit messages (patterns like "Fixes #123", "Closes #456", "Resolves #789")
4. Generate or update the PR description with these sections:
## PR Description Format
### Summary (always include)
Brief bullet points describing what changed and why. Focus on the *purpose* and *impact*, not implementation details.
```markdown
## Summary
- Added X to enable Y
- Fixed bug where Z would happen
- Refactored W for better maintainability
```
### Breaking Changes (include only if applicable)
Document any changes that affect existing users or APIs.
```markdown
## Breaking Changes
- `ClassName.method()` now requires a `param` argument
- Removed deprecated `old_function()` - use `new_function()` instead
```
### Testing (include when non-obvious)
How to verify the changes work. Skip for trivial changes.
```markdown
## Testing
- Run `uv run pytest tests/test_feature.py` to verify the fix
- Example usage: `uv run examples/new_feature.py`
```
### Fixes (include if issues are provided or found in commits)
List issues this PR fixes. GitHub will automatically close these issues when the PR is merged.
```markdown
## Fixes
- Fixes #123
- Fixes #456
```
Note: Use "Fixes #X" format (not "Closes" or "Resolves") for consistency. Each issue should be on its own line with "Fixes" to ensure GitHub auto-closes them.
## Guidelines
- **Be concise** - Reviewers should understand the PR in 30 seconds
- **Focus on why** - The diff shows *what* changed, explain *why*
- **Skip empty sections** - Only include sections that have content
- **Use bullet points** - Easier to scan than paragraphs
- **Don't duplicate the diff** - Avoid listing every file or line changed
## Example Output
```markdown
## Summary
- Added `/docstring` skill for documenting Python modules with Google-style docstrings
- Skill finds classes by name and handles conflicts when multiple matches exist
- Skips already-documented code to avoid unnecessary changes
## Testing
/docstring ClassName
## Fixes
- Fixes #123
```
## Checklist
Before updating the PR:
- [ ] Verified existing description needs updating (not already complete)
- [ ] Summary accurately reflects the changes
- [ ] Breaking changes are clearly documented (if any)
- [ ] No unnecessary sections included
- [ ] Description is concise and scannable

View File

@@ -33,7 +33,7 @@ jobs:
- name: Install dependencies
run: |
uv sync --group dev --extra anthropic --extra aws --extra google --extra langchain --extra websocket
uv sync --group dev --extra anthropic --extra aws --extra google --extra langchain --extra livekit --extra websocket
- name: Run tests with coverage
run: |

View File

@@ -37,7 +37,7 @@ jobs:
- name: Install dependencies
run: |
uv sync --group dev --extra anthropic --extra aws --extra google --extra langchain --extra websocket
uv sync --group dev --extra anthropic --extra aws --extra google --extra langchain --extra livekit --extra websocket
- name: Test with pytest
run: |

11
.gitignore vendored
View File

@@ -4,7 +4,14 @@ __pycache__/
*~
venv
.venv
/.idea
.idea
.gradle
.next
next-env.d.ts
local.properties
*.log
*.lock
smart_turn_audio_log
#*#
# Distribution / Packaging
@@ -27,7 +34,7 @@ share/python-wheels/
*.egg
MANIFEST
.DS_Store
.env
.env*
fly.toml
# Examples

View File

@@ -7,6 +7,129 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
<!-- towncrier release notes start -->
## [0.0.100] - 2026-01-20
### Added
- Added Hathora service to support Hathora-hosted TTS and STT models (only
non-streaming)
(PR [#3169](https://github.com/pipecat-ai/pipecat/pull/3169))
- Added `CambTTSService`, using Camb.ai's TTS integration with MARS models
(mars-flash, mars-pro, mars-instruct) for high-quality text-to-speech
synthesis.
(PR [#3349](https://github.com/pipecat-ai/pipecat/pull/3349))
- Added the `additional_headers` param to `WebsocketClientParams`, allowing
`WebsocketClientTransport` to send custom headers on connect, for cases such
as authentication.
(PR [#3461](https://github.com/pipecat-ai/pipecat/pull/3461))
- Added `UserIdleController` for detecting user idle state, integrated into
`LLMUserAggregator` and `UserTurnProcessor` via optional `user_idle_timeout`
parameter. Emits `on_user_turn_idle` event for application-level handling.
Deprecated `UserIdleProcessor` in favor of the new compositional approach.
(PR [#3482](https://github.com/pipecat-ai/pipecat/pull/3482))
- Added `on_user_mute_started` and `on_user_mute_stopped` event handlers to
`LLMUserAggregator` for tracking user mute state changes.
(PR [#3490](https://github.com/pipecat-ai/pipecat/pull/3490))
### Changed
- Enhanced interruption handling in `AsyncAITTSService` by supporting
multi-context WebSocket sessions for more robust context management.
(PR [#3287](https://github.com/pipecat-ai/pipecat/pull/3287))
- Throttle `UserSpeakingFrame` to broadcast at most every 200ms instead of on
every audio chunk, reducing frame processing overhead during user speech.
(PR [#3483](https://github.com/pipecat-ai/pipecat/pull/3483))
### Deprecated
- For consistency with other package names, we just deprecated
`pipecat.turns.mute` (introduced in Pipecat 0.0.99) in favor of
`pipecat.turns.user_mute`.
(PR [#3479](https://github.com/pipecat-ai/pipecat/pull/3479))
### Fixed
- Corrected TTFB metric calculation in `AsyncAIHttpTTSService`.
(PR [#3287](https://github.com/pipecat-ai/pipecat/pull/3287))
- Fixed an issue where the "bot-llm-text" RTVI event would not fire for
realtime (speech-to-speech) services:
- `AWSNovaSonicLLMService`
- `GeminiLiveLLMService`
- `OpenAIRealtimeLLMService`
- `GrokRealtimeLLMService`
The issue was that these services weren't pushing `LLMTextFrame`s. Now
they do.
(PR [#3446](https://github.com/pipecat-ai/pipecat/pull/3446))
- Fixed an issue where `on_user_turn_stop_timeout` could fire while a user is
talking when using `ExternalUserTurnStrategies`.
(PR [#3454](https://github.com/pipecat-ai/pipecat/pull/3454))
- Fixed an issue where user turn start strategies were not being reset after a
user turn started, causing incorrect strategy behavior.
(PR [#3455](https://github.com/pipecat-ai/pipecat/pull/3455))
- Fixed `MinWordsUserTurnStartStrategy` to not aggregate transcriptions,
preventing incorrect turn starts when words are spoken with pauses between
them.
(PR [#3462](https://github.com/pipecat-ai/pipecat/pull/3462))
- Fixed an issue where Grok Realtime would error out when running with
SmallWebRTC transport.
(PR [#3480](https://github.com/pipecat-ai/pipecat/pull/3480))
- Fixed a `Mem0MemoryService` issue where passing `async_mode: true` was
causing an error. See
https://docs.mem0.ai/platform/features/async-mode-default-change.
(PR [#3484](https://github.com/pipecat-ai/pipecat/pull/3484))
- Fixed `AWSNovaSonicLLMService.reset_conversation()`, which would previously
error out. Now it successfully reconnects and "rehydrates" from the context
object.
(PR [#3486](https://github.com/pipecat-ai/pipecat/pull/3486))
- Fixed `AzureTTSService` transcript formatting issues:
- Punctuation now appears without extra spaces (e.g., "Hello!" instead of
"Hello !")
- CJK languages (Chinese, Japanese, Korean) no longer have unwanted spaces
between characters
(PR [#3489](https://github.com/pipecat-ai/pipecat/pull/3489))
- Fixed an issue where `UninterruptibleFrame` frames would not be preserved in
some cases.
(PR [#3494](https://github.com/pipecat-ai/pipecat/pull/3494))
- Fixed memory leak in `LiveKitTransport` when `video_in_enabled` is `False`.
(PR [#3499](https://github.com/pipecat-ai/pipecat/pull/3499))
- Fixed an issue in `AIService` where unhandled exceptions in `start()`,
`stop()`, or `cancel()` implementations would prevent `process_frame()` to
continue and therefore `StartFrame`, `EndFrame`, or `CancelFrame` from being
pushed downstream, causing the pipeline to not start or stop properly.
(PR [#3503](https://github.com/pipecat-ai/pipecat/pull/3503))
- Moved `NVIDIATTSService` and `NVIDIASTTService` client initialization from
constructor to `start()` for better error handling.
(PR [#3504](https://github.com/pipecat-ai/pipecat/pull/3504))
- Optimized `NVIDIATTSService` to process incoming audio frames immediately.
(PR [#3509](https://github.com/pipecat-ai/pipecat/pull/3509))
- Optimized `NVIDIASTTService` by removing unnecessary queue and task.
(PR [#3509](https://github.com/pipecat-ai/pipecat/pull/3509))
- Fixed a `CambTTSService` issue where client was being initialized in the
constructor which wouldn't allow for proper Pipeline error handling.
(PR [#3511](https://github.com/pipecat-ai/pipecat/pull/3511))
## [0.0.99] - 2026-01-13
### Added

View File

@@ -81,7 +81,7 @@ Catch new features, interviews, and how-tos on our [Pipecat TV](https://www.yout
| Serializers | [Exotel](https://docs.pipecat.ai/server/utilities/serializers/exotel), [Plivo](https://docs.pipecat.ai/server/utilities/serializers/plivo), [Twilio](https://docs.pipecat.ai/server/utilities/serializers/twilio), [Telnyx](https://docs.pipecat.ai/server/utilities/serializers/telnyx), [Vonage](https://docs.pipecat.ai/server/utilities/serializers/vonage) |
| Video | [HeyGen](https://docs.pipecat.ai/server/services/video/heygen), [Tavus](https://docs.pipecat.ai/server/services/video/tavus), [Simli](https://docs.pipecat.ai/server/services/video/simli) |
| Memory | [mem0](https://docs.pipecat.ai/server/services/memory/mem0) |
| Vision & Image | [fal](https://docs.pipecat.ai/server/services/image-generation/fal), [Google Imagen](https://docs.pipecat.ai/server/services/image-generation/fal), [Moondream](https://docs.pipecat.ai/server/services/vision/moondream) |
| Vision & Image | [fal](https://docs.pipecat.ai/server/services/image-generation/fal), [Google Imagen](https://docs.pipecat.ai/server/services/image-generation/google-imagen), [Moondream](https://docs.pipecat.ai/server/services/vision/moondream) |
| Audio Processing | [Silero VAD](https://docs.pipecat.ai/server/utilities/audio/silero-vad-analyzer), [Krisp](https://docs.pipecat.ai/server/utilities/audio/krisp-filter), [Koala](https://docs.pipecat.ai/server/utilities/audio/koala-filter), [ai-coustics](https://docs.pipecat.ai/server/utilities/audio/aic-filter) |
| Analytics & Metrics | [OpenTelemetry](https://docs.pipecat.ai/server/utilities/opentelemetry), [Sentry](https://docs.pipecat.ai/server/services/analytics/sentry) |

View File

@@ -1 +0,0 @@
- Added Hathora service to support Hathora-hosted TTS and STT models (only non-streaming)

View File

@@ -1 +0,0 @@
- Enhanced interruption handling in `AsyncAITTSService` by supporting multi-context WebSocket sessions for more robust context management.

View File

@@ -1 +0,0 @@
- Corrected TTFB metric calculation in `AsyncAIHttpTTSService`.

View File

@@ -1 +0,0 @@
- Added `CambTTSService`, using Camb.ai's TTS integration with MARS models (mars-flash, mars-pro, mars-instruct) for high-quality text-to-speech synthesis.

View File

@@ -1,8 +0,0 @@
- Fixed an issue where the "bot-llm-text" RTVI event would not fire for realtime (speech-to-speech) services:
- `AWSNovaSonicLLMService`
- `GeminiLiveLLMService`
- `OpenAIRealtimeLLMService`
- `GrokRealtimeLLMService`
The issue was that these services weren't pushing `LLMTextFrame`s. Now they do.

View File

@@ -1 +0,0 @@
- Fixed an issue where `on_user_turn_stop_timeout` could fire while a user is talking when using `ExternalUserTurnStrategies`.

View File

@@ -1 +0,0 @@
- Fixed an issue where user turn start strategies were not being reset after a user turn started, causing incorrect strategy behavior.

View File

@@ -1 +0,0 @@
- Added the `additional_headers` param to `WebsocketClientParams`, allowing `WebsocketClientTransport` to send custom headers on connect, for cases such as authentication.

View File

@@ -1 +0,0 @@
- Fixed `MinWordsUserTurnStartStrategy` to not aggregate transcriptions, preventing incorrect turn starts when words are spoken with pauses between them.

View File

@@ -1 +0,0 @@
- For consistency with other package names, we just deprecated `pipecat.turns.mute` (introduced in Pipecat 0.0.99) in favor of `pipecat.turns.user_mute`.

View File

@@ -1 +0,0 @@
- Fixed an issue where Grok Realtime would error out when running with SmallWebRTC transport.

View File

@@ -1 +0,0 @@
- Added `UserIdleController` for detecting user idle state, integrated into `LLMUserAggregator` and `UserTurnProcessor` via optional `user_idle_timeout` parameter. Emits `on_user_turn_idle` event for application-level handling. Deprecated `UserIdleProcessor` in favor of the new compositional approach.

View File

@@ -1 +0,0 @@
- Throttle `UserSpeakingFrame` to broadcast at most every 200ms instead of on every audio chunk, reducing frame processing overhead during user speech.

View File

@@ -1 +0,0 @@
- Fixed a `Mem0MemoryService` issue where passing `async_mode: true` was causing an error. See https://docs.mem0.ai/platform/features/async-mode-default-change.

View File

@@ -1,3 +0,0 @@
- Fixed `AzureTTSService` transcript formatting issues:
- Punctuation now appears without extra spaces (e.g., "Hello!" instead of "Hello !")
- CJK languages (Chinese, Japanese, Korean) no longer have unwanted spaces between characters

View File

@@ -1 +0,0 @@
- Added `on_user_mute_started` and `on_user_mute_stopped` event handlers to `LLMUserAggregator` for tracking user mute state changes.

View File

@@ -0,0 +1 @@
- Added `add_reached_upstream_filter()` and `add_reached_downstream_filter()` methods to `PipelineTask` for appending frame types.

1
changelog/3510.added.md Normal file
View File

@@ -0,0 +1 @@
- Added `reached_upstream_types` and `reached_downstream_types` read-only properties to `PipelineTask` for inspecting current frame filters.

View File

@@ -0,0 +1 @@
- Changed frame filter storage from tuples to sets in `PipelineTask`.

View File

@@ -0,0 +1 @@
- Added `RTVIProcessor.create_rtvi_observer()` factory method for creating RTVI observers.

View File

@@ -0,0 +1 @@
- Added `FrameProcessor.broadcast_frame_instance(frame)` method to broadcast a frame instance by extracting its fields and creating new instances for each direction.

1
changelog/3519.added.md Normal file
View File

@@ -0,0 +1 @@
- `PipelineTask` now automatically adds `RTVIProcessor` and registers `RTVIObserver` when `enable_rtvi=True` (default), simplifying pipeline setup.

View File

@@ -0,0 +1 @@
- Fixed `FrameProcessor.broadcast_frame()` to deep copy kwargs, preventing shared mutable references between the downstream and upstream frame instances.

1
changelog/3519.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Transports now properly broadcast `InputTransportMessageFrame` frames both upstream and downstream instead of only pushing downstream.

1
changelog/3520.added.md Normal file
View File

@@ -0,0 +1 @@
- Added `video_out_codec` parameter to `TransportParams` allowing configuration of the preferred video codec (e.g., `"VP8"`, `"H264"`, `"H265"`) for video output in `DailyTransport`.

1
changelog/3523.added.md Normal file
View File

@@ -0,0 +1 @@
- Added `location` parameter to Google TTS services (`GoogleHttpTTSService`, `GoogleTTSService`, `GeminiTTSService`) for regional endpoint support.

1
changelog/3525.added.md Normal file
View File

@@ -0,0 +1 @@
- Added new `SMART_TURN_LOG_DATA` environment variable, which causes Smart Turn input data to be saved to disk

View File

@@ -0,0 +1,2 @@
- Changed default Inworld TTS model from `inworld-tts-1` to
`inworld-tts-1.5-max`.

View File

@@ -10,7 +10,6 @@ import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams

View File

@@ -45,7 +45,6 @@ from pipecat.services.google.tts import GoogleTTSService
from pipecat.transcriptions.language import Language
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
from pipecat.turns.user_stop import TurnAnalyzerUserTurnStopStrategy
from pipecat.turns.user_turn_strategies import UserTurnStrategies

View File

@@ -28,7 +28,7 @@ from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.filters.krisp_viva_filter import KrispVivaFilter
from pipecat.audio.turn.krisp_viva_turn import KrispTurnParams, KrispVivaTurn
from pipecat.audio.turn.krisp_viva_turn import KrispVivaTurn
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame

View File

@@ -23,7 +23,6 @@ from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.processors.frameworks.rtvi import RTVIObserver, RTVIProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.deepgram.stt import DeepgramSTTService
@@ -93,12 +92,9 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
),
)
rtvi = RTVIProcessor()
pipeline = Pipeline(
[
transport.input(),
rtvi,
stt,
user_aggregator,
llm,
@@ -115,7 +111,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
enable_usage_metrics=True,
),
observers=[
RTVIObserver(rtvi),
DebugLogObserver(
frame_types={
TTSTextFrame: (BaseOutputTransport, FrameEndpoint.SOURCE),

View File

@@ -22,7 +22,6 @@ from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.deepgram.stt import DeepgramSTTService
@@ -88,12 +87,9 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
),
)
rtvi = RTVIProcessor(config=RTVIConfig(config=[]))
pipeline = Pipeline(
[
transport.input(),
rtvi,
stt,
user_aggregator,
llm,
@@ -110,7 +106,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
enable_usage_metrics=True,
),
observers=[
RTVIObserver(rtvi),
DebugLogObserver(
frame_types={
TTSTextFrame: (BaseOutputTransport, FrameEndpoint.SOURCE),

View File

@@ -22,7 +22,6 @@ from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.deepgram.stt import DeepgramSTTService
@@ -90,12 +89,9 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
),
)
rtvi = RTVIProcessor(config=RTVIConfig(config=[]))
pipeline = Pipeline(
[
transport.input(), # Transport user input
rtvi,
stt,
user_aggregator, # User responses
llm, # LLM
@@ -114,7 +110,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
observers=[
RTVIObserver(rtvi),
DebugLogObserver(
frame_types={
TTSTextFrame: (BaseOutputTransport, FrameEndpoint.SOURCE),
@@ -123,10 +118,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
],
)
@rtvi.event_handler("on_client_ready")
async def on_client_ready(rtvi):
await rtvi.set_bot_ready()
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")

View File

@@ -22,7 +22,7 @@ from pipecat.frames.frames import (
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,

View File

@@ -17,7 +17,7 @@ from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame, TTSSpeakFrame, UserImageRequestFrame
from pipecat.frames.frames import LLMRunFrame, UserImageRequestFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask

View File

@@ -22,7 +22,6 @@ from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.aws.nova_sonic.llm import AWSNovaSonicLLMService
@@ -114,6 +113,14 @@ async def load_conversation(params: FunctionCallParams):
# "content": f"{AWSNovaSonicLLMService.AWAIT_TRIGGER_ASSISTANT_RESPONSE_INSTRUCTION}",
# }
# )
# If the last message isn't from the user, add a message asking for a recap
if messages and messages[-1].get("role") != "user":
messages.append(
{
"role": "user",
"content": "Can you catch me up on what we were talking about?",
}
)
params.context.set_messages(messages)
await params.llm.reset_conversation()
# await params.llm.trigger_assistant_response()

View File

@@ -59,7 +59,6 @@ from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.deepgram.stt import DeepgramSTTService
@@ -255,12 +254,10 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
),
),
)
rtvi = RTVIProcessor(config=RTVIConfig(config=[]))
pipeline = Pipeline(
[
transport.input(),
rtvi,
stt,
user_aggregator,
memory,
@@ -278,12 +275,10 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
observers=[RTVIObserver(rtvi)],
)
@rtvi.event_handler("on_client_ready")
@task.rtvi.event_handler("on_client_ready")
async def on_client_ready(rtvi):
await rtvi.set_bot_ready()
# Get personalized greeting based on user memories. Can pass agent_id and run_id as per requirement of the application to manage short term memory or agent specific memory.
greeting = await get_initial_greeting(
memory_client=memory.memory_client, user_id=USER_ID, agent_id=None, run_id=None

View File

@@ -22,7 +22,6 @@ from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.processors.frameworks.rtvi import RTVIObserver, RTVIProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
@@ -87,8 +86,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
),
)
rtvi = RTVIProcessor()
pipeline = Pipeline(
[
transport.input(), # Transport user input
@@ -108,13 +105,11 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
enable_metrics=True,
enable_usage_metrics=True,
),
observers=[RTVIObserver(rtvi)],
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@rtvi.event_handler("on_client_ready")
@task.rtvi.event_handler("on_client_ready")
async def on_client_ready(rtvi):
await rtvi.set_bot_ready()
# Kick off the conversation
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMRunFrame()])

View File

@@ -9,7 +9,6 @@ import asyncio
import io
import json
import os
import re
import shutil
import aiohttp

View File

@@ -1,5 +1,5 @@
#
# Copyright (c) 2025, Daily
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
@@ -22,7 +22,6 @@ from pipecat.processors.aggregators.llm_response_universal import (
LLMUserAggregatorParams,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.frameworks.rtvi import RTVIObserver, RTVIProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.google.gemini_live.llm import GeminiLiveLLMService
@@ -125,14 +124,10 @@ async def run_bot(pipecat_transport):
),
)
# RTVI events for Pipecat client UI
rtvi = RTVIProcessor()
pipeline = Pipeline(
[
pipecat_transport.input(),
user_aggregator,
rtvi,
llm, # LLM
EdgeDetectionProcessor(
pipecat_transport._params.video_out_width,
@@ -149,13 +144,11 @@ async def run_bot(pipecat_transport):
enable_metrics=True,
enable_usage_metrics=True,
),
observers=[RTVIObserver(rtvi)],
)
@rtvi.event_handler("on_client_ready")
@task.rtvi.event_handler("on_client_ready")
async def on_client_ready(rtvi):
logger.info("Pipecat client ready.")
await rtvi.set_bot_ready()
# Kick off the conversation.
await task.queue_frames([LLMRunFrame()])

View File

@@ -13,7 +13,7 @@ from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame, ThoughtTranscriptionMessage, TranscriptionMessage
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask

View File

@@ -53,8 +53,6 @@ from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.grok.realtime.events import (
SessionProperties,
WebSearchTool,
XSearchTool,
)
from pipecat.services.grok.realtime.llm import GrokRealtimeLLMService
from pipecat.services.llm_service import FunctionCallParams

View File

@@ -1,11 +1,11 @@
agent_name = "quickstart"
image = "your_username/quickstart:0.1"
secret_set = "quickstart-secrets"
agent_name = "quickstart-test"
image = "markatdaily/quickstart-test:latest"
secret_set = "quickstart-test-secrets"
agent_profile = "agent-1x"
# RECOMMENDED: Set an image pull secret:
# https://docs.pipecat.ai/deployment/pipecat-cloud/fundamentals/secrets#image-pull-secrets
# image_credentials = "your_image_pull_secret"
image_credentials = "dockerhub-access"
[scaling]
min_agents = 1

View File

@@ -293,12 +293,13 @@ async def run_eval_pipeline(
"You should only call the eval function if:\n"
"- The user explicitly attempts to answer the question, AND\n"
f"- Their answer can be cleanly evaluated using: {eval_config.eval}\n"
"Ignore greetings, comments, non-answers, or requests for clarification."
"Ignore greetings, comments, non-answers, or requests for clarification.\n"
"Numerical word answers are allowed (e.g., 'five' is the same as '5').\n"
)
if eval_config.eval_speaks_first:
system_prompt = f"You are an evaluation agent, be extremly brief. Numerical word answers are allowed. You will start the conversation by saying: '{example_prompt}'. {common_system_prompt}"
system_prompt = f"You are an evaluation agent, be extremly brief. You will start the conversation by saying: '{example_prompt}'. {common_system_prompt}"
else:
system_prompt = f"You are an evaluation agent, be extremly brief. Numerical word answers are allowed. First, ask one question: {example_prompt}. {common_system_prompt}"
system_prompt = f"You are an evaluation agent, be extremly brief. First, ask one question: {example_prompt}. {common_system_prompt}"
messages = [
{

View File

@@ -137,6 +137,7 @@ TESTS_07 = [
# ("07zd-interruptible-aicoustics.py", EVAL_SIMPLE_MATH),
("07ze-interruptible-hume.py", EVAL_SIMPLE_MATH),
("07zf-interruptible-gradium.py", EVAL_SIMPLE_MATH),
("07zg-interruptible-camb.py", EVAL_SIMPLE_MATH),
("07zh-interruptible-hathora.py", EVAL_SIMPLE_MATH),
# Needs a local XTTS docker instance running.
# ("07i-interruptible-xtts.py", EVAL_SIMPLE_MATH),

View File

@@ -22,7 +22,7 @@ from pathlib import Path
try:
import numpy as np
import soundfile as sf
import soundfile as sf # noqa: F401
from audio_file_utils import calculate_audio_stats, read_audio_file, write_audio_file
except ImportError as e:
print(f"Error: Missing required dependencies: {e}")

View File

@@ -23,7 +23,7 @@ from pathlib import Path
try:
import numpy as np
import soundfile as sf
import soundfile as sf # noqa: F401
from audio_file_utils import read_audio_file
except ImportError as e:
print(f"Error: Missing required dependencies: {e}")

View File

@@ -10,7 +10,7 @@ import base64
import copy
import json
from dataclasses import dataclass
from typing import Any, Dict, List, Literal, Optional, TypedDict
from typing import Any, Dict, List, Optional, TypedDict
from loguru import logger

View File

@@ -9,7 +9,7 @@
import base64
import json
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple, TypedDict
from typing import Any, Dict, List, Optional, TypedDict
from loguru import logger
from openai import NotGiven

View File

@@ -7,10 +7,8 @@
"""OpenAI LLM adapter for Pipecat."""
import copy
import json
from typing import Any, Dict, List, TypedDict
from openai._types import NOT_GIVEN as OPEN_AI_NOT_GIVEN
from openai._types import NotGiven as OpenAINotGiven
from openai.types.chat import (
ChatCompletionMessageParam,

View File

@@ -9,7 +9,6 @@
This module provides an audio filter implementation using Krisp VIVA SDK.
"""
import asyncio
import os
import numpy as np

View File

@@ -16,6 +16,7 @@ import numpy as np
from loguru import logger
from pipecat.audio.turn.smart_turn.base_smart_turn import BaseSmartTurn
from pipecat.utils.env import env_truthy
try:
import onnxruntime as ort
@@ -48,6 +49,8 @@ class LocalSmartTurnAnalyzerV3(BaseSmartTurn):
"""
super().__init__(**kwargs)
self._log_data = env_truthy("SMART_TURN_LOG_DATA", default=False)
if not smart_turn_model_path:
# Load bundled model
model_name = "smart-turn-v3.2-cpu.onnx"
@@ -81,6 +84,49 @@ class LocalSmartTurnAnalyzerV3(BaseSmartTurn):
logger.debug("Loaded Local Smart Turn v3.x")
def _write_audio_to_wav(
self, audio_array: np.ndarray, sample_rate: int = 16000, suffix: str = ""
) -> None:
"""Write audio data to a WAV file in a background thread.
Args:
audio_array: The audio data as a numpy array (float32, normalized to [-1, 1]).
sample_rate: The sample rate of the audio data.
suffix: Optional suffix to append to the filename (e.g., "_raw", "_padded").
"""
import os
import threading
import wave
from datetime import datetime
# Generate filename with current timestamp (millisecond precision)
timestamp = datetime.now().strftime("%Y-%m-%d__%H:%M:%S.%f")[:-3]
log_dir = "./smart_turn_audio_log"
os.makedirs(log_dir, exist_ok=True)
filename = os.path.join(log_dir, f"{timestamp}{suffix}.wav")
# Make a copy of the audio data to avoid issues with the array being modified
audio_copy = audio_array.copy()
def write_wav():
try:
# Convert float32 audio to int16 for WAV file
audio_int16 = (audio_copy * 32767).astype(np.int16)
with wave.open(filename, "wb") as wav_file:
wav_file.setnchannels(1) # Mono
wav_file.setsampwidth(2) # 2 bytes for int16
wav_file.setframerate(sample_rate)
wav_file.writeframes(audio_int16.tobytes())
logger.debug(f"Wrote audio to {filename}")
except Exception as e:
logger.error(f"Failed to write audio to {filename}: {e}")
# Start background thread to write the WAV file
thread = threading.Thread(target=write_wav, daemon=True)
thread.start()
def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, Any]:
"""Predict end-of-turn using local ONNX model."""
@@ -95,6 +141,8 @@ class LocalSmartTurnAnalyzerV3(BaseSmartTurn):
return np.pad(audio_array, (padding, 0), mode="constant", constant_values=0)
return audio_array
audio_for_logging = audio_array
# Truncate to 8 seconds (keeping the end) or pad to 8 seconds
audio_array = truncate_audio_to_last_n_seconds(audio_array, n_seconds=8)
@@ -122,6 +170,10 @@ class LocalSmartTurnAnalyzerV3(BaseSmartTurn):
# Make prediction (1 for Complete, 0 for Incomplete)
prediction = 1 if probability > 0.5 else 0
if self._log_data:
suffix = "_complete" if prediction == 1 else "_incomplete"
self._write_audio_to_wav(audio_for_logging, sample_rate=16000, suffix=suffix)
return {
"prediction": prediction,
"probability": probability,

View File

@@ -15,7 +15,7 @@ import asyncio
import importlib.util
import os
from pathlib import Path
from typing import Any, AsyncIterable, Dict, Iterable, List, Optional, Tuple, Type
from typing import Any, AsyncIterable, Dict, Iterable, List, Optional, Set, Tuple, Type
from loguru import logger
from pydantic import BaseModel, ConfigDict, Field
@@ -49,6 +49,7 @@ from pipecat.pipeline.pipeline import Pipeline, PipelineSink, PipelineSource
from pipecat.pipeline.task_observer import TaskObserver
from pipecat.processors.aggregators.llm_response import LLMUserContextAggregator
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup
from pipecat.processors.frameworks.rtvi import RTVIObserverParams, RTVIProcessor
from pipecat.utils.asyncio.task_manager import BaseTaskManager, TaskManager, TaskManagerParams
from pipecat.utils.tracing.setup import is_tracing_available
from pipecat.utils.tracing.turn_trace_observer import TurnTraceObserver
@@ -225,9 +226,12 @@ class PipelineTask(BasePipelineTask):
conversation_id: Optional[str] = None,
enable_tracing: bool = False,
enable_turn_tracking: bool = True,
enable_rtvi: bool = True,
idle_timeout_frames: Tuple[Type[Frame], ...] = (BotSpeakingFrame, UserSpeakingFrame),
idle_timeout_secs: Optional[float] = IDLE_TIMEOUT_SECS,
observers: Optional[List[BaseObserver]] = None,
rtvi_processor: Optional[RTVIProcessor] = None,
rtvi_observer_params: Optional[RTVIObserverParams] = None,
task_manager: Optional[BaseTaskManager] = None,
):
"""Initialize the PipelineTask.
@@ -244,6 +248,7 @@ class PipelineTask(BasePipelineTask):
check_dangling_tasks: Whether to check for processors' tasks finishing properly.
clock: Clock implementation for timing operations.
conversation_id: Optional custom ID for the conversation.
enable_rtvi: Whether to automatically add RTVI support to the pipeline.
enable_tracing: Whether to enable tracing.
enable_turn_tracking: Whether to enable turn tracking.
idle_timeout_frames: A tuple with the frames that should trigger an idle
@@ -252,6 +257,8 @@ class PipelineTask(BasePipelineTask):
None. If a pipeline is idle the pipeline task will be cancelled
automatically.
observers: List of observers for monitoring pipeline execution.
rtvi_observer_params: The RTVI observer parameter to use if RTVI is enabled.
rtvi_processor: The RTVI processor to add if RTVI is enabled.
task_manager: Optional task manager for handling asyncio tasks.
"""
super().__init__()
@@ -306,6 +313,16 @@ class PipelineTask(BasePipelineTask):
self._heartbeat_push_task: Optional[asyncio.Task] = None
self._heartbeat_monitor_task: Optional[asyncio.Task] = None
# RTVI support
self._rtvi = None
if enable_rtvi:
self._rtvi = rtvi_processor or RTVIProcessor()
observers.append(self._rtvi.create_rtvi_observer(params=rtvi_observer_params))
@self.rtvi.event_handler("on_client_ready")
async def on_client_ready(rtvi: RTVIProcessor):
await rtvi.set_bot_ready()
# This is the idle event. When selected frames are pushed from any
# processor we consider the pipeline is not idle. We use an observer
# which will be listening any part of the pipeline.
@@ -335,7 +352,8 @@ class PipelineTask(BasePipelineTask):
# allows us to receive and react to downstream frames.
source = PipelineSource(self._source_push_frame, name=f"{self}::Source")
sink = PipelineSink(self._sink_push_frame, name=f"{self}::Sink")
self._pipeline = Pipeline([pipeline], source=source, sink=sink)
processors = [self._rtvi, pipeline] if self._rtvi else [pipeline]
self._pipeline = Pipeline(processors, source=source, sink=sink)
# The task observer acts as a proxy to the provided observers. This way,
# we only need to pass a single observer (using the StartFrame) which
@@ -348,8 +366,8 @@ class PipelineTask(BasePipelineTask):
# in. This is mainly for efficiency reason because each event handler
# creates a task and most likely you only care about one or two frame
# types.
self._reached_upstream_types: Tuple[Type[Frame], ...] = ()
self._reached_downstream_types: Tuple[Type[Frame], ...] = ()
self._reached_upstream_types: Set[Type[Frame]] = set()
self._reached_downstream_types: Set[Type[Frame]] = set()
self._register_event_handler("on_frame_reached_upstream")
self._register_event_handler("on_frame_reached_downstream")
self._register_event_handler("on_idle_timeout")
@@ -398,6 +416,35 @@ class PipelineTask(BasePipelineTask):
"""
return self._turn_trace_observer
@property
def rtvi(self) -> RTVIProcessor:
"""Get the RTVI processor if RTVI is enabled.
Returns:
The RTVI processor added to the pipeline when RTVI is enabled.
"""
if not self._rtvi:
raise Exception(f"{self} RTVI is not enabled.")
return self._rtvi
@property
def reached_upstream_types(self) -> Tuple[Type[Frame], ...]:
"""Get the currently configured upstream frame type filters.
Returns:
Tuple of frame types that trigger the on_frame_reached_upstream event.
"""
return tuple(self._reached_upstream_types)
@property
def reached_downstream_types(self) -> Tuple[Type[Frame], ...]:
"""Get the currently configured downstream frame type filters.
Returns:
Tuple of frame types that trigger the on_frame_reached_downstream event.
"""
return tuple(self._reached_downstream_types)
def event_handler(self, event_name: str):
"""Decorator for registering event handlers.
@@ -441,7 +488,7 @@ class PipelineTask(BasePipelineTask):
Args:
types: Tuple of frame types to monitor for upstream events.
"""
self._reached_upstream_types = types
self._reached_upstream_types = set(types)
def set_reached_downstream_filter(self, types: Tuple[Type[Frame], ...]):
"""Set which frame types trigger the on_frame_reached_downstream event.
@@ -449,7 +496,23 @@ class PipelineTask(BasePipelineTask):
Args:
types: Tuple of frame types to monitor for downstream events.
"""
self._reached_downstream_types = types
self._reached_downstream_types = set(types)
def add_reached_upstream_filter(self, types: Tuple[Type[Frame], ...]):
"""Add frame types to trigger the on_frame_reached_upstream event.
Args:
types: Tuple of frame types to add to upstream monitoring.
"""
self._reached_upstream_types.update(types)
def add_reached_downstream_filter(self, types: Tuple[Type[Frame], ...]):
"""Add frame types to trigger the on_frame_reached_downstream event.
Args:
types: Tuple of frame types to add to downstream monitoring.
"""
self._reached_downstream_types.update(types)
def has_finished(self) -> bool:
"""Check if the pipeline task has finished execution.
@@ -749,7 +812,7 @@ class PipelineTask(BasePipelineTask):
pipeline to be stopped (e.g. EndTaskFrame) in which case we would send
an EndFrame down the pipeline.
"""
if isinstance(frame, self._reached_upstream_types):
if isinstance(frame, tuple(self._reached_upstream_types)):
await self._call_event_handler("on_frame_reached_upstream", frame)
if isinstance(frame, EndTaskFrame):
@@ -788,7 +851,7 @@ class PipelineTask(BasePipelineTask):
processors have handled the EndFrame and therefore we can exit the task
cleanly.
"""
if isinstance(frame, self._reached_downstream_types):
if isinstance(frame, tuple(self._reached_downstream_types)):
await self._call_event_handler("on_frame_reached_downstream", frame)
if isinstance(frame, StartFrame):

View File

@@ -34,7 +34,6 @@ from PIL import Image
from pipecat.adapters.base_llm_adapter import BaseLLMAdapter
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.frames.frames import AudioRawFrame, Frame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
# JSON custom encoder to handle bytes arrays so that we can log contexts
# with images to the console.

View File

@@ -18,7 +18,7 @@ from typing import List
from loguru import logger
from pipecat.frames.frames import ErrorFrame, Frame, TranscriptionFrame
from pipecat.frames.frames import Frame, TranscriptionFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor

View File

@@ -12,7 +12,9 @@ management, and frame flow control mechanisms.
"""
import asyncio
import dataclasses
import traceback
from copy import deepcopy
from dataclasses import dataclass
from enum import Enum
from typing import (
@@ -779,8 +781,40 @@ class FrameProcessor(BaseObject):
frame_cls: The class of the frame to be broadcasted.
**kwargs: Keyword arguments to be passed to the frame's constructor.
"""
await self.push_frame(frame_cls(**kwargs))
await self.push_frame(frame_cls(**kwargs), FrameDirection.UPSTREAM)
await self.push_frame(frame_cls(**deepcopy(kwargs)))
await self.push_frame(frame_cls(**deepcopy(kwargs)), FrameDirection.UPSTREAM)
async def broadcast_frame_instance(self, frame: Frame):
"""Broadcasts a frame instance upstream and downstream.
This method creates two new frame instances copying all fields from the
original frame except `id` and `name`, which get fresh values.
Args:
frame: The frame instance to broadcast.
Note:
Prefer using `broadcast_frame()` when possible, as it is more
efficient. This method should only be used when you are not the
creator of the frame and need to broadcast an existing instance.
"""
frame_cls = type(frame)
init_fields = {f.name: getattr(frame, f.name) for f in dataclasses.fields(frame) if f.init}
extra_fields = {
f.name: getattr(frame, f.name)
for f in dataclasses.fields(frame)
if not f.init and f.name not in ("id", "name")
}
new_frame = frame_cls(**deepcopy(init_fields))
for k, v in deepcopy(extra_fields).items():
setattr(new_frame, k, v)
await self.push_frame(new_frame)
new_frame = frame_cls(**deepcopy(init_fields))
for k, v in deepcopy(extra_fields).items():
setattr(new_frame, k, v)
await self.push_frame(new_frame, FrameDirection.UPSTREAM)
async def __start(self, frame: StartFrame):
"""Handle the start frame to initialize processor state.
@@ -950,7 +984,8 @@ class FrameProcessor(BaseObject):
# Process current queue and keep UninterruptibleFrame frames.
while not self.__process_queue.empty():
item = self.__process_queue.get_nowait()
if isinstance(item, UninterruptibleFrame):
frame = item[0]
if isinstance(frame, UninterruptibleFrame):
new_queue.put_nowait(item)
self.__process_queue.task_done()

View File

@@ -1100,13 +1100,11 @@ class RTVIObserver(BaseObserver):
if (
isinstance(frame, (UserStartedSpeakingFrame, UserStoppedSpeakingFrame))
and (direction == FrameDirection.DOWNSTREAM)
and self._params.user_speaking_enabled
):
await self._handle_interruptions(frame)
elif (
isinstance(frame, (BotStartedSpeakingFrame, BotStoppedSpeakingFrame))
and (direction == FrameDirection.UPSTREAM)
and self._params.bot_speaking_enabled
):
await self._handle_bot_speaking(frame)
@@ -1413,6 +1411,18 @@ class RTVIProcessor(FrameProcessor):
self._registered_services[service.name] = service
def create_rtvi_observer(self, *, params: Optional[RTVIObserverParams] = None, **kwargs):
"""Creates a new RTVI Observer.
Args:
params: Settings to enable/disable specific messages.
**kwargs: Additional arguments passed to the observer.
Returns:
A new RTVI observer.
"""
return RTVIObserver(self, params=params, **kwargs)
async def set_client_ready(self):
"""Mark the client as ready and trigger the ready event."""
self._client_ready = True

View File

@@ -263,7 +263,7 @@ def _setup_webrtc_routes(
"""Handle WebRTC offer requests via SmallWebRTCRequestHandler."""
# Prepare runner arguments with the callback to run your bot
async def webrtc_connection_callback(connection):
async def webrtc_connection_callback(connection: SmallWebRTCConnection):
bot_module = _get_bot_module()
runner_args = SmallWebRTCRunnerArguments(
@@ -406,13 +406,7 @@ def _setup_whatsapp_routes(app: FastAPI):
return
try:
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI
from pipecat.transports.smallwebrtc.connection import SmallWebRTCConnection
from pipecat.transports.smallwebrtc.request_handler import (
SmallWebRTCRequest,
SmallWebRTCRequestHandler,
)
from pipecat.transports.whatsapp.api import WhatsAppWebhookRequest
from pipecat.transports.whatsapp.client import WhatsAppClient
except ImportError as e:

View File

@@ -126,7 +126,7 @@ class ProtobufFrameSerializer(FrameSerializer):
if "pts" in args_dict:
del args_dict["pts"]
# Special handling for MessageFrame -> OutputTransportMessageUrgentFrame
# Special handling for MessageFrame -> InputTransportMessageFrame
if class_name == MessageFrame:
try:
msg = json.loads(args_dict["data"])

View File

@@ -148,11 +148,11 @@ class AIService(FrameProcessor):
await super().process_frame(frame, direction)
if isinstance(frame, StartFrame):
await self.start(frame)
elif isinstance(frame, CancelFrame):
await self.cancel(frame)
await self._start(frame)
elif isinstance(frame, EndFrame):
await self.stop(frame)
await self._stop(frame)
elif isinstance(frame, CancelFrame):
await self._cancel(frame)
async def process_generator(self, generator: AsyncGenerator[Frame | None, None]):
"""Process frames from an async generator.
@@ -169,3 +169,21 @@ class AIService(FrameProcessor):
await self.push_error_frame(f)
else:
await self.push_frame(f)
async def _start(self, frame: StartFrame):
try:
await self.start(frame)
except Exception as e:
logger.error(f"{self}: exception processing {frame}: {e}")
async def _stop(self, frame: EndFrame):
try:
await self.stop(frame)
except Exception as e:
logger.error(f"{self}: exception processing {frame}: {e}")
async def _cancel(self, frame: CancelFrame):
try:
await self.cancel(frame)
except Exception as e:
logger.error(f"{self}: exception processing {frame}: {e}")

View File

@@ -296,6 +296,7 @@ class AWSNovaSonicLLMService(LLMService):
self._user_text_buffer = ""
self._assistant_text_buffer = ""
self._completed_tool_calls = set()
self._audio_input_started = False
file_path = files("pipecat.services.aws.nova_sonic").joinpath("ready.wav")
with wave.open(file_path.open("rb"), "rb") as wav_file:
@@ -532,14 +533,30 @@ class AWSNovaSonicLLMService(LLMService):
if system_instruction:
await self._send_text_event(text=system_instruction, role=Role.SYSTEM)
# Send conversation history
for message in llm_connection_params["messages"]:
# Send conversation history (except for the last message if it's from the
# user, which we'll send as interactive after starting audio input)
messages = llm_connection_params["messages"]
last_user_message = None
for i, message in enumerate(messages):
# logger.debug(f"Seeding conversation history with message: {message}")
await self._send_text_event(text=message.text, role=message.role)
is_last_message = i == len(messages) - 1
if is_last_message and message.role == Role.USER:
# Save for sending after audio input starts
last_user_message = message
else:
await self._send_text_event(text=message.text, role=message.role)
# Start audio input
await self._send_audio_input_start_event()
# Now send the last user message as interactive to trigger bot response
if last_user_message:
# logger.debug(
# f"Sending last user message as interactive to trigger bot response: {last_user_message}")
await self._send_text_event(
text=last_user_message.text, role=last_user_message.role, interactive=True
)
# Start receiving events
self._receive_task = self.create_task(self._receive_task_handler())
@@ -602,6 +619,7 @@ class AWSNovaSonicLLMService(LLMService):
self._user_text_buffer = ""
self._assistant_text_buffer = ""
self._completed_tool_calls = set()
self._audio_input_started = False
logger.info("Finished disconnecting")
except Exception as e:
@@ -727,8 +745,18 @@ class AWSNovaSonicLLMService(LLMService):
}}
'''
await self._send_client_event(audio_content_start)
self._audio_input_started = True
async def _send_text_event(self, text: str, role: Role):
async def _send_text_event(self, text: str, role: Role, interactive: bool = False):
"""Send a text event to the LLM.
Args:
text: The text content to send.
role: The role associated with the text (e.g., USER, ASSISTANT, SYSTEM).
interactive: Whether the content is interactive. Defaults to False.
False: conversation history or system instruction, sent prior to interactive audio
True: text input sent during (or at the start of) interactive audio
"""
if not self._stream or not self._prompt_name or not text:
return
@@ -741,7 +769,7 @@ class AWSNovaSonicLLMService(LLMService):
"promptName": "{self._prompt_name}",
"contentName": "{content_name}",
"type": "TEXT",
"interactive": true,
"interactive": {json.dumps(interactive)},
"role": "{role.value}",
"textInputConfiguration": {{
"mediaType": "text/plain"
@@ -779,7 +807,7 @@ class AWSNovaSonicLLMService(LLMService):
await self._send_client_event(text_content_end)
async def _send_user_audio_event(self, audio: bytes):
if not self._stream:
if not self._stream or not self._audio_input_started:
return
blob = base64.b64encode(audio)

View File

@@ -10,7 +10,6 @@ This module provides a WebSocket-based connection to AWS Transcribe for real-tim
speech-to-text transcription with support for multiple languages and audio formats.
"""
import asyncio
import json
import os
import random

View File

@@ -10,7 +10,6 @@ This module provides integration with Amazon Polly for text-to-speech synthesis,
supporting multiple languages, voices, and SSML features.
"""
import asyncio
import os
from typing import AsyncGenerator, List, Optional

View File

@@ -17,3 +17,8 @@ with warnings.catch_warnings():
DeprecationWarning,
stacklevel=2,
)
__all__ = [
"AWSNovaSonicLLMService",
"Params",
]

View File

@@ -8,8 +8,6 @@
from typing import Optional
from loguru import logger
from pipecat.transcriptions.language import Language, resolve_language

View File

@@ -15,7 +15,6 @@ import io
from typing import AsyncGenerator
import aiohttp
from loguru import logger
from PIL import Image
from pipecat.frames.frames import ErrorFrame, Frame, URLImageRawFrame

View File

@@ -199,9 +199,10 @@ class CambTTSService(TTSService):
"""
super().__init__(sample_rate=sample_rate, **kwargs)
params = params or CambTTSService.InputParams()
self._api_key = api_key
self._timeout = timeout
self._client = AsyncCambAI(api_key=api_key, timeout=timeout)
params = params or CambTTSService.InputParams()
# Warn if sample rate doesn't match model's supported rate
if sample_rate and sample_rate != MODEL_SAMPLE_RATES.get(model):
@@ -222,6 +223,8 @@ class CambTTSService(TTSService):
self.set_voice(str(voice_id))
self._voice_id = voice_id
self._client = None
def can_generate_metrics(self) -> bool:
"""Check if this service can generate processing metrics.
@@ -249,6 +252,8 @@ class CambTTSService(TTSService):
"""
await super().start(frame)
self._client = AsyncCambAI(api_key=self._api_key, timeout=self._timeout)
# Use model-specific sample rate if not explicitly specified
if not self._init_sample_rate:
self._sample_rate = MODEL_SAMPLE_RATES.get(self.model_name, 22050)
@@ -289,6 +294,8 @@ class CambTTSService(TTSService):
await self.start_tts_usage_metrics(text)
yield TTSStartedFrame()
assert self._client is not None, "Camb.ai TTS service not initialized"
# Buffer for aligning chunks to 2-byte boundaries (16-bit PCM)
audio_buffer = b""

View File

@@ -6,8 +6,6 @@
"""Cerebras LLM service implementation using OpenAI-compatible interface."""
from typing import List
from loguru import logger
from pipecat.adapters.services.open_ai_adapter import OpenAILLMInvocationParams

View File

@@ -27,7 +27,6 @@ from pipecat.frames.frames import (
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.stt_service import WebsocketSTTService
from pipecat.transcriptions.language import Language
from pipecat.utils.time import time_now_iso8601

View File

@@ -6,8 +6,6 @@
"""DeepSeek LLM service implementation using OpenAI-compatible interface."""
from typing import List
from loguru import logger
from pipecat.adapters.services.open_ai_adapter import OpenAILLMInvocationParams

View File

@@ -6,8 +6,6 @@
"""Fireworks AI service implementation using OpenAI-compatible interface."""
from typing import List
from loguru import logger
from pipecat.adapters.services.open_ai_adapter import OpenAILLMInvocationParams

View File

@@ -1,2 +1,7 @@
from .file_api import GeminiFileAPI
from .gemini import GeminiMultimodalLiveLLMService
__all__ = [
"GeminiFileAPI",
"GeminiMultimodalLiveLLMService",
]

View File

@@ -1,3 +1,9 @@
from .file_api import GeminiFileAPI
from .llm import GeminiLiveLLMService
from .llm_vertex import GeminiLiveVertexLLMService
__all__ = [
"GeminiFileAPI",
"GeminiLiveLLMService",
"GeminiLiveVertexLLMService",
]

View File

@@ -1674,7 +1674,7 @@ class GeminiLiveLLMService(LLMService):
# start a timeout task to flush it later
if self._user_transcription_buffer:
self._transcription_timeout_task = self.create_task(
self._transcription_timeout_handler()
await self._transcription_timeout_handler()
)
async def _handle_msg_output_transcription(self, message: LiveServerMessage):

View File

@@ -40,7 +40,6 @@ from pipecat.frames.frames import (
LLMThoughtStartFrame,
LLMThoughtTextFrame,
LLMUpdateSettingsFrame,
OutputImageRawFrame,
UserImageRawFrame,
)
from pipecat.metrics.metrics import LLMTokenUsage

View File

@@ -4,7 +4,7 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Google RTVI integration models and observer implementation.
"""Google RTVI processor and observer implementation.
This module provides integration with Google's services through the RTVI framework,
including models for search responses and an observer for handling Google-specific
@@ -15,10 +15,8 @@ from typing import List, Literal, Optional
from pydantic import BaseModel
from pipecat.frames.frames import Frame
from pipecat.observers.base_observer import FramePushed
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.frameworks.rtvi import RTVIObserver, RTVIProcessor
from pipecat.processors.frameworks.rtvi import RTVIObserver, RTVIObserverParams, RTVIProcessor
from pipecat.services.google.frames import LLMSearchOrigin, LLMSearchResponseFrame
@@ -88,4 +86,23 @@ class GoogleRTVIObserver(RTVIObserver):
rendered_content=frame.rendered_content,
)
)
await self.push_transport_message_urgent(message)
await self.send_rtvi_message(message)
class GoogleRTVIProcessor(RTVIProcessor):
"""RTVI processor for Google service integration.
Creates a specific Google RTVI Observer.
"""
def create_rtvi_observer(self, *, params: Optional[RTVIObserverParams] = None, **kwargs):
"""Creates a new RTVI Observer.
Args:
params: Settings to enable/disable specific messages.
**kwargs: Additional arguments passed to the observer.
Returns:
A new RTVI observer.
"""
return GoogleRTVIObserver(self)

View File

@@ -29,7 +29,6 @@ from pydantic import BaseModel, Field, field_validator
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
InterimTranscriptionFrame,
StartFrame,

View File

@@ -40,6 +40,7 @@ from pipecat.services.tts_service import TTSService
from pipecat.transcriptions.language import Language, resolve_language
try:
from google.api_core.client_options import ClientOptions
from google.auth import default
from google.auth.exceptions import GoogleAuthError
from google.cloud import texttospeech_v1
@@ -515,6 +516,7 @@ class GoogleHttpTTSService(TTSService):
*,
credentials: Optional[str] = None,
credentials_path: Optional[str] = None,
location: Optional[str] = None,
voice_id: str = "en-US-Chirp3-HD-Charon",
sample_rate: Optional[int] = None,
params: Optional[InputParams] = None,
@@ -525,6 +527,7 @@ class GoogleHttpTTSService(TTSService):
Args:
credentials: JSON string containing Google Cloud service account credentials.
credentials_path: Path to Google Cloud service account JSON file.
location: Google Cloud location for regional endpoint (e.g., "us-central1").
voice_id: Google TTS voice identifier (e.g., "en-US-Standard-A").
sample_rate: Audio sample rate in Hz. If None, uses default.
params: Voice customization parameters including pitch, rate, volume, etc.
@@ -534,6 +537,7 @@ class GoogleHttpTTSService(TTSService):
params = params or GoogleHttpTTSService.InputParams()
self._location = location
self._settings = {
"pitch": params.pitch,
"rate": params.rate,
@@ -586,7 +590,15 @@ class GoogleHttpTTSService(TTSService):
if not creds:
raise ValueError("No valid credentials provided.")
return texttospeech_v1.TextToSpeechAsyncClient(credentials=creds)
client_options = None
if self._location:
client_options = ClientOptions(
api_endpoint=f"{self._location}-texttospeech.googleapis.com"
)
return texttospeech_v1.TextToSpeechAsyncClient(
credentials=creds, client_options=client_options
)
def can_generate_metrics(self) -> bool:
"""Check if this service can generate processing metrics.
@@ -783,7 +795,15 @@ class GoogleBaseTTSService(TTSService):
if not creds:
raise ValueError("No valid credentials provided.")
return texttospeech_v1.TextToSpeechAsyncClient(credentials=creds)
client_options = None
if self._location:
client_options = ClientOptions(
api_endpoint=f"{self._location}-texttospeech.googleapis.com"
)
return texttospeech_v1.TextToSpeechAsyncClient(
credentials=creds, client_options=client_options
)
def can_generate_metrics(self) -> bool:
"""Check if this service can generate processing metrics.
@@ -903,6 +923,7 @@ class GoogleTTSService(GoogleBaseTTSService):
*,
credentials: Optional[str] = None,
credentials_path: Optional[str] = None,
location: Optional[str] = None,
voice_id: str = "en-US-Chirp3-HD-Charon",
voice_cloning_key: Optional[str] = None,
sample_rate: Optional[int] = None,
@@ -914,6 +935,7 @@ class GoogleTTSService(GoogleBaseTTSService):
Args:
credentials: JSON string containing Google Cloud service account credentials.
credentials_path: Path to Google Cloud service account JSON file.
location: Google Cloud location for regional endpoint (e.g., "us-central1").
voice_id: Google TTS voice identifier (e.g., "en-US-Chirp3-HD-Charon").
voice_cloning_key: The voice cloning key for Chirp 3 custom voices.
sample_rate: Audio sample rate in Hz. If None, uses default.
@@ -924,6 +946,7 @@ class GoogleTTSService(GoogleBaseTTSService):
params = params or GoogleTTSService.InputParams()
self._location = location
self._settings = {
"language": self.language_to_service_language(params.language)
if params.language
@@ -1083,6 +1106,7 @@ class GeminiTTSService(GoogleBaseTTSService):
model: str = "gemini-2.5-flash-tts",
credentials: Optional[str] = None,
credentials_path: Optional[str] = None,
location: Optional[str] = None,
voice_id: str = "Kore",
sample_rate: Optional[int] = None,
params: Optional[InputParams] = None,
@@ -1101,6 +1125,7 @@ class GeminiTTSService(GoogleBaseTTSService):
"gemini-2.5-flash-tts" or "gemini-2.5-pro-tts".
credentials: JSON string containing Google Cloud service account credentials.
credentials_path: Path to Google Cloud service account JSON file.
location: Google Cloud location for regional endpoint (e.g., "us-central1").
voice_id: Voice name from the available Gemini voices.
sample_rate: Audio sample rate in Hz. If None, uses Google's default 24kHz.
params: TTS configuration parameters.
@@ -1127,6 +1152,7 @@ class GeminiTTSService(GoogleBaseTTSService):
if voice_id not in self.AVAILABLE_VOICES:
logger.warning(f"Voice '{voice_id}' not in known voices list. Using anyway.")
self._location = location
self._model = model
self._voice_id = voice_id
self._settings = {

View File

@@ -6,7 +6,6 @@
import base64
import json
import uuid
from typing import Any, AsyncGenerator, Mapping, Optional
from loguru import logger

View File

@@ -16,7 +16,6 @@ from pipecat import version as pipecat_version
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
InterruptionFrame,
StartFrame,

View File

@@ -72,7 +72,7 @@ class InworldHttpTTSService(WordTTSService):
api_key: str,
aiohttp_session: aiohttp.ClientSession,
voice_id: str = "Ashley",
model: str = "inworld-tts-1",
model: str = "inworld-tts-1.5-max",
streaming: bool = True,
sample_rate: Optional[int] = None,
encoding: str = "LINEAR16",
@@ -427,7 +427,7 @@ class InworldTTSService(AudioContextWordTTSService):
*,
api_key: str,
voice_id: str = "Ashley",
model: str = "inworld-tts-1",
model: str = "inworld-tts-1.5-max",
url: str = "wss://api.inworld.ai/tts/v1/voice:streamBidirectional",
sample_rate: Optional[int] = None,
encoding: str = "LINEAR16",

View File

@@ -16,7 +16,7 @@ from typing import Any, Dict, List, Optional
from loguru import logger
from pydantic import BaseModel, Field
from pipecat.frames.frames import ErrorFrame, Frame, LLMContextFrame, LLMMessagesFrame
from pipecat.frames.frames import Frame, LLMContextFrame, LLMMessagesFrame
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,

View File

@@ -9,12 +9,10 @@
from typing import List, Sequence
from loguru import logger
from openai import AsyncStream
from openai.types.chat import ChatCompletionChunk, ChatCompletionMessageParam
from openai.types.chat import ChatCompletionMessageParam
from pipecat.adapters.services.open_ai_adapter import OpenAILLMInvocationParams
from pipecat.frames.frames import FunctionCallFromLLM
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.openai.llm import OpenAILLMService

View File

@@ -46,7 +46,7 @@ def detect_device():
and dtype is the recommended torch data type for that device.
"""
try:
import intel_extension_for_pytorch
import intel_extension_for_pytorch # noqa: F401
if torch.xpu.is_available():
return torch.device("xpu"), torch.float32

View File

@@ -134,6 +134,7 @@ class NvidiaSTTService(STTService):
params = params or NvidiaSTTService.InputParams()
self._server = server
self._api_key = api_key
self._use_ssl = use_ssl
self._profanity_filter = False
@@ -162,18 +163,53 @@ class NvidiaSTTService(STTService):
self.set_model_name(model_function_map.get("model_name"))
metadata = [
["function-id", self._function_id],
["authorization", f"Bearer {api_key}"],
]
auth = riva.client.Auth(None, self._use_ssl, server, metadata)
self._asr_service = riva.client.ASRService(auth)
self._asr_service = None
self._queue = None
self._config = None
self._thread_task = None
self._response_task = None
def _initialize_client(self):
metadata = [
["function-id", self._function_id],
["authorization", f"Bearer {self._api_key}"],
]
auth = riva.client.Auth(None, self._use_ssl, self._server, metadata)
self._asr_service = riva.client.ASRService(auth)
def _create_recognition_config(self):
"""Create the NVIDIA Riva ASR recognition configuration."""
config = riva.client.StreamingRecognitionConfig(
config=riva.client.RecognitionConfig(
encoding=riva.client.AudioEncoding.LINEAR_PCM,
language_code=self._language_code,
model="",
max_alternatives=1,
profanity_filter=self._profanity_filter,
enable_automatic_punctuation=self._automatic_punctuation,
verbatim_transcripts=not self._no_verbatim_transcripts,
sample_rate_hertz=self.sample_rate,
audio_channel_count=1,
),
interim_results=True,
)
riva.client.add_word_boosting_to_config(
config, self._boosted_lm_words, self._boosted_lm_score
)
riva.client.add_endpoint_parameters_to_config(
config,
self._start_history,
self._start_threshold,
self._stop_history,
self._stop_history_eou,
self._stop_threshold,
self._stop_threshold_eou,
)
riva.client.add_custom_configuration_to_config(config, self._custom_configuration)
return config
def can_generate_metrics(self) -> bool:
"""Check if this service can generate processing metrics.
@@ -206,49 +242,15 @@ class NvidiaSTTService(STTService):
frame: StartFrame indicating pipeline start.
"""
await super().start(frame)
self._initialize_client()
self._config = self._create_recognition_config()
if self._config:
return
config = riva.client.StreamingRecognitionConfig(
config=riva.client.RecognitionConfig(
encoding=riva.client.AudioEncoding.LINEAR_PCM,
language_code=self._language_code,
model="",
max_alternatives=1,
profanity_filter=self._profanity_filter,
enable_automatic_punctuation=self._automatic_punctuation,
verbatim_transcripts=not self._no_verbatim_transcripts,
sample_rate_hertz=self.sample_rate,
audio_channel_count=1,
),
interim_results=True,
)
riva.client.add_word_boosting_to_config(
config, self._boosted_lm_words, self._boosted_lm_score
)
riva.client.add_endpoint_parameters_to_config(
config,
self._start_history,
self._start_threshold,
self._stop_history,
self._stop_history_eou,
self._stop_threshold,
self._stop_threshold_eou,
)
riva.client.add_custom_configuration_to_config(config, self._custom_configuration)
self._config = config
self._queue = asyncio.Queue()
if not self._thread_task:
self._thread_task = self.create_task(self._thread_task_handler())
if not self._response_task:
self._response_queue = asyncio.Queue()
self._response_task = self.create_task(self._response_task_handler())
logger.debug(f"Initialized NvidiaSTTService with model: {self.model_name}")
async def stop(self, frame: EndFrame):
"""Stop the NVIDIA Riva STT service and clean up resources.
@@ -273,10 +275,6 @@ class NvidiaSTTService(STTService):
await self.cancel_task(self._thread_task)
self._thread_task = None
if self._response_task:
await self.cancel_task(self._response_task)
self._response_task = None
def _response_handler(self):
responses = self._asr_service.streaming_response_generator(
audio_chunks=self,
@@ -285,9 +283,7 @@ class NvidiaSTTService(STTService):
for response in responses:
if not response.results:
continue
asyncio.run_coroutine_threadsafe(
self._response_queue.put(response), self.get_event_loop()
)
asyncio.run_coroutine_threadsafe(self._handle_response(response), self.get_event_loop())
async def _thread_task_handler(self):
try:
@@ -339,12 +335,6 @@ class NvidiaSTTService(STTService):
)
)
async def _response_task_handler(self):
while True:
response = await self._response_queue.get()
await self._handle_response(response)
self._response_queue.task_done()
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
"""Process audio data for speech-to-text transcription.
@@ -503,8 +493,6 @@ class NvidiaSegmentedSTTService(SegmentedSTTService):
auth = riva.client.Auth(None, self._use_ssl, self._server, metadata)
self._asr_service = riva.client.ASRService(auth)
logger.info(f"Initialized NvidiaSegmentedSTTService with model: {self.model_name}")
def _create_recognition_config(self):
"""Create the NVIDIA Riva ASR recognition configuration."""
# Create base configuration
@@ -572,6 +560,7 @@ class NvidiaSegmentedSTTService(SegmentedSTTService):
await super().start(frame)
self._initialize_client()
self._config = self._create_recognition_config()
logger.debug(f"Initialized NvidiaSegmentedSTTService with model: {self.model_name}")
async def set_language(self, language: Language):
"""Set the language for the STT service.
@@ -605,21 +594,12 @@ class NvidiaSegmentedSTTService(SegmentedSTTService):
Frame: TranscriptionFrame containing the transcribed text.
"""
try:
await self.start_processing_metrics()
await self.start_ttfb_metrics()
# Make sure the client is initialized
if self._asr_service is None:
self._initialize_client()
# Make sure the config is created
if self._config is None:
self._config = self._create_recognition_config()
# Type assertion to satisfy the IDE
assert self._asr_service is not None, "ASR service not initialized"
assert self._config is not None, "Recognition config not created"
await self.start_processing_metrics()
await self.start_ttfb_metrics()
# Process audio with NVIDIA Riva ASR - explicitly request non-future response
raw_response = self._asr_service.offline_recognize(audio, self._config, future=False)
@@ -627,43 +607,40 @@ class NvidiaSegmentedSTTService(SegmentedSTTService):
await self.stop_processing_metrics()
# Process the response - handle different possible return types
try:
# If it's a future-like object, get the result
if hasattr(raw_response, "result"):
response = raw_response.result()
else:
response = raw_response
# If it's a future-like object, get the result
if hasattr(raw_response, "result"):
response = raw_response.result()
else:
response = raw_response
# Process transcription results
transcription_found = False
# Process transcription results
transcription_found = False
# Now we can safely check results
# Type hint for the IDE
results = getattr(response, "results", [])
# Now we can safely check results
# Type hint for the IDE
results = getattr(response, "results", [])
for result in results:
alternatives = getattr(result, "alternatives", [])
if alternatives:
text = alternatives[0].transcript.strip()
if text:
logger.debug(f"Transcription: [{text}]")
yield TranscriptionFrame(
text,
self._user_id,
time_now_iso8601(),
self._language_enum,
)
transcription_found = True
for result in results:
alternatives = getattr(result, "alternatives", [])
if alternatives:
text = alternatives[0].transcript.strip()
if text:
logger.debug(f"Transcription: [{text}]")
yield TranscriptionFrame(
text,
self._user_id,
time_now_iso8601(),
self._language_enum,
)
transcription_found = True
await self._handle_transcription(text, True, self._language_enum)
if not transcription_found:
logger.debug("No transcription results found in NVIDIA Riva response")
except AttributeError as ae:
logger.error(f"Unexpected response structure from NVIDIA Riva: {ae}")
yield ErrorFrame(f"Unexpected NVIDIA Riva response format: {str(ae)}")
await self._handle_transcription(text, True, self._language_enum)
if not transcription_found:
logger.debug(f"{self}: No transcription results found in NVIDIA Riva response")
except AttributeError as ae:
logger.error(f"{self}: Unexpected response structure from NVIDIA Riva: {ae}")
yield ErrorFrame(f"{self}: Unexpected NVIDIA Riva response format: {str(ae)}")
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")

View File

@@ -12,7 +12,7 @@ gRPC API for high-quality speech synthesis.
import asyncio
import os
from typing import AsyncGenerator, Mapping, Optional
from typing import AsyncGenerator, AsyncIterable, Generator, Mapping, Optional
from pipecat.utils.tracing.service_decorators import traced_tts
@@ -25,6 +25,7 @@ from pydantic import BaseModel
from pipecat.frames.frames import (
ErrorFrame,
Frame,
StartFrame,
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
@@ -34,14 +35,12 @@ from pipecat.transcriptions.language import Language
try:
import riva.client
import riva.client.proto.riva_tts_pb2 as rtts
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use NVIDIA Riva TTS, you need to `pip install pipecat-ai[nvidia]`.")
raise Exception(f"Missing module: {e}")
NVIDIA_TTS_TIMEOUT_SECS = 5
class NvidiaTTSService(TTSService):
"""NVIDIA Riva text-to-speech service.
@@ -93,6 +92,7 @@ class NvidiaTTSService(TTSService):
params = params or NvidiaTTSService.InputParams()
self._server = server
self._api_key = api_key
self._voice_id = voice_id
self._language_code = params.language
@@ -102,18 +102,8 @@ class NvidiaTTSService(TTSService):
self.set_model_name(model_function_map.get("model_name"))
self.set_voice(voice_id)
metadata = [
["function-id", self._function_id],
["authorization", f"Bearer {api_key}"],
]
auth = riva.client.Auth(None, self._use_ssl, server, metadata)
self._service = riva.client.SpeechSynthesisService(auth)
# warm up the service
config_response = self._service.stub.GetRivaSynthesisConfig(
riva.client.proto.riva_tts_pb2.RivaSynthesisConfigRequest()
)
self._service = None
self._config = None
async def set_model(self, model: str):
"""Attempt to set the TTS model.
@@ -129,6 +119,39 @@ class NvidiaTTSService(TTSService):
f"{self.__class__.__name__}(api_key=<api_key>, model_function_map={example})"
)
def _initialize_client(self):
if self._service is not None:
return
metadata = [
["function-id", self._function_id],
["authorization", f"Bearer {self._api_key}"],
]
auth = riva.client.Auth(None, self._use_ssl, self._server, metadata)
self._service = riva.client.SpeechSynthesisService(auth)
def _create_synthesis_config(self):
if not self._service:
return
# warm up the service
config = self._service.stub.GetRivaSynthesisConfig(
riva.client.proto.riva_tts_pb2.RivaSynthesisConfigRequest()
)
return config
async def start(self, frame: StartFrame):
"""Start the Cartesia TTS service.
Args:
frame: The start frame containing initialization parameters.
"""
await super().start(frame)
self._initialize_client()
self._config = self._create_synthesis_config()
logger.debug(f"Initialized NvidiaTTSService with model: {self.model_name}")
@traced_tts
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
"""Generate speech from text using NVIDIA Riva TTS.
@@ -140,39 +163,43 @@ class NvidiaTTSService(TTSService):
Frame: Audio frames containing the synthesized speech data.
"""
def read_audio_responses(queue: asyncio.Queue):
def add_response(r):
asyncio.run_coroutine_threadsafe(queue.put(r), self.get_event_loop())
def read_audio_responses() -> Generator[rtts.SynthesizeSpeechResponse, None, None]:
responses = self._service.synthesize_online(
text,
self._voice_id,
self._language_code,
sample_rate_hz=self.sample_rate,
zero_shot_audio_prompt_file=None,
zero_shot_quality=self._quality,
custom_dictionary={},
)
return responses
def async_next(it):
try:
responses = self._service.synthesize_online(
text,
self._voice_id,
self._language_code,
sample_rate_hz=self.sample_rate,
zero_shot_audio_prompt_file=None,
zero_shot_quality=self._quality,
custom_dictionary={},
)
for r in responses:
add_response(r)
add_response(None)
except Exception as e:
logger.error(f"{self} exception: {e}")
add_response(None)
return next(it)
except StopIteration:
return None
await self.start_ttfb_metrics()
yield TTSStartedFrame()
logger.debug(f"{self}: Generating TTS [{text}]")
async def async_iterator(iterator) -> AsyncIterable[rtts.SynthesizeSpeechResponse]:
while True:
item = await asyncio.to_thread(async_next, iterator)
if item is None:
return
yield item
try:
queue = asyncio.Queue()
await asyncio.to_thread(read_audio_responses, queue)
assert self._service is not None, "TTS service not initialized"
assert self._config is not None, "Synthesis configuration not created"
# Wait for the thread to start.
resp = await asyncio.wait_for(queue.get(), timeout=NVIDIA_TTS_TIMEOUT_SECS)
while resp:
await self.start_ttfb_metrics()
yield TTSStartedFrame()
logger.debug(f"{self}: Generating TTS [{text}]")
responses = await asyncio.to_thread(read_audio_responses)
async for resp in async_iterator(responses):
await self.stop_ttfb_metrics()
frame = TTSAudioRawFrame(
audio=resp.audio,
@@ -180,10 +207,12 @@ class NvidiaTTSService(TTSService):
num_channels=1,
)
yield frame
resp = await asyncio.wait_for(queue.get(), timeout=NVIDIA_TTS_TIMEOUT_SECS)
await self.start_tts_usage_metrics(text)
yield TTSStoppedFrame()
except asyncio.TimeoutError:
logger.error(f"{self} timeout waiting for audio response")
yield ErrorFrame(error=f"{self} error: {e}")
await self.start_tts_usage_metrics(text)
yield TTSStoppedFrame()
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")

View File

@@ -25,3 +25,13 @@ with warnings.catch_warnings():
DeprecationWarning,
stacklevel=2,
)
__all__ = [
"AzureRealtimeLLMService",
"InputAudioNoiseReduction",
"InputAudioTranscription",
"SemanticTurnDetection",
"SessionProperties",
"TurnDetection",
"OpenAIRealtimeLLMService",
]

View File

@@ -7,3 +7,13 @@ from .events import (
TurnDetection,
)
from .openai import OpenAIRealtimeBetaLLMService
__all__ = [
"AzureRealtimeBetaLLMService",
"InputAudioNoiseReduction",
"InputAudioTranscription",
"SemanticTurnDetection",
"SessionProperties",
"TurnDetection",
"OpenAIRealtimeBetaLLMService",
]

View File

@@ -10,7 +10,7 @@ This module provides an OpenPipe-specific implementation of the OpenAI LLM servi
enabling integration with OpenPipe's fine-tuning and monitoring capabilities.
"""
from typing import Dict, List, Optional
from typing import Dict, Optional
from loguru import logger

View File

@@ -152,6 +152,8 @@ class STTService(AIService):
self._settings[key] = value
if key == "language":
await self.set_language(value)
elif key == "language":
await self.set_language(value)
elif key == "model":
self.set_model_name(value)
else:

View File

@@ -19,7 +19,6 @@ from typing import Any, Dict, List, Literal, Optional, Union
import aiohttp
from loguru import logger
from openai.types import chat as openai_chat_types
from pydantic import BaseModel, Field
from pipecat.adapters.schemas.tools_schema import ToolsSchema

Some files were not shown because too many files have changed in this diff Show More