hadoop分布式缓存

简介: 1.前言DistributedCache是hadoop框架提供的一种机制,可以将job指定的文件,在job执行前,先行分发到task执行的机器上,并有相关机制对cache文件进行管理。DistributedCache 可将具体应用相关的、大尺寸的、只读的文件有效地分布放置。DistributedCache 是Map/Reduce框架提供的功能,能够缓存应用程序所需的文件 (包括文本,档案文件,jar文件等)。Map-Redcue框架在作业所有任务执行之前会把必要的文件拷贝到slave节点上。 它运行高效是因为每个作业的文件只拷贝一次并且为那些没有文档的slave节点缓存文档。Dis

1.前言

DistributedCache是hadoop框架提供的一种机制,可以将job指定的文件,在job执行前,先行分发到task执行的机器上,并有相关机制对cache文件进行管理。
DistributedCache 可将具体应用相关的、大尺寸的、只读的文件有效地分布放置。DistributedCache 是Map/Reduce框架提供的功能,能够缓存应用程序所需的文件 (包括文本,档案文件,jar文件等)。
Map-Redcue框架在作业所有任务执行之前会把必要的文件拷贝到slave节点上。 它运行高效是因为每个作业的文件只拷贝一次并且为那些没有文档的slave节点缓存文档。
DistributedCache 根据缓存文档修改的时间戳进行追踪。 在作业执行期间,当前应用程序或者外部程序不能修改缓存文件。
distributedCache可以分发简单的只读数据或文本文件,也可以分发复杂类型的文件例如归档文件和jar文件。归档文件(zip,tar,tgz和tar.gz文件)在slave节点上会被解档(un-archived)。 这些文件可以设置执行权限。
用户可以通过设置mapred.cache.{files|archives}来分发文件。 如果要分发多个文件,可以使用逗号分隔文件所在路径。
DistributedCache可在map/reduce任务中作为 一种基础软件分发机制使用。它可以被用于分发jar包和本地库(native libraries)。 DistributedCache.addArchiveToClassPath(Path, Configuration)和 DistributedCache.addFileToClassPath(Path, Configuration) API能够被用于 缓存文件和jar包,并把它们加入子jvm的classpath。也可以通过设置配置文档里的属性 mapred.job.classpath.{files|archives}达到相同的效果。缓存文件可用于分发和装载本地库。

添加缓存文件:

DistributedCache.addCacheFile(URI,conf)
DistributedCache.addCacheArchive(URI,conf) DistributedCache.setCacheFiles(URIs,conf)
DistributedCache.setCacheArchives(URIs,conf)
其中URI的形式是 hdfs://host:port/absolute-path#link-name

缓存Jar:
DistributedCache.addArchiveToClassPath(Path, Configuration)和 DistributedCache.addFileToClassPath(Path, Configuration) API能够被用于 缓存文件和jar包,并把它们加入子jvm的classpath。
DistributedCache.createSymlink(Configuration)方法让DistributedCache 在当前工作目录下创建到缓存文件的符号链接。或者通过设置配置文件属性mapred.create.symlink为yes。 分布式缓存会截取URI的片段作为链接的名字。 例如,URI是 hdfs://namenode:port/lib.so.1#lib.so,则在task当前工作目录会有名为lib.so的链接,它会链接分布式缓存中的lib.so.1。

2.常见应用场景

(1)分发第三方库(jar,so等);
(2)分发算法需要的词典文件;
(3)分发程序运行需要的配置;
(4)分发多表数据join时小表数据简便处理等

3.注意事项

1.DistributedCache只能应用于分布式的情况,包括伪分布式,完全分布式.有些api在这2种情况下有移植性问题.
2.需要分发的文件,必须提前放到hdfs上.默认的路径前缀是hdfs://的,不是file://
3.需要分发的文件,最好在运行期间是只读的.
4.不建议分发较大的文件,比如压缩文件,可能会影响task的启动速度.
5.注意在Driver中创建Job实例时一定要把Configuration类型的参数传递进去,否则在Mapper或Reducer中调用DistributedCache.getLocalCacheFiles(conf);返回值就为null。因为空构造函数的Job采用的Configuration是从hadoop的配置文件中读出来的(使用new Configuration()创建的Configuration就是从hadoop的配置文件中读出来的),请注意在main()函数中有一句:DistributedCache.addCacheFile(dataFile.toUri(), conf);即此时的Configuration中多了一个DistributedCacheFile,所以你需要把这个Configuration传递给Job构造函数,如果传递默认的Configuration,那在Job中当然不知道DistributedCacheFile的存在了。

4.基本流程

