【大数据】HDFS、HBase操作教程(含指令和JAVA API)

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 【大数据】HDFS、HBase操作教程(含指令和JAVA API)

1.前言

本文是作者大数据专栏系列的其中一篇,前文中已经详细聊过分布式文件系统HDFS和分布式数据库HBase了,本文将会是它们的实操讲解。

2.HDFS

2.1.指令操作

创建目录:

hdfs dfs -mkdir /user/mydir

递归创建目录:

hdfs dfs -mkdir -p /user/mydir/subdir

上传文件到HDFS:

hdfs dfs -put localfile.txt /user/mydir/

下载文件到本地:

hdfs dfs -get /user/mydir/file.txt localdir/

删除文件:

hdfs dfs -rm /user/mydir/file.txt

递归删除目录:

hdfs dfs -rm -r /user/mydir

查看目录内容:

hdfs dfs -ls /user/mydir

递归查看目录内容:

hdfs dfs -lsr /user/mydir

查看文件详细信息:

hdfs dfs -stat /user/mydir/file.txt

移动或重命名文件:

hdfs dfs -mv /user/mydir/file.txt /user/mydir/newfile.txt

复制文件、目录:

hdfs dfs -cp /user/mydir/file.txt /user/mydir2/

查看文件内容:

hdfs dfs -cat /user/mydir/file.txt

2.2.JAVA API

首先这里有个巨坑:

一定要把core-site.xml里面的fs.defaultFS换成真实IP地址,不能用localhsot

<configuration
        <property>
                <name>hadoop.tmp.version</name>
                <value>file:/usr/local/hadoop/tmp</value>
        </property>
        <property>
                <name>fs.defaultFS</name>
                <value>hdfs://localhost:9000</value>
        </property>
</configuration>

如果JAVA API的client端会先找HDFS拿到fs.defaultFS,然后再去访问拿到的地址上的HDFS,如果JAVA API的client端和HDFS不在一台机器上,JAVA API的Client就会去访问它本地的localhost的9000端口上的服务,会直接报错:


Connection refused: no further information


依赖:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.1.3</version>
</dependency>

代码示例:

import java.io.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
public class HDFSSample {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // 创建目录
        createDirectory(fs, "/user/hadoop/testdir");
        // 上传文件
        uploadFile(fs, "/user/hadoop/testfile.txt", "C:/localfile.txt");
        // 下载文件
        downloadFile(fs, "/user/hadoop/testfile.txt", "C:/downloadedfile.txt");
        // 列出目录内容
        listDirectory(fs, "/user/hadoop");
        // 删除文件
        deleteFile(fs, "/user/hadoop/testfile.txt");
        // 删除目录
        deleteDirectory(fs, "/user/hadoop/testdir");
        // 关闭文件系统
        fs.close();
    }
    private static void createDirectory(FileSystem fs, String dirPath) throws IOException {
        fs.mkdirs(new Path(dirPath));
        System.out.println("Directory created: " + dirPath);
    }
    private static void uploadFile(FileSystem fs, String hdfsPath, String localFilePath) throws IOException {
        Path hdfsPathObj = new Path(hdfsPath);
        Path localPathObj = new Path(localFilePath);
        fs.copyFromLocalFile(false, true, localPathObj, hdfsPathObj);
        System.out.println("File uploaded: " + localFilePath + " to " + hdfsPath);
    }
    private static void downloadFile(FileSystem fs, String hdfsPath, String localFilePath) throws IOException {
        Path hdfsPathObj = new Path(hdfsPath);
        Path localPathObj = new Path(localFilePath);
        fs.copyToLocalFile(true, hdfsPathObj, localPathObj);
        System.out.println("File downloaded: " + hdfsPath + " to " + localFilePath);
    }
    private static void listDirectory(FileSystem fs, String dirPath) throws IOException {
        for (FileStatus file : fs.listStatus(new Path(dirPath))) {
            System.out.println("File/Directory: " + file.getPath().toString());
        }
    }
    private static void deleteFile(FileSystem fs, String filePath) throws IOException {
        Path filePathObj = new Path(filePath);
        if (fs.exists(filePathObj)) {
            fs.delete(filePathObj, false);
            System.out.println("File deleted: " + filePath);
        } else {
            System.out.println("File not found: " + filePath);
        }
    }
    private static void deleteDirectory(FileSystem fs, String dirPath) throws IOException {
        Path dirPathObj = new Path(dirPath);
        if (fs.exists(dirPathObj)) {
            fs.delete(dirPathObj, true);
            System.out.println("Directory deleted: " + dirPath);
        } else {
            System.out.println("Directory not found: " + dirPath);
        }
    }
}

