// ====================================
// Voice Bot Implementation
// A real-time voice chat system that converts speech to text,
// processes it through AI (OpenAI/Botpress), and converts responses back to speech
// ====================================

// Required Dependencies
const net = require('net');
const { SpeechClient } = require('@google-cloud/speech');
const { TextToSpeechClient } = require('@google-cloud/text-to-speech');
const { Transform } = require('stream');
const { OpenAI } = require('openai');
const util = require('util');
const async = require('async');
const { encoding_for_model } = require('tiktoken');
const axios = require('axios');
const { toUUID } = require('./utils');
const packageInfo = require('./package.json');
const prices = require('./prices.json');
const {
    matchesSentenceEnding,
    removeSpecialCharacters,
    calculateMetricsAndPricing
} = require('./utils');

// Load environment variables
require('dotenv').config();

// ====================================
// Packet Protocol Definition
// Defines the types of packets that can be exchanged between client and server
// ====================================
// Frozen so the shared protocol table cannot be mutated by accident at runtime.
const PACKET_TYPES = Object.freeze({
    TERMINATE: 0x00, // Signal to end the connection
    UUID: 0x01,      // Client identification
    AUDIO: 0x10,     // Audio data packet
    ERROR: 0xff      // Error notification
});

// ====================================
// Socket State Management
// Tracks the state of the socket connection
// ====================================
/**
 * Reports whether a socket can still be written to.
 *
 * @param {?object} socket - Socket-like object exposing `destroyed` and `writable`; may be null or undefined.
 * @returns {boolean} `true` only when the socket exists, is not destroyed and is writable.
 */
function isSocketActive(socket) {
    // Coerce so callers always get a strict boolean instead of null/undefined leaking out of the && chain.
    return Boolean(socket && !socket.destroyed && socket.writable);
}

// ====================================
// Packet Handler
// Processes incoming packets based on their type
// ====================================
/**
 * Decodes the framed packets contained in a chunk of socket data and handles
 * each one according to its type.
 *
 * A frame is a 1-byte type, a 2-byte big-endian payload length, then the payload.
 * TCP delivers a byte stream, so a single chunk may hold several frames or only
 * part of one. Every complete frame is processed in order; an incomplete tail is
 * kept on `socket.pendingData` and prepended to the next chunk.
 *
 * @param {object} socket - Client socket. `uuid`, `isTerminating` and `pendingData` are stored on it.
 * @param {?object} audioStream - Writable stream that receives AUDIO payloads.
 * @param {Buffer} packet - Raw bytes received from the socket.
 */
function handlePacket(socket, audioStream, packet) {
    const HEADER_SIZE = 3;
    const UUID_SIZE = 16;

    // Re-attach whatever was left over from the previous chunk.
    const carried = socket.pendingData;
    const buffer = carried && carried.length > 0 ? Buffer.concat([carried, packet]) : packet;
    socket.pendingData = null;

    let offset = 0;
    while (buffer.length - offset >= HEADER_SIZE) {
        const packetType = buffer.readUInt8(offset);
        const packetLength = buffer.readUInt16BE(offset + 1);
        const payloadStart = offset + HEADER_SIZE;
        const payloadEnd = payloadStart + packetLength;

        // The payload has not fully arrived yet: wait instead of reading past the buffer.
        if (payloadEnd > buffer.length) {
            break;
        }

        const payload = buffer.subarray(payloadStart, payloadEnd);
        offset = payloadEnd;

        switch (packetType) {
            case PACKET_TYPES.TERMINATE: {
                console.log('Terminate packet received. Initiating graceful shutdown.');
                // Clean up streams and pending operations
                if (audioStream) {
                    audioStream.end();
                }

                // Set a flag to prevent new operations
                socket.isTerminating = true;

                // Give time for pending operations to complete
                setTimeout(() => {
                    if (isSocketActive(socket)) {
                        socket.end();
                    }
                }, 1000);
                break;
            }

            case PACKET_TYPES.UUID: {
                if (!isSocketActive(socket)) break;
                if (payload.length < UUID_SIZE) {
                    console.log(`UUID packet too short: ${payload.length} bytes`);
                    break;
                }
                const uuid = toUUID(payload.subarray(0, UUID_SIZE).toString('hex'));
                socket.uuid = uuid;
                console.log(`UUID packet received: ${uuid}`);
                break;
            }

            case PACKET_TYPES.AUDIO: {
                if (!isSocketActive(socket) || socket.isTerminating) break;
                if (audioStream && !audioStream.destroyed) {
                    audioStream.write(payload);
                }
                break;
            }

            case PACKET_TYPES.ERROR: {
                if (!isSocketActive(socket)) break;
                const errorCode = packetLength > 0 ? payload.readUInt8(0) : null;
                console.log(`Error packet received with code: ${errorCode}`);
                break;
            }

            default:
                console.log(`Unknown packet type: ${packetType}`);
        }
    }

    // Keep a copy of the incomplete tail so the incoming chunk's memory is not retained.
    if (offset < buffer.length) {
        socket.pendingData = Buffer.from(buffer.subarray(offset));
    }
}

