MongoDB Spark Connector 实战指南

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: Why Spark with MongoDB?高性能,官方号称 100x faster,因为可以全内存运行,性能提升肯定是很明显的简单易用,支持 Java、Python、Scala、SQL 等多种语言,使得构建分析应用非常简单统一构建 ,支持多种数据源,通过 Spark RDD 屏蔽...

Why Spark with MongoDB?

  1. 高性能,官方号称 100x faster,因为可以全内存运行,性能提升肯定是很明显的
  2. 简单易用,支持 Java、Python、Scala、SQL 等多种语言,使得构建分析应用非常简单
  3. 统一构建 ,支持多种数据源,通过 Spark RDD 屏蔽底层数据差异,同一个分析应用可运行于不同的数据源;
  4. 应用场景广泛,能同时支持批处理以及流式处理

MongoDB Spark Connector 为官方推出,用于适配 Spark 操作 MongoDB 数据;本文以 Python 为例,介绍 MongoDB Spark Connector 的使用,帮助你基于 MongoDB 构建第一个分析应用。

准备 MongoDB 环境

安装 MongoDB 参考 Install MongoDB Community Edition on Linux

mkdir mongodata
mongod --dbpath mongodata --port 9555

准备 Spark python 环境

参考 PySpark - Quick Guide

下载 Spark

cd /home/mongo-spark
wget http://mirrors.tuna.tsinghua.edu.cn/apache/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
tar zxvf spark-2.4.4-bin-hadoop2.7.tgz

设置 Spark 环境变量

export SPARK_HOME=/home/mongo-spark/spark-2.4.4-bin-hadoop2.7
export PATH=$PATH:/home/mongo-spark/spark-2.4.4-bin-hadoop2.7/bin
export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH
export PATH=$SPARK_HOME/python:$PATH

运行 Spark RDD 示例

# count.py
from pyspark import SparkContext
sc = SparkContext("local", "count app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
counts = words.count()

$SPARK_HOME/bin/spark-submit count.py
Number of elements in RDD → 8  

如果上述程序运行成功,说明 Spark python 环境准备成功,还可以测试 Spark 的其他 RDD 操作,比如 collector、filter、map、reduce、join 等,更多示例参考 PySpark - Quick Guide

Spark 操作 MongoDB 数据

参考 Spark Connector Python Guide

准备测试数据 test.coll01 插入3条测试数据,test.coll02 未空

mongo --port 9555

> db.coll01.find()
{ "_id" : 1, "type" : "apple", "qty" : 5 }
{ "_id" : 2, "type" : "orange", "qty" : 10 }
{ "_id" : 3, "type" : "banana", "qty" : 15 }
> db.coll02.find()

准备操作脚本,将输入集合的数据按条件进行过滤,写到输出集合

# mongo-spark-test.py
from pyspark.sql import SparkSession

# Create Spark Session

spark = SparkSession \
    .builder \
    .appName("myApp") \
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1:9555/test.coll01") \
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1:9555/test.coll") \
    .getOrCreate()


# Read from MongoDB
df = spark.read.format("mongo").load()
df.show()

# Filter and Write
df.filter(df['qty'] >= 10).write.format("mongo").mode("append").save()    

# Use SQL 
# df.createOrReplaceTempView("temp")
# some_fruit = spark.sql("SELECT type, qty FROM temp WHERE type LIKE '%e%'")
# some_fruit.show()

运行脚本


$SPARK_HOME/bin/spark-submit --packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.1 mongo-spark-test.py

mongo --port 9555

> db.coll02.find()
{ "_id" : 2, "qty" : 10, "type" : "orange" }
{ "_id" : 3, "qty" : 15, "type" : "banana" }

相关文章
|
分布式计算 Java 大数据
【大数据技术Hadoop+Spark】HDFS Shell常用命令及HDFS Java API详解及实战(超详细 附源码)
【大数据技术Hadoop+Spark】HDFS Shell常用命令及HDFS Java API详解及实战(超详细 附源码)
1142 0
|
分布式计算 NoSQL 大数据
MongoDB 遇见 spark(进行整合)
这篇文章介绍了如何将MongoDB与Spark进行整合,包括MongoDB与HDFS的比较、大数据分层架构以及整合的源码示例。
149 1
|
存储 SQL 缓存
Hadoop入门(一篇就够了)
Hadoop入门(一篇就够了)
27612 4
Hadoop入门(一篇就够了)
|
7月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
745 0
|
SQL 机器学习/深度学习 分布式计算
|
Linux Docker 容器
Centos安装docker(linux安装docker)——超详细小白可操作手把手教程,包好用!!!
本篇博客重在讲解Centos安装docker,经博主多次在不同服务器上测试,极其的稳定,尤其是阿里的服务器,一路复制命令畅通无阻。
19947 5
Centos安装docker(linux安装docker)——超详细小白可操作手把手教程,包好用!!!
|
存储 分布式计算 Kubernetes
Spark+Celeborn:更快,更稳,更弹性
本文整理自阿里云 EMR Spark 团队的周克勇(一锤),在 Spark&DS Meetup 的分享。
69511 0
Spark+Celeborn:更快,更稳,更弹性
|
存储 监控 关系型数据库
DataX 概述、部署、数据同步运用示例
DataX是阿里巴巴开源的离线数据同步工具,支持多种数据源之间的高效传输。其特点是多数据源支持、可扩展性、灵活配置、高效传输、任务调度监控和活跃的开源社区支持。DataX通过Reader和Writer插件实现数据源的读取和写入,采用Framework+plugin架构。部署简单,解压即可用。示例展示了如何配置DataX同步MySQL到HDFS,并提供了速度和内存优化建议。此外,还解决了NULL值同步问题及配置文件变量传参的方法。
8648 5
|
SQL 分布式计算 Hadoop
Spark与Hadoop的关系和区别
Spark与Hadoop的关系和区别
|
SQL JSON 分布式计算
Spark 【Spark SQL(一)DataFrame的创建、保存与基本操作】
Spark 【Spark SQL(一)DataFrame的创建、保存与基本操作】