消费架构实战-实现

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 消费架构实战-实现

实现-事件服务模型

我以开源数据交换(https://github.com/dlimeng/exchange-parent),来讲解一些源码,毕竟是经历过线上的中间件,这样有兴趣的朋友用时,很轻松可以迁移过去。
image.png

因为项目代码很多,我只讲解事件服务模型部分,其它的略过。

代码在scheduler模块中。

Service
```java
//所有服务的接口,有状态机
public interface Service extends Closeable {

/**
 * Service states
 */
public enum STATE {

    NOTINITED(0, "NOTINITED"),

    INITED(1, "INITED"),



    STARTED(2, "STARTED"),



    STOPPED(3, "STOPPED");



    private final int value;

    private final String statename;

    private STATE(int value, String name) {
        this.value = value;
        this.statename = name;
    }

    public int getValue() {
        return value;
    }



    @Override
    public String toString() {
        return statename;
    }
}



void start();

void stop();

@Override
void close() throws IOException;

String getName();

STATE  getServiceState();

long getStartTime();

boolean isInState(STATE state);

Throwable getFailureCause();

STATE getFailureState();

boolean waitForServiceToStop(long timeout);

void init();

}



有两个类:

AbstractService 单个服务抽象类,实现单个业务功能。

CompositeService 组合服务。

>事件
```java
//所有事件的抽象类
public abstract class AbstractEvent<TYPE extends Enum<TYPE>> implements Event<TYPE> {



    private final TYPE type;

    private final long timestamp;

    public AbstractEvent(TYPE type) {
        this.type = type;
        this.timestamp = 1L;
    }

    public AbstractEvent(TYPE type,long timestamp){
        this.type = type;
        this.timestamp = timestamp;
    }



    @Override
    public long getTimestamp() {
        return timestamp;
    }

    @Override
    public TYPE getType() {
        return type;
    }

    @Override
    public String toString() {
        return "EventType: " + getType();
    }

}

中央处理器
继承AbstractService

/**
 * 获取事件
 * 先进先出队列,阻塞队列
 * @return
 */
Runnable createThread() {
   
   
  return new Runnable() {
   
   
    @Override
    public void run() {
   
   
      while (!stopped && !Thread.currentThread().isInterrupted()) {
   
   
        drained = eventQueue.isEmpty();
        // blockNewEvents is only set when dispatcher is draining to stop,
        // adding this check is to avoid the overhead of acquiring the lock
        // and calling notify every time in the normal run of the loop.
        if (blockNewEvents) {
   
   
          synchronized (waitForDrained) {
   
   
            if (drained) {
   
   
              waitForDrained.notify();
            }
          }
        }
        Event event;
        try {
   
   
          event = eventQueue.take();
        } catch(InterruptedException ie) {
   
   
          if (!stopped) {
   
   
            LOG.warn("AsyncDispatcher thread interrupted", ie);
          }
          return;
        }
        if (event != null) {
   
   
          dispatch(event);
        }
      }
    }
  };
}
 /**
   * 适配启动设计handler处理器
   * @param event
   */
  @SuppressWarnings("unchecked")
  protected void dispatch(Event event) {
   
   
    //all events go thru this loop
    if (LOG.isDebugEnabled()) {
   
   
      LOG.debug("Dispatching the event " + event.getClass().getName() + "."
          + event.toString());
    }



    Class<? extends Enum> type = event.getType().getDeclaringClass();



    try{
   
   
      EventHandler handler = eventDispatchers.get(type);
      if(handler != null) {
   
   
        handler.handle(event);
      } else {
   
   
        throw new Exception("No handler for registered for " + type);
      }
    } catch (Throwable t) {
   
   
      //TODO Maybe log the state of the queue
      LOG.fatal("Error in dispatcher thread", t);
      // If serviceStop is called, we should exit this thread gracefully.
      if (exitOnDispatchException
          && (ShutdownHookManager.get().isShutdownInProgress()) == false
          && stopped == false) {
   
   
        LOG.info("Exiting, bbye..");
        System.exit(-1);
      }
    }
  }
/**
**注册事件,根据eventType绑定
**
**/
 @SuppressWarnings("unchecked")
  @Override
  public void register(Class<? extends Enum> eventType,
      EventHandler handler) {
   
   
    /* check to see if we have a listener registered */
    EventHandler<Event> registeredHandler = (EventHandler<Event>)
    eventDispatchers.get(eventType);
    LOG.info("Registering " + eventType + " for " + handler.getClass());
    if (registeredHandler == null) {
   
   
      eventDispatchers.put(eventType, handler);
    } else if (!(registeredHandler instanceof MultiListenerHandler)){
   
   
      /* for multiple listeners of an event add the multiple listener handler */
      MultiListenerHandler multiHandler = new MultiListenerHandler();
      multiHandler.addHandler(registeredHandler);
      multiHandler.addHandler(handler);
      eventDispatchers.put(eventType, multiHandler);
    } else {
   
   
      /* already a multilistener, just add to it */
      MultiListenerHandler multiHandler
      = (MultiListenerHandler) registeredHandler;
      multiHandler.addHandler(handler);
    }
  }

节选一部分。

组合服务
```java
public class SwapMaster extends CompositeService {
private Dispatcher dispatcher;//中央异步调度器

private volatile boolean isAlive=true;

public void setAlive(boolean alive) {
    isAlive = alive;
}

public boolean isAlive() {
    return isAlive;
}

public SwapMaster(String name) {
    super(name);
    SwapMasterUtil.getInstance(this);
}

@Override
public void serviceInit() throws Exception {
    dispatcher = new AsyncDispatcher();

    //注册Job和Task事件调度器,所以数据源的注册都在此
    dispatcher.register(SwapJobEventType.class,new JobBase.SwapJobEventDispatcher());
    dispatcher.register(HiveExportType.class,new HiveExportJob.HiveExportDispatcher());
    dispatcher.register(HiveImportType.class,new HiveImportJob.HiveImportDispatcher());

    dispatcher.register(ESExportType.class,new ESExportJob.ESExportDispatcher());
    dispatcher.register(ESImportType.class,new ESImportJob.ESImportDispatcher());



    dispatcher.register(MySQLExportType.class,new MySQLExportJob.MySQLExportDispatcher());
    dispatcher.register(MySQLImportType.class,new MySQLImportJob.MySQLImportDispatcher());

    dispatcher.register(OracleExportType.class,new OracleExportJob.OracleExportDispatcher());
    dispatcher.register(OracleImportType.class,new OracleImportJob.OracleImportDispatcher());

    dispatcher.register(GbaseExportType.class,new GbaseExportJob.GbaseExportDispatcher());
    dispatcher.register(GbaseImportType.class,new GbaseImportJob.GbaseImportDispatcher());

    dispatcher.register(FileExportType.class,new FileExportJob.FileExportDispatcher());
    dispatcher.register(FileImportType.class,new FileImportJob.FileImportDispatcher());

    dispatcher.register(Neo4jExportType.class,new Neo4jExportJob.Neo4jExportDispatcher());
    dispatcher.register(Neo4jImportType.class,new Neo4jImportJob.Neo4jImportDispatcher());

    addService((Service)dispatcher);
    super.serviceInit();
}

@Override
public void serviceStart() throws Exception {
    super.serviceStart();
}

@Override
public void serviceStop() throws Exception {
    super.serviceStop();
}

public Dispatcher getDispatcher() {
    return dispatcher;
}

}

这个也是整个模型client请求入口。
>测试
```plain
public class TestSimpleMRAppMaster extends CompositeService  {
    private Dispatcher dispatcher;//中央异步调度器
    private String jobID;
    private int taskNumber;//该作业包含的任务数目
    private String[] taskIDs;//该作业内部包含的所有任务





    public TestSimpleMRAppMaster(String name, String jobID, int taskNumber) {
        super(name);
        this.jobID = jobID;
        this.taskNumber = taskNumber;
        this.taskIDs = new String[taskNumber];
        for (int i = 0; i < taskNumber; i++) {
            taskIDs[i] = new String(jobID+"_task_"+i);
        }
    }

    @Override
    public void serviceInit() throws Exception {
        dispatcher = new AsyncDispatcher();
        //注册Job和Task事件调度器
        dispatcher.register(TestJobEventType.class,new JobEventDispatcher());
        dispatcher.register(TestTaskEventType.class,new TaskEventDispatcher());
        addService((Service)dispatcher);
        super.serviceInit();
    }





    public Dispatcher getDispatcher() {
        return dispatcher;
    }

    private class JobEventDispatcher implements EventHandler<TestJobEvent> {



        public void handle(TestJobEvent event) {
            if(event.getType() == TestJobEventType.JOB_KILL){
                System.out.println("Receive JOB_KILL event, killing all the tasks");
                for (int i = 0; i < taskNumber; i++) {
                    dispatcher.getEventHandler().handle(new TestTaskEvent(taskIDs[i], TestTaskEventType.T_KILL));
                }
            }else if(event.getType() == TestJobEventType.JOB_INIT){
                System.out.println("Receive JOB_INIT event, scheduling tasks");
                for (int i = 0; i < taskNumber; i++) {
                    dispatcher.getEventHandler().handle(new TestTaskEvent(taskIDs[i], TestTaskEventType.T_SCHEDULE));
                }
            }
        }
    }

    private class TaskEventDispatcher implements EventHandler<TestTaskEvent> {

        public void handle(TestTaskEvent event) {
            if (event.getType() == TestTaskEventType.T_KILL) {
                System.out.println("Receive T_KILL event of task" + event.getTaskID());
            } else if (event.getType() == TestTaskEventType.T_SCHEDULE) {
                System.out.println("Receive T_SCHEDULE of task" + event.getTaskID());
            }
        }
    }
}

public class TestSimpleMRAppMasterRun {
    public static void main(String[] args)  throws Exception  {
        String jobID = "job_2020";
        TestSimpleMRAppMaster appMaster  = new TestSimpleMRAppMaster("Simple MRAppMaster", jobID, 5);

        appMaster.serviceInit();
        appMaster.serviceStart();

        /**
         * Receive JOB_INIT event, scheduling tasks
         * Receive JOB_KILL event, killing all the tasks
         * Receive T_SCHEDULE of taskjob_2020_task_0
         * Receive T_SCHEDULE of taskjob_2020_task_1
         * Receive T_SCHEDULE of taskjob_2020_task_2
         * Receive T_SCHEDULE of taskjob_2020_task_3
         * Receive T_SCHEDULE of taskjob_2020_task_4
         * Receive T_KILL event of taskjob_2020_task_0
         * Receive T_KILL event of taskjob_2020_task_1
         * Receive T_KILL event of taskjob_2020_task_2
         * Receive T_KILL event of taskjob_2020_task_3
         * Receive T_KILL event of taskjob_2020_task_4
         */
        //TestTaskEvent
        appMaster.getDispatcher().getEventHandler().handle((AbstractEvent)new TestTaskEvent(jobID, TestTaskEventType.T_SCHEDULE));
        //appMaster.getDispatcher().getEventHandler().handle(new TestTaskEvent(jobID, TestTaskEventType.JOB_KILL));

        appMaster.serviceStop();
    }
}

结果:
Receive T_SCHEDULE of taskjob_2020

实现-分组消费架构

首先项目地址https://github.com/77954309/scheduler

目的
因为事件存在大量不同种类,紧急程度不同,中间件要适应大数据基础服务,多引擎,多队列,需要合理分组,并能够及时监控消费状况。

执行逻辑

image.png

Scheduler

abstract class Scheduler {
  def init():Unit
  def start():Unit
  def getName:String
  def submit(event:SchedulerEvent):Unit
  def get(event: SchedulerEvent): Option[SchedulerEvent]
  def get(eventId: String): Option[SchedulerEvent]
  def shutdown():Unit
  def getSchedulerContext:SchedulerContext
}
object Scheduler extends Logging{
  def createScheduler(scheduleType: String, schedulerContext: SchedulerContext): Option[Scheduler] = {
    scheduleType match {
      case "FIFO" => Some(new FIFOScheduler(schedulerContext))
      case "PARA" =>  Some(new ParallelScheduler(schedulerContext))
      case _ => {
        error("Please enter the correct scheduling type!(请输入正确的调度类型!)")
        None
      }
    }
  }
}
  • 当事件抵达,调用submit。
  • 通过SchedulerContext上下文,获取groupName。
  • 创建消费者,放入等待队列中。
  • 设置事件id。

    ConsumerManager

消费管理器注册在SchedulerContext中,维护着分组消费器关系,初始化消费者。

abstract class ConsumerManager {

  private var schedulerContext:SchedulerContext = _

  def setSchedulerContext(schedulerContext:SchedulerContext) :Unit = this.schedulerContext = schedulerContext

  def getSchedulerContext:SchedulerContext = schedulerContext

  def setConsumerListener(consumerListener:ConsumerListener):Unit

  def getOrCreateExecutorService:ExecutorService

  def getOrCreateConsumer(groupName:String):Consumer

  protected def createConsumer(groupName:String):Consumer

  def destroyConsumer(groupName:String):Unit

  def shutdown():Unit

  def listConsumers():Array[Consumer]

}

FIFOUserConsumer

消费器维护等待队列和消费队列,加入事件时,会执行长连接线程,轮询要消费的队列。

override def run(): Unit = {
  Thread.currentThread().setName(s"${toString}Thread")
  info(s"$toString thread started!")
  while (!terminate){
    Utils.tryAndError(loop())
    Utils.tryAndError(Thread.sleep(10))
  }
  info(s"$toString thread stopped!")
}

小结

把主要的运行逻辑分析完了,感兴趣的朋友自行下载源码查看。

总结

基本分析完了,感觉兴趣的朋友可以给我留言互相交流。

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
17天前
|
运维 NoSQL Java
后端架构演进:微服务架构的优缺点与实战案例分析
【10月更文挑战第28天】本文探讨了微服务架构与单体架构的优缺点,并通过实战案例分析了微服务架构在实际应用中的表现。微服务架构具有高内聚、低耦合、独立部署等优势,但也面临分布式系统的复杂性和较高的运维成本。通过某电商平台的实际案例,展示了微服务架构在提升系统性能和团队协作效率方面的显著效果,同时也指出了其带来的挑战。
55 4
|
1月前
|
存储 前端开发 API
DDD领域驱动设计实战-分层架构
DDD分层架构通过明确各层职责及交互规则,有效降低了层间依赖。其基本原则是每层仅与下方层耦合,分为严格和松散两种形式。架构演进包括传统四层架构与改良版四层架构,后者采用依赖反转设计原则优化基础设施层位置。各层职责分明:用户接口层处理显示与请求;应用层负责服务编排与组合;领域层实现业务逻辑;基础层提供技术基础服务。通过合理设计聚合与依赖关系,DDD支持微服务架构灵活演进,提升系统适应性和可维护性。
|
6月前
|
运维 Oracle 容灾
Oracle dataguard 容灾技术实战(笔记),教你一种更清晰的Linux运维架构
Oracle dataguard 容灾技术实战(笔记),教你一种更清晰的Linux运维架构
|
2月前
|
运维 持续交付 API
深入理解并实践微服务架构:从理论到实战
深入理解并实践微服务架构:从理论到实战
134 3
|
2月前
|
存储 缓存 负载均衡
亿级流量架构理论+秒杀实战系列(二)
亿级流量架构理论+秒杀实战系列(二)
|
2月前
|
运维 监控 持续交付
深入浅出:微服务架构的设计与实战
微服务,一个在软件开发领域如雷贯耳的名词,它代表着一种现代软件架构的风格。本文将通过浅显易懂的语言,带领读者从零开始了解微服务的概念、设计原则及其在实际项目中的运用。我们将一起探讨如何将一个庞大的单体应用拆分为灵活、独立、可扩展的微服务,并分享一些实践中的经验和技巧。无论你是初学者还是有一定经验的开发者,这篇文章都将为你提供新的视角和深入的理解。
81 3
|
2月前
|
SQL 缓存 运维
亿级流量架构理论+秒杀实战系列(一)
亿级流量架构理论+秒杀实战系列(一)
|
2月前
|
消息中间件 应用服务中间件 数据库
亿级流量架构理论+秒杀实战系列(三)
亿级流量架构理论+秒杀实战系列(三)
|
3月前
|
弹性计算 监控 数据挖掘
事件驱动架构的优势与应用:深度解析与实战应用
【8月更文挑战第17天】事件驱动架构以其松耦合、可扩展性、异步处理、实时性和高可靠性等优势,在实时数据处理、复杂业务流程、弹性伸缩和实时通信等多个领域展现出巨大的应用潜力。通过合理应用事件驱动架构,可以构建灵活、可扩展和可维护的系统架构,满足不断变化的业务需求和技术挑战。对于开发者而言,深入理解事件驱动架构的核心概念和优势,将有助于更好地设计和实现高质量的软件系统。
|
3月前
|
XML 存储 Android开发
Android实战经验之Kotlin中快速实现MVI架构
本文介绍MVI(Model-View-Intent)架构模式,强调单向数据流与不可变状态管理,提升Android应用的可维护性和可测试性。MVI分为Model(存储数据)、View(展示UI)、Intent(用户动作)、State(UI状态)与ViewModel(处理逻辑)。通过Kotlin示例展示了MVI的实现过程,包括定义Model、State、Intent及创建ViewModel,并在View中观察状态更新UI。
169 12

热门文章

最新文章