Apache Flink 漫谈系列(15) - DataStream Connectors之Kafka

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 聊什么 为了满足本系列读者的需求,在完成《Apache Flink 漫谈系列(14) - DataStream Connectors》之前,我先介绍一下Kafka在Apache Flink中的使用。所以本篇以一个简单的示例,向大家介绍在Apache Flink中如何使用Kafka。

聊什么

为了满足本系列读者的需求,在完成《Apache Flink 漫谈系列(14) - DataStream Connectors》之前,我先介绍一下Kafka在Apache Flink中的使用。所以本篇以一个简单的示例,向大家介绍在Apache Flink中如何使用Kafka。

Kafka 简介

Apache Kafka是一个分布式发布-订阅消息传递系统。 它最初由LinkedIn公司开发,LinkedIn于2010年贡献给了Apache基金会并成为顶级开源项目。Kafka用于构建实时数据管道和流式应用程序。它具有水平扩展性、容错性、极快的速度,目前也得到了广泛的应用。

Kafka不但是分布式消息系统而且也支持流式计算,所以在介绍Kafka在Apache Flink中的应用之前,先以一个Kafka的简单示例直观了解什么是Kafka。

安装

本篇不是系统的,详尽的介绍Kafka,而是想让大家直观认识Kafka,以便在Apahe Flink中进行很好的应用,所以我们以最简单的方式安装Kafka。

  • 下载二进制包
curl -L -O http://mirrors.shu.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz
  • 解压安装
    Kafka安装只需要将下载的tgz解压即可,如下:
jincheng:kafka jincheng.sunjc$ tar -zxf kafka_2.11-2.1.0.tgz 
jincheng:kafka jincheng.sunjc$ cd kafka_2.11-2.1.0
jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ ls
LICENSE        NOTICE        bin        config        libs        site-docs

其中bin包含了所有Kafka的管理命令,如接下来我们要启动的Kafka的Server。

  • 启动Kafka Server
    Kafka是一个发布订阅系统,消息订阅首先要有个服务存在。我们启动一个Kafka Server 实例。 Kafka需要使用ZooKeeper,要进行投产部署我们需要安装ZooKeeper集群,这不在本篇的介绍范围内,所以我们利用Kafka提供的脚本,安装一个只有一个节点的ZooKeeper实例。如下:
jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/zookeeper-server-start.sh config/zookeeper.properties &

[2019-01-13 09:06:19,985] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
....
....
[2019-01-13 09:06:20,061] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)

启动之后,ZooKeeper会绑定2181端口(默认)。接下来我们启动Kafka Server,如下:

jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-server-start.sh config/server.properties
[2019-01-13 09:09:16,937] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2019-01-13 09:09:17,267] INFO starting (kafka.server.KafkaServer)
[2019-01-13 09:09:17,267] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2019-01-13 09:09:17,284] INFO [ZooKeeperClient] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
...
...
[2019-01-13 09:09:18,253] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

如果上面一切顺利,Kafka的安装就完成了。

创建Topic

Kafka是消息订阅系统,首先创建可以被订阅的Topic,我们创建一个名为flink-tipic的Topic,在一个新的terminal中,执行如下命令:

jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-tipic

Created topic "flink-tipic".

在Kafka Server的terminal中也会输出如下成功创建信息:

...
[2019-01-13 09:13:31,156] INFO Created log for partition flink-tipic-0 in /tmp/kafka-logs with properties {compression.type -> producer, message.format.version -> 2.1-IV2, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, message.downconversion.enable -> true, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 1073741824, retention.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)
...

上面显示了flink-topic的基本属性配置,如消息压缩方式,消息格式,备份数量等等。

除了看日志,我们可以用命令显示的查询我们是否成功的创建了flink-topic,如下:

jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-topics.sh --list --zookeeper localhost:2181

flink-tipic

如果输出flink-tipic,那么说明我们的Topic成功创建了。

那么Topic是保存在哪里?Kafka是怎样进行消息的发布和订阅的呢?为直观,我们看如下Kafka架构示意图简单理解一下:
image

