撮合引擎开发:流程的代码实现

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
日志服务 SLS,月写入数据量 50GB 1个月
简介: 笔记

程序入口


我们要开始聊代码实现逻辑了,如果不记得之前讲的目录结构,请回去翻看前文。聊代码实现的第一步自然从程序入口开始,核心就两个函数:init()main(),其代码如下:

package main
... //other codes
func init() {
  initViper()
  initLog()
  engine.Init()
  middleware.Init()
  process.Init()
}
func main() {
  mux := http.NewServeMux()
  mux.HandleFunc("/openMatching", handler.OpenMatching)
  mux.HandleFunc("/closeMatching", handler.CloseMatching)
  mux.HandleFunc("/handleOrder", handler.HandleOrder)
  log.Printf("HTTP ListenAndServe at port %s", viper.GetString("server.port"))
  if err := http.ListenAndServe(viper.GetString("server.port"), mux); err != nil {
    panic(err)
  }
}

init() 函数做了一些初始化的操作,我来简单介绍这几个初始化函数:

  • initViper():配置文件初始化,使用了第三方配置库 viper,这是一个被广泛使用的配置库,其 github 地址为 github.com/spf13/viper
  • initLog():日志初始化,程序主要使用自己定义的日志包用来输出日志文件,该日志包的实现后续文章再单独讲。
  • engine.Init():引擎包的初始化,只是初始化了一个 map,用来保存不同交易标的的订单 channel,作为各交易标的的定序队列来用。
  • middleware.Init():中间件的初始化,我们用到的中间件就只有 Redis,所以这里其实就是初始化 Redis 连接。Redis 客户端库方面我选择的是 go-redis/redis
  • process.Init():这一步主要是从缓存加载和恢复各交易标的引擎的启动和所有订单数据。

viper 和 redis 的初始化都是参照官方 demo 写的,这里就不展开说明了。log 后续再单独讲。engine 包和 process 包的初始化就需要好好讲讲。

其中,引擎包的初始化虽然非常简单,但很关键,其代码写在 engine/init.go 文件中,完整代码如下:

package engine
var ChanMap map[string]chan Order
func Init() {
  ChanMap = make(map[string]chan Order)
}

这个保存通道的 map,其 Key 是各交易标的的 symbol,即是说每个交易标的各有一个订单通道,这些订单通道将作为每个交易标的的定序队列。

process 包的初始化则如下:

func Init() {
  symbols := cache.GetSymbols()
  for _, symbol := range symbols {
    price := cache.GetPrice(symbol)
    NewEngine(symbol, price)
    orderIds := cache.GetOrderIdsWithAction(symbol)
    for _, orderId := range orderIds {
      mapOrder := cache.GetOrder(symbol, orderId)
      order := engine.Order{}
      order.FromMap(mapOrder)
      engine.ChanMap[order.Symbol] <- order
    }
  }
}

简单讲解下实现逻辑:

  1. 从缓存读取所有 symbol,即程序重启之前,已经开启了撮合的所有交易标的的 symbol;
  2. 从缓存读取每个 symbol 对应的价格,这是程序重启前的最新成交价格;
  3. 启动每个 symbol 的撮合引擎;
  4. 从缓存读取每个 symbol 的所有订单,这些订单都是按时间顺序排列的;
  5. 按顺序将这些订单添加到对应 symbol 的订单通道里去。

如果对这里面有些设计逻辑还不太明白的话,也没关系,后面讲到对应模块时会再详细说明。

main() 函数里,定义了我们之前所说的三个接口,分别交由对应的 handler 去处理具体的请求,之后就启动 http 服务了。


handler


因为只有几个接口,而且也很简单,因此,并没有引入第三方 web 框架,handler 都是用原生实现的。先来看看 OpenMatching 的完整实现:

package handler
import (
  "encoding/json"
  "io/ioutil"
  "net/http"
  "strings"
  "matching/errcode"
  "matching/process"
  "github.com/shopspring/decimal"
)
type openMatchingParams struct {
  Symbol string          `json:"symbol"`
  Price  decimal.Decimal `json:"price"`
}
func OpenMatching(w http.ResponseWriter, r *http.Request) {
  w.Header().Set("Content-Type", "application/json")
  if r.Method != http.MethodPost {
    w.WriteHeader(http.StatusMethodNotAllowed)
    return
  }
  body, err := ioutil.ReadAll(r.Body)
  if err != nil {
    w.WriteHeader(http.StatusBadRequest)
    return
  }
  var params openMatchingParams
  if err := json.Unmarshal(body, &params); err != nil {
    w.WriteHeader(http.StatusBadRequest)
    return
  }
  if strings.TrimSpace(params.Symbol) == "" {
    w.Write(errcode.BlankSymbol.ToJson())
    return
  }
  if params.Price.IsNegative() {
    w.Write(errcode.InvalidPrice.ToJson())
    return
  }
  if e := process.NewEngine(params.Symbol, params.Price); !e.IsOK() {
    w.Write(e.ToJson())
    return
  }
  w.Write(errcode.OK.ToJson())
}

