浅谈轻量级socket连接池实现

简介: 浅谈轻量级socket连接池实现

1.背景

前段时间有幸参与到开源轻量级分布式文件系统Fastdfs Java-SDK的改造,


支持连接池和短连接两种方式,是否开启连接池可选(默认开启),短连接(用完即关闭)


2.如何实现

调研了连接池下,网上很多socket连接池都是用Apache Commons Pool来实现的,个人感觉可能有点重,所以就完全原生代码实现连接池开发了。


3.实现代码

·socket连接类

public class Connection {
  private Socket sock;
  private InetSocketAddress inetSockAddr;
  private Long lastAccessTime = System.currentTimeMillis();
  private boolean needActiveTest = false;
  public Connection(Socket sock, InetSocketAddress inetSockAddr) {
      this.sock = sock;
      this.inetSockAddr = inetSockAddr;
  }
  /**
   * get the server info
   *
   * @return the server info
   */
  public InetSocketAddress getInetSocketAddress() {
      return this.inetSockAddr;
  }
  public OutputStream getOutputStream() throws IOException {
      return this.sock.getOutputStream();
  }
  public InputStream getInputStream() throws IOException {
      return this.sock.getInputStream();
  }
  public Long getLastAccessTime() {
      return lastAccessTime;
  }
  public void setLastAccessTime(Long lastAccessTime) {
      this.lastAccessTime = lastAccessTime;
  }
  /**
   *
   * @throws IOException
   */
  public void close() throws IOException {
      //if connection enabled get from connection pool
      if (ClientGlobal.g_connection_pool_enabled) {
          ConnectionPool.closeConnection(this);
      } else {
          this.closeDirectly();
      }
  }
  public void release() throws IOException {
      if (ClientGlobal.g_connection_pool_enabled) {
          ConnectionPool.releaseConnection(this);
      } else {
          this.closeDirectly();
      }
  }
  /**
   * force close socket,
   */
  public void closeDirectly() throws IOException {
      if (this.sock != null) {
          try {
              ProtoCommon.closeSocket(this.sock);
          } finally {
              this.sock = null;
          }
      }
  }
  public boolean activeTest() throws IOException {
      if (this.sock == null) {
          return false;
      }
      return ProtoCommon.activeTest(this.sock);
  }
  public boolean isConnected() {
      boolean isConnected = false;
      if (sock != null) {
          if (sock.isConnected()) {
              isConnected = true;
          }
      }
      return isConnected;
  }
  public boolean isAvaliable() {
      if (isConnected()) {
          if (sock.getPort() == 0) {
              return false;
          }
          if (sock.getInetAddress() == null) {
              return false;
          }
          if (sock.getRemoteSocketAddress() == null) {
              return false;
          }
          if (sock.isInputShutdown()) {
              return false;
          }
          if (sock.isOutputShutdown()) {
              return false;
          }
          return true;
      }
      return false;
  }
  public boolean isNeedActiveTest() {
      return needActiveTest;
  }
  public void setNeedActiveTest(boolean needActiveTest) {
      this.needActiveTest = needActiveTest;
  }
}

·连接管理器(主要职责:获取连接,关闭连接,释放连接)

