Apache Spark技术实战(三)利用Spark将json文件导入Cassandra &SparkR的安装及使用

简介: 本文第一部分讲解利用Spark将json文件导入Cassandra;第二部分讲解SparkR的安装及使用。

<一>利用Spark将json文件导入Cassandra

概要

sbt cassandra spark-cassandra-connector

实验目的

将存在于json文件中的数据导入到cassandra数据库,目前由cassandra提供的官方工具是json2sstable,由于对cassandra本身了解不多,这个我还没有尝试成功。

但想到spark sql中可以读取json文件,而spark-cassadra-connector又提供了将RDD存入到数据库的功能,我想是否可以将两者结合一下。

创建KeySpace和Table

为了减少复杂性,继续使用实战3中的keyspace和table,

CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };
CREATE TABLE test.kv(key text PRIMARY KEY, value int);

启动spark-shell

与实战3中描述一致。

bin/spark-shell --driver-class-path /root/working/spark-cassandra-connector/spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector_2.10-1.1.0-SNAPSHOT.jar:/root/.ivy2/cache/org.apache.cassandra/cassandra-thrift/jars/cassandra-thrift-2.0.9.jar:/root/.ivy2/cache/org.apache.thrift/libthrift/jars/libthrift-0.9.1.jar:/root/.ivy2/cache/org.apache.cassandra/cassandra-clientutil/jars/cassandra-clientutil-2.0.9.jar:/root/.ivy2/cache/com.datastax.cassandra/cassandra-driver-core/jars/cassandra-driver-core-2.0.4.jar:/root/.ivy2/cache/io.netty/netty/bundles/netty-3.9.0.Final.jar:/root/.ivy2/cache/com.codahale.metrics/metrics-core/bundles/metrics-core-3.0.2.jar:/root/.ivy2/cache/org.slf4j/slf4j-api/jars/slf4j-api-1.7.7.jar:/root/.ivy2/cache/org.apache.commons/commons-lang3/jars/commons-lang3-3.3.2.jar:/root/.ivy2/cache/org.joda/joda-convert/jars/joda-convert-1.2.jar:/root/.ivy2/cache/joda-time/joda-time/jars/joda-time-2.3.jar:/root/.ivy2/cache/org.apache.cassandra/cassandra-all/jars/cassandra-all-2.0.9.jar:/root/.ivy2/cache/org.slf4j/slf4j-log4j12/jars/slf4j-log4j12-1.7.2.jar

准备json文件

以spark自带的person.json文件为例,内容如下所示

