KSQL,一个用于Apache Kafka流的SQL 引擎。 KSQL降低了流处理的入口,提供了一个简单而完整的交互式SQL接口,用于处理Kafka中的数据。 不再需要编写Java或Python代码! KSQL是开源的(Apache 2.0许可),分布式,可扩展,可靠且实时。 它支持各种强大的流处理操作,包括聚合,连接,窗口化,会话化等等。
例子
查询流数据意味着什么,与SQL数据库相比较
它实际上与SQL数据库完全不同。 大多数数据库用于按需查找和对存储数据的更改。 KSQL不进行查找(但是),它所做的是连续转换 - 即流处理。 例如,假设我有来自用户的点击流和信息表。 KSQL允许我对这个点击流和用户表进行建模,并将两者结合在一起。 即使这两件事中的一件是无限的。
所以KSQL运行的是连续查询 - 转换速度与它们一样快 - Kafka主题。 相反,对关系数据库的查询是一次性查询
KSQL作用
可以不断地查询无限的数据流,那有什么用?
1. 实时监控与实时分析相结合
CREATE TABLE error_counts AS SELECT error_code, count(*)FROM monitoring_stream WINDOW TUMBLING (SIZE 1 MINUTE) WHERE type = 'ERROR'
它的一个用途是定义定制的业务级度量,这些度量是实时计算的,可以对其进行监视和警报,就像处理CPU负载一样。另一个用途是在KSQL中定义应用程序的正确性概念,并检查它在生产中运行时是否满足这个要求。当我们想到监视时,我们通常会想到计数器和测量器,它们跟踪低级别性能统计数据。这些类型的标尺通常可以告诉你CPU负载很高,但是它们不能真正告诉你应用程序是否正在执行它应该执行的任务。KSQL允许从应用程序生成的原始事件流中定义自定义度量,无论它们是记录事件、数据库更新还是其他类型。
例如,一个web应用程序可能需要检查每次新用户注册一个受欢迎的电子邮件时,一个新的用户记录被创建,他们的信用卡被计费。这些功能可能分布在不同的服务或应用程序上,您可能希望在一些SLA中监视每一个新客户的每一件事情,比如30秒。
2.安全性和异常检测
CREATE TABLE possible_fraud AS SELECT card_number, count(*) FROM authorization_attempts WINDOW TUMBLING (SIZE 5 SECONDS) GROUP BY card_number HAVING count(*) > 3;
这个示例的一个简单版本是在上面的演示中看到的:KSQL查询将事件流转换为数字时间序列聚合,使用Kafka-Elastic连接器将其转换为弹性聚合,并在Grafana UI中进行可视化。安全用例通常看起来很像监视和分析。不是监视应用程序行为或业务行为,而是寻找欺诈、滥用、垃圾邮件、入侵或其他不良行为的模式。KSQL提供了一种简单、复杂和实时的方式来定义这些模式和查询实时流。
3.在线数据集成
CREATE STREAM vip_users AS SELECT userid, page, action FROM clickstream c LEFT JOIN users u ON c.userid = u.user_id WHERE u.level = 'Platinum';
来自数据库,搜索,缓存或其他数据的数据库的数据的数据收集系统。 很长一段时间,ETL - 提取,转换和加载 - 已作为定期批处理作业执行。 例如,实时转储原始数据,然后每隔几小时转换一次,以实现高效查询。 对于许多用例,这种延迟是不可接受的。 KSQL与Kafka连接器一起使用时,可以实现从批量数据集成到在线数据集成的转变。 可以使用流表连接使用存储在表中的元数据来获取丰富的数据流,或者在将流加载到另一个系统之前对PII(个人身份信息)数据进行简单过滤。
4.应用程序开发
许多应用程序将输入流转换为输出流。 例如,负责重新排序在线商店库存不足的产品的流程。
对于用Java编写的更复杂的应用程序,Kafka的本机API流可能就是这样。 但是对于简单的应用程序,或者对Java编程不感兴趣的团队,一个简单的SQL接口可能就是他们想要的。
KSQL的核心抽象
KSQL在内部使用Kafka的API Streams,它们共享相同的核心抽象,用于Kafka上的流处理。 KSQL中有两个可以由Kafka Streams操作的核心抽象,允许操作Kafka主题:
1.流:流是结构化数据的无界序列(“facts”)。 例如,我们可以进行一系列金融交易,例如“爱丽丝给鲍勃闻100美元,然后查理给鲍勃闻50美元”。 流中的事实是不可变的,这意味着可以将新事实插入到流中,但不能更新或删除。 可以从Kafka主题创建流,也可以从现有流和表派生流。
CREATE STREAM pageviews (viewtime BIGINT, userid VARCHAR, pageid VARCHAR) WITH (kafka_topic='pageviews', value_format=’JSON’);
2.表:表是流或另一个TABLE的视图,表示不断变化的事实的集合。 例如,我们可以有一个表格,其中包含最新信息“Bob的当前账户余额为150美元”。 它相当于传统的数据库,但它通过流式语义(如窗口)来丰富。 表中的事实是可变的,这意味着可以将新事实插入表中,并且可以更新或删除现有事实。 可以从Kafka主题创建表,也可以从现有流和表派生表。
CREATE TABLE users (registertime BIGINT, gender VARCHAR, regionid VARCHAR, userid VARCHAR) WITH (kafka_topic='users', value_format='DELIMITED');
KSQL简化了流式应用程序,因为它完全集成了表和流的概念,允许允许join表表示流的当前状态,代表正在发生的event。Apache kafka中的一个主题可以表示为KSQL中的流或表,这取决于主题上的处理的预期语义。例如,如果想将主题中的数据作为一系列独立值读取,则可以使用创建流。这样的流的一个示例是捕获页面视图事件的主题,其中每个页面视图事件是无关的并且独立于另一个。另一方面,如果要将主题中的数据作为可更新的值的集合来读取,则可以使用CREATE表。在KSQL中应该作为一个表读取的主题的一个示例是捕获用户元数据,其中每个事件代表特定用户ID的最新元数据,无论是用户的名称、地址还是首选项。
KSQL实战:实时点击流分析和异常检测
让我们来看一个真正的演示。 该演示展示了如何使用KSQL进行实时监控,异常检测和警报。 点击流数据的实时日志分析可以采用多种形式。 在此示例中,我们标记了在Web服务器上占用过多带宽的恶意用户会话。 监控恶意用户会话是会话化的众多应用之一。 但从广义上讲,会话是用户行为分析的基石。 可以将用户和事件关联到特定的身份识别会话,可以构建多种类型的分析,从简单的指标(如访问次数)到更复杂的指标(如客户转化渠道和事件流)。 我们通过展示如何在由Elastic支持的Grafana仪表板上实时可视化KSQL查询的输出来展示此演示。
相关视频和代码
视频链接:
https://pan.baidu.com/s/1vMdRDwkQPZu4U8rwSpm6PQ 密码:u5k2
代码:
你也可以按照说明
(https://github.com/confluentinc/ksql/tree/0.1.x/ksql-clickstream-demo#clickstream-analysis )写代码
KSQL集群
有一个KSQL服务器进程执行查询。 一组KSQL进程在集群上运行。你可以通过启动KSQL服务器来动态添加更多处理容量。 这些实例是容错的:如果一个失败,其他实例将接管其工作。 使用交互式KSQL命令行客户端启动查询,该客户端通过REST API将命令发送到集群。 命令行允许检查可用的流和表,发出新查询,检查状态和终止运行查询。 内部KSQL使用Kafka的API Streams构建; 它继承了其弹性可扩展性,高级状态管理和容错能力,并支持Kafka最近推出的一次性处理语义。 KSQL服务器具有分布式SQL引擎,并具有用于查询和控制的REST API。
Kafka + KSQL内部与数据库对比
我们已经讨论过将数据库内部化,我们通过在内向外数据库中添加SQL层来实现它。
在关系数据库中,表是核心抽象,日志是实现细节。 在以事件为中心,与数据库相反,核心抽象不是表格; 是日志。 表仅来自日志,并且随着新数据到达日志而连续更新。 日志是kafka,KSQL引擎,允许创建所需的实化视图并将它们表示为连续更新表。 然后,您可以针对此类流表运行时间点查询(即将推出KSQL),以持续的方式获取日志中每个键的最新值。
Kafka日志是流数据的核心存储抽象,允许离线数据仓库使用数据。 其他所有内容都是日志的流媒体物化视图,无论是各种数据库,搜索索引还是公司的其他数据服务系统。 所有数据丰富和ETL都需要使用KSQL以流媒体方式创建。 监控,安全性,异常和威胁检测,分析以及对故障的响应可以实时完成。 所有这些都可用于简单的SQL到Kafka数据。