103 lines
3.4 KiB
C#
103 lines
3.4 KiB
C#
using LiveForum.Code.Redis.Contract;
|
||
using LiveForum.IService.Messages;
|
||
using LiveForum.Model.Events;
|
||
using Microsoft.Extensions.Logging;
|
||
using StackExchange.Redis;
|
||
using System;
|
||
using System.Text.Json;
|
||
using System.Threading.Tasks;
|
||
|
||
namespace LiveForum.Service.Messages
|
||
{
|
||
/// <summary>
|
||
/// 基于Redis的消息事件总线实现
|
||
/// </summary>
|
||
public class RedisMessageEventBus : IMessageEventBus
|
||
{
|
||
private readonly IRedisService _redisService;
|
||
private readonly ILogger<RedisMessageEventBus> _logger;
|
||
private readonly IDatabase _database;
|
||
|
||
// 队列名称常量
|
||
private const string LIKE_QUEUE = "message:events:like";
|
||
private const string COMMENT_QUEUE = "message:events:comment";
|
||
private const string CUSTOM_QUEUE = "message:events:custom";
|
||
|
||
public RedisMessageEventBus(
|
||
IRedisService redisService,
|
||
ILogger<RedisMessageEventBus> logger,
|
||
IDatabase database)
|
||
{
|
||
_redisService = redisService;
|
||
_logger = logger;
|
||
_database = database;
|
||
}
|
||
|
||
/// <summary>
|
||
/// 发布消息事件到Redis队列
|
||
/// </summary>
|
||
public async Task PublishAsync<T>(T eventData) where T : MessageEventBase
|
||
{
|
||
try
|
||
{
|
||
// 根据事件类型选择队列
|
||
var queueName = GetQueueName(eventData);
|
||
|
||
// 序列化事件数据
|
||
var json = JsonSerializer.Serialize(eventData, new JsonSerializerOptions
|
||
{
|
||
PropertyNamingPolicy = JsonNamingPolicy.CamelCase
|
||
});
|
||
|
||
// 推入Redis List队列(右侧推入,左侧弹出,保证FIFO)
|
||
await _database.ListRightPushAsync(queueName, json);
|
||
|
||
_logger.LogInformation(
|
||
"[EventBus] 事件已发布: {EventType}, EventId: {EventId}, Queue: {Queue}",
|
||
eventData.EventType, eventData.EventId, queueName);
|
||
}
|
||
catch (Exception ex)
|
||
{
|
||
_logger.LogError(ex,
|
||
"[EventBus] 发布事件失败: {EventType}, EventId: {EventId}",
|
||
eventData.EventType, eventData.EventId);
|
||
|
||
// 发布失败不应该阻塞主业务流程,只记录日志
|
||
// 但也可以选择抛出异常让调用方处理
|
||
throw;
|
||
}
|
||
}
|
||
|
||
/// <summary>
|
||
/// 获取队列长度
|
||
/// </summary>
|
||
public async Task<long> GetQueueLengthAsync(string queueName)
|
||
{
|
||
try
|
||
{
|
||
return await _database.ListLengthAsync(queueName);
|
||
}
|
||
catch (Exception ex)
|
||
{
|
||
_logger.LogError(ex, "[EventBus] 获取队列长度失败: {Queue}", queueName);
|
||
return -1;
|
||
}
|
||
}
|
||
|
||
/// <summary>
|
||
/// 根据事件类型获取队列名称
|
||
/// </summary>
|
||
private string GetQueueName(MessageEventBase eventData)
|
||
{
|
||
return eventData switch
|
||
{
|
||
LikeEvent => LIKE_QUEUE,
|
||
CommentReplyEvent => COMMENT_QUEUE,
|
||
CustomMessageEvent => CUSTOM_QUEUE,
|
||
_ => throw new ArgumentException($"未知的事件类型: {eventData.GetType().Name}")
|
||
};
|
||
}
|
||
}
|
||
}
|
||
|