【Esper技术专题】一个简单的事件响应功能的案例(2)复杂事件处理引擎

简介: 【Esper技术专题】一个简单的事件响应功能的案例(2)复杂事件处理引擎

前提回顾


Esper是一个开源的复杂事件处理引擎,它的目的是让用户能够通过它提供的接口,构建一个用于处理复杂事件的应用程序。


Esper主要包括了三个部分:Input adapter,Esper engine,Output adapter。




输入适配器和输出适配器


  • 输入适配器和输出适配器的主要目的是接收来自不同事件源的事件,并向不同的目的地输出事件。
  • Esper提供的适配器包括File Input and Output adapter, Spring JMS Input and Output Adapter, AMQP Input and Output Adapter, Kafka Adapter等等。
  • 这些适配器提供了一系列接口,可以让用户从不同的数据源读取数据,并将数据发送给不同的目的数据源,用户可以不用自己单独编写客户端代码来连接这些数据源,感觉相当于对这些数据源提供了一层封装。



Esper引擎


Esper引擎是处理事件的核心,它允许用户定义需要接收的事件以及对这些事件的处理方式。



Esper支持的事件表现形式


  • Esper支持多种事件表现形式,包括遵循JavaBean方式的含有getter方法的Java POJO(普通Java对象),实现了Map接口的对象,对象数组,XML文档对象,以及Apache Avro(一个支持JSON和Schema的数据序列化系统,可以将数据结构或对象转化成便于存储和传输的格式)。
  • 这些事件表现形式的共同之处在于,它们都提供了事件类型的元数据,也就是说能够表示事件的一系列属性,例如,一个Java对象可以通过其成员变量来表示其事件属性,一个Map对象能够通过键值对来表示属性。


由此可见,本质上事件是一系列属性值的集合,对事件的操作即对事件中的部分或全部属性的操作。


Esper事件处理模型


Esper的事件处理模型主要包括两个部分:Statement和Listener。


(1)Statement 利用Esper的事件处理语言EPL声明对事件进行的操作,Esper中提供了多种类型的事件操作,包括过滤、加窗、事件聚合等等。EPL是一种类似于SQL的语言,从这一点上来看,Esper恰好与数据库相反,数据库时保存数据,并在数据上运行查询语句,而Esper是保存查询语句,在这些查询上运行数据,只要事件与查询条件匹配,Esper就会实时进行处理,而不是只有在查询提交的时候才处理。 假设现在要处理的事件是用户注册事件,注册时用户需要提供用户名和年龄,那么事件中将包含用户名和年龄两个属性,而我们要做的事是计算用户的平均年龄,那么,首先应该定义一个事件类PersonEvent,并加上getter方法:


public class PersonEvent extends Event {
    private String name;
    private int age;
    public PersonEvent(String name, int age) {
        this.name = name;
        this.age = age;
    }
    public String getName() {
        return name;
    }
    public int getAge() {
        return age;
    }    
}
复制代码

然后,通过EPL语言声明对事件的操作,此处为取平均值:

import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPServiceProviderManager;
import com.espertech.esper.client.EPStatement;
import com.events.Event;
import com.events.OrderEvent;
import com.events.PersonEvent;
import com.listener.OrderEventListener;
import com.listener.PersonEventListener;
public class EsperClient {
    private EPServiceProvider engine;
    public EsperClient() {
        //obtain an engine instance
        this.engine = EPServiceProviderManager.getDefaultProvider();
        //System.out.println(engine.getURI());
    }
    public void personEventProcess() {
        //tell the engine about the event type
        engine.getEPAdministrator().getConfiguration().addEventType(PersonEvent.class);
        //create an epl statement
        String epl = "select name, age from PersonEvent";
        EPStatement statement = engine.getEPAdministrator().createEPL(epl);
    }
    public void send(Event event) {
        engine.getEPRuntime().sendEvent(event);
    }
}
复制代码


其中,send方法用于向Esper引擎发送一个事件,当引擎接收到这个事件后,便可根据事件的类型进行相应的处理。



(2)Listener Listener用于监听事件的处理情况,接收事件处理的结果,通过UpdateListener接口来实现,它相当于一个回调函数,当事件处理完成之后,可以通过该回调函数向结果发送到目的地。此处将处理结果打印到控制台:


import com.espertech.esper.client.EventBean;
import com.espertech.esper.client.UpdateListener;
public class PersonEventListener implements UpdateListener {
    @Override
    public void update(EventBean[] newEvents, EventBean[] oldEvents) {
        // TODO Auto-generated method stub
        EventBean event = newEvents[0];
        System.out.println(String.format("Name: %s, Age: %d", event.get("name"), event.get("age")));
    }
}
复制代码



然后将对事件的操作声明和监听器关联起来:

public void personEventProcess() {
        //tell the engine about the event type
        engine.getEPAdministrator().getConfiguration().
        addEventType(PersonEvent.class);
        //create an epl statement
        String epl = "select avg(age) from PersonEvent";
        EPStatement statement = engine.getEPAdministrator().createEPL(epl);
        //attach a callback to receive the results
        statement.addListener(new PersonEventListener());
    }
复制代码




测试:

import com.esper.client.EsperClient;
import com.events.PersonEvent;
public class Test {
    @SuppressWarnings("static-access")
    public static void main(String[] args) throws InterruptedException {
        // TODO Auto-generated method stub
        EsperClient ec = new EsperClient();
        ec.personEventProcess();
        ec.send(new PersonEvent("name", 10));    
        ec.send(new PersonEvent("name", 20));    
   }
}
复制代码




总结分析


一般来讲数据采集最好采用异步方式,一方面不会影响业务的核心交易链路,一方面可以保证采集方式的通用性 热点发现最好做到秒级实时,这样动态发现才有意义,实际上也是对核心节点的数据采集和分析能力提出了较高的要求




相关文章
|
8月前
|
边缘计算 JSON 物联网
解锁业务灵活性:RuleGo规则引擎的高效解耦与实时响应秘籍
RuleGo是一个基于Go语言的轻量级、高性能规则引擎,旨在通过动态规则链和组件化设计,简化复杂系统的业务逻辑管理和实时响应。
解锁业务灵活性:RuleGo规则引擎的高效解耦与实时响应秘籍
|
5月前
|
数据采集 存储 前端开发
ClkLog 自定义事件分析登场
ClkLog的自定义事件分析功能在大家满满的期待下终于发布了。 本次更新我们添加了用户关联、事件采集、事件分析三大块功能点。
ClkLog 自定义事件分析登场
|
8月前
|
XML 中间件 数据库
基于jeecgboot的flowable流程支持定时捕获事件
基于jeecgboot的flowable流程支持定时捕获事件
136 0
|
4月前
|
图形学 开发者 UED
Unity游戏开发必备技巧:深度解析事件系统运用之道,从生命周期回调到自定义事件,打造高效逻辑与流畅交互的全方位指南
【8月更文挑战第31天】在游戏开发中,事件系统是连接游戏逻辑与用户交互的关键。Unity提供了多种机制处理事件,如MonoBehaviour生命周期回调、事件系统组件及自定义事件。本文介绍如何有效利用这些机制,包括创建自定义事件和使用Unity内置事件系统提升游戏体验。通过合理安排代码执行时机,如在Awake、Start等方法中初始化组件,以及使用委托和事件处理复杂逻辑,可以使游戏更加高效且逻辑清晰。掌握这些技巧有助于开发者更好地应对游戏开发挑战。
184 0
|
设计模式 Go
Go语言事件系统设计解析:发布-订阅模式实战
Go语言事件系统设计解析:发布-订阅模式实战
516 0
|
8月前
|
XML 中间件 数据库
基于jeecgboot的flowable流程支持定时边界事件
基于jeecgboot的flowable流程支持定时边界事件
143 0
|
8月前
|
存储 前端开发 JavaScript
【亮剑】在Web开发中,滚动事件(onScroll)是一个常见且强大的交互手段。
【4月更文挑战第30天】在React中,利用`onScroll`事件可实现无限滚动、动态加载和视差效果。通过`componentDidMount`和`componentWillUnmount`(类组件)或`useEffect`(函数组件)添加/移除滚动监听器。为了优化性能,需注意节流、防抖、虚拟滚动和避免同步计算。实战案例展示了如何结合Intersection Observer实现无限滚动列表,当最后一个帖子进入视口时加载更多数据。关注性能,确保应用流畅。
138 0
|
存储 自然语言处理 算法
GaiaX开源解读 | 表达式作为逻辑动态化的基础,我们是如何设计的
GaiaX跨端模板引擎,是在阿里优酷、淘票票、大麦内广泛使用的Native动态化方案,其核心优势是性能、稳定和易用。本系列文章《GaiaX开源解读》,带大家看看过去三年GaiaX的发展过程。
379 0
|
存储 自然语言处理 算法
作为逻辑动态化的基础,GaiaX 表达式是如何设计的? | GaiaX 开源解读
GaiaX 跨端模板引擎,是在阿里文娱内广泛使用的 Native 动态化方案,其核心优势是性能、稳定和易用。本系列文章《GaiaX 开源解读》,带大家看看过去三年 GaiaX 的发展过程。 GaiaX 开源地址:https://github.com/alibaba/GaiaX
442 0
作为逻辑动态化的基础,GaiaX 表达式是如何设计的? | GaiaX 开源解读
|
Java
【Esper技术专题】针对于Esper事件驱动框架的数据模型分析| 8月更文挑战
【Esper技术专题】针对于Esper事件驱动框架的数据模型分析| 8月更文挑战
277 0

热门文章

最新文章