逻辑非常简单,先判断是否为 POST 请求,再读取 body 里的数据并转为结构体对象,接着对参数做个简单的检查,最后就调用 process.NewEngine(symbol, price) 进入下一步的业务逻辑,如果结果返回是 OK,也返回 OK 作为请求的响应。

另外,用到了第三方的 decimal.Decimal 类型用来表示价格,整个程序都统一用 decimal 来表示浮点数和做精确计算。

CloseMatchingHandleOrder 的实现逻辑也是同理,CloseMatching 最后会调用 process.CloseEngine(symbol) 函数进入下一步的处理,HandleOrder 最后则调用 process.Dispatch(order) 进入下一步。不过,Order 结构体是定义在 engine 包的,其结构如下:

type Order struct {
  Action    enum.OrderAction `json:"action"`
  Symbol    string           `json:"symbol"`
  OrderId   string           `json:"orderId"`
  Side      enum.OrderSide   `json:"side"`
  Type      enum.OrderType   `json:"type"`
  Amount    decimal.Decimal  `json:"amount"`
  Price     decimal.Decimal  `json:"price"`
  Timestamp int64            `json:"timestamp"`
}

可以看到,其中的字段,除了有 Decimal 类型,还有 enum 包的几个类型,这几个其实是我们程序中自己定义的枚举类型。Golang 语言本身并没有提供和其他语言一样的 enum 关键字来定义枚举类型,所以一般采用类型定义+常量来模拟枚举类型,以 enum.OrderAction 为例:


type OrderAction string
const (
  ActionCreate OrderAction = "create"
  ActionCancel OrderAction = "cancel"
)

其他几个枚举类型也是这样定义的。

另外,为了方便转为字符串和检验参数是否有效,程序中还为每个枚举类型分别提供了两个函数,还是以 OrderAction 为例:

func (o OrderAction) String() string {
  switch o {
  case ActionCreate:
    return "create"
  case ActionCancel:
    return "cancel"
  default:
    return "unknown"
  }
}
func (o OrderAction) Valid() bool {
  if o.String() == "unknown" {
    return false
  }
  return true
}

其他几个枚举类型也都定义了类似的两个函数,就不再贴代码了。


process 包


来回顾下 process 包有哪些文件:

└── process                  #
    ├── close_engine.go      # 关闭引擎
    ├── dispatch.go          # 分发订单
    ├── init.go              # 初始化
    └── new_engine.go        # 启动新引擎

init.go 就一个初始化函数,上文已经讲了。其他三个文件分别定义了上文三个 handler 对应的下一步逻辑实现。


启动新引擎

先来看看 new_engine.go

package process
import (
  "matching/engine"
  "matching/errcode"
  "matching/middleware/cache"
  "github.com/shopspring/decimal"
)
func NewEngine(symbol string, price decimal.Decimal) *errcode.Errcode {
  if engine.ChanMap[symbol] != nil {
    return errcode.EngineExist
  }
  engine.ChanMap[symbol] = make(chan engine.Order, 100)
  go engine.Run(symbol, price)
  cache.SaveSymbol(symbol)
  cache.SavePrice(symbol, price)
  return errcode.OK
}

逻辑也是比较简单的,第一步先判断 ChanMap[symbol] 是否为空,该 ChanMap 就是上文所说的引擎包初始化时用来保存订单通道的 map。如果 ChanMap[symbol] 不为空,说明该 symbol 的撮合引擎已经启动过了,那就返回错误。如果为空,那就初始化这个 symbol 的通道,从代码可知,ChanMap[symbol] 初始化为一个缓冲大小为 100 的订单通道。

接着,就调用 engine.Run() 启动一个 goroutine 了,这行代码即表示用 goroutine 的方式启动指定 symbol 的撮合引擎了。

然后,就将 symbol 和 price 都缓存起来了。

最后,返回 OK,搞定。


2. 分发订单

接着,来看看 Dispatch 的实现又是怎样的:

func Dispatch(order engine.Order) *errcode.Errcode {
  if engine.ChanMap[order.Symbol] == nil {
    return errcode.EngineNotFound
  }
  if order.Action == enum.ActionCreate {
    if cache.OrderExist(order.Symbol, order.OrderId, order.Action.String()) {
      return errcode.OrderExist
    }
  } else {
    if !cache.OrderExist(order.Symbol, order.OrderId, enum.ActionCreate.String()) {
      return errcode.OrderNotFound
    }
  }
  order.Timestamp = time.Now().UnixNano() / 1e3
  cache.SaveOrder(order.ToMap())
  engine.ChanMap[order.Symbol] <- order
  return errcode.OK
}

