探索Flink动态CEP:杭州银行的实战案例

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 探索Flink动态CEP:杭州银行的实战案例

本文撰写自杭州银行大数据工程师唐占峰、欧阳武林老师。将介绍 Flink 动态 CEP的定义与核心概念、应用场景、并深入探讨其技术实现并介绍使用方式。主要分为以下几个内容:


  1. Flink动态CEP简介
  2. Flink动态CEP的应用场景
  3. Flink动态CEP的技术实现
  4. Flink动态CEP的使用方式
  5. 杭州银行应用实践


金融行业大数据技术正在进入成熟期,数据的实时性在金融的实时监控和分析交易数据以识别洗钱行为、欺诈行为、和确保合规性是至关重要的。随着业务环境的快速变化,传统的静态规则引擎已经无法满足这些需求,因为它们在规则变更时需要重启服务,这会导致服务中断和延迟响应。我们引入由 Flink 发展过来的 Flink 动态 CEP 作为行内的动态规则引擎,它能够在不中断服务的情况下动态更新规则,适应不断变化的业务需求。

CEP 是复杂事件处理 Complex Event Processing 的缩写,而 Flink CEP 则是基于 Flink 实现的复杂事件处理库,它可以识别出数据流中符合特定模式(Pattern)的事件序列,并允许用户作出针对性处理。Flink 动态 CEP,作为 Flink CEP 的高级功能,进一步扩展了这一能力,它支持在不重启服务的情况下动态更新规则。这种动态性不仅提高了系统的灵活性和响应速度,还大大降低了维护成本和复杂性。


01 Flink 动态 CEP 简介

1. Flink 动态 CEP 的定义和核心概念

Flink 动态 CEP 是 Apache Flink 流处理框架的一个高级功能,它允许通过DataStream(数据流)作业方式运行支持动态规则更新的 Flink CEP 作业,对 数据流进行动态的捕获、清洗和分析。Flink 动态 CEP 做到了基于 Flink 全托管快速构建动态加载最新规则来处理上游的数据流,让用户有机会实时掌握数据中重要的高阶特征。

关键概念

①pattern(模式):模式是规则,也是定义规则的方式。一个模式可以是单例或者循环模式,单例只接受一个事件,循环模式可以接受多个事件。用户可以使用pattern 来识别匹配到的事件。多个 pattern 可以组成复杂模式,我们把由多个pattern 组成的复杂模式序列称为 patternProcessor(模式处理器)。

②事件流:事件流可以来自异构上游,可以是 kafka 数据,也可以是数据库表数据(如交易流水类的实时事件流)。当 Flink 动态 CEP 作业启动后,遇到实际输入事件流,Flink 会尝试识别定义的 patternProcessor 并进行动态匹配,最终得到匹配结果。

③动态匹配:Flink 动态 CEP 会实时识别事件流变化,并不断发送给下游算子,下游算子接收到发送的事件进行解析和反序列化后生成真正使用的 patternProcessor,根据最新的 patternProcessor 定义的规则进行动态匹配。

2. Flink 动态 CEP 解决的问题

Flink CEP 是一种规则引擎,是通过设置规则模式来匹配事件的。而频繁变化的交易、记账场景要求我们对初始规则进行调整或者对规则进行新增。例如一个 CEP 作业初始规则是转账用户在一分钟内连续进行3次转账后将其认为是风险操作。而在特殊场景,预期转账次数会多一点,一分钟3次的转账次数阈值可能不合适,在当前开源 Flink CEP 实现下,没法做到使用户无感的转换,只能重新编写 Java 代码,然后重启作业,以使最新的规则生效。这样的操作带来时间成本较高和重启作业代价高的问题。因为要走一遍完整的代码开发和打包上线流程对于对时间延迟敏感程度高的银行风控领域是难以接受的,且规则引擎里通常会维护很多不同的规则,如果简单的规则修改都需要较长的时间窗口,会影响其他人的使用,维护起来也比较困难。Flink 动态 CEP 很好的降低了传统规则引擎较高的时间成本并做到无需重启作业就能丝滑更新规则,以下是 Flink 动态 CEP 解决的主要问题:

①动态规则更新:传统规则引擎在规则变更时需要重新部署和启动作业,这会导致服务中断,影响系统的实时性和可用性。而 Flink 动态 CEP 允许在不中断服务的情况下动态加载和更新 CEP 规则,这意味着可以在运行时修改模式匹配逻辑,而无需重启整个 Flink 作业。

②多规则支持:在静态场景下使用多条规则时,传统 Flink CEP 需要创建多个 CepOperator(CEP算子),这会导致数据的额外拷贝,增加处理开销。Flink动态 CEP 支持在一个 Operator(算子)中处理多条规则,减少了数据拷贝,提高了处理效率。

③参数化 Condition 支持:Flink 动态 CEP 支持在 Json 格式规则描述中定义参数化的 Condition,提高了自定义 Condition 的拓展性,解决了动态添加新的 Condition 类实现的需求。


02 Flink 动态 CEP 的应用场景

Flink 动态 CEP 就像是一个智能监控系统,它不仅能在线识别风险行为(比如洗钱或欺诈),还能为实时营销助力,为业务赋能。和金融领域相关的应用场景如下:

1. 反洗钱

Flink 动态 CEP 可以监控银行账户的交易活动,识别出类似洗钱的行为。例如,可以设置规则来识别短时间内频繁的大额存款和取款行为,或者识别出与洗钱交易相关的账户之间的资金流动,从而触发反洗钱调查。也可以结合大数据技术和机器学习技术构建洗钱风险监测模型,更准确地识别可疑交易和潜在洗钱风险客户。还可以运用 Flink 动态 CEP 的流式计算技术实时分析处理客户的全链路交易信息,结合知识图谱、实时智能等技术,构建起全行级别反洗钱领域客户关系网络图,深入融合可疑交易特征,动态完整展现资金流转全貌。

