kafkaStream处理实时流式计算

简介: kafkaStream处理实时流式计算

1 实时流式计算

1.1 概念

一般流式计算会与批量计算相比较。在流式计算模型中,输入是持续的,可以认为在时间上是无界的,也就意味着,永远拿不到全量数据去做计算。同时,计算结果是持续输出的,也即计算结果在时间上也是无界的。流式计算一般对实时性要求较高,同时一般是先定义目标计算,然后数据到来之后将计算逻辑应用于数据。同时为了提高计算效率,往往尽可能采用增量计算代替全量计算。


20210405232633938.png


流式计算就相当于上图的右侧扶梯,是可以源源不断的产生数据,源源不断的接收数据,没有边界。


1.2 应用场景

日志分析

网站的用户访问日志进行实时的分析,计算访问量,用户画像,留存率等等,实时的进行数据分析,帮助企业进行决策


大屏看板统计

可以实时的查看网站注册数量,订单数量,购买数量,金额等。


公交实时数据

可以随时更新公交车方位,计算多久到达站牌等


实时文章分值计算

头条类文章的分值计算,通过用户的行为实时文章的分值,分值越高就越被推荐。


1.3 技术方案选型

Hadoop


Apche Storm


Storm 是一个分布式实时大数据处理系统,可以帮助我们方便地处理海量数据,具有高可靠、高容错、高扩展的特点。是流式框架,有很高的数据吞吐能力。


Kafka Stream

可以轻松地将其嵌入任何Java应用程序中,并与用户为其流应用程序所拥有的任何现有打包,部署和操作工具集成。


2 Kafka Stream

2.1 概述

Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature。它是提供了对存储于Kafka内的数据进行流式处理和分析的功能。


Kafka Stream的特点如下:


Kafka Stream提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署

除了Kafka外,无任何外部依赖

充分利用Kafka分区机制实现水平扩展和顺序性保证

通过可容错的state store实现高效的状态操作(如windowed join和aggregation)

支持正好一次处理语义

提供记录级的处理能力,从而实现毫秒级的低延迟

支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)

同时提供底层的处理原语Processor(类似于Storm的spout和bolt),以及高层抽象的DSL(类似于Spark的map/group/reduce)

2.2 Kafka Streams的关键概念

(1)Stream处理拓扑


流是Kafka Stream提出的最重要的抽象概念:它表示一个无限的,不断更新的数据集。流是一个有序的,可重放(反复的使用),不可变的容错序列,数据记录的格式是键值对(key-value)。

通过Kafka Streams编写一个或多个的计算逻辑的处理器拓扑。其中处理器拓扑是一个由流(边缘)连接的流处理(节点)的图。

流处理器是处理器拓扑中的一个节点;它表示一个处理的步骤,用来转换流中的数据(从拓扑中的上游处理器一次接受一个输入消息,并且随后产生一个或多个输出消息到其下游处理器中)。

(2)在拓扑中有两个特别的处理器:


源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器。


Sink处理器:sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题。


2.3 KStream&KTable

(1)数据结构类似于map,如下图,key-value键值对


(2)KStream


KStream数据流(data stream),即是一段顺序的,可以无限长,不断更新的数据集。

数据流中比较常记录的是事件,这些事件可以是一次鼠标点击(click),一次交易,或是传感器记录的位置数据。


KStream负责抽象的,就是数据流。与Kafka自身topic中的数据一样,类似日志,每一次操作都是向其中插入(insert)新数据。


为了说明这一点,让我们想象一下以下两个数据记录正在发送到流中:


