From 3ad9f94a697501e84e3f57001931fe329d4cf6bb Mon Sep 17 00:00:00 2001 From: tanyawen Date: Fri, 27 Dec 2019 21:41:10 +0800 Subject: [PATCH] feature: add socket connection pool --- .../java/org/csource/common/MyException.java | 2 +- .../org/csource/fastdfs/ClientGlobal.java | 68 ++++++- .../org/csource/fastdfs/StorageServer.java | 4 +- .../org/csource/fastdfs/TrackerGroup.java | 6 +- .../org/csource/fastdfs/TrackerServer.java | 187 ++++++++++-------- .../csource/fastdfs/pool/ConnectionInfo.java | 50 +++++ .../fastdfs/pool/ConnectionManager.java | 152 ++++++++++++++ .../csource/fastdfs/pool/ConnectionPool.java | 75 +++++++ .../fastdfs/pool/PoolConnectionFactory.java | 30 +++ src/main/resources/fastdfs-client.properties | 17 ++ .../fastdfs-client.properties.sample | 10 +- src/main/resources/fdfs_client.conf.sample | 5 + 12 files changed, 517 insertions(+), 89 deletions(-) create mode 100644 src/main/java/org/csource/fastdfs/pool/ConnectionInfo.java create mode 100644 src/main/java/org/csource/fastdfs/pool/ConnectionManager.java create mode 100644 src/main/java/org/csource/fastdfs/pool/ConnectionPool.java create mode 100644 src/main/java/org/csource/fastdfs/pool/PoolConnectionFactory.java create mode 100644 src/main/resources/fastdfs-client.properties diff --git a/src/main/java/org/csource/common/MyException.java b/src/main/java/org/csource/common/MyException.java index 6349d65..67d2220 100644 --- a/src/main/java/org/csource/common/MyException.java +++ b/src/main/java/org/csource/common/MyException.java @@ -15,7 +15,7 @@ package org.csource.common; * @version Version 1.0 */ public class MyException extends Exception { - public MyException() { + public MyException(String s, Exception e) { } public MyException(String message) { diff --git a/src/main/java/org/csource/fastdfs/ClientGlobal.java b/src/main/java/org/csource/fastdfs/ClientGlobal.java index 7c8beed..b5928cf 100644 --- a/src/main/java/org/csource/fastdfs/ClientGlobal.java +++ b/src/main/java/org/csource/fastdfs/ClientGlobal.java @@ -34,7 +34,6 @@ public class ClientGlobal { public static final String CONF_KEY_HTTP_SECRET_KEY = "http.secret_key"; public static final String CONF_KEY_HTTP_TRACKER_HTTP_PORT = "http.tracker_http_port"; public static final String CONF_KEY_TRACKER_SERVER = "tracker_server"; - public static final String PROP_KEY_CONNECT_TIMEOUT_IN_SECONDS = "fastdfs.connect_timeout_in_seconds"; public static final String PROP_KEY_NETWORK_TIMEOUT_IN_SECONDS = "fastdfs.network_timeout_in_seconds"; public static final String PROP_KEY_CHARSET = "fastdfs.charset"; @@ -43,6 +42,13 @@ public class ClientGlobal { public static final String PROP_KEY_HTTP_TRACKER_HTTP_PORT = "fastdfs.http_tracker_http_port"; public static final String PROP_KEY_TRACKER_SERVERS = "fastdfs.tracker_servers"; + + public static final String PROP_KEY_CONNECTION_POOL_ENABLED = "fastdfs.connection_pool.enabled"; + public static final String PROP_KEY_CONNECTION_POOL_MAX_COUNT_PER_ENTRY = "fastdfs.connection_pool.max_count_per_entry"; + public static final String PROP_KEY_CONNECTION_POOL_MAX_IDLE_TIME = "fastdfs.connection_pool.max_idle_time"; + public static final String PROP_KEY_CONNECTION_POOL_MAX_WAIT_TIME = "fastdfs.connection_pool.max_wait_time"; + + public static final int DEFAULT_CONNECT_TIMEOUT = 5; //second public static final int DEFAULT_NETWORK_TIMEOUT = 30; //second public static final String DEFAULT_CHARSET = "UTF-8"; @@ -50,6 +56,11 @@ public class ClientGlobal { public static final String DEFAULT_HTTP_SECRET_KEY = "FastDFS1234567890"; public static final int DEFAULT_HTTP_TRACKER_HTTP_PORT = 80; + public static final boolean DEFAULT_CONNECTION_POOL_ENABLED = true; + public static final int DEFAULT_CONNECTION_POOL_MAX_COUNT_PER_ENTRY = 50; + public static final int DEFAULT_CONNECTION_POOL_MAX_IDLE_TIME = 60 ;//second + public static final int DEFAULT_CONNECTION_POOL_MAX_WAIT_TIME = 3 ;//second + public static int g_connect_timeout = DEFAULT_CONNECT_TIMEOUT * 1000; //millisecond public static int g_network_timeout = DEFAULT_NETWORK_TIMEOUT * 1000; //millisecond public static String g_charset = DEFAULT_CHARSET; @@ -57,6 +68,11 @@ public class ClientGlobal { public static String g_secret_key = DEFAULT_HTTP_SECRET_KEY; //generage token secret key public static int g_tracker_http_port = DEFAULT_HTTP_TRACKER_HTTP_PORT; + public static boolean g_connection_pool_enabled = DEFAULT_CONNECTION_POOL_ENABLED; + public static int g_connection_pool_max_count_per_entry = DEFAULT_CONNECTION_POOL_MAX_COUNT_PER_ENTRY; + public static int g_connection_pool_max_idle_time = DEFAULT_CONNECTION_POOL_MAX_IDLE_TIME * 1000; //millisecond + public static int g_connection_pool_max_wait_time = DEFAULT_CONNECTION_POOL_MAX_WAIT_TIME * 1000; //millisecond + public static TrackerGroup g_tracker_group; private ClientGlobal() { @@ -112,6 +128,18 @@ public class ClientGlobal { if (g_anti_steal_token) { g_secret_key = iniReader.getStrValue("http.secret_key"); } + g_connection_pool_enabled = iniReader.getBoolValue("connection_pool.enabled", DEFAULT_CONNECTION_POOL_ENABLED); + g_connection_pool_max_count_per_entry = iniReader.getIntValue("connection_pool.max_count_per_entry", DEFAULT_CONNECTION_POOL_MAX_COUNT_PER_ENTRY); + g_connection_pool_max_idle_time = iniReader.getIntValue("connection_pool.max_idle_time", DEFAULT_CONNECTION_POOL_MAX_IDLE_TIME); + if (g_connection_pool_max_idle_time < 0) { + g_connection_pool_max_idle_time = DEFAULT_CONNECTION_POOL_MAX_IDLE_TIME; + } + g_connection_pool_max_idle_time *= 1000; + g_connection_pool_max_wait_time = iniReader.getIntValue("connection_pool.max_wait_time", DEFAULT_CONNECTION_POOL_MAX_WAIT_TIME); + if (g_connection_pool_max_wait_time < 0) { + g_connection_pool_max_wait_time = DEFAULT_CONNECTION_POOL_MAX_WAIT_TIME; + } + g_connection_pool_max_wait_time *= 1000; } /** @@ -149,6 +177,11 @@ public class ClientGlobal { String httpAntiStealTokenConf = props.getProperty(PROP_KEY_HTTP_ANTI_STEAL_TOKEN); String httpSecretKeyConf = props.getProperty(PROP_KEY_HTTP_SECRET_KEY); String httpTrackerHttpPortConf = props.getProperty(PROP_KEY_HTTP_TRACKER_HTTP_PORT); + String poolEnabled = props.getProperty(PROP_KEY_CONNECTION_POOL_ENABLED); + String poolMaxCountPerEntry = props.getProperty(PROP_KEY_CONNECTION_POOL_MAX_COUNT_PER_ENTRY); + String poolMaxIdleTime = props.getProperty(PROP_KEY_CONNECTION_POOL_MAX_IDLE_TIME); + String poolMaxWaitTime = props.getProperty(PROP_KEY_CONNECTION_POOL_MAX_WAIT_TIME); + if (connectTimeoutInSecondsConf != null && connectTimeoutInSecondsConf.trim().length() != 0) { g_connect_timeout = Integer.parseInt(connectTimeoutInSecondsConf.trim()) * 1000; } @@ -167,6 +200,18 @@ public class ClientGlobal { if (httpTrackerHttpPortConf != null && httpTrackerHttpPortConf.trim().length() != 0) { g_tracker_http_port = Integer.parseInt(httpTrackerHttpPortConf); } + if (poolEnabled != null && poolEnabled.trim().length() != 0) { + g_connection_pool_enabled = Boolean.parseBoolean(poolEnabled); + } + if (poolMaxCountPerEntry != null && poolMaxCountPerEntry.trim().length() != 0 ) { + g_connection_pool_max_count_per_entry = Integer.parseInt(poolMaxCountPerEntry); + } + if (poolMaxIdleTime != null && poolMaxIdleTime.trim().length() != 0) { + g_connection_pool_max_idle_time = Integer.parseInt(poolMaxIdleTime) * 1000; + } + if (poolMaxWaitTime != null && poolMaxWaitTime.trim().length() != 0) { + g_connection_pool_max_wait_time = Integer.parseInt(poolMaxWaitTime) * 1000; + } } /** @@ -217,6 +262,7 @@ public class ClientGlobal { */ public static Socket getSocket(InetSocketAddress addr) throws IOException { Socket sock = new Socket(); + sock.setReuseAddress(true); sock.setSoTimeout(ClientGlobal.g_network_timeout); sock.connect(addr, ClientGlobal.g_connect_timeout); return sock; @@ -282,6 +328,22 @@ public class ClientGlobal { ClientGlobal.g_tracker_group = tracker_group; } + public static boolean isG_connection_pool_enabled() { + return g_connection_pool_enabled; + } + + public static int getG_connection_pool_max_count_per_entry() { + return g_connection_pool_max_count_per_entry; + } + + public static int getG_connection_pool_max_idle_time() { + return g_connection_pool_max_idle_time; + } + + public static int getG_connection_pool_max_wait_time() { + return g_connection_pool_max_wait_time; + } + public static String configInfo() { String trackerServers = ""; if (g_tracker_group != null) { @@ -298,6 +360,10 @@ public class ClientGlobal { + "\n g_anti_steal_token = " + g_anti_steal_token + "\n g_secret_key = " + g_secret_key + "\n g_tracker_http_port = " + g_tracker_http_port + + "\n g_connection_pool_enabled = " + g_connection_pool_enabled + + "\n g_connection_pool_max_count_per_entry = " + g_connection_pool_max_count_per_entry + + "\n g_connection_pool_max_idle_time = " + g_connection_pool_max_idle_time + + "\n g_connection_pool_max_wait_time = " + g_connection_pool_max_wait_time + "\n trackerServers = " + trackerServers + "\n}"; } diff --git a/src/main/java/org/csource/fastdfs/StorageServer.java b/src/main/java/org/csource/fastdfs/StorageServer.java index 219521e..1811c19 100644 --- a/src/main/java/org/csource/fastdfs/StorageServer.java +++ b/src/main/java/org/csource/fastdfs/StorageServer.java @@ -28,7 +28,7 @@ public class StorageServer extends TrackerServer { * @param store_path the store path index on the storage server */ public StorageServer(String ip_addr, int port, int store_path) throws IOException { - super(ClientGlobal.getSocket(ip_addr, port), new InetSocketAddress(ip_addr, port)); + super(new InetSocketAddress(ip_addr, port)); this.store_path_index = store_path; } @@ -40,7 +40,7 @@ public class StorageServer extends TrackerServer { * @param store_path the store path index on the storage server */ public StorageServer(String ip_addr, int port, byte store_path) throws IOException { - super(ClientGlobal.getSocket(ip_addr, port), new InetSocketAddress(ip_addr, port)); + super(new InetSocketAddress(ip_addr, port)); if (store_path < 0) { this.store_path_index = 256 + store_path; } else { diff --git a/src/main/java/org/csource/fastdfs/TrackerGroup.java b/src/main/java/org/csource/fastdfs/TrackerGroup.java index 6f657d5..9ba83cf 100644 --- a/src/main/java/org/csource/fastdfs/TrackerGroup.java +++ b/src/main/java/org/csource/fastdfs/TrackerGroup.java @@ -40,11 +40,7 @@ public class TrackerGroup { * @return connected tracker server, null for fail */ public TrackerServer getConnection(int serverIndex) throws IOException { - Socket sock = new Socket(); - sock.setReuseAddress(true); - sock.setSoTimeout(ClientGlobal.g_network_timeout); - sock.connect(this.tracker_servers[serverIndex], ClientGlobal.g_connect_timeout); - return new TrackerServer(sock, this.tracker_servers[serverIndex]); + return new TrackerServer(this.tracker_servers[serverIndex]); } /** diff --git a/src/main/java/org/csource/fastdfs/TrackerServer.java b/src/main/java/org/csource/fastdfs/TrackerServer.java index 09c9442..cab264a 100644 --- a/src/main/java/org/csource/fastdfs/TrackerServer.java +++ b/src/main/java/org/csource/fastdfs/TrackerServer.java @@ -8,6 +8,9 @@ package org.csource.fastdfs; +import org.csource.fastdfs.pool.ConnectionInfo; +import org.csource.fastdfs.pool.ConnectionPool; + import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -21,93 +24,119 @@ import java.net.Socket; * @version Version 1.11 */ public class TrackerServer { - protected Socket sock; - protected InetSocketAddress inetSockAddr; + protected Socket sock; + protected InetSocketAddress inetSockAddr; - /** - * Constructor - * - * @param sock Socket of server - * @param inetSockAddr the server info - */ - public TrackerServer(Socket sock, InetSocketAddress inetSockAddr) { - this.sock = sock; - this.inetSockAddr = inetSockAddr; - } + private Long lastAccessTime = System.currentTimeMillis(); - /** - * get the connected socket - * - * @return the socket - */ - public Socket getSocket() throws IOException { - if (this.sock == null) { - this.sock = ClientGlobal.getSocket(this.inetSockAddr); + /** + * Constructor + * + * @param sock Socket of server + * @param inetSockAddr the server info + */ + public TrackerServer(Socket sock, InetSocketAddress inetSockAddr) { + this.sock = sock; + this.inetSockAddr = inetSockAddr; } - return this.sock; - } - - /** - * get the server info - * - * @return the server info - */ - public InetSocketAddress getInetSocketAddress() { - return this.inetSockAddr; - } - - public OutputStream getOutputStream() throws IOException { - return this.sock.getOutputStream(); - } - - public InputStream getInputStream() throws IOException { - return this.sock.getInputStream(); - } - - public void close() throws IOException { - if (this.sock != null) { - try { - ProtoCommon.closeSocket(this.sock); - } finally { - this.sock = null; - } + public TrackerServer(InetSocketAddress inetSockAddr) throws IOException { + this.inetSockAddr = inetSockAddr; + this.sock = getSocket(); } - } - protected void finalize() throws Throwable { - this.close(); - } + /** + * get the connected socket + * + * @return the socket + */ + public Socket getSocket() throws IOException { + if (this.sock == null) { + if (ClientGlobal.g_connection_pool_enabled) { + ConnectionInfo connection = ConnectionPool.getConnection(this.inetSockAddr); + this.sock = connection.getSocket(); + this.lastAccessTime = connection.getLastAccessTime(); + } else { + this.sock = ClientGlobal.getSocket(this.inetSockAddr); + } + } - public boolean isConnected(){ - boolean isConnected = false; - if (sock != null) { - if (sock.isConnected()) { - isConnected = true; - } + return this.sock; } - return isConnected; - } - public boolean isAvaliable() { - if (isConnected()) { - if (sock.getPort() == 0) { - return false; - } - if (sock.getInetAddress() == null) { - return false; - } - if (sock.getRemoteSocketAddress() == null) { - return false; - } - if (sock.isInputShutdown()) { - return false; - } - if (sock.isOutputShutdown()) { - return false; - } - return true; + /** + * get the server info + * + * @return the server info + */ + public InetSocketAddress getInetSocketAddress() { + return this.inetSockAddr; + } + + public OutputStream getOutputStream() throws IOException { + return this.sock.getOutputStream(); + } + + public InputStream getInputStream() throws IOException { + return this.sock.getInputStream(); + } + + public void close() throws IOException { + //if connection enabled get from connection pool + if (ClientGlobal.g_connection_pool_enabled) { + ConnectionPool.freeConnection(this); + } else { + if (this.sock != null) { + try { + ProtoCommon.closeSocket(this.sock); + } finally { + this.sock = null; + } + } + } + } + + protected void finalize() throws Throwable { + this.close(); + } + + public boolean isConnected() { + boolean isConnected = false; + if (sock != null) { + if (sock.isConnected()) { + isConnected = true; + } + } + return isConnected; + } + + public boolean isAvaliable() { + if (isConnected()) { + if (sock.getPort() == 0) { + return false; + } + if (sock.getInetAddress() == null) { + return false; + } + if (sock.getRemoteSocketAddress() == null) { + return false; + } + if (sock.isInputShutdown()) { + return false; + } + if (sock.isOutputShutdown()) { + return false; + } + return true; + } + return false; + } + + public Long getLastAccessTime() { + return lastAccessTime; + } + + public void setLastAccessTime(Long lastAccessTime) { + this.lastAccessTime = lastAccessTime; } - return false; - } } diff --git a/src/main/java/org/csource/fastdfs/pool/ConnectionInfo.java b/src/main/java/org/csource/fastdfs/pool/ConnectionInfo.java new file mode 100644 index 0000000..45898f2 --- /dev/null +++ b/src/main/java/org/csource/fastdfs/pool/ConnectionInfo.java @@ -0,0 +1,50 @@ +package org.csource.fastdfs.pool; + +import java.net.InetSocketAddress; +import java.net.Socket; + +public class ConnectionInfo { + private Socket socket; + protected InetSocketAddress inetSockAddr; + private Long lastAccessTime; + private boolean needActiveCheck = false; + + public Socket getSocket() { + return socket; + } + + public void setSocket(Socket socket) { + this.socket = socket; + } + + public InetSocketAddress getInetSockAddr() { + return inetSockAddr; + } + + public void setInetSockAddr(InetSocketAddress inetSockAddr) { + this.inetSockAddr = inetSockAddr; + } + + public Long getLastAccessTime() { + return lastAccessTime; + } + + public void setLastAccessTime(Long lastAccessTime) { + this.lastAccessTime = lastAccessTime; + } + + public boolean isNeedActiveCheck() { + return needActiveCheck; + } + + public void setNeedActiveCheck(boolean needActiveCheck) { + this.needActiveCheck = needActiveCheck; + } + + public ConnectionInfo(Socket socket, InetSocketAddress inetSockAddr, Long lastAccessTime, boolean needActiveCheck) { + this.socket = socket; + this.inetSockAddr = inetSockAddr; + this.lastAccessTime = lastAccessTime; + this.needActiveCheck = needActiveCheck; + } +} diff --git a/src/main/java/org/csource/fastdfs/pool/ConnectionManager.java b/src/main/java/org/csource/fastdfs/pool/ConnectionManager.java new file mode 100644 index 0000000..3bbb55d --- /dev/null +++ b/src/main/java/org/csource/fastdfs/pool/ConnectionManager.java @@ -0,0 +1,152 @@ +package org.csource.fastdfs.pool; + +import org.csource.fastdfs.ClientGlobal; +import org.csource.fastdfs.ProtoCommon; +import org.csource.fastdfs.TrackerServer; + +import java.io.IOException; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +public class ConnectionManager { + /** + * ip_port + */ + private String key; + + /** + * total create connection pool + */ + private AtomicInteger totalCount = new AtomicInteger(); + + /** + * freeCP + */ + private AtomicInteger freeCount = new AtomicInteger(); + + /** + * lock + */ + private ReentrantLock lock = new ReentrantLock(true); + + /** + * condition + */ + private Condition condition = lock.newCondition(); + + /** + * connection container + */ + private volatile ConcurrentLinkedQueue freeConnections = new ConcurrentLinkedQueue(); + + private ConnectionManager() { + + } + + public ConnectionManager(String key) { + this.key = key; + } + + private synchronized ConnectionInfo newConnection() throws IOException { + try { + ConnectionInfo connectionInfo = PoolConnectionFactory.create(this.key); + return connectionInfo; + + } catch (IOException e) { + throw e; + } + } + + + public synchronized ConnectionInfo getConnection() throws IOException { + lock.lock(); + try { + ConnectionInfo connectionInfo = null; + while (true) { + if (freeCount.get() > 0) { + connectionInfo = freeConnections.poll(); + if ((System.currentTimeMillis() - connectionInfo.getLastAccessTime()) > ClientGlobal.getG_connection_pool_max_idle_time()) { + closeConnection(connectionInfo); + continue; + } else { + freeCount.decrementAndGet(); + } + } else if (ClientGlobal.getG_connection_pool_max_count_per_entry() == 0 || totalCount.get() < ClientGlobal.getG_connection_pool_max_count_per_entry()) { + connectionInfo = newConnection(); + if (connectionInfo != null) { + totalCount.incrementAndGet(); + } + } else { + try { + if (condition.await(ClientGlobal.getG_connection_pool_max_wait_time(), TimeUnit.MILLISECONDS)) { + //wait single success + continue; + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + //if need check active + if (connectionInfo.isNeedActiveCheck()) { + boolean activeYes = ProtoCommon.activeTest(connectionInfo.getSocket()); + if (activeYes) { + connectionInfo.setLastAccessTime(System.currentTimeMillis()); + } else { + //close if check fail + closeConnection(connectionInfo); + continue; + } + } + return connectionInfo; + } + } catch (IOException e) { + return null; + } finally { + lock.unlock(); + } + } + + public void freeConnection(TrackerServer trackerServer) throws IOException { + if (trackerServer == null || trackerServer.getSocket() == null) { + return; + } + if ((System.currentTimeMillis() - trackerServer.getLastAccessTime()) < ClientGlobal.getG_connection_pool_max_idle_time()) { + try { + lock.lock(); + ConnectionInfo connectionInfo = new ConnectionInfo(trackerServer.getSocket(),trackerServer.getInetSocketAddress(),System.currentTimeMillis(),true); + freeConnections.add(connectionInfo); + freeCount.incrementAndGet(); + condition.signal(); + } finally { + lock.unlock(); + } + } else { + trackerServer.close(); + totalCount.decrementAndGet(); + } + } + + public void closeConnection(ConnectionInfo connectionInfo) throws IOException { + if (connectionInfo.getSocket() != null) { + totalCount.decrementAndGet(); + try { + ProtoCommon.closeSocket(connectionInfo.getSocket()); + } finally { + connectionInfo.setSocket(null); + } + } + } + + @Override + public String toString() { + return "ConnectionManager{" + + "key='" + key + '\'' + + ", totalCount=" + totalCount + + ", freeCount=" + freeCount + + ", linkedQueueCP=" + freeConnections + + '}'; + } +} diff --git a/src/main/java/org/csource/fastdfs/pool/ConnectionPool.java b/src/main/java/org/csource/fastdfs/pool/ConnectionPool.java new file mode 100644 index 0000000..cb4b803 --- /dev/null +++ b/src/main/java/org/csource/fastdfs/pool/ConnectionPool.java @@ -0,0 +1,75 @@ +package org.csource.fastdfs.pool; + +import org.csource.fastdfs.TrackerServer; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class ConnectionPool { + /** + * key is ip:port, value is ConnectionManager + */ + private final static ConcurrentHashMap CP = new ConcurrentHashMap(); + + public static synchronized ConnectionInfo getConnection(InetSocketAddress socketAddress) throws IOException { + if (socketAddress == null) { + return null; + } + String key = getKey(socketAddress); + ConnectionManager connectionManager = CP.get(key); + if (connectionManager == null) { + connectionManager = new ConnectionManager(key); + CP.put(key, connectionManager); + } + return connectionManager.getConnection(); + } + + /** + * release connection + */ + public static void closeConnection(TrackerServer trackerServer) throws IOException { + if (trackerServer == null || trackerServer.getInetSocketAddress() == null) { + return; + } + String key = getKey(trackerServer.getInetSocketAddress()); + if (key != null) { + ConnectionManager connectionManager = CP.get(key); + connectionManager.closeConnection(new ConnectionInfo(trackerServer.getSocket(), trackerServer.getInetSocketAddress(),trackerServer.getLastAccessTime(),true)); + } else { + trackerServer.close(); + } + } + + public static void freeConnection(TrackerServer trackerServer) throws IOException { + if (trackerServer == null || trackerServer.getInetSocketAddress() == null) { + return; + } + String key = getKey(trackerServer.getInetSocketAddress()); + if (key != null) { + ConnectionManager connectionManager = CP.get(key); + connectionManager.freeConnection(trackerServer); + } else { + trackerServer.close(); + } + } + + private static String getKey(InetSocketAddress socketAddress) { + if (socketAddress == null) { + return null; + } + return String.format("%s:%s", socketAddress.getHostName(), socketAddress.getPort()); + } + @Override + public String toString() { + if (!CP.isEmpty()) { + StringBuilder builder = new StringBuilder(); + for (Map.Entry managerEntry : CP.entrySet()) { + builder.append("key:" + managerEntry.getKey() + " -------- entry:" + managerEntry.getValue() + "\n"); + } + return builder.toString(); + } + return null; + } +} diff --git a/src/main/java/org/csource/fastdfs/pool/PoolConnectionFactory.java b/src/main/java/org/csource/fastdfs/pool/PoolConnectionFactory.java new file mode 100644 index 0000000..545af98 --- /dev/null +++ b/src/main/java/org/csource/fastdfs/pool/PoolConnectionFactory.java @@ -0,0 +1,30 @@ +package org.csource.fastdfs.pool; + +import org.csource.fastdfs.ClientGlobal; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; + +public class PoolConnectionFactory { + public static ConnectionInfo create(String key) throws IOException { + if (key == null) { + System.err.printf("ip:port entry conn't be null"); + return null; + } + String[] keyPortString = key.split(":"); + if (keyPortString.length != 2) { + System.err.printf("ip:port entry is invalid! key:{}", keyPortString); + return null; + } + String ip = keyPortString[0]; + Integer port = Integer.parseInt(keyPortString[1]); + Socket sock = new Socket(); + sock.setReuseAddress(true); + sock.setSoTimeout(ClientGlobal.g_network_timeout); + InetSocketAddress inetSocketAddress = new InetSocketAddress(ip, port); + sock.connect(inetSocketAddress, ClientGlobal.g_connect_timeout); + return new ConnectionInfo(sock, inetSocketAddress, System.currentTimeMillis(), false); + + } +} diff --git a/src/main/resources/fastdfs-client.properties b/src/main/resources/fastdfs-client.properties new file mode 100644 index 0000000..40af15a --- /dev/null +++ b/src/main/resources/fastdfs-client.properties @@ -0,0 +1,17 @@ +## fastdfs-client.properties + +fastdfs.connect_timeout_in_seconds = 5 +fastdfs.network_timeout_in_seconds = 30 + +fastdfs.charset = UTF-8 + +fastdfs.http_anti_steal_token = false +fastdfs.http_secret_key = FastDFS1234567890 +fastdfs.http_tracker_http_port = 80 + +fastdfs.tracker_servers = 10.0.11.201:22122,10.0.11.202:22122,10.0.11.203:22122 + +fastdfs.connection_pool.enabled = false +fastdfs.connection_pool.max_count_per_entry = 50 +fastdfs.connection_pool.max_idle_time = 60 +fastdfs.connection_pool.max_wait_time = 5 \ No newline at end of file diff --git a/src/main/resources/fastdfs-client.properties.sample b/src/main/resources/fastdfs-client.properties.sample index 5031a45..00e7deb 100644 --- a/src/main/resources/fastdfs-client.properties.sample +++ b/src/main/resources/fastdfs-client.properties.sample @@ -9,5 +9,13 @@ fastdfs.http_anti_steal_token = false fastdfs.http_secret_key = FastDFS1234567890 fastdfs.http_tracker_http_port = 80 -fastdfs.tracker_servers = 10.0.11.201:22122,10.0.11.202:22122,10.0.11.203:22122 +fastdfs.tracker_servers = 185.245.40.70:22122 +## Whether to open the connection pool, if not, create a new connection every time +fastdfs.connection_pool.enabled = false + +## max_count_per_entry: max connection count per host:port , 0 is not limit +fastdfs.connection_pool.max_count_per_entry = 2 + +fastdfs.connection_pool.max_idle_time = 60 +fastdfs.connection_pool.max_wait_time = 5 \ No newline at end of file diff --git a/src/main/resources/fdfs_client.conf.sample b/src/main/resources/fdfs_client.conf.sample index df531db..0ed36ba 100644 --- a/src/main/resources/fdfs_client.conf.sample +++ b/src/main/resources/fdfs_client.conf.sample @@ -7,3 +7,8 @@ http.secret_key = FastDFS1234567890 tracker_server = 10.0.11.243:22122 tracker_server = 10.0.11.244:22122 + +connection_pool.enabled = false +connection_pool.max_count_per_entry = 100 +connection_pool.max_idle_time = 60 +connection_pool.max_wait_time = 5 \ No newline at end of file