Files
pipecat/examples/foundational/15a-switch-languages.py
2024-08-19 15:31:34 -07:00

147 lines
5.1 KiB
Python

#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import aiohttp
import os
import sys
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.filters.function_filter import FunctionFilter
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.services.whisper import Model, WhisperSTTService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from openai.types.chat import ChatCompletionToolParam
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
current_language = "English"
async def switch_language(function_name, tool_call_id, args, llm, context, result_callback):
global current_language
current_language = args["language"]
await result_callback({"voice": f"Your answers from now on should be in {current_language}."})
async def english_filter(frame) -> bool:
return current_language == "English"
async def spanish_filter(frame) -> bool:
return current_language == "Spanish"
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Pipecat",
DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True
)
)
stt = WhisperSTTService(model=Model.LARGE)
english_tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
spanish_tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="846d6cb0-2301-48b6-9683-48f5618ea2f6", # Spanish-speaking Lady
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm.register_function("switch_language", switch_language)
tools = [
ChatCompletionToolParam(
type="function",
function={
"name": "switch_language",
"description": "Switch to another language when the user asks you to",
"parameters": {
"type": "object",
"properties": {
"language": {
"type": "string",
"description": "The language the user wants you to speak",
},
},
"required": ["language"],
},
})]
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities. Respond to what the user said in a creative and helpful way. Your output should not include non-alphanumeric characters. You can speak the following languages: 'English' and 'Spanish'.",
},
]
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline([
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
ParallelPipeline( # TTS (bot will speak the chosen language)
[FunctionFilter(english_filter), english_tts], # English
[FunctionFilter(spanish_filter), spanish_tts], # Spanish
),
transport.output(), # Transport bot output
context_aggregator.assistant() # Assistant spoken responses
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{
"role": "system",
"content": f"Please introduce yourself to the user and let them know the languages you speak. Your initial responses should be in {current_language}."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())