Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

hydra-queue-runner: drop broken connections from pool #1370

Merged
merged 1 commit into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions src/hydra-queue-runner/hydra-queue-runner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -926,10 +926,17 @@ void State::run(BuildID buildOne)
while (true) {
try {
auto conn(dbPool.get());
receiver dumpStatus_(*conn, "dump_status");
while (true) {
conn->await_notification();
dumpStatus(*conn);
try {
receiver dumpStatus_(*conn, "dump_status");
while (true) {
conn->await_notification();
dumpStatus(*conn);
}
} catch (pqxx::broken_connection & connEx) {
printMsg(lvlError, "main thread: %s", connEx.what());
printMsg(lvlError, "main thread: Reconnecting in 10s");
conn.markBad();
sleep(10);
}
} catch (std::exception & e) {
printMsg(lvlError, "main thread: %s", e.what());
Expand Down
34 changes: 19 additions & 15 deletions src/hydra-queue-runner/queue-monitor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,14 @@ using namespace nix;
void State::queueMonitor()
{
while (true) {
auto conn(dbPool.get());
try {
queueMonitorLoop();
queueMonitorLoop(*conn);
} catch (pqxx::broken_connection & e) {
printMsg(lvlError, "queue monitor: %s", e.what());
printMsg(lvlError, "queue monitor: Reconnecting in 10s");
conn.markBad();
sleep(10);
} catch (std::exception & e) {
printError("queue monitor: %s", e.what());
sleep(10); // probably a DB problem, so don't retry right away
Expand All @@ -20,16 +26,14 @@ void State::queueMonitor()
}


void State::queueMonitorLoop()
void State::queueMonitorLoop(Connection & conn)
{
auto conn(dbPool.get());

receiver buildsAdded(*conn, "builds_added");
receiver buildsRestarted(*conn, "builds_restarted");
receiver buildsCancelled(*conn, "builds_cancelled");
receiver buildsDeleted(*conn, "builds_deleted");
receiver buildsBumped(*conn, "builds_bumped");
receiver jobsetSharesChanged(*conn, "jobset_shares_changed");
receiver buildsAdded(conn, "builds_added");
receiver buildsRestarted(conn, "builds_restarted");
receiver buildsCancelled(conn, "builds_cancelled");
receiver buildsDeleted(conn, "builds_deleted");
receiver buildsBumped(conn, "builds_bumped");
receiver jobsetSharesChanged(conn, "jobset_shares_changed");

auto destStore = getDestStore();

Expand All @@ -39,17 +43,17 @@ void State::queueMonitorLoop()
while (!quit) {
localStore->clearPathInfoCache();

bool done = getQueuedBuilds(*conn, destStore, lastBuildId);
bool done = getQueuedBuilds(conn, destStore, lastBuildId);

if (buildOne && buildOneDone) quit = true;

/* Sleep until we get notification from the database about an
event. */
if (done && !quit) {
conn->await_notification();
conn.await_notification();
nrQueueWakeups++;
} else
conn->get_notifs();
conn.get_notifs();

if (auto lowestId = buildsAdded.get()) {
lastBuildId = std::min(lastBuildId, static_cast<unsigned>(std::stoul(*lowestId) - 1));
Expand All @@ -61,11 +65,11 @@ void State::queueMonitorLoop()
}
if (buildsCancelled.get() || buildsDeleted.get() || buildsBumped.get()) {
printMsg(lvlTalkative, "got notification: builds cancelled or bumped");
processQueueChange(*conn);
processQueueChange(conn);
}
if (jobsetSharesChanged.get()) {
printMsg(lvlTalkative, "got notification: jobset shares changed");
processJobsetSharesChange(*conn);
processJobsetSharesChange(conn);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/hydra-queue-runner/state.hh
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ private:

void queueMonitor();

void queueMonitorLoop();
void queueMonitorLoop(Connection & conn);

/* Check the queue for new builds. */
bool getQueuedBuilds(Connection & conn,
Expand Down
Loading