Skip to content

Commit

Permalink
♻️ Refactor SQL chunk processing to reduce memory errors
Browse files Browse the repository at this point in the history
This refactor increases the likelihood of processing larger `.sql` files without encountering memory errors.

- Updated `processor` to improve error handling and prevent unnecessary semicolon-related logic.
- Modified `processSQLInChunks` to track read offsets and adjust chunk sizes dynamically.
- Improved test cases to ensure SQL chunks are processed correctly, even when split mid-statement.
  • Loading branch information
hoshinotsuyoshi committed Feb 10, 2025
1 parent 0cda4f4 commit de1bc5d
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 51 deletions.
64 changes: 52 additions & 12 deletions frontend/packages/db-structure/src/parser/sql/postgresql/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,66 @@ import { mergeDBStructures } from './mergeDBStructures.js'
import { parse } from './parser.js'
import { processSQLInChunks } from './processSQLInChunks.js'

export const processor: Processor = async (str: string) => {
const dbStructure: DBStructure = { tables: {}, relationships: {} }
const CHUNK_SIZE = 1000
const errors: ProcessError[] = []
/**
* Processes SQL statements and constructs a database structure.
*/
export const processor: Processor = async (sql: string) => {
const dbSchema: DBStructure = { tables: {}, relationships: {} }

// Number of lines to process in a single chunk.
// While a chunk size of around 1000 might work, running it on db/structure.sql
// from https://gitlab.com/gitlab-org/gitlab-foss resulted in a memory error.
// Keep this in mind when considering any adjustments.
const CHUNK_SIZE = 500

const parseErrors: ProcessError[] = []

const errors = await processSQLInChunks(sql, CHUNK_SIZE, async (chunk) => {
let readOffset: number | null = null
let errorOffset: number | null = null
const errors: ProcessError[] = []

await processSQLInChunks(str, CHUNK_SIZE, async (chunk) => {
const { parse_tree, error: parseError } = await parse(chunk)

if (parse_tree.stmts.length > 0 && parseError !== null) {
throw new Error('UnexpectedCondition')
}

if (parseError !== null) {
errors.push(new UnexpectedTokenWarningError(parseError.message))
errorOffset = parseError.cursorpos
return [errorOffset, readOffset, errors]
}

const { value: converted, errors: convertErrors } = convertToDBStructure(
parse_tree.stmts,
)
if (convertErrors !== null) {
errors.push(...convertErrors)
let isLastStatementComplete = true
const statementCount = parse_tree.stmts.length

if (statementCount > 0) {
const lastStmt = parse_tree.stmts[statementCount - 1]
if (lastStmt?.stmt_len === undefined) {
isLastStatementComplete = false
if (lastStmt?.stmt_location === undefined) {
throw new Error('UnexpectedCondition')
}
readOffset = lastStmt?.stmt_location - 1
}
}

mergeDBStructures(dbStructure, converted)
const { value: convertedSchema, errors: conversionErrors } =
convertToDBStructure(
isLastStatementComplete
? parse_tree.stmts
: parse_tree.stmts.slice(0, -1),
)

if (conversionErrors !== null) {
parseErrors.push(...conversionErrors)
}

mergeDBStructures(dbSchema, convertedSchema)

return [errorOffset, readOffset, errors]
})

return { value: dbStructure, errors }
return { value: dbSchema, errors: parseErrors.concat(errors) }
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ describe(processSQLInChunks, () => {
it('should split input by newline and process each chunk', async () => {
const input = 'SELECT 1;\nSELECT 2;\nSELECT 3;'
const chunkSize = 2
const callback = vi.fn()
const callback = vi.fn().mockResolvedValue([null, null, []])

await processSQLInChunks(input, chunkSize, callback)

Expand All @@ -18,7 +18,7 @@ describe(processSQLInChunks, () => {
it('should handle chunks correctly to avoid invalid SQL syntax', async () => {
const input = 'SELECT 1;\nSELECT 2;\nSELECT 3;\nSELECT 4;'
const chunkSize = 3
const callback = vi.fn()
const callback = vi.fn().mockResolvedValue([null, null, []])

await processSQLInChunks(input, chunkSize, callback)

Expand All @@ -30,7 +30,7 @@ describe(processSQLInChunks, () => {
it('should handle input with no newlines correctly', async () => {
const input = 'SELECT 1; SELECT 2; SELECT 3;'
const chunkSize = 1
const callback = vi.fn()
const callback = vi.fn().mockResolvedValue([null, null, []])

await processSQLInChunks(input, chunkSize, callback)

Expand All @@ -41,24 +41,41 @@ describe(processSQLInChunks, () => {
it('should handle empty input correctly', async () => {
const input = ''
const chunkSize = 1
const callback = vi.fn()
const callback = vi.fn().mockResolvedValue([null, null, []])

await processSQLInChunks(input, chunkSize, callback)

expect(callback).not.toHaveBeenCalled()
})

it('should correctly process SQL chunks while ignoring semicolons in comment lines starting with "--"', async () => {
const input =
'SELECT 1;\nSELECT 2;\n-- This is a comment line; additional text here should be ignored.\nSELECT 3;\nSELECT 4;'
it('should correctly handle readOffset by partially consuming chunk lines', async () => {
const input = [
'SELECT 1;',
'SELECT 2;',
'SELECT 3, -- partial statement',
'4;',
].join('\n')
const chunkSize = 3
const callback = vi.fn()
const callback = vi
.fn()
// On the first call, return readOffset=19, indicating that only part of the chunk was successfully processed.
.mockResolvedValueOnce([null, 19, []])
// Subsequent calls should process without issues.
.mockResolvedValue([null, null, []])

await processSQLInChunks(input, chunkSize, callback)
const errors = await processSQLInChunks(input, chunkSize, callback)

expect(callback).toHaveBeenCalledTimes(2)
expect(callback).toHaveBeenCalledWith('SELECT 1;\nSELECT 2;\nSELECT 3;')
expect(callback).toHaveBeenCalledWith('SELECT 4;')
expect(errors).toEqual([])
// Verify the first call: the first three lines are passed as a chunk.
expect(callback).toHaveBeenNthCalledWith(
1,
'SELECT 1;\nSELECT 2;\nSELECT 3, -- partial statement',
)
// Verify the second call: the unprocessed part of the chunk is retried along with the next line.
expect(callback).toHaveBeenNthCalledWith(
2,
'SELECT 3, -- partial statement\n4;',
)
})
})
})
Original file line number Diff line number Diff line change
@@ -1,38 +1,101 @@
import type { ProcessError } from '../../errors.js'

/**
* Processes a large SQL input string in chunks, ensuring that each chunk ends with a complete SQL statement.
* Processes a large SQL input string in chunks (by line count)
*
* @param input - The large SQL input string to be processed.
* @param chunkSize - The number of lines to include in each chunk.
* @param callback - An asynchronous callback function to process each chunk of SQL statements.
* @param sqlInput - The SQL input string to be processed.
* @param chunkSize - The number of lines per chunk (e.g., 500).
* @param callback - An asynchronous function to process each chunk.
*/
export const processSQLInChunks = async (
input: string,
sqlInput: string,
chunkSize: number,
callback: (chunk: string) => Promise<void>,
): Promise<void> => {
const semicolon = ';'
// Even though the parser can handle "--", we remove such lines for ease of splitting by semicolons.
const lines = input.split('\n').filter((line) => !line.startsWith('--'))

let partialStmt = ''

for (let i = 0; i < lines.length; i += chunkSize) {
const chunk = lines.slice(i, i + chunkSize).join('\n')
const combined = partialStmt + chunk

const lastSemicolonIndex = combined.lastIndexOf(semicolon)
if (lastSemicolonIndex === -1) {
partialStmt = combined
continue
callback: (
chunk: string,
) => Promise<[number | null, number | null, ProcessError[]]>,
): Promise<ProcessError[]> => {
if (sqlInput === '') return []
const lines = sqlInput.split('\n')
let currentChunkSize = 0
const processErrors: ProcessError[] = []

for (let i = 0; i < lines.length; ) {
if (processErrors.length > 0) break
currentChunkSize = chunkSize
enum RetryDirection {
Decrease = -1, // Shrinking mode
Increase = 1, // Expanding mode
}
let retryDirection: RetryDirection = RetryDirection.Decrease

while (true) {
// NOTE: To minimize unnecessary retries, avoid increasing currentChunkSize excessively,
// especially when errorOffset is present.
if (retryDirection === RetryDirection.Decrease) {
if (i + currentChunkSize > lines.length) {
currentChunkSize = lines.length - i
}
}

const chunk = lines.slice(i, i + currentChunkSize).join('\n')
const [errorOffset, readOffset, errors] = await callback(chunk)

const parseablePart = combined.slice(0, lastSemicolonIndex + 1)
partialStmt = combined.slice(lastSemicolonIndex + 1)
await callback(parseablePart)
if (errorOffset !== null) {
if (retryDirection === RetryDirection.Decrease) {
currentChunkSize--
if (currentChunkSize === 0) {
retryDirection = RetryDirection.Increase
currentChunkSize = chunkSize
}
} else if (retryDirection === RetryDirection.Increase) {
currentChunkSize++
// NOTE: No further progress can be made in this case, so break.
if (i + currentChunkSize > lines.length) {
processErrors.push(...errors)
break
}
// NOTE: Prevent excessive memory usage. If currentChunkSize exceeds twice the original chunkSize, return an error.
// The factor of 2 is arbitrary and can be adjusted in the future if necessary.
if (currentChunkSize > chunkSize * 2) {
processErrors.push(...errors)
break
}
}
} else if (readOffset !== null) {
const lineNumber = getLineNumber(chunk, readOffset)
if (lineNumber === null) {
throw new Error('UnexpectedCondition')
}
i += lineNumber
break
} else {
i += currentChunkSize
break
}
}
}

// Process the last remaining statement.
if (partialStmt.trim()) {
await callback(partialStmt)
return processErrors
}

/**
* Determines the line number in a string corresponding to a given character index.
*
* @param inputString - The string to search within.
* @param charIndex - The character index.
* @returns The line number, or null if the index is out of bounds.
*/
function getLineNumber(inputString: string, charIndex: number): number | null {
if (charIndex < 0 || charIndex >= inputString.length) return null

let lineNumber = 1
let currentIndex = 0

for (const char of inputString) {
if (currentIndex === charIndex) return lineNumber
if (char === '\n') lineNumber++
currentIndex++
}

return null
}

0 comments on commit de1bc5d

Please sign in to comment.