Upgrading to release version 1.6.8 #267

Merged (2 commits, Oct 23, 2024)
pom.xml (2 changes: 1 addition & 1 deletion)

@@ -22,7 +22,7 @@
   <name>Salesforce plugins</name>
   <groupId>io.cdap.plugin</groupId>
   <artifactId>salesforce-plugins</artifactId>
-  <version>1.6.8-SNAPSHOT</version>
+  <version>1.6.8</version>
   <packaging>jar</packaging>
   <description>Salesforce Plugins</description>
   <url>https://github.com/data-integrations/salesforce</url>
SalesforceWideRecordReader.java

@@ -70,7 +70,10 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
   public SalesforceWideRecordReader initialize(
     InputSplit inputSplit, AuthenticatorCredentials credentials)
     throws IOException, InterruptedException {
-    List<Map<String, ?>> fetchedIdList = fetchBulkQueryIds(inputSplit, null);
+    // Use default configurations of BulkRecordReader.
+    super.initialize(inputSplit, credentials);
+
+    List<Map<String, ?>> fetchedIdList = fetchBulkQueryIds();
     LOG.debug("Number of records received from batch job for wide object: '{}'", fetchedIdList.size());

     try {
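
Note on the hunk above: `super.initialize(inputSplit, credentials)` is now called exactly once in `initialize` itself, and `fetchBulkQueryIds` (simplified in a later hunk) only drains the parent reader's key/value stream. A minimal standalone sketch of that initialize-then-drain shape, using hypothetical stub classes rather than the plugin's real hierarchy:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the parent bulk reader (BulkRecordReader in the diff):
// initialize() prepares a stream of ID records; nextKeyValue()/getCurrentValue() walk it.
class StubBulkIdReader {
  private Iterator<Map<String, ?>> iterator;

  void initialize(List<Map<String, ?>> source) {
    iterator = source.iterator(); // stands in for setting up the bulk ID query
  }

  boolean nextKeyValue() {
    return iterator.hasNext();
  }

  Map<String, ?> getCurrentValue() {
    return iterator.next();
  }
}

class StubWideReader extends StubBulkIdReader {
  @Override
  void initialize(List<Map<String, ?>> source) {
    // As in the hunk above: delegate setup to the parent exactly once...
    super.initialize(source);
    // ...then drain the parent's stream of IDs. The drain helper no longer
    // performs any initialization itself, unlike the pre-change fetchBulkQueryIds.
    List<Map<String, ?>> fetchedIdList = drainIds();
    System.out.println("IDs drained: " + fetchedIdList.size());
  }

  private List<Map<String, ?>> drainIds() {
    List<Map<String, ?>> out = new ArrayList<>();
    while (super.nextKeyValue()) {
      out.add(super.getCurrentValue());
    }
    return out;
  }
}
```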
@@ -84,12 +87,18 @@ public SalesforceWideRecordReader initialize(
         Lists.partition(fetchedIdList, SalesforceSourceConstants.WIDE_QUERY_MAX_BATCH_COUNT);
     LOG.debug("Number of partitions to be fetched for wide object: '{}'", partitions.size());

+    // Process partitions with batches sized to adhere to API limits and optimize memory usage.
+    // [CDAP]TODO: Address issues while handling large datasets.
     results = partitions.parallelStream()
-        .map(this::getSObjectIds)
-        .map(sObjectIds -> fetchPartition(partnerConnection, fields, sObjectName, sObjectIds))
-        .flatMap(Arrays::stream)
-        .map(sObject -> transformer.transformToMap(sObject, sObjectDescriptor))
-        .collect(Collectors.toList());
+        .flatMap(partition -> processPartition(partnerConnection, fields, sObjectName,
+            partition, sObjectDescriptor).stream())
+        .collect(Collectors.toList());
+
+    if (results == null) {
+      LOG.warn("Result list is null after processing partitions.");
+      results = new ArrayList<>();
+    }
+
     return this;
   } catch (ConnectionException e) {
     String errorMessage = SalesforceConnectionUtil.getSalesforceErrorMessageFromException(e);
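
As an aside, the refactored chain is an instance of a standard Guava batching pattern: partition a large list, process each partition (here in parallel), and flatten the per-partition results back into one list. A self-contained sketch under toy assumptions; the batch size of 4 and the fake `fetchBatch` helper stand in for `WIDE_QUERY_MAX_BATCH_COUNT` and the real fetch-and-transform step:

```java
import com.google.common.collect.Lists;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class PartitionFlattenDemo {
  public static void main(String[] args) {
    // Ten fake record IDs; the real reader works with Salesforce ID maps.
    List<Integer> ids = IntStream.rangeClosed(1, 10).boxed().collect(Collectors.toList());

    // Split into batches of at most 4 (WIDE_QUERY_MAX_BATCH_COUNT plays this role above).
    List<List<Integer>> partitions = Lists.partition(ids, 4);

    // Fetch each batch in parallel and flatten the per-batch results,
    // mirroring the parallelStream().flatMap(...).collect(...) chain in the diff.
    List<String> results = partitions.parallelStream()
        .flatMap(batch -> fetchBatch(batch).stream())
        .collect(Collectors.toList());

    System.out.println(results.size() + " records: " + results);
  }

  // Toy stand-in for the per-partition fetch-and-transform step.
  private static List<String> fetchBatch(List<Integer> batch) {
    return batch.stream().map(id -> "record-" + id).collect(Collectors.toList());
  }
}
```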
@@ -123,15 +132,10 @@ public float getProgress() {
   /**
    * Fetches single entry map (Id -> SObjectId_value) values received from Bulk API.
    *
-   * @param inputSplit specifies batch details
-   * @param taskAttemptContext task context
    * @return list of single entry Map
    * @throws IOException can be due error during reading query
-   * @throws InterruptedException interrupted sleep while waiting for batch results
    */
-  private List<Map<String, ?>> fetchBulkQueryIds(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
-    throws IOException, InterruptedException {
-    super.initialize(inputSplit, taskAttemptContext);
+  private List<Map<String, ?>> fetchBulkQueryIds()
+    throws IOException {
     List<Map<String, ?>> fetchedIdList = new ArrayList<>();
     while (super.nextKeyValue()) {
       fetchedIdList.add(super.getCurrentValue());
@@ -181,4 +185,37 @@ private SObject[] fetchPartition(PartnerConnection partnerConnection, String fields
         e);
     }
   }
+
+  /**
+   * Processes a partition of SObject records by dividing the IDs into smaller batches,
+   * retrieving the corresponding records from Salesforce, and transforming them into maps.
+   *
+   * @param partnerConnection the Salesforce partner connection used for retrieving data.
+   * @param fields the fields to be retrieved for each SObject.
+   * @param sObjectName the name of the Salesforce object (e.g., Account, Lead).
+   * @param partition the partition containing the ID records to be processed.
+   * @param sObjectDescriptor descriptor containing the structure of the SObject.
+   * @return the transformed records from the partition.
+   */
+  private List<Map<String, ?>> processPartition(PartnerConnection partnerConnection, String fields, String sObjectName,
+                                                List<Map<String, ?>> partition, SObjectDescriptor sObjectDescriptor) {
+    List<Map<String, ?>> partitionResults = new ArrayList<>();
+    // Divide the list of SObject Ids into smaller batches to avoid exceeding retrieve id limits.
+
+    /* see more - https://developer.salesforce.com/docs/atlas.en-us.salesforce_app_limits_cheatsheet.meta/
+       salesforce_app_limits_cheatsheet/salesforce_app_limits_platform_apicalls.htm */
+    List<List<String>> idBatches = Lists.partition(
+        Arrays.asList(getSObjectIds(partition)), SalesforceSourceConstants.RETRIEVE_MAX_BATCH_COUNT);
+
+    // Iterate over each batch of Ids to fetch the records.
+    idBatches.forEach(idBatch -> {
+      SObject[] fetchedObjects = fetchPartition(
+          partnerConnection, fields, sObjectName, idBatch.toArray(new String[0]));
+      Arrays.stream(fetchedObjects)
+          .map(sObject -> transformer.transformToMap(sObject, sObjectDescriptor))
+          .forEach(partitionResults::add);
+    });
+
+    return partitionResults;
+  }
 }
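
The new `processPartition` caps each retrieve round-trip with `RETRIEVE_MAX_BATCH_COUNT` (added in the constants file below), which lines up with the Partner API's limit of 2,000 records per `retrieve()` call. A hedged sketch of that batch loop in isolation, assuming the standard WSC signature `SObject[] retrieve(String fieldList, String sObjectType, String[] ids)`; the field list and object name are illustrative only, and connection setup is omitted:

```java
import com.google.common.collect.Lists;
import com.sforce.soap.partner.PartnerConnection;
import com.sforce.soap.partner.sobject.SObject;
import com.sforce.ws.ConnectionException;
import java.util.ArrayList;
import java.util.List;

public class RetrieveInBatches {
  private static final int RETRIEVE_MAX_BATCH_COUNT = 2000; // mirrors the new constant

  // Fetch all records for the given IDs, never passing more than the
  // per-call cap to retrieve(). Connection construction is out of scope here.
  static List<SObject> retrieveAll(PartnerConnection conn, List<String> ids)
      throws ConnectionException {
    List<SObject> out = new ArrayList<>();
    for (List<String> batch : Lists.partition(ids, RETRIEVE_MAX_BATCH_COUNT)) {
      // "Id,Name" and "Account" are illustrative, not taken from the plugin.
      SObject[] fetched = conn.retrieve("Id,Name", "Account", batch.toArray(new String[0]));
      for (SObject sObject : fetched) {
        if (sObject != null) { // the result array may hold nulls for unretrievable IDs
          out.add(sObject);
        }
      }
    }
    return out;
  }
}
```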
SalesforceSourceConstants.java

@@ -59,6 +59,7 @@ public class SalesforceSourceConstants {
   public static final String CONFIG_RETRY_REQUIRED = "mapred.salesforce.retryOnBackendError";

   public static final int WIDE_QUERY_MAX_BATCH_COUNT = 2000;
+  public static final int RETRIEVE_MAX_BATCH_COUNT = 2000;
   // https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/
   // async_api_headers_enable_pk_chunking.htm
   public static final int MAX_PK_CHUNK_SIZE = 250000;