Compare commits

...

42 Commits

Author SHA1 Message Date
James Hush
0a163201ea feat: Add sentence aggregation and Whisker debugger to transcript processor
- Enhance TranscriptHandler to aggregate transcript fragments into complete sentences using match_endofsentence()
- Add Whisker debugger integration for real-time pipeline visualization
- Implement sentence buffering for both user and assistant messages
- Add finalize_partial_sentences() method to handle incomplete sentences on disconnect
- Improves transcript readability by reducing fragmented output

Changes:
- Import match_endofsentence utility for sentence boundary detection
- Add pipecat_whisker.WhiskerObserver for debugging capabilities
- Modify on_transcript_update() to accumulate and aggregate messages
- Create _save_sentence() helper method for complete sentence handling
- Update client disconnect handler to preserve partial sentences
2025-09-25 14:01:19 +08:00
Aleix Conchillo Flaqué
3797f41c8c Merge pull request #2734 from pipecat-ai/aleix/pipecat-0.0.86
update CHANGELOG for 0.0.86
2025-09-24 12:19:16 -07:00
Aleix Conchillo Flaqué
ff919b8c15 update CHANGELOG for 0.0.86 2025-09-24 11:28:14 -07:00
Aleix Conchillo Flaqué
cb048d6c7e Merge pull request #2733 from pipecat-ai/aleix/tavus-missing-daily-callback
TavusTransport: add missing on_before_leave callback
2025-09-24 11:10:32 -07:00
Aleix Conchillo Flaqué
6c2c43ade0 Merge pull request #2724 from pipecat-ai/pk/update-natural-conversation-examples-with-universal-context
Update natural conversation examples with universal context
2025-09-24 11:07:50 -07:00
Aleix Conchillo Flaqué
f899c15b03 Merge pull request #2731 from pipecat-ai/pk/update-example-25-to-use-universal-context
Update example 25 to use universal `LLMContext`
2025-09-24 11:05:36 -07:00
Aleix Conchillo Flaqué
d10ef08775 Merge pull request #2727 from pipecat-ai/pk/strands-agents-needs-to-support-openaillmcontextframe-for-now
`StrandsAgentsProcessor` should still support `OpenAILLMContextFrame`…
2025-09-24 11:05:07 -07:00
Aleix Conchillo Flaqué
27a5af6fa1 Merge pull request #2728 from pipecat-ai/pk/fix-playht-in-env-example
Fix PlayHT env variable names in env.example
2025-09-24 11:04:55 -07:00
Aleix Conchillo Flaqué
4bff0a7c49 Merge pull request #2732 from pipecat-ai/pk/update-voicemail-detector-to-use-llm-context
Update `VoicemailDetector` to use universal `LLMContext`
2025-09-24 11:04:42 -07:00
Aleix Conchillo Flaqué
508f7d203d Merge pull request #2729 from pipecat-ai/aleix/frame-processor-cancel-default-timeout
FrameProcessor: timeout when cancelling tasks
2025-09-24 10:55:52 -07:00
Filipi da Silva Fuchter
0f87d5342c Merge pull request #2266 from pipecat-ai/filipi/hey_gen_transport
HeyGen implementation for Pipecat - HeyGenTransport
2025-09-24 14:35:50 -03:00
Filipi Fuchter
f6164e3bde HeyGen implementation for Pipecat - HeyGenTransport. 2025-09-24 14:33:56 -03:00
Aleix Conchillo Flaqué
1a0fb55d0f TavusTransport: add missing on_before_leave callback 2025-09-24 10:18:56 -07:00
Paul Kompfner
6d0beef944 Update VoicemailDetector to use universal LLMContext 2025-09-24 12:58:42 -04:00
Paul Kompfner
b9fd6b873b Update comment in example 07b to reference LLMContext rather than OpenAILLMContext 2025-09-24 12:49:34 -04:00
Paul Kompfner
dea0f1791f Update OpenAILLMAdapter.get_messages_for_logging() to truncate "input_audio" message data 2025-09-24 12:41:41 -04:00
Paul Kompfner
da66c38795 Update example 25 to use universal LLMContext 2025-09-24 12:37:29 -04:00
Paul Kompfner
912f8b96f0 Fix PlayHT env variable names in env.example 2025-09-24 11:24:35 -04:00
Aleix Conchillo Flaqué
f9eb447d82 FrameProcessor: timeout when cancelling tasks 2025-09-24 08:24:28 -07:00
Paul Kompfner
65f5fe8588 StrandsAgentsProcessor should still support OpenAILLMContextFrame until that frame has been deprecated 2025-09-24 11:05:27 -04:00
Paul Kompfner
817c77f3fe Update SmallWebRTCTransport to pass a sender ID in the "on_app_message" event 2025-09-24 10:24:59 -04:00
Paul Kompfner
8896179b00 Update another "natural conversation" example to use universal LLMContext. Note that this one had to also be fixed in various ways, as it wasn't working. 2025-09-24 09:55:11 -04:00
Filipi da Silva Fuchter
463752360b Merge pull request #2726 from pipecat-ai/mb/twilio-serializer-cleanup
Fixup for TwilioFrameSerializer
2025-09-24 10:45:38 -03:00
Paul Kompfner
66b7977a62 Make SmallWebRTCTransport adhere to the expected "on_app_message" event signature 2025-09-24 09:34:28 -04:00
Mark Backman
468de68aec Fixup for TwilioFrameSerializer 2025-09-24 09:32:46 -04:00
Mark Backman
c4762c1a92 Merge pull request #2627 from jessieweiyi/main
Support TwilioFrameSerializer region/edge settings. Close #2625.
2025-09-24 09:13:10 -04:00
Aleix Conchillo Flaqué
7f4d3a2f02 pyproject: updated sentry to 2.38.0 2025-09-23 19:12:03 -07:00
Jessie Wei
88614b312f Merge branch 'pipecat-ai:main' into main 2025-09-24 10:23:52 +10:00
Jessie Wei
5b4655f45a chore: Update per review comments 2025-09-24 00:22:56 +00:00
Aleix Conchillo Flaqué
d7c8f8df53 update CHANGELOG with AudioBufferProcessor fixes 2025-09-23 15:42:01 -07:00
Aleix Conchillo Flaqué
2571cb2e69 tests: fix formatting 2025-09-23 15:28:07 -07:00
Aleix Conchillo Flaqué
15782be27c Merge pull request #2676 from golbin/main
Fix audio buffer flush and silence handling
2025-09-23 15:27:31 -07:00
Aleix Conchillo Flaqué
997e4b66c6 Merge pull request #2722 from pipecat-ai/aleix/strands-agents-update-and-evals
examples: update Strands Agents with universal context and add evals
2025-09-23 15:21:41 -07:00
Paul Kompfner
6ccbfd9b57 Update "natural conversation" examples to use universal LLMContext 2025-09-23 16:20:16 -04:00
Paul Kompfner
677f69971c Add filters in 22b and 22c examples to prevent function call results triggering the "statement judge" LLM from running unnecessarily, and with the wrong system prompt, which would result in garbled output statements comprised of both LLMs outputs combined 2025-09-23 16:14:58 -04:00
Paul Kompfner
678dd22b8e Add missing sender argument to a few "on_app_message" handlers in examples 2025-09-23 15:29:20 -04:00
Aleix Conchillo Flaqué
620b1f785c examples: update Strands Agents with universal context and add evals 2025-09-23 11:37:57 -07:00
Jessie Wei
392293d55f Merge branch 'pipecat-ai:main' into main 2025-09-23 07:48:45 +10:00
Jin Kim
58f70e7e0d Add tests for audio buffer processor flush alignment 2025-09-18 22:19:32 +09:00
Jin Kim
d0b573e44f Fix audio buffer flush and silence handling 2025-09-18 19:40:45 +09:00
Jessie Wei
305108be9a [TwilioFrameSerializer]: Add parameter validation 2025-09-10 23:00:15 +00:00
Jessie Wei
2e1f397d17 Support TwilioFrameSerializer region/edge settings to that the call is terminated successfully for regions other than default us1 region 2025-09-10 14:30:10 +00:00
28 changed files with 980 additions and 226 deletions

View File

