diff --git a/src/pipecat/processors/frameworks/strands_agents.py b/src/pipecat/processors/frameworks/strands_agents.py index 67005d74b..d129c2063 100644 --- a/src/pipecat/processors/frameworks/strands_agents.py +++ b/src/pipecat/processors/frameworks/strands_agents.py @@ -84,11 +84,11 @@ class StrandsAgentsProcessor(FrameProcessor): text: The user input text to process through the agent or graph. """ logger.debug(f"Invoking Strands agent with: {text}") + ttfb_tracking = True try: await self.push_frame(LLMFullResponseStartFrame()) await self.start_processing_metrics() await self.start_ttfb_metrics() - ttfb_tracking = True if self.graph: # Graph does not stream; await full result then emit assistant text @@ -108,9 +108,9 @@ class StrandsAgentsProcessor(FrameProcessor): await self.push_frame(LLMTextFrame(str(block["text"]))) # Update usage metrics await self._report_usage_metrics( - agent_result.metrics.accumulated_usage.get('inputTokens', 0), - agent_result.metrics.accumulated_usage.get('outputTokens', 0), - agent_result.metrics.accumulated_usage.get('totalTokens', 0) + agent_result.metrics.accumulated_usage.get("inputTokens", 0), + agent_result.metrics.accumulated_usage.get("outputTokens", 0), + agent_result.metrics.accumulated_usage.get("totalTokens", 0), ) except Exception as parse_err: logger.warning(f"Failed to extract messages from GraphResult: {parse_err}") @@ -123,12 +123,20 @@ class StrandsAgentsProcessor(FrameProcessor): if ttfb_tracking: await self.stop_ttfb_metrics() ttfb_tracking = False - + # Update usage metrics - if isinstance(event, dict) and "event" in event and "metadata" in event['event']: - if 'usage' in event['event']['metadata']: - usage = event['event']['metadata']['usage'] - await self._report_usage_metrics(usage.get('inputTokens', 0), usage.get('outputTokens', 0), usage.get('totalTokens', 0)) + if ( + isinstance(event, dict) + and "event" in event + and "metadata" in event["event"] + ): + if "usage" in event["event"]["metadata"]: + usage = event["event"]["metadata"]["usage"] + await self._report_usage_metrics( + usage.get("inputTokens", 0), + usage.get("outputTokens", 0), + usage.get("totalTokens", 0), + ) except GeneratorExit: logger.warning(f"{self} generator was closed prematurely") except Exception as e: @@ -139,7 +147,7 @@ class StrandsAgentsProcessor(FrameProcessor): ttfb_tracking = False await self.stop_processing_metrics() await self.push_frame(LLMFullResponseEndFrame()) - + def can_generate_metrics(self) -> bool: """Check if this service can generate performance metrics. @@ -149,14 +157,11 @@ class StrandsAgentsProcessor(FrameProcessor): return True async def _report_usage_metrics( - self, - prompt_tokens: int, - completion_tokens: int, - total_tokens: int + self, prompt_tokens: int, completion_tokens: int, total_tokens: int ): tokens = LLMTokenUsage( prompt_tokens=prompt_tokens, completion_tokens=completion_tokens, - total_tokens=total_tokens + total_tokens=total_tokens, ) - await self.start_llm_usage_metrics(tokens) \ No newline at end of file + await self.start_llm_usage_metrics(tokens)