基本概念
ksqlDB Server
ksqlDB是事件流数据库,是一种特殊的数据库,基于Kafka的实时数据流处理引擎,提供了强大且易用的SQL交互方式来对Kafka数据流进行处理,而无需编写代码。KSQL具备高扩展、高弹性、容错式等优良特性,并且它提供了大范围的流式处理操作,比如数据过滤、转化、聚合、连接join、窗口化和 Sessionization (即捕获单一会话期间的所有的流事件)等。
ksqlDB CLI
KSQL命令行界面(CLI)以交互方式编写KSQL查询。 KSQL CLI充当KSQL Server的客户端。
事件(Event)
ksqlDB旨在通过使用较低级别的流处理器来提高抽象度。通常,一个事件称为“行”,就像它是关系数据库中的一行一样。
流(Stream)
流代表是一系列历史数据的分区的,不可变的,仅可以追加的集合。一旦将一行插入流中,就无法更改。可以在流的末尾添加新行,但是永远不能更新或者删除现有的行。 每一行数据存储在特定的分区中,每行隐式或显式地拥有一个代表其身份的键,具有相同键的所有行都位于同一分区中。
表(Table)
表是可变的、分区的集合,它的内容会随时间而变化。 流表示事件的历史序列,与之相反,表表示目前的真实情况。表通过利用每一行的键来工作。如果一个行序列共享一个键,那么给定键的最后一行表示该键标识的最新信息,后台进程定期运行并删除除最新行以外的所有行。
举例说明假设用户Alice和Bob刚开始分别有200美元和100美元,经过了以下一系列交易:
- Alice转给Bob 100美元。
- Bob转给Alice 50美元。
- Bob转给Alice 100美元。
在例子中Stream表示资金从一个账号转移到另一个账号的历史记录,Table反映了每个用户账号的最新状态。因此我们得出结论:Table将具有账户的当前状态,而Stream将捕获交易记录。
Stream可以看作是Table的变更日志,因为随着时间的推移更新Stream的聚合会产生一个表。 可以将某个Table在某个时间点视为Stream中每个键的最新值的快照(流的数据记录是键值对),观察Table随时间的变化会产生一个Stream。
Docker部署ksqlDB
创建docker-compose.yaml文件,包含ksqlDB Server和ksqlDB Cli:
--- version: '2' services: ksqldb-server: image: confluentinc/ksqldb-server:0.15.0 hostname: ksqldb-server container_name: ksqldb-server ports: - "8088:8088" environment: KSQL_LISTENERS: http://0.0.0.0:8088 KSQL_BOOTSTRAP_SERVERS: 192.168.1.87:9092 #要连接的kafka集群的地址 KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true" KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true" ksqldb-cli: image: confluentinc/ksqldb-cli:0.15.0 container_name: ksqldb-cli depends_on: - ksqldb-server entrypoint: /bin/sh tty: true
通过docker-compose up -d
命令启动,然后用下面命令连接ksql:
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088 OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release. =========================================== = _ _ ____ ____ = = | | _____ __ _| | _ \| __ ) = = | |/ / __|/ _` | | | | | _ \ = = | <\__ \ (_| | | |_| | |_) | = = |_|\_\___/\__, |_|____/|____/ = = |_| = = Event Streaming Database purpose-built = = for stream processing apps = =========================================== Copyright 2017-2020 Confluent Inc. CLI v0.15.0, Server v0.15.0 located at http://ksqldb-server:8088 Server Status: RUNNING Having trouble? Type 'help' (case-insensitive) for a rundown of how things work! ksql>
Producer代码
package tuling.kafkaDemo; import com.alibaba.fastjson.JSON; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; public class MsgProducer { private final static String TOPIC_NAME = "cr7-topic"; public static void main(String[] args) throws InterruptedException, ExecutionException { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092,kafka3:9092"); // props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.87:9092,192.168.1.88:9092,192.168.1.89:9092"); // props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "11.8.36.125:9092,11.8.38.116:9092,11.8.38.120:9092"); /* 发出消息持久化机制参数 (1)acks=0: 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消息。 (2)acks=1: 至少要等待leader已经成功将数据写入本地log,但是不需要等待所有follower是否成功写入。就可以继续发送下一 条消息。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。 (3)acks=-1或all: 需要等待 min.insync.replicas(默认为1,推荐配置大于等于2) 这个参数配置的副本个数都成功写入日志,这种策略 会保证只要有一个备份存活就不会丢失数据。这是最强的数据保证。一般除非是金融级别,或跟钱打交道的场景才会使用这种配置。 */ props.put(ProducerConfig.ACKS_CONFIG, "1"); /* 发送失败会重试,默认重试间隔100ms,重试能保证消息发送的可靠性,但是也可能造成消息重复发送,比如网络抖动,所以需要在 接收者那边做好消息接收的幂等性处理 */ props.put(ProducerConfig.RETRIES_CONFIG, 3); //重试间隔设置 props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300); //设置发送消息的本地缓冲区,如果设置了该缓冲区,消息会先发送到本地缓冲区,可以提高消息发送性能,默认值是33554432,即32MB props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); /* kafka本地线程会从缓冲区取数据,批量发送到broker, 设置批量发送消息的大小,默认值是16384,即16kb,就是说一个batch满了16kb就发送出去 */ props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); /* 默认值是0,意思就是消息必须立即被发送,但这样会影响性能 一般设置10毫秒左右,就是说这个消息发送完后会进入本地的一个batch,如果10毫秒内,这个batch满了16kb就会随batch一起被发送出去 如果10毫秒内,batch没满,那么也必须把消息发送出去,不能让消息的发送延迟时间太长 */ props.put(ProducerConfig.LINGER_MS_CONFIG, 10); //把发送消息的key从字符串序列化为字节数组 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //把发送消息value从字符串序列化为字节数组 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //创建Kafka消费者实例 Producer<String, String> producer = new KafkaProducer<String, String>(props); int msgNum = 50; final CountDownLatch countDownLatch = new CountDownLatch(msgNum); for (int i = 1; i <= msgNum; i++) { Order order = new Order(i, 100 + i, 1, 1000.00); //指定发送分区 /*ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME , 0, order.getOrderId().toString(), JSON.toJSONString(order));*/ //未指定发送分区,具体发送的分区计算公式:hash(key)%partitionNum ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME , order.getOrderId().toString(), JSON.toJSONString(order)); //等待消息发送成功的同步阻塞方法 // RecordMetadata metadata = producer.send(producerRecord).get(); // System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" // + metadata.partition() + "|offset-" + metadata.offset()); //异步回调方式发送消息 producer.send(producerRecord, new Callback() { public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null) { System.err.println("发送消息失败:" + exception.getStackTrace()); } if (metadata != null) { System.out.println("异步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset()); } // CountDownLatch能够使一个线程在等待另外一些线程完成各自工作之后,再继续执行。使用一个计数器进行实现。计数器初始值为线程的数量。 // 当每一个线程完成自己任务后,计数器的值就会减一。 countDownLatch.countDown(); } }); //异步应用场景送积分TODO } countDownLatch.await(5, TimeUnit.SECONDS); //当计数器的值为0时,表示所有的线程都已经完成一些任务,然后在CountDownLatch上等待的线程就可以恢复执行接下来的任务。 producer.close(); //所有生产者线程完成任务后,主线程关闭和kafka broker的连接 } }
Producer会以如下Json格式向Kafka Broker发送数据:
生产者会以如下Json格式 {"orderAmount":1000,"orderId":2,"productId":102,"productNum":1}
打印Topic数据
ksql> PRINT 'cr7-topic' FROM BEGINNING limit 5; Key format: JSON or KAFKA_STRING Value format: JSON or KAFKA_STRING rowtime: 2021/02/27 16:11:46.239 Z, key: 2, value: {"orderAmount":1000,"orderId":2,"productId":102,"productNum":1}, partition: 2 rowtime: 2021/02/27 16:11:46.239 Z, key: 3, value: {"orderAmount":1000,"orderId":3,"productId":103,"productNum":1}, partition: 2 rowtime: 2021/02/27 16:11:46.240 Z, key: 9, value: {"orderAmount":1000,"orderId":9,"productId":109,"productNum":1}, partition: 2 rowtime: 2021/02/27 16:11:46.241 Z, key: 16, value: {"orderAmount":1000,"orderId":16,"productId":116,"productNum":1}, partition: 2 rowtime: 2021/02/27 16:11:46.241 Z, key: 29, value: {"orderAmount":1000,"orderId":29,"productId":129,"productNum":1}, partition: 2
创建Stream
基于名为cr7-topic的topic创建一个Stream,注意Stream的名字不能有-
:
ksql> CREATE STREAM cr7_topic_stream ( orderAmount INTEGER, orderId INTEGER, productId INTEGER, productNum INTEGER) WITH (kafka_topic='cr7-topic',value_format='json'); Message ---------------- Stream created ----------------
列出所有Stream
ksql> list streams; Stream Name | Kafka Topic | Key Format | Value Format | Windowed ------------------------------------------------------------------------------------------ CR7_TOPIC_STREAM | cr7-topic | KAFKA | JSON | false
查询Stream数据
运行Producer程序,可以看到会持续输出数据:
ksql> select * from CR7_TOPIC_STREAM EMIT CHANGES; +---------------------------+---------------------------+---------------------------+---------------------------+ |ORDERAMOUNT |ORDERID |PRODUCTID |PRODUCTNUM | +---------------------------+---------------------------+---------------------------+---------------------------+ |1000 |4 |104 |1 | |1000 |6 |106 |1 | |1000 |10 |110 |1 | |1000 |12 |112 |1 | |1000 |13 |113 |1 | |1000 |14 |114 |1 | |1000 |18 |118 |1 | |1000 |19 |119 |1 | |1000 |20 |120 |1 | |1000 |24 |124 |1 | |1000 |26 |126 |1 | |1000 |31 |131 |1 | |1000 |35 |135 |1 | |1000 |38 |138 |1 | |1000 |39 |139 |1 | |1000 |42 |142 |1 | |1000 |46 |146 |1 | |1000 |1 |101 |1 | |1000 |5 |105 |1 | |1000 |7 |107 |1 | |1000 |8 |108 |1 | |1000 |11 |111 |1 | |1000 |15 |115 |1 | |1000 |17 |117 |1 | |1000 |21 |121 |1 | |1000 |22 |122 |1 | |1000 |23 |123 |1 | |1000 |25 |125 |1 | |1000 |2 |102 |1 | |1000 |3 |103 |1 |
通过Stream创建另一个Stream
将Stream cr7_topic_stream中orderid为单数的数据写入新的Stream s3中:
ksql> CREATE STREAM s3 AS SELECT * FROM cr7_topic_stream WHERE (orderid%2) != 0 EMIT CHANGES;
查看Stream s3,可以看到只有orderid为单数的数据:
ksql> select * from s3 emit changes; +---------------------------+---------------------------+---------------------------+---------------------------+ |ORDERAMOUNT |ORDERID |PRODUCTID |PRODUCTNUM | +---------------------------+---------------------------+---------------------------+---------------------------+ |1000 |1 |101 |1 | |1000 |5 |105 |1 | |1000 |7 |107 |1 | |1000 |11 |111 |1 | |1000 |15 |115 |1 | |1000 |17 |117 |1 | |1000 |21 |121 |1 | |1000 |23 |123 |1 | |1000 |25 |125 |1 | |1000 |27 |127 |1 | |1000 |33 |133 |1 | |1000 |37 |137 |1 | |1000 |43 |143 |1 | |1000 |45 |145 |1 | |1000 |47 |147 |1 | |1000 |13 |113 |1 | |1000 |19 |119 |1 | |1000 |31 |131 |1 | |1000 |35 |135 |1 | |1000 |39 |139 |1 | |1000 |3 |103 |1 |
Stream数据聚合查询
查询Stream cr7_topic_stream中的条目总数和orderamount的总和,并以productnum作为分组:
ksql> SELECT COUNT(*),SUM(orderamount) from cr7_topic_stream GROUP BY productnum EMIT CHANGES; +---------------------------------------------------------+---------------------------------------------------------+ |KSQL_COL_0 |KSQL_COL_1 | +---------------------------------------------------------+---------------------------------------------------------+ |50 |50000
手动往Stream中插入数据
ksql> INSERT INTO cr7_topic_stream (orderId,productNum) values (777,7777);
查看Stream数据结构
ksql> describe cr7_topic_stream; Name : CR7_TOPIC_STREAM Field | Type ----------------------- ORDERAMOUNT | INTEGER ORDERID | INTEGER PRODUCTID | INTEGER PRODUCTNUM | INTEGER ----------------------- For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
将上EXTENDED
参数查看详细信息:
ksql> describe extended cr7_topic_stream; Name : CR7_TOPIC_STREAM Type : STREAM Timestamp field : Not set - using <ROWTIME> Key format : KAFKA Value format : JSON Kafka topic : cr7-topic (partitions: 3, replication: 3) Statement : CREATE STREAM CR7_TOPIC_STREAM (ORDERAMOUNT INTEGER, ORDERID INTEGER, PRODUCTID INTEGER, PRODUCTNUM INTEGER) WITH (KAFKA_TOPIC='cr7-topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON'); Field | Type ----------------------- ORDERAMOUNT | INTEGER ORDERID | INTEGER PRODUCTID | INTEGER PRODUCTNUM | INTEGER ----------------------- Sources that have a DROP constraint on this source -------------------------------------------------- S3 Local runtime statistics ------------------------ (Statistics of the local KSQL server interaction with the Kafka topic cr7-topic)
删除Stream
DROP STREAM cr7_topic_stream;
创建Table
必须要含有主键,主键是Kafka生产者生产消息时指定的key。
ksql> CREATE TABLE cr7_topic_table ( orderAmount INTEGER, orderId INTEGER, productId INTEGER, productNum INTEGER, kafkaProducerKey VARCHAR PRIMARY KEY ) WITH (kafka_topic='cr7-topic',value_format='json');
kafka脚本生产消息指定key的方法:
#以逗号作为key和value的分隔符。 kafka-console-producer.sh --broker-list kafka1:9092 --topic cr7-topic --property parse.key=true --property key.separator=, >mykey,{"orderAmount":1000,"orderId":1,"productId":101,"productNum":1}
查看Table信息
ksql> describe cr7_topic_table; Name : CR7_TOPIC_TABLE Field | Type ---------------------------------------------- ORDERAMOUNT | INTEGER ORDERID | INTEGER (primary key) PRODUCTID | INTEGER PRODUCTNUM | INTEGER ---------------------------------------------- For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>; ksql>
查询Table
ksql> select * from cr7_topic_table emit changes; +---------------------+---------------------+---------------------+---------------------+---------------------+ |KAFKAPRODUCERKEY |ORDERAMOUNT |ORDERID |PRODUCTID |PRODUCTNUM | +---------------------+---------------------+---------------------+---------------------+---------------------+ |1 |1000 |1 |101 |1 | |2 |1000 |2 |102 |2 | |3 |1000 |3 |103 |3 | ...... #当生产者重新生产数据,把Java代码中 #Order order = new Order(i, 100 + i, 1, 1000.00); 修改为 #Order order = new Order(i, 100 + i, 1, 2000.00); #在key值一样的情况下,查cr7_topic_table会是最新的值 |2 |2000 |2 |102 |2 | |3 |2000 |3 |103 |3 | |1 |2000 |1 |101 |1 | ......