// Safe write function to prevent write-after-end errors
/**
 * Writes data to the socket only while it is active and not shutting down.
 * Any error thrown by the write call itself is logged rather than propagated.
 *
 * @param {object} socket - Target socket; `isTerminating` marks a shutdown in progress.
 * @param {*} data - Payload handed to `socket.write` unchanged.
 */
function safeSocketWrite(socket, data) {
    const canWrite = isSocketActive(socket) && !socket.isTerminating;
    if (!canWrite) {
        return;
    }

    try {
        socket.write(data);
    } catch (writeError) {
        console.error('Error writing to socket:', writeError);
    }
}

// ====================================
// Main Server Implementation
// Creates and manages the TCP server that handles client connections
// ====================================
const server = net.createServer(async socket => {
    console.log('Client connected');

    // A socket without an 'error' listener turns any network failure (for
    // example a connection reset) into an unhandled 'error' event.
    socket.on('error', error => {
        console.error('Socket error:', error);
    });

    // State variables
    let thread = null;               // OpenAI thread for this connection (Assistants mode only)
    const messages = [];             // Every message recorded by handleMessage, with its metrics
    let totalCost = 0;               // Running cost accumulated by handleMessage
    let isProcessing = false;        // While true, synthesized audio keeps streaming to the client
    let isAssistantRunning = false;  // True while an Assistants run is being consumed
    let isClosed = false;            // Set once the socket has closed

    // Initialize AI clients
    const ttsClient = new TextToSpeechClient();
    const speechClient = new SpeechClient();

    // Releases a Google client created for this connection; failures are only logged.
    const closeClient = (client, label) => {
        Promise.resolve()
            .then(() => client.close())
            .catch(error => console.error(`Error closing ${label} client:`, error));
    };
    const closeClients = () => {
        closeClient(speechClient, 'speech');
        closeClient(ttsClient, 'text-to-speech');
    };

    // Initialize OpenAI if API key is provided
    let openai = null;
    if (process.env.OPENAI_API_KEY) {
        openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
    }

    // Initialize Botpress if webhook URL is provided
    // NOTE(review): this client is created but never used below, so with Botpress
    // configured transcripts are recorded and no reply is produced.
    let botpress = null;
    if (process.env.BOTPRESS_WEBHOOK_URL) {
        openai = null; // Disable OpenAI if using Botpress
        botpress = axios.create({ baseURL: process.env.BOTPRESS_WEBHOOK_URL });
    }

    // Assistants mode: the conversation lives in a thread, and each user
    // transcript is appended to it before a run is started.
    if (openai && process.env.OPENAI_ASSISTANT_ID) {
        try {
            thread = await openai.beta.threads.create();
        } catch (error) {
            console.error('Error creating OpenAI thread:', error);
            closeClients();
            socket.destroy();
            return;
        }

        // The client may have gone away while the thread was being created.
        if (socket.destroyed) {
            closeClients();
            return;
        }
    }

    // ====================================
    // Message Handler
    // Tracks messages and calculates costs
    // ====================================
    const handleMessage = (socket, role, content) => {
        const messageData = {
            uuid: socket.uuid || '',
            role: role,
            content: content,
            ...calculateMetricsAndPricing(role, content)
        };

        // Update costs based on message role
        switch (role) {
            case 'system':
                totalCost += messageData.costByCharacter;
                break;
            case 'user':
                totalCost += messageData.costBySecond;
                totalCost += messageData.costByToken;
                console.log(messageData);
                break;
            case 'assistant':
                totalCost += messageData.costByCharacter;
                totalCost += messageData.costByToken;
                console.log(messageData);
                break;
        }

        messages.push(messageData);
    };

    // ====================================
    // Text-to-Speech Handler
    // Converts text responses to speech and streams audio back to client
    // ====================================
    const ttsQueue = async.queue(async task => {
        // Nothing to play to once the client is gone.
        if (isClosed) return;

        const { message } = task;
        const request = {
            input: { text: message },
            voice: {
                languageCode: process.env.TEXT_TO_SPEECH_LANGUAGE || 'en-US',
                ssmlGender: process.env.TEXT_TO_SPEECH_GENDER || 'FEMALE',
                name: process.env.TEXT_TO_SPEECH_NAME || 'en-US-Journey-F'
            },
            audioConfig: {
                audioEncoding: 'LINEAR16',
                sampleRateHertz: 8000,
                speakingRate: 1
            }
        };

        try {
            const [response] = await ttsClient.synthesizeSpeech(request);
            handleMessage(socket, 'assistant', message);

            const audioContent = response.audioContent;
            // 320 bytes of 16-bit samples at 8000 Hz is 20 ms of audio, matching the delay below.
            const chunkSize = 320;

            // Stream audio in chunks
            for (let i = 0; i < audioContent.length; i += chunkSize) {
                // Cleared when the caller speaks over the bot or the socket closes.
                if (!isProcessing) break;

                const chunk = audioContent.subarray(i, i + chunkSize);
                const header = Buffer.alloc(3);
                header.writeUInt8(PACKET_TYPES.AUDIO, 0);
                header.writeUInt16BE(chunk.length, 1);

                const packet = Buffer.concat([header, chunk]);
                safeSocketWrite(socket, packet);
                await new Promise(resolve => setTimeout(resolve, 20));
            }
        } catch (error) {
            console.error('Error synthesizing speech:', error);
        }
    }, 1);

    ttsQueue.drain(() => {});

    // ====================================
    // Speech-to-Text Setup
    // Configures and manages speech recognition
    // ====================================
    const audioSampleWindow = [];
    const WINDOW_SIZE = 10;  // Number of chunks to analyze
    const AUDIO_THRESHOLD = 700;  // Adjust this threshold based on testing

    const audioStream = new Transform({
        transform(chunk, encoding, callback) {
            // Samples are read as 16-bit little-endian. Counting whole samples keeps
            // readInt16LE inside the buffer when a chunk has an odd length, and
            // skipping empty chunks keeps NaN out of the averaging window.
            const sampleCount = Math.floor(chunk.length / 2);
            if (sampleCount > 0) {
                // Calculate RMS (Root Mean Square) of the audio chunk
                let sumOfSquares = 0;
                for (let i = 0; i < sampleCount; i += 1) {
                    const sample = chunk.readInt16LE(i * 2);
                    sumOfSquares += sample * sample;
                }
                const rms = Math.sqrt(sumOfSquares / sampleCount);

                // Maintain a sliding window of audio levels
                audioSampleWindow.push(rms);
                if (audioSampleWindow.length > WINDOW_SIZE) {
                    audioSampleWindow.shift();
                }

                // Calculate average RMS over the window
                const avgRMS = audioSampleWindow.reduce((a, b) => a + b, 0) / audioSampleWindow.length;

                // Loud input stops the audio currently being streamed to the client
                if (avgRMS > AUDIO_THRESHOLD) {
                    isProcessing = false;
                    console.log('Audio level triggered:', avgRMS.toFixed(2));
                }
            }

            callback(null, chunk);
        }
    });

    // Add system prompt to messages
    handleMessage(
        socket,
        'system',
        process.env.SYSTEM_PROMPT || 'You are a helpful assistant.'
    );

    // Configure speech recognition
    const recognitionConfig = {
        config: {
            encoding: 'LINEAR16',
            sampleRateHertz: 8000,
            languageCode: process.env.SPEECH_RECOGNITION_LANGUAGE || 'en-US',
            model: process.env.SPEECH_RECOGNITION_MODEL || 'phone_call',
            useEnhanced: true
        },
        interimResults: false
    };

    // Add alternative languages if specified. They belong to the recognition
    // config itself (next to languageCode), not to the streaming wrapper.
    if (process.env.SPEECH_RECOGNITION_ALTERNATIVE_LANGUAGES) {
        recognitionConfig.config.alternativeLanguageCodes =
            process.env.SPEECH_RECOGNITION_ALTERNATIVE_LANGUAGES
                .split(',')
                .map(code => code.trim())
                .filter(Boolean);
    }

    // ====================================
    // Speech Recognition Stream Handler
    // Processes speech recognition results and manages AI responses
    // ====================================
    const recognizeStream = speechClient
        .streamingRecognize(recognitionConfig)
        .on('error', console.error)
        .on('data', async data => {
            // Only the invocation that claimed the assistant may release it, so an
            // unrelated result or failure cannot clear the flag mid-run.
            let ownsAssistantRun = false;

            try {
                if (isClosed) return;

                const alternative = data.results?.[0]?.alternatives?.[0];
                if (!alternative?.transcript || !(alternative.confidence > 0)) return;

                const transcript = alternative.transcript.trim();
                if (!transcript) return;

                // New user input cuts off whatever audio is still being streamed.
                isProcessing = false;
                handleMessage(socket, 'user', transcript);

                if (!openai) return;

                let response;
                if (thread) {
                    // Wait if assistant is still processing
                    while (isAssistantRunning && !isClosed) {
                        console.log('Assistant is running...');
                        await new Promise(resolve => setTimeout(resolve, 1000));
                    }
                    if (isClosed) return;

                    // Claim the assistant before the next await so two waiting
                    // transcripts cannot both start a run.
                    isAssistantRunning = true;
                    ownsAssistantRun = true;

                    // Process with OpenAI Assistant
                    await openai.beta.threads.messages.create(
                        thread.id,
                        { role: 'user', content: transcript }
                    );
                    response = await openai.beta.threads.runs.create(
                        thread.id,
                        {
                            assistant_id: process.env.OPENAI_ASSISTANT_ID,
                            stream: true
                        }
                    );
                } else {
                    // Process with standard OpenAI chat. Only role and content are
                    // sent; the uuid and metric fields are local bookkeeping.
                    response = await openai.chat.completions.create({
                        model: process.env.OPENAI_MODEL || 'gpt-3.5-turbo',
                        messages: messages.map(({ role, content }) => ({ role, content })),
                        max_tokens: 150,
                        stream: true
                    });
                }

                let currentResponse = '';
                for await (const chunk of response) {
                    // Stop consuming the model stream once the client is gone.
                    if (isClosed) break;

                    let content = '';
                    if (thread) {
                        // Text arrives in events whose data.delta.content is an array; others are skipped.
                        const parts = chunk.data?.delta?.content;
                        if (Array.isArray(parts) && parts[0]?.text) {
                            content = parts[0].text.value;
                        }
                    } else {
                        content = chunk.choices[0]?.delta?.content;
                    }

                    if (!content) continue;

                    currentResponse = removeSpecialCharacters(currentResponse + content);

                    if (matchesSentenceEnding(currentResponse)) {
                        isProcessing = true;
                        ttsQueue.push({ message: currentResponse });
                        currentResponse = '';

                        // NOTE(review): isProcessing was set to true just above and nothing
                        // can run in between, so this branch cannot be taken as written.
                        // Kept until the intended interruption behaviour is confirmed.
                        if (!isProcessing) {
                            ttsQueue.kill();
                            console.log('Stop streaming openai...');
                            break;
                        }
                    }
                }

                // Speak any remaining text that never matched a sentence ending.
                if (!isClosed && currentResponse.trim()) {
                    isProcessing = true;
                    ttsQueue.push({ message: currentResponse });
                }
            } catch (error) {
                console.error(error);
            } finally {
                if (ownsAssistantRun) {
                    isAssistantRunning = false;
                }
            }
        });

    // Pipe audio stream to recognition stream
    audioStream.pipe(recognizeStream);

    // Handle incoming socket data
    socket.on('data', data => {
        handlePacket(socket, audioStream, data);
    });

    // Clean up on socket close
    socket.on('close', () => {
        console.log('Connection closed');
        console.log('Total cost:', totalCost.toFixed(4));

        // Stop playback and pending synthesis, end recognition input and
        // release the per-connection Google clients.
        isClosed = true;
        isProcessing = false;
        ttsQueue.kill();
        if (!audioStream.writableEnded) {
            audioStream.end();
        }
        closeClients();
    });
});

// Start the server
// Port comes from the environment, falling back to 3000.
const PORT = process.env.PORT || 3000;

// Announces the package version and port once the server is accepting connections.
const logServerReady = () => {
    console.log(`Server v${packageInfo.version} listening on port ${PORT}`);
};

server.listen(PORT, logServerReady);