Skip to content

Commit

Permalink
fix(dvs-client): missing fields in callback 4
Browse files Browse the repository at this point in the history
  • Loading branch information
vincejv committed Aug 28, 2023
1 parent a48ee67 commit cb4a197
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 41 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*************************************************************************
* FPI Application - Abavilla *
* Copyright (C) 2023 Vince Jerald Villamora *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <https://www.gnu.org/licenses/>.*
*************************************************************************/

package com.abavilla.fpi.load.codec;

import java.util.List;

import com.dtone.dvs.dto.Benefit;
import com.dtone.dvs.dto.BenefitFixed;
import com.dtone.dvs.dto.BenefitRanged;
import com.dtone.dvs.dto.Product;
import com.dtone.dvs.dto.ProductFixed;
import com.dtone.dvs.dto.ProductPrice;
import com.dtone.dvs.dto.ProductPriceFixed;
import com.dtone.dvs.dto.ProductPriceRanged;
import com.dtone.dvs.dto.ProductRanged;
import com.dtone.dvs.dto.ProductSource;
import com.dtone.dvs.dto.Source;
import com.dtone.dvs.dto.SourceFixed;
import com.dtone.dvs.dto.SourceRanged;
import com.dtone.dvs.dto.Transaction;
import com.dtone.dvs.dto.TransactionFixed;
import com.dtone.dvs.dto.TransactionRanged;
import org.bson.codecs.Codec;
import org.bson.codecs.configuration.CodecProvider;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.codecs.pojo.ClassModel;
import org.bson.codecs.pojo.PojoCodecProvider;

