processors: add AsyncGeneratorProcessor

This commit is contained in:
Aleix Conchillo Flaqué
2024-09-24 19:35:05 -07:00
parent cf0ab85e2c
commit b8713666c2
2 changed files with 46 additions and 0 deletions

View File

@@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Added `AsyncGeneratorProcessor`. This processor can be used together with a
`FrameSerializer` as an async generator. It provides a `generator()` function
that returns an `AsyncGenerator` and that yields serialized frames.
- Added `EndTaskFrame` and `CancelTaskFrame`. These are new frames that are
meant to be pushed upstream to tell the pipeline task to stop nicely or
immediately respectively.

View File

@@ -0,0 +1,42 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
from typing import Any, AsyncGenerator
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
Frame,
)
from pipecat.processors.frame_processor import FrameProcessor, FrameDirection
from pipecat.serializers.base_serializer import FrameSerializer
class AsyncGeneratorProcessor(FrameProcessor):
def __init__(self, *, serializer: FrameSerializer, **kwargs):
super().__init__(**kwargs)
self._serializer = serializer
self._data_queue = asyncio.Queue()
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, (CancelFrame, EndFrame)):
await self._data_queue.put(None)
else:
data = self._serializer.serialize(frame)
if data:
await self._data_queue.put(data)
async def generator(self) -> AsyncGenerator[Any, None]:
running = True
while running:
data = await self._data_queue.get()
running = data is not None
if data:
yield data