# diff --git a/examples/multi-task/code-assistant/code-assistant.py b/examples/multi-task/code-assistant/code-assistant.py
# new file mode 100644
# index 000000000..2b701db13
# --- /dev/null
# +++ b/examples/multi-task/code-assistant/code-assistant.py
# @@ -0,0 +1,178 @@
#
# Copyright (c) 2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Voice code assistant powered by Claude Agent SDK.

Talk to your codebase hands-free. Ask questions like "what does the
auth middleware do?" or "find all TODO comments" and get spoken answers
based on actual file contents. The Claude Agent SDK worker navigates
the filesystem using Read, Bash, Glob, and Grep tools.

Architecture::

    Main task (transport + LLM + ``ask_code`` tool)
      └── job → CodeWorker (Claude Agent SDK)

Requirements:

- ANTHROPIC_API_KEY
- OPENAI_API_KEY
- DEEPGRAM_API_KEY
- CARTESIA_API_KEY
- DAILY_API_KEY (for Daily transport)

Optional:

- PROJECT_PATH: directory the worker should explore. Defaults to the
  current working directory.
"""

import os

from code_worker import CodeWorker
from dotenv import load_dotenv
from loguru import logger

from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesAppendFrame, LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
    LLMContextAggregatorPair,
    LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams

load_dotenv(override=True)

# Directory handed to the code worker. An unset *or empty* PROJECT_PATH falls
# back to the current directory, and the value is normalised to an absolute
# path ("~" expanded) so the path quoted to the worker is unambiguous no matter
# where the bot process was launched from.
PROJECT_PATH = os.path.abspath(os.path.expanduser(os.getenv("PROJECT_PATH") or os.getcwd()))

# Name under which the worker is spawned in `run_bot` and addressed in
# `ask_code`; the two must agree, so it is defined exactly once.
CODE_WORKER_NAME = "code_worker"

# Lazily-built transport parameters, keyed by transport type and handed to
# `create_transport` in `bot`.
transport_params = {
    "daily": lambda: DailyParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
    ),
    "webrtc": lambda: TransportParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
    ),
}


# NOTE(review): `ask_code` is registered with `register_direct_function` and
# listed in the `ToolsSchema`; its docstring is presumably what the LLM sees as
# the tool description, so treat that text as runtime behaviour — confirm
# before rewording it.
async def ask_code(params: FunctionCallParams, question: str):
    """Ask a question about the codebase. A Claude Code worker will
    explore the project by reading files, searching code, and running
    commands. It remembers previous questions for follow-ups.

    Args:
        question (str): The question about code, files, structure,
            dependencies, or anything in the project.
    """
    logger.info(f"Asking code worker: '{question}'")
    async with params.pipeline_task.job(CODE_WORKER_NAME, payload={"question": question}) as job:
        # Have the LLM say a short filler line right away so the user is not
        # left in silence while the worker is busy.
        await params.llm.queue_frame(
            LLMMessagesAppendFrame(
                messages=[{"role": "developer", "content": "Give me a moment."}],
                run_llm=True,
            )
        )
        # The LLM keeps talking while the worker runs.
    # NOTE(review): presumably `job.response` is populated once the job context
    # has exited — confirm against the job API.
    await params.result_callback(job.response)


async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
    """Build the voice pipeline, spawn the code worker, and run the bot.

    Args:
        transport: Transport providing the audio input and output processors.
        runner_args: Runner settings; `handle_sigint` and
            `pipeline_idle_timeout_secs` are read from it.
    """
    logger.info("Starting code assistant")

    runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)

    stt = DeepgramSTTService(api_key=os.environ["DEEPGRAM_API_KEY"])
    tts = CartesiaTTSService(
        api_key=os.environ["CARTESIA_API_KEY"],
        settings=CartesiaTTSService.Settings(
            voice="9626c31c-bec5-4cca-baa8-f8ba9e84c8bc",  # Jacqueline
        ),
    )
    llm = OpenAILLMService(
        api_key=os.environ["OPENAI_API_KEY"],
        settings=OpenAILLMService.Settings(
            system_instruction=(
                "You are a voice interface to a code assistant powered by Claude Code. "
                "Behind you is a worker that can read files, search code with grep and "
                "glob patterns, and run bash commands on the project. It maintains "
                "context across questions, so follow-up questions work naturally.\n\n"
                "When the user asks anything about code, project structure, files, "
                "dependencies, tests, or wants to explore the codebase, call the "
                "ask_code tool. When the worker result comes back, summarize it naturally "
                "for speaking. Keep responses concise and conversational.\n"
            ),
        ),
    )

    # NOTE(review): presumably `cancel_on_interruption=False` keeps an in-flight
    # question alive when the user speaks over the bot, and `timeout_secs=60`
    # bounds how long a tool call may take — confirm against the LLM service.
    llm.register_direct_function(ask_code, cancel_on_interruption=False, timeout_secs=60)

    context = LLMContext(tools=ToolsSchema(standard_tools=[ask_code]))
    aggregators = LLMContextAggregatorPair(
        context,
        user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
    )

    # Audio in -> speech-to-text -> user context -> LLM -> text-to-speech ->
    # audio out -> assistant context.
    pipeline = Pipeline(
        [
            transport.input(),
            stt,
            aggregators.user(),
            llm,
            tts,
            transport.output(),
            aggregators.assistant(),
        ]
    )

    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            enable_metrics=True,
            enable_usage_metrics=True,
        ),
        idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
    )

    # The worker is spawned under the same name `ask_code` sends its jobs to.
    await runner.spawn(CodeWorker(CODE_WORKER_NAME, project_path=PROJECT_PATH))

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        """Seed the context with a greeting instruction and run the LLM."""
        logger.info("Client connected")
        context.add_message(
            {
                "role": "developer",
                "content": "Greet the user and tell them you're a code assistant.",
            }
        )
        await task.queue_frame(LLMRunFrame())

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        """Cancel the pipeline task when the client leaves."""
        logger.info("Client disconnected")
        await task.cancel()

    await runner.run(task)


async def bot(runner_args: RunnerArguments):
    """Main bot entry point compatible with Pipecat Cloud."""
    transport = await create_transport(runner_args, transport_params)
    await run_bot(transport, runner_args)


if __name__ == "__main__":
    from pipecat.runner.run import main

    main()

# diff --git a/examples/multi-task/code-assistant/code_worker.py b/examples/multi-task/code-assistant/code_worker.py
# new file mode 100644
# index 000000000..4e1d701e5
# --- /dev/null
# +++ b/examples/multi-task/code-assistant/code_worker.py
# @@ -0,0 +1,120 @@
#
# Copyright (c) 2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Code worker that explores a codebase using Claude Agent SDK."""