第一步,判断 ChanMap[order.Symbol] 是否为空,如果为空,表示引擎没开启,那就无法处理订单。

第二步,判断订单是否存在。如果是 create 订单,那缓存中就不应该查到订单,否则说明是重复请求。如果是 cancel 订单,那缓存中如果也查不到订单,那说明该订单已经全部成交或已经成功撤单过了。

第三步,将订单时间设为当前时间,时间单位是 100 纳秒,这可以保证时间戳长度刚好为 16 位,保存到 Redis 里就不会有精度失真的问题。这点后续文章讲到 Redis 详细设计时再说。

第四步,将订单缓存。

第五步,将订单传入对应的订单通道,对应引擎会从该通道中获取该订单进行处理。这一步就实现了订单的分发。

第六步,返回 OK。

3. 关闭引擎

关闭引擎的实现就非常简单了,请看代码:

func CloseEngine(symbol string) *errcode.Errcode {
  if engine.ChanMap[symbol] == nil {
    return errcode.EngineNotFound
  }
  close(engine.ChanMap[symbol])
  return errcode.OK
}

核心代码就一行,将对应 symbol 的订单通道关闭。后续的处理其实是在引擎里完成的,待会我们再结合引擎里的代码来讲解这个设计。


引擎入口的实现


交易引擎 goroutine 的启动入口就是 engine.Run() 函数,来看看其代码实现:

func Run(symbol string, price decimal.Decimal) {
  lastTradePrice := price
  book := &orderBook{}
  book.init()
  log.Info("engine %s is running", symbol)
  for {
    order, ok := <-ChanMap[symbol]
    if !ok {
      log.Info("engine %s is closed", symbol)
      delete(ChanMap, symbol)
      cache.Clear(symbol)
      return
    }
    log.Info("engine %s receive an order: %s", symbol, order.ToJson())
    switch order.Action {
    case enum.ActionCreate:
      dealCreate(&order, book, &lastTradePrice)
    case enum.ActionCancel:
      dealCancel(&order, book)
    }
  }
}

第一步,先定义和初始化了一个 book 变量,该变量就是用来保存整个交易委托账本

接着,就是一个 for 循环了,for 循环里的第一行就是从对应 symbol 的订单通道里读取出一个订单,读取到订单时,order 变量就会有值,且 ok 变量为 true。如果通道里暂时没有订单,那就会阻塞在这行代码,直到从通道中获取到订单或通道已关闭的消息。

当通道被关闭之后,最后,从通道中读取到的 ok 变量则为 false,当然,在这之前,会先依序读取完通道里剩下的订单。当 ok 为 false 时,引擎里会执行两步操作:一是从 ChanMap 中删除该 symbol 对应的记录,二是清空该 symbol 对应的缓存数据。最后用 return 来退出 for 循环,这样,整个 Run() 函数就结束退出了,意味着该引擎也真正关闭了。

当每读取到一个订单,就会判断是下单还是撤单,然后进行相应的逻辑处理了。

我们先来看看撤单的逻辑,这个比较简单:

func dealCancel(order *Order, book *orderBook) {
  var ok bool
  switch order.Side {
  case enum.SideBuy:
    ok = book.removeBuyOrder(order)
  case enum.SideSell:
    ok = book.removeSellOrder(order)
  }
  cache.RemoveOrder(order.ToMap())
  mq.SendCancelResult(order.Symbol, order.OrderId, ok)
  log.Info("engine %s, order %s cancel result is %s", order.Symbol, order.OrderId, ok)
}

核心就三个步骤:

  1. 从委托账本中移除该订单;
  2. 从缓存中移除该订单;
  3. 发送撤单结果到 MQ。

下单逻辑就比较复杂了,需要根据不同的订单类型做不同的逻辑处理,请看代码:

func dealCreate(order *Order, book *orderBook, lastTradePrice *decimal.Decimal) {
  switch order.Type {
  case enum.TypeLimit:
    dealLimit(order, book, lastTradePrice)
  case enum.TypeLimitIoc:
    dealLimitIoc(order, book, lastTradePrice)
  case enum.TypeMarket:
    dealMarket(order, book, lastTradePrice)
  case enum.TypeMarketTop5:
    dealMarketTop5(order, book, lastTradePrice)
  case enum.TypeMarketTop10:
    dealMarketTop10(order, book, lastTradePrice)
  case enum.TypeMarketOpponent:
    dealMarketOpponent(order, book, lastTradePrice)
  }
}

