程序员的量化交易之路(35)--Lean之DataFeed数据槽3

简介:

转载需注明出处:http://blog.csdn.net/minimicallhttp://cloudtrade.top/


Lean引擎的模块划分非常的规范。其中DataFeed是数据槽,就是供应数据的模块。

1. IDataFeed 接口

模块的接口为:

namespace QuantConnect.Lean.Engine.DataFeeds
{
    /// <summary>
    /// Datafeed interface for creating custom datafeed sources.
    /// 数据供应的借口
    /// </summary>
    public interface IDataFeed
    {
        /******************************************************** 
        * INTERFACE PROPERTIES
        *********************************************************/
        /// <summary>
        /// List of the subscription the algorithm has requested. Subscriptions contain the type, sourcing information and manage the enumeration of data.
        /// 订阅列表
        /// </summary>
        List<SubscriptionDataConfig> Subscriptions
        {
            get;
        }


        /// <summary>
        /// Prices of the datafeed this instant for dynamically updating security values (and calculation of the total portfolio value in realtime).
        /// 实时价格
        /// </summary>
        /// <remarks>Indexed in order of the subscriptions</remarks>
        List<decimal> RealtimePrices
        {
            get;
        }

        /// <summary>
        /// Cross-threading queues so the datafeed pushes data into the queue and the primary algorithm thread reads it out.
        /// 跨线程使用的队列,datafeed线程放入数据,算法主线程读出数据
        /// </summary>
        ConcurrentQueue<List<BaseData>>[] Bridge
        {
            get;
            set;
        }

        /// <summary>
        /// Boolean flag indicating there is no more data in any of our subscriptions.
        /// </summary>
        bool EndOfBridges
        {
            get;
        }

        /// <summary>
        /// Array of boolean flags indicating the data status for each queue/subscription we're tracking.
        /// </summary>
        bool[] EndOfBridge
        {
            get;
        }

        /// <summary>
        /// Set the source of the data we're requesting for the type-readers to know where to get data from.
        /// </summary>
        /// <remarks>Live or Backtesting Datafeed</remarks>
        DataFeedEndpoint DataFeed
        {
            get;
            set;
        }

        /// <summary>
        /// Public flag indicator that the thread is still busy.
        /// 设置该线程是否活跃
        /// </summary>
        bool IsActive
        {
            get;
        }

        /// <summary>
        /// The most advanced moment in time for which the data feed has completed loading data
        /// </summary>
        DateTime LoadedDataFrontier { get; }

        /// <summary>
        /// Data has completely loaded and we don't expect any more.
        /// </summary>
        bool LoadingComplete
        {
            get;
        }

        /******************************************************** 
        * INTERFACE METHODS
        *********************************************************/
        /// <summary>
        /// Primary entry point.
        /// </summary>
        void Run();


        /// <summary>
        /// External controller calls to signal a terminate of the thread.
        /// </summary>
        void Exit();


        /// <summary>
        /// Purge all remaining data in the thread.
        /// </summary>
        void PurgeData();
    }
}
IDataFeed是数据槽接口,是其他实现类必须实现的。

2. BaseDataFeed 数据槽基类

它实现IDataFeed,并且是其他派生类的一个基类。

namespace QuantConnect.Lean.Engine.DataFeeds
{
    /// <summary>
    /// Common components of a data feed allowing the extender to implement only the parts which matter.
    /// 数据槽的基类,允许派生类定制部分
    /// </summary>
    public abstract class BaseDataFeed : IDataFeed
    {
        /******************************************************** 
        * CLASS VARIABLES
        *********************************************************/
        private IAlgorithm _algorithm;
        private BacktestNodePacket _job;
        private bool _endOfStreams = false;
        private int _subscriptions = 0;
        private int _bridgeMax = 500000;
        private bool _exitTriggered = false;

        private DateTime[] _frontierTime;

        /******************************************************** 
        * CLASS PROPERTIES
        *********************************************************/
        /// <summary>
        /// List of the subscription the algorithm has requested. Subscriptions contain the type, sourcing information and manage the enumeration of data.
        /// 订阅列表信息
        /// </summary>
        public List<SubscriptionDataConfig> Subscriptions { get; private set; }

