集成平台下连接器设计规范: 调研-实践-思考

简介: 云集成是当今企业面临的主要挑战之一,为了满足对安全可靠的云集成解决方案日益增长的需求,一些供应商已开始提供集成服务,称为集成平台即服务 (iPaaS)。对于集成平台而言, 连接器可以看做平台运行时引擎的可重用扩展, 能够将 集成平台 应用程序与第三方 API、数据库和标准集成协议集成。连接器抽象了连接到目标系统所涉及的技术细节。本文探索了部分优秀的集成平台以及其各自的连接器规范, 对于现有链接场景的存在问题进行了一定的思考, 同时提出了我们自己的数据集成平台以及连接器设计思路, 希望可以通过Full-Code, Low-Code以及Platform三种方案灵活切换的方式解决实际需求。

前言

为什么需要集成平台?

云集成是当今企业面临的主要挑战之一,这已不是什么秘密 。为了满足对安全可靠的云集成解决方案日益增长的需求,一些供应商已开始提供集成服务,称为集成平台即服务 (iPaaS)。

  • iPaaS 是一种基于云的集成解决方案,是一个用于在云内部以及云与企业之间构建和部署集成的平台。借助 iPaaS,用户可以开发连接云或本地应用程序的集成流程,然后在无需安装或管理任何硬件或中间件的情况下进行部署。
  • Gartner 为 iPaaS 类别提供了进一步的定义和特殊性,概述了其参考模型中的一些关键功能。其中包括支持集成流执行的工具和技术、集成的开发和生命周期管理、应用程序流的管理和监控、治理以及基本的云功能,例如多租户、弹性和自配置。Gartner 还将 iPaaS 视为用户、服务提供商和集成提供商之间购买、销售和交换集成流(开箱即用和定制模式)的潜在平台 。
  • 由于 iPaaS 模型仍处于早期阶段,Gartner 指出,目前市场上的 iPaaS 产品可能并未包含其参考模型中的所有功能。相反,Gartner 确定了三类 iPaaS 供应商,每类都强调不同的集成领域:电子商务和 B2B 集成、云集成以及 企业服务总线 (ESB) 和 面向服务的架构 (SOA) 基础架构。
  • 根据企业的特定集成需求,某些供应商选项可能比其他选项更适合。对于短期集成需求,专注于电子商务/B2B 集成和云集成的 iPaaS 产品提供了快速连接合作伙伴应用程序和云服务的简单解决方案。

然而,鉴于向混合架构的不断转变,开始考虑长期集成战略以支持包括本地资源和云服务的计算模型将是明智之举。在三个供应商类别中,具有 ESB 和 SOA 背景的供应商提供的 iPaaS 产品为长期集成和治理项目提供了最平衡和最强大的功能集。尽管 Gartner 指出其中一些产品目前处于测试阶段或开发阶段,但显然具有 ESB 和 SOA 专业知识的 iPaaS 供应商最适合解决云时代的系统集成问题。

连接器是什么?

对于集成平台而言, 连接器可以看做平台运行时引擎的可重用扩展, 能够将 集成平台 应用程序与第三方 API、数据库和标准集成协议集成。连接器抽象了连接到目标系统所涉及的技术细节。

在 集成平台 应用程序中使用连接器具有以下优势:

  • 降低代码复杂性,可以将 应用程序连接到目标系统,而无需了解向目标系统编程所需的所有细节
  • 简化针对目标系统的身份验证
  • 主动推断目标系统的元数据,从而更轻松地使用表达式引擎识别和转换数据
  • 使代码维护更容易,因为:
  • 并非目标系统中的所有更改都需要更改应用程序。
  • 无需更新应用程序的其他部分即可更新连接器配置。

已有集成平台与连接器调研

Apache-Camel

简介

Apache Camel 是一个开源 Java 框架,专注于使开发人员更容易和更容易地进行集成。它通过提供:

  • 所有广泛使用的 EIP 的具体实现
  • 连接到各种各样的传输和 API
  • 易于使用的领域特定语言 (DSL) 将 EIP 和传输连接在一起

下图显示了这三个项目实际上是如何映射到 Camel 概念的。Camel 的组织方式以组件、端点、处理器和 DSL呈现。

网络异常,图片无法展示
|

Camel 架构的高级视图(来自Camel in Action)。

组件是 Camel 中的扩展点,用于添加与其他系统的连接。Camel 的核心非常小,以保持低依赖性、促进可嵌入性等,因此仅包含 26 个基本组件。核心之外有 316 多个组件。为了将这些系统暴露给 Camel 的其余部分,组件提供了一个端点接口。通过使用 URI,您可以以统一的方式在 Endpoints 上发送或接收消息。例如,要从 JMS 队列aQueue接收消息并将它们发送到文件系统目录“/tmp”,您可以使用“jms:aQueue”和“file:/tmp”之类的 URI。

处理器用于操作和调解端点之间的消息。所有 EIP 都被定义为处理器或处理器集。为了将处理器和端点连接在一起,Camel 用 Java、Scala 和 Groovy 等常规编程语言定义了多个 DSL。它还允许在 XML 中指定路由规则。

