通过reducer联合产生宽表

简介:

public class ReducerJoin {

public static class ValueFlag implements Writable {
    private String value;
    private String flag;

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(value);
        out.writeUTF(flag);

    }

    public void readFields(DataInput in) throws IOException {
        this.value = in.readUTF();
        this.flag = in.readUTF();

    }

}

// map读取两个文件 根据来源把每个kv对打上标签 输出给reduce可以必须是关联字段
public static class ReducerJoinMap extends Mapper<LongWritable, Text, Text, ValueFlag> {
    private FileSplit fileSplit;
    private String fileName;
    private String[] infos;
    private Text oKey = new Text();
    private ValueFlag oValue = new ValueFlag();

    @Override
    protected void setup(Mapper<LongWritable, Text, Text, ValueFlag>.Context context)
            throws IOException, InterruptedException {
        fileSplit = (FileSplit) context.getInputSplit();
        if (fileSplit.getPath().toString().contains("user-logs-large.txt")) {
            fileName = "userLogsLarge";
        } else if (fileSplit.getPath().toString().contains("user_info.txt")) {
            fileName = "userInfo";
        }

    }

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, ValueFlag>.Context context)
            throws IOException, InterruptedException {
        infos = value.toString().split("\\s");

        oValue.setFlag(fileName);
        if (fileName.equals("userLogsLarge")) {
            // 解析user-logs-large.txt
            oKey.set(infos[0]);
            oValue.setValue(infos[1] + "\t" + infos[2]);
            context.write(oKey, oValue);
        } else if (fileName.equals("userInfo")) {
            // 解析user_infos.txt
            oKey.set(infos[0]);
            oValue.setValue(infos[1] + "\t" + infos[2]);
            context.write(oKey, oValue);
        }

    }

}

// 接受map发送过来的kv队 根据value中的flag把同一个key对应的value分组
// 那么两组中的数据就是分别来自两张表中的数据 对这两组数据做笛卡尔乘机即完成关联
public static class ReducerJoinReducer extends Reducer<Text, ValueFlag, AvroKey<UserActionLog>, NullWritable> {

    private List<String> userLogsLargeList;
    private List<String> userInfosList;
    private NullWritable outValue = NullWritable.get();
    private AvroKey<UserActionLog> outKey = new AvroKey<UserActionLog>();
    private String[] infos;

    @Override
    protected void reduce(Text key, Iterable<ValueFlag> values,
            Reducer<Text, ValueFlag, AvroKey<UserActionLog>, NullWritable>.Context context)
            throws IOException, InterruptedException {

        userLogsLargeList = new ArrayList<String>();
        userInfosList = new ArrayList<String>();

        for (ValueFlag value : values) {

            if (value.getFlag().equals("userLogsLarge")) {
                userLogsLargeList.add(value.getValue());
            } else if (value.getFlag().equals("userInfo")) {
                userInfosList.add(value.getValue());
            }
        }
        // 对两组中的数据进行笛卡尔乘积
        for (String userlog : userLogsLargeList) {
            for (String userinfo : userInfosList) {
                // 构建一个useractionLog对象
                UserActionLog.Builder build = UserActionLog.newBuilder();

                // 从userlog中提取actiontyoe和ipaddress
                infos = userlog.split("\\s");
                build.setActionType(infos[0]);
                build.setIpAddress(infos[1]);
                // 从userinfo 提取gender 和privince
                infos = userinfo.split("\\s");
                if (infos[0].equals("man")) {
                    build.setGender(0);
                } else {
                    build.setGender(1);
                }
                build.setProvience(infos[1]);
                build.setUserName(key.toString());
                UserActionLog userActionLog = build.build();
                // 吧userAction封装到Avrokey中
                outKey.datum(userActionLog);
                context.write(outKey, outValue);
            }
        }

    }

}

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration);
    job.setJarByClass(ReducerJoin.class);
    job.setJobName("reducer联合");

    job.setMapperClass(ReducerJoinMap.class);
    job.setReducerClass(ReducerJoinReducer.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(ValueFlag.class);
    job.setOutputKeyClass(AvroKey.class);
    job.setOutputValueClass(NullWriter.class);
    //设置输出的格式是avrokey
    job.setOutputFormatClass(AvroKeyOutputFormat.class);
    //设置输出key的schema
    AvroJob.setOutputKeySchema(job, UserActionLog.SCHEMA$);
    FileInputFormat.addInputPath(job, new Path("/mapjoin"));
    Path outputPath = new Path("/ReducerJoin");
    outputPath.getFileSystem(configuration).delete(outputPath, true);
    FileOutputFormat.setOutputPath(job, outputPath);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

}

