Revert "Another change to support Gemini Live in Pipecat Flows: avoid triggering a model response twice when there is a Flows node transition that returns a value from the transition function and loads new context messages in the new node"

This reverts commit 90e6f0dca8.
This commit is contained in:
Paul Kompfner
2026-02-03 17:02:31 -05:00
parent 90e6f0dca8
commit 3183f9c077

View File

@@ -1057,9 +1057,6 @@ class GeminiLiveLLMService(LLMService):
# Trigger "initial" response with new connection
await self._create_initial_response()
else:
# Check for newly-completed function calls first (peek without sending results)
new_results = await self._process_completed_function_calls(send_new_results=False)
# If messages were appended, they might be:
# - Messages generated by Gemini Live itself, which the remote
# service and its internal context management is already
@@ -1067,18 +1064,17 @@ class GeminiLiveLLMService(LLMService):
# - Messages appended programmatically by the user (e.g. via
# an LLMMessagesAppendFrame). We need to send these to the
# remote Gemini Live service.
# - Function call results, which are handled separately.
# - Function call results, which are handled separately below.
if diff.messages_appended and messages_programmatically_edited:
# Messages were programmatically edited - send to the API
trigger_response = not new_results
await self._send_new_messages(
diff.messages_appended, trigger_response=trigger_response
logger.debug(
"Context update includes programmatically-appended messages. Creating incremental response..."
)
await self._create_incremental_response(diff.messages_appended)
# Send results for newly-completed function calls, if any.
if new_results:
for tool_call_id, tool_name, response in new_results:
await self._send_tool_result(tool_call_id, tool_name, response)
logger.debug("Checking for newly-completed function call results...")
await self._process_completed_function_calls(send_new_results=True)
def _context_update_requires_reconnect(self, diff: LLMContextDiff) -> bool:
"""Check if an update to our LLM context requires reconnection.
@@ -1099,20 +1095,8 @@ class GeminiLiveLLMService(LLMService):
return False
async def _process_completed_function_calls(
self, send_new_results: bool
) -> List[tuple[str, str, dict]]:
"""Check for and optionally send newly-completed function call results.
Args:
send_new_results: Whether to send new results to the service.
Set to False to "peek" at pending results without sending, or
to just update bookkeeping of completed tool calls.
Returns:
List of (tool_call_id, tool_name, response) tuples for new results.
"""
new_results: List[tuple[str, str, dict]] = []
async def _process_completed_function_calls(self, send_new_results: bool):
# Check for set of completed function calls in the context
adapter: GeminiLLMAdapter = self.get_llm_adapter()
messages = adapter.get_llm_invocation_params(self._context).get("messages", [])
for message in messages:
@@ -1128,12 +1112,15 @@ class GeminiLiveLLMService(LLMService):
and response
and response.get("value") != "IN_PROGRESS"
):
# Found a newly-completed function call
new_results.append((tool_call_id, tool_name, response))
# Found a newly-completed function call - send the result to the service
if send_new_results:
await self._send_tool_result(tool_call_id, tool_name, response)
logger.debug(
f"Sending newly-completed tool call result for tool '{tool_name}'"
)
await self._tool_result(
tool_call_id, tool_name, part.function_response.response
)
self._completed_tool_calls.add(tool_call_id)
return new_results
async def _set_bot_is_responding(self, responding: bool):
if self._bot_is_responding == responding:
@@ -1494,14 +1481,11 @@ class GeminiLiveLLMService(LLMService):
if not self._inference_on_context_initialization:
self._needs_turn_complete_message = True
async def _send_new_messages(
self, new_messages: List[LLMContextMessage], trigger_response: bool
):
"""Send new messages mid-conversation.
async def _create_incremental_response(self, new_messages: List[LLMContextMessage]):
"""Create a new response mid-conversation with newly-appended messages.
Args:
new_messages: The new messages to send.
trigger_response: Whether to trigger a model response.
"""
if self._disconnecting or not self._session or not new_messages:
return
@@ -1514,12 +1498,12 @@ class GeminiLiveLLMService(LLMService):
if not messages:
return
logger.debug(f"Sending new messages: {messages}. Triggering response: {trigger_response}")
logger.debug(f"Creating response with new messages: {messages}")
await self.start_ttfb_metrics()
try:
await self._session.send_client_content(turns=messages, turn_complete=trigger_response)
await self._session.send_client_content(turns=messages, turn_complete=True)
except Exception as e:
await self._handle_send_error(e)
@@ -1549,7 +1533,7 @@ class GeminiLiveLLMService(LLMService):
await self._handle_send_error(e)
@traced_gemini_live(operation="llm_tool_result")
async def _send_tool_result(
async def _tool_result(
self, tool_call_id: str, tool_name: str, tool_result_message: Dict[str, Any]
):
"""Send tool result back to the API."""
@@ -1561,7 +1545,6 @@ class GeminiLiveLLMService(LLMService):
response = FunctionResponse(name=tool_name, id=tool_call_id, response=tool_result_message)
try:
logger.debug(f"Sending response for tool '{tool_name}'")
await self._session.send_tool_response(function_responses=response)
except Exception as e:
await self._handle_send_error(e)