21
This commit is contained in:
parent
2eaa6cea90
commit
3270c2df45
1
.kiro/specs/redis-report-queue/.config.kiro
Normal file
1
.kiro/specs/redis-report-queue/.config.kiro
Normal file
|
|
@ -0,0 +1 @@
|
|||
{"specId": "bff807b8-2170-4b09-839e-8f37001a6135", "workflowType": "requirements-first", "specType": "feature"}
|
||||
472
.kiro/specs/redis-report-queue/design.md
Normal file
472
.kiro/specs/redis-report-queue/design.md
Normal file
|
|
@ -0,0 +1,472 @@
|
|||
# Design Document: Redis 报告生成队列
|
||||
|
||||
## Overview
|
||||
|
||||
本设计将测评报告生成从同步调用模式改为基于 Redis List 的异步队列处理模式。当前 `AssessmentService.SubmitAnswersAsync` 在提交答案后直接 `await _reportGenerationService.GenerateReportAsync(recordId)` 同步等待报告生成,导致用户请求被阻塞约 30 秒。
|
||||
|
||||
改造后的架构采用生产者-消费者模式:
|
||||
- **生产者**:`AssessmentService` 提交答案后,将任务消息 LPUSH 到 Redis List `report:queue`,立即返回响应
|
||||
- **消费者**:`ReportQueueConsumer`(BackgroundService)通过 BRPOP 串行消费队列任务,调用 `ReportGenerationService` 生成报告
|
||||
- **重试机制**:失败任务按指数退避重新入队(10s/30s/60s),超过 3 次进入死信队列 `report:queue:dead`,记录状态更新为 5(生成失败)
|
||||
- **后台管理**:管理员可对状态为 3(生成中)或 5(生成失败)的记录手动触发单条/批量重新生成
|
||||
|
||||
### 设计决策
|
||||
|
||||
| 决策 | 选择 | 理由 |
|
||||
|------|------|------|
|
||||
| 队列实现 | Redis List (LPUSH/BRPOP) | 项目已有 Redis 基础设施(StackExchange.Redis),无需引入 RabbitMQ 等额外中间件 |
|
||||
| 消费模式 | 单进程串行消费 | 报告生成涉及数据库事务,串行处理避免并发冲突,当前业务量不需要并行消费 |
|
||||
| 重试策略 | 指数退避 + 死信队列 | 平衡临时性错误恢复与永久性错误隔离 |
|
||||
| 消费者宿主 | API 进程内 BackgroundService | 避免独立部署消费者进程,简化运维;API 进程已有完整的 DI 容器和数据库连接 |
|
||||
| 消息格式 | JSON 序列化 | 简单直观,与项目现有 JSON 序列化方式一致 |
|
||||
|
||||
## Architecture
|
||||
|
||||
```mermaid
|
||||
flowchart TB
|
||||
subgraph 小程序端
|
||||
A[用户提交答案]
|
||||
end
|
||||
|
||||
subgraph API 进程
|
||||
B[AssessmentService.SubmitAnswersAsync]
|
||||
C[IRedisService.ListLeftPushAsync]
|
||||
D[ReportQueueConsumer<br/>BackgroundService]
|
||||
E[ReportGenerationService]
|
||||
end
|
||||
|
||||
subgraph Redis
|
||||
F[report:queue<br/>Redis List]
|
||||
G[report:queue:dead<br/>Dead Letter Queue]
|
||||
end
|
||||
|
||||
subgraph 数据库
|
||||
H[(assessment_records<br/>Status: 3→4 或 3→5)]
|
||||
end
|
||||
|
||||
subgraph 后台管理
|
||||
I[管理员触发重新生成]
|
||||
J[AssessmentRecordController]
|
||||
end
|
||||
|
||||
A -->|1. 提交答案| B
|
||||
B -->|2. LPUSH 任务消息| C
|
||||
C -->|3. 入队| F
|
||||
B -->|4. 立即返回响应| A
|
||||
D -->|5. BRPOP 消费| F
|
||||
D -->|6. 调用生成| E
|
||||
E -->|7. 写入结果| H
|
||||
D -->|8. 超过重试次数| G
|
||||
D -->|8. 更新状态=5| H
|
||||
I -->|9. 手动触发| J
|
||||
J -->|10. LPUSH 新任务| F
|
||||
```
|
||||
|
||||
### 消息流转时序
|
||||
|
||||
```mermaid
|
||||
sequenceDiagram
|
||||
participant User as 小程序用户
|
||||
participant API as AssessmentService
|
||||
participant Redis as Redis List
|
||||
participant Consumer as ReportQueueConsumer
|
||||
participant Report as ReportGenerationService
|
||||
participant DB as SQL Server
|
||||
|
||||
User->>API: SubmitAnswersAsync
|
||||
API->>DB: 保存答案, Status=3
|
||||
API->>Redis: LPUSH report:queue {RecordId, RetryCount=0}
|
||||
API-->>User: 返回成功响应
|
||||
|
||||
loop 持续消费
|
||||
Consumer->>Redis: BRPOP report:queue (30s超时)
|
||||
Redis-->>Consumer: 队列消息
|
||||
Consumer->>Report: GenerateReportAsync(recordId)
|
||||
alt 生成成功
|
||||
Report->>DB: 写入结果, Status=4
|
||||
Consumer->>Consumer: 记录成功日志
|
||||
else 生成失败 & RetryCount < 3
|
||||
Consumer->>Consumer: 等待退避时间
|
||||
Consumer->>Redis: LPUSH report:queue {RetryCount+1}
|
||||
else 生成失败 & RetryCount = 3
|
||||
Consumer->>Redis: LPUSH report:queue:dead
|
||||
Consumer->>DB: 更新 Status=5
|
||||
end
|
||||
end
|
||||
```
|
||||
|
||||
## Components and Interfaces
|
||||
|
||||
### 1. IRedisService 扩展(Redis List 操作)
|
||||
|
||||
在现有 `IRedisService` 接口中新增三个 List 操作方法:
|
||||
|
||||
```csharp
|
||||
/// <summary>
|
||||
/// 将值推入列表左端(LPUSH)
|
||||
/// </summary>
|
||||
Task ListLeftPushAsync(string key, string value);
|
||||
|
||||
/// <summary>
|
||||
/// 从列表右端阻塞弹出(BRPOP)
|
||||
/// </summary>
|
||||
Task<string?> ListRightPopAsync(string key, TimeSpan timeout);
|
||||
|
||||
/// <summary>
|
||||
/// 获取列表长度(LLEN)
|
||||
/// </summary>
|
||||
Task<long> ListLengthAsync(string key);
|
||||
```
|
||||
|
||||
`RedisService` 实现遵循现有模式:连接不可用时 `ListLeftPushAsync` 静默返回、`ListRightPopAsync` 返回 null、`ListLengthAsync` 返回 0。
|
||||
|
||||
### 2. ReportQueueMessage(队列消息模型)
|
||||
|
||||
```csharp
|
||||
/// <summary>
|
||||
/// 报告生成队列消息
|
||||
/// </summary>
|
||||
public class ReportQueueMessage
|
||||
{
|
||||
/// <summary>
|
||||
/// 测评记录ID
|
||||
/// </summary>
|
||||
public long RecordId { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// 已重试次数
|
||||
/// </summary>
|
||||
public int RetryCount { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// 入队时间
|
||||
/// </summary>
|
||||
public DateTime EnqueueTime { get; set; }
|
||||
}
|
||||
```
|
||||
|
||||
位置:`MiAssessment.Core/Models/ReportQueueMessage.cs`
|
||||
|
||||
### 3. ReportQueueProducer(队列生产者)
|
||||
|
||||
```csharp
|
||||
/// <summary>
|
||||
/// 报告生成队列生产者接口
|
||||
/// </summary>
|
||||
public interface IReportQueueProducer
|
||||
{
|
||||
/// <summary>
|
||||
/// 将报告生成任务入队
|
||||
/// </summary>
|
||||
Task EnqueueAsync(long recordId);
|
||||
}
|
||||
```
|
||||
|
||||
位置:
|
||||
- 接口:`MiAssessment.Core/Interfaces/IReportQueueProducer.cs`
|
||||
- 实现:`MiAssessment.Core/Services/ReportQueueProducer.cs`
|
||||
|
||||
职责:构造 `ReportQueueMessage`(RetryCount=0, EnqueueTime=DateTime.Now),序列化为 JSON,调用 `IRedisService.ListLeftPushAsync("report:queue", json)`。
|
||||
|
||||
### 4. ReportQueueConsumer(队列消费者 BackgroundService)
|
||||
|
||||
```csharp
|
||||
/// <summary>
|
||||
/// 报告生成队列消费者
|
||||
/// </summary>
|
||||
public class ReportQueueConsumer : BackgroundService
|
||||
```
|
||||
|
||||
位置:`MiAssessment.Api/BackgroundServices/ReportQueueConsumer.cs`
|
||||
|
||||
职责:
|
||||
- 在 `ExecuteAsync` 中循环调用 `IRedisService.ListRightPopAsync("report:queue", 30s)`
|
||||
- 反序列化消息,通过 `IServiceScopeFactory` 创建 scope 解析 `ReportGenerationService` 并调用 `GenerateReportAsync`
|
||||
- 成功:记录日志
|
||||
- 失败且 RetryCount < 3:按退避时间等待后重新 LPUSH
|
||||
- 失败且 RetryCount = 3:LPUSH 到死信队列,更新记录状态为 5
|
||||
- BRPOP 异常:记录错误日志,等待 5 秒后重新监听
|
||||
|
||||
注册方式:在 `Program.cs` 中 `builder.Services.AddHostedService<ReportQueueConsumer>()`
|
||||
|
||||
### 5. AssessmentService 改造
|
||||
|
||||
修改 `SubmitAnswersAsync` 方法:
|
||||
- 移除对 `ReportGenerationService.GenerateReportAsync` 的直接调用
|
||||
- 改为调用 `IReportQueueProducer.EnqueueAsync(recordId)`
|
||||
- 入队失败时记录错误日志,仍返回成功响应
|
||||
|
||||
### 6. Admin 端重新生成接口
|
||||
|
||||
在 `IAssessmentRecordService` 中新增:
|
||||
|
||||
```csharp
|
||||
/// <summary>
|
||||
/// 重新生成报告
|
||||
/// </summary>
|
||||
Task RegenerateReportAsync(long recordId);
|
||||
|
||||
/// <summary>
|
||||
/// 批量重新生成报告
|
||||
/// </summary>
|
||||
Task<BatchRegenerateResult> BatchRegenerateReportAsync(List<long> recordIds);
|
||||
```
|
||||
|
||||
在 `AssessmentRecordController` 中新增:
|
||||
- `POST /api/admin/assessmentRecord/regenerateReport`:接收 `{ id: long }`
|
||||
- `POST /api/admin/assessmentRecord/batchRegenerateReport`:接收 `{ ids: long[] }`
|
||||
|
||||
Admin 端重新生成逻辑需要通过 Redis 入队,因此 `AssessmentRecordService` 需要注入 `IRedisService` 来执行 LPUSH 操作(Admin.Business 项目已可访问 Business 库)。
|
||||
|
||||
### 7. 状态描述扩展
|
||||
|
||||
- `AssessmentRecordService.StatusNames` 字典新增 `{ 5, "生成失败" }`
|
||||
- `AssessmentService.GetStatusText` 新增 `5 => "生成失败"`
|
||||
- `AssessmentService.GetResultStatusAsync` 对 Status=5 返回描述"报告生成失败,请联系客服"
|
||||
|
||||
## Data Models
|
||||
|
||||
### 队列消息结构(Redis 中存储的 JSON)
|
||||
|
||||
```json
|
||||
{
|
||||
"RecordId": 12345,
|
||||
"RetryCount": 0,
|
||||
"EnqueueTime": "2025-01-15T10:30:00"
|
||||
}
|
||||
```
|
||||
|
||||
### Redis Key 设计
|
||||
|
||||
| Key | 类型 | 说明 |
|
||||
|-----|------|------|
|
||||
| `report:queue` | List | 待处理的报告生成任务队列 |
|
||||
| `report:queue:dead` | List | 超过最大重试次数的死信队列 |
|
||||
|
||||
### 常量定义
|
||||
|
||||
| 常量 | 值 | 说明 |
|
||||
|------|-----|------|
|
||||
| `ReportQueueKey` | `report:queue` | 队列 Redis Key |
|
||||
| `DeadLetterQueueKey` | `report:queue:dead` | 死信队列 Redis Key |
|
||||
| `MaxRetryCount` | `3` | 最大重试次数 |
|
||||
| `BrpopTimeout` | `30s` | BRPOP 超时时间 |
|
||||
| `RetryDelays` | `[10s, 30s, 60s]` | 各次重试的退避等待时间 |
|
||||
| `ErrorRecoveryDelay` | `5s` | BRPOP 异常后的恢复等待时间 |
|
||||
|
||||
### Assessment_Record 状态扩展
|
||||
|
||||
| Status | 说明 | 变更 |
|
||||
|--------|------|------|
|
||||
| 1 | 待测评 | 无变更 |
|
||||
| 2 | 测评中 | 无变更 |
|
||||
| 3 | 生成中 | 无变更(入队后保持此状态) |
|
||||
| 4 | 已完成 | 无变更(由 ReportGenerationService 更新) |
|
||||
| 5 | 生成失败 | **新增**,超过最大重试次数时由 Consumer 更新 |
|
||||
|
||||
### 请求/响应模型
|
||||
|
||||
#### RegenerateReportRequest
|
||||
|
||||
```csharp
|
||||
/// <summary>
|
||||
/// 重新生成报告请求
|
||||
/// </summary>
|
||||
public class RegenerateReportRequest
|
||||
{
|
||||
/// <summary>
|
||||
/// 测评记录ID
|
||||
/// </summary>
|
||||
public long Id { get; set; }
|
||||
}
|
||||
```
|
||||
|
||||
#### BatchRegenerateReportRequest
|
||||
|
||||
```csharp
|
||||
/// <summary>
|
||||
/// 批量重新生成报告请求
|
||||
/// </summary>
|
||||
public class BatchRegenerateReportRequest
|
||||
{
|
||||
/// <summary>
|
||||
/// 测评记录ID列表
|
||||
/// </summary>
|
||||
public List<long> Ids { get; set; } = new();
|
||||
}
|
||||
```
|
||||
|
||||
#### BatchRegenerateResult
|
||||
|
||||
```csharp
|
||||
/// <summary>
|
||||
/// 批量重新生成结果
|
||||
/// </summary>
|
||||
public class BatchRegenerateResult
|
||||
{
|
||||
/// <summary>
|
||||
/// 成功入队数量
|
||||
/// </summary>
|
||||
public int SuccessCount { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// 跳过数量(状态不符或记录不存在)
|
||||
/// </summary>
|
||||
public int SkippedCount { get; set; }
|
||||
}
|
||||
```
|
||||
|
||||
位置:`MiAssessment.Admin.Business/Models/AssessmentRecord/`
|
||||
|
||||
|
||||
## Correctness Properties
|
||||
|
||||
*A property is a characteristic or behavior that should hold true across all valid executions of a system—essentially, a formal statement about what the system should do. Properties serve as the bridge between human-readable specifications and machine-verifiable correctness guarantees.*
|
||||
|
||||
### Property 1: 队列消息序列化/反序列化 round trip
|
||||
|
||||
*For any* valid `ReportQueueMessage`(包含任意正整数 RecordId、RetryCount 在 [0, MaxRetryCount] 范围内、任意合法 EnqueueTime),将其序列化为 JSON 再反序列化,应得到与原始对象等价的 `ReportQueueMessage`。
|
||||
|
||||
**Validates: Requirements 1.1, 2.2**
|
||||
|
||||
### Property 2: Redis List LPUSH/BRPOP round trip
|
||||
|
||||
*For any* 非空字符串 value,对同一个 Redis List key 执行 `ListLeftPushAsync(key, value)` 后,`ListRightPopAsync(key, timeout)` 应返回该 value。对于 N 次 LPUSH 操作,`ListLengthAsync` 应返回 N(在未消费的情况下),且 N 次 BRPOP 应按 FIFO 顺序返回所有 value。
|
||||
|
||||
**Validates: Requirements 4.1, 4.2, 4.3**
|
||||
|
||||
### Property 3: 失败重试递增 RetryCount
|
||||
|
||||
*For any* `ReportQueueMessage` 其 RetryCount 值在 [0, MaxRetryCount) 范围内,当报告生成失败时,重新入队的消息的 RetryCount 应等于原始 RetryCount + 1,且 RecordId 和 EnqueueTime 保持不变。
|
||||
|
||||
**Validates: Requirements 3.1**
|
||||
|
||||
### Property 4: 重新生成重置状态并入队
|
||||
|
||||
*For any* Assessment_Record 其状态为 3(生成中)或 5(生成失败),调用 `RegenerateReportAsync` 后,该记录的状态应被重置为 3,已有的测评结果数据应被清除,且 Report_Queue 中应新增一条 RetryCount=0 的消息。
|
||||
|
||||
**Validates: Requirements 5.2**
|
||||
|
||||
### Property 5: 非法状态拒绝重新生成
|
||||
|
||||
*For any* Assessment_Record 其状态不在 {3, 5} 中(即状态为 1、2 或 4),调用 `RegenerateReportAsync` 应抛出业务异常,且记录状态和测评结果数据保持不变。
|
||||
|
||||
**Validates: Requirements 5.4**
|
||||
|
||||
### Property 6: 批量重新生成按状态过滤
|
||||
|
||||
*For any* 包含混合状态记录的 RecordId 列表,`BatchRegenerateReportAsync` 应仅对状态为 3 或 5 的记录执行重新生成逻辑,其余记录应被跳过且状态不变。
|
||||
|
||||
**Validates: Requirements 6.2**
|
||||
|
||||
### Property 7: 批量操作计数不变量
|
||||
|
||||
*For any* RecordId 列表(长度 > 0),`BatchRegenerateReportAsync` 返回的 `SuccessCount + SkippedCount` 应等于请求的 RecordId 列表长度。
|
||||
|
||||
**Validates: Requirements 6.3**
|
||||
|
||||
## Error Handling
|
||||
|
||||
### 生产者端(AssessmentService)
|
||||
|
||||
| 场景 | 处理方式 |
|
||||
|------|----------|
|
||||
| Redis 连接不可用,LPUSH 失败 | 记录 Error 日志,仍返回成功响应,记录状态保持 3(生成中)。后续可通过后台管理手动触发重新生成 |
|
||||
| 消息序列化异常 | 记录 Error 日志,仍返回成功响应(极端情况,JSON 序列化几乎不会失败) |
|
||||
|
||||
### 消费者端(ReportQueueConsumer)
|
||||
|
||||
| 场景 | 处理方式 |
|
||||
|------|----------|
|
||||
| BRPOP 超时(队列为空) | 正常行为,继续下一轮 BRPOP 循环 |
|
||||
| BRPOP 异常(Redis 连接断开) | 记录 Error 日志,等待 5 秒后重新开始监听 |
|
||||
| 消息反序列化失败(JSON 格式错误) | 记录 Error 日志,丢弃该消息,继续处理下一条 |
|
||||
| 报告生成异常 & RetryCount < MaxRetryCount | 按退避时间等待,RetryCount+1 后重新 LPUSH 到 report:queue |
|
||||
| 报告生成异常 & RetryCount = MaxRetryCount | LPUSH 到 report:queue:dead,更新记录 Status=5,记录 Error 日志 |
|
||||
| 更新 Status=5 时数据库异常 | 记录 Error 日志,消息仍进入死信队列(状态更新失败不影响死信处理) |
|
||||
| CancellationToken 触发(应用关闭) | 优雅退出循环,当前正在处理的消息完成后停止 |
|
||||
|
||||
### 后台管理端
|
||||
|
||||
| 场景 | 处理方式 | 错误码 |
|
||||
|------|----------|--------|
|
||||
| 记录不存在或已软删除 | 返回"测评记录不存在" | 3241 (AssessmentRecordNotFound) |
|
||||
| 记录状态不是 3 或 5 | 返回"当前状态不允许重新生成" | 2005 (InvalidOperation) |
|
||||
| 批量请求 ID 列表为空 | 返回"记录ID列表不能为空" | 1001 (ParamError) |
|
||||
| Redis 入队失败 | 记录 Error 日志,该条记录计入 SkippedCount | - |
|
||||
|
||||
## Testing Strategy
|
||||
|
||||
### 测试框架
|
||||
|
||||
- 单元测试:xUnit + Moq
|
||||
- 属性测试:FsCheck(项目已配置)
|
||||
- 每个属性测试最少运行 100 次迭代
|
||||
|
||||
### 单元测试
|
||||
|
||||
| 测试目标 | 测试内容 |
|
||||
|----------|----------|
|
||||
| ReportQueueProducer.EnqueueAsync | 验证调用 IRedisService.ListLeftPushAsync 的参数正确性 |
|
||||
| ReportQueueConsumer | 验证成功消费后记录日志;验证失败后重试入队;验证超过重试次数进入死信队列 |
|
||||
| AssessmentService.SubmitAnswersAsync | 验证不再直接调用 GenerateReportAsync;验证调用 EnqueueAsync;验证 Redis 失败时仍返回成功 |
|
||||
| AssessmentRecordService.RegenerateReportAsync | 验证状态重置为 3;验证清除已有结果;验证入队新消息;验证不存在记录返回错误;验证非法状态返回错误 |
|
||||
| AssessmentRecordService.BatchRegenerateReportAsync | 验证只处理状态 3/5 的记录;验证返回正确的 SuccessCount/SkippedCount;验证空列表返回错误 |
|
||||
| RedisService List 方法 | 验证连接不可用时的静默降级行为 |
|
||||
| GetStatusText / StatusNames | 验证 Status=5 返回"生成失败" |
|
||||
|
||||
### 属性测试
|
||||
|
||||
每个属性测试必须以注释引用设计文档中的属性编号:
|
||||
|
||||
```csharp
|
||||
/// <summary>
|
||||
/// Feature: redis-report-queue, Property 1: 队列消息序列化/反序列化 round trip
|
||||
/// </summary>
|
||||
[Property(MaxTest = 100)]
|
||||
public Property QueueMessageSerializationRoundTrip() { ... }
|
||||
|
||||
/// <summary>
|
||||
/// Feature: redis-report-queue, Property 2: Redis List LPUSH/BRPOP round trip
|
||||
/// </summary>
|
||||
[Property(MaxTest = 100)]
|
||||
public Property RedisListRoundTrip() { ... }
|
||||
|
||||
/// <summary>
|
||||
/// Feature: redis-report-queue, Property 3: 失败重试递增 RetryCount
|
||||
/// </summary>
|
||||
[Property(MaxTest = 100)]
|
||||
public Property RetryIncrementsRetryCount() { ... }
|
||||
|
||||
/// <summary>
|
||||
/// Feature: redis-report-queue, Property 4: 重新生成重置状态并入队
|
||||
/// </summary>
|
||||
[Property(MaxTest = 100)]
|
||||
public Property RegenerateResetsStatusAndEnqueues() { ... }
|
||||
|
||||
/// <summary>
|
||||
/// Feature: redis-report-queue, Property 5: 非法状态拒绝重新生成
|
||||
/// </summary>
|
||||
[Property(MaxTest = 100)]
|
||||
public Property InvalidStatusRejectsRegeneration() { ... }
|
||||
|
||||
/// <summary>
|
||||
/// Feature: redis-report-queue, Property 6: 批量重新生成按状态过滤
|
||||
/// </summary>
|
||||
[Property(MaxTest = 100)]
|
||||
public Property BatchRegenerateFiltersByStatus() { ... }
|
||||
|
||||
/// <summary>
|
||||
/// Feature: redis-report-queue, Property 7: 批量操作计数不变量
|
||||
/// </summary>
|
||||
[Property(MaxTest = 100)]
|
||||
public Property BatchCountInvariant() { ... }
|
||||
```
|
||||
|
||||
### 测试数据生成策略
|
||||
|
||||
- **RecordId**:正整数范围 [1, long.MaxValue]
|
||||
- **RetryCount**:[0, MaxRetryCount] 范围内的整数
|
||||
- **EnqueueTime**:合理的 DateTime 范围
|
||||
- **Status**:[1, 5] 范围内的整数,用于测试状态过滤逻辑
|
||||
- **RecordId 列表**:长度 [1, 50] 的随机正整数列表,用于批量操作测试
|
||||
97
.kiro/specs/redis-report-queue/requirements.md
Normal file
97
.kiro/specs/redis-report-queue/requirements.md
Normal file
|
|
@ -0,0 +1,97 @@
|
|||
# Requirements Document
|
||||
|
||||
## Introduction
|
||||
|
||||
将测评报告生成从当前的同步调用模式改为基于 Redis List 的异步队列处理模式。当前 `AssessmentService.SubmitAnswersAsync` 在提交答案后直接 `await _reportGenerationService.GenerateReportAsync(recordId)` 同步生成报告,导致用户请求被阻塞、失败后无重试机制、状态永久卡在"生成中"(Status=3)。本功能引入 Redis 队列(LPUSH/BRPOP)+ BackgroundService 消费者模式,实现异步生成、失败自动重试、后台管理手动触发重新生成。
|
||||
|
||||
## Glossary
|
||||
|
||||
- **Queue_Producer**: 将报告生成任务推入 Redis 队列的组件,负责构造队列消息并执行 LPUSH 操作
|
||||
- **Queue_Consumer**: 运行在 API 进程中的 BackgroundService,通过 BRPOP 从 Redis 队列中消费任务并调用报告生成服务
|
||||
- **Report_Queue**: Redis List 数据结构,Key 为 `report:queue`,存储待生成报告的任务消息(JSON 格式)
|
||||
- **Dead_Letter_Queue**: Redis List 数据结构,Key 为 `report:queue:dead`,存储超过最大重试次数仍失败的任务消息
|
||||
- **Queue_Message**: 队列中的 JSON 消息体,包含 RecordId(测评记录ID)、RetryCount(已重试次数)、EnqueueTime(入队时间)等字段
|
||||
- **ReportGenerationService**: 现有的报告生成服务,位于 `MiAssessment.Core/Services/`,核心方法 `GenerateReportAsync(long recordId)`
|
||||
- **AssessmentService**: 现有的测评服务,位于 `MiAssessment.Core/Services/`,`SubmitAnswersAsync` 方法中触发报告生成
|
||||
- **Admin_Record_Controller**: 后台管理系统的测评记录控制器,位于 `MiAssessment.Admin.Business/Controllers/AssessmentRecordController.cs`
|
||||
- **Assessment_Record**: 测评记录实体,Status 字段含义:1=待测评,2=测评中,3=生成中,4=已完成,5=生成失败
|
||||
- **Max_Retry_Count**: 最大重试次数常量,值为 3
|
||||
- **IRedisService**: 现有的 Redis 服务接口,位于 `MiAssessment.Core/Interfaces/`,需扩展 List 操作方法
|
||||
|
||||
## Requirements
|
||||
|
||||
### Requirement 1: 异步入队替代同步生成
|
||||
|
||||
**User Story:** As a 小程序用户, I want 提交测评答案后立即得到响应, so that 不需要等待报告生成完成就能继续使用小程序。
|
||||
|
||||
#### Acceptance Criteria
|
||||
|
||||
1. WHEN 用户提交测评答案成功, THE Queue_Producer SHALL 将包含 RecordId、RetryCount=0、EnqueueTime 的 Queue_Message 序列化为 JSON 并通过 LPUSH 推入 Report_Queue
|
||||
2. WHEN 用户提交测评答案成功, THE AssessmentService SHALL 在入队完成后立即返回 SubmitAnswersResponse,Assessment_Record 状态保持为 3(生成中)
|
||||
3. IF Redis 连接不可用导致入队失败, THEN THE AssessmentService SHALL 记录错误日志并仍然返回成功响应,Assessment_Record 状态保持为 3(生成中)
|
||||
|
||||
### Requirement 2: 队列消费与报告生成
|
||||
|
||||
**User Story:** As a 系统运维人员, I want 后台服务自动消费队列中的报告生成任务, so that 报告能在后台异步生成而不阻塞用户请求。
|
||||
|
||||
#### Acceptance Criteria
|
||||
|
||||
1. WHEN API 应用程序启动, THE Queue_Consumer SHALL 作为 BackgroundService 自动启动并持续监听 Report_Queue
|
||||
2. WHEN Report_Queue 中存在待处理消息, THE Queue_Consumer SHALL 通过 BRPOP 取出消息并调用 ReportGenerationService.GenerateReportAsync 生成报告
|
||||
3. WHEN 报告生成成功完成, THE Queue_Consumer SHALL 记录成功日志,Assessment_Record 状态由 ReportGenerationService 更新为 4(已完成)
|
||||
4. WHILE Queue_Consumer 正在处理一条消息, THE Queue_Consumer SHALL 在处理完成后再取出下一条消息(串行处理)
|
||||
5. IF Queue_Consumer 在 BRPOP 等待过程中发生异常, THEN THE Queue_Consumer SHALL 记录错误日志并在等待 5 秒后重新开始监听
|
||||
|
||||
### Requirement 3: 失败重试机制
|
||||
|
||||
**User Story:** As a 系统运维人员, I want 报告生成失败后自动重试, so that 临时性错误不会导致报告永久无法生成。
|
||||
|
||||
#### Acceptance Criteria
|
||||
|
||||
1. WHEN 报告生成抛出异常且 Queue_Message 的 RetryCount 小于 Max_Retry_Count, THE Queue_Consumer SHALL 将 RetryCount 加 1 后重新 LPUSH 到 Report_Queue 尾部
|
||||
2. WHEN 重新入队前, THE Queue_Consumer SHALL 按照 RetryCount 计算等待时间(第1次重试等待 10 秒,第2次等待 30 秒,第3次等待 60 秒)后再执行 LPUSH
|
||||
3. WHEN 报告生成抛出异常且 Queue_Message 的 RetryCount 等于 Max_Retry_Count, THE Queue_Consumer SHALL 将该消息 LPUSH 到 Dead_Letter_Queue 并更新 Assessment_Record 状态为 5(生成失败)
|
||||
4. WHEN 消息进入 Dead_Letter_Queue, THE Queue_Consumer SHALL 记录包含 RecordId、RetryCount、异常信息的错误日志
|
||||
|
||||
### Requirement 4: Redis List 操作扩展
|
||||
|
||||
**User Story:** As a 开发人员, I want IRedisService 接口支持 List 操作, so that 队列的生产和消费可以通过统一的 Redis 服务接口完成。
|
||||
|
||||
#### Acceptance Criteria
|
||||
|
||||
1. THE IRedisService SHALL 提供 ListLeftPushAsync(string key, string value) 方法,对应 Redis LPUSH 命令
|
||||
2. THE IRedisService SHALL 提供 ListRightPopAsync(string key, TimeSpan timeout) 方法,对应 Redis BRPOP 命令,在超时时返回 null
|
||||
3. THE IRedisService SHALL 提供 ListLengthAsync(string key) 方法,对应 Redis LLEN 命令
|
||||
4. THE RedisService SHALL 实现上述三个方法,在 Redis 连接不可用时 ListLeftPushAsync 静默返回、ListRightPopAsync 返回 null、ListLengthAsync 返回 0
|
||||
|
||||
### Requirement 5: 后台管理手动触发重新生成
|
||||
|
||||
**User Story:** As a 后台管理员, I want 在后台管理系统中手动触发重新生成报告, so that 卡在"生成中"或"生成失败"状态的测评记录可以重新生成报告。
|
||||
|
||||
#### Acceptance Criteria
|
||||
|
||||
1. WHEN 管理员对状态为 3(生成中)或 5(生成失败)的 Assessment_Record 触发重新生成, THE Admin_Record_Controller SHALL 提供 POST `/api/admin/assessmentRecord/regenerateReport` 接口接收 RecordId
|
||||
2. WHEN 重新生成请求合法, THE Admin_Record_Controller 对应的服务 SHALL 将 Assessment_Record 状态重置为 3(生成中),清除已有的测评结果数据,并将新的 Queue_Message(RetryCount=0)LPUSH 到 Report_Queue
|
||||
3. IF 指定的 Assessment_Record 不存在或已被软删除, THEN THE Admin_Record_Controller 对应的服务 SHALL 返回错误码和"测评记录不存在"提示
|
||||
4. IF 指定的 Assessment_Record 状态不是 3 或 5, THEN THE Admin_Record_Controller 对应的服务 SHALL 返回错误码和"当前状态不允许重新生成"提示
|
||||
|
||||
### Requirement 6: 批量重新生成
|
||||
|
||||
**User Story:** As a 后台管理员, I want 批量触发多条卡住的测评记录重新生成报告, so that 历史遗留的"生成中"状态记录可以一次性处理。
|
||||
|
||||
#### Acceptance Criteria
|
||||
|
||||
1. THE Admin_Record_Controller SHALL 提供 POST `/api/admin/assessmentRecord/batchRegenerateReport` 接口接收 RecordId 列表
|
||||
2. WHEN 批量重新生成请求合法, THE Admin_Record_Controller 对应的服务 SHALL 对每条符合条件(状态为 3 或 5)的 Assessment_Record 执行与单条重新生成相同的逻辑
|
||||
3. WHEN 批量操作完成, THE Admin_Record_Controller 对应的服务 SHALL 返回成功入队数量和跳过数量(因状态不符或记录不存在而跳过)
|
||||
4. IF RecordId 列表为空, THEN THE Admin_Record_Controller 对应的服务 SHALL 返回错误码和"记录ID列表不能为空"提示
|
||||
|
||||
### Requirement 7: 生成失败状态支持
|
||||
|
||||
**User Story:** As a 开发人员, I want Assessment_Record 支持"生成失败"状态, so that 超过最大重试次数的记录有明确的失败标识。
|
||||
|
||||
#### Acceptance Criteria
|
||||
|
||||
1. THE Assessment_Record SHALL 支持 Status=5 表示"生成失败"状态
|
||||
2. WHEN 小程序用户查询状态为 5 的 Assessment_Record, THE AssessmentService SHALL 返回"报告生成失败,请联系客服"的状态描述
|
||||
3. WHEN 后台管理员查询测评记录列表, THE Admin_Record_Controller 对应的服务 SHALL 在状态筛选中支持 Status=5(生成失败)的筛选条件
|
||||
181
.kiro/specs/redis-report-queue/tasks.md
Normal file
181
.kiro/specs/redis-report-queue/tasks.md
Normal file
|
|
@ -0,0 +1,181 @@
|
|||
# Implementation Plan: Redis 报告生成队列
|
||||
|
||||
## Overview
|
||||
|
||||
将测评报告生成从同步调用改为 Redis List 异步队列模式。按照自底向上的顺序实现:先扩展 Redis 基础设施,再构建队列生产者/消费者,然后改造现有服务,最后添加后台管理接口。每一步都在前一步基础上构建,确保无孤立代码。
|
||||
|
||||
## Tasks
|
||||
|
||||
- [ ] 1. 扩展 IRedisService 接口与 RedisService 实现(List 操作)
|
||||
- [ ] 1.1 在 `IRedisService` 中新增 `ListLeftPushAsync`、`ListRightPopAsync`、`ListLengthAsync` 三个方法签名
|
||||
- 文件:`MiAssessment.Core/Interfaces/IRedisService.cs`
|
||||
- `ListLeftPushAsync(string key, string value)` 对应 LPUSH
|
||||
- `ListRightPopAsync(string key, TimeSpan timeout)` 对应 BRPOP,超时返回 null
|
||||
- `ListLengthAsync(string key)` 对应 LLEN
|
||||
- _Requirements: 4.1, 4.2, 4.3_
|
||||
|
||||
- [ ] 1.2 在 `RedisService` 中实现三个 List 方法
|
||||
- 文件:`MiAssessment.Infrastructure/Cache/RedisService.cs`
|
||||
- 使用 StackExchange.Redis 的 `ListLeftPushAsync`、`ListRightPopAsync`(通过 `ExecuteAsync("BRPOP", ...)`)、`ListLengthAsync`
|
||||
- 连接不可用时:`ListLeftPushAsync` 静默返回、`ListRightPopAsync` 返回 null、`ListLengthAsync` 返回 0
|
||||
- _Requirements: 4.4_
|
||||
|
||||
- [ ]* 1.3 编写 RedisService List 方法的单元测试
|
||||
- 测试连接不可用时的静默降级行为
|
||||
- _Requirements: 4.4_
|
||||
|
||||
- [ ]* 1.4 编写属性测试:Redis List LPUSH/BRPOP round trip
|
||||
- **Property 2: Redis List LPUSH/BRPOP round trip**
|
||||
- **Validates: Requirements 4.1, 4.2, 4.3**
|
||||
|
||||
- [ ] 2. 创建队列消息模型与生产者
|
||||
- [ ] 2.1 创建 `ReportQueueMessage` 模型类
|
||||
- 文件:`MiAssessment.Core/Models/ReportQueueMessage.cs`
|
||||
- 包含 `RecordId`(long)、`RetryCount`(int)、`EnqueueTime`(DateTime)属性
|
||||
- 添加 XML 注释
|
||||
- _Requirements: 1.1_
|
||||
|
||||
- [ ]* 2.2 编写属性测试:队列消息序列化/反序列化 round trip
|
||||
- **Property 1: 队列消息序列化/反序列化 round trip**
|
||||
- **Validates: Requirements 1.1, 2.2**
|
||||
|
||||
- [ ] 2.3 创建 `IReportQueueProducer` 接口和 `ReportQueueProducer` 实现
|
||||
- 接口文件:`MiAssessment.Core/Interfaces/IReportQueueProducer.cs`
|
||||
- 实现文件:`MiAssessment.Core/Services/ReportQueueProducer.cs`
|
||||
- `EnqueueAsync(long recordId)` 方法:构造 `ReportQueueMessage`(RetryCount=0, EnqueueTime=DateTime.Now),序列化为 JSON,调用 `IRedisService.ListLeftPushAsync("report:queue", json)`
|
||||
- 定义常量 `ReportQueueKey = "report:queue"`
|
||||
- _Requirements: 1.1_
|
||||
|
||||
- [ ] 2.4 在 Autofac `ServiceModule` 中注册 `IReportQueueProducer`
|
||||
- 文件:`MiAssessment.Infrastructure/Modules/ServiceModule.cs`
|
||||
- 使用 `InstancePerLifetimeScope` 生命周期
|
||||
- _Requirements: 1.1_
|
||||
|
||||
- [ ]* 2.5 编写 ReportQueueProducer 的单元测试
|
||||
- 验证调用 `IRedisService.ListLeftPushAsync` 的参数正确性(key 为 `report:queue`,value 为包含正确 RecordId 和 RetryCount=0 的 JSON)
|
||||
- _Requirements: 1.1_
|
||||
|
||||
- [ ] 3. Checkpoint - 确保基础设施层编译通过
|
||||
- 确保所有测试通过,ask the user if questions arise.
|
||||
|
||||
- [ ] 4. 创建 ReportQueueConsumer(BackgroundService 消费者)
|
||||
- [ ] 4.1 创建 `ReportQueueConsumer` 类
|
||||
- 文件:`MiAssessment.Api/BackgroundServices/ReportQueueConsumer.cs`
|
||||
- 继承 `BackgroundService`,注入 `IRedisService`、`IServiceScopeFactory`、`ILogger<ReportQueueConsumer>`
|
||||
- 在 `ExecuteAsync` 中循环调用 `ListRightPopAsync("report:queue", 30s)`
|
||||
- 反序列化消息,通过 scope 解析 `ReportGenerationService` 调用 `GenerateReportAsync`
|
||||
- 定义常量:`MaxRetryCount=3`、`DeadLetterQueueKey="report:queue:dead"`、`RetryDelays=[10s, 30s, 60s]`、`ErrorRecoveryDelay=5s`
|
||||
- _Requirements: 2.1, 2.2, 2.3, 2.4, 2.5_
|
||||
|
||||
- [ ] 4.2 实现失败重试与死信队列逻辑
|
||||
- 失败且 RetryCount < MaxRetryCount:按退避时间等待后 RetryCount+1 重新 LPUSH 到 `report:queue`
|
||||
- 失败且 RetryCount = MaxRetryCount:LPUSH 到 `report:queue:dead`,更新记录 Status=5
|
||||
- 消息反序列化失败:记录错误日志,丢弃消息
|
||||
- BRPOP 异常:记录错误日志,等待 5 秒后重新监听
|
||||
- _Requirements: 3.1, 3.2, 3.3, 3.4_
|
||||
|
||||
- [ ] 4.3 在 `Program.cs` 中注册 `ReportQueueConsumer`
|
||||
- 文件:`MiAssessment.Api/Program.cs`
|
||||
- 使用 `builder.Services.AddHostedService<ReportQueueConsumer>()`
|
||||
- _Requirements: 2.1_
|
||||
|
||||
- [ ]* 4.4 编写 ReportQueueConsumer 的单元测试
|
||||
- 验证成功消费后记录日志
|
||||
- 验证失败后重试入队(RetryCount 递增)
|
||||
- 验证超过重试次数进入死信队列并更新状态为 5
|
||||
- _Requirements: 2.2, 2.3, 3.1, 3.2, 3.3, 3.4_
|
||||
|
||||
- [ ]* 4.5 编写属性测试:失败重试递增 RetryCount
|
||||
- **Property 3: 失败重试递增 RetryCount**
|
||||
- **Validates: Requirements 3.1**
|
||||
|
||||
- [ ] 5. 改造 AssessmentService(同步改异步入队)
|
||||
- [ ] 5.1 修改 `AssessmentService.SubmitAnswersAsync`
|
||||
- 文件:`MiAssessment.Core/Services/AssessmentService.cs`
|
||||
- 注入 `IReportQueueProducer`
|
||||
- 移除对 `ReportGenerationService.GenerateReportAsync` 的直接 await 调用
|
||||
- 改为调用 `IReportQueueProducer.EnqueueAsync(recordId)`
|
||||
- 入队失败时(try-catch)记录错误日志,仍返回成功响应,状态保持 3
|
||||
- _Requirements: 1.1, 1.2, 1.3_
|
||||
|
||||
- [ ]* 5.2 编写 AssessmentService 改造后的单元测试
|
||||
- 验证不再直接调用 `GenerateReportAsync`
|
||||
- 验证调用 `EnqueueAsync`
|
||||
- 验证 Redis 失败时仍返回成功
|
||||
- _Requirements: 1.1, 1.2, 1.3_
|
||||
|
||||
- [ ] 6. 添加生成失败状态支持(Status=5)
|
||||
- [ ] 6.1 扩展状态描述映射
|
||||
- 在 `AssessmentRecordService.StatusNames` 字典中新增 `{ 5, "生成失败" }`
|
||||
- 在 `AssessmentService.GetStatusText` 中新增 `5 => "生成失败"`
|
||||
- 在 `AssessmentService.GetResultStatusAsync` 中对 Status=5 返回描述"报告生成失败,请联系客服"
|
||||
- _Requirements: 7.1, 7.2, 7.3_
|
||||
|
||||
- [ ]* 6.2 编写状态描述的单元测试
|
||||
- 验证 Status=5 返回"生成失败"
|
||||
- 验证 GetResultStatusAsync 对 Status=5 返回正确描述
|
||||
- _Requirements: 7.1, 7.2_
|
||||
|
||||
- [ ] 7. Checkpoint - 确保核心队列功能完整
|
||||
- 确保所有测试通过,ask the user if questions arise.
|
||||
|
||||
- [ ] 8. 实现后台管理端重新生成接口
|
||||
- [ ] 8.1 创建请求/响应 DTO 模型
|
||||
- 文件目录:`MiAssessment.Admin.Business/Models/AssessmentRecord/`
|
||||
- 创建 `RegenerateReportRequest`(包含 `Id` 字段)
|
||||
- 创建 `BatchRegenerateReportRequest`(包含 `Ids` 列表字段)
|
||||
- 创建 `BatchRegenerateResult`(包含 `SuccessCount` 和 `SkippedCount` 字段)
|
||||
- _Requirements: 5.1, 6.1_
|
||||
|
||||
- [ ] 8.2 在 `IAssessmentRecordService` 中新增接口方法
|
||||
- 文件:`MiAssessment.Admin.Business/Services/Interfaces/IAssessmentRecordService.cs`
|
||||
- 新增 `RegenerateReportAsync(long recordId)` 方法
|
||||
- 新增 `BatchRegenerateReportAsync(List<long> recordIds)` 方法,返回 `BatchRegenerateResult`
|
||||
- _Requirements: 5.1, 6.1_
|
||||
|
||||
- [ ] 8.3 在 `AssessmentRecordService` 中实现重新生成逻辑
|
||||
- 文件:`MiAssessment.Admin.Business/Services/AssessmentRecordService.cs`
|
||||
- 注入 `IRedisService`
|
||||
- `RegenerateReportAsync`:校验记录存在且状态为 3 或 5,重置状态为 3,清除已有测评结果数据,构造 `ReportQueueMessage`(RetryCount=0)LPUSH 入队
|
||||
- `BatchRegenerateReportAsync`:遍历 ID 列表,对每条符合条件的记录执行重新生成逻辑,统计 SuccessCount 和 SkippedCount
|
||||
- 错误处理:记录不存在返回错误码 3241,状态不符返回错误码 2005,空列表返回错误码 1001
|
||||
- _Requirements: 5.2, 5.3, 5.4, 6.2, 6.3, 6.4_
|
||||
|
||||
- [ ] 8.4 在 `AssessmentRecordController` 中新增两个接口
|
||||
- 文件:`MiAssessment.Admin.Business/Controllers/AssessmentRecordController.cs`
|
||||
- `POST /api/admin/assessmentRecord/regenerateReport`:接收 `RegenerateReportRequest`,调用 `RegenerateReportAsync`
|
||||
- `POST /api/admin/assessmentRecord/batchRegenerateReport`:接收 `BatchRegenerateReportRequest`,调用 `BatchRegenerateReportAsync`
|
||||
- _Requirements: 5.1, 6.1_
|
||||
|
||||
- [ ]* 8.5 编写 AssessmentRecordService 重新生成逻辑的单元测试
|
||||
- 验证状态重置为 3、清除已有结果、入队新消息
|
||||
- 验证不存在记录返回错误
|
||||
- 验证非法状态返回错误
|
||||
- _Requirements: 5.2, 5.3, 5.4_
|
||||
|
||||
- [ ]* 8.6 编写属性测试:重新生成重置状态并入队
|
||||
- **Property 4: 重新生成重置状态并入队**
|
||||
- **Validates: Requirements 5.2**
|
||||
|
||||
- [ ]* 8.7 编写属性测试:非法状态拒绝重新生成
|
||||
- **Property 5: 非法状态拒绝重新生成**
|
||||
- **Validates: Requirements 5.4**
|
||||
|
||||
- [ ]* 8.8 编写属性测试:批量重新生成按状态过滤
|
||||
- **Property 6: 批量重新生成按状态过滤**
|
||||
- **Validates: Requirements 6.2**
|
||||
|
||||
- [ ]* 8.9 编写属性测试:批量操作计数不变量
|
||||
- **Property 7: 批量操作计数不变量**
|
||||
- **Validates: Requirements 6.3**
|
||||
|
||||
- [ ] 9. Final checkpoint - 确保所有功能完整集成
|
||||
- 确保所有测试通过,ask the user if questions arise.
|
||||
|
||||
## Notes
|
||||
|
||||
- Tasks marked with `*` are optional and can be skipped for faster MVP
|
||||
- Each task references specific requirements for traceability
|
||||
- Checkpoints ensure incremental validation
|
||||
- Property tests validate universal correctness properties from the design document
|
||||
- 所有代码使用 C#(.NET 10),遵循项目现有的 Autofac DI 和 RPC 风格 API 规范
|
||||
Loading…
Reference in New Issue
Block a user