Flink实现实时异常登陆监控(两秒内多次登陆失败进行异常行为标记)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: Flink实现实时异常登陆监控(两秒内多次登陆失败进行异常行为标记)

Flink实现异常登陆监控(两秒内多次登陆失败进行异常行为标记)

在大数据处理领域,Apache Flink 是一个流行的开源流处理框架,能够高效处理实时数据流。在这篇博客中,我们将展示如何使用 Apache Flink 从 MySQL 中读取数据并进行实时异常监控处理,最终将结果写回到 MySQL 数据库中的err_login表中。


项目概述

我们的示例程序将会执行以下任务:

从 MySQL 数据库读取用户登录数据。

过滤出特定状态的登录记录。

对这些记录进行时间窗口处理。

将处理结果写回 MySQL 数据库。

依赖环境

在开始之前,请确保你已经安装了以下环境:

pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>EastMoney</artifactId>
    <version>1.0-SNAPSHOT</version>
    <repositories>
        <repository>
            <id>central</id>
            <name>Maven Central Repository</name>
            <url>https://repo.maven.apache.org/maven2</url>
        </repository>
    </repositories>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.14.6</version>
        </dependency>
            <!-- Apache Flink dependencies -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-java-bridge_2.11</artifactId>
                <version>1.14.6</version>
            </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
            <version>1.14.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.14.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>1.14.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala_2.11</artifactId>
            <version>1.14.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.11</artifactId>
            <version>1.14.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.14.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>1.14.6</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-mysql-cdc</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.14.6</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.25</version>
        </dependency>
    </dependencies>

</project>

MySQL 数据库

CREATE TABLE `login_detail` (
  `id` int NOT NULL AUTO_INCREMENT,
  `username` varchar(255) DEFAULT NULL,
  `password` varchar(255) DEFAULT NULL,
  `time` varchar(255) DEFAULT NULL,
  `status` int DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=127 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci 

CREATE TABLE `err_login` (
  `id` int NOT NULL AUTO_INCREMENT,
  `username` varchar(255) DEFAULT NULL,
  `status` int DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=74 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci 

1. 数据模型定义

首先,我们定义了一个简单的 User case class,用于表示从 MySQL 中读取的用户数据。

case class User(id: Int, username: String, password: String, time: String, status: Int)

2.自定义 MySQL 数据源

我们实现了一个自定义的 RichSourceFunction,从 MySQL 数据库中读取数据。该函数会不断地查询数据库,并将新数据发送到 Flink 流中。

class MySQLInsertSource(jdbcUrl: String, username: String, password: String, tableName: String) extends RichSourceFunction[User] {
  @volatile private var isRunning = true
  private var connection: Connection = _
  private var lastMaxTime: String = _

  override def open(parameters: org.apache.flink.configuration.Configuration): Unit = {
    super.open(parameters)
    connection = DriverManager.getConnection(jdbcUrl, username, password)
    // Initial load
    val statement = connection.createStatement()
    val resultSet = statement.executeQuery(s"SELECT * FROM $tableName")
    while (resultSet.next()) {
      val user = User(
        resultSet.getInt("id"),
        resultSet.getString("username"),
        resultSet.getString("password"),
        resultSet.getString("time"),
        resultSet.getInt("status")
      )
      // Update lastMaxTime
      if (lastMaxTime == null || user.time > lastMaxTime) {
        lastMaxTime = user.time
      }
    }
  }

  override def run(ctx: SourceFunction.SourceContext[User]): Unit = {
    val statement = connection.createStatement()
    while (isRunning) {
      val query = s"SELECT * FROM $tableName WHERE time > '$lastMaxTime'"
      val resultSet = statement.executeQuery(query)
      while (resultSet.next()) {
        val user = User(
          resultSet.getInt("id"),
          resultSet.getString("username"),
          resultSet.getString("password"),
          resultSet.getString("time"),
          resultSet.getInt("status")
        )
        ctx.collect(user)
        // Update lastMaxTime
        if (user.time > lastMaxTime) {
          lastMaxTime = user.time
        }
      }
      Thread.sleep(2000) // sleep for 2 seconds
    }
  }

  override def cancel(): Unit = {
    isRunning = false
    if (connection != null) {
      connection.close()
    }
  }
}

变量声明:

isRunning: 用于控制数据源是否继续运行。

connection: 用于连接 MySQL 数据库的 Connection 对象。

lastMaxTime: 记录上次读取数据的最大时间戳,用于增量查询。

open 方法:在数据源启动时初始化数据库连接并进行初始加载,读取全部数据,更新 lastMaxTime。

run 方法:在数据源运行时不断查询数据库,获取新数据并发送到 Flink 流中。每隔2秒执行一次查询,并更新 lastMaxTime。

cancel 方法:在数据源取消时关闭数据库连接。

3. 时间戳分配器和水位线

为了确保事件按时间顺序处理,我们为数据流分配时间戳并生成水位线。

val userStreamWithTimestamps = userStream
  .assignTimestampsAndWatermarks(
    WatermarkStrategy
      .forBoundedOutOfOrderness[User](Duration.ofSeconds(1))
      .withTimestampAssigner(new SerializableTimestampAssigner[User] {
        override def extractTimestamp(element: User, recordTimestamp: Long): Long = {
          val format = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
          val date = format.parse(element.time)
          date.getTime
        }
      })
  )

WatermarkStrategy:定义了水位线生成策略。forBoundedOutOfOrderness 表示允许事件在1秒的乱序范围内到达。


SerializableTimestampAssigner:定义了时间戳提取器,从 User 对象的 time 字段提取时间戳。


4. 数据过滤和窗口处理

我们过滤出 status 为 0 的记录,并对这些记录进行2秒的窗口处理。

val filteredStream = userStreamWithTimestamps.filter(_.status == 0)

val windowedStream = filteredStream
  .keyBy(_.username)
  .timeWindow(Time.seconds(2))
  .process(new WriteToDatabaseFunction(jdbcUrl, username, password))

过滤:filter 操作保留 status 为 0 的记录。(0为登陆失败)

窗口处理:对每个 username 进行2秒的时间窗口处理,并使用自定义的 WriteToDatabaseFunction 进行处理。

5. 窗口处理函数

我们实现了一个 ProcessWindowFunction,在窗口结束时将获取到的异常登陆用户写入 MySQL 数据库。

class WriteToDatabaseFunction(url: String, username: String, password: String) extends ProcessWindowFunction[User, String, String, TimeWindow] {
  val insertSql = "INSERT INTO err_login (username, status) VALUES (?, ?)"

  override def process(key: String, context: Context, elements: Iterable[User], out: Collector[String]): Unit = {
    val allStatusOne = elements.forall(_.status == 0)
    if (allStatusOne) {
      out.collect(s"Username: $key had status 1 for 2 seconds")
      val connection = DriverManager.getConnection(url, username, password)
      val preparedStatement = connection.prepareStatement(insertSql)
      try {
        for (user <- elements) {
          preparedStatement.setString(1, user.username)
          preparedStatement.setInt(2, user.status)
          preparedStatement.addBatch()
        }
        preparedStatement.executeBatch()
      } finally {
        preparedStatement.close()
        connection.close()
      }
    }
  }
}

变量声明:insertSql 为插入错误登录记录的 SQL 语句。

process 方法:

检查窗口内的所有记录 status 是否都为 0。

如果是,打印日志并将记录写入 err_login 表中。

使用批量插入提高效率。

6. 主函数

最后,我们将所有部分组装在一起,并执行 Flink 作业。

object FlinkMySQLExample {
  val jdbcUrl = "jdbc:mysql://localhost:3306/big_data"
  val username = "root"
  val password = "12345678"
  val tableName = "login_detail"

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val mySQLSource = new MySQLInsertSource(jdbcUrl, username, password, tableName)
    val userStream = env.addSource(mySQLSource)

    userStream.print()

    val userStreamWithTimestamps = userStream
      .assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness[User](Duration.ofSeconds(1))
          .withTimestampAssigner(new SerializableTimestampAssigner[User] {
            override def extractTimestamp(element: User, recordTimestamp: Long): Long = {
              val format = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
              val date = format.parse(element.time)
              date.getTime
            }
          })
      )

    val filteredStream = userStreamWithTimestamps.filter(_.status == 0)

    val windowedStream = filteredStream
      .keyBy(_.username)
      .timeWindow(Time.seconds(2))
      .process(new WriteToDatabaseFunction(jdbcUrl, username, password))

    windowedStream.print()

    env.execute("Flink MySQL Example")
  }
}

