|
| 1 | +const WebSocket = require('ws'); |
| 2 | + |
| 3 | +/** |
| 4 | + * |
| 5 | + * @param metadata {Object} Max size can be 4GB |
| 6 | + * @param bufferData {ArrayBuffer} [optional] |
| 7 | + * @return {ArrayBuffer} |
| 8 | + * @private |
| 9 | + */ |
| 10 | +function mergeMetadataAndArrayBuffer(metadata, bufferData) { |
| 11 | + bufferData = bufferData || new ArrayBuffer(0); |
| 12 | + if (typeof metadata !== 'object') { |
| 13 | + throw new Error("metadata should be an object, but was " + typeof metadata); |
| 14 | + } |
| 15 | + if (!(bufferData instanceof ArrayBuffer)) { |
| 16 | + throw new Error("Expected bufferData to be an instance of ArrayBuffer, but was " + typeof bufferData); |
| 17 | + } |
| 18 | + |
| 19 | + const metadataString = JSON.stringify(metadata); |
| 20 | + const metadataUint8Array = new TextEncoder().encode(metadataString); |
| 21 | + const metadataBuffer = metadataUint8Array.buffer; |
| 22 | + const sizePrefixLength = 4; // 4 bytes for a 32-bit integer |
| 23 | + |
| 24 | + if (metadataBuffer.byteLength > 4294000000) { |
| 25 | + throw new Error("metadata too large. Should be below 4,294MB, but was " + metadataBuffer.byteLength); |
| 26 | + } |
| 27 | + |
| 28 | + const concatenatedBuffer = new ArrayBuffer(sizePrefixLength + metadataBuffer.byteLength + bufferData.byteLength); |
| 29 | + const concatenatedUint8Array = new Uint8Array(concatenatedBuffer); |
| 30 | + |
| 31 | + // Write the length of metadataBuffer as a 32-bit integer |
| 32 | + new DataView(concatenatedBuffer).setUint32(0, metadataBuffer.byteLength, true); |
| 33 | + |
| 34 | + // Copy the metadataUint8Array and bufferData (if provided) to the concatenatedUint8Array |
| 35 | + concatenatedUint8Array.set(metadataUint8Array, sizePrefixLength); |
| 36 | + if (bufferData.byteLength > 0) { |
| 37 | + concatenatedUint8Array.set(new Uint8Array(bufferData), sizePrefixLength + metadataBuffer.byteLength); |
| 38 | + } |
| 39 | + |
| 40 | + return concatenatedBuffer; |
| 41 | +} |
| 42 | + |
| 43 | +function splitMetadataAndBuffer(concatenatedBuffer) { |
| 44 | + if(!(concatenatedBuffer instanceof ArrayBuffer)){ |
| 45 | + throw new Error("Expected ArrayBuffer message from websocket"); |
| 46 | + } |
| 47 | + const sizePrefixLength = 4; |
| 48 | + const buffer1Length = new DataView(concatenatedBuffer).getUint32(0, true); // Little endian |
| 49 | + |
| 50 | + const buffer1 = concatenatedBuffer.slice(sizePrefixLength, sizePrefixLength + buffer1Length); |
| 51 | + let buffer2; |
| 52 | + if (concatenatedBuffer.byteLength > sizePrefixLength + buffer1Length) { |
| 53 | + buffer2 = concatenatedBuffer.slice(sizePrefixLength + buffer1Length); |
| 54 | + } |
| 55 | + |
| 56 | + return { |
| 57 | + metadata: JSON.parse(new TextDecoder().decode(buffer1)), |
| 58 | + bufferData: buffer2 |
| 59 | + }; |
| 60 | +} |
| 61 | + |
| 62 | + |
| 63 | +const WS_COMMAND = { |
| 64 | + PING: "ping", |
| 65 | + RESPONSE: "response", |
| 66 | + LARGE_DATA_SOCKET_ANNOUNCE: "largeDataSock" |
| 67 | +}; |
| 68 | + |
| 69 | +const LARGE_DATA_THRESHOLD = 2*1024*1024; // 2MB |
| 70 | +// A map from dataSocketID to the actual data socket that is used for transporting large data only. |
| 71 | +// binary data larger than 2MB is considered large data and we will try to send it through a large data web socket if present. |
| 72 | +// a client typically makes 2 websockets, one for small data and another for large data transport. |
| 73 | +// so large file transfers wont put pressure on the websocket. |
| 74 | +const largeDataSocketMap = {}; |
| 75 | + |
| 76 | +function _getResponse(originalMetadata, data = null) { |
| 77 | + return { |
| 78 | + commandCode: WS_COMMAND.RESPONSE, |
| 79 | + commandId: originalMetadata.commandId, |
| 80 | + socketGroupID: originalMetadata.socketGroupID, |
| 81 | + data |
| 82 | + } |
| 83 | +} |
| 84 | + |
| 85 | +/** |
| 86 | + * |
| 87 | + * @param ws |
| 88 | + * @param metadata |
| 89 | + * @param dataObjectToSend |
| 90 | + * @param dataBuffer {ArrayBuffer} |
| 91 | + * @private |
| 92 | + */ |
| 93 | +function _sendResponse(ws, metadata, dataObjectToSend = null, dataBuffer = new ArrayBuffer(0)) { |
| 94 | + const response = _getResponse(metadata, dataObjectToSend); |
| 95 | + let socketToUse = ws, largeDataSocket = largeDataSocketMap[metadata.socketGroupID]; |
| 96 | + if(dataBuffer && dataBuffer.byteLength > LARGE_DATA_THRESHOLD && largeDataSocket) { |
| 97 | + socketToUse = largeDataSocket; |
| 98 | + } |
| 99 | + socketToUse.send(mergeMetadataAndArrayBuffer(response, dataBuffer)); |
| 100 | +} |
| 101 | + |
| 102 | +function processWSCommand(ws, metadata, dataBuffer) { |
| 103 | + try{ |
| 104 | + switch (metadata.commandCode) { |
| 105 | + case WS_COMMAND.PING: _sendResponse(ws, metadata, metadata.data, dataBuffer); return; |
| 106 | + case WS_COMMAND.LARGE_DATA_SOCKET_ANNOUNCE: |
| 107 | + ws.isLargeData = true; |
| 108 | + ws.LargeDataSocketGroupID = metadata.socketGroupID; |
| 109 | + largeDataSocketMap[metadata.socketGroupID] = ws; |
| 110 | + _sendResponse(ws, metadata, {}, dataBuffer); return; |
| 111 | + default: console.error("unknown command: "+ metadata); |
| 112 | + } |
| 113 | + } catch (e) { |
| 114 | + console.error(e); |
| 115 | + } |
| 116 | +} |
| 117 | + |
| 118 | +function processWebSocketMessage(ws, message) { |
| 119 | + const {metadata, bufferData} = splitMetadataAndBuffer(message); |
| 120 | + processWSCommand(ws, metadata, bufferData); |
| 121 | +} |
| 122 | + |
| 123 | +function CreatePhoenixFsServer(server, wssPath = "/phoenixFS") { |
| 124 | + // Create a WebSocket server by passing the HTTP server instance to WebSocket.Server |
| 125 | + const wss = new WebSocket.Server({ |
| 126 | + noServer: true, |
| 127 | + perMessageDeflate: false, // dont compress to improve performance and since we are on localhost. |
| 128 | + maxPayload: 2048 * 1024 * 1024 // 2GB Max message payload size |
| 129 | + }); |
| 130 | + |
| 131 | + server.on('upgrade', (request, socket, head) => { |
| 132 | + const pathname = new URL(request.url, `http://${request.headers.host}`).pathname; |
| 133 | + if (pathname === wssPath) { |
| 134 | + wss.handleUpgrade(request, socket, head, (ws) => { |
| 135 | + wss.emit('connection', ws, request); |
| 136 | + }); |
| 137 | + } else { |
| 138 | + // Not handling the upgrade here. Let the next listener deal with it. |
| 139 | + } |
| 140 | + }); |
| 141 | + |
| 142 | + // Set up a connection listener |
| 143 | + wss.on('connection', (ws) => { |
| 144 | + console.log('Websocket Client connected'); |
| 145 | + ws.binaryType = 'arraybuffer'; |
| 146 | + |
| 147 | + // Listen for messages from the client |
| 148 | + ws.on('message', (message) => { |
| 149 | + console.log(`Received message ${message} of size: ${message.byteLength}, type: ${typeof message}, isArrayBuffer: ${message instanceof ArrayBuffer}, isBuffer: ${Buffer.isBuffer(message)}`); |
| 150 | + processWebSocketMessage(ws, message); |
| 151 | + }); |
| 152 | + |
| 153 | + ws.on('error', console.error); |
| 154 | + |
| 155 | + // Handle disconnection |
| 156 | + ws.on('close', () => { |
| 157 | + if(ws.isLargeData && ws.socketGroupID && largeDataSocketMap[ws.socketGroupID] === ws){ |
| 158 | + delete largeDataSocketMap[ws.socketGroupID]; |
| 159 | + } |
| 160 | + console.log('Websocket Client disconnected'); |
| 161 | + }); |
| 162 | + }); |
| 163 | +} |
| 164 | + |
| 165 | +exports.CreatePhoenixFsServer = CreatePhoenixFsServer; |
0 commit comments