-
-
Notifications
You must be signed in to change notification settings - Fork 312
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use LegacySSHStore
#1444
Use LegacySSHStore
#1444
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,13 +9,10 @@ | |
#include "path.hh" | ||
#include "legacy-ssh-store.hh" | ||
#include "serve-protocol.hh" | ||
#include "serve-protocol-impl.hh" | ||
#include "state.hh" | ||
#include "current-process.hh" | ||
#include "processes.hh" | ||
#include "util.hh" | ||
#include "serve-protocol.hh" | ||
#include "serve-protocol-impl.hh" | ||
#include "ssh.hh" | ||
#include "finally.hh" | ||
#include "url.hh" | ||
|
@@ -39,38 +36,6 @@ bool ::Machine::isLocalhost() const | |
|
||
namespace nix::build_remote { | ||
|
||
static std::unique_ptr<SSHMaster::Connection> openConnection( | ||
::Machine::ptr machine, SSHMaster & master) | ||
{ | ||
Strings command = {"nix-store", "--serve", "--write"}; | ||
if (machine->isLocalhost()) { | ||
command.push_back("--builders"); | ||
command.push_back(""); | ||
} else { | ||
auto remoteStore = machine->storeUri.params.find("remote-store"); | ||
if (remoteStore != machine->storeUri.params.end()) { | ||
command.push_back("--store"); | ||
command.push_back(shellEscape(remoteStore->second)); | ||
} | ||
Comment on lines -50 to -54
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is handled by Nix. |
||
} | ||
|
||
auto ret = master.startCommand(std::move(command), { | ||
"-a", "-oBatchMode=yes", "-oConnectTimeout=60", "-oTCPKeepAlive=yes" | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. These are moved below. |
||
}); | ||
|
||
// XXX: determine the actual max value we can use from /proc. | ||
|
||
// FIXME: Should this be upstreamed into `startCommand` in Nix? | ||
|
||
int pipesize = 1024 * 1024; | ||
|
||
fcntl(ret->in.get(), F_SETPIPE_SZ, &pipesize); | ||
fcntl(ret->out.get(), F_SETPIPE_SZ, &pipesize); | ||
Comment on lines -61 to -68
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. In NixOS/nix#10765 @vcunat determined that this code was wrong. However, because it passes the address of `pipesize` rather than its value, the call might still enlarge the pipe if that address is not too large? Something should be done upstream. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is now done below too; see `remoteStore->connPipeSize = 1024 * 1024;`. Setting the variable has no side effects by itself, but we set it before any connection is established, so the setting will then be applied. |
||
|
||
return ret; | ||
} | ||
|
||
|
||
static void copyClosureTo( | ||
::Machine::Connection & conn, | ||
Store & destStore, | ||
|
@@ -87,8 +52,8 @@ static void copyClosureTo( | |
// FIXME: substitute output pollutes our build log | ||
/* Get back the set of paths that are already valid on the remote | ||
host. */ | ||
auto present = conn.queryValidPaths( | ||
destStore, true, closure, useSubstitutes); | ||
auto present = conn.store->queryValidPaths( | ||
closure, true, useSubstitutes); | ||
|
||
if (present.size() == closure.size()) return; | ||
|
||
|
@@ -103,12 +68,7 @@ static void copyClosureTo( | |
std::unique_lock<std::timed_mutex> sendLock(conn.machine->state->sendLock, | ||
std::chrono::seconds(600)); | ||
|
||
conn.to << ServeProto::Command::ImportPaths; | ||
destStore.exportPaths(missing, conn.to); | ||
conn.to.flush(); | ||
|
||
if (readInt(conn.from) != 1) | ||
throw Error("remote machine failed to import closure"); | ||
conn.store->addMultipleToStoreLegacy(destStore, missing); | ||
} | ||
|
||
|
||
|
@@ -228,7 +188,7 @@ static BuildResult performBuild( | |
counter & nrStepsBuilding | ||
) | ||
{ | ||
conn.putBuildDerivationRequest(localStore, drvPath, drv, options); | ||
auto kont = conn.store->buildDerivationAsync(drvPath, drv, options); | ||
|
||
BuildResult result; | ||
|
||
|
@@ -237,7 +197,10 @@ static BuildResult performBuild( | |
startTime = time(0); | ||
{ | ||
MaintainCount<counter> mc(nrStepsBuilding); | ||
result = ServeProto::Serialise<BuildResult>::read(localStore, conn); | ||
result = kont(); | ||
// Without proper call-once functions, we need to manually | ||
// delete after calling. | ||
kont = {}; | ||
} | ||
stopTime = time(0); | ||
|
||
|
@@ -253,7 +216,7 @@ static BuildResult performBuild( | |
|
||
// If the protocol was too old to give us `builtOutputs`, initialize | ||
// it manually by introspecting the derivation. | ||
if (GET_PROTOCOL_MINOR(conn.remoteVersion) < 6) | ||
if (GET_PROTOCOL_MINOR(conn.store->getProtocol()) < 6) | ||
{ | ||
// If the remote is too old to handle CA derivations, we can’t get this | ||
// far anyways | ||
|
@@ -286,26 +249,25 @@ static void copyPathFromRemote( | |
const ValidPathInfo & info | ||
) | ||
{ | ||
/* Receive the NAR from the remote and add it to the | ||
destination store. Meanwhile, extract all the info from the | ||
NAR that getBuildOutput() needs. */ | ||
auto source2 = sinkToSource([&](Sink & sink) | ||
{ | ||
/* Note: we should only send the command to dump the store | ||
path to the remote if the NAR is actually going to get read | ||
by the destination store, which won't happen if this path | ||
is already valid on the destination store. Since this | ||
lambda function only gets executed if someone tries to read | ||
from source2, we will send the command from here rather | ||
than outside the lambda. */ | ||
conn.to << ServeProto::Command::DumpStorePath << localStore.printStorePath(info.path); | ||
conn.to.flush(); | ||
|
||
TeeSource tee(conn.from, sink); | ||
extractNarData(tee, localStore.printStorePath(info.path), narMembers); | ||
}); | ||
|
||
destStore.addToStore(info, *source2, NoRepair, NoCheckSigs); | ||
/* Receive the NAR from the remote and add it to the | ||
destination store. Meanwhile, extract all the info from the | ||
NAR that getBuildOutput() needs. */ | ||
auto source2 = sinkToSource([&](Sink & sink) | ||
{ | ||
/* Note: we should only send the command to dump the store | ||
path to the remote if the NAR is actually going to get read | ||
by the destination store, which won't happen if this path | ||
is already valid on the destination store. Since this | ||
lambda function only gets executed if someone tries to read | ||
from source2, we will send the command from here rather | ||
than outside the lambda. */ | ||
conn.store->narFromPath(info.path, [&](Source & source) { | ||
TeeSource tee{source, sink}; | ||
extractNarData(tee, conn.store->printStorePath(info.path), narMembers); | ||
}); | ||
}); | ||
|
||
destStore.addToStore(info, *source2, NoRepair, NoCheckSigs); | ||
} | ||
|
||
static void copyPathsFromRemote( | ||
|
@@ -404,30 +366,39 @@ void State::buildRemote(ref<Store> destStore, | |
|
||
updateStep(ssConnecting); | ||
|
||
auto storeRef = machine->completeStoreReference(); | ||
|
||
auto * pSpecified = std::get_if<StoreReference::Specified>(&storeRef.variant); | ||
if (!pSpecified || pSpecified->scheme != "ssh") { | ||
throw Error("Currently, only (legacy-)ssh stores are supported!"); | ||
} | ||
|
||
LegacySSHStoreConfig storeConfig { | ||
pSpecified->scheme, | ||
pSpecified->authority, | ||
storeRef.params | ||
}; | ||
|
||
auto master = storeConfig.createSSHMaster( | ||
false, // no SSH master yet | ||
logFD.get()); | ||
|
||
// FIXME: rewrite to use Store. | ||
auto child = build_remote::openConnection(machine, master); | ||
::Machine::Connection conn { | ||
.machine = machine, | ||
.store = [&]{ | ||
auto * pSpecified = std::get_if<StoreReference::Specified>(&machine->storeUri.variant); | ||
if (!pSpecified || pSpecified->scheme != "ssh") { | ||
throw Error("Currently, only (legacy-)ssh stores are supported!"); | ||
} | ||
|
||
auto remoteStore = machine->openStore().dynamic_pointer_cast<LegacySSHStore>(); | ||
assert(remoteStore); | ||
|
||
remoteStore->connPipeSize = 1024 * 1024; | ||
|
||
if (machine->isLocalhost()) { | ||
auto rp_new = remoteStore->remoteProgram.get(); | ||
rp_new.push_back("--builders"); | ||
rp_new.push_back(""); | ||
const_cast<nix::Setting<Strings> &>(remoteStore->remoteProgram).assign(rp_new); | ||
} | ||
remoteStore->extraSshArgs = { | ||
"-a", "-oBatchMode=yes", "-oConnectTimeout=60", "-oTCPKeepAlive=yes" | ||
}; | ||
const_cast<nix::Setting<int> &>(remoteStore->logFD).assign(logFD.get()); | ||
|
||
return nix::ref{remoteStore}; | ||
}(), | ||
}; | ||
|
||
{ | ||
auto activeStepState(activeStep->state_.lock()); | ||
if (activeStepState->cancelled) throw Error("step cancelled"); | ||
activeStepState->pid = child->sshPid; | ||
activeStepState->pid = conn.store->getConnectionPid(); | ||
} | ||
|
||
Finally clearPid([&]() { | ||
|
@@ -442,35 +413,12 @@ void State::buildRemote(ref<Store> destStore, | |
process. Meh. */ | ||
}); | ||
|
||
::Machine::Connection conn { | ||
{ | ||
.to = child->in.get(), | ||
.from = child->out.get(), | ||
/* Handshake. */ | ||
.remoteVersion = 0xdadbeef, // FIXME avoid dummy initialize | ||
}, | ||
/*.machine =*/ machine, | ||
}; | ||
|
||
Finally updateStats([&]() { | ||
bytesReceived += conn.from.read; | ||
bytesSent += conn.to.written; | ||
auto stats = conn.store->getConnectionStats(); | ||
bytesReceived += stats.bytesReceived; | ||
bytesSent += stats.bytesSent; | ||
}); | ||
|
||
constexpr ServeProto::Version our_version = 0x206; | ||
|
||
try { | ||
conn.remoteVersion = decltype(conn)::handshake( | ||
conn.to, | ||
conn.from, | ||
our_version, | ||
machine->storeUri.render()); | ||
} catch (EndOfFile & e) { | ||
child->sshPid.wait(); | ||
std::string s = chomp(readFile(result.logFile)); | ||
throw Error("cannot connect to ‘%1%’: %2%", machine->storeUri.render(), s); | ||
} | ||
|
||
{ | ||
auto info(machine->state->connectInfo.lock()); | ||
info->consecutiveFailures = 0; | ||
|
@@ -539,7 +487,7 @@ void State::buildRemote(ref<Store> destStore, | |
|
||
auto now1 = std::chrono::steady_clock::now(); | ||
|
||
auto infos = conn.queryPathInfos(*localStore, outputs); | ||
auto infos = conn.store->queryPathInfosUncached(outputs); | ||
|
||
size_t totalNarSize = 0; | ||
for (auto & [_, info] : infos) totalNarSize += info.narSize; | ||
|
@@ -574,9 +522,11 @@ void State::buildRemote(ref<Store> destStore, | |
} | ||
} | ||
|
||
/* Shut down the connection. */ | ||
child->in = -1; | ||
child->sshPid.wait(); | ||
/* Shut down the connection done by RAII. | ||
|
||
Only difference is kill() instead of wait() (i.e. send signal | ||
then wait()) | ||
*/ | ||
|
||
} catch (Error & e) { | ||
/* Disable this machine until a certain period of time has | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is done below