消费架构实战-实现

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群版 2核4GB 100GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 消费架构实战-实现

实现-事件服务模型

我以开源数据交换(https://github.com/dlimeng/exchange-parent),来讲解一些源码,毕竟是经历过线上的中间件,这样有兴趣的朋友用时,很轻松可以迁移过去。
image.png

因为项目代码很多,我只讲解事件服务模型部分,其它的略过。

代码在scheduler模块中。

Service
```java
//所有服务的接口,有状态机
public interface Service extends Closeable {

/**
 * Service states
 */
public enum STATE {

    NOTINITED(0, "NOTINITED"),

    INITED(1, "INITED"),



    STARTED(2, "STARTED"),



    STOPPED(3, "STOPPED");



    private final int value;

    private final String statename;

    private STATE(int value, String name) {
        this.value = value;
        this.statename = name;
    }

    public int getValue() {
        return value;
    }



    @Override
    public String toString() {
        return statename;
    }
}



void start();

void stop();

@Override
void close() throws IOException;

String getName();

STATE  getServiceState();

long getStartTime();

boolean isInState(STATE state);

Throwable getFailureCause();

STATE getFailureState();

boolean waitForServiceToStop(long timeout);

void init();

}



有两个类:

AbstractService 单个服务抽象类,实现单个业务功能。

CompositeService 组合服务。

>事件
```java
//所有事件的抽象类
public abstract class AbstractEvent<TYPE extends Enum<TYPE>> implements Event<TYPE> {



    private final TYPE type;

    private final long timestamp;

    public AbstractEvent(TYPE type) {
        this.type = type;
        this.timestamp = 1L;
    }

    public AbstractEvent(TYPE type,long timestamp){
        this.type = type;
        this.timestamp = timestamp;
    }



    @Override
    public long getTimestamp() {
        return timestamp;
    }

    @Override
    public TYPE getType() {
        return type;
    }

    @Override
    public String toString() {
        return "EventType: " + getType();
    }

}

中央处理器
继承AbstractService

/**
 * 获取事件
 * 先进先出队列,阻塞队列
 * @return
 */
Runnable createThread() {
   
   
  return new Runnable() {
   
   
    @Override
    public void run() {
   
   
      while (!stopped && !Thread.currentThread().isInterrupted()) {
   
   
        drained = eventQueue.isEmpty();
        // blockNewEvents is only set when dispatcher is draining to stop,
        // adding this check is to avoid the overhead of acquiring the lock
        // and calling notify every time in the normal run of the loop.
        if (blockNewEvents) {
   
   
          synchronized (waitForDrained) {
   
   
            if (drained) {
   
   
              waitForDrained.notify();
            }
          }
        }
        Event event;
        try {
   
   
          event = eventQueue.take();
        } catch(InterruptedException ie) {
   
   
          if (!stopped) {
   
   
            LOG.warn("AsyncDispatcher thread interrupted", ie);
          }
          return;
        }
        if (event != null) {
   
   
          dispatch(event);
        }
      }
    }
  };
}
 /**
   * 适配启动设计handler处理器
   * @param event
   */
  @SuppressWarnings("unchecked")
  protected void dispatch(Event event) {
   
   
    //all events go thru this loop
    if (LOG.isDebugEnabled()) {
   
   
      LOG.debug("Dispatching the event " + event.getClass().getName() + "."
          + event.toString());
    }



    Class<? extends Enum> type = event.getType().getDeclaringClass();



    try{
   
   
      EventHandler handler = eventDispatchers.get(type);
      if(handler != null) {
   
   
        handler.handle(event);
      } else {
   
   
        throw new Exception("No handler for registered for " + type);
      }
    } catch (Throwable t) {
   
   
      //TODO Maybe log the state of the queue
      LOG.fatal("Error in dispatcher thread", t);
      // If serviceStop is called, we should exit this thread gracefully.
      if (exitOnDispatchException
          && (ShutdownHookManager.get().isShutdownInProgress()) == false
          && stopped == false) {
   
   
        LOG.info("Exiting, bbye..");
        System.exit(-1);
      }
    }
  }
/**
**注册事件,根据eventType绑定
**
**/
 @SuppressWarnings("unchecked")
  @Override
  public void register(Class<? extends Enum> eventType,
      EventHandler handler) {
   
   
    /* check to see if we have a listener registered */
    EventHandler<Event> registeredHandler = (EventHandler<Event>)
    eventDispatchers.get(eventType);
    LOG.info("Registering " + eventType + " for " + handler.getClass());
    if (registeredHandler == null) {
   
   
      eventDispatchers.put(eventType, handler);
    } else if (!(registeredHandler instanceof MultiListenerHandler)){
   
   
      /* for multiple listeners of an event add the multiple listener handler */
      MultiListenerHandler multiHandler = new MultiListenerHandler();
      multiHandler.addHandler(registeredHandler);
      multiHandler.addHandler(handler);
      eventDispatchers.put(eventType, multiHandler);
    } else {
   
   
      /* already a multilistener, just add to it */
      MultiListenerHandler multiHandler
      = (MultiListenerHandler) registeredHandler;
      multiHandler.addHandler(handler);
    }
  }

节选一部分。

组合服务
```java
public class SwapMaster extends CompositeService {
private Dispatcher dispatcher;//中央异步调度器

private volatile boolean isAlive=true;

public void setAlive(boolean alive) {
    isAlive = alive;
}

public boolean isAlive() {
    return isAlive;
}

public SwapMaster(String name) {
    super(name);
    SwapMasterUtil.getInstance(this);
}

@Override
public void serviceInit() throws Exception {
    dispatcher = new AsyncDispatcher();

    //注册Job和Task事件调度器,所以数据源的注册都在此
    dispatcher.register(SwapJobEventType.class,new JobBase.SwapJobEventDispatcher());
    dispatcher.register(HiveExportType.class,new HiveExportJob.HiveExportDispatcher());
    dispatcher.register(HiveImportType.class,new HiveImportJob.HiveImportDispatcher());

    dispatcher.register(ESExportType.class,new ESExportJob.ESExportDispatcher());
    dispatcher.register(ESImportType.class,new ESImportJob.ESImportDispatcher());



    dispatcher.register(MySQLExportType.class,new MySQLExportJob.MySQLExportDispatcher());
    dispatcher.register(MySQLImportType.class,new MySQLImportJob.MySQLImportDispatcher());

    dispatcher.register(OracleExportType.class,new OracleExportJob.OracleExportDispatcher());
    dispatcher.register(OracleImportType.class,new OracleImportJob.OracleImportDispatcher());

    dispatcher.register(GbaseExportType.class,new GbaseExportJob.GbaseExportDispatcher());
    dispatcher.register(GbaseImportType.class,new GbaseImportJob.GbaseImportDispatcher());

    dispatcher.register(FileExportType.class,new FileExportJob.FileExportDispatcher());
    dispatcher.register(FileImportType.class,new FileImportJob.FileImportDispatcher());

    dispatcher.register(Neo4jExportType.class,new Neo4jExportJob.Neo4jExportDispatcher());
    dispatcher.register(Neo4jImportType.class,new Neo4jImportJob.Neo4jImportDispatcher());

    addService((Service)dispatcher);
    super.serviceInit();
}

@Override
public void serviceStart() throws Exception {
    super.serviceStart();
}

@Override
public void serviceStop() throws Exception {
    super.serviceStop();
}

public Dispatcher getDispatcher() {
    return dispatcher;
}

}

这个也是整个模型client请求入口。
>测试
```plain
public class TestSimpleMRAppMaster extends CompositeService  {
    private Dispatcher dispatcher;//中央异步调度器
    private String jobID;
    private int taskNumber;//该作业包含的任务数目
    private String[] taskIDs;//该作业内部包含的所有任务





    public TestSimpleMRAppMaster(String name, String jobID, int taskNumber) {
        super(name);
        this.jobID = jobID;
        this.taskNumber = taskNumber;
        this.taskIDs = new String[taskNumber];
        for (int i = 0; i < taskNumber; i++) {
            taskIDs[i] = new String(jobID+"_task_"+i);
        }
    }

    @Override
    public void serviceInit() throws Exception {
        dispatcher = new AsyncDispatcher();
        //注册Job和Task事件调度器
        dispatcher.register(TestJobEventType.class,new JobEventDispatcher());
        dispatcher.register(TestTaskEventType.class,new TaskEventDispatcher());
        addService((Service)dispatcher);
        super.serviceInit();
    }





    public Dispatcher getDispatcher() {
        return dispatcher;
    }

    private class JobEventDispatcher implements EventHandler<TestJobEvent> {



        public void handle(TestJobEvent event) {
            if(event.getType() == TestJobEventType.JOB_KILL){
                System.out.println("Receive JOB_KILL event, killing all the tasks");
                for (int i = 0; i < taskNumber; i++) {
                    dispatcher.getEventHandler().handle(new TestTaskEvent(taskIDs[i], TestTaskEventType.T_KILL));
                }
            }else if(event.getType() == TestJobEventType.JOB_INIT){
                System.out.println("Receive JOB_INIT event, scheduling tasks");
                for (int i = 0; i < taskNumber; i++) {
                    dispatcher.getEventHandler().handle(new TestTaskEvent(taskIDs[i], TestTaskEventType.T_SCHEDULE));
                }
            }
        }
    }

    private class TaskEventDispatcher implements EventHandler<TestTaskEvent> {

        public void handle(TestTaskEvent event) {
            if (event.getType() == TestTaskEventType.T_KILL) {
                System.out.println("Receive T_KILL event of task" + event.getTaskID());
            } else if (event.getType() == TestTaskEventType.T_SCHEDULE) {
                System.out.println("Receive T_SCHEDULE of task" + event.getTaskID());
            }
        }
    }
}

public class TestSimpleMRAppMasterRun {
    public static void main(String[] args)  throws Exception  {
        String jobID = "job_2020";
        TestSimpleMRAppMaster appMaster  = new TestSimpleMRAppMaster("Simple MRAppMaster", jobID, 5);

        appMaster.serviceInit();
        appMaster.serviceStart();

        /**
         * Receive JOB_INIT event, scheduling tasks
         * Receive JOB_KILL event, killing all the tasks
         * Receive T_SCHEDULE of taskjob_2020_task_0
         * Receive T_SCHEDULE of taskjob_2020_task_1
         * Receive T_SCHEDULE of taskjob_2020_task_2
         * Receive T_SCHEDULE of taskjob_2020_task_3
         * Receive T_SCHEDULE of taskjob_2020_task_4
         * Receive T_KILL event of taskjob_2020_task_0
         * Receive T_KILL event of taskjob_2020_task_1
         * Receive T_KILL event of taskjob_2020_task_2
         * Receive T_KILL event of taskjob_2020_task_3
         * Receive T_KILL event of taskjob_2020_task_4
         */
        //TestTaskEvent
        appMaster.getDispatcher().getEventHandler().handle((AbstractEvent)new TestTaskEvent(jobID, TestTaskEventType.T_SCHEDULE));
        //appMaster.getDispatcher().getEventHandler().handle(new TestTaskEvent(jobID, TestTaskEventType.JOB_KILL));

        appMaster.serviceStop();
    }
}

结果:
Receive T_SCHEDULE of taskjob_2020

实现-分组消费架构

首先项目地址https://github.com/77954309/scheduler

目的
因为事件存在大量不同种类,紧急程度不同,中间件要适应大数据基础服务,多引擎,多队列,需要合理分组,并能够及时监控消费状况。

执行逻辑

image.png

Scheduler

abstract class Scheduler {
  def init():Unit
  def start():Unit
  def getName:String
  def submit(event:SchedulerEvent):Unit
  def get(event: SchedulerEvent): Option[SchedulerEvent]
  def get(eventId: String): Option[SchedulerEvent]
  def shutdown():Unit
  def getSchedulerContext:SchedulerContext
}
object Scheduler extends Logging{
  def createScheduler(scheduleType: String, schedulerContext: SchedulerContext): Option[Scheduler] = {
    scheduleType match {
      case "FIFO" => Some(new FIFOScheduler(schedulerContext))
      case "PARA" =>  Some(new ParallelScheduler(schedulerContext))
      case _ => {
        error("Please enter the correct scheduling type!(请输入正确的调度类型!)")
        None
      }
    }
  }
}
  • 当事件抵达,调用submit。
  • 通过SchedulerContext上下文,获取groupName。
  • 创建消费者,放入等待队列中。
  • 设置事件id。

    ConsumerManager

消费管理器注册在SchedulerContext中,维护着分组消费器关系,初始化消费者。

abstract class ConsumerManager {

  private var schedulerContext:SchedulerContext = _

  def setSchedulerContext(schedulerContext:SchedulerContext) :Unit = this.schedulerContext = schedulerContext

  def getSchedulerContext:SchedulerContext = schedulerContext

  def setConsumerListener(consumerListener:ConsumerListener):Unit

  def getOrCreateExecutorService:ExecutorService

  def getOrCreateConsumer(groupName:String):Consumer

  protected def createConsumer(groupName:String):Consumer

  def destroyConsumer(groupName:String):Unit

  def shutdown():Unit

  def listConsumers():Array[Consumer]

}

FIFOUserConsumer

消费器维护等待队列和消费队列,加入事件时,会执行长连接线程,轮询要消费的队列。

override def run(): Unit = {
  Thread.currentThread().setName(s"${toString}Thread")
  info(s"$toString thread started!")
  while (!terminate){
    Utils.tryAndError(loop())
    Utils.tryAndError(Thread.sleep(10))
  }
  info(s"$toString thread stopped!")
}

小结

把主要的运行逻辑分析完了,感兴趣的朋友自行下载源码查看。

总结

基本分析完了,感觉兴趣的朋友可以给我留言互相交流。

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
3月前
|
存储 机器学习/深度学习 固态存储
「读书笔记」《大规模分布式存储系统:原理解析与架构实战》:二
「读书笔记」《大规模分布式存储系统:原理解析与架构实战》:二
「读书笔记」《大规模分布式存储系统:原理解析与架构实战》:二
|
3月前
|
存储 安全 网络安全
「读书笔记」《大规模分布式存储系统:原理解析与架构实战》:八
「读书笔记」《大规模分布式存储系统:原理解析与架构实战》:八
|
3月前
|
分布式计算 关系型数据库 大数据
「读书笔记」《大规模分布式存储系统:原理解析与架构实战》:九
「读书笔记」《大规模分布式存储系统:原理解析与架构实战》:九
|
3月前
|
存储 负载均衡 算法
「读书笔记」《大规模分布式存储系统:原理解析与架构实战》:一
「读书笔记」《大规模分布式存储系统:原理解析与架构实战》:一
|
3月前
|
运维 Oracle 容灾
Oracle dataguard 容灾技术实战(笔记),教你一种更清晰的Linux运维架构
Oracle dataguard 容灾技术实战(笔记),教你一种更清晰的Linux运维架构
|
3天前
|
弹性计算 监控 数据挖掘
事件驱动架构的优势与应用:深度解析与实战应用
【8月更文挑战第17天】事件驱动架构以其松耦合、可扩展性、异步处理、实时性和高可靠性等优势,在实时数据处理、复杂业务流程、弹性伸缩和实时通信等多个领域展现出巨大的应用潜力。通过合理应用事件驱动架构,可以构建灵活、可扩展和可维护的系统架构,满足不断变化的业务需求和技术挑战。对于开发者而言,深入理解事件驱动架构的核心概念和优势,将有助于更好地设计和实现高质量的软件系统。
|
18天前
|
XML 存储 Android开发
Android实战经验之Kotlin中快速实现MVI架构
本文介绍MVI(Model-View-Intent)架构模式,强调单向数据流与不可变状态管理,提升Android应用的可维护性和可测试性。MVI分为Model(存储数据)、View(展示UI)、Intent(用户动作)、State(UI状态)与ViewModel(处理逻辑)。通过Kotlin示例展示了MVI的实现过程,包括定义Model、State、Intent及创建ViewModel,并在View中观察状态更新UI。
56 12
|
1月前
|
Kubernetes Cloud Native 微服务
企业级容器部署实战:基于ACK与ALB灵活构建云原生应用架构
这篇内容概述了云原生架构的优势,特别是通过阿里云容器服务Kubernetes版(ACK)和应用负载均衡器(ALB)实现的解决方案。它强调了ACK相对于自建Kubernetes的便利性,包括优化的云服务集成、自动化管理和更强的生态系统支持。文章提供了部署云原生应用的步骤,包括一键部署和手动部署的流程,并指出手动部署更适合有技术背景的用户。作者建议在预算允许的情况下使用ACK,因为它能提供高效、便捷的管理体验。同时,文章也提出了对文档改进的建议,如添加更多技术细节和解释,以帮助用户更好地理解和实施解决方案。最后,展望了ACK未来在智能化、安全性与边缘计算等方面的潜在发展。水文一篇,太忙了,见谅!
|
2月前
|
监控 API 数据库
构建高效后端:微服务架构的实战指南
【6月更文挑战第14天】在数字化浪潮下,后端开发面临着前所未有的挑战和机遇。本文将深入探讨微服务架构的设计理念、实现方式及其在现代软件开发中的重要性,为读者提供一份全面而实用的微服务实战手册。
39 2
|
2月前
|
调度
【灵动之链】打造高效处理架构的双轨组合模式实战
【灵动之链】打造高效处理架构的双轨组合模式实战