Flink教程(14)- Flink高级API(容错机制)

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: Flink教程(14)- Flink高级API(容错机制)

01 引言

在前面的博客,我们学习了Flink的一些高级API,有兴趣的同学可以参阅下:

在前面的教程,我们已经学习了Flink的四大基石里面的State了,如下图,本文讲解下Checkpoint容错机制:

02 Checkpoint

2.1 Checkpoint VS State

State:

  • 维护/存储的是某一个Operator的运行的状态/历史值,是维护在内存中;
  • 一般指一个具体的Operator的状态(operator的状态表示一些算子在运行的过程中会产生的一些历史结果,如前面的maxBy底层会维护当前的最大值,也就是会维护一个keyedOperator,这个State里面存放就是maxBy这个Operator中的最大值);
  • State数据默认保存在Java的堆内存中/TaskManage节点的内存中;
  • State可以被记录,在失败的情况下数据还可以恢复。

Checkpoint:

  • 某一时刻,Flink中所有的Operator的当前State的全局快照,一般存在磁盘上;
  • 表示了一个Flink Job在一个特定时刻的一份全局状态快照,即包含了所有Operator的状态;
  • 可以理解为Checkpoint是把State数据定时持久化存储了;
  • 比如KafkaConsumer算子中维护的Offset状态,当任务重新恢复的时候可以从Checkpoint中获取。

注意:

Flink中使用Chandy-Lamport algorithm分布式快照算法取得了成功,后续SparkStructuredStreaming也借鉴了该算法。

2.2 Checkpoint 执行流程

2.2.1 简单流程

流程描述:

  • step1FlinkJobManager创建CheckpointCoordinator
  • step2Coordinator向所有的SourceOperator发送Barrier栅栏(理解为执行Checkpoint的信号)
  • step3SourceOperator接收到Barrier之后,暂停当前的操作(暂停的时间很短,因为后续的写快照是异步的),并制作State快照, 然后将自己的快照保存到指定的介质中(如HDFS), 一切ok之后向Coordinator汇报并将Barrier发送给下游的其他Operator
  • step4 : 其他的如TransformationOperator接收到Barrier,重复第2步,最后将Barrier发送给Sink
  • step5Sink接收到Barrier之后重复第2步
  • step6Coordinator接收到所有的Operator的执行ok的汇报结果,认为本次快照执行成功

注意:

  • 在往介质(如HDFS)中写入快照数据的时候是异步的(为了提高效率)
  • 分布式快照执行时的数据一致性由Chandy-Lamport algorithm分布式快照算法保证

2.2.2 复杂流程

下图左侧是Checkpoint Coordinator,是整个Checkpoint的发起者,中间是由两个 source,一个 sink组成的Flink作业,最右侧的是持久化存储,在大部分用户场景中对应 HDFS

step1Checkpoint Coordinator 向所有 source 节点 trigger Checkpoint

step2source 节点向下游广播 barrier,这个barrier就是实现 Chandy-Lamport分布式快照算法的核心,下游的task只有收到所有inputbarrier 才会执行相应的Checkpoint

step3 :当 task完成state 备份后,会将备份数据的地址(state handle)通知给Checkpoint coordinator

step4 :下游的 sink节点收集齐上游两个 inputbarrier 之后,会执行本地快照,(栅栏对齐),这里还展示了RocksDB incremental Checkpoint(增量Checkpoint)的流程,首先RocksDB会全量刷数据到磁盘上(红色大三角表示),然后Flink框架会从中选择没有上传的文件进行持久化备份(紫色小三角)

step5 :同样的,sink节点在完成自己的 Checkpoint之后,会将 state handle 返回通知 Coordinator

step6 :最后,当 Checkpoint coordinator 收集齐所有taskstate handle,就认为这一次的Checkpoint全局完成了,向持久化存储中再备份一个 Checkpoint meta 文件。

2.3 State状态后端/State存储介质

注意:前面学习了Checkpoint其实就是Flink中某一时刻,所有的Operator的全局快照,那么快照应该要有一个地方进行存储,而这个存储的地方叫做状态后端Flink中的State状态后端有很多种,如下:

