Skip to content

Commit

Permalink
Merge pull request #529 from dsplayerX/fix_524
Browse files Browse the repository at this point in the history
Fix transaction panics in endTransaction with "transaction not found"
  • Loading branch information
gimantha authored Jan 3, 2024
2 parents 2037cb9 + ee909bf commit b3bfec5
Show file tree
Hide file tree
Showing 4 changed files with 203 additions and 74 deletions.
4 changes: 2 additions & 2 deletions transaction-ballerina/Dependencies.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

[ballerina]
dependencies-toml-version = "2"
distribution-version = "2201.8.0-20230726-145300-b2bdf796"
distribution-version = "2201.8.0-20230908-135700-74a59dff"

[[package]]
org = "ballerina"
Expand Down Expand Up @@ -64,7 +64,7 @@ dependencies = [
[[package]]
org = "ballerina"
name = "http"
version = "2.10.0"
version = "2.10.3"
dependencies = [
{org = "ballerina", name = "auth"},
{org = "ballerina", name = "cache"},
Expand Down
151 changes: 90 additions & 61 deletions transaction-ballerina/commons.bal
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,14 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import ballerina/cache;
import ballerina/http;
import ballerina/lang.'transaction as lang_trx;
import ballerina/lang.'value as value;
import ballerina/log;
import ballerina/uuid;
import ballerina/task;
import ballerina/time;
import ballerina/lang.'transaction as lang_trx;
import ballerina/lang.'value as value;
import ballerina/uuid;

# ID of the local participant used when registering with the initiator.
string localParticipantId = uuid:createType4AsString();
Expand All @@ -30,7 +29,8 @@ string localParticipantId = uuid:createType4AsString();
map<TwoPhaseCommitTransaction> initiatedTransactions = {};

# This map is used for caching transaction that are this Ballerina instance participates in.
@tainted map<TwoPhaseCommitTransaction> participatedTransactions = {};
@tainted
map<TwoPhaseCommitTransaction> participatedTransactions = {};

# This cache is used for caching HTTP connectors against the URL, since creating connectors is expensive.
cache:Cache httpClientCache = new;
Expand All @@ -55,8 +55,8 @@ function cleanupTransactions() returns error? {
while (i < participatedTransactionsArr.length()) {
var twopcTxn = participatedTransactionsArr[i];
i += 1;
//TODO: commenting due to a caching issue
//foreach var twopcTxn in participatedTransactions {
//TODO: commenting due to a caching issue
//foreach var twopcTxn in participatedTransactions {
final string participatedTxnId = getParticipatedTransactionId(twopcTxn.transactionId,
twopcTxn.transactionBlockId);
if (time:utcDiffSeconds(time:utcNow(), twopcTxn.createdTime) >= 120d) {
Expand Down Expand Up @@ -85,20 +85,23 @@ function cleanupTransactions() returns error? {
}
}
}
if (time:utcDiffSeconds(time:utcNow(), twopcTxn.createdTime) >= <decimal> 600) {
if (time:utcDiffSeconds(time:utcNow(), twopcTxn.createdTime) >= <decimal>600) {
// We don't want dead transactions hanging around
removeParticipatedTransaction(participatedTxnId);
}
}
}
worker w2 returns () {
TwoPhaseCommitTransaction[] initiatedTransactionsArr = initiatedTransactions.toArray();
TwoPhaseCommitTransaction[] initiatedTransactionsArr;
lock {
initiatedTransactionsArr = initiatedTransactions.toArray();
}
int i = 0;
while(i < initiatedTransactionsArr.length()) {
while (i < initiatedTransactionsArr.length()) {
var twopcTxn = initiatedTransactionsArr[i];
i += 1;
//TODO:commenting due to a caching issue
//foreach var twopcTxn in initiatedTransactions {
//TODO:commenting due to a caching issue
//foreach var twopcTxn in initiatedTransactions {
if (time:utcDiffSeconds(time:utcNow(), twopcTxn.createdTime) >= <decimal>120) {
if (twopcTxn.state != TXN_STATE_ABORTED) {
// Commit the transaction since prepare hasn't been received
Expand All @@ -114,7 +117,7 @@ function cleanupTransactions() returns error? {
}
}
}
if (time:utcDiffSeconds(time:utcNow(), twopcTxn.createdTime) >= <decimal> 600) {
if (time:utcDiffSeconds(time:utcNow(), twopcTxn.createdTime) >= <decimal>600) {
// We don't want dead transactions hanging around
removeInitiatedTransaction(twopcTxn.transactionId);
}
Expand All @@ -125,7 +128,6 @@ function cleanupTransactions() returns error? {
return value;
}


function isRegisteredParticipant(string participantId, map<Participant> participants) returns boolean {
return participants.hasKey(participantId);
}
Expand All @@ -135,8 +137,8 @@ function isValidCoordinationType(string coordinationType) returns boolean {
while (i < coordinationTypes.length()) {
var coordType = coordinationTypes[i];
i += 1;
//TODO:commenting due to caching issue;
//foreach var coordType in coordinationTypes {
//TODO:commenting due to caching issue;
//foreach var coordType in coordinationTypes {
if (coordinationType == coordType) {
return true;
}
Expand All @@ -148,27 +150,27 @@ function protoName(UProtocol p) returns string {
if (p is RemoteProtocol) {
return p.name;
} else {
return <string> p.name;
return <string>p.name;
}
}

function protocolCompatible(string coordinationType, UProtocol?[] participantProtocols) returns boolean {
boolean participantProtocolIsValid = false;
string[] validProtocols = coordinationTypeToProtocolsMap[coordinationType] ?: [];
int i = 0;
while ( i < participantProtocols.length()) {
while (i < participantProtocols.length()) {
var p = participantProtocols[i];
i += 1;
//TODO: commenting due to a caching issue
//foreach var p in participantProtocols {
//TODO: commenting due to a caching issue
//foreach var p in participantProtocols {
if (p is UProtocol) {
UProtocol participantProtocol = p;
int j = 0;
while (j < validProtocols.length()) {
var validProtocol = validProtocols[j];
j += 1;
//TODO: commenting due to a caching issue
//foreach var validProtocol in validProtocols {
//TODO: commenting due to a caching issue
//foreach var validProtocol in validProtocols {
if (protoName(participantProtocol) == validProtocol) {
participantProtocolIsValid = true;
break;
Expand All @@ -188,11 +190,12 @@ type JsonTypedesc typedesc<json>;

function respondToBadRequest(http:Caller ep, string msg) {
log:printError(msg);
http:Response res = new; res.statusCode = http:STATUS_BAD_REQUEST;
RequestError requestError = {errorMessage:msg};
http:Response res = new;
res.statusCode = http:STATUS_BAD_REQUEST;
RequestError requestError = {errorMessage: msg};
var resPayload = requestError.cloneWithType(JsonTypedesc);
if (resPayload is json) {
res.setJsonPayload(<@untainted json> resPayload);
res.setJsonPayload(<@untainted json>resPayload);
var resResult = ep->respond(res);
if (resResult is error) {
log:printError("Could not send Bad Request error response to caller", 'error = resResult);
Expand Down Expand Up @@ -220,7 +223,7 @@ function getParticipantProtocolAt(string protocolName, string transactionBlockId
# corresponding to the coordinationType will also be created and stored as an initiated transaction.
#
# + coordinationType - The type of the coordination relevant to the transaction block for which this TransactionContext
# is being created for.
# is being created for.
# + transactionBlockId - The ID of the transaction block.
# + return - TransactionContext if the coordination type is valid or an error in case of an invalid coordination type.
function createTransactionContext(string coordinationType, string transactionBlockId) returns TransactionContext|error {
Expand All @@ -229,15 +232,17 @@ function createTransactionContext(string coordinationType, string transactionBlo
error err = error(msg);
return err;
} else {
TwoPhaseCommitTransaction txn = new(uuid(), transactionBlockId, coordinationType = coordinationType);
TwoPhaseCommitTransaction txn = new (uuid(), transactionBlockId, coordinationType = coordinationType);
string txnId = txn.transactionId;
txn.isInitiated = true;
initiatedTransactions[txnId] = txn;
lock {
initiatedTransactions[txnId] = txn;
}
TransactionContext txnContext = {
transactionId:txnId,
transactionBlockId:transactionBlockId,
coordinationType:coordinationType,
registerAtURL:"http://" + value:toString(coordinatorHost) + ":" + value:toString(coordinatorPort) +
transactionId: txnId,
transactionBlockId: transactionBlockId,
coordinationType: coordinationType,
registerAtURL: "http://" + value:toString(coordinatorHost) + ":" + value:toString(coordinatorPort) +
initiatorCoordinatorBasePath + "/" + transactionBlockId + registrationPath
};
return txnContext;
Expand All @@ -254,38 +259,46 @@ function createTransactionContext(string coordinationType, string transactionBlo
# + return - TransactionContext if the registration is successul or an error in case of a failure.
function registerLocalParticipantWithInitiator(string transactionId, string transactionBlockId, string registerAtURL)
returns TransactionContext|error {

final string trxId = transactionId;
final string participantId = getParticipantId(transactionBlockId);
//TODO: Protocol name should be passed down from the transaction statement
LocalProtocol participantProtocol = {name:PROTOCOL_DURABLE};
var initiatedTxn = initiatedTransactions[transactionId];
LocalProtocol participantProtocol = {name: PROTOCOL_DURABLE};
TwoPhaseCommitTransaction? initiatedTxn;
lock {
initiatedTxn = initiatedTransactions[transactionId];
}
if (initiatedTxn is ()) {
return error lang_trx:Error("Transaction-Unknown. Invalid TID:" + transactionId);
} else {
if (isRegisteredParticipant(participantId, initiatedTxn.participants)) { // Already-Registered
log:printDebug("Already-Registered. TID:" + trxId + ", participant ID:" + participantId);
TransactionContext txnCtx = {
transactionId:transactionId, transactionBlockId:transactionBlockId,
coordinationType:TWO_PHASE_COMMIT, registerAtURL:registerAtURL
transactionId: transactionId,
transactionBlockId: transactionBlockId,
coordinationType: TWO_PHASE_COMMIT,
registerAtURL: registerAtURL
};
return txnCtx;
} else if (!protocolCompatible(initiatedTxn.coordinationType, [participantProtocol])) { // Invalid-Protocol
return error lang_trx:Error("Invalid-Protocol in local participant. TID:" + transactionId + ",participantID:" +
participantId);
} else {
//Set initiator protocols
TwoPhaseCommitTransaction participatedTxn = new(transactionId, transactionBlockId);
TwoPhaseCommitTransaction participatedTxn = new (transactionId, transactionBlockId);
//Protocol initiatorProto = {name: PROTOCOL_DURABLE, transactionBlockId:transactionBlockId};
//participatedTxn.coordinatorProtocols = [initiatorProto];

LocalParticipant participant = new(participantId, participatedTxn, [participantProtocol]);
LocalParticipant participant = new (participantId, participatedTxn, [participantProtocol]);
initiatedTxn.participants[participantId] = participant;

string participatedTxnId = getParticipatedTransactionId(transactionId, transactionBlockId);
participatedTransactions[participatedTxnId] = participatedTxn;
TransactionContext txnCtx = {transactionId:transactionId, transactionBlockId:transactionBlockId,
coordinationType:TWO_PHASE_COMMIT, registerAtURL:registerAtURL};
TransactionContext txnCtx = {
transactionId: transactionId,
transactionBlockId: transactionBlockId,
coordinationType: TWO_PHASE_COMMIT,
registerAtURL: registerAtURL
};
log:printDebug("Registered local participant: " + participantId + " for transaction:" + trxId);
return txnCtx;
}
Expand All @@ -299,24 +312,34 @@ function removeParticipatedTransaction(string participatedTxnId) {
}
}

function hasInitiatedTransaction(string transactionId) returns boolean {
lock {
return initiatedTransactions.hasKey(transactionId);
}
}

function removeInitiatedTransaction(string transactionId) {
var removed = trap initiatedTransactions.remove(transactionId);
if (removed is error) {
panic error lang_trx:Error("Removing initiated transaction: " + transactionId + " failed");
lock {
var removed = trap initiatedTransactions.remove(transactionId);
if (removed is error) {
panic error lang_trx:Error("Removing initiated transaction: " + transactionId + " failed");
}
}
}

function getInitiatorClient(string registerAtURL) returns InitiatorClientEP {
InitiatorClientEP initiatorEP;
if (httpClientCache.hasKey(registerAtURL)) {
return <InitiatorClientEP> checkpanic httpClientCache.get(registerAtURL);
return <InitiatorClientEP>checkpanic httpClientCache.get(registerAtURL);
} else {
lock {
if (httpClientCache.hasKey(registerAtURL)) {
return <InitiatorClientEP> checkpanic httpClientCache.get(registerAtURL);
return <InitiatorClientEP>checkpanic httpClientCache.get(registerAtURL);
}
initiatorEP = new({ registerAtURL: registerAtURL, timeout: 15,
retryConfig: { count: 2, interval: 5 }
initiatorEP = new ({
registerAtURL: registerAtURL,
timeout: 15,
retryConfig: {count: 2, interval: 5}
});
cache:Error? result = httpClientCache.put(registerAtURL, initiatorEP);
if (result is cache:Error) {
Expand All @@ -330,15 +353,17 @@ function getInitiatorClient(string registerAtURL) returns InitiatorClientEP {

function getParticipant2pcClient(string participantURL) returns Participant2pcClientEP {
Participant2pcClientEP participantEP;
if (httpClientCache.hasKey(<@untainted> participantURL)) {
return <Participant2pcClientEP> checkpanic httpClientCache.get(<@untainted>participantURL);
if (httpClientCache.hasKey(<@untainted>participantURL)) {
return <Participant2pcClientEP>checkpanic httpClientCache.get(<@untainted>participantURL);
} else {
lock {
if (httpClientCache.hasKey(<@untainted> participantURL)) {
return <Participant2pcClientEP> checkpanic httpClientCache.get(<@untainted>participantURL);
if (httpClientCache.hasKey(<@untainted>participantURL)) {
return <Participant2pcClientEP>checkpanic httpClientCache.get(<@untainted>participantURL);
}
participantEP = new({ participantURL: participantURL,
timeout: 15, retryConfig: { count: 2, interval: 5 }
participantEP = new ({
participantURL: participantURL,
timeout: 15,
retryConfig: {count: 2, interval: 5}
});
cache:Error? result = httpClientCache.put(participantURL, participantEP);
if (result is cache:Error) {
Expand All @@ -352,13 +377,13 @@ function getParticipant2pcClient(string participantURL) returns Participant2pcCl

# Registers a participant with the initiator's coordinator. This function will be called by the participant.
#
# + transactionId - Global transaction ID to which this participant is registering with.
# + transactionId - Global transaction ID to which this participant is registering with.
# + transactionBlockId - The local ID of the transaction block on the participant.
# + registerAtURL - The URL of the coordinator.
# + participantProtocols - The coordination protocals supported by the participant.
# + return - TransactionContext if the registration is successful or an error in case of a failure.
function registerParticipantWithRemoteInitiator(string transactionId, string transactionBlockId,
string registerAtURL, RemoteProtocol[] participantProtocols)
string registerAtURL, RemoteProtocol[] participantProtocols)
returns TransactionContext|error {

InitiatorClientEP initiatorEP = getInitiatorClient(registerAtURL);
Expand All @@ -368,8 +393,10 @@ function registerParticipantWithRemoteInitiator(string transactionId, string tra
if (participatedTransactions.hasKey(participatedTxnId)) {
log:printDebug("Already registered with initiator for transaction:" + participatedTxnId);
TransactionContext txnCtx = {
transactionId:transactionId, transactionBlockId:transactionBlockId,
coordinationType:TWO_PHASE_COMMIT, registerAtURL:registerAtURL
transactionId: transactionId,
transactionBlockId: transactionBlockId,
coordinationType: TWO_PHASE_COMMIT,
registerAtURL: registerAtURL
};
return txnCtx;
}
Expand All @@ -385,12 +412,14 @@ function registerParticipantWithRemoteInitiator(string transactionId, string tra
return error lang_trx:Error(msg);
} else {
RemoteProtocol[] coordinatorProtocols = result.coordinatorProtocols;
TwoPhaseCommitTransaction twopcTxn = new(transactionId, transactionBlockId);
TwoPhaseCommitTransaction twopcTxn = new (transactionId, transactionBlockId);
twopcTxn.coordinatorProtocols = toProtocolArray(coordinatorProtocols);
participatedTransactions[participatedTxnId] = twopcTxn;
TransactionContext txnCtx = {
transactionId:transactionId, transactionBlockId:transactionBlockId,
coordinationType:TWO_PHASE_COMMIT, registerAtURL:registerAtURL
transactionId: transactionId,
transactionBlockId: transactionBlockId,
coordinationType: TWO_PHASE_COMMIT,
registerAtURL: registerAtURL
};
final string trxId = transactionId;
log:printDebug("Registered with coordinator for transaction: " + trxId);
Expand Down
Loading

0 comments on commit b3bfec5

Please sign in to comment.