        /// <summary>
        /// Prices of the datafeed this instant for dynamically updating security values (and calculation of the total portfolio value in realtime).
        /// 实时价格
        /// </summary>
        /// <remarks>Indexed in order of the subscriptions</remarks>
        public List<decimal> RealtimePrices { get; private set; }

        /// <summary>
        /// Cross-threading queues so the datafeed pushes data into the queue and the primary algorithm thread reads it out.
        /// 桥
        /// </summary>
        public ConcurrentQueue<List<BaseData>>[] Bridge { get; set; }

        /// <summary>
        /// Stream created from the configuration settings.
        /// 配置产生的流
        /// </summary>
        public SubscriptionDataReader[] SubscriptionReaderManagers { get; set; }

        /// <summary>
        /// Set the source of the data we're requesting for the type-readers to know where to get data from.
        /// </summary>
        /// <remarks>Live or Backtesting Datafeed</remarks>
        public DataFeedEndpoint DataFeed { get; set; }

        /// <summary>
        /// Flag indicating the hander thread is completely finished and ready to dispose.
        /// </summary>
        public bool IsActive { get; private set; }

        /// <summary>
        /// Flag indicating the file system has loaded all files.
        /// </summary>
        public bool LoadingComplete { get; private set; }

        /// <summary>
        /// Furthest point in time that the data has loaded into the bridges.
        /// </summary>
        public DateTime LoadedDataFrontier { get; private set; }

        /// <summary>
        /// Signifying no more data across all bridges
        /// </summary>
        public bool EndOfBridges
        {
            get
            {
                for (var i = 0; i < Bridge.Length; i++)
                {
                    if (Bridge[i].Count != 0 || EndOfBridge[i] != true)
                    {
                        return false;
                    }
                }
                return true;
            }
        }

        /// <summary>
        /// End of Stream for Each Bridge:
        /// </summary>
        public bool[] EndOfBridge { get; set; }

        /******************************************************** 
        * CLASS CONSTRUCTOR
        *********************************************************/
        /// <summary>
        /// Create an instance of the base datafeed.
        /// </summary>
        public BaseDataFeed(IAlgorithm algorithm, BacktestNodePacket job)
        {
            //Save the data subscriptions
            Subscriptions = algorithm.SubscriptionManager.Subscriptions;//是一个链表,每个节点代表了对一种证券资产数据的订阅
            _subscriptions = Subscriptions.Count;//订阅了证券数目

            //Public Properties:
            DataFeed = DataFeedEndpoint.FileSystem;//默认赋予从文件系统读取
            IsActive = true;//线程是否活跃
            Bridge = new ConcurrentQueue<List<BaseData>>[_subscriptions];//桥是一个链表的链表
            EndOfBridge = new bool[_subscriptions];
            SubscriptionReaderManagers = new SubscriptionDataReader[_subscriptions];//初始化读者列表
            RealtimePrices = new List<decimal>(_subscriptions);//初始化实时价格数据列表 
            _frontierTime = new DateTime[_subscriptions];

            //Class Privates:
            _job = job;//相关任务
            _algorithm = algorithm;//相关算法
            _endOfStreams = false;
            _bridgeMax = _bridgeMax / _subscriptions;

            //Initialize arrays:
            for (var i = 0; i < _subscriptions; i++)
            {
                _frontierTime[i] = job.PeriodStart;
                EndOfBridge[i] = false;
                Bridge[i] = new ConcurrentQueue<List<BaseData>>();//分配每个订阅桥节点的数据链表
                //为每个订阅分配读者
                SubscriptionReaderManagers[i] = new SubscriptionDataReader(Subscriptions[i], algorithm.Securities[Subscriptions[i].Symbol], DataFeedEndpoint.Database, job.PeriodStart, job.PeriodFinish);
            
            }
        }


