From 229ff794d669d0a332ba50a43c8331f767975da5 Mon Sep 17 00:00:00 2001 From: Paul Kompfner Date: Mon, 8 Dec 2025 15:56:40 -0500 Subject: [PATCH] Better handle Gemini non-function thought signatures --- .../adapters/services/gemini_adapter.py | 76 +++++++++++++------ src/pipecat/services/google/llm.py | 37 +++++++-- 2 files changed, 83 insertions(+), 30 deletions(-) diff --git a/src/pipecat/adapters/services/gemini_adapter.py b/src/pipecat/adapters/services/gemini_adapter.py index f5a237ef7..5a7387aca 100644 --- a/src/pipecat/adapters/services/gemini_adapter.py +++ b/src/pipecat/adapters/services/gemini_adapter.py @@ -209,7 +209,7 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]): system_instruction = None messages = [] tool_call_id_to_name_mapping = {} - non_fn_signed_parts = [] + non_fn_thought_signatures = [] # Process each message, converting to Google format as needed for message in universal_context_messages: @@ -230,16 +230,17 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]): ) continue - # Special handling for non-function-call-related thought - # signature messages (Gemini 3 Pro mainly, but possibly others, - # too, especially when functions are involved in the - # conversation) + # Special handling for non-function-call-related thought- + # signature-containing messages if ( isinstance(message.message, dict) and message.message.get("type") == "non_fn_thought_signature" - and (signed_part := message.message.get("signed_part")) + and (thought_signature := message.message.get("signature")) + and (bookmark := message.message.get("bookmark")) ): - non_fn_signed_parts.append(signed_part) + non_fn_thought_signatures.append( + {"signature": thought_signature, "bookmark": bookmark} + ) continue # Fall back to assuming that the message is already in Google @@ -269,7 +270,7 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]): # Apply non-function-call-related thought signatures to the appropriate # messages - 
self._apply_non_function_thought_signatures_to_messages(non_fn_signed_parts, messages) + self._apply_non_function_thought_signatures_to_messages(non_fn_thought_signatures, messages) # Check if we only have function-related messages (no regular text) has_regular_messages = any( @@ -476,21 +477,37 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]): break def _apply_non_function_thought_signatures_to_messages( - self, signed_parts: List[Part], messages: List[Content] + self, thought_signatures: List[dict], messages: List[Content] ) -> None: """Apply (optional, but recommended) non-function-call-related thought signatures to the last part of corresponding non-function-call assistant messages. Gemini 3 Pro (and, somewhat surprisingly, other models, too, when - functions are involved in the conversation) outputs a thought signature + functions are involved in the conversation) outputs thought signatures at the end of assistant responses. Args: - signed_parts: A list of signed received Parts containing thought signatures to apply. + thought_signatures: A list of dicts containing: + - "signature": a thought signature + - "bookmark": a bookmark to identify the message part to apply the signature to. + The bookmark may contain either: + - "text" + - "inline_data" messages: List of messages to search through. """ - if not signed_parts: + if not thought_signatures: return + # For debugging, print out thought signatures and their bookmarks + logger.trace(f"Thought signatures to apply: {len(thought_signatures)}") + for ts in thought_signatures: + bookmark = ts.get("bookmark") + if bookmark.get("text"): + text = bookmark["text"] + log_display_text = f"{text[:50]}..." 
if len(text) > 50 else text + logger.trace(f" - At text: {log_display_text}") + elif bookmark.get("inline_data"): + logger.trace(f" - At inline data") + # Find all assistant (model) messages that aren't function calls non_fn_assistant_messages = [] for message in messages: @@ -507,9 +524,10 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]): # Apply thought signatures to the corresponding assistant messages # Match them using content heuristics, maintaining order (messages without signatures are skipped) message_start_index = 0 # Track where to start searching for the next match - for signed_part in signed_parts: - thought_signature = getattr(signed_part, "thought_signature", None) - if not thought_signature: + for thought_signature_dict in thought_signatures: + signature = thought_signature_dict.get("signature") + bookmark = thought_signature_dict.get("bookmark") + if not signature: continue # Search through remaining non-function assistant messages for a match @@ -521,31 +539,41 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]): last_part = message.parts[-1] matched = False - # Check if signed part has text and last message part text has the same text or + # If it's a text bookmark, check that the last message part text has the same text or # - is a prefix of that text (in case spoken text was truncated due to interruption) - # - is prefixed by that text (in case signed part was not the end of the assistant response... - # which is NOT supposed to happen, according to Google's docs, but seems to, for long responses...) 
- if hasattr(signed_part, "text") and signed_part.text: + # - is prefixed by that text (in case bookmark represents just first chunk of multi-chunk text) + if bookmark_text := bookmark.get("text"): if hasattr(last_part, "text") and last_part.text: # Normalize whitespace for comparison - signed_text = " ".join(signed_part.text.split()) + signed_text = " ".join(bookmark_text.split()) last_text = " ".join(last_part.text.split()) if ( last_text == signed_text or signed_text.startswith(last_text) or last_text.startswith(signed_text) ): - last_part.thought_signature = thought_signature + log_display_text = ( + f"{last_part.text[:50]}..." + if len(last_part.text) > 50 + else last_part.text + ) + logger.trace( + f"Applying thought signature to part with matching text: {log_display_text}" + ) + last_part.thought_signature = signature matched = True # Check if signed part has inline_data and last message part has matching inline_data - elif hasattr(signed_part, "inline_data") and signed_part.inline_data: + elif inline_data := bookmark.get("inline_data"): if ( hasattr(last_part, "inline_data") and last_part.inline_data - and last_part.inline_data.data == signed_part.inline_data.data + and last_part.inline_data.data == inline_data.data ): - last_part.thought_signature = thought_signature + logger.trace( + f"Applying thought signature to part with matching inline_data" + ) + last_part.thought_signature = signature matched = True # If we found a match, update start index and stop searching for this signed part diff --git a/src/pipecat/services/google/llm.py b/src/pipecat/services/google/llm.py index 73f75447f..6ccea5eff 100644 --- a/src/pipecat/services/google/llm.py +++ b/src/pipecat/services/google/llm.py @@ -942,6 +942,7 @@ class GoogleLLMService(LLMService): ) function_calls = [] + previous_part = None async for chunk in response: # Stop TTFB metrics after the first chunk await self.stop_ttfb_metrics() @@ -1005,26 +1006,50 @@ class GoogleLLMService(LLMService): ) await 
self.push_frame(frame) - # With Gemini 3 Pro (and, somewhat surprisingly, - # other models models, too, especially when + # With Gemini 3 Pro (and, contrary to Google's + # docs, other models, too, especially when # functions are involved in the conversation), - # thought signatures can be included in any kind of - # part, not just function calls. It will come in - # the last part of a response. + # thought signatures can be associated with any + # kind of Part, not just function calls. + # + # They should always be included in the last + # response Part. (*) + # + # (*) Since we're using the streaming API, though, + # where text Parts may be split across multiple + # chunks (each represented by a Part, confusingly), + # signatures may actually appear with the first + # chunk (Gemini 2.5) or in a trailing empty-text + # chunk (Gemini 3 Pro). if part.thought_signature and not part.function_call: + # Save a "bookmark" for the signature, so we + # can later stick it in the right place in + # context when sending it back to the LLM to + # continue the conversation. + bookmark = {} + if part.inline_data and part.inline_data.data: + bookmark["inline_data"] = {"inline_data": part.inline_data} + elif part.text is not None: + # Account for Gemini 3 Pro trailing + # empty-text chunk by using search_result, + # which accumulates all text so far. + bookmark["text"] = search_result await self.push_frame( LLMMessagesAppendFrame( [ self.get_llm_adapter().create_llm_specific_message( { "type": "non_fn_thought_signature", - "signed_part": part, + "signature": part.thought_signature, + "bookmark": bookmark, } ) ] ) ) + previous_part = part + if ( candidate.grounding_metadata and candidate.grounding_metadata.grounding_chunks