功能 MemStateBackend FastStateBackend RocksDBStateBackend
构造方法 MemoryStateBackend(int maxStateSize,boolean asynchronousSnapshots) FsStateBackend(URI checkpointDataUri,boolean asynchronousSnapshots) RocksDBStateBackend(URLI checkpointDataUri,boolean enableIncrementalCheckpointing)
存储方式 State(TaskManager内存)、Checkpoint(JobManager内存) State(TaskManager内存),Checkpoint(外部文件系统、本地或HDFS) State(TaskManager上的KV数据库,实际使用内存+磁盘),Checkpoint(外部文件系统,本地或HDFS)
容量限制 单个State maxStateSize(默认为5M),maxStateSize<akka.framesize(默认为10M),总大小不超过JobManager内存 单个TaskManager上State总量不超过它的内存,总大小不超过配置的文件系统容量 单个TaskManger上的State总量不超过它的内存+磁盘,单个Key最大2G,总大小不超过配置的文件系统容量
推荐使用的场景 本地测试(几乎无状态的作业,比如:ETL,JobManager不容易挂,或者挂掉影响不大的情况),不推荐生产环境使用 常规使用状态的作业(例如:分钟级窗口聚合、join),需要开启HA的作业,可以在生产场景使用 超大状态的作业(例如:天级窗口聚合)、需要开启HA的作业,对状态读写性能要求不高的作业、可以在生产场景使用

2.3.1 MemStateBackend

  • 内存存储,即 MemoryStateBackend,构造方法是设置最大的StateSize,选择是否做异步快照;
  • 对于State状态存储在TaskManager节点也就是执行节点内存中的,因为内存有容量限制,所以单个 State maxStateSize默认 5 M,且需要注意 maxStateSize <= akka.framesize 默认 10 M;
  • 对于Checkpoint存储在JobManager内存中,因此总大小不超过JobManager 的内存;
  • 推荐使用的场景为:本地测试、几乎无状态的作业,比如ETLJobManager 不容易挂,或挂掉影响不大的情况;
  • 不推荐在生产场景使用。

2.3.2 FastStateBackend

  • 另一种就是在文件系统上的 FsStateBackend 构建方法是需要传一个文件路径和是否异步快照。
  • State 依然在 TaskManager内存中,但不会像 MemoryStateBackend 是 5 M 的设置上限
  • Checkpoint 存储在外部文件系统(本地或 HDFS),打破了总大小 Jobmanager内存的限制。
  • 推荐使用的场景为:常规使用状态的作业、例如分钟级窗口聚合或 join、需要开启HA的作业。
  • 如果使用HDFS,则初始化FsStateBackend时,需要传入以 “hdfs://”开头的路径(即: new FsStateBackend("hdfs:///hacluster/checkpoint"))
  • 如果使用本地文件,则需要传入以“file://”开头的路径(即:new FsStateBackend("file:///Data"))
  • 在分布式情况下,不推荐使用本地文件。因为如果某个算子在节点A上失败,在节点B上恢复,使用本地文件时,在B上无法读取节点 A上的数据,导致状态恢复失败。

2.3.3 RocksDBStateBackend

  • RocksDB 是一个 key/value的内存存储系统,和其他的 key/value 一样,先将状态放到内存中,如果内存快满时,则写入到磁盘中
  • 但需要注意 RocksDB不支持同步的Checkpoint,构造方法中没有同步快照这个选项。
  • 不过 RocksDB 支持增量的 Checkpoint,意味着并不需要把所有 sst文件上传到 Checkpoint 目录,仅需要上传新生成的sst文件即可。
  • 它的Checkpoint存储在外部文件系统(本地或HDFS
  • 其容量限制只要单个 TaskManagerState 总量不超过它的内存+磁盘,单 Key最大2G,总大小不超过配置的文件系统容量即可
  • 推荐使用的场景为:超大状态的作业,例如天级窗口聚合、需要开启 HA 的作业、最好是对状态读写性能要求不高的作业。

2.4 Checkpoint配置方式

2.4.1 全局配置

修改flink-conf.yaml

#这里可以配置
#jobmanager(即MemoryStateBackend), 
#filesystem(即FsStateBackend), 
#rocksdb(即RocksDBStateBackend)
state.backend: filesystem 
state.checkpoints.dir: hdfs://namenode:8020/flink/checkpoints

2.4.2 代码配置

java代码:

//1.MemoryStateBackend--开发中不用
env.setStateBackend(new MemoryStateBackend)
//2.FsStateBackend--开发中可以使用--适合一般状态--秒级/分钟级窗口...
env.setStateBackend(new FsStateBackend("hdfs路径或测试时的本地路径"))
//3.RocksDBStateBackend--开发中可以使用--适合超大状态--天级窗口...
env.setStateBackend(new RocksDBStateBackend(filebackend, true))

注意RocksDBStateBackend还需要引入依赖:

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
   <version>1.7.2</version>
</dependency>

2.5 示例代码

/**
 * 演示Checkpoint参数设置(也就是Checkpoint执行流程中的步骤0相关的参数设置)
 *
 * @author : YangLinWei
 * @createTime: 2022/3/8 9:48 上午
 */
public class CheckpointDemo01 {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //===========Checkpoint参数设置====
        //===========类型1:必须参数=============
        //设置Checkpoint的时间间隔为1000ms做一次Checkpoint/其实就是每隔1000ms发一次Barrier!
        env.enableCheckpointing(1000);
        //设置State状态存储介质
        /*if(args.length > 0){
            env.setStateBackend(new FsStateBackend(args[0]));
        }else {
            env.setStateBackend(new FsStateBackend("file:///D:\\data\\ckp"));
        }*/
        if (SystemUtils.IS_OS_WINDOWS) {
            env.setStateBackend(new FsStateBackend("file:///D:\\data\\ckp"));
        } else {
            env.setStateBackend(new FsStateBackend("hdfs://node1:8020/flink-checkpoint/checkpoint"));
        }
        //===========类型2:建议参数===========
        //设置两个Checkpoint 之间最少等待时间,如设置Checkpoint之间最少是要等 500ms(为了避免每隔1000ms做一次Checkpoint的时候,前一次太慢和后一次重叠到一起去了)
        //如:高速公路上,每隔1s关口放行一辆车,但是规定了两车之前的最小车距为500m
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);//默认是0
        //设置如果在做Checkpoint过程中出现错误,是否让整体任务失败:true是  false不是
        //env.getCheckpointConfig().setFailOnCheckpointingErrors(false);//默认是true
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);//默认值为0,表示不容忍任何检查点失败
        //设置是否清理检查点,表示 Cancel 时是否需要保留当前的 Checkpoint,默认 Checkpoint会在作业被Cancel时被删除
        //ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:true,当作业被取消时,删除外部的checkpoint(默认值)
        //ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:false,当作业被取消时,保留外部的checkpoint
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //===========类型3:直接使用默认的即可===============
        //设置checkpoint的执行模式为EXACTLY_ONCE(默认)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //设置checkpoint的超时时间,如果 Checkpoint在 60s内尚未完成说明该次Checkpoint失败,则丢弃。
        env.getCheckpointConfig().setCheckpointTimeout(60000);//默认10分钟
        //设置同一时间有多少个checkpoint可以同时执行
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);//默认为1
        //2.Source
        DataStream<String> linesDS = env.socketTextStream("node1", 9999);
        //3.Transformation
        //3.1切割出每个单词并直接记为1
        DataStream<Tuple2<String, Integer>> wordAndOneDS = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                //value就是每一行
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });
        //3.2分组
        //注意:批处理的分组是groupBy,流处理的分组是keyBy
        KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOneDS.keyBy(t -> t.f0);
        //3.3聚合
        DataStream<Tuple2<String, Integer>> aggResult = groupedDS.sum(1);
        DataStream<String> result = (SingleOutputStreamOperator<String>) aggResult.map(new RichMapFunction<Tuple2<String, Integer>, String>() {
            @Override
            public String map(Tuple2<String, Integer> value) throws Exception {
                return value.f0 + ":::" + value.f1;
            }
        });
        //4.sink
        result.print();
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node1:9092");
        FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("flink_kafka", new SimpleStringSchema(), props);
        result.addSink(kafkaSink);
        //5.execute
        env.execute();
        // /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka
    }
}