相关文章
|
2月前
|
SQL 分布式计算 NoSQL
大数据-164 Apache Kylin Cube优化 案例1 定义衍生维度与对比 超详细
大数据-164 Apache Kylin Cube优化 案例1 定义衍生维度与对比 超详细
37 1
大数据-164 Apache Kylin Cube优化 案例1 定义衍生维度与对比 超详细
|
6月前
|
存储 SQL 消息中间件
ClickHouse(12)ClickHouse合并树MergeTree家族表引擎之AggregatingMergeTree详细解析
AggregatingMergeTree是ClickHouse的一种表引擎,它优化了MergeTree的合并逻辑,通过将相同主键(排序键)的行聚合为一行并存储聚合函数状态来减少行数。适用于增量数据聚合和物化视图。建表语法中涉及AggregateFunction和SimpleAggregateFunction类型。插入数据需使用带-State-的聚合函数,查询时使用GROUP BY和-Merge-。处理逻辑包括按排序键聚合、在合并分区时计算、以分区为单位聚合等。常用于物化视图配合普通MergeTree使用。查阅更多资料可访问相关链接。
319 4
|
2月前
|
存储 大数据 分布式数据库
大数据-165 Apache Kylin Cube优化 案例 2 定义衍生维度及对比 & 聚合组 & RowKeys
大数据-165 Apache Kylin Cube优化 案例 2 定义衍生维度及对比 & 聚合组 & RowKeys
50 1
|
7月前
|
SQL 存储 分布式计算
奇思妙想的SQL|去重Cube计算优化新思路
本文主要分享了作者在蚂蚁集团高管数据链路改造升级过程中,针对去重Cube的优化实践。
850 48
|
6月前
|
传感器 存储 SQL
ClickHouse(15)ClickHouse合并树MergeTree家族表引擎之GraphiteMergeTree详细解析
GraphiteMergeTree是ClickHouse用于优化Graphite数据存储和汇总的表引擎,适合需要瘦身和高效查询Graphite数据的开发者。它基于MergeTree,减少存储空间并提升查询效率。创建表时需包括Path、Time、Value和Version列。配置涉及pattern、regexp、function和retention,用于指定聚合函数和数据保留规则。文章还提供了建表语句示例和相关资源链接。
103 1
|
7月前
|
存储 数据挖掘 大数据
大数据数仓建模基础理论【维度表、事实表、数仓分层及示例】
数据仓库建模是组织和设计数据以支持数据分析的过程,包括ER模型和维度建模。ER模型通过实体和关系描述数据结构,遵循三范式减少冗余。维度建模,特别是Kimball方法,用于数据仓库设计,便于分析和报告。事实表存储业务度量,如销售数据,分为累积、快照、事务和周期性快照类型。维度表提供描述性信息,如时间、产品、地点和客户详情。数仓通常分层为ODS(源数据)、DWD(明细数据)、DIM(公共维度)、DWS(数据汇总)和ADS(应用数据),以优化数据管理、质量、查询性能和适应性。
1982 3
|
6月前
|
存储 SQL 算法
ClickHouse(14)ClickHouse合并树MergeTree家族表引擎之VersionedCollapsingMergeTree详细解析
VersionedCollapsingMergeTree是ClickHouse的一种优化引擎,扩展了MergeTree,支持多线程异步插入和高效的数据折叠。它通过Sign和Version列处理对象状态的变化,Sign表示行的状态(正向或撤销),Version追踪状态版本。引擎自动删除旧状态,减少存储占用。在查询时,需注意可能需使用GROUP BY和聚合函数确保数据折叠,因为ClickHouse不保证查询结果已折叠。文章还提供了建表语法、使用示例和相关资源链接。
203 0
|
SQL HIVE
Hive 常用的窗口函数【高频重点】(下)
Hive 常用的窗口函数【高频重点】(下)
89 0
|
SQL HIVE
Hive 常用的窗口函数【高频重点】(上)
Hive 常用的窗口函数【高频重点】
178 0
|
SQL 存储 大数据
大数据Hive函数高阶 2
大数据Hive函数高阶
62 0