集成平台下连接器设计规范: 调研-实践-思考

简介: 云集成是当今企业面临的主要挑战之一,为了满足对安全可靠的云集成解决方案日益增长的需求,一些供应商已开始提供集成服务,称为集成平台即服务 (iPaaS)。对于集成平台而言, 连接器可以看做平台运行时引擎的可重用扩展, 能够将 集成平台 应用程序与第三方 API、数据库和标准集成协议集成。连接器抽象了连接到目标系统所涉及的技术细节。本文探索了部分优秀的集成平台以及其各自的连接器规范, 对于现有链接场景的存在问题进行了一定的思考, 同时提出了我们自己的数据集成平台以及连接器设计思路, 希望可以通过Full-Code, Low-Code以及Platform三种方案灵活切换的方式解决实际需求。

前言

为什么需要集成平台?

云集成是当今企业面临的主要挑战之一,这已不是什么秘密 。为了满足对安全可靠的云集成解决方案日益增长的需求,一些供应商已开始提供集成服务,称为集成平台即服务 (iPaaS)。

  • iPaaS 是一种基于云的集成解决方案,是一个用于在云内部以及云与企业之间构建和部署集成的平台。借助 iPaaS,用户可以开发连接云或本地应用程序的集成流程,然后在无需安装或管理任何硬件或中间件的情况下进行部署。
  • Gartner 为 iPaaS 类别提供了进一步的定义和特殊性,概述了其参考模型中的一些关键功能。其中包括支持集成流执行的工具和技术、集成的开发和生命周期管理、应用程序流的管理和监控、治理以及基本的云功能,例如多租户、弹性和自配置。Gartner 还将 iPaaS 视为用户、服务提供商和集成提供商之间购买、销售和交换集成流(开箱即用和定制模式)的潜在平台 。
  • 由于 iPaaS 模型仍处于早期阶段,Gartner 指出,目前市场上的 iPaaS 产品可能并未包含其参考模型中的所有功能。相反,Gartner 确定了三类 iPaaS 供应商,每类都强调不同的集成领域:电子商务和 B2B 集成、云集成以及 企业服务总线 (ESB) 和 面向服务的架构 (SOA) 基础架构。
  • 根据企业的特定集成需求,某些供应商选项可能比其他选项更适合。对于短期集成需求,专注于电子商务/B2B 集成和云集成的 iPaaS 产品提供了快速连接合作伙伴应用程序和云服务的简单解决方案。

然而,鉴于向混合架构的不断转变,开始考虑长期集成战略以支持包括本地资源和云服务的计算模型将是明智之举。在三个供应商类别中,具有 ESB 和 SOA 背景的供应商提供的 iPaaS 产品为长期集成和治理项目提供了最平衡和最强大的功能集。尽管 Gartner 指出其中一些产品目前处于测试阶段或开发阶段,但显然具有 ESB 和 SOA 专业知识的 iPaaS 供应商最适合解决云时代的系统集成问题。

连接器是什么?

对于集成平台而言, 连接器可以看做平台运行时引擎的可重用扩展, 能够将 集成平台 应用程序与第三方 API、数据库和标准集成协议集成。连接器抽象了连接到目标系统所涉及的技术细节。

在 集成平台 应用程序中使用连接器具有以下优势:

  • 降低代码复杂性,可以将 应用程序连接到目标系统,而无需了解向目标系统编程所需的所有细节
  • 简化针对目标系统的身份验证
  • 主动推断目标系统的元数据,从而更轻松地使用表达式引擎识别和转换数据
  • 使代码维护更容易,因为:
  • 并非目标系统中的所有更改都需要更改应用程序。
  • 无需更新应用程序的其他部分即可更新连接器配置。

已有集成平台与连接器调研

Apache-Camel

简介

Apache Camel 是一个开源 Java 框架,专注于使开发人员更容易和更容易地进行集成。它通过提供:

  • 所有广泛使用的 EIP 的具体实现
  • 连接到各种各样的传输和 API
  • 易于使用的领域特定语言 (DSL) 将 EIP 和传输连接在一起

下图显示了这三个项目实际上是如何映射到 Camel 概念的。Camel 的组织方式以组件、端点、处理器和 DSL呈现。

网络异常,图片无法展示
|

Camel 架构的高级视图(来自Camel in Action)。

组件是 Camel 中的扩展点,用于添加与其他系统的连接。Camel 的核心非常小,以保持低依赖性、促进可嵌入性等,因此仅包含 26 个基本组件。核心之外有 316 多个组件。为了将这些系统暴露给 Camel 的其余部分,组件提供了一个端点接口。通过使用 URI,您可以以统一的方式在 Endpoints 上发送或接收消息。例如,要从 JMS 队列aQueue接收消息并将它们发送到文件系统目录“/tmp”,您可以使用“jms:aQueue”和“file:/tmp”之类的 URI。

处理器用于操作和调解端点之间的消息。所有 EIP 都被定义为处理器或处理器集。为了将处理器和端点连接在一起,Camel 用 Java、Scala 和 Groovy 等常规编程语言定义了多个 DSL。它还允许在 XML 中指定路由规则。

