程序员的量化交易之路(37)--Lean之DataStream数据流5

简介:

转载需注明出处:http://blog.csdn.net/minimicallhttp://cloudtrade.top

我们之前说明了数据读者,数据槽。将数据读取到队列中,在算法主线程中需要使用DataFeed线程的数据。这是一个典型的读者-写着问题。

在主线程中和DataFeed打教导的事DataStream。下面我们看它的代码。说明在注释中说明了。

/*
 * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
 * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
 * 
 * Licensed under the Apache License, Version 2.0 (the "License"); 
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at http://www.apache.org /licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
*/

using System;
using System.Collections.Generic;
using System.Threading;
using System.Diagnostics;
using System.Linq;
using QuantConnect.Data;
using QuantConnect.Lean.Engine.DataFeeds;
using QuantConnect.Logging;

namespace QuantConnect.Lean.Engine
{
    /******************************************************** 
    * QUANTCONNECT NAMESPACES
    *********************************************************/
    /// <summary>
    /// Data stream class takes a datafeed hander and converts it into a synchronized enumerable data format for looping 
    /// 数据流类,拥有数据槽句柄,然后将数据转化为同步可枚举的,用作循环。
    /// in the primary algorithm thread.
    /// </summary>
    public static class DataStream
    {
        /******************************************************** 
        * CLASS VARIABLES
        *********************************************************/
        //Count of bridges and subscriptions.
        //订阅数,就是addSecurity添加的证券数量
        private static int _subscriptions;

        /******************************************************** 
        * CLASS PROPERTIES
        *********************************************************/

        /// <summary>
        /// The frontier time of the data stream
        /// 
        /// </summary>
        public static DateTime AlgorithmTime { get; private set; }