连接器

Camel目前已经存在数百种连接器, 而了解连接器的最好方式便是从如何创建一个连接器入手. 连接器在Camel中被统一称为组件.

Apache Camel 的设计目的是让添加新组件变得非常容易,无论它们是路由组件、转换器、传输等。组件的想法是成为端点的工厂和管理器, 以下是添加新组件的主要步骤:

  • 编写一个实现该Component接口的 POJO。最简单的方法就是从DefaultComponent进行扩展.
  • 要支持组件的自动发现,请添加一个文件,META-INF/services/org/apache/camel/component/FOO其中 FOO 是组件的 URI 方案以及动态创建的任何相关端点。这个文件应该包含组件类全名的信息。例如,如果您的组件由com.example.CustomComponent类实现,则文件应包含以下行 -  class=com.example.CustomComponent.
  • 然后,用户可以显式创建您的组件,对其进行配置并使用 a 注册它,CamelContext或者他们可以使用自动创建您的组件的 URI。建议使用Camel Maven Archetypes引导您的初始组件,因为它将为您提供所有必要的位来轻松开始开发您的组件。您还需要确保在组件文件中包含Camel 组件 Maven 插件pom.xml,以便为组件生成所有必要的元数据和 Java 文件。

编写端点

  • 在实现Endpoint时,您通常可以实现以下一种或多种方法:
  • createProducer将创建一个生产者来向端点发送消息交换
  • createConsumer实现事件驱动的消费者模式来消费来自端点的消息交换。
  • 通常,您只是从DefaultEndpoint集成并实现接口即可

注释您的端点

  • 如果您想受益于自动生成端点上所有参数的 HTML 文档作为 maven 站点报告的一部分,您需要注释端点的参数
  • 这意味着您@UriEndpoint向 Endpoint 类添加注释,然后注释您希望通过 URI 配置机制配置的每个参数@UriParam(或@UriParams嵌套配置对象)。

选项

  • 如果你的组件有选项,你可以让它有公共的 getter/setter,Camel 会在端点创建时自动设置属性。
  • 但是,如果您想自己解决问题,则必须从给定参数列表中删除该选项,因为 Camel 将验证是否使用了所有选项。如果不是,Camel 将抛出一个ResolveEndpointFailedException说明哪些选项是未知的。
  • 参数由 Camel 在类的createEndpoint方法中提供DefaultComponent
protected abstract Endpoint<E> createEndpoint(String uri, String remaining, Map parameters)

该代码是来自SEDA组件的示例,它删除了 size 参数:

public BlockingQueue<Exchange> createQueue(String uri, Map parameters) {
        int size = 1000;
        Object value = parameters.remove("size");
        if (value != null) {
            Integer i = convertTo(Integer.class, value);
            if (i != null) {
                size = i;
            }
        }
        return new LinkedBlockingQueue<Exchange>(size);
    }

使用示例

以下例子为使用java-dsl操作camel进行文件操作: 利用from-choice-when-to等结构实现条件控制方式,

/**
 * A Camel Java DSL Router
 */
public class MyRouteBuilder extends RouteBuilder {
    /**
     * Let's configure the Camel routing rules using Java code...
     */
    public void configure() {
        // here is a sample which processes the input files
        // (leaving them in place - see the 'noop' flag)
        // then performs content based routing on the message using XPath
        from("file:src/data?noop=true")
            .choice()
                .when(xpath("/person/city = 'London'"))
                    .to("file:target/messages/uk")
                .otherwise()
                    .to("file:target/messages/others");
    }
}

本例子读取文件夹下的源文件, 并根据文件中内容决定要将文件发送至何处文件夹下, 可是看出其实际使用较为明确, 可以明显看出业务逻辑, 同时简化了实现细节

Mule/MuleSoft

简介

什么是 MuleSoft?MuleSoft 是一个平台,它为 IT 提供了自动化一切的工具。这包括集成数据和系统、自动化工作流程和流程以及创造令人难以置信的数字体验——所有这些都在一个易于使用的单一平台上完成。通过我们独特的方法,IT 创建了团队可以根据需要使用的数字构建块,所有这些都内置了正确的安全、治理和合规措施。 MuleSoft 在三件事上帮助 IT 团队:通过集成解锁系统和数据,通过自动化提高生产力和效率,以及创造引人入胜的数字体验。我们的可组合连接方法将每个数字资产变成可重复使用的产品。使用这种方法,团队可以更快地交付项目。而Mule则是MuleSoft平台下一个主要产品

连接器

经过数年沉淀, Mule也已经存在上百种功能各异的连接器, 在Mule 4规范中, 这些连接器被统一称为扩展(Extension).

mule的连接器需要利用maven生成初始化项目, 之后在项目中按照模板文件进行开发

mvn org.mule.extensions:mule-extensions-archetype-maven-plugin:generate
  • 输入扩展名:DemoConnector
  • 输入扩展的groupId:com.Demo.muleConnector
  • 输入扩展的 artifactId:mulesoft-demo-connector
  • 输入扩展的版本:1.0.0
  • 进入扩展的主包:org.mule.extension.Demo

