Apache Flink 漫谈系列(15) - DataStream Connectors之Kafka

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 聊什么 为了满足本系列读者的需求,在完成《Apache Flink 漫谈系列(14) - DataStream Connectors》之前,我先介绍一下Kafka在Apache Flink中的使用。所以本篇以一个简单的示例,向大家介绍在Apache Flink中如何使用Kafka。

聊什么

为了满足本系列读者的需求,在完成《Apache Flink 漫谈系列(14) - DataStream Connectors》之前,我先介绍一下Kafka在Apache Flink中的使用。所以本篇以一个简单的示例,向大家介绍在Apache Flink中如何使用Kafka。

Kafka 简介

Apache Kafka是一个分布式发布-订阅消息传递系统。 它最初由LinkedIn公司开发,LinkedIn于2010年贡献给了Apache基金会并成为顶级开源项目。Kafka用于构建实时数据管道和流式应用程序。它具有水平扩展性、容错性、极快的速度,目前也得到了广泛的应用。

Kafka不但是分布式消息系统而且也支持流式计算,所以在介绍Kafka在Apache Flink中的应用之前,先以一个Kafka的简单示例直观了解什么是Kafka。

安装

本篇不是系统的,详尽的介绍Kafka,而是想让大家直观认识Kafka,以便在Apahe Flink中进行很好的应用,所以我们以最简单的方式安装Kafka。

  • 下载二进制包
curl -L -O http://mirrors.shu.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz
  • 解压安装
    Kafka安装只需要将下载的tgz解压即可,如下:
jincheng:kafka jincheng.sunjc$ tar -zxf kafka_2.11-2.1.0.tgz 
jincheng:kafka jincheng.sunjc$ cd kafka_2.11-2.1.0
jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ ls
LICENSE        NOTICE        bin        config        libs        site-docs

其中bin包含了所有Kafka的管理命令,如接下来我们要启动的Kafka的Server。

  • 启动Kafka Server
    Kafka是一个发布订阅系统,消息订阅首先要有个服务存在。我们启动一个Kafka Server 实例。 Kafka需要使用ZooKeeper,要进行投产部署我们需要安装ZooKeeper集群,这不在本篇的介绍范围内,所以我们利用Kafka提供的脚本,安装一个只有一个节点的ZooKeeper实例。如下:
jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/zookeeper-server-start.sh config/zookeeper.properties &

[2019-01-13 09:06:19,985] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
....
....
[2019-01-13 09:06:20,061] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)

启动之后,ZooKeeper会绑定2181端口(默认)。接下来我们启动Kafka Server,如下:

jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-server-start.sh config/server.properties
[2019-01-13 09:09:16,937] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2019-01-13 09:09:17,267] INFO starting (kafka.server.KafkaServer)
[2019-01-13 09:09:17,267] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2019-01-13 09:09:17,284] INFO [ZooKeeperClient] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
...
...
[2019-01-13 09:09:18,253] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

如果上面一切顺利,Kafka的安装就完成了。

创建Topic

Kafka是消息订阅系统,首先创建可以被订阅的Topic,我们创建一个名为flink-tipic的Topic,在一个新的terminal中,执行如下命令:

jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-tipic

Created topic "flink-tipic".

在Kafka Server的terminal中也会输出如下成功创建信息:

...
[2019-01-13 09:13:31,156] INFO Created log for partition flink-tipic-0 in /tmp/kafka-logs with properties {compression.type -> producer, message.format.version -> 2.1-IV2, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, message.downconversion.enable -> true, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 1073741824, retention.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)
...

上面显示了flink-topic的基本属性配置,如消息压缩方式,消息格式,备份数量等等。

除了看日志,我们可以用命令显示的查询我们是否成功的创建了flink-topic,如下:

jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-topics.sh --list --zookeeper localhost:2181

flink-tipic

如果输出flink-tipic,那么说明我们的Topic成功创建了。

那么Topic是保存在哪里?Kafka是怎样进行消息的发布和订阅的呢?为直观,我们看如下Kafka架构示意图简单理解一下:
image

简单介绍一下,Kafka利用ZooKeeper来存储集群信息,也就是上面我们启动的Kafka Server 实例,一个集群中可以有多个Kafka Server 实例,Kafka Server叫做Broker,我们创建的Topic可以在一个或多个Broker中。Kafka利用Push模式发送消息,利用Pull方式拉取消息。

