通过reducer联合产生宽表

简介:

public class ReducerJoin {

public static class ValueFlag implements Writable {
    private String value;
    private String flag;

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(value);
        out.writeUTF(flag);

    }

    public void readFields(DataInput in) throws IOException {
        this.value = in.readUTF();
        this.flag = in.readUTF();

    }

}

// map读取两个文件 根据来源把每个kv对打上标签 输出给reduce可以必须是关联字段
public static class ReducerJoinMap extends Mapper<LongWritable, Text, Text, ValueFlag> {
    private FileSplit fileSplit;
    private String fileName;
    private String[] infos;
    private Text oKey = new Text();
    private ValueFlag oValue = new ValueFlag();

    @Override
    protected void setup(Mapper<LongWritable, Text, Text, ValueFlag>.Context context)
            throws IOException, InterruptedException {
        fileSplit = (FileSplit) context.getInputSplit();
        if (fileSplit.getPath().toString().contains("user-logs-large.txt")) {
            fileName = "userLogsLarge";
        } else if (fileSplit.getPath().toString().contains("user_info.txt")) {
            fileName = "userInfo";
        }

    }

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, ValueFlag>.Context context)
            throws IOException, InterruptedException {
        infos = value.toString().split("\\s");

        oValue.setFlag(fileName);
        if (fileName.equals("userLogsLarge")) {
            // 解析user-logs-large.txt
            oKey.set(infos[0]);
            oValue.setValue(infos[1] + "\t" + infos[2]);
            context.write(oKey, oValue);
        } else if (fileName.equals("userInfo")) {
            // 解析user_infos.txt
            oKey.set(infos[0]);
            oValue.setValue(infos[1] + "\t" + infos[2]);
            context.write(oKey, oValue);
        }

    }

}

// 接受map发送过来的kv队 根据value中的flag把同一个key对应的value分组
// 那么两组中的数据就是分别来自两张表中的数据 对这两组数据做笛卡尔乘机即完成关联
public static class ReducerJoinReducer extends Reducer<Text, ValueFlag, AvroKey<UserActionLog>, NullWritable> {

    private List<String> userLogsLargeList;
    private List<String> userInfosList;
    private NullWritable outValue = NullWritable.get();
    private AvroKey<UserActionLog> outKey = new AvroKey<UserActionLog>();
    private String[] infos;

    @Override
    protected void reduce(Text key, Iterable<ValueFlag> values,
            Reducer<Text, ValueFlag, AvroKey<UserActionLog>, NullWritable>.Context context)
            throws IOException, InterruptedException {

        userLogsLargeList = new ArrayList<String>();
        userInfosList = new ArrayList<String>();

        for (ValueFlag value : values) {

            if (value.getFlag().equals("userLogsLarge")) {
                userLogsLargeList.add(value.getValue());
            } else if (value.getFlag().equals("userInfo")) {
                userInfosList.add(value.getValue());
            }
        }
        // 对两组中的数据进行笛卡尔乘积
        for (String userlog : userLogsLargeList) {
            for (String userinfo : userInfosList) {
                // 构建一个useractionLog对象
                UserActionLog.Builder build = UserActionLog.newBuilder();

                // 从userlog中提取actiontyoe和ipaddress
                infos = userlog.split("\\s");
                build.setActionType(infos[0]);
                build.setIpAddress(infos[1]);
                // 从userinfo 提取gender 和privince
                infos = userinfo.split("\\s");
                if (infos[0].equals("man")) {
                    build.setGender(0);
                } else {
                    build.setGender(1);
                }
                build.setProvience(infos[1]);
                build.setUserName(key.toString());
                UserActionLog userActionLog = build.build();
                // 吧userAction封装到Avrokey中
                outKey.datum(userActionLog);
                context.write(outKey, outValue);
            }
        }

    }

}

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration);
    job.setJarByClass(ReducerJoin.class);
    job.setJobName("reducer联合");

    job.setMapperClass(ReducerJoinMap.class);
    job.setReducerClass(ReducerJoinReducer.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(ValueFlag.class);
    job.setOutputKeyClass(AvroKey.class);
    job.setOutputValueClass(NullWriter.class);
    //设置输出的格式是avrokey
    job.setOutputFormatClass(AvroKeyOutputFormat.class);
    //设置输出key的schema
    AvroJob.setOutputKeySchema(job, UserActionLog.SCHEMA$);
    FileInputFormat.addInputPath(job, new Path("/mapjoin"));
    Path outputPath = new Path("/ReducerJoin");
    outputPath.getFileSystem(configuration).delete(outputPath, true);
    FileOutputFormat.setOutputPath(job, outputPath);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

}

