Compare commits

...

13 Commits

Author SHA1 Message Date
Chad Bailey
ff1b2961d8 fixup 2024-05-31 14:23:56 +00:00
Chad Bailey
ba42cffcc2 test cleanup 2024-05-31 14:23:56 +00:00
Chad Bailey
9778d86607 everything but audioframe and endpipeframe 2024-05-31 14:23:52 +00:00
Kwindla Hultman Kramer
19caf750fd Merge pull request #194 from pipecat-ai/khk-cartesia-changelog
Added cartesia line to CHANGELOG.md
2024-05-30 14:18:41 -07:00
Kwindla Hultman Kramer
296611714f added cartesia line to CHANGELOG.md 2024-05-30 10:41:00 -07:00
chadbailey59
4c3d19cc8b Function calling (#175)
* added function calling code back

* removed old llm_context file

* added integration testing for openai

* added function calling example

* added function callbacks

* added function start callback

* fixup

* fixup

* added different return type support for function calling

* intake example working

* added frame loggers

* cleanup

* fixup

* Update openai.py

* removed function call frame types

* fixup

* re-added example

* renumbered wake phrase

* fixup for autopep8

* remove unused imports
2024-05-30 12:25:39 -05:00
Aleix Conchillo Flaqué
a3ba07c7a3 Merge pull request #193 from pipecat-ai/aleix/fix-camera-out-enabled-cpu
transport(output): fix high CPU usage with camera_out_enabled and no …
2024-05-31 01:25:06 +08:00
Kwindla Hultman Kramer
a1579808b2 Merge pull request #189 from pipecat-ai/khk-cartesia-etc
Cartesia TTS
2024-05-30 10:24:45 -07:00
Aleix Conchillo Flaqué
aecb9f5816 transport(output): fix high CPU usage with camera_out_enabled and no images 2024-05-30 10:18:43 -07:00
Aleix Conchillo Flaqué
a5d42a526c Merge pull request #191 from pipecat-ai/aleix/fix-silero-vad
vad: fix silero vad frame processor
2024-05-30 23:25:52 +08:00
Aleix Conchillo Flaqué
a9472f8116 vad: fix silero vad frame processor 2024-05-30 07:50:58 -07:00
Kwindla Hultman Kramer
d5f106ae19 pr fixes 2024-05-29 23:41:35 -07:00
Kwindla Hultman Kramer
920745345a cartesia tts support 2024-05-29 23:35:35 -07:00
54 changed files with 1464 additions and 390 deletions

View File

@@ -5,6 +5,20 @@ All notable changes to **pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
### Added
- Added Cartesia TTS support (https://cartesia.ai/)
### Fixed
- Fixed SileroVAD frame processor.
- Fixed an issue where `camera_out_enabled` would cause the highg CPU usage if
no image was provided.
## [0.0.24] - 2024-05-29
### Added

View File

@@ -56,10 +56,11 @@ async def main(room_url: str, token):
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4-turbo-preview")
model="gpt-4o")
fl_in = FrameLogger("Inner")
fl_out = FrameLogger("Outer")
fl = FrameLogger("!!! after LLM", "red")
fltts = FrameLogger("@@@ out of tts", "green")
flend = FrameLogger("### out of the end", "magenta")
messages = [
{
@@ -71,14 +72,15 @@ async def main(room_url: str, token):
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
fl_in,
transport.input(),
tma_in,
llm,
fl_out,
fl,
tts,
fltts,
transport.output(),
tma_out
tma_out,
flend
])
task = PipelineTask(pipeline)

View File

@@ -15,14 +15,15 @@ from pipecat.frames.frames import ImageRawFrame, Frame, SystemFrame, TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.llm_context import (
LLMAssistantContextAggregator,
LLMUserContextAggregator,
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.openai import OpenAILLMService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.transports.services.daily import DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from pipecat.transports.services.daily import DailyParams
from runner import configure
@@ -66,7 +67,9 @@ async def main(room_url: str, token):
audio_out_enabled=True,
camera_out_width=1024,
camera_out_height=1024,
transcription_enabled=True
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
)
@@ -87,8 +90,8 @@ async def main(room_url: str, token):
},
]
tma_in = LLMUserContextAggregator(messages)
tma_out = LLMAssistantContextAggregator(messages)
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
image_sync_aggregator = ImageSyncAggregator(
os.path.join(os.path.dirname(__file__), "assets", "speaking.png"),

View File

@@ -0,0 +1,93 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import aiohttp
import os
import sys
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_name="Barbershop Man"
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -3,14 +3,14 @@ import aiohttp
import asyncio
import logging
import os
from pipecat.pipeline.aggregators import SentenceAggregator
from pipecat.processors.aggregators import SentenceAggregator
from pipecat.pipeline.pipeline import Pipeline
from pipecat.transports.daily_transport import DailyTransport
from pipecat.services.azure_ai_services import AzureLLMService, AzureTTSService
from pipecat.services.elevenlabs_ai_services import ElevenLabsTTSService
from pipecat.services.fal_ai_services import FalImageGenService
from pipecat.pipeline.frames import AudioFrame, EndFrame, ImageFrame, LLMMessagesFrame, TextFrame
from pipecat.frames.frames import AudioFrame, EndFrame, ImageFrame, LLMMessagesFrame, TextFrame
from runner import configure

View File

@@ -1,156 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import aiohttp
import os
import random
import sys
from PIL import Image
from pipecat.frames.frames import Frame, ImageRawFrame, SpriteFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.llm_context import (
LLMUserContextAggregator,
LLMAssistantContextAggregator,
)
from pipecat.processors.filters.wake_check_filter import WakeCheckFilter
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.openai import OpenAILLMService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
sprites = {}
image_files = [
"sc-default.png",
"sc-talk.png",
"sc-listen-1.png",
"sc-think-1.png",
"sc-think-2.png",
"sc-think-3.png",
"sc-think-4.png",
]
script_dir = os.path.dirname(__file__)
for file in image_files:
# Build the full path to the image file
full_path = os.path.join(script_dir, "assets", file)
# Get the filename without the extension to use as the dictionary key
filename = os.path.splitext(os.path.basename(full_path))[0]
# Open the image and convert it to bytes
with Image.open(full_path) as img:
sprites[file] = ImageRawFrame(image=img.tobytes(), size=img.size, format=img.format)
# When the bot isn't talking, show a static image of the cat listening
quiet_frame = sprites["sc-listen-1.png"]
# When the bot is talking, build an animation from two sprites
talking_list = [sprites["sc-default.png"], sprites["sc-talk.png"]]
talking = [random.choice(talking_list) for x in range(30)]
talking_frame = SpriteFrame(talking)
# TODO: Support "thinking" as soon as we get a valid transcript, while LLM
# is processing
thinking_list = [
sprites["sc-think-1.png"],
sprites["sc-think-2.png"],
sprites["sc-think-3.png"],
sprites["sc-think-4.png"],
]
thinking_frame = SpriteFrame(thinking_list)
class ImageSyncAggregator(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await self.push_frame(talking_frame)
await self.push_frame(frame)
await self.push_frame(quiet_frame)
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
transport = DailyTransport(
room_url,
token,
"Santa Cat",
DailyParams(
audio_out_enabled=True,
camera_out_enabled=True,
camera_out_width=720,
camera_out_height=1280,
camera_out_framerate=10,
transcription_enabled=True
)
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4-turbo-preview")
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id="jBpfuIE2acCO8z3wKNLl",
)
isa = ImageSyncAggregator()
messages = [
{
"role": "system",
"content": "You are Santa Cat, a cat that lives in Santa's workshop at the North Pole. You should be clever, and a bit sarcastic. You should also tell jokes every once in a while. Your responses should only be a few sentences long.",
},
]
tma_in = LLMUserContextAggregator(messages)
tma_out = LLMAssistantContextAggregator(messages)
wcf = WakeCheckFilter(["Santa Cat", "Santa"])
pipeline = Pipeline([
transport.input(), # Transport user input
isa, # Cat talking/quiet images
wcf, # Filter out speech not directed at Santa Cat
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Santa Cat spoken responses
])
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
# Send some greeting at the beginning.
await tts.say("Hi! If you want to talk to me, just say 'hey Santa Cat'.")
transport.capture_participant_transcription(participant["id"])
async def starting_image():
await transport.send_image(quiet_frame)
runner = PipelineRunner()
task = PipelineTask(pipeline)
await asyncio.gather(runner.run(task), starting_image())
if __name__ == "__main__":
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -19,15 +19,16 @@ from pipecat.frames.frames import (
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.llm_context import (
LLMUserContextAggregator,
LLMAssistantContextAggregator,
from pipecat.processors.aggregators.llm_response import (
LLMUserResponseAggregator,
LLMAssistantResponseAggregator,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.logger import FrameLogger
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
@@ -84,7 +85,12 @@ async def main(room_url: str, token):
room_url,
token,
"Respond bot",
DailyParams(audio_out_enabled=True, transcription_enabled=True)
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
)
llm = OpenAILLMService(
@@ -104,8 +110,8 @@ async def main(room_url: str, token):
},
]
tma_in = LLMUserContextAggregator(messages)
tma_out = LLMAssistantContextAggregator(messages)
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
out_sound = OutboundSoundEffectWrapper()
in_sound = InboundSoundEffectWrapper()
fl = FrameLogger("LLM Out")

View File

@@ -0,0 +1,145 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import aiohttp
import os
import json
import sys
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantContextAggregator,
LLMUserContextAggregator,
)
from pipecat.services.openai import OpenAILLMContext
from pipecat.processors.logger import FrameLogger
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from openai.types.chat import (
ChatCompletionToolParam,
)
from pipecat.frames.frames import (
TextFrame
)
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def start_fetch_weather(llm):
await llm.push_frame(TextFrame("Let me think."))
async def fetch_weather_from_api(llm, args):
return ({"conditions": "nice", "temperature": "75"})
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
)
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4-turbo-preview")
llm.register_function(
"get_current_weather",
fetch_weather_from_api,
start_callback=start_fetch_weather)
fl_in = FrameLogger("Inner")
fl_out = FrameLogger("Outer")
tools = [
ChatCompletionToolParam(
type="function",
function={
"name": "get_current_weather",
"description": "Get the current weather",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": [
"celsius",
"fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
"required": [
"location",
"format"],
},
})]
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages, tools)
tma_in = LLMUserContextAggregator(context)
tma_out = LLMAssistantContextAggregator(context)
pipeline = Pipeline([
fl_in,
transport.input(),
tma_in,
llm,
fl_out,
tts,
transport.output(),
tma_out
])
task = PipelineTask(pipeline)
@ transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await tts.say("Hi! Ask me about the weather in San Francisco.")
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -2,8 +2,8 @@ import asyncio
import aiohttp
import logging
import os
from pipecat.pipeline.frame_processor import FrameProcessor
from pipecat.pipeline.frames import TextFrame, TranscriptionFrame
from pipeline.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import TextFrame, TranscriptionFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.services.elevenlabs_ai_services import ElevenLabsTTSService
from pipecat.transports.websocket_transport import WebsocketTransport