每个类型再分买卖方向处理,以 dealLimit() 为例:

func dealLimit(order *Order, book *orderBook, lastTradePrice *decimal.Decimal) {
  switch order.Side {
  case enum.SideBuy:
    dealBuyLimit(order, book, lastTradePrice)
  case enum.SideSell:
    dealSellLimit(order, book, lastTradePrice)
  }
}

然后,再来看看 dealBuyLimit() 的处理逻辑:

func dealBuyLimit(order *Order, book *orderBook, lastTradePrice *decimal.Decimal) {
LOOP:
  headOrder := book.getHeadSellOrder()
  if headOrder == nil || order.Price.LessThan(headOrder.Price) {
    book.addBuyOrder(order)
    log.Info("engine %s, a order has added to the orderbook: %s", order.Symbol, order.ToJson())
  } else {
    matchTrade(headOrder, order, book, lastTradePrice)
    if order.Amount.IsPositive() {
      goto LOOP
    }
  }
}

我来解析下这个处理流程:

  1. 从委托账本中读取出卖单队列的头部订单;
  2. 如果头部订单为空,或新订单(买单)价格小于头部订单(卖单),则无法匹配成交,那就将新订单添加到委托账本的买单队列中去;
  3. 如果头部订单不为空,且新订单(买单)价格大于等于头部订单(卖单),则两个订单可以匹配成交,那就对这两个订单进行成交处理;
  4. 如果上一步的成交处理完之后,新订单的剩余数量还不为零,那就继续重复第一步。

其中,匹配成交的记录会作为一条输出记录发送到 MQ。

对其他类型的处理也是类似的,就不再一一讲解了。

那引擎包的实现就先讲到这里,后续文章再聊其他部分的实现。


小结


本小节主要还是通过代码梳理清楚整个数据流程,包括一些细节上的设计。理解了本文所列举的这些代码,也就对整个撮合服务的实现理解一大半了。

这次的思考题:ChanMap 保存的订单通道是否可以改用无缓冲的通道?用无缓冲的通道和用有缓冲的通道处理逻辑有哪些不同?两种方案各自的优缺点是什么?


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
7月前
|
安全 区块链 数据库
DAPP持币生息项目系统开发|步骤逻辑|源码案例
智能保证执行安全,并减少交易成本。智能合约允许在没有第三方的情况下进行可信交易,且交易可追踪、不可逆转
预约抢单互助系统开发详细功能/需求方案/步骤功能/逻辑项目/源码案例
The development model of appointment and order grabbing mutual assistance system is a widely used development model on mutual assistance service platforms. It adopts a combination of appointment and order grabbing modes, allowing users to make appointments or actively participate in mutual assistanc
|
7月前
|
安全
外汇交易所系统开发规则玩法/步骤逻辑/方案项目/教程指南/源码流程
The development of foreign exchange system involves a series of functions and features, aiming to provide a safe, efficient, transparent, and reliable trading platform for foreign exchange trading. The following are the functions that may be involved in the development of the foreign exchange exchan
|
7月前
|
新零售 人工智能 搜索推荐
2+1链动互助模式系统开发|项目方案|流程分析
对于消费者而言,我们已经习惯了便捷的网络购物方式,但是网购我们无法了解商品的质量,
|
7月前
|
安全
什么是外汇交易所系统开发步骤详细丨案例设计丨需求逻辑丨源码项目
The foreign exchange system is one of the key systems in the financial field, providing investors with foreign exchange trading services. When developing a foreign exchange exchange system
怎么做问答推广?问答营销的步骤和逻辑
怎么做问答推广?问答营销的步骤和逻辑
|
存储 前端开发 安全
dapp矩阵公排互助预约排单抢单项目系统开发指南流程丨案例设计丨功能逻辑丨规则玩法丨项目方案丨源码程序
需求分析:与团队明确系统的需求和目标,包括公排互助预约排单抢单项目系统的功能、规则、奖励机制等方面。
|
存储 安全 前端开发
DApp公排互助预约抢单排单模式系统开发参考版/详细流程/方案逻辑/规则玩法/案例设计/源码程序
需求分析:与团队明确系统的需求、目标和范围,包括公排互助预约抢单排单模式系统的功能、规则、奖励机制等方面
|
敏捷开发 存储 测试技术
链动2+1系统开发项目案例丨指南教程丨需求方案丨功能设计丨成熟技术丨步骤逻辑丨源码程序
用户需求导向:系统开发应以用户需求为中心,从用户的角度思考,了解用户的真实需求和期望,以提供优质的用户体验。