连接器

Camel目前已经存在数百种连接器, 而了解连接器的最好方式便是从如何创建一个连接器入手. 连接器在Camel中被统一称为组件.

Apache Camel 的设计目的是让添加新组件变得非常容易,无论它们是路由组件、转换器、传输等。组件的想法是成为端点的工厂和管理器, 以下是添加新组件的主要步骤:

  • 编写一个实现该Component接口的 POJO。最简单的方法就是从DefaultComponent进行扩展.
  • 要支持组件的自动发现,请添加一个文件,META-INF/services/org/apache/camel/component/FOO其中 FOO 是组件的 URI 方案以及动态创建的任何相关端点。这个文件应该包含组件类全名的信息。例如,如果您的组件由com.example.CustomComponent类实现,则文件应包含以下行 -  class=com.example.CustomComponent.
  • 然后,用户可以显式创建您的组件,对其进行配置并使用 a 注册它,CamelContext或者他们可以使用自动创建您的组件的 URI。建议使用Camel Maven Archetypes引导您的初始组件,因为它将为您提供所有必要的位来轻松开始开发您的组件。您还需要确保在组件文件中包含Camel 组件 Maven 插件pom.xml,以便为组件生成所有必要的元数据和 Java 文件。

编写端点

  • 在实现Endpoint时,您通常可以实现以下一种或多种方法:
  • createProducer将创建一个生产者来向端点发送消息交换
  • createConsumer实现事件驱动的消费者模式来消费来自端点的消息交换。
  • 通常,您只是从DefaultEndpoint集成并实现接口即可

注释您的端点

  • 如果您想受益于自动生成端点上所有参数的 HTML 文档作为 maven 站点报告的一部分,您需要注释端点的参数
  • 这意味着您@UriEndpoint向 Endpoint 类添加注释,然后注释您希望通过 URI 配置机制配置的每个参数@UriParam(或@UriParams嵌套配置对象)。

选项

  • 如果你的组件有选项,你可以让它有公共的 getter/setter,Camel 会在端点创建时自动设置属性。
  • 但是,如果您想自己解决问题,则必须从给定参数列表中删除该选项,因为 Camel 将验证是否使用了所有选项。如果不是,Camel 将抛出一个ResolveEndpointFailedException说明哪些选项是未知的。
  • 参数由 Camel 在类的createEndpoint方法中提供DefaultComponent
protected abstract Endpoint<E> createEndpoint(String uri, String remaining, Map parameters)

该代码是来自SEDA组件的示例,它删除了 size 参数:

public BlockingQueue<Exchange> createQueue(String uri, Map parameters) {
        int size = 1000;
        Object value = parameters.remove("size");
        if (value != null) {
            Integer i = convertTo(Integer.class, value);
            if (i != null) {
                size = i;
            }
        }
        return new LinkedBlockingQueue<Exchange>(size);
    }

使用示例

以下例子为使用java-dsl操作camel进行文件操作: 利用from-choice-when-to等结构实现条件控制方式,

/**
 * A Camel Java DSL Router
 */
public class MyRouteBuilder extends RouteBuilder {
    /**
     * Let's configure the Camel routing rules using Java code...
     */
    public void configure() {
        // here is a sample which processes the input files
        // (leaving them in place - see the 'noop' flag)
        // then performs content based routing on the message using XPath
        from("file:src/data?noop=true")
            .choice()
                .when(xpath("/person/city = 'London'"))
                    .to("file:target/messages/uk")
                .otherwise()
                    .to("file:target/messages/others");
    }
}

本例子读取文件夹下的源文件, 并根据文件中内容决定要将文件发送至何处文件夹下, 可是看出其实际使用较为明确, 可以明显看出业务逻辑, 同时简化了实现细节

Mule/MuleSoft

简介

什么是 MuleSoft?MuleSoft 是一个平台,它为 IT 提供了自动化一切的工具。这包括集成数据和系统、自动化工作流程和流程以及创造令人难以置信的数字体验——所有这些都在一个易于使用的单一平台上完成。通过我们独特的方法,IT 创建了团队可以根据需要使用的数字构建块,所有这些都内置了正确的安全、治理和合规措施。 MuleSoft 在三件事上帮助 IT 团队:通过集成解锁系统和数据,通过自动化提高生产力和效率,以及创造引人入胜的数字体验。我们的可组合连接方法将每个数字资产变成可重复使用的产品。使用这种方法,团队可以更快地交付项目。而Mule则是MuleSoft平台下一个主要产品

连接器

经过数年沉淀, Mule也已经存在上百种功能各异的连接器, 在Mule 4规范中, 这些连接器被统一称为扩展(Extension).

mule的连接器需要利用maven生成初始化项目, 之后在项目中按照模板文件进行开发

