Skip to content

Commit

Permalink
HPCC-33244 Extend BlockedTimeTracker to allow concurrent query load t…
Browse files Browse the repository at this point in the history
…o be estimated

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
  • Loading branch information
ghalliday committed Jan 15, 2025
1 parent 421e8fd commit 8a510e7
Show file tree
Hide file tree
Showing 3 changed files with 177 additions and 14 deletions.
63 changes: 52 additions & 11 deletions system/jlib/jdebug.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -667,18 +667,34 @@ MODULE_EXIT()
//===========================================================================


void BlockedTimeTracker::noteWaiting()
//Calculate how much time has elapsed total - with the given number of threads still active
__uint64 BlockedTimeTracker::calcActiveTime(cycle_t tally, unsigned active) const
{
if (active != 0)
{
cycle_t now = get_cycles_now();
tally += active * now;
}

return cycle_to_nanosec(tally);
}

cycle_t BlockedTimeTracker::noteWaiting()
{
cycle_t now = get_cycles_now();
CriticalBlock block(cs);
numWaiting++;
timeStampTally -= get_cycles_now();
numStarted++;
timeStampTally -= now;
return now;
}

void BlockedTimeTracker::noteComplete()
cycle_t BlockedTimeTracker::noteComplete()
{
cycle_t now = get_cycles_now();
CriticalBlock block(cs);
numWaiting--;
timeStampTally += get_cycles_now();
numFinished++;
timeStampTally += now;
return now;
}

__uint64 BlockedTimeTracker::getWaitingNs() const
Expand All @@ -687,17 +703,42 @@ __uint64 BlockedTimeTracker::getWaitingNs() const
cycle_t tally;
{
CriticalBlock block(cs);
active = numWaiting;
active = numStarted - numFinished;
tally = timeStampTally;
}

if (active != 0)
return calcActiveTime(tally, active);
}


void BlockedTimeTracker::extractOverlapInfo(OverlapTimeInfo & info, bool isStart, bool increment)
{
unsigned started;
unsigned finished;
cycle_t tally;
{
cycle_t now = get_cycles_now();
tally += active * now;
CriticalBlock block(cs);
started = numStarted;
finished = numFinished;
tally = timeStampTally;
if (increment)
{
if (isStart)
{
numStarted++;
timeStampTally -= get_cycles_now();
}
else
{
numFinished++;
timeStampTally += get_cycles_now();
}
}
}

return cycle_to_nanosec(tally);
//Record so that when counts are subtracted, the total count will include all jobs that overlapped in any part
info.count = isStart ? finished : started;
info.elapsedNs = calcActiveTime(tally, started - finished);
}


Expand Down
38 changes: 35 additions & 3 deletions system/jlib/jdebug.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,21 +307,53 @@ Also since you are only ever interested in (sumEndTimestamps - sumStartTimestamp
There are two versions, one that uses a critical section, and a second that uses atomics, but is limited to the number
of active blocked items.
There is a second potential use for the BlockedTimeTracker class - for monitoring how many queries overlap the execution of the current query.
This can be calculated in the following way:
a) When a server query starts it calls
serverLoadTracker.extractInfo(startServer, true, true)
workerLoadTracker.extractInfo(startWorker, true, false);
b) When a server query finished it calls
serverLoadTracker.extractInfo(finishServer, true, true)
workerLoadTracker.extractInfo(finishWorker, true, false);
c) When a worker query starts it calls
workerLoadTracker.noteWaiting();
d) When a worker query finished it calls
workerLoadTracker.noteComplete();
* (finishX.count - startX.count) gives the total number of queries (including this one) that executed at the same time as the query
* (finishX.elapsedNs - startX.elapsedNs) gives the total execution time of all queries (including this one)
* This can be reported separately for the server and worker to give an estimate of the concurrent load when a query was running.
*/

struct OverlapTimeInfo
{
unsigned count;
stat_type elapsedNs;
};

class jlib_decl BlockedTimeTracker
{
public:
BlockedTimeTracker() = default;
BlockedTimeTracker(const BlockedTimeTracker &) = delete;

void noteWaiting();
void noteComplete();
//The following return get_cycles_now() - which can be used for tracking elapsed time.
cycle_t noteWaiting();
cycle_t noteComplete();
__uint64 getWaitingNs() const;

//A helper function to help calculate the overlapping load on the system
void extractOverlapInfo(OverlapTimeInfo & info, bool isStart, bool increment);

protected:
__uint64 calcActiveTime(cycle_t tally, unsigned active) const;

private:
mutable CriticalSection cs;
unsigned numWaiting = 0;
unsigned numStarted = 0;
unsigned numFinished = 0;
cycle_t timeStampTally = 0;
};

Expand Down
90 changes: 90 additions & 0 deletions testing/unittests/jlibtests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4170,6 +4170,8 @@ class BlockedTimingTests : public CppUnit::TestFixture
CPPUNIT_TEST(testStandard3);
CPPUNIT_TEST(testLightweight);
CPPUNIT_TEST(testLightweight2);
CPPUNIT_TEST(testConcurrent1);
CPPUNIT_TEST(testConcurrent2);
CPPUNIT_TEST_SUITE_END();

