Compare commits

...

3 Commits

Author SHA1 Message Date
James Hush
1b28fc8e8e Fix: Ensure EndFrame propagates through AIService before stop()
This fix addresses a critical bug where EndFrame (and potentially other
system frames) would trigger the stop() method in AIService but never
be pushed downstream to subsequent processors, causing pipelines to hang.

The issue occurred because AIService.process_frame() would call stop(frame)
for EndFrame without first pushing it downstream. This meant that downstream
processors never received the shutdown signal, leaving the pipeline in a
waiting state.

The fix ensures EndFrame is pushed downstream BEFORE calling stop(), following
the same pattern used by RTVIProcessor and properly-implemented processors.
This guarantees that:
1. Downstream processors receive the EndFrame for proper cleanup
2. The stop() method can then safely perform service-specific cleanup
3. The ordering prevents race conditions during shutdown

This bug affected all AI services inheriting from AIService that didn't
override process_frame() to explicitly handle EndFrame, including scenarios
with TTS services, LLM services, and other AI service implementations.

Fixes pipeline hangs during graceful shutdown when EndFrame is sent.
2025-11-17 11:09:42 +01:00
Mark Backman
35ff44b799 Merge pull request #3059 from pipecat-ai/mb/remove-llm-tracing-fallback 2025-11-14 14:07:40 -05:00
Mark Backman
d01876ee60 Remove fallbacks in traced_llm 2025-11-14 12:13:49 -05:00
3 changed files with 9 additions and 16 deletions

View File

@@ -152,6 +152,9 @@ class AIService(FrameProcessor):
elif isinstance(frame, CancelFrame):
await self.cancel(frame)
elif isinstance(frame, EndFrame):
# Push EndFrame before stop(), because stop() may wait on tasks to
# finish and downstream processors need to receive the EndFrame.
await self.push_frame(frame, direction)
await self.stop(frame)
async def process_generator(self, generator: AsyncGenerator[Frame | None, None]):

View File

@@ -23,7 +23,7 @@ if TYPE_CHECKING:
from opentelemetry import context as context_api
from opentelemetry import trace
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_context import NOT_GIVEN, LLMContext
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.utils.tracing.service_attributes import (
add_gemini_live_span_attributes,
@@ -399,11 +399,6 @@ def traced_llm(func: Optional[Callable] = None, *, name: Optional[str] = None) -
if hasattr(self, "get_llm_adapter"):
adapter = self.get_llm_adapter()
messages = adapter.get_messages_for_logging(context)
elif hasattr(context, "get_messages"):
# Fallback for unknown context types
messages = context.get_messages()
elif hasattr(context, "messages"):
messages = context.messages
# Serialize messages if available
if messages:
@@ -424,15 +419,10 @@ def traced_llm(func: Optional[Callable] = None, *, name: Optional[str] = None) -
if hasattr(self, "get_llm_adapter") and hasattr(context, "tools"):
adapter = self.get_llm_adapter()
tools = adapter.from_standard_tools(context.tools)
elif hasattr(context, "tools"):
# Fallback for unknown context types
tools = context.tools
# Serialize and count tools if available
# Check if tools is not None and not NOT_GIVEN (using attribute check as fallback)
if tools is not None and not (
hasattr(tools, "__name__") and tools.__name__ == "NOT_GIVEN"
):
# Check if tools is not None and not NOT_GIVEN
if tools is not None and tools is not NOT_GIVEN:
serialized_tools = json.dumps(tools)
tool_count = len(tools) if isinstance(tools, list) else 1

6
uv.lock generated
View File

@@ -36,12 +36,12 @@ wheels = [
[[package]]
name = "aic-sdk"
version = "1.0.2"
version = "1.1.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "numpy" },
]
sdist = { url = "https://files.pythonhosted.org/packages/51/90/b02e853e863c303f8456c689b42ac24ad403b781adc9642d0a91ed4bed7e/aic_sdk-1.0.2.tar.gz", hash = "sha256:239097dd3aaa8a8a0fd7542b75d2510cb34144caec796370639b7c636acbc56e", size = 32059, upload-time = "2025-08-24T09:20:03.9Z" }
sdist = { url = "https://files.pythonhosted.org/packages/99/83/bf38b95d98c67b8ebc574fb4a4f23c07a3740b51992d7522976173d30b98/aic_sdk-1.1.0.tar.gz", hash = "sha256:04e08df695581c8cb4db8acca20e73815e9f449e7bd08e0162fd55518c727963", size = 34954, upload-time = "2025-11-11T20:45:24.25Z" }
[[package]]
name = "aioboto3"
@@ -4647,7 +4647,7 @@ docs = [
[package.metadata]
requires-dist = [
{ name = "accelerate", marker = "extra == 'moondream'", specifier = "~=1.10.0" },
{ name = "aic-sdk", marker = "extra == 'aic'", specifier = "~=1.0.1" },
{ name = "aic-sdk", marker = "extra == 'aic'", specifier = "~=1.1.0" },
{ name = "aioboto3", marker = "extra == 'aws'", specifier = "~=15.0.0" },
{ name = "aiofiles", specifier = ">=24.1.0,<25" },
{ name = "aiohttp", specifier = ">=3.11.12,<4" },