简单介绍一下,Kafka利用ZooKeeper来存储集群信息,也就是上面我们启动的Kafka Server 实例,一个集群中可以有多个Kafka Server 实例,Kafka Server叫做Broker,我们创建的Topic可以在一个或多个Broker中。Kafka利用Push模式发送消息,利用Pull方式拉取消息。

发送消息

如何向已经存在的Topic中发送消息呢,当然我们可以API的方式编写代码发送消息。同时,还可以利用命令方式来便捷的发送消息,如下:

jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flink-topic
>Kafka test msg 
>Kafka connector

上面我们发送了两条消息Kafka test msgKafka connectorflink-topic Topic中。

读取消息

如果读取指定Topic的消息呢?同样可以API和命令两种方式都可以完成,我们以命令方式读取flink-topic的消息,如下:

jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flink-topic --from-beginning
Kafka test msg
Kafka connector

其中--from-beginning 描述了我们从Topic开始位置读取消息。

Flink Kafka Connector

前面我们以最简单的方式安装了Kafka环境,那么我们以上面的环境介绍Flink Kafka Connector的使用。Flink Connector相关的基础知识会在《Apache Flink 漫谈系列(14) - Connectors》中介绍,这里我们直接介绍与Kafka Connector相关的内容。

Apache Flink 中提供了多个版本的Kafka Connector,本篇以flink-1.7.0版本为例进行介绍。

mvn 依赖

要使用Kakfa Connector需要在我们的pom中增加对Kafka Connector的依赖,如下:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.7.0</version>
</dependency>

Flink Kafka Consumer需要知道如何将Kafka中的二进制数据转换为Java / Scala对象。 DeserializationSchema允许用户指定这样的模式。 为每个Kafka消息调用 T deserialize(byte [] message)方法,从Kafka传递值。

Examples

我们示例读取Kafka的数据,再将数据做简单处理之后写入到Kafka中。我们需要再创建一个用于写入的Topic,如下:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-tipic-output

所以示例中我们Source利用flink-topic, Sink用slink-topic-output

Simple ETL

我们假设Kafka中存储的就是一个简单的字符串,所以我们需要一个用于对字符串进行serializedeserialize的实现,也就是我们要定义一个实现DeserializationSchemaSerializationSchema 的序列化和反序列化的类。因为我们示例中是字符串,所以我们自定义一个KafkaMsgSchema实现类,然后在编写Flink主程序。

  • KafkaMsgSchema - 完整代码
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Preconditions;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.Charset;

public class KafkaMsgSchema implements DeserializationSchema<String>, SerializationSchema<String> {
    private static final long serialVersionUID = 1L;
    private transient Charset charset;

    public KafkaMsgSchema() {
        // 默认UTF-8编码
        this(Charset.forName("UTF-8"));
    }

    public KafkaMsgSchema(Charset charset) {
        this.charset = Preconditions.checkNotNull(charset);
    }

    public Charset getCharset() {
        return this.charset;
    }

    public String deserialize(byte[] message) {
        // 将Kafka的消息反序列化为java对象
        return new String(message, charset);
    }

    public boolean isEndOfStream(String nextElement) {
        // 流永远不结束
        return false;
    }

    public byte[] serialize(String element) {
       // 将java对象序列化为Kafka的消息
        return element.getBytes(this.charset);
    }

    public TypeInformation<String> getProducedType() {
        // 定义产生的数据Typeinfo
        return BasicTypeInfo.STRING_TYPE_INFO;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        out.writeUTF(this.charset.name());
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        String charsetName = in.readUTF();
        this.charset = Charset.forName(charsetName);
    }
}
  • 主程序 - 完整代码
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

import java.util.Properties;

