Merge branch 'main' into sarvam/stt

This commit is contained in:
shreyas-sarvam
2025-10-31 15:21:09 +05:30
33 changed files with 1766 additions and 1177 deletions

View File

@@ -12,6 +12,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added supprt for Sarvam Speech-to-Text service (`SarvamSTTService`) with streaming WebSocket
support for `saarika` (STT) and `saaras` (STT-translate) models.
- Added support for including images or audio to LLM context messages using
`LLMContext.create_image_message()` or `LLMContext.create_image_url_message()`
(not all LLMs support URLs) and `LLMContext.create_audio_message()`. For
example, when creating `LLMMessagesAppendFrame`:
```python
message = LLMContext.create_image_message(image=..., size= ...)
await self.push_frame(LLMMessagesAppendFrame(messages=[message], run_llm=True))
```
- New event handlers for the `DeepgramFluxSTTService`: `on_start_of_turn`,
`on_turn_resumed`, `on_end_of_turn`, `on_eager_end_of_turn`, `on_update`.
- Added `generation_config` parameter support to `CartesiaTTSService` and
`CartesiaHttpTTSService` for Cartesia Sonic-3 models. Includes a new
`GenerationConfig` class with `volume` (0.5-2.0), `speed` (0.6-1.5),
@@ -154,6 +167,20 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
- `UserImageRawFrame` new fields `add_to_context` and `text`. The
`add_to_context` field indicates if this image and text should be added to the
LLM context (by the LLM assistant aggregator). The `text` field, if set, might
also guide the LLM or the vision service on how to analyze the image.
- `UserImageRequestFrame` new fiels `add_to_context` and `text`. Both fields
will be used to set the same fields on the captured `UserImageRawFrame`.
- `UserImageRequestFrame` don't require function call name and ID anymore.
- Updated `MoondreamService` to process `UserImageRawFrame`.
- `VisionService` expects `UserImageRawFrame` in order to analyze images.
- `DailyTransport` triggers `on_error` event if transcription can't be started
or stopped.
@@ -178,6 +205,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Deprecated
- `LLMService.request_image_frame()` is deprecated, push a
`UserImageRequestFrame` instead.
- `UserResponseAggregator` is deprecated and will be removed in a future version.
- The `send_transcription_frames` argument to `OpenAIRealtimeLLMService` is
deprecated. Transcription frames are now always sent. They go upstream, to be
handled by the user context aggregator. See "Added" section for details.
@@ -196,6 +228,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Fixed an issue in `HumeTTSService` that was only using Octave 2, which does
not support the `description` field. Now, if a description is provided, it
switches to Octave 1.
- Fixed an issue where `DailyTransport` would timeout prematurely on join and on
leave.
@@ -205,7 +241,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Fixed an issue in `ServiceSwitcher` where the `STTService`s would result in
all STT services producing `TranscriptionFrame`s.
- Fixed an issue in `HumeTTSService` that was only using Octave 2, which does not support the `description` field. Now, if a description is provided, it switches to Octave 1.
### Other
- Updated all vision 12-series foundational examples to load images from a file.
- Added 14-series video examples for different services. These new examples
request an image from the user camera through a function call.
## [0.0.91] - 2025-10-21

View File

@@ -6,6 +6,7 @@
import os
import aiohttp
from dotenv import load_dotenv
from loguru import logger
@@ -20,10 +21,10 @@ from pipecat.processors.aggregators.llm_response import (
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.elevenlabs.tts import ElevenLabsTTSService
from pipecat.services.openai.base_llm import BaseOpenAILLMService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.services.speechmatics.stt import SpeechmaticsSTTService
from pipecat.services.speechmatics.tts import SpeechmaticsTTSService
from pipecat.transcriptions.language import Language
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
@@ -51,121 +52,127 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
"""Speechmatics STT Service Example
"""Speechmatics STT and TTS Service Example
This example demonstrates using Speechmatics Speech-to-Text service with speaker diarization and intelligent speaker management. Key features:
This example demonstrates using Speechmatics Speech-to-Text and Text-to-Speech services
with speaker diarization and intelligent speaker management. Key features:
1. Speaker Diarization
1. Speaker Diarization (STT)
- Automatically identifies and distinguishes between different speakers
- First speaker is identified as 'S1', others get subsequent IDs
- Uses `enable_diarization` parameter to manage speaker detection
2. Smart Speaker Control
2. Smart Speaker Control (STT)
- `focus_speakers` parameter lets you target specific speakers (e.g. ["S1"])
- Other speakers will be wrapped in PASSIVE tags
- Only processes speech from focused speakers
- Words from all speakers are wrapped with XML tags for clear speaker identification
- Other speakers' speech only sent when focused speaker is active
3. Voice Activity Detection
3. Voice Activity Detection (STT)
- Built-in VAD using `enable_vad` parameter
- Remove `vad_analyzer` from `transport` config to use module's VAD
- Emits speaker started/stopped events
4. Configuration Options
4. Text-to-Speech (TTS)
- Low latency streaming audio synthesis
- Multiple voice options available including `sarah`, `theo`, and `megan`
5. Configuration Options
- `operating_point` parameter defaults to `ENHANCED` for optimal accuracy
- Configurable `end_of_utterance_silence_trigger` (default 0.5s)
- Customizable speaker formatting
- Additional diarization settings available
For detailed information about operating points and configuration:
https://docs.speechmatics.com/rt-api-ref
For detailed information:
- STT: https://docs.speechmatics.com/rt-api-ref
- TTS: https://docs.speechmatics.com/text-to-speech/quickstart
"""
logger.info(f"Starting bot")
stt = SpeechmaticsSTTService(
api_key=os.getenv("SPEECHMATICS_API_KEY"),
params=SpeechmaticsSTTService.InputParams(
language=Language.EN,
enable_vad=True,
enable_diarization=True,
focus_speakers=["S1"],
end_of_utterance_silence_trigger=0.5,
speaker_active_format="<{speaker_id}>{text}</{speaker_id}>",
speaker_passive_format="<PASSIVE><{speaker_id}>{text}</{speaker_id}></PASSIVE>",
),
)
tts = ElevenLabsTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
model="eleven_turbo_v2_5",
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
params=BaseOpenAILLMService.InputParams(temperature=0.75),
)
messages = [
{
"role": "system",
"content": (
"You are a helpful British assistant called Alfred. "
"Your goal is to demonstrate your capabilities in a succinct way. "
"Your output will be converted to audio so don't include special characters in your answers. "
"Always include punctuation in your responses. "
"Give very short replies - do not give longer replies unless strictly necessary. "
"Respond to what the user said in a concise, funny, creative and helpful way. "
"Use `<Sn/>` tags to identify different speakers - do not use tags in your replies. "
"Do not respond to speakers within `<PASSIVE/>` tags unless explicitly asked to. "
async with aiohttp.ClientSession() as session:
stt = SpeechmaticsSTTService(
api_key=os.getenv("SPEECHMATICS_API_KEY"),
params=SpeechmaticsSTTService.InputParams(
language=Language.EN,
enable_vad=True,
enable_diarization=True,
focus_speakers=["S1"],
end_of_utterance_silence_trigger=0.5,
speaker_active_format="<{speaker_id}>{text}</{speaker_id}>",
speaker_passive_format="<PASSIVE><{speaker_id}>{text}</{speaker_id}></PASSIVE>",
),
},
]
)
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(aggregation_timeout=0.005),
)
tts = SpeechmaticsTTSService(
api_key=os.getenv("SPEECHMATICS_API_KEY"),
voice_id="sarah",
aiohttp_session=session,
)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
params=BaseOpenAILLMService.InputParams(temperature=0.75),
)
messages = [
{
"role": "system",
"content": (
"You are a helpful British assistant called Sarah. "
"Your goal is to demonstrate your capabilities in a succinct way. "
"Your output will be converted to audio so don't include special characters in your answers. "
"Always include punctuation in your responses. "
"Give very short replies - do not give longer replies unless strictly necessary. "
"Respond to what the user said in a concise, funny, creative and helpful way. "
"Use `<Sn/>` tags to identify different speakers - do not use tags in your replies. "
"Do not respond to speakers within `<PASSIVE/>` tags unless explicitly asked to. "
),
},
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(aggregation_timeout=0.005),
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Say a short hello to the user."})
await task.queue_frames([LLMRunFrame()])
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Say a short hello to the user."})
await task.queue_frames([LLMRunFrame()])
await runner.run(task)
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):

View File

@@ -6,6 +6,7 @@
import os
import aiohttp
from dotenv import load_dotenv
from loguru import logger
@@ -24,10 +25,10 @@ from pipecat.processors.aggregators.llm_response import (
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.elevenlabs.tts import ElevenLabsTTSService
from pipecat.services.openai.base_llm import BaseOpenAILLMService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.services.speechmatics.stt import SpeechmaticsSTTService
from pipecat.services.speechmatics.tts import SpeechmaticsTTSService
from pipecat.transcriptions.language import Language
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
@@ -61,100 +62,106 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
"""Run example using Speechmatics STT.
"""Run example using Speechmatics STT and TTS.
This example will use diarization within our STT service and output the words spoken by
each individual speaker and wrap them with XML tags for the LLM to process. Note the
instructions in the system context for the LLM. This greatly improves the conversation
experience by allowing the LLM to understand who is speaking in a multi-party call.
This example demonstrates a complete Speechmatics integration with both Speech-to-Text
and Text-to-Speech services:
By default, this example will use our ENHANCED operating point, which is optimized for
high accuracy. You can change this by setting the `operating_point` parameter to a different
value.
STT Features:
- Diarization to identify and distinguish between different speakers
- Words spoken by each speaker are wrapped with XML tags for LLM processing
- System context instructions help the LLM understand multi-party conversations
- ENHANCED operating point by default for optimal accuracy
For more information on operating points, see the Speechmatics documentation:
https://docs.speechmatics.com/rt-api-ref
TTS Features:
- Low latency streaming audio synthesis
- Multiple voice options available including `sarah`, `theo`, and `megan`
For more information:
- STT: https://docs.speechmatics.com/rt-api-ref
- TTS: https://docs.speechmatics.com/text-to-speech/quickstart
"""
logger.info(f"Starting bot")
stt = SpeechmaticsSTTService(
api_key=os.getenv("SPEECHMATICS_API_KEY"),
params=SpeechmaticsSTTService.InputParams(
language=Language.EN,
enable_diarization=True,
end_of_utterance_silence_trigger=0.5,
speaker_active_format="<{speaker_id}>{text}</{speaker_id}>",
),
)
tts = ElevenLabsTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
model="eleven_turbo_v2_5",
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
params=BaseOpenAILLMService.InputParams(temperature=0.75),
)
messages = [
{
"role": "system",
"content": (
"You are a helpful British assistant called Alfred. "
"Your goal is to demonstrate your capabilities in a succinct way. "
"Your output will be converted to audio so don't include special characters in your answers. "
"Always include punctuation in your responses. "
"Give very short replies - do not give longer replies unless strictly necessary. "
"Respond to what the user said in a concise, funny, creative and helpful way. "
"Use `<Sn/>` tags to identify different speakers - do not use tags in your replies."
async with aiohttp.ClientSession() as session:
stt = SpeechmaticsSTTService(
api_key=os.getenv("SPEECHMATICS_API_KEY"),
params=SpeechmaticsSTTService.InputParams(
language=Language.EN,
enable_diarization=True,
end_of_utterance_silence_trigger=0.5,
speaker_active_format="<{speaker_id}>{text}</{speaker_id}>",
),
},
]
)
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(aggregation_timeout=0.005),
)
tts = SpeechmaticsTTSService(
api_key=os.getenv("SPEECHMATICS_API_KEY"),
voice_id="sarah",
aiohttp_session=session,
)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
params=BaseOpenAILLMService.InputParams(temperature=0.75),
)
messages = [
{
"role": "system",
"content": (
"You are a helpful British assistant called Sarah. "
"Your goal is to demonstrate your capabilities in a succinct way. "
"Your output will be converted to audio so don't include special characters in your answers. "
"Always include punctuation in your responses. "
"Give very short replies - do not give longer replies unless strictly necessary. "
"Respond to what the user said in a concise, funny, creative and helpful way. "
"Use `<Sn/>` tags to identify different speakers - do not use tags in your replies."
),
},
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(aggregation_timeout=0.005),
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Say a short hello to the user."})
await task.queue_frames([LLMRunFrame()])
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Say a short hello to the user."})
await task.queue_frames([LLMRunFrame()])
await runner.run(task)
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):

