基于Spark的证券公司仪表板实现

简介: 基于Spark的证券公司仪表板实现

一、业务场景

现应某证券公司要求,应用Spark流处理技术,为该公司开发一套实时显示股票信息的仪表盘系统,用来实时统计每秒钟买卖股票的数量。

二、数据集说明

本案例用到的实时数据说明如下:

  - 原始数据文件位于本地/data/dataset/streaming/orders.txt

  - 脚本文件位于本地/data/dataset/streaming/splitAndSend.sh

  - 通过运行splitAndSend.sh脚本,读取orders.txt数据并发送到HDFS的/data/dataset/orders-input/目录下,形成实时数据流,然后由Spark流处理程序处理在该目录下到达的流数据。

orders.txt文件中共有500,000行,代表买卖订单。这些数据是随机生成的。每一行包含如下用逗号分隔的元素:

  ■ Order时间戳 — 格式为yyyy-mm-dd hh:MM :ss

  ■ Order ID — 连续递增的整数

  ■ Client ID — 从1到100的范围随机选取整数

  ■ Stock代码 — 从80个股票符号的列表中随机抽取

  ■ 股票买卖的数量 — 从1到1000中的随机数字

  ■ 股票买卖的价格 — 从1到100中的随机数字

  ■ 字符B 或 S — 代表一个订单的买(B)/卖(S)事件

以可使用如下命令查看文件的前5行内容:

1.  head -n 5 /data/dataset/streaming/orders.txt

内容如下所示:

1.  2016-03-22 20:25:28,1,80,EPE,710,51.00,B
2.  2016-03-22 20:25:28,2,70,NFLX,158,8.00,B
3.  2016-03-22 20:25:28,3,53,VALE,284,5.00,B
4.  2016-03-22 20:25:28,4,14,SRPT,183,34.00,B
5.  2016-03-22 20:25:28,5,62,BP,241,36.00,S

三、操作步骤

阶段一、启动HDFS、Spark集群服务

1、启动HDFS集群

在Linux终端窗口下,输入以下命令,启动HDFS集群:

1.  $ start-dfs.sh

2、启动Spark集群

  在Linux终端窗口下,输入以下命令,启动Spark集群:

1.  $ cd /opt/spark
2.  $ ./sbin/start-all.sh

3、验证以上进程是否已启动

  在Linux终端窗口下,输入以下命令,查看启动的服务进程:

1.  $ jps

如果显示以下5个进程,则说明各项服务启动正常,可以继续下一阶段。

1.  2288 NameNode
2.  2402 DataNode
3.  2603 SecondaryNameNode
4.  2769 Master
5.  2891 Worker

阶段二、准备案例中用到的数据监控目录

1、在HDFS上创建实时数据流监控目录。在Linux终端窗口下,输入以下命令:

1.  $ hdfs dfs -mkdir -p /data/dataset/orders-input
2.  $ hdfs dfs -mkdir -p /tmp/streaming

2、在Linux终端窗口下,输入以下命令,查看HDFS上是否已经有了orders-input目录:

1.  $ hdfs dfs -ls /data/dataset/orders-input

这时应该看到该目录下目前为空。

阶段三、处理实时数据

1、启动spark shell。在Linux终端窗口中,输入以下命令:(注意,请将命令中的localhost替换为虚拟机实际的机器名)

1.  spark-shell --master spark://localhost:7077

执行以上代码,进入到Spark Shell交互开发环境:

5aaab90186ad48719928f8abb7d7a67c.png

2、首先需要导入案例代码运行所依赖的包。在paste模式下,输入以下代码:

1.  // 首先导入包
2.  import org.apache.spark._
3.  import org.apache.spark.streaming._
4.  import java.sql.Timestamp
5.  import java.text.SimpleDateFormat

同时按下【Ctrl + D】按键,执行以上代码,输出结果如下:

84fb8ebd4b9c426d933a5ca961a25812.png

3、构造StreamingContext对象,读取流数据,构造DStream。在paste模式下,输入以下代码:

1.  val ssc = new StreamingContext(sc, Seconds(5))
2.  val inputPath = "/data/dataset/orders-input"
3.  val fileStream = ssc.textFileStream(inputPath)

同时按下【 Ctrl + D】键,执行以上代码,输出结果如下:

1a93c20a004a4ceeaa5c050ceeb10252.png

4、构造一个case class,表示实时订单数据对象。在paste模式下,输入以下代码:

1.  case class Order(time: java.sql.Timestamp, 
2.                   orderId:Long, 
3.                   clientId:Long, 
4.                   symbol:String, 
5.                   amount:Int, 
6.                   price:Double, 
7.                   buy:Boolean)

同时按下 【Ctrl + D】 键,执行以上代码,输出内容如下:

807c44408baa4df5b7b2fddcc3dbc23d.png

由以上输出内容可以看出,定义了一个case class叫Order。

5、对接收到的实时数据进行格式处理和转换,包括日期格式解析。在paste模式下,输入以下代码:

1.  val orders = fileStream.flatMap( line => {
2.      val dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss")
3.      val s = line.split(",")
4.      try{
5.          assert(s(6)=="B" || s(6)=="S")
6.          List(Order(new Timestamp(dateFormat.parse(s(0)).getTime()),s(1).toLong, s(2).toLong, s(3), s(4).toInt,s(5).toDouble, s(6) == "B"))
7.      }catch{
8.          case e: Throwable => println("行内容格式错误(" + e + "): " + line)
9.          List()
10.     }
11. })

同时按下【Ctrl + D】 键,执行以上代码,输出内容如下:

fd268f80230e40c1a5dea7625a8239b1.png

6、计算每秒钟买卖股票的数量,并将结果保存到指定的位置。在paste模式下,输入以下代码:

1.  val numPerType = orders.map(o => (o.buy, 1L))
2.                         .reduceByKey((c1,c2) => c1 + c2)
3.  val resultPath = "/data/dataset/orders-output/result"
4.  numPerType.repartition(1).saveAsTextFiles(resultPath, "txt")
5.  numPerType.print()

同时按下 【Ctrl + D】 键,执行以上代码,输出内容如下:

9e5a5b6e88174c2aa669b7db08aff4cd.png

7、启动刚ssc,监听实时流程序,并处理。在paste模式下,输入以下代码:

1.  ssc.start()

同时按下 【Ctrl + D】 键,执行以上代码。

   上面在spark-shell中执行的程序,一旦你输入ssc.start()以后,程序就开始自动进入循环监听状态,屏幕上会显示一堆的信息。

d8d5145ad9d248fbb7bbaa80c1778ec9.png

8、另打开一个终端窗口,命令行下执行如下命令:

1.  $  cd /data/dataset/streaming
2.  $  chmod +x splitAndSend.sh
3.  $  ./splitAndSend.sh /data/dataset/orders-input

按下【Enter】键,执行以上代码。

9、回到第一个窗口(Spark流程序运行窗口),观察输出结果,如下所示:

                                 d7081f8d6f1441f4a9e1e0b7b37cca31.png

在界面中会每秒钟输出股票买卖统计结果。其中true表示买入,false表示卖出。

10、在第一个终端窗口中,按下【Ctrl + C】键,终止程序执行。

或者,执行如下方法来停止:

1.  ssc.stop(false)

— END —








相关文章
|
6月前
|
分布式计算 DataWorks 图计算
在DataWorks中使用PySpark的GraphX进行图计算
【2月更文挑战第14天】在DataWorks中使用PySpark的GraphX进行图计算
139 8
|
5月前
|
存储 机器学习/深度学习 缓存
如何使用PySpark进行离线数据分析?
【6月更文挑战第15天】如何使用PySpark进行离线数据分析?
79 10
|
5月前
|
机器学习/深度学习 分布式计算 数据挖掘
如何使用PySpark进行实时数据分析?
【6月更文挑战第15天】如何使用PySpark进行实时数据分析?
80 7
|
6月前
|
SQL 分布式计算 大数据
Spark 的集成
Spark 的集成
73 2
|
数据可视化 关系型数据库 MySQL
Apache Superset 1.2.0教程 (三)—— 图表功能详解
通过之前章节的学习,我们已经成功地安装了superset,并且连接mysql数据库,可视化了王者英雄的数据。使用的是最简单Table类型的图表,但是superset还支持非常多的图表类型。 本文我们将对各种图表类型进行逐一的演示,文章较长。
971 0
Apache Superset 1.2.0教程 (三)—— 图表功能详解
|
存储 分布式计算 Java
如何通过Cloudera Manager页面自定义图表
如何通过Cloudera Manager页面自定义图表
320 1
|
网络协议 数据可视化 Ubuntu
使用 Apache Superset 可视化 ClickHouse 数据
Apache Superset是一个强大的BI工具,它提供了查看和探索数据的方法。它在 ClickHouse 用户中也越来越受欢迎。 我们将介绍安装 Superset 的 2 种方法,然后展示如何从 Superset 连接到您的第一个 ClickHouse 数据库。代码示例基于 Ubuntu 18.04、Superset 1.1.0 和 clickhouse-sqlalchemy 0.1.6。
872 0
使用 Apache Superset 可视化 ClickHouse 数据
|
SQL JSON 分布式计算
【Spark】(task2)PySpark数据统计和分组聚合
1.2 保存读取的信息 步骤2:将读取的进行保存,表头也需要保存,这里可保存为csv或者json格式文件。
680 0
【Spark】(task2)PySpark数据统计和分组聚合
|
分布式计算 监控 Spark
X-Pack Spark 监控指标详解
概述 本文主要介绍X-Pack Spark集群监控指标的查看方法。Spark集群对接了Ganglia和云监控。下面分别介绍两者的使用方法。 Ganglia Ganglia是一个分布式监控系统。 Ganglia 入口 打开Spark集群依次进入:数据库连接>UI访问>详细监控UI>Ganglia。
1520 0
如何使用X-Pack Spark的YarnUI、SparkUI、Spark日志、任务运行状况的分析
概述 X-Pack Spark目前是通过Yarn管理资源,在提交Spark 任务后我们经常需要知道任务的运行状况,例如在哪里看日志、怎么查看每个Executor的运行状态、每个task的运行状态,性能瓶颈点在哪里等信息。
3709 0
下一篇
无影云桌面