Skip to content

Commit

Permalink
HPCC-32873 Prevent concurrent write to same file when spraying/despraying
Browse files Browse the repository at this point in the history

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
  • Loading branch information
jakesmith committed Oct 29, 2024
1 parent ed05bcf commit c4739fc
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 109 deletions.
160 changes: 106 additions & 54 deletions dali/ft/filecopy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1592,12 +1592,22 @@ void FileSprayer::commonUpSlaves()
cur.whichSlave = 0;
}

if (options->getPropBool(ANnocommon, true) || pushWhole)
if (pushWhole)
return;

//First work out which are the same slaves, and then map the partition.
//Previously it was n^2 in partition, which is fine until you spray 100K files.
unsigned numSlaves = pull ? targets.ordinality() : sources.ordinality();
bool commonByIp = !isContainerized() && (!options->getPropBool(ANnocommon, true));
offset_t threshold = 0x8000 * numSlaves;
offset_t totalSourceFileSize = 0;

ForEachItemIn(i, sources)
{
const FilePartInfo & cur = sources.item(i);
totalSourceFileSize += copyCompressed ? cur.psize : cur.size;
}

unsigned * slaveMapping = new unsigned [numSlaves];
for (unsigned i = 0; i < numSlaves; i++)
slaveMapping[i] = i;
Expand All @@ -1609,22 +1619,35 @@ void FileSprayer::commonUpSlaves()
TargetLocation & cur = targets.item(i1);
for (unsigned i2 = 0; i2 < i1; i2++)
{
if (targets.item(i2).filename.queryIP().ipequals(cur.filename.queryIP()))
bool match = false;
if (commonByIp)
match = targets.item(i2).filename.queryIP().ipequals(cur.filename.queryIP());
else if (!targetSupportsConcurrentWrite || totalSourceFileSize < threshold)
match = targets.item(i2).filename.equals(cur.filename);
if (match)
{
slaveMapping[i1] = i2;
break;
}
}
}
}
else
else // push
{
if (!targetSupportsConcurrentWrite) // should not get here if push mode and !targetSupportsConcurrentWrite
throwUnexpected();

for (unsigned i1 = 1; i1 < numSlaves; i1++)
{
FilePartInfo & cur = sources.item(i1);
for (unsigned i2 = 0; i2 < i1; i2++)
{
if (sources.item(i2).filename.queryIP().ipequals(cur.filename.queryIP()))
bool match = false;
if (commonByIp) // match by IP
match = sources.item(i2).filename.queryIP().ipequals(cur.filename.queryIP());
else if (totalSourceFileSize < threshold)
match = sources.item(i2).filename.equals(cur.filename);
if (match)
{
slaveMapping[i1] = i2;
break;
Expand All @@ -1633,7 +1656,6 @@ void FileSprayer::commonUpSlaves()
}
}


for (unsigned i3 = 0; i3 < max; i3++)
{
PartitionPoint & cur = partition.item(i3);
Expand Down Expand Up @@ -2607,6 +2629,7 @@ void FileSprayer::pullParts()
transferSlaves.append(next);
}

// NB: not all transferServers will be used, depending on mapping of whichSlave
ForEachItemIn(idx3, partition)
{
PartitionPoint & cur = partition.item(idx3);
Expand Down Expand Up @@ -2662,6 +2685,7 @@ void FileSprayer::pushParts()
transferSlaves.append(next);
}

// NB: not all transferServers will be used, depending on mapping of whichSlave
ForEachItemIn(idx3, partition)
{
PartitionPoint & cur = partition.item(idx3);
Expand Down Expand Up @@ -3048,6 +3072,7 @@ void FileSprayer::setTarget(IDistributedFile * target)
TargetLocation & next = * new TargetLocation(curPart->getFilename(rfn,copy), idx);
targets.append(next);
}
target->getClusterGroupName(0, targetPlane.clear());

checkSprayOptions();
}
Expand All @@ -3069,6 +3094,7 @@ void FileSprayer::setTarget(IFileDescriptor * target, unsigned copy)
target->getFilename(idx, copy, filename);
targets.append(*new TargetLocation(filename, idx));
}
target->getClusterGroupName(0, targetPlane.clear());

checkSprayOptions();
}
Expand Down Expand Up @@ -3284,15 +3310,6 @@ void FileSprayer::spray()
checkFormats();
checkForOverlap();

progressTree->setPropBool(ANpull, usePullOperation());

const char * splitPrefix = querySplitPrefix();
if (!replicate && (sources.ordinality() == targets.ordinality()))
{
if (srcFormat.equals(tgtFormat) && !disallowImplicitReplicate())
copySource = true;
}

