Compare commits

..

1 Commits

Author | SHA1 | Message | Date
Mark Backman | 4699dc2345 | Improve TTSService handling of long LLM token outputs | 2025-11-24 19:35:00 -05:00
13 changed files with 97 additions and 950 deletions

View File

@@ -9,8 +9,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Added `LiveKitRESTHelper` utility class for managing LiveKit rooms via REST API.
- Added `DeepgramSageMakerSTTService` which connects to a SageMaker hosted
Deepgram STT model. Added `07c-interruptible-deepgram-sagemaker.py`
foundational example.
@@ -90,6 +88,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
- Improved `TTSService` to properly handle buffered sentences at the end of LLM
responses. Previously, when an LLM response ended, any complete sentences
remaining in the aggregator's buffer would be sent to TTS as one large
chunk. Now `TTSService` continues processing aggregation by repeatedly calling
`aggregate("")` until all buffered text has been processed, ensuring each
sentence is sent to TTS individually for better interruption points.
- Updated `daily-python` to 0.22.0.
- `BaseTextAggregator` changes:
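
The `TTSService` entry above describes draining the aggregator by calling `aggregate("")` until nothing complete is left. Below is a minimal sketch of that idea, not the actual pipecat code: `ToySentenceAggregator` and `flush_at_response_end` are hypothetical names, and the toy aggregator returns plain strings, whereas the real aggregator returns objects with `.text` and `.type`.

```python
import asyncio
import re
from typing import List, Optional


class ToySentenceAggregator:
    """Hypothetical stand-in for a sentence aggregator (not pipecat's class)."""

    def __init__(self) -> None:
        self._buffer = ""

    @property
    def text(self) -> str:
        # Whatever is still buffered, complete or not.
        return self._buffer.strip()

    async def aggregate(self, text: str) -> Optional[str]:
        """Append text, then return at most one complete sentence, else None."""
        self._buffer += text
        # A sentence ends at '.', '!' or '?' followed by whitespace or end of buffer.
        match = re.search(r"[.!?](?=\s|$)", self._buffer)
        if not match:
            return None
        sentence = self._buffer[: match.end()].strip()
        self._buffer = self._buffer[match.end() :]
        return sentence

    async def reset(self) -> None:
        self._buffer = ""


async def flush_at_response_end(aggregator: ToySentenceAggregator) -> List[str]:
    """Drain complete sentences one by one, then emit any partial remainder."""
    chunks: List[str] = []
    pending = await aggregator.aggregate("")
    while pending:
        chunks.append(pending)  # each sentence becomes its own TTS request
        pending = await aggregator.aggregate("")
    remaining = aggregator.text
    if remaining:
        chunks.append(remaining)  # trailing text without sentence punctuation
    await aggregator.reset()
    return chunks


async def demo() -> List[str]:
    aggregator = ToySentenceAggregator()
    # One large LLM chunk containing several sentences plus a partial one.
    first = await aggregator.aggregate("Hello! I am Doug. Nice to meet you! So what")
    return [first] + await flush_at_response_end(aggregator)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Each returned chunk corresponds to one TTS request, so an interruption can land between sentences instead of after one large block of audio.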

View File

@@ -1,103 +0,0 @@
# TurnAwareTranscriptProcessor Example
## Overview
The `TurnAwareTranscriptProcessor` combines user and assistant transcript tracking with turn boundary detection. It correctly handles interruptions by only capturing what was actually spoken.
## Basic Usage
```python
from pipecat.pipeline.pipeline import Pipeline
from pipecat.processors.transcript_processor import TurnAwareTranscriptProcessor

# Create the processor
turn_processor = TurnAwareTranscriptProcessor()


# Register event handlers
@turn_processor.event_handler("on_turn_started")
async def handle_turn_started(processor, turn_number):
    print(f"Turn {turn_number} started")


@turn_processor.event_handler("on_turn_ended")
async def handle_turn_ended(processor, turn_number, user_text, assistant_text, was_interrupted):
    print(f"\nTurn {turn_number} ended:")
    print(f"  User said: {user_text}")
    print(f"  Assistant said: {assistant_text}")
    print(f"  Was interrupted: {was_interrupted}")


@turn_processor.event_handler("on_transcript_update")
async def handle_transcript_update(processor, frame):
    for msg in frame.messages:
        print(f"[{msg.role}]: {msg.content}")


# Add to pipeline (transport, stt, context_aggregator, llm, and tts are
# assumed to be created elsewhere in the bot setup)
pipeline = Pipeline([
    transport.input(),
    stt,
    turn_processor,  # Process transcripts and track turns
    context_aggregator.user(),
    llm,
    tts,
    transport.output(),
    context_aggregator.assistant(),
])
```
## Features
1. **Turn Boundary Detection**: Automatically detects when turns start and end based on user and bot speaking patterns
2. **Interruption Handling**: Correctly captures only what was actually spoken when interruptions occur
3. **Real-time Transcripts**: Emits transcript messages for both user and assistant speech
4. **Turn Events**: Provides start/end events with accumulated transcripts for each turn
## Events
### on_turn_started
Emitted when a new turn begins (user starts speaking).
**Handler signature**: `async def handler(processor, turn_number)`
### on_turn_ended
Emitted when a turn ends with accumulated transcripts.
**Handler signature**: `async def handler(processor, turn_number, user_transcript, assistant_transcript, was_interrupted)`
### on_transcript_update
Inherited from `BaseTranscriptProcessor`, emitted for individual transcript messages.
**Handler signature**: `async def handler(processor, frame)`
## Turn Logic
- Turns start when the user begins speaking (`UserStartedSpeakingFrame`)
- Turns end when:
- The user starts speaking again (previous turn ends, new turn starts)
- The bot is interrupted (`InterruptionFrame`)
- The pipeline ends (`EndFrame`/`CancelFrame`)
## Integration with OpenTelemetry
You can use turn events to enrich OpenTelemetry spans:
```python
from opentelemetry import trace

from pipecat.observers.turn_tracking_observer import TurnTrackingObserver
from pipecat.processors.transcript_processor import TurnAwareTranscriptProcessor
from pipecat.utils.tracing.turn_trace_observer import TurnTraceObserver

turn_tracker = TurnTrackingObserver()
turn_tracer = TurnTraceObserver(turn_tracker)
turn_processor = TurnAwareTranscriptProcessor()


@turn_processor.event_handler("on_turn_ended")
async def add_transcripts_to_span(processor, turn_number, user_text, assistant_text, interrupted):
    # Get the current span and add transcript data. get_current_span() always
    # returns a span object, so check that it is actually recording.
    current_span = trace.get_current_span()
    if current_span.is_recording():
        current_span.set_attribute("turn.user_text", user_text)
        current_span.set_attribute("turn.assistant_text", assistant_text)
```
## Notes
- The processor handles async frame processing correctly by delaying turn end until frames are processed
- Works with word-level timestamps from TTS services like Cartesia
- Accumulates both user (`TranscriptionFrame`) and assistant (`TTSTextFrame`) speech
- Emits individual transcript messages in addition to turn-level aggregation
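
The rules in the Turn Logic section can be sketched as a small state machine. This is an illustrative model only, not the processor itself: `TurnTracker` and the event names are hypothetical, the events stand in for the frames named above, and treating a cancel as an interrupted ending follows the implementation removed in this diff.

```python
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class TurnTracker:
    """Hypothetical model of the turn rules; events are plain strings."""

    turn_number: int = 0
    active: bool = False
    # (turn_number, was_interrupted) for every turn that has ended
    ended: List[Tuple[int, bool]] = field(default_factory=list)

    def on_event(self, event: str) -> None:
        if event == "user_started_speaking":
            # A new user utterance closes the previous turn, then opens a new one.
            self._end(was_interrupted=False)
            self.turn_number += 1
            self.active = True
        elif event == "interruption":
            self._end(was_interrupted=True)
        elif event == "end":
            self._end(was_interrupted=False)
        elif event == "cancel":
            self._end(was_interrupted=True)

    def _end(self, was_interrupted: bool) -> None:
        # Ending is a no-op when no turn is active, so events never double-count.
        if self.active:
            self.ended.append((self.turn_number, was_interrupted))
            self.active = False


def run_events(events: List[str]) -> List[Tuple[int, bool]]:
    tracker = TurnTracker()
    for event in events:
        tracker.on_event(event)
    return tracker.ended


if __name__ == "__main__":
    print(run_events(["user_started_speaking", "interruption", "user_started_speaking", "end"]))
```

Because `_end` ignores calls made while no turn is active, an interruption followed immediately by a new user utterance produces exactly one ended turn.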

View File

@@ -49,7 +49,7 @@ aic = [ "aic-sdk~=1.1.0" ]
anthropic = [ "anthropic~=0.49.0" ]
assemblyai = [ "pipecat-ai[websockets-base]" ]
asyncai = [ "pipecat-ai[websockets-base]" ]
-aws = [ "aioboto3~=15.5.0", "pipecat-ai[websockets-base]" ]
+aws = [ "aioboto3~=15.0.0", "pipecat-ai[websockets-base]" ]
aws-nova-sonic = [ "aws_sdk_bedrock_runtime~=0.2.0; python_version>='3.12'" ]
azure = [ "azure-cognitiveservices-speech~=1.42.0"]
cartesia = [ "cartesia~=2.0.3", "pipecat-ai[websockets-base]" ]
@@ -72,7 +72,7 @@ inworld = []
koala = [ "pvkoala~=2.0.3" ]
krisp = [ "pipecat-ai-krisp~=0.4.0" ]
langchain = [ "langchain~=0.3.20", "langchain-community~=0.3.20", "langchain-openai~=0.3.9" ]
-livekit = [ "livekit~=1.0.13", "livekit-api~=1.0.5", "tenacity>=8.2.3,<10.0.0", "pyjwt>=2.10.1" ]
+livekit = [ "livekit~=1.0.13", "livekit-api~=1.0.5", "tenacity>=8.2.3,<10.0.0" ]
lmnt = [ "pipecat-ai[websockets-base]" ]
local = [ "pyaudio~=0.2.14" ]
local-smart-turn = [ "coremltools>=8.0", "transformers", "torch>=2.5.0,<3", "torchaudio>=2.5.0,<3" ]

View File

@@ -15,7 +15,6 @@ from typing import List, Optional
from loguru import logger
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
CancelFrame,
EndFrame,
@@ -25,7 +24,6 @@ from pipecat.frames.frames import (
TranscriptionMessage,
TranscriptionUpdateFrame,
TTSTextFrame,
UserStartedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.utils.string import TextPartForConcatenation, concatenate_aggregated_text
@@ -308,267 +306,3 @@ class TranscriptProcessor:
return handler
return decorator
class TurnAwareTranscriptProcessor(BaseTranscriptProcessor):
"""Processes transcripts with turn boundary awareness.
This processor combines user and assistant transcript tracking with turn
detection, emitting events when turns start and end. It correctly handles
interruptions by only capturing what was actually spoken.
Turn boundaries are detected based on:
- User started speaking (UserStartedSpeakingFrame)
- Bot stopped speaking (BotStoppedSpeakingFrame)
- Interruptions (InterruptionFrame)
Events:
on_turn_started: Emitted when a new turn begins.
Handler signature: async def handler(processor, turn_number)
on_turn_ended: Emitted when a turn ends.
Handler signature: async def handler(processor, turn_number,
user_transcript, assistant_transcript,
was_interrupted)
on_transcript_update: Inherited from BaseTranscriptProcessor, emitted for
individual transcript messages.
Example::
turn_processor = TurnAwareTranscriptProcessor()
@turn_processor.event_handler("on_turn_started")
async def handle_turn_started(processor, turn_number):
print(f"Turn {turn_number} started")
@turn_processor.event_handler("on_turn_ended")
async def handle_turn_ended(processor, turn_number, user_text, assistant_text, interrupted):
print(f"Turn {turn_number} ended")
print(f"User said: {user_text}")
print(f"Assistant said: {assistant_text}")
print(f"Was interrupted: {interrupted}")
pipeline = Pipeline([
transport.input(),
stt,
turn_processor,
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
])
"""
def __init__(self, **kwargs):
"""Initialize the turn-aware transcript processor.
Args:
**kwargs: Additional arguments passed to parent class.
"""
super().__init__(**kwargs)
# Turn tracking state
self._turn_number = 0
self._turn_active = False
self._turn_start_time: Optional[str] = None
# Accumulate text for current turn
self._current_turn_user_parts: List[TextPartForConcatenation] = []
self._current_turn_assistant_parts: List[TextPartForConcatenation] = []
# Track bot speaking state
self._bot_is_speaking = False
# Register turn events
self._register_event_handler("on_turn_started")
self._register_event_handler("on_turn_ended")
async def _start_turn(self):
"""Start a new turn."""
if not self._turn_active:
self._turn_number += 1
self._turn_active = True
self._turn_start_time = time_now_iso8601()
self._current_turn_user_parts = []
self._current_turn_assistant_parts = []
logger.debug(f"Turn {self._turn_number} started")
await self._call_event_handler("on_turn_started", self._turn_number)
async def _end_turn(self, was_interrupted: bool = False):
"""End the current turn and emit aggregated transcripts.
Args:
was_interrupted: Whether the turn ended due to an interruption.
"""
if not self._turn_active:
return
# Aggregate user text
user_transcript = ""
if self._current_turn_user_parts:
user_transcript = concatenate_aggregated_text(self._current_turn_user_parts)
# Aggregate assistant text
assistant_transcript = ""
if self._current_turn_assistant_parts:
assistant_transcript = concatenate_aggregated_text(self._current_turn_assistant_parts)
# Emit turn ended event
logger.debug(
f"Turn {self._turn_number} ended (interrupted={was_interrupted}). "
f"User: '{user_transcript}', Assistant: '{assistant_transcript}'"
)
await self._call_event_handler(
"on_turn_ended",
self._turn_number,
user_transcript,
assistant_transcript,
was_interrupted,
)
# Reset turn state
self._turn_active = False
self._current_turn_user_parts = []
self._current_turn_assistant_parts = []
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames for turn-aware transcript tracking.
Handles:
- UserStartedSpeakingFrame: Start new turn
- TranscriptionFrame: Accumulate user speech and emit transcript message
- BotStartedSpeakingFrame: Track bot speaking state
- TTSTextFrame: Accumulate assistant speech
- BotStoppedSpeakingFrame: End turn if no interruption pending
- InterruptionFrame: End turn immediately as interrupted
- EndFrame/CancelFrame: End any active turn
Args:
frame: Input frame to process.
direction: Frame processing direction.
"""
await super().process_frame(frame, direction)
if isinstance(frame, UserStartedSpeakingFrame):
# User started speaking
if self._bot_is_speaking:
# This is an interruption - end the current turn with what was spoken
if self._current_turn_assistant_parts:
assistant_content = concatenate_aggregated_text(
self._current_turn_assistant_parts
)
if assistant_content:
message = TranscriptionMessage(
role="assistant",
content=assistant_content,
timestamp=self._turn_start_time or time_now_iso8601(),
)
await self._emit_update([message])
await self._end_turn(was_interrupted=True)
self._bot_is_speaking = False
elif self._turn_active:
# Previous turn is ending normally (bot finished speaking)
if self._current_turn_assistant_parts:
assistant_content = concatenate_aggregated_text(
self._current_turn_assistant_parts
)
if assistant_content:
message = TranscriptionMessage(
role="assistant",
content=assistant_content,
timestamp=self._turn_start_time or time_now_iso8601(),
)
await self._emit_update([message])
await self._end_turn(was_interrupted=False)
# Start a new turn
await self._start_turn()
await self.push_frame(frame, direction)
elif isinstance(frame, TranscriptionFrame):
# Accumulate user speech for the current turn
if self._turn_active:
self._current_turn_user_parts.append(
TextPartForConcatenation(frame.text, includes_inter_part_spaces=True)
)
# Also emit individual transcript message
message = TranscriptionMessage(
role="user",
user_id=frame.user_id,
content=frame.text,
timestamp=frame.timestamp,
)
await self._emit_update([message])
await self.push_frame(frame, direction)
elif isinstance(frame, BotStartedSpeakingFrame):
# Bot started speaking
self._bot_is_speaking = True
await self.push_frame(frame, direction)
elif isinstance(frame, TTSTextFrame):
# Accumulate assistant speech for the current turn
if self._turn_active:
self._current_turn_assistant_parts.append(
TextPartForConcatenation(
frame.text, includes_inter_part_spaces=frame.includes_inter_frame_spaces
)
)
await self.push_frame(frame, direction)
elif isinstance(frame, BotStoppedSpeakingFrame):
# Bot stopped speaking - just mark it, don't end turn yet
# Turn will end when next user speaks or pipeline ends
self._bot_is_speaking = False
await self.push_frame(frame, direction)
elif isinstance(frame, InterruptionFrame):
# Emit assistant transcript message with what was spoken before interruption
if self._current_turn_assistant_parts:
assistant_content = concatenate_aggregated_text(self._current_turn_assistant_parts)
if assistant_content:
message = TranscriptionMessage(
role="assistant",
content=assistant_content,
timestamp=self._turn_start_time or time_now_iso8601(),
)
await self._emit_update([message])
# Push frame first to ensure proper cleanup
await self.push_frame(frame, direction)
# End turn as interrupted
await self._end_turn(was_interrupted=True)
self._bot_is_speaking = False
elif isinstance(frame, (EndFrame, CancelFrame)):
# Pipeline ending - finalize any active turn
if self._turn_active:
# Emit any pending assistant transcript (allow time for TTSTextFrames to be processed)
# Give a brief moment for any pending frames to process
import asyncio
await asyncio.sleep(0.001)
if self._current_turn_assistant_parts:
assistant_content = concatenate_aggregated_text(
self._current_turn_assistant_parts
)
if assistant_content:
message = TranscriptionMessage(
role="assistant",
content=assistant_content,
timestamp=self._turn_start_time or time_now_iso8601(),
)
await self._emit_update([message])
await self._end_turn(was_interrupted=isinstance(frame, CancelFrame))
await self.push_frame(frame, direction)
else:
await self.push_frame(frame, direction)

View File

@@ -8,7 +8,6 @@ import sys
from pipecat.services import DeprecatedModuleProxy
from .agent_core import *
from .llm import *
from .nova_sonic import *
from .sagemaker import *

View File

@@ -1,258 +0,0 @@
#
# Copyright (c) 2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""AWS AgentCore Processor Module.
This module defines the AWSAgentCoreProcessor, which invokes agents hosted on
Amazon Bedrock AgentCore Runtime and streams their responses as LLMTextFrames.
"""
import asyncio
import json
import os
from typing import Callable, Optional
import aioboto3
from loguru import logger
from pipecat.frames.frames import (
Frame,
LLMContextFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMTextFrame,
)
from pipecat.processors.aggregators.llm_context import LLMContext, LLMSpecificMessage
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
def default_context_to_payload_transformer(
context: LLMContext | OpenAILLMContext,
) -> Optional[str]:
"""Default transformer to create AgentCore payload from LLM context.
Extracts the latest user or system message text and wraps it in {"prompt": "<text>"}.
Args:
context: The LLM context containing conversation messages.
Returns:
A JSON string payload for AgentCore, or None if no valid message found.
"""
messages = context.messages
if not messages:
return None
last_message = messages[-1]
if isinstance(last_message, LLMSpecificMessage) or last_message.get("role") not in (
"user",
"system",
):
return None
content = last_message.get("content")
if not content:
return None
if isinstance(content, str):
prompt = content
elif isinstance(content, list):
prompt = " ".join([part.get("text", "") for part in content])
else:
return None
return json.dumps({"prompt": prompt})
def default_response_to_output_transformer(response_line: str) -> Optional[str]:
"""Default transformer to extract output text from AgentCore response.
Expects responses with {"response": "<text>"} format.
Args:
response_line: The raw response line from AgentCore (without "data: " prefix).
Returns:
The extracted output text, or None if no text found.
"""
response_json = json.loads(response_line)
return response_json.get("response")
class AWSAgentCoreProcessor(FrameProcessor):
"""Processor that runs an Amazon Bedrock AgentCore agent.
Input:
- LLMContextFrame: Supplies a context used to invoke the agent.
Output:
- LLMTextFrame: The agent's text response(s).
A single agent invocation may result in multiple text frames.
This processor transforms the input context to a payload for the AgentCore
agent, and transforms the agent's response(s) into output text frame(s). Both
mappings are configurable via transformers. Below is the default behavior.
Input transformer (context_to_payload_transformer):
- Grabs the latest user or system message (if it's the latest message)
- Extracts its text content
- Constructs a payload that looks like {"prompt": "<text>"}
Output transformer (response_to_output_transformer):
- Expects responses that look like {"response": "<text>"}
- Extracts the text for use in the LLMTextFrame(s)
"""
def __init__(
self,
agentArn: str,
aws_access_key: Optional[str] = None,
aws_secret_key: Optional[str] = None,
aws_session_token: Optional[str] = None,
aws_region: Optional[str] = None,
context_to_payload_transformer: Optional[
Callable[[LLMContext | OpenAILLMContext], Optional[str]]
] = None,
response_to_output_transformer: Optional[Callable[[str], Optional[str]]] = None,
**kwargs,
):
"""Initialize the AWS AgentCore processor.
Args:
agentArn: The Amazon Resource Name (ARN) of the agent.
aws_access_key: AWS access key ID. If None, uses default credentials.
aws_secret_key: AWS secret access key. If None, uses default credentials.
aws_session_token: AWS session token for temporary credentials.
aws_region: AWS region.
context_to_payload_transformer: Optional callable to transform
LLMContext into AgentCore payload string. If None, uses
default_context_to_payload_transformer.
response_to_output_transformer: Optional callable to extract output text
from AgentCore response. If None, uses
default_response_to_output_transformer.
**kwargs: Additional arguments passed to parent FrameProcessor.
"""
super().__init__(**kwargs)
self._agentArn = agentArn
self._aws_session = aioboto3.Session()
# Store AWS session parameters for creating client in async context
self._aws_params = {
"aws_access_key_id": aws_access_key or os.getenv("AWS_ACCESS_KEY_ID"),
"aws_secret_access_key": aws_secret_key or os.getenv("AWS_SECRET_ACCESS_KEY"),
"aws_session_token": aws_session_token or os.getenv("AWS_SESSION_TOKEN"),
"region_name": aws_region or os.getenv("AWS_REGION", "us-east-1"),
}
# Set transformers with defaults
self._context_to_payload_transformer = (
context_to_payload_transformer or default_context_to_payload_transformer
)
self._response_to_output_transformer = (
response_to_output_transformer or default_response_to_output_transformer
)
# State for managing output response bookends
self._output_response_open = False
self._last_text_frame_time: Optional[float] = None
self._close_task: Optional[asyncio.Task] = None
self._output_response_timeout = 1.0 # seconds
async def _close_output_response_after_timeout(self):
"""Close the output response after timeout if no new text frames arrive."""
await asyncio.sleep(self._output_response_timeout)
if self._output_response_open:
self._output_response_open = False
await self.push_frame(LLMFullResponseEndFrame())
async def _push_text_frame(self, text: str):
"""Push a text frame, managing output response bookends."""
# Cancel any pending close task
if self._close_task and not self._close_task.done():
await self.cancel_task(self._close_task)
# Open output response if needed
if not self._output_response_open:
await self.push_frame(LLMFullResponseStartFrame())
self._output_response_open = True
# Push the text frame
await self.push_frame(LLMTextFrame(text))
self._last_text_frame_time = asyncio.get_event_loop().time()
# Schedule closing the output response after timeout
self._close_task = self.create_task(self._close_output_response_after_timeout())
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process incoming frames and handle LLM message frames.
Args:
frame: The incoming frame to process.
direction: The direction of frame flow in the pipeline.
"""
await super().process_frame(frame, direction)
if isinstance(frame, (LLMContextFrame, OpenAILLMContextFrame)):
# Create payload to invoke AgentCore agent
payload = self._context_to_payload_transformer(frame.context)
if not payload:
return
async with self._aws_session.client("bedrock-agentcore", **self._aws_params) as client:
# Invoke the AgentCore agent
response = await client.invoke_agent_runtime(
agentRuntimeArn=self._agentArn, payload=payload.encode()
)
# Determine if this is a streamed multi-part response, which
# will affect our parsing
is_multi_part_response = "text/event-stream" in response.get("contentType", "")
# Handle each response part (there may be one, for single
# responses, or multiple, for streamed multi-part responses)
async for part in response.get("response", []):
part_string = part.decode("utf-8")
# In streamed multi-part responses, each part might have
# one or more lines, each of which starts with "data: ".
# Treat each line as a response.
if is_multi_part_response:
for line in part_string.split("\n"):
# Get response text from this line
if not line:
continue
if not line.startswith("data: "):
logger.warning(f"Expected line to start with 'data: ', got: {line}")
continue
line = line[6:] # omit "data: "
# Transform response line to output text
text = self._response_to_output_transformer(line)
if text:
await self._push_text_frame(text)
# In single-part responses, the whole part is one response
# and there's no "data: " prefix
else:
# Transform response part string to output text
text = self._response_to_output_transformer(part_string)
if text:
await self._push_text_frame(text)
# Final close if output response is still open after all parts processed
if self._output_response_open:
if self._close_task and not self._close_task.done():
await self.cancel_task(self._close_task)
self._output_response_open = False
await self.push_frame(LLMFullResponseEndFrame())
else:
await self.push_frame(frame, direction)
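
The streamed-response branch of `process_frame` above splits each part into lines, strips the `data: ` prefix, and passes the rest to the output transformer. A minimal standalone sketch of that parsing follows; `extract_text` and `iter_event_stream_text` are hypothetical helper names, and the sketch silently skips lines without the prefix, where the processor logs a warning.

```python
import json
from typing import Iterator, Optional

DATA_PREFIX = "data: "


def extract_text(response_line: str) -> Optional[str]:
    """Mirror the default output transformer: expect {"response": "<text>"}."""
    return json.loads(response_line).get("response")


def iter_event_stream_text(part: bytes) -> Iterator[str]:
    """Yield the text carried by each 'data: ' line in one streamed part."""
    for line in part.decode("utf-8").split("\n"):
        if not line:
            continue  # blank separator lines between events
        if not line.startswith(DATA_PREFIX):
            continue  # unexpected line; skipped here for brevity
        text = extract_text(line[len(DATA_PREFIX) :])
        if text:
            yield text


if __name__ == "__main__":
    sample = b'data: {"response": "Hi"}\n\ndata: {"response": " there"}\n'
    print(list(iter_event_stream_text(sample)))
```

Like the original loop, this assumes every `data: ` line arrives whole within a single part; a line split across two parts would fail JSON parsing and would need buffering.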

View File

@@ -10,6 +10,7 @@ from pipecat.services import DeprecatedModuleProxy
from .flux import *
from .stt import *
from .stt_sagemaker import *
from .tts import *
sys.modules[__name__] = DeprecatedModuleProxy(globals(), "deepgram", "deepgram.[stt,tts]")

View File

@@ -183,14 +183,6 @@ class DeepgramFluxSTTService(WebsocketSTTService):
"""
await self._connect_websocket()
# Creating the receiver task (only created once during initial connection)
if not self._receive_task:
self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
# Creating the watchdog task (only created once during initial connection)
if not self._watchdog_task:
self._watchdog_task = self.create_task(self._watchdog_task_handler())
async def _disconnect(self):
"""Disconnect from WebSocket and clean up tasks.
@@ -243,6 +235,16 @@ class DeepgramFluxSTTService(WebsocketSTTService):
additional_headers={"Authorization": f"Token {self._api_key}"},
)
# Creating the receiver task
if not self._receive_task:
self._receive_task = self.create_task(
self._receive_task_handler(self._report_error)
)
# Creating the watchdog task
if not self._watchdog_task:
self._watchdog_task = self.create_task(self._watchdog_task_handler())
# Now wait for the connection established event
logger.debug("WebSocket connected, waiting for server confirmation...")
await self._connection_established_event.wait()

View File

@@ -425,16 +425,25 @@ class TTSService(AIService):
 # pause to avoid audio overlapping.
 await self._maybe_pause_frame_processing()
-pending_aggregation = self._text_aggregator.text
+# Drain any remaining complete aggregation units from the aggregator
+# by repeatedly calling aggregate("") until nothing is left
+pending_aggregation = await self._text_aggregator.aggregate("")
+while pending_aggregation:
+    await self._push_tts_frames(
+        AggregatedTextFrame(pending_aggregation.text, pending_aggregation.type)
+    )
+    pending_aggregation = await self._text_aggregator.aggregate("")
+# After draining all complete units, get any remaining partial text
+remaining_text = self._text_aggregator.text
+if remaining_text.text:
+    await self._push_tts_frames(
+        AggregatedTextFrame(remaining_text.text, remaining_text.type)
+    )
+# Reset aggregator state
 await self._text_aggregator.reset()
 self._processing_text = False
-if pending_aggregation.text:
-    await self._push_tts_frames(
-        AggregatedTextFrame(pending_aggregation.text, pending_aggregation.type)
-    )
 if isinstance(frame, LLMFullResponseEndFrame):
     if self._push_text_frames:
         await self.push_frame(frame, direction)

View File

@@ -1,96 +0,0 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""LiveKit REST Helpers.
Methods that wrap the LiveKit API for room management.
"""
import aiohttp
class LiveKitRESTHelper:
"""Helper class for interacting with LiveKit's REST API.
Provides methods for managing LiveKit rooms.
"""
def __init__(
self,
*,
api_key: str,
api_secret: str,
api_url: str = "https://your-livekit-host.com",
aiohttp_session: aiohttp.ClientSession,
):
"""Initialize the LiveKit REST helper.
Args:
api_key: Your LiveKit API key.
api_secret: Your LiveKit API secret.
api_url: LiveKit server URL (e.g. "https://your-livekit-host.com").
aiohttp_session: Async HTTP session for making requests.
"""
self.api_key = api_key
self.api_secret = api_secret
self.api_url = api_url.rstrip("/")
self.aiohttp_session = aiohttp_session
def _create_access_token(self, room_create: bool = True) -> str:
"""Create a signed access token for LiveKit API authentication.
Args:
room_create: Whether to grant roomCreate permission.
Returns:
Signed JWT access token.
"""
import time
import jwt
claims = {
"iss": self.api_key,
"sub": self.api_key,
"nbf": int(time.time()),
"exp": int(time.time()) + 60, # Token valid for 60 seconds
"video": {
"roomCreate": room_create,
},
}
return jwt.encode(claims, self.api_secret, algorithm="HS256")
async def delete_room_by_name(self, room_name: str) -> bool:
"""Delete a LiveKit room by name.
This will forcibly disconnect all participants currently in the room.
Args:
room_name: Name of the room to delete.
Returns:
True if deletion was successful.
Raises:
Exception: If deletion fails.
"""
token = self._create_access_token(room_create=True)
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
}
async with self.aiohttp_session.post(
f"{self.api_url}/twirp/livekit.RoomService/DeleteRoom",
headers=headers,
json={"room": room_name},
) as r:
if r.status != 200:
text = await r.text()
raise Exception(f"Failed to delete room [{room_name}] (status: {r.status}): {text}")
return True

View File

@@ -33,3 +33,48 @@ class TestSimpleTextAggregator(unittest.IsolatedAsyncioTestCase):
assert self.aggregator.text.text == "How are"
aggregate = await self.aggregator.aggregate("you?")
assert aggregate.text == "How are you?"
async def test_word_by_word(self):
"""Test word-by-word token aggregation (e.g., OpenAI)."""
assert await self.aggregator.aggregate("Hello") == None
aggregate = await self.aggregator.aggregate("!")
assert aggregate.text == "Hello!"
assert await self.aggregator.aggregate(" I") == None
assert await self.aggregator.aggregate(" am") == None
aggregate = await self.aggregator.aggregate(" Doug.")
assert aggregate.text == "I am Doug."
assert self.aggregator.text.text == ""
async def test_chunks_with_partial_sentences(self):
"""Test chunks with partial sentences."""
aggregate = await self.aggregator.aggregate("Hey!")
assert aggregate.text == "Hey!"
aggregate = await self.aggregator.aggregate(" Nice to meet you! So")
assert aggregate.text == "Nice to meet you!"
assert self.aggregator.text.text == "So"
assert await self.aggregator.aggregate(" what") == None
aggregate = await self.aggregator.aggregate("'d you like?")
assert aggregate.text == "So what'd you like?"
async def test_multi_sentence_chunk(self):
"""Test chunks with multiple complete sentences."""
aggregate = await self.aggregator.aggregate("Hello! I am Doug. Nice to meet you!")
assert aggregate.text == "Hello!"
# Drain remaining sentences by calling aggregate("")
aggregate = await self.aggregator.aggregate("")
assert aggregate.text == "I am Doug."
aggregate = await self.aggregator.aggregate("")
assert aggregate.text == "Nice to meet you!"
assert await self.aggregator.aggregate("") == None
assert self.aggregator.text.text == ""
async def test_aggregate_empty_with_incomplete(self):
"""Test aggregate('') with incomplete sentence in buffer."""
aggregate = await self.aggregator.aggregate("Hello! I am")
assert aggregate.text == "Hello!"
assert await self.aggregator.aggregate("") == None
assert self.aggregator.text.text == "I am"
async def test_aggregate_empty_buffer(self):
"""Test aggregate('') with empty buffer."""
assert await self.aggregator.aggregate("") == None

View File

@@ -1,189 +0,0 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import unittest
from pipecat.frames.frames import (
AggregationType,
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
InterruptionFrame,
TranscriptionFrame,
TranscriptionUpdateFrame,
TTSTextFrame,
UserStartedSpeakingFrame,
)
from pipecat.processors.transcript_processor import TurnAwareTranscriptProcessor
from pipecat.tests.utils import SleepFrame, run_test
class TestTurnAwareTranscriptProcessor(unittest.IsolatedAsyncioTestCase):
"""Tests for TurnAwareTranscriptProcessor."""
async def test_basic_turn_flow(self):
"""Test basic turn start/end with user and assistant speech."""
processor = TurnAwareTranscriptProcessor()
# Track events
turn_started_calls = []
turn_ended_calls = []
@processor.event_handler("on_turn_started")
async def on_turn_started(proc, turn_number):
turn_started_calls.append(turn_number)
@processor.event_handler("on_turn_ended")
async def on_turn_ended(proc, turn_number, user_text, assistant_text, interrupted):
turn_ended_calls.append(
{
"turn_number": turn_number,
"user_text": user_text,
"assistant_text": assistant_text,
"interrupted": interrupted,
}
)
frames_to_send = [
# Turn 1: User speaks, bot responds
UserStartedSpeakingFrame(),
TranscriptionFrame(text="Hello", user_id="user1", timestamp=""),
SleepFrame(sleep=0.01), # Allow transcription to process
BotStartedSpeakingFrame(),
TTSTextFrame(text="Hi", aggregated_by=AggregationType.WORD),
TTSTextFrame(text=" there", aggregated_by=AggregationType.WORD),
BotStoppedSpeakingFrame(),
SleepFrame(sleep=0.1),
]
await run_test(processor, frames_to_send=frames_to_send)
# Verify events
self.assertEqual(
len(turn_started_calls), 1, f"Expected 1 turn started, got {len(turn_started_calls)}"
)
self.assertEqual(turn_started_calls[0], 1)
self.assertEqual(
len(turn_ended_calls), 1, f"Expected 1 turn ended, got {len(turn_ended_calls)}"
)
self.assertEqual(turn_ended_calls[0]["turn_number"], 1)
self.assertEqual(turn_ended_calls[0]["user_text"], "Hello")
self.assertEqual(turn_ended_calls[0]["assistant_text"], "Hi there")
self.assertFalse(turn_ended_calls[0]["interrupted"])

    async def test_interruption(self):
        """Test turn ending on interruption."""
        processor = TurnAwareTranscriptProcessor()

        # Track events
        turn_ended_calls = []

        @processor.event_handler("on_turn_ended")
        async def on_turn_ended(proc, turn_number, user_text, assistant_text, interrupted):
            turn_ended_calls.append(
                {
                    "turn_number": turn_number,
                    "user_text": user_text,
                    "assistant_text": assistant_text,
                    "interrupted": interrupted,
                }
            )

        frames_to_send = [
            # User speaks
            UserStartedSpeakingFrame(),
            TranscriptionFrame(text="Tell me", user_id="user1", timestamp=""),
            SleepFrame(sleep=0.01),  # Allow transcription to process
            # Bot starts responding
            BotStartedSpeakingFrame(),
            TTSTextFrame(text="Sure", aggregated_by=AggregationType.WORD),
            TTSTextFrame(text=" I", aggregated_by=AggregationType.WORD),
            TTSTextFrame(text=" can", aggregated_by=AggregationType.WORD),
            # User interrupts
            InterruptionFrame(),
            # New turn starts
            UserStartedSpeakingFrame(),
            TranscriptionFrame(text="Wait", user_id="user1", timestamp=""),
            SleepFrame(sleep=0.1),
        ]

        await run_test(processor, frames_to_send=frames_to_send)

        # Verify first turn was interrupted
        self.assertGreaterEqual(
            len(turn_ended_calls), 1, f"Expected at least 1 turn ended, got {len(turn_ended_calls)}"
        )
        first_turn = turn_ended_calls[0]
        self.assertEqual(first_turn["user_text"], "Tell me")
        # Note: In this test flow, InterruptionFrame arrives before TTSTextFrames are processed,
        # so assistant text may be empty. In real scenarios, word timestamps ensure proper capture.
        self.assertIn(first_turn["assistant_text"], ["", "Sure I can", "Sure I can"])
        self.assertTrue(first_turn["interrupted"])

    async def test_multiple_turns(self):
        """Test multiple back-and-forth turns."""
        processor = TurnAwareTranscriptProcessor()

        # Track events
        turn_started_calls = []
        turn_ended_calls = []

        @processor.event_handler("on_turn_started")
        async def on_turn_started(proc, turn_number):
            turn_started_calls.append(turn_number)

        @processor.event_handler("on_turn_ended")
        async def on_turn_ended(proc, turn_number, user_text, assistant_text, interrupted):
            turn_ended_calls.append(
                {
                    "turn_number": turn_number,
                    "user_text": user_text,
                    "assistant_text": assistant_text,
                }
            )

        frames_to_send = [
            # Turn 1
            UserStartedSpeakingFrame(),
            TranscriptionFrame(text="Hi", user_id="user1", timestamp=""),
            SleepFrame(sleep=0.01),  # Allow transcription to process
            BotStartedSpeakingFrame(),
            TTSTextFrame(text="Hello", aggregated_by=AggregationType.WORD),
            BotStoppedSpeakingFrame(),
            SleepFrame(sleep=0.05),
            # Turn 2
            UserStartedSpeakingFrame(),
            TranscriptionFrame(text="How are you", user_id="user1", timestamp=""),
            SleepFrame(sleep=0.01),  # Allow transcription to process
            BotStartedSpeakingFrame(),
            TTSTextFrame(text="I'm", aggregated_by=AggregationType.WORD),
            TTSTextFrame(text=" good", aggregated_by=AggregationType.WORD),
            BotStoppedSpeakingFrame(),
            SleepFrame(sleep=0.1),
        ]

        await run_test(processor, frames_to_send=frames_to_send)

        # Verify multiple turns
        self.assertEqual(
            len(turn_started_calls), 2, f"Expected 2 turns started, got {len(turn_started_calls)}"
        )
        self.assertEqual(turn_started_calls, [1, 2])
        self.assertEqual(
            len(turn_ended_calls), 2, f"Expected 2 turns ended, got {len(turn_ended_calls)}"
        )
        self.assertEqual(turn_ended_calls[0]["turn_number"], 1)
        self.assertEqual(turn_ended_calls[0]["user_text"], "Hi")
        self.assertEqual(turn_ended_calls[0]["assistant_text"], "Hello")
        self.assertEqual(turn_ended_calls[1]["turn_number"], 2)
        self.assertEqual(turn_ended_calls[1]["user_text"], "How are you")
        self.assertEqual(turn_ended_calls[1]["assistant_text"], "I'm good")


if __name__ == "__main__":
    unittest.main()

36
uv.lock generated

@@ -45,20 +45,20 @@ sdist = { url = "https://files.pythonhosted.org/packages/99/83/bf38b95d98c67b8eb
[[package]]
name = "aioboto3"
-version = "15.5.0"
+version = "15.0.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "aiobotocore", extra = ["boto3"] },
{ name = "aiofiles" },
]
-sdist = { url = "https://files.pythonhosted.org/packages/a2/01/92e9ab00f36e2899315f49eefcd5b4685fbb19016c7f19a9edf06da80bb0/aioboto3-15.5.0.tar.gz", hash = "sha256:ea8d8787d315594842fbfcf2c4dce3bac2ad61be275bc8584b2ce9a3402a6979", size = 255069, upload-time = "2025-10-30T13:37:16.122Z" }
+sdist = { url = "https://files.pythonhosted.org/packages/80/d0/ed107e16551ba1b93ddcca9a6bf79580450945268a8bc396530687b3189f/aioboto3-15.0.0.tar.gz", hash = "sha256:dce40b701d1f8e0886dc874d27cd9799b8bf6b32d63743f57e7bef7e4a562756", size = 225278, upload-time = "2025-06-26T16:30:48.967Z" }
wheels = [
-{ url = "https://files.pythonhosted.org/packages/e5/3e/e8f5b665bca646d43b916763c901e00a07e40f7746c9128bdc912a089424/aioboto3-15.5.0-py3-none-any.whl", hash = "sha256:cc880c4d6a8481dd7e05da89f41c384dbd841454fc1998ae25ca9c39201437a6", size = 35913, upload-time = "2025-10-30T13:37:14.549Z" },
+{ url = "https://files.pythonhosted.org/packages/bf/95/d69c744f408e5e4592fe53ed98fc244dd13b83d84cf1f83b2499d98bfcc9/aioboto3-15.0.0-py3-none-any.whl", hash = "sha256:9cf54b3627c8b34bb82eaf43ab327e7027e37f92b1e10dd5cfe343cd512568d0", size = 35785, upload-time = "2025-06-26T16:30:47.444Z" },
]
[[package]]
name = "aiobotocore"
-version = "2.25.1"
+version = "2.23.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "aiohttp" },
@@ -69,9 +69,9 @@ dependencies = [
{ name = "python-dateutil" },
{ name = "wrapt" },
]
-sdist = { url = "https://files.pythonhosted.org/packages/62/94/2e4ec48cf1abb89971cb2612d86f979a6240520f0a659b53a43116d344dc/aiobotocore-2.25.1.tar.gz", hash = "sha256:ea9be739bfd7ece8864f072ec99bb9ed5c7e78ebb2b0b15f29781fbe02daedbc", size = 120560, upload-time = "2025-10-28T22:33:21.787Z" }
+sdist = { url = "https://files.pythonhosted.org/packages/9d/25/4b06ea1214ddf020a28df27dc7136ac9dfaf87929d51e6f6044dd350ed67/aiobotocore-2.23.0.tar.gz", hash = "sha256:0333931365a6c7053aee292fe6ef50c74690c4ae06bb019afdf706cb6f2f5e32", size = 115825, upload-time = "2025-06-12T23:46:38.055Z" }
wheels = [
-{ url = "https://files.pythonhosted.org/packages/95/2a/d275ec4ce5cd0096665043995a7d76f5d0524853c76a3d04656de49f8808/aiobotocore-2.25.1-py3-none-any.whl", hash = "sha256:eb6daebe3cbef5b39a0bb2a97cffbe9c7cb46b2fcc399ad141f369f3c2134b1f", size = 86039, upload-time = "2025-10-28T22:33:19.949Z" },
+{ url = "https://files.pythonhosted.org/packages/ea/43/ccf9b29669cdb09fd4bfc0a8effeb2973b22a0f3c3be4142d0b485975d11/aiobotocore-2.23.0-py3-none-any.whl", hash = "sha256:8202cebbf147804a083a02bc282fbfda873bfdd0065fd34b64784acb7757b66e", size = 84161, upload-time = "2025-06-12T23:46:36.305Z" },
]
[package.optional-dependencies]
@@ -620,30 +620,30 @@ wheels = [
[[package]]
name = "boto3"
-version = "1.40.61"
+version = "1.38.27"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "botocore" },
{ name = "jmespath" },
{ name = "s3transfer" },
]
-sdist = { url = "https://files.pythonhosted.org/packages/ed/f9/6ef8feb52c3cce5ec3967a535a6114b57ac7949fd166b0f3090c2b06e4e5/boto3-1.40.61.tar.gz", hash = "sha256:d6c56277251adf6c2bdd25249feae625abe4966831676689ff23b4694dea5b12", size = 111535, upload-time = "2025-10-28T19:26:57.247Z" }
+sdist = { url = "https://files.pythonhosted.org/packages/e7/96/fc74d8521d2369dd8c412438401ff12e1350a1cd3eab5c758ed3dd5e5f82/boto3-1.38.27.tar.gz", hash = "sha256:94bd7fdd92d5701b362d4df100d21e28f8307a67ff56b6a8b0398119cf22f859", size = 111875, upload-time = "2025-05-30T19:32:41.352Z" }
wheels = [
-{ url = "https://files.pythonhosted.org/packages/61/24/3bf865b07d15fea85b63504856e137029b6acbc73762496064219cdb265d/boto3-1.40.61-py3-none-any.whl", hash = "sha256:6b9c57b2a922b5d8c17766e29ed792586a818098efe84def27c8f582b33f898c", size = 139321, upload-time = "2025-10-28T19:26:55.007Z" },
+{ url = "https://files.pythonhosted.org/packages/43/8b/b2361188bd1e293eede1bc165e2461d390394f71ec0c8c21211c8dabf62c/boto3-1.38.27-py3-none-any.whl", hash = "sha256:95f5fe688795303a8a15e8b7e7f255cadab35eae459d00cc281a4fd77252ea80", size = 139938, upload-time = "2025-05-30T19:32:38.006Z" },
]
[[package]]
name = "botocore"
-version = "1.40.61"
+version = "1.38.27"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "jmespath" },
{ name = "python-dateutil" },
{ name = "urllib3" },
]
-sdist = { url = "https://files.pythonhosted.org/packages/28/a3/81d3a47c2dbfd76f185d3b894f2ad01a75096c006a2dd91f237dca182188/botocore-1.40.61.tar.gz", hash = "sha256:a2487ad69b090f9cccd64cf07c7021cd80ee9c0655ad974f87045b02f3ef52cd", size = 14393956, upload-time = "2025-10-28T19:26:46.108Z" }
+sdist = { url = "https://files.pythonhosted.org/packages/36/5e/67899214ad57f7f26af5bd776ac5eb583dc4ecf5c1e52e2cbfdc200e487a/botocore-1.38.27.tar.gz", hash = "sha256:9788f7efe974328a38cbade64cc0b1e67d27944b899f88cb786ae362973133b6", size = 13919963, upload-time = "2025-05-30T19:32:29.657Z" }
wheels = [
-{ url = "https://files.pythonhosted.org/packages/38/c5/f6ce561004db45f0b847c2cd9b19c67c6bf348a82018a48cb718be6b58b0/botocore-1.40.61-py3-none-any.whl", hash = "sha256:17ebae412692fd4824f99cde0f08d50126dc97954008e5ba2b522eb049238aa7", size = 14055973, upload-time = "2025-10-28T19:26:42.15Z" },
+{ url = "https://files.pythonhosted.org/packages/7e/83/a753562020b69fa90cebc39e8af2c753b24dcdc74bee8355ee3f6cefdf34/botocore-1.38.27-py3-none-any.whl", hash = "sha256:a785d5e9a5eda88ad6ab9ed8b87d1f2ac409d0226bba6ff801c55359e94d91a8", size = 13580545, upload-time = "2025-05-30T19:32:26.712Z" },
]
[[package]]
@@ -4522,7 +4522,6 @@ langchain = [
livekit = [
{ name = "livekit" },
{ name = "livekit-api" },
-{ name = "pyjwt" },
{ name = "tenacity" },
]
lmnt = [
@@ -4666,7 +4665,7 @@ docs = [
requires-dist = [
{ name = "accelerate", marker = "extra == 'moondream'", specifier = "~=1.10.0" },
{ name = "aic-sdk", marker = "extra == 'aic'", specifier = "~=1.1.0" },
-{ name = "aioboto3", marker = "extra == 'aws'", specifier = "~=15.5.0" },
+{ name = "aioboto3", marker = "extra == 'aws'", specifier = "~=15.0.0" },
{ name = "aiofiles", specifier = ">=24.1.0,<25" },
{ name = "aiohttp", specifier = ">=3.11.12,<4" },
{ name = "aiortc", marker = "extra == 'webrtc'", specifier = ">=1.13.0,<2" },
@@ -4740,7 +4739,6 @@ requires-dist = [
{ name = "pyaudio", marker = "extra == 'local'", specifier = "~=0.2.14" },
{ name = "pydantic", specifier = ">=2.10.6,<3" },
{ name = "pygobject", marker = "extra == 'gstreamer'", specifier = "~=3.50.0" },
-{ name = "pyjwt", marker = "extra == 'livekit'", specifier = ">=2.10.1" },
{ name = "pyloudnorm", specifier = "~=0.1.1" },
{ name = "python-dotenv", marker = "extra == 'runner'", specifier = ">=1.0.0,<2.0.0" },
{ name = "pyvips", extras = ["binary"], marker = "extra == 'moondream'", specifier = "~=3.0.0" },
@@ -4765,7 +4763,7 @@ requires-dist = [
{ name = "wait-for2", marker = "python_full_version < '3.12'", specifier = ">=0.4.1" },
{ name = "websockets", marker = "extra == 'websockets-base'", specifier = ">=13.1,<16.0" },
]
-provides-extras = ["aic", "anthropic", "assemblyai", "asyncai", "aws", "aws-nova-sonic", "azure", "cartesia", "cerebras", "daily", "deepgram", "deepseek", "elevenlabs", "fal", "fireworks", "fish", "gladia", "google", "grok", "groq", "gstreamer", "heygen", "hume", "inworld", "koala", "krisp", "langchain", "livekit", "lmnt", "local", "local-smart-turn", "local-smart-turn-v3", "mcp", "mem0", "mistral", "mlx-whisper", "moondream", "neuphonic", "nim", "noisereduce", "openai", "openpipe", "openrouter", "perplexity", "playht", "qwen", "remote-smart-turn", "rime", "riva", "runner", "sagemaker", "sambanova", "sarvam", "sentry", "silero", "simli", "soniox", "soundfile", "speechmatics", "strands", "tavus", "together", "tracing", "ultravox", "webrtc", "websocket", "websockets-base", "whisper"]
+provides-extras = ["aic", "anthropic", "assemblyai", "asyncai", "aws", "aws-nova-sonic", "azure", "cartesia", "cerebras", "deepseek", "daily", "deepgram", "elevenlabs", "fal", "fireworks", "fish", "gladia", "google", "grok", "groq", "gstreamer", "heygen", "hume", "inworld", "krisp", "koala", "langchain", "livekit", "lmnt", "local", "mcp", "mem0", "mistral", "mlx-whisper", "moondream", "nim", "neuphonic", "noisereduce", "openai", "openpipe", "openrouter", "perplexity", "playht", "qwen", "rime", "riva", "runner", "sambanova", "sarvam", "sentry", "local-smart-turn", "local-smart-turn-v3", "remote-smart-turn", "sagemaker", "silero", "simli", "soniox", "soundfile", "speechmatics", "strands", "tavus", "together", "tracing", "ultravox", "webrtc", "websocket", "websockets-base", "whisper"]
[package.metadata.requires-dev]
dev = [
@@ -6222,14 +6220,14 @@ wheels = [
[[package]]
name = "s3transfer"
-version = "0.14.0"
+version = "0.13.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "botocore" },
]
-sdist = { url = "https://files.pythonhosted.org/packages/62/74/8d69dcb7a9efe8baa2046891735e5dfe433ad558ae23d9e3c14c633d1d58/s3transfer-0.14.0.tar.gz", hash = "sha256:eff12264e7c8b4985074ccce27a3b38a485bb7f7422cc8046fee9be4983e4125", size = 151547, upload-time = "2025-09-09T19:23:31.089Z" }
+sdist = { url = "https://files.pythonhosted.org/packages/6d/05/d52bf1e65044b4e5e27d4e63e8d1579dbdec54fce685908ae09bc3720030/s3transfer-0.13.1.tar.gz", hash = "sha256:c3fdba22ba1bd367922f27ec8032d6a1cf5f10c934fb5d68cf60fd5a23d936cf", size = 150589, upload-time = "2025-07-18T19:22:42.31Z" }
wheels = [
-{ url = "https://files.pythonhosted.org/packages/48/f0/ae7ca09223a81a1d890b2557186ea015f6e0502e9b8cb8e1813f1d8cfa4e/s3transfer-0.14.0-py3-none-any.whl", hash = "sha256:ea3b790c7077558ed1f02a3072fb3cb992bbbd253392f4b6e9e8976941c7d456", size = 85712, upload-time = "2025-09-09T19:23:30.041Z" },
+{ url = "https://files.pythonhosted.org/packages/6d/4f/d073e09df851cfa251ef7840007d04db3293a0482ce607d2b993926089be/s3transfer-0.13.1-py3-none-any.whl", hash = "sha256:a981aa7429be23fe6dfc13e80e4020057cbab622b08c0315288758d67cabc724", size = 85308, upload-time = "2025-07-18T19:22:40.947Z" },
]
[[package]]