构建成功后,转到您的文件夹,您将找到一个带有 artifact-id 的连接器包。

您将找到以下类和文件夹结构。在本节中,我们将概述您将在每个文件夹中找到的内容:

网络异常,图片无法展示
|

src/main/java 文件夹: 此文件夹包含连接器的源 Java 文件,包括用于操作和连接配置的框架 Java 文件。其他支持类也应该存储在这里。一旦我们在 Anypoint Studio 中打开这个项目,就会有许多类,它们将使用 Mule SDK 注释进行注释,如下所示:

  • <connector-name>Extension.java:此类标识连接器的各种属性。在 Mule 4 中,连接器只是一个扩展。该类将识别哪些是配置类,哪些是操作类。
  • <connector-name>Configuration.java:这包含您想要从连接器的全局配置中获得的所有信息。
  • <connector-name>Connection.java:连接类负责处理连接,在我们的例子中,大部分实际编码都在这里。
  • <connector-name>ConnectionProvider.java:该类用于管理和提供与目标系统的连接。 连接提供者必须实现 Mule 中可用的连接提供者之一。选项是 PoolingConnectionProvider、CachedConnectionProvider 和 ConnectionProvider。
  • <connector-name>Operations.java:这是您定义所有必要操作的类。可以有多个操作类文件。

使用示例

MuleSoft开发程序使用AnyPointStudio, 为根据IDE改造的平台, 存在可视化界面, 其模型编排如下,下方应用实现了与camel示例相同的功能

可以看出, 其主要逻辑流程均可由拖拽完成, 方便用户操作, 上手难度也相对较低, 用户友好型较高。

优缺点分析

优点

  • 作为集成平台, 提供了大量丰富的连接器能力, 使得用户可以使用较低的开发时间/开发成本实现所需功能
  • 相较于传统的软件开发, 用户可以减少对于底层细节的关注, 从而将更多的精力投注在业务逻辑上
  • Getting More While Doing Less

缺点

  • 存在学习成本, camel的dsl流程编排需要对于组件, 库, Route均存在一定程度的了解, 需要额外耗费开发者精力/mule虽然存在可视化界面, 但是上手仍然有一定难度
  • 组件运行依赖于平台本身, 可能组件本身只实现了很简单的功能, 但是却需要平台(camel-core/ mule-runtime) 来进行运行支持, 而平台往往较为庞大占用运行资源
  • 出现问题后调试困难, 正所谓复杂度不会消失, 只是从一个地方转移到另一个地方; 作为用户而言, 难以搞清集成平台本身的运行逻辑, 很难想java-debug模式那样一行一行的查找问题锁定根源

举例

camel from-to方法: 为了实现from的调用, 引擎在运行时进行多级注册, 相互调用

同时引擎也分为多阶段进行启动

而这仅仅只是启动时的注册方法类, 其实际运行时又存在更多的process类, 导致开发者难以对于平台本身存在把握

BaseMainSupport: postProcessCamelContext - 根据生命周期启动各类contex, 同时注册监听listener

protected void postProcessCamelContext(CamelContext camelContext) throws Exception {
        // gathers the properties (key=value) that was used as property placeholders during bootstrap
        final OrderedLocationProperties propertyPlaceholders = new OrderedLocationProperties();
        // use the main autowired lifecycle strategy instead of the default
        camelContext.getLifecycleStrategies().removeIf(s -> s instanceof AutowiredLifecycleStrategy);
        camelContext.addLifecycleStrategy(new MainAutowiredLifecycleStrategy(camelContext));
        // setup properties
        configurePropertiesService(camelContext);
        // register listener on properties component so we can capture them
        PropertiesComponent pc = camelContext.getPropertiesComponent();
        pc.addPropertiesLookupListener(new PropertyPlaceholderListener(propertyPlaceholders));
        // setup startup recorder before building context
        configureStartupRecorder(camelContext);
        // setup package scan
        configurePackageScan(camelContext);
        // configure to use our main routes loader
        configureRoutesLoader(camelContext);
        // ensure camel context is build
        camelContext.build();
        for (MainListener listener : listeners) {
            listener.beforeInitialize(this);
        }
        // allow doing custom configuration before camel is started
        for (MainListener listener : listeners) {
            listener.beforeConfigure(this);
        }
        // we want to capture startup events for import tasks during main bootstrap
        StartupStepRecorder recorder = camelContext.adapt(ExtendedCamelContext.class).getStartupStepRecorder();
        StartupStep step;
        if (standalone) {
            step = recorder.beginStep(BaseMainSupport.class, "autoconfigure", "Auto Configure");
            autoconfigure(camelContext);
            recorder.endStep(step);
        }
        if (mainConfigurationProperties.isEagerClassloading()) {
            step = recorder.beginStep(BaseMainSupport.class, "classloading", "Eager Classloading");
            EagerClassloadedHelper.eagerLoadClasses();
            recorder.endStep(step);
        }
        configureLifecycle(camelContext);
        if (standalone) {
            step = recorder.beginStep(BaseMainSupport.class, "configureRoutes", "Collect Routes");
            configureRoutes(camelContext);
            recorder.endStep(step);
        }
        // allow doing custom configuration before camel is started
        for (MainListener listener : listeners) {
            listener.afterConfigure(this);
            listener.configure(camelContext);
        }
        // we want to log the property placeholder summary after routes has been started,
        // but before camel context logs that it has been started, so we need to use an event listener
        if (standalone && mainConfigurationProperties.isAutoConfigurationLogSummary()) {
            camelContext.getManagementStrategy().addEventNotifier(new EventNotifierSupport() {
                @Override
                public boolean isEnabled(CamelEvent event) {
                    return event instanceof CamelContextRoutesStartedEvent;
                }
                @Override
                public void notify(CamelEvent event) throws Exception {
                    // log summary of configurations
                    if (!propertyPlaceholders.isEmpty()) {
                        LOG.info("Property-placeholders summary");
                        for (var entry : propertyPlaceholders.entrySet()) {
                            String k = entry.getKey().toString();
                            Object v = entry.getValue();
                            String loc = locationSummary(propertyPlaceholders, k);
                            if (SensitiveUtils.containsSensitive(k)) {
                                LOG.info("    {} {}=xxxxxx", loc, k);
                            } else {
                                LOG.info("    {} {}={}", loc, k, v);
                            }
                        }
                    }
                }
            });
        }
    }