        /// <summary>
        /// Launch the primary data thread.
        /// 读数据的线程主函数
        /// </summary>
        public virtual void Run()
        {
            while (!_exitTriggered && IsActive && !EndOfBridges)
            {
                for (var i = 0; i < Subscriptions.Count; i++)
                {
                    //With each subscription; fetch the next increment of data from the queues:
                    //为每一个订阅,读取下一个数据
                    var subscription = Subscriptions[i];//第i个证券订阅

                    if (Bridge[i].Count < 10000 && !EndOfBridge[i])//确定该证券读取的数据个数没有超出界限
                    {
                        var data = GetData(subscription);//读取数据的函数,返回数据

                        //Comment out for live databases, where we should continue asking even if no data.
                        if (data.Count == 0)//如果这个订阅没有数据,那么这个订阅就读取结束,跳到下一个订阅读取
                        {
                            EndOfBridge[i] = true;//本订阅读取结束
                            continue;
                        }

                        ////Insert data into bridge, each list is time-grouped. Assume all different time-groups.
                        foreach (var obj in data)
                        {
                            Bridge[i].Enqueue(new List<BaseData>() { obj });
                        }
                        
                        ////Record the furthest moment in time.
                        _frontierTime[i] = data.Max(bar => bar.Time);
                    }
                }
                //Set the most backward moment in time we've loaded
                LoadedDataFrontier = _frontierTime.Min();
            }

            IsActive = false;
        }


        /// <summary>
        /// Get the next set of data for this subscription
        /// 获取该订阅的下一集合数据
        /// </summary>
        /// <param name="subscription"></param>
        /// <returns></returns>
        public abstract  List<BaseData> GetData(SubscriptionDataConfig subscription);


        /// <summary>
        /// Send an exit signal to the thread.
        /// </summary>
        public virtual void Exit()
        {
            _exitTriggered = true;
            PurgeData();
        }

        /// <summary>
        /// Loop over all the queues and clear them to fast-quit this thread and return to main.
        /// </summary>
        public virtual void PurgeData()
        {
            foreach (var t in Bridge)
            {
                t.Clear();
            }
        }
    }
}
3  FileSystemDataFeed文件系统数据槽

namespace QuantConnect.Lean.Engine.DataFeeds
{
    /******************************************************** 
    * CLASS DEFINITIONS
    *********************************************************/
    /// <summary>
    /// Historical datafeed stream reader for processing files on a local disk.
    /// 从本地磁盘加载历史数据
    /// </summary>
    /// <remarks>Filesystem datafeeds are incredibly fast</remarks>
    public class FileSystemDataFeed : IDataFeed
    {
        /******************************************************** 
        * CLASS VARIABLES
        *********************************************************/
        // Set types in public area to speed up:
        private IAlgorithm _algorithm;
        private BacktestNodePacket _job;
        private bool _endOfStreams = false;
        private int _subscriptions = 0;
        private int _bridgeMax = 500000;
        private bool _exitTriggered = false;

        /******************************************************** 
        * CLASS PROPERTIES
        *********************************************************/
        /// <summary>
        /// List of the subscription the algorithm has requested. Subscriptions contain the type, sourcing information and manage the enumeration of data.
        /// </summary>
        public List<SubscriptionDataConfig> Subscriptions { get; private set; }

        /// <summary>
        /// Prices of the datafeed this instant for dynamically updating security values (and calculation of the total portfolio value in realtime).
        /// </summary>
        /// <remarks>Indexed in order of the subscriptions</remarks>
        public List<decimal> RealtimePrices { get; private set; } 

        /// <summary>
        /// Cross-threading queues so the datafeed pushes data into the queue and the primary algorithm thread reads it out.
        /// </summary>
        public ConcurrentQueue<List<BaseData>>[] Bridge { get; set; }

        /// <summary>
        /// Set the source of the data we're requesting for the type-readers to know where to get data from.
        /// </summary>
        /// <remarks>Live or Backtesting Datafeed</remarks>
        public DataFeedEndpoint DataFeed { get; set; }

        /// <summary>
        /// Flag indicating the hander thread is completely finished and ready to dispose.
        /// </summary>
        public bool IsActive { get; private set; }

        /// <summary>
        /// Flag indicating the file system has loaded all files.
        /// </summary>
        public bool LoadingComplete { get; private set; }

