asterisk/voicebot-rt/app-google-cloud/index.js

// ====================================
// Voice Bot Implementation
// A real-time voice chat system that converts speech to text,
// processes it through AI (OpenAI/Botpress), and converts responses back to speech
// ====================================
// Required Dependencies
const net = require('net');
const { SpeechClient } = require('@google-cloud/speech');
const { TextToSpeechClient } = require('@google-cloud/text-to-speech');
const { Transform } = require('stream');
const { OpenAI } = require('openai');
const util = require('util');
const async = require('async');
const { encoding_for_model } = require('tiktoken');
const axios = require('axios');
const {
  toUUID,
  matchesSentenceEnding,
  removeSpecialCharacters,
  calculateMetricsAndPricing
} = require('./utils');
const packageInfo = require('./package.json');
const prices = require('./prices.json');
// Load environment variables
require('dotenv').config();
// ====================================
// Packet Protocol Definition
// Defines the types of packets that can be exchanged between client and server
// ====================================
const PACKET_TYPES = {
'TERMINATE': 0x0, // Signal to end the connection
'UUID': 0x1, // Client identification
'AUDIO': 0x10, // Audio data packet
'ERROR': 0xff // Error notification
};
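// Wire format, as parsed by handlePacket below: 1 byte type, 2-byte
// big-endian payload length, then the payload (UUID: 16 raw bytes;
// AUDIO: LINEAR16 samples). These values line up with Asterisk's
// AudioSocket message kinds. An illustrative client-side framing helper
// (not part of this file):
//
//   function framePacket(type, payload) {
//     const header = Buffer.alloc(3);
//     header.writeUInt8(type, 0);
//     header.writeUInt16BE(payload.length, 1);
//     return Buffer.concat([header, payload]);
//   }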
// ====================================
// Socket State Management
// Tracks the state of the socket connection
// ====================================
function isSocketActive(socket) {
return socket && !socket.destroyed && socket.writable;
}
// ====================================
// Packet Handler
// Processes incoming packets based on their type
// ====================================
function handlePacket(socket, audioStream, packet) {
const packetType = packet.readUInt8(0);
const packetLength = packet.readUInt16BE(1);
switch (packetType) {
case PACKET_TYPES.TERMINATE:
console.log('Terminate packet received. Initiating graceful shutdown.');
// Clean up streams and pending operations
if (audioStream) {
audioStream.end();
}
// Set a flag to prevent new operations
socket.isTerminating = true;
// Give time for pending operations to complete
setTimeout(() => {
if (isSocketActive(socket)) {
socket.end();
}
}, 1000);
break;
case PACKET_TYPES.UUID:
if (!isSocketActive(socket)) return;
const uuid = toUUID(packet.slice(3, 19).toString('hex'));
socket.uuid = uuid;
console.log('UUID packet received: ' + uuid);
break;
case PACKET_TYPES.AUDIO:
if (!isSocketActive(socket) || socket.isTerminating) return;
const audioData = packet.slice(3, 3 + packetLength);
if (audioStream && !audioStream.destroyed) {
audioStream.write(audioData);
}
break;
case PACKET_TYPES.ERROR:
if (!isSocketActive(socket)) return;
const errorCode = packetLength > 0 ? packet.readUInt8(3) : null;
console.log('Error packet received with code: ' + errorCode);
break;
default:
console.log('Unknown packet type: ' + packetType);
}
}
// Safe write function to prevent write-after-end errors
function safeSocketWrite(socket, data) {
if (isSocketActive(socket) && !socket.isTerminating) {
try {
socket.write(data);
} catch (err) {
console.error('Error writing to socket:', err);
}
}
}
// ====================================
// Main Server Implementation
// Creates and manages the TCP server that handles client connections
// ====================================
const server = net.createServer(async socket => {
console.log('Client connected');
// State variables
let thread = null; // Assistants API conversation thread (when configured)
let messages = [];
let totalCost = 0;
let isProcessing = false;
let isAssistantRunning = false;
// Initialize AI clients
const ttsClient = new TextToSpeechClient();
const speechClient = new SpeechClient();
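// Note: these Google clients are created per connection; for many concurrent
// calls they could be hoisted to module scope and shared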
// Initialize OpenAI if API key is provided
let openai = null;
if (process.env.OPENAI_API_KEY) {
openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
}
// Initialize Botpress if webhook URL is provided
let botpress = null;
if (process.env.BOTPRESS_WEBHOOK_URL) {
openai = null; // Disable OpenAI if using Botpress
botpress = axios.create({ baseURL: process.env.BOTPRESS_WEBHOOK_URL });
}
// Create a conversation thread if an OpenAI Assistant ID is provided
if (openai && process.env.OPENAI_ASSISTANT_ID) {
  thread = await openai.beta.threads.create();
}
// ====================================
// Message Handler
// Tracks messages and calculates costs
// ====================================
const handleMessage = (socket, role, content) => {
const messageData = {
uuid: socket.uuid || '',
role: role,
content: content,
...calculateMetricsAndPricing(role, content)
};
// Update costs based on message role
switch (role) {
case 'system':
totalCost += messageData.costByCharacter;
break;
case 'user':
totalCost += messageData.costBySecond;
totalCost += messageData.costByToken;
console.log(messageData);
break;
case 'assistant':
totalCost += messageData.costByCharacter;
totalCost += messageData.costByToken;
console.log(messageData);
break;
}
messages.push(messageData);
};
// ====================================
// Text-to-Speech Handler
// Converts text responses to speech and streams audio back to client
// ====================================
const ttsQueue = async.queue(async task => {
const { message } = task;
const request = {
input: { text: message },
voice: {
languageCode: process.env.TEXT_TO_SPEECH_LANGUAGE || 'en-US',
ssmlGender: process.env.TEXT_TO_SPEECH_GENDER || 'FEMALE',
name: process.env.TEXT_TO_SPEECH_NAME || 'en-US-Journey-F'
},
audioConfig: {
audioEncoding: 'LINEAR16',
sampleRateHertz: 8000,
speakingRate: 1
}
};
try {
const [response] = await ttsClient.synthesizeSpeech(request);
handleMessage(socket, 'assistant', message);
const audioContent = response.audioContent;
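// Note: per Google's TTS docs, LINEAR16 audioContent also includes a WAV
// header; the client is assumed to tolerate or strip those leading bytes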
const chunkSize = 320;
// Stream audio in chunks
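// 320 bytes = 160 16-bit samples = 20 ms of audio at 8 kHz, so the 20 ms
// delay below paces the stream at roughly real time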
for (let i = 0; i < audioContent.length; i += chunkSize) {
if (!isProcessing) break;
const chunk = audioContent.slice(i, i + chunkSize);
const header = Buffer.alloc(3);
header.writeUInt8(PACKET_TYPES.AUDIO, 0);
header.writeUInt16BE(chunk.length, 1);
const packet = Buffer.concat([header, chunk]);
safeSocketWrite(socket, packet);
await new Promise(resolve => setTimeout(resolve, 20));
}
} catch (error) {
console.error('Error synthesizing speech:', error);
}
}, 1);
ttsQueue.drain(() => {});
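// Concurrency of 1 ensures synthesized sentences are streamed in order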
// ====================================
// Speech-to-Text Setup
// Configures and manages speech recognition
// ====================================
// Barge-in detection: track the recent input level so bot playback can be
// interrupted when the caller starts speaking
let audioSampleWindow = [];
const WINDOW_SIZE = 10; // Number of chunks to analyze
const AUDIO_THRESHOLD = 700; // Adjust this threshold based on testing
const audioStream = new Transform({
transform(chunk, encoding, callback) {
// Calculate RMS (Root Mean Square) of the audio chunk
let sum = 0;
for (let i = 0; i < chunk.length; i += 2) {
// Convert 2 bytes to a 16-bit integer
const sample = chunk.readInt16LE(i);
sum += sample * sample;
}
const rms = Math.sqrt(sum / (chunk.length / 2));
// Maintain a sliding window of audio levels
audioSampleWindow.push(rms);
if (audioSampleWindow.length > WINDOW_SIZE) {
audioSampleWindow.shift();
}
// Calculate average RMS over the window
const avgRMS = audioSampleWindow.reduce((a, b) => a + b, 0) / audioSampleWindow.length;
// Treat sustained loud input as the caller barging in: clearing
// isProcessing stops any in-flight TTS streaming (see the TTS handler)
if (avgRMS > AUDIO_THRESHOLD) {
  isProcessing = false;
  console.log('Audio level triggered:', avgRMS.toFixed(2));
}
callback(null, chunk);
}
});
// Add system prompt to messages
handleMessage(
socket,
'system',
process.env.SYSTEM_PROMPT || 'You are a helpful assistant.'
);
// Configure speech recognition
const recognitionConfig = {
config: {
encoding: 'LINEAR16',
sampleRateHertz: 8000,
languageCode: process.env.SPEECH_RECOGNITION_LANGUAGE || 'en-US',
model: process.env.SPEECH_RECOGNITION_MODEL || 'phone_call',
useEnhanced: true
},
interimResults: false
};
// Add alternative languages if specified (comma-separated BCP-47 codes);
// alternativeLanguageCodes is a field of RecognitionConfig, so it belongs
// on the inner config object
if (process.env.SPEECH_RECOGNITION_ALTERNATIVE_LANGUAGES) {
  recognitionConfig.config.alternativeLanguageCodes =
    process.env.SPEECH_RECOGNITION_ALTERNATIVE_LANGUAGES.split(',');
}
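// e.g. SPEECH_RECOGNITION_ALTERNATIVE_LANGUAGES=es-US,fr-FR (hypothetical values)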
// ====================================
// Speech Recognition Stream Handler
// Processes speech recognition results and manages AI responses
// ====================================
const recognizeStream = speechClient
.streamingRecognize(recognitionConfig)
.on('error', console.error)
.on('data', async data => {
try {
if (
data.results[0]?.alternatives[0]?.transcript &&
data.results[0]?.alternatives[0]?.confidence > 0
) {
const transcript = data.results[0].alternatives[0].transcript.trim();
if (transcript) {
let response;
isProcessing = false;
handleMessage(socket, 'user', transcript);
if (openai) {
if (process.env.OPENAI_ASSISTANT_ID) {
// Wait if assistant is still processing
while (isAssistantRunning) {
console.log('Assistant is running...');
await new Promise(resolve => setTimeout(resolve, 1000));
}
// Process with the OpenAI Assistants API: append the user message to
// the conversation thread, then stream a run of the configured assistant
await openai.beta.threads.messages.create(
  thread.id,
  { role: 'user', content: transcript }
);
isAssistantRunning = true;
response = await openai.beta.threads.runs.create(
  thread.id,
  {
    assistant_id: process.env.OPENAI_ASSISTANT_ID,
    stream: true
  }
);
} else {
// Process with standard OpenAI chat
response = await openai.chat.completions.create({
model: process.env.OPENAI_MODEL || 'gpt-3.5-turbo',
messages: messages,
max_tokens: 150,
stream: true
});
}
let currentResponse = '';
let hasQueuedSpeech = false;
let interrupted = false;
for await (const chunk of response) {
  // Barge-in: the audio transform clears isProcessing when the caller
  // starts talking, so stop streaming once speech has been queued
  if (hasQueuedSpeech && !isProcessing) {
    interrupted = true;
    ttsQueue.kill();
    console.log('Stop streaming openai...');
    break;
  }
  let content = '';
  if (process.env.OPENAI_ASSISTANT_ID) {
    // Assistants runs stream events; text arrives in message delta events
    if (Array.isArray(chunk.data?.delta?.content) &&
        chunk.data.delta.content[0]?.text) {
      content = chunk.data.delta.content[0].text.value;
    }
  } else {
    content = chunk.choices[0]?.delta?.content;
  }
  if (content) {
    currentResponse += content;
    currentResponse = removeSpecialCharacters(currentResponse);
    if (matchesSentenceEnding(currentResponse)) {
      isProcessing = true;
      hasQueuedSpeech = true;
      ttsQueue.push({ message: currentResponse });
      currentResponse = '';
    }
  }
}
// Flush any trailing text that did not end with sentence punctuation
if (currentResponse && !interrupted) {
  isProcessing = true;
  ttsQueue.push({ message: currentResponse });
}
isAssistantRunning = false;
}
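// Hedged sketch: `botpress` is created above but otherwise unused in this
// file. This branch assumes a Converse-style webhook that accepts
// { type: 'text', text } and answers with { responses: [{ text }] };
// adjust to the actual Botpress endpoint before relying on it.
else if (botpress) {
  const { data: reply } = await botpress.post('', { type: 'text', text: transcript });
  for (const item of reply.responses || []) {
    if (item.text) {
      isProcessing = true;
      ttsQueue.push({ message: removeSpecialCharacters(item.text) });
    }
  }
}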
}
}
} catch (error) {
console.error(error);
isAssistantRunning = false;
}
});
// Pipe audio stream to recognition stream
audioStream.pipe(recognizeStream);
// Handle incoming socket data
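// Note: this assumes each 'data' event carries exactly one complete packet;
// TCP may split or coalesce writes, so a production version should buffer
// incoming bytes and re-frame them using the 3-byte header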
socket.on('data', data => {
handlePacket(socket, audioStream, data);
});
// Avoid crashing the process on abrupt client disconnects
socket.on('error', err => console.error('Socket error:', err));
// Clean up on socket close
socket.on('close', () => {
  audioStream.end();
  ttsQueue.kill();
  console.log('Connection closed');
  console.log('Total cost:', totalCost.toFixed(4));
});
});
// Start the server
const PORT = process.env.PORT || 3000;
server.listen(PORT, () => {
console.log(`Server v${packageInfo.version} listening on port ${PORT}`);
});
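
// Example Asterisk dialplan hookup (illustrative placeholders, assuming the
// AudioSocket dialplan application is what feeds this server):
//   exten => 100,1,Answer()
//   same  => n,AudioSocket(<call-uuid>,127.0.0.1:3000)
//   same  => n,Hangup()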