BackTrader 中文文档(十四)(2)https://developer.aliyun.com/article/1505354
运行所有并评估它!
拥有数据加载器类和策略是不够的。就像任何其他框架一样,需要一些样板。以下代码使其成为可能。
def run(args=None): args = parse_args(args) cerebro = bt.Cerebro() # Data feed kwargs dkwargs = dict(**eval('dict(' + args.dargs + ')')) # Parse from/to-date dtfmt, tmfmt = '%Y-%m-%d', 'T%H:%M:%S' if args.fromdate: fmt = dtfmt + tmfmt * ('T' in args.fromdate) dkwargs['fromdate'] = datetime.datetime.strptime(args.fromdate, fmt) if args.todate: fmt = dtfmt + tmfmt * ('T' in args.todate) dkwargs['todate'] = datetime.datetime.strptime(args.todate, fmt) # add all the data files available in the directory datadir for fname in glob.glob(os.path.join(args.datadir, '*')): data = NetPayOutData(dataname=fname, **dkwargs) cerebro.adddata(data) # add strategy cerebro.addstrategy(St, **eval('dict(' + args.strat + ')')) # set the cash cerebro.broker.setcash(args.cash) cerebro.run() # execute it all # Basic performance evaluation ... final value ... minus starting cash pnl = cerebro.broker.get_value() - args.cash print('Profit ... or Loss: {:.2f}'.format(pnl))
在以下情况下完成:
- 解析参数并使其可用(这显然是可选的,因为一切都可以硬编码,但是良好的实践是好的实践)
- 创建一个
cerebro
引擎实例。是的,这是西班牙语中的“大脑”,是框架的一部分,负责在黑暗中协调编排的操作。虽然它可以接受几个选项,但默认值对于大多数用例来说应该足够了。 - 加载数据文件,使用
args.datadir
的简单目录扫描完成,并使用NetPayOutData
加载所有文件,并将其添加到cerebro
实例中 - 添加策略
- 设置现金,默认为
1,000,000
。考虑到使用情况是100
支股票在500
支股票的宇宙中,似乎有些现金是可以用的。这也是一个可以更改的参数。 - 并调用
cerebro.run()
- 最后评估性能
为了能够直接从命令行运行具有不同参数的事务,下面提供了一个启用了argparse
的样板,其中包含了整个代码
性能评估
通过最终结果值的形式添加了一个简单的性能评估,即:最终净资产价值减去起始现金。
backtrader生态系统提供了一组内置性能分析器,也可以使用,如:SharpeRatio
、Variability-Weighted Return
、SQN
等。参见文档 - 分析器参考
完整的脚本
最后,作品的大部分呈现为整体。享受吧!
import argparse import datetime import glob import os.path import backtrader as bt class NetPayOutData(bt.feeds.GenericCSVData): lines = ('npy',) # add a line containing the net payout yield params = dict( npy=6, # npy field is in the 6th column (0 based index) dtformat='%Y-%m-%d', # fix date format a yyyy-mm-dd timeframe=bt.TimeFrame.Months, # fixed the timeframe openinterest=-1, # -1 indicates there is no openinterest field ) class St(bt.Strategy): params = dict( selcperc=0.10, # percentage of stocks to select from the universe rperiod=1, # period for the returns calculation, default 1 period vperiod=36, # lookback period for volatility - default 36 periods mperiod=12, # lookback period for momentum - default 12 periods reserve=0.05 # 5% reserve capital ) def log(self, arg): print('{} {}'.format(self.datetime.date(), arg)) def __init__(self): # calculate 1st the amount of stocks that will be selected self.selnum = int(len(self.datas) * self.p.selcperc) # allocation perc per stock # reserve kept to make sure orders are not rejected due to # margin. Prices are calculated when known (close), but orders can only # be executed next day (opening price). Price can gap upwards self.perctarget = (1.0 - self.p.reserve) / self.selnum # returns, volatilities and momentums rs = [bt.ind.PctChange(d, period=self.p.rperiod) for d in self.datas] vs = [bt.ind.StdDev(ret, period=self.p.vperiod) for ret in rs] ms = [bt.ind.ROC(d, period=self.p.mperiod) for d in self.datas] # simple rank formula: (momentum * net payout) / volatility # the highest ranked: low vol, large momentum, large payout self.ranks = {d: d.npy * m / v for d, v, m in zip(self.datas, vs, ms)} def next(self): # sort data and current rank ranks = sorted( self.ranks.items(), # get the (d, rank), pair key=lambda x: x[1][0], # use rank (elem 1) and current time "0" reverse=True, # highest ranked 1st ... please ) # put top ranked in dict with data as key to test for presence rtop = dict(ranks[:self.selnum]) # For logging purposes of stocks leaving the portfolio rbot = dict(ranks[self.selnum:]) # prepare quick lookup list of stocks currently holding a position posdata = [d for d, pos in self.getpositions().items() if pos] # remove those no longer top ranked # do this first to issue sell orders and free cash for d in (d for d in posdata if d not in rtop): self.log('Leave {} - Rank {:.2f}'.format(d._name, rbot[d][0])) self.order_target_percent(d, target=0.0) # rebalance those already top ranked and still there for d in (d for d in posdata if d in rtop): self.log('Rebal {} - Rank {:.2f}'.format(d._name, rtop[d][0])) self.order_target_percent(d, target=self.perctarget) del rtop[d] # remove it, to simplify next iteration # issue a target order for the newly top ranked stocks # do this last, as this will generate buy orders consuming cash for d in rtop: self.log('Enter {} - Rank {:.2f}'.format(d._name, rtop[d][0])) self.order_target_percent(d, target=self.perctarget) def run(args=None): args = parse_args(args) cerebro = bt.Cerebro() # Data feed kwargs dkwargs = dict(**eval('dict(' + args.dargs + ')')) # Parse from/to-date dtfmt, tmfmt = '%Y-%m-%d', 'T%H:%M:%S' if args.fromdate: fmt = dtfmt + tmfmt * ('T' in args.fromdate) dkwargs['fromdate'] = datetime.datetime.strptime(args.fromdate, fmt) if args.todate: fmt = dtfmt + tmfmt * ('T' in args.todate) dkwargs['todate'] = datetime.datetime.strptime(args.todate, fmt) # add all the data files available in the directory datadir for fname in glob.glob(os.path.join(args.datadir, '*')): data = NetPayOutData(dataname=fname, **dkwargs) cerebro.adddata(data) # add strategy cerebro.addstrategy(St, **eval('dict(' + args.strat + ')')) # set the cash cerebro.broker.setcash(args.cash) cerebro.run() # execute it all # Basic performance evaluation ... final value ... minus starting cash pnl = cerebro.broker.get_value() - args.cash print('Profit ... or Loss: {:.2f}'.format(pnl)) def parse_args(pargs=None): parser = argparse.ArgumentParser( formatter_class=argparse.ArgumentDefaultsHelpFormatter, description=('Rebalancing with the Conservative Formula'), ) parser.add_argument('--datadir', required=True, help='Directory with data files') parser.add_argument('--dargs', default='', metavar='kwargs', help='kwargs in k1=v1,k2=v2 format') # Defaults for dates parser.add_argument('--fromdate', required=False, default='', help='Date[time] in YYYY-MM-DD[THH:MM:SS] format') parser.add_argument('--todate', required=False, default='', help='Date[time] in YYYY-MM-DD[THH:MM:SS] format') parser.add_argument('--cerebro', required=False, default='', metavar='kwargs', help='kwargs in k1=v1,k2=v2 format') parser.add_argument('--cash', default=1000000.0, type=float, metavar='kwargs', help='kwargs in k1=v1,k2=v2 format') parser.add_argument('--strat', required=False, default='', metavar='kwargs', help='kwargs in k1=v1,k2=v2 format') return parser.parse_args(pargs) if __name__ == '__main__': run()
MFI 通用
原文:
www.backtrader.com/blog/2019-07-17-mfi-generic/mfi-generic/
在最近的规范与非规范文章中,开发了MFI
(也称为MoneyFlowIndicator
)。
尽管它是以规范方式开发的,但仍然存在一些改进和通用化的空间。
让我们关注实现的第 1 行,创建典型价格的行
class MFI_Canonical(bt.Indicator): lines = ('mfi',) params = dict(period=14) def __init__(self): tprice = (self.data.close + self.data.low + self.data.high) / 3.0 mfraw = tprice * self.data.volume ...
典型的实例化可能如下所示
class MyMFIStrategy(bt.Strategy): def __init__(self): mfi = bt.MFI_Canonical(self.data)
这里的问题应该是显而易见的:“需要为具有close
、low
、high
和volume
组件(也称为backtrader生态系统中的lines)的指标提供输入”
当然,可能会有这样一种情况,即希望使用来自不同数据源(数据源的线或其他指标的线)的组件创建MoneyFlowIndicator
,就像想要给close
赋予更大的权重一样,而无需开发特定的指标。考虑到行业标准的OHLCV
字段排序,一个多输入、额外加权close
的实例化可能如下所示
class MyMFIStrategy2(bt.Strategy): def __init__(self): wclose = self.data.close * 5.0 mfi = bt.MFI_Canonical(self.data.high, self.data.low, wclose, self.data.volume)
或者因为用户之前使用过ta-lib
,喜欢多输入样式。
支持多个输入
backtrader 尽可能地遵循pythonic的原则,self.datas
数组包含系统中数据源的列表(并且自动提供给您的策略),可以查询其长度。让我们使用这个来区分调用者想要的内容,并正确计算tprice
和mfraw
`def init(self):
if len(self.datas) == 1: # 传递了 1 个数据源,必须有分量 tprice = (self.data.close + self.data.low + self.data.high) / 3.0 mfraw = tprice * self.data.volume 否则: # 如果有多个数据源,则按照 OHLCV 的顺序提取各个分量 tprice = (self.data0 + self.data1 + self.data2) / 3.0 mfraw = tprice * self.data3 # 与之前的实现相比没有变化 flowpos = bt.ind.SumN(mfraw * (tprice > tprice(-1)), period=self.p.period) flowneg = bt.ind.SumN(mfraw * (tprice < tprice(-1)), period=self.p.period) mfiratio = bt.ind.DivByZero(flowpos, flowneg, zero=100.0) self.l.mfi = 100.0 - 100.0 / (1.0 + mfiratio)`
注意
请注意,如何引用各个分量,例如self.dataX
(例如self.data0
、self.data1
)
这与使用self.datas[x]
相同,如self.datas[0]
…
让我们从图形上看到,这个指标产生了与规范相同的结果,当多个输入对应于数据源的原始组件时也是如此。为此,它将在策略中运行,如下所示
class MyMFIStrategy2(bt.Strategy): def __init__(self): MFI_Canonical(self.data) MFI_MultipleInputs(self.data, plotname='MFI Single Input') MFI_MultipleInputs(self.data.high, self.data.low, self.data.close, self.data.volume, plotname='MFI Multiple Inputs')
无需每个值都进行检查,从图片上显然可以看出这三个结果是相同的。
最后让我们看看如果给close
加上更多的权重会发生什么。让我们这样运行。
class MyMFIStrategy2(bt.Strategy): def __init__(self): MFI_MultipleInputs(self.data) MFI_MultipleInputs(self.data.high, self.data.low, self.data.close * 5.0, self.data.volume, plotname='MFI Close * 5.0')
这是否有意义留给读者决定,但可以清楚地看到给close
添加权重已经改变了模式。
结论
通过简单使用 Pythonic 的len
,一个人可以将一个使用多个组件(和固定名称)的数据源的指标转换为接受多个通用输入的指标。
规范与非规范指标
原文:
www.backtrader.com/blog/2019-07-08-canonical-or-not/canonical-or-not/
这个问题已经出现了几次,或多或少地是这样的:
- 如何使用backtrader最佳/规范地实现这个或那个?
作为backtrader的目标之一是灵活地支持尽可能多的情况和用例,答案很简单:“至少有几种方法”。总结一下指标,这是最常见的问题:
__init__
方法中的 100%声明next
方法中的 100%逐步操作- 在复杂情况下,将上述两者混合在一起,以满足声明部分无法覆盖所有所需计算的需求。
对backtrader内置指标的快速查看表明,它们都是以声明方式实现的。原因是
- 更容易做到
- 更易读
- 更优雅
- 矢量化和基于事件的实现会自动管理
什么?!?!自动实现矢量化??
是的。如果一个指标完全在__init_
方法中实现,Python 中的元类和运算符重载的魔法将产生以下结果
- 矢量化实现(在运行回测时的默认设置)
- 基于事件的实现(例如用于实时交易)
另一方面,如果指标的任何部分在next
方法中实现:
- 这是直接用于基于事件的运行的代码。
- 矢量化将通过在后台为每个数据点调用
next
方法来模拟
注意
这意味着即使某个特定指标没有矢量化实现,所有其他具有矢量化实现的指标仍将以矢量化方式运行
资金流指数:一个例子
社区用户*@Rodrigo Brito发布了一个使用next
方法进行实现的"资金流指数(Money Flow Index)"*指标的版本。
代码
class MFI(bt.Indicator): lines = ('mfi', 'money_flow_raw', 'typical', 'money_flow_pos', 'money_flow_neg') plotlines = dict( money_flow_raw=dict(_plotskip=True), money_flow_pos=dict(_plotskip=True), money_flow_neg=dict(_plotskip=True), typical=dict(_plotskip=True), ) params = ( ('period', 14), ) def next(self): typical_price = (self.data.close[0] + self.data.low[0] + self.data.high[0]) / 3 money_flow_raw = typical_price * self.data.volume[0] self.lines.typical[0] = typical_price self.lines.money_flow_raw[0] = money_flow_raw self.lines.money_flow_pos[0] = money_flow_raw if self.lines.typical[0] >= self.lines.typical[-1] else 0 self.lines.money_flow_neg[0] = money_flow_raw if self.lines.typical[0] <= self.lines.typical[-1] else 0 pos_period = math.fsum(self.lines.money_flow_pos.get(size=self.p.period)) neg_period = math.fsum(self.lines.money_flow_neg.get(size=self.p.period)) if neg_period == 0: self.lines.mfi[0] = 100 return self.lines.mfi[0] = 100 - 100 / (1 + pos_period / neg_period)
注意
保持原样发布,包括需要水平滚动的长行
@Rodrigo Brito 已经注意到临时线条的使用(除了mfi
之外的所有线条)可能需要优化。确实,但在作者的谦逊意见中,实际上一切都可以稍加优化。
为了有共同的工作基础,可以使用StockCharts的*“资金流指数(Money Flow Index)”*定义,并查看上述实现是否良好。这是链接:
有了这个,一个快速的规范实现MFI
指标
class MFI_Canonical(bt.Indicator): lines = ('mfi',) params = dict(period=14) def __init__(self): tprice = (self.data.close + self.data.low + self.data.high) / 3.0 mfraw = tprice * self.data.volume flowpos = bt.ind.SumN(mfraw * (tprice > tprice(-1)), period=self.p.period) flowneg = bt.ind.SumN(mfraw * (tprice < tprice(-1)), period=self.p.period) mfiratio = bt.ind.DivByZero(flowpos, flowneg, zero=100.0) self.l.mfi = 100.0 - 100.0 / (1.0 + mfiratio)
人们应该立即注意到
- 定义了一个单行
mfi
。没有临时变量。 - 没有需要
[0]
数组索引的需求,看起来更干净 - 这里或那里没有单个
if
- 更紧凑但更易读
如果将两个运行对同一数据集绘制的图表,会是这样的
图表显示,规范和非规范版本在开始时除外,显示相同的值和发展。
- 非规范版本从一开始就提供值
- 它提供了无意义的值(100.0,直到提供额外的 1 个值,这也不好),因为它无法正确地提供
相比之下:
- 规范版本在达到最短预热时间后自动开始提供数值。
- 没有人为干预是必需的(肯定是 “人工智能” 或 “机器学习”,… 双关语)
查看受影响区域的近景图片
注意
当然,可以尝试通过以下方式缓解非规范版本的这种情况:
- 从已经具有
period
参数并知道如何处理它的bt.ind.PeriodN
子类化(并在__init__
期间调用super
)
还要注意,规范版本也像逐步next
代码一样考虑了公式中可能出现的除零情况。
if neg_period == 0: self.lines.mfi[0] = 100 return self.lines.mfi[0] = 100 - 100 / (1 + pos_period / neg_period)
这是另一种方法
mfiratio = bt.ind.DivByZero(flowpos, flowneg, zero=100.0) self.l.mfi = 100.0 - 100.0 / (1.0 + mfiratio)
不是有很多行,一个return
语句和对输出行的不同赋值,而是对mfiratio
计算的单个声明和对输出行mfi
的单个赋值(按照StockCharts公式)
结论
希望这能解释在规范(即:在__init__
中声明式)或非规范方式(逐步进行,使用数组索引在next
中)中实现某些内容时的差异。
BackTrader 中文文档(十四)(4)https://developer.aliyun.com/article/1505357