public class KafkaExample {
    public static void main(String[] args) throws Exception {
        // 用户参数获取
        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
        // Stream 环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source的topic
        String sourceTopic = "flink-topic";
        // Sink的topic
        String sinkTopic = "flink-topic-output";
        // broker 地址
        String broker = "localhost:9092";

        // 属性参数 - 实际投产可以在命令行传入
        Properties p = parameterTool.getProperties();
        p.putAll(parameterTool.getProperties());
        p.put("bootstrap.servers", broker);

        env.getConfig().setGlobalJobParameters(parameterTool);

        // 创建消费者
        FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<String>(
                sourceTopic,
                new KafkaMsgSchema(),
                p);
        // 设置读取最早的数据
//        consumer.setStartFromEarliest();

        // 读取Kafka消息
        DataStream<String> input = env.addSource(consumer);


        // 数据处理
        DataStream<String> result = input.map(new MapFunction<String, String>() {
            public String map(String s) throws Exception {
                String msg = "Flink study ".concat(s);
                System.out.println(msg);
                return msg;
            }
        });

        // 创建生产者
        FlinkKafkaProducer producer = new FlinkKafkaProducer<String>(
                sinkTopic,
                new KeyedSerializationSchemaWrapper<String>(new KafkaMsgSchema()),
                p,
                FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);

        // 将数据写入Kafka指定Topic中
        result.addSink(producer);

        // 执行job
        env.execute("Kafka Example");
    }
}

运行主程序如下:
image

我测试操作的过程如下:

  1. 启动flink-topicflink-topic-output的消费拉取;
  2. 通过命令向flink-topic中添加测试消息only for test;
  3. 通过命令打印验证添加的测试消息 only for test;
  4. 最简单的FlinkJob source->map->sink 对测试消息进行map处理:"Flink study ".concat(s);
  5. 通过命令打印sink的数据;

#### 内置Schemas
Apache Flink 内部提供了如下3种内置的常用消息格式的Schemas:

  • TypeInformationSerializationSchema (and TypeInformationKeyValueSerializationSchema) 它基于Flink的TypeInformation创建模式。 如果数据由Flink写入和读取,这将非常有用。
  • JsonDeserializationSchema (and JSONKeyValueDeserializationSchema) 它将序列化的JSON转换为ObjectNode对象,可以使用objectNode.get(“field”)作为(Int / String / ...)()从中访问字段。 KeyValue objectNode包含“key”和“value”字段,其中包含所有字段以及可选的"metadata"字段,该字段公开此消息的偏移量/分区/主题。
  • AvroDeserializationSchema 它使用静态提供的模式读取使用Avro格式序列化的数据。 它可以从Avro生成的类(AvroDeserializationSchema.forSpecific(...))推断出模式,或者它可以与GenericRecords一起使用手动提供的模式(使用AvroDeserializationSchema.forGeneric(...))

要使用内置的Schemas需要添加如下依赖:

 <dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-avro</artifactId>
  <version>1.7.0</version>
</dependency>

读取位置配置

我们在消费Kafka数据时候,可能需要指定消费的位置,Apache Flink 的FlinkKafkaConsumer提供很多便利的位置设置,如下:

  • consumer.setStartFromEarliest() - 从最早的记录开始;
  • consumer.setStartFromLatest() - 从最新记录开始;
  • consumer.setStartFromTimestamp(...); // 从指定的epoch时间戳(毫秒)开始;
  • consumer.setStartFromGroupOffsets(); // 默认行为,从上次消费的偏移量进行继续消费。

上面的位置指定可以精确到每个分区,比如如下代码:

Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L); // 第一个分区从23L开始
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);// 第二个分区从31L开始
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);// 第三个分区从43L开始

consumer.setStartFromSpecificOffsets(specificStartOffsets);

对于没有指定的分区还是默认的setStartFromGroupOffsets方式。

Topic发现

Kafka支持Topic自动发现,也就是用正则的方式创建FlinkKafkaConsumer,比如:

// 创建消费者
FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<String>(            java.util.regex.Pattern.compile(sourceTopic.concat("-[0-9]")),
new KafkaMsgSchema(),
p);

在上面的示例中,当作业开始运行时,消费者将订阅名称与指定正则表达式匹配的所有Topic(以sourceTopic的值开头并以单个数字结尾)。

定义Watermark(Window)

对Kafka Connector的应用不仅限于上面的简单数据提取,我们更多时候是期望对Kafka数据进行Event-time的窗口操作,那么就需要在Flink Kafka Source中定义Watermark。

