Skip to content

Commit

Permalink
🚸 Ensure exception propagation from threads via std::async (#263)
Browse files Browse the repository at this point in the history
## Description

If an unhandled exception occurs in a C++ thread, `std::terminate` is
called. This is undesirable, especially from the Python side, since
there is no way for the exception to be propagated to Python and, thus,
for handling it gracefully.
It just causes a crash of the Python executable.

Fortunately, C++ offers some ways to allow for exceptions from threads
to be propagated to the main thread.
This is done by using `std::async` instead of `std::thread`. Upon
creation, it returns an `std::future` object that is used to refer to
the return value of the asynchronous computation.
A future can hold the return value, but also an exception. Upon calling
`future.get()`, either the value is returned or the stored exception is
rethrown.
In this way, any exception happening concurrently is eventually
propagated to the main thread and, as a result, to Python.

This also caught some instances of methods being declared `noexcept`
despite possibly throwing. This also causes the C++ runtime to call
`std::terminate` instead of resulting in a proper error message.

## Checklist:

<!---
This checklist serves as a reminder of a couple of things that ensure
your pull request will be merged swiftly.
-->

- [x] The pull request only contains commits that are related to it.
- [x] I have added appropriate tests and documentation.
- [x] I have made sure that all CI jobs on GitHub pass.
- [x] The pull request introduces no new warnings and follows the
project's style guidelines.
  • Loading branch information
burgholzer authored Apr 2, 2023
2 parents ea9f0a1 + 4e867e8 commit 5e927b3
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 83 deletions.
44 changes: 44 additions & 0 deletions include/EquivalenceCheckingManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include <atomic>
#include <chrono>
#include <future>
#include <memory>
#include <mutex>
#include <thread>
Expand Down Expand Up @@ -277,6 +278,49 @@ class EquivalenceCheckingManager {
}
}

/// \brief Run an EquivalenceChecker asynchronously
///
/// This function is used to asynchronously run an EquivalenceChecker. It also
/// takes care of creating the checker if it does not exist yet. Additionally,
/// it takes care that the checker signals the main thread when it is done
/// (even in case of an exception).
///
/// \tparam Checker The type of the checker (must be derived from the
/// EquivalenceChecker class).
/// \param id The id in the checkers vector where the checker is stored.
/// \param queue The queue to which the checker shall push its id
/// once it is done.
/// \return A future that can be used to wait for the checker to finish.
template <class Checker>
std::future<void> asyncRunChecker(const std::size_t id,
ThreadSafeQueue<std::size_t>& queue) {
static_assert(std::is_base_of_v<EquivalenceChecker, Checker>,
"Checker must be derived from EquivalenceChecker");
return std::async(std::launch::async, [this, id, &queue]() {
try {
auto& checker = checkers[id];
if (!checker) {
checker = std::make_unique<Checker>(qc1, qc2, configuration);
}

if constexpr (std::is_same_v<Checker, DDSimulationChecker>) {
auto* const simChecker =
dynamic_cast<DDSimulationChecker*>(checker.get());
const std::lock_guard stateGeneratorLock(stateGeneratorMutex);
simChecker->setRandomInitialState(stateGenerator);
}

if (!done) {
checker->run();
}
queue.push(id);
} catch (const std::exception& e) {
queue.push(id);
throw;
}
});
}

[[nodiscard]] bool simulationsFinished() const {
return results.performedSimulations == configuration.simulation.maxSims;
}
Expand Down
2 changes: 1 addition & 1 deletion include/checker/dd/DDEquivalenceChecker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class DDEquivalenceChecker : public EquivalenceChecker {
public:
DDEquivalenceChecker(const qc::QuantumComputation& circ1,
const qc::QuantumComputation& circ2,
Configuration config) noexcept
Configuration config)
: EquivalenceChecker(circ1, circ2, std::move(config)),
dd(std::make_unique<dd::Package<Config>>(nqubits)),
taskManager1(TaskManager<DDType, Config>(circ1, dd)),
Expand Down
2 changes: 1 addition & 1 deletion include/checker/dd/DDSimulationChecker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class DDSimulationChecker final
public:
DDSimulationChecker(const qc::QuantumComputation& circ1,
const qc::QuantumComputation& circ2,
Configuration configuration) noexcept;
Configuration configuration);

void setRandomInitialState(StateGenerator& generator);

