From 2a60d54830ee04e9686f0a01f7f28ed7a57336f5 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Mon, 20 Jan 2025 14:09:08 -0500 Subject: [PATCH 1/5] Update the AssistantTranscriptProcessor to use TTSTextFrames in place of OpenAILLMContextFrames --- .../processors/transcript_processor.py | 135 +++++++++--------- 1 file changed, 66 insertions(+), 69 deletions(-) diff --git a/src/pipecat/processors/transcript_processor.py b/src/pipecat/processors/transcript_processor.py index 5379e22e6..47e73307e 100644 --- a/src/pipecat/processors/transcript_processor.py +++ b/src/pipecat/processors/transcript_processor.py @@ -4,16 +4,21 @@ # SPDX-License-Identifier: BSD 2-Clause License # +from datetime import datetime, timezone from typing import List +from loguru import logger + from pipecat.frames.frames import ( + BotStoppedSpeakingFrame, + EndFrame, Frame, - OpenAILLMContextAssistantTimestampFrame, + StartInterruptionFrame, TranscriptionFrame, TranscriptionMessage, TranscriptionUpdateFrame, + TTSTextFrame, ) -from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame from pipecat.processors.frame_processor import FrameDirection, FrameProcessor @@ -64,89 +69,81 @@ class UserTranscriptProcessor(BaseTranscriptProcessor): class AssistantTranscriptProcessor(BaseTranscriptProcessor): - """Processes assistant LLM context frames into timestamped conversation messages.""" + """Processes assistant TTS text frames into timestamped conversation messages. + + This processor aggregates TTS text frames into complete utterances and emits them as + transcript messages. Utterances are completed when: + - The bot stops speaking (BotStoppedSpeakingFrame) + - The bot is interrupted (StartInterruptionFrame) + - The pipeline ends (EndFrame) + + Attributes: + _current_text_parts: List of text fragments being aggregated for current utterance + _aggregation_start_time: Timestamp when the current utterance began + """ def __init__(self, **kwargs): - """Initialize processor with empty message stores.""" + """Initialize processor with aggregation state.""" super().__init__(**kwargs) - self._pending_assistant_messages: List[TranscriptionMessage] = [] + self._current_text_parts: List[str] = [] + self._aggregation_start_time: datetime | None = None - def _extract_messages(self, messages: List[dict]) -> List[TranscriptionMessage]: - """Extract assistant messages from the OpenAI standard message format. + async def _emit_aggregated_text(self): + """Emit aggregated text as a transcript message.""" + if self._current_text_parts and self._aggregation_start_time: + content = " ".join(self._current_text_parts).strip() + if content: + # Format timestamp with 3 decimal places + formatted_timestamp = ( + self._aggregation_start_time.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "+00:00" + ) + logger.debug(f"Emitting aggregated assistant message: {content}") + message = TranscriptionMessage( + role="assistant", + content=content, + timestamp=formatted_timestamp, + ) + await self._emit_update([message]) + else: + logger.debug("No content to emit after stripping whitespace") - Args: - messages: List of messages in OpenAI format, which can be either: - - Simple format: {"role": "user", "content": "Hello"} - - Content list: {"role": "user", "content": [{"type": "text", "text": "Hello"}]} - - Returns: - List[TranscriptionMessage]: Normalized conversation messages - """ - result = [] - for msg in messages: - if msg["role"] != "assistant": - continue - - content = msg.get("content") - if isinstance(content, str): - if content: - result.append(TranscriptionMessage(role="assistant", content=content)) - elif isinstance(content, list): - text_parts = [] - for part in content: - if isinstance(part, dict) and part.get("type") == "text": - text_parts.append(part["text"]) - - if text_parts: - result.append( - TranscriptionMessage(role="assistant", content=" ".join(text_parts)) - ) - - return result - - def _find_new_messages(self, current: List[TranscriptionMessage]) -> List[TranscriptionMessage]: - """Find unprocessed messages from current list. - - Args: - current: List of current messages - - Returns: - List[TranscriptionMessage]: New messages not yet processed - """ - if not self._processed_messages: - return current - - processed_len = len(self._processed_messages) - if len(current) <= processed_len: - return [] - - return current[processed_len:] + # Reset aggregation state + self._current_text_parts = [] + self._aggregation_start_time = None async def process_frame(self, frame: Frame, direction: FrameDirection): """Process frames into assistant conversation messages. + Handles different frame types: + - TTSTextFrame: Aggregates text for current utterance + - BotStoppedSpeakingFrame: Completes current utterance + - StartInterruptionFrame: Completes current utterance due to interruption + - EndFrame: Completes current utterance at pipeline end + Args: frame: Input frame to process direction: Frame processing direction """ await super().process_frame(frame, direction) - if isinstance(frame, OpenAILLMContextFrame): - standard_messages = [] - for msg in frame.context.messages: - converted = frame.context.to_standard_messages(msg) - standard_messages.extend(converted) + if isinstance(frame, TTSTextFrame): + # Start timestamp on first text part + if not self._aggregation_start_time: + self._aggregation_start_time = datetime.now(timezone.utc) - current_messages = self._extract_messages(standard_messages) - new_messages = self._find_new_messages(current_messages) - self._pending_assistant_messages.extend(new_messages) + self._current_text_parts.append(frame.text) - elif isinstance(frame, OpenAILLMContextAssistantTimestampFrame): - if self._pending_assistant_messages: - for msg in self._pending_assistant_messages: - msg.timestamp = frame.timestamp - await self._emit_update(self._pending_assistant_messages) - self._pending_assistant_messages = [] + elif isinstance(frame, BotStoppedSpeakingFrame): + # Emit accumulated text when bot finishes speaking + await self._emit_aggregated_text() + + elif isinstance(frame, StartInterruptionFrame): + # Emit any pending text when interrupted + await self._emit_aggregated_text() + + elif isinstance(frame, EndFrame): + # Emit any remaining text when pipeline ends + await self._emit_aggregated_text() await self.push_frame(frame, direction) @@ -170,8 +167,8 @@ class TranscriptProcessor: llm, tts, transport.output(), + transcript.assistant_tts(), # Assistant transcripts context_aggregator.assistant(), - transcript.assistant(), # Assistant transcripts ] ) From 31c77d8e35b5e1dc6d4574181d80dd4a14f34597 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Mon, 20 Jan 2025 14:20:39 -0500 Subject: [PATCH 2/5] Update examples for the updated TranscriptProcessor --- .../28a-transcription-processor-openai.py | 67 +++++++++++++++---- .../28b-transcript-processor-anthropic.py | 67 +++++++++++++++---- .../28c-transcription-processor-gemini.py | 67 +++++++++++++++---- 3 files changed, 165 insertions(+), 36 deletions(-) diff --git a/examples/foundational/28a-transcription-processor-openai.py b/examples/foundational/28a-transcription-processor-openai.py index a83f57aa1..8b5b943bf 100644 --- a/examples/foundational/28a-transcription-processor-openai.py +++ b/examples/foundational/28a-transcription-processor-openai.py @@ -7,7 +7,7 @@ import asyncio import os import sys -from typing import List +from typing import List, Optional import aiohttp from dotenv import load_dotenv @@ -15,7 +15,12 @@ from loguru import logger from runner import configure from pipecat.audio.vad.silero import SileroVADAnalyzer -from pipecat.frames.frames import EndFrame, TranscriptionMessage, TranscriptionUpdateFrame +from pipecat.frames.frames import ( + EndFrame, + StartInterruptionFrame, + TranscriptionMessage, + TranscriptionUpdateFrame, +) from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask @@ -33,13 +38,49 @@ logger.add(sys.stderr, level="DEBUG") class TranscriptHandler: - """Simple handler to demonstrate transcript processing. + """Handles real-time transcript processing and output. - Maintains a list of conversation messages and logs them with timestamps. + Maintains a list of conversation messages and outputs them either to a log + or to a file as they are received. Each message includes its timestamp and role. + + Attributes: + messages: List of all processed transcript messages + output_file: Optional path to file where transcript is saved. If None, outputs to log only. """ - def __init__(self): + def __init__(self, output_file: Optional[str] = None): + """Initialize handler with optional file output. + + Args: + output_file: Path to output file. If None, outputs to log only. + """ self.messages: List[TranscriptionMessage] = [] + self.output_file: Optional[str] = output_file + logger.debug( + f"TranscriptHandler initialized {'with output_file=' + output_file if output_file else 'with log output only'}" + ) + + async def save_message(self, message: TranscriptionMessage): + """Save a single transcript message. + + Outputs the message to the log and optionally to a file. + + Args: + message: The message to save + """ + timestamp = f"[{message.timestamp}] " if message.timestamp else "" + line = f"{timestamp}{message.role}: {message.content}" + + # Always log the message + logger.info(f"Transcript: {line}") + + # Optionally write to file + if self.output_file: + try: + with open(self.output_file, "a", encoding="utf-8") as f: + f.write(line + "\n") + except Exception as e: + logger.error(f"Error saving transcript message to file: {e}") async def on_transcript_update( self, processor: TranscriptProcessor, frame: TranscriptionUpdateFrame @@ -50,13 +91,11 @@ class TranscriptHandler: processor: The TranscriptProcessor that emitted the update frame: TranscriptionUpdateFrame containing new messages """ - self.messages.extend(frame.messages) + logger.debug(f"Received transcript update with {len(frame.messages)} new messages") - # Log the new messages - logger.info("New transcript messages:") for msg in frame.messages: - timestamp = f"[{msg.timestamp}] " if msg.timestamp else "" - logger.info(f"{timestamp}{msg.role}: {msg.content}") + self.messages.append(msg) + await self.save_message(msg) async def main(): @@ -99,7 +138,8 @@ async def main(): # Create transcript processor and handler transcript = TranscriptProcessor() - transcript_handler = TranscriptHandler() + transcript_handler = TranscriptHandler() # Output to log only + # transcript_handler = TranscriptHandler(output_file="transcript.txt") # Output to file and log pipeline = Pipeline( [ @@ -110,8 +150,8 @@ async def main(): llm, # LLM tts, # TTS transport.output(), # Transport bot output - context_aggregator.assistant(), # Assistant spoken responses transcript.assistant(), # Assistant transcripts + context_aggregator.assistant(), # Assistant spoken responses ] ) @@ -130,6 +170,9 @@ async def main(): @transport.event_handler("on_participant_left") async def on_participant_left(transport, participant, reason): + # Interrupt the TTS to stop the TTS generation + await task.queue_frame(StartInterruptionFrame()) + # Stop the gracefully stop the pipeline await task.queue_frame(EndFrame()) runner = PipelineRunner() diff --git a/examples/foundational/28b-transcript-processor-anthropic.py b/examples/foundational/28b-transcript-processor-anthropic.py index 9a75dfb72..18074e430 100644 --- a/examples/foundational/28b-transcript-processor-anthropic.py +++ b/examples/foundational/28b-transcript-processor-anthropic.py @@ -7,7 +7,7 @@ import asyncio import os import sys -from typing import List +from typing import List, Optional import aiohttp from dotenv import load_dotenv @@ -15,7 +15,12 @@ from loguru import logger from runner import configure from pipecat.audio.vad.silero import SileroVADAnalyzer -from pipecat.frames.frames import EndFrame, TranscriptionMessage, TranscriptionUpdateFrame +from pipecat.frames.frames import ( + EndFrame, + StartInterruptionFrame, + TranscriptionMessage, + TranscriptionUpdateFrame, +) from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask @@ -33,13 +38,49 @@ logger.add(sys.stderr, level="DEBUG") class TranscriptHandler: - """Simple handler to demonstrate transcript processing. + """Handles real-time transcript processing and output. - Maintains a list of conversation messages and logs them with timestamps. + Maintains a list of conversation messages and outputs them either to a log + or to a file as they are received. Each message includes its timestamp and role. + + Attributes: + messages: List of all processed transcript messages + output_file: Optional path to file where transcript is saved. If None, outputs to log only. """ - def __init__(self): + def __init__(self, output_file: Optional[str] = None): + """Initialize handler with optional file output. + + Args: + output_file: Path to output file. If None, outputs to log only. + """ self.messages: List[TranscriptionMessage] = [] + self.output_file: Optional[str] = output_file + logger.debug( + f"TranscriptHandler initialized {'with output_file=' + output_file if output_file else 'with log output only'}" + ) + + async def save_message(self, message: TranscriptionMessage): + """Save a single transcript message. + + Outputs the message to the log and optionally to a file. + + Args: + message: The message to save + """ + timestamp = f"[{message.timestamp}] " if message.timestamp else "" + line = f"{timestamp}{message.role}: {message.content}" + + # Always log the message + logger.info(f"Transcript: {line}") + + # Optionally write to file + if self.output_file: + try: + with open(self.output_file, "a", encoding="utf-8") as f: + f.write(line + "\n") + except Exception as e: + logger.error(f"Error saving transcript message to file: {e}") async def on_transcript_update( self, processor: TranscriptProcessor, frame: TranscriptionUpdateFrame @@ -50,13 +91,11 @@ class TranscriptHandler: processor: The TranscriptProcessor that emitted the update frame: TranscriptionUpdateFrame containing new messages """ - self.messages.extend(frame.messages) + logger.debug(f"Received transcript update with {len(frame.messages)} new messages") - # Log the new messages - logger.info("New transcript messages:") for msg in frame.messages: - timestamp = f"[{msg.timestamp}] " if msg.timestamp else "" - logger.info(f"{timestamp}{msg.role}: {msg.content}") + self.messages.append(msg) + await self.save_message(msg) async def main(): @@ -99,7 +138,8 @@ async def main(): # Create transcript processor and handler transcript = TranscriptProcessor() - transcript_handler = TranscriptHandler() + transcript_handler = TranscriptHandler() # Output to log only + # transcript_handler = TranscriptHandler(output_file="transcript.txt") # Output to file and log pipeline = Pipeline( [ @@ -110,8 +150,8 @@ async def main(): llm, # LLM tts, # TTS transport.output(), # Transport bot output - context_aggregator.assistant(), # Assistant spoken responses transcript.assistant(), # Assistant transcripts + context_aggregator.assistant(), # Assistant spoken responses ] ) @@ -130,6 +170,9 @@ async def main(): @transport.event_handler("on_participant_left") async def on_participant_left(transport, participant, reason): + # Interrupt the TTS to stop the TTS generation + await task.queue_frame(StartInterruptionFrame()) + # Stop the gracefully stop the pipeline await task.queue_frame(EndFrame()) runner = PipelineRunner() diff --git a/examples/foundational/28c-transcription-processor-gemini.py b/examples/foundational/28c-transcription-processor-gemini.py index a04925936..efa400372 100644 --- a/examples/foundational/28c-transcription-processor-gemini.py +++ b/examples/foundational/28c-transcription-processor-gemini.py @@ -7,7 +7,7 @@ import asyncio import os import sys -from typing import List +from typing import List, Optional import aiohttp from dotenv import load_dotenv @@ -15,7 +15,12 @@ from loguru import logger from runner import configure from pipecat.audio.vad.silero import SileroVADAnalyzer -from pipecat.frames.frames import EndFrame, TranscriptionMessage, TranscriptionUpdateFrame +from pipecat.frames.frames import ( + EndFrame, + StartInterruptionFrame, + TranscriptionMessage, + TranscriptionUpdateFrame, +) from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask @@ -34,13 +39,49 @@ logger.add(sys.stderr, level="DEBUG") class TranscriptHandler: - """Simple handler to demonstrate transcript processing. + """Handles real-time transcript processing and output. - Maintains a list of conversation messages and logs them with timestamps. + Maintains a list of conversation messages and outputs them either to a log + or to a file as they are received. Each message includes its timestamp and role. + + Attributes: + messages: List of all processed transcript messages + output_file: Optional path to file where transcript is saved. If None, outputs to log only. """ - def __init__(self): + def __init__(self, output_file: Optional[str] = None): + """Initialize handler with optional file output. + + Args: + output_file: Path to output file. If None, outputs to log only. + """ self.messages: List[TranscriptionMessage] = [] + self.output_file: Optional[str] = output_file + logger.debug( + f"TranscriptHandler initialized {'with output_file=' + output_file if output_file else 'with log output only'}" + ) + + async def save_message(self, message: TranscriptionMessage): + """Save a single transcript message. + + Outputs the message to the log and optionally to a file. + + Args: + message: The message to save + """ + timestamp = f"[{message.timestamp}] " if message.timestamp else "" + line = f"{timestamp}{message.role}: {message.content}" + + # Always log the message + logger.info(f"Transcript: {line}") + + # Optionally write to file + if self.output_file: + try: + with open(self.output_file, "a", encoding="utf-8") as f: + f.write(line + "\n") + except Exception as e: + logger.error(f"Error saving transcript message to file: {e}") async def on_transcript_update( self, processor: TranscriptProcessor, frame: TranscriptionUpdateFrame @@ -51,13 +92,11 @@ class TranscriptHandler: processor: The TranscriptProcessor that emitted the update frame: TranscriptionUpdateFrame containing new messages """ - self.messages.extend(frame.messages) + logger.debug(f"Received transcript update with {len(frame.messages)} new messages") - # Log the new messages - logger.info("New transcript messages:") for msg in frame.messages: - timestamp = f"[{msg.timestamp}] " if msg.timestamp else "" - logger.info(f"{timestamp}{msg.role}: {msg.content}") + self.messages.append(msg) + await self.save_message(msg) async def main(): @@ -102,7 +141,8 @@ async def main(): # Create transcript processor and handler transcript = TranscriptProcessor() - transcript_handler = TranscriptHandler() + transcript_handler = TranscriptHandler() # Output to log only + # transcript_handler = TranscriptHandler(output_file="transcript.txt") # Output to file and log pipeline = Pipeline( [ @@ -113,8 +153,8 @@ async def main(): llm, # LLM tts, # TTS transport.output(), # Transport bot output - context_aggregator.assistant(), # Assistant spoken responses transcript.assistant(), # Assistant transcripts + context_aggregator.assistant(), # Assistant spoken responses ] ) @@ -140,6 +180,9 @@ async def main(): @transport.event_handler("on_participant_left") async def on_participant_left(transport, participant, reason): + # Interrupt the TTS to stop the TTS generation + await task.queue_frame(StartInterruptionFrame()) + # Stop the gracefully stop the pipeline await task.queue_frame(EndFrame()) runner = PipelineRunner() From c2fe8e7fdbaa1ef059e13d9fdb8decfbdcfef786 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Mon, 20 Jan 2025 14:24:13 -0500 Subject: [PATCH 3/5] Updated CHANGELOG --- CHANGELOG.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index be621b1d2..2ff694fc1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - It is now possible to specify the period of the `PipelineTask` heartbeat frames with `heartbeats_period_secs`. +### Changed + +- Modified `TranscriptProcessor` to use TTS text frames for more accurate assistant + transcripts. Assistant messages are now aggregated based on bot speaking boundaries + rather than LLM context, providing better handling of interruptions and partial + utterances. + +- Updated foundational examples `28a-transcription-processor-openai.py`, + `28b-transcript-processor-anthropic.py`, and + `28c-transcription-processor-gemini.py` to use the updated + `TranscriptProcessor`. + ### Fixed - Fixed a type error when using `voice_settings` in `ElevenLabsHttpTTSService`. From e1430be9f9d094a963d1be7488877bb3d056ebbe Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Mon, 20 Jan 2025 20:23:07 -0500 Subject: [PATCH 4/5] Code review fixes --- .../processors/transcript_processor.py | 22 ++++++------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/src/pipecat/processors/transcript_processor.py b/src/pipecat/processors/transcript_processor.py index 47e73307e..c1a59a166 100644 --- a/src/pipecat/processors/transcript_processor.py +++ b/src/pipecat/processors/transcript_processor.py @@ -4,8 +4,7 @@ # SPDX-License-Identifier: BSD 2-Clause License # -from datetime import datetime, timezone -from typing import List +from typing import List, Optional from loguru import logger @@ -20,6 +19,7 @@ from pipecat.frames.frames import ( TTSTextFrame, ) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.utils.time import time_now_iso8601 class BaseTranscriptProcessor(FrameProcessor): @@ -86,22 +86,18 @@ class AssistantTranscriptProcessor(BaseTranscriptProcessor): """Initialize processor with aggregation state.""" super().__init__(**kwargs) self._current_text_parts: List[str] = [] - self._aggregation_start_time: datetime | None = None + self._aggregation_start_time: Optional[str] | None = None async def _emit_aggregated_text(self): """Emit aggregated text as a transcript message.""" if self._current_text_parts and self._aggregation_start_time: content = " ".join(self._current_text_parts).strip() if content: - # Format timestamp with 3 decimal places - formatted_timestamp = ( - self._aggregation_start_time.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "+00:00" - ) logger.debug(f"Emitting aggregated assistant message: {content}") message = TranscriptionMessage( role="assistant", content=content, - timestamp=formatted_timestamp, + timestamp=self._aggregation_start_time, ) await self._emit_update([message]) else: @@ -129,16 +125,12 @@ class AssistantTranscriptProcessor(BaseTranscriptProcessor): if isinstance(frame, TTSTextFrame): # Start timestamp on first text part if not self._aggregation_start_time: - self._aggregation_start_time = datetime.now(timezone.utc) + self._aggregation_start_time = time_now_iso8601() self._current_text_parts.append(frame.text) - elif isinstance(frame, BotStoppedSpeakingFrame): - # Emit accumulated text when bot finishes speaking - await self._emit_aggregated_text() - - elif isinstance(frame, StartInterruptionFrame): - # Emit any pending text when interrupted + elif isinstance(frame, (BotStoppedSpeakingFrame, StartInterruptionFrame)): + # Emit accumulated text when bot finishes speaking or is interrupted await self._emit_aggregated_text() elif isinstance(frame, EndFrame): From 7167719761c6eba936748f678e8ef81fbf60e29a Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Wed, 22 Jan 2025 14:55:15 -0500 Subject: [PATCH 5/5] Emit a transcription callback when receiving a CancelFrame, update examples accordingly --- .../foundational/28a-transcription-processor-openai.py | 9 +++------ .../foundational/28b-transcript-processor-anthropic.py | 9 +++------ .../foundational/28c-transcription-processor-gemini.py | 9 +++------ src/pipecat/processors/transcript_processor.py | 4 +++- 4 files changed, 12 insertions(+), 19 deletions(-) diff --git a/examples/foundational/28a-transcription-processor-openai.py b/examples/foundational/28a-transcription-processor-openai.py index 8b5b943bf..7f1d3dc25 100644 --- a/examples/foundational/28a-transcription-processor-openai.py +++ b/examples/foundational/28a-transcription-processor-openai.py @@ -16,8 +16,7 @@ from runner import configure from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.frames.frames import ( - EndFrame, - StartInterruptionFrame, + CancelFrame, TranscriptionMessage, TranscriptionUpdateFrame, ) @@ -170,10 +169,8 @@ async def main(): @transport.event_handler("on_participant_left") async def on_participant_left(transport, participant, reason): - # Interrupt the TTS to stop the TTS generation - await task.queue_frame(StartInterruptionFrame()) - # Stop the gracefully stop the pipeline - await task.queue_frame(EndFrame()) + # Stop the pipeline immediately when the participant leaves + await task.queue_frame(CancelFrame()) runner = PipelineRunner() diff --git a/examples/foundational/28b-transcript-processor-anthropic.py b/examples/foundational/28b-transcript-processor-anthropic.py index 18074e430..b9f95db66 100644 --- a/examples/foundational/28b-transcript-processor-anthropic.py +++ b/examples/foundational/28b-transcript-processor-anthropic.py @@ -16,8 +16,7 @@ from runner import configure from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.frames.frames import ( - EndFrame, - StartInterruptionFrame, + CancelFrame, TranscriptionMessage, TranscriptionUpdateFrame, ) @@ -170,10 +169,8 @@ async def main(): @transport.event_handler("on_participant_left") async def on_participant_left(transport, participant, reason): - # Interrupt the TTS to stop the TTS generation - await task.queue_frame(StartInterruptionFrame()) - # Stop the gracefully stop the pipeline - await task.queue_frame(EndFrame()) + # Stop the pipeline immediately when the participant leaves + await task.queue_frame(CancelFrame()) runner = PipelineRunner() diff --git a/examples/foundational/28c-transcription-processor-gemini.py b/examples/foundational/28c-transcription-processor-gemini.py index efa400372..377274ae1 100644 --- a/examples/foundational/28c-transcription-processor-gemini.py +++ b/examples/foundational/28c-transcription-processor-gemini.py @@ -16,8 +16,7 @@ from runner import configure from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.frames.frames import ( - EndFrame, - StartInterruptionFrame, + CancelFrame, TranscriptionMessage, TranscriptionUpdateFrame, ) @@ -180,10 +179,8 @@ async def main(): @transport.event_handler("on_participant_left") async def on_participant_left(transport, participant, reason): - # Interrupt the TTS to stop the TTS generation - await task.queue_frame(StartInterruptionFrame()) - # Stop the gracefully stop the pipeline - await task.queue_frame(EndFrame()) + # Stop the pipeline immediately when the participant leaves + await task.queue_frame(CancelFrame()) runner = PipelineRunner() diff --git a/src/pipecat/processors/transcript_processor.py b/src/pipecat/processors/transcript_processor.py index c1a59a166..d31310f84 100644 --- a/src/pipecat/processors/transcript_processor.py +++ b/src/pipecat/processors/transcript_processor.py @@ -10,6 +10,7 @@ from loguru import logger from pipecat.frames.frames import ( BotStoppedSpeakingFrame, + CancelFrame, EndFrame, Frame, StartInterruptionFrame, @@ -115,6 +116,7 @@ class AssistantTranscriptProcessor(BaseTranscriptProcessor): - BotStoppedSpeakingFrame: Completes current utterance - StartInterruptionFrame: Completes current utterance due to interruption - EndFrame: Completes current utterance at pipeline end + - CancelFrame: Completes current utterance due to cancellation Args: frame: Input frame to process @@ -129,7 +131,7 @@ class AssistantTranscriptProcessor(BaseTranscriptProcessor): self._current_text_parts.append(frame.text) - elif isinstance(frame, (BotStoppedSpeakingFrame, StartInterruptionFrame)): + elif isinstance(frame, (BotStoppedSpeakingFrame, StartInterruptionFrame, CancelFrame)): # Emit accumulated text when bot finishes speaking or is interrupted await self._emit_aggregated_text()