Compare commits

...

9 Commits

Author SHA1 Message Date
Kwindla Hultman Kramer
9cd7c82e77 testing pushing a frame from function call start hook 2024-09-30 14:52:18 -07:00
Kwindla Hultman Kramer
43161c816e get rid of some debug log lines used during development 2024-09-30 14:48:44 -07:00
Kwindla Hultman Kramer
6644c06af1 throw error if the llm tries to call a function that's not registered 2024-09-30 14:48:44 -07:00
Kwindla Hultman Kramer
ed47212e07 handle openai multiple function calls 2024-09-30 14:48:40 -07:00
JeevanReddy
db9cb74364 openai can give multiple tool calls, current implementation assumes only one function call at a time. Fixed this to handle multiple function calls. 2024-09-30 14:47:31 -07:00
Aleix Conchillo Flaqué
f64902eb25 pipeline(task): since everything is async tasks should wait for EndFrame 2024-09-30 14:08:11 -07:00
Aleix Conchillo Flaqué
e115a274d6 tests: fix langchanin tests 2024-09-30 14:08:11 -07:00
Aleix Conchillo Flaqué
00239c2fd4 syncparallelpipeline: fix now that all frames are asynchronous 2024-09-30 14:08:11 -07:00
Aleix Conchillo Flaqué
c0f9ad19fe all frame processors are asynchrnous
In this commit we make all frame processors asynchronous, that is, they have an
internal queue and they push frames using a task from that queue.
2024-09-30 13:17:50 -07:00
22 changed files with 268 additions and 193 deletions

View File

@@ -48,15 +48,10 @@ async def on_connected(processor):
frames. To achieve that, each frame processor should only output frames from a frames. To achieve that, each frame processor should only output frames from a
single task. single task.
In this version we introduce synchronous and asynchronous frame In this version all the frame processors have their own task to push
processors. The synchronous processors push output frames from the same task frames. That is, when `push_frame()` is called the given frame will be put
that they receive input frames, and therefore only pushing frames from one into an internal queue (with the exception of system frames) and a frame
task. Asynchronous frame processors can have internal tasks to perform things processor task will push it out.
asynchronously (e.g. receiving data from a websocket) but they also have a
single task where they push frames from.
By default, frame processors are synchronous. To change a frame processor to
asynchronous you only need to pass `sync=False` to the base class constructor.
- Added pipeline clocks. A pipeline clock is used by the output transport to - Added pipeline clocks. A pipeline clock is used by the output transport to
know when a frame needs to be presented. For that, all frames now have an know when a frame needs to be presented. For that, all frames now have an
@@ -68,9 +63,7 @@ async def on_connected(processor):
`SystemClock`). This clock will be passed to each frame processor via the `SystemClock`). This clock will be passed to each frame processor via the
`StartFrame`. `StartFrame`.
- Added `CartesiaHttpTTSService`. This is a synchronous frame processor - Added `CartesiaHttpTTSService`.
(i.e. given an input text frame it will wait for the whole output before
returning).
- `DailyTransport` now supports setting the audio bitrate to improve audio - `DailyTransport` now supports setting the audio bitrate to improve audio
quality through the `DailyParams.audio_out_bitrate` parameter. The new quality through the `DailyParams.audio_out_bitrate` parameter. The new
@@ -110,8 +103,9 @@ async def on_connected(processor):
pipelines to be executed concurrently. The difference between a pipelines to be executed concurrently. The difference between a
`SyncParallelPipeline` and a `ParallelPipeline` is that, given an input frame, `SyncParallelPipeline` and a `ParallelPipeline` is that, given an input frame,
the `SyncParallelPipeline` will wait for all the internal pipelines to the `SyncParallelPipeline` will wait for all the internal pipelines to
complete. This is achieved by ensuring all the processors in each of the complete. This is achieved by making sure the last processor in each of the
internal pipelines are synchronous. pipelines is synchronous (e.g. an HTTP-based service that waits for the
response).
- `StartFrame` is back a system frame so we make sure it's processed immediately - `StartFrame` is back a system frame so we make sure it's processed immediately
by all processors. `EndFrame` stays a control frame since it needs to be by all processors. `EndFrame` stays a control frame since it needs to be

View File

