Flink - RocksDBStateBackend

Overview:

If usability and efficiency matter, replacing a plain in-memory k/v store with RocksDB is worthwhile: RocksDB supports range queries, column families, and various forms of compression.

But RocksDB itself is just an embedded library running inside the RocksDBStateBackend, so when a TaskManager dies its local data is still lost.

RocksDBStateBackend therefore still needs a distributed store such as HDFS to persist its snapshots.
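In a job this is the usual state-backend wiring. A minimal configuration sketch (the package name is from the old flink-contrib RocksDB backend and the HDFS path is a placeholder; both vary by Flink version):

```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDBBackendSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // The working RocksDB files live on the TaskManager's local disk;
        // only checkpoint snapshots go to this distributed URI.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
        env.enableCheckpointing(60_000); // checkpoint every 60 s
    }
}
```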

 

The biggest difference from the memory and file backends is that all k/v state is managed by RocksDB.

AbstractRocksDBState
/**
 * Base class for {@link State} implementations that store state in a RocksDB database.
 *
 * <p>State is not stored in this class but in the {@link org.rocksdb.RocksDB} instance that
 * the {@link RocksDBStateBackend} manages and checkpoints.
 *
 * @param <K> The type of the key.
 * @param <N> The type of the namespace.
 * @param <S> The type of {@link State}.
 * @param <SD> The type of {@link StateDescriptor}.
 */
public abstract class AbstractRocksDBState<K, N, S extends State, SD extends StateDescriptor<S, ?>>
        implements KvState<K, N, S, SD, RocksDBStateBackend>, State {
    /** Serializer for the namespace */
    private final TypeSerializer<N> namespaceSerializer;

    /** The current namespace, which the next value methods will refer to */
    private N currentNamespace;

    /** Backend that holds the actual RocksDB instance where we store state */
    protected RocksDBStateBackend backend;

    /** The column family of this particular instance of state */
    protected ColumnFamilyHandle columnFamily;

    /**
     * We disable writes to the write-ahead-log here.
     */
    private final WriteOptions writeOptions;

    /**
     * Creates a new RocksDB backed state.
     *
     * @param namespaceSerializer The serializer for the namespace.
     */
    protected AbstractRocksDBState(ColumnFamilyHandle columnFamily,
            TypeSerializer<N> namespaceSerializer,
            RocksDBStateBackend backend) {

        this.namespaceSerializer = namespaceSerializer;
        this.backend = backend;

        this.columnFamily = columnFamily;

        writeOptions = new WriteOptions();
        writeOptions.setDisableWAL(true);
    }

    @Override
    public KvStateSnapshot<K, N, S, SD, RocksDBStateBackend> snapshot(long checkpointId,
            long timestamp) throws Exception {
        throw new RuntimeException("Should not be called. Backups happen in RocksDBStateBackend.");
    }
}

 

RocksDBValueState

/**
 * {@link ValueState} implementation that stores state in RocksDB.
 *
 * @param <K> The type of the key.
 * @param <N> The type of the namespace.
 * @param <V> The type of value that the state stores.
 */
public class RocksDBValueState<K, N, V>
    extends AbstractRocksDBState<K, N, ValueState<V>, ValueStateDescriptor<V>>
    implements ValueState<V> {

    @Override
    public V value() {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
        try {
            writeKeyAndNamespace(out);
            byte[] key = baos.toByteArray();
            byte[] valueBytes = backend.db.get(columnFamily, key); // read the value bytes from RocksDB
            if (valueBytes == null) {
                return stateDesc.getDefaultValue();
            }
            return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
        } catch (IOException|RocksDBException e) {
            throw new RuntimeException("Error while retrieving data from RocksDB.", e);
        }
    }

    @Override
    public void update(V value) throws IOException {
        if (value == null) {
            clear();
            return;
        }
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
        try {
            writeKeyAndNamespace(out);
            byte[] key = baos.toByteArray();
            baos.reset();
            valueSerializer.serialize(value, out);
            backend.db.put(columnFamily, writeOptions, key, baos.toByteArray()); // write the k/v pair into RocksDB
        } catch (Exception e) {
            throw new RuntimeException("Error while adding data to RocksDB", e);
        }
    }
}

For k/v state, the key is the key of the record currently being processed, so it is read directly from backend.currentKey(); see Flink - Working with State.
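To make the RocksDB key layout concrete, here is a self-contained sketch of what writeKeyAndNamespace produces. It uses a plain DataOutputStream instead of Flink's TypeSerializers, and the helper name and exact byte layout are illustrative:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class RocksDBKeyFormat {
    // Serialize the key and then the namespace into one byte[].
    static byte[] writeKeyAndNamespace(String key, String namespace) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(baos);
        out.writeUTF(key);       // the current key, i.e. backend.currentKey()
        out.writeUTF(namespace); // the current namespace, e.g. a window
        return baos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] k1 = writeKeyAndNamespace("user-42", "window-1");
        byte[] k2 = writeKeyAndNamespace("user-42", "window-2");
        // Same key, different namespace -> different RocksDB keys,
        // so per-window state never collides.
        System.out.println(java.util.Arrays.equals(k1, k2)); // prints false
    }
}
```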

 

RocksDBStateBackend

Initialization:

/**
 * A {@link StateBackend} that stores its state in {@code RocksDB}. This state backend can
 * store very large state that exceeds memory and spills to disk.
 * 
 * <p>All key/value state (including windows) is stored in the key/value index of RocksDB.
 * For persistence against loss of machines, checkpoints take a snapshot of the
 * RocksDB database, and persist that snapshot in a file system (by default) or
 * another configurable state backend.
 * 
 * <p>The behavior of the RocksDB instances can be parametrized by setting RocksDB Options
 * using the methods {@link #setPredefinedOptions(PredefinedOptions)} and
 * {@link #setOptions(OptionsFactory)}.
 */
public class RocksDBStateBackend extends AbstractStateBackend {

    // ------------------------------------------------------------------------
    //  Static configuration values
    // ------------------------------------------------------------------------
    
    /** The checkpoint directory that we copy the RocksDB backups to. */
    private final Path checkpointDirectory;

    /** The state backend that stores the non-partitioned state */
    private final AbstractStateBackend nonPartitionedStateBackend;

    /**
     * Our RocksDB data base, this is used by the actual subclasses of {@link AbstractRocksDBState}
     * to store state. The different k/v states that we have don't each have their own RocksDB
     * instance. They all write to this instance but to their own column family.
     */
    protected volatile transient RocksDB db; // the shared RocksDB instance

    /**
     * Creates a new {@code RocksDBStateBackend} that stores its checkpoint data in the
     * file system and location defined by the given URI.
     * 
     * <p>A state backend that stores checkpoints in HDFS or S3 must specify the file system
     * host and port in the URI, or have the Hadoop configuration that describes the file system
     * (host / high-availability group / possibly credentials) either referenced from the Flink
     * config, or included in the classpath.
     *
     * @param checkpointDataUri The URI describing the filesystem and path to the checkpoint data directory.
     * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
     */
    public RocksDBStateBackend(String checkpointDataUri) throws IOException {
        this(new Path(checkpointDataUri).toUri());
    }

    /**
     * Creates a new {@code RocksDBStateBackend} that stores its checkpoint data in the
     * file system and location defined by the given URI.
     *
     * <p>A state backend that stores checkpoints in HDFS or S3 must specify the file system
     * host and port in the URI, or have the Hadoop configuration that describes the file system
     * (host / high-availability group / possibly credentials) either referenced from the Flink
     * config, or included in the classpath.
     *
     * @param checkpointDataUri The URI describing the filesystem and path to the checkpoint data directory.
     * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
     */
    public RocksDBStateBackend(URI checkpointDataUri) throws IOException {
        // creating the FsStateBackend automatically sanity checks the URI
        FsStateBackend fsStateBackend = new FsStateBackend(checkpointDataUri); // an FsStateBackend is still used to store snapshots
        
        this.nonPartitionedStateBackend = fsStateBackend;
        this.checkpointDirectory = fsStateBackend.getBasePath();
    }
    
    // ------------------------------------------------------------------------
    //  State backend methods
    // ------------------------------------------------------------------------
    
    @Override
    public void initializeForJob(
            Environment env, 
            String operatorIdentifier,
            TypeSerializer<?> keySerializer) throws Exception {
        
        super.initializeForJob(env, operatorIdentifier, keySerializer);

        this.nonPartitionedStateBackend.initializeForJob(env, operatorIdentifier, keySerializer);

        RocksDB.loadLibrary(); // load the native RocksDB library

        List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(1); // column families work as in HBase: each is stored in its own files
        // RocksDB seems to need this...
        columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes()));
        List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
        try {
            db = RocksDB.open(getDbOptions(), instanceRocksDBPath.getAbsolutePath(), columnFamilyDescriptors, columnFamilyHandles); // actually open the RocksDB instance
        } catch (RocksDBException e) {
            throw new RuntimeException("Error while opening RocksDB instance.", e);
        }
    }

 

snapshotPartitionedState

@Override
public HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshotPartitionedState(long checkpointId, long timestamp) throws Exception {
    if (keyValueStatesByName == null || keyValueStatesByName.size() == 0) {
        return new HashMap<>();
    }

    if (fullyAsyncBackup) {
        return performFullyAsyncSnapshot(checkpointId, timestamp);
    } else {
        return performSemiAsyncSnapshot(checkpointId, timestamp);
    }
}

 

Snapshots come in two flavors, fully asynchronous and semi-asynchronous.

 

Semi-asynchronous:

/**
 * Performs a checkpoint by using the RocksDB backup feature to backup to a directory.
 * This backup is then asynchronously copied to the final checkpoint location.
 */
private HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> performSemiAsyncSnapshot(long checkpointId, long timestamp) throws Exception {
    // We don't snapshot individual k/v states since everything is stored in a central
    // RocksDB data base. Create a dummy KvStateSnapshot that holds the information about
    // that checkpoint. We use it in injectKeyValueStateSnapshots to restore.

    final File localBackupPath = new File(instanceBasePath, "local-chk-" + checkpointId);
    final URI backupUri = new URI(instanceCheckpointPath + "/chk-" + checkpointId);

    long startTime = System.currentTimeMillis();

    BackupableDBOptions backupOptions = new BackupableDBOptions(localBackupPath.getAbsolutePath());
    // we disabled the WAL
    backupOptions.setBackupLogFiles(false);
    // no need to sync since we use the backup only as intermediate data before writing to FileSystem snapshot
    backupOptions.setSync(false); // skip fsync for the local backup

    try (BackupEngine backupEngine = BackupEngine.open(Env.getDefault(), backupOptions)) {
        // wait before flush with "true"
        backupEngine.createNewBackup(db, true); // use RocksDB's own BackupEngine to create a new backup on local disk
    }

    long endTime = System.currentTimeMillis(); // this part runs synchronously, so it is timed to show the latency
    LOG.info("RocksDB (" + instanceRocksDBPath + ") backup (synchronous part) took " + (endTime - startTime) + " ms.");

    // draw a copy in case it gets changed while performing the async snapshot
    List<StateDescriptor> kvStateInformationCopy = new ArrayList<>();
    for (Tuple2<ColumnFamilyHandle, StateDescriptor> state: kvStateInformation.values()) {
        kvStateInformationCopy.add(state.f1);
    }
    SemiAsyncSnapshot dummySnapshot = new SemiAsyncSnapshot(localBackupPath,
            backupUri,
            kvStateInformationCopy,
            checkpointId);


    HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> result = new HashMap<>();
    result.put("dummy_state", dummySnapshot);
    return result;
}

 

SemiAsyncSnapshot.materialize

@Override
public KvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> materialize() throws Exception {
    try {
        long startTime = System.currentTimeMillis();
        HDFSCopyFromLocal.copyFromLocal(localBackupPath, backupUri); // copy the backup from local disk to HDFS
        long endTime = System.currentTimeMillis();
        LOG.info("RocksDB materialization from " + localBackupPath + " to " + backupUri + " (asynchronous part) took " + (endTime - startTime) + " ms.");
        return new FinalSemiAsyncSnapshot(backupUri, checkpointId, stateDescriptors);
    } catch (Exception e) {
        FileSystem fs = FileSystem.get(backupUri, HadoopFileSystem.getHadoopConfiguration());
        fs.delete(new org.apache.hadoop.fs.Path(backupUri), true);
        throw e;
    } finally {
        FileUtils.deleteQuietly(localBackupPath);
    }
}

 

Fully asynchronous:

/**
 * Performs a checkpoint by drawing a {@link org.rocksdb.Snapshot} from RocksDB and then
 * iterating over all key/value pairs in RocksDB to store them in the final checkpoint
 * location. The only synchronous part is the drawing of the {@code Snapshot} which
 * is essentially free.
 */
private HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> performFullyAsyncSnapshot(long checkpointId, long timestamp) throws Exception {
    // we draw a snapshot from RocksDB then iterate over all keys at that point
    // and store them in the backup location

    final URI backupUri = new URI(instanceCheckpointPath + "/chk-" + checkpointId);

    long startTime = System.currentTimeMillis();

    org.rocksdb.Snapshot snapshot = db.getSnapshot(); // draw a snapshot; nothing needs to be written to disk

    long endTime = System.currentTimeMillis();
    LOG.info("Fully asynchronous RocksDB (" + instanceRocksDBPath + ") backup (synchronous part) took " + (endTime - startTime) + " ms.");

    // draw a copy in case it gets changed while performing the async snapshot
    Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> columnFamiliesCopy = new HashMap<>();
    columnFamiliesCopy.putAll(kvStateInformation);
    FullyAsyncSnapshot dummySnapshot = new FullyAsyncSnapshot(snapshot, // hand the snapshot directly to the async part
            this,
            backupUri,
            columnFamiliesCopy,
            checkpointId);


    HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> result = new HashMap<>();
    result.put("dummy_state", dummySnapshot);
    return result;
}

 

FullyAsyncSnapshot.materialize

Note that here the backend has to serialize the database contents to a file itself:

@Override
public KvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> materialize() throws Exception {
    try {
        long startTime = System.currentTimeMillis();

        CheckpointStateOutputView outputView = backend.createCheckpointStateOutputView(checkpointId, startTime);

        outputView.writeInt(columnFamilies.size());

        // we don't know how many key/value pairs there are in each column family.
        // We prefix every written element with a byte that signifies to which
        // column family it belongs, this way we can restore the column families
        byte count = 0;
        Map<String, Byte> columnFamilyMapping = new HashMap<>();
        for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> column: columnFamilies.entrySet()) {
            columnFamilyMapping.put(column.getKey(), count);

            outputView.writeByte(count);

            ObjectOutputStream ooOut = new ObjectOutputStream(outputView);
            ooOut.writeObject(column.getValue().f1);
            ooOut.flush();

            count++;
        }

        ReadOptions readOptions = new ReadOptions();
        readOptions.setSnapshot(snapshot);

        for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> column: columnFamilies.entrySet()) {
            byte columnByte = columnFamilyMapping.get(column.getKey());

            synchronized (dbCleanupLock) {
                if (db == null) {
                    throw new RuntimeException("RocksDB instance was disposed. This happens " +
                            "when we are in the middle of a checkpoint and the job fails.");
                }
                RocksIterator iterator = db.newIterator(column.getValue().f0, readOptions);
                iterator.seekToFirst();
                while (iterator.isValid()) {
                    outputView.writeByte(columnByte);
                    BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.key(),
                            outputView);
                    BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.value(),
                            outputView);
                    iterator.next();
                }
            }
        }

        StateHandle<DataInputView> stateHandle = outputView.closeAndGetHandle();

        long endTime = System.currentTimeMillis();
        LOG.info("Fully asynchronous RocksDB materialization to " + backupUri + " (asynchronous part) took " + (endTime - startTime) + " ms.");
        return new FinalFullyAsyncSnapshot(stateHandle, checkpointId);
    } finally {
        synchronized (dbCleanupLock) {
            if (db != null) {
                db.releaseSnapshot(snapshot);
            }
        }
        snapshot = null;
    }
}
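The tagged-stream layout above can be sketched as a self-contained round trip. Plain Java strings stand in for Flink's serializers, the encode/decode helper names are hypothetical, and the real stream additionally serializes a StateDescriptor per column family:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class FullyAsyncFormat {
    // Write entries as: [cf tag byte][key][value] ... after an int CF count,
    // a simplified stand-in for BytePrimitiveArraySerializer.
    static byte[] encode(Map<Byte, Map<String, String>> columnFamilies) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(baos);
        out.writeInt(columnFamilies.size());
        for (Map.Entry<Byte, Map<String, String>> cf : columnFamilies.entrySet()) {
            for (Map.Entry<String, String> kv : cf.getValue().entrySet()) {
                out.writeByte(cf.getKey()); // which column family this pair belongs to
                out.writeUTF(kv.getKey());
                out.writeUTF(kv.getValue());
            }
        }
        return baos.toByteArray();
    }

    // Restore: read tagged pairs back until the stream is exhausted,
    // rebuilding each column family's contents.
    static Map<Byte, Map<String, String>> decode(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        in.readInt(); // number of column families
        Map<Byte, Map<String, String>> result = new HashMap<>();
        while (in.available() > 0) {
            byte tag = in.readByte();
            String key = in.readUTF();
            String value = in.readUTF();
            result.computeIfAbsent(tag, t -> new HashMap<>()).put(key, value);
        }
        return result;
    }
}
```

The per-entry tag byte is what lets restore work without knowing in advance how many pairs each column family holds.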

 

CheckpointStateOutputView

backend.createCheckpointStateOutputView

public CheckpointStateOutputView createCheckpointStateOutputView(
        long checkpointID, long timestamp) throws Exception {
    return new CheckpointStateOutputView(createCheckpointStateOutputStream(checkpointID, timestamp));
}

The key call is createCheckpointStateOutputStream:

 

RocksDBStateBackend

@Override
public CheckpointStateOutputStream createCheckpointStateOutputStream(
        long checkpointID, long timestamp) throws Exception {
    
    return nonPartitionedStateBackend.createCheckpointStateOutputStream(checkpointID, timestamp);
}

 

So what is nonPartitionedStateBackend?

public RocksDBStateBackend(URI checkpointDataUri) throws IOException {
    // creating the FsStateBackend automatically sanity checks the URI
    FsStateBackend fsStateBackend = new FsStateBackend(checkpointDataUri);
    
    this.nonPartitionedStateBackend = fsStateBackend;
    this.checkpointDirectory = fsStateBackend.getBasePath();
}

It is simply an FsStateBackend: in the end RocksDB still relies on the FsStateBackend to store its snapshots.

 

restoreState

@Override
public final void injectKeyValueStateSnapshots(HashMap<String, KvStateSnapshot> keyValueStateSnapshots) throws Exception {
    if (keyValueStateSnapshots.size() == 0) {
        return;
    }

    KvStateSnapshot dummyState = keyValueStateSnapshots.get("dummy_state");
    if (dummyState instanceof FinalSemiAsyncSnapshot) {
        restoreFromSemiAsyncSnapshot((FinalSemiAsyncSnapshot) dummyState);
    } else if (dummyState instanceof FinalFullyAsyncSnapshot) {
        restoreFromFullyAsyncSnapshot((FinalFullyAsyncSnapshot) dummyState);
    } else {
        throw new RuntimeException("Unknown RocksDB snapshot: " + dummyState);
    }
}

Restore likewise comes in the same two flavors, semi-asynchronous and fully asynchronous; the process is essentially the snapshot process in reverse.

