diff --git a/thorlcr/thorutil/thbuf.cpp b/thorlcr/thorutil/thbuf.cpp index ad2d42bdb60..de3093e6a4e 100644 --- a/thorlcr/thorutil/thbuf.cpp +++ b/thorlcr/thorutil/thbuf.cpp @@ -1272,6 +1272,10 @@ class CRowSet : public CSimpleInterface, implements IInterface { return rows.get(r); } + inline bool isFull() const + { + return rows.isFull(); + } }; class Chunk : public CInterface @@ -1395,6 +1399,7 @@ class CSharedWriteAheadBase : public CSimpleInterface, implements ISharedSmartBu Owned outputOwnedRows; CRowSet *rowSet; unsigned row, rowsInRowSet; + bool lastWasNull=false; // for sanity check only (there should never be two consequetive nulls) void init() { @@ -1464,6 +1469,15 @@ class CSharedWriteAheadBase : public CSimpleInterface, implements ISharedSmartBu } rowsRead++; const void *retrow = rowSet->getRow(row++); + if (lastWasNull) + { + if (retrow) + lastWasNull = false; + else + throw makeStringExceptionV(0, "Double null detected - output=%u, rowsRead = %" RCPF "u, rowsWritten = %" RCPF "u, writeAtEof = %s", output, rowsRead, parent.rowsWritten, boolToStr(parent.writeAtEof)); + } + else + lastWasNull = (retrow == nullptr); return retrow; } virtual void stop() @@ -1693,7 +1707,8 @@ class CSharedWriteAheadBase : public CSimpleInterface, implements ISharedSmartBu unsigned len=rowSize(row); CriticalBlock b(crit); bool paged = false; - if (totalOutChunkSize >= minChunkSize) // chunks required to be at least minChunkSize + // NB: The isFull condition ensures that we never expand inMemRows, which would cause a race with readers reading same set + if (totalOutChunkSize >= minChunkSize || inMemRows->isFull()) // chunks required to be at least minChunkSize, or if hits max capacity { unsigned reader=anyReaderBehind(); if (NotFound != reader) diff --git a/thorlcr/thorutil/thmem.hpp b/thorlcr/thorutil/thmem.hpp index a89d4cdd0f2..f642481f49b 100644 --- a/thorlcr/thorutil/thmem.hpp +++ b/thorlcr/thorutil/thmem.hpp @@ -327,7 +327,8 @@ class graph_decl CThorExpandingRowArray : public CSimpleInterface if (!resize(numRows+1)) return false; } - rows[numRows++] = row; + rows[numRows] = row; + numRows++; return true; } bool binaryInsert(const void *row, ICompare &compare, bool dropLast=false); // NB: takes ownership on success @@ -356,6 +357,7 @@ class graph_decl CThorExpandingRowArray : public CSimpleInterface } inline rowidx_t ordinality() const { return numRows; } inline rowidx_t queryMaxRows() const { return maxRows; } + inline bool isFull() const { return numRows >= maxRows; } inline const void **getRowArray() { return rows; } void swap(CThorExpandingRowArray &src);