使用Spark高效将数据从Hive写入Redis (功能最全)

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: 使用Spark高效将数据从Hive写入Redis (功能最全)

使用Spark高效将数据从Hive写入Redis(功能最全)

在大数据时代,不同存储和处理系统之间高效地传输数据至关重要。Apache Spark作为一款强大的分布式计算框架,能够实现各种数据源和目的地之间的无缝集成。在本篇博文中,我们将探讨如何利用Spark从Hive读取数据并高效地写入Redis,这是一种流行的内存数据存储。

问题介绍

在实际场景中,经常需要将存储在Hive表中的数据用于实时应用。Redis以其高性能和灵活的数据结构成为了这类用例的理想选择。然而,在保持效率和可靠性的同时,将数据从Hive传输到Redis可能具有挑战性。

代码解析

我们的应用程序主要包括以下几个关键步骤:


1、导入所需库和模块:首先,导入需要类库。这些库包括用于Spark操作和与Redis交互的相关工具。

import java.net.URLDecoder
import java.text.SimpleDateFormat
import java.util
import java.util.regex.Pattern
import java.util.{Base64, Calendar, Date}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Row, SparkSession}
import redis.clients.jedis.{HostAndPort, JedisCluster}

2、定义主对象和全局变量:我们定义了一个名为importRedis的主对象,并声明了一些全局变量,用于存储从命令行传递的参数和配置信息,这些参数是执行任务时所需要参数。

object importRedis extends Logging{
  var redisAddr = ""
  var sql = ""
  var timeCategory = ""
  var redisPoolConf:String =_
  var interval:Int = _
  var redisPort:Int = _
  var setTtl:Boolean = _
  var currentTime:String = _


3、解析命令行参数:我们使用Scala的模式匹配来解析命令行传入的参数,并根据参数类型将其赋值给相应的全局变量。

    args.sliding(2, 2).toList.collect {
      case Array("--sql", argSql:String) => sql = getDecodeSql(argSql)
      case Array("--redisAddr", argRedisAddr: String) => redisAddr = argRedisAddr
      case Array("--interval", argInterval: String) => interval = argInterval.toInt
      case Array("--timeCategory",argTimeCategory:String) => timeCategory = argTimeCategory
      case Array("--redisPort",argRedisPort:String) => redisPort = argRedisPort.toInt
      case Array("--redisPoolConf",argRedisPoolConf:String) => redisPoolConf = argRedisPoolConf
      case Array("--setTtl",argSetTtl:String) => setTtl = argSetTtl.toBoolean
      case Array("--currentTime",argCurrentTime:String) => currentTime = argCurrentTime.toString.replaceAll("@"," ")
    }
    assert(sql != null, "should set --sql")
    assert(redisAddr != null, "should set --redisAddr")
    logInfo(s"sql: ${sql}")
    println(s"${redisAddr}:${interval}:${timeCategory}:${redisPort}:${redisPoolConf}:${setTtl}:${currentTime}")
    
    val mapBroad = new util.HashMap[String, String]()
    mapBroad.put("redisAddr", redisAddr);
    mapBroad.put("timeCategory", timeCategory)
    mapBroad.put("interval", interval.toString)
    mapBroad.put("redisPort", redisPort.toString)
    mapBroad.put("setTtl", setTtl.toString)
    mapBroad.put("currentTime", currentTime)


4、建立Spark会话:我们使用SparkSession建立了与Spark的连接,并启用了对Hive的支持。

    val spark = SparkSession.builder()
      .enableHiveSupport()
      .appName("importRedis")
      .getOrCreate()


5、广播变量传递配置信息:我们创建了一个广播变量,用于在Spark集群中广播Redis的配置信息。

    val broadCast = spark.sparkContext.broadcast(mapBroad)


6、读取数据并进行预处理:我们使用Spark SQL执行预定义的SQL语句从Hive中读取数据,并进行必要的预处理。

    val spark = SparkSession.builder()
      .enableHiveSupport()
      .appName("importRedis")
      .getOrCreate()
    val broadCast = spark.sparkContext.broadcast(mapBroad)
    val sparkDF = spark.sql(sql)
    val names: Array[String] = sparkDF.columns
    if(!names.contains("key") || !names.contains("value")){
      throw new Exception("请根据提示设置字段名称!")
    }
    if(sparkDF.schema.size > 3){
      throw new Exception("字段不能超过三个")
    }
    if(sparkDF.schema.size == 3){
      if(!names.contains("ttl")){
        throw new Exception("请根据提示设置时间别名")
      }
    }


7、写入数据到Redis:我们使用JedisCluster连接Redis,同时接受广播参入的redis配置信息,将数据写入相应的Redis键中。

s    sparkDF
      .javaRDD
      .foreachPartition(
        data =>{
          val map: util.HashMap[String, String] = broadCast.value
          val cluster = new JedisCluster(new HostAndPort(map.get("redisAddr"), map.get("redisPort").toInt))
          while (data.hasNext){
            val row:Row = data.next()
            val key = if (row.getAs("key") == null) "NULl" else row.getAs("key").toString
            val value = if (row.getAs("value") == null) "NULL" else row.getAs("value").toString
            cluster.set(key,value)
            if(map.get("setTtl").toBoolean){
              if (row.size==3){
                val lastActiveDt = row.getAs[String]("ttl")
                if(lastActiveDt != null){
                  setExpireTime(key,lastActiveDt,map.get("interval").toInt,map.get("timeCategory"),cluster)
                }
              }else if(row.size==2){
                setExpireTime(key,map.get("currentTime"),map.get("interval").toInt,map.get("timeCategory"),cluster)
              }
            }
          }
          }
      )


8、其他功能实现:我们还包括了一些其他功能,比如解析SQL语句、处理时间相关操作等。

  def setExpireTime(key: String, expireTime: String, interval: Int, formatter: String,jedisCluster:JedisCluster): Unit = {
    val dateMap: util.HashMap[String, Integer] = new util.HashMap[String, Integer]
    dateMap.put("day", Calendar.DATE)
    dateMap.put("hour", Calendar.HOUR)
    dateMap.put("minute", Calendar.MINUTE)
    dateMap.put("second",Calendar.SECOND)

    val calendarInstance: Calendar = Calendar.getInstance
    val simpleDateFormat: SimpleDateFormat = new SimpleDateFormat(getFormatter(expireTime))
    try {
      val parsedDate: Date = simpleDateFormat.parse(expireTime)
      calendarInstance.setTime(parsedDate)
      calendarInstance.add(dateMap.get(formatter), interval)
      val transformatedDate: Long = calendarInstance.getTime.toInstant.toEpochMilli
      jedisCluster.pexpireAt(key, transformatedDate)
    } catch {
      case e: Exception =>
        jedisCluster.del(key)
        throw new Exception("时间解析异常")
    }
  }