2. 反欺诈

国内电信网络诈骗非常的猖獗,金融领域的反欺诈系统对电信网络诈骗案件能起到非常关键的作用,能及时阻断欺诈案件中的资金流动减少用户资金损失。反欺诈系统对系统本身分布式、实时性、规则灵活、复杂规则匹配能力要求非常高,而 Flink 动态 CEP 在 Flink 的分布式、实时性的特性基础上,增加复杂规则匹配和规则动态配置能力,为反欺诈系统提供一种很好的解决方案。

3. 实时营销

在金融客户申请信用卡的时候,客户通常需要完成填充基本信息、个人身份信息认证等多个步骤完成信用卡的申请。用户在多步骤申请信用卡的过程中,有可能会因为各种原因在其中的任意一个环节退出、失败或超时。针对这种情况,利用客户行为日志作为数据源,Flink 动态 CEP 可以利用多种规则对各个环节客户的行为数据做规则匹配、计算。并可以根据输出结果做多种营销策略的输出,如推送客户优惠券、推送消息给客户经理及时联系客户来提高营销效率,为业务赋能。


03 Flink 动态 CEP 的技术实现

根据以上背景并基于阿里在社区提出的 FLIP-200 方案,ververica-cep 开源demo,数据架构研发团队在部门内实现了一版 Flink 动态 CEP 的支持。下面详细介绍我们是如何实现的。

在 Flink 动态 CEP 中我们复用了 Flink 的 OperatorCoordinator(算子协调器)机制,用它来负责协调FLink作业中的各个 operator(算子)。OperatorCoordinator 在 JobManager 中运行,会给 TaskManager 的 Operator 发送事件,我们实现的 DynamicCEPOperatorCoordinator(动态 CEP 算子协调器)是 OperatorCoordinator 的实现类,它是 JobManager 中运行的线程,负责调用 PatternProcessorDiscoverer(模式处理器探查器)接口拿到最新的 PatternProcessor。Flink 动态 CEP 的整体架构图如下所示:

image.png

上图展示的是从数据库中读取序列化后的 PatternProcessor 的过程。可以看到OperatorCoordinator 会调用 PatternProcessorDiscoverer 接口从数据库中拿到最新的且序列化后的 PatternProcessor,拿到后它会发送给和它关联的DynamicCEPOp(动态cep算子)。DynamicCEPOp 接收到发送的事件进行解析和反序列化后,最终生成要使用的 PatternProcessor 并构造相应的NFA(非确定有限状态机)。之后即可使用新构造的NFA来处理上游发生的事件,并最终输出到下游。基于这样的方式,可以做到不停机的规则更新,且只有 OperatorCoordinator 和规则数据库交互,可以减少对数据库的访问,并利用Flink 的特性保证下游 sub_task 中使用规则的一致性。

了解了 Flink 动态CEP获取规则的流程,接下来要构建FlinkCEP作业,最重要的方法,就是构建 CEP.dynamicPatterns(),阿里云实时计算 Flink 版已经定义了CEP.dynamicPatterns()Api,该 API 定义代码如下:

public static <T, R> SingleOutputStreamOperator<R> dynamicPatterns(

      DataStream<T> input,

      PatternProcessorDiscovererFactory<T> patternProcessorDiscovererFactory,

      TimeBehaviour timeBehaviour,

      TypeInformation<R> outTypeInfo)

该方法入参说明如下:

 参数   说明  
 DataStream<T>  input   输入事件流  
PatternProcessorDiscovererFactory<T>  patternProcessorDiscovererFactory   工厂对象,负责构造一个探查器(PatternProcessorDiscoverer),探查器负责获取最新规则,即构造一个PatternProcessor接口  
 TimeBehaviour  TimeBehaviour   描述FlinkCEP作业如何处理事件的时间属性。参数取值如下:ProcessingTime:代表按照ProcessingTime处理事件  EventTime:代表按照Event Time处理事件  
 TypeInformation<R>  OutTypeInfo   描述输出流的类型信息  

dynamicPatterns() 方法中 input、OutTypeInfo 分别定义输入和输出流,TimeBehaviour 定义时间属性,这里不需要多做介绍,PatternProcessorDiscovererFactory<T>接口负责构造探查器 PatternProcessorDiscoverer 以拿到最新 PatternProcessor,在实现Flink动态CEP功能中起到关键作用,故本文着重对 patternProcessor、 PatternProcessorDiscoverer 两个接口及其实现类和负责拿到 PatternProcessor 并发送给下游算子的 DynamicCEPOperatorCoordinator 的代码进行详细。

1. patternProcessor接口及其实现

public  interface PatternProcessor<IN> extends Serializable, Versioned{
   String getId();
   default Long getTimestamp(){
       return Long.MIN_VALUE;
  }
   Pattern<IN,?> getPattern(ClassLoader classLoader);
   PatternProcessFunction<IN,?> getPatternProcessFunction();
}

PatternProcessor 接口用于完整定义CEP中的一条规则。一个PatternProcessor 实现类包含一个确定的模式(Pattern)用于描述如何去匹配事件、一个 PatternProcessFunction 用于描述怎么处理一个匹配事件。除此之外还包括id和 version(可选)等用于标识 PatternProcessFunction 的信息属性。因此一个PatternProcessor既包含规则本身,也指明了规则触发时,Flink 作业如何做出响应。