import asyncio

from loguru import logger

from pipecat.bus import (
    BusCancelTaskMessage,
    BusEndTaskMessage,
    BusJobRequestMessage,
)
from pipecat.pipeline.base_task import BaseTask
from pipecat.pipeline.job_context import JobStatus

try:
    from claude_agent_sdk import (
        AssistantMessage,
        ClaudeAgentOptions,
        ClaudeSDKClient,
        TextBlock,
    )
except ModuleNotFoundError as e:
    logger.error(f"Exception: {e}")
    logger.error("In order to use CodeWorker, you need to `pip install claude-agent-sdk`.")
    raise Exception(f"Missing module: {e}") from e


class CodeWorker(BaseTask):
    """Bus-only task that answers code questions using Claude Agent SDK.

    Maintains a persistent Claude SDK session so follow-up questions
    share context. Questions are queued and processed sequentially. The
    worker has no Pipecat pipeline — it consumes job requests from the
    bus and replies with job responses.

    If the SDK session cannot be started, or dies, every queued and
    subsequent job is answered with an error response instead of being
    left unanswered.
    """

    def __init__(self, name: str, *, project_path: str):
        """Initialize the CodeWorker.

        Args:
            name: Unique task name.
            project_path: Filesystem path the Claude SDK should explore. It is
                quoted in the system prompt and used as the SDK's working
                directory.
        """
        super().__init__(name)

        self._project_path = project_path
        self._queue: asyncio.Queue[BusJobRequestMessage] = asyncio.Queue()
        self._worker_task: asyncio.Task | None = None
        # Set once the worker loop has terminated with an error. From then on
        # nothing reads the queue, so new requests are rejected immediately.
        self._worker_error: str | None = None

        self._claude_options = ClaudeAgentOptions(
            # SECURITY(review): "bypassPermissions" combined with the Bash tool
            # lets the agent run shell commands without confirmation, driven by
            # whatever the user says. Only point this at a project and an
            # environment you trust.
            permission_mode="bypassPermissions",
            system_prompt=(
                f"You are a code assistant. The project is at: {self._project_path}\n\n"
                "Answer the user's question by exploring the codebase. Use Read to "
                "view files, Glob to find files by pattern, and Bash to run commands "
                "like grep or find. Be thorough but concise in your answer. "
                "Focus on what the user asked. Respond with a clear, spoken-friendly "
                "summary (no markdown, no bullet points, no code blocks)."
            ),
            allowed_tools=["Read", "Bash", "Glob", "Grep"],
            model="sonnet",
            max_turns=10,
            # Run the tools inside the project so relative paths, globs and
            # shell commands resolve there rather than in the directory the
            # bot process happened to be started from.
            cwd=self._project_path,
        )

    async def start(self) -> None:
        """Launch the Claude SDK worker loop alongside the standard task start."""
        await super().start()
        self._worker_task = self.create_task(self._worker_loop(), f"{self.name}::worker")

    async def stop(self) -> None:
        """Cancel the worker loop before tearing down the task."""
        if self._worker_task:
            await self.cancel_task(self._worker_task)
            self._worker_task = None
        await super().stop()

    async def on_job_request(self, message: BusJobRequestMessage) -> None:
        """Enqueue an incoming job for the worker loop.

        When the worker loop has already failed the job is answered with an
        error right away, because nothing would ever take it off the queue.
        """
        await super().on_job_request(message)
        if self._worker_error is not None:
            logger.warning(f"Worker '{self.name}': rejecting job, worker is not running")
            await self.send_job_response(
                message.job_id, {"error": self._worker_error}, status=JobStatus.ERROR
            )
            return
        logger.info(f"Worker '{self.name}': queued '{message.payload['question']}'")
        self._queue.put_nowait(message)

    async def _handle_task_end(self, message: BusEndTaskMessage) -> None:
        """Signal the run loop to finish on a graceful end."""
        await super()._handle_task_end(message)
        self._finished_event.set()

    async def _handle_task_cancel(self, message: BusCancelTaskMessage) -> None:
        """Signal the run loop to finish on cancellation."""
        await super()._handle_task_cancel(message)
        self._finished_event.set()

    async def _worker_loop(self):
        """Answer queued questions one at a time over a single SDK session.

        One `ClaudeSDKClient` stays open for the whole loop, which is what lets
        follow-up questions share context. Cancellation is not caught here and
        propagates normally.
        """
        try:
            async with ClaudeSDKClient(options=self._claude_options) as client:
                while True:
                    message = await self._queue.get()
                    await self._answer(client, message)
        except Exception as e:
            # Reached when the session cannot be opened and also when it breaks
            # later on, hence the neutral wording.
            logger.error(f"Worker '{self.name}': Claude SDK session failed: {e}")
            self._worker_error = f"Code worker unavailable: {e}"
            await self._fail_pending_jobs()

    async def _answer(self, client: ClaudeSDKClient, message: BusJobRequestMessage) -> None:
        """Run one question through the SDK session and send the job response."""
        try:
            # Looked up inside the try so a malformed payload fails this one
            # job instead of terminating the whole worker loop.
            question = message.payload["question"]
            logger.info(f"Worker '{self.name}': researching '{question}'")

            await client.query(prompt=question)
            parts: list[str] = []
            async for msg in client.receive_response():
                # Only the assistant's text blocks make up the spoken answer.
                if isinstance(msg, AssistantMessage):
                    parts.extend(
                        block.text for block in msg.content if isinstance(block, TextBlock)
                    )
            answer = "".join(parts)

            logger.info(f"Worker '{self.name}': completed ({len(answer)} chars)")
            await self.send_job_response(message.job_id, {"answer": answer})

        except Exception as e:
            logger.error(f"Worker '{self.name}': error: {e}")
            await self.send_job_response(
                message.job_id, {"error": str(e)}, status=JobStatus.ERROR
            )

    async def _fail_pending_jobs(self) -> None:
        """Answer every job still in the queue with the recorded worker error."""
        while not self._queue.empty():
            pending = self._queue.get_nowait()
            try:
                await self.send_job_response(
                    pending.job_id, {"error": self._worker_error}, status=JobStatus.ERROR
                )
            except Exception as e:
                # Best effort: keep going so the remaining jobs are answered.
                logger.error(f"Worker '{self.name}': could not fail job {pending.job_id}: {e}")