发送消息

如何向已经存在的Topic中发送消息呢,当然我们可以API的方式编写代码发送消息。同时,还可以利用命令方式来便捷的发送消息,如下:

jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flink-topic
>Kafka test msg 
>Kafka connector

上面我们发送了两条消息Kafka test msgKafka connectorflink-topic Topic中。

读取消息

如果读取指定Topic的消息呢?同样可以API和命令两种方式都可以完成,我们以命令方式读取flink-topic的消息,如下:

jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flink-topic --from-beginning
Kafka test msg
Kafka connector

其中--from-beginning 描述了我们从Topic开始位置读取消息。

Flink Kafka Connector

前面我们以最简单的方式安装了Kafka环境,那么我们以上面的环境介绍Flink Kafka Connector的使用。Flink Connector相关的基础知识会在《Apache Flink 漫谈系列(14) - Connectors》中介绍,这里我们直接介绍与Kafka Connector相关的内容。

Apache Flink 中提供了多个版本的Kafka Connector,本篇以flink-1.7.0版本为例进行介绍。

mvn 依赖

要使用Kakfa Connector需要在我们的pom中增加对Kafka Connector的依赖,如下:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.7.0</version>
</dependency>

Flink Kafka Consumer需要知道如何将Kafka中的二进制数据转换为Java / Scala对象。 DeserializationSchema允许用户指定这样的模式。 为每个Kafka消息调用 T deserialize(byte [] message)方法,从Kafka传递值。

Examples

我们示例读取Kafka的数据,再将数据做简单处理之后写入到Kafka中。我们需要再创建一个用于写入的Topic,如下:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-tipic-output

所以示例中我们Source利用flink-topic, Sink用slink-topic-output

Simple ETL

我们假设Kafka中存储的就是一个简单的字符串,所以我们需要一个用于对字符串进行serializedeserialize的实现,也就是我们要定义一个实现DeserializationSchemaSerializationSchema 的序列化和反序列化的类。因为我们示例中是字符串,所以我们自定义一个KafkaMsgSchema实现类,然后在编写Flink主程序。

  • KafkaMsgSchema - 完整代码
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Preconditions;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.Charset;

public class KafkaMsgSchema implements DeserializationSchema<String>, SerializationSchema<String> {
    private static final long serialVersionUID = 1L;
    private transient Charset charset;

    public KafkaMsgSchema() {
        // 默认UTF-8编码
        this(Charset.forName("UTF-8"));
    }

    public KafkaMsgSchema(Charset charset) {
        this.charset = Preconditions.checkNotNull(charset);
    }

    public Charset getCharset() {
        return this.charset;
    }

    public String deserialize(byte[] message) {
        // 将Kafka的消息反序列化为java对象
        return new String(message, charset);
    }

    public boolean isEndOfStream(String nextElement) {
        // 流永远不结束
        return false;
    }

    public byte[] serialize(String element) {
       // 将java对象序列化为Kafka的消息
        return element.getBytes(this.charset);
    }

    public TypeInformation<String> getProducedType() {
        // 定义产生的数据Typeinfo
        return BasicTypeInfo.STRING_TYPE_INFO;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        out.writeUTF(this.charset.name());
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        String charsetName = in.readUTF();
        this.charset = Charset.forName(charsetName);
    }
}
  • 主程序 - 完整代码
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

import java.util.Properties;

public class KafkaExample {
    public static void main(String[] args) throws Exception {
        // 用户参数获取
        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
        // Stream 环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source的topic
        String sourceTopic = "flink-topic";
        // Sink的topic
        String sinkTopic = "flink-topic-output";
        // broker 地址
        String broker = "localhost:9092";

        // 属性参数 - 实际投产可以在命令行传入
        Properties p = parameterTool.getProperties();
        p.putAll(parameterTool.getProperties());
        p.put("bootstrap.servers", broker);

        env.getConfig().setGlobalJobParameters(parameterTool);

        // 创建消费者
        FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<String>(
                sourceTopic,
                new KafkaMsgSchema(),
                p);
        // 设置读取最早的数据
//        consumer.setStartFromEarliest();

        // 读取Kafka消息
        DataStream<String> input = env.addSource(consumer);


        // 数据处理
        DataStream<String> result = input.map(new MapFunction<String, String>() {
            public String map(String s) throws Exception {
                String msg = "Flink study ".concat(s);
                System.out.println(msg);
                return msg;
            }
        });

        // 创建生产者
        FlinkKafkaProducer producer = new FlinkKafkaProducer<String>(
                sinkTopic,
                new KeyedSerializationSchemaWrapper<String>(new KafkaMsgSchema()),
                p,
                FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);

        // 将数据写入Kafka指定Topic中
        result.addSink(producer);

        // 执行job
        env.execute("Kafka Example");
    }
}

