Compare commits

..

2 Commits

Author SHA1 Message Date
James Hush
50e9ef11e5 Update example 2025-02-20 13:04:34 +08:00
James Hush
2cd8ff3848 Recording issue 2025-02-20 12:57:06 +08:00
162 changed files with 643 additions and 1666 deletions

View File

@@ -5,35 +5,10 @@ All notable changes to **Pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
## Unreleased
### Added
- Pipecat version will now be logged on every application startup. This will
help us identify what version we are running in case of any issues.
- Added a new `StopFrame` which can be used to stop a pipeline task while
keeping the frame processors running. The frame processors could then be used
in a different pipeline. The difference between a `StopFrame` and a
`StopTaskFrame` is that, as with `EndFrame` and `EndTaskFrame`, the
`StopFrame` is pushed from the task and the `StopTaskFrame` is pushed upstream
inside the pipeline by any processor.
- Added a new `PipelineTask` parameter `observers` that replaces the previous
`PipelineParams.observers`.
- Added a new `PipelineTask` parameter `check_dangling_tasks` to enable or
disable checking for frame processors' dangling tasks when the Pipeline
finishes running.
- Added new `on_completion_timeout` event for LLM services (all OpenAI-based
services, Anthropic and Google). Note that this event will only get triggered
if LLM timeouts are setup and if the timeout was reached. It can be useful to
retrigger another completion and see if the timeout was just a blip.
- Added new log observers `LLMLogObserver` and `TranscriptionLogObserver` that
can be useful for debugging your pipelines.
- Added `room_url` property to `DailyTransport`.
- Added `addons` argument to `DeepgramSTTService`.
@@ -42,23 +17,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
- ⚠️ `PipelineTask` now requires keyword arguments (except for the first one for
the pipeline).
- The base `TTSService` class now strips leading newlines before sending text
to the TTS provider. This change is to solve issues where some TTS providers,
like Azure, would not output text due to newlines.
- `GrokLLMSService` now uses `grok-2` as the default model.
- `AnthropicLLMService` now uses `claude-3-7-sonnet-20250219` as the default
model.
- `RimeHttpTTSService` needs an `aiohttp.ClientSession` to be passed to the
constructor as all the other HTTP-based services.
- `RimeHttpTTSService` doesn't use a default voice anymore.
- `DeepgramSTTService` now uses the new `nova-3` model by default. If you want
to use the previous model you can pass `LiveOptions(model="nova-2-general")`.
(see https://deepgram.com/learn/introducing-nova-3-speech-to-text-api)
@@ -67,50 +25,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
stt = DeepgramSTTService(..., live_options=LiveOptions(model="nova-2-general"))
```
### Deprecated
- `PipelineParams.observers` is now deprecated, you the new `PipelineTask`
parameter `observers`.
### Removed
- Remove `TransportParams.audio_out_is_live` since it was not being used at all.
### Fixed
- Fixed an `AudioContextWordTTSService` issue that would cause an `EndFrame` to
disconnect from the TTS service before audio from all the contexts was
received. This affected services like Cartesia and Rime.
- Fixed an issue that was not allowing to pass an `OpenAILLMContext` to create
`GoogleLLMService`'s context aggregators.
- Fixed a `ElevenLabsTTSService`, `FishAudioTTSService`, `LMNTTTSService` and
`PlayHTTTSService` issue that was resulting in audio requested before an
interruption being played after an interruption.
- Fixed `match_endofsentence` support for ellipses.
- Fixed an issue that would cause undesired interruptions via
`EmulateUserStartedSpeakingFrame` when only interim transcriptions (i.e. no
final transcriptions) where received.
- Fixed an issue where `EndTaskFrame` was not triggering
`on_client_disconnected` or closing the WebSocket in FastAPI.
- Fixed an issue in `DeepgramSTTService` where the `sample_rate` passed to the
`LiveOptions` was not being used, causing the service to use the default
sample rate of pipeline.
- Fixed a context aggregator issue that would not append the LLM text response
to the context if a function call happened in the same LLM turn.
- Fixed an issue that was causing HTTP TTS services to push `TTSStoppedFrame`
more than once.
- Fixed a `FishAudioTTSService` issue where `TTSStoppedFrame` was not being
pushed.
- Fixed an issue that `start_callback` was not invoked for some LLM services.
- Fixed an issue that would cause `DeepgramSTTService` to stop working after an
@@ -120,10 +36,6 @@ stt = DeepgramSTTService(..., live_options=LiveOptions(model="nova-2-general"))
- Fixed a `STTMuteFilter` issue that would not mute user audio frames causing
transcriptions to be generated by the STT service.
### Other
- Added Gemini support to `examples/phone-chatbot`.
## [0.0.57] - 2025-02-14
### Added

View File

@@ -3,10 +3,10 @@ coverage~=7.6.12
grpcio-tools~=1.67.1
pip-tools~=7.4.1
pre-commit~=4.0.1
pyright~=1.1.394
pyright~=1.1.393
pytest~=8.3.4
pytest-asyncio~=0.25.3
ruff~=0.9.7
pytest-asyncio~=0.25.2
ruff~=0.9.5
setuptools~=70.0.0
setuptools_scm~=8.1.0
python-dotenv~=1.0.1

View File

@@ -18,9 +18,6 @@ AZURE_DALLE_API_KEY=...
AZURE_DALLE_ENDPOINT=https://...
AZURE_DALLE_MODEL=...
# Cartesia
CARTESIA_API_KEY=...
# Daily
DAILY_API_KEY=...
DAILY_SAMPLE_ROOM_URL=https://...

View File

@@ -17,7 +17,7 @@ from runner import configure
from pipecat.frames.frames import AudioRawFrame, EndFrame, OutputAudioRawFrame, TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport

View File

@@ -119,7 +119,7 @@ async def main():
]
)
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -124,7 +124,7 @@ async def main():
]
)
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@audiobuffer.event_handler("on_audio_data")
async def on_audio_data(buffer, audio, sample_rate, num_channels):

View File

@@ -70,7 +70,7 @@ async def main(room_url: str, token: str):
]
)
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -62,7 +62,7 @@ async def main(room_url: str, token: str):
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -18,7 +18,8 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.services.fal import FalImageGenService
from pipecat.transports.local.tk import TkLocalTransport, TkTransportParams
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.local.tk import TkLocalTransport
load_dotenv(override=True)
@@ -33,9 +34,7 @@ async def main():
transport = TkLocalTransport(
tk_root,
TkTransportParams(
camera_out_enabled=True, camera_out_width=1024, camera_out_height=1024
),
TransportParams(camera_out_enabled=True, camera_out_width=1024, camera_out_height=1024),
)
imagegen = FalImageGenService(

View File

@@ -44,8 +44,7 @@ async def main():
runner = PipelineRunner()
task = PipelineTask(
Pipeline([imagegen, transport.output()]),
params=PipelineParams(enable_metrics=True),
Pipeline([imagegen, transport.output()]), PipelineParams(enable_metrics=True)
)
@transport.event_handler("on_first_participant_joined")

View File

@@ -30,7 +30,8 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.cartesia import CartesiaHttpTTSService
from pipecat.services.fal import FalImageGenService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.local.tk import TkLocalTransport, TkTransportParams
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.local.tk import TkLocalTransport, TkOutputTransport
load_dotenv(override=True)
@@ -151,7 +152,7 @@ async def main():
transport = TkLocalTransport(
tk_root,
TkTransportParams(
TransportParams(
audio_out_enabled=True,
camera_out_enabled=True,
camera_out_width=1024,

View File

@@ -105,10 +105,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
PipelineParams(enable_metrics=True, enable_usage_metrics=True),
)
@transport.event_handler("on_first_participant_joined")

View File

@@ -127,7 +127,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -76,7 +76,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -21,6 +21,11 @@ from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.transports.services.helpers.daily_rest import (
DailyMeetingTokenParams,
DailyRESTHelper,
DailyRoomParams,
)
load_dotenv(override=True)
@@ -30,10 +35,31 @@ logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
daily_rest_helper = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY"),
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=session,
)
room = await daily_rest_helper.create_room(
params=DailyRoomParams(properties={"enable_recording": "cloud"})
)
params = DailyMeetingTokenParams(
properties={
"enable_recording": "cloud",
"start_cloud_recording": True,
}
)
token = await daily_rest_helper.get_token(
room_url=room.url, expiry_time=60 * 60, params=params
)
logger.debug(f"Room URL: {room.url} Room token: {token}")
transport = DailyTransport(
room_url,
room.url,
token,
"Respond bot",
DailyParams(
@@ -74,7 +100,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
@@ -85,6 +111,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
# await transport.start_recording()
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([context_aggregator.user().get_context_frame()])

View File

@@ -79,7 +79,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -103,7 +103,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -81,7 +81,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -74,7 +74,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -74,7 +74,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -75,7 +75,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -77,7 +77,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -83,7 +83,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -81,7 +81,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -81,7 +81,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -75,7 +75,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -80,7 +80,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -71,7 +71,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -88,7 +88,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -81,7 +81,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -79,7 +79,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -80,7 +80,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -76,7 +76,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -74,7 +74,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -74,7 +74,7 @@ async def main():
]
)
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -251,7 +251,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -74,7 +74,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -78,11 +78,7 @@ async def main():
runner = PipelineRunner()
task = PipelineTask(
pipeline,
params=PipelineParams(
audio_in_sample_rate=24000,
audio_out_sample_rate=24000,
),
pipeline, PipelineParams(audio_in_sample_rate=24000, audio_out_sample_rate=24000)
)
await runner.run(task)

View File

@@ -24,7 +24,8 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.transports.local.tk import TkLocalTransport, TkTransportParams
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.local.tk import TkLocalTransport
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
@@ -66,7 +67,7 @@ async def main():
tk_transport = TkLocalTransport(
tk_root,
TkTransportParams(
TransportParams(
audio_out_enabled=True,
camera_out_enabled=True,
camera_out_is_live=True,
@@ -82,11 +83,7 @@ async def main():
pipeline = Pipeline([daily_transport.input(), MirrorProcessor(), tk_transport.output()])
task = PipelineTask(
pipeline,
params=PipelineParams(
audio_in_sample_rate=24000,
audio_out_sample_rate=24000,
),
pipeline, PipelineParams(audio_in_sample_rate=24000, audio_out_sample_rate=24000)
)
async def run_tk():

View File

@@ -76,7 +76,7 @@ async def main():
]
)
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -112,7 +112,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -99,13 +99,7 @@ async def main():
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
),
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -153,13 +153,7 @@ If you need to use a tool, simply use the tool. Do not tell the user the tool yo
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
),
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -152,7 +152,7 @@ indicate you should use the get_image tool are:
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -116,7 +116,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -113,7 +113,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -117,7 +117,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -116,7 +116,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -116,7 +116,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -123,7 +123,7 @@ Start by asking me for my location. Then, use 'get_weather_current' to give me a
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -123,7 +123,7 @@ Start by asking me for my location. Then, use 'get_weather_current' to give me a
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -117,7 +117,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -83,7 +83,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -133,7 +133,7 @@ async def main():
]
)
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -126,7 +126,7 @@ async def main():
]
)
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -85,13 +85,7 @@ async def main():
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
),
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))
# When a participant joins, start transcription for that participant so the
# bot can "hear" and respond to them.

View File

@@ -108,7 +108,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
report_only_initial_ttfb=True,

View File

@@ -38,6 +38,7 @@ async def main():
"GStreamer",
DailyParams(
audio_out_enabled=True,
audio_out_is_live=True,
camera_out_enabled=True,
camera_out_width=1280,
camera_out_height=720,

View File

@@ -16,13 +16,10 @@ from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import TranscriptionMessage, TranscriptionUpdateFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.transcript_processor import TranscriptProcessor
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.openai_realtime_beta import (
InputAudioTranscription,
OpenAIRealtimeBetaLLMService,
@@ -143,29 +140,21 @@ Remember, your responses should be short. Just one or two sentences, usually."""
tools,
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
# Create transcript processor and handler
transcript = TranscriptProcessor()
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
transcript.user(), # User transcripts
context_aggregator.user(),
llm, # LLM
context_aggregator.assistant(),
transcript.assistant(), # Assistant transcripts
transport.output(), # Transport bot output
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
@@ -173,16 +162,9 @@ Remember, your responses should be short. Just one or two sentences, usually."""
),
)
# Register event handler for transcript updates
@transcript.event_handler("on_transcript_update")
async def on_transcript_update(processor, frame):
logger.debug(f"Received transcript update with {len(frame.messages)} new messages")
for msg in frame.messages:
logger.debug(msg)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
# await transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])

View File

@@ -212,7 +212,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -237,7 +237,7 @@ Remember, your responses should be short. Just one or two sentences, usually."""
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -209,7 +209,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -263,7 +263,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -87,7 +87,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
# We just use 16000 because that's what Tavus is expecting and
# we avoid resampling.
audio_in_sample_rate=16000,

View File

@@ -145,7 +145,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -138,7 +138,6 @@ class OutputGate(FrameProcessor):
self._gate_open = start_open
self._frames_buffer = []
self._notifier = notifier
self._gate_task = None
def close_gate(self):
self._gate_open = False
@@ -179,13 +178,10 @@ class OutputGate(FrameProcessor):
async def _start(self):
self._frames_buffer = []
if not self._gate_task:
self._gate_task = self.create_task(self._gate_task_handler())
self._gate_task = self.create_task(self._gate_task_handler())
async def _stop(self):
if self._gate_task:
await self.cancel_task(self._gate_task)
self._gate_task = None
await self.cancel_task(self._gate_task)
async def _gate_task_handler(self):
while True:
@@ -355,7 +351,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -342,7 +342,6 @@ class OutputGate(FrameProcessor):
self._gate_open = start_open
self._frames_buffer = []
self._notifier = notifier
self._gate_task = None
def close_gate(self):
self._gate_open = False
@@ -383,13 +382,10 @@ class OutputGate(FrameProcessor):
async def _start(self):
self._frames_buffer = []
if not self._gate_task:
self._gate_task = self.create_task(self._gate_task_handler())
self._gate_task = self.create_task(self._gate_task_handler())
async def _stop(self):
if self._gate_task:
await self.cancel_task(self._gate_task)
self._gate_task = None
await self.cancel_task(self._gate_task)
async def _gate_task_handler(self):
while True:
@@ -564,7 +560,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -25,8 +25,10 @@ from pipecat.frames.frames import (
InputAudioRawFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMMessagesFrame,
StartFrame,
StartInterruptionFrame,
StopInterruptionFrame,
SystemFrame,
TextFrame,
TranscriptionFrame,
@@ -553,7 +555,6 @@ class OutputGate(FrameProcessor):
self._notifier = notifier
self._context = context
self._transcription_buffer = user_transcription_buffer
self._gate_task = None
def close_gate(self):
self._gate_open = False
@@ -601,13 +602,10 @@ class OutputGate(FrameProcessor):
async def _start(self):
self._frames_buffer = []
if not self._gate_task:
self._gate_task = self.create_task(self._gate_task_handler())
self._gate_task = self.create_task(self._gate_task_handler())
async def _stop(self):
if self._gate_task:
await self.cancel_task(self._gate_task)
self._gate_task = None
await self.cancel_task(self._gate_task)
async def _gate_task_handler(self):
while True:
@@ -742,7 +740,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -87,7 +87,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -122,7 +122,7 @@ async def main():
]
)
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -354,7 +354,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -63,7 +63,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -89,7 +89,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -120,7 +120,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -79,7 +79,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -106,7 +106,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -1,5 +1,5 @@
#
# Copyright (c) 2024-2025, Daily
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
@@ -34,7 +34,7 @@ search_tool = {"google_search": {}}
tools = [search_tool]
system_instruction = """
You are an expert at providing the most recent news from any place. Your responses will be converted to audio, so avoid using special characters or overly complex formatting.
You are an expert at providing the most recent news from any place. Your responses will be converted to audio, so avoid using special characters or overly complex formatting.
Always use the google search API to retrieve the latest news. You must also use it to check which day is today.
@@ -93,7 +93,7 @@ async def main():
]
)
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -83,7 +83,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
),

View File

@@ -150,7 +150,7 @@ async def main():
]
)
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -150,7 +150,7 @@ async def main():
]
)
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -178,7 +178,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -18,10 +18,12 @@ from pipecat.frames.frames import (
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
Frame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMTextFrame,
StartInterruptionFrame,
)
from pipecat.observers.base_observer import BaseObserver
from pipecat.observers.loggers.llm_log_observer import LLMLogObserver
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -71,6 +73,38 @@ class DebugObserver(BaseObserver):
logger.info(f"🤖 BOT STOP SPEAKING: {src} {arrow} {dst} at {time_sec:.2f}s")
class LLMLogObserver(BaseObserver):
"""Observer to log LLM activity to the console.
Logs all frame instances of:
- LLMFullResponseStartFrame (only from LLM service)
- LLMTextFrame
- LLMFullResponseEndFrame (only from LLM service)
This allows you to track when the LLM starts responding, what it generates, and when it finishes.
Log format: [LLM EVENT]: [details] at [timestamp]s
"""
async def on_push_frame(
self,
src: FrameProcessor,
dst: FrameProcessor,
frame: Frame,
direction: FrameDirection,
timestamp: int,
):
time_sec = timestamp / 1_000_000_000
# Only log start/end frames from OpenAILLMService
if isinstance(frame, (LLMFullResponseStartFrame, LLMFullResponseEndFrame)):
if isinstance(src, OpenAILLMService):
event = "START" if isinstance(frame, LLMFullResponseStartFrame) else "END"
logger.info(f"🧠 LLM {event} RESPONSE at {time_sec:.2f}s")
# Log all LLMTextFrames
elif isinstance(frame, LLMTextFrame):
logger.info(f"🧠 LLM GENERATING: {frame.text!r} at {time_sec:.2f}s")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
@@ -117,13 +151,13 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
observers=[DebugObserver(), LLMLogObserver()],
),
observers=[DebugObserver(), LLMLogObserver()],
)
@transport.event_handler("on_first_participant_joined")

View File

@@ -32,7 +32,7 @@ async def main():
pipeline = Pipeline([NullProcessor()])
task = PipelineTask(pipeline, params=PipelineParams(enable_heartbeats=True))
task = PipelineTask(pipeline, PipelineParams(enable_heartbeats=True))
runner = PipelineRunner()

View File

@@ -1,5 +1,5 @@
#
# Copyright (c) 2024-2025, Daily
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
@@ -38,7 +38,7 @@ search_tool = {"google_search_retrieval": {}}
tools = [search_tool]
system_instruction = """
You are an expert at providing the most recent news from any place. Your responses will be converted to audio, so avoid using special characters or overly complex formatting.
You are an expert at providing the most recent news from any place. Your responses will be converted to audio, so avoid using special characters or overly complex formatting.
Always use the google search API to retrieve the latest news. You must also use it to check which day is today.
@@ -117,7 +117,7 @@ async def main():
]
)
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -230,7 +230,7 @@ Your response will be turned into speech so use only simple words and punctuatio
)
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -92,8 +92,10 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(allow_interruptions=True),
observers=[rtvi.observer()],
params=PipelineParams(
allow_interruptions=True,
observers=[rtvi.observer()],
),
)
@rtvi.event_handler("on_client_ready")

View File

@@ -140,8 +140,10 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(allow_interruptions=True),
observers=[GoogleRTVIObserver(rtvi)],
PipelineParams(
allow_interruptions=True,
observers=[GoogleRTVIObserver(rtvi)],
),
)
@rtvi.event_handler("on_client_ready")

View File

@@ -346,7 +346,7 @@ async def main():
]
)
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=False))
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=False))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -1,5 +1,3 @@
<!-- @format -->
<div align="center">
 <img alt="pipecat" width="300px" height="auto" src="image.png">
</div>
@@ -106,41 +104,6 @@ curl -X POST "http://localhost:7860/daily_start_bot" \
-d '{"dialoutNumber": "+18057145330", "detectVoicemail": true}'
```
### New! Using Gemini 2.0 Flash Lite with Daily
We have introduced support for Google's Gemini 2.0 Flash Lite model in this example. This lightweight model offers faster response times and reduced costs while maintaining good conversational capabilities.
**Quick Start**
To use the Gemini-based bot instead of OpenAI:
```shell
curl -X POST "http://localhost:7860/daily_gemini_start_bot" \ py pipecat
-H "Content-Type: application/json" \
-d '{"detectVoicemail": true}'
```
All request body parameters supported by /daily_start_bot (such as detectVoicemail, dialoutNumber, etc.) are also compatible with /daily_gemini_start_bot.
This example uses context switching to help steer the bot in the right direction. As Flash Lite is a smaller model, breaking the prompt down into smaller piece helps to improve the bot's accuracy.
For example, instead of giving one large prompt like:
```python
system_instruction="""You are a chatbot that needs to detect if you're talking to a voicemail system or human, then either leave a message or have a conversation. If it's voicemail, say "Hello, this is a message..." and hang up. If it's a human, introduce yourself and be helpful until they say goodbye."""
```
We break it into stages:
First prompt focuses only on detection: "Determine if this is voicemail or human"
After detection, we switch to a new context: either "Leave this specific voicemail message" or "Have a conversation with the human".
**Implementation Details**
The implementation is available in bot_daily_gemini.py and features:
- Staged prompting approach: Breaking down complex tasks into smaller, more focused prompts to improve the lightweight model's performance
- Dynamic context switching: The bot can change its behavior in real-time based on what it detects (voicemail vs. human caller)
- Function-based architecture: Uses function calling to trigger context switches and call termination
### More information
For more configuration options, please consult [Daily's API documentation](https://docs.daily.co).

View File

@@ -49,11 +49,7 @@ async def main(
# If you are handling this via Twilio, Telnyx, set this to None
# and handle call-forwarding when on_dialin_ready fires.
# We don't want to specify dial-in settings if we're not dialing in
dialin_settings = None
if callId and callDomain:
dialin_settings = DailyDialinSettings(call_id=callId, call_domain=callDomain)
dialin_settings = DailyDialinSettings(call_id=callId, call_domain=callDomain)
transport = DailyTransport(
room_url,
token,
@@ -100,16 +96,8 @@ async def main(
- **"Please leave a message after the beep."**
- **"No one is available to take your call."**
- **"Record your message after the tone."**
- **"Please leave a message after the beep"**
- **"You have reached voicemail for..."**
- **"You have reached [phone number]"**
- **"[phone number] is unavailable"**
- **"The person you are trying to reach..."**
- **"The number you have dialed..."**
- **"Your call has been forwarded to an automated voice messaging system"**
- **Any phrase that suggests an answering machine or voicemail.**
- **ASSUME IT IS A VOICEMAIL. DO NOT WAIT FOR MORE CONFIRMATION.**
- **IF THE CALL SAYS "PLEASE LEAVE A MESSAGE AFTER THE BEEP", WAIT FOR THE BEEP BEFORE LEAVING A MESSAGE.**
#### **Step 2: Leave a Voicemail Message**
- Immediately say:
@@ -122,9 +110,7 @@ async def main(
- If the call is answered by a human, say:
*"Oh, hello! I'm a friendly chatbot. Is there anything I can help you with?"*
- Keep responses **brief and helpful**.
- If the user no longer needs assistance, say:
*"Okay, thank you! Have a great day!"*
-**Then call `terminate_call` immediately.**
- If the user no longer needs assistance, **call `terminate_call` immediately.**
---
@@ -150,7 +136,7 @@ async def main(
]
)
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
if dialout_number:
logger.debug("dialout number detected; doing dialout")

View File

@@ -1,339 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import asyncio
import os
import sys
from dataclasses import dataclass
from typing import Optional
import google.ai.generativelanguage as glm
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
BotStoppedSpeakingFrame,
EndTaskFrame,
Frame,
InputAudioRawFrame,
SystemFrame,
TranscriptionFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.ai_services import LLMService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.google import GoogleLLMContext, GoogleLLMService
from pipecat.transports.services.daily import DailyDialinSettings, DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
daily_api_key = os.getenv("DAILY_API_KEY", "")
daily_api_url = os.getenv("DAILY_API_URL", "https://api.daily.co/v1")
class UserAudioCollector(FrameProcessor):
"""This FrameProcessor collects audio frames in a buffer, then adds them to the
LLM context when the user stops speaking.
"""
def __init__(self, context, user_context_aggregator):
super().__init__()
self._context = context
self._user_context_aggregator = user_context_aggregator
self._audio_frames = []
self._start_secs = 0.2 # this should match VAD start_secs (hardcoding for now)
self._user_speaking = False
async def process_frame(self, frame, direction):
await super().process_frame(frame, direction)
if isinstance(frame, TranscriptionFrame):
# We could gracefully handle both audio input and text/transcription input ...
# but let's leave that as an exercise to the reader. :-)
return
if isinstance(frame, UserStartedSpeakingFrame):
self._user_speaking = True
elif isinstance(frame, UserStoppedSpeakingFrame):
self._user_speaking = False
self._context.add_audio_frames_message(audio_frames=self._audio_frames)
await self._user_context_aggregator.push_frame(
self._user_context_aggregator.get_context_frame()
)
elif isinstance(frame, InputAudioRawFrame):
if self._user_speaking:
self._audio_frames.append(frame)
else:
# Append the audio frame to our buffer. Treat the buffer as a ring buffer, dropping the oldest
# frames as necessary. Assume all audio frames have the same duration.
self._audio_frames.append(frame)
frame_duration = len(frame.audio) / 16 * frame.num_channels / frame.sample_rate
buffer_duration = frame_duration * len(self._audio_frames)
while buffer_duration > self._start_secs:
self._audio_frames.pop(0)
buffer_duration -= frame_duration
await self.push_frame(frame, direction)
class ContextSwitcher:
def __init__(self, llm, context_aggregator):
self._llm = llm
self._context_aggregator = context_aggregator
async def switch_context(self, system_instruction):
"""Switch the context to a new system instruction based on what the bot hears."""
# Create messages with updated system instruction
messages = [
{
"role": "system",
"content": system_instruction,
}
]
# Update context with new messages
self._context_aggregator.set_messages(messages)
# Get the context frame with the updated messages
context_frame = self._context_aggregator.get_context_frame()
# Trigger LLM response by pushing a context frame
await self._llm.push_frame(context_frame)
class FunctionHandlers:
def __init__(self, context_switcher):
self.context_switcher = context_switcher
async def voicemail_response(
self, function_name, tool_call_id, args, llm, context, result_callback
):
"""Function the bot can call to leave a voicemail message."""
message = """You are Chatbot leaving a voicemail message. Say EXACTLY this message and nothing else:
"Hello, this is a message for Pipecat example user. This is Chatbot. Please call back on 123-456-7891. Thank you."
After saying this message, call the terminate_call function."""
await self.context_switcher.switch_context(system_instruction=message)
await result_callback("Leaving a voicemail message")
async def human_conversation(
self, function_name, tool_call_id, args, llm, context, result_callback
):
"""Function the bot can when it detects it's talking to a human."""
message = """You are Chatbot talking to a human. Be friendly and helpful.
Start with: "Hello! I'm a friendly chatbot. How can I help you today?"
Keep your responses brief and to the point. Listen to what the person says.
When the person indicates they're done with the conversation by saying something like:
- "Goodbye"
- "That's all"
- "I'm done"
- "Thank you, that's all I needed"
THEN say: "Thank you for chatting. Goodbye!" and call the terminate_call function."""
await self.context_switcher.switch_context(system_instruction=message)
await result_callback("Talking to the customer")
async def terminate_call(
function_name, tool_call_id, args, llm: LLMService, context, result_callback
):
"""Function the bot can call to terminate the call upon completion of the call."""
await llm.queue_frame(EndTaskFrame(), FrameDirection.UPSTREAM)
async def main(
room_url: str,
token: str,
callId: str,
callDomain: str,
detect_voicemail: bool,
dialout_number: Optional[str],
):
# dialin_settings are only needed if Daily's SIP URI is used
# If you are handling this via Twilio, Telnyx, set this to None
# and handle call-forwarding when on_dialin_ready fires.
# We don't want to specify dial-in settings if we're not dialing in
dialin_settings = None
if callId and callDomain:
dialin_settings = DailyDialinSettings(call_id=callId, call_domain=callDomain)
transport = DailyTransport(
room_url,
token,
"Chatbot",
DailyParams(
api_url=daily_api_url,
api_key=daily_api_key,
dialin_settings=dialin_settings,
audio_in_enabled=True,
audio_out_enabled=True,
camera_out_enabled=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
# transcription_enabled=True,
),
)
tts = ElevenLabsTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY", ""),
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
)
tools = [
{
"function_declarations": [
{
"name": "switch_to_voicemail_response",
"description": "Call this function when you detect this is a voicemail system.",
},
{
"name": "switch_to_human_conversation",
"description": "Call this function when you detect this is a human.",
},
{
"name": "terminate_call",
"description": "Call this function to terminate the call.",
},
]
}
]
system_instruction = """You are Chatbot trying to determine if this is a voicemail system or a human.
If you hear any of these phrases (or very similar ones):
- "Please leave a message after the beep"
- "No one is available to take your call"
- "Record your message after the tone"
- "You have reached voicemail for..."
- "You have reached [phone number]"
- "[phone number] is unavailable"
- "The person you are trying to reach..."
- "The number you have dialed..."
- "Your call has been forwarded to an automated voice messaging system"
Then call the function switch_to_voicemail_response.
If it sounds like a human (saying hello, asking questions, etc.), call the function switch_to_human_conversation.
DO NOT say anything until you've determined if this is a voicemail or human."""
llm = GoogleLLMService(
model="models/gemini-2.0-flash-lite-preview-02-05",
api_key=os.getenv("GOOGLE_API_KEY"),
system_instruction=system_instruction,
tools=tools,
)
context = GoogleLLMContext()
context_aggregator = llm.create_context_aggregator(context)
audio_collector = UserAudioCollector(context, context_aggregator.user())
context_switcher = ContextSwitcher(llm, context_aggregator.user())
handlers = FunctionHandlers(context_switcher)
llm.register_function("switch_to_voicemail_response", handlers.voicemail_response)
llm.register_function("switch_to_human_conversation", handlers.human_conversation)
llm.register_function("terminate_call", terminate_call)
pipeline = Pipeline(
[
transport.input(), # Transport user input
audio_collector, # Collect audio frames
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(allow_interruptions=True),
)
if dialout_number:
logger.debug("dialout number detected; doing dialout")
# Configure some handlers for dialing out
@transport.event_handler("on_joined")
async def on_joined(transport, data):
logger.debug(f"Joined; starting dialout to: {dialout_number}")
await transport.start_dialout({"phoneNumber": dialout_number})
@transport.event_handler("on_dialout_connected")
async def on_dialout_connected(transport, data):
logger.debug(f"Dial-out connected: {data}")
@transport.event_handler("on_dialout_answered")
async def on_dialout_answered(transport, data):
logger.debug(f"Dial-out answered: {data}")
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
# unlike the dialin case, for the dialout case, the caller will speak first. Presumably
# they will answer the phone and say "Hello?" Since we've captured their transcript,
# That will put a frame into the pipeline and prompt an LLM completion, which is how the
# bot will then greet the user.
elif detect_voicemail:
logger.debug("Detect voicemail example. You can test this in example in Daily Prebuilt")
# For the voicemail detection case, we do not want the bot to answer the phone. We want it to wait for the voicemail
# machine to say something like 'Leave a message after the beep', or for the user to say 'Hello?'.
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
else:
logger.debug("no dialout number; assuming dialin")
# Different handlers for dialin
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
# For the dialin case, we want the bot to answer the phone and greet the user. We
# can prompt the bot to speak by putting the context into the pipeline.
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.cancel()
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Pipecat Simple ChatBot")
parser.add_argument("-u", type=str, help="Room URL")
parser.add_argument("-t", type=str, help="Token")
parser.add_argument("-i", type=str, help="Call ID")
parser.add_argument("-d", type=str, help="Call Domain")
parser.add_argument("-v", action="store_true", help="Detect voicemail")
parser.add_argument("-o", type=str, help="Dialout number", default=None)
config = parser.parse_args()
asyncio.run(main(config.u, config.t, config.i, config.d, config.v, config.o))

View File

@@ -110,15 +110,10 @@ async def _create_daily_room(
# Spawn a new agent, and join the user session
# Note: this is mostly for demonstration purposes (refer to 'deployment' in docs)
print(f"Vendor: {vendor}")
if vendor == "daily":
bot_proc = f"python3 -m bot_daily -u {room.url} -t {token} -i {callId} -d {callDomain}{' -v' if detect_voicemail else ''}"
if dialoutNumber:
bot_proc += f" -o {dialoutNumber}"
elif vendor == "daily-gemini":
bot_proc = f"python3 -m bot_daily_gemini -u {room.url} -t {token} -i {callId} -d {callDomain}{' -v' if detect_voicemail else ''}"
if dialoutNumber:
bot_proc += f" -o {dialoutNumber}"
else:
bot_proc = f"python3 -m bot_twilio -u {room.url} -t {token} -i {callId} -s {room.config.sip_endpoint}"
@@ -206,38 +201,6 @@ async def daily_start_bot(request: Request) -> JSONResponse:
return JSONResponse({"room_url": room.url, "sipUri": room.config.sip_endpoint})
@app.post("/daily_gemini_start_bot")
async def daily_gemini_start_bot(request: Request) -> JSONResponse:
# The /daily_start_bot is invoked when a call is received on Daily's SIP URI
# daily_start_bot will create the room, put the call on hold until
# the bot and sip worker are ready. Daily will automatically
# forward the call to the SIP URi when dialin_ready fires.
# Use specified room URL, or create a new one if not specified
room_url = os.getenv("DAILY_SAMPLE_ROOM_URL", None)
# Get the dial-in properties from the request
try:
data = await request.json()
if "test" in data:
# Pass through any webhook checks
return JSONResponse({"test": True})
detect_voicemail = data.get("detectVoicemail", False)
callId = data.get("callId", None)
callDomain = data.get("callDomain", None)
dialoutNumber = data.get("dialoutNumber", None)
except Exception:
raise HTTPException(
status_code=500, detail="Missing properties 'callId', 'callDomain', or 'dialoutNumber'"
)
room: DailyRoomObject = await _create_daily_room(
room_url, callId, callDomain, dialoutNumber, "daily-gemini", detect_voicemail
)
# Grab a token for the user to join with
return JSONResponse({"room_url": room.url, "sipUri": room.config.sip_endpoint})
# ----------------- Main ----------------- #

View File

@@ -77,7 +77,7 @@ async def main(room_url: str, token: str, callId: str, sipUri: str):
]
)
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -90,7 +90,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(allow_interruptions=True, enable_metrics=True),
PipelineParams(allow_interruptions=True, enable_metrics=True),
)
@transport.event_handler("on_first_participant_joined")

View File

@@ -172,12 +172,12 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
observers=[RTVIObserver(rtvi)],
),
observers=[RTVIObserver(rtvi)],
)
await task.queue_frame(quiet_frame)

View File

@@ -198,12 +198,12 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
observers=[RTVIObserver(rtvi)],
),
observers=[RTVIObserver(rtvi)],
)
await task.queue_frame(quiet_frame)

View File

@@ -104,7 +104,7 @@ async def main(room_url, token=None):
main_task = PipelineTask(
main_pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -155,10 +155,8 @@ Your task is to help the user understand and learn from this article in 2 senten
task = PipelineTask(
pipeline,
params=PipelineParams(
audio_out_sample_rate=44100,
allow_interruptions=True,
enable_metrics=True,
PipelineParams(
audio_out_sample_rate=44100, allow_interruptions=True, enable_metrics=True
),
)

View File

@@ -183,12 +183,12 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=False, # We don't want to interrupt the translator bot
enable_metrics=True,
enable_usage_metrics=True,
observers=[RTVIObserver(rtvi)],
),
observers=[RTVIObserver(rtvi)],
)
@transport.event_handler("on_first_participant_joined")

View File

@@ -108,9 +108,7 @@ async def run_bot(websocket_client: WebSocket, stream_sid: str, testing: bool):
task = PipelineTask(
pipeline,
params=PipelineParams(
audio_in_sample_rate=8000,
audio_out_sample_rate=8000,
allow_interruptions=True,
audio_in_sample_rate=8000, audio_out_sample_rate=8000, allow_interruptions=True
),
)

Some files were not shown because too many files have changed in this diff Show More