Compare commits

...

3 Commits

Author SHA1 Message Date
Filipi Fuchter
332a6b3669 Addressing the comments left in the PR review. 2025-09-23 18:33:55 -03:00
Filipi Fuchter
be570f593f Overriding the _start_interruption method. 2025-09-23 16:33:16 -03:00
Filipi Fuchter
12dcc87030 Fixed an issue in BaseOpenAILLMService that could cause the pipeline to freeze under certain race conditions. 2025-09-23 16:23:57 -03:00
3 changed files with 117 additions and 5 deletions

View File

@@ -102,6 +102,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Fixed an issue in BaseOpenAILLMService that could cause the pipeline to freeze
under certain race conditions.
- Fixed a `BaseOutputTransport` issue that could produce large saved
`AudioBufferProcessor` files when using an audio mixer.

View File

@@ -41,6 +41,7 @@ from pipecat.processors.aggregators.openai_llm_context import (
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.llm_service import FunctionCallFromLLM, LLMService
from pipecat.utils.asyncio.cancellable_stream import CancellableStream
from pipecat.utils.tracing.service_decorators import traced_llm
@@ -138,6 +139,7 @@ class BaseOpenAILLMService(LLMService):
default_headers=default_headers,
**kwargs,
)
self._stream: Optional[CancellableStream[AsyncStream[AsyncStream]]] = None
def create_client(
self,
@@ -329,13 +331,15 @@ class BaseOpenAILLMService(LLMService):
await self.start_ttfb_metrics()
# Generate chat completions using either OpenAILLMContext or universal LLMContext
chunk_stream = await (
self._stream_chat_completions_specific_context(context)
if isinstance(context, OpenAILLMContext)
else self._stream_chat_completions_universal_context(context)
self._stream = CancellableStream(
await (
self._stream_chat_completions_specific_context(context)
if isinstance(context, OpenAILLMContext)
else self._stream_chat_completions_universal_context(context)
)
)
async for chunk in chunk_stream:
async for chunk in self._stream:
if chunk.usage:
tokens = LLMTokenUsage(
prompt_tokens=chunk.usage.prompt_tokens,
@@ -389,6 +393,8 @@ class BaseOpenAILLMService(LLMService):
):
await self.push_frame(LLMTextFrame(chunk.choices[0].delta.audio["transcript"]))
self._stream = None
# if we got a function name and arguments, check to see if it's a function with
# a registered handler. If so, run the registered callback, save the result to
# the context, and re-prompt to get a chat answer. If we don't have a registered
@@ -416,6 +422,22 @@ class BaseOpenAILLMService(LLMService):
await self.run_function_calls(function_calls)
async def _start_interruption(self):
"""Start handling an interruption by cancelling current tasks."""
# OpenAI's client swallows asyncio.CancelledError internally, which prevents proper
# task cancellation propagation. To ensure proper cancellation behavior:
# 1. We check if there's an active chunk stream when receiving an interruption
# 2. We explicitly cancel the chunk stream first
# 3. This allows the task to be cancelled cleanly afterwards
# This approach ensures we don't get stuck in case there was a chunk processing loop
# when cancellation is requested.
if self._stream:
logger.debug(f"{self}: Cancelling chunk stream due to interruption")
await self._stream.cancel()
self._stream = None
await super()._start_interruption()
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames for LLM completion requests.

View File

@@ -0,0 +1,87 @@
"""Provides a wrapper class for making async streams cancellable.
This module implements a CancellableStream class that wraps any async iterator,
adding the ability to safely cancel iteration at any point. This is particularly
useful for handling cleanup of async streams in scenarios where early termination
is required.
The module provides functionality for:
- Wrapping any async iterator with cancellation capabilities
- Safe termination of async streams
- Proper cleanup and synchronization during cancellation
"""
import asyncio
from typing import AsyncIterator, Generic, TypeVar
T = TypeVar("T")
class CancellableStream(Generic[T]):
"""A wrapper around an async stream that can be cancelled."""
def __init__(self, stream: AsyncIterator[T]) -> None:
"""Initialize a cancellable stream wrapper.
Creates a wrapper around an async iterator that can be cancelled mid-iteration.
The wrapper maintains state about cancellation requests and iteration status.
Args:
stream: The async iterator to wrap. This is the source stream that will
be iterated over until either exhaustion or cancellation.
Attributes:
_stream: The wrapped async iterator
_cancel_future: A future that completes when cancellation is acknowledged
_cancel_requested: Flag indicating if cancellation has been requested
_iter_started: Flag indicating if iteration has begun
"""
self._stream: AsyncIterator[T] = stream
self._cancel_future: asyncio.Future[None] | None = None
self._cancel_requested: bool = False
self._iter_started: bool = False
async def cancel(self) -> None:
"""Request stream cancellation and wait for acknowledgment.
Sets up cancellation state and waits until the cancellation is acknowledged
by the next iteration attempt. If iteration hasn't started yet, the
cancellation is acknowledged immediately.
The method will:
1. Create a cancellation future if one doesn't exist
2. Mark the stream as cancelled
3. If iteration hasn't started, complete the future immediately
4. Otherwise, wait for the next iteration to acknowledge cancellation
Returns:
None: The method returns when cancellation is acknowledged.
"""
if self._cancel_future is None:
self._cancel_future = asyncio.get_event_loop().create_future()
self._cancel_requested = True
# If iteration has not started, we complete the future immediately
if not self._iter_started:
self._cancel_future.set_result(None)
await self._cancel_future
def __aiter__(self) -> "CancellableStream[T]":
self._iter_started = True
return self
async def __anext__(self) -> T:
if self._cancel_requested:
# Complete the future if cancellation was requested
if self._cancel_future and not self._cancel_future.done():
self._cancel_future.set_result(None)
raise StopAsyncIteration
try:
return await self._stream.__anext__()
except StopAsyncIteration:
# also complete cancel future if iteration naturally ends
if self._cancel_future and not self._cancel_future.done():
self._cancel_future.set_result(None)
raise