@PublicEvolving
public class DefaultPatternProcessor<T> implements PatternProcessor<T> {
   /** The ID of the pattern processor. */
   private final String id;
   /** The version of the pattern processor. */
   private final Integer version;
   /** The pattern of the pattern processor. */
   private final String patternStr;
   private final @Nullable PatternProcessFunction<T, ?> patternProcessFunction;
   public DefaultPatternProcessor(
           final String id,
           final Integer version,
           final String pattern,
           final @Nullable PatternProcessFunction<T, ?> patternProcessFunction,
           final ClassLoader userCodeClassLoader) {
       this.id = checkNotNull(id);
       this.version = checkNotNull(version);
       this.patternStr = checkNotNull(pattern);
       this.patternProcessFunction = patternProcessFun
       ction;
  }
   @Override
   public String toString() {
       return "DefaultPatternProcessor{"
               + "id='"
               + id
               + '\''
               + ", version="
               + version
               + ", pattern="
               + patternStr
               + ", patternProcessFunction="
               + patternProcessFunction
               + '}';
  }
   @Override
   public String getId() {
       return id;
  }
   @Override
   public int getVersion() {
       return version;
  }
   @Override
   public Pattern<T, ?> getPattern(ClassLoader classLoader) {
       try {
           return (Pattern<T, ?>) CepJsonUtils.convertJSONStringToPattern(patternStr, classLoader);
      } catch (Exception e) {
           throw new RuntimeException(e);
      }
  }
       @Override
       public PatternProcessFunction<T,?> getPatternProcessFunction(){
        return patternProcessFunction;
  }
}


DefaultPatternProcessor 类是 PatternProcessor 的默认实现,它接收 id, version, pattern 字符串, PatternProcessFunction 和 ClassLoader 作为参数。并使用 checkNotNull 确保除了 patternProcessFunction 外的参数不为 null。它的 getPattern 方法中包括转换json字符串到CEP能识别的 pattern 的方法 convertJSONStringToPattern(),我们重写了 convertJSONStringToPattern() 方法,接受入参为我们指定的 classloader (类加载器)如下所示:

public static Pattern<?, ?> convertJSONStringToPattern(
       String jsonString, ClassLoader userCodeClassLoader) throws Exception {
   if (userCodeClassLoader == null) {
       LOG.warn(
               "The given userCodeClassLoader is null. Will try to use ContextClassLoader of current thread.");
       return convertJSONStringToPattern(jsonString);
  }
   GraphSpec deserializedGraphSpec = objectMapper.readValue(jsonString, GraphSpec.class);
   return deserializedGraphSpec.toPattern(userCodeClassLoader);
}

它的核心方法 toPattern() 涉及到 GraphSpec 类和方法本身,GraphSpec 类是Flink 中用于描述 Pattern 序列化和反序列化的工具,它用于处理由节点 (Nodes) 和边 (Edges) 组成的图形结构。这里的节点可以是单独的 Pattern 或者是嵌套的 GraphSpec,边则定义了节点之间的关系和数据流的方向,这和数据库中存储的规则Dag紧密相关,这里不做过多解释,具体来看 toPattern() 方法的实现:

public Pattern<?, ?> toPattern(final ClassLoader classLoader) throws Exception {
   // Construct cache of nodes and edges for later use
   final Map<String, NodeSpec> nodeCache = new HashMap<>();
   for (NodeSpec node : nodes) {
       nodeCache.put(node.getName(), node);
  }
   final Map<String, EdgeSpec> edgeCache = new HashMap<>();
   for (EdgeSpec edgeSpec : edges) {
       edgeCache.put(edgeSpec.getSource(), edgeSpec);
  }
   String currentNodeName = findBeginPatternName();
   Pattern<?, ?> prevPattern = null;
   String prevNodeName = null;
   while (currentNodeName != null) {
       NodeSpec currentNodeSpec = nodeCache.get(currentNodeName);
       EdgeSpec edgeToCurrentNode = edgeCache.get(prevNodeName);
       Pattern<?, ?> currentPattern =
               currentNodeSpec.toPattern(
                       prevPattern,
                       afterMatchStrategy.toAfterMatchSkipStrategy(),
                       prevNodeName == null
                               ? ConsumingStrategy.STRICT
                              : edgeToCurrentNode.getType(),
                       classLoader);
       if (currentNodeSpec instanceof GraphSpec) {
           ConsumingStrategy strategy =
                   prevNodeName == null
                           ? ConsumingStrategy.STRICT
                          : edgeToCurrentNode.getType();
           prevPattern =
                   buildGroupPattern(
                           strategy, currentPattern, prevPattern, prevNodeName == null);
      } else {
           prevPattern = currentPattern;
      }
       prevNodeName = currentNodeName;
       currentNodeName =
               edgeCache.get(currentNodeName) == null
                       ? null
                      : edgeCache.get(currentNodeName).getTarget();
  }
   // Add window semantics
   if (window != null && prevPattern != null) {
       prevPattern.within(this.window.getTime(), this.window.getType());
  }
   return prevPattern;
}

toPattern方法是 GraphSpec 类中的核心方法之一,它负责将 GraphSpec 对象序列化信息反序列化回 Pattern 对象。它的内部逻辑包含几个步骤:

①构建节点和边缓存:创建 nodeCache 和 edgeCache 映射,分别存储NodeSpec和 EdgeSpec 实例。这有助于在后续处理中快速查找和使用节点和边的信息

②确定开始节点:初始化 currentNodeName 变量,它表示当前处理的节点名称。这个值通过调用 findBeginPatternName() 方法获得,该方法确保从图中的开始节点开始处理。

③构建 Pattern 迭代:

使用循环迭代所有节点,从开始节点开始,根据边的信息向前构建模式。在每次迭代中:从 nodeCache 获取当前节点的 NodeSpec。从 edgeCache 获取从上一个节点到当前节点的 EdgeSpec(如果存在)。使用 NodeSpec 和 EdgeSpec 构建或更新当前的 Pattern。这涉及到根据消耗策略(ConsumingStrategy)来使用不同的 Pattern 方法,如 Pattern.begin(), Pattern.next(),Pattern.followedBy(), 或 Pattern.followedByAny()。最后更新 prevPattern 和 prevNodeName 为下一个迭代做准备。最终返回构建完成的Pattern对象。

以上详细介绍了 patternProcessor 接口实现和其中的关键方法,描述了可用的Pattern 构建过程。下面介绍 PatternProcessorDiscoverer 接口及其实现。

2. PatternProcessorDiscoverer接口及其实现  

public abstract interface PatternProcessorDiscoverer<T> extends Closeable
{
   public abstract void discoverPatternProcessorUpdates(PatternProcessorManager<T> paramPatternProcessorManager);
}

PatternProcessorDiscoverer 接口用于描述如何发现 Processor。

我们基于阿里云默认周期性扫描外部存储的抽象类periodicPatternProcessorDiscoverer,提供了一个用于从支持 JDBC 协议的数据库中拉取最新规则的实现:JDBCPeriodicPatternProcessorDiscoverer

