diff --git a/examples/foundational/45-openai-agent-basic.py b/examples/foundational/45-openai-agent-basic.py index 6d7456b7e..dc821add8 100644 --- a/examples/foundational/45-openai-agent-basic.py +++ b/examples/foundational/45-openai-agent-basic.py @@ -172,7 +172,8 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): "I can help you with weather information, share interesting facts, " "or just have a conversation. What would you like to know?" ), - EndFrame(), + # Don't send EndFrame() here - that closes the pipeline! + # The conversation should continue after the greeting ] ) diff --git a/src/pipecat/services/openai_agent/agent_service.py b/src/pipecat/services/openai_agent/agent_service.py index 5a6461cdc..49d63a039 100644 --- a/src/pipecat/services/openai_agent/agent_service.py +++ b/src/pipecat/services/openai_agent/agent_service.py @@ -288,6 +288,8 @@ class OpenAIAgentService(AIService): self._agent, input_text, context=self._session_config ) + has_streaming_deltas = False + # Process the stream events async for event in result.stream_events(): if event.type == "raw_response_event": @@ -296,19 +298,23 @@ class OpenAIAgentService(AIService): if hasattr(event.data, "delta") and getattr(event.data, "delta", None): delta_text = getattr(event.data, "delta", "") if delta_text: + has_streaming_deltas = True + self._accumulated_text += delta_text await self.push_frame(LLMTextFrame(text=delta_text)) elif event.type == "run_item_stream_event": # Handle completed items if event.item.type == "message_output_item": - # Get the complete message text - message_text = self._extract_message_text(event.item) - if message_text and message_text != self._accumulated_text: - # Send any new text that wasn't already streamed - new_text = message_text[len(self._accumulated_text) :] - if new_text: - await self.push_frame(LLMTextFrame(text=new_text)) - self._accumulated_text = message_text + # Only process complete message if we didn't get streaming deltas + if not has_streaming_deltas: + message_text = self._extract_message_text(event.item) + logger.debug( + f"Processing complete message (no deltas): {message_text[:50]}..." + if len(message_text) > 50 + else f"Processing complete message: {message_text}" + ) + if message_text: + await self.push_frame(LLMTextFrame(text=message_text)) elif event.item.type == "tool_call_item": # Use getattr for safe attribute access @@ -359,8 +365,27 @@ class OpenAIAgentService(AIService): The extracted text content. """ try: - # Handle different message item formats - if hasattr(item, "content"): + # Handle OpenAI Agents SDK MessageOutputItem format + if hasattr(item, "raw_item") and hasattr(item.raw_item, "content"): + content = item.raw_item.content + if isinstance(content, list): + text_parts = [] + for content_part in content: + if hasattr(content_part, "text"): + text_parts.append(content_part.text) + elif ( + isinstance(content_part, dict) + and content_part.get("type") == "output_text" + ): + text_parts.append(content_part.get("text", "")) + elif isinstance(content_part, dict) and content_part.get("type") == "text": + text_parts.append(content_part.get("text", "")) + return "".join(text_parts) + elif isinstance(content, str): + return content + + # Handle direct content attribute + elif hasattr(item, "content"): if isinstance(item.content, str): return item.content elif isinstance(item.content, list): @@ -373,8 +398,9 @@ class OpenAIAgentService(AIService): text_parts.append(content_part) return "".join(text_parts) - # Fallback: try to get text through string conversion - return str(item) + # If no text content found, return empty string instead of str(item) + logger.debug(f"No extractable text content found in item: {type(item)}") + return "" except Exception as e: logger.warning(f"Could not extract text from message item: {e}")