diff --git a/backend/routes/download.js b/backend/routes/download.js index b265154..d9de9ed 100644 --- a/backend/routes/download.js +++ b/backend/routes/download.js @@ -690,17 +690,16 @@ router.delete('/files', async (req, res) => { * SSE端点 - 实时推送下载进度 * GET /api/download/stream/:taskId */ -router.get('/stream/:taskId', async (req, res) => { - const { taskId } = req.params; +router.get('/stream/:taskId', (req, res) => { + const taskId = req.params.taskId; - if (!taskId) { - return res.status(400).json({ - success: false, - error: 'Task ID is required' - }); - } + logger.debug(`SSE连接建立: ${taskId}`, { + taskId, + clientIP: req.ip, + userAgent: req.get('User-Agent') + }); - // 设置SSE头部 + // 设置SSE响应头 res.writeHead(200, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', @@ -709,120 +708,201 @@ router.get('/stream/:taskId', async (req, res) => { 'Access-Control-Allow-Headers': 'Cache-Control' }); + // 发送初始连接确认 + res.write('data: {"type":"connected","taskId":"' + taskId + '"}\n\n'); + const downloadService = req.backend.getDownloadService(); - + // 创建进度监听器 - let isCleanedUp = false; - const cleanup = () => { - if (!isCleanedUp) { - isCleanedUp = true; - downloadService.removeProgressListener(taskId, progressListener); - logger.debug(`SSE连接已清理: ${taskId}`); - } - }; - const progressListener = (task) => { - if (task.id === taskId && !isCleanedUp) { - // 使用setImmediate避免阻塞事件循环 - setImmediate(() => { - try { - // 检查连接是否仍然有效 - if (!res.destroyed && !res.writableEnded) { - res.write(`data: ${JSON.stringify({ - type: 'progress', - data: task - })}\n\n`); - - // 如果任务完成,关闭连接 - if (['completed', 'failed', 'cancelled', 'partial'].includes(task.status)) { - res.write(`data: ${JSON.stringify({ - type: 'complete', - data: task - })}\n\n`); - res.end(); - cleanup(); - } - } else { - // 连接已断开,清理监听器 - cleanup(); - } - } catch (error) { - logger.error('SSE写入失败:', error); - // 连接可能已断开,清理监听器 - cleanup(); - try { - if (!res.destroyed) { - res.end(); - } - } catch (endError) { - logger.error('关闭SSE连接失败:', endError); - } - } + try { + if (res.writableEnded || res.destroyed || isCleanedUp) { + logger.debug(`SSE连接已关闭,跳过发送: ${taskId}`); + return; + } + + const data = JSON.stringify({ + type: 'progress', + task: task }); + + res.write(`data: ${data}\n\n`); + logger.debug(`SSE进度更新发送: ${taskId}`, { + status: task.status, + progress: task.progress, + completed: task.completed_files, + failed: task.failed_files + }); + } catch (error) { + // 区分正常断开和异常错误 + if (isNormalDisconnectError(error)) { + logger.debug(`SSE发送数据时连接正常断开: ${taskId}`, { + code: error.code, + message: error.message + }); + } else { + logger.error(`SSE发送数据异常失败: ${taskId}`, { + error: error.message, + taskId + }); + } + // 发送失败时移除监听器并清理连接 + downloadService.removeProgressListener(taskId, progressListener); + cleanup('progress_send_error'); } }; - // 注册监听器 - const listenerAdded = downloadService.addProgressListener(taskId, progressListener); - if (!listenerAdded) { - logger.error(`无法为任务 ${taskId} 添加监听器,可能已达到限制`); - res.status(503).json({ - error: true, - message: 'Too many listeners, please try again later' - }); - return; - } + // 注册进度监听器 + downloadService.addProgressListener(taskId, progressListener); - // 立即发送当前状态 - const currentTask = downloadService.getTask(taskId); - if (currentTask) { - try { - res.write(`data: ${JSON.stringify({ - type: 'progress', - data: currentTask - })}\n\n`); - } catch (error) { - logger.error('发送初始状态失败:', error); - cleanup(); - return; + // 设置连接超时 - 增加到60秒,并添加心跳机制 + let connectionTimeout; + let heartbeatInterval; + + const resetTimeout = () => { + if (connectionTimeout) { + clearTimeout(connectionTimeout); } - } + connectionTimeout = setTimeout(() => { + logger.info(`SSE连接超时,关闭连接: ${taskId}`); + cleanup('connection_timeout'); + if (!res.writableEnded && !isCleanedUp) { + try { + res.write('data: {"type":"timeout"}\n\n'); + res.end(); + } catch (error) { + // 忽略写入已关闭连接的错误 + logger.debug(`SSE连接已关闭,无法发送超时消息: ${taskId}`); + } + } + }, 60000); // 60秒超时 + }; - // 设置连接超时(30秒) - const timeout = setTimeout(() => { - logger.warn(`SSE连接超时,主动关闭: ${taskId}`); - cleanup(); + // 启动心跳机制 + heartbeatInterval = setInterval(() => { try { - if (!res.destroyed) { - res.end(); + if (!res.writableEnded && !res.destroyed && !isCleanedUp) { + res.write('data: {"type":"heartbeat"}\n\n'); + resetTimeout(); // 心跳时重置超时 + } else { + clearInterval(heartbeatInterval); + heartbeatInterval = null; } } catch (error) { - logger.error('超时关闭连接失败:', error); + // 区分正常断开和异常错误 + if (isNormalDisconnectError(error)) { + logger.debug(`SSE心跳发送时连接正常断开: ${taskId}`, { + code: error.code, + message: error.message + }); + } else { + logger.warn(`SSE心跳发送异常失败: ${taskId}`, error); + } + clearInterval(heartbeatInterval); + heartbeatInterval = null; + cleanup('heartbeat_error'); } - }, 30000); + }, 15000); // 每15秒发送心跳 + + resetTimeout(); + + // 连接状态跟踪 + let isCleanedUp = false; + let isNormalDisconnect = false; + + // 清理函数 + const cleanup = (reason = 'unknown') => { + if (isCleanedUp) { + return; // 避免重复清理 + } + isCleanedUp = true; + + if (connectionTimeout) { + clearTimeout(connectionTimeout); + connectionTimeout = null; + } + if (heartbeatInterval) { + clearInterval(heartbeatInterval); + heartbeatInterval = null; + } + downloadService.removeProgressListener(taskId, progressListener); + logger.debug(`SSE连接清理完成: ${taskId}`, { reason }); + }; + + // 判断是否为正常断开连接的错误 + const isNormalDisconnectError = (error) => { + if (!error) return false; + + // 常见的正常断开连接错误码 + const normalErrorCodes = [ + 'ECONNRESET', // 连接被重置 + 'EPIPE', // 管道破裂 + 'ECONNABORTED', // 连接被中止 + 'ECANCELED' // 请求被取消 + ]; + + return normalErrorCodes.includes(error.code) || + error.message === 'aborted' || + error.message.includes('aborted'); + }; + + // 监听客户端断开连接 + req.on('close', () => { + isNormalDisconnect = true; + logger.debug(`SSE客户端断开连接: ${taskId}`); + cleanup('client_close'); + }); - // 客户端断开连接时清理 - req.on('close', cleanup); req.on('error', (error) => { - logger.error('SSE请求错误:', error); - cleanup(); + if (isNormalDisconnectError(error)) { + // 正常断开连接,使用debug级别日志 + logger.debug(`SSE客户端正常断开: ${taskId}`, { + code: error.code, + message: error.message + }); + } else { + // 真正的异常错误,使用error级别日志 + logger.error(`SSE请求异常错误: ${taskId}`, error); + } + cleanup('request_error'); }); - // 响应对象错误处理 res.on('error', (error) => { - logger.error('SSE响应错误:', error); - cleanup(); - }); - - res.on('finish', () => { - clearTimeout(timeout); - cleanup(); + if (isNormalDisconnectError(error)) { + // 正常断开连接,使用debug级别日志 + logger.debug(`SSE响应正常断开: ${taskId}`, { + code: error.code, + message: error.message + }); + } else { + // 真正的异常错误,使用error级别日志 + logger.error(`SSE响应异常错误: ${taskId}`, error); + } + cleanup('response_error'); }); res.on('close', () => { - clearTimeout(timeout); - cleanup(); + logger.debug(`SSE响应关闭: ${taskId}`); + cleanup('response_close'); }); + + // 检查任务状态,如果任务已完成则立即关闭连接 + const task = downloadService.getTask(taskId); + if (task && ['completed', 'failed', 'cancelled', 'partial'].includes(task.status)) { + logger.debug(`任务已完成,关闭SSE连接: ${taskId}`, { status: task.status }); + setTimeout(() => { + cleanup('task_completed'); + if (!res.writableEnded && !isCleanedUp) { + try { + res.write(`data: {"type":"completed","status":"${task.status}"}\n\n`); + res.end(); + } catch (error) { + // 忽略写入已关闭连接的错误 + logger.debug(`SSE连接已关闭,无法发送完成消息: ${taskId}`); + } + } + }, 1000); // 延迟1秒关闭,确保最后的状态更新被发送 + } }); /** diff --git a/backend/services/download-executor.js b/backend/services/download-executor.js index 041bdad..dc6b09b 100644 --- a/backend/services/download-executor.js +++ b/backend/services/download-executor.js @@ -294,102 +294,142 @@ class DownloadExecutor { const batch = items.slice(i, i + batchSize); const batchPromises = batch.map(async item => { - try { - // 检查是否应该暂停(在每个作品下载前检查) - if (this.shouldPause(task.id)) { - logger.info('批量下载任务已暂停,停止当前作品下载:', task.id); - // 设置任务状态为暂停 - task.status = 'paused'; - await this.taskManager.saveTasks(); - this.progressManager.notifyProgressUpdate(task.id, task); - return { artwork_id: typeof item === 'object' ? item.id : item, success: false, paused: true }; - } - - // 获取作品ID - 支持直接传入ID或作品对象 - const artworkId = typeof item === 'object' ? item.id : item; - - // 使用专门的批量下载方法,避免创建重复任务 - const downloadResult = await this.downloadService.downloadSingleArtworkForBatch(artworkId, { - size, - quality, - format, - skipExisting: true - }); + // 为每个下载添加超时控制,防止单个下载卡住整个批次 + return Promise.race([ + // 实际下载Promise + (async () => { + try { + // 检查是否应该暂停(在每个作品下载前检查) + if (this.shouldPause(task.id)) { + logger.info('批量下载任务已暂停,停止当前作品下载:', task.id); + // 设置任务状态为暂停 + task.status = 'paused'; + await this.taskManager.saveTasks(); + this.progressManager.notifyProgressUpdate(task.id, task); + return { artwork_id: typeof item === 'object' ? item.id : item, success: false, paused: true }; + } + + // 获取作品ID - 支持直接传入ID或作品对象 + const artworkId = typeof item === 'object' ? item.id : item; + + // 使用专门的批量下载方法,避免创建重复任务 + const downloadResult = await this.downloadService.downloadSingleArtworkForBatch(artworkId, { + size, + quality, + format, + skipExisting: true + }); - if (downloadResult.success) { - // 检查是否跳过下载 - if (downloadResult.skipped) { - // 跳过下载,不计入失败,但也不计入完成 - const result = { artwork_id: artworkId, success: true, skipped: true }; - results.push(result); - return result; - } else { - // 真正下载成功,立即添加到注册表 - task.completed_files++; - - // 立即添加到下载注册表 - try { - await this.downloadService.downloadRegistry.addArtwork( - downloadResult.artist_name, - artworkId - ); - logger.debug(`批量下载中的作品 ${artworkId} 已添加到下载注册表`, { - artworkId, - artistName: downloadResult.artist_name, - taskId: task.id - }); - } catch (error) { - logger.error(`批量下载中添加作品到注册表失败: ${artworkId}`, { - artworkId, - artistName: downloadResult.artist_name, - taskId: task.id, - error: error.message, - stack: error.stack - }); + if (downloadResult.success) { + // 检查是否跳过下载 + if (downloadResult.skipped) { + // 跳过下载,不计入失败,但也不计入完成 + const result = { artwork_id: artworkId, success: true, skipped: true }; + results.push(result); + return result; + } else { + // 真正下载成功,立即添加到注册表 + task.completed_files++; + + // 立即添加到下载注册表 + try { + await this.downloadService.downloadRegistry.addArtwork( + downloadResult.artist_name, + artworkId + ); + logger.debug(`批量下载中的作品 ${artworkId} 已添加到下载注册表`, { + artworkId, + artistName: downloadResult.artist_name, + taskId: task.id + }); + } catch (error) { + logger.error(`批量下载中添加作品到注册表失败: ${artworkId}`, { + artworkId, + artistName: downloadResult.artist_name, + taskId: task.id, + error: error.message, + stack: error.stack + }); + } + + // 添加到最近完成列表 + const completedItem = { + artwork_id: artworkId, + artwork_title: downloadResult.artwork_title || + (typeof item === 'object' ? item.title : null) || + `作品 ${artworkId}`, + artist_name: downloadResult.artist_name || + (typeof item === 'object' ? item.user?.name : null) || + '未知作者' + }; + + recentCompleted.unshift(completedItem); + // 只保留最近5个 + if (recentCompleted.length > 5) { + recentCompleted.pop(); + } + + // 更新任务的recent_completed + task.recent_completed = [...recentCompleted]; + + const result = { artwork_id: artworkId, success: true }; + results.push(result); + return result; + } + } else { + // 下载失败 + task.failed_files++; + const result = { artwork_id: artworkId, success: false, error: downloadResult.error }; + results.push(result); + return result; } - - // 添加到最近完成列表 - const completedItem = { - artwork_id: artworkId, - artwork_title: downloadResult.artwork_title || - (typeof item === 'object' ? item.title : null) || - `作品 ${artworkId}`, - artist_name: downloadResult.artist_name || - (typeof item === 'object' ? item.user?.name : null) || - '未知作者' - }; - - recentCompleted.unshift(completedItem); - // 只保留最近5个 - if (recentCompleted.length > 5) { - recentCompleted.pop(); - } - - // 更新任务的recent_completed - task.recent_completed = [...recentCompleted]; - - const result = { artwork_id: artworkId, success: true }; + } catch (error) { + // 异常情况 + const artworkId = typeof item === 'object' ? item.id : item; + task.failed_files++; + const result = { artwork_id: artworkId, success: false, error: error.message }; results.push(result); return result; } - } else { - // 下载失败 - task.failed_files++; - const result = { artwork_id: artworkId, success: false, error: downloadResult.error }; - results.push(result); - return result; - } - } catch (error) { - // 异常情况 + })(), + // 超时Promise - 防止单个下载卡住整个批次 + new Promise((_, reject) => { + setTimeout(() => { + const artworkId = typeof item === 'object' ? item.id : item; + logger.warn(`作品下载超时,跳过: ${artworkId}`, { taskId: task.id, timeout: '120s' }); + reject(new Error(`下载超时: ${artworkId}`)); + }, 120000); // 2分钟超时 + }) + ]).catch(error => { + // 处理超时或其他错误 const artworkId = typeof item === 'object' ? item.id : item; task.failed_files++; const result = { artwork_id: artworkId, success: false, error: error.message }; results.push(result); return result; - } + }); }); - await Promise.all(batchPromises); + // 使用 Promise.allSettled 替代 Promise.all,确保不会因为单个Promise卡住而阻塞整个批次 + const batchResults = await Promise.allSettled(batchPromises); + + // 处理结果,确保所有Promise都有结果 + batchResults.forEach((result, index) => { + if (result.status === 'rejected') { + const artworkId = typeof batch[index] === 'object' ? batch[index].id : batch[index]; + logger.error(`批次中的作品处理失败: ${artworkId}`, { + error: result.reason?.message || result.reason, + taskId: task.id + }); + // 确保失败的作品也被计入 + task.failed_files++; + results.push({ + artwork_id: artworkId, + success: false, + error: result.reason?.message || '未知错误' + }); + } + }); // 更新进度并通知 task.progress = Math.round((task.completed_files / task.total_files) * 100); diff --git a/backend/services/file-manager.js b/backend/services/file-manager.js index a0d60e8..bc33957 100644 --- a/backend/services/file-manager.js +++ b/backend/services/file-manager.js @@ -307,11 +307,55 @@ class FileManager { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' }, timeout: downloadConfig.timeout, - signal: abortController ? abortController.signal : undefined + signal: abortController ? abortController.signal : undefined, + // 添加连接超时和响应超时配置 + httpsAgent: new (require('https')).Agent({ + keepAlive: true, + timeout: 60000, // 连接超时60秒 + }), + // 添加重试配置 + validateStatus: (status) => status < 500, // 只对5xx错误重试 }); // 使用增强的写入流创建方法 writer = await FileUtils.safeCreateWriteStream(filePath); + let downloadedBytes = 0; + const totalBytes = parseInt(response.headers['content-length']) || 0; + + // 设置流超时 + let streamTimeout = setTimeout(() => { + logger.warn(`流传输超时,中断下载: ${filePath}`); + if (writer && !writer.destroyed) { + writer.destroy(); + } + if (abortController) { + abortController.abort(); + } + }, downloadConfig.timeout + 60000); + + response.data.on('data', (chunk) => { + downloadedBytes += chunk.length; + // 重置流超时 + clearTimeout(streamTimeout); + streamTimeout = setTimeout(() => { + if (writer && !writer.destroyed) { + logger.warn(`流传输超时,中断下载: ${filePath}`); + writer.destroy(); + if (abortController) { + abortController.abort(); + } + } + }, downloadConfig.timeout + 60000); + }); + + response.data.on('error', (error) => { + clearTimeout(streamTimeout); + logger.error(`下载流错误: ${filePath}`, error); + if (writer && !writer.destroyed) { + writer.destroy(); + } + }); + response.data.pipe(writer); return new Promise((resolve, reject) => { @@ -319,6 +363,7 @@ class FileManager { let abortListener = null; const cleanup = () => { + clearTimeout(streamTimeout); if (writer && !writer.destroyed) { writer.destroy(); } diff --git a/ui/src/services/download.ts b/ui/src/services/download.ts index 2eb4269..80c0da7 100644 --- a/ui/src/services/download.ts +++ b/ui/src/services/download.ts @@ -203,17 +203,44 @@ class DownloadService { eventSource.onmessage = (event) => { try { const data = JSON.parse(event.data); - if (data.type === 'progress') { - onProgress(data.data); + + // 处理不同类型的SSE消息 + if (data.type === 'connected') { + console.log('SSE连接已建立:', data.taskId); + } else if (data.type === 'progress') { + // 新的数据格式:data.task 包含任务信息 + if (data.task) { + onProgress(data.task); + } + } else if (data.type === 'completed') { + // 任务完成 + console.log('任务完成:', data.status); + if (onComplete) { + onComplete(); + } + eventSource.close(); + } else if (data.type === 'timeout') { + // 连接超时 + console.warn('SSE连接超时'); + if (onComplete) { + onComplete(); + } + eventSource.close(); + } else if (data.type === 'heartbeat') { + // 心跳消息,不需要处理 + console.debug('收到SSE心跳'); } else if (data.type === 'complete') { - onProgress(data.data); + // 兼容旧格式 + if (data.data) { + onProgress(data.data); + } if (onComplete) { onComplete(); } eventSource.close(); } } catch (error) { - console.error('解析SSE数据失败:', error); + console.error('解析SSE数据失败:', error, 'Raw data:', event.data); } }; diff --git a/ui/src/stores/download.ts b/ui/src/stores/download.ts index eac1e94..cc000f5 100644 --- a/ui/src/stores/download.ts +++ b/ui/src/stores/download.ts @@ -133,15 +133,21 @@ export const useDownloadStore = defineStore('download', () => { console.log('开始SSE监听任务进度:', taskId); - // 添加超时处理 + // 添加超时处理 - 增加到60秒以匹配后端 const timeoutId = setTimeout(() => { console.warn('SSE连接超时,关闭连接:', taskId); stopTaskStreaming(taskId); - }, 30000); // 30秒超时 + }, 60000); // 60秒超时 const closeConnection = downloadService.streamTaskProgress( taskId, (task) => { + // 验证task对象是否有效 + if (!task || !task.id) { + console.error('收到无效的任务数据:', task); + return; + } + // console.log('收到SSE进度更新:', { // taskId, // status: task.status,