(“ alice”,1)->("alice“,3)


如果您的流处理应用是要总结每个用户的价值,它将返回4了alice。为什么?因为第二条数据记录将不被视为先前记录的更新。(insert)新数据


(3)KTable


KTable传统数据库,包含了各种存储了大量状态(state)的表格。KTable负责抽象的,就是表状数据。每一次操作,都是更新插入(update)


为了说明这一点,让我们想象一下以下两个数据记录正在发送到流中:


(“ alice”,1)->(“” alice“,3)


如果您的流处理应用是要总结每个用户的价值,它将返回3了alice。为什么?因为第二条数据记录将被视为先前记录的更新。


KStream - 每个新数据都包含了部分信息。


KTable - 每次更新都合并到原记录上。


2.4 Kafka Stream入门案例编写

(1)引入依赖


在之前的kafka-demo工程的pom文件中引入

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>${kafka.client.version}</version>
    <exclusions>
        <exclusion>
            <artifactId>connect-json</artifactId>
            <groupId>org.apache.kafka</groupId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
    </exclusions>
</dependency>


(2)创建类

package com.oldlu.kafka.simple;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.util.Arrays;
import java.util.Properties;
/**
 * 需求:
 *  接收kafka消息内容并计算消息内单词的个数
 *  如:
 *      hello kafka stareams
 *      hello oldlu kafka
 *      hello beijing oldlu kafka
 *
 *  结果:
 *      hello  3
 *      kafka 3
 *      streams 1
 *      oldlu 2
 *      beijing 1
 */
public class KafkaStreamFastStart {
    public static void main(String[] args) {
        //kafka配置信息
        Properties prop = new Properties();
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
        prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-faststart");
        //stream构建器
        StreamsBuilder builder = new StreamsBuilder();
        //流式计算
        group(builder);
        //创建kafkaStream
        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(),prop);
        //开启kafka流计算
        kafkaStreams.start();
    }
    /**
     * 实时流式计算
     * @param builder
     */
    private static void group(StreamsBuilder builder) {
        //接收上游处理器的消息
        KStream<String, String> stream = builder.stream("input_topic");
        KStream<String, String> map = stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            /**
             * 把消息中的词组,转换为一个一个的单词放到集合中
             * @param value
             * @return
             */
            @Override
            public Iterable<String> apply(String value) {
                return Arrays.asList(value.split(" "));
            }
        }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
            /**
             * 把消息的key,重新赋值,目前消息的key,就是一个个的单词,把单词作为key进行聚合
             * @param key
             * @param value
             * @return
             */
            @Override
            public KeyValue<String, String> apply(String key, String value) {
                return new KeyValue<>(value, value);
            }
        })
                //根据key进行分组  目前的key 就是value,就是一个个的单词
                .groupByKey()
                //聚合的时间窗口  多久聚合一次
                .windowedBy(TimeWindows.of(10000))
                //聚合  求单词的个数,调用count后,消息的vlaue是聚合单词后的统计数值  是一个long类型
                //Materialized.as("count-article-num-001")  是当前消息的状态值,不重复即可
                .count(Materialized.as("count-article-num-001"))
                //转换成 Kstream
                .toStream()
                //把处理后的key和value转成string
                .map((key, value) -> {
                    return new KeyValue<>(key.key().toString(), value.toString());
                });
        //处理后的结果,发送给下游处理器
        map.to("out_topic");
    }
}

(3)测试


准备


使用生产者在topic为:input_topic中发送多条消息

使用消费者接收topic为:out_topic

①生产者代码修改ProducerFastStart

package com.oldlu.kafka.simple;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.protocol.types.Field;
import java.util.Properties;
/**
 * 消息生产者
 */
public class ProducerFastStart {
    private static final String INPUT_TOPIC="input_topic";
    public static void main(String[] args) {
        //添加kafka的配置信息
        Properties properties = new Properties();
        //配置broker信息
        properties.put("bootstrap.servers","192.168.200.130:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.RETRIES_CONFIG,10);
        //生产者对象
        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);
        try {
            //封装消息
            for (int i = 0; i < 10; i++) {
                if (i % 2 == 0) {
                    ProducerRecord<String,String> record =
                            new ProducerRecord<String, String>(INPUT_TOPIC,i+"","hello shanghai kafka stream hello");
                    //发送消息
                    producer.send(record);
                    System.out.println("发送消息:"+record);
                }else {
                    ProducerRecord<String,String> record =
                            new ProducerRecord<String, String>(INPUT_TOPIC,i+"","helloworld kafka stream");
                    //发送消息
                    producer.send(record);
                    System.out.println("发送消息:"+record);
                }
            }
        }catch (Exception e){
            e.printStackTrace();
        }
        //关系消息通道
        producer.close();
    }
}

②消费者ConsumerFastStart


package com.oldlu.kafka.simple;
import com.sun.scenario.effect.Offset;
import jdk.nashorn.internal.runtime.logging.Logger;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
/**
 * 消息消费者
 */
public class ConsumerFastStart {
    private static final String OUT_TOPIC="out_topic";
    public static void main(String[] args) {
        //添加配置信息
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        //设置分组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        //创建消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        //订阅主题
        consumer.subscribe(Collections.singletonList(OUT_TOPIC));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.key()+":"+record.value());
                /*try {
                    //手动提交偏移量
                    consumer.commitSync();
                }catch (CommitFailedException e){
                    e.printStackTrace();
                    System.out.println("记录错误信息为:"+e);
                }*/
            }
            consumer.commitAsync(new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
                    if(e!=null){
                        System.out.println("记录当前错误信息时提交的偏移量"+map+",异常信息为:"+e);
                    }
                }
            });
        }
    }
}

结果:


通过流式计算,会把生产者的多条消息汇总成一条发送到消费者中输出

2.5 SpringBoot集成Kafka Stream

从资料文件夹中把提供好的4个类拷贝到项目的config目录下


