Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-33244 Extend BlockedTimeTracker to allow concurrent query load to be estimated #19414

Merged
merged 1 commit into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 40 additions & 11 deletions system/jlib/jdebug.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -667,18 +667,34 @@ MODULE_EXIT()
//===========================================================================


void BlockedTimeTracker::noteWaiting()
//Calculate how much time has elapsed total - with the given number of threads still active
__uint64 BlockedTimeTracker::calcActiveTime(cycle_t tally, unsigned active) const
{
if (active != 0)
{
cycle_t now = get_cycles_now();
tally += active * now;
}

return cycle_to_nanosec(tally);
}

cycle_t BlockedTimeTracker::noteWaiting()
{
cycle_t now = get_cycles_now();
CriticalBlock block(cs);
numWaiting++;
timeStampTally -= get_cycles_now();
numStarted++;
timeStampTally -= now;
return now;
}

void BlockedTimeTracker::noteComplete()
cycle_t BlockedTimeTracker::noteComplete()
{
cycle_t now = get_cycles_now();
CriticalBlock block(cs);
numWaiting--;
timeStampTally += get_cycles_now();
numFinished++;
timeStampTally += now;
return now;
}

__uint64 BlockedTimeTracker::getWaitingNs() const
Expand All @@ -687,17 +703,30 @@ __uint64 BlockedTimeTracker::getWaitingNs() const
cycle_t tally;
{
CriticalBlock block(cs);
active = numWaiting;
active = numStarted - numFinished;
tally = timeStampTally;
}

if (active != 0)
return calcActiveTime(tally, active);
}


void BlockedTimeTracker::extractOverlapInfo(OverlapTimeInfo & info, bool isStart) const
{
unsigned started;
unsigned finished;
cycle_t tally;

{
cycle_t now = get_cycles_now();
tally += active * now;
CriticalBlock block(cs);
started = numStarted;
finished = numFinished;
tally = timeStampTally;
}

return cycle_to_nanosec(tally);
//Record so that when counts are subtracted, the total count will include all jobs that overlapped in any part
info.count = isStart ? finished : started;
info.elapsedNs = calcActiveTime(tally, started - finished);
}


Expand Down
41 changes: 38 additions & 3 deletions system/jlib/jdebug.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,21 +307,56 @@ Also since you are only ever interested in (sumEndTimestamps - sumStartTimestamp
There are two versions, one that uses a critical section, and a second that uses atomics, but is limited to the number
of active blocked items.
There is a second potential use for the BlockedTimeTracker class - for monitoring how many queries overlap the execution of the current query.
This can be calculated in the following way:
a) When a server query starts it calls
serverLoadTracker.extractInfo(startServer, true)
workerLoadTracker.extractInfo(startWorker, true);
startCycles = serverLoadTracker.noteWaiting();
b) When a server query finished it calls
serverLoadTracker.noteComplete();
serverLoadTracker.extractInfo(finishServer, true)
endCycles = workerLoadTracker.extractInfo(finishWorker, true);
c) When a worker query starts it calls
workerLoadTracker.noteWaiting();
d) When a worker query finished it calls
workerLoadTracker.noteComplete();
* (finishX.count - startX.count) gives the total number of queries (including this one) that executed at the same time as the query
* (finishX.elapsedNs - startX.elapsedNs) gives the total execution time of all queries (including this one)
* endCycles - startCycles gives the elapsed time for this query.
* This can be reported separately for the server and worker to give an estimate of the concurrent load when a query was running.
*/

struct OverlapTimeInfo
{
unsigned count;
stat_type elapsedNs;
};

class jlib_decl BlockedTimeTracker
{
public:
BlockedTimeTracker() = default;
BlockedTimeTracker(const BlockedTimeTracker &) = delete;

void noteWaiting();
void noteComplete();
//The following return get_cycles_now() - which can be used for tracking elapsed time.
cycle_t noteWaiting();
cycle_t noteComplete();
__uint64 getWaitingNs() const;

//A helper function to help calculate the overlapping load on the system
void extractOverlapInfo(OverlapTimeInfo & info, bool isStart) const;

protected:
__uint64 calcActiveTime(cycle_t tally, unsigned active) const;

private:
mutable CriticalSection cs;
unsigned numWaiting = 0;
unsigned numStarted = 0;
unsigned numFinished = 0;
cycle_t timeStampTally = 0;
};

Expand Down
48 changes: 48 additions & 0 deletions testing/unittests/jlibtests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4170,6 +4170,7 @@ class BlockedTimingTests : public CppUnit::TestFixture
CPPUNIT_TEST(testStandard3);
CPPUNIT_TEST(testLightweight);
CPPUNIT_TEST(testLightweight2);
CPPUNIT_TEST(testConcurrent);
CPPUNIT_TEST_SUITE_END();

void testStandard()
Expand Down Expand Up @@ -4286,6 +4287,53 @@ class BlockedTimingTests : public CppUnit::TestFixture
if (trace)
DBGLOG("%" I64F "u %" I64F "u", blockTime-expected, postBlockTime-blockTime);
}

inline bool within10Percent(__uint64 expected, __uint64 actual)
{
__uint64 threshold = expected / 10;
return (actual >= expected - threshold) && (actual <= expected + threshold);
}

void testConcurrent()
{
BlockedTimeTracker serverLoadTracker;
BlockedTimeTracker workerLoadTracker;
OverlapTimeInfo serverStartInfo;
OverlapTimeInfo workerStartInfo;
OverlapTimeInfo serverFinishInfo;
OverlapTimeInfo workerFinishInfo;
OverlapTimeInfo dummyInfo;

serverLoadTracker.extractOverlapInfo(serverStartInfo, true);
workerLoadTracker.extractOverlapInfo(workerStartInfo, true);
cycle_t startCycles = serverLoadTracker.noteWaiting();

MilliSleep(100);
serverLoadTracker.noteWaiting(); // 2nd query starts Q1 Q2
workerLoadTracker.noteWaiting(); // worker thread starts Q1 Q2 W1
MilliSleep(100);
workerLoadTracker.noteComplete(); // worker thread finishes Q1 Q2
serverLoadTracker.noteWaiting(); // 3rd query starts Q1 Q2 Q3
MilliSleep(100);
serverLoadTracker.noteComplete(); // 2nd query finishes Q1 Q3
workerLoadTracker.noteWaiting(); // worker thread starts Q1 Q3 W2
MilliSleep(100);
workerLoadTracker.noteComplete(); // worker thread finishes Q1 Q3
MilliSleep(100);

cycle_t endCycles = serverLoadTracker.noteComplete(); // Q3
workerLoadTracker.extractOverlapInfo(workerFinishInfo, false);
serverLoadTracker.extractOverlapInfo(serverFinishInfo, false);

CPPUNIT_ASSERT_EQUAL(2U, workerFinishInfo.count - workerStartInfo.count);
CPPUNIT_ASSERT_EQUAL(true, within10Percent(200'000'000, workerFinishInfo.elapsedNs - workerStartInfo.elapsedNs));

CPPUNIT_ASSERT_EQUAL(3U, serverFinishInfo.count - serverStartInfo.count);

// This query for 500, second query for 200, 3rd query for 300
CPPUNIT_ASSERT_EQUAL(true, within10Percent(1000'000'000, serverFinishInfo.elapsedNs - serverStartInfo.elapsedNs));
CPPUNIT_ASSERT_EQUAL(true, within10Percent(500'000'000, cycle_to_nanosec(endCycles - startCycles)));
}
};

CPPUNIT_TEST_SUITE_REGISTRATION( BlockedTimingTests );
Expand Down
Loading