springboot 整合 apache camel实现企业级数据集成和处理

简介: springboot 整合 apache camel实现企业级数据集成和处理

Apache Camel是一个集成框架,它具有用于集成各种应用程序的编程模型。


对于需要在不同的微服务和其他上下游系统(如数据库和消息传递系统)之间进行通信的微服务体系结构,它非常适合。


在本文中,我们将在代码示例的帮助下,使用Apache Camel在用Spring Boot构建的微服务应用程序中构建集成逻辑。


Apache Camel 介绍

如开始所述,Apache Camel是一个集成框架。camel可以做到:


路由:将数据有效负载(也称为“消息”)从源系统发送到目标系统

中介:消息处理,如基于一个或多个消息属性过滤消息、修改消息的某些字段、通过API调用进行充实等。

在集成过程中使用的Apache Camel的一些重要概念如下图所示:


65.png


下面来介绍下camel有个基础的了解


Camel 上下文

camel上下文是所有camel构造的运行时容器,并执行路由规则。Camel上下文在启动时通过加载执行路由规则所需的所有资源来激活路由规则。


Camel上下文由CamelContext接口描述,如果在Spring容器中运行,默认情况下会自动配置。


路由和端点

Route是最基本的构造,我们用它来定义消息从源移动到目的地时应该采取的路径。我们使用领域特定语言(DSL)定义路由。


在Camel上下文中加载路由,并在触发路由时用于执行路由逻辑。每条路由都由Camel上下文中的唯一标识符标识。


端点表示消息的源和目的地。它们通常通过它们的uri在领域特定语言(DSL)中被引用。端点的例子可以是web应用程序的URL或消息传递系统的源或目的地。


领域特定语言 (DSL)

我们用各种领域特定语言(DSL)在Apache Camel中定义路由。Java DSL和Spring XML DSL是Spring应用程序中使用的两种主要类型的DSL。


下面是一个在Java DSL中使用RouteBuilder类定义路由的例子:

RouteBuilder builder = new RouteBuilder() {
      @Override
      public void configure() throws Exception {
        // Route definition in Java DSL for
        // moving file from jms queue to file system.
        from("jms:queue:myQueue").to("file://mysrc");
      }
    };

在这里,我们使用RouteBuilder类定义了一个以JMS队列为源、文件端点为目的地的路由。RouteBuilder类使用DSL创建路由规则。RouteBuilder类的实例被添加到Camel上下文中。


使用Spring XML DSL定义的相同路由如下所示:

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
      http://www.springframework.org/schema/beans
      http://www.springframework.org/schema/beans/spring-beans.xsd
      http://camel.apache.org/schema/spring
      http://camel.apache.org/schema/spring/camel-spring.xsd" >
  <camelContext id="sendtoqueue"
                  xmlns="http://camel.apache.org/schema/spring">
    <route>
      <from uri="jms:queue:myQueue"/>
      <to uri="file://mysrc"/>
    </route>
  </camelContext>
</beans>

组件

消息从源到目的地的传输经过多个步骤。每个步骤的处理可能需要连接到消息流中的不同类型的资源,比如调用bean方法或调用API。我们使用组件来执行连接到这些资源的功能。


例如,在Java DSL中使用RouteBuilder类定义的路由使用文件组件桥接到文件系统,使用jms组件桥接到jms提供程序。

RouteBuilder builder = new RouteBuilder() {
      @Override
      public void configure() throws Exception {
        // Route definition in Java DSL for
        // moving file from jms queue to file system.
        from("jms:queue:myQueue").to("file://mysrc");
      }
    };