相关文章
|
3天前
|
弹性计算 关系型数据库 微服务
基于 Docker 与 Kubernetes(K3s)的微服务:阿里云生产环境扩容实践
在微服务架构中,如何实现“稳定扩容”与“成本可控”是企业面临的核心挑战。本文结合 Python FastAPI 微服务实战,详解如何基于阿里云基础设施,利用 Docker 封装服务、K3s 实现容器编排,构建生产级微服务架构。内容涵盖容器构建、集群部署、自动扩缩容、可观测性等关键环节,适配阿里云资源特性与服务生态,助力企业打造低成本、高可靠、易扩展的微服务解决方案。
1101 0
|
2天前
|
机器学习/深度学习 人工智能 前端开发
通义DeepResearch全面开源!同步分享可落地的高阶Agent构建方法论
通义研究团队开源发布通义 DeepResearch —— 首个在性能上可与 OpenAI DeepResearch 相媲美、并在多项权威基准测试中取得领先表现的全开源 Web Agent。
461 9
|
12天前
|
人工智能 运维 安全
|
11天前
|
人工智能 测试技术 API
智能体(AI Agent)搭建全攻略:从概念到实践的终极指南
在人工智能浪潮中,智能体(AI Agent)正成为变革性技术。它们具备自主决策、环境感知、任务执行等能力,广泛应用于日常任务与商业流程。本文详解智能体概念、架构及七步搭建指南,助你打造专属智能体,迎接智能自动化新时代。
|
3天前
|
弹性计算 Kubernetes jenkins
如何在 ECS/EKS 集群中有效使用 Jenkins
本文探讨了如何将 Jenkins 与 AWS ECS 和 EKS 集群集成,以构建高效、灵活且具备自动扩缩容能力的 CI/CD 流水线,提升软件交付效率并优化资源成本。
296 0
|
10天前
|
人工智能 异构计算
敬请锁定《C位面对面》,洞察通用计算如何在AI时代持续赋能企业创新,助力业务发展!
敬请锁定《C位面对面》,洞察通用计算如何在AI时代持续赋能企业创新,助力业务发展!
|
11天前
|
机器学习/深度学习 人工智能 自然语言处理
B站开源IndexTTS2,用极致表现力颠覆听觉体验
在语音合成技术不断演进的背景下,早期版本的IndexTTS虽然在多场景应用中展现出良好的表现,但在情感表达的细腻度与时长控制的精准性方面仍存在提升空间。为了解决这些问题,并进一步推动零样本语音合成在实际场景中的落地能力,B站语音团队对模型架构与训练策略进行了深度优化,推出了全新一代语音合成模型——IndexTTS2 。
800 23
|
3天前
|
缓存 供应链 监控
VVIC seller_search 排行榜搜索接口深度分析及 Python 实现
VVIC搜款网seller_search接口提供服装批发市场的商品及商家排行榜数据,涵盖热销榜、销量排名、类目趋势等,支持多维度筛选与数据分析,助力选品决策、竞品分析与市场预测,为服装供应链提供有力数据支撑。
|
3天前
|
缓存 监控 API
Amazon item_review 商品评论接口深度分析及 Python 实现
亚马逊商品评论接口(item_review)可获取用户评分、评论内容及时间等数据,支持多维度筛选与分页调用,结合Python实现情感分析、关键词提取与可视化,助力竞品分析、产品优化与市场决策。