  def getFormatter(datetime: String): String = if (Pattern.matches("\\s*\\d{4}-\\d{2}-\\d{2}\\s*", datetime)) "yyyy-MM-dd" else "yyyy-MM-dd HH:mm:ss"

总结

通过以上步骤,我们成功地开发了一个Spark应用程序,能够高效地将数据从Hive写入Redis。我们使用了Spark的分布式计算能力和Jedis库的灵活性,使得数据传输过程稳健高效。通过深入理解每个步骤的实现原理,我们可以更好地应用这些技术解决实际问题。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
相关文章
|
4月前
|
缓存 NoSQL 关系型数据库
美团面试:MySQL有1000w数据,redis只存20w的数据,如何做 缓存 设计?
美团面试:MySQL有1000w数据,redis只存20w的数据,如何做 缓存 设计?
美团面试:MySQL有1000w数据,redis只存20w的数据,如何做 缓存 设计?
|
4月前
|
数据采集 存储 NoSQL
基于Scrapy-Redis的分布式景点数据爬取与热力图生成
基于Scrapy-Redis的分布式景点数据爬取与热力图生成
274 67
|
1月前
|
存储 NoSQL Redis
采用Redis的Bitmaps实现类似Github连续提交状态的功能。
在现实世界的应用开发中,实现类似于Github提交跟踪系统时,还可能需要考虑用户时区、闰年等日期相关的边界条件,以及辅助数据的存储和查询优化,例如对活跃用户的即时查询和统计等。不过这些都可以在Bitmaps的基础功能之上通过额外的代码逻辑来实现。
59 0
|
4月前
|
消息中间件 缓存 NoSQL
基于Spring Data Redis与RabbitMQ实现字符串缓存和计数功能(数据同步)
总的来说,借助Spring Data Redis和RabbitMQ,我们可以轻松实现字符串缓存和计数的功能。而关键的部分不过是一些"厨房的套路",一旦你掌握了这些套路,那么你就像厨师一样可以准备出一道道饕餮美食了。通过这种方式促进数据处理效率无疑将大大提高我们的生产力。
161 32
|
4月前
|
存储 监控 NoSQL
使用Redis实现延迟消息发送功能
使用 Redis 的密码认证功能,为实例设置密码以防止未授权访问。为消息提供适当加密,确保消息内容在网络传输过程中不被窃取或篡改。
160 16
|
3月前
|
存储 缓存 NoSQL
告别数据僵尸!Redis实现自动清理过期键值对
在数据激增的时代,Redis如同内存管理的智能管家,支持键值对的自动过期功能,实现“数据保鲜”。通过`EXPIRE`设定生命倒计时、`TTL`查询剩余时间,结合惰性删除与定期清理策略,Redis高效维护内存秩序。本文以Python实战演示其过期机制,并提供最佳实践指南,助你掌握数据生命周期管理的艺术,让数据优雅退场。
241 0
|
6月前
|
存储 NoSQL 算法
Redis分片集群中数据是怎么存储和读取的 ?
Redis集群采用的算法是哈希槽分区算法。Redis集群中有16384个哈希槽(槽的范围是 0 -16383,哈希槽),将不同的哈希槽分布在不同的Redis节点上面进行管理,也就是说每个Redis节点只负责一部分的哈希槽。在对数据进行操作的时候,集群会对使用CRC16算法对key进行计算并对16384取模(slot = CRC16(key)%16383),得到的结果就是 Key-Value 所放入的槽,通过这个值,去找到对应的槽所对应的Redis节点,然后直接到这个对应的节点上进行存取操作
|
6月前
|
缓存 NoSQL 关系型数据库
Redis和Mysql如何保证数据⼀致?
1. 先更新Mysql,再更新Redis,如果更新Redis失败,可能仍然不⼀致 2. 先删除Redis缓存数据,再更新Mysql,再次查询的时候在将数据添加到缓存中 这种⽅案能解决1 ⽅案的问题,但是在⾼并发下性能较低,⽽且仍然会出现数据不⼀致的问题,⽐如线程1删除了 Redis缓存数据,正在更新Mysql,此时另外⼀个查询再查询,那么就会把Mysql中⽼数据⼜查到 Redis中 1. 使用MQ异步同步, 保证数据的最终一致性 我们项目中会根据业务情况 , 使用不同的方案来解决Redis和Mysql的一致性问题 : 1. 对于一些一致性要求不高的场景 , 不做处理例如 : 用户行为数据 ,
|
6月前
|
NoSQL Redis
Redis的数据淘汰策略有哪些 ?
Redis 提供 8 种数据淘汰策略: 淘汰易失数据(具有过期时间的数据) 1. volatile-lru(least recently used):从已设置过期时间的数据集(server.db[i].expires)中挑选最近最少使用的数据淘汰 2. volatile-lfu(least frequently used):从已设置过期时间的数据集(server.db[i].expires)中挑选最不经常使用的数据淘汰 3. volatile-ttl:从已设置过期时间的数据集(server.db[i].expires)中挑选将要过期的数据淘汰 4. volatile-random:从已设置过期
|
6月前
|
NoSQL Redis
Redis的数据持久化策略有哪些 ?
Redis 提供了两种方式,实现数据的持久化到硬盘。 1. RDB 持久化(全量),是指在指定的时间间隔内将内存中的数据集快照写入磁盘。 2. AOF持久化(增量),以日志的形式记录服务器所处理的每一个写、删除操作 RDB和AOF一起使用, 在Redis4.0版本支持混合持久化方式 ( 设置 aof-use-rdb-preamble yes )

热门文章

最新文章