void testStandard()
Expand Down Expand Up @@ -4286,6 +4288,94 @@ class BlockedTimingTests : public CppUnit::TestFixture
if (trace)
DBGLOG("%" I64F "u %" I64F "u", blockTime-expected, postBlockTime-blockTime);
}

inline bool within10Percent(__uint64 expected, __uint64 actual)
{
__uint64 threshold = expected / 10;
return (actual >= expected - threshold) && (actual <= expected + threshold);
}

void testConcurrent1()
{
BlockedTimeTracker serverLoadTracker;
BlockedTimeTracker workerLoadTracker;
OverlapTimeInfo serverStartInfo;
OverlapTimeInfo workerStartInfo;
OverlapTimeInfo serverFinishInfo;
OverlapTimeInfo workerFinishInfo;
OverlapTimeInfo dummyInfo;

serverLoadTracker.extractOverlapInfo(serverStartInfo, true, true);
workerLoadTracker.extractOverlapInfo(workerStartInfo, true, false);

MilliSleep(100);
//Another query starts
serverLoadTracker.extractOverlapInfo(dummyInfo, true, true);
workerLoadTracker.noteWaiting(); // worker thread starts
MilliSleep(100);
workerLoadTracker.noteComplete(); // worker thread finishes
serverLoadTracker.extractOverlapInfo(dummyInfo, true, true); // 3rd query starts
MilliSleep(100);
serverLoadTracker.extractOverlapInfo(dummyInfo, false, true); // 2nd (or 3rd) query finishes
workerLoadTracker.noteWaiting(); // worker thread starts
MilliSleep(100);
workerLoadTracker.noteComplete(); // worker thread finishes
MilliSleep(100);

workerLoadTracker.extractOverlapInfo(workerFinishInfo, false, false);
serverLoadTracker.extractOverlapInfo(serverFinishInfo, false, true); // main thread stops

CPPUNIT_ASSERT_EQUAL(2U, workerFinishInfo.count - workerStartInfo.count);
CPPUNIT_ASSERT_EQUAL(true, within10Percent(200'000'000, workerFinishInfo.elapsedNs - workerStartInfo.elapsedNs));

CPPUNIT_ASSERT_EQUAL(3U, serverFinishInfo.count - serverStartInfo.count);

// This query for 500, second query for 200, 3rd query for 300
CPPUNIT_ASSERT_EQUAL(true, within10Percent(1000'000'000, serverFinishInfo.elapsedNs - serverStartInfo.elapsedNs));
}

//MORE: Is this a cleaner interface - effectively remove the 3rd parameter to extractOverlapInfo
void testConcurrent2()
{
BlockedTimeTracker serverLoadTracker;
BlockedTimeTracker workerLoadTracker;
OverlapTimeInfo serverStartInfo;
OverlapTimeInfo workerStartInfo;
OverlapTimeInfo serverFinishInfo;
OverlapTimeInfo workerFinishInfo;
OverlapTimeInfo dummyInfo;

serverLoadTracker.extractOverlapInfo(serverStartInfo, true, false);
workerLoadTracker.extractOverlapInfo(workerStartInfo, true, false);
cycle_t startCycles = serverLoadTracker.noteWaiting();

MilliSleep(100);
//Another query starts
serverLoadTracker.extractOverlapInfo(dummyInfo, true, true);
workerLoadTracker.noteWaiting(); // worker thread starts
MilliSleep(100);
workerLoadTracker.noteComplete(); // worker thread finishes
serverLoadTracker.extractOverlapInfo(dummyInfo, true, true); // 3rd query starts
MilliSleep(100);
serverLoadTracker.extractOverlapInfo(dummyInfo, false, true); // 2nd (or 3rd) query finishes
workerLoadTracker.noteWaiting(); // worker thread starts
MilliSleep(100);
workerLoadTracker.noteComplete(); // worker thread finishes
MilliSleep(100);

workerLoadTracker.extractOverlapInfo(workerFinishInfo, false, false);
serverLoadTracker.extractOverlapInfo(serverFinishInfo, false, false); // main thread stops
cycle_t endCycles = serverLoadTracker.noteComplete();

CPPUNIT_ASSERT_EQUAL(2U, workerFinishInfo.count - workerStartInfo.count);
CPPUNIT_ASSERT_EQUAL(true, within10Percent(200'000'000, workerFinishInfo.elapsedNs - workerStartInfo.elapsedNs));

CPPUNIT_ASSERT_EQUAL(3U, serverFinishInfo.count - serverStartInfo.count);

// This query for 500, second query for 200, 3rd query for 300
CPPUNIT_ASSERT_EQUAL(true, within10Percent(1000'000'000, serverFinishInfo.elapsedNs - serverStartInfo.elapsedNs));
CPPUNIT_ASSERT_EQUAL(true, within10Percent(500'000'000, cycle_to_nanosec(endCycles - startCycles)));
}
};

CPPUNIT_TEST_SUITE_REGISTRATION( BlockedTimingTests );
Expand Down

0 comments on commit 8a510e7

Please sign in to comment.