ZhouXY108 2023-02-01 14:24:51 +08:00
commit 6c396d2893
12 changed files with 167 additions and 126 deletions

2
.gitignore vendored
View File

@ -12,8 +12,8 @@ target
*.iws *.iws
*.log *.log
.idea .idea
*.conf *.conf
*.PNG *.PNG
.vscode/ .vscode/
*.class

View File

@ -1,3 +1,8 @@
Version 1.30 2023-01-29
* support tracker server fail over
If the tracker server is not specified, when the tracker server fails to
get the connection, it will try to get the connection from other tracker servers.
The maximum number of attempts is the number of tracker servers minus 1
Version 1.29 2020-01-03 Version 1.29 2020-01-03
* support active test for connection pool. * support active test for connection pool.

View File

@ -26,7 +26,7 @@ mvn install:install-file -DgroupId=org.csource -DartifactId=fastdfs-client-java
<dependency> <dependency>
<groupId>org.csource</groupId> <groupId>org.csource</groupId>
<artifactId>fastdfs-client-java</artifactId> <artifactId>fastdfs-client-java</artifactId>
<version>1.29-SNAPSHOT</version> <version>1.30-SNAPSHOT</version>
</dependency> </dependency>
``` ```

View File

@ -4,7 +4,7 @@
<groupId>org.csource</groupId> <groupId>org.csource</groupId>
<artifactId>fastdfs-client-java</artifactId> <artifactId>fastdfs-client-java</artifactId>
<version>1.29-SNAPSHOT</version> <version>1.30-SNAPSHOT</version>
<name>fastdfs-client-java</name> <name>fastdfs-client-java</name>
<description>fastdfs client for java</description> <description>fastdfs client for java</description>
<packaging>jar</packaging> <packaging>jar</packaging>

View File

