使用Spark高效将数据从Hive写入Redis(功能最全)
在大数据时代,不同存储和处理系统之间高效地传输数据至关重要。Apache Spark作为一款强大的分布式计算框架,能够实现各种数据源和目的地之间的无缝集成。在本篇博文中,我们将探讨如何利用Spark从Hive读取数据并高效地写入Redis,这是一种流行的内存数据存储。
问题介绍
在实际场景中,经常需要将存储在Hive表中的数据用于实时应用。Redis以其高性能和灵活的数据结构成为了这类用例的理想选择。然而,在保持效率和可靠性的同时,将数据从Hive传输到Redis可能具有挑战性。
代码解析
我们的应用程序主要包括以下几个关键步骤:
1、导入所需库和模块:首先,导入需要类库。这些库包括用于Spark操作和与Redis交互的相关工具。
import java.net.URLDecoder import java.text.SimpleDateFormat import java.util import java.util.regex.Pattern import java.util.{Base64, Calendar, Date} import org.apache.spark.internal.Logging import org.apache.spark.sql.{Row, SparkSession} import redis.clients.jedis.{HostAndPort, JedisCluster}
2、定义主对象和全局变量:我们定义了一个名为importRedis的主对象,并声明了一些全局变量,用于存储从命令行传递的参数和配置信息,这些参数是执行任务时所需要参数。
object importRedis extends Logging{ var redisAddr = "" var sql = "" var timeCategory = "" var redisPoolConf:String =_ var interval:Int = _ var redisPort:Int = _ var setTtl:Boolean = _ var currentTime:String = _
3、解析命令行参数:我们使用Scala的模式匹配来解析命令行传入的参数,并根据参数类型将其赋值给相应的全局变量。
args.sliding(2, 2).toList.collect { case Array("--sql", argSql:String) => sql = getDecodeSql(argSql) case Array("--redisAddr", argRedisAddr: String) => redisAddr = argRedisAddr case Array("--interval", argInterval: String) => interval = argInterval.toInt case Array("--timeCategory",argTimeCategory:String) => timeCategory = argTimeCategory case Array("--redisPort",argRedisPort:String) => redisPort = argRedisPort.toInt case Array("--redisPoolConf",argRedisPoolConf:String) => redisPoolConf = argRedisPoolConf case Array("--setTtl",argSetTtl:String) => setTtl = argSetTtl.toBoolean case Array("--currentTime",argCurrentTime:String) => currentTime = argCurrentTime.toString.replaceAll("@"," ") } assert(sql != null, "should set --sql") assert(redisAddr != null, "should set --redisAddr") logInfo(s"sql: ${sql}") println(s"${redisAddr}:${interval}:${timeCategory}:${redisPort}:${redisPoolConf}:${setTtl}:${currentTime}") val mapBroad = new util.HashMap[String, String]() mapBroad.put("redisAddr", redisAddr); mapBroad.put("timeCategory", timeCategory) mapBroad.put("interval", interval.toString) mapBroad.put("redisPort", redisPort.toString) mapBroad.put("setTtl", setTtl.toString) mapBroad.put("currentTime", currentTime)
4、建立Spark会话:我们使用SparkSession建立了与Spark的连接,并启用了对Hive的支持。
val spark = SparkSession.builder() .enableHiveSupport() .appName("importRedis") .getOrCreate()
5、广播变量传递配置信息:我们创建了一个广播变量,用于在Spark集群中广播Redis的配置信息。
val broadCast = spark.sparkContext.broadcast(mapBroad)
6、读取数据并进行预处理:我们使用Spark SQL执行预定义的SQL语句从Hive中读取数据,并进行必要的预处理。
val spark = SparkSession.builder() .enableHiveSupport() .appName("importRedis") .getOrCreate() val broadCast = spark.sparkContext.broadcast(mapBroad) val sparkDF = spark.sql(sql) val names: Array[String] = sparkDF.columns if(!names.contains("key") || !names.contains("value")){ throw new Exception("请根据提示设置字段名称!") } if(sparkDF.schema.size > 3){ throw new Exception("字段不能超过三个") } if(sparkDF.schema.size == 3){ if(!names.contains("ttl")){ throw new Exception("请根据提示设置时间别名") } }
7、写入数据到Redis:我们使用JedisCluster连接Redis,同时接受广播参入的redis配置信息,将数据写入相应的Redis键中。
s sparkDF .javaRDD .foreachPartition( data =>{ val map: util.HashMap[String, String] = broadCast.value val cluster = new JedisCluster(new HostAndPort(map.get("redisAddr"), map.get("redisPort").toInt)) while (data.hasNext){ val row:Row = data.next() val key = if (row.getAs("key") == null) "NULl" else row.getAs("key").toString val value = if (row.getAs("value") == null) "NULL" else row.getAs("value").toString cluster.set(key,value) if(map.get("setTtl").toBoolean){ if (row.size==3){ val lastActiveDt = row.getAs[String]("ttl") if(lastActiveDt != null){ setExpireTime(key,lastActiveDt,map.get("interval").toInt,map.get("timeCategory"),cluster) } }else if(row.size==2){ setExpireTime(key,map.get("currentTime"),map.get("interval").toInt,map.get("timeCategory"),cluster) } } } } )
8、其他功能实现:我们还包括了一些其他功能,比如解析SQL语句、处理时间相关操作等。
def setExpireTime(key: String, expireTime: String, interval: Int, formatter: String,jedisCluster:JedisCluster): Unit = { val dateMap: util.HashMap[String, Integer] = new util.HashMap[String, Integer] dateMap.put("day", Calendar.DATE) dateMap.put("hour", Calendar.HOUR) dateMap.put("minute", Calendar.MINUTE) dateMap.put("second",Calendar.SECOND) val calendarInstance: Calendar = Calendar.getInstance val simpleDateFormat: SimpleDateFormat = new SimpleDateFormat(getFormatter(expireTime)) try { val parsedDate: Date = simpleDateFormat.parse(expireTime) calendarInstance.setTime(parsedDate) calendarInstance.add(dateMap.get(formatter), interval) val transformatedDate: Long = calendarInstance.getTime.toInstant.toEpochMilli jedisCluster.pexpireAt(key, transformatedDate) } catch { case e: Exception => jedisCluster.del(key) throw new Exception("时间解析异常") } } def getFormatter(datetime: String): String = if (Pattern.matches("\\s*\\d{4}-\\d{2}-\\d{2}\\s*", datetime)) "yyyy-MM-dd" else "yyyy-MM-dd HH:mm:ss"
总结
通过以上步骤,我们成功地开发了一个Spark应用程序,能够高效地将数据从Hive写入Redis。我们使用了Spark的分布式计算能力和Jedis库的灵活性,使得数据传输过程稳健高效。通过深入理解每个步骤的实现原理,我们可以更好地应用这些技术解决实际问题。