Skip to content

Commit

Permalink
Performance optimisation for INSERT BULK
Browse files Browse the repository at this point in the history
Initially, Insert Bulk was implemented using SPI_execute_with_args, which in practice performed single-row inserts at the executor level. This made database migration slow for our users, so with this task we have moved from SPI to PG’s multi-insert functionality. PG’s multi-insert feature is much faster because we can write just a single WAL record for all of the inserts at the end and need to lock/unlock the page only once. Moreover, unlike the initial implementation, there is no need to parse/plan/optimise the query; we bypass those steps and directly insert multiple tuples into the table at once. An average speedup of 8x was achieved compared to the initial implementation.

Task: BABEL-3503
Authored-by: Kushaal Shroff <kushaal@amazon.com>
Signed-off-by: Kushaal Shroff <kushaal@amazon.com>
  • Loading branch information
KushaalShroff authored Oct 31, 2022
1 parent 928f844 commit d91be29
Show file tree
Hide file tree
Showing 9 changed files with 1,117 additions and 201 deletions.
62 changes: 23 additions & 39 deletions contrib/babelfishpg_tds/src/backend/tds/tdsbulkload.c
Original file line number Diff line number Diff line change
Expand Up @@ -481,15 +481,15 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message)
request->rowCount++;

rowData->columnValues = palloc0(request->colCount * sizeof(StringInfoData));
rowData->isNull = palloc0(request->colCount);
rowData->isNull = palloc0(request->colCount * sizeof(bool));

offset++;
request->currentBatchSize++;

while(i != request->colCount) /* Loop over each column. */
{
initStringInfo(&rowData->columnValues[i]);
rowData->isNull[i] = 'f';
rowData->isNull[i] = false;
switch(colmetadata[i].columnTdsType)
{
case TDS_TYPE_INTEGER:
Expand All @@ -514,7 +514,7 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message)

if (rowData->columnValues[i].len == 0) /* null */
{
rowData->isNull[i] = 'n';
rowData->isNull[i] = true;
i++;
continue;
}
Expand Down Expand Up @@ -547,7 +547,7 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message)
request->currentBatchSize++;
if (rowData->columnValues[i].len == 0) /* null */
{
rowData->isNull[i] = 'n';
rowData->isNull[i] = true;
i++;
continue;
}
Expand Down Expand Up @@ -592,7 +592,7 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message)
}
else /* null */
{
rowData->isNull[i] = 'n';
rowData->isNull[i] = true;
i++;
continue;
}
Expand All @@ -607,7 +607,7 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message)
CheckPLPStatusNotOK(request, retStatus, i);
if (temp->isNull) /* null */
{
rowData->isNull[i] = 'n';
rowData->isNull[i] = true;
i++;
temp->isNull = false;
continue;
Expand All @@ -630,7 +630,7 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message)
request->currentBatchSize++;
if (dataTextPtrLen == 0) /* null */
{
rowData->isNull[i] = 'n';
rowData->isNull[i] = true;
i++;
continue;
}
Expand All @@ -647,7 +647,7 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message)
request->currentBatchSize += sizeof(uint32_t);
if (rowData->columnValues[i].len == 0) /* null */
{
rowData->isNull[i] = 'n';
rowData->isNull[i] = true;
i++;
continue;
}
Expand All @@ -672,7 +672,7 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message)
CheckPLPStatusNotOK(request, retStatus, i);
if (temp->isNull) /* null */
{
rowData->isNull[i] = 'n';
rowData->isNull[i] = true;
i++;
temp->isNull = false;
continue;
Expand All @@ -694,7 +694,7 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message)