BaseMainSupport: configureRoutes - 配置并初始化路由选项 (为from-to类DSL语言支持)

protected void configureRoutes(CamelContext camelContext) throws Exception {
        // then configure and add the routes
        RoutesConfigurer configurer = new RoutesConfigurer();
        if (mainConfigurationProperties.isRoutesCollectorEnabled()) {
            configurer.setRoutesCollector(routesCollector);
        }
        configurer.setBeanPostProcessor(camelContext.adapt(ExtendedCamelContext.class).getBeanPostProcessor());
        configurer.setRoutesBuilders(mainConfigurationProperties.getRoutesBuilders());
        configurer.setRoutesBuilderClasses(mainConfigurationProperties.getRoutesBuilderClasses());
        if (mainConfigurationProperties.isBasePackageScanEnabled()) {
            // only set the base package if enabled
            configurer.setBasePackageScan(mainConfigurationProperties.getBasePackageScan());
        }
        configurer.setJavaRoutesExcludePattern(mainConfigurationProperties.getJavaRoutesExcludePattern());
        configurer.setJavaRoutesIncludePattern(mainConfigurationProperties.getJavaRoutesIncludePattern());
        configurer.setRoutesExcludePattern(mainConfigurationProperties.getRoutesExcludePattern());
        configurer.setRoutesIncludePattern(mainConfigurationProperties.getRoutesIncludePattern());
        configurer.configureRoutes(camelContext);
    }

org.apache.camel.main.RoutesConfigurer#configureRoutes - 扫描所有注册类, 获取注册的路由信息

public void configureRoutes(CamelContext camelContext) throws Exception {
        final List<RoutesBuilder> routes = new ArrayList<>();
        if (getRoutesBuilders() != null) {
            routes.addAll(getRoutesBuilders());
        }
        if (getRoutesBuilderClasses() != null) {
            String[] routeClasses = getRoutesBuilderClasses().split(",");
            for (String routeClass : routeClasses) {
                Class<RoutesBuilder> routeClazz = camelContext.getClassResolver().resolveClass(routeClass, RoutesBuilder.class);
                if (routeClazz == null) {
                    LOG.warn("Unable to resolve class: {}", routeClass);
                    continue;
                }
                // lets use Camel's injector so the class has some support for dependency injection
                RoutesBuilder builder = camelContext.getInjector().newInstance(routeClazz);
                routes.add(builder);
            }
        }
        if (getBasePackageScan() != null) {
            String[] pkgs = getBasePackageScan().split(",");
            Set<Class<?>> set = camelContext.adapt(ExtendedCamelContext.class)
                    .getPackageScanClassResolver()
                    .findImplementations(RoutesBuilder.class, pkgs);
            for (Class<?> routeClazz : set) {
                Object builder = camelContext.getInjector().newInstance(routeClazz);
                if (builder instanceof RoutesBuilder) {
                    routes.add((RoutesBuilder) builder);
                } else {
                    LOG.warn("Class {} is not a RouteBuilder class", routeClazz);
                }
            }
        }
        if (getRoutesCollector() != null) {
            try {
                LOG.debug("RoutesCollectorEnabled: {}", getRoutesCollector());
                // add discovered routes from registry
                Collection<RoutesBuilder> routesFromRegistry = getRoutesCollector().collectRoutesFromRegistry(
                        camelContext,
                        getJavaRoutesExcludePattern(),
                        getJavaRoutesIncludePattern());
                routes.addAll(routesFromRegistry);
                if (LOG.isDebugEnabled() && !routesFromRegistry.isEmpty()) {
                    LOG.debug("Discovered {} additional RoutesBuilder from registry: {}", routesFromRegistry.size(),
                            getRoutesIncludePattern());
                }
                // add discovered routes from directories
                StopWatch watch = new StopWatch();
                Collection<RoutesBuilder> routesFromDirectory = getRoutesCollector().collectRoutesFromDirectory(
                        camelContext,
                        getRoutesExcludePattern(),
                        getRoutesIncludePattern());
                routes.addAll(routesFromDirectory);
                if (LOG.isDebugEnabled() && !routesFromDirectory.isEmpty()) {
                    LOG.debug("Loaded {} additional RoutesBuilder from: {} (took {})", routesFromDirectory.size(),
                            getRoutesIncludePattern(), TimeUtils.printDuration(watch.taken(), true));
                }
            } catch (Exception e) {
                throw RuntimeCamelException.wrapRuntimeException(e);
            }
        }
        if (getBeanPostProcessor() != null) {
            // lets use Camel's bean post processor on any existing route builder classes
            // so the instance has some support for dependency injection
            for (RoutesBuilder routeBuilder : routes) {
                getBeanPostProcessor().postProcessBeforeInitialization(routeBuilder, routeBuilder.getClass().getName());
                getBeanPostProcessor().postProcessAfterInitialization(routeBuilder, routeBuilder.getClass().getName());
            }
        }
        // add the discovered routes
        addDiscoveredRoutes(camelContext, routes);
        // then discover and add templates
        Set<ConfigureRouteTemplates> set = camelContext.getRegistry().findByType(ConfigureRouteTemplates.class);
        for (ConfigureRouteTemplates crt : set) {
            LOG.debug("Configuring route templates via: {}", crt);
            crt.configure(camelContext);
        }
    }

