Compare commits
3 Commits
fix/event-
...
vp-trace-c
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
03dc1b343a | ||
|
|
f0471dccda | ||
|
|
fd0ef110ff |
155
examples/foundational/trace/001-trace.py
Normal file
155
examples/foundational/trace/001-trace.py
Normal file
@@ -0,0 +1,155 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
from turn_detector_observer import TurnDetectorObserver
|
||||
|
||||
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
|
||||
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.audio.vad.vad_analyzer import VADParams
|
||||
from pipecat.frames.frames import LLMRunFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
|
||||
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.services.openai.stt import OpenAISTTService
|
||||
from pipecat.services.openai.tts import OpenAITTSService
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
|
||||
# instantiated. The function will be called when the desired transport gets
|
||||
# selected.
|
||||
transport_params = {
|
||||
"daily": lambda: DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
|
||||
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
|
||||
),
|
||||
"twilio": lambda: FastAPIWebsocketParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
|
||||
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
|
||||
),
|
||||
"webrtc": lambda: TransportParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
|
||||
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
### STT ###
|
||||
stt = OpenAISTTService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
model="gpt-4o-transcribe",
|
||||
prompt="Expect normal helpful conversation.",
|
||||
)
|
||||
|
||||
### LLM ###
|
||||
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
|
||||
|
||||
### TTS ###
|
||||
tts = OpenAITTSService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
voice="ballad",
|
||||
params=OpenAITTSService.InputParams(
|
||||
instructions="Please speak clearly and at a moderate pace."
|
||||
),
|
||||
)
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
|
||||
},
|
||||
]
|
||||
|
||||
context = LLMContext(messages)
|
||||
context_aggregator = LLMContextAggregatorPair(context)
|
||||
|
||||
# RTVI events for detecting bot aggregation
|
||||
rtvi = RTVIProcessor(config=RTVIConfig(config=[]))
|
||||
|
||||
### PIPELINE ###
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
rtvi,
|
||||
stt,
|
||||
context_aggregator.user(), # User responses
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
transport.output(), # Transport bot output
|
||||
context_aggregator.assistant(), # Assistant spoken responses
|
||||
]
|
||||
)
|
||||
|
||||
### TASK ###
|
||||
turn_detector = TurnDetectorObserver()
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
),
|
||||
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
|
||||
observers=[turn_detector, RTVIObserver(rtvi)],
|
||||
)
|
||||
|
||||
turn_detector.set_turn_observer_event_handlers(task.turn_tracking_observer)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info(f"Client disconnected")
|
||||
await task.cancel()
|
||||
|
||||
# not sure this is needed...
|
||||
@rtvi.event_handler("on_client_ready")
|
||||
async def on_client_ready(rtvi):
|
||||
await rtvi.set_bot_ready()
|
||||
|
||||
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
async def bot(runner_args: RunnerArguments):
|
||||
"""Main bot entry point compatible with Pipecat Cloud."""
|
||||
transport = await create_transport(runner_args, transport_params)
|
||||
await run_bot(transport, runner_args)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from pipecat.runner.run import main
|
||||
|
||||
main()
|
||||
161
examples/foundational/trace/002-realtime-trace.py
Normal file
161
examples/foundational/trace/002-realtime-trace.py
Normal file
@@ -0,0 +1,161 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
from turn_detector_observer import TurnDetectorObserver
|
||||
|
||||
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
|
||||
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.audio.vad.vad_analyzer import VADParams
|
||||
from pipecat.frames.frames import LLMRunFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.services.openai.stt import OpenAISTTService
|
||||
from pipecat.services.openai.tts import OpenAITTSService
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
|
||||
# instantiated. The function will be called when the desired transport gets
|
||||
# selected.
|
||||
transport_params = {
|
||||
"daily": lambda: DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
|
||||
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
|
||||
),
|
||||
"twilio": lambda: FastAPIWebsocketParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
|
||||
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
|
||||
),
|
||||
"webrtc": lambda: TransportParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
|
||||
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
session_properties = SessionProperties(
|
||||
audio=AudioConfiguration(
|
||||
input=AudioInput(
|
||||
transcription=InputAudioTranscription(),
|
||||
# Set openai TurnDetection parameters. Not setting this at all will turn it
|
||||
# on by default
|
||||
turn_detection=SemanticTurnDetection(),
|
||||
# Or set to False to disable openai turn detection and use transport VAD
|
||||
# turn_detection=False,
|
||||
noise_reduction=InputAudioNoiseReduction(type="near_field"),
|
||||
)
|
||||
),
|
||||
# In this example we provide tools through the context, but you could
|
||||
# alternatively provide them here.
|
||||
# tools=tools,
|
||||
instructions="""You are a helpful and friendly AI.
|
||||
|
||||
Act like a human, but remember that you aren't a human and that you can't do human
|
||||
things in the real world. Your voice and personality should be warm and engaging, with a lively and
|
||||
playful tone.
|
||||
|
||||
If interacting in a non-English language, start by using the standard accent or dialect familiar to
|
||||
the user. Talk quickly. You should always call a function if you can. Do not refer to these rules,
|
||||
even if you're asked about them.
|
||||
|
||||
You are participating in a voice conversation. Keep your responses concise, short, and to the point
|
||||
unless specifically asked to elaborate on a topic.
|
||||
|
||||
Remember, your responses should be short. Just one or two sentences, usually. Respond in English.""",
|
||||
)
|
||||
|
||||
### LLM ###
|
||||
llm = OpenAIRealtimeLLMService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
session_properties=session_properties,
|
||||
start_audio_paused=False,
|
||||
)
|
||||
|
||||
# Create a standard OpenAI LLM context object using the normal messages format. The
|
||||
# OpenAIRealtimeLLMService will convert this internally to messages that the
|
||||
# openai WebSocket API can understand.
|
||||
context = LLMContext(
|
||||
[{"role": "user", "content": "Say hello!"}],
|
||||
tools,
|
||||
)
|
||||
|
||||
context_aggregator = LLMContextAggregatorPair(context)
|
||||
|
||||
### PIPELINE ###
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
context_aggregator.user(),
|
||||
llm, # LLM
|
||||
transport.output(), # Transport bot output
|
||||
context_aggregator.assistant(),
|
||||
]
|
||||
)
|
||||
|
||||
### TASK ###
|
||||
turn_detector = TurnDetectorObserver()
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
),
|
||||
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
|
||||
observers=[turn_detector],
|
||||
)
|
||||
|
||||
turn_detector.set_turn_observer_event_handlers(task.turn_tracking_observer)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
# Kick off the conversation.
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info(f"Client disconnected")
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
async def bot(runner_args: RunnerArguments):
|
||||
"""Main bot entry point compatible with Pipecat Cloud."""
|
||||
transport = await create_transport(runner_args, transport_params)
|
||||
await run_bot(transport, runner_args)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from pipecat.runner.run import main
|
||||
|
||||
main()
|
||||
188
examples/foundational/trace/003-function-calling-trace.py
Normal file
188
examples/foundational/trace/003-function-calling-trace.py
Normal file
@@ -0,0 +1,188 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
from turn_detector_observer import TurnDetectorObserver
|
||||
|
||||
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
|
||||
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.audio.vad.vad_analyzer import VADParams
|
||||
from pipecat.frames.frames import LLMRunFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.services.openai.stt import OpenAISTTService
|
||||
from pipecat.services.openai.tts import OpenAITTSService
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
async def fetch_weather_from_api(params: FunctionCallParams):
|
||||
await params.result_callback({"conditions": "nice", "temperature": "75"})
|
||||
|
||||
|
||||
async def fetch_restaurant_recommendation(params: FunctionCallParams):
|
||||
await params.result_callback({"name": "The Golden Dragon"})
|
||||
|
||||
|
||||
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
|
||||
# instantiated. The function will be called when the desired transport gets
|
||||
# selected.
|
||||
transport_params = {
|
||||
"daily": lambda: DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
|
||||
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
|
||||
),
|
||||
"twilio": lambda: FastAPIWebsocketParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
|
||||
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
|
||||
),
|
||||
"webrtc": lambda: TransportParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
|
||||
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
### STT ###
|
||||
stt = OpenAISTTService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
model="gpt-4o-transcribe",
|
||||
prompt="Expect normal helpful conversation.",
|
||||
)
|
||||
|
||||
### LLM ###
|
||||
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
|
||||
|
||||
### TTS ###
|
||||
tts = OpenAITTSService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
voice="ballad",
|
||||
params=OpenAITTSService.InputParams(instructions="Please speak clearly and at a moderate pace."),
|
||||
)
|
||||
|
||||
# You can also register a function_name of None to get all functions
|
||||
# sent to the same callback with an additional function_name parameter.
|
||||
llm.register_function("get_current_weather", fetch_weather_from_api)
|
||||
llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation)
|
||||
|
||||
@llm.event_handler("on_function_calls_started")
|
||||
async def on_function_calls_started(service, function_calls):
|
||||
await tts.queue_frame(TTSSpeakFrame("Let me check on that."))
|
||||
|
||||
weather_function = FunctionSchema(
|
||||
name="get_current_weather",
|
||||
description="Get the current weather",
|
||||
properties={
|
||||
"location": {
|
||||
"type": "string",
|
||||
"description": "The city and state, e.g. San Francisco, CA",
|
||||
},
|
||||
"format": {
|
||||
"type": "string",
|
||||
"enum": ["celsius", "fahrenheit"],
|
||||
"description": "The temperature unit to use. Infer this from the user's location.",
|
||||
},
|
||||
},
|
||||
required=["location", "format"],
|
||||
)
|
||||
restaurant_function = FunctionSchema(
|
||||
name="get_restaurant_recommendation",
|
||||
description="Get a restaurant recommendation",
|
||||
properties={
|
||||
"location": {
|
||||
"type": "string",
|
||||
"description": "The city and state, e.g. San Francisco, CA",
|
||||
},
|
||||
},
|
||||
required=["location"],
|
||||
)
|
||||
tools = ToolsSchema(standard_tools=[weather_function, restaurant_function])
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
|
||||
},
|
||||
]
|
||||
|
||||
context = LLMContext(messages, tools)
|
||||
context_aggregator = LLMContextAggregatorPair(context)
|
||||
|
||||
### PIPELINE ###
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
stt,
|
||||
context_aggregator.user(), # User responses
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
transport.output(), # Transport bot output
|
||||
context_aggregator.assistant(), # Assistant spoken responses
|
||||
]
|
||||
)
|
||||
|
||||
### TASK ###
|
||||
turn_detector = TurnDetectorObserver()
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
),
|
||||
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
|
||||
observers=[turn_detector],
|
||||
)
|
||||
|
||||
turn_detector.set_turn_observer_event_handlers(task.turn_tracking_observer)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
# Kick off the conversation.
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info(f"Client disconnected")
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
async def bot(runner_args: RunnerArguments):
|
||||
"""Main bot entry point compatible with Pipecat Cloud."""
|
||||
transport = await create_transport(runner_args, transport_params)
|
||||
await run_bot(transport, runner_args)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from pipecat.runner.run import main
|
||||
|
||||
main()
|
||||
11
examples/foundational/trace/README.md
Normal file
11
examples/foundational/trace/README.md
Normal file
@@ -0,0 +1,11 @@
|
||||
```bash
|
||||
uv sync
|
||||
uv pip install -e '.[cartesia,daily,elevenlabs,local-smart-turn-v3,openai,runner,webrtc]'
|
||||
```
|
||||
|
||||
```bash
|
||||
python examples/foundational/trace/001-trace.py
|
||||
```
|
||||
|
||||
- open [http://localhost:7860](http://localhost:7860)
|
||||
- click `connect` button in top right
|
||||
5
examples/foundational/trace/example.env
Normal file
5
examples/foundational/trace/example.env
Normal file
@@ -0,0 +1,5 @@
|
||||
OPENAI_API_KEY=...
|
||||
|
||||
ELEVENLABS_API_KEY=...
|
||||
ELEVENLABS_VOICE_ID=...
|
||||
CARTESIA_API_KEY=...
|
||||
181
examples/foundational/trace/turn_detector_observer.py
Normal file
181
examples/foundational/trace/turn_detector_observer.py
Normal file
@@ -0,0 +1,181 @@
|
||||
import time
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
BotStartedSpeakingFrame,
|
||||
BotStoppedSpeakingFrame,
|
||||
EndFrame,
|
||||
FunctionCallResultFrame,
|
||||
FunctionCallsStartedFrame,
|
||||
LLMFullResponseEndFrame,
|
||||
LLMFullResponseStartFrame,
|
||||
StartFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
)
|
||||
from pipecat.observers.base_observer import BaseObserver, FramePushed
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.openai.base_llm import LLMService
|
||||
from pipecat.transports.base_output import BaseOutputTransport
|
||||
|
||||
|
||||
class TurnDetectorObserver(BaseObserver):
|
||||
"""Observer ... of turns."""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
self._turn_observer = None
|
||||
self._arrow = "→"
|
||||
|
||||
self._turn_number = 1
|
||||
self._endframe_queued = False
|
||||
|
||||
def init(self):
|
||||
"""
|
||||
Set ...
|
||||
"""
|
||||
pass
|
||||
|
||||
def set_turn_observer_event_handlers(self, turn_observer):
|
||||
self._turn_observer = turn_observer
|
||||
self.set_turn_observer_event_handlers(self._turn_observer)
|
||||
|
||||
def get_turn_observer(self):
|
||||
return self._turn_observer
|
||||
|
||||
def set_turn_observer_event_handlers(self, turn_observer):
|
||||
"""Sets the Turn Observer event handlers `on_turn_started` and `on_turn_ended`.
|
||||
|
||||
Args:
|
||||
turn_observer: The turn tracking observer of the pipeline task
|
||||
"""
|
||||
|
||||
@turn_observer.event_handler("on_turn_started")
|
||||
async def on_turn_started(observer, turn_number):
|
||||
self._turn_number = turn_number
|
||||
current_time = time.time()
|
||||
logger.info(f"🔄 Turn {turn_number} started")
|
||||
|
||||
#
|
||||
# code to start conversation turn here
|
||||
#
|
||||
#
|
||||
#
|
||||
|
||||
@turn_observer.event_handler("on_turn_ended")
|
||||
async def on_turn_ended(observer, turn_number, duration, was_interrupted):
|
||||
current_time = time.time()
|
||||
|
||||
if was_interrupted:
|
||||
logger.info(f"🔄 Turn {turn_number} interrupted after {duration:.2f}s")
|
||||
else:
|
||||
logger.info(f"🏁 Turn {turn_number} completed in {duration:.2f}s")
|
||||
|
||||
#
|
||||
# code to end conversation turn here
|
||||
#
|
||||
#
|
||||
#
|
||||
|
||||
########
|
||||
# everything past here isn't needed, just nice to have logging
|
||||
########
|
||||
async def on_push_frame(self, data: FramePushed):
|
||||
"""Runs when any frame is pushed through pipeline.
|
||||
Determines based on what type of frame and where it came from
|
||||
what metrics to update.
|
||||
|
||||
Args:
|
||||
data: the pushed frame
|
||||
"""
|
||||
src = data.source
|
||||
dst = data.destination
|
||||
frame = data.frame
|
||||
direction = data.direction
|
||||
timestamp = data.timestamp
|
||||
|
||||
# Convert timestamp to milliseconds for readability
|
||||
time_sec = timestamp / 1_000_000
|
||||
# Convert timestamp to seconds for readability
|
||||
# time_sec = timestamp / 1_000_000_000
|
||||
|
||||
# only log downstream frames
|
||||
if direction == FrameDirection.UPSTREAM:
|
||||
return
|
||||
|
||||
if isinstance(src, Pipeline) or isinstance(dst, Pipeline):
|
||||
if isinstance(frame, StartFrame):
|
||||
self._handle_StartFrame(src, dst, frame, time_sec)
|
||||
elif isinstance(frame, EndFrame):
|
||||
self._handle_EndFrame(src, dst, frame, time_sec)
|
||||
|
||||
if isinstance(src, BaseOutputTransport):
|
||||
if isinstance(frame, BotStartedSpeakingFrame):
|
||||
self._handle_BotStartedSpeakingFrame(src, dst, frame, time_sec)
|
||||
elif isinstance(frame, BotStoppedSpeakingFrame):
|
||||
self._handle_BotStoppedSpeakingFrame(src, dst, frame, time_sec)
|
||||
|
||||
elif isinstance(frame, UserStartedSpeakingFrame):
|
||||
self._handle_UserStartedSpeakingFrame(src, dst, frame, time_sec)
|
||||
elif isinstance(frame, UserStoppedSpeakingFrame):
|
||||
self._handle_UserStoppedSpeakingFrame(src, dst, frame, time_sec)
|
||||
|
||||
if isinstance(src, LLMService):
|
||||
if isinstance(frame, LLMFullResponseStartFrame):
|
||||
self._handle_LLMFullResponseStartFrame(src, dst, frame, time_sec)
|
||||
elif isinstance(frame, LLMFullResponseEndFrame):
|
||||
self._handle_LLMFullResponseEndFrame(src, dst, frame, time_sec)
|
||||
elif isinstance(frame, FunctionCallsStartedFrame):
|
||||
self._handle_FunctionCallsStartedFrame(src, dst, frame, time_sec)
|
||||
elif isinstance(frame, FunctionCallResultFrame):
|
||||
self._handle_FunctionCallResultFrame(src, dst, frame, time_sec)
|
||||
|
||||
# ------------ FRAME HANDLERS ------------
|
||||
|
||||
def _handle_StartFrame(self, src, dst, frame, time_sec):
|
||||
if isinstance(dst, Pipeline):
|
||||
logger.info(f"🟢🟢🟢 StartFrame: {src} {self._arrow} {dst} at {time_sec:.2f}s")
|
||||
|
||||
def _handle_EndFrame(self, src, dst, frame, time_sec):
|
||||
if isinstance(dst, Pipeline):
|
||||
logger.info(f"Queueing 🔴🔴🔴 EndFrame: {src} {self._arrow} {dst} at {time_sec:.2f}s")
|
||||
self._endframe_queued = True
|
||||
|
||||
if isinstance(src, Pipeline):
|
||||
logger.info(f"🔴🔴🔴 EndFrame: {src} {self._arrow} {dst} at {time_sec:.2f}s")
|
||||
|
||||
current_time = time.time()
|
||||
end_state_info = {
|
||||
"turn_number": self._turn_number,
|
||||
}
|
||||
|
||||
def _handle_BotStartedSpeakingFrame(self, src, dst, frame, time_sec):
|
||||
logger.info(f"🤖🟢 BotStartedSpeakingFrame: {src} {self._arrow} {dst} at {time_sec:.2f}s")
|
||||
|
||||
def _handle_BotStoppedSpeakingFrame(self, src, dst, frame, time_sec):
|
||||
logger.info(f"🤖🔴 BotStoppedSpeakingFrame: {src} {self._arrow} {dst} at {time_sec:.2f}s")
|
||||
|
||||
def _handle_LLMFullResponseStartFrame(self, src, dst, frame, time_sec):
|
||||
logger.info(f"🧠🟢 LLMFullResponseStartFrame: {src} {self._arrow} {dst} at {time_sec:.2f}s")
|
||||
|
||||
def _handle_LLMFullResponseEndFrame(self, src, dst, frame, time_sec):
|
||||
logger.info(f"🧠🔴 LLMFullResponseEndFrame: {src} {self._arrow} {dst} at {time_sec:.2f}s")
|
||||
|
||||
def _handle_UserStartedSpeakingFrame(self, src, dst, frame, time_sec):
|
||||
logger.info(f"🙂🟢 UserStartedSpeakingFrame: {src} {self._arrow} {dst} at {time_sec:.2f}s")
|
||||
|
||||
def _handle_UserStoppedSpeakingFrame(self, src, dst, frame, time_sec):
|
||||
logger.info(f"🙂🔴 UserStoppedSpeakingFrame: {src} {self._arrow} {dst} at {time_sec:.2f}s")
|
||||
|
||||
def _handle_FunctionCallsStartedFrame(self, src, dst, frame, time_sec):
|
||||
logger.info(
|
||||
f"📐🟢 {frame.function_calls[0].function_name} FunctionCallsStartedFrame: {src} {self._arrow} {dst} at {time_sec:.2f}s"
|
||||
)
|
||||
|
||||
def _handle_FunctionCallResultFrame(self, src, dst, frame, time_sec):
|
||||
logger.info(
|
||||
f"📐🔴 {frame.function_name} FunctionCallResultFrame: {src} {self._arrow} {dst} at {time_sec:.2f}s"
|
||||
)
|
||||
Reference in New Issue
Block a user