转载需注明出处:http://blog.csdn.net/minimicall?viewmode=contents,http://cloudtrade.top
读取数据需要先定义一个读取器(Reader),代码如下:
namespace QuantConnect.Lean.Engine.DataFeeds
{
    /********************************************************
    * CLASS DEFINITIONS
    *********************************************************/
    /// <summary>
    /// Subscription data reader is a wrapper on the stream reader class to download, unpack and iterate over a data file.
    /// </summary>
    /// <remarks>
    /// The class accepts any subscription configuration and exposes its data through the
    /// <see cref="IEnumerator{T}"/> pattern. <see cref="RefreshSource"/> selects the source for a given date and
    /// <see cref="MoveNext"/> advances through that source one item at a time.
    /// </remarks>
    public class SubscriptionDataReader : IEnumerator<BaseData>
    {
        /********************************************************
        * CLASS PRIVATE VARIABLES
        *********************************************************/
        /// Source string of the stream currently being read.
        private string _source = "";

        /// Default true to fill forward for this subscription: take the previous result and keep returning it till the next time barrier.
        private bool _isFillForward = true;

        /// Date of the current source file (set by RefreshSource).
        private DateTime _date = new DateTime();

        /// End of stream flag for the reader.
        private bool _endOfStream = false;

        /// Internal stream reader for processing data line by line. Null when no source is open.
        private IStreamReader _reader = null;

        /// Web client owned by this instance; it is only disposed here, never used to download.
        private WebClient _web = new WebClient();

        /// Configuration of the subscription being read.
        private SubscriptionDataConfig _config;

        /// Security this subscription belongs to (gives access to its exchange hours and data filter).
        private Security _security;

        /// True if we can find a scale factor file for the security of the form: ..\Lean\Data\equity\factor_files\{SYMBOL}.csv
        private bool _hasScaleFactors = false;

        /// Copied from Security.IsDynamicallyLoadedData; gates the user-facing error when no reader can be opened.
        private bool _isDynamicallyLoadedData = false;

        /// Symbol the subscription maps to on the current date.
        private string _mappedSymbol = "";

        /// Location of the datafeed - the endpoint type this data is read from.
        private readonly DataFeedEndpoint _feedEndpoint;

        /// Object activator - fast creation of new instances of the configured type.
        private readonly Func<object[], object> _objectActivator;

        /// Single instance of the configured type used to invoke its Reader/GetSource methods.
        private readonly BaseData _dataFactory;

        /// Remembered bars for the edge conditions as the market enters/leaves open-closed.
        private BaseData _lastBarOfStream = null;
        private BaseData _lastBarOutsideMarketHours = null;

        /// Start and finish times of the data request/backtest.
        private readonly DateTime _periodStart;
        private readonly DateTime _periodFinish;

        /// Factor and map files; always non-null once the constructor has run.
        private readonly FactorFile _factorFile;
        private readonly MapFile _mapFile;

        // we set the price factor ratio when we encounter a dividend in the factor file
        // and on the next trading day we use this data to produce the dividend instance
        private decimal? _priceFactorRatio;

        // we set the split factor when we encounter a split in the factor file
        // and on the next trading day we use this data to produce the split instance
        private decimal? _splitFactor;

        /********************************************************
        * CLASS PUBLIC VARIABLES
        *********************************************************/
        /// <summary>
        /// Last read BaseData object from this type and source
        /// </summary>
        public BaseData Current
        {
            get; private set;
        }

        /// <summary>
        /// Explicit Interface Implementation for Current
        /// </summary>
        object IEnumerator.Current
        {
            get { return Current; }
        }

        /// <summary>
        /// Provides a means of exposing extra data related to this subscription.
        /// Dividends and splits are queued here and are yielded by <see cref="MoveNext"/> before regular data.
        /// </summary>
        /// <remarks>
        /// It is currently assumed that whomever is pumping data into here is handling the
        /// time syncing issues. Dividends do this through the RefreshSource method
        /// </remarks>
        public Queue<BaseData> AuxiliaryData
        {
            get; private set;
        }

        /// <summary>
        /// Save an instance of the previous basedata we generated (the item before <see cref="Current"/>).
        /// </summary>
        public BaseData Previous
        {
            get; private set;
        }

        /// <summary>
        /// Source has been completed, load up next stream or stop asking for data.
        /// Also reports true whenever no reader is open.
        /// </summary>
        public bool EndOfStream
        {
            get
            {
                return _endOfStream || _reader == null;
            }
            set
            {
                _endOfStream = value;
            }
        }

        /********************************************************
        * CLASS CONSTRUCTOR
        *********************************************************/
        /// <summary>
        /// Subscription data reader takes a subscription request, loads the type, accepts the data source and enumerate on the results.
        /// </summary>
        /// <param name="config">Subscription configuration object</param>
        /// <param name="security">Security asset</param>
        /// <param name="feed">Feed type enum</param>
        /// <param name="periodStart">Start date for the data request/backtest</param>
        /// <param name="periodFinish">Finish date for the data request/backtest</param>
        public SubscriptionDataReader(SubscriptionDataConfig config, Security security, DataFeedEndpoint feed, DateTime periodStart, DateTime periodFinish)
        {
            Log.Trace(string.Format("SubscriptionDataReader,SubscriptionDataConfig:<symbol:{0},resolution:{1}>", config.Symbol, config.Resolution));

            //Save configuration of data-subscription:
            _config = config;

            AuxiliaryData = new Queue<BaseData>();

            //Save access to fill foward flag:
            _isFillForward = config.FillDataForward;

            //Save Start and End Dates:
            _periodStart = periodStart;
            _periodFinish = periodFinish;

            //Save access to securities
            _security = security;
            _isDynamicallyLoadedData = security.IsDynamicallyLoadedData;

            // do we have factor tables?
            _hasScaleFactors = FactorFile.HasScalingFactors(config.Symbol);

            //Save the type of data we'll be getting from the source.
            _feedEndpoint = feed;

            // Start with empty factor/map files. These are assigned BEFORE the early return below so that
            // RefreshSource (which dereferences both) never sees a null file on a half-initialised reader.
            _factorFile = new FactorFile(config.Symbol, new List<FactorFileRow>());
            _mapFile = new MapFile(config.Symbol, new List<MapFileRow>());

            //Create the dynamic type-activators:
            _objectActivator = ObjectActivator.GetActivator(config.Type);

            if (_objectActivator == null)
            {
                Engine.ResultHandler.ErrorMessage("Custom data type '" + config.Type.Name + "' missing parameterless constructor E.g. public " + config.Type.Name + "() { }");
                _endOfStream = true;
                return;
            }

            //Create an instance of the "Type":
            var userObj = _objectActivator.Invoke(new object[] { });
            _dataFactory = userObj as BaseData;

            //If its quandl set the access token in data factory:
            var quandl = _dataFactory as Quandl;
            if (quandl != null)
            {
                if (!Quandl.IsAuthCodeSet)
                {
                    Quandl.SetAuthCode(Config.Get("quandl-auth-token"));
                }
            }

            //Load the entire factor and symbol mapping tables into memory, replacing the empty defaults
            Log.Trace("load the entire factor and symbol mapping tables into memory");
            try
            {
                if (_hasScaleFactors)
                {
                    _factorFile = FactorFile.Read(config.Symbol);
                    _mapFile = MapFile.Read(config.Symbol);
                }
            }
            catch (Exception err)
            {
                Log.Error("SubscriptionDataReader(): Fetching Price/Map Factors: " + err.Message);
            }
        }

        /********************************************************
        * CLASS METHODS
        *********************************************************/
        /// <summary>
        /// Try and create a new instance of the object and return it using the MoveNext enumeration pattern ("Current" public variable).
        /// </summary>
        /// <remarks>This is a highly called method and should be kept lean as possible.</remarks>
        /// <returns>Boolean true on successful move next. Set Current public property.</returns>
        public bool MoveNext()
        {
            Log.Debug("MoveNext()");

            // yield the aux data first
            if (AuxiliaryData.Count != 0)
            {
                Previous = Current;
                Current = AuxiliaryData.Dequeue();
                Log.Debug("AuxiliaryData:symbol:" + Current.Symbol + ",price:" + Current.Price);
                return true;
            }

            BaseData instance = null;
            var instanceMarketOpen = false;
            Log.Debug("SubscriptionDataReader.MoveNext(): Starting MoveNext...");

            try
            {
                //Calls this when no file, first "moveNext()" in refresh source.
                if (_endOfStream || _reader == null || _reader.EndOfStream)
                {
                    if (_reader == null)
                    {
                        //Handle the 1% of time:: getReader failed e.g. missing day so skip day:
                        Current = null;
                    }
                    else
                    {
                        //This is a MoveNext() after reading the last line of file:
                        _lastBarOfStream = Current;
                    }
                    _endOfStream = true;
                    return false;
                }

                //Keep looking until output's an instance:
                while (instance == null && !_reader.EndOfStream)
                {
                    //Get the next string line from file, create instance of BaseData:
                    var line = _reader.ReadLine();
                    try
                    {
                        // let the user's type turn the raw line into a data object
                        instance = _dataFactory.Reader(_config, line, _date, _feedEndpoint);
                    }
                    catch (Exception err)
                    {
                        Engine.ResultHandler.RuntimeError("Error invoking " + _config.Symbol + " data reader. Line: " + line + " Error: " + err.Message, err.StackTrace);
                        _endOfStream = true;
                        continue;
                    }

                    if (instance != null)
                    {
                        // we care if the market was open at any time over the bar
                        instanceMarketOpen = Exchange.IsOpenDuringBar(instance.Time, instance.EndTime, false);

                        //Apply custom user data filters:
                        try
                        {
                            if (!_security.DataFilter.Filter(_security, instance))
                            {
                                instance = null;
                                continue;
                            }
                        }
                        catch (Exception err)
                        {
                            // a throwing filter is treated as a pass: the instance is kept
                            Log.Error("SubscriptionDataReader.MoveNext(): Error applying filter: " + err.Message);
                            Engine.ResultHandler.RuntimeError("Runtime error applying data filter. Assuming filter pass: " + err.Message, err.StackTrace);
                        }

                        if (instance == null)
                        {
                            // REVIEW -- Is this condition heuristically possible?
                            Log.Trace("SubscriptionDataReader.MoveNext(): Instance null, continuing...");
                            continue;
                        }

                        //Check if we're in date range of the data request
                        if (instance.Time < _periodStart)
                        {
                            // before the requested period: remember it for fill forward, but do not emit it
                            _lastBarOutsideMarketHours = instance;
                            instance = null;
                            continue;
                        }
                        if (instance.Time > _periodFinish)
                        {
                            // we're done with data from this subscription, finalize the reader
                            Current = null;
                            _endOfStream = true;
                            return false;
                        }

                        //Save bar for extended market hours (fill forward).
                        if (!instanceMarketOpen)
                        {
                            _lastBarOutsideMarketHours = instance;
                        }

                        //However, if we only want market hours data, don't return yet: Discard and continue looping.
                        if (!_config.ExtendedMarketHours && !instanceMarketOpen)
                        {
                            instance = null;
                        }
                    }
                }

                //Handle edge conditions: First Bar Read:
                // -> Use previous bar from yesterday if available
                if (Current == null)
                {
                    //Handle first loop where not set yet:
                    if (_lastBarOfStream == null)
                    {
                        //For first bar, fill forward from premarket data where possible
                        _lastBarOfStream = _lastBarOutsideMarketHours ?? instance;
                    }
                    //If current not set yet, set Previous to yesterday/last bar read.
                    Previous = _lastBarOfStream;
                }
                else
                {
                    Previous = Current;
                }

                Current = instance;

                //End of Stream: rewind reader to last
                if (_reader.EndOfStream && instance == null)
                {
                    _endOfStream = true;

                    if (_isFillForward && Previous != null)
                    {
                        //If instance == null, current is null, so clone previous to record the final sample:
                        Current = Previous.Clone(true);
                        //When market closes fastforward current bar to the last bar fill forwarded to close time.
                        Current.Time = _security.Exchange.TimeOfDayClosed(Previous.Time);
                        // Save the previous bar as last bar before next stream (for fill forwrd).
                        _lastBarOfStream = Previous;
                    }
                    return false;
                }
                return true;
            }
            catch (Exception err)
            {
                Log.Error("SubscriptionDataReader.MoveNext(): " + err.Message);
                return false;
            }
        }

        /// <summary>
        /// For backwards adjusted data the price is adjusted by a scale factor which is a combination of splits and dividends.
        /// This backwards adjusted price is used by default and fed as the current price.
        /// </summary>
        /// <param name="date">Current date of the backtest.</param>
        /// <exception cref="ArgumentOutOfRangeException">The configured normalization mode is not handled here.</exception>
        private void UpdateScaleFactors(DateTime date)
        {
            switch (_config.DataNormalizationMode)
            {
                case DataNormalizationMode.Raw:
                case DataNormalizationMode.TotalReturn:
                    // these modes leave the price scale factor untouched
                    return;

                case DataNormalizationMode.SplitAdjusted:
                    _config.PriceScaleFactor = _factorFile.GetSplitFactor(date);
                    break;

                case DataNormalizationMode.Adjusted:
                    _config.PriceScaleFactor = _factorFile.GetPriceScaleFactor(date);
                    break;

                default:
                    throw new ArgumentOutOfRangeException();
            }
        }

        /// <summary>
        /// Check if this time is open for this subscription.
        /// </summary>
        /// <param name="time">Date and time we're checking to see if the market is open</param>
        /// <returns>Boolean true on market open</returns>
        public bool IsMarketOpen(DateTime time)
        {
            return _security.Exchange.DateTimeIsOpen(time);
        }

        /// <summary>
        /// Gets the associated exchange for this data reader/security
        /// </summary>
        public SecurityExchange Exchange
        {
            get { return _security.Exchange; }
        }

        /// <summary>
        /// Check if we're still in the extended market hours
        /// </summary>
        /// <param name="time">Time to scan</param>
        /// <returns>True on extended market hours</returns>
        public bool IsExtendedMarketOpen(DateTime time)
        {
            return _security.Exchange.DateTimeIsExtendedOpen(time);
        }

        /// <summary>
        /// Reset the IEnumeration
        /// </summary>
        /// <remarks>Not used</remarks>
        /// <exception cref="NotImplementedException">Always thrown.</exception>
        public void Reset()
        {
            throw new NotImplementedException("Reset method not implemented. Assumes loop will only be used once.");
        }

        /// <summary>
        /// Fetch and set the location of the data from the user's BaseData factory:
        /// </summary>
        /// <param name="date">Date of the source file.</param>
        /// <returns>Boolean true on successfully retrieving the data</returns>
        /// <exception cref="Exception">The initial MoveNext on a freshly opened source threw; the original error is the inner exception.</exception>
        public bool RefreshSource(DateTime date)
        {
            //Update the source from the getSource method:
            _date = date;
            Log.Trace("RefreshSource:" + date);

            // if the map file is an empty instance this will always return true
            if (!_mapFile.HasData(date))
            {
                // don't even bother checking the disk if the map files state we don't have ze dataz
                return false;
            }

            // check for dividends and split for this security
            CheckForDividend(date);
            CheckForSplit(date);

            //If we can find scale factor files on disk, use them. LiveTrading will aways use 1 by definition
            if (_hasScaleFactors)
            {
                // check to see if the symbol was remapped
                _mappedSymbol = _mapFile.GetMappedSymbol(date);
                _config.MappedSymbol = _mappedSymbol;

                // update our price scaling factors in light of the normalization mode
                UpdateScaleFactors(date);
            }

            //Make sure this particular security is trading today:
            if (!_security.Exchange.DateIsOpen(date))
            {
                _endOfStream = true;
                return false;
            }

            //Choose the new source file, hide the QC source file locations
            var newSource = GetSource(date);

            //When stream over stop looping on this data. A null source is treated like an empty one
            //instead of being handed to GetReader, where it could only fail.
            if (string.IsNullOrEmpty(newSource))
            {
                _endOfStream = true;
                return false;
            }

            Log.Debug("SubscriptionDataReader.MoveNext(): Source Refresh: " + newSource, 1);
            if (_source != newSource)
            {
                //If a new file, reset the EOS flag:
                _endOfStream = false;
                //Set the new source.
                _source = newSource;
                //Close out the last source file (this also clears _reader).
                Dispose();

                //Load the source:
                try
                {
                    Log.Trace("SubscriptionDataReader.RefreshSource(): Created new reader for source: " + _source);
                    _reader = GetReader(_source);
                }
                catch (Exception err)
                {
                    // _reader is null here, so EndOfStream reports true until the next successful refresh
                    Log.Error("SubscriptionDataReader.RefreshSource(): Failed to get reader: " + err.Message);
                    return false;
                }

                if (_reader == null)
                {
                    Log.Error("Failed to get StreamReader for data source(" + _source + "), symbol(" + _mappedSymbol + "). Skipping date(" + date.ToShortDateString() + "). Reader is null.");
                    if (_isDynamicallyLoadedData)
                    {
                        Engine.ResultHandler.ErrorMessage("We could not fetch the requested data. This may not be valid data, or a failed download of custom data. Skipping source (" + _source + ").");
                    }
                    return false;
                }

                //Reset the public properties so we can explicitly set them with lastBar data.
                Current = null;
                Previous = null;

                //99% of time, populate the first "Current". 1% of of time no source file (getReader fails), so
                // method sets the Subscription properties as if no data.
                try
                {
                    MoveNext();
                }
                catch (Exception err)
                {
                    // keep the original exception reachable through InnerException
                    throw new Exception("SubscriptionDataReader.RefreshSource(): Could not MoveNext to init stream: " + _source + " " + err.Message + " >> " + err.StackTrace, err);
                }
            }

            //Success:
            return true;
        }

        /// <summary>
        /// Check for splits and emit them into the aux data queue.
        /// A split factor found for the next trading day is remembered and emitted on the following call.
        /// </summary>
        /// <param name="date">Date being refreshed.</param>
        private void CheckForSplit(DateTime date)
        {
            if (_splitFactor != null)
            {
                var close = GetRawClose();
                var split = new Split(_config.Symbol, date, close, _splitFactor.Value);
                AuxiliaryData.Enqueue(split);
                _splitFactor = null;
            }

            // check the factor file to see if we have a split event on the next trading day
            decimal splitFactor;
            if (_factorFile.HasSplitEventOnNextTradingDay(date, out splitFactor))
            {
                _splitFactor = splitFactor;
            }
        }

        /// <summary>
        /// Check for dividends and emit them into the aux data queue.
        /// A price factor ratio found for the next trading day is remembered and emitted on the following call.
        /// </summary>
        /// <param name="date">Date being refreshed.</param>
        private void CheckForDividend(DateTime date)
        {
            if (_priceFactorRatio != null)
            {
                var close = GetRawClose();
                var dividend = new Dividend(_config.Symbol, date, close, _priceFactorRatio.Value);
                // let the config know about it for normalization
                _config.SumOfDividends += dividend.Distribution;
                AuxiliaryData.Enqueue(dividend);
                _priceFactorRatio = null;
            }

            // check the factor file to see if we have a dividend event tomorrow
            decimal priceFactorRatio;
            if (_factorFile.HasDividendEventOnNextTradingDay(date, out priceFactorRatio))
            {
                _priceFactorRatio = priceFactorRatio;
            }
        }

        /// <summary>
        /// Un-normalizes the Previous.Value according to the configured normalization mode.
        /// </summary>
        /// <returns>The un-normalized value of <see cref="Previous"/>, or 0 when there is no previous item.</returns>
        /// <exception cref="ArgumentOutOfRangeException">The configured normalization mode is not handled here.</exception>
        private decimal GetRawClose()
        {
            if (Previous == null) return 0m;

            var close = Previous.Value;

            switch (_config.DataNormalizationMode)
            {
                case DataNormalizationMode.Raw:
                    break;

                case DataNormalizationMode.SplitAdjusted:
                case DataNormalizationMode.Adjusted:
                    // we need to 'unscale' the price
                    close = close / _config.PriceScaleFactor;
                    break;

                case DataNormalizationMode.TotalReturn:
                    // we need to remove the dividends since we've been accumulating them in the price
                    close -= _config.SumOfDividends;
                    break;

                default:
                    throw new ArgumentOutOfRangeException();
            }
            return close;
        }