mule的可视化编排: 其可视化编排结果为XML文件, 引擎则根据此文件启动流程执行

若要启动流程则首先启动执行引擎, 也就是Mule容器-Container, 其中会注册各类服务并且添加监听调用, 同样跳转很多层才能够发觉用户真正需要执行的代码

public void start(boolean registerShutdownHook) throws MuleException {
    if (registerShutdownHook) {
      registerShutdownHook();
    }
    try {
      startIfNeeded(artifactResourcesRegistry.getContainerProfilingService());
      doResourceInitialization();
      createExecutionMuleFolder();
      serviceManager.start();
      coreExtensionManager.setDeploymentService(deploymentService);
      coreExtensionManager.setRepositoryService(repositoryService);
      coreExtensionManager.setArtifactClassLoaderManager(artifactResourcesRegistry.getArtifactClassLoaderManager());
      coreExtensionManager.setToolingService(toolingService);
      coreExtensionManager.setServiceRepository(serviceManager);
      coreExtensionManager.setTroubleshootingService(troubleshootingService);
      validateLicense();
      showSplashScreen();
      coreExtensionManager.initialise();
      coreExtensionManager.start();
      toolingService.initialise();
      extensionModelLoaderManager.start();
      deploymentService.start();
    } catch (Throwable e) {
      shutdown(e);
    }
  }

经过容器启动-注册Context-启动生命周期-监听listener-初始化processor各类过程, 最终在流程执行时走到真正的process方法处

/**
   * Lists all the files in the {@code directoryPath} which match the given {@code matcher}.
   * <p>
   * If the listing encounters a directory, the output list will include its contents depending on the value of the
   * {@code recursive} parameter.
   * <p>
   *
   * @param config        the config that is parameterizing this operation
   * @param directoryPath the path to the directory to be listed
   * @param recursive     whether to include the contents of sub-directories. Defaults to false.
   * @param matcher     a matcher used to filter the output list
   * @param timeBetweenSizeCheck wait time between size checks to determine if a file is ready to be read.
   * @param timeBetweenSizeCheckUnit time unit to be used in the wait time between size checks.
   * @return a {@link List} of {@link Message messages} each one containing each file's content in the payload and metadata in the attributes
   * @throws IllegalArgumentException if {@code directoryPath} points to a file which doesn't exist or is not a directory
   */
  @Summary("List all the files from given directory")
  @MediaType(value = ANY, strict = false)
  @Throws(FileListErrorTypeProvider.class)
  public PagingProvider<LocalFileSystem, Result<Object, LocalFileAttributes>> list(@Config FileConnectorConfig config,
                                                                                   @Path(type = DIRECTORY,
                                                                                       location = EXTERNAL) String directoryPath,
                                                                                   @Optional(
                                                                                       defaultValue = "false") boolean recursive,
                                                                                   @Optional @DisplayName("File Matching Rules") @Summary("Matcher to filter the listed files") LocalFileMatcher matcher,
                                                                                   @ConfigOverride @Placement(
                                                                                       tab = ADVANCED_TAB) Long timeBetweenSizeCheck,
                                                                                   @ConfigOverride @Placement(
                                                                                       tab = ADVANCED_TAB) TimeUnit timeBetweenSizeCheckUnit,
                                                                                   StreamingHelper streamingHelper,
                                                                                   @Optional @Placement(
                                                                                       tab = ADVANCED_TAB) @Summary("Limit and sort the number of files returned") LocalSubsetList subset) {
    PagingProvider result =
        doPagedList(config, directoryPath, recursive, matcher,
                    config.getTimeBetweenSizeCheckInMillis(timeBetweenSizeCheck, timeBetweenSizeCheckUnit).orElse(null),
                    streamingHelper, subset);
    return result;
  }

