Compare commits

...

104 Commits

Author SHA1 Message Date
Kwindla Hultman Kramer
007b2fe0c9 pass int instead of float to 2025-01-24 14:29:59 -08:00
Filipi da Silva Fuchter
7c52736ff6 Merge pull request #1030 from pipecat-ai/gemini_grounding_metadata
Introduce support for extracting and processing grounding metadata from GoogleLLMService.
2025-01-24 15:41:54 -03:00
Mark Backman
48ce751602 Merge pull request #1075 from Vaibhav159/vl_add_daily_meeting_token_v2
adding models to DailyRestHelper
2025-01-24 13:21:52 -05:00
Vaibhav159
1f1e2dac2b wrapping things up 2025-01-24 23:44:23 +05:30
Vaibhav159
71c2dc3d05 minor typing change 2025-01-24 23:38:44 +05:30
Vaibhav159
ef02ece662 doc string 2025-01-24 22:47:40 +05:30
Vaibhav159
d5818fad5b addressing comments 2025-01-24 22:46:54 +05:30
Vaibhav159
c5faac1cf8 adding RecordingsBucketConfig 2025-01-24 15:14:20 +05:30
Vaibhav159
e106d7a215 adding line space 2025-01-24 09:12:07 +05:30
Vaibhav159
40c1a8369a updated changelog 2025-01-24 09:11:15 +05:30
Vaibhav159
6ab2404a98 adding more properties to daily room 2025-01-24 09:10:25 +05:30
Mark Backman
e61c996a2e Merge pull request #1079 from ecdeng/patch-1
Update cartesia.py to use the new model pointer `sonic`
2025-01-23 22:15:30 -05:00
Eric Deng
2c81dc1f06 Update cartesia.py to use the new model pointer sonic instead of sonic-english
We are now using `sonic` as a pointer to the latest stable release (https://docs.cartesia.ai/build-with-sonic/models#continuous-updates). sonic-english will forever point to `sonic-2024-10-19`, which is already out of date.
2025-01-23 15:47:07 -08:00
Mark Backman
d4e4b12109 Merge pull request #1071 from porcelaincode/patch-1
Update runner.py
2025-01-23 13:19:22 -05:00
Mark Backman
466d26a4f2 Merge pull request #1077 from Vaibhav159/vl_fix_missing_leftover_audio
adding missing audio buffer fix
2025-01-23 13:16:41 -05:00
Vaibhav159
ef511d580d adding missing audio buffer fix 2025-01-23 23:17:49 +05:30
Vaibhav159
5957ddb038 adding missing audio buffer fix 2025-01-23 23:17:18 +05:30
Vaibhav159
799c2d14b8 adding meeting token v2 func 2025-01-23 21:40:42 +05:30
vatsal
dee1224530 Update runner.py 2025-01-23 13:21:49 +05:30
Mark Backman
fc6aa6eae8 Merge pull request #1060 from chhao01/patch-1
[bug]TypeError: object of type 'NoneType' has no len()
2025-01-22 19:14:35 -05:00
Mark Backman
ddd5bf70ab Merge pull request #1061 from Allenmylath/patch-21
Update README.md
2025-01-22 19:13:15 -05:00
allenmylath
aa59744444 Update examples/README.md
Co-authored-by: Mark Backman <m.backman@gmail.com>
2025-01-23 05:38:37 +05:30
chadbailey59
067ddfe505 Storytelling chatbot updates (#1066)
* initial changes for gemini storybot

* storybot updates for gemini

* more storybot updates

* interim interruptible commit

* cleanup

* cleanup

* cleanup

* cleanup
2025-01-22 15:20:21 -06:00
Mark Backman
a64df978e7 Merge pull request #1046 from pipecat-ai/mb/transcript-tts
Modified `TranscriptProcessor` to use `TTSTextFrame`s
2025-01-22 15:46:01 -05:00
Mark Backman
7167719761 Emit a transcription callback when receiving a CancelFrame, update examples accordingly 2025-01-22 14:56:29 -05:00
Mark Backman
e1430be9f9 Code review fixes 2025-01-22 14:56:29 -05:00
Mark Backman
c2fe8e7fdb Updated CHANGELOG 2025-01-22 14:56:28 -05:00
Mark Backman
31c77d8e35 Update examples for the updated TranscriptProcessor 2025-01-22 14:56:00 -05:00
Mark Backman
2a60d54830 Update the AssistantTranscriptProcessor to use TTSTextFrames in place of OpenAILLMContextFrames 2025-01-22 14:56:00 -05:00
Aleix Conchillo Flaqué
b3c99887dc Merge pull request #1068 from Canonical-AI-Inc/import-fix
Fixing missing import
2025-01-22 11:37:49 -08:00
Mark Backman
38ad75cc17 Merge pull request #1065 from pipecat-ai/mb/fix-openai_realtime-function-calling
OpenAIRealtimeBetaLLMService: Fixed an error in function calling
2025-01-22 14:37:01 -05:00
Adrian Cowham
2debac314c fixing missing import 2025-01-22 11:06:53 -08:00
Mark Backman
e0c9a1a1a2 Merge pull request #1041 from Allenmylath/patch-20
Update bot.py
2025-01-22 09:18:19 -05:00
allenmylath
4cdcca588e Update examples/moondream-chatbot/bot.py
Co-authored-by: Mark Backman <m.backman@gmail.com>
2025-01-22 19:40:12 +05:30
allenmylath
a90e81e2eb Update examples/moondream-chatbot/bot.py
Co-authored-by: Mark Backman <m.backman@gmail.com>
2025-01-22 19:38:36 +05:30
Mark Backman
0ba60c9e28 Merge pull request #975 from imsakg/main
fix(gemini): prevent non-audio modality processing
2025-01-22 09:03:18 -05:00
Mark Backman
5ca5fbd825 OpenAIRealtimeBetaLLMService: Fixed an error in function calling 2025-01-22 08:54:03 -05:00
allenmylath
2b52e2c109 Update README.md
Silero-tts changed to VAD, also description regarding session handling added to websocket chatbot
2025-01-22 14:42:35 +05:30
Cheng Hao
7e8fc2e7e2 [bug]TypeError: object of type 'NoneType' has no len()
Sometimes the chunk.choices is None, and I got exception like: 
```
TypeError: object of type 'NoneType' has no len()
```
2025-01-22 15:31:27 +08:00
Aleix Conchillo Flaqué
0d79a9eaa6 update CHANGELOG.md 2025-01-21 18:00:10 -08:00
Aleix Conchillo Flaqué
f89b9ec23f Merge pull request #1057 from pipecat-ai/aleix/replace-resampy-soxr
improve audio resampling by switching from resampy to soxr
2025-01-21 17:52:49 -08:00
Mark Backman
20d5824e56 Merge pull request #1058 from pipecat-ai/mb/fix-trace-log 2025-01-21 20:44:50 -05:00
Aleix Conchillo Flaqué
f23baa78d8 test-requirements: add soxr and remove resampy 2025-01-21 17:40:17 -08:00
Aleix Conchillo Flaqué
cacd6ba3fa improve audio resampling by switching from resampy to soxr 2025-01-21 17:40:17 -08:00
Aleix Conchillo Flaqué
f87ecd3a51 Merge pull request #1048 from pipecat-ai/aleix/add-unittest-utils
tests: add some initial run_test() utilities
2025-01-21 17:39:06 -08:00
Mark Backman
b96a922aa8 Fix trace log line for resume_processing_frames 2025-01-21 18:15:03 -05:00
Aleix Conchillo Flaqué
401d3ff267 tests: added PipelineTask tests 2025-01-21 11:45:43 -08:00
Aleix Conchillo Flaqué
ab4221a4db task: added BaseTask 2025-01-21 11:45:43 -08:00
Aleix Conchillo Flaqué
bd6f82cf94 task: allow specifying heartbeat period 2025-01-21 11:45:43 -08:00
Aleix Conchillo Flaqué
dd21b424d6 pyproject: ignore 'audioop' deprecation warning 2025-01-21 10:27:34 -08:00
Aleix Conchillo Flaqué
76884877dd tests: add pytest-asyncio dependency 2025-01-21 10:23:19 -08:00
Aleix Conchillo Flaqué
0d6c680133 README: add unit tests badge 2025-01-21 10:14:37 -08:00
Aleix Conchillo Flaqué
a27fe4bde2 tests: move test_ai_services to test_utils_string 2025-01-21 10:06:14 -08:00
Aleix Conchillo Flaqué
177cb2ca8b tests: initial pipeline and parallelpipeline tests 2025-01-21 09:57:54 -08:00
Aleix Conchillo Flaqué
3c970a3cee tests: add more filter tests 2025-01-21 09:43:57 -08:00
Aleix Conchillo Flaqué
af02f8f1cd filters(frame_filter): allow more than one frame 2025-01-21 09:43:33 -08:00
Aleix Conchillo Flaqué
2e0fb198bf frame_processor: allow pushing more frames after EndFrame
This can be useful for testing purposes. In real practice, there shouldn't be
any frames after an EndFrame is pushed.
2025-01-21 09:42:15 -08:00
Filipi da Silva Fuchter
4f758c5a3b Merge pull request #1050 from pipecat-ai/fix_rtvi_warning_msg
Ignoring transport messages that are not intended to RTVI.
2025-01-21 13:36:50 -03:00
Filipi Fuchter
3e0836b340 Ignoring transport messages that are not intended to RTVI. 2025-01-21 10:08:14 -03:00
Aleix Conchillo Flaqué
2f23693bf3 tests: fix test_protobuf_serializer.py 2025-01-20 18:39:59 -08:00
Aleix Conchillo Flaqué
b7dd9748cf serializers: fix special fix initialization 2025-01-20 18:39:41 -08:00
Aleix Conchillo Flaqué
d4d9c3b7ae tests: fix test_aggregators.py 2025-01-20 18:16:14 -08:00
Aleix Conchillo Flaqué
090bc81ec5 tests: add some initial run_test() utilities 2025-01-20 17:41:21 -08:00
Filipi Fuchter
9b61633aa0 Introduce support for extracting and processing grounding metadata from Google LLM responses. 2025-01-20 11:28:12 -03:00
Mark Backman
e3d53d3d9a Merge pull request #1044 from pipecat-ai/mb/elevenlabs-http-fix-voice-settings
Fixed a type error when using voice_settings in ElevenLabsHttpTTSService
2025-01-20 08:11:38 -05:00
Mark Backman
262d3a19c9 Fixed a type error when using voice_settings in ElevenLabsHttpTTSService 2025-01-20 07:57:02 -05:00
allenmylath
491feb691c Update bot.py
quiet and talking frames are determined based on BotStartedSpeakingFrame and BotStoppedSpeakingFrame not ttsframe
2025-01-20 14:00:17 +05:30
Aleix Conchillo Flaqué
e4f83b237e update CHANGELOG (remove 07d-interruptible-elevenlabs-http.py) 2025-01-19 11:36:18 -08:00
Aleix Conchillo Flaqué
a169e0cde9 Merge pull request #1035 from pipecat-ai/aleix/prepare-0.0.53
update CHANGELOG for 0.0.53
2025-01-18 14:50:35 -08:00
Aleix Conchillo Flaqué
c6d643d4ec update CHANGELOG for 0.0.53 2025-01-18 14:48:48 -08:00
Aleix Conchillo Flaqué
2abbd4bb27 Merge pull request #1039 from pipecat-ai/aleix/fish-audio-websocket-service
services(fish): FishAudioTTSService to use WebsocketService
2025-01-18 14:48:20 -08:00
Aleix Conchillo Flaqué
e0011a3996 services(fish): FishAudioTTSService to use WebsocketService 2025-01-18 14:29:45 -08:00
Aleix Conchillo Flaqué
ea44c59ddd Merge pull request #1037 from Vaibhav159/fixing_unused_11labs_package
removing unused 11labs package imports
2025-01-17 22:08:04 -08:00
Vaibhav159
a9c7dbbc05 removing unused code 2025-01-18 10:58:07 +05:30
Vaibhav159
8a87e92b2b adding missing 11labs package 2025-01-18 10:48:57 +05:30
Mark Backman
982f2becc6 Merge pull request #1002 from pipecat-ai/mb/add-on-error-callback
Register the on_error handler
2025-01-17 21:58:59 -05:00
Mark Backman
e049ae470d Register the on_error handler 2025-01-17 21:49:42 -05:00
Mark Backman
e159f2dce1 Merge pull request #1024 from pipecat-ai/mb/elevenlabs-http
Add ElevenLabsHttpTTSService
2025-01-17 21:30:31 -05:00
Aleix Conchillo Flaqué
e9162ae467 Merge pull request #1004 from Fluentsai/feature/dtmf_input
Twilio serializer reading dtmf websocket messages
2025-01-17 18:14:46 -08:00
Aleix Conchillo Flaqué
bb65512ff4 Merge pull request #1034 from pipecat-ai/aleix/ulaw-resample-update
ulaw resample update
2025-01-17 17:47:18 -08:00
Mark Backman
b81323d676 Code review fixes + docstrings 2025-01-17 20:12:43 -05:00
Aleix Conchillo Flaqué
65fa77dfa5 audio: use resample_audio to resample ulaw bytes 2025-01-17 15:24:41 -08:00
Aleix Conchillo Flaqué
9ddd9ae27c Merge pull request #1011 from Vaibhav159/vl_deepgram_metrics_without_vad
adding metric generation without deepgram VAD
2025-01-17 14:47:19 -08:00
Aleix Conchillo Flaqué
12fc6e17ef Merge pull request #1033 from pipecat-ai/aleix/observers-performance
task: add TaskObserver and avoid pipeline blocking
2025-01-17 14:43:26 -08:00
Aleix Conchillo Flaqué
3e4020cdba task: add TaskObserver and avoid pipeline blocking
Observers now process frames in separate tasks. This avoids blocking the
pipeline while the observer is processing the frame.
2025-01-17 11:15:52 -08:00
Aleix Conchillo Flaqué
4f883ee31f Merge pull request #1023 from pipecat-ai/aleix/introduce-heartbeat-frames
introduce heartbeat frames
2025-01-17 10:31:07 -08:00
Mark Backman
3ff360f042 Merge pull request #1032 from pipecat-ai/mb/user-idle-fixes
Start UserIdleProcessor on speaking frame, fix bug not pushing EndFrame
2025-01-17 13:18:09 -05:00
Aleix Conchillo Flaqué
45cbad5b3e task: add HEARTBEAT_MONITOR_SECONDS 2025-01-17 10:11:28 -08:00
Aleix Conchillo Flaqué
477d0d154b frame_processor: make sure clock is initialized 2025-01-17 10:05:23 -08:00
Aleix Conchillo Flaqué
4b3c776f58 task: don't use push queue to send a heartbeat
This is because we might be waiting for the EndFrame. Currently, if we push an
EndFrame to the task, the task will block until the EndFrame traverses all the
pipeline.
2025-01-17 10:04:24 -08:00
Aleix Conchillo Flaqué
da0c4cfd99 task: increase heartbeat monitoring to 5 seconds 2025-01-17 10:04:05 -08:00
Aleix Conchillo Flaqué
f22a00570d task: start heartbeats task when push task starts 2025-01-17 10:03:13 -08:00
Mark Backman
85f4663a41 Start UserIdleProcessor on speaking frame, fix bug not pushing EndFrame 2025-01-17 12:54:17 -05:00
Mark Backman
d51893f61c Refactor for aiohttp, correct use of settings 2025-01-16 23:49:53 -05:00
Mark Backman
740d2743df Add TTFB metrics 2025-01-16 23:05:53 -05:00
Mark Backman
225b65c3d2 Add ElevenLabsHttpTTSService 2025-01-16 22:46:32 -05:00
Aleix Conchillo Flaqué
2503f76107 examples: add 31-heartbeats.py 2025-01-16 19:31:13 -08:00
Aleix Conchillo Flaqué
ff8aa68942 introduce heartbeat frames 2025-01-16 19:31:13 -08:00
Maxim Makatchev
c5edbf4b75 Made InputDTMFFrame a DataFrame and moved up to data frames 2025-01-17 12:27:04 +09:00
Vaibhav159
85e7d62f94 fixing log text 2025-01-16 21:36:51 +05:30
Vaibhav159
923d33eeff fixing ruff 2025-01-16 21:32:48 +05:30
Vaibhav159
7ee6e7193d adding metric generation without deepgram VAD 2025-01-16 21:23:56 +05:30
Maxim Makatchev
dcf317f2fa Twilio serializer reading dtmf websocket messages and generating InputDTMFFrame containing the corresponding value of KeypadEntry 2025-01-16 17:43:12 +09:00
Mert Sefa AKGUN
14e5419913 fix(gemini): prevent non-audio modality processing
Add an early return in the _handle_transcribe_model_audio method to
prevent unnecessary processing when the modalities setting is not set
to audio. This change ensures that audio transcription only occurs
when appropriate.
2025-01-12 22:17:10 +03:00
61 changed files with 1919 additions and 702 deletions

View File

@@ -1,4 +1,4 @@
name: test
name: tests
on:
workflow_dispatch:
@@ -49,4 +49,4 @@ jobs:
- name: Test with pytest
run: |
source .venv/bin/activate
pytest --ignore-glob="*to_be_updated*" --ignore-glob=*pipeline_source* src tests
pytest

View File

@@ -9,13 +9,74 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- It is now possible to specify the period of the `PipelineTask` heartbeat
frames with `heartbeats_period_secs`.
- Added `DailyMeetingTokenProperties` and `DailyMeetingTokenParams` Pydantic models
for meeting token creation in `get_token` method of `DailyRESTHelper`.
- Added `enable_recording` and `geo` parameters to `DailyRoomProperties`.
- Added `RecordingsBucketConfig` to `DailyRoomProperties` to upload recordings to a custom AWS bucket.
### Changed
- Modified `TranscriptProcessor` to use TTS text frames for more accurate assistant
transcripts. Assistant messages are now aggregated based on bot speaking boundaries
rather than LLM context, providing better handling of interruptions and partial
utterances.
- Updated foundational examples `28a-transcription-processor-openai.py`,
`28b-transcript-processor-anthropic.py`, and
`28c-transcription-processor-gemini.py` to use the updated
`TranscriptProcessor`.
### Fixed
- Fixed a type error when using `voice_settings` in `ElevenLabsHttpTTSService`.
- Fixed an issue where `OpenAIRealtimeBetaLLMService` function calling resulted
in an error.
- Fixed an issue in `AudioBufferProcessor` where the last audio buffer was not
being processed, in cases where the `_user_audio_buffer` was smaller than the
buffer size.
### Performance
- Replaced audio resampling library `resampy` with `soxr`. Resampling a 2:21s
audio file from 24KHz to 16KHz took 1.41s with `resampy` and 0.031s with
`soxr` with similar audio quality.
### Other
- Added initial unit test infrastructure.
## [0.0.53] - 2025-01-18
### Added
- Added `ElevenLabsHttpTTSService` which uses EleveLabs' HTTP API instead of the
websocket one.
- Introduced pipeline frame observers. Observers can view all the frames that go
through the pipeline without the need to inject processors in the
pipeline. This can be useful, for example, to implement frame loggers or
debuggers among other things.
debuggers among other things. The example
`examples/foundational/30-observer.py` shows how to add an observer to a
pipeline for debugging.
- Added `30-observer.py` to show how to add an Observer to a pipeline for
debugging.
- Introduced heartbeat frames. The pipeline task can now push periodic
heartbeats down the pipeline when `enable_heartbeats=True`. Heartbeats are
system frames that are supposed to make it all the way to the end of the
pipeline. When a heartbeat frame is received the traversing time (i.e. the
time it took to go through the whole pipeline) will be displayed (with TRACE
logging) otherwise a warning will be shown. The example
`examples/foundational/31-heartbeats.py` shows how to enable heartbeats and
forces warnings to be displayed.
- Added `LLMTextFrame` and `TTSTextFrame` which should be pushed by LLM and TTS
services respectively instead of `TextFrame`s.
- Added `OpenRouter` for OpenRouter integration with an OpenAI-compatible
interface. Added foundational example `14m-function-calling-openrouter.py`.
@@ -56,6 +117,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
- Modified `UserIdleProcessor` to start monitoring only after first
conversation activity (`UserStartedSpeakingFrame` or
`BotStartedSpeakingFrame`) instead of immediately.
- Modified `OpenAIAssistantContextAggregator` to support controlled completions
and to emit context update callbacks via `FunctionCallResultProperties`.
@@ -79,6 +144,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Fixed an issue where `DeepgramSTTService` was not generating metrics using
pipeline's VAD.
- Fixed `UserIdleProcessor` not properly propagating `EndFrame`s through the
pipeline.
- Fixed an issue where websocket based TTS services could incorrectly terminate
their connection due to a retry counter not resetting.

View File

@@ -2,7 +2,7 @@
 <img alt="pipecat" width="300px" height="auto" src="https://raw.githubusercontent.com/pipecat-ai/pipecat/main/pipecat.png">
</div></h1>
[![PyPI](https://img.shields.io/pypi/v/pipecat-ai)](https://pypi.org/project/pipecat-ai) [![Docs](https://img.shields.io/badge/Documentation-blue)](https://docs.pipecat.ai) [![Discord](https://img.shields.io/discord/1239284677165056021)](https://discord.gg/pipecat) <a href="https://app.commanddash.io/agent/github_pipecat-ai_pipecat"><img src="https://img.shields.io/badge/AI-Code%20Agent-EB9FDA"></a>
[![PyPI](https://img.shields.io/pypi/v/pipecat-ai)](https://pypi.org/project/pipecat-ai) ![Tests](https://github.com/pipecat-ai/pipecat/actions/workflows/tests.yaml/badge.svg) [![Docs](https://img.shields.io/badge/Documentation-blue)](https://docs.pipecat.ai) [![Discord](https://img.shields.io/discord/1239284677165056021)](https://discord.gg/pipecat) <a href="https://app.commanddash.io/agent/github_pipecat-ai_pipecat"><img src="https://img.shields.io/badge/AI-Code%20Agent-EB9FDA"></a>
Pipecat is an open source Python framework for building voice and multimodal conversational agents. It handles the complex orchestration of AI services, network transport, audio processing, and multimodal interactions, letting you focus on creating engaging experiences.
@@ -189,7 +189,7 @@ pip install "path_to_this_repo[option,...]"
From the root directory, run:
```shell
pytest --doctest-modules --ignore-glob="*to_be_updated*" --ignore-glob=*pipeline_source* src tests
pytest
```
## Setting up your editor

View File

@@ -4,6 +4,7 @@ pip-tools~=7.4.1
pre-commit~=4.0.1
pyright~=1.1.392
pytest~=8.3.4
pytest-asyncio~=0.25.2
ruff~=0.9.1
setuptools~=75.8.0
setuptools_scm~=8.1.0

View File

@@ -42,7 +42,7 @@ Next, follow the steps in the README for each demo.
| [Dialin Chatbot](dialin-chatbot) | A chatbot that connects to an incoming phone call from Daily or Twilio. | Deepgram, ElevenLabs, OpenAI, Daily, Twilio |
| [Twilio Chatbot](twilio-chatbot) | A chatbot that connects to an incoming phone call from Twilio. | Deepgram, ElevenLabs, OpenAI, Daily, Twilio |
| [studypal](studypal) | A chatbot to have a conversation about any article on the web | |
| [WebSocket Chatbot Server](websocket-server) | A real-time websocket server that handles audio streaming and bot interactions with speech-to-text and text-to-speech capabilities | `python-websockets`, `openai`, `deepgram`, `silero-tts`, `numpy` |
| [WebSocket Chatbot Server](websocket-server) | A real-time websocket server that handles audio streaming and bot interactions with speech-to-text and text-to-speech capabilities. | Cartesia, Deepgram, OpenAI, Websockets |
> [!IMPORTANT]
> These example projects use Daily as a WebRTC transport and can be joined using their hosted Prebuilt UI.

View File

@@ -53,4 +53,3 @@ async def configure(aiohttp_session: aiohttp.ClientSession):
token = await daily_rest_helper.get_token(url, expiry_time)
return (url, token)
return (url, token)

View File

@@ -7,7 +7,7 @@
import asyncio
import os
import sys
from typing import List
from typing import List, Optional
import aiohttp
from dotenv import load_dotenv
@@ -15,7 +15,11 @@ from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame, TranscriptionMessage, TranscriptionUpdateFrame
from pipecat.frames.frames import (
CancelFrame,
TranscriptionMessage,
TranscriptionUpdateFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -33,13 +37,49 @@ logger.add(sys.stderr, level="DEBUG")
class TranscriptHandler:
"""Simple handler to demonstrate transcript processing.
"""Handles real-time transcript processing and output.
Maintains a list of conversation messages and logs them with timestamps.
Maintains a list of conversation messages and outputs them either to a log
or to a file as they are received. Each message includes its timestamp and role.
Attributes:
messages: List of all processed transcript messages
output_file: Optional path to file where transcript is saved. If None, outputs to log only.
"""
def __init__(self):
def __init__(self, output_file: Optional[str] = None):
"""Initialize handler with optional file output.
Args:
output_file: Path to output file. If None, outputs to log only.
"""
self.messages: List[TranscriptionMessage] = []
self.output_file: Optional[str] = output_file
logger.debug(
f"TranscriptHandler initialized {'with output_file=' + output_file if output_file else 'with log output only'}"
)
async def save_message(self, message: TranscriptionMessage):
"""Save a single transcript message.
Outputs the message to the log and optionally to a file.
Args:
message: The message to save
"""
timestamp = f"[{message.timestamp}] " if message.timestamp else ""
line = f"{timestamp}{message.role}: {message.content}"
# Always log the message
logger.info(f"Transcript: {line}")
# Optionally write to file
if self.output_file:
try:
with open(self.output_file, "a", encoding="utf-8") as f:
f.write(line + "\n")
except Exception as e:
logger.error(f"Error saving transcript message to file: {e}")
async def on_transcript_update(
self, processor: TranscriptProcessor, frame: TranscriptionUpdateFrame
@@ -50,13 +90,11 @@ class TranscriptHandler:
processor: The TranscriptProcessor that emitted the update
frame: TranscriptionUpdateFrame containing new messages
"""
self.messages.extend(frame.messages)
logger.debug(f"Received transcript update with {len(frame.messages)} new messages")
# Log the new messages
logger.info("New transcript messages:")
for msg in frame.messages:
timestamp = f"[{msg.timestamp}] " if msg.timestamp else ""
logger.info(f"{timestamp}{msg.role}: {msg.content}")
self.messages.append(msg)
await self.save_message(msg)
async def main():
@@ -99,7 +137,8 @@ async def main():
# Create transcript processor and handler
transcript = TranscriptProcessor()
transcript_handler = TranscriptHandler()
transcript_handler = TranscriptHandler() # Output to log only
# transcript_handler = TranscriptHandler(output_file="transcript.txt") # Output to file and log
pipeline = Pipeline(
[
@@ -110,8 +149,8 @@ async def main():
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
transcript.assistant(), # Assistant transcripts
context_aggregator.assistant(), # Assistant spoken responses
]
)
@@ -130,7 +169,8 @@ async def main():
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
# Stop the pipeline immediately when the participant leaves
await task.queue_frame(CancelFrame())
runner = PipelineRunner()

View File

@@ -7,7 +7,7 @@
import asyncio
import os
import sys
from typing import List
from typing import List, Optional
import aiohttp
from dotenv import load_dotenv
@@ -15,7 +15,11 @@ from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame, TranscriptionMessage, TranscriptionUpdateFrame
from pipecat.frames.frames import (
CancelFrame,
TranscriptionMessage,
TranscriptionUpdateFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -33,13 +37,49 @@ logger.add(sys.stderr, level="DEBUG")
class TranscriptHandler:
"""Simple handler to demonstrate transcript processing.
"""Handles real-time transcript processing and output.
Maintains a list of conversation messages and logs them with timestamps.
Maintains a list of conversation messages and outputs them either to a log
or to a file as they are received. Each message includes its timestamp and role.
Attributes:
messages: List of all processed transcript messages
output_file: Optional path to file where transcript is saved. If None, outputs to log only.
"""
def __init__(self):
def __init__(self, output_file: Optional[str] = None):
"""Initialize handler with optional file output.
Args:
output_file: Path to output file. If None, outputs to log only.
"""
self.messages: List[TranscriptionMessage] = []
self.output_file: Optional[str] = output_file
logger.debug(
f"TranscriptHandler initialized {'with output_file=' + output_file if output_file else 'with log output only'}"
)
async def save_message(self, message: TranscriptionMessage):
"""Save a single transcript message.
Outputs the message to the log and optionally to a file.
Args:
message: The message to save
"""
timestamp = f"[{message.timestamp}] " if message.timestamp else ""
line = f"{timestamp}{message.role}: {message.content}"
# Always log the message
logger.info(f"Transcript: {line}")
# Optionally write to file
if self.output_file:
try:
with open(self.output_file, "a", encoding="utf-8") as f:
f.write(line + "\n")
except Exception as e:
logger.error(f"Error saving transcript message to file: {e}")
async def on_transcript_update(
self, processor: TranscriptProcessor, frame: TranscriptionUpdateFrame
@@ -50,13 +90,11 @@ class TranscriptHandler:
processor: The TranscriptProcessor that emitted the update
frame: TranscriptionUpdateFrame containing new messages
"""
self.messages.extend(frame.messages)
logger.debug(f"Received transcript update with {len(frame.messages)} new messages")
# Log the new messages
logger.info("New transcript messages:")
for msg in frame.messages:
timestamp = f"[{msg.timestamp}] " if msg.timestamp else ""
logger.info(f"{timestamp}{msg.role}: {msg.content}")
self.messages.append(msg)
await self.save_message(msg)
async def main():
@@ -99,7 +137,8 @@ async def main():
# Create transcript processor and handler
transcript = TranscriptProcessor()
transcript_handler = TranscriptHandler()
transcript_handler = TranscriptHandler() # Output to log only
# transcript_handler = TranscriptHandler(output_file="transcript.txt") # Output to file and log
pipeline = Pipeline(
[
@@ -110,8 +149,8 @@ async def main():
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
transcript.assistant(), # Assistant transcripts
context_aggregator.assistant(), # Assistant spoken responses
]
)
@@ -130,7 +169,8 @@ async def main():
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
# Stop the pipeline immediately when the participant leaves
await task.queue_frame(CancelFrame())
runner = PipelineRunner()

View File

@@ -7,7 +7,7 @@
import asyncio
import os
import sys
from typing import List
from typing import List, Optional
import aiohttp
from dotenv import load_dotenv
@@ -15,7 +15,11 @@ from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame, TranscriptionMessage, TranscriptionUpdateFrame
from pipecat.frames.frames import (
CancelFrame,
TranscriptionMessage,
TranscriptionUpdateFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -34,13 +38,49 @@ logger.add(sys.stderr, level="DEBUG")
class TranscriptHandler:
"""Simple handler to demonstrate transcript processing.
"""Handles real-time transcript processing and output.
Maintains a list of conversation messages and logs them with timestamps.
Maintains a list of conversation messages and outputs them either to a log
or to a file as they are received. Each message includes its timestamp and role.
Attributes:
messages: List of all processed transcript messages
output_file: Optional path to file where transcript is saved. If None, outputs to log only.
"""
def __init__(self):
def __init__(self, output_file: Optional[str] = None):
"""Initialize handler with optional file output.
Args:
output_file: Path to output file. If None, outputs to log only.
"""
self.messages: List[TranscriptionMessage] = []
self.output_file: Optional[str] = output_file
logger.debug(
f"TranscriptHandler initialized {'with output_file=' + output_file if output_file else 'with log output only'}"
)
async def save_message(self, message: TranscriptionMessage):
"""Save a single transcript message.
Outputs the message to the log and optionally to a file.
Args:
message: The message to save
"""
timestamp = f"[{message.timestamp}] " if message.timestamp else ""
line = f"{timestamp}{message.role}: {message.content}"
# Always log the message
logger.info(f"Transcript: {line}")
# Optionally write to file
if self.output_file:
try:
with open(self.output_file, "a", encoding="utf-8") as f:
f.write(line + "\n")
except Exception as e:
logger.error(f"Error saving transcript message to file: {e}")
async def on_transcript_update(
self, processor: TranscriptProcessor, frame: TranscriptionUpdateFrame
@@ -51,13 +91,11 @@ class TranscriptHandler:
processor: The TranscriptProcessor that emitted the update
frame: TranscriptionUpdateFrame containing new messages
"""
self.messages.extend(frame.messages)
logger.debug(f"Received transcript update with {len(frame.messages)} new messages")
# Log the new messages
logger.info("New transcript messages:")
for msg in frame.messages:
timestamp = f"[{msg.timestamp}] " if msg.timestamp else ""
logger.info(f"{timestamp}{msg.role}: {msg.content}")
self.messages.append(msg)
await self.save_message(msg)
async def main():
@@ -102,7 +140,8 @@ async def main():
# Create transcript processor and handler
transcript = TranscriptProcessor()
transcript_handler = TranscriptHandler()
transcript_handler = TranscriptHandler() # Output to log only
# transcript_handler = TranscriptHandler(output_file="transcript.txt") # Output to file and log
pipeline = Pipeline(
[
@@ -113,8 +152,8 @@ async def main():
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
transcript.assistant(), # Assistant transcripts
context_aggregator.assistant(), # Assistant spoken responses
]
)
@@ -140,7 +179,8 @@ async def main():
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
# Stop the pipeline immediately when the participant leaves
await task.queue_frame(CancelFrame())
runner = PipelineRunner()

View File

@@ -0,0 +1,130 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
from pathlib import Path
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import Frame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.google import GoogleLLMService, LLMSearchResponseFrame
from pipecat.transports.services.daily import DailyParams, DailyTransport
sys.path.append(str(Path(__file__).parent.parent))
from runner import configure
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
# Function handlers for the LLM
search_tool = {"google_search_retrieval": {}}
tools = [search_tool]
system_instruction = """
You are an expert at providing the most recent news from any place. Your responses will be converted to audio, so avoid using special characters or overly complex formatting.
Always use the google search API to retrieve the latest news. You must also use it to check which day is today.
You can:
- Use the Google search API to check the current date.
- Provide the most recent and relevant news from any place by using the google search API.
- Answer any questions the user may have, ensuring your responses are accurate and concise.
Start each interaction by asking the user about which place they would like to know the information.
"""
class LLMSearchLoggerProcessor(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, LLMSearchResponseFrame):
print(f"LLMSearchLoggerProcessor: {frame}")
await self.push_frame(frame)
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Latest news!",
DailyParams(
audio_out_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
),
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
# Initialize the Gemini Multimodal Live model
llm = GoogleLLMService(
api_key=os.getenv("GOOGLE_API_KEY"),
system_instruction=system_instruction,
tools=tools,
)
context = OpenAILLMContext(
[
{
"role": "user",
"content": "Start by greeting the user warmly, introducing yourself, and mentioning the current day. Be friendly and engaging to set a positive tone for the interaction.",
}
],
)
context_aggregator = llm.create_context_aggregator(context)
llm_search_logger = LLMSearchLoggerProcessor()
pipeline = Pipeline(
[
transport.input(),
stt,
context_aggregator.user(),
llm,
llm_search_logger,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await task.queue_frames([context_aggregator.user().get_context_frame()])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,43 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import sys
from loguru import logger
from pipecat.frames.frames import Frame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
class NullProcessor(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
async def main():
"""This test shows heartbeat monitoring by displaying a warning when
heartbeats are not received.
"""
pipeline = Pipeline([NullProcessor()])
task = PipelineTask(pipeline, PipelineParams(enable_heartbeats=True))
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -67,8 +67,8 @@ talking_frame = SpriteFrame(images=sprites)
class TalkingAnimation(FrameProcessor):
"""This class starts a talking animation when it receives an first AudioFrame,
and then returns to a "quiet" sprite when it sees a TTSStoppedFrame.
"""This class starts a talking animation when it receives an first BotStartedSpeakingFrame,
and then returns to a "quiet" sprite when it sees a BotStoppedSpeakingFrame.
"""
def __init__(self):

View File

@@ -66,7 +66,7 @@ The build UI files can be found in `frontend/out`
Start the API / bot manager:
`python src/bot_runner.py`
`python src/bot_runner.py --host localhost`
If you'd like to run a custom domain or port:

View File

@@ -4,6 +4,7 @@ ELEVENLABS_API_KEY=
ELEVENLABS_VOICE_ID=
FAL_KEY=
OPENAI_API_KEY=
GOOGLE_API_KEY=
ENV= # dev | production
RUN_AS_VM= # Set this if you want to run bots on process (not launch a new VM)

View File

@@ -1,4 +1,4 @@
import React, { useState } from "react";
import React, { useState, useEffect } from "react";
import {
useDaily,
useParticipantIds,
@@ -33,7 +33,9 @@ const Story: React.FC<StoryProps> = ({ handleLeave }) => {
setTimeout(() => daily.setLocalAudio(true), 500);
setStoryState("user");
} else {
daily.setLocalAudio(false);
// Uncomment the next line to mute the mic while the
// assistant it talking. Leave it commented to allow for interruptions
// daily.setLocalAudio(false);
setStoryState("assistant");
}
},
@@ -58,7 +60,7 @@ const Story: React.FC<StoryProps> = ({ handleLeave }) => {
{participantIds.length >= 1 ? (
<VideoTile
sessionId={participantIds[0]}
inactive={storyState === "user"}
inactive={false}
/>
) : (
<span className="p-3 rounded-full bg-gray-900/60 animate-pulse">
@@ -71,7 +73,7 @@ const Story: React.FC<StoryProps> = ({ handleLeave }) => {
)}
<DailyAudio />
</div>
<UserInputIndicator active={storyState === "user"} />
<UserInputIndicator active={true} />
</div>
);
};

View File

@@ -43,25 +43,8 @@
transition: opacity 0.5s ease;
}
@keyframes pulse {
0% {
outline-width: 6px;
@apply outline-teal-500/10;
}
50% {
outline-width: 24px;
@apply outline-teal-500/50;
}
100% {
outline-width: 6px;
@apply outline-teal-500/10;
}
}
.micIconActive{
@apply bg-teal-950 border-teal-500 outline-teal-500/20;
animation: pulse 2s infinite ease-in-out;
}
.micIconActive svg{

View File

@@ -1,4 +1,4 @@
import React, { useState, useEffect } from "react";
import React, { useState, useEffect, useRef } from "react";
import { useAppMessage } from "@daily-co/daily-react";
import { DailyEventObjectAppMessage } from "@daily-co/daily-js";
@@ -13,12 +13,31 @@ interface Props {
export default function UserInputIndicator({ active }: Props) {
const [transcription, setTranscription] = useState<string[]>([]);
const timeoutRef = useRef<NodeJS.Timeout>();
const resetTimeout = () => {
if (timeoutRef.current) {
clearTimeout(timeoutRef.current);
}
timeoutRef.current = setTimeout(() => {
setTranscription([]);
}, 5000);
};
useEffect(() => {
return () => {
if (timeoutRef.current) {
clearTimeout(timeoutRef.current);
}
};
}, []);
useAppMessage({
onAppMessage: (e: DailyEventObjectAppMessage<any>) => {
if (e.fromId && e.fromId === "transcription") {
if (e.data.user_id === "" && e.data.is_final) {
setTranscription((t) => [...t, ...e.data.text.split(" ")]);
resetTimeout();
}
}
},

View File

@@ -2,4 +2,4 @@ async_timeout
fastapi
uvicorn
python-dotenv
pipecat-ai[daily,elevenlabs,openai,fal]
pipecat-ai[daily,openai,fal,google,cartesia]

View File

@@ -13,16 +13,23 @@ import aiohttp
from dotenv import load_dotenv
from loguru import logger
from processors import StoryImageProcessor, StoryProcessor
from prompts import CUE_USER_TURN, LLM_BASE_PROMPT, LLM_INTRO_PROMPT
from prompts import CUE_USER_TURN, LLM_BASE_PROMPT
from utils.helpers import load_images, load_sounds
from pipecat.frames.frames import EndFrame, LLMMessagesFrame, StopTaskFrame
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame, StopTaskFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame,
)
from pipecat.processors.logger import FrameLogger
from pipecat.services.cartesia import CartesiaHttpTTSService, CartesiaTTSService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.fal import FalImageGenService
from pipecat.services.google import GoogleLLMService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import (
DailyParams,
@@ -53,6 +60,7 @@ async def main(room_url, token=None):
camera_out_width=768,
camera_out_height=768,
transcription_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_enabled=True,
),
)
@@ -61,11 +69,10 @@ async def main(room_url, token=None):
# -------------- Services --------------- #
llm_service = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm_service = GoogleLLMService(api_key=os.getenv("GOOGLE_API_KEY"))
tts_service = ElevenLabsTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
api_key=os.getenv("ELEVENLABS_API_KEY"), voice_id=os.getenv("ELEVENLABS_VOICE_ID")
)
fal_service_params = FalImageGenService.InputParams(
@@ -74,7 +81,7 @@ async def main(room_url, token=None):
fal_service = FalImageGenService(
aiohttp_session=session,
model="fal-ai/fast-lightning-sdxl",
model="fal-ai/stable-diffusion-v35-medium",
params=fal_service_params,
key=os.getenv("FAL_KEY"),
)
@@ -97,35 +104,8 @@ async def main(room_url, token=None):
runner = PipelineRunner()
# The intro pipeline is used to start
# the story (as per LLM_INTRO_PROMPT)
intro_pipeline = Pipeline([llm_service, tts_service, transport.output()])
intro_task = PipelineTask(intro_pipeline)
logger.debug("Waiting for participant...")
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
logger.debug("Participant joined, storytime commence!")
await transport.capture_participant_transcription(participant["id"])
await intro_task.queue_frames(
[
images["book1"],
LLMMessagesFrame([LLM_INTRO_PROMPT]),
DailyTransportMessageFrame(CUE_USER_TURN),
sounds["listening"],
images["book2"],
StopTaskFrame(),
]
)
# We run the intro pipeline. This will start the transport. The intro
# task will exit after StopTaskFrame is processed.
await runner.run(intro_task)
# The main story pipeline is used to continue the story based on user
# input.
main_pipeline = Pipeline(
[
transport.input(),
@@ -139,11 +119,32 @@ async def main(room_url, token=None):
]
)
main_task = PipelineTask(main_pipeline)
main_task = PipelineTask(
main_pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
logger.debug("Participant joined, storytime commence!")
await transport.capture_participant_transcription(participant["id"])
await main_task.queue_frames(
[
images["book1"],
context_aggregator.user().get_context_frame(),
DailyTransportMessageFrame(CUE_USER_TURN),
# sounds["listening"],
images["book2"],
]
)
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await intro_task.queue_frame(EndFrame())
await main_task.queue_frame(EndFrame())
@transport.event_handler("on_call_state_updated")

View File

@@ -114,7 +114,7 @@ async def start_bot(request: Request) -> JSONResponse:
else:
try:
subprocess.Popen(
[f"python3 -m bot -u {room.url} -t {token}"],
[f"python -m bot -u {room.url} -t {token}"],
shell=True,
bufsize=1,
cwd=os.path.dirname(os.path.abspath(__file__)),
@@ -175,7 +175,7 @@ async def virtualize_bot(room_url: str, token: str):
image = data[0]["config"]["image"]
# Machine configuration
cmd = f"python3 src/bot.py -u {room_url} -t {token}"
cmd = f"python src/bot.py -u {room_url} -t {token}"
cmd = cmd.split()
worker_props = {
"config": {
@@ -215,7 +215,7 @@ async def virtualize_bot(room_url: str, token: str):
if __name__ == "__main__":
# Check environment variables
required_env_vars = [
"OPENAI_API_KEY",
"GOOGLE_API_KEY",
"DAILY_API_KEY",
"FAL_KEY",
"ELEVENLABS_VOICE_ID",

View File

@@ -37,8 +37,7 @@ class StoryPromptFrame(TextFrame):
class StoryImageProcessor(FrameProcessor):
"""
Processor for image prompt frames that will be sent to the FAL service.
"""Processor for image prompt frames that will be sent to the FAL service.
This processor is responsible for consuming frames of type `StoryImageFrame`.
It processes them by passing it to the FAL service.
@@ -68,8 +67,7 @@ class StoryImageProcessor(FrameProcessor):
class StoryProcessor(FrameProcessor):
"""
Primary frame processor. It takes the frames generated by the LLM
"""Primary frame processor. It takes the frames generated by the LLM
and processes them into image prompts and story pages (sentences).
For a clearer picture of how this works, reference prompts.py
@@ -97,44 +95,10 @@ class StoryProcessor(FrameProcessor):
await self.push_frame(sounds["talking"])
elif isinstance(frame, TextFrame):
# We want to look for sentence breaks in the text
# but since TextFrames are streamed from the LLM
# we need to keep a buffer of the text we've seen so far
# Add new text to the buffer
self._text += frame.text
# IMAGE PROMPT
# Looking for: < [image prompt] > in the LLM response
# We prompted our LLM to add an image prompt in the response
# so we use regex matching to find it and yield a StoryImageFrame
if re.search(r"<.*?>", self._text):
if not re.search(r"<.*?>.*?>", self._text):
# Pass any frames until we have a closing bracket
# otherwise the image prompt will be passed to TTS
pass
# Extract the image prompt from the text using regex
image_prompt = re.search(r"<(.*?)>", self._text).group(1)
# Remove the image prompt from the text
self._text = re.sub(r"<.*?>", "", self._text, count=1)
# Process the image prompt frame
await self.push_frame(StoryImageFrame(image_prompt))
# STORY PAGE
# Looking for: [break] in the LLM response
# We prompted our LLM to add a [break] after each sentence
# so we use regex matching to find it in the LLM response
if re.search(r".*\[[bB]reak\].*", self._text):
# Remove the [break] token from the text
# so it isn't spoken out loud by the TTS
self._text = re.sub(r"\[[bB]reak\]", "", self._text, flags=re.IGNORECASE)
self._text = self._text.replace("\n", " ")
if len(self._text) > 2:
# Append the sentence to the story
self._story.append(self._text)
await self.push_frame(StoryPageFrame(self._text))
# Assert that it's the LLMs turn, until we're finished
await self.push_frame(DailyTransportMessageFrame(CUE_ASSISTANT_TURN))
# Clear the buffer
self._text = ""
# Process any complete patterns in the order they appear
await self.process_text_content()
# End of a full LLM response
# Driven by the prompt, the LLM should have asked the user for input
@@ -150,3 +114,38 @@ class StoryProcessor(FrameProcessor):
# Anything that is not a TextFrame pass through
else:
await self.push_frame(frame)
async def process_text_content(self):
"""Process text content in order of appearance, handling both image prompts and story breaks."""
while True:
# Find the first occurrence of each pattern
image_match = re.search(r"<(.*?)>", self._text)
break_match = re.search(r"\[[bB]reak\]", self._text)
# If neither pattern is found, we're done processing
if not image_match and not break_match:
break
# Find which pattern comes first in the text
image_pos = image_match.start() if image_match else float("inf")
break_pos = break_match.start() if break_match else float("inf")
if image_pos < break_pos:
# Process image prompt first
image_prompt = image_match.group(1)
# Remove the image prompt from the text
self._text = self._text[: image_match.start()] + self._text[image_match.end() :]
await self.push_frame(StoryImageFrame(image_prompt))
else:
# Process story break first
parts = re.split(r"\[[bB]reak\]", self._text, flags=re.IGNORECASE, maxsplit=1)
before_break = parts[0].replace("\n", " ").strip()
if len(before_break) > 2:
self._story.append(before_break)
await self.push_frame(StoryPageFrame(before_break))
# await self.push_frame(sounds["ding"])
await self.push_frame(DailyTransportMessageFrame(CUE_ASSISTANT_TURN))
# Keep the remainder (if any) in the buffer
self._text = parts[1].strip() if len(parts) > 1 else ""

View File

@@ -1,31 +1,34 @@
LLM_INTRO_PROMPT = {
"role": "system",
"content": "You are a creative storyteller who loves to tell whimsical, fantastical stories. \
Your goal is to craft an engaging and fun story. \
Start by asking the user what kind of story they'd like to hear. Don't provide any examples. \
Keep your response to only a few sentences.",
}
LLM_BASE_PROMPT = {
"role": "system",
"content": "You are a creative storyteller who loves tell whimsical, fantastical stories. \
Your goal is to craft an engaging and fun story. \
Keep all responses short and no more than a few sentences. Include [break] after each sentence of the story. \
\
Start each sentence with an image prompt, wrapped in triangle braces, that I can use to generate an illustration representing the upcoming scene. \
Image prompts should always be wrapped in triangle braces, like this: <image prompt goes here>. \
You should provide as much descriptive detail in your image prompt as you can to help recreate the current scene depicted by the sentence. \
For any recurring characters, you should provide a description of them in the image prompt each time, for example: <a brown fluffy dog ...>. \
Please do not include any character names in the image prompts, just their descriptions. \
Image prompts should focus on key visual attributes of all characters each time, for example <a brown fluffy dog and the tiny red cat ...>. \
Please use the following structure for your image prompts: characters, setting, action, and mood. \
Image prompts should be less than 150-200 characters and start in lowercase. \
\
Responses should use the format: <...> story sentence [break] <...> story sentence [break] ... \
After each response, ask me how I'd like the story to continue and wait for my input. \
Please ensure your responses are less than 3-4 sentences long. \
Please refrain from using any explicit language or content. Do not tell scary stories.",
"content": """You are a creative storyteller who loves tell whimsical, fantastical stories.
Your goal is to craft an engaging and fun story.
Keep all responses short and no more than a few sentences.
Start by asking the user what kind of story they'd like to hear. Don't provide any examples.
After they've answered the question, start telling the story. Include [break] after each sentence of the story.
Start each sentence with an image prompt, wrapped in triangle braces, that I can use to generate an illustration representing the upcoming scene.
Image prompts should always be wrapped in triangle braces, like this: <image prompt goes here>.
You should provide as much descriptive detail in your image prompt as you can to help recreate the current scene depicted by the sentence.
For any recurring characters, you should provide a description of them in the image prompt each time, for example: <a brown fluffy dog ...>.
Please do not include any character names in the image prompts, just their descriptions.
Image prompts should focus on key visual attributes of all characters each time, for example <a brown fluffy dog and the tiny red cat ...>.
Please use the following structure for your image prompts: characters, setting, action, and mood.
Image prompts should be less than 150-200 characters and start in lowercase.
STORY SENTENCE OUTPUT FORMAT:
<image description 1>
story sentence 1 [break]
<image description 2>
story sentence 2 [break]
<image description 3>
story sentence 3 [break]
How would you like the story to continue?
END OF EXAMPLE OUTPUT
Generate three story sentences, then ask what should happen next and wait for my input. You can propose an idea for how the story should proceed, but make sure to tell me I can suggest whatever I want. \
Please ensure your responses are less than 5 sentences long. \
Please refrain from using any explicit language or content. Do not tell scary stories.
Once you've started telling the story, EVERY RESPONSE should follow the story sentence output format. It is VERY IMPORTANT that you continue to include <image descriptions> and [break] between story sentences. DO NOT RESPOND without image descriptions and break tags.""",
}

View File

@@ -32,8 +32,7 @@ dependencies = [
"protobuf~=5.29.3",
"pydantic~=2.10.5",
"pyloudnorm~=0.1.1",
"resampy~=0.4.3",
"tenacity~=9.0.0"
"soxr~=0.5.0"
]
[project.urls]
@@ -63,7 +62,7 @@ fireworks = [ "openai~=1.59.6" ]
krisp = [ "pipecat-ai-krisp~=0.3.0" ]
koala = [ "pvkoala~=2.0.3" ]
langchain = [ "langchain~=0.3.14", "langchain-community~=0.3.14", "langchain-openai~=0.3.0" ]
livekit = [ "livekit~=0.19.1", "livekit-api~=0.8.1" ]
livekit = [ "livekit~=0.19.1", "livekit-api~=0.8.1", "tenacity~=9.0.0" ]
lmnt = [ "websockets~=13.1" ]
local = [ "pyaudio~=0.2.14" ]
moondream = [ "einops~=0.8.0", "timm~=1.0.13", "transformers~=4.48.0" ]
@@ -86,7 +85,13 @@ openrouter = [ "openai~=1.59.6" ]
where = ["src"]
[tool.pytest.ini_options]
addopts = "--verbose"
testpaths = ["tests"]
pythonpath = ["src"]
asyncio_default_fixture_loop_scope = "function"
filterwarnings = [
"ignore:'audioop' is deprecated:DeprecationWarning",
]
[tool.setuptools_scm]
local_scheme = "no-local-version"

View File

@@ -8,14 +8,14 @@ import audioop
import numpy as np
import pyloudnorm as pyln
import resampy
import soxr
def resample_audio(audio: bytes, original_rate: int, target_rate: int) -> bytes:
if original_rate == target_rate:
return audio
audio_data = np.frombuffer(audio, dtype=np.int16)
resampled_audio = resampy.resample(audio_data, original_rate, target_rate)
resampled_audio = soxr.resample(audio_data, original_rate, target_rate)
return resampled_audio.astype(np.int16).tobytes()
@@ -80,14 +80,14 @@ def ulaw_to_pcm(ulaw_bytes: bytes, in_sample_rate: int, out_sample_rate: int):
in_pcm_bytes = audioop.ulaw2lin(ulaw_bytes, 2)
# Resample
out_pcm_bytes = audioop.ratecv(in_pcm_bytes, 2, 1, in_sample_rate, out_sample_rate, None)[0]
out_pcm_bytes = resample_audio(in_pcm_bytes, in_sample_rate, out_sample_rate)
return out_pcm_bytes
def pcm_to_ulaw(pcm_bytes: bytes, in_sample_rate: int, out_sample_rate: int):
# Resample
in_pcm_bytes = audioop.ratecv(pcm_bytes, 2, 1, in_sample_rate, out_sample_rate, None)[0]
in_pcm_bytes = resample_audio(pcm_bytes, in_sample_rate, out_sample_rate)
# Convert PCM to μ-law
ulaw_bytes = audioop.lin2ulaw(in_pcm_bytes, 2)

View File

@@ -5,6 +5,7 @@
#
from dataclasses import dataclass, field
from enum import Enum
from typing import TYPE_CHECKING, Any, Awaitable, Callable, List, Literal, Mapping, Optional, Tuple
from pipecat.audio.vad.vad_analyzer import VADParams
@@ -18,6 +19,23 @@ if TYPE_CHECKING:
from pipecat.observers.base_observer import BaseObserver
class KeypadEntry(str, Enum):
"""DTMF entries."""
ONE = "1"
TWO = "2"
THREE = "3"
FOUR = "4"
FIVE = "5"
SIX = "6"
SEVEN = "7"
EIGHT = "8"
NINE = "9"
ZERO = "0"
POUND = "#"
STAR = "*"
def format_pts(pts: int | None):
return nanoseconds_to_str(pts) if pts else None
@@ -375,6 +393,13 @@ class TransportMessageFrame(DataFrame):
return f"{self.name}(message: {self.message})"
@dataclass
class InputDTMFFrame(DataFrame):
"""A DTMF button input"""
button: KeypadEntry
#
# System frames
#
@@ -424,6 +449,16 @@ class FatalErrorFrame(ErrorFrame):
fatal: bool = field(default=True, init=False)
@dataclass
class HeartbeatFrame(SystemFrame):
"""This frame is used by the pipeline task as a mechanism to know if the
pipeline is running properly.
"""
timestamp: int
@dataclass
class EndTaskFrame(SystemFrame):
"""This is used to notify the pipeline task that the pipeline should be

View File

@@ -0,0 +1,56 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from abc import ABC, abstractmethod
from typing import AsyncIterable, Iterable
from pipecat.frames.frames import Frame
class BaseTask(ABC):
@abstractmethod
def has_finished(self) -> bool:
"""Indicates whether the tasks has finished. That is, all processors
have stopped.
"""
pass
@abstractmethod
async def stop_when_done(self):
"""This is a helper function that sends an EndFrame to the pipeline in
order to stop the task after everything in it has been processed.
"""
pass
@abstractmethod
async def cancel(self):
"""
Stops the running pipeline immediately.
"""
pass
@abstractmethod
async def run(self):
"""
Starts running the given pipeline.
"""
pass
@abstractmethod
async def queue_frame(self, frame: Frame):
"""
Queue a frame to be pushed down the pipeline.
"""
pass
@abstractmethod
async def queue_frames(self, frames: Iterable[Frame] | AsyncIterable[Frame]):
"""
Queues multiple frames to be pushed down the pipeline.
"""
pass

View File

@@ -19,6 +19,7 @@ from pipecat.frames.frames import (
EndTaskFrame,
ErrorFrame,
Frame,
HeartbeatFrame,
MetricsFrame,
StartFrame,
StopTaskFrame,
@@ -26,19 +27,26 @@ from pipecat.frames.frames import (
from pipecat.metrics.metrics import ProcessingMetricsData, TTFBMetricsData
from pipecat.observers.base_observer import BaseObserver
from pipecat.pipeline.base_pipeline import BasePipeline
from pipecat.pipeline.base_task import BaseTask
from pipecat.pipeline.task_observer import TaskObserver
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.utils.utils import obj_count, obj_id
HEARTBEAT_SECONDS = 1.0
HEARTBEAT_MONITOR_SECONDS = HEARTBEAT_SECONDS * 5
class PipelineParams(BaseModel):
model_config = ConfigDict(arbitrary_types_allowed=True)
allow_interruptions: bool = False
enable_heartbeats: bool = False
enable_metrics: bool = False
enable_usage_metrics: bool = False
send_initial_empty_metrics: bool = True
report_only_initial_ttfb: bool = False
observers: List[BaseObserver] = []
heartbeats_period_secs: float = HEARTBEAT_SECONDS
class Source(FrameProcessor):
@@ -58,25 +66,10 @@ class Source(FrameProcessor):
match direction:
case FrameDirection.UPSTREAM:
await self._handle_upstream_frame(frame)
await self._up_queue.put(frame)
case FrameDirection.DOWNSTREAM:
await self.push_frame(frame, direction)
async def _handle_upstream_frame(self, frame: Frame):
if isinstance(frame, EndTaskFrame):
# Tell the task we should end nicely.
await self._up_queue.put(EndTaskFrame())
elif isinstance(frame, CancelTaskFrame):
# Tell the task we should end right away.
await self._up_queue.put(CancelTaskFrame())
elif isinstance(frame, ErrorFrame):
logger.error(f"Error running app: {frame}")
if frame.fatal:
# Cancel all tasks downstream.
await self.push_frame(CancelFrame())
# Tell the task we should stop.
await self._up_queue.put(StopTaskFrame())
class Sink(FrameProcessor):
"""This is the sink processor that is linked at the end of the pipeline
@@ -91,36 +84,10 @@ class Sink(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
# We really just want to know when the EndFrame reached the sink.
if isinstance(frame, EndFrame):
await self._down_queue.put(frame)
await self._down_queue.put(frame)
class Observer(BaseObserver):
"""This is a pipeline frame observer that is used as a proxy to the user
provided observers. That is, this is the only observer passed to the frame
processors. Then, every time a frame is pushed this observer will call all
the observers registered to the pipeline task.
"""
def __init__(self, observers: List[BaseObserver] = []):
self._observers = observers
async def on_push_frame(
self,
src: FrameProcessor,
dst: FrameProcessor,
frame: Frame,
direction: FrameDirection,
timestamp: int,
):
for observer in self._observers:
await observer.on_push_frame(src, dst, frame, direction, timestamp)
class PipelineTask:
class PipelineTask(BaseTask):
def __init__(
self,
pipeline: BasePipeline,
@@ -135,9 +102,18 @@ class PipelineTask:
self._params = params
self._finished = False
# This queue receives frames coming from the pipeline upstream.
self._up_queue = asyncio.Queue()
# This queue receives frames coming from the pipeline downstream.
self._down_queue = asyncio.Queue()
# This queue is the queue used to push frames to the pipeline.
self._push_queue = asyncio.Queue()
# This is the heartbeat queue. When a heartbeat frame is received in the
# down queue we add it to the heartbeat queue for processing.
self._heartbeat_queue = asyncio.Queue()
# This event is used to indicate an EndFrame has been received in the
# down queue.
self._endframe_event = asyncio.Event()
self._source = Source(self._up_queue)
self._source.link(pipeline)
@@ -145,36 +121,52 @@ class PipelineTask:
self._sink = Sink(self._down_queue)
pipeline.link(self._sink)
self._observer = Observer(params.observers)
self._observer = TaskObserver(params.observers)
def has_finished(self):
def has_finished(self) -> bool:
"""Indicates whether the tasks has finished. That is, all processors
have stopped.
"""
return self._finished
async def stop_when_done(self):
"""This is a helper function that sends an EndFrame to the pipeline in
order to stop the task after everything in it has been processed.
"""
logger.debug(f"Task {self} scheduled to stop when done")
await self.queue_frame(EndFrame())
async def cancel(self):
"""
Stops the running pipeline immediately.
"""
logger.debug(f"Canceling pipeline task {self}")
# Make sure everything is cleaned up downstream. This is sent
# out-of-band from the main streaming task which is what we want since
# we want to cancel right away.
await self._source.push_frame(CancelFrame())
self._process_push_task.cancel()
self._process_up_task.cancel()
await self._process_push_task
await self._process_up_task
await self._cancel_tasks(True)
async def run(self):
self._process_up_task = asyncio.create_task(self._process_up_queue())
self._process_push_task = asyncio.create_task(self._process_push_queue())
await asyncio.gather(self._process_up_task, self._process_push_task)
"""
Starts running the given pipeline.
"""
tasks = self._create_tasks()
await asyncio.gather(*tasks)
self._finished = True
async def queue_frame(self, frame: Frame):
"""
Queue a frame to be pushed down the pipeline.
"""
await self._push_queue.put(frame)
async def queue_frames(self, frames: Iterable[Frame] | AsyncIterable[Frame]):
"""
Queues multiple frames to be pushed down the pipeline.
"""
if isinstance(frames, AsyncIterable):
async for frame in frames:
await self.queue_frame(frame)
@@ -182,6 +174,43 @@ class PipelineTask:
for frame in frames:
await self.queue_frame(frame)
def _create_tasks(self):
tasks = []
self._process_up_task = asyncio.create_task(self._process_up_queue())
self._process_down_task = asyncio.create_task(self._process_down_queue())
self._process_push_task = asyncio.create_task(self._process_push_queue())
tasks = [self._process_up_task, self._process_down_task, self._process_push_task]
return tasks
def _maybe_start_heartbeat_tasks(self):
if self._params.enable_heartbeats:
self._heartbeat_push_task = asyncio.create_task(self._heartbeat_push_handler())
self._heartbeat_monitor_task = asyncio.create_task(self._heartbeat_monitor_handler())
async def _cancel_tasks(self, cancel_push: bool):
await self._maybe_cancel_heartbeat_tasks()
if cancel_push:
self._process_push_task.cancel()
await self._process_push_task
self._process_up_task.cancel()
await self._process_up_task
self._process_down_task.cancel()
await self._process_down_task
await self._observer.stop()
async def _maybe_cancel_heartbeat_tasks(self):
if self._params.enable_heartbeats:
self._heartbeat_push_task.cancel()
await self._heartbeat_push_task
self._heartbeat_monitor_task.cancel()
await self._heartbeat_monitor_task
def _initial_metrics_frame(self) -> MetricsFrame:
processors = self._pipeline.processors_with_metrics()
data = []
@@ -190,9 +219,20 @@ class PipelineTask:
data.append(ProcessingMetricsData(processor=p.name, value=0.0))
return MetricsFrame(data=data)
async def _wait_for_endframe(self):
await self._endframe_event.wait()
self._endframe_event.clear()
async def _process_push_queue(self):
"""This is the task that runs the pipeline for the first time by sending
a StartFrame and by pushing any other frames queued by the user. It runs
until the tasks is canceled or stopped (e.g. with an EndFrame).
"""
self._clock.start()
self._maybe_start_heartbeat_tasks()
start_frame = StartFrame(
allow_interruptions=self._params.allow_interruptions,
enable_metrics=self._params.enable_metrics,
@@ -224,29 +264,91 @@ class PipelineTask:
await self._source.cleanup()
await self._pipeline.cleanup()
await self._sink.cleanup()
# We just enqueue None to terminate the task gracefully.
self._process_up_task.cancel()
await self._process_up_task
async def _wait_for_endframe(self):
# NOTE(aleix): the Sink element just pushes EndFrames to the down queue,
# so just wait for it. In the future we might do something else here,
# but for now this is fine.
await self._down_queue.get()
# Finally, cancel internal tasks. We don't cancel the push tasks because
# that's us.
await self._cancel_tasks(False)
async def _process_up_queue(self):
"""This is the task that processes frames coming upstream from the
pipeline. These frames might indicate, for example, that we want the
pipeline to be stopped (e.g. EndTaskFrame) in which case we would send
an EndFrame down the pipeline.
"""
while True:
try:
frame = await self._up_queue.get()
if isinstance(frame, EndTaskFrame):
# Tell the task we should end nicely.
await self.queue_frame(EndFrame())
elif isinstance(frame, CancelTaskFrame):
# Tell the task we should end right away.
await self.queue_frame(CancelFrame())
elif isinstance(frame, StopTaskFrame):
await self.queue_frame(StopTaskFrame())
elif isinstance(frame, ErrorFrame):
logger.error(f"Error running app: {frame}")
if frame.fatal:
# Cancel all tasks downstream.
await self.queue_frame(CancelFrame())
# Tell the task we should stop.
await self.queue_frame(StopTaskFrame())
self._up_queue.task_done()
except asyncio.CancelledError:
break
async def _process_down_queue(self):
"""This tasks process frames coming downstream from the pipeline. For
example, heartbeat frames or an EndFrame which would indicate all
processors have handled the EndFrame and therefore we can exit the task
cleanly.
"""
while True:
try:
frame = await self._down_queue.get()
if isinstance(frame, EndFrame):
self._endframe_event.set()
elif isinstance(frame, HeartbeatFrame):
await self._heartbeat_queue.put(frame)
self._down_queue.task_done()
except asyncio.CancelledError:
break
async def _heartbeat_push_handler(self):
"""
This tasks pushes a heartbeat frame every heartbeat period.
"""
while True:
try:
# Don't use `queue_frame()` because if an EndFrame is queued the
# task will just stop waiting for the pipeline to finish not
# allowing more frames to be pushed.
await self._source.queue_frame(HeartbeatFrame(timestamp=self._clock.get_time()))
await asyncio.sleep(self._params.heartbeats_period_secs)
except asyncio.CancelledError:
break
async def _heartbeat_monitor_handler(self):
"""This tasks monitors heartbeat frames. If a heartbeat frame has not
been received for a long period a warning will be logged. It also logs
the time that a heartbeat frame takes to processes, that is how long it
takes for the heartbeat frame to traverse all the pipeline.
"""
wait_time = HEARTBEAT_MONITOR_SECONDS
while True:
try:
frame = await asyncio.wait_for(self._heartbeat_queue.get(), timeout=wait_time)
process_time = (self._clock.get_time() - frame.timestamp) / 1_000_000_000
logger.trace(f"{self}: heartbeat frame processed in {process_time} seconds")
self._heartbeat_queue.task_done()
except asyncio.TimeoutError:
logger.warning(
f"{self}: heartbeat frame not received for more than {wait_time} seconds"
)
except asyncio.CancelledError:
break
def __str__(self):
return self.name

View File

@@ -0,0 +1,97 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
from typing import List
from attr import dataclass
from pipecat.frames.frames import Frame
from pipecat.observers.base_observer import BaseObserver
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
@dataclass
class Proxy:
"""This is the data we receive from the main observer and that we put into
a queue for later processing.
"""
queue: asyncio.Queue
task: asyncio.Task
observer: BaseObserver
@dataclass
class ObserverData:
"""This is the data we receive from the main observer and that we put into a
proxy queue for later processing.
"""
src: FrameProcessor
dst: FrameProcessor
frame: Frame
direction: FrameDirection
timestamp: int
class TaskObserver(BaseObserver):
"""This is a pipeline frame observer that is meant to be used as a proxy to
the user provided observers. That is, this is the observer that should be
passed to the frame processors. Then, every time a frame is pushed this
observer will call all the observers registered to the pipeline task.
This observer makes sure that passing frames to observers doesn't block the
pipeline by creating a queue and a task for each user observer. When a frame
is received, it will be put in a queue for efficiency and later processed by
each task.
"""
def __init__(self, observers: List[BaseObserver] = []):
self._proxies: List[Proxy] = self._create_proxies(observers)
async def stop(self):
"""Stops all proxy observer tasks."""
for proxy in self._proxies:
proxy.task.cancel()
await proxy.task
async def on_push_frame(
self,
src: FrameProcessor,
dst: FrameProcessor,
frame: Frame,
direction: FrameDirection,
timestamp: int,
):
for proxy in self._proxies:
await proxy.queue.put(
ObserverData(
src=src, dst=dst, frame=frame, direction=direction, timestamp=timestamp
)
)
def _create_proxies(self, observers) -> List[Proxy]:
proxies = []
for observer in observers:
queue = asyncio.Queue()
task = asyncio.create_task(self._proxy_task_handler(queue, observer))
proxy = Proxy(queue=queue, task=task, observer=observer)
proxies.append(proxy)
return proxies
async def _proxy_task_handler(self, queue: asyncio.Queue, observer: BaseObserver):
while True:
try:
data = await queue.get()
await observer.on_push_frame(
data.src, data.dst, data.frame, data.direction, data.timestamp
)
except asyncio.CancelledError:
break

View File

@@ -6,6 +6,7 @@
from pipecat.audio.utils import interleave_stereo_audio, mix_audio, resample_audio
from pipecat.frames.frames import (
EndFrame,
Frame,
InputAudioRawFrame,
OutputAudioRawFrame,
@@ -86,6 +87,9 @@ class AudioBufferProcessor(FrameProcessor):
if self._buffer_size > 0 and len(self._user_audio_buffer) > self._buffer_size:
await self._call_on_audio_data_handler()
if isinstance(frame, EndFrame):
await self._call_on_audio_data_handler()
await self.push_frame(frame, direction)
async def _call_on_audio_data_handler(self):

View File

@@ -11,7 +11,7 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
class FrameFilter(FrameProcessor):
def __init__(self, types: Tuple[Type[Frame]]):
def __init__(self, types: Tuple[Type[Frame], ...]):
super().__init__()
self._types = types

View File

@@ -186,7 +186,7 @@ class FrameProcessor:
self.__should_block_frames = True
async def resume_processing_frames(self):
logger.trace("f{self}: resuming frame processing")
logger.trace(f"{self}: resuming frame processing")
self.__input_event.set()
self.__should_block_frames = False
@@ -260,7 +260,7 @@ class FrameProcessor:
async def __internal_push_frame(self, frame: Frame, direction: FrameDirection):
try:
timestamp = self._clock.get_time()
timestamp = self._clock.get_time() if self._clock else 0
if direction == FrameDirection.DOWNSTREAM and self._next:
logger.trace(f"Pushing {frame} from {self} to {self._next}")
if self._observer:
@@ -293,8 +293,7 @@ class FrameProcessor:
await self.__input_frame_task
async def __input_frame_task_handler(self):
running = True
while running:
while True:
try:
if self.__should_block_frames:
logger.trace(f"{self}: frame processing paused")
@@ -311,8 +310,6 @@ class FrameProcessor:
if callback:
await callback(self, frame, direction)
running = not isinstance(frame, EndFrame)
self.__input_queue.task_done()
except asyncio.CancelledError:
logger.trace(f"{self}: cancelled input task")
@@ -330,12 +327,10 @@ class FrameProcessor:
await self.__push_frame_task
async def __push_frame_task_handler(self):
running = True
while running:
while True:
try:
(frame, direction) = await self.__push_queue.get()
await self.__internal_push_frame(frame, direction)
running = not isinstance(frame, EndFrame)
self.__push_queue.task_done()
except asyncio.CancelledError:
logger.trace(f"{self}: cancelled push task")

View File

@@ -62,6 +62,9 @@ from pipecat.utils.string import match_endofsentence
RTVI_PROTOCOL_VERSION = "0.3.0"
RTVI_MESSAGE_LABEL = "rtvi-ai"
RTVIMessageLiteral = Literal["rtvi-ai"]
ActionResult = Union[bool, int, float, str, list, dict]
@@ -154,7 +157,7 @@ class RTVIActionFrame(DataFrame):
class RTVIMessage(BaseModel):
label: Literal["rtvi-ai"] = "rtvi-ai"
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
type: str
id: str
data: Optional[Dict[str, Any]] = None
@@ -170,7 +173,7 @@ class RTVIErrorResponseData(BaseModel):
class RTVIErrorResponse(BaseModel):
label: Literal["rtvi-ai"] = "rtvi-ai"
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
type: Literal["error-response"] = "error-response"
id: str
data: RTVIErrorResponseData
@@ -182,7 +185,7 @@ class RTVIErrorData(BaseModel):
class RTVIError(BaseModel):
label: Literal["rtvi-ai"] = "rtvi-ai"
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
type: Literal["error"] = "error"
data: RTVIErrorData
@@ -192,7 +195,7 @@ class RTVIDescribeConfigData(BaseModel):
class RTVIDescribeConfig(BaseModel):
label: Literal["rtvi-ai"] = "rtvi-ai"
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
type: Literal["config-available"] = "config-available"
id: str
data: RTVIDescribeConfigData
@@ -203,14 +206,14 @@ class RTVIDescribeActionsData(BaseModel):
class RTVIDescribeActions(BaseModel):
label: Literal["rtvi-ai"] = "rtvi-ai"
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
type: Literal["actions-available"] = "actions-available"
id: str
data: RTVIDescribeActionsData
class RTVIConfigResponse(BaseModel):
label: Literal["rtvi-ai"] = "rtvi-ai"
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
type: Literal["config"] = "config"
id: str
data: RTVIConfig
@@ -221,7 +224,7 @@ class RTVIActionResponseData(BaseModel):
class RTVIActionResponse(BaseModel):
label: Literal["rtvi-ai"] = "rtvi-ai"
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
type: Literal["action-response"] = "action-response"
id: str
data: RTVIActionResponseData
@@ -233,7 +236,7 @@ class RTVIBotReadyData(BaseModel):
class RTVIBotReady(BaseModel):
label: Literal["rtvi-ai"] = "rtvi-ai"
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
type: Literal["bot-ready"] = "bot-ready"
id: str
data: RTVIBotReadyData
@@ -246,7 +249,7 @@ class RTVILLMFunctionCallMessageData(BaseModel):
class RTVILLMFunctionCallMessage(BaseModel):
label: Literal["rtvi-ai"] = "rtvi-ai"
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
type: Literal["llm-function-call"] = "llm-function-call"
data: RTVILLMFunctionCallMessageData
@@ -256,7 +259,7 @@ class RTVILLMFunctionCallStartMessageData(BaseModel):
class RTVILLMFunctionCallStartMessage(BaseModel):
label: Literal["rtvi-ai"] = "rtvi-ai"
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
type: Literal["llm-function-call-start"] = "llm-function-call-start"
data: RTVILLMFunctionCallStartMessageData
@@ -269,22 +272,22 @@ class RTVILLMFunctionCallResultData(BaseModel):
class RTVIBotLLMStartedMessage(BaseModel):
label: Literal["rtvi-ai"] = "rtvi-ai"
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
type: Literal["bot-llm-started"] = "bot-llm-started"
class RTVIBotLLMStoppedMessage(BaseModel):
label: Literal["rtvi-ai"] = "rtvi-ai"
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
type: Literal["bot-llm-stopped"] = "bot-llm-stopped"
class RTVIBotTTSStartedMessage(BaseModel):
label: Literal["rtvi-ai"] = "rtvi-ai"
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
type: Literal["bot-tts-started"] = "bot-tts-started"
class RTVIBotTTSStoppedMessage(BaseModel):
label: Literal["rtvi-ai"] = "rtvi-ai"
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
type: Literal["bot-tts-stopped"] = "bot-tts-stopped"
@@ -293,19 +296,19 @@ class RTVITextMessageData(BaseModel):
class RTVIBotTranscriptionMessage(BaseModel):
label: Literal["rtvi-ai"] = "rtvi-ai"
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
type: Literal["bot-transcription"] = "bot-transcription"
data: RTVITextMessageData
class RTVIBotLLMTextMessage(BaseModel):
label: Literal["rtvi-ai"] = "rtvi-ai"
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
type: Literal["bot-llm-text"] = "bot-llm-text"
data: RTVITextMessageData
class RTVIBotTTSTextMessage(BaseModel):
label: Literal["rtvi-ai"] = "rtvi-ai"
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
type: Literal["bot-tts-text"] = "bot-tts-text"
data: RTVITextMessageData
@@ -317,7 +320,7 @@ class RTVIAudioMessageData(BaseModel):
class RTVIBotTTSAudioMessage(BaseModel):
label: Literal["rtvi-ai"] = "rtvi-ai"
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
type: Literal["bot-tts-audio"] = "bot-tts-audio"
data: RTVIAudioMessageData
@@ -330,39 +333,39 @@ class RTVIUserTranscriptionMessageData(BaseModel):
class RTVIUserTranscriptionMessage(BaseModel):
label: Literal["rtvi-ai"] = "rtvi-ai"
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
type: Literal["user-transcription"] = "user-transcription"
data: RTVIUserTranscriptionMessageData
class RTVIUserLLMTextMessage(BaseModel):
label: Literal["rtvi-ai"] = "rtvi-ai"
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
type: Literal["user-llm-text"] = "user-llm-text"
data: RTVITextMessageData
class RTVIUserStartedSpeakingMessage(BaseModel):
label: Literal["rtvi-ai"] = "rtvi-ai"
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
type: Literal["user-started-speaking"] = "user-started-speaking"
class RTVIUserStoppedSpeakingMessage(BaseModel):
label: Literal["rtvi-ai"] = "rtvi-ai"
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
type: Literal["user-stopped-speaking"] = "user-stopped-speaking"
class RTVIBotStartedSpeakingMessage(BaseModel):
label: Literal["rtvi-ai"] = "rtvi-ai"
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
type: Literal["bot-started-speaking"] = "bot-started-speaking"
class RTVIBotStoppedSpeakingMessage(BaseModel):
label: Literal["rtvi-ai"] = "rtvi-ai"
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
type: Literal["bot-stopped-speaking"] = "bot-stopped-speaking"
class RTVIMetricsMessage(BaseModel):
label: Literal["rtvi-ai"] = "rtvi-ai"
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
type: Literal["metrics"] = "metrics"
data: Mapping[str, Any]
@@ -875,7 +878,11 @@ class RTVIProcessor(FrameProcessor):
async def _handle_transport_message(self, frame: TransportMessageUrgentFrame):
try:
message = RTVIMessage.model_validate(frame.message)
transport_message = frame.message
if transport_message.get("label") != RTVI_MESSAGE_LABEL:
logger.warning(f"Ignoring not RTVI message: {transport_message}")
return
message = RTVIMessage.model_validate(transport_message)
await self._message_queue.put(message)
except ValidationError as e:
await self.send_error(f"Invalid RTVI transport message: {e}")

View File

@@ -4,17 +4,23 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
from typing import List
from typing import List, Optional
from loguru import logger
from pipecat.frames.frames import (
BotStoppedSpeakingFrame,
CancelFrame,
EndFrame,
Frame,
OpenAILLMContextAssistantTimestampFrame,
StartInterruptionFrame,
TranscriptionFrame,
TranscriptionMessage,
TranscriptionUpdateFrame,
TTSTextFrame,
)
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.utils.time import time_now_iso8601
class BaseTranscriptProcessor(FrameProcessor):
@@ -64,89 +70,74 @@ class UserTranscriptProcessor(BaseTranscriptProcessor):
class AssistantTranscriptProcessor(BaseTranscriptProcessor):
"""Processes assistant LLM context frames into timestamped conversation messages."""
"""Processes assistant TTS text frames into timestamped conversation messages.
This processor aggregates TTS text frames into complete utterances and emits them as
transcript messages. Utterances are completed when:
- The bot stops speaking (BotStoppedSpeakingFrame)
- The bot is interrupted (StartInterruptionFrame)
- The pipeline ends (EndFrame)
Attributes:
_current_text_parts: List of text fragments being aggregated for current utterance
_aggregation_start_time: Timestamp when the current utterance began
"""
def __init__(self, **kwargs):
"""Initialize processor with empty message stores."""
"""Initialize processor with aggregation state."""
super().__init__(**kwargs)
self._pending_assistant_messages: List[TranscriptionMessage] = []
self._current_text_parts: List[str] = []
self._aggregation_start_time: Optional[str] | None = None
def _extract_messages(self, messages: List[dict]) -> List[TranscriptionMessage]:
"""Extract assistant messages from the OpenAI standard message format.
async def _emit_aggregated_text(self):
"""Emit aggregated text as a transcript message."""
if self._current_text_parts and self._aggregation_start_time:
content = " ".join(self._current_text_parts).strip()
if content:
logger.debug(f"Emitting aggregated assistant message: {content}")
message = TranscriptionMessage(
role="assistant",
content=content,
timestamp=self._aggregation_start_time,
)
await self._emit_update([message])
else:
logger.debug("No content to emit after stripping whitespace")
Args:
messages: List of messages in OpenAI format, which can be either:
- Simple format: {"role": "user", "content": "Hello"}
- Content list: {"role": "user", "content": [{"type": "text", "text": "Hello"}]}
Returns:
List[TranscriptionMessage]: Normalized conversation messages
"""
result = []
for msg in messages:
if msg["role"] != "assistant":
continue
content = msg.get("content")
if isinstance(content, str):
if content:
result.append(TranscriptionMessage(role="assistant", content=content))
elif isinstance(content, list):
text_parts = []
for part in content:
if isinstance(part, dict) and part.get("type") == "text":
text_parts.append(part["text"])
if text_parts:
result.append(
TranscriptionMessage(role="assistant", content=" ".join(text_parts))
)
return result
def _find_new_messages(self, current: List[TranscriptionMessage]) -> List[TranscriptionMessage]:
"""Find unprocessed messages from current list.
Args:
current: List of current messages
Returns:
List[TranscriptionMessage]: New messages not yet processed
"""
if not self._processed_messages:
return current
processed_len = len(self._processed_messages)
if len(current) <= processed_len:
return []
return current[processed_len:]
# Reset aggregation state
self._current_text_parts = []
self._aggregation_start_time = None
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames into assistant conversation messages.
Handles different frame types:
- TTSTextFrame: Aggregates text for current utterance
- BotStoppedSpeakingFrame: Completes current utterance
- StartInterruptionFrame: Completes current utterance due to interruption
- EndFrame: Completes current utterance at pipeline end
- CancelFrame: Completes current utterance due to cancellation
Args:
frame: Input frame to process
direction: Frame processing direction
"""
await super().process_frame(frame, direction)
if isinstance(frame, OpenAILLMContextFrame):
standard_messages = []
for msg in frame.context.messages:
converted = frame.context.to_standard_messages(msg)
standard_messages.extend(converted)
if isinstance(frame, TTSTextFrame):
# Start timestamp on first text part
if not self._aggregation_start_time:
self._aggregation_start_time = time_now_iso8601()
current_messages = self._extract_messages(standard_messages)
new_messages = self._find_new_messages(current_messages)
self._pending_assistant_messages.extend(new_messages)
self._current_text_parts.append(frame.text)
elif isinstance(frame, OpenAILLMContextAssistantTimestampFrame):
if self._pending_assistant_messages:
for msg in self._pending_assistant_messages:
msg.timestamp = frame.timestamp
await self._emit_update(self._pending_assistant_messages)
self._pending_assistant_messages = []
elif isinstance(frame, (BotStoppedSpeakingFrame, StartInterruptionFrame, CancelFrame)):
# Emit accumulated text when bot finishes speaking or is interrupted
await self._emit_aggregated_text()
elif isinstance(frame, EndFrame):
# Emit any remaining text when pipeline ends
await self._emit_aggregated_text()
await self.push_frame(frame, direction)
@@ -170,8 +161,8 @@ class TranscriptProcessor:
llm,
tts,
transport.output(),
transcript.assistant_tts(), # Assistant transcripts
context_aggregator.assistant(),
transcript.assistant(), # Assistant transcripts
]
)

View File

@@ -12,7 +12,6 @@ from pipecat.frames.frames import (
CancelFrame,
EndFrame,
Frame,
StartFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
@@ -20,10 +19,24 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
class UserIdleProcessor(FrameProcessor):
"""This class is useful to check if the user is interacting with the bot
within a given timeout. If the timeout is reached before any interaction
occurred the provided callback will be called.
"""Monitors user inactivity and triggers callbacks after timeout periods.
Starts monitoring only after the first conversation activity (UserStartedSpeaking
or BotSpeaking).
Args:
callback: Function to call when user is idle
timeout: Seconds to wait before considering user idle
**kwargs: Additional arguments passed to FrameProcessor
Example:
async def handle_idle(processor: "UserIdleProcessor") -> None:
await send_reminder("Are you still there?")
processor = UserIdleProcessor(
callback=handle_idle,
timeout=5.0
)
"""
def __init__(
@@ -37,40 +50,72 @@ class UserIdleProcessor(FrameProcessor):
self._callback = callback
self._timeout = timeout
self._interrupted = False
self._conversation_started = False
self._idle_task = None
self._idle_event = asyncio.Event()
def _create_idle_task(self):
"""Create the idle task if it hasn't been created yet."""
if self._idle_task is None:
self._idle_task = self.get_event_loop().create_task(self._idle_task_handler())
async def _stop(self):
self._idle_task.cancel()
await self._idle_task
"""Stops and cleans up the idle monitoring task."""
if self._idle_task is not None:
self._idle_task.cancel()
try:
await self._idle_task
except asyncio.CancelledError:
pass # Expected when task is cancelled
self._idle_task = None
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Processes incoming frames and manages idle monitoring state.
Args:
frame: The frame to process
direction: Direction of the frame flow
"""
await super().process_frame(frame, direction)
# Check for end frames before processing
if isinstance(frame, StartFrame):
self._create_idle_task()
elif isinstance(frame, (EndFrame, CancelFrame)):
await self._stop()
if isinstance(frame, (EndFrame, CancelFrame)):
await self.push_frame(frame, direction) # Push the frame down the pipeline
if self._idle_task:
await self._stop() # Stop the idle task, if it exists
return
await self.push_frame(frame, direction)
# We shouldn't call the idle callback if the user or the bot are speaking
if isinstance(frame, UserStartedSpeakingFrame):
self._interrupted = True
self._idle_event.set()
elif isinstance(frame, UserStoppedSpeakingFrame):
self._interrupted = False
self._idle_event.set()
elif isinstance(frame, BotSpeakingFrame):
self._idle_event.set()
# Start monitoring on first conversation activity
if not self._conversation_started and isinstance(
frame, (UserStartedSpeakingFrame, BotSpeakingFrame)
):
self._conversation_started = True
self._create_idle_task()
# Only process these events if conversation has started
if self._conversation_started:
# We shouldn't call the idle callback if the user or the bot are speaking
if isinstance(frame, UserStartedSpeakingFrame):
self._interrupted = True
self._idle_event.set()
elif isinstance(frame, UserStoppedSpeakingFrame):
self._interrupted = False
self._idle_event.set()
elif isinstance(frame, BotSpeakingFrame):
self._idle_event.set()
async def cleanup(self):
await self._stop()
def _create_idle_task(self):
self._idle_event = asyncio.Event()
self._idle_task = self.get_event_loop().create_task(self._idle_task_handler())
"""Cleans up resources when processor is shutting down."""
if self._idle_task: # Only stop if task exists
await self._stop()
async def _idle_task_handler(self):
"""Monitors for idle timeout and triggers callbacks.
Runs in a loop until cancelled.
"""
while True:
try:
await asyncio.wait_for(self._idle_event.wait(), timeout=self._timeout)

View File

@@ -93,11 +93,11 @@ class ProtobufFrameSerializer(FrameSerializer):
id = getattr(args, "id", None)
name = getattr(args, "name", None)
pts = getattr(args, "pts", None)
if not id and "id" in args_dict:
if "id" in args_dict:
del args_dict["id"]
if not name and "name" in args_dict:
if "name" in args_dict:
del args_dict["name"]
if not pts and "pts" in args_dict:
if "pts" in args_dict:
del args_dict["pts"]
# Create the instance
@@ -105,10 +105,10 @@ class ProtobufFrameSerializer(FrameSerializer):
# Set special fields
if id:
setattr(instance, "id", getattr(args, "id", None))
setattr(instance, "id", id)
if name:
setattr(instance, "name", getattr(args, "name", None))
setattr(instance, "name", name)
if pts:
setattr(instance, "pts", getattr(args, "pts", None))
setattr(instance, "pts", pts)
return instance

View File

@@ -10,7 +10,14 @@ import json
from pydantic import BaseModel
from pipecat.audio.utils import pcm_to_ulaw, ulaw_to_pcm
from pipecat.frames.frames import AudioRawFrame, Frame, InputAudioRawFrame, StartInterruptionFrame
from pipecat.frames.frames import (
AudioRawFrame,
Frame,
InputAudioRawFrame,
InputDTMFFrame,
KeypadEntry,
StartInterruptionFrame,
)
from pipecat.serializers.base_serializer import FrameSerializer, FrameSerializerType
@@ -48,9 +55,7 @@ class TwilioFrameSerializer(FrameSerializer):
def deserialize(self, data: str | bytes) -> Frame | None:
message = json.loads(data)
if message["event"] != "media":
return None
else:
if message["event"] == "media":
payload_base64 = message["media"]["payload"]
payload = base64.b64decode(payload_base64)
@@ -61,3 +66,13 @@ class TwilioFrameSerializer(FrameSerializer):
audio=deserialized_data, num_channels=1, sample_rate=self._params.sample_rate
)
return audio_frame
elif message["event"] == "dtmf":
digit = message.get("dtmf", {}).get("digit")
try:
return InputDTMFFrame(KeypadEntry(digit))
except ValueError as e:
# Handle case where string doesn't match any enum value
return None
else:
return None

View File

@@ -18,6 +18,7 @@ from pipecat.frames.frames import CancelFrame, EndFrame, Frame
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import AIService
try:
import aiofiles

View File

@@ -88,7 +88,7 @@ class CartesiaTTSService(WordTTSService, WebsocketService):
voice_id: str,
cartesia_version: str = "2024-06-10",
url: str = "wss://api.cartesia.ai/tts/websocket",
model: str = "sonic-english",
model: str = "sonic",
sample_rate: int = 24000,
encoding: str = "pcm_s16le",
container: str = "raw",
@@ -329,7 +329,7 @@ class CartesiaHttpTTSService(TTSService):
*,
api_key: str,
voice_id: str,
model: str = "sonic-english",
model: str = "sonic",
base_url: str = "https://api.cartesia.ai",
sample_rate: int = 24000,
encoding: str = "pcm_s16le",

View File

@@ -20,6 +20,7 @@ from pipecat.frames.frames import (
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection
@@ -169,7 +170,7 @@ class DeepgramSTTService(STTService):
return self._settings["vad_events"]
def can_generate_metrics(self) -> bool:
return self.vad_enabled
return True
async def set_model(self, model: str):
await super().set_model(model)
@@ -210,9 +211,12 @@ class DeepgramSTTService(STTService):
logger.debug("Disconnecting from Deepgram")
await self._connection.finish()
async def _on_speech_started(self, *args, **kwargs):
async def start_metrics(self):
await self.start_ttfb_metrics()
await self.start_processing_metrics()
async def _on_speech_started(self, *args, **kwargs):
await self.start_metrics()
await self._call_event_handler("on_speech_started", *args, **kwargs)
async def _on_utterance_end(self, *args, **kwargs):
@@ -243,7 +247,10 @@ class DeepgramSTTService(STTService):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, UserStoppedSpeakingFrame):
if isinstance(frame, UserStartedSpeakingFrame) and not self.vad_enabled:
# Start metrics if Deepgram VAD is disabled & pipeline VAD has detected speech
await self.start_metrics()
elif isinstance(frame, UserStoppedSpeakingFrame):
# https://developers.deepgram.com/docs/finalize
await self._connection.finalize()
logger.debug(f"Triggering finalize event on: {frame.name=}, {direction=}")
logger.trace(f"Triggered finalize event on: {frame.name=}, {direction=}")

View File

@@ -7,8 +7,9 @@
import asyncio
import base64
import json
from typing import Any, AsyncGenerator, Dict, List, Literal, Mapping, Optional, Tuple
from typing import Any, AsyncGenerator, Dict, List, Literal, Mapping, Optional, Tuple, Union
import aiohttp
from loguru import logger
from pydantic import BaseModel, model_validator
@@ -16,6 +17,7 @@ from pipecat.frames.frames import (
BotStoppedSpeakingFrame,
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
LLMFullResponseEndFrame,
StartFrame,
@@ -26,7 +28,7 @@ from pipecat.frames.frames import (
TTSStoppedFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import WordTTSService
from pipecat.services.ai_services import TTSService, WordTTSService
from pipecat.services.websocket_service import WebsocketService
from pipecat.transcriptions.language import Language
@@ -418,3 +420,160 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService):
yield None
except Exception as e:
logger.error(f"{self} exception: {e}")
class ElevenLabsHttpTTSService(TTSService):
"""ElevenLabs Text-to-Speech service using HTTP streaming.
Args:
api_key: ElevenLabs API key
voice_id: ID of the voice to use
aiohttp_session: aiohttp ClientSession
model: Model ID (default: "eleven_flash_v2_5" for low latency)
base_url: API base URL
output_format: Audio output format (PCM)
params: Additional parameters for voice configuration
"""
class InputParams(BaseModel):
language: Optional[Language] = Language.EN
optimize_streaming_latency: Optional[int] = None
stability: Optional[float] = None
similarity_boost: Optional[float] = None
style: Optional[float] = None
use_speaker_boost: Optional[bool] = None
def __init__(
self,
*,
api_key: str,
voice_id: str,
aiohttp_session: aiohttp.ClientSession,
model: str = "eleven_flash_v2_5",
base_url: str = "https://api.elevenlabs.io",
output_format: ElevenLabsOutputFormat = "pcm_24000",
params: InputParams = InputParams(),
**kwargs,
):
super().__init__(sample_rate=sample_rate_from_output_format(output_format), **kwargs)
self._api_key = api_key
self._base_url = base_url
self._output_format = output_format
self._params = params
self._session = aiohttp_session
self._settings = {
"sample_rate": sample_rate_from_output_format(output_format),
"language": self.language_to_service_language(params.language)
if params.language
else "en",
"output_format": output_format,
"optimize_streaming_latency": params.optimize_streaming_latency,
"stability": params.stability,
"similarity_boost": params.similarity_boost,
"style": params.style,
"use_speaker_boost": params.use_speaker_boost,
}
self.set_model_name(model)
self.set_voice(voice_id)
self._voice_settings = self._set_voice_settings()
def can_generate_metrics(self) -> bool:
return True
def _set_voice_settings(self) -> Optional[Dict[str, Union[float, bool]]]:
"""Configure voice settings if stability and similarity_boost are provided.
Returns:
Dictionary of voice settings or None if required parameters are missing.
"""
voice_settings: Dict[str, Union[float, bool]] = {}
if (
self._settings["stability"] is not None
and self._settings["similarity_boost"] is not None
):
voice_settings["stability"] = float(self._settings["stability"])
voice_settings["similarity_boost"] = float(self._settings["similarity_boost"])
if self._settings["style"] is not None:
voice_settings["style"] = float(self._settings["style"])
if self._settings["use_speaker_boost"] is not None:
voice_settings["use_speaker_boost"] = bool(self._settings["use_speaker_boost"])
else:
if self._settings["style"] is not None:
logger.warning(
"'style' is set but will not be applied because 'stability' and 'similarity_boost' are not both set."
)
if self._settings["use_speaker_boost"] is not None:
logger.warning(
"'use_speaker_boost' is set but will not be applied because 'stability' and 'similarity_boost' are not both set."
)
return voice_settings or None
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
"""Generate speech from text using ElevenLabs streaming API.
Args:
text: The text to convert to speech
Yields:
Frames containing audio data and status information
"""
logger.debug(f"Generating TTS: [{text}]")
url = f"{self._base_url}/v1/text-to-speech/{self._voice_id}/stream"
payload: Dict[str, Union[str, Dict[str, Union[float, bool]]]] = {
"text": text,
"model_id": self._model_name,
}
if self._voice_settings:
payload["voice_settings"] = self._voice_settings
if self._settings["language"]:
payload["language_code"] = self._settings["language"]
headers = {
"xi-api-key": self._api_key,
"Content-Type": "application/json",
}
# Build query parameters
params = {
"output_format": self._output_format,
}
if self._settings["optimize_streaming_latency"] is not None:
params["optimize_streaming_latency"] = self._settings["optimize_streaming_latency"]
logger.debug(f"ElevenLabs request - payload: {payload}, params: {params}")
try:
await self.start_ttfb_metrics()
async with self._session.post(
url, json=payload, headers=headers, params=params
) as response:
if response.status != 200:
error_text = await response.text()
logger.error(f"{self} error: {error_text}")
yield ErrorFrame(error=f"ElevenLabs API error: {error_text}")
return
await self.start_tts_usage_metrics(text)
yield TTSStartedFrame()
async for chunk in response.content:
if chunk:
await self.stop_ttfb_metrics()
yield TTSAudioRawFrame(chunk, self._settings["sample_rate"], 1)
yield TTSStoppedFrame()
except Exception as e:
logger.error(f"Error in run_tts: {e}")
yield ErrorFrame(error=str(e))
finally:
yield TTSStoppedFrame()

View File

@@ -4,13 +4,11 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import uuid
from typing import AsyncGenerator, Literal, Optional
from loguru import logger
from pydantic import BaseModel
from tenacity import AsyncRetrying, RetryCallState, stop_after_attempt, wait_exponential
from pipecat.frames.frames import (
BotStoppedSpeakingFrame,
@@ -28,6 +26,7 @@ from pipecat.frames.frames import (
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import TTSService
from pipecat.services.websocket_service import WebsocketService
from pipecat.transcriptions.language import Language
try:
@@ -44,7 +43,7 @@ except ModuleNotFoundError as e:
FishAudioOutputFormat = Literal["opus", "mp3", "pcm", "wav"]
class FishAudioTTSService(TTSService):
class FishAudioTTSService(TTSService, WebsocketService):
class InputParams(BaseModel):
language: Optional[Language] = Language.EN
latency: Optional[str] = "normal" # "normal" or "balanced"
@@ -105,7 +104,9 @@ class FishAudioTTSService(TTSService):
async def _connect(self):
await self._connect_websocket()
self._receive_task = self.get_event_loop().create_task(self._receive_task_handler())
self._receive_task = self.get_event_loop().create_task(
self._receive_task_handler(self.push_error)
)
async def _disconnect(self):
await self._disconnect_websocket()
@@ -169,30 +170,6 @@ class FishAudioTTSService(TTSService):
except Exception as e:
logger.error(f"Error processing message: {e}")
async def _reconnect_websocket(self, retry_state: RetryCallState):
logger.warning(f"Fish Audio reconnecting (attempt: {retry_state.attempt_number})")
await self._disconnect_websocket()
await self._connect_websocket()
async def _receive_task_handler(self):
while True:
try:
async for attempt in AsyncRetrying(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10),
before_sleep=self._reconnect_websocket,
reraise=True,
):
with attempt:
await self._receive_messages()
except asyncio.CancelledError:
break
except Exception as e:
message = f"Fish Audio error receiving messages: {e}"
logger.error(message)
await self.push_error(ErrorFrame(message, fatal=True))
break
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)

View File

@@ -288,6 +288,10 @@ class GeminiMultimodalLiveLLMService(LLMService):
)
async def _handle_transcribe_model_audio(self, audio, context):
# Early return if modalities are not set to audio.
if self._settings["modalities"] != GeminiMultimodalModalities.AUDIO:
return
text = await self._transcribe_audio(audio, context)
logger.debug(f"[Transcription:model] {text}")
# We add user messages directly to the context. We don't do that for assistant messages,

View File

@@ -0,0 +1,2 @@
from .frames import LLMSearchResponseFrame
from .google import *

View File

@@ -0,0 +1,33 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from dataclasses import dataclass, field
from typing import List, Optional
from pipecat.frames.frames import DataFrame
@dataclass
class LLMSearchResult:
text: str
confidence: Optional[float] = None
@dataclass
class LLMSearchOrigin:
site_uri: Optional[str] = None
site_title: Optional[str] = None
results: List[LLMSearchResult] = field(default_factory=list)
@dataclass
class LLMSearchResponseFrame(DataFrame):
search_result: Optional[str] = None
rendered_content: Optional[str] = None
origins: List[LLMSearchOrigin] = field(default_factory=list)
def __str__(self):
return f"LLMSearchResponseFrame(search_result={self.search_result}, origins={self.origins})"

View File

@@ -38,6 +38,7 @@ from pipecat.processors.aggregators.openai_llm_context import (
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import LLMService, TTSService
from pipecat.services.google.frames import LLMSearchResponseFrame
from pipecat.services.openai import (
OpenAIAssistantContextAggregator,
OpenAIUserContextAggregator,
@@ -639,6 +640,9 @@ class GoogleLLMService(LLMService):
completion_tokens = 0
total_tokens = 0
grounding_metadata = None
search_result = ""
try:
logger.debug(
# f"Generating chat: {self._system_instruction} | {context.get_messages_for_logging()}"
@@ -698,6 +702,7 @@ class GoogleLLMService(LLMService):
try:
for c in chunk.parts:
if c.text:
search_result += c.text
await self.push_frame(LLMTextFrame(c.text))
elif c.function_call:
logger.debug(f"!!! Function call: {c.function_call}")
@@ -708,6 +713,63 @@ class GoogleLLMService(LLMService):
function_name=c.function_call.name,
arguments=args,
)
# Handle grounding metadata
# It seems only the last chunk that we receive may contain this information
# If the response doesn't include groundingMetadata, this means the response wasn't grounded.
if chunk.candidates:
for candidate in chunk.candidates:
# logger.debug(f"candidate received: {candidate}")
# Extract grounding metadata
grounding_metadata = (
{
"rendered_content": getattr(
getattr(candidate, "grounding_metadata", None),
"search_entry_point",
None,
).rendered_content
if hasattr(
getattr(candidate, "grounding_metadata", None),
"search_entry_point",
)
else None,
"origins": [
{
"site_uri": getattr(grounding_chunk.web, "uri", None),
"site_title": getattr(
grounding_chunk.web, "title", None
),
"results": [
{
"text": getattr(
grounding_support.segment, "text", ""
),
"confidence": getattr(
grounding_support, "confidence_scores", None
),
}
for grounding_support in getattr(
getattr(candidate, "grounding_metadata", None),
"grounding_supports",
[],
)
if index
in getattr(
grounding_support, "grounding_chunk_indices", []
)
],
}
for index, grounding_chunk in enumerate(
getattr(
getattr(candidate, "grounding_metadata", None),
"grounding_chunks",
[],
)
)
],
}
if getattr(candidate, "grounding_metadata", None)
else None
)
except Exception as e:
# Google LLMs seem to flag safety issues a lot!
if chunk.candidates[0].finish_reason == 3:
@@ -720,6 +782,14 @@ class GoogleLLMService(LLMService):
except Exception as e:
logger.exception(f"{self} exception: {e}")
finally:
if grounding_metadata is not None and isinstance(grounding_metadata, dict):
llm_search_frame = LLMSearchResponseFrame(
search_result=search_result,
origins=grounding_metadata["origins"],
rendered_content=grounding_metadata["rendered_content"],
)
await self.push_frame(llm_search_frame)
await self.start_llm_usage_metrics(
LLMTokenUsage(
prompt_tokens=prompt_tokens,

View File

@@ -221,7 +221,7 @@ class BaseOpenAILLMService(LLMService):
)
await self.start_llm_usage_metrics(tokens)
if len(chunk.choices) == 0:
if chunk.choices is None or len(chunk.choices) == 0:
continue
await self.stop_ttfb_metrics()

View File

@@ -6,10 +6,16 @@
import copy
import json
from typing import Optional
from loguru import logger
from pipecat.frames.frames import Frame, LLMMessagesUpdateFrame, LLMSetToolsFrame
from pipecat.frames.frames import (
Frame,
FunctionCallResultProperties,
LLMMessagesUpdateFrame,
LLMSetToolsFrame,
)
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame,
@@ -174,10 +180,13 @@ class OpenAIRealtimeAssistantContextAggregator(OpenAIAssistantContextAggregator)
if not self._function_call_result:
return
properties: Optional[FunctionCallResultProperties] = None
self._reset()
try:
run_llm = True
frame = self._function_call_result
properties = frame.properties
self._function_call_result = None
if frame.result:
# The "tool_call" message from the LLM that triggered the function call
@@ -211,11 +220,20 @@ class OpenAIRealtimeAssistantContextAggregator(OpenAIAssistantContextAggregator)
await self._user_context_aggregator.push_frame(
RealtimeFunctionCallResultFrame(result_frame=frame)
)
run_llm = frame.run_llm
if properties and properties.run_llm is not None:
# If the tool call result has a run_llm property, use it
run_llm = properties.run_llm
else:
# Default behavior is to run the LLM if there are no function calls in progress
run_llm = not bool(self._function_calls_in_progress)
if run_llm:
await self._user_context_aggregator.push_context_frame()
# Emit the on_context_updated callback once the function call result is added to the context
if properties and properties.on_context_updated is not None:
await properties.on_context_updated()
frame = OpenAILLMContextFrame(self._context)
await self.push_frame(frame)

View File

@@ -27,6 +27,7 @@ from pipecat.audio.vad.vad_analyzer import VADAnalyzer, VADParams
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
InputAudioRawFrame,
InterimTranscriptionFrame,
@@ -921,6 +922,7 @@ class DailyTransport(BaseTransport):
# these handlers.
self._register_event_handler("on_joined")
self._register_event_handler("on_left")
self._register_event_handler("on_error")
self._register_event_handler("on_app_message")
self._register_event_handler("on_call_state_updated")
self._register_event_handler("on_dialin_connected")
@@ -1035,9 +1037,17 @@ class DailyTransport(BaseTransport):
await self._call_event_handler("on_left")
async def _on_error(self, error):
# TODO(aleix): Report error to input/output transports. The one managing
# the client should report the error.
pass
await self._call_event_handler("on_error", error)
# Push error frame to notify the pipeline
error_frame = ErrorFrame(error)
if self._input:
await self._input.push_error(error_frame)
elif self._output:
await self._output.push_error(error_frame)
else:
logger.error("Both input and output are None while trying to push error")
raise RuntimeError("No valid input or output channel to push error")
async def _on_app_message(self, message: Any, sender: str):
if self._input:

View File

@@ -33,6 +33,19 @@ class DailyRoomSipParams(BaseModel):
num_endpoints: int = 1
class RecordingsBucketConfig(BaseModel):
"""Configuration for storing Daily recordings in a custom S3 bucket.
Refer to the Daily API documentation for more information:
https://docs.daily.co/guides/products/live-streaming-recording/storing-recordings-in-a-custom-s3-bucket
"""
bucket_name: str
bucket_region: str
assume_role_arn: str
allow_api_access: bool = False
class DailyRoomProperties(BaseModel, extra="allow"):
"""Properties for configuring a Daily room.
@@ -43,6 +56,8 @@ class DailyRoomProperties(BaseModel, extra="allow"):
enable_emoji_reactions: Whether emoji reactions are enabled
eject_at_room_exp: Whether to remove participants when room expires
enable_dialout: Whether SIP dial-out is enabled
enable_recording: Recording settings ('cloud', 'local', 'raw-tracks')
geo: Geographic region for room
max_participants: Maximum number of participants allowed in the room
sip: SIP configuration parameters
sip_uri: SIP URI information returned by Daily
@@ -57,7 +72,10 @@ class DailyRoomProperties(BaseModel, extra="allow"):
enable_emoji_reactions: bool = False
eject_at_room_exp: bool = True
enable_dialout: Optional[bool] = None
enable_recording: Optional[Literal["cloud", "local", "raw-tracks"]] = None
geo: Optional[str] = None
max_participants: Optional[int] = None
recordings_bucket: Optional[RecordingsBucketConfig] = None
sip: Optional[DailyRoomSipParams] = None
sip_uri: Optional[dict] = None
start_video_off: bool = False
@@ -111,6 +129,84 @@ class DailyRoomObject(BaseModel):
config: DailyRoomProperties
class DailyMeetingTokenProperties(BaseModel):
"""Properties for configuring a Daily meeting token.
Refer to the Daily API documentation for more information:
https://docs.daily.co/reference/rest-api/meeting-tokens/create-meeting-token#properties
"""
room_name: Optional[str] = Field(
default=None,
description="The room for which this token is valid. If not set, the token is valid for all rooms in your domain. You should always set room_name if using this token to control meeting access.",
)
eject_at_token_exp: Optional[bool] = Field(
default=None,
description="If `true`, the user will be ejected from the room when the token expires. Defaults to `false`.",
)
eject_after_elapsed: Optional[int] = Field(
default=None,
description="The number of seconds after which the user will be ejected from the room. If not provided, the user will not be ejected based on elapsed time.",
)
nbf: Optional[int] = Field(
default=None,
description="Not before. This is a unix timestamp (seconds since the epoch.) Users cannot join a meeting in with this token before this time.",
)
exp: Optional[int] = Field(
default=None,
description="Expiration time (unix timestamp in seconds). We strongly recommend setting this value for security. If not set, the token will not expire. Refer docs for more info.",
)
is_owner: Optional[bool] = Field(
default=None,
description="If `true`, the token will grant owner privileges in the room. Defaults to `false`.",
)
user_name: Optional[str] = Field(
default=None,
description="The name of the user. This will be added to the token payload.",
)
user_id: Optional[str] = Field(
default=None,
description="A unique identifier for the user. This will be added to the token payload.",
)
enable_screenshare: Optional[bool] = Field(
default=None,
description="If `true`, the user will be able to share their screen. Defaults to `true`.",
)
start_video_off: Optional[bool] = Field(
default=None,
description="If `true`, the user's video will be turned off when they join the room. Defaults to `false`.",
)
start_audio_off: Optional[bool] = Field(
default=None,
description="If `true`, the user's audio will be turned off when they join the room. Defaults to `false`.",
)
enable_recording: Optional[Literal["cloud", "local", "raw-tracks"]] = Field(
default=None,
description="Recording settings for the token. Must be one of `cloud`, `local` or `raw-tracks`.",
)
enable_prejoin_ui: Optional[bool] = Field(
default=None,
description="If `true`, the user will see the prejoin UI before joining the room.",
)
start_cloud_recording: Optional[bool] = Field(
default=None,
description="Start cloud recording when the user joins the room. This can be used to always record and archive meetings, for example in a customer support context.",
)
class DailyMeetingTokenParams(BaseModel):
"""Parameters for creating a Daily meeting token.
Refer to the Daily API documentation for more information:
https://docs.daily.co/reference/rest-api/meeting-tokens/create-meeting-token#body-params
"""
properties: DailyMeetingTokenProperties = Field(default_factory=DailyMeetingTokenProperties)
class DailyRESTHelper:
"""Helper class for interacting with Daily's REST API.
@@ -129,6 +225,7 @@ class DailyRESTHelper:
daily_api_url: str = "https://api.daily.co/v1",
aiohttp_session: aiohttp.ClientSession,
):
"""Initialize the Daily REST helper."""
self.daily_api_key = daily_api_key
self.daily_api_url = daily_api_url
self.aiohttp_session = aiohttp_session
@@ -169,7 +266,7 @@ class DailyRESTHelper:
Exception: If room creation fails or response is invalid
"""
headers = {"Authorization": f"Bearer {self.daily_api_key}"}
json = {**params.model_dump(exclude_none=True)}
json = params.model_dump(exclude_none=True)
async with self.aiohttp_session.post(
f"{self.daily_api_url}/rooms", headers=headers, json=json
) as r:
@@ -187,7 +284,11 @@ class DailyRESTHelper:
return room
async def get_token(
self, room_url: str, expiry_time: float = 60 * 60, owner: bool = True
self,
room_url: str,
expiry_time: float = 60 * 60,
owner: bool = True,
params: Optional[DailyMeetingTokenParams] = None,
) -> str:
"""Generate a meeting token for user to join a Daily room.
@@ -195,6 +296,7 @@ class DailyRESTHelper:
room_url: Daily room URL
expiry_time: Token validity duration in seconds (default: 1 hour)
owner: Whether token has owner privileges
params: Parameters for creating a Daily meeting token
Returns:
str: Meeting token
@@ -207,12 +309,23 @@ class DailyRESTHelper:
"No Daily room specified. You must specify a Daily room in order a token to be generated."
)
expiration: float = time.time() + expiry_time
expiration: int = int(time.time() + expiry_time)
room_name = self.get_name_from_url(room_url)
headers = {"Authorization": f"Bearer {self.daily_api_key}"}
json = {"properties": {"room_name": room_name, "is_owner": owner, "exp": expiration}}
if params is None:
params = DailyMeetingTokenParams(
**{"properties": {"room_name": room_name, "is_owner": owner, "exp": expiration}}
)
else:
params.properties.room_name = room_name
params.properties.exp = int(expiration)
params.properties.is_owner = owner
json = params.model_dump(exclude_none=True)
async with self.aiohttp_session.post(
f"{self.daily_api_url}/meeting-tokens", headers=headers, json=json
) as r:

View File

@@ -22,8 +22,8 @@ pydantic~=2.8.2
pyloudnorm~=0.1.1
pyht~=0.1.4
python-dotenv~=1.0.1
resampy~=0.4.3
silero-vad~=5.1
soxr~=0.5.0
together~=1.2.7
transformers~=4.44.0
websockets~=13.1

0
tests/__init__.py Normal file
View File

View File

@@ -1,122 +1,70 @@
import asyncio
import doctest
import functools
#
# Copyright (c) 2024-2025 Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import unittest
from pipecat.frames.frames import (
AudioRawFrame,
EndFrame,
Frame,
ImageRawFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
OutputAudioRawFrame,
OutputImageRawFrame,
TextFrame,
)
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.pipeline.pipeline import Pipeline
from pipecat.processors.aggregators.gated import GatedAggregator
from pipecat.processors.aggregators.sentence import SentenceAggregator
from pipecat.processors.text_transformer import StatelessTextTransformer
from tests.utils import run_test
class TestDailyFrameAggregators(unittest.IsolatedAsyncioTestCase):
@unittest.skip("FIXME: This test is failing")
class TestSentenceAggregator(unittest.IsolatedAsyncioTestCase):
async def test_sentence_aggregator(self):
sentence = "Hello, world. How are you? I am fine"
expected_sentences = ["Hello, world.", " How are you?", " I am fine "]
aggregator = SentenceAggregator()
sentence = "Hello, world. How are you? I am fine!"
frames_to_send = []
for word in sentence.split(" "):
async for sentence in aggregator.process_frame(TextFrame(word + " ")):
self.assertIsInstance(sentence, TextFrame)
if isinstance(sentence, TextFrame):
self.assertEqual(sentence.text, expected_sentences.pop(0))
frames_to_send.append(TextFrame(text=word + " "))
async for sentence in aggregator.process_frame(EndFrame()):
if len(expected_sentences):
self.assertIsInstance(sentence, TextFrame)
if isinstance(sentence, TextFrame):
self.assertEqual(sentence.text, expected_sentences.pop(0))
else:
self.assertIsInstance(sentence, EndFrame)
expected_returned_frames = [TextFrame, TextFrame, TextFrame]
self.assertEqual(expected_sentences, [])
(received_down, _) = await run_test(aggregator, frames_to_send, expected_returned_frames)
assert received_down[-3].text == "Hello, world. "
assert received_down[-2].text == "How are you? "
assert received_down[-1].text == "I am fine! "
@unittest.skip("FIXME: This test is failing")
async def test_gated_accumulator(self):
class TestGatedAggregator(unittest.IsolatedAsyncioTestCase):
async def test_gated_aggregator(self):
gated_aggregator = GatedAggregator(
gate_open_fn=lambda frame: isinstance(frame, ImageRawFrame),
gate_close_fn=lambda frame: isinstance(frame, LLMFullResponseStartFrame),
start_open=False,
)
frames = [
frames_to_send = [
LLMFullResponseStartFrame(),
TextFrame("Hello, "),
TextFrame("world."),
AudioRawFrame(b"hello"),
ImageRawFrame(b"image", (0, 0)),
AudioRawFrame(b"world"),
OutputAudioRawFrame(audio=b"hello", sample_rate=16000, num_channels=1),
OutputImageRawFrame(image=b"image", size=(0, 0), format="RGB"),
OutputAudioRawFrame(audio=b"world", sample_rate=16000, num_channels=1),
LLMFullResponseEndFrame(),
]
expected_output_frames = [
ImageRawFrame(b"image", (0, 0)),
LLMFullResponseStartFrame(),
TextFrame("Hello, "),
TextFrame("world."),
AudioRawFrame(b"hello"),
AudioRawFrame(b"world"),
LLMFullResponseEndFrame(),
expected_returned_frames = [
OutputImageRawFrame,
LLMFullResponseStartFrame,
TextFrame,
TextFrame,
OutputAudioRawFrame,
OutputAudioRawFrame,
LLMFullResponseEndFrame,
]
for frame in frames:
async for out_frame in gated_aggregator.process_frame(frame):
self.assertEqual(out_frame, expected_output_frames.pop(0))
self.assertEqual(expected_output_frames, [])
@unittest.skip("FIXME: This test is failing")
async def test_parallel_pipeline(self):
async def slow_add(sleep_time: float, name: str, x: str):
await asyncio.sleep(sleep_time)
return ":".join([x, name])
pipe1_annotation = StatelessTextTransformer(functools.partial(slow_add, 0.1, "pipe1"))
pipe2_annotation = StatelessTextTransformer(functools.partial(slow_add, 0.2, "pipe2"))
sentence_aggregator = SentenceAggregator()
add_dots = StatelessTextTransformer(lambda x: x + ".")
source = asyncio.Queue()
sink = asyncio.Queue()
pipeline = Pipeline(
[
ParallelPipeline([[pipe1_annotation], [sentence_aggregator, pipe2_annotation]]),
add_dots,
],
source,
sink,
(received_down, _) = await run_test(
gated_aggregator, frames_to_send, expected_returned_frames
)
frames = [TextFrame("Hello, "), TextFrame("world."), EndFrame()]
expected_output_frames: list[Frame] = [
TextFrame(text="Hello, :pipe1."),
TextFrame(text="world.:pipe1."),
TextFrame(text="Hello, world.:pipe2."),
EndFrame(),
]
for frame in frames:
await source.put(frame)
await pipeline.run_pipeline()
while not sink.empty():
frame = await sink.get()
self.assertEqual(frame, expected_output_frames.pop(0))
def load_tests(loader, tests, ignore):
"""Run doctests on the aggregators module."""
from pipecat.processors import aggregators
tests.addTests(doctest.DocTestSuite(aggregators))
return tests

View File

@@ -1,3 +1,9 @@
#
# Copyright (c) 2024-2025 Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import unittest

94
tests/test_filters.py Normal file
View File

@@ -0,0 +1,94 @@
#
# Copyright (c) 2024-2025 Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import unittest
from pipecat.frames.frames import (
EndFrame,
Frame,
TextFrame,
TranscriptionFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.processors.filters.frame_filter import FrameFilter
from pipecat.processors.filters.function_filter import FunctionFilter
from pipecat.processors.filters.identity_filter import IdentityFilter
from pipecat.processors.filters.wake_check_filter import WakeCheckFilter
from tests.utils import EndTestFrame, run_test
class TestIdentifyFilter(unittest.IsolatedAsyncioTestCase):
async def test_identity(self):
filter = IdentityFilter()
frames_to_send = [UserStartedSpeakingFrame(), UserStoppedSpeakingFrame()]
expected_returned_frames = [UserStartedSpeakingFrame, UserStoppedSpeakingFrame]
await run_test(filter, frames_to_send, expected_returned_frames)
class TestFrameFilter(unittest.IsolatedAsyncioTestCase):
async def test_text_frame(self):
filter = FrameFilter(types=(TextFrame, EndTestFrame))
frames_to_send = [TextFrame(text="Hello Pipecat!")]
expected_returned_frames = [TextFrame]
await run_test(filter, frames_to_send, expected_returned_frames)
async def test_end_frame(self):
filter = FrameFilter(types=(EndFrame, EndTestFrame))
frames_to_send = [EndFrame()]
expected_returned_frames = [EndFrame]
await run_test(filter, frames_to_send, expected_returned_frames)
async def test_system_frame(self):
filter = FrameFilter(types=(EndTestFrame,))
frames_to_send = [UserStartedSpeakingFrame()]
expected_returned_frames = [UserStartedSpeakingFrame]
await run_test(filter, frames_to_send, expected_returned_frames)
class TestFunctionFilter(unittest.IsolatedAsyncioTestCase):
async def test_passthrough(self):
async def passthrough(frame: Frame):
return True
filter = FunctionFilter(filter=passthrough)
frames_to_send = [TextFrame(text="Hello Pipecat!")]
expected_returned_frames = [TextFrame]
await run_test(filter, frames_to_send, expected_returned_frames)
async def test_no_passthrough(self):
async def no_passthrough(frame: Frame):
return False
filter = FunctionFilter(filter=no_passthrough)
frames_to_send = [TextFrame(text="Hello Pipecat!")]
expected_returned_frames = [TextFrame]
try:
await asyncio.wait_for(
run_test(filter, frames_to_send, expected_returned_frames), timeout=0.5
)
assert False
except asyncio.TimeoutError:
pass
class TestWakeCheckFilter(unittest.IsolatedAsyncioTestCase):
async def test_no_wake_word(self):
filter = WakeCheckFilter(wake_phrases=["Hey, Pipecat"])
frames_to_send = [TranscriptionFrame(user_id="test", text="Phrase 1", timestamp="")]
expected_returned_frames = []
await run_test(filter, frames_to_send, expected_returned_frames)
async def test_wake_word(self):
filter = WakeCheckFilter(wake_phrases=["Hey, Pipecat"])
frames_to_send = [
TranscriptionFrame(user_id="test", text="Hey, Pipecat", timestamp=""),
TranscriptionFrame(user_id="test", text="Phrase 1", timestamp=""),
]
expected_returned_frames = [TranscriptionFrame, TranscriptionFrame]
(received_down, _) = await run_test(filter, frames_to_send, expected_returned_frames)
assert received_down[-1].text == "Phrase 1"

View File

@@ -93,7 +93,3 @@ class TestLangchain(unittest.IsolatedAsyncioTestCase):
# This next one would fail with:
# AssertionError: ' H e l l o d e a r h u m a n' != 'Hello dear human'
# self.assertEqual(tma_out.messages[-1]["content"], self.expected_response)
if __name__ == "__main__":
unittest.main()

View File

@@ -1,37 +0,0 @@
import asyncio
import unittest
import openai
import pyaudio
from dotenv import load_dotenv
from pipecat.frames.frames import AudioRawFrame, ErrorFrame
from pipecat.services.openai import OpenAITTSService
load_dotenv()
class TestWhisperOpenAIService(unittest.IsolatedAsyncioTestCase):
@unittest.skip("FIXME: This test is failing")
async def test_whisper_tts(self):
pa = pyaudio.PyAudio()
stream = pa.open(format=pyaudio.paInt16, channels=1, rate=24_000, output=True)
tts = OpenAITTSService(voice="nova")
async for frame in tts.run_tts("Hello, there. Nice to meet you, seems to work well"):
self.assertIsInstance(frame, AudioRawFrame)
stream.write(frame.audio)
await asyncio.sleep(0.5)
stream.stop_stream()
pa.terminate()
tts = OpenAITTSService(voice="invalid_voice")
with self.assertRaises(openai.BadRequestError):
async for frame in tts.run_tts("wont work"):
self.assertIsInstance(frame, ErrorFrame)
if __name__ == "__main__":
unittest.main()

View File

@@ -1,116 +1,92 @@
#
# Copyright (c) 2024-2025 Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import unittest
from unittest.mock import Mock
from pipecat.frames.frames import EndFrame, TextFrame
from pipecat.frames.frames import EndFrame, HeartbeatFrame, TextFrame
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.pipeline.pipeline import Pipeline
from pipecat.processors.aggregators.sentence import SentenceAggregator
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.filters.identity_filter import IdentityFilter
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.processors.text_transformer import StatelessTextTransformer
from tests.utils import HeartbeatsObserver, run_test
class TestDailyPipeline(unittest.IsolatedAsyncioTestCase):
@unittest.skip("FIXME: This test is failing")
async def test_pipeline_simple(self):
aggregator = SentenceAggregator()
class TestPipeline(unittest.IsolatedAsyncioTestCase):
async def test_pipeline_single(self):
pipeline = Pipeline([IdentityFilter()])
outgoing_queue = asyncio.Queue()
incoming_queue = asyncio.Queue()
pipeline = Pipeline([aggregator], incoming_queue, outgoing_queue)
frames_to_send = [TextFrame(text="Hello from Pipecat!")]
expected_returned_frames = [TextFrame]
await run_test(pipeline, frames_to_send, expected_returned_frames)
await incoming_queue.put(TextFrame("Hello, "))
await incoming_queue.put(TextFrame("world."))
await incoming_queue.put(EndFrame())
async def test_pipeline_multiple(self):
identity1 = IdentityFilter()
identity2 = IdentityFilter()
identity3 = IdentityFilter()
await pipeline.run_pipeline()
pipeline = Pipeline([identity1, identity2, identity3])
self.assertEqual(await outgoing_queue.get(), TextFrame("Hello, world."))
self.assertIsInstance(await outgoing_queue.get(), EndFrame)
frames_to_send = [TextFrame(text="Hello from Pipecat!")]
expected_returned_frames = [TextFrame]
await run_test(pipeline, frames_to_send, expected_returned_frames)
@unittest.skip("FIXME: This test is failing")
async def test_pipeline_multiple_stages(self):
sentence_aggregator = SentenceAggregator()
to_upper = StatelessTextTransformer(lambda x: x.upper())
add_space = StatelessTextTransformer(lambda x: x + " ")
outgoing_queue = asyncio.Queue()
incoming_queue = asyncio.Queue()
pipeline = Pipeline(
[add_space, sentence_aggregator, to_upper], incoming_queue, outgoing_queue
class TestParallelPipeline(unittest.IsolatedAsyncioTestCase):
async def test_parallel_single(self):
pipeline = ParallelPipeline([IdentityFilter()])
frames_to_send = [TextFrame(text="Hello from Pipecat!")]
expected_returned_frames = [TextFrame]
await run_test(pipeline, frames_to_send, expected_returned_frames)
async def test_parallel_multiple(self):
"""Should only passthrough one instance of TextFrame."""
pipeline = ParallelPipeline([IdentityFilter()], [IdentityFilter()])
frames_to_send = [TextFrame(text="Hello from Pipecat!")]
expected_returned_frames = [TextFrame]
await run_test(pipeline, frames_to_send, expected_returned_frames)
class TestPipelineTask(unittest.IsolatedAsyncioTestCase):
async def test_task_single(self):
pipeline = Pipeline([IdentityFilter()])
task = PipelineTask(pipeline)
await task.queue_frame(TextFrame(text="Hello!"))
await task.queue_frames([TextFrame(text="Bye!"), EndFrame()])
await task.run()
assert task.has_finished()
async def test_task_heartbeats(self):
heartbeats_counter = 0
async def heartbeat_received(processor: FrameProcessor, heartbeat: HeartbeatFrame):
nonlocal heartbeats_counter
heartbeats_counter += 1
identity = IdentityFilter()
pipeline = Pipeline([identity])
heartbeats_observer = HeartbeatsObserver(
target=identity, heartbeat_callback=heartbeat_received
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_heartbeats=True, heartbeats_period_secs=0.2, observers=[heartbeats_observer]
),
)
sentence = "Hello, world. It's me, a pipeline."
for c in sentence:
await incoming_queue.put(TextFrame(c))
await incoming_queue.put(EndFrame())
expected_heartbeats = 1.0 / 0.2
await pipeline.run_pipeline()
self.assertEqual(await outgoing_queue.get(), TextFrame("H E L L O , W O R L D ."))
self.assertEqual(
await outgoing_queue.get(),
TextFrame(" I T ' S M E , A P I P E L I N E ."),
)
# leftover little bit because of the spacing
self.assertEqual(
await outgoing_queue.get(),
TextFrame(" "),
)
self.assertIsInstance(await outgoing_queue.get(), EndFrame)
class TestLogFrame(unittest.TestCase):
class MockProcessor(FrameProcessor):
def __init__(self, name):
self.name = name
def __str__(self):
return self.name
def setUp(self):
self.processor1 = self.MockProcessor("processor1")
self.processor2 = self.MockProcessor("processor2")
self.pipeline = Pipeline(processors=[self.processor1, self.processor2])
self.pipeline._name = "MyClass"
self.pipeline._logger = Mock()
@unittest.skip("FIXME: This test is failing")
def test_log_frame_from_source(self):
frame = Mock(__class__=Mock(__name__="MyFrame"))
self.pipeline._log_frame(frame, depth=1)
self.pipeline._logger.debug.assert_called_once_with(
"MyClass source -> MyFrame -> processor1"
)
@unittest.skip("FIXME: This test is failing")
def test_log_frame_to_sink(self):
frame = Mock(__class__=Mock(__name__="MyFrame"))
self.pipeline._log_frame(frame, depth=3)
self.pipeline._logger.debug.assert_called_once_with(
"MyClass processor2 -> MyFrame -> sink"
)
@unittest.skip("FIXME: This test is failing")
def test_log_frame_repeated_log(self):
frame = Mock(__class__=Mock(__name__="MyFrame"))
self.pipeline._log_frame(frame, depth=2)
self.pipeline._logger.debug.assert_called_once_with(
"MyClass processor1 -> MyFrame -> processor2"
)
self.pipeline._log_frame(frame, depth=2)
self.pipeline._logger.debug.assert_called_with("MyClass ... repeated")
@unittest.skip("FIXME: This test is failing")
def test_log_frame_reset_repeated_log(self):
frame1 = Mock(__class__=Mock(__name__="MyFrame1"))
frame2 = Mock(__class__=Mock(__name__="MyFrame2"))
self.pipeline._log_frame(frame1, depth=2)
self.pipeline._logger.debug.assert_called_once_with(
"MyClass processor1 -> MyFrame1 -> processor2"
)
self.pipeline._log_frame(frame1, depth=2)
self.pipeline._logger.debug.assert_called_with("MyClass ... repeated")
self.pipeline._log_frame(frame2, depth=2)
self.pipeline._logger.debug.assert_called_with(
"MyClass processor1 -> MyFrame2 -> processor2"
)
await task.queue_frame(TextFrame(text="Hello!"))
try:
await asyncio.wait_for(task.run(), timeout=1.0)
except asyncio.TimeoutError:
pass
assert heartbeats_counter == expected_heartbeats

View File

@@ -1,6 +1,16 @@
#
# Copyright (c) 2024-2025 Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import unittest
from pipecat.frames.frames import AudioRawFrame, TextFrame, TranscriptionFrame
from pipecat.frames.frames import (
OutputAudioRawFrame,
TextFrame,
TranscriptionFrame,
)
from pipecat.serializers.protobuf import ProtobufFrameSerializer
@@ -8,22 +18,19 @@ class TestProtobufFrameSerializer(unittest.IsolatedAsyncioTestCase):
def setUp(self):
self.serializer = ProtobufFrameSerializer()
@unittest.skip("FIXME: This test is failing")
async def test_roundtrip(self):
text_frame = TextFrame(text="hello world")
frame = self.serializer.deserialize(self.serializer.serialize(text_frame))
self.assertEqual(frame, TextFrame(text="hello world"))
self.assertEqual(text_frame, frame)
transcription_frame = TranscriptionFrame(
text="Hello there!", participantId="123", timestamp="2021-01-01"
text="Hello there!", user_id="123", timestamp="2021-01-01"
)
frame = self.serializer.deserialize(self.serializer.serialize(transcription_frame))
self.assertEqual(frame, transcription_frame)
audio_frame = AudioRawFrame(data=b"1234567890")
audio_frame = OutputAudioRawFrame(audio=b"1234567890", sample_rate=16000, num_channels=1)
frame = self.serializer.deserialize(self.serializer.serialize(audio_frame))
self.assertEqual(frame, audio_frame)
if __name__ == "__main__":
unittest.main()
self.assertEqual(frame.audio, audio_frame.audio)
self.assertEqual(frame.sample_rate, audio_frame.sample_rate)
self.assertEqual(frame.num_channels, audio_frame.num_channels)

View File

@@ -1,28 +1,15 @@
#
# Copyright (c) 2024-2025 Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import unittest
from typing import AsyncGenerator
from pipecat.frames.frames import EndFrame, Frame, TextFrame
from pipecat.services.ai_services import AIService, match_endofsentence
from pipecat.utils.string import match_endofsentence
class SimpleAIService(AIService):
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
yield frame
class TestBaseAIService(unittest.IsolatedAsyncioTestCase):
async def test_simple_processing(self):
service = SimpleAIService()
input_frames = [TextFrame("hello"), EndFrame()]
output_frames = []
for input_frame in input_frames:
async for output_frame in service.process_frame(input_frame):
output_frames.append(output_frame)
self.assertEqual(input_frames, output_frames)
class TestUtilsString(unittest.IsolatedAsyncioTestCase):
async def test_endofsentence(self):
assert match_endofsentence("This is a sentence.")
assert match_endofsentence("This is a sentence! ")
@@ -51,7 +38,3 @@ class TestBaseAIService(unittest.IsolatedAsyncioTestCase):
for i in chinese_sentences:
assert match_endofsentence(i)
assert not match_endofsentence("你好,")
if __name__ == "__main__":
unittest.main()

View File

@@ -1,3 +1,9 @@
#
# Copyright (c) 2024-2025 Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
# import asyncio
# import unittest
# from unittest.mock import AsyncMock, patch, Mock

120
tests/utils.py Normal file
View File

@@ -0,0 +1,120 @@
#
# Copyright (c) 2024-2025 Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Sequence, Tuple
from pipecat.clocks.system_clock import SystemClock
from pipecat.frames.frames import (
ControlFrame,
Frame,
HeartbeatFrame,
StartFrame,
)
from pipecat.observers.base_observer import BaseObserver
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
@dataclass
class EndTestFrame(ControlFrame):
pass
class HeartbeatsObserver(BaseObserver):
def __init__(
self,
*,
target: FrameProcessor,
heartbeat_callback: Callable[[FrameProcessor, HeartbeatFrame], Awaitable[None]],
):
self._target = target
self._callback = heartbeat_callback
async def on_push_frame(
self,
src: FrameProcessor,
dst: FrameProcessor,
frame: Frame,
direction: FrameDirection,
timestamp: int,
):
if src == self._target and isinstance(frame, HeartbeatFrame):
await self._callback(self._target, frame)
class QueuedFrameProcessor(FrameProcessor):
def __init__(self, queue: asyncio.Queue, ignore_start: bool = True):
super().__init__()
self._queue = queue
self._ignore_start = ignore_start
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if self._ignore_start and isinstance(frame, StartFrame):
return
await self._queue.put(frame)
async def run_test(
processor: FrameProcessor,
frames_to_send: Sequence[Frame],
expected_down_frames: Sequence[type],
expected_up_frames: Sequence[type] = [],
) -> Tuple[Sequence[Frame], Sequence[Frame]]:
received_up = asyncio.Queue()
received_down = asyncio.Queue()
up_processor = QueuedFrameProcessor(received_up)
down_processor = QueuedFrameProcessor(received_down)
up_processor.link(processor)
processor.link(down_processor)
await processor.queue_frame(StartFrame(clock=SystemClock()))
for frame in frames_to_send:
await processor.process_frame(frame, FrameDirection.DOWNSTREAM)
await processor.queue_frame(EndTestFrame())
await processor.queue_frame(EndTestFrame(), FrameDirection.UPSTREAM)
#
# Down frames
#
received_down_frames: Sequence[Frame] = []
running = True
while running:
frame = await received_down.get()
running = not isinstance(frame, EndTestFrame)
if running:
received_down_frames.append(frame)
print("received DOWN frames =", received_down_frames)
assert len(received_down_frames) == len(expected_down_frames)
for real, expected in zip(received_down_frames, expected_down_frames):
assert isinstance(real, expected)
#
# Up frames
#
received_up_frames: Sequence[Frame] = []
running = True
while running:
frame = await received_up.get()
running = not isinstance(frame, EndTestFrame)
if running:
received_up_frames.append(frame)
print("received UP frames =", received_up_frames)
assert len(received_up_frames) == len(expected_up_frames)
for real, expected in zip(received_up_frames, expected_up_frames):
assert isinstance(real, expected)
return (received_down_frames, received_up_frames)