commit 53d2f5233d3f0cb204135c08d6bd59c3195869be Author: Xin Wang Date: Thu May 21 13:08:40 2026 +0800 first version diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..93cf77a --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +.venv/ +__pycache__/ +*.py[cod] +.env + diff --git a/README.md b/README.md new file mode 100644 index 0000000..3507075 --- /dev/null +++ b/README.md @@ -0,0 +1,171 @@ +# AI VideoAssistant Engine v5 Pipecat Minimal + +This is a Pipecat-based rewrite of `AI-VideoAssistant-engine-v3-minimal` in a separate folder. + +It intentionally uses Pipecat's FastAPI websocket transport with `ProtobufFrameSerializer`. +The old v3-minimal base64 JSON audio protocol is not supported here. + +## Shape + +```text +FastAPI /ws +-> Pipecat FastAPIWebsocketTransport +-> OpenAI STT +-> LLM context aggregator +-> OpenAI LLM +-> OpenAI TTS +-> Pipecat websocket output +``` + +## Run + +```bash +cd AI-VideoAssistant-engine-v5-pipecat-minimal +uv venv .venv +source .venv/bin/activate +uv pip install -r requirements.txt +export OPENAI_API_KEY=... +uv run uvicorn engine.main:app --reload --host 0.0.0.0 --port 8001 +``` + +Or pass keys directly in `config.json`. + +```bash +uv run python -m engine.main --config ./config.json +``` + +## Protocols + +Pipecat-native endpoint: + +```text +ws://localhost:8001/ws +``` + +The websocket payloads are Pipecat protobuf frames. A client should use Pipecat's websocket client/serializer stack or generate frames compatible with `pipecat.frames.protobufs.frames_pb2`. + +Important defaults: + +- serializer: `ProtobufFrameSerializer` +- audio input: PCM16 mono +- sample rate: `16000` +- endpoint: `/ws` + +Product endpoint: + +```text +ws://localhost:8001/ws-product +``` + +This endpoint uses a stable JSON/base64 protocol named `va.ws.v1`. It is meant for browser, mobile, or other product applications that should not depend on Pipecat's internal protobuf frame schema.
+ +Start a session: + +```json +{ + "type": "session.start", + "protocol": "va.ws.v1", + "audio": { + "encoding": "pcm_s16le", + "sample_rate": 16000, + "channels": 1 + } +} +``` + +Send audio: + +```json +{ + "type": "input.audio", + "audio": "...base64-encoded PCM16 bytes...", + "sample_rate": 16000, + "channels": 1 +} +``` + +The adapter also accepts raw binary websocket messages as PCM16 audio chunks. JSON/base64 is easier to inspect; binary is better for latency and bandwidth. + +Stop: + +```json +{"type": "session.stop", "reason": "done"} +``` + +Cancel: + +```json +{"type": "response.cancel"} +``` + +Returned bot audio: + +```json +{ + "type": "response.audio.delta", + "protocol": "va.ws.v1", + "seq": 1, + "audio": "...base64-encoded PCM16 bytes...", + "bytes": 6400, + "sample_rate": 16000, + "channels": 1 +} +``` + +Returned transcripts and assistant text: + +```json +{"type": "input.transcript.final", "text": "What's the weather?", "user_id": "...", "timestamp": "..."} +{"type": "response.text.started"} +{"type": "response.text.delta", "text": "It's "} +{"type": "response.text.delta", "text": "sunny in "} +{"type": "response.text.delta", "text": "Berlin."} +{"type": "response.text.final", "text": "It's sunny in Berlin.", "interrupted": false} +``` + +`response.text.started` fires at the start of every assistant turn (LLM +streaming reply, or a fixed `TTSSpeakFrame` greeting). +`response.text.delta` events stream LLM token chunks as they're produced, +**ahead of the synthesized audio**, because the producer sits upstream of +the TTS in the pipeline. `response.text.final` fires when the turn ends, +carrying the full concatenated assistant text and an `interrupted` flag +(true when an `input.text` or barge-in cut the turn short). + +## Stream A WAV File + +Start the server: + +```bash +cd AI-VideoAssistant-engine-v5-pipecat-minimal +source .venv/bin/activate +export OPENAI_API_KEY=...
+uvicorn engine.main:app --host 127.0.0.1 --port 8001 +``` + +In another terminal, stream a WAV file through the product adapter: + +```bash +cd AI-VideoAssistant-engine-v5-pipecat-minimal +source .venv/bin/activate +python scripts/stream_wav_product_ws.py \ + ../AI-VideoAssistant-engine-v3-minimal/data/audio_examples/three_utterances_simple.wav \ + --url ws://127.0.0.1:8001/ws-product \ + --save-stereo-wav /tmp/product-conversation.wav +``` + +Or stream a WAV file as Pipecat protobuf audio frames: + +```bash +cd AI-VideoAssistant-engine-v5-pipecat-minimal +source .venv/bin/activate +python scripts/stream_wav_pipecat_ws.py \ + ../AI-VideoAssistant-engine-v3-minimal/data/audio_examples/three_utterances_simple.wav \ + --url ws://127.0.0.1:8001/ws \ + --save-stereo-wav /tmp/pipecat-conversation.wav +``` + +The input WAV must be PCM16 mono at 16 kHz. The script sends 20 ms chunks in real time and prints any text, transcription, message, or audio frames returned by the server. + +## Notes + +This folder keeps the rewrite minimal on purpose. Dograh's useful pattern is the separation between app entrypoint, service factory, and pipeline builder; its workflow graph, database, telephony, recordings, and pricing layers are deliberately left out. diff --git a/config.json b/config.json new file mode 100644 index 0000000..ef49a80 --- /dev/null +++ b/config.json @@ -0,0 +1,43 @@ +{ + "server": { + "host": "0.0.0.0", + "port": 8001, + "cors_origins": ["http://localhost:3000", "http://localhost:8080"] + }, + "audio": { + "sample_rate_hz": 16000, + "channels": 1, + "frame_ms": 20 + }, + "session": { + "inactivity_timeout_sec": 60 + }, + "agent": { + "system_prompt": "You are a helpful, friendly voice assistant. 
Keep responses concise and natural for spoken conversation.", + "greeting": "Please introduce yourself briefly.", + "greeting_mode": "generated" + }, + "services": { + "stt": { + "provider": "openai", + "api_key": "", + "base_url": null, + "model": "gpt-4o-mini-transcribe", + "language": "en" + }, + "llm": { + "provider": "openai", + "api_key": "", + "base_url": null, + "model": "gpt-4o-mini", + "temperature": 0.7 + }, + "tts": { + "provider": "openai", + "api_key": "", + "base_url": null, + "model": "gpt-4o-mini-tts", + "voice": "alloy" + } + } +} diff --git a/config/openai.example.json b/config/openai.example.json new file mode 100644 index 0000000..bdc5548 --- /dev/null +++ b/config/openai.example.json @@ -0,0 +1,43 @@ +{ + "server": { + "host": "0.0.0.0", + "port": 8001, + "cors_origins": ["*"] + }, + "audio": { + "sample_rate_hz": 16000, + "channels": 1, + "frame_ms": 20 + }, + "session": { + "inactivity_timeout_sec": 60 + }, + "agent": { + "system_prompt": "You are a concise voice assistant.", + "greeting": "Please say hello in one short sentence.", + "greeting_mode": "generated" + }, + "services": { + "stt": { + "provider": "openai", + "api_key": "", + "base_url": null, + "model": "gpt-4o-mini-transcribe", + "language": "en" + }, + "llm": { + "provider": "openai", + "api_key": "", + "base_url": null, + "model": "gpt-4o-mini", + "temperature": 0.7 + }, + "tts": { + "provider": "openai", + "api_key": "", + "base_url": null, + "model": "gpt-4o-mini-tts", + "voice": "alloy" + } + } +} diff --git a/config/siliconflow.json b/config/siliconflow.json new file mode 100644 index 0000000..4a78706 --- /dev/null +++ b/config/siliconflow.json @@ -0,0 +1,44 @@ +{ + "server": { + "host": "0.0.0.0", + "port": 8001, + "cors_origins": ["*"] + }, + "audio": { + "sample_rate_hz": 16000, + "channels": 1, + "frame_ms": 20 + }, + "session": { + "inactivity_timeout_sec": 60 + }, + "agent": { + "system_prompt": "你是一个有用的语音对话助手名字叫小白", + "greeting": "你好,我是小白,请问有什么可以帮你?", +
"greeting_mode": "fixed" + }, + "services": { + "stt": { + "provider": "openai", + "api_key": "REPLACE_WITH_SILICONFLOW_API_KEY", + "base_url": "https://api.siliconflow.cn/v1", + "model": "TeleAI/TeleSpeechASR", + "language": "zh" + }, + "llm": { + "provider": "openai", + "api_key": "REPLACE_WITH_QNAIGC_API_KEY", + "base_url": "https://api.qnaigc.com/v1", + "model": "deepseek-v3", + "temperature": 0.7 + }, + "tts": { + "provider": "openai", + "api_key": "REPLACE_WITH_SILICONFLOW_API_KEY", + "base_url": "https://api.siliconflow.cn/v1", + "model": "FunAudioLLM/CosyVoice2-0.5B", + "voice": "FunAudioLLM/CosyVoice2-0.5B:anna", + "source_sample_rate_hz": 24000 + } + } +} diff --git a/engine/__init__.py b/engine/__init__.py new file mode 100644 index 0000000..711efd5 --- /dev/null +++ b/engine/__init__.py @@ -0,0 +1,2 @@ +"""Minimal Pipecat-based voice engine.""" + diff --git a/engine/config.py b/engine/config.py new file mode 100644 index 0000000..937e506 --- /dev/null +++ b/engine/config.py @@ -0,0 +1,118 @@ +from __future__ import annotations + +import json +from dataclasses import dataclass, field +from pathlib import Path + + +@dataclass(frozen=True) +class ServerConfig: + host: str = "0.0.0.0" + port: int = 8000 + cors_origins: list[str] = field(default_factory=list) + + +@dataclass(frozen=True) +class AudioConfig: + sample_rate_hz: int = 16000 + channels: int = 1 + frame_ms: int = 20 + + @property + def frame_bytes(self) -> int: + return int(self.sample_rate_hz * self.frame_ms / 1000) * self.channels * 2 + + +@dataclass(frozen=True) +class SessionConfig: + inactivity_timeout_sec: int = 60 + + +@dataclass(frozen=True) +class AgentConfig: + system_prompt: str = "You are a helpful, friendly voice assistant."
+ greeting: str | None = None + greeting_mode: str = "generated" + + +@dataclass(frozen=True) +class LLMConfig: + provider: str = "openai" + api_key: str = "" + base_url: str | None = None + model: str = "gpt-4o-mini" + temperature: float | None = 0.7 + + +@dataclass(frozen=True) +class STTConfig: + provider: str = "openai" + api_key: str = "" + base_url: str | None = None + model: str = "gpt-4o-mini-transcribe" + language: str | None = "en" + + +@dataclass(frozen=True) +class TTSConfig: + provider: str = "openai" + api_key: str = "" + base_url: str | None = None + model: str = "gpt-4o-mini-tts" + voice: str = "alloy" + source_sample_rate_hz: int | None = None + + +@dataclass(frozen=True) +class ServicesConfig: + llm: LLMConfig = field(default_factory=LLMConfig) + stt: STTConfig = field(default_factory=STTConfig) + tts: TTSConfig = field(default_factory=TTSConfig) + + +@dataclass(frozen=True) +class EngineConfig: + server: ServerConfig = field(default_factory=ServerConfig) + audio: AudioConfig = field(default_factory=AudioConfig) + session: SessionConfig = field(default_factory=SessionConfig) + agent: AgentConfig = field(default_factory=AgentConfig) + services: ServicesConfig = field(default_factory=ServicesConfig) + + +def load_config(path: str | Path = "config.json") -> EngineConfig: + config_path = Path(path) + if not config_path.exists() and str(path) == "config.json": + config_path = Path(__file__).resolve().parent.parent / "config.json" + data = json.loads(config_path.read_text(encoding="utf-8")) + if not isinstance(data, dict): + raise ValueError(f"Config file must contain a JSON object: {config_path}") + return config_from_dict(data) + + +def config_from_dict(data: dict) -> EngineConfig: + services = _dict(data.get("services")) + agent = _dict(data.get("agent")) + if agent.get("greeting") == "": + agent["greeting"] = None + if agent.get("greeting_mode") not in (None, "generated", "fixed", "off"): + raise ValueError("agent.greeting_mode must be one of: 
generated, fixed, off") + + stt = _dict(services.get("stt") or services.get("asr")) + if stt.get("language") == "": + stt["language"] = None + + return EngineConfig( + server=ServerConfig(**_dict(data.get("server"))), + audio=AudioConfig(**_dict(data.get("audio"))), + session=SessionConfig(**_dict(data.get("session"))), + agent=AgentConfig(**agent), + services=ServicesConfig( + llm=LLMConfig(**_dict(services.get("llm"))), + stt=STTConfig(**stt), + tts=TTSConfig(**_dict(services.get("tts"))), + ), + ) + + +def _dict(value: object) -> dict: + return dict(value) if isinstance(value, dict) else {} diff --git a/engine/main.py b/engine/main.py new file mode 100644 index 0000000..b37064b --- /dev/null +++ b/engine/main.py @@ -0,0 +1,80 @@ +from __future__ import annotations + +import argparse +from functools import lru_cache + +from fastapi import FastAPI, WebSocket +from fastapi.middleware.cors import CORSMiddleware + +from .config import EngineConfig, load_config +from .pipeline import run_product_voice_pipeline, run_voice_pipeline + + +@lru_cache(maxsize=8) +def get_config(path: str = "config.json") -> EngineConfig: + return load_config(path) + + +def create_app(config_path: str = "config.json") -> FastAPI: + config = get_config(config_path) + app = FastAPI(title="AI VideoAssistant Engine v5 Pipecat Minimal", version="0.1.0") + app.state.config = config + + app.add_middleware( + CORSMiddleware, + allow_origins=config.server.cors_origins, + allow_credentials="*" not in config.server.cors_origins,  # never pair credentials with wildcard origins + allow_methods=["*"], + allow_headers=["*"], + ) + + @app.get("/health") + async def health() -> dict[str, object]: + return { + "status": "healthy", + "protocols": { + "/ws": "pipecat.websocket.protobuf", + "/ws-product": "va.ws.v1.json_base64", + }, + "features": { + "product_text_input": True, + "product_text_interrupt": True, + }, + "llm_provider": config.services.llm.provider, + "stt_provider": config.services.stt.provider, + "tts_provider": config.services.tts.provider, + } + + @app.websocket("/ws") +
async def websocket_endpoint(websocket: WebSocket) -> None: + await websocket.accept() + await run_voice_pipeline(websocket, config) + + @app.websocket("/ws-product") + async def product_websocket_endpoint(websocket: WebSocket) -> None: + await websocket.accept() + await run_product_voice_pipeline(websocket, config) + + return app + + +app = create_app() + + +def main() -> None: + import uvicorn + + parser = argparse.ArgumentParser(description="Run the minimal Pipecat voice engine.") + parser.add_argument("--config", default="config.json") + args = parser.parse_args() + + config = load_config(args.config) + uvicorn.run( + create_app(args.config), + host=config.server.host, + port=config.server.port, + ) + + +if __name__ == "__main__": + main() diff --git a/engine/pipeline.py b/engine/pipeline.py new file mode 100644 index 0000000..b7eb525 --- /dev/null +++ b/engine/pipeline.py @@ -0,0 +1,160 @@ +from __future__ import annotations + +from loguru import logger + +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.frames.frames import ( + LLMRunFrame, + OutputTransportMessageUrgentFrame, + TTSSpeakFrame, +) +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.llm_context import LLMContext +from pipecat.processors.aggregators.llm_response_universal import ( + AssistantTurnStoppedMessage, + LLMContextAggregatorPair, + LLMUserAggregatorParams, + UserTurnStoppedMessage, +) +from pipecat.serializers.protobuf import ProtobufFrameSerializer +from pipecat.serializers.base_serializer import FrameSerializer +from pipecat.transports.websocket.fastapi import ( + FastAPIWebsocketParams, + FastAPIWebsocketTransport, +) + +from .config import EngineConfig +from .product_protocol import ProductWebsocketSerializer +from .services import create_llm_service, create_stt_service, create_tts_service +from .text_input import 
ProductTextInputProcessor +from .text_stream import ProductTextStreamProcessor + + +async def run_voice_pipeline(websocket, config: EngineConfig) -> None: + await run_pipeline_with_serializer( + websocket, + config, + serializer=ProtobufFrameSerializer(), + client_label="Pipecat protobuf", + ) + + +async def run_product_voice_pipeline(websocket, config: EngineConfig) -> None: + await run_pipeline_with_serializer( + websocket, + config, + serializer=ProductWebsocketSerializer( + sample_rate=config.audio.sample_rate_hz, + channels=config.audio.channels, + ), + client_label="Product JSON", + ) + + +async def run_pipeline_with_serializer( + websocket, + config: EngineConfig, + *, + serializer: FrameSerializer, + client_label: str, +) -> None: + transport = FastAPIWebsocketTransport( + websocket=websocket, + params=FastAPIWebsocketParams( + audio_in_enabled=True, + audio_out_enabled=True, + audio_in_sample_rate=config.audio.sample_rate_hz, + audio_out_sample_rate=config.audio.sample_rate_hz, + audio_in_channels=config.audio.channels, + audio_out_channels=config.audio.channels, + serializer=serializer, + session_timeout=None, + ), + ) + + stt = create_stt_service(config.services.stt) + llm = create_llm_service(config.services.llm) + tts = create_tts_service(config.services.tts, config.audio) + + messages = [{"role": "developer", "content": config.agent.system_prompt}] + if config.agent.greeting and config.agent.greeting_mode == "generated": + messages.append({"role": "developer", "content": config.agent.greeting}) + + context = LLMContext(messages) + user_aggregator, assistant_aggregator = LLMContextAggregatorPair( + context, + user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()), + ) + + pipeline = Pipeline( + [ + transport.input(), + ProductTextInputProcessor(), + stt, + user_aggregator, + llm, + ProductTextStreamProcessor(), + tts, + transport.output(), + assistant_aggregator, + ] + ) + + task = PipelineTask( + pipeline, + params=PipelineParams( + 
audio_in_sample_rate=config.audio.sample_rate_hz, + audio_out_sample_rate=config.audio.sample_rate_hz, + enable_metrics=True, + enable_usage_metrics=True, + enable_heartbeats=True, + ), + idle_timeout_secs=config.session.inactivity_timeout_sec, + ) + + @transport.event_handler("on_client_connected") + async def on_client_connected(_transport, _client): + logger.info(f"{client_label} websocket client connected") + if config.agent.greeting_mode == "fixed" and config.agent.greeting: + await task.queue_frames([TTSSpeakFrame(config.agent.greeting)]) + elif config.agent.greeting_mode == "generated": + await task.queue_frames([LLMRunFrame()]) + + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(_transport, _client): + logger.info(f"{client_label} websocket client disconnected") + await task.cancel() + + @transport.event_handler("on_session_timeout") + async def on_session_timeout(_transport, _client): + logger.info(f"{client_label} websocket session timed out") + await task.cancel() + + @user_aggregator.event_handler("on_user_turn_stopped") + async def on_user_turn_stopped(_aggregator, _strategy, message: UserTurnStoppedMessage): + logger.info(f"User: {message.content}") + text = (message.content or "").strip() + if not text: + return + await task.queue_frame( + OutputTransportMessageUrgentFrame( + message={ + "type": "input.transcript.final", + "text": text, + "user_id": message.user_id, + "timestamp": message.timestamp, + } + ) + ) + + # NOTE: assistant turn started/final events are emitted by + # ProductTextStreamProcessor, upstream of TTS, so text streams to the + # client ahead of audio. This logger is kept for server-side visibility. 
+ @assistant_aggregator.event_handler("on_assistant_turn_stopped") + async def on_assistant_turn_stopped(_aggregator, message: AssistantTurnStoppedMessage): + logger.info(f"Assistant: {message.content}") + + runner = PipelineRunner(handle_sigint=False) + await runner.run(task) diff --git a/engine/product_protocol.py b/engine/product_protocol.py new file mode 100644 index 0000000..6f961ac --- /dev/null +++ b/engine/product_protocol.py @@ -0,0 +1,164 @@ +from __future__ import annotations + +import base64 +import json +from typing import Any + +from loguru import logger + +from pipecat.frames.frames import ( + CancelFrame, + BotStartedSpeakingFrame, + BotStoppedSpeakingFrame, + EndFrame, + Frame, + InputAudioRawFrame, + InputTransportMessageFrame, + OutputAudioRawFrame, + OutputTransportMessageFrame, + OutputTransportMessageUrgentFrame, + TextFrame, + TranscriptionFrame, +) +from pipecat.serializers.base_serializer import FrameSerializer + + +class ProductWebsocketSerializer(FrameSerializer): + """Stable app-facing JSON/base64 protocol adapter for Pipecat websocket transport.""" + + protocol = "va.ws.v1" + + def __init__(self, *, sample_rate: int, channels: int): + super().__init__() + self._sample_rate = sample_rate + self._channels = channels + self._sequence = 0 + + async def serialize(self, frame: Frame) -> str | bytes | None: + if isinstance(frame, OutputAudioRawFrame): + return self._event( + "response.audio.delta", + audio=base64.b64encode(frame.audio).decode("ascii"), + bytes=len(frame.audio), + sample_rate=frame.sample_rate, + channels=frame.num_channels, + ) + + if isinstance(frame, BotStartedSpeakingFrame): + return self._event("response.audio.started") + + if isinstance(frame, BotStoppedSpeakingFrame): + return self._event("response.audio.stopped") + + if isinstance(frame, TranscriptionFrame): + return self._event( + "input.transcript.final", + text=frame.text, + user_id=frame.user_id, + timestamp=frame.timestamp, + ) + + if isinstance(frame, TextFrame): 
+ return self._event("response.text.delta", text=frame.text) + + if isinstance(frame, (OutputTransportMessageFrame, OutputTransportMessageUrgentFrame)): + if self.should_ignore_frame(frame): + return None + message = frame.message + # Allow callers to emit any named protocol event by pushing a + # transport-message frame whose payload already carries a `type`. + # The payload's other fields are merged alongside `type`, so e.g. + # `{"type": "response.text.final", "text": "..."}` is sent verbatim. + if isinstance(message, dict) and isinstance(message.get("type"), str): + event_type = message["type"] + payload = {k: v for k, v in message.items() if k != "type"} + return self._event(event_type, **payload) + return self._event("transport.message", message=message) + + return None + + async def deserialize(self, data: str | bytes) -> Frame | None: + if isinstance(data, bytes): + return InputAudioRawFrame( + audio=data, + sample_rate=self._sample_rate, + num_channels=self._channels, + ) + + try: + message = json.loads(data) + except json.JSONDecodeError as exc: + logger.warning(f"Invalid product websocket JSON: {exc}") + return None + + if not isinstance(message, dict): + logger.warning("Product websocket message must be a JSON object") + return None + + message_type = message.get("type") + if message_type == "session.start": + return InputTransportMessageFrame( + message={ + "type": "session.started", + "protocol": self.protocol, + "audio": { + "encoding": "pcm_s16le", + "sample_rate": self._sample_rate, + "channels": self._channels, + }, + } + ) + + if message_type == "session.stop": + return EndFrame() + + if message_type == "response.cancel": + return CancelFrame(reason="client_cancelled") + + if message_type == "input.audio": + audio = message.get("audio") or message.get("data") + if not isinstance(audio, str): + logger.warning("input.audio requires base64 'audio' or 'data'") + return None + try: + pcm = base64.b64decode(audio) + except ValueError as exc: + 
logger.warning(f"Invalid input.audio base64: {exc}") + return None + return InputAudioRawFrame( + audio=pcm, + sample_rate=int(message.get("sample_rate") or self._sample_rate), + num_channels=int(message.get("channels") or self._channels), + ) + + if message_type == "input.text": + text = message.get("text") + if not isinstance(text, str) or not text.strip(): + logger.warning("input.text requires non-empty 'text'") + return None + return InputTransportMessageFrame( + message={ + "type": "input.text", + "text": text, + "interrupt": bool(message.get("interrupt", True)), + } + ) + + if message_type == "transport.message": + payload = message.get("message") + return InputTransportMessageFrame(message=payload if isinstance(payload, dict) else message) + + logger.warning(f"Unsupported product websocket message type: {message_type!r}") + return None + + def _event(self, event_type: str, **payload: Any) -> str: + self._sequence += 1 + return json.dumps( + { + "type": event_type, + "protocol": self.protocol, + "seq": self._sequence, + **payload, + }, + ensure_ascii=False, + ) diff --git a/engine/services.py b/engine/services.py new file mode 100644 index 0000000..0fab66d --- /dev/null +++ b/engine/services.py @@ -0,0 +1,126 @@ +from __future__ import annotations + +from collections.abc import AsyncGenerator + +from openai import BadRequestError +from openai import NOT_GIVEN + +from pipecat.frames.frames import ErrorFrame, Frame, TTSAudioRawFrame +from pipecat.services.openai._constants import OPENAI_SAMPLE_RATE +from pipecat.services.openai.llm import OpenAILLMService +from pipecat.services.openai.stt import OpenAISTTService +from pipecat.services.openai.tts import VALID_VOICES, OpenAITTSService +from pipecat.transcriptions.language import Language + +from .config import AudioConfig, LLMConfig, STTConfig, TTSConfig + + +def create_stt_service(config: STTConfig): + _require_provider(config.provider, "openai", "stt") + return OpenAISTTService( + api_key=config.api_key or 
None, + base_url=config.base_url, + settings=OpenAISTTService.Settings( + model=config.model, + language=_language(config.language), + ), + ) + + +def create_llm_service(config: LLMConfig): + _require_provider(config.provider, "openai", "llm") + return OpenAILLMService( + api_key=config.api_key or None, + base_url=config.base_url, + settings=OpenAILLMService.Settings( + model=config.model, + temperature=config.temperature if config.temperature is not None else NOT_GIVEN, + ), + ) + + +def create_tts_service(config: TTSConfig, audio: AudioConfig): + _require_provider(config.provider, "openai", "tts") + service_class = OpenAITTSService if config.voice in VALID_VOICES else OpenAICompatibleTTSService + return service_class( + api_key=config.api_key or None, + base_url=config.base_url, + sample_rate=audio.sample_rate_hz, + source_sample_rate=config.source_sample_rate_hz, + settings=OpenAITTSService.Settings( + model=config.model, + voice=config.voice, + ), + ) + + +class OpenAICompatibleTTSService(OpenAITTSService): + """OpenAI-compatible TTS service that permits provider-specific voice ids.""" + + def __init__(self, *, source_sample_rate: int | None = None, **kwargs): + super().__init__(**kwargs) + self._source_sample_rate = source_sample_rate or OPENAI_SAMPLE_RATE + + async def run_tts(self, text: str, context_id: str) -> AsyncGenerator[Frame, None]: + voice = self._settings.voice + if not voice: + yield ErrorFrame(error="TTS voice must be specified") + return + + try: + create_params = { + "input": text, + "model": self._settings.model, + "voice": voice, + "response_format": "pcm", + } + + if self._settings.instructions: + create_params["instructions"] = self._settings.instructions + + if self._settings.speed: + create_params["speed"] = self._settings.speed + + async with self._client.audio.speech.with_streaming_response.create( + **create_params + ) as response: + if response.status_code != 200: + error = await response.text() + yield ErrorFrame( + error=f"TTS 
request failed (status: {response.status_code}, error: {error})" + ) + return + + await self.start_tts_usage_metrics(text) + + async def audio_chunks(): + async for chunk in response.iter_bytes(self.chunk_size): + if chunk: + yield chunk + + first_frame = True + async for frame in self._stream_audio_frames_from_iterator( + audio_chunks(), + in_sample_rate=self._source_sample_rate, + context_id=context_id, + ): + if first_frame: + await self.stop_ttfb_metrics() + first_frame = False + yield frame + except BadRequestError as exc: + yield ErrorFrame(error=f"TTS request failed: {exc}") + except Exception as exc: + yield ErrorFrame(error=f"TTS request failed: {exc}") + + +def _require_provider(actual: str, expected: str, service_name: str) -> None: + if actual != expected: + raise ValueError(f"Unsupported {service_name} provider {actual!r}; expected {expected!r}") + + +def _language(value: str | None) -> Language | None: + if value is None: + return None + normalized = value.replace("-", "_").upper() + return getattr(Language, normalized, value) diff --git a/engine/text_input.py b/engine/text_input.py new file mode 100644 index 0000000..c205468 --- /dev/null +++ b/engine/text_input.py @@ -0,0 +1,39 @@ +from __future__ import annotations + +from loguru import logger + +from pipecat.frames.frames import Frame, InputTransportMessageFrame, LLMMessagesAppendFrame +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor + + +class ProductTextInputProcessor(FrameProcessor): + """Converts product text-input transport messages into LLM turns.""" + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + + if not isinstance(frame, InputTransportMessageFrame): + await self.push_frame(frame, direction) + return + + message = frame.message + if not isinstance(message, dict) or message.get("type") != "input.text": + await self.push_frame(frame, direction) + return + + text = 
str(message.get("text") or "").strip() + if not text: + return + + if message.get("interrupt", True): + logger.info("Text input interrupting current response") + await self.broadcast_interruption() + + await self.push_frame( + LLMMessagesAppendFrame( + messages=[{"role": "user", "content": text}], + run_llm=True, + ), + FrameDirection.DOWNSTREAM, + ) + diff --git a/engine/text_stream.py b/engine/text_stream.py new file mode 100644 index 0000000..e2e997e --- /dev/null +++ b/engine/text_stream.py @@ -0,0 +1,93 @@ +from __future__ import annotations + +from pipecat.frames.frames import ( + Frame, + InterruptionFrame, + LLMFullResponseEndFrame, + LLMFullResponseStartFrame, + LLMTextFrame, + OutputTransportMessageUrgentFrame, + TTSSpeakFrame, +) +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor + + +class ProductTextStreamProcessor(FrameProcessor): + """Mirrors LLM text frames as streaming protocol events. + + Placed between the LLM service and the TTS service, this processor + observes the LLM's text frames as they're emitted and forwards them + downstream as ``OutputTransportMessageUrgentFrame``s that the product + serializer turns into ``response.text.{started,delta,final}`` events. + + Because the events are emitted before the TTS holds onto + ``LLMFullResponseEndFrame`` to drain its audio queue, text reaches the + client well ahead of (or at worst, alongside) the synthesized audio. + + ``TTSSpeakFrame`` (used by the fixed-greeting code path, which bypasses + the LLM entirely) is also handled: the processor synthesizes a single + started/delta/final sequence for its fixed text. 
+ """ + + def __init__(self) -> None: + super().__init__() + self._aggregation: list[str] = [] + self._turn_active = False + + async def process_frame(self, frame: Frame, direction: FrameDirection) -> None: + await super().process_frame(frame, direction) + + if isinstance(frame, LLMFullResponseStartFrame): + await self._start_turn() + elif isinstance(frame, LLMTextFrame): + if frame.text: + await self._delta(frame.text) + elif isinstance(frame, LLMFullResponseEndFrame): + await self._end_turn(interrupted=False) + elif isinstance(frame, InterruptionFrame): + await self._end_turn(interrupted=True) + elif isinstance(frame, TTSSpeakFrame): + # Fixed-text / direct-speech path: there's no LLM cycle, so + # synthesize one started/delta/final sequence for the spoken text. + text = frame.text or "" + await self._start_turn() + if text: + await self._delta(text) + await self._end_turn(interrupted=False) + + await self.push_frame(frame, direction) + + async def _start_turn(self) -> None: + if self._turn_active: + return + self._turn_active = True + self._aggregation = [] + await self._emit("response.text.started") + + async def _delta(self, text: str) -> None: + if not self._turn_active: + # A text frame outside a turn shouldn't happen, but if it does, + # synthesize a started boundary so the client renders sensibly. 
+ await self._start_turn() + self._aggregation.append(text) + await self._emit("response.text.delta", text=text) + + async def _end_turn(self, *, interrupted: bool) -> None: + if not self._turn_active: + return + full_text = "".join(self._aggregation) + self._turn_active = False + self._aggregation = [] + await self._emit( + "response.text.final", + text=full_text, + interrupted=interrupted, + ) + + async def _emit(self, event_type: str, **payload: object) -> None: + await self.push_frame( + OutputTransportMessageUrgentFrame( + message={"type": event_type, **payload}, + ), + FrameDirection.DOWNSTREAM, + ) diff --git a/examples/webpage/README.md b/examples/webpage/README.md new file mode 100644 index 0000000..8bcc835 --- /dev/null +++ b/examples/webpage/README.md @@ -0,0 +1,83 @@ +# Webpage Example — Realtime Voice Chat + +A self-contained browser client for the engine's product websocket +(`/ws-product`, protocol `va.ws.v1`). + +## Features + +- **Connect / Disconnect** to any `ws://` or `wss://` URL. +- **Mic on/off toggle** — getUserMedia is requested with + `echoCancellation`, `noiseSuppression`, and `autoGainControl` so the + browser handles AEC against the bot's voice. +- **Text composer** — type a message and press Enter to send + an `input.text` event (Shift+Enter for newline). Sending interrupts + any in-flight bot audio so the next reply is heard cleanly. +- **Chat history** rendered from `input.transcript.final` (you, when + spoken), streamed `response.text.delta` / `response.text.final` + (assistant — deltas arrive ahead of the synthesized audio), and locally + for text you submit (the engine doesn't echo text input back as a + transcript). +- **Gapless TTS playback** by scheduling each `response.audio.delta` + chunk back-to-back on the AudioContext. +- **Live VU meter** + mic and bot activity indicators. +- **Clear** button to reset history. + +No build step, no dependencies — just three files plus an AudioWorklet. 
+ +## Layout + +```text +examples/webpage/ +├── index.html +├── styles.css +├── app.js +└── pcm-recorder.worklet.js +``` + +## Run + +1. Start the engine (default port `8001`): + + ```bash + cd AI-VideoAssistant-engine-v5-pipecat-minimal + source .venv/bin/activate + export OPENAI_API_KEY=... + uvicorn engine.main:app --host 127.0.0.1 --port 8001 + ``` + +2. In another terminal, serve the page from a port that's on the + engine's CORS allow-list (see `config.json`). The default config + allows `http://localhost:8080`: + + ```bash + cd AI-VideoAssistant-engine-v5-pipecat-minimal/examples/webpage + python -m http.server 8080 + ``` + +3. Open `http://localhost:8080` in Chrome, Edge, or Safari. + - Click **Connect** (uses `ws://127.0.0.1:8001/ws-product` by default). + - Click **Enable mic** and start speaking. The browser will prompt + for microphone access on first use. + +> The browser's mic API requires a secure context. `http://localhost` +> qualifies; if you serve from another host, use HTTPS and a `wss://` +> URL. + +## Audio details + +- Input: mono Float32 from `getUserMedia` is resampled in the + AudioWorklet to PCM16 mono @ 16 kHz, framed into 20 ms chunks, and + sent as **binary** websocket messages (the server accepts either + binary or the JSON+base64 form). +- Output: each `response.audio.delta` carries base64-encoded PCM16 @ + 16 kHz; chunks are decoded and scheduled back-to-back through Web + Audio. The browser handles resampling to the device rate. + +## Notes + +- Use headphones if you still hear echo despite browser AEC; the bot's + voice leaking back into the open mic is the most common cause of + feedback loops. +- The engine's session has an inactivity timeout + (`session.inactivity_timeout_sec` in `config.json`). If the bot + doesn't respond after a long silence, reconnect.
/**
 * Minimal browser client for the AI VideoAssistant engine's product
 * websocket (`/ws-product`, protocol `va.ws.v1`).
 *
 * Responsibilities:
 * - Open/close the websocket and run the session handshake.
 * - Capture mic audio with echoCancellation, noiseSuppression, autoGainControl.
 * - Downsample to PCM16 mono @ 16 kHz in an AudioWorklet and stream frames
 *   as binary websocket messages.
 * - Play `response.audio.delta` frames gaplessly through Web Audio.
 * - Render a chat-style history of user transcripts and bot text deltas.
 */

const SAMPLE_RATE = 16000;
const CHANNELS = 1;
const FRAME_MS = 20;
const PROTOCOL = "va.ws.v1";

const els = {
  url: document.getElementById("ws-url"),
  connectBtn: document.getElementById("connect-btn"),
  statusDot: document.getElementById("status-dot"),
  statusText: document.getElementById("status-text"),
  chatLog: document.getElementById("chat-log"),
  micBtn: document.getElementById("mic-btn"),
  micLabel: document.querySelector(".mic-btn__label"),
  micIndicator: document.getElementById("mic-indicator"),
  botIndicator: document.getElementById("bot-indicator"),
  clearBtn: document.getElementById("clear-btn"),
  meterFill: document.getElementById("meter-fill"),
  composer: document.getElementById("composer"),
  textInput: document.getElementById("text-input"),
  sendBtn: document.getElementById("send-btn"),
};

const state = {
  ws: null,
  connected: false,
  connecting: false,

  audioContext: null,
  micStream: null,
  micSourceNode: null,
  recorderNode: null,

  micEnabled: false,

  // Output scheduling.
  nextPlaybackTime: 0,
  playbackEndsAt: 0,
  scheduledSources: [],
  botActive: false,
  botUiTimer: null,

  // Chat state.
  currentAssistantBubble: null,

  // VU meter smoothing.
  meterLevel: 0,
};

/* ------------------------------------------------------------------ UI */

/** Set the status dot modifier class (`status__dot--<kind>`) and label text. */
function setStatus(kind, text) {
  els.statusDot.className = `status__dot status__dot--${kind}`;
  els.statusText.textContent = text;
}

/** Sync the connect button's label, disabled state and style with `state`. */
function setConnectButton() {
  if (state.connecting) {
    els.connectBtn.textContent = "Connecting…";
    els.connectBtn.disabled = true;
    els.connectBtn.classList.remove("is-disconnect");
  } else if (state.connected) {
    els.connectBtn.textContent = "Disconnect";
    els.connectBtn.disabled = false;
    els.connectBtn.classList.add("is-disconnect");
  } else {
    els.connectBtn.textContent = "Connect";
    els.connectBtn.disabled = false;
    els.connectBtn.classList.remove("is-disconnect");
  }
}

/** Sync the mic button and mic indicator with `state.connected` / `state.micEnabled`. */
function setMicButton() {
  els.micBtn.disabled = !state.connected;
  els.micBtn.setAttribute("aria-pressed", state.micEnabled ? "true" : "false");
  els.micBtn.title = state.micEnabled ? "Mute mic" : "Unmute mic";
  els.micLabel.textContent = state.micEnabled ? "Mute mic" : "Enable mic";
  els.micIndicator.classList.toggle("is-active", state.micEnabled);
}

/** Enable/disable the composer; the send button also requires non-blank text. */
function setComposerEnabled(enabled) {
  els.textInput.disabled = !enabled;
  els.sendBtn.disabled = !enabled || els.textInput.value.trim().length === 0;
}

/** Toggle the "bot is speaking" indicator. */
function setBotIndicator(active) {
  els.botIndicator.classList.toggle("is-active", active);
}

/**
 * Append a chat bubble and return its element.
 *
 * @param {string} role "user", "assistant" or "system" (system bubbles get no role tag).
 * @param {string} text Bubble text, inserted via `textContent` (never parsed as HTML).
 */
function addBubble(role, text) {
  // The first real bubble replaces the empty-state placeholder.
  if (els.chatLog.querySelector(".chat__empty")) {
    els.chatLog.innerHTML = "";
  }
  const bubble = document.createElement("div");
  bubble.className = `bubble bubble--${role}`;
  if (role !== "system") {
    const tag = document.createElement("span");
    tag.className = "bubble__role";
    tag.textContent = role === "user" ? "You" : "Assistant";
    bubble.appendChild(tag);
  }
  const body = document.createElement("span");
  body.className = "bubble__text";
  body.textContent = text;
  bubble.appendChild(body);
  els.chatLog.appendChild(bubble);
  scrollChatToBottom();
  return bubble;
}

/** Append `text` to an existing bubble's body and keep the log scrolled down. */
function appendToBubble(bubble, text) {
  const body = bubble.querySelector(".bubble__text");
  body.textContent += text;
  scrollChatToBottom();
}

function scrollChatToBottom() {
  els.chatLog.scrollTop = els.chatLog.scrollHeight;
}

/** Drop all bubbles and show a "Chat cleared." placeholder. */
function clearChat() {
  els.chatLog.innerHTML = "";
  state.currentAssistantBubble = null;
  const empty = document.createElement("div");
  empty.className = "chat__empty";
  // Built with DOM nodes rather than an HTML string; the stylesheet styles
  // paragraphs inside `.chat__empty`.
  const line = document.createElement("p");
  line.textContent = "Chat cleared.";
  empty.appendChild(line);
  els.chatLog.appendChild(empty);
}

/* ---------------------------------------------------------------- Audio */

/**
 * Return the shared AudioContext, creating it (and loading the recorder
 * worklet module) on first use, and resuming it if suspended.
 *
 * The context is cached only after the worklet module has loaded, so a
 * failed load is retried on the next call instead of leaving behind a
 * context that can never create the "pcm-recorder" node.
 */
async function ensureAudioContext() {
  if (!state.audioContext) {
    const Ctx = window.AudioContext || window.webkitAudioContext;
    const ctx = new Ctx();
    try {
      await ctx.audioWorklet.addModule("./pcm-recorder.worklet.js");
    } catch (err) {
      // Release the half-initialised context; the original error is what matters.
      try {
        Promise.resolve(ctx.close()).catch(() => {});
      } catch (_) {
        /* ignore */
      }
      throw err;
    }
    state.audioContext = ctx;
  }
  if (state.audioContext.state === "suspended") {
    await state.audioContext.resume();
  }
  return state.audioContext;
}

/**
 * Request the microphone and start streaming PCM16 frames over the socket.
 *
 * Rejects if audio initialisation or `getUserMedia` fails. Any partially
 * acquired resources are released before the error propagates.
 */
async function startMic() {
  const ctx = await ensureAudioContext();

  state.micStream = await navigator.mediaDevices.getUserMedia({
    audio: {
      echoCancellation: true,
      noiseSuppression: true,
      autoGainControl: true,
      channelCount: 1,
    },
    video: false,
  });

  // The socket may have closed while the permission prompt was open; the
  // close handler could not stop a mic that was not enabled yet, so do it here.
  if (!state.connected) {
    stopMic();
    return;
  }

  try {
    state.micSourceNode = ctx.createMediaStreamSource(state.micStream);
    state.recorderNode = new AudioWorkletNode(ctx, "pcm-recorder", {
      numberOfInputs: 1,
      numberOfOutputs: 0,
      channelCount: 1,
      processorOptions: {
        targetSampleRate: SAMPLE_RATE,
        frameMs: FRAME_MS,
      },
    });
    state.recorderNode.port.onmessage = (event) => {
      const data = event.data;
      if (!data || data.type !== "frame") return;
      updateMeter(data.rms || 0);
      if (state.connected && state.ws && state.ws.readyState === WebSocket.OPEN) {
        state.ws.send(data.buffer);
      }
    };

    state.micSourceNode.connect(state.recorderNode);
  } catch (err) {
    // Don't leave the capture tracks live (and the browser's recording
    // indicator on) when the audio graph could not be built.
    stopMic();
    throw err;
  }
  state.micEnabled = true;
  setMicButton();
}

/** Tear down the capture graph and release the microphone. Safe to call repeatedly. */
function stopMic() {
  if (state.recorderNode) {
    try {
      state.recorderNode.port.onmessage = null;
      state.recorderNode.disconnect();
    } catch (_) {
      /* ignore */
    }
    state.recorderNode = null;
  }
  if (state.micSourceNode) {
    try {
      state.micSourceNode.disconnect();
    } catch (_) {
      /* ignore */
    }
    state.micSourceNode = null;
  }
  if (state.micStream) {
    for (const track of state.micStream.getTracks()) {
      try {
        track.stop();
      } catch (_) {
        /* ignore */
      }
    }
    state.micStream = null;
  }
  state.micEnabled = false;
  updateMeter(0);
  setMicButton();
}

/** Fold a new RMS reading into the smoothed VU meter and redraw it. */
function updateMeter(rms) {
  // Smooth and convert to a 0..100 width. RMS ~0.3+ is loud speech.
  const target = Math.min(1, rms * 2.4);
  state.meterLevel = state.meterLevel * 0.5 + target * 0.5;
  els.meterFill.style.width = `${Math.round(state.meterLevel * 100)}%`;
}

/* ---------------------------------------------------- Bot audio playback */

/**
 * Queue one chunk of PCM16 mono audio (at SAMPLE_RATE) for gapless playback.
 *
 * @param {Int16Array} int16 Samples to play; an empty chunk is ignored.
 */
function schedulePlayback(int16) {
  const ctx = state.audioContext;
  if (!ctx) return;
  // createBuffer() rejects zero-length buffers, so an empty delta must not
  // reach it (it would throw inside the websocket message handler).
  if (!int16 || int16.length === 0) return;

  const float32 = new Float32Array(int16.length);
  for (let i = 0; i < int16.length; i++) {
    // Asymmetric scale so both -32768 and 32767 map inside [-1, 1].
    float32[i] = int16[i] / (int16[i] < 0 ? 0x8000 : 0x7fff);
  }
  const buffer = ctx.createBuffer(CHANNELS, float32.length, SAMPLE_RATE);
  buffer.copyToChannel(float32, 0);

  const src = ctx.createBufferSource();
  src.buffer = buffer;
  src.connect(ctx.destination);

  const now = ctx.currentTime;
  // Schedule immediately after the previously scheduled chunk to keep
  // playback contiguous, with a tiny safety margin if we fell behind.
  const startAt = Math.max(now + 0.02, state.nextPlaybackTime);
  src.start(startAt);
  state.nextPlaybackTime = startAt + buffer.duration;
  state.playbackEndsAt = state.nextPlaybackTime;

  src.onended = () => {
    const idx = state.scheduledSources.indexOf(src);
    if (idx >= 0) state.scheduledSources.splice(idx, 1);
  };
  state.scheduledSources.push(src);

  setBotIndicator(true);
  if (state.botUiTimer) clearTimeout(state.botUiTimer);
  const msUntilEnd = Math.max(0, (state.playbackEndsAt - now) * 1000) + 120;
  state.botUiTimer = setTimeout(() => {
    // Only switch the indicator off if no later chunk extended the queue.
    if (state.audioContext &&
        state.audioContext.currentTime >= state.playbackEndsAt - 0.01) {
      setBotIndicator(false);
    }
  }, msUntilEnd);
}

/** Stop and discard every scheduled chunk and reset the playback clock. */
function stopPlaybackQueue() {
  for (const src of state.scheduledSources) {
    try {
      src.onended = null;
      src.stop();
      src.disconnect();
    } catch (_) {
      /* already stopped */
    }
  }
  state.scheduledSources = [];
  resetPlaybackClock();
  if (state.botUiTimer) {
    clearTimeout(state.botUiTimer);
    state.botUiTimer = null;
  }
  setBotIndicator(false);
}

/** Point the playback schedule at "now" so the next chunk starts promptly. */
function resetPlaybackClock() {
  if (state.audioContext) {
    state.nextPlaybackTime = state.audioContext.currentTime;
    state.playbackEndsAt = state.audioContext.currentTime;
  }
}

/* --------------------------------------------------------- Chat updates */

/** Render a final user transcript as a new user bubble. */
function handleUserTranscript(text) {
  if (!text) return;
  state.currentAssistantBubble = null;
  addBubble("user", text);
}

/**
 * Send typed text to the engine as an interrupting `input.text` event.
 *
 * @returns {boolean} true if the message was sent; false when the text is
 *   blank or the socket is not open.
 */
function sendText(text) {
  const value = (text || "").trim();
  if (!value) return false;
  if (!state.ws || state.ws.readyState !== WebSocket.OPEN) return false;

  // The engine does not echo text input back as a transcript event, so we
  // render the user bubble locally. Also interrupt any in-flight bot audio
  // so the next reply is heard cleanly. We deliberately do NOT clear
  // `currentAssistantBubble` here — the engine will emit a
  // `response.text.final(interrupted=true)` for the in-flight assistant
  // turn, which finalizes that bubble in place. A brand-new bubble for the
  // reply will be created when `response.text.started` arrives.
  state.ws.send(
    JSON.stringify({
      type: "input.text",
      text: value,
      interrupt: true,
    }),
  );
  stopPlaybackQueue();
  addBubble("user", value);
  return true;
}

/** Append a streamed text chunk to the current assistant bubble, creating it if needed. */
function handleAssistantDelta(text) {
  if (!text) return;
  if (!state.currentAssistantBubble) {
    state.currentAssistantBubble = addBubble("assistant", "");
  }
  appendToBubble(state.currentAssistantBubble, text);
}

/** A new assistant turn began: the next delta opens a fresh bubble. */
function handleAssistantStarted() {
  state.currentAssistantBubble = null;
}

/**
 * Finalize the assistant turn: replace the bubble body with the full text
 * (creating the bubble if no delta arrived) and mark it when interrupted.
 */
function handleAssistantFinal(text, interrupted) {
  if (!text) {
    state.currentAssistantBubble = null;
    return;
  }
  if (state.currentAssistantBubble) {
    const body = state.currentAssistantBubble.querySelector(".bubble__text");
    body.textContent = text;
  } else {
    state.currentAssistantBubble = addBubble("assistant", text);
  }
  if (interrupted) {
    state.currentAssistantBubble.classList.add("bubble--interrupted");
  }
  state.currentAssistantBubble = null;
  scrollChatToBottom();
}

function finalizeAssistantBubble() {
  state.currentAssistantBubble = null;
}

/* ---------------------------------------------------------- Websocket IO */

/**
 * Decode base64 PCM16 into an Int16Array view (host byte order).
 * A trailing odd byte, if any, is dropped. Throws if `b64` is not valid base64.
 */
function decodeBase64ToInt16(b64) {
  const binary = atob(b64);
  const len = binary.length;
  const bytes = new Uint8Array(len);
  for (let i = 0; i < len; i++) bytes[i] = binary.charCodeAt(i);
  return new Int16Array(bytes.buffer, bytes.byteOffset, Math.floor(bytes.byteLength / 2));
}

/** Dispatch one parsed JSON event from the server. */
function handleEvent(event) {
  switch (event.type) {
    case "response.audio.delta": {
      if (typeof event.audio === "string") {
        let samples;
        try {
          samples = decodeBase64ToInt16(event.audio);
        } catch (err) {
          // Treat a malformed audio payload like malformed JSON: log and skip.
          console.warn("Bad audio payload from server", err);
          break;
        }
        schedulePlayback(samples);
      }
      break;
    }
    case "response.audio.started":
      setBotIndicator(true);
      break;
    case "response.audio.stopped":
      finalizeAssistantBubble();
      // The indicator turns off automatically when the playback queue drains.
      break;
    case "response.text.delta":
      handleAssistantDelta(event.text);
      break;
    case "response.text.started":
      handleAssistantStarted();
      break;
    case "response.text.final":
      handleAssistantFinal(event.text, event.interrupted);
      break;
    case "input.transcript.final":
      handleUserTranscript(event.text);
      break;
    case "transport.message":
      // Reserved for future structured messages; ignore silently.
      break;
    default:
      // Unknown event type: log for debugging.
      console.debug("ws event", event);
  }
}

/**
 * Open the websocket named in the URL field, send `session.start` once it
 * opens, and wire up message/error/close handling. No-op while already
 * connected or connecting.
 */
async function connect() {
  if (state.connected || state.connecting) return;
  const url = (els.url.value || "").trim();
  if (!url) {
    setStatus("error", "Missing URL");
    return;
  }

  state.connecting = true;
  setStatus("connecting", "Connecting…");
  setConnectButton();

  try {
    // Pre-warm audio context on user gesture so playback works on Safari.
    await ensureAudioContext();
  } catch (err) {
    console.error("AudioContext failed", err);
    state.connecting = false;
    setStatus("error", "Audio init failed");
    setConnectButton();
    return;
  }

  let ws;
  try {
    ws = new WebSocket(url);
  } catch (err) {
    console.error("WebSocket constructor failed", err);
    state.connecting = false;
    setStatus("error", "Bad URL");
    setConnectButton();
    return;
  }
  ws.binaryType = "arraybuffer";
  state.ws = ws;

  ws.addEventListener("open", () => {
    state.connecting = false;
    state.connected = true;
    resetPlaybackClock();
    setStatus("connected", "Connected");
    setConnectButton();
    setMicButton();

    ws.send(
      JSON.stringify({
        type: "session.start",
        protocol: PROTOCOL,
        audio: {
          encoding: "pcm_s16le",
          sample_rate: SAMPLE_RATE,
          channels: CHANNELS,
        },
      }),
    );
    addBubble("system", "Session started.");
    setComposerEnabled(true);
    els.textInput.focus();
  });

  ws.addEventListener("message", (event) => {
    const data = event.data;
    if (typeof data === "string") {
      let parsed;
      try {
        parsed = JSON.parse(data);
      } catch (err) {
        console.warn("Bad JSON from server", err, data);
        return;
      }
      handleEvent(parsed);
    } else if (data instanceof ArrayBuffer) {
      // Server doesn't currently send binary, but handle it just in case.
      // Pass an explicit sample count so an odd-length payload drops its
      // trailing byte instead of throwing a RangeError.
      schedulePlayback(new Int16Array(data, 0, Math.floor(data.byteLength / 2)));
    }
  });

  ws.addEventListener("error", (err) => {
    console.error("WebSocket error", err);
    setStatus("error", "Connection error");
  });

  ws.addEventListener("close", (event) => {
    const wasConnected = state.connected;
    state.ws = null;
    state.connected = false;
    state.connecting = false;
    if (state.micEnabled) stopMic();
    stopPlaybackQueue();
    setConnectButton();
    setMicButton();
    setComposerEnabled(false);
    setBotIndicator(false);
    if (wasConnected) {
      addBubble(
        "system",
        `Session ended${event.reason ? ` — ${event.reason}` : ""}.`,
      );
      setStatus("idle", "Disconnected");
    } else {
      // Closed before ever opening: the connection attempt failed.
      setStatus("error", "Connection closed");
    }
  });
}

/** Send `session.stop` (best effort) and close the socket; UI cleanup runs in the close handler. */
function disconnect() {
  if (!state.ws) return;
  try {
    if (state.ws.readyState === WebSocket.OPEN) {
      state.ws.send(
        JSON.stringify({ type: "session.stop", reason: "client_disconnect" }),
      );
    }
  } catch (_) {
    /* ignore */
  }
  try {
    state.ws.close(1000, "client_disconnect");
  } catch (_) {
    /* ignore */
  }
}

/* ---------------------------------------------------------------- Wiring */

els.connectBtn.addEventListener("click", () => {
  if (state.connected) disconnect();
  else connect();
});

els.micBtn.addEventListener("click", async () => {
  if (!state.connected) return;
  // Block re-entrant clicks while the permission prompt / teardown is running.
  els.micBtn.disabled = true;
  try {
    if (state.micEnabled) {
      stopMic();
    } else {
      await startMic();
    }
  } catch (err) {
    console.error("Mic error", err);
    addBubble("system", `Mic error: ${err.message || err}`);
  } finally {
    els.micBtn.disabled = !state.connected;
  }
});

els.clearBtn.addEventListener("click", () => {
  clearChat();
});

/** Grow the composer textarea with its content, capped at 180px. */
function autosizeTextarea() {
  const ta = els.textInput;
  ta.style.height = "auto";
  ta.style.height = `${Math.min(ta.scrollHeight, 180)}px`;
}

/** Send the composer text; on success clear and resize the composer. */
function submitText() {
  const value = els.textInput.value;
  if (!sendText(value)) return;
  els.textInput.value = "";
  autosizeTextarea();
  setComposerEnabled(state.connected);
}

els.composer.addEventListener("submit", (event) => {
  event.preventDefault();
  submitText();
});

els.textInput.addEventListener("input", () => {
  autosizeTextarea();
  setComposerEnabled(state.connected);
});

els.textInput.addEventListener("keydown", (event) => {
  // Enter sends; Shift+Enter inserts a newline; ignore Enter during IME composition.
  if (event.key === "Enter" && !event.shiftKey && !event.isComposing) {
    event.preventDefault();
    submitText();
  }
});

window.addEventListener("beforeunload", () => {
  if (state.ws) {
    try {
      state.ws.close();
    } catch (_) {
      /* ignore */
    }
  }
  if (state.audioContext) {
    try {
      state.audioContext.close();
    } catch (_) {
      /* ignore */
    }
  }
});

setStatus("idle", "Disconnected");
setConnectButton();
setMicButton();
setComposerEnabled(false);
+
+
+ +

VA Voice Chat

+
+ +
+ + +
+ +
+ + Disconnected +
+
+ +
+
+
+

Connect to the engine, enable your mic, and start talking.

+

+ Audio is streamed as PCM16 mono @ 16 kHz over + /ws-product. +

+
+
+
+ +
+ + +
+ + +
+ +
+ + +
+ + + Mic + + + + Bot + +
+ + +
+ +

+ Press Enter to send, Shift+Enter + for newline. Sending text will interrupt the bot if it's speaking. + Browser echo cancellation is on; use headphones if echo persists. +

+
+
/**
 * PCM Recorder AudioWorklet.
 *
 * Captures mono Float32 mic samples at the AudioContext's native rate,
 * resamples them to a target sample rate (default 16 kHz) with linear
 * interpolation, then ships PCM16 frames of a fixed duration (default 20 ms)
 * to the main thread via
 * `port.postMessage({ type: "frame", buffer: ArrayBuffer, rms })`.
 *
 * It also computes a simple RMS level per frame for the UI VU meter so the
 * main thread doesn't have to re-process the audio.
 */

class PcmRecorderProcessor extends AudioWorkletProcessor {
  /**
   * @param {object} options Node options; `processorOptions.targetSampleRate`
   *   (default 16000) and `processorOptions.frameMs` (default 20) are read.
   */
  constructor(options) {
    super();

    const opts = (options && options.processorOptions) || {};
    this._targetSampleRate = opts.targetSampleRate || 16000;
    this._frameMs = opts.frameMs || 20;
    // At least one sample per frame; a zero-length frame would never fill
    // and therefore never be posted.
    this._frameSamples = Math.max(
      1,
      Math.round((this._targetSampleRate * this._frameMs) / 1000),
    );

    // Resampling state.
    // `ratio` is input samples per output sample.
    this._ratio = sampleRate / this._targetSampleRate;
    this._inputBuffer = new Float32Array(0);
    // Float position in `_inputBuffer` for the next output sample. It may
    // point past the end of the buffer when the next sample needs input
    // that has not arrived yet.
    this._inputOffset = 0;

    // Output framing state.
    this._frameBuffer = new Int16Array(this._frameSamples);
    this._frameIndex = 0;

    // VU meter accumulator.
    this._rmsSumSquares = 0;
    this._rmsCount = 0;
  }

  /**
   * Resample the first channel of the first input and post every completed
   * PCM16 frame. Always returns true to keep the processor alive.
   */
  process(inputs) {
    const input = inputs[0];
    if (!input || input.length === 0) return true;
    const channel = input[0];
    if (!channel || channel.length === 0) return true;

    // Append new samples to the input buffer.
    const merged = new Float32Array(this._inputBuffer.length + channel.length);
    merged.set(this._inputBuffer, 0);
    merged.set(channel, this._inputBuffer.length);
    this._inputBuffer = merged;

    const ratio = this._ratio;
    const inLen = this._inputBuffer.length;
    let pos = this._inputOffset;

    // Interpolating at `pos` needs samples floor(pos) and floor(pos) + 1.
    while (pos + 1 < inLen) {
      const lo = Math.floor(pos);
      const hi = lo + 1;
      const w = pos - lo;
      const sample =
        this._inputBuffer[lo] * (1 - w) + this._inputBuffer[hi] * w;

      this._rmsSumSquares += sample * sample;
      this._rmsCount += 1;

      let s = sample;
      if (s > 1) s = 1;
      else if (s < -1) s = -1;
      // Asymmetric scale: -1 -> -32768, +1 -> 32767.
      this._frameBuffer[this._frameIndex++] =
        s < 0 ? Math.round(s * 0x8000) : Math.round(s * 0x7fff);

      if (this._frameIndex === this._frameSamples) {
        const frame = new Int16Array(this._frameSamples);
        frame.set(this._frameBuffer);
        const rms =
          this._rmsCount > 0
            ? Math.sqrt(this._rmsSumSquares / this._rmsCount)
            : 0;
        // Transfer (not copy) the frame's buffer to the main thread.
        this.port.postMessage(
          { type: "frame", buffer: frame.buffer, rms },
          [frame.buffer],
        );
        this._frameIndex = 0;
        this._rmsSumSquares = 0;
        this._rmsCount = 0;
      }

      pos += ratio;
    }

    // Trim consumed samples from the input buffer; keep at least the last
    // sample we still need to interpolate against on the next call.
    // `pos` can overshoot the buffered input when ratio > 1. Only samples
    // that exist can be dropped; the remainder stays in `pos` so the next
    // call skips it. Discarding the overshoot would emit slightly too many
    // output samples per input block and drift off the target rate.
    const consumed = Math.min(Math.floor(pos), inLen);
    if (consumed > 0) {
      this._inputBuffer = this._inputBuffer.slice(consumed);
      pos -= consumed;
    }
    this._inputOffset = pos;

    return true;
  }
}

registerProcessor("pcm-recorder", PcmRecorderProcessor);
letter-spacing: 0.2px; +} + +.brand__dot { + width: 10px; + height: 10px; + border-radius: 50%; + background: var(--accent); + box-shadow: 0 0 0 4px rgba(79, 140, 255, 0.18); +} + +.connection { + display: flex; + align-items: end; + gap: 8px; + min-width: 0; +} + +.connection__field { + display: flex; + flex-direction: column; + gap: 4px; + min-width: 0; + flex: 1; +} + +.connection__field span { + font-size: 11px; + color: var(--text-dim); + text-transform: uppercase; + letter-spacing: 0.8px; +} + +.connection__field input { + background: var(--bg-soft); + color: var(--text); + border: 1px solid var(--border); + border-radius: 10px; + padding: 8px 10px; + font: inherit; + font-size: 13px; + outline: none; + min-width: 240px; +} + +.connection__field input:focus { + border-color: var(--accent); + box-shadow: 0 0 0 3px rgba(79, 140, 255, 0.18); +} + +.status { + display: flex; + align-items: center; + gap: 8px; + font-size: 13px; + color: var(--text-dim); + white-space: nowrap; +} + +.status__dot { + width: 9px; + height: 9px; + border-radius: 50%; + background: var(--text-dim); +} + +.status__dot--idle { + background: var(--text-dim); +} + +.status__dot--connecting { + background: var(--warning); + animation: pulse 1.2s ease-in-out infinite; +} + +.status__dot--connected { + background: var(--success); +} + +.status__dot--error { + background: var(--danger); +} + +/* Chat ------------------------------------------------------------------ */ + +.chat { + overflow: hidden; + background: var(--bg); + border: 1px solid var(--border); + border-radius: var(--radius); + display: flex; + flex-direction: column; +} + +.chat__log { + flex: 1; + overflow-y: auto; + padding: 18px; + display: flex; + flex-direction: column; + gap: 10px; + scroll-behavior: smooth; +} + +.chat__empty { + margin: auto; + text-align: center; + color: var(--text-dim); +} + +.chat__empty p { + margin: 4px 0; +} + +.chat__hint code { + background: var(--bg-soft); + border: 1px solid var(--border); + 
border-radius: 6px; + padding: 1px 6px; + font-size: 12px; +} + +.bubble { + max-width: 78%; + padding: 10px 14px; + border-radius: 14px; + line-height: 1.45; + font-size: 14px; + white-space: pre-wrap; + word-wrap: break-word; + animation: bubble-in 0.16s ease-out; +} + +.bubble--user { + background: var(--user); + color: #fff; + align-self: flex-end; + border-bottom-right-radius: 4px; +} + +.bubble--assistant { + background: var(--assistant); + color: var(--text); + align-self: flex-start; + border-bottom-left-radius: 4px; +} + +.bubble--assistant.bubble--interrupted { + opacity: 0.75; + border-left: 2px solid var(--warning); +} + +.bubble--system { + align-self: center; + background: transparent; + color: var(--text-dim); + font-size: 12px; + border: 1px dashed var(--border); + padding: 6px 10px; + border-radius: 999px; +} + +.bubble__role { + display: block; + font-size: 10px; + text-transform: uppercase; + letter-spacing: 0.8px; + opacity: 0.7; + margin-bottom: 4px; +} + +/* Controls -------------------------------------------------------------- */ + +.controls { + display: grid; + gap: 10px; + padding: 14px 16px 6px; + border-top: 1px solid var(--border); +} + +.meter { + height: 6px; + background: var(--bg-soft); + border-radius: 999px; + overflow: hidden; +} + +.meter__fill { + height: 100%; + width: 0%; + background: linear-gradient(90deg, var(--success), var(--accent)); + transition: width 80ms linear; +} + +.composer { + display: flex; + align-items: flex-end; + gap: 8px; +} + +.composer__input { + flex: 1; + resize: none; + background: var(--bg-soft); + color: var(--text); + border: 1px solid var(--border); + border-radius: 12px; + padding: 10px 12px; + font: inherit; + font-size: 14px; + line-height: 1.4; + outline: none; + min-height: 42px; + max-height: 180px; + overflow-y: auto; +} + +.composer__input:focus { + border-color: var(--accent); + box-shadow: 0 0 0 3px rgba(79, 140, 255, 0.18); +} + +.composer__input:disabled { + opacity: 0.55; + cursor: 
not-allowed; +} + +.composer__send { + padding: 10px 18px; + border-radius: 12px; + min-width: 84px; + align-self: flex-end; +} + +.composer__send:disabled { + opacity: 0.5; + cursor: not-allowed; +} + +.controls__row { + display: flex; + align-items: center; + gap: 14px; +} + +.mic-btn { + display: inline-flex; + align-items: center; + gap: 8px; + padding: 9px 14px; + border-radius: 999px; + background: var(--bg-soft); + color: var(--text); + border: 1px solid var(--border); + font: inherit; + font-weight: 600; + cursor: pointer; + transition: transform 0.08s ease, background 0.15s ease, color 0.15s ease, + border-color 0.15s ease; +} + +.mic-btn:disabled { + opacity: 0.5; + cursor: not-allowed; +} + +.mic-btn:not(:disabled):hover { + border-color: var(--accent); +} + +.mic-btn:not(:disabled):active { + transform: scale(0.98); +} + +.mic-btn[aria-pressed="true"] { + background: var(--danger); + border-color: var(--danger); + color: #fff; + box-shadow: 0 0 0 6px rgba(255, 85, 119, 0.18); +} + +.mic-btn__icon { + display: block; +} + +.indicators { + display: flex; + gap: 12px; + margin-left: auto; +} + +.indicator { + display: inline-flex; + align-items: center; + gap: 6px; + font-size: 12px; + color: var(--text-dim); +} + +.indicator__dot { + width: 8px; + height: 8px; + border-radius: 50%; + background: var(--bg-soft); + border: 1px solid var(--border); +} + +.indicator.is-active .indicator__dot--mic { + background: var(--success); + border-color: var(--success); + box-shadow: 0 0 0 4px rgba(45, 210, 139, 0.2); +} + +.indicator.is-active .indicator__dot--bot { + background: var(--accent); + border-color: var(--accent); + box-shadow: 0 0 0 4px rgba(79, 140, 255, 0.22); + animation: pulse 1s ease-in-out infinite; +} + +.indicator.is-active .indicator__label { + color: var(--text); +} + +.btn { + appearance: none; + border: 1px solid var(--border); + background: var(--bg-soft); + color: var(--text); + padding: 9px 14px; + border-radius: 10px; + font: inherit; + 
font-size: 13px; + cursor: pointer; + transition: background 0.15s ease, border-color 0.15s ease; +} + +.btn:hover { + border-color: var(--accent); +} + +.btn--primary { + background: var(--accent); + color: #fff; + border-color: var(--accent); +} + +.btn--primary:hover { + background: var(--accent-strong); + border-color: var(--accent-strong); +} + +.btn--primary.is-disconnect { + background: transparent; + color: var(--danger); + border-color: var(--danger); +} + +.btn--ghost { + background: transparent; +} + +.hint { + margin: 4px 2px 0; + font-size: 12px; + color: var(--text-dim); +} + +.hint kbd { + background: var(--bg-soft); + border: 1px solid var(--border); + border-bottom-width: 2px; + border-radius: 4px; + padding: 0 5px; + font-family: ui-monospace, SFMono-Regular, Menlo, monospace; + font-size: 11px; + color: var(--text); +} + +/* Animations ------------------------------------------------------------ */ + +@keyframes pulse { + 0%, + 100% { + opacity: 1; + } + 50% { + opacity: 0.55; + } +} + +@keyframes bubble-in { + from { + opacity: 0; + transform: translateY(4px); + } + to { + opacity: 1; + transform: translateY(0); + } +} + +/* Responsive ------------------------------------------------------------ */ + +@media (max-width: 720px) { + body { + padding: 0; + } + + .app { + height: 100vh; + border-radius: 0; + border: none; + padding: 12px; + } + + .app__header { + grid-template-columns: 1fr; + } + + .connection { + flex-direction: column; + align-items: stretch; + } + + .status { + justify-content: flex-end; + } + + .indicators { + margin-left: 0; + } +} diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..ab1b369 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,4 @@ +fastapi>=0.115.6,<1 +uvicorn[standard]>=0.32.0,<1 +-e ../pipecat[websocket,openai,silero] +