修复网络波动造成的部分作品阻塞问题

This commit is contained in:
2025-10-06 12:15:41 +08:00
parent 93caf97a0c
commit 480d357fdb
5 changed files with 389 additions and 191 deletions
+175 -95
View File
@@ -690,17 +690,16 @@ router.delete('/files', async (req, res) => {
* SSE端点 - 实时推送下载进度 * SSE端点 - 实时推送下载进度
* GET /api/download/stream/:taskId * GET /api/download/stream/:taskId
*/ */
router.get('/stream/:taskId', async (req, res) => { router.get('/stream/:taskId', (req, res) => {
const { taskId } = req.params; const taskId = req.params.taskId;
if (!taskId) { logger.debug(`SSE连接建立: ${taskId}`, {
return res.status(400).json({ taskId,
success: false, clientIP: req.ip,
error: 'Task ID is required' userAgent: req.get('User-Agent')
}); });
}
// 设置SSE头 // 设置SSE响应
res.writeHead(200, { res.writeHead(200, {
'Content-Type': 'text/event-stream', 'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache', 'Cache-Control': 'no-cache',
@@ -709,120 +708,201 @@ router.get('/stream/:taskId', async (req, res) => {
'Access-Control-Allow-Headers': 'Cache-Control' 'Access-Control-Allow-Headers': 'Cache-Control'
}); });
// 发送初始连接确认
res.write('data: {"type":"connected","taskId":"' + taskId + '"}\n\n');
const downloadService = req.backend.getDownloadService(); const downloadService = req.backend.getDownloadService();
// 创建进度监听器 // 创建进度监听器
let isCleanedUp = false;
const cleanup = () => {
if (!isCleanedUp) {
isCleanedUp = true;
downloadService.removeProgressListener(taskId, progressListener);
logger.debug(`SSE连接已清理: ${taskId}`);
}
};
const progressListener = (task) => { const progressListener = (task) => {
if (task.id === taskId && !isCleanedUp) {
// 使用setImmediate避免阻塞事件循环
setImmediate(() => {
try { try {
// 检查连接是否仍然有效 if (res.writableEnded || res.destroyed || isCleanedUp) {
if (!res.destroyed && !res.writableEnded) { logger.debug(`SSE连接已关闭,跳过发送: ${taskId}`);
res.write(`data: ${JSON.stringify({ return;
type: 'progress', }
data: task
})}\n\n`);
// 如果任务完成,关闭连接 const data = JSON.stringify({
if (['completed', 'failed', 'cancelled', 'partial'].includes(task.status)) { type: 'progress',
res.write(`data: ${JSON.stringify({ task: task
type: 'complete',
data: task
})}\n\n`);
res.end();
cleanup();
}
} else {
// 连接已断开,清理监听器
cleanup();
}
} catch (error) {
logger.error('SSE写入失败:', error);
// 连接可能已断开,清理监听器
cleanup();
try {
if (!res.destroyed) {
res.end();
}
} catch (endError) {
logger.error('关闭SSE连接失败:', endError);
}
}
}); });
res.write(`data: ${data}\n\n`);
logger.debug(`SSE进度更新发送: ${taskId}`, {
status: task.status,
progress: task.progress,
completed: task.completed_files,
failed: task.failed_files
});
} catch (error) {
// 区分正常断开和异常错误
if (isNormalDisconnectError(error)) {
logger.debug(`SSE发送数据时连接正常断开: ${taskId}`, {
code: error.code,
message: error.message
});
} else {
logger.error(`SSE发送数据异常失败: ${taskId}`, {
error: error.message,
taskId
});
}
// 发送失败时移除监听器并清理连接
downloadService.removeProgressListener(taskId, progressListener);
cleanup('progress_send_error');
} }
}; };
// 注册监听器 // 注册进度监听器
const listenerAdded = downloadService.addProgressListener(taskId, progressListener); downloadService.addProgressListener(taskId, progressListener);
if (!listenerAdded) {
logger.error(`无法为任务 ${taskId} 添加监听器,可能已达到限制`);
res.status(503).json({
error: true,
message: 'Too many listeners, please try again later'
});
return;
}
// 立即发送当前状态 // 设置连接超时 - 增加到60秒,并添加心跳机制
const currentTask = downloadService.getTask(taskId); let connectionTimeout;
if (currentTask) { let heartbeatInterval;
try {
res.write(`data: ${JSON.stringify({
type: 'progress',
data: currentTask
})}\n\n`);
} catch (error) {
logger.error('发送初始状态失败:', error);
cleanup();
return;
}
}
// 设置连接超时(30秒) const resetTimeout = () => {
const timeout = setTimeout(() => { if (connectionTimeout) {
logger.warn(`SSE连接超时,主动关闭: ${taskId}`); clearTimeout(connectionTimeout);
cleanup(); }
connectionTimeout = setTimeout(() => {
logger.info(`SSE连接超时,关闭连接: ${taskId}`);
cleanup('connection_timeout');
if (!res.writableEnded && !isCleanedUp) {
try { try {
if (!res.destroyed) { res.write('data: {"type":"timeout"}\n\n');
res.end(); res.end();
} catch (error) {
// 忽略写入已关闭连接的错误
logger.debug(`SSE连接已关闭,无法发送超时消息: ${taskId}`);
}
}
}, 60000); // 60秒超时
};
// 启动心跳机制
heartbeatInterval = setInterval(() => {
try {
if (!res.writableEnded && !res.destroyed && !isCleanedUp) {
res.write('data: {"type":"heartbeat"}\n\n');
resetTimeout(); // 心跳时重置超时
} else {
clearInterval(heartbeatInterval);
heartbeatInterval = null;
} }
} catch (error) { } catch (error) {
logger.error('超时关闭连接失败:', error); // 区分正常断开和异常错误
if (isNormalDisconnectError(error)) {
logger.debug(`SSE心跳发送时连接正常断开: ${taskId}`, {
code: error.code,
message: error.message
});
} else {
logger.warn(`SSE心跳发送异常失败: ${taskId}`, error);
} }
}, 30000); clearInterval(heartbeatInterval);
heartbeatInterval = null;
cleanup('heartbeat_error');
}
}, 15000); // 每15秒发送心跳
resetTimeout();
// 连接状态跟踪
let isCleanedUp = false;
let isNormalDisconnect = false;
// 清理函数
const cleanup = (reason = 'unknown') => {
if (isCleanedUp) {
return; // 避免重复清理
}
isCleanedUp = true;
if (connectionTimeout) {
clearTimeout(connectionTimeout);
connectionTimeout = null;
}
if (heartbeatInterval) {
clearInterval(heartbeatInterval);
heartbeatInterval = null;
}
downloadService.removeProgressListener(taskId, progressListener);
logger.debug(`SSE连接清理完成: ${taskId}`, { reason });
};
// 判断是否为正常断开连接的错误
const isNormalDisconnectError = (error) => {
if (!error) return false;
// 常见的正常断开连接错误码
const normalErrorCodes = [
'ECONNRESET', // 连接被重置
'EPIPE', // 管道破裂
'ECONNABORTED', // 连接被中止
'ECANCELED' // 请求被取消
];
return normalErrorCodes.includes(error.code) ||
error.message === 'aborted' ||
error.message.includes('aborted');
};
// 监听客户端断开连接
req.on('close', () => {
isNormalDisconnect = true;
logger.debug(`SSE客户端断开连接: ${taskId}`);
cleanup('client_close');
});
// 客户端断开连接时清理
req.on('close', cleanup);
req.on('error', (error) => { req.on('error', (error) => {
logger.error('SSE请求错误:', error); if (isNormalDisconnectError(error)) {
cleanup(); // 正常断开连接,使用debug级别日志
logger.debug(`SSE客户端正常断开: ${taskId}`, {
code: error.code,
message: error.message
});
} else {
// 真正的异常错误,使用error级别日志
logger.error(`SSE请求异常错误: ${taskId}`, error);
}
cleanup('request_error');
}); });
// 响应对象错误处理
res.on('error', (error) => { res.on('error', (error) => {
logger.error('SSE响应错误:', error); if (isNormalDisconnectError(error)) {
cleanup(); // 正常断开连接,使用debug级别日志
logger.debug(`SSE响应正常断开: ${taskId}`, {
code: error.code,
message: error.message
}); });
} else {
res.on('finish', () => { // 真正的异常错误,使用error级别日志
clearTimeout(timeout); logger.error(`SSE响应异常错误: ${taskId}`, error);
cleanup(); }
cleanup('response_error');
}); });
res.on('close', () => { res.on('close', () => {
clearTimeout(timeout); logger.debug(`SSE响应关闭: ${taskId}`);
cleanup(); cleanup('response_close');
}); });
// 检查任务状态,如果任务已完成则立即关闭连接
const task = downloadService.getTask(taskId);
if (task && ['completed', 'failed', 'cancelled', 'partial'].includes(task.status)) {
logger.debug(`任务已完成,关闭SSE连接: ${taskId}`, { status: task.status });
setTimeout(() => {
cleanup('task_completed');
if (!res.writableEnded && !isCleanedUp) {
try {
res.write(`data: {"type":"completed","status":"${task.status}"}\n\n`);
res.end();
} catch (error) {
// 忽略写入已关闭连接的错误
logger.debug(`SSE连接已关闭,无法发送完成消息: ${taskId}`);
}
}
}, 1000); // 延迟1秒关闭,确保最后的状态更新被发送
}
}); });
/** /**
+41 -1
View File
@@ -294,6 +294,10 @@ class DownloadExecutor {
const batch = items.slice(i, i + batchSize); const batch = items.slice(i, i + batchSize);
const batchPromises = batch.map(async item => { const batchPromises = batch.map(async item => {
// 为每个下载添加超时控制,防止单个下载卡住整个批次
return Promise.race([
// 实际下载Promise
(async () => {
try { try {
// 检查是否应该暂停(在每个作品下载前检查) // 检查是否应该暂停(在每个作品下载前检查)
if (this.shouldPause(task.id)) { if (this.shouldPause(task.id)) {
@@ -387,9 +391,45 @@ class DownloadExecutor {
results.push(result); results.push(result);
return result; return result;
} }
})(),
// 超时Promise - 防止单个下载卡住整个批次
new Promise((_, reject) => {
setTimeout(() => {
const artworkId = typeof item === 'object' ? item.id : item;
logger.warn(`作品下载超时,跳过: ${artworkId}`, { taskId: task.id, timeout: '120s' });
reject(new Error(`下载超时: ${artworkId}`));
}, 120000); // 2分钟超时
})
]).catch(error => {
// 处理超时或其他错误
const artworkId = typeof item === 'object' ? item.id : item;
task.failed_files++;
const result = { artwork_id: artworkId, success: false, error: error.message };
results.push(result);
return result;
});
}); });
await Promise.all(batchPromises); // 使用 Promise.allSettled 替代 Promise.all,确保不会因为单个Promise卡住而阻塞整个批次
const batchResults = await Promise.allSettled(batchPromises);
// 处理结果,确保所有Promise都有结果
batchResults.forEach((result, index) => {
if (result.status === 'rejected') {
const artworkId = typeof batch[index] === 'object' ? batch[index].id : batch[index];
logger.error(`批次中的作品处理失败: ${artworkId}`, {
error: result.reason?.message || result.reason,
taskId: task.id
});
// 确保失败的作品也被计入
task.failed_files++;
results.push({
artwork_id: artworkId,
success: false,
error: result.reason?.message || '未知错误'
});
}
});
// 更新进度并通知 // 更新进度并通知
task.progress = Math.round((task.completed_files / task.total_files) * 100); task.progress = Math.round((task.completed_files / task.total_files) * 100);
+46 -1
View File
@@ -307,11 +307,55 @@ class FileManager {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}, },
timeout: downloadConfig.timeout, timeout: downloadConfig.timeout,
signal: abortController ? abortController.signal : undefined signal: abortController ? abortController.signal : undefined,
// 添加连接超时和响应超时配置
httpsAgent: new (require('https')).Agent({
keepAlive: true,
timeout: 60000, // 连接超时60秒
}),
// 添加重试配置
validateStatus: (status) => status < 500, // 只对5xx错误重试
}); });
// 使用增强的写入流创建方法 // 使用增强的写入流创建方法
writer = await FileUtils.safeCreateWriteStream(filePath); writer = await FileUtils.safeCreateWriteStream(filePath);
let downloadedBytes = 0;
const totalBytes = parseInt(response.headers['content-length']) || 0;
// 设置流超时
let streamTimeout = setTimeout(() => {
logger.warn(`流传输超时,中断下载: ${filePath}`);
if (writer && !writer.destroyed) {
writer.destroy();
}
if (abortController) {
abortController.abort();
}
}, downloadConfig.timeout + 60000);
response.data.on('data', (chunk) => {
downloadedBytes += chunk.length;
// 重置流超时
clearTimeout(streamTimeout);
streamTimeout = setTimeout(() => {
if (writer && !writer.destroyed) {
logger.warn(`流传输超时,中断下载: ${filePath}`);
writer.destroy();
if (abortController) {
abortController.abort();
}
}
}, downloadConfig.timeout + 60000);
});
response.data.on('error', (error) => {
clearTimeout(streamTimeout);
logger.error(`下载流错误: ${filePath}`, error);
if (writer && !writer.destroyed) {
writer.destroy();
}
});
response.data.pipe(writer); response.data.pipe(writer);
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
@@ -319,6 +363,7 @@ class FileManager {
let abortListener = null; let abortListener = null;
const cleanup = () => { const cleanup = () => {
clearTimeout(streamTimeout);
if (writer && !writer.destroyed) { if (writer && !writer.destroyed) {
writer.destroy(); writer.destroy();
} }
+30 -3
View File
@@ -203,17 +203,44 @@ class DownloadService {
eventSource.onmessage = (event) => { eventSource.onmessage = (event) => {
try { try {
const data = JSON.parse(event.data); const data = JSON.parse(event.data);
if (data.type === 'progress') {
onProgress(data.data); // 处理不同类型的SSE消息
if (data.type === 'connected') {
console.log('SSE连接已建立:', data.taskId);
} else if (data.type === 'progress') {
// 新的数据格式:data.task 包含任务信息
if (data.task) {
onProgress(data.task);
}
} else if (data.type === 'completed') {
// 任务完成
console.log('任务完成:', data.status);
if (onComplete) {
onComplete();
}
eventSource.close();
} else if (data.type === 'timeout') {
// 连接超时
console.warn('SSE连接超时');
if (onComplete) {
onComplete();
}
eventSource.close();
} else if (data.type === 'heartbeat') {
// 心跳消息,不需要处理
console.debug('收到SSE心跳');
} else if (data.type === 'complete') { } else if (data.type === 'complete') {
// 兼容旧格式
if (data.data) {
onProgress(data.data); onProgress(data.data);
}
if (onComplete) { if (onComplete) {
onComplete(); onComplete();
} }
eventSource.close(); eventSource.close();
} }
} catch (error) { } catch (error) {
console.error('解析SSE数据失败:', error); console.error('解析SSE数据失败:', error, 'Raw data:', event.data);
} }
}; };
+8 -2
View File
@@ -133,15 +133,21 @@ export const useDownloadStore = defineStore('download', () => {
console.log('开始SSE监听任务进度:', taskId); console.log('开始SSE监听任务进度:', taskId);
// 添加超时处理 // 添加超时处理 - 增加到60秒以匹配后端
const timeoutId = setTimeout(() => { const timeoutId = setTimeout(() => {
console.warn('SSE连接超时,关闭连接:', taskId); console.warn('SSE连接超时,关闭连接:', taskId);
stopTaskStreaming(taskId); stopTaskStreaming(taskId);
}, 30000); // 30秒超时 }, 60000); // 60秒超时
const closeConnection = downloadService.streamTaskProgress( const closeConnection = downloadService.streamTaskProgress(
taskId, taskId,
(task) => { (task) => {
// 验证task对象是否有效
if (!task || !task.id) {
console.error('收到无效的任务数据:', task);
return;
}
// console.log('收到SSE进度更新:', { // console.log('收到SSE进度更新:', {
// taskId, // taskId,
// status: task.status, // status: task.status,