diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 3c198d362..73e772609 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,6 +1,6 @@ repos: - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.9.7 + rev: v0.12.1 hooks: - id: ruff language_version: python3 diff --git a/CHANGELOG.md b/CHANGELOG.md index e3ad46575..ee43fad6c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,10 +7,70 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- The development runner how handles custom `body` data for `DailyTransport`. + The `body` data is passed to the Pipecat client. You can POST to the `/start` + endpoint with a request body of: + + ``` + { + "createDailyRoom": true, + "dailyRoomProperties": { "start_video_off": true }, + "body": { "custom_data": "value" } + } + ``` + + The `body` information is parsed and used in the application. The + `dailyRoomProperties` are currently not handled. + +- Added detailed latency logging to `UserBotLatencyLogObserver`, capturing + average response time between user stop and bot start, as well as minimum and + maximum response latency. + +### Changed + +- The development runners `/connect` and `/start` endpoint now both return + `dailyRoom` and `dailyToken` in place of the previous `room_url` and `token`. + +- Updated the `pipecat.runner.daily` utility to only a take `DAILY_API_URL` and + `DAILY_SAMPLE_ROOM_URL` environment variables instead of argparsing `-u` and + `-k`, respectively. + +- Updated `daily-python` to 0.19.6. + +- Changed `TavusVideoService` to send audio or video frames only after the + transport is ready, preventing warning messages at startup. + +- The development runner now strips any provided protocol (e.g. https://) from + the proxy address and issues a warning. It also strips trailing `/`. + ### Fixed - Fixed an issue in `LiveKitTransport` where empty `AudioRawFrame`s were pushed down the pipeline. This resulted in warnings by the STT processor. +- Fixed `PiperTTSService` to send text as a JSON object in the request body, + resolving compatibility with Piper's HTTP API. + +- Fixed an issue with the `TavusVideoService` where an error was thrown due to + missing transcription callbacks. + +- Fixed an issue in `SpeechmaticsSTTService` where the `user_id` was set to + `None` when diarization is not enabled. + +### Performance + +- Fixed an issue in `TaskObserver` (a proxy to all observers) that was degrading + global performance. + +### Deprecated + +- In the `pipecat.runner.daily`, the `configure_with_args()` function is + deprecated. Use the `configure()` function instead. + +- The development runner's `/connect` endpoint is deprecated and will be + removed in a future version. Use the `/start` endpoint in its place. In the + meantime, both endpoints work and deliver equivalent functionality. - Added `source` field to `ErrorFrame` to indicate `FrameProcessor` that generated the error. @@ -209,6 +269,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 pushed by the `BaseInputTransport` at Start and any time a `VADParamsUpdateFrame` is received. +- Added support for Simli Trinity Avatars. A new `is_trinity_avatar` parameter + has been introduced to specify whether the provided `faceId` corresponds to a + Trinity avatar, which is required for optimal Trinity avatar performance. + ### Changed - Two package dependencies have been updated: diff --git a/README.md b/README.md index 6eca5af47..cf1263817 100644 --- a/README.md +++ b/README.md @@ -69,80 +69,80 @@ You can connect to Pipecat from any platform using our official SDKs: ## ⚑ Getting started -You can get started with Pipecat running on your local machine, then move your agent processes to the cloud when you’re ready. +You can get started with Pipecat running on your local machine, then move your agent processes to the cloud when you're ready. -```shell -# Install the module -pip install pipecat-ai +1. Install uv -# Set up your environment -cp dot-env.template .env -``` + ```bash + curl -LsSf https://astral.sh/uv/install.sh | sh + ``` -To keep things lightweight, only the core framework is included by default. If you need support for third-party AI services, you can add the necessary dependencies with: + > **Need help?** Refer to the [uv install documentation](https://docs.astral.sh/uv/getting-started/installation/). -```shell -pip install "pipecat-ai[option,...]" -``` +2. Install the module + + ```bash + # For new projects + uv init my-pipecat-app + cd my-pipecat-app + uv add pipecat-ai + + # Or for existing projects + uv add pipecat-ai + ``` + +3. Set up your environment + + ```bash + cp env.example .env + ``` + +4. To keep things lightweight, only the core framework is included by default. If you need support for third-party AI services, you can add the necessary dependencies with: + + ```bash + uv add "pipecat-ai[option,...]" + ``` + +> **Using pip?** You can still use `pip install pipecat-ai` and `pip install "pipecat-ai[option,...]"` to get set up. ## πŸ§ͺ Code examples - [Foundational](https://github.com/pipecat-ai/pipecat/tree/main/examples/foundational) β€” small snippets that build on each other, introducing one or two concepts at a time - [Example apps](https://github.com/pipecat-ai/pipecat-examples) β€” complete applications that you can use as starting points for development -## πŸ› οΈ Hacking on the framework itself +## πŸ› οΈ Contributing to the framework -1. Set up a virtual environment before following these instructions. From the root of the repo: +1. Clone the repository and navigate to it: - ```shell - python3 -m venv venv - source venv/bin/activate + ```bash + git clone https://github.com/pipecat-ai/pipecat.git + cd pipecat ``` -2. Install the development dependencies: +2. Install development and testing dependencies: - ```shell - pip install -r dev-requirements.txt + ```bash + uv sync --group dev --all-extras --no-extra krisp ``` -3. Install the git pre-commit hooks (these help ensure your code follows project rules): +3. Install the git pre-commit hooks: - ```shell - pre-commit install - ``` - -4. Install the `pipecat-ai` package locally in editable mode: - - ```shell - pip install -e . - ``` - - > The `-e` or `--editable` option allows you to modify the code without reinstalling. - -5. Include optional dependencies as needed. For example: - - ```shell - pip install -e ".[daily,deepgram,cartesia,openai,silero]" - ``` - -6. (Optional) If you want to use this package from another directory: - - ```shell - pip install "path_to_this_repo[option,...]" + ```bash + uv run pre-commit install ``` ### Running tests -Install the test dependencies: +To run all tests, from the root directory: -```shell -pip install -r test-requirements.txt +```bash +uv run pytest ``` -From the root directory, run: +Run a specific test suite: -```shell -pytest +```bash +uv run pytest tests/test_name.py ``` ### Setting up your editor diff --git a/dot-env.template b/env.example similarity index 99% rename from dot-env.template rename to env.example index 696c2ca0a..0aed0eb83 100644 --- a/dot-env.template +++ b/env.example @@ -122,7 +122,6 @@ SONIOX_API_KEY= # Speechmatics SPEECHMATICS_API_KEY=... - # SambaNova SAMBANOVA_API_KEY=... diff --git a/examples/foundational/04b-transports-livekit.py b/examples/foundational/04b-transports-livekit.py index 6594021de..cddb68b90 100644 --- a/examples/foundational/04b-transports-livekit.py +++ b/examples/foundational/04b-transports-livekit.py @@ -9,7 +9,6 @@ import json import os import sys -from deepgram import LiveOptions from dotenv import load_dotenv from loguru import logger @@ -51,12 +50,7 @@ async def main(): ), ) - stt = DeepgramSTTService( - api_key=os.getenv("DEEPGRAM_API_KEY"), - live_options=LiveOptions( - vad_events=True, - ), - ) + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY")) @@ -78,20 +72,20 @@ async def main(): context = OpenAILLMContext(messages) context_aggregator = llm.create_context_aggregator(context) - runner = PipelineRunner() + pipeline = Pipeline( + [ + transport.input(), # Transport user input + stt, + context_aggregator.user(), # User responses + llm, # LLM + tts, # TTS + transport.output(), # Transport bot output + context_aggregator.assistant(), # Assistant spoken responses + ] + ) task = PipelineTask( - Pipeline( - [ - transport.input(), - stt, - context_aggregator.user(), - llm, - tts, - transport.output(), - context_aggregator.assistant(), - ], - ), + pipeline, params=PipelineParams( enable_metrics=True, enable_usage_metrics=True, @@ -132,6 +126,8 @@ async def main(): ], ) + runner = PipelineRunner() + await runner.run(task) diff --git a/examples/foundational/07b-interruptible-langchain.py b/examples/foundational/07b-interruptible-langchain.py index c6833888c..aaca623dd 100644 --- a/examples/foundational/07b-interruptible-langchain.py +++ b/examples/foundational/07b-interruptible-langchain.py @@ -25,6 +25,8 @@ from pipecat.processors.aggregators.llm_response import ( LLMUserResponseAggregator, ) from pipecat.processors.frameworks.langchain import LangchainProcessor +from pipecat.runner.types import RunnerArguments +from pipecat.runner.utils import create_transport from pipecat.services.cartesia.tts import CartesiaTTSService from pipecat.services.deepgram.stt import DeepgramSTTService from pipecat.transports.base_transport import BaseTransport, TransportParams diff --git a/examples/foundational/13b-deepgram-transcription.py b/examples/foundational/13b-deepgram-transcription.py index 7eba63b29..cde1e3629 100644 --- a/examples/foundational/13b-deepgram-transcription.py +++ b/examples/foundational/13b-deepgram-transcription.py @@ -4,7 +4,6 @@ # SPDX-License-Identifier: BSD 2-Clause License # -import argparse import os from dotenv import load_dotenv @@ -15,6 +14,8 @@ from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineTask from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.runner.types import RunnerArguments +from pipecat.runner.utils import create_transport from pipecat.services.deepgram.stt import DeepgramSTTService, Language, LiveOptions from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketParams diff --git a/examples/foundational/13c-gladia-translation.py b/examples/foundational/13c-gladia-translation.py index 9b7f8ab6f..81a524158 100644 --- a/examples/foundational/13c-gladia-translation.py +++ b/examples/foundational/13c-gladia-translation.py @@ -4,7 +4,6 @@ # SPDX-License-Identifier: BSD 2-Clause License # -import argparse import os from dotenv import load_dotenv @@ -15,6 +14,8 @@ from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineTask from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.runner.types import RunnerArguments +from pipecat.runner.utils import create_transport from pipecat.services.gladia.config import ( GladiaInputParams, LanguageConfig, diff --git a/examples/foundational/13d-assemblyai-transcription.py b/examples/foundational/13d-assemblyai-transcription.py index 8ada794cb..a00e21f5e 100644 --- a/examples/foundational/13d-assemblyai-transcription.py +++ b/examples/foundational/13d-assemblyai-transcription.py @@ -4,7 +4,6 @@ # SPDX-License-Identifier: BSD 2-Clause License # -import argparse import os from dotenv import load_dotenv @@ -15,6 +14,8 @@ from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineTask from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.runner.types import RunnerArguments +from pipecat.runner.utils import create_transport from pipecat.services.assemblyai.stt import AssemblyAISTTService from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketParams diff --git a/examples/foundational/13h-speechmatics-transcription.py b/examples/foundational/13h-speechmatics-transcription.py index 173d0641c..92c8d4f91 100644 --- a/examples/foundational/13h-speechmatics-transcription.py +++ b/examples/foundational/13h-speechmatics-transcription.py @@ -49,7 +49,7 @@ async def run_bot(transport: BaseTransport): This example will use diarization within our STT service and output the words spoken by each individual speaker and wrap them with XML tags. - If you do not wish to use diarization, then set the `enable_speaker_diarization` parameter + If you do not wish to use diarization, then set the `enable_diarization` parameter to `False` or omit it altogether. The `text_format` will only be used if diarization is enabled. By default, this example will use our ENHANCED operating point, which is optimized for diff --git a/examples/foundational/22d-natural-conversation-gemini-audio.py b/examples/foundational/22d-natural-conversation-gemini-audio.py index ef57a27c9..d600c1dcd 100644 --- a/examples/foundational/22d-natural-conversation-gemini-audio.py +++ b/examples/foundational/22d-natural-conversation-gemini-audio.py @@ -4,7 +4,6 @@ # SPDX-License-Identifier: BSD 2-Clause License # -import argparse import asyncio import os import time @@ -44,6 +43,8 @@ from pipecat.processors.aggregators.openai_llm_context import ( ) from pipecat.processors.filters.function_filter import FunctionFilter from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.runner.types import RunnerArguments +from pipecat.runner.utils import create_transport from pipecat.services.cartesia.tts import CartesiaTTSService from pipecat.services.google.llm import GoogleLLMContext, GoogleLLMService from pipecat.sync.base_notifier import BaseNotifier diff --git a/examples/foundational/27-simli-layer.py b/examples/foundational/27-simli-layer.py index 97eba7bba..dbf6dc3ea 100644 --- a/examples/foundational/27-simli-layer.py +++ b/examples/foundational/27-simli-layer.py @@ -63,7 +63,7 @@ async def run_bot(transport: BaseTransport): ) simli_ai = SimliVideoService( - SimliConfig(os.getenv("SIMLI_API_KEY"), os.getenv("SIMLI_FACE_ID")) + SimliConfig(os.getenv("SIMLI_API_KEY"), os.getenv("SIMLI_FACE_ID")), ) llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o-mini") diff --git a/examples/foundational/43a-heygen-video-service.py b/examples/foundational/43a-heygen-video-service.py index 619d6ab09..3e67b8672 100644 --- a/examples/foundational/43a-heygen-video-service.py +++ b/examples/foundational/43a-heygen-video-service.py @@ -4,7 +4,6 @@ # SPDX-License-Identifier: BSD 2-Clause License # - import os import aiohttp diff --git a/examples/foundational/README.md b/examples/foundational/README.md index 16e3ce42f..91631017b 100644 --- a/examples/foundational/README.md +++ b/examples/foundational/README.md @@ -2,70 +2,71 @@ This directory contains examples showing how to build voice and multimodal agents with Pipecat. Each example demonstrates specific features, progressing from basic to advanced concepts. -## Learning Paths +## Setup -Depending on what you're trying to build, these learning paths will guide you through relevant examples: - -- **New to Pipecat**: Start with examples [01](https://github.com/pipecat-ai/pipecat/blob/main/examples/foundational/01-say-one-thing.py), [02](https://github.com/pipecat-ai/pipecat/blob/main/examples/foundational/02-llm-say-one-thing.py), [07](https://github.com/pipecat-ai/pipecat/blob/main/examples/foundational/07-interruptible.py) -- **Building conversational bots**: [07](https://github.com/pipecat-ai/pipecat/blob/main/examples/foundational/07-interruptible.py), [10](https://github.com/pipecat-ai/pipecat/blob/main/examples/foundational/10-wake-phrase.py), [38](https://github.com/pipecat-ai/pipecat/blob/main/examples/foundational/38-smart-turn-fal.py) -- **Common add-on capabilities**: [17](https://github.com/pipecat-ai/pipecat/blob/main/examples/foundational/17-detect-user-idle.py), [24](https://github.com/pipecat-ai/pipecat/blob/main/examples/foundational/24-stt-mute-filter.py), [28](https://github.com/pipecat-ai/pipecat/blob/main/examples/foundational/28-transcription-processor.py), [34](https://github.com/pipecat-ai/pipecat/blob/main/examples/foundational/34-audio-recording.py) -- **Adding visual capabilities**: [03](https://github.com/pipecat-ai/pipecat/blob/main/examples/foundational/03-still-frame.py), [12](https://github.com/pipecat-ai/pipecat/blob/main/examples/foundational/12a-describe-video-gemini-flash.py), [26](https://github.com/pipecat-ai/pipecat/blob/main/examples/foundational/26c-gemini-multimodal-live-video.py) -- **Advanced agent capabilities**: [14](https://github.com/pipecat-ai/pipecat/blob/main/examples/foundational/14-function-calling.py), [20](https://github.com/pipecat-ai/pipecat/blob/main/examples/foundational/20a-persistent-context-openai.py), [37](https://github.com/pipecat-ai/pipecat/blob/main/examples/foundational/37-mem0.py) - -## Quick Start - -1. Set up a virtual environment: +1. Make sure you have uv installed: ```bash - python -m venv venv - source venv/bin/activate # On Windows: venv\Scripts\activate + curl -LsSf https://astral.sh/uv/install.sh | sh ``` -2. Install dependencies: + > **Need help?** Refer to the [uv install documentation](https://docs.astral.sh/uv/getting-started/installation/). + +2. Create a venv and install example dependencies: ```bash - pip install -r requirements.txt + uv sync --all-extras --no-extra krisp ``` -3. Create a `.env` file with your API keys. - -4. Run any example: +3. Create a `.env` file with your API keys: ```bash - python 01-say-one-thing.py + cp env.example .env + # Edit .env with your API keys ``` -5. Open the web interface at http://localhost:7860 and click "Connect" +4. Navigate to the examples directory: + + ```bash + cd examples/foundational + ``` + +5. Run any example: + + ```bash + uv run python 01-say-one-thing.py + ``` + +6. Open the web interface at http://localhost:7860/client/ and click "Connect" ## Running examples with other transports -It is possible to run most of the examples with other transports such as Twilio or Daily. +Most examples support running with other transports, like Twilio or Daily. ### Daily You need to create a Daily account at https://dashboard.daily.co/u/signup. Once signed up, you can create your own room from the dashboard and set the environment variables `DAILY_SAMPLE_ROOM_URL` and `DAILY_API_KEY`. Alternatively, you can let the example create a room for you (still needs `DAILY_API_KEY` environment variable). Then, start any example with `-t daily`: ```bash -python 07-interruptible.py -t daily +uv run 07-interruptible.py -t daily ``` ### Twilio -It is also possible to run the example through a Twilio phone number. You will -need to setup a few things: +It is also possible to run the example through a Twilio phone number. You will need to setup a few things: 1. Install and run [ngrok](https://ngrok.com/download). - ```bash - ngrok http 7860 - ``` +```bash +ngrok http 7860 +``` 2. Configure your Twilio phone number. One way is to setup a TwiML app and set the request URL to the ngrok URL from step (1). Then, set your phone number to use the new TwiML app. Then, run the example with: ```bash -python 07-interruptible.py -t twilio -x NGROK_HOST_NAME (no protocol) +uv run 07-interruptible.py -t twilio -x NGROK_HOST_NAME ``` ## Examples by Feature @@ -133,21 +134,18 @@ python 07-interruptible.py -t twilio -x NGROK_HOST_NAME (no protocol) - **[16-gpu-container-local-bot.py](./16-gpu-container-local-bot.py)**: GPU-accelerated local bot (Performance measurement) -### Utilities - ## Advanced Usage ### Customizing Network Settings ```bash -python --host 0.0.0.0 --port 8080 +uv run python --host 0.0.0.0 --port 8080 ``` ### Troubleshooting - **No audio/video**: Check browser permissions for microphone and camera - **Connection errors**: Verify API keys in `.env` file -- **Missing dependencies**: Run `pip install -r requirements.txt` - **Port conflicts**: Use `--port` to change the port -For more examples, visit our [GitHub repository](https://github.com/pipecat-ai/pipecat/tree/main/examples). +For more examples, visit our the [`pipecat-examples repository](https://github.com/pipecat-ai/pipecat-examples). diff --git a/examples/foundational/requirements.txt b/examples/foundational/requirements.txt deleted file mode 100644 index 2f0c97b37..000000000 --- a/examples/foundational/requirements.txt +++ /dev/null @@ -1 +0,0 @@ -pipecat-ai[webrtc,websocket,daily,deepgram,cartesia,silero,runner]>=0.0.77 \ No newline at end of file diff --git a/examples/quickstart/bot.py b/examples/quickstart/bot.py index 503965e98..11ab59829 100644 --- a/examples/quickstart/bot.py +++ b/examples/quickstart/bot.py @@ -26,7 +26,14 @@ import os from dotenv import load_dotenv from loguru import logger +print("πŸš€ Starting Pipecat bot...") +print("⏳ Loading AI models (30-40 seconds first run, <2 seconds after)\n") + +logger.info("Loading Silero VAD model...") from pipecat.audio.vad.silero import SileroVADAnalyzer + +logger.info("βœ… Silero VAD model loaded") +logger.info("Loading pipeline components...") from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask @@ -37,8 +44,14 @@ from pipecat.services.cartesia.tts import CartesiaTTSService from pipecat.services.deepgram.stt import DeepgramSTTService from pipecat.services.openai.llm import OpenAILLMService from pipecat.transports.base_transport import BaseTransport, TransportParams + +logger.info("βœ… Pipeline components loaded") + +logger.info("Loading WebRTC transport...") from pipecat.transports.network.small_webrtc import SmallWebRTCTransport +logger.info("βœ… All components loaded successfully!") + load_dotenv(override=True) diff --git a/pyproject.toml b/pyproject.toml index 7f9ca0c1f..e7e4eb7c9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,7 +52,7 @@ azure = [ "azure-cognitiveservices-speech~=1.42.0"] cartesia = [ "cartesia~=2.0.3", "websockets>=13.1,<15.0" ] cerebras = [] deepseek = [] -daily = [ "daily-python~=0.19.5" ] +daily = [ "daily-python~=0.19.6" ] deepgram = [ "deepgram-sdk~=4.7.0" ] elevenlabs = [ "websockets>=13.1,<15.0" ] fal = [ "fal-client~=0.5.9" ] diff --git a/scripts/evals/README.md b/scripts/evals/README.md index b67d5d75b..74f1b2e27 100644 --- a/scripts/evals/README.md +++ b/scripts/evals/README.md @@ -32,7 +32,7 @@ also explains why it thinks the answer is valid or invalid. To run the release evals: ```sh -python run-release-evals.py -a -v +uv run run-release-evals.py -a -v ``` This runs all the evals and stores logs and audio (`-a`) for each test. @@ -41,7 +41,7 @@ You can also specify which tests to run. For example, to run all `07` series tests: ```sh -python run-release-evals.py -p 07 -a -v +uv run run-release-evals.py -p 07 -a -v ``` ## Script Evals @@ -49,7 +49,7 @@ python run-release-evals.py -p 07 -a -v You can also run evals for a single example (not part of the release set): ```sh -python run-eval.py -p "A simple math addition" -a -v YOUR_EXAMPLE_SCRIPT +uv run run-eval.py -p "A simple math addition" -a -v YOUR_EXAMPLE_SCRIPT ``` Your script needs to follow any of the foundation examples pattern. diff --git a/scripts/evals/eval.py b/scripts/evals/eval.py index 71d5c9e9e..fba701f9c 100644 --- a/scripts/evals/eval.py +++ b/scripts/evals/eval.py @@ -176,7 +176,7 @@ async def run_example_pipeline(script_path: Path): ), ) - await module.run_example(transport, argparse.Namespace(), True) + await module.run_bot(transport) async def run_eval_pipeline( diff --git a/scripts/evals/run-release-evals.py b/scripts/evals/run-release-evals.py index 3cf844735..6f877c80f 100644 --- a/scripts/evals/run-release-evals.py +++ b/scripts/evals/run-release-evals.py @@ -102,6 +102,10 @@ TESTS_19 = [ ("19a-azure-realtime-beta.py", PROMPT_WEATHER, EVAL_WEATHER), ] +TESTS_21 = [ + ("21a-tavus-video-service.py", PROMPT_SIMPLE_MATH, None), +] + TESTS_26 = [ ("26-gemini-multimodal-live.py", PROMPT_SIMPLE_MATH, None), ("26a-gemini-multimodal-live-transcription.py", PROMPT_SIMPLE_MATH, None), @@ -112,16 +116,27 @@ TESTS_26 = [ # ("26d-gemini-multimodal-live-text.py", PROMPT_SIMPLE_MATH, None), ] +TESTS_27 = [ + ("27-simli-layer.py", PROMPT_SIMPLE_MATH, None), +] + TESTS_40 = [ ("40-aws-nova-sonic.py", PROMPT_SIMPLE_MATH, None), ] +TESTS_43 = [ + ("43a-heygen-video-service.py", PROMPT_SIMPLE_MATH, None), +] + TESTS = [ *TESTS_07, *TESTS_14, *TESTS_19, + *TESTS_21, *TESTS_26, + *TESTS_27, *TESTS_40, + *TESTS_43, ] diff --git a/src/pipecat/observers/loggers/user_bot_latency_log_observer.py b/src/pipecat/observers/loggers/user_bot_latency_log_observer.py index eb699f2bb..505a0a807 100644 --- a/src/pipecat/observers/loggers/user_bot_latency_log_observer.py +++ b/src/pipecat/observers/loggers/user_bot_latency_log_observer.py @@ -7,11 +7,14 @@ """Observer for measuring user-to-bot response latency.""" import time +from statistics import mean from loguru import logger from pipecat.frames.frames import ( BotStartedSpeakingFrame, + CancelFrame, + EndFrame, UserStartedSpeakingFrame, UserStoppedSpeakingFrame, ) @@ -35,6 +38,7 @@ class UserBotLatencyLogObserver(BaseObserver): super().__init__() self._processed_frames = set() self._user_stopped_time = 0 + self._latencies = [] async def on_push_frame(self, data: FramePushed): """Process frames to track speech timing and calculate latency. @@ -56,6 +60,18 @@ class UserBotLatencyLogObserver(BaseObserver): self._user_stopped_time = 0 elif isinstance(data.frame, UserStoppedSpeakingFrame): self._user_stopped_time = time.time() + elif isinstance(data.frame, (EndFrame, CancelFrame)): + if self._latencies: + avg_latency = mean(self._latencies) + min_latency = min(self._latencies) + max_latency = max(self._latencies) + logger.info( + f"⏱️ LATENCY FROM USER STOPPED SPEAKING TO BOT STARTED SPEAKING - Avg: {avg_latency:.3f}s, Min: {min_latency:.3f}s, Max: {max_latency:.3f}s" + ) elif isinstance(data.frame, BotStartedSpeakingFrame) and self._user_stopped_time: latency = time.time() - self._user_stopped_time - logger.debug(f"⏱️ LATENCY FROM USER STOPPED SPEAKING TO BOT STARTED SPEAKING: {latency}") + self._user_stopped_time = 0 + self._latencies.append(latency) + logger.debug( + f"⏱️ LATENCY FROM USER STOPPED SPEAKING TO BOT STARTED SPEAKING: {latency:.3f}s" + ) diff --git a/src/pipecat/pipeline/task_observer.py b/src/pipecat/pipeline/task_observer.py index cd46f85ef..532ef7977 100644 --- a/src/pipecat/pipeline/task_observer.py +++ b/src/pipecat/pipeline/task_observer.py @@ -153,22 +153,23 @@ class TaskObserver(BaseObserver): async def _proxy_task_handler(self, queue: asyncio.Queue, observer: BaseObserver): """Handle frame processing for a single observer.""" - warning_reported = False + on_push_frame_deprecated = False + signature = inspect.signature(observer.on_push_frame) + if len(signature.parameters) > 1: + import warnings + + with warnings.catch_warnings(): + warnings.simplefilter("always") + warnings.warn( + "Observer `on_push_frame(source, destination, frame, direction, timestamp)` is deprecated, us `on_push_frame(data: FramePushed)` instead.", + DeprecationWarning, + ) + + on_push_frame_deprecated = True + while True: data = await queue.get() - - signature = inspect.signature(observer.on_push_frame) - if len(signature.parameters) > 1: - if not warning_reported: - import warnings - - with warnings.catch_warnings(): - warnings.simplefilter("always") - warnings.warn( - "Observer `on_push_frame(source, destination, frame, direction, timestamp)` is deprecated, us `on_push_frame(data: FramePushed)` instead.", - DeprecationWarning, - ) - warning_reported = True + if on_push_frame_deprecated: await observer.on_push_frame( data.src, data.dst, data.frame, data.direction, data.timestamp ) diff --git a/src/pipecat/runner/daily.py b/src/pipecat/runner/daily.py index 3fae393fe..33e3d8066 100644 --- a/src/pipecat/runner/daily.py +++ b/src/pipecat/runner/daily.py @@ -7,17 +7,14 @@ """Daily room and token configuration utilities. This module provides helper functions for creating and configuring Daily rooms -and authentication tokens. It handles both command-line argument parsing and -environment variable configuration. +and authentication tokens. It automatically creates temporary rooms for +development or uses existing rooms specified via environment variables. -The module supports creating temporary rooms for development or using existing -rooms specified via arguments or environment variables. +Environment variables: -Required environment variables: - -- DAILY_API_KEY - Daily API key for room/token creation -- DAILY_SAMPLE_ROOM_URL (optional) - Existing room URL to use -- DAILY_SAMPLE_ROOM_TOKEN (optional) - Existing token to use +- DAILY_API_KEY - Daily API key for room/token creation (required) +- DAILY_SAMPLE_ROOM_URL (optional) - Existing room URL to use. If not provided, + a temporary room will be created automatically. Example:: @@ -29,17 +26,26 @@ Example:: # Use room_url and token with DailyTransport """ -import argparse import os -from typing import Optional +import time +import uuid +from typing import Tuple import aiohttp -from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper +from pipecat.transports.services.helpers.daily_rest import ( + DailyRESTHelper, + DailyRoomParams, + DailyRoomProperties, +) -async def configure(aiohttp_session: aiohttp.ClientSession): - """Configure Daily room URL and token from arguments or environment. +async def configure(aiohttp_session: aiohttp.ClientSession) -> Tuple[str, str]: + """Configure Daily room URL and token from environment variables. + + This function will either: + 1. Use an existing room URL from DAILY_SAMPLE_ROOM_URL environment variable + 2. Create a new temporary room automatically if no URL is provided Args: aiohttp_session: HTTP session for making API requests. @@ -48,65 +54,79 @@ async def configure(aiohttp_session: aiohttp.ClientSession): Tuple containing the room URL and authentication token. Raises: - Exception: If room URL or API key are not provided. + Exception: If DAILY_API_KEY is not provided in environment variables. """ - (url, token, _) = await configure_with_args(aiohttp_session) - return (url, token) - - -async def configure_with_args( - aiohttp_session: aiohttp.ClientSession, parser: Optional[argparse.ArgumentParser] = None -): - """Configure Daily room with command-line argument parsing. - - Args: - aiohttp_session: HTTP session for making API requests. - parser: Optional argument parser. If None, creates a default one. - - Returns: - Tuple containing room URL, authentication token, and parsed arguments. - - Raises: - Exception: If room URL or API key are not provided via arguments or environment. - """ - if not parser: - parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample") - parser.add_argument( - "-u", "--url", type=str, required=False, help="URL of the Daily room to join" - ) - parser.add_argument( - "-k", - "--apikey", - type=str, - required=False, - help="Daily API Key (needed to create an owner token for the room)", - ) - - args, unknown = parser.parse_known_args() - - url = args.url or os.getenv("DAILY_SAMPLE_ROOM_URL") - key = args.apikey or os.getenv("DAILY_API_KEY") - - if not url: + # Check for required API key + api_key = os.getenv("DAILY_API_KEY") + if not api_key: raise Exception( - "No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL." + "DAILY_API_KEY environment variable is required. " + "Get your API key from https://dashboard.daily.co/developers" ) - if not key: - raise Exception( - "No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers." - ) + # Check for existing room URL + existing_room_url = os.getenv("DAILY_SAMPLE_ROOM_URL") daily_rest_helper = DailyRESTHelper( - daily_api_key=key, + daily_api_key=api_key, daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"), aiohttp_session=aiohttp_session, ) - # Create a meeting token for the given room with an expiration 2 hours in - # the future. + if existing_room_url: + # Use existing room + print(f"Using existing Daily room: {existing_room_url}") + room_url = existing_room_url + else: + # Create a new temporary room + room_name = f"pipecat-{uuid.uuid4().hex[:8]}" + print(f"Creating new Daily room: {room_name}") + + # Calculate expiration time: current time + 2 hours + expiration_time = time.time() + (2 * 60 * 60) # 2 hours from now + + # Create room properties with absolute timestamp + room_properties = DailyRoomProperties( + exp=expiration_time, # Absolute Unix timestamp + eject_at_room_exp=True, + ) + + # Create room parameters + room_params = DailyRoomParams(name=room_name, properties=room_properties) + + room_response = await daily_rest_helper.create_room(room_params) + room_url = room_response.url + print(f"Created Daily room: {room_url}") + + # Create a meeting token for the room with an expiration 2 hours in the future expiry_time: float = 2 * 60 * 60 + token = await daily_rest_helper.get_token(room_url, expiry_time) - token = await daily_rest_helper.get_token(url, expiry_time) + return (room_url, token) - return (url, token, args) + +# Keep this for backwards compatibility, but mark as deprecated +async def configure_with_args(aiohttp_session: aiohttp.ClientSession, parser=None): + """Configure Daily room with command-line argument parsing. + + .. deprecated:: 0.0.78 + This function is deprecated. Use configure() instead which uses + environment variables only. + + Args: + aiohttp_session: HTTP session for making API requests. + parser: Ignored. Kept for backwards compatibility. + + Returns: + Tuple containing room URL, authentication token, and None (for args). + """ + import warnings + + warnings.warn( + "configure_with_args is deprecated. Use configure() instead.", + DeprecationWarning, + stacklevel=2, + ) + + room_url, token = await configure(aiohttp_session) + return (room_url, token, None) diff --git a/src/pipecat/runner/run.py b/src/pipecat/runner/run.py index 3d75b563e..77902adfd 100644 --- a/src/pipecat/runner/run.py +++ b/src/pipecat/runner/run.py @@ -82,7 +82,7 @@ from pipecat.runner.types import ( try: import uvicorn from dotenv import load_dotenv - from fastapi import BackgroundTasks, FastAPI, WebSocket + from fastapi import BackgroundTasks, FastAPI, Request, WebSocket from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import HTMLResponse, RedirectResponse except ImportError as e: @@ -261,17 +261,43 @@ def _setup_daily_routes(app: FastAPI): async with aiohttp.ClientSession() as session: room_url, token = await configure(session) - # Start the bot in the background + # Start the bot in the background with empty body for GET requests bot_module = _get_bot_module() runner_args = DailyRunnerArguments(room_url=room_url, token=token, body={}) asyncio.create_task(bot_module.bot(runner_args)) return RedirectResponse(room_url) - @app.post("/connect") - async def rtvi_connect(): - """Launch a Daily bot and return connection info for RTVI clients.""" + async def _handle_rtvi_request(request: Request): + """Common handler for both /start and /connect endpoints. + + Expects POST body like:: + + { + "createDailyRoom": true, + "dailyRoomProperties": { "start_video_off": true }, + "body": { "custom_data": "value" } + } + """ print("Starting bot with Daily transport") + # Parse the request body + try: + request_data = await request.json() + logger.debug(f"Received request: {request_data}") + except Exception as e: + logger.error(f"Failed to parse request body: {e}") + request_data = {} + + # Extract the body data that should be passed to the bot + # This mimics Pipecat Cloud's behavior + bot_body = request_data.get("body", {}) + + # Log the extracted body data for debugging + if bot_body: + logger.info(f"Extracted body data for bot: {bot_body}") + else: + logger.debug("No body data provided in request") + import aiohttp from pipecat.runner.daily import configure @@ -279,11 +305,30 @@ def _setup_daily_routes(app: FastAPI): async with aiohttp.ClientSession() as session: room_url, token = await configure(session) - # Start the bot in the background + # Start the bot in the background with extracted body data bot_module = _get_bot_module() - runner_args = DailyRunnerArguments(room_url=room_url, token=token, body={}) + runner_args = DailyRunnerArguments(room_url=room_url, token=token, body=bot_body) asyncio.create_task(bot_module.bot(runner_args)) - return {"room_url": room_url, "token": token} + # Match PCC /start endpoint response format: + return {"dailyRoom": room_url, "dailyToken": token} + + @app.post("/start") + async def rtvi_start(request: Request): + """Launch a Daily bot and return connection info for RTVI clients.""" + return await _handle_rtvi_request(request) + + @app.post("/connect") + async def rtvi_connect(request: Request): + """Launch a Daily bot and return connection info for RTVI clients. + + .. deprecated:: 0.0.78 + Use /start instead. This endpoint will be removed in a future version. + """ + logger.warning( + "DEPRECATED: /connect endpoint is deprecated. Please use /start instead. " + "This endpoint will be removed in a future version." + ) + return await _handle_rtvi_request(request) def _setup_telephony_routes(app: FastAPI, transport_type: str, proxy: str): @@ -345,6 +390,7 @@ async def _run_daily_direct(): async with aiohttp.ClientSession() as session: room_url, token = await configure(session) + # Direct connections have no request body, so use empty dict runner_args = DailyRunnerArguments(room_url=room_url, token=token, body={}) # Get the bot module and run it directly @@ -357,6 +403,27 @@ async def _run_daily_direct(): await bot_module.bot(runner_args) +def _validate_and_clean_proxy(proxy: str) -> str: + """Validate and clean proxy hostname, removing protocol if present.""" + if not proxy: + return proxy + + original_proxy = proxy + + # Strip common protocols + if proxy.startswith(("http://", "https://")): + proxy = proxy.split("://", 1)[1] + logger.warning( + f"Removed protocol from proxy URL. Using '{proxy}' instead of '{original_proxy}'. " + f"The --proxy argument expects only the hostname (e.g., 'mybot.ngrok.io')." + ) + + # Remove trailing slashes + proxy = proxy.rstrip("/") + + return proxy + + def main(): """Start the Pipecat development runner. @@ -408,6 +475,10 @@ def main(): args = parser.parse_args() + # Validate and clean proxy hostname + if args.proxy: + args.proxy = _validate_and_clean_proxy(args.proxy) + # Auto-set transport to daily if --direct is used without explicit transport if args.direct and args.transport == "webrtc": # webrtc is the default args.transport = "daily" @@ -438,17 +509,16 @@ def main(): if args.transport == "webrtc": print() if args.esp32: - print( - f"πŸš€ WebRTC server starting at http://{args.host}:{args.port}/client (ESP32 mode)" - ) + print(f"πŸš€ Bot ready! (ESP32 mode)") + print(f" β†’ Open http://{args.host}:{args.port}/client in your browser") else: - print(f"πŸš€ WebRTC server starting at http://{args.host}:{args.port}/client") - print(f" Open this URL in your browser to connect!") + print(f"πŸš€ Bot ready!") + print(f" β†’ Open http://{args.host}:{args.port}/client in your browser") print() elif args.transport == "daily": print() - print(f"πŸš€ Daily server starting at http://{args.host}:{args.port}") - print(f" Open this URL in your browser to start a session!") + print(f"πŸš€ Bot ready!") + print(f" β†’ Open http://{args.host}:{args.port} in your browser to start a session") print() # Create the app with transport-specific setup diff --git a/src/pipecat/services/cartesia/tts.py b/src/pipecat/services/cartesia/tts.py index c4ea7c31c..c731a6984 100644 --- a/src/pipecat/services/cartesia/tts.py +++ b/src/pipecat/services/cartesia/tts.py @@ -34,6 +34,10 @@ from pipecat.utils.text.base_text_aggregator import BaseTextAggregator from pipecat.utils.text.skip_tags_aggregator import SkipTagsAggregator from pipecat.utils.tracing.service_decorators import traced_tts +# Suppress regex warnings from pydub (used by cartesia) +warnings.filterwarnings("ignore", message="invalid escape sequence", category=SyntaxWarning) + + # See .env.example for Cartesia configuration needed try: from cartesia import AsyncCartesia diff --git a/src/pipecat/services/gemini_multimodal_live/gemini.py b/src/pipecat/services/gemini_multimodal_live/gemini.py index 238fdd5db..04e6853eb 100644 --- a/src/pipecat/services/gemini_multimodal_live/gemini.py +++ b/src/pipecat/services/gemini_multimodal_live/gemini.py @@ -983,17 +983,14 @@ class GeminiMultimodalLiveLLMService(LLMService): with audio and video inputs, preventing temporal misalignment that can occur when different modalities are processed through separate API pathways. - After sending the text, we signal turn completion to trigger a model response - for text-only interactions. + For realtimeInput, turn completion is automatically inferred by the API based + on user activity, so no explicit turnComplete signal is needed. Args: text: The text to send as user input. """ evt = events.TextInputMessage.from_text(text) await self.send_client_event(evt) - # After sending text, we need to signal that the turn is complete. - evt = events.ClientContentMessage.model_validate({"clientContent": {"turnComplete": True}}) - await self.send_client_event(evt) async def _send_user_video(self, frame): """Send user video frame to Gemini Live API.""" diff --git a/src/pipecat/services/llm_service.py b/src/pipecat/services/llm_service.py index f3e2b843a..c3ad69e0e 100644 --- a/src/pipecat/services/llm_service.py +++ b/src/pipecat/services/llm_service.py @@ -25,7 +25,6 @@ from loguru import logger from pipecat.adapters.base_llm_adapter import BaseLLMAdapter from pipecat.adapters.schemas.direct_function import DirectFunction, DirectFunctionWrapper -from pipecat.adapters.schemas.function_schema import FunctionSchema from pipecat.adapters.services.open_ai_adapter import OpenAILLMAdapter from pipecat.frames.frames import ( CancelFrame, @@ -108,6 +107,7 @@ class FunctionCallRegistryItem: function_name: Optional[str] handler: FunctionCallHandler | "DirectFunctionWrapper" cancel_on_interruption: bool + handler_deprecated: bool @dataclass @@ -282,12 +282,25 @@ class LLMService(AIService): cancel_on_interruption: Whether to cancel this function call when an interruption occurs. Defaults to True. """ + signature = inspect.signature(handler) + handler_deprecated = len(signature.parameters) > 1 + if handler_deprecated: + import warnings + + with warnings.catch_warnings(): + warnings.simplefilter("always") + warnings.warn( + "Function calls with parameters `(function_name, tool_call_id, arguments, llm, context, result_callback)` are deprecated, use a single `FunctionCallParams` parameter instead.", + DeprecationWarning, + ) + # Registering a function with the function_name set to None will run # that handler for all functions self._functions[function_name] = FunctionCallRegistryItem( function_name=function_name, handler=handler, cancel_on_interruption=cancel_on_interruption, + handler_deprecated=handler_deprecated, ) # Start callbacks are now deprecated. @@ -325,6 +338,7 @@ class LLMService(AIService): function_name=wrapper.name, handler=wrapper, cancel_on_interruption=cancel_on_interruption, + handler_deprecated=False, ) def unregister_function(self, function_name: Optional[str]): @@ -552,17 +566,7 @@ class LLMService(AIService): ) else: # Handler is a FunctionCallHandler - signature = inspect.signature(item.handler) - if len(signature.parameters) > 1: - import warnings - - with warnings.catch_warnings(): - warnings.simplefilter("always") - warnings.warn( - "Function calls with parameters `(function_name, tool_call_id, arguments, llm, context, result_callback)` are deprecated, use a single `FunctionCallParams` parameter instead.", - DeprecationWarning, - ) - + if item.handler_deprecated: await item.handler( runner_item.function_name, runner_item.tool_call_id, diff --git a/src/pipecat/services/piper/tts.py b/src/pipecat/services/piper/tts.py index 7892e15fa..d5b663c77 100644 --- a/src/pipecat/services/piper/tts.py +++ b/src/pipecat/services/piper/tts.py @@ -84,7 +84,9 @@ class PiperTTSService(TTSService): try: await self.start_ttfb_metrics() - async with self._session.post(self._base_url, json=text, headers=headers) as response: + async with self._session.post( + self._base_url, json={"text": text}, headers=headers + ) as response: if response.status != 200: error = await response.text() logger.error( diff --git a/src/pipecat/services/simli/video.py b/src/pipecat/services/simli/video.py index 6ba7da4b8..d398a87f3 100644 --- a/src/pipecat/services/simli/video.py +++ b/src/pipecat/services/simli/video.py @@ -18,6 +18,8 @@ from pipecat.frames.frames import ( OutputImageRawFrame, StartInterruptionFrame, TTSAudioRawFrame, + TTSStoppedFrame, + UserStartedSpeakingFrame, ) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, StartFrame from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator @@ -45,24 +47,39 @@ class SimliVideoService(FrameProcessor): simli_config: SimliConfig, use_turn_server: bool = False, latency_interval: int = 0, + simli_url: str = "https://api.simli.ai", + is_trinity_avatar: bool = False, ): """Initialize the Simli video service. Args: simli_config: Configuration object for Simli client settings. use_turn_server: Whether to use TURN server for connection. Defaults to False. - latency_interval: Latency interval setting for video processing. Defaults to 0. + latency_interval: Latency interval setting for sending health checks to check the latency to Simli Servers. Defaults to 0. + simli_url: URL of the simli servers. Can be changed for custom deployments of enterprise users. + is_trinity_avatar: boolean to tell simli client that this is a Trinity avatar which reduces latency when using Trinity. + """ super().__init__() - self._simli_client = SimliClient(simli_config, use_turn_server, latency_interval) + self._initialized = False + simli_config.maxIdleTime += 5 + simli_config.maxSessionLength += 5 + self._simli_client = SimliClient( + simli_config, + use_turn_server, + latency_interval, + simliURL=simli_url, + ) - self._pipecat_resampler_event = asyncio.Event() self._pipecat_resampler: AudioResampler = None + self._pipecat_resampler_event = asyncio.Event() self._simli_resampler = AudioResampler("s16", "mono", 16000) - self._initialized = False self._audio_task: asyncio.Task = None self._video_task: asyncio.Task = None + self._is_trinity_avatar = is_trinity_avatar + self._previously_interrupted = is_trinity_avatar + self._audio_buffer = bytearray() async def _start_connection(self): """Start the connection to Simli service and begin processing tasks.""" @@ -71,11 +88,9 @@ class SimliVideoService(FrameProcessor): self._initialized = True # Create task to consume and process audio and video - if not self._audio_task: - self._audio_task = self.create_task(self._consume_and_process_audio()) - - if not self._video_task: - self._video_task = self.create_task(self._consume_and_process_video()) + await self._simli_client.sendSilence() + self._audio_task = self.create_task(self._consume_and_process_audio()) + self._video_task = self.create_task(self._consume_and_process_video()) async def _consume_and_process_audio(self): """Consume audio frames from Simli and push them downstream.""" @@ -118,7 +133,6 @@ class SimliVideoService(FrameProcessor): """ await super().process_frame(frame, direction) if isinstance(frame, StartFrame): - await self.push_frame(frame, direction) await self._start_connection() elif isinstance(frame, TTSAudioRawFrame): # Send audio frame to Simli @@ -137,19 +151,41 @@ class SimliVideoService(FrameProcessor): resampled_frames = self._simli_resampler.resample(old_frame) for resampled_frame in resampled_frames: - await self._simli_client.send( - resampled_frame.to_ndarray().astype(np.int16).tobytes() - ) + audioBytes = resampled_frame.to_ndarray().astype(np.int16).tobytes() + if self._previously_interrupted: + self._audio_buffer.extend(audioBytes) + if len(self._audio_buffer) >= 128000: + try: + for flushFrame in self._simli_resampler.resample(None): + self._audio_buffer.extend( + flushFrame.to_ndarray().astype(np.int16).tobytes() + ) + finally: + await self._simli_client.playImmediate(self._audio_buffer) + self._previously_interrupted = False + self._audio_buffer = bytearray() + else: + await self._simli_client.send(audioBytes) + return except Exception as e: logger.exception(f"{self} exception: {e}") + elif isinstance(frame, TTSStoppedFrame): + try: + if self._previously_interrupted and len(self._audio_buffer) > 0: + await self._simli_client.playImmediate(self._audio_buffer) + self._previously_interrupted = False + self._audio_buffer = bytearray() + except Exception as e: + logger.exception(f"{self} exception: {e}") + return elif isinstance(frame, (EndFrame, CancelFrame)): await self._stop() - await self.push_frame(frame, direction) - elif isinstance(frame, StartInterruptionFrame): - await self._simli_client.clearBuffer() - await self.push_frame(frame, direction) - else: - await self.push_frame(frame, direction) + elif isinstance(frame, (StartInterruptionFrame, UserStartedSpeakingFrame)): + if not self._previously_interrupted: + await self._simli_client.clearBuffer() + self._previously_interrupted = self._is_trinity_avatar + + await self.push_frame(frame, direction) async def _stop(self): """Stop the Simli client and cancel processing tasks.""" diff --git a/src/pipecat/services/speechmatics/stt.py b/src/pipecat/services/speechmatics/stt.py index d6c947d38..6859306d3 100644 --- a/src/pipecat/services/speechmatics/stt.py +++ b/src/pipecat/services/speechmatics/stt.py @@ -191,7 +191,7 @@ class SpeakerFragments: passive_format = active_format return { "text": self._format_text(active_format if self.is_active else passive_format), - "user_id": self.speaker_id, + "user_id": self.speaker_id or "", "timestamp": self.timestamp, "language": self.language, "result": [frag.result for frag in self.fragments], diff --git a/src/pipecat/services/tavus/video.py b/src/pipecat/services/tavus/video.py index f9ba2833b..37b21257f 100644 --- a/src/pipecat/services/tavus/video.py +++ b/src/pipecat/services/tavus/video.py @@ -24,6 +24,7 @@ from pipecat.frames.frames import ( Frame, OutputAudioRawFrame, OutputImageRawFrame, + OutputTransportReadyFrame, StartFrame, StartInterruptionFrame, TTSAudioRawFrame, @@ -81,6 +82,7 @@ class TavusVideoService(AIService): self._send_task: Optional[asyncio.Task] = None # This is the custom track destination expected by Tavus self._transport_destination: Optional[str] = "stream" + self._transport_ready = False async def setup(self, setup: FrameProcessorSetup): """Set up the Tavus video service. @@ -145,7 +147,8 @@ class TavusVideoService(AIService): format=video_frame.color_format, ) frame.transport_source = video_source - await self.push_frame(frame) + if self._transport_ready: + await self.push_frame(frame) async def _on_participant_audio_data( self, participant_id: str, audio: AudioData, audio_source: str @@ -157,7 +160,8 @@ class TavusVideoService(AIService): num_channels=audio.num_channels, ) frame.transport_source = audio_source - await self.push_frame(frame) + if self._transport_ready: + await self.push_frame(frame) def can_generate_metrics(self) -> bool: """Check if this service can generate processing metrics. @@ -221,6 +225,9 @@ class TavusVideoService(AIService): await self.push_frame(frame, direction) elif isinstance(frame, TTSAudioRawFrame): await self._handle_audio_frame(frame) + elif isinstance(frame, OutputTransportReadyFrame): + self._transport_ready = True + await self.push_frame(frame, direction) else: await self.push_frame(frame, direction) diff --git a/src/pipecat/transports/services/tavus.py b/src/pipecat/transports/services/tavus.py index 10ae001e0..e4e57717a 100644 --- a/src/pipecat/transports/services/tavus.py +++ b/src/pipecat/transports/services/tavus.py @@ -245,6 +245,10 @@ class TavusTransportClient: on_recording_started=partial(self._on_handle_callback, "on_recording_started"), on_recording_stopped=partial(self._on_handle_callback, "on_recording_stopped"), on_recording_error=partial(self._on_handle_callback, "on_recording_error"), + on_transcription_stopped=partial( + self._on_handle_callback, "on_transcription_stopped" + ), + on_transcription_error=partial(self._on_handle_callback, "on_transcription_error"), ) self._client = DailyTransportClient( room_url, None, "Pipecat", self._params, daily_callbacks, self._bot_name diff --git a/uv.lock b/uv.lock index 21fa76755..8a6eb2f82 100644 --- a/uv.lock +++ b/uv.lock @@ -1185,13 +1185,13 @@ wheels = [ [[package]] name = "daily-python" -version = "0.19.5" +version = "0.19.6" source = { registry = "https://pypi.org/simple" } wheels = [ - { url = "https://files.pythonhosted.org/packages/2a/ca/683c8a729b43a6e0ac4296973908be1c9cb0956bca69ecd6e5e4c4d56015/daily_python-0.19.5-cp37-abi3-macosx_10_15_x86_64.whl", hash = "sha256:a1c3e70a4dd87d0a829ebbb657c87cd20737246ffe1bc8351010cef8cdb34a52", size = 13686352, upload-time = "2025-07-30T20:19:03.658Z" }, - { url = "https://files.pythonhosted.org/packages/90/6e/a0e735021a27e81d2f3d26a2664886e06608cda50bd71ddd3d111be39c72/daily_python-0.19.5-cp37-abi3-macosx_11_0_arm64.whl", hash = "sha256:98faf21a04bd29086245319e3a4df0ba164f0d0224fc4f8278365e70d927ed45", size = 12018783, upload-time = "2025-07-30T20:19:05.921Z" }, - { url = "https://files.pythonhosted.org/packages/8c/60/454e0f7efe7086411bf8a679caf866f036d6b51a68154f5d1cdd22933bfd/daily_python-0.19.5-cp37-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:9021e012ec3a39faea1afd4af2c1744e2c67e3a5d397358529b375a7c917f756", size = 14055444, upload-time = "2025-07-30T20:19:08.043Z" }, - { url = "https://files.pythonhosted.org/packages/49/1e/cc4e3d04aef2a84a48ff9c0a8c5e58d0fd617b3ebf0cd9c8f0ab73cd6346/daily_python-0.19.5-cp37-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:b64d67f513097004d96796562a46020ff982c7633985be124faf59a4157344b0", size = 14568186, upload-time = "2025-07-30T20:19:10.196Z" }, + { url = "https://files.pythonhosted.org/packages/4a/73/74a313e6557a6f4da153b38f3feb5b9e4012268322976c79495ce2f228ec/daily_python-0.19.6-cp37-abi3-macosx_10_15_x86_64.whl", hash = "sha256:d35d181f9ba824b62ca531449b237dc9aa771e69c00205a6efa359c371208ee6", size = 13687145, upload-time = "2025-08-02T01:56:51.197Z" }, + { url = "https://files.pythonhosted.org/packages/bb/ad/fe2f81fc516851200ffec75f8ce44f631b85c9bc5bbdf9f6637f0823e99a/daily_python-0.19.6-cp37-abi3-macosx_11_0_arm64.whl", hash = "sha256:826342ee863086bb7192230615e26920700706ea79824f7028305f084e18d2e9", size = 12045258, upload-time = "2025-08-02T01:56:53.38Z" }, + { url = "https://files.pythonhosted.org/packages/22/0e/52aa355208d015134c88055ed53eed83452204590ac20ee2dcd1d6b4fa95/daily_python-0.19.6-cp37-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:6ff87c59e677c04c33e46963517e566a77fba425e9d52048418db4e037c6a1d6", size = 14066192, upload-time = "2025-08-02T01:56:55.669Z" }, + { url = "https://files.pythonhosted.org/packages/32/a0/cab8d478c5ae576bc607431a4d8d9e5a0aae831c096cf833ba2aa1535501/daily_python-0.19.6-cp37-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:d15bf8657105a92ddbc88da4dd5bded88075cdf2bc33552406b93f586fdaeced", size = 14575791, upload-time = "2025-08-02T01:56:57.983Z" }, ] [[package]] @@ -4200,7 +4200,7 @@ requires-dist = [ { name = "azure-cognitiveservices-speech", marker = "extra == 'azure'", specifier = "~=1.42.0" }, { name = "cartesia", marker = "extra == 'cartesia'", specifier = "~=2.0.3" }, { name = "coremltools", marker = "extra == 'local-smart-turn'", specifier = ">=8.0" }, - { name = "daily-python", marker = "extra == 'daily'", specifier = "~=0.19.5" }, + { name = "daily-python", marker = "extra == 'daily'", specifier = "~=0.19.6" }, { name = "deepgram-sdk", marker = "extra == 'deepgram'", specifier = "~=4.7.0" }, { name = "docstring-parser", specifier = "~=0.16" }, { name = "einops", marker = "extra == 'moondream'", specifier = "~=0.8.0" },