From bb904eea6c10c23e0511c594ec1651cde8231673 Mon Sep 17 00:00:00 2001 From: Richard Chapman Date: Wed, 13 Nov 2024 15:16:30 +0000 Subject: [PATCH] WIP Signed-off-by: Richard Chapman --- system/jhtree/ctfile.cpp | 21 ++++-- system/jhtree/ctfile.hpp | 7 +- system/jhtree/hlzw.cpp | 129 +++++++++++++++++++++++++----------- system/jhtree/hlzw.h | 5 +- system/jhtree/jhinplace.hpp | 5 +- system/jhtree/jhtree.cpp | 4 +- system/jhtree/keybuild.cpp | 12 +++- 7 files changed, 126 insertions(+), 57 deletions(-) diff --git a/system/jhtree/ctfile.cpp b/system/jhtree/ctfile.cpp index b14eb840768..bdf4a6e19d9 100644 --- a/system/jhtree/ctfile.cpp +++ b/system/jhtree/ctfile.cpp @@ -25,6 +25,7 @@ #include "jmisc.hpp" #include "hlzw.h" +#include "jlz4.hpp" #include "ctfile.hpp" #include "jstats.h" @@ -369,8 +370,10 @@ const void *CPOCWriteNode::getLastKeyValue() const //========================================================================================================= -CLegacyWriteNode::CLegacyWriteNode(offset_t _fpos, CKeyHdr *_keyHdr, bool isLeafNode) : CWriteNode(_fpos, _keyHdr, isLeafNode) +CLegacyWriteNode::CLegacyWriteNode(offset_t _fpos, CKeyHdr *_keyHdr, bool isLeafNode, bool LZ4) : CWriteNode(_fpos, _keyHdr, isLeafNode) { + if (LZ4) + hdr.compressionType = LZ4Compression; keyLen = keyHdr->getMaxKeyLength(); if (!isLeafNode) { @@ -399,7 +402,7 @@ bool CLegacyWriteNode::add(offset_t pos, const void *indata, size32_t insize, un if (isLeaf() && (keyType & HTREE_COMPRESSED_KEY)) { if (0 == hdr.numKeys) - lzwcomp.open(keyPtr, maxBytes-hdr.keyBytes, keyHdr->isVariable(), (keyType&HTREE_QUICK_COMPRESSED_KEY)==HTREE_QUICK_COMPRESSED_KEY); + lzwcomp.open(keyPtr, maxBytes-hdr.keyBytes, keyHdr->isVariable(), (keyType&HTREE_QUICK_COMPRESSED_KEY)==HTREE_QUICK_COMPRESSED_KEY, hdr.compressionType==LZ4Compression); if (0xffff == hdr.numKeys || 0 == lzwcomp.writekey(pos, (const char *)indata, insize)) { lzwcomp.close(); @@ -475,10 +478,12 @@ size32_t CLegacyWriteNode::compressValue(const char *keyData, size32_t size, cha //========================================================================================================= -CBlobWriteNode::CBlobWriteNode(offset_t _fpos, CKeyHdr *_keyHdr, bool) : CWriteNodeBase(_fpos, _keyHdr) +CBlobWriteNode::CBlobWriteNode(offset_t _fpos, CKeyHdr *_keyHdr, bool useLZ4) : CWriteNodeBase(_fpos, _keyHdr) { hdr.nodeType = NodeBlob; - lzwcomp.openBlob(keyPtr, maxBytes); + if (useLZ4) + hdr.compressionType = LZ4Compression; + lzwcomp.openBlob(keyPtr, maxBytes, useLZ4); } CBlobWriteNode::~CBlobWriteNode() @@ -638,9 +643,13 @@ void *CJHTreeNode::allocMem(size32_t len) return ret; } -char *CJHTreeNode::expandData(const void *src,size32_t &retsize) +char *CJHTreeNode::expandData(const void *src, size32_t &retsize) { - Owned exp = createLZWExpander(true); + Owned exp; + if (hdr.compressionType==LegacyCompression) + exp.setown(createLZWExpander(true)); + else + exp.setown(createLZ4Expander()); int len=exp->init(src); if (len==0) { diff --git a/system/jhtree/ctfile.hpp b/system/jhtree/ctfile.hpp index 1f0b1e2455d..6d0dbd1674f 100644 --- a/system/jhtree/ctfile.hpp +++ b/system/jhtree/ctfile.hpp @@ -121,6 +121,7 @@ enum CompressionType : byte // Additional compression formats can be added here... SplitPayload = 1, // A proof-of-concept using separate compression blocks for keyed fields vs payload InplaceCompression = 2, + LZ4Compression = 3, // Legacy formats, just different compressor }; //#pragma pack(1) @@ -250,7 +251,7 @@ class jhtree_decl CJHTreeNode : public CNodeBase size32_t expandedSize = 0; char *keyBuf = nullptr; - static char *expandData(const void *src,size32_t &retsize); + char *expandData(const void *src,size32_t &retsize); static void releaseMem(void *togo, size32_t size); static void *allocMem(size32_t size); @@ -464,7 +465,7 @@ class jhtree_decl CLegacyWriteNode : public CWriteNode size32_t compressValue(const char *keyData, size32_t size, char *result); public: - CLegacyWriteNode(offset_t fpos, CKeyHdr *keyHdr, bool isLeafNode); + CLegacyWriteNode(offset_t fpos, CKeyHdr *keyHdr, bool isLeafNode, bool useLZ4); ~CLegacyWriteNode(); virtual void write(IFileIOStream *, CRC32 *crc) override; @@ -512,7 +513,7 @@ interface IIndexCompressor : public IInterface { virtual const char *queryName() const = 0; virtual CWriteNode *createNode(offset_t _fpos, CKeyHdr *_keyHdr, bool isLeafNode) const = 0; - virtual CWriteNode *createBlobNode(offset_t _fpos, CKeyHdr *_keyHdr) const = 0; + virtual CBlobWriteNode *createBlobWriteNode(offset_t _fpos, CKeyHdr *_keyHdr) const = 0; virtual offset_t queryBranchMemorySize() const = 0; virtual offset_t queryLeafMemorySize() const = 0; }; diff --git a/system/jhtree/hlzw.cpp b/system/jhtree/hlzw.cpp index 6d77861dd2e..84ff111cb3d 100644 --- a/system/jhtree/hlzw.cpp +++ b/system/jhtree/hlzw.cpp @@ -32,6 +32,7 @@ #include "jmisc.hpp" #include "hlzw.h" +#include "jlz4.hpp" KeyCompressor::~KeyCompressor() { @@ -43,7 +44,7 @@ KeyCompressor::~KeyCompressor() } } -void KeyCompressor::open(void *blk,int blksize,bool _isVariable, bool rowcompression) +void KeyCompressor::open(void *blk,int blksize, bool _isVariable, bool rowcompression, bool LZ4) { isVariable = _isVariable; isBlob = false; @@ -56,6 +57,8 @@ void KeyCompressor::open(void *blk,int blksize,bool _isVariable, bool rowcompres else comp = createRDiffCompressor(); } + else if (LZ4) + comp = createLZ4Compressor(nullptr, false); else comp = createLZWCompressor(true); comp->open(blk,blksize); @@ -74,15 +77,21 @@ void KeyCompressor::open(void *blk,int blksize, ICompressHandler * compressionHa fixedRowSize = _fixedRowSize; } -void KeyCompressor::openBlob(void *blk,int blksize) +void KeyCompressor::openBlob(void *blk,int blksize, bool LZ4) { isVariable = false; isBlob = true; curOffset = 0; ::Release(comp); comp = NULL; - comp = createLZWCompressor(true); - comp->open(blk,blksize); + comp = LZ4 ? createLZ4Compressor(nullptr, false) : createLZWCompressor(true); + if (!comp->supportsIncrementalCompression()) + { + bufp = blk; + bufl = blksize; + } + else + comp->open(blk,blksize); method = comp->getCompressionMethod(); } @@ -177,52 +186,88 @@ unsigned KeyCompressor::writeBlob(const char *data, unsigned datalength) assert(datalength); if (!comp) return 0; - - - unsigned originalOffset = curOffset; - comp->startblock(); // start transaction - char zero = 0; - while (curOffset & 0xf) // We only have 16 bits to store the blob offset in the blobId, so we 16-byte align them which gives us another 8 bits + if (comp->supportsIncrementalCompression()) { - if (comp->write(&zero,sizeof(zero))!=sizeof(zero)) { + unsigned originalOffset = curOffset; + comp->startblock(); // start transaction + char zero = 0; + while (curOffset & 0xf) // We only have 16 bits to store the blob offset in the blobId, so we 16-byte align them which gives us another 8 bits + { + if (comp->write(&zero,sizeof(zero))!=sizeof(zero)) { + close(); + curOffset = originalOffset; + return 0; + } + curOffset++; + } + + unsigned rdatalength = datalength; + _WINREV(rdatalength); + if (comp->write(&rdatalength, sizeof(rdatalength))!=sizeof(rdatalength)) { close(); curOffset = originalOffset; return 0; } - curOffset++; - } + curOffset += sizeof(datalength); - unsigned rdatalength = datalength; - _WINREV(rdatalength); - if (comp->write(&rdatalength, sizeof(rdatalength))!=sizeof(rdatalength)) { - close(); - curOffset = originalOffset; - return 0; + unsigned written = 0; + while (written < datalength && curOffset < 0x100000) // curOffset must not go over 24 bits (see above) + { + if (comp->write(data,sizeof(*data))!=sizeof(*data)) + { + if (!written) + { + close(); + curOffset = originalOffset; + return 0; // only room to put the length - don't! + } + break; + } + curOffset++; + written++; + data++; + comp->startblock(); + } + comp->commitblock(); + if (written != datalength) + close(); + return written; } - curOffset += sizeof(datalength); - - unsigned written = 0; - while (written < datalength && curOffset < 0x100000) // curOffset must not go over 24 bits (see above) + else { - if (comp->write(data,sizeof(*data))!=sizeof(*data)) + assertex(uncompressed.length() == curOffset); + unsigned originalOffset = curOffset; + char zero = 0; + while (curOffset & 0xf) // We only have 16 bits to store the blob offset in the blobId, so we 16-byte align them which gives us another 8 bits { - if (!written) + uncompressed.append(zero); + curOffset++; + } + unsigned rdatalength = datalength; + _WINREV(rdatalength); + uncompressed.append(rdatalength); + unsigned written = 0; + while (datalength && curOffset < 0x100000) // curOffset must not go over 24 bits (see above) + { + unsigned bytesnow = datalength > 128 ? 128 : datalength; + uncompressed.append(bytesnow, data); + comp->Release(); comp = createLZ4Compressor(nullptr, false); + size32_t compressed = comp->compressBlock(bufl, bufp, uncompressed.length(), uncompressed.toByteArray()); + if (!compressed) { - close(); + uncompressed.setLength(originalOffset); curOffset = originalOffset; - return 0; // only room to put the length - don't! + break; } - break; + bufl = compressed; + written += bytesnow; + data += bytesnow; + curOffset = uncompressed.length(); + originalOffset = curOffset; + datalength -= bytesnow; } - curOffset++; - written++; - data++; - comp->startblock(); + return written; } - comp->commitblock(); - if (written != datalength) - close(); - return written; } @@ -233,10 +278,14 @@ bool KeyCompressor::adjustLimit(size32_t newLimit) void KeyCompressor::close() { // gets called either when write failed or explicitly by client - if (comp!=NULL) { - comp->close(); - bufp = comp->bufptr(); - bufl = comp->buflen(); + if (comp!=NULL) + { + if (comp->supportsIncrementalCompression()) + { + comp->close(); + bufp = comp->bufptr(); + bufl = comp->buflen(); + } comp->Release(); comp = NULL; } diff --git a/system/jhtree/hlzw.h b/system/jhtree/hlzw.h index 7f09267f6d4..94bf36fb332 100644 --- a/system/jhtree/hlzw.h +++ b/system/jhtree/hlzw.h @@ -29,7 +29,7 @@ class KeyCompressor final public: KeyCompressor() {} ~KeyCompressor(); - void open(void *blk,int blksize, bool isVariable, bool rowcompression); + void open(void *blk,int blksize, bool isVariable, bool rowcompression, bool LZ4); void open(void *blk,int blksize, ICompressHandler * compressionHandler, const char * options, bool _isVariable, size32_t fixedRowSize); int writekey(offset_t fPtr, const char *key, unsigned datalength); @@ -37,7 +37,7 @@ class KeyCompressor final bool compressBlock(size32_t destSize, void * dest, size32_t srcSize, const void * src, ICompressHandler * compressionHandler, const char * options, bool isVariable, size32_t fixedSize); - void openBlob(void *blk,int blksize); + void openBlob(void *blk,int blksize, bool LZ4); unsigned writeBlob(const char *data, unsigned datalength); void close(); bool adjustLimit(size32_t newLimit); @@ -49,6 +49,7 @@ class KeyCompressor final protected: ICompressor *comp = nullptr; + MemoryBuffer uncompressed; void *bufp = nullptr; unsigned curOffset = 0; size32_t fixedRowSize = 0; diff --git a/system/jhtree/jhinplace.hpp b/system/jhtree/jhinplace.hpp index 4718ef3259a..bb9c8ee584c 100644 --- a/system/jhtree/jhinplace.hpp +++ b/system/jhtree/jhinplace.hpp @@ -322,7 +322,10 @@ class InplaceIndexCompressor : public CInterfaceOf else return new CInplaceBranchWriteNode(_fpos, _keyHdr, ctx); } - + virtual CBlobWriteNode *createBlobWriteNode(offset_t _fpos, CKeyHdr *_keyHdr) const override + { + return new CBlobWriteNode(_fpos, _keyHdr, false); + } virtual offset_t queryBranchMemorySize() const override { return ctx.branchMemorySize; diff --git a/system/jhtree/jhtree.cpp b/system/jhtree/jhtree.cpp index d8bcf5008b3..19ac4a21802 100644 --- a/system/jhtree/jhtree.cpp +++ b/system/jhtree/jhtree.cpp @@ -1305,7 +1305,7 @@ const CJHTreeNode *CDiskKeyIndex::loadNode(cycle_t * fetchCycles, offset_t pos, CJHTreeNode *CKeyIndex::_createNode(const NodeHdr &nodeHdr) const { - if (nodeHdr.compressionType == LegacyCompression) + if (nodeHdr.compressionType == LegacyCompression || nodeHdr.compressionType == LZ4Compression) { switch(nodeHdr.nodeType) { @@ -1340,8 +1340,6 @@ CJHTreeNode *CKeyIndex::_createNode(const NodeHdr &nodeHdr) const return new CJHInplaceLeafNode(); if (nodeHdr.nodeType == NodeBranch) return new CJHInplaceBranchNode(); - if (nodeHdr.nodeType == NodeBlob) - return new CJHTreeBlobNode(); UNIMPLEMENTED; default: throwUnexpected(); diff --git a/system/jhtree/keybuild.cpp b/system/jhtree/keybuild.cpp index 79765f54544..898743d90f7 100644 --- a/system/jhtree/keybuild.cpp +++ b/system/jhtree/keybuild.cpp @@ -83,7 +83,11 @@ class PocIndexCompressor : public CInterfaceOf if (isLeafNode) return new CPOCWriteNode(_fpos, _keyHdr, isLeafNode); else - return new CLegacyWriteNode(_fpos, _keyHdr, isLeafNode); + return new CLegacyWriteNode(_fpos, _keyHdr, isLeafNode, false); + } + virtual CBlobWriteNode *createBlobWriteNode(offset_t _fpos, CKeyHdr *_keyHdr) const override + { + return new CBlobWriteNode(_fpos, _keyHdr, false); } virtual offset_t queryBranchMemorySize() const override { @@ -100,7 +104,11 @@ class LegacyIndexCompressor : public CInterfaceOf virtual const char *queryName() const override { return "Legacy"; } virtual CWriteNode *createNode(offset_t _fpos, CKeyHdr *_keyHdr, bool isLeafNode) const override { - return new CLegacyWriteNode(_fpos, _keyHdr, isLeafNode); + return new CLegacyWriteNode(_fpos, _keyHdr, isLeafNode, false); + } + virtual CBlobWriteNode *createBlobWriteNode(offset_t _fpos, CKeyHdr *_keyHdr) const override + { + return new CBlobWriteNode(_fpos, _keyHdr, false); } virtual offset_t queryBranchMemorySize() const override {