public class JDBCPeriodicPatternProcessorDiscoverer<T>
       extends PeriodicPatternProcessorDiscoverer<T> {
   private static final Logger LOG =
           LoggerFactory.getLogger(JDBCPeriodicPatternProcessorDiscoverer.class);
   private final String tableName;
   private final String userName;
   private final String password;
   private final String jdbcUrl;
   private final String tenant;
   private final List<PatternProcessor<T>> initialPatternProcessors;
   private final ClassLoader userCodeClassLoader;
   private Connection connection;
   private Statement statement;
   private ResultSet resultSet;
   private Map<String, Tuple4<String, Integer, String, String>> latestPatternProcessors = new ConcurrentHashMap<>();
   /**
    * Creates a new using the given initial {@link PatternProcessor} and the time interval how
    * often to check the pattern processor updates.> *
    *
    * @param jdbcUrl The JDBC url of the database.> * @param jdbcDriver The JDBC driver of the database.> * @param initialPatternProcessors The list of the initial {@link PatternProcessor}.> * @param intervalMillis Time interval in milliseconds how often to check updates.>
    */
   public JDBCPeriodicPatternProcessorDiscoverer(
           final String jdbcUrl,
           final String jdbcDriver,
           final String tableName,
           final String userName,
           final String password,
           @Nullable final String tenant,
           final ClassLoader userCodeClassLoader,
           @Nullable final List<PatternProcessor<T>> initialPatternProcessors,
           @Nullable final Long intervalMillis)
           throws Exception {
       super(intervalMillis);
       this.tableName = requireNonNull(tableName);
       this.initialPatternProcessors = initialPatternProcessors;
       this.userCodeClassLoader = userCodeClassLoader;
       this.userName = userName;
       this.password = password;
       this.jdbcUrl = jdbcUrl;
       this.tenant = tenant;
       Class.forName(requireNonNull(jdbcDriver));
       this.connection = DriverManager.getConnection(requireNonNull(jdbcUrl), userName, password);
       this.statement = this.connection.createStatement();
  }

JDBCPeriodicPatternProcessorDiscoverer 包括两个关键方法 arePatternProcessorsUpdated() 和 getLatestPatternProcessors(),分别用于判断 PatternProcessors 是否被更新和获取最新的 PatternProcessors。

@Override
public boolean arePatternProcessorsUpdated() throws SQLException {
   if (latestPatternProcessors == null
           && !CollectionUtil.isNullOrEmpty(initialPatternProcessors)) {
       return true;
  }
   LOG.info("Start check is pattern processor updated.");
   if (statement == null) {
       try {
           this.connection = DriverManager.getConnection(requireNonNull(jdbcUrl), userName, password);
           this.statement = this.connection.createStatement();
      } catch (SQLException e) {
           LOG.error("Connect to database error!", e);
           throw e;
      }
  }
   try {
       String sql = buildQuerySql();
       LOG.info("Statement execute sql is {}", sql);
       resultSet = statement.executeQuery(sql);
       Map<String, Tuple4<String, Integer, String, String>> currentPatternProcessors = new ConcurrentHashMap<>();
       while (resultSet.next()) {
           LOG.debug("check getLatestPatternProcessors start :{}", resultSet.getString(1));
           String id = resultSet.getString("id");
           if (currentPatternProcessors.containsKey(id)
                   && currentPatternProcessors.get(id).f1 >= resultSet.getInt("version")) {
               continue;
          }
           currentPatternProcessors.put(
                   id,
                   new Tuple4<>(
                           requireNonNull(resultSet.getString("id")),
                           resultSet.getInt("version"),
                           requireNonNull(resultSet.getString("pattern")),
                           resultSet.getString("function")));
      }
       if (latestPatternProcessors == null
               || isPatternProcessorUpdated(currentPatternProcessors)) {
           LOG.debug("latest pattern processors size is {}", currentPatternProcessors.size());
           latestPatternProcessors = currentPatternProcessors;
           return true;
      } else {
           return false;
      }
  } catch (SQLException e) {
       LOG.error(
               "Pattern processor discoverer failed to check rule changes, will recreate connection.", e);
       try {
           statement.close();
           connection.close();
           connection = DriverManager.getConnection(requireNonNull(this.jdbcUrl), this.userName, this.password);
           statement = connection.createStatement();
      } catch (SQLException ex) {
           LOG.error("Connect pattern processor discovery database error.", ex);
           throw new RuntimeException("Cannot recreate connection to database.");
      }
  }
   return false;
}

arePatternProcessorsUpdated() 用于检查数据库中存储的模式处理器是否发生了更新,它首先会检查是否有尚未处理的初始模式处理器列表(initialPatternProcessors),如果存在未被处理的 PatternProcessor,则直接返回true。接着建立数据库连接,调用 buildQuerySql() 来执行 sql,用于从 tableName 指定的表中获取所有或特定租户 (tenant) 的模式处理器信息。然后处理sql的执行结果,对每一个 currentPatternProcessors,检查是否已存在或版本是否更旧。如果存在更旧的版本则跳过,否则更新 currentPatternProcessors 映射。如果 latestPatternProcessors 为空或存在更新,则用 currentPatternProcessors 更新 latestPatternProcessors,并返回 true,表示有更新。


@Override
public List<PatternProcessor<T>> getLatestPatternProcessors() throws Exception {
   LOG.debug("Start convert pattern processors to default pattern processor.");
   return latestPatternProcessors.values().stream()
          .map(
                   patternProcessor -> {
                       try {
                           String patternStr = patternProcessor.f2;
                           GraphSpec graphSpec =
                                   CepJsonUtils.convertJSONStringToGraphSpec(patternStr);
                           LOG.debug("Latest pattern processor is {}",
                                   CepJsonUtils.convertGraphSpecToJSONString(graphSpec));
                           PatternProcessFunction<T, ?> patternProcessFunction = null;
                           String id = patternProcessor.f0;
                           int version = patternProcessor.f1;
                           if (!StringUtils.isNullOrWhitespaceOnly(patternProcessor.f3)) {
                               patternProcessFunction =
                                      (PatternProcessFunction<T, ?>)
                                               this.userCodeClassLoader
                                                      .loadClass(patternProcessor.f3)
                                                      .getConstructor(String.class, int.class, String.class)
                                                      .newInstance(id, version, tenant);
                          }
                           return new DefaultPatternProcessor<>(
                                   patternProcessor.f0,
                                   patternProcessor.f1,
                                   patternStr,
                                   patternProcessFunction,
                                   this.userCodeClassLoader);
                      } catch (Exception e) {
                           LOG.error(
                                   "Get the latest pattern processors of the discoverer failure. - ", e);
                           e.printStackTrace();
                      }
                       return null;
                  }).filter(pre -> pre != null).collect(Collectors.toList());
}

getLatestPatternProcessors() 方法涉及从数据库获取最新 PatternProcessors的过程,利用 StreamAPI,将存储在 ConcurrentHashMap 中的模式处理器信息转换为 PatternProcessor 列表。这里涉及到实例化的过程:根据模式处理器信息中的类名(patternProcessor.f3),通过类加载器加载并实例化自定义的 PatternProcessFunction。如果类名不为空或非空字符串,将其转换为对应的 Java 类,并调用构造函数,传入处理器的 id、version 和租户 tenant 信息。使用上述信息,创建一个 DefaultPatternProcessor 实例,封装模式字符串、自定义的处理器函数、类加载器等信息,最后返回一个PatternProcessor 列表,其中包含了从数据库中获取的所有模式处理器的最新实例。这些实例可以被 Flink 的 CEP 功能直接使用,以处理复杂事件模式匹配。

3. PatternProcessorDiscoverer接口及其实现

接下来介绍 DynamicCepOperatorCoordinator(动态CEP算子协调器),它承担着调用上文 PatternProcessorDiscoverer 接口从数据库中拿到最新的且序列化后的 PatternProcessor,并发送给和它关联的 DynamicCEPOp 的任务如下所示:

public class DynamicCepOperatorCoordinator<T> implements OperatorCoordinator {
   private static final Logger LOG =
           LoggerFactory.getLogger(DynamicCepOperatorCoordinator.class);
   private final DynamicCepOperatorCoordinatorContext cepCoordinatorContext;
   private final PatternProcessorDiscovererFactory discovererFactory;
   private final String operatorName;
   private boolean started;
   private volatile boolean closed;
   public DynamicCepOperatorCoordinator(String operatorName, PatternProcessorDiscovererFactory discovererFactory, DynamicCepOperatorCoordinatorContext context) {
       this.cepCoordinatorContext = context;
       this.discovererFactory = discovererFactory;
       this.operatorName = operatorName;
       this.started = false;
       this.closed = false;
  }
   @Override
   public void start() throws Exception {
       Preconditions.checkState(!started, "Dynamic Cep Operator Coordinator Started!");
       LOG.info("Starting Coordinator for {}:{}", this.getClass().getSimpleName(), operatorName);
       cepCoordinatorContext.runInCoordinatorThreadWithFixedRate(()->{
           if (discovererFactory instanceof PeriodicPatternProcessorDiscovererFactory) {
               try {
               PeriodicPatternProcessorDiscoverer patternProcessorDiscoverer =
                      (PeriodicPatternProcessorDiscoverer) discovererFactory
                              .createPatternProcessorDiscoverer(cepCoordinatorContext.getUserCodeClassloader());
               boolean updated = patternProcessorDiscoverer.arePatternProcessorsUpdated();
                   if (updated && started) {
                   Set<Integer> subtasks = cepCoordinatorContext.getSubtasks();
                   if (!patternProcessorDiscoverer.getLatestPatternProcessors().isEmpty()) {
                       UpdatePatternProcessorEvent updatePatternProcessorEvent =
                               new UpdatePatternProcessorEvent(patternProcessorDiscoverer.getLatestPatternProcessors());
                       subtasks.forEach(subtaskId -> {
                           cepCoordinatorContext.sendEventToOperator(subtaskId, updatePatternProcessorEvent);
                      });
                  }
              }
              } catch (Exception e) {
                   LOG.error("Starting Coordinator failed", e);
              }
          }
      });
       started = true;
  }
   @Override
   public void close() throws Exception {
       closed = true;
       cepCoordinatorContext.close();
  }
   @Override
   public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) throws Exception {
       LOG.info("Received event {} from operator {}.", event, subtask);
  }
   @Override
   public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) throws Exception {
//       cepCoordinatorContext.runInCoordinatorThread(() -> {
           LOG.debug("Taking a state snapshot on operator {} for checkpoint {}", operatorName, checkpointId);
           try {
               resultFuture.complete("Dynamic cep".getBytes(StandardCharsets.UTF_8));
          } catch (Throwable e) {
               ExceptionUtils.rethrowIfFatalErrorOrOOM(e);
               resultFuture.completeExceptionally(
                       new CompletionException(
                               String.format(
                                       "Failed to checkpoint for dynamic cep %s",
                                       operatorName),
                               e));
          }
  }
   @Override
   public void notifyCheckpointComplete(long checkpointId) {
  }
   @Override
   public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) throws Exception {
  }
   @Override
   public void subtaskReset(int subtask, long checkpointId) {
  }
   @Override
   public void executionAttemptFailed(int subtask, int attemptNumber, @Nullable Throwable reason) {
       cepCoordinatorContext.subtaskNotReady(subtask);
  }
   @Override
   public void executionAttemptReady(int subtask, int attemptNumber, SubtaskGateway gateway) {
       cepCoordinatorContext.subtaskReady(gateway);
  }
}

