hdfs常用API和putMerge功能实现

简介: 所需jar包一、URL API操作方式import java.io.InputStream;import java.net.URL;import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;import org.apache.hadoop.io.IOUtils;import org.junit.Test;public clas

所需jar包

wKioL1ZIK6Hy-MbWAAAULWIONq4795.png

一、URL API操作方式

import java.io.InputStream;
import java.net.URL;
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;

public class HDFSUrlTest {

    /**
     * HDFS URL API操作方式
     * 不需要读取core-site.xml和hdfs-site.xml配置文件
     */

    // 让JAVA程序识别HDFS的URL
    static {

        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
    }

    // 查看文件内容

    @Test
    public void testRead() throws Exception {

        InputStream in = null;
        // 文件路径
        String fileUrl = "hdfs://hadoop-master.dragon.org:9000/opt/data/test/01.data";
        try {
            // 获取文件输入流
            in = new URL(fileUrl).openStream();
            // 将文件内容读取出来,打印控制台
            IOUtils.copyBytes(in, System.out, 4096, false);
        } finally {

            IOUtils.closeStream(in);
        }

    }

}


二、通过FileSystem API操作HDFS


HDFS工具类


import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;


public class HDFSUtils {

/**
 *     HDFS工具类
 */
    
    public static FileSystem  getFileSystem() {
        //声明FileSystem
        FileSystem hdfs=null;
        try {
            
            //获取文件配置信息
            Configuration conf =new Configuration();

            //获取文件系统
            hdfs=FileSystem.get(conf);
        } catch (IOException e) {
            
            e.printStackTrace();
        }
        return hdfs;
        
    }
    
}



常用操作实现类


import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapred.gethistory_jsp;
import org.junit.Test;

public class HDFSFsTest {

    /**
     * 
     * 通过FileSystem API操作HDFS
     */
    // 读取文件内容

    @Test
    public void testRead() throws Exception {
        // 获取文件系统
        FileSystem hdfs = HDFSUtils.getFileSystem();
        // 文件名称
        Path path = new Path("/opt/data/test/touch.data");
        // 打开文件输入流
        FSDataInputStream inStream = hdfs.open(path);
        // 读取文件到控制台显示
        IOUtils.copyBytes(inStream, System.out, 4096, false);

        // 关闭流
        IOUtils.closeStream(inStream);
    }

    // 查看目录
    @Test
    public void testList() throws Exception {
        FileSystem hdfs = HDFSUtils.getFileSystem();
        // 文件名称
        Path path = new Path("/opt/data");
        FileStatus[] fileStatus = hdfs.listStatus(path);
        for (FileStatus file : fileStatus) {
            Path p = file.getPath();
            String info = file.isDir() ? "目录" : "文件";
            System.out.println(info + ":" + p);
        }
    }

    // 创建目录
    @Test
    public void testDirectory() throws Exception {
        FileSystem hdfs = HDFSUtils.getFileSystem();
        // 要创建的目录
        Path path = new Path("/opt/data/dir");
        boolean isSuccessful = hdfs.mkdirs(path);// 相当于 linux下 mkdir -p
                                                    // /opt/data/dir
        String info = isSuccessful ? "成功" : "失败";
        System.out.println("创建目录【" + path + "】" + info);
    }

    // 上传文件-- put copyFromLocal

    @Test
    public void testPut() throws Exception {
        FileSystem hdfs = HDFSUtils.getFileSystem();
        // 本地文件(目录+文件名称)
        Path srcPath = new Path("c:/0125.log");
        // hdfs文件上传路径
        Path dstPath = new Path("/opt/data/dir/");
        hdfs.copyFromLocalFile(srcPath, dstPath);

    }

    // 创建hdfs文件并写入内容

    @Test
    public void testCreate() throws Exception {
        FileSystem hdfs = HDFSUtils.getFileSystem();

        Path path = new Path("/opt/data/dir/touch.data");
        // 创建文件并获取输出流
        FSDataOutputStream fSDataOutputStream = hdfs.create(path);
        // 通过输出流写入数据
        fSDataOutputStream.write("你好".getBytes());
        fSDataOutputStream.writeUTF("hello hadoop!");
        IOUtils.closeStream(fSDataOutputStream);
    }

