// ====================================
// Voice Bot Implementation
// A real-time voice chat system that converts speech to text,
// processes it through AI (OpenAI/Botpress), and converts responses back to speech
// ====================================

// Required Dependencies
const net = require('net');
const { SpeechClient } = require('@google-cloud/speech');
const { TextToSpeechClient } = require('@google-cloud/text-to-speech');
const { Transform } = require('stream');
const { OpenAI } = require('openai');
const async = require('async');
const axios = require('axios');
const packageInfo = require('./package.json');
const {
  toUUID,
  matchesSentenceEnding,
  removeSpecialCharacters,
  calculateMetricsAndPricing
} = require('./utils');

// Load environment variables
require('dotenv').config();

// ====================================
// Packet Protocol Definition
// Defines the types of packets that can be exchanged between client and server
// Packet layout: 1-byte type, 2-byte big-endian payload length, payload
// ====================================
const PACKET_TYPES = {
  TERMINATE: 0x00, // Signal to end the connection
  UUID: 0x01,      // Client identification
  AUDIO: 0x10,     // Audio data packet
  ERROR: 0xff      // Error notification
};

// ====================================
// Socket State Management
// Tracks the state of the socket connection
// ====================================
function isSocketActive(socket) {
  return socket && !socket.destroyed && socket.writable;
}

// ====================================
// Packet Handler
// Processes incoming packets based on their type
// ====================================
function handlePacket(socket, audioStream, packet) {
  const packetType = packet.readUInt8(0);
  const packetLength = packet.readUInt16BE(1);

  switch (packetType) {
    case PACKET_TYPES.TERMINATE:
      console.log('Terminate packet received. Initiating graceful shutdown.');
      // Clean up streams and pending operations
      if (audioStream) {
        audioStream.end();
      }
      // Set a flag to prevent new operations
      socket.isTerminating = true;
      // Give time for pending operations to complete
      setTimeout(() => {
        if (isSocketActive(socket)) {
          socket.end();
        }
      }, 1000);
      break;

    case PACKET_TYPES.UUID: {
      if (!isSocketActive(socket)) return;
      const uuid = toUUID(packet.slice(3, 19).toString('hex'));
      socket.uuid = uuid;
      console.log('UUID packet received: ' + uuid);
      break;
    }

    case PACKET_TYPES.AUDIO: {
      if (!isSocketActive(socket) || socket.isTerminating) return;
      const audioData = packet.slice(3, 3 + packetLength);
      if (audioStream && !audioStream.destroyed) {
        audioStream.write(audioData);
      }
      break;
    }

    case PACKET_TYPES.ERROR: {
      if (!isSocketActive(socket)) return;
      const errorCode = packetLength > 0 ? packet.readUInt8(3) : null;
      console.log('Error packet received with code: ' + errorCode);
      break;
    }

    default:
      console.log('Unknown packet type: ' + packetType);
  }
}
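// ====================================
// Packet Framing Example (illustration only)
// A hypothetical helper showing how a peer could frame packets for the
// protocol handled above: 1-byte type, 2-byte big-endian payload length,
// then the payload. This helper is not part of the original protocol code.
// ====================================
function buildPacket(type, payload = Buffer.alloc(0)) {
  const header = Buffer.alloc(3);
  header.writeUInt8(type, 0);              // packet type
  header.writeUInt16BE(payload.length, 1); // payload length
  return Buffer.concat([header, payload]);
}
// Caveat: handlePacket assumes each socket 'data' event contains exactly one
// complete packet. TCP does not preserve message boundaries, so a production
// implementation would need to buffer the incoming byte stream and re-frame
// it using the length field.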
// Safe write function to prevent write-after-end errors
function safeSocketWrite(socket, data) {
  if (isSocketActive(socket) && !socket.isTerminating) {
    try {
      socket.write(data);
    } catch (err) {
      console.error('Error writing to socket:', err);
    }
  }
}

// ====================================
// Main Server Implementation
// Creates and manages the TCP server that handles client connections
// ====================================
const server = net.createServer(async socket => {
  console.log('Client connected');

  // State variables
  let thread = null;        // OpenAI Assistants conversation thread for this connection
  let messages = [];
  let totalCost = 0;
  let isProcessing = false; // true while TTS audio is being streamed to the client
  let isAssistantRunning = false;

  // Initialize AI clients
  const ttsClient = new TextToSpeechClient();
  const speechClient = new SpeechClient();

  // Initialize OpenAI if API key is provided
  let openai = null;
  if (process.env.OPENAI_API_KEY) {
    openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
  }

  // Initialize Botpress if webhook URL is provided
  // Note: only the OpenAI paths below generate responses; a Botpress
  // transcript handler is not implemented in this file.
  let botpress = null;
  if (process.env.BOTPRESS_WEBHOOK_URL) {
    openai = null; // Disable OpenAI if using Botpress
    botpress = axios.create({ baseURL: process.env.BOTPRESS_WEBHOOK_URL });
  }

  // Create a conversation thread if an OpenAI assistant ID is provided
  if (openai && process.env.OPENAI_ASSISTANT_ID) {
    thread = await openai.beta.threads.create();
  }

  // ====================================
  // Message Handler
  // Tracks messages and calculates costs
  // ====================================
  const handleMessage = (socket, role, content) => {
    const messageData = {
      uuid: socket.uuid || '',
      role: role,
      content: content,
      ...calculateMetricsAndPricing(role, content)
    };

    // Update costs based on message role
    switch (role) {
      case 'system':
        totalCost += messageData.costByCharacter;
        break;
      case 'user':
        totalCost += messageData.costBySecond;
        totalCost += messageData.costByToken;
        console.log(messageData);
        break;
      case 'assistant':
        totalCost += messageData.costByCharacter;
        totalCost += messageData.costByToken;
        console.log(messageData);
        break;
    }

    messages.push(messageData);
  };

  // ====================================
  // Text-to-Speech Handler
  // Converts text responses to speech and streams audio back to client
  // ====================================
  const ttsQueue = async.queue(async task => {
    const { message } = task;
    const request = {
      input: { text: message },
      voice: {
        languageCode: process.env.TEXT_TO_SPEECH_LANGUAGE || 'en-US',
        ssmlGender: process.env.TEXT_TO_SPEECH_GENDER || 'FEMALE',
        name: process.env.TEXT_TO_SPEECH_NAME || 'en-US-Journey-F'
      },
      audioConfig: {
        audioEncoding: 'LINEAR16',
        sampleRateHertz: 8000,
        speakingRate: 1
      }
    };

    try {
      const [response] = await ttsClient.synthesizeSpeech(request);
      handleMessage(socket, 'assistant', message);

      const audioContent = response.audioContent;
      const chunkSize = 320;

      // Stream audio in chunks; abort if the caller interrupts
      // (isProcessing is reset by the audio-level detector below)
      for (let i = 0; i < audioContent.length; i += chunkSize) {
        if (!isProcessing) break;
        const chunk = audioContent.slice(i, i + chunkSize);
        const header = Buffer.alloc(3);
        header.writeUInt8(PACKET_TYPES.AUDIO, 0);
        header.writeUInt16BE(chunk.length, 1);
        const packet = Buffer.concat([header, chunk]);
        safeSocketWrite(socket, packet);
        await new Promise(resolve => setTimeout(resolve, 20));
      }
    } catch (error) {
      console.error('Error synthesizing speech:', error);
    }
  }, 1);

  ttsQueue.drain(() => {}); // no-op drain handler
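  // Why 320-byte chunks every 20 ms: LINEAR16 at 8000 Hz produces
  // 8000 samples/s * 2 bytes/sample = 16 000 bytes of audio per second, so a
  // 320-byte chunk carries 320 / 16000 = 20 ms of audio. Pacing one chunk per
  // 20 ms therefore streams the synthesized speech at roughly real time
  // (setTimeout drift makes this approximate). The same arithmetic as code:
  //
  //   const bytesPerSecond = 8000 * 2;               // 16 000
  //   const chunkMs = (320 / bytesPerSecond) * 1000; // 20 ms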
  // ====================================
  // Speech-to-Text Setup
  // Configures and manages speech recognition
  // ====================================

  // Audio level detection, used for barge-in: when the caller speaks over
  // the bot, isProcessing is reset so TTS playback stops
  let audioSampleWindow = [];
  const WINDOW_SIZE = 10;      // Number of chunks to analyze
  const AUDIO_THRESHOLD = 700; // Adjust this threshold based on testing

  const audioStream = new Transform({
    transform(chunk, encoding, callback) {
      // Calculate RMS (Root Mean Square) of the audio chunk
      let sum = 0;
      for (let i = 0; i < chunk.length; i += 2) {
        // Convert 2 bytes to a 16-bit integer
        const sample = chunk.readInt16LE(i);
        sum += sample * sample;
      }
      const rms = Math.sqrt(sum / (chunk.length / 2));

      // Maintain a sliding window of audio levels
      audioSampleWindow.push(rms);
      if (audioSampleWindow.length > WINDOW_SIZE) {
        audioSampleWindow.shift();
      }

      // Calculate average RMS over the window
      const avgRMS =
        audioSampleWindow.reduce((a, b) => a + b, 0) / audioSampleWindow.length;

      // Reset isProcessing when the caller is speaking
      if (avgRMS > AUDIO_THRESHOLD) {
        isProcessing = false;
        console.log('Audio level triggered:', avgRMS.toFixed(2));
      }

      callback(null, chunk);
    }
  });

  // Add system prompt to messages
  handleMessage(
    socket,
    'system',
    process.env.SYSTEM_PROMPT || 'You are a helpful assistant.'
  );

  // Configure speech recognition
  const recognitionConfig = {
    config: {
      encoding: 'LINEAR16',
      sampleRateHertz: 8000,
      languageCode: process.env.SPEECH_RECOGNITION_LANGUAGE || 'en-US',
      model: process.env.SPEECH_RECOGNITION_MODEL || 'phone_call',
      useEnhanced: true
    },
    interimResults: false
  };

  // Add alternative languages if specified (alternativeLanguageCodes is a
  // RecognitionConfig field, so it belongs inside config)
  if (process.env.SPEECH_RECOGNITION_ALTERNATIVE_LANGUAGES) {
    recognitionConfig.config.alternativeLanguageCodes =
      process.env.SPEECH_RECOGNITION_ALTERNATIVE_LANGUAGES.split(',');
  }
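  // The text helpers used below come from ./utils and are not shown in this
  // file. Minimal sketches consistent with their call sites (the real
  // implementations may differ):
  //
  //   const matchesSentenceEnding = text => /[.!?]$/.test(text.trim());
  //   const removeSpecialCharacters = text => text.replace(/[*_#`]/g, '');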
  // ====================================
  // Speech Recognition Stream Handler
  // Processes speech recognition results and manages AI responses
  // ====================================
  const recognizeStream = speechClient
    .streamingRecognize(recognitionConfig)
    .on('error', console.error)
    .on('data', async data => {
      try {
        if (
          data.results[0]?.alternatives[0]?.transcript &&
          data.results[0]?.alternatives[0]?.confidence > 0
        ) {
          const transcript = data.results[0].alternatives[0].transcript.trim();
          if (transcript) {
            let response;
            isProcessing = false;
            handleMessage(socket, 'user', transcript);

            if (openai) {
              if (process.env.OPENAI_ASSISTANT_ID) {
                // Wait if a previous assistant run is still in progress
                while (isAssistantRunning) {
                  console.log('Assistant is running...');
                  await new Promise(resolve => setTimeout(resolve, 1000));
                }

                // Process with the OpenAI Assistants API: append the user
                // message to the thread, then stream a run
                await openai.beta.threads.messages.create(thread.id, {
                  role: 'user',
                  content: transcript
                });
                isAssistantRunning = true;
                response = await openai.beta.threads.runs.create(thread.id, {
                  assistant_id: process.env.OPENAI_ASSISTANT_ID,
                  stream: true
                });
              } else {
                // Process with standard OpenAI chat completions; strip the
                // cost-tracking fields, since the API only accepts the
                // standard message properties
                response = await openai.chat.completions.create({
                  model: process.env.OPENAI_MODEL || 'gpt-3.5-turbo',
                  messages: messages.map(({ role, content }) => ({ role, content })),
                  max_tokens: 150,
                  stream: true
                });
              }

              let currentResponse = '';
              let hasQueuedSpeech = false; // set once the first sentence is queued for TTS

              for await (const chunk of response) {
                let content = '';
                if (process.env.OPENAI_ASSISTANT_ID) {
                  // Assistants stream events: message deltas carry the text
                  if (
                    Array.isArray(chunk.data.delta?.content) &&
                    chunk.data.delta.content[0]?.text
                  ) {
                    content = chunk.data.delta.content[0].text.value;
                  }
                } else {
                  content = chunk.choices[0]?.delta?.content;
                }

                if (content) {
                  currentResponse += content;
                  currentResponse = removeSpecialCharacters(currentResponse);

                  if (matchesSentenceEnding(currentResponse)) {
                    // If the caller interrupted after playback started,
                    // stop streaming the rest of the completion
                    if (hasQueuedSpeech && !isProcessing) {
                      ttsQueue.kill();
                      console.log('Stop streaming openai...');
                      break;
                    }
                    isProcessing = true;
                    hasQueuedSpeech = true;
                    ttsQueue.push({ message: currentResponse });
                    currentResponse = '';
                  }
                }
              }

              isAssistantRunning = false;
            }
          }
        }
      } catch (error) {
        console.error(error);
        isAssistantRunning = false;
      }
    });

  // Pipe audio stream to recognition stream
  audioStream.pipe(recognizeStream);

  // Handle incoming socket data
  socket.on('data', data => {
    handlePacket(socket, audioStream, data);
  });

  // Clean up on socket close
  socket.on('close', () => {
    console.log('Connection closed');
    console.log('Total cost:', totalCost.toFixed(4));
  });
});

// Start the server
const PORT = process.env.PORT || 3000;
server.listen(PORT, () => {
  console.log(`Server v${packageInfo.version} listening on port ${PORT}`);
});
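// ====================================
// Example Client (illustration only, never invoked)
// A minimal sketch of how a client might drive the packet protocol: identify
// itself with a UUID packet, stream 8 kHz LINEAR16 audio, and play back AUDIO
// packets from the server. The UUID bytes here are made up, and buildPacket
// is the hypothetical helper defined near the top of this file.
// ====================================
function exampleClient(port = process.env.PORT || 3000) {
  const client = net.connect({ port }, () => {
    // Identify this client: the payload is the 16 raw UUID bytes
    const uuidBytes = Buffer.from('0123456789abcdef0123456789abcdef', 'hex');
    client.write(buildPacket(PACKET_TYPES.UUID, uuidBytes));
    // ...then write AUDIO packets carrying 8 kHz LINEAR16 microphone data,
    // and finish with: client.write(buildPacket(PACKET_TYPES.TERMINATE));
  });
  client.on('data', packet => {
    // AUDIO packets from the server carry synthesized speech to play back
    if (packet.readUInt8(0) === PACKET_TYPES.AUDIO) {
      const length = packet.readUInt16BE(1);
      const audio = packet.slice(3, 3 + length); // hand this to an audio sink
    }
  });
  return client;
}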