@@ -86,13 +86,13 @@ async def main():
), ),
) )
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
tts = CartesiaHttpTTSService( tts = CartesiaHttpTTSService(
api_key=os.getenv("CARTESIA_API_KEY"), api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
) )
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
imagegen = FalImageGenService( imagegen = FalImageGenService(
params=FalImageGenService.InputParams(image_size="square_hd"), params=FalImageGenService.InputParams(image_size="square_hd"),
aiohttp_session=session, aiohttp_session=session,
@@ -107,8 +107,10 @@ async def main():
# that, each pipeline runs concurrently and `SyncParallelPipeline` will # that, each pipeline runs concurrently and `SyncParallelPipeline` will
# wait for the input frame to be processed. # wait for the input frame to be processed.
# #
# Note that `SyncParallelPipeline` requires all processors in it to be # Note that `SyncParallelPipeline` requires the last processor in each
# synchronous (which is the default for most processors). # of the pipelines to be synchronous. In this case, we use
# `CartesiaHttpTTSService` and `FalImageGenService` which make HTTP
# requests and wait for the response.
pipeline = Pipeline( pipeline = Pipeline(
[ [
llm, # LLM llm, # LLM

View File

@@ -121,8 +121,10 @@ async def main():
# `SyncParallelPipeline` will wait for the input frame to be # `SyncParallelPipeline` will wait for the input frame to be
# processed. # processed.
# #
# Note that `SyncParallelPipeline` requires all processors in it to # Note that `SyncParallelPipeline` requires the last processor in
# be synchronous (which is the default for most processors). # each of the pipelines to be synchronous. In this case, we use
# `CartesiaHttpTTSService` and `FalImageGenService` which make HTTP
# requests and wait for the response.
pipeline = Pipeline( pipeline = Pipeline(
[ [
llm, # LLM llm, # LLM

View File

@@ -5,10 +5,15 @@
# #
import asyncio import asyncio
import aiohttp
import os import os
import sys import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from openai.types.chat import ChatCompletionToolParam
from runner import configure
from pipecat.frames.frames import TextFrame from pipecat.frames.frames import TextFrame
from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.runner import PipelineRunner
@@ -19,14 +24,6 @@ from pipecat.services.openai import OpenAILLMContext, OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer from pipecat.vad.silero import SileroVADAnalyzer
from openai.types.chat import ChatCompletionToolParam
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True) load_dotenv(override=True)
logger.remove(0) logger.remove(0)
@@ -34,7 +31,12 @@ logger.add(sys.stderr, level="DEBUG")
async def start_fetch_weather(function_name, llm, context): async def start_fetch_weather(function_name, llm, context):
await llm.push_frame(TextFrame("Let me check on that.")) # note: we can't push a frame to the LLM here. the bot
# can interrupt itself and/or cause audio overlapping glitches.
# possible question for Aleix and Chad about what the right way
# to trigger speech is, now, with the new queues/async/sync refactors.
await llm.push_frame(TextFrame("Let me check on that. "))
logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")
async def fetch_weather_from_api(function_name, tool_call_id, args, llm, context, result_callback): async def fetch_weather_from_api(function_name, tool_call_id, args, llm, context, result_callback):
@@ -106,11 +108,11 @@ async def main():
pipeline = Pipeline( pipeline = Pipeline(
[ [
fl_in, # fl_in,
transport.input(), transport.input(),
context_aggregator.user(), context_aggregator.user(),
llm, llm,
fl_out, # fl_out,
tts, tts,
transport.output(), transport.output(),
context_aggregator.assistant(), context_aggregator.assistant(),

View File

@@ -585,6 +585,7 @@ class FunctionCallResultFrame(DataFrame):
tool_call_id: str tool_call_id: str
arguments: str arguments: str
result: Any result: Any
run_llm: bool = True
@dataclass @dataclass

View File

@@ -9,14 +9,20 @@ import asyncio
from itertools import chain from itertools import chain
from typing import List from typing import List
from pipecat.frames.frames import ControlFrame, Frame, SystemFrame
from pipecat.pipeline.base_pipeline import BasePipeline from pipecat.pipeline.base_pipeline import BasePipeline
from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.pipeline import Pipeline
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.frames.frames import Frame
from loguru import logger from loguru import logger
class SyncFrame(ControlFrame):
"""This frame is used to know when the internal pipelines have finished."""
pass
class Source(FrameProcessor): class Source(FrameProcessor):
def __init__(self, upstream_queue: asyncio.Queue): def __init__(self, upstream_queue: asyncio.Queue):
super().__init__() super().__init__()
@@ -67,13 +73,16 @@ class SyncParallelPipeline(BasePipeline):
raise TypeError(f"SyncParallelPipeline argument {processors} is not a list") raise TypeError(f"SyncParallelPipeline argument {processors} is not a list")
# We add a source at the beginning of the pipeline and a sink at the end. # We add a source at the beginning of the pipeline and a sink at the end.
source = Source(self._up_queue) up_queue = asyncio.Queue()
sink = Sink(self._down_queue) down_queue = asyncio.Queue()
source = Source(up_queue)
sink = Sink(down_queue)
processors: List[FrameProcessor] = [source] + processors + [sink] processors: List[FrameProcessor] = [source] + processors + [sink]
# Keep track of sources and sinks. # Keep track of sources and sinks. We also keep the output queue of
self._sources.append(source) # the source and the sinks so we can use it later.
self._sinks.append(sink) self._sources.append({"processor": source, "queue": down_queue})
self._sinks.append({"processor": sink, "queue": up_queue})
# Create pipeline # Create pipeline
pipeline = Pipeline(processors) pipeline = Pipeline(processors)
@@ -94,17 +103,46 @@ class SyncParallelPipeline(BasePipeline):
async def process_frame(self, frame: Frame, direction: FrameDirection): async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction) await super().process_frame(frame, direction)
# The last processor of each pipeline needs to be synchronous otherwise
# this element won't work. Since, we know it should be synchronous we
# push a SyncFrame. Since frames are ordered we know this frame will be
# pushed after the synchronous processor has pushed its data allowing us
# to synchrnonize all the internal pipelines by waiting for the
# SyncFrame in all of them.
async def wait_for_sync(
obj, main_queue: asyncio.Queue, frame: Frame, direction: FrameDirection
):
processor = obj["processor"]
queue = obj["queue"]
await processor.process_frame(frame, direction)
# If we have a system frame we don't need to synchrnonize anything.
if isinstance(frame, SystemFrame):
await main_queue.put(frame)
else:
await processor.process_frame(SyncFrame(), direction)
frame = await queue.get()
while not isinstance(frame, SyncFrame):
await main_queue.put(frame)
queue.task_done()
frame = await queue.get()
if direction == FrameDirection.UPSTREAM: if direction == FrameDirection.UPSTREAM:
# If we get an upstream frame we process it in each sink. # If we get an upstream frame we process it in each sink.
await asyncio.gather(*[s.process_frame(frame, direction) for s in self._sinks]) await asyncio.gather(
*[wait_for_sync(s, self._up_queue, frame, direction) for s in self._sinks]
)
elif direction == FrameDirection.DOWNSTREAM: elif direction == FrameDirection.DOWNSTREAM:
# If we get a downstream frame we process it in each source. # If we get a downstream frame we process it in each source.
await asyncio.gather(*[s.process_frame(frame, direction) for s in self._sources]) await asyncio.gather(
*[wait_for_sync(s, self._down_queue, frame, direction) for s in self._sources]
)
seen_ids = set() seen_ids = set()
while not self._up_queue.empty(): while not self._up_queue.empty():
frame = await self._up_queue.get() frame = await self._up_queue.get()
if frame and frame.id not in seen_ids: if frame.id not in seen_ids:
await self.push_frame(frame, FrameDirection.UPSTREAM) await self.push_frame(frame, FrameDirection.UPSTREAM)
seen_ids.add(frame.id) seen_ids.add(frame.id)
self._up_queue.task_done() self._up_queue.task_done()
@@ -112,7 +150,7 @@ class SyncParallelPipeline(BasePipeline):
seen_ids = set() seen_ids = set()
while not self._down_queue.empty(): while not self._down_queue.empty():
frame = await self._down_queue.get() frame = await self._down_queue.get()
if frame and frame.id not in seen_ids: if frame.id not in seen_ids:
await self.push_frame(frame, FrameDirection.DOWNSTREAM) await self.push_frame(frame, FrameDirection.DOWNSTREAM)
seen_ids.add(frame.id) seen_ids.add(frame.id)
self._down_queue.task_done() self._down_queue.task_done()

View File

@@ -69,6 +69,19 @@ class Source(FrameProcessor):
await self._up_queue.put(StopTaskFrame()) await self._up_queue.put(StopTaskFrame())
class Sink(FrameProcessor):
def __init__(self, down_queue: asyncio.Queue):
super().__init__()
self._down_queue = down_queue
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
# We really just want to know when the EndFrame reached the sink.
if isinstance(frame, EndFrame):
await self._down_queue.put(frame)
class PipelineTask: class PipelineTask:
def __init__( def __init__(
self, self,
@@ -84,12 +97,16 @@ class PipelineTask:
self._params = params self._params = params
self._finished = False self._finished = False
self._down_queue = asyncio.Queue()
self._up_queue = asyncio.Queue() self._up_queue = asyncio.Queue()
self._down_queue = asyncio.Queue()
self._push_queue = asyncio.Queue()
self._source = Source(self._up_queue) self._source = Source(self._up_queue)
self._source.link(pipeline) self._source.link(pipeline)
self._sink = Sink(self._down_queue)
pipeline.link(self._sink)
def has_finished(self): def has_finished(self):
return self._finished return self._finished
@@ -103,19 +120,19 @@ class PipelineTask:
# out-of-band from the main streaming task which is what we want since # out-of-band from the main streaming task which is what we want since
# we want to cancel right away. # we want to cancel right away.
await self._source.push_frame(CancelFrame()) await self._source.push_frame(CancelFrame())
self._process_down_task.cancel() self._process_push_task.cancel()
self._process_up_task.cancel() self._process_up_task.cancel()
await self._process_down_task await self._process_push_task
await self._process_up_task await self._process_up_task
async def run(self): async def run(self):
self._process_up_task = asyncio.create_task(self._process_up_queue()) self._process_up_task = asyncio.create_task(self._process_up_queue())
self._process_down_task = asyncio.create_task(self._process_down_queue()) self._process_push_task = asyncio.create_task(self._process_push_queue())
await asyncio.gather(self._process_up_task, self._process_down_task) await asyncio.gather(self._process_up_task, self._process_push_task)
self._finished = True self._finished = True
async def queue_frame(self, frame: Frame): async def queue_frame(self, frame: Frame):
await self._down_queue.put(frame) await self._push_queue.put(frame)
async def queue_frames(self, frames: Iterable[Frame] | AsyncIterable[Frame]): async def queue_frames(self, frames: Iterable[Frame] | AsyncIterable[Frame]):
if isinstance(frames, AsyncIterable): if isinstance(frames, AsyncIterable):
@@ -133,7 +150,7 @@ class PipelineTask:
data.append(ProcessingMetricsData(processor=p.name, value=0.0)) data.append(ProcessingMetricsData(processor=p.name, value=0.0))
return MetricsFrame(data=data) return MetricsFrame(data=data)
async def _process_down_queue(self): async def _process_push_queue(self):
self._clock.start() self._clock.start()
start_frame = StartFrame( start_frame = StartFrame(
@@ -154,11 +171,13 @@ class PipelineTask:
should_cleanup = True should_cleanup = True
while running: while running:
try: try:
frame = await self._down_queue.get() frame = await self._push_queue.get()
await self._source.process_frame(frame, FrameDirection.DOWNSTREAM) await self._source.process_frame(frame, FrameDirection.DOWNSTREAM)
if isinstance(frame, EndFrame):
await self._wait_for_endframe()
running = not (isinstance(frame, StopTaskFrame) or isinstance(frame, EndFrame)) running = not (isinstance(frame, StopTaskFrame) or isinstance(frame, EndFrame))
should_cleanup = not isinstance(frame, StopTaskFrame) should_cleanup = not isinstance(frame, StopTaskFrame)
self._down_queue.task_done() self._push_queue.task_done()
except asyncio.CancelledError: except asyncio.CancelledError:
break break
# Cleanup only if we need to. # Cleanup only if we need to.
@@ -169,6 +188,12 @@ class PipelineTask:
self._process_up_task.cancel() self._process_up_task.cancel()
await self._process_up_task await self._process_up_task
async def _wait_for_endframe(self):
# NOTE(aleix): the Sink element just pushes EndFrames to the down queue,
# so just wait for it. In the future we might do something else here,
# but for now this is fine.
await self._down_queue.get()
async def _process_up_queue(self): async def _process_up_queue(self):
while True: while True:
try: try:

View File

@@ -133,6 +133,7 @@ class OpenAILLMContext:
tool_call_id: str, tool_call_id: str,
arguments: str, arguments: str,
llm: FrameProcessor, llm: FrameProcessor,
run_llm: bool = True,
) -> None: ) -> None:
# Push a SystemFrame downstream. This frame will let our assistant context aggregator # Push a SystemFrame downstream. This frame will let our assistant context aggregator
# know that we are in the middle of a function call. Some contexts/aggregators may # know that we are in the middle of a function call. Some contexts/aggregators may
@@ -153,6 +154,7 @@ class OpenAILLMContext:
tool_call_id=tool_call_id, tool_call_id=tool_call_id,
arguments=arguments, arguments=arguments,
result=result, result=result,
run_llm=run_llm,
) )
) )

View File

@@ -37,7 +37,6 @@ class FrameProcessor:
*, *,
name: str | None = None, name: str | None = None,
metrics: FrameProcessorMetrics | None = None, metrics: FrameProcessorMetrics | None = None,
sync: bool = True,
loop: asyncio.AbstractEventLoop | None = None, loop: asyncio.AbstractEventLoop | None = None,
**kwargs, **kwargs,
): ):
@@ -47,7 +46,6 @@ class FrameProcessor:
self._prev: "FrameProcessor" | None = None self._prev: "FrameProcessor" | None = None
self._next: "FrameProcessor" | None = None self._next: "FrameProcessor" | None = None
self._loop: asyncio.AbstractEventLoop = loop or asyncio.get_running_loop() self._loop: asyncio.AbstractEventLoop = loop or asyncio.get_running_loop()
self._sync = sync
self._event_handlers: dict = {} self._event_handlers: dict = {}
@@ -66,11 +64,8 @@ class FrameProcessor:
# Every processor in Pipecat should only output frames from a single # Every processor in Pipecat should only output frames from a single
# task. This avoid problems like audio overlapping. System frames are # task. This avoid problems like audio overlapping. System frames are
# the exception to this rule. # the exception to this rule. This create this task.
# self.__create_push_task()
# This create this task.
if not self._sync:
self.__create_push_task()
@property @property
def interruptions_allowed(self): def interruptions_allowed(self):
@@ -167,7 +162,7 @@ class FrameProcessor:
await self.push_frame(error, FrameDirection.UPSTREAM) await self.push_frame(error, FrameDirection.UPSTREAM)
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM): async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
if self._sync or isinstance(frame, SystemFrame): if isinstance(frame, SystemFrame):
await self.__internal_push_frame(frame, direction) await self.__internal_push_frame(frame, direction)
else: else:
await self.__push_queue.put((frame, direction)) await self.__push_queue.put((frame, direction))
@@ -194,13 +189,12 @@ class FrameProcessor:
# #
async def _start_interruption(self): async def _start_interruption(self):
if not self._sync: # Cancel the task. This will stop pushing frames downstream.
# Cancel the task. This will stop pushing frames downstream. self.__push_frame_task.cancel()
self.__push_frame_task.cancel() await self.__push_frame_task
await self.__push_frame_task
# Create a new queue and task. # Create a new queue and task.
self.__create_push_task() self.__create_push_task()
async def _stop_interruption(self): async def _stop_interruption(self):
# Nothing to do right now. # Nothing to do right now.

View File

@@ -516,7 +516,7 @@ class RTVIProcessor(FrameProcessor):
params: RTVIProcessorParams = RTVIProcessorParams(), params: RTVIProcessorParams = RTVIProcessorParams(),
**kwargs, **kwargs,
): ):
super().__init__(sync=False, **kwargs) super().__init__(**kwargs)
self._config = config self._config = config
self._params = params self._params = params

View File

@@ -44,7 +44,7 @@ class GStreamerPipelineSource(FrameProcessor):
clock_sync: bool = True clock_sync: bool = True
def __init__(self, *, pipeline: str, out_params: OutputParams = OutputParams(), **kwargs): def __init__(self, *, pipeline: str, out_params: OutputParams = OutputParams(), **kwargs):
super().__init__(sync=False, **kwargs) super().__init__(**kwargs)
self._out_params = out_params self._out_params = out_params

View File

@@ -26,7 +26,7 @@ class IdleFrameProcessor(FrameProcessor):
types: List[type] = [], types: List[type] = [],
**kwargs, **kwargs,
): ):
super().__init__(sync=False, **kwargs) super().__init__(**kwargs)
self._callback = callback self._callback = callback
self._timeout = timeout self._timeout = timeout

View File

@@ -31,7 +31,7 @@ class UserIdleProcessor(FrameProcessor):
timeout: float, timeout: float,
**kwargs, **kwargs,
): ):
super().__init__(sync=False, **kwargs) super().__init__(**kwargs)
self._callback = callback self._callback = callback
self._timeout = timeout self._timeout = timeout

View File

@@ -110,7 +110,13 @@ class LLMService(AIService):
return function_name in self._callbacks.keys() return function_name in self._callbacks.keys()
async def call_function( async def call_function(
self, *, context: OpenAILLMContext, tool_call_id: str, function_name: str, arguments: str self,
*,
context: OpenAILLMContext,
tool_call_id: str,
function_name: str,
arguments: str,
run_llm: bool,
) -> None: ) -> None:
f = None f = None
if function_name in self._callbacks.keys(): if function_name in self._callbacks.keys():
@@ -120,7 +126,12 @@ class LLMService(AIService):
else: else:
return None return None
await context.call_function( await context.call_function(
f, function_name=function_name, tool_call_id=tool_call_id, arguments=arguments, llm=self f,
function_name=function_name,
tool_call_id=tool_call_id,
arguments=arguments,
llm=self,
run_llm=run_llm,
) )
# QUESTION FOR CB: maybe this isn't needed anymore? # QUESTION FOR CB: maybe this isn't needed anymore?
@@ -144,6 +155,10 @@ class TTSService(AIService):
# if True, TTSService will push TextFrames and LLMFullResponseEndFrames, # if True, TTSService will push TextFrames and LLMFullResponseEndFrames,
# otherwise subclass must do it # otherwise subclass must do it
push_text_frames: bool = True, push_text_frames: bool = True,
# if True, TTSService will push TTSStoppedFrames, otherwise subclass must do it
push_stop_frames: bool = False,
# if push_stop_frames is True, wait for this idle period before pushing TTSStoppedFrame
stop_frame_timeout_s: float = 1.0,
# TTS output sample rate # TTS output sample rate
sample_rate: int = 16000, sample_rate: int = 16000,
**kwargs, **kwargs,
@@ -151,9 +166,15 @@ class TTSService(AIService):
super().__init__(**kwargs) super().__init__(**kwargs)
self._aggregate_sentences: bool = aggregate_sentences self._aggregate_sentences: bool = aggregate_sentences
self._push_text_frames: bool = push_text_frames self._push_text_frames: bool = push_text_frames
self._current_sentence: str = "" self._push_stop_frames: bool = push_stop_frames
self._stop_frame_timeout_s: float = stop_frame_timeout_s
self._sample_rate: int = sample_rate self._sample_rate: int = sample_rate
self._stop_frame_task: Optional[asyncio.Task] = None
self._stop_frame_queue: asyncio.Queue = asyncio.Queue()
self._current_sentence: str = ""
@property @property
def sample_rate(self) -> int: def sample_rate(self) -> int:
return self._sample_rate return self._sample_rate
@@ -210,13 +231,72 @@ class TTSService(AIService):
async def set_role(self, role: str): async def set_role(self, role: str):
pass pass
@abstractmethod
async def flush_audio(self):
pass
# Converts the text to audio. # Converts the text to audio.
@abstractmethod @abstractmethod
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
pass pass
async def start(self, frame: StartFrame):
await super().start(frame)
if self._push_stop_frames:
self._stop_frame_task = self.get_event_loop().create_task(self._stop_frame_handler())
async def stop(self, frame: EndFrame):
await super().stop(frame)
if self._stop_frame_task:
self._stop_frame_task.cancel()
await self._stop_frame_task
self._stop_frame_task = None
async def cancel(self, frame: CancelFrame):
await super().cancel(frame)
if self._stop_frame_task:
self._stop_frame_task.cancel()
await self._stop_frame_task
self._stop_frame_task = None
async def say(self, text: str): async def say(self, text: str):
await self.process_frame(TextFrame(text=text), FrameDirection.DOWNSTREAM) await self.process_frame(TextFrame(text=text), FrameDirection.DOWNSTREAM)
await self.flush_audio()
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TextFrame):
await self._process_text_frame(frame)
elif isinstance(frame, StartInterruptionFrame):
await self._handle_interruption(frame, direction)
elif isinstance(frame, LLMFullResponseEndFrame) or isinstance(frame, EndFrame):
sentence = self._current_sentence
self._current_sentence = ""
await self._push_tts_frames(sentence)
if isinstance(frame, LLMFullResponseEndFrame):
if self._push_text_frames:
await self.push_frame(frame, direction)
else:
await self.push_frame(frame, direction)
elif isinstance(frame, TTSSpeakFrame):
await self._push_tts_frames(frame.text)
await self.flush_audio()
elif isinstance(frame, TTSUpdateSettingsFrame):
await self._update_tts_settings(frame)
else:
await self.push_frame(frame, direction)
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
await super().push_frame(frame, direction)
if self._push_stop_frames and (
isinstance(frame, StartInterruptionFrame)
or isinstance(frame, TTSStartedFrame)
or isinstance(frame, TTSAudioRawFrame)
or isinstance(frame, TTSStoppedFrame)
):
await self._stop_frame_queue.put(frame)
async def _handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection): async def _handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection):
self._current_sentence = "" self._current_sentence = ""
@@ -276,88 +356,6 @@ class TTSService(AIService):
if frame.role is not None: if frame.role is not None:
await self.set_role(frame.role) await self.set_role(frame.role)
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TextFrame):
await self._process_text_frame(frame)
elif isinstance(frame, StartInterruptionFrame):
await self._handle_interruption(frame, direction)
elif isinstance(frame, LLMFullResponseEndFrame) or isinstance(frame, EndFrame):
sentence = self._current_sentence
self._current_sentence = ""
await self._push_tts_frames(sentence)
if isinstance(frame, LLMFullResponseEndFrame):
if self._push_text_frames:
await self.push_frame(frame, direction)
else:
await self.push_frame(frame, direction)
elif isinstance(frame, TTSSpeakFrame):
await self._push_tts_frames(frame.text)
elif isinstance(frame, TTSUpdateSettingsFrame):
await self._update_tts_settings(frame)
else:
await self.push_frame(frame, direction)
class AsyncTTSService(TTSService):
def __init__(
self,
# if True, TTSService will push TTSStoppedFrames, otherwise subclass must do it
push_stop_frames: bool = False,
# if push_stop_frames is True, wait for this idle period before pushing TTSStoppedFrame
stop_frame_timeout_s: float = 1.0,
**kwargs,
):
super().__init__(sync=False, **kwargs)
self._push_stop_frames: bool = push_stop_frames
self._stop_frame_timeout_s: float = stop_frame_timeout_s
self._stop_frame_task: Optional[asyncio.Task] = None
self._stop_frame_queue: asyncio.Queue = asyncio.Queue()
@abstractmethod
async def flush_audio(self):
pass
async def say(self, text: str):
await super().say(text)
await self.flush_audio()
async def start(self, frame: StartFrame):
await super().start(frame)
if self._push_stop_frames:
self._stop_frame_task = self.get_event_loop().create_task(self._stop_frame_handler())
async def stop(self, frame: EndFrame):
await super().stop(frame)
if self._stop_frame_task:
self._stop_frame_task.cancel()
await self._stop_frame_task
self._stop_frame_task = None
async def cancel(self, frame: CancelFrame):
await super().cancel(frame)
if self._stop_frame_task:
self._stop_frame_task.cancel()
await self._stop_frame_task
self._stop_frame_task = None
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TTSSpeakFrame):
await self.flush_audio()
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
await super().push_frame(frame, direction)
if self._push_stop_frames and (
isinstance(frame, StartInterruptionFrame)
or isinstance(frame, TTSStartedFrame)
or isinstance(frame, TTSAudioRawFrame)
or isinstance(frame, TTSStoppedFrame)
):
await self._stop_frame_queue.put(frame)
async def _stop_frame_handler(self): async def _stop_frame_handler(self):
try: try:
has_started = False has_started = False
@@ -378,7 +376,7 @@ class AsyncTTSService(TTSService):
pass pass
class AsyncWordTTSService(AsyncTTSService): class WordTTSService(TTSService):
def __init__(self, **kwargs): def __init__(self, **kwargs):
super().__init__(**kwargs) super().__init__(**kwargs)
self._initial_word_timestamp = -1 self._initial_word_timestamp = -1

