Compare commits

..

72 Commits

Author SHA1 Message Date
mattie ruth backman
50b19a9e77 minor updates to get started and working on latest modal 2025-04-23 21:25:45 -04:00
Aleix Conchillo Flaqué
f9d1a53e28 Merge pull request #1609 from pipecat-ai/aleix/pyproject-py-typed
pyproject: fix license fields
2025-04-21 16:14:22 -07:00
Mark Backman
3f3010af79 Add a SmartTurnMetricsData class, emitted by Metrics Frame in response to smart turn responses 2025-04-21 18:56:14 -04:00
Aleix Conchillo Flaqué
a02d47ddbd Merge pull request #1625 from 0xPatryk/patch-1
Fixed AttributeError: object has no attribute '_sample_rate"
2025-04-21 15:40:54 -07:00
Patryk
a649aff3e7 Fixed AttributeError: 'OpenAITTSService' object has no attribute '_sample_rate' 2025-04-21 11:03:45 +02:00
Mark Backman
747a821943 Merge pull request #1614 from pipecat-ai/mb/changelog-for-1525
Add CHANGELOG entry for PR 1525
2025-04-19 07:10:13 -04:00
Aleix Conchillo Flaqué
010db3ccd5 README: minor update 2025-04-18 20:57:05 -07:00
Aleix Conchillo Flaqué
db773b8b93 Merge pull request #1616 from pipecat-ai/aleix/new-readme
make README more fun
2025-04-18 18:15:35 -07:00
Mark Backman
16b7bf71b4 Additional README changes 2025-04-18 21:00:57 -04:00
Aleix Conchillo Flaqué
82d19508a4 make README more fun 2025-04-18 14:37:28 -07:00
Mark Backman
dc3646f0e7 Merge pull request #1615 from pipecat-ai/mb/issue-template
Add issue templates and move the pull request template to .github
2025-04-18 14:58:09 -04:00
Mark Backman
62e659cd3a Update to .yml templates so that types are used 2025-04-18 13:21:01 -04:00
Mark Backman
b2945f44fd Add issue templates and move the pull request template to .github 2025-04-18 12:17:46 -04:00
Mark Backman
618fbef81c Add CHANGELOG entry for PR 1525 2025-04-18 11:32:34 -04:00
Mark Backman
70c42dfa6e Merge pull request #1525 from shaiyon/google-default-creds
Enable usage of Application Default Credentials in Google services
2025-04-18 11:31:08 -04:00
Mark Backman
9ab374dd1f Merge pull request #1612 from pipecat-ai/mb/07g-stt-model
examples: Fix 07g by changing STT model
2025-04-18 08:04:20 -04:00
Mark Backman
cc6d284417 examples: Fix 07g by changing STT model 2025-04-18 07:13:34 -04:00
Filipi da Silva Fuchter
f77d8f0b6f Merge pull request #1611 from pipecat-ai/smart_turn_changelog
Mentioning the Smart Turn Detection into the changelog.
2025-04-17 23:02:57 -03:00
Varun Singh
9c0beb05cf Merge pull request #1597 from pipecat-ai/vr000m-opus-added
Changing default codec to OPUS for telephony
2025-04-17 18:42:12 -07:00
Aleix Conchillo Flaqué
858981c404 Merge pull request #1610 from pipecat-ai/aleix/add-base-turn-analyzer
audio: add BaseTurnAnalyzer class
2025-04-17 18:38:08 -07:00
Aleix Conchillo Flaqué
9eed225aa2 audio: add BaseTurnAnalyzer class 2025-04-17 18:37:52 -07:00
Filipi Fuchter
9f7371e485 Mentioning the Smart Turn Detection into the changelog. 2025-04-17 22:31:40 -03:00
Aleix Conchillo Flaqué
d77c37ff14 pyproject: add py.typed (PEP 561) 2025-04-17 17:29:04 -07:00
Aleix Conchillo Flaqué
b4916f9dae pyproject: fix license fields 2025-04-17 17:28:14 -07:00
Aleix Conchillo Flaqué
004a920920 Merge pull request #1563 from Bnowako/packaging-type-information
Add marker file for static type checkers
2025-04-17 17:26:15 -07:00
Filipi da Silva Fuchter
203c5a3a60 Merge pull request #1592 from pipecat-ai/smart_turn
Smart turn
2025-04-17 18:21:47 -03:00
Filipi Fuchter
7f6fb1754b Merge remote-tracking branch 'origin/smart_turn' into smart_turn 2025-04-17 17:53:53 -03:00
Filipi Fuchter
a390ce13a4 Removing the UserEndOfTurnFrame 2025-04-17 17:53:31 -03:00
Filipi da Silva Fuchter
61d31d1c40 Restoring stop_secs to default value.
Co-authored-by: Mark Backman <mark@daily.co>
2025-04-17 17:44:47 -03:00
Filipi da Silva Fuchter
e872ff943a Using the default model for OpenAi.
Co-authored-by: Mark Backman <mark@daily.co>
2025-04-17 17:43:39 -03:00
Filipi da Silva Fuchter
c71005e249 Using the default model for OpenAi.
Co-authored-by: Mark Backman <mark@daily.co>
2025-04-17 17:43:23 -03:00
Filipi Fuchter
6e06bf97c0 Preventing emitting the UserStartedSpeaking event multiple times. 2025-04-17 17:21:29 -03:00
Filipi Fuchter
a80dc94e91 Fixing ruff format. 2025-04-17 16:47:17 -03:00
Filipi Fuchter
3ea9cfd251 Keeping the _speech_triggered as true if the state is incomplete. 2025-04-17 16:46:15 -03:00
Filipi Fuchter
a80f82cdb6 Moving the environment variables to inside the demo. 2025-04-17 16:28:50 -03:00
Aleix Conchillo Flaqué
d24bab354f Merge pull request #1607 from pipecat-ai/aleix/fix-websocket-disconnects
services: fix TTS websocket services disconnections
2025-04-17 12:27:52 -07:00
Filipi Fuchter
53ee3fb64c Changing the log levels used in smart_turn 2025-04-17 16:14:13 -03:00
Filipi Fuchter
3599761e4e Changing the default behavior to only use the last vad segment, and increasing the default stop_secs to 3 2025-04-17 16:07:03 -03:00
Aleix Conchillo Flaqué
c0b3fe3985 services: only read from TTS websocket if websocket connection established 2025-04-17 11:54:07 -07:00
Aleix Conchillo Flaqué
497d48b6c8 services: fix TTS websocket services disconnections
Fixes #1467
2025-04-17 11:29:49 -07:00
Filipi Fuchter
e179916c9c Creating a new param use_only_last_vad_segment 2025-04-17 11:49:51 -03:00
Filipi Fuchter
b0b38beb19 Returning the max duration back to 8 seconds. 2025-04-17 11:39:48 -03:00
Filipi Fuchter
8577139d21 Fixing to keep the last max samples. 2025-04-17 11:39:06 -03:00
Filipi Fuchter
e2fbbb4b40 Renaming the smart turn classes. 2025-04-17 10:43:21 -03:00
Filipi Fuchter
88ce117e84 Changing the max duration default value to 16 seconds. 2025-04-17 10:35:13 -03:00
Filipi Fuchter
266537c3f4 Fixing to respect the stop_secs. 2025-04-17 10:07:08 -03:00
Filipi Fuchter
230d2f80fa Merge branch 'main' into smart_turn 2025-04-17 09:36:30 -03:00
Filipi Fuchter
3f0688aefa Testing smart turn using stop_secs as 5 seconds 2025-04-17 09:36:03 -03:00
Filipi da Silva Fuchter
5be3e6979e Merge pull request #1533 from pipecat-ai/daily_small_webrtc
Example interoping between SmallWebRTC and Daily
2025-04-17 09:19:23 -03:00
Filipi Fuchter
a458c1e92b Improving the README and fixing the env.example 2025-04-16 18:38:48 -03:00
Filipi Fuchter
5bbf1d0209 Example interoping between SmallWebRTC and Daily. 2025-04-16 17:14:12 -03:00
Filipi Fuchter
8e36bdbed7 Adding some comments to the code. 2025-04-16 09:11:27 -03:00
Filipi Fuchter
cd8bd7f487 Adding some comments to the code. 2025-04-16 08:58:40 -03:00
Filipi Fuchter
5fa47b7a5c Adding the dependencies for the remote smart turn 2025-04-16 08:45:01 -03:00
Filipi Fuchter
616961b487 Stop removing segments from the end 2025-04-16 08:04:38 -03:00
Filipi Fuchter
650d4d9ee2 Changing the start speech time and adding logs. 2025-04-16 07:55:20 -03:00
Filipi Fuchter
2627cb6bf2 Allowing to define SmartTurnParams 2025-04-16 07:13:13 -03:00
Filipi Fuchter
0e4115049b Refactoring to use keep alive sessions. 2025-04-16 06:44:57 -03:00
Filipi Fuchter
3ebef9346f Adding support for RemoteSmartTurn 2025-04-16 06:33:42 -03:00
Filipi Fuchter
3e2d21779f Refactoring the BaseEndOfTurnAnalyzer to include most of the logic 2025-04-16 06:11:56 -03:00
Filipi Fuchter
cfefcac35f Resetting the silence frames when the user speaks. 2025-04-15 20:51:36 -03:00
Filipi Fuchter
57b39c084f Triggering to check if the turn is complete based on the maximum timeout 2025-04-15 20:42:41 -03:00
Filipi Fuchter
11b6de0900 Triggering to check if the turn is complete each time the user stops speaking based on the vad 2025-04-15 17:28:00 -03:00
Varun Singh
824bc9bf16 Update dial.js 2025-04-15 12:48:33 -07:00
Varun Singh
d0ddef6c12 Update server.py 2025-04-15 12:37:33 -07:00
Filipi Fuchter
e6325a8229 Integrating with the smart turn model to predict 2025-04-15 16:01:09 -03:00
Filipi Fuchter
3588b06718 Adding missing torch dependency. 2025-04-15 12:28:36 -03:00
Filipi Fuchter
73874f6ec0 Loading the smart turn model. 2025-04-15 12:11:06 -03:00
Filipi Fuchter
6ab9a8ad7f Starting to create a local smart turn 2025-04-15 11:24:39 -03:00
Filipi Fuchter
821e303249 Bringing Aleix initial implementation for the smart turn. 2025-04-15 10:21:40 -03:00
Bnowako
61cba0136f Add marker file for static type checkers 2025-04-11 11:00:57 +02:00
Shaiyon Hariri
af23200511 Use default google creds as fallback when not provided in llm_vertex,stt, and tts 2025-04-03 16:42:58 -04:00
50 changed files with 1713 additions and 239 deletions

87
.github/ISSUE_TEMPLATE/1-bug_report.yml vendored Normal file
View File

