Mysql 流增量写入 Hdfs(二) --Storm + hdfs 的流式处理

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群版 2核4GB 100GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 一. 概述 上一篇我们介绍了如何将数据从 mysql 抛到 kafka,这次我们就专注于利用 storm 将数据写入到 hdfs 的过程,由于 storm 写入 hdfs 的可定制东西有些多,我们先不从 kafka 读取,而先自己定义一个 Spout 数据充当数据源,下章再进行整合。

一. 概述

上一篇我们介绍了如何将数据从 mysql 抛到 kafka,这次我们就专注于利用 storm 将数据写入到 hdfs 的过程,由于 storm 写入 hdfs 的可定制东西有些多,我们先不从 kafka 读取,而先自己定义一个 Spout 数据充当数据源,下章再进行整合。这里默认你是拥有一定的 storm 知识的基础,起码知道 Spout 和 bolt 是什么。

写入 hdfs 可以有以下的定制策略:

  1. 自定义写入文件的名字
  2. 定义写入内容格式
  3. 满足给定条件后更改写入的文件
  4. 更改写入文件时触发的 Action

本篇会先说明如何用 storm 写入 HDFS,写入过程一些 API 的描述,以及最后给定一个例子:

storm 每接收到 10 个 Tuple 后就会改变 hdfs 写入文件,新文件的名字就是第几次改变。

ps:storm 版本:1.1.1 。Hadoop 版本:2.7.4 。

接下来我们首先看看 Storm 如何写入 HDFS 。

二. Storm 写入 HDFS

Storm 官方有提供了相应的 API 让我们可以使用。可以通过创建 HdfsBolt 以及定义相应的规则,即可写入 HDFS 。

首先通过 maven 配置依赖以及插件。


    <properties>
        <storm.version>1.1.1</storm.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>${storm.version}</version>
            <!--<scope>provided</scope>-->
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>log4j-over-slf4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>commons-collections</groupId>
            <artifactId>commons-collections</artifactId>
            <version>3.2.1</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>15.0</version>
        </dependency>

        <!--hadoop模块-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.4</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.4</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-hdfs -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-hdfs</artifactId>
            <version>1.1.1</version>
            <!--<scope>test</scope>-->
        </dependency>

    </dependencies>


    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.2.1</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>exec</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <executable>java</executable>
                    <includeProjectDependencies>true</includeProjectDependencies>
                    <includePluginDependencies>false</includePluginDependencies>
                    <classpathScope>compile</classpathScope>
                    <mainClass>com.learningstorm.kafka.KafkaTopology</mainClass>
                </configuration>
            </plugin>
   
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>1.7</version>
                <configuration>
                    <createDependencyReducedPom>true</createDependencyReducedPom>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

这里要提一下,如果要打包部署到集群上的话,打包的插件需要使用 maven-shade-plugin 这个插件,然后使用 maven Lifecycle 中的 package 打包。而不是用 Maven-assembly-plugin 插件进行打包。

因为使用 Maven-assembly-plugin 的时候,会将所有依赖的包unpack,然后在pack,这样就会出现,同样的文件被覆盖的情况。发布到集群上的时候就会报 No FileSystem for scheme: hdfs 的错 。

然后是使用 HdfsBolt 写入 Hdfs。这里来看看官方文档中的例子吧。

// 使用 "|" 来替代 ",",来进行字符分割
RecordFormat format = new DelimitedRecordFormat()
        .withFieldDelimiter("|");

// 每输入 1k 后将内容同步到 Hdfs 中
SyncPolicy syncPolicy = new CountSyncPolicy(1000);

// 当文件大小达到 5MB ,转换写入文件,即写入到一个新的文件中
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);

//当转换写入文件时,生成新文件的名字并使用
FileNameFormat fileNameFormat = new DefaultFileNameFormat()
        .withPath("/foo/");

HdfsBolt bolt = new HdfsBolt()
        .withFsUrl("hdfs://localhost:9000")
        .withFileNameFormat(fileNameFormat)
        .withRecordFormat(format)
        .withRotationPolicy(rotationPolicy)
        .withSyncPolicy(syncPolicy);

//生成该 bolt
topologyBuilder.setBolt("hdfsBolt", bolt, 5).globalGrouping("randomStrSpout");
        

