405 lines
15 KiB
JavaScript
405 lines
15 KiB
JavaScript
// ====================================
|
|
// Voice Bot Implementation
|
|
// A real-time voice chat system that converts speech to text,
|
|
// processes it through AI (OpenAI/Botpress), and converts responses back to speech
|
|
// ====================================
|
|
|
|
// Required Dependencies
|
|
const net = require('net');
|
|
const { SpeechClient } = require('@google-cloud/speech');
|
|
const { TextToSpeechClient } = require('@google-cloud/text-to-speech');
|
|
const { Transform } = require('stream');
|
|
const { OpenAI } = require('openai');
|
|
const util = require('util');
|
|
const async = require('async');
|
|
const { encoding_for_model } = require('tiktoken');
|
|
const axios = require('axios');
|
|
const { toUUID } = require('./utils');
|
|
const packageInfo = require('./package.json');
|
|
const prices = require('./prices.json');
|
|
const {
|
|
matchesSentenceEnding,
|
|
removeSpecialCharacters,
|
|
calculateMetricsAndPricing
|
|
} = require('./utils');
|
|
|
|
// Load environment variables
|
|
require('dotenv').config();
|
|
|
|
// ====================================
|
|
// Packet Protocol Definition
|
|
// Defines the types of packets that can be exchanged between client and server
|
|
// ====================================
|
|
// ====================================
// Packet Protocol Definition
// Defines the types of packets that can be exchanged between client and server.
// Wire layout of every packet: [type: uint8][payloadLength: uint16 BE][payload].
// ====================================
// Frozen so a stray assignment elsewhere cannot silently corrupt the protocol.
const PACKET_TYPES = Object.freeze({
  'TERMINATE': 0x0, // Signal to end the connection
  'UUID': 0x1, // Client identification
  'AUDIO': 0x10, // Audio data packet
  'ERROR': 0xff // Error notification
});
|
|
|
|
// ====================================
|
|
// Socket State Management
|
|
// Tracks the state of the socket connection
|
|
// ====================================
|
|
// ====================================
// Socket State Management
// Tracks the state of the socket connection
// ====================================
/**
 * Reports whether `socket` can still accept writes: it exists, has not been
 * destroyed, and its writable side is open. Falsy inputs pass through falsy.
 */
function isSocketActive(socket) {
  const alive = socket && !socket.destroyed;
  return alive && socket.writable;
}
|
|
|
|
// ====================================
|
|
// Packet Handler
|
|
// Processes incoming packets based on their type
|
|
// ====================================
|
|
// ====================================
// Packet Handler
// Processes incoming packets based on their type
// ====================================
/**
 * Dispatches one framed packet from the client.
 * Layout: [type: uint8][payloadLength: uint16 BE][payload: payloadLength bytes].
 * NOTE(review): this assumes each socket 'data' event delivers exactly one
 * whole packet; TCP may split or coalesce frames — confirm client framing.
 *
 * @param {net.Socket} socket - connection the packet arrived on
 * @param {stream.Transform} audioStream - sink for AUDIO payloads
 * @param {Buffer} packet - raw bytes received from the socket
 */
function handlePacket(socket, audioStream, packet) {
  // FIX: guard against truncated buffers — reading the 3-byte header from a
  // shorter buffer would throw a RangeError and crash the data handler.
  if (packet.length < 3) {
    console.log('Ignoring malformed packet (shorter than header).');
    return;
  }

  const packetType = packet.readUInt8(0);
  const packetLength = packet.readUInt16BE(1);

  switch (packetType) {
    case PACKET_TYPES.TERMINATE: {
      console.log('Terminate packet received. Initiating graceful shutdown.');
      // Clean up streams and pending operations
      if (audioStream) {
        audioStream.end();
      }

      // Set a flag to prevent new operations
      socket.isTerminating = true;

      // Give time for pending operations to complete before closing.
      setTimeout(() => {
        if (isSocketActive(socket)) {
          socket.end();
        }
      }, 1000);
      break;
    }

    case PACKET_TYPES.UUID: {
      if (!isSocketActive(socket)) return;
      // Payload is 16 raw bytes; render them as a canonical UUID string.
      // (subarray replaces the deprecated Buffer#slice; same zero-copy view.)
      const uuid = toUUID(packet.subarray(3, 19).toString('hex'));
      socket.uuid = uuid;
      console.log('UUID packet received: ' + uuid);
      break;
    }

    case PACKET_TYPES.AUDIO: {
      if (!isSocketActive(socket) || socket.isTerminating) return;
      const audioData = packet.subarray(3, 3 + packetLength);
      if (audioStream && !audioStream.destroyed) {
        audioStream.write(audioData);
      }
      break;
    }

    case PACKET_TYPES.ERROR: {
      if (!isSocketActive(socket)) return;
      const errorCode = packetLength > 0 ? packet.readUInt8(3) : null;
      console.log('Error packet received with code: ' + errorCode);
      break;
    }

    default:
      console.log('Unknown packet type: ' + packetType);
  }
}
|
|
|
|
// Safe write function to prevent write-after-end errors
|
|
/**
 * Writes `data` to the socket only while it is still usable, preventing
 * "write after end" errors; any write failure is logged, never thrown.
 */