@@ -0,0 +1,87 @@
name: Bug report
description: Report a bug or unexpected behavior
type: Bug
body:
- type: markdown
attributes:
value: |
## Bug Report
Thank you for taking the time to fill out this bug report.
- type: markdown
attributes:
value: |
### Environment
- type: input
id: pipecat-version
attributes:
label: pipecat version
description: Which version are you using?
placeholder: e.g., 0.0.63
validations:
required: true
- type: input
id: python-version
attributes:
label: Python version
description: Which Python version are you using?
placeholder: e.g., 3.12.8
validations:
required: true
- type: input
id: os
attributes:
label: Operating System
description: Which OS are you using?
placeholder: e.g., Ubuntu 24.04, Windows 11, macOS 12.5
validations:
required: true
- type: textarea
id: description
attributes:
label: Issue description
description: Provide a clear description of the issue.
validations:
required: true
- type: textarea
id: repro
attributes:
label: Reproduction steps
description: List the steps to reproduce the issue.
placeholder: |
1. Do this...
2. Then do that...
3. Observe the error...
validations:
required: true
- type: textarea
id: expected
attributes:
label: Expected behavior
description: What did you expect to happen?
validations:
required: true
- type: textarea
id: actual
attributes:
label: Actual behavior
description: What actually happened?
validations:
required: true
- type: textarea
id: logs
attributes:
label: Logs
description: If applicable, include any relevant logs or error messages
render: shell
validations:
required: false

67
.github/ISSUE_TEMPLATE/2-question.yml vendored Normal file
View File

@@ -0,0 +1,67 @@
name: Question
description: Ask a question or get help
type: Question
body:
- type: markdown
attributes:
value: |
## Question
Use this form to ask a question about pipecat.
- type: markdown
attributes:
value: |
### Environment (if applicable)
- type: input
id: pipecat-version
attributes:
label: pipecat version
description: Which version are you using? (if applicable)
placeholder: e.g., 0.0.63
validations:
required: false
- type: input
id: python-version
attributes:
label: Python version
description: Which Python version are you using? (if applicable)
placeholder: e.g., 3.12.8
validations:
required: false
- type: input
id: os
attributes:
label: Operating System
description: Which OS are you using? (if applicable)
placeholder: e.g., Ubuntu 24.04, Windows 11, macOS 12.5
validations:
required: false
- type: textarea
id: question
attributes:
label: Question
description: Provide your question in detail here.
validations:
required: true
- type: textarea
id: tried
attributes:
label: What I've tried
description: Describe what you've already tried or research you've done.
placeholder: I've looked at the documentation and tried...
validations:
required: false
- type: textarea
id: context
attributes:
label: Context
description: Any additional context or information that might help others understand your question better.
validations:
required: false

View File

@@ -0,0 +1,52 @@
name: Feature request
description: Suggest an enhancement or new feature
type: Enhancement
body:
- type: markdown
attributes:
value: |
## Feature Request
Thank you for suggesting an enhancement to pipecat.
- type: textarea
id: problem
attributes:
label: Problem Statement
description: A clear description of the problem this feature would solve.
placeholder: I'm always frustrated when...
validations:
required: true
- type: textarea
id: solution
attributes:
label: Proposed Solution
description: A clear and concise description of what you want to happen.
validations:
required: true
- type: textarea
id: alternatives
attributes:
label: Alternative Solutions
description: Any alternative solutions or features you've considered.
validations:
required: false
- type: textarea
id: context
attributes:
label: Additional Context
description: Add any other context, mockups, or screenshots about the feature request here.
placeholder: You can drag and drop images here to include them.
validations:
required: false
- type: checkboxes
id: contribution
attributes:
label: Would you be willing to help implement this feature?
options:
- label: Yes, I'd like to contribute
- label: No, I'm just suggesting

View File

@@ -0,0 +1,82 @@
name: Service Issue
description: An issue with a third-party service
type: Service Issue
body:
- type: markdown
attributes:
value: |
## Service Issue
Use this form to report an issue with a third-party service integration.
- type: input
id: pipecat-version
attributes:
label: pipecat version
description: Which version are you using?
placeholder: e.g., 0.0.63
validations:
required: true
- type: input
id: service-name
attributes:
label: Service Name
description: Which third-party service is having issues?
placeholder: e.g., OpenAI, ElevenLabs, Anthropic
validations:
required: true
- type: input
id: service-version
attributes:
label: Service or model version
description: Which version of the service API or model are you using?
placeholder: e.g., v1, gpt-4.1
validations:
required: false
- type: textarea
id: description
attributes:
label: Issue Description
description: Provide a clear description of the service issue.
validations:
required: true
- type: textarea
id: reproduction
attributes:
label: Reproduction Steps
description: Provide steps to reproduce the issue.
placeholder: |
1. Configure service X
2. Call method Y
3. See error Z
validations:
required: true
- type: textarea
id: expected
attributes:
label: Expected Behavior
description: What did you expect to happen?
validations:
required: true
- type: textarea
id: actual
attributes:
label: Actual Behavior
description: What actually happened?
validations:
required: true
- type: textarea
id: logs
attributes:
label: Error Logs
description: If available, include any error messages or logs.
render: shell
validations:
required: false

View File

@@ -0,0 +1,56 @@
name: New Service
description: Request to support a new third-party service
type: New Service
body:
- type: markdown
attributes:
value: |
## New Service Request
Use this form to request support for a new third-party service in pipecat.
- type: input
id: service-name
attributes:
label: Service Name
description: What is the name of the third-party service?
placeholder: e.g., NewAPI, SomeService
validations:
required: true
- type: input
id: service-website
attributes:
label: Service Website
description: Link to the service's website or documentation
placeholder: e.g., https://newapi.com
validations:
required: true
- type: textarea
id: service-description
attributes:
label: Service Description
description: Briefly describe what this service does and how it works.
validations:
required: true
- type: textarea
id: api-info
attributes:
label: API Information
description: If available, provide details about the service's API.
placeholder: |
- API documentation link
- Authentication method
- Key endpoints you'd like supported
validations:
required: false
- type: checkboxes
id: contribution
attributes:
label: Would you be willing to help implement this service?
options:
- label: Yes, I'd like to contribute
- label: No, I'm just suggesting

74
.github/ISSUE_TEMPLATE/6-dependency.yml vendored Normal file
View File

@@ -0,0 +1,74 @@
name: Dependency Issue
description: An issue with a Pipecat dependency (not a third-party service)
type: Dependency Issue
body:
- type: markdown
attributes:
value: |
## Dependency Issue
Use this form to report an issue with a Pipecat dependency.
- type: input
id: pipecat-version
attributes:
label: pipecat version
description: Which version are you using?
placeholder: e.g., 0.0.63
validations:
required: true
- type: input
id: dependency-name
attributes:
label: Dependency Name
description: Which Pipecat dependency is causing the issue?
placeholder: e.g., openai, anthropic, fastapi
validations:
required: true
- type: input
id: dependency-version
attributes:
label: Dependency Version
description: Which version of the dependency are you using?
placeholder: e.g., 1.2.3
validations:
required: true
- type: textarea
id: description
attributes:
label: Issue Description
description: Provide a clear description of the dependency issue.
validations:
required: true
- type: textarea
id: impact
attributes:
label: Impact
description: How is this dependency issue affecting your usage of pipecat?
validations:
required: true
- type: textarea
id: reproduction
attributes:
label: Reproduction Steps
description: If applicable, provide steps to reproduce the issue.
placeholder: |
1. Install dependency X
2. Run command Y
3. See error Z
validations:
required: false
- type: textarea
id: logs
attributes:
label: Error Logs
description: If applicable, include any relevant error messages or logs.
render: shell
validations:
required: false

View File

@@ -0,0 +1,70 @@
name: Troubleshooting
description: Help with a specific use case
type: Troubleshooting
body:
- type: markdown
attributes:
value: |
## Troubleshooting Request
Use this form to get help with a specific use case or implementation.
- type: input
id: pipecat-version
attributes:
label: pipecat version
description: Which version are you using?
placeholder: e.g., 0.0.63
validations:
required: true
- type: input
id: python-version
attributes:
label: Python version
description: Which version of Python are you using?
placeholder: e.g., 3.12.8
validations:
required: true
- type: input
id: os
attributes:
label: Operating System
description: Which OS are you using?
placeholder: e.g., Ubuntu 24.04, Windows 11, macOS 12.5
validations:
required: true
- type: textarea
id: use-case
attributes:
label: Use Case Description
description: Describe what you're trying to accomplish with pipecat.
validations:
required: true
- type: textarea
id: current-approach
attributes:
label: Current Approach
description: What have you tried so far? Include code snippets if relevant.
render: python
validations:
required: true
- type: textarea
id: errors
attributes:
label: Errors or Unexpected Behavior
description: Describe any errors or unexpected behavior you're encountering.
validations:
required: true
- type: textarea
id: additional-context
attributes:
label: Additional Context
description: Any other information that might help us understand your situation.
validations:
required: false

1
.github/ISSUE_TEMPLATE/config.yml vendored Normal file
View File

@@ -0,0 +1 @@
blank_issues_enabled: false

View File

@@ -9,6 +9,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Added `SmartTurnMetricsData`, which contains end-of-turn prediction metrics,
to the `MetricsFrame`. Using `MetricsFrame`, you can now retrieve prediction
confidence scores and processing time metrics from the smart turn analyzers.
- Added support for Application Default Credentials in Google services,
`GoogleSTTService`, `GoogleTTSService`, and `GoogleVertexLLMService`.
- Added support for Smart Turn Detection via the `turn_analyzer` transport
parameter. You can now choose between `SmartTurnAnalyzer()` for remote
inference or `LocalCoreMLSmartTurnAnalyzer()` for on-device inference using
Core ML.
- `DeepgramTTSService` accepts `base_url` argument again, allowing you to
connect to an on-prem service.
@@ -55,6 +67,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Fixed an issue that would cause TTS websocket-based services to not cleanup
resources properly when disconnecting.
- Fixed a `TavusVideoService` issue that was causing audio choppiness.
- Fixed an issue in `SmallWebRTCTransport` where an error was thrown if the

233
README.md
View File

@@ -1,43 +1,72 @@
<h1><div align="center">
 <img alt="pipecat" width="300px" height="auto" src="https://raw.githubusercontent.com/pipecat-ai/pipecat/main/pipecat.png">