要定义Event-time,首先是Kafka数据里面携带时间属性,假设我们数据是String#Long的格式,如only for test#1000。那么我们将Long作为时间列。

  • KafkaWithTsMsgSchema - 完整代码
    要想解析上面的Kafka的数据格式,我们需要开发一个自定义的Schema,比如叫KafkaWithTsMsgSchema,将String#Long解析为一个Java的Tuple2<String, Long>,完整代码如下:
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.util.Preconditions;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.Charset;

public class KafkaWithTsMsgSchema implements DeserializationSchema<Tuple2<String, Long>>, SerializationSchema<Tuple2<String, Long>> {
    private static final long serialVersionUID = 1L;
    private transient Charset charset;

    public KafkaWithTsMsgSchema() {
        this(Charset.forName("UTF-8"));
    }

    public KafkaWithTsMsgSchema(Charset charset) {
        this.charset = Preconditions.checkNotNull(charset);
    }

    public Charset getCharset() {
        return this.charset;
    }

    public Tuple2<String, Long> deserialize(byte[] message) {
        String msg = new String(message, charset);
        String[] dataAndTs = msg.split("#");
        if(dataAndTs.length == 2){
            return new Tuple2<String, Long>(dataAndTs[0], Long.parseLong(dataAndTs[1].trim()));
        }else{
            // 实际生产上需要抛出runtime异常
            System.out.println("Fail due to invalid msg format.. ["+msg+"]");
            return new Tuple2<String, Long>(msg, 0L);
        }
    }

    @Override
    public boolean isEndOfStream(Tuple2<String, Long> stringLongTuple2) {
        return false;
    }

    public byte[] serialize(Tuple2<String, Long> element) {
        return "MAX - ".concat(element.f0).concat("#").concat(String.valueOf(element.f1)).getBytes(this.charset);
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        out.writeUTF(this.charset.name());
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        String charsetName = in.readUTF();
        this.charset = Charset.forName(charsetName);
    }

    @Override
    public TypeInformation<Tuple2<String, Long>> getProducedType() {
        return new TupleTypeInfo<Tuple2<String, Long>>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
    }
}
  • Watermark生成

提取时间戳和创建Watermark,需要实现一个自定义的时间提取和Watermark生成器。在Apache Flink 内部有2种方式如下:

  • AssignerWithPunctuatedWatermarks - 每条记录都产生Watermark。
  • AssignerWithPeriodicWatermarks - 周期性的生成Watermark。

    我们以AssignerWithPunctuatedWatermarks为例写一个自定义的时间提取和Watermark生成器。代码如下:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

import javax.annotation.Nullable;

public class KafkaAssignerWithPunctuatedWatermarks
        implements AssignerWithPunctuatedWatermarks<Tuple2<String, Long>> {
    @Nullable
    @Override
    public Watermark checkAndGetNextWatermark(Tuple2<String, Long> o, long l) {
        // 利用提取的时间戳创建Watermark
        return new Watermark(l);
    }

    @Override
    public long extractTimestamp(Tuple2<String, Long> o, long l) {
       // 提取时间戳
        return o.f1;
    }
}
  • 主程序 - 完整程序
    我们计算一个大小为1秒的Tumble窗口,计算窗口内最大的值。完整的程序如下:
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

import java.util.Properties;