运行主程序如下:
image

我测试操作的过程如下:

  1. 启动flink-topicflink-topic-output的消费拉取;
  2. 通过命令向flink-topic中添加测试消息only for test;
  3. 通过命令打印验证添加的测试消息 only for test;
  4. 最简单的FlinkJob source->map->sink 对测试消息进行map处理:"Flink study ".concat(s);
  5. 通过命令打印sink的数据;

#### 内置Schemas
Apache Flink 内部提供了如下3种内置的常用消息格式的Schemas:

  • TypeInformationSerializationSchema (and TypeInformationKeyValueSerializationSchema) 它基于Flink的TypeInformation创建模式。 如果数据由Flink写入和读取,这将非常有用。
  • JsonDeserializationSchema (and JSONKeyValueDeserializationSchema) 它将序列化的JSON转换为ObjectNode对象,可以使用objectNode.get(“field”)作为(Int / String / ...)()从中访问字段。 KeyValue objectNode包含“key”和“value”字段,其中包含所有字段以及可选的"metadata"字段,该字段公开此消息的偏移量/分区/主题。
  • AvroDeserializationSchema 它使用静态提供的模式读取使用Avro格式序列化的数据。 它可以从Avro生成的类(AvroDeserializationSchema.forSpecific(...))推断出模式,或者它可以与GenericRecords一起使用手动提供的模式(使用AvroDeserializationSchema.forGeneric(...))

要使用内置的Schemas需要添加如下依赖:

 <dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-avro</artifactId>
  <version>1.7.0</version>
</dependency>

读取位置配置

我们在消费Kafka数据时候,可能需要指定消费的位置,Apache Flink 的FlinkKafkaConsumer提供很多便利的位置设置,如下:

  • consumer.setStartFromEarliest() - 从最早的记录开始;
  • consumer.setStartFromLatest() - 从最新记录开始;
  • consumer.setStartFromTimestamp(...); // 从指定的epoch时间戳(毫秒)开始;
  • consumer.setStartFromGroupOffsets(); // 默认行为,从上次消费的偏移量进行继续消费。

上面的位置指定可以精确到每个分区,比如如下代码:

Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L); // 第一个分区从23L开始
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);// 第二个分区从31L开始
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);// 第三个分区从43L开始

consumer.setStartFromSpecificOffsets(specificStartOffsets);

对于没有指定的分区还是默认的setStartFromGroupOffsets方式。

Topic发现

Kafka支持Topic自动发现,也就是用正则的方式创建FlinkKafkaConsumer,比如:

// 创建消费者
FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<String>(            java.util.regex.Pattern.compile(sourceTopic.concat("-[0-9]")),
new KafkaMsgSchema(),
p);

在上面的示例中,当作业开始运行时,消费者将订阅名称与指定正则表达式匹配的所有Topic(以sourceTopic的值开头并以单个数字结尾)。

定义Watermark(Window)

对Kafka Connector的应用不仅限于上面的简单数据提取,我们更多时候是期望对Kafka数据进行Event-time的窗口操作,那么就需要在Flink Kafka Source中定义Watermark。

要定义Event-time,首先是Kafka数据里面携带时间属性,假设我们数据是String#Long的格式,如only for test#1000。那么我们将Long作为时间列。

  • KafkaWithTsMsgSchema - 完整代码
    要想解析上面的Kafka的数据格式,我们需要开发一个自定义的Schema,比如叫KafkaWithTsMsgSchema,将String#Long解析为一个Java的Tuple2<String, Long>,完整代码如下:
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.util.Preconditions;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.Charset;