到这里就结束了。可以将 HdfsBolt 当作一个 Storm 中特殊一些的 bolt 即可。这个 bolt 的作用即使根据接收信息写入 Hdfs。

而在新建 HdfsBolt 中,Storm 为我们提供了相当强的灵活性,我们可以定义一些策略,比如当达成某个条件的时候转换写入文件,新写入文件的名字,写入时候的分隔符等等。

如果选择使用的话,Storm 有提供部分接口供我们使用,但如果我们觉得不够丰富也可以自定义相应的类。下面我们看看如何控制这些策略吧。

RecordFormat

这是一个接口,允许你自由定义接收到内容的格式。

public interface RecordFormat extends Serializable {
    byte[] format(Tuple tuple);
}

Storm 提供了 DelimitedRecordFormat ,使用方法在上面已经有了。这个类默认的分割符是逗号",",而你可以通过 withFieldDelimiter 方法改变分隔符。
如果你的初始分隔符不是逗号的话,那么也可以重写写一个类实现 RecordFormat 接口即可。

FileNameFormat

同样是一个接口。

public interface FileNameFormat extends Serializable {
    void prepare(Map conf, TopologyContext topologyContext);
    String getName(long rotation, long timeStamp);
    String getPath();
}

Storm 所提供的默认的是 org.apache.storm.hdfs.format.DefaultFileNameFormat 。默认人使用的转换文件名有点长,格式是这样的:

{prefix}{componentId}-{taskId}-{rotationNum}-{timestamp}{extension}

例如:

MyBolt-5-7-1390579837830.txt

默认情况下,前缀是空的,扩展标识是".txt"。

SyncPolicy

同步策略允许你将 buffered data 缓冲到 Hdfs 文件中(从而client可以读取数据),通过实现org.apache.storm.hdfs.sync.SyncPolicy 接口:

public interface SyncPolicy extends Serializable {
    boolean mark(Tuple tuple, long offset);
    void reset();
}

FileRotationPolicy

这个接口允许你控制什么情况下转换写入文件。

public interface FileRotationPolicy extends Serializable {
    boolean mark(Tuple tuple, long offset);
    void reset();
}

Storm 有提供三个实现该接口的类:

  • 最简单的就是不进行转换的org.apache.storm.hdfs.bolt.rotation.NoRotationPolicy ,就是什么也不干。
  • 通过文件大小触发转换的 org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy。
  • 通过时间条件来触发转换的 org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy。

如果有更加复杂的需求也可以自己定义。

RotationAction

这个主要是提供一个或多个 hook ,可加可不加。主要是在触发写入文件转换的时候会启动。

public interface RotationAction extends Serializable {
    void execute(FileSystem fileSystem, Path filePath) throws IOException;
}

三.实现一个例子

了解了上面的情况后,我们会实现一个例子,根据写入记录的多少来控制写入转换(改变写入的文件),并且转换后文件的名字表示当前是第几次转换。

首先来看看 HdfsBolt 的内容:

        RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter(" ");
        // sync the filesystem after every 1k tuples
        SyncPolicy syncPolicy = new CountSyncPolicy(1000);
//        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, FileSizeRotationPolicy.Units.KB);
        /** rotate file with Date,every month create a new file
         * format:yyyymm.txt
         */
        FileRotationPolicy rotationPolicy = new CountStrRotationPolicy();
        FileNameFormat fileNameFormat = new TimesFileNameFormat().withPath("/test/");
        RotationAction action = new NewFileAction();
        HdfsBolt bolt = new HdfsBolt()
                .withFsUrl("hdfs://127.0.0.1:9000")
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy)
                .addRotationAction(action);

然后分别来看各个策略的类。

FileRotationPolicy

import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.tuple.Tuple;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * 计数以改变Hdfs写入文件的位置,当写入10次的时候,则更改写入文件,更改名字取决于 “TimesFileNameFormat”
 * 这个类是线程安全
 */

public class CountStrRotationPolicy implements FileRotationPolicy {


    private SimpleDateFormat df = new SimpleDateFormat("yyyyMM");

    private String date =  null;

    private int count = 0;

    public CountStrRotationPolicy(){
        this.date =  df.format(new Date());
//        this.date = df.format(new Date());
    }


