MongoDB Spark Connector 实战指南

本文涉及的产品
云数据库 MongoDB,独享型 2核8GB
推荐场景:
构建全方位客户视图
简介: Why Spark with MongoDB?高性能,官方号称 100x faster,因为可以全内存运行,性能提升肯定是很明显的简单易用,支持 Java、Python、Scala、SQL 等多种语言,使得构建分析应用非常简单统一构建 ,支持多种数据源,通过 Spark RDD 屏蔽底层数据差异,同一个分析应用可运行于不同的数据源;应用场景广泛,能同时支持批处理以及流式处理MongoDB Spark Connector 为官方推出,用于适配 Spark 操作 MongoDB 数据;本文以 Python 为例,介绍 MongoDB Spark Connector 的使用,帮助你基于 M

Why Spark with MongoDB?
高性能,官方号称 100x faster,因为可以全内存运行,性能提升肯定是很明显的
简单易用,支持 Java、Python、Scala、SQL 等多种语言,使得构建分析应用非常简单
统一构建 ,支持多种数据源,通过 Spark RDD 屏蔽底层数据差异,同一个分析应用可运行于不同的数据源;
应用场景广泛,能同时支持批处理以及流式处理
MongoDB Spark Connector 为官方推出,用于适配 Spark 操作 MongoDB 数据;本文以 Python 为例,介绍 MongoDB Spark Connector 的使用,帮助你基于 MongoDB 构建第一个分析应用。

准备 MongoDB 环境
安装 MongoDB 参考 Install MongoDB Community Edition on Linux

mkdir mongodata
mongod --dbpath mongodata --port 9555
准备 Spark python 环境
参考 PySpark - Quick Guide

下载 Spark

cd /home/mongo-spark
wget
tar zxvf spark-2.4.4-bin-hadoop2.7.tgz
设置 Spark 环境变量

export SPARK_HOME=/home/mongo-spark/spark-2.4.4-bin-hadoop2.7
export PATH=$PATH:/home/mongo-spark/spark-2.4.4-bin-hadoop2.7/bin
export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH
export PATH=$SPARK_HOME/python:$PATH
运行 Spark RDD 示例

count.py

