MapReduce业务 - 图片关联计算

简介:

1.概述

  最近在和人交流时谈到数据相似度和数据共性问题,而刚好在业务层面有类似的需求,今天和大家分享这类问题的解决思路,分享目录如下所示:

  • 业务背景
  • 编码实践
  • 预览截图

  下面开始今天的内容分享。

2.业务背景

  目前有这样一个背景,在一大堆数据中,里面存放着图片的相关信息,如下图所示:

  上图只是给大家列举的一个示例数据格式,第一列表示自身图片,第二、第三......等列表示与第一列相关联的图片信息。那么我们从这堆数据中如何找出他们拥有相同图片信息的图片。

2.1 实现思路

  那么,我们在明确了上述需求后,下面我们来分析它的实现思路。首先,我们通过上图所要实现的目标结果,其最终计算结果如下所示:

pic_001pic_002 pic_003,pic_004,pic_005
pic_001pic_003 pic_002,pic_005
pic_001pic_004 pic_002,pic_005
pic_001pic_005 pic_002,pic_003,pic_004
......

  结果如上所示,找出两两图片之间的共性图片,结果未列完整,只是列举了部分,具体结果大家可以参考截图预览的相关信息。

  下面给大家介绍解决思路,通过观察数据,我们可以发现在上述数据当中,我们要计算图片两两的共性图片,可以从关联图片入手,在关联图片中我们可 以找到共性图片的关联信息,比如:我们要计算pic001pic002图片的共性图片,我们可以在关联图片中找到两者(pic001pic002组合)后 对应的自身图片(key),最后在将所有的key求并集即为两者的共性图片信息,具体信息如下图所示:

  通过上图,我们可以知道具体的实现思路,步骤如下所示:

  • 第一步:拆分数据,关联数据两两组合作为Key输出。
  • 第二步:将相同Key分组,然后求并集得到计算结果。

  这里使用一个MR来完成此项工作,在明白了实现思路后,我们接下来去实现对应的编码。

3.编码实践

  • 拆分数据,两两组合。

public static class PictureMap extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            StringTokenizer strToken = new StringTokenizer(value.toString());
            Text owner = new Text();

            Set<String> set = new TreeSet<String>();

            owner.set(strToken.nextToken());
            while (strToken.hasMoreTokens()) {
                set.add(strToken.nextToken());
            }

            String[] relations = new String[set.size()];
            relations = set.toArray(relations);

            for (int i = 0; i < relations.length; i++) {
                for (int j = i + 1; j < relations.length; j++) {
                    String outPutKey = relations[i] + relations[j];
                    context.write(new Text(outPutKey), owner);
                }

            }
        }
    }

  • 按Key分组,求并集

public static class PictureReduce extends Reducer<Text, Text, Text, Text> {

        @Override
        protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String common = "";
            for (Text val : values) {
                if (common == "") {
                    common = val.toString();
                } else {
                    common = common + "," + val.toString();
                }
            }
            context.write(key, new Text(common));
        }
    }
  • 完整示例

package cn.hadoop.hdfs.example;

import java.io.IOException;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.hadoop.hdfs.util.HDFSUtils;
import cn.hadoop.hdfs.util.SystemConfig;

/**
 * @Date Aug 31, 2015
 *
 * @Author dengjie
 *
 * @Note Find picture relations
 */
public class PictureRelations extends Configured implements Tool {

    private static Logger log = LoggerFactory.getLogger(PictureRelations.class);
    private static Configuration conf;

