Python 机器学习算法交易实用指南(一)(2)https://developer.aliyun.com/article/1523716
重建交易和订单簿
解析的消息允许我们重建给定日期的订单流。'R'
消息类型包含在给定日期内交易的所有股票列表,包括有关首次公开募股(IPOs)和交易限制的信息。
在一天的交易过程中,会添加新订单,并删除执行和取消的订单。对于引用前一日期放置的订单的消息,进行适当的会计处理需要跟踪多天的订单簿,但我们在此忽略了此方面的内容。
get_messages()
函数说明了如何收集影响交易的单一股票的订单(有关每个消息的详细信息,请参阅 ITCH 规范,略有简化,请参阅笔记本):
def get_messages(date, stock=stock): """Collect trading messages for given stock""" with pd.HDFStore(itch_store) as store: stock_locate = store.select('R', where='stock = stock').stock_locate.iloc[0] target = 'stock_locate = stock_locate' data = {} # relevant message types messages = ['A', 'F', 'E', 'C', 'X', 'D', 'U', 'P', 'Q'] for m in messages: data[m] = store.select(m, where=target).drop('stock_locate', axis=1).assign(type=m) order_cols = ['order_reference_number', 'buy_sell_indicator', 'shares', 'price'] orders = pd.concat([data['A'], data['F']], sort=False, ignore_index=True).loc[:, order_cols] for m in messages[2: -3]: data[m] = data[m].merge(orders, how='left') data['U'] = data['U'].merge(orders, how='left', right_on='order_reference_number', left_on='original_order_reference_number', suffixes=['', '_replaced']) data['Q'].rename(columns={'cross_price': 'price'}, inplace=True) data['X']['shares'] = data['X']['cancelled_shares'] data['X'] = data['X'].dropna(subset=['price']) data = pd.concat([data[m] for m in messages], ignore_index=True, sort=False)
重建成功的交易,即作为与被取消的订单相对的订单的交易相关消息,C
、E
、P
和 Q
,相对比较简单:
def get_trades(m): """Combine C, E, P and Q messages into trading records""" trade_dict = {'executed_shares': 'shares', 'execution_price': 'price'} cols = ['timestamp', 'executed_shares'] trades = pd.concat([m.loc[m.type == 'E', cols + ['price']].rename(columns=trade_dict), m.loc[m.type == 'C', cols + ['execution_price']].rename(columns=trade_dict), m.loc[m.type == 'P', ['timestamp', 'price', 'shares']], m.loc[m.type == 'Q', ['timestamp', 'price', 'shares']].assign(cross=1), ], sort=False).dropna(subset=['price']).fillna(0) return trades.set_index('timestamp').sort_index().astype(int)
订单簿跟踪限价订单,并且买入和卖出订单的各种价格水平构成了订单簿的深度。要重建给定深度级别的订单簿,需要以下步骤:
add_orders()
函数累积卖单按升序排列,买单按降序排列,以给定时间戳为基础直至达到所需的深度级别:
def add_orders(orders, buysell, nlevels): new_order = [] items = sorted(orders.copy().items()) if buysell == -1: items = reversed(items) for i, (p, s) in enumerate(items, 1): new_order.append((p, s)) if i == nlevels: break return orders, new_order
- 我们遍历所有 ITCH 消息,并根据规范要求处理订单及其替换:
for message in messages.itertuples(): i = message[0] if np.isnan(message.buy_sell_indicator): continue message_counter.update(message.type) buysell = message.buy_sell_indicator price, shares = None, None if message.type in ['A', 'F', 'U']: price, shares = int(message.price), int(message.shares) current_orders[buysell].update({price: shares}) current_orders[buysell], new_order = add_orders(current_orders[buysell], buysell, nlevels) order_book[buysell][message.timestamp] = new_order if message.type in ['E', 'C', 'X', 'D', 'U']: if message.type == 'U': if not np.isnan(message.shares_replaced): price = int(message.price_replaced) shares = -int(message.shares_replaced) else: if not np.isnan(message.price): price = int(message.price) shares = -int(message.shares) if price is not None: current_orders[buysell].update({price: shares}) if current_orders[buysell][price] <= 0: current_orders[buysell].pop(price) current_orders[buysell], new_order = add_orders(current_orders[buysell], buysell, nlevels) order_book[buysell][message.timestamp] = new_order
不同价格水平上的订单数量,在以下截图中使用不同强度的颜色突出显示买入和卖出订单的深度流动性。左侧面板显示了限价订单价格分布偏向于更高价格的买单。右侧面板绘制了交易日内限价订单和价格的演变:深色线跟踪了市场交易小时内的执行交易价格,而红色和蓝色点表示每分钟的限价订单(详见笔记本):
规范化 tick 数据
交易数据按纳秒索引,噪音很大。例如,出现买卖市价订单交替引发的买卖跳动,导致价格在买入和卖出价格之间波动。为了提高噪声-信号比并改善统计性能,我们需要重新采样和规范化 tick 数据,通过聚合交易活动来实现。
通常,我们收集聚合期间的开盘(第一个)、最低、最高和收盘(最后)价格,以及成交量加权平均价格(VWAP)、交易的股数和与数据相关的时间戳。
在此章节的 GitHub 文件夹中查看名为normalize_tick_data.ipynb
的笔记本,以获取额外的细节。
Tick 柱状图
AAPL
的原始 tick 价格和成交量数据的图表如下:
stock, date = 'AAPL', '20180329' title = '{} | {}'.format(stock, pd.to_datetime(date).date() with pd.HDFStore(itch_store) as store: s = store['S'].set_index('event_code') # system events s.timestamp = s.timestamp.add(pd.to_datetime(date)).dt.time market_open = s.loc['Q', 'timestamp'] market_close = s.loc['M', 'timestamp'] with pd.HDFStore(stock_store) as store: trades = store['{}/trades'.format(stock)].reset_index() trades = trades[trades.cross == 0] # excluding data from open/close crossings trades.price = trades.price.mul(1e-4) trades.price = trades.price.mul(1e-4) # format price trades = trades[trades.cross == 0] # exclude crossing trades trades = trades.between_time(market_open, market_close) # market hours only tick_bars = trades.set_index('timestamp') tick_bars.index = tick_bars.index.time tick_bars.price.plot(figsize=(10, 5), title=title), lw=1)
我们得到了前述代码的下列图表:
由于scipy.stats.normaltest
的 p 值较低,可以看出 tick 返回远非正态分布:
from scipy.stats import normaltest normaltest(tick_bars.price.pct_change().dropna()) NormaltestResult(statistic=62408.76562431228, pvalue=0.0)
时间柱状图
时间柱状图涉及按周期聚合交易:
def get_bar_stats(agg_trades): vwap = agg_trades.apply(lambda x: np.average(x.price, weights=x.shares)).to_frame('vwap') ohlc = agg_trades.price.ohlc() vol = agg_trades.shares.sum().to_frame('vol') txn = agg_trades.shares.size().to_frame('txn') return pd.concat([ohlc, vwap, vol, txn], axis=1) resampled = trades.resample('1Min') time_bars = get_bar_stats(resampled)
我们可以将结果显示为价格-成交量图:
def price_volume(df, price='vwap', vol='vol', suptitle=title): fig, axes = plt.subplots(nrows=2, sharex=True, figsize=(15, 8)) axes[0].plot(df.index, df[price]) axes[1].bar(df.index, df[vol], width=1 / (len(df.index)), color='r') xfmt = mpl.dates.DateFormatter('%H:%M') axes[1].xaxis.set_major_locator(mpl.dates.HourLocator(interval=3)) axes[1].xaxis.set_major_formatter(xfmt) axes[1].get_xaxis().set_tick_params(which='major', pad=25) axes[0].set_title('Price', fontsize=14) axes[1].set_title('Volume', fontsize=14) fig.autofmt_xdate() fig.suptitle(suptitle) fig.tight_layout() plt.subplots_adjust(top=0.9) price_volume(time_bars)
我们得到了前述代码的下列图表:
或者使用bokeh
绘图库绘制蜡烛图:
resampled = trades.resample('5Min') # 5 Min bars for better print df = get_bar_stats(resampled) increase = df.close > df.open decrease = df.open > df.close w = 2.5 * 60 * 1000 # 2.5 min in ms WIDGETS = "pan, wheel_zoom, box_zoom, reset, save" p = figure(x_axis_type='datetime', tools=WIDGETS, plot_width=1500, title = "AAPL Candlestick") p.xaxis.major_label_orientation = pi/4 p.grid.grid_line_alpha=0.4 p.segment(df.index, df.high, df.index, df.low, color="black") p.vbar(df.index[increase], w, df.open[increase], df.close[increase], fill_color="#D5E1DD", line_color="black") p.vbar(df.index[decrease], w, df.open[decrease], df.close[decrease], fill_color="#F2583E", line_color="black") show(p)
请看以下截图:
绘制 AAPL 蜡烛图
成交量柱状图
时间柱状图平滑了原始 tick 数据中的一些噪音,但可能未能解决订单碎片化的问题。以执行为中心的算法交易可能旨在在给定期间内匹配成交量加权平均价格(VWAP),并将单个订单分成多个交易,并根据历史模式下订单。时间柱状图会对相同的订单进行不同处理,即使市场没有新的信息到达。
成交量柱状图提供了一种根据成交量聚合交易数据的替代方法。我们可以按以下方式实现:
trades_per_min = trades.shares.sum()/(60*7.5) # min per trading day trades['cumul_vol'] = trades.shares.cumsum() df = trades.reset_index() by_vol = df.groupby(df.cumul_vol.div(trades_per_min).round().astype(int)) vol_bars = pd.concat([by_vol.timestamp.last().to_frame('timestamp'), get_bar_stats(by_vol)], axis=1) price_volume(vol_bars.set_index('timestamp'))
我们得到了前述代码的下列图表:
美元柱状图
当资产价格发生显着变化或股票拆分后,给定数量股票的价值也会发生变化。 Volume bars 不会正确反映这一点,并且可能妨碍对反映这些变化的不同期间的交易行为进行比较。 在这些情况下,应调整 volume bar 方法,以利用股票和价格的乘积来生成美元 bars。
市场数据的 API 访问
有几种选项可以使用 Python 通过 API 访问市场数据。 我们首先介绍了内置于 pandas
库中的几个数据源。 然后,我们简要介绍了交易平台 Quantopian,数据提供商 Quandl 以及本书稍后将使用的回测库,并列出了访问各种类型市场数据的几种其他选项。 在 GitHub 上的文件夹目录 data_providers
包含了几个示例笔记本,演示了这些选项的使用方法。
使用 pandas 进行远程数据访问
pandas
库使用 read_html
函数访问网站上显示的数据,并通过相关的 pandas-datareader
库访问各种数据提供商的 API 端点。
阅读 HTML 表格
下载一个或多个 html
表格的内容的方法如下,例如从 Wikipedia 获取 S&P500
指数的成分:
sp_url = 'https://en.wikipedia.org/wiki/List_of_S%26P_500_companies' sp = pd.read_html(sp_url, header=0)[0] # returns a list for each table sp.info() RangeIndex: 505 entries, 0 to 504 Data columns (total 9 columns): Ticker symbol 505 non-null object Security 505 non-null object SEC filings 505 non-null object GICS Sector 505 non-null object GICS Sub Industry 505 non-null object Location 505 non-null object Date first added[3][4] 398 non-null object CIK 505 non-null int64 Founded 139 non-null object
用于市场数据的 pandas-datareader
pandas
曾用于直接便捷访问数据提供商的 API,但该功能已迁移到相关的 pandas-datareader
库。 API 的稳定性因提供商政策而异,在 2018 年 6 月的版本 0.7 中,以下数据源可用:
来源 | 范围 | 注释 |
Yahoo! Finance | 股票和外汇对的 EOD 价格、分红、拆分数据 | 不稳定 |
Tiingo | 股票、共同基金和交易所交易基金的 EOD 价格 | 需要免费注册 |
投资者交易所(IEX) | 历史股票价格,订单簿数据 | 限制为五年 |
Robinhood | EOD 股票价格 | 限制为一年 |
Quandl | 各种资产价格的市场 | 高级数据需要订阅 |
纳斯达克 | 最新在纳斯达克交易的股票代码以及一些额外信息 | |
Stooq | 一些股票市场指数数据 | |
MOEX | 莫斯科证券交易所数据 | |
Alpha Vantage | EOD 股票价格和外汇对 | |
Fama/French | 来自 FF 数据库的因子收益和研究组合 |
访问和检索数据的方式对所有数据源都是相似的,如 Yahoo! Finance 所示:
import pandas_datareader.data as web from datetime import datetime start = '2014' # accepts strings end = datetime(2017, 5, 24) # or datetime objects yahoo= web.DataReader('FB', 'yahoo', start=start, end=end) yahoo.info() DatetimeIndex: 856 entries, 2014-01-02 to 2017-05-25 Data columns (total 6 columns): High 856 non-null float64 Low 856 non-null float64 Open 856 non-null float64 Close 856 non-null float64 Volume 856 non-null int64 Adj Close 856 non-null float64 dtypes: float64(5), int64(1)
投资者交易所
IEX 是作为对高频交易争议的回应而启动的另一种交易所,出现在迈克尔·刘易斯有争议的《闪 Boys》中。 它旨在减缓交易速度,创造一个更公平的竞争环境,并自 2016 年推出以来一直在迅速增长,但在 2018 年 6 月仍然很小,市场份额约为 2.5%。
除了历史结束日价格和成交量数据外,IEX 还提供实时的订单簿深度报价,通过价格和方向聚合订单的规模。该服务还包括最后成交价和规模信息:
book = web.get_iex_book('AAPL') orders = pd.concat([pd.DataFrame(book[side]).assign(side=side) for side in ['bids', 'asks']]) orders.sort_values('timestamp').head() price size timestamp side 4 140.00 100 1528983003604 bids 3 175.30 100 1528983900163 bids 3 205.80 100 1528983900163 asks 1 187.00 200 1528996876005 bids 2 186.29 100 1528997296755 bids
在datareader.ipynb
笔记本中查看更多示例。
Quantopian
Quantopian 是一家投资公司,提供一个研究平台来集体开发交易算法。免费注册后,它使会员能够使用各种数据源研究交易想法。它还提供了一个环境,用于对算法进行历史数据的回测,以及使用实时数据进行样本外测试。对于表现最佳的算法,它授予投资额度,其作者有权获得 10%的利润份额(在撰写本文时)。
Quantopian 研究平台包括用于 Alpha 因子研究和绩效分析的 Jupyter Notebook 环境。还有一个用于编写算法策略和使用自 2002 年以来带有分钟柱频率的历史数据回测结果的交互式开发环境(IDE)。
用户还可以使用实时数据模拟算法,这称为纸上交易。Quantopian 提供各种市场数据集,包括美国股票和期货价格和成交量数据,频率为一分钟,以及美国股票公司基本面数据,并集成了众多替代数据集。
我们将在第四章中更详细地介绍 Quantopian 平台,Alpha 因子研究并且在整本书中依赖其功能,所以随时打开一个账户(有关更多详细信息,请参阅 GitHub repo)。
Zipline
Zipline 是算法交易库,为 Quantopian 回测和实时交易平台提供支持。它也可以离线使用,使用有限数量的免费数据包来开发策略,这些数据包可以被摄取并用于测试交易想法的表现,然后将结果转移到在线 Quantopian 平台进行纸上和实时交易。
以下代码说明了zipline
允许我们访问一系列公司的每日股票数据。您可以在 Jupyter Notebook 中使用相同名称的魔术函数运行zipline
脚本。
首先,您需要使用所需的安全符号初始化上下文。我们还将使用一个计数器变量。然后zipline
调用handle_data
,在其中我们使用data.history()
方法回顾一个单一周期,并将上一天的数据附加到.csv
文件中:
%load_ext zipline %%zipline --start 2010-1-1 --end 2018-1-1 --data-frequency daily from zipline.api import order_target, record, symbol def initialize(context): context.i = 0 context.assets = [symbol('FB'), symbol('GOOG'), symbol('AMZN')] def handle_data(context, data): df = data.history(context.assets, fields=['price', 'volume'], bar_count=1, frequency="1d") df = df.to_frame().reset_index() if context.i == 0: df.columns = ['date', 'asset', 'price', 'volumne'] df.to_csv('stock_data.csv', index=False) else: df.to_csv('stock_data.csv', index=False, mode='a', header=None) context.i += 1 df = pd.read_csv('stock_data.csv') df.date = pd.to_datetime(df.date) df.set_index('date').groupby('asset').price.plot(lw=2, legend=True, figsize=(14, 6));
我们得到了上述代码的下列图表:
我们将在接下来的章节中更详细地探讨zipline
的功能,特别是在线 Quantopian 平台。
Quandl
Quandl 提供广泛的数据来源,包括免费和订阅,使用 Python API。注册并获取免费 API 密钥,以进行 50 次以上的调用。Quandl 数据涵盖除股票外的多种资产类别,包括外汇、固定收益、指数、期货和期权以及商品。
API 的使用简单直观,文档完善,灵活性强,除了单个数据系列下载外,还有许多其他方法,例如批量下载或元数据搜索。以下调用获取了自 1986 年以来由美国能源部报价的石油价格:
import quandl oil = quandl.get('EIA/PET_RWTC_D').squeeze() oil.plot(lw=2, title='WTI Crude Oil Price')
我们通过前述代码得到了这个图表:
Python 机器学习算法交易实用指南(一)(4)https://developer.aliyun.com/article/1523719