View File

@@ -26,7 +26,7 @@ from pipecat.frames.frames import (
) )
from pipecat.processors.frame_processor import FrameDirection from pipecat.processors.frame_processor import FrameDirection
from pipecat.transcriptions.language import Language from pipecat.transcriptions.language import Language
from pipecat.services.ai_services import AsyncWordTTSService, TTSService from pipecat.services.ai_services import WordTTSService, TTSService
from loguru import logger from loguru import logger
@@ -61,7 +61,7 @@ def language_to_cartesia_language(language: Language) -> str | None:
return None return None
class CartesiaTTSService(AsyncWordTTSService): class CartesiaTTSService(WordTTSService):
class InputParams(BaseModel): class InputParams(BaseModel):
encoding: Optional[str] = "pcm_s16le" encoding: Optional[str] = "pcm_s16le"
sample_rate: Optional[int] = 16000 sample_rate: Optional[int] = 16000

View File

@@ -23,7 +23,7 @@ from pipecat.frames.frames import (
TTSStoppedFrame, TTSStoppedFrame,
) )
from pipecat.processors.frame_processor import FrameDirection from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import AsyncWordTTSService from pipecat.services.ai_services import WordTTSService
# See .env.example for ElevenLabs configuration needed # See .env.example for ElevenLabs configuration needed
try: try:
@@ -70,7 +70,7 @@ def calculate_word_times(
return word_times return word_times
class ElevenLabsTTSService(AsyncWordTTSService): class ElevenLabsTTSService(WordTTSService):
class InputParams(BaseModel): class InputParams(BaseModel):
language: Optional[str] = None language: Optional[str] = None
output_format: Literal["pcm_16000", "pcm_22050", "pcm_24000", "pcm_44100"] = "pcm_16000" output_format: Literal["pcm_16000", "pcm_22050", "pcm_24000", "pcm_44100"] = "pcm_16000"

View File

@@ -51,7 +51,7 @@ class GladiaSTTService(STTService):
params: InputParams = InputParams(), params: InputParams = InputParams(),
**kwargs, **kwargs,
): ):
super().__init__(sync=False, **kwargs) super().__init__(**kwargs)
self._api_key = api_key self._api_key = api_key
self._url = url self._url = url

