Compare commits

...

4 Commits

Author SHA1 Message Date
James Hush
a751130a76 Can work with double sound but not parallel sound 2025-01-24 17:46:06 +08:00
James Hush
b29ac3c7a8 Remove logs 2025-01-23 16:31:41 +08:00
James Hush
5222488fb5 Have a default transfer 2025-01-23 16:24:40 +08:00
James Hush
c2fef9584b Add call transfer to bot_daily 2025-01-23 15:54:53 +08:00
3 changed files with 262 additions and 5 deletions

View File

@@ -37,7 +37,16 @@ Run `bot_runner.py` to handle incoming HTTP requests:
Then target the following URL:
`POST /daily_start_bot`
```bash
curl -X POST 'http://localhost:7860/daily_start_bot' \
-H 'Content-Type: application/json' \
-d '{
"callId": "callId-from-call",
"callDomain": "callDomain-from-call"
}'
```
Use [this guide](https://docs.pipecat.ai/guides/telephony/daily-webrtc) to connect a phone number purchased from Daily to the bot.
For more configuration options, please consult Daily's API documentation.
@@ -82,4 +91,4 @@ If you're using Twilio as a number vendor:
## Need to do something more advanced?
This demo covers the basics of bot telephony. If you want to know more about working with PSTN / SIP, please ping us on [Discord](https://discord.gg/pipecat).
This demo covers the basics of bot telephony. If you want to know more about working with PSTN / SIP, please ping us on [Discord](https://discord.gg/pipecat).

View File

@@ -1,3 +1,8 @@
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import asyncio
import os
@@ -5,13 +10,16 @@ import sys
from dotenv import load_dotenv
from loguru import logger
from openai.types.chat import ChatCompletionToolParam
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame
from pipecat.frames.frames import EndFrame, TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.ai_services import LLMService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyDialinSettings, DailyParams, DailyTransport
@@ -55,16 +63,62 @@ async def main(room_url: str, token: str, callId: str, callDomain: str):
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
content = f"""
You are a delivery service customer support specialist supporting customers with their orders.
Begin with: "Hello, this is Hailey from customer support. What can I help you with today?"
"""
messages = [
{
"role": "system",
"content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by saying 'Oh, hello! Who dares dial me at this hour?!'.",
"content": content,
},
]
context = OpenAILLMContext(messages)
tools = [
ChatCompletionToolParam(
type="function",
function={
"name": "transfer_call",
"description": "Transfer the call to a person. This function is used to connect the call to a real person. Examples of real people are: managers, supervisors, or other customer support specialists. Any person is okay as long as they are not a bot.",
"parameters": {
"type": "object",
"properties": {
"call_id": {
"type": "string",
"description": "This is always {callId}.",
},
"summary": {
"type": "string",
"description": """
Provide a concise summary in 3-5 sentences. Highlight any important details or unusual aspects of the conversation.
""",
},
},
},
},
)
]
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
async def default_transfer_call(
function_name, tool_call_id, args, llm: LLMService, context, result_callback
):
logger.debug(f"default_transfer_call: {function_name} {tool_call_id} {args}")
await result_callback(
{
"transfer_call": False,
"reason": "To transfer call calls, please dial in to the room using a phone or a SIP client.",
}
)
llm.register_function(
function_name="transfer_call",
callback=default_transfer_call,
)
pipeline = Pipeline(
[
transport.input(),
@@ -87,6 +141,44 @@ async def main(room_url: str, token: str, callId: str, callDomain: str):
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
@transport.event_handler("on_dialin_ready")
async def on_dialin_ready(_, sip_endpoint):
logger.info(f"on_dialin_ready: {sip_endpoint}")
@transport.event_handler("on_dialin_connected")
async def on_dialin_connected(transport, event):
logger.info(f"on_dialin_connected: {event}")
sip_session_id = event["sessionId"]
async def transfer_call(
function_name, tool_call_id, args, llm: LLMService, context, result_callback
):
logger.debug(f"transfer_call: {function_name} {tool_call_id} {args}")
# sip_url = "sip:your_user_name@sip.linphone.org"
sip_url = (
f"sip:your_username@dailyco.sip.twilio.com?x-daily_id={room_url.split('/')[-1]}"
)
try:
await transport.sip_refer(
settings={
"sessionId": sip_session_id,
"toEndPoint": sip_url,
}
)
except Exception as e:
logger.error(f"An error occurred during SIP refer: {e}")
await result_callback({"transfer_call": False})
await result_callback({"transfer_call": True})
llm.register_function(
function_name="transfer_call",
callback=transfer_call,
)
runner = PipelineRunner()
await runner.run(task)

View File

@@ -0,0 +1,156 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import math
import os
import random
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.frames.frames import BotSpeakingFrame, EndFrame, Frame, TextFrame, TTSSpeakFrame
from pipecat.observers.base_observer import BaseObserver
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.filters.function_filter import FunctionFilter
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.transports.services.daily import DailyOutputTransport, DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
class DebugObserver(BaseObserver):
"""Observer to log interruptions and bot speaking events to the console.
Logs all frame instances of:
- StartInterruptionFrame
- BotStartedSpeakingFrame
- BotStoppedSpeakingFrame
This allows you to see the frame flow from processor to processor through the pipeline for these frames.
Log format: [EVENT TYPE]: [source processor] → [destination processor] at [timestamp]s
"""
async def on_push_frame(
self,
src: FrameProcessor,
dst: FrameProcessor,
frame: Frame,
direction: FrameDirection,
timestamp: int,
):
arrow = "" if direction == FrameDirection.DOWNSTREAM else ""
# Convert timestamp to seconds for readability
time_sec = timestamp / 1_000_000_000
if isinstance(frame, BotSpeakingFrame):
return
if isinstance(dst, DailyOutputTransport):
logger.debug(f"{frame} {src} {arrow} {dst} at {time_sec:.2f}s")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, _) = await configure(session)
transport1 = DailyTransport(
"https://hush.daily.co/sip",
None,
"Don't Do Anything",
DailyParams(audio_out_enabled=True),
)
transport2 = DailyTransport(
"https://hush.daily.co/demo",
None,
"Summarize Call",
DailyParams(audio_out_enabled=True),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
runner = PipelineRunner()
async def true_filter(frame) -> bool:
return True
async def false_filter(frame) -> bool:
return False
pipeline = Pipeline(
[
transport1.input(),
transport2.input(),
ParallelPipeline(
[transport1.output()],
[tts, transport2.output()],
),
]
)
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
observers=[DebugObserver()],
),
)
# Register an event handler so we can play the audio when the
# participant joins.
@transport1.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
participant_name = participant.get("info", {}).get("userName", "")
logger.info(f"-- {participant_name} joined transport1")
def get_call_summary():
"""In a real app this would be a call to a database or API."""
# Randomly choose between two options
message = random.choice(
[
"Alice needs help finding her customer record.",
"Bob is calling about his lost password.",
]
)
return message
@transport2.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
participant_name = participant.get("info", {}).get("userName", "")
logger.info(f"-- {participant_name} joined transport2")
call_summary = get_call_summary()
await task.queue_frames(
[
TTSSpeakFrame(
f"Hi {participant_name}! Here's the summary of the call: {call_summary}"
),
EndFrame(),
]
)
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())