diff --git a/CHANGELOG.md b/CHANGELOG.md index b47a26116..0e0d8d530 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added new frames `InputTransportMessageUrgentFrame` and + `DailyInputTransportMessageUrgentFrame` for transport messages received from + external sources. + - Added `UserSpeakingFrame`. This will be sent upstream and downstream while VAD detects the user is speaking. @@ -134,6 +138,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- Fixed an issue where messages received from the transport were always being resent. + - Fixed `SmallWebRTCTransport` to not use `mid` to decide if the transceiver should be `sendrecv` or not. diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index 0d8471756..4b380c845 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -1128,6 +1128,23 @@ class TransportMessageUrgentFrame(SystemFrame): return f"{self.name}(message: {self.message})" +@dataclass +class InputTransportMessageUrgentFrame(TransportMessageUrgentFrame): + """Frame for transport messages received from external sources. + + This frame wraps incoming transport messages to distinguish them from outgoing + urgent transport messages (TransportMessageUrgentFrame), preventing infinite + message loops in the transport layer. It inherits the message payload from + TransportMessageFrame while marking the message as having been received + rather than generated locally. + + Used by transport implementations to properly handle bidirectional message + flow without creating feedback loops. + """ + + pass + + @dataclass class UserImageRequestFrame(SystemFrame): """Frame requesting an image from a specific user. diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py index 11e212437..5c07ed518 100644 --- a/src/pipecat/transports/base_output.py +++ b/src/pipecat/transports/base_output.py @@ -29,6 +29,7 @@ from pipecat.frames.frames import ( CancelFrame, EndFrame, Frame, + InputTransportMessageUrgentFrame, MixerControlFrame, OutputAudioRawFrame, OutputDTMFFrame, @@ -275,7 +276,9 @@ class BaseOutputTransport(FrameProcessor): elif isinstance(frame, StartInterruptionFrame): await self.push_frame(frame, direction) await self._handle_frame(frame) - elif isinstance(frame, TransportMessageUrgentFrame): + elif isinstance(frame, TransportMessageUrgentFrame) and not isinstance( + frame, InputTransportMessageUrgentFrame + ): await self.send_message(frame) elif isinstance(frame, OutputDTMFUrgentFrame): await self.write_dtmf(frame) diff --git a/src/pipecat/transports/daily/transport.py b/src/pipecat/transports/daily/transport.py index cfc7998ef..3a1a0b2fd 100644 --- a/src/pipecat/transports/daily/transport.py +++ b/src/pipecat/transports/daily/transport.py @@ -29,6 +29,7 @@ from pipecat.frames.frames import ( ErrorFrame, Frame, InputAudioRawFrame, + InputTransportMessageUrgentFrame, InterimTranscriptionFrame, OutputAudioRawFrame, OutputImageRawFrame, @@ -95,6 +96,17 @@ class DailyTransportMessageUrgentFrame(TransportMessageUrgentFrame): participant_id: Optional[str] = None +@dataclass +class DailyInputTransportMessageUrgentFrame(InputTransportMessageUrgentFrame): + """Frame for input urgent transport messages in Daily calls. + + Parameters: + participant_id: Optional ID of the participant this message is for/from. + """ + + participant_id: Optional[str] = None + + class WebRTCVADAnalyzer(VADAnalyzer): """Voice Activity Detection analyzer using WebRTC. @@ -1547,7 +1559,7 @@ class DailyInputTransport(BaseInputTransport): message: The message data to send. sender: ID of the message sender. """ - frame = DailyTransportMessageUrgentFrame(message=message, participant_id=sender) + frame = DailyInputTransportMessageUrgentFrame(message=message, participant_id=sender) await self.push_frame(frame) # diff --git a/src/pipecat/transports/smallwebrtc/transport.py b/src/pipecat/transports/smallwebrtc/transport.py index fd7a87f8c..681674000 100644 --- a/src/pipecat/transports/smallwebrtc/transport.py +++ b/src/pipecat/transports/smallwebrtc/transport.py @@ -26,6 +26,7 @@ from pipecat.frames.frames import ( EndFrame, Frame, InputAudioRawFrame, + InputTransportMessageUrgentFrame, OutputAudioRawFrame, OutputImageRawFrame, SpriteFrame, @@ -650,7 +651,7 @@ class SmallWebRTCInputTransport(BaseInputTransport): message: The application message to process. """ logger.debug(f"Received app message inside SmallWebRTCInputTransport {message}") - frame = TransportMessageUrgentFrame(message=message) + frame = InputTransportMessageUrgentFrame(message=message) await self.push_frame(frame) # Add this method similar to DailyInputTransport.request_participant_image