        /******************************************************** 
        * CLASS METHODS
        *********************************************************/
        /// <summary>
        /// Process over the datafeed cross thread bridges to generate an enumerable sorted collection of the data, ready for a consumer
        /// to use and already synchronized in time.
        /// </summary>
        /// <param name="feed">DataFeed object</param>
        /// <param name="frontierOrigin">Starting date for the data feed</param>
        /// <returns></returns>
        public static IEnumerable<Dictionary<int, List<BaseData>>> GetData(IDataFeed feed, DateTime frontierOrigin)
        {
            //Initialize:
            _subscriptions = feed.Subscriptions.Count;//订阅的证券数量
            AlgorithmTime = frontierOrigin;//算法前置时间
            long algorithmTime = AlgorithmTime.Ticks;//获取时间tick
            var frontier = frontierOrigin;
            var nextEmitTime = DateTime.MinValue;
            var periods = feed.Subscriptions.Select(x => x.Resolution.ToTimeSpan()).ToArray();

            //Wait for datafeeds to be ready, wait for first data to arrive:
            while (feed.Bridge.Length != _subscriptions) Thread.Sleep(100);

            // clear data first when in live mode, start with fresh data
            if (Engine.LiveMode)
            {
                feed.PurgeData();
            }

            //Get all data in queues: return as a sorted dictionary:
            //获取队列中的所有数据,以排序好的字典返回
            while (!feed.EndOfBridges)
            {
                //Reset items which are not fill forward:
                long earlyBirdTicks = 0;
                var newData = new Dictionary<int, List<BaseData>>();

                // spin wait until the feed catches up to our frontier
                WaitForDataOrEndOfBridges(feed, frontier);

                for (var i = 0; i < _subscriptions; i++)
                {
                    //If there's data on the bridge, check to see if it's time to pull it off, if it's in the future
                    // we'll record the time as 'earlyBirdTicks' so we can fast forward the frontier time
                    while (!feed.Bridge[i].IsEmpty)//第i个证券中有数据
                    {
                        //Look at first item on list, leave it there until time passes this item.
                        //检查链表中第一个条目
                        List<BaseData> result;
                        if (!feed.Bridge[i].TryPeek(out result))
                        {
                            // if there's no item skip to the next subscription
                            //如果这个证券没有数据,那么就直接跳到下一个证券
                            break;
                        }
                        if (result.Count > 0 && result[0].EndTime > frontier)
                        {
                            // we have at least one item, check to see if its in ahead of the frontier,
                            // if so, keep track of how many ticks in the future it is
                            if (earlyBirdTicks == 0 || earlyBirdTicks > result[0].EndTime.Ticks)
                            {
                                earlyBirdTicks = result[0].EndTime.Ticks;
                            }
                            break;
                        }
                        if (result.Count > 0)
                        {
                            // we have at least one item, check to see if its in ahead of the frontier,
                            // if so, keep track of how many ticks in the future it is
                            if (earlyBirdTicks == 0 || earlyBirdTicks > result[0].EndTime.Ticks)
                            {
                                earlyBirdTicks = result[0].EndTime.Ticks;
                            }
                        }

                        //Pull a grouped time list out of the bridge
                        List<BaseData> dataPoints;
                        if (feed.Bridge[i].TryDequeue(out dataPoints))
                        {
                            // round the time down based on the requested resolution for fill forward data
                            // this is a temporary fix, long term fill forward logic should be moved into this class
                            foreach (var point in dataPoints)
                            {
                                if (algorithmTime < point.EndTime.Ticks)
                                {
                                    // set this to most advanced end point in time, pre rounding
                                    // min  10:02  10:02:01(FF) 10:02:01.01(FF)
                                    // sec  10:02  10:02:01     10:02:01.01(FF)
                                    // tic  10:02  10:02:01     10:02:01.01
                                    // the algorithm time should always be the 'frontier' the furthest
                                    // time within this data slice
                                    algorithmTime = point.EndTime.Ticks;
                                }
                                if (point.IsFillForward)
                                {
                                    point.Time = point.Time.RoundDown(periods[i]);
                                }
                            }
                            // add the list to the collection to be yielded
                            List<BaseData> dp;
                            if (!newData.TryGetValue(i, out dp))
                            {
                                dp = new List<BaseData>();
                                newData[i] = dp;
                            }
                            dp.AddRange(dataPoints);
                        }
                        else
                        {
                            //Should never fail:
                            Log.Error("DataStream.GetData(): Failed to dequeue bridge item");
                        }
                    }
                }

                if (newData.Count > 0)
                {
                    AlgorithmTime = new DateTime(algorithmTime);
                    yield return newData;
                }

                //Update the frontier and start again.
                if (earlyBirdTicks > 0)
                {
                    //Seek forward in time to next data event from stream: there's nothing here for us to do now: why loop over empty seconds
                    frontier = new DateTime(earlyBirdTicks);
                }
                else if (feed.EndOfBridges)
                {
                    // we're out of data or quit
                    break;
                }

                //Allow loop pass through emits every second to allow event handling (liquidate/stop/ect...)
                if (Engine.LiveMode && DateTime.Now > nextEmitTime)
                {
                    AlgorithmTime = DateTime.Now.RoundDown(periods.Min());
                    nextEmitTime = DateTime.Now + TimeSpan.FromSeconds(1);
                    yield return new Dictionary<int, List<BaseData>>();
                }
            }
            Log.Trace("DataStream.GetData(): All Streams Completed.");
        }

        /// <summary>
        /// Waits until the data feed is ready for the data stream to pull data from it.
        /// 等待数据槽准备好了,可以从里面拉取数据
        /// </summary>
        /// <param name="feed">The IDataFeed instance populating the bridges</param>
        /// <param name="dataStreamFrontier">The frontier of the data stream</param>
        private static void WaitForDataOrEndOfBridges(IDataFeed feed, DateTime dataStreamFrontier)
        {
            //Make sure all bridges have data to to peek sync properly.
            var now = Stopwatch.StartNew();

            // timeout to prevent infinite looping here -- 50ms for live and 30sec for non-live
            var loopTimeout = (Engine.LiveMode) ? 50 : 30000;

            if (Engine.LiveMode)
            {
                // give some time to the other threads in live mode
                Thread.Sleep(1);
            }

            //Waiting for data in the bridges:
            while (!AllBridgesHaveData(feed) && now.ElapsedMilliseconds < loopTimeout)
            {
                Thread.Sleep(1);
            }

            //we want to verify that our data stream is never ahead of our data feed.
            //this acts as a virtual lock around the bridge so we can wait for the feed
            //to be ahead of us
            // if we're out of data then the feed will never update (it will stay here forever if there's no more data, so use a timeout!!)
            while (dataStreamFrontier > feed.LoadedDataFrontier && (!feed.EndOfBridges && !feed.LoadingComplete) && now.ElapsedMilliseconds < loopTimeout)
            {
                Thread.Sleep(1);
            }
        }