Expand Down
103 changes: 23 additions & 80 deletions src/EquivalenceCheckingManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -469,67 +469,35 @@ void EquivalenceCheckingManager::checkParallel() {
ThreadSafeQueue<std::size_t> queue{};
std::size_t id = 0U;

// reserve space for the threads
std::vector<std::thread> threads{};
threads.reserve(effectiveThreads);
// reserve space for the futures received from the async calls
std::vector<std::future<void>> futures{};
futures.reserve(effectiveThreads);

if (configuration.execution.runAlternatingChecker) {
// start a new thread that constructs and runs the alternating check
threads.emplace_back([this, &queue, id] {
checkers[id] =
std::make_unique<DDAlternatingChecker>(qc1, qc2, configuration);
checkers[id]->run();
queue.push(id);
});
futures.emplace_back(asyncRunChecker<DDAlternatingChecker>(id, queue));
++id;
}

if (configuration.execution.runConstructionChecker && !done) {
// start a new thread that constructs and runs the construction check
threads.emplace_back([this, &queue, id] {
checkers[id] =
std::make_unique<DDConstructionChecker>(qc1, qc2, configuration);
if (!done) {
checkers[id]->run();
}
queue.push(id);
});
futures.emplace_back(asyncRunChecker<DDConstructionChecker>(id, queue));
++id;
}

if (configuration.execution.runZXChecker && !done) {
// start a new thread that constructs and runs the ZX checker
threads.emplace_back([this, &queue, id] {
checkers[id] =
std::make_unique<ZXEquivalenceChecker>(qc1, qc2, configuration);
if (!done) {
checkers[id]->run();
}
queue.push(id);
});
futures.emplace_back(asyncRunChecker<ZXEquivalenceChecker>(id, queue));
++id;
}

if (configuration.execution.runSimulationChecker) {
const auto effectiveThreadsLeft = effectiveThreads - threads.size();
const auto effectiveThreadsLeft = effectiveThreads - futures.size();
const auto simulationsToStart =
std::min(effectiveThreadsLeft, configuration.simulation.maxSims);
// launch as many simulations as possible
for (std::size_t i = 0; i < simulationsToStart && !done; ++i) {
threads.emplace_back([this, &queue, id] {
checkers[id] =
std::make_unique<DDSimulationChecker>(qc1, qc2, configuration);
auto* const checker =
dynamic_cast<DDSimulationChecker*>(checkers[id].get());
{
const std::lock_guard stateGeneratorLock(stateGeneratorMutex);
checker->setRandomInitialState(stateGenerator);
}
if (!done) {
checkers[id]->run();
}
queue.push(id);
});
futures.emplace_back(asyncRunChecker<DDSimulationChecker>(id, queue));
++id;
++results.startedSimulations;
}
Expand All @@ -552,8 +520,9 @@ void EquivalenceCheckingManager::checkParallel() {
}

// otherwise, a checker has finished its execution
// join the respective thread (which should return immediately)
threads.at(*completedID).join();
// get the result of the future (which should be ready)
// this makes sure exceptions are thrown if necessary
futures.at(*completedID).get();

// in case non-equivalence has been shown, the execution can be stopped
const auto* const checker = checkers.at(*completedID).get();
Expand Down Expand Up @@ -685,18 +654,8 @@ void EquivalenceCheckingManager::checkParallel() {
// it has to be checked, whether further simulations shall be
// conducted
if (results.startedSimulations < configuration.simulation.maxSims) {
threads[*completedID] = std::thread([&, this, id = *completedID] {
auto* const simChecker =
dynamic_cast<DDSimulationChecker*>(checkers[id].get());
{
const std::lock_guard stateGeneratorLock(stateGeneratorMutex);
simChecker->setRandomInitialState(stateGenerator);
}
if (!done) {
checkers[id]->run();
}
queue.push(id);
});
futures[*completedID] =
asyncRunChecker<DDSimulationChecker>(*completedID, queue);
++results.startedSimulations;
}
}
Expand All @@ -705,29 +664,13 @@ void EquivalenceCheckingManager::checkParallel() {
const auto end = std::chrono::steady_clock::now();
results.checkTime = std::chrono::duration<double>(end - start).count();

// cleanup threads that are still running by joining them
// start by joining all the completed threads, which should succeed
// instantly
while (!queue.empty()) {
const auto completedID = queue.waitAndPop();
auto& thread = threads.at(*completedID);
if (thread.joinable()) {
thread.join();
}
}

// afterwards, join all threads that are still (potentially) running.
// at the moment there seems to be no solution for prematurely killing
// running threads without risking synchronisation issues. on the positive
// side, joining should avoid all of these potential issues on the
// negative side, one of the threads might still be stuck in a
// long-running operation and does not check for the `done` signal until
// this operation completes
for (auto& thread : threads) {
if (thread.joinable()) {
thread.join();
}
}
// Futures are not explicitly waited for here, since the destructor of the
// `std::future` object will block until the associated thread has finished.
// If any thread is still stuck in a long-running operation, this might take a
// while, but the program will terminate anyway. C++20 introduces
// `std::jthread`, which allows to explicitly cancel a thread. This could be a
// solution for the future to avoid this problem (and reduce the number of
// `isDone` checks).
}

void EquivalenceCheckingManager::checkSymbolic() {
Expand All @@ -736,11 +679,11 @@ void EquivalenceCheckingManager::checkSymbolic() {
// sets the `done` flag after the timeout has passed
std::thread timeoutThread{};
if (configuration.execution.timeout > 0.) {
timeoutThread = std::thread([&, timeout = std::chrono::duration<double>(
configuration.execution.timeout)] {
timeoutThread = std::thread([this, timeout = std::chrono::duration<double>(
configuration.execution.timeout)] {
std::unique_lock doneLock(doneMutex);
auto finished =
doneCond.wait_for(doneLock, timeout, [&] { return done; });
doneCond.wait_for(doneLock, timeout, [this] { return done; });
// if the thread has already finished within the timeout,
// nothing has to be done
if (!finished) {
Expand Down
2 changes: 1 addition & 1 deletion src/checker/dd/DDSimulationChecker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
namespace ec {
DDSimulationChecker::DDSimulationChecker(const qc::QuantumComputation& circ1,
const qc::QuantumComputation& circ2,
Configuration config) noexcept
Configuration config)
: DDEquivalenceChecker(circ1, circ2, std::move(config)) {
initialState = dd->makeZeroState(static_cast<dd::QubitCount>(nqubits));
initializeApplicationScheme(configuration.application.simulationScheme);
Expand Down
28 changes: 28 additions & 0 deletions test/python/test_verify.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,31 @@ def test_compiled_circuit_without_measurements() -> None:

result = qcec.verify(qc, qc_compiled)
assert result.equivalence == qcec.EquivalenceCriterion.equivalent


def test_cpp_exception_propagation_internal() -> None:
"""Test that C++ exceptions caused by code within QCEC are propagated correctly."""
qc = QuantumCircuit(1)
qc.x(0)

config = qcec.Configuration()
config.execution.run_alternating_checker = False
config.execution.run_simulation_checker = True
config.execution.run_construction_checker = False
config.execution.run_zx_checker = False
config.application.simulation_scheme = qcec.ApplicationScheme.lookahead

with pytest.raises(ValueError, match="Lookahead application scheme can only be used for matrices."):
qcec.verify(qc, qc, configuration=config)


def test_cpp_exception_propagation_external() -> None:
"""Test that C++ exceptions caused by code outside of QCEC are propagated correctly."""
qc = QuantumCircuit(129)
qc.x(range(129))

config = qcec.Configuration()
config.execution.run_zx_checker = False

with pytest.raises(ValueError, match="Requested too many qubits from package."):
qcec.verify(qc, qc, configuration=config)
27 changes: 27 additions & 0 deletions test/test_equality.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,3 +165,30 @@ TEST_F(EqualityTest, AutomaticSwitchToConstructionChecker) {
ecm.setParallel(false);
EXPECT_THROW(ecm.run(), std::invalid_argument);
}

TEST_F(EqualityTest, ExceptionInParallelThread) {
qc1.x(0);

config = ec::Configuration{};
config.execution.runAlternatingChecker = false;
config.execution.runConstructionChecker = false;
config.execution.runSimulationChecker = true;
config.execution.runZXChecker = false;
config.application.simulationScheme = ec::ApplicationSchemeType::Lookahead;

ec::EquivalenceCheckingManager ecm(qc1, qc1, config);
EXPECT_THROW(ecm.run(), std::invalid_argument);
}

TEST_F(EqualityTest, ExceptionInParallelThread2) {
qc1.addQubitRegister(128U);
for (auto i = 0U; i <= 128U; ++i) {
qc1.x(i);
}

config = ec::Configuration{};
config.execution.runZXChecker = false;

ec::EquivalenceCheckingManager ecm(qc1, qc1, config);
EXPECT_THROW(ecm.run(), std::invalid_argument);
}

0 comments on commit 5e927b3

Please sign in to comment.