Camel有几个预先构建的组件和许多由社区构建的其他组件。下面是Camel中可用组件的一个片段,它让我们了解了使用这个框架可以集成的系统的范围:


  • ActiveMQ
  • AMQP
  • Async HTTP Client
  • Atom
  • Avro RPC
  • AWS2 DynamoDB
  • AWS2 Lambda
  • AWS2 SQS
  • AWS2 SNS
  • Azure CosmosDB
  • Azure Storage Blob
  • Azure Storage Queue
  • Bean
  • Cassandra CQL
  • Consul
  • CouchDB
  • Cron
  • Direct
  • Docker
  • Elasticsearch
  • Facebook
  • FTP
  • Google Cloud Storage
  • Google Cloud Function
  • GraphQL
  • Google Pubsub
  • gRPC
  • HTTP

这些函数被分组在单独的Jar文件中。根据我们使用的组件,我们需要引入相应的Jar依赖项。


在我们的示例,我们需要引入Camel - JMS依赖项,并通过参考Camel JMS组件的文档来使用该组件。


我们还可以通过实现Component接口来构建自己的组件。


在Spring Boot中使用Apache Camel

camel 对Spring Boot的支持包括camel上下文和许多camel组件的启动器的自动配置。Camel上下文的自动配置检测在Spring上下文中可用的Camel路由,并将关键的Camel实用程序(如生产者模板、消费者模板和类型转换器)注册为Spring bean。


让我们通过一个例子来理解这一点。我们将设置一个简单的路由来调用bean方法,并从REST端点调用该路由。


让我们首先在Spring Boot Initializr的帮助下创建一个Spring Boot项目,然后在我们喜欢的IDE中打开这个项目。


添加依赖项

Apache Camel提供了一个Spring Boot Starter模块,允许我们在Spring Boot应用程序中使用Camel。


让我们首先将Camel Spring Boot BOM添加到我们的Maven pom .xml中:

<dependencyManagement>
  <dependencies>
    <!-- Camel BOM -->
    <dependency>
      <groupId>org.apache.camel.springboot</groupId>
      <artifactId>camel-spring-boot-bom</artifactId>
      <version>${project.version}</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
    <!-- ... other BOMs or dependencies ... -->
  </dependencies>
</dependencyManagement>

camel-spring-boot-bom 包含了所有Camel Spring Boot starter 的jar文件.


接下来 让我们添加 Camel Spring Boot starter:

<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-spring-boot-starter</artifactId>
</dependency>

添加 camel-spring-boot-starter 设置 Camel 上下文.


我们需要进一步为Spring Boot应用程序所需 starter

<dependency>
  <groupId>org.apache.camel.springboot</groupId>
  <artifactId>camel-servlet-starter</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel.springboot</groupId>
  <artifactId>camel-jackson-starter</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel.springboot</groupId>
  <artifactId>camel-swagger-java-starter</artifactId>
</dependency>

在这里,我们添加了三个依赖于使用servlet、jackson和swagger组件的启动器,它们将执行以下功能:


  1. servlet组件将提供基于HTTP 接口。
  2. jackson组件用于java和json对象的转换。
  3. swagger根据controller接口反向生成接口文档。

使用Java DSL的RouteBuilder定义路由

现在让我们使用Spring bean方法创建一个获取产品的路由。我们通过扩展RouteBuilder类并覆盖其配置方法来创建camel路由,以Java域特定语言(DSL)定义路由规则。


每个路由器类实例化一次,然后用CamelContext对象注册。


我们的类包含了使用Java域特定语言(DSL)定义的路由规则,代码如下:


@Component
public class FetchProductsRoute extends RouteBuilder {
  @Override
  public void configure() throws Exception {
    from("direct:fetchProducts")
      .routeId("direct-fetchProducts")
      .tracing()
      .log(">>> ${body}")
      .bean(ProductService.class, "fetchProductsByCategory")
      .end();
  }
}

在这里,我们通过扩展RouteBuilder类,在FetchProductsRoute类中定义Java DSL来创建路由。我们将端点定义为direct:fetchProducts,并提供了一个路由标识符direct-fetchProducts。前缀direct:在端点的名称中,可以使用direct Camel组件从另一个Camel路由调用该路由。


使用模板触发路由

我们可以使用ProducerTemplate和ConsumerTemplate来调用路由。ProducerTemplate提供了向Camel端点发送消息的封装方法。