        /// <summary>
        /// Using this source URL, download it to our cache and open a local reader.
        /// </summary>
        /// <param name="source">Source URL for the data:</param>
        /// <returns>StreamReader for the data source, or null when none could be opened or it is already at end of stream</returns>
        private IStreamReader GetReader(string source)
        {
            IStreamReader reader = null;

            if (_feedEndpoint == DataFeedEndpoint.LiveTrading)
            {
                // live trading currently always gets a rest endpoint
                return new RestSubscriptionStreamReader(source);
            }

            // determine if we're hitting the file system/backtest
            if (_feedEndpoint == DataFeedEndpoint.FileSystem || _feedEndpoint == DataFeedEndpoint.Backtesting)
            {
                // construct a uri to determine if we have a local or remote file
                var uri = new Uri(source, UriKind.RelativeOrAbsolute);

                if (uri.IsAbsoluteUri && !uri.IsLoopback)
                {
                    // absolute, non-loopback uri: remote file
                    reader = HandleRemoteSourceFile(source);
                }
                else
                {
                    // everything else: local file
                    reader = HandleLocalFileSource(source);
                }
            }

            // if the reader is already at end of stream, just set to null so we don't try to get data for today
            if (reader != null && reader.EndOfStream)
            {
                reader = null;
            }

            return reader;
        }

        /// <summary>
        /// Dispose of the Stream Reader and close out the source stream and file connections.
        /// </summary>
        /// <remarks>
        /// RefreshSource calls this before opening each new source, so the reader reference is cleared here:
        /// if opening the next source fails, no disposed reader is left behind for MoveNext/EndOfStream to touch.
        /// </remarks>
        public void Dispose()
        {
            if (_reader != null)
            {
                _reader.Close();
                _reader.Dispose();
                _reader = null;
            }

            if (_web != null)
            {
                _web.Dispose();
            }
        }

        /// <summary>
        /// Get the source URL string for this datetime from the users GetSource() method in BaseData.
        /// </summary>
        /// <param name="date">DateTime we're requesting.</param>
        /// <returns>URL string of the source file; an empty string when there is no data factory or GetSource threw</returns>
        public string GetSource(DateTime date)
        {
            var newSource = "";
            //Invoke our instance of this method.
            if (_dataFactory != null)
            {
                try
                {
                    newSource = _dataFactory.GetSource(_config, date, _feedEndpoint);
                }
                catch (Exception err)
                {
                    Log.Error("SubscriptionDataReader.GetSource(): " + err.Message);
                    Engine.ResultHandler.ErrorMessage("Error getting string source location for custom data source: " + err.Message, err.StackTrace);
                }
            }
            //Return the freshly calculated source URL.
            return newSource;
        }

        /// <summary>
        /// Opens up an IStreamReader for a local file source
        /// </summary>
        /// <param name="source">Path of the local file.</param>
        /// <returns>A reader over the file, or null when the file does not exist</returns>
        private IStreamReader HandleLocalFileSource(string source)
        {
            if (!File.Exists(source))
            {
                // the local uri doesn't exist, write an error and return null so we we don't try to get data for today
                Log.Trace("SubscriptionDataReader.GetReader(): Could not find QC Data, skipped: " + source);
                Engine.ResultHandler.SamplePerformance(_date.Date, 0);
                return null;
            }

            // handles zip or text files
            return new LocalFileSubscriptionStreamReader(source);
        }

        /// <summary>
        /// Opens up an IStreamReader for a remote file source
        /// </summary>
        /// <param name="source">Address of the remote file.</param>
        /// <returns>A reader over the remote file, or null when creating it failed</returns>
        private IStreamReader HandleRemoteSourceFile(string source)
        {
            // clean old files out of the cache
            if (!Directory.Exists(Constants.Cache)) Directory.CreateDirectory(Constants.Cache);

            // files created more than 24 hours ago are removed; the cutoff is the same for every file
            var expiry = DateTime.Now.AddHours(-24);
            foreach (var file in Directory.EnumerateFiles(Constants.Cache))
            {
                if (File.GetCreationTime(file) < expiry) File.Delete(file);
            }

            try
            {
                // this will fire up a web client in order to download the 'source' file to the cache
                return new RemoteFileSubscriptionStreamReader(source, Constants.Cache);
            }
            catch (Exception err)
            {
                Engine.ResultHandler.ErrorMessage("Error downloading custom data source file, skipped: " + source + " Err: " + err.Message, err.StackTrace);
                Engine.ResultHandler.SamplePerformance(_date.Date, 0);
                return null;
            }
        }
    }
}