Cassandra JAVA客户端是如何做到高性能高并发的

本文涉及的产品
云数据库 MongoDB,独享型 2核8GB
推荐场景:
构建全方位客户视图
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: Cassandra Java驱动程序 本文翻译至:https://beyondthelines.net/databases/the-cassandra-java-driver/同时也加上了作者阅读源码后的观后感,丰富了很多细节。

Cassandra Java驱动程序

Cassandra驱动程序不是将CQL字符串发送到Cassandra节点并等待响应的傻瓜程序

它们实际上很聪明,并且以某种方式组织的,使您易于使用,工作更开心,同时仍然尝试从Cassandra中获得最大的性能。

在本文中,我将重点介绍Java驱动程序,快速了解其体系结构及其提供的某些功能。

快速使用

3.x版本

Cluster cluster = Cluster.builder().addContactPoints(contactPoints).withPort(port).build();
session = cluster.connect();
ResultSet results = session.execute(query);
for (Row row : results) {
//TODO: access row;
}

4.x版本

session = CqlSession.builder().build();
ResultSet results = session.execute(query);
for (Row row : results) {
//TODO: access row;
}

配置application.conf,放在java进程的classpath下

datastax-java-driver {
  basic.contact-points = ["127.0.0.1:9042"]
  basic {
    load-balancing-policy {
      local-datacenter = datacenter1
    }
  }

可以看到4.x完全移除了Cluster这个类,一个会话会创建n个pool(n=node个数),一个pool就是一个连接池,拥有若干个连接,请求都是异步的,所以一个连接也是可以同时发送多个request,这种我们称之为inFlight
image

因为目前主流的客户端还是3.x,下面我们重点介绍3.x版本

架构

Cassandra Java驱动程序提供了一个异步API。请注意,它还提供了一个同步API,但由于它是基于异步API的,并且我不想在我的应用程序线程与Cassandra交互时夯住,因此我不准备介绍它。
image

让我们自底向上研究一下驱动程序各个组件

连接

最底部是与Cassandra节点的连接。Cassandra协议是完全异步的。这意味着我们可以通过同一连接发送多个请求。在发送下一个请求之前,我们不必等待单个请求完成。每个请求都由流ID标识,并且在响应中也设置了该ID,以便driver可以将响应与相应的请求进行关联。
该驱动程序依靠Netty执行异步IO操作。
一旦将请求发送到连接会话,executeAsync将返回Future,然后在接收到相应的响应(或发生超时异常)时使用Promise完成。
正在进行的请求(也称为“in-flight”请求)存储在队列中。队列已满时,您将无法再将查询发送到Cassandra。executeAsync将返回失败的future(jdk8异步作业句柄)。在版本3.1之前,调用线程处于阻塞状态,等待有可用的连接。当然,队列大小可以在poolingOptions中配置。

val poolingOptions = new PoolingOptions()
poolingOptions
  .setMaxRequestsPerConnection(HostDistance.LOCAL, 32768)
  .setMaxRequestsPerConnection(HostDistance.REMOTE, 2000)

val cluster = Cluster.builder()
    .withContactPoints("127.0.0.1")
    .withPoolingOptions(poolingOptions)
    .build()

默认值非常低(本地连接为1024,远程连接为256)。256在生产环境很容易就用满,因此,我建议您根据需要调整这些值。
使用TCP保持活动状态或发送应用程序心跳以保持连接打开,以保持连接打开。

连接池

连接属于连接池。驱动程序为每个Cassandra节点维护一个连接池。连接池也可以通过poolingOptions进行配置。

poolingOptions
    .setConnectionsPerHost(HostDistance.LOCAL,  4, 10)
    .setConnectionsPerHost(HostDistance.REMOTE, 2, 4)

主要配置是池大小。可用连接的数量可以根据负载在核心和最大数量之间变化。我们还可以为本地或远程数据中心设置不同的设置。当连接闲置时间过长时,连接将关闭,直到池大小达到其核心大小为止。

会话

连接池属于会话。

会话也是应用程序用于与Cassandra通信的对象。
该层为应用程序抽象所有连接管理。

val session = cluster.connect()

Session提供了所有与Cassandra通信的API,例如session.executeAsync,它允许应用程序向Cassandra发送请求,或者session.getState允许我们监控后端主机和进行中的查询。

cluster

群集是顶层抽象。在这里,我们可以配置所有内容,例如指定池选项,负载平衡策略,重试策略或默认一致性级别。
val cluster = Cluster.builder()
.withContactPoints("127.0.0.1")
.withPoolingOptions(poolingOptions)
.withLoadBalancingPolicy(new RoundRobinPolicy())
.withQueryOptions(
new QueryOptions().setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM)
)
.build()