mvn org.mule.extensions:mule-extensions-archetype-maven-plugin:generate
  • 输入扩展名:DemoConnector
  • 输入扩展的groupId:com.Demo.muleConnector
  • 输入扩展的 artifactId:mulesoft-demo-connector
  • 输入扩展的版本:1.0.0
  • 进入扩展的主包:org.mule.extension.Demo

构建成功后,转到您的文件夹,您将找到一个带有 artifact-id 的连接器包。

您将找到以下类和文件夹结构。在本节中,我们将概述您将在每个文件夹中找到的内容:

网络异常,图片无法展示
|

src/main/java 文件夹: 此文件夹包含连接器的源 Java 文件,包括用于操作和连接配置的框架 Java 文件。其他支持类也应该存储在这里。一旦我们在 Anypoint Studio 中打开这个项目,就会有许多类,它们将使用 Mule SDK 注释进行注释,如下所示:

  • <connector-name>Extension.java:此类标识连接器的各种属性。在 Mule 4 中,连接器只是一个扩展。该类将识别哪些是配置类,哪些是操作类。
  • <connector-name>Configuration.java:这包含您想要从连接器的全局配置中获得的所有信息。
  • <connector-name>Connection.java:连接类负责处理连接,在我们的例子中,大部分实际编码都在这里。
  • <connector-name>ConnectionProvider.java:该类用于管理和提供与目标系统的连接。 连接提供者必须实现 Mule 中可用的连接提供者之一。选项是 PoolingConnectionProvider、CachedConnectionProvider 和 ConnectionProvider。
  • <connector-name>Operations.java:这是您定义所有必要操作的类。可以有多个操作类文件。

使用示例

MuleSoft开发程序使用AnyPointStudio, 为根据IDE改造的平台, 存在可视化界面, 其模型编排如下,下方应用实现了与camel示例相同的功能

可以看出, 其主要逻辑流程均可由拖拽完成, 方便用户操作, 上手难度也相对较低, 用户友好型较高。

优缺点分析

优点

  • 作为集成平台, 提供了大量丰富的连接器能力, 使得用户可以使用较低的开发时间/开发成本实现所需功能
  • 相较于传统的软件开发, 用户可以减少对于底层细节的关注, 从而将更多的精力投注在业务逻辑上
  • Getting More While Doing Less

缺点

  • 存在学习成本, camel的dsl流程编排需要对于组件, 库, Route均存在一定程度的了解, 需要额外耗费开发者精力/mule虽然存在可视化界面, 但是上手仍然有一定难度
  • 组件运行依赖于平台本身, 可能组件本身只实现了很简单的功能, 但是却需要平台(camel-core/ mule-runtime) 来进行运行支持, 而平台往往较为庞大占用运行资源
  • 出现问题后调试困难, 正所谓复杂度不会消失, 只是从一个地方转移到另一个地方; 作为用户而言, 难以搞清集成平台本身的运行逻辑, 很难想java-debug模式那样一行一行的查找问题锁定根源

举例

camel from-to方法: 为了实现from的调用, 引擎在运行时进行多级注册, 相互调用

同时引擎也分为多阶段进行启动

而这仅仅只是启动时的注册方法类, 其实际运行时又存在更多的process类, 导致开发者难以对于平台本身存在把握

BaseMainSupport: postProcessCamelContext - 根据生命周期启动各类contex, 同时注册监听listener

protected void postProcessCamelContext(CamelContext camelContext) throws Exception {
        // gathers the properties (key=value) that was used as property placeholders during bootstrap
        final OrderedLocationProperties propertyPlaceholders = new OrderedLocationProperties();
        // use the main autowired lifecycle strategy instead of the default
        camelContext.getLifecycleStrategies().removeIf(s -> s instanceof AutowiredLifecycleStrategy);
        camelContext.addLifecycleStrategy(new MainAutowiredLifecycleStrategy(camelContext));
        // setup properties
        configurePropertiesService(camelContext);
        // register listener on properties component so we can capture them
        PropertiesComponent pc = camelContext.getPropertiesComponent();
        pc.addPropertiesLookupListener(new PropertyPlaceholderListener(propertyPlaceholders));
        // setup startup recorder before building context
        configureStartupRecorder(camelContext);
        // setup package scan
        configurePackageScan(camelContext);
        // configure to use our main routes loader
        configureRoutesLoader(camelContext);
        // ensure camel context is build
        camelContext.build();
        for (MainListener listener : listeners) {
            listener.beforeInitialize(this);
        }
        // allow doing custom configuration before camel is started
        for (MainListener listener : listeners) {
            listener.beforeConfigure(this);
        }
        // we want to capture startup events for import tasks during main bootstrap
        StartupStepRecorder recorder = camelContext.adapt(ExtendedCamelContext.class).getStartupStepRecorder();
        StartupStep step;
        if (standalone) {
            step = recorder.beginStep(BaseMainSupport.class, "autoconfigure", "Auto Configure");
            autoconfigure(camelContext);
            recorder.endStep(step);
        }
        if (mainConfigurationProperties.isEagerClassloading()) {
            step = recorder.beginStep(BaseMainSupport.class, "classloading", "Eager Classloading");
            EagerClassloadedHelper.eagerLoadClasses();
            recorder.endStep(step);
        }
        configureLifecycle(camelContext);
        if (standalone) {
            step = recorder.beginStep(BaseMainSupport.class, "configureRoutes", "Collect Routes");
            configureRoutes(camelContext);
            recorder.endStep(step);
        }
        // allow doing custom configuration before camel is started
        for (MainListener listener : listeners) {
            listener.afterConfigure(this);
            listener.configure(camelContext);
        }
        // we want to log the property placeholder summary after routes has been started,
        // but before camel context logs that it has been started, so we need to use an event listener
        if (standalone && mainConfigurationProperties.isAutoConfigurationLogSummary()) {
            camelContext.getManagementStrategy().addEventNotifier(new EventNotifierSupport() {
                @Override
                public boolean isEnabled(CamelEvent event) {
                    return event instanceof CamelContextRoutesStartedEvent;
                }
                @Override
                public void notify(CamelEvent event) throws Exception {
                    // log summary of configurations
                    if (!propertyPlaceholders.isEmpty()) {
                        LOG.info("Property-placeholders summary");
                        for (var entry : propertyPlaceholders.entrySet()) {
                            String k = entry.getKey().toString();
                            Object v = entry.getValue();
                            String loc = locationSummary(propertyPlaceholders, k);
                            if (SensitiveUtils.containsSensitive(k)) {
                                LOG.info("    {} {}=xxxxxx", loc, k);
                            } else {
                                LOG.info("    {} {}={}", loc, k, v);
                            }
                        }
                    }
                }
            });
        }
    }