03 状态恢复和重启策略

3.1 自动重启策略和恢复

3.1.1 重启策略配置方式

如果在配置文件中,可以在flink-conf.yml中可以进行配置,示例如下:

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s

还可以在代码中针对该任务进行配置,示例如下:

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 重启次数
Time.of(10, TimeUnit.SECONDS) // 延迟时间间隔
))

3.1.2 重启策略分类

3.1.2.1 默认重启策略

如果配置了Checkpoint,而没有配置重启策略,那么代码中出现了非致命错误时,程序会无限重启。

3.1.2.2 无重启策略

Job直接失败,不会尝试进行重启

配置文件的方式:

restart-strategy: none

代码配置的方式:

val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.noRestart())
3.1.2.3 固定延迟重启策略

重启策略可以配置flink-conf.yaml的下面配置参数来启用,作为默认的重启策略:

restart-strategy: fixed-delay
 restart-strategy.fixed-delay.attempts: 3
 restart-strategy.fixed-delay.delay: 10 s

也可以在程序中设置:

// 表示:如果job失败,重启3次, 每次间隔10
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3, // 最多重启3次数
  Time.of(10, TimeUnit.SECONDS) // 重启时间间隔
))
3.1.2.4 失败率重启策略

失败率重启策略可以在flink-conf.yaml中设置下面的配置参数来启用:

restart-strategy:failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s

失败率重启策略也可以在程序中设置:

//表示:如果5分钟内job失败不超过三次,自动重启, 
//每次间隔10s (如果5分钟内程序失败超过3次,则程序退出)
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.failureRateRestart(
  3, // 每个测量时间间隔最大失败次数
  Time.of(5, TimeUnit.MINUTES), //失败率测量的时间间隔
  Time.of(10, TimeUnit.SECONDS) // 两次连续重启的时间间隔
))
3.1.2.5 示例代码
/**
 * 演示Checkpoint+重启策略
 *
 * @author : YangLinWei
 * @createTime: 2022/3/8 10:01 上午
 */
public class CheckpointDemo02 {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //===========Checkpoint参数设置====
        //===========类型1:必须参数=============
        //设置Checkpoint的时间间隔为1000ms做一次Checkpoint/其实就是每隔1000ms发一次Barrier!
        env.enableCheckpointing(1000);
        //设置State状态存储介质
        /*if(args.length > 0){
            env.setStateBackend(new FsStateBackend(args[0]));
        }else {
            env.setStateBackend(new FsStateBackend("file:///D:/ckp"));
        }*/
        if (SystemUtils.IS_OS_WINDOWS) {
            env.setStateBackend(new FsStateBackend("file:///D:/ckp"));
        } else {
            env.setStateBackend(new FsStateBackend("hdfs://node1:8020/flink-checkpoint/checkpoint"));
        }
        //===========类型2:建议参数===========
        //设置两个Checkpoint 之间最少等待时间,如设置Checkpoint之间最少是要等 500ms(为了避免每隔1000ms做一次Checkpoint的时候,前一次太慢和后一次重叠到一起去了)
        //如:高速公路上,每隔1s关口放行一辆车,但是规定了两车之前的最小车距为500m
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);//默认是0
        //设置如果在做Checkpoint过程中出现错误,是否让整体任务失败:true是  false不是
        //env.getCheckpointConfig().setFailOnCheckpointingErrors(false);//默认是true
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);//默认值为0,表示不容忍任何检查点失败
        //设置是否清理检查点,表示 Cancel 时是否需要保留当前的 Checkpoint,默认 Checkpoint会在作业被Cancel时被删除
        //ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:true,当作业被取消时,删除外部的checkpoint(默认值)
        //ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:false,当作业被取消时,保留外部的checkpoint
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //===========类型3:直接使用默认的即可===============
        //设置checkpoint的执行模式为EXACTLY_ONCE(默认)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //设置checkpoint的超时时间,如果 Checkpoint在 60s内尚未完成说明该次Checkpoint失败,则丢弃。
        env.getCheckpointConfig().setCheckpointTimeout(60000);//默认10分钟
        //设置同一时间有多少个checkpoint可以同时执行
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);//默认为1
        //=============重启策略===========
        //-1.默认策略:配置了Checkpoint而没有配置重启策略默认使用无限重启
        //-2.配置无重启策略
        //env.setRestartStrategy(RestartStrategies.noRestart());
        //-3.固定延迟重启策略--开发中使用!
        //重启3次,每次间隔10s
        /*env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3, //尝试重启3次
                Time.of(10, TimeUnit.SECONDS))//每次重启间隔10s
        );*/
        //-4.失败率重启--偶尔使用
        //5分钟内重启3次(第3次不包括,也就是最多重启2次),每次间隔10s
        /*env.setRestartStrategy(RestartStrategies.failureRateRestart(
                3, // 每个测量时间间隔最大失败次数
                Time.of(5, TimeUnit.MINUTES), //失败率测量的时间间隔
                Time.of(10, TimeUnit.SECONDS) // 每次重启的时间间隔
        ));*/
        //上面的能看懂就行,开发中使用下面的代码即可
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));
        //2.Source
        DataStream<String> linesDS = env.socketTextStream("node1", 9999);
        //3.Transformation
        //3.1切割出每个单词并直接记为1
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                //value就是每一行
                String[] words = value.split(" ");
                for (String word : words) {
                    if (word.equals("bug")) {
                        System.out.println("手动模拟的bug...");
                        throw new RuntimeException("手动模拟的bug...");
                    }
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });
        //3.2分组
        //注意:批处理的分组是groupBy,流处理的分组是keyBy
        KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOneDS.keyBy(t -> t.f0);
        //3.3聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = groupedDS.sum(1);
        //4.sink
        result.print();
        //5.execute
        env.execute();
    }
}

