中间件
先来回顾下我们撮合程序项目中关于中间件的目录结构:
├── middleware # 中间件的包 │ ├── cache # 缓存包 │ │ └── cache.go # 缓存操作 │ ├── mq # 消息队列包 │ │ └── mq.go # MQ操作 │ └── redis.go # 主要做Redis初始化操作
虽然现在只用到了 Redis 一个中间件,但设计个 middleware 包,会方便以后扩展添加其他中间件,如 Kafka 或 RocketMQ 等。
再将缓存和消息队列分包,职责上就很分明,应用时也很明确。
redis.go 就只是做初始化的连接,我们来看看代码:
package middleware import ( "matching/log" "github.com/go-redis/redis" "github.com/spf13/viper" ) var RedisClient *redis.Client func Init() { addr := viper.GetString("redis.addr") RedisClient = redis.NewClient(&redis.Options{ Addr: addr, Password: "", // no password set DB: 0, // use default DB }) _, err := RedisClient.Ping().Result() if err != nil { panic(err) } else { log.Printf("Connected to redis: %s", addr) } }
其中,viper 是前文说过的第三方配置库,通过 viper.GetString("redis.addr") 从配置文件读取出要连接的 Redis 的地址,之后就新建一个 Redis 客户端并连接上 Redis 服务器了。
缓存的设计
讲数据结构设计时,我们已经说过,使用缓存的目的主要有两个:
- 请求去重,避免重复提交相同订单;
- 恢复数据,即程序重启后能恢复所有数据。
还记得上一篇文章讲 Dispatch 的实现时,有个判断订单是否存在的逻辑吗?就是读取缓存中是否已经存在该订单,从而判别是否为重复请求或无效请求。以及,还记得 process 包的初始化?就是从缓存中恢复数据的过程。
先了解下,我们总共缓存了哪些数据:
- 开启撮合的交易标的 symbol;
- 这些交易标的的最新价格;
- 所有有效的订单请求,包括下单和撤单请求。
1. 缓存symbol
开启撮合的交易标的 symbol 会有多个,且不能重复,那其实就可以保存为集合 set 类型。我将该 set 的 key 设计为 matching:symbols,之后,每有一个 symbol 开启撮合时,就可以用 Redis 的 sadd 命令将该 symbol 添加进这个集合里去了。而关闭撮合时,则需用 srem 命令将关闭撮合的 symbol 从集合中移除。读取所有 symbol 则可用 smembers 命令操作。
程序里对 symbol 的操作提供了三个函数,分别用来保存 symbol、移除 symbol 和获取所有 symbol,以下是实现的代码:
func SaveSymbol(symbol string) { key := "matching:symbols" RedisClient.SAdd(key, symbol) } func RemoveSymbol(symbol string) { key := "matching:symbols" RedisClient.SRem(key, symbol) } func GetSymbols() []string { key := "matching:symbols" return RedisClient.SMembers(key).Val() }
2. 缓存价格
交易标的的最新价格则是每个 symbol 会有一个价格,且无需缓存历史价格,那我就直接用字符串类型来保存价格,而每个价格的 key 则包含有各自的 symbol,key 的格式设计为 matching:price:{symbol},假如要保存的 symbol = “BTCUSD”,那对应的 key 值就是 matching:price:BTCUSD,保存的 value 值就是 BTCUSD 的最新价格。
我们也同样提供了保存价格、获取价格和删除价格的三个函数,代码如下:
func SavePrice(symbol string, price decimal.Decimal) { key := "matching:price:" + symbol RedisClient.Set(key, price.String(), 0) } func GetPrice(symbol string) decimal.Decimal { key := "matching:price:" + symbol priceStr := RedisClient.Get(key).Val() result, err := decimal.NewFromString(priceStr) if err != nil { result = decimal.Zero } return result } func RemovePrice(symbol string) { key := "matching:price:" + symbol RedisClient.Del(key) }
3. 缓存订单
对订单的缓存设计则没那么简单了,需要满足两点要求:
- 既能缓存下单请求,也能缓存撤单请求;
- 订单要符合定序要求。
先说下第一点,为什么需要缓存订单?且为什么下单和撤单请求都需要缓存?
先来解答第一个问题,我们是在内存中撮合的,每个交易标的引擎里各自维护了一个交易委托账本,程序运行时,这些账本是直接保存在程序内存里的。那如果程序退出了,这些账本都被清空了。如果没有缓存,那程序重启后就无法恢复账本数据。要满足该需求,就需要缓存账本里的所有委托单。
关于第二个问题,我们来考虑这样一个场景:假如订单通道里有撤单请求在排队,而程序并没有对撤单请求做缓存,这时程序重启了,那么订单通道里的所有订单还没被引擎接收处理之前就被清空了,撤单请求也就无法恢复了。
因此,程序需要缓存好订单,且下单和撤单都需要缓存。
再来看第二个要求,为什么要符合定序?我们知道,订单通道里的订单是定序的,交易委托账本里同价格的订单也是按时间排序的,那缓存时如果不定序,程序重启后就难以保证按原有的顺序恢复订单。
那具体要怎么来设计这个订单的缓存呢?我的方案是分两类缓存,第一类保存每个独立的订单请求,包括下单和撤单;第二类分交易标的保存对应 symbol 所有订单请求的订单 ID 和 action。
第一类,我设计的 Key 格式为 matching:order:{symbol}:{orderId}:{action},symbol、orderId 和 action 则是对应订单的三个变量值。比如,某订单 symbol = “BTCUSD”,orderId = “12345”,action = “cancel”,那该订单保存到 Redis 的 Key 值就是 matching:order:BTCUSD:12345:cancel。该 Key 对应的 Value 则是保存整个订单对象,可以用 hash 类型存储。
第二类,我设计的 Key 格式为 matching:orderids:{symbol},Value 保存的是 sorted set 类型的数据,保存对应 symbol 的所有订单请求,每条记录保存的值为 {orderId}:{action},而 score 值设为对应订单的 {timestamp}。用订单时间作为 score 就可以保证定序了。还记得之前文章我们将订单时间的单位设为 100 纳秒,保证时间戳长度刚好为 16 位吗?这是因为,如果超过 16 位,那 score 将转为科学计数法表示,那将会导致数字失真。
根据这样的设计,那保存订单时的实现逻辑就如以下代码所示:
func SaveOrder(order map[string]interface{}) { symbol := order["symbol"].(string) orderId := order["orderId"].(string) timestamp := order["timestamp"].(float64) action := order["action"].(string) key := "matching:order:" + symbol + ":" + orderId + ":" + action RedisClient.HMSet(key, order) key = "matching:orderids:" + symbol z := &redis.Z{ Score: timestamp, Member: orderId + ":" + action, } RedisClient.ZAdd(key, z) }
另外,还提供了 GetOrder()、UpdateOrder()、RemoveOrder()、OrderExist()、GetOrderIdsWithAction() 等函数。再给大伙看看 GetOrderIdsWithAction() 函数的实现:
func GetOrderIdsWithAction(symbol string) []string { key := "matching:orderids:" + symbol return RedisClient.ZRange(key, 0, -1).Val() }
该函数得到的结果是根据 score 值排好序的,这就是我们想要的结果。理解了这个设计之后,再翻回去看看 process 包的初始化,你就会明白那些代码的逻辑了。
MQ的设计
我们选择了使用 Redis 的 Stream 数据结构来作为 MQ 输出,Stream 数据结构采用了类似 Kafka 的设计,应用起来很方便。但由于 Redis 运行于内存的特性,相比 Kafka 快速很多,这也是我选择它来作为撮合程序的输出 MQ 的主要原因。
我们只有两类 MQ,撤单结果和成交记录,发送消息的实现如下:
func SendCancelResult(symbol, orderId string, ok bool) { values := map[string]interface{}{"orderId": orderId, "ok": ok} a := &redis.XAddArgs{ Stream: "matching:cancelresults:" + symbol, MaxLenApprox: 1000, Values: values, } RedisClient.XAdd(a) } func SendTrade(symbol string, trade map[string]interface{}) { a := &redis.XAddArgs{ Stream: "matching:trades:" + symbol, MaxLenApprox: 1000, Values: trade, } RedisClient.XAdd(a) }
其中,matching:cancelresults:{symbol} 就是撤单结果的 MQ 所属的 Key,matching:trades:{symbol} 则是成交记录的 MQ 所属的 Key。可以看到,我们还根据不同 symbol 分不同 MQ,这样还方便下游服务可以根据需要实现分布式订阅不同 symbol 的 MQ。
小结
本小节讲解了缓存和 MQ 的设计与实现,理解了这部分的设计之后,对整个撮合引擎的核心设计也基本能理解了。
最后,依然留几个思考题:是否可以不用缓存?如果不用缓存可以如何解决去重和数据恢复的问题?