From 6b9b38a80caa12fc470853d24a696cd1db06f911 Mon Sep 17 00:00:00 2001 From: kjqwer <2990346238@qq.com> Date: Sat, 4 Oct 2025 18:33:18 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dsse=E7=9B=91=E5=90=AC?= =?UTF-8?q?=E5=99=A8=E6=B2=A1=E6=9C=89=E6=B8=85=E7=90=86=E7=9A=84=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/routes/download.js | 112 +++++++++++++++++++++------ backend/services/download.js | 9 ++- backend/services/progress-manager.js | 66 +++++++++++++++- 3 files changed, 160 insertions(+), 27 deletions(-) diff --git a/backend/routes/download.js b/backend/routes/download.js index 8aec0d0..b265154 100644 --- a/backend/routes/download.js +++ b/backend/routes/download.js @@ -712,52 +712,116 @@ router.get('/stream/:taskId', async (req, res) => { const downloadService = req.backend.getDownloadService(); // 创建进度监听器 + let isCleanedUp = false; + const cleanup = () => { + if (!isCleanedUp) { + isCleanedUp = true; + downloadService.removeProgressListener(taskId, progressListener); + logger.debug(`SSE连接已清理: ${taskId}`); + } + }; + const progressListener = (task) => { - if (task.id === taskId) { + if (task.id === taskId && !isCleanedUp) { // 使用setImmediate避免阻塞事件循环 setImmediate(() => { try { // 检查连接是否仍然有效 - if (!res.destroyed) { - res.write(`data: ${JSON.stringify({ - type: 'progress', - data: task - })}\n\n`); - - // 如果任务完成,关闭连接 - if (['completed', 'failed', 'cancelled', 'partial'].includes(task.status)) { - res.write(`data: ${JSON.stringify({ - type: 'complete', - data: task - })}\n\n`); - res.end(); - downloadService.removeProgressListener(taskId, progressListener); - } + if (!res.destroyed && !res.writableEnded) { + res.write(`data: ${JSON.stringify({ + type: 'progress', + data: task + })}\n\n`); + + // 如果任务完成,关闭连接 + if (['completed', 'failed', 'cancelled', 'partial'].includes(task.status)) { + res.write(`data: ${JSON.stringify({ + type: 'complete', + data: task + })}\n\n`); + res.end(); + cleanup(); + } + } else { + // 连接已断开,清理监听器 + cleanup(); } } catch (error) { logger.error('SSE写入失败:', error); // 连接可能已断开,清理监听器 - downloadService.removeProgressListener(taskId, progressListener); + cleanup(); + try { + if (!res.destroyed) { + res.end(); + } + } catch (endError) { + logger.error('关闭SSE连接失败:', endError); + } } }); } }; // 注册监听器 - downloadService.addProgressListener(taskId, progressListener); + const listenerAdded = downloadService.addProgressListener(taskId, progressListener); + if (!listenerAdded) { + logger.error(`无法为任务 ${taskId} 添加监听器,可能已达到限制`); + res.status(503).json({ + error: true, + message: 'Too many listeners, please try again later' + }); + return; + } // 立即发送当前状态 const currentTask = downloadService.getTask(taskId); if (currentTask) { - res.write(`data: ${JSON.stringify({ - type: 'progress', - data: currentTask - })}\n\n`); + try { + res.write(`data: ${JSON.stringify({ + type: 'progress', + data: currentTask + })}\n\n`); + } catch (error) { + logger.error('发送初始状态失败:', error); + cleanup(); + return; + } } + // 设置连接超时(30秒) + const timeout = setTimeout(() => { + logger.warn(`SSE连接超时,主动关闭: ${taskId}`); + cleanup(); + try { + if (!res.destroyed) { + res.end(); + } + } catch (error) { + logger.error('超时关闭连接失败:', error); + } + }, 30000); + // 客户端断开连接时清理 - req.on('close', () => { - downloadService.removeProgressListener(taskId, progressListener); + req.on('close', cleanup); + req.on('error', (error) => { + logger.error('SSE请求错误:', error); + cleanup(); + }); + + // 响应对象错误处理 + res.on('error', (error) => { + logger.error('SSE响应错误:', error); + cleanup(); + }); + + res.on('finish', () => { + clearTimeout(timeout); + cleanup(); + }); + + res.on('close', () => { + clearTimeout(timeout); + cleanup(); }); }); diff --git a/backend/services/download.js b/backend/services/download.js index ccc33fc..63d3627 100644 --- a/backend/services/download.js +++ b/backend/services/download.js @@ -45,6 +45,9 @@ class DownloadService { // 先创建下载执行器,稍后在init方法中设置downloadService引用 this.downloadExecutor = new DownloadExecutor(this.fileManager, this.taskManager, this.progressManager, this.historyManager, this); + // 启动监听器定期清理检查 + this.progressManager.startPeriodicCleanup(); + this.initialized = false; } @@ -92,7 +95,11 @@ class DownloadService { // 代理方法 - 进度管理 addProgressListener(taskId, listener) { - return this.progressManager.addProgressListener(taskId, listener); + const result = this.progressManager.addProgressListener(taskId, listener); + if (!result) { + logger.warn(`添加进度监听器失败: ${taskId}`); + } + return result; } removeProgressListener(taskId, listener) { diff --git a/backend/services/progress-manager.js b/backend/services/progress-manager.js index 97ffaf1..f43af12 100644 --- a/backend/services/progress-manager.js +++ b/backend/services/progress-manager.js @@ -15,16 +15,37 @@ class ProgressManager { this.throttleControl = new Map(); // 节流间隔(毫秒) this.throttleInterval = 100; + // 每个任务的最大监听器数量 + this.maxListenersPerTask = 10; + // 全局最大监听器数量 + this.maxTotalListeners = 100; } /** * 添加进度监听器 */ addProgressListener(taskId, listener) { + // 检查全局监听器数量限制 + if (this.getTotalListenerCount() >= this.maxTotalListeners) { + logger.warn(`全局监听器数量已达上限 (${this.maxTotalListeners}),拒绝添加新监听器`); + return false; + } + if (!this.progressListeners.has(taskId)) { this.progressListeners.set(taskId, []); } - this.progressListeners.get(taskId).push(listener); + + const listeners = this.progressListeners.get(taskId); + + // 检查单个任务的监听器数量限制 + if (listeners.length >= this.maxListenersPerTask) { + logger.warn(`任务 ${taskId} 的监听器数量已达上限 (${this.maxListenersPerTask}),拒绝添加新监听器`); + return false; + } + + listeners.push(listener); + logger.debug(`为任务 ${taskId} 添加监听器,当前数量: ${listeners.length}`); + return true; } /** @@ -36,12 +57,16 @@ class ProgressManager { const index = listeners.indexOf(listener); if (index > -1) { listeners.splice(index, 1); + logger.debug(`从任务 ${taskId} 移除监听器,剩余数量: ${listeners.length}`); } if (listeners.length === 0) { this.progressListeners.delete(taskId); // 清理节流控制 this.throttleControl.delete(taskId); + logger.debug(`任务 ${taskId} 的所有监听器已清理`); } + } else { + logger.debug(`尝试移除不存在任务 ${taskId} 的监听器`); } } @@ -125,8 +150,45 @@ class ProgressManager { * 清理所有监听器 */ clearAllListeners() { + const totalCount = this.getTotalListenerCount(); this.progressListeners.clear(); + this.throttleControl.clear(); + logger.info(`已清理所有监听器,共 ${totalCount} 个`); + } + + /** + * 定期清理超时的监听器(可选功能) + */ + startPeriodicCleanup(intervalMs = 300000) { // 默认5分钟 + if (this.cleanupInterval) { + clearInterval(this.cleanupInterval); + } + + this.cleanupInterval = setInterval(() => { + const totalListeners = this.getTotalListenerCount(); + if (totalListeners > 0) { + logger.debug(`定期检查: 当前活跃监听器数量 ${totalListeners}`); + + // 如果监听器数量过多,记录警告 + if (totalListeners > this.maxTotalListeners * 0.8) { + logger.warn(`监听器数量接近上限: ${totalListeners}/${this.maxTotalListeners}`); + } + } + }, intervalMs); + + logger.info(`已启动监听器定期清理检查,间隔 ${intervalMs / 1000} 秒`); + } + + /** + * 停止定期清理 + */ + stopPeriodicCleanup() { + if (this.cleanupInterval) { + clearInterval(this.cleanupInterval); + this.cleanupInterval = null; + logger.info('已停止监听器定期清理检查'); + } } } -module.exports = ProgressManager; \ No newline at end of file +module.exports = ProgressManager; \ No newline at end of file