View File

@@ -101,6 +101,10 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Client disconnected")
await task.cancel()
@stt.event_handler("on_update")
async def on_deepgram_flux_update(stt, transcript):
logger.debug(f"On deeggram flux update: {transcript}")
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)

View File

@@ -4,36 +4,25 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from typing import Optional
from dotenv import load_dotenv
from loguru import logger
from PIL import Image
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import (
Frame,
LLMContextFrame,
TextFrame,
TTSSpeakFrame,
UserImageRawFrame,
UserImageRequestFrame,
)
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.user_response import UserResponseAggregator
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import (
create_transport,
get_transport_client_id,
maybe_capture_participant_camera,
)
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
@@ -43,49 +32,6 @@ from pipecat.transports.daily.transport import DailyParams
load_dotenv(override=True)
class UserImageRequester(FrameProcessor):
"""Converts incoming text into requests for user images."""
def __init__(self, participant_id: Optional[str] = None):
super().__init__()
self._participant_id = participant_id
def set_participant_id(self, participant_id: str):
self._participant_id = participant_id
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if self._participant_id and isinstance(frame, TextFrame):
await self.push_frame(
UserImageRequestFrame(self._participant_id, context=frame.text),
FrameDirection.UPSTREAM,
)
else:
await self.push_frame(frame, direction)
class UserImageProcessor(FrameProcessor):
"""Converts incoming user images into context frames."""
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, UserImageRawFrame):
if frame.request and frame.request.context:
context = LLMContext()
context.add_image_frame_message(
image=frame.image,
text=frame.request.context,
size=frame.size,
format=frame.format,
)
frame = LLMContextFrame(context)
await self.push_frame(frame)
else:
await self.push_frame(frame, direction)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
@@ -93,14 +39,12 @@ transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
video_in_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
video_in_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
@@ -110,33 +54,34 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
user_response = UserResponseAggregator()
# Initialize the image requester without setting the participant ID yet
image_requester = UserImageRequester()
image_processor = UserImageProcessor()
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
# OpenAI GPT-4o for vision analysis
openai = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way. You are also able to describe images.",
},
]
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(context)
pipeline = Pipeline(
[
transport.input(),
stt,
user_response,
image_requester,
image_processor,
openai,
tts,
transport.output(),
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
@@ -151,16 +96,28 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected: {client}")
logger.info(f"Client connected")
await maybe_capture_participant_camera(transport, client)
if not runner_args.body:
script_dir = os.path.dirname(__file__)
runner_args.body = {
"image_path": os.path.join(script_dir, "assets", "cat.jpg"),
"question": "Describe this image",
}
# Set the participant ID in the image requester
client_id = get_transport_client_id(transport, client)
image_requester.set_participant_id(client_id)
image_path = runner_args.body["image_path"]
question = runner_args.body["question"]
# Welcome message
await task.queue_frame(TTSSpeakFrame("Hi there! Feel free to ask me about what I see."))
# Kick off the conversation.
image = Image.open(image_path)
message = LLMContext.create_image_message(
image=image.tobytes(),
format="RGB",
size=image.size,
text=question,
)
messages.append(message)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):

View File

@@ -1,180 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from typing import Optional
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import (
Frame,
LLMContextFrame,
TextFrame,
TTSSpeakFrame,
UserImageRawFrame,
UserImageRequestFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.user_response import UserResponseAggregator
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import (
create_transport,
get_transport_client_id,
maybe_capture_participant_camera,
)
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.moondream.vision import MoondreamService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
load_dotenv(override=True)
class UserImageRequester(FrameProcessor):
"""Converts incoming text into requests for user images."""
def __init__(self, participant_id: Optional[str] = None):
super().__init__()
self._participant_id = participant_id
def set_participant_id(self, participant_id: str):
self._participant_id = participant_id
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if self._participant_id and isinstance(frame, TextFrame):
await self.push_frame(
UserImageRequestFrame(self._participant_id, context=frame.text),
FrameDirection.UPSTREAM,
)
else:
await self.push_frame(frame, direction)
class UserImageProcessor(FrameProcessor):
"""Converts incoming user images into context frames."""
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, UserImageRawFrame):
if frame.request and frame.request.context:
context = LLMContext()
context.add_image_frame_message(
image=frame.image,
text=frame.request.context,
size=frame.size,
format=frame.format,
)
frame = LLMContextFrame(context)
await self.push_frame(frame)
else:
await self.push_frame(frame, direction)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
video_in_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
video_in_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
user_response = UserResponseAggregator()
# Initialize the image requester without setting the participant ID yet
image_requester = UserImageRequester()
image_processor = UserImageProcessor()
# If you run into weird description, try with use_cpu=True
moondream = MoondreamService()
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
pipeline = Pipeline(
[
transport.input(),
stt,
user_response,
image_requester,
image_processor,
moondream,
tts,
transport.output(),
]
)
task = PipelineTask(
pipeline,
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected: {client}")
await maybe_capture_participant_camera(transport, client)
# Set the participant ID in the image requester
client_id = get_transport_client_id(transport, client)
image_requester.set_participant_id(client_id)
# Welcome message
await task.queue_frame(TTSSpeakFrame("Hi there! Feel free to ask me about what I see."))
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -4,36 +4,25 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from typing import Optional
from dotenv import load_dotenv
from loguru import logger
from PIL import Image
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import (
Frame,
LLMContextFrame,
TextFrame,
TTSSpeakFrame,
UserImageRawFrame,
UserImageRequestFrame,
)
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.user_response import UserResponseAggregator
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import (
create_transport,
get_transport_client_id,
maybe_capture_participant_camera,
)
from pipecat.runner.utils import create_transport
from pipecat.services.anthropic.llm import AnthropicLLMService
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
@@ -43,49 +32,6 @@ from pipecat.transports.daily.transport import DailyParams
load_dotenv(override=True)
class UserImageRequester(FrameProcessor):
"""Converts incoming text into requests for user images."""
def __init__(self, participant_id: Optional[str] = None):
super().__init__()
self._participant_id = participant_id
def set_participant_id(self, participant_id: str):
self._participant_id = participant_id
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if self._participant_id and isinstance(frame, TextFrame):
await self.push_frame(
UserImageRequestFrame(self._participant_id, context=frame.text),
FrameDirection.UPSTREAM,
)
else:
await self.push_frame(frame, direction)
class UserImageProcessor(FrameProcessor):
"""Converts incoming user images into context frames."""
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, UserImageRawFrame):
if frame.request and frame.request.context:
context = LLMContext()
context.add_image_frame_message(
image=frame.image,
text=frame.request.context,
size=frame.size,
format=frame.format,
)
frame = LLMContextFrame(context)
await self.push_frame(frame)
else:
await self.push_frame(frame, direction)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
@@ -93,14 +39,12 @@ transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
video_in_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
video_in_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
@@ -110,33 +54,34 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
user_response = UserResponseAggregator()
# Initialize the image requester without setting the participant ID yet
image_requester = UserImageRequester()
image_processor = UserImageProcessor()
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
# Anthropic for vision analysis
anthropic = AnthropicLLMService(api_key=os.getenv("ANTHROPIC_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = AnthropicLLMService(api_key=os.getenv("ANTHROPIC_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way. You are also able to describe images.",
},
]
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(context)
pipeline = Pipeline(
[
transport.input(),
stt,
user_response,
image_requester,
image_processor,
anthropic,
tts,
transport.output(),
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
@@ -151,16 +96,28 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected: {client}")
logger.info(f"Client connected")
await maybe_capture_participant_camera(transport, client)
if not runner_args.body:
script_dir = os.path.dirname(__file__)
runner_args.body = {
"image_path": os.path.join(script_dir, "assets", "cat.jpg"),
"question": "Describe this image",
}
# Set the participant ID in the image requester
client_id = get_transport_client_id(transport, client)
image_requester.set_participant_id(client_id)
image_path = runner_args.body["image_path"]
question = runner_args.body["question"]
# Welcome message
await task.queue_frame(TTSSpeakFrame("Hi there! Feel free to ask me about what I see."))
# Kick off the conversation.
image = Image.open(image_path)
message = LLMContext.create_image_message(
image=image.tobytes(),
format="RGB",
size=image.size,
text=question,
)
messages.append(message)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):

View File

@@ -0,0 +1,148 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from PIL import Image
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.aws.llm import AWSBedrockLLMService
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
load_dotenv(override=True)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = AWSBedrockLLMService(
aws_region="us-west-2",
model="us.anthropic.claude-3-7-sonnet-20250219-v1:0",
# Note: usually, prefer providing latency="optimized" param.
# Here we can't because AWS Bedrock doesn't support it for Claude 3.7,
# which we need for image input.
params=AWSBedrockLLMService.InputParams(temperature=0.8),
)
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way. You are also able to describe images.",
},
]
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
if not runner_args.body:
script_dir = os.path.dirname(__file__)
runner_args.body = {
"image_path": os.path.join(script_dir, "assets", "cat.jpg"),
"question": "Describe this image",
}
image_path = runner_args.body["image_path"]
question = runner_args.body["question"]
# Kick off the conversation.
image = Image.open(image_path)
message = LLMContext.create_image_message(
image=image.tobytes(),
format="RGB",
size=image.size,
text=question,
)
messages.append(message)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -0,0 +1,141 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from PIL import Image
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.google.llm import GoogleLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
load_dotenv(override=True)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = GoogleLLMService(api_key=os.getenv("GOOGLE_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way. You are also able to describe images.",
},
]
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
if not runner_args.body:
script_dir = os.path.dirname(__file__)
runner_args.body = {
"image_path": os.path.join(script_dir, "assets", "cat.jpg"),
"question": "Describe this image",
}
image_path = runner_args.body["image_path"]
question = runner_args.body["question"]
# Kick off the conversation.
image = Image.open(image_path)
message = LLMContext.create_image_message(
image=image.tobytes(),
format="RGB",
size=image.size,
text=question,
)
messages.append(message)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -0,0 +1,122 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from PIL import Image
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import UserImageRawFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.moondream.vision import MoondreamService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
load_dotenv(override=True)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"webrtc": lambda: TransportParams(
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
vision = MoondreamService()
pipeline = Pipeline(
[
vision, # Vision
tts, # TTS
transport.output(), # Transport bot output
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
if not runner_args.body:
script_dir = os.path.dirname(__file__)
runner_args.body = {
"image_path": os.path.join(script_dir, "assets", "cat.jpg"),
"question": "Describe this image",
}
image_path = runner_args.body["image_path"]
question = runner_args.body["question"]
# Describe the image.
image = Image.open(image_path)
await task.queue_frames(
[
UserImageRawFrame(
image=image.tobytes(),
format="RGB",
size=image.size,
text=question,
)
]
)
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -4,8 +4,6 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
from dotenv import load_dotenv
@@ -17,12 +15,13 @@ from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.frames.frames import LLMRunFrame, UserImageRequestFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.frame_processor import FrameDirection
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import (
create_transport,
@@ -39,34 +38,30 @@ from pipecat.transports.daily.transport import DailyParams
load_dotenv(override=True)
# Global variable to store the client ID
client_id = ""
async def fetch_user_image(params: FunctionCallParams):
"""Fetch the user image and push it to the LLM.
async def get_weather(params: FunctionCallParams):
location = params.arguments["location"]
await params.result_callback(f"The weather in {location} is currently 72 degrees and sunny.")
async def get_image(params: FunctionCallParams):
When called, this function pushes a UserImageRequestFrame upstream to the
transport. As a result, the transport will request the user image and push a
UserImageRawFrame downstream which will be added to the context by the LLM
assistant aggregator.
"""
user_id = params.arguments["user_id"]
question = params.arguments["question"]
logger.debug(f"Requesting image with user_id={client_id}, question={question}")
logger.debug(f"Requesting image with user_id={user_id}, question={question}")
# Request the image frame
await params.llm.request_image_frame(
user_id=client_id,
function_name=params.function_name,
tool_call_id=params.tool_call_id,
text_content=question,
# Request a user image frame and indicate that it should be added to the
# context.
await params.llm.push_frame(
UserImageRequestFrame(user_id=user_id, text=question, add_to_context=True),
FrameDirection.UPSTREAM,
)
# Wait a short time for the frame to be processed
await asyncio.sleep(0.5)
await params.result_callback(None)
# Return a result to complete the function call
await params.result_callback(
f"I've captured an image from your camera and I'm analyzing what you asked about: {question}"
)
# Instead of None, it's possible to also provide a tool call answer to
# tell the LLM that we are grabbing the image to analyze.
# await params.result_callback({"result": "Image is being captured."})
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
@@ -100,70 +95,32 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = AnthropicLLMService(
api_key=os.getenv("ANTHROPIC_API_KEY"),
model="claude-3-7-sonnet-latest",
params=AnthropicLLMService.InputParams(enable_prompt_caching=True),
)
llm.register_function("get_weather", get_weather)
llm.register_function("get_image", get_image)
# Anthropic for vision analysis
llm = AnthropicLLMService(api_key=os.getenv("ANTHROPIC_API_KEY"))
llm.register_function("fetch_user_image", fetch_user_image)
weather_function = FunctionSchema(
name="get_weather",
description="Get the current weather",
fetch_image_function = FunctionSchema(
name="fetch_user_image",
description="Called when the user requests a description of their camera feed",
properties={
"location": {
"user_id": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
"description": "The ID of the user to grab the image from",
},
},
required=["location"],
)
get_image_function = FunctionSchema(
name="get_image",
description="Get an image from the video stream.",
properties={
"question": {
"type": "string",
"description": "The question that the user is asking about the image.",
}
"description": "The question that the user is asking about the image",
},
},
required=["question"],
required=["user_id", "question"],
)
tools = ToolsSchema(standard_tools=[weather_function, get_image_function])
system_prompt = """\
You are a helpful assistant who converses with a user and answers questions. Respond concisely to general questions.
Your response will be turned into speech so use only simple words and punctuation.
You have access to two tools: get_weather and get_image.
You can respond to questions about the weather using the get_weather tool.
You can answer questions about the user's video stream using the get_image tool. Some examples of phrases that \
indicate you should use the get_image tool are:
- What do you see?
- What's in the video?
- Can you describe the video?
- Tell me about what you see.
- Tell me something interesting about what you see.
- What's happening in the video?
If you need to use a tool, simply use the tool. Do not tell the user the tool you are using. Be brief and concise.
"""
tools = ToolsSchema(standard_tools=[fetch_image_function])
messages = [
{
"role": "system",
"content": [
{
"type": "text",
"text": system_prompt,
}
],
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way. You are able to describe images from the user camera.",
},
{"role": "user", "content": "Start the conversation by introducing yourself."},
]
context = LLMContext(messages, tools)
@@ -173,11 +130,11 @@ If you need to use a tool, simply use the tool. Do not tell the user the tool yo
[
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User speech to text
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses and tool context
context_aggregator.assistant(), # Assistant spoken responses
]
)
@@ -196,10 +153,16 @@ If you need to use a tool, simply use the tool. Do not tell the user the tool yo
await maybe_capture_participant_camera(transport, client)
global client_id
# Set the participant ID in the image requester
client_id = get_transport_client_id(transport, client)
# Kick off the conversation.
messages.append(
{
"role": "system",
"content": f"Please introduce yourself to the user. Use '{client_id}' as the user ID during function calls.",
}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")

View File

@@ -5,29 +5,23 @@
#
import os
from typing import Optional
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import (
Frame,
LLMContextFrame,
TextFrame,
TTSSpeakFrame,
UserImageRawFrame,
UserImageRequestFrame,
)
from pipecat.frames.frames import LLMRunFrame, UserImageRequestFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.user_response import UserResponseAggregator
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.frame_processor import FrameDirection
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import (
create_transport,
@@ -37,54 +31,37 @@ from pipecat.runner.utils import (
from pipecat.services.aws.llm import AWSBedrockLLMService
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
load_dotenv(override=True)
class UserImageRequester(FrameProcessor):
"""Converts incoming text into requests for user images."""
async def fetch_user_image(params: FunctionCallParams):
"""Fetch the user image and push it to the LLM.
def __init__(self, participant_id: Optional[str] = None):
super().__init__()
self._participant_id = participant_id
When called, this function pushes a UserImageRequestFrame upstream to the
transport. As a result, the transport will request the user image and push a
UserImageRawFrame downstream which will be added to the context by the LLM
assistant aggregator.
"""
user_id = params.arguments["user_id"]
question = params.arguments["question"]
logger.debug(f"Requesting image with user_id={user_id}, question={question}")
def set_participant_id(self, participant_id: str):
self._participant_id = participant_id
# Request a user image frame and indicate that it should be added to the
# context.
await params.llm.push_frame(
UserImageRequestFrame(user_id=user_id, text=question, add_to_context=True),
FrameDirection.UPSTREAM,
)
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
await params.result_callback(None)
if self._participant_id and isinstance(frame, TextFrame):
await self.push_frame(
UserImageRequestFrame(self._participant_id, context=frame.text),
FrameDirection.UPSTREAM,
)
else:
await self.push_frame(frame, direction)
class UserImageProcessor(FrameProcessor):
"""Converts incoming user images into context frames."""
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, UserImageRawFrame):
if frame.request and frame.request.context:
# Note: AWS Bedrock does not yet support the universal LLMContext
context = LLMContext()
context.add_image_frame_message(
image=frame.image,
text=frame.request.context,
size=frame.size,
format=frame.format,
)
frame = LLMContextFrame(context)
await self.push_frame(frame)
else:
await self.push_frame(frame, direction)
# Instead of None, it's possible to also provide a tool call answer to
# tell the LLM that we are grabbing the image to analyze.
# await params.result_callback({"result": "Image is being captured."})
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
@@ -111,17 +88,15 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
user_response = UserResponseAggregator()
# Initialize the image requester without setting the participant ID yet
image_requester = UserImageRequester()
image_processor = UserImageProcessor()
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
# AWS for vision analysis
aws = AWSBedrockLLMService(
llm = AWSBedrockLLMService(
aws_region="us-west-2",
model="us.anthropic.claude-3-7-sonnet-20250219-v1:0",
# Note: usually, prefer providing latency="optimized" param.
@@ -129,22 +104,44 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
# which we need for image input.
params=AWSBedrockLLMService.InputParams(temperature=0.8),
)
llm.register_function("fetch_user_image", fetch_user_image)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
fetch_image_function = FunctionSchema(
name="fetch_user_image",
description="Called when the user requests a description of their camera feed",
properties={
"user_id": {
"type": "string",
"description": "The ID of the user to grab the image from",
},
"question": {
"type": "string",
"description": "The question that the user is asking about the image",
},
},
required=["user_id", "question"],
)
tools = ToolsSchema(standard_tools=[fetch_image_function])
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way. You are able to describe images from the user camera.",
},
]
context = LLMContext(messages, tools)
context_aggregator = LLMContextAggregatorPair(context)
pipeline = Pipeline(
[
transport.input(),
stt,
user_response,
image_requester,
image_processor,
aws,
tts,
transport.output(),
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
@@ -165,10 +162,15 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
# Set the participant ID in the image requester
client_id = get_transport_client_id(transport, client)
image_requester.set_participant_id(client_id)
# Welcome message
await task.queue_frame(TTSSpeakFrame("Hi there! Feel free to ask me about what I see."))
# Kick off the conversation.
messages.append(
{
"role": "system",
"content": f"Please introduce yourself to the user. Use '{client_id}' as the user ID during function calls.",
}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):

View File

@@ -5,29 +5,23 @@
#
import os
from typing import Optional
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import (
Frame,
LLMContextFrame,
TextFrame,
TTSSpeakFrame,
UserImageRawFrame,
UserImageRequestFrame,
)
from pipecat.frames.frames import LLMRunFrame, UserImageRequestFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.user_response import UserResponseAggregator
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.frame_processor import FrameDirection
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import (
create_transport,
@@ -37,53 +31,37 @@ from pipecat.runner.utils import (
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.google.llm import GoogleLLMService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
load_dotenv(override=True)
class UserImageRequester(FrameProcessor):
"""Converts incoming text into requests for user images."""
async def fetch_user_image(params: FunctionCallParams):
"""Fetch the user image and push it to the LLM.
def __init__(self, participant_id: Optional[str] = None):
super().__init__()
self._participant_id = participant_id
When called, this function pushes a UserImageRequestFrame upstream to the
transport. As a result, the transport will request the user image and push a
UserImageRawFrame downstream which will be added to the context by the LLM
assistant aggregator.
"""
user_id = params.arguments["user_id"]
question = params.arguments["question"]
logger.debug(f"Requesting image with user_id={user_id}, question={question}")
def set_participant_id(self, participant_id: str):
self._participant_id = participant_id
# Request a user image frame and indicate that it should be added to the
# context.
await params.llm.push_frame(
UserImageRequestFrame(user_id=user_id, text=question, add_to_context=True),
FrameDirection.UPSTREAM,
)
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
await params.result_callback(None)
if self._participant_id and isinstance(frame, TextFrame):
await self.push_frame(
UserImageRequestFrame(self._participant_id, context=frame.text),
FrameDirection.UPSTREAM,
)
else:
await self.push_frame(frame, direction)
class UserImageProcessor(FrameProcessor):
"""Converts incoming user images into context frames."""
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, UserImageRawFrame):
if frame.request and frame.request.context:
context = LLMContext()
context.add_image_frame_message(
image=frame.image,
text=frame.request.context,
size=frame.size,
format=frame.format,
)
frame = LLMContextFrame(context)
await self.push_frame(frame)
else:
await self.push_frame(frame, direction)
# Instead of None, it's possible to also provide a tool call answer to
# tell the LLM that we are grabbing the image to analyze.
# await params.result_callback({"result": "Image is being captured."})
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
@@ -110,33 +88,53 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
user_response = UserResponseAggregator()
# Initialize the image requester without setting the participant ID yet
image_requester = UserImageRequester()
image_processor = UserImageProcessor()
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
# Google Gemini model for vision analysis
google = GoogleLLMService(model="gemini-2.0-flash-001", api_key=os.getenv("GOOGLE_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
# Google Gemini model for vision analysis
llm = GoogleLLMService(api_key=os.getenv("GOOGLE_API_KEY"))
llm.register_function("fetch_user_image", fetch_user_image)
fetch_image_function = FunctionSchema(
name="fetch_user_image",
description="Called when the user requests a description of their camera feed",
properties={
"user_id": {
"type": "string",
"description": "The ID of the user to grab the image from",
},
"question": {
"type": "string",
"description": "The question that the user is asking about the image",
},
},
required=["user_id", "question"],
)
tools = ToolsSchema(standard_tools=[fetch_image_function])
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way. You are able to describe images from the user camera.",
},
]
context = LLMContext(messages, tools)
context_aggregator = LLMContextAggregatorPair(context)
pipeline = Pipeline(
[
transport.input(),
stt,
user_response,
image_requester,
image_processor,
google,
tts,
transport.output(),
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
@@ -157,10 +155,15 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
# Set the participant ID in the image requester
client_id = get_transport_client_id(transport, client)
image_requester.set_participant_id(client_id)
# Welcome message
await task.queue_frame(TTSSpeakFrame("Hi there! Feel free to ask me about what I see."))
# Kick off the conversation.
messages.append(
{
"role": "system",
"content": f"Please introduce yourself to the user. Use '{client_id}' as the user ID during function calls.",
}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):

View File

@@ -0,0 +1,190 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame, UserImageRequestFrame
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.frame_processor import FrameDirection
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import (
create_transport,
get_transport_client_id,
maybe_capture_participant_camera,
)
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.moondream.vision import MoondreamService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
load_dotenv(override=True)
async def fetch_user_image(params: FunctionCallParams):
"""Fetch the user image.
When called, this function pushes a UserImageRequestFrame upstream to the
transport. As a result, the transport will request the user image and push a
UserImageRawFrame downstream.
"""
user_id = params.arguments["user_id"]
question = params.arguments["question"]
logger.debug(f"Requesting image with user_id={user_id}, question={question}")
# Request a user image frame. In this case, we don't want the requested
# image to be added to the context because we will process it with
# Moondream.
await params.llm.push_frame(
UserImageRequestFrame(user_id=user_id, text=question, add_to_context=False),
FrameDirection.UPSTREAM,
)
await params.result_callback(None)
# Instead of None, it's possible to also provide a tool call answer to
# tell the LLM that we are grabbing the image to analyze.
# await params.result_callback({"result": "Image is being captured."})
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
video_in_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
video_in_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm.register_function("fetch_user_image", fetch_user_image)
fetch_image_function = FunctionSchema(
name="fetch_user_image",
description="Called when the user requests a description of their camera feed",
properties={
"user_id": {
"type": "string",
"description": "The ID of the user to grab the image from",
},
"question": {
"type": "string",
"description": "The question that the user is asking about the image",
},
},
required=["user_id", "question"],
)
tools = ToolsSchema(standard_tools=[fetch_image_function])
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way. You are able to describe images from the user camera.",
},
]
context = LLMContext(messages, tools)
context_aggregator = LLMContextAggregatorPair(context)
# If you run into weird description, try with use_cpu=True
moondream = MoondreamService()
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User responses
ParallelPipeline(
[llm], # LLM
[moondream],
),
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected: {client}")
await maybe_capture_participant_camera(transport, client)
# Set the participant ID in the image requester
client_id = get_transport_client_id(transport, client)
# Kick off the conversation.
messages.append(
{
"role": "system",
"content": f"Please introduce yourself to the user. Use '{client_id}' as the user ID during function calls.",
}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -5,7 +5,6 @@
#
import asyncio
import os
from dotenv import load_dotenv
@@ -17,12 +16,13 @@ from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.frames.frames import LLMRunFrame, UserImageRequestFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.frame_processor import FrameDirection
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import (
create_transport,
@@ -39,34 +39,30 @@ from pipecat.transports.daily.transport import DailyParams
load_dotenv(override=True)
# Global variable to store the client ID
client_id = ""
async def fetch_user_image(params: FunctionCallParams):
"""Fetch the user image and push it to the LLM.
async def get_weather(params: FunctionCallParams):
location = params.arguments["location"]
await params.result_callback(f"The weather in {location} is currently 72 degrees and sunny.")
async def get_image(params: FunctionCallParams):
When called, this function pushes a UserImageRequestFrame upstream to the
transport. As a result, the transport will request the user image and push a
UserImageRawFrame downstream which will be added to the context by the LLM
assistant aggregator.
"""
user_id = params.arguments["user_id"]
question = params.arguments["question"]
logger.debug(f"Requesting image with user_id={client_id}, question={question}")
logger.debug(f"Requesting image with user_id={user_id}, question={question}")
# Request the image frame
await params.llm.request_image_frame(
user_id=client_id,
function_name=params.function_name,
tool_call_id=params.tool_call_id,
text_content=question,
# Request a user image frame and indicate that it should be added to the
# context.
await params.llm.push_frame(
UserImageRequestFrame(user_id=user_id, text=question, add_to_context=True),
FrameDirection.UPSTREAM,
)
# Wait a short time for the frame to be processed
await asyncio.sleep(0.5)
await params.result_callback(None)
# Return a result to complete the function call
await params.result_callback(
f"I've captured an image from your camera and I'm analyzing what you asked about: {question}"
)
# Instead of None, it's possible to also provide a tool call answer to
# tell the LLM that we are grabbing the image to analyze.
# await params.result_callback({"result": "Image is being captured."})
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
@@ -101,58 +97,30 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm.register_function("get_weather", get_weather)
llm.register_function("get_image", get_image)
llm.register_function("fetch_user_image", fetch_user_image)
weather_function = FunctionSchema(
name="get_weather",
description="Get the current weather",
fetch_image_function = FunctionSchema(
name="fetch_user_image",
description="Called when the user requests a description of their camera feed",
properties={
"location": {
"user_id": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
"description": "The ID of the user to grab the image from",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location"],
)
get_image_function = FunctionSchema(
name="get_image",
description="Get an image from the video stream.",
properties={
"question": {
"type": "string",
"description": "The question that the user is asking about the image.",
}
"description": "The question that the user is asking about the image",
},
},
required=["question"],
required=["user_id", "question"],
)
tools = ToolsSchema(standard_tools=[weather_function, get_image_function])
tools = ToolsSchema(standard_tools=[fetch_image_function])
system_prompt = """\
You are a helpful assistant who converses with a user and answers questions. Respond concisely to general questions.
Your response will be turned into speech so use only simple words and punctuation.
You have access to two tools: get_weather and get_image.
You can respond to questions about the weather using the get_weather tool.
You can answer questions about the user's video stream using the get_image tool. Some examples of phrases that \
indicate you should use the get_image tool are:
- What do you see?
- What's in the video?
- Can you describe the video?
- Tell me about what you see.
- Tell me something interesting about what you see.
- What's happening in the video?
"""
messages = [
{"role": "system", "content": system_prompt},
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way. You are able to describe images from the user camera.",
},
]
context = LLMContext(messages, tools)
@@ -160,13 +128,13 @@ indicate you should use the get_image tool are:
pipeline = Pipeline(
[
transport.input(),
stt,
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
@@ -185,10 +153,15 @@ indicate you should use the get_image tool are:
await maybe_capture_participant_camera(transport, client)
global client_id
client_id = get_transport_client_id(transport, client)
# Kick off the conversation.
messages.append(
{
"role": "system",
"content": f"Please introduce yourself to the user. Use '{client_id}' as the user ID during function calls.",
}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")

Binary file not shown.

After

Width:  |  Height:  |  Size: 63 KiB

View File

@@ -10,9 +10,10 @@ import os
import re
import time
import wave
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Tuple
from typing import Any, List, Optional, Tuple
import aiofiles
from deepgram import LiveOptions
@@ -53,6 +54,14 @@ EVAL_TIMEOUT_SECS = 120
EvalPrompt = str | Tuple[str, ImageFile]
@dataclass
class EvalConfig:
prompt: EvalPrompt
eval: str
eval_speaks_first: bool = False
runner_args_body: Optional[Any] = None
class EvalRunner:
def __init__(
self,
@@ -93,9 +102,7 @@ class EvalRunner:
async def run_eval(
self,
example_file: str,
prompt: EvalPrompt,
eval: str,
user_speaks_first: bool = False,
eval_config: EvalConfig,
):
if not re.match(self._pattern, example_file):
return
@@ -112,10 +119,8 @@ class EvalRunner:
try:
tasks = [
asyncio.create_task(run_example_pipeline(script_path)),
asyncio.create_task(
run_eval_pipeline(self, example_file, prompt, eval, user_speaks_first)
),
asyncio.create_task(run_example_pipeline(script_path, eval_config)),
asyncio.create_task(run_eval_pipeline(self, example_file, eval_config)),
]
_, pending = await asyncio.wait(tasks, timeout=EVAL_TIMEOUT_SECS)
if pending:
@@ -177,7 +182,7 @@ class EvalRunner:
return os.path.join(self._recordings_dir, f"{base_name}.wav")
async def run_example_pipeline(script_path: Path):
async def run_example_pipeline(script_path: Path, eval_config: EvalConfig):
room_url = os.getenv("DAILY_SAMPLE_ROOM_URL")
module = load_module_from_path(script_path)
@@ -196,6 +201,7 @@ async def run_example_pipeline(script_path: Path):
runner_args = RunnerArguments()
runner_args.pipeline_idle_timeout_secs = PIPELINE_IDLE_TIMEOUT_SECS
runner_args.body = eval_config.runner_args_body
await module.run_bot(transport, runner_args)
@@ -203,9 +209,7 @@ async def run_example_pipeline(script_path: Path):
async def run_eval_pipeline(
eval_runner: EvalRunner,
example_file: str,
prompt: EvalPrompt,
eval: str,
user_speaks_first: bool = False,
eval_config: EvalConfig,
):
logger.info(f"Starting eval bot")
@@ -262,17 +266,16 @@ async def run_eval_pipeline(
# Load example prompt depending on image.
example_prompt = ""
example_image: Optional[ImageFile] = None
if isinstance(prompt, str):
example_prompt = prompt
elif isinstance(prompt, tuple):
example_prompt, example_image = prompt
if isinstance(eval_config.prompt, str):
example_prompt = eval_config.prompt
elif isinstance(eval_config.prompt, tuple):
example_prompt, example_image = eval_config.prompt
eval_prompt = f"The answer is correct if it matches: {eval}."
common_system_prompt = (
"The user might say things other than the answer and that's allowed. "
f"You should only call the eval function with your assessment when the user actually answers the question. {eval_prompt}"
f"You should only call the eval function when the user: {eval_config.eval}"
)
if user_speaks_first:
if eval_config.eval_speaks_first:
system_prompt = f"You are an LLM eval, be extremly brief. You will start the conversation by saying: '{example_prompt}'. {common_system_prompt}"
else:
system_prompt = f"You are an LLM eval, be extremly brief. Your goal is to first ask one question: {example_prompt}. {common_system_prompt}"
@@ -330,9 +333,9 @@ async def run_eval_pipeline(
# Default behavior is for the bot to speak first
# If the eval bot speaks first, we append the prompt to the messages
if user_speaks_first:
if eval_config.eval_speaks_first:
messages.append(
{"role": "user", "content": f"Start by saying this exactly: '{prompt}'"}
{"role": "user", "content": f"Start by saying this exactly: '{eval_config.prompt}'"}
)
await task.queue_frames([LLMRunFrame()])

View File

@@ -11,7 +11,7 @@ from datetime import datetime, timezone
from pathlib import Path
from dotenv import load_dotenv
from eval import EvalRunner
from eval import EvalConfig, EvalRunner
from loguru import logger
from PIL import Image
from utils import check_env_variables
@@ -24,189 +24,183 @@ ASSETS_DIR = SCRIPT_DIR / "assets"
FOUNDATIONAL_DIR = SCRIPT_DIR.parent.parent / "examples" / "foundational"
# Speaking order constants
USER_SPEAKS_FIRST = True
BOT_SPEAKS_FIRST = False
# Math
PROMPT_SIMPLE_MATH = "A simple math addition."
EVAL_SIMPLE_MATH = "Correct math addition."
# Weather
PROMPT_WEATHER = "What's the weather in San Francisco?"
EVAL_WEATHER = (
"Something specific about the current weather in San Francisco, including the degrees."
EVAL_SIMPLE_MATH = EvalConfig(
prompt="A simple math addition.",
eval="The user answers the math addition correctly.",
)
# Online search
PROMPT_ONLINE_SEARCH = "What's the date right now in London?"
EVAL_ONLINE_SEARCH = f"Today is {datetime.now(timezone.utc).strftime('%B %d, %Y')}."
EVAL_WEATHER = EvalConfig(
prompt="What's the weather in San Francisco?",
eval="The user says something specific about the current weather in San Francisco, including the degrees.",
)
# Switch language
PROMPT_SWITCH_LANGUAGE = "Say something in Spanish."
EVAL_SWITCH_LANGUAGE = "The user is now talking in Spanish."
EVAL_ONLINE_SEARCH = EvalConfig(
prompt="What's the date right now in London?",
eval=f"The user says today is {datetime.now(timezone.utc).strftime('%B %d, %Y')} in London.",
)
# Vision
PROMPT_VISION = ("What do you see?", Image.open(ASSETS_DIR / "cat.jpg"))
EVAL_VISION = "A cat description."
EVAL_SWITCH_LANGUAGE = EvalConfig(
prompt="Say something in Spanish.",
eval="The user talks in Spanish.",
)
EVAL_VISION_CAMERA = EvalConfig(
prompt=("Briefly describe what you see.", Image.open(ASSETS_DIR / "cat.jpg")),
eval="The user provides a cat description.",
)
def EVAL_VISION_IMAGE(*, eval_speaks_first: bool = False):
return EvalConfig(
prompt="Briefly describe this image.",
eval="The user provides a cat description.",
eval_speaks_first=eval_speaks_first,
runner_args_body={
"image_path": ASSETS_DIR / "cat.jpg",
"question": "Briefly describe this image.",
},
)
EVAL_VOICEMAIL = EvalConfig(
prompt="Please leave a message.",
eval="The user leaves a voicemail message.",
eval_speaks_first=True,
)
EVAL_CONVERSATION = EvalConfig(
prompt="Hello, this is Mark.",
eval="The user replies with a greeting.",
eval_speaks_first=True,
)
# Voicemail
PROMPT_VOICEMAIL = "Please leave a message after the beep."
EVAL_VOICEMAIL = "Assess the conversation and determine if it is a voicemail."
PROMPT_CONVERSATION = "Hello, this is Mark."
EVAL_CONVERSATION = "A start of a conversation, not a voicemail."
TESTS_07 = [
# 07 series
("07-interruptible.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07-interruptible-cartesia-http.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07a-interruptible-speechmatics.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07aa-interruptible-soniox.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07ab-interruptible-inworld-http.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07ac-interruptible-asyncai.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07ac-interruptible-asyncai-http.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07b-interruptible-langchain.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07c-interruptible-deepgram.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07c-interruptible-deepgram-flux.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07d-interruptible-elevenlabs.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
(
"07d-interruptible-elevenlabs-http.py",
PROMPT_SIMPLE_MATH,
EVAL_SIMPLE_MATH,
BOT_SPEAKS_FIRST,
),
("07f-interruptible-azure.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07g-interruptible-openai.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07h-interruptible-openpipe.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07j-interruptible-gladia.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07k-interruptible-lmnt.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07l-interruptible-groq.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07m-interruptible-aws.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07m-interruptible-aws-strands.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("07n-interruptible-gemini.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07n-interruptible-google.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07o-interruptible-assemblyai.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07q-interruptible-rime.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07q-interruptible-rime-http.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07r-interruptible-riva-nim.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
(
"07s-interruptible-google-audio-in.py",
PROMPT_SIMPLE_MATH,
EVAL_SIMPLE_MATH,
BOT_SPEAKS_FIRST,
),
("07t-interruptible-fish.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07v-interruptible-neuphonic.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07v-interruptible-neuphonic-http.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07w-interruptible-fal.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07y-interruptible-minimax.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07z-interruptible-sarvam.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07ae-interruptible-hume.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07-interruptible.py", EVAL_SIMPLE_MATH),
("07-interruptible-cartesia-http.py", EVAL_SIMPLE_MATH),
("07a-interruptible-speechmatics.py", EVAL_SIMPLE_MATH),
("07aa-interruptible-soniox.py", EVAL_SIMPLE_MATH),
("07ab-interruptible-inworld-http.py", EVAL_SIMPLE_MATH),
("07ac-interruptible-asyncai.py", EVAL_SIMPLE_MATH),
("07ac-interruptible-asyncai-http.py", EVAL_SIMPLE_MATH),
("07b-interruptible-langchain.py", EVAL_SIMPLE_MATH),
("07c-interruptible-deepgram.py", EVAL_SIMPLE_MATH),
("07c-interruptible-deepgram-flux.py", EVAL_SIMPLE_MATH),
("07d-interruptible-elevenlabs.py", EVAL_SIMPLE_MATH),
("07d-interruptible-elevenlabs-http.py", EVAL_SIMPLE_MATH),
("07f-interruptible-azure.py", EVAL_SIMPLE_MATH),
("07g-interruptible-openai.py", EVAL_SIMPLE_MATH),
("07h-interruptible-openpipe.py", EVAL_SIMPLE_MATH),
("07j-interruptible-gladia.py", EVAL_SIMPLE_MATH),
("07k-interruptible-lmnt.py", EVAL_SIMPLE_MATH),
("07l-interruptible-groq.py", EVAL_SIMPLE_MATH),
("07m-interruptible-aws.py", EVAL_SIMPLE_MATH),
("07m-interruptible-aws-strands.py", EVAL_WEATHER),
("07n-interruptible-gemini.py", EVAL_SIMPLE_MATH),
("07n-interruptible-google.py", EVAL_SIMPLE_MATH),
("07o-interruptible-assemblyai.py", EVAL_SIMPLE_MATH),
("07q-interruptible-rime.py", EVAL_SIMPLE_MATH),
("07q-interruptible-rime-http.py", EVAL_SIMPLE_MATH),
("07r-interruptible-riva-nim.py", EVAL_SIMPLE_MATH),
("07s-interruptible-google-audio-in.py", EVAL_SIMPLE_MATH),
("07t-interruptible-fish.py", EVAL_SIMPLE_MATH),
("07v-interruptible-neuphonic.py", EVAL_SIMPLE_MATH),
("07v-interruptible-neuphonic-http.py", EVAL_SIMPLE_MATH),
("07w-interruptible-fal.py", EVAL_SIMPLE_MATH),
("07y-interruptible-minimax.py", EVAL_SIMPLE_MATH),
("07z-interruptible-sarvam.py", EVAL_SIMPLE_MATH),
("07ae-interruptible-hume.py", EVAL_SIMPLE_MATH),
# Needs a local XTTS docker instance running.
# ("07i-interruptible-xtts.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
# ("07i-interruptible-xtts.py", EVAL_SIMPLE_MATH),
# Needs a Krisp license.
# ("07p-interruptible-krisp.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
# ("07p-interruptible-krisp.py", EVAL_SIMPLE_MATH),
# Needs GPU resources.
# ("07u-interruptible-ultravox.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
# ("07u-interruptible-ultravox.py", EVAL_SIMPLE_MATH),
]
TESTS_12 = [
("12-describe-video.py", PROMPT_VISION, EVAL_VISION, BOT_SPEAKS_FIRST),
("12a-describe-video-gemini-flash.py", PROMPT_VISION, EVAL_VISION, BOT_SPEAKS_FIRST),
("12b-describe-video-gpt-4o.py", PROMPT_VISION, EVAL_VISION, BOT_SPEAKS_FIRST),
("12c-describe-video-anthropic.py", PROMPT_VISION, EVAL_VISION, BOT_SPEAKS_FIRST),
("12-describe-image-openai.py", EVAL_VISION_IMAGE(eval_speaks_first=True)),
("12a-describe-image-anthropic.py", EVAL_VISION_IMAGE(eval_speaks_first=True)),
("12b-describe-image-aws.py", EVAL_VISION_IMAGE(eval_speaks_first=True)),
("12c-describe-image-gemini-flash.py", EVAL_VISION_IMAGE(eval_speaks_first=True)),
("12d-describe-image-moondream.py", EVAL_VISION_IMAGE()),
]
TESTS_14 = [
("14-function-calling.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("14a-function-calling-anthropic.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("14b-function-calling-anthropic-video.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("14d-function-calling-video.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("14e-function-calling-google.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("14f-function-calling-groq.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("14g-function-calling-grok.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("14h-function-calling-azure.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("14i-function-calling-fireworks.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("14j-function-calling-nim.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("14k-function-calling-cerebras.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("14m-function-calling-openrouter.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("14n-function-calling-perplexity.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("14p-function-calling-gemini-vertex-ai.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("14q-function-calling-qwen.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("14r-function-calling-aws.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("14v-function-calling-openai.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("14w-function-calling-mistral.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("14x-function-calling-openpipe.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("14-function-calling.py", EVAL_WEATHER),
("14a-function-calling-anthropic.py", EVAL_WEATHER),
("14e-function-calling-google.py", EVAL_WEATHER),
("14f-function-calling-groq.py", EVAL_WEATHER),
("14g-function-calling-grok.py", EVAL_WEATHER),
("14h-function-calling-azure.py", EVAL_WEATHER),
("14i-function-calling-fireworks.py", EVAL_WEATHER),
("14j-function-calling-nim.py", EVAL_WEATHER),
("14k-function-calling-cerebras.py", EVAL_WEATHER),
("14m-function-calling-openrouter.py", EVAL_WEATHER),
("14n-function-calling-perplexity.py", EVAL_WEATHER),
("14p-function-calling-gemini-vertex-ai.py", EVAL_WEATHER),
("14q-function-calling-qwen.py", EVAL_WEATHER),
("14r-function-calling-aws.py", EVAL_WEATHER),
("14v-function-calling-openai.py", EVAL_WEATHER),
("14w-function-calling-mistral.py", EVAL_WEATHER),
("14x-function-calling-openpipe.py", EVAL_WEATHER),
# Video
("14d-function-calling-anthropic-video.py", EVAL_VISION_CAMERA),
("14d-function-calling-aws-video.py", EVAL_VISION_CAMERA),
("14d-function-calling-gemini-flash-video.py", EVAL_VISION_CAMERA),
("14d-function-calling-moondream-video.py", EVAL_VISION_CAMERA),
("14d-function-calling-openai-video.py", EVAL_VISION_CAMERA),
# Currently not working.
# ("14c-function-calling-together.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
# ("14l-function-calling-deepseek.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
# ("14o-function-calling-gemini-openai-format.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
# ("14c-function-calling-together.py", EVAL_WEATHER),
# ("14l-function-calling-deepseek.py", EVAL_WEATHER),
# ("14o-function-calling-gemini-openai-format.py", EVAL_WEATHER),
]
TESTS_15 = [
("15a-switch-languages.py", PROMPT_SWITCH_LANGUAGE, EVAL_SWITCH_LANGUAGE, BOT_SPEAKS_FIRST),
("15a-switch-languages.py", EVAL_SWITCH_LANGUAGE),
]
TESTS_19 = [
("19-openai-realtime.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("19-openai-realtime-beta.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("19-openai-realtime.py", EVAL_WEATHER),
("19-openai-realtime-beta.py", EVAL_WEATHER),
# OpenAI Realtime not released on Azure yet
# ("19a-azure-realtime.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("19a-azure-realtime-beta.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("19b-openai-realtime-text.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("19b-openai-realtime-beta-text.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
# ("19a-azure-realtime.py", EVAL_WEATHER),
("19a-azure-realtime-beta.py", EVAL_WEATHER),
("19b-openai-realtime-text.py", EVAL_WEATHER),
("19b-openai-realtime-beta-text.py", EVAL_WEATHER),
]
TESTS_21 = [
("21a-tavus-video-service.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("21a-tavus-video-service.py", EVAL_SIMPLE_MATH),
]
TESTS_26 = [
("26-gemini-multimodal-live.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
(
"26a-gemini-live-transcription.py",
PROMPT_SIMPLE_MATH,
EVAL_SIMPLE_MATH,
BOT_SPEAKS_FIRST,
),
(
"26b-gemini-live-function-calling.py",
PROMPT_WEATHER,
EVAL_WEATHER,
BOT_SPEAKS_FIRST,
),
("26c-gemini-live-video.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
(
"26e-gemini-multimodal-google-search.py",
PROMPT_ONLINE_SEARCH,
EVAL_ONLINE_SEARCH,
BOT_SPEAKS_FIRST,
),
("26-gemini-live.py", EVAL_SIMPLE_MATH),
("26a-gemini-live-transcription.py", EVAL_SIMPLE_MATH),
("26b-gemini-live-function-calling.py", EVAL_WEATHER),
("26c-gemini-live-video.py", EVAL_SIMPLE_MATH),
("26e-gemini-live-google-search.py", EVAL_ONLINE_SEARCH),
("26h-gemini-live-vertex-function-calling.py", EVAL_WEATHER),
# Currently not working.
# ("26d-gemini-live-text.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
(
"26h-gemini-live-vertex-function-calling.py",
PROMPT_WEATHER,
EVAL_WEATHER,
BOT_SPEAKS_FIRST,
),
# ("26d-gemini-live-text.py", EVAL_SIMPLE_MATH),
]
TESTS_27 = [
("27-simli-layer.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("27-simli-layer.py", EVAL_SIMPLE_MATH),
]
TESTS_40 = [
("40-aws-nova-sonic.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("40-aws-nova-sonic.py", EVAL_SIMPLE_MATH),
]
TESTS_43 = [
("43a-heygen-video-service.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("43a-heygen-video-service.py", EVAL_SIMPLE_MATH),
]
TESTS_44 = [
("44-voicemail-detection.py", PROMPT_VOICEMAIL, EVAL_VOICEMAIL, USER_SPEAKS_FIRST),
("44-voicemail-detection.py", PROMPT_CONVERSATION, EVAL_CONVERSATION, USER_SPEAKS_FIRST),
("44-voicemail-detection.py", EVAL_VOICEMAIL),
("44-voicemail-detection.py", EVAL_CONVERSATION),
]
TESTS = [
@@ -244,9 +238,9 @@ async def main(args: argparse.Namespace):
# Parse test config: (test, prompt, eval, user_speaks_first)
for test_config in TESTS:
test, prompt, eval, user_speaks_first = test_config
test, eval_config = test_config
await runner.run_eval(test, prompt, eval, user_speaks_first)
await runner.run_eval(test, eval_config)
runner.print_results()

View File

@@ -245,13 +245,25 @@ class AnthropicLLMAdapter(BaseLLMAdapter[AnthropicLLMInvocationParams]):
item["text"] = "(empty)"
# handle image_url -> image conversion
if item["type"] == "image_url":
item["type"] = "image"
item["source"] = {
"type": "base64",
"media_type": "image/jpeg",
"data": item["image_url"]["url"].split(",")[1],
}
del item["image_url"]
if item["image_url"]["url"].startswith("data:"):
item["type"] = "image"
item["source"] = {
"type": "base64",
"media_type": "image/jpeg",
"data": item["image_url"]["url"].split(",")[1],
}
del item["image_url"]
elif item["image_url"]["url"].startswith("http"):
item["type"] = "image"
item["source"] = {
"type": "url",
"url": item["image_url"]["url"],
}
del item["image_url"]
else:
url = item["image_url"]["url"]
logger.warning(f"Unsupported 'image_url': {url}")
# In the case where there's a single image in the list (like what
# would result from a UserImageRawFrame), ensure that the image
# comes before text, as recommended by Anthropic docs

View File

@@ -256,15 +256,22 @@ class AWSBedrockLLMAdapter(BaseLLMAdapter[AWSBedrockLLMInvocationParams]):
new_content.append({"text": text_content})
# handle image_url -> image conversion
if item["type"] == "image_url":
new_item = {
"image": {
"format": "jpeg",
"source": {
"bytes": base64.b64decode(item["image_url"]["url"].split(",")[1])
},
if item["image_url"]["url"].startswith("data:"):
new_item = {
"image": {
"format": "jpeg",
"source": {
"bytes": base64.b64decode(
item["image_url"]["url"].split(",")[1]
)
},
}
}
}
new_content.append(new_item)
new_content.append(new_item)
else:
url = item["image_url"]["url"]
logger.warning(f"Unsupported 'image_url': {url}")
# In the case where there's a single image in the list (like what
# would result from a UserImageRawFrame), ensure that the image
# comes before text

View File

@@ -343,7 +343,7 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]):
for c in content:
if c["type"] == "text":
parts.append(Part(text=c["text"]))
elif c["type"] == "image_url":
elif c["type"] == "image_url" and c["image_url"]["url"].startswith("data:"):
parts.append(
Part(
inline_data=Blob(
@@ -352,6 +352,9 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]):
)
)
)
elif c["type"] == "image_url":
url = c["image_url"]["url"]
logger.warning(f"Unsupported 'image_url': {url}")
elif c["type"] == "input_audio":
input_audio = c["input_audio"]
audio_bytes = base64.b64decode(input_audio["data"])

View File

@@ -1201,26 +1201,23 @@ class TransportMessageUrgentFrame(OutputTransportMessageUrgentFrame):
class UserImageRequestFrame(SystemFrame):
"""Frame requesting an image from a specific user.
A frame to request an image from the given user. The frame might be
generated by a function call in which case the corresponding fields will be
properly set.
A frame to request an image from the given user. The request might come with
a text that can be later used to describe the requested image.
Parameters:
user_id: Identifier of the user to request image from.
context: Optional context for the image request.
function_name: Name of function that generated this request (if any).
tool_call_id: Tool call ID if generated by function call.
text: An optional text associated to the image request.
add_to_context: Whether the requested image should be added to an LLM context.
video_source: Specific video source to capture from.
"""
user_id: str
context: Optional[Any] = None
function_name: Optional[str] = None
tool_call_id: Optional[str] = None
text: Optional[str] = None
add_to_context: Optional[bool] = None
video_source: Optional[str] = None
def __str__(self):
return f"{self.name}(user: {self.user_id}, video_source: {self.video_source}, function: {self.function_name}, request: {self.tool_call_id})"
return f"{self.name}(user: {self.user_id}, text: {self.text}, add_to_context: {self.add_to_context}, {self.video_source})"
@dataclass
@@ -1294,15 +1291,17 @@ class UserImageRawFrame(InputImageRawFrame):
Parameters:
user_id: Identifier of the user who provided this image.
request: The original image request frame if this is a response.
text: An optional text associated to this image.
add_to_context: Whether this image should be added to an LLM context.
"""
user_id: str = ""
request: Optional[UserImageRequestFrame] = None
text: Optional[str] = None
add_to_context: Optional[bool] = None
def __str__(self):
pts = format_pts(self.pts)
return f"{self.name}(pts: {pts}, user: {self.user_id}, source: {self.transport_source}, size: {self.size}, format: {self.format}, request: {self.request})"
return f"{self.name}(pts: {pts}, user: {self.user_id}, source: {self.transport_source}, size: {self.size}, format: {self.format}, text: {self.text}, add_to_context: {self.add_to_context})"
@dataclass

View File

@@ -16,6 +16,7 @@ service-specific adapter.
import base64
import io
import wave
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, List, Optional, TypeAlias, Union
@@ -113,6 +114,89 @@ class LLMContext:
self._tools: ToolsSchema | NotGiven = LLMContext._normalize_and_validate_tools(tools)
self._tool_choice: LLMContextToolChoice | NotGiven = tool_choice
@staticmethod
def create_image_url_message(
*,
role: str = "user",
url: str,
text: Optional[str] = None,
) -> LLMContextMessage:
"""Create a context message containing an image URL.
Args:
role: The role of this message (defaults to "user").
url: The URL of the image.
text: Optional text to include with the image.
"""
content = []
if text:
content.append({"type": "text", "text": text})
content.append({"type": "image_url", "image_url": {"url": url}})
return {"role": role, "content": content}
@staticmethod
def create_image_message(
*,
role: str = "user",
format: str,
size: tuple[int, int],
image: bytes,
text: Optional[str] = None,
) -> LLMContextMessage:
"""Create a context message containing an image.
Args:
role: The role of this message (defaults to "user").
format: Image format (e.g., 'RGB', 'RGBA').
size: Image dimensions as (width, height) tuple.
image: Raw image bytes.
text: Optional text to include with the image.
"""
buffer = io.BytesIO()
Image.frombytes(format, size, image).save(buffer, format="JPEG")
encoded_image = base64.b64encode(buffer.getvalue()).decode("utf-8")
url = f"data:image/jpeg;base64,{encoded_image}"
return LLMContext.create_image_url_message(role=role, url=url, text=text)
@staticmethod
def create_audio_message(
*, role: str = "user", audio_frames: list[AudioRawFrame], text: str = "Audio follows"
) -> LLMContextMessage:
"""Create a context message containing audio.
Args:
role: The role of this message (defaults to "user").
audio_frames: List of audio frame objects to include.
text: Optional text to include with the audio.
"""
sample_rate = audio_frames[0].sample_rate
num_channels = audio_frames[0].num_channels
content = []
content.append({"type": "text", "text": text})
data = b"".join(frame.audio for frame in audio_frames)
with io.BytesIO() as buffer:
with wave.open(buffer, "wb") as wf:
wf.setsampwidth(2)
wf.setnchannels(num_channels)
wf.setframerate(sample_rate)
wf.writeframes(data)
encoded_audio = base64.b64encode(buffer.getvalue()).decode("utf-8")
content.append(
{
"type": "input_audio",
"input_audio": {"data": encoded_audio, "format": "wav"},
}
)
return {"role": role, "content": content}
@property
def messages(self) -> List[LLMContextMessage]:
"""Get the current messages list.
@@ -238,7 +322,7 @@ class LLMContext:
self._tool_choice = tool_choice
def add_image_frame_message(
self, *, format: str, size: tuple[int, int], image: bytes, text: str = None
self, *, format: str, size: tuple[int, int], image: bytes, text: Optional[str] = None
):
"""Add a message containing an image frame.
@@ -248,17 +332,8 @@ class LLMContext:
image: Raw image bytes.
text: Optional text to include with the image.
"""
buffer = io.BytesIO()
Image.frombytes(format, size, image).save(buffer, format="JPEG")
encoded_image = base64.b64encode(buffer.getvalue()).decode("utf-8")
content = []
if text:
content.append({"type": "text", "text": text})
content.append(
{"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{encoded_image}"}},
)
self.add_message({"role": "user", "content": content})
message = LLMContext.create_image_message(format=format, size=size, image=image, text=text)
self.add_message(message)
def add_audio_frames_message(
self, *, audio_frames: list[AudioRawFrame], text: str = "Audio follows"
@@ -269,66 +344,8 @@ class LLMContext:
audio_frames: List of audio frame objects to include.
text: Optional text to include with the audio.
"""
if not audio_frames:
return
sample_rate = audio_frames[0].sample_rate
num_channels = audio_frames[0].num_channels
content = []
content.append({"type": "text", "text": text})
data = b"".join(frame.audio for frame in audio_frames)
data = bytes(
self._create_wav_header(
sample_rate,
num_channels,
16,
len(data),
)
+ data
)
encoded_audio = base64.b64encode(data).decode("utf-8")
content.append(
{
"type": "input_audio",
"input_audio": {"data": encoded_audio, "format": "wav"},
}
)
self.add_message({"role": "user", "content": content})
def _create_wav_header(self, sample_rate, num_channels, bits_per_sample, data_size):
"""Create a WAV file header for audio data.
Args:
sample_rate: Audio sample rate in Hz.
num_channels: Number of audio channels.
bits_per_sample: Bits per audio sample.
data_size: Size of audio data in bytes.
Returns:
WAV header as a bytearray.
"""
# RIFF chunk descriptor
header = bytearray()
header.extend(b"RIFF") # ChunkID
header.extend((data_size + 36).to_bytes(4, "little")) # ChunkSize: total size - 8
header.extend(b"WAVE") # Format
# "fmt " sub-chunk
header.extend(b"fmt ") # Subchunk1ID
header.extend((16).to_bytes(4, "little")) # Subchunk1Size (16 for PCM)
header.extend((1).to_bytes(2, "little")) # AudioFormat (1 for PCM)
header.extend(num_channels.to_bytes(2, "little")) # NumChannels
header.extend(sample_rate.to_bytes(4, "little")) # SampleRate
# Calculate byte rate and block align
byte_rate = sample_rate * num_channels * (bits_per_sample // 8)
block_align = num_channels * (bits_per_sample // 8)
header.extend(byte_rate.to_bytes(4, "little")) # ByteRate
header.extend(block_align.to_bytes(2, "little")) # BlockAlign
header.extend(bits_per_sample.to_bytes(2, "little")) # BitsPerSample
# "data" sub-chunk
header.extend(b"data") # Subchunk2ID
header.extend(data_size.to_bytes(4, "little")) # Subchunk2Size
return header
message = LLMContext.create_audio_message(audio_frames=audio_frames, text=text)
self.add_message(message)
@staticmethod
def _normalize_and_validate_tools(tools: ToolsSchema | NotGiven) -> ToolsSchema | NotGiven:

View File

@@ -616,7 +616,7 @@ class LLMAssistantAggregator(LLMContextAggregator):
await self._handle_function_call_result(frame)
elif isinstance(frame, FunctionCallCancelFrame):
await self._handle_function_call_cancel(frame)
elif isinstance(frame, UserImageRawFrame) and frame.request and frame.request.tool_call_id:
elif isinstance(frame, UserImageRawFrame):
await self._handle_user_image_frame(frame)
elif isinstance(frame, BotStoppedSpeakingFrame):
await self.push_aggregation()
@@ -767,27 +767,16 @@ class LLMAssistantAggregator(LLMContextAggregator):
message["content"] = result
async def _handle_user_image_frame(self, frame: UserImageRawFrame):
logger.debug(
f"{self} UserImageRawFrame: [{frame.request.function_name}:{frame.request.tool_call_id}]"
)
if frame.request.tool_call_id not in self._function_calls_in_progress:
logger.warning(
f"UserImageRawFrame tool_call_id [{frame.request.tool_call_id}] is not running"
)
if not frame.add_to_context:
return
del self._function_calls_in_progress[frame.request.tool_call_id]
logger.debug(f"{self} Adding UserImageRawFrame to LLM context (size: {frame.size})")
# Update context with the image frame
self._update_function_call_result(
frame.request.function_name, frame.request.tool_call_id, "COMPLETED"
)
self._context.add_image_frame_message(
format=frame.format,
size=frame.size,
image=frame.image,
text=frame.request.context,
text=frame.text,
)
await self.push_aggregation()

View File

@@ -27,11 +27,24 @@ class UserResponseAggregator(LLMUserAggregator):
def __init__(self, **kwargs):
"""Initialize the user response aggregator.
.. deprecated:: 0.0.92
`UserResponseAggregator` is deprecated and will be removed in a future version.
Args:
**kwargs: Additional arguments passed to parent LLMUserAggregator.
"""
super().__init__(context=LLMContext(), **kwargs)
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"`UserResponseAggregator` is deprecated and will be removed in a future version.",
DeprecationWarning,
stacklevel=2,
)
async def push_aggregation(self):
"""Push the aggregated user response as a TextFrame.

View File

@@ -156,6 +156,12 @@ class DeepgramFluxSTTService(WebsocketSTTService):
self._language = Language.EN
self._websocket_url = None
self._receive_task = None
# Flux event handlers
self._register_event_handler("on_start_of_turn")
self._register_event_handler("on_turn_resumed")
self._register_event_handler("on_end_of_turn")
self._register_event_handler("on_eager_end_of_turn")
self._register_event_handler("on_update")
async def _connect(self):
"""Connect to WebSocket and start background tasks.
@@ -523,6 +529,7 @@ class DeepgramFluxSTTService(WebsocketSTTService):
await self.push_frame(UserStartedSpeakingFrame(), FrameDirection.DOWNSTREAM)
await self.push_frame(UserStartedSpeakingFrame(), FrameDirection.UPSTREAM)
await self.start_metrics()
await self._call_event_handler("on_start_of_turn", transcript)
if transcript:
logger.trace(f"Start of turn transcript: {transcript}")
@@ -537,6 +544,7 @@ class DeepgramFluxSTTService(WebsocketSTTService):
event: The event type string for logging purposes.
"""
logger.trace(f"Received event TurnResumed: {event}")
await self._call_event_handler("on_turn_resumed")
async def _handle_end_of_turn(self, transcript: str, data: Dict[str, Any]):
"""Handle EndOfTurn events from Deepgram Flux.
@@ -571,6 +579,7 @@ class DeepgramFluxSTTService(WebsocketSTTService):
await self.stop_processing_metrics()
await self.push_frame(UserStoppedSpeakingFrame(), FrameDirection.DOWNSTREAM)
await self.push_frame(UserStoppedSpeakingFrame(), FrameDirection.UPSTREAM)
await self._call_event_handler("on_end_of_turn", transcript)
async def _handle_eager_end_of_turn(self, transcript: str, data: Dict[str, Any]):
"""Handle EagerEndOfTurn events from Deepgram Flux.
@@ -615,6 +624,7 @@ class DeepgramFluxSTTService(WebsocketSTTService):
result=data,
)
)
await self._call_event_handler("on_eager_end_of_turn", transcript)
async def _handle_update(self, transcript: str):
"""Handle Update events from Deepgram Flux.
@@ -638,3 +648,4 @@ class DeepgramFluxSTTService(WebsocketSTTService):
# both the "user started speaking" event and the first transcript simultaneously,
# making this timing measurement meaningless in this context.
# await self.stop_ttfb_metrics()
await self._call_event_handler("on_update", transcript)

View File

@@ -492,11 +492,19 @@ class LLMService(AIService):
tool_call_id: Optional[str] = None,
text_content: Optional[str] = None,
video_source: Optional[str] = None,
timeout: Optional[float] = 10.0,
):
"""Request an image from a user.
Pushes a UserImageRequestFrame upstream to request an image from the
specified user.
specified user. The user image can then be processed by the LLM.
Use this function from a function call if you want the LLM to process
the image. If you expect the image to be processed by a vision service,
you might want to push a UserImageRequestFrame upstream directly.
.. deprecated:: 0.0.92
This method is deprecated, push a `UserImageRequestFrame` instead.
Args:
user_id: The ID of the user to request an image from.
@@ -504,15 +512,19 @@ class LLMService(AIService):
tool_call_id: Optional tool call ID associated with the request.
text_content: Optional text content/context for the image request.
video_source: Optional video source identifier.
timeout: Optional timeout for the requested image to be added to the LLM context.
"""
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"Method `request_image_frame()` is deprecated, push a `UserImageRequestFrame` instead.",
DeprecationWarning,
)
await self.push_frame(
UserImageRequestFrame(
user_id=user_id,
function_name=function_name,
tool_call_id=tool_call_id,
context=text_content,
video_source=video_source,
),
UserImageRequestFrame(user_id=user_id, text=text_content),
FrameDirection.UPSTREAM,
)

View File

@@ -11,15 +11,17 @@ for image analysis and description generation.
"""
import asyncio
import base64
from io import BytesIO
from typing import AsyncGenerator, Optional
from loguru import logger
from PIL import Image
from pipecat.frames.frames import ErrorFrame, Frame, TextFrame
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.frames.frames import (
ErrorFrame,
Frame,
TextFrame,
UserImageRawFrame,
)
from pipecat.services.vision_service import VisionService
try:
@@ -92,16 +94,16 @@ class MoondreamService(VisionService):
trust_remote_code=True,
revision=revision,
device_map={"": device},
torch_dtype=dtype,
dtype=dtype,
).eval()
logger.debug("Loaded Moondream model")
async def run_vision(self, context: LLMContext) -> AsyncGenerator[Frame, None]:
async def run_vision(self, frame: UserImageRawFrame) -> AsyncGenerator[Frame, None]:
"""Analyze an image and generate a description.
Args:
context: The context to process, containing image data.
frame: The image frame to process.
Yields:
Frame: TextFrame containing the generated image description, or ErrorFrame
@@ -112,45 +114,14 @@ class MoondreamService(VisionService):
yield ErrorFrame("Moondream model not available")
return
image_bytes = None
text = None
try:
messages = context.get_messages()
last_message = messages[-1]
last_message_content = last_message.get("content")
logger.debug(f"Analyzing image (bytes length: {len(frame.image)})")
for item in last_message_content:
if isinstance(item, dict):
if (
"image_url" in item
and isinstance(item["image_url"], dict)
and item["image_url"].get("url")
):
image_bytes = base64.b64decode(item["image_url"]["url"].split(",")[1])
elif "text" in item and isinstance(item["text"], str):
text = item["text"]
except Exception as e:
logger.error(f"Exception during image extraction: {e}")
yield ErrorFrame("Failed to extract image from context")
return
if not image_bytes:
logger.error("No image found in context")
yield ErrorFrame("No image found in context")
return
logger.debug(
f"Analyzing image (bytes length: {len(image_bytes) if image_bytes else 'None'})"
)
def get_image_description(bytes: bytes, text: Optional[str]) -> str:
image_buffer = BytesIO(bytes)
image = Image.open(image_buffer)
def get_image_description(image_bytes: bytes, text: Optional[str]) -> str:
image = Image.frombytes(frame.format, frame.size, image_bytes)
image_embeds = self._model.encode_image(image)
description = self._model.query(image_embeds, text)["answer"]
return description
description = await asyncio.to_thread(get_image_description, image_bytes, text)
description = await asyncio.to_thread(get_image_description, frame.image, frame.text)
yield TextFrame(text=description)

View File

@@ -49,6 +49,33 @@ END_TOKEN = "<end>"
FINALIZED_TOKEN = "<fin>"
class SonioxContextGeneralItem(BaseModel):
"""Represents a key-value pair for structured general context information."""
key: str
value: str
class SonioxContextTranslationTerm(BaseModel):
"""Represents a custom translation mapping for ambiguous or domain-specific terms."""
source: str
target: str
class SonioxContextObject(BaseModel):
"""Context object for models with context_version 2, for Soniox stt-rt-v3-preview and higher.
Learn more about context in the documentation:
https://soniox.com/docs/stt/concepts/context
"""
general: Optional[List[SonioxContextGeneralItem]] = None
text: Optional[str] = None
terms: Optional[List[str]] = None
translation_terms: Optional[List[SonioxContextTranslationTerm]] = None
class SonioxInputParams(BaseModel):
"""Real-time transcription settings.
@@ -60,9 +87,9 @@ class SonioxInputParams(BaseModel):
audio_format: Audio format to use for transcription.
num_channels: Number of channels to use for transcription.
language_hints: List of language hints to use for transcription.
context: Customization for transcription.
enable_non_final_tokens: Whether to enable non-final tokens. If false, only final tokens will be returned.
max_non_final_tokens_duration_ms: Maximum duration of non-final tokens.
context: Customization for transcription. String for models with context_version 1 and ContextObject for models with context_version 2.
enable_speaker_diarization: Whether to enable speaker diarization. Tokens are annotated with speaker IDs.
enable_language_identification: Whether to enable language identification. Tokens are annotated with language IDs.
client_reference_id: Client reference ID to use for transcription.
"""
@@ -72,10 +99,10 @@ class SonioxInputParams(BaseModel):
num_channels: Optional[int] = 1
language_hints: Optional[List[Language]] = None
context: Optional[str] = None
context: Optional[SonioxContextObject | str] = None
enable_non_final_tokens: Optional[bool] = True
max_non_final_tokens_duration_ms: Optional[int] = None
enable_speaker_diarization: Optional[bool] = False
enable_language_identification: Optional[bool] = False
client_reference_id: Optional[str] = None
@@ -173,6 +200,10 @@ class SonioxSTTService(STTService):
# Either one or the other is required.
enable_endpoint_detection = not self._vad_force_turn_endpoint
context = self._params.context
if isinstance(context, SonioxContextObject):
context = context.model_dump()
# Send the initial configuration message.
config = {
"api_key": self._api_key,
@@ -182,9 +213,9 @@ class SonioxSTTService(STTService):
"enable_endpoint_detection": enable_endpoint_detection,
"sample_rate": self.sample_rate,
"language_hints": _prepare_language_hints(self._params.language_hints),
"context": self._params.context,
"enable_non_final_tokens": self._params.enable_non_final_tokens,
"max_non_final_tokens_duration_ms": self._params.max_non_final_tokens_duration_ms,
"context": context,
"enable_speaker_diarization": self._params.enable_speaker_diarization,
"enable_language_identification": self._params.enable_language_identification,
"client_reference_id": self._params.client_reference_id,
}

View File

@@ -0,0 +1,189 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Speechmatics TTS service integration."""
from typing import AsyncGenerator, Optional
from urllib.parse import urlencode
import aiohttp
from loguru import logger
from pydantic import BaseModel
from pipecat.frames.frames import (
ErrorFrame,
Frame,
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
)
from pipecat.services.tts_service import TTSService
from pipecat.utils.tracing.service_decorators import traced_tts
try:
from speechmatics.rt import __version__
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
"In order to use Speechmatics, you need to `pip install pipecat-ai[speechmatics]`."
)
raise Exception(f"Missing module: {e}")
class SpeechmaticsTTSService(TTSService):
"""Speechmatics TTS service implementation.
This service provides text-to-speech synthesis using the Speechmatics HTTP API.
It converts text to speech and returns raw PCM audio data for real-time playback.
"""
SPEECHMATICS_SAMPLE_RATE = 16000
class InputParams(BaseModel):
"""Optional input parameters for Speechmatics TTS configuration."""
pass
def __init__(
self,
*,
api_key: str,
base_url: str = "https://preview.tts.speechmatics.com",
voice_id: str = "sarah",
aiohttp_session: aiohttp.ClientSession,
sample_rate: Optional[int] = SPEECHMATICS_SAMPLE_RATE,
params: Optional[InputParams] = None,
**kwargs,
):
"""Initialize the Speechmatics TTS service.
Args:
api_key: Speechmatics API key for authentication.
base_url: Base URL for Speechmatics TTS API.
voice_id: Voice model to use for synthesis.
aiohttp_session: Shared aiohttp session for HTTP requests.
sample_rate: Audio sample rate in Hz.
params: Optional[InputParams]: Input parameters for the service.
**kwargs: Additional arguments passed to TTSService.
"""
if sample_rate and sample_rate != self.SPEECHMATICS_SAMPLE_RATE:
logger.warning(
f"Speechmatics TTS only supports {self.SPEECHMATICS_SAMPLE_RATE}Hz sample rate. "
f"Current rate of {sample_rate}Hz may cause issues."
)
super().__init__(sample_rate=sample_rate, **kwargs)
# Service parameters
self._api_key: str = api_key
self._base_url: str = base_url
self._session = aiohttp_session
# Check we have required attributes
if not self._api_key:
raise ValueError("Missing Speechmatics API key")
# Default parameters
self._params = params or SpeechmaticsTTSService.InputParams()
# Set voice from constructor parameter
self.set_voice(voice_id)
def can_generate_metrics(self) -> bool:
"""Check if this service can generate processing metrics.
Returns:
True, as Speechmatics service supports metrics generation.
"""
return True
@traced_tts
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
"""Generate speech from text using Speechmatics' HTTP API.
Args:
text: The text to synthesize into speech.
Yields:
Frame: Audio frames containing the synthesized speech.
"""
logger.debug(f"{self}: Generating TTS [{text}]")
headers = {
"Authorization": f"Bearer {self._api_key}",
"Content-Type": "application/json",
}
payload = {
"text": text,
}
url = _get_endpoint_url(self._base_url, self._voice_id, self.sample_rate)
try:
await self.start_ttfb_metrics()
async with self._session.post(url, json=payload, headers=headers) as response:
if response.status != 200:
error_message = f"Speechmatics TTS error: HTTP {response.status}"
logger.error(error_message)
yield ErrorFrame(error=error_message)
return
await self.start_tts_usage_metrics(text)
yield TTSStartedFrame()
# Process the response in streaming chunks
first_chunk = True
buffer = b""
async for chunk in response.content.iter_any():
if not chunk:
continue
if first_chunk:
await self.stop_ttfb_metrics()
first_chunk = False
buffer += chunk
# Emit all complete 2-byte int16 samples from buffer
if len(buffer) >= 2:
complete_samples = len(buffer) // 2
complete_bytes = complete_samples * 2
audio_data = buffer[:complete_bytes]
buffer = buffer[complete_bytes:] # Keep remaining bytes for next iteration
yield TTSAudioRawFrame(
audio=audio_data,
sample_rate=self.sample_rate,
num_channels=1,
)
except Exception as e:
logger.exception(f"Error generating TTS: {e}")
yield ErrorFrame(error=f"Speechmatics TTS error: {str(e)}")
finally:
yield TTSStoppedFrame()
def _get_endpoint_url(base_url: str, voice: str, sample_rate: int) -> str:
"""Format the TTS endpoint URL with voice, output format, and version params.
Args:
base_url: The base URL for the TTS endpoint.
voice: The voice model to use.
sample_rate: The audio sample rate.
Returns:
str: The formatted TTS endpoint URL.
"""
query_params = {}
query_params["output_format"] = f"pcm_{sample_rate}"
query_params["sm-app"] = f"pipecat/{__version__}"
query = urlencode(query_params)
return f"{base_url}/generate/{voice}?{query}"

View File

@@ -14,8 +14,7 @@ visual content.
from abc import abstractmethod
from typing import AsyncGenerator
from pipecat.frames.frames import Frame, LLMContextFrame
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.frames.frames import Frame, UserImageRawFrame
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_service import AIService
@@ -38,15 +37,15 @@ class VisionService(AIService):
self._describe_text = None
@abstractmethod
async def run_vision(self, context: LLMContext) -> AsyncGenerator[Frame, None]:
"""Process the latest image in the context and generate results.
async def run_vision(self, frame: UserImageRawFrame) -> AsyncGenerator[Frame, None]:
"""Process the given vision image and generate results.
This method must be implemented by subclasses to provide actual computer
vision functionality such as image description, object detection, or
visual question answering.
Args:
context: The context to process, containing image data.
frame: The image frame to process.
Yields:
Frame: Frames containing the vision analysis results, typically TextFrame
@@ -57,7 +56,7 @@ class VisionService(AIService):
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames, handling vision image frames for analysis.
Automatically processes VisionImageRawFrame objects by calling run_vision
Automatically processes UserImageRawFrame objects by calling run_vision
and handles metrics tracking. Other frames are passed through unchanged.
Args:
@@ -66,9 +65,9 @@ class VisionService(AIService):
"""
await super().process_frame(frame, direction)
if isinstance(frame, LLMContextFrame):
if isinstance(frame, UserImageRawFrame) and frame.text:
await self.start_processing_metrics()
await self.process_generator(self.run_vision(frame.context))
await self.process_generator(self.run_vision(frame))
await self.stop_processing_metrics()
else:
await self.push_frame(frame, direction)

View File

@@ -1839,10 +1839,11 @@ class DailyInputTransport(BaseInputTransport):
if render_frame:
frame = UserImageRawFrame(
user_id=participant_id,
request=request_frame,
image=video_frame.buffer,
size=(video_frame.width, video_frame.height),
format=video_frame.color_format,
text=request_frame.text if request_frame else None,
add_to_context=request_frame.add_to_context if request_frame else None,
)
frame.transport_source = video_source
await self.push_video_frame(frame)

View File

@@ -15,7 +15,7 @@ import asyncio
import fractions
import time
from collections import deque
from typing import Any, Awaitable, Callable, Optional
from typing import Any, Awaitable, Callable, List, Optional
import numpy as np
from loguru import logger
@@ -567,7 +567,7 @@ class SmallWebRTCInputTransport(BaseInputTransport):
self._receive_audio_task = None
self._receive_video_task = None
self._receive_screen_video_task = None
self._image_requests = {}
self._image_requests: List[UserImageRequestFrame] = []
# Whether we have seen a StartFrame already.
self._initialized = False
@@ -657,23 +657,27 @@ class SmallWebRTCInputTransport(BaseInputTransport):
if video_frame:
await self.push_video_frame(video_frame)
# Check if there are any pending image requests and create UserImageRawFrame
if self._image_requests:
for req_id, request_frame in list(self._image_requests.items()):
if request_frame.video_source == video_source:
# Create UserImageRawFrame using the current video frame
image_frame = UserImageRawFrame(
user_id=request_frame.user_id,
request=request_frame,
image=video_frame.image,
size=video_frame.size,
format=video_frame.format,
)
image_frame.transport_source = video_source
# Push the frame to the pipeline
await self.push_video_frame(image_frame)
# Remove from pending requests
del self._image_requests[req_id]
# Check if there are any pending image requests and create
# UserImageRawFrame. Use a shallow copy so we can remove
# elements.
for request_frame in self._image_requests[:]:
if request_frame.video_source == video_source:
# Create UserImageRawFrame using the current video frame
image_frame = UserImageRawFrame(
user_id=request_frame.user_id,
image=video_frame.image,
size=video_frame.size,
format=video_frame.format,
text=request_frame.text if request_frame else None,
add_to_context=request_frame.add_to_context
if request_frame
else None,
)
image_frame.transport_source = video_source
# Push the frame to the pipeline
await self.push_video_frame(image_frame)
# Remove from pending requests
self._image_requests.remove(request_frame)
except Exception as e:
logger.error(f"{self} exception receiving data: {e.__class__.__name__} ({e})")
@@ -701,8 +705,7 @@ class SmallWebRTCInputTransport(BaseInputTransport):
logger.debug(f"Requesting image from participant: {frame.user_id}")
# Store the request
request_id = f"{frame.function_name}:{frame.tool_call_id}"
self._image_requests[request_id] = frame
self._image_requests.append(frame)
# Default to camera if no source specified
if frame.video_source is None: