live-forum/server/webapi/LiveForum/LiveForum.WebApi/BackgroundServices/LikeBatchSyncService.cs
2026-03-24 11:27:37 +08:00

348 lines
14 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using FreeSql;
using LiveForum.Code.Redis.Contract;
using LiveForum.Model;
using LiveForum.Code.Base;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
namespace LiveForum.WebApi.BackgroundServices
{
/// <summary>
/// 点赞批量同步后台服务
/// 从Redis Hash获取待处理操作定时批量同步到数据库
/// 触发条件每1分钟执行一次
/// </summary>
public class LikeBatchSyncService : BackgroundService
{
private readonly IRedisService _redisService;
private readonly IFreeSql _fsql;
private readonly ILogger<LikeBatchSyncService> _logger;
private const string LIKE_PENDING_OPERATIONS_KEY = "like:pending:operations";
private static readonly TimeSpan SYNC_INTERVAL = TimeSpan.FromSeconds(30); // 缩短间隔
private const int MAX_BATCH_SIZE = 100; // 限制批次大小
public LikeBatchSyncService(
IRedisService redisService,
IFreeSql fsql,
ILogger<LikeBatchSyncService> logger)
{
_redisService = redisService;
_fsql = fsql;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("[LikeBatchSync] 点赞批量同步服务已启动");
while (!stoppingToken.IsCancellationRequested)
{
try
{
// 等待同步间隔
await Task.Delay(SYNC_INTERVAL, stoppingToken);
_logger.LogDebug("[LikeBatchSync] 开始定时批量同步");
await ProcessBatchSyncAsync(stoppingToken);
}
catch (OperationCanceledException)
{
// 正常停止
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "[LikeBatchSync] 批量同步过程中发生异常");
// 发生异常后等待30秒再继续
await Task.Delay(TimeSpan.FromSeconds(30), stoppingToken);
}
}
_logger.LogInformation("[LikeBatchSync] 点赞批量同步服务已停止");
}
/// <summary>
/// 执行批量同步
/// </summary>
private async Task ProcessBatchSyncAsync(CancellationToken stoppingToken)
{
try
{
// 1. 限量获取待处理操作,避免一次性处理过多数据
var (toInsert, toDelete, processedFields) = await GetLimitedOperationsAsync();
if (toInsert.Count == 0 && toDelete.Count == 0)
{
_logger.LogDebug("[LikeBatchSync] 没有待处理的点赞操作");
return;
}
_logger.LogInformation("[LikeBatchSync] 本次处理:插入 {InsertCount},删除 {DeleteCount}", toInsert.Count, toDelete.Count);
// 2. 批量写入数据库
await ProcessDatabaseOperationsAsync(toInsert, toDelete, stoppingToken);
// 3. 清除已处理的待处理记录
if (processedFields.Count > 0)
{
foreach (var field in processedFields)
{
await _redisService.HashDeleteAsync(LIKE_PENDING_OPERATIONS_KEY, field);
}
_logger.LogInformation("[LikeBatchSync] 已清除 {Count} 个已处理的操作记录", processedFields.Count);
}
_logger.LogInformation("[LikeBatchSync] 批量同步完成。插入: {InsertCount}, 删除: {DeleteCount}", toInsert.Count, toDelete.Count);
}
catch (Exception ex)
{
_logger.LogError(ex, "[LikeBatchSync] 批量同步处理失败");
throw;
}
}
/// <summary>
/// 限量获取Redis中的操作避免一次性处理过多
/// </summary>
private async Task<(List<(long UserId, int TargetType, long TargetId)> toInsert,
List<(long UserId, int TargetType, long TargetId)> toDelete,
List<string> processedFields)> GetLimitedOperationsAsync()
{
var operationsByKey = new Dictionary<string, (string Action, DateTime OccurredAt)>();
var processedCount = 0;
// 获取所有操作,但限制处理数量
var allOperations = await _redisService.HashGetAllAsync(LIKE_PENDING_OPERATIONS_KEY);
if (allOperations == null || allOperations.Length == 0)
{
return (new List<(long, int, long)>(), new List<(long, int, long)>(), new List<string>());
}
foreach (var entry in allOperations)
{
if (processedCount >= MAX_BATCH_SIZE)
break;
var field = entry.Name.ToString(); // {userId}:{targetType}:{targetId}
var value = entry.Value.ToString();
try
{
var operation = JsonSerializer.Deserialize<OperationInfo>(value, new JsonSerializerOptions
{
PropertyNameCaseInsensitive = true
});
if (operation != null)
{
// 如果已存在,比较时间戳,保留最新的
if (operationsByKey.TryGetValue(field, out var existing))
{
if (operation.OccurredAt > existing.OccurredAt)
{
operationsByKey[field] = (operation.Action, operation.OccurredAt);
}
}
else
{
operationsByKey[field] = (operation.Action, operation.OccurredAt);
}
processedCount++;
}
}
catch (Exception ex)
{
_logger.LogWarning(ex, "[LikeBatchSync] 解析操作失败。Field: {Field}, Value: {Value}",
field, value);
}
}
// 准备批量插入和删除
var toInsert = new List<(long UserId, int TargetType, long TargetId)>();
var toDelete = new List<(long UserId, int TargetType, long TargetId)>();
var processedFields = new List<string>();
foreach (var kvp in operationsByKey)
{
var field = kvp.Key; // {userId}:{targetType}:{targetId}
var parts = field.Split(':');
if (parts.Length != 3)
{
_logger.LogWarning("[LikeBatchSync] 无效的操作字段格式: {Field}", field);
continue;
}
if (!long.TryParse(parts[0], out var userId) ||
!int.TryParse(parts[1], out var targetType) ||
!long.TryParse(parts[2], out var targetId))
{
_logger.LogWarning("[LikeBatchSync] 解析操作字段失败: {Field}", field);
continue;
}
processedFields.Add(field);
if (kvp.Value.Action == "INSERT")
{
toInsert.Add((userId, targetType, targetId));
}
else if (kvp.Value.Action == "DELETE")
{
toDelete.Add((userId, targetType, targetId));
}
}
return (toInsert, toDelete, processedFields);
}
/// <summary>
/// 处理数据库操作
/// </summary>
private async Task ProcessDatabaseOperationsAsync(
List<(long UserId, int TargetType, long TargetId)> toInsert,
List<(long UserId, int TargetType, long TargetId)> toDelete,
CancellationToken stoppingToken)
{
const int maxRetries = 3;
var retryCount = 0;
while (retryCount < maxRetries)
{
try
{
// 使用FreeSql的事务
using (var uow = _fsql.CreateUnitOfWork())
{
try
{
var likesRepo = uow.GetRepository<T_Likes>();
// 1. 批量插入(先查询已存在的,避免重复插入)
if (toInsert.Count > 0)
{
var userIds = toInsert.Select(x => x.UserId).Distinct().ToList();
var targetTypes = toInsert.Select(x => x.TargetType).Distinct().ToList();
var targetIds = toInsert.Select(x => x.TargetId).Distinct().ToList();
var existing = await likesRepo.Select
.Where(x => userIds.Contains(x.UserId)
&& targetTypes.Contains(x.TargetType)
&& targetIds.Contains(x.TargetId))
.ToListAsync();
var existingKeys = existing.Select(x => (x.UserId, x.TargetType, x.TargetId)).ToHashSet();
var insertList = toInsert
.Where(x => !existingKeys.Contains((x.UserId, x.TargetType, x.TargetId)))
.Select(x => new T_Likes
{
UserId = x.UserId,
TargetType = x.TargetType,
TargetId = x.TargetId,
CreatedAt = DateTime.Now
})
.ToList();
if (insertList.Count > 0)
{
await likesRepo.InsertAsync(insertList);
_logger.LogInformation("[LikeBatchSync] 批量插入 {Count} 条点赞记录", insertList.Count);
}
}
// 2. 批量删除使用SQL批量删除
if (toDelete.Count > 0)
{
// 按顺序排序,避免死锁
var sortedDeletes = toDelete.OrderBy(x => x.UserId).ThenBy(x => x.TargetType).ThenBy(x => x.TargetId).ToList();
// 构建批量删除SQL
var deleteConditions = sortedDeletes.Select((item, index) =>
$"(UserId = @UserId{index} AND TargetType = @TargetType{index} AND TargetId = @TargetId{index})")
.ToList();
var parameters = new Dictionary<string, object>();
for (int i = 0; i < sortedDeletes.Count; i++)
{
parameters[$"@UserId{i}"] = sortedDeletes[i].UserId;
parameters[$"@TargetType{i}"] = sortedDeletes[i].TargetType;
parameters[$"@TargetId{i}"] = sortedDeletes[i].TargetId;
}
var sql = $@"
DELETE FROM T_Likes WITH (UPDLOCK, READPAST)
WHERE {string.Join(" OR ", deleteConditions)}";
var deletedRows = await _fsql.Ado.ExecuteNonQueryAsync(sql, parameters);
_logger.LogInformation("[LikeBatchSync] 批量删除 {Count} 条点赞记录", deletedRows);
}
// 3. 提交事务
uow.Commit();
}
catch (Exception ex)
{
uow.Rollback();
_logger.LogError(ex, "[LikeBatchSync] 数据库操作失败,已回滚");
throw;
}
}
return; // 成功则退出重试循环
}
catch (Exception ex) when (IsDeadlockException(ex) && retryCount < maxRetries - 1)
{
retryCount++;
var delay = TimeSpan.FromMilliseconds(100 * Math.Pow(2, retryCount));
_logger.LogWarning(ex, "[LikeBatchSync] 检测到死锁,第 {Retry} 次重试,延迟 {Delay}ms",
retryCount, delay.TotalMilliseconds);
await Task.Delay(delay, stoppingToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "[LikeBatchSync] 处理数据库操作时发生异常");
throw;
}
}
}
private bool IsDeadlockException(Exception ex)
{
return ex.Message.Contains("deadlock") ||
ex.Message.Contains("deadlocked") ||
ex.Message.Contains("was chosen as the deadlock victim");
}
/// <summary>
/// 操作信息(用于反序列化)
/// </summary>
private class OperationInfo
{
public string Action { get; set; }
public DateTime OccurredAt { get; set; }
public long? ReceiverId { get; set; }
}
public override Task StopAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("[LikeBatchSync] 正在停止点赞批量同步服务...");
return base.StopAsync(cancellationToken);
}
}
}