        /// <summary>
        /// Check if all the bridges have data or are dead before starting the analysis
        /// 
        /// This determines whether or not the data stream can pull data from the data feed.
        /// </summary>
        /// <param name="feed">Feed Interface with concurrent connection between producer and consumer</param>
        /// <returns>Boolean true more data to download</returns>
        private static bool AllBridgesHaveData(IDataFeed feed)
        {
            //Lock on the bridge to scan if it has data:
            for (var i = 0; i < _subscriptions; i++)
            {
                if (feed.EndOfBridge[i]) continue;
                if (feed.Bridge[i].IsEmpty)
                {
                    return false;
                }
            }
            return true;
        }

        /// <summary>
        /// Resets the frontier time to DateTime.MinValue
        /// </summary>
        public static void ResetFrontier()
        {
            AlgorithmTime = new DateTime();
        }
    }
}


相关文章
|
4月前
|
分布式计算 资源调度 测试技术
“Spark Streaming异常处理秘籍:揭秘如何驯服实时数据流的猛兽,守护你的应用稳如泰山,不容错过!”
【8月更文挑战第7天】Spark Streaming 是 Apache Spark 中的关键组件,用于实时数据流处理。部署时可能遭遇数据问题、资源限制或逻辑错误等异常。合理处理这些异常对于保持应用稳定性至关重要。基础在于理解其异常处理机制,通过 DSC 将数据流切分为 RDD。对于数据异常,可采用 try-catch 结构捕获并处理;资源层面异常需优化 Spark 配置,如调整内存分配;逻辑异常则需加强单元测试及集成测试。结合监控工具,可全面提升应用的健壮性和可靠性。
84 3
|
6月前
|
算法 安全 Java
技术经验分享:JavaSecurity:Java加密框架(JCA)简要说明
技术经验分享:JavaSecurity:Java加密框架(JCA)简要说明
|
7月前
|
机器学习/深度学习 分布式计算 BI
Flink实时流处理框架原理与应用:面试经验与必备知识点解析
【4月更文挑战第9天】本文详尽探讨了Flink实时流处理框架的原理,包括运行时架构、数据流模型、状态管理和容错机制、资源调度与优化以及与外部系统的集成。此外,还介绍了Flink在实时数据管道、分析、数仓与BI、机器学习等领域的应用实践。同时,文章提供了面试经验与常见问题解析,如Flink与其他系统的对比、实际项目挑战及解决方案,并展望了Flink的未来发展趋势。附带Java DataStream API代码样例,为学习和面试准备提供了实用素材。
491 0
|
7月前
流式系统:第五章到第八章
流式系统:第五章到第八章
63 0
|
7月前
流式系统:第九章到第十章
流式系统:第九章到第十章
44 0
|
存储 编解码 算法
TDengine Contributor 钟宇讲述 TSZ 压缩算法优化背后的故事
我们对 TDengine Contributor 钟宇进行了一次深入采访,他将从为何选择 TDengine 作为研究对象之一、TSZ 压缩算法的具体优化工作以及参与开源的感受等诸多方面展开分享。
153 1
|
SQL 关系型数据库 MySQL
C#三十四 常用开发的部分总结
C#三十四 常用开发的部分总结
41 0
|
Java
【Esper技术专题】针对于Esper事件驱动框架的数据模型分析| 8月更文挑战
【Esper技术专题】针对于Esper事件驱动框架的数据模型分析| 8月更文挑战
255 0
|
分布式计算 Hadoop 开发者
MapReduce 工作流程(面试重点)| 学习笔记
快速学习 MapReduce 工作流程(面试重点)
170 0
MapReduce 工作流程(面试重点)| 学习笔记
|
数据采集 缓存 数据库
无事来学学--Kettle的转换概念和七大特点详细讲解
转换包括一个或多个步骤,步骤之间通过跳(hop)来连接。跳定义了一个单向通道,允许数据从一个步骤流向另一个步骤。在Kettle中,数据的单位是行,数据流就是数据行从一个步骤到另一个步骤的移动。
329 0