diff --git a/pom.xml b/pom.xml
index 6c3d16ce..053b4b46 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,7 +22,7 @@
   <description>Salesforce plugins</description>
   <groupId>io.cdap.plugin</groupId>
   <artifactId>salesforce-plugins</artifactId>
-  <version>1.6.8-SNAPSHOT</version>
+  <version>1.6.8</version>
   <packaging>jar</packaging>
   <name>Salesforce Plugins</name>
   <url>https://github.com/data-integrations/salesforce</url>
diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceWideRecordReader.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceWideRecordReader.java
index cb57385c..e7eaec5c 100644
--- a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceWideRecordReader.java
+++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceWideRecordReader.java
@@ -70,7 +70,10 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
 
   public SalesforceWideRecordReader initialize(
     InputSplit inputSplit, AuthenticatorCredentials credentials) throws IOException, InterruptedException {
-    List<Map<String, ?>> fetchedIdList = fetchBulkQueryIds(inputSplit, null);
+    // Use default configurations of BulkRecordReader.
+    super.initialize(inputSplit, credentials);
+
+    List<Map<String, ?>> fetchedIdList = fetchBulkQueryIds();
     LOG.debug("Number of records received from batch job for wide object: '{}'", fetchedIdList.size());
 
     try {
@@ -84,12 +87,18 @@ public SalesforceWideRecordReader initialize(
         Lists.partition(fetchedIdList, SalesforceSourceConstants.WIDE_QUERY_MAX_BATCH_COUNT);
       LOG.debug("Number of partitions to be fetched for wide object: '{}'", partitions.size());
 
+      // Process partitions with batches sized to adhere to API limits and optimize memory usage.
+      // [CDAP]TODO: Address issues while handling large datasets.
       results = partitions.parallelStream()
-        .map(this::getSObjectIds)
-        .map(sObjectIds -> fetchPartition(partnerConnection, fields, sObjectName, sObjectIds))
-        .flatMap(Arrays::stream)
-        .map(sObject -> transformer.transformToMap(sObject, sObjectDescriptor))
-        .collect(Collectors.toList());
+        .flatMap(partition -> processPartition(partnerConnection, fields, sObjectName,
+          partition, sObjectDescriptor).stream())
+        .collect(Collectors.toList());
+
+      if (results == null) {
+        LOG.warn("Result list is null after processing partitions.");
+        results = new ArrayList<>();
+      }
+
       return this;
     } catch (ConnectionException e) {
       String errorMessage = SalesforceConnectionUtil.getSalesforceErrorMessageFromException(e);
@@ -123,15 +132,10 @@ public float getProgress() {
   /**
    * Fetches single entry map (Id -> SObjectId_value) values received from Bulk API.
    *
-   * @param inputSplit specifies batch details
-   * @param taskAttemptContext task context
-   * @return list of single entry Map
    * @throws IOException can be due error during reading query
-   * @throws InterruptedException interrupted sleep while waiting for batch results
    */
-  private List<Map<String, ?>> fetchBulkQueryIds(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
-    throws IOException, InterruptedException {
-    super.initialize(inputSplit, taskAttemptContext);
+  private List<Map<String, ?>> fetchBulkQueryIds()
+    throws IOException {
     List<Map<String, ?>> fetchedIdList = new ArrayList<>();
     while (super.nextKeyValue()) {
       fetchedIdList.add(super.getCurrentValue());
@@ -181,4 +185,37 @@ private SObject[] fetchPartition(PartnerConnection partnerConnection, String fie
         e);
     }
   }
+
+  /**
+   * Processes a partition of SObject records by dividing the IDs into smaller batches,
+   * retrieving the corresponding records from Salesforce, and transforming them into maps.
+   *
+   * @param partnerConnection the Salesforce partner connection used for retrieving data.
+   * @param fields the fields to be retrieved for each SObject.
+   * @param sObjectName the name of the Salesforce object (e.g., Account, Lead).
+   * @param partition the partition containing the ID records to be processed.
+   * @param sObjectDescriptor descriptor containing the structure of the SObject.
+   * @return result from partitions
+   */
+  private List<Map<String, ?>> processPartition(PartnerConnection partnerConnection, String fields, String sObjectName,
+                                                List<Map<String, ?>> partition, SObjectDescriptor sObjectDescriptor) {
+    List<Map<String, ?>> partitionResults = new ArrayList<>();
+    // Divide the list of SObject Ids into smaller batches to avoid exceeding retrieve id limits.
+
+    /* see more - https://developer.salesforce.com/docs/atlas.en-us.salesforce_app_limits_cheatsheet.meta/
+       salesforce_app_limits_cheatsheet/salesforce_app_limits_platform_apicalls.htm */
+    List<List<String>> idBatches = Lists.partition(
+      Arrays.asList(getSObjectIds(partition)), SalesforceSourceConstants.RETRIEVE_MAX_BATCH_COUNT);
+
+    // Iterate over each batch of Ids to fetch the records.
+    idBatches.forEach(idBatch -> {
+      SObject[] fetchedObjects = fetchPartition(
+        partnerConnection, fields, sObjectName, idBatch.toArray(new String[0]));
+      Arrays.stream(fetchedObjects)
+        .map(sObject -> transformer.transformToMap(sObject, sObjectDescriptor))
+        .forEach(partitionResults::add);
+    });
+
+    return partitionResults;
+  }
 }
diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSourceConstants.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSourceConstants.java
index 6967de85..3af38f5a 100644
--- a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSourceConstants.java
+++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/util/SalesforceSourceConstants.java
@@ -59,6 +59,7 @@ public class SalesforceSourceConstants {
   public static final String CONFIG_RETRY_REQUIRED = "mapred.salesforce.retryOnBackendError";
 
   public static final int WIDE_QUERY_MAX_BATCH_COUNT = 2000;
+  public static final int RETRIEVE_MAX_BATCH_COUNT = 2000;
   // https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/
   // async_api_headers_enable_pk_chunking.htm
   public static final int MAX_PK_CHUNK_SIZE = 250000;
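
Note on the batching constant: the SOAP `retrieve()` call accepts at most 2,000 record IDs per request (see the limits cheat sheet linked in `processPartition`), which is presumably why `RETRIEVE_MAX_BATCH_COUNT` is set to 2000. Below is a minimal, self-contained sketch of the same `Lists.partition` pattern the new method relies on; the class name and sample IDs are hypothetical, and only Guava (already a plugin dependency) is assumed.

```java
import com.google.common.collect.Lists;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; not part of the plugin source.
public class RetrieveBatchingSketch {

  // Mirrors SalesforceSourceConstants.RETRIEVE_MAX_BATCH_COUNT from this diff.
  private static final int RETRIEVE_MAX_BATCH_COUNT = 2000;

  public static void main(String[] args) {
    // Stand-ins for the record IDs pulled out of the Bulk API results.
    List<String> ids = new ArrayList<>();
    for (int i = 0; i < 4500; i++) {
      ids.add("ID-" + i);
    }

    // Lists.partition returns consecutive sub-lists of the requested size;
    // only the final sub-list may be smaller (here: 2000, 2000, 500).
    List<List<String>> idBatches = Lists.partition(ids, RETRIEVE_MAX_BATCH_COUNT);

    for (List<String> idBatch : idBatches) {
      // In the plugin, each batch becomes the String[] argument to fetchPartition(...).
      String[] batch = idBatch.toArray(new String[0]);
      System.out.println("Would call retrieve() with " + batch.length + " IDs");
    }
  }
}
```

Each partition of up to `WIDE_QUERY_MAX_BATCH_COUNT` Bulk API rows is processed this way, and `initialize` then flattens the per-partition results back into a single list via `flatMap`.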