3.HBase

3.1.指令操作

创建一个列族为info的student表:

create 'Student', 'info'

往表里插数据:

put 'Student', '1', 'info:id', '1'

put 'Student', '1', 'info:name', 'Alice' put 'Student', '1', 'info:age', '20'

put 'Student', '1', 'info:major', 'Computer Science'

put 'Student', '2', 'info:id', '2'

put 'Student', '2', 'info:name', 'Bob' put 'Student', '2', 'info:age', '21'

put 'Student', '2', 'info:major', 'Mathematics'
 

查询单个:

get 'Student', '1'

查询批量:

scan 'Student'

条件批量查询:

scan 'Student', {FILTER => "SingleColumnValueFilter('info','age', >=, 'binary:20')"}

在HBase中,Scan对象用于定义在表上进行扫描时的参数,包括哪些行和列需要被检索,以及如何处理这些数据。Filter是Scan的一部分,用于在服务器端对返回的数据进行过滤,以减少网络传输的数据量,提高查询效率。 Filter类提供了一种方式来指定复杂的过滤逻辑,允许你基于行键(Row Key)、列族、列限定符和时间戳来筛选结果。以下是一些常见的Filter类型及其用法:


RowFilter: 用于基于行键的比较,如RowFilter(=, 'binary:rowKey'),匹配特定的行键。


SingleColumnValueFilter: 用于基于列族和列限定符的值进行比较,如SingleColumnValueFilter('cf', 'qualifier', CompareOp.GREATER_OR_EQUAL,BinaryComparator.valueOf(Bytes.toBytes(20))),匹配特定列族和列限定符的值大于或等于给定值的行。


PrefixFilter: 用于匹配以特定前缀开头的行键,如PrefixFilter(Bytes.toBytes('row-prefix'))。


RegexStringComparator: 用于基于正则表达式匹配行键,如RowFilter(CompareOp.EQUAL, RegexStringComparator('.pattern.'))。


MultipleColumnPrefixFilter: 用于匹配具有相同前缀的多个列,如MultipleColumnPrefixFilter(Bytes.toBytes('col-prefix'))。


PageFilter: 用于限制返回结果的数量,这对于大数据量的扫描很有用,如PageFilter(pageSize),pageSize是你希望一次返回的最大行数。


TimestampsFilter: 用于指定返回的行必须包含特定时间戳范围内的版本,如TimestampsFilter(timestamps),timestamps是一个包含多个时间戳的列表。


ValueFilter 和 QualifierFilter: 分别基于列值和列限定符进行过滤。


使用不同类型的过滤器的指令示例:


RowFilter(基于行键过滤)


scan 'Student', {FILTER => "RowFilter(=, 'regexstring:^1')"}


SingleColumnValueFilter(基于特定列的值过滤)


scan 'Student', {FILTER => "SingleColumnValueFilter ('info', 'age', >=, 'binary:20')"}


PrefixFilter(基于列前缀过滤)


scan 'Student', {FILTER => "PrefixFilter(Bytes.toBytes('info'))"}


RegexStringComparator(基于列值的正则表达式过滤)


scan 'Student', {FILTER => "RowFilter(=, 'regexstring:.Alice.')"}


MultipleColumnPrefixFilter(基于多列前缀过滤)


scan 'Student', {FILTER => "MultipleColumnPrefixFilter(Bytes.toBytes('info'))"}


ValueFilter(基于列值的比较过滤)


scan 'Student', {FILTER => "ValueFilter(=, 'binary:Alice')"}


QualifierFilter(基于列限定符的比较过滤)


scan 'Student', {FILTER => "QualifierFilter(=, 'binary:age')"}


清理表:


delete 'Student', '1' delete 'Student', '2' delete 'Student', '3' disable 'Student' drop 'Student'

3.2.JAVA API