if (rowData->columnValues[i].len == 0) /* null */
{
rowData->isNull[i] = 'n';
rowData->isNull[i] = true;
i++;
continue;
}
Expand Down Expand Up @@ -742,7 +742,7 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message)
void
ProcessBCPRequest(TDSRequest request)
{
int retValue = 0;
uint64 retValue = 0;
StringInfo temp = makeStringInfo();
TDSRequestBulkLoad req = (TDSRequestBulkLoad) request;
BulkLoadColMetaData *colMetaData = req->colMetaData;
Expand All @@ -755,9 +755,7 @@ ProcessBCPRequest(TDSRequest request)
{
int nargs = 0;
Datum *values = NULL;
char *nulls = NULL;
Oid *argtypes = NULL;
bool *defaults = NULL;
bool *nulls = NULL;
int count = 0;
ListCell *lc;

Expand Down Expand Up @@ -786,53 +784,39 @@ ProcessBCPRequest(TDSRequest request)
}
PG_END_TRY();
/*
* If the row-count is 0 then this no rows are left to be inserted.
* If the row-count is 0 then there are no rows left to be inserted.
* We should begin with cleanup.
*/
if (req->rowCount == 0)
{
/* Using Same callback function to fo the clean-up. */
pltsql_plugin_handler_ptr->bulk_load_callback(0, 0, NULL, NULL, NULL, NULL);
/* Using Same callback function to do the clean-up. */
pltsql_plugin_handler_ptr->bulk_load_callback(0, 0, NULL, NULL);
break;
}

/*
* defaults array will always contain nargs length of data, where as
* values and nulls array can be less than nargs length. The length of
* values and nulls array will be the number of bind params in
* bulk_load_callback function.
*/
nargs = req->colCount * req->rowCount;
values = palloc0(nargs * sizeof(Datum));
nulls = palloc0(nargs * sizeof(char));
argtypes= palloc0(nargs * sizeof(Oid));
defaults = palloc0(nargs * sizeof(bool));
nulls = palloc0(nargs * sizeof(bool));
nargs = 0;

foreach (lc, req->rowData) /* build an array of Value Datums */
{
BulkLoadRowData *row = (BulkLoadRowData *) lfirst(lc);
TdsIoFunctionInfo tempFuncInfo;
int currentColumn = 0;

while(currentColumn != req->colCount)
{
temp = &(row->columnValues[currentColumn]);
tempFuncInfo = TdsLookupTypeFunctionsByTdsId(colMetaData[currentColumn].columnTdsType, colMetaData[currentColumn].maxLen);
GetPgOid(argtypes[count], tempFuncInfo);
if (row->isNull[currentColumn] == 'n') /* null */
if (pltsql_plugin_handler_ptr->get_insert_bulk_keep_nulls())
if (row->isNull[currentColumn]) /* null */
nulls[count++] = row->isNull[currentColumn];
else
defaults[nargs] = true;
else
{
switch(colMetaData[currentColumn].columnTdsType)
{
case TDS_TYPE_CHAR:
case TDS_TYPE_VARCHAR:
case TDS_TYPE_TEXT:
values[count] = TdsTypeVarcharToDatum(temp, argtypes[count], colMetaData[currentColumn].collation, colMetaData[currentColumn].columnTdsType);
values[count] = TdsTypeVarcharToDatum(temp, colMetaData[currentColumn].collation, colMetaData[currentColumn].columnTdsType);
break;
case TDS_TYPE_NCHAR:
case TDS_TYPE_NVARCHAR:
Expand All @@ -854,7 +838,6 @@ ProcessBCPRequest(TDSRequest request)
case TDS_TYPE_BINARY:
case TDS_TYPE_IMAGE:
values[count] = TdsTypeVarbinaryToDatum(temp);
argtypes[count] = tempFuncInfo->ttmtypeid;
break;
case TDS_TYPE_DATE:
values[count] = TdsTypeDateToDatum(temp);
Expand Down Expand Up @@ -902,8 +885,7 @@ ProcessBCPRequest(TDSRequest request)
PG_TRY();
{
retValue += pltsql_plugin_handler_ptr->bulk_load_callback(req->colCount,
req->rowCount, argtypes,
values, nulls, defaults);
req->rowCount, values, nulls);
}
PG_CATCH();
{
Expand All @@ -920,6 +902,9 @@ ProcessBCPRequest(TDSRequest request)

RESUME_CANCEL_INTERRUPTS();

/* Using Same callback function to do the clean-up. */
pltsql_plugin_handler_ptr->bulk_load_callback(0, 0, NULL, NULL);

if (ret < 0)
TdsErrorContext->err_text = "EOF on TDS socket while fetching For Bulk Load Request";

Expand All @@ -939,10 +924,9 @@ ProcessBCPRequest(TDSRequest request)
pfree(values);
if (nulls)
pfree(nulls);
if (argtypes)
pfree(argtypes);
}
}

/* Send Done Token if rows processed is a positive number. Command type - execute (0xf0). */
if (retValue >= 0)
TdsSendDone(TDS_TOKEN_DONE, TDS_DONE_COUNT, 0xf0, retValue);
Expand Down
24 changes: 10 additions & 14 deletions contrib/babelfishpg_tds/src/backend/tds/tdstypeio.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,13 @@ uint128 GetMsgUInt128(StringInfo msg);
float4 GetMsgFloat4(StringInfo msg);
float8 GetMsgFloat8(StringInfo msg);
static void SwapData(StringInfo buf, int st, int end);
static Datum TdsAnyToServerEncodingConversion(Oid oid, pg_enc encoding, char *str, int len, uint8_t tdsColDataType);
static Datum TdsAnyToServerEncodingConversion(pg_enc encoding, char *str, int len, uint8_t tdsColDataType);
int TdsUTF16toUTF8XmlResult(StringInfo buf, void **resultPtr);

Datum TdsTypeBitToDatum(StringInfo buf);
Datum TdsTypeIntegerToDatum(StringInfo buf, int maxLen);
Datum TdsTypeFloatToDatum(StringInfo buf, int maxLen);
Datum TdsTypeVarcharToDatum(StringInfo buf, Oid pgTypeOid, uint32_t collation, uint8_t tdsColDataType);
Datum TdsTypeVarcharToDatum(StringInfo buf, uint32_t collation, uint8_t tdsColDataType);
Datum TdsTypeNCharToDatum(StringInfo buf);
Datum TdsTypeNumericToDatum(StringInfo buf, int scale);
Datum TdsTypeVarbinaryToDatum(StringInfo buf);
Expand Down Expand Up @@ -309,7 +309,7 @@ int TdsUTF16toUTF8XmlResult(StringInfo buf, void **resultPtr)
* and convert the encoding of input str
*/
static Datum
TdsAnyToServerEncodingConversion(Oid oid, pg_enc encoding, char *str, int len, uint8_t tdsColDataType)
TdsAnyToServerEncodingConversion(pg_enc encoding, char *str, int len, uint8_t tdsColDataType)
{
Oid typinput;
Oid typioparam;
Expand All @@ -334,7 +334,7 @@ TdsAnyToServerEncodingConversion(Oid oid, pg_enc encoding, char *str, int len, u
default:
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("TdsAnyToServerEncodingConversion is not supported for Oid: %s", format_type_be(oid))));
errmsg("TdsAnyToServerEncodingConversion is not supported for Tds Type: %d", tdsColDataType)));
break;
}