主函数:

获取 Flink 的执行环境。

添加自定义数据源 MySQLInsertSource,从 MySQL 数据库中读取数据。

将数据流赋予时间戳和水位线。

过滤出 status 为 0 的记录。

对过滤后的记录进行2秒的窗口处理,并将结果写入 MySQL 数据库。

执行 Flink 作业。


7.总结

这段代码展示了如何使用 Apache Flink 处理实时数据流,并与 MySQL 数据库进行交互。通过自定义数据源、时间戳和水位线分配、窗口处理和自定义窗口函数,我们可以构建强大的流处理应用程序。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
4月前
|
SQL 消息中间件 监控
实时计算 Flink版产品使用问题之怎么使用Metric Reporters监控作业
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
资源调度 监控 关系型数据库
实时计算 Flink版操作报错合集之处理大量Join时报错空指针异常,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
实时计算 Flink版操作报错合集之处理大量Join时报错空指针异常,是什么原因
|
4月前
|
消息中间件 监控 Java
联通实时计算平台问题之监控Kafka集群的断传和积压情况要如何操作
联通实时计算平台问题之监控Kafka集群的断传和积压情况要如何操作
|
4月前
|
消息中间件 监控 关系型数据库
实时计算 Flink版产品使用问题之运行后,怎么进行监控和报警
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
机器学习/深度学习 监控 Serverless
Serverless 应用的监控与调试问题之Flink在内部使用的未来规划,以及接下来有什么打算贡献社区的创新技术
Serverless 应用的监控与调试问题之Flink在内部使用的未来规划,以及接下来有什么打算贡献社区的创新技术
|
4月前
|
机器学习/深度学习 监控 大数据
Serverless 应用的监控与调试问题之Flink在整个开源大数据生态中应该如何定位,差异化该如何保持
Serverless 应用的监控与调试问题之Flink在整个开源大数据生态中应该如何定位,差异化该如何保持
|
4月前
|
存储 监控 Serverless
Serverless 应用的监控与调试问题之Pravega和Flink实现端到端的auto-scaling要如何操作
Serverless 应用的监控与调试问题之Pravega和Flink实现端到端的auto-scaling要如何操作
|
4月前
|
SQL 监控 大数据
Serverless 应用的监控与调试问题之Flink流式数仓对于工商银行的数据链路要如何简化
Serverless 应用的监控与调试问题之Flink流式数仓对于工商银行的数据链路要如何简化
|
4月前
|
存储 监控 Cloud Native
Serverless 应用的监控与调试问题之Flink流批一体在架构层面有什么演进
Serverless 应用的监控与调试问题之Flink流批一体在架构层面有什么演进
|
4月前
|
存储 监控 Serverless
Serverless 应用的监控与调试问题之Flink对于Checkpoint Barrier流动缓慢的问题要如何解决
Serverless 应用的监控与调试问题之Flink对于Checkpoint Barrier流动缓慢的问题要如何解决