    // 文件重命名

    @Test
    public void testRename() throws Exception {
        FileSystem hdfs = HDFSUtils.getFileSystem();

        Path oldPath = new Path("/opt/data/dir/touch.data");
        Path newPath = new Path("/opt/data/dir/rename.data");

        boolean flag = hdfs.rename(oldPath, newPath);
        System.out.println(flag);
    }

    // 删除文件
    public void testDelete() throws Exception {
        FileSystem hdfs = HDFSUtils.getFileSystem();

        Path path = new Path("/opt/data/dir/touch.data");

        boolean flag = hdfs.deleteOnExit(path);

        System.out.println(flag);

    }

    // 删除目录

    public void testDeleteDir() throws Exception {
        FileSystem hdfs = HDFSUtils.getFileSystem();

        Path path = new Path("/opt/data/dir");

        boolean flag = hdfs.delete(path, true);// 如果是目录第二个参数必须为true

        System.out.println(flag);

    }

    // 查找某个文件在hdfs集群的位置

    public void testLocation() throws Exception {
        FileSystem hdfs = HDFSUtils.getFileSystem();

        Path path = new Path("/opt/data/test.file");

        FileStatus fileStatus = hdfs.getFileStatus(path);
        BlockLocation[] blockLocations = hdfs.getFileBlockLocations(fileStatus,
                0, fileStatus.getLen());
        for (BlockLocation blockLocation : blockLocations) {

            String[] hosts = blockLocation.getHosts();
            for (String host : hosts) {

                System.out.print(host + " ");
            }
            System.out.println();

        }

    }

    // 获取hdfs集群上所有节点名称信息

    public void testCluster() throws Exception {
        FileSystem hdfs = HDFSUtils.getFileSystem();

        DistributedFileSystem distributedFileSystem = (DistributedFileSystem) hdfs;
        DatanodeInfo[] datanodeInfos = distributedFileSystem.getDataNodeStats();

        for (DatanodeInfo datanodeInfo : datanodeInfos) {
            String hostName = datanodeInfo.getHostName();

            System.out.println(hostName);
        }
    }

}



三、上传合并小文件到hdfs

实现思想:循环遍历本地文件输入流

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 * 
 * 向hdfs上传复制文件的过程中,进行合并文件
 * 
 */
public class PutMerge {

    /**
     * 
     * @param localDir
     *            本地要上传的文件目录
     * @param hdfsFile
     *            HDFS上的文件名称,包括路径
     */
    public static void put(String localDir, String hdfsFile) throws Exception {

        // 获取配置信息
        Configuration conf = new Configuration();
        Path localPath = new Path(localDir);
        Path hdfsPath = new Path(hdfsFile);

        // 获取本地文件系统
        FileSystem localFs = FileSystem.getLocal(conf);
        // 获取HDFS
        FileSystem hdfs = FileSystem.get(conf);

        // 本地文件系统指定目录中的所有文件

        FileStatus[] status = localFs.listStatus(localPath);
        // 打开hdfs上文件的输出流
        FSDataOutputStream fSDataOutputStream = hdfs.create(hdfsPath);
        // 循环遍历本地文件

        for (FileStatus fileStatus : status) {
            // 获取文件
            Path path = fileStatus.getPath();
            System.out.println("文件为:" + path.getName());
            // 打开文件输入流
            FSDataInputStream fSDataInputStream = localFs.open(path);

            // 进行流的读写操作
            byte[] buff = new byte[1024];

            int len = 0;
            while ((len = fSDataInputStream.read(buff)) > 0) {

                fSDataOutputStream.write(buff, 0, len);
            }
            fSDataInputStream.close();
        }
        fSDataOutputStream.close();
    }

    
    public static void main(String[] args) {
        String localDir="D:/logs";
        String hdfsFile="hdfs://hadoop-master.dragon.org:9000/opt/data/logs.data";
        try {
            put(localDir,hdfsFile);
        } catch (Exception e) {
            
            e.printStackTrace();
        }
}
    
    
}