连接器规范小结

Camel:

  • Component - EndPoint - Producer/Consumer 三级结构
  • Component管理EndPoint, 作为工厂以及管理者
  • EndPoint则产生Producer与Consumer
  • Producer可以作为from节点, 生产消息/Consumer则作为to节点, 消费到来的消息
  • 连接器真正的实现逻辑放在Producer以及Consumer节点之中

Mule:

  • Extension/Configuration/Connection/ConnectionProvider/Operations平行结构
  • Extension声明连接器结构, 以及连接器定义
  • Configuration则存储连接器相关配置
  • Connection/ConnectionProvider为与链接系统建立实质化链接
  • Operations则声明该连接器存在的所有可用操作, 同时在Mule可视化界面中透出
  • Mule的开始节点作为Source单独进行定义

面对与需要解决的问题

现存的一些问题:

  • 对于集成平台本身的依赖
  • 内部实现的复杂度导致调试困难
  • 上手成本

在当前云巧的交付环境下, 在实际项目中存在以下情形:

  • 存在交付项目只需要小部分数据需要与其他应用进行集成, 而客户计算资源有限难以提供单独的集成平台运行环境
  • 部分交付项目开发/维护人员存在大量ISV人员, 且人员稳定性不佳
  • 对于集成平台的上手成本这一痛点被无限放大, 每出现一次人员变更就需要重新平台进行熟悉, 浪费人员效率
  • 开发过程中, 对于bug的调试情况, 由于集成平台本身的复杂度较高, 新手很难准确在集成平台上进行调试
  • 客户本身想要进行能力沉淀, 而使用集成平台难以构成自闭环, 始终需要引入外部依赖, 会导致客户存在抵触心理

针对上述问题, 我们尝试使用云巧集成平台GTSP进行解决方案的探索

GTSP如何解决这些问题?

架构

上图可以看出, 云巧集成平台仍然使用了类似于传统集成平台的架构, 不同之处有

  • 对于连接器的划分更为贴近业务, 除开通用的连接器(如http/jdbc等)外, 有直接聚焦于某一类详细业务的连接器, 如专注于进行钉钉人员与组织同步的ECUS连接器等, 这类连接器可以让使用者更加贴近业务逻辑减少对于底层实现的关注
  • 场景应用之中的三类形态:

  1. 连接器输出-不含runtime,将连接器当做封装好的二方包使用

优点:可控性强,门槛低,无接入成本

缺点:代码侵入

  1. 生成源码+运行时-由开发态完成流程开发,codeGen生成流程源码,有runtime加载并独立运行

优点:可视化编排交付; 独立部署,无代码入侵; 生成代码与手写差异不大,阅读门槛较低; 可基于生成代码二次开发

                缺点:调试时需要一定程度理解运行时引擎。

  1. 平台输出

优点:可视化编码交付与迭代;  托管平台部署运维,日志查看断点调试;  集成流白屏化管理

缺点:需要一定学习成本; 编排能力依赖平台能力

与此同时, 我们可以进行源代码的开发, 既可以让使用的用户对所用模块知根知底, 同时也可以通过良好的可扩展性吸引更多的优秀连接器入驻, 与客户/伙伴共建连接器市场以及实际的连接应用场景, 实现资产的可沉淀性

集成产品功能

如导图所示, 集成产品将依赖于云巧市场以及大禹平台, 用户可以根据自己的需求使用上述三种方式的任意一种进行代码的开发; 我们也会根据常用的应用场景提供模板, 方便用户更快速的实现需求

连接器设计

力求简洁, GTSP连接器主要分为三个模块

  • Connector/ConnectorImpl
  • 连接器接口定义/核心实现功能类
  • connector定义的方法既可以作为连接器被GTSP作为流程节点调用, 又可以作为jar包直接被客户端集成
  • connector定义的方法未来会透出给前端, 因此其方法参数以及函数返回结果需要有较好的自解释性
  • 可以利用注解的方式为方法参数进行定义
  • 这就要求每个函数都声明自己必须使用的参数, 尽力去除无效参数; 而这有为函数参数形式的统一带来了挑战
  • Init
  • 连接器初始化, 为后续Spring-Boot等提供支持
  • Trigger
  • 可以作为开始节点的连接器
  • 连接器实现案例: 以全表查询为例, 为原生的jdbc进行部分能力增强