BaseMainSupport: configureRoutes - 配置并初始化路由选项 (为from-to类DSL语言支持)

protected void configureRoutes(CamelContext camelContext) throws Exception {
        // then configure and add the routes
        RoutesConfigurer configurer = new RoutesConfigurer();
        if (mainConfigurationProperties.isRoutesCollectorEnabled()) {
            configurer.setRoutesCollector(routesCollector);
        }
        configurer.setBeanPostProcessor(camelContext.adapt(ExtendedCamelContext.class).getBeanPostProcessor());
        configurer.setRoutesBuilders(mainConfigurationProperties.getRoutesBuilders());
        configurer.setRoutesBuilderClasses(mainConfigurationProperties.getRoutesBuilderClasses());
        if (mainConfigurationProperties.isBasePackageScanEnabled()) {
            // only set the base package if enabled
            configurer.setBasePackageScan(mainConfigurationProperties.getBasePackageScan());
        }
        configurer.setJavaRoutesExcludePattern(mainConfigurationProperties.getJavaRoutesExcludePattern());
        configurer.setJavaRoutesIncludePattern(mainConfigurationProperties.getJavaRoutesIncludePattern());
        configurer.setRoutesExcludePattern(mainConfigurationProperties.getRoutesExcludePattern());
        configurer.setRoutesIncludePattern(mainConfigurationProperties.getRoutesIncludePattern());
        configurer.configureRoutes(camelContext);
    }

org.apache.camel.main.RoutesConfigurer#configureRoutes - 扫描所有注册类, 获取注册的路由信息