Bootstrapping

当驱动程序首次连接到种子节点之一时,它会建立一个控制连接,用于发现群集拓扑。基本上,它查询Cassandra中的系统表。
引导启动时,从种子节点列表中随机选择种子节点,以避免在初始群集拓扑中始终使用相同的节点。

负载均衡

负载平衡负责建立与整个Cassandra集群(不仅在一个节点上)的连接,并维护与集群中每个主机的连接池。
它具有将某些请求发送到某些节点的逻辑。与哪些主机建立连接以及向哪些主机发送请求由负载平衡策略确定。
实际上,对每个请求都会算出一个查询计划。查询计划确定向哪个主机发送请求以及以哪个顺序发送(取决于推测执行策略和重试策略)。
负载平衡还确定主机是本地主机还是远程主机(跟客户端DCAware配置有关)。
如果默认策略不够用,可以编写自定义负载平衡策略。
image

驱动程序从请求中提取partitionKey,并使用正确的哈希算法路由到持有该分区的Cassandra节点。

默认策略是DatacenterAwareLoadBalancingPolicy。拥有如下两特性

  • 数据中心感知:确定哪些节点属于本地数据中心,哪些节点属于远程数据中心。然后,驱动程序仅将请求发送到本地数据中心,并将远程数据中心用作备用。
  • 令牌感知:查找请求的分区键,并使用与群集相同的算法对其进行哈希处理。然后,它将请求发送到负责令牌的节点(在该分区的副本中随机选择)。
    使用DDCAwareRoundRobinPolicy时可以指定本地数据中心:
    Cluster cluster = Cluster.builder()
    .addContactPoint("127.0.0.1")
    .withLoadBalancingPolicy(
      DCAwareRoundRobinPolicy.builder()
        .withLocalDc("myLocalDC")
        .withUsedHostsPerRemoteDc(2)
        .allowRemoteDCsForLocalConsistencyLevel()
        .build()
      )
    )
    .build()
    

容错能力

image

错误主要有3种:

  • 无效的请求:错误直接返回应用上层,因为驱动程序无法知道如何处理此类请求
  • 服务器错误:驱动程序可以根据负载平衡策略尝试下一个节点
  • 网络超时:如果请求被标记为幂等,则驱动程序可以重试该请求。默认情况下,请求不被认为是幂等的,因此在可能的情况下将请求尽量标记是一个好习惯。
    对于幂等请求,如果在一定的延迟内没有来自第一节点的响应,则驱动程序可以将请求发送到第二节点。这称为“推测重试”,用SpeculativeExecutionPolicy进行配置。
    val cluster = Cluster.builder()
    .addContactPoint("127.0.0.1")
    .withSpeculativeExecutionPolicy(
      new ConstantSpeculativeExecutionPolicy(
        500, // delay before a new execution is launched
        2    // maximum number of executions
      )
    )
    .build()
    

    结论

    感谢datastax为我们提供了这么强大的客户端,Java驱动程序值得花一些时间来了解其体系结构以及如何正确配置它(每个连接的最大请求尤为重要,因为我发现默认值不是很合适–配置本地数据中心也很重要,否则驱动程序可能会连接到远程数据中心)。

入群邀约

