Apache Spark技术实战(三)利用Spark将json文件导入Cassandra &SparkR的安装及使用

简介: 本文第一部分讲解利用Spark将json文件导入Cassandra;第二部分讲解SparkR的安装及使用。

<一>利用Spark将json文件导入Cassandra

概要

sbt cassandra spark-cassandra-connector

实验目的

将存在于json文件中的数据导入到cassandra数据库,目前由cassandra提供的官方工具是json2sstable,由于对cassandra本身了解不多,这个我还没有尝试成功。

但想到spark sql中可以读取json文件,而spark-cassadra-connector又提供了将RDD存入到数据库的功能,我想是否可以将两者结合一下。

创建KeySpace和Table

为了减少复杂性,继续使用实战3中的keyspace和table,

CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };
CREATE TABLE test.kv(key text PRIMARY KEY, value int);

启动spark-shell

与实战3中描述一致。

bin/spark-shell --driver-class-path /root/working/spark-cassandra-connector/spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector_2.10-1.1.0-SNAPSHOT.jar:/root/.ivy2/cache/org.apache.cassandra/cassandra-thrift/jars/cassandra-thrift-2.0.9.jar:/root/.ivy2/cache/org.apache.thrift/libthrift/jars/libthrift-0.9.1.jar:/root/.ivy2/cache/org.apache.cassandra/cassandra-clientutil/jars/cassandra-clientutil-2.0.9.jar:/root/.ivy2/cache/com.datastax.cassandra/cassandra-driver-core/jars/cassandra-driver-core-2.0.4.jar:/root/.ivy2/cache/io.netty/netty/bundles/netty-3.9.0.Final.jar:/root/.ivy2/cache/com.codahale.metrics/metrics-core/bundles/metrics-core-3.0.2.jar:/root/.ivy2/cache/org.slf4j/slf4j-api/jars/slf4j-api-1.7.7.jar:/root/.ivy2/cache/org.apache.commons/commons-lang3/jars/commons-lang3-3.3.2.jar:/root/.ivy2/cache/org.joda/joda-convert/jars/joda-convert-1.2.jar:/root/.ivy2/cache/joda-time/joda-time/jars/joda-time-2.3.jar:/root/.ivy2/cache/org.apache.cassandra/cassandra-all/jars/cassandra-all-2.0.9.jar:/root/.ivy2/cache/org.slf4j/slf4j-log4j12/jars/slf4j-log4j12-1.7.2.jar

准备json文件

以spark自带的person.json文件为例,内容如下所示