<img alt="pipecat" width="300px" height="auto" src="https://raw.githubusercontent.com/pipecat-ai/pipecat/main/pipecat.png">
</div></h1>
[![PyPI](https://img.shields.io/pypi/v/pipecat-ai)](https://pypi.org/project/pipecat-ai) ![Tests](https://github.com/pipecat-ai/pipecat/actions/workflows/tests.yaml/badge.svg) [![codecov](https://codecov.io/gh/pipecat-ai/pipecat/graph/badge.svg?token=LNVUIVO4Y9)](https://codecov.io/gh/pipecat-ai/pipecat) [![Docs](https://img.shields.io/badge/Documentation-blue)](https://docs.pipecat.ai) [![Discord](https://img.shields.io/discord/1239284677165056021)](https://discord.gg/pipecat)
Pipecat is an open source Python framework for building voice and multimodal conversational agents. It handles the complex orchestration of AI services, network transport, audio processing, and multimodal interactions, letting you focus on creating engaging experiences.
# 🎙️ Pipecat: Real-Time Voice & Multimodal AI Agents
## What you can build
**Pipecat** is an open-source Python framework for building real-time voice and multimodal conversational agents. Orchestrate audio and video, AI services, different transports, and conversation pipelines effortlessly—so you can focus on what makes your agent unique.
- **Voice Assistants**: [Natural, real-time conversations with AI](https://demo.dailybots.ai/)
- **Interactive Agents**: Personal coaches and meeting assistants
- **Multimodal Apps**: Combine voice, video, images, and text
- **Creative Tools**: [Story-telling experiences](https://storytelling-chatbot.fly.dev/) and social companions
- **Business Solutions**: [Customer intake flows](https://www.youtube.com/watch?v=lDevgsp9vn0) and support bots
- **Complex conversational flows**: [Refer to Pipecat Flows](https://github.com/pipecat-ai/pipecat-flows) to learn more
## 🚀 What You Can Build
## See it in action
- **Voice Assistants** natural, streaming conversations with AI
- **AI Companions** coaches, meeting assistants, characters
- **Multimodal Interfaces** voice, video, images, and more
- **Interactive Storytelling** creative tools with generative media
- **Business Agents** customer intake, support bots, guided flows
- **Complex Dialog Systems** design logic with structured conversations
🧭 Looking to build structured conversations? Check out [Pipecat Flows](https://github.com/pipecat-ai/pipecat-flows) for managing complex conversational states and transitions.
## 🧠 Why Pipecat?
- **Voice-first**: Integrates speech recognition, text-to-speech, and conversation handling
- **Pluggable**: Supports many AI services and tools
- **Composable Pipelines**: Build complex behavior from modular components
- **Real-Time**: Ultra-low latency interaction with different transports (e.g. WebSockets or WebRTC)
## 🎬 See it in action
<p float="left">
<a href="https://github.com/pipecat-ai/pipecat/tree/main/examples/simple-chatbot"><img src="https://raw.githubusercontent.com/pipecat-ai/pipecat/main/examples/simple-chatbot/image.png" width="280" /></a>&nbsp;
<a href="https://github.com/pipecat-ai/pipecat/tree/main/examples/storytelling-chatbot"><img src="https://raw.githubusercontent.com/pipecat-ai/pipecat/main/examples/storytelling-chatbot/image.png" width="280" /></a>
<a href="https://github.com/pipecat-ai/pipecat/tree/main/examples/simple-chatbot"><img src="https://raw.githubusercontent.com/pipecat-ai/pipecat/main/examples/simple-chatbot/image.png" width="400" /></a>&nbsp;
<a href="https://github.com/pipecat-ai/pipecat/tree/main/examples/storytelling-chatbot"><img src="https://raw.githubusercontent.com/pipecat-ai/pipecat/main/examples/storytelling-chatbot/image.png" width="400" /></a>
<br/>
<a href="https://github.com/pipecat-ai/pipecat/tree/main/examples/translation-chatbot"><img src="https://raw.githubusercontent.com/pipecat-ai/pipecat/main/examples/translation-chatbot/image.png" width="280" /></a>&nbsp;
<a href="https://github.com/pipecat-ai/pipecat/tree/main/examples/moondream-chatbot"><img src="https://raw.githubusercontent.com/pipecat-ai/pipecat/main/examples/moondream-chatbot/image.png" width="280" /></a>
<a href="https://github.com/pipecat-ai/pipecat/tree/main/examples/translation-chatbot"><img src="https://raw.githubusercontent.com/pipecat-ai/pipecat/main/examples/translation-chatbot/image.png" width="400" /></a>&nbsp;
<a href="https://github.com/pipecat-ai/pipecat/tree/main/examples/moondream-chatbot"><img src="https://raw.githubusercontent.com/pipecat-ai/pipecat/main/examples/moondream-chatbot/image.png" width="400" /></a>
</p>
## Key features
## 📱 Client SDKs
- **Voice-first Design**: Built-in speech recognition, TTS, and conversation handling
- **Flexible Integration**: Works with popular AI services (OpenAI, ElevenLabs, etc.)
- **Pipeline Architecture**: Build complex apps from simple, reusable components
- **Real-time Processing**: Frame-based pipeline architecture for fluid interactions
- **Production Ready**: Enterprise-grade WebRTC and Websocket support
You can connect to Pipecat from any platform using our official SDKs:
💡 Looking to build structured conversations? Check out [Pipecat Flows](https://github.com/pipecat-ai/pipecat-flows) for managing complex conversational states and transitions.
| Platform | SDK Repo | Description |
| -------- | ------------------------------------------------------------------------------ | -------------------------------- |
| Web | [pipecat-client-web](https://github.com/pipecat-ai/pipecat-client-web) | JavaScript and React client SDKs |
| iOS | [pipecat-client-ios](https://github.com/pipecat-ai/pipecat-client-ios) | Swift SDK for iOS |
| Android | [pipecat-client-android](https://github.com/pipecat-ai/pipecat-client-android) | Kotlin SDK for Android |
| C++ | [pipecat-client-cxx](https://github.com/pipecat-ai/pipecat-client-cxx) | C++ client SDK |
## Getting started
## 🧩 Available services
You can get started with Pipecat running on your local machine, then move your agent processes to the cloud when youre ready. You can also add a 📞 telephone number, 🖼️ image output, 📺 video input, use different LLMs, and more.
| Category | Services |
| ------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), [Azure](https://docs.pipecat.ai/server/services/stt/azure), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [Fal Wizper](https://docs.pipecat.ai/server/services/stt/fal), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), [Google](https://docs.pipecat.ai/server/services/stt/google), [Groq (Whisper)](https://docs.pipecat.ai/server/services/stt/groq), [OpenAI (Whisper)](https://docs.pipecat.ai/server/services/stt/openai), [Parakeet (NVIDIA)](https://docs.pipecat.ai/server/services/stt/parakeet), [Ultravox](https://docs.pipecat.ai/server/services/stt/ultravox), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) |
| LLMs | [Anthropic](https://docs.pipecat.ai/server/services/llm/anthropic), [Azure](https://docs.pipecat.ai/server/services/llm/azure), [Cerebras](https://docs.pipecat.ai/server/services/llm/cerebras), [DeepSeek](https://docs.pipecat.ai/server/services/llm/deepseek), [Fireworks AI](https://docs.pipecat.ai/server/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/server/services/llm/gemini), [Grok](https://docs.pipecat.ai/server/services/llm/grok), [Groq](https://docs.pipecat.ai/server/services/llm/groq), [NVIDIA NIM](https://docs.pipecat.ai/server/services/llm/nim), [Ollama](https://docs.pipecat.ai/server/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/server/services/llm/openai), [OpenRouter](https://docs.pipecat.ai/server/services/llm/openrouter), [Perplexity](https://docs.pipecat.ai/server/services/llm/perplexity), [Qwen](https://docs.pipecat.ai/server/services/llm/qwen), [Together AI](https://docs.pipecat.ai/server/services/llm/together) |
| Text-to-Speech | [AWS](https://docs.pipecat.ai/server/services/tts/aws), [Azure](https://docs.pipecat.ai/server/services/tts/azure), [Cartesia](https://docs.pipecat.ai/server/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/tts/elevenlabs), [FastPitch (NVIDIA)](https://docs.pipecat.ai/server/services/tts/fastpitch), [Fish](https://docs.pipecat.ai/server/services/tts/fish), [Google](https://docs.pipecat.ai/server/services/tts/google), [LMNT](https://docs.pipecat.ai/server/services/tts/lmnt), [Neuphonic](https://docs.pipecat.ai/server/services/tts/neuphonic), [OpenAI](https://docs.pipecat.ai/server/services/tts/openai), [Piper](https://docs.pipecat.ai/server/services/tts/piper), [PlayHT](https://docs.pipecat.ai/server/services/tts/playht), [Rime](https://docs.pipecat.ai/server/services/tts/rime), [XTTS](https://docs.pipecat.ai/server/services/tts/xtts) |
| Speech-to-Speech | [Gemini Multimodal Live](https://docs.pipecat.ai/server/services/s2s/gemini), [OpenAI Realtime](https://docs.pipecat.ai/server/services/s2s/openai) |
| Transport | [Daily (WebRTC)](https://docs.pipecat.ai/server/services/transport/daily), [FastAPI Websocket](https://docs.pipecat.ai/server/services/transport/fastapi-websocket), [SmallWebRTCTransport](https://docs.pipecat.ai/server/services/transport/small-webrtc), [WebSocket Server](https://docs.pipecat.ai/server/services/transport/websocket-server), Local |
| Video | [Tavus](https://docs.pipecat.ai/server/services/video/tavus), [Simli](https://docs.pipecat.ai/server/services/video/simli) |
| Memory | [mem0](https://docs.pipecat.ai/server/services/memory/mem0) |
| Vision & Image | [fal](https://docs.pipecat.ai/server/services/image-generation/fal), [Google Imagen](https://docs.pipecat.ai/server/services/image-generation/fal), [Moondream](https://docs.pipecat.ai/server/services/vision/moondream) |
| Audio Processing | [Silero VAD](https://docs.pipecat.ai/server/utilities/audio/silero-vad-analyzer), [Krisp](https://docs.pipecat.ai/server/utilities/audio/krisp-filter), [Koala](https://docs.pipecat.ai/server/utilities/audio/koala-filter), [Noisereduce](https://docs.pipecat.ai/server/utilities/audio/noisereduce-filter) |
| Analytics & Metrics | [Canonical AI](https://docs.pipecat.ai/server/services/analytics/canonical), [Sentry](https://docs.pipecat.ai/server/services/analytics/sentry) |
📚 [View full services documentation →](https://docs.pipecat.ai/server/services/supported-services)
## ⚡ Getting started
You can get started with Pipecat running on your local machine, then move your agent processes to the cloud when youre ready.
```shell
# Install the module
@@ -53,141 +82,51 @@ To keep things lightweight, only the core framework is included by default. If y
pip install "pipecat-ai[option,...]"
```
### Available services
| Category | Services | Install Command Example |
| ------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | --------------------------------------- |
| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), [Azure](https://docs.pipecat.ai/server/services/stt/azure), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [Fal Wizper](https://docs.pipecat.ai/server/services/stt/fal), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), [Google](https://docs.pipecat.ai/server/services/stt/google), [Groq (Whisper)](https://docs.pipecat.ai/server/services/stt/groq), [OpenAI (Whisper)](https://docs.pipecat.ai/server/services/stt/openai), [Parakeet (NVIDIA)](https://docs.pipecat.ai/server/services/stt/parakeet), [Ultravox](https://docs.pipecat.ai/server/services/stt/ultravox), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) | `pip install "pipecat-ai[deepgram]"` |
| LLMs | [Anthropic](https://docs.pipecat.ai/server/services/llm/anthropic), [Azure](https://docs.pipecat.ai/server/services/llm/azure), [Cerebras](https://docs.pipecat.ai/server/services/llm/cerebras), [DeepSeek](https://docs.pipecat.ai/server/services/llm/deepseek), [Fireworks AI](https://docs.pipecat.ai/server/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/server/services/llm/gemini), [Grok](https://docs.pipecat.ai/server/services/llm/grok), [Groq](https://docs.pipecat.ai/server/services/llm/groq), [NVIDIA NIM](https://docs.pipecat.ai/server/services/llm/nim), [Ollama](https://docs.pipecat.ai/server/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/server/services/llm/openai), [OpenRouter](https://docs.pipecat.ai/server/services/llm/openrouter), [Perplexity](https://docs.pipecat.ai/server/services/llm/perplexity), [Qwen](https://docs.pipecat.ai/server/services/llm/qwen), [Together AI](https://docs.pipecat.ai/server/services/llm/together) | `pip install "pipecat-ai[openai]"` |
| Text-to-Speech | [AWS](https://docs.pipecat.ai/server/services/tts/aws), [Azure](https://docs.pipecat.ai/server/services/tts/azure), [Cartesia](https://docs.pipecat.ai/server/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/tts/elevenlabs), [FastPitch (NVIDIA)](https://docs.pipecat.ai/server/services/tts/fastpitch), [Fish](https://docs.pipecat.ai/server/services/tts/fish), [Google](https://docs.pipecat.ai/server/services/tts/google), [LMNT](https://docs.pipecat.ai/server/services/tts/lmnt), [Neuphonic](https://docs.pipecat.ai/server/services/tts/neuphonic), [OpenAI](https://docs.pipecat.ai/server/services/tts/openai), [Piper](https://docs.pipecat.ai/server/services/tts/piper), [PlayHT](https://docs.pipecat.ai/server/services/tts/playht), [Rime](https://docs.pipecat.ai/server/services/tts/rime), [XTTS](https://docs.pipecat.ai/server/services/tts/xtts) | `pip install "pipecat-ai[cartesia]"` |
| Speech-to-Speech | [Gemini Multimodal Live](https://docs.pipecat.ai/server/services/s2s/gemini), [OpenAI Realtime](https://docs.pipecat.ai/server/services/s2s/openai) | `pip install "pipecat-ai[google]"` |
| Transport | [Daily (WebRTC)](https://docs.pipecat.ai/server/services/transport/daily), [FastAPI Websocket](https://docs.pipecat.ai/server/services/transport/fastapi-websocket), [SmallWebRTCTransport](https://docs.pipecat.ai/server/services/transport/small-webrtc), [WebSocket Server](https://docs.pipecat.ai/server/services/transport/websocket-server), Local | `pip install "pipecat-ai[daily]"` |
| Video | [Tavus](https://docs.pipecat.ai/server/services/video/tavus), [Simli](https://docs.pipecat.ai/server/services/video/simli) | `pip install "pipecat-ai[tavus,simli]"` |
| Memory | [mem0](https://docs.pipecat.ai/server/services/memory/mem0) | `pip install "pipecat-ai[mem0]"` |
| Vision & Image | [fal](https://docs.pipecat.ai/server/services/image-generation/fal), [Google Imagen](https://docs.pipecat.ai/server/services/image-generation/fal), [Moondream](https://docs.pipecat.ai/server/services/vision/moondream) | `pip install "pipecat-ai[moondream]"` |
| Audio Processing | [Silero VAD](https://docs.pipecat.ai/server/utilities/audio/silero-vad-analyzer), [Krisp](https://docs.pipecat.ai/server/utilities/audio/krisp-filter), [Koala](https://docs.pipecat.ai/server/utilities/audio/koala-filter), [Noisereduce](https://docs.pipecat.ai/server/utilities/audio/noisereduce-filter) | `pip install "pipecat-ai[silero]"` |
| Analytics & Metrics | [Canonical AI](https://docs.pipecat.ai/server/services/analytics/canonical), [Sentry](https://docs.pipecat.ai/server/services/analytics/sentry) | `pip install "pipecat-ai[canonical]"` |
📚 [View full services documentation →](https://docs.pipecat.ai/server/services/supported-services)
## Code examples
## 🧪 Code examples
- [Foundational](https://github.com/pipecat-ai/pipecat/tree/main/examples/foundational) — small snippets that build on each other, introducing one or two concepts at a time
- [Example apps](https://github.com/pipecat-ai/pipecat/tree/main/examples/) — complete applications that you can use as starting points for development
## A simple voice agent running locally
## 🛠️ Hacking on the framework itself
Here is a very basic Pipecat bot that greets a user when they join a real-time session. We'll use [Daily](https://daily.co) for real-time media transport, and [Cartesia](https://cartesia.ai/) for text-to-speech.
1. Set up a virtual environment before following these instructions. From the root of the repo:
```python
import asyncio
```shell
python3 -m venv venv
source venv/bin/activate
```
from pipecat.frames.frames import TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.runner import PipelineRunner
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
2. Install the development dependencies:
async def main():
# Use Daily as a real-time media transport (WebRTC)
transport = DailyTransport(
room_url=...,
token="", # leave empty. Note: token is _not_ your api key
bot_name="Bot Name",
params=DailyParams(audio_out_enabled=True))
```shell
pip install -r dev-requirements.txt
```
# Use Cartesia for Text-to-Speech
tts = CartesiaTTSService(
api_key=...,
voice_id=...
)
3. Install the git pre-commit hooks (these help ensure your code follows project rules):
# Simple pipeline that will process text to speech and output the result
pipeline = Pipeline([tts, transport.output()])
```shell
pre-commit install
```
# Create Pipecat processor that can run one or more pipelines tasks
runner = PipelineRunner()
4. Install the `pipecat-ai` package locally in editable mode:
# Assign the task callable to run the pipeline
task = PipelineTask(pipeline)
```shell
pip install -e .
```
# Register an event handler to play audio when a
# participant joins the transport WebRTC session
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
participant_name = participant.get("info", {}).get("userName", "")
# Queue a TextFrame that will get spoken by the TTS service (Cartesia)
await task.queue_frame(TextFrame(f"Hello there, {participant_name}!"))
> The `-e` or `--editable` option allows you to modify the code without reinstalling.
# Register an event handler to exit the application when the user leaves.
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.cancel()
5. Include optional dependencies as needed. For example:
# Run the pipeline task
await runner.run(task)
```shell
pip install -e ".[daily,deepgram,cartesia,openai,silero]"
```
if __name__ == "__main__":
asyncio.run(main())
```
6. (Optional) If you want to use this package from another directory:
Run it with:
```shell
python app.py
```
Daily provides a prebuilt WebRTC user interface. While the app is running, you can visit at `https://<yourdomain>.daily.co/<room_url>` and listen to the bot say hello!
## WebRTC for production use
WebSockets are fine for server-to-server communication or for initial development. But for production use, youll need client-server audio to use a protocol designed for real-time media transport. (For an explanation of the difference between WebSockets and WebRTC, see [this post.](https://www.daily.co/blog/how-to-talk-to-an-llm-with-your-voice/#webrtc))
One way to get up and running quickly with WebRTC is to sign up for a Daily developer account. Daily gives you SDKs and global infrastructure for audio (and video) routing. Every account gets 10,000 audio/video/transcription minutes free each month.
Sign up [here](https://dashboard.daily.co/u/signup) and [create a room](https://docs.daily.co/reference/rest-api/rooms) in the developer Dashboard.
## Hacking on the framework itself
_Note: You may need to set up a virtual environment before following these instructions. From the root of the repo:_
```shell
python3 -m venv venv
source venv/bin/activate
```
Install the development dependencies:
```shell
pip install -r dev-requirements.txt
```
Install the git pre-commit hooks (these help ensure your code follows project rules):
```shell
pre-commit install
```
Install the `pipecat-ai` package locally in editable mode:
```shell
pip install -e .
```
The `-e` or `--editable` option allows you to modify the code without reinstalling.
To include optional dependencies, add them to the install command. For example:
```shell
pip install -e ".[daily,deepgram,cartesia,openai,silero]" # Updated for the services you're using
```
If you want to use this package from another directory:
```shell
pip install "path_to_this_repo[option,...]"
```
```shell
pip install "path_to_this_repo[option,...]"
```
### Running tests
@@ -197,11 +136,11 @@ From the root directory, run:
pytest
```
## Setting up your editor
### Setting up your editor
This project uses strict [PEP 8](https://peps.python.org/pep-0008/) formatting via [Ruff](https://github.com/astral-sh/ruff).
### Emacs
#### Emacs
You can use [use-package](https://github.com/jwiegley/use-package) to install [emacs-lazy-ruff](https://github.com/christophermadsen/emacs-lazy-ruff) package and configure `ruff` arguments:
@@ -223,7 +162,7 @@ You can use [use-package](https://github.com/jwiegley/use-package) to install [e
:hook ((python-mode . pyvenv-auto-run)))
```
### Visual Studio Code
#### Visual Studio Code
Install the
[Ruff](https://marketplace.visualstudio.com/items?itemName=charliermarsh.ruff) extension. Then edit the user settings (_Ctrl-Shift-P_ `Open User Settings (JSON)`) and set it as the default Python formatter, and enable formatting on save:
@@ -235,7 +174,7 @@ Install the
}
```
### PyCharm
#### PyCharm
`ruff` was installed in the `venv` environment described before, now to enable autoformatting on save, go to `File` -> `Settings` -> `Tools` -> `File Watchers` and add a new watcher with the following settings:
@@ -245,7 +184,7 @@ Install the
4. **Arguments**: `format $FilePath$`
5. **Program**: `$PyInterpreterDirectory$/ruff`
## Contributing
## 🤝 Contributing
We welcome contributions from the community! Whether you're fixing bugs, improving documentation, or adding new features, here's how you can help:
@@ -258,7 +197,7 @@ Before submitting a pull request, please check existing issues and PRs to avoid
We aim to review all contributions promptly and provide constructive feedback to help get your changes merged.
## Getting help
## 🛟 Getting help
➡️ [Join our Discord](https://discord.gg/pipecat)

View File

@@ -1,22 +0,0 @@
# Description
Is this reporting a bug or feature request?
If reporting a bug, please fill out the following:
### Environment
- pipecat-ai version:
- python version:
- OS:
### Issue description
Provide a clear description of the issue.
### Repro steps
List the steps to reproduce the issue.
### Expected behavior
### Actual behavior
### Logs

View File

@@ -92,4 +92,8 @@ ASSEMBLYAI_API_KEY=...
OPENROUTER_API_KEY=...
# Piper
PIPER_BASE_URL=...
PIPER_BASE_URL=...
# Smart turn
LOCAL_SMART_TURN_MODEL_PATH=
REMOTE_SMART_TURN_URL=

View File

@@ -10,24 +10,27 @@ import aiohttp
import modal
from bot import _voice_bot_process
from fastapi import HTTPException
from fastapi.responses import JSONResponse
from fastapi.responses import RedirectResponse
from loguru import logger
MAX_SESSION_TIME = 15 * 60 # 15 minutes
app = modal.App("pipecat-modal")
image = modal.Image.debian_slim(python_version="3.12").pip_install_from_requirements(
"requirements.txt"
image = (
modal.Image.debian_slim(python_version="3.13")
.apt_install("ffmpeg")
.pip_install_from_requirements("requirements.txt")
.pip_install("pipecat-ai[daily,silero,cartesia,openai]")
.add_local_python_source("bot")
)
app = modal.App("pipecat-modal", image=image)
@app.function(
image=image,
cpu=1.0,
secrets=[modal.Secret.from_dotenv()],
keep_warm=1,
min_containers=1,
enable_memory_snapshot=True,
max_inputs=1, # Do not reuse instances across requests
retries=0,
@@ -40,7 +43,7 @@ def launch_bot_process(room_url: str, token: str):
image=image,
secrets=[modal.Secret.from_dotenv()],
)
@modal.web_endpoint(method="POST")
@modal.fastapi_endpoint(method="GET")
async def start():
from pipecat.transports.services.helpers.daily_rest import (
DailyRESTHelper,
@@ -77,4 +80,4 @@ async def start():
# Return room URL to the user to join
# Note: in production, you would want to return a token to the user
return JSONResponse(content={"room_url": room.url, token: token})
return RedirectResponse(room.url)

View File

@@ -1,5 +1,4 @@
python-dotenv==1.0.1
modal==0.71.3
pipecat-ai[daily,silero,cartesia,openai]==0.0.52
fastapi==0.115.6
aiohttp==3.11.11

View File

@@ -141,6 +141,7 @@ async def dial(request: RoomRequest, raw_request: Request):
"display_name": request.From,
"sip_mode": "dial-in",
"num_endpoints": 2 if request.call_transfer is not None else 1,
"codecs": {"audio": ["OPUS"]},
}
daily_room_properties["sip"] = sip_config

View File

@@ -103,6 +103,7 @@ export default async function handler(req, res) {
display_name: From,
sip_mode: 'dial-in',
num_endpoints: call_transfer !== null ? 2 : 1,
codecs: {"audio": ["OPUS"]},
};
daily_room_properties.sip = sip_config;
}
@@ -172,4 +173,4 @@ export const config = {
sizeLimit: '1mb',
},
},
};
};

View File

@@ -42,7 +42,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
# Register an event handler so we can play the audio when the client joins
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
await task.queue_frames([TTSSpeakFrame(f"<spell>Hello there!</spell>"), EndFrame()])
await task.queue_frames([TTSSpeakFrame(f"Hello there!"), EndFrame()])
runner = PipelineRunner(handle_sigint=False)

View File

@@ -40,7 +40,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
stt = OpenAISTTService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o-transcribe-latest",
model="gpt-4o-transcribe",
prompt="Expect words related to dogs, such as breed names.",
)

View File

@@ -0,0 +1,111 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.turn.smart_turn import SmartTurnAnalyzer
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
load_dotenv(override=True)
async def run_bot(webrtc_connection: SmallWebRTCConnection):
logger.info(f"Starting bot")
remote_smart_turn_url = os.getenv("REMOTE_SMART_TURN_URL")
transport = SmallWebRTCTransport(
webrtc_connection=webrtc_connection,
params=TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
vad_audio_passthrough=True,
turn_analyzer=SmartTurnAnalyzer(url=remote_smart_turn_url),
),
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
@transport.event_handler("on_client_closed")
async def on_client_closed(transport, client):
logger.info(f"Client closed connection")
await task.cancel()
runner = PipelineRunner(handle_sigint=False)
await runner.run(task)
if __name__ == "__main__":
from run import main
main()

View File

@@ -0,0 +1,129 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.local_smart_turn import LocalCoreMLSmartTurnAnalyzer
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
load_dotenv(override=True)
async def run_bot(webrtc_connection: SmallWebRTCConnection):
logger.info(f"Starting bot")
# To use this locally, set the environment variable LOCAL_SMART_TURN_MODEL_PATH
# to the path where the smart-turn repo is cloned.
#
# Example setup:
#
# # Git LFS (Large File Storage)
# brew install git-lfs
# # Hugging Face uses LFS to store large model files, including .mlpackage
# git lfs install
# # Clone the repo with the smart_turn_classifier.mlpackage
# git clone https://huggingface.co/pipecat-ai/smart-turn
#
# Then set the env variable:
# export LOCAL_SMART_TURN_MODEL_PATH=./smart-turn
# or add it to your .env file
smart_turn_model_path = os.getenv("LOCAL_SMART_TURN_MODEL_PATH")
transport = SmallWebRTCTransport(
webrtc_connection=webrtc_connection,
params=TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
vad_audio_passthrough=True,
turn_analyzer=LocalCoreMLSmartTurnAnalyzer(
smart_turn_model_path=smart_turn_model_path, params=SmartTurnParams()
),
),
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
@transport.event_handler("on_client_closed")
async def on_client_closed(transport, client):
logger.info(f"Client closed connection")
await task.cancel()
runner = PipelineRunner(handle_sigint=False)
await runner.run(task)
if __name__ == "__main__":
from run import main
main()

View File

@@ -0,0 +1,61 @@
# SmallWebRTC and Daily
A Pipecat example demonstrating how to interoperate audio and video between `SmallWebRTCTransport` and `DailyTransport`.
## 🚀 Quick Start
### 1⃣ Start the Bot Server
#### 🔧 Set Up the Environment
1. Create and activate a virtual environment:
```bash
python3 -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
```
2. Install dependencies:
```bash
pip install -r requirements.txt
```
3. Configure environment variables:
- Copy `env.example` to `.env`
```bash
cp env.example .env
```
- Add your API keys
#### ▶️ Run the Server
```bash
python server.py
```
### 1⃣ Connect the first client using Daily Prebuilt
- Open your browser and navigate to the same URL that you configured inside your `.env` file:
- `DAILY_SAMPLE_ROOM_URL`
### 2⃣ Connect the second client using SmallWebRTC Prebuilt UI
- Open your browser and navigate to:
👉 http://localhost:7860
- (Or use your custom port, if configured)
## ⚠️ Important Note
Ensure the bot server is running before using any client implementations.
## 📌 Requirements
- Python **3.10+**
- Node.js **16+** (for JavaScript components)
- Google API Key
- Modern web browser with WebRTC support
---
### 💡 Notes
- Ensure all dependencies are installed before running the server.
- Check the `.env` file for missing configurations.
- WebRTC requires a secure environment (HTTPS) for full functionality in production.
Happy coding! 🎉

View File

@@ -0,0 +1,128 @@
#
# Copyright (c) 2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
import sys
from dotenv import load_dotenv
from loguru import logger
from pipecat.frames.frames import (
InputAudioRawFrame,
InputImageRawFrame,
OutputAudioRawFrame,
OutputImageRawFrame,
)
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.frame_processor import Frame, FrameDirection, FrameProcessor
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
class MirrorProcessor(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, InputAudioRawFrame):
await self.push_frame(
OutputAudioRawFrame(
audio=frame.audio,
sample_rate=frame.sample_rate,
num_channels=frame.num_channels,
)
)
elif isinstance(frame, InputImageRawFrame):
await self.push_frame(
OutputImageRawFrame(image=frame.image, size=frame.size, format=frame.format)
)
else:
await self.push_frame(frame, direction)
async def run_bot(webrtc_connection):
pipecat_transport = SmallWebRTCTransport(
webrtc_connection=webrtc_connection,
params=TransportParams(
camera_in_enabled=True,
camera_out_enabled=True,
camera_out_is_live=True,
audio_in_enabled=True,
audio_out_enabled=True,
camera_out_width=1280,
camera_out_height=720,
vad_enabled=False,
),
)
room_url = os.getenv("DAILY_SAMPLE_ROOM_URL", "")
daily_transport = DailyTransport(
room_url,
None,
"SmallWebRTC",
params=DailyParams(
camera_in_enabled=True,
camera_out_enabled=True,
camera_out_is_live=True,
audio_in_enabled=True,
audio_out_enabled=True,
camera_out_width=1280,
camera_out_height=720,
vad_enabled=False,
),
)
pipeline = Pipeline(
[
ParallelPipeline(
[
daily_transport.input(),
MirrorProcessor(),
pipecat_transport.output(),
],
[
pipecat_transport.input(),
MirrorProcessor(),
daily_transport.output(),
],
)
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=False,
),
)
@daily_transport.event_handler("on_participant_joined")
async def on_participant_joined(transport, participant):
await transport.capture_participant_video(participant["id"])
@pipecat_transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info("Pipecat Client connected")
@pipecat_transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info("Pipecat Client disconnected")
@pipecat_transport.event_handler("on_client_closed")
async def on_client_closed(transport, client):
logger.info("Pipecat Client closed")
await task.cancel()
runner = PipelineRunner(handle_sigint=False)
await runner.run(task)

View File

@@ -0,0 +1,2 @@
DAILY_API_KEY=
DAILY_SAMPLE_ROOM_URL=

View File

@@ -0,0 +1,5 @@
python-dotenv
fastapi[all]
uvicorn
aiortc
pipecat-ai[silero, webrtc, daily]

View File

@@ -0,0 +1,89 @@
import argparse
import asyncio
import logging
from contextlib import asynccontextmanager
from typing import Dict
import uvicorn
from bot import run_bot
from dotenv import load_dotenv
from fastapi import BackgroundTasks, FastAPI
from fastapi.responses import RedirectResponse
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
# Load environment variables
load_dotenv(override=True)
logger = logging.getLogger("pc")
app = FastAPI()
# Store connections by pc_id
pcs_map: Dict[str, SmallWebRTCConnection] = {}
ice_servers = ["stun:stun.l.google.com:19302"]
# Mount the frontend at /
app.mount("/prebuilt", SmallWebRTCPrebuiltUI)
@app.get("/", include_in_schema=False)
async def root_redirect():
return RedirectResponse(url="/prebuilt/")
@app.post("/api/offer")
async def offer(request: dict, background_tasks: BackgroundTasks):
pc_id = request.get("pc_id")
if pc_id and pc_id in pcs_map:
pipecat_connection = pcs_map[pc_id]
logger.info(f"Reusing existing connection for pc_id: {pc_id}")
await pipecat_connection.renegotiate(
sdp=request["sdp"], type=request["type"], restart_pc=request.get("restart_pc", False)
)
else:
pipecat_connection = SmallWebRTCConnection(ice_servers)
await pipecat_connection.initialize(sdp=request["sdp"], type=request["type"])
@pipecat_connection.event_handler("closed")
async def handle_disconnected(webrtc_connection: SmallWebRTCConnection):
logger.info(f"Discarding peer connection for pc_id: {webrtc_connection.pc_id}")
pcs_map.pop(webrtc_connection.pc_id, None)
background_tasks.add_task(run_bot, pipecat_connection)
answer = pipecat_connection.get_answer()
# Updating the peer connection inside the map
pcs_map[answer["pc_id"]] = pipecat_connection
return answer
@asynccontextmanager
async def lifespan(app: FastAPI):
yield # Run app
coros = [pc.close() for pc in pcs_map.values()]
await asyncio.gather(*coros)
pcs_map.clear()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="WebRTC demo")
parser.add_argument(
"--host", default="localhost", help="Host for HTTP server (default: localhost)"
)
parser.add_argument(
"--port", type=int, default=7860, help="Port for HTTP server (default: 7860)"
)
parser.add_argument("--verbose", "-v", action="count")
args = parser.parse_args()
if args.verbose:
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)
uvicorn.run(app, host=args.host, port=args.port)

View File

@@ -6,14 +6,14 @@ build-backend = "setuptools.build_meta"
name = "pipecat-ai"
dynamic = ["version"]
description = "An open source framework for voice (and multimodal) assistants"
license = { text = "BSD 2-Clause License" }
license = "BSD-2-Clause"
license-files = ["LICENSE"]
readme = "README.md"
requires-python = ">=3.10"
keywords = ["webrtc", "audio", "video", "ai"]
classifiers = [
"Development Status :: 5 - Production/Stable",
"Intended Audience :: Developers",
"License :: OSI Approved :: BSD License",
"Topic :: Communications :: Conferencing",
"Topic :: Multimedia :: Sound/Audio",
"Topic :: Multimedia :: Video",
@@ -79,6 +79,8 @@ qwen = []
rime = [ "websockets~=13.1" ]
riva = [ "nvidia-riva-client~=2.19.0" ]
sentry = [ "sentry-sdk~=2.23.1" ]
local-smart-turn = [ "coremltools>=8.0", "transformers", "torch==2.5.0", "torchaudio==2.5.0" ]
remote-smart-turn = []
silero = [ "onnxruntime~=1.20.1" ]
simli = [ "simli-ai~=0.1.10"]
soundfile = [ "soundfile~=0.13.0" ]
@@ -90,9 +92,11 @@ websocket = [ "websockets~=13.1", "fastapi~=0.115.6" ]
whisper = [ "faster-whisper~=1.1.1" ]
[tool.setuptools.packages.find]
# All the following settings are optional:
where = ["src"]
[tool.setuptools.package-data]
"pipecat" = ["py.typed"]
[tool.pytest.ini_options]
addopts = "--verbose"
testpaths = ["tests"]

View File

View File

@@ -0,0 +1,176 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import time
from abc import abstractmethod
from typing import Any, Dict, Optional, Tuple
import numpy as np
from loguru import logger
from pydantic import BaseModel
from pipecat.audio.turn.base_turn_analyzer import BaseTurnAnalyzer, EndOfTurnState
from pipecat.metrics.metrics import MetricsData, SmartTurnMetricsData
# Default timing parameters
STOP_SECS = 3
PRE_SPEECH_MS = 0
MAX_DURATION_SECONDS = 8 # Max allowed segment duration
USE_ONLY_LAST_VAD_SEGMENT = True
class SmartTurnParams(BaseModel):
stop_secs: float = STOP_SECS
pre_speech_ms: float = PRE_SPEECH_MS
max_duration_secs: float = MAX_DURATION_SECONDS
# not exposing this for now yet until the model can handle it.
# use_only_last_vad_segment: bool = USE_ONLY_LAST_VAD_SEGMENT
class BaseSmartTurn(BaseTurnAnalyzer):
def __init__(
self, *, sample_rate: Optional[int] = None, params: SmartTurnParams = SmartTurnParams()
):
super().__init__(sample_rate=sample_rate)
self._params = params
# Configuration
self._stop_ms = self._params.stop_secs * 1000 # silence threshold in ms
# Inference state
self._audio_buffer = []
self._speech_triggered = False
self._silence_ms = 0
self._speech_start_time = None
@property
def speech_triggered(self) -> bool:
return self._speech_triggered
def append_audio(self, buffer: bytes, is_speech: bool) -> EndOfTurnState:
# Convert raw audio to float32 format and append to the buffer
audio_int16 = np.frombuffer(buffer, dtype=np.int16)
audio_float32 = np.frombuffer(audio_int16, dtype=np.int16).astype(np.float32) / 32768.0
self._audio_buffer.append((time.time(), audio_float32))
state = EndOfTurnState.INCOMPLETE
if is_speech:
# Reset silence tracking on speech
self._silence_ms = 0
self._speech_triggered = True
if self._speech_start_time is None:
self._speech_start_time = time.time()
else:
if self._speech_triggered:
chunk_duration_ms = len(audio_int16) / (self._sample_rate / 1000)
self._silence_ms += chunk_duration_ms
# If silence exceeds threshold, mark end of turn
if self._silence_ms >= self._stop_ms:
logger.debug(
f"End of Turn complete due to stop_secs. Silence in ms: {self._silence_ms}"
)
state = EndOfTurnState.COMPLETE
self._clear(state)
else:
# Trim buffer to prevent unbounded growth before speech
max_buffer_time = (
(self._params.pre_speech_ms / 1000)
+ self._params.stop_secs
+ self._params.max_duration_secs
)
while (
self._audio_buffer and self._audio_buffer[0][0] < time.time() - max_buffer_time
):
self._audio_buffer.pop(0)
return state
def analyze_end_of_turn(self) -> Tuple[EndOfTurnState, Optional[MetricsData]]:
state, result = self._process_speech_segment(self._audio_buffer)
if state == EndOfTurnState.COMPLETE or USE_ONLY_LAST_VAD_SEGMENT:
self._clear(state)
logger.debug(f"End of Turn result: {state}")
return state, result
def _clear(self, turn_state: EndOfTurnState):
# If the state is still incomplete, keep the _speech_triggered as True
self._speech_triggered = turn_state == EndOfTurnState.INCOMPLETE
self._audio_buffer = []
self._speech_start_time = None
self._silence_ms = 0
def _process_speech_segment(self, audio_buffer) -> Tuple[EndOfTurnState, Optional[MetricsData]]:
state = EndOfTurnState.INCOMPLETE
if not audio_buffer:
return state, None
# Extract recent audio segment for prediction
start_time = self._speech_start_time - (self._params.pre_speech_ms / 1000)
start_index = 0
for i, (t, _) in enumerate(audio_buffer):
if t >= start_time:
start_index = i
break
end_index = len(audio_buffer) - 1
# Extract the audio segment
segment_audio_chunks = [chunk for _, chunk in audio_buffer[start_index : end_index + 1]]
segment_audio = np.concatenate(segment_audio_chunks)
# Limit maximum duration
max_samples = int(self._params.max_duration_secs * self.sample_rate)
if len(segment_audio) > max_samples:
# slices the array to keep the last max_samples samples, discarding the earlier part.
segment_audio = segment_audio[-max_samples:]
result_data = None
if len(segment_audio) > 0:
start_time = time.perf_counter()
result = self._predict_endpoint(segment_audio)
state = (
EndOfTurnState.COMPLETE if result["prediction"] == 1 else EndOfTurnState.INCOMPLETE
)
end_time = time.perf_counter()
# Calculate processing time
e2e_processing_time_ms = (end_time - start_time) * 1000
# Prepare the result data
result_data = SmartTurnMetricsData(
processor="BaseSmartTurn",
is_complete=result["prediction"] == 1,
probability=result["probability"],
inference_time_ms=result.get("inference_time", 0) * 1000,
server_total_time_ms=result.get("total_time", 0) * 1000,
e2e_processing_time_ms=e2e_processing_time_ms,
)
logger.trace(f"Prediction: {'Complete' if result_data.is_complete else 'Incomplete'}")
logger.trace(f"Probability of complete: {result_data.probability:.4f}")
logger.trace(f"Inference time: {result_data.inference_time_ms:.2f}ms")
logger.trace(f"Server total time: {result_data.server_total_time_ms:.2f}ms")
logger.trace(f"E2E processing time: {result_data.e2e_processing_time_ms:.2f}ms")
else:
logger.trace(f"params: {self._params}, stop_ms: {self._stop_ms}")
logger.trace("Captured empty audio segment, skipping prediction.")
return state, result_data
@abstractmethod
def _predict_endpoint(self, buffer: np.ndarray) -> Dict[str, Any]:
"""Abstract method to predict if a turn has ended based on audio.
Args:
buffer: Float32 numpy array of audio samples at 16kHz.
Returns:
Dictionary with:
- prediction: 1 if turn is complete, else 0
- probability: Confidence of the prediction
"""
pass

View File

@@ -0,0 +1,80 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from abc import ABC, abstractmethod
from enum import Enum
from typing import Optional, Tuple
from pipecat.metrics.metrics import MetricsData
class EndOfTurnState(Enum):
COMPLETE = 1
INCOMPLETE = 2
class BaseTurnAnalyzer(ABC):
"""Abstract base class for analyzing user end of turn.
This class inherits from BaseObject to leverage its event handling system
while still defining an abstract interface through abstract methods.
"""
def __init__(self, *, sample_rate: Optional[int] = None):
self._init_sample_rate = sample_rate
self._sample_rate = 0
@property
def sample_rate(self) -> int:
"""Returns the current sample rate.
Returns:
int: The effective sample rate for audio processing.
"""
return self._sample_rate
def set_sample_rate(self, sample_rate: int):
"""Sets the sample rate for audio processing.
If the initial sample rate was provided, it will use that; otherwise, it sets to
the provided sample rate.
Args:
sample_rate (int): The sample rate to set.
"""
self._sample_rate = self._init_sample_rate or sample_rate
@property
@abstractmethod
def speech_triggered(self) -> bool:
"""Determines if speech has been detected.
Returns:
bool: True if speech is triggered, otherwise False.
"""
pass
@abstractmethod
def append_audio(self, buffer: bytes, is_speech: bool) -> EndOfTurnState:
"""Appends audio data for analysis.
Args:
buffer (bytes): The audio data to append.
is_speech (bool): Indicates whether the appended audio is speech or not.
Returns:
EndOfTurnState: The resulting state after appending the audio.
"""
pass
@abstractmethod
def analyze_end_of_turn(self) -> Tuple[EndOfTurnState, Optional[MetricsData]]:
"""Analyzes if an end of turn has occurred based on the audio input.
Returns:
EndOfTurnState: The result of the end of turn analysis.
"""
pass

View File

@@ -0,0 +1,65 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from typing import Dict
import numpy as np
import torch
from loguru import logger
from pipecat.audio.turn.base_smart_turn import BaseSmartTurn
try:
import coremltools as ct
from transformers import AutoFeatureExtractor
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
"In order to use the LocalSmartTurnAnalyzer, you need to `pip install pipecat-ai[local-smart-turn]`."
)
raise Exception(f"Missing module: {e}")
class LocalCoreMLSmartTurnAnalyzer(BaseSmartTurn):
def __init__(self, smart_turn_model_path: str, **kwargs):
super().__init__(**kwargs)
if not smart_turn_model_path:
logger.error("smart_turn_model_path is not set.")
raise Exception("smart_turn_model_path must be provided.")
core_ml_model_path = f"{smart_turn_model_path}/coreml/smart_turn_classifier.mlpackage"
logger.debug("Loading Local Smart Turn model...")
# Only load the processor, not the torch model
self._turn_processor = AutoFeatureExtractor.from_pretrained(smart_turn_model_path)
self._turn_model = ct.models.MLModel(core_ml_model_path)
logger.debug("Loaded Local Smart Turn")
def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, any]:
inputs = self._turn_processor(
audio_array,
sampling_rate=16000,
padding="max_length",
truncation=True,
max_length=800, # Maximum length as specified in training
return_attention_mask=True,
return_tensors="pt",
)
output = self._turn_model.predict(dict(inputs))
logits = output["logits"] # Core ML returns numpy array
logits_tensor = torch.tensor(logits)
probabilities = torch.nn.functional.softmax(logits_tensor, dim=1)
completion_prob = probabilities[0, 1].item() # Probability of class 1 (Complete)
prediction = 1 if completion_prob > 0.5 else 0
return {
"prediction": prediction,
"probability": completion_prob,
}

View File

@@ -0,0 +1,75 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import io
import os
from typing import Dict
import numpy as np
import requests
from loguru import logger
from pipecat.audio.turn.base_smart_turn import BaseSmartTurn
class SmartTurnAnalyzer(BaseSmartTurn):
def __init__(self, url: str, **kwargs):
super().__init__(**kwargs)
self.remote_smart_turn_url = url
if not self.remote_smart_turn_url:
logger.error("remote_smart_turn_url is not set.")
raise Exception("remote_smart_turn_url must be provided.")
# Use a session to reuse connections (keep-alive)
self.session = requests.Session()
self.session.headers.update({"Connection": "keep-alive"})
def _serialize_array(self, audio_array: np.ndarray) -> bytes:
logger.trace("Serializing NumPy array to bytes...")
buffer = io.BytesIO()
np.save(buffer, audio_array)
serialized_bytes = buffer.getvalue()
logger.trace(f"Serialized size: {len(serialized_bytes)} bytes")
return serialized_bytes
def _send_raw_request(self, data_bytes: bytes):
headers = {"Content-Type": "application/octet-stream"}
logger.trace(
f"Sending {len(data_bytes)} bytes as raw body to {self.remote_smart_turn_url}..."
)
try:
response = self.session.post(
self.remote_smart_turn_url,
data=data_bytes,
headers=headers,
timeout=60,
)
logger.trace("\n--- Response ---")
logger.trace(f"Status Code: {response.status_code}")
if response.ok:
try:
logger.trace("Response JSON:")
logger.trace(response.json())
return response.json()
except requests.exceptions.JSONDecodeError:
logger.trace("Response Content (non-JSON):")
logger.trace(response.text)
else:
logger.trace("Response Content (Error):")
logger.trace(response.text)
response.raise_for_status()
except requests.exceptions.RequestException as e:
logger.error(f"Failed to send raw request to Daily Smart Turn: {e}")
raise Exception("Failed to send raw request to Daily Smart Turn.")
def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, any]:
serialized_array = self._serialize_array(audio_array)
return self._send_raw_request(serialized_array)

View File

@@ -30,3 +30,13 @@ class LLMUsageMetricsData(MetricsData):
class TTSUsageMetricsData(MetricsData):
value: int
class SmartTurnMetricsData(MetricsData):
"""Metrics data for smart turn predictions."""
is_complete: bool
probability: float
inference_time_ms: float
server_total_time_ms: float
e2e_processing_time_ms: float

0
src/pipecat/py.typed Normal file
View File

View File

@@ -185,7 +185,8 @@ class CartesiaTTSService(AudioContextWordTTSService):
async def _connect(self):
await self._connect_websocket()
if not self._receive_task:
if self._websocket and not self._receive_task:
self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
async def _disconnect(self):
@@ -197,7 +198,7 @@ class CartesiaTTSService(AudioContextWordTTSService):
async def _connect_websocket(self):
try:
if self._websocket:
if self._websocket and self._websocket.open:
return
logger.debug("Connecting to Cartesia")
self._websocket = await websockets.connect(
@@ -215,11 +216,11 @@ class CartesiaTTSService(AudioContextWordTTSService):
if self._websocket:
logger.debug("Disconnecting from Cartesia")
await self._websocket.close()
self._websocket = None
self._context_id = None
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
finally:
self._context_id = None
self._websocket = None
def _get_websocket(self):
if self._websocket:
@@ -279,7 +280,7 @@ class CartesiaTTSService(AudioContextWordTTSService):
logger.debug(f"{self}: Generating TTS [{text}]")
try:
if not self._websocket:
if not self._websocket or self._websocket.closed:
await self._connect()
if not self._context_id:

View File

@@ -309,10 +309,10 @@ class ElevenLabsTTSService(InterruptibleWordTTSService):
async def _connect(self):
await self._connect_websocket()
if not self._receive_task:
if self._websocket and not self._receive_task:
self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
if not self._keepalive_task:
if self._websocket and not self._keepalive_task:
self._keepalive_task = self.create_task(self._keepalive_task_handler())
async def _disconnect(self):
@@ -328,7 +328,7 @@ class ElevenLabsTTSService(InterruptibleWordTTSService):
async def _connect_websocket(self):
try:
if self._websocket:
if self._websocket and self._websocket.open:
return
logger.debug("Connecting to ElevenLabs")
@@ -375,11 +375,11 @@ class ElevenLabsTTSService(InterruptibleWordTTSService):
logger.debug("Disconnecting from ElevenLabs")
await self._websocket.send(json.dumps({"text": ""}))
await self._websocket.close()
self._websocket = None
self._started = False
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
finally:
self._started = False
self._websocket = None
def _get_websocket(self):
if self._websocket:
@@ -419,7 +419,7 @@ class ElevenLabsTTSService(InterruptibleWordTTSService):
logger.debug(f"{self}: Generating TTS [{text}]")
try:
if not self._websocket:
if not self._websocket or self._websocket.closed:
await self._connect()
try:

View File

@@ -104,7 +104,8 @@ class FishAudioTTSService(InterruptibleTTSService):
async def _connect(self):
await self._connect_websocket()
if not self._receive_task:
if self._websocket and not self._receive_task:
self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
async def _disconnect(self):
@@ -116,7 +117,7 @@ class FishAudioTTSService(InterruptibleTTSService):
async def _connect_websocket(self):
try:
if self._websocket:
if self._websocket and self._websocket.open:
return
logger.debug("Connecting to Fish Audio")
@@ -141,16 +142,17 @@ class FishAudioTTSService(InterruptibleTTSService):
stop_message = {"event": "stop"}
await self._websocket.send(ormsgpack.packb(stop_message))
await self._websocket.close()
self._websocket = None
self._request_id = None
self._started = False
except Exception as e:
logger.error(f"Error closing websocket: {e}")
finally:
self._request_id = None
self._started = False
self._websocket = None
async def flush_audio(self):
"""Flush any buffered audio by sending a flush event to Fish Audio."""
logger.trace(f"{self}: Flushing audio buffers")
if not self._websocket:
if not self._websocket or self._websocket.closed:
return
flush_message = {"event": "flush"}
await self._get_websocket().send(ormsgpack.packb(flush_message))

View File

@@ -285,7 +285,7 @@ class GladiaSTTService(STTService):
settings = self._prepare_settings()
response = await self._setup_gladia(settings)
self._websocket = await websockets.connect(response["url"])
if not self._receive_task:
if self._websocket and not self._receive_task:
self._receive_task = self.create_task(self._receive_task_handler())
async def stop(self, frame: EndFrame):

View File

@@ -17,6 +17,8 @@ from loguru import logger
from pipecat.services.openai.llm import OpenAILLMService
try:
from google.auth import default
from google.auth.exceptions import GoogleAuthError
from google.auth.transport.requests import Request
from google.oauth2 import service_account
@@ -100,6 +102,13 @@ class GoogleVertexLLMService(OpenAILLMService):
creds = service_account.Credentials.from_service_account_file(
credentials_path, scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
else:
try:
creds, project_id = default(
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
except GoogleAuthError:
pass
if not creds:
raise ValueError("No valid credentials provided.")

View File

@@ -32,6 +32,8 @@ from pipecat.utils.time import time_now_iso8601
try:
from google.api_core.client_options import ClientOptions
from google.auth import default
from google.auth.exceptions import GoogleAuthError
from google.cloud import speech_v2
from google.cloud.speech_v2.types import cloud_speech
from google.oauth2 import service_account
@@ -451,6 +453,7 @@ class GoogleSTTService(STTService):
client_options = ClientOptions(api_endpoint=f"{self._location}-speech.googleapis.com")
# Extract project ID and create client
creds: Optional[service_account.Credentials] = None
if credentials:
json_account_info = json.loads(credentials)
self._project_id = json_account_info.get("project_id")
@@ -461,7 +464,16 @@ class GoogleSTTService(STTService):
self._project_id = json_account_info.get("project_id")
creds = service_account.Credentials.from_service_account_file(credentials_path)
else:
raise ValueError("Either credentials or credentials_path must be provided")
try:
creds, project_id = default(
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
self._project_id = project_id
except GoogleAuthError:
pass
if not creds:
raise ValueError("No valid credentials provided.")
if not self._project_id:
raise ValueError("Project ID not found in credentials")

View File

@@ -27,6 +27,8 @@ from pipecat.services.tts_service import TTSService
from pipecat.transcriptions.language import Language
try:
from google.auth import default
from google.auth.exceptions import GoogleAuthError
from google.cloud import texttospeech_v1
from google.oauth2 import service_account
@@ -251,6 +253,16 @@ class GoogleTTSService(TTSService):
elif credentials_path:
# Use service account JSON file if provided
creds = service_account.Credentials.from_service_account_file(credentials_path)
else:
try:
creds, project_id = default(
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
except GoogleAuthError:
pass
if not creds:
raise ValueError("No valid credentials provided.")
return texttospeech_v1.TextToSpeechAsyncClient(credentials=creds)

View File

@@ -109,7 +109,7 @@ class LmntTTSService(InterruptibleTTSService):
async def _connect(self):
await self._connect_websocket()
if not self._receive_task:
if self._websocket and not self._receive_task:
self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
async def _disconnect(self):
@@ -122,7 +122,7 @@ class LmntTTSService(InterruptibleTTSService):
async def _connect_websocket(self):
"""Connect to LMNT websocket."""
try:
if self._websocket:
if self._websocket and self._websocket.open:
return
logger.debug("Connecting to LMNT")
@@ -158,11 +158,11 @@ class LmntTTSService(InterruptibleTTSService):
# errors on the websocket, so we just skip it for now.
# await self._websocket.send(json.dumps({"eof": True}))
await self._websocket.close()
self._websocket = None
self._started = False
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
finally:
self._started = False
self._websocket = None
def _get_websocket(self):
if self._websocket:
@@ -170,7 +170,7 @@ class LmntTTSService(InterruptibleTTSService):
raise Exception("Websocket not connected")
async def flush_audio(self):
if not self._websocket:
if not self._websocket or self._websocket.closed:
return
await self._get_websocket().send(json.dumps({"flush": True}))
@@ -203,7 +203,7 @@ class LmntTTSService(InterruptibleTTSService):
logger.debug(f"{self}: Generating TTS [{text}]")
try:
if not self._websocket:
if not self._websocket or self._websocket.closed:
await self._connect()
try:

View File

@@ -106,6 +106,9 @@ class NeuphonicTTSService(InterruptibleTTSService):
self._started = False
self._cumulative_time = 0
self._receive_task = None
self._keepalive_task = None
def can_generate_metrics(self) -> bool:
return True
@@ -159,8 +162,11 @@ class NeuphonicTTSService(InterruptibleTTSService):
async def _connect(self):
await self._connect_websocket()
self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
self._keepalive_task = self.create_task(self._keepalive_task_handler())
if self._websocket and not self._receive_task:
self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
if self._websocket and not self._keepalive_task:
self._keepalive_task = self.create_task(self._keepalive_task_handler())
async def _disconnect(self):
if self._receive_task:
@@ -175,6 +181,9 @@ class NeuphonicTTSService(InterruptibleTTSService):
async def _connect_websocket(self):
try:
if self._websocket and self._websocket.open:
return
logger.debug("Connecting to Neuphonic")
tts_config = {
@@ -190,7 +199,6 @@ class NeuphonicTTSService(InterruptibleTTSService):
url = f"{self._url}/speak/{self._settings['lang_code']}?{'&'.join(query_params)}"
self._websocket = await websockets.connect(url)
except Exception as e:
logger.error(f"{self} initialization error: {e}")
self._websocket = None
@@ -203,11 +211,11 @@ class NeuphonicTTSService(InterruptibleTTSService):
if self._websocket:
logger.debug("Disconnecting from Neuphonic")
await self._websocket.close()
self._websocket = None
self._started = False
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
finally:
self._started = False
self._websocket = None
async def _receive_messages(self):
async for message in self._websocket:
@@ -235,7 +243,7 @@ class NeuphonicTTSService(InterruptibleTTSService):
logger.debug(f"Generating TTS: [{text}]")
try:
if not self._websocket:
if not self._websocket or self._websocket.closed:
await self._connect()
try:

View File

@@ -70,7 +70,7 @@ class OpenAITTSService(TTSService):
if sample_rate and sample_rate != self.OPENAI_SAMPLE_RATE:
logger.warning(
f"OpenAI TTS only supports {self.OPENAI_SAMPLE_RATE}Hz sample rate. "
f"Current rate of {self.sample_rate}Hz may cause issues."
f"Current rate of {sample_rate}Hz may cause issues."
)
super().__init__(sample_rate=sample_rate, **kwargs)

View File

@@ -157,7 +157,7 @@ class PlayHTTTSService(InterruptibleTTSService):
async def _connect(self):
await self._connect_websocket()
if not self._receive_task:
if self._websocket and not self._receive_task:
self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
async def _disconnect(self):
@@ -169,7 +169,7 @@ class PlayHTTTSService(InterruptibleTTSService):
async def _connect_websocket(self):
try:
if self._websocket:
if self._websocket and self._websocket.open:
return
logger.debug("Connecting to PlayHT")
@@ -197,11 +197,11 @@ class PlayHTTTSService(InterruptibleTTSService):
if self._websocket:
logger.debug("Disconnecting from PlayHT")
await self._websocket.close()
self._websocket = None
self._request_id = None
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
finally:
self._request_id = None
self._websocket = None
async def _get_websocket_url(self):
async with aiohttp.ClientSession() as session:

View File

@@ -168,7 +168,7 @@ class RimeTTSService(AudioContextWordTTSService):
"""Establish websocket connection and start receive task."""
await self._connect_websocket()
if not self._receive_task:
if self._websocket and not self._receive_task:
self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
async def _disconnect(self):
@@ -182,7 +182,7 @@ class RimeTTSService(AudioContextWordTTSService):
async def _connect_websocket(self):
"""Connect to Rime websocket API with configured settings."""
try:
if self._websocket:
if self._websocket and self._websocket.open:
return
params = "&".join(f"{k}={v}" for k, v in self._settings.items())
@@ -201,10 +201,11 @@ class RimeTTSService(AudioContextWordTTSService):
if self._websocket:
await self._websocket.send(json.dumps(self._build_eos_msg()))
await self._websocket.close()
self._websocket = None
self._context_id = None
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
finally:
self._context_id = None
self._websocket = None
def _get_websocket(self):
"""Get active websocket connection or raise exception."""
@@ -316,7 +317,7 @@ class RimeTTSService(AudioContextWordTTSService):
"""
logger.debug(f"{self}: Generating TTS [{text}]")
try:
if not self._websocket:
if not self._websocket or self._websocket.closed:
await self._connect()
try:

View File

@@ -31,7 +31,7 @@ class WebsocketService(ABC):
bool: True if connection is verified working, False otherwise
"""
try:
if not self._websocket:
if not self._websocket or self._websocket.closed:
return False
await self._websocket.ping()
return True

View File

@@ -6,10 +6,14 @@
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Optional
from typing import Mapping, Optional
from loguru import logger
from pipecat.audio.turn.base_turn_analyzer import (
BaseTurnAnalyzer,
EndOfTurnState,
)
from pipecat.audio.vad.vad_analyzer import VADAnalyzer, VADState
from pipecat.frames.frames import (
BotInterruptionFrame,
@@ -20,6 +24,7 @@ from pipecat.frames.frames import (
FilterUpdateSettingsFrame,
Frame,
InputAudioRawFrame,
MetricsFrame,
StartFrame,
StartInterruptionFrame,
StopInterruptionFrame,
@@ -28,6 +33,7 @@ from pipecat.frames.frames import (
UserStoppedSpeakingFrame,
VADParamsUpdateFrame,
)
from pipecat.metrics.metrics import MetricsData, SmartTurnMetricsData
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.transports.base_transport import TransportParams
@@ -64,12 +70,20 @@ class BaseInputTransport(FrameProcessor):
def vad_analyzer(self) -> Optional[VADAnalyzer]:
return self._params.vad_analyzer
@property
def turn_analyzer(self) -> Optional[BaseTurnAnalyzer]:
return self._params.turn_analyzer
async def start(self, frame: StartFrame):
self._sample_rate = self._params.audio_in_sample_rate or frame.audio_in_sample_rate
# Configure VAD analyzer.
if self._params.vad_enabled and self._params.vad_analyzer:
self._params.vad_analyzer.set_sample_rate(self._sample_rate)
# Configure End of turn analyzer.
if self._params.turn_analyzer:
self._params.turn_analyzer.set_sample_rate(self._sample_rate)
# Start audio filter.
if self._params.audio_in_filter:
await self._params.audio_in_filter.start(self._sample_rate)
@@ -187,10 +201,18 @@ class BaseInputTransport(FrameProcessor):
and new_vad_state != VADState.STOPPING
):
frame = None
if new_vad_state == VADState.SPEAKING:
frame = UserStartedSpeakingFrame()
elif new_vad_state == VADState.QUIET:
frame = UserStoppedSpeakingFrame()
# If the turn analyser is enabled, this will prevent:
# - Creating the UserStoppedSpeakingFrame
# - Creating the UserStartedSpeakingFrame multiple times
can_create_user_frames = (
self._params.turn_analyzer is None
or not self._params.turn_analyzer.speech_triggered
)
if can_create_user_frames:
if new_vad_state == VADState.SPEAKING:
frame = UserStartedSpeakingFrame()
elif new_vad_state == VADState.QUIET:
frame = UserStoppedSpeakingFrame()
if frame:
await self._handle_user_interruption(frame)
@@ -198,6 +220,32 @@ class BaseInputTransport(FrameProcessor):
vad_state = new_vad_state
return vad_state
async def _handle_end_of_turn(self):
if self.turn_analyzer:
state, prediction = await self.get_event_loop().run_in_executor(
self._executor, self.turn_analyzer.analyze_end_of_turn
)
await self._handle_prediction_result(prediction)
await self._handle_end_of_turn_complete(state)
async def _handle_end_of_turn_complete(self, state: EndOfTurnState):
if state == EndOfTurnState.COMPLETE:
await self._handle_user_interruption(UserStoppedSpeakingFrame())
async def _run_turn_analyzer(
self, frame: InputAudioRawFrame, vad_state: VADState, previous_vad_state: VADState
):
is_speech = vad_state == VADState.SPEAKING or vad_state == VADState.STARTING
# If silence exceeds threshold, we are going to receive EndOfTurnState.COMPLETE
end_of_turn_state = self._params.turn_analyzer.append_audio(frame.audio, is_speech)
if end_of_turn_state == EndOfTurnState.COMPLETE:
await self._handle_end_of_turn_complete(end_of_turn_state)
# Otherwise we are going to trigger to check if the turn is completed based on the VAD
elif vad_state == VADState.QUIET and vad_state != previous_vad_state:
await self._handle_end_of_turn()
async def _audio_task_handler(self):
vad_state: VADState = VADState.QUIET
while True:
@@ -211,12 +259,24 @@ class BaseInputTransport(FrameProcessor):
# Check VAD and push event if necessary. We just care about
# changes from QUIET to SPEAKING and vice versa.
previous_vad_state = vad_state
if self._params.vad_enabled:
vad_state = await self._handle_vad(frame, vad_state)
audio_passthrough = self._params.vad_audio_passthrough
if self._params.turn_analyzer:
await self._run_turn_analyzer(frame, vad_state, previous_vad_state)
# Push audio downstream if passthrough.
if audio_passthrough:
await self.push_frame(frame)
self._audio_in_queue.task_done()
async def _handle_prediction_result(self, result: MetricsData):
"""Handle a prediction result event from the turn analyzer.
Args:
result: The prediction result MetricsData.
"""
await self.push_frame(MetricsFrame(data=[result]))

View File

@@ -386,10 +386,13 @@ class BaseOutputTransport(FrameProcessor):
async def _draw_image(self, frame: OutputImageRawFrame):
desired_size = (self._params.camera_out_width, self._params.camera_out_height)
# TODO: we should refactor in the future to support dynamic resolutions
# which is kind of what happens in P2P connections.
# We need to add support for that inside the DailyTransport
if frame.size != desired_size:
image = Image.frombytes(frame.format, frame.size, frame.image)
resized_image = image.resize(desired_size)
logger.warning(f"{frame} does not have the expected size {desired_size}, resizing")
# logger.warning(f"{frame} does not have the expected size {desired_size}, resizing")
frame = OutputImageRawFrame(
resized_image.tobytes(), resized_image.size, resized_image.format
)

View File

@@ -11,6 +11,7 @@ from pydantic import BaseModel, ConfigDict
from pipecat.audio.filters.base_audio_filter import BaseAudioFilter
from pipecat.audio.mixers.base_audio_mixer import BaseAudioMixer
from pipecat.audio.turn.base_turn_analyzer import BaseTurnAnalyzer
from pipecat.audio.vad.vad_analyzer import VADAnalyzer
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.utils.base_object import BaseObject
@@ -41,6 +42,7 @@ class TransportParams(BaseModel):
vad_enabled: bool = False
vad_audio_passthrough: bool = False
vad_analyzer: Optional[VADAnalyzer] = None
turn_analyzer: Optional[BaseTurnAnalyzer] = None
class BaseTransport(BaseObject):