1.每个tasktracker启动时,都会产生一个TrackerDistributedCacheManager对象,用来管理该tt机器上所有的task的cache文件.
2.在客户端提交job时,在JobClient内,对即将cache的文件,进行校验
以确定文件是否存在,文件的大小,文件的修改时间,以及文件的权限是否是private or public.
3.当task在tt初始化job时,会由TrackerDistributedCacheManager产生一个TaskDistributedCacheManager对象,来管理本task的cache文件.
4.和本task相关联的TaskDistributedCacheManager,获取并解压相关cache文件到本地相应目录如果本tt机器上已经有了本job的其他task,并已经完成了相应cache文件的获取和解压工作,则游戏出售平台不会重复进行。如果本地已经有了cache文件,则比较修改时间和hdfs上的文件是否一致,如果一致则可以使用.
5.当task结束时,会对该cache进行ref减一操作.
6.TrackerDistributedCacheManager有一个clearup线程,每隔1min会去处理那些无人使用的,目录大小大于local.cache.size或者子目录个数大于mapreduce.tasktracker.cache.local.numberdirectories的cache目录.

5.应用实例

public class MapJoinByCache {

public static class MapJoiner extends Mapper<LongWritable,Text,Text,Text>
{   
    static Map<String,String> movies=new HashMap<String,String>();
    public void setup(Context context) {            
        try {
            FileReader reader = new FileReader("movies.dat");
            BufferedReader br = new BufferedReader(reader);
            String s1 = null;
            while ((s1 = br.readLine()) != null)
            {
                System.out.println(s1);
                String[] splits= s1.split("::");                    
                String movieId=splits[];
                String movieName =splits[1];
                movies.put(movieId, movieName);                 
            }
            br.close();
            reader.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    private Text outKey=new Text();
    private Text outVal=new Text();     
    public void map(LongWritable key,Text value,Context context)throws IOException, InterruptedException
    {
        if(value!=null||value.toString()!=null)
        {
            String[] splits = value.toString().split("::");  
            String movieId =splits[1];
            String movieName= movies.get(movieId);
            outKey.set(movieId);
            outVal.set(movieName+"::"+value.toString());
            context.write(outKey, outVal);
        }
    }
}
public static class DirectReducer extends Reducer<Text,Text,NullWritable,Text>
{
    NullWritable outKey=NullWritable.get();
    public void reduce(Text key,Iterable<Text> values,Context context)throws IOException, InterruptedException
    {
        for(Text value :values)
        {

context.write(outKey, value);

        }
    }
}
public static void main(String[] args) throws URISyntaxException, IOException, InterruptedException, ClassNotFoundException {
    Configuration conf =new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    DistributedCache.createSymlink(conf);
    DistributedCache.addCacheFile(new URI("hdfs://mylinux:9000/data/exam/movie/movies.dat#movies.dat"), conf);      
    Job job=new Job(conf);
    job.setJobName("Join on Map Side");
    job.setJarByClass(MapJoinByCache.class);
    job.setMapperClass(MapJoiner.class);
    job.setReducerClass(DirectReducer.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[]));
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));        
    System.exit(job.waitForCompletion(true) ?  : 1);
}
目录
相关文章
|
1月前
|
分布式计算 NoSQL Java
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
40 2
|
1月前
|
分布式计算 Hadoop
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
44 1
|
1月前
|
存储 数据采集 分布式计算
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
41 1
|
1月前
|
分布式计算 Hadoop 网络安全
Hadoop-08-HDFS集群 基础知识 命令行上机实操 hadoop fs 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
Hadoop-08-HDFS集群 基础知识 命令行上机实操 hadoop fs 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
28 1
|
1月前
|
存储 机器学习/深度学习 缓存
Hadoop-07-HDFS集群 基础知识 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
Hadoop-07-HDFS集群 基础知识 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
40 1
|
1月前
|
分布式计算 资源调度 Hadoop
Hadoop-05-Hadoop集群 集群WordCount 超详细 真正的分布式计算 上传HDFS MapReduce计算 YRAN查看任务 上传计算下载查看
Hadoop-05-Hadoop集群 集群WordCount 超详细 真正的分布式计算 上传HDFS MapReduce计算 YRAN查看任务 上传计算下载查看
44 1
|
1月前
|
存储 SQL 消息中间件
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
45 0
|
3月前
|
存储 分布式计算 算法
探索Hadoop的三种运行模式:单机模式、伪分布式模式和完全分布式模式
在配置Hadoop集群之前,了解这三种模式的特点、适用场景和配置差异是非常重要的。这有助于用户根据个人需求和资源情况,选择最适合自己的Hadoop运行模式。在最初的学习和开发阶段,单机模式和伪分布式模式能为用户提供便利和成本效益。进而,当用户要处理大规模数据集时,完全分布式模式将是理想的选择。
171 2
|
3月前
|
存储 分布式计算 Hadoop
【揭秘Hadoop背后的秘密!】HDFS读写流程大曝光:从理论到实践,带你深入了解Hadoop分布式文件系统!
【8月更文挑战第24天】Hadoop分布式文件系统(HDFS)是Hadoop生态系统的关键组件,专为大规模数据集提供高效率存储及访问。本文深入解析HDFS数据读写流程并附带示例代码。HDFS采用NameNode和DataNode架构,前者负责元数据管理,后者承担数据块存储任务。文章通过Java示例演示了如何利用Hadoop API实现数据的写入与读取,有助于理解HDFS的工作原理及其在大数据处理中的应用价值。
87 1
|
3月前
|
存储 缓存 分布式计算
下一篇
无影云桌面