diff --git a/src/pipecat/services/google/gemini_live/llm.py b/src/pipecat/services/google/gemini_live/llm.py index 62df75cfa..c1af46745 100644 --- a/src/pipecat/services/google/gemini_live/llm.py +++ b/src/pipecat/services/google/gemini_live/llm.py @@ -1057,9 +1057,6 @@ class GeminiLiveLLMService(LLMService): # Trigger "initial" response with new connection await self._create_initial_response() else: - # Check for newly-completed function calls first (peek without sending results) - new_results = await self._process_completed_function_calls(send_new_results=False) - # If messages were appended, they might be: # - Messages generated by Gemini Live itself, which the remote # service and its internal context management is already @@ -1067,18 +1064,17 @@ class GeminiLiveLLMService(LLMService): # - Messages appended programmatically by the user (e.g. via # an LLMMessagesAppendFrame). We need to send these to the # remote Gemini Live service. - # - Function call results, which are handled separately. + # - Function call results, which are handled separately below. if diff.messages_appended and messages_programmatically_edited: # Messages were programmatically edited - send to the API - trigger_response = not new_results - await self._send_new_messages( - diff.messages_appended, trigger_response=trigger_response + logger.debug( + "Context update includes programmatically-appended messages. Creating incremental response..." ) + await self._create_incremental_response(diff.messages_appended) # Send results for newly-completed function calls, if any. - if new_results: - for tool_call_id, tool_name, response in new_results: - await self._send_tool_result(tool_call_id, tool_name, response) + logger.debug("Checking for newly-completed function call results...") + await self._process_completed_function_calls(send_new_results=True) def _context_update_requires_reconnect(self, diff: LLMContextDiff) -> bool: """Check if an update to our LLM context requires reconnection. @@ -1099,20 +1095,8 @@ class GeminiLiveLLMService(LLMService): return False - async def _process_completed_function_calls( - self, send_new_results: bool - ) -> List[tuple[str, str, dict]]: - """Check for and optionally send newly-completed function call results. - - Args: - send_new_results: Whether to send new results to the service. - Set to False to "peek" at pending results without sending, or - to just update bookkeeping of completed tool calls. - - Returns: - List of (tool_call_id, tool_name, response) tuples for new results. - """ - new_results: List[tuple[str, str, dict]] = [] + async def _process_completed_function_calls(self, send_new_results: bool): + # Check for set of completed function calls in the context adapter: GeminiLLMAdapter = self.get_llm_adapter() messages = adapter.get_llm_invocation_params(self._context).get("messages", []) for message in messages: @@ -1128,12 +1112,15 @@ class GeminiLiveLLMService(LLMService): and response and response.get("value") != "IN_PROGRESS" ): - # Found a newly-completed function call - new_results.append((tool_call_id, tool_name, response)) + # Found a newly-completed function call - send the result to the service if send_new_results: - await self._send_tool_result(tool_call_id, tool_name, response) + logger.debug( + f"Sending newly-completed tool call result for tool '{tool_name}'" + ) + await self._tool_result( + tool_call_id, tool_name, part.function_response.response + ) self._completed_tool_calls.add(tool_call_id) - return new_results async def _set_bot_is_responding(self, responding: bool): if self._bot_is_responding == responding: @@ -1494,14 +1481,11 @@ class GeminiLiveLLMService(LLMService): if not self._inference_on_context_initialization: self._needs_turn_complete_message = True - async def _send_new_messages( - self, new_messages: List[LLMContextMessage], trigger_response: bool - ): - """Send new messages mid-conversation. + async def _create_incremental_response(self, new_messages: List[LLMContextMessage]): + """Create a new response mid-conversation with newly-appended messages. Args: new_messages: The new messages to send. - trigger_response: Whether to trigger a model response. """ if self._disconnecting or not self._session or not new_messages: return @@ -1514,12 +1498,12 @@ class GeminiLiveLLMService(LLMService): if not messages: return - logger.debug(f"Sending new messages: {messages}. Triggering response: {trigger_response}") + logger.debug(f"Creating response with new messages: {messages}") await self.start_ttfb_metrics() try: - await self._session.send_client_content(turns=messages, turn_complete=trigger_response) + await self._session.send_client_content(turns=messages, turn_complete=True) except Exception as e: await self._handle_send_error(e) @@ -1549,7 +1533,7 @@ class GeminiLiveLLMService(LLMService): await self._handle_send_error(e) @traced_gemini_live(operation="llm_tool_result") - async def _send_tool_result( + async def _tool_result( self, tool_call_id: str, tool_name: str, tool_result_message: Dict[str, Any] ): """Send tool result back to the API.""" @@ -1561,7 +1545,6 @@ class GeminiLiveLLMService(LLMService): response = FunctionResponse(name=tool_name, id=tool_call_id, response=tool_result_message) try: - logger.debug(f"Sending response for tool '{tool_name}'") await self._session.send_tool_response(function_responses=response) except Exception as e: await self._handle_send_error(e)