这两个模板都类似于Spring框架中的模板工具类程序类,如JmsTemplate或JdbcTemplate,它们简化了对JMS和JDBC api的访问。


让我们调用上一步中创建的路由:

@RestController
public class ProductResource {
  @Autowired
  private ProducerTemplate producerTemplate;
  @GetMapping("/products/{category}")
  @ResponseBody
  public List<Product> getProductsByCategory(
                @PathVariable("category") final String category){
    producerTemplate.start();
    List<Product> products = producerTemplate
       .requestBody("direct:fetchProducts", category, List.class);
    producerTemplate.stop();
    return products;
  }
}
@Configuration
public class AppConfig {
  @Autowired
  private  CamelContext camelContext;
  ...
  ...
  @Bean
  ProducerTemplate producerTemplate() {
    return camelContext.createProducerTemplate();
  }
  @Bean
  ConsumerTemplate consumerTemplate() {
    return camelContext.createConsumerTemplate();
  }
}


这里,我们在定义了一个REST端点,该端点带有一个GET方法,用于按类别获取产品。通过使用在Spring配置中配置的producerTemplate,我们将在该方法内部调用camel路由。


在我们的Spring配置中,我们通过调用CamelContext中的相应方法定义了producerTemplate和consumerTemplate, CamelContext在ApplicationContext中是可用的。


使用拆分器-聚合器企业集成模式定义路由

现在让我们看一下将使用企业集成模式的路线。


Camel为Gregor Hohpe和Bobby Woolf所著的书中描述的许多企业集成模式提供了实现。在我们的示例中,我们将使用Splitter和Aggregator集成模式。


我们可以使用Splitter将单个消息分割为多个片段,并分别处理它们。然后,我们可以使用Aggregator将这些单独的片段组合成单个消息。


选择企业集成模式 (EIP)

在尝试构建我们的集成逻辑之前,我们应该寻找最适合实现我们的用例的集成模式。


让我们来看一个使用Splitter和Aggregate集成模式定义路由的示例。在这里,我们将考虑一个假设的场景,为电子商务应用程序构建一个REST API,用于处理客户下的订单。我们将期望我们的订单处理API执行以下步骤:


  1. 从购物车中获取商品列表
  2. 获取购物车中每个订单项的价格
  3. 计算所有订单项目的价格之和,以生成订单发票。

在完成步骤1之后,我们想获取步骤2中每个订单行项目的价格。我们想要并行获取它们,因为它们彼此不依赖。有多种方法可以进行这种处理。


然而,由于设计模式是给定上下文中重复出现问题的公认解决方案,因此我们将从企业集成模式列表中寻找与我们的问题非常相似的模式。在浏览列表之后,我们发现Splitter和Aggregator模式最适合进行此处理。


应用企业集成模式 (EIP)

接下来,我们将参考Apache Camel的文档,了解如何使用Splitter和Aggregator集成模式来构建路由。


让我们执行以下步骤来应用这些模式:


从购物车中获取订单行,然后使用Splitter EIP将它们拆分为单独的订单行项目。

对于每个订单项目,获取价格,应用折扣等。这些步骤并行运行。

从实现AggregationStrategy接口的PriceAggregationStrategy类中的每一行项目聚合价格。

代码如下