public void configureRoutes(CamelContext camelContext) throws Exception {
        final List<RoutesBuilder> routes = new ArrayList<>();
        if (getRoutesBuilders() != null) {
            routes.addAll(getRoutesBuilders());
        }
        if (getRoutesBuilderClasses() != null) {
            String[] routeClasses = getRoutesBuilderClasses().split(",");
            for (String routeClass : routeClasses) {
                Class<RoutesBuilder> routeClazz = camelContext.getClassResolver().resolveClass(routeClass, RoutesBuilder.class);
                if (routeClazz == null) {
                    LOG.warn("Unable to resolve class: {}", routeClass);
                    continue;
                }
                // lets use Camel's injector so the class has some support for dependency injection
                RoutesBuilder builder = camelContext.getInjector().newInstance(routeClazz);
                routes.add(builder);
            }
        }
        if (getBasePackageScan() != null) {
            String[] pkgs = getBasePackageScan().split(",");
            Set<Class<?>> set = camelContext.adapt(ExtendedCamelContext.class)
                    .getPackageScanClassResolver()
                    .findImplementations(RoutesBuilder.class, pkgs);
            for (Class<?> routeClazz : set) {
                Object builder = camelContext.getInjector().newInstance(routeClazz);
                if (builder instanceof RoutesBuilder) {
                    routes.add((RoutesBuilder) builder);
                } else {
                    LOG.warn("Class {} is not a RouteBuilder class", routeClazz);
                }
            }
        }
        if (getRoutesCollector() != null) {
            try {
                LOG.debug("RoutesCollectorEnabled: {}", getRoutesCollector());
                // add discovered routes from registry
                Collection<RoutesBuilder> routesFromRegistry = getRoutesCollector().collectRoutesFromRegistry(
                        camelContext,
                        getJavaRoutesExcludePattern(),
                        getJavaRoutesIncludePattern());
                routes.addAll(routesFromRegistry);
                if (LOG.isDebugEnabled() && !routesFromRegistry.isEmpty()) {
                    LOG.debug("Discovered {} additional RoutesBuilder from registry: {}", routesFromRegistry.size(),
                            getRoutesIncludePattern());
                }
                // add discovered routes from directories
                StopWatch watch = new StopWatch();
                Collection<RoutesBuilder> routesFromDirectory = getRoutesCollector().collectRoutesFromDirectory(
                        camelContext,
                        getRoutesExcludePattern(),
                        getRoutesIncludePattern());
                routes.addAll(routesFromDirectory);
                if (LOG.isDebugEnabled() && !routesFromDirectory.isEmpty()) {
                    LOG.debug("Loaded {} additional RoutesBuilder from: {} (took {})", routesFromDirectory.size(),
                            getRoutesIncludePattern(), TimeUtils.printDuration(watch.taken(), true));
                }
            } catch (Exception e) {
                throw RuntimeCamelException.wrapRuntimeException(e);
            }
        }
        if (getBeanPostProcessor() != null) {
            // lets use Camel's bean post processor on any existing route builder classes
            // so the instance has some support for dependency injection
            for (RoutesBuilder routeBuilder : routes) {
                getBeanPostProcessor().postProcessBeforeInitialization(routeBuilder, routeBuilder.getClass().getName());
                getBeanPostProcessor().postProcessAfterInitialization(routeBuilder, routeBuilder.getClass().getName());
            }
        }
        // add the discovered routes
        addDiscoveredRoutes(camelContext, routes);
        // then discover and add templates
        Set<ConfigureRouteTemplates> set = camelContext.getRegistry().findByType(ConfigureRouteTemplates.class);
        for (ConfigureRouteTemplates crt : set) {
            LOG.debug("Configuring route templates via: {}", crt);
            crt.configure(camelContext);
        }
    }

mule的可视化编排: 其可视化编排结果为XML文件, 引擎则根据此文件启动流程执行

若要启动流程则首先启动执行引擎, 也就是Mule容器-Container, 其中会注册各类服务并且添加监听调用, 同样跳转很多层才能够发觉用户真正需要执行的代码

public void start(boolean registerShutdownHook) throws MuleException {
    if (registerShutdownHook) {
      registerShutdownHook();
    }
    try {
      startIfNeeded(artifactResourcesRegistry.getContainerProfilingService());
      doResourceInitialization();
      createExecutionMuleFolder();
      serviceManager.start();
      coreExtensionManager.setDeploymentService(deploymentService);
      coreExtensionManager.setRepositoryService(repositoryService);
      coreExtensionManager.setArtifactClassLoaderManager(artifactResourcesRegistry.getArtifactClassLoaderManager());
      coreExtensionManager.setToolingService(toolingService);
      coreExtensionManager.setServiceRepository(serviceManager);
      coreExtensionManager.setTroubleshootingService(troubleshootingService);
      validateLicense();
      showSplashScreen();
      coreExtensionManager.initialise();
      coreExtensionManager.start();
      toolingService.initialise();
      extensionModelLoaderManager.start();
      deploymentService.start();
    } catch (Throwable e) {
      shutdown(e);
    }
  }

经过容器启动-注册Context-启动生命周期-监听listener-初始化processor各类过程, 最终在流程执行时走到真正的process方法处

/**
   * Lists all the files in the {@code directoryPath} which match the given {@code matcher}.
   * <p>
   * If the listing encounters a directory, the output list will include its contents depending on the value of the
   * {@code recursive} parameter.
   * <p>
   *
   * @param config        the config that is parameterizing this operation
   * @param directoryPath the path to the directory to be listed
   * @param recursive     whether to include the contents of sub-directories. Defaults to false.
   * @param matcher     a matcher used to filter the output list
   * @param timeBetweenSizeCheck wait time between size checks to determine if a file is ready to be read.
   * @param timeBetweenSizeCheckUnit time unit to be used in the wait time between size checks.
   * @return a {@link List} of {@link Message messages} each one containing each file's content in the payload and metadata in the attributes
   * @throws IllegalArgumentException if {@code directoryPath} points to a file which doesn't exist or is not a directory
   */
  @Summary("List all the files from given directory")
  @MediaType(value = ANY, strict = false)
  @Throws(FileListErrorTypeProvider.class)
  public PagingProvider<LocalFileSystem, Result<Object, LocalFileAttributes>> list(@Config FileConnectorConfig config,
                                                                                   @Path(type = DIRECTORY,
                                                                                       location = EXTERNAL) String directoryPath,
                                                                                   @Optional(
                                                                                       defaultValue = "false") boolean recursive,
                                                                                   @Optional @DisplayName("File Matching Rules") @Summary("Matcher to filter the listed files") LocalFileMatcher matcher,
                                                                                   @ConfigOverride @Placement(
                                                                                       tab = ADVANCED_TAB) Long timeBetweenSizeCheck,
                                                                                   @ConfigOverride @Placement(
                                                                                       tab = ADVANCED_TAB) TimeUnit timeBetweenSizeCheckUnit,
                                                                                   StreamingHelper streamingHelper,
                                                                                   @Optional @Placement(
                                                                                       tab = ADVANCED_TAB) @Summary("Limit and sort the number of files returned") LocalSubsetList subset) {
    PagingProvider result =
        doPagedList(config, directoryPath, recursive, matcher,
                    config.getTimeBetweenSizeCheckInMillis(timeBetweenSizeCheck, timeBetweenSizeCheckUnit).orElse(null),
                    streamingHelper, subset);
    return result;
  }

