processors(gstreamer): add new GStreamerPipelineSource
This commit is contained in:
0
src/pipecat/processors/gstreamer/__init__.py
Normal file
0
src/pipecat/processors/gstreamer/__init__.py
Normal file
233
src/pipecat/processors/gstreamer/pipeline_source.py
Normal file
233
src/pipecat/processors/gstreamer/pipeline_source.py
Normal file
@@ -0,0 +1,233 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
|
||||
from openai import BaseModel
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
AudioRawFrame,
|
||||
CancelFrame,
|
||||
EndFrame,
|
||||
Frame,
|
||||
ImageRawFrame,
|
||||
StartFrame,
|
||||
SystemFrame)
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
|
||||
from loguru import logger
|
||||
|
||||
try:
|
||||
import gi
|
||||
gi.require_version('Gst', '1.0')
|
||||
gi.require_version('GstApp', '1.0')
|
||||
from gi.repository import Gst, GstApp
|
||||
except ModuleNotFoundError as e:
|
||||
logger.error(f"Exception: {e}")
|
||||
logger.error("In order to use GStreamer processors, you need to install GStreamer in your system`.")
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
|
||||
class GStreamerPipelineSource(FrameProcessor):
|
||||
class OutputParams(BaseModel):
|
||||
video_width: int = 1280
|
||||
video_height: int = 720
|
||||
audio_sample_rate: int = 16000
|
||||
audio_channels: int = 1
|
||||
|
||||
def __init__(self, *, pipeline: str, out_params: OutputParams = OutputParams(), **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
|
||||
self._out_params = out_params
|
||||
|
||||
Gst.init()
|
||||
|
||||
self._player = Gst.Pipeline.new("player")
|
||||
|
||||
source = Gst.parse_bin_from_description(pipeline, True)
|
||||
|
||||
decodebin = Gst.ElementFactory.make("decodebin", None)
|
||||
decodebin.connect("pad-added", self._decodebin_callback)
|
||||
|
||||
self._player.add(source)
|
||||
self._player.add(decodebin)
|
||||
source.link(decodebin)
|
||||
|
||||
bus = self._player.get_bus()
|
||||
bus.add_signal_watch()
|
||||
bus.connect("message", self._on_gstreamer_message)
|
||||
|
||||
# Create push frame task. This is the task that will push frames in
|
||||
# order. We also guarantee that all frames are pushed in the same task.
|
||||
self._create_push_task()
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
# Specific system frames
|
||||
if isinstance(frame, CancelFrame):
|
||||
await self._cancel(frame)
|
||||
await self.push_frame(frame, direction)
|
||||
# All other system frames
|
||||
elif isinstance(frame, SystemFrame):
|
||||
await self.push_frame(frame, direction)
|
||||
# Control frames
|
||||
elif isinstance(frame, StartFrame):
|
||||
await self._start(frame)
|
||||
await self._internal_push_frame(frame, direction)
|
||||
elif isinstance(frame, EndFrame):
|
||||
# Push EndFrame before stop(), because stop() waits on the task to
|
||||
# finish and the task finishes when EndFrame is processed.
|
||||
await self._internal_push_frame(frame, direction)
|
||||
await self._stop(frame)
|
||||
# Other frames
|
||||
else:
|
||||
await self._internal_push_frame(frame, direction)
|
||||
|
||||
async def _start(self, frame: StartFrame):
|
||||
self._player.set_state(Gst.State.PLAYING)
|
||||
|
||||
async def _stop(self, frame: EndFrame):
|
||||
self._player.set_state(Gst.State.NULL)
|
||||
# Wait for the push frame task to finish. It will finish when the
|
||||
# EndFrame is actually processed.
|
||||
await self._push_frame_task
|
||||
|
||||
async def _cancel(self, frame: CancelFrame):
|
||||
self._player.set_state(Gst.State.NULL)
|
||||
# Cancel all the tasks and wait for them to finish.
|
||||
self._push_frame_task.cancel()
|
||||
await self._push_frame_task
|
||||
|
||||
#
|
||||
# Push frames task
|
||||
#
|
||||
|
||||
def _create_push_task(self):
|
||||
loop = self.get_event_loop()
|
||||
self._push_queue = asyncio.Queue()
|
||||
self._push_frame_task = loop.create_task(self._push_frame_task_handler())
|
||||
|
||||
async def _internal_push_frame(
|
||||
self,
|
||||
frame: Frame | None,
|
||||
direction: FrameDirection | None = FrameDirection.DOWNSTREAM):
|
||||
await self._push_queue.put((frame, direction))
|
||||
|
||||
async def _push_frame_task_handler(self):
|
||||
running = True
|
||||
while running:
|
||||
try:
|
||||
(frame, direction) = await self._push_queue.get()
|
||||
await self.push_frame(frame, direction)
|
||||
running = not isinstance(frame, EndFrame)
|
||||
self._push_queue.task_done()
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
|
||||
#
|
||||
# GStreaner
|
||||
#
|
||||
|
||||
def _on_gstreamer_message(self, bus: Gst.Bus, message: Gst.Message):
|
||||
t = message.type
|
||||
if t == Gst.MessageType.ERROR:
|
||||
err, debug = message.parse_error()
|
||||
logger.error(f"{self} error: {err} : {debug}")
|
||||
return True
|
||||
|
||||
def _decodebin_callback(self, decodebin: Gst.Element, pad: Gst.Pad):
|
||||
caps_string = pad.get_current_caps().to_string()
|
||||
if caps_string.startswith("audio"):
|
||||
self._decodebin_audio(pad)
|
||||
elif caps_string.startswith("video"):
|
||||
self._decodebin_video(pad)
|
||||
|
||||
def _decodebin_audio(self, pad: Gst.Pad):
|
||||
queue_audio = Gst.ElementFactory.make("queue", None)
|
||||
audioconvert = Gst.ElementFactory.make("audioconvert", None)
|
||||
audioresample = Gst.ElementFactory.make("audioresample", None)
|
||||
audiocapsfilter = Gst.ElementFactory.make("capsfilter", None)
|
||||
audiocaps = Gst.Caps.from_string(
|
||||
f"audio/x-raw,format=S16LE,rate={self._out_params.audio_sample_rate},channels={self._out_params.audio_channels},layout=interleaved")
|
||||
audiocapsfilter.set_property("caps", audiocaps)
|
||||
appsink_audio = Gst.ElementFactory.make("appsink", None)
|
||||
appsink_audio.set_property("emit-signals", True)
|
||||
appsink_audio.set_property("sync", False)
|
||||
appsink_audio.connect("new-sample", self._appsink_audio_new_sample)
|
||||
|
||||
self._player.add(queue_audio)
|
||||
self._player.add(audioconvert)
|
||||
self._player.add(audioresample)
|
||||
self._player.add(audiocapsfilter)
|
||||
self._player.add(appsink_audio)
|
||||
queue_audio.sync_state_with_parent()
|
||||
audioconvert.sync_state_with_parent()
|
||||
audioresample.sync_state_with_parent()
|
||||
audiocapsfilter.sync_state_with_parent()
|
||||
appsink_audio.sync_state_with_parent()
|
||||
|
||||
queue_audio.link(audioconvert)
|
||||
audioconvert.link(audioresample)
|
||||
audioresample.link(audiocapsfilter)
|
||||
audiocapsfilter.link(appsink_audio)
|
||||
|
||||
queue_pad = queue_audio.get_static_pad("sink")
|
||||
pad.link(queue_pad)
|
||||
|
||||
def _decodebin_video(self, pad: Gst.Pad):
|
||||
queue_video = Gst.ElementFactory.make("queue", None)
|
||||
videoconvert = Gst.ElementFactory.make("videoconvert", None)
|
||||
videoscale = Gst.ElementFactory.make("videoscale", None)
|
||||
videocapsfilter = Gst.ElementFactory.make("capsfilter", None)
|
||||
videocaps = Gst.Caps.from_string(
|
||||
f"video/x-raw,format=RGB,width={self._out_params.video_width},height={self._out_params.video_height}")
|
||||
videocapsfilter.set_property("caps", videocaps)
|
||||
|
||||
appsink_video = Gst.ElementFactory.make("appsink", None)
|
||||
appsink_video.set_property("emit-signals", True)
|
||||
appsink_video.set_property("sync", False)
|
||||
appsink_video.connect("new-sample", self._appsink_video_new_sample)
|
||||
|
||||
self._player.add(queue_video)
|
||||
self._player.add(videoconvert)
|
||||
self._player.add(videoscale)
|
||||
self._player.add(videocapsfilter)
|
||||
self._player.add(appsink_video)
|
||||
queue_video.sync_state_with_parent()
|
||||
videoconvert.sync_state_with_parent()
|
||||
videoscale.sync_state_with_parent()
|
||||
videocapsfilter.sync_state_with_parent()
|
||||
appsink_video.sync_state_with_parent()
|
||||
|
||||
queue_video.link(videoconvert)
|
||||
videoconvert.link(videoscale)
|
||||
videoscale.link(videocapsfilter)
|
||||
videocapsfilter.link(appsink_video)
|
||||
|
||||
queue_pad = queue_video.get_static_pad("sink")
|
||||
pad.link(queue_pad)
|
||||
|
||||
def _appsink_audio_new_sample(self, appsink: GstApp.AppSink):
|
||||
buffer = appsink.pull_sample().get_buffer()
|
||||
(_, info) = buffer.map(Gst.MapFlags.READ)
|
||||
frame = AudioRawFrame(audio=info.data,
|
||||
sample_rate=self._out_params.audio_sample_rate,
|
||||
num_channels=self._out_params.audio_channels)
|
||||
asyncio.run_coroutine_threadsafe(self._internal_push_frame(frame), self.get_event_loop())
|
||||
buffer.unmap(info)
|
||||
return Gst.FlowReturn.OK
|
||||
|
||||
def _appsink_video_new_sample(self, appsink: GstApp.AppSink):
|
||||
buffer = appsink.pull_sample().get_buffer()
|
||||
(_, info) = buffer.map(Gst.MapFlags.READ)
|
||||
frame = ImageRawFrame(
|
||||
image=info.data,
|
||||
size=(self._out_params.video_width, self._out_params.video_height),
|
||||
format="RGB")
|
||||
asyncio.run_coroutine_threadsafe(self._internal_push_frame(frame), self.get_event_loop())
|
||||
buffer.unmap(info)
|
||||
return Gst.FlowReturn.OK
|
||||
Reference in New Issue
Block a user