@Component
public class OrderProcessingRoute extends RouteBuilder {
  @Autowired
  private PriceAggregationStrategy priceAggregationStrategy;
  @Override
  public void configure() throws Exception {
    from("direct:fetchProcess")
    .split(body(), priceAggregationStrategy).parallelProcessing()
    .to("bean:pricingService?method=calculatePrice")
    .end();
  }
}
@Component
public class PriceAggregationStrategy implements AggregationStrategy{
  @Override
  public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
    OrderLine newBody = newExchange.getIn().getBody(OrderLine.class);
        if (oldExchange == null) {
            Order order = new Order();
            order.setOrderNo(UUID.randomUUID().toString());
            order.setOrderDate(Instant.now().toString());
            order.setOrderPrice(newBody.getPrice());
            order.addOrderLine(newBody);
            newExchange.getIn().setBody(order, Order.class);
            return newExchange;
        }
        OrderLine newOrderLine = newExchange.getIn()
                                .getBody(OrderLine.class);
        Order order = oldExchange.getIn().getBody(Order.class);
        order.setOrderPrice(order.getOrderPrice() + newOrderLine.getPrice());
        order.addOrderLine(newOrderLine);
        oldExchange.getIn().setBody(order);
        return oldExchange;
  }
}
@Service
public class PricingService {
  public OrderLine calculatePrice(final OrderLine orderLine ) {
    String category = orderLine.getProduct().getProductCategory();
    if("Electronics".equalsIgnoreCase(category))
       orderLine.setPrice(300.0);
...
...
    return orderLine;
  }

这里我们在Java DSL中定义了一个路由,它将传入消息(订单行集合)分割为单独的订单行项目。每个订单行项目被发送到PricingService类的calculatePrice方法,以计算项目的价格。


接下来,我们在拆分步骤之后绑定了一个聚合器。聚合器实现了AggregationStrategy接口,我们的聚合逻辑位于覆盖的aggregate()方法中。在aggregate()方法中,我们获取每个订单行项,并将它们合并到单个订单对象中。


使用REST风格DSL的拆分聚合器模式来消费路由

接下来,让我们在Apache Camel中使用REST风格的DSL来定义带有HTTP请求方式(如GET、POST、PUT和DELETE)的REST api。实际的REST传输是使用camel REST组件(如Netty HTTP、Servlet)。


要在Java中使用Rest DSL,我们需要扩展RouteBuilder类,并在configure方法中定义路由,类似于前面创建常规Camel路由的方法。


让我们通过使用Java DSL中的REST构造来定义API来定义一个用于处理订单的REST服务。我们还将基于OpenAPI规范(OAS)生成API规范:

@Component
public class RestApiRoute  extends RouteBuilder {
  @Autowired
  private Environment env;
  @Override
  public void configure() throws Exception {
    restConfiguration()
        .contextPath("/ecommapp")
        .apiContextPath("/api-doc")
        .apiProperty("api.title", "REST API for processing Order")
        .apiProperty("api.version", "1.0")
        .apiProperty("cors", "true")
        .apiContextRouteId("doc-api")
        .port(env.getProperty("server.port", "8080"))
        .bindingMode(RestBindingMode.json);
    rest("/order/")
    .get("/process").description("Process order")
    .route().routeId("orders-api")
    .bean(OrderService.class, "generateOrder")
    .to("direct:fetchProcess")
    .endRest();
  }

这定义了一个具有URL映射/order/process 的GET类型的REST服务。


然后,我们使用Splitter和Aggregator Enterprise Integration模式直接路由到名为direct:fetchProcess的路由的Camel端点,该模式是我们之前在DSL中使用to构造创建的。


何时使用 Apache Camel

正如我们在示例中看到的,我们可以通过自定义编码轻松地完成上述任务,而不用使用Apache Camel。让我们了解一下我们应该考虑使用Apache Camel来满足我们的集成需求的一些情况:


  1. 具有丰富组件集的Apache Camel对于需要通过不同协议(如文件、api或JMS队列)与系统集成的应用程序非常有用。
  2. Apache Camel的实现了企业集成模式,满足非常复杂的集成场景。
  3. 微服务中的编排和编排可以用Apache Camel路由中的领域特定语言来定义。路由有助于保持核心业务逻辑与通信逻辑解耦,并满足SRP(单一责任原则)。
  4. Apache Camel非常适合Java和Spring应用程序。
  5. 使用Java对象(pojo): Apache Camel是一个Java框架,因此它特别擅长使用Java对象。因此,如果我们使用的是XML、JSON等文件格式,可以反序列化为Java对象,那么Camel就可以轻松地对其进行处理。

相反,我们应该避免在以下场景中使用Apache camel:


  • 如果我们的集成只是简单调用少量api
  • Camel在处理大量数据时表现不佳
  • Camel也不适合缺乏Java技能的团队

一般来说,caeml的最佳用例是,我们有一个数据源,我们希望从其中消费数据,比如队列上的传入消息,或者从API和目标中获取数据,我们希望将数据发送到这些数据源。


结论

在本文中,我们了解了Apache Camel的重要概念,并使用它在Spring Boot应用程序中构建集成逻辑。以下是我们讨论的内容的摘要:


Apache Camel是一个集成框架,提供了一个编程模型以及许多企业集成模式的实现。

我们使用不同类型的领域特定语言(DSL)来定义消息的路由规则。

Route是我们用DSL指定的最基本的构造,用来定义消息从源移动到目的地时应该采取的路径。

Camel上下文是执行 Camel路由的运行时容器。

我们使用Splitter和Aggregator企业集成模式构建了一条路由,并从REST DSL调用它来演示通过应用企业集成模式来解决集成问题。

最后,我们研究了使用Apache Camel将使我们受益的一些场景。

希望这篇文章对大家有帮助


相关文章
|
2月前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
86 5
|
2月前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
71 1
|
3月前
|
NoSQL Java Redis
shiro学习四:使用springboot整合shiro,正常的企业级后端开发shiro认证鉴权流程。使用redis做token的过滤。md5做密码的加密。
这篇文章介绍了如何使用Spring Boot整合Apache Shiro框架进行后端开发,包括认证和授权流程,并使用Redis存储Token以及MD5加密用户密码。
58 0
shiro学习四:使用springboot整合shiro,正常的企业级后端开发shiro认证鉴权流程。使用redis做token的过滤。md5做密码的加密。
|
3月前
|
Java 测试技术 API
如何在 Apache JMeter 中集成 Elastic APM
如何在 Apache JMeter 中集成 Elastic APM
57 1
|
6月前
|
关系型数据库 API Apache
Flink CDC:基于 Apache Flink 的流式数据集成框架
本文整理自阿里云 Flink SQL 团队研发工程师于喜千(yux)在 SECon 全球软件工程技术大会中数据集成专场沙龙的分享。
18526 11
Flink CDC:基于 Apache Flink 的流式数据集成框架
|
5月前
|
消息中间件 Kafka 数据处理
实时数据流处理:Dask Streams 与 Apache Kafka 集成
【8月更文第29天】在现代数据处理领域,实时数据流处理已经成为不可或缺的一部分。随着物联网设备、社交媒体和其他实时数据源的普及,处理这些高吞吐量的数据流成为了一项挑战。Apache Kafka 作为一种高吞吐量的消息队列服务,被广泛应用于实时数据流处理场景中。Dask Streams 是 Dask 库的一个子模块,它为 Python 开发者提供了一个易于使用的实时数据流处理框架。本文将介绍如何将 Dask Streams 与 Apache Kafka 结合使用,以实现高效的数据流处理。
112 0
|
6月前
|
消息中间件 Java Kafka
Spring Boot与Apache Kafka Streams的集成
Spring Boot与Apache Kafka Streams的集成
|
6月前
|
监控 Java API
使用Spring Boot构建企业级应用的实践
使用Spring Boot构建企业级应用的实践
|
6月前
|
测试技术 Windows
基于SpringBoot+Vue企业级工位管理系统(源码+部署说明+演示视频+源码介绍+lw)(3)
基于SpringBoot+Vue企业级工位管理系统(源码+部署说明+演示视频+源码介绍+lw)
78 0
|
6月前
|
数据库 数据安全/隐私保护 数据库管理
基于SpringBoot+Vue企业级工位管理系统(源码+部署说明+演示视频+源码介绍+lw)(2)
基于SpringBoot+Vue企业级工位管理系统(源码+部署说明+演示视频+源码介绍+lw)
64 0

热门文章

最新文章

推荐镜像

更多