Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use LegacySSHStore #1444

Merged
merged 1 commit into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 65 additions & 115 deletions src/hydra-queue-runner/build-remote.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,10 @@
#include "path.hh"
#include "legacy-ssh-store.hh"
#include "serve-protocol.hh"
#include "serve-protocol-impl.hh"
#include "state.hh"
#include "current-process.hh"
#include "processes.hh"
#include "util.hh"
#include "serve-protocol.hh"
#include "serve-protocol-impl.hh"
#include "ssh.hh"
#include "finally.hh"
#include "url.hh"
Expand All @@ -39,38 +36,6 @@ bool ::Machine::isLocalhost() const

namespace nix::build_remote {

static std::unique_ptr<SSHMaster::Connection> openConnection(
::Machine::ptr machine, SSHMaster & master)
{
Strings command = {"nix-store", "--serve", "--write"};
if (machine->isLocalhost()) {
command.push_back("--builders");
command.push_back("");
Comment on lines -46 to -48
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is done below

} else {
auto remoteStore = machine->storeUri.params.find("remote-store");
if (remoteStore != machine->storeUri.params.end()) {
command.push_back("--store");
command.push_back(shellEscape(remoteStore->second));
}
Comment on lines -50 to -54
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is handled by Nix.

}

auto ret = master.startCommand(std::move(command), {
"-a", "-oBatchMode=yes", "-oConnectTimeout=60", "-oTCPKeepAlive=yes"
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are moved below

});

// XXX: determine the actual max value we can use from /proc.

// FIXME: Should this be upstreamed into `startCommand` in Nix?

int pipesize = 1024 * 1024;

fcntl(ret->in.get(), F_SETPIPE_SZ, &pipesize);
fcntl(ret->out.get(), F_SETPIPE_SZ, &pipesize);
Comment on lines -61 to -68
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In NixOS/nix#10765 @vcunat determind this code was wrong. However, if the address is not too large it might make the pipe larger? Something should be done upstream.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now done below too.

See

remoteStore->connPipeSize = 1024 * 1024;

Setting a variable doesn't have any side-effects. We do that before any connection is established, however, and then that setting will be applied.


return ret;
}


static void copyClosureTo(
::Machine::Connection & conn,
Store & destStore,
Expand All @@ -87,8 +52,8 @@ static void copyClosureTo(
// FIXME: substitute output pollutes our build log
/* Get back the set of paths that are already valid on the remote
host. */
auto present = conn.queryValidPaths(
destStore, true, closure, useSubstitutes);
auto present = conn.store->queryValidPaths(
closure, true, useSubstitutes);

if (present.size() == closure.size()) return;

Expand All @@ -103,12 +68,7 @@ static void copyClosureTo(
std::unique_lock<std::timed_mutex> sendLock(conn.machine->state->sendLock,
std::chrono::seconds(600));

conn.to << ServeProto::Command::ImportPaths;
destStore.exportPaths(missing, conn.to);
conn.to.flush();

if (readInt(conn.from) != 1)
throw Error("remote machine failed to import closure");
conn.store->addMultipleToStoreLegacy(destStore, missing);
}


Expand Down Expand Up @@ -228,7 +188,7 @@ static BuildResult performBuild(
counter & nrStepsBuilding
)
{
conn.putBuildDerivationRequest(localStore, drvPath, drv, options);
auto kont = conn.store->buildDerivationAsync(drvPath, drv, options);

BuildResult result;

Expand All @@ -237,7 +197,10 @@ static BuildResult performBuild(
startTime = time(0);
{
MaintainCount<counter> mc(nrStepsBuilding);
result = ServeProto::Serialise<BuildResult>::read(localStore, conn);
result = kont();
// Without proper call-once functions, we need to manually
// delete after calling.
kont = {};
}
stopTime = time(0);

Expand All @@ -253,7 +216,7 @@ static BuildResult performBuild(

// If the protocol was too old to give us `builtOutputs`, initialize
// it manually by introspecting the derivation.
if (GET_PROTOCOL_MINOR(conn.remoteVersion) < 6)
if (GET_PROTOCOL_MINOR(conn.store->getProtocol()) < 6)
{
// If the remote is too old to handle CA derivations, we can’t get this
// far anyways
Expand Down Expand Up @@ -286,26 +249,25 @@ static void copyPathFromRemote(
const ValidPathInfo & info
)
{
/* Receive the NAR from the remote and add it to the
destination store. Meanwhile, extract all the info from the
NAR that getBuildOutput() needs. */
auto source2 = sinkToSource([&](Sink & sink)
{
/* Note: we should only send the command to dump the store
path to the remote if the NAR is actually going to get read
by the destination store, which won't happen if this path
is already valid on the destination store. Since this
lambda function only gets executed if someone tries to read
from source2, we will send the command from here rather
than outside the lambda. */
conn.to << ServeProto::Command::DumpStorePath << localStore.printStorePath(info.path);
conn.to.flush();

TeeSource tee(conn.from, sink);
extractNarData(tee, localStore.printStorePath(info.path), narMembers);
});

destStore.addToStore(info, *source2, NoRepair, NoCheckSigs);
/* Receive the NAR from the remote and add it to the
destination store. Meanwhile, extract all the info from the
NAR that getBuildOutput() needs. */
auto source2 = sinkToSource([&](Sink & sink)
{
/* Note: we should only send the command to dump the store
path to the remote if the NAR is actually going to get read
by the destination store, which won't happen if this path
is already valid on the destination store. Since this
lambda function only gets executed if someone tries to read
from source2, we will send the command from here rather
than outside the lambda. */
conn.store->narFromPath(info.path, [&](Source & source) {
TeeSource tee{source, sink};
extractNarData(tee, conn.store->printStorePath(info.path), narMembers);
});
});

destStore.addToStore(info, *source2, NoRepair, NoCheckSigs);
}

static void copyPathsFromRemote(
Expand Down Expand Up @@ -404,30 +366,39 @@ void State::buildRemote(ref<Store> destStore,

updateStep(ssConnecting);

auto storeRef = machine->completeStoreReference();

auto * pSpecified = std::get_if<StoreReference::Specified>(&storeRef.variant);
if (!pSpecified || pSpecified->scheme != "ssh") {
throw Error("Currently, only (legacy-)ssh stores are supported!");
}

LegacySSHStoreConfig storeConfig {
pSpecified->scheme,
pSpecified->authority,
storeRef.params
};

auto master = storeConfig.createSSHMaster(
false, // no SSH master yet
logFD.get());

// FIXME: rewrite to use Store.
auto child = build_remote::openConnection(machine, master);
::Machine::Connection conn {
.machine = machine,
.store = [&]{
auto * pSpecified = std::get_if<StoreReference::Specified>(&machine->storeUri.variant);
if (!pSpecified || pSpecified->scheme != "ssh") {
throw Error("Currently, only (legacy-)ssh stores are supported!");
}

auto remoteStore = machine->openStore().dynamic_pointer_cast<LegacySSHStore>();
assert(remoteStore);

remoteStore->connPipeSize = 1024 * 1024;

if (machine->isLocalhost()) {
auto rp_new = remoteStore->remoteProgram.get();
rp_new.push_back("--builders");
rp_new.push_back("");
const_cast<nix::Setting<Strings> &>(remoteStore->remoteProgram).assign(rp_new);
}
remoteStore->extraSshArgs = {
"-a", "-oBatchMode=yes", "-oConnectTimeout=60", "-oTCPKeepAlive=yes"
};
const_cast<nix::Setting<int> &>(remoteStore->logFD).assign(logFD.get());

return nix::ref{remoteStore};
}(),
};

{
auto activeStepState(activeStep->state_.lock());
if (activeStepState->cancelled) throw Error("step cancelled");
activeStepState->pid = child->sshPid;
activeStepState->pid = conn.store->getConnectionPid();
}

Finally clearPid([&]() {
Expand All @@ -442,35 +413,12 @@ void State::buildRemote(ref<Store> destStore,
process. Meh. */
});

::Machine::Connection conn {
{
.to = child->in.get(),
.from = child->out.get(),
/* Handshake. */
.remoteVersion = 0xdadbeef, // FIXME avoid dummy initialize
},
/*.machine =*/ machine,
};

Finally updateStats([&]() {
bytesReceived += conn.from.read;
bytesSent += conn.to.written;
auto stats = conn.store->getConnectionStats();
bytesReceived += stats.bytesReceived;
bytesSent += stats.bytesSent;
});

constexpr ServeProto::Version our_version = 0x206;

try {
conn.remoteVersion = decltype(conn)::handshake(
conn.to,
conn.from,
our_version,
machine->storeUri.render());
} catch (EndOfFile & e) {
child->sshPid.wait();
std::string s = chomp(readFile(result.logFile));
throw Error("cannot connect to ‘%1%’: %2%", machine->storeUri.render(), s);
}

{
auto info(machine->state->connectInfo.lock());
info->consecutiveFailures = 0;
Expand Down Expand Up @@ -539,7 +487,7 @@ void State::buildRemote(ref<Store> destStore,

auto now1 = std::chrono::steady_clock::now();

auto infos = conn.queryPathInfos(*localStore, outputs);
auto infos = conn.store->queryPathInfosUncached(outputs);

size_t totalNarSize = 0;
for (auto & [_, info] : infos) totalNarSize += info.narSize;
Expand Down Expand Up @@ -574,9 +522,11 @@ void State::buildRemote(ref<Store> destStore,
}
}

/* Shut down the connection. */
child->in = -1;
child->sshPid.wait();
/* Shut down the connection done by RAII.

Only difference is kill() instead of wait() (i.e. send signal
then wait())
*/

} catch (Error & e) {
/* Disable this machine until a certain period of time has
Expand Down
8 changes: 4 additions & 4 deletions src/hydra-queue-runner/state.hh
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@
#include "store-api.hh"
#include "sync.hh"
#include "nar-extractor.hh"
#include "serve-protocol.hh"
#include "serve-protocol-impl.hh"
#include "serve-protocol-connection.hh"
#include "legacy-ssh-store.hh"
#include "machines.hh"


Expand Down Expand Up @@ -292,9 +290,11 @@ struct Machine : nix::Machine
bool isLocalhost() const;

// A connection to a machine
struct Connection : nix::ServeProto::BasicClientConnection {
struct Connection {
// Backpointer to the machine
ptr machine;
// Opened store
nix::ref<nix::LegacySSHStore> store;
};
};

Expand Down