17 KiB
Design Document: Redis 报告生成队列
Overview
本设计将测评报告生成从同步调用模式改为基于 Redis List 的异步队列处理模式。当前 AssessmentService.SubmitAnswersAsync 在提交答案后直接 await _reportGenerationService.GenerateReportAsync(recordId) 同步等待报告生成,导致用户请求被阻塞约 30 秒。
改造后的架构采用生产者-消费者模式:
- 生产者:
AssessmentService提交答案后,将任务消息 LPUSH 到 Redis Listreport:queue,立即返回响应 - 消费者:
ReportQueueConsumer(BackgroundService)通过 BRPOP 串行消费队列任务,调用ReportGenerationService生成报告 - 重试机制:失败任务按指数退避重新入队(10s/30s/60s),超过 3 次进入死信队列
report:queue:dead,记录状态更新为 5(生成失败) - 后台管理:管理员可对状态为 3(生成中)或 5(生成失败)的记录手动触发单条/批量重新生成
设计决策
| 决策 | 选择 | 理由 |
|---|---|---|
| 队列实现 | Redis List (LPUSH/BRPOP) | 项目已有 Redis 基础设施(StackExchange.Redis),无需引入 RabbitMQ 等额外中间件 |
| 消费模式 | 单进程串行消费 | 报告生成涉及数据库事务,串行处理避免并发冲突,当前业务量不需要并行消费 |
| 重试策略 | 指数退避 + 死信队列 | 平衡临时性错误恢复与永久性错误隔离 |
| 消费者宿主 | API 进程内 BackgroundService | 避免独立部署消费者进程,简化运维;API 进程已有完整的 DI 容器和数据库连接 |
| 消息格式 | JSON 序列化 | 简单直观,与项目现有 JSON 序列化方式一致 |
Architecture
flowchart TB
subgraph 小程序端
A[用户提交答案]
end
subgraph API 进程
B[AssessmentService.SubmitAnswersAsync]
C[IRedisService.ListLeftPushAsync]
D[ReportQueueConsumer<br/>BackgroundService]
E[ReportGenerationService]
end
subgraph Redis
F[report:queue<br/>Redis List]
G[report:queue:dead<br/>Dead Letter Queue]
end
subgraph 数据库
H[(assessment_records<br/>Status: 3→4 或 3→5)]
end
subgraph 后台管理
I[管理员触发重新生成]
J[AssessmentRecordController]
end
A -->|1. 提交答案| B
B -->|2. LPUSH 任务消息| C
C -->|3. 入队| F
B -->|4. 立即返回响应| A
D -->|5. BRPOP 消费| F
D -->|6. 调用生成| E
E -->|7. 写入结果| H
D -->|8. 超过重试次数| G
D -->|8. 更新状态=5| H
I -->|9. 手动触发| J
J -->|10. LPUSH 新任务| F
消息流转时序
sequenceDiagram
participant User as 小程序用户
participant API as AssessmentService
participant Redis as Redis List
participant Consumer as ReportQueueConsumer
participant Report as ReportGenerationService
participant DB as SQL Server
User->>API: SubmitAnswersAsync
API->>DB: 保存答案, Status=3
API->>Redis: LPUSH report:queue {RecordId, RetryCount=0}
API-->>User: 返回成功响应
loop 持续消费
Consumer->>Redis: BRPOP report:queue (30s超时)
Redis-->>Consumer: 队列消息
Consumer->>Report: GenerateReportAsync(recordId)
alt 生成成功
Report->>DB: 写入结果, Status=4
Consumer->>Consumer: 记录成功日志
else 生成失败 & RetryCount < 3
Consumer->>Consumer: 等待退避时间
Consumer->>Redis: LPUSH report:queue {RetryCount+1}
else 生成失败 & RetryCount = 3
Consumer->>Redis: LPUSH report:queue:dead
Consumer->>DB: 更新 Status=5
end
end
Components and Interfaces
1. IRedisService 扩展(Redis List 操作)
在现有 IRedisService 接口中新增三个 List 操作方法:
/// <summary>
/// 将值推入列表左端(LPUSH)
/// </summary>
Task ListLeftPushAsync(string key, string value);
/// <summary>
/// 从列表右端阻塞弹出(BRPOP)
/// </summary>
Task<string?> ListRightPopAsync(string key, TimeSpan timeout);
/// <summary>
/// 获取列表长度(LLEN)
/// </summary>
Task<long> ListLengthAsync(string key);
RedisService 实现遵循现有模式:连接不可用时 ListLeftPushAsync 静默返回、ListRightPopAsync 返回 null、ListLengthAsync 返回 0。
2. ReportQueueMessage(队列消息模型)
/// <summary>
/// 报告生成队列消息
/// </summary>
public class ReportQueueMessage
{
/// <summary>
/// 测评记录ID
/// </summary>
public long RecordId { get; set; }
/// <summary>
/// 已重试次数
/// </summary>
public int RetryCount { get; set; }
/// <summary>
/// 入队时间
/// </summary>
public DateTime EnqueueTime { get; set; }
}
位置:MiAssessment.Core/Models/ReportQueueMessage.cs
3. ReportQueueProducer(队列生产者)
/// <summary>
/// 报告生成队列生产者接口
/// </summary>
public interface IReportQueueProducer
{
/// <summary>
/// 将报告生成任务入队
/// </summary>
Task EnqueueAsync(long recordId);
}
位置:
- 接口:
MiAssessment.Core/Interfaces/IReportQueueProducer.cs - 实现:
MiAssessment.Core/Services/ReportQueueProducer.cs
职责:构造 ReportQueueMessage(RetryCount=0, EnqueueTime=DateTime.Now),序列化为 JSON,调用 IRedisService.ListLeftPushAsync("report:queue", json)。
4. ReportQueueConsumer(队列消费者 BackgroundService)
/// <summary>
/// 报告生成队列消费者
/// </summary>
public class ReportQueueConsumer : BackgroundService
位置:MiAssessment.Api/BackgroundServices/ReportQueueConsumer.cs
职责:
- 在
ExecuteAsync中循环调用IRedisService.ListRightPopAsync("report:queue", 30s) - 反序列化消息,通过
IServiceScopeFactory创建 scope 解析ReportGenerationService并调用GenerateReportAsync - 成功:记录日志
- 失败且 RetryCount < 3:按退避时间等待后重新 LPUSH
- 失败且 RetryCount = 3:LPUSH 到死信队列,更新记录状态为 5
- BRPOP 异常:记录错误日志,等待 5 秒后重新监听
注册方式:在 Program.cs 中 builder.Services.AddHostedService<ReportQueueConsumer>()
5. AssessmentService 改造
修改 SubmitAnswersAsync 方法:
- 移除对
ReportGenerationService.GenerateReportAsync的直接调用 - 改为调用
IReportQueueProducer.EnqueueAsync(recordId) - 入队失败时记录错误日志,仍返回成功响应
6. Admin 端重新生成接口
在 IAssessmentRecordService 中新增:
/// <summary>
/// 重新生成报告
/// </summary>
Task RegenerateReportAsync(long recordId);
/// <summary>
/// 批量重新生成报告
/// </summary>
Task<BatchRegenerateResult> BatchRegenerateReportAsync(List<long> recordIds);
在 AssessmentRecordController 中新增:
POST /api/admin/assessmentRecord/regenerateReport:接收{ id: long }POST /api/admin/assessmentRecord/batchRegenerateReport:接收{ ids: long[] }
Admin 端重新生成逻辑需要通过 Redis 入队,因此 AssessmentRecordService 需要注入 IRedisService 来执行 LPUSH 操作(Admin.Business 项目已可访问 Business 库)。
7. 状态描述扩展
AssessmentRecordService.StatusNames字典新增{ 5, "生成失败" }AssessmentService.GetStatusText新增5 => "生成失败"AssessmentService.GetResultStatusAsync对 Status=5 返回描述"报告生成失败,请联系客服"
Data Models
队列消息结构(Redis 中存储的 JSON)
{
"RecordId": 12345,
"RetryCount": 0,
"EnqueueTime": "2025-01-15T10:30:00"
}
Redis Key 设计
| Key | 类型 | 说明 |
|---|---|---|
report:queue |
List | 待处理的报告生成任务队列 |
report:queue:dead |
List | 超过最大重试次数的死信队列 |
常量定义
| 常量 | 值 | 说明 |
|---|---|---|
ReportQueueKey |
report:queue |
队列 Redis Key |
DeadLetterQueueKey |
report:queue:dead |
死信队列 Redis Key |
MaxRetryCount |
3 |
最大重试次数 |
BrpopTimeout |
30s |
BRPOP 超时时间 |
RetryDelays |
[10s, 30s, 60s] |
各次重试的退避等待时间 |
ErrorRecoveryDelay |
5s |
BRPOP 异常后的恢复等待时间 |
Assessment_Record 状态扩展
| Status | 说明 | 变更 |
|---|---|---|
| 1 | 待测评 | 无变更 |
| 2 | 测评中 | 无变更 |
| 3 | 生成中 | 无变更(入队后保持此状态) |
| 4 | 已完成 | 无变更(由 ReportGenerationService 更新) |
| 5 | 生成失败 | 新增,超过最大重试次数时由 Consumer 更新 |
请求/响应模型
RegenerateReportRequest
/// <summary>
/// 重新生成报告请求
/// </summary>
public class RegenerateReportRequest
{
/// <summary>
/// 测评记录ID
/// </summary>
public long Id { get; set; }
}
BatchRegenerateReportRequest
/// <summary>
/// 批量重新生成报告请求
/// </summary>
public class BatchRegenerateReportRequest
{
/// <summary>
/// 测评记录ID列表
/// </summary>
public List<long> Ids { get; set; } = new();
}
BatchRegenerateResult
/// <summary>
/// 批量重新生成结果
/// </summary>
public class BatchRegenerateResult
{
/// <summary>
/// 成功入队数量
/// </summary>
public int SuccessCount { get; set; }
/// <summary>
/// 跳过数量(状态不符或记录不存在)
/// </summary>
public int SkippedCount { get; set; }
}
位置:MiAssessment.Admin.Business/Models/AssessmentRecord/
Correctness Properties
A property is a characteristic or behavior that should hold true across all valid executions of a system—essentially, a formal statement about what the system should do. Properties serve as the bridge between human-readable specifications and machine-verifiable correctness guarantees.
Property 1: 队列消息序列化/反序列化 round trip
For any valid ReportQueueMessage(包含任意正整数 RecordId、RetryCount 在 [0, MaxRetryCount] 范围内、任意合法 EnqueueTime),将其序列化为 JSON 再反序列化,应得到与原始对象等价的 ReportQueueMessage。
Validates: Requirements 1.1, 2.2
Property 2: Redis List LPUSH/BRPOP round trip
For any 非空字符串 value,对同一个 Redis List key 执行 ListLeftPushAsync(key, value) 后,ListRightPopAsync(key, timeout) 应返回该 value。对于 N 次 LPUSH 操作,ListLengthAsync 应返回 N(在未消费的情况下),且 N 次 BRPOP 应按 FIFO 顺序返回所有 value。
Validates: Requirements 4.1, 4.2, 4.3
Property 3: 失败重试递增 RetryCount
For any ReportQueueMessage 其 RetryCount 值在 [0, MaxRetryCount) 范围内,当报告生成失败时,重新入队的消息的 RetryCount 应等于原始 RetryCount + 1,且 RecordId 和 EnqueueTime 保持不变。
Validates: Requirements 3.1
Property 4: 重新生成重置状态并入队
For any Assessment_Record 其状态为 3(生成中)或 5(生成失败),调用 RegenerateReportAsync 后,该记录的状态应被重置为 3,已有的测评结果数据应被清除,且 Report_Queue 中应新增一条 RetryCount=0 的消息。
Validates: Requirements 5.2
Property 5: 非法状态拒绝重新生成
For any Assessment_Record 其状态不在 {3, 5} 中(即状态为 1、2 或 4),调用 RegenerateReportAsync 应抛出业务异常,且记录状态和测评结果数据保持不变。
Validates: Requirements 5.4
Property 6: 批量重新生成按状态过滤
For any 包含混合状态记录的 RecordId 列表,BatchRegenerateReportAsync 应仅对状态为 3 或 5 的记录执行重新生成逻辑,其余记录应被跳过且状态不变。
Validates: Requirements 6.2
Property 7: 批量操作计数不变量
For any RecordId 列表(长度 > 0),BatchRegenerateReportAsync 返回的 SuccessCount + SkippedCount 应等于请求的 RecordId 列表长度。
Validates: Requirements 6.3
Error Handling
生产者端(AssessmentService)
| 场景 | 处理方式 |
|---|---|
| Redis 连接不可用,LPUSH 失败 | 记录 Error 日志,仍返回成功响应,记录状态保持 3(生成中)。后续可通过后台管理手动触发重新生成 |
| 消息序列化异常 | 记录 Error 日志,仍返回成功响应(极端情况,JSON 序列化几乎不会失败) |
消费者端(ReportQueueConsumer)
| 场景 | 处理方式 |
|---|---|
| BRPOP 超时(队列为空) | 正常行为,继续下一轮 BRPOP 循环 |
| BRPOP 异常(Redis 连接断开) | 记录 Error 日志,等待 5 秒后重新开始监听 |
| 消息反序列化失败(JSON 格式错误) | 记录 Error 日志,丢弃该消息,继续处理下一条 |
| 报告生成异常 & RetryCount < MaxRetryCount | 按退避时间等待,RetryCount+1 后重新 LPUSH 到 report:queue |
| 报告生成异常 & RetryCount = MaxRetryCount | LPUSH 到 report:queue:dead,更新记录 Status=5,记录 Error 日志 |
| 更新 Status=5 时数据库异常 | 记录 Error 日志,消息仍进入死信队列(状态更新失败不影响死信处理) |
| CancellationToken 触发(应用关闭) | 优雅退出循环,当前正在处理的消息完成后停止 |
后台管理端
| 场景 | 处理方式 | 错误码 |
|---|---|---|
| 记录不存在或已软删除 | 返回"测评记录不存在" | 3241 (AssessmentRecordNotFound) |
| 记录状态不是 3 或 5 | 返回"当前状态不允许重新生成" | 2005 (InvalidOperation) |
| 批量请求 ID 列表为空 | 返回"记录ID列表不能为空" | 1001 (ParamError) |
| Redis 入队失败 | 记录 Error 日志,该条记录计入 SkippedCount | - |
Testing Strategy
测试框架
- 单元测试:xUnit + Moq
- 属性测试:FsCheck(项目已配置)
- 每个属性测试最少运行 100 次迭代
单元测试
| 测试目标 | 测试内容 |
|---|---|
| ReportQueueProducer.EnqueueAsync | 验证调用 IRedisService.ListLeftPushAsync 的参数正确性 |
| ReportQueueConsumer | 验证成功消费后记录日志;验证失败后重试入队;验证超过重试次数进入死信队列 |
| AssessmentService.SubmitAnswersAsync | 验证不再直接调用 GenerateReportAsync;验证调用 EnqueueAsync;验证 Redis 失败时仍返回成功 |
| AssessmentRecordService.RegenerateReportAsync | 验证状态重置为 3;验证清除已有结果;验证入队新消息;验证不存在记录返回错误;验证非法状态返回错误 |
| AssessmentRecordService.BatchRegenerateReportAsync | 验证只处理状态 3/5 的记录;验证返回正确的 SuccessCount/SkippedCount;验证空列表返回错误 |
| RedisService List 方法 | 验证连接不可用时的静默降级行为 |
| GetStatusText / StatusNames | 验证 Status=5 返回"生成失败" |
属性测试
每个属性测试必须以注释引用设计文档中的属性编号:
/// <summary>
/// Feature: redis-report-queue, Property 1: 队列消息序列化/反序列化 round trip
/// </summary>
[Property(MaxTest = 100)]
public Property QueueMessageSerializationRoundTrip() { ... }
/// <summary>
/// Feature: redis-report-queue, Property 2: Redis List LPUSH/BRPOP round trip
/// </summary>
[Property(MaxTest = 100)]
public Property RedisListRoundTrip() { ... }
/// <summary>
/// Feature: redis-report-queue, Property 3: 失败重试递增 RetryCount
/// </summary>
[Property(MaxTest = 100)]
public Property RetryIncrementsRetryCount() { ... }
/// <summary>
/// Feature: redis-report-queue, Property 4: 重新生成重置状态并入队
/// </summary>
[Property(MaxTest = 100)]
public Property RegenerateResetsStatusAndEnqueues() { ... }
/// <summary>
/// Feature: redis-report-queue, Property 5: 非法状态拒绝重新生成
/// </summary>
[Property(MaxTest = 100)]
public Property InvalidStatusRejectsRegeneration() { ... }
/// <summary>
/// Feature: redis-report-queue, Property 6: 批量重新生成按状态过滤
/// </summary>
[Property(MaxTest = 100)]
public Property BatchRegenerateFiltersByStatus() { ... }
/// <summary>
/// Feature: redis-report-queue, Property 7: 批量操作计数不变量
/// </summary>
[Property(MaxTest = 100)]
public Property BatchCountInvariant() { ... }
测试数据生成策略
- RecordId:正整数范围 [1, long.MaxValue]
- RetryCount:[0, MaxRetryCount] 范围内的整数
- EnqueueTime:合理的 DateTime 范围
- Status:[1, 5] 范围内的整数,用于测试状态过滤逻辑
- RecordId 列表:长度 [1, 50] 的随机正整数列表,用于批量操作测试