if (compressOutput&&!replicate&&!copySource)
{
PROGLOG("Compress output forcing pull");
Expand All @@ -3303,6 +3320,8 @@ void FileSprayer::spray()
// in containerized mode, redirect to dafilesrv service or local (if useFtSlave=true)
if (isContainerized())
{
targetSupportsConcurrentWrite = false;

if (useFtSlave)
{
//In containerized world all ftslave processes are executed locally, so make sure we try and connect to a local instance
Expand All @@ -3315,6 +3334,19 @@ void FileSprayer::spray()
sprayServiceHost.clear().append(sprayServiceConfig->queryProp("@name")).append(':').append(sprayServiceConfig->getPropInt("@port"));
}
}
else
targetSupportsConcurrentWrite = true;
if (!targetPlane.isEmpty())
targetSupportsConcurrentWrite = 0 != getPlaneAttributeValue(targetPlane, ConcurrentWriteSupport, targetSupportsConcurrentWrite ? 1 : 0);

progressTree->setPropBool(ANpull, usePullOperation()); // NB: usePullOperation will cache result

const char * splitPrefix = querySplitPrefix();
if (!replicate && (sources.ordinality() == targets.ordinality()))
{
if (srcFormat.equals(tgtFormat) && !disallowImplicitReplicate())
copySource = true;
}

gatherFileSizes(true);
if (!replicate||copySource) // NB: When copySource=true, analyseFileHeaders mainly just sets srcFormat.type
Expand Down Expand Up @@ -3942,6 +3974,7 @@ bool FileSprayer::usePullOperation() const
{
calcedPullPush = true;
cachedUsePull = calcUsePull();
LOG(MCdebugInfo, "Using %s operation", cachedUsePull ? "pull" : "push");
}
return cachedUsePull;
}
Expand Down Expand Up @@ -3979,63 +4012,82 @@ bool FileSprayer::calcUsePull() const
if (sources.ordinality() == 0)
return true;

if (options->getPropBool(ANpull, false))
{
LOG(MCdebugInfo, "Use pull since explicitly specified");
return true;
}
if (options->getPropBool(ANpush, false))
if (!isContainerized()) // using spray-service (dafilsrv) not ftslave in containerized (or if useFtSlave, then created as local process)
{
LOG(MCdebugInfo, "Use push since explicitly specified");
return false;
}
// In BM the sources or targets may not be HPCC environment machines and/or might not be able to run ftslave
ForEachItemIn(idx2, sources)
{
if (!sources.item(idx2).canPush())
{
StringBuffer s;
sources.item(idx2).filename.queryIP().getHostText(s);
LOG(MCdebugInfo, "Use pull operation because %s cannot push", s.str());
return true;
}
}

ForEachItemIn(idx2, sources)
{
if (!sources.item(idx2).canPush())
if (!canLocateSlaveForNode(sources.item(0).filename.queryIP()))
{
StringBuffer s;
sources.item(idx2).filename.queryIP().getHostText(s);
LOG(MCdebugInfo, "Use pull operation because %s cannot push", s.str());
sources.item(0).filename.queryIP().getHostText(s);
LOG(MCdebugInfo, "Use pull operation because %s doesn't appear to have an ftslave", s.str());
return true;
}
}
if (!canLocateSlaveForNode(sources.item(0).filename.queryIP()))
{
StringBuffer s;
sources.item(0).filename.queryIP().getHostText(s);
LOG(MCdebugInfo, "Use pull operation because %s doesn't appear to have an ftslave", s.str());
return true;
}

ForEachItemIn(idx, targets)
{
if (!targets.item(idx).canPull())
ForEachItemIn(idx, targets)
{
if (!targets.item(idx).canPull())
{
StringBuffer s;
targets.item(idx).queryIP().getHostText(s);
LOG(MCdebugInfo, "Use push operation because %s cannot pull", s.str());
return false;
}
}

if (!canLocateSlaveForNode(targets.item(0).queryIP()))
{
StringBuffer s;
targets.item(idx).queryIP().getHostText(s);
LOG(MCdebugInfo, "Use push operation because %s cannot pull", s.str());
targets.item(0).queryIP().getHostText(s);
LOG(MCdebugInfo, "Use push operation because %s doesn't appear to have an ftslave", s.str());
return false;
}
}

if (!canLocateSlaveForNode(targets.item(0).queryIP()))
bool pullRequested = options->hasProp(ANpull) ? options->getPropBool(ANpull) : false;
bool pushRequested = options->hasProp(ANpush) ? options->getPropBool(ANpush) : false;
if (!targetSupportsConcurrentWrite)
{
StringBuffer s;
targets.item(0).queryIP().getHostText(s);
LOG(MCdebugInfo, "Use push operation because %s doesn't appear to have an ftslave", s.str());
return false;
// always pull
if (pushRequested)
IWARNLOG("Ignoring push option as targets < sources and target does not support concurrent write");
return true; // could be push if equal # of soruces and targets but no point
}

//Use push if going to a single node.
if ((targets.ordinality() == 1) && (sources.ordinality() > 1))
else
{
LOG(MCdebugInfo, "Use push operation because going to a single node from many");
return false;
}
bool wantPull = false;
bool wantPush = false;
if (targets.ordinality() < sources.ordinality()) // implying multiple writes to same target, force pull, and will common up on matching target filenames
wantPull = true;
else if (targets.ordinality() > sources.ordinality()) // targets > sources. i.e. multiple splits of source files for each target
wantPush = true;

LOG(MCdebugInfo, "Use pull operation as default");
return true;
if (wantPull && pushRequested)
{
IWARNLOG("Wanted to pull since targets < sources, but push option takes precedence");
return false; // push
}
else if (wantPush && pullRequested)
{
IWARNLOG("Wanted to push since targets > sources, but pull option takes precedence");
return true; // pull
}

if (wantPush || pushRequested)
return false; // push

return true; // default pull
}
}


Expand Down
2 changes: 2 additions & 0 deletions dali/ft/filecopy.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,8 @@ protected:
Linked<IDistributedFile> distributedTarget;
Linked<IDistributedFile> distributedSource;
TargetLocationArray targets;
StringBuffer targetPlane;
bool targetSupportsConcurrentWrite = true; // if false, will prevent multiple writers to same target file (e.g. not supported by Azure Blob storage)
FileFormat srcFormat;
FileFormat tgtFormat;
Owned<IDFPartFilter> filter;
Expand Down
Loading

0 comments on commit c4739fc

Please sign in to comment.