diff --git a/cpp/examples/tx_send.cpp b/cpp/examples/tx_send.cpp index 319ab3731..f2e8e4e21 100644 --- a/cpp/examples/tx_send.cpp +++ b/cpp/examples/tx_send.cpp @@ -33,6 +33,9 @@ #include #include +#include +#include + class tx_send : public proton::messaging_handler, proton::transaction_handler { private: proton::sender sender; @@ -40,9 +43,11 @@ class tx_send : public proton::messaging_handler, proton::transaction_handler { int total; int batch_size; int sent; + int batch_index = 0; int current_batch = 0; int committed = 0; int confirmed = 0; + proton::container *container; // proton::transaction_handler transaction_handler; proton::transaction transaction; @@ -56,68 +61,90 @@ class tx_send : public proton::messaging_handler, proton::transaction_handler { sender = c.open_sender(url); connection = sender.connection(); std::cout << " [on_container_start] declare_txn started..." << std::endl; - transaction = c.declare_transaction(connection, *this); - std::cout << " [on_container_start] completed!! txn: " << &transaction << std::endl; + c.declare_transaction(connection, *this); + std::cout << " [on_container_start] completed!!" << &transaction + << std::endl; } - void on_transaction_aborted(proton::transaction) {} void on_transaction_declare_failed(proton::transaction) {} - void on_transaction_commit_failed(proton::transaction) {} + void on_transaction_commit_failed(proton::transaction) { + std::cout << "Transaction Commit Failed" << std::endl; + connection.close(); + exit(-1); + } void on_transaction_declared(proton::transaction t) override { - std::cout << "[on_transaction_declared] txn: " << (&transaction) - << " new_txn: " << (t._impl->id) << std::endl; - connection.close(); - // transaction = &t; - // ASSUME: THIS FUNCTION DOESN"T WORK - // send(); + std::cout << "[on_transaction_declared] txn called " << (&t) + << std::endl; + // connection.close(); + std::cout << "[on_transaction_declared] txn is_empty " << (t.is_empty()) + << "\t" << transaction.is_empty() << std::endl; + transaction = t; + + send(sender); } void on_sendable(proton::sender &s) override { // send(); - // std::cout<<" [OnSendable] transaction: "<< &transaction << std::endl; - // send(s); + std::cout << " [OnSendable] transaction: " << &transaction + << std::endl; + send(s); } void send(proton::sender &s) { // TODO: Add more condition in while loop - // transaction != null - while ( sender.credit() && (committed + current_batch) < total) - { + while (!transaction.is_empty() && sender.credit() && + (committed + current_batch) < total) { proton::message msg; std::map m; m["sequence"] = committed + current_batch; msg.id(committed + current_batch + 1); msg.body(m); + std::cout << " [example] transaction send msg: " << msg + << std::endl; transaction.send(sender, msg); current_batch += 1; if(current_batch == batch_size) { - transaction.commit(); - // transaction = NULL; + std::cout << " >> Txn attempt commit" << std::endl; + if (batch_index % 2 == 0) { + transaction.commit(); + } else { + transaction.abort(); + } + + transaction = proton::transaction(); + batch_index++; } } - } void on_tracker_accept(proton::tracker &t) override { confirmed += 1; + std::cout << " [example] on_tracker_accept:" << confirmed + << std::endl; } void on_transaction_committed(proton::transaction t) override { committed += current_batch; + current_batch = 0; std::cout<<" [OnTxnCommitted] Committed:"<< committed<< std::endl; if(committed == total) { - std::cout << "All messages committed"; - // connection.close(); + std::cout << "All messages committed" << std::endl; + connection.close(); } else { - // current_batch = 0; - // container->declare_transaction(connection, transaction_handler); + container->declare_transaction(connection, *this); } } + void on_transaction_aborted(proton::transaction t) override { + std::cout << "Meesages Aborted ....." << std::endl; + current_batch = 0; + container->declare_transaction(connection, *this); + } + void on_sender_close(proton::sender &s) override { current_batch = 0; } @@ -126,8 +153,8 @@ class tx_send : public proton::messaging_handler, proton::transaction_handler { int main(int argc, char **argv) { std::string address("127.0.0.1:5672/examples"); - int message_count = 10; - int batch_size = 10; + int message_count = 6; + int batch_size = 3; example::options opts(argc, argv); opts.add_value(address, 'a', "address", "connect and send to URL", "URL"); diff --git a/cpp/include/proton/transaction.hpp b/cpp/include/proton/transaction.hpp index a74f85d94..51759f796 100644 --- a/cpp/include/proton/transaction.hpp +++ b/cpp/include/proton/transaction.hpp @@ -40,7 +40,7 @@ class transaction_handler; // TODO: This should not be accessible to users. class transaction_impl { public: - proton::sender *txn_ctrl = nullptr; + proton::sender txn_ctrl; proton::transaction_handler *handler = nullptr; proton::binary id; proton::tracker _declare; @@ -54,6 +54,11 @@ class transaction_impl { proton::tracker send(proton::sender s, proton::message msg); void discharge(bool failed); + void release_pending(); + void accept(tracker &d); + void update(tracker &d, uint64_t state); + void set_id(binary _id); + proton::tracker send_ctrl(proton::symbol descriptor, proton::value _value); void handle_outcome(proton::tracker t); transaction_impl(proton::sender &_txn_ctrl, @@ -67,19 +72,21 @@ class transaction_impl { class PN_CPP_CLASS_EXTERN transaction { - // private: - // PN_CPP_EXTERN transaction(proton::sender& _txn_ctrl, - // proton::transaction_handler& _handler, bool _settle_before_discharge); + private: + // PN_CPP_EXTERN transaction(proton::sender& _txn_ctrl, + // proton::transaction_handler& _handler, bool _settle_before_discharge); + + static transaction mk_transaction_impl(sender &s, transaction_handler &h, + bool f); + PN_CPP_EXTERN transaction(transaction_impl *impl); + transaction_impl *_impl; - static transaction mk_transaction_impl(sender &s, transaction_handler &h, - bool f); - PN_CPP_EXTERN transaction(transaction_impl* impl); public: - transaction_impl* _impl; - // TODO: + // TODO: // PN_CPP_EXTERN transaction(transaction &o); PN_CPP_EXTERN transaction(); PN_CPP_EXTERN ~transaction(); + PN_CPP_EXTERN bool is_empty(); PN_CPP_EXTERN void commit(); PN_CPP_EXTERN void abort(); PN_CPP_EXTERN void declare(); diff --git a/cpp/src/messaging_adapter.cpp b/cpp/src/messaging_adapter.cpp index 9a9a048bf..dcf79f777 100644 --- a/cpp/src/messaging_adapter.cpp +++ b/cpp/src/messaging_adapter.cpp @@ -71,12 +71,8 @@ void on_link_flow(messaging_handler& handler, pn_event_t* event) { // TODO: process session flow data, if no link-specific data, just return. if (!lnk) return; int state = pn_link_state(lnk); - if (pn_terminus_get_type(pn_link_remote_target(lnk))==PN_COORDINATOR) { - std::cout << " on_link_flow, type: PN_COORDINATOR" << std::endl; - return; - - } - if ((state&PN_LOCAL_ACTIVE) && (state&PN_REMOTE_ACTIVE)) { + if (pn_terminus_get_type(pn_link_remote_target(lnk)) == PN_COORDINATOR || + ((state & PN_LOCAL_ACTIVE) && (state & PN_REMOTE_ACTIVE))) { link_context& lctx = link_context::get(lnk); if (pn_link_is_sender(lnk)) { if (pn_link_credit(lnk) > 0) { @@ -123,13 +119,36 @@ void on_delivery(messaging_handler& handler, pn_event_t* event) { Tracing& ot = Tracing::getTracing(); if (pn_terminus_get_type(pn_link_remote_target(lnk))==PN_COORDINATOR) { // delivery d(make_wrapper(dlv)); - pn_disposition_t *disposition = pn_delivery_remote(dlv); - proton::value val(pn_disposition_data(disposition)); - std::cout << " on_delivery: COOORINDATOR.. tracker: " << val - << std::endl; - tracker t(make_wrapper(dlv)); + // pn_disposition_t *disposition = pn_delivery_remote(dlv); + // proton::value val(pn_disposition_data(disposition)); + // std::cout << " on_delivery: COOORINDATOR.. tracker: " << val + // << std::endl; + // tracker t(make_wrapper(dlv)); std::cout << " on_delivery: COOORINDATOR.. TRACKER MADE: " << std::endl; + + if (pn_delivery_updated(dlv)) { + tracker t(make_wrapper(dlv)); + ot.on_settled_span(t); + switch (pn_delivery_remote_state(dlv)) { + case PN_ACCEPTED: + handler.on_tracker_accept(t); + break; + case PN_REJECTED: + handler.on_tracker_reject(t); + break; + case PN_RELEASED: + case PN_MODIFIED: + handler.on_tracker_release(t); + break; + } + if (t.settled()) { + handler.on_tracker_settle(t); + if (lctx.auto_settle) + t.settle(); + } + } + // t.user_data = val; // not // proton::disposition _disposition = make_wrapper(disposition); // # @@ -140,7 +159,7 @@ void on_delivery(messaging_handler& handler, pn_event_t* event) { // std::cout<< " on_delivery: COOORINDATOR with TXN IN :" // << val2 << std::endl; - handler.on_tracker_settle(t); + // handler.on_tracker_settle(t); } else if (pn_link_is_receiver(lnk)) { delivery d(make_wrapper(dlv)); if (pn_delivery_aborted(dlv)) { diff --git a/cpp/src/proactor_container_impl.cpp b/cpp/src/proactor_container_impl.cpp index 00ab38bef..8fd238b40 100644 --- a/cpp/src/proactor_container_impl.cpp +++ b/cpp/src/proactor_container_impl.cpp @@ -872,36 +872,16 @@ void container::impl::stop(const proton::error_condition& err) { transaction container::impl::declare_transaction(proton::connection conn, proton::transaction_handler &handler, bool settle_before_discharge) { class InternalTransactionHandler : public proton::messaging_handler { // TODO: auto_settle + void on_tracker_settle(proton::tracker &t) override { std::cout<<" [InternalTransactionHandler][on_tracker_settle] called with tracker.txn" << std::endl; - t.transaction().handle_outcome(t); - - // t.user_data = val; // not - - // proton::disposition _disposition = make_wrapper(disposition); - // // # t.remote(); - - // proton::value val2 = _disposition.data(); - - // proton::disposition _disposition = t.remote(); - - // proton::value val = _disposition.data(); - - // std::cout<< " declare_transaction: on_tracker_settle with - // TXN IN :" << val << std::endl; - - // if(t.transaction()) { - // t.transaction().handle_outcome(t); - // } + if (!t.transaction().is_empty()) { + t.transaction().handle_outcome(t); + } } - - // TODO: Add on_unhandled function }; - // TODO: Sender should be created only once. (May be use Singleton Class) - // proton::target_options t; - proton::target_options t; std::vector cap = {proton::symbol("amqp:local-transactions")}; t.capabilities(cap); @@ -909,15 +889,12 @@ transaction container::impl::declare_transaction(proton::connection conn, proton proton::sender_options so; so.name("txn-ctrl"); - // Todo: Check the value, Or by deafult null? - //so.source() ? so.target(t); - // TODO: FIX STATIC static InternalTransactionHandler internal_handler; // internal_handler going out of scope. Fix it so.handler(internal_handler); std::cout<<" [declare_transaction] txn-name sender open with handler: " << &internal_handler << std::endl; - proton::sender s = conn.open_sender("does not matter", so); + static proton::sender s = conn.open_sender("does not matter", so); settle_before_discharge = false; diff --git a/cpp/src/transaction.cpp b/cpp/src/transaction.cpp index b7865cf7b..f67c08d9d 100644 --- a/cpp/src/transaction.cpp +++ b/cpp/src/transaction.cpp @@ -21,9 +21,11 @@ #include "proton/transaction.hpp" #include "proton/delivery.h" +#include "proton/delivery.hpp" #include "proton/message.hpp" #include "proton/target_options.hpp" #include "proton/tracker.hpp" +#include "proton/transfer.hpp" #include "proton_bits.hpp" #include @@ -51,6 +53,7 @@ transaction::~transaction() = default; void transaction::commit() { _impl->commit(); }; void transaction::abort() { _impl->abort(); }; void transaction::declare() { _impl->declare(); }; +bool transaction::is_empty() { return _impl == NULL; }; proton::tracker transaction::send(proton::sender s, proton::message msg) { return _impl->send(s, msg); }; @@ -60,11 +63,10 @@ void transaction::handle_outcome(proton::tracker t) { _impl->handle_outcome(t); }; - -transaction_impl::transaction_impl(proton::sender& _txn_ctrl, proton::transaction_handler& _handler, bool _settle_before_discharge): - txn_ctrl(&_txn_ctrl), - handler(&_handler) -{ +transaction_impl::transaction_impl(proton::sender &_txn_ctrl, + proton::transaction_handler &_handler, + bool _settle_before_discharge) + : txn_ctrl(_txn_ctrl), handler(&_handler) { // bool settle_before_discharge = _settle_before_discharge; declare(); } @@ -95,11 +97,19 @@ void transaction_impl::declare() { << std::endl; } -void transaction_impl::discharge(bool failed) { - failed = failed; - proton::symbol descriptor("amqp:declare:list"); - proton::value _value; - proton::tracker discharge = send_ctrl(descriptor, _value); +void transaction_impl::discharge(bool _failed) { + failed = _failed; + proton::symbol descriptor("amqp:discharge:list"); + std::list vd; + vd.push_back(id); + vd.push_back(failed); + proton::value _value = vd; + _discharge = send_ctrl(descriptor, _value); +} + +void transaction_impl::set_id(binary _id) { + std::cout << " TXN ID: " << _id << " from " << this << std::endl; + id = _id; } proton::tracker transaction_impl::send_ctrl(proton::symbol descriptor, proton::value _value) { @@ -113,7 +123,7 @@ proton::tracker transaction_impl::send_ctrl(proton::symbol descriptor, proton::v proton::message msg = msg_value; std::cout << " [transaction_impl::send_ctrl] sending " << msg << std::endl; - proton::tracker delivery = txn_ctrl->send(msg); + proton::tracker delivery = txn_ctrl.send(msg); std::cout << " # declare, delivery as tracker: " << delivery << std::endl; delivery.transaction(transaction(this)); @@ -125,32 +135,101 @@ proton::tracker transaction_impl::send_ctrl(proton::symbol descriptor, proton::v proton::tracker transaction_impl::send(proton::sender s, proton::message msg) { proton::tracker tracker = s.send(msg); + std::cout << " transaction_impl::send " << id << ", done: " << msg + << " tracker: " << tracker << std::endl; + update(tracker, 0x34); + std::cout << " transaction_impl::send, update" << std::endl; return tracker; } +void transaction_impl::accept(tracker &t) { + // TODO: settle-before-discharge + t.settle(); + // pending.push_back(d); +} + +// TODO: use enum transfer::state +void transaction_impl::update(tracker &t, uint64_t state) { + if (state) { + proton::value data(pn_disposition_data(pn_delivery_local(unwrap(t)))); + std::list data_to_send; + data_to_send.push_back(id); + data = data_to_send; + + pn_delivery_update(unwrap(t), state); + // pn_delivery_settle(o); + // delivery.update(0x34) + } +} + +void transaction_impl::release_pending() { + for (auto d : pending) { + // d.update(released); + // d.settle(); + // TODO: fix it + delivery d2(make_wrapper(unwrap(d))); + d2.release(); + } + pending.clear(); +} + void transaction_impl::handle_outcome(proton::tracker t) { // std::vector _data = // proton::get>(val); auto txn = t.transaction(); - std::cout << " handle_outcome::txn_impl i am is " << this << std::endl; - std::cout << " handle_outcome::_declare is " << _declare << std::endl; - std::cout << " handle_outcome::tracker is " << t << std::endl; + std::cout << " ## handle_outcome::txn_impl i am is " << this << std::endl; + std::cout << " ## handle_outcome::_declare is " << _declare << std::endl; + std::cout << " ## handle_outcome::tracker is " << t << std::endl; + pn_disposition_t *disposition = pn_delivery_remote(unwrap(t)); // TODO: handle outcome if(_declare == t) { - std::cout<<" transaction_impl::handle_outcome => got _declare" << std::endl; - pn_disposition_t *disposition = pn_delivery_remote(unwrap(t)); + std::cout << " transaction_impl::handle_outcome => got _declare" + << std::endl; proton::value val(pn_disposition_data(disposition)); auto vd = get>(val); - txn._impl->id = vd[0]; - std::cout << " transaction_impl: handle_outcome.. got txnid:: " - << vd[0] << std::endl; - handler->on_transaction_declared(txn); + if (vd.size() > 0) { + txn._impl->set_id(vd[0]); + std::cout << " transaction_impl: handle_outcome.. txn_declared " + "got txnid:: " + << vd[0] << std::endl; + handler->on_transaction_declared(txn); + } else if (pn_disposition_is_failed(disposition)) { + std::cout << " transaction_impl: handle_outcome.. " + "txn_declared_failed pn_disposition_is_failed " + << std::endl; + handler->on_transaction_declare_failed(txn); + } else { + std::cout + << " transaction_impl: handle_outcome.. txn_declared_failed " + << std::endl; + handler->on_transaction_declare_failed(txn); + } } else if (_discharge == t) { - std::cout << " transaction_impl::handle_outcome => got _discharge" - << std::endl; - handler->on_transaction_committed(txn); + if (pn_disposition_is_failed(disposition)) { + if (!failed) { + std::cout + << " transaction_impl: handle_outcome.. commit failed " + << std::endl; + handler->on_transaction_commit_failed(txn); + // release pending + } + } else { + if (failed) { + handler->on_transaction_aborted(txn); + std::cout + << " transaction_impl: handle_outcome.. txn aborted" + << std::endl; + // release pending + } else { + handler->on_transaction_committed(txn); + std::cout + << " transaction_impl: handle_outcome.. txn commited" + << std::endl; + } + } + pending.clear(); } else { std::cout << " transaction_impl::handle_outcome => got NONE!" << std::endl;