Merge pull request #2579 from pipecat-ai/filipi/input_message

Creating InputTransportMessageUrgentFrame
This commit is contained in:
Filipi da Silva Fuchter
2025-09-03 19:01:07 -03:00
committed by GitHub
5 changed files with 42 additions and 3 deletions

View File

@@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Added new frames `InputTransportMessageUrgentFrame` and
`DailyInputTransportMessageUrgentFrame` for transport messages received from
external sources.
- Added `UserSpeakingFrame`. This will be sent upstream and downstream while VAD
detects the user is speaking.
@@ -134,6 +138,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Fixed an issue where messages received from the transport were always being resent.
- Fixed `SmallWebRTCTransport` to not use `mid` to decide if the transceiver should
be `sendrecv` or not.

View File

@@ -1128,6 +1128,23 @@ class TransportMessageUrgentFrame(SystemFrame):
return f"{self.name}(message: {self.message})"
@dataclass
class InputTransportMessageUrgentFrame(TransportMessageUrgentFrame):
"""Frame for transport messages received from external sources.
This frame wraps incoming transport messages to distinguish them from outgoing
urgent transport messages (TransportMessageUrgentFrame), preventing infinite
message loops in the transport layer. It inherits the message payload from
TransportMessageFrame while marking the message as having been received
rather than generated locally.
Used by transport implementations to properly handle bidirectional message
flow without creating feedback loops.
"""
pass
@dataclass
class UserImageRequestFrame(SystemFrame):
"""Frame requesting an image from a specific user.

View File

@@ -29,6 +29,7 @@ from pipecat.frames.frames import (
CancelFrame,
EndFrame,
Frame,
InputTransportMessageUrgentFrame,
MixerControlFrame,
OutputAudioRawFrame,
OutputDTMFFrame,
@@ -275,7 +276,9 @@ class BaseOutputTransport(FrameProcessor):
elif isinstance(frame, StartInterruptionFrame):
await self.push_frame(frame, direction)
await self._handle_frame(frame)
elif isinstance(frame, TransportMessageUrgentFrame):
elif isinstance(frame, TransportMessageUrgentFrame) and not isinstance(
frame, InputTransportMessageUrgentFrame
):
await self.send_message(frame)
elif isinstance(frame, OutputDTMFUrgentFrame):
await self.write_dtmf(frame)

View File

@@ -29,6 +29,7 @@ from pipecat.frames.frames import (
ErrorFrame,
Frame,
InputAudioRawFrame,
InputTransportMessageUrgentFrame,
InterimTranscriptionFrame,
OutputAudioRawFrame,
OutputImageRawFrame,
@@ -95,6 +96,17 @@ class DailyTransportMessageUrgentFrame(TransportMessageUrgentFrame):
participant_id: Optional[str] = None
@dataclass
class DailyInputTransportMessageUrgentFrame(InputTransportMessageUrgentFrame):
"""Frame for input urgent transport messages in Daily calls.
Parameters:
participant_id: Optional ID of the participant this message is for/from.
"""
participant_id: Optional[str] = None
class WebRTCVADAnalyzer(VADAnalyzer):
"""Voice Activity Detection analyzer using WebRTC.
@@ -1547,7 +1559,7 @@ class DailyInputTransport(BaseInputTransport):
message: The message data to send.
sender: ID of the message sender.
"""
frame = DailyTransportMessageUrgentFrame(message=message, participant_id=sender)
frame = DailyInputTransportMessageUrgentFrame(message=message, participant_id=sender)
await self.push_frame(frame)
#

View File

@@ -26,6 +26,7 @@ from pipecat.frames.frames import (
EndFrame,
Frame,
InputAudioRawFrame,
InputTransportMessageUrgentFrame,
OutputAudioRawFrame,
OutputImageRawFrame,
SpriteFrame,
@@ -650,7 +651,7 @@ class SmallWebRTCInputTransport(BaseInputTransport):
message: The application message to process.
"""
logger.debug(f"Received app message inside SmallWebRTCInputTransport {message}")
frame = TransportMessageUrgentFrame(message=message)
frame = InputTransportMessageUrgentFrame(message=message)
await self.push_frame(frame)
# Add this method similar to DailyInputTransport.request_participant_image