Skip to content

Commit

Permalink
HPCC4J-542 DFSClient: Create JUnit for read retry
Browse files Browse the repository at this point in the history
- Added file part failure retry test
- Fixed retry issue on initial connection

Signed-off-by: James McMullan James.McMullan@lexisnexis.com
  • Loading branch information
jpmcmu committed Oct 12, 2023
1 parent acc0ecb commit 6dca48f
Show file tree
Hide file tree
Showing 3 changed files with 203 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
package org.hpccsystems.dfs.client;

import java.io.Serializable;
import java.util.List;
import java.util.ArrayList;
import java.util.Arrays;

import org.hpccsystems.commons.ecl.FileFilter;
import org.hpccsystems.commons.errors.HpccFileException;
Expand Down Expand Up @@ -266,6 +269,38 @@ public String getCopyIP(int copyindex)
return copyLocations[copyindex];
}

/**
* Set the copy IP
*
* @param copyIndex
* the copyindex
* @param copyIP The IP of the file part copy
*/
public void setCopyIP(int copyIndex, String copyIP)
{
if (copyIndex < 0 || copyIndex >= copyLocations.length) return;

copyLocations[copyIndex] = copyIP;
}

/**
* Add file part copy
*
* @param index The index at which to insert the file part copy
* @param copyIP The IP of the new file part copy
* @param copyPath The path of the new file part copy
*/
public void add(int index, String copyIP, String copyPath)
{
List<String> copyLocationsList = new ArrayList<>(Arrays.asList(copyLocations));
copyLocationsList.add(index, copyIP);
copyLocations = copyLocationsList.toArray(new String[0]);

List<String> copyPathList = new ArrayList<>(Arrays.asList(copyPaths));
copyPathList.add(index, copyPath);
copyPaths = copyPathList.toArray(new String[0]);
}