public class KafkaWithTsMsgSchema implements DeserializationSchema<Tuple2<String, Long>>, SerializationSchema<Tuple2<String, Long>> {
    private static final long serialVersionUID = 1L;
    private transient Charset charset;

    public KafkaWithTsMsgSchema() {
        this(Charset.forName("UTF-8"));
    }

    public KafkaWithTsMsgSchema(Charset charset) {
        this.charset = Preconditions.checkNotNull(charset);
    }

    public Charset getCharset() {
        return this.charset;
    }

    public Tuple2<String, Long> deserialize(byte[] message) {
        String msg = new String(message, charset);
        String[] dataAndTs = msg.split("#");
        if(dataAndTs.length == 2){
            return new Tuple2<String, Long>(dataAndTs[0], Long.parseLong(dataAndTs[1].trim()));
        }else{
            // 实际生产上需要抛出runtime异常
            System.out.println("Fail due to invalid msg format.. ["+msg+"]");
            return new Tuple2<String, Long>(msg, 0L);
        }
    }

    @Override
    public boolean isEndOfStream(Tuple2<String, Long> stringLongTuple2) {
        return false;
    }

    public byte[] serialize(Tuple2<String, Long> element) {
        return "MAX - ".concat(element.f0).concat("#").concat(String.valueOf(element.f1)).getBytes(this.charset);
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        out.writeUTF(this.charset.name());
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        String charsetName = in.readUTF();
        this.charset = Charset.forName(charsetName);
    }

    @Override
    public TypeInformation<Tuple2<String, Long>> getProducedType() {
        return new TupleTypeInfo<Tuple2<String, Long>>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
    }
}
  • Watermark生成

提取时间戳和创建Watermark,需要实现一个自定义的时间提取和Watermark生成器。在Apache Flink 内部有2种方式如下:

  • AssignerWithPunctuatedWatermarks - 每条记录都产生Watermark。
  • AssignerWithPeriodicWatermarks - 周期性的生成Watermark。

    我们以AssignerWithPunctuatedWatermarks为例写一个自定义的时间提取和Watermark生成器。代码如下:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

import javax.annotation.Nullable;

public class KafkaAssignerWithPunctuatedWatermarks
        implements AssignerWithPunctuatedWatermarks<Tuple2<String, Long>> {
    @Nullable
    @Override
    public Watermark checkAndGetNextWatermark(Tuple2<String, Long> o, long l) {
        // 利用提取的时间戳创建Watermark
        return new Watermark(l);
    }

    @Override
    public long extractTimestamp(Tuple2<String, Long> o, long l) {
       // 提取时间戳
        return o.f1;
    }
}
  • 主程序 - 完整程序
    我们计算一个大小为1秒的Tumble窗口,计算窗口内最大的值。完整的程序如下:
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

import java.util.Properties;

public class KafkaWithEventTimeExample {
    public static void main(String[] args) throws Exception {
        // 用户参数获取
        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
        // Stream 环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置 Event-time
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Source的topic
        String sourceTopic = "flink-topic";
        // Sink的topic
        String sinkTopic = "flink-topic-output";
        // broker 地址
        String broker = "localhost:9092";

        // 属性参数 - 实际投产可以在命令行传入
        Properties p = parameterTool.getProperties();
        p.putAll(parameterTool.getProperties());
        p.put("bootstrap.servers", broker);

        env.getConfig().setGlobalJobParameters(parameterTool);
        // 创建消费者
        FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<Tuple2<String, Long>>(
                sourceTopic,
                new KafkaWithTsMsgSchema(),
                p);

        // 读取Kafka消息
        TypeInformation<Tuple2<String, Long>> typeInformation = new TupleTypeInfo<Tuple2<String, Long>>(
                BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);

        DataStream<Tuple2<String, Long>> input = env
                .addSource(consumer).returns(typeInformation)
                // 提取时间戳,并生产Watermark
                .assignTimestampsAndWatermarks(new KafkaAssignerWithPunctuatedWatermarks());

        // 数据处理
        DataStream<Tuple2<String, Long>> result = input
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
                .max(0);

        // 创建生产者
        FlinkKafkaProducer producer = new FlinkKafkaProducer<Tuple2<String, Long>>(
                sinkTopic,
                new KeyedSerializationSchemaWrapper<Tuple2<String, Long>>(new KafkaWithTsMsgSchema()),
                p,
                FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);

        // 将数据写入Kafka指定Topic中
        result.addSink(producer);

        // 执行job
        env.execute("Kafka With Event-time Example");
    }
}