public class ConnectionManager {
    private InetSocketAddress inetSocketAddress;
    /**
     * count of total connections 
     */
    private AtomicInteger totalCount = new AtomicInteger();
    /**
     * count of free connections
     */
    private AtomicInteger freeCount = new AtomicInteger();
    /**
     * lock
     */
    private ReentrantLock lock = new ReentrantLock(true);
    private Condition condition = lock.newCondition();
    /**
     * free container connection  
     */
    private LinkedList<Connection> freeConnections = new LinkedList<Connection>();
    private ConnectionManager() {
    }
    public ConnectionManager(InetSocketAddress socketAddress) {
        this.inetSocketAddress = socketAddress;
    }
   /**
    * get connection
    **/
    public Connection getConnection() throws MyException {
        lock.lock();
        try {
            Connection connection = null;
            while (true) {
                if (freeCount.get() > 0) {
                    freeCount.decrementAndGet();
                    connection = freeConnections.poll();
                    if (!connection.isAvaliable() || (System.currentTimeMillis() - connection.getLastAccessTime()) > ClientGlobal.g_connection_pool_max_idle_time) {
                        closeConnection(connection);
                        continue;
                    }
                    if (connection.isNeedActiveTest()) {
                        boolean isActive = false;
                        try {
                            isActive = connection.activeTest();
                        } catch (IOException e) {
                            System.err.println("send to server[" + inetSocketAddress.getAddress().getHostAddress() + ":" + inetSocketAddress.getPort() + "] active test fail ,emsg:" + e.getMessage());
                            isActive = false;
                        }
                        if (!isActive) {
                            closeConnection(connection);
                            continue;
                        } else {
                            connection.setNeedActiveTest(false);
                        }
                    }
                } else if (ClientGlobal.g_connection_pool_max_count_per_entry == 0 || totalCount.get() < ClientGlobal.g_connection_pool_max_count_per_entry) {
                    connection = ConnectionFactory.create(this.inetSocketAddress);
                    totalCount.incrementAndGet();
                } else {
                    try {
                        if (condition.await(ClientGlobal.g_connection_pool_max_wait_time_in_ms, TimeUnit.MILLISECONDS)) {
                            //wait single success
                            continue;
                        }
                        throw new MyException("connect to server " + inetSocketAddress.getAddress().getHostAddress() + ":" + inetSocketAddress.getPort() + " fail, wait_time > " + ClientGlobal.g_connection_pool_max_wait_time_in_ms + "ms");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        throw new MyException("connect to server " + inetSocketAddress.getAddress().getHostAddress() + ":" + inetSocketAddress.getPort() + " fail, emsg:" + e.getMessage());
                    }
                }
                return connection;
            }
        } finally {
            lock.unlock();
        }
    }
   /**
   *release connection
   */
    public void releaseConnection(Connection connection) {
        if (connection == null) {
            return;
        }
        lock.lock();
        try {
            connection.setLastAccessTime(System.currentTimeMillis());
            freeConnections.add(connection);
            freeCount.incrementAndGet();
            condition.signal();
        } finally {
            lock.unlock();
        }
    }
    /**
     *close connection
     *
     */
    public void closeConnection(Connection connection) {
        try {
            if (connection != null) {
                totalCount.decrementAndGet();
                connection.closeDirectly();
            }
        } catch (IOException e) {
            System.err.println("close socket[" + inetSocketAddress.getAddress().getHostAddress() + ":" + inetSocketAddress.getPort() + "] error ,emsg:" + e.getMessage());
            e.printStackTrace();
        }
    }
    public void setActiveTestFlag() {
        if (freeCount.get() > 0) {
            lock.lock();
            try {
                for (Connection freeConnection : freeConnections) {
                    freeConnection.setNeedActiveTest(true);
                }
            } finally {
                lock.unlock();
            }
        }
    }
}

注:其中setActiveTestFlag()方法解释一下,有可能出现连接断了之后,可能是这台服务器重启了,或者网络抽风导致闪断。希望只牺牲一次请求,主要解决服务器重启问题


大致思路就是一旦client请求时一旦有连接出现IOException,就会将所有当前实例对应的所有连接全部变为

需要activeTest,当下一个连接获取时,就会检测所有的连接,从而达到只牺牲一次请求的目的.

.

·管理器连接池(由于存在多个实例,一个实例对应一个ConnectionManager,连接获取,释放,关闭的入口)

实现代码

