动手造轮子:基于 Redis 实现 EventBus

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 动手造轮子:基于 Redis 实现 EventBusIntro#上次我们造了一个简单的基于内存的 EventBus,但是如果要跨系统的话就不合适了,所以有了这篇基于 Redis 的 EventBus 探索。

动手造轮子:基于 Redis 实现 EventBus
Intro#
上次我们造了一个简单的基于内存的 EventBus,但是如果要跨系统的话就不合适了,所以有了这篇基于 Redis 的 EventBus 探索。

本文的实现是基于 StackExchange.Redis 来实现的。

RedisEventStore 实现#
既然要实现跨系统的 EventBus 再使用基于内存的 EventStore 自然不行,因此这里基于 Redis 设计了一个 EventStoreInRedis ,基于 redis 的 Hash 来实现,以 Event 的 EventKey 作为 fieldName,以 Event 对应的 EventHandler 作为 value。

EventStoreInRedis 实现:

Copy
public class EventStoreInRedis : IEventStore
{

protected readonly string EventsCacheKey;
protected readonly ILogger Logger;

private readonly IRedisWrapper Wrapper;

public EventStoreInRedis(ILogger<EventStoreInRedis> logger)
{
    Logger = logger;
    Wrapper = new RedisWrapper(RedisConstants.EventStorePrefix);

    EventsCacheKey = RedisManager.RedisConfiguration.EventStoreCacheKey;
}

public bool AddSubscription<TEvent, TEventHandler>()
    where TEvent : IEventBase
    where TEventHandler : IEventHandler<TEvent>
{
    var eventKey = GetEventKey<TEvent>();
    var handlerType = typeof(TEventHandler);
    if (Wrapper.Database.HashExists(EventsCacheKey, eventKey))
    {
        var handlers = Wrapper.Unwrap<HashSet<Type>>(Wrapper.Database.HashGet(EventsCacheKey, eventKey));

        if (handlers.Contains(handlerType))
        {
            return false;
        }
        handlers.Add(handlerType);
        Wrapper.Database.HashSet(EventsCacheKey, eventKey, Wrapper.Wrap(handlers));
        return true;
    }
    else
    {
        return Wrapper.Database.HashSet(EventsCacheKey, eventKey, Wrapper.Wrap(new HashSet<Type> { handlerType }), StackExchange.Redis.When.NotExists);
    }
}

public bool Clear()
{
    return Wrapper.Database.KeyDelete(EventsCacheKey);
}

public ICollection<Type> GetEventHandlerTypes<TEvent>() where TEvent : IEventBase
{
    var eventKey = GetEventKey<TEvent>();
    return Wrapper.Unwrap<HashSet<Type>>(Wrapper.Database.HashGet(EventsCacheKey, eventKey));
}

public string GetEventKey<TEvent>()
{
    return typeof(TEvent).FullName;
}

public bool HasSubscriptionsForEvent<TEvent>() where TEvent : IEventBase
{
    var eventKey = GetEventKey<TEvent>();
    return Wrapper.Database.HashExists(EventsCacheKey, eventKey);
}

public bool RemoveSubscription<TEvent, TEventHandler>()
    where TEvent : IEventBase
    where TEventHandler : IEventHandler<TEvent>
{
    var eventKey = GetEventKey<TEvent>();
    var handlerType = typeof(TEventHandler);

    if (!Wrapper.Database.HashExists(EventsCacheKey, eventKey))
    {
        return false;
    }

    var handlers = Wrapper.Unwrap<HashSet<Type>>(Wrapper.Database.HashGet(EventsCacheKey, eventKey));

    if (!handlers.Contains(handlerType))
    {
        return false;
    }

    handlers.Remove(handlerType);
    Wrapper.Database.HashSet(EventsCacheKey, eventKey, Wrapper.Wrap(handlers));
    return true;
}

}
RedisWrapper 及更具体的代码可以参考我的 Redis 的扩展的实现 https://github.com/WeihanLi/WeihanLi.Redis

RedisEventBus 实现#
RedisEventBus 是基于 Redis 的 PUB/SUB 实现的,实现的感觉还有一些小问题,我想确保每个客户端注册的时候每个 EventHandler 即使多次注册也只注册一次,但是还没找到一个好的实现,如果你有什么想法欢迎指出,和我一起交流。具体的实现细节如下:

Copy
public class RedisEventBus : IEventBus
{

private readonly IEventStore _eventStore;
private readonly ISubscriber _subscriber;
private readonly IServiceProvider _serviceProvider;

public RedisEventBus(IEventStore eventStore, IConnectionMultiplexer connectionMultiplexer, IServiceProvider serviceProvider)
{
    _eventStore = eventStore;
    _serviceProvider = serviceProvider;
    _subscriber = connectionMultiplexer.GetSubscriber();
}

private string GetChannelPrefix<TEvent>() where TEvent : IEventBase
{
    var eventKey = _eventStore.GetEventKey<TEvent>();
    var channelPrefix =
        $"{RedisManager.RedisConfiguration.EventBusChannelPrefix}{RedisManager.RedisConfiguration.KeySeparator}{eventKey}{RedisManager.RedisConfiguration.KeySeparator}";
    return channelPrefix;
}

private string GetChannelName<TEvent, TEventHandler>() where TEvent : IEventBase
    where TEventHandler : IEventHandler<TEvent>
    => GetChannelName<TEvent>(typeof(TEventHandler));

private string GetChannelName<TEvent>(Type eventHandlerType) where TEvent : IEventBase
{
    var channelPrefix = GetChannelPrefix<TEvent>();
    var channelName = $"{channelPrefix}{eventHandlerType.FullName}";

    return channelName;
}

public bool Publish<TEvent>(TEvent @event) where TEvent : IEventBase
{
    if (!_eventStore.HasSubscriptionsForEvent<TEvent>())
    {
        return false;
    }

    var eventData = @event.ToJson();
    var handlerTypes = _eventStore.GetEventHandlerTypes<TEvent>();
    foreach (var handlerType in handlerTypes)
    {
        var handlerChannelName = GetChannelName<TEvent>(handlerType);
        _subscriber.Publish(handlerChannelName, eventData);
    }

    return true;
}

public bool Subscribe<TEvent, TEventHandler>()
    where TEvent : IEventBase
    where TEventHandler : IEventHandler<TEvent>
{
    _eventStore.AddSubscription<TEvent, TEventHandler>();

    var channelName = GetChannelName<TEvent, TEventHandler>();

    //// TODO: if current client subscribed the channel
    //if (true)
    //{
    _subscriber.Subscribe(channelName, async (channel, eventMessage) =>
    {
        var eventData = eventMessage.ToString().JsonToType<TEvent>();
        var handler = _serviceProvider.GetServiceOrCreateInstance<TEventHandler>();
        if (null != handler)
        {
            await handler.Handle(eventData).ConfigureAwait(false);
        }
    });
    return true;
    //}

    //return false;
}

public bool Unsubscribe<TEvent, TEventHandler>()
    where TEvent : IEventBase
    where TEventHandler : IEventHandler<TEvent>
{
    _eventStore.RemoveSubscription<TEvent, TEventHandler>();

    var channelName = GetChannelName<TEvent, TEventHandler>();

    //// TODO: if current client subscribed the channel
    //if (true)
    //{
    _subscriber.Unsubscribe(channelName);
    return true;
    //}
    //return false;
}

}
使用示例:#
使用起来大体上和上一篇使用一致,只是在初始化注入服务的时候,我们需要把 IEventBus 和 IEventStore 替换为对应 Redis 的实现即可。

注册服务

Copy
services.AddSingleton();
services.AddSingleton();
注册 EventHandler

Copy
services.AddSingleton();
订阅事件

Copy
eventBus.Subscribe();
发布事件

Copy
[HttpGet("{path}")]
public async Task GetByPath(string path, CancellationToken cancellationToken, [FromServices]IEventBus eventBus)
{

var notice = await _repository.FetchAsync(n => n.NoticeCustomPath == path, cancellationToken);
if (notice == null)
{
    return NotFound();
}
eventBus.Publish(new NoticeViewEvent { NoticeId = notice.NoticeId });
return Ok(notice);

}
Memo#
如果要实现基于消息队列的事件处理,需要注意,消息可能会重复,可能会需要在事件处理中注意一下业务的幂等性或者对消息对一个去重处理。

我在使用 Redis 的事件处理中使用了一个基于 Redis 原子递增的特性设计的一个防火墙,从而实现一段时间内某一个消息id只会被处理一次,实现源码:https://github.com/WeihanLi/ActivityReservation/blob/dev/ActivityReservation.Helper/Events/NoticeViewEvent.cs

Copy
public class NoticeViewEvent : EventBase
{

public Guid NoticeId { get; set; }

// UserId
// IP
// ...

}

public class NoticeViewEventHandler : IEventHandler
{

public async Task Handle(NoticeViewEvent @event)
{
    var firewallClient = RedisManager.GetFirewallClient($"{nameof(NoticeViewEventHandler)}_{@event.EventId}", TimeSpan.FromMinutes(5));
    if (await firewallClient.HitAsync())
    {
        await DependencyResolver.Current.TryInvokeServiceAsync<ReservationDbContext>(async dbContext =>
        {
            //var notice = await dbContext.Notices.FindAsync(@event.NoticeId);
            //notice.NoticeVisitCount += 1;
            //await dbContext.SaveChangesAsync();

            var conn = dbContext.Database.GetDbConnection();
            await conn.ExecuteAsync($@"UPDATE tabNotice SET NoticeVisitCount = NoticeVisitCount +1 WHERE NoticeId = @NoticeId", new { @event.NoticeId });
        });
    }
}

}
Reference#
https://github.com/WeihanLi/ActivityReservation
https://github.com/WeihanLi/WeihanLi.Redis
https://redis.io/topics/pubsub
作者:WeihanLi

出处:https://www.cnblogs.com/weihanli/p/implement-eventbus-with-redis-pubsub.html

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
存储 移动开发 NoSQL
微服务轮子项目(29) -Redis 单机、主从复制、哨兵、cluster集群、持久化方案(下)
微服务轮子项目(29) -Redis 单机、主从复制、哨兵、cluster集群、持久化方案(下)
94 0
|
缓存 移动开发 NoSQL
php结合redis实现高并发下的抢购、秒杀功能的实例
php结合redis实现高并发下的抢购、秒杀功能的实例
271 0
|
NoSQL Redis
Redis学习4:List数据类型、拓展操作、实现日志等
注意点:对存储空间的顺序进行分析!
Redis学习4:List数据类型、拓展操作、实现日志等
|
存储 NoSQL Redis
Redis学习3:hash类型操作、拓展操作、实现购物等
首先可以理解成一个redis里面有一个小的redis。同时要注意引入了一个field的名字。
Redis学习3:hash类型操作、拓展操作、实现购物等
|
缓存 NoSQL 安全
2021年你还不会Shiro?----10.使用redis实现Shiro的缓存
上一篇文章已经总结了使用ehCache来实现Shiro的缓存管理,步骤也很简单,引入依赖后,直接开启Realm的缓存管理器即可。如果使用Redis来实现缓存管理其实也是一样的,我们也是需要引入redis的依赖,然后开启缓存传入自定义的redis的缓存管理器就行。区别是我们需要为自定义的redis缓存管理器提供自定义的缓存管理类。这个缓存管理类中需要使用到redisTemplate模板,这个模板我们也是需要自己定义。
290 0
|
NoSQL Java 关系型数据库
浅谈Redis实现分布式锁
浅谈Redis实现分布式锁
|
存储 NoSQL 关系型数据库
「Redis」事务实现机制
Redis事务实现机制
600 0
|
消息中间件 设计模式 NoSQL
异步结果通知实现——基于Redis实现,我这操作很可以
前段时间,我在内存中实现了一个简单异步通知框架。但由于没有持久化功能,应用重启就会导致数据丢失,且不支持分布式和集群。今天这篇笔记,引入了 Redis 来解决这些问题,以下是几点理由: 数据结构丰富,支持 List、Sorted Set 等 具有持久化功能,消息的可靠性能得到保证 高可用性,支持单机、主从、集群部署 项目中已使用,接入成本更低 基于 Redis 实现延时队列也有几种方法,展开详细讲讲。
|
NoSQL 前端开发 PHP
thinkphp+redis实现秒杀功能
thinkphp+redis实现秒杀功能
275 0
thinkphp+redis实现秒杀功能
|
存储 NoSQL 安全
分布式锁中-基于 Redis 的实现如何防重入
分布式锁中-基于 Redis 的实现如何防重入
279 0