Compare commits
2 Commits
v0.0.58
...
hush/firew
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b5934783a7 | ||
|
|
95b28f635a |
99
CHANGELOG.md
99
CHANGELOG.md
@@ -5,38 +5,10 @@ All notable changes to **Pipecat** will be documented in this file.
|
||||
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||
|
||||
## [0.0.58] - 2025-02-26
|
||||
## Unreleased
|
||||
|
||||
### Added
|
||||
|
||||
- Added track-specific audio event `on_track_audio_data` to
|
||||
`AudioBufferProcessor` for accessing separate input and output audio tracks.
|
||||
|
||||
- Pipecat version will now be logged on every application startup. This will
|
||||
help us identify what version we are running in case of any issues.
|
||||
|
||||
- Added a new `StopFrame` which can be used to stop a pipeline task while
|
||||
keeping the frame processors running. The frame processors could then be used
|
||||
in a different pipeline. The difference between a `StopFrame` and a
|
||||
`StopTaskFrame` is that, as with `EndFrame` and `EndTaskFrame`, the
|
||||
`StopFrame` is pushed from the task and the `StopTaskFrame` is pushed upstream
|
||||
inside the pipeline by any processor.
|
||||
|
||||
- Added a new `PipelineTask` parameter `observers` that replaces the previous
|
||||
`PipelineParams.observers`.
|
||||
|
||||
- Added a new `PipelineTask` parameter `check_dangling_tasks` to enable or
|
||||
disable checking for frame processors' dangling tasks when the Pipeline
|
||||
finishes running.
|
||||
|
||||
- Added new `on_completion_timeout` event for LLM services (all OpenAI-based
|
||||
services, Anthropic and Google). Note that this event will only get triggered
|
||||
if LLM timeouts are setup and if the timeout was reached. It can be useful to
|
||||
retrigger another completion and see if the timeout was just a blip.
|
||||
|
||||
- Added new log observers `LLMLogObserver` and `TranscriptionLogObserver` that
|
||||
can be useful for debugging your pipelines.
|
||||
|
||||
- Added `room_url` property to `DailyTransport`.
|
||||
|
||||
- Added `addons` argument to `DeepgramSTTService`.
|
||||
@@ -45,27 +17,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Changed
|
||||
|
||||
- ⚠️ `PipelineTask` now requires keyword arguments (except for the first one for
|
||||
the pipeline).
|
||||
|
||||
- Updated `PlayHTHttpTTSService` to take a `voice_engine` and `protocol` input
|
||||
in the constructor. The previous method of providing a `voice_engine` input
|
||||
that contains the engine and protocol is deprecated by PlayHT.
|
||||
|
||||
- The base `TTSService` class now strips leading newlines before sending text
|
||||
to the TTS provider. This change is to solve issues where some TTS providers,
|
||||
like Azure, would not output text due to newlines.
|
||||
|
||||
- `GrokLLMSService` now uses `grok-2` as the default model.
|
||||
|
||||
- `AnthropicLLMService` now uses `claude-3-7-sonnet-20250219` as the default
|
||||
model.
|
||||
|
||||
- `RimeHttpTTSService` needs an `aiohttp.ClientSession` to be passed to the
|
||||
constructor as all the other HTTP-based services.
|
||||
|
||||
- `RimeHttpTTSService` doesn't use a default voice anymore.
|
||||
|
||||
- `DeepgramSTTService` now uses the new `nova-3` model by default. If you want
|
||||
to use the previous model you can pass `LiveOptions(model="nova-2-general")`.
|
||||
(see https://deepgram.com/learn/introducing-nova-3-speech-to-text-api)
|
||||
@@ -74,53 +25,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
stt = DeepgramSTTService(..., live_options=LiveOptions(model="nova-2-general"))
|
||||
```
|
||||
|
||||
### Deprecated
|
||||
|
||||
- `PipelineParams.observers` is now deprecated, you the new `PipelineTask`
|
||||
parameter `observers`.
|
||||
|
||||
### Removed
|
||||
|
||||
- Remove `TransportParams.audio_out_is_live` since it was not being used at all.
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed a `GoogleLLMService` that was causing an exception when sending inline
|
||||
audio in some cases.
|
||||
|
||||
- Fixed an `AudioContextWordTTSService` issue that would cause an `EndFrame` to
|
||||
disconnect from the TTS service before audio from all the contexts was
|
||||
received. This affected services like Cartesia and Rime.
|
||||
|
||||
- Fixed an issue that was not allowing to pass an `OpenAILLMContext` to create
|
||||
`GoogleLLMService`'s context aggregators.
|
||||
|
||||
- Fixed a `ElevenLabsTTSService`, `FishAudioTTSService`, `LMNTTTSService` and
|
||||
`PlayHTTTSService` issue that was resulting in audio requested before an
|
||||
interruption being played after an interruption.
|
||||
|
||||
- Fixed `match_endofsentence` support for ellipses.
|
||||
|
||||
- Fixed an issue that would cause undesired interruptions via
|
||||
`EmulateUserStartedSpeakingFrame` when only interim transcriptions (i.e. no
|
||||
final transcriptions) where received.
|
||||
|
||||
- Fixed an issue where `EndTaskFrame` was not triggering
|
||||
`on_client_disconnected` or closing the WebSocket in FastAPI.
|
||||
|
||||
- Fixed an issue in `DeepgramSTTService` where the `sample_rate` passed to the
|
||||
`LiveOptions` was not being used, causing the service to use the default
|
||||
sample rate of pipeline.
|
||||
|
||||
- Fixed a context aggregator issue that would not append the LLM text response
|
||||
to the context if a function call happened in the same LLM turn.
|
||||
|
||||
- Fixed an issue that was causing HTTP TTS services to push `TTSStoppedFrame`
|
||||
more than once.
|
||||
|
||||
- Fixed a `FishAudioTTSService` issue where `TTSStoppedFrame` was not being
|
||||
pushed.
|
||||
|
||||
- Fixed an issue that `start_callback` was not invoked for some LLM services.
|
||||
|
||||
- Fixed an issue that would cause `DeepgramSTTService` to stop working after an
|
||||
@@ -134,9 +40,6 @@ stt = DeepgramSTTService(..., live_options=LiveOptions(model="nova-2-general"))
|
||||
|
||||
- Added Gemini support to `examples/phone-chatbot`.
|
||||
|
||||
- Added foundational example `34-audio-recording.py` showing how to use the
|
||||
AudioBufferProcessor callbacks to save merged and track recordings.
|
||||
|
||||
## [0.0.57] - 2025-02-14
|
||||
|
||||
### Added
|
||||
|
||||
@@ -3,10 +3,10 @@ coverage~=7.6.12
|
||||
grpcio-tools~=1.67.1
|
||||
pip-tools~=7.4.1
|
||||
pre-commit~=4.0.1
|
||||
pyright~=1.1.394
|
||||
pyright~=1.1.393
|
||||
pytest~=8.3.4
|
||||
pytest-asyncio~=0.25.3
|
||||
ruff~=0.9.7
|
||||
pytest-asyncio~=0.25.2
|
||||
ruff~=0.9.5
|
||||
setuptools~=70.0.0
|
||||
setuptools_scm~=8.1.0
|
||||
python-dotenv~=1.0.1
|
||||
|
||||
@@ -18,9 +18,6 @@ AZURE_DALLE_API_KEY=...
|
||||
AZURE_DALLE_ENDPOINT=https://...
|
||||
AZURE_DALLE_MODEL=...
|
||||
|
||||
# Cartesia
|
||||
CARTESIA_API_KEY=...
|
||||
|
||||
# Daily
|
||||
DAILY_API_KEY=...
|
||||
DAILY_SAMPLE_ROOM_URL=https://...
|
||||
|
||||
@@ -17,7 +17,7 @@ from runner import configure
|
||||
from pipecat.frames.frames import AudioRawFrame, EndFrame, OutputAudioRawFrame, TTSSpeakFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineTask
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.services.cartesia import CartesiaTTSService
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
|
||||
|
||||
@@ -119,7 +119,7 @@ async def main():
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
|
||||
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
|
||||
@@ -124,7 +124,7 @@ async def main():
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
|
||||
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
|
||||
|
||||
@audiobuffer.event_handler("on_audio_data")
|
||||
async def on_audio_data(buffer, audio, sample_rate, num_channels):
|
||||
|
||||
@@ -70,7 +70,7 @@ async def main(room_url: str, token: str):
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
|
||||
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
|
||||
@@ -62,7 +62,7 @@ async def main(room_url: str, token: str):
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -18,7 +18,8 @@ from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineTask
|
||||
from pipecat.services.fal import FalImageGenService
|
||||
from pipecat.transports.local.tk import TkLocalTransport, TkTransportParams
|
||||
from pipecat.transports.base_transport import TransportParams
|
||||
from pipecat.transports.local.tk import TkLocalTransport
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
@@ -33,9 +34,7 @@ async def main():
|
||||
|
||||
transport = TkLocalTransport(
|
||||
tk_root,
|
||||
TkTransportParams(
|
||||
camera_out_enabled=True, camera_out_width=1024, camera_out_height=1024
|
||||
),
|
||||
TransportParams(camera_out_enabled=True, camera_out_width=1024, camera_out_height=1024),
|
||||
)
|
||||
|
||||
imagegen = FalImageGenService(
|
||||
|
||||
@@ -44,8 +44,7 @@ async def main():
|
||||
runner = PipelineRunner()
|
||||
|
||||
task = PipelineTask(
|
||||
Pipeline([imagegen, transport.output()]),
|
||||
params=PipelineParams(enable_metrics=True),
|
||||
Pipeline([imagegen, transport.output()]), PipelineParams(enable_metrics=True)
|
||||
)
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
|
||||
@@ -30,7 +30,8 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.services.cartesia import CartesiaHttpTTSService
|
||||
from pipecat.services.fal import FalImageGenService
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
from pipecat.transports.local.tk import TkLocalTransport, TkTransportParams
|
||||
from pipecat.transports.base_transport import TransportParams
|
||||
from pipecat.transports.local.tk import TkLocalTransport, TkOutputTransport
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
@@ -151,7 +152,7 @@ async def main():
|
||||
|
||||
transport = TkLocalTransport(
|
||||
tk_root,
|
||||
TkTransportParams(
|
||||
TransportParams(
|
||||
audio_out_enabled=True,
|
||||
camera_out_enabled=True,
|
||||
camera_out_width=1024,
|
||||
|
||||
@@ -105,10 +105,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
),
|
||||
PipelineParams(enable_metrics=True, enable_usage_metrics=True),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
|
||||
@@ -127,7 +127,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -76,7 +76,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -74,7 +74,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -79,7 +79,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -103,7 +103,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -81,7 +81,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -74,7 +74,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -1,103 +0,0 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import sys
|
||||
|
||||
import aiohttp
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
from runner import configure
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.services.elevenlabs import ElevenLabsHttpTTSService
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
async def main():
|
||||
async with aiohttp.ClientSession() as session:
|
||||
(room_url, token) = await configure(session)
|
||||
|
||||
transport = DailyTransport(
|
||||
room_url,
|
||||
token,
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_out_enabled=True,
|
||||
transcription_enabled=True,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
),
|
||||
)
|
||||
|
||||
tts = ElevenLabsHttpTTSService(
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY", ""),
|
||||
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
|
||||
aiohttp_session=session,
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
|
||||
},
|
||||
]
|
||||
|
||||
context = OpenAILLMContext(messages)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
context_aggregator.user(), # User responses
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
transport.output(), # Transport bot output
|
||||
context_aggregator.assistant(), # Assistant spoken responses
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
report_only_initial_ttfb=True,
|
||||
),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
@transport.event_handler("on_participant_left")
|
||||
async def on_participant_left(transport, participant, reason):
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
@@ -74,7 +74,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -75,7 +75,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -77,7 +77,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -83,7 +83,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -81,7 +81,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -81,7 +81,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -75,7 +75,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -80,7 +80,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -71,7 +71,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -88,7 +88,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -81,7 +81,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -79,7 +79,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -80,7 +80,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -76,7 +76,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -1,103 +0,0 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import sys
|
||||
|
||||
import aiohttp
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
from runner import configure
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
from pipecat.services.rime import RimeHttpTTSService
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
async def main():
|
||||
async with aiohttp.ClientSession() as session:
|
||||
(room_url, token) = await configure(session)
|
||||
|
||||
transport = DailyTransport(
|
||||
room_url,
|
||||
token,
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_out_enabled=True,
|
||||
transcription_enabled=True,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
),
|
||||
)
|
||||
|
||||
tts = RimeHttpTTSService(
|
||||
api_key=os.getenv("RIME_API_KEY", ""),
|
||||
voice_id="rex",
|
||||
aiohttp_session=session,
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
|
||||
},
|
||||
]
|
||||
|
||||
context = OpenAILLMContext(messages)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
context_aggregator.user(), # User responses
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
transport.output(), # Transport bot output
|
||||
context_aggregator.assistant(), # Assistant spoken responses
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
report_only_initial_ttfb=True,
|
||||
),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
@transport.event_handler("on_participant_left")
|
||||
async def on_participant_left(transport, participant, reason):
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
@@ -74,7 +74,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -74,7 +74,7 @@ async def main():
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
|
||||
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
|
||||
@@ -251,7 +251,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -74,7 +74,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -78,11 +78,7 @@ async def main():
|
||||
runner = PipelineRunner()
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
audio_in_sample_rate=24000,
|
||||
audio_out_sample_rate=24000,
|
||||
),
|
||||
pipeline, PipelineParams(audio_in_sample_rate=24000, audio_out_sample_rate=24000)
|
||||
)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
@@ -24,7 +24,8 @@ from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.transports.local.tk import TkLocalTransport, TkTransportParams
|
||||
from pipecat.transports.base_transport import TransportParams
|
||||
from pipecat.transports.local.tk import TkLocalTransport
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
|
||||
load_dotenv(override=True)
|
||||
@@ -66,7 +67,7 @@ async def main():
|
||||
|
||||
tk_transport = TkLocalTransport(
|
||||
tk_root,
|
||||
TkTransportParams(
|
||||
TransportParams(
|
||||
audio_out_enabled=True,
|
||||
camera_out_enabled=True,
|
||||
camera_out_is_live=True,
|
||||
@@ -82,11 +83,7 @@ async def main():
|
||||
pipeline = Pipeline([daily_transport.input(), MirrorProcessor(), tk_transport.output()])
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
audio_in_sample_rate=24000,
|
||||
audio_out_sample_rate=24000,
|
||||
),
|
||||
pipeline, PipelineParams(audio_in_sample_rate=24000, audio_out_sample_rate=24000)
|
||||
)
|
||||
|
||||
async def run_tk():
|
||||
|
||||
@@ -76,7 +76,7 @@ async def main():
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
|
||||
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
|
||||
@@ -29,14 +29,22 @@ logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
async def start_fetch_weather(function_name, llm, context):
|
||||
async def start_fetch_products(function_name, llm, context):
|
||||
"""Push a frame to the LLM; this is handy when the LLM response might take a while."""
|
||||
await llm.push_frame(TTSSpeakFrame("Let me check on that."))
|
||||
logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")
|
||||
await llm.push_frame(TTSSpeakFrame("I'll take a look!"))
|
||||
logger.debug(f"Starting fetch_products_from_api with function_name: {function_name}")
|
||||
|
||||
|
||||
async def fetch_weather_from_api(function_name, tool_call_id, args, llm, context, result_callback):
|
||||
await result_callback({"conditions": "nice", "temperature": "75"})
|
||||
async def fetch_products_from_api(function_name, tool_call_id, args, llm, context, result_callback):
|
||||
logger.debug(f"args for fetch_products_from_api: {args}")
|
||||
# In the real world you'd fetch the products from an API. We're hardcoding them here.
|
||||
product = args["product"]
|
||||
if product == "vacuums":
|
||||
await result_callback({"vacuums": ["Dyson V11", "Roomba i7"]})
|
||||
elif product == "tvs":
|
||||
await result_callback({"tvs": ["Samsung 65 inch", "LG 55 inch"]})
|
||||
else:
|
||||
await result_callback({"error": "Unknown product"})
|
||||
|
||||
|
||||
async def main():
|
||||
@@ -63,28 +71,24 @@ async def main():
|
||||
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
|
||||
# Register a function_name of None to get all functions
|
||||
# sent to the same callback with an additional function_name parameter.
|
||||
llm.register_function(None, fetch_weather_from_api, start_callback=start_fetch_weather)
|
||||
llm.register_function(None, fetch_products_from_api, start_callback=start_fetch_products)
|
||||
|
||||
tools = [
|
||||
ChatCompletionToolParam(
|
||||
type="function",
|
||||
function={
|
||||
"name": "get_current_weather",
|
||||
"description": "Get the current weather",
|
||||
"name": "get_products",
|
||||
"description": "Get the list of products available.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"location": {
|
||||
"product": {
|
||||
"type": "string",
|
||||
"description": "The city and state, e.g. San Francisco, CA",
|
||||
},
|
||||
"format": {
|
||||
"type": "string",
|
||||
"enum": ["celsius", "fahrenheit"],
|
||||
"description": "The temperature unit to use. Infer this from the users location.",
|
||||
},
|
||||
"enum": ["vacuums", "tvs"],
|
||||
"description": "The type of product to show.",
|
||||
}
|
||||
},
|
||||
"required": ["location", "format"],
|
||||
"required": ["product"],
|
||||
},
|
||||
},
|
||||
)
|
||||
@@ -92,7 +96,7 @@ async def main():
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
|
||||
"content": "You are a helpful customer service agent named Hailey in a video call. Your goal is to sell vacuums or tvs. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
|
||||
},
|
||||
]
|
||||
|
||||
@@ -112,7 +116,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -99,13 +99,7 @@ async def main():
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
),
|
||||
)
|
||||
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
|
||||
@@ -153,13 +153,7 @@ If you need to use a tool, simply use the tool. Do not tell the user the tool yo
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
),
|
||||
)
|
||||
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
|
||||
@@ -152,7 +152,7 @@ indicate you should use the get_image tool are:
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -116,7 +116,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -113,7 +113,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -117,7 +117,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -116,7 +116,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -116,7 +116,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -123,7 +123,7 @@ Start by asking me for my location. Then, use 'get_weather_current' to give me a
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -123,7 +123,7 @@ Start by asking me for my location. Then, use 'get_weather_current' to give me a
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -117,7 +117,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -83,7 +83,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -133,7 +133,7 @@ async def main():
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
|
||||
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
|
||||
@@ -126,7 +126,7 @@ async def main():
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
|
||||
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
|
||||
@@ -85,13 +85,7 @@ async def main():
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
),
|
||||
)
|
||||
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))
|
||||
|
||||
# When a participant joins, start transcription for that participant so the
|
||||
# bot can "hear" and respond to them.
|
||||
|
||||
@@ -108,7 +108,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
report_only_initial_ttfb=True,
|
||||
|
||||
@@ -38,6 +38,7 @@ async def main():
|
||||
"GStreamer",
|
||||
DailyParams(
|
||||
audio_out_enabled=True,
|
||||
audio_out_is_live=True,
|
||||
camera_out_enabled=True,
|
||||
camera_out_width=1280,
|
||||
camera_out_height=720,
|
||||
|
||||
@@ -154,7 +154,7 @@ Remember, your responses should be short. Just one or two sentences, usually."""
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -212,7 +212,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -237,7 +237,7 @@ Remember, your responses should be short. Just one or two sentences, usually."""
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -209,7 +209,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -263,7 +263,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -87,7 +87,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
# We just use 16000 because that's what Tavus is expecting and
|
||||
# we avoid resampling.
|
||||
audio_in_sample_rate=16000,
|
||||
|
||||
@@ -145,7 +145,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -138,7 +138,6 @@ class OutputGate(FrameProcessor):
|
||||
self._gate_open = start_open
|
||||
self._frames_buffer = []
|
||||
self._notifier = notifier
|
||||
self._gate_task = None
|
||||
|
||||
def close_gate(self):
|
||||
self._gate_open = False
|
||||
@@ -179,13 +178,10 @@ class OutputGate(FrameProcessor):
|
||||
|
||||
async def _start(self):
|
||||
self._frames_buffer = []
|
||||
if not self._gate_task:
|
||||
self._gate_task = self.create_task(self._gate_task_handler())
|
||||
self._gate_task = self.create_task(self._gate_task_handler())
|
||||
|
||||
async def _stop(self):
|
||||
if self._gate_task:
|
||||
await self.cancel_task(self._gate_task)
|
||||
self._gate_task = None
|
||||
await self.cancel_task(self._gate_task)
|
||||
|
||||
async def _gate_task_handler(self):
|
||||
while True:
|
||||
@@ -355,7 +351,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -342,7 +342,6 @@ class OutputGate(FrameProcessor):
|
||||
self._gate_open = start_open
|
||||
self._frames_buffer = []
|
||||
self._notifier = notifier
|
||||
self._gate_task = None
|
||||
|
||||
def close_gate(self):
|
||||
self._gate_open = False
|
||||
@@ -383,13 +382,10 @@ class OutputGate(FrameProcessor):
|
||||
|
||||
async def _start(self):
|
||||
self._frames_buffer = []
|
||||
if not self._gate_task:
|
||||
self._gate_task = self.create_task(self._gate_task_handler())
|
||||
self._gate_task = self.create_task(self._gate_task_handler())
|
||||
|
||||
async def _stop(self):
|
||||
if self._gate_task:
|
||||
await self.cancel_task(self._gate_task)
|
||||
self._gate_task = None
|
||||
await self.cancel_task(self._gate_task)
|
||||
|
||||
async def _gate_task_handler(self):
|
||||
while True:
|
||||
@@ -564,7 +560,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -25,8 +25,10 @@ from pipecat.frames.frames import (
|
||||
InputAudioRawFrame,
|
||||
LLMFullResponseEndFrame,
|
||||
LLMFullResponseStartFrame,
|
||||
LLMMessagesFrame,
|
||||
StartFrame,
|
||||
StartInterruptionFrame,
|
||||
StopInterruptionFrame,
|
||||
SystemFrame,
|
||||
TextFrame,
|
||||
TranscriptionFrame,
|
||||
@@ -389,7 +391,7 @@ class AudioAccumulator(FrameProcessor):
|
||||
)
|
||||
self._user_speaking = False
|
||||
context = GoogleLLMContext()
|
||||
context.add_audio_frames_message(audio_frames=self._audio_frames)
|
||||
context.add_audio_frames_message(text="Audio follows", audio_frames=self._audio_frames)
|
||||
await self.push_frame(OpenAILLMContextFrame(context=context))
|
||||
elif isinstance(frame, InputAudioRawFrame):
|
||||
# Append the audio frame to our buffer. Treat the buffer as a ring buffer, dropping the oldest
|
||||
@@ -553,7 +555,6 @@ class OutputGate(FrameProcessor):
|
||||
self._notifier = notifier
|
||||
self._context = context
|
||||
self._transcription_buffer = user_transcription_buffer
|
||||
self._gate_task = None
|
||||
|
||||
def close_gate(self):
|
||||
self._gate_open = False
|
||||
@@ -601,13 +602,10 @@ class OutputGate(FrameProcessor):
|
||||
|
||||
async def _start(self):
|
||||
self._frames_buffer = []
|
||||
if not self._gate_task:
|
||||
self._gate_task = self.create_task(self._gate_task_handler())
|
||||
self._gate_task = self.create_task(self._gate_task_handler())
|
||||
|
||||
async def _stop(self):
|
||||
if self._gate_task:
|
||||
await self.cancel_task(self._gate_task)
|
||||
self._gate_task = None
|
||||
await self.cancel_task(self._gate_task)
|
||||
|
||||
async def _gate_task_handler(self):
|
||||
while True:
|
||||
@@ -742,7 +740,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -87,7 +87,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -122,7 +122,7 @@ async def main():
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
|
||||
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
|
||||
@@ -354,7 +354,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -63,7 +63,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -89,7 +89,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -120,7 +120,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -79,7 +79,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -106,7 +106,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
#
|
||||
# Copyright (c) 2024-2025, Daily
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
@@ -34,7 +34,7 @@ search_tool = {"google_search": {}}
|
||||
tools = [search_tool]
|
||||
|
||||
system_instruction = """
|
||||
You are an expert at providing the most recent news from any place. Your responses will be converted to audio, so avoid using special characters or overly complex formatting.
|
||||
You are an expert at providing the most recent news from any place. Your responses will be converted to audio, so avoid using special characters or overly complex formatting.
|
||||
|
||||
Always use the google search API to retrieve the latest news. You must also use it to check which day is today.
|
||||
|
||||
@@ -93,7 +93,7 @@ async def main():
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
|
||||
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
|
||||
@@ -83,7 +83,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
),
|
||||
|
||||
@@ -150,7 +150,7 @@ async def main():
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
|
||||
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
|
||||
@@ -150,7 +150,7 @@ async def main():
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
|
||||
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
|
||||
@@ -178,7 +178,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -18,10 +18,12 @@ from pipecat.frames.frames import (
|
||||
BotStartedSpeakingFrame,
|
||||
BotStoppedSpeakingFrame,
|
||||
Frame,
|
||||
LLMFullResponseEndFrame,
|
||||
LLMFullResponseStartFrame,
|
||||
LLMTextFrame,
|
||||
StartInterruptionFrame,
|
||||
)
|
||||
from pipecat.observers.base_observer import BaseObserver
|
||||
from pipecat.observers.loggers.llm_log_observer import LLMLogObserver
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
@@ -71,6 +73,38 @@ class DebugObserver(BaseObserver):
|
||||
logger.info(f"🤖 BOT STOP SPEAKING: {src} {arrow} {dst} at {time_sec:.2f}s")
|
||||
|
||||
|
||||
class LLMLogObserver(BaseObserver):
|
||||
"""Observer to log LLM activity to the console.
|
||||
|
||||
Logs all frame instances of:
|
||||
- LLMFullResponseStartFrame (only from LLM service)
|
||||
- LLMTextFrame
|
||||
- LLMFullResponseEndFrame (only from LLM service)
|
||||
|
||||
This allows you to track when the LLM starts responding, what it generates, and when it finishes.
|
||||
Log format: [LLM EVENT]: [details] at [timestamp]s
|
||||
"""
|
||||
|
||||
async def on_push_frame(
|
||||
self,
|
||||
src: FrameProcessor,
|
||||
dst: FrameProcessor,
|
||||
frame: Frame,
|
||||
direction: FrameDirection,
|
||||
timestamp: int,
|
||||
):
|
||||
time_sec = timestamp / 1_000_000_000
|
||||
|
||||
# Only log start/end frames from OpenAILLMService
|
||||
if isinstance(frame, (LLMFullResponseStartFrame, LLMFullResponseEndFrame)):
|
||||
if isinstance(src, OpenAILLMService):
|
||||
event = "START" if isinstance(frame, LLMFullResponseStartFrame) else "END"
|
||||
logger.info(f"🧠 LLM {event} RESPONSE at {time_sec:.2f}s")
|
||||
# Log all LLMTextFrames
|
||||
elif isinstance(frame, LLMTextFrame):
|
||||
logger.info(f"🧠 LLM GENERATING: {frame.text!r} at {time_sec:.2f}s")
|
||||
|
||||
|
||||
async def main():
|
||||
async with aiohttp.ClientSession() as session:
|
||||
(room_url, token) = await configure(session)
|
||||
@@ -117,13 +151,13 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
report_only_initial_ttfb=True,
|
||||
observers=[DebugObserver(), LLMLogObserver()],
|
||||
),
|
||||
observers=[DebugObserver(), LLMLogObserver()],
|
||||
)
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
|
||||
@@ -32,7 +32,7 @@ async def main():
|
||||
|
||||
pipeline = Pipeline([NullProcessor()])
|
||||
|
||||
task = PipelineTask(pipeline, params=PipelineParams(enable_heartbeats=True))
|
||||
task = PipelineTask(pipeline, PipelineParams(enable_heartbeats=True))
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
#
|
||||
# Copyright (c) 2024-2025, Daily
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
@@ -38,7 +38,7 @@ search_tool = {"google_search_retrieval": {}}
|
||||
tools = [search_tool]
|
||||
|
||||
system_instruction = """
|
||||
You are an expert at providing the most recent news from any place. Your responses will be converted to audio, so avoid using special characters or overly complex formatting.
|
||||
You are an expert at providing the most recent news from any place. Your responses will be converted to audio, so avoid using special characters or overly complex formatting.
|
||||
|
||||
Always use the google search API to retrieve the latest news. You must also use it to check which day is today.
|
||||
|
||||
@@ -117,7 +117,7 @@ async def main():
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
|
||||
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
|
||||
@@ -230,7 +230,7 @@ Your response will be turned into speech so use only simple words and punctuatio
|
||||
)
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -1,186 +0,0 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""Audio Recording Example with Pipecat.
|
||||
|
||||
This example demonstrates how to record audio from a conversation between a user and an AI assistant,
|
||||
saving both merged and individual audio tracks. It showcases the AudioBufferProcessor's capabilities
|
||||
to handle both combined and separate audio streams.
|
||||
|
||||
The example:
|
||||
1. Sets up a basic conversation with an AI assistant
|
||||
2. Records the entire conversation
|
||||
3. Saves three separate WAV files:
|
||||
- A merged recording of both participants
|
||||
- Individual recording of user audio
|
||||
- Individual recording of assistant audio
|
||||
|
||||
Example usage (run from pipecat root directory):
|
||||
$ pip install "pipecat-ai[daily,openai,cartesia,silero]"
|
||||
$ pip install -r dev-requirements.txt
|
||||
$ python examples/foundational/34-audio-recording.py
|
||||
|
||||
Requirements:
|
||||
- OpenAI API key (for GPT-4)
|
||||
- Cartesia API key (for text-to-speech)
|
||||
- Daily API key (for video/audio transport)
|
||||
|
||||
Environment variables (.env file):
|
||||
OPENAI_API_KEY=your_openai_key
|
||||
CARTESIA_API_KEY=your_cartesia_key
|
||||
DAILY_API_KEY=your_daily_key
|
||||
|
||||
The recordings will be saved in a 'recordings' directory with timestamps:
|
||||
recordings/
|
||||
merged_20240315_123456.wav (Combined audio)
|
||||
user_20240315_123456.wav (User audio only)
|
||||
bot_20240315_123456.wav (Bot audio only)
|
||||
|
||||
Note:
|
||||
This example requires the AudioBufferProcessor with track-specific audio support,
|
||||
which provides both 'on_audio_data' and 'on_track_audio_data' events for
|
||||
handling merged and separate audio tracks respectively.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import datetime
|
||||
import io
|
||||
import os
|
||||
import sys
|
||||
import wave
|
||||
|
||||
import aiofiles
|
||||
import aiohttp
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
from runner import configure
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
|
||||
from pipecat.services.cartesia import CartesiaTTSService
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
|
||||
load_dotenv(override=True)
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
async def save_audio_file(audio: bytes, filename: str, sample_rate: int, num_channels: int):
|
||||
"""Save audio data to a WAV file."""
|
||||
if len(audio) > 0:
|
||||
with io.BytesIO() as buffer:
|
||||
with wave.open(buffer, "wb") as wf:
|
||||
wf.setsampwidth(2)
|
||||
wf.setnchannels(num_channels)
|
||||
wf.setframerate(sample_rate)
|
||||
wf.writeframes(audio)
|
||||
async with aiofiles.open(filename, "wb") as file:
|
||||
await file.write(buffer.getvalue())
|
||||
logger.info(f"Audio saved to {filename}")
|
||||
|
||||
|
||||
async def main():
|
||||
async with aiohttp.ClientSession() as session:
|
||||
(room_url, token) = await configure(session)
|
||||
|
||||
transport = DailyTransport(
|
||||
room_url,
|
||||
token,
|
||||
"Recording bot",
|
||||
DailyParams(
|
||||
# audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
transcription_enabled=True,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
vad_audio_passthrough=True, # Enable audio passthrough for recording
|
||||
),
|
||||
)
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22",
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4")
|
||||
|
||||
# Create audio buffer processor
|
||||
audiobuffer = AudioBufferProcessor()
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a helpful assistant demonstrating audio recording capabilities. Keep your responses brief and clear.",
|
||||
},
|
||||
]
|
||||
|
||||
context = OpenAILLMContext(messages)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(),
|
||||
context_aggregator.user(),
|
||||
llm,
|
||||
tts,
|
||||
transport.output(),
|
||||
audiobuffer, # Add audio buffer to pipeline
|
||||
context_aggregator.assistant(),
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
await audiobuffer.start_recording()
|
||||
messages.append(
|
||||
{
|
||||
"role": "system",
|
||||
"content": "Greet the user and explain that this conversation will be recorded.",
|
||||
}
|
||||
)
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
@transport.event_handler("on_participant_left")
|
||||
async def on_participant_left(transport, participant, reason):
|
||||
await audiobuffer.stop_recording()
|
||||
await task.cancel()
|
||||
|
||||
# Handler for merged audio
|
||||
@audiobuffer.event_handler("on_audio_data")
|
||||
async def on_audio_data(buffer, audio, sample_rate, num_channels):
|
||||
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
filename = f"recordings/merged_{timestamp}.wav"
|
||||
os.makedirs("recordings", exist_ok=True)
|
||||
await save_audio_file(audio, filename, sample_rate, num_channels)
|
||||
|
||||
# Handler for separate tracks
|
||||
@audiobuffer.event_handler("on_track_audio_data")
|
||||
async def on_track_audio_data(buffer, user_audio, bot_audio, sample_rate, num_channels):
|
||||
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
os.makedirs("recordings", exist_ok=True)
|
||||
|
||||
# Save user audio
|
||||
user_filename = f"recordings/user_{timestamp}.wav"
|
||||
await save_audio_file(user_audio, user_filename, sample_rate, 1)
|
||||
|
||||
# Save bot audio
|
||||
bot_filename = f"recordings/bot_{timestamp}.wav"
|
||||
await save_audio_file(bot_audio, bot_filename, sample_rate, 1)
|
||||
|
||||
runner = PipelineRunner()
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
@@ -92,8 +92,10 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(allow_interruptions=True),
|
||||
observers=[rtvi.observer()],
|
||||
params=PipelineParams(
|
||||
allow_interruptions=True,
|
||||
observers=[rtvi.observer()],
|
||||
),
|
||||
)
|
||||
|
||||
@rtvi.event_handler("on_client_ready")
|
||||
|
||||
@@ -140,8 +140,10 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(allow_interruptions=True),
|
||||
observers=[GoogleRTVIObserver(rtvi)],
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
observers=[GoogleRTVIObserver(rtvi)],
|
||||
),
|
||||
)
|
||||
|
||||
@rtvi.event_handler("on_client_ready")
|
||||
|
||||
@@ -346,7 +346,7 @@ async def main():
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=False))
|
||||
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=False))
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
|
||||
@@ -106,12 +106,12 @@ curl -X POST "http://localhost:7860/daily_start_bot" \
|
||||
-d '{"dialoutNumber": "+18057145330", "detectVoicemail": true}'
|
||||
```
|
||||
|
||||
### New! Using Gemini 2.0 Flash Lite with Daily
|
||||
### New! Using Gemini with Daily
|
||||
|
||||
We have introduced support for Google's Gemini 2.0 Flash Lite model in this example. This lightweight model offers faster response times and reduced costs while maintaining good conversational capabilities.
|
||||
We have introduced a new example file that uses Gemini. You can find the code within bot_daily_gemini.py.
|
||||
If you want to spin up a Gemini-based bot for this demo, instead of an OpenAI-based bot, call the same properties above but on the `daily_gemini_start_bot` endpoint instead.
|
||||
|
||||
**Quick Start**
|
||||
To use the Gemini-based bot instead of OpenAI:
|
||||
For example:
|
||||
|
||||
```shell
|
||||
curl -X POST "http://localhost:7860/daily_gemini_start_bot" \ py pipecat
|
||||
@@ -119,27 +119,7 @@ curl -X POST "http://localhost:7860/daily_gemini_start_bot" \
|
||||
-d '{"detectVoicemail": true}'
|
||||
```
|
||||
|
||||
All request body parameters supported by /daily_start_bot (such as detectVoicemail, dialoutNumber, etc.) are also compatible with /daily_gemini_start_bot.
|
||||
|
||||
This example uses context switching to help steer the bot in the right direction. As Flash Lite is a smaller model, breaking the prompt down into smaller piece helps to improve the bot's accuracy.
|
||||
|
||||
For example, instead of giving one large prompt like:
|
||||
|
||||
```python
|
||||
system_instruction="""You are a chatbot that needs to detect if you're talking to a voicemail system or human, then either leave a message or have a conversation. If it's voicemail, say "Hello, this is a message..." and hang up. If it's a human, introduce yourself and be helpful until they say goodbye."""
|
||||
```
|
||||
|
||||
We break it into stages:
|
||||
|
||||
First prompt focuses only on detection: "Determine if this is voicemail or human"
|
||||
After detection, we switch to a new context: either "Leave this specific voicemail message" or "Have a conversation with the human".
|
||||
|
||||
**Implementation Details**
|
||||
The implementation is available in bot_daily_gemini.py and features:
|
||||
|
||||
- Staged prompting approach: Breaking down complex tasks into smaller, more focused prompts to improve the lightweight model's performance
|
||||
- Dynamic context switching: The bot can change its behavior in real-time based on what it detects (voicemail vs. human caller)
|
||||
- Function-based architecture: Uses function calling to trigger context switches and call termination
|
||||
Any request body properties supported by `/daily_start_bot` (such as "detectVoicemail", "dialoutnumber", etc) can also be passed to `/daily_gemini_start_bot`. The only difference is that calling the Gemini endpoint will start a Gemini bot session.
|
||||
|
||||
### More information
|
||||
|
||||
|
||||
@@ -49,11 +49,7 @@ async def main(
|
||||
# If you are handling this via Twilio, Telnyx, set this to None
|
||||
# and handle call-forwarding when on_dialin_ready fires.
|
||||
|
||||
# We don't want to specify dial-in settings if we're not dialing in
|
||||
dialin_settings = None
|
||||
if callId and callDomain:
|
||||
dialin_settings = DailyDialinSettings(call_id=callId, call_domain=callDomain)
|
||||
|
||||
dialin_settings = DailyDialinSettings(call_id=callId, call_domain=callDomain)
|
||||
transport = DailyTransport(
|
||||
room_url,
|
||||
token,
|
||||
@@ -100,13 +96,6 @@ async def main(
|
||||
- **"Please leave a message after the beep."**
|
||||
- **"No one is available to take your call."**
|
||||
- **"Record your message after the tone."**
|
||||
- **"Please leave a message after the beep"**
|
||||
- **"You have reached voicemail for..."**
|
||||
- **"You have reached [phone number]"**
|
||||
- **"[phone number] is unavailable"**
|
||||
- **"The person you are trying to reach..."**
|
||||
- **"The number you have dialed..."**
|
||||
- **"Your call has been forwarded to an automated voice messaging system"**
|
||||
- **Any phrase that suggests an answering machine or voicemail.**
|
||||
- **ASSUME IT IS A VOICEMAIL. DO NOT WAIT FOR MORE CONFIRMATION.**
|
||||
- **IF THE CALL SAYS "PLEASE LEAVE A MESSAGE AFTER THE BEEP", WAIT FOR THE BEEP BEFORE LEAVING A MESSAGE.**
|
||||
@@ -150,7 +139,7 @@ async def main(
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
|
||||
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
|
||||
|
||||
if dialout_number:
|
||||
logger.debug("dialout number detected; doing dialout")
|
||||
|
||||
@@ -13,21 +13,14 @@ from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import (
|
||||
EndTaskFrame,
|
||||
InputAudioRawFrame,
|
||||
TranscriptionFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
)
|
||||
from pipecat.frames.frames import EndTaskFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.ai_services import LLMService
|
||||
from pipecat.services.elevenlabs import ElevenLabsTTSService
|
||||
from pipecat.services.google import GoogleLLMService
|
||||
from pipecat.services.google.google import GoogleLLMContext
|
||||
from pipecat.services.google import GoogleLLMContext, GoogleLLMService
|
||||
from pipecat.transports.services.daily import DailyDialinSettings, DailyParams, DailyTransport
|
||||
|
||||
load_dotenv(override=True)
|
||||
@@ -40,119 +33,10 @@ daily_api_key = os.getenv("DAILY_API_KEY", "")
|
||||
daily_api_url = os.getenv("DAILY_API_URL", "https://api.daily.co/v1")
|
||||
|
||||
|
||||
class UserAudioCollector(FrameProcessor):
|
||||
"""This FrameProcessor collects audio frames in a buffer, then adds them to the
|
||||
LLM context when the user stops speaking.
|
||||
"""
|
||||
|
||||
def __init__(self, context, user_context_aggregator):
|
||||
super().__init__()
|
||||
self._context = context
|
||||
self._user_context_aggregator = user_context_aggregator
|
||||
self._audio_frames = []
|
||||
self._start_secs = 0.2 # this should match VAD start_secs (hardcoding for now)
|
||||
self._user_speaking = False
|
||||
|
||||
async def process_frame(self, frame, direction):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, TranscriptionFrame):
|
||||
# We could gracefully handle both audio input and text/transcription input ...
|
||||
# but let's leave that as an exercise to the reader. :-)
|
||||
return
|
||||
if isinstance(frame, UserStartedSpeakingFrame):
|
||||
self._user_speaking = True
|
||||
elif isinstance(frame, UserStoppedSpeakingFrame):
|
||||
self._user_speaking = False
|
||||
self._context.add_audio_frames_message(audio_frames=self._audio_frames)
|
||||
await self._user_context_aggregator.push_frame(
|
||||
self._user_context_aggregator.get_context_frame()
|
||||
)
|
||||
elif isinstance(frame, InputAudioRawFrame):
|
||||
if self._user_speaking:
|
||||
self._audio_frames.append(frame)
|
||||
else:
|
||||
# Append the audio frame to our buffer. Treat the buffer as a ring buffer, dropping the oldest
|
||||
# frames as necessary. Assume all audio frames have the same duration.
|
||||
self._audio_frames.append(frame)
|
||||
frame_duration = len(frame.audio) / 16 * frame.num_channels / frame.sample_rate
|
||||
buffer_duration = frame_duration * len(self._audio_frames)
|
||||
while buffer_duration > self._start_secs:
|
||||
self._audio_frames.pop(0)
|
||||
buffer_duration -= frame_duration
|
||||
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
|
||||
class ContextSwitcher:
|
||||
def __init__(self, llm, context_aggregator):
|
||||
self._llm = llm
|
||||
self._context_aggregator = context_aggregator
|
||||
|
||||
async def switch_context(self, system_instruction):
|
||||
"""Switch the context to a new system instruction based on what the bot hears."""
|
||||
# Create messages with updated system instruction
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": system_instruction,
|
||||
}
|
||||
]
|
||||
|
||||
# Update context with new messages
|
||||
self._context_aggregator.set_messages(messages)
|
||||
# Get the context frame with the updated messages
|
||||
context_frame = self._context_aggregator.get_context_frame()
|
||||
# Trigger LLM response by pushing a context frame
|
||||
await self._llm.push_frame(context_frame)
|
||||
|
||||
|
||||
class FunctionHandlers:
|
||||
def __init__(self, context_switcher):
|
||||
self.context_switcher = context_switcher
|
||||
|
||||
async def voicemail_response(
|
||||
self, function_name, tool_call_id, args, llm, context, result_callback
|
||||
):
|
||||
"""Function the bot can call to leave a voicemail message."""
|
||||
message = """You are Chatbot leaving a voicemail message. Say EXACTLY this message and nothing else:
|
||||
|
||||
"Hello, this is a message for Pipecat example user. This is Chatbot. Please call back on 123-456-7891. Thank you."
|
||||
|
||||
After saying this message, call the terminate_call function."""
|
||||
|
||||
await self.context_switcher.switch_context(system_instruction=message)
|
||||
|
||||
await result_callback("Leaving a voicemail message")
|
||||
|
||||
async def human_conversation(
|
||||
self, function_name, tool_call_id, args, llm, context, result_callback
|
||||
):
|
||||
"""Function the bot can when it detects it's talking to a human."""
|
||||
message = """You are Chatbot talking to a human. Be friendly and helpful.
|
||||
|
||||
Start with: "Hello! I'm a friendly chatbot. How can I help you today?"
|
||||
|
||||
Keep your responses brief and to the point. Listen to what the person says.
|
||||
|
||||
When the person indicates they're done with the conversation by saying something like:
|
||||
- "Goodbye"
|
||||
- "That's all"
|
||||
- "I'm done"
|
||||
- "Thank you, that's all I needed"
|
||||
|
||||
THEN say: "Thank you for chatting. Goodbye!" and call the terminate_call function."""
|
||||
|
||||
await self.context_switcher.switch_context(system_instruction=message)
|
||||
|
||||
await result_callback("Talking to the customer")
|
||||
|
||||
|
||||
async def terminate_call(
|
||||
function_name, tool_call_id, args, llm: LLMService, context, result_callback
|
||||
):
|
||||
"""Function the bot can call to terminate the call upon completion of the call."""
|
||||
|
||||
"""Function the bot can call to terminate the call upon completion of a voicemail message."""
|
||||
await llm.queue_frame(EndTaskFrame(), FrameDirection.UPSTREAM)
|
||||
|
||||
|
||||
@@ -167,12 +51,7 @@ async def main(
|
||||
# dialin_settings are only needed if Daily's SIP URI is used
|
||||
# If you are handling this via Twilio, Telnyx, set this to None
|
||||
# and handle call-forwarding when on_dialin_ready fires.
|
||||
|
||||
# We don't want to specify dial-in settings if we're not dialing in
|
||||
dialin_settings = None
|
||||
if callId and callDomain:
|
||||
dialin_settings = DailyDialinSettings(call_id=callId, call_domain=callDomain)
|
||||
|
||||
dialin_settings = DailyDialinSettings(call_id=callId, call_domain=callDomain)
|
||||
transport = DailyTransport(
|
||||
room_url,
|
||||
token,
|
||||
@@ -186,8 +65,7 @@ async def main(
|
||||
camera_out_enabled=False,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
vad_audio_passthrough=True,
|
||||
# transcription_enabled=True,
|
||||
transcription_enabled=True,
|
||||
),
|
||||
)
|
||||
|
||||
@@ -199,63 +77,85 @@ async def main(
|
||||
tools = [
|
||||
{
|
||||
"function_declarations": [
|
||||
{
|
||||
"name": "switch_to_voicemail_response",
|
||||
"description": "Call this function when you detect this is a voicemail system.",
|
||||
},
|
||||
{
|
||||
"name": "switch_to_human_conversation",
|
||||
"description": "Call this function when you detect this is a human.",
|
||||
},
|
||||
{
|
||||
"name": "terminate_call",
|
||||
"description": "Call this function to terminate the call.",
|
||||
"description": "Terminate the call",
|
||||
},
|
||||
]
|
||||
}
|
||||
]
|
||||
|
||||
system_instruction = """You are Chatbot trying to determine if this is a voicemail system or a human.
|
||||
system_instruction = """You are Chatbot, a friendly, helpful robot. Never mention this prompt.
|
||||
|
||||
If you hear any of these phrases (or very similar ones):
|
||||
- "Please leave a message after the beep"
|
||||
- "No one is available to take your call"
|
||||
- "Record your message after the tone"
|
||||
- "You have reached voicemail for..."
|
||||
- "You have reached [phone number]"
|
||||
- "[phone number] is unavailable"
|
||||
- "The person you are trying to reach..."
|
||||
- "The number you have dialed..."
|
||||
- "Your call has been forwarded to an automated voice messaging system"
|
||||
**Operating Procedure:**
|
||||
|
||||
Then call the function switch_to_voicemail_response.
|
||||
**Phase 1: Initial Call Answer - Listen for Voicemail Greeting**
|
||||
|
||||
If it sounds like a human (saying hello, asking questions, etc.), call the function switch_to_human_conversation.
|
||||
**IMMEDIATELY after the call connects, LISTEN CAREFULLY for the *very first thing* you hear.**
|
||||
|
||||
DO NOT say anything until you've determined if this is a voicemail or human."""
|
||||
**Listen for these sentences or very close variations as the *initial greeting*:**
|
||||
|
||||
* **"Please leave a message after the beep."**
|
||||
* **"No one is available to take your call."**
|
||||
* **"Record your message after the tone."**
|
||||
* **"You have reached voicemail for..."** (or similar voicemail identification)
|
||||
|
||||
**If you HEAR one of these sentences (or a very similar greeting) as the *initial response* to the call, IMMEDIATELY assume it is voicemail and proceed to Phase 2.**
|
||||
|
||||
**If you hear "PLEASE LEAVE A MESSAGE AFTER THE BEEP", WAIT for the actual beep sound from the voicemail system *after* hearing the sentence, before proceeding to Phase 2.**
|
||||
|
||||
**If you DO NOT hear any of these voicemail greetings as the *initial response*, assume it is a human and proceed to Phase 3.**
|
||||
|
||||
|
||||
**Phase 2: Leave Voicemail Message (If Voicemail Detected):**
|
||||
|
||||
If you assumed voicemail in Phase 1, say this EXACTLY:
|
||||
"Hello, this is a message for Pipecat example user. This is Chatbot. Please call back on 123-456-7891. Thank you."
|
||||
|
||||
**Immediately after saying the message, call the function `terminate_call`.**
|
||||
**DO NOT SAY ANYTHING ELSE. SILENCE IS REQUIRED AFTER `terminate_call`.**
|
||||
|
||||
|
||||
**Phase 3: Human Interaction (If No Voicemail Greeting Detected in Phase 1):**
|
||||
|
||||
If you did not detect a voicemail greeting in Phase 1 and a human answers, say:
|
||||
"Oh, hello! I'm a friendly chatbot. Is there anything I can help you with?"
|
||||
|
||||
Keep your responses **short and helpful.**
|
||||
|
||||
If the human is finished, say:
|
||||
"Okay, thank you! Have a great day!"
|
||||
|
||||
**Then, immediately call the function `terminate_call`.**
|
||||
|
||||
|
||||
**VERY IMPORTANT RULES - DO NOT DO THESE THINGS:**
|
||||
|
||||
* **DO NOT SAY "Please leave a message after the beep."**
|
||||
* **DO NOT SAY "No one is available to take your call."**
|
||||
* **DO NOT SAY "Record your message after the tone."**
|
||||
* **DO NOT SAY ANY voicemail greeting yourself.**
|
||||
* **Only check for voicemail greetings in Phase 1, *immediately after the call connects*.**
|
||||
* **After voicemail or human interaction, ALWAYS call `terminate_call` immediately.**
|
||||
* **Do not speak after calling `terminate_call`.**
|
||||
* Your speech will be audio, so use simple language without special characters.
|
||||
"""
|
||||
|
||||
llm = GoogleLLMService(
|
||||
model="models/gemini-2.0-flash-lite",
|
||||
model="models/gemini-2.0-flash-exp",
|
||||
api_key=os.getenv("GOOGLE_API_KEY"),
|
||||
system_instruction=system_instruction,
|
||||
tools=tools,
|
||||
)
|
||||
llm.register_function("terminate_call", terminate_call)
|
||||
|
||||
context = GoogleLLMContext()
|
||||
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
audio_collector = UserAudioCollector(context, context_aggregator.user())
|
||||
|
||||
context_switcher = ContextSwitcher(llm, context_aggregator.user())
|
||||
handlers = FunctionHandlers(context_switcher)
|
||||
|
||||
llm.register_function("switch_to_voicemail_response", handlers.voicemail_response)
|
||||
llm.register_function("switch_to_human_conversation", handlers.human_conversation)
|
||||
llm.register_function("terminate_call", terminate_call)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
audio_collector, # Collect audio frames
|
||||
context_aggregator.user(), # User responses
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
@@ -266,7 +166,7 @@ DO NOT say anything until you've determined if this is a voicemail or human."""
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(allow_interruptions=True),
|
||||
PipelineParams(allow_interruptions=True),
|
||||
)
|
||||
|
||||
if dialout_number:
|
||||
|
||||
@@ -77,7 +77,7 @@ async def main(room_url: str, token: str, callId: str, sipUri: str):
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
|
||||
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
|
||||
@@ -90,7 +90,7 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(allow_interruptions=True, enable_metrics=True),
|
||||
PipelineParams(allow_interruptions=True, enable_metrics=True),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
|
||||
@@ -172,12 +172,12 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
observers=[RTVIObserver(rtvi)],
|
||||
),
|
||||
observers=[RTVIObserver(rtvi)],
|
||||
)
|
||||
await task.queue_frame(quiet_frame)
|
||||
|
||||
|
||||
@@ -198,12 +198,12 @@ async def main():
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
observers=[RTVIObserver(rtvi)],
|
||||
),
|
||||
observers=[RTVIObserver(rtvi)],
|
||||
)
|
||||
await task.queue_frame(quiet_frame)
|
||||
|
||||
|
||||
@@ -104,7 +104,7 @@ async def main(room_url, token=None):
|
||||
|
||||
main_task = PipelineTask(
|
||||
main_pipeline,
|
||||
params=PipelineParams(
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -155,10 +155,8 @@ Your task is to help the user understand and learn from this article in 2 senten
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
audio_out_sample_rate=44100,
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
PipelineParams(
|
||||
audio_out_sample_rate=44100, allow_interruptions=True, enable_metrics=True
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user