本文出自 “点滴积累” 博客,请务必保留此出处http://tianxingzhe.blog.51cto.com/3390077/1710640

目录
相关文章
|
2月前
|
Java API 数据库
构建RESTful API已经成为现代Web开发的标准做法之一。Spring Boot框架因其简洁的配置、快速的启动特性及丰富的功能集而备受开发者青睐。
【10月更文挑战第11天】本文介绍如何使用Spring Boot构建在线图书管理系统的RESTful API。通过创建Spring Boot项目,定义`Book`实体类、`BookRepository`接口和`BookService`服务类,最后实现`BookController`控制器来处理HTTP请求,展示了从基础环境搭建到API测试的完整过程。
58 4
|
9天前
|
Java
java实现从HDFS上下载文件及文件夹的功能,以流形式输出,便于用户自定义保存任何路径下
java实现从HDFS上下载文件及文件夹的功能,以流形式输出,便于用户自定义保存任何路径下
68 34
|
4月前
HDFS web Interfaces功能解读
HDFS web Interfaces功能解读
|
16天前
|
JSON 供应链 搜索推荐
某东API接口:开启电商数据交互与功能调用的新篇章
在当今的数字化时代,电商平台的开放API(Application Programming Interface,应用程序编程接口)已经成为连接开发者与电商平台之间的重要桥梁。京东作为中国领先的电商平台之一,其开放平台提供的API接口更是为开发者们带来了无限可能。本文将深入探讨京东API接口的功能、应用场景、使用流程以及其在电商领域的重要价值。
|
28天前
|
API 开发工具 开发者
探究亚马逊国际获得AMAZON商品详情 API 接口功能、作用与实际应用示例
亚马逊提供的Amazon Product Advertising API或Selling Partner API,使开发者能编程访问亚马逊商品数据,包括商品标题、描述、价格等。支持跨境电商和数据分析,提供商品搜索和详情获取等功能。示例代码展示了如何使用Python和boto3库获取特定商品信息。使用时需遵守亚马逊政策并注意可能产生的费用。
|
6月前
|
Java API
深入探讨 Java 8 集合操作:全面解析 Stream API 的强大功能
深入探讨 Java 8 集合操作:全面解析 Stream API 的强大功能
78 2
|
2月前
|
Java
java实现从HDFS上下载文件及文件夹的功能,以流形式输出,便于用户自定义保存任何路径下
java实现从HDFS上下载文件及文件夹的功能,以流形式输出,便于用户自定义保存任何路径下
65 2
java实现从HDFS上下载文件及文件夹的功能,以流形式输出,便于用户自定义保存任何路径下
|
2月前
|
机器学习/深度学习 算法 Java
通过 Java Vector API 利用 SIMD 的强大功能
通过 Java Vector API 利用 SIMD 的强大功能
57 10
|
2月前
|
移动开发 前端开发 JavaScript
前端开发实战:利用Web Speech API之speechSynthesis实现文字转语音功能
前端开发实战:利用Web Speech API之speechSynthesis实现文字转语音功能
235 0
|
3月前
|
JSON 搜索推荐 API
深入了解亚马逊商品详情API:功能、作用与实例
亚马逊商品详情API接口由官方提供,允许开发者通过程序调用获取商品详细信息,如标题、价格等,适用于电商数据分析、搜索及个性化推荐等场景。接口名称包括ItemLookup、GetMatchingProductForId等,支持HTTP POST/GET请求,需提供商品ID、API密钥及其他可选参数。返回数据格式通常为JSON或XML,涵盖商品详情、分类、品牌、价格、图片URL及用户评价等。该接口对数据收集、实时推荐、营销活动及数据分析至关重要,有助于提升电商平台的数据处理能力、用户体验及商家运营效率。使用时需注册亚马逊开发者账号并申请API访问权限,获取API密钥后按文档构建请求并处理响应数据。

热门文章

最新文章