3.2 手动重启并恢复

1.把程序打包

2.启动Flink集群(本地单机版,集群版都可以)

/export/server/flink/bin/start-cluster.sh

3.访问webUI

4.使用FlinkWebUI提交

5.取消任务

6.重新启动任务并指定从哪恢复(例如:hdfs://node1:8020/flink-checkpoint/checkpoint/9e8ce00dcd557dc03a678732f1552c3a/chk-34

7.关闭/取消任务

8.关闭集群

/export/server/flink/bin/stop-cluster.sh

04 Savepoint

4.1 Savepoint介绍

保存点(Savepoint):类似于以前玩游戏的时候,遇到难关了/遇到boss了,赶紧手动存个档,然后接着玩,如果失败了,赶紧从上次的存档中恢复,然后接着玩。

在实际开发中,可能会遇到这样的情况:如要对集群进行停机维护/扩容…

那么这时候需要执行一次Savepoint也就是执行一次手动的Checkpoint/也就是手动的发一个barrier栅栏,那么这样的话,程序的所有状态都会被执行快照并保存。

当维护/扩容完毕之后,可以从上一次Savepoint的目录中进行恢复!

4.2 Savepoint VS Checkpoint

功能 Checkpoint Savepoint
触发管理方式 由Flink自动触发并管理 由用户手动触发并管理
主要用途 在Task发生异常时快速恢复(例如网络抖动导致的超时异常) 有计划地机型备份,使作业能停止后回复(例如修改代码,调整并发)
特点 轻量、自动从故障中恢复、在作业停止后默认清除 持久、以标准格式存储,允许代码或配置发生改变、手动触发从Savepoint的恢复
目标 任务失败的恢复/故障转移机制 手动备份/重启/回复任务
实现 轻量快速 注重可移植性,成本较高
生命周期 Flink自身控制 用户手动控制

4.3 Savepoint 案例演示

# 启动yarn session
/export/server/flink/bin/yarn-session.sh -n 2 -tm 800 -s 1 -d
# 运行job-会自动执行Checkpoint
/export/server/flink/bin/flink run --class cn.itcast.checkpoint.CheckpointDemo01 /root/ckp.jar
# 手动创建savepoint--相当于手动做了一次Checkpoint
/export/server/flink/bin/flink savepoint 702b872ef80f08854c946a544f2ee1a5 hdfs://node1:8020/flink-checkpoint/savepoint/
# 停止job
/export/server/flink/bin/flink cancel 702b872ef80f08854c946a544f2ee1a5
# 重新启动job,手动加载savepoint数据
/export/server/flink/bin/flink run -s hdfs://node1:8020/flink-checkpoint/savepoint/savepoint-702b87-0a11b997fa70 --class cn.itcast.checkpoint.CheckpointDemo01 /root/ckp.jar 
# 停止yarn session
yarn application -kill application_1607782486484_0014

05 文末

本文主要讲解了Flink高级API里面的容错机制,谢谢大家的阅读,本文完!

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
20天前
|
人工智能 数据可视化 测试技术
Postman 性能测试教程:快速上手 API 压测
本文介绍API上线后因高频调用导致服务器告警,通过Postman与Apifox进行压力测试排查性能瓶颈。对比两款工具在批量请求、断言验证、可视化报告等方面的优劣,探讨API性能优化策略及行业未来发展方向。
Postman 性能测试教程:快速上手 API 压测
|
3月前
|
JSON API 网络安全
通用邮箱邮件获取API教程:支持IMAP/POP3协议
本文介绍如何通过接口盒子的免费API获取邮箱邮件,支持IMAP/POP3协议,适用于QQ邮箱、网易邮箱等主流服务。内容包括接口基本信息、请求参数、返回参数、调用示例及注意事项,帮助开发者快速实现邮件读取功能。
|
3月前
|
JSON 监控 API
在线网络PING接口检测服务器连通状态免费API教程
接口盒子提供免费PING检测API,可测试域名或IP的连通性与响应速度,支持指定地域节点,适用于服务器运维和网络监控。
|
3月前
|
JSON API PHP
通用图片搜索API:百度源免费接口教程
本文介绍一款基于百度图片搜索的免费API接口,由接口盒子提供。支持关键词搜索,具备详细请求与返回参数说明,并提供PHP及Python调用示例。开发者可快速集成实现图片搜索功能,适用于内容聚合、素材库建设等场景。
|
3月前
|
JSON 机器人 API
随机昵称网名API接口教程:轻松获取百万创意昵称库
接口盒子提供随机昵称网名API,拥有百万级中文昵称库,支持聊天机器人、游戏角色等场景的昵称生成。提供详细调用指南及多语言示例代码,助力开发者高效集成。
|
1月前
|
人工智能 API 开发者
图文教程:阿里云百炼API-KEY获取方法,亲测全流程
本文详细介绍了如何获取阿里云百炼API-KEY,包含完整流程与截图指引。需先开通百炼平台及大模型服务,再通过控制台创建并复制API-KEY。目前平台提供千万tokens免费额度,适合开发者快速上手使用。
602 5
|
3月前
|
JSON API PHP
天气预报免费API接口【地址查询版】使用教程
本文介绍了如何使用中国气象局官方数据提供的免费天气预报API接口,通过省份和地点查询指定地区当日天气信息。该接口由接口盒子支持,提供JSON格式数据、GET/POST请求方式,并需注册获取用户ID和KEY进行身份验证。
1865 2
|
3月前
|
JSON API PHP
ICP备案查询免费API接口使用教程
本文介绍如何通过接口盒子提供的免费API接口查询域名ICP备案信息,包含请求地址、参数说明及PHP和Python调用示例,适用于开发者快速集成备案查询功能。
|
3月前
|
JSON Shell API
查手机号归属地免费API接口教程
本接口提供手机号码归属地查询功能,支持获取号段、归属地省份/城市、运营商、区号、邮编等信息。请求地址为 `https://cn.apihz.cn/api/ip/shouji.php`,支持 POST 或 GET 方式调用,需提供 `id`、`key` 和 `phone` 参数。返回包含归属地信息及运营商等数据,适用于手机号归属查询场景。
|
3月前
|
JSON API 开发者
淘宝 API 零基础快速上手教程(2025 版)
淘宝API是淘宝开放平台提供的接口,允许开发者获取商品、订单等数据,并实现自动化操作。本文介绍了API基础概念、账号开通流程、权限申请、调用方法及实战示例,适合零基础开发者快速入门并掌握淘宝API的核心使用技巧。

热门文章

最新文章