        /// <summary>
        /// Furthest point in time that the data has loaded into the bridges.
        /// </summary>
        public DateTime LoadedDataFrontier { get; private set; }

        /// <summary>
        /// Stream created from the configuration settings.
        /// </summary>
        private SubscriptionDataReader[] SubscriptionReaders { get; set; }

        /// <summary>
        /// Signifying no more data across all bridges
        /// </summary>
        public bool EndOfBridges 
        {
            get 
            {
                for (var i = 0; i < Bridge.Length; i++)
                {
                    if (Bridge[i].Count != 0 || EndOfBridge[i] != true || _endOfStreams != true)
                    {
                        return false;
                    }
                }
                return true;
            }
        }

        /// <summary>
        /// End of Stream for Each Bridge:
        /// </summary>
        public bool[] EndOfBridge { get; set; }

        /// <summary>
        /// Frontiers for each fill forward high water mark
        /// </summary>
        public DateTime[] FillForwardFrontiers;

        /******************************************************** 
        * CLASS CONSTRUCTOR
        *********************************************************/
        /// <summary>
        /// Create a new backtesting data feed.
        /// </summary>
        /// <param name="algorithm">Instance of the algorithm</param>
        /// <param name="job">Algorithm work task</param>
        public FileSystemDataFeed(IAlgorithm algorithm, BacktestNodePacket job)
        {
            Console.WriteLine("FileSystemDataFeed,algorithm:" + algorithm + ",job: " + job);
            Subscriptions = algorithm.SubscriptionManager.Subscriptions;
            Console.WriteLine("Subscriptions.count:" + Subscriptions.Count);
            _subscriptions = Subscriptions.Count;
          

            //Public Properties:
            DataFeed = DataFeedEndpoint.FileSystem;
            IsActive = true;
            Bridge = new ConcurrentQueue<List<BaseData>>[_subscriptions];
            EndOfBridge = new bool[_subscriptions];
            SubscriptionReaders = new SubscriptionDataReader[_subscriptions];
            FillForwardFrontiers = new DateTime[_subscriptions];
            RealtimePrices = new List<decimal>(_subscriptions);

            //Class Privates:
            _job = job;
            _algorithm = algorithm;
            _endOfStreams = false;
            _bridgeMax = _bridgeMax / _subscriptions; //Set the bridge maximum count:

            for (var i = 0; i < _subscriptions; i++)
            {
                //Create a new instance in the dictionary:
                Bridge[i] = new ConcurrentQueue<List<BaseData>>();
                EndOfBridge[i] = false;

                SubscriptionReaders[i] = new SubscriptionDataReader(Subscriptions[i], _algorithm.Securities[Subscriptions[i].Symbol], DataFeed, _job.PeriodStart, _job.PeriodFinish);
                FillForwardFrontiers[i] = new DateTime();
            }
        }

