diff --git a/thorlcr/activities/hashdistrib/thhashdistribslave.cpp b/thorlcr/activities/hashdistrib/thhashdistribslave.cpp index da214c0f16a..f34d124b803 100644 --- a/thorlcr/activities/hashdistrib/thhashdistribslave.cpp +++ b/thorlcr/activities/hashdistrib/thhashdistribslave.cpp @@ -1072,6 +1072,8 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl StringAttr id; // for tracing ICompressHandler *compressHandler; StringBuffer compressOptions; + LookAheadOptions options; + bool newLookAhead = false; public: IMPLEMENT_IINTERFACE_USING(CInterface); @@ -1126,6 +1128,10 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl ::ActPrintLog(activity, thorDetailedLogLevel, "inputBufferSize : %d, bucketSendSize = %d, pullBufferSize=%d", inputBufferSize, bucketSendSize, pullBufferSize); targetWriterLimit = activity->getOptUInt(THOROPT_HDIST_TARGETWRITELIMIT); ::ActPrintLog(activity, thorDetailedLogLevel, "targetWriterLimit : %d", targetWriterLimit); + + newLookAhead = activity->getOptBool("newlookahead", false); + if (newLookAhead) + populateLookAheadOptions(*activity, options); } virtual void beforeDispose() @@ -1187,7 +1193,14 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl { StringBuffer temp; GetTempFilePath(temp,"hddrecvbuff"); - piperd.setown(createSmartBuffer(activity, temp.str(), pullBufferSize, rowIf)); + if (newLookAhead) + { + options.totalCompressionBufferSize = pullBufferSize; // hd option overrides defaults + ICompressHandler *compressHandler = pullBufferSize ? queryDefaultCompressHandler() : nullptr; + piperd.setown(createCompressedSpillingRowStream(activity, temp.str(), false, rowIf, options, compressHandler)); + } + else + piperd.setown(createSmartBuffer(activity, temp.str(), pullBufferSize, rowIf)); } else piperd.setown(createSmartInMemoryBuffer(activity, rowIf, pullBufferSize)); diff --git a/thorlcr/activities/thactivityutil.cpp b/thorlcr/activities/thactivityutil.cpp index fe9a960bb74..2beb728f227 100644 --- a/thorlcr/activities/thactivityutil.cpp +++ b/thorlcr/activities/thactivityutil.cpp @@ -224,21 +224,7 @@ class CRowStreamLookAhead : public CSimpleInterfaceOf allowspill = true; } - // for "newlookahead" only - if (isContainerized()) - { - // JCSMORE - add CJobBase::getTempBlockSize() to calc. once. - StringBuffer planeName; - if (!getDefaultPlane(planeName, "@tempPlane", "temp")) - getDefaultPlane(planeName, "@spillPlane", "spill"); - size32_t blockedSequentialIOSize = getPlaneAttributeValue(planeName, BlockedSequentialIO, (size32_t)-1); - if ((size32_t)-1 != blockedSequentialIOSize) - options.storageBlockSize = blockedSequentialIOSize; - } - options.totalCompressionBufferSize = activity.getOptInt(THOROPT_LOOKAHEAD_COMPRESSIONTOTALK, options.totalCompressionBufferSize / 1024) * 1024; - options.inMemMaxMem = activity.getOptInt(THOROPT_LOOKAHEAD_MAXROWMEMK, options.inMemMaxMem / 1024) * 1024; - options.writeAheadSize = activity.getOptInt64(THOROPT_LOOKAHEAD_WRITEAHEADK, options.writeAheadSize / 1024) * 1024; - options.tempFileGranularity = activity.getOptInt64(THOROPT_LOOKAHEAD_TEMPFILE_GRANULARITY, options.tempFileGranularity / 0x100000) * 0x100000; + populateLookAheadOptions(activity, options); } ~CRowStreamLookAhead() { diff --git a/thorlcr/thorutil/thbuf.cpp b/thorlcr/thorutil/thbuf.cpp index 2e45c7c78d8..7c5d533fa77 100644 --- a/thorlcr/thorutil/thbuf.cpp +++ b/thorlcr/thorutil/thbuf.cpp @@ -2957,3 +2957,19 @@ class CRCFileStream: public CSimpleInterface, implements IFileIOStream } }; +void populateLookAheadOptions(CActivityBase &activity, LookAheadOptions &options) +{ + if (isContainerized()) + { + StringBuffer planeName; + if (!getDefaultPlane(planeName, "@tempPlane", "temp")) + getDefaultPlane(planeName, "@spillPlane", "spill"); + size32_t blockedSequentialIOSize = getPlaneAttributeValue(planeName, BlockedSequentialIO, (size32_t)-1); + if ((size32_t)-1 != blockedSequentialIOSize) + options.storageBlockSize = blockedSequentialIOSize; + } + options.totalCompressionBufferSize = activity.getOptInt(THOROPT_LOOKAHEAD_COMPRESSIONTOTALK, options.totalCompressionBufferSize / 1024) * 1024; + options.inMemMaxMem = activity.getOptInt(THOROPT_LOOKAHEAD_MAXROWMEMK, options.inMemMaxMem / 1024) * 1024; + options.writeAheadSize = activity.getOptInt64(THOROPT_LOOKAHEAD_WRITEAHEADK, options.writeAheadSize / 1024) * 1024; + options.tempFileGranularity = activity.getOptInt64(THOROPT_LOOKAHEAD_TEMPFILE_GRANULARITY, options.tempFileGranularity / 0x100000) * 0x100000; +} diff --git a/thorlcr/thorutil/thbuf.hpp b/thorlcr/thorutil/thbuf.hpp index fb5a66af8fa..69a28f04824 100644 --- a/thorlcr/thorutil/thbuf.hpp +++ b/thorlcr/thorutil/thbuf.hpp @@ -132,5 +132,6 @@ interface IRowMultiWriterReader : extends IRowStream #define DEFAULT_WR_WRITE_GRANULARITY 1000 // Amount writers buffer up before committing to output extern graph_decl IRowMultiWriterReader *createSharedWriteBuffer(CActivityBase *activity, IThorRowInterfaces *rowif, unsigned limit, unsigned readGranularity=DEFAULT_WR_READ_GRANULARITY, unsigned writeGranularity=DEFAULT_WR_WRITE_GRANULARITY); +extern graph_decl void populateLookAheadOptions(CActivityBase &activity, LookAheadOptions &options); #endif