Skip to content

Commit

Permalink
Merge pull request #18758 from richardkchapman/rwlocks
Browse files Browse the repository at this point in the history
HPCC-32144 Investigate using std::shared_mutex for read-write locks

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Merged-by: Gavin Halliday <ghalliday@hpccsystems.com>
  • Loading branch information
ghalliday authored Jul 24, 2024
2 parents 4b3d714 + da0f608 commit c556345
Showing 1 changed file with 200 additions and 0 deletions.
200 changes: 200 additions & 0 deletions testing/unittests/jlibtests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3116,7 +3116,83 @@ CPPUNIT_TEST_SUITE_NAMED_REGISTRATION(JlibIPTTest, "JlibIPTTest");

#include "jdebug.hpp"
#include "jmutex.hpp"
#include <shared_mutex>

class SReadWriteLock
{
public:
void lockRead() { mutex.lock_shared(); }
void lockWrite() { mutex.lock(); writelocked = true; }
bool lockRead(unsigned timeout);
bool lockWrite(unsigned timeout);
void unlock() { if (writelocked) unlockWrite(); else unlockRead(); }
void unlockRead() { mutex.unlock_shared(); }
void unlockWrite() { writelocked = false; mutex.unlock(); }
bool queryWriteLocked() { return writelocked; };

protected:
std::shared_mutex mutex;
bool writelocked = false;
};

bool SReadWriteLock::lockRead(unsigned timeout)
{
if (timeout == (unsigned)-1)
{
lockRead();
return true;
}
// std::chrono::milliseconds ms(timeout);
return false;//mutex.try_lock_shared_for(ms);
}

bool SReadWriteLock::lockWrite(unsigned timeout)
{
if (timeout == (unsigned)-1)
{
lockWrite();
return true;
}
return false;
// std::chrono::milliseconds ms(timeout);
//return mutex.try_lock_for(ms);
}

class SReadLockBlock
{
SReadWriteLock *lock;
public:
SReadLockBlock(SReadWriteLock &l) : lock(&l) { lock->lockRead(); }
~SReadLockBlock() { if (lock) lock->unlockRead(); }
void clear()
{
if (lock)
{
lock->unlockRead();
lock = NULL;
}
}
};

class SWriteLockBlock
{
SReadWriteLock *lock;
public:
SWriteLockBlock(SReadWriteLock &l) : lock(&l) { lock->lockWrite(); }
~SWriteLockBlock() { if (lock) lock->unlockWrite(); }
void clear()
{
if (lock)
{
lock->unlockWrite();
lock = NULL;
}
}
};

#define ReadWriteLock SReadWriteLock
#define ReadLockBlock SReadLockBlock
#define WriteLockBlock SWriteLockBlock

class AtomicTimingStressTest : public CppUnit::TestFixture
{
Expand Down Expand Up @@ -3411,6 +3487,130 @@ class MachineInfoTimingTest : public CppUnit::TestFixture
CPPUNIT_TEST_SUITE_REGISTRATION(MachineInfoTimingTest);
CPPUNIT_TEST_SUITE_NAMED_REGISTRATION(MachineInfoTimingTest, "MachineInfoTimingTest");

class RWLockStressTest : public CppUnit::TestFixture
{
CPPUNIT_TEST_SUITE(RWLockStressTest);
CPPUNIT_TEST(runAllTests);
CPPUNIT_TEST_SUITE_END();

public:

class RWLockReadTestThread : public Thread
{
public:
RWLockReadTestThread(Semaphore & _startSem, Semaphore & _endSem, ReadWriteLock & _lock, unsigned __int64 &_value, unsigned _numIterations)
: startSem(_startSem), endSem(_endSem),
lock(_lock),
value(_value),
numIterations(_numIterations)
{
}

virtual void execute()
{
{
ReadLockBlock block(lock);
value++;
}
}

virtual int run()
{
startSem.wait();
for (unsigned i = 0; i < numIterations; i++)
execute();
endSem.signal();
return 0;
}

protected:
Semaphore & startSem;
Semaphore & endSem;
ReadWriteLock &lock;
unsigned __int64 & value;
const unsigned numIterations;
};

class RWLockWriteTestThread : public Thread
{
public:
RWLockWriteTestThread(bool &_finished, ReadWriteLock & _lock, unsigned __int64 &_value)
: finished (_finished),
lock(_lock),
value(_value)
{
}

virtual void execute()
{
{
WriteLockBlock block(lock);
value -= 5;

}
}
virtual int run()
{
while (!finished)
execute();
return 0;
}

protected:
ReadWriteLock &lock;
unsigned __int64 & value;
bool &finished;
};

unsigned __int64 run(const char * title, unsigned numThreads, unsigned numWriteThreads, unsigned numIterations)
{
IArrayOf<Thread> threads;
Semaphore startSem;
Semaphore endSem;
ReadWriteLock lock;
unsigned __int64 value;

value = 0;
bool finished = false;
for (unsigned i = 0; i < numThreads; i++)
{
RWLockReadTestThread * next = new RWLockReadTestThread(startSem, endSem, lock, value, numIterations);
threads.append(*next);
next->start(false);
}
for (unsigned i = 0; i < numWriteThreads; i++)
{
RWLockWriteTestThread * next = new RWLockWriteTestThread(finished, lock, value);
threads.append(*next);
next->start(false);
}
cycle_t startCycles = get_cycles_now();
startSem.signal(numThreads);
for (unsigned i2 = 0; i2 < numThreads; i2++)
endSem.wait();
cycle_t endCycles = get_cycles_now();
finished = true;
unsigned __int64 expected = (unsigned __int64)numIterations * numThreads;
unsigned __int64 averageTime = cycle_to_nanosec(endCycles - startCycles) / (numIterations * numThreads);
DBGLOG("%s@%u/%u threads(%u) %" I64F "uns/iteration lost(%" I64F "d)", title, 1, 1, numThreads, averageTime, expected - value);
for (unsigned i3 = 0; i3 < numThreads+numWriteThreads; i3++)
threads.item(i3).join();
return averageTime;
}

const unsigned numIterations = 100000;
const unsigned numCores = std::max(getAffinityCpus(), 16U);
void runAllTests()
{
run("10:1", 10, 1, numIterations);
run("20:1", 20, 1, numIterations);
run("10:10", 10, 10, numIterations);
}

};

CPPUNIT_TEST_SUITE_REGISTRATION(RWLockStressTest);
CPPUNIT_TEST_SUITE_NAMED_REGISTRATION(RWLockStressTest, "RWLockStressTest");



Expand Down

0 comments on commit c556345

Please sign in to comment.