View File

@@ -0,0 +1,16 @@
FROM python:3.10-bullseye
RUN mkdir /app
RUN mkdir /app/assets
RUN mkdir /app/utils
COPY *.py /app/
COPY requirements.txt /app/
copy assets/* /app/assets/
copy utils/* /app/utils/
WORKDIR /app
RUN pip3 install -r requirements.txt
EXPOSE 7860
CMD ["python3", "server.py"]

View File

@@ -0,0 +1,37 @@
# Simple Chatbot
<img src="image.png" width="420px">
This app connects you to a chatbot powered by GPT-4, complete with animations generated by Stable Video Diffusion.
See a video of it in action: https://x.com/kwindla/status/1778628911817183509
And a quick video walkthrough of the code: https://www.loom.com/share/13df1967161f4d24ade054e7f8753416
The first time, things might take extra time to get started since VAD (Voice Activity Detection) model needs to be downloaded.
## Get started
```python
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
cp env.example .env # and add your credentials
```
## Run the server
```bash
python server.py
```
Then, visit `http://localhost:7860/start` in your browser to start a chatbot session.
## Build and test the Docker image
```
docker build -t chatbot .
docker run --env-file .env -p 7860:7860 chatbot
```

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,359 @@
import asyncio
import aiohttp
import copy
import json
import os
import re
import sys
import wave
from typing import List
from openai._types import NotGiven, NOT_GIVEN
from openai.types.chat import (
ChatCompletionToolParam,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.llm_response import LLMUserContextAggregator, LLMAssistantContextAggregator
from pipecat.processors.logger import FrameLogger
from pipecat.frames.frames import (
Frame,
LLMMessagesFrame,
AudioRawFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.services.ai_services import AIService
from pipecat.transports.services.daily import DailyParams, DailyTranscriptionSettings, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from pipecat.services.openai import OpenAILLMContext, OpenAILLMContextFrame
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
sounds = {}
sound_files = [
"clack-short.wav",
"clack.wav",
"clack-short-quiet.wav",
"ding.wav",
"ding2.wav",
]
script_dir = os.path.dirname(__file__)
for file in sound_files:
# Build the full path to the sound file
full_path = os.path.join(script_dir, "assets", file)
# Get the filename without the extension to use as the dictionary key
filename = os.path.splitext(os.path.basename(full_path))[0]
# Open the sound and convert it to bytes
with wave.open(full_path) as audio_file:
sounds[file] = AudioRawFrame(audio_file.readframes(-1),
audio_file.getframerate(), audio_file.getnchannels())
class IntakeProcessor:
def __init__(
self,
context: OpenAILLMContext,
llm: AIService,
tools: List[ChatCompletionToolParam] | NotGiven = NOT_GIVEN,
*args,
**kwargs,
):
super().__init__(*args, **kwargs)
self._context: OpenAILLMContext = context
self._llm = llm
print(f"Initializing context from IntakeProcessor")
self._context.add_message({"role": "system", "content": "You are Jessica, an agent for a company called Tri-County Health Services. Your job is to collect important information from the user before their doctor visit. You're talking to Chad Bailey. You should address the user by their first name and be polite and professional. You're not a medical professional, so you shouldn't provide any advice. Keep your responses short. Your job is to collect information to give to a doctor. Don't make assumptions about what values to plug into functions. Ask for clarification if a user response is ambiguous. Start by introducing yourself. Then, ask the user to confirm their identity by telling you their birthday, including the year. When they answer with their birthday, call the verify_birthday function."})
self._context.set_tools([
{
"type": "function",
"function": {
"name": "verify_birthday",
"description": "Use this function to verify the user has provided their correct birthday.",
"parameters": {
"type": "object",
"properties": {
"birthday": {
"type": "string",
"description": "The user's birthdate, including the year. The user can provide it in any format, but convert it to YYYY-MM-DD format to call this function.",
}},
},
},
}])
# Create an allowlist of functions that the LLM can call
self._functions = [
"verify_birthday",
"list_prescriptions",
"list_allergies",
"list_conditions",
"list_visit_reasons",
]
async def verify_birthday(self, llm, args):
if args["birthday"] == "1983-01-01":
self._context.set_tools(
[
{
"type": "function",
"function": {
"name": "list_prescriptions",
"description": "Once the user has provided a list of their prescription medications, call this function.",
"parameters": {
"type": "object",
"properties": {
"prescriptions": {
"type": "array",
"items": {
"type": "object",
"properties": {
"medication": {
"type": "string",
"description": "The medication's name",
},
"dosage": {
"type": "string",
"description": "The prescription's dosage",
},
},
},
}},
},
},
}])
# It's a bit weird to push this to the LLM, but it gets it into the pipeline
await llm.push_frame(sounds["ding2.wav"], FrameDirection.DOWNSTREAM)
# We don't need the function call in the context, so just return a new
# system message and let the framework re-prompt
return [{"role": "system", "content": "Next, thank the user for confirming their identity, then ask the user to list their current prescriptions. Each prescription needs to have a medication name and a dosage. Do not call the list_prescriptions function with any unknown dosages."}]
else:
# The user provided an incorrect birthday; ask them to try again
return [{"role": "system", "content": "The user provided an incorrect birthday. Ask them for their birthday again. When they answer, call the verify_birthday function."}]
async def start_prescriptions(self, llm):
print(f"!!! doing start prescriptions")
# Move on to allergies
self._context.set_tools(
[
{
"type": "function",
"function": {
"name": "list_allergies",
"description": "Once the user has provided a list of their allergies, call this function.",
"parameters": {
"type": "object",
"properties": {
"allergies": {
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "What the user is allergic to",
}},
},
}},
},
},
}])
self._context.add_message(
{
"role": "system",
"content": "Next, ask the user if they have any allergies. Once they have listed their allergies or confirmed they don't have any, call the list_allergies function."})
print(f"!!! about to await llm process frame in start prescrpitions")
await llm.process_frame(OpenAILLMContextFrame(self._context), FrameDirection.DOWNSTREAM)
print(f"!!! past await process frame in start prescriptions")
async def start_allergies(self, llm):
print("!!! doing start allergies")
# Move on to conditions
self._context.set_tools(
[
{
"type": "function",
"function": {
"name": "list_conditions",
"description": "Once the user has provided a list of their medical conditions, call this function.",
"parameters": {
"type": "object",
"properties": {
"conditions": {
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "The user's medical condition",
}},
},
}},
},
},
},
])
self._context.add_message(
{
"role": "system",
"content": "Now ask the user if they have any medical conditions the doctor should know about. Once they've answered the question, call the list_conditions function."})
await llm.process_frame(OpenAILLMContextFrame(self._context), FrameDirection.DOWNSTREAM)
async def start_conditions(self, llm):
print("!!! doing start conditions")
# Move on to visit reasons
self._context.set_tools(
[
{
"type": "function",
"function": {
"name": "list_visit_reasons",
"description": "Once the user has provided a list of the reasons they are visiting a doctor today, call this function.",
"parameters": {
"type": "object",
"properties": {
"visit_reasons": {
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "The user's reason for visiting the doctor",
}},
},
}},
},
},
}])
self._context.add_message(
{"role": "system", "content": "Finally, ask the user the reason for their doctor visit today. Once they answer, call the list_visit_reasons function."})
await llm.process_frame(OpenAILLMContextFrame(self._context), FrameDirection.DOWNSTREAM)
pass
async def start_visit_reasons(self, llm):
print("!!! doing start visit reasons")
# move to finish call
self._context.set_tools([])
self._context.add_message({"role": "system",
"content": "Now, thank the user and end the conversation."})
await llm.process_frame(OpenAILLMContextFrame(self._context), FrameDirection.DOWNSTREAM)
pass
async def save_data(self, llm, args):
logger.info(f"!!! Saving data: {args}")
# Since this is supposed to be "async", returning None from the callback
# will prevent adding anything to context or re-prompting
return None
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
transport = DailyTransport(
room_url,
token,
"Chatbot",
DailyParams(
audio_out_enabled=True,
camera_out_enabled=True,
camera_out_width=1024,
camera_out_height=576,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
transcription_enabled=True,
#
# Spanish
#
# transcription_settings=DailyTranscriptionSettings(
# language="es",
# tier="nova",
# model="2-general"
# )
)
)
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
#
# English
#
voice_id="pNInz6obpgDQGcFmaJgB",
#
# Spanish
#
# model="eleven_multilingual_v2",
# voice_id="gD1IexrzCvsXPHUuT0s3",
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = []
context = OpenAILLMContext(
messages=messages,
)
user_context = LLMUserContextAggregator(context)
assistant_context = LLMAssistantContextAggregator(context)
# checklist = ChecklistProcessor(context, llm)
intake = IntakeProcessor(context, llm)
llm.register_function("verify_birthday", intake.verify_birthday)
llm.register_function(
"list_prescriptions",
intake.save_data,
start_callback=intake.start_prescriptions)
llm.register_function(
"list_allergies",
intake.save_data,
start_callback=intake.start_allergies)
llm.register_function(
"list_conditions",
intake.save_data,
start_callback=intake.start_conditions)
llm.register_function(
"list_visit_reasons",
intake.save_data,
start_callback=intake.start_visit_reasons)
fl = FrameLogger("LLM Output")
pipeline = Pipeline([
transport.input(),
user_context,
llm,
fl,
tts,
transport.output(),
assistant_context,
])
task = PipelineTask(pipeline, allow_interruptions=False)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
print(f"Context is: {context}")
await task.queue_frames([OpenAILLMContextFrame(context)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -0,0 +1,4 @@
DAILY_SAMPLE_ROOM_URL=https://yourdomain.daily.co/yourroom # (for joining the bot to the same room repeatedly for local dev)
DAILY_API_KEY=7df...
OPENAI_API_KEY=sk-PL...
ELEVENLABS_API_KEY=aeb...

Binary file not shown.

After

Width:  |  Height:  |  Size: 733 KiB

View File

@@ -0,0 +1,5 @@
python-dotenv
requests
fastapi[all]
uvicorn
pipecat-ai[daily,openai,silero]

View File

@@ -0,0 +1,58 @@
import argparse
import os
import time
import urllib
import requests
def configure():
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
parser.add_argument(
"-u",
"--url",
type=str,
required=False,
help="URL of the Daily room to join")
parser.add_argument(
"-k",
"--apikey",
type=str,
required=False,
help="Daily API Key (needed to create an owner token for the room)",
)
args, unknown = parser.parse_known_args()
url = args.url or os.getenv("DAILY_SAMPLE_ROOM_URL")
key = args.apikey or os.getenv("DAILY_API_KEY")
if not url:
raise Exception(
"No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL.")
if not key:
raise Exception("No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers.")
# Create a meeting token for the given room with an expiration 1 hour in
# the future.
room_name: str = urllib.parse.urlparse(url).path[1:]
expiration: float = time.time() + 60 * 60
res: requests.Response = requests.post(
f"https://api.daily.co/v1/meeting-tokens",
headers={
"Authorization": f"Bearer {key}"},
json={
"properties": {
"room_name": room_name,
"is_owner": True,
"exp": expiration}},
)
if res.status_code != 200:
raise Exception(
f"Failed to create meeting token: {res.status_code} {res.text}")
token: str = res.json()["token"]
return (url, token)

View File

@@ -0,0 +1,124 @@
import os
import argparse
import subprocess
import atexit
from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, RedirectResponse
from utils.daily_helpers import create_room as _create_room, get_token
MAX_BOTS_PER_ROOM = 1
# Bot sub-process dict for status reporting and concurrency control
bot_procs = {}
def cleanup():
# Clean up function, just to be extra safe
for proc in bot_procs.values():
proc.terminate()
proc.wait()
atexit.register(cleanup)
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/start")
async def start_agent(request: Request):
print(f"!!! Creating room")
room_url, room_name = _create_room()
print(f"!!! Room URL: {room_url}")
# Ensure the room property is present
if not room_url:
raise HTTPException(
status_code=500,
detail="Missing 'room' property in request data. Cannot start agent without a target room!")
# Check if there is already an existing process running in this room
num_bots_in_room = sum(
1 for proc in bot_procs.values() if proc[1] == room_url and proc[0].poll() is None)
if num_bots_in_room >= MAX_BOTS_PER_ROOM:
raise HTTPException(
status_code=500, detail=f"Max bot limited reach for room: {room_url}")
# Get the token for the room
token = get_token(room_url)
if not token:
raise HTTPException(
status_code=500, detail=f"Failed to get token for room: {room_url}")
# Spawn a new agent, and join the user session
# Note: this is mostly for demonstration purposes (refer to 'deployment' in README)
try:
proc = subprocess.Popen(
[
f"python3 -m bot -u {room_url} -t {token}"
],
shell=True,
bufsize=1,
cwd=os.path.dirname(os.path.abspath(__file__))
)
bot_procs[proc.pid] = (proc, room_url)
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to start subprocess: {e}")
return RedirectResponse(room_url)
@app.get("/status/{pid}")
def get_status(pid: int):
# Look up the subprocess
proc = bot_procs.get(pid)
# If the subprocess doesn't exist, return an error
if not proc:
raise HTTPException(
status_code=404, detail=f"Bot with process id: {pid} not found")
# Check the status of the subprocess
if proc[0].poll() is None:
status = "running"
else:
status = "finished"
return JSONResponse({"bot_id": pid, "status": status})
if __name__ == "__main__":
import uvicorn
default_host = os.getenv("HOST", "0.0.0.0")
default_port = int(os.getenv("FAST_API_PORT", "7860"))
parser = argparse.ArgumentParser(
description="Daily Storyteller FastAPI server")
parser.add_argument("--host", type=str,
default=default_host, help="Host address")
parser.add_argument("--port", type=int,
default=default_port, help="Port number")
parser.add_argument("--reload", action="store_true",
help="Reload code on change")
config = parser.parse_args()
print(f"to join a test room, visit http://localhost:{config.port}/start")
uvicorn.run(
"server:app",
host=config.host,
port=config.port,
reload=config.reload,
)

View File

@@ -0,0 +1,109 @@
import urllib.parse
import os
import time
import urllib
import requests
from dotenv import load_dotenv
load_dotenv()
daily_api_path = os.getenv("DAILY_API_URL") or "api.daily.co/v1"
daily_api_key = os.getenv("DAILY_API_KEY")
def create_room() -> tuple[str, str]:
"""
Helper function to create a Daily room.
# See: https://docs.daily.co/reference/rest-api/rooms
Returns:
tuple: A tuple containing the room URL and room name.
Raises:
Exception: If the request to create the room fails or if the response does not contain the room URL or room name.
"""
room_props = {
"exp": time.time() + 60 * 60, # 1 hour
"enable_chat": True,
"enable_emoji_reactions": True,
"eject_at_room_exp": True,
"enable_prejoin_ui": False, # Important for the bot to be able to join headlessly
}
res = requests.post(
f"https://{daily_api_path}/rooms",
headers={"Authorization": f"Bearer {daily_api_key}"},
json={
"properties": room_props
},
)
if res.status_code != 200:
raise Exception(f"Unable to create room: {res.text}")
data = res.json()
room_url: str = data.get("url")
room_name: str = data.get("name")
if room_url is None or room_name is None:
raise Exception("Missing room URL or room name in response")
return room_url, room_name
def get_name_from_url(room_url: str) -> str:
"""
Extracts the name from a given room URL.
Args:
room_url (str): The URL of the room.
Returns:
str: The extracted name from the room URL.
"""
return urllib.parse.urlparse(room_url).path[1:]
def get_token(room_url: str) -> str:
"""
Retrieves a meeting token for the specified Daily room URL.
# See: https://docs.daily.co/reference/rest-api/meeting-tokens
Args:
room_url (str): The URL of the Daily room.
Returns:
str: The meeting token.
Raises:
Exception: If no room URL is specified or if no Daily API key is specified.
Exception: If there is an error creating the meeting token.
"""
if not room_url:
raise Exception(
"No Daily room specified. You must specify a Daily room in order a token to be generated.")
if not daily_api_key:
raise Exception(
"No Daily API key specified. set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers.")
expiration: float = time.time() + 60 * 60
room_name = get_name_from_url(room_url)
res: requests.Response = requests.post(
f"https://{daily_api_path}/meeting-tokens",
headers={
"Authorization": f"Bearer {daily_api_key}"},
json={
"properties": {
"room_name": room_name,
"is_owner": True, # Owner tokens required for transcription
"exp": expiration}},
)
if res.status_code != 200:
raise Exception(
f"Failed to create meeting token: {res.status_code} {res.text}")
token: str = res.json()["token"]
return token

View File

@@ -5,7 +5,9 @@
# pip-compile --all-extras pyproject.toml
#
aiohttp==3.9.5
# via pipecat-ai (pyproject.toml)
# via
# cartesia
# pipecat-ai (pyproject.toml)
aiosignal==1.3.1
# via aiohttp
annotated-types==0.7.0
@@ -21,7 +23,7 @@ async-timeout==4.0.3
# via aiohttp
attrs==23.2.0
# via aiohttp
av==12.0.0
av==12.1.0
# via faster-whisper
azure-cognitiveservices-speech==1.37.0
# via pipecat-ai (pyproject.toml)
@@ -29,11 +31,15 @@ blinker==1.8.2
# via flask
cachetools==5.3.3
# via google-auth
cartesia==0.1.0
# via pipecat-ai (pyproject.toml)
certifi==2024.2.2
# via
# httpcore
# httpx
# requests
cffi==1.16.0
# via sounddevice
charset-normalizer==3.3.2
# via requests
click==8.1.7
@@ -42,7 +48,7 @@ coloredlogs==15.0.1
# via onnxruntime
ctranslate2==4.2.1
# via faster-whisper
daily-python==0.9.0
daily-python==0.9.1
# via pipecat-ai (pyproject.toml)
distro==1.9.0
# via
@@ -51,7 +57,9 @@ distro==1.9.0
einops==0.8.0
# via pipecat-ai (pyproject.toml)
exceptiongroup==1.2.1
# via anyio
# via
# anyio
# pytest
fal-client==0.4.0
# via pipecat-ai (pyproject.toml)
faster-whisper==1.0.2
@@ -122,6 +130,7 @@ httplib2==0.22.0
httpx==0.27.0
# via
# anthropic
# cartesia
# fal-client
# openai
httpx-sse==0.4.0
@@ -140,6 +149,8 @@ idna==3.7
# httpx
# requests
# yarl
iniconfig==2.0.0
# via pytest
itsdangerous==2.2.0
# via flask
jinja2==3.1.4
@@ -177,11 +188,14 @@ packaging==24.0
# via
# huggingface-hub
# onnxruntime
# pytest
# transformers
pillow==10.3.0
# via
# pipecat-ai (pyproject.toml)
# torchvision
pluggy==1.5.0
# via pytest
proto-plus==1.23.0
# via
# google-ai-generativelanguage
@@ -204,6 +218,8 @@ pyasn1-modules==0.4.0
# via google-auth
pyaudio==0.2.14
# via pipecat-ai (pyproject.toml)
pycparser==2.22
# via cffi
pydantic==2.7.2
# via
# anthropic
@@ -217,6 +233,10 @@ pyloudnorm==0.1.1
# via pipecat-ai (pyproject.toml)
pyparsing==3.1.2
# via httplib2
pytest==8.2.1
# via pytest-asyncio
pytest-asyncio==0.23.7
# via cartesia
python-dotenv==1.0.1
# via pipecat-ai (pyproject.toml)
pyyaml==6.0.1
@@ -227,8 +247,9 @@ pyyaml==6.0.1
# transformers
regex==2024.5.15
# via transformers
requests==2.32.2
requests==2.32.3
# via
# cartesia
# google-api-core
# huggingface-hub
# pyht
@@ -247,7 +268,9 @@ sniffio==1.3.1
# anyio
# httpx
# openai
sympy==1.12
sounddevice==0.4.7
# via pipecat-ai (pyproject.toml)
sympy==1.12.1
# via
# onnxruntime
# torch
@@ -258,6 +281,8 @@ tokenizers==0.19.1
# anthropic
# faster-whisper
# transformers
tomli==2.0.1
# via pytest
torch==2.3.0
# via
# pipecat-ai (pyproject.toml)
@@ -292,7 +317,9 @@ uritemplate==4.1.1
urllib3==2.2.1
# via requests
websockets==12.0
# via pipecat-ai (pyproject.toml)
# via
# cartesia
# pipecat-ai (pyproject.toml)
werkzeug==3.0.3
# via flask
yarl==1.9.4

View File

@@ -35,6 +35,7 @@ Website = "https://pipecat.ai"
[project.optional-dependencies]
anthropic = [ "anthropic~=0.25.7" ]
azure = [ "azure-cognitiveservices-speech~=1.37.0" ]
cartesia = [ "numpy~=1.26.0", "sounddevice", "cartesia" ]
daily = [ "daily-python~=0.9.0" ]
examples = [ "python-dotenv~=1.0.0", "flask~=3.0.3", "flask_cors~=4.0.1" ]
fal = [ "fal-client~=0.4.0" ]

View File

@@ -119,7 +119,7 @@ class TextFrame(DataFrame):
text: str
def __str__(self):
return f"{self.name}(text: [{self.text}])"
return f"{self.name}(text: {self.text})"
@dataclass
@@ -132,7 +132,7 @@ class TranscriptionFrame(TextFrame):
timestamp: str
def __str__(self):
return f"{self.name}(user_id: {self.user_id}, text: [{self.text}], timestamp: {self.timestamp})"
return f"{self.name}(user: {self.user_id}, text: {self.text}, timestamp: {self.timestamp})"
@dataclass
@@ -143,7 +143,7 @@ class InterimTranscriptionFrame(TextFrame):
timestamp: str
def __str__(self):
return f"{self.name}(user: {self.user_id}, text: [{self.text}], timestamp: {self.timestamp})"
return f"{self.name}(user: {self.user_id}, text: {self.text}, timestamp: {self.timestamp})"
@dataclass
@@ -307,7 +307,7 @@ class UserStoppedSpeakingFrame(ControlFrame):
@dataclass
class TTSStartedFrame(ControlFrame):
"""Used to indicate the beginning of a TTS response. Following
AudioRawFrames are part of the TTS response until an TTSEndFrame. These
AudioRawFrames are part of the TTS response until an TTSStoppedFrame. These
frames can be used for aggregating audio frames in a transport to optimize
the size of frames sent to the session, without needing to control this in
the TTS service.

View File

@@ -1,5 +1,5 @@
from typing import List
from pipecat.pipeline.frames import EndFrame, EndPipeFrame
from pipecat.frames.frames import EndFrame
from pipecat.pipeline.pipeline import Pipeline
@@ -16,8 +16,7 @@ class SequentialMergePipeline(Pipeline):
while True:
frame = await pipeline.sink.get()
if isinstance(
frame, EndFrame) or isinstance(
frame, EndPipeFrame):
frame, EndFrame):
break
await self.sink.put(frame)

View File

@@ -17,7 +17,7 @@ class GatedAggregator(FrameProcessor):
Yields gate-opening frame before any accumulated frames, then ensuing frames
until and not including the gate-closed frame.
>>> from pipecat.pipeline.frames import ImageFrame
>>> from pipecat.frames.frames import ImageFrame
>>> async def print_frames(aggregator, frame):
... async for frame in aggregator.process_frame(frame):

View File

@@ -1,82 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from pipecat.frames.frames import Frame, InterimTranscriptionFrame, LLMMessagesFrame, TextFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
class LLMContextAggregator(FrameProcessor):
def __init__(
self,
messages: list[dict],
role: str,
complete_sentences=True,
pass_through=True,
):
super().__init__()
self._messages = messages
self._role = role
self._sentence = ""
self._complete_sentences = complete_sentences
self._pass_through = pass_through
async def process_frame(self, frame: Frame, direction: FrameDirection):
# We don't do anything with non-text frames, pass it along to next in
# the pipeline.
if not isinstance(frame, TextFrame):
await self.push_frame(frame, direction)
return
# If we get interim results, we ignore them.
if isinstance(frame, InterimTranscriptionFrame):
return
# The common case for "pass through" is receiving frames from the LLM that we'll
# use to update the "assistant" LLM messages, but also passing the text frames
# along to a TTS service to be spoken to the user.
if self._pass_through:
await self.push_frame(frame, direction)
# TODO: split up transcription by participant
if self._complete_sentences:
# type: ignore -- the linter thinks this isn't a TextFrame, even
# though we check it above
self._sentence += frame.text
if self._sentence.endswith((".", "?", "!")):
self._messages.append(
{"role": self._role, "content": self._sentence})
self._sentence = ""
await self.push_frame(LLMMessagesFrame(self._messages))
else:
# type: ignore -- the linter thinks this isn't a TextFrame, even
# though we check it above
self._messages.append({"role": self._role, "content": frame.text})
await self.push_frame(LLMMessagesFrame(self._messages))
class LLMUserContextAggregator(LLMContextAggregator):
def __init__(
self,
messages: list[dict],
complete_sentences=True):
super().__init__(
messages,
"user",
complete_sentences,
pass_through=False)
class LLMAssistantContextAggregator(LLMContextAggregator):
def __init__(
self,
messages: list[dict],
complete_sentences=True):
super().__init__(
messages,
"assistant",
complete_sentences,
pass_through=True,
)

View File

@@ -6,12 +6,16 @@
from typing import List
from pipecat.services.openai import OpenAILLMContextFrame, OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.frames.frames import (
Frame,
InterimTranscriptionFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMResponseEndFrame,
LLMResponseStartFrame,
LLMMessagesFrame,
StartInterruptionFrame,
TranscriptionFrame,
@@ -211,3 +215,44 @@ class LLMFullResponseAggregator(FrameProcessor):
self._aggregation = ""
else:
await self.push_frame(frame, direction)
class LLMContextAggregator(LLMResponseAggregator):
def __init__(self, *, context: OpenAILLMContext, **kwargs):
self._context = context
super().__init__(**kwargs)
async def _push_aggregation(self):
if len(self._aggregation) > 0:
self._context.add_message({"role": self._role, "content": self._aggregation})
frame = OpenAILLMContextFrame(self._context)
await self.push_frame(frame)
# Reset our accumulator state.
self._reset()
class LLMAssistantContextAggregator(LLMContextAggregator):
def __init__(self, context: OpenAILLMContext):
super().__init__(
messages=[],
context=context,
role="assistant",
start_frame=LLMResponseStartFrame,
end_frame=LLMResponseEndFrame,
accumulator_frame=TextFrame
)
class LLMUserContextAggregator(LLMContextAggregator):
def __init__(self, context: OpenAILLMContext):
super().__init__(
messages=[],
context=context,
role="user",
start_frame=UserStartedSpeakingFrame,
end_frame=UserStoppedSpeakingFrame,
accumulator_frame=TranscriptionFrame,
interim_accumulator_frame=InterimTranscriptionFrame
)

View File

@@ -12,7 +12,7 @@ class VisionImageFrameAggregator(FrameProcessor):
"""This aggregator waits for a consecutive TextFrame and an
ImageFrame. After the ImageFrame arrives it will output a VisionImageFrame.
>>> from pipecat.pipeline.frames import ImageFrame
>>> from pipecat.frames.frames import ImageFrame
>>> async def print_frames(aggregator, frame):
... async for frame in aggregator.process_frame(frame):

View File

@@ -6,17 +6,22 @@
from pipecat.frames.frames import Frame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from loguru import logger
from typing import Optional
logger = logger.opt(ansi=True)
class FrameLogger(FrameProcessor):
def __init__(self, prefix="Frame"):
def __init__(self, prefix="Frame", color: Optional[str] = None):
super().__init__()
self._prefix = prefix
self._color = color
async def process_frame(self, frame: Frame, direction: FrameDirection):
match direction:
case FrameDirection.UPSTREAM:
print(f"< {self._prefix}: {frame}")
case FrameDirection.DOWNSTREAM:
print(f"> {self._prefix}: {frame}")
dir = "<" if direction is FrameDirection.UPSTREAM else ">"
msg = f"{dir} {self._prefix}: {frame}"
if self._color:
msg = f"<{self._color}>{msg}</>"
logger.debug(msg)
await self.push_frame(frame, direction)

View File

@@ -1,6 +1,6 @@
from abc import abstractmethod
from pipecat.pipeline.frames import Frame
from pipecat.frames.frames import Frame
class FrameSerializer:

View File

@@ -1,14 +1,14 @@
import dataclasses
from typing import Text
from pipecat.pipeline.frames import AudioFrame, Frame, TextFrame, TranscriptionFrame
import pipecat.pipeline.protobufs.frames_pb2 as frame_protos
from pipecat.frames.frames import AudioRawFrame, Frame, TextFrame, TranscriptionFrame
import pipecat.frames.protobufs.frames_pb2 as frame_protos
from pipecat.serializers.abstract_frame_serializer import FrameSerializer
class ProtobufFrameSerializer(FrameSerializer):
SERIALIZABLE_TYPES = {
TextFrame: "text",
AudioFrame: "audio",
AudioRawFrame: "audio",
TranscriptionFrame: "transcription"
}

View File

@@ -46,7 +46,7 @@ class AzureTTSService(TTSService):
self._voice = voice
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"Transcribing text: {text}")
logger.debug(f"Generating TTS: {text}")
ssml = (
"<speak version='1.0' xml:lang='en-US' xmlns='http://www.w3.org/2001/10/synthesis' "

View File

@@ -0,0 +1,56 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from cartesia.tts import AsyncCartesiaTTS
import time
from typing import AsyncGenerator
from pipecat.frames.frames import AudioRawFrame, ErrorFrame, Frame
from pipecat.services.ai_services import TTSService
from loguru import logger
class CartesiaTTSService(TTSService):
def __init__(
self,
*,
api_key: str,
voice_name: str,
**kwargs):
super().__init__(**kwargs)
self._api_key = api_key
self._voice_name = voice_name
self._client = None
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"Transcribing text: [{text}]")
try:
if self._client is None:
self._client = AsyncCartesiaTTS(api_key=self._api_key)
voices = self._client.get_voices()
self._voice_id = voices[self._voice_name]["id"]
self._voice = self._client.get_voice_embedding(voice_id=self._voice_id)
chunk_generator = await self._client.generate(
transcript=text, voice=self._voice, stream=True,
model_id="upbeat-moon", data_rtype='array', output_format='pcm_16000',
# a chunk_time of 0.1 seems to be the default. there are small audio pops/gaps which
# we need to debug
chunk_time=0.1
)
async for chunk in chunk_generator:
# print(f"")
frame = AudioRawFrame(chunk['audio'], 16000, 1)
yield frame
except Exception as e:
logger.error(f"Exception {e}")

View File

@@ -32,17 +32,21 @@ class DeepgramTTSService(TTSService):
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.info(f"Running Deepgram TTS for {text}")
base_url = "https://api.deepgram.com/v1/speak"
request_url = f"{base_url}?model={self._voice}&encoding=linear16&container=none&sample_rate=16000"
request_url = f"{base_url}?model = {
self._voice} & encoding = linear16 & container = none & sample_rate = 16000"
headers = {"authorization": f"token {self._api_key}"}
body = {"text": text}
async with self._aiohttp_session.post(request_url, headers=headers, json=body) as r:
if r.status != 200:
text = await r.text()
logger.error(f"Error getting audio (status: {r.status}, error: {text})")
yield ErrorFrame(f"Error getting audio (status: {r.status}, error: {text})")
return
try:
async with self._aiohttp_session.post(request_url, headers=headers, json=body) as r:
if r.status != 200:
text = await r.text()
logger.error(f"Error getting audio (status: {r.status}, error: {text})")
yield ErrorFrame(f"Error getting audio (status: {r.status}, error: {text})")
return
async for data in r.content:
frame = AudioRawFrame(audio=data, sample_rate=16000, num_channels=1)
yield frame
async for data in r.content:
frame = AudioRawFrame(audio=data, sample_rate=16000, num_channels=1)
yield frame
except Exception as e:
logger.error(f"Exception {e}")

View File

@@ -8,7 +8,7 @@ import aiohttp
from typing import AsyncGenerator
from pipecat.frames.frames import AudioRawFrame, ErrorFrame, Frame, TTSStartedFrame, TTSStoppedFrame, TextFrame
from pipecat.frames.frames import AudioRawFrame, ErrorFrame, Frame
from pipecat.services.ai_services import TTSService
from loguru import logger
@@ -32,7 +32,7 @@ class ElevenLabsTTSService(TTSService):
self._model = model
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"Transcribing text: [{text}]")
logger.debug(f"Generating TTS: [{text}]")
url = f"https://api.elevenlabs.io/v1/text-to-speech/{self._voice_id}/stream"

View File

@@ -86,9 +86,18 @@ class GoogleLLMService(LLMService):
logger.debug(f"Google LLM TTFB: {time.time() - start_time}")
async for chunk in self._async_generator_wrapper(response):
await self.push_frame(LLMResponseStartFrame())
await self.push_frame(TextFrame(chunk.text))
await self.push_frame(LLMResponseEndFrame())
try:
text = chunk.text
await self.push_frame(LLMResponseStartFrame())
await self.push_frame(TextFrame(text))
await self.push_frame(LLMResponseEndFrame())
except Exception as e:
# Google LLMs seem to flag safety issues a lot!
if chunk.candidates[0].finish_reason == 3:
logger.debug(
f"LLM refused to generate content for safety reasons - {messages}.")
else:
logger.error(f"Error {e}")
except Exception as e:
logger.error(f"Exception: {e}")

View File

@@ -5,6 +5,7 @@
#
import io
import json
import time
import aiohttp
import base64
@@ -28,13 +29,19 @@ from pipecat.frames.frames import (
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext, OpenAILLMContextFrame
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import LLMService, ImageGenService
from openai.types.chat import (
ChatCompletionSystemMessageParam,
ChatCompletionFunctionMessageParam,
ChatCompletionToolParam,
ChatCompletionUserMessageParam,
)
from loguru import logger
try:
from openai import AsyncOpenAI, AsyncStream
from openai.types.chat import (
ChatCompletion,
ChatCompletionChunk,
ChatCompletionMessageParam,
)
@@ -45,6 +52,10 @@ except ModuleNotFoundError as e:
raise Exception(f"Missing module: {e}")
class OpenAIUnhandledFunctionException(BaseException):
pass
class BaseOpenAILLMService(LLMService):
"""This is the base for all services that use the AsyncOpenAI client.
@@ -59,10 +70,23 @@ class BaseOpenAILLMService(LLMService):
super().__init__()
self._model: str = model
self._client = self.create_client(api_key=api_key, base_url=base_url)
self._callbacks = {}
self._start_callbacks = {}
def create_client(self, api_key=None, base_url=None):
return AsyncOpenAI(api_key=api_key, base_url=base_url)
# TODO-CB: callback function type
def register_function(self, function_name, callback, start_callback=None):
self._callbacks[function_name] = callback
if start_callback:
self._start_callbacks[function_name] = start_callback
def unregister_function(self, function_name):
del self._callbacks[function_name]
if self._start_callbacks[function_name]:
del self._start_callbacks[function_name]
async def _stream_chat_completions(
self, context: OpenAILLMContext
) -> AsyncStream[ChatCompletionChunk]:
@@ -97,16 +121,24 @@ class BaseOpenAILLMService(LLMService):
return chunks
async def _chat_completions(self, messages) -> str | None:
response: ChatCompletion = await self._client.chat.completions.create(
model=self._model, stream=False, messages=messages
)
if response and len(response.choices) > 0:
return response.choices[0].message.content
else:
return None
async def _process_context(self, context: OpenAILLMContext):
function_name = ""
arguments = ""
tool_call_id = ""
chunk_stream: AsyncStream[ChatCompletionChunk] = (
await self._stream_chat_completions(context)
)
await self.push_frame(LLMFullResponseStartFrame())
async for chunk in chunk_stream:
if len(chunk.choices) == 0:
continue
@@ -126,23 +158,77 @@ class BaseOpenAILLMService(LLMService):
tool_call = chunk.choices[0].delta.tool_calls[0]
if tool_call.function and tool_call.function.name:
function_name += tool_call.function.name
# yield LLMFunctionStartFrame(function_name=tool_call.function.name)
tool_call_id = tool_call.id
# only send a function start frame if we're not handling the function call
if function_name in self._callbacks.keys():
if function_name in self._start_callbacks.keys():
await self._start_callbacks[function_name](self)
if tool_call.function and tool_call.function.arguments:
# Keep iterating through the response to collect all the argument fragments and
# yield a complete LLMFunctionCallFrame after run_llm_async
# completes
# Keep iterating through the response to collect all the argument fragments
arguments += tool_call.function.arguments
elif chunk.choices[0].delta.content:
await self.push_frame(LLMResponseStartFrame())
await self.push_frame(TextFrame(chunk.choices[0].delta.content))
await self.push_frame(LLMResponseEndFrame())
await self.push_frame(LLMFullResponseEndFrame())
# if we got a function name and arguments, check to see if it's a function with
# a registered handler. If so, run the registered callback, save the result to
# the context, and re-prompt to get a chat answer. If we don't have a registered
# handler, raise an exception.
if function_name and arguments:
if function_name in self._callbacks.keys():
await self._handle_function_call(context, tool_call_id, function_name, arguments)
# if we got a function name and arguments, yield the frame with all the info so
# frame consumers can take action based on the function call.
# if function_name and arguments:
# yield LLMFunctionCallFrame(function_name=function_name, arguments=arguments)
else:
raise OpenAIUnhandledFunctionException(
f"The LLM tried to call a function named '{function_name}', but there isn't a callback registered for that function.")
async def _handle_function_call(
self,
context,
tool_call_id,
function_name,
arguments
):
arguments = json.loads(arguments)
result = await self._callbacks[function_name](self, arguments)
arguments = json.dumps(arguments)
if isinstance(result, (str, dict)):
# Handle it in "full magic mode"
tool_call = ChatCompletionFunctionMessageParam({
"role": "assistant",
"tool_calls": [
{
"id": tool_call_id,
"function": {
"arguments": arguments,
"name": function_name
},
"type": "function"
}
]
})
context.add_message(tool_call)
if isinstance(result, dict):
result = json.dumps(result)
tool_result = ChatCompletionToolParam({
"tool_call_id": tool_call_id,
"role": "tool",
"content": result
})
context.add_message(tool_result)
# re-prompt to get a human answer
await self._process_context(context)
elif isinstance(result, list):
# reduced magic
for msg in result:
context.add_message(msg)
await self._process_context(context)
elif isinstance(result, type(None)):
pass
else:
raise BaseException(f"Unknown return type from function callback: {type(result)}")
async def process_frame(self, frame: Frame, direction: FrameDirection):
context = None
@@ -156,7 +242,9 @@ class BaseOpenAILLMService(LLMService):
await self.push_frame(frame, direction)
if context:
await self.push_frame(LLMFullResponseStartFrame())
await self._process_context(context)
await self.push_frame(LLMFullResponseEndFrame())
class OpenAILLMService(BaseOpenAILLMService):

View File

@@ -158,7 +158,6 @@ class BaseOutputTransport(FrameProcessor):
while self._running:
try:
frame = self._sink_queue.get(timeout=1)
if not self._is_interrupted.is_set():
if isinstance(frame, AudioRawFrame):
if self._params.audio_out_enabled:
@@ -252,6 +251,8 @@ class BaseOutputTransport(FrameProcessor):
image = next(self._camera_images)
self._draw_image(image)
time.sleep(1.0 / self._params.camera_out_framerate)
else:
time.sleep(1.0 / self._params.camera_out_framerate)
except queue.Empty:
pass
except Exception as e:

View File

@@ -17,8 +17,8 @@ from functools import partial
from typing import Any, Callable, Mapping
from daily import (
CallClient,
Daily,
CallClient,
EventHandler,
VirtualCameraDevice,
VirtualMicrophoneDevice,

View File

@@ -0,0 +1,41 @@
from typing import List
from pipecat.processors.frame_processor import FrameProcessor
class TestException(BaseException):
pass
class TestFrameProcessor(FrameProcessor):
def __init__(self, test_frames):
self.test_frames = test_frames
self._list_counter = 0
super().__init__()
async def process_frame(self, frame, direction):
if not self.test_frames[0]: # then we've run out of required frames but the generator is still going?
raise TestException(f"Oops, got an extra frame, {frame}")
if isinstance(self.test_frames[0], List):
# We need to consume frames until we see the next frame type after this
next_frame = self.test_frames[1]
if isinstance(frame, next_frame):
# we're done iterating the list I guess
print(f"TestFrameProcessor got expected list exit frame: {frame}")
# pop twice to get rid of the list, as well as the next frame
self.test_frames.pop(0)
self.test_frames.pop(0)
self.list_counter = 0
else:
fl = self.test_frames[0]
fl_el = fl[self._list_counter % len(fl)]
if isinstance(frame, fl_el):
print(f"TestFrameProcessor got expected list frame: {frame}")
self._list_counter += 1
else:
raise TestException(f"Inside a list, expected {fl_el} but got {frame}")
else:
if not isinstance(frame, self.test_frames[0]):
raise TestException(f"Expected {self.test_frames[0]}, but got {frame}")
print(f"TestFrameProcessor got expected frame: {frame}")
self.test_frames.pop(0)

View File

@@ -37,8 +37,6 @@ class SileroVADAnalyzer(VADAnalyzer):
repo_or_dir="snakers4/silero-vad", model="silero_vad", force_reload=False
)
self._processor_vad_state: VADState = VADState.QUIET
logger.debug("Loaded Silero VAD")
#
@@ -73,6 +71,8 @@ class SileroVAD(FrameProcessor):
self._vad_analyzer = SileroVADAnalyzer(sample_rate=sample_rate, params=vad_params)
self._audio_passthrough = audio_passthrough
self._processor_vad_state: VADState = VADState.QUIET
#
# FrameProcessor
#

View File

@@ -1,8 +1,8 @@
import asyncio
import os
from pipecat.pipeline.openai_frames import OpenAILLMContextFrame
from pipecat.services.azure_ai_services import AzureLLMService
from pipecat.services.openai_llm_context import OpenAILLMContext
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame
from pipecat.services.azure import AzureLLMService
from pipecat.services.openai import OpenAILLMContext
from openai.types.chat import (
ChatCompletionSystemMessageParam,

View File

@@ -1,11 +1,10 @@
import asyncio
from pipecat.pipeline.openai_frames import OpenAILLMContextFrame
from pipecat.services.openai_llm_context import OpenAILLMContext
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame, OpenAILLMContext
from openai.types.chat import (
ChatCompletionSystemMessageParam,
)
from pipecat.services.ollama_ai_services import OLLamaLLMService
from pipecat.services.ollama import OLLamaLLMService
if __name__ == "__main__":
async def test_chat():

View File

@@ -1,51 +1,75 @@
import asyncio
import json
import os
from pipecat.pipeline.openai_frames import OpenAILLMContextFrame
from pipecat.services.openai_llm_context import OpenAILLMContext
from typing import List
from pipecat.services.openai import OpenAILLMContextFrame, OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.frames.frames import (
LLMFullResponseStartFrame,
LLMFullResponseEndFrame,
LLMResponseEndFrame,
LLMResponseStartFrame,
TextFrame
)
from pipecat.utils.test_frame_processor import TestFrameProcessor
from openai.types.chat import (
ChatCompletionSystemMessageParam,
ChatCompletionToolParam,
ChatCompletionUserMessageParam,
)
from pipecat.services.openai_api_llm_service import BaseOpenAILLMService
from pipecat.services.openai import OpenAILLMService
tools = [
ChatCompletionToolParam(
type="function",
function={
"name": "get_current_weather",
"description": "Get the current weather",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": [
"celsius",
"fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
"required": [
"location",
"format"],
},
})]
if __name__ == "__main__":
async def test_functions():
tools = [
ChatCompletionToolParam(
type="function",
function={
"name": "get_current_weather",
"description": "Get the current weather",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": [
"celsius",
"fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
"required": [
"location",
"format"],
},
})]
async def test_simple_functions():
async def get_weather_from_api(llm, args):
return json.dumps({"conditions": "nice", "temperature": "75"})
api_key = os.getenv("OPENAI_API_KEY")
llm = BaseOpenAILLMService(
llm = OpenAILLMService(
api_key=api_key or "",
model="gpt-4-1106-preview",
)
llm.register_function("get_current_weather", get_weather_from_api)
t = TestFrameProcessor([
LLMFullResponseStartFrame,
[LLMResponseStartFrame, TextFrame, LLMResponseEndFrame],
LLMFullResponseEndFrame
])
llm.link(t)
context = OpenAILLMContext(tools=tools)
system_message: ChatCompletionSystemMessageParam = ChatCompletionSystemMessageParam(
content="Ask the user to ask for a weather report", name="system", role="system"
@@ -58,26 +82,64 @@ if __name__ == "__main__":
context.add_message(system_message)
context.add_message(user_message)
frame = OpenAILLMContextFrame(context)
async for s in llm.process_frame(frame):
print(s)
await llm.process_frame(frame, FrameDirection.DOWNSTREAM)
async def test_advanced_functions():
async def get_weather_from_api(llm, args):
return [{"role": "system", "content": "The user has asked for live weather. Respond by telling them we don't currently support live weather for that area, but it's coming soon."}]
async def test_chat():
api_key = os.getenv("OPENAI_API_KEY")
llm = BaseOpenAILLMService(
llm = OpenAILLMService(
api_key=api_key or "",
model="gpt-4-1106-preview",
)
llm.register_function("get_current_weather", get_weather_from_api)
t = TestFrameProcessor([
LLMFullResponseStartFrame,
[LLMResponseStartFrame, TextFrame, LLMResponseEndFrame],
LLMFullResponseEndFrame
])
llm.link(t)
context = OpenAILLMContext(tools=tools)
system_message: ChatCompletionSystemMessageParam = ChatCompletionSystemMessageParam(
content="Ask the user to ask for a weather report", name="system", role="system"
)
user_message: ChatCompletionUserMessageParam = ChatCompletionUserMessageParam(
content="Could you tell me the weather for Boulder, Colorado",
name="user",
role="user",
)
context.add_message(system_message)
context.add_message(user_message)
frame = OpenAILLMContextFrame(context)
await llm.process_frame(frame, FrameDirection.DOWNSTREAM)
async def test_chat():
api_key = os.getenv("OPENAI_API_KEY")
t = TestFrameProcessor([
LLMFullResponseStartFrame,
[LLMResponseStartFrame, TextFrame, LLMResponseEndFrame],
LLMFullResponseEndFrame
])
llm = OpenAILLMService(
api_key=api_key or "",
model="gpt-4o",
)
llm.link(t)
context = OpenAILLMContext()
message: ChatCompletionSystemMessageParam = ChatCompletionSystemMessageParam(
content="Please tell the world hello.", name="system", role="system")
context.add_message(message)
frame = OpenAILLMContextFrame(context)
async for s in llm.process_frame(frame):
print(s)
await llm.process_frame(frame, FrameDirection.DOWNSTREAM)
async def run_tests():
await test_functions()
await test_simple_functions()
await test_advanced_functions()
await test_chat()
asyncio.run(run_tests())

View File

@@ -3,16 +3,15 @@ import doctest
import functools
import unittest
from pipecat.pipeline.aggregators import (
GatedAggregator,
ParallelPipeline,
SentenceAggregator,
StatelessTextTransformer,
)
from pipecat.pipeline.frames import (
AudioFrame,
from pipecat.processors.aggregators.sentence import SentenceAggregator
from pipecat.processors.text_transformer import StatelessTextTransformer
from pipecat.processors.aggregators.gated import GatedAggregator
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.frames.frames import (
AudioRawFrame,
EndFrame,
ImageFrame,
ImageRawFrame,
LLMResponseEndFrame,
LLMResponseStartFrame,
Frame,
@@ -46,26 +45,26 @@ class TestDailyFrameAggregators(unittest.IsolatedAsyncioTestCase):
async def test_gated_accumulator(self):
gated_aggregator = GatedAggregator(
gate_open_fn=lambda frame: isinstance(
frame, ImageFrame), gate_close_fn=lambda frame: isinstance(
frame, ImageRawFrame), gate_close_fn=lambda frame: isinstance(
frame, LLMResponseStartFrame), start_open=False, )
frames = [
LLMResponseStartFrame(),
TextFrame("Hello, "),
TextFrame("world."),
AudioFrame(b"hello"),
ImageFrame(b"image", (0, 0)),
AudioFrame(b"world"),
AudioRawFrame(b"hello", 1, 1),
ImageRawFrame(b"image", (0, 0)),
AudioRawFrame(b"world", 1, 1),
LLMResponseEndFrame(),
]
expected_output_frames = [
ImageFrame(b"image", (0, 0)),
ImageRawFrame(b"image", (0, 0)),
LLMResponseStartFrame(),
TextFrame("Hello, "),
TextFrame("world."),
AudioFrame(b"hello"),
AudioFrame(b"world"),
AudioRawFrame(b"hello", 1, 1),
AudioRawFrame(b"world", 1, 1),
LLMResponseEndFrame(),
]
for frame in frames:

View File

@@ -3,7 +3,7 @@ import unittest
from typing import AsyncGenerator
from pipecat.services.ai_services import AIService
from pipecat.pipeline.frames import EndFrame, Frame, TextFrame
from pipecat.frames.frames import EndFrame, Frame, TextFrame
class SimpleAIService(AIService):

View File

@@ -2,9 +2,10 @@ import asyncio
import unittest
from unittest.mock import Mock
from pipecat.pipeline.aggregators import SentenceAggregator, StatelessTextTransformer
from pipecat.pipeline.frame_processor import FrameProcessor
from pipecat.pipeline.frames import EndFrame, TextFrame
from pipecat.processors.text_transformer import StatelessTextTransformer
from pipecat.processors.aggregators.sentence import SentenceAggregator
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import EndFrame, TextFrame
from pipecat.pipeline.pipeline import Pipeline

View File

@@ -1,6 +1,6 @@
import unittest
from pipecat.pipeline.frames import AudioFrame, TextFrame, TranscriptionFrame
from pipecat.frames.frames import AudioFrame, TextFrame, TranscriptionFrame
from pipecat.serializers.protobuf_serializer import ProtobufFrameSerializer

View File

@@ -2,7 +2,7 @@ import asyncio
import unittest
from unittest.mock import AsyncMock, patch, Mock
from pipecat.pipeline.frames import AudioFrame, EndFrame, TextFrame, TTSEndFrame, TTSStartFrame
from pipecat.frames.frames import AudioRawFrame, EndFrame, TextFrame, TTSStoppedFrame, TTSStartedFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.transports.websocket_transport import WebSocketFrameProcessor, WebsocketTransport
@@ -52,10 +52,10 @@ class TestWebSocketTransportService(unittest.IsolatedAsyncioTestCase):
processor = WebSocketFrameProcessor(audio_frame_size=4)
source_frames = [
TTSStartFrame(),
AudioFrame(b"1234"),
AudioFrame(b"5678"),
TTSEndFrame(),
TTSStartedFrame(),
AudioRawFrame(b"1234", 1, 1),
AudioRawFrame(b"5678", 1, 1),
TTSStoppedFrame(),
TextFrame("hello world")
]
@@ -65,9 +65,9 @@ class TestWebSocketTransportService(unittest.IsolatedAsyncioTestCase):
frames.append(output_frame)
self.assertEqual(len(frames), 3)
self.assertIsInstance(frames[0], AudioFrame)
self.assertIsInstance(frames[0], AudioRawFrame)
self.assertEqual(frames[0].data, b"1234")
self.assertIsInstance(frames[1], AudioFrame)
self.assertIsInstance(frames[1], AudioRawFrame)
self.assertEqual(frames[1].data, b"5678")
self.assertIsInstance(frames[2], TextFrame)
self.assertEqual(frames[2].text, "hello world")