Expand Down Expand Up @@ -902,7 +902,7 @@ TdsTypeFloatToDatum(StringInfo buf, int maxLen)

/* Helper Function to convert Varchar,Char and Text values into Datum. */
Datum
TdsTypeVarcharToDatum(StringInfo buf, Oid pgTypeOid, uint32_t collation, uint8_t tdsColDataType)
TdsTypeVarcharToDatum(StringInfo buf, uint32_t collation, uint8_t tdsColDataType)
{
char csave;
Datum pval;
Expand All @@ -914,8 +914,7 @@ TdsTypeVarcharToDatum(StringInfo buf, Oid pgTypeOid, uint32_t collation, uint8_t
/* If we receive a 0 value for LCID then we should treat it as the default LCID. */
encoding = TdsGetEncoding(collation);

pval = TdsAnyToServerEncodingConversion(pgTypeOid,
encoding,
pval = TdsAnyToServerEncodingConversion(encoding,
buf->data, buf->len,
tdsColDataType);
buf->data[buf->len] = csave;
Expand Down Expand Up @@ -1559,8 +1558,7 @@ TdsRecvTypeVarchar(const char *message, const ParameterToken token)

csave = buf->data[buf->len];
buf->data[buf->len] = '\0';
pval = TdsAnyToServerEncodingConversion(token->paramMeta.pgTypeOid,
token->paramMeta.encoding,
pval = TdsAnyToServerEncodingConversion(token->paramMeta.encoding,
buf->data, buf->len, TDS_TYPE_VARCHAR);
buf->data[buf->len] = csave;

Expand Down Expand Up @@ -1632,8 +1630,7 @@ TdsRecvTypeText(const char *message, const ParameterToken token)

csave = buf->data[buf->len];
buf->data[buf->len] = '\0';
pval = TdsAnyToServerEncodingConversion(token->paramMeta.pgTypeOid,
token->paramMeta.encoding, buf->data, buf->len,
pval = TdsAnyToServerEncodingConversion(token->paramMeta.encoding, buf->data, buf->len,
TDS_TYPE_TEXT);
buf->data[buf->len] = csave;

Expand Down Expand Up @@ -1668,8 +1665,7 @@ TdsRecvTypeChar(const char *message, const ParameterToken token)

csave = buf->data[buf->len];
buf->data[buf->len] = '\0';
pval = TdsAnyToServerEncodingConversion(token->paramMeta.pgTypeOid,
token->paramMeta.encoding, buf->data, buf->len, TDS_TYPE_CHAR);
pval = TdsAnyToServerEncodingConversion(token->paramMeta.encoding, buf->data, buf->len, TDS_TYPE_CHAR);
buf->data[buf->len] = csave;

pfree(buf);
Expand Down Expand Up @@ -2080,7 +2076,7 @@ TdsRecvTypeTable(const char *message, const ParameterToken token)
{
case TDS_TYPE_CHAR:
case TDS_TYPE_VARCHAR:
values[i] = TdsTypeVarcharToDatum(temp, argtypes[i], colMetaData[currentColumn].collation, colMetaData[currentColumn].columnTdsType);
values[i] = TdsTypeVarcharToDatum(temp, colMetaData[currentColumn].collation, colMetaData[currentColumn].columnTdsType);
break;
case TDS_TYPE_NCHAR:
values[i] = TdsTypeNCharToDatum(temp);
Expand Down
4 changes: 2 additions & 2 deletions contrib/babelfishpg_tds/src/include/tds_typeio.h
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ typedef struct BulkLoadRowData
/* Array of length col count, holds value of each column in that row. */
StringInfo columnValues;

char *isNull;
bool *isNull;
} BulkLoadRowData;

/* Map TVP to its underlying table, either by relid or by table name. */
Expand Down Expand Up @@ -457,7 +457,7 @@ extern Datum TdsRecvTypeDatetimeoffset(const char *message, const ParameterToken
extern Datum TdsTypeBitToDatum(StringInfo buf);
extern Datum TdsTypeIntegerToDatum(StringInfo buf, int maxLen);
extern Datum TdsTypeFloatToDatum(StringInfo buf, int maxLen);
extern Datum TdsTypeVarcharToDatum(StringInfo buf, Oid pgTypeOid, uint32_t collation, uint8_t tdsColDataType);
extern Datum TdsTypeVarcharToDatum(StringInfo buf, uint32_t collation, uint8_t tdsColDataType);
extern Datum TdsTypeNCharToDatum(StringInfo buf);
extern Datum TdsTypeNumericToDatum(StringInfo buf, int scale);
extern Datum TdsTypeVarbinaryToDatum(StringInfo buf);
Expand Down
1 change: 1 addition & 0 deletions contrib/babelfishpg_tsql/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ export ANTLR4_JAVA_BIN=java
export ANTLR4_RUNTIME_LIB=-lantlr4-runtime
export ANTLR4_RUNTIME_INCLUDE_DIR=/usr/local/include/antlr4-runtime
export ANTLR4_RUNTIME_LIB_DIR=/usr/local/lib
OBJS += src/pltsql_bulkcopy.o

PG_CXXFLAGS += -g -Werror
PG_CXXFLAGS += -Wno-deprecated -Wno-error=attributes -Wno-suggest-attribute=format # disable some warnings from ANTLR runtime header
Expand Down
Loading

0 comments on commit d91be29

Please sign in to comment.