使用IDEA查询HIVE数据,输出男女生人数及姓名

简介: 使用IDEA查询HIVE数据,输出男女生人数及姓名


接上篇

想要在输出男女生人数的基础上,输出姓名,需自定义聚合函数

AggrNameUDF


import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}
class AggrNameUDF extends UserDefinedAggregateFunction{
  //输入数据的结构类型
  override def inputSchema: StructType = {StructType(List(StructField("name",StringType)))}
  //缓冲区数据的结构类型
  override def bufferSchema: StructType = {StructType(List(StructField("name",StringType)))}
  //返回值类型
  override def dataType: DataType = StringType
  override def deterministic: Boolean = true
  //初始化操作,初始值赋值为空
  override def initialize(buffer: MutableAggregationBuffer): Unit = buffer.update(0,"")
  //在work中每一个分区进行操作
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    //获取原先的值
    var bfValue = buffer.getString(0)
    //新传递的数据
    var nowValue = input.getString(0)
    if(bfValue==""){
      bfValue = nowValue
    }else{
      bfValue += ","+nowValue
    }
    //把合并的数据再放到缓冲区
    buffer.update(0,bfValue)
  }
  //合并所有rdd为一个数据
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    var bfValue = buffer1.getString(0)
    var nowValue = buffer2.getString(0)
    if(bfValue==""){
      bfValue = nowValue
    }else{
      bfValue += ","+nowValue
    }
    buffer1.update(0,bfValue)
  }
  //得到缓冲区存放数据
  override def evaluate(buffer: Row): Any = buffer.getString(0)
}

HiveDriver  

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
object HiveDriver {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    val session = SparkSession.builder().master("local[2]").enableHiveSupport().getOrCreate()
    session.udf.register("aggr",new AggrNameUDF())
    session.sql("use driver")
    //输出男女生人数
    //val df = session.sql("select gender,count(*) count from dr group by gender")
    //输出男生女生人数并输出对应的名字
    val df = session.sql("select gender,count(*) count,aggr(name) names from dr group by gender")
    df.show()
    session.stop()
  }
}
相关文章
|
3月前
|
SQL 分布式计算 Hadoop
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(一)
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(一)
62 4
|
3月前
|
SQL 分布式计算 关系型数据库
Hadoop-21 Sqoop 数据迁移工具 简介与环境配置 云服务器 ETL工具 MySQL与Hive数据互相迁移 导入导出
Hadoop-21 Sqoop 数据迁移工具 简介与环境配置 云服务器 ETL工具 MySQL与Hive数据互相迁移 导入导出
121 3
|
3月前
|
SQL
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(二)
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(二)
53 2
|
3月前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
136 0
|
5月前
|
SQL 物联网 数据处理
"颠覆传统,Hive SQL与Flink激情碰撞!解锁流批一体数据处理新纪元,让数据决策力瞬间爆表,你准备好了吗?"
【8月更文挑战第9天】数据时代,实时性和准确性至关重要。传统上,批处理与流处理各司其职,但Apache Flink打破了这一界限,尤其Flink与Hive SQL的结合,开创了流批一体的数据处理新时代。这不仅简化了数据处理流程,还极大提升了效率和灵活性。例如,通过Flink SQL,可以轻松实现流数据与批数据的融合分析,无需在两者间切换。这种融合不仅降低了技术门槛,还为企业提供了更强大的数据支持,无论是在金融、电商还是物联网领域,都将发挥巨大作用。
70 6
|
5月前
|
XML JSON Java
使用IDEA+Maven搭建整合一个Struts2+Spring4+Hibernate4项目,混合使用传统Xml与@注解,返回JSP视图或JSON数据,快来给你的SSH老项目翻新一下吧
本文介绍了如何使用IntelliJ IDEA和Maven搭建一个整合了Struts2、Spring4、Hibernate4的J2EE项目,并配置了项目目录结构、web.xml、welcome.jsp以及多个JSP页面,用于刷新和学习传统的SSH框架。
160 0
使用IDEA+Maven搭建整合一个Struts2+Spring4+Hibernate4项目,混合使用传统Xml与@注解,返回JSP视图或JSON数据,快来给你的SSH老项目翻新一下吧
|
5月前
|
SQL 关系型数据库 HIVE
实时计算 Flink版产品使用问题之如何将PostgreSQL数据实时入库Hive并实现断点续传
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
SQL 分布式计算 数据处理
实时计算 Flink版产品使用问题之怎么将数据从Hive表中读取并写入到另一个Hive表中
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错合集之从mysql读数据写到hive报错,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。