function safeSocketWrite(socket, data) {
  const canWrite = isSocketActive(socket) && !socket.isTerminating;
  if (!canWrite) return;
  try {
    socket.write(data);
  } catch (err) {
    console.error('Error writing to socket:', err);
  }
}
|
|
|
|
// ====================================
|
|
// Main Server Implementation
|
|
// Creates and manages the TCP server that handles client connections
|
|
// ====================================
|
|
// ====================================
// Main Server Implementation
// Creates and manages the TCP server that handles client connections.
// Pipeline per connection: client audio -> Google STT -> OpenAI/Botpress
// -> Google TTS -> audio packets back to the client.
// ====================================
const server = net.createServer(async socket => {
  console.log('Client connected');

  // ---- Per-connection state ----
  let assistant = null;           // OpenAI assistant object (Assistants API mode)
  let assistantThread = null;     // Conversation thread for the Assistants API
  let messages = [];              // Full chat history, annotated with cost metrics
  let totalCost = 0;              // Running cost estimate for this connection
  let isProcessing = false;       // true while TTS audio is being streamed out
  let isAssistantRunning = false; // true while an assistant run is in flight

  // Google Cloud clients for speech synthesis and recognition.
  const ttsClient = new TextToSpeechClient();
  const speechClient = new SpeechClient();

  // Initialize OpenAI if API key is provided
  let openai = null;
  if (process.env.OPENAI_API_KEY) {
    openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
  }

  // Initialize Botpress if webhook URL is provided (takes precedence over OpenAI).
  // NOTE(review): `botpress` is created but never consulted below, so with a
  // webhook configured the bot produces no responses — confirm intended wiring.
  let botpress = null;
  if (process.env.BOTPRESS_WEBHOOK_URL) {
    openai = null; // Disable OpenAI if using Botpress
    // FIX: was a bare `BOTPRESS_WEBHOOK_URL` identifier, which threw a
    // ReferenceError the moment a webhook URL was configured.
    botpress = axios.create({ baseURL: process.env.BOTPRESS_WEBHOOK_URL });
  }

  // Assistants API mode: fetch the configured assistant and open a thread.
  // FIX: was `assistants.create()` with no params, which ignored the
  // configured ID (and fails anyway — `model` is required on creation).
  // Messages and runs below are attached to the thread, per the API.
  if (openai && process.env.OPENAI_ASSISTANT_ID) {
    assistant = await openai.beta.assistants.retrieve(process.env.OPENAI_ASSISTANT_ID);
    assistantThread = await openai.beta.threads.create();
  }

  // ====================================
  // Message Handler
  // Records a message in the history and accumulates its estimated cost.
  // ====================================
  const handleMessage = (socket, role, content) => {
    const messageData = {
      uuid: socket.uuid || '',
      role: role,
      content: content,
      ...calculateMetricsAndPricing(role, content)
    };

    // Cost components differ by role: system prompts are priced per character
    // only; user turns add STT seconds + tokens; assistant turns add TTS
    // characters + tokens.
    switch (role) {
      case 'system':
        totalCost += messageData.costByCharacter;
        break;
      case 'user':
        totalCost += messageData.costBySecond;
        totalCost += messageData.costByToken;
        console.log(messageData);
        break;
      case 'assistant':
        totalCost += messageData.costByCharacter;
        totalCost += messageData.costByToken;
        console.log(messageData);
        break;
    }

    messages.push(messageData);
  };

  // ====================================
  // Text-to-Speech Handler
  // Converts one sentence at a time to speech and streams the audio back to
  // the client in 320-byte packets (~20 ms of 8 kHz LINEAR16 audio each).
  // Concurrency 1 keeps sentences in order.
  // ====================================
  const ttsQueue = async.queue(async task => {
    const { message } = task;
    const request = {
      input: { text: message },
      voice: {
        languageCode: process.env.TEXT_TO_SPEECH_LANGUAGE || 'en-US',
        ssmlGender: process.env.TEXT_TO_SPEECH_GENDER || 'FEMALE',
        name: process.env.TEXT_TO_SPEECH_NAME || 'en-US-Journey-F'
      },
      audioConfig: {
        audioEncoding: 'LINEAR16',
        sampleRateHertz: 8000,
        speakingRate: 1
      }
    };

    try {
      const [response] = await ttsClient.synthesizeSpeech(request);
      handleMessage(socket, 'assistant', message);

      const audioContent = response.audioContent;
      const chunkSize = 320;

      // Stream audio in paced chunks; abort mid-sentence if the caller
      // barges in (the audio-level detector clears isProcessing).
      for (let i = 0; i < audioContent.length; i += chunkSize) {
        if (!isProcessing) break;

        const chunk = audioContent.slice(i, i + chunkSize);
        const header = Buffer.alloc(3);
        header.writeUInt8(PACKET_TYPES.AUDIO, 0);
        header.writeUInt16BE(chunk.length, 1);

        const packet = Buffer.concat([header, chunk]);
        safeSocketWrite(socket, packet);
        // ~real-time pacing: 320 bytes = 20 ms of 8 kHz 16-bit mono audio.
        await new Promise(resolve => setTimeout(resolve, 20));
      }
    } catch (error) {
      console.error('Error synthesizing speech:', error);
    }
  }, 1);

  ttsQueue.drain(() => {});

  // ====================================
  // Speech-to-Text Setup
  // Configures and manages speech recognition
  // ====================================
  /* Start addition to check audio level */
  let audioSampleWindow = [];
  const WINDOW_SIZE = 10; // Number of chunks to analyze
  const AUDIO_THRESHOLD = 700; // Adjust this threshold based on testing
  /* End addition to check audio level */

  // Pass-through stream that also measures loudness so the caller can
  // interrupt ("barge in") while the bot is speaking.
  const audioStream = new Transform({
    transform(chunk, encoding, callback) {
      /* Start addition to check audio level */
      // Calculate RMS (Root Mean Square) of the 16-bit little-endian samples.
      let sum = 0;
      // FIX: stop 2 bytes short of the end — readInt16LE on a trailing odd
      // byte would throw a RangeError on odd-length chunks.
      for (let i = 0; i + 1 < chunk.length; i += 2) {
        const sample = chunk.readInt16LE(i);
        sum += sample * sample;
      }
      const rms = Math.sqrt(sum / (chunk.length / 2));

      // Maintain a sliding window of audio levels
      audioSampleWindow.push(rms);
      if (audioSampleWindow.length > WINDOW_SIZE) {
        audioSampleWindow.shift();
      }

      // Calculate average RMS over the window
      const avgRMS = audioSampleWindow.reduce((a, b) => a + b, 0) / audioSampleWindow.length;

      // Sustained loud input while the bot is talking -> stop TTS playback.
      if (avgRMS > AUDIO_THRESHOLD) {
        isProcessing = false;
        console.log('Audio level triggered:', avgRMS.toFixed(2));
      }
      /* End addition to check audio level */

      callback(null, chunk);
    }
  });

  // Add system prompt to messages
  handleMessage(
    socket,
    'system',
    process.env.SYSTEM_PROMPT || 'You are a helpful assistant.'
  );

  // Configure speech recognition
  const recognitionConfig = {
    config: {
      encoding: 'LINEAR16',
      sampleRateHertz: 8000,
      languageCode: process.env.SPEECH_RECOGNITION_LANGUAGE || 'en-US',
      model: process.env.SPEECH_RECOGNITION_MODEL || 'phone_call',
      useEnhanced: true
    },
    interimResults: false
  };

  // Add alternative languages if specified.
  // FIX: alternativeLanguageCodes is a field of RecognitionConfig (the inner
  // `config`), not of the streaming wrapper, so it was silently ignored.
  if (process.env.SPEECH_RECOGNITION_ALTERNATIVE_LANGUAGES) {
    recognitionConfig.config.alternativeLanguageCodes =
      process.env.SPEECH_RECOGNITION_ALTERNATIVE_LANGUAGES.split(',');
  }

  // ====================================
  // Speech Recognition Stream Handler
  // Processes speech recognition results and manages AI responses
  // ====================================
  const recognizeStream = speechClient
    .streamingRecognize(recognitionConfig)
    .on('error', console.error)
    .on('data', async data => {
      try {
        if (
          data.results[0]?.alternatives[0]?.transcript &&
          data.results[0]?.alternatives[0]?.confidence > 0
        ) {
          const transcript = data.results[0].alternatives[0].transcript.trim();

          if (transcript) {
            let response;
            // The user spoke: stop any audio currently being streamed out.
            isProcessing = false;
            handleMessage(socket, 'user', transcript);

            if (openai) {
              if (process.env.OPENAI_ASSISTANT_ID) {
                // Wait if a previous assistant run is still streaming.
                while (isAssistantRunning) {
                  console.log('Assistant is running...');
                  await new Promise(resolve => setTimeout(resolve, 1000));
                }

                // FIX: messages and runs live on *threads* in the Assistants
                // API (`beta.assistants.messages/runs` do not exist).
                await openai.beta.threads.messages.create(
                  assistantThread.id,
                  { role: 'user', content: transcript }
                );
                isAssistantRunning = true;
                response = await openai.beta.threads.runs.create(
                  assistantThread.id,
                  {
                    assistant_id: process.env.OPENAI_ASSISTANT_ID,
                    stream: true
                  }
                );
              } else {
                // Standard chat completions with the full history for context.
                // FIX: strip the bookkeeping fields (uuid, cost metrics) —
                // the API rejects message objects with unknown properties.
                response = await openai.chat.completions.create({
                  model: process.env.OPENAI_MODEL || 'gpt-3.5-turbo',
                  messages: messages.map(({ role, content }) => ({ role, content })),
                  max_tokens: 150,
                  stream: true
                });
              }

              let currentResponse = '';
              let sentenceQueued = false; // at least one sentence sent to TTS
              for await (const chunk of response) {
                let content = '';

                // Streamed delta shape differs between the two APIs.
                if (process.env.OPENAI_ASSISTANT_ID) {
                  if (Array.isArray(chunk.data.delta?.content) &&
                      chunk.data.delta.content[0]?.text) {
                    content = chunk.data.delta.content[0].text.value;
                  }
                } else {
                  content = chunk.choices[0]?.delta?.content;
                }

                if (content) {
                  currentResponse += content;
                  currentResponse = removeSpecialCharacters(currentResponse);

                  if (matchesSentenceEnding(currentResponse)) {
                    // FIX: the barge-in check used to run immediately after
                    // `isProcessing = true`, so it could never fire. Check
                    // *before* re-arming: if the caller interrupted since the
                    // last sentence, stop consuming the model stream.
                    if (sentenceQueued && !isProcessing) {
                      ttsQueue.kill();
                      console.log('Stop streaming openai...');
                      break;
                    }
                    isProcessing = true;
                    sentenceQueued = true;
                    ttsQueue.push({ message: currentResponse });
                    currentResponse = '';
                  }
                }
              }
              // NOTE(review): a trailing fragment without sentence-ending
              // punctuation is dropped here, as in the original — confirm.
              isAssistantRunning = false;
            }
          }
        }
      } catch (error) {
        console.error(error);
        isAssistantRunning = false;
      }
    });

  // Pipe caller audio into Google speech recognition.
  audioStream.pipe(recognizeStream);

  // Handle incoming socket data
  socket.on('data', data => {
    handlePacket(socket, audioStream, data);
  });

  // FIX: log socket errors instead of letting them crash the process
  // (an unhandled 'error' event on a net.Socket throws).
  socket.on('error', err => {
    console.error('Socket error:', err);
  });

  // Clean up on socket close.
  // FIX: tear down the recognition pipeline and pending TTS work so the
  // streaming STT session and its resources are released on disconnect.
  socket.on('close', () => {
    isProcessing = false;
    ttsQueue.kill();
    audioStream.destroy();
    recognizeStream.destroy();
    console.log('Connection closed');
    console.log('Total cost:', totalCost.toFixed(4));
  });
});
|
|
|
|
// Start the server
|
|
// Start the server on the configured port (defaults to 3000).
const PORT = process.env.PORT || 3000;
const announceStartup = () => {
  console.log(`Server v${packageInfo.version} listening on port ${PORT}`);
};
server.listen(PORT, announceStartup);
|