当前kafka-demo项目需要添加lombok的依赖包

<properties>
    <lombok.version>1.18.8</lombok.version>
</properties>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>${lombok.version}</version>
    <scope>provided</scope>
</dependency>

(1)自定配置参数

/**
 * 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数
 */
@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {
    private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;
    private String hosts;
    private String group;
    /**
     * 重新定义默认的KafkaStreams配置属性,包括:
     * 1、服务器地址
     * 2、应用ID
     * 3、流消息的副本数等配置
     * @return
     */
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");
        props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");
        props.put(StreamsConfig.RETRIES_CONFIG, 10);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // 消息副本数量
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
        props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, 5_000);
        props.put(StreamsConfig.SEND_BUFFER_CONFIG, 3*MAX_MESSAGE_SIZE);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, Topology.AutoOffsetReset.EARLIEST.name().toLowerCase());
        return new KafkaStreamsConfiguration(props);
    }
}

修改application.yml文件,在最下方添加自定义配置

kafka:
  hosts: 192.168.200.130:9092
  group: ${spring.application.name}

(2)定义监听接口

/**
 * 流数据的监听消费者实现的接口类,系统自动会通过
 * KafkaStreamListenerFactory类扫描项目中实现该接口的类,
 * 并注册为流数据的消费端。
 *
 * 其中泛型可是KStream或KTable
 * @param <T>
 */
public interface KafkaStreamListener<T> {
    // 监听的类型
    String listenerTopic();
    // 处理结果发送的类
    String sendTopic();
    // 对象处理逻辑
    T getService(T stream);
}

(3)KafkaStream自动处理包装类


/**
 * KafkaStream自动处理包装类
 */
public class KafkaStreamProcessor {
    // 流构建器
    StreamsBuilder streamsBuilder;
    private String type;
    KafkaStreamListener listener;
    public KafkaStreamProcessor(StreamsBuilder streamsBuilder,KafkaStreamListener kafkaStreamListener){
        this.streamsBuilder = streamsBuilder;
        this.listener = kafkaStreamListener;
        this.parseType();
        Assert.notNull(this.type,"Kafka Stream 监听器只支持kstream、ktable,当前类型是"+this.type);
    }
    /**
     * 通过泛型类型自动注册对应类型的流处理器对象
     * 支持KStream、KTable
     * @return
     */
    public Object doAction(){
        if("kstream".equals(this.type)) {
            KStream<?, ?> stream = streamsBuilder.stream(listener.listenerTopic(), Consumed.with(Topology.AutoOffsetReset.LATEST));
            stream=(KStream)listener.getService(stream);
            stream.to(listener.sendTopic());
            return stream;
        }else{
            KTable<?, ?> table = streamsBuilder.table(listener.listenerTopic(), Consumed.with(Topology.AutoOffsetReset.LATEST));
            table = (KTable)listener.getService(table);
            table.toStream().to(listener.sendTopic());
            return table;
        }
    }
    /**
     * 解析传入listener类的泛型类
     */
    private void parseType(){
        Type[] types = listener.getClass().getGenericInterfaces();
        if(types!=null){
            for (int i = 0; i < types.length; i++) {
                if( types[i] instanceof ParameterizedType){
                    ParameterizedType t = (ParameterizedType)types[i];
                    String name = t.getActualTypeArguments()[0].getTypeName().toLowerCase();
                    if(name.contains("org.apache.kafka.streams.kstream.kstream")||name.contains("org.apache.kafka.streams.kstream.ktable")){
                        this.type = name.substring(0,name.indexOf('<')).replace("org.apache.kafka.streams.kstream.","").trim();
                        break;
                    }
                }
            }
        }
    }
}

(4)KafkaStreamListener扫描和实例化成KafkaStreamProcessor.doAction的返回类,完成监听器实际注册的过程

@Component
public class KafkaStreamListenerFactory implements InitializingBean {
    Logger logger = LoggerFactory.getLogger(KafkaStreamListenerFactory.class);
    @Autowired
    DefaultListableBeanFactory defaultListableBeanFactory;
    /**
     * 初始化完成后自动调用
     */
    @Override
    public void afterPropertiesSet() {
        Map<String, KafkaStreamListener> map = defaultListableBeanFactory.getBeansOfType(KafkaStreamListener.class);
        for (String key : map.keySet()) {
            KafkaStreamListener k = map.get(key);
            KafkaStreamProcessor processor = new KafkaStreamProcessor(defaultListableBeanFactory.getBean(StreamsBuilder.class),k);
            String beanName = k.getClass().getSimpleName()+"AutoProcessor" ;
            //注册baen,并且执行doAction方法
            defaultListableBeanFactory.registerSingleton(beanName,processor.doAction());
            logger.info("add kafka stream auto listener [{}]",beanName);
        }
    }
}

