实现-事件服务模型
我以开源数据交换(https://github.com/dlimeng/exchange-parent),来讲解一些源码,毕竟是经历过线上的中间件,这样有兴趣的朋友用时,很轻松可以迁移过去。
因为项目代码很多,我只讲解事件服务模型部分,其它的略过。
代码在scheduler模块中。
Service
```java
//所有服务的接口,有状态机
public interface Service extends Closeable {
/**
* Service states
*/
public enum STATE {
NOTINITED(0, "NOTINITED"),
INITED(1, "INITED"),
STARTED(2, "STARTED"),
STOPPED(3, "STOPPED");
private final int value;
private final String statename;
private STATE(int value, String name) {
this.value = value;
this.statename = name;
}
public int getValue() {
return value;
}
@Override
public String toString() {
return statename;
}
}
void start();
void stop();
@Override
void close() throws IOException;
String getName();
STATE getServiceState();
long getStartTime();
boolean isInState(STATE state);
Throwable getFailureCause();
STATE getFailureState();
boolean waitForServiceToStop(long timeout);
void init();
}
有两个类:
AbstractService 单个服务抽象类,实现单个业务功能。
CompositeService 组合服务。
>事件
```java
//所有事件的抽象类
public abstract class AbstractEvent<TYPE extends Enum<TYPE>> implements Event<TYPE> {
private final TYPE type;
private final long timestamp;
public AbstractEvent(TYPE type) {
this.type = type;
this.timestamp = 1L;
}
public AbstractEvent(TYPE type,long timestamp){
this.type = type;
this.timestamp = timestamp;
}
@Override
public long getTimestamp() {
return timestamp;
}
@Override
public TYPE getType() {
return type;
}
@Override
public String toString() {
return "EventType: " + getType();
}
}
中央处理器
继承AbstractService
/**
* 获取事件
* 先进先出队列,阻塞队列
* @return
*/
Runnable createThread() {
return new Runnable() {
@Override
public void run() {
while (!stopped && !Thread.currentThread().isInterrupted()) {
drained = eventQueue.isEmpty();
// blockNewEvents is only set when dispatcher is draining to stop,
// adding this check is to avoid the overhead of acquiring the lock
// and calling notify every time in the normal run of the loop.
if (blockNewEvents) {
synchronized (waitForDrained) {
if (drained) {
waitForDrained.notify();
}
}
}
Event event;
try {
event = eventQueue.take();
} catch(InterruptedException ie) {
if (!stopped) {
LOG.warn("AsyncDispatcher thread interrupted", ie);
}
return;
}
if (event != null) {
dispatch(event);
}
}
}
};
}
/**
* 适配启动设计handler处理器
* @param event
*/
@SuppressWarnings("unchecked")
protected void dispatch(Event event) {
//all events go thru this loop
if (LOG.isDebugEnabled()) {
LOG.debug("Dispatching the event " + event.getClass().getName() + "."
+ event.toString());
}
Class<? extends Enum> type = event.getType().getDeclaringClass();
try{
EventHandler handler = eventDispatchers.get(type);
if(handler != null) {
handler.handle(event);
} else {
throw new Exception("No handler for registered for " + type);
}
} catch (Throwable t) {
//TODO Maybe log the state of the queue
LOG.fatal("Error in dispatcher thread", t);
// If serviceStop is called, we should exit this thread gracefully.
if (exitOnDispatchException
&& (ShutdownHookManager.get().isShutdownInProgress()) == false
&& stopped == false) {
LOG.info("Exiting, bbye..");
System.exit(-1);
}
}
}
/**
**注册事件,根据eventType绑定
**
**/
@SuppressWarnings("unchecked")
@Override
public void register(Class<? extends Enum> eventType,
EventHandler handler) {
/* check to see if we have a listener registered */
EventHandler<Event> registeredHandler = (EventHandler<Event>)
eventDispatchers.get(eventType);
LOG.info("Registering " + eventType + " for " + handler.getClass());
if (registeredHandler == null) {
eventDispatchers.put(eventType, handler);
} else if (!(registeredHandler instanceof MultiListenerHandler)){
/* for multiple listeners of an event add the multiple listener handler */
MultiListenerHandler multiHandler = new MultiListenerHandler();
multiHandler.addHandler(registeredHandler);
multiHandler.addHandler(handler);
eventDispatchers.put(eventType, multiHandler);
} else {
/* already a multilistener, just add to it */
MultiListenerHandler multiHandler
= (MultiListenerHandler) registeredHandler;
multiHandler.addHandler(handler);
}
}
节选一部分。
组合服务
```java
public class SwapMaster extends CompositeService {
private Dispatcher dispatcher;//中央异步调度器
private volatile boolean isAlive=true;
public void setAlive(boolean alive) {
isAlive = alive;
}
public boolean isAlive() {
return isAlive;
}
public SwapMaster(String name) {
super(name);
SwapMasterUtil.getInstance(this);
}
@Override
public void serviceInit() throws Exception {
dispatcher = new AsyncDispatcher();
//注册Job和Task事件调度器,所以数据源的注册都在此
dispatcher.register(SwapJobEventType.class,new JobBase.SwapJobEventDispatcher());
dispatcher.register(HiveExportType.class,new HiveExportJob.HiveExportDispatcher());
dispatcher.register(HiveImportType.class,new HiveImportJob.HiveImportDispatcher());
dispatcher.register(ESExportType.class,new ESExportJob.ESExportDispatcher());
dispatcher.register(ESImportType.class,new ESImportJob.ESImportDispatcher());
dispatcher.register(MySQLExportType.class,new MySQLExportJob.MySQLExportDispatcher());
dispatcher.register(MySQLImportType.class,new MySQLImportJob.MySQLImportDispatcher());
dispatcher.register(OracleExportType.class,new OracleExportJob.OracleExportDispatcher());
dispatcher.register(OracleImportType.class,new OracleImportJob.OracleImportDispatcher());
dispatcher.register(GbaseExportType.class,new GbaseExportJob.GbaseExportDispatcher());
dispatcher.register(GbaseImportType.class,new GbaseImportJob.GbaseImportDispatcher());
dispatcher.register(FileExportType.class,new FileExportJob.FileExportDispatcher());
dispatcher.register(FileImportType.class,new FileImportJob.FileImportDispatcher());
dispatcher.register(Neo4jExportType.class,new Neo4jExportJob.Neo4jExportDispatcher());
dispatcher.register(Neo4jImportType.class,new Neo4jImportJob.Neo4jImportDispatcher());
addService((Service)dispatcher);
super.serviceInit();
}
@Override
public void serviceStart() throws Exception {
super.serviceStart();
}
@Override
public void serviceStop() throws Exception {
super.serviceStop();
}
public Dispatcher getDispatcher() {
return dispatcher;
}
}
这个也是整个模型client请求入口。
>测试
```plain
public class TestSimpleMRAppMaster extends CompositeService {
private Dispatcher dispatcher;//中央异步调度器
private String jobID;
private int taskNumber;//该作业包含的任务数目
private String[] taskIDs;//该作业内部包含的所有任务
public TestSimpleMRAppMaster(String name, String jobID, int taskNumber) {
super(name);
this.jobID = jobID;
this.taskNumber = taskNumber;
this.taskIDs = new String[taskNumber];
for (int i = 0; i < taskNumber; i++) {
taskIDs[i] = new String(jobID+"_task_"+i);
}
}
@Override
public void serviceInit() throws Exception {
dispatcher = new AsyncDispatcher();
//注册Job和Task事件调度器
dispatcher.register(TestJobEventType.class,new JobEventDispatcher());
dispatcher.register(TestTaskEventType.class,new TaskEventDispatcher());
addService((Service)dispatcher);
super.serviceInit();
}
public Dispatcher getDispatcher() {
return dispatcher;
}
private class JobEventDispatcher implements EventHandler<TestJobEvent> {
public void handle(TestJobEvent event) {
if(event.getType() == TestJobEventType.JOB_KILL){
System.out.println("Receive JOB_KILL event, killing all the tasks");
for (int i = 0; i < taskNumber; i++) {
dispatcher.getEventHandler().handle(new TestTaskEvent(taskIDs[i], TestTaskEventType.T_KILL));
}
}else if(event.getType() == TestJobEventType.JOB_INIT){
System.out.println("Receive JOB_INIT event, scheduling tasks");
for (int i = 0; i < taskNumber; i++) {
dispatcher.getEventHandler().handle(new TestTaskEvent(taskIDs[i], TestTaskEventType.T_SCHEDULE));
}
}
}
}
private class TaskEventDispatcher implements EventHandler<TestTaskEvent> {
public void handle(TestTaskEvent event) {
if (event.getType() == TestTaskEventType.T_KILL) {
System.out.println("Receive T_KILL event of task" + event.getTaskID());
} else if (event.getType() == TestTaskEventType.T_SCHEDULE) {
System.out.println("Receive T_SCHEDULE of task" + event.getTaskID());
}
}
}
}
public class TestSimpleMRAppMasterRun {
public static void main(String[] args) throws Exception {
String jobID = "job_2020";
TestSimpleMRAppMaster appMaster = new TestSimpleMRAppMaster("Simple MRAppMaster", jobID, 5);
appMaster.serviceInit();
appMaster.serviceStart();
/**
* Receive JOB_INIT event, scheduling tasks
* Receive JOB_KILL event, killing all the tasks
* Receive T_SCHEDULE of taskjob_2020_task_0
* Receive T_SCHEDULE of taskjob_2020_task_1
* Receive T_SCHEDULE of taskjob_2020_task_2
* Receive T_SCHEDULE of taskjob_2020_task_3
* Receive T_SCHEDULE of taskjob_2020_task_4
* Receive T_KILL event of taskjob_2020_task_0
* Receive T_KILL event of taskjob_2020_task_1
* Receive T_KILL event of taskjob_2020_task_2
* Receive T_KILL event of taskjob_2020_task_3
* Receive T_KILL event of taskjob_2020_task_4
*/
//TestTaskEvent
appMaster.getDispatcher().getEventHandler().handle((AbstractEvent)new TestTaskEvent(jobID, TestTaskEventType.T_SCHEDULE));
//appMaster.getDispatcher().getEventHandler().handle(new TestTaskEvent(jobID, TestTaskEventType.JOB_KILL));
appMaster.serviceStop();
}
}
结果:
Receive T_SCHEDULE of taskjob_2020
实现-分组消费架构
首先项目地址https://github.com/77954309/scheduler
目的
因为事件存在大量不同种类,紧急程度不同,中间件要适应大数据基础服务,多引擎,多队列,需要合理分组,并能够及时监控消费状况。执行逻辑
Scheduler
abstract class Scheduler {
def init():Unit
def start():Unit
def getName:String
def submit(event:SchedulerEvent):Unit
def get(event: SchedulerEvent): Option[SchedulerEvent]
def get(eventId: String): Option[SchedulerEvent]
def shutdown():Unit
def getSchedulerContext:SchedulerContext
}
object Scheduler extends Logging{
def createScheduler(scheduleType: String, schedulerContext: SchedulerContext): Option[Scheduler] = {
scheduleType match {
case "FIFO" => Some(new FIFOScheduler(schedulerContext))
case "PARA" => Some(new ParallelScheduler(schedulerContext))
case _ => {
error("Please enter the correct scheduling type!(请输入正确的调度类型!)")
None
}
}
}
}
- 当事件抵达,调用submit。
- 通过SchedulerContext上下文,获取groupName。
- 创建消费者,放入等待队列中。
- 设置事件id。
ConsumerManager
消费管理器注册在SchedulerContext中,维护着分组消费器关系,初始化消费者。
abstract class ConsumerManager {
private var schedulerContext:SchedulerContext = _
def setSchedulerContext(schedulerContext:SchedulerContext) :Unit = this.schedulerContext = schedulerContext
def getSchedulerContext:SchedulerContext = schedulerContext
def setConsumerListener(consumerListener:ConsumerListener):Unit
def getOrCreateExecutorService:ExecutorService
def getOrCreateConsumer(groupName:String):Consumer
protected def createConsumer(groupName:String):Consumer
def destroyConsumer(groupName:String):Unit
def shutdown():Unit
def listConsumers():Array[Consumer]
}
FIFOUserConsumer
消费器维护等待队列和消费队列,加入事件时,会执行长连接线程,轮询要消费的队列。
override def run(): Unit = {
Thread.currentThread().setName(s"${toString}Thread")
info(s"$toString thread started!")
while (!terminate){
Utils.tryAndError(loop())
Utils.tryAndError(Thread.sleep(10))
}
info(s"$toString thread stopped!")
}
小结
把主要的运行逻辑分析完了,感兴趣的朋友自行下载源码查看。
总结
基本分析完了,感觉兴趣的朋友可以给我留言互相交流。