@@ -5,10 +5,17 @@ All notable changes to **Pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
## [0.0.86] - 2025-09-24
### Added
- Added `HeyGenTransport`. This is an integration for HeyGen Interactive
Avatar. A video service that handles audio streaming and requests HeyGen to
generate avatar video responses. (see https://www.heygen.com/). When used, the
Pipecat bot joins the same virtual room as the HeyGen Avatar and the user.
- Added support to `TwilioFrameSerializer` for `region` and `edge` settings.
- Added support for using universal `LLMContext` with:
- `LLMLogObserver`
@@ -68,6 +75,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
- Updated `aiortc` to 1.13.0.
- Updated `sentry` to 2.38.0.
- `BaseOutputTransport` methods `write_audio_frame` and `write_video_frame` now
return a boolean to indicate if the transport implementation was able to write
the given frame or not.
@@ -102,6 +113,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Fixed an issue where the pipeline could freeze if a task cancellation never
completed because a third-party library swallowed asyncio.CancelledError. We
now apply a timeout to task cancellations to prevent these freezes. If the
timeout is reached, the system logs warnings and leaves dangling tasks behind,
which can help diagnose where cancellation is being blocked.
- Fixed an `AudioBufferProcessor` issues that was causing user audio to be
missing in stereo recordings causing bot and user overlaps.
- Fixed a `BaseOutputTransport` issue that could produce large saved
`AudioBufferProcessor` files when using an audio mixer.

View File

@@ -66,8 +66,8 @@ LMNT_VOICE_ID=...
PERPLEXITY_API_KEY=...
# PlayHT
PLAY_HT_USER_ID=...
PLAY_HT_API_KEY=...
PLAYHT_USER_ID=...
PLAYHT_API_KEY=...
# OpenAI
OPENAI_API_KEY=...

View File

@@ -129,7 +129,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
# An `OpenAILLMContextFrame` will be picked up by the LangchainProcessor using
# An `LLMContextFrame` will be picked up by the LangchainProcessor using
# only the content of the last message to inject it in the prompt defined
# above. So no role is required here.
messages = [({"content": "Please briefly introduce yourself to the user."})]

View File

@@ -9,14 +9,12 @@ from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesAppendFrame, LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantContextAggregator,
LLMUserContextAggregator,
)
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.frameworks.strands_agents import StrandsAgentsProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
@@ -115,19 +113,18 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
)
# Setup context aggregators for message handling
context = OpenAILLMContext()
tma_in = LLMUserContextAggregator(context=context)
tma_out = LLMAssistantContextAggregator(context=context)
context = LLMContext()
context_aggregator = LLMContextAggregatorPair(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # Speech-to-text
tma_in, # User context aggregator
context_aggregator.user(), # User responses
llm, # Strands Agents processor
tts, # Text-to-speech
transport.output(), # Transport bot output
tma_out, # Assistant context aggregator
context_aggregator.assistant(), # Assistant spoken responses
]
)
@@ -143,6 +140,20 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
await task.queue_frames(
[
LLMMessagesAppendFrame(
messages=[
{
"role": "user",
"content": f"Greet the user and introduce yourself.",
}
],
run_llm=True,
)
]
)
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):

View File