连接器规范小结

Camel:

  • Component - EndPoint - Producer/Consumer 三级结构
  • Component管理EndPoint, 作为工厂以及管理者
  • EndPoint则产生Producer与Consumer
  • Producer可以作为from节点, 生产消息/Consumer则作为to节点, 消费到来的消息
  • 连接器真正的实现逻辑放在Producer以及Consumer节点之中

Mule:

  • Extension/Configuration/Connection/ConnectionProvider/Operations平行结构
  • Extension声明连接器结构, 以及连接器定义
  • Configuration则存储连接器相关配置
  • Connection/ConnectionProvider为与链接系统建立实质化链接
  • Operations则声明该连接器存在的所有可用操作, 同时在Mule可视化界面中透出
  • Mule的开始节点作为Source单独进行定义

面对与需要解决的问题

现存的一些问题:

  • 对于集成平台本身的依赖
  • 内部实现的复杂度导致调试困难
  • 上手成本

在当前云巧的交付环境下, 在实际项目中存在以下情形:

  • 存在交付项目只需要小部分数据需要与其他应用进行集成, 而客户计算资源有限难以提供单独的集成平台运行环境
  • 部分交付项目开发/维护人员存在大量ISV人员, 且人员稳定性不佳
  • 对于集成平台的上手成本这一痛点被无限放大, 每出现一次人员变更就需要重新平台进行熟悉, 浪费人员效率
  • 开发过程中, 对于bug的调试情况, 由于集成平台本身的复杂度较高, 新手很难准确在集成平台上进行调试
  • 客户本身想要进行能力沉淀, 而使用集成平台难以构成自闭环, 始终需要引入外部依赖, 会导致客户存在抵触心理

针对上述问题, 我们尝试使用云巧集成平台GTSP进行解决方案的探索

GTSP如何解决这些问题?

架构

上图可以看出, 云巧集成平台仍然使用了类似于传统集成平台的架构, 不同之处有

  • 对于连接器的划分更为贴近业务, 除开通用的连接器(如http/jdbc等)外, 有直接聚焦于某一类详细业务的连接器, 如专注于进行钉钉人员与组织同步的ECUS连接器等, 这类连接器可以让使用者更加贴近业务逻辑减少对于底层实现的关注
  • 场景应用之中的三类形态:

  1. 连接器输出-不含runtime,将连接器当做封装好的二方包使用

优点:可控性强,门槛低,无接入成本

缺点:代码侵入

  1. 生成源码+运行时-由开发态完成流程开发,codeGen生成流程源码,有runtime加载并独立运行

优点:可视化编排交付; 独立部署,无代码入侵; 生成代码与手写差异不大,阅读门槛较低; 可基于生成代码二次开发

                缺点:调试时需要一定程度理解运行时引擎。

  1. 平台输出

优点:可视化编码交付与迭代;  托管平台部署运维,日志查看断点调试;  集成流白屏化管理

缺点:需要一定学习成本; 编排能力依赖平台能力

与此同时, 我们可以进行源代码的开发, 既可以让使用的用户对所用模块知根知底, 同时也可以通过良好的可扩展性吸引更多的优秀连接器入驻, 与客户/伙伴共建连接器市场以及实际的连接应用场景, 实现资产的可沉淀性

集成产品功能

如导图所示, 集成产品将依赖于云巧市场以及大禹平台, 用户可以根据自己的需求使用上述三种方式的任意一种进行代码的开发; 我们也会根据常用的应用场景提供模板, 方便用户更快速的实现需求

连接器设计

力求简洁, GTSP连接器主要分为三个模块

  • Connector/ConnectorImpl
  • 连接器接口定义/核心实现功能类
  • connector定义的方法既可以作为连接器被GTSP作为流程节点调用, 又可以作为jar包直接被客户端集成
  • connector定义的方法未来会透出给前端, 因此其方法参数以及函数返回结果需要有较好的自解释性
  • 可以利用注解的方式为方法参数进行定义
  • 这就要求每个函数都声明自己必须使用的参数, 尽力去除无效参数; 而这有为函数参数形式的统一带来了挑战
  • Init
  • 连接器初始化, 为后续Spring-Boot等提供支持
  • Trigger
  • 可以作为开始节点的连接器
  • 连接器实现案例: 以全表查询为例, 为原生的jdbc进行部分能力增强
