修复sse监听器没有清理的问题
This commit is contained in:
+88
-24
@@ -712,52 +712,116 @@ router.get('/stream/:taskId', async (req, res) => {
|
|||||||
const downloadService = req.backend.getDownloadService();
|
const downloadService = req.backend.getDownloadService();
|
||||||
|
|
||||||
// 创建进度监听器
|
// 创建进度监听器
|
||||||
|
let isCleanedUp = false;
|
||||||
|
const cleanup = () => {
|
||||||
|
if (!isCleanedUp) {
|
||||||
|
isCleanedUp = true;
|
||||||
|
downloadService.removeProgressListener(taskId, progressListener);
|
||||||
|
logger.debug(`SSE连接已清理: ${taskId}`);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
const progressListener = (task) => {
|
const progressListener = (task) => {
|
||||||
if (task.id === taskId) {
|
if (task.id === taskId && !isCleanedUp) {
|
||||||
// 使用setImmediate避免阻塞事件循环
|
// 使用setImmediate避免阻塞事件循环
|
||||||
setImmediate(() => {
|
setImmediate(() => {
|
||||||
try {
|
try {
|
||||||
// 检查连接是否仍然有效
|
// 检查连接是否仍然有效
|
||||||
if (!res.destroyed) {
|
if (!res.destroyed && !res.writableEnded) {
|
||||||
res.write(`data: ${JSON.stringify({
|
res.write(`data: ${JSON.stringify({
|
||||||
type: 'progress',
|
type: 'progress',
|
||||||
data: task
|
data: task
|
||||||
})}\n\n`);
|
})}\n\n`);
|
||||||
|
|
||||||
// 如果任务完成,关闭连接
|
// 如果任务完成,关闭连接
|
||||||
if (['completed', 'failed', 'cancelled', 'partial'].includes(task.status)) {
|
if (['completed', 'failed', 'cancelled', 'partial'].includes(task.status)) {
|
||||||
res.write(`data: ${JSON.stringify({
|
res.write(`data: ${JSON.stringify({
|
||||||
type: 'complete',
|
type: 'complete',
|
||||||
data: task
|
data: task
|
||||||
})}\n\n`);
|
})}\n\n`);
|
||||||
res.end();
|
res.end();
|
||||||
downloadService.removeProgressListener(taskId, progressListener);
|
cleanup();
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
// 连接已断开,清理监听器
|
||||||
|
cleanup();
|
||||||
}
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error('SSE写入失败:', error);
|
logger.error('SSE写入失败:', error);
|
||||||
// 连接可能已断开,清理监听器
|
// 连接可能已断开,清理监听器
|
||||||
downloadService.removeProgressListener(taskId, progressListener);
|
cleanup();
|
||||||
|
try {
|
||||||
|
if (!res.destroyed) {
|
||||||
|
res.end();
|
||||||
|
}
|
||||||
|
} catch (endError) {
|
||||||
|
logger.error('关闭SSE连接失败:', endError);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// 注册监听器
|
// 注册监听器
|
||||||
downloadService.addProgressListener(taskId, progressListener);
|
const listenerAdded = downloadService.addProgressListener(taskId, progressListener);
|
||||||
|
if (!listenerAdded) {
|
||||||
|
logger.error(`无法为任务 ${taskId} 添加监听器,可能已达到限制`);
|
||||||
|
res.status(503).json({
|
||||||
|
error: true,
|
||||||
|
message: 'Too many listeners, please try again later'
|
||||||
|
});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// 立即发送当前状态
|
// 立即发送当前状态
|
||||||
const currentTask = downloadService.getTask(taskId);
|
const currentTask = downloadService.getTask(taskId);
|
||||||
if (currentTask) {
|
if (currentTask) {
|
||||||
res.write(`data: ${JSON.stringify({
|
try {
|
||||||
type: 'progress',
|
res.write(`data: ${JSON.stringify({
|
||||||
data: currentTask
|
type: 'progress',
|
||||||
})}\n\n`);
|
data: currentTask
|
||||||
|
})}\n\n`);
|
||||||
|
} catch (error) {
|
||||||
|
logger.error('发送初始状态失败:', error);
|
||||||
|
cleanup();
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 设置连接超时(30秒)
|
||||||
|
const timeout = setTimeout(() => {
|
||||||
|
logger.warn(`SSE连接超时,主动关闭: ${taskId}`);
|
||||||
|
cleanup();
|
||||||
|
try {
|
||||||
|
if (!res.destroyed) {
|
||||||
|
res.end();
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
logger.error('超时关闭连接失败:', error);
|
||||||
|
}
|
||||||
|
}, 30000);
|
||||||
|
|
||||||
// 客户端断开连接时清理
|
// 客户端断开连接时清理
|
||||||
req.on('close', () => {
|
req.on('close', cleanup);
|
||||||
downloadService.removeProgressListener(taskId, progressListener);
|
req.on('error', (error) => {
|
||||||
|
logger.error('SSE请求错误:', error);
|
||||||
|
cleanup();
|
||||||
|
});
|
||||||
|
|
||||||
|
// 响应对象错误处理
|
||||||
|
res.on('error', (error) => {
|
||||||
|
logger.error('SSE响应错误:', error);
|
||||||
|
cleanup();
|
||||||
|
});
|
||||||
|
|
||||||
|
res.on('finish', () => {
|
||||||
|
clearTimeout(timeout);
|
||||||
|
cleanup();
|
||||||
|
});
|
||||||
|
|
||||||
|
res.on('close', () => {
|
||||||
|
clearTimeout(timeout);
|
||||||
|
cleanup();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
@@ -45,6 +45,9 @@ class DownloadService {
|
|||||||
// 先创建下载执行器,稍后在init方法中设置downloadService引用
|
// 先创建下载执行器,稍后在init方法中设置downloadService引用
|
||||||
this.downloadExecutor = new DownloadExecutor(this.fileManager, this.taskManager, this.progressManager, this.historyManager, this);
|
this.downloadExecutor = new DownloadExecutor(this.fileManager, this.taskManager, this.progressManager, this.historyManager, this);
|
||||||
|
|
||||||
|
// 启动监听器定期清理检查
|
||||||
|
this.progressManager.startPeriodicCleanup();
|
||||||
|
|
||||||
this.initialized = false;
|
this.initialized = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -92,7 +95,11 @@ class DownloadService {
|
|||||||
|
|
||||||
// 代理方法 - 进度管理
|
// 代理方法 - 进度管理
|
||||||
addProgressListener(taskId, listener) {
|
addProgressListener(taskId, listener) {
|
||||||
return this.progressManager.addProgressListener(taskId, listener);
|
const result = this.progressManager.addProgressListener(taskId, listener);
|
||||||
|
if (!result) {
|
||||||
|
logger.warn(`添加进度监听器失败: ${taskId}`);
|
||||||
|
}
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
removeProgressListener(taskId, listener) {
|
removeProgressListener(taskId, listener) {
|
||||||
|
|||||||
@@ -15,16 +15,37 @@ class ProgressManager {
|
|||||||
this.throttleControl = new Map();
|
this.throttleControl = new Map();
|
||||||
// 节流间隔(毫秒)
|
// 节流间隔(毫秒)
|
||||||
this.throttleInterval = 100;
|
this.throttleInterval = 100;
|
||||||
|
// 每个任务的最大监听器数量
|
||||||
|
this.maxListenersPerTask = 10;
|
||||||
|
// 全局最大监听器数量
|
||||||
|
this.maxTotalListeners = 100;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 添加进度监听器
|
* 添加进度监听器
|
||||||
*/
|
*/
|
||||||
addProgressListener(taskId, listener) {
|
addProgressListener(taskId, listener) {
|
||||||
|
// 检查全局监听器数量限制
|
||||||
|
if (this.getTotalListenerCount() >= this.maxTotalListeners) {
|
||||||
|
logger.warn(`全局监听器数量已达上限 (${this.maxTotalListeners}),拒绝添加新监听器`);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
if (!this.progressListeners.has(taskId)) {
|
if (!this.progressListeners.has(taskId)) {
|
||||||
this.progressListeners.set(taskId, []);
|
this.progressListeners.set(taskId, []);
|
||||||
}
|
}
|
||||||
this.progressListeners.get(taskId).push(listener);
|
|
||||||
|
const listeners = this.progressListeners.get(taskId);
|
||||||
|
|
||||||
|
// 检查单个任务的监听器数量限制
|
||||||
|
if (listeners.length >= this.maxListenersPerTask) {
|
||||||
|
logger.warn(`任务 ${taskId} 的监听器数量已达上限 (${this.maxListenersPerTask}),拒绝添加新监听器`);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
listeners.push(listener);
|
||||||
|
logger.debug(`为任务 ${taskId} 添加监听器,当前数量: ${listeners.length}`);
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -36,12 +57,16 @@ class ProgressManager {
|
|||||||
const index = listeners.indexOf(listener);
|
const index = listeners.indexOf(listener);
|
||||||
if (index > -1) {
|
if (index > -1) {
|
||||||
listeners.splice(index, 1);
|
listeners.splice(index, 1);
|
||||||
|
logger.debug(`从任务 ${taskId} 移除监听器,剩余数量: ${listeners.length}`);
|
||||||
}
|
}
|
||||||
if (listeners.length === 0) {
|
if (listeners.length === 0) {
|
||||||
this.progressListeners.delete(taskId);
|
this.progressListeners.delete(taskId);
|
||||||
// 清理节流控制
|
// 清理节流控制
|
||||||
this.throttleControl.delete(taskId);
|
this.throttleControl.delete(taskId);
|
||||||
|
logger.debug(`任务 ${taskId} 的所有监听器已清理`);
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
logger.debug(`尝试移除不存在任务 ${taskId} 的监听器`);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -125,8 +150,45 @@ class ProgressManager {
|
|||||||
* 清理所有监听器
|
* 清理所有监听器
|
||||||
*/
|
*/
|
||||||
clearAllListeners() {
|
clearAllListeners() {
|
||||||
|
const totalCount = this.getTotalListenerCount();
|
||||||
this.progressListeners.clear();
|
this.progressListeners.clear();
|
||||||
|
this.throttleControl.clear();
|
||||||
|
logger.info(`已清理所有监听器,共 ${totalCount} 个`);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 定期清理超时的监听器(可选功能)
|
||||||
|
*/
|
||||||
|
startPeriodicCleanup(intervalMs = 300000) { // 默认5分钟
|
||||||
|
if (this.cleanupInterval) {
|
||||||
|
clearInterval(this.cleanupInterval);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.cleanupInterval = setInterval(() => {
|
||||||
|
const totalListeners = this.getTotalListenerCount();
|
||||||
|
if (totalListeners > 0) {
|
||||||
|
logger.debug(`定期检查: 当前活跃监听器数量 ${totalListeners}`);
|
||||||
|
|
||||||
|
// 如果监听器数量过多,记录警告
|
||||||
|
if (totalListeners > this.maxTotalListeners * 0.8) {
|
||||||
|
logger.warn(`监听器数量接近上限: ${totalListeners}/${this.maxTotalListeners}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, intervalMs);
|
||||||
|
|
||||||
|
logger.info(`已启动监听器定期清理检查,间隔 ${intervalMs / 1000} 秒`);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 停止定期清理
|
||||||
|
*/
|
||||||
|
stopPeriodicCleanup() {
|
||||||
|
if (this.cleanupInterval) {
|
||||||
|
clearInterval(this.cleanupInterval);
|
||||||
|
this.cleanupInterval = null;
|
||||||
|
logger.info('已停止监听器定期清理检查');
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports = ProgressManager;
|
module.exports = ProgressManager;
|
||||||
Reference in New Issue
Block a user