View File

@@ -20,7 +20,7 @@ from pipecat.frames.frames import (
TTSStartedFrame, TTSStartedFrame,
TTSStoppedFrame, TTSStoppedFrame,
) )
from pipecat.services.ai_services import AsyncTTSService from pipecat.services.ai_services import TTSService
from loguru import logger from loguru import logger
@@ -35,7 +35,7 @@ except ModuleNotFoundError as e:
raise Exception(f"Missing module: {e}") raise Exception(f"Missing module: {e}")
class LmntTTSService(AsyncTTSService): class LmntTTSService(TTSService):
def __init__( def __init__(
self, self,
*, *,
@@ -47,7 +47,7 @@ class LmntTTSService(AsyncTTSService):
): ):
# Let TTSService produce TTSStoppedFrames after a short delay of # Let TTSService produce TTSStoppedFrames after a short delay of
# no activity. # no activity.
super().__init__(sync=False, push_stop_frames=True, sample_rate=sample_rate, **kwargs) super().__init__(push_stop_frames=True, sample_rate=sample_rate, **kwargs)
self._api_key = api_key self._api_key = api_key
self._voice_id = voice_id self._voice_id = voice_id

View File

@@ -205,6 +205,10 @@ class BaseOpenAILLMService(LLMService):
return chunks return chunks
async def _process_context(self, context: OpenAILLMContext): async def _process_context(self, context: OpenAILLMContext):
functions_list = []
arguments_list = []
tool_id_list = []
func_idx = 0
function_name = "" function_name = ""
arguments = "" arguments = ""
tool_call_id = "" tool_call_id = ""
@@ -242,6 +246,14 @@ class BaseOpenAILLMService(LLMService):
# yield a frame containing the function name and the arguments. # yield a frame containing the function name and the arguments.
tool_call = chunk.choices[0].delta.tool_calls[0] tool_call = chunk.choices[0].delta.tool_calls[0]
if tool_call.index != func_idx:
functions_list.append(function_name)
arguments_list.append(arguments)
tool_id_list.append(tool_call_id)
function_name = ""
arguments = ""
tool_call_id = ""
func_idx += 1
if tool_call.function and tool_call.function.name: if tool_call.function and tool_call.function.name:
function_name += tool_call.function.name function_name += tool_call.function.name
tool_call_id = tool_call.id tool_call_id = tool_call.id
@@ -257,21 +269,29 @@ class BaseOpenAILLMService(LLMService):
# the context, and re-prompt to get a chat answer. If we don't have a registered # the context, and re-prompt to get a chat answer. If we don't have a registered
# handler, raise an exception. # handler, raise an exception.
if function_name and arguments: if function_name and arguments:
if self.has_function(function_name): # added to the list as last function name and arguments not added to the list
await self._handle_function_call(context, tool_call_id, function_name, arguments) functions_list.append(function_name)
else: arguments_list.append(arguments)
raise OpenAIUnhandledFunctionException( tool_id_list.append(tool_call_id)
f"The LLM tried to call a function named '{function_name}', but there isn't a callback registered for that function."
)
async def _handle_function_call(self, context, tool_call_id, function_name, arguments): total_items = len(functions_list)
arguments = json.loads(arguments) for index, (function_name, arguments, tool_id) in enumerate(
await self.call_function( zip(functions_list, arguments_list, tool_id_list), start=1
context=context, ):
tool_call_id=tool_call_id, if self.has_function(function_name):
function_name=function_name, run_llm = index == total_items
arguments=arguments, arguments = json.loads(arguments)
) await self.call_function(
context=context,
function_name=function_name,
arguments=arguments,
tool_call_id=tool_id,
run_llm=run_llm,
)
else:
raise OpenAIUnhandledFunctionException(
f"The LLM tried to call a function named '{function_name}', but there isn't a callback registered for that function."
)
async def _update_settings(self, frame: LLMUpdateSettingsFrame): async def _update_settings(self, frame: LLMUpdateSettingsFrame):
if frame.model is not None: if frame.model is not None:
@@ -461,31 +481,27 @@ class OpenAIAssistantContextAggregator(LLMAssistantContextAggregator):
def __init__(self, user_context_aggregator: OpenAIUserContextAggregator): def __init__(self, user_context_aggregator: OpenAIUserContextAggregator):
super().__init__(context=user_context_aggregator._context) super().__init__(context=user_context_aggregator._context)
self._user_context_aggregator = user_context_aggregator self._user_context_aggregator = user_context_aggregator
self._function_call_in_progress = None self._function_calls_in_progress = {}
self._function_call_result = None self._function_call_result = None
async def process_frame(self, frame, direction): async def process_frame(self, frame, direction):
await super().process_frame(frame, direction) await super().process_frame(frame, direction)
# See note above about not calling push_frame() here. # See note above about not calling push_frame() here.
if isinstance(frame, StartInterruptionFrame): if isinstance(frame, StartInterruptionFrame):
self._function_call_in_progress = None self._function_calls_in_progress.clear()
self._function_call_finished = None self._function_call_finished = None
elif isinstance(frame, FunctionCallInProgressFrame): elif isinstance(frame, FunctionCallInProgressFrame):
self._function_call_in_progress = frame self._function_calls_in_progress[frame.tool_call_id] = frame
elif isinstance(frame, FunctionCallResultFrame): elif isinstance(frame, FunctionCallResultFrame):
if ( if frame.tool_call_id in self._function_calls_in_progress:
self._function_call_in_progress del self._function_calls_in_progress[frame.tool_call_id]
and self._function_call_in_progress.tool_call_id == frame.tool_call_id
):
self._function_call_in_progress = None
self._function_call_result = frame self._function_call_result = frame
# TODO-CB: Kwin wants us to refactor this out of here but I REFUSE # TODO-CB: Kwin wants us to refactor this out of here but I REFUSE
await self._push_aggregation() await self._push_aggregation()
else: else:
logger.warning( logger.warning(
"FunctionCallResultFrame tool_call_id does not match FunctionCallInProgressFrame tool_call_id" "FunctionCallResultFrame tool_call_id does not match any function call in progress"
) )
self._function_call_in_progress = None
self._function_call_result = None self._function_call_result = None
async def _push_aggregation(self): async def _push_aggregation(self):
@@ -524,7 +540,7 @@ class OpenAIAssistantContextAggregator(LLMAssistantContextAggregator):
"tool_call_id": frame.tool_call_id, "tool_call_id": frame.tool_call_id,
} }
) )
run_llm = True run_llm = frame.run_llm
else: else:
self._context.add_message({"role": "assistant", "content": aggregation}) self._context.add_message({"role": "assistant", "content": aggregation})