public class KafkaWithEventTimeExample {
    public static void main(String[] args) throws Exception {
        // 用户参数获取
        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
        // Stream 环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置 Event-time
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Source的topic
        String sourceTopic = "flink-topic";
        // Sink的topic
        String sinkTopic = "flink-topic-output";
        // broker 地址
        String broker = "localhost:9092";

        // 属性参数 - 实际投产可以在命令行传入
        Properties p = parameterTool.getProperties();
        p.putAll(parameterTool.getProperties());
        p.put("bootstrap.servers", broker);

        env.getConfig().setGlobalJobParameters(parameterTool);
        // 创建消费者
        FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<Tuple2<String, Long>>(
                sourceTopic,
                new KafkaWithTsMsgSchema(),
                p);

        // 读取Kafka消息
        TypeInformation<Tuple2<String, Long>> typeInformation = new TupleTypeInfo<Tuple2<String, Long>>(
                BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);

        DataStream<Tuple2<String, Long>> input = env
                .addSource(consumer).returns(typeInformation)
                // 提取时间戳,并生产Watermark
                .assignTimestampsAndWatermarks(new KafkaAssignerWithPunctuatedWatermarks());

        // 数据处理
        DataStream<Tuple2<String, Long>> result = input
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
                .max(0);

        // 创建生产者
        FlinkKafkaProducer producer = new FlinkKafkaProducer<Tuple2<String, Long>>(
                sinkTopic,
                new KeyedSerializationSchemaWrapper<Tuple2<String, Long>>(new KafkaWithTsMsgSchema()),
                p,
                FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);

        // 将数据写入Kafka指定Topic中
        result.addSink(producer);

        // 执行job
        env.execute("Kafka With Event-time Example");
    }
}

测试运行如下:
image

简单解释一下,我们输入数如下:

Msg Watermark
E#1000000 1000000
A#3000000 3000000
B#5000000 5000000
C#5000100 5000100
E#5000120 5000120
A#7000000 7000000

我们看的5000000~7000000之间的数据,其中B#5000000, C#5000100E#5000120是同一个窗口的内容。计算MAX值,按字符串比较,最大的消息就是输出的E#5000120

Kafka携带Timestamps

在Kafka-0.10+ 消息可以携带timestamps,也就是说不用单独的在msg中显示添加一个数据列作为timestamps。只有在写入和读取都用Flink时候简单一些。一般情况用上面的示例方式已经足够了。

小结

本篇重点是向大家介绍Kafka如何在Flink中进行应用,开篇介绍了Kafka的简单安装和收发消息的命令演示,然后以一个简单的数据提取和一个Event-time的窗口示例让大家直观的感受如何在Apache Flink中使用Kafka。

关于点赞和评论

本系列文章难免有很多缺陷和不足,真诚希望读者对有收获的篇章给予点赞鼓励,对有不足的篇章给予反馈和建议,先行感谢大家!

目录
相关文章
|
1月前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
616 13
Apache Flink 2.0-preview released
|
1月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
71 3
|
1月前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
144 0
|
1月前
|
分布式计算 监控 大数据
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
57 1
|
1月前
|
数据挖掘 物联网 数据处理
深入探讨Apache Flink:实时数据流处理的强大框架
在数据驱动时代,企业需高效处理实时数据流。Apache Flink作为开源流处理框架,以其高性能和灵活性成为首选平台。本文详细介绍Flink的核心特性和应用场景,包括实时流处理、强大的状态管理、灵活的窗口机制及批处理兼容性。无论在实时数据分析、金融服务、物联网还是广告技术领域,Flink均展现出巨大潜力,是企业实时数据处理的理想选择。随着大数据需求增长,Flink将继续在数据处理领域发挥重要作用。
|
1月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
|
1月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
47 1
|
3月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
281 9
|
3月前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
67 3
|
3月前
|
vr&ar 图形学 开发者
步入未来科技前沿:全方位解读Unity在VR/AR开发中的应用技巧,带你轻松打造震撼人心的沉浸式虚拟现实与增强现实体验——附详细示例代码与实战指南
【8月更文挑战第31天】虚拟现实(VR)和增强现实(AR)技术正深刻改变生活,从教育、娱乐到医疗、工业,应用广泛。Unity作为强大的游戏开发引擎,适用于构建高质量的VR/AR应用,支持Oculus Rift、HTC Vive、Microsoft HoloLens、ARKit和ARCore等平台。本文将介绍如何使用Unity创建沉浸式虚拟体验,包括设置项目、添加相机、处理用户输入等,并通过具体示例代码展示实现过程。无论是完全沉浸式的VR体验,还是将数字内容叠加到现实世界的AR应用,Unity均提供了所需的一切工具。
138 0

相关产品

  • 实时计算 Flink版
  • 推荐镜像

    更多
    下一篇
    无影云桌面