Compare commits
1 Commits
v0.0.102
...
aleix/clau
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
741ec7486d |
@@ -82,6 +82,13 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
llm = AnthropicLLMService(
|
||||
api_key=os.getenv("ANTHROPIC_API_KEY"),
|
||||
model="claude-3-7-sonnet-latest",
|
||||
wait_for_all=True,
|
||||
params=AnthropicLLMService.InputParams(
|
||||
max_tokens=16000,
|
||||
extra={
|
||||
"thinking": {"type": "enabled", "budget_tokens": 10000},
|
||||
},
|
||||
),
|
||||
)
|
||||
llm.register_function("get_weather", get_weather)
|
||||
llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation)
|
||||
|
||||
@@ -563,6 +563,33 @@ class LLMContextFrame(Frame):
|
||||
context: "LLMContext"
|
||||
|
||||
|
||||
@dataclass
|
||||
class LLMThinkingTextFrame(DataFrame):
|
||||
"""Reasoning frame generated by LLM services."""
|
||||
|
||||
thinking: str
|
||||
|
||||
def __post_init__(self):
|
||||
super().__post_init__()
|
||||
# LLM services send text frames with all necessary spaces included
|
||||
self.includes_inter_frame_spaces = True
|
||||
|
||||
def __str__(self):
|
||||
pts = format_pts(self.pts)
|
||||
return f"{self.name}(pts: {pts}, thinking: {self.thinking})"
|
||||
|
||||
|
||||
@dataclass
|
||||
class LLMThinkingSignatureFrame(DataFrame):
|
||||
"""Reasoning signature frame generated by LLM services."""
|
||||
|
||||
signature: str
|
||||
|
||||
def __str__(self):
|
||||
pts = format_pts(self.pts)
|
||||
return f"{self.name}(pts: {pts}, signature: {self.signature})"
|
||||
|
||||
|
||||
@dataclass
|
||||
class LLMMessagesFrame(DataFrame):
|
||||
"""Frame containing LLM messages for chat completion.
|
||||
|
||||
@@ -47,6 +47,8 @@ from pipecat.frames.frames import (
|
||||
LLMRunFrame,
|
||||
LLMSetToolChoiceFrame,
|
||||
LLMSetToolsFrame,
|
||||
LLMThinkingSignatureFrame,
|
||||
LLMThinkingTextFrame,
|
||||
SpeechControlParamsFrame,
|
||||
StartFrame,
|
||||
TextFrame,
|
||||
@@ -591,6 +593,7 @@ class LLMAssistantAggregator(LLMContextAggregator):
|
||||
self._started = 0
|
||||
self._function_calls_in_progress: Dict[str, Optional[FunctionCallInProgressFrame]] = {}
|
||||
self._context_updated_tasks: Set[asyncio.Task] = set()
|
||||
self._thinking: List[TextPartForConcatenation] = []
|
||||
|
||||
@property
|
||||
def has_function_calls_in_progress(self) -> bool:
|
||||
@@ -601,6 +604,11 @@ class LLMAssistantAggregator(LLMContextAggregator):
|
||||
"""
|
||||
return bool(self._function_calls_in_progress)
|
||||
|
||||
async def reset(self):
|
||||
"""Reset the aggregation state."""
|
||||
await super().reset()
|
||||
self._thinking = []
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
"""Process frames for assistant response aggregation and function call management.
|
||||
|
||||
@@ -619,6 +627,10 @@ class LLMAssistantAggregator(LLMContextAggregator):
|
||||
await self._handle_llm_end(frame)
|
||||
elif isinstance(frame, TextFrame):
|
||||
await self._handle_text(frame)
|
||||
elif isinstance(frame, LLMThinkingTextFrame):
|
||||
await self._handle_thinking(frame)
|
||||
elif isinstance(frame, LLMThinkingSignatureFrame):
|
||||
await self._handle_thinking_signature(frame)
|
||||
elif isinstance(frame, LLMRunFrame):
|
||||
await self._handle_llm_run(frame)
|
||||
elif isinstance(frame, LLMMessagesAppendFrame):
|
||||
@@ -663,6 +675,14 @@ class LLMAssistantAggregator(LLMContextAggregator):
|
||||
timestamp_frame = LLMContextAssistantTimestampFrame(timestamp=time_now_iso8601())
|
||||
await self.push_frame(timestamp_frame)
|
||||
|
||||
def thinking_string(self) -> str:
|
||||
"""Get the current thinking as a string.
|
||||
|
||||
Returns:
|
||||
The concatenated thinking string.
|
||||
"""
|
||||
return concatenate_aggregated_text(self._thinking)
|
||||
|
||||
async def _handle_llm_run(self, frame: LLMRunFrame):
|
||||
await self.push_context_frame(FrameDirection.UPSTREAM)
|
||||
|
||||
@@ -824,6 +844,35 @@ class LLMAssistantAggregator(LLMContextAggregator):
|
||||
)
|
||||
)
|
||||
|
||||
async def _handle_thinking(self, frame: LLMThinkingTextFrame):
|
||||
if not self._started:
|
||||
return
|
||||
|
||||
# Make sure we really have text (spaces count, too!)
|
||||
if len(frame.thinking) == 0:
|
||||
return
|
||||
|
||||
self._thinking.append(
|
||||
TextPartForConcatenation(
|
||||
frame.thinking, includes_inter_part_spaces=frame.includes_inter_frame_spaces
|
||||
)
|
||||
)
|
||||
|
||||
async def _handle_thinking_signature(self, frame: LLMThinkingSignatureFrame):
|
||||
if not self._started:
|
||||
return
|
||||
|
||||
thinking = self.thinking_string()
|
||||
|
||||
self._context.add_message(
|
||||
{
|
||||
"role": "assistant",
|
||||
"content": [
|
||||
{"type": "thinking", "thinking": thinking, "signature": frame.signature},
|
||||
],
|
||||
}
|
||||
)
|
||||
|
||||
def _context_updated_task_finished(self, task: asyncio.Task):
|
||||
self._context_updated_tasks.discard(task)
|
||||
|
||||
|
||||
@@ -40,6 +40,8 @@ from pipecat.frames.frames import (
|
||||
LLMFullResponseStartFrame,
|
||||
LLMMessagesFrame,
|
||||
LLMTextFrame,
|
||||
LLMThinkingSignatureFrame,
|
||||
LLMThinkingTextFrame,
|
||||
LLMUpdateSettingsFrame,
|
||||
UserImageRawFrame,
|
||||
)
|
||||
@@ -380,6 +382,10 @@ class AnthropicLLMService(LLMService):
|
||||
completion_tokens_estimate += self._estimate_tokens(
|
||||
event.delta.partial_json
|
||||
)
|
||||
elif hasattr(event.delta, "thinking"):
|
||||
await self.push_frame(LLMThinkingTextFrame(event.delta.thinking))
|
||||
elif hasattr(event.delta, "signature"):
|
||||
await self.push_frame(LLMThinkingSignatureFrame(event.delta.signature))
|
||||
elif event.type == "content_block_start":
|
||||
if event.content_block.type == "tool_use":
|
||||
tool_use_block = event.content_block
|
||||
|
||||
Reference in New Issue
Block a user