HBase也要注意和HDFS中相似的问题,hbase-site.xml中也要用真实的IP地址,不然JAVA API的Client端和HBase不在一台机器上的会,就会访问不到HBase,下面的代码中作为演示代码并没有用真实IP,仍然用的LocalHost,这点要注意。

依赖:

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>2.2.2</version>
</dependency>

代码示例:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
 
public class HBaseExample {
 
    public static void main(String[] args) {
        Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", "localhost"); // 设置ZooKeeper地址
        config.set("hbase.zookeeper.property.clientPort", "2181"); // 设置ZooKeeper端口
 
        try (Connection connection = ConnectionFactory.createConnection(config);
             Table table = connection.getTable(TableName.valueOf("students"))) {
 
            // 创建表
            table.createIfNotExists();
 
            // 插入数据
            Put put1 = new Put(Bytes.toBytes("student1"));
            put1.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("Alice"));
            put1.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("20"));
            put1.addColumn(Bytes.toBytes("info"), Bytes.toBytes("major"), Bytes.toBytes("CS"));
            table.put(put1);
 
            Put put2 = new Put(Bytes.toBytes("student2"));
            put2.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("Bob"));
            put2.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("21"));
            put2.addColumn(Bytes.toBytes("info"), Bytes.toBytes("major"), Bytes.toBytes("Math"));
            table.put(put2);
 
            // 查询数据
            Get get = new Get(Bytes.toBytes("student1"));
            Result result = table.get(get);
            System.out.println("Name: " + Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name"))));
            System.out.println("Age: " + Bytes.toInt(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age"))));
            System.out.println("Major: " + Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("major"))));
 
            // 根据条件删除数据
            Delete delete = new Delete(Bytes.toBytes("student1"));
            table.delete(delete);
 
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

相关实践学习
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
4天前
|
Java API
深入探讨 Java 8 集合操作:全面解析 Stream API 的强大功能
深入探讨 Java 8 集合操作:全面解析 Stream API 的强大功能
14 2
|
4天前
|
SQL Java API
Java一分钟之-JPA查询:JPQL与Criteria API
【6月更文挑战第14天】本文探讨了Java Persistence API (JPA)中的两种查询方式:JPQL和Criteria API。JPQL是面向对象的SQL,适用于简单查询,而Criteria API则提供类型安全的动态查询构造。文章指出了每种方法的常见问题和避免策略,如混淆实体属性与数据库字段、参数绑定错误、过度复杂化和性能问题。建议开发者根据需求选择适当的方法,并关注查询的可读性、可维护性和性能优化。
19 2
|
5天前
|
分布式计算 自然语言处理 大数据
【大数据】MapReduce JAVA API编程实践及适用场景介绍
【大数据】MapReduce JAVA API编程实践及适用场景介绍
16 0
|
5天前
|
数据可视化 Java API
【JAVA】javadoc,如何生成标准的JAVA API文档
【JAVA】javadoc,如何生成标准的JAVA API文档
6 0
|
7天前
|
安全 Java API
Java一分钟之-GraphQL:查询语言与API设计
【6月更文挑战第11天】GraphQL,一种革命性的查询语言,正在改变Web开发中的API构建和使用方式。它允许客户端按需请求数据,减少冗余,提升性能。本文概述了GraphQL的核心理念,如声明式查询、强类型和统一入口,并讨论了Java开发者常遇问题:过度查询、Schema设计和安全性。解决方案包括使用Dataloader、优化Schema和实现授权机制。通过理解原理、关注性能、重视安全和持续实践,开发者能更好地利用GraphQL构建高效API。
18 2
|
7天前
|
SQL JSON 分布式计算
|
7天前
|
SQL 分布式计算 Java
|
7天前
|
存储 分布式计算 大数据
MaxCompute操作报错合集之通过UDF调用异常(其他使用http调用正常)。报错:java.lang.NoSuchMethodError:是什么导致的
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
8天前
|
分布式计算 DataWorks NoSQL
MaxCompute产品使用合集之一张表如果想只保留近七天的数据,应该如何设置
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
6天前
|
SQL 关系型数据库 MySQL
基于Hive的天气情况大数据分析系统(通过hive进行大数据分析将分析的数据通过sqoop导入到mysql,通过Django基于mysql的数据做可视化)
基于Hive的天气情况大数据分析系统(通过hive进行大数据分析将分析的数据通过sqoop导入到mysql,通过Django基于mysql的数据做可视化)