From 98f55774cf90197c32e6f484dff844daffc76b7d Mon Sep 17 00:00:00 2001 From: tanyawen Date: Sun, 29 Dec 2019 20:53:37 +0800 Subject: [PATCH] fix:some function rename --- .../org/csource/fastdfs/StorageClient.java | 54 +++++++++---------- .../org/csource/fastdfs/TrackerClient.java | 25 +++++---- .../org/csource/fastdfs/TrackerServer.java | 29 ---------- .../org/csource/fastdfs/pool/Connection.java | 22 +++++++- .../fastdfs/pool/ConnectionFactory.java | 21 -------- .../fastdfs/pool/ConnectionManager.java | 54 ++++++++----------- .../csource/fastdfs/pool/ConnectionPool.java | 8 +-- 7 files changed, 84 insertions(+), 129 deletions(-) diff --git a/src/main/java/org/csource/fastdfs/StorageClient.java b/src/main/java/org/csource/fastdfs/StorageClient.java index 9b966b6..89f2798 100644 --- a/src/main/java/org/csource/fastdfs/StorageClient.java +++ b/src/main/java/org/csource/fastdfs/StorageClient.java @@ -12,10 +12,8 @@ import org.csource.common.Base64; import org.csource.common.MyException; import org.csource.common.NameValuePair; import org.csource.fastdfs.pool.Connection; -import org.omg.CORBA.CODESET_INCOMPATIBLE; import java.io.*; -import java.net.Socket; import java.util.Arrays; /** @@ -678,7 +676,7 @@ public class StorageClient { return results; } catch (IOException ex) { try { - this.storageServer.close(connection); + connection.close(); } catch (IOException ex1) { ex1.printStackTrace(); } finally { @@ -686,7 +684,7 @@ public class StorageClient { } throw ex; } finally { - doFinally(connection, bNewStorageServer); + releaseConnection(connection, bNewStorageServer); } } @@ -841,7 +839,7 @@ public class StorageClient { return results; } catch (IOException ex) { try { - this.storageServer.close(connection); + connection.close(); } catch (IOException ex1) { ex1.printStackTrace(); } finally { @@ -849,7 +847,7 @@ public class StorageClient { } throw ex; } finally { - doFinally(connection, bNewStorageServer); + releaseConnection(connection, bNewStorageServer); } } @@ -920,7 +918,7 @@ public class StorageClient { return 0; } catch (IOException ex) { try { - this.storageServer.close(connection); + connection.close(); } catch (IOException ex1) { ex1.printStackTrace(); } finally { @@ -928,14 +926,14 @@ public class StorageClient { } throw ex; } finally { - doFinally(connection, bNewStorageServer); + releaseConnection(connection, bNewStorageServer); } } - private void doFinally(Connection connection, boolean bNewStorageServer) { + private void releaseConnection(Connection connection, boolean bNewStorageServer) { try { if (connection != null) { - this.storageServer.releaseConnection(connection); + connection.release(); } } catch (IOException ex1) { ex1.printStackTrace(); @@ -1017,7 +1015,7 @@ public class StorageClient { return 0; } catch (IOException ex) { try { - this.storageServer.close(connection); + connection.close(); } catch (IOException ex1) { ex1.printStackTrace(); } finally { @@ -1026,7 +1024,7 @@ public class StorageClient { throw ex; } finally { - doFinally(connection, bNewStorageServer); + releaseConnection(connection, bNewStorageServer); } } @@ -1050,7 +1048,7 @@ public class StorageClient { return pkgInfo.errno; } catch (IOException ex) { try { - this.storageServer.close(connection); + connection.close(); } catch (IOException ex1) { ex1.printStackTrace(); } finally { @@ -1058,7 +1056,7 @@ public class StorageClient { } throw ex; } finally { - doFinally(connection, bNewStorageServer); + releaseConnection(connection, bNewStorageServer); } } @@ -1131,7 +1129,7 @@ public class StorageClient { return pkgInfo.errno; } catch (IOException ex) { try { - this.storageServer.close(connection); + connection.close(); } catch (IOException ex1) { ex1.printStackTrace(); } finally { @@ -1139,7 +1137,7 @@ public class StorageClient { } throw ex; } finally { - doFinally(connection, bNewStorageServer); + releaseConnection(connection, bNewStorageServer); } } @@ -1185,7 +1183,7 @@ public class StorageClient { return pkgInfo.body; } catch (IOException ex) { try { - this.storageServer.close(connection); + connection.close(); } catch (IOException ex1) { ex1.printStackTrace(); } finally { @@ -1193,7 +1191,7 @@ public class StorageClient { } throw ex; } finally { - doFinally(connection, bNewStorageServer); + releaseConnection(connection, bNewStorageServer); } } @@ -1274,7 +1272,7 @@ public class StorageClient { } } catch (IOException ex) { try { - this.storageServer.close(connection); + connection.close(); } catch (IOException ex1) { ex1.printStackTrace(); } finally { @@ -1282,7 +1280,7 @@ public class StorageClient { } throw ex; } finally { - doFinally(connection, bNewStorageServer); + releaseConnection(connection, bNewStorageServer); } } @@ -1353,7 +1351,7 @@ public class StorageClient { return 0; } catch (IOException ex) { try { - this.storageServer.close(connection); + connection.close(); } catch (IOException ex1) { ex1.printStackTrace(); } finally { @@ -1361,7 +1359,7 @@ public class StorageClient { } throw ex; } finally { - doFinally(connection, bNewStorageServer); + releaseConnection(connection, bNewStorageServer); } } @@ -1390,7 +1388,7 @@ public class StorageClient { return ProtoCommon.split_metadata(new String(pkgInfo.body, ClientGlobal.g_charset)); } catch (IOException ex) { try { - this.storageServer.close(connection); + connection.close(); } catch (IOException ex1) { ex1.printStackTrace(); } finally { @@ -1398,7 +1396,7 @@ public class StorageClient { } throw ex; } finally { - doFinally(connection, bNewStorageServer); + releaseConnection(connection, bNewStorageServer); } } @@ -1477,7 +1475,7 @@ public class StorageClient { return pkgInfo.errno; } catch (IOException ex) { try { - this.storageServer.close(connection); + connection.close(); } catch (IOException ex1) { ex1.printStackTrace(); } finally { @@ -1485,7 +1483,7 @@ public class StorageClient { } throw ex; } finally { - doFinally(connection, bNewStorageServer); + releaseConnection(connection, bNewStorageServer); } } @@ -1596,7 +1594,7 @@ public class StorageClient { create_timestamp, crc32, source_ip_addr); } catch (IOException ex) { try { - this.storageServer.close(connection); + connection.close(); } catch (IOException ex1) { ex1.printStackTrace(); } finally { @@ -1604,7 +1602,7 @@ public class StorageClient { } throw ex; } finally { - doFinally(connection, bNewStorageServer); + releaseConnection(connection, bNewStorageServer); } } diff --git a/src/main/java/org/csource/fastdfs/TrackerClient.java b/src/main/java/org/csource/fastdfs/TrackerClient.java index ab1917f..22c4bba 100644 --- a/src/main/java/org/csource/fastdfs/TrackerClient.java +++ b/src/main/java/org/csource/fastdfs/TrackerClient.java @@ -13,7 +13,6 @@ import org.csource.fastdfs.pool.Connection; import java.io.IOException; import java.io.OutputStream; -import java.net.Socket; import java.util.Arrays; /** @@ -140,7 +139,7 @@ public class TrackerClient { return new StorageServer(ip_addr, port, store_path); } catch (IOException ex) { try { - trackerServer.close(connection); + connection.close(); } catch (IOException ex1) { ex1.printStackTrace(); } finally { @@ -150,7 +149,7 @@ public class TrackerClient { } finally { if (connection != null) { try { - trackerServer.releaseConnection(connection); + connection.release(); } catch (IOException ex1) { ex1.printStackTrace(); } @@ -255,7 +254,7 @@ public class TrackerClient { return results; } catch (IOException ex) { try { - trackerServer.close(connection); + connection.close(); } catch (IOException ex1) { ex1.printStackTrace(); } finally { @@ -265,7 +264,7 @@ public class TrackerClient { } finally { if (connection != null) { try { - trackerServer.releaseConnection(connection); + connection.release(); } catch (IOException ex1) { ex1.printStackTrace(); } @@ -408,7 +407,7 @@ public class TrackerClient { return servers; } catch (IOException ex) { try { - trackerServer.close(connection); + connection.close(); } catch (IOException ex1) { ex1.printStackTrace(); } finally { @@ -419,7 +418,7 @@ public class TrackerClient { } finally { if (connection != null) { try { - trackerServer.releaseConnection(connection); + connection.release(); } catch (IOException ex1) { ex1.printStackTrace(); } @@ -501,7 +500,7 @@ public class TrackerClient { return decoder.decode(pkgInfo.body, StructGroupStat.class, StructGroupStat.getFieldsTotalSize()); } catch (IOException ex) { try { - trackerServer.close(connection); + connection.close(); } catch (IOException ex1) { ex1.printStackTrace(); } finally { @@ -515,7 +514,7 @@ public class TrackerClient { } finally { if (connection != null) { try { - trackerServer.releaseConnection(connection); + connection.release(); } catch (IOException ex1) { ex1.printStackTrace(); } @@ -606,7 +605,7 @@ public class TrackerClient { return decoder.decode(pkgInfo.body, StructStorageStat.class, StructStorageStat.getFieldsTotalSize()); } catch (IOException ex) { try { - trackerServer.close(connection); + connection.close(); } catch (IOException ex1) { ex1.printStackTrace(); } @@ -619,7 +618,7 @@ public class TrackerClient { } finally { if (connection != null) { try { - trackerServer.releaseConnection(connection); + connection.release(); } catch (IOException ex1) { ex1.printStackTrace(); } @@ -679,14 +678,14 @@ public class TrackerClient { return pkgInfo.errno == 0; } catch (IOException e) { try { - trackerServer.close(connection); + connection.close(); } finally { connection = null; } throw e; } finally { if (connection != null) { - trackerServer.releaseConnection(connection); + connection.release(); } } } diff --git a/src/main/java/org/csource/fastdfs/TrackerServer.java b/src/main/java/org/csource/fastdfs/TrackerServer.java index bcb5413..a9875ed 100644 --- a/src/main/java/org/csource/fastdfs/TrackerServer.java +++ b/src/main/java/org/csource/fastdfs/TrackerServer.java @@ -47,34 +47,5 @@ public class TrackerServer { public InetSocketAddress getInetSocketAddress() { return this.inetSockAddr; } - public void close(Connection connection) throws IOException { - if (connection == null) { - return; - } - //if connection enabled get from connection pool - if (ClientGlobal.g_connection_pool_enabled) { - ConnectionPool.closeConnection(connection); - } else { - connection.close(); - } - } - - /** - * releaseConnection connection - * @param connection - * @throws IOException - */ - public void releaseConnection(Connection connection) throws IOException { - if (connection == null) { - return; - } - if (ClientGlobal.g_connection_pool_enabled) { - ConnectionPool.releaseConnection(connection); - } else { - connection.close(); - } - } - - } diff --git a/src/main/java/org/csource/fastdfs/pool/Connection.java b/src/main/java/org/csource/fastdfs/pool/Connection.java index cecbeae..ea9324e 100644 --- a/src/main/java/org/csource/fastdfs/pool/Connection.java +++ b/src/main/java/org/csource/fastdfs/pool/Connection.java @@ -1,5 +1,6 @@ package org.csource.fastdfs.pool; +import org.csource.fastdfs.ClientGlobal; import org.csource.fastdfs.ProtoCommon; import java.io.IOException; @@ -45,11 +46,30 @@ public class Connection { } /** - * close direct if not create from pool or not open pool * * @throws IOException */ public void close() throws IOException { + //if connection enabled get from connection pool + if (ClientGlobal.g_connection_pool_enabled) { + ConnectionPool.closeConnection(this); + } else { + this.closeDirectly(); + } + } + + public void release() throws IOException { + if (ClientGlobal.g_connection_pool_enabled) { + ConnectionPool.releaseConnection(this); + } else { + this.closeDirectly(); + } + } + + /** + * force close socket, + */ + public void closeDirectly() throws IOException { if (this.sock != null) { try { ProtoCommon.closeSocket(this.sock); diff --git a/src/main/java/org/csource/fastdfs/pool/ConnectionFactory.java b/src/main/java/org/csource/fastdfs/pool/ConnectionFactory.java index 8c64acc..3b79131 100644 --- a/src/main/java/org/csource/fastdfs/pool/ConnectionFactory.java +++ b/src/main/java/org/csource/fastdfs/pool/ConnectionFactory.java @@ -6,27 +6,6 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; public class ConnectionFactory { - public static Connection create(String key) throws IOException { - if (key == null) { - System.err.printf("ip:port entry conn't be null"); - return null; - } - String[] keyPortString = key.split(":"); - if (keyPortString.length != 2) { - System.err.printf("ip:port entry is invalid! key:{}", keyPortString); - return null; - } - String ip = keyPortString[0]; - Integer port = Integer.parseInt(keyPortString[1]); - Socket sock = new Socket(); - sock.setReuseAddress(true); - sock.setSoTimeout(ClientGlobal.g_network_timeout); - InetSocketAddress inetSocketAddress = new InetSocketAddress(ip, port); - sock.connect(inetSocketAddress, ClientGlobal.g_connect_timeout); - return new Connection(sock, inetSocketAddress); - - } - /** * create from InetSocketAddress * @param socketAddress diff --git a/src/main/java/org/csource/fastdfs/pool/ConnectionManager.java b/src/main/java/org/csource/fastdfs/pool/ConnectionManager.java index 5922ca6..26fd37b 100644 --- a/src/main/java/org/csource/fastdfs/pool/ConnectionManager.java +++ b/src/main/java/org/csource/fastdfs/pool/ConnectionManager.java @@ -2,7 +2,9 @@ package org.csource.fastdfs.pool; import org.csource.common.MyException; import org.csource.fastdfs.ClientGlobal; + import java.io.IOException; +import java.net.InetSocketAddress; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -10,10 +12,8 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class ConnectionManager { - /** - * ip:port is key - */ - private String key; + + private InetSocketAddress inetSocketAddress; /** * total create connection pool @@ -41,21 +41,11 @@ public class ConnectionManager { } - public ConnectionManager(String key) { - this.key = key; + public ConnectionManager(InetSocketAddress socketAddress) { + this.inetSocketAddress = socketAddress; } - private Connection newConnection() throws IOException { - try { - Connection connection = ConnectionFactory.create(this.key); - return connection; - } catch (IOException e) { - throw e; - } - } - - - public Connection getConnection() throws MyException, IOException { + public Connection getConnection() throws MyException, IOException { lock.lock(); try { Connection connection = null; @@ -68,7 +58,7 @@ public class ConnectionManager { continue; } } else if (ClientGlobal.getG_connection_pool_max_count_per_entry() == 0 || totalCount.get() < ClientGlobal.getG_connection_pool_max_count_per_entry()) { - connection = newConnection(); + connection = ConnectionFactory.create(this.inetSocketAddress); if (connection != null) { totalCount.incrementAndGet(); } @@ -94,21 +84,18 @@ public class ConnectionManager { } } - public void releaseConnection(Connection connection) throws IOException { + public void releaseConnection(Connection connection) { if (connection == null) { return; } - if ((System.currentTimeMillis() - connection.getLastAccessTime()) < ClientGlobal.g_connection_pool_max_idle_time) { - try { - lock.lock(); - freeConnections.add(connection); - freeCount.incrementAndGet(); - condition.signal(); - } finally { - lock.unlock(); - } - } else { - closeConnection(connection); + lock.lock(); + try { + connection.setLastAccessTime(System.currentTimeMillis()); + freeConnections.add(connection); + freeCount.incrementAndGet(); + condition.signal(); + } finally { + lock.unlock(); } } @@ -117,21 +104,22 @@ public class ConnectionManager { try { if (connection != null) { totalCount.decrementAndGet(); - connection.close(); + connection.closeDirectly(); } } catch (IOException e) { System.err.println("close socket error , msg:" + e.getMessage()); e.printStackTrace(); + throw e; } } @Override public String toString() { return "ConnectionManager{" + - "key='" + key + '\'' + + "ip:port='" + inetSocketAddress.getHostName() + ":" + inetSocketAddress.getPort() + ", totalCount=" + totalCount + ", freeCount=" + freeCount + - ", linkedQueueCP=" + freeConnections + + ", freeConnections =" + freeConnections + '}'; } } diff --git a/src/main/java/org/csource/fastdfs/pool/ConnectionPool.java b/src/main/java/org/csource/fastdfs/pool/ConnectionPool.java index b192137..2748623 100644 --- a/src/main/java/org/csource/fastdfs/pool/ConnectionPool.java +++ b/src/main/java/org/csource/fastdfs/pool/ConnectionPool.java @@ -22,7 +22,7 @@ public class ConnectionPool { synchronized (ConnectionPool.class) { connectionManager = CP.get(key); if (connectionManager == null) { - connectionManager = new ConnectionManager(key); + connectionManager = new ConnectionManager(socketAddress); CP.put(key, connectionManager); } } @@ -39,7 +39,7 @@ public class ConnectionPool { connectionManager.releaseConnection(connection); } else { try { - connection.close(); + connection.closeDirectly(); } catch (IOException e) { System.err.println("close socket error, msg:" + e.getMessage()); e.printStackTrace(); @@ -57,7 +57,7 @@ public class ConnectionPool { if (connectionManager != null) { connectionManager.closeConnection(connection); } else { - connection.close(); + connection.closeDirectly(); } } @@ -73,7 +73,7 @@ public class ConnectionPool { if (!CP.isEmpty()) { StringBuilder builder = new StringBuilder(); for (Map.Entry managerEntry : CP.entrySet()) { - builder.append("key:" + managerEntry.getKey() + " -------- entry:" + managerEntry.getValue() + "\n"); + builder.append("key:[" + managerEntry.getKey() + " ]-------- entry:" + managerEntry.getValue() + "\n"); } return builder.toString(); }