diff --git a/voicebot-rt/app-google-cloud/index.js b/voicebot-rt/app-google-cloud/index.js index 4346c69..99ef954 100644 --- a/voicebot-rt/app-google-cloud/index.js +++ b/voicebot-rt/app-google-cloud/index.js @@ -37,6 +37,14 @@ const PACKET_TYPES = { 'ERROR': 0xff // Error notification }; +// ==================================== +// Socket State Management +// Tracks the state of the socket connection +// ==================================== +function isSocketActive(socket) { + return socket && !socket.destroyed && socket.writable; +} + // ==================================== // Packet Handler // Processes incoming packets based on their type @@ -47,22 +55,40 @@ function handlePacket(socket, audioStream, packet) { switch (packetType) { case PACKET_TYPES.TERMINATE: - console.log('Terminate packet received. Closing connection.'); - socket.end(); + console.log('Terminate packet received. Initiating graceful shutdown.'); + // Clean up streams and pending operations + if (audioStream) { + audioStream.end(); + } + + // Set a flag to prevent new operations + socket.isTerminating = true; + + // Give time for pending operations to complete + setTimeout(() => { + if (isSocketActive(socket)) { + socket.end(); + } + }, 1000); break; case PACKET_TYPES.UUID: + if (!isSocketActive(socket)) return; const uuid = toUUID(packet.slice(3, 19).toString('hex')); socket.uuid = uuid; console.log('UUID packet received: ' + uuid); break; case PACKET_TYPES.AUDIO: + if (!isSocketActive(socket) || socket.isTerminating) return; const audioData = packet.slice(3, 3 + packetLength); - audioStream.write(audioData); + if (audioStream && !audioStream.destroyed) { + audioStream.write(audioData); + } break; case PACKET_TYPES.ERROR: + if (!isSocketActive(socket)) return; const errorCode = packetLength > 0 ? packet.readUInt8(3) : null; console.log('Error packet received with code: ' + errorCode); break; @@ -72,6 +98,17 @@ function handlePacket(socket, audioStream, packet) { } } +// Safe write function to prevent write-after-end errors +function safeSocketWrite(socket, data) { + if (isSocketActive(socket) && !socket.isTerminating) { + try { + socket.write(data); + } catch (err) { + console.error('Error writing to socket:', err); + } + } +} + // ==================================== // Main Server Implementation // Creates and manages the TCP server that handles client connections @@ -177,7 +214,7 @@ const server = net.createServer(async socket => { header.writeUInt16BE(chunk.length, 1); const packet = Buffer.concat([header, chunk]); - socket.write(packet); + safeSocketWrite(socket, packet); await new Promise(resolve => setTimeout(resolve, 20)); } } catch (error) {