撮合引擎开发:缓存和MQ

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: 笔记

中间件

先来回顾下我们撮合程序项目中关于中间件的目录结构:

├── middleware               # 中间件的包
│   ├── cache                # 缓存包
│   │   └── cache.go         # 缓存操作
│   ├── mq                   # 消息队列包
│   │   └── mq.go            # MQ操作
│   └── redis.go             # 主要做Redis初始化操作

虽然现在只用到了 Redis 一个中间件,但设计个 middleware 包,会方便以后扩展添加其他中间件,如 Kafka 或 RocketMQ 等。

再将缓存和消息队列分包,职责上就很分明,应用时也很明确。

redis.go 就只是做初始化的连接,我们来看看代码:


package middleware
import (
  "matching/log"
  "github.com/go-redis/redis"
  "github.com/spf13/viper"
)
var RedisClient *redis.Client
func Init() {
  addr := viper.GetString("redis.addr")
  RedisClient = redis.NewClient(&redis.Options{
    Addr:     addr,
    Password: "", // no password set
    DB:       0,  // use default DB
  })
  _, err := RedisClient.Ping().Result()
  if err != nil {
    panic(err)
  } else {
    log.Printf("Connected to redis: %s", addr)
  }
}

其中,viper 是前文说过的第三方配置库,通过 viper.GetString("redis.addr") 从配置文件读取出要连接的 Redis 的地址,之后就新建一个 Redis 客户端并连接上 Redis 服务器了。


缓存的设计


讲数据结构设计时,我们已经说过,使用缓存的目的主要有两个:

  1. 请求去重,避免重复提交相同订单;
  2. 恢复数据,即程序重启后能恢复所有数据。

还记得上一篇文章讲 Dispatch 的实现时,有个判断订单是否存在的逻辑吗?就是读取缓存中是否已经存在该订单,从而判别是否为重复请求或无效请求。以及,还记得 process 包的初始化?就是从缓存中恢复数据的过程。

先了解下,我们总共缓存了哪些数据:

  • 开启撮合的交易标的 symbol;
  • 这些交易标的的最新价格;
  • 所有有效的订单请求,包括下单和撤单请求。

1. 缓存symbol

开启撮合的交易标的 symbol 会有多个,且不能重复,那其实就可以保存为集合 set 类型。我将该 set 的 key 设计为 matching:symbols,之后,每有一个 symbol 开启撮合时,就可以用 Redis 的 sadd 命令将该 symbol 添加进这个集合里去了。而关闭撮合时,则需用 srem 命令将关闭撮合的 symbol 从集合中移除。读取所有 symbol 则可用 smembers 命令操作。

程序里对 symbol 的操作提供了三个函数,分别用来保存 symbol、移除 symbol 和获取所有 symbol,以下是实现的代码:

func SaveSymbol(symbol string) {
  key := "matching:symbols"
  RedisClient.SAdd(key, symbol)
}
func RemoveSymbol(symbol string) {
  key := "matching:symbols"
  RedisClient.SRem(key, symbol)
}
func GetSymbols() []string {
  key := "matching:symbols"
  return RedisClient.SMembers(key).Val()
}

2. 缓存价格

交易标的的最新价格则是每个 symbol 会有一个价格,且无需缓存历史价格,那我就直接用字符串类型来保存价格,而每个价格的 key 则包含有各自的 symbol,key 的格式设计为 matching:price:{symbol},假如要保存的 symbol = “BTCUSD”,那对应的 key 值就是 matching:price:BTCUSD,保存的 value 值就是 BTCUSD 的最新价格。

我们也同样提供了保存价格、获取价格和删除价格的三个函数,代码如下:

func SavePrice(symbol string, price decimal.Decimal) {
  key := "matching:price:" + symbol
  RedisClient.Set(key, price.String(), 0)
}
func GetPrice(symbol string) decimal.Decimal {
  key := "matching:price:" + symbol
  priceStr := RedisClient.Get(key).Val()
  result, err := decimal.NewFromString(priceStr)
  if err != nil {
    result = decimal.Zero
  }
  return result
}
func RemovePrice(symbol string) {
  key := "matching:price:" + symbol
  RedisClient.Del(key)
}

3. 缓存订单

对订单的缓存设计则没那么简单了,需要满足两点要求:

  1. 既能缓存下单请求,也能缓存撤单请求;
  2. 订单要符合定序要求。

先说下第一点,为什么需要缓存订单?且为什么下单和撤单请求都需要缓存?

