Skip to content

Commit

Permalink
HPCC-29917 Refactor compressToBuffer to support different compression…
Browse files Browse the repository at this point in the history
… methods

Add some tests, and fix some issues that the tests showed up

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
  • Loading branch information
richardkchapman committed Nov 15, 2023
1 parent 588f5a8 commit c25ff6e
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 62 deletions.
8 changes: 4 additions & 4 deletions system/jlib/jfcmp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ class jlib_decl CFcmpCompressor : public CSimpleInterfaceOf<ICompressor>

virtual void open(void *buf,size32_t max) override
{
if (max<1024)
throw MakeStringException(-1,"CFcmpCompressor::open - block size (%d) not large enough", max);
// if (max<1024)
// throw MakeStringException(-1,"CFcmpCompressor::open - block size (%d) not large enough", max);
wrmax = max;
originalMax = max;
if (buf)
Expand Down Expand Up @@ -103,8 +103,8 @@ class jlib_decl CFcmpCompressor : public CSimpleInterfaceOf<ICompressor>
{
if (!initialSize)
initialSize = FCMP_BUFFER_SIZE; // 1MB
if (initialSize<1024)
throw MakeStringException(-1,"CFcmpCompressor::open - block size (%d) not large enough", initialSize);
//if (initialSize<1024)
// throw MakeStringException(-1,"CFcmpCompressor::open - block size (%d) not large enough", initialSize);
wrmax = initialSize;
if (bufalloc)
{
Expand Down
11 changes: 5 additions & 6 deletions system/jlib/jlz4.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,17 @@ class CLZ4Compressor final : public CFcmpCompressor
protected:
virtual void setinmax() override
{
inmax = blksz-outlen-sizeof(size32_t);
if (inmax<256)
if (blksz <= outlen+sizeof(size32_t))
trailing = true; // too small to bother compressing
else
{
trailing = false;
inmax = blksz-outlen-sizeof(size32_t);
size32_t slack = LZ4_COMPRESSBOUND(inmax) - inmax;
int inmax2 = inmax - (slack + sizeof(size32_t));
if (inmax2<256)
if (inmax <= (slack + sizeof(size32_t)))
trailing = true;
else
inmax = inmax2;
inmax = inmax - (slack + sizeof(size32_t));
}
}

Expand Down Expand Up @@ -73,7 +72,7 @@ class CLZ4Compressor final : public CFcmpCompressor
if (toflush == 0)
return;

if (toflush < 256)
if (false && toflush < 256)
{
trailing = true;
return;
Expand Down
109 changes: 80 additions & 29 deletions system/jlib/jlzw.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -760,33 +760,42 @@ size32_t RLEExpand(void *dst,const void *src,size32_t expsize)

void compressToBuffer(MemoryBuffer & out, size32_t len, const void * src, CompressionMethod method, const char *options)
{
if (method != COMPRESS_METHOD_NONE)
if (method != COMPRESS_METHOD_NONE && len >= 32)
{
ICompressHandler *handler = queryCompressHandler(method);
if (!handler)
{
VStringBuffer s("Unknown compression method %x requested in compressToBuffer", (byte) method);
throw makeStringException(0, s.str());
}
unsigned originalLength = out.length();
// For back-compatibility, we always store COMPRESS_METHOD_LZW as 1 as earlier versions stored a boolean here
// rather than an enum
// This means that compressToBuffer/decompressToBuffer cannot bs used for rowdiff compression - this is not likely to be an issue
// Alternative would be a separate enum for compressToBuffer formats, but that seems more likely to cause confusion
out.append((byte) (method == COMPRESS_METHOD_LZW ? COMPRESS_METHOD_LZWLEGACY : method));
out.append((size32_t)0);
if (len >= 32)
size32_t newSize = len * 4 / 5; // Copy if compresses less than 80% ...
Owned<ICompressor> compressor = handler->getCompressor(options);
void *newData = out.reserve(newSize);
try
{
size32_t newSize = len * 4 / 5; // Copy if compresses less than 80% ...
Owned<ICompressor> compressor = queryCompressHandler(method)->getCompressor(options);
void *newData = out.reserve(newSize);
compressor->open(newData, newSize);
if (compressor->write(src, len)==len)
{
compressor->close();
size32_t compressedLen = compressor->buflen();
out.setWritePos(originalLength + sizeof(bool));
out.setWritePos(originalLength + sizeof(byte));
out.append(compressedLen);
out.setWritePos(originalLength + sizeof(bool) + sizeof(size32_t) + compressedLen);
out.setWritePos(originalLength + sizeof(byte) + sizeof(size32_t) + compressedLen);
return;
}
}

// all or don't compress
catch (IException *E)
{
E->Release();
}
// failed to compress...
out.setWritePos(originalLength);
}
out.append((byte) COMPRESS_METHOD_NONE);
Expand All @@ -806,7 +815,13 @@ void decompressToBuffer(MemoryBuffer & out, MemoryBuffer & in, const char *optio
{
if (method==COMPRESS_METHOD_LZWLEGACY)
method = COMPRESS_METHOD_LZW; // Back compatibilty
Owned<IExpander> expander = queryCompressHandler(method)->getExpander(options);
ICompressHandler *handler = queryCompressHandler(method);
if (!handler)
{
VStringBuffer s("Unknown decompression method %x required in decompressToBuffer", (byte) method);
throw makeStringException(0, s.str());
}
Owned<IExpander> expander = handler->getExpander(options);
unsigned outSize = expander->init(in.readDirect(srcLen));
void * buff = out.reserve(outSize);
expander->expand(buff);
Expand Down Expand Up @@ -2651,6 +2666,8 @@ class CAESCompressor : implements ICompressor, public CInterface
else
outbuf = outattr.allocate(blksize);
outBufMb = NULL;
if (blksize <= AES_PADDING_SIZE+sizeof(size32_t))
throw makeStringException(0, "CAESCompressor: target buffer too small");
size32_t subsz = blksize-AES_PADDING_SIZE-sizeof(size32_t);
comp->open(compattr.reserveTruncate(subsz),subsz);
}
Expand Down Expand Up @@ -2841,54 +2858,88 @@ IPropertyTree *getBlockedFileDetails(IFile *file)
return NULL;
}

class CCompressHandlerArray : public IArrayOf<ICompressHandler>
class CCompressHandlerArray
{
IArrayOf<ICompressHandler> registered; // Owns the relevant handler objects
ICompressHandler *byMethod[COMPRESS_METHOD_LAST] = { nullptr };
ICompressHandler *AESbyMethod[COMPRESS_METHOD_LAST] = { nullptr };

public:
ICompressHandler *lookup(const char *type) const
{
ForEachItemIn(h, *this)
ForEachItemIn(h, registered)
{
ICompressHandler &handler = item(h);
ICompressHandler &handler = registered.item(h);
if (0 == stricmp(type, handler.queryType()))
return &handler;
}
return NULL;
}
ICompressHandler *lookup(CompressionMethod method) const
{
// MORE - should probably use an array, cache last lookup, or something...
// This is called quite a lot now
ForEachItemIn(h, *this)
if ((method & ~COMPRESS_METHOD_AES) >= COMPRESS_METHOD_LAST)
return nullptr;
else if (method & COMPRESS_METHOD_AES)
return AESbyMethod[method & ~COMPRESS_METHOD_AES];
else
return byMethod[method];
}
ICompressHandlerIterator *getIterator()
{
return new ArrayIIteratorOf<IArrayOf<ICompressHandler>, ICompressHandler, ICompressHandlerIterator>(registered);
}
bool addCompressor(ICompressHandler *handler)
{
CompressionMethod method = handler->queryMethod();
if (lookup(method))
{
ICompressHandler &handler = item(h);
if (method == handler.queryMethod())
return &handler;
handler->Release();
return false; // already registered
}
return NULL;
registered.append(* handler);
if ((method & ~COMPRESS_METHOD_AES) < COMPRESS_METHOD_LAST)
{
if (method & COMPRESS_METHOD_AES)
AESbyMethod[method & ~COMPRESS_METHOD_AES] = handler;
else
byMethod[method] = handler;
}
return true;
}
bool removeCompressor(ICompressHandler *handler)
{
CompressionMethod method = handler->queryMethod();
if (registered.zap(* handler))
{
if ((method & ~COMPRESS_METHOD_AES) < COMPRESS_METHOD_LAST)
{
if (method & COMPRESS_METHOD_AES)
AESbyMethod[method & ~COMPRESS_METHOD_AES] = handler;
else
byMethod[method] = handler;
}
return true;
}
else
return false;
}
} compressors;

typedef IIteratorOf<ICompressHandler> ICompressHandlerIterator;

ICompressHandlerIterator *getCompressHandlerIterator()
{
return new ArrayIIteratorOf<IArrayOf<ICompressHandler>, ICompressHandler, ICompressHandlerIterator>(compressors);
return compressors.getIterator();
}

bool addCompressorHandler(ICompressHandler *handler)
{
if (compressors.lookup(handler->queryMethod()))
{
handler->Release();
return false; // already registered
}
compressors.append(* handler);
return true;
return compressors.addCompressor(handler);
}

bool removeCompressorHandler(ICompressHandler *handler)
{
return compressors.zap(* handler);
return compressors.removeCompressor(handler);
}

Linked<ICompressHandler> defaultCompressor;
Expand Down
1 change: 1 addition & 0 deletions system/jlib/jlzw.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ enum CompressionMethod
COMPRESS_METHOD_LZ4,
COMPRESS_METHOD_LZ4HC,
COMPRESS_METHOD_RANDROW,
COMPRESS_METHOD_LAST,


COMPRESS_METHOD_AES = 0x80,
Expand Down
78 changes: 55 additions & 23 deletions testing/unittests/unittests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1059,35 +1059,67 @@ class compressToBufferTest : public CppUnit::TestFixture
CPPUNIT_TEST(testRun);
CPPUNIT_TEST_SUITE_END();

void testRun()
void testOne(unsigned len, CompressionMethod method, const char *options=nullptr)
{
MemoryBuffer x;
compressToBuffer(x, 251,
constexpr const char *in =
"HelloHelloHelloHelloHelloHelloHelloHelloHelloHello"
"HelloHelloHelloHelloHelloHelloHelloHelloHelloHello"
"HelloHelloHelloHelloHelloHelloHelloHelloHelloHello"
"HelloHelloHelloHelloHelloHelloHelloHelloHelloHello"
"HelloHelloHelloHelloHelloHelloHelloHelloHelloHello"
);
for (unsigned i = 0; i < x.length(); i++)
printf("%02x ", x.toByteArray()[i]);
printf("\n");
* (byte *) x.toByteArray() = 2;
for (unsigned i = 0; i < x.length(); i++)
printf("%02x ", x.toByteArray()[i]);
printf("\n");
try
{
MemoryBuffer out;
decompressToBuffer(out, x);
printf("%s\n", out.toByteArray());
}
catch(IException *E)
{
StringBuffer s;
printf("Exception %s\n", E->errorMessage(s).str());
::Release(E);
}
"HelloHelloHelloHelloHelloHelloHelloHelloHelloHello"
"HelloHelloHelloHelloHelloHelloHelloHelloHelloHello"
"HelloHelloHelloHelloHelloHelloHelloHelloHelloHello"
"HelloHelloHelloHelloHelloHelloHelloHelloHelloHello"
"HelloHelloHelloHelloHelloHelloHelloHelloHelloHello"
"HelloHelloHelloHelloHelloHelloHelloHelloHelloHello"
"HelloHelloHelloHelloHelloHelloHelloHelloHelloHello"
"HelloHelloHelloHelloHelloHelloHelloHelloHelloHello"
"HelloHelloHelloHelloHelloHelloHelloHelloHelloHello"
"HelloHelloHelloHelloHelloHelloHelloHelloHelloHello"
"HelloHelloHelloHelloHelloHelloHelloHelloHelloHello"
"HelloHelloHelloHelloHelloHelloHelloHelloHelloHello"
"HelloHelloHelloHelloHelloHelloHelloHelloHelloHello"
"HelloHelloHelloHelloHelloHelloHelloHelloHelloHello"
"HelloHelloHelloHelloHelloHelloHelloHelloHelloHello"
"HelloHelloHelloHelloHelloHelloHelloHelloHelloHello"
"HelloHelloHelloHelloHelloHelloHelloHelloHelloHello"
"HelloHelloHelloHelloHelloHelloHelloHelloHelloHello"
"HelloHelloHelloHelloHelloHelloHelloHelloHelloHello"
"HelloHelloHelloHelloHelloHelloHelloHelloHelloHello"
"HelloHelloHelloHelloHelloHelloHelloHelloHelloHello"
"HelloHelloHelloHelloHelloHelloHelloHelloHelloHello";
assertex(len <= strlen(in));
MemoryBuffer compressed;
compressToBuffer(compressed, len, in, method, options);

if (method != COMPRESS_METHOD_NONE && len >= 32 && compressed.length() == len+5)
DBGLOG("compressToBuffer %x size %u did not compress", (byte) method, len);
else
DBGLOG("compressToBuffer %x size %u compressed to %u", (byte) method, len, compressed.length());
CPPUNIT_ASSERT(compressed.length() <= len+5);
MemoryBuffer out;
decompressToBuffer(out, compressed, options);
CPPUNIT_ASSERT(out.length() == len);
CPPUNIT_ASSERT(memcmp(out.bytes(), in, len) == 0);
}

void testSome(unsigned len)
{
testOne(len, COMPRESS_METHOD_NONE);
testOne(len, COMPRESS_METHOD_LZW);
testOne(len, COMPRESS_METHOD_LZ4);
testOne(len, (CompressionMethod) (COMPRESS_METHOD_LZW|COMPRESS_METHOD_AES), "0123456789abcdef");
}

void testRun()
{
testSome(0);
testSome(1);
testSome(16);
testSome(32);
testSome(200);
testSome(1000);
}
};

Expand Down

0 comments on commit c25ff6e

Please sign in to comment.