{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

数据导入

假设person.json文件存储在$SPARK_HOME目录,在启动spark-shell之后,执行如下语句

sc.stop
import com.datastax.spark.connector._
import org.apache.spark._
val conf = new SparkConf()
conf.set("spark.cassandra.connection.host", "127.0.0.1")
val sc = new SparkContext("local[2]", "Cassandra Connector Test", conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val path = "./people.json"
val people = sqlContext.jsonFile(path)
people.map(p=>(p.getString(10),p.getInt(0)))
      .saveToCassandra("test","kv",SomeColumns("key","value"))

注意:

  1. jsonFile返回的是jsonRDD,其中每一个成员是Row类型,并不行直接将saveToCassandra作用于jsonRDD,需要先作一步转换即map过程
  2. map中使用到的getXXX函数是在事先已知数据类型的情况下取出其值
  3. 最后saveToCassandra触发数据的存储过程

另外一个地方值得记录一下,如果在cassandra中创建的表使用了uuid作为primary key,在scala中使用如下函数来生成uuid

import java.util.UUID
UUID.randomUUID

验证步骤

使用cqlsh来查看数据是否已经真正的写入到test.kv表中。

小结

本次实验结合了以下知识:

本文简要介绍如何使用spark-cassandra-connector将json文件导入到cassandra数据库,这是一个使用spark的综合性示例。

前提条件

假设已经阅读技术实战之3,并安装了如下软件

  1. jdk
  2. scala
  3. spark sql
  4. spark RDD的转换函数
  5. spark-cassandra-connector

<二>SparkR的安装及使用

概要

根据论坛上的信息,在Sparkrelease计划中,在Spark 1.3中有将SparkR纳入到发行版的可能。本文就提前展示一下如何安装及使用SparkR.

SparkR的出现解决了R语言中无法级联扩展的难题,同时也极大的丰富了Spark在机器学习方面能够使用的Lib库。SparkR和Spark MLLIB将共同构建出Spark在机器学习方面的优势地位。

使用SparkR能让用户同时使用Spark RDD提供的丰富Api,也可以调用R语言中丰富的Lib库。

安装SparkR

先决条件:

  1. 已经安装好openjdk 7
  2. 安装好了R

安装步骤:

步骤1: 运行R Shell

bash# R

步骤2:在R shell中安装rJava

install.packages("rJava")

步骤3: 在R shell中安装devtools

install.packages("devtools")

步骤4: 安装好rJava及devtools,接下来安装SparkR

library(devtools)
install_github("amplab-extras/SparkR-pkg", subdir="pkg")

使用SparkR来运行wordcount

安装完SparkR之后,可以用wordcount来检验安装正确与否。

步骤1:在R shell中加载SparkR

library(SparkR)

步骤2:初始化SparkContext及执行wordcount

sc <- sparkR.init(master="local", "RwordCount")
lines <- textFile(sc, "README.md")
words <- flatMap(lines,
  function(line) {
    strsplit(line, " ")[[1]]
  })
wordCount <- lapply(words, function(word) { list(word, 1L) })

counts <- reduceByKey(wordCount, "+", 2L)
output <- collect(counts)
for (wordcount in output) {
  cat(wordcount[[1]], ": ", wordcount[[2]], "\n")
}

如果想将SparkR运行于集群环境中,只需要将master=local,换成spark集群的监听地址即可

小结

时间匆忙,还有两件事情没有来得及细细分析。

  1. SparkR的代码实现
  2. 如果很好的将R中支持的数据挖掘算法与Spark并行化处理能力很好的结合

参考资料

  1. https://github.com/amplab-extras/SparkR-pkg
目录
相关文章
|
9天前
|
数据采集 JSON 数据可视化
JSON数据解析实战:从嵌套结构到结构化表格
在信息爆炸的时代,从杂乱数据中提取精准知识图谱是数据侦探的挑战。本文以Google Scholar为例,解析嵌套JSON数据,提取文献信息并转换为结构化表格,通过Graphviz制作技术关系图谱,揭示文献间的隐秘联系。代码涵盖代理IP、请求头设置、JSON解析及可视化,提供完整实战案例。
JSON数据解析实战:从嵌套结构到结构化表格
|
5月前
|
JSON 算法 vr&ar
目标检测笔记(五):查看通过COCOEvaluator生成的coco_instances_results.json文件的详细检测信息,包含AP、AR、MR和DR等
本文介绍了如何使用COCO评估器通过Detectron2库对目标检测模型进行性能评估,生成coco_instances_results.json文件,并利用pycocotools解析该文件以计算AP、AR、MR和DR等关键指标。
333 1
目标检测笔记(五):查看通过COCOEvaluator生成的coco_instances_results.json文件的详细检测信息,包含AP、AR、MR和DR等
|
1月前
|
开发工具 git 索引
怎么取消对project.private.config.json这个文件的git记录
通过以上步骤,您可以成功取消对 `project.private.config.json`文件的Git记录。这样,文件将不会被包含在未来的提交中,同时仍保留在您的工作区中。
68 28
|
5月前
|
分布式计算 大数据 Apache
利用.NET进行大数据处理:Apache Spark与.NET for Apache Spark
【10月更文挑战第15天】随着大数据成为企业决策和技术创新的关键驱动力,Apache Spark作为高效的大数据处理引擎,广受青睐。然而,.NET开发者面临使用Spark的门槛。本文介绍.NET for Apache Spark,展示如何通过C#和F#等.NET语言,结合Spark的强大功能进行大数据处理,简化开发流程并提升效率。示例代码演示了读取CSV文件及统计分析的基本操作,突显了.NET for Apache Spark的易用性和强大功能。
141 1
|
5月前
|
JSON 数据格式 Python
Python实用记录(十四):python统计某个单词在TXT/JSON文件中出现的次数
这篇文章介绍了一个Python脚本,用于统计TXT或JSON文件中特定单词的出现次数。它包含两个函数,分别处理文本和JSON文件,并通过命令行参数接收文件路径、目标单词和文件格式。文章还提供了代码逻辑的解释和示例用法。
83 0
Python实用记录(十四):python统计某个单词在TXT/JSON文件中出现的次数
|
5月前
|
JSON 数据格式
LangChain-20 Document Loader 文件加载 加载MD DOCX EXCEL PPT PDF HTML JSON 等多种文件格式 后续可通过FAISS向量化 增强检索
LangChain-20 Document Loader 文件加载 加载MD DOCX EXCEL PPT PDF HTML JSON 等多种文件格式 后续可通过FAISS向量化 增强检索
392 2
|
5月前
|
JSON 数据格式 计算机视觉
Opencv实用笔记(一): 获取并绘制JSON标注文件目标区域(可单独保存目标小图)
本文介绍了如何使用OpenCV和Python根据JSON标注文件获取并绘制目标区域,同时可将裁剪的图像单独保存。通过示例代码,展示了如何读取图片路径、解析JSON标注、绘制标注框并保存裁剪图像的过程。此外,还提供了相关的博客链接,供读者进一步学习。
121 0
|
3月前
|
存储 人工智能 大数据
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
442 33
The Past, Present and Future of Apache Flink
|
5月前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
1100 13
Apache Flink 2.0-preview released
|
5月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
194 3

推荐镜像

更多