为了营造一个开放的 Cassandra 技术交流环境,社区建立了微信群公众号和钉钉群,为广大用户提供专业的技术分享及问答,定期开展专家技术直播,欢迎大家加入。另外阿里云提供免费Cassandra试用:https://www.aliyun.com/product/cds

相关文章
|
5月前
|
缓存 算法 Java
Java本地高性能缓存实践问题之Java本地高性能缓存实践的问题如何解决
Java本地高性能缓存实践问题之Java本地高性能缓存实践的问题如何解决
|
5月前
|
缓存 Java
Java本地高性能缓存实践问题之创建一个AsyncCache实例的问题如何解决
Java本地高性能缓存实践问题之创建一个AsyncCache实例的问题如何解决
|
6月前
|
存储 NoSQL Java
探索Java分布式锁:在高并发环境下的同步访问实现与优化
【7月更文挑战第1天】在分布式系统中,Java分布式锁解决了多节点共享资源的同步访问问题,确保数据一致性。常见的实现包括Redis的SETNX和过期时间、ZooKeeper的临时有序节点、数据库操作及Java并发库。优化策略涉及锁超时、续期、公平性及性能。选择合适的锁策略对高并发系统的稳定性和性能至关重要。
206 0
|
8月前
|
缓存 算法 Java
Java本地高性能缓存实践
本篇博文将首先介绍常见的本地缓存技术,对本地缓存有个大概的了解;其次介绍本地缓存中号称性能最好的Cache,可以探讨看看到底有多好?怎么做到这么好?最后通过几个实战样例,在日常工作中应用高性能的本地缓存。
|
消息中间件 NoSQL Java
Java中分布式概念
将一个大的系统划分为多个业务模块,业务模块分别部署到不同的机器上,各个业务模块之间通过接口进行数据交互。区别分布式的方式是根据不同机器不同业务。 微服务的设计是为了不因为某个模块的升级和BUG影响现有的系统业务。 微服务与分布式的细微差别是,微服务的应用不一定是分散在多个服务器上,可以是同一个服务器。
170 0
史上最全499道Java面试题:JVM+分布式+算法+锁+MQ+微服务+数据库
JAVA中的几种基本数据类型是什么,各自占用多少字节。 String类能被继承吗,为什么。 String,Stringbuffer,StringBuilder的区别。 ArrayList和LinkedList有什么区别。 讲讲类的实例化顺序,比如父类静态数据,构造函数,字段,子类静态数据,构造函数,字段,当new的时候,他们的执行顺序。 用过哪些Map类,都有什么区别,HashMap是线程安全的吗,并发下使用的Map是什么,他们内部原理分别是什么,比如存储方式,hashcode,扩容,默认容量等。
|
消息中间件 Java Kafka
Java 最常见的面试题:什么情况会导致 kafka 运行变慢?
Java 最常见的面试题:什么情况会导致 kafka 运行变慢?
|
JSON 自然语言处理 Java
来聊一聊 ElasticSearch 最新版的 Java 客户端
来聊一聊 ElasticSearch 最新版的 Java 客户端
|
SQL Java 数据库连接
史上最全的java分布式锁的5种实现方式
要实现Excel一万条数据批量导入,可以使用Apache POI库来读取和解析Excel文件,并使用JDBC连接数据库将数据批量插入。
166 0
|
消息中间件 安全 算法
神马操作!Kafka 竟然宣布弃用 Java 8
第一条就是宣布弃用对 Java 8 和 Scala 2.12 的支持!!! 在 Kafka 3.0.0 中,Kafka 项目中的所有组件都已弃用对 Java 8、Scala 2.12 的支持,宣布弃用,但 3.0.0 还能用,这次宣布只是给用户一个调整的时间,到了 Kafka 4.0,Java 8、Scala 2.12 将将正式取消支持。 其实,其他一些中间件也早有停止对 Java 8 的支持,选择 Java 11 作为最低支持版本,但 Kafka,这可能是自 Java 17 发布以来,也就是近半月时间,官宣弃用 Java 8 打响的第一枪。。 Java 8 虽然有点老了,但依然是现在市