from pyspark import SparkContext
sc = SparkContext("local", "count app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
counts = words.count()

$SPARK_HOME/bin/spark-submit count.py
Number of elements in RDD → 8
如果上述程序运行成功,说明 Spark python 环境准备成功,还可以测试 Spark 的其他 RDD 操作,比如 collector、filter、map、reduce、join 等,更多买QQ示例参考 PySpark - Quick Guide

Spark 操作 MongoDB 数据
参考 Spark Connector Python Guide

准备测试数据 test.coll01 插入3条测试数据,test.coll02 未空

mongo --port 9555

db.coll01.find()
{ "_id" : 1, "type" : "apple", "qty" : 5 }
{ "_id" : 2, "type" : "orange", "qty" : 10 }
{ "_id" : 3, "type" : "banana", "qty" : 15 }
db.coll02.find()
准备操作脚本,将输入集合的数据按条件进行过滤,写到输出集合

mongo-spark-test.py

from pyspark.sql import SparkSession

Create Spark Session

spark = SparkSession \

.builder \
.appName("myApp") \
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1:9555/test.coll01") \
.config("spark.mongodb.output.uri", "mongodb://127.0.0.1:9555/test.coll") \
.getOrCreate()

Read from MongoDB

df = spark.read.format("mongo").load()
df.show()

Filter and Write

df.filter(df['qty'] >= 10).write.format("mongo").mode("append").save()

Use SQL

df.createOrReplaceTempView("temp")

some_fruit = spark.sql("SELECT type, qty FROM temp WHERE type LIKE '%e%'")

some_fruit.show()

运行脚本

$SPARK_HOME/bin/spark-submit --packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.1 mongo-spark-test.py

mongo --port 9555

db.coll02.find()
{ "_id" : 2, "qty" : 10, "type" : "orange" }
{ "_id" : 3, "qty" : 15, "type" : "banana" }
相关实践学习
MongoDB数据库入门
MongoDB数据库入门实验。
快速掌握 MongoDB 数据库
本课程主要讲解MongoDB数据库的基本知识,包括MongoDB数据库的安装、配置、服务的启动、数据的CRUD操作函数使用、MongoDB索引的使用(唯一索引、地理索引、过期索引、全文索引等)、MapReduce操作实现、用户管理、Java对MongoDB的操作支持(基于2.x驱动与3.x驱动的完全讲解)。 通过学习此课程,读者将具备MongoDB数据库的开发能力,并且能够使用MongoDB进行项目开发。   相关的阿里云产品:云数据库 MongoDB版 云数据库MongoDB版支持ReplicaSet和Sharding两种部署架构,具备安全审计,时间点备份等多项企业能力。在互联网、物联网、游戏、金融等领域被广泛采用。 云数据库MongoDB版(ApsaraDB for MongoDB)完全兼容MongoDB协议,基于飞天分布式系统和高可靠存储引擎,提供多节点高可用架构、弹性扩容、容灾、备份回滚、性能优化等解决方案。 产品详情: https://www.aliyun.com/product/mongodb
目录
相关文章
|
20天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
55 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
2月前
|
分布式计算 NoSQL 大数据
MongoDB 遇见 spark(进行整合)
这篇文章介绍了如何将MongoDB与Spark进行整合,包括MongoDB与HDFS的比较、大数据分层架构以及整合的源码示例。
45 1
|
3月前
|
运维 监控 NoSQL
【MongoDB 复制集秘籍】Secondary 同步慢怎么办?深度解析与实战指南,让你的数据库飞速同步!
【8月更文挑战第24天】本文通过一个具体案例探讨了MongoDB复制集中Secondary成员同步缓慢的问题。现象表现为数据延迟增加,影响业务运行。经分析,可能的原因包括硬件资源不足、网络状况不佳、复制日志错误等。解决策略涵盖优化硬件(如增加内存、升级CPU)、调整网络配置以减少延迟以及优化MongoDB配置(例如调整`oplogSize`、启用压缩)。通过这些方法可有效提升同步效率,保证系统的稳定性和性能。
92 4
|
6月前
|
Oracle NoSQL 关系型数据库
实时计算 Flink版产品使用合集之MongoDB CDC connector的全量快照功能可以并发读取吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
118 2
|
3月前
|
JSON NoSQL MongoDB
MongoDB Schema设计实战指南:优化数据结构,提升查询性能与数据一致性
【8月更文挑战第24天】MongoDB是一款领先的NoSQL数据库,其灵活的文档模型突破了传统关系型数据库的限制。它允许自定义数据结构,适应多样化的数据需求。设计MongoDB的Schema时需考虑数据访问模式、一致性需求及性能因素。设计原则强调简洁性、查询优化与合理使用索引。例如,在构建博客系统时,可以通过精心设计文章和用户的集合结构来提高查询效率并确保数据一致性。正确设计能够充分发挥MongoDB的优势,实现高效的数据管理。
64 3
|
3月前
|
存储 NoSQL JavaScript
MongoDB存储过程实战:聚合框架、脚本、最佳实践,一文全掌握!
【8月更文挑战第24天】MongoDB是一款备受欢迎的文档型NoSQL数据库,以灵活的数据模型和强大功能著称。尽管其存储过程支持不如传统关系型数据库,本文深入探讨了MongoDB在此方面的最佳实践。包括利用聚合框架处理复杂业务逻辑、封装业务逻辑提高复用性、运用JavaScript脚本实现类似存储过程的功能以及考虑集成其他工具提升数据处理能力。通过示例代码展示如何创建订单处理集合并定义验证规则,虽未直接实现存储过程,但有效地演示了如何借助JavaScript脚本处理业务逻辑,为开发者提供更多实用指导。
71 2
|
3月前
|
NoSQL Java 测试技术
5-MongoDB实战演练
本文档详细介绍了如何使用MongoDB实现头条文章的评论系统。主要功能包括基本的增删改查API、根据文章ID查询评论、以及评论的点赞功能。文章分析了表结构设计,明确了各字段的意义,并给出了具体的字段类型。技术选型方面,文档推荐使用mongodb-driver作为Java连接MongoDB的驱动包,同时介绍了Spring Data MongoDB这一更高层次的持久层框架。此外,文档还提供了搭建文章微服务模块的具体步骤,包括项目工程的搭建、实体类的编写、索引的添加方式等,并展示了如何使用MongoTemplate实现评论点赞功能。
|
3月前
|
安全 C# 数据安全/隐私保护
WPF安全加固全攻略:从数据绑定到网络通信,多维度防范让你的应用固若金汤,抵御各类攻击
【8月更文挑战第31天】安全性是WPF应用程序开发中不可或缺的一部分。本文从技术角度探讨了WPF应用面临的多种安全威胁及防护措施。通过严格验证绑定数据、限制资源加载来源、实施基于角色的权限管理和使用加密技术保障网络通信安全,可有效提升应用安全性,增强用户信任。例如,使用HTML编码防止XSS攻击、检查资源签名确保其可信度、定义安全策略限制文件访问权限,以及采用HTTPS和加密算法保护数据传输。这些措施有助于全面保障WPF应用的安全性。
52 0
|
3月前
|
C# 开发者 Windows
全面指南:WPF无障碍设计从入门到精通——让每一个用户都能无障碍地享受你的应用,从自动化属性到焦点导航的最佳实践
【8月更文挑战第31天】为了确保Windows Presentation Foundation (WPF) 应用程序对所有用户都具备无障碍性,开发者需关注无障碍设计原则。这不仅是法律要求,更是社会责任,旨在让技术更人性化,惠及包括视障、听障及行动受限等用户群体。
81 0
|
5月前
|
存储 NoSQL MongoDB
MongoDB实战面试指南:常见问题一网打尽
MongoDB实战面试指南:常见问题一网打尽