        /******************************************************** 
        * CLASS METHODS
        *********************************************************/
        /// <summary>
        /// Main routine for datafeed analysis.
        /// </summary>
        /// <remarks>This is a hot-thread and should be kept extremely lean. Modify with caution.</remarks>
        public void Run()
        {
            Log.Trace("debug FileSystemDataFeed.run()");
            Console.WriteLine("FileSystemDataFeed.run()");
            //Calculate the increment based on the subscriptions:
            var tradeBarIncrements = CalculateIncrement(includeTick: false);
            var increment = CalculateIncrement(includeTick: true);

            //Loop over each date in the job
            foreach (var date in Time.EachTradeableDay(_algorithm.Securities, _job.PeriodStart, _job.PeriodFinish))
            {
                Log.Trace("in trading date:"+date+",PeriodStart:"+_job.PeriodStart+",PeriodFinish:"+_job.PeriodFinish);
                //Update the source-URL from the BaseData, reset the frontier to today. Update the source URL once per day.
                // this is really the next frontier in the future
                var frontier = date.Add(increment);
                var activeStreams = _subscriptions;
                Log.Trace("subscription:" + _subscriptions);
                //Initialize the feeds to this date:
                for (var i = 0; i < _subscriptions; i++) 
                {
                    //Don't refresh source when we know the market is closed for this security:
                    Log.Trace("i:"+i+"subscription");
                    var success = SubscriptionReaders[i].RefreshSource(date);

                    //If we know the market is closed for security then can declare bridge closed.
                    if (success) {
                        EndOfBridge[i] = false;
                    }
                    else
                    {
                        ProcessMissingFileFillForward(SubscriptionReaders[i], i, tradeBarIncrements, date);
                        EndOfBridge[i] = true;
                    }
                }

                //Pause the DataFeed
                var bridgeFullCount = Bridge.Count(bridge => bridge.Count >= _bridgeMax);
                var bridgeZeroCount = Bridge.Count(bridge => bridge.Count == 0);
                var active = GetActiveStreams();

                //Pause here while bridges are full, but allow missing files to pass
                while (bridgeFullCount > 0 && ((_subscriptions - active) == bridgeZeroCount) && !_exitTriggered)
                {
                    bridgeFullCount = Bridge.Count(bridge => bridge.Count >= _bridgeMax);
                    bridgeZeroCount = Bridge.Count(bridge => bridge.Count == 0);
                    Thread.Sleep(5);
                }

                // for each smallest resolution
                var datePlusOneDay = date.Date.AddDays(1);
                while ((frontier.Date == date.Date || frontier.Date == datePlusOneDay) && !_exitTriggered)
                {
                    var cache = new List<BaseData>[_subscriptions];
                    
                    //Reset Loop:
                    long earlyBirdTicks = 0;

                    //Go over all the subscriptions, one by one add a minute of data to the bridge.
                    //对所订阅的证券进行一个个的加载,加载到数据桥中
                    for (var i = 0; i < _subscriptions; i++)
                    {
                        //Get the reader manager:获得第i个证券的读者
                        var manager = SubscriptionReaders[i];

                        //End of the manager stream set flag to end bridge: also if the EOB flag set, from the refresh source method above
                        if (manager.EndOfStream || EndOfBridge[i])
                        {
                            EndOfBridge[i] = true;
                            activeStreams = GetActiveStreams();
                            if (activeStreams == 0)
                            {
                                frontier = frontier.Date + TimeSpan.FromDays(1);
                            }
                            continue;
                        }

                        //Initialize data store:
                        cache[i] = new List<BaseData>(2);

                        //Add the last iteration to the new list: only if it falls into this time category
                        //下面这个代码很关键,它把当前读到的数据条放到该证券对应的链表里面
                        var cacheAtIndex = cache[i];
                        while (manager.Current.EndTime < frontier)
                        {
                            Log.Trace("Current:symbol:" + manager.Current.Symbol + ",price" + manager.Current.Price);
                            cacheAtIndex.Add(manager.Current);//放Current到该证券对应的链表里面
                            Log.Trace(string.Format("FileSystemDataFeed,Current: {0}", manager.Current));
                            if (!manager.MoveNext()) break;//读取下一个数据
                        }

                        //Save the next earliest time from the bridges: only if we're not filling forward.
                        if (manager.Current != null)
                        {
                            if (earlyBirdTicks == 0 || manager.Current.EndTime.Ticks < earlyBirdTicks)
                            {
                                earlyBirdTicks = manager.Current.EndTime.Ticks;
                            }
                        }
                    }

                    if (activeStreams == 0)
                    {
                        break;
                    }

                    //Add all the lists to the bridge, release the bridge
                    //we push all the data up to this frontier into the bridge at once
                    for (var i = 0; i < _subscriptions; i++)
                    {
                        if (cache[i] != null && cache[i].Count > 0)
                        {
                            FillForwardFrontiers[i] = cache[i][0].EndTime;
                            Bridge[i].Enqueue(cache[i]);
                        }
                        ProcessFillForward(SubscriptionReaders[i], i, tradeBarIncrements);
                    }

                    //This will let consumers know we have loaded data up to this date
                    //So that the data stream doesn't pull off data from the same time period in different events
                    LoadedDataFrontier = frontier;

                    if (earlyBirdTicks > 0 && earlyBirdTicks > frontier.Ticks) 
                    {
                        //Jump increment to the nearest second, in the future: Round down, add increment
                        frontier = (new DateTime(earlyBirdTicks)).RoundDown(increment) + increment;
                    }
                    else
                    {
                        //Otherwise step one forward.
                        frontier += increment;
                    }

                } // End of This Day.

                if (_exitTriggered) break;

            } // End of All Days:

            Log.Trace(DataFeed + ".Run(): Data Feed Completed.");
            LoadingComplete = true;

            //Make sure all bridges empty before declaring "end of bridge":
            while (!EndOfBridges && !_exitTriggered)
            {
                for (var i = 0; i < _subscriptions; i++)
                {
                    //Nothing left in the bridge, mark it as finished
                    if (Bridge[i].Count == 0)
                    {
                        EndOfBridge[i] = true;
                    }
                }
                if (GetActiveStreams() == 0) _endOfStreams = true;
                Thread.Sleep(100);
            }

            //Close up all streams:
            for (var i = 0; i < Subscriptions.Count; i++)
            {
                SubscriptionReaders[i].Dispose();
            }

            Log.Trace(DataFeed + ".Run(): Ending Thread... ");
            IsActive = false;
        }



