使用Spark高效将数据从Hive写入Redis (功能最全)

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 使用Spark高效将数据从Hive写入Redis (功能最全)

使用Spark高效将数据从Hive写入Redis(功能最全)

在大数据时代,不同存储和处理系统之间高效地传输数据至关重要。Apache Spark作为一款强大的分布式计算框架,能够实现各种数据源和目的地之间的无缝集成。在本篇博文中,我们将探讨如何利用Spark从Hive读取数据并高效地写入Redis,这是一种流行的内存数据存储。

问题介绍

在实际场景中,经常需要将存储在Hive表中的数据用于实时应用。Redis以其高性能和灵活的数据结构成为了这类用例的理想选择。然而,在保持效率和可靠性的同时,将数据从Hive传输到Redis可能具有挑战性。

代码解析

我们的应用程序主要包括以下几个关键步骤:


1、导入所需库和模块:首先,导入需要类库。这些库包括用于Spark操作和与Redis交互的相关工具。

import java.net.URLDecoder
import java.text.SimpleDateFormat
import java.util
import java.util.regex.Pattern
import java.util.{Base64, Calendar, Date}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Row, SparkSession}
import redis.clients.jedis.{HostAndPort, JedisCluster}

2、定义主对象和全局变量:我们定义了一个名为importRedis的主对象,并声明了一些全局变量,用于存储从命令行传递的参数和配置信息,这些参数是执行任务时所需要参数。

object importRedis extends Logging{
  var redisAddr = ""
  var sql = ""
  var timeCategory = ""
  var redisPoolConf:String =_
  var interval:Int = _
  var redisPort:Int = _
  var setTtl:Boolean = _
  var currentTime:String = _


3、解析命令行参数:我们使用Scala的模式匹配来解析命令行传入的参数,并根据参数类型将其赋值给相应的全局变量。

    args.sliding(2, 2).toList.collect {
      case Array("--sql", argSql:String) => sql = getDecodeSql(argSql)
      case Array("--redisAddr", argRedisAddr: String) => redisAddr = argRedisAddr
      case Array("--interval", argInterval: String) => interval = argInterval.toInt
      case Array("--timeCategory",argTimeCategory:String) => timeCategory = argTimeCategory
      case Array("--redisPort",argRedisPort:String) => redisPort = argRedisPort.toInt
      case Array("--redisPoolConf",argRedisPoolConf:String) => redisPoolConf = argRedisPoolConf
      case Array("--setTtl",argSetTtl:String) => setTtl = argSetTtl.toBoolean
      case Array("--currentTime",argCurrentTime:String) => currentTime = argCurrentTime.toString.replaceAll("@"," ")
    }
    assert(sql != null, "should set --sql")
    assert(redisAddr != null, "should set --redisAddr")
    logInfo(s"sql: ${sql}")
    println(s"${redisAddr}:${interval}:${timeCategory}:${redisPort}:${redisPoolConf}:${setTtl}:${currentTime}")
    
    val mapBroad = new util.HashMap[String, String]()
    mapBroad.put("redisAddr", redisAddr);
    mapBroad.put("timeCategory", timeCategory)
    mapBroad.put("interval", interval.toString)
    mapBroad.put("redisPort", redisPort.toString)
    mapBroad.put("setTtl", setTtl.toString)
    mapBroad.put("currentTime", currentTime)


4、建立Spark会话:我们使用SparkSession建立了与Spark的连接,并启用了对Hive的支持。

    val spark = SparkSession.builder()
      .enableHiveSupport()
      .appName("importRedis")
      .getOrCreate()


5、广播变量传递配置信息:我们创建了一个广播变量,用于在Spark集群中广播Redis的配置信息。

    val broadCast = spark.sparkContext.broadcast(mapBroad)


6、读取数据并进行预处理:我们使用Spark SQL执行预定义的SQL语句从Hive中读取数据,并进行必要的预处理。

    val spark = SparkSession.builder()
      .enableHiveSupport()
      .appName("importRedis")
      .getOrCreate()
    val broadCast = spark.sparkContext.broadcast(mapBroad)
    val sparkDF = spark.sql(sql)
    val names: Array[String] = sparkDF.columns
    if(!names.contains("key") || !names.contains("value")){
      throw new Exception("请根据提示设置字段名称!")
    }
    if(sparkDF.schema.size > 3){
      throw new Exception("字段不能超过三个")
    }
    if(sparkDF.schema.size == 3){
      if(!names.contains("ttl")){
        throw new Exception("请根据提示设置时间别名")
      }
    }


7、写入数据到Redis:我们使用JedisCluster连接Redis,同时接受广播参入的redis配置信息,将数据写入相应的Redis键中。

s    sparkDF
      .javaRDD
      .foreachPartition(
        data =>{
          val map: util.HashMap[String, String] = broadCast.value
          val cluster = new JedisCluster(new HostAndPort(map.get("redisAddr"), map.get("redisPort").toInt))
          while (data.hasNext){
            val row:Row = data.next()
            val key = if (row.getAs("key") == null) "NULl" else row.getAs("key").toString
            val value = if (row.getAs("value") == null) "NULL" else row.getAs("value").toString
            cluster.set(key,value)
            if(map.get("setTtl").toBoolean){
              if (row.size==3){
                val lastActiveDt = row.getAs[String]("ttl")
                if(lastActiveDt != null){
                  setExpireTime(key,lastActiveDt,map.get("interval").toInt,map.get("timeCategory"),cluster)
                }
              }else if(row.size==2){
                setExpireTime(key,map.get("currentTime"),map.get("interval").toInt,map.get("timeCategory"),cluster)
              }
            }
          }
          }
      )


8、其他功能实现:我们还包括了一些其他功能,比如解析SQL语句、处理时间相关操作等。

  def setExpireTime(key: String, expireTime: String, interval: Int, formatter: String,jedisCluster:JedisCluster): Unit = {
    val dateMap: util.HashMap[String, Integer] = new util.HashMap[String, Integer]
    dateMap.put("day", Calendar.DATE)
    dateMap.put("hour", Calendar.HOUR)
    dateMap.put("minute", Calendar.MINUTE)
    dateMap.put("second",Calendar.SECOND)

    val calendarInstance: Calendar = Calendar.getInstance
    val simpleDateFormat: SimpleDateFormat = new SimpleDateFormat(getFormatter(expireTime))
    try {
      val parsedDate: Date = simpleDateFormat.parse(expireTime)
      calendarInstance.setTime(parsedDate)
      calendarInstance.add(dateMap.get(formatter), interval)
      val transformatedDate: Long = calendarInstance.getTime.toInstant.toEpochMilli
      jedisCluster.pexpireAt(key, transformatedDate)
    } catch {
      case e: Exception =>
        jedisCluster.del(key)
        throw new Exception("时间解析异常")
    }
  }

