A Programmer's Road to Quantitative Trading (35): Lean's DataFeed, Part 3

Please credit the source when reposting: http://blog.csdn.net/minimicall and http://cloudtrade.top/


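The Lean engine is cleanly divided into modules. Among them, DataFeed is the data feed, that is, the module that supplies data to the rest of the engine.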

1. The IDataFeed interface

The module's interface is:

namespace QuantConnect.Lean.Engine.DataFeeds
{
    /// <summary>
    /// Datafeed interface for creating custom datafeed sources.
    /// The interface for supplying data.
    /// </summary>
    public interface IDataFeed
    {
        /******************************************************** 
        * INTERFACE PROPERTIES
        *********************************************************/
        /// <summary>
        /// List of the subscription the algorithm has requested. Subscriptions contain the type, sourcing information and manage the enumeration of data.
        /// The subscription list.
        /// </summary>
        List<SubscriptionDataConfig> Subscriptions
        {
            get;
        }


        /// <summary>
        /// Prices of the datafeed this instant for dynamically updating security values (and calculation of the total portfolio value in realtime).
        /// Real-time prices.
        /// </summary>
        /// <remarks>Indexed in order of the subscriptions</remarks>
        List<decimal> RealtimePrices
        {
            get;
        }

        /// <summary>
        /// Cross-threading queues so the datafeed pushes data into the queue and the primary algorithm thread reads it out.
        /// Queues shared across threads: the datafeed thread enqueues, the main algorithm thread dequeues.
        /// </summary>
        ConcurrentQueue<List<BaseData>>[] Bridge
        {
            get;
            set;
        }

        /// <summary>
        /// Boolean flag indicating there is no more data in any of our subscriptions.
        /// </summary>
        bool EndOfBridges
        {
            get;
        }

        /// <summary>
        /// Array of boolean flags indicating the data status for each queue/subscription we're tracking.
        /// </summary>
        bool[] EndOfBridge
        {
            get;
        }

        /// <summary>
        /// Set the source of the data we're requesting for the type-readers to know where to get data from.
        /// </summary>
        /// <remarks>Live or Backtesting Datafeed</remarks>
        DataFeedEndpoint DataFeed
        {
            get;
            set;
        }

        /// <summary>
        /// Public flag indicator that the thread is still busy.
        /// Indicates whether the thread is still active.
        /// </summary>
        bool IsActive
        {
            get;
        }

        /// <summary>
        /// The most advanced moment in time for which the data feed has completed loading data
        /// </summary>
        DateTime LoadedDataFrontier { get; }

        /// <summary>
        /// Data has completely loaded and we don't expect any more.
        /// </summary>
        bool LoadingComplete
        {
            get;
        }

        /******************************************************** 
        * INTERFACE METHODS
        *********************************************************/
        /// <summary>
        /// Primary entry point.
        /// </summary>
        void Run();


        /// <summary>
        /// External controller calls to signal a terminate of the thread.
        /// </summary>
        void Exit();


        /// <summary>
        /// Purge all remaining data in the thread.
        /// </summary>
        void PurgeData();
    }
}
IDataFeed is the data feed interface; every concrete data feed class must implement it.
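To make the role of Bridge and EndOfBridge more concrete, here is a minimal, self-contained sketch of the same producer/consumer pattern. It is not Lean code: PricePoint, BridgeSketch and ProduceAndConsume are hypothetical names invented for this illustration, and the producer task only stands in for the datafeed thread.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for Lean's BaseData; only the fields this sketch needs.
public sealed class PricePoint
{
    public DateTime Time;
    public decimal Price;
}

public static class BridgeSketch
{
    // Enqueues `perSubscription` points on each of `subscriptions` bridges from a
    // producer task, drains them on the calling thread, and returns the number read.
    public static int ProduceAndConsume(int subscriptions, int perSubscription)
    {
        var bridge = new ConcurrentQueue<List<PricePoint>>[subscriptions];
        var endOfBridge = new bool[subscriptions];
        for (var i = 0; i < subscriptions; i++)
        {
            bridge[i] = new ConcurrentQueue<List<PricePoint>>();
        }

        var start = new DateTime(2015, 1, 2, 9, 30, 0);

        // Producer: plays the role of the datafeed thread.
        var producer = Task.Run(() =>
        {
            for (var i = 0; i < subscriptions; i++)
            {
                for (var k = 0; k < perSubscription; k++)
                {
                    var point = new PricePoint { Time = start.AddMinutes(k), Price = 100m + k };
                    bridge[i].Enqueue(new List<PricePoint> { point });
                }
                // Set the flag only after the last item of this bridge is enqueued.
                Volatile.Write(ref endOfBridge[i], true);
            }
        });

        // Consumer: plays the role of the main algorithm thread.
        var consumed = 0;
        while (true)
        {
            // Read the flags first: once a flag is set, that queue receives no more items,
            // so a drain performed afterwards is guaranteed to see everything.
            var allEnded = true;
            for (var i = 0; i < subscriptions; i++)
            {
                if (!Volatile.Read(ref endOfBridge[i])) allEnded = false;
            }

            for (var i = 0; i < subscriptions; i++)
            {
                List<PricePoint> batch;
                while (bridge[i].TryDequeue(out batch))
                {
                    consumed += batch.Count;
                }
            }

            if (allEnded) break;
            Thread.Sleep(1);
        }

        producer.Wait();
        return consumed;
    }
}
```

Calling BridgeSketch.ProduceAndConsume(3, 4) from a Main method pushes four points onto each of three bridges and reads all of them back. The stopping rule matches the idea behind EndOfBridges: the consumer is done only when every bridge is flagged as ended and every queue is empty.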

2. BaseDataFeed, the data feed base class

It implements IDataFeed and serves as the base class for derived data feeds.

namespace QuantConnect.Lean.Engine.DataFeeds
{
    /// <summary>
    /// Common components of a data feed allowing the extender to implement only the parts which matter.
    /// Base class for data feeds; derived classes customize only the parts they need.
    /// </summary>
    public abstract class BaseDataFeed : IDataFeed
    {
        /******************************************************** 
        * CLASS VARIABLES
        *********************************************************/
        private IAlgorithm _algorithm;
        private BacktestNodePacket _job;
        private bool _endOfStreams = false;
        private int _subscriptions = 0;
        private int _bridgeMax = 500000;
        private bool _exitTriggered = false;

        private DateTime[] _frontierTime;

        /******************************************************** 
        * CLASS PROPERTIES
        *********************************************************/
        /// <summary>
        /// List of the subscription the algorithm has requested. Subscriptions contain the type, sourcing information and manage the enumeration of data.
        /// The subscription list.
        /// </summary>
        public List<SubscriptionDataConfig> Subscriptions { get; private set; }

        /// <summary>
        /// Prices of the datafeed this instant for dynamically updating security values (and calculation of the total portfolio value in realtime).
        /// Real-time prices.
        /// </summary>
        /// <remarks>Indexed in order of the subscriptions</remarks>
        public List<decimal> RealtimePrices { get; private set; }

        /// <summary>
        /// Cross-threading queues so the datafeed pushes data into the queue and the primary algorithm thread reads it out.
        /// The bridge.
        /// </summary>
        public ConcurrentQueue<List<BaseData>>[] Bridge { get; set; }

        /// <summary>
        /// Stream created from the configuration settings.
        /// Streams created from the configuration.
        /// </summary>
        public SubscriptionDataReader[] SubscriptionReaderManagers { get; set; }

        /// <summary>
        /// Set the source of the data we're requesting for the type-readers to know where to get data from.
        /// </summary>
        /// <remarks>Live or Backtesting Datafeed</remarks>
        public DataFeedEndpoint DataFeed { get; set; }

        /// <summary>
        /// Flag indicating the handler thread is completely finished and ready to dispose.
        /// </summary>
        public bool IsActive { get; private set; }

        /// <summary>
        /// Flag indicating the file system has loaded all files.
        /// </summary>
        public bool LoadingComplete { get; private set; }

        /// <summary>
        /// Furthest point in time that the data has loaded into the bridges.
        /// </summary>
        public DateTime LoadedDataFrontier { get; private set; }

        /// <summary>
        /// Signifying no more data across all bridges
        /// </summary>
        public bool EndOfBridges
        {
            get
            {
                for (var i = 0; i < Bridge.Length; i++)
                {
                    if (Bridge[i].Count != 0 || EndOfBridge[i] != true)
                    {
                        return false;
                    }
                }
                return true;
            }
        }

        /// <summary>
        /// End of Stream for Each Bridge:
        /// </summary>
        public bool[] EndOfBridge { get; set; }

        /******************************************************** 
        * CLASS CONSTRUCTOR
        *********************************************************/
        /// <summary>
        /// Create an instance of the base datafeed.
        /// </summary>
        public BaseDataFeed(IAlgorithm algorithm, BacktestNodePacket job)
        {
            //Save the data subscriptions
            Subscriptions = algorithm.SubscriptionManager.Subscriptions; // a list; each element is a subscription to one security's data
            _subscriptions = Subscriptions.Count; // number of subscribed securities

            //Public Properties:
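            DataFeed = DataFeedEndpoint.FileSystem; // defaults to reading from the file system
            IsActive = true; // the thread is active
            Bridge = new ConcurrentQueue<List<BaseData>>[_subscriptions]; // the bridge is an array of queues, one per subscription, each holding lists of data
            EndOfBridge = new bool[_subscriptions];
            SubscriptionReaderManagers = new SubscriptionDataReader[_subscriptions]; // initialize the reader array
            RealtimePrices = new List<decimal>(_subscriptions); // initialize the real-time price list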
            _frontierTime = new DateTime[_subscriptions];

            //Class Privates:
            _job = job; // the associated job
            _algorithm = algorithm; // the associated algorithm
            _endOfStreams = false;
            _bridgeMax = _bridgeMax / _subscriptions;

            //Initialize arrays:
            for (var i = 0; i < _subscriptions; i++)
            {
                _frontierTime[i] = job.PeriodStart;
                EndOfBridge[i] = false;
                Bridge[i] = new ConcurrentQueue<List<BaseData>>(); // allocate the queue for this subscription's bridge
                // Create a reader for each subscription
                SubscriptionReaderManagers[i] = new SubscriptionDataReader(Subscriptions[i], algorithm.Securities[Subscriptions[i].Symbol], DataFeedEndpoint.Database, job.PeriodStart, job.PeriodFinish);
            
            }
        }


        /// <summary>
        /// Launch the primary data thread.
        /// Main function of the data-reading thread.
        /// </summary>
        public virtual void Run()
        {
            while (!_exitTriggered && IsActive && !EndOfBridges)
            {
                for (var i = 0; i < Subscriptions.Count; i++)
                {
                    //With each subscription; fetch the next increment of data from the queues:
                    var subscription = Subscriptions[i]; // the i-th security subscription

                    if (Bridge[i].Count < 10000 && !EndOfBridge[i]) // only read while this bridge holds fewer than 10000 entries and has not ended
                    {
                        var data = GetData(subscription); // fetch the next batch of data for this subscription

                        //Comment out for live databases, where we should continue asking even if no data.
                        if (data.Count == 0) // no data returned: this subscription is finished, so move on to the next one
                        {
                            EndOfBridge[i] = true; // reading for this subscription has ended
                            continue;
                        }

                        ////Insert data into bridge, each list is time-grouped. Assume all different time-groups.
                        foreach (var obj in data)
                        {
                            Bridge[i].Enqueue(new List<BaseData>() { obj });
                        }
                        
                        ////Record the furthest moment in time.
                        _frontierTime[i] = data.Max(bar => bar.Time);
                    }
                }
                //Set the most backward moment in time we've loaded
                LoadedDataFrontier = _frontierTime.Min();
            }

            IsActive = false;
        }


        /// <summary>
        /// Get the next set of data for this subscription
        /// Returns the next batch of data for this subscription.
        /// </summary>
        /// <param name="subscription"></param>
        /// <returns></returns>
        public abstract List<BaseData> GetData(SubscriptionDataConfig subscription);


        /// <summary>
        /// Send an exit signal to the thread.
        /// </summary>
        public virtual void Exit()
        {
            _exitTriggered = true;
            PurgeData();
        }

        /// <summary>
        /// Loop over all the queues and clear them to fast-quit this thread and return to main.
        /// </summary>
        public virtual void PurgeData()
        {
            foreach (var t in Bridge)
            {
                t.Clear();
            }
        }
    }
}
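The point of BaseDataFeed is that a derived class only has to say where the next batch of data comes from. The following is a simplified, self-contained sketch of that design, not the Lean implementation: Bar, MiniDataFeed and InMemoryDataFeed are hypothetical stand-ins, subscriptions are reduced to plain symbol strings, and the loop is single-threaded, so it stops once every subscription has returned an empty batch instead of waiting for a consumer to drain the queues.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical, simplified stand-ins; these are not the Lean types.
public sealed class Bar
{
    public string Symbol;
    public DateTime Time;
    public decimal Price;
}

public abstract class MiniDataFeed
{
    private readonly DateTime[] _frontierTime;

    public List<string> Subscriptions { get; private set; }
    public Queue<List<Bar>>[] Bridge { get; private set; }
    public bool[] EndOfBridge { get; private set; }
    public DateTime LoadedDataFrontier { get; private set; }

    protected MiniDataFeed(List<string> subscriptions, DateTime periodStart)
    {
        Subscriptions = subscriptions;
        Bridge = new Queue<List<Bar>>[subscriptions.Count];
        EndOfBridge = new bool[subscriptions.Count];
        _frontierTime = new DateTime[subscriptions.Count];
        for (var i = 0; i < subscriptions.Count; i++)
        {
            Bridge[i] = new Queue<List<Bar>>();
            _frontierTime[i] = periodStart;
        }
    }

    // The only member a derived feed has to supply.
    public abstract List<Bar> GetData(string subscription);

    // Single-threaded analogue of the Run loop: poll every subscription
    // until each one has returned an empty batch.
    public void Run()
    {
        while (EndOfBridge.Any(ended => !ended))
        {
            for (var i = 0; i < Subscriptions.Count; i++)
            {
                if (EndOfBridge[i]) continue;

                var data = GetData(Subscriptions[i]);
                if (data.Count == 0)
                {
                    EndOfBridge[i] = true;
                    continue;
                }

                // One list per data point, as in the listing above.
                foreach (var bar in data)
                {
                    Bridge[i].Enqueue(new List<Bar> { bar });
                }
                _frontierTime[i] = data.Max(bar => bar.Time);
            }

            // The slowest subscription determines how far data is loaded overall.
            LoadedDataFrontier = _frontierTime.Min();
        }
    }
}

// Serves bars from memory, at most two per call.
public sealed class InMemoryDataFeed : MiniDataFeed
{
    private readonly Dictionary<string, Queue<Bar>> _source;

    public InMemoryDataFeed(Dictionary<string, Queue<Bar>> source, DateTime periodStart)
        : base(source.Keys.ToList(), periodStart)
    {
        _source = source;
    }

    public override List<Bar> GetData(string subscription)
    {
        var batch = new List<Bar>();
        var queue = _source[subscription];
        while (batch.Count < 2 && queue.Count > 0)
        {
            batch.Add(queue.Dequeue());
        }
        return batch;
    }
}
```

Note how LoadedDataFrontier is the minimum over the per-subscription frontiers: a subscription that ran out of data early holds the overall frontier back, which is the same behaviour as `_frontierTime.Min()` in BaseDataFeed.Run.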
3. FileSystemDataFeed, the file system data feed
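Before reading the listing, it may help to isolate one idea from its Run method: the time "frontier". Within each trading day, Run collects every data point whose EndTime is before the frontier, pushes those points to the bridges, and then moves the frontier forward. If the earliest pending data point lies beyond the frontier, the frontier jumps straight past it; otherwise it advances by one increment. The sketch below reproduces only that stepping rule. FrontierSketch and NextFrontier are hypothetical names, and RoundDown here is a local helper written for this sketch, since the extension method of the same name called in the listing is not shown in this article.

```csharp
using System;

public static class FrontierSketch
{
    // Local helper for this sketch: round a time down to a whole multiple of `interval`.
    public static DateTime RoundDown(DateTime time, TimeSpan interval)
    {
        return new DateTime(time.Ticks - (time.Ticks % interval.Ticks));
    }

    // Mirrors the if/else at the end of the per-day loop in Run:
    // jump ahead when the earliest pending data point is beyond the frontier,
    // otherwise step forward by one increment.
    public static DateTime NextFrontier(DateTime frontier, long earlyBirdTicks, TimeSpan increment)
    {
        if (earlyBirdTicks > 0 && earlyBirdTicks > frontier.Ticks)
        {
            return RoundDown(new DateTime(earlyBirdTicks), increment) + increment;
        }
        return frontier + increment;
    }
}
```

For example, with a one-minute increment, a frontier of 09:31:00 and an earliest pending data point ending at 09:45:30, the next frontier is 09:46:00, so the empty minutes in between are skipped in one step rather than scanned one by one.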

namespace QuantConnect.Lean.Engine.DataFeeds
{
    /******************************************************** 
    * CLASS DEFINITIONS
    *********************************************************/
    /// <summary>
    /// Historical datafeed stream reader for processing files on a local disk.
    /// Loads historical data from the local disk.
    /// </summary>
    /// <remarks>Filesystem datafeeds are incredibly fast</remarks>
    public class FileSystemDataFeed : IDataFeed
    {
        /******************************************************** 
        * CLASS VARIABLES
        *********************************************************/
        // Set types in public area to speed up:
        private IAlgorithm _algorithm;
        private BacktestNodePacket _job;
        private bool _endOfStreams = false;
        private int _subscriptions = 0;
        private int _bridgeMax = 500000;
        private bool _exitTriggered = false;

        /******************************************************** 
        * CLASS PROPERTIES
        *********************************************************/
        /// <summary>
        /// List of the subscription the algorithm has requested. Subscriptions contain the type, sourcing information and manage the enumeration of data.
        /// </summary>
        public List<SubscriptionDataConfig> Subscriptions { get; private set; }

        /// <summary>
        /// Prices of the datafeed this instant for dynamically updating security values (and calculation of the total portfolio value in realtime).
        /// </summary>
        /// <remarks>Indexed in order of the subscriptions</remarks>
        public List<decimal> RealtimePrices { get; private set; } 

        /// <summary>
        /// Cross-threading queues so the datafeed pushes data into the queue and the primary algorithm thread reads it out.
        /// </summary>
        public ConcurrentQueue<List<BaseData>>[] Bridge { get; set; }

        /// <summary>
        /// Set the source of the data we're requesting for the type-readers to know where to get data from.
        /// </summary>
        /// <remarks>Live or Backtesting Datafeed</remarks>
        public DataFeedEndpoint DataFeed { get; set; }

        /// <summary>
        /// Flag indicating the handler thread is completely finished and ready to dispose.
        /// </summary>
        public bool IsActive { get; private set; }

        /// <summary>
        /// Flag indicating the file system has loaded all files.
        /// </summary>
        public bool LoadingComplete { get; private set; }

        /// <summary>
        /// Furthest point in time that the data has loaded into the bridges.
        /// </summary>
        public DateTime LoadedDataFrontier { get; private set; }

        /// <summary>
        /// Stream created from the configuration settings.
        /// </summary>
        private SubscriptionDataReader[] SubscriptionReaders { get; set; }

        /// <summary>
        /// Signifying no more data across all bridges
        /// </summary>
        public bool EndOfBridges 
        {
            get 
            {
                for (var i = 0; i < Bridge.Length; i++)
                {
                    if (Bridge[i].Count != 0 || EndOfBridge[i] != true || _endOfStreams != true)
                    {
                        return false;
                    }
                }
                return true;
            }
        }

        /// <summary>
        /// End of Stream for Each Bridge:
        /// </summary>
        public bool[] EndOfBridge { get; set; }

        /// <summary>
        /// Frontiers for each fill forward high water mark
        /// </summary>
        public DateTime[] FillForwardFrontiers;

        /******************************************************** 
        * CLASS CONSTRUCTOR
        *********************************************************/
        /// <summary>
        /// Create a new backtesting data feed.
        /// </summary>
        /// <param name="algorithm">Instance of the algorithm</param>
        /// <param name="job">Algorithm work task</param>
        public FileSystemDataFeed(IAlgorithm algorithm, BacktestNodePacket job)
        {
            Console.WriteLine("FileSystemDataFeed,algorithm:" + algorithm + ",job: " + job);
            Subscriptions = algorithm.SubscriptionManager.Subscriptions;
            Console.WriteLine("Subscriptions.count:" + Subscriptions.Count);
            _subscriptions = Subscriptions.Count;
          

            //Public Properties:
            DataFeed = DataFeedEndpoint.FileSystem;
            IsActive = true;
            Bridge = new ConcurrentQueue<List<BaseData>>[_subscriptions];
            EndOfBridge = new bool[_subscriptions];
            SubscriptionReaders = new SubscriptionDataReader[_subscriptions];
            FillForwardFrontiers = new DateTime[_subscriptions];
            RealtimePrices = new List<decimal>(_subscriptions);

            //Class Privates:
            _job = job;
            _algorithm = algorithm;
            _endOfStreams = false;
            _bridgeMax = _bridgeMax / _subscriptions; //Set the bridge maximum count:

            for (var i = 0; i < _subscriptions; i++)
            {
                //Create a new instance in the dictionary:
                Bridge[i] = new ConcurrentQueue<List<BaseData>>();
                EndOfBridge[i] = false;

                SubscriptionReaders[i] = new SubscriptionDataReader(Subscriptions[i], _algorithm.Securities[Subscriptions[i].Symbol], DataFeed, _job.PeriodStart, _job.PeriodFinish);
                FillForwardFrontiers[i] = new DateTime();
            }
        }

        /******************************************************** 
        * CLASS METHODS
        *********************************************************/
        /// <summary>
        /// Main routine for datafeed analysis.
        /// </summary>
        /// <remarks>This is a hot-thread and should be kept extremely lean. Modify with caution.</remarks>
        public void Run()
        {
            Log.Trace("debug FileSystemDataFeed.run()");
            Console.WriteLine("FileSystemDataFeed.run()");
            //Calculate the increment based on the subscriptions:
            var tradeBarIncrements = CalculateIncrement(includeTick: false);
            var increment = CalculateIncrement(includeTick: true);

            //Loop over each date in the job
            foreach (var date in Time.EachTradeableDay(_algorithm.Securities, _job.PeriodStart, _job.PeriodFinish))
            {
                Log.Trace("in trading date:"+date+",PeriodStart:"+_job.PeriodStart+",PeriodFinish:"+_job.PeriodFinish);
                //Update the source-URL from the BaseData, reset the frontier to today. Update the source URL once per day.
                // this is really the next frontier in the future
                var frontier = date.Add(increment);
                var activeStreams = _subscriptions;
                Log.Trace("subscription:" + _subscriptions);
                //Initialize the feeds to this date:
                for (var i = 0; i < _subscriptions; i++) 
                {
                    //Don't refresh source when we know the market is closed for this security:
                    Log.Trace("i:"+i+"subscription");
                    var success = SubscriptionReaders[i].RefreshSource(date);

                    //If we know the market is closed for security then can declare bridge closed.
                    if (success) {
                        EndOfBridge[i] = false;
                    }
                    else
                    {
                        ProcessMissingFileFillForward(SubscriptionReaders[i], i, tradeBarIncrements, date);
                        EndOfBridge[i] = true;
                    }
                }

                //Pause the DataFeed
                var bridgeFullCount = Bridge.Count(bridge => bridge.Count >= _bridgeMax);
                var bridgeZeroCount = Bridge.Count(bridge => bridge.Count == 0);
                var active = GetActiveStreams();

                //Pause here while bridges are full, but allow missing files to pass
                while (bridgeFullCount > 0 && ((_subscriptions - active) == bridgeZeroCount) && !_exitTriggered)
                {
                    bridgeFullCount = Bridge.Count(bridge => bridge.Count >= _bridgeMax);
                    bridgeZeroCount = Bridge.Count(bridge => bridge.Count == 0);
                    Thread.Sleep(5);
                }

                // for each smallest resolution
                var datePlusOneDay = date.Date.AddDays(1);
                while ((frontier.Date == date.Date || frontier.Date == datePlusOneDay) && !_exitTriggered)
                {
                    var cache = new List<BaseData>[_subscriptions];
                    
                    //Reset Loop:
                    long earlyBirdTicks = 0;

                    //Go over all the subscriptions, one by one add a minute of data to the bridge.
                    //Load the subscribed securities one by one into the data bridge.
                    for (var i = 0; i < _subscriptions; i++)
                    {
                        //Get the reader manager for the i-th security:
                        var manager = SubscriptionReaders[i];

                        //End of the manager stream set flag to end bridge: also if the EOB flag set, from the refresh source method above
                        if (manager.EndOfStream || EndOfBridge[i])
                        {
                            EndOfBridge[i] = true;
                            activeStreams = GetActiveStreams();
                            if (activeStreams == 0)
                            {
                                frontier = frontier.Date + TimeSpan.FromDays(1);
                            }
                            continue;
                        }

                        //Initialize data store:
                        cache[i] = new List<BaseData>(2);

                        //Add the last iteration to the new list: only if it falls into this time category
                        //The loop below is the key step: it adds each data point before the frontier to this security's list
                        var cacheAtIndex = cache[i];
                        while (manager.Current.EndTime < frontier)
                        {
                            Log.Trace("Current:symbol:" + manager.Current.Symbol + ",price" + manager.Current.Price);
                            cacheAtIndex.Add(manager.Current); // add Current to this security's list
                            Log.Trace(string.Format("FileSystemDataFeed,Current: {0}", manager.Current));
                            if (!manager.MoveNext()) break; // advance to the next data point
                        }

                        //Save the next earliest time from the bridges: only if we're not filling forward.
                        if (manager.Current != null)
                        {
                            if (earlyBirdTicks == 0 || manager.Current.EndTime.Ticks < earlyBirdTicks)
                            {
                                earlyBirdTicks = manager.Current.EndTime.Ticks;
                            }
                        }
                    }

                    if (activeStreams == 0)
                    {
                        break;
                    }

                    //Add all the lists to the bridge, release the bridge
                    //we push all the data up to this frontier into the bridge at once
                    for (var i = 0; i < _subscriptions; i++)
                    {
                        if (cache[i] != null && cache[i].Count > 0)
                        {
                            FillForwardFrontiers[i] = cache[i][0].EndTime;
                            Bridge[i].Enqueue(cache[i]);
                        }
                        ProcessFillForward(SubscriptionReaders[i], i, tradeBarIncrements);
                    }

                    //This will let consumers know we have loaded data up to this date
                    //So that the data stream doesn't pull off data from the same time period in different events
                    LoadedDataFrontier = frontier;

                    if (earlyBirdTicks > 0 && earlyBirdTicks > frontier.Ticks) 
                    {
                        //Jump increment to the nearest second, in the future: Round down, add increment
                        frontier = (new DateTime(earlyBirdTicks)).RoundDown(increment) + increment;
                    }
                    else
                    {
                        //Otherwise step one forward.
                        frontier += increment;
                    }

                } // End of This Day.

                if (_exitTriggered) break;

            } // End of All Days:

            Log.Trace(DataFeed + ".Run(): Data Feed Completed.");
            LoadingComplete = true;

            //Make sure all bridges empty before declaring "end of bridge":
            while (!EndOfBridges && !_exitTriggered)
            {
                for (var i = 0; i < _subscriptions; i++)
                {
                    //Nothing left in the bridge, mark it as finished
                    if (Bridge[i].Count == 0)
                    {
                        EndOfBridge[i] = true;
                    }
                }
                if (GetActiveStreams() == 0) _endOfStreams = true;
                Thread.Sleep(100);
            }

            //Close up all streams:
            for (var i = 0; i < Subscriptions.Count; i++)
            {
                SubscriptionReaders[i].Dispose();
            }

            Log.Trace(DataFeed + ".Run(): Ending Thread... ");
            IsActive = false;
        }



        /// <summary>
        /// Send an exit signal to the thread.
        /// Exits the thread.
        /// </summary>
        public void Exit()
        {
            _exitTriggered = true;
            PurgeData();
        }


        /// <summary>
        /// Loop over all the queues and clear them to fast-quit this thread and return to main.
        /// Clears the buffered data.
        /// </summary>
        public void PurgeData()
        {
            foreach (var t in Bridge)
            {
                t.Clear();
            }
        }

        private void ProcessMissingFileFillForward(SubscriptionDataReader manager, int i, TimeSpan increment, DateTime dateToFill)
        {
            // we'll copy the current into the next day
            var subscription = Subscriptions[i];
            if (!subscription.FillDataForward || manager.Current == null) return;

            var start = dateToFill.Date + manager.Exchange.MarketOpen;
            if (subscription.ExtendedMarketHours)
            {
                start = dateToFill.Date + manager.Exchange.ExtendedMarketOpen;
            }

            // shift the 'start' time to the end of the bar by adding the increment, this makes 'date'
            // the end time which also allows the market open functions to behave as expected

            var current = manager.Current;
            for (var endTime = start.Add(increment); endTime.Date == dateToFill.Date; endTime = endTime + increment)
            {
                if (manager.IsMarketOpen(endTime) || (subscription.ExtendedMarketHours && manager.IsExtendedMarketOpen(endTime)))
                {
                    EnqueueFillForwardData(i, current, endTime);
                }
                else
                {
                    // stop fill forwarding when we're no longer open
                    break;
                }
            }
        }

        /// <summary>
        /// If this is a fillforward subscription, look at the previous time, and current time, and add new 
        /// objects to queue until current time to fill up the gaps.
        /// </summary>
        /// <param name="manager">Subscription to process</param>
        /// <param name="i">Subscription position in the bridge ( which queue are we pushing data to )</param>
        /// <param name="increment">Timespan increment to jump the fillforward results</param>
        private void ProcessFillForward(SubscriptionDataReader manager, int i, TimeSpan increment)
        {
            // If previous == null cannot fill forward nothing there to move forward (e.g. cases where file not found on first file).
            if (!Subscriptions[i].FillDataForward || manager.Previous == null || manager.Current == null) return;

            //Last tradebar and the current one we're about to add to queue:
            var previous = manager.Previous;
            var current = manager.Current;

            // final two points of file that ends at midnight, causes issues in the day rollover/fill forward
            if (current.EndTime.TimeOfDay.Ticks == 0 && previous.EndTime == current.Time)
            {
                return;
            }

            //Initialize the frontier:
            if (FillForwardFrontiers[i].Ticks == 0) FillForwardFrontiers[i] = previous.EndTime;

            // using the previous to fill forward since 'current' is ahead the frontier
            var whatToFill = previous;
            // using current.EndTime as fill until because it's the next piece of data we have for this subscription
            var fillUntil = current.EndTime;

            //Data ended before the market closed: premature ending flag - continue filling forward until market close.
            if (manager.EndOfStream && manager.IsMarketOpen(current.EndTime))
            {
                //Make sure we only fill forward to end of *today* -- don't fill forward tomorrow just because its also open
                fillUntil = FillForwardFrontiers[i].Date.AddDays(1);
                // since we ran out of data, use the current as the clone source, it's more recent than previous
                whatToFill = current;
            }

            // loop from our last time (previous.EndTime) to our current.EndTime, filling in all missing day during
            // request market hours
            for (var endTime = FillForwardFrontiers[i] + increment; (endTime < fillUntil); endTime = endTime + increment)
            {
                if (Subscriptions[i].ExtendedMarketHours)
                {
                    if (!manager.IsExtendedMarketOpen(endTime.Subtract(increment)))
                    {
                        //If we've asked for extended hours, and the security is no longer inside extended market hours, skip:
                        continue;
                    }
                }
                else
                {
                    // if the market isn't open skip to the current.EndTime and rewind until the market is open
                    // this is the case where the previous value is from yesterday but we're trying to fill forward
                    // the next day, so instead of zooming through 18 hours of off-market hours, skip to our current data
                    // point and rewind the market open.
                    //
                    // E.g, Current.EndTime = 9:40am and Previous.EndTime = 2:00pm, so fill in from 2->4pm
                    // and then skip to 9:40am, reverse to 9:30am and fill from 9:30->9:40
                    if (!manager.IsMarketOpen(endTime.Subtract(increment)) && Subscriptions[i].Resolution != Resolution.Daily)
                    {
                        // Move fill forward so we don't waste time in this tight loop.
                        endTime = fillUntil;
                        do
                        {
                            endTime = endTime - increment;
                        }
                        // is market open assumes start time of bars, open at 9:30 closed at 4:00
                        // so decrement our date to use the start time
                        while (manager.IsMarketOpen(endTime.Subtract(increment)));
                        continue;
                    }
                }

                // add any overlap condition here
                if (Subscriptions[i].Resolution == Resolution.Daily)
                {
                    // handle fill forward on lower resolutions
                    var barStartTime = endTime - increment;
                    if (manager.Exchange.IsOpenDuringBar(barStartTime, endTime, Subscriptions[i].ExtendedMarketHours))
                    {
                        EnqueueFillForwardData(i, previous, endTime);
                    }
                    // special case catch missing days
                    else if (endTime.TimeOfDay.Ticks == 0 && manager.Exchange.DateIsOpen(endTime.Date.AddDays(-1)))
                    {
                        EnqueueFillForwardData(i, previous, endTime);
                    }
                    continue;
                }

                EnqueueFillForwardData(i, whatToFill, endTime);
            }
        }


        private void EnqueueFillForwardData(int i, BaseData previous, DateTime dataEndTime)
        {
            var cache = new List<BaseData>(1);
            var fillforward = previous.Clone(true);
            fillforward.Time = dataEndTime.Subtract(Subscriptions[i].Increment);
            fillforward.EndTime = dataEndTime;
            FillForwardFrontiers[i] = dataEndTime;
            cache.Add(fillforward);
            Bridge[i].Enqueue(cache);
        }


        /// <summary>
        /// Get the number of streams still active according to the EndOfBridge array.
        /// </summary>
        /// <returns>Count of the number of streams with data</returns>
        private int GetActiveStreams()
        {
            //Get the number of active streams:
            var activeStreams = (from stream in EndOfBridge
                                 where stream == false
                                 select stream).Count();
            return activeStreams;
        }


        /// <summary>
        /// Calculate the minimum increment to scan for data based on the data requested.
        /// </summary>
        /// <param name="includeTick">When true the subscriptions include a tick data source, meaning there is almost no increment.</param>
        /// <returns>Timespan to jump the data source so it efficiently orders the results</returns>
        private TimeSpan CalculateIncrement(bool includeTick)
        {
            var increment = TimeSpan.FromDays(1);
            foreach (var config in Subscriptions)
            {
                switch (config.Resolution)
                {
                    //Hourly TradeBars:
                    case Resolution.Hour:
                        if (increment > TimeSpan.FromHours(1))
                        {
                            increment = TimeSpan.FromHours(1);
                        }
                        break;

                    //Minutely TradeBars:
                    case Resolution.Minute:
                        if (increment > TimeSpan.FromMinutes(1))
                        {
                            increment = TimeSpan.FromMinutes(1);
                        }
                        break;

                    //Secondly Bars:
                    case Resolution.Second:
                        if (increment > TimeSpan.FromSeconds(1))
                        {
                            increment = TimeSpan.FromSeconds(1);
                        }
                        break;

                    //Ticks: No increment; just fire each data piece in as they happen.
                    case Resolution.Tick:
                        if (increment > TimeSpan.FromMilliseconds(1) && includeTick)
                        {
                            increment = new TimeSpan(0, 0, 0, 0, 1);
                        }
                        break;
                }
            }
            return increment;
        }

    } // End FileSystem Local Feed Class:
} // End Namespace
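
The fill-forward step above can be hard to follow inside the full class, so here is a minimal standalone sketch of the idea. It is not Lean's implementation: `Bar`, `FillForwardSketch`, `MinIncrement` and `FillForward` are hypothetical names introduced only for illustration, market-hours checks are left out, and the tick case of CalculateIncrement is ignored. It only shows how the previous bar is cloned, re-stamped and enqueued onto a bridge queue until the next real data point is reached.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for Lean's BaseData; only the fields used below.
public sealed class Bar
{
    public DateTime Time;     // start of the bar
    public DateTime EndTime;  // end of the bar
    public decimal Value;

    public Bar Clone() => (Bar)MemberwiseClone();
}

public static class FillForwardSketch
{
    // Smallest increment among the subscriptions, capped at one day
    // (the same default CalculateIncrement starts from).
    public static TimeSpan MinIncrement(IEnumerable<TimeSpan> increments)
    {
        return increments.Concat(new[] { TimeSpan.FromDays(1) }).Min();
    }

    // Copies 'previous' forward in steps of 'increment', stopping before
    // 'fillUntil' (the EndTime of the next real data point).
    public static void FillForward(ConcurrentQueue<List<Bar>> bridge, Bar previous,
                                   DateTime fillUntil, TimeSpan increment)
    {
        for (var endTime = previous.EndTime + increment; endTime < fillUntil; endTime += increment)
        {
            var copy = previous.Clone();
            copy.Time = endTime - increment;
            copy.EndTime = endTime;
            bridge.Enqueue(new List<Bar>(1) { copy });
        }
    }

    public static void Main()
    {
        var bridge = new ConcurrentQueue<List<Bar>>();
        var previous = new Bar
        {
            Time = new DateTime(2015, 1, 5, 10, 0, 0),
            EndTime = new DateTime(2015, 1, 5, 10, 1, 0),
            Value = 100m
        };
        var nextRealEndTime = new DateTime(2015, 1, 5, 10, 4, 0);
        var increment = MinIncrement(new[] { TimeSpan.FromHours(1), TimeSpan.FromMinutes(1) });

        FillForward(bridge, previous, nextRealEndTime, increment);

        foreach (var packet in bridge)
        {
            Console.WriteLine($"{packet[0].Time:HH:mm} -> {packet[0].EndTime:HH:mm} value={packet[0].Value}");
        }
    }
}
```

With a previous bar ending at 10:01 and the next real bar ending at 10:04, the sketch enqueues two synthetic one-minute bars (ending at 10:02 and 10:03), each carrying the previous value. The real ProcessFillForward additionally skips closed-market periods and treats daily resolution separately.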

4. BacktestingDataFeed: the backtesting data feed

namespace QuantConnect.Lean.Engine.DataFeeds
{
    /******************************************************** 
    * CLASS DEFINITIONS
    *********************************************************/
    /// <summary>
    /// Backtesting data feed extends the filesystem data feed with almost no modifications. Later this method can
    /// be used for implementing alternative sources/generation for backtesting data.
    /// The backtesting data feed is a derived class of the file system data feed.
    /// </summary>
    public class BacktestingDataFeed : FileSystemDataFeed
    {
        /******************************************************** 
        * CLASS VARIABLES
        *********************************************************/

        /******************************************************** 
        * CLASS PROPERTIES
        *********************************************************/

        /******************************************************** 
        * CLASS CONSTRUCTOR
        *********************************************************/
        /// <summary>
        /// Pass through the backtesting datafeed to the underlying file system datafeed implementation.
        /// </summary>
        /// <param name="algorithm">Algorithm we're operating with</param>
        /// <param name="job">Algorithm worker job</param>
        public BacktestingDataFeed(IAlgorithm algorithm, BacktestNodePacket job) : base(algorithm, job)
        {
            DataFeed = DataFeedEndpoint.Backtesting;
        }
    } // End Backtesting Feed Class:
} // End Namespace
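
The pass-through pattern used by BacktestingDataFeed can be reduced to a few lines. The sketch below uses hypothetical, simplified types (`FeedEndpoint`, `FileFeedSketch`, `BacktestFeedSketch`), not Lean's real classes; it only illustrates that the derived feed forwards its constructor arguments to the base class and then overrides the endpoint marker.

```csharp
using System;

// Hypothetical, simplified stand-ins; these are not Lean's real types.
public enum FeedEndpoint { FileSystem, Backtesting }

public class FileFeedSketch
{
    // The derived class may change the endpoint, callers may only read it.
    public FeedEndpoint Endpoint { get; protected set; }
    public string JobId { get; }

    public FileFeedSketch(string jobId)
    {
        JobId = jobId;
        Endpoint = FeedEndpoint.FileSystem;
    }
}

public class BacktestFeedSketch : FileFeedSketch
{
    // All setup is delegated to the base constructor; only the endpoint differs.
    public BacktestFeedSketch(string jobId) : base(jobId)
    {
        Endpoint = FeedEndpoint.Backtesting;
    }
}

public static class Program
{
    public static void Main()
    {
        FileFeedSketch feed = new BacktestFeedSketch("job-1");
        Console.WriteLine($"{feed.JobId}: {feed.Endpoint}");
    }
}
```

Because all reading logic stays in the base class, an alternative backtesting source could later be added by overriding behaviour in the derived class without touching the file system feed.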


Besides these, there are also the database data feed, DataBaseDataFeed, and the live-trading data feed, LiveTradingDataFeed. They are not covered here.