View File

@@ -31,7 +31,7 @@ from loguru import logger
class BaseInputTransport(FrameProcessor): class BaseInputTransport(FrameProcessor):
def __init__(self, params: TransportParams, **kwargs): def __init__(self, params: TransportParams, **kwargs):
super().__init__(sync=False, **kwargs) super().__init__(**kwargs)
self._params = params self._params = params

View File

@@ -43,7 +43,7 @@ from pipecat.utils.time import nanoseconds_to_seconds
class BaseOutputTransport(FrameProcessor): class BaseOutputTransport(FrameProcessor):
def __init__(self, params: TransportParams, **kwargs): def __init__(self, params: TransportParams, **kwargs):
super().__init__(sync=False, **kwargs) super().__init__(**kwargs)
self._params = params self._params = params

View File

@@ -7,9 +7,9 @@
import unittest import unittest
from pipecat.frames.frames import ( from pipecat.frames.frames import (
EndFrame,
LLMFullResponseEndFrame, LLMFullResponseEndFrame,
LLMFullResponseStartFrame, LLMFullResponseStartFrame,
StopTaskFrame,
TextFrame, TextFrame,
TranscriptionFrame, TranscriptionFrame,
UserStartedSpeakingFrame, UserStartedSpeakingFrame,
@@ -32,6 +32,7 @@ from langchain_core.language_models import FakeStreamingListLLM
class TestLangchain(unittest.IsolatedAsyncioTestCase): class TestLangchain(unittest.IsolatedAsyncioTestCase):
class MockProcessor(FrameProcessor): class MockProcessor(FrameProcessor):
def __init__(self, name): def __init__(self, name):
super().__init__()
self.name = name self.name = name
self.token: list[str] = [] self.token: list[str] = []
# Start collecting tokens when we see the start frame # Start collecting tokens when we see the start frame
@@ -55,13 +56,13 @@ class TestLangchain(unittest.IsolatedAsyncioTestCase):
def setUp(self): def setUp(self):
self.expected_response = "Hello dear human" self.expected_response = "Hello dear human"
self.fake_llm = FakeStreamingListLLM(responses=[self.expected_response]) self.fake_llm = FakeStreamingListLLM(responses=[self.expected_response])
self.mock_proc = self.MockProcessor("token_collector")
async def test_langchain(self): async def test_langchain(self):
messages = [("system", "Say hello to {name}"), ("human", "{input}")] messages = [("system", "Say hello to {name}"), ("human", "{input}")]
prompt = ChatPromptTemplate.from_messages(messages).partial(name="Thomas") prompt = ChatPromptTemplate.from_messages(messages).partial(name="Thomas")
chain = prompt | self.fake_llm chain = prompt | self.fake_llm
proc = LangchainProcessor(chain=chain) proc = LangchainProcessor(chain=chain)
self.mock_proc = self.MockProcessor("token_collector")
tma_in = LLMUserResponseAggregator(messages) tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages) tma_out = LLMAssistantResponseAggregator(messages)
@@ -81,7 +82,7 @@ class TestLangchain(unittest.IsolatedAsyncioTestCase):
UserStartedSpeakingFrame(), UserStartedSpeakingFrame(),
TranscriptionFrame(text="Hi World", user_id="user", timestamp="now"), TranscriptionFrame(text="Hi World", user_id="user", timestamp="now"),
UserStoppedSpeakingFrame(), UserStoppedSpeakingFrame(),
StopTaskFrame(), EndFrame(),
] ]
) )