@Connector(name = "jdbc", title = "JDBC连接器")
public interface JdbcConnector {
    /**
     * 遍历查询全表, 不使用query中sql语句
     *
     * @param config
     * @param query  支持额外参数(AdvancedOptions):
     *               分页查询参数:
     *               int pageSize - default=10
     *               String waterColumn
     *               sql修饰参数:
     *               String selectColumns
     *               String whereCondition
     * @return
     */
    @Operation(name = "iterate", title = "遍历查询(返回map)")
    Iterator<Map<String, Object>> iterate(@Config JdbcConfig config, @Param Query query);
}
    public <T> Iterator<T> iterate(Query<T> query, JdbcConfig config) {
        NamedParameterJdbcTemplate jdbcTemplate = jdbcTemplate(config);
        String table = getTableNameFromAnnotationAndQuery(query);
        PageIteratorAsClass<T> pageIteratorAsClass = new PageIteratorAsClass<>(jdbcTemplate, query);
        return pageIteratorAsClass;
    }
}

全表查询迭代器Iterator实现类

public class PageIteratorAsClass<T> implements Iterator<T> {
    @Override
    public boolean hasNext() {
        if (iterator == null || iterator.hasNext() == false) poll();
        return iterator.hasNext();
    }
    @Override
    public T next() {
        T t = iterator.next();
        if (t != null) {
            updateDataIndex(t);
        }
        return t;
    }
    private void updateDataIndex(T data) {
        dataMark!=null?updateIndexWithDataMark():updateIndexWithOutDataMark();
    }
    /*
     * current, limit only get supported by mysql
     * so this connector only works for rds-mysql now
     */
    private void poll() {
        String sql = generateNextSQL();
        LOG.info("Execute " + sql);
        iterator = jdbcTemplate.query(sql, connectorRowMapper).stream().iterator();
    }
}

连接器原生使用方式:

public void init() {
        jdbcConnector = new JdbcConnectorImpl();
        jdbcConfig = new JdbcConfig();
        setJdbcConfig(jdbcConfig);
    }
    public void iterate() {
        Query<MidwayData> query = new Query<>();
        query.setClazz(MidwayData.class); //只需在query中注明DO类, 即可实现全表查询
        // 更多设置可以在query中进行定义, 本方法反射方式读取mybatis注解属性构造sql语句
        Iterator<MidwayData> iterator = jdbcConnector.iterate(query, jdbcConfig);
        while (iterator.hasNext()) {
            System.out.println(iterator.next().toString());
        }
    }

CodeGen实现方式

codegen代码形式:

public void test() throws Exception {
        FlowGeneratorImpl generator = new FlowGeneratorImpl();
        generator.setNodeMetaProvider(new NodeMetaProviderImpl());
        FlowModel flowModel = new FlowModel();
        NodeModel nodeModel = new NodeModel();
        setNodeModelProperty(nodeModel);
        NodeModel nodeModel2 = new NodeModel();
        setNodeModelProperty(nodeModel2);
        flowModel.getNodes().add(nodeModel).add(nodeModel2);
        String code = generator.generate(flowModel);
        System.out.println(code);
    }

生成代码形式

/** codegen by engine @Date Wed Sep 28 17:47:13 CST 2022 */
public class nullFlow implements Flow {
    @Resource 
    private HttpConnector httpConnector;
    @Override
    public void execute(FlowContext context) {
        /* step 1 invoke httpTest  */
        httpTest(context);
        /* step 2 invoke httpTest2  */
        httpTest2(context);
    }
    private void httpTest(FlowContext context) {
        HttpConfig config = context.getConfig("httpConfigxxx", HttpConfig.class);
        HttpRequest request = new HttpRequest();
        request.setMethod("GET");
        HttpResponse response = httpConnector.execute(config, request);
        context.putResult("httpTest", response);
    }
    private void httpTest2(FlowContext context) {
        HttpConfig config = context.getConfig("httpConfigxxx2", HttpConfig.class);
        HttpRequest request = new HttpRequest();
        request.setMethod("POST");
        request.setBody(context.getValue("#input.body", String.class));
        request.setContentType("application/json");
        HttpResponse response = httpConnector.execute(config, request);
        context.putResult("httpTest2", response);
    }
}

总结与展望

如何更好地在实际项目中应用集成平台, 这本身就是一个见仁见智的问题. 本文尝试从传统的code与新型的no-code方式之中进行一个折中的尝试; 希望能够通过更加灵活/便捷的使用方式, 为项目中的数据集成提供更大的价值.

问题讨论

上文中, 我们发现交付项目中存在的种种需要数据集成而又不利于集成平台存在的场景, 针对于此GTSP则灵活提供三类配置, 其中 连接器输出以及生成源码(CodeGen)+运行时这两个模式可以较好的减弱人员上手成本以及降低外部依赖性, 使用此方式开发效率相较于传统code方式能够获得一定提效, 可以增强代码的复用性, 相较于集成平台式的界面开发仍然不足之处, 并且对于接口文档的依赖以及接口易用性/可靠性的要求也较高. 更好地解决方式目前仍然在探索中, 或许一个易于使用同时方便理解的架构平台会是一种解决方式, 只是此时仍未可知.

