Skip to content

Commit

Permalink
Add transaction commit and abort functionalities
Browse files Browse the repository at this point in the history
  • Loading branch information
DreamPearl committed Nov 21, 2024
1 parent 0e20334 commit eb4514b
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 96 deletions.
75 changes: 51 additions & 24 deletions cpp/examples/tx_send.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,21 @@
#include <map>
#include <string>

#include <chrono>
#include <thread>

class tx_send : public proton::messaging_handler, proton::transaction_handler {
private:
proton::sender sender;
std::string url;
int total;
int batch_size;
int sent;
int batch_index = 0;
int current_batch = 0;
int committed = 0;
int confirmed = 0;

proton::container *container;
// proton::transaction_handler transaction_handler;
proton::transaction transaction;
Expand All @@ -56,68 +61,90 @@ class tx_send : public proton::messaging_handler, proton::transaction_handler {
sender = c.open_sender(url);
connection = sender.connection();
std::cout << " [on_container_start] declare_txn started..." << std::endl;
transaction = c.declare_transaction(connection, *this);
std::cout << " [on_container_start] completed!! txn: " << &transaction << std::endl;
c.declare_transaction(connection, *this);
std::cout << " [on_container_start] completed!!" << &transaction
<< std::endl;
}

void on_transaction_aborted(proton::transaction) {}
void on_transaction_declare_failed(proton::transaction) {}
void on_transaction_commit_failed(proton::transaction) {}
void on_transaction_commit_failed(proton::transaction) {
std::cout << "Transaction Commit Failed" << std::endl;
connection.close();
exit(-1);
}

void on_transaction_declared(proton::transaction t) override {
std::cout << "[on_transaction_declared] txn: " << (&transaction)
<< " new_txn: " << (t._impl->id) << std::endl;
connection.close();
// transaction = &t;
// ASSUME: THIS FUNCTION DOESN"T WORK
// send();
std::cout << "[on_transaction_declared] txn called " << (&t)
<< std::endl;
// connection.close();
std::cout << "[on_transaction_declared] txn is_empty " << (t.is_empty())
<< "\t" << transaction.is_empty() << std::endl;
transaction = t;

send(sender);
}

void on_sendable(proton::sender &s) override {
// send();
// std::cout<<" [OnSendable] transaction: "<< &transaction << std::endl;
// send(s);
std::cout << " [OnSendable] transaction: " << &transaction
<< std::endl;
send(s);
}

void send(proton::sender &s) {
// TODO: Add more condition in while loop
// transaction != null
while ( sender.credit() && (committed + current_batch) < total)
{
while (!transaction.is_empty() && sender.credit() &&
(committed + current_batch) < total) {
proton::message msg;
std::map<std::string, int> m;
m["sequence"] = committed + current_batch;

msg.id(committed + current_batch + 1);
msg.body(m);
std::cout << " [example] transaction send msg: " << msg
<< std::endl;
transaction.send(sender, msg);
current_batch += 1;
if(current_batch == batch_size)
{
transaction.commit();
// transaction = NULL;
std::cout << " >> Txn attempt commit" << std::endl;
if (batch_index % 2 == 0) {
transaction.commit();
} else {
transaction.abort();
}

transaction = proton::transaction();
batch_index++;
}
}

}

void on_tracker_accept(proton::tracker &t) override {
confirmed += 1;
std::cout << " [example] on_tracker_accept:" << confirmed
<< std::endl;
}

void on_transaction_committed(proton::transaction t) override {
committed += current_batch;
current_batch = 0;
std::cout<<" [OnTxnCommitted] Committed:"<< committed<< std::endl;
if(committed == total) {
std::cout << "All messages committed";
// connection.close();
std::cout << "All messages committed" << std::endl;
connection.close();
}
else {
// current_batch = 0;
// container->declare_transaction(connection, transaction_handler);
container->declare_transaction(connection, *this);
}
}

void on_transaction_aborted(proton::transaction t) override {
std::cout << "Meesages Aborted ....." << std::endl;
current_batch = 0;
container->declare_transaction(connection, *this);
}

void on_sender_close(proton::sender &s) override {
current_batch = 0;
}
Expand All @@ -126,8 +153,8 @@ class tx_send : public proton::messaging_handler, proton::transaction_handler {

int main(int argc, char **argv) {
std::string address("127.0.0.1:5672/examples");
int message_count = 10;
int batch_size = 10;
int message_count = 6;
int batch_size = 3;
example::options opts(argc, argv);

opts.add_value(address, 'a', "address", "connect and send to URL", "URL");
Expand Down
25 changes: 16 additions & 9 deletions cpp/include/proton/transaction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class transaction_handler;
// TODO: This should not be accessible to users.
class transaction_impl {
public:
proton::sender *txn_ctrl = nullptr;
proton::sender txn_ctrl;
proton::transaction_handler *handler = nullptr;
proton::binary id;
proton::tracker _declare;
Expand All @@ -54,6 +54,11 @@ class transaction_impl {
proton::tracker send(proton::sender s, proton::message msg);

void discharge(bool failed);
void release_pending();
void accept(tracker &d);
void update(tracker &d, uint64_t state);
void set_id(binary _id);

proton::tracker send_ctrl(proton::symbol descriptor, proton::value _value);
void handle_outcome(proton::tracker t);
transaction_impl(proton::sender &_txn_ctrl,
Expand All @@ -67,19 +72,21 @@ class transaction_impl {

class
PN_CPP_CLASS_EXTERN transaction {
// private:
// PN_CPP_EXTERN transaction(proton::sender& _txn_ctrl,
// proton::transaction_handler& _handler, bool _settle_before_discharge);
private:
// PN_CPP_EXTERN transaction(proton::sender& _txn_ctrl,
// proton::transaction_handler& _handler, bool _settle_before_discharge);

static transaction mk_transaction_impl(sender &s, transaction_handler &h,
bool f);
PN_CPP_EXTERN transaction(transaction_impl *impl);
transaction_impl *_impl;

static transaction mk_transaction_impl(sender &s, transaction_handler &h,
bool f);
PN_CPP_EXTERN transaction(transaction_impl* impl);
public:
transaction_impl* _impl;
// TODO:
// TODO:
// PN_CPP_EXTERN transaction(transaction &o);
PN_CPP_EXTERN transaction();
PN_CPP_EXTERN ~transaction();
PN_CPP_EXTERN bool is_empty();
PN_CPP_EXTERN void commit();
PN_CPP_EXTERN void abort();
PN_CPP_EXTERN void declare();
Expand Down
43 changes: 31 additions & 12 deletions cpp/src/messaging_adapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,8 @@ void on_link_flow(messaging_handler& handler, pn_event_t* event) {
// TODO: process session flow data, if no link-specific data, just return.
if (!lnk) return;
int state = pn_link_state(lnk);
if (pn_terminus_get_type(pn_link_remote_target(lnk))==PN_COORDINATOR) {
std::cout << " on_link_flow, type: PN_COORDINATOR" << std::endl;
return;

}
if ((state&PN_LOCAL_ACTIVE) && (state&PN_REMOTE_ACTIVE)) {
if (pn_terminus_get_type(pn_link_remote_target(lnk)) == PN_COORDINATOR ||
((state & PN_LOCAL_ACTIVE) && (state & PN_REMOTE_ACTIVE))) {
link_context& lctx = link_context::get(lnk);
if (pn_link_is_sender(lnk)) {
if (pn_link_credit(lnk) > 0) {
Expand Down Expand Up @@ -123,13 +119,36 @@ void on_delivery(messaging_handler& handler, pn_event_t* event) {
Tracing& ot = Tracing::getTracing();
if (pn_terminus_get_type(pn_link_remote_target(lnk))==PN_COORDINATOR) {
// delivery d(make_wrapper<delivery>(dlv));
pn_disposition_t *disposition = pn_delivery_remote(dlv);
proton::value val(pn_disposition_data(disposition));
std::cout << " on_delivery: COOORINDATOR.. tracker: " << val
<< std::endl;
tracker t(make_wrapper<tracker>(dlv));
// pn_disposition_t *disposition = pn_delivery_remote(dlv);
// proton::value val(pn_disposition_data(disposition));
// std::cout << " on_delivery: COOORINDATOR.. tracker: " << val
// << std::endl;
// tracker t(make_wrapper<tracker>(dlv));
std::cout << " on_delivery: COOORINDATOR.. TRACKER MADE: "
<< std::endl;

if (pn_delivery_updated(dlv)) {
tracker t(make_wrapper<tracker>(dlv));
ot.on_settled_span(t);
switch (pn_delivery_remote_state(dlv)) {
case PN_ACCEPTED:
handler.on_tracker_accept(t);
break;
case PN_REJECTED:
handler.on_tracker_reject(t);
break;
case PN_RELEASED:
case PN_MODIFIED:
handler.on_tracker_release(t);
break;
}
if (t.settled()) {
handler.on_tracker_settle(t);
if (lctx.auto_settle)
t.settle();
}
}

// t.user_data = val; // not

// proton::disposition _disposition = make_wrapper(disposition); // #
Expand All @@ -140,7 +159,7 @@ void on_delivery(messaging_handler& handler, pn_event_t* event) {
// std::cout<< " on_delivery: COOORINDATOR with TXN IN :"
// << val2 << std::endl;

handler.on_tracker_settle(t);
// handler.on_tracker_settle(t);
} else if (pn_link_is_receiver(lnk)) {
delivery d(make_wrapper<delivery>(dlv));
if (pn_delivery_aborted(dlv)) {
Expand Down
33 changes: 5 additions & 28 deletions cpp/src/proactor_container_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -872,52 +872,29 @@ void container::impl::stop(const proton::error_condition& err) {
transaction container::impl::declare_transaction(proton::connection conn, proton::transaction_handler &handler, bool settle_before_discharge) {
class InternalTransactionHandler : public proton::messaging_handler {
// TODO: auto_settle

void on_tracker_settle(proton::tracker &t) override {
std::cout<<" [InternalTransactionHandler][on_tracker_settle] called with tracker.txn"
<< std::endl;
t.transaction().handle_outcome(t);

// t.user_data = val; // not

// proton::disposition _disposition = make_wrapper(disposition);
// // # t.remote();

// proton::value val2 = _disposition.data();

// proton::disposition _disposition = t.remote();

// proton::value val = _disposition.data();

// std::cout<< " declare_transaction: on_tracker_settle with
// TXN IN :" << val << std::endl;

// if(t.transaction()) {
// t.transaction().handle_outcome(t);
// }
if (!t.transaction().is_empty()) {
t.transaction().handle_outcome(t);
}
}

// TODO: Add on_unhandled function
};

// TODO: Sender should be created only once. (May be use Singleton Class)
// proton::target_options t;

proton::target_options t;
std::vector<symbol> cap = {proton::symbol("amqp:local-transactions")};
t.capabilities(cap);
t.type(PN_COORDINATOR);

proton::sender_options so;
so.name("txn-ctrl");
// Todo: Check the value, Or by deafult null?
//so.source() ?
so.target(t);
// TODO: FIX STATIC
static InternalTransactionHandler internal_handler; // internal_handler going out of scope. Fix it
so.handler(internal_handler);
std::cout<<" [declare_transaction] txn-name sender open with handler: " << &internal_handler << std::endl;

proton::sender s = conn.open_sender("does not matter", so);
static proton::sender s = conn.open_sender("does not matter", so);

settle_before_discharge = false;

Expand Down
Loading

0 comments on commit eb4514b

Please sign in to comment.