@Connector(name = "jdbc", title = "JDBC连接器")
public interface JdbcConnector {
    /**
     * 遍历查询全表, 不使用query中sql语句
     *
     * @param config
     * @param query  支持额外参数(AdvancedOptions):
     *               分页查询参数:
     *               int pageSize - default=10
     *               String waterColumn
     *               sql修饰参数:
     *               String selectColumns
     *               String whereCondition
     * @return
     */
    @Operation(name = "iterate", title = "遍历查询(返回map)")
    Iterator<Map<String, Object>> iterate(@Config JdbcConfig config, @Param Query query);
}
    public <T> Iterator<T> iterate(Query<T> query, JdbcConfig config) {
        NamedParameterJdbcTemplate jdbcTemplate = jdbcTemplate(config);
        String table = getTableNameFromAnnotationAndQuery(query);
        PageIteratorAsClass<T> pageIteratorAsClass = new PageIteratorAsClass<>(jdbcTemplate, query);
        return pageIteratorAsClass;
    }
}

全表查询迭代器Iterator实现类

public class PageIteratorAsClass<T> implements Iterator<T> {
    @Override
    public boolean hasNext() {
        if (iterator == null || iterator.hasNext() == false) poll();
        return iterator.hasNext();
    }
    @Override
    public T next() {
        T t = iterator.next();
        if (t != null) {
            updateDataIndex(t);
        }
        return t;
    }
    private void updateDataIndex(T data) {
        dataMark!=null?updateIndexWithDataMark():updateIndexWithOutDataMark();
    }
    /*
     * current, limit only get supported by mysql
     * so this connector only works for rds-mysql now
     */
    private void poll() {
        String sql = generateNextSQL();
        LOG.info("Execute " + sql);
        iterator = jdbcTemplate.query(sql, connectorRowMapper).stream().iterator();
    }
}

连接器原生使用方式:

public void init() {
        jdbcConnector = new JdbcConnectorImpl();
        jdbcConfig = new JdbcConfig();
        setJdbcConfig(jdbcConfig);
    }
    public void iterate() {
        Query<MidwayData> query = new Query<>();
        query.setClazz(MidwayData.class); //只需在query中注明DO类, 即可实现全表查询
        // 更多设置可以在query中进行定义, 本方法反射方式读取mybatis注解属性构造sql语句
        Iterator<MidwayData> iterator = jdbcConnector.iterate(query, jdbcConfig);
        while (iterator.hasNext()) {
            System.out.println(iterator.next().toString());
        }
    }

CodeGen实现方式

codegen代码形式:

public void test() throws Exception {
        FlowGeneratorImpl generator = new FlowGeneratorImpl();
        generator.setNodeMetaProvider(new NodeMetaProviderImpl());
        FlowModel flowModel = new FlowModel();
        NodeModel nodeModel = new NodeModel();
        setNodeModelProperty(nodeModel);
        NodeModel nodeModel2 = new NodeModel();
        setNodeModelProperty(nodeModel2);
        flowModel.getNodes().add(nodeModel).add(nodeModel2);
        String code = generator.generate(flowModel);
        System.out.println(code);
    }

生成代码形式

/** codegen by engine @Date Wed Sep 28 17:47:13 CST 2022 */
public class nullFlow implements Flow {
    @Resource 
    private HttpConnector httpConnector;
    @Override
    public void execute(FlowContext context) {
        /* step 1 invoke httpTest  */
        httpTest(context);
        /* step 2 invoke httpTest2  */
        httpTest2(context);
    }
    private void httpTest(FlowContext context) {
        HttpConfig config = context.getConfig("httpConfigxxx", HttpConfig.class);
        HttpRequest request = new HttpRequest();
        request.setMethod("GET");
        HttpResponse response = httpConnector.execute(config, request);
        context.putResult("httpTest", response);
    }
    private void httpTest2(FlowContext context) {
        HttpConfig config = context.getConfig("httpConfigxxx2", HttpConfig.class);
        HttpRequest request = new HttpRequest();
        request.setMethod("POST");
        request.setBody(context.getValue("#input.body", String.class));
        request.setContentType("application/json");
        HttpResponse response = httpConnector.execute(config, request);
        context.putResult("httpTest2", response);
    }
}

总结与展望

如何更好地在实际项目中应用集成平台, 这本身就是一个见仁见智的问题. 本文尝试从传统的code与新型的no-code方式之中进行一个折中的尝试; 希望能够通过更加灵活/便捷的使用方式, 为项目中的数据集成提供更大的价值.

问题讨论

上文中, 我们发现交付项目中存在的种种需要数据集成而又不利于集成平台存在的场景, 针对于此GTSP则灵活提供三类配置, 其中 连接器输出以及生成源码(CodeGen)+运行时这两个模式可以较好的减弱人员上手成本以及降低外部依赖性, 使用此方式开发效率相较于传统code方式能够获得一定提效, 可以增强代码的复用性, 相较于集成平台式的界面开发仍然不足之处, 并且对于接口文档的依赖以及接口易用性/可靠性的要求也较高. 更好地解决方式目前仍然在探索中, 或许一个易于使用同时方便理解的架构平台会是一种解决方式, 只是此时仍未可知.