        /// <summary>
        /// Send an exit signal to the thread.
        /// 退出该线程
        /// </summary>
        public void Exit()
        {
            _exitTriggered = true;
            PurgeData();
        }


        /// <summary>
        /// Loop over all the queues and clear them to fast-quit this thread and return to main.
        /// 清除缓存
        /// </summary>
        public void PurgeData()
        {
            foreach (var t in Bridge)
            {
                t.Clear();
            }
        }

        private void ProcessMissingFileFillForward(SubscriptionDataReader manager, int i, TimeSpan increment, DateTime dateToFill)
        {
            // we'll copy the current into the next day
            var subscription = Subscriptions[i];
            if (!subscription.FillDataForward || manager.Current == null) return;

            var start = dateToFill.Date + manager.Exchange.MarketOpen;
            if (subscription.ExtendedMarketHours)
            {
                start = dateToFill.Date + manager.Exchange.ExtendedMarketOpen;
            }

            // shift the 'start' time to the end of the bar by adding the increment, this makes 'date'
            // the end time which also allows the market open functions to behave as expected

            var current = manager.Current;
            for (var endTime = start.Add(increment); endTime.Date == dateToFill.Date; endTime = endTime + increment)
            {
                if (manager.IsMarketOpen(endTime) || (subscription.ExtendedMarketHours && manager.IsExtendedMarketOpen(endTime)))
                {
                    EnqueueFillForwardData(i, current, endTime);
                }
                else
                {
                    // stop fill forwarding when we're no longer open
                    break;
                }
            }
        }

