Spark从入门到入土(三):MongoDB的集成

简介: 前面一篇中已经集成了对MongoDB的支持

完整pom文件


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.chinamobile.iot.meter</groupId>
    <artifactId>rsms-spark-parent</artifactId>
    <version>1.0</version>
    <packaging>pom</packaging>
    <!-- 声明公有的属性 -->
    <properties>
        <spark.version>2.1.0</spark.version>
        <scala.version>2.11.8</scala.version>
        <log4j.version>1.2.17</log4j.version>
        <slf4j.version>1.7.22</slf4j.version>
    </properties>
    <dependencies>
        <!-- Logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <!-- Logging End -->
        <!-- Spark -->
        <dependency>
            <groupId>org.mongodb.spark</groupId>
            <artifactId>mongo-spark-connector_2.11</artifactId>
            <version>2.1.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!--Spark END-->
        <!-- MongoDB -->
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongo-java-driver</artifactId>
            <version>3.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-mongodb</artifactId>
            <version>1.10.17.RELEASE</version>
        </dependency>
        <!--MongoDB END -->
    </dependencies>
    <modules>
        <module>rsms-spark-common</module>
        <module>rsms-alarm-task</module>
        <module>rsms-freeze-task</module>
    </modules>
</project>


MongoManager添加对事务的支持


package com.chinamobile.iot.meter.mongo;
import com.chinamobile.iot.meter.config.MongoConfig;
import com.mongodb.*;
import com.mongodb.client.ClientSession;
import com.mongodb.client.MongoDatabase;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
import java.util.List;
/**
 * @Description Mongo客户端管理
 * @Author dbq
 * @Date 2019/5/7
 */
public class MongoManager {
    public static Logger logger = LoggerFactory.getLogger(MongoManager.class);
    private static MongoClient mongo = null;
    private MongoManager() {
    }
    static {
        System.out.println("---------------------------------------------->>>>>>>>>>>>>");
        initDBPrompties();
        logger.info("init mongodb client end.");
    }
    public static MongoDatabase getDB() {
        return mongo.getDatabase(MongoConfig.DB);
    }
    /**
     * 初始化连接池
     */
    private static void initDBPrompties() {
        // 其他参数根据实际情况进行添加
        try {
            mongo = new MongoClient(MongoConfig.HOST, MongoConfig.PORT);
        } catch (MongoException me) {
        }
    }
    public static boolean checkEmpty(String collection) {
        long count = getDB().getCollection(collection).countDocuments();
        return count == 0;
    }
    public static void saveToMongoWithoutTransaction(List<Document> datas, String collection) {
        Assert.notEmpty(datas, "集合不能为空");
        getDB().getCollection(collection).insertMany(datas);
    }
    public static void saveToMongo(List<Document> datas, String collection) {
        Assert.notEmpty(datas, "集合不能为空");
        TransactionOptions txnOptions = TransactionOptions.builder()
                .readPreference(ReadPreference.primary())
                .readConcern(ReadConcern.MAJORITY)
                .writeConcern(WriteConcern.MAJORITY)
                .build();
        try (ClientSession clientSession = mongo.startSession()) {
            clientSession.startTransaction(txnOptions);
            getDB().getCollection(collection).insertMany(clientSession, datas);
            commitWithRetry(clientSession);
        }
    }
    private static void commitWithRetry(ClientSession clientSession) {
        while (true) {
            try {
                clientSession.commitTransaction();
                logger.info("MongoDB Transaction committed");
                break;
            } catch (MongoException e) {
                // can retry commit
                if (e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL)) {
                    logger.error("UnknownTransactionCommitResult, retrying commit operation ...");
                    continue;
                } else {
                    logger.error("Exception during commit ...");
                    throw e;
                }
            }
        }
    }
}


这里碰到一个小插曲,根据MongoDB官网说明,在4.2版本规划了对分布式事务的支持。并且从4.0开始,支持事务的java驱动版本是3.8.0。


image.png


image.png


但是在mongoDB升级到4.2,驱动从3.10降为3.8之后,仍然出现了驱动版本不支持分片事务的错误,最后将3.8版本的驱动拷贝到spark的jars目录下之后问题解决。


image.png


image.png


相关文章
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
1178 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
10月前
|
分布式计算 Java 大数据
springboot项目集成dolphinscheduler调度器 可拖拽spark任务管理
springboot项目集成dolphinscheduler调度器 可拖拽spark任务管理
525 2
|
分布式计算 NoSQL 大数据
MongoDB 遇见 spark(进行整合)
这篇文章介绍了如何将MongoDB与Spark进行整合,包括MongoDB与HDFS的比较、大数据分层架构以及整合的源码示例。
273 1
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
523 1
|
NoSQL Java 关系型数据库
MongoDB保姆级指南(下):无缝集成SpringData框架,一篇最全面的Java接入指南!
前面的两篇文章已经将MongoDB大多数知识进行了阐述,不过其中的所有内容,都基于原生的MongoDB语法在操作。可是,在实际的日常开发过程中,我们并不会直接去接触MongoDB,毕竟MongoDB只能算作是系统内的一个组件,无法仅依靠它来搭建出一整套系统。
1100 1
|
NoSQL Java MongoDB
Spring Boot与MongoDB的集成应用
Spring Boot与MongoDB的集成应用
|
NoSQL Java MongoDB
如何在Spring Boot应用中集成MongoDB数据库
如何在Spring Boot应用中集成MongoDB数据库
|
SQL JSON 分布式计算
日志服务(SLS)集成 Spark 流计算实战
日志服务集成 Spark 流式计算:使用Spark Streaming和Structured Streaming对采集到日志服务中的数据进行消费,计算并将结果写回到日志服务。
8682 0
日志服务(SLS)集成 Spark 流计算实战
|
11月前
|
人工智能 分布式计算 大数据
大数据≠大样本:基于Spark的特征降维实战(提升10倍训练效率)
本文探讨了大数据场景下降维的核心问题与解决方案,重点分析了“维度灾难”对模型性能的影响及特征冗余的陷阱。通过数学证明与实际案例,揭示高维空间中样本稀疏性问题,并提出基于Spark的分布式降维技术选型与优化策略。文章详细展示了PCA在亿级用户画像中的应用,包括数据准备、核心实现与效果评估,同时深入探讨了协方差矩阵计算与特征值分解的并行优化方法。此外,还介绍了动态维度调整、非线性特征处理及降维与其他AI技术的协同效应,为生产环境提供了最佳实践指南。最终总结出降维的本质与工程实践原则,展望未来发展方向。
574 0
|
存储 分布式计算 Hadoop
从“笨重大象”到“敏捷火花”:Hadoop与Spark的大数据技术进化之路
从“笨重大象”到“敏捷火花”:Hadoop与Spark的大数据技术进化之路
684 79

推荐镜像

更多