  def getFormatter(datetime: String): String = if (Pattern.matches("\\s*\\d{4}-\\d{2}-\\d{2}\\s*", datetime)) "yyyy-MM-dd" else "yyyy-MM-dd HH:mm:ss"

总结

通过以上步骤,我们成功地开发了一个Spark应用程序,能够高效地将数据从Hive写入Redis。我们使用了Spark的分布式计算能力和Jedis库的灵活性,使得数据传输过程稳健高效。通过深入理解每个步骤的实现原理,我们可以更好地应用这些技术解决实际问题。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore     ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
打赏
0
1
1
0
15
分享
相关文章
Redis应用—2.在列表数据里的应用
本文介绍了基于数据库和缓存双写的分享贴功能设计,包括:基于数据库 + 缓存双写的分享贴功能、查询分享贴列表缓存时的延迟构建、分页列表惰性缓存方案、用户分享贴列表数据按页缓存实现精准过期控制、用户分享贴列表的分页缓存异步更新、数据库与缓存的分页数据一致性方案、热门用户分享贴列表的分页缓存失效时消除并发线程串行等待锁的影响。总结:该设计通过合理的缓存策略和异步处理机制,有效提升了系统性能,降低了内存占用,并确保了数据的一致性和高可用性。
Redis应用—2.在列表数据里的应用
如何在IDE中通过Spark操作Hive
通过以上方法和代码示例,你可以在IDE中成功通过Spark操作Hive,实现大规模数据处理和分析。确保理解每一步的实现细节,应用到实际项目中时能有效地处理各种复杂的数据场景。
62 28
基于云服务器的数仓搭建-hive/spark安装
本文介绍了在本地安装和配置MySQL、Hive及Spark的过程。主要内容包括: - **MySQL本地安装**:详细描述了内存占用情况及安装步骤,涉及安装脚本的编写与执行,以及连接MySQL的方法。 - **Hive安装**:涵盖了从上传压缩包到配置环境变量的全过程,并解释了如何将Hive元数据存储配置到MySQL中。 - **Hive与Spark集成**:说明了如何安装Spark并将其与Hive集成,确保Hive任务由Spark执行,同时解决了依赖冲突问题。 - **常见问题及解决方法**:列举了安装过程中可能遇到的问题及其解决方案,如内存配置不足、节点间通信问题等。
118 0
基于云服务器的数仓搭建-hive/spark安装
Redis应用—1.在用户数据里的应用
本文主要介绍了社区电商的业务闭环及Redis缓存架构中遇到的典型生产问题及其解决方案。通过介绍的设计和优化,社区电商平台能够在高并发读取和少量写入的情况下,保持高性能和数据一致性。
Redis应用—1.在用户数据里的应用
Redis经典问题:数据并发竞争
数据并发竞争是大流量系统(如火车票系统、微博平台)中常见的问题,可能导致用户体验下降甚至系统崩溃。本文介绍了两种解决方案:1) 加写回操作加互斥锁,查询失败快速返回默认值;2) 保持多个缓存备份,减少并发竞争概率。通过实践案例展示,成功提高了系统的稳定性和性能。
Redis经典问题:数据不一致
在使用Redis时,缓存与数据库数据不一致会导致应用异常。主要原因包括缓存更新失败、Rehash异常等。解决方案有:重试机制、缩短缓存时间、优化写入策略、建立监控报警、定期验证一致性、采用缓存分层及数据回滚恢复机制。这些措施可确保数据最终一致性,提升应用稳定性和性能。
Redis和Mysql如何保证数据⼀致?
在项目中,为了解决Redis与Mysql的数据一致性问题,我们采用了多种策略:对于低一致性要求的数据,不做特别处理;时效性数据通过设置缓存过期时间来减少不一致风险;高一致性但时效性要求不高的数据,利用MQ异步同步确保最终一致性;而对一致性和时效性都有高要求的数据,则采用分布式事务(如Seata TCC模式)来保障。
104 14
|
4月前
|
Redis分片集群中数据是怎么存储和读取的 ?
Redis集群采用哈希槽分区算法,共有16384个哈希槽,每个槽分配到不同的Redis节点上。数据操作时,通过CRC16算法对key计算并取模,确定其所属的槽和对应的节点,从而实现高效的数据存取。
97 13
|
4月前
|
Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出
本文深入探讨了Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出。文章还介绍了Redis在页面缓存、数据缓存和会话缓存等应用场景中的使用,并强调了缓存数据一致性、过期时间设置、容量控制和安全问题的重要性。
78 5
Redis应用—8.相关的缓存框架
本文介绍了Ehcache和Guava Cache两个缓存框架及其使用方法,以及如何自定义缓存。主要内容包括:Ehcache缓存框架、Guava Cache缓存框架、自定义缓存。总结:Ehcache适合用作本地缓存或与Redis结合使用,Guava Cache则提供了更灵活的缓存管理和更高的并发性能。自定义缓存可以根据具体需求选择不同的数据结构和引用类型来实现特定的缓存策略。
Redis应用—8.相关的缓存框架
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等