public class ConnectionPool {
    /**
     * key is ip:port, value is ConnectionManager
     */
    private final static ConcurrentHashMap<String, ConnectionManager> CP = new ConcurrentHashMap<String, ConnectionManager>();
    public static Connection getConnection(InetSocketAddress socketAddress) throws MyException {
        if (socketAddress == null) {
            return null;
        }
        String key = getKey(socketAddress);
        ConnectionManager connectionManager;
        connectionManager = CP.get(key);
        if (connectionManager == null) {
            synchronized (ConnectionPool.class) {
                connectionManager = CP.get(key);
                if (connectionManager == null) {
                    connectionManager = new ConnectionManager(socketAddress);
                    CP.put(key, connectionManager);
                }
            }
        }
        return connectionManager.getConnection();
    }
    public static void releaseConnection(Connection connection) throws IOException {
        if (connection == null) {
            return;
        }
        String key = getKey(connection.getInetSocketAddress());
        ConnectionManager connectionManager = CP.get(key);
        if (connectionManager != null) {
            connectionManager.releaseConnection(connection);
        } else {
            connection.closeDirectly();
        }
    }
    public static void closeConnection(Connection connection) throws IOException {
        if (connection == null) {
            return;
        }
        String key = getKey(connection.getInetSocketAddress());
        ConnectionManager connectionManager = CP.get(key);
        if (connectionManager != null) {
            connectionManager.closeConnection(connection);
            connectionManager.setActiveTestFlag();
        } else {
            connection.closeDirectly();
        }
    }
    private static String getKey(InetSocketAddress socketAddress) {
        if (socketAddress == null) {
            return null;
        }
        return String.format("%s:%s", socketAddress.getAddress().getHostAddress(), socketAddress.getPort());
 }
相关文章
|
7月前
|
编解码 NoSQL 数据可视化
一个现代化轻量级的跨平台Redis桌面客户端
一个现代化轻量级的跨平台Redis桌面客户端
349 0
|
7月前
|
Java 调度 开发者
JDK 21中的虚拟线程:轻量级并发的新篇章
本文深入探讨了JDK 21中引入的虚拟线程(Virtual Threads)概念,分析了其背后的设计哲学,以及与传统线程模型的区别。文章还将讨论虚拟线程如何简化并发编程,提高资源利用率,并展示了一些使用虚拟线程进行开发的示例。
1094 4
|
7月前
|
存储 传感器 缓存
面向嵌入式系统的轻量级框架分析
面向嵌入式系统的轻量级框架分析
132 1
|
安全 Java 调度
Java中重量级和轻量级的区别及应用场景
Java作为一种面向对象的编程语言,在开发中经常会遇到重量级和轻量级的概念。本文将详细介绍Java中重量级和轻量级的区别,并探讨它们在实际开发中的应用场景。
604 0
|
传感器 边缘计算 监控
轻量级网络协议
MQTT(Message Queuing Telemetry Transport)是一种轻量级的、开放的消息协议,旨在实现各种设备之间的可靠、高效的实时通信。MQTT协议在物联网、远程监控、传感器网络等领域具有重要的应用价值。本文将深入探讨MQTT的背景、特点、工作原理以及在物联网、边缘计算和实时数据传输方面的重要应用,展示MQTT作为实时通信的轻量级协议。
|
自然语言处理 Java
AviatorScript轻量级高性能脚本语言
在 5.0 版本以前,它的名字是叫 Aviator ,定位一直只是一个表达式引擎,不支持 if/else 条件语句(仅有三元运算符支持 ?: ),没有内置的 for/while 循环支持(虽然你可以用 seq 库类似函数式的方式来处理集合),也没有赋值(后来在 4.0 引入),没有作用域的概念(也在 4.0 引入 lambda 函数后部分实现)等等一般语言常见的能力。在 5.0 版本后,它变成了一门脚本语言,叫:AviatorScript 。
1658 0
|
弹性计算 应用服务中间件 开发者
轻量级服务器做网站可以吗?
这个的话肯定是可以的,尽量应用服务器上面是可以用来搭建网站的,不管我们搭建个人站点还是公司企业官网这些的话都是没有问题的,完全可以满足需求,那么应该说搭建网站这些项目,或者说这些用途的话,应该说是这些云服务器,包括轻量服务器的一些最基本的,最入门的一些用途和需求吧,所以基本上来说都是可以的,没有问题。
|
机器学习/深度学习 并行计算 机器人
轻量级模型设计与部署总结
轻量级模型设计与部署总结
296 0
|
存储 安全 Linux
嵌入式应用的超轻量级、高性能的 C/C++ 日志库
嵌入式应用的超轻量级、高性能的 C/C++ 日志库
629 0
嵌入式应用的超轻量级、高性能的 C/C++ 日志库
|
算法 文件存储 计算机视觉
轻量级网络——EfficientNetV2
轻量级网络——EfficientNetV2
1255 0
轻量级网络——EfficientNetV2