Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion p2p-webrtc/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ WORKDIR /app

# Install system dependencies required for OpenCV
RUN apt-get update && apt-get install -y \
libgl1-mesa-glx \
libgl1 \
libglib2.0-0 \
&& rm -rf /var/lib/apt/lists/*

Expand Down
65 changes: 37 additions & 28 deletions p2p-webrtc/docker/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -41,35 +41,21 @@ <h1>WebRTC Voice Agent</h1>
});
}

const waitForIceGatheringComplete = async (pc, timeoutMs = 2000) => {
if (pc.iceGatheringState === 'complete') return;
console.log("Waiting for ICE gathering to complete. Current state:", pc.iceGatheringState);
return new Promise((resolve) => {
let timeoutId;
const checkState = () => {
console.log("icegatheringstatechange:", pc.iceGatheringState);
if (pc.iceGatheringState === 'complete') {
cleanup();
resolve();
}
};
const onTimeout = () => {
console.warn(`ICE gathering timed out after ${timeoutMs} ms.`);
cleanup();
resolve();
};
const cleanup = () => {
pc.removeEventListener('icegatheringstatechange', checkState);
clearTimeout(timeoutId);
};
pc.addEventListener('icegatheringstatechange', checkState);
timeoutId = setTimeout(onTimeout, timeoutMs);
// Checking the state again to avoid any eventual race condition
checkState();
const sendIceCandidate = async (pc, candidate) => {
await fetch('/api/offer', {
method: "PATCH",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
pc_id: pc.pc_id,
candidates:[{
candidate: candidate.candidate,
sdp_mid: candidate.sdpMid,
sdp_mline_index: candidate.sdpMLineIndex
}]
})
});
};


const createSmallWebRTCConnection = async (audioTrack) => {
// TODO: need to configure a TURN server if using inside Docker on Mac.
const config = {
Expand All @@ -80,21 +66,36 @@ <h1>WebRTC Voice Agent</h1>
]
};
const pc = new RTCPeerConnection(config)

// Queue to store ICE candidates until we have received the answer and have a session in progress
pc.pendingIceCandidates = []
pc.canSendIceCandidates = false

addPeerConnectionEventListeners(pc)
pc.ontrack = e => audioEl.srcObject = e.streams[0]
// SmallWebRTCTransport expects to receive both transceivers
pc.addTransceiver(audioTrack, { direction: 'sendrecv' })
pc.addTransceiver('video', { direction: 'sendrecv' })
await pc.setLocalDescription(await pc.createOffer())
await waitForIceGatheringComplete(pc)
const offer = pc.localDescription
const response = await fetch('/api/offer', {
body: JSON.stringify({ sdp: offer.sdp, type: offer.type}),
headers: { 'Content-Type': 'application/json' },
method: 'POST',
});
const answer = await response.json()
pc.pc_id = answer.pc_id
await pc.setRemoteDescription(answer)

// Now we can send ICE candidates
pc.canSendIceCandidates = true

// Send any queued ICE candidates
for (const candidate of pc.pendingIceCandidates) {
await sendIceCandidate(pc, candidate)
}
pc.pendingIceCandidates = []

return pc
}

Expand All @@ -117,9 +118,17 @@ <h1>WebRTC Voice Agent</h1>
_onDisconnected()
}
}
pc.onicecandidate = (event) => {
pc.onicecandidate = async (event) => {
if (event.candidate) {
console.log("New ICE candidate:", event.candidate);
// Check if we can send ICE candidates (we have received the answer with pc_id)
if (pc.canSendIceCandidates && pc.pc_id) {
// Send immediately
await sendIceCandidate(pc, event.candidate)
} else {
// Queue the candidate until we have pc_id
pc.pendingIceCandidates.push(event.candidate)
}
} else {
console.log("All ICE candidates have been sent.");
}
Expand Down
62 changes: 25 additions & 37 deletions p2p-webrtc/docker/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,61 +5,51 @@
#

import argparse
import asyncio
import sys
from contextlib import asynccontextmanager
from typing import Dict

import uvicorn
from bot import run_bot
from dotenv import load_dotenv
from fastapi import BackgroundTasks, FastAPI
from fastapi import BackgroundTasks, FastAPI, Request
from fastapi.responses import FileResponse
from loguru import logger
from pipecat.transports.smallwebrtc.connection import IceServer, SmallWebRTCConnection
from pipecat.transports.smallwebrtc.request_handler import (
SmallWebRTCPatchRequest,
SmallWebRTCRequest,
SmallWebRTCRequestHandler,
)

# Load environment variables
load_dotenv(override=True)

app = FastAPI()

# Store connections by pc_id
pcs_map: Dict[str, SmallWebRTCConnection] = {}

# TODO: need to configure a TURN server if using inside Docker on Mac.
ice_servers = [
IceServer(
urls="stun:stun.l.google.com:19302",
)
]
# Initialize the SmallWebRTC request handler
small_webrtc_handler: SmallWebRTCRequestHandler = SmallWebRTCRequestHandler()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file is much cleaner using the nice encapsulation you've built into SmallWebRTCRequestHandler!



@app.post("/api/offer")
async def offer(request: dict, background_tasks: BackgroundTasks):
pc_id = request.get("pc_id")

if pc_id and pc_id in pcs_map:
pipecat_connection = pcs_map[pc_id]
logger.info(f"Reusing existing connection for pc_id: {pc_id}")
await pipecat_connection.renegotiate(
sdp=request["sdp"], type=request["type"], restart_pc=request.get("restart_pc", False)
)
else:
pipecat_connection = SmallWebRTCConnection(ice_servers)
await pipecat_connection.initialize(sdp=request["sdp"], type=request["type"])
async def offer(request: SmallWebRTCRequest, background_tasks: BackgroundTasks):
"""Handle WebRTC offer requests via SmallWebRTCRequestHandler."""

@pipecat_connection.event_handler("closed")
async def handle_disconnected(webrtc_connection: SmallWebRTCConnection):
logger.info(f"Discarding peer connection for pc_id: {webrtc_connection.pc_id}")
pcs_map.pop(webrtc_connection.pc_id, None)
# Prepare runner arguments with the callback to run your bot
async def webrtc_connection_callback(connection):
background_tasks.add_task(run_bot, connection)

background_tasks.add_task(run_bot, pipecat_connection)
# Delegate handling to SmallWebRTCRequestHandler
answer = await small_webrtc_handler.handle_web_request(
request=request,
webrtc_connection_callback=webrtc_connection_callback,
)
return answer

answer = pipecat_connection.get_answer()
# Updating the peer connection inside the map
pcs_map[answer["pc_id"]] = pipecat_connection

return answer
@app.patch("/api/offer")
async def ice_candidate(request: SmallWebRTCPatchRequest):
logger.debug(f"Received patch request: {request}")
await small_webrtc_handler.handle_patch_request(request)
return {"status": "success"}


@app.get("/")
Expand All @@ -70,9 +60,7 @@ async def serve_index():
@asynccontextmanager
async def lifespan(app: FastAPI):
yield # Run app
coros = [pc.disconnect() for pc in pcs_map.values()]
await asyncio.gather(*coros)
pcs_map.clear()
await small_webrtc_handler.close()


if __name__ == "__main__":
Expand Down
4 changes: 1 addition & 3 deletions p2p-webrtc/pipecat-cloud/client/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@ class WebRTCApp {

private initializePipecatClient(): void {
const opts: PipecatClientOptions = {
transport: new SmallWebRTCTransport({
waitForICEGathering: true,
}),
transport: new SmallWebRTCTransport(),
enableMic: true,
enableCam: true,
callbacks: {
Expand Down
73 changes: 43 additions & 30 deletions p2p-webrtc/voice-agent/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -24,55 +24,60 @@ <h1>WebRTC Voice Agent</h1>
let connected = false
let peerConnection = null

const waitForIceGatheringComplete = async (pc, timeoutMs = 2000) => {
if (pc.iceGatheringState === 'complete') return;
console.log("Waiting for ICE gathering to complete. Current state:", pc.iceGatheringState);
return new Promise((resolve) => {
let timeoutId;
const checkState = () => {
console.log("icegatheringstatechange:", pc.iceGatheringState);
if (pc.iceGatheringState === 'complete') {
cleanup();
resolve();
}
};
const onTimeout = () => {
console.warn(`ICE gathering timed out after ${timeoutMs} ms.`);
cleanup();
resolve();
};
const cleanup = () => {
pc.removeEventListener('icegatheringstatechange', checkState);
clearTimeout(timeoutId);
};
pc.addEventListener('icegatheringstatechange', checkState);
timeoutId = setTimeout(onTimeout, timeoutMs);
// Checking the state again to avoid any eventual race condition
checkState();
const sendIceCandidate = async (pc, candidate) => {
await fetch('/api/offer', {
method: "PATCH",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
pc_id: pc.pc_id,
candidates:[{
candidate: candidate.candidate,
sdp_mid: candidate.sdpMid,
sdp_mline_index: candidate.sdpMLineIndex
}]
})
});
};


const createSmallWebRTCConnection = async (audioTrack) => {
const config = {
iceServers: [],
iceServers:[
{
urls:"stun:stun.l.google.com:19302",
}
]
};
const pc = new RTCPeerConnection(config)

// Queue to store ICE candidates until we have received the answer and have a session in progress
pc.pendingIceCandidates = []
pc.canSendIceCandidates = false

addPeerConnectionEventListeners(pc)
pc.ontrack = e => audioEl.srcObject = e.streams[0]
// SmallWebRTCTransport expects to receive both transceivers
pc.addTransceiver(audioTrack, { direction: 'sendrecv' })
pc.addTransceiver('video', { direction: 'sendrecv' })
await pc.setLocalDescription(await pc.createOffer())
await waitForIceGatheringComplete(pc)
const offer = pc.localDescription
const response = await fetch('/api/offer', {
body: JSON.stringify({ sdp: offer.sdp, type: offer.type}),
headers: { 'Content-Type': 'application/json' },
method: 'POST',
});
const answer = await response.json()
pc.pc_id = answer.pc_id
await pc.setRemoteDescription(answer)

// Now we can send ICE candidates
pc.canSendIceCandidates = true

// Send any queued ICE candidates
for (const candidate of pc.pendingIceCandidates) {
await sendIceCandidate(pc, candidate)
}
pc.pendingIceCandidates = []

return pc
}

Expand All @@ -95,9 +100,17 @@ <h1>WebRTC Voice Agent</h1>
_onDisconnected()
}
}
pc.onicecandidate = (event) => {
pc.onicecandidate = async (event) => {
if (event.candidate) {
console.log("New ICE candidate:", event.candidate);
// Check if we can send ICE candidates (we have received the answer with pc_id)
if (pc.canSendIceCandidates && pc.pc_id) {
// Send immediately
await sendIceCandidate(pc, event.candidate)
} else {
// Queue the candidate until we have pc_id
pc.pendingIceCandidates.push(event.candidate)
}
} else {
console.log("All ICE candidates have been sent.");
}
Expand Down Expand Up @@ -140,4 +153,4 @@ <h1>WebRTC Voice Agent</h1>
});
</script>
</body>
</html>
</html>
Loading