Compare commits
3 Commits
v0.0.94
...
filipi/fre
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
332a6b3669 | ||
|
|
be570f593f | ||
|
|
12dcc87030 |
@@ -102,6 +102,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed an issue in BaseOpenAILLMService that could cause the pipeline to freeze
|
||||
under certain race conditions.
|
||||
|
||||
- Fixed a `BaseOutputTransport` issue that could produce large saved
|
||||
`AudioBufferProcessor` files when using an audio mixer.
|
||||
|
||||
|
||||
@@ -41,6 +41,7 @@ from pipecat.processors.aggregators.openai_llm_context import (
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.llm_service import FunctionCallFromLLM, LLMService
|
||||
from pipecat.utils.asyncio.cancellable_stream import CancellableStream
|
||||
from pipecat.utils.tracing.service_decorators import traced_llm
|
||||
|
||||
|
||||
@@ -138,6 +139,7 @@ class BaseOpenAILLMService(LLMService):
|
||||
default_headers=default_headers,
|
||||
**kwargs,
|
||||
)
|
||||
self._stream: Optional[CancellableStream[AsyncStream[AsyncStream]]] = None
|
||||
|
||||
def create_client(
|
||||
self,
|
||||
@@ -329,13 +331,15 @@ class BaseOpenAILLMService(LLMService):
|
||||
await self.start_ttfb_metrics()
|
||||
|
||||
# Generate chat completions using either OpenAILLMContext or universal LLMContext
|
||||
chunk_stream = await (
|
||||
self._stream_chat_completions_specific_context(context)
|
||||
if isinstance(context, OpenAILLMContext)
|
||||
else self._stream_chat_completions_universal_context(context)
|
||||
self._stream = CancellableStream(
|
||||
await (
|
||||
self._stream_chat_completions_specific_context(context)
|
||||
if isinstance(context, OpenAILLMContext)
|
||||
else self._stream_chat_completions_universal_context(context)
|
||||
)
|
||||
)
|
||||
|
||||
async for chunk in chunk_stream:
|
||||
async for chunk in self._stream:
|
||||
if chunk.usage:
|
||||
tokens = LLMTokenUsage(
|
||||
prompt_tokens=chunk.usage.prompt_tokens,
|
||||
@@ -389,6 +393,8 @@ class BaseOpenAILLMService(LLMService):
|
||||
):
|
||||
await self.push_frame(LLMTextFrame(chunk.choices[0].delta.audio["transcript"]))
|
||||
|
||||
self._stream = None
|
||||
|
||||
# if we got a function name and arguments, check to see if it's a function with
|
||||
# a registered handler. If so, run the registered callback, save the result to
|
||||
# the context, and re-prompt to get a chat answer. If we don't have a registered
|
||||
@@ -416,6 +422,22 @@ class BaseOpenAILLMService(LLMService):
|
||||
|
||||
await self.run_function_calls(function_calls)
|
||||
|
||||
async def _start_interruption(self):
|
||||
"""Start handling an interruption by cancelling current tasks."""
|
||||
# OpenAI's client swallows asyncio.CancelledError internally, which prevents proper
|
||||
# task cancellation propagation. To ensure proper cancellation behavior:
|
||||
# 1. We check if there's an active chunk stream when receiving an interruption
|
||||
# 2. We explicitly cancel the chunk stream first
|
||||
# 3. This allows the task to be cancelled cleanly afterwards
|
||||
# This approach ensures we don't get stuck in case there was a chunk processing loop
|
||||
# when cancellation is requested.
|
||||
if self._stream:
|
||||
logger.debug(f"{self}: Cancelling chunk stream due to interruption")
|
||||
await self._stream.cancel()
|
||||
self._stream = None
|
||||
|
||||
await super()._start_interruption()
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
"""Process frames for LLM completion requests.
|
||||
|
||||
|
||||
87
src/pipecat/utils/asyncio/cancellable_stream.py
Normal file
87
src/pipecat/utils/asyncio/cancellable_stream.py
Normal file
@@ -0,0 +1,87 @@
|
||||
"""Provides a wrapper class for making async streams cancellable.
|
||||
|
||||
This module implements a CancellableStream class that wraps any async iterator,
|
||||
adding the ability to safely cancel iteration at any point. This is particularly
|
||||
useful for handling cleanup of async streams in scenarios where early termination
|
||||
is required.
|
||||
|
||||
The module provides functionality for:
|
||||
- Wrapping any async iterator with cancellation capabilities
|
||||
- Safe termination of async streams
|
||||
- Proper cleanup and synchronization during cancellation
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
from typing import AsyncIterator, Generic, TypeVar
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
class CancellableStream(Generic[T]):
|
||||
"""A wrapper around an async stream that can be cancelled."""
|
||||
|
||||
def __init__(self, stream: AsyncIterator[T]) -> None:
|
||||
"""Initialize a cancellable stream wrapper.
|
||||
|
||||
Creates a wrapper around an async iterator that can be cancelled mid-iteration.
|
||||
The wrapper maintains state about cancellation requests and iteration status.
|
||||
|
||||
Args:
|
||||
stream: The async iterator to wrap. This is the source stream that will
|
||||
be iterated over until either exhaustion or cancellation.
|
||||
|
||||
Attributes:
|
||||
_stream: The wrapped async iterator
|
||||
_cancel_future: A future that completes when cancellation is acknowledged
|
||||
_cancel_requested: Flag indicating if cancellation has been requested
|
||||
_iter_started: Flag indicating if iteration has begun
|
||||
"""
|
||||
self._stream: AsyncIterator[T] = stream
|
||||
self._cancel_future: asyncio.Future[None] | None = None
|
||||
self._cancel_requested: bool = False
|
||||
self._iter_started: bool = False
|
||||
|
||||
async def cancel(self) -> None:
|
||||
"""Request stream cancellation and wait for acknowledgment.
|
||||
|
||||
Sets up cancellation state and waits until the cancellation is acknowledged
|
||||
by the next iteration attempt. If iteration hasn't started yet, the
|
||||
cancellation is acknowledged immediately.
|
||||
|
||||
The method will:
|
||||
1. Create a cancellation future if one doesn't exist
|
||||
2. Mark the stream as cancelled
|
||||
3. If iteration hasn't started, complete the future immediately
|
||||
4. Otherwise, wait for the next iteration to acknowledge cancellation
|
||||
|
||||
Returns:
|
||||
None: The method returns when cancellation is acknowledged.
|
||||
"""
|
||||
if self._cancel_future is None:
|
||||
self._cancel_future = asyncio.get_event_loop().create_future()
|
||||
self._cancel_requested = True
|
||||
|
||||
# If iteration has not started, we complete the future immediately
|
||||
if not self._iter_started:
|
||||
self._cancel_future.set_result(None)
|
||||
|
||||
await self._cancel_future
|
||||
|
||||
def __aiter__(self) -> "CancellableStream[T]":
|
||||
self._iter_started = True
|
||||
return self
|
||||
|
||||
async def __anext__(self) -> T:
|
||||
if self._cancel_requested:
|
||||
# Complete the future if cancellation was requested
|
||||
if self._cancel_future and not self._cancel_future.done():
|
||||
self._cancel_future.set_result(None)
|
||||
raise StopAsyncIteration
|
||||
|
||||
try:
|
||||
return await self._stream.__anext__()
|
||||
except StopAsyncIteration:
|
||||
# also complete cancel future if iteration naturally ends
|
||||
if self._cancel_future and not self._cancel_future.done():
|
||||
self._cancel_future.set_result(None)
|
||||
raise
|
||||
Reference in New Issue
Block a user