    static {
        String tag = SystemConfig.getProperty("dev.tag");
        String[] hosts = SystemConfig.getPropertyArray(tag + ".hdfs.host", ",");
        conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://cluster1");
        conf.set("dfs.nameservices", "cluster1");
        conf.set("dfs.ha.namenodes.cluster1", "nna,nns");
        conf.set("dfs.namenode.rpc-address.cluster1.nna", hosts[0]);
        conf.set("dfs.namenode.rpc-address.cluster1.nns", hosts[1]);
        conf.set("dfs.client.failover.proxy.provider.cluster1",
                "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
        conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
        conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
    }

    public static class PictureMap extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            StringTokenizer strToken = new StringTokenizer(value.toString());
            Text owner = new Text();

            Set<String> set = new TreeSet<String>();

            owner.set(strToken.nextToken());
            while (strToken.hasMoreTokens()) {
                set.add(strToken.nextToken());
            }

            String[] relations = new String[set.size()];
            relations = set.toArray(relations);

            for (int i = 0; i < relations.length; i++) {
                for (int j = i + 1; j < relations.length; j++) {
                    String outPutKey = relations[i] + relations[j];
                    context.write(new Text(outPutKey), owner);
                }

            }
        }
    }

    public static class PictureReduce extends Reducer<Text, Text, Text, Text> {

        @Override
        protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String common = "";
            for (Text val : values) {
                if (common == "") {
                    common = val.toString();
                } else {
                    common = common + "," + val.toString();
                }
            }
            context.write(key, new Text(common));
        }
    }

    public int run(String[] args) throws Exception {
        final Job job = Job.getInstance(conf);
        job.setJarByClass(PictureMap.class);
        job.setMapperClass(PictureMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(PictureReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, args[0]);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        int status = job.waitForCompletion(true) ? 0 : 1;
        return status;
    }

    public static void main(String[] args) {
        try {
            if (args.length != 1) {
                log.warn("args length must be 1 and as date param");
                return;
            }
            String tmpIn = SystemConfig.getProperty("hdfs.input.path.v2");
            String tmpOut = SystemConfig.getProperty("hdfs.output.path.v2");
            String inPath = String.format(tmpIn, "t_pic_20150801.log");
            String outPath = String.format(tmpOut, "meta/" + args[0]);

            // bak dfs file to old
            HDFSUtils.bak(tmpOut, outPath, "meta/" + args[0] + "-old", conf);

            args = new String[] { inPath, outPath };
            int res = ToolRunner.run(new Configuration(), new PictureRelations(), args);
            System.exit(res);
        } catch (Exception ex) {
            ex.printStackTrace();
            log.error("Picture relations task has error,msg is" + ex.getMessage());
        }

    }

}

4.截图预览

  关于计算结果,如下图所示:

5.总结

  本篇博客只是从思路上实现了图片关联计算,在数据量大的情况下,是有待优化的,这里就不多做赘述了,后续有时间在为大家分析其中的细节。

6.结束语

  这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!


目录
相关文章
|
2月前
|
存储 大数据 分布式数据库
大数据-165 Apache Kylin Cube优化 案例 2 定义衍生维度及对比 & 聚合组 & RowKeys
大数据-165 Apache Kylin Cube优化 案例 2 定义衍生维度及对比 & 聚合组 & RowKeys
48 1
|
4月前
|
存储 数据挖掘 大数据
深度解析Hologres计算资源配置:如何根据业务场景选择合适的计算类型?
【8月更文挑战第22天】Hologres是一款由阿里云提供的分布式分析型数据库,支持高效的大数据处理与分析。本文通过电商优化商品推荐策略的案例,介绍了Hologres中的计算组型与通用型配置。计算组型提供弹性扩展资源,适合大规模数据及高并发查询;通用型则适用于多数数据分析场景,具备良好计算性能。通过实例创建、数据加载、计算任务建立及结果查询的步骤展示,读者可理解两种配置的差异并根据业务需求灵活选择。
65 2
|
4月前
|
数据处理 流计算 Docker
实时计算 Flink版产品使用问题之进行数据处理时,怎么确保维度的更新在逻辑处理之后进行
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
SQL 缓存 测试技术
实时计算 Flink版产品使用问题之如何实现滚动窗口统计用户不重复的总数
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
Shell 数据处理 流计算
实时计算 Flink版产品使用问题之如何实现两个流之间的关联
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
分布式计算 Hadoop
Hadoop数据重分布的逻辑流程
【6月更文挑战第16天】
50 8
|
6月前
|
分布式计算 大数据 调度
MaxCompute产品使用问题之如何解决UDF针对数据每行操作,而XGBoost需要对数据整体操作的问题
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
6月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之在进行DWS层的实时聚合计算时,遇到多次更新同一个字段的情况,该如何处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
资源调度 Java API
[flink 实时流基础] flink组件栈以及任务执行与资源划分
[flink 实时流基础] flink组件栈以及任务执行与资源划分
117 1
|
Serverless
函数计算的典型用户场景——云产品场景的触发
函数计算的典型用户场景——云产品场景的触发自制脑图
130 0
函数计算的典型用户场景——云产品场景的触发