/**
* MongoDB Codec registry, contains the codec for saving DVS API Objects to mongodb.
*
* @author <a href="mailto:vincevillamora@gmail.com">Vince Villamora</a>
*/
public class DvsCodecProvider implements CodecProvider {

@SuppressWarnings("rawtypes")
private static final List<Class> discriminatorClasses;

static {
discriminatorClasses = List.of(
Transaction.class, TransactionRanged.class, TransactionFixed.class,
Product.class, ProductRanged.class, ProductFixed.class,
ProductPrice.class, ProductPriceRanged.class, ProductPriceFixed.class,
ProductSource.class, Source.class, SourceRanged.class, SourceFixed.class,
Benefit.class, BenefitFixed.class, BenefitRanged.class
);
}

@Override
public <T> Codec<T> get(Class<T> clazz, CodecRegistry registry) {
if (discriminatorClasses.contains(clazz)) {
return buildDiscriminatorCodec(clazz, registry);
}
return null; // Don't throw here, this tells
}

private static <T> Codec<T> buildDiscriminatorCodec(Class<T> clazz, CodecRegistry registry) {
ClassModel<T> discriminatorModel = ClassModel.builder(clazz)
.enableDiscriminator(true).build();
return PojoCodecProvider.builder()
.register(discriminatorModel)
.build().get(clazz, registry);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -109,17 +109,18 @@ public Uni<Void> storeCallback(Transaction dvsCallbackTransaction) {
private Uni<Void> storeCallback(AbsMongoItem callbackResponse, ApiStatus status,
String provider, Long transactionId) {
var byTransId = advRepo.findByRespTransIdAndProvider(
String.valueOf(transactionId), provider);
String.valueOf(transactionId), provider);

byTransId.chain(transPulled -> checkIfTxExists(transPulled.orElse(null), transactionId))
.onFailure(ApiSvcEx.class).retry().withBackOff(
Duration.ofSeconds(3)).withJitter(0.2)
Duration.ofSeconds(3)).withJitter(0.2)
.atMost(5) // Retry for item not found and nothing else
.chain(transFound -> updateTransWithCallback(transFound, callbackResponse, status))
.chain(updatedTrans -> sendLoaderAckMsg(updatedTrans, callbackResponse, status))
.chain(updatedTrans -> sendFPIAckMsg(updatedTrans, callbackResponse, status)).onFailure()
.call(ex -> saveCallbackAsLeak(ex, callbackResponse, transactionId))
.subscribe().with(ignored->{});
.subscribe().with(ignored -> {
});

return Uni.createFrom().voidItem();
}
Expand All @@ -128,23 +129,23 @@ private Uni<Void> storeCallback(AbsMongoItem callbackResponse, ApiStatus status,
* Checks if rewards transaction have been logged in the db, if not, throw an {@link ApiSvcEx} exception.
*
* @param rewardsTransStatus Entity in DB found
* @param transactionId External transaction id
* @param transactionId External transaction id
* @return @return {@link Function} callback
*/
private Uni<? extends RewardsTransStatus> checkIfTxExists(RewardsTransStatus rewardsTransStatus, Long transactionId) {
if (rewardsTransStatus != null) {
return Uni.createFrom().item(rewardsTransStatus);
} else {
throw new ApiSvcEx("Trans Id for rewards callback not found: " + transactionId);
}
if (rewardsTransStatus != null) {
return Uni.createFrom().item(rewardsTransStatus);
} else {
throw new ApiSvcEx("Trans Id for rewards callback not found: " + transactionId);
}
}

/**
* Updates the rewards transaction with the callback status.
*
* @param rewardsTrans Rewards transaction
* @param field Callback status
* @param status Status of transaction
* @param field Callback status
* @param status Status of transaction
* @return {@link Function} callback
*/
private Uni<? extends RewardsTransStatus> updateTransWithCallback(RewardsTransStatus rewardsTrans, AbsMongoItem field,
Expand All @@ -161,24 +162,24 @@ private Uni<? extends RewardsTransStatus> updateTransWithCallback(RewardsTransSt
/**
* Logs the load transaction as a failed in the database
*
* @param ex Failed transaction
* @param field Load transaction
* @param ex Failed transaction
* @param field Load transaction
* @param transactionId External transaction id
* @return {@link Function} callback
*/
private Uni<?> saveCallbackAsLeak(Throwable ex, AbsMongoItem field, Long transactionId) {
Log.error("Rewards leak " + transactionId, ex);
field.setDateCreated(DateUtil.now());
field.setDateUpdated(DateUtil.now());
return leakRepo.persist(field)
.onFailure().recoverWithNull();
Log.error("Rewards leak " + transactionId, ex);
field.setDateCreated(DateUtil.now());
field.setDateUpdated(DateUtil.now());
return leakRepo.persist(field)
.onFailure().recoverWithNull();
}

/**
* Determines if transaction requires an acknowledgement message for successful load transfer
*
* @param rewardsTransStatus Rewards transaction
* @param status Load status
* @param status Load status
* @return {@link Function} callback
*/
private Uni<?> sendFPIAckMsg(RewardsTransStatus rewardsTransStatus, AbsMongoItem callbackResponse,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,14 @@ public Uni<LoadRespDto> reloadNumber(LoadReqDto loadReqDto) {

return skuLookup.chain(promo -> {
ILoadProviderSvc loadSvc = promo
.map(promoSku -> loadEngine.getProvider(promoSku))
.orElse(null);
.map(promoSku -> loadEngine.getProvider(promoSku))
.orElse(null);

if (loadSvc != null) {
log.setLoadProvider(loadSvc.getProviderName());
log.setDateUpdated(DateUtil.now());
return repo.persist(log)
.chain(savedLog -> reloadAndUpdateDb(savedLog, loadReqDto, promo.get(), loadSvc));
.chain(savedLog -> reloadAndUpdateDb(savedLog, loadReqDto, promo.get(), loadSvc));
} else {
return buildRejectedResponse();
}
Expand All @@ -92,7 +92,7 @@ private Uni<? extends LoadRespDto> reloadAndUpdateDb(RewardsTransStatus savedLog
PromoSku promo, ILoadProviderSvc loadSvcProvider) {
loadReqDto.setTransactionId(savedLog.getId().toString()); // map mongo id to load request
return loadSvcProvider.reload(loadReqDto, promo)
.chain(resp -> updateRequestInDb(resp, savedLog));
.chain(resp -> updateRequestInDb(resp, savedLog));
}

private Uni<LoadRespDto> buildRejectedResponse() {
Expand All @@ -104,25 +104,22 @@ private Uni<LoadRespDto> buildRejectedResponse() {
return Uni.createFrom().failure(ex);
}

private Uni<? extends LoadRespDto> updateRequestInDb(
LoadRespDto loadRespDto,
RewardsTransStatus logEntity) {
rewardsMapper.mapLoadRespDtoToEntity(
loadRespDto, logEntity);

if (loadRespDto.getStatus() == ApiStatus.WAIT ||
loadRespDto.getStatus() == ApiStatus.CREATED) {
// only generate a load sms id if there was no error encountered
// during load transaction during external load api call
logEntity.setLoadSmsId(LoadUtil.encodeId(logEntity.getLoadProvider(), logEntity.getTransactionId()));
}
private Uni<? extends LoadRespDto> updateRequestInDb(LoadRespDto loadRespDto, RewardsTransStatus logEntity) {
rewardsMapper.mapLoadRespDtoToEntity(loadRespDto, logEntity);
if (loadRespDto.getStatus() == ApiStatus.WAIT ||
loadRespDto.getStatus() == ApiStatus.CREATED) {
// only generate a load sms id if there was no error encountered
// during load transaction during external load api call
logEntity.setLoadSmsId(LoadUtil.encodeId(logEntity.getLoadProvider(), logEntity.getTransactionId()));
}

logEntity.setDateUpdated(DateUtil.now());
return repo.update(logEntity)
.map(res -> {
loadRespDto.setSmsTransactionId(res.getLoadSmsId());
return loadRespDto;
});
logEntity.setDateUpdated(DateUtil.now());
return repo.update(logEntity)
.map(res -> {
Log.info("saved logEntity: " + res);
loadRespDto.setSmsTransactionId(res.getLoadSmsId());
return loadRespDto;
});
}

}

0 comments on commit cb4a197

Please sign in to comment.