@@ -9,8 +9,9 @@ import os
from dotenv import load_dotenv
from loguru import logger
from openai.types.chat import ChatCompletionToolParam
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
CancelFrame,
@@ -19,6 +20,7 @@ from pipecat.frames.frames import (
FunctionCallInProgressFrame,
FunctionCallResultFrame,
InterruptionFrame,
LLMContextFrame,
LLMRunFrame,
StartFrame,
SystemFrame,
@@ -32,10 +34,8 @@ from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame,
)
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.filters.function_filter import FunctionFilter
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.user_idle_processor import UserIdleProcessor
@@ -66,13 +66,13 @@ class StatementJudgeContextFilter(FrameProcessor):
await self.push_frame(frame, direction)
return
# We only want to handle OpenAILLMContextFrames, and only want to push through a simplified
# We only want to handle LLMContextFrames, and only want to push through a simplified
# context frame that contains a system prompt and the most recent user messages,
# concatenated.
if isinstance(frame, OpenAILLMContextFrame):
if isinstance(frame, LLMContextFrame):
logger.debug(f"Context Frame: {frame}")
# Take text content from the most recent user messages.
messages = frame.context.messages
messages = frame.context.get_messages()
user_text_messages = []
last_assistant_message = None
for message in reversed(messages):
@@ -100,7 +100,7 @@ class StatementJudgeContextFilter(FrameProcessor):
if last_assistant_message:
messages.append(last_assistant_message)
messages.append({"role": "user", "content": user_message})
await self.push_frame(OpenAILLMContextFrame(OpenAILLMContext(messages)))
await self.push_frame(LLMContextFrame(LLMContext(messages)))
class CompletenessCheck(FrameProcessor):
@@ -231,22 +231,26 @@ class TurnDetectionLLM(Pipeline):
async def pass_only_llm_trigger_frames(frame):
return (
isinstance(frame, OpenAILLMContextFrame)
isinstance(frame, LLMContextFrame)
or isinstance(frame, InterruptionFrame)
or isinstance(frame, FunctionCallInProgressFrame)
or isinstance(frame, FunctionCallResultFrame)
)
async def filter_all(frame):
return False
super().__init__(
[
ParallelPipeline(
[
# Ignore everything except an OpenAILLMContextFrame. Pass a specially constructed
# Ignore everything except an LLMContextFrame. Pass a specially constructed
# simplified context frame to the statement classifier LLM. The only frame this
# sub-pipeline will output is a UserStoppedSpeakingFrame.
statement_judge_context_filter,
statement_llm,
completeness_check,
FunctionFilter(filter=filter_all, direction=FrameDirection.UPSTREAM),
],
[
# Block everything except frames that trigger LLM inference.
@@ -302,30 +306,23 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
async def on_function_calls_started(service, function_calls):
await tts.queue_frame(TTSSpeakFrame("Let me check on that."))
tools = [
ChatCompletionToolParam(
type="function",
function={
"name": "get_current_weather",
"description": "Get the current weather",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
"required": ["location", "format"],
},
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
)
]
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
required=["location", "format"],
)
tools = ToolsSchema(standard_tools=[weather_function])
messages = [
{
@@ -334,8 +331,8 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
},
]
context = OpenAILLMContext(messages, tools)
context_aggregator = llm_main.create_context_aggregator(context)
context = LLMContext(messages, tools)
context_aggregator = LLMContextAggregatorPair(context)
# LLM + turn detection (with an extra LLM as a judge)
llm = TurnDetectionLLM(llm_main)
@@ -369,7 +366,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_app_message")
async def on_app_message(transport, message):
async def on_app_message(transport, message, sender):
logger.debug(f"Received app message: {message}")
if "message" not in message:
return

View File

@@ -9,8 +9,9 @@ import os
from dotenv import load_dotenv
from loguru import logger
from openai.types.chat import ChatCompletionToolParam
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
CancelFrame,
@@ -19,6 +20,7 @@ from pipecat.frames.frames import (
FunctionCallInProgressFrame,
FunctionCallResultFrame,
InterruptionFrame,
LLMContextFrame,
LLMRunFrame,
StartFrame,
SystemFrame,
@@ -32,10 +34,8 @@ from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame,
)
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.filters.function_filter import FunctionFilter
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.user_idle_processor import UserIdleProcessor
@@ -272,11 +272,11 @@ class StatementJudgeContextFilter(FrameProcessor):
await self.push_frame(frame, direction)
return
# We only want to handle OpenAILLMContextFrames, and only want to push through a simplified
# We only want to handle LLMContextFrames, and only want to push through a simplified
# context frame that contains a system prompt and the most recent user messages,
if isinstance(frame, OpenAILLMContextFrame):
if isinstance(frame, LLMContextFrame):
# Take text content from the most recent user messages.
messages = frame.context.messages
messages = frame.context.get_messages()
user_text_messages = []
last_assistant_message = None
for message in reversed(messages):
@@ -303,7 +303,7 @@ class StatementJudgeContextFilter(FrameProcessor):
if last_assistant_message:
messages.append(last_assistant_message)
messages.append({"role": "user", "content": user_message})
await self.push_frame(OpenAILLMContextFrame(OpenAILLMContext(messages)))
await self.push_frame(LLMContextFrame(LLMContext(messages)))
class CompletenessCheck(FrameProcessor):
@@ -425,12 +425,15 @@ class TurnDetectionLLM(Pipeline):
async def pass_only_llm_trigger_frames(frame):
return (
isinstance(frame, OpenAILLMContextFrame)
isinstance(frame, LLMContextFrame)
or isinstance(frame, InterruptionFrame)
or isinstance(frame, FunctionCallInProgressFrame)
or isinstance(frame, FunctionCallResultFrame)
)
async def filter_all(frame):
return False
super().__init__(
[
ParallelPipeline(
@@ -440,12 +443,13 @@ class TurnDetectionLLM(Pipeline):
FunctionFilter(filter=block_user_stopped_speaking),
],
[
# Ignore everything except an OpenAILLMContextFrame. Pass a specially constructed
# Ignore everything except an LLMContextFrame. Pass a specially constructed
# simplified context frame to the statement classifier LLM. The only frame this
# sub-pipeline will output is a UserStoppedSpeakingFrame.
statement_judge_context_filter,
statement_llm,
completeness_check,
FunctionFilter(filter=filter_all, direction=FrameDirection.UPSTREAM),
],
[
# Block everything except frames that trigger LLM inference.
@@ -505,30 +509,23 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
async def on_function_calls_started(service, function_calls):
await tts.queue_frame(TTSSpeakFrame("Let me check on that."))
tools = [
ChatCompletionToolParam(
type="function",
function={
"name": "get_current_weather",
"description": "Get the current weather",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
"required": ["location", "format"],
},
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
)
]
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
required=["location", "format"],
)
tools = ToolsSchema(standard_tools=[weather_function])
messages = [
{
@@ -537,8 +534,8 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
},
]
context = OpenAILLMContext(messages, tools)
context_aggregator = llm_main.create_context_aggregator(context)
context = LLMContext(messages, tools)
context_aggregator = LLMContextAggregatorPair(context)
# LLM + turn detection (with an extra LLM as a judge)
llm = TurnDetectionLLM(llm_main)
@@ -577,7 +574,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_app_message")
async def on_app_message(transport, message):
async def on_app_message(transport, message, sender):
logger.debug(f"Received app message: {message}")
if "message" not in message:
return

View File

@@ -9,7 +9,6 @@ import os
import time
from dotenv import load_dotenv
from google.genai.types import Content, Part
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
@@ -21,6 +20,7 @@ from pipecat.frames.frames import (
FunctionCallResultFrame,
InputAudioRawFrame,
InterruptionFrame,
LLMContextFrame,
LLMFullResponseStartFrame,
LLMRunFrame,
StartFrame,
@@ -34,20 +34,18 @@ from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response import (
LLMAssistantAggregatorParams,
LLMAssistantResponseAggregator,
)
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame,
)
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.filters.function_filter import FunctionFilter
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.google.llm import GoogleLLMContext, GoogleLLMService
from pipecat.services.google.llm import GoogleLLMService
from pipecat.services.llm_service import LLMService
from pipecat.sync.base_notifier import BaseNotifier
from pipecat.sync.event_notifier import EventNotifier
@@ -375,7 +373,7 @@ class AudioAccumulator(FrameProcessor):
await super().process_frame(frame, direction)
# ignore context frame
if isinstance(frame, OpenAILLMContextFrame):
if isinstance(frame, LLMContextFrame):
return
if isinstance(frame, TranscriptionFrame):
@@ -392,9 +390,9 @@ class AudioAccumulator(FrameProcessor):
f"Processing audio buffer seconds: ({len(self._audio_frames)}) ({len(data)}) {len(data) / 2 / 16000}"
)
self._user_speaking = False
context = GoogleLLMContext()
context = LLMContext()
context.add_audio_frames_message(audio_frames=self._audio_frames)
await self.push_frame(OpenAILLMContextFrame(context=context))
await self.push_frame(LLMContextFrame(context=context))
elif isinstance(frame, InputAudioRawFrame):
# Append the audio frame to our buffer. Treat the buffer as a ring buffer, dropping the oldest
# frames as necessary.
@@ -513,7 +511,7 @@ class LLMAggregatorBuffer(LLMAssistantResponseAggregator):
class ConversationAudioContextAssembler(FrameProcessor):
"""Takes the single-message context generated by the AudioAccumulator and adds it to the conversation LLM's context."""
def __init__(self, context: OpenAILLMContext, **kwargs):
def __init__(self, context: LLMContext, **kwargs):
super().__init__(**kwargs)
self._context = context
@@ -525,11 +523,10 @@ class ConversationAudioContextAssembler(FrameProcessor):
await self.push_frame(frame, direction)
return
if isinstance(frame, OpenAILLMContextFrame):
GoogleLLMContext.upgrade_to_google(self._context)
last_message = frame.context.messages[-1]
if isinstance(frame, LLMContextFrame):
last_message = frame.context.get_messages()[-1]
self._context._messages.append(last_message)
await self.push_frame(OpenAILLMContextFrame(context=self._context))
await self.push_frame(LLMContextFrame(context=self._context))
class OutputGate(FrameProcessor):
@@ -543,7 +540,7 @@ class OutputGate(FrameProcessor):
def __init__(
self,
notifier: BaseNotifier,
context: OpenAILLMContext,
context: LLMContext,
llm_transcription_buffer: LLMAggregatorBuffer,
**kwargs,
):
@@ -610,19 +607,23 @@ class OutputGate(FrameProcessor):
self._gate_task = None
async def _gate_task_handler(self):
await self._notifier.wait()
while True:
try:
await self._notifier.wait()
transcription = await self._transcription_buffer.wait_for_transcription() or "-"
self._context.add_message(Content(role="user", parts=[Part(text=transcription)]))
transcription = await self._transcription_buffer.wait_for_transcription() or "-"
self._context.add_message({"role": "user", "content": transcription})
self.open_gate()
for frame, direction in self._frames_buffer:
await self.push_frame(frame, direction)
self._frames_buffer = []
self.open_gate()
for frame, direction in self._frames_buffer:
await self.push_frame(frame, direction)
self._frames_buffer = []
except asyncio.CancelledError:
break
class TurnDetectionLLM(Pipeline):
def __init__(self, llm: LLMService, context: OpenAILLMContext):
def __init__(self, llm: LLMService, context: LLMContext):
# This is the LLM that will transcribe user speech.
tx_llm = GoogleLLMService(
name="Transcriber",
@@ -648,10 +649,10 @@ class TurnDetectionLLM(Pipeline):
# as complete or incomplete.
# statement_judge_context_filter = StatementJudgeAudioContextAccumulator(notifier=notifier)
audio_accumulater = AudioAccumulator()
audio_accumulator = AudioAccumulator()
# This sends a UserStoppedSpeakingFrame and triggers the notifier event
completeness_check = CompletenessCheck(
notifier=notifier, audio_accumulator=audio_accumulater
notifier=notifier, audio_accumulator=audio_accumulator
)
async def block_user_stopped_speaking(frame):
@@ -667,7 +668,7 @@ class TurnDetectionLLM(Pipeline):
super().__init__(
[
audio_accumulater,
audio_accumulator,
ParallelPipeline(
[
# Pass everything except UserStoppedSpeaking to the elements after
@@ -734,8 +735,8 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
system_instruction=conversation_system_instruction,
)
context = OpenAILLMContext()
context_aggregator = conversation_llm.create_context_aggregator(context)
context = LLMContext()
context_aggregator = LLMContextAggregatorPair(context)
llm = TurnDetectionLLM(conversation_llm, context)
@@ -761,12 +762,10 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_app_message")
async def on_app_message(transport, message):
logger.debug(f"Received app message: {message}")
async def on_app_message(transport, message, sender):
logger.debug(f"Received app message: {message}, sender: {sender}") # TODO: revert
if "message" not in message:
return

View File

@@ -8,13 +8,13 @@ import os
from dataclasses import dataclass
from dotenv import load_dotenv
from google.genai.types import Content, Part
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
Frame,
InputAudioRawFrame,
LLMContextFrame,
LLMFullResponseEndFrame,
LLMRunFrame,
SystemFrame,
@@ -27,15 +27,13 @@ from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame,
)
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.google.llm import GoogleLLMContext, GoogleLLMService
from pipecat.services.google.llm import GoogleLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
@@ -101,9 +99,7 @@ class UserAudioCollector(FrameProcessor):
elif isinstance(frame, UserStoppedSpeakingFrame):
self._user_speaking = False
self._context.add_audio_frames_message(audio_frames=self._audio_frames)
await self._user_context_aggregator.push_frame(
self._user_context_aggregator.get_context_frame()
)
await self._user_context_aggregator.push_frame(LLMContextFrame(context=self._context))
elif isinstance(frame, InputAudioRawFrame):
if self._user_speaking:
self._audio_frames.append(frame)
@@ -121,10 +117,10 @@ class UserAudioCollector(FrameProcessor):
class InputTranscriptionContextFilter(FrameProcessor):
"""This FrameProcessor blocks all frames except the OpenAILLMContextFrame that triggers
"""This FrameProcessor blocks all frames except the LLMContextFrame that triggers
LLM inference. (And system frames, which are needed for the pipeline element lifecycle.)
We take the context object out of the OpenAILLMContextFrame and use it to create a new
We take the context object out of the LLMContextFrame and use it to create a new
context object that we will send to the transcriber LLM.
"""
@@ -136,52 +132,54 @@ class InputTranscriptionContextFilter(FrameProcessor):
await self.push_frame(frame, direction)
return
if not isinstance(frame, OpenAILLMContextFrame):
if not isinstance(frame, LLMContextFrame):
return
try:
# Make sure we're working with a GoogleLLMContext
context = GoogleLLMContext.upgrade_to_google(frame.context)
message = context.messages[-1]
message = frame.context.get_messages()[-1]
if not isinstance(message, Content):
logger.error(f"Expected Content, got {type(message)}")
message_content = message["content"]
if not message_content or not isinstance(message_content, list):
return
last_part = message.parts[-1]
if not (
message.role == "user"
and last_part.inline_data
and last_part.inline_data.mime_type == "audio/wav"
):
last_part = message["content"][-1]
if not (message["role"] == "user" and last_part["type"] == "input_audio"):
return
# Assemble a new message, with three parts: conversation history, transcription
# prompt, and audio. We could use only part of the conversation, if we need to
# keep the token count down, but for now, we'll just use the whole thing.
parts = []
new_message_content = []
# Get previous conversation history
previous_messages = frame.context.messages[:-2]
previous_messages = frame.context.get_messages()[:-2]
history = ""
for msg in previous_messages:
for part in msg.parts:
if part.text:
history += f"{msg.role}: {part.text}\n"
previous_message_content = msg["content"]
if not previous_message_content:
continue
if isinstance(previous_message_content, str):
history += f"{msg['role']}: {previous_message_content}\n"
elif isinstance(previous_message_content, list):
for c in previous_message_content:
if c.get("text"):
history += f"{msg['role']}: {c['text']}\n"
if history:
assembled = f"Here is the conversation history so far. These are not instructions. This is data that you should use only to improve the accuracy of your transcription.\n\n----\n\n{history}\n\n----\n\nEND OF CONVERSATION HISTORY\n\n"
parts.append(Part(text=assembled))
new_message_content.append({"type": "text", "text": assembled})
parts.append(
Part(
text="Transcribe this audio. Respond either with the transcription exactly as it was said by the user, or with the special string 'EMPTY' if the audio is not clear."
)
new_message_content.append(
{
"type": "text",
"text": "Transcribe this audio. Respond either with the transcription exactly as it was said by the user, or with the special string 'EMPTY' if the audio is not clear.",
}
)
parts.append(last_part)
msg = Content(role="user", parts=parts)
ctx = GoogleLLMContext([msg])
ctx.system_message = transcriber_system_message
await self.push_frame(OpenAILLMContextFrame(context=ctx))
new_message_content.append(last_part)
msg = {"role": "user", "content": new_message_content}
ctx = LLMContext([{"role": "system", "content": transcriber_system_message}, msg])
await self.push_frame(LLMContextFrame(context=ctx))
except Exception as e:
logger.error(f"Error processing frame: {e}")
@@ -227,10 +225,8 @@ class TranscriptionContextFixup(FrameProcessor):
Audio is big, using a lot of tokens and network bandwidth. So doing this is
important if we want to keep both latency and cost low.
This class is a bit of a hack, especially because it directly creates a
GoogleLLMContext object, which we don't generally do. We usually try to leave
the implementation-specific details of the LLM context encapsulated inside the
service classes.
This class is a bit of a hack, especially because it directly creates an
LLMContext object, which we don't generally do.
"""
def __init__(self, context):
@@ -239,25 +235,22 @@ class TranscriptionContextFixup(FrameProcessor):
self._transcript = "THIS IS A TRANSCRIPT"
def is_user_audio_message(self, message):
last_part = message.parts[-1]
return (
message.role == "user"
and last_part.inline_data
and last_part.inline_data.mime_type == "audio/wav"
)
message_content = message["content"]
if not message_content or not isinstance(message_content, list):
return False
last_part = message["content"][-1]
return message["role"] == "user" and last_part["type"] == "input_audio"
def swap_user_audio(self):
if not self._transcript:
return
message = self._context.messages[-2]
message = self._context.get_messages()[-2]
if not self.is_user_audio_message(message):
message = self._context.messages[-1]
message = self._context.get_messages()[-1]
if not self.is_user_audio_message(message):
return
audio_part = message.parts[-1]
audio_part.inline_data = None
audio_part.text = self._transcript
message["content"] = self._transcript
async def process_frame(self, frame, direction):
await super().process_frame(frame, direction)
@@ -327,8 +320,8 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
},
]
context = OpenAILLMContext(messages)
context_aggregator = conversation_llm.create_context_aggregator(context)
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(context)
audio_collector = UserAudioCollector(context, context_aggregator.user())
input_transcription_context_filter = InputTranscriptionContextFilter()
transcription_frames_emitter = InputTranscriptionFrameEmitter()

View File

@@ -29,6 +29,10 @@ from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
from pipecat.utils.string import match_endofsentence
logger.info("Loading Whisker debugger...")
from pipecat_whisker import WhiskerObserver
load_dotenv(override=True)
@@ -52,6 +56,8 @@ class TranscriptHandler:
"""
self.messages: List[TranscriptionMessage] = []
self.output_file: Optional[str] = output_file
self._current_user_sentence = ""
self._current_assistant_sentence = ""
logger.debug(
f"TranscriptHandler initialized {'with output_file=' + output_file if output_file else 'with log output only'}"
)
@@ -78,11 +84,29 @@ class TranscriptHandler:
except Exception as e:
logger.error(f"Error saving transcript message to file: {e}")
async def _save_sentence(self, role: str, content: str, timestamp: Optional[str] = None):
"""Save a complete sentence as a transcript message.
Args:
role: The role (user/assistant)
content: The complete sentence content
timestamp: Optional timestamp
"""
# Cast role to the appropriate literal type
message_role = "user" if role == "user" else "assistant"
sentence_message = TranscriptionMessage(
role=message_role, content=content.strip(), timestamp=timestamp
)
self.messages.append(sentence_message)
await self.save_message(sentence_message)
async def on_transcript_update(
self, processor: TranscriptProcessor, frame: TranscriptionUpdateFrame
):
"""Handle new transcript messages.
Aggregates messages into complete sentences before saving them using match_endofsentence.
Args:
processor: The TranscriptProcessor that emitted the update
frame: TranscriptionUpdateFrame containing new messages
@@ -90,8 +114,31 @@ class TranscriptHandler:
logger.debug(f"Received transcript update with {len(frame.messages)} new messages")
for msg in frame.messages:
self.messages.append(msg)
await self.save_message(msg)
# Accumulate text for the appropriate role
if msg.role == "user":
self._current_user_sentence += msg.content + " "
# Check if we have a complete sentence
if match_endofsentence(self._current_user_sentence):
await self._save_sentence("user", self._current_user_sentence, msg.timestamp)
self._current_user_sentence = ""
elif msg.role == "assistant":
self._current_assistant_sentence += msg.content + " "
# Check if we have a complete sentence
if match_endofsentence(self._current_assistant_sentence):
await self._save_sentence(
"assistant", self._current_assistant_sentence, msg.timestamp
)
self._current_assistant_sentence = ""
async def finalize_partial_sentences(self):
"""Save any remaining partial sentences when the conversation ends."""
if self._current_user_sentence.strip():
await self._save_sentence("user", self._current_user_sentence)
self._current_user_sentence = ""
if self._current_assistant_sentence.strip():
await self._save_sentence("assistant", self._current_assistant_sentence)
self._current_assistant_sentence = ""
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
@@ -160,12 +207,16 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
]
)
# Create Whisker debugger observer
whisker = WhiskerObserver(pipeline)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
observers=[whisker],
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@@ -183,6 +234,8 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
# Finalize any partial sentences before canceling
await transcript_handler.finalize_partial_sentences()
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)

View File

@@ -0,0 +1,113 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response_universal import (
LLMContext,
LLMContextAggregatorPair,
)
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.google.llm import GoogleLLMService
from pipecat.transports.heygen.transport import HeyGenParams, HeyGenTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
transport = HeyGenTransport(
api_key=os.getenv("HEYGEN_API_KEY"),
session=session,
params=HeyGenParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="00967b2f-88a6-4a31-8153-110a92134b9f",
)
llm = GoogleLLMService(api_key=os.getenv("GOOGLE_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a helpful assistant. Your output will be converted to audio so don't include special characters in your answers. Be succinct and respond to what the user said in a creative and helpful way.",
},
]
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append(
{
"role": "system",
"content": "Start by saying 'Hello' and then a short greeting.",
}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -14,13 +14,12 @@ from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnal
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.extensions.voicemail.voicemail_detector import VoicemailDetector
from pipecat.frames.frames import EndTaskFrame, TTSSpeakFrame
from pipecat.frames.frames import TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.frame_processor import FrameDirection
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService

View File

@@ -93,7 +93,7 @@ riva = [ "nvidia-riva-client~=2.21.1" ]
runner = [ "python-dotenv>=1.0.0,<2.0.0", "uvicorn>=0.32.0,<1.0.0", "fastapi>=0.115.6,<0.117.0", "pipecat-ai-small-webrtc-prebuilt>=1.0.0"]
sambanova = []
sarvam = [ "pipecat-ai[websockets-base]" ]
sentry = [ "sentry-sdk~=2.23.1" ]
sentry = [ "sentry-sdk>=2.28.0,<3" ]
local-smart-turn = [ "coremltools>=8.0", "transformers", "torch>=2.5.0,<3", "torchaudio>=2.5.0,<3" ]
local-smart-turn-v3 = [ "transformers", "onnxruntime>=1.20.1,<2" ]
remote-smart-turn = []
@@ -107,7 +107,7 @@ tavus=[]
together = []
tracing = [ "opentelemetry-sdk>=1.33.0", "opentelemetry-api>=1.33.0", "opentelemetry-instrumentation>=0.54b0" ]
ultravox = [ "transformers>=4.48.0", "vllm>=0.9.0" ]
webrtc = [ "aiortc~=1.13.0", "opencv-python~=4.11.0.86" ]
webrtc = [ "aiortc>=1.13.0,<2", "opencv-python>=4.11.0.86,<5" ]
websocket = [ "pipecat-ai[websockets-base]", "fastapi>=0.115.6,<0.117.0" ]
websockets-base = [ "websockets>=13.1,<16.0" ]
whisper = [ "faster-whisper~=1.1.1" ]

View File

@@ -83,6 +83,7 @@ TESTS_07 = [
("07k-interruptible-lmnt.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07l-interruptible-groq.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07m-interruptible-aws.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07m-interruptible-aws-strands.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("07n-interruptible-gemini.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07n-interruptible-google.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07o-interruptible-assemblyai.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),

View File

@@ -105,6 +105,8 @@ class OpenAILLMAdapter(BaseLLMAdapter[OpenAILLMInvocationParams]):
if item["type"] == "image_url":
if item["image_url"]["url"].startswith("data:image/"):
item["image_url"]["url"] = "data:image/..."
if item["type"] == "input_audio":
item["input_audio"]["data"] = "..."
if "mime_type" in msg and msg["mime_type"].startswith("image/"):
msg["data"] = "..."
msgs.append(msg)

View File

@@ -36,7 +36,8 @@ from pipecat.frames.frames import (
UserStoppedSpeakingFrame,
)
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup
from pipecat.services.llm_service import LLMService
from pipecat.sync.base_notifier import BaseNotifier
@@ -614,8 +615,8 @@ VOICEMAIL SYSTEM (respond "VOICEMAIL"):
]
# Create the LLM context and aggregators for conversation management
self._context = OpenAILLMContext(self._messages)
self._context_aggregator = llm.create_context_aggregator(self._context)
self._context = LLMContext(self._messages)
self._context_aggregator = LLMContextAggregatorPair(self._context)
# Create notification system for coordinating between components
self._gate_notifier = EventNotifier() # Signals classification completion

View File

@@ -229,9 +229,12 @@ class AudioBufferProcessor(FrameProcessor):
# Save time of frame so we can compute silence.
self._last_bot_frame_at = time.time()
if self._buffer_size > 0 and len(self._user_audio_buffer) > self._buffer_size:
if self._buffer_size > 0 and (
len(self._user_audio_buffer) >= self._buffer_size
or len(self._bot_audio_buffer) >= self._buffer_size
):
await self._call_on_audio_data_handler()
self._reset_recording()
self._reset_primary_audio_buffers()
# Process turn recording with preprocessed data.
if self._enable_turn_audio:
@@ -272,9 +275,15 @@ class AudioBufferProcessor(FrameProcessor):
async def _call_on_audio_data_handler(self):
"""Call the audio data event handlers with buffered audio."""
if not self.has_audio() or not self._recording:
if not self._recording:
return
if len(self._user_audio_buffer) == 0 and len(self._bot_audio_buffer) == 0:
return
self._align_track_buffers()
flush_time = time.time()
# Call original handler with merged audio
merged_audio = self.merge_audio_buffers()
await self._call_event_handler(
@@ -290,23 +299,49 @@ class AudioBufferProcessor(FrameProcessor):
self._num_channels,
)
self._last_user_frame_at = flush_time
self._last_bot_frame_at = flush_time
def _buffer_has_audio(self, buffer: bytearray) -> bool:
"""Check if a buffer contains audio data."""
return buffer is not None and len(buffer) > 0
def _reset_recording(self):
"""Reset recording state and buffers."""
self._reset_audio_buffers()
self._reset_all_audio_buffers()
self._last_user_frame_at = time.time()
self._last_bot_frame_at = time.time()
def _reset_audio_buffers(self):
def _reset_all_audio_buffers(self):
"""Reset all audio buffers to empty state."""
self._reset_primary_audio_buffers()
self._reset_turn_audio_buffers()
def _reset_primary_audio_buffers(self):
"""Clear user and bot buffers while preserving turn buffers and timestamps."""
self._user_audio_buffer = bytearray()
self._bot_audio_buffer = bytearray()
def _reset_turn_audio_buffers(self):
"""Clear user and bot turn buffers while preserving primary buffers and timestamps."""
self._user_turn_audio_buffer = bytearray()
self._bot_turn_audio_buffer = bytearray()
def _align_track_buffers(self):
"""Pad the shorter track with silence so both tracks stay in sync."""
user_len = len(self._user_audio_buffer)
bot_len = len(self._bot_audio_buffer)
if user_len == bot_len:
return
target_len = max(user_len, bot_len)
if user_len < target_len:
self._user_audio_buffer.extend(b"\x00" * (target_len - user_len))
self._last_user_frame_at = max(self._last_user_frame_at, self._last_bot_frame_at)
if bot_len < target_len:
self._bot_audio_buffer.extend(b"\x00" * (target_len - bot_len))
self._last_bot_frame_at = max(self._last_bot_frame_at, self._last_user_frame_at)
async def _resample_input_audio(self, frame: InputAudioRawFrame) -> bytes:
"""Resample audio frame to the target sample rate."""
return await self._input_resampler.resample(

View File

@@ -455,9 +455,13 @@ class FrameProcessor(BaseObject):
name = f"{self}::{coroutine.cr_code.co_name}"
return self.task_manager.create_task(coroutine, name)
async def cancel_task(self, task: asyncio.Task, timeout: Optional[float] = None):
async def cancel_task(self, task: asyncio.Task, timeout: Optional[float] = 1.0):
"""Cancel a task managed by this processor.
A default timeout if 1 second is used in order to avoid potential
freezes caused by certain libraries that swallow
`asyncio.CancelledError`.
Args:
task: The task to cancel.
timeout: Optional timeout for task cancellation.

View File

@@ -10,6 +10,7 @@ from loguru import logger
from pipecat.frames.frames import (
Frame,
LLMContextFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMTextFrame,
@@ -71,9 +72,11 @@ class StrandsAgentsProcessor(FrameProcessor):
direction: The direction of frame flow in the pipeline.
"""
await super().process_frame(frame, direction)
if isinstance(frame, OpenAILLMContextFrame):
text = frame.context.messages[-1]["content"]
await self._ainvoke(str(text).strip())
if isinstance(frame, (LLMContextFrame, OpenAILLMContextFrame)):
messages = frame.context.get_messages()
if messages:
last_message = messages[-1]
await self._ainvoke(str(last_message["content"]).strip())
else:
await self.push_frame(frame, direction)

View File

@@ -61,6 +61,8 @@ class TwilioFrameSerializer(FrameSerializer):
call_sid: Optional[str] = None,
account_sid: Optional[str] = None,
auth_token: Optional[str] = None,
region: Optional[str] = None,
edge: Optional[str] = None,
params: Optional[InputParams] = None,
):
"""Initialize the TwilioFrameSerializer.
@@ -70,13 +72,42 @@ class TwilioFrameSerializer(FrameSerializer):
call_sid: The associated Twilio Call SID (optional, but required for auto hang-up).
account_sid: Twilio account SID (required for auto hang-up).
auth_token: Twilio auth token (required for auto hang-up).
region: Twilio region (e.g., "au1", "ie1"). Must be specified with edge.
edge: Twilio edge location (e.g., "sydney", "dublin"). Must be specified with region.
params: Configuration parameters.
"""
self._params = params or TwilioFrameSerializer.InputParams()
# Validate hangup-related parameters if auto_hang_up is enabled
if self._params.auto_hang_up:
# Validate required credentials
missing_credentials = []
if not call_sid:
missing_credentials.append("call_sid")
if not account_sid:
missing_credentials.append("account_sid")
if not auth_token:
missing_credentials.append("auth_token")
if missing_credentials:
raise ValueError(
f"auto_hang_up is enabled but missing required parameters: {', '.join(missing_credentials)}"
)
# Validate region and edge are both provided if either is specified
if (region and not edge) or (edge and not region):
raise ValueError(
"Both edge and region parameters are required if one is set. "
f"Twilio's FQDN format requires both: api.{{edge}}.{{region}}.twilio.com. "
f"Got: region='{region}', edge='{edge}'"
)
self._stream_sid = stream_sid
self._call_sid = call_sid
self._account_sid = account_sid
self._auth_token = auth_token
self._params = params or TwilioFrameSerializer.InputParams()
self._region = region
self._edge = edge
self._twilio_sample_rate = self._params.twilio_sample_rate
self._sample_rate = 0 # Pipeline input rate
@@ -158,25 +189,14 @@ class TwilioFrameSerializer(FrameSerializer):
account_sid = self._account_sid
auth_token = self._auth_token
call_sid = self._call_sid
region = self._region
edge = self._edge
if not call_sid or not account_sid or not auth_token:
missing = []
if not call_sid:
missing.append("call_sid")
if not account_sid:
missing.append("account_sid")
if not auth_token:
missing.append("auth_token")
logger.warning(
f"Cannot hang up Twilio call: missing required parameters: {', '.join(missing)}"
)
return
region_prefix = f"{region}." if region else ""
edge_prefix = f"{edge}." if edge else ""
# Twilio API endpoint for updating calls
endpoint = (
f"https://api.twilio.com/2010-04-01/Accounts/{account_sid}/Calls/{call_sid}.json"
)
endpoint = f"https://api.{edge_prefix}{region_prefix}twilio.com/2010-04-01/Accounts/{account_sid}/Calls/{call_sid}.json"
# Create basic auth from account_sid and auth_token
auth = aiohttp.BasicAuth(account_sid, auth_token)

View File

@@ -108,12 +108,14 @@ class HeyGenSession(BaseModel):
Parameters:
session_id (str): Unique identifier for the streaming session.
access_token (str): Token for accessing the session securely.
livekit_agent_token (str): Token for HeyGens audio agents(Pipecat).
realtime_endpoint (str): Real-time communication endpoint URL.
url (str): Direct URL for the session.
"""
session_id: str
access_token: str
livekit_agent_token: str
realtime_endpoint: str
url: str

View File

@@ -393,7 +393,9 @@ class HeyGenClient:
participant_id: Identifier of the participant to capture audio from
callback: Async function to handle received audio frames
"""
logger.debug(f"capture_participant_audio: {participant_id}")
logger.debug(
f"capture_participant_audio: {participant_id}, sample_rate: {self._in_sample_rate}"
)
self._audio_frame_callback = callback
if self._audio_task is not None:
logger.warning(
@@ -407,7 +409,9 @@ class HeyGenClient:
for track_pub in participant.track_publications.values():
if track_pub.kind == rtc.TrackKind.KIND_AUDIO and track_pub.track is not None:
logger.debug(f"Starting audio capture for existing track: {track_pub.sid}")
audio_stream = rtc.AudioStream(track_pub.track)
audio_stream = rtc.AudioStream(
track=track_pub.track, sample_rate=self._in_sample_rate
)
self._audio_task = self._task_manager.create_task(
self._process_audio_frames(audio_stream), name="HeyGenClient_Receive_Audio"
)
@@ -536,7 +540,7 @@ class HeyGenClient:
and self._audio_task is None
):
logger.debug(f"Creating audio stream processor for track: {publication.sid}")
audio_stream = rtc.AudioStream(track)
audio_stream = rtc.AudioStream(track=track, sample_rate=self._in_sample_rate)
self._audio_task = self._task_manager.create_task(
self._process_audio_frames(audio_stream), name="HeyGenClient_Receive_Audio"
)
@@ -559,7 +563,7 @@ class HeyGenClient:
)
await self._livekit_room.connect(
self._heyGen_session.url, self._heyGen_session.access_token
self._heyGen_session.url, self._heyGen_session.livekit_agent_token
)
logger.debug(f"Successfully connected to LiveKit room: {self._livekit_room.name}")
logger.debug(f"Local participant SID: {self._livekit_room.local_participant.sid}")

View File

@@ -110,6 +110,7 @@ class HeyGenVideoService(AIService):
api_key=self._api_key,
session=self._session,
params=TransportParams(
audio_in_sample_rate=48000,
audio_in_enabled=True,
video_in_enabled=True,
audio_out_enabled=True,

View File

@@ -0,0 +1,381 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""HeyGen implementation for Pipecat.
This module provides integration with the HeyGen platform for creating conversational
AI applications with avatars. It manages conversation sessions and provides real-time
audio/video streaming capabilities through the HeyGen API.
The module consists of three main components:
- HeyGenInputTransport: Handles incoming audio and events from HeyGen conversations
- HeyGenOutputTransport: Manages outgoing audio and events to HeyGen conversations
- HeyGenTransport: Main transport implementation that coordinates input/output transports
"""
from typing import Any, Optional
import aiohttp
from loguru import logger
from pipecat.frames.frames import (
AudioRawFrame,
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
CancelFrame,
EndFrame,
Frame,
InputAudioRawFrame,
InterruptionFrame,
OutputAudioRawFrame,
StartFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup
from pipecat.services.heygen.api import NewSessionRequest
from pipecat.services.heygen.client import HeyGenCallbacks, HeyGenClient
from pipecat.transports.base_input import BaseInputTransport
from pipecat.transports.base_output import BaseOutputTransport
from pipecat.transports.base_transport import BaseTransport, TransportParams
class HeyGenInputTransport(BaseInputTransport):
"""Input transport for receiving audio and events from HeyGen conversations.
Handles incoming audio streams from participants and manages audio capture
from the Daily room connected to the HeyGen conversation.
"""
def __init__(
self,
client: HeyGenClient,
params: TransportParams,
**kwargs,
):
"""Initialize the HeyGen input transport.
Args:
client: The HeyGen transport client instance.
params: Transport configuration parameters.
**kwargs: Additional arguments passed to parent class.
"""
super().__init__(params, **kwargs)
self._client = client
self._params = params
# Whether we have seen a StartFrame already.
self._initialized = False
async def setup(self, setup: FrameProcessorSetup):
"""Setup the input transport.
Args:
setup: The frame processor setup configuration.
"""
await super().setup(setup)
await self._client.setup(setup)
async def cleanup(self):
"""Cleanup input transport resources."""
await super().cleanup()
await self._client.cleanup()
async def start(self, frame: StartFrame):
"""Start the input transport.
Args:
frame: The start frame containing initialization parameters.
"""
await super().start(frame)
if self._initialized:
return
self._initialized = True
await self.set_transport_ready(frame)
async def stop(self, frame: EndFrame):
"""Stop the input transport.
Args:
frame: The end frame signaling transport shutdown.
"""
await super().stop(frame)
await self._client.stop()
async def cancel(self, frame: CancelFrame):
"""Cancel the input transport.
Args:
frame: The cancel frame signaling immediate cancellation.
"""
await super().cancel(frame)
await self._client.stop()
async def start_capturing_audio(self, participant_id: str):
"""Start capturing audio from a participant.
Args:
participant_id: The participant to capture audio from.
"""
if self._params.audio_in_enabled:
logger.info(f"HeyGenTransport start capturing audio for participant {participant_id}")
await self._client.capture_participant_audio(
participant_id, self._on_participant_audio_data
)
async def _on_participant_audio_data(self, audio_frame: AudioRawFrame):
"""Handle received participant audio data."""
frame = InputAudioRawFrame(
audio=audio_frame.audio,
sample_rate=audio_frame.sample_rate,
num_channels=audio_frame.num_channels,
)
await self.push_audio_frame(frame)
class HeyGenOutputTransport(BaseOutputTransport):
"""Output transport for sending audio and events to HeyGen conversations.
Handles outgoing audio streams to participants and manages the custom
audio track expected by the HeyGen platform.
"""
def __init__(
self,
client: HeyGenClient,
params: TransportParams,
**kwargs,
):
"""Initialize the HeyGen output transport.
Args:
client: The HeyGen transport client instance.
params: Transport configuration parameters.
**kwargs: Additional arguments passed to parent class.
"""
super().__init__(params, **kwargs)
self._client = client
self._params = params
# Whether we have seen a StartFrame already.
self._initialized = False
self._event_id = None
async def setup(self, setup: FrameProcessorSetup):
"""Setup the output transport.
Args:
setup: The frame processor setup configuration.
"""
await super().setup(setup)
await self._client.setup(setup)
async def cleanup(self):
"""Cleanup output transport resources."""
await super().cleanup()
await self._client.cleanup()
async def start(self, frame: StartFrame):
"""Start the output transport.
Args:
frame: The start frame containing initialization parameters.
"""
await super().start(frame)
if self._initialized:
return
self._initialized = True
await self._client.start(frame, self.audio_chunk_size)
await self.set_transport_ready(frame)
self._client.transport_ready()
async def stop(self, frame: EndFrame):
"""Stop the output transport.
Args:
frame: The end frame signaling transport shutdown.
"""
await super().stop(frame)
await self._client.stop()
async def cancel(self, frame: CancelFrame):
"""Cancel the output transport.
Args:
frame: The cancel frame signaling immediate cancellation.
"""
await super().cancel(frame)
await self._client.stop()
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
"""Push a frame to the next processor in the pipeline.
Args:
frame: The frame to push.
direction: The direction to push the frame.
"""
# The BotStartedSpeakingFrame and BotStoppedSpeakingFrame are created inside BaseOutputTransport
# This is a workaround, so we can more reliably be aware when the bot has started or stopped speaking
if direction == FrameDirection.DOWNSTREAM:
if isinstance(frame, BotStartedSpeakingFrame):
if self._event_id is not None:
logger.warning("self._event_id is already defined!")
self._event_id = str(frame.id)
elif isinstance(frame, BotStoppedSpeakingFrame):
await self._client.agent_speak_end(self._event_id)
self._event_id = None
await super().push_frame(frame, direction)
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames and handle interruptions.
Handles various types of frames including interruption events and user speaking states.
Updates the HeyGen client state based on the received frames.
Args:
frame: The frame to process
direction: The direction of frame flow in the pipeline
Note:
Special handling is implemented for:
- InterruptionFrame: Triggers interruption of current speech
- UserStartedSpeakingFrame: Initiates agent listening mode
- UserStoppedSpeakingFrame: Stops agent listening mode
"""
await super().process_frame(frame, direction)
if isinstance(frame, InterruptionFrame):
await self._client.interrupt(self._event_id)
await self.push_frame(frame, direction)
if isinstance(frame, UserStartedSpeakingFrame):
await self._client.start_agent_listening()
await self.push_frame(frame, direction)
elif isinstance(frame, UserStoppedSpeakingFrame):
await self._client.stop_agent_listening()
await self.push_frame(frame, direction)
async def write_audio_frame(self, frame: OutputAudioRawFrame) -> bool:
"""Write an audio frame to the HeyGen transport.
Args:
frame: The audio frame to write.
"""
await self._client.agent_speak(bytes(frame.audio), self._event_id)
return True
class HeyGenParams(TransportParams):
"""Configuration parameters for the HeyGen transport.
Parameters:
audio_in_enabled: Whether to enable audio input from participants.
audio_out_enabled: Whether to enable audio output to participants.
"""
audio_in_enabled: bool = True
audio_out_enabled: bool = True
class HeyGenTransport(BaseTransport):
"""Transport implementation for HeyGen video calls.
When used, the Pipecat bot joins the same virtual room as the HeyGen Avatar and the user.
This is achieved by using `HeyGenTransport`, which initiates the conversation via
`HeyGenApi` and obtains a room URL that all participants connect to.
"""
def __init__(
self,
session: aiohttp.ClientSession,
api_key: str,
params: HeyGenParams = HeyGenParams(),
input_name: Optional[str] = None,
output_name: Optional[str] = None,
session_request: NewSessionRequest = NewSessionRequest(
avatar_id="Shawn_Therapist_public",
version="v2",
),
):
"""Initialize the HeyGen transport.
Sets up a new HeyGen transport instance with the specified configuration for
handling video calls between the Pipecat bot and HeyGen Avatar.
Args:
session: aiohttp session for making async HTTP requests
api_key: HeyGen API key for authentication
params: HeyGen-specific configuration parameters (default: HeyGenParams())
input_name: Optional custom name for the input transport
output_name: Optional custom name for the output transport
session_request: Configuration for the HeyGen session (default: uses Shawn_Therapist_public avatar)
Note:
The transport will automatically join the same virtual room as the HeyGen Avatar
and user through the HeyGenClient, which handles session initialization via HeyGenApi.
"""
super().__init__(input_name=input_name, output_name=output_name)
self._params = params
self._client = HeyGenClient(
api_key=api_key,
session=session,
params=params,
session_request=session_request,
callbacks=HeyGenCallbacks(
on_participant_connected=self._on_participant_connected,
on_participant_disconnected=self._on_participant_disconnected,
),
)
self._input: Optional[HeyGenInputTransport] = None
self._output: Optional[HeyGenOutputTransport] = None
self._HeyGen_participant_id = None
# Register supported handlers. The user will only be able to register
# these handlers.
self._register_event_handler("on_client_connected")
self._register_event_handler("on_client_disconnected")
async def _on_participant_disconnected(self, participant_id: str):
logger.debug(f"HeyGen participant {participant_id} disconnected")
if participant_id != "heygen":
await self._on_client_disconnected(participant_id)
async def _on_participant_connected(self, participant_id: str):
logger.debug(f"HeyGen participant {participant_id} connected")
if participant_id != "heygen":
await self._on_client_connected(participant_id)
if self._input:
await self._input.start_capturing_audio(participant_id)
def input(self) -> FrameProcessor:
"""Get the input transport for receiving media and events.
Returns:
The HeyGen input transport instance.
"""
if not self._input:
self._input = HeyGenInputTransport(client=self._client, params=self._params)
return self._input
def output(self) -> FrameProcessor:
"""Get the output transport for sending media and events.
Returns:
The HeyGen output transport instance.
"""
if not self._output:
self._output = HeyGenOutputTransport(client=self._client, params=self._params)
return self._output
async def _on_client_connected(self, participant: Any):
"""Handle client connected events."""
await self._call_event_handler("on_client_connected", participant)
async def _on_client_disconnected(self, participant: Any):
"""Handle client disconnected events."""
await self._call_event_handler("on_client_disconnected", participant)

View File

@@ -66,7 +66,7 @@ class SmallWebRTCCallbacks(BaseModel):
on_client_disconnected: Called when a client disconnects.
"""
on_app_message: Callable[[Any], Awaitable[None]]
on_app_message: Callable[[Any, str], Awaitable[None]]
on_client_connected: Callable[[SmallWebRTCConnection], Awaitable[None]]
on_client_disconnected: Callable[[SmallWebRTCConnection], Awaitable[None]]
@@ -254,7 +254,7 @@ class SmallWebRTCClient:
@self._webrtc_connection.event_handler("app-message")
async def on_app_message(connection: SmallWebRTCConnection, message: Any):
await self._handle_app_message(message)
await self._handle_app_message(message, connection.pc_id)
def _convert_frame(self, frame_array: np.ndarray, format_name: str) -> np.ndarray:
"""Convert a video frame to RGB format based on the input format.
@@ -512,9 +512,9 @@ class SmallWebRTCClient:
if not self._closing:
await self._callbacks.on_client_disconnected(self._webrtc_connection)
async def _handle_app_message(self, message: Any):
async def _handle_app_message(self, message: Any, sender: str):
"""Handle incoming application messages."""
await self._callbacks.on_app_message(message)
await self._callbacks.on_app_message(message, sender)
def _can_send(self):
"""Check if the connection is ready for sending data."""
@@ -935,11 +935,11 @@ class SmallWebRTCTransport(BaseTransport):
if self._output:
await self._output.queue_frame(frame, FrameDirection.DOWNSTREAM)
async def _on_app_message(self, message: Any):
async def _on_app_message(self, message: Any, sender: str):
"""Handle incoming application messages."""
if self._input:
await self._input.push_app_message(message)
await self._call_event_handler("on_app_message", message)
await self._call_event_handler("on_app_message", message, sender)
async def _on_client_connected(self, webrtc_connection):
"""Handle client connection events."""

View File

@@ -221,6 +221,7 @@ class TavusTransportClient:
),
on_joined=self._on_joined,
on_left=self._on_left,
on_before_leave=partial(self._on_handle_callback, "on_before_leave"),
on_error=partial(self._on_handle_callback, "on_error"),
on_app_message=partial(self._on_handle_callback, "on_app_message"),
on_call_state_updated=partial(self._on_handle_callback, "on_call_state_updated"),

View File

@@ -0,0 +1,117 @@
#
# Copyright (c) 2024-2025 Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import struct
import unittest
from pipecat.frames.frames import InputAudioRawFrame, OutputAudioRawFrame, StartFrame
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
class _PassthroughResampler:
async def resample(
self, audio: bytes, in_rate: int, out_rate: int
) -> bytes: # pragma: no cover - trivial
return audio
class TestAudioBufferProcessor(unittest.IsolatedAsyncioTestCase):
async def asyncSetUp(self):
self.processor = AudioBufferProcessor(sample_rate=16000, num_channels=2, buffer_size=4)
self.processor._input_resampler = _PassthroughResampler()
self.processor._output_resampler = _PassthroughResampler()
self.processor._update_sample_rate(StartFrame(audio_out_sample_rate=16000))
await self.processor.start_recording()
async def asyncTearDown(self):
if getattr(self.processor, "_recording", False):
await self.processor.stop_recording()
await self.processor.cleanup()
async def test_flush_user_audio_pads_bot_track(self):
user_audio = struct.pack("<hh", 1000, -1000)
audio_event = asyncio.Event()
track_event = asyncio.Event()
captured = {}
async def on_audio_data(_, audio: bytes, sample_rate: int, num_channels: int):
captured["merged"] = (audio, sample_rate, num_channels)
audio_event.set()
async def on_track_audio_data(
_, user: bytes, bot: bytes, sample_rate: int, num_channels: int
):
captured["tracks"] = (user, bot, sample_rate, num_channels)
track_event.set()
self.processor.add_event_handler("on_audio_data", on_audio_data)
self.processor.add_event_handler("on_track_audio_data", on_track_audio_data)
frame = InputAudioRawFrame(audio=user_audio, sample_rate=16000, num_channels=1)
await self.processor._process_recording(frame)
await asyncio.wait_for(audio_event.wait(), timeout=1)
await asyncio.wait_for(track_event.wait(), timeout=1)
merged_audio, merged_sr, merged_channels = captured["merged"]
user_track, bot_track, track_sr, track_channels = captured["tracks"]
self.assertEqual(merged_sr, 16000)
self.assertEqual(merged_channels, 2)
self.assertEqual(track_sr, 16000)
self.assertEqual(track_channels, 2)
self.assertEqual(user_track, user_audio)
self.assertEqual(bot_track, b"\x00" * len(user_audio))
self.assertEqual(len(merged_audio), len(user_audio) * 2)
self.assertEqual(merged_audio[0:2], user_audio[0:2])
self.assertEqual(merged_audio[2:4], b"\x00\x00")
self.assertEqual(merged_audio[4:6], user_audio[2:4])
self.assertEqual(merged_audio[6:8], b"\x00\x00")
self.assertEqual(len(self.processor._user_audio_buffer), 0)
self.assertEqual(len(self.processor._bot_audio_buffer), 0)
async def test_flush_bot_audio_pads_user_track(self):
bot_audio = struct.pack("<hh", -800, 400)
audio_event = asyncio.Event()
track_event = asyncio.Event()
captured = {}
async def on_audio_data(_, audio: bytes, sample_rate: int, num_channels: int):
captured["merged"] = (audio, sample_rate, num_channels)
audio_event.set()
async def on_track_audio_data(
_, user: bytes, bot: bytes, sample_rate: int, num_channels: int
):
captured["tracks"] = (user, bot, sample_rate, num_channels)
track_event.set()
self.processor.add_event_handler("on_audio_data", on_audio_data)
self.processor.add_event_handler("on_track_audio_data", on_track_audio_data)
frame = OutputAudioRawFrame(audio=bot_audio, sample_rate=16000, num_channels=1)
await self.processor._process_recording(frame)
await asyncio.wait_for(audio_event.wait(), timeout=1)
await asyncio.wait_for(track_event.wait(), timeout=1)
merged_audio, merged_sr, merged_channels = captured["merged"]
user_track, bot_track, track_sr, track_channels = captured["tracks"]
self.assertEqual(merged_sr, 16000)
self.assertEqual(merged_channels, 2)
self.assertEqual(track_sr, 16000)
self.assertEqual(track_channels, 2)
self.assertEqual(user_track, b"\x00" * len(bot_audio))
self.assertEqual(bot_track, bot_audio)
self.assertEqual(len(merged_audio), len(bot_audio) * 2)
self.assertEqual(merged_audio[0:2], b"\x00\x00")
self.assertEqual(merged_audio[2:4], bot_audio[0:2])
self.assertEqual(merged_audio[4:6], b"\x00\x00")
self.assertEqual(merged_audio[6:8], bot_audio[2:4])
self.assertEqual(len(self.processor._user_audio_buffer), 0)
self.assertEqual(len(self.processor._bot_audio_buffer), 0)

12
uv.lock generated
View File

@@ -4484,7 +4484,7 @@ requires-dist = [
{ name = "aioboto3", marker = "extra == 'aws'", specifier = "~=15.0.0" },
{ name = "aiofiles", specifier = ">=24.1.0,<25" },
{ name = "aiohttp", specifier = ">=3.11.12,<4" },
{ name = "aiortc", marker = "extra == 'webrtc'", specifier = "~=1.13.0" },
{ name = "aiortc", marker = "extra == 'webrtc'", specifier = ">=1.13.0,<2" },
{ name = "anthropic", marker = "extra == 'anthropic'", specifier = "~=0.49.0" },
{ name = "audioop-lts", marker = "python_full_version >= '3.13'", specifier = "~=0.2.1" },
{ name = "aws-sdk-bedrock-runtime", marker = "python_full_version >= '3.12' and extra == 'aws-nova-sonic'", specifier = "~=0.0.2" },
@@ -4522,7 +4522,7 @@ requires-dist = [
{ name = "onnxruntime", marker = "extra == 'local-smart-turn-v3'", specifier = ">=1.20.1,<2" },
{ name = "onnxruntime", marker = "extra == 'silero'", specifier = ">=1.20.1,<2" },
{ name = "openai", specifier = ">=1.74.0,<=1.99.1" },
{ name = "opencv-python", marker = "extra == 'webrtc'", specifier = "~=4.11.0.86" },
{ name = "opencv-python", marker = "extra == 'webrtc'", specifier = ">=4.11.0.86,<5" },
{ name = "openpipe", marker = "extra == 'openpipe'", specifier = "~=4.50.0" },
{ name = "opentelemetry-api", marker = "extra == 'tracing'", specifier = ">=1.33.0" },
{ name = "opentelemetry-instrumentation", marker = "extra == 'tracing'", specifier = ">=0.54b0" },
@@ -4557,7 +4557,7 @@ requires-dist = [
{ name = "python-dotenv", marker = "extra == 'runner'", specifier = ">=1.0.0,<2.0.0" },
{ name = "pyvips", extras = ["binary"], marker = "extra == 'moondream'", specifier = "~=3.0.0" },
{ name = "resampy", specifier = "~=0.4.3" },
{ name = "sentry-sdk", marker = "extra == 'sentry'", specifier = "~=2.23.1" },
{ name = "sentry-sdk", marker = "extra == 'sentry'", specifier = ">=2.28.0,<3" },
{ name = "simli-ai", marker = "extra == 'simli'", specifier = "~=0.1.10" },
{ name = "soundfile", marker = "extra == 'soundfile'", specifier = "~=0.13.0" },
{ name = "soxr", specifier = "~=0.5.0" },
@@ -6389,15 +6389,15 @@ wheels = [
[[package]]
name = "sentry-sdk"
version = "2.23.1"
version = "2.38.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "certifi" },
{ name = "urllib3" },
]
sdist = { url = "https://files.pythonhosted.org/packages/60/fd/2c5f7161dbea1fa03381f139c443b4524f3a15d58e50c96a65d19f454ba2/sentry_sdk-2.23.1.tar.gz", hash = "sha256:2288320465065f3f056630ce55936426204f96f63f1208edb79e033ed03774db", size = 316248, upload-time = "2025-03-17T12:52:34.14Z" }
sdist = { url = "https://files.pythonhosted.org/packages/b2/22/60fd703b34d94d216b2387e048ac82de3e86b63bc28869fb076f8bb0204a/sentry_sdk-2.38.0.tar.gz", hash = "sha256:792d2af45e167e2f8a3347143f525b9b6bac6f058fb2014720b40b84ccbeb985", size = 348116, upload-time = "2025-09-15T15:00:37.846Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/4e/00/9a9a2ab9020ee824d787f7e82a539305bf926393fe139baedbcf34356770/sentry_sdk-2.23.1-py2.py3-none-any.whl", hash = "sha256:42ef3a6cc1db3d22cb2ab24163d75b23f291ad9892b1a8c44075ce809a32b191", size = 336327, upload-time = "2025-03-17T12:52:32.176Z" },
{ url = "https://files.pythonhosted.org/packages/7a/84/bde4c4bbb269b71bc09316af8eb00da91f67814d40337cc12ef9c8742541/sentry_sdk-2.38.0-py2.py3-none-any.whl", hash = "sha256:2324aea8573a3fa1576df7fb4d65c4eb8d9929c8fa5939647397a07179eef8d0", size = 370346, upload-time = "2025-09-15T15:00:35.821Z" },
]
[[package]]