From 278a2fed56040098dd0e21a70fe503915a43500a Mon Sep 17 00:00:00 2001 From: TomTom101 Date: Tue, 28 May 2024 10:51:42 +0200 Subject: [PATCH 1/7] wip: First stab at langchain support Is this a service or processor? How to deal with conversation history? LC has sophisticated means of this, but might get in the way of `LLMResponseAggregator` --- .../07b-interruptible-langchain.py | 111 ++++++++++++++++++ pyproject.toml | 1 + src/pipecat/services/langchain.py | 62 ++++++++++ tests/test_langchain.py | 57 +++++++++ 4 files changed, 231 insertions(+) create mode 100644 examples/foundational/07b-interruptible-langchain.py create mode 100644 src/pipecat/services/langchain.py create mode 100644 tests/test_langchain.py diff --git a/examples/foundational/07b-interruptible-langchain.py b/examples/foundational/07b-interruptible-langchain.py new file mode 100644 index 000000000..40082f994 --- /dev/null +++ b/examples/foundational/07b-interruptible-langchain.py @@ -0,0 +1,111 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio +import os +import sys + +import aiohttp +from dotenv import load_dotenv +from loguru import logger +from runner import configure + +from pipecat.frames.frames import LLMMessagesFrame +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineTask +from pipecat.processors.aggregators.llm_response import ( + LLMAssistantResponseAggregator, LLMUserResponseAggregator) +from pipecat.services.elevenlabs import ElevenLabsTTSService +from pipecat.services.langchain import LangchainProcessor +from pipecat.transports.services.daily import DailyParams, DailyTransport +from pipecat.vad.silero import SileroVADAnalyzer + +load_dotenv(override=True) + +try: + from langchain.prompts import ChatPromptTemplate + from langchain_openai import ChatOpenAI +except ModuleNotFoundError as e: + logger.exception( + "You need to `pip install langchain_openai` for this example. Also, be sure to set `OPENAI_API_KEY` in the environment variable." + ) + raise Exception(f"Missing module: {e}") + +logger.remove(0) +logger.add(sys.stderr, level="DEBUG") + + +async def main(room_url: str, token): + async with aiohttp.ClientSession() as session: + transport = DailyTransport( + room_url, + token, + "Respond bot", + DailyParams( + audio_out_enabled=True, + transcription_enabled=True, + vad_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + ), + ) + + tts = ElevenLabsTTSService( + aiohttp_session=session, + api_key=os.getenv("ELEVENLABS_API_KEY"), + voice_id=os.getenv("ELEVENLABS_VOICE_ID"), + ) + + llm = ChatOpenAI(model="gpt-4o", temperature=0.7) + prompt = ChatPromptTemplate.from_messages( + [ + ("system", + "Be nice and helpful. Answer very briefly and without special characters like `#` or `*`. Your response will be synthesized to voice and those characters will create unnatural sounds.", + ), + ("human", + "{input}"), + ]) + chain = prompt | llm + lc = LangchainProcessor(chain) + + tma_in = LLMUserResponseAggregator() + tma_out = LLMAssistantResponseAggregator() + + pipeline = Pipeline( + [ + transport.input(), # Transport user input + tma_in, # User responses + lc, # Langchain + tts, # TTS + transport.output(), # Transport bot output + tma_out, # Assistant spoken responses + ] + ) + + task = PipelineTask(pipeline, allow_interruptions=True) + + @transport.event_handler("on_first_participant_joined") + async def on_first_participant_joined(transport, participant): + transport.capture_participant_transcription(participant["id"]) + # Kick off the conversation. + # the `LLMMessagesFrame` will be picked up by the LangchainProcessor using + # only the content of the last message to inject it in the prompt defined + # above. So no role is required here. + messages = [( + { + "content": "Please briefly introduce yourself to the user." + } + )] + await task.queue_frames([LLMMessagesFrame(messages)]) + + runner = PipelineRunner() + + await runner.run(task) + + +if __name__ == "__main__": + (url, token) = configure() + asyncio.run(main(url, token)) diff --git a/pyproject.toml b/pyproject.toml index 23245cfdc..75b8c0f72 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,6 +40,7 @@ examples = [ "python-dotenv~=1.0.0", "flask~=3.0.3", "flask_cors~=4.0.1" ] fal = [ "fal-client~=0.4.0" ] google = [ "google-generativeai~=0.5.3" ] fireworks = [ "openai~=1.26.0" ] +langchain = [ "langchain~=0.2.1" ] local = [ "pyaudio~=0.2.0" ] moondream = [ "einops~=0.8.0", "timm~=0.9.16", "transformers~=4.40.2" ] openai = [ "openai~=1.26.0" ] diff --git a/src/pipecat/services/langchain.py b/src/pipecat/services/langchain.py new file mode 100644 index 000000000..6675005eb --- /dev/null +++ b/src/pipecat/services/langchain.py @@ -0,0 +1,62 @@ +import sys +from typing import Union + +from langchain_core.messages import AIMessageChunk +from langchain_core.runnables import Runnable +from loguru import logger + +from pipecat.frames.frames import (Frame, LLMFullResponseEndFrame, + LLMFullResponseStartFrame, LLMMessagesFrame, + LLMResponseEndFrame, LLMResponseStartFrame, + TextFrame) +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor + + +class LangchainProcessor(FrameProcessor): + def __init__(self, chain: Runnable, transcript_key: str = "input"): + super().__init__() + self._chain = chain + self._transcript_key = transcript_key + + async def process_frame(self, frame: Frame, direction: FrameDirection): + if isinstance(frame, LLMMessagesFrame): + # Messages are accumulated by the `LLMUserResponseAggregator` in a list of messages. + # The last one by the human is the one we want to send to the LLM. + logger.debug(f"Got transcription frame {frame}") + text: str = frame.messages[-1]["content"] + + await self._ainvoke(text.strip()) + else: + await self.push_frame(frame) + + async def _invoke(self, text: str): + response = await self._chain.ainvoke({self._transcript_key: text}) + await self.push_frame(LLMFullResponseStartFrame()) + await self.push_frame(TextFrame(response)) + await self.push_frame(LLMFullResponseEndFrame()) + + @staticmethod + def __get_token_value(text: Union[str, AIMessageChunk]) -> str | None: + match text: + case str(): + return text + case AIMessageChunk(): + return text.content + case _: + return None + + async def _ainvoke(self, text: str): + logger.debug(f"Invoking chain with {text}") + await self.push_frame(LLMFullResponseStartFrame()) + try: + async for token in self._chain.astream({self._transcript_key: text}): + await self.push_frame(LLMResponseStartFrame()) + await self.push_frame(TextFrame(self.__get_token_value(token))) + await self.push_frame(LLMResponseEndFrame()) + except GeneratorExit: + logger.warning("Generator was closed prematurely") + raise # Re-raise to ensure proper generator closure + except Exception as e: + logger.error(f"An unknown error occurred: {e}") + raise + await self.push_frame(LLMFullResponseEndFrame()) diff --git a/tests/test_langchain.py b/tests/test_langchain.py new file mode 100644 index 000000000..e204c56a2 --- /dev/null +++ b/tests/test_langchain.py @@ -0,0 +1,57 @@ +import pytest +from langchain.prompts import ChatPromptTemplate +from langchain_core.language_models import FakeStreamingListLLM + +from pipecat.frames.frames import (StopTaskFrame, TranscriptionFrame, + UserStartedSpeakingFrame, + UserStoppedSpeakingFrame) +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineTask +from pipecat.processors.aggregators.llm_response import ( + LLMAssistantResponseAggregator, LLMUserResponseAggregator) +from pipecat.processors.logger import FrameLogger +from pipecat.services.langchain import LangchainProcessor + + +@pytest.fixture +def fake_llm(): + responses = ["Hello dear human"] + return FakeStreamingListLLM(responses=responses) + + +@pytest.mark.asyncio +async def test_langchain(fake_llm: FakeStreamingListLLM): + fl_in = FrameLogger("Inner") + fl_out = FrameLogger("Outer") + + messages = [("system", "Say hello to {name}"), ("human", "{input}")] + prompt = ChatPromptTemplate.from_messages(messages).partial(name="Thomas") + chain = prompt | fake_llm + proc = LangchainProcessor(chain=chain) + + tma_in = LLMUserResponseAggregator(messages) + tma_out = LLMAssistantResponseAggregator(messages) + + pipeline = Pipeline( + [ + fl_in, + tma_in, + proc, + tma_out, + fl_out, + ] + ) + + task = PipelineTask(pipeline) + await task.queue_frames( + [ + UserStartedSpeakingFrame(), + TranscriptionFrame(text="Hi World", user_id="user", timestamp="now"), + UserStoppedSpeakingFrame(), + StopTaskFrame(), + ] + ) + + runner = PipelineRunner() + await runner.run(task) From 6d24e836b029b8a6ee155f572833cf187aebf806 Mon Sep 17 00:00:00 2001 From: TomTom101 Date: Tue, 28 May 2024 11:31:36 +0200 Subject: [PATCH 2/7] wip: Example using LC message history --- .../07b-interruptible-langchain.py | 34 +++++++++++++++---- src/pipecat/services/langchain.py | 14 ++++++-- 2 files changed, 39 insertions(+), 9 deletions(-) diff --git a/examples/foundational/07b-interruptible-langchain.py b/examples/foundational/07b-interruptible-langchain.py index 40082f994..cb375fd0f 100644 --- a/examples/foundational/07b-interruptible-langchain.py +++ b/examples/foundational/07b-interruptible-langchain.py @@ -4,6 +4,7 @@ # SPDX-License-Identifier: BSD 2-Clause License # + import asyncio import os import sys @@ -27,8 +28,13 @@ from pipecat.vad.silero import SileroVADAnalyzer load_dotenv(override=True) try: - from langchain.prompts import ChatPromptTemplate + from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder + from langchain_community.chat_message_histories import ChatMessageHistory + from langchain_core.chat_history import BaseChatMessageHistory + from langchain_core.runnables.history import (BaseChatMessageHistory, + RunnableWithMessageHistory) from langchain_openai import ChatOpenAI + except ModuleNotFoundError as e: logger.exception( "You need to `pip install langchain_openai` for this example. Also, be sure to set `OPENAI_API_KEY` in the environment variable." @@ -38,6 +44,14 @@ except ModuleNotFoundError as e: logger.remove(0) logger.add(sys.stderr, level="DEBUG") +message_store = {} + + +def get_session_history(session_id: str) -> BaseChatMessageHistory: + if session_id not in message_store: + message_store[session_id] = ChatMessageHistory() + return message_store[session_id] + async def main(room_url: str, token): async with aiohttp.ClientSession() as session: @@ -59,17 +73,22 @@ async def main(room_url: str, token): voice_id=os.getenv("ELEVENLABS_VOICE_ID"), ) - llm = ChatOpenAI(model="gpt-4o", temperature=0.7) prompt = ChatPromptTemplate.from_messages( [ ("system", - "Be nice and helpful. Answer very briefly and without special characters like `#` or `*`. Your response will be synthesized to voice and those characters will create unnatural sounds.", + "Be nice and helpful. Answer very briefly and without special characters like `#` or `*`. " + "Your response will be synthesized to voice and those characters will create unnatural sounds.", ), - ("human", - "{input}"), + MessagesPlaceholder("chat_history"), + ("human", "{input}"), ]) - chain = prompt | llm - lc = LangchainProcessor(chain) + chain = prompt | ChatOpenAI(model="gpt-4o", temperature=0.7) + history_chain = RunnableWithMessageHistory( + chain, + get_session_history, + history_messages_key="chat_history", + input_messages_key="input") + lc = LangchainProcessor(history_chain) tma_in = LLMUserResponseAggregator() tma_out = LLMAssistantResponseAggregator() @@ -90,6 +109,7 @@ async def main(room_url: str, token): @transport.event_handler("on_first_participant_joined") async def on_first_participant_joined(transport, participant): transport.capture_participant_transcription(participant["id"]) + lc.set_participant_id(participant["id"]) # Kick off the conversation. # the `LLMMessagesFrame` will be picked up by the LangchainProcessor using # only the content of the last message to inject it in the prompt defined diff --git a/src/pipecat/services/langchain.py b/src/pipecat/services/langchain.py index 6675005eb..2042071a7 100644 --- a/src/pipecat/services/langchain.py +++ b/src/pipecat/services/langchain.py @@ -17,6 +17,10 @@ class LangchainProcessor(FrameProcessor): super().__init__() self._chain = chain self._transcript_key = transcript_key + self._participant_id: str | None = None + + def set_participant_id(self, participant_id: str): + self._participant_id = participant_id async def process_frame(self, frame: Frame, direction: FrameDirection): if isinstance(frame, LLMMessagesFrame): @@ -30,7 +34,10 @@ class LangchainProcessor(FrameProcessor): await self.push_frame(frame) async def _invoke(self, text: str): - response = await self._chain.ainvoke({self._transcript_key: text}) + response = await self._chain.ainvoke( + {self._transcript_key: text}, + config={"configurable": {"session_id": self._participant_id}}, + ) await self.push_frame(LLMFullResponseStartFrame()) await self.push_frame(TextFrame(response)) await self.push_frame(LLMFullResponseEndFrame()) @@ -49,7 +56,10 @@ class LangchainProcessor(FrameProcessor): logger.debug(f"Invoking chain with {text}") await self.push_frame(LLMFullResponseStartFrame()) try: - async for token in self._chain.astream({self._transcript_key: text}): + async for token in self._chain.astream( + {self._transcript_key: text}, + config={"configurable": {"session_id": self._participant_id}}, + ): await self.push_frame(LLMResponseStartFrame()) await self.push_frame(TextFrame(self.__get_token_value(token))) await self.push_frame(LLMResponseEndFrame()) From 335990c1453b0160eee868db2a65e734e63cf890 Mon Sep 17 00:00:00 2001 From: TomTom101 Date: Tue, 28 May 2024 11:42:17 +0200 Subject: [PATCH 3/7] wip: hint to install langchain_community --- examples/foundational/07b-interruptible-langchain.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/examples/foundational/07b-interruptible-langchain.py b/examples/foundational/07b-interruptible-langchain.py index cb375fd0f..c1c467f9f 100644 --- a/examples/foundational/07b-interruptible-langchain.py +++ b/examples/foundational/07b-interruptible-langchain.py @@ -31,13 +31,12 @@ try: from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder from langchain_community.chat_message_histories import ChatMessageHistory from langchain_core.chat_history import BaseChatMessageHistory - from langchain_core.runnables.history import (BaseChatMessageHistory, - RunnableWithMessageHistory) + from langchain_core.runnables.history import RunnableWithMessageHistory from langchain_openai import ChatOpenAI except ModuleNotFoundError as e: logger.exception( - "You need to `pip install langchain_openai` for this example. Also, be sure to set `OPENAI_API_KEY` in the environment variable." + "You need to `pip install langchain_openai langchain_community` for this example. Also, be sure to set `OPENAI_API_KEY` in the environment variable." ) raise Exception(f"Missing module: {e}") From 143033d7db7ea0778d6150ebfb2f526b1b3cc689 Mon Sep 17 00:00:00 2001 From: TomTom101 Date: Tue, 28 May 2024 16:07:40 +0200 Subject: [PATCH 4/7] fix: install langchain-community with the langchain extra --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 75b8c0f72..6555623e4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,7 +40,7 @@ examples = [ "python-dotenv~=1.0.0", "flask~=3.0.3", "flask_cors~=4.0.1" ] fal = [ "fal-client~=0.4.0" ] google = [ "google-generativeai~=0.5.3" ] fireworks = [ "openai~=1.26.0" ] -langchain = [ "langchain~=0.2.1" ] +langchain = [ "langchain~=0.2.1", "langchain-community~=0.2.1" ] local = [ "pyaudio~=0.2.0" ] moondream = [ "einops~=0.8.0", "timm~=0.9.16", "transformers~=4.40.2" ] openai = [ "openai~=1.26.0" ] From 2bf094b950b426b80734e32601593fc3a665886c Mon Sep 17 00:00:00 2001 From: TomTom101 Date: Thu, 30 May 2024 10:43:33 +0200 Subject: [PATCH 5/7] test(langchain): Rewrite to unittest, make it meaningful --- tests/test_langchain.py | 113 +++++++++++++++++++++++++--------------- 1 file changed, 71 insertions(+), 42 deletions(-) diff --git a/tests/test_langchain.py b/tests/test_langchain.py index e204c56a2..0f3ccdc86 100644 --- a/tests/test_langchain.py +++ b/tests/test_langchain.py @@ -1,8 +1,11 @@ -import pytest +import unittest + from langchain.prompts import ChatPromptTemplate from langchain_core.language_models import FakeStreamingListLLM -from pipecat.frames.frames import (StopTaskFrame, TranscriptionFrame, +from pipecat.frames.frames import (LLMFullResponseEndFrame, + LLMFullResponseStartFrame, StopTaskFrame, + TextFrame, TranscriptionFrame, UserStartedSpeakingFrame, UserStoppedSpeakingFrame) from pipecat.pipeline.pipeline import Pipeline @@ -10,48 +13,74 @@ from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineTask from pipecat.processors.aggregators.llm_response import ( LLMAssistantResponseAggregator, LLMUserResponseAggregator) -from pipecat.processors.logger import FrameLogger +from pipecat.processors.frame_processor import FrameProcessor from pipecat.services.langchain import LangchainProcessor -@pytest.fixture -def fake_llm(): - responses = ["Hello dear human"] - return FakeStreamingListLLM(responses=responses) +class TestLangchain(unittest.IsolatedAsyncioTestCase): + + class MockProcessor(FrameProcessor): + def __init__(self, name): + self.name = name + self.token: list[str] = [] + # Start collecting tokens when we see the start frame + self.start_collecting = False + + def __str__(self): + return self.name + + async def process_frame(self, frame, direction): + if isinstance(frame, LLMFullResponseStartFrame): + self.start_collecting = True + elif isinstance(frame, TextFrame) and self.start_collecting: + self.token.append(frame.text) + elif isinstance(frame, LLMFullResponseEndFrame): + self.start_collecting = False + + await self.push_frame(frame, direction) + + def setUp(self): + self.expected_response = "Hello dear human" + self.fake_llm = FakeStreamingListLLM(responses=[self.expected_response]) + self.mock_proc = self.MockProcessor("token_collector") + + async def test_langchain(self): + + messages = [("system", "Say hello to {name}"), ("human", "{input}")] + prompt = ChatPromptTemplate.from_messages(messages).partial(name="Thomas") + chain = prompt | self.fake_llm + proc = LangchainProcessor(chain=chain) + + tma_in = LLMUserResponseAggregator(messages) + tma_out = LLMAssistantResponseAggregator(messages) + + pipeline = Pipeline( + [ + tma_in, + proc, + self.mock_proc, + tma_out, + ] + ) + + task = PipelineTask(pipeline) + await task.queue_frames( + [ + UserStartedSpeakingFrame(), + TranscriptionFrame(text="Hi World", user_id="user", timestamp="now"), + UserStoppedSpeakingFrame(), + StopTaskFrame(), + ] + ) + + runner = PipelineRunner() + await runner.run(task) + self.assertEqual("".join(self.mock_proc.token), self.expected_response) + # TODO: Address this issue + # This next one would fail with: + # AssertionError: ' H e l l o d e a r h u m a n' != 'Hello dear human' + # self.assertEqual(tma_out.messages[-1]["content"], self.expected_response) -@pytest.mark.asyncio -async def test_langchain(fake_llm: FakeStreamingListLLM): - fl_in = FrameLogger("Inner") - fl_out = FrameLogger("Outer") - - messages = [("system", "Say hello to {name}"), ("human", "{input}")] - prompt = ChatPromptTemplate.from_messages(messages).partial(name="Thomas") - chain = prompt | fake_llm - proc = LangchainProcessor(chain=chain) - - tma_in = LLMUserResponseAggregator(messages) - tma_out = LLMAssistantResponseAggregator(messages) - - pipeline = Pipeline( - [ - fl_in, - tma_in, - proc, - tma_out, - fl_out, - ] - ) - - task = PipelineTask(pipeline) - await task.queue_frames( - [ - UserStartedSpeakingFrame(), - TranscriptionFrame(text="Hi World", user_id="user", timestamp="now"), - UserStoppedSpeakingFrame(), - StopTaskFrame(), - ] - ) - - runner = PipelineRunner() - await runner.run(task) +if __name__ == "__main__": + unittest.main() From b19243ab75219bc7fd51694ad0dc88ebcfa1dbe3 Mon Sep 17 00:00:00 2001 From: TomTom101 Date: Thu, 30 May 2024 10:53:42 +0200 Subject: [PATCH 6/7] fix: corrected hint to install Langchain libs --- examples/foundational/07b-interruptible-langchain.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/foundational/07b-interruptible-langchain.py b/examples/foundational/07b-interruptible-langchain.py index c1c467f9f..2ba4b6385 100644 --- a/examples/foundational/07b-interruptible-langchain.py +++ b/examples/foundational/07b-interruptible-langchain.py @@ -36,7 +36,7 @@ try: except ModuleNotFoundError as e: logger.exception( - "You need to `pip install langchain_openai langchain_community` for this example. Also, be sure to set `OPENAI_API_KEY` in the environment variable." + "You need to `pip install langchain langchain-openai langchain-community` for this example. Also, be sure to set `OPENAI_API_KEY` in the environment variable." ) raise Exception(f"Missing module: {e}") From 4b39efeee31267152e40fc01a023c2639746a007 Mon Sep 17 00:00:00 2001 From: TomTom101 Date: Fri, 31 May 2024 10:19:27 +0200 Subject: [PATCH 7/7] fix(langchain): try/catch langchain import in service; Only `langchain` is installed with the [langchain] extra (#190) --- .../foundational/07b-interruptible-langchain.py | 14 +++++++------- pyproject.toml | 2 +- src/pipecat/services/langchain.py | 11 +++++++++-- 3 files changed, 17 insertions(+), 10 deletions(-) diff --git a/examples/foundational/07b-interruptible-langchain.py b/examples/foundational/07b-interruptible-langchain.py index 2ba4b6385..5e32964f9 100644 --- a/examples/foundational/07b-interruptible-langchain.py +++ b/examples/foundational/07b-interruptible-langchain.py @@ -36,7 +36,7 @@ try: except ModuleNotFoundError as e: logger.exception( - "You need to `pip install langchain langchain-openai langchain-community` for this example. Also, be sure to set `OPENAI_API_KEY` in the environment variable." + "In order to run this example you need to `pip install pipecat-ai[langchain] langchain-community langchain-openai. Also, be sure to set `OPENAI_API_KEY` in the environment variable." ) raise Exception(f"Missing module: {e}") @@ -94,12 +94,12 @@ async def main(room_url: str, token): pipeline = Pipeline( [ - transport.input(), # Transport user input - tma_in, # User responses - lc, # Langchain - tts, # TTS - transport.output(), # Transport bot output - tma_out, # Assistant spoken responses + transport.input(), # Transport user input + tma_in, # User responses + lc, # Langchain + tts, # TTS + transport.output(), # Transport bot output + tma_out, # Assistant spoken responses ] ) diff --git a/pyproject.toml b/pyproject.toml index 6555623e4..75b8c0f72 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,7 +40,7 @@ examples = [ "python-dotenv~=1.0.0", "flask~=3.0.3", "flask_cors~=4.0.1" ] fal = [ "fal-client~=0.4.0" ] google = [ "google-generativeai~=0.5.3" ] fireworks = [ "openai~=1.26.0" ] -langchain = [ "langchain~=0.2.1", "langchain-community~=0.2.1" ] +langchain = [ "langchain~=0.2.1" ] local = [ "pyaudio~=0.2.0" ] moondream = [ "einops~=0.8.0", "timm~=0.9.16", "transformers~=4.40.2" ] openai = [ "openai~=1.26.0" ] diff --git a/src/pipecat/services/langchain.py b/src/pipecat/services/langchain.py index 2042071a7..83522a16c 100644 --- a/src/pipecat/services/langchain.py +++ b/src/pipecat/services/langchain.py @@ -1,8 +1,6 @@ import sys from typing import Union -from langchain_core.messages import AIMessageChunk -from langchain_core.runnables import Runnable from loguru import logger from pipecat.frames.frames import (Frame, LLMFullResponseEndFrame, @@ -11,6 +9,15 @@ from pipecat.frames.frames import (Frame, LLMFullResponseEndFrame, TextFrame) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +try: + from langchain_core.messages import AIMessageChunk + from langchain_core.runnables import Runnable +except ModuleNotFoundError as e: + logger.exception( + "In order to use Langchain, you need to `pip install pipecat-ai[langchain]`. " + ) + raise Exception(f"Missing module: {e}") + class LangchainProcessor(FrameProcessor): def __init__(self, chain: Runnable, transcript_key: str = "input"):