21
This commit is contained in:
parent
3270c2df45
commit
ebb5225b2d
|
|
@ -6,15 +6,15 @@
|
|||
|
||||
## Tasks
|
||||
|
||||
- [ ] 1. 扩展 IRedisService 接口与 RedisService 实现(List 操作)
|
||||
- [ ] 1.1 在 `IRedisService` 中新增 `ListLeftPushAsync`、`ListRightPopAsync`、`ListLengthAsync` 三个方法签名
|
||||
- [x] 1. 扩展 IRedisService 接口与 RedisService 实现(List 操作)
|
||||
- [x] 1.1 在 `IRedisService` 中新增 `ListLeftPushAsync`、`ListRightPopAsync`、`ListLengthAsync` 三个方法签名
|
||||
- 文件:`MiAssessment.Core/Interfaces/IRedisService.cs`
|
||||
- `ListLeftPushAsync(string key, string value)` 对应 LPUSH
|
||||
- `ListRightPopAsync(string key, TimeSpan timeout)` 对应 BRPOP,超时返回 null
|
||||
- `ListLengthAsync(string key)` 对应 LLEN
|
||||
- _Requirements: 4.1, 4.2, 4.3_
|
||||
|
||||
- [ ] 1.2 在 `RedisService` 中实现三个 List 方法
|
||||
- [x] 1.2 在 `RedisService` 中实现三个 List 方法
|
||||
- 文件:`MiAssessment.Infrastructure/Cache/RedisService.cs`
|
||||
- 使用 StackExchange.Redis 的 `ListLeftPushAsync`、`ListRightPopAsync`(通过 `ExecuteAsync("BRPOP", ...)`)、`ListLengthAsync`
|
||||
- 连接不可用时:`ListLeftPushAsync` 静默返回、`ListRightPopAsync` 返回 null、`ListLengthAsync` 返回 0
|
||||
|
|
@ -28,8 +28,8 @@
|
|||
- **Property 2: Redis List LPUSH/BRPOP round trip**
|
||||
- **Validates: Requirements 4.1, 4.2, 4.3**
|
||||
|
||||
- [ ] 2. 创建队列消息模型与生产者
|
||||
- [ ] 2.1 创建 `ReportQueueMessage` 模型类
|
||||
- [x] 2. 创建队列消息模型与生产者
|
||||
- [x] 2.1 创建 `ReportQueueMessage` 模型类
|
||||
- 文件:`MiAssessment.Core/Models/ReportQueueMessage.cs`
|
||||
- 包含 `RecordId`(long)、`RetryCount`(int)、`EnqueueTime`(DateTime)属性
|
||||
- 添加 XML 注释
|
||||
|
|
@ -39,14 +39,14 @@
|
|||
- **Property 1: 队列消息序列化/反序列化 round trip**
|
||||
- **Validates: Requirements 1.1, 2.2**
|
||||
|
||||
- [ ] 2.3 创建 `IReportQueueProducer` 接口和 `ReportQueueProducer` 实现
|
||||
- [x] 2.3 创建 `IReportQueueProducer` 接口和 `ReportQueueProducer` 实现
|
||||
- 接口文件:`MiAssessment.Core/Interfaces/IReportQueueProducer.cs`
|
||||
- 实现文件:`MiAssessment.Core/Services/ReportQueueProducer.cs`
|
||||
- `EnqueueAsync(long recordId)` 方法:构造 `ReportQueueMessage`(RetryCount=0, EnqueueTime=DateTime.Now),序列化为 JSON,调用 `IRedisService.ListLeftPushAsync("report:queue", json)`
|
||||
- 定义常量 `ReportQueueKey = "report:queue"`
|
||||
- _Requirements: 1.1_
|
||||
|
||||
- [ ] 2.4 在 Autofac `ServiceModule` 中注册 `IReportQueueProducer`
|
||||
- [x] 2.4 在 Autofac `ServiceModule` 中注册 `IReportQueueProducer`
|
||||
- 文件:`MiAssessment.Infrastructure/Modules/ServiceModule.cs`
|
||||
- 使用 `InstancePerLifetimeScope` 生命周期
|
||||
- _Requirements: 1.1_
|
||||
|
|
@ -55,11 +55,11 @@
|
|||
- 验证调用 `IRedisService.ListLeftPushAsync` 的参数正确性(key 为 `report:queue`,value 为包含正确 RecordId 和 RetryCount=0 的 JSON)
|
||||
- _Requirements: 1.1_
|
||||
|
||||
- [ ] 3. Checkpoint - 确保基础设施层编译通过
|
||||
- [x] 3. Checkpoint - 确保基础设施层编译通过
|
||||
- 确保所有测试通过,ask the user if questions arise.
|
||||
|
||||
- [ ] 4. 创建 ReportQueueConsumer(BackgroundService 消费者)
|
||||
- [ ] 4.1 创建 `ReportQueueConsumer` 类
|
||||
- [x] 4. 创建 ReportQueueConsumer(BackgroundService 消费者)
|
||||
- [x] 4.1 创建 `ReportQueueConsumer` 类
|
||||
- 文件:`MiAssessment.Api/BackgroundServices/ReportQueueConsumer.cs`
|
||||
- 继承 `BackgroundService`,注入 `IRedisService`、`IServiceScopeFactory`、`ILogger<ReportQueueConsumer>`
|
||||
- 在 `ExecuteAsync` 中循环调用 `ListRightPopAsync("report:queue", 30s)`
|
||||
|
|
@ -67,14 +67,14 @@
|
|||
- 定义常量:`MaxRetryCount=3`、`DeadLetterQueueKey="report:queue:dead"`、`RetryDelays=[10s, 30s, 60s]`、`ErrorRecoveryDelay=5s`
|
||||
- _Requirements: 2.1, 2.2, 2.3, 2.4, 2.5_
|
||||
|
||||
- [ ] 4.2 实现失败重试与死信队列逻辑
|
||||
- [x] 4.2 实现失败重试与死信队列逻辑
|
||||
- 失败且 RetryCount < MaxRetryCount:按退避时间等待后 RetryCount+1 重新 LPUSH 到 `report:queue`
|
||||
- 失败且 RetryCount = MaxRetryCount:LPUSH 到 `report:queue:dead`,更新记录 Status=5
|
||||
- 消息反序列化失败:记录错误日志,丢弃消息
|
||||
- BRPOP 异常:记录错误日志,等待 5 秒后重新监听
|
||||
- _Requirements: 3.1, 3.2, 3.3, 3.4_
|
||||
|
||||
- [ ] 4.3 在 `Program.cs` 中注册 `ReportQueueConsumer`
|
||||
- [x] 4.3 在 `Program.cs` 中注册 `ReportQueueConsumer`
|
||||
- 文件:`MiAssessment.Api/Program.cs`
|
||||
- 使用 `builder.Services.AddHostedService<ReportQueueConsumer>()`
|
||||
- _Requirements: 2.1_
|
||||
|
|
@ -89,8 +89,8 @@
|
|||
- **Property 3: 失败重试递增 RetryCount**
|
||||
- **Validates: Requirements 3.1**
|
||||
|
||||
- [ ] 5. 改造 AssessmentService(同步改异步入队)
|
||||
- [ ] 5.1 修改 `AssessmentService.SubmitAnswersAsync`
|
||||
- [x] 5. 改造 AssessmentService(同步改异步入队)
|
||||
- [x] 5.1 修改 `AssessmentService.SubmitAnswersAsync`
|
||||
- 文件:`MiAssessment.Core/Services/AssessmentService.cs`
|
||||
- 注入 `IReportQueueProducer`
|
||||
- 移除对 `ReportGenerationService.GenerateReportAsync` 的直接 await 调用
|
||||
|
|
@ -104,8 +104,8 @@
|
|||
- 验证 Redis 失败时仍返回成功
|
||||
- _Requirements: 1.1, 1.2, 1.3_
|
||||
|
||||
- [ ] 6. 添加生成失败状态支持(Status=5)
|
||||
- [ ] 6.1 扩展状态描述映射
|
||||
- [x] 6. 添加生成失败状态支持(Status=5)
|
||||
- [x] 6.1 扩展状态描述映射
|
||||
- 在 `AssessmentRecordService.StatusNames` 字典中新增 `{ 5, "生成失败" }`
|
||||
- 在 `AssessmentService.GetStatusText` 中新增 `5 => "生成失败"`
|
||||
- 在 `AssessmentService.GetResultStatusAsync` 中对 Status=5 返回描述"报告生成失败,请联系客服"
|
||||
|
|
@ -116,24 +116,24 @@
|
|||
- 验证 GetResultStatusAsync 对 Status=5 返回正确描述
|
||||
- _Requirements: 7.1, 7.2_
|
||||
|
||||
- [ ] 7. Checkpoint - 确保核心队列功能完整
|
||||
- [x] 7. Checkpoint - 确保核心队列功能完整
|
||||
- 确保所有测试通过,ask the user if questions arise.
|
||||
|
||||
- [ ] 8. 实现后台管理端重新生成接口
|
||||
- [ ] 8.1 创建请求/响应 DTO 模型
|
||||
- [x] 8. 实现后台管理端重新生成接口
|
||||
- [x] 8.1 创建请求/响应 DTO 模型
|
||||
- 文件目录:`MiAssessment.Admin.Business/Models/AssessmentRecord/`
|
||||
- 创建 `RegenerateReportRequest`(包含 `Id` 字段)
|
||||
- 创建 `BatchRegenerateReportRequest`(包含 `Ids` 列表字段)
|
||||
- 创建 `BatchRegenerateResult`(包含 `SuccessCount` 和 `SkippedCount` 字段)
|
||||
- _Requirements: 5.1, 6.1_
|
||||
|
||||
- [ ] 8.2 在 `IAssessmentRecordService` 中新增接口方法
|
||||
- [x] 8.2 在 `IAssessmentRecordService` 中新增接口方法
|
||||
- 文件:`MiAssessment.Admin.Business/Services/Interfaces/IAssessmentRecordService.cs`
|
||||
- 新增 `RegenerateReportAsync(long recordId)` 方法
|
||||
- 新增 `BatchRegenerateReportAsync(List<long> recordIds)` 方法,返回 `BatchRegenerateResult`
|
||||
- _Requirements: 5.1, 6.1_
|
||||
|
||||
- [ ] 8.3 在 `AssessmentRecordService` 中实现重新生成逻辑
|
||||
- [x] 8.3 在 `AssessmentRecordService` 中实现重新生成逻辑
|
||||
- 文件:`MiAssessment.Admin.Business/Services/AssessmentRecordService.cs`
|
||||
- 注入 `IRedisService`
|
||||
- `RegenerateReportAsync`:校验记录存在且状态为 3 或 5,重置状态为 3,清除已有测评结果数据,构造 `ReportQueueMessage`(RetryCount=0)LPUSH 入队
|
||||
|
|
@ -141,7 +141,7 @@
|
|||
- 错误处理:记录不存在返回错误码 3241,状态不符返回错误码 2005,空列表返回错误码 1001
|
||||
- _Requirements: 5.2, 5.3, 5.4, 6.2, 6.3, 6.4_
|
||||
|
||||
- [ ] 8.4 在 `AssessmentRecordController` 中新增两个接口
|
||||
- [x] 8.4 在 `AssessmentRecordController` 中新增两个接口
|
||||
- 文件:`MiAssessment.Admin.Business/Controllers/AssessmentRecordController.cs`
|
||||
- `POST /api/admin/assessmentRecord/regenerateReport`:接收 `RegenerateReportRequest`,调用 `RegenerateReportAsync`
|
||||
- `POST /api/admin/assessmentRecord/batchRegenerateReport`:接收 `BatchRegenerateReportRequest`,调用 `BatchRegenerateReportAsync`
|
||||
|
|
@ -169,7 +169,7 @@
|
|||
- **Property 7: 批量操作计数不变量**
|
||||
- **Validates: Requirements 6.3**
|
||||
|
||||
- [ ] 9. Final checkpoint - 确保所有功能完整集成
|
||||
- [x] 9. Final checkpoint - 确保所有功能完整集成
|
||||
- 确保所有测试通过,ask the user if questions arise.
|
||||
|
||||
## Notes
|
||||
|
|
|
|||
|
|
@ -130,4 +130,58 @@ public class AssessmentRecordController : BusinessControllerBase
|
|||
return Error(ErrorCodes.SystemError, "导出测评记录失败");
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 重新生成报告
|
||||
/// </summary>
|
||||
/// <param name="request">重新生成请求</param>
|
||||
/// <returns>操作结果</returns>
|
||||
[HttpPost("regenerateReport")]
|
||||
[BusinessPermission(BusinessPermissions.AssessmentRecord.View)]
|
||||
public async Task<IActionResult> RegenerateReport([FromBody] RegenerateReportRequest request)
|
||||
{
|
||||
if (request.Id <= 0)
|
||||
{
|
||||
return ValidationError("测评记录ID无效");
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
await _assessmentRecordService.RegenerateReportAsync(request.Id);
|
||||
return Ok();
|
||||
}
|
||||
catch (BusinessException ex)
|
||||
{
|
||||
return Error(ex.Code, ex.Message);
|
||||
}
|
||||
catch (Exception)
|
||||
{
|
||||
return Error(ErrorCodes.SystemError, "重新生成报告失败");
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 批量重新生成报告
|
||||
/// </summary>
|
||||
/// <param name="request">批量重新生成请求</param>
|
||||
/// <returns>批量操作结果</returns>
|
||||
[HttpPost("batchRegenerateReport")]
|
||||
[BusinessPermission(BusinessPermissions.AssessmentRecord.View)]
|
||||
public async Task<IActionResult> BatchRegenerateReport([FromBody] BatchRegenerateReportRequest request)
|
||||
{
|
||||
try
|
||||
{
|
||||
var result = await _assessmentRecordService.BatchRegenerateReportAsync(request.Ids);
|
||||
return Ok(result);
|
||||
}
|
||||
catch (BusinessException ex)
|
||||
{
|
||||
return Error(ex.Code, ex.Message);
|
||||
}
|
||||
catch (Exception)
|
||||
{
|
||||
return Error(ErrorCodes.SystemError, "批量重新生成报告失败");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,12 @@
|
|||
namespace MiAssessment.Admin.Business.Models.AssessmentRecord;
|
||||
|
||||
/// <summary>
|
||||
/// 批量重新生成报告请求
|
||||
/// </summary>
|
||||
public class BatchRegenerateReportRequest
|
||||
{
|
||||
/// <summary>
|
||||
/// 测评记录ID列表
|
||||
/// </summary>
|
||||
public List<long> Ids { get; set; } = new();
|
||||
}
|
||||
|
|
@ -0,0 +1,17 @@
|
|||
namespace MiAssessment.Admin.Business.Models.AssessmentRecord;
|
||||
|
||||
/// <summary>
|
||||
/// 批量重新生成结果
|
||||
/// </summary>
|
||||
public class BatchRegenerateResult
|
||||
{
|
||||
/// <summary>
|
||||
/// 成功入队数量
|
||||
/// </summary>
|
||||
public int SuccessCount { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// 跳过数量(状态不符或记录不存在)
|
||||
/// </summary>
|
||||
public int SkippedCount { get; set; }
|
||||
}
|
||||
|
|
@ -0,0 +1,12 @@
|
|||
namespace MiAssessment.Admin.Business.Models.AssessmentRecord;
|
||||
|
||||
/// <summary>
|
||||
/// 重新生成报告请求
|
||||
/// </summary>
|
||||
public class RegenerateReportRequest
|
||||
{
|
||||
/// <summary>
|
||||
/// 测评记录ID
|
||||
/// </summary>
|
||||
public long Id { get; set; }
|
||||
}
|
||||
|
|
@ -1,3 +1,4 @@
|
|||
using System.Text.Json;
|
||||
using ClosedXML.Excel;
|
||||
using MiAssessment.Admin.Business.Data;
|
||||
using MiAssessment.Admin.Business.Entities;
|
||||
|
|
@ -5,6 +6,9 @@ using MiAssessment.Admin.Business.Models;
|
|||
using MiAssessment.Admin.Business.Models.AssessmentRecord;
|
||||
using MiAssessment.Admin.Business.Models.Common;
|
||||
using MiAssessment.Admin.Business.Services.Interfaces;
|
||||
using MiAssessment.Core.Interfaces;
|
||||
using MiAssessment.Core.Models;
|
||||
using MiAssessment.Core.Services;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
|
|
@ -17,6 +21,7 @@ public class AssessmentRecordService : IAssessmentRecordService
|
|||
{
|
||||
private readonly AdminBusinessDbContext _dbContext;
|
||||
private readonly ILogger<AssessmentRecordService> _logger;
|
||||
private readonly IRedisService _redisService;
|
||||
|
||||
/// <summary>
|
||||
/// 状态名称映射
|
||||
|
|
@ -26,7 +31,8 @@ public class AssessmentRecordService : IAssessmentRecordService
|
|||
{ 1, "待测评" },
|
||||
{ 2, "测评中" },
|
||||
{ 3, "生成中" },
|
||||
{ 4, "已完成" }
|
||||
{ 4, "已完成" },
|
||||
{ 5, "生成失败" }
|
||||
};
|
||||
|
||||
/// <summary>
|
||||
|
|
@ -56,12 +62,15 @@ public class AssessmentRecordService : IAssessmentRecordService
|
|||
/// </summary>
|
||||
/// <param name="dbContext">数据库上下文</param>
|
||||
/// <param name="logger">日志记录器</param>
|
||||
/// <param name="redisService">Redis服务</param>
|
||||
public AssessmentRecordService(
|
||||
AdminBusinessDbContext dbContext,
|
||||
ILogger<AssessmentRecordService> logger)
|
||||
ILogger<AssessmentRecordService> logger,
|
||||
IRedisService redisService)
|
||||
{
|
||||
_dbContext = dbContext;
|
||||
_logger = logger;
|
||||
_redisService = redisService;
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
|
|
@ -456,6 +465,136 @@ public class AssessmentRecordService : IAssessmentRecordService
|
|||
return stream.ToArray();
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task RegenerateReportAsync(long recordId)
|
||||
{
|
||||
// 查找记录,过滤软删除
|
||||
var record = await _dbContext.AssessmentRecords
|
||||
.FirstOrDefaultAsync(r => r.Id == recordId && !r.IsDeleted);
|
||||
|
||||
if (record == null)
|
||||
{
|
||||
throw new BusinessException(ErrorCodes.AssessmentRecordNotFound, "测评记录不存在");
|
||||
}
|
||||
|
||||
// 校验状态:仅允许状态为 3(生成中)或 5(生成失败)的记录重新生成
|
||||
if (record.Status != 3 && record.Status != 5)
|
||||
{
|
||||
throw new BusinessException(ErrorCodes.InvalidOperation, "当前状态不允许重新生成");
|
||||
}
|
||||
|
||||
// 重置状态为 3(生成中)
|
||||
record.Status = 3;
|
||||
record.UpdateTime = DateTime.Now;
|
||||
|
||||
// 清除已有的测评结果数据
|
||||
var existingResults = await _dbContext.AssessmentResults
|
||||
.Where(r => r.RecordId == recordId)
|
||||
.ToListAsync();
|
||||
|
||||
if (existingResults.Count > 0)
|
||||
{
|
||||
_dbContext.AssessmentResults.RemoveRange(existingResults);
|
||||
}
|
||||
|
||||
// 构造队列消息并入队
|
||||
var message = new ReportQueueMessage
|
||||
{
|
||||
RecordId = recordId,
|
||||
RetryCount = 0,
|
||||
EnqueueTime = DateTime.Now
|
||||
};
|
||||
|
||||
var json = JsonSerializer.Serialize(message);
|
||||
await _redisService.ListLeftPushAsync(ReportQueueProducer.ReportQueueKey, json);
|
||||
|
||||
await _dbContext.SaveChangesAsync();
|
||||
|
||||
_logger.LogInformation("重新生成报告已入队,记录ID: {RecordId}", recordId);
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task<BatchRegenerateResult> BatchRegenerateReportAsync(List<long> recordIds)
|
||||
{
|
||||
// 校验参数
|
||||
if (recordIds == null || recordIds.Count == 0)
|
||||
{
|
||||
throw new BusinessException(ErrorCodes.ParamError, "记录ID列表不能为空");
|
||||
}
|
||||
|
||||
var successCount = 0;
|
||||
var skippedCount = 0;
|
||||
|
||||
foreach (var recordId in recordIds)
|
||||
{
|
||||
try
|
||||
{
|
||||
// 查找记录,过滤软删除
|
||||
var record = await _dbContext.AssessmentRecords
|
||||
.FirstOrDefaultAsync(r => r.Id == recordId && !r.IsDeleted);
|
||||
|
||||
// 记录不存在,跳过
|
||||
if (record == null)
|
||||
{
|
||||
_logger.LogWarning("批量重新生成:记录不存在,ID: {RecordId}", recordId);
|
||||
skippedCount++;
|
||||
continue;
|
||||
}
|
||||
|
||||
// 状态不符,跳过
|
||||
if (record.Status != 3 && record.Status != 5)
|
||||
{
|
||||
_logger.LogWarning("批量重新生成:状态不符,ID: {RecordId}, 状态: {Status}", recordId, record.Status);
|
||||
skippedCount++;
|
||||
continue;
|
||||
}
|
||||
|
||||
// 重置状态为 3(生成中)
|
||||
record.Status = 3;
|
||||
record.UpdateTime = DateTime.Now;
|
||||
|
||||
// 清除已有的测评结果数据
|
||||
var existingResults = await _dbContext.AssessmentResults
|
||||
.Where(r => r.RecordId == recordId)
|
||||
.ToListAsync();
|
||||
|
||||
if (existingResults.Count > 0)
|
||||
{
|
||||
_dbContext.AssessmentResults.RemoveRange(existingResults);
|
||||
}
|
||||
|
||||
// 构造队列消息并入队
|
||||
var message = new ReportQueueMessage
|
||||
{
|
||||
RecordId = recordId,
|
||||
RetryCount = 0,
|
||||
EnqueueTime = DateTime.Now
|
||||
};
|
||||
|
||||
var json = JsonSerializer.Serialize(message);
|
||||
await _redisService.ListLeftPushAsync(ReportQueueProducer.ReportQueueKey, json);
|
||||
|
||||
successCount++;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "批量重新生成:处理失败,ID: {RecordId}", recordId);
|
||||
skippedCount++;
|
||||
}
|
||||
}
|
||||
|
||||
// 统一保存所有变更
|
||||
await _dbContext.SaveChangesAsync();
|
||||
|
||||
_logger.LogInformation("批量重新生成完成,成功: {SuccessCount}, 跳过: {SkippedCount}", successCount, skippedCount);
|
||||
|
||||
return new BatchRegenerateResult
|
||||
{
|
||||
SuccessCount = successCount,
|
||||
SkippedCount = skippedCount
|
||||
};
|
||||
}
|
||||
|
||||
#region 私有方法
|
||||
|
||||
/// <summary>
|
||||
|
|
|
|||
|
|
@ -35,4 +35,17 @@ public interface IAssessmentRecordService
|
|||
/// <param name="request">查询请求</param>
|
||||
/// <returns>Excel文件字节数组</returns>
|
||||
Task<byte[]> ExportRecordsAsync(AssessmentRecordQueryRequest request);
|
||||
|
||||
/// <summary>
|
||||
/// 重新生成报告
|
||||
/// </summary>
|
||||
/// <param name="recordId">测评记录ID</param>
|
||||
Task RegenerateReportAsync(long recordId);
|
||||
|
||||
/// <summary>
|
||||
/// 批量重新生成报告
|
||||
/// </summary>
|
||||
/// <param name="recordIds">测评记录ID列表</param>
|
||||
/// <returns>批量操作结果</returns>
|
||||
Task<BatchRegenerateResult> BatchRegenerateReportAsync(List<long> recordIds);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,246 @@
|
|||
using System.Text.Json;
|
||||
using MiAssessment.Core.Interfaces;
|
||||
using MiAssessment.Core.Models;
|
||||
using MiAssessment.Core.Services;
|
||||
using MiAssessment.Model.Data;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
|
||||
namespace MiAssessment.Api.BackgroundServices;
|
||||
|
||||
/// <summary>
|
||||
/// 报告生成队列消费者
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// 作为 BackgroundService 运行,通过 BRPOP 从 Redis 队列中串行消费报告生成任务。
|
||||
/// 支持失败自动重试(指数退避)和死信队列机制。
|
||||
/// </remarks>
|
||||
public class ReportQueueConsumer : BackgroundService
|
||||
{
|
||||
/// <summary>
|
||||
/// 最大重试次数
|
||||
/// </summary>
|
||||
private const int MaxRetryCount = 3;
|
||||
|
||||
/// <summary>
|
||||
/// 死信队列 Redis Key
|
||||
/// </summary>
|
||||
private const string DeadLetterQueueKey = "report:queue:dead";
|
||||
|
||||
/// <summary>
|
||||
/// 各次重试的退避等待时间
|
||||
/// </summary>
|
||||
private static readonly TimeSpan[] RetryDelays = new[]
|
||||
{
|
||||
TimeSpan.FromSeconds(10),
|
||||
TimeSpan.FromSeconds(30),
|
||||
TimeSpan.FromSeconds(60)
|
||||
};
|
||||
|
||||
/// <summary>
|
||||
/// BRPOP 异常后的恢复等待时间
|
||||
/// </summary>
|
||||
private static readonly TimeSpan ErrorRecoveryDelay = TimeSpan.FromSeconds(5);
|
||||
|
||||
/// <summary>
|
||||
/// BRPOP 超时时间
|
||||
/// </summary>
|
||||
private static readonly TimeSpan BrpopTimeout = TimeSpan.FromSeconds(30);
|
||||
|
||||
private readonly IRedisService _redisService;
|
||||
private readonly IServiceScopeFactory _serviceScopeFactory;
|
||||
private readonly ILogger<ReportQueueConsumer> _logger;
|
||||
|
||||
/// <summary>
|
||||
/// 构造函数
|
||||
/// </summary>
|
||||
/// <param name="redisService">Redis 服务</param>
|
||||
/// <param name="serviceScopeFactory">服务作用域工厂</param>
|
||||
/// <param name="logger">日志记录器</param>
|
||||
public ReportQueueConsumer(
|
||||
IRedisService redisService,
|
||||
IServiceScopeFactory serviceScopeFactory,
|
||||
ILogger<ReportQueueConsumer> logger)
|
||||
{
|
||||
_redisService = redisService;
|
||||
_serviceScopeFactory = serviceScopeFactory;
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 执行队列消费循环
|
||||
/// </summary>
|
||||
/// <param name="stoppingToken">取消令牌</param>
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
_logger.LogInformation("报告生成队列消费者已启动");
|
||||
|
||||
while (!stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
// 从队列右端阻塞弹出消息
|
||||
var json = await _redisService.ListRightPopAsync(
|
||||
ReportQueueProducer.ReportQueueKey, BrpopTimeout);
|
||||
|
||||
// 超时返回 null,继续下一轮循环
|
||||
if (json == null)
|
||||
continue;
|
||||
|
||||
await ProcessMessageAsync(json, stoppingToken);
|
||||
}
|
||||
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
// 应用关闭,优雅退出
|
||||
break;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// BRPOP 异常:记录错误日志,等待恢复时间后重新监听
|
||||
_logger.LogError(ex, "队列消费异常,将在 {Delay} 秒后重新监听",
|
||||
ErrorRecoveryDelay.TotalSeconds);
|
||||
|
||||
try
|
||||
{
|
||||
await Task.Delay(ErrorRecoveryDelay, stoppingToken);
|
||||
}
|
||||
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_logger.LogInformation("报告生成队列消费者已停止");
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 处理单条队列消息
|
||||
/// </summary>
|
||||
/// <param name="json">消息 JSON 字符串</param>
|
||||
/// <param name="stoppingToken">取消令牌</param>
|
||||
private async Task ProcessMessageAsync(string json, CancellationToken stoppingToken)
|
||||
{
|
||||
// 反序列化消息
|
||||
ReportQueueMessage? message;
|
||||
try
|
||||
{
|
||||
message = JsonSerializer.Deserialize<ReportQueueMessage>(json);
|
||||
}
|
||||
catch (JsonException ex)
|
||||
{
|
||||
// 消息反序列化失败:记录错误日志,丢弃消息
|
||||
_logger.LogError(ex, "队列消息反序列化失败,已丢弃。消息内容: {Json}", json);
|
||||
return;
|
||||
}
|
||||
|
||||
if (message == null)
|
||||
{
|
||||
_logger.LogError("队列消息反序列化结果为 null,已丢弃。消息内容: {Json}", json);
|
||||
return;
|
||||
}
|
||||
|
||||
_logger.LogInformation("开始处理报告生成任务,RecordId: {RecordId}, RetryCount: {RetryCount}",
|
||||
message.RecordId, message.RetryCount);
|
||||
|
||||
try
|
||||
{
|
||||
// 通过 scope 解析 ReportGenerationService 并调用生成方法
|
||||
using var scope = _serviceScopeFactory.CreateScope();
|
||||
var reportService = scope.ServiceProvider.GetRequiredService<ReportGenerationService>();
|
||||
await reportService.GenerateReportAsync(message.RecordId);
|
||||
|
||||
_logger.LogInformation("报告生成成功,RecordId: {RecordId}", message.RecordId);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// 报告生成失败,执行重试或死信逻辑
|
||||
await HandleFailureAsync(message, ex, stoppingToken);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 处理报告生成失败的重试与死信逻辑
|
||||
/// </summary>
|
||||
/// <param name="message">队列消息</param>
|
||||
/// <param name="ex">异常信息</param>
|
||||
/// <param name="stoppingToken">取消令牌</param>
|
||||
private async Task HandleFailureAsync(
|
||||
ReportQueueMessage message, Exception ex, CancellationToken stoppingToken)
|
||||
{
|
||||
if (message.RetryCount < MaxRetryCount)
|
||||
{
|
||||
// 未超过最大重试次数:按退避时间等待后重新入队
|
||||
var delay = RetryDelays[message.RetryCount];
|
||||
|
||||
_logger.LogWarning(ex,
|
||||
"报告生成失败,将在 {Delay} 秒后进行第 {RetryCount} 次重试,RecordId: {RecordId}",
|
||||
delay.TotalSeconds, message.RetryCount + 1, message.RecordId);
|
||||
|
||||
try
|
||||
{
|
||||
await Task.Delay(delay, stoppingToken);
|
||||
}
|
||||
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
// 应用关闭,将消息重新入队以便下次启动时处理
|
||||
message.RetryCount++;
|
||||
var json = JsonSerializer.Serialize(message);
|
||||
await _redisService.ListLeftPushAsync(ReportQueueProducer.ReportQueueKey, json);
|
||||
return;
|
||||
}
|
||||
|
||||
// RetryCount 加 1 后重新 LPUSH 到队列
|
||||
message.RetryCount++;
|
||||
var retryJson = JsonSerializer.Serialize(message);
|
||||
await _redisService.ListLeftPushAsync(ReportQueueProducer.ReportQueueKey, retryJson);
|
||||
}
|
||||
else
|
||||
{
|
||||
// 超过最大重试次数:进入死信队列
|
||||
_logger.LogError(ex,
|
||||
"报告生成失败且已达最大重试次数,进入死信队列。RecordId: {RecordId}, RetryCount: {RetryCount}",
|
||||
message.RecordId, message.RetryCount);
|
||||
|
||||
// LPUSH 到死信队列
|
||||
var deadLetterJson = JsonSerializer.Serialize(message);
|
||||
await _redisService.ListLeftPushAsync(DeadLetterQueueKey, deadLetterJson);
|
||||
|
||||
// 更新测评记录状态为 5(生成失败)
|
||||
await UpdateRecordStatusToFailedAsync(message.RecordId);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 更新测评记录状态为生成失败(Status=5)
|
||||
/// </summary>
|
||||
/// <param name="recordId">测评记录ID</param>
|
||||
private async Task UpdateRecordStatusToFailedAsync(long recordId)
|
||||
{
|
||||
try
|
||||
{
|
||||
using var scope = _serviceScopeFactory.CreateScope();
|
||||
var dbContext = scope.ServiceProvider.GetRequiredService<MiAssessmentDbContext>();
|
||||
|
||||
var record = await dbContext.AssessmentRecords
|
||||
.FirstOrDefaultAsync(r => r.Id == recordId);
|
||||
|
||||
if (record != null)
|
||||
{
|
||||
record.Status = 5;
|
||||
record.UpdateTime = DateTime.Now;
|
||||
await dbContext.SaveChangesAsync();
|
||||
|
||||
_logger.LogInformation("测评记录状态已更新为生成失败,RecordId: {RecordId}", recordId);
|
||||
}
|
||||
else
|
||||
{
|
||||
_logger.LogWarning("更新状态失败,测评记录不存在,RecordId: {RecordId}", recordId);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// 状态更新失败不影响死信处理
|
||||
_logger.LogError(ex, "更新测评记录状态为生成失败时发生异常,RecordId: {RecordId}", recordId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,6 +1,7 @@
|
|||
using Autofac;
|
||||
using Autofac.Extensions.DependencyInjection;
|
||||
|
||||
using MiAssessment.Api.BackgroundServices;
|
||||
using MiAssessment.Api.Filters;
|
||||
using MiAssessment.Core.Mappings;
|
||||
using MiAssessment.Infrastructure.Cache;
|
||||
|
|
@ -112,6 +113,9 @@ try
|
|||
builder.Services.AddSingleton<ICacheService>(sp =>
|
||||
new RedisCacheService(builder.Configuration));
|
||||
|
||||
// 注册报告生成队列消费者
|
||||
builder.Services.AddHostedService<ReportQueueConsumer>();
|
||||
|
||||
// 添加控制器
|
||||
builder.Services.AddControllers(options =>
|
||||
{
|
||||
|
|
|
|||
|
|
@ -44,4 +44,19 @@ public interface IRedisService
|
|||
/// 释放分布式锁
|
||||
/// </summary>
|
||||
Task<bool> ReleaseLockAsync(string key, string value);
|
||||
|
||||
/// <summary>
|
||||
/// 将值推入列表左端(LPUSH)
|
||||
/// </summary>
|
||||
Task ListLeftPushAsync(string key, string value);
|
||||
|
||||
/// <summary>
|
||||
/// 从列表右端阻塞弹出(BRPOP)
|
||||
/// </summary>
|
||||
Task<string?> ListRightPopAsync(string key, TimeSpan timeout);
|
||||
|
||||
/// <summary>
|
||||
/// 获取列表长度(LLEN)
|
||||
/// </summary>
|
||||
Task<long> ListLengthAsync(string key);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,13 @@
|
|||
namespace MiAssessment.Core.Interfaces;
|
||||
|
||||
/// <summary>
|
||||
/// 报告生成队列生产者接口
|
||||
/// </summary>
|
||||
public interface IReportQueueProducer
|
||||
{
|
||||
/// <summary>
|
||||
/// 将报告生成任务入队
|
||||
/// </summary>
|
||||
/// <param name="recordId">测评记录ID</param>
|
||||
Task EnqueueAsync(long recordId);
|
||||
}
|
||||
|
|
@ -0,0 +1,22 @@
|
|||
namespace MiAssessment.Core.Models;
|
||||
|
||||
/// <summary>
|
||||
/// 报告生成队列消息
|
||||
/// </summary>
|
||||
public class ReportQueueMessage
|
||||
{
|
||||
/// <summary>
|
||||
/// 测评记录ID
|
||||
/// </summary>
|
||||
public long RecordId { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// 已重试次数
|
||||
/// </summary>
|
||||
public int RetryCount { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// 入队时间
|
||||
/// </summary>
|
||||
public DateTime EnqueueTime { get; set; }
|
||||
}
|
||||
|
|
@ -24,22 +24,22 @@ public class AssessmentService : IAssessmentService
|
|||
{
|
||||
private readonly MiAssessmentDbContext _dbContext;
|
||||
private readonly ILogger<AssessmentService> _logger;
|
||||
private readonly ReportGenerationService _reportGenerationService;
|
||||
private readonly IReportQueueProducer _reportQueueProducer;
|
||||
|
||||
/// <summary>
|
||||
/// 构造函数
|
||||
/// </summary>
|
||||
/// <param name="dbContext">数据库上下文</param>
|
||||
/// <param name="logger">日志记录器</param>
|
||||
/// <param name="reportGenerationService">报告生成服务</param>
|
||||
/// <param name="reportQueueProducer">报告生成队列生产者</param>
|
||||
public AssessmentService(
|
||||
MiAssessmentDbContext dbContext,
|
||||
ILogger<AssessmentService> logger,
|
||||
ReportGenerationService reportGenerationService)
|
||||
IReportQueueProducer reportQueueProducer)
|
||||
{
|
||||
_dbContext = dbContext;
|
||||
_logger = logger;
|
||||
_reportGenerationService = reportGenerationService;
|
||||
_reportQueueProducer = reportQueueProducer;
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
|
|
@ -113,6 +113,12 @@ public class AssessmentService : IAssessmentService
|
|||
}
|
||||
else
|
||||
{
|
||||
// 对生成失败状态设置提示信息
|
||||
if (record.Status == 5)
|
||||
{
|
||||
record.Message = "报告生成失败,请联系客服";
|
||||
}
|
||||
|
||||
_logger.LogDebug("查询报告状态成功,status: {Status}, isCompleted: {IsCompleted}", record.Status, record.IsCompleted);
|
||||
}
|
||||
|
||||
|
|
@ -289,15 +295,15 @@ public class AssessmentService : IAssessmentService
|
|||
_logger.LogInformation("测评答案提交成功,userId: {UserId}, recordId: {RecordId}, answerCount: {AnswerCount}",
|
||||
userId, request.RecordId, answers.Count);
|
||||
|
||||
// 触发报告生成
|
||||
// 将报告生成任务入队
|
||||
try
|
||||
{
|
||||
await _reportGenerationService.GenerateReportAsync(request.RecordId);
|
||||
await _reportQueueProducer.EnqueueAsync(request.RecordId);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "报告生成失败,recordId: {RecordId}", request.RecordId);
|
||||
// 报告生成失败不影响答案提交的返回,状态保持为3(生成中)
|
||||
_logger.LogError(ex, "报告生成任务入队失败,recordId: {RecordId}", request.RecordId);
|
||||
// 入队失败不影响答案提交的返回,状态保持为3(生成中)
|
||||
}
|
||||
|
||||
return new SubmitAnswersResponse
|
||||
|
|
@ -615,6 +621,7 @@ public class AssessmentService : IAssessmentService
|
|||
2 => "测评中",
|
||||
3 => "生成中",
|
||||
4 => "已完成",
|
||||
5 => "生成失败",
|
||||
_ => "未知"
|
||||
};
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,37 @@
|
|||
using System.Text.Json;
|
||||
using MiAssessment.Core.Interfaces;
|
||||
using MiAssessment.Core.Models;
|
||||
|
||||
namespace MiAssessment.Core.Services;
|
||||
|
||||
/// <summary>
|
||||
/// 报告生成队列生产者实现
|
||||
/// </summary>
|
||||
public class ReportQueueProducer : IReportQueueProducer
|
||||
{
|
||||
/// <summary>
|
||||
/// 报告生成队列 Redis Key
|
||||
/// </summary>
|
||||
public const string ReportQueueKey = "report:queue";
|
||||
|
||||
private readonly IRedisService _redisService;
|
||||
|
||||
public ReportQueueProducer(IRedisService redisService)
|
||||
{
|
||||
_redisService = redisService;
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task EnqueueAsync(long recordId)
|
||||
{
|
||||
var message = new ReportQueueMessage
|
||||
{
|
||||
RecordId = recordId,
|
||||
RetryCount = 0,
|
||||
EnqueueTime = DateTime.Now
|
||||
};
|
||||
|
||||
var json = JsonSerializer.Serialize(message);
|
||||
await _redisService.ListLeftPushAsync(ReportQueueKey, json);
|
||||
}
|
||||
}
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
using MiAssessment.Core.Interfaces;
|
||||
using MiAssessment.Core.Interfaces;
|
||||
|
||||
using Microsoft.Extensions.Configuration;
|
||||
|
||||
|
|
@ -105,6 +105,40 @@ public class RedisService : IRedisService, IDisposable
|
|||
return await _database.KeyDeleteAsync(key);
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task ListLeftPushAsync(string key, string value)
|
||||
{
|
||||
if (_database == null || !_isConnected)
|
||||
return;
|
||||
|
||||
await _database.ListLeftPushAsync(key, value);
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task<string?> ListRightPopAsync(string key, TimeSpan timeout)
|
||||
{
|
||||
if (_database == null || !_isConnected)
|
||||
return null;
|
||||
|
||||
// 使用 BRPOP 命令实现阻塞弹出
|
||||
var result = await _database.ExecuteAsync("BRPOP", key, (int)timeout.TotalSeconds);
|
||||
if (result.IsNull)
|
||||
return null;
|
||||
|
||||
// BRPOP 返回数组:[key, value]
|
||||
var results = (RedisResult[])result!;
|
||||
return results[1].ToString();
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task<long> ListLengthAsync(string key)
|
||||
{
|
||||
if (_database == null || !_isConnected)
|
||||
return 0;
|
||||
|
||||
return await _database.ListLengthAsync(key);
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
_connection?.Dispose();
|
||||
|
|
|
|||
|
|
@ -129,6 +129,11 @@ public class ServiceModule : Module
|
|||
return new HomeService(dbContext, logger);
|
||||
}).As<IHomeService>().InstancePerLifetimeScope();
|
||||
|
||||
// ========== 报告生成队列服务注册 ==========
|
||||
|
||||
// 注册报告队列生产者
|
||||
builder.RegisterType<ReportQueueProducer>().As<IReportQueueProducer>().InstancePerLifetimeScope();
|
||||
|
||||
// ========== 小程序测评模块服务注册 ==========
|
||||
|
||||
// 注册报告生成服务
|
||||
|
|
@ -144,8 +149,8 @@ public class ServiceModule : Module
|
|||
{
|
||||
var dbContext = c.Resolve<MiAssessmentDbContext>();
|
||||
var logger = c.Resolve<ILogger<AssessmentService>>();
|
||||
var reportGenerationService = c.Resolve<ReportGenerationService>();
|
||||
return new AssessmentService(dbContext, logger, reportGenerationService);
|
||||
var reportQueueProducer = c.Resolve<IReportQueueProducer>();
|
||||
return new AssessmentService(dbContext, logger, reportQueueProducer);
|
||||
}).As<IAssessmentService>().InstancePerLifetimeScope();
|
||||
|
||||
// ========== 小程序订单模块服务注册 ==========
|
||||
|
|
|
|||
|
|
@ -56,7 +56,7 @@ public partial class MiAssessmentDbContext : DbContext
|
|||
public virtual DbSet<AssessmentResult> AssessmentResults { get; set; }
|
||||
|
||||
public virtual DbSet<ReportCategory> ReportCategories { get; set; }
|
||||
S
|
||||
|
||||
public virtual DbSet<QuestionCategoryMapping> QuestionCategoryMappings { get; set; }
|
||||
|
||||
public virtual DbSet<ReportConclusion> ReportConclusions { get; set; }
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@ namespace MiAssessment.Model.Models.Assessment;
|
|||
public class ResultStatusDto
|
||||
{
|
||||
/// <summary>
|
||||
/// 状态:1待测评 2测评中 3生成中 4已完成
|
||||
/// 状态:1待测评 2测评中 3生成中 4已完成 5生成失败
|
||||
/// </summary>
|
||||
public int Status { get; set; }
|
||||
|
||||
|
|
@ -14,4 +14,9 @@ public class ResultStatusDto
|
|||
/// 是否已完成
|
||||
/// </summary>
|
||||
public bool IsCompleted { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// 状态描述信息
|
||||
/// </summary>
|
||||
public string? Message { get; set; }
|
||||
}
|
||||
|
|
|
|||
|
|
@ -29,7 +29,8 @@ public class AssessmentRecordServicePropertyTests
|
|||
{ 1, "待测评" },
|
||||
{ 2, "测评中" },
|
||||
{ 3, "生成中" },
|
||||
{ 4, "已完成" }
|
||||
{ 4, "已完成" },
|
||||
{ 5, "生成失败" }
|
||||
};
|
||||
|
||||
/// <summary>
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user