    /**
     * Called for every tuple the HdfsBolt executes.
     *
     * @param tuple  The tuple executed.
     * @param offset current offset of file being written
     * @return true if a file rotation should be performed
     */
    @Override
    public boolean mark(Tuple tuple, long offset) {
        count ++;
        if(count == 10) {
            System.out.print("num :" +count + "   ");
            count = 0;
            return true;

        }
        else {
            return false;
        }
    }

    /**
     * Called after the HdfsBolt rotates a file.
     */
    @Override
    public void reset() {

    }

    @Override
    public FileRotationPolicy copy() {
        return new CountStrRotationPolicy();
    }


}

FileNameFormat


import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.task.TopologyContext;

import java.util.Map;

/**
 * 决定重新写入文件时候的名字
 * 这里会返回是第几次转换写入文件,将这个第几次做为文件名
 */
public class TimesFileNameFormat implements FileNameFormat {
    //默认路径
    private String path = "/storm";
    //默认后缀
    private String extension = ".txt";
    private Long times = new Long(0);

    public TimesFileNameFormat withPath(String path){
        this.path = path;
        return this;
    }

    @Override
    public void prepare(Map conf, TopologyContext topologyContext) {
    }


    @Override
    public String getName(long rotation, long timeStamp) {
        times ++ ;
        //返回文件名,文件名为更换写入文件次数
        return times.toString() + this.extension;
    }

    public String getPath(){
        return this.path;
    }
}

RotationAction


import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.storm.hdfs.common.rotation.RotationAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;
/**
    当转换写入文件时候调用的 hook ,这里仅写入日志。
 */
public class NewFileAction implements RotationAction {
    private static final Logger LOG = LoggerFactory.getLogger(NewFileAction.class);



    @Override
    public void execute(FileSystem fileSystem, Path filePath) throws IOException {
        LOG.info("Hdfs change the written file!!");

        return;
    }
}

OK,这样就大功告成了。通过上面的代码,每接收到 10 个 Tuple 后就会转换写入文件,新文件的名字就是第几次转换。

完整代码包括一个随机生成字符串的 Spout ,可以到我的 github 上查看。

StormHdfsDemo:https://github.com/shezhiming/StormHdfsDemo


更多干货,欢迎关注公众号,哈尔的数据城堡

相关文章
|
19天前
|
canal 关系型数据库 MySQL
实时计算 Flink版产品使用问题之在进行整库同步MySQL数据到StarRocks时,遇到全量数据可以同步,但增量数据无法同步,是什么导致的
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
关系型数据库 MySQL Linux
mysql 主从同步 实现增量备份
mysql 主从同步 实现增量备份
32 0
|
2月前
|
关系型数据库 MySQL
elasticsearch对比mysql以及使用工具同步mysql数据全量增量
elasticsearch对比mysql以及使用工具同步mysql数据全量增量
36 0
|
2月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用合集之从MySQL同步数据到Doris时,历史数据时间字段显示为null,而增量数据部分的时间类型字段正常显示的原因是什么
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
关系型数据库 MySQL OLAP
实时计算 Flink版产品使用合集之可以支持 MySQL 数据源的增量同步到 Hudi 吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
运维 DataWorks 关系型数据库
DataWorks产品使用合集之DataWorks还有就是对于mysql中的表已经存在数据了,第一次全量后面增量同步的步骤如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
47 2
|
2月前
|
DataWorks 关系型数据库 MySQL
DataWorks产品使用合集之在DataWorks中,要实现MySQL数据源的增量同步如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
94 2
|
2月前
|
SQL 关系型数据库 MySQL
Sqoop【付诸实践 01】Sqoop1最新版 MySQL与HDFS\Hive\HBase 核心导入导出案例分享+多个WRAN及Exception问题处理(一篇即可学会在日常工作中使用Sqoop)
【2月更文挑战第9天】Sqoop【付诸实践 01】Sqoop1最新版 MySQL与HDFS\Hive\HBase 核心导入导出案例分享+多个WRAN及Exception问题处理(一篇即可学会在日常工作中使用Sqoop)
135 7
|
2天前
|
关系型数据库 MySQL 数据库
关系型数据库mysql数据增量恢复
【7月更文挑战第3天】
12 2
|
11天前
|
分布式计算 大数据 关系型数据库
MaxCompute产品使用问题之如何实现MySQL的实时增量同步
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。