Skip to content

Commit

Permalink
HPCC-32763 Prevent crash when reporting a disk full error writing to …
Browse files Browse the repository at this point in the history
…a compressed file

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
  • Loading branch information
ghalliday committed Oct 3, 2024
1 parent 50ed84f commit 231688e
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 75 deletions.
7 changes: 7 additions & 0 deletions system/jlib/jfcmp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,13 @@ class jlib_decl CFcmpCompressor : public CSimpleInterfaceOf<ICompressor>

virtual void close() override
{
//Protect against close() being called more than once on a compressor
if (!inbuf)
{
if (isDebugBuild())
throwUnexpectedX("CFCmpCompressor::close() called more than once");
return;
}
if (inlenblk!=COMMITTED)
{
inlen = inlenblk; // transaction failed
Expand Down
3 changes: 3 additions & 0 deletions system/jlib/jlzw.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2773,6 +2773,9 @@ class CAESCompressor : implements ICompressor, public CInterface

virtual void close() override
{
if (outmax == 0)
return;

comp->close();
// now encrypt
MemoryBuffer buf;
Expand Down
230 changes: 155 additions & 75 deletions testing/unittests/jlibtests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3463,92 +3463,44 @@ class JlibIOTest : public CppUnit::TestFixture
CPPUNIT_TEST_SUITE_REGISTRATION(JlibIOTest);
CPPUNIT_TEST_SUITE_NAMED_REGISTRATION(JlibIOTest, "JlibIOTest");

//--------------------------------------------------------------------------------------------------------------------

class JlibCompressionTestsStress : public CppUnit::TestFixture
class JlibCompressionTestBase : public CppUnit::TestFixture
{
CPPUNIT_TEST_SUITE(JlibCompressionTestsStress);
CPPUNIT_TEST(test);
CPPUNIT_TEST_SUITE_END();

protected:
static constexpr size32_t sz = 100*0x100000; // 100MB
static constexpr const char *aesKey = "012345678901234567890123";
enum CompressOpt { RowCompress, AllRowCompress, BlockCompress, CompressToBuffer };
public:
void test()
void initCompressionBuffer(MemoryBuffer & src, size32_t & rowSz)
{
try
{
MemoryBuffer src;
src.ensureCapacity(sz);
const char *aesKey = "012345678901234567890123";
Owned<ICompressHandlerIterator> iter = getCompressHandlerIterator();
src.ensureCapacity(sz);

StringBuffer tmp;
unsigned card = 0;
size32_t rowSz = 0;
while (true)
StringBuffer tmp;
unsigned card = 0;
while (true)
{
size32_t cLen = src.length();
if (cLen > sz)
break;
src.append(cLen);
tmp.clear().appendf("%10u", cLen);
src.append(tmp.length(), tmp.str());
src.append(++card % 52);
src.append(crc32((const char *)&cLen, sizeof(cLen), 0));
unsigned ccrc = crc32((const char *)&card, sizeof(card), 0);
tmp.clear().appendf("%10u", ccrc);
src.append(tmp.length(), tmp.str());
tmp.clear().appendf("%20u", (++card % 10));
src.append(tmp.length(), tmp.str());
if (0 == rowSz)
rowSz = src.length();
else
{
size32_t cLen = src.length();
if (cLen > sz)
break;
src.append(cLen);
tmp.clear().appendf("%10u", cLen);
src.append(tmp.length(), tmp.str());
src.append(++card % 52);
src.append(crc32((const char *)&cLen, sizeof(cLen), 0));
unsigned ccrc = crc32((const char *)&card, sizeof(card), 0);
tmp.clear().appendf("%10u", ccrc);
src.append(tmp.length(), tmp.str());
tmp.clear().appendf("%20u", (++card % 10));
src.append(tmp.length(), tmp.str());
if (0 == rowSz)
rowSz = src.length();
else
{
dbgassertex(0 == (src.length() % rowSz));
}
dbgassertex(0 == (src.length() % rowSz));
}

DBGLOG("Algorithm(options) || Comp(ms) || Deco(ms) || 200MB/s (w,r) || 1GB/s (w,r) || 5GB/s (w,r) || Ratio [cLen]");
DBGLOG(" || || || 2Gb/s || 10Gb/s || 50Gb/s ||");

unsigned time200MBs = transferTimeMs(sz, 200000000);
unsigned time1GBs = transferTimeMs(sz, 1000000000);
unsigned time5GBs = transferTimeMs(sz, 5000000000);
DBGLOG("%19s || %8u || %8u || %4u(%4u,%4u) || %4u(%4u,%4u) || %4u(%4u,%4u) || %5.2f [%u]", "uncompressed", 0, 0,
time200MBs, time200MBs, time200MBs, time1GBs, time1GBs, time1GBs, time5GBs, time5GBs, time5GBs, 1.0, sz);
ForEach(*iter)
{
ICompressHandler &handler = iter->query();
const char * type = handler.queryType();
//Ignore unusual compressors with no expanders...
if (strieq(type, "randrow"))
continue;
const char * options = streq("AES", handler.queryType()) ? aesKey: "";
if (streq(type, "LZ4HC"))
{
testCompressor(handler, "hclevel=3", rowSz, src.length(), src.bytes(), RowCompress);
testCompressor(handler, "hclevel=4", rowSz, src.length(), src.bytes(), RowCompress);
testCompressor(handler, "hclevel=5", rowSz, src.length(), src.bytes(), RowCompress);
testCompressor(handler, "hclevel=6", rowSz, src.length(), src.bytes(), RowCompress);
testCompressor(handler, "hclevel=8", rowSz, src.length(), src.bytes(), RowCompress);
testCompressor(handler, "hclevel=10", rowSz, src.length(), src.bytes(), RowCompress);
}
testCompressor(handler, options, rowSz, src.length(), src.bytes(), RowCompress);
testCompressor(handler, options, rowSz, src.length(), src.bytes(), CompressToBuffer);
if (streq(type, "LZ4"))
{
testCompressor(handler, "allrow", rowSz, src.length(), src.bytes(), AllRowCompress); // block doesn't affect the compressor, just tracing
testCompressor(handler, "block", rowSz, src.length(), src.bytes(), BlockCompress); // block doesn't affect the compressor, just tracing
}
}
}
catch (IException *e)
{
EXCLOG(e, nullptr);
throw;
}
}

unsigned transferTimeMs(__int64 size, __int64 bytesPerSecond)
{
return (unsigned)((size * 1000) / bytesPerSecond);
Expand All @@ -3575,6 +3527,18 @@ class JlibCompressionTestsStress : public CppUnit::TestFixture
}
compressor->commitblock();
compressor->close();
try
{
//Test that calling close again doesn't cause any problems
compressor->close();
}
catch (IException * e)
{
//In debug mode it is ok to throw an unexpected error, in release it should be ignored.
if (!isDebugBuild() || e->errorCode() != 9999)
CPPUNIT_FAIL("Unexpected exception from second call to compressor->close()");
e->Release();
}
break;
}
case AllRowCompress:
Expand Down Expand Up @@ -3649,9 +3613,125 @@ class JlibCompressionTestsStress : public CppUnit::TestFixture
}
};


class JlibCompressionStandardTest : public JlibCompressionTestBase
{
CPPUNIT_TEST_SUITE(JlibCompressionStandardTest);
CPPUNIT_TEST(test);
CPPUNIT_TEST_SUITE_END();

public:
void test()
{
try
{
MemoryBuffer src;
size32_t rowSz = 0;
initCompressionBuffer(src, rowSz);


DBGLOG("Algorithm(options) || Comp(ms) || Deco(ms) || 200MB/s (w,r) || 1GB/s (w,r) || 5GB/s (w,r) || Ratio [cLen]");
DBGLOG(" || || || 2Gb/s || 10Gb/s || 50Gb/s ||");

unsigned time200MBs = transferTimeMs(sz, 200000000);
unsigned time1GBs = transferTimeMs(sz, 1000000000);
unsigned time5GBs = transferTimeMs(sz, 5000000000);
DBGLOG("%19s || %8u || %8u || %4u(%4u,%4u) || %4u(%4u,%4u) || %4u(%4u,%4u) || %5.2f [%u]", "uncompressed", 0, 0,
time200MBs, time200MBs, time200MBs, time1GBs, time1GBs, time1GBs, time5GBs, time5GBs, time5GBs, 1.0, sz);

Owned<ICompressHandlerIterator> iter = getCompressHandlerIterator();
ForEach(*iter)
{
ICompressHandler &handler = iter->query();
const char * type = handler.queryType();
//Ignore unusual compressors with no expanders...
if (strieq(type, "randrow"))
continue;
const char * options = streq("AES", handler.queryType()) ? aesKey: "";
testCompressor(handler, options, rowSz, src.length(), src.bytes(), RowCompress);
testCompressor(handler, options, rowSz, src.length(), src.bytes(), CompressToBuffer);
}
}
catch (IException *e)
{
EXCLOG(e, nullptr);
throw;
}
}

};

CPPUNIT_TEST_SUITE_REGISTRATION( JlibCompressionStandardTest );
CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( JlibCompressionStandardTest, "JlibCompressionStandardTest" );


class JlibCompressionTestsStress : public JlibCompressionTestBase
{
CPPUNIT_TEST_SUITE(JlibCompressionTestsStress);
CPPUNIT_TEST(test);
CPPUNIT_TEST_SUITE_END();

public:
void test()
{
try
{
MemoryBuffer src;
size32_t rowSz = 0;
initCompressionBuffer(src, rowSz);

DBGLOG("Algorithm(options) || Comp(ms) || Deco(ms) || 200MB/s (w,r) || 1GB/s (w,r) || 5GB/s (w,r) || Ratio [cLen]");
DBGLOG(" || || || 2Gb/s || 10Gb/s || 50Gb/s ||");

unsigned time200MBs = transferTimeMs(sz, 200000000);
unsigned time1GBs = transferTimeMs(sz, 1000000000);
unsigned time5GBs = transferTimeMs(sz, 5000000000);
DBGLOG("%19s || %8u || %8u || %4u(%4u,%4u) || %4u(%4u,%4u) || %4u(%4u,%4u) || %5.2f [%u]", "uncompressed", 0, 0,
time200MBs, time200MBs, time200MBs, time1GBs, time1GBs, time1GBs, time5GBs, time5GBs, time5GBs, 1.0, sz);

Owned<ICompressHandlerIterator> iter = getCompressHandlerIterator();
ForEach(*iter)
{
ICompressHandler &handler = iter->query();
const char * type = handler.queryType();
//Ignore unusual compressors with no expanders...
if (strieq(type, "randrow"))
continue;
const char * options = streq("AES", handler.queryType()) ? aesKey: "";
if (streq(type, "LZ4HC"))
{
testCompressor(handler, "hclevel=3", rowSz, src.length(), src.bytes(), RowCompress);
testCompressor(handler, "hclevel=4", rowSz, src.length(), src.bytes(), RowCompress);
testCompressor(handler, "hclevel=5", rowSz, src.length(), src.bytes(), RowCompress);
testCompressor(handler, "hclevel=6", rowSz, src.length(), src.bytes(), RowCompress);
testCompressor(handler, "hclevel=8", rowSz, src.length(), src.bytes(), RowCompress);
testCompressor(handler, "hclevel=10", rowSz, src.length(), src.bytes(), RowCompress);
}
testCompressor(handler, options, rowSz, src.length(), src.bytes(), RowCompress);
testCompressor(handler, options, rowSz, src.length(), src.bytes(), CompressToBuffer);
if (streq(type, "LZ4"))
{
testCompressor(handler, "allrow", rowSz, src.length(), src.bytes(), AllRowCompress); // block doesn't affect the compressor, just tracing
testCompressor(handler, "block", rowSz, src.length(), src.bytes(), BlockCompress); // block doesn't affect the compressor, just tracing
}
}
}
catch (IException *e)
{
EXCLOG(e, nullptr);
throw;
}
}

};

CPPUNIT_TEST_SUITE_REGISTRATION( JlibCompressionTestsStress );
CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( JlibCompressionTestsStress, "JlibCompressionTestsStress" );


//--------------------------------------------------------------------------------------------------------------------


class JlibFriendlySizeTest : public CppUnit::TestFixture
{
CPPUNIT_TEST_SUITE(JlibFriendlySizeTest);
Expand Down

0 comments on commit 231688e

Please sign in to comment.