Compare commits

...

76 Commits

Author SHA1 Message Date
Aleix Conchillo Flaqué
2f4467b5a5 Merge pull request #213 from pipecat-ai/aleix/pipecat-0.0.26
update CHANGELOG for 0.0.26
2024-06-06 01:10:01 +08:00
Aleix Conchillo Flaqué
e91ab54a69 update CHANGELOG for 0.0.26 2024-06-05 10:07:45 -07:00
Aleix Conchillo Flaqué
6a33432c82 Merge pull request #212 from pipecat-ai/aleix/make-pinlesscallupdate-public
transports(daily): move pinlessCallUpdate to public api
2024-06-05 23:14:14 +08:00
Aleix Conchillo Flaqué
135654a080 transports(daily): move pinlessCallUpdate to public api 2024-06-05 08:08:56 -07:00
Aleix Conchillo Flaqué
7b708a2bee Merge pull request #211 from pipecat-ai/aleix/base-transport-async
various fixes and improvements
2024-06-05 22:57:35 +08:00
Aleix Conchillo Flaqué
b515c28417 services(cartesia): allow output_format and model_id 2024-06-04 19:24:33 -07:00
Aleix Conchillo Flaqué
854ffb0323 update CHANGELOG for DailyRESTHelper 2024-06-04 15:45:17 -07:00
Aleix Conchillo Flaqué
891b7b22ea transports: push EndFrame/CancelFrame before stopping push task 2024-06-04 15:43:54 -07:00
Aleix Conchillo Flaqué
c8d37a7227 pipeline(runner): add support for SIGTERM 2024-06-04 15:43:54 -07:00
Aleix Conchillo Flaqué
489060881d update macos-py3.10-requirements 2024-06-04 15:43:54 -07:00
Aleix Conchillo Flaqué
d56a4cce1b update CHANGELOG with latest changes 2024-06-04 15:43:54 -07:00
Aleix Conchillo Flaqué
7eb9dfde38 pyproject: include langchain-community and langchain-openai 2024-06-04 15:43:54 -07:00
Aleix Conchillo Flaqué
571e10f83e services(anthropic): fix interruptions with anthropic 2024-06-04 15:43:54 -07:00
Aleix Conchillo Flaqué
af202d4fe5 pipeline(task): introduce has_finished() 2024-06-04 15:43:54 -07:00
Aleix Conchillo Flaqué
4057fbbcfd transports(tk): fix pyaudio output stream cleanup 2024-06-04 15:43:54 -07:00
Aleix Conchillo Flaqué
5cdb8a79a1 examples: use camera_out_is_live for live video 2024-06-04 15:43:54 -07:00
Aleix Conchillo Flaqué
a674b43243 transport: remove redundant camera thread and switch audio pull for push 2024-06-04 15:43:54 -07:00
Jon Taylor
ac41f13b7c Merge pull request #205 from pipecat-ai/daily_rest_helpers
Created REST helpers for Daily covering commonly used methods for running / deployment
2024-06-04 22:26:39 +02:00
Jon Taylor
003b9887b1 made sip and sipuri optional and None 2024-06-04 19:03:58 +02:00
Jon Taylor
ba45c2ab5b addressed review (urllib import and linting 2024-06-04 18:39:35 +02:00
Aleix Conchillo Flaqué
9d36a48a80 Merge pull request #208 from pipecat-ai/aleix/cartesia-voice-load-startup
services(cartesia): load voices on startup
2024-06-04 22:54:25 +08:00
Aleix Conchillo Flaqué
20a525635e Merge pull request #201 from TomTom101/TomTom101/openai_tts
Added OpenAI TTS (#196)
2024-06-04 22:53:56 +08:00
Aleix Conchillo Flaqué
659eceea95 services(cartesia): load voices on startup 2024-06-03 14:08:04 -07:00
TomTom101
d462c03d00 chore: Review comments 2024-06-03 20:13:15 +02:00
Jon Taylor
6591e07eb4 removed hardcoded 'https' from API url 2024-06-03 19:32:14 +02:00
Aleix Conchillo Flaqué
fe71825954 Merge pull request #206 from pipecat-ai/aleix/fix-deepgram-tts
services(deepgram): fixed DeepgramTTSService
2024-06-04 00:28:53 +08:00
Aleix Conchillo Flaqué
43516f84fe services(deepgram): fixed DeepgramTTSService 2024-06-03 07:53:46 -07:00
Jon Taylor
0849edb00b added Daily REST helpers file for common methods used in Pipecat bots 2024-06-03 16:38:13 +02:00
Aleix Conchillo Flaqué
dd3b4083eb Merge pull request #204 from TomTom101/TomTom101/langchain
fix: Fixed imports, support new PipelineParams
2024-06-03 03:16:30 +08:00
TomTom101
89673a4040 test(langchain): Use new PipelineParams in test 2024-06-02 20:19:55 +02:00
TomTom101
410dbd3dfc fix: Fixed imports, support new PipelineParams 2024-06-02 20:16:11 +02:00
TomTom101
7085b1ea3f doc(openai): Added hint re the 24kHz sample rate 2024-06-01 20:35:46 +02:00
TomTom101
8683cae719 feat: OpenAITTS 2024-06-01 10:13:28 +02:00
Aleix Conchillo Flaqué
0197efa524 Merge pull request #200 from pipecat-ai/aleix/changelog-0.0.25
update CHANGELOG.md for version 0.0.25
2024-06-01 07:48:42 +08:00
Aleix Conchillo Flaqué
16e76caa33 update CHANGELOG.md for version 0.0.25 2024-05-31 16:48:03 -07:00
Aleix Conchillo Flaqué
1f5240694d Merge pull request #199 from pipecat-ai/aleix/langchain-changelog
move LangchainProcessor to processors/frameworks and update CHANGELOG
2024-06-01 07:46:51 +08:00
Aleix Conchillo Flaqué
f087151db7 move LangchainProcessor to processors/frameworks and update CHANGELOG 2024-05-31 16:45:39 -07:00
Aleix Conchillo Flaqué
0b691ff597 Merge pull request #198 from pipecat-ai/aleix/websocket-transport
websocket transport support
2024-06-01 04:40:39 +08:00
TomTom101
ae049961b7 wip: untested 2024-05-31 22:30:52 +02:00
Aleix Conchillo Flaqué
0d6eee705f Merge pull request #190 from TomTom101/TomTom101/langchain
Langchain service
2024-06-01 04:21:12 +08:00
Aleix Conchillo Flaqué
58d20ec9dc transport(websocket-server): add on_client_disconnected 2024-05-31 12:52:43 -07:00
Aleix Conchillo Flaqué
38befe1dc1 examples(websocket): rename server.py to bot.py 2024-05-31 12:09:54 -07:00
Aleix Conchillo Flaqué
2f335100a5 remove storage folder 2024-05-31 11:54:18 -07:00
Aleix Conchillo Flaqué
3fef818843 examples(websocket-server): use VAD analyzer from transport 2024-05-31 11:54:18 -07:00
Aleix Conchillo Flaqué
428c8af77e transports(websocket): base class from BaseInputTransport 2024-05-31 11:54:18 -07:00
Aleix Conchillo Flaqué
54fccd2e25 pipeline: cleanup processors one by one 2024-05-31 11:37:43 -07:00
Aleix Conchillo Flaqué
66c6a5dc0f transports(websocket): base class from BaseOutputTransport 2024-05-31 11:37:43 -07:00
Aleix Conchillo Flaqué
92561ae19d some event loop parameter updates 2024-05-31 11:37:43 -07:00
Aleix Conchillo Flaqué
b85e93410b transports(daily): fix event handlers callback 2024-05-31 11:37:43 -07:00
Aleix Conchillo Flaqué
593993ba97 transports(base_input): remove unnecessary task 2024-05-31 11:37:41 -07:00
Aleix Conchillo Flaqué
7b8b606278 update CHANGELOG and create websocker-server instructions 2024-05-31 11:37:19 -07:00
Aleix Conchillo Flaqué
7116ad0607 examples: fix websocket-client audio playback 2024-05-31 11:36:52 -07:00
Aleix Conchillo Flaqué
c507044277 examples: use gpt-4o model by default 2024-05-31 11:36:52 -07:00
Aleix Conchillo Flaqué
5f45a9d90f examples: websocket-server updates 2024-05-31 11:36:52 -07:00
Aleix Conchillo Flaqué
e31e87aabd transport(websocket): update audio_frame_size 2024-05-31 11:36:52 -07:00
Aleix Conchillo Flaqué
2957416d90 serializers(protobuf): support id and name fields 2024-05-31 11:36:52 -07:00
Aleix Conchillo Flaqué
b9b761b67a added sample_rate and num_channels to protobuf AudioRawFrame 2024-05-31 11:36:52 -07:00
Aleix Conchillo Flaqué
a7539e9317 transports: simplify and fix async and nested decorators 2024-05-31 11:36:52 -07:00
Aleix Conchillo Flaqué
75575c0c68 use get_event_loop() and move event handlers to BaseTransport 2024-05-31 11:36:52 -07:00
Aleix Conchillo Flaqué
77b3e08214 examples: add and update wbesocket eaxmples 2024-05-31 11:36:52 -07:00
Aleix Conchillo Flaqué
956b783c1a transports: added new WebsocketServerTransport 2024-05-31 11:36:52 -07:00
Aleix Conchillo Flaqué
e90c080470 serializers: added BaseSerializer 2024-05-31 11:36:52 -07:00
Aleix Conchillo Flaqué
37aabaa03a frames: generate protobuf pb2 file for pipecat package 2024-05-31 11:36:52 -07:00
Aleix Conchillo Flaqué
3e289a7bef pyproject: add protobuf dependency 2024-05-31 11:36:52 -07:00
Aleix Conchillo Flaqué
6dd5e3fdf5 dev-requirements: add grpcio-tools 2024-05-31 11:36:52 -07:00
Aleix Conchillo Flaqué
e60df3c7c0 Merge pull request #195 from pipecat-ai/aleix/function-calling-move-to-llmservice
function calling move to LLMService
2024-06-01 02:36:29 +08:00
Aleix Conchillo Flaqué
42f772beed examples: some function calling examples cleanup 2024-05-31 11:36:04 -07:00
Aleix Conchillo Flaqué
3655c4a0fc services: move function calling registration to LLMService 2024-05-31 11:36:04 -07:00
Aleix Conchillo Flaqué
012dbffd94 update CHANGELOG.md for function calling 2024-05-31 11:36:03 -07:00
TomTom101
4b39efeee3 fix(langchain): try/catch langchain import in service; Only langchain is installed with the [langchain] extra (#190) 2024-05-31 10:19:27 +02:00
TomTom101
b19243ab75 fix: corrected hint to install Langchain libs 2024-05-30 10:53:42 +02:00
TomTom101
2bf094b950 test(langchain): Rewrite to unittest, make it meaningful 2024-05-30 10:43:33 +02:00
TomTom101
143033d7db fix: install langchain-community with the langchain extra 2024-05-30 03:15:14 +02:00
TomTom101
335990c145 wip: hint to install langchain_community 2024-05-30 03:15:14 +02:00
TomTom101
6d24e836b0 wip: Example using LC message history 2024-05-30 03:15:14 +02:00
TomTom101
278a2fed56 wip: First stab at langchain support
Is this a service or processor?
How to deal with conversation history? LC has sophisticated means of this, but might get in the way of `LLMResponseAggregator`
2024-05-30 03:15:14 +02:00
65 changed files with 1800 additions and 706 deletions

View File

@@ -5,10 +5,65 @@ All notable changes to **pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
## [0.0.26] - 2024-06-05
### Added
- Allow passing `output_format` and `model_id` to `CartesiaTTSService` to change
audio sample format and the model to use.
- Added `DailyRESTHelper` which helps you create Daily rooms and tokens in an
easy way.
- `PipelineTask` now has a `has_finished()` method to indicate if the task has
completed. If a task is never run, `has_finished()` will return False.
- `PipelineRunner` now supports SIGTERM. If received, the runner will be
canceled.
### Fixed
- Fixed an issue where `BaseInputTransport` and `BaseOutputTransport` were
stopping push tasks before pushing `EndFrame` frames, which could cause the
bots to get stuck.
- Fixed an error closing local audio transports.
- Fixed an issue with Deepgram TTS that was introduced in the previous release.
- Fixed `AnthropicLLMService` interruptions. If an interruption occurred, a
`user` message could be appended after the previous `user` message. Anthropic
does not allow that because it requires alternate `user` and `assistant`
messages.
### Performance
- The `BaseInputTransport` does not pull audio frames from sub-classes any
more. Instead, sub-classes now push audio frames into a queue in the base
class. Also, `DailyInputTransport` now pushes audio frames every 20ms instead
of 10ms.
- Remove redundant camera input thread from `DailyInputTransport`. This should
improve performance a little bit when processing participant videos.
- Load Cartesia voice on startup.
## [0.0.25] - 2024-05-31
### Added
- Added WebsocketServerTransport. This will create a websocket server and will
read messages coming from a client. The messages are serialized/deserialized
with protobufs. See `examples/websocket-server` for a detailed example.
- Added function calling (LLMService.register_function()). This will allow the
LLM to call functions you have registered when needed. For example, if you
register a function to get the weather in Los Angeles and ask the LLM about
the weather in Los Angeles, the LLM will call your function.
See https://platform.openai.com/docs/guides/function-calling
- Added new `LangchainProcessor`.
- Added Cartesia TTS support (https://cartesia.ai/)
### Fixed
@@ -18,6 +73,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Fixed an issue where `camera_out_enabled` would cause high CPU usage if
no image was provided.
### Performance
- Removed unnecessary audio input tasks.
## [0.0.24] - 2024-05-29

View File

@@ -1,6 +1,6 @@
BSD 2-Clause License
Copyright (c) 2024, Kwindla Hultman Kramer
Copyright (c) 2024, Daily
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

View File

@@ -1,5 +1,6 @@
autopep8~=2.1.0
build~=1.2.1
grpcio-tools~=1.62.2
pip-tools~=7.4.1
pytest~=8.2.0
setuptools~=69.5.1

View File

@@ -44,7 +44,7 @@ async def main(room_url):
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4-turbo-preview")
model="gpt-4o")
messages = [
{

View File

@@ -93,7 +93,7 @@ async def main(room_url):
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4-turbo-preview")
model="gpt-4o")
imagegen = FalImageGenService(
params=FalImageGenService.InputParams(

View File

@@ -76,7 +76,7 @@ async def main():
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4-turbo-preview")
model="gpt-4o")
tts = ElevenLabsTTSService(
aiohttp_session=session,
@@ -156,7 +156,7 @@ async def main():
await runner.stop_when_done()
async def run_tk():
while True:
while not task.has_finished():
tk_root.update()
tk_root.update_idletasks()
await asyncio.sleep(0.1)

View File

@@ -81,7 +81,7 @@ async def main(room_url: str, token):
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4-turbo-preview")
model="gpt-4o")
messages = [
{

View File

@@ -53,7 +53,7 @@ async def main(room_url: str, token):
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4-turbo-preview")
model="gpt-4o")
messages = [
{

View File

@@ -0,0 +1,125 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
import aiohttp
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.processors.frameworks.langchain import LangchainProcessor
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_openai import ChatOpenAI
from loguru import logger
from runner import configure
from dotenv import load_dotenv
# Load settings from a .env file; override=True lets values from the file
# replace variables already present in the process environment.
load_dotenv(override=True)
# Drop loguru's pre-installed handler (id 0) and log to stderr at DEBUG level
# instead.
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
# Maps a session id to its chat history; entries are created lazily by
# get_session_history().
message_store = {}
def get_session_history(session_id: str) -> BaseChatMessageHistory:
    """Return the chat history stored for ``session_id``.

    A new, empty ``ChatMessageHistory`` is created and remembered in
    ``message_store`` the first time a session id is seen, so later calls
    with the same id return the same history object.
    """
    try:
        return message_store[session_id]
    except KeyError:
        # First request for this session: create and register its history.
        history = ChatMessageHistory()
        message_store[session_id] = history
        return history
# Build and run the LangChain example bot.
#
# Creates a DailyTransport for `room_url`/`token`, an ElevenLabs TTS service
# and a LangChain chain with per-session message history, wires them into a
# single pipeline and runs that pipeline with a PipelineRunner.
async def main(room_url: str, token):
# The aiohttp session is passed to the TTS service below.
async with aiohttp.ClientSession() as session:
# Transport named "Respond bot" with audio output, transcription and
# Silero-based VAD enabled.
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
)
# TTS credentials and voice are read from the environment.
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)
# Prompt layout: system instructions, then the "chat_history" placeholder,
# then the new human message taken from the "input" variable.
prompt = ChatPromptTemplate.from_messages(
[
("system",
"Be nice and helpful. Answer very briefly and without special characters like `#` or `*`. "
"Your response will be synthesized to voice and those characters will create unnatural sounds.",
),
MessagesPlaceholder("chat_history"),
("human", "{input}"),
])
chain = prompt | ChatOpenAI(model="gpt-4o", temperature=0.7)
# Wrap the chain so history is looked up through get_session_history; the
# key names match the placeholder and input variable used in the prompt.
history_chain = RunnableWithMessageHistory(
chain,
get_session_history,
history_messages_key="chat_history",
input_messages_key="input")
lc = LangchainProcessor(history_chain)
# Aggregators placed on the user side and the assistant side of the pipeline.
tma_in = LLMUserResponseAggregator()
tma_out = LLMAssistantResponseAggregator()
# Processors are listed in the order frames travel through them.
pipeline = Pipeline(
[
transport.input(), # Transport user input
tma_in, # User responses
lc, # Langchain
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
# Registered for the transport's "on_first_participant_joined" event: starts
# capturing that participant's transcription, hands the participant id to the
# LangchainProcessor and queues the opening message.
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
lc.set_participant_id(participant["id"])
# Kick off the conversation.
# the `LLMMessagesFrame` will be picked up by the LangchainProcessor using
# only the content of the last message to inject it in the prompt defined
# above. So no role is required here.
messages = [(
{
"content": "Please briefly introduce yourself to the user."
}
)]
await task.queue_frames([LLMMessagesFrame(messages)])
# Run the task; main() does not return until runner.run() completes.
runner = PipelineRunner()
await runner.run(task)
# Script entry point: obtain the room URL and token from the example runner's
# configure() helper, then run the bot.
if __name__ == "__main__":
    room_url, room_token = configure()
    asyncio.run(main(room_url, room_token))

View File

@@ -53,7 +53,7 @@ async def main(room_url: str, token):
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4-turbo-preview")
model="gpt-4o")
messages = [
{

View File

@@ -39,6 +39,7 @@ async def main(room_url: str, token):
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=44100,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
@@ -47,7 +48,8 @@ async def main(room_url: str, token):
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_name="Barbershop Man"
voice_name="British Lady",
output_format="pcm_44100"
)
llm = OpenAILLMService(

View File

@@ -30,6 +30,7 @@ async def main(room_url, token):
audio_in_enabled=True,
audio_out_enabled=True,
camera_out_enabled=True,
camera_out_is_live=True,
camera_out_width=1280,
camera_out_height=720
)

View File

@@ -38,6 +38,7 @@ async def main(room_url, token):
TransportParams(
audio_out_enabled=True,
camera_out_enabled=True,
camera_out_is_live=True,
camera_out_width=1280,
camera_out_height=720))
@@ -47,15 +48,15 @@ async def main(room_url, token):
pipeline = Pipeline([daily_transport.input(), tk_transport.output()])
runner = PipelineRunner()
task = PipelineTask(pipeline)
async def run_tk():
while runner.is_active():
while not task.has_finished():
tk_root.update()
tk_root.update_idletasks()
await asyncio.sleep(0.1)
task = PipelineTask(pipeline)
runner = PipelineRunner()
await asyncio.gather(runner.run(task), run_tk())

View File

@@ -95,7 +95,7 @@ async def main(room_url: str, token):
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4-turbo-preview")
model="gpt-4o")
tts = ElevenLabsTTSService(
aiohttp_session=session,

View File

@@ -16,8 +16,6 @@ from pipecat.services.whisper import WhisperSTTService
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.local.audio import LocalAudioTransport
from runner import configure
from loguru import logger
from dotenv import load_dotenv
@@ -34,7 +32,7 @@ class TranscriptionLogger(FrameProcessor):
print(f"Transcription: {frame.text}")
async def main(room_url: str):
async def main():
transport = LocalAudioTransport(TransportParams(audio_in_enabled=True))
stt = WhisperSTTService()
@@ -51,5 +49,4 @@ async def main(room_url: str):
if __name__ == "__main__":
(url, token) = configure()
asyncio.run(main(url))
asyncio.run(main())

View File

@@ -7,9 +7,9 @@
import asyncio
import aiohttp
import os
import json
import sys
from pipecat.frames.frames import TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
@@ -17,18 +17,13 @@ from pipecat.processors.aggregators.llm_response import (
LLMAssistantContextAggregator,
LLMUserContextAggregator,
)
from pipecat.services.openai import OpenAILLMContext
from pipecat.processors.logger import FrameLogger
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.services.openai import OpenAILLMContext, OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from openai.types.chat import (
ChatCompletionToolParam,
)
from pipecat.frames.frames import (
TextFrame
)
from openai.types.chat import ChatCompletionToolParam
from runner import configure
@@ -71,7 +66,7 @@ async def main(room_url: str, token):
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4-turbo-preview")
model="gpt-4o")
llm.register_function(
"get_current_weather",
fetch_weather_from_api,

View File

@@ -1,25 +0,0 @@
syntax = "proto3";
package pipecat_proto;
message TextFrame {
string text = 1;
}
message AudioFrame {
bytes audio = 1;
}
message TranscriptionFrame {
string text = 1;
string participant_id = 2;
string timestamp = 3;
}
message Frame {
oneof frame {
TextFrame text = 1;
AudioFrame audio = 2;
TranscriptionFrame transcription = 3;
}
}

View File

@@ -1,134 +0,0 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<script src="//cdn.jsdelivr.net/npm/protobufjs@7.X.X/dist/protobuf.min.js"></script>
<title>WebSocket Audio Stream</title>
</head>
<body>
<h1>WebSocket Audio Stream</h1>
<button id="startAudioBtn">Start Audio</button>
<button id="stopAudioBtn">Stop Audio</button>
<script>
const SAMPLE_RATE = 16000;
const BUFFER_SIZE = 8192;
const MIN_AUDIO_SIZE = 6400;
let audioContext;
let microphoneStream;
let scriptProcessor;
let source;
let frame;
let audioChunks = [];
let isPlaying = false;
let ws;
const proto = protobuf.load("frames.proto", (err, root) => {
if (err) throw err;
frame = root.lookupType("pipecat_proto.Frame");
});
function initWebSocket() {
ws = new WebSocket('ws://localhost:8765');
ws.addEventListener('open', () => console.log('WebSocket connection established.'));
ws.addEventListener('message', handleWebSocketMessage);
ws.addEventListener('close', (event) => console.log("WebSocket connection closed.", event.code, event.reason));
ws.addEventListener('error', (event) => console.error('WebSocket error:', event));
}
async function handleWebSocketMessage(event) {
const arrayBuffer = await event.data.arrayBuffer();
enqueueAudioFromProto(arrayBuffer);
}
function enqueueAudioFromProto(arrayBuffer) {
const parsedFrame = frame.decode(new Uint8Array(arrayBuffer));
if (!parsedFrame?.audio) return false;
const frameCount = parsedFrame.audio.data.length / 2;
const audioOutBuffer = audioContext.createBuffer(1, frameCount, SAMPLE_RATE);
const nowBuffering = audioOutBuffer.getChannelData(0);
const view = new Int16Array(parsedFrame.audio.data.buffer);
for (let i = 0; i < frameCount; i++) {
const word = view[i];
nowBuffering[i] = ((word + 32768) % 65536 - 32768) / 32768.0;
}
audioChunks.push(audioOutBuffer);
if (!isPlaying) playNextChunk();
}
function playNextChunk() {
if (audioChunks.length === 0) {
isPlaying = false;
return;
}
isPlaying = true;
const audioOutBuffer = audioChunks.shift();
const source = audioContext.createBufferSource();
source.buffer = audioOutBuffer;
source.connect(audioContext.destination);
source.onended = playNextChunk;
source.start();
}
function startAudio() {
if (!navigator.mediaDevices || !navigator.mediaDevices.getUserMedia) {
alert('getUserMedia is not supported in your browser.');
return;
}
navigator.mediaDevices.getUserMedia({ audio: true })
.then((stream) => {
microphoneStream = stream;
audioContext = new (window.AudioContext || window.webkitAudioContext)();
scriptProcessor = audioContext.createScriptProcessor(BUFFER_SIZE, 1, 1);
source = audioContext.createMediaStreamSource(stream);
source.connect(scriptProcessor);
scriptProcessor.connect(audioContext.destination);
const audioBuffer = [];
const skipRatio = Math.floor(audioContext.sampleRate / (SAMPLE_RATE * 2));
scriptProcessor.onaudioprocess = (event) => {
const rawLeftChannelData = event.inputBuffer.getChannelData(0);
for (let i = 0; i < rawLeftChannelData.length; i += skipRatio) {
const normalized = ((rawLeftChannelData[i] * 32768.0) + 32768) % 65536 - 32768;
const swappedBytes = ((normalized & 0xff) << 8) | ((normalized >> 8) & 0xff);
audioBuffer.push(swappedBytes);
}
if (audioBuffer.length >= MIN_AUDIO_SIZE) {
const audioFrame = frame.create({ audio: { audio: audioBuffer.slice(0, MIN_AUDIO_SIZE) } });
const encodedFrame = new Uint8Array(frame.encode(audioFrame).finish());
ws.send(encodedFrame);
audioBuffer.splice(0, MIN_AUDIO_SIZE);
}
};
initWebSocket();
})
.catch((error) => console.error('Error accessing microphone:', error));
}
function stopAudio() {
if (ws) {
ws.close();
scriptProcessor.disconnect();
source.disconnect();
ws = undefined;
}
}
document.getElementById('startAudioBtn').addEventListener('click', startAudio);
document.getElementById('stopAudioBtn').addEventListener('click', stopAudio);
</script>
</body>
</html>

View File

@@ -1,50 +0,0 @@
import asyncio
import aiohttp
import logging
import os
from pipecat.pipeline.frame_processor import FrameProcessor
from pipecat.pipeline.frames import TextFrame, TranscriptionFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.services.elevenlabs_ai_services import ElevenLabsTTSService
from pipecat.transports.websocket_transport import WebsocketTransport
from pipecat.services.whisper_ai_services import WhisperSTTService
logging.basicConfig(format="%(levelno)s %(asctime)s %(message)s")
logger = logging.getLogger("pipecat")
logger.setLevel(logging.DEBUG)
class WhisperTranscriber(FrameProcessor):
async def process_frame(self, frame):
if isinstance(frame, TranscriptionFrame):
print(f"Transcribed: {frame.text}")
else:
yield frame
async def main():
async with aiohttp.ClientSession() as session:
transport = WebsocketTransport(
mic_enabled=True,
speaker_enabled=True,
)
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)
pipeline = Pipeline([
WhisperSTTService(),
WhisperTranscriber(),
tts,
])
@transport.on_connection
async def queue_frame():
await pipeline.queue_frames([TextFrame("Hello there!")])
await transport.run(pipeline)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -145,7 +145,7 @@ async def main(room_url: str, token):
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4-turbo-preview")
model="gpt-4o")
ta = TalkingAnimation()

View File

@@ -1,11 +1,15 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import aiohttp
import copy
import json
import os
import re
import sys
import wave
from typing import List
from openai._types import NotGiven, NOT_GIVEN
@@ -14,23 +18,18 @@ from openai.types.chat import (
ChatCompletionToolParam,
)
from pipecat.frames.frames import AudioRawFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import LLMUserContextAggregator, LLMAssistantContextAggregator
from pipecat.processors.logger import FrameLogger
from pipecat.frames.frames import (
Frame,
LLMMessagesFrame,
AudioRawFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.services.openai import OpenAILLMContext, OpenAILLMContextFrame, OpenAILLMService
from pipecat.services.ai_services import AIService
from pipecat.transports.services.daily import DailyParams, DailyTranscriptionSettings, DailyTransport
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from pipecat.services.openai import OpenAILLMContext, OpenAILLMContextFrame
from runner import configure
@@ -242,7 +241,6 @@ class IntakeProcessor:
self._context.add_message(
{"role": "system", "content": "Finally, ask the user the reason for their doctor visit today. Once they answer, call the list_visit_reasons function."})
await llm.process_frame(OpenAILLMContextFrame(self._context), FrameDirection.DOWNSTREAM)
pass
async def start_visit_reasons(self, llm):
print("!!! doing start visit reasons")
@@ -251,7 +249,6 @@ class IntakeProcessor:
self._context.add_message({"role": "system",
"content": "Now, thank the user and end the conversation."})
await llm.process_frame(OpenAILLMContextFrame(self._context), FrameDirection.DOWNSTREAM)
pass
async def save_data(self, llm, args):
logger.info(f"!!! Saving data: {args}")
@@ -305,12 +302,10 @@ async def main(room_url: str, token):
model="gpt-4o")
messages = []
context = OpenAILLMContext(
messages=messages,
)
context = OpenAILLMContext(messages=messages)
user_context = LLMUserContextAggregator(context)
assistant_context = LLMAssistantContextAggregator(context)
# checklist = ChecklistProcessor(context, llm)
intake = IntakeProcessor(context, llm)
llm.register_function("verify_birthday", intake.verify_birthday)
llm.register_function(
@@ -329,19 +324,20 @@ async def main(room_url: str, token):
"list_visit_reasons",
intake.save_data,
start_callback=intake.start_visit_reasons)
fl = FrameLogger("LLM Output")
pipeline = Pipeline([
transport.input(),
user_context,
llm,
fl,
tts,
transport.output(),
assistant_context,
transport.input(), # Transport input
user_context, # User responses
llm, # LLM
fl, # Frame logger
tts, # TTS
transport.output(), # Transport output
assistant_context, # Assistant responses
])
task = PipelineTask(pipeline, allow_interruptions=False)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=False))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -117,7 +117,7 @@ async def main(room_url: str, token):
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4-turbo-preview")
model="gpt-4o")
messages = [
{

View File

@@ -56,7 +56,7 @@ async def main(room_url, token=None):
llm_service = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4-turbo"
model="gpt-4o"
)
tts_service = ElevenLabsTTSService(

View File

@@ -97,7 +97,8 @@ async def main(room_url: str, token):
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4-turbo-preview"
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o"
)
sa = SentenceAggregator()

View File

@@ -0,0 +1,27 @@
# Websocket Server
This is an example that shows how to use `WebsocketServerTransport` to communicate with a web client.
## Get started
```bash
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
```
## Run the bot
```bash
python bot.py
```
## Run the HTTP server
This will host the static web client:
```bash
python -m http.server
```
Then, visit `http://localhost:8000` in your browser to start a session.

View File

@@ -0,0 +1,94 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import aiohttp
import asyncio
import os
import sys
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator,
LLMUserResponseAggregator
)
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.services.whisper import WhisperSTTService
from pipecat.transports.network.websocket_server import WebsocketServerParams, WebsocketServerTransport
from pipecat.vad.silero import SileroVADAnalyzer
from loguru import logger
from dotenv import load_dotenv
# Load settings from a .env file; override=True lets values from the file
# replace variables already present in the process environment.
load_dotenv(override=True)
# Drop loguru's pre-installed handler (id 0) and log to stderr at DEBUG level
# instead.
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
# Build and run the websocket server bot.
#
# Creates a WebsocketServerTransport, Whisper STT, an OpenAI LLM and
# ElevenLabs TTS, wires them into a single pipeline and runs that pipeline
# with a PipelineRunner.
async def main():
# The aiohttp session is passed to the TTS service below.
async with aiohttp.ClientSession() as session:
# Websocket transport with audio in/out, WAV headers on output audio and
# Silero-based VAD with audio passthrough enabled.
transport = WebsocketServerTransport(
params=WebsocketServerParams(
audio_in_enabled=True,
audio_out_enabled=True,
add_wav_header=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True
)
)
# API keys and the voice id are read from the environment.
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
stt = WhisperSTTService()
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)
# Conversation context; the same list object is given to both aggregators
# and to the connection handler below.
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
# Processors are listed in the order frames travel through them.
pipeline = Pipeline([
transport.input(), # Websocket input from client
stt, # Speech-To-Text
tma_in, # User responses
llm, # LLM
tts, # Text-To-Speech
transport.output(), # Websocket output to client
tma_out # LLM responses
])
task = PipelineTask(pipeline)
# Registered for the transport's "on_client_connected" event.
# NOTE(review): every invocation appends another "introduce yourself" system
# message to the shared `messages` list, so repeated connections accumulate
# duplicates -- confirm this is intended.
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
# Run the task; main() does not return until runner.run() completes.
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,43 @@
//
// Copyright (c) 2024, Daily
//
// SPDX-License-Identifier: BSD 2-Clause License
//
// Generate frames_pb2.py with:
//
// python -m grpc_tools.protoc --proto_path=./ --python_out=./protobufs frames.proto
syntax = "proto3";
package pipecat;
// A frame carrying a piece of text.
message TextFrame {
  // `id` and `name` are the first two fields of every frame message in this file.
  uint64 id = 1;
  string name = 2;
  string text = 3;
}
// A frame carrying a chunk of audio together with its format description.
message AudioRawFrame {
  uint64 id = 1;
  string name = 2;
  // Audio payload bytes; the encoding is not specified by this schema.
  bytes audio = 3;
  // Presumably samples per second (Hz) -- TODO confirm against the producers.
  uint32 sample_rate = 4;
  uint32 num_channels = 5;
}
// A frame carrying transcribed text plus who said it and when.
message TranscriptionFrame {
  uint64 id = 1;
  string name = 2;
  string text = 3;
  string user_id = 4;
  // Stored as a string; the timestamp format is not specified by this schema.
  string timestamp = 5;
}
// Envelope message: at most one of the frame kinds below is set at a time.
message Frame {
  oneof frame {
    TextFrame text = 1;
    AudioRawFrame audio = 2;
    TranscriptionFrame transcription = 3;
  }
}

View File

@@ -0,0 +1,205 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<script src="https://cdn.jsdelivr.net/npm/protobufjs@7.X.X/dist/protobuf.min.js"></script>
<title>Pipecat WebSocket Client Example</title>
</head>
<body>
<h1>Pipecat WebSocket Client Example</h1>
<h3><div id="progressText">Loading, wait...</div></h3>
<button id="startAudioBtn">Start Audio</button>
<button id="stopAudioBtn">Stop Audio</button>
<script>
const SAMPLE_RATE = 16000;
const NUM_CHANNELS = 1;
const PLAY_TIME_RESET_THRESHOLD_MS = 1.0;
// The protobuf type. We will load it later.
let Frame = null;
// The websocket connection.
let ws = null;
// The audio context
let audioContext = null;
// The audio context media stream source
let source = null;
// The microphone stream from getUserMedia. Should be sampled at the
// proper sample rate.
let microphoneStream = null;
// Script processor to get data from microphone.
let scriptProcessor = null;
// AudioContext play time.
let playTime = 0;
// Last time we received a websocket message.
let lastMessageTime = 0;
// Whether we should be playing audio.
let isPlaying = false;
let startBtn = document.getElementById('startAudioBtn');
let stopBtn = document.getElementById('stopAudioBtn');
const proto = protobuf.load("frames.proto", (err, root) => {
if (err) {
throw err;
}
Frame = root.lookupType("pipecat.Frame");
const progressText = document.getElementById("progressText");
progressText.textContent = "We are ready! Make sure to run the server and then click `Start Audio`.";
startBtn.disabled = false;
stopBtn.disabled = true;
});
// Opens the websocket connection to the bot and wires up its event
// listeners. Incoming messages go to handleWebSocketMessage; when the
// connection closes, audio is stopped without closing the socket again.
function initWebSocket() {
    const onOpen = () => console.log('WebSocket connection established.');
    const onClose = (event) => {
        console.log("WebSocket connection closed.", event.code, event.reason);
        stopAudio(false);
    };
    const onError = (event) => console.error('WebSocket error:', event);

    ws = new WebSocket('ws://localhost:8765');
    ws.addEventListener('open', onOpen);
    ws.addEventListener('message', handleWebSocketMessage);
    ws.addEventListener('close', onClose);
    ws.addEventListener('error', onError);
}
// Websocket 'message' listener. Reads the message payload as an ArrayBuffer
// and, if playback is still enabled once the read completes, hands it to
// enqueueAudioFromProto.
async function handleWebSocketMessage(event) {
    const payload = await event.data.arrayBuffer();
    if (!isPlaying) {
        return;
    }
    enqueueAudioFromProto(payload);
}
// Decodes a protobuf `Frame` from `arrayBuffer` and, if it carries audio,
// schedules that audio to play right after the previously scheduled chunk.
// Returns false when the frame has no audio payload; otherwise returns
// nothing (decoding and scheduling complete asynchronously).
function enqueueAudioFromProto(arrayBuffer) {
    const parsedFrame = Frame.decode(new Uint8Array(arrayBuffer));
    if (!parsedFrame?.audio) {
        return false;
    }

    // Reset play time if it's been a while since we last played anything.
    // Both values are AudioContext times, so the threshold is compared in
    // the same unit as audioContext.currentTime.
    const now = audioContext.currentTime;
    const diffTime = now - lastMessageTime;
    if ((playTime == 0) || (diffTime > PLAY_TIME_RESET_THRESHOLD_MS)) {
        playTime = now;
    }
    lastMessageTime = now;

    // We should be able to use parsedFrame.audio.audio.buffer but for
    // some reason that contains all the bytes from the protobuf message.
    // Constructing a new Uint8Array from the field copies just the audio
    // bytes into a fresh, exactly-sized buffer (no intermediate Array).
    const audioArray = new Uint8Array(parsedFrame.audio.audio);

    audioContext.decodeAudioData(audioArray.buffer, function(buffer) {
        const source = new AudioBufferSourceNode(audioContext);
        source.buffer = buffer;
        source.connect(audioContext.destination);
        source.start(playTime);
        playTime = playTime + buffer.duration;
    }, function(error) {
        // A chunk that cannot be decoded is reported and skipped so the
        // remaining audio keeps playing.
        console.error('Error decoding audio data:', error);
    });
}
// Converts float samples to signed 16-bit PCM. Each sample is clamped to
// [-1, 1]; negative values are scaled by 32768 and non-negative values by
// 32767. Returns a new Int16Array of the same length as the input.
function convertFloat32ToS16PCM(float32Array) {
    return Int16Array.from(float32Array, (sample) => {
        const clamped = Math.min(1, sample) < -1 ? -1 : Math.min(1, sample);
        if (clamped < 0) {
            return clamped * 32768;
        }
        return clamped * 32767;
    });
}
// Click handler for the "Start Audio" button. Creates the AudioContext,
// opens the websocket, and starts capturing the microphone. Every captured
// buffer is converted to signed 16-bit PCM, wrapped in a protobuf `Frame`
// and sent over the websocket.
function startAudioBtnHandler() {
    if (!navigator.mediaDevices || !navigator.mediaDevices.getUserMedia) {
        alert('getUserMedia is not supported in your browser.');
        return;
    }

    startBtn.disabled = true;
    stopBtn.disabled = false;

    audioContext = new (window.AudioContext || window.webkitAudioContext)({
        latencyHint: "interactive",
        sampleRate: SAMPLE_RATE
    });

    isPlaying = true;

    initWebSocket();

    navigator.mediaDevices.getUserMedia({
        audio: {
            sampleRate: SAMPLE_RATE,
            channelCount: NUM_CHANNELS,
            autoGainControl: true,
            echoCancellation: true,
            noiseSuppression: true,
        }
    }).then((stream) => {
        microphoneStream = stream;
        // 512 frames per callback; at a 16 kHz context that is 32 ms of audio.
        scriptProcessor = audioContext.createScriptProcessor(512, 1, 1);
        source = audioContext.createMediaStreamSource(stream);
        source.connect(scriptProcessor);
        scriptProcessor.connect(audioContext.destination);

        scriptProcessor.onaudioprocess = (event) => {
            // The microphone can start delivering audio before the websocket
            // handshake completes; send() throws while the socket is still
            // connecting, so drop audio until the connection is open.
            if (!ws || ws.readyState !== WebSocket.OPEN) {
                return;
            }

            const audioData = event.inputBuffer.getChannelData(0);
            const pcmS16Array = convertFloat32ToS16PCM(audioData);
            const pcmByteArray = new Uint8Array(pcmS16Array.buffer);
            const frame = Frame.create({
                audio: {
                    audio: Array.from(pcmByteArray),
                    sampleRate: SAMPLE_RATE,
                    numChannels: NUM_CHANNELS
                }
            });
            const encodedFrame = new Uint8Array(Frame.encode(frame).finish());
            ws.send(encodedFrame);
        };
    }).catch((error) => console.error('Error accessing microphone:', error));
}
// Stops playback and microphone capture and resets the buttons.
// `closeWebsocket` says whether the websocket should also be closed (true
// when the user pressed "Stop Audio", false when the socket closed itself).
// Safe to call more than once: every resource is released at most once.
function stopAudio(closeWebsocket) {
    playTime = 0;
    isPlaying = false;
    startBtn.disabled = false;
    stopBtn.disabled = true;

    if (ws && closeWebsocket) {
        ws.close();
        ws = null;
    }

    if (scriptProcessor) {
        // Detach the callback so no further audio is encoded or sent.
        scriptProcessor.onaudioprocess = null;
        scriptProcessor.disconnect();
        scriptProcessor = null;
    }
    if (source) {
        source.disconnect();
        source = null;
    }
    if (microphoneStream) {
        // Stop the capture tracks; otherwise the microphone stays in use
        // after the user stops the session. A new stream is requested the
        // next time audio is started.
        microphoneStream.getTracks().forEach((track) => track.stop());
        microphoneStream = null;
    }
}
// Click handler for the "Stop Audio" button: stops audio and also closes
// the websocket connection.
function stopAudioBtnHandler() {
    stopAudio(true);
}
startBtn.addEventListener('click', startAudioBtnHandler);
stopBtn.addEventListener('click', stopAudioBtnHandler);
startBtn.disabled = true;
stopBtn.disabled = true;
</script>
</body>
</html>

View File

@@ -0,0 +1,2 @@
python-dotenv
pipecat-ai[openai,silero,websocket,whisper]

View File

@@ -5,7 +5,11 @@
# pip-compile --all-extras pyproject.toml
#
aiohttp==3.9.5
# via pipecat-ai (pyproject.toml)
# via
# cartesia
# langchain
# langchain-community
# pipecat-ai (pyproject.toml)
aiosignal==1.3.1
# via aiohttp
annotated-types==0.7.0
@@ -18,10 +22,12 @@ anyio==4.4.0
# httpx
# openai
async-timeout==4.0.3
# via aiohttp
# via
# aiohttp
# langchain
attrs==23.2.0
# via aiohttp
av==12.0.0
av==12.1.0
# via faster-whisper
azure-cognitiveservices-speech==1.37.0
# via pipecat-ai (pyproject.toml)
@@ -29,11 +35,15 @@ blinker==1.8.2
# via flask
cachetools==5.3.3
# via google-auth
certifi==2024.2.2
cartesia==0.1.1
# via pipecat-ai (pyproject.toml)
certifi==2024.6.2
# via
# httpcore
# httpx
# requests
cffi==1.16.0
# via sounddevice
charset-normalizer==3.3.2
# via requests
click==8.1.7
@@ -42,8 +52,10 @@ coloredlogs==15.0.1
# via onnxruntime
ctranslate2==4.2.1
# via faster-whisper
daily-python==0.9.0
daily-python==0.9.1
# via pipecat-ai (pyproject.toml)
dataclasses-json==0.6.6
# via langchain-community
distro==1.9.0
# via
# anthropic
@@ -51,7 +63,9 @@ distro==1.9.0
einops==0.8.0
# via pipecat-ai (pyproject.toml)
exceptiongroup==1.2.1
# via anyio
# via
# anyio
# pytest
fal-client==0.4.0
# via pipecat-ai (pyproject.toml)
faster-whisper==1.0.2
@@ -75,7 +89,7 @@ frozenlist==1.4.1
# via
# aiohttp
# aiosignal
fsspec==2024.5.0
fsspec==2024.6.0
# via
# huggingface-hub
# torch
@@ -88,7 +102,7 @@ google-api-core[grpc]==2.19.0
# google-ai-generativelanguage
# google-api-python-client
# google-generativeai
google-api-python-client==2.131.0
google-api-python-client==2.132.0
# via google-generativeai
google-auth==2.29.0
# via
@@ -101,11 +115,13 @@ google-auth-httplib2==0.2.0
# via google-api-python-client
google-generativeai==0.5.4
# via pipecat-ai (pyproject.toml)
googleapis-common-protos==1.63.0
googleapis-common-protos==1.63.1
# via
# google-api-core
# grpcio-status
grpcio==1.64.0
greenlet==3.0.3
# via sqlalchemy
grpcio==1.64.1
# via
# google-api-core
# grpcio-status
@@ -123,6 +139,7 @@ httplib2==0.22.0
httpx==0.27.0
# via
# anthropic
# cartesia
# fal-client
# openai
httpx-sse==0.4.0
@@ -141,29 +158,62 @@ idna==3.7
# httpx
# requests
# yarl
iniconfig==2.0.0
# via pytest
itsdangerous==2.2.0
# via flask
jinja2==3.1.4
# via
# flask
# torch
jsonpatch==1.33
# via langchain-core
jsonpointer==2.4
# via jsonpatch
langchain==0.2.1
# via
# langchain-community
# pipecat-ai (pyproject.toml)
langchain-community==0.2.1
# via pipecat-ai (pyproject.toml)
langchain-core==0.2.3
# via
# langchain
# langchain-community
# langchain-openai
# langchain-text-splitters
langchain-openai==0.1.8
# via pipecat-ai (pyproject.toml)
langchain-text-splitters==0.2.0
# via langchain
langsmith==0.1.69
# via
# langchain
# langchain-community
# langchain-core
loguru==0.7.2
# via pipecat-ai (pyproject.toml)
markupsafe==2.1.5
# via
# jinja2
# werkzeug
marshmallow==3.21.2
# via dataclasses-json
mpmath==1.3.0
# via sympy
multidict==6.0.5
# via
# aiohttp
# yarl
mypy-extensions==1.0.0
# via typing-inspect
networkx==3.3
# via torch
numpy==1.26.4
# via
# ctranslate2
# langchain
# langchain-community
# onnxruntime
# pipecat-ai (pyproject.toml)
# pyloudnorm
@@ -204,16 +254,25 @@ nvidia-nvtx-cu12==12.1.105
onnxruntime==1.18.0
# via faster-whisper
openai==1.26.0
# via pipecat-ai (pyproject.toml)
packaging==24.0
# via
# langchain-openai
# pipecat-ai (pyproject.toml)
orjson==3.10.3
# via langsmith
packaging==23.2
# via
# huggingface-hub
# langchain-core
# marshmallow
# onnxruntime
# pytest
# transformers
pillow==10.3.0
# via
# pipecat-ai (pyproject.toml)
# torchvision
pluggy==1.5.0
# via pytest
proto-plus==1.23.0
# via
# google-ai-generativelanguage
@@ -226,6 +285,7 @@ protobuf==4.25.3
# googleapis-common-protos
# grpcio-status
# onnxruntime
# pipecat-ai (pyproject.toml)
# proto-plus
# pyht
pyasn1==0.6.0
@@ -236,12 +296,17 @@ pyasn1-modules==0.4.0
# via google-auth
pyaudio==0.2.14
# via pipecat-ai (pyproject.toml)
pydantic==2.7.2
pycparser==2.22
# via cffi
pydantic==2.7.3
# via
# anthropic
# google-generativeai
# langchain
# langchain-core
# langsmith
# openai
pydantic-core==2.18.3
pydantic-core==2.18.4
# via pydantic
pyht==0.0.28
# via pipecat-ai (pyproject.toml)
@@ -249,21 +314,35 @@ pyloudnorm==0.1.1
# via pipecat-ai (pyproject.toml)
pyparsing==3.1.2
# via httplib2
pytest==8.2.2
# via pytest-asyncio
pytest-asyncio==0.23.7
# via cartesia
python-dotenv==1.0.1
# via pipecat-ai (pyproject.toml)
pyyaml==6.0.1
# via
# ctranslate2
# huggingface-hub
# langchain
# langchain-community
# langchain-core
# timm
# transformers
regex==2024.5.15
# via transformers
requests==2.32.2
# via
# tiktoken
# transformers
requests==2.32.3
# via
# cartesia
# google-api-core
# huggingface-hub
# langchain
# langchain-community
# langsmith
# pyht
# tiktoken
# transformers
rsa==4.9
# via google-auth
@@ -279,10 +358,23 @@ sniffio==1.3.1
# anyio
# httpx
# openai
sympy==1.12
sounddevice==0.4.7
# via pipecat-ai (pyproject.toml)
sqlalchemy==2.0.30
# via
# langchain
# langchain-community
sympy==1.12.1
# via
# onnxruntime
# torch
tenacity==8.3.0
# via
# langchain
# langchain-community
# langchain-core
tiktoken==0.7.0
# via langchain-openai
timm==0.9.16
# via pipecat-ai (pyproject.toml)
tokenizers==0.19.1
@@ -290,6 +382,8 @@ tokenizers==0.19.1
# anthropic
# faster-whisper
# transformers
tomli==2.0.1
# via pytest
torch==2.3.0
# via
# pipecat-ai (pyproject.toml)
@@ -310,7 +404,7 @@ transformers==4.40.2
# via pipecat-ai (pyproject.toml)
triton==2.3.0
# via torch
typing-extensions==4.11.0
typing-extensions==4.12.1
# via
# anthropic
# anyio
@@ -320,13 +414,19 @@ typing-extensions==4.11.0
# pipecat-ai (pyproject.toml)
# pydantic
# pydantic-core
# sqlalchemy
# torch
# typing-inspect
typing-inspect==0.9.0
# via dataclasses-json
uritemplate==4.1.1
# via google-api-python-client
urllib3==2.2.1
# via requests
websockets==12.0
# via pipecat-ai (pyproject.toml)
# via
# cartesia
# pipecat-ai (pyproject.toml)
werkzeug==3.0.3
# via flask
yarl==1.9.4

View File

@@ -7,6 +7,8 @@
aiohttp==3.9.5
# via
# cartesia
# langchain
# langchain-community
# pipecat-ai (pyproject.toml)
aiosignal==1.3.1
# via aiohttp
@@ -20,7 +22,9 @@ anyio==4.4.0
# httpx
# openai
async-timeout==4.0.3
# via aiohttp
# via
# aiohttp
# langchain
attrs==23.2.0
# via aiohttp
av==12.1.0
@@ -31,9 +35,9 @@ blinker==1.8.2
# via flask
cachetools==5.3.3
# via google-auth
cartesia==0.1.0
cartesia==0.1.1
# via pipecat-ai (pyproject.toml)
certifi==2024.2.2
certifi==2024.6.2
# via
# httpcore
# httpx
@@ -50,6 +54,8 @@ ctranslate2==4.2.1
# via faster-whisper
daily-python==0.9.1
# via pipecat-ai (pyproject.toml)
dataclasses-json==0.6.6
# via langchain-community
distro==1.9.0
# via
# anthropic
@@ -82,7 +88,7 @@ frozenlist==1.4.1
# via
# aiohttp
# aiosignal
fsspec==2024.5.0
fsspec==2024.6.0
# via
# huggingface-hub
# torch
@@ -95,7 +101,7 @@ google-api-core[grpc]==2.19.0
# google-ai-generativelanguage
# google-api-python-client
# google-generativeai
google-api-python-client==2.131.0
google-api-python-client==2.132.0
# via google-generativeai
google-auth==2.29.0
# via
@@ -108,11 +114,11 @@ google-auth-httplib2==0.2.0
# via google-api-python-client
google-generativeai==0.5.4
# via pipecat-ai (pyproject.toml)
googleapis-common-protos==1.63.0
googleapis-common-protos==1.63.1
# via
# google-api-core
# grpcio-status
grpcio==1.64.0
grpcio==1.64.1
# via
# google-api-core
# grpcio-status
@@ -157,23 +163,54 @@ jinja2==3.1.4
# via
# flask
# torch
jsonpatch==1.33
# via langchain-core
jsonpointer==2.4
# via jsonpatch
langchain==0.2.2
# via
# langchain-community
# pipecat-ai (pyproject.toml)
langchain-community==0.2.2
# via pipecat-ai (pyproject.toml)
langchain-core==0.2.4
# via
# langchain
# langchain-community
# langchain-openai
# langchain-text-splitters
langchain-openai==0.1.8
# via pipecat-ai (pyproject.toml)
langchain-text-splitters==0.2.1
# via langchain
langsmith==0.1.69
# via
# langchain
# langchain-community
# langchain-core
loguru==0.7.2
# via pipecat-ai (pyproject.toml)
markupsafe==2.1.5
# via
# jinja2
# werkzeug
marshmallow==3.21.2
# via dataclasses-json
mpmath==1.3.0
# via sympy
multidict==6.0.5
# via
# aiohttp
# yarl
mypy-extensions==1.0.0
# via typing-inspect
networkx==3.3
# via torch
numpy==1.26.4
# via
# ctranslate2
# langchain
# langchain-community
# onnxruntime
# pipecat-ai (pyproject.toml)
# pyloudnorm
@@ -183,10 +220,16 @@ numpy==1.26.4
onnxruntime==1.18.0
# via faster-whisper
openai==1.26.0
# via pipecat-ai (pyproject.toml)
packaging==24.0
# via
# langchain-openai
# pipecat-ai (pyproject.toml)
orjson==3.10.3
# via langsmith
packaging==23.2
# via
# huggingface-hub
# langchain-core
# marshmallow
# onnxruntime
# pytest
# transformers
@@ -208,6 +251,7 @@ protobuf==4.25.3
# googleapis-common-protos
# grpcio-status
# onnxruntime
# pipecat-ai (pyproject.toml)
# proto-plus
# pyht
pyasn1==0.6.0
@@ -220,12 +264,15 @@ pyaudio==0.2.14
# via pipecat-ai (pyproject.toml)
pycparser==2.22
# via cffi
pydantic==2.7.2
pydantic==2.7.3
# via
# anthropic
# google-generativeai
# langchain
# langchain-core
# langsmith
# openai
pydantic-core==2.18.3
pydantic-core==2.18.4
# via pydantic
pyht==0.0.28
# via pipecat-ai (pyproject.toml)
@@ -233,7 +280,7 @@ pyloudnorm==0.1.1
# via pipecat-ai (pyproject.toml)
pyparsing==3.1.2
# via httplib2
pytest==8.2.1
pytest==8.2.2
# via pytest-asyncio
pytest-asyncio==0.23.7
# via cartesia
@@ -243,16 +290,25 @@ pyyaml==6.0.1
# via
# ctranslate2
# huggingface-hub
# langchain
# langchain-community
# langchain-core
# timm
# transformers
regex==2024.5.15
# via transformers
# via
# tiktoken
# transformers
requests==2.32.3
# via
# cartesia
# google-api-core
# huggingface-hub
# langchain
# langchain-community
# langsmith
# pyht
# tiktoken
# transformers
rsa==4.9
# via google-auth
@@ -270,10 +326,21 @@ sniffio==1.3.1
# openai
sounddevice==0.4.7
# via pipecat-ai (pyproject.toml)
sqlalchemy==2.0.30
# via
# langchain
# langchain-community
sympy==1.12.1
# via
# onnxruntime
# torch
tenacity==8.3.0
# via
# langchain
# langchain-community
# langchain-core
tiktoken==0.7.0
# via langchain-openai
timm==0.9.16
# via pipecat-ai (pyproject.toml)
tokenizers==0.19.1
@@ -301,7 +368,7 @@ tqdm==4.66.4
# transformers
transformers==4.40.2
# via pipecat-ai (pyproject.toml)
typing-extensions==4.11.0
typing-extensions==4.12.1
# via
# anthropic
# anyio
@@ -311,7 +378,11 @@ typing-extensions==4.11.0
# pipecat-ai (pyproject.toml)
# pydantic
# pydantic-core
# sqlalchemy
# torch
# typing-inspect
typing-inspect==0.9.0
# via dataclasses-json
uritemplate==4.1.1
# via google-api-python-client
urllib3==2.2.1

View File

@@ -24,8 +24,9 @@ dependencies = [
"numpy~=1.26.4",
"loguru~=0.7.0",
"Pillow~=10.3.0",
"protobuf~=4.25.3",
"pyloudnorm~=0.1.1",
"typing-extensions~=4.11.0",
"typing-extensions~=4.12.1",
]
[project.urls]
@@ -41,6 +42,7 @@ examples = [ "python-dotenv~=1.0.0", "flask~=3.0.3", "flask_cors~=4.0.1" ]
fal = [ "fal-client~=0.4.0" ]
google = [ "google-generativeai~=0.5.3" ]
fireworks = [ "openai~=1.26.0" ]
langchain = [ "langchain~=0.2.1", "langchain-community~=0.2.1", "langchain-openai~=0.1.8" ]
local = [ "pyaudio~=0.2.0" ]
moondream = [ "einops~=0.8.0", "timm~=0.9.16", "transformers~=4.40.2" ]
openai = [ "openai~=1.26.0" ]

View File

@@ -4,28 +4,40 @@
// SPDX-License-Identifier: BSD 2-Clause License
//
// Generate frames_pb2.py with:
//
// python -m grpc_tools.protoc --proto_path=./ --python_out=./protobufs frames.proto
syntax = "proto3";
package pipecat_proto;
package pipecat;
message TextFrame {
string text = 1;
uint64 id = 1;
string name = 2;
string text = 3;
}
message AudioFrame {
bytes data = 1;
message AudioRawFrame {
uint64 id = 1;
string name = 2;
bytes audio = 3;
uint32 sample_rate = 4;
uint32 num_channels = 5;
}
message TranscriptionFrame {
string text = 1;
string participantId = 2;
string timestamp = 3;
uint64 id = 1;
string name = 2;
string text = 3;
string user_id = 4;
string timestamp = 5;
}
message Frame {
oneof frame {
TextFrame text = 1;
AudioFrame audio = 2;
TranscriptionFrame transcription = 3;
}
oneof frame {
TextFrame text = 1;
AudioRawFrame audio = 2;
TranscriptionFrame transcription = 3;
}
}

View File

@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: frames.proto
# Protobuf Python Version: 4.25.3
# Protobuf Python Version: 4.25.1
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
@@ -14,19 +14,19 @@ _sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0c\x66rames.proto\x12\rpipecat_proto\"\x19\n\tTextFrame\x12\x0c\n\x04text\x18\x01 \x01(\t\"\x1a\n\nAudioFrame\x12\x0c\n\x04\x64\x61ta\x18\x01 \x01(\x0c\"L\n\x12TranscriptionFrame\x12\x0c\n\x04text\x18\x01 \x01(\t\x12\x15\n\rparticipantId\x18\x02 \x01(\t\x12\x11\n\ttimestamp\x18\x03 \x01(\t\"\xa2\x01\n\x05\x46rame\x12(\n\x04text\x18\x01 \x01(\x0b\x32\x18.pipecat_proto.TextFrameH\x00\x12*\n\x05\x61udio\x18\x02 \x01(\x0b\x32\x19.pipecat_proto.AudioFrameH\x00\x12:\n\rtranscription\x18\x03 \x01(\x0b\x32!.pipecat_proto.TranscriptionFrameH\x00\x42\x07\n\x05\x66rameb\x06proto3')
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0c\x66rames.proto\x12\x07pipecat\"3\n\tTextFrame\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x0c\n\x04text\x18\x03 \x01(\t\"c\n\rAudioRawFrame\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\r\n\x05\x61udio\x18\x03 \x01(\x0c\x12\x13\n\x0bsample_rate\x18\x04 \x01(\r\x12\x14\n\x0cnum_channels\x18\x05 \x01(\r\"`\n\x12TranscriptionFrame\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x0c\n\x04text\x18\x03 \x01(\t\x12\x0f\n\x07user_id\x18\x04 \x01(\t\x12\x11\n\ttimestamp\x18\x05 \x01(\t\"\x93\x01\n\x05\x46rame\x12\"\n\x04text\x18\x01 \x01(\x0b\x32\x12.pipecat.TextFrameH\x00\x12\'\n\x05\x61udio\x18\x02 \x01(\x0b\x32\x16.pipecat.AudioRawFrameH\x00\x12\x34\n\rtranscription\x18\x03 \x01(\x0b\x32\x1b.pipecat.TranscriptionFrameH\x00\x42\x07\n\x05\x66rameb\x06proto3')
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'frames_pb2', _globals)
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
_globals['_TEXTFRAME']._serialized_start=31
_globals['_TEXTFRAME']._serialized_end=56
_globals['_AUDIOFRAME']._serialized_start=58
_globals['_AUDIOFRAME']._serialized_end=84
_globals['_TRANSCRIPTIONFRAME']._serialized_start=86
_globals['_TRANSCRIPTIONFRAME']._serialized_end=162
_globals['_FRAME']._serialized_start=165
_globals['_FRAME']._serialized_end=327
_globals['_TEXTFRAME']._serialized_start=25
_globals['_TEXTFRAME']._serialized_end=76
_globals['_AUDIORAWFRAME']._serialized_start=78
_globals['_AUDIORAWFRAME']._serialized_end=177
_globals['_TRANSCRIPTIONFRAME']._serialized_start=179
_globals['_TRANSCRIPTIONFRAME']._serialized_end=275
_globals['_FRAME']._serialized_start=278
_globals['_FRAME']._serialized_end=425
# @@protoc_insertion_point(module_scope)

View File

@@ -67,7 +67,8 @@ class Pipeline(FrameProcessor):
await self._sink.process_frame(frame, FrameDirection.UPSTREAM)
async def _cleanup_processors(self):
await asyncio.gather(*[p.cleanup() for p in self._processors])
for p in self._processors:
await p.cleanup()
def _link_processors(self):
prev = self._processors[0]

View File

@@ -20,18 +20,15 @@ class PipelineRunner:
self.name: str = name or f"{self.__class__.__name__}#{obj_count(self)}"
self._tasks = {}
self._running = True
if handle_sigint:
self._setup_sigint()
async def run(self, task: PipelineTask):
logger.debug(f"Runner {self} started running {task}")
self._running = True
self._tasks[task.name] = task
await task.run()
del self._tasks[task.name]
self._running = False
logger.debug(f"Runner {self} finished running {task}")
async def stop_when_done(self):
@@ -42,18 +39,19 @@ class PipelineRunner:
logger.debug(f"Canceling runner {self}")
await asyncio.gather(*[t.cancel() for t in self._tasks.values()])
def is_active(self):
return self._running
def _setup_sigint(self):
loop = asyncio.get_running_loop()
loop.add_signal_handler(
signal.SIGINT,
lambda *args: asyncio.create_task(self._sigint_handler())
lambda *args: asyncio.create_task(self._sig_handler())
)
loop.add_signal_handler(
signal.SIGTERM,
lambda *args: asyncio.create_task(self._sig_handler())
)
async def _sigint_handler(self):
logger.warning(f"Ctrl-C detected. Canceling runner {self}")
async def _sig_handler(self):
logger.warning(f"Interruption detected. Canceling runner {self}")
await self.cancel()
def __str__(self):

View File

@@ -43,6 +43,7 @@ class PipelineTask:
self._pipeline = pipeline
self._params = params
self._finished = False
self._down_queue = asyncio.Queue()
self._up_queue = asyncio.Queue()
@@ -50,6 +51,9 @@ class PipelineTask:
self._source = Source(self._up_queue)
self._source.link(pipeline)
def has_finished(self) -> bool:
    """Return the task's finished flag.

    The flag starts out ``False`` and is set to ``True`` by ``run()`` once
    both queue-processing tasks have completed.
    """
    return self._finished
async def stop_when_done(self):
logger.debug(f"Task {self} scheduled to stop when done")
await self.queue_frame(EndFrame())
@@ -67,6 +71,7 @@ class PipelineTask:
self._process_up_task = asyncio.create_task(self._process_up_queue())
self._process_down_task = asyncio.create_task(self._process_down_queue())
await asyncio.gather(self._process_up_task, self._process_down_task)
self._finished = True
async def queue_frame(self, frame: Frame):
await self._down_queue.put(frame)

View File

@@ -5,7 +5,7 @@
#
import asyncio
from asyncio import AbstractEventLoop
from enum import Enum
from pipecat.frames.frames import ErrorFrame, Frame
@@ -21,12 +21,12 @@ class FrameDirection(Enum):
class FrameProcessor:
def __init__(self):
def __init__(self, loop: asyncio.AbstractEventLoop | None = None):
self.id: int = obj_id()
self.name = f"{self.__class__.__name__}#{obj_count(self)}"
self._prev: "FrameProcessor" | None = None
self._next: "FrameProcessor" | None = None
self._loop: AbstractEventLoop = asyncio.get_running_loop()
self._loop: asyncio.AbstractEventLoop = loop or asyncio.get_running_loop()
async def cleanup(self):
pass
@@ -36,7 +36,7 @@ class FrameProcessor:
processor._prev = self
logger.debug(f"Linking {self} -> {self._next}")
def get_event_loop(self) -> AbstractEventLoop:
def get_event_loop(self) -> asyncio.AbstractEventLoop:
return self._loop
async def process_frame(self, frame: Frame, direction: FrameDirection):

View File

@@ -0,0 +1,77 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from typing import Union
from pipecat.frames.frames import (
Frame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMMessagesFrame,
LLMResponseEndFrame,
LLMResponseStartFrame,
TextFrame)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from loguru import logger
try:
from langchain_core.messages import AIMessageChunk
from langchain_core.runnables import Runnable
except ModuleNotFoundError as e:
logger.exception(
"In order to use Langchain, you need to `pip install pipecat-ai[langchain]`. "
)
raise Exception(f"Missing module: {e}")
class LangchainProcessor(FrameProcessor):
def __init__(self, chain: Runnable, transcript_key: str = "input"):
super().__init__()
self._chain = chain
self._transcript_key = transcript_key
self._participant_id: str | None = None
def set_participant_id(self, participant_id: str):
self._participant_id = participant_id
async def process_frame(self, frame: Frame, direction: FrameDirection):
if isinstance(frame, LLMMessagesFrame):
# Messages are accumulated by the `LLMUserResponseAggregator` in a list of messages.
# The last one by the human is the one we want to send to the LLM.
logger.debug(f"Got transcription frame {frame}")
text: str = frame.messages[-1]["content"]
await self._ainvoke(text.strip())
else:
await self.push_frame(frame, direction)
@staticmethod
def __get_token_value(text: Union[str, AIMessageChunk]) -> str:
match text:
case str():
return text
case AIMessageChunk():
return text.content
case _:
return ""
async def _ainvoke(self, text: str):
logger.debug(f"Invoking chain with {text}")
await self.push_frame(LLMFullResponseStartFrame())
try:
async for token in self._chain.astream(
{self._transcript_key: text},
config={"configurable": {"session_id": self._participant_id}},
):
await self.push_frame(LLMResponseStartFrame())
await self.push_frame(TextFrame(self.__get_token_value(token)))
await self.push_frame(LLMResponseEndFrame())
except GeneratorExit:
logger.warning("Generator was closed prematurely")
except Exception as e:
logger.error(f"An unknown error occurred: {e}")
await self.push_frame(LLMFullResponseEndFrame())

View File

View File

@@ -1,16 +0,0 @@
from abc import abstractmethod
from pipecat.pipeline.frames import Frame
class FrameSerializer:
def __init__(self):
pass
@abstractmethod
def serialize(self, frame: Frame) -> bytes:
raise NotImplementedError
@abstractmethod
def deserialize(self, data: bytes) -> Frame:
raise NotImplementedError

View File

@@ -0,0 +1,20 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from abc import ABC, abstractmethod
from pipecat.frames.frames import Frame
class FrameSerializer(ABC):
    """Abstract interface for converting frames to bytes and back.

    Subclasses must implement both methods before they can be instantiated.
    """

    @abstractmethod
    def serialize(self, frame: Frame) -> bytes:
        """Return the byte representation of ``frame``."""

    @abstractmethod
    def deserialize(self, data: bytes) -> Frame:
        """Return the frame represented by ``data``."""

View File

@@ -1,14 +1,21 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import dataclasses
from typing import Text
from pipecat.pipeline.frames import AudioFrame, Frame, TextFrame, TranscriptionFrame
import pipecat.pipeline.protobufs.frames_pb2 as frame_protos
from pipecat.serializers.abstract_frame_serializer import FrameSerializer
import pipecat.frames.protobufs.frames_pb2 as frame_protos
from pipecat.frames.frames import AudioRawFrame, Frame, TextFrame, TranscriptionFrame
from pipecat.serializers.base_serializer import FrameSerializer
class ProtobufFrameSerializer(FrameSerializer):
SERIALIZABLE_TYPES = {
TextFrame: "text",
AudioFrame: "audio",
AudioRawFrame: "audio",
TranscriptionFrame: "transcription"
}
@@ -29,7 +36,8 @@ class ProtobufFrameSerializer(FrameSerializer):
setattr(getattr(proto_frame, proto_optional_name), field.name,
getattr(frame, field.name))
return proto_frame.SerializeToString()
result = proto_frame.SerializeToString()
return result
def deserialize(self, data: bytes) -> Frame:
"""Returns a Frame object from a Frame protobuf. Used to convert frames
@@ -61,4 +69,22 @@ class ProtobufFrameSerializer(FrameSerializer):
args_dict = {}
for field in proto.DESCRIPTOR.fields_by_name[which].message_type.fields:
args_dict[field.name] = getattr(args, field.name)
return class_name(**args_dict)
# Remove special fields if needed
id = getattr(args, "id")
name = getattr(args, "name")
if not id:
del args_dict["id"]
if not name:
del args_dict["name"]
# Create the instance
instance = class_name(**args_dict)
# Set special fields
if id:
setattr(instance, "id", getattr(args, "id"))
if name:
setattr(instance, "name", getattr(args, "name"))
return instance

View File

@@ -43,6 +43,31 @@ class LLMService(AIService):
def __init__(self):
    super().__init__()
    # Function name -> callback awaited by call_function().
    self._callbacks = {}
    # Function name -> optional callback awaited by call_start_function().
    self._start_callbacks = {}
# TODO-CB: callback function type
def register_function(self, function_name: str, callback, start_callback=None):
    """Register ``callback`` under ``function_name``.

    A truthy ``start_callback`` is stored under the same name as well; a
    falsy one leaves the start-callback table untouched.
    """
    self._callbacks[function_name] = callback
    if not start_callback:
        return
    self._start_callbacks[function_name] = start_callback
def unregister_function(self, function_name: str):
    """Remove the callback registered under ``function_name``.

    The matching start callback is removed too when one exists; functions
    registered without a start callback are handled as well.

    Raises:
        KeyError: If no callback is registered under ``function_name``.
    """
    del self._callbacks[function_name]
    # A start callback is optional, so it may be absent: pop() with a
    # default removes it when present instead of raising KeyError.
    self._start_callbacks.pop(function_name, None)
def has_function(self, function_name: str):
    """Return whether a callback is registered under ``function_name``."""
    registered = self._callbacks
    return function_name in registered
async def call_function(self, function_name: str, args):
    """Await the callback registered under ``function_name``.

    The callback is called with this service and ``args`` and its result is
    returned. Returns ``None`` when no callback is registered under that name.
    """
    try:
        callback = self._callbacks[function_name]
    except KeyError:
        return None
    return await callback(self, args)
async def call_start_function(self, function_name: str):
    """Await the start callback registered under ``function_name``, if any.

    The callback is called with this service as its only argument. Nothing
    happens when no start callback is registered under that name.
    """
    try:
        start_callback = self._start_callbacks[function_name]
    except KeyError:
        return
    await start_callback(self)
class TTSService(AIService):
@@ -171,7 +196,7 @@ class ImageGenService(AIService):
super().__init__()
# Renders the image. Returns an Image object.
@ abstractmethod
@abstractmethod
async def run_image_gen(self, prompt: str) -> AsyncGenerator[Frame, None]:
pass
@@ -190,7 +215,7 @@ class VisionService(AIService):
super().__init__()
self._describe_text = None
@ abstractmethod
@abstractmethod
async def run_vision(self, frame: VisionImageRawFrame) -> AsyncGenerator[Frame, None]:
pass

View File

@@ -4,8 +4,6 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
import asyncio
import time
import base64
@@ -80,8 +78,20 @@ class AnthropicLLMService(LLMService):
}]
})
else:
# text frame
anthropic_messages.append({"role": role, "content": content})
# Text frame. Anthropic needs the roles to alternate. This will
# cause an issue with interruptions. So, if we detect we are the
# ones asking again it probably means we were interrupted.
if role == "user" and len(anthropic_messages) > 1:
last_message = anthropic_messages[-1]
if last_message["role"] == "user":
anthropic_messages = anthropic_messages[:-1]
content = last_message["content"]
anthropic_messages.append(
{"role": "user", "content": f"Sorry, I just asked you about [{content}] but now I would like to know [{text}]."})
else:
anthropic_messages.append({"role": role, "content": text})
else:
anthropic_messages.append({"role": role, "content": text})
return anthropic_messages
@@ -107,7 +117,7 @@ class AnthropicLLMService(LLMService):
await self.push_frame(LLMResponseEndFrame())
except Exception as e:
logger.error(f"Exception: {e}")
logger.error(f"Anthrophic exception: {e}")
finally:
await self.push_frame(LLMFullResponseEndFrame())
@@ -125,22 +135,3 @@ class AnthropicLLMService(LLMService):
if context:
await self._process_context(context)
async def x_process_frame(self, frame: Frame, direction: FrameDirection):
    """Stream a reply to a fixed prompt for LLMMessagesFrame, forward other frames.

    Note that the request uses a hard-coded "Hello, Claude" user message; the
    content of the incoming frame is not read.
    """
    if not isinstance(frame, LLMMessagesFrame):
        await self.push_frame(frame, direction)
        return

    fixed_messages = [
        {
            "role": "user",
            "content": "Hello, Claude",
        }
    ]
    stream = await self.client.messages.create(
        max_tokens=self.max_tokens,
        messages=fixed_messages,
        model=self.model,
        stream=True,
    )
    async for event in stream:
        # Only text deltas are turned into frames; other event types are ignored.
        if event.type != "content_block_delta":
            continue
        await self.push_frame(TextFrame(event.delta.text))

View File

@@ -6,10 +6,9 @@
from cartesia.tts import AsyncCartesiaTTS
import time
from typing import AsyncGenerator
from pipecat.frames.frames import AudioRawFrame, ErrorFrame, Frame
from pipecat.frames.frames import AudioRawFrame, Frame
from pipecat.services.ai_services import TTSService
from loguru import logger
@@ -22,35 +21,37 @@ class CartesiaTTSService(TTSService):
*,
api_key: str,
voice_name: str,
model_id: str = "upbeat-moon",
output_format: str = "pcm_16000",
**kwargs):
super().__init__(**kwargs)
self._api_key = api_key
self._voice_name = voice_name
self._client = None
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"Transcribing text: [{text}]")
self._model_id = model_id
self._output_format = output_format
try:
if self._client is None:
self._client = AsyncCartesiaTTS(api_key=self._api_key)
voices = self._client.get_voices()
self._voice_id = voices[self._voice_name]["id"]
self._voice = self._client.get_voice_embedding(voice_id=self._voice_id)
self._client = AsyncCartesiaTTS(api_key=self._api_key)
voices = self._client.get_voices()
voice_id = voices[self._voice_name]["id"]
self._voice = self._client.get_voice_embedding(voice_id=voice_id)
except Exception as e:
logger.error(f"Cartesia initialization error: {e}")
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"Generating TTS: [{text}]")
try:
chunk_generator = await self._client.generate(
transcript=text, voice=self._voice, stream=True,
model_id="upbeat-moon", data_rtype='array', output_format='pcm_16000',
# a chunk_time of 0.1 seems to be the default. there are small audio pops/gaps which
# we need to debug
chunk_time=0.1
stream=True,
transcript=text,
voice=self._voice,
model_id=self._model_id,
output_format=self._output_format,
)
async for chunk in chunk_generator:
# print(f"")
frame = AudioRawFrame(chunk['audio'], 16000, 1)
yield frame
yield AudioRawFrame(chunk["audio"], chunk["sampling_rate"], 1)
except Exception as e:
logger.error(f"Exception {e}")
logger.error(f"Cartesia exception: {e}")

View File

@@ -30,10 +30,10 @@ class DeepgramTTSService(TTSService):
self._aiohttp_session = aiohttp_session
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.info(f"Running Deepgram TTS for {text}")
logger.debug(f"Generating TTS: [{text}]")
base_url = "https://api.deepgram.com/v1/speak"
request_url = f"{base_url}?model = {
self._voice} & encoding = linear16 & container = none & sample_rate = 16000"
request_url = f"{base_url}?model={self._voice}&encoding=linear16&container=none&sample_rate=16000"
headers = {"authorization": f"token {self._api_key}"}
body = {"text": text}
@@ -49,4 +49,4 @@ class DeepgramTTSService(TTSService):
frame = AudioRawFrame(audio=data, sample_rate=16000, num_channels=1)
yield frame
except Exception as e:
logger.error(f"Exception {e}")
logger.error(f"Deepgram exception: {e}")

View File

@@ -3,18 +3,18 @@
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import base64
import io
import json
import time
import aiohttp
import base64
from PIL import Image
from typing import AsyncGenerator, List, Literal
import aiohttp
from loguru import logger
from PIL import Image
from pipecat.frames.frames import (
AudioRawFrame,
ErrorFrame,
Frame,
LLMFullResponseEndFrame,
@@ -26,24 +26,25 @@ from pipecat.frames.frames import (
URLImageRawFrame,
VisionImageRawFrame
)
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext, OpenAILLMContextFrame
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import LLMService, ImageGenService
from openai.types.chat import (
ChatCompletionSystemMessageParam,
ChatCompletionFunctionMessageParam,
ChatCompletionToolParam,
ChatCompletionUserMessageParam,
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import (
ImageGenService,
LLMService,
TTSService
)
from loguru import logger
try:
from openai import AsyncOpenAI, AsyncStream
from openai import AsyncOpenAI, AsyncStream, BadRequestError
from openai.types.chat import (
ChatCompletion,
ChatCompletionChunk,
ChatCompletionFunctionMessageParam,
ChatCompletionMessageParam,
ChatCompletionToolParam
)
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
@@ -70,23 +71,10 @@ class BaseOpenAILLMService(LLMService):
super().__init__()
self._model: str = model
self._client = self.create_client(api_key=api_key, base_url=base_url)
self._callbacks = {}
self._start_callbacks = {}
def create_client(self, api_key=None, base_url=None):
return AsyncOpenAI(api_key=api_key, base_url=base_url)
# TODO-CB: callback function type
def register_function(self, function_name, callback, start_callback=None):
    """Register `callback` under `function_name`.

    A truthy `start_callback` is stored under the same name in the
    start-callback registry; otherwise that registry is left unchanged.
    """
    self._callbacks[function_name] = callback
    if not start_callback:
        return
    self._start_callbacks[function_name] = start_callback
def unregister_function(self, function_name):
    """Remove the callback registered under `function_name`.

    The matching start callback, if one was registered, is removed as well.

    Raises:
        KeyError: if no callback is registered under `function_name`.
    """
    del self._callbacks[function_name]
    # A start callback is optional, so the name may not be present here;
    # subscripting the dict would raise KeyError for such functions.
    self._start_callbacks.pop(function_name, None)
async def _stream_chat_completions(
self, context: OpenAILLMContext
) -> AsyncStream[ChatCompletionChunk]:
@@ -159,10 +147,7 @@ class BaseOpenAILLMService(LLMService):
if tool_call.function and tool_call.function.name:
function_name += tool_call.function.name
tool_call_id = tool_call.id
# only send a function start frame if we're not handling the function call
if function_name in self._callbacks.keys():
if function_name in self._start_callbacks.keys():
await self._start_callbacks[function_name](self)
await self.call_start_function(function_name)
if tool_call.function and tool_call.function.arguments:
# Keep iterating through the response to collect all the argument fragments
arguments += tool_call.function.arguments
@@ -176,9 +161,8 @@ class BaseOpenAILLMService(LLMService):
# the context, and re-prompt to get a chat answer. If we don't have a registered
# handler, raise an exception.
if function_name and arguments:
if function_name in self._callbacks.keys():
if self.has_function(function_name):
await self._handle_function_call(context, tool_call_id, function_name, arguments)
else:
raise OpenAIUnhandledFunctionException(
f"The LLM tried to call a function named '{function_name}', but there isn't a callback registered for that function.")
@@ -191,7 +175,7 @@ class BaseOpenAILLMService(LLMService):
arguments
):
arguments = json.loads(arguments)
result = await self._callbacks[function_name](self, arguments)
result = await self.call_function(function_name, arguments)
arguments = json.dumps(arguments)
if isinstance(result, (str, dict)):
# Handle it in "full magic mode"
@@ -249,7 +233,7 @@ class BaseOpenAILLMService(LLMService):
class OpenAILLMService(BaseOpenAILLMService):
def __init__(self, model="gpt-4", **kwargs):
def __init__(self, model="gpt-4o", **kwargs):
super().__init__(model, **kwargs)
@@ -292,3 +276,51 @@ class OpenAIImageGenService(ImageGenService):
image = Image.open(image_stream)
frame = URLImageRawFrame(image_url, image.tobytes(), image.size, image.format)
yield frame
class OpenAITTSService(TTSService):
    """Text-to-speech service that generates audio through the OpenAI TTS API.

    The returned audio is PCM encoded at 24kHz. When using the DailyTransport,
    set the sample rate in the DailyParams accordingly:
    ```
    DailyParams(
        audio_out_enabled=True,
        audio_out_sample_rate=24_000,
    )
    ```
    """

    def __init__(
            self,
            *,
            api_key: str | None = None,
            voice: Literal["alloy", "echo", "fable", "onyx", "nova", "shimmer"] = "alloy",
            model: Literal["tts-1", "tts-1-hd"] = "tts-1",
            **kwargs):
        """Store the voice/model selection and create the OpenAI client."""
        super().__init__(**kwargs)

        self._model = model
        self._voice = voice

        self._client = AsyncOpenAI(api_key=api_key)

    async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
        """Yield AudioRawFrame chunks (24kHz, mono) for `text`.

        A non-200 response is logged and reported as a single ErrorFrame. A
        BadRequestError is logged and ends the generator without a frame.
        """
        logger.debug(f"Generating TTS: [{text}]")

        try:
            async with self._client.audio.speech.with_streaming_response.create(
                    input=text,
                    model=self._model,
                    voice=self._voice,
                    response_format="pcm",
            ) as response:
                if response.status_code != 200:
                    error = await response.text()
                    message = f"Error getting audio (status: {response.status_code}, error: {error})"
                    logger.error(message)
                    yield ErrorFrame(message)
                    return
                async for chunk in response.iter_bytes(8192):
                    # Skip empty chunks so no zero-length audio frame is emitted.
                    if not chunk:
                        continue
                    yield AudioRawFrame(chunk, 24_000, 1)
        except BadRequestError as e:
            logger.error(f"Error generating TTS: {e}")

View File

@@ -1,9 +0,0 @@
class SearchIndexer:
    """Placeholder indexer: every method is a no-op and nothing is stored."""

    def __init__(self, story_id):
        """Accept a story id; the value is not kept."""

    def index_text(self, text):
        """No-op placeholder for indexing text; returns None."""

    def index_image(self, text):
        """No-op placeholder for indexing an image; returns None."""

View File

View File

@@ -21,7 +21,7 @@ from pipecat.frames.frames import (
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame)
from pipecat.transports.base_transport import TransportParams
from pipecat.vad.vad_analyzer import VADState
from pipecat.vad.vad_analyzer import VADAnalyzer, VADState
from loguru import logger
@@ -36,11 +36,7 @@ class BaseInputTransport(FrameProcessor):
self._running = False
self._allow_interruptions = False
self._in_executor = ThreadPoolExecutor(max_workers=5)
# Create audio input queue if needed.
if self._params.audio_in_enabled or self._params.vad_enabled:
self._audio_in_queue = queue.Queue()
self._executor = ThreadPoolExecutor(max_workers=5)
# Create push frame task. This is the task that will push frames in
# order. We also guarantee that all frames are pushed in the same task.
@@ -57,12 +53,11 @@ class BaseInputTransport(FrameProcessor):
self._running = True
# Create audio input queue and thread if needed.
if self._params.audio_in_enabled or self._params.vad_enabled:
loop = self.get_event_loop()
self._audio_in_thread = loop.run_in_executor(
self._in_executor, self._audio_in_thread_handler)
self._audio_out_thread = loop.run_in_executor(
self._in_executor, self._audio_out_thread_handler)
self._audio_in_queue = queue.Queue()
self._audio_thread = self._loop.run_in_executor(
self._executor, self._audio_thread_handler)
async def stop(self):
if not self._running:
@@ -73,16 +68,15 @@ class BaseInputTransport(FrameProcessor):
# Wait for the threads to finish.
if self._params.audio_in_enabled or self._params.vad_enabled:
await self._audio_in_thread
await self._audio_out_thread
await self._audio_thread
self._push_frame_task.cancel()
def vad_analyze(self, audio_frames: bytes) -> VADState:
pass
def vad_analyzer(self) -> VADAnalyzer | None:
return self._params.vad_analyzer
def read_raw_audio_frames(self, frame_count: int) -> bytes:
pass
def push_audio_frame(self, frame: AudioRawFrame):
self._audio_in_queue.put_nowait(frame)
#
# Frame processor
@@ -93,16 +87,16 @@ class BaseInputTransport(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
if isinstance(frame, CancelFrame):
await self.stop()
# We don't queue a CancelFrame since we want to stop ASAP.
await self.push_frame(frame, direction)
await self.stop()
elif isinstance(frame, StartFrame):
self._allow_interruption = frame.allow_interruptions
await self.start(frame)
await self._internal_push_frame(frame, direction)
elif isinstance(frame, EndFrame):
await self.stop()
await self._internal_push_frame(frame, direction)
await self.stop()
else:
await self._internal_push_frame(frame, direction)
@@ -150,8 +144,15 @@ class BaseInputTransport(FrameProcessor):
# Audio input
#
def _vad_analyze(self, audio_frames: bytes) -> VADState:
    """Return the VAD state for `audio_frames`.

    Falls back to VADState.QUIET when `self.vad_analyzer()` returns nothing
    usable (a falsy value).
    """
    analyzer = self.vad_analyzer()
    if not analyzer:
        return VADState.QUIET
    return analyzer.analyze_audio(audio_frames)
def _handle_vad(self, audio_frames: bytes, vad_state: VADState):
new_vad_state = self.vad_analyze(audio_frames)
new_vad_state = self._vad_analyze(audio_frames)
if new_vad_state != vad_state and new_vad_state != VADState.STARTING and new_vad_state != VADState.STOPPING:
frame = None
if new_vad_state == VADState.SPEAKING:
@@ -167,44 +168,26 @@ class BaseInputTransport(FrameProcessor):
vad_state = new_vad_state
return vad_state
def _audio_in_thread_handler(self):
    """Read raw audio in 10ms chunks while running and queue it as frames.

    Any error raised while reading or queueing is logged and the loop goes on.
    """
    rate = self._params.audio_in_sample_rate
    channels = self._params.audio_in_channels
    chunk_frames = int(rate / 100)  # 10ms of audio
    while self._running:
        try:
            raw = self.read_raw_audio_frames(chunk_frames)
            # Nothing was read this round; try again.
            if len(raw) == 0:
                continue
            self._audio_in_queue.put(
                AudioRawFrame(audio=raw, sample_rate=rate, num_channels=channels))
        except BaseException as e:
            logger.error(f"Error reading audio frames: {e}")
def _audio_out_thread_handler(self):
def _audio_thread_handler(self):
vad_state: VADState = VADState.QUIET
while self._running:
try:
frame = self._audio_in_queue.get(timeout=1)
frame: AudioRawFrame = self._audio_in_queue.get(timeout=1)
audio_passthrough = True
# Check VAD and push event if necessary. We just care about changes
# from QUIET to SPEAKING and vice versa.
# Check VAD and push event if necessary. We just care about
# changes from QUIET to SPEAKING and vice versa.
if self._params.vad_enabled:
vad_state = self._handle_vad(frame.audio, vad_state)
audio_passthrough = self._params.vad_audio_passthrough
# Push audio downstream if passthrough.
# Push audio downstream if passthrough.
if audio_passthrough:
future = asyncio.run_coroutine_threadsafe(
self._internal_push_frame(frame), self.get_event_loop())
self._internal_push_frame(frame), self._loop)
future.result()
self._audio_in_queue.task_done()
except queue.Empty:
pass
except BaseException as e:
logger.error(f"Error pushing audio frames: {e}")
logger.error(f"Error reading audio frames: {e}")

View File

@@ -43,7 +43,7 @@ class BaseOutputTransport(FrameProcessor):
self._running = False
self._allow_interruptions = False
self._out_executor = ThreadPoolExecutor(max_workers=5)
self._executor = ThreadPoolExecutor(max_workers=5)
# These are the images that we should send to the camera at our desired
# framerate.
@@ -57,6 +57,10 @@ class BaseOutputTransport(FrameProcessor):
self._stopped_event = asyncio.Event()
self._is_interrupted = threading.Event()
# Create push frame task. This is the task that will push frames in
# order. We also guarantee that all frames are pushed in the same task.
self._create_push_task()
async def start(self, frame: StartFrame):
# Make sure we have the latest params. Note that this transport might
# have been started on another task that might not need interruptions,
@@ -70,15 +74,12 @@ class BaseOutputTransport(FrameProcessor):
loop = self.get_event_loop()
# Create queues and threads.
if self._params.camera_out_enabled:
self._camera_out_thread = loop.run_in_executor(
self._out_executor, self._camera_out_thread_handler)
self._executor, self._camera_out_thread_handler)
self._sink_thread = loop.run_in_executor(self._out_executor, self._sink_thread_handler)
# Create push frame task. This is the task that will push frames in
# order. We also guarantee that all frames are pushed in the same task.
self._create_push_task()
self._sink_thread = loop.run_in_executor(self._executor, self._sink_thread_handler)
async def stop(self):
if not self._running:
@@ -117,16 +118,16 @@ class BaseOutputTransport(FrameProcessor):
#
if isinstance(frame, StartFrame):
await self.start(frame)
self._sink_queue.put(frame)
self._sink_queue.put_nowait(frame)
# EndFrame is managed in the queue handler.
elif isinstance(frame, CancelFrame):
await self.push_frame(frame, direction)
await self.stop()
await self.push_frame(frame, direction)
elif isinstance(frame, StartInterruptionFrame) or isinstance(frame, StopInterruptionFrame):
await self._handle_interruptions(frame)
await self.push_frame(frame, direction)
await self._handle_interruptions(frame)
else:
self._sink_queue.put(frame)
self._sink_queue.put_nowait(frame)
# If we are finishing, wait here until we have stopped, otherwise we might
# close things too early upstream. We need this event because we don't
@@ -233,7 +234,7 @@ class BaseOutputTransport(FrameProcessor):
def _set_camera_image(self, image: ImageRawFrame):
if self._params.camera_out_is_live:
self._camera_out_queue.put(image)
self._camera_out_queue.put_nowait(image)
else:
self._camera_images = itertools.cycle([image])

View File

@@ -4,6 +4,9 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import inspect
from abc import ABC, abstractmethod
from pydantic import ConfigDict
@@ -12,6 +15,8 @@ from pydantic.main import BaseModel
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.vad.vad_analyzer import VADAnalyzer
from loguru import logger
class TransportParams(BaseModel):
model_config = ConfigDict(arbitrary_types_allowed=True)
@@ -36,6 +41,10 @@ class TransportParams(BaseModel):
class BaseTransport(ABC):
def __init__(self, loop: asyncio.AbstractEventLoop | None):
    """Remember the event loop to use and start with no event handlers.

    When `loop` is falsy the currently running loop is used, so in that case
    this must be called while an event loop is running.
    """
    if not loop:
        loop = asyncio.get_running_loop()
    self._loop = loop
    # Event name -> list of handlers.
    self._event_handlers: dict = {}
@abstractmethod
def input(self) -> FrameProcessor:
raise NotImplementedError
@@ -43,3 +52,30 @@ class BaseTransport(ABC):
@abstractmethod
def output(self) -> FrameProcessor:
raise NotImplementedError
def event_handler(self, event_name: str):
    """Return a decorator that adds the decorated function as a handler.

    The decorated function is passed to `_add_event_handler` for `event_name`
    and returned unchanged.
    """
    def register(handler):
        self._add_event_handler(event_name, handler)
        return handler

    return register
def _register_event_handler(self, event_name: str):
if event_name in self._event_handlers:
raise Exception(f"Event handler {event_name} already registered")
self._event_handlers[event_name] = []
def _add_event_handler(self, event_name: str, handler):
if event_name not in self._event_handlers:
raise Exception(f"Event handler {event_name} not registered")
self._event_handlers[event_name].append(handler)
async def _call_event_handler(self, event_name: str, *args, **kwargs):
try:
for handler in self._event_handlers[event_name]:
if inspect.iscoroutinefunction(handler):
await handler(self, *args, **kwargs)
else:
handler(self, *args, **kwargs)
except Exception as e:
logger.error(f"Exception in event handler {event_name}: {e}")
raise e

View File

@@ -6,7 +6,7 @@
import asyncio
from pipecat.frames.frames import StartFrame
from pipecat.frames.frames import AudioRawFrame, StartFrame
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.transports.base_input import BaseInputTransport
from pipecat.transports.base_output import BaseOutputTransport
@@ -28,16 +28,17 @@ class LocalAudioInputTransport(BaseInputTransport):
def __init__(self, py_audio: pyaudio.PyAudio, params: TransportParams):
super().__init__(params)
sample_rate = self._params.audio_in_sample_rate
num_frames = int(sample_rate / 100) * 2 # 20ms of audio
self._in_stream = py_audio.open(
format=py_audio.get_format_from_width(2),
channels=params.audio_in_channels,
rate=params.audio_in_sample_rate,
frames_per_buffer=params.audio_in_sample_rate,
frames_per_buffer=num_frames,
stream_callback=self._audio_in_callback,
input=True)
def read_raw_audio_frames(self, frame_count: int) -> bytes:
return self._in_stream.read(frame_count, exception_on_overflow=False)
async def start(self, frame: StartFrame):
await super().start(frame)
self._in_stream.start_stream()
@@ -54,6 +55,17 @@ class LocalAudioInputTransport(BaseInputTransport):
await super().cleanup()
def _audio_in_callback(self, in_data, frame_count, time_info, status):
    """Stream callback: wrap the captured audio in a frame and queue it.

    Returns `(None, pyaudio.paAbort)` once the transport is no longer running,
    otherwise `(None, pyaudio.paContinue)`.
    """
    if self._running:
        params = self._params
        self.push_audio_frame(
            AudioRawFrame(
                audio=in_data,
                sample_rate=params.audio_in_sample_rate,
                num_channels=params.audio_in_channels))
        return (None, pyaudio.paContinue)

    return (None, pyaudio.paAbort)
class LocalAudioOutputTransport(BaseOutputTransport):
@@ -69,21 +81,9 @@ class LocalAudioOutputTransport(BaseOutputTransport):
def write_raw_audio_frames(self, frames: bytes):
self._out_stream.write(frames)
async def start(self, frame: StartFrame):
await super().start(frame)
self._out_stream.start_stream()
async def stop(self):
await super().stop()
self._out_stream.stop_stream()
async def cleanup(self):
# This is not very pretty (taken from PyAudio docs).
while self._out_stream.is_active():
await asyncio.sleep(0.1)
self._out_stream.close()
await super().cleanup()
self._out_stream.close()
class LocalAudioTransport(BaseTransport):

View File

@@ -9,7 +9,7 @@ import asyncio
import numpy as np
import tkinter as tk
from pipecat.frames.frames import ImageRawFrame, StartFrame
from pipecat.frames.frames import AudioRawFrame, ImageRawFrame, StartFrame
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.transports.base_input import BaseInputTransport
from pipecat.transports.base_output import BaseOutputTransport
@@ -38,16 +38,17 @@ class TkInputTransport(BaseInputTransport):
def __init__(self, py_audio: pyaudio.PyAudio, params: TransportParams):
super().__init__(params)
sample_rate = self._params.audio_in_sample_rate
num_frames = int(sample_rate / 100) * 2 # 20ms of audio
self._in_stream = py_audio.open(
format=py_audio.get_format_from_width(2),
channels=params.audio_in_channels,
rate=params.audio_in_sample_rate,
frames_per_buffer=params.audio_in_sample_rate,
frames_per_buffer=num_frames,
stream_callback=self._audio_in_callback,
input=True)
def read_raw_audio_frames(self, frame_count: int) -> bytes:
return self._in_stream.read(frame_count, exception_on_overflow=False)
async def start(self, frame: StartFrame):
await super().start(frame)
self._in_stream.start_stream()
@@ -57,12 +58,22 @@ class TkInputTransport(BaseInputTransport):
self._in_stream.stop_stream()
async def cleanup(self):
await super().cleanup()
# This is not very pretty (taken from PyAudio docs).
while self._in_stream.is_active():
await asyncio.sleep(0.1)
self._in_stream.close()
await super().cleanup()
def _audio_in_callback(self, in_data, frame_count, time_info, status):
    """Stream callback: wrap the captured audio in a frame and queue it.

    Returns `(None, pyaudio.paAbort)` once the transport is no longer running,
    otherwise `(None, pyaudio.paContinue)`.
    """
    if self._running:
        params = self._params
        self.push_audio_frame(
            AudioRawFrame(
                audio=in_data,
                sample_rate=params.audio_in_sample_rate,
                num_channels=params.audio_in_channels))
        return (None, pyaudio.paContinue)

    return (None, pyaudio.paAbort)
class TkOutputTransport(BaseOutputTransport):
@@ -89,21 +100,9 @@ class TkOutputTransport(BaseOutputTransport):
def write_frame_to_camera(self, frame: ImageRawFrame):
self.get_event_loop().call_soon(self._write_frame_to_tk, frame)
async def start(self, frame: StartFrame):
await super().start(frame)
self._out_stream.start_stream()
async def stop(self):
await super().stop()
self._out_stream.stop_stream()
async def cleanup(self):
# This is not very pretty (taken from PyAudio docs).
while self._out_stream.is_active():
await asyncio.sleep(0.1)
self._out_stream.close()
await super().cleanup()
self._out_stream.close()
def _write_frame_to_tk(self, frame: ImageRawFrame):
width = frame.size[0]

View File

@@ -0,0 +1,206 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import io
import queue
import wave
import websockets
from typing import Awaitable, Callable
from pydantic.main import BaseModel
from pipecat.frames.frames import AudioRawFrame, StartFrame
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.serializers.base_serializer import FrameSerializer
from pipecat.serializers.protobuf import ProtobufFrameSerializer
from pipecat.transports.base_input import BaseInputTransport
from pipecat.transports.base_output import BaseOutputTransport
from pipecat.transports.base_transport import BaseTransport, TransportParams
from loguru import logger
class WebsocketServerParams(TransportParams):
    """Transport parameters specific to the websocket server transport."""

    # When True, each outgoing audio chunk is wrapped in a WAV container.
    add_wav_header: bool = False
    # Size in bytes of each outgoing audio chunk (noted as 200ms of audio).
    audio_frame_size: int = 6400
    # Serializer used to convert frames to and from websocket messages.
    serializer: FrameSerializer = ProtobufFrameSerializer()
class WebsocketServerCallbacks(BaseModel):
    """Async callbacks the input transport awaits on client connection changes.

    Each callback receives the websocket of the client concerned.
    """

    on_client_connected: Callable[[websockets.WebSocketServerProtocol], Awaitable[None]]
    on_client_disconnected: Callable[[websockets.WebSocketServerProtocol], Awaitable[None]]
class WebsocketServerInputTransport(BaseInputTransport):
    """Input side of the websocket server transport.

    Runs a websocket server, serves one client at a time, deserializes every
    incoming message into a frame and either queues it (audio) or pushes it
    into the pipeline (everything else).
    """

    def __init__(
            self,
            host: str,
            port: int,
            params: WebsocketServerParams,
            callbacks: WebsocketServerCallbacks):
        super().__init__(params)

        self._host = host
        self._port = port
        self._params = params
        self._callbacks = callbacks

        # The client currently being served, if any.
        self._websocket: websockets.WebSocketServerProtocol | None = None

        self._client_audio_queue = queue.Queue()
        # Set by stop() to make the server task leave its serve() context.
        self._stop_server_event = asyncio.Event()

    async def start(self, frame: StartFrame):
        """Launch the websocket server task, then start the base transport."""
        self._server_task = self.get_event_loop().create_task(self._server_task_handler())
        await super().start(frame)

    async def stop(self):
        """Signal the server task to finish, wait for it, then stop the base transport."""
        self._stop_server_event.set()
        await self._server_task
        await super().stop()

    def read_next_audio_frame(self) -> AudioRawFrame | None:
        """Return the next queued client audio frame, or None after waiting 1 second."""
        try:
            return self._client_audio_queue.get(timeout=1)
        except queue.Empty:
            return None

    async def _server_task_handler(self):
        """Serve websocket clients until the stop event is set."""
        logger.info(f"Starting websocket server on {self._host}:{self._port}")
        async with websockets.serve(self._client_handler, self._host, self._port):
            await self._stop_server_event.wait()

    async def _client_handler(self, websocket: websockets.WebSocketServerProtocol, path):
        """Handle one client connection from connect to disconnect.

        A new connection replaces (and closes) any previous one.
        """
        logger.info(f"New client connection from {websocket.remote_address}")
        if self._websocket:
            await self._websocket.close()
            logger.warning("Only one client connected, using new connection")

        self._websocket = websocket

        # Notify
        await self._callbacks.on_client_connected(websocket)

        # Handle incoming messages
        async for message in websocket:
            frame = self._params.serializer.deserialize(message)
            if isinstance(frame, AudioRawFrame) and self._params.audio_in_enabled:
                self._client_audio_queue.put_nowait(frame)
            else:
                await self._internal_push_frame(frame)

        # Notify disconnection
        await self._callbacks.on_client_disconnected(websocket)

        # Close this handler's own connection. `self._websocket` may already
        # point at a newer client (or be None) if another connection replaced
        # this one while it was being served, so it must not be closed or
        # cleared unless it still refers to this connection.
        await websocket.close()
        if self._websocket is websocket:
            self._websocket = None

        logger.info(f"Client {websocket.remote_address} disconnected")
class WebsocketServerOutputTransport(BaseOutputTransport):
    """Output side of the websocket server transport.

    Buffers outgoing raw audio, cuts it into chunks of
    `params.audio_frame_size` bytes and sends each chunk, serialized, to the
    connected client.
    """

    def __init__(self, params: WebsocketServerParams):
        super().__init__(params)

        self._params = params

        # The client audio is sent to, if any.
        self._websocket: websockets.WebSocketServerProtocol | None = None

        # Audio bytes received but not yet sent (less than one full chunk
        # once write_raw_audio_frames() returns).
        self._audio_buffer = bytes()

    async def set_client_connection(self, websocket: websockets.WebSocketServerProtocol | None):
        """Switch to `websocket` (or to no client), closing any previous connection."""
        if self._websocket:
            await self._websocket.close()
            logger.warning("Only one client allowed, using new connection")
        self._websocket = websocket

    def write_raw_audio_frames(self, frames: bytes):
        """Buffer `frames` and send every complete chunk to the client.

        Audio is dropped while no client is connected. Each send is scheduled
        on the transport's event loop and this call blocks until it completes.
        """
        # Take a local reference: the connection can be replaced or cleared
        # from the event loop while this method is running.
        websocket = self._websocket
        if not websocket:
            # Nobody to send to; without this guard the send below would fail
            # on None.
            return

        self._audio_buffer += frames
        frame_size = self._params.audio_frame_size
        while len(self._audio_buffer) >= frame_size:
            frame = AudioRawFrame(
                audio=self._audio_buffer[:frame_size],
                sample_rate=self._params.audio_out_sample_rate,
                num_channels=self._params.audio_out_channels
            )

            if self._params.add_wav_header:
                content = io.BytesIO()
                # The context manager closes the writer, which finalizes the
                # WAV header, even if writing fails.
                with wave.open(content, "wb") as ww:
                    ww.setsampwidth(2)
                    ww.setnchannels(frame.num_channels)
                    ww.setframerate(frame.sample_rate)
                    ww.writeframes(frame.audio)
                frame = AudioRawFrame(
                    content.getvalue(),
                    sample_rate=frame.sample_rate,
                    num_channels=frame.num_channels)

            proto = self._params.serializer.serialize(frame)

            future = asyncio.run_coroutine_threadsafe(
                websocket.send(proto), self.get_event_loop())
            future.result()

            # Only drop the chunk from the buffer once it has been sent.
            self._audio_buffer = self._audio_buffer[frame_size:]
class WebsocketServerTransport(BaseTransport):
    """Transport that exchanges frames with a single websocket client.

    Supports the "on_client_connected" and "on_client_disconnected" events.
    """

    def __init__(
            self,
            host: str = "localhost",
            port: int = 8765,
            params: WebsocketServerParams = WebsocketServerParams(),
            loop: asyncio.AbstractEventLoop | None = None):
        super().__init__(loop)

        self._host = host
        self._port = port
        self._params = params

        # Internal callbacks handed to the input transport; they forward
        # connection changes to the output transport and the user's handlers.
        self._callbacks = WebsocketServerCallbacks(
            on_client_connected=self._on_client_connected,
            on_client_disconnected=self._on_client_disconnected
        )

        # Both processors are created lazily by input() / output().
        self._input: WebsocketServerInputTransport | None = None
        self._output: WebsocketServerOutputTransport | None = None
        self._websocket: websockets.WebSocketServerProtocol | None = None

        # Register supported handlers. The user will only be able to register
        # these handlers.
        for event_name in ("on_client_connected", "on_client_disconnected"):
            self._register_event_handler(event_name)

    def input(self) -> FrameProcessor:
        """Return the input processor, creating it on first use."""
        if self._input:
            return self._input
        self._input = WebsocketServerInputTransport(
            self._host, self._port, self._params, self._callbacks)
        return self._input

    def output(self) -> FrameProcessor:
        """Return the output processor, creating it on first use."""
        if self._output:
            return self._output
        self._output = WebsocketServerOutputTransport(self._params)
        return self._output

    async def _on_client_connected(self, websocket):
        """Give the new client to the output transport, then notify handlers."""
        if not self._output:
            logger.error("A WebsocketServerTransport output is missing in the pipeline")
            return
        await self._output.set_client_connection(websocket)
        await self._call_event_handler("on_client_connected", websocket)

    async def _on_client_disconnected(self, websocket):
        """Clear the output transport's client, then notify handlers."""
        if not self._output:
            logger.error("A WebsocketServerTransport output is missing in the pipeline")
            return
        await self._output.set_client_connection(None)
        await self._call_event_handler("on_client_disconnected", websocket)

View File

@@ -6,15 +6,12 @@
import aiohttp
import asyncio
from concurrent.futures import ThreadPoolExecutor
import inspect
import queue
import time
import types
from dataclasses import dataclass
from functools import partial
from typing import Any, Callable, Mapping
from concurrent.futures import ThreadPoolExecutor
from daily import (
CallClient,
@@ -40,7 +37,7 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.transports.base_input import BaseInputTransport
from pipecat.transports.base_output import BaseOutputTransport
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.vad.vad_analyzer import VADAnalyzer, VADParams, VADState
from pipecat.vad.vad_analyzer import VADAnalyzer, VADParams
from loguru import logger
@@ -102,7 +99,7 @@ class DailyTranscriptionSettings(BaseModel):
class DailyParams(TransportParams):
api_url: str = "https://api.daily.co"
api_url: str = "https://api.daily.co/v1"
api_key: str = ""
dialin_settings: DailyDialinSettings | None = None
transcription_enabled: bool = False
@@ -139,7 +136,8 @@ class DailyTransportClient(EventHandler):
token: str | None,
bot_name: str,
params: DailyParams,
callbacks: DailyCallbacks):
callbacks: DailyCallbacks,
loop: asyncio.AbstractEventLoop):
super().__init__()
if not self._daily_initialized:
@@ -151,6 +149,7 @@ class DailyTransportClient(EventHandler):
self._bot_name: str = bot_name
self._params: DailyParams = params
self._callbacks = callbacks
self._loop = loop
self._participant_id: str = ""
self._video_renderers = {}
@@ -189,15 +188,22 @@ class DailyTransportClient(EventHandler):
def send_message(self, frame: DailyTransportMessageFrame):
self._client.send_app_message(frame.message, frame.participant_id)
def read_raw_audio_frames(self, frame_count: int) -> bytes:
def read_next_audio_frame(self) -> AudioRawFrame | None:
sample_rate = self._params.audio_in_sample_rate
num_channels = self._params.audio_in_channels
if self._other_participant_has_joined:
return self._speaker.read_frames(frame_count)
num_frames = int(sample_rate / 100) * 2 # 20ms of audio
audio = self._speaker.read_frames(num_frames)
return AudioRawFrame(audio=audio, sample_rate=sample_rate, num_channels=num_channels)
else:
# If no one has ever joined the meeting `read_frames()` would block,
# instead we just wait a bit. daily-python should probably return
# silence instead.
time.sleep(0.01)
return b''
return None
def write_raw_audio_frames(self, frames: bytes):
self._mic.write_frames(frames)
@@ -212,8 +218,7 @@ class DailyTransportClient(EventHandler):
self._joining = True
loop = asyncio.get_running_loop()
await loop.run_in_executor(self._executor, self._join)
await self._loop.run_in_executor(self._executor, self._join)
def _join(self):
logger.info(f"Joining {self._room_url}")
@@ -304,8 +309,7 @@ class DailyTransportClient(EventHandler):
self._joined = False
self._leaving = True
loop = asyncio.get_running_loop()
await loop.run_in_executor(self._executor, self._leave)
await self._loop.run_in_executor(self._executor, self._leave)
def _leave(self):
logger.info(f"Leaving {self._room_url}")
@@ -335,8 +339,7 @@ class DailyTransportClient(EventHandler):
self._callbacks.on_error(error_msg)
async def cleanup(self):
loop = asyncio.get_running_loop()
await loop.run_in_executor(self._executor, self._cleanup)
await self._loop.run_in_executor(self._executor, self._cleanup)
def _cleanup(self):
if self._client:
@@ -469,9 +472,8 @@ class DailyInputTransport(BaseInputTransport):
self._client = client
self._video_renderers = {}
self._camera_in_queue = queue.Queue()
self._vad_analyzer = params.vad_analyzer
self._vad_analyzer: VADAnalyzer | None = params.vad_analyzer
if params.vad_enabled and not params.vad_analyzer:
self._vad_analyzer = WebRTCVADAnalyzer(
sample_rate=self._params.audio_in_sample_rate,
@@ -480,37 +482,33 @@ class DailyInputTransport(BaseInputTransport):
async def start(self, frame: StartFrame):
if self._running:
return
# Parent start.
await super().start(frame)
# Join the room.
await self._client.join()
# This will set _running=True
await super().start(frame)
# Create camera in thread (runs if _running is true).
loop = asyncio.get_running_loop()
self._camera_in_thread = loop.run_in_executor(
self._in_executor, self._camera_in_thread_handler)
# Create audio task. It reads audio frames from Daily and push them
# internally for VAD processing.
if self._params.audio_in_enabled or self._params.vad_enabled:
self._audio_in_thread = self._loop.run_in_executor(
self._executor, self._audio_in_thread_handler)
async def stop(self):
if not self._running:
return
# Parent stop. This will set _running to False.
await super().stop()
# Leave the room.
await self._client.leave()
# This will set _running=False
await super().stop()
# The thread will stop.
await self._camera_in_thread
# Stop audio thread.
if self._params.audio_in_enabled or self._params.vad_enabled:
await self._audio_in_thread
async def cleanup(self):
await super().cleanup()
await self._client.cleanup()
def vad_analyze(self, audio_frames: bytes) -> VADState:
state = VADState.QUIET
if self._vad_analyzer:
state = self._vad_analyzer.analyze_audio(audio_frames)
return state
def read_raw_audio_frames(self, frame_count: int) -> bytes:
return self._client.read_raw_audio_frames(frame_count)
def vad_analyzer(self) -> VADAnalyzer | None:
return self._vad_analyzer
#
# FrameProcessor
@@ -537,6 +535,16 @@ class DailyInputTransport(BaseInputTransport):
self._internal_push_frame(frame), self.get_event_loop())
future.result()
#
# Audio in
#
def _audio_in_thread_handler(self):
while self._running:
frame = self._client.read_next_audio_frame()
if frame:
self.push_audio_frame(frame)
#
# Camera in
#
@@ -585,23 +593,12 @@ class DailyInputTransport(BaseInputTransport):
image=buffer,
size=size,
format=format)
self._camera_in_queue.put(frame)
future = asyncio.run_coroutine_threadsafe(
self._internal_push_frame(frame), self.get_event_loop())
future.result()
self._video_renderers[participant_id]["timestamp"] = curr_time
def _camera_in_thread_handler(self):
while self._running:
try:
frame = self._camera_in_queue.get(timeout=1)
future = asyncio.run_coroutine_threadsafe(
self._internal_push_frame(frame), self.get_event_loop())
future.result()
self._camera_in_queue.task_done()
except queue.Empty:
pass
except BaseException as e:
logger.error(f"Error capturing video: {e}")
class DailyOutputTransport(BaseOutputTransport):
@@ -613,7 +610,7 @@ class DailyOutputTransport(BaseOutputTransport):
async def start(self, frame: StartFrame):
if self._running:
return
# This will set _running=True
# Parent start.
await super().start(frame)
# Join the room.
await self._client.join()
@@ -621,7 +618,7 @@ class DailyOutputTransport(BaseOutputTransport):
async def stop(self):
if not self._running:
return
# This will set _running=False
# Parent stop. This will set _running to False.
await super().stop()
# Leave the room.
await self._client.leave()
@@ -642,7 +639,15 @@ class DailyOutputTransport(BaseOutputTransport):
class DailyTransport(BaseTransport):
def __init__(self, room_url: str, token: str | None, bot_name: str, params: DailyParams):
def __init__(
self,
room_url: str,
token: str | None,
bot_name: str,
params: DailyParams,
loop: asyncio.AbstractEventLoop | None = None):
super().__init__(loop)
callbacks = DailyCallbacks(
on_joined=self._on_joined,
on_left=self._on_left,
@@ -660,12 +665,10 @@ class DailyTransport(BaseTransport):
)
self._params = params
self._client = DailyTransportClient(room_url, token, bot_name, params, callbacks)
self._client = DailyTransportClient(
room_url, token, bot_name, params, callbacks, self._loop)
self._input: DailyInputTransport | None = None
self._output: DailyOutputTransport | None = None
self._loop = asyncio.get_running_loop()
self._event_handlers: dict = {}
# Register supported handlers. The user will only be able to register
# these handlers.
@@ -741,10 +744,10 @@ class DailyTransport(BaseTransport):
participant_id, framerate, video_source, color_format)
def _on_joined(self, participant):
self.on_joined(participant)
self._call_async_event_handler("on_joined", participant)
def _on_left(self):
self.on_left()
self._call_async_event_handler("on_left")
def _on_error(self, error):
# TODO(aleix): Report error to input/output transports. The one managing
@@ -754,10 +757,10 @@ class DailyTransport(BaseTransport):
def _on_app_message(self, message: Any, sender: str):
if self._input:
self._input.push_app_message(message, sender)
self.on_app_message(message, sender)
self._call_async_event_handler("on_app_message", message, sender)
def _on_call_state_updated(self, state: str):
self.on_call_state_updated(state)
self._call_async_event_handler("on_call_state_updated", state)
async def _handle_dialin_ready(self, sip_endpoint: str):
if not self._params.dialin_settings:
@@ -766,7 +769,7 @@ class DailyTransport(BaseTransport):
async with aiohttp.ClientSession() as session:
headers = {
"Authorization": f"Bearer {self._params.api_key}",
"Content-Type": "application/x-www-form-urlencoded"
"Content-Type": "application/json"
}
data = {
"callId": self._params.dialin_settings.call_id,
@@ -777,7 +780,7 @@ class DailyTransport(BaseTransport):
url = f"{self._params.api_url}/dialin/pinlessCallUpdate"
try:
async with session.post(url, headers=headers, data=data, timeout=10) as r:
async with session.post(url, headers=headers, json=data, timeout=10) as r:
if r.status != 200:
text = await r.text()
logger.error(
@@ -793,28 +796,28 @@ class DailyTransport(BaseTransport):
def _on_dialin_ready(self, sip_endpoint):
if self._params.dialin_settings:
asyncio.run_coroutine_threadsafe(self._handle_dialin_ready(sip_endpoint), self._loop)
self.on_dialin_ready(sip_endpoint)
self._call_async_event_handler("on_dialin_ready", sip_endpoint)
def _on_dialout_connected(self, data):
self.on_dialout_connected(data)
self._call_async_event_handler("on_dialout_connected", data)
def _on_dialout_stopped(self, data):
self.on_dialout_stopped(data)
self._call_async_event_handler("on_dialout_stopped", data)
def _on_dialout_error(self, data):
self.on_dialout_error(data)
self._call_async_event_handler("on_dialout_error", data)
def _on_dialout_warning(self, data):
self.on_dialout_warning(data)
self._call_async_event_handler("on_dialout_warning", data)
def _on_participant_joined(self, participant):
self.on_participant_joined(participant)
self._call_async_event_handler("on_participant_joined", participant)
def _on_participant_left(self, participant, reason):
self.on_participant_left(participant, reason)
self._call_async_event_handler("on_participant_left", participant, reason)
def _on_first_participant_joined(self, participant):
self.on_first_participant_joined(participant)
self._call_async_event_handler("on_first_participant_joined", participant)
def _on_transcription_message(self, participant_id, message):
text = message["text"]
@@ -829,84 +832,7 @@ class DailyTransport(BaseTransport):
if self._input:
self._input.push_transcription_frame(frame)
#
# Decorators (event handlers)
#
def on_joined(self, participant):
pass
def on_left(self):
pass
def on_app_message(self, message, sender):
pass
def on_call_state_updated(self, state):
pass
def on_dialin_ready(self, sip_endpoint):
pass
def on_dialout_connected(self, data):
pass
def on_dialout_stopped(self, data):
pass
def on_dialout_error(self, data):
pass
def on_dialout_warning(self, data):
pass
def on_first_participant_joined(self, participant):
pass
def on_participant_joined(self, participant):
pass
def on_participant_left(self, participant, reason):
pass
def event_handler(self, event_name: str):
def decorator(handler):
self._add_event_handler(event_name, handler)
return handler
return decorator
def _register_event_handler(self, event_name: str):
methods = inspect.getmembers(self, predicate=inspect.ismethod)
if event_name not in [method[0] for method in methods]:
raise Exception(f"Event handler {event_name} not found")
self._event_handlers[event_name] = [getattr(self, event_name)]
patch_method = types.MethodType(partial(self._patch_method, event_name), self)
setattr(self, event_name, patch_method)
def _add_event_handler(self, event_name: str, handler):
if event_name not in self._event_handlers:
raise Exception(f"Event handler {event_name} not registered")
self._event_handlers[event_name].append(types.MethodType(handler, self))
def _patch_method(self, event_name, *args, **kwargs):
try:
for handler in self._event_handlers[event_name]:
if inspect.iscoroutinefunction(handler):
# Beware, if handler() calls another event handler it
# will deadlock. You shouldn't do that anyways.
future = asyncio.run_coroutine_threadsafe(
handler(*args[1:], **kwargs), self._loop)
# wait for the coroutine to finish. This will also
# raise any exceptions raised by the coroutine.
future.result()
else:
handler(*args[1:], **kwargs)
except Exception as e:
logger.error(f"Exception in event handler {event_name}: {e}")
raise e
# def start_recording(self):
# self.client.start_recording()
def _call_async_event_handler(self, event_name: str, *args, **kwargs):
future = asyncio.run_coroutine_threadsafe(
self._call_event_handler(event_name, *args, **kwargs), self._loop)
future.result()

View File

@@ -0,0 +1,137 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""
Daily REST Helpers
Methods that wrap the Daily API to create rooms, check room URLs, and get meeting tokens.
"""
from time import time
from typing import Literal, Optional
from urllib.parse import urlparse

import requests
from pydantic import BaseModel, Field, ValidationError
class DailyRoomSipParams(BaseModel):
    """SIP settings nested under the ``sip`` field of ``DailyRoomProperties``.

    NOTE(review): the field names presumably mirror the ``sip`` room property
    of the Daily REST API -- confirm against the Daily documentation.
    """

    # Name shown for the SIP participant.
    display_name: str = "sw-sip-dialin"
    # Video is disabled by default.
    video: bool = False
    # Defaults to dial-in mode.
    sip_mode: str = "dial-in"
    # Defaults to a single SIP endpoint.
    num_endpoints: int = 1
class DailyRoomProperties(BaseModel):
    """Properties of a Daily room.

    Used both as the ``properties`` payload when creating a room and as the
    ``config`` section parsed from a room response.
    """

    # Expiry timestamp in seconds since the epoch. A default factory is used
    # so the default is five minutes after the *instance* is created; a plain
    # `time() + 5 * 60` default would be evaluated only once, at import time,
    # and go stale in any long-running process.
    exp: float = Field(default_factory=lambda: time() + 5 * 60)
    enable_chat: bool = False
    enable_emoji_reactions: bool = False
    eject_at_room_exp: bool = True
    enable_dialout: Optional[bool] = None
    sip: Optional[DailyRoomSipParams] = None
    sip_uri: Optional[dict] = None

    @property
    def sip_endpoint(self) -> str:
        """Return ``"sip:<endpoint>"`` built from ``sip_uri``.

        Returns an empty string when ``sip_uri`` is unset or empty.

        Raises:
            KeyError: if ``sip_uri`` is set but has no ``"endpoint"`` key.
        """
        if not self.sip_uri:
            return ""
        return f"sip:{self.sip_uri['endpoint']}"


class DailyRoomParams(BaseModel):
    """Parameters for creating a Daily room."""

    # Room name; omitted from the request when None (see
    # `model_dump(exclude_none=True)` in `DailyRESTHelper.create_room`).
    name: Optional[str] = None
    privacy: Literal['private', 'public'] = "public"
    # Built per instance so the default properties (notably `exp`) are
    # computed when the params are created rather than once at import time.
    properties: DailyRoomProperties = Field(default_factory=DailyRoomProperties)
class DailyRoomObject(BaseModel):
    """A room as parsed from a Daily REST API room response.

    Instances are built from the decoded JSON body of the create-room and
    get-room requests, so all fields below without a default are required
    to be present in that body.
    """

    # Identification.
    id: str
    name: str

    # Room metadata.
    api_created: bool
    privacy: str
    url: str
    created_at: str

    # Room configuration, validated as `DailyRoomProperties`.
    config: DailyRoomProperties
class DailyRESTHelper:
    """Synchronous helper around the Daily REST API.

    Wraps the endpoints needed to create rooms, look rooms up from their URL
    and create meeting tokens. All requests are authenticated with a bearer
    token built from the API key.

    Args:
        daily_api_key: Daily API key sent as the bearer token.
        daily_api_url: Base URL of the Daily REST API.
        request_timeout: Seconds to wait for each HTTP request before giving
            up. ``None`` disables the timeout (the previous behavior, which
            can block forever on an unresponsive server).
    """

    def __init__(
            self,
            daily_api_key: str,
            daily_api_url: str = "https://api.daily.co/v1",
            request_timeout: Optional[float] = 10.0):
        self.daily_api_key = daily_api_key
        self.daily_api_url = daily_api_url
        self.request_timeout = request_timeout

    def _auth_headers(self) -> dict:
        """Return the authorization headers shared by every request."""
        return {"Authorization": f"Bearer {self.daily_api_key}"}

    def _get_name_from_url(self, room_url: str) -> str:
        """Return the path of `room_url` without its leading slash."""
        return urlparse(room_url).path[1:]

    def _room_from_response(self, res: "requests.Response") -> "DailyRoomObject":
        """Decode a room response body and validate it as a `DailyRoomObject`."""
        data = res.json()
        try:
            return DailyRoomObject(**data)
        except ValidationError as e:
            # Keep the validation error as the cause for easier debugging.
            raise Exception(f"Invalid response: {e}") from e

    def create_room(self, params: DailyRoomParams) -> DailyRoomObject:
        """Create a room with the given parameters.

        Fields of `params` that are None are left out of the request body.

        Raises:
            Exception: if the API does not answer with status 200 or the
                response body does not validate as a room.
        """
        res = requests.post(
            f"{self.daily_api_url}/rooms",
            headers=self._auth_headers(),
            json=params.model_dump(exclude_none=True),
            timeout=self.request_timeout,
        )

        if res.status_code != 200:
            raise Exception(f"Unable to create room: {res.text}")

        return self._room_from_response(res)

    def _get_room_from_name(self, room_name: str) -> DailyRoomObject:
        """Fetch the room called `room_name`.

        Raises:
            Exception: if the API does not answer with status 200 or the
                response body does not validate as a room.
        """
        res: requests.Response = requests.get(
            f"{self.daily_api_url}/rooms/{room_name}",
            headers=self._auth_headers(),
            timeout=self.request_timeout,
        )

        if res.status_code != 200:
            raise Exception(f"Room not found: {room_name}")

        return self._room_from_response(res)

    def get_room_from_url(self, room_url: str,) -> DailyRoomObject:
        """Fetch the room whose name is the path component of `room_url`."""
        room_name = self._get_name_from_url(room_url)
        return self._get_room_from_name(room_name)

    def get_token(self, room_url: str, expiry_time: float = 60 * 60, owner: bool = True) -> str:
        """Create a meeting token for the room at `room_url`.

        Args:
            room_url: URL of the room the token is for. Must not be empty.
            expiry_time: Token lifetime in seconds, counted from now.
            owner: Value sent as the token's ``is_owner`` property.

        Returns:
            The ``token`` field of the API response.

        Raises:
            Exception: if `room_url` is empty or the API does not answer with
                status 200.
        """
        if not room_url:
            raise Exception(
                "No Daily room specified. You must specify a Daily room in order a token to be generated.")

        expiration: float = time() + expiry_time

        room_name = self._get_name_from_url(room_url)

        res: requests.Response = requests.post(
            f"{self.daily_api_url}/meeting-tokens",
            headers=self._auth_headers(),
            json={
                "properties": {
                    "room_name": room_name,
                    "is_owner": owner,
                    "exp": expiration
                }},
            timeout=self.request_timeout,
        )

        if res.status_code != 200:
            raise Exception(
                f"Failed to create meeting token: {res.status_code} {res.text}")

        token: str = res.json()["token"]

        return token

92
tests/test_langchain.py Normal file
View File

@@ -0,0 +1,92 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import unittest
from pipecat.frames.frames import (LLMFullResponseEndFrame,
LLMFullResponseStartFrame, StopTaskFrame,
TextFrame, TranscriptionFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.processors.frameworks.langchain import LangchainProcessor
from langchain.prompts import ChatPromptTemplate
from langchain_core.language_models import FakeStreamingListLLM
class TestLangchain(unittest.IsolatedAsyncioTestCase):
    """Checks that LangchainProcessor streams a fake LLM response through a pipeline."""

    class MockProcessor(FrameProcessor):
        """Pass-through processor that records the text of an LLM response.

        Text frames are recorded only between a full-response start frame
        and the matching end frame; every frame is pushed on unchanged.
        """

        def __init__(self, name):
            # NOTE(review): FrameProcessor.__init__() is not called here --
            # confirm this is intentional.
            self.name = name
            self.token: list[str] = []
            # Start collecting tokens when we see the start frame
            self.start_collecting = False

        def __str__(self):
            return self.name

        async def process_frame(self, frame, direction):
            if isinstance(frame, LLMFullResponseStartFrame):
                self.start_collecting = True
            elif isinstance(frame, TextFrame) and self.start_collecting:
                self.token.append(frame.text)
            elif isinstance(frame, LLMFullResponseEndFrame):
                self.start_collecting = False

            # Always forward the frame so downstream processors still see it.
            await self.push_frame(frame, direction)

    def setUp(self):
        self.expected_response = "Hello dear human"
        self.fake_llm = FakeStreamingListLLM(responses=[self.expected_response])
        self.mock_proc = self.MockProcessor("token_collector")

    async def test_langchain(self):
        # The same `messages` list is shared by the prompt and both aggregators.
        messages = [("system", "Say hello to {name}"), ("human", "{input}")]
        prompt = ChatPromptTemplate.from_messages(messages).partial(name="Thomas")
        chain = prompt | self.fake_llm
        proc = LangchainProcessor(chain=chain)

        tma_in = LLMUserResponseAggregator(messages)
        tma_out = LLMAssistantResponseAggregator(messages)

        processors = [tma_in, proc, self.mock_proc, tma_out]
        pipeline = Pipeline(processors)

        task = PipelineTask(pipeline, PipelineParams(allow_interruptions=False))

        # One simulated user turn, followed by the frame that stops the task.
        user_turn = [
            UserStartedSpeakingFrame(),
            TranscriptionFrame(text="Hi World", user_id="user", timestamp="now"),
            UserStoppedSpeakingFrame(),
            StopTaskFrame(),
        ]
        await task.queue_frames(user_turn)

        runner = PipelineRunner()
        await runner.run(task)

        collected = "".join(self.mock_proc.token)
        self.assertEqual(collected, self.expected_response)
        # TODO: Address this issue
        # This next one would fail with:
        # AssertionError: ' H e l l o d e a r h u m a n' != 'Hello dear human'
        # self.assertEqual(tma_out.messages[-1]["content"], self.expected_response)


if __name__ == "__main__":
    unittest.main()

39
tests/test_openai_tts.py Normal file
View File

@@ -0,0 +1,39 @@
import asyncio
import unittest
import openai
import pyaudio
from dotenv import load_dotenv
from pipecat.frames.frames import AudioRawFrame, ErrorFrame
from pipecat.services.openai import OpenAITTSService
load_dotenv()
class TestWhisperOpenAIService(unittest.IsolatedAsyncioTestCase):
    """Exercises OpenAITTSService against the real API and plays the audio.

    Requires OpenAI credentials (loaded through dotenv at module import) and
    a working audio output device.
    """

    async def test_whisper_tts(self):
        """Synthesize a sentence, play it, then check an invalid voice is rejected."""
        pa = pyaudio.PyAudio()
        try:
            stream = pa.open(format=pyaudio.paInt16,
                             channels=1,
                             rate=24_000,
                             output=True)
            try:
                tts = OpenAITTSService(voice="nova")

                async for frame in tts.run_tts("Hello, there. Nice to meet you, seems to work well"):
                    self.assertIsInstance(frame, AudioRawFrame)
                    stream.write(frame.audio)

                # Presumably gives the last buffered audio time to play out
                # before the stream is stopped.
                await asyncio.sleep(.5)
                stream.stop_stream()
            finally:
                # Release the stream even when an assertion above fails.
                stream.close()
        finally:
            # Release PortAudio resources no matter how the playback part ended.
            pa.terminate()

        tts = OpenAITTSService(voice="invalid_voice")
        with self.assertRaises(openai.BadRequestError):
            async for frame in tts.run_tts("wont work"):
                self.assertIsInstance(frame, ErrorFrame)


if __name__ == "__main__":
    unittest.main()