前提回顾
Esper是一个开源的复杂事件处理引擎,它的目的是让用户能够通过它提供的接口,构建一个用于处理复杂事件的应用程序。
Esper主要包括了三个部分:Input adapter,Esper engine,Output adapter。
输入适配器和输出适配器
- 输入适配器和输出适配器的主要目的是接收来自不同事件源的事件,并向不同的目的地输出事件。
- Esper提供的适配器包括
File Input and Output adapter, Spring JMS Input and Output Adapter, AMQP Input and Output Adapter, Kafka Adapter
等等。 - 这些适配器提供了一系列接口,可以让用户从不同的数据源读取数据,并将数据发送给不同的目的数据源,用户可以不用自己单独编写客户端代码来连接这些数据源,感觉相当于对这些数据源提供了一层封装。
Esper引擎
Esper引擎是处理事件的核心,它允许用户定义需要接收的事件以及对这些事件的处理方式。
Esper支持的事件表现形式
- Esper支持多种事件表现形式,包括遵循JavaBean方式的含有getter方法的Java POJO(普通Java对象),实现了Map接口的对象,对象数组,XML文档对象,以及Apache Avro(一个支持JSON和Schema的数据序列化系统,可以将数据结构或对象转化成便于存储和传输的格式)。
- 这些事件表现形式的共同之处在于,它们都提供了事件类型的元数据,也就是说能够表示事件的一系列属性,例如,一个Java对象可以通过其成员变量来表示其事件属性,一个Map对象能够通过键值对来表示属性。
由此可见,本质上事件是一系列属性值的集合,对事件的操作即对事件中的部分或全部属性的操作。
Esper事件处理模型
Esper的事件处理模型主要包括两个部分:Statement和Listener。
(1)Statement 利用Esper的事件处理语言EPL声明对事件进行的操作,Esper中提供了多种类型的事件操作,包括过滤、加窗、事件聚合等等。EPL是一种类似于SQL的语言,从这一点上来看,Esper恰好与数据库相反,数据库时保存数据,并在数据上运行查询语句,而Esper是保存查询语句,在这些查询上运行数据,只要事件与查询条件匹配,Esper就会实时进行处理,而不是只有在查询提交的时候才处理。 假设现在要处理的事件是用户注册事件,注册时用户需要提供用户名和年龄,那么事件中将包含用户名和年龄两个属性,而我们要做的事是计算用户的平均年龄,那么,首先应该定义一个事件类PersonEvent,并加上getter方法:
public class PersonEvent extends Event { private String name; private int age; public PersonEvent(String name, int age) { this.name = name; this.age = age; } public String getName() { return name; } public int getAge() { return age; } } 复制代码
然后,通过EPL语言声明对事件的操作,此处为取平均值:
import com.espertech.esper.client.EPServiceProvider; import com.espertech.esper.client.EPServiceProviderManager; import com.espertech.esper.client.EPStatement; import com.events.Event; import com.events.OrderEvent; import com.events.PersonEvent; import com.listener.OrderEventListener; import com.listener.PersonEventListener; public class EsperClient { private EPServiceProvider engine; public EsperClient() { //obtain an engine instance this.engine = EPServiceProviderManager.getDefaultProvider(); //System.out.println(engine.getURI()); } public void personEventProcess() { //tell the engine about the event type engine.getEPAdministrator().getConfiguration().addEventType(PersonEvent.class); //create an epl statement String epl = "select name, age from PersonEvent"; EPStatement statement = engine.getEPAdministrator().createEPL(epl); } public void send(Event event) { engine.getEPRuntime().sendEvent(event); } } 复制代码
其中,send方法用于向Esper引擎发送一个事件,当引擎接收到这个事件后,便可根据事件的类型进行相应的处理。
(2)Listener Listener用于监听事件的处理情况,接收事件处理的结果,通过UpdateListener接口来实现,它相当于一个回调函数,当事件处理完成之后,可以通过该回调函数向结果发送到目的地。此处将处理结果打印到控制台:
import com.espertech.esper.client.EventBean; import com.espertech.esper.client.UpdateListener; public class PersonEventListener implements UpdateListener { @Override public void update(EventBean[] newEvents, EventBean[] oldEvents) { // TODO Auto-generated method stub EventBean event = newEvents[0]; System.out.println(String.format("Name: %s, Age: %d", event.get("name"), event.get("age"))); } } 复制代码
然后将对事件的操作声明和监听器关联起来:
public void personEventProcess() { //tell the engine about the event type engine.getEPAdministrator().getConfiguration(). addEventType(PersonEvent.class); //create an epl statement String epl = "select avg(age) from PersonEvent"; EPStatement statement = engine.getEPAdministrator().createEPL(epl); //attach a callback to receive the results statement.addListener(new PersonEventListener()); } 复制代码
测试:
import com.esper.client.EsperClient; import com.events.PersonEvent; public class Test { @SuppressWarnings("static-access") public static void main(String[] args) throws InterruptedException { // TODO Auto-generated method stub EsperClient ec = new EsperClient(); ec.personEventProcess(); ec.send(new PersonEvent("name", 10)); ec.send(new PersonEvent("name", 20)); } } 复制代码
总结分析
一般来讲数据采集最好采用异步方式,一方面不会影响业务的核心交易链路,一方面可以保证采集方式的通用性 热点发现最好做到秒级实时,这样动态发现才有意义,实际上也是对核心节点的数据采集和分析能力提出了较高的要求