Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
  • Loading branch information
richardkchapman committed Nov 13, 2024
1 parent 1f13268 commit bb904ee
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 57 deletions.
21 changes: 15 additions & 6 deletions system/jhtree/ctfile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include "jmisc.hpp"
#include "hlzw.h"
#include "jlz4.hpp"

#include "ctfile.hpp"
#include "jstats.h"
Expand Down Expand Up @@ -369,8 +370,10 @@ const void *CPOCWriteNode::getLastKeyValue() const

//=========================================================================================================

CLegacyWriteNode::CLegacyWriteNode(offset_t _fpos, CKeyHdr *_keyHdr, bool isLeafNode) : CWriteNode(_fpos, _keyHdr, isLeafNode)
CLegacyWriteNode::CLegacyWriteNode(offset_t _fpos, CKeyHdr *_keyHdr, bool isLeafNode, bool LZ4) : CWriteNode(_fpos, _keyHdr, isLeafNode)
{
if (LZ4)
hdr.compressionType = LZ4Compression;
keyLen = keyHdr->getMaxKeyLength();
if (!isLeafNode)
{
Expand Down Expand Up @@ -399,7 +402,7 @@ bool CLegacyWriteNode::add(offset_t pos, const void *indata, size32_t insize, un
if (isLeaf() && (keyType & HTREE_COMPRESSED_KEY))
{
if (0 == hdr.numKeys)
lzwcomp.open(keyPtr, maxBytes-hdr.keyBytes, keyHdr->isVariable(), (keyType&HTREE_QUICK_COMPRESSED_KEY)==HTREE_QUICK_COMPRESSED_KEY);
lzwcomp.open(keyPtr, maxBytes-hdr.keyBytes, keyHdr->isVariable(), (keyType&HTREE_QUICK_COMPRESSED_KEY)==HTREE_QUICK_COMPRESSED_KEY, hdr.compressionType==LZ4Compression);
if (0xffff == hdr.numKeys || 0 == lzwcomp.writekey(pos, (const char *)indata, insize))
{
lzwcomp.close();
Expand Down Expand Up @@ -475,10 +478,12 @@ size32_t CLegacyWriteNode::compressValue(const char *keyData, size32_t size, cha

//=========================================================================================================

// Constructs a blob write node at file position _fpos: marks the node header as NodeBlob,
// optionally records LZ4 as the node's compression type, and opens the blob compressor
// over the node's key area (keyPtr / maxBytes).
// NOTE(review): this span is a rendered diff hunk, so the signature and the openBlob call
// each appear twice (presumably the pre-change line followed by its replacement) - confirm
// against the real ctfile.cpp before applying.
CBlobWriteNode::CBlobWriteNode(offset_t _fpos, CKeyHdr *_keyHdr, bool) : CWriteNodeBase(_fpos, _keyHdr)
CBlobWriteNode::CBlobWriteNode(offset_t _fpos, CKeyHdr *_keyHdr, bool useLZ4) : CWriteNodeBase(_fpos, _keyHdr)
{
hdr.nodeType = NodeBlob;
lzwcomp.openBlob(keyPtr, maxBytes);
// Record the compressor in the node header; CJHTreeNode::expandData chooses its expander
// from hdr.compressionType, so the header must agree with the compressor opened below.
if (useLZ4)
hdr.compressionType = LZ4Compression;
lzwcomp.openBlob(keyPtr, maxBytes, useLZ4);
}

CBlobWriteNode::~CBlobWriteNode()
Expand Down Expand Up @@ -638,9 +643,13 @@ void *CJHTreeNode::allocMem(size32_t len)
return ret;
}

char *CJHTreeNode::expandData(const void *src,size32_t &retsize)
char *CJHTreeNode::expandData(const void *src, size32_t &retsize)
{
Owned<IExpander> exp = createLZWExpander(true);
Owned<IExpander> exp;
if (hdr.compressionType==LegacyCompression)
exp.setown(createLZWExpander(true));
else
exp.setown(createLZ4Expander());
int len=exp->init(src);
if (len==0)
{
Expand Down
7 changes: 4 additions & 3 deletions system/jhtree/ctfile.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ enum CompressionType : byte
// Additional compression formats can be added here...
SplitPayload = 1, // A proof-of-concept using separate compression blocks for keyed fields vs payload
InplaceCompression = 2,
LZ4Compression = 3, // Legacy formats, just different compressor
};

//#pragma pack(1)
Expand Down Expand Up @@ -250,7 +251,7 @@ class jhtree_decl CJHTreeNode : public CNodeBase
size32_t expandedSize = 0;
char *keyBuf = nullptr;

static char *expandData(const void *src,size32_t &retsize);
char *expandData(const void *src,size32_t &retsize);
static void releaseMem(void *togo, size32_t size);
static void *allocMem(size32_t size);

Expand Down Expand Up @@ -464,7 +465,7 @@ class jhtree_decl CLegacyWriteNode : public CWriteNode

size32_t compressValue(const char *keyData, size32_t size, char *result);
public:
CLegacyWriteNode(offset_t fpos, CKeyHdr *keyHdr, bool isLeafNode);
CLegacyWriteNode(offset_t fpos, CKeyHdr *keyHdr, bool isLeafNode, bool useLZ4);
~CLegacyWriteNode();

virtual void write(IFileIOStream *, CRC32 *crc) override;
Expand Down Expand Up @@ -512,7 +513,7 @@ interface IIndexCompressor : public IInterface
{
virtual const char *queryName() const = 0;
virtual CWriteNode *createNode(offset_t _fpos, CKeyHdr *_keyHdr, bool isLeafNode) const = 0;
virtual CWriteNode *createBlobNode(offset_t _fpos, CKeyHdr *_keyHdr) const = 0;
virtual CBlobWriteNode *createBlobWriteNode(offset_t _fpos, CKeyHdr *_keyHdr) const = 0;
virtual offset_t queryBranchMemorySize() const = 0;
virtual offset_t queryLeafMemorySize() const = 0;
};
Expand Down
129 changes: 89 additions & 40 deletions system/jhtree/hlzw.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

#include "jmisc.hpp"
#include "hlzw.h"
#include "jlz4.hpp"

KeyCompressor::~KeyCompressor()
{
Expand All @@ -43,7 +44,7 @@ KeyCompressor::~KeyCompressor()
}
}

void KeyCompressor::open(void *blk,int blksize,bool _isVariable, bool rowcompression)
void KeyCompressor::open(void *blk,int blksize, bool _isVariable, bool rowcompression, bool LZ4)
{
isVariable = _isVariable;
isBlob = false;
Expand All @@ -56,6 +57,8 @@ void KeyCompressor::open(void *blk,int blksize,bool _isVariable, bool rowcompres
else
comp = createRDiffCompressor();
}
else if (LZ4)
comp = createLZ4Compressor(nullptr, false);
else
comp = createLZWCompressor(true);
comp->open(blk,blksize);
Expand All @@ -74,15 +77,21 @@ void KeyCompressor::open(void *blk,int blksize, ICompressHandler * compressionHa
fixedRowSize = _fixedRowSize;
}

// Prepares this KeyCompressor to write blob data into the block blk of blksize bytes.
// Resets the blob state (isVariable, isBlob, curOffset), releases any previous compressor,
// then creates either an LZ4 or an LZW compressor depending on the LZ4 flag.
// NOTE(review): this span is a rendered diff hunk; the first signature line and the
// createLZWCompressor/open pair directly after "comp = NULL;" look like the pre-change
// lines, followed by their replacements - confirm against the real hlzw.cpp.
void KeyCompressor::openBlob(void *blk,int blksize)
void KeyCompressor::openBlob(void *blk,int blksize, bool LZ4)
{
isVariable = false;
isBlob = true;
curOffset = 0;
::Release(comp);
comp = NULL;
comp = createLZWCompressor(true);
comp->open(blk,blksize);
comp = LZ4 ? createLZ4Compressor(nullptr, false) : createLZWCompressor(true);
// A compressor without incremental support is not opened on the block; only the target
// buffer is remembered here, and writeBlob later passes bufl/bufp to compressBlock.
// NOTE(review): curOffset is reset above but `uncompressed` is not cleared, while writeBlob
// asserts uncompressed.length() == curOffset - reopening a used KeyCompressor would
// presumably trip that assertion; confirm openBlob is only called once per object.
// NOTE(review): writeBlob later assigns the compressed length to bufl and then passes bufl
// to the next compressBlock call, so the capacity stored here appears to be lost after the
// first chunk - confirm whether a separate capacity member is needed.
if (!comp->supportsIncrementalCompression())
{
bufp = blk;
bufl = blksize;
}
else
comp->open(blk,blksize);
method = comp->getCompressionMethod();
}

Expand Down Expand Up @@ -177,52 +186,88 @@ unsigned KeyCompressor::writeBlob(const char *data, unsigned datalength)
assert(datalength);
if (!comp)
return 0;


unsigned originalOffset = curOffset;
comp->startblock(); // start transaction
char zero = 0;
while (curOffset & 0xf) // We only have 16 bits to store the blob offset in the blobId, so we 16-byte align them which gives us another 8 bits
if (comp->supportsIncrementalCompression())
{
if (comp->write(&zero,sizeof(zero))!=sizeof(zero)) {
unsigned originalOffset = curOffset;
comp->startblock(); // start transaction
char zero = 0;
while (curOffset & 0xf) // We only have 16 bits to store the blob offset in the blobId, so we 16-byte align them which gives us another 8 bits
{
if (comp->write(&zero,sizeof(zero))!=sizeof(zero)) {
close();
curOffset = originalOffset;
return 0;
}
curOffset++;
}

unsigned rdatalength = datalength;
_WINREV(rdatalength);
if (comp->write(&rdatalength, sizeof(rdatalength))!=sizeof(rdatalength)) {
close();
curOffset = originalOffset;
return 0;
}
curOffset++;
}
curOffset += sizeof(datalength);

unsigned rdatalength = datalength;
_WINREV(rdatalength);
if (comp->write(&rdatalength, sizeof(rdatalength))!=sizeof(rdatalength)) {
close();
curOffset = originalOffset;
return 0;
unsigned written = 0;
while (written < datalength && curOffset < 0x100000) // curOffset must not go over 24 bits (see above)
{
if (comp->write(data,sizeof(*data))!=sizeof(*data))
{
if (!written)
{
close();
curOffset = originalOffset;
return 0; // only room to put the length - don't!
}
break;
}
curOffset++;
written++;
data++;
comp->startblock();
}
comp->commitblock();
if (written != datalength)
close();
return written;
}
curOffset += sizeof(datalength);

unsigned written = 0;
while (written < datalength && curOffset < 0x100000) // curOffset must not go over 24 bits (see above)
else
{
if (comp->write(data,sizeof(*data))!=sizeof(*data))
assertex(uncompressed.length() == curOffset);
unsigned originalOffset = curOffset;
char zero = 0;
while (curOffset & 0xf) // We only have 16 bits to store the blob offset in the blobId, so we 16-byte align them which gives us another 8 bits
{
if (!written)
uncompressed.append(zero);
curOffset++;
}
unsigned rdatalength = datalength;
_WINREV(rdatalength);
uncompressed.append(rdatalength);
unsigned written = 0;
while (datalength && curOffset < 0x100000) // curOffset must not go over 24 bits (see above)
{
unsigned bytesnow = datalength > 128 ? 128 : datalength;
uncompressed.append(bytesnow, data);
comp->Release(); comp = createLZ4Compressor(nullptr, false);
size32_t compressed = comp->compressBlock(bufl, bufp, uncompressed.length(), uncompressed.toByteArray());
if (!compressed)
{
close();
uncompressed.setLength(originalOffset);
curOffset = originalOffset;
return 0; // only room to put the length - don't!
break;
}
break;
bufl = compressed;
written += bytesnow;
data += bytesnow;
curOffset = uncompressed.length();
originalOffset = curOffset;
datalength -= bytesnow;
}
curOffset++;
written++;
data++;
comp->startblock();
return written;
}
comp->commitblock();
if (written != datalength)
close();
return written;
}


Expand All @@ -233,10 +278,14 @@ bool KeyCompressor::adjustLimit(size32_t newLimit)

void KeyCompressor::close()
{ // gets called either when write failed or explicitly by client
if (comp!=NULL) {
comp->close();
bufp = comp->bufptr();
bufl = comp->buflen();
if (comp!=NULL)
{
if (comp->supportsIncrementalCompression())
{
comp->close();
bufp = comp->bufptr();
bufl = comp->buflen();
}
comp->Release();
comp = NULL;
}
Expand Down
5 changes: 3 additions & 2 deletions system/jhtree/hlzw.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ class KeyCompressor final
public:
KeyCompressor() {}
~KeyCompressor();
void open(void *blk,int blksize, bool isVariable, bool rowcompression);
void open(void *blk,int blksize, bool isVariable, bool rowcompression, bool LZ4);
void open(void *blk,int blksize, ICompressHandler * compressionHandler, const char * options, bool _isVariable, size32_t fixedRowSize);

int writekey(offset_t fPtr, const char *key, unsigned datalength);
bool write(const void * data, size32_t datalength);

bool compressBlock(size32_t destSize, void * dest, size32_t srcSize, const void * src, ICompressHandler * compressionHandler, const char * options, bool isVariable, size32_t fixedSize);

void openBlob(void *blk,int blksize);
void openBlob(void *blk,int blksize, bool LZ4);
unsigned writeBlob(const char *data, unsigned datalength);
void close();
bool adjustLimit(size32_t newLimit);
Expand All @@ -49,6 +49,7 @@ class KeyCompressor final

protected:
ICompressor *comp = nullptr;
MemoryBuffer uncompressed;
void *bufp = nullptr;
unsigned curOffset = 0;
size32_t fixedRowSize = 0;
Expand Down
5 changes: 4 additions & 1 deletion system/jhtree/jhinplace.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,10 @@ class InplaceIndexCompressor : public CInterfaceOf<IIndexCompressor>
else
return new CInplaceBranchWriteNode(_fpos, _keyHdr, ctx);
}

// Creates the blob write node for this index compressor. The last constructor
// argument is CBlobWriteNode's useLZ4 flag, which is left off here.
virtual CBlobWriteNode *createBlobWriteNode(offset_t _fpos, CKeyHdr *_keyHdr) const override
{
    const bool useLZ4 = false;
    CBlobWriteNode *blobNode = new CBlobWriteNode(_fpos, _keyHdr, useLZ4);
    return blobNode;
}
virtual offset_t queryBranchMemorySize() const override
{
return ctx.branchMemorySize;
Expand Down
4 changes: 1 addition & 3 deletions system/jhtree/jhtree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1305,7 +1305,7 @@ const CJHTreeNode *CDiskKeyIndex::loadNode(cycle_t * fetchCycles, offset_t pos,

CJHTreeNode *CKeyIndex::_createNode(const NodeHdr &nodeHdr) const
{
if (nodeHdr.compressionType == LegacyCompression)
if (nodeHdr.compressionType == LegacyCompression || nodeHdr.compressionType == LZ4Compression)
{
switch(nodeHdr.nodeType)
{
Expand Down Expand Up @@ -1340,8 +1340,6 @@ CJHTreeNode *CKeyIndex::_createNode(const NodeHdr &nodeHdr) const
return new CJHInplaceLeafNode();
if (nodeHdr.nodeType == NodeBranch)
return new CJHInplaceBranchNode();
if (nodeHdr.nodeType == NodeBlob)
return new CJHTreeBlobNode();
UNIMPLEMENTED;
default:
throwUnexpected();
Expand Down
12 changes: 10 additions & 2 deletions system/jhtree/keybuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,11 @@ class PocIndexCompressor : public CInterfaceOf<IIndexCompressor>
if (isLeafNode)
return new CPOCWriteNode(_fpos, _keyHdr, isLeafNode);
else
return new CLegacyWriteNode(_fpos, _keyHdr, isLeafNode);
return new CLegacyWriteNode(_fpos, _keyHdr, isLeafNode, false);
}
// Creates the blob write node for this index compressor. The last constructor
// argument is CBlobWriteNode's useLZ4 flag, which is left off here.
virtual CBlobWriteNode *createBlobWriteNode(offset_t _fpos, CKeyHdr *_keyHdr) const override
{
    const bool useLZ4 = false;
    CBlobWriteNode *blobNode = new CBlobWriteNode(_fpos, _keyHdr, useLZ4);
    return blobNode;
}
virtual offset_t queryBranchMemorySize() const override
{
Expand All @@ -100,7 +104,11 @@ class LegacyIndexCompressor : public CInterfaceOf<IIndexCompressor>
virtual const char *queryName() const override { return "Legacy"; }
// Creates a legacy-format write node (leaf or branch, per isLeafNode) at _fpos.
// NOTE(review): this span is a rendered diff hunk; the two return statements are presumably
// the pre-change call (three arguments) followed by its replacement, which passes false for
// CLegacyWriteNode's new fourth argument (declared as useLZ4) - confirm against keybuild.cpp.
virtual CWriteNode *createNode(offset_t _fpos, CKeyHdr *_keyHdr, bool isLeafNode) const override
{
return new CLegacyWriteNode(_fpos, _keyHdr, isLeafNode);
return new CLegacyWriteNode(_fpos, _keyHdr, isLeafNode, false);
}
// Creates the blob write node for this index compressor. The last constructor
// argument is CBlobWriteNode's useLZ4 flag, which is left off here.
virtual CBlobWriteNode *createBlobWriteNode(offset_t _fpos, CKeyHdr *_keyHdr) const override
{
    const bool useLZ4 = false;
    CBlobWriteNode *blobNode = new CBlobWriteNode(_fpos, _keyHdr, useLZ4);
    return blobNode;
}
virtual offset_t queryBranchMemorySize() const override
{
Expand Down

0 comments on commit bb904ee

Please sign in to comment.