(5)手动创建监听器


1,该类需要实现KafkaStreamListener接口


2,listenerTopic方法返回需要监听的topic


3,sendTopic方法返回需要处理完后发送的topic


4,getService方法,主要处理流数据

package com.oldlu.kafka.stream.listerer;
import com.oldlu.kafka.config.KafkaStreamListener;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.*;
import org.springframework.stereotype.Component;
import java.util.Arrays;
/**
 * KafkaStreamListener的泛型是固定的,有两种泛型可以选择
 * KTable
 * KStream
 */
@Component
public class StreamHandlerListener implements KafkaStreamListener<KStream<String,String>> {
    /**
     * 在哪里接收消息  源处理器
     * @return
     */
    @Override
    public String listenerTopic() {
        return "input_topic";
    }
    /**
     * 计算完成后的结果发送到什么位置  下游处理器
     * @return
     */
    @Override
    public String sendTopic() {
        return "out_topic";
    }
    @Override
    public KStream<String, String> getService(KStream<String, String> stream) {
        //计算
        return stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            /**
             * 把消息中的词组,转换为一个一个的单词放到集合中
             * @param value
             * @return
             */
            @Override
            public Iterable<String> apply(String value) {
                System.out.println("消息的value:"+value);//hello kafka stareams
                String[] strings = value.split(" ");
                return Arrays.asList(strings);
            }
        }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
            /**
             * 把消息的key,重新赋值,目前消息的key就是单词
             * @param key
             * @param value
             * @return
             */
            @Override
            public KeyValue<String, String> apply(String key, String value) {
                return new KeyValue<>(value,value);
            }
        }).groupByKey()
                //时间聚合窗口
                .windowedBy(TimeWindows.of(5000))
                //消息的value就是聚合单词后的统计数值,long类型
                .count(Materialized.as("count-word-num-0001"))
                //转换为Kstream
                .toStream()
                //把处理后的key和value转换String
                .map((key,value)->{
                    return new KeyValue<>(key.key().toString(),value.toString());
                });
    }
}

测试:


启动微服务,正常发送消息,可以正常接收到消息


目录
相关文章
|
3月前
|
传感器 监控 Java
流计算中的数据延迟是什么?为什么它在流计算中很重要?
流计算中的数据延迟是什么?为什么它在流计算中很重要?
115 0
|
6月前
|
传感器 数据采集 监控
实时数仓的应用
实时数仓的应用
67 1
|
6月前
|
传感器 数据采集 监控
实时数仓的特点
实时数仓的特点
82 0
|
SQL 分布式计算 监控
漫谈实时数仓
漫谈实时数仓
328 0
漫谈实时数仓
|
SQL 数据采集 缓存
【实时数仓篇】(01)美团 Flink 实时数仓应用2
【实时数仓篇】(01)美团 Flink 实时数仓应用2
288 0
【实时数仓篇】(01)美团 Flink 实时数仓应用2
|
存储 消息中间件 缓存
【实时数仓篇】(01)美团 Flink 实时数仓应用1
【实时数仓篇】(01)美团 Flink 实时数仓应用1
427 0
【实时数仓篇】(01)美团 Flink 实时数仓应用1
|
消息中间件 机器学习/深度学习 分布式计算
什么是实时流式计算?
实时流式计算,也就是RealTime,Streaming,Analyse,在不同的领域有不同的定义,这里我们说的是大数据领域的实时流式计算。 实时流式计算,或者是实时计算,流式计算,在大数据领域都是差不多的概念。那么,到底什么是实时流式计算呢?
831 1
什么是实时流式计算?
|
存储 测试技术 数据处理
流计算
在过去几年里,已为流计算开发了数百项应用。下面简单介绍了一些应用,着重强调了流计算支持的使用类型。面对日益增长的海量数据,精细化营销的挑战.通常电信行业会面临海量数据,异构数据,实时主动分析等挑战.
510 0
流计算
|
存储 消息中间件 监控
Flink 在快手实时多维分析场景的应用
作为短视频分享跟直播的平台,快手有诸多业务场景应用了 Flink,包括短视频、直播的质量监控、用户增长分析、实时数据处理、直播 CDN 调度等。此次主要介绍在快手使用 Flink 在实时多维分析场景的应用与优化。
Flink 在快手实时多维分析场景的应用
|
消息中间件 存储 供应链
如果你也想做实时数仓…
数据仓库也是公司数据发展到一定规模后必然会提供的一种基础服务,数据仓库的建设也是“数据智能”中必不可少的一环。本文将从数据仓库的简介、经历了怎样的发展、如何建设、架构演变、应用案例以及实时数仓与离线数仓的对比六个方面全面分享关于数仓的详细内容。
3026 0