下面只介绍它的关键方法start(),用于负责初始化和激活协调器的运行流程:

start() 方法调用 cepCoordinatorContext.runInCoordinatorThreadWithFixedRate 来安排一个周期性执行的任务。这个方法将在框架的协调器线程中执行一个 lambda 表达式定义的任务,定期检查模式处理器更新。在这里我们定义的时间是10s,也就是每10s检查和执行一次 patternProcessors 的更新逻辑。然后构建UpdatePatternProcessorEvent,由 cepCoordinatorContext 来广播它给下游算子。需要注意的是,DynamicCepOperatorCoordinator 是 jobmanager 运行的线程,和 taskmanager 中 PatternProcessor 的产生过程是异步的。


04 Flink 动态 CEP 的使用方式

本章介绍如何编写 Flink 动态 CEP 作业,具体操作流程如下(以Kafka源为例):

1. 连接数据源(数据源也可以是来自数据库,配置不同的连接器即可)

public static void main(String[] args) throws Exception {
// Set up the streaming execution environment
       final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//Classloader initial
       final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
// Process args
// Build Kafka source with new Source API based on FLIP-27
       Properties prop =new Properties();
       prop.setProperty("security.protocol","SASL_PLAINTEXT");
       prop.setProperty("sasl.mechanism","SCRAM-SHA-256");
       prop.setProperty("sasl.jaas.config",
               "org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule" +
               " required username=\"100670\" password=\"000000000\";");
       KafkaSource<Event> kafkaSource = KafkaSource.<Event>builder()
              .setBootstrapServers("123.4.50.105:9292,123.4.60.106:9292,123.4.50.107:9292")
              .setTopics("cep_test1").setGroupId("test").setStartingOffsets(OffsetsInitializer.earliest())
              .setProperties(prop).setValueOnlyDeserializer((new KafkaJsonDeserializer())).build();
       env.setParallelism(1);
       DataStream<Event> input = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "source");
// keyBy userId and productionId
// Notes, only events with the same key will be processd to see if there is a match
       KeyedStream<Event, Tuple2<String, String>> keyedStream =
               input.keyBy(
                       new KeySelector<Event, Tuple2<String, String>>() {
                           @Override
                           public Tuple2<String, String> getKey(Event value) throws Exception {
                               return Tuple2.of(value.getName(), value.getName());
                          }
                      });

①初始化执行环境

②Kafka 源配置,并将事件流 Event 根据 name 字段进行 keyby

2. 构建动态规则匹配

long time = 1000;
SingleOutputStreamOperator<String> output = CEP.dynamicPatterns(
       keyedStream,
       new JDBCPeriodicPatternProcessorDiscovererFactory<>(
       "jdbc:mysql//123.45.6.789:3306/cep_demo_db",
       "com.mysql.cj.jdbc.Driver",
       "rds_demo",
       "riskcollateral",
       "riskcollateral",
       null,
       null,
       timer),
       TimeBehaviour.ProcessingTime,
       TypeInformation.of(new TypeHint<String>()){
      }));
   output.addSink(new PrintSinkFunction<>().name("cep"));
   env.excute("CEPDemo");
}
}

3. 构建并运行

我们使用 Streampark 作为 Flink 作业的运维管控平台,根据以下步骤创建 Flink jar 包作业:

①添加jar包资源:

image.png

②添加作业:

image.png

③添加作业相关配置:

image.png

④发布及启动作业:

image.png

image.png

4. 插入规则

①建表 rds_demo 用于存储 cep 规则:

image.png

②插入动态更新规则:

将表示 Pattern 的 JSON 字符串与 id、version、function 类名一起插入 rds_demo 表中(阿里云实时计算Flink版定义了一套 JSON 格式的规则描述,详情请参加阿里云文档——动态 CEP 中规则的 JSON 格式定义):

 id    version    pattern    function  
 1    1    {"name":"end","quantifier":{"consumingStrategy}...    xxxpackage.dynamic.cep.core.DemoPatternProcessFunction    

将 pattern 的 JSON 字符串解析后,展示如下:

{
   "name": "end",
   "quantifier": {
       "consumingStrategy": "SKIP_TILL_NEXT",
       "properties": [
           "SINGLE"
      ],
       "times": null,
       "untilCondition": null
  },
   "condition": null,
   "nodes": [
      {
           "name": "end",
           "quantifier": {
               "consumingStrategy": "SKIP_TILL_NEXT",
               "properties": [
                   "SINGLE"
              ],
               "times": null,
               "untilCondition": null
          },
           "condition": {
               "className": "xxxpackage.dynamic.cep.core.EndCondition",
               "type": "CLASS"
          },
           "type": "ATOMIC"
      },
      {
           "name": "start",
           "quantifier": {
               "consumingStrategy": "SKIP_TILL_NEXT",
               "properties": [
                   "LOOPING"
              ],
               "times": null,
               "untilCondition": null
 
          },
           "type": "ATOMIC"
      }
  ],
   "edges": [
      {
           "source": "start",
           "target": "end",
           "type": "SKIP_TILL_NEXT"
      }
  ],
   "window": null,
   "afterMatchStrategy": {
       "type": "SKIP_PAST_LAST_EVENT",
       "patternName": null
  },
   "type": "COMPOSITE",
   "version": 1
}

这段 JSON 规则描述了一个复合模式 (COMPOSITE),它由两个原子节点(ATOMIC)组成:“start”和“end”。

这个模式目的是匹配一个特定的事件序列,其中“start”节点匹配 action 等于0的输入事件,而“end”节点匹配“xxxpackage.dynamic.cep.core.EndCondition”这个类定义的事件,这个条件由开发者定义,例如:

public class EndCondition extends SimpleCondition<Event> {
   @Override
   public boolean filter(Event value) throws Exception {
       return value.getAction() != 1;
  }
}

这个 EndCondition 用于检查事件的 action 属性是否不等于1.如果事件的 action 属性不等于1,那么 filter 方法将返回 true,表示事件满足 end 节点的条件。

结合起来,这个模式的匹配的事件序列满足:“start”节点匹配所有 action 等于0的事件,一旦遇到一个 action 不等于1的事件,“end”节点的条件被满足,整个模式匹配完成。

function 字段用 DemoPatternProcessFunction 类的全路径加类名指定,记录了匹配到记录以后的处理方法如下:

public class DemoPatternProcessFunction<IN> extends PatternProcessFunction<IN, String> {
   String id;
   int version;
   String tenant;
   public DemoPatternProcessFunction(String id, int version, String tenant) {
       this.id = id;
       this.version = version;
       this.tenant = tenant;
  }
   @Override
   public void processMatch(
           final Map<String, List<IN>> match, final Context ctx, final Collector<String> out) {
       StringBuilder sb = new StringBuilder();
       sb.append("A match for Pattern of (id, version): (")
              .append(id)
              .append(", ")
              .append(version)
              .append(") is found. The event sequence: ").append("\n");
       for (Map.Entry<String, List<IN>> entry : match.entrySet()) {
           sb.append(entry.getKey()).append(": ").append(entry.getValue().get(0).toString()).append("\n");
      }
       out.collect(sb.toString());
  }
}

这个处理方法是如果 PatternProcessor 匹配到一个事件序列,processMatch 方法将生成对应的描述性字符串,并由下游算子通过 Collector 将其输出。

5. 输入事件流

假如有一个事件序列如下:

private static void sendEvents(Producer<String, String> producer, String topic) {
   ObjectMapper objectMapper = new ObjectMapper();
   Event[] events = {
           new Event("ken", 1, 1, 0, 1662022777000L),
           new Event("ken", 2, 1, 0, 1662022778000L),
           new Event("ken", 3, 1, 1, 1662022779000L),
           new Event("ken", 4, 1, 2, 1662022780000L),
           new Event("ken", 5, 1, 1, 1662022780000L)
  };
   while (true) {
       try {
           for (Event event : events) {
               String json = objectMapper.writeValueAsString(event);
               ProducerRecord<String, String> record = new ProducerRecord<>(topic, json);
               producer.send(record, (metadata, exception) -> {
                   if (exception != null) {
                       LOG.error("Failed to send data to Kafka: ", exception);
                  } else {
                       System.out.println(metadata.topic());
                       LOG.info("Data sent successfully to topic {} at offset {}",
                               metadata.topic(), metadata.offset());
                  }
              });
          }
      } catch (Exception e) {
           LOG.error("Error while sending events to Kafka: ", e);
      }
  }
}

我们往 Kafka Topic 插入 events,我们将会观察到 “start” 节点会匹配前两个事件,因为它们的 action 属性为0。第四个事件 action 不等于1,因此“end”节点的条件被满足,模式匹配完成。第五个事件不会影响已经完成的模式匹配。


05 杭州银行应用实践

杭州银行在我们开发的 Flink 动态 CEP 规则引擎下,也有实际的业务场景落地和应用,如事件中心-行为序列事件模块。

事件中心是以用户行为埋点数据作为数据源,对他们进行处理和分析,并输出结果辅助业务决策的平台。其中行为序列事件模块应用了行内开发的 Flink 动态 CEP 技术。事件中心-行为序列事件模块如下:

新增一个行为序列事件,填好基础信息后,用户可在行为序列配置里可以新增事件或事件组,并配置事件过期时间。

image.png

一个行为序列事件模板如下:

image.png

如下图所示,1-5原子事件表示某用户的埋点行为序列,作为 Flink 动态 CEP 的输入流 event 按照埋点顺序进入动态规则匹配,而匹配的规则是事件过期时间,这里为 20分钟。例如某输入流在 20分钟内还未完成全部五个原子事件,而只完成到事件4,这样则视为模式匹配完成,匹配到的事件为事件1到事件4,可以通过配置输出流输出自定义的规则匹配结果(如用户名字、错误原因、用户手机号码等)到 kafka、rocketMQ 等消息队列。如此,就能给业务更有价值的数据支持,做针对性的用户推荐。

image.png

Flink 动态 CEP 在事件中心实践中的优势体现在,修改或新增规则或事件序列,完全无需启停服务,只需直接编辑并保存。web 端修改会同步修改数据库中保存的规则,然后选择上线,动态规则转换就完成了。

image.png


【参考文献】

[1]阿里云开发者社区.(2023−02−10).Flink CEP 新特性进展与在实时风控场景的落地.阿里云开发者社区.https://developer.aliyun.com/article/1157197

[2]阿里云帮助中心. (2023-11-07). Flink 动态 CEP 快速入门_实时计算 Flink版(Flink). 阿里云帮助中心. https://help.aliyun.com/zh/flink/getting-started/getting-started-with-dynamic-flink-cep

[3]Apache Flink. (2022-09-16). FLIP-200: Support Multiple Rule and Dynamic Rule Changing (Flink CEP). Apache Flink. https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=195730308

[4]Apache Flink. (v1.15.4). FlinkCEP-Flink的复杂事件处理 . https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/libs/cep/

[5]https://github.com/RealtimeCompute/ververica-cep-demohttps://github.com/RealtimeCompute/ververica-cep-demo

   

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
4月前
|
消息中间件 JSON 数据库
探索Flink动态CEP:杭州银行的实战案例
本文由杭州银行大数据工程师唐占峰、欧阳武林撰写,介绍Flink动态CEP的定义、应用场景、技术实现及使用方式。Flink动态CEP是基于Flink的复杂事件处理库,支持在不重启服务的情况下动态更新规则,适应快速变化的业务需求。文章详细阐述了其在反洗钱、反欺诈和实时营销等金融领域的应用,并展示了某金融机构的实际应用案例。通过动态CEP,用户可以实时调整规则,提高系统的灵活性和响应速度,降低维护成本。文中还提供了具体的代码示例和技术细节,帮助读者理解和使用Flink动态CEP。
760 2
探索Flink动态CEP:杭州银行的实战案例
|
4月前
|
数据处理 数据安全/隐私保护 流计算
Flink 三种时间窗口、窗口处理函数使用及案例
Flink 是处理无界数据流的强大工具,提供了丰富的窗口机制。本文介绍了三种时间窗口(滚动窗口、滑动窗口和会话窗口)及其使用方法,包括时间窗口的概念、窗口处理函数的使用和实际案例。通过这些机制,可以灵活地对数据流进行分析和计算,满足不同的业务需求。
381 27
|
7月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
5月前
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
2288 73
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
|
9月前
|
存储 监控 大数据
阿里云实时计算Flink在多行业的应用和实践
本文整理自 Flink Forward Asia 2023 中闭门会的分享。主要分享实时计算在各行业的应用实践,对回归实时计算的重点场景进行介绍以及企业如何使用实时计算技术,并且提供一些在技术架构上的参考建议。
1091 7
阿里云实时计算Flink在多行业的应用和实践
zdl
|
5月前
|
消息中间件 运维 大数据
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
263 56
|
3月前
|
消息中间件 关系型数据库 MySQL
Flink CDC 在阿里云实时计算Flink版的云上实践
本文整理自阿里云高级开发工程师阮航在Flink Forward Asia 2024的分享,重点介绍了Flink CDC与实时计算Flink的集成、CDC YAML的核心功能及应用场景。主要内容包括:Flink CDC的发展及其在流批数据处理中的作用;CDC YAML支持的同步链路、Transform和Route功能、丰富的监控指标;典型应用场景如整库同步、Binlog原始数据同步、分库分表同步等;并通过两个Demo展示了MySQL整库同步到Paimon和Binlog同步到Kafka的过程。最后,介绍了未来规划,如脏数据处理、数据限流及扩展数据源支持。
293 0
Flink CDC 在阿里云实时计算Flink版的云上实践
|
4月前
|
存储 关系型数据库 BI
实时计算UniFlow:Flink+Paimon构建流批一体实时湖仓
实时计算架构中,传统湖仓架构在数据流量管控和应用场景支持上表现良好,但在实际运营中常忽略细节,导致新问题。为解决这些问题,提出了流批一体的实时计算湖仓架构——UniFlow。该架构通过统一的流批计算引擎、存储格式(如Paimon)和Flink CDC工具,简化开发流程,降低成本,并确保数据一致性和实时性。UniFlow还引入了Flink Materialized Table,实现了声明式ETL,优化了调度和执行模式,使用户能灵活调整新鲜度与成本。最终,UniFlow不仅提高了开发和运维效率,还提供了更实时的数据支持,满足业务决策需求。
|
8月前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用问题之如何在EMR-Flink的Flink SOL中针对source表单独设置并行度
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
人工智能 Apache 流计算
Flink Forward Asia 2024 上海站|探索实时计算新边界
Flink Forward Asia 2024 即将盛大开幕!11 月 29 至 30 日在上海举行,大会聚焦 Apache Flink 技术演进与未来规划,涵盖流式湖仓、流批一体、Data+AI 融合等前沿话题,提供近百场专业演讲。立即报名,共襄盛举!官网:https://asia.flink-forward.org/shanghai-2024/
1091 33
Flink Forward Asia 2024 上海站|探索实时计算新边界

热门文章

最新文章