{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

数据导入

假设person.json文件存储在$SPARK_HOME目录,在启动spark-shell之后,执行如下语句

sc.stop
import com.datastax.spark.connector._
import org.apache.spark._
val conf = new SparkConf()
conf.set("spark.cassandra.connection.host", "127.0.0.1")
val sc = new SparkContext("local[2]", "Cassandra Connector Test", conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val path = "./people.json"
val people = sqlContext.jsonFile(path)
people.map(p=>(p.getString(10),p.getInt(0)))
      .saveToCassandra("test","kv",SomeColumns("key","value"))

注意:

  1. jsonFile返回的是jsonRDD,其中每一个成员是Row类型,并不行直接将saveToCassandra作用于jsonRDD,需要先作一步转换即map过程
  2. map中使用到的getXXX函数是在事先已知数据类型的情况下取出其值
  3. 最后saveToCassandra触发数据的存储过程

另外一个地方值得记录一下,如果在cassandra中创建的表使用了uuid作为primary key,在scala中使用如下函数来生成uuid

import java.util.UUID
UUID.randomUUID

验证步骤

使用cqlsh来查看数据是否已经真正的写入到test.kv表中。

小结

本次实验结合了以下知识:

本文简要介绍如何使用spark-cassandra-connector将json文件导入到cassandra数据库,这是一个使用spark的综合性示例。

前提条件

假设已经阅读技术实战之3,并安装了如下软件

  1. jdk
  2. scala
  3. spark sql
  4. spark RDD的转换函数
  5. spark-cassandra-connector

<二>SparkR的安装及使用

概要

根据论坛上的信息,在Sparkrelease计划中,在Spark 1.3中有将SparkR纳入到发行版的可能。本文就提前展示一下如何安装及使用SparkR.

SparkR的出现解决了R语言中无法级联扩展的难题,同时也极大的丰富了Spark在机器学习方面能够使用的Lib库。SparkR和Spark MLLIB将共同构建出Spark在机器学习方面的优势地位。

使用SparkR能让用户同时使用Spark RDD提供的丰富Api,也可以调用R语言中丰富的Lib库。

安装SparkR

先决条件:

  1. 已经安装好openjdk 7
  2. 安装好了R

安装步骤:

步骤1: 运行R Shell

bash# R

步骤2:在R shell中安装rJava

install.packages("rJava")

步骤3: 在R shell中安装devtools

install.packages("devtools")

步骤4: 安装好rJava及devtools,接下来安装SparkR

library(devtools)
install_github("amplab-extras/SparkR-pkg", subdir="pkg")

使用SparkR来运行wordcount

安装完SparkR之后,可以用wordcount来检验安装正确与否。

步骤1:在R shell中加载SparkR

library(SparkR)

步骤2:初始化SparkContext及执行wordcount

sc <- sparkR.init(master="local", "RwordCount")
lines <- textFile(sc, "README.md")
words <- flatMap(lines,
  function(line) {
    strsplit(line, " ")[[1]]
  })
wordCount <- lapply(words, function(word) { list(word, 1L) })

counts <- reduceByKey(wordCount, "+", 2L)
output <- collect(counts)
for (wordcount in output) {
  cat(wordcount[[1]], ": ", wordcount[[2]], "\n")
}

如果想将SparkR运行于集群环境中,只需要将master=local,换成spark集群的监听地址即可

小结

时间匆忙,还有两件事情没有来得及细细分析。

  1. SparkR的代码实现
  2. 如果很好的将R中支持的数据挖掘算法与Spark并行化处理能力很好的结合

参考资料

  1. https://github.com/amplab-extras/SparkR-pkg
目录
相关文章
|
4月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
327 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
22天前
|
SQL 分布式计算 关系型数据库
基于云服务器的数仓搭建-hive/spark安装
本文介绍了在本地安装和配置MySQL、Hive及Spark的过程。主要内容包括: - **MySQL本地安装**:详细描述了内存占用情况及安装步骤,涉及安装脚本的编写与执行,以及连接MySQL的方法。 - **Hive安装**:涵盖了从上传压缩包到配置环境变量的全过程,并解释了如何将Hive元数据存储配置到MySQL中。 - **Hive与Spark集成**:说明了如何安装Spark并将其与Hive集成,确保Hive任务由Spark执行,同时解决了依赖冲突问题。 - **常见问题及解决方法**:列举了安装过程中可能遇到的问题及其解决方案,如内存配置不足、节点间通信问题等。
165 1
基于云服务器的数仓搭建-hive/spark安装
|
5月前
|
JSON 算法 vr&ar
目标检测笔记(五):查看通过COCOEvaluator生成的coco_instances_results.json文件的详细检测信息,包含AP、AR、MR和DR等
本文介绍了如何使用COCO评估器通过Detectron2库对目标检测模型进行性能评估,生成coco_instances_results.json文件,并利用pycocotools解析该文件以计算AP、AR、MR和DR等关键指标。
358 1
目标检测笔记(五):查看通过COCOEvaluator生成的coco_instances_results.json文件的详细检测信息,包含AP、AR、MR和DR等
|
1月前
|
开发工具 git 索引
怎么取消对project.private.config.json这个文件的git记录
通过以上步骤,您可以成功取消对 `project.private.config.json`文件的Git记录。这样,文件将不会被包含在未来的提交中,同时仍保留在您的工作区中。
72 28
|
5月前
|
JSON 数据格式 Python
Python实用记录(十四):python统计某个单词在TXT/JSON文件中出现的次数
这篇文章介绍了一个Python脚本,用于统计TXT或JSON文件中特定单词的出现次数。它包含两个函数,分别处理文本和JSON文件,并通过命令行参数接收文件路径、目标单词和文件格式。文章还提供了代码逻辑的解释和示例用法。
88 0
Python实用记录(十四):python统计某个单词在TXT/JSON文件中出现的次数
|
5月前
|
JSON 数据格式
LangChain-20 Document Loader 文件加载 加载MD DOCX EXCEL PPT PDF HTML JSON 等多种文件格式 后续可通过FAISS向量化 增强检索
LangChain-20 Document Loader 文件加载 加载MD DOCX EXCEL PPT PDF HTML JSON 等多种文件格式 后续可通过FAISS向量化 增强检索
420 2
|
5月前
|
JSON 数据格式 计算机视觉
Opencv实用笔记(一): 获取并绘制JSON标注文件目标区域(可单独保存目标小图)
本文介绍了如何使用OpenCV和Python根据JSON标注文件获取并绘制目标区域,同时可将裁剪的图像单独保存。通过示例代码,展示了如何读取图片路径、解析JSON标注、绘制标注框并保存裁剪图像的过程。此外,还提供了相关的博客链接,供读者进一步学习。
134 0
|
5月前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
137 0
|
8天前
|
存储 分布式计算 Hadoop
从“笨重大象”到“敏捷火花”:Hadoop与Spark的大数据技术进化之路
从“笨重大象”到“敏捷火花”:Hadoop与Spark的大数据技术进化之路
121 79
|
5月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
112 0

热门文章

最新文章

推荐镜像

更多