        /// <summary>
        /// If this is a fillforward subscription, look at the previous time, and current time, and add new 
        /// objects to queue until current time to fill up the gaps.
        /// </summary>
        /// <param name="manager">Subscription to process</param>
        /// <param name="i">Subscription position in the bridge ( which queue are we pushing data to )</param>
        /// <param name="increment">Timespan increment to jump the fillforward results</param>
        private void ProcessFillForward(SubscriptionDataReader manager, int i, TimeSpan increment)
        {
            // If previous == null cannot fill forward nothing there to move forward (e.g. cases where file not found on first file).
            if (!Subscriptions[i].FillDataForward || manager.Previous == null || manager.Current == null) return;

            //Last tradebar and the current one we're about to add to queue:
            var previous = manager.Previous;
            var current = manager.Current;

            // final two points of file that ends at midnight, causes issues in the day rollover/fill forward
            if (current.EndTime.TimeOfDay.Ticks == 0 && previous.EndTime == current.Time)
            {
                return;
            }

            //Initialize the frontier:
            if (FillForwardFrontiers[i].Ticks == 0) FillForwardFrontiers[i] = previous.EndTime;

            // using the previous to fill forward since 'current' is ahead the frontier
            var whatToFill = previous;
            // using current.EndTime as fill until because it's the next piece of data we have for this subscription
            var fillUntil = current.EndTime;

            //Data ended before the market closed: premature ending flag - continue filling forward until market close.
            if (manager.EndOfStream && manager.IsMarketOpen(current.EndTime))
            {
                //Make sure we only fill forward to end of *today* -- don't fill forward tomorrow just because its also open
                fillUntil = FillForwardFrontiers[i].Date.AddDays(1);
                // since we ran out of data, use the current as the clone source, it's more recent than previous
                whatToFill = current;
            }

            // loop from our last time (previous.EndTime) to our current.EndTime, filling in all missing day during
            // request market hours
            for (var endTime = FillForwardFrontiers[i] + increment; (endTime < fillUntil); endTime = endTime + increment)
            {
                if (Subscriptions[i].ExtendedMarketHours)
                {
                    if (!manager.IsExtendedMarketOpen(endTime.Subtract(increment)))
                    {
                        //If we've asked for extended hours, and the security is no longer inside extended market hours, skip:
                        continue;
                    }
                }
                else
                {
                    // if the market isn't open skip to the current.EndTime and rewind until the market is open
                    // this is the case where the previous value is from yesterday but we're trying to fill forward
                    // the next day, so instead of zooming through 18 hours of off-market hours, skip to our current data
                    // point and rewind the market open.
                    //
                    // E.g, Current.EndTime = 9:40am and Previous.EndTime = 2:00pm, so fill in from 2->4pm
                    // and then skip to 9:40am, reverse to 9:30am and fill from 9:30->9:40
                    if (!manager.IsMarketOpen(endTime.Subtract(increment)) && Subscriptions[i].Resolution != Resolution.Daily)
                    {
                        // Move fill forward so we don't waste time in this tight loop.
                        endTime = fillUntil;
                        do
                        {
                            endTime = endTime - increment;
                        }
                        // is market open assumes start time of bars, open at 9:30 closed at 4:00
                        // so decrement our date to use the start time
                        while (manager.IsMarketOpen(endTime.Subtract(increment)));
                        continue;
                    }
                }

                // add any overlap condition here
                if (Subscriptions[i].Resolution == Resolution.Daily)
                {
                    // handle fill forward on lower resolutions
                    var barStartTime = endTime - increment;
                    if (manager.Exchange.IsOpenDuringBar(barStartTime, endTime, Subscriptions[i].ExtendedMarketHours))
                    {
                        EnqueueFillForwardData(i, previous, endTime);
                    }
                    // special case catch missing days
                    else if (endTime.TimeOfDay.Ticks == 0 && manager.Exchange.DateIsOpen(endTime.Date.AddDays(-1)))
                    {
                        EnqueueFillForwardData(i, previous, endTime);
                    }
                    continue;
                }

                EnqueueFillForwardData(i, whatToFill, endTime);
            }
        }


        private void EnqueueFillForwardData(int i, BaseData previous, DateTime dataEndTime)
        {
            var cache = new List<BaseData>(1);
            var fillforward = previous.Clone(true);
            fillforward.Time = dataEndTime.Subtract(Subscriptions[i].Increment);
            fillforward.EndTime = dataEndTime;
            FillForwardFrontiers[i] = dataEndTime;
            cache.Add(fillforward);
            Bridge[i].Enqueue(cache);
        }


        /// <summary>
        /// Get the number of active streams still EndOfBridge array.
        /// </summary>
        /// <returns>Count of the number of streams with data</returns>
        private int GetActiveStreams()
        {
            //Get the number of active streams:
            var activeStreams = (from stream in EndOfBridge
                                 where stream == false
                                 select stream).Count();
            return activeStreams;
        }


