Skip to content

Commit

Permalink
Use LegacySSHStore
Browse files Browse the repository at this point in the history
In NixOS/nix#10748 it is extended with
everything we need.
  • Loading branch information
Ericson2314 committed Feb 18, 2025
1 parent 881462b commit 4a4a0f9
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 119 deletions.
180 changes: 65 additions & 115 deletions src/hydra-queue-runner/build-remote.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,10 @@
#include "path.hh"
#include "legacy-ssh-store.hh"
#include "serve-protocol.hh"
#include "serve-protocol-impl.hh"
#include "state.hh"
#include "current-process.hh"
#include "processes.hh"
#include "util.hh"
#include "serve-protocol.hh"
#include "serve-protocol-impl.hh"
#include "ssh.hh"
#include "finally.hh"
#include "url.hh"
Expand All @@ -39,38 +36,6 @@ bool ::Machine::isLocalhost() const

namespace nix::build_remote {

static std::unique_ptr<SSHMaster::Connection> openConnection(
::Machine::ptr machine, SSHMaster & master)
{
Strings command = {"nix-store", "--serve", "--write"};
if (machine->isLocalhost()) {
command.push_back("--builders");
command.push_back("");
} else {
auto remoteStore = machine->storeUri.params.find("remote-store");
if (remoteStore != machine->storeUri.params.end()) {
command.push_back("--store");
command.push_back(shellEscape(remoteStore->second));
}
}

auto ret = master.startCommand(std::move(command), {
"-a", "-oBatchMode=yes", "-oConnectTimeout=60", "-oTCPKeepAlive=yes"
});

// XXX: determine the actual max value we can use from /proc.

// FIXME: Should this be upstreamed into `startCommand` in Nix?

int pipesize = 1024 * 1024;

fcntl(ret->in.get(), F_SETPIPE_SZ, &pipesize);
fcntl(ret->out.get(), F_SETPIPE_SZ, &pipesize);

return ret;
}


static void copyClosureTo(
::Machine::Connection & conn,
Store & destStore,
Expand All @@ -87,8 +52,8 @@ static void copyClosureTo(
// FIXME: substitute output pollutes our build log
/* Get back the set of paths that are already valid on the remote
host. */
auto present = conn.queryValidPaths(
destStore, true, closure, useSubstitutes);
auto present = conn.store->queryValidPaths(
closure, true, useSubstitutes);

if (present.size() == closure.size()) return;

Expand All @@ -103,12 +68,7 @@ static void copyClosureTo(
std::unique_lock<std::timed_mutex> sendLock(conn.machine->state->sendLock,
std::chrono::seconds(600));

conn.to << ServeProto::Command::ImportPaths;
destStore.exportPaths(missing, conn.to);
conn.to.flush();

if (readInt(conn.from) != 1)
throw Error("remote machine failed to import closure");
conn.store->addMultipleToStoreLegacy(destStore, missing);
}


Expand Down Expand Up @@ -228,7 +188,7 @@ static BuildResult performBuild(
counter & nrStepsBuilding
)
{
conn.putBuildDerivationRequest(localStore, drvPath, drv, options);
auto kont = conn.store->buildDerivationAsync(drvPath, drv, options);

BuildResult result;

Expand All @@ -237,7 +197,10 @@ static BuildResult performBuild(
startTime = time(0);
{
MaintainCount<counter> mc(nrStepsBuilding);
result = ServeProto::Serialise<BuildResult>::read(localStore, conn);
result = kont();
// Without proper call-once functions, we need to manually
// delete after calling.
kont = {};
}
stopTime = time(0);

Expand All @@ -253,7 +216,7 @@ static BuildResult performBuild(

// If the protocol was too old to give us `builtOutputs`, initialize
// it manually by introspecting the derivation.
if (GET_PROTOCOL_MINOR(conn.remoteVersion) < 6)
if (GET_PROTOCOL_MINOR(conn.store->getProtocol()) < 6)
{
// If the remote is too old to handle CA derivations, we can’t get this
// far anyways
Expand Down Expand Up @@ -286,26 +249,25 @@ static void copyPathFromRemote(
const ValidPathInfo & info
)
{
/* Receive the NAR from the remote and add it to the
destination store. Meanwhile, extract all the info from the
NAR that getBuildOutput() needs. */
auto source2 = sinkToSource([&](Sink & sink)
{
/* Note: we should only send the command to dump the store
path to the remote if the NAR is actually going to get read
by the destination store, which won't happen if this path
is already valid on the destination store. Since this
lambda function only gets executed if someone tries to read
from source2, we will send the command from here rather
than outside the lambda. */
conn.to << ServeProto::Command::DumpStorePath << localStore.printStorePath(info.path);
conn.to.flush();

TeeSource tee(conn.from, sink);
extractNarData(tee, localStore.printStorePath(info.path), narMembers);
});

destStore.addToStore(info, *source2, NoRepair, NoCheckSigs);
/* Receive the NAR from the remote and add it to the
destination store. Meanwhile, extract all the info from the
NAR that getBuildOutput() needs. */
auto source2 = sinkToSource([&](Sink & sink)
{
/* Note: we should only send the command to dump the store
path to the remote if the NAR is actually going to get read
by the destination store, which won't happen if this path
is already valid on the destination store. Since this
lambda function only gets executed if someone tries to read
from source2, we will send the command from here rather
than outside the lambda. */
conn.store->narFromPath(info.path, [&](Source & source) {
TeeSource tee{source, sink};
extractNarData(tee, conn.store->printStorePath(info.path), narMembers);
});
});

destStore.addToStore(info, *source2, NoRepair, NoCheckSigs);
}

static void copyPathsFromRemote(
Expand Down Expand Up @@ -404,30 +366,39 @@ void State::buildRemote(ref<Store> destStore,

updateStep(ssConnecting);

auto storeRef = machine->completeStoreReference();

auto * pSpecified = std::get_if<StoreReference::Specified>(&storeRef.variant);
if (!pSpecified || pSpecified->scheme != "ssh") {
throw Error("Currently, only (legacy-)ssh stores are supported!");
}

LegacySSHStoreConfig storeConfig {
pSpecified->scheme,
pSpecified->authority,
storeRef.params
};

auto master = storeConfig.createSSHMaster(
false, // no SSH master yet
logFD.get());

// FIXME: rewrite to use Store.
auto child = build_remote::openConnection(machine, master);
::Machine::Connection conn {
.machine = machine,
.store = [&]{
auto * pSpecified = std::get_if<StoreReference::Specified>(&machine->storeUri.variant);
if (!pSpecified || pSpecified->scheme != "ssh") {
throw Error("Currently, only (legacy-)ssh stores are supported!");
}

auto remoteStore = machine->openStore().dynamic_pointer_cast<LegacySSHStore>();
assert(remoteStore);

remoteStore->connPipeSize = 1024 * 1024;

if (machine->isLocalhost()) {
auto rp_new = remoteStore->remoteProgram.get();
rp_new.push_back("--builders");
rp_new.push_back("");
const_cast<nix::Setting<Strings> &>(remoteStore->remoteProgram).assign(rp_new);
}
remoteStore->extraSshArgs = {
"-a", "-oBatchMode=yes", "-oConnectTimeout=60", "-oTCPKeepAlive=yes"
};
const_cast<nix::Setting<int> &>(remoteStore->logFD).assign(logFD.get());

return nix::ref{remoteStore};
}(),
};

{
auto activeStepState(activeStep->state_.lock());
if (activeStepState->cancelled) throw Error("step cancelled");
activeStepState->pid = child->sshPid;
activeStepState->pid = conn.store->getConnectionPid();
}

Finally clearPid([&]() {
Expand All @@ -442,35 +413,12 @@ void State::buildRemote(ref<Store> destStore,
process. Meh. */
});

::Machine::Connection conn {
{
.to = child->in.get(),
.from = child->out.get(),
/* Handshake. */
.remoteVersion = 0xdadbeef, // FIXME avoid dummy initialize
},
/*.machine =*/ machine,
};

Finally updateStats([&]() {
bytesReceived += conn.from.read;
bytesSent += conn.to.written;
auto stats = conn.store->getConnectionStats();
bytesReceived += stats.bytesReceived;
bytesSent += stats.bytesSent;
});

constexpr ServeProto::Version our_version = 0x206;

try {
conn.remoteVersion = decltype(conn)::handshake(
conn.to,
conn.from,
our_version,
machine->storeUri.render());
} catch (EndOfFile & e) {
child->sshPid.wait();
std::string s = chomp(readFile(result.logFile));
throw Error("cannot connect to ‘%1%’: %2%", machine->storeUri.render(), s);
}

{
auto info(machine->state->connectInfo.lock());
info->consecutiveFailures = 0;
Expand Down Expand Up @@ -539,7 +487,7 @@ void State::buildRemote(ref<Store> destStore,

auto now1 = std::chrono::steady_clock::now();

auto infos = conn.queryPathInfos(*localStore, outputs);
auto infos = conn.store->queryPathInfosUncached(outputs);

size_t totalNarSize = 0;
for (auto & [_, info] : infos) totalNarSize += info.narSize;
Expand Down Expand Up @@ -574,9 +522,11 @@ void State::buildRemote(ref<Store> destStore,
}
}

/* Shut down the connection. */
child->in = -1;
child->sshPid.wait();
/* Shut down the connection done by RAII.
Only difference is kill() instead of wait() (i.e. send signal
then wait())
*/

} catch (Error & e) {
/* Disable this machine until a certain period of time has
Expand Down
8 changes: 4 additions & 4 deletions src/hydra-queue-runner/state.hh
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@
#include "store-api.hh"
#include "sync.hh"
#include "nar-extractor.hh"
#include "serve-protocol.hh"
#include "serve-protocol-impl.hh"
#include "serve-protocol-connection.hh"
#include "legacy-ssh-store.hh"
#include "machines.hh"


Expand Down Expand Up @@ -292,9 +290,11 @@ struct Machine : nix::Machine
bool isLocalhost() const;

// A connection to a machine
struct Connection : nix::ServeProto::BasicClientConnection {
struct Connection {
// Backpointer to the machine
ptr machine;
// Opened store
nix::ref<nix::LegacySSHStore> store;
};
};

Expand Down

0 comments on commit 4a4a0f9

Please sign in to comment.