Compare commits

..

69 Commits

Author SHA1 Message Date
James Hush
230d92850a example: realtime with transcripts 2025-02-26 16:29:07 +08:00
Aleix Conchillo Flaqué
96c6aeaada Merge pull request #1295 from pipecat-ai/aleix/pipelinetask-keyword-arguments
PipelineTask: force constructor keyword arguments
2025-02-25 19:00:58 -08:00
Aleix Conchillo Flaqué
6722aae598 PipelineTask: force constructor keyword arguments 2025-02-25 18:58:47 -08:00
Aleix Conchillo Flaqué
66564392a6 Merge pull request #1293 from pipecat-ai/aleix/log-pipecat-version
log pipecat version on application startup
2025-02-25 18:57:52 -08:00
Aleix Conchillo Flaqué
f258f5ab66 Merge pull request #1292 from pipecat-ai/aleix/audiocontext-terminate-nicely
AudioContextWordTTSService: wait for all requested audio
2025-02-25 18:56:41 -08:00
Aleix Conchillo Flaqué
f8f0578c3d log pipecat version on application startup 2025-02-25 18:55:45 -08:00
Aleix Conchillo Flaqué
aa60a413f3 Merge pull request #1294 from pipecat-ai/aleix/improve-test-requirements
improve test-requirements.txt
2025-02-25 18:55:18 -08:00
Aleix Conchillo Flaqué
3e66f2378d improve test-requirements.txt 2025-02-25 17:34:33 -08:00
Aleix Conchillo Flaqué
9a50f33e36 AudioContextWordTTSService: wait for all requested audio 2025-02-25 15:35:47 -08:00
Aleix Conchillo Flaqué
4bd5e9c0a7 Merge pull request #1285 from pipecat-ai/aleix/handle-stop-task-gracefully
handle stop task gracefully
2025-02-25 11:25:38 -08:00
Mark Backman
12092c8715 Merge pull request #1288 from pipecat-ai/mb/clean-up-tts-text-input
TTSService: Remove newlines before sending text to TTS service to gen…
2025-02-25 14:00:43 -05:00
Mark Backman
92cc6d39f2 TTSService: Remove newlines before sending text to TTS service to generate 2025-02-25 13:37:25 -05:00
Aleix Conchillo Flaqué
34a50033cb tk: use TkTransportParams in examples 2025-02-25 10:24:24 -08:00
Aleix Conchillo Flaqué
e60b65228b allow multiple StartFrames 2025-02-25 10:24:04 -08:00
Mark Backman
e74864335b Merge pull request #1287 from pipecat-ai/mb/30-observer-pipeline-task
Example 30: Move observers to PipelineTask
2025-02-25 12:11:23 -05:00
Mark Backman
27a088a457 Merge pull request #1286 from pipecat-ai/mb/update-grok-2
Set grok-2 as default model for GrokLLMSService
2025-02-25 12:11:09 -05:00
Mark Backman
cfe72143b8 Example 30: Move observers to PipelineTask 2025-02-25 10:54:25 -05:00
Mark Backman
36a729cbfe Set grok-2 as default model for GrokLLMSService 2025-02-25 10:00:45 -05:00
Aleix Conchillo Flaqué
d2f006682c introduce new BaseTaskManager 2025-02-24 23:38:51 -08:00
Aleix Conchillo Flaqué
fb7fe540f5 tts: don't connect to websocket if already connected 2025-02-24 23:38:51 -08:00
Aleix Conchillo Flaqué
1ec68bd071 make sure we don't create tasks if already created 2025-02-24 23:38:51 -08:00
Aleix Conchillo Flaqué
4536d03e82 FrameProcessor: cancel input/push tasks on CancelFrame 2025-02-24 23:38:51 -08:00
Aleix Conchillo Flaqué
699704732c asyncio: re-raise CancelledError in wait_for_task() 2025-02-24 23:38:51 -08:00
Aleix Conchillo Flaqué
376d969a77 task: handle StopFrame and StopTaskFrame gracefully 2025-02-24 23:38:51 -08:00
Aleix Conchillo Flaqué
68789dfcf0 frames: add new StopFrame 2025-02-24 21:34:23 -08:00
Aleix Conchillo Flaqué
fe9fc61c4e Merge pull request #1282 from pipecat-ai/aleix/pipelinetask-observers-constructor
PipelineTask: pass observers in contructor parameter
2025-02-24 21:29:46 -08:00
Aleix Conchillo Flaqué
6028f0f23a PipelineTask: pass observers in contructor parameter 2025-02-24 21:29:17 -08:00
Aleix Conchillo Flaqué
e9a0959e28 Merge pull request #1283 from pipecat-ai/aleix/check-dangling-tasks
PipelineTask: add check_dangling_tasks parameter
2025-02-24 21:26:32 -08:00
Dominic Stewart
f66be2cfa7 Dom/gemini system prompt switching (#1260)
* Updated example to use Gemini

* Fixed typo

* Based on feedback, made the gemini file something that can be called separately

* Updated the readme

* Updated the readme

* Changed example to use gemini 2.0 flash lite

* This works

* Improvement

* I think this works

* Updated the code to use the correct prompt broken down into smaller pieces

* Added a few more things to detect in the prompt

* Fixed import ordering

* Updated prompt for non gemini bot to look for more voicemail examples, plus added logic to detect if we're doing dialin or not to avoid a non-fatal dialin related error

* moved terminate call to handlers class

* Simplified logic for dialin

* Forgot to use the same logic for the openai bot

* Starting to add logic for native audio input for flash lite

* Fixed logic

* Fixed some code based on suggestions
2025-02-24 22:29:55 -06:00
Aleix Conchillo Flaqué
f818bed58f Merge pull request #1281 from pipecat-ai/aleix/google-context-aggregator-upgrade-context
google: updgrade OpenAILLMContext to GoogleLLMContext
2025-02-24 17:37:26 -08:00
Aleix Conchillo Flaqué
07b9be5308 PipelineTask: add check_dangling_tasks parameter 2025-02-24 17:33:10 -08:00
Aleix Conchillo Flaqué
40c2452d6e google: updgrade OpenAILLMContext to GoogleLLMContext 2025-02-24 15:35:18 -08:00
Aleix Conchillo Flaqué
30cdd1b71a Merge pull request #1280 from pipecat-ai/aleix/add-completion-timeout
services(llm): add on_completion_timeout event
2025-02-24 15:07:20 -08:00
Aleix Conchillo Flaqué
2110b79507 services(llm): add on_completion_timeout event 2025-02-24 14:55:36 -08:00
Aleix Conchillo Flaqué
fc544fa61c Merge pull request #1272 from pipecat-ai/aleix/tts-websocket-interruptions
services: fix some TTS websocket service interruption handling
2025-02-24 14:54:41 -08:00
Mark Backman
976fe95304 Merge pull request #1279 from pipecat-ai/mb/remove-open-optional-dep
Remove `openai` optional dependency from services as it's now required
2025-02-24 17:42:53 -05:00
Aleix Conchillo Flaqué
408270b647 lmnt: don't send "eof" before closing the socket 2025-02-24 14:37:37 -08:00
Mark Backman
1dfb75bc9d Merge pull request #1278 from pipecat-ai/mb/claude-3-7
Update AnthropicLLMService to use claude-3-7-sonnet-20250219 by default
2025-02-24 15:41:28 -05:00
Mark Backman
cefc2a1088 Fix test-requirements.text ordering 2025-02-24 15:06:13 -05:00
Mark Backman
3b9b9200ea Remove openai optional dependency from services as it's now required 2025-02-24 15:05:42 -05:00
Mark Backman
d6f29a0f4b Update AnthropicLLMService to use claude-3-7-sonnet-20250219 by default 2025-02-24 14:32:00 -05:00
Aleix Conchillo Flaqué
5b762d11ef Merge pull request #1228 from CarlKho-Minerva/main
Missing Cartesia~=1.3.1 → `test-requirements`
2025-02-24 08:47:41 -08:00
Aleix Conchillo Flaqué
2f3e2da6b9 Merge pull request #1259 from pipecat-ai/openai-not-optional
Since the `openai` package is used by pretty much everything in pipec…
2025-02-24 08:45:45 -08:00
allenmylath
45058d4a94 Update audio_buffer_processor.py (#1266) 2025-02-24 08:41:19 -08:00
Aleix Conchillo Flaqué
5b637bd826 services: fix some TTS websocket service interruption handling 2025-02-24 08:37:22 -08:00
Mark Backman
2d4fd7e903 Merge pull request #1274 from pipecat-ai/mb/add-ellipsis-test
Add one additional ellipsis test to test_utils_string
2025-02-23 11:26:20 -05:00
Mark Backman
b5662520aa Add one additional ellipsis test to test_utils_string 2025-02-23 11:04:24 -05:00
Aleix Conchillo Flaqué
af45c170b5 Merge pull request #1264 from pipecat-ai/aleix/add-log-observers
add initial log observers
2025-02-21 15:20:45 -08:00
Aleix Conchillo Flaqué
65f548b2ec examples(30-observer): update to use LLMLogObserver 2025-02-21 15:15:16 -08:00
Aleix Conchillo Flaqué
b29ab8c608 observers: add LLMLogObserver and TranscriptionLogObserver 2025-02-21 15:15:16 -08:00
Aleix Conchillo Flaqué
d6dc37f0b6 Merge pull request #1269 from pipecat-ai/aleix/endofsentence-support-ellipses
utils: add support for ellipses in match_endofsentence()
2025-02-21 15:08:22 -08:00
Aleix Conchillo Flaqué
12bce2e8c0 utils: add support for ellipses in match_endofsentence() 2025-02-21 15:05:50 -08:00
Aleix Conchillo Flaqué
4acf7296e0 Merge pull request #1261 from pipecat-ai/aleix/emualted-frames-being-triggered-prematurely
LLMUserContextAggregator: don't reset timer with interim transcription
2025-02-21 10:15:28 -08:00
Aleix Conchillo Flaqué
98706d429c LLMUserContextAggregator: make sure incoming transcription has text 2025-02-21 10:12:54 -08:00
Aleix Conchillo Flaqué
41720b1a13 LLMUserContextAggregator: don't reset timer with interim transcription
It turns out that in some cases we only get interim transcriptions (e.g. someone
is speaking very very softly or someone is talking in the background). In those
cases we don't want to interrupt the bot because there's really nothing to
interrupt the bot for.

We originally thought we should interrupt the bot right at the time we got an
interim frame, but this is causing too many false positives. It's actually
better to simply wait for a real transcription before interrupting (in case VAD
didn't interrupt).
2025-02-21 09:05:56 -08:00
Aleix Conchillo Flaqué
3ef4245166 Merge pull request #1265 from pipecat-ai/aleix/transport-remove-audio-out-is-live 2025-02-21 06:51:09 -08:00
Filipi da Silva Fuchter
3bb0797922 Merge pull request #1257 from pipecat-ai/fastapi_disconnect_issue
Fixed an issue where FastAPI was not triggering on_client_disconnected.
2025-02-21 09:15:15 -03:00
Filipi Fuchter
7c7b4c52af Fixed an issue where EndTaskFrame was not triggering on_client_disconnected or closing the WebSocket in FastAPI. 2025-02-21 09:11:58 -03:00
Aleix Conchillo Flaqué
01f083b7fc transports: remove TransportParams.audio_out_is_live 2025-02-20 23:33:06 -08:00
Aleix Conchillo Flaqué
91fcaebe25 Merge pull request #1263 from Vaibhav159/vl_fix_deepgram_sample_rate_mismatch
fixing deepgram mismatch
2025-02-20 22:39:06 -08:00
Vaibhav159
9c5fe5c85e fixing deepgram mismatch 2025-02-21 09:32:40 +05:30
Aleix Conchillo Flaqué
7e5e167a4b Merge pull request #1250 from pipecat-ai/aleix/context-aggregation-simulatenous-text-tools
AssistantContextAggregator: append aggregation and tools in the same turn
2025-02-20 17:32:57 -08:00
Aleix Conchillo Flaqué
d04c4b36f3 AssistantContextAggregator: append aggregation and tools in the same turn 2025-02-20 17:29:43 -08:00
Aleix Conchillo Flaqué
a811e53626 Merge pull request #1253 from pipecat-ai/aleix/http-tts-services-stopped-frame
HTTP TTS services stopped frame
2025-02-20 17:28:05 -08:00
Paul Kompfner
df57202a05 Since the openai package is used by pretty much everything in pipecat (due to OpenAILLMContext being the standard context representation), let's make it a non-optional dependency.
This change solves an issue faced by users who aren't intending to use OpenAI getting scary error messages saying that they need the `openai` optional dependency "in order to use OpenAI", along with an instruction to set the OPENAI_API_KEY environment variable.

Note that with this change we could theoretically remove from pyproject.toml a number of defined optional dependencies that list only the `openai` package as a dependency (like `deepseek`, for example), but I didn't want to "break the API" in terms of how users install/consume pipecat and its set of built-in services.

Finally, I removed the `python-deepcompare` dependency from the `openai` optional dependency, since it appears to me like it was added by mistake (my guess is it was used for debugging during development and then never removed).
2025-02-20 15:21:35 -05:00
Aleix Conchillo Flaqué
69e6f3fdb7 rime: pass aiohttp session to constructor 2025-02-20 07:36:24 -08:00
Aleix Conchillo Flaqué
6809254963 tts: fix metrics and TTSStoppedFrame frame in HTTP services
Fixes #1247
2025-02-20 07:36:21 -08:00
Carl Kho
a5cdd5f1b8 Add Cartesia API key to dot-env.template 2025-02-14 21:29:37 -08:00
Carl Kho
5f937b8479 Update test requirements to include Cartesia version 1.3.1 2025-02-14 21:14:32 -08:00
161 changed files with 1269 additions and 702 deletions

View File

@@ -5,10 +5,35 @@ All notable changes to **Pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## Unreleased
## [Unreleased]
### Added
- Pipecat version will now be logged on every application startup. This will
help us identify what version we are running in case of any issues.
- Added a new `StopFrame` which can be used to stop a pipeline task while
keeping the frame processors running. The frame processors could then be used
in a different pipeline. The difference between a `StopFrame` and a
`StopTaskFrame` is that, as with `EndFrame` and `EndTaskFrame`, the
`StopFrame` is pushed from the task and the `StopTaskFrame` is pushed upstream
inside the pipeline by any processor.
- Added a new `PipelineTask` parameter `observers` that replaces the previous
`PipelineParams.observers`.
- Added a new `PipelineTask` parameter `check_dangling_tasks` to enable or
disable checking for frame processors' dangling tasks when the Pipeline
finishes running.
- Added new `on_completion_timeout` event for LLM services (all OpenAI-based
services, Anthropic and Google). Note that this event will only get triggered
if LLM timeouts are setup and if the timeout was reached. It can be useful to
retrigger another completion and see if the timeout was just a blip.
- Added new log observers `LLMLogObserver` and `TranscriptionLogObserver` that
can be useful for debugging your pipelines.
- Added `room_url` property to `DailyTransport`.
- Added `addons` argument to `DeepgramSTTService`.
@@ -17,6 +42,23 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
- ⚠️ `PipelineTask` now requires keyword arguments (except for the first one for
the pipeline).
- The base `TTSService` class now strips leading newlines before sending text
to the TTS provider. This change is to solve issues where some TTS providers,
like Azure, would not output text due to newlines.
- `GrokLLMSService` now uses `grok-2` as the default model.
- `AnthropicLLMService` now uses `claude-3-7-sonnet-20250219` as the default
model.
- `RimeHttpTTSService` needs an `aiohttp.ClientSession` to be passed to the
constructor as all the other HTTP-based services.
- `RimeHttpTTSService` doesn't use a default voice anymore.
- `DeepgramSTTService` now uses the new `nova-3` model by default. If you want
to use the previous model you can pass `LiveOptions(model="nova-2-general")`.
(see https://deepgram.com/learn/introducing-nova-3-speech-to-text-api)
@@ -25,8 +67,47 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
stt = DeepgramSTTService(..., live_options=LiveOptions(model="nova-2-general"))
```
### Deprecated
- `PipelineParams.observers` is now deprecated, you the new `PipelineTask`
parameter `observers`.
### Removed
- Remove `TransportParams.audio_out_is_live` since it was not being used at all.
### Fixed
- Fixed an `AudioContextWordTTSService` issue that would cause an `EndFrame` to
disconnect from the TTS service before audio from all the contexts was
received. This affected services like Cartesia and Rime.
- Fixed an issue that was not allowing to pass an `OpenAILLMContext` to create
`GoogleLLMService`'s context aggregators.
- Fixed a `ElevenLabsTTSService`, `FishAudioTTSService`, `LMNTTTSService` and
`PlayHTTTSService` issue that was resulting in audio requested before an
interruption being played after an interruption.
- Fixed `match_endofsentence` support for ellipses.
- Fixed an issue that would cause undesired interruptions via
`EmulateUserStartedSpeakingFrame` when only interim transcriptions (i.e. no
final transcriptions) where received.
- Fixed an issue where `EndTaskFrame` was not triggering
`on_client_disconnected` or closing the WebSocket in FastAPI.
- Fixed an issue in `DeepgramSTTService` where the `sample_rate` passed to the
`LiveOptions` was not being used, causing the service to use the default
sample rate of pipeline.
- Fixed a context aggregator issue that would not append the LLM text response
to the context if a function call happened in the same LLM turn.
- Fixed an issue that was causing HTTP TTS services to push `TTSStoppedFrame`
more than once.
- Fixed a `FishAudioTTSService` issue where `TTSStoppedFrame` was not being
pushed.

View File

@@ -3,10 +3,10 @@ coverage~=7.6.12
grpcio-tools~=1.67.1
pip-tools~=7.4.1
pre-commit~=4.0.1
pyright~=1.1.393
pyright~=1.1.394
pytest~=8.3.4
pytest-asyncio~=0.25.2
ruff~=0.9.5
pytest-asyncio~=0.25.3
ruff~=0.9.7
setuptools~=70.0.0
setuptools_scm~=8.1.0
python-dotenv~=1.0.1

View File

@@ -18,6 +18,9 @@ AZURE_DALLE_API_KEY=...
AZURE_DALLE_ENDPOINT=https://...
AZURE_DALLE_MODEL=...
# Cartesia
CARTESIA_API_KEY=...
# Daily
DAILY_API_KEY=...
DAILY_SAMPLE_ROOM_URL=https://...

View File

@@ -17,7 +17,7 @@ from runner import configure
from pipecat.frames.frames import AudioRawFrame, EndFrame, OutputAudioRawFrame, TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.pipeline.task import PipelineTask
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport

View File

@@ -119,7 +119,7 @@ async def main():
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -124,7 +124,7 @@ async def main():
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
@audiobuffer.event_handler("on_audio_data")
async def on_audio_data(buffer, audio, sample_rate, num_channels):

View File

@@ -70,7 +70,7 @@ async def main(room_url: str, token: str):
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -62,7 +62,7 @@ async def main(room_url: str, token: str):
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -18,8 +18,7 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.services.fal import FalImageGenService
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.local.tk import TkLocalTransport
from pipecat.transports.local.tk import TkLocalTransport, TkTransportParams
load_dotenv(override=True)
@@ -34,7 +33,9 @@ async def main():
transport = TkLocalTransport(
tk_root,
TransportParams(camera_out_enabled=True, camera_out_width=1024, camera_out_height=1024),
TkTransportParams(
camera_out_enabled=True, camera_out_width=1024, camera_out_height=1024
),
)
imagegen = FalImageGenService(

View File

@@ -44,7 +44,8 @@ async def main():
runner = PipelineRunner()
task = PipelineTask(
Pipeline([imagegen, transport.output()]), PipelineParams(enable_metrics=True)
Pipeline([imagegen, transport.output()]),
params=PipelineParams(enable_metrics=True),
)
@transport.event_handler("on_first_participant_joined")

View File

@@ -30,8 +30,7 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.cartesia import CartesiaHttpTTSService
from pipecat.services.fal import FalImageGenService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.local.tk import TkLocalTransport, TkOutputTransport
from pipecat.transports.local.tk import TkLocalTransport, TkTransportParams
load_dotenv(override=True)
@@ -152,7 +151,7 @@ async def main():
transport = TkLocalTransport(
tk_root,
TransportParams(
TkTransportParams(
audio_out_enabled=True,
camera_out_enabled=True,
camera_out_width=1024,

View File

@@ -105,7 +105,10 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(enable_metrics=True, enable_usage_metrics=True),
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
)
@transport.event_handler("on_first_participant_joined")

View File

@@ -127,7 +127,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -76,7 +76,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -74,7 +74,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -79,7 +79,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -103,7 +103,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -81,7 +81,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -74,7 +74,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -74,7 +74,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -75,7 +75,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -77,7 +77,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -83,7 +83,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -81,7 +81,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -81,7 +81,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -75,7 +75,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -80,7 +80,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -71,7 +71,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -88,7 +88,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -81,7 +81,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -79,7 +79,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -80,7 +80,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -76,7 +76,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -74,7 +74,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -74,7 +74,7 @@ async def main():
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -251,7 +251,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -74,7 +74,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -78,7 +78,11 @@ async def main():
runner = PipelineRunner()
task = PipelineTask(
pipeline, PipelineParams(audio_in_sample_rate=24000, audio_out_sample_rate=24000)
pipeline,
params=PipelineParams(
audio_in_sample_rate=24000,
audio_out_sample_rate=24000,
),
)
await runner.run(task)

View File

@@ -24,8 +24,7 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.local.tk import TkLocalTransport
from pipecat.transports.local.tk import TkLocalTransport, TkTransportParams
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
@@ -67,7 +66,7 @@ async def main():
tk_transport = TkLocalTransport(
tk_root,
TransportParams(
TkTransportParams(
audio_out_enabled=True,
camera_out_enabled=True,
camera_out_is_live=True,
@@ -83,7 +82,11 @@ async def main():
pipeline = Pipeline([daily_transport.input(), MirrorProcessor(), tk_transport.output()])
task = PipelineTask(
pipeline, PipelineParams(audio_in_sample_rate=24000, audio_out_sample_rate=24000)
pipeline,
params=PipelineParams(
audio_in_sample_rate=24000,
audio_out_sample_rate=24000,
),
)
async def run_tk():

View File

@@ -76,7 +76,7 @@ async def main():
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -112,7 +112,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -99,7 +99,13 @@ async def main():
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -153,7 +153,13 @@ If you need to use a tool, simply use the tool. Do not tell the user the tool yo
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -152,7 +152,7 @@ indicate you should use the get_image tool are:
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -116,7 +116,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -113,7 +113,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -117,7 +117,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -116,7 +116,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -116,7 +116,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -123,7 +123,7 @@ Start by asking me for my location. Then, use 'get_weather_current' to give me a
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -123,7 +123,7 @@ Start by asking me for my location. Then, use 'get_weather_current' to give me a
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -117,7 +117,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -83,7 +83,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -133,7 +133,7 @@ async def main():
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -126,7 +126,7 @@ async def main():
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -85,7 +85,13 @@ async def main():
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
),
)
# When a participant joins, start transcription for that participant so the
# bot can "hear" and respond to them.

View File

@@ -108,7 +108,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
report_only_initial_ttfb=True,

View File

@@ -38,7 +38,6 @@ async def main():
"GStreamer",
DailyParams(
audio_out_enabled=True,
audio_out_is_live=True,
camera_out_enabled=True,
camera_out_width=1280,
camera_out_height=720,

View File

@@ -16,10 +16,13 @@ from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import TranscriptionMessage, TranscriptionUpdateFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.transcript_processor import TranscriptProcessor
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.openai_realtime_beta import (
InputAudioTranscription,
OpenAIRealtimeBetaLLMService,
@@ -140,21 +143,29 @@ Remember, your responses should be short. Just one or two sentences, usually."""
tools,
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
# Create transcript processor and handler
transcript = TranscriptProcessor()
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
transcript.user(), # User transcripts
context_aggregator.user(),
llm, # LLM
context_aggregator.assistant(),
transcript.assistant(), # Assistant transcripts
transport.output(), # Transport bot output
]
)
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
@@ -162,9 +173,16 @@ Remember, your responses should be short. Just one or two sentences, usually."""
),
)
# Register event handler for transcript updates
@transcript.event_handler("on_transcript_update")
async def on_transcript_update(processor, frame):
logger.debug(f"Received transcript update with {len(frame.messages)} new messages")
for msg in frame.messages:
logger.debug(msg)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
# await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])

View File

@@ -212,7 +212,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -237,7 +237,7 @@ Remember, your responses should be short. Just one or two sentences, usually."""
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -209,7 +209,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -263,7 +263,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -87,7 +87,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
# We just use 16000 because that's what Tavus is expecting and
# we avoid resampling.
audio_in_sample_rate=16000,

View File

@@ -145,7 +145,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -138,6 +138,7 @@ class OutputGate(FrameProcessor):
self._gate_open = start_open
self._frames_buffer = []
self._notifier = notifier
self._gate_task = None
def close_gate(self):
self._gate_open = False
@@ -178,10 +179,13 @@ class OutputGate(FrameProcessor):
async def _start(self):
self._frames_buffer = []
self._gate_task = self.create_task(self._gate_task_handler())
if not self._gate_task:
self._gate_task = self.create_task(self._gate_task_handler())
async def _stop(self):
await self.cancel_task(self._gate_task)
if self._gate_task:
await self.cancel_task(self._gate_task)
self._gate_task = None
async def _gate_task_handler(self):
while True:
@@ -351,7 +355,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -342,6 +342,7 @@ class OutputGate(FrameProcessor):
self._gate_open = start_open
self._frames_buffer = []
self._notifier = notifier
self._gate_task = None
def close_gate(self):
self._gate_open = False
@@ -382,10 +383,13 @@ class OutputGate(FrameProcessor):
async def _start(self):
self._frames_buffer = []
self._gate_task = self.create_task(self._gate_task_handler())
if not self._gate_task:
self._gate_task = self.create_task(self._gate_task_handler())
async def _stop(self):
await self.cancel_task(self._gate_task)
if self._gate_task:
await self.cancel_task(self._gate_task)
self._gate_task = None
async def _gate_task_handler(self):
while True:
@@ -560,7 +564,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -25,10 +25,8 @@ from pipecat.frames.frames import (
InputAudioRawFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMMessagesFrame,
StartFrame,
StartInterruptionFrame,
StopInterruptionFrame,
SystemFrame,
TextFrame,
TranscriptionFrame,
@@ -555,6 +553,7 @@ class OutputGate(FrameProcessor):
self._notifier = notifier
self._context = context
self._transcription_buffer = user_transcription_buffer
self._gate_task = None
def close_gate(self):
self._gate_open = False
@@ -602,10 +601,13 @@ class OutputGate(FrameProcessor):
async def _start(self):
self._frames_buffer = []
self._gate_task = self.create_task(self._gate_task_handler())
if not self._gate_task:
self._gate_task = self.create_task(self._gate_task_handler())
async def _stop(self):
await self.cancel_task(self._gate_task)
if self._gate_task:
await self.cancel_task(self._gate_task)
self._gate_task = None
async def _gate_task_handler(self):
while True:
@@ -740,7 +742,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -87,7 +87,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -122,7 +122,7 @@ async def main():
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -354,7 +354,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -63,7 +63,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -89,7 +89,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -120,7 +120,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -79,7 +79,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -106,7 +106,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -1,5 +1,5 @@
#
# Copyright (c) 2024, Daily
# Copyright (c) 2024-2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
@@ -34,7 +34,7 @@ search_tool = {"google_search": {}}
tools = [search_tool]
system_instruction = """
You are an expert at providing the most recent news from any place. Your responses will be converted to audio, so avoid using special characters or overly complex formatting.
You are an expert at providing the most recent news from any place. Your responses will be converted to audio, so avoid using special characters or overly complex formatting.
Always use the google search API to retrieve the latest news. You must also use it to check which day is today.
@@ -93,7 +93,7 @@ async def main():
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -83,7 +83,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
),

View File

@@ -150,7 +150,7 @@ async def main():
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -150,7 +150,7 @@ async def main():
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -178,7 +178,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -18,12 +18,10 @@ from pipecat.frames.frames import (
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
Frame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMTextFrame,
StartInterruptionFrame,
)
from pipecat.observers.base_observer import BaseObserver
from pipecat.observers.loggers.llm_log_observer import LLMLogObserver
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -73,38 +71,6 @@ class DebugObserver(BaseObserver):
logger.info(f"🤖 BOT STOP SPEAKING: {src} {arrow} {dst} at {time_sec:.2f}s")
class LLMLogObserver(BaseObserver):
"""Observer to log LLM activity to the console.
Logs all frame instances of:
- LLMFullResponseStartFrame (only from LLM service)
- LLMTextFrame
- LLMFullResponseEndFrame (only from LLM service)
This allows you to track when the LLM starts responding, what it generates, and when it finishes.
Log format: [LLM EVENT]: [details] at [timestamp]s
"""
async def on_push_frame(
self,
src: FrameProcessor,
dst: FrameProcessor,
frame: Frame,
direction: FrameDirection,
timestamp: int,
):
time_sec = timestamp / 1_000_000_000
# Only log start/end frames from OpenAILLMService
if isinstance(frame, (LLMFullResponseStartFrame, LLMFullResponseEndFrame)):
if isinstance(src, OpenAILLMService):
event = "START" if isinstance(frame, LLMFullResponseStartFrame) else "END"
logger.info(f"🧠 LLM {event} RESPONSE at {time_sec:.2f}s")
# Log all LLMTextFrames
elif isinstance(frame, LLMTextFrame):
logger.info(f"🧠 LLM GENERATING: {frame.text!r} at {time_sec:.2f}s")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
@@ -151,13 +117,13 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
observers=[DebugObserver(), LLMLogObserver()],
),
observers=[DebugObserver(), LLMLogObserver()],
)
@transport.event_handler("on_first_participant_joined")

View File

@@ -32,7 +32,7 @@ async def main():
pipeline = Pipeline([NullProcessor()])
task = PipelineTask(pipeline, PipelineParams(enable_heartbeats=True))
task = PipelineTask(pipeline, params=PipelineParams(enable_heartbeats=True))
runner = PipelineRunner()

View File

@@ -1,5 +1,5 @@
#
# Copyright (c) 2024, Daily
# Copyright (c) 2024-2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
@@ -38,7 +38,7 @@ search_tool = {"google_search_retrieval": {}}
tools = [search_tool]
system_instruction = """
You are an expert at providing the most recent news from any place. Your responses will be converted to audio, so avoid using special characters or overly complex formatting.
You are an expert at providing the most recent news from any place. Your responses will be converted to audio, so avoid using special characters or overly complex formatting.
Always use the google search API to retrieve the latest news. You must also use it to check which day is today.
@@ -117,7 +117,7 @@ async def main():
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -230,7 +230,7 @@ Your response will be turned into speech so use only simple words and punctuatio
)
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -92,10 +92,8 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
observers=[rtvi.observer()],
),
params=PipelineParams(allow_interruptions=True),
observers=[rtvi.observer()],
)
@rtvi.event_handler("on_client_ready")

View File

@@ -140,10 +140,8 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
observers=[GoogleRTVIObserver(rtvi)],
),
params=PipelineParams(allow_interruptions=True),
observers=[GoogleRTVIObserver(rtvi)],
)
@rtvi.event_handler("on_client_ready")

View File

@@ -346,7 +346,7 @@ async def main():
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=False))
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=False))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -113,7 +113,6 @@ We have introduced support for Google's Gemini 2.0 Flash Lite model in this exam
**Quick Start**
To use the Gemini-based bot instead of OpenAI:
```shell
curl -X POST "http://localhost:7860/daily_gemini_start_bot" \ py pipecat
-H "Content-Type: application/json" \
@@ -122,24 +121,25 @@ curl -X POST "http://localhost:7860/daily_gemini_start_bot" \
All request body parameters supported by /daily_start_bot (such as detectVoicemail, dialoutNumber, etc.) are also compatible with /daily_gemini_start_bot.
This example uses context switching to help steer the bot in the right direction. As Flash Lite is a smaller model, getting it to consistently call functions was difficult for these longer prompts. Breaking the prompt
down into smaller pieces helped improve the accuracy of the bot.
This example uses context switching to help steer the bot in the right direction. As Flash Lite is a smaller model, breaking the prompt down into smaller piece helps to improve the bot's accuracy.
For example, instead of giving one large prompt like:
```python
system_instruction="""You are a chatbot that needs to detect if you're talking to a voicemail system or human, then either leave a message or have a conversation. If it's voicemail, say "Hello, this is a message..." and hang up. If it's a human, introduce yourself and be helpful until they say goodbye."""
```
We break it into stages:
First prompt focuses only on detection: "Determine if this is voicemail or human"
After detection, we switch to a new context: either "Leave this specific voicemail message" or "Have a conversation with the human".
**Implementation Details**
The implementation is available in bot_daily_gemini.py and features:
Staged prompting approach: Breaking down complex tasks into smaller, more focused prompts to improve the lightweight model's performance
Dynamic context switching: The bot can change its behavior in real-time based on what it detects (voicemail vs. human caller)
Function-based architecture: Uses function calling to trigger context switches and call termination
**Optimizations for Lightweight Models**
Working with Gemini 2.0 Flash Lite required some specific optimizations:
Simplified prompts: Each prompt focuses on a single task with clear instructions
Function-driven state changes: The model calls specific functions to switch between different conversation modes
Reduced context requirements: Each stage maintains only the context needed for its specific purpose
This approach significantly improves the consistency of function calling in this lightweight model, which was challenging with longer, more complex prompts.
- Staged prompting approach: Breaking down complex tasks into smaller, more focused prompts to improve the lightweight model's performance
- Dynamic context switching: The bot can change its behavior in real-time based on what it detects (voicemail vs. human caller)
- Function-based architecture: Uses function calling to trigger context switches and call termination
### More information

View File

@@ -49,7 +49,7 @@ async def main(
# If you are handling this via Twilio, Telnyx, set this to None
# and handle call-forwarding when on_dialin_ready fires.
# We don't want to specify dialin settings if we're not dialing in
# We don't want to specify dial-in settings if we're not dialing in
dialin_settings = None
if callId and callDomain:
dialin_settings = DailyDialinSettings(call_id=callId, call_domain=callDomain)
@@ -150,7 +150,7 @@ async def main(
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
if dialout_number:
logger.debug("dialout number detected; doing dialout")

View File

@@ -20,7 +20,6 @@ from pipecat.frames.frames import (
EndTaskFrame,
Frame,
InputAudioRawFrame,
StopTaskFrame,
SystemFrame,
TranscriptionFrame,
UserStartedSpeakingFrame,
@@ -45,8 +44,6 @@ logger.add(sys.stderr, level="DEBUG")
daily_api_key = os.getenv("DAILY_API_KEY", "")
daily_api_url = os.getenv("DAILY_API_URL", "https://api.daily.co/v1")
system_message = None
class UserAudioCollector(FrameProcessor):
"""This FrameProcessor collects audio frames in a buffer, then adds them to the
@@ -123,24 +120,21 @@ class FunctionHandlers:
self, function_name, tool_call_id, args, llm, context, result_callback
):
"""Function the bot can call to leave a voicemail message."""
print(f"!!! Got a voicemail response, llm is: {llm}")
system_message = """You are Chatbot leaving a voicemail message. Say EXACTLY this message and nothing else:
message = """You are Chatbot leaving a voicemail message. Say EXACTLY this message and nothing else:
"Hello, this is a message for Pipecat example user. This is Chatbot. Please call back on 123-456-7891. Thank you."
After saying this message, call the terminate_call function."""
print("!!! about to push stop task frame from voicemail")
await llm.queue_frame(StopTaskFrame(), FrameDirection.UPSTREAM)
print("!!! pushed stop task frame from voicemail")
await result_callback("Goodbye")
await self.context_switcher.switch_context(system_instruction=message)
await result_callback("Leaving a voicemail message")
async def human_conversation(
self, function_name, tool_call_id, args, llm, context, result_callback
):
"""Function the bot can when it detects it's talking to a human."""
print(f"!!! Got a human response, llm is: {llm}")
system_message = """You are Chatbot talking to a human. Be friendly and helpful.
message = """You are Chatbot talking to a human. Be friendly and helpful.
Start with: "Hello! I'm a friendly chatbot. How can I help you today?"
@@ -153,16 +147,17 @@ class FunctionHandlers:
- "Thank you, that's all I needed"
THEN say: "Thank you for chatting. Goodbye!" and call the terminate_call function."""
print("!!! about to push stop task frame from human")
await llm.queue_frame(StopTaskFrame(), FrameDirection.UPSTREAM)
print("!!! pushed stop task frame from human")
await result_callback("Goodbye")
await self.context_switcher.switch_context(system_instruction=message)
await result_callback("Talking to the customer")
async def terminate_call(
function_name, tool_call_id, args, llm: LLMService, context, result_callback
):
"""Function the bot can call to terminate the call upon completion of the call."""
await llm.queue_frame(EndTaskFrame(), FrameDirection.UPSTREAM)
@@ -178,7 +173,7 @@ async def main(
# If you are handling this via Twilio, Telnyx, set this to None
# and handle call-forwarding when on_dialin_ready fires.
# We don't want to specify dialin settings if we're not dialing in
# We don't want to specify dial-in settings if we're not dialing in
dialin_settings = None
if callId and callDomain:
dialin_settings = DailyDialinSettings(call_id=callId, call_domain=callDomain)
@@ -244,88 +239,39 @@ If it sounds like a human (saying hello, asking questions, etc.), call the funct
DO NOT say anything until you've determined if this is a voicemail or human."""
greeting_llm = GoogleLLMService(
llm = GoogleLLMService(
model="models/gemini-2.0-flash-lite-preview-02-05",
api_key=os.getenv("GOOGLE_API_KEY"),
system_instruction=system_instruction,
tools=tools,
)
greeting_context = GoogleLLMContext()
greeting_context_aggregator = greeting_llm.create_context_aggregator(greeting_context)
greeting_audio_collector = UserAudioCollector(
greeting_context, greeting_context_aggregator.user()
)
context = GoogleLLMContext()
context_aggregator = llm.create_context_aggregator(context)
audio_collector = UserAudioCollector(context, context_aggregator.user())
context_switcher = ContextSwitcher(greeting_llm, greeting_context_aggregator.user())
context_switcher = ContextSwitcher(llm, context_aggregator.user())
handlers = FunctionHandlers(context_switcher)
greeting_llm.register_function("switch_to_voicemail_response", handlers.voicemail_response)
greeting_llm.register_function("switch_to_human_conversation", handlers.human_conversation)
greeting_llm.register_function("terminate_call", terminate_call)
llm.register_function("switch_to_voicemail_response", handlers.voicemail_response)
llm.register_function("switch_to_human_conversation", handlers.human_conversation)
llm.register_function("terminate_call", terminate_call)
greeting_pipeline = Pipeline(
pipeline = Pipeline(
[
transport.input(), # Transport user input
greeting_audio_collector, # Collect audio frames
greeting_context_aggregator.user(), # User responses
greeting_llm, # LLM
audio_collector, # Collect audio frames
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
greeting_context_aggregator.assistant(), # Assistant spoken responses
]
)
greeting_pipeline_task = PipelineTask(
greeting_pipeline,
PipelineParams(allow_interruptions=True),
)
runner = PipelineRunner()
print("!!! starting greeting")
await runner.run(greeting_pipeline_task)
print("!!! Done with greeting")
# Create conversation pipeline with new system message
conversation_llm = GoogleLLMService(
model="models/gemini-2.0-flash-lite-preview-02-05",
api_key=os.getenv("GOOGLE_API_KEY"),
system_instruction=system_message if system_message else "You are a helpful chatbot.",
tools=[
{
"function_declarations": [
{
"name": "terminate_call",
"description": "Call this function to terminate the call.",
}
]
}
],
)
conversation_llm.register_function("terminate_call", terminate_call)
conversation_context = GoogleLLMContext()
conversation_context_aggregator = conversation_llm.create_context_aggregator(
conversation_context
)
conversation_audio_collector = UserAudioCollector(
conversation_context, conversation_context_aggregator.user()
)
conversation_pipeline = Pipeline(
[
transport.input(), # Transport user input
conversation_audio_collector, # Collect audio frames
conversation_context_aggregator.user(), # User responses
conversation_llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
conversation_context_aggregator.assistant(), # Assistant spoken responses
context_aggregator.assistant(), # Assistant spoken responses
]
)
conversation_task = PipelineTask(
conversation_pipeline,
PipelineParams(allow_interruptions=True),
task = PipelineTask(
pipeline,
params=PipelineParams(allow_interruptions=True),
)
if dialout_number:
@@ -373,11 +319,11 @@ DO NOT say anything until you've determined if this is a voicemail or human."""
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await conversation_task.cancel()
await task.cancel()
print("!!! Starting conversation")
await runner.run(conversation_task)
print("!!! Done with conversation")
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":

View File

@@ -77,7 +77,7 @@ async def main(room_url: str, token: str, callId: str, sipUri: str):
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -90,7 +90,7 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(allow_interruptions=True, enable_metrics=True),
params=PipelineParams(allow_interruptions=True, enable_metrics=True),
)
@transport.event_handler("on_first_participant_joined")

View File

@@ -172,12 +172,12 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
observers=[RTVIObserver(rtvi)],
),
observers=[RTVIObserver(rtvi)],
)
await task.queue_frame(quiet_frame)

View File

@@ -198,12 +198,12 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
observers=[RTVIObserver(rtvi)],
),
observers=[RTVIObserver(rtvi)],
)
await task.queue_frame(quiet_frame)

View File

@@ -104,7 +104,7 @@ async def main(room_url, token=None):
main_task = PipelineTask(
main_pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -155,8 +155,10 @@ Your task is to help the user understand and learn from this article in 2 senten
task = PipelineTask(
pipeline,
PipelineParams(
audio_out_sample_rate=44100, allow_interruptions=True, enable_metrics=True
params=PipelineParams(
audio_out_sample_rate=44100,
allow_interruptions=True,
enable_metrics=True,
),
)

View File

@@ -183,12 +183,12 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
params=PipelineParams(
allow_interruptions=False, # We don't want to interrupt the translator bot
enable_metrics=True,
enable_usage_metrics=True,
observers=[RTVIObserver(rtvi)],
),
observers=[RTVIObserver(rtvi)],
)
@transport.event_handler("on_first_participant_joined")

View File

@@ -108,7 +108,9 @@ async def run_bot(websocket_client: WebSocket, stream_sid: str, testing: bool):
task = PipelineTask(
pipeline,
params=PipelineParams(
audio_in_sample_rate=8000, audio_out_sample_rate=8000, allow_interruptions=True
audio_in_sample_rate=8000,
audio_out_sample_rate=8000,
allow_interruptions=True,
),
)

View File

@@ -142,7 +142,9 @@ async def run_client(client_name: str, server_url: str, duration_secs: int):
task = PipelineTask(
pipeline,
params=PipelineParams(
audio_in_sample_rate=8000, audio_out_sample_rate=8000, allow_interruptions=True
audio_in_sample_rate=8000,
audio_out_sample_rate=8000,
allow_interruptions=True,
),
)

Some files were not shown because too many files have changed in this diff Show More