测试运行如下:
image

简单解释一下,我们输入数如下:

Msg Watermark
E#1000000 1000000
A#3000000 3000000
B#5000000 5000000
C#5000100 5000100
E#5000120 5000120
A#7000000 7000000

我们看的5000000~7000000之间的数据,其中B#5000000, C#5000100E#5000120是同一个窗口的内容。计算MAX值,按字符串比较,最大的消息就是输出的E#5000120

Kafka携带Timestamps

在Kafka-0.10+ 消息可以携带timestamps,也就是说不用单独的在msg中显示添加一个数据列作为timestamps。只有在写入和读取都用Flink时候简单一些。一般情况用上面的示例方式已经足够了。

小结

本篇重点是向大家介绍Kafka如何在Flink中进行应用,开篇介绍了Kafka的简单安装和收发消息的命令演示,然后以一个简单的数据提取和一个Event-time的窗口示例让大家直观的感受如何在Apache Flink中使用Kafka。

关于点赞和评论

本系列文章难免有很多缺陷和不足,真诚希望读者对有收获的篇章给予点赞鼓励,对有不足的篇章给予反馈和建议,先行感谢大家!

目录
相关文章
|
1月前
|
存储 人工智能 大数据
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
334 33
The Past, Present and Future of Apache Flink
|
2月前
|
消息中间件 安全 Kafka
Apache Kafka安全加固指南:保护你的消息传递系统
【10月更文挑战第24天】在现代企业环境中,数据的安全性和隐私保护至关重要。Apache Kafka作为一款广泛使用的分布式流处理平台,其安全性直接影响着业务的稳定性和用户数据的安全。作为一名资深的Kafka使用者,我深知加强Kafka安全性的重要性。本文将从个人角度出发,分享我在实践中积累的经验,帮助读者了解如何有效地保护Kafka消息传递系统的安全性。
148 7
|
2月前
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
109 5
|
2月前
|
消息中间件 存储 监控
构建高可用性Apache Kafka集群:从理论到实践
【10月更文挑战第24天】随着大数据时代的到来,数据传输与处理的需求日益增长。Apache Kafka作为一个高性能的消息队列服务,因其出色的吞吐量、可扩展性和容错能力而受到广泛欢迎。然而,在构建大规模生产环境下的Kafka集群时,保证其高可用性是至关重要的。本文将从个人实践经验出发,详细介绍如何构建一个高可用性的Kafka集群,包括集群规划、节点配置以及故障恢复机制等方面。
123 4
|
2月前
|
消息中间件 监控 大数据
优化Apache Kafka性能:最佳实践与调优策略
【10月更文挑战第24天】作为一名已经对Apache Kafka有所了解并有实际使用经验的开发者,我深知在大数据处理和实时数据流传输中,Kafka的重要性不言而喻。然而,在面对日益增长的数据量和业务需求时,如何保证系统的高性能和稳定性成为了摆在我们面前的一个挑战。本文将从我的个人视角出发,分享一些关于如何通过合理的配置和调优来提高Kafka性能的经验和建议。
109 4
|
2月前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
86 5
|
2月前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
71 1
|
2月前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
108 2
|
2月前
|
消息中间件 Ubuntu Java
Ubuntu系统上安装Apache Kafka
Ubuntu系统上安装Apache Kafka
|
2月前
|
消息中间件 监控 Kafka
Apache Kafka 成为处理实时数据流的关键组件。Kafka Manager 提供了一个简洁的 Web 界面
随着大数据技术的发展,Apache Kafka 成为处理实时数据流的关键组件。Kafka Manager 提供了一个简洁的 Web 界面,方便管理和监控 Kafka 集群。本文详细介绍了 Kafka Manager 的部署步骤和基本使用方法,包括配置文件的修改、启动命令、API 示例代码等,帮助你快速上手并有效管理 Kafka 集群。
65 0

相关产品

  • 实时计算 Flink版
  • 推荐镜像

    更多