/**
* Count of copies available for this file part.
* @return copy locations size
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1528,150 +1528,156 @@ private void makeActive() throws HpccFileException
this.active.set(false);
this.handle = 0;

try
boolean needsRetry = false;
do
{
log.debug("Attempting to connect to file part : '" + dataPart.getThisPart() + "' Copy: '" + (getFilePartCopy() + 1) + "' on IP: '"
+ getIP() + "'");

needsRetry = false;
try
{
if (getUseSSL())
log.debug("Attempting to connect to file part : '" + dataPart.getThisPart() + "' Copy: '"
+ (getFilePartCopy() + 1) + "' on IP: '" + getIP() + "'");
try
{
SSLSocketFactory ssf = (SSLSocketFactory) SSLSocketFactory.getDefault();
sock = (SSLSocket) ssf.createSocket();

// Optimize for bandwidth over latency and connection time.
// We are opening up a long standing connection and potentially reading a significant amount of
// data
// So we don't care as much about individual packet latency or connection time overhead
sock.setPerformancePreferences(0, 1, 2);
sock.connect(new InetSocketAddress(this.getIP(), this.dataPart.getPort()), this.connectTimeout);

log.debug("Attempting SSL handshake...");
((SSLSocket) sock).startHandshake();
log.debug("SSL handshake successful...");
log.debug(" Remote address = " + sock.getInetAddress().toString() + " Remote port = " + sock.getPort());
if (getUseSSL())
{
SSLSocketFactory ssf = (SSLSocketFactory) SSLSocketFactory.getDefault();
sock = (SSLSocket) ssf.createSocket();

// Optimize for bandwidth over latency and connection time.
// We are opening up a long standing connection and potentially reading a significant amount of
// data
// So we don't care as much about individual packet latency or connection time overhead
sock.setPerformancePreferences(0, 1, 2);
sock.connect(new InetSocketAddress(this.getIP(), this.dataPart.getPort()), this.connectTimeout);

log.debug("Attempting SSL handshake...");
((SSLSocket) sock).startHandshake();
log.debug("SSL handshake successful...");
log.debug(" Remote address = " + sock.getInetAddress().toString() + " Remote port = " + sock.getPort());
}
else
{
SocketFactory sf = SocketFactory.getDefault();
sock = sf.createSocket();

// Optimize for bandwidth over latency and connection time.
// We are opening up a long standing connection and potentially reading a significant amount of
// data
// So we don't care as much about individual packet latency or connection time overhead
sock.setPerformancePreferences(0, 1, 2);
sock.connect(new InetSocketAddress(this.getIP(), this.dataPart.getPort()), this.connectTimeout);
}

this.sock.setSoTimeout(socketOpTimeoutMs);

log.debug("Connected: Remote address = " + sock.getInetAddress().toString() + " Remote port = " + sock.getPort());
}
else
catch (java.net.UnknownHostException e)
{
SocketFactory sf = SocketFactory.getDefault();
sock = sf.createSocket();

// Optimize for bandwidth over latency and connection time.
// We are opening up a long standing connection and potentially reading a significant amount of
// data
// So we don't care as much about individual packet latency or connection time overhead
sock.setPerformancePreferences(0, 1, 2);
sock.connect(new InetSocketAddress(this.getIP(), this.dataPart.getPort()), this.connectTimeout);
throw new HpccFileException("Bad file part addr " + this.getIP(), e);
}
catch (java.io.IOException e)
{
throw new HpccFileException(e);
}

this.sock.setSoTimeout(socketOpTimeoutMs);
try
{
this.dos = new java.io.DataOutputStream(sock.getOutputStream());
this.dis = new java.io.DataInputStream(sock.getInputStream());
}
catch (java.io.IOException e)
{
throw new HpccFileException("Failed to create streams", e);
}

log.debug("Connected: Remote address = " + sock.getInetAddress().toString() + " Remote port = " + sock.getPort());
}
catch (java.net.UnknownHostException e)
{
throw new HpccFileException("Bad file part addr " + this.getIP(), e);
}
catch (java.io.IOException e)
{
throw new HpccFileException(e);
}
//------------------------------------------------------------------------------
// Check protocol version
//------------------------------------------------------------------------------

try
{
this.dos = new java.io.DataOutputStream(sock.getOutputStream());
this.dis = new java.io.DataInputStream(sock.getInputStream());
}
catch (java.io.IOException e)
{
throw new HpccFileException("Failed to create streams", e);
}
try
{
String msg = makeGetVersionRequest();
int msgLen = msg.length();

//------------------------------------------------------------------------------
// Check protocol version
//------------------------------------------------------------------------------
this.dos.writeInt(msgLen);
this.dos.write(msg.getBytes(HPCCCharSet), 0, msgLen);
this.dos.flush();
}
catch (IOException e)
{
throw new HpccFileException("Failed on initial remote read trans", e);
}

try
{
String msg = makeGetVersionRequest();
int msgLen = msg.length();
RowServiceResponse response = readResponse();
if (response.len == 0)
{
useOldProtocol = true;
}
else
{
useOldProtocol = false;

this.dos.writeInt(msgLen);
this.dos.write(msg.getBytes(HPCCCharSet), 0, msgLen);
this.dos.flush();
}
catch (IOException e)
{
throw new HpccFileException("Failed on initial remote read trans", e);
}
byte[] versionBytes = new byte[response.len];
try
{
this.dis.readFully(versionBytes);
}
catch (IOException e)
{
throw new HpccFileException("Error while attempting to read version response.", e);
}

RowServiceResponse response = readResponse();
if (response.len == 0)
{
useOldProtocol = true;
}
else
{
useOldProtocol = false;
rowServiceVersion = new String(versionBytes, HPCCCharSet);
}

//------------------------------------------------------------------------------
// Send initial read request
//------------------------------------------------------------------------------

byte[] versionBytes = new byte[response.len];
try
{
this.dis.readFully(versionBytes);
String readTrans = null;
if (this.tokenBin == null)
{
this.tokenBin = new byte[0];
readTrans = makeInitialRequest();
}
else
{
readTrans = makeTokenRequest();
}

int transLen = readTrans.length();
this.dos.writeInt(transLen);
this.dos.write(readTrans.getBytes(HPCCCharSet), 0, transLen);
this.dos.flush();
}
catch (IOException e)
{
throw new HpccFileException("Error while attempting to read version response.", e);
throw new HpccFileException("Failed on initial remote read read trans", e);
}

rowServiceVersion = new String(versionBytes, HPCCCharSet);
}

//------------------------------------------------------------------------------
// Send initial read request
//------------------------------------------------------------------------------

try
{
String readTrans = null;
if (this.tokenBin == null)
{
this.tokenBin = new byte[0];
readTrans = makeInitialRequest();
}
else
if (CompileTimeConstants.PROFILE_CODE)
{
readTrans = makeTokenRequest();
firstByteTimeNS = System.nanoTime();
}

int transLen = readTrans.length();
this.dos.writeInt(transLen);
this.dos.write(readTrans.getBytes(HPCCCharSet), 0, transLen);
this.dos.flush();
this.active.set(true);
}
catch (IOException e)
catch (Exception e)
{
throw new HpccFileException("Failed on initial remote read read trans", e);
}
log.error("Could not reach file part: '" + dataPart.getThisPart() + "' copy: '" + (getFilePartCopy() + 1) + "' on IP: '" + getIP()
+ "'");
log.error(e.getMessage());

if (CompileTimeConstants.PROFILE_CODE)
{
firstByteTimeNS = System.nanoTime();
needsRetry = true;
if (!setNextFilePartCopy())
{
throw new HpccFileException("Unsuccessfuly attempted to connect to all file part copies", e);
}
}

this.active.set(true);
}
catch (Exception e)
{
log.error("Could not reach file part: '" + dataPart.getThisPart() + "' copy: '" + (getFilePartCopy() + 1) + "' on IP: '" + getIP()
+ "'");
log.error(e.getMessage());

if (!setNextFilePartCopy())
// This should be a multi exception
throw new HpccFileException("Unsuccessfuly attempted to connect to all file part copies", e);
}
} while (needsRetry);
}

/* Notes on protocol:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1039,6 +1039,53 @@ public void emptyCompressedFileTest()
}
}

@Test
public void filePartReadRetryTest()
{
{
HPCCFile readFile = null;
try
{
readFile = new HPCCFile(datasets[0], connString, hpccUser, hpccPass);
DataPartition[] fileParts = readFile.getFileParts();
for (int i = 0; i < fileParts.length; i++)
{
String firstCopyIP = fileParts[i].getCopyIP(0);
String firstCopyPath = fileParts[i].getCopyPath(0);
fileParts[i].setCopyIP(0, "1.1.1.1");
fileParts[i].add(1, firstCopyIP, firstCopyPath);
}

List<HPCCRecord> records = readFile(readFile, null, false);
System.out.println("Record count: " + records.size());
}
catch (Exception e)
{
Assert.fail(e.getMessage());
}
}

{
HPCCFile readFile = null;
try
{
readFile = new HPCCFile(datasets[0], connString, hpccUser, hpccPass);
DataPartition[] fileParts = readFile.getFileParts();
for (int i = 0; i < fileParts.length; i++)
{
fileParts[i].add(0,"1.1.1.1", "");
}

List<HPCCRecord> records = readFile(readFile, null, false);
System.out.println("Record count: " + records.size());
}
catch (Exception e)
{
Assert.fail(e.getMessage());
}
}
}

@Test
public void invalidSignatureTest()
{
Expand Down

0 comments on commit 6dca48f

Please sign in to comment.