From f742f3c6aa979ba2f8d1b7aae8c8d33893e501a3 Mon Sep 17 00:00:00 2001 From: tanyawen Date: Tue, 31 Dec 2019 15:28:43 +0800 Subject: [PATCH] feature:add active test flag --- .../org/csource/fastdfs/pool/Connection.java | 14 ++++++++++- .../fastdfs/pool/ConnectionManager.java | 24 +++++++++++++++++++ .../csource/fastdfs/pool/ConnectionPool.java | 1 + 3 files changed, 38 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/csource/fastdfs/pool/Connection.java b/src/main/java/org/csource/fastdfs/pool/Connection.java index ea9324e..695b4ea 100644 --- a/src/main/java/org/csource/fastdfs/pool/Connection.java +++ b/src/main/java/org/csource/fastdfs/pool/Connection.java @@ -15,6 +15,8 @@ public class Connection { private InetSocketAddress inetSockAddr; private Long lastAccessTime = System.currentTimeMillis(); + private boolean needActiveTest = false; + public Connection(Socket sock, InetSocketAddress inetSockAddr) { this.sock = sock; this.inetSockAddr = inetSockAddr; @@ -117,11 +119,21 @@ public class Connection { return false; } + public boolean isNeedActiveTest() { + return needActiveTest; + } + + public void setNeedActiveTest(boolean needActiveTest) { + this.needActiveTest = needActiveTest; + } + @Override public String toString() { - return "TrackerServer{" + + return "Connection{" + "sock=" + sock + ", inetSockAddr=" + inetSockAddr + + ", lastAccessTime=" + lastAccessTime + + ", needActiveTest=" + needActiveTest + '}'; } } diff --git a/src/main/java/org/csource/fastdfs/pool/ConnectionManager.java b/src/main/java/org/csource/fastdfs/pool/ConnectionManager.java index 548d2ee..3fc3c85 100644 --- a/src/main/java/org/csource/fastdfs/pool/ConnectionManager.java +++ b/src/main/java/org/csource/fastdfs/pool/ConnectionManager.java @@ -57,6 +57,16 @@ public class ConnectionManager { closeConnection(connection); continue; } + if (connection.isNeedActiveTest()) { + boolean isActive = false; + try { + isActive = connection.activeTest(); + } catch (IOException e) { + System.err.println("send to server[" + inetSocketAddress.getAddress().getHostAddress() + ":" + inetSocketAddress.getPort() + "] active test error ,emsg:" + e.getMessage()); + isActive = false; + } + if (!isActive) continue; + } } else if (ClientGlobal.g_connection_pool_max_count_per_entry == 0 || totalCount.get() < ClientGlobal.g_connection_pool_max_count_per_entry) { connection = ConnectionFactory.create(this.inetSocketAddress); totalCount.incrementAndGet(); @@ -107,6 +117,20 @@ public class ConnectionManager { } } + public void setActiveTestFlag() { + if (freeCount.get() > 0) { + lock.lock(); + try { + for (Connection freeConnection : freeConnections) { + freeConnection.setNeedActiveTest(true); + } + } finally { + lock.unlock(); + } + } + } + + @Override public String toString() { return "ConnectionManager{" + diff --git a/src/main/java/org/csource/fastdfs/pool/ConnectionPool.java b/src/main/java/org/csource/fastdfs/pool/ConnectionPool.java index f5ac1ec..ec3820d 100644 --- a/src/main/java/org/csource/fastdfs/pool/ConnectionPool.java +++ b/src/main/java/org/csource/fastdfs/pool/ConnectionPool.java @@ -54,6 +54,7 @@ public class ConnectionPool { ConnectionManager connectionManager = CP.get(key); if (connectionManager != null) { connectionManager.closeConnection(connection); + connectionManager.setActiveTestFlag(); } else { connection.closeDirectly(); }