ksqlDB基本使用

简介: ksqlDB基本使用

基本概念

ksqlDB Server

ksqlDB是事件流数据库,是一种特殊的数据库,基于Kafka的实时数据流处理引擎,提供了强大且易用的SQL交互方式来对Kafka数据流进行处理,而无需编写代码。KSQL具备高扩展、高弹性、容错式等优良特性,并且它提供了大范围的流式处理操作,比如数据过滤、转化、聚合、连接join、窗口化和 Sessionization (即捕获单一会话期间的所有的流事件)等。image.png

ksqlDB CLI

KSQL命令行界面(CLI)以交互方式编写KSQL查询。 KSQL CLI充当KSQL Server的客户端。

事件(Event)

ksqlDB旨在通过使用较低级别的流处理器来提高抽象度。通常,一个事件称为“行”,就像它是关系数据库中的一行一样。

流(Stream)

流代表是一系列历史数据的分区的,不可变的,仅可以追加的集合。一旦将一行插入流中,就无法更改。可以在流的末尾添加新行,但是永远不能更新或者删除现有的行。 每一行数据存储在特定的分区中,每行隐式或显式地拥有一个代表其身份的键,具有相同键的所有行都位于同一分区中。

表(Table)

表是可变的、分区的集合,它的内容会随时间而变化。 流表示事件的历史序列,与之相反,表表示目前的真实情况。表通过利用每一行的键来工作。如果一个行序列共享一个键,那么给定键的最后一行表示该键标识的最新信息,后台进程定期运行并删除除最新行以外的所有行

举例说明image.png假设用户Alice和Bob刚开始分别有200美元和100美元,经过了以下一系列交易:

  • Alice转给Bob 100美元。
  • Bob转给Alice 50美元。
  • Bob转给Alice 100美元。

在例子中Stream表示资金从一个账号转移到另一个账号的历史记录,Table反映了每个用户账号的最新状态。因此我们得出结论:Table将具有账户的当前状态,而Stream将捕获交易记录。

Stream可以看作是Table的变更日志,因为随着时间的推移更新Stream的聚合会产生一个表。 可以将某个Table在某个时间点视为Stream中每个键的最新值的快照(流的数据记录是键值对),观察Table随时间的变化会产生一个Stream。

Docker部署ksqlDB

创建docker-compose.yaml文件,包含ksqlDB Server和ksqlDB Cli:

---
version: '2'
services:
  ksqldb-server:
    image: confluentinc/ksqldb-server:0.15.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: 192.168.1.87:9092  #要连接的kafka集群的地址
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
  ksqldb-cli:
    image: confluentinc/ksqldb-cli:0.15.0
    container_name: ksqldb-cli
    depends_on:
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

通过docker-compose up -d命令启动,然后用下面命令连接ksql:

docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
                  ===========================================
                  =       _              _ ____  ____       =
                  =      | | _____  __ _| |  _ \| __ )      =
                  =      | |/ / __|/ _` | | | | |  _ \      =
                  =      |   <\__ \ (_| | | |_| | |_) |     =
                  =      |_|\_\___/\__, |_|____/|____/      =
                  =                   |_|                   =
                  =  Event Streaming Database purpose-built =
                  =        for stream processing apps       =
                  ===========================================
Copyright 2017-2020 Confluent Inc.
CLI v0.15.0, Server v0.15.0 located at http://ksqldb-server:8088
Server Status: RUNNING
Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
ksql>

Producer代码

package tuling.kafkaDemo;
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class MsgProducer {
    private final static String TOPIC_NAME = "cr7-topic";
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092,kafka3:9092");
//        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.87:9092,192.168.1.88:9092,192.168.1.89:9092");
//        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "11.8.36.125:9092,11.8.38.116:9092,11.8.38.120:9092");
         /*
         发出消息持久化机制参数
        (1)acks=0: 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消息。
        (2)acks=1: 至少要等待leader已经成功将数据写入本地log,但是不需要等待所有follower是否成功写入。就可以继续发送下一
             条消息。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。
        (3)acks=-1或all: 需要等待 min.insync.replicas(默认为1,推荐配置大于等于2) 这个参数配置的副本个数都成功写入日志,这种策略
            会保证只要有一个备份存活就不会丢失数据。这是最强的数据保证。一般除非是金融级别,或跟钱打交道的场景才会使用这种配置。
         */
        props.put(ProducerConfig.ACKS_CONFIG, "1");
         /*
        发送失败会重试,默认重试间隔100ms,重试能保证消息发送的可靠性,但是也可能造成消息重复发送,比如网络抖动,所以需要在
        接收者那边做好消息接收的幂等性处理
        */
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        //重试间隔设置
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
        //设置发送消息的本地缓冲区,如果设置了该缓冲区,消息会先发送到本地缓冲区,可以提高消息发送性能,默认值是33554432,即32MB
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        /*
        kafka本地线程会从缓冲区取数据,批量发送到broker,
        设置批量发送消息的大小,默认值是16384,即16kb,就是说一个batch满了16kb就发送出去
        */
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        /*
        默认值是0,意思就是消息必须立即被发送,但这样会影响性能
        一般设置10毫秒左右,就是说这个消息发送完后会进入本地的一个batch,如果10毫秒内,这个batch满了16kb就会随batch一起被发送出去
        如果10毫秒内,batch没满,那么也必须把消息发送出去,不能让消息的发送延迟时间太长
        */
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        //把发送消息的key从字符串序列化为字节数组
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //把发送消息value从字符串序列化为字节数组
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //创建Kafka消费者实例
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        int msgNum = 50;
        final CountDownLatch countDownLatch = new CountDownLatch(msgNum);
        for (int i = 1; i <= msgNum; i++) {
            Order order = new Order(i, 100 + i, 1, 1000.00);
            //指定发送分区
            /*ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME
                    , 0, order.getOrderId().toString(), JSON.toJSONString(order));*/
            //未指定发送分区,具体发送的分区计算公式:hash(key)%partitionNum
            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME
                    , order.getOrderId().toString(), JSON.toJSONString(order));
            //等待消息发送成功的同步阻塞方法
//            RecordMetadata metadata = producer.send(producerRecord).get();
//            System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-"
//                    + metadata.partition() + "|offset-" + metadata.offset());
            //异步回调方式发送消息
            producer.send(producerRecord, new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        System.err.println("发送消息失败:" + exception.getStackTrace());
                    }
                    if (metadata != null) {
                        System.out.println("异步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-"
                                + metadata.partition() + "|offset-" + metadata.offset());
                    }
                    // CountDownLatch能够使一个线程在等待另外一些线程完成各自工作之后,再继续执行。使用一个计数器进行实现。计数器初始值为线程的数量。
                    // 当每一个线程完成自己任务后,计数器的值就会减一。
                    countDownLatch.countDown();
                }
            });
            //异步应用场景送积分TODO
        }
        countDownLatch.await(5, TimeUnit.SECONDS); //当计数器的值为0时,表示所有的线程都已经完成一些任务,然后在CountDownLatch上等待的线程就可以恢复执行接下来的任务。
        producer.close();  //所有生产者线程完成任务后,主线程关闭和kafka broker的连接
    }
}

Producer会以如下Json格式向Kafka Broker发送数据:

生产者会以如下Json格式
{"orderAmount":1000,"orderId":2,"productId":102,"productNum":1}

打印Topic数据

ksql> PRINT 'cr7-topic' FROM BEGINNING limit 5;
Key format: JSON or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2021/02/27 16:11:46.239 Z, key: 2, value: {"orderAmount":1000,"orderId":2,"productId":102,"productNum":1}, partition: 2
rowtime: 2021/02/27 16:11:46.239 Z, key: 3, value: {"orderAmount":1000,"orderId":3,"productId":103,"productNum":1}, partition: 2
rowtime: 2021/02/27 16:11:46.240 Z, key: 9, value: {"orderAmount":1000,"orderId":9,"productId":109,"productNum":1}, partition: 2
rowtime: 2021/02/27 16:11:46.241 Z, key: 16, value: {"orderAmount":1000,"orderId":16,"productId":116,"productNum":1}, partition: 2
rowtime: 2021/02/27 16:11:46.241 Z, key: 29, value: {"orderAmount":1000,"orderId":29,"productId":129,"productNum":1}, partition: 2

创建Stream

基于名为cr7-topic的topic创建一个Stream,注意Stream的名字不能有-

ksql> CREATE STREAM  cr7_topic_stream ( orderAmount INTEGER, orderId INTEGER, productId INTEGER, productNum INTEGER)
WITH (kafka_topic='cr7-topic',value_format='json');
 Message
----------------
 Stream created
----------------

列出所有Stream

ksql> list streams;
 Stream Name         | Kafka Topic                 | Key Format | Value Format | Windowed
------------------------------------------------------------------------------------------
 CR7_TOPIC_STREAM    | cr7-topic                   | KAFKA      | JSON         | false

查询Stream数据

运行Producer程序,可以看到会持续输出数据:

ksql> select * from  CR7_TOPIC_STREAM EMIT CHANGES;
+---------------------------+---------------------------+---------------------------+---------------------------+
|ORDERAMOUNT                |ORDERID                    |PRODUCTID                  |PRODUCTNUM                 |
+---------------------------+---------------------------+---------------------------+---------------------------+
|1000                       |4                          |104                        |1                          |
|1000                       |6                          |106                        |1                          |
|1000                       |10                         |110                        |1                          |
|1000                       |12                         |112                        |1                          |
|1000                       |13                         |113                        |1                          |
|1000                       |14                         |114                        |1                          |
|1000                       |18                         |118                        |1                          |
|1000                       |19                         |119                        |1                          |
|1000                       |20                         |120                        |1                          |
|1000                       |24                         |124                        |1                          |
|1000                       |26                         |126                        |1                          |
|1000                       |31                         |131                        |1                          |
|1000                       |35                         |135                        |1                          |
|1000                       |38                         |138                        |1                          |
|1000                       |39                         |139                        |1                          |
|1000                       |42                         |142                        |1                          |
|1000                       |46                         |146                        |1                          |
|1000                       |1                          |101                        |1                          |
|1000                       |5                          |105                        |1                          |
|1000                       |7                          |107                        |1                          |
|1000                       |8                          |108                        |1                          |
|1000                       |11                         |111                        |1                          |
|1000                       |15                         |115                        |1                          |
|1000                       |17                         |117                        |1                          |
|1000                       |21                         |121                        |1                          |
|1000                       |22                         |122                        |1                          |
|1000                       |23                         |123                        |1                          |
|1000                       |25                         |125                        |1                          |
|1000                       |2                          |102                        |1                          |
|1000                       |3                          |103                        |1                          |

通过Stream创建另一个Stream

将Stream cr7_topic_stream中orderid为单数的数据写入新的Stream s3中:

ksql> CREATE STREAM s3 AS SELECT * FROM cr7_topic_stream
WHERE (orderid%2) != 0 EMIT CHANGES;

查看Stream s3,可以看到只有orderid为单数的数据:

ksql> select * from s3 emit changes;
+---------------------------+---------------------------+---------------------------+---------------------------+
|ORDERAMOUNT                |ORDERID                    |PRODUCTID                  |PRODUCTNUM                 |
+---------------------------+---------------------------+---------------------------+---------------------------+
|1000                       |1                          |101                        |1                          |
|1000                       |5                          |105                        |1                          |
|1000                       |7                          |107                        |1                          |
|1000                       |11                         |111                        |1                          |
|1000                       |15                         |115                        |1                          |
|1000                       |17                         |117                        |1                          |
|1000                       |21                         |121                        |1                          |
|1000                       |23                         |123                        |1                          |
|1000                       |25                         |125                        |1                          |
|1000                       |27                         |127                        |1                          |
|1000                       |33                         |133                        |1                          |
|1000                       |37                         |137                        |1                          |
|1000                       |43                         |143                        |1                          |
|1000                       |45                         |145                        |1                          |
|1000                       |47                         |147                        |1                          |
|1000                       |13                         |113                        |1                          |
|1000                       |19                         |119                        |1                          |
|1000                       |31                         |131                        |1                          |
|1000                       |35                         |135                        |1                          |
|1000                       |39                         |139                        |1                          |
|1000                       |3                          |103                        |1                          |

Stream数据聚合查询

查询Stream cr7_topic_stream中的条目总数和orderamount的总和,并以productnum作为分组:

ksql> SELECT COUNT(*),SUM(orderamount) from cr7_topic_stream GROUP BY  productnum EMIT CHANGES;
+---------------------------------------------------------+---------------------------------------------------------+
|KSQL_COL_0                                               |KSQL_COL_1                                               |
+---------------------------------------------------------+---------------------------------------------------------+
|50                                                       |50000

手动往Stream中插入数据

ksql> INSERT INTO cr7_topic_stream 
(orderId,productNum)
values (777,7777);

查看Stream数据结构

ksql> describe  cr7_topic_stream;
Name                 : CR7_TOPIC_STREAM
 Field       | Type
-----------------------
 ORDERAMOUNT | INTEGER
 ORDERID     | INTEGER
 PRODUCTID   | INTEGER
 PRODUCTNUM  | INTEGER
-----------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;

将上EXTENDED参数查看详细信息:

ksql> describe extended cr7_topic_stream;
Name                 : CR7_TOPIC_STREAM
Type                 : STREAM
Timestamp field      : Not set - using <ROWTIME>
Key format           : KAFKA
Value format         : JSON
Kafka topic          : cr7-topic (partitions: 3, replication: 3)
Statement            : CREATE STREAM CR7_TOPIC_STREAM (ORDERAMOUNT INTEGER, ORDERID INTEGER, PRODUCTID INTEGER, PRODUCTNUM INTEGER) WITH (KAFKA_TOPIC='cr7-topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');
 Field       | Type
-----------------------
 ORDERAMOUNT | INTEGER
 ORDERID     | INTEGER
 PRODUCTID   | INTEGER
 PRODUCTNUM  | INTEGER
-----------------------
Sources that have a DROP constraint on this source
--------------------------------------------------
S3
Local runtime statistics
------------------------
(Statistics of the local KSQL server interaction with the Kafka topic cr7-topic)

删除Stream

DROP STREAM cr7_topic_stream;

创建Table

必须要含有主键,主键是Kafka生产者生产消息时指定的key。

ksql> CREATE TABLE  cr7_topic_table (
orderAmount INTEGER, orderId INTEGER, productId INTEGER, productNum INTEGER, kafkaProducerKey VARCHAR PRIMARY KEY
)
WITH (kafka_topic='cr7-topic',value_format='json');

kafka脚本生产消息指定key的方法:

#以逗号作为key和value的分隔符。
kafka-console-producer.sh --broker-list kafka1:9092 --topic cr7-topic --property parse.key=true --property key.separator=,
>mykey,{"orderAmount":1000,"orderId":1,"productId":101,"productNum":1}

查看Table信息

ksql> describe cr7_topic_table;
Name                 : CR7_TOPIC_TABLE
 Field       | Type
----------------------------------------------
 ORDERAMOUNT | INTEGER
 ORDERID     | INTEGER          (primary key)
 PRODUCTID   | INTEGER
 PRODUCTNUM  | INTEGER
----------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
ksql>

查询Table

ksql> select * from cr7_topic_table emit changes;
+---------------------+---------------------+---------------------+---------------------+---------------------+
|KAFKAPRODUCERKEY     |ORDERAMOUNT          |ORDERID              |PRODUCTID            |PRODUCTNUM           |
+---------------------+---------------------+---------------------+---------------------+---------------------+
|1                    |1000                 |1                    |101                  |1                    |
|2                    |1000                 |2                    |102                  |2                    |
|3                    |1000                 |3                    |103                  |3                    |
......
#当生产者重新生产数据,把Java代码中 
#Order order = new Order(i, 100 + i, 1, 1000.00);
修改为
#Order order = new Order(i, 100 + i, 1, 2000.00);
#在key值一样的情况下,查cr7_topic_table会是最新的值
|2                    |2000                 |2                    |102                  |2                    |
|3                    |2000                 |3                    |103                  |3                    |
|1                    |2000                 |1                    |101                  |1                    |
......
目录
相关文章
|
8月前
最新jsonwebtoken-jjwt 0.12.3 基本使用
最新jsonwebtoken-jjwt 0.12.3 基本使用
698 0
|
5月前
|
网络安全 Perl
Nikto——基本使用
Nikto——基本使用
68 0
|
8月前
|
存储 缓存 Java
【scoop】安装及基本使用
【scoop】安装及基本使用
519 0
|
8月前
|
存储 NoSQL 定位技术
RedisGEO的基本使用
对GEO的基本介绍
ApplicationEventPublisher的简单使用
ApplicationEventPublisher的简单使用
448 0
CodeBlock 基本使用
基本介绍 代码块又称为初始化块,属于类中的成员【即是类的一部分,类似于方法,讲逻辑语句封装在方法体中,通过{}包围起来】 和方法不同,没有方法名,没有返回值,没有参数,只有方法体,而且不用通过对象或类显式调用,而是加载类时或创建对象时隐式调用。
159 0
|
Go
基本使用
基本使用
88 0
|
缓存 Shell 索引
ElastchSearch 基本使用姿势
ElastchSearch 基本使用姿势,如常见的 添加文档 常见的查询姿势 修改/删除文档
266 0
ElastchSearch 基本使用姿势
|
定位技术
MapKit的基本使用
MapKit的基本使用
522 0
MapKit的基本使用