Files
pipecat/examples/foundational/03-still-frame.py
2025-05-29 10:43:12 -07:00

74 lines
2.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import os
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from pipecat.frames.frames import TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.services.fal.image import FalImageGenService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.services.daily import DailyParams
load_dotenv(override=True)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
video_out_enabled=True,
video_out_width=1024,
video_out_height=1024,
),
"webrtc": lambda: TransportParams(
video_out_enabled=True,
video_out_width=1024,
video_out_height=1024,
),
}
async def run_example(transport: BaseTransport, _: argparse.Namespace, handle_sigint: bool):
logger.info(f"Starting bot")
# Create an HTTP session
async with aiohttp.ClientSession() as session:
imagegen = FalImageGenService(
params=FalImageGenService.InputParams(image_size="square_hd"),
aiohttp_session=session,
key=os.getenv("FAL_KEY"),
)
task = PipelineTask(Pipeline([imagegen, transport.output()]))
# Register an event handler so we can play the audio when the client joins
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
await task.queue_frame(TextFrame("a cat in the style of picasso"))
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=handle_sigint)
await runner.run(task)
if __name__ == "__main__":
from pipecat.examples.run import main
main(run_example, transport_params=transport_params)