Skip to content

Commit

Permalink
Use LZ4HC rather than LZW to compress Roxie continuation data
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
  • Loading branch information
richardkchapman committed Nov 1, 2023
1 parent 910338f commit 4f2cb57
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 8 deletions.
13 changes: 9 additions & 4 deletions roxie/ccd/ccdactivities.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include "thorcommon.ipp"
#include "thorstrand.hpp"
#include "jstats.h"
#include "jlz4.hpp"

using roxiemem::OwnedRoxieRow;
using roxiemem::OwnedConstRoxieRow;
Expand Down Expand Up @@ -2707,7 +2708,8 @@ class CRoxieIndexActivity : public CRoxieKeyedActivity
compressed.append(siLen); // Leaving space to patch when size known
compressed.append(true);
compressed.append(lastRowCompleteMatch); // This field is not compressed - see above!
compressToBuffer(compressed, si.length() - compressed.length(), si.toByteArray() + compressed.length());
Owned<ICompressor> compressor = createLZ4Compressor("hclevel=3", true);
compressToBuffer(compressed, si.length() - compressed.length(), si.toByteArray() + compressed.length(), compressor);
bool report = logctx.queryTraceLevel() && (doTrace(traceRoxiePackets) || si.length() >= continuationWarnThreshold);
if (report)
logctx.CTXLOG("ERROR: continuation data size %u for %d cursor positions is large - compressed to %u", si.length(), tlk->numActiveKeys(), compressed.length());
Expand All @@ -2733,7 +2735,8 @@ class CRoxieIndexActivity : public CRoxieKeyedActivity
if (isCompressed)
{
MemoryBuffer decompressed;
decompressToBuffer(decompressed, resentInfo);
Owned<IExpander> expander = createLZ4Expander();
decompressToBuffer(decompressed, resentInfo, expander);
if (doTrace(traceRoxiePackets))
logctx.CTXLOG("readContinuationInfo: decompressed from %u to %u", resentInfo.length(), decompressed.length());
resentInfo.swapWith(decompressed);
Expand Down Expand Up @@ -4120,7 +4123,8 @@ class CRoxieKeyedJoinIndexActivity : public CRoxieKeyedActivity
MemoryBuffer compressed;
compressed.append(siLen); // Leaving space to patch when size known
compressed.append(true);
compressToBuffer(compressed, si.length() - compressed.length(), si.toByteArray() + compressed.length());
Owned<ICompressor> compressor = createLZ4Compressor("hclevel=3", true);
compressToBuffer(compressed, si.length() - compressed.length(), si.toByteArray() + compressed.length(), compressor);
bool report = logctx.queryTraceLevel() && (doTrace(traceRoxiePackets) || si.length() >= continuationWarnThreshold);
if (report)
DBGLOG("ERROR: continuation data size %u for %d cursor positions is large - compressed to %u", si.length(), tlk->numActiveKeys(), compressed.length());
Expand All @@ -4145,7 +4149,8 @@ class CRoxieKeyedJoinIndexActivity : public CRoxieKeyedActivity
if (isCompressed)
{
MemoryBuffer decompressed;
decompressToBuffer(decompressed, resentInfo);
Owned<IExpander> expander = createLZ4Expander();
decompressToBuffer(decompressed, resentInfo, expander);
if (doTrace(traceRoxiePackets))
DBGLOG("readContinuationInfo: decompressed from %u to %u", resentInfo.length(), decompressed.length());
resentInfo.swapWith(decompressed);
Expand Down
37 changes: 33 additions & 4 deletions system/jlib/jlzw.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -765,7 +765,7 @@ void appendToBuffer(MemoryBuffer & out, size32_t len, const void * src)
out.append(len, src);
}

void compressToBuffer(MemoryBuffer & out, size32_t len, const void * src)
void compressToBuffer(MemoryBuffer & out, size32_t len, const void * src, ICompressor *compressor)
{
unsigned originalLength = out.length();
out.append(true);
Expand All @@ -774,7 +774,6 @@ void compressToBuffer(MemoryBuffer & out, size32_t len, const void * src)
if (len >= 32)
{
size32_t newSize = len * 4 / 5; // Copy if compresses less than 80% ...
Owned<ICompressor> compressor = createLZWCompressor();
void *newData = out.reserve(newSize);
compressor->open(newData, newSize);
if (compressor->write(src, len)==len)
Expand All @@ -793,15 +792,34 @@ void compressToBuffer(MemoryBuffer & out, size32_t len, const void * src)
appendToBuffer(out, len, src);
}

void decompressToBuffer(MemoryBuffer & out, const void * src)
// Compress len bytes at src into out using the default LZW compressor.
// Inputs shorter than 32 bytes skip compression entirely, so no compressor is created for them.
// NOTE(review): appendToBuffer() is only partly visible from here - if it already writes the
// "not compressed" flag itself, the short-input path below emits that flag twice; please confirm.
void compressToBuffer(MemoryBuffer & out, size32_t len, const void * src)
{
    if (len >= 32)
    {
        Owned<ICompressor> lzw = createLZWCompressor();
        compressToBuffer(out, len, src, lzw);
        return;
    }

    // Too small to be worth compressing - flag as uncompressed and append as-is
    out.append(false);
    appendToBuffer(out, len, src);
}

// Expand the compressed block starting at src and append the expanded bytes to out,
// using the expander supplied by the caller (which must match the compressor used to
// create the block).
//   out      - buffer that receives the expanded data (appended at its current end)
//   src      - start of the compressed block, as understood by expander->init()
//   expander - expander to use; it is not redeclared locally, so the caller's choice
//              (e.g. LZ4 rather than LZW) is the one that is actually applied
void decompressToBuffer(MemoryBuffer & out, const void * src, IExpander *expander)
{
    // init() returns the number of bytes to reserve for the expanded data
    unsigned outSize = expander->init(src);
    void * buff = out.reserve(outSize);
    expander->expand(buff);
}


// Convenience overload: expand the compressed block at src into out using an LZW expander.
void decompressToBuffer(MemoryBuffer & out, const void * src)
{
    Owned<IExpander> lzwExpander = createLZWExpander();
    decompressToBuffer(out, src, lzwExpander);
}

void decompressToBuffer(MemoryBuffer & out, MemoryBuffer & in)
{
bool compressed;
Expand All @@ -813,6 +831,17 @@ void decompressToBuffer(MemoryBuffer & out, MemoryBuffer & in)
out.append(srcLen, in.readDirect(srcLen));
}

// Read one block from in - a compressed flag, then a payload length, then the payload -
// and append its contents to out. A payload flagged as compressed is expanded with the
// supplied expander; otherwise the payload bytes are copied across unchanged.
void decompressToBuffer(MemoryBuffer & out, MemoryBuffer & in, IExpander *expander)
{
    bool isCompressed;
    size32_t payloadLen;
    in.read(isCompressed);
    in.read(payloadLen);

    // Both paths consume exactly payloadLen bytes from the input
    const void * payload = in.readDirect(payloadLen);
    if (!isCompressed)
    {
        out.append(payloadLen, payload);
        return;
    }
    decompressToBuffer(out, payload, expander);
}

void decompressToAttr(MemoryAttr & out, const void * src)
{
Owned<IExpander> expander = createLZWExpander();
Expand Down
2 changes: 2 additions & 0 deletions system/jlib/jlzw.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,10 @@ extern jlib_decl IRandRowExpander *createRandRDiffExpander(); // NB only support


//Some helper functions to make it easy to compress/decompress to memorybuffers.
// Compress len bytes at src into out using the supplied compressor. Inputs shorter than
// 32 bytes are not passed to the compressor.
extern jlib_decl void compressToBuffer(MemoryBuffer & out, size32_t len, const void * src, ICompressor *compressor);
// As above, but using an LZW compressor.
extern jlib_decl void compressToBuffer(MemoryBuffer & out, size32_t len, const void * src);
// Expand the compressed block at src into out using an LZW expander.
extern jlib_decl void decompressToBuffer(MemoryBuffer & out, const void * src);
// Read a compressed flag and a length from in, then either expand the following data into out
// with the supplied expander or, if the flag is not set, copy it unchanged.
extern jlib_decl void decompressToBuffer(MemoryBuffer & out, MemoryBuffer & in, IExpander *expander);
// As above, but without a caller-supplied expander.
extern jlib_decl void decompressToBuffer(MemoryBuffer & out, MemoryBuffer & in);
// Expand the compressed block at src into a MemoryAttr using an LZW expander.
extern jlib_decl void decompressToAttr(MemoryAttr & out, const void * src);
extern jlib_decl void decompressToBuffer(MemoryAttr & out, MemoryBuffer & in);
Expand Down
42 changes: 42 additions & 0 deletions testing/unittests/unittests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1052,5 +1052,47 @@ class RelaxedAtomicTimingTest : public CppUnit::TestFixture

CPPUNIT_TEST_SUITE_REGISTRATION( RelaxedAtomicTimingTest );
CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( RelaxedAtomicTimingTest, "RelaxedAtomicTimingTest" );
#include "jlzw.hpp"
class compressToBufferTest : public CppUnit::TestFixture
{
CPPUNIT_TEST_SUITE( compressToBufferTest );
CPPUNIT_TEST(testRun);
CPPUNIT_TEST_SUITE_END();

void testRun()
{
MemoryBuffer x;
compressToBuffer(x, 251,
"HelloHelloHelloHelloHelloHelloHelloHelloHelloHello"
"HelloHelloHelloHelloHelloHelloHelloHelloHelloHello"
"HelloHelloHelloHelloHelloHelloHelloHelloHelloHello"
"HelloHelloHelloHelloHelloHelloHelloHelloHelloHello"
"HelloHelloHelloHelloHelloHelloHelloHelloHelloHello"
);
for (unsigned i = 0; i < x.length(); i++)
printf("%02x ", x.toByteArray()[i]);
printf("\n");
* (byte *) x.toByteArray() = 2;
for (unsigned i = 0; i < x.length(); i++)
printf("%02x ", x.toByteArray()[i]);
printf("\n");
try
{
MemoryBuffer out;
decompressToBuffer(out, x);
printf("%s\n", out.toByteArray());
}
catch(IException *E)
{
StringBuffer s;
printf("Exception %s\n", E->errorMessage(s).str());
::Release(E);
}
}
};

CPPUNIT_TEST_SUITE_REGISTRATION( compressToBufferTest );
CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( compressToBufferTest, "CompressToBufferTest" );


#endif // _USE_CPPUNIT

0 comments on commit 4f2cb57

Please sign in to comment.