@ -155,13 +155,13 @@ public class IniFileReader {
} finally { } finally {
try { try {
if (in != null) in.close(); if (in != null) in.close();
//System.out.println("loadFrom...finally...in.close(); done");
} catch (Exception ex) { } catch (Exception ex) {
ex.printStackTrace(); ex.printStackTrace();
} }
} }
} }
@SuppressWarnings("unchecked")
private void readToParamTable(InputStream in) throws IOException { private void readToParamTable(InputStream in) throws IOException {
this.paramTable = new Hashtable(); this.paramTable = new Hashtable();
if (in == null) return; if (in == null) return;
@ -206,7 +206,6 @@ public class IniFileReader {
try { try {
if (bufferedReader != null) bufferedReader.close(); if (bufferedReader != null) bufferedReader.close();
if (inReader != null) inReader.close(); if (inReader != null) inReader.close();
//System.out.println("readToParamTable...finally...bufferedReader.close();inReader.close(); done");
} catch (Exception ex) { } catch (Exception ex) {
ex.printStackTrace(); ex.printStackTrace();
} }

View File

@ -48,7 +48,6 @@ public class ClientGlobal {
public static final String PROP_KEY_CONNECTION_POOL_MAX_IDLE_TIME = "fastdfs.connection_pool.max_idle_time"; public static final String PROP_KEY_CONNECTION_POOL_MAX_IDLE_TIME = "fastdfs.connection_pool.max_idle_time";
public static final String PROP_KEY_CONNECTION_POOL_MAX_WAIT_TIME_IN_MS = "fastdfs.connection_pool.max_wait_time_in_ms"; public static final String PROP_KEY_CONNECTION_POOL_MAX_WAIT_TIME_IN_MS = "fastdfs.connection_pool.max_wait_time_in_ms";
public static final int DEFAULT_CONNECT_TIMEOUT = 5; //second public static final int DEFAULT_CONNECT_TIMEOUT = 5; //second
public static final int DEFAULT_NETWORK_TIMEOUT = 30; //second public static final int DEFAULT_NETWORK_TIMEOUT = 30; //second
public static final String DEFAULT_CHARSET = "UTF-8"; public static final String DEFAULT_CHARSET = "UTF-8";
@ -180,7 +179,6 @@ public class ClientGlobal {
String poolMaxCountPerEntry = props.getProperty(PROP_KEY_CONNECTION_POOL_MAX_COUNT_PER_ENTRY); String poolMaxCountPerEntry = props.getProperty(PROP_KEY_CONNECTION_POOL_MAX_COUNT_PER_ENTRY);
String poolMaxIdleTime = props.getProperty(PROP_KEY_CONNECTION_POOL_MAX_IDLE_TIME); String poolMaxIdleTime = props.getProperty(PROP_KEY_CONNECTION_POOL_MAX_IDLE_TIME);
String poolMaxWaitTimeInMS = props.getProperty(PROP_KEY_CONNECTION_POOL_MAX_WAIT_TIME_IN_MS); String poolMaxWaitTimeInMS = props.getProperty(PROP_KEY_CONNECTION_POOL_MAX_WAIT_TIME_IN_MS);
if (connectTimeoutInSecondsConf != null && connectTimeoutInSecondsConf.trim().length() != 0) { if (connectTimeoutInSecondsConf != null && connectTimeoutInSecondsConf.trim().length() != 0) {
g_connect_timeout = Integer.parseInt(connectTimeoutInSecondsConf.trim()) * 1000; g_connect_timeout = Integer.parseInt(connectTimeoutInSecondsConf.trim()) * 1000;
} }
@ -221,7 +219,7 @@ public class ClientGlobal {
* server',' * server','
*/ */
public static void initByTrackers(String trackerServers) throws IOException, MyException { public static void initByTrackers(String trackerServers) throws IOException, MyException {
List<InetSocketAddress> list = new ArrayList(); List<InetSocketAddress> list = new ArrayList<InetSocketAddress>();
String spr1 = ","; String spr1 = ",";
String spr2 = ":"; String spr2 = ":";
String[] arr1 = trackerServers.trim().split(spr1); String[] arr1 = trackerServers.trim().split(spr1);

View File

@ -27,6 +27,7 @@ public class ProtoStructDecoder<T extends StructBase> {
/** /**
* decode byte buffer * decode byte buffer
*/ */
@SuppressWarnings("unchecked")
public T[] decode(byte[] bs, Class<T> clazz, int fieldsTotalSize) throws Exception { public T[] decode(byte[] bs, Class<T> clazz, int fieldsTotalSize) throws Exception {
if (bs.length % fieldsTotalSize != 0) { if (bs.length % fieldsTotalSize != 0) {
throw new IOException("byte array length: " + bs.length + " is invalid!"); throw new IOException("byte array length: " + bs.length + " is invalid!");

View File

@ -731,7 +731,6 @@ public class StorageClient {
try { try {
connection = this.storageServer.getConnection(); connection = this.storageServer.getConnection();
ext_name_bs = new byte[ProtoCommon.FDFS_FILE_EXT_NAME_MAX_LEN]; ext_name_bs = new byte[ProtoCommon.FDFS_FILE_EXT_NAME_MAX_LEN];
Arrays.fill(ext_name_bs, (byte) 0); Arrays.fill(ext_name_bs, (byte) 0);
if (file_ext_name != null && file_ext_name.length() > 0) { if (file_ext_name != null && file_ext_name.length() > 0) {

View File

@ -71,6 +71,71 @@ public class TrackerClient {
return this.getStoreStorage(trackerServer, groupName); return this.getStoreStorage(trackerServer, groupName);
} }
public Connection getConnection(TrackerServer trackerServer) throws IOException, MyException {
Connection connection = null;
int length = this.tracker_group.tracker_servers.length;
boolean failOver = length > 1 && trackerServer == null;
try {
if (trackerServer == null) {
trackerServer = getTrackerServer();
if (trackerServer == null) {
throw new MyException("tracker server is empty!");
}
}
connection = trackerServer.getConnection();
} catch (IOException e) {
if (failOver) {
System.err.println("trackerServer get connection error, emsg:" + e.getMessage());
} else {
throw e;
}
} catch (MyException e) {
if (failOver) {
System.err.println("trackerServer get connection error, emsg:" + e.getMessage());
} else {
throw e;
}
}
if (connection != null || !failOver) {
return connection;
}
//do fail over
int currentIndex = 0;
if (trackerServer != null) {
currentIndex = trackerServer.getIndex();
}
int failOverCount = 0;
while (failOverCount < length - 1) {
failOverCount++;
currentIndex++;
if (currentIndex >= length) {
currentIndex = 0;
}
try {
trackerServer = this.tracker_group.getTrackerServer(currentIndex);
if (trackerServer == null) {
throw new MyException("tracker server is empty!");
}
return trackerServer.getConnection();
} catch (IOException e) {
System.err.println("fail over trackerServer get connection error, failOverCount:" + failOverCount + "," + e.getMessage());
if (failOverCount == length - 1) {
throw e;
}
} catch (MyException e) {
System.err.println("fail over trackerServer get connection error, failOverCount:" + failOverCount + ", " + e.getMessage());
if (failOverCount == length - 1) {
throw e;
}
}
}
return null;
}
/** /**
* query storage server to upload file * query storage server to upload file
* *
@ -85,12 +150,7 @@ public class TrackerClient {
byte cmd; byte cmd;
int out_len; int out_len;
byte store_path; byte store_path;
Connection connection; Connection connection = getConnection(trackerServer);
if (trackerServer == null) {
trackerServer = getTrackerServer();
}
connection = trackerServer.getConnection();
OutputStream out = connection.getOutputStream(); OutputStream out = connection.getOutputStream();
try { try {
@ -170,16 +230,7 @@ public class TrackerClient {
int port; int port;
byte cmd; byte cmd;
int out_len; int out_len;
Connection connection; Connection connection = getConnection(trackerServer);
if (trackerServer == null) {
trackerServer = getTrackerServer();
if (trackerServer == null) {
return null;
}
}
connection = trackerServer.getConnection();
OutputStream out = connection.getOutputStream(); OutputStream out = connection.getOutputStream();
try { try {
@ -343,15 +394,7 @@ public class TrackerClient {
int len; int len;
String ip_addr; String ip_addr;
int port; int port;
Connection connection; Connection connection = getConnection(trackerServer);
if (trackerServer == null) {
trackerServer = getTrackerServer();
if (trackerServer == null) {
return null;
}
}
connection = trackerServer.getConnection();
OutputStream out = connection.getOutputStream(); OutputStream out = connection.getOutputStream();
try { try {
@ -473,16 +516,7 @@ public class TrackerClient {
byte cmd; byte cmd;
int out_len; int out_len;
byte store_path; byte store_path;
Connection connection; Connection connection = getConnection(trackerServer);
if (trackerServer == null) {
trackerServer = getTrackerServer();
if (trackerServer == null) {
return null;
}
}
connection = trackerServer.getConnection();
OutputStream out = connection.getOutputStream(); OutputStream out = connection.getOutputStream();
try { try {
@ -548,15 +582,7 @@ public class TrackerClient {
byte[] bGroupName; byte[] bGroupName;
byte[] bs; byte[] bs;
int len; int len;
Connection connection; Connection connection = getConnection(trackerServer);
if (trackerServer == null) {
trackerServer = getTrackerServer();
if (trackerServer == null) {
return null;
}
}
connection = trackerServer.getConnection();
OutputStream out = connection.getOutputStream(); OutputStream out = connection.getOutputStream();
try { try {

View File

@ -18,84 +18,83 @@ import java.net.InetSocketAddress;
* @version Version 1.17 * @version Version 1.17
*/ */
public class TrackerGroup { public class TrackerGroup {
public int tracker_server_index; public int tracker_server_index;
public InetSocketAddress[] tracker_servers; public InetSocketAddress[] tracker_servers;
protected Integer lock; protected Integer lock;
/** /**
* Constructor * Constructor
* *
* @param tracker_servers tracker servers * @param tracker_servers tracker servers
*/ */
public TrackerGroup(InetSocketAddress[] tracker_servers) { public TrackerGroup(InetSocketAddress[] tracker_servers) {
this.tracker_servers = tracker_servers; this.tracker_servers = tracker_servers;
this.lock = new Integer(0); this.lock = new Integer(0);
this.tracker_server_index = 0;
}
/**
* return connected tracker server
*
* @return connected tracker server, null for fail
*/
public TrackerServer getTrackerServer(int serverIndex) throws IOException {
return new TrackerServer(this.tracker_servers[serverIndex]);
}
/**
* return connected tracker server
*
* @return connected tracker server, null for fail
*/
public TrackerServer getTrackerServer() throws IOException {
int current_index;
synchronized (this.lock) {
this.tracker_server_index++;
if (this.tracker_server_index >= this.tracker_servers.length) {
this.tracker_server_index = 0; this.tracker_server_index = 0;
}
current_index = this.tracker_server_index;
} }
try { /**
return this.getTrackerServer(current_index); * return connected tracker server
} catch (IOException ex) { *
System.err.println("connect to server " + this.tracker_servers[current_index].getAddress().getHostAddress() + ":" + this.tracker_servers[current_index].getPort() + " fail"); * @return connected tracker server, null for fail
ex.printStackTrace(System.err); */
public TrackerServer getTrackerServer(int serverIndex) throws IOException {
return new TrackerServer(this.tracker_servers[serverIndex], serverIndex);
} }
/**
for (int i = 0; i < this.tracker_servers.length; i++) { * return connected tracker server
if (i == current_index) { *
continue; * @return connected tracker server, null for fail
} */
public TrackerServer getTrackerServer() throws IOException {
try { int current_index;
TrackerServer trackerServer = this.getTrackerServer(i);
synchronized (this.lock) { synchronized (this.lock) {
if (this.tracker_server_index == current_index) { this.tracker_server_index++;
this.tracker_server_index = i; if (this.tracker_server_index >= this.tracker_servers.length) {
} this.tracker_server_index = 0;
}
current_index = this.tracker_server_index;
} }
return trackerServer; try {
} catch (IOException ex) { return this.getTrackerServer(current_index);
System.err.println("connect to server " + this.tracker_servers[i].getAddress().getHostAddress() + ":" + this.tracker_servers[i].getPort() + " fail"); } catch (IOException ex) {
ex.printStackTrace(System.err); System.err.println("connect to server " + this.tracker_servers[current_index].getAddress().getHostAddress() + ":" + this.tracker_servers[current_index].getPort() + " fail");
} ex.printStackTrace(System.err);
}
for (int i = 0; i < this.tracker_servers.length; i++) {
if (i == current_index) {
continue;
}
try {
TrackerServer trackerServer = this.getTrackerServer(i);
synchronized (this.lock) {
if (this.tracker_server_index == current_index) {
this.tracker_server_index = i;
}
}
return trackerServer;
} catch (IOException ex) {
System.err.println("connect to server " + this.tracker_servers[i].getAddress().getHostAddress() + ":" + this.tracker_servers[i].getPort() + " fail");
ex.printStackTrace(System.err);
}
}
return null;
} }
return null; public Object clone() {
} InetSocketAddress[] trackerServers = new InetSocketAddress[this.tracker_servers.length];
for (int i = 0; i < trackerServers.length; i++) {
trackerServers[i] = new InetSocketAddress(this.tracker_servers[i].getAddress().getHostAddress(), this.tracker_servers[i].getPort());
}
public Object clone() { return new TrackerGroup(trackerServers);
InetSocketAddress[] trackerServers = new InetSocketAddress[this.tracker_servers.length];
for (int i = 0; i < trackerServers.length; i++) {
trackerServers[i] = new InetSocketAddress(this.tracker_servers[i].getAddress().getHostAddress(), this.tracker_servers[i].getPort());
} }
return new TrackerGroup(trackerServers);
}
} }

View File

@ -25,11 +25,18 @@ import java.net.InetSocketAddress;
public class TrackerServer { public class TrackerServer {
protected InetSocketAddress inetSockAddr; protected InetSocketAddress inetSockAddr;
protected int index;
public TrackerServer(InetSocketAddress inetSockAddr) throws IOException { public TrackerServer(InetSocketAddress inetSockAddr) throws IOException {
this.inetSockAddr = inetSockAddr; this.inetSockAddr = inetSockAddr;
} }
public TrackerServer(InetSocketAddress inetSockAddr, int index) {
this.inetSockAddr = inetSockAddr;
this.index = index;
}
public Connection getConnection() throws MyException, IOException { public Connection getConnection() throws MyException, IOException {
Connection connection; Connection connection;
if (ClientGlobal.g_connection_pool_enabled) { if (ClientGlobal.g_connection_pool_enabled) {
@ -48,4 +55,11 @@ public class TrackerServer {
return this.inetSockAddr; return this.inetSockAddr;
} }
public int getIndex() {
return index;
}
public void setIndex(int index) {
this.index = index;
}
} }

View File

@ -87,9 +87,9 @@ public class FdfsTest {
@Test @Test
public void download() throws Exception { public void download() throws Exception {
String[] uploadresult = {"group1", "M00/00/00/wKgBZV0phl2ASV1nAACk1tFxwrM3814331"}; String[] uploadresult = {"group1", "M00/00/00/J2fL12PVypeAWiGcAAM_gDeWVyw5817085"};
byte[] result = storageClient.download_file(uploadresult[0], uploadresult[1]); byte[] result = storageClient.download_file(uploadresult[0], uploadresult[1]);
String local_filename = "build.PNG"; String local_filename = "commitment.d2f57e10.jpg";
writeByteToFile(result, local_filename); writeByteToFile(result, local_filename);
File file = new File(local_filename); File file = new File(local_filename);
Assert.assertTrue(file.isFile()); Assert.assertTrue(file.isFile());
@ -98,17 +98,17 @@ public class FdfsTest {
@Test @Test
public void testUploadDownload() throws Exception { public void testUploadDownload() throws Exception {
NameValuePair[] metaList = new NameValuePair[1]; NameValuePair[] metaList = new NameValuePair[1];
String local_filename = "build.PNG"; String local_filename = "commitment.d2f57e10 (2).jpg";
metaList[0] = new NameValuePair("fileName", local_filename); metaList[0] = new NameValuePair("fileName", local_filename);
File file = new File("C:/Users/chengdu/Desktop/build.PNG"); File file = new File("/Users/iyw/Downloads/commitment.d2f57e10 (2).jpg");
InputStream inputStream = new FileInputStream(file); InputStream inputStream = new FileInputStream(file);
int length = inputStream.available(); int length = inputStream.available();
byte[] bytes = new byte[length]; byte[] bytes = new byte[length];
inputStream.read(bytes); inputStream.read(bytes);
String[] result = storageClient.upload_file(bytes, null, metaList); String[] result = storageClient.upload_file(bytes, null, metaList);
Assert.assertTrue(storageClient.isConnected()); //Assert.assertTrue(storageClient.isConnected());
// pool testOnborrow isAvaliable // pool testOnborrow isAvaliable
Assert.assertTrue(storageClient.isAvaliable()); // Assert.assertTrue(storageClient.isAvaliable());
LOGGER.info("result {}", Arrays.asList(result)); LOGGER.info("result {}", Arrays.asList(result));
byte[] resultbytes = storageClient.download_file(result[0], result[1]); byte[] resultbytes = storageClient.download_file(result[0], result[1]);
writeByteToFile(resultbytes, local_filename); writeByteToFile(resultbytes, local_filename);