读者若有更多的见解, 欢迎留言进行讨论, 不胜感激

相关文章
|
15天前
|
Java 测试技术 持续交付
自动化测试实践:从单元测试到集成测试
【6月更文挑战第28天】-单元测试:聚焦代码最小单元,确保每个函数或模块按预期工作。使用测试框架(如JUnit, unittest),编写覆盖所有功能和边界的测试用例,持续集成确保每次变更后自动测试。 - 集成测试:关注模块间交互,检查协同工作。选择集成策略,编写集成测试用例,模拟真实环境执行测试,整合到CI/CD流程以持续验证软件稳定性。 自动化测试提升软件质量,降低成本,加速开发周期,是现代软件开发不可或缺的部分。
|
14天前
|
人工智能 自然语言处理 数据挖掘
利用AI集成工具提升工作效率的实践经验
随着人工智能技术的蓬勃发展,以及当今数字化快速发展的时代,人工智能的运用已经渗透到各个行业和工作领域中,大语言模型在自然语言处理领域的应用也愈发广泛,而且市面上涌现出一批AI集成工具,比如Langchain、Dify、llamaIndex、fastgpt、百炼等,它们为开发者提供了强大的支持和便利,极大地提升了AI模型的构建和管理效率。作为一名热衷于利用新技术提高工作效率的开发者,我也积极尝试将这些工具融入到我的日常工作中,以期望提升工作效率和质量,下面我将分享我是如何使用AI集成工具来提升工作效率的,以及实践经验和心得。
51 1
利用AI集成工具提升工作效率的实践经验
|
8天前
|
Java jenkins 持续交付
Java中的版本控制与持续集成实践
Java中的版本控制与持续集成实践
|
1月前
|
人工智能 移动开发 IDE
安利几款与钉钉平台无缝集成打通账号认证的企业文档管理系统
钉钉是很多中小企业都爱用的产品,开通账号就能直接使用了,应用生态非常丰富,尤其是AI技术的应用,走在行业前列。但仍有很多企业对于全面拥抱SaaS服务充满了顾虑,尤其在内部资料的管理这块,即使钉钉在线文档已经提供了非常优秀的协作体验,不少客户仍更偏爱私有部署在局域网里面的企业文档管理系统。那么能将企业内部部署的文档管理系统集成到钉钉平台上面,和钉钉文档并行使用呢?市面上又有哪些企业文档管理系统软件支持与钉钉的集成呢?这也是很多企业客户的疑问。
安利几款与钉钉平台无缝集成打通账号认证的企业文档管理系统
|
11天前
|
机器学习/深度学习 分布式计算 算法
Java中的机器学习模型集成与训练实践
Java中的机器学习模型集成与训练实践
|
18天前
|
监控 安全 搜索推荐
企业应用集成(EAI):连接企业系统的技术探索
【6月更文挑战第25天】企业应用集成(EAI)技术连接异构系统,实现数据共享和业务流程优化。EAI包括界面、业务过程、应用和数据集成,提升协同效率、降低成本、改善客户体验、支持创新及强化风险管控。实施涉及规划、需求分析、选择方案、开发测试、部署监控及维护优化。EAI在企业信息化中扮演关键角色。
|
18天前
|
监控 Devops 测试技术
DevOps实践:持续集成与持续部署(CI/CD)在现代软件开发中的作用
【6月更文挑战第24天】本文深入探讨了持续集成(Continuous Integration,简称CI)和持续部署(Continuous Deployment,简称CD)在现代软件开发生命周期中的核心作用。通过阐述CI/CD的概念、优势以及实施策略,文章旨在为读者提供一套完整的理论框架和实践指南,以促进软件开发流程的高效性和产品质量的提升。
|
25天前
|
运维 监控 Devops
DevOps实践:持续集成与持续部署(CI/CD)的精髓
【6月更文挑战第17天】本文将深入探讨DevOps文化中的核心实践——持续集成(CI)和持续部署(CD)。我们将从理论出发,逐步过渡到实际操作,介绍如何搭建一个高效的CI/CD流程。文章将涵盖工具选择、流程设计、自动化测试以及监控和反馈机制的建立。通过具体案例分析,揭示成功实施CI/CD的关键因素,并探讨如何在组织内推广这一实践以促进开发和运维之间的协作。
|
4天前
|
Java jenkins 测试技术
Java中的自动化测试与持续集成实践
Java中的自动化测试与持续集成实践
|
8天前
|
敏捷开发 jenkins 测试技术
软件测试中的敏捷实践:持续集成与自动化测试的融合之道
在软件开发领域,敏捷实践已成为提升项目响应速度和质量保证的重要手段。本文深入探讨了持续集成(CI)与自动化测试如何相辅相成,共同构建起一套高效、可靠的软件测试体系。通过实例分析,揭示了将敏捷原则应用于软件测试过程中的关键策略,并提出了实现这一目标的可行路径。文章旨在为软件测试专业人员提供一套结合敏捷理念的实用方法论,以应对快速迭代的软件项目需求。