        /// <summary>
        /// Calculate the minimum increment to scan for data based on the data requested.
        /// </summary>
        /// <param name="includeTick">When true the subscriptions include a tick data source, meaning there is almost no increment.</param>
        /// <returns>Timespan to jump the data source so it efficiently orders the results</returns>
        private TimeSpan CalculateIncrement(bool includeTick)
        {
            var increment = TimeSpan.FromDays(1);
            foreach (var config in Subscriptions)
            {
                switch (config.Resolution)
                {
                    //Hourly TradeBars:
                    case Resolution.Hour:
                        if (increment > TimeSpan.FromHours(1))
                        {
                            increment = TimeSpan.FromHours(1);
                        }
                        break;

                    //Minutely TradeBars:
                    case Resolution.Minute:
                        if (increment > TimeSpan.FromMinutes(1))
                        {
                            increment = TimeSpan.FromMinutes(1);
                        }
                        break;

                    //Secondly Bars:
                    case Resolution.Second:
                        if (increment > TimeSpan.FromSeconds(1))
                        {
                            increment = TimeSpan.FromSeconds(1);
                        }
                        break;

                    //Ticks: No increment; just fire each data piece in as they happen.
                    case Resolution.Tick:
                        if (increment > TimeSpan.FromMilliseconds(1) && includeTick)
                        {
                            increment = new TimeSpan(0, 0, 0, 0, 1);
                        }
                        break;
                }
            }
            return increment;
        }

    } // End FileSystem Local Feed Class:
} // End Namespace

4. BackTestingDataFeed 回归测试数据槽

namespace QuantConnect.Lean.Engine.DataFeeds
{
    /******************************************************** 
    * CLASS DEFINITIONS
    *********************************************************/
    /// <summary>
    /// Backtesting data feed extends the filesystem data feed with almost no modifications. Later this method can
    /// be used for implementing alternative sources/generation for backtesting data.
    /// 回归测试数据槽是文件系统数据槽的派生类
    /// </summary>
    public class BacktestingDataFeed : FileSystemDataFeed
    {
        /******************************************************** 
        * CLASS VARIABLES
        *********************************************************/

        /******************************************************** 
        * CLASS PROPERTIES
        *********************************************************/

        /******************************************************** 
        * CLASS CONSTRUCTOR
        *********************************************************/
        /// <summary>
        /// Pass through the backtesting datafeed to the underlying file system datafeed implementation.
        /// </summary>
        /// <param name="algorithm">Algorithm we're operating with</param>
        /// <param name="job">Algorithm worker job</param>
        public BacktestingDataFeed(IAlgorithm algorithm, BacktestNodePacket job) : base(algorithm, job)
        {
            DataFeed = DataFeedEndpoint.Backtesting;
        }
    } // End Backtesting Feed Class:
} // End Namespace


此外还有数据库数据槽DataBaseDataFeed和LiveTradingDataFeed实时交易数据槽。在这里就不在说明。

相关文章
|
6月前
|
安全 编译器 C++
[笔记]读书笔记 C++设计新思维《二》技术(Techniques)(一)
[笔记]读书笔记 C++设计新思维《二》技术(Techniques)
|
6月前
|
存储 算法 Java
[笔记]读书笔记 C++设计新思维《二》技术(Techniques)(二)
[笔记]读书笔记 C++设计新思维《二》技术(Techniques)(二)
|
7月前
|
存储 JavaScript 前端开发
关于缺少编程基础的朋友想转行 ABAP 开发岗提出的一些咨询问题和解答
关于缺少编程基础的朋友想转行 ABAP 开发岗提出的一些咨询问题和解答
44 0
|
SQL 前端开发 程序员
High&NewTech:新物种?这是一种不需要写代码的程序猿,这事,得从Ta们掌握了 iVX工具(首个无代码编程语言)说起(二)
High&NewTech:新物种?这是一种不需要写代码的程序猿,这事,得从Ta们掌握了 iVX工具(首个无代码编程语言)说起……
High&NewTech:新物种?这是一种不需要写代码的程序猿,这事,得从Ta们掌握了 iVX工具(首个无代码编程语言)说起(二)
|
Devops 持续交付
《精益产品开发》读书笔记之六--结
何老师的这本书是一本非常“好”读的书,深涩的概念也是讲得深入浅出,触类旁通,而且故事感十足。
169 0
《精益产品开发》读书笔记之六--结
面向AARRR 的 移动架构设计 思考备忘
版权声明:本文为半吊子子全栈工匠(wireless_com,同公众号)原创文章,未经允许不得转载。
774 0