读者若有更多的见解, 欢迎留言进行讨论, 不胜感激

相关文章
|
3月前
|
存储 缓存 NoSQL
深入理解Django与Redis的集成实践
深入理解Django与Redis的集成实践
100 0
|
2月前
|
机器学习/深度学习 人工智能 jenkins
软件测试中的自动化与持续集成实践
在快速迭代的软件开发过程中,自动化测试和持续集成(CI)是确保代码质量和加速产品上市的关键。本文探讨了自动化测试的重要性、常见的自动化测试工具以及如何将自动化测试整合到持续集成流程中,以提高软件测试的效率和可靠性。通过案例分析,展示了自动化测试和持续集成在实际项目中的应用效果,并提供了实施建议。
|
2月前
|
jenkins Devops Java
DevOps实践:Jenkins在持续集成与持续部署中的价值
【10月更文挑战第27天】在快速发展的软件开发领域,DevOps实践日益重要。Jenkins作为一款流行的开源自动化服务器,在持续集成(CI)和持续部署(CD)中扮演关键角色。本文通过案例分析,探讨Jenkins在Java项目中的应用,展示其自动化构建、测试和部署的能力,提高开发效率和软件质量。
87 2
|
30天前
|
人工智能 数据可视化 JavaScript
NodeTool:AI 工作流可视化构建器,通过拖放节点设计复杂的工作流,集成 OpenAI 等多个平台
NodeTool 是一个开源的 AI 工作流可视化构建器,通过拖放节点的方式设计复杂的工作流,无需编码即可快速原型设计和测试。它支持本地 GPU 运行 AI 模型,并与 Hugging Face、OpenAI 等平台集成,提供模型访问能力。
111 14
NodeTool:AI 工作流可视化构建器,通过拖放节点设计复杂的工作流,集成 OpenAI 等多个平台
|
16天前
|
运维 监控 Cloud Native
构建深度可观测、可集成的网络智能运维平台
本文介绍了构建深度可观测、可集成的网络智能运维平台(简称NIS),旨在解决云上网络运维面临的复杂挑战。内容涵盖云网络运维的三大难题、打造云原生AIOps工具集的解决思路、可观测性对业务稳定的重要性,以及产品发布的亮点,包括流量分析NPM、网络架构巡检和自动化运维OpenAPI,助力客户实现自助运维与优化。
|
1月前
|
DataWorks 数据挖掘 大数据
方案实践测评 | DataWorks集成Hologres构建一站式高性能的OLAP数据分析
DataWorks在任务开发便捷性、任务运行速度、产品使用门槛等方面都表现出色。在数据处理场景方面仍有改进和扩展的空间,通过引入更多的智能技术、扩展数据源支持、优化任务调度和可视化功能以及提升团队协作效率,DataWorks将能够为企业提供更全面、更高效的数据处理解决方案。
|
3月前
|
运维 监控 Devops
DevOps实践:自动化部署与持续集成的融合之旅
【10月更文挑战第7天】在软件开发领域,DevOps已成为一种文化和实践,它倡导开发(Dev)与运维(Ops)之间的协作与整合。本文将引导读者了解如何通过自动化部署和持续集成(CI)的实践来提升软件交付的速度和质量。我们将探讨一些实用的工具和技术,以及它们是如何帮助团队高效地管理代码变更、测试和部署的。文章将不包含代码示例,但会详细解释概念和流程,确保内容的通俗易懂和条理性。
156 62
|
2月前
|
运维 Devops jenkins
DevOps实践:持续集成与持续部署在现代软件开发中的作用
【10月更文挑战第42天】在快节奏的软件开发世界里,DevOps已经成为一种提升效率、确保质量和加速交付的重要方法。本文将深入探讨DevOps的核心组成部分—持续集成(CI)和持续部署(CD)—并展示它们如何通过自动化流程优化开发周期。我们将从基础概念讲起,逐步过渡到实际操作,最终通过一个简单代码示例来演示这一过程。文章旨在为读者提供清晰的指导,帮助他们理解和实现CI/CD流程,从而在软件开发领域取得竞争优势。
|
2月前
|
Devops jenkins 测试技术
DevOps实践:自动化部署与持续集成的融合之旅
【10月更文挑战第41天】在软件开发的世界中,快速迭代和高效交付是企业竞争力的关键。本文将带你走进DevOps的核心实践——自动化部署与持续集成,揭示如何通过它们提升开发流程的效率与质量。我们将从DevOps的基本理念出发,逐步深入到具体的技术实现,最终展示一个实际的代码示例,让理论与实践相结合,为你的开发旅程提供清晰的指引。
72 4
|
2月前
|
存储 监控 Devops
DevOps实践:持续集成/持续部署(CI/CD)的实战指南
DevOps实践:持续集成/持续部署(CI/CD)的实战指南

热门文章

最新文章