From 5c1ef6d2be7c3dd4cd3b49882d061b595da7163b Mon Sep 17 00:00:00 2001 From: Yordan Ivanov Date: Wed, 9 Oct 2024 15:23:10 +0300 Subject: [PATCH 1/2] Support BULK 2.0 API The BULK 2.0 API is faster and simpler to use. It's also specifically built to work with larger data sets. From a user point of view, it's the same as using the classic BULK API. All you need to do is to change the API type to "BULK2". --- README.md | 4 +- tap_salesforce/__init__.py | 2 +- tap_salesforce/salesforce/__init__.py | 9 ++- tap_salesforce/salesforce/bulk2.py | 90 +++++++++++++++++++++++++++ 4 files changed, 100 insertions(+), 5 deletions(-) create mode 100644 tap_salesforce/salesforce/bulk2.py diff --git a/README.md b/README.md index 5ab9361..ebff2bf 100644 --- a/README.md +++ b/README.md @@ -30,7 +30,7 @@ pip install git+https://github.com/MeltanoLabs/tap-salesforce.git **Required** ``` { - "api_type": "BULK", + "api_type": "BULK2", "select_fields_by_default": true, } ``` @@ -67,7 +67,7 @@ The `client_id` and `client_secret` keys are your OAuth Salesforce App secrets. The `start_date` is used by the tap as a bound on SOQL queries when searching for records. This should be an [RFC3339](https://www.ietf.org/rfc/rfc3339.txt) formatted date-time, like "2018-01-08T00:00:00Z". For more details, see the [Singer best practices for dates](https://github.com/singer-io/getting-started/blob/master/BEST_PRACTICES.md#dates). -The `api_type` is used to switch the behavior of the tap between using Salesforce's "REST" and "BULK" APIs. When new fields are discovered in Salesforce objects, the `select_fields_by_default` key describes whether or not the tap will select those fields by default. +The `api_type` is used to switch the behavior of the tap between using Salesforce's "REST", "BULK" and "BULK 2.0" APIs. When new fields are discovered in Salesforce objects, the `select_fields_by_default` key describes whether or not the tap will select those fields by default. 
The `state_message_threshold` is used to throttle how often STATE messages are generated when the tap is using the "REST" API. This is a balance between not slowing down execution due to too many STATE messages produced and how many records must be fetched again if a tap fails unexpectedly. Defaults to 1000 (generate a STATE message every 1000 records). diff --git a/tap_salesforce/__init__.py b/tap_salesforce/__init__.py index 8110893..973eaa1 100644 --- a/tap_salesforce/__init__.py +++ b/tap_salesforce/__init__.py @@ -186,7 +186,7 @@ def do_discover(sf: Salesforce, streams: list[str]): f, mdata) # Compound Address fields cannot be queried by the Bulk API - if f['type'] in ("address", "location") and sf.api_type == tap_salesforce.salesforce.BULK_API_TYPE: + if f['type'] in ("address", "location") and sf.api_type in [tap_salesforce.salesforce.BULK_API_TYPE, tap_salesforce.salesforce.BULK2_API_TYPE]: unsupported_fields.add( (field_name, 'cannot query compound address fields with bulk API')) diff --git a/tap_salesforce/salesforce/__init__.py b/tap_salesforce/salesforce/__init__.py index 36b4a0b..f1ff070 100644 --- a/tap_salesforce/salesforce/__init__.py +++ b/tap_salesforce/salesforce/__init__.py @@ -9,6 +9,7 @@ from singer import metadata, metrics from tap_salesforce.salesforce.bulk import Bulk +from tap_salesforce.salesforce.bulk2 import Bulk2 from tap_salesforce.salesforce.rest import Rest from tap_salesforce.salesforce.exceptions import ( TapSalesforceException, @@ -20,6 +21,7 @@ LOGGER = singer.get_logger() BULK_API_TYPE = "BULK" +BULK2_API_TYPE = "BULK2" REST_API_TYPE = "REST" STRING_TYPES = set([ @@ -388,6 +390,9 @@ def query(self, catalog_entry, state): if self.api_type == BULK_API_TYPE: bulk = Bulk(self) return bulk.query(catalog_entry, state) + elif self.api_type == BULK2_API_TYPE: + bulk = Bulk2(self) + return bulk.query(catalog_entry, state) elif self.api_type == REST_API_TYPE: rest = Rest(self) return rest.query(catalog_entry, state) @@ -397,7 +402,7 @@ 
def query(self, catalog_entry, state): self.api_type)) def get_blacklisted_objects(self): - if self.api_type == BULK_API_TYPE: + if self.api_type in [BULK_API_TYPE, BULK2_API_TYPE]: return UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS.union( QUERY_RESTRICTED_SALESFORCE_OBJECTS).union(QUERY_INCOMPATIBLE_SALESFORCE_OBJECTS) elif self.api_type == REST_API_TYPE: @@ -409,7 +414,7 @@ def get_blacklisted_objects(self): # pylint: disable=line-too-long def get_blacklisted_fields(self): - if self.api_type == BULK_API_TYPE: + if self.api_type == BULK_API_TYPE or self.api_type == BULK2_API_TYPE: return {('EntityDefinition', 'RecordTypesSupported'): "this field is unsupported by the Bulk API."} elif self.api_type == REST_API_TYPE: return {} diff --git a/tap_salesforce/salesforce/bulk2.py b/tap_salesforce/salesforce/bulk2.py new file mode 100644 index 0000000..a740faf --- /dev/null +++ b/tap_salesforce/salesforce/bulk2.py @@ -0,0 +1,90 @@ +import time +import csv +import sys +import json +import singer +from singer import metrics + + +BATCH_STATUS_POLLING_SLEEP = 20 +DEFAULT_CHUNK_SIZE = 50000 + +LOGGER = singer.get_logger() + +class Bulk2(): + bulk_url = '{}/services/data/v60.0/jobs/query' + + def __init__(self, sf): + csv.field_size_limit(sys.maxsize) + self.sf = sf + + + def query(self, catalog_entry, state): + job_id = self._create_job(catalog_entry, state) + self._wait_for_job(job_id) + + for batch in self._get_next_batch(job_id): + reader = csv.DictReader(batch.decode('utf-8').splitlines()) + + for row in reader: + yield row + + + def _get_bulk_headers(self): + return {**self.sf.auth.rest_headers, "Content-Type": "application/json"} + + def _create_job(self, catalog_entry, state): + url = self.bulk_url.format(self.sf.instance_url) + start_date = self.sf.get_start_date(state, catalog_entry) + + query = self.sf._build_query_string(catalog_entry, start_date, order_by_clause=True) + + body = { + "operation": "query", + "query": query, + } + + with 
metrics.http_request_timer("create_job") as timer: + timer.tags['sobject'] = catalog_entry['stream'] + resp = self.sf._make_request( + 'POST', + url, + headers=self._get_bulk_headers(), + body=json.dumps(body)) + + job = resp.json() + + return job['id'] + + def _wait_for_job(self, job_id): + status_url = self.bulk_url + '/{}' + url = status_url.format(self.sf.instance_url, job_id) + status = None + + while status not in ('JobComplete', 'Failed'): + resp = self.sf._make_request('GET', url, headers=self._get_bulk_headers()).json() + status = resp['state'] + + if status == 'JobComplete': + break + + if status == 'Failed': + raise Exception("Job failed: {}".format(resp)) + + time.sleep(BATCH_STATUS_POLLING_SLEEP) + + def _get_next_batch(self, job_id): + url = self.bulk_url + '/{}/results' + url = url.format(self.sf.instance_url, job_id) + locator = '' + + while locator != 'null': + params = {"maxRecords": DEFAULT_CHUNK_SIZE} + + if locator != '': + params['locator'] = locator + + resp = self.sf._make_request('GET', url, headers=self._get_bulk_headers(), params=params) + locator = resp.headers.get('Sforce-Locator') + + yield resp.content From e9a0cee2ff691eb1695f600e98ec1f7e5cf49110 Mon Sep 17 00:00:00 2001 From: Yordan Ivanov Date: Wed, 9 Oct 2024 16:19:52 +0300 Subject: [PATCH 2/2] Disable ordering for BULK 2.0 calls Ordering records when using the Bulk 2.0 API disables PKChunking. This might lead to query timeouts. 
More info: https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/bulk_api_2_0_troubleshoot_query_timeouts.htm --- tap_salesforce/salesforce/bulk2.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tap_salesforce/salesforce/bulk2.py b/tap_salesforce/salesforce/bulk2.py index a740faf..6f1ba22 100644 --- a/tap_salesforce/salesforce/bulk2.py +++ b/tap_salesforce/salesforce/bulk2.py @@ -37,7 +37,7 @@ def _create_job(self, catalog_entry, state): url = self.bulk_url.format(self.sf.instance_url) start_date = self.sf.get_start_date(state, catalog_entry) - query = self.sf._build_query_string(catalog_entry, start_date, order_by_clause=True) + query = self.sf._build_query_string(catalog_entry, start_date, order_by_clause=False) body = { "operation": "query",