先来解答第一个问题,我们是在内存中撮合的,每个交易标的引擎里各自维护了一个交易委托账本,程序运行时,这些账本是直接保存在程序内存里的。那如果程序退出了,这些账本都被清空了。如果没有缓存,那程序重启后就无法恢复账本数据。要满足该需求,就需要缓存账本里的所有委托单。

关于第二个问题,我们来考虑这样一个场景:假如订单通道里有撤单请求在排队,而程序并没有对撤单请求做缓存,这时程序重启了,那么订单通道里的所有订单还没被引擎接收处理之前就被清空了,撤单请求也就无法恢复了。

因此,程序需要缓存好订单,且下单和撤单都需要缓存。

再来看第二个要求,为什么要符合定序?我们知道,订单通道里的订单是定序的,交易委托账本里同价格的订单也是按时间排序的,那缓存时如果不定序,程序重启后就难以保证按原有的顺序恢复订单。

那具体要怎么来设计这个订单的缓存呢?我的方案是分两类缓存,第一类保存每个独立的订单请求,包括下单和撤单;第二类分交易标的保存对应 symbol 所有订单请求的订单 ID 和 action。

第一类,我设计的 Key 格式为 matching:order:{symbol}:{orderId}:{action},symbol、orderId 和 action 则是对应订单的三个变量值。比如,某订单 symbol = “BTCUSD”,orderId = “12345”,action = “cancel”,那该订单保存到 Redis 的 Key 值就是 matching:order:BTCUSD:12345:cancel。该 Key 对应的 Value 则是保存整个订单对象,可以用 hash 类型存储。

第二类,我设计的 Key 格式为 matching:orderids:{symbol},Value 保存的是 sorted set 类型的数据,保存对应 symbol 的所有订单请求,每条记录保存的值为 {orderId}:{action},而 score 值设为对应订单的 {timestamp}。用订单时间作为 score 就可以保证定序了。还记得之前文章我们将订单时间的单位设为 100 纳秒,保证时间戳长度刚好为 16 位吗?这是因为,如果超过 16 位,那 score 将转为科学计数法表示,那将会导致数字失真。

根据这样的设计,那保存订单时的实现逻辑就如以下代码所示:

func SaveOrder(order map[string]interface{}) {
  symbol := order["symbol"].(string)
  orderId := order["orderId"].(string)
  timestamp := order["timestamp"].(float64)
  action := order["action"].(string)
  key := "matching:order:" + symbol + ":" + orderId + ":" + action
  RedisClient.HMSet(key, order)
  key = "matching:orderids:" + symbol
  z := &redis.Z{
    Score:  timestamp,
    Member: orderId + ":" + action,
  }
  RedisClient.ZAdd(key, z)
}

另外,还提供了 GetOrder()、UpdateOrder()、RemoveOrder()、OrderExist()、GetOrderIdsWithAction() 等函数。再给大伙看看 GetOrderIdsWithAction() 函数的实现:


func GetOrderIdsWithAction(symbol string) []string {
  key := "matching:orderids:" + symbol
  return RedisClient.ZRange(key, 0, -1).Val()
}

该函数得到的结果是根据 score 值排好序的,这就是我们想要的结果。理解了这个设计之后,再翻回去看看 process 包的初始化,你就会明白那些代码的逻辑了。


MQ的设计


我们选择了使用 Redis 的 Stream 数据结构来作为 MQ 输出,Stream 数据结构采用了类似 Kafka 的设计,应用起来很方便。但由于 Redis 运行于内存的特性,相比 Kafka 快速很多,这也是我选择它来作为撮合程序的输出 MQ 的主要原因。

我们只有两类 MQ,撤单结果和成交记录,发送消息的实现如下:

func SendCancelResult(symbol, orderId string, ok bool) {
  values := map[string]interface{}{"orderId": orderId, "ok": ok}
  a := &redis.XAddArgs{
    Stream:       "matching:cancelresults:" + symbol,
    MaxLenApprox: 1000,
    Values:       values,
  }
  RedisClient.XAdd(a)
}
func SendTrade(symbol string, trade map[string]interface{}) {
  a := &redis.XAddArgs{
    Stream:       "matching:trades:" + symbol,
    MaxLenApprox: 1000,
    Values:       trade,
  }
  RedisClient.XAdd(a)
}

其中,matching:cancelresults:{symbol} 就是撤单结果的 MQ 所属的 Key,matching:trades:{symbol} 则是成交记录的 MQ 所属的 Key。可以看到,我们还根据不同 symbol 分不同 MQ,这样还方便下游服务可以根据需要实现分布式订阅不同 symbol 的 MQ。


小结


本小节讲解了缓存和 MQ 的设计与实现,理解了这部分的设计之后,对整个撮合引擎的核心设计也基本能理解了。

最后,依然留几个思考题:是否可以不用缓存?如果不用缓存可以如何解决去重和数据恢复的问题?


相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
1月前
|
缓存 Java
java开发常用模块——缓存模块
java开发常用模块——缓存模块
|
2月前
|
存储 缓存 NoSQL
后端开发中的缓存策略:提升应用性能的关键
后端开发中的缓存策略:提升应用性能的关键
31 0
|
3月前
|
存储 缓存 关系型数据库
InnoDB 引擎底层存储和缓存原理
InnoDB 引擎底层存储和缓存原理
|
2月前
|
存储 缓存 NoSQL
在Python Web开发过程中:数据库与缓存,Redis在Web开发中的常见应用场景有哪些?
Redis在Python Web开发中常用于缓存、会话管理、分布式锁、排行榜、消息队列和实时分析。作为内存数据存储,它提供高效的数据结构(如字符串、哈希、列表、集合、有序集合),支持会话存储、互斥操作、计数与排名、队列实现及实时数据处理。其高速性能和丰富功能使其成为多场景下的理想选择。
25 5
|
2月前
|
XML 存储 缓存
【深入浅出Spring原理及实战】「缓存Cache开发系列」带你深入分析Spring所提供的缓存Cache管理器的实战开发指南(修正篇)
【深入浅出Spring原理及实战】「缓存Cache开发系列」带你深入分析Spring所提供的缓存Cache管理器的实战开发指南(修正篇)
35 0
|
2月前
|
存储 XML 缓存
【深入浅出Spring原理及实战】「缓存Cache开发系列」带你深入分析Spring所提供的缓存Cache功能的开发实战指南(一)
【深入浅出Spring原理及实战】「缓存Cache开发系列」带你深入分析Spring所提供的缓存Cache功能的开发实战指南
102 0
|
10天前
|
缓存 编解码
FFmpeg开发笔记(十四)FFmpeg音频重采样的缓存
FFmpeg在视频流重编码和音频重采样中使用缓存机制。在音频文件格式转换时,特别是对于帧长度不固定的格式如ogg、amr、wma,需处理重采样缓存。通过调用`swr_convert`,传入空输入和0大小来清空缓存。在`swrmp3.c`中,修改帧样本数处理,并在循环结束后添加代码以冲刷缓存。编译并运行程序,将ogg文件重采样为MP3,日志显示操作成功,播放转换后的文件确认功能正常。
FFmpeg开发笔记(十四)FFmpeg音频重采样的缓存
|
11天前
|
缓存 NoSQL PHP
【PHP 开发专栏】Redis 作为 PHP 缓存的解决方案
【4月更文挑战第30天】本文探讨了Redis作为PHP缓存的优势,如高性能、丰富数据结构、数据持久化和分布式支持。通过安装配置Redis、选择PHP客户端、执行读写操作及制定缓存策略实现缓存。应用场景包括页面、数据和会话缓存。但需注意数据一致性、过期时间、容量和安全问题,以确保应用稳定和安全。Redis能有效提升PHP应用响应速度和处理能力。
|
12天前
|
缓存 监控 PHP
【PHP开发专栏】Memcached在PHP中的缓存应用
【4月更文挑战第29天】Memcached是高性能分布式内存缓存系统,常用于加速动态Web应用,减轻数据库负担。在PHP中,通过官方扩展模块与Memcached服务器交互,涉及安装扩展、创建实例、设置/获取缓存、删除缓存及其它操作。使用Memcached可减少数据库负载、缓存查询结果、实现页面缓存,支持分布式缓存,并需注意避免缓存击穿、穿透和雪崩。监控和调优缓存策略能优化性能。了解和掌握Memcached有助于提升PHP应用的效率和扩展性。
|
2月前
|
缓存 应用服务中间件 数据库
【分布式技术专题】「缓存解决方案」一文带领你好好认识一下企业级别的缓存技术解决方案的运作原理和开发实战(多级缓存设计分析)
【分布式技术专题】「缓存解决方案」一